import (
"fmt"
- "io/ioutil"
"os"
"os/user"
"strings"
- // "sync"
- // "time"
+ "sync"
"github.com/jinzhu/gorm"
log "github.com/sirupsen/logrus"
"github.com/vapor/event"
"github.com/vapor/netsync/chainmgr"
"github.com/vapor/netsync/consensusmgr"
- "github.com/vapor/p2p"
- // msgs "github.com/vapor/netsync/messages"
"github.com/vapor/netsync/peers"
+ "github.com/vapor/p2p"
"github.com/vapor/p2p/discover/dht"
"github.com/vapor/p2p/discover/mdns"
"github.com/vapor/p2p/signlib"
)
type monitor struct {
- // *sync.RWMutex
- cfg *config.Config
- db *gorm.DB
- nodeCfg *vaporCfg.Config
- sw *p2p.Switch
- discvCh chan *dht.Node
- privKey chainkd.XPrv
- chain *mock.Chain
- txPool *mock.Mempool
+ *sync.RWMutex
+ cfg *config.Config
+ db *gorm.DB
+ nodeCfg *vaporCfg.Config
+ sw *p2p.Switch
+ privKey chainkd.XPrv
+ chain *mock.Chain
+ txPool *mock.Mempool
+ // discvMap maps a node's public key to the node itself
+ discvMap map[string]*dht.Node
dialCh chan struct{}
checkStatusCh chan struct{}
}
-// TODO: set myself as SPV?
func NewMonitor(cfg *config.Config, db *gorm.DB) *monitor {
- //TODO: for test
- cfg.CheckFreqSeconds = 15
-
dbPath, err := makePath()
if err != nil {
log.Fatal(err)
}
nodeCfg.DBPath = dbPath
nodeCfg.ChainID = "mainnet"
- discvCh := make(chan *dht.Node)
privKey, err := signlib.NewPrivKey()
if err != nil {
log.Fatal(err)
}
return &monitor{
- // RWMutex: &sync.RWMutex{},
+ RWMutex: &sync.RWMutex{},
cfg: cfg,
db: db,
nodeCfg: nodeCfg,
- discvCh: discvCh,
privKey: privKey.(chainkd.XPrv),
chain: chain,
txPool: txPool,
+ discvMap: make(map[string]*dht.Node),
dialCh: make(chan struct{}, 1),
checkStatusCh: make(chan struct{}, 1),
}
return "", err
}
- tmpDir, err := ioutil.TempDir(dataPath, "")
- if err != nil {
- return "", err
- }
-
- return tmpDir, nil
+ return dataPath, nil
}
func (m *monitor) Run() {
- defer os.RemoveAll(m.nodeCfg.DBPath)
-
var seeds []string
for _, node := range m.cfg.Nodes {
seeds = append(seeds, fmt.Sprintf("%s:%d", node.Host, node.Port))
m.dialCh <- struct{}{}
go m.discoveryRoutine()
- go m.collectDiscoveredNodes()
go m.connectNodesRoutine()
go m.checkStatusRoutine()
}
}
for label, reactor := range m.sw.GetReactors() {
- log.Debug("start reactor: (%s:%v)", label, reactor)
+ log.Debugf("start reactor: (%s:%v)", label, reactor)
if _, err := reactor.Start(); err != nil {
return nil
}
}
}
log.Info("bestHeight: ", bestHeight)
- m.savePeerInfos(peers.GetPeerInfos())
-
- // TODO:
- // msg := struct{ msgs.BlockchainMessage }{&msgs.GetBlockMessage{Height: bestHeight + 1}}
- // for _, peer := range m.sw.GetPeers().List() {
- // peers.SendMsg(peer.ID(), msgs.BlockchainChannel, msg)
- // }
+ m.processPeerInfos(peers.GetPeerInfos())
for _, peer := range m.sw.GetPeers().List() {
p := peers.GetPeer(peer.ID())
peers.RemovePeer(p.ID())
}
log.Info("Disonnect all peers.")
+
+ m.Unlock()
m.dialCh <- struct{}{}
}
}