import (
"fmt"
- "io/ioutil"
"os"
"os/user"
"strings"
- "sync/atomic"
- "time"
+ "sync"
"github.com/jinzhu/gorm"
log "github.com/sirupsen/logrus"
"github.com/vapor/event"
"github.com/vapor/netsync/chainmgr"
"github.com/vapor/netsync/consensusmgr"
- "github.com/vapor/p2p"
- // msgs "github.com/vapor/netsync/messages"
"github.com/vapor/netsync/peers"
+ "github.com/vapor/p2p"
"github.com/vapor/p2p/discover/dht"
"github.com/vapor/p2p/discover/mdns"
"github.com/vapor/p2p/signlib"
)
type monitor struct {
- cfg *config.Config
- db *gorm.DB
- nodeCfg *vaporCfg.Config
- sw *p2p.Switch
- discvCh chan *dht.Node
- privKey chainkd.XPrv
- chain *mock.Chain
- txPool *mock.Mempool
- connected uint32
+ *sync.RWMutex
+ cfg *config.Config
+ db *gorm.DB
+ nodeCfg *vaporCfg.Config
+ sw *p2p.Switch
+ privKey chainkd.XPrv
+ chain *mock.Chain
+ txPool *mock.Mempool
+ // discvMap maps a node's public key to the node itself
+ discvMap map[string]*dht.Node
+ dialCh chan struct{}
+ checkStatusCh chan struct{}
+ bestHeightSeen uint64
}
-// TODO: set myself as SPV?
func NewMonitor(cfg *config.Config, db *gorm.DB) *monitor {
- //TODO: for test
- cfg.CheckFreqSeconds = 30
-
dbPath, err := makePath()
if err != nil {
- log.Fatal(err)
+ log.WithFields(log.Fields{"err": err}).Fatal("makePath")
}
nodeCfg := &vaporCfg.Config{
}
nodeCfg.DBPath = dbPath
nodeCfg.ChainID = "mainnet"
- discvCh := make(chan *dht.Node)
privKey, err := signlib.NewPrivKey()
if err != nil {
- log.Fatal(err)
+ log.WithFields(log.Fields{"err": err}).Fatal("NewPrivKey")
}
chain, txPool, err := mockChainAndPool()
if err != nil {
- log.Fatal(err)
+ log.WithFields(log.Fields{"err": err}).Fatal("mockChainAndPool")
}
return &monitor{
- cfg: cfg,
- db: db,
- nodeCfg: nodeCfg,
- discvCh: discvCh,
- privKey: privKey.(chainkd.XPrv),
- chain: chain,
- txPool: txPool,
+ RWMutex: &sync.RWMutex{},
+ cfg: cfg,
+ db: db,
+ nodeCfg: nodeCfg,
+ privKey: privKey.(chainkd.XPrv),
+ chain: chain,
+ txPool: txPool,
+ discvMap: make(map[string]*dht.Node),
+ dialCh: make(chan struct{}, 1),
+ checkStatusCh: make(chan struct{}, 1),
+ bestHeightSeen: uint64(0),
}
}
return "", err
}
- dataPath := usr.HomeDir + "/.precog"
+ dataPath := usr.HomeDir + "/.vapor/precog"
if err := os.MkdirAll(dataPath, os.ModePerm); err != nil {
return "", err
}
- tmpDir, err := ioutil.TempDir(dataPath, "")
- if err != nil {
- return "", err
- }
-
- return tmpDir, nil
+ return dataPath, nil
}
func (m *monitor) Run() {
- defer os.RemoveAll(m.nodeCfg.DBPath)
-
var seeds []string
for _, node := range m.cfg.Nodes {
- seeds = append(seeds, fmt.Sprintf("%s:%d", node.Host, node.Port))
+ seeds = append(seeds, fmt.Sprintf("%s:%d", node.IP, node.Port))
if err := m.upSertNode(&node); err != nil {
- log.Error(err)
+ log.WithFields(log.Fields{
+ "node": node,
+ "err": err,
+ }).Error("upSertNode")
}
}
m.nodeCfg.P2P.Seeds = strings.Join(seeds, ",")
if err := m.makeSwitch(); err != nil {
- log.Fatal(err)
+ log.WithFields(log.Fields{"err": err}).Fatal("makeSwitch")
}
+ m.dialCh <- struct{}{}
go m.discoveryRoutine()
- go m.collectDiscoveredNodes()
go m.connectNodesRoutine()
go m.checkStatusRoutine()
}
// no need for lanDiscv, but passing &mdns.LANDiscover{} will cause NilPointer
lanDiscv := mdns.NewLANDiscover(mdns.NewProtocol(), int(l.ExternalAddress().Port))
- sw, err := p2p.NewSwitch(m.nodeCfg, discv, lanDiscv, l, m.privKey, listenAddr, m.cfg.NetworkID)
+ m.sw, err = p2p.NewSwitch(m.nodeCfg, discv, lanDiscv, l, m.privKey, listenAddr, m.cfg.NetworkID)
if err != nil {
return err
}
- m.sw = sw
return nil
}
_ = consensusmgr.NewManager(m.sw, m.chain, peers, dispatcher)
fastSyncDB := dbm.NewDB("fastsync", m.nodeCfg.DBBackend, m.nodeCfg.DBDir())
// add ProtocolReactor to handle msgs
- _, err := chainmgr.NewManager(m.nodeCfg, m.sw, m.chain, m.txPool, dispatcher, peers, fastSyncDB)
- if err != nil {
+ if _, err := chainmgr.NewManager(m.nodeCfg, m.sw, m.chain, m.txPool, dispatcher, peers, fastSyncDB); err != nil {
return err
}
for label, reactor := range m.sw.GetReactors() {
- log.Debug("start reactor: (%s:%v)", label, reactor)
+ log.WithFields(log.Fields{
+ "label": label,
+ "reactor": reactor,
+ }).Debug("start reactor")
if _, err := reactor.Start(); err != nil {
return nil
}
m.sw.GetSecurity().RegisterFilter(m.sw.GetNodeInfo())
m.sw.GetSecurity().RegisterFilter(m.sw.GetPeers())
-
return m.sw.GetSecurity().Start()
}
-// TODO:
-// 现象是,时间区间过小时, 会一直有 dial ,但是不能 send业务层 msg
-// 还不确定是不是死锁,时间调大一点比如15s 就可以正确运行
-// 想法,自己再另外加锁,或者找到锁住的真正原因
func (m *monitor) checkStatusRoutine() {
peers := peers.NewPeerSet(m.sw)
if err := m.prepareReactors(peers); err != nil {
- log.Fatal(err)
- }
-
- protocolReactor, ok := m.sw.GetReactors()["PROTOCOL"]
- if !ok {
- log.Fatal("protocolReactor not found")
+ log.WithFields(log.Fields{"err": err}).Fatal("prepareReactors")
}
- bestHeight := uint64(0)
- ticker := time.NewTicker(time.Duration(m.cfg.CheckFreqSeconds) * time.Second)
- for range ticker.C {
- for !m.isConnected() {
- time.Sleep(1 * time.Second)
- }
- log.Info("connected peers: ", m.sw.GetPeers().List())
-
+ for range m.checkStatusCh {
for _, peer := range m.sw.GetPeers().List() {
peer.Start()
- protocolReactor.AddPeer(peer)
+ peers.AddPeer(peer)
}
+ log.WithFields(log.Fields{
+ "num": len(m.sw.GetPeers().List()),
+ "peers": m.sw.GetPeers().List(),
+ }).Info("connected peers")
for _, peer := range m.sw.GetPeers().List() {
p := peers.GetPeer(peer.ID())
continue
}
- if err := p.SendStatus(m.chain.BestBlockHeader(), m.chain.LastIrreversibleHeader()); err != nil {
+ if err := p.(m.chain.BestBlockHeader(), m.chain.LastIrreversibleHeader()); err != nil {
+ log.WithFields(log.Fields{
+ "peer": p,
+ "err": err,
+ }).Error("SendStatus")
peers.RemovePeer(p.ID())
}
}
for _, peerInfo := range peers.GetPeerInfos() {
- if peerInfo.Height > bestHeight {
- bestHeight = peerInfo.Height
+ if peerInfo.Height > m.bestHeightSeen {
+ m.bestHeightSeen = peerInfo.Height
}
-
- m.savePeerInfo(peerInfo)
}
- log.Info("bestHeight: ", bestHeight)
-
- // TODO:
- // msg := struct{ msgs.BlockchainMessage }{&msgs.GetBlockMessage{Height: bestHeight + 1}}
- // for _, peer := range m.sw.GetPeers().List() {
- // peers.SendMsg(peer.ID(), msgs.BlockchainChannel, msg)
- // }
+ log.Info("bestHeight: ", m.bestHeightSeen)
+ m.processPeerInfos(peers.GetPeerInfos())
for _, peer := range m.sw.GetPeers().List() {
p := peers.GetPeer(peer.ID())
peers.RemovePeer(p.ID())
}
-
- m.setDisonnected()
log.Info("Disonnect all peers.")
- }
-}
-
-func (m *monitor) isConnected() bool {
- atomic.LoadUint32(&m.connected)
- return m.connected == uint32(1)
-}
-func (m *monitor) setConnected() {
- atomic.StoreUint32(&m.connected, 1)
-}
-
-func (m *monitor) setDisonnected() {
- atomic.StoreUint32(&m.connected, 0)
+ m.Unlock()
+ m.dialCh <- struct{}{}
+ }
}