import (
"fmt"
- "io/ioutil"
"os"
"os/user"
"strings"
"sync"
- "time"
"github.com/jinzhu/gorm"
log "github.com/sirupsen/logrus"
"github.com/vapor/event"
"github.com/vapor/netsync/chainmgr"
"github.com/vapor/netsync/consensusmgr"
- "github.com/vapor/p2p"
- // msgs "github.com/vapor/netsync/messages"
"github.com/vapor/netsync/peers"
+ "github.com/vapor/p2p"
"github.com/vapor/p2p/discover/dht"
"github.com/vapor/p2p/discover/mdns"
"github.com/vapor/p2p/signlib"
type monitor struct {
*sync.RWMutex
- cfg *config.Config
- db *gorm.DB
- nodeCfg *vaporCfg.Config
- sw *p2p.Switch
- discvCh chan *dht.Node
- privKey chainkd.XPrv
- chain *mock.Chain
- txPool *mock.Mempool
- connected uint32
+ cfg *config.Config
+ db *gorm.DB
+ nodeCfg *vaporCfg.Config
+ sw *p2p.Switch
+ privKey chainkd.XPrv
+ chain *mock.Chain
+ txPool *mock.Mempool
+ // discvMap maps a node's public key to the node itself
+ discvMap map[string]*dht.Node
+ dialCh chan struct{}
+ checkStatusCh chan struct{}
}
// TODO: set myself as SPV?
func NewMonitor(cfg *config.Config, db *gorm.DB) *monitor {
//TODO: for test
- cfg.CheckFreqSeconds = 120
+ cfg.CheckFreqSeconds = 15
dbPath, err := makePath()
if err != nil {
}
nodeCfg.DBPath = dbPath
nodeCfg.ChainID = "mainnet"
- discvCh := make(chan *dht.Node)
privKey, err := signlib.NewPrivKey()
if err != nil {
log.Fatal(err)
}
return &monitor{
- RWMutex: &sync.RWMutex{},
- cfg: cfg,
- db: db,
- nodeCfg: nodeCfg,
- discvCh: discvCh,
- privKey: privKey.(chainkd.XPrv),
- chain: chain,
- txPool: txPool,
+ RWMutex: &sync.RWMutex{},
+ cfg: cfg,
+ db: db,
+ nodeCfg: nodeCfg,
+ privKey: privKey.(chainkd.XPrv),
+ chain: chain,
+ txPool: txPool,
+ discvMap: make(map[string]*dht.Node),
+ dialCh: make(chan struct{}, 1),
+ checkStatusCh: make(chan struct{}, 1),
}
}
return "", err
}
- tmpDir, err := ioutil.TempDir(dataPath, "")
- if err != nil {
- return "", err
- }
-
- return tmpDir, nil
+ return dataPath, nil
}
func (m *monitor) Run() {
- defer os.RemoveAll(m.nodeCfg.DBPath)
-
var seeds []string
for _, node := range m.cfg.Nodes {
seeds = append(seeds, fmt.Sprintf("%s:%d", node.Host, node.Port))
log.Fatal(err)
}
+ m.dialCh <- struct{}{}
go m.discoveryRoutine()
- go m.collectDiscoveredNodes()
go m.connectNodesRoutine()
go m.checkStatusRoutine()
}
}
for label, reactor := range m.sw.GetReactors() {
- log.Debug("start reactor: (%s:%v)", label, reactor)
+ log.Debugf("start reactor: (%s:%v)", label, reactor)
if _, err := reactor.Start(); err != nil {
return nil
}
m.sw.GetSecurity().RegisterFilter(m.sw.GetNodeInfo())
m.sw.GetSecurity().RegisterFilter(m.sw.GetPeers())
-
return m.sw.GetSecurity().Start()
}
log.Fatal(err)
}
- protocolReactor, ok := m.sw.GetReactors()["PROTOCOL"]
- if !ok {
- log.Fatal("protocolReactor not found")
- }
-
bestHeight := uint64(0)
- ticker := time.NewTicker(time.Duration(m.cfg.CheckFreqSeconds) * time.Second)
- for range ticker.C {
- m.Lock()
- log.Info("connected peers: ", m.sw.GetPeers().List())
-
+ for range m.checkStatusCh {
for _, peer := range m.sw.GetPeers().List() {
peer.Start()
- protocolReactor.AddPeer(peer)
+ peers.AddPeer(peer)
}
+ log.Infof("%d connected peers: %v", len(m.sw.GetPeers().List()), m.sw.GetPeers().List())
for _, peer := range m.sw.GetPeers().List() {
p := peers.GetPeer(peer.ID())
}
if err := p.SendStatus(m.chain.BestBlockHeader(), m.chain.LastIrreversibleHeader()); err != nil {
+ log.Error(err)
peers.RemovePeer(p.ID())
}
}
if peerInfo.Height > bestHeight {
bestHeight = peerInfo.Height
}
-
- m.savePeerInfo(peerInfo)
}
log.Info("bestHeight: ", bestHeight)
-
- // TODO:
- // msg := struct{ msgs.BlockchainMessage }{&msgs.GetBlockMessage{Height: bestHeight + 1}}
- // for _, peer := range m.sw.GetPeers().List() {
- // peers.SendMsg(peer.ID(), msgs.BlockchainChannel, msg)
- // }
+ m.processPeerInfos(peers.GetPeerInfos())
for _, peer := range m.sw.GetPeers().List() {
p := peers.GetPeer(peer.ID())
}
log.Info("Disonnect all peers.")
m.Unlock()
+ m.dialCh <- struct{}{}
}
}