X-Git-Url: http://git.osdn.net/view?a=blobdiff_plain;f=toolbar%2Fprecog%2Fmonitor%2Fmonitor.go;h=ccc24c21c2e531f16426971f4677c64e26b7f21d;hb=66c1b5ac5b0c265df41aaf1cfb7dee427c24e040;hp=d61b443dd5ecff9690941e897ed68a629fa6b2f3;hpb=cc1cd4fe3c9f76383557d54027abc29d95cd2173;p=bytom%2Fvapor.git diff --git a/toolbar/precog/monitor/monitor.go b/toolbar/precog/monitor/monitor.go index d61b443d..ccc24c21 100644 --- a/toolbar/precog/monitor/monitor.go +++ b/toolbar/precog/monitor/monitor.go @@ -1,56 +1,50 @@ package monitor import ( - // "encoding/binary" - // "encoding/hex" "fmt" - "io/ioutil" - "net" "os" - // "os/user" + "os/user" "strings" - "time" + "sync" "github.com/jinzhu/gorm" log "github.com/sirupsen/logrus" - // dbm "github.com/vapor/database/leveldb" vaporCfg "github.com/vapor/config" "github.com/vapor/crypto/ed25519/chainkd" dbm "github.com/vapor/database/leveldb" + "github.com/vapor/errors" "github.com/vapor/event" - "github.com/vapor/p2p" - // conn "github.com/vapor/p2p/connection" "github.com/vapor/netsync/chainmgr" "github.com/vapor/netsync/consensusmgr" "github.com/vapor/netsync/peers" + "github.com/vapor/p2p" "github.com/vapor/p2p/discover/dht" "github.com/vapor/p2p/discover/mdns" "github.com/vapor/p2p/signlib" "github.com/vapor/test/mock" "github.com/vapor/toolbar/precog/config" - "github.com/vapor/toolbar/precog/database/orm" ) type monitor struct { + *sync.RWMutex cfg *config.Config db *gorm.DB nodeCfg *vaporCfg.Config sw *p2p.Switch - discvCh chan *dht.Node privKey chainkd.XPrv chain *mock.Chain txPool *mock.Mempool + // discvMap maps a node's public key to the node itself + discvMap map[string]*dht.Node + bestHeightSeen uint64 + peers *peers.PeerSet } -// TODO: set myself as SPV? func NewMonitor(cfg *config.Config, db *gorm.DB) *monitor { - //TODO: for test - cfg.CheckFreqSeconds = 1 - - tmpDir, err := ioutil.TempDir(".", "vpPrecog") + dbPath, err := makePath() if err != nil { - log.Fatalf("failed to create temporary data folder: %v", err) + log.WithFields(log.Fields{"err": err}).Fatal("makePath") } nodeCfg := &vaporCfg.Config{ @@ -58,49 +52,60 @@ func NewMonitor(cfg *config.Config, db *gorm.DB) *monitor { P2P: vaporCfg.DefaultP2PConfig(), Federation: vaporCfg.DefaultFederationConfig(), } - nodeCfg.DBPath = tmpDir + nodeCfg.DBPath = dbPath nodeCfg.ChainID = "mainnet" - discvCh := make(chan *dht.Node) privKey, err := signlib.NewPrivKey() if err != nil { - log.Fatal(err) + log.WithFields(log.Fields{"err": err}).Fatal("NewPrivKey") } chain, txPool, err := mockChainAndPool() if err != nil { - log.Fatal(err) + log.WithFields(log.Fields{"err": err}).Fatal("mockChainAndPool") } return &monitor{ - cfg: cfg, - db: db, - nodeCfg: nodeCfg, - discvCh: discvCh, - privKey: privKey.(chainkd.XPrv), - chain: chain, - txPool: txPool, + RWMutex: &sync.RWMutex{}, + cfg: cfg, + db: db, + nodeCfg: nodeCfg, + privKey: privKey.(chainkd.XPrv), + chain: chain, + txPool: txPool, + discvMap: make(map[string]*dht.Node), + bestHeightSeen: uint64(0), } } -func (m *monitor) Run() { - defer os.RemoveAll(m.nodeCfg.DBPath) +func makePath() (string, error) { + usr, err := user.Current() + if err != nil { + return "", err + } + + dataPath := usr.HomeDir + "/.vapor/precog" + if err := os.MkdirAll(dataPath, os.ModePerm); err != nil { + return "", err + } + + return dataPath, nil +} +func (m *monitor) Run() { var seeds []string for _, node := range m.cfg.Nodes { - seeds = append(seeds, fmt.Sprintf("%s:%d", node.Host, node.Port)) + seeds = append(seeds, fmt.Sprintf("%s:%d", node.IP, node.Port)) if err := m.upSertNode(&node); err != nil { - log.Error(err) + log.WithFields(log.Fields{"node": node, "err": err}).Error("upSertNode") } } m.nodeCfg.P2P.Seeds = strings.Join(seeds, ",") if err := m.makeSwitch(); err != nil { - log.Fatal(err) + log.WithFields(log.Fields{"err": err}).Fatal("makeSwitch") } go m.discoveryRoutine() - go m.collectDiscoveredNodes() - go m.connectNodesRoutine() - go m.checkStatusRoutine() + go m.connectionRoutine() } func (m *monitor) makeSwitch() error { @@ -112,48 +117,16 @@ func (m *monitor) makeSwitch() error { // no need for lanDiscv, but passing &mdns.LANDiscover{} will cause NilPointer lanDiscv := mdns.NewLANDiscover(mdns.NewProtocol(), int(l.ExternalAddress().Port)) - sw, err := p2p.NewSwitch(m.nodeCfg, discv, lanDiscv, l, m.privKey, listenAddr, m.cfg.NetworkID) + m.sw, err = p2p.NewSwitch(m.nodeCfg, discv, lanDiscv, l, m.privKey, listenAddr, m.cfg.NetworkID) if err != nil { return err } - m.sw = sw - return nil -} - -func (m *monitor) connectNodesRoutine() { - // TODO: change name? - ticker := time.NewTicker(time.Duration(m.cfg.CheckFreqSeconds) * time.Second) - for ; true; <-ticker.C { - if err := m.dialNodes(); err != nil { - log.Error(err) - } - } -} - -func (m *monitor) dialNodes() error { - var nodes []*orm.Node - if err := m.db.Model(&orm.Node{}).Find(&nodes).Error; err != nil { - return err + m.peers = peers.NewPeerSet(m.sw) + if err := m.prepareReactors(m.peers); err != nil { + return errors.Wrap(err, "prepareReactors") } - addresses := make([]*p2p.NetAddress, 0) - for i := 0; i < len(nodes); i++ { - ips, err := net.LookupIP(nodes[i].Host) - if err != nil { - log.Error(err) - continue - } - if len(ips) == 0 { - log.Errorf("fail to look up ip for %s", nodes[i].Host) - continue - } - - address := p2p.NewNetAddressIPPort(ips[0], nodes[i].Port) - addresses = append(addresses, address) - } - - m.sw.DialPeers(addresses) return nil } @@ -163,60 +136,18 @@ func (m *monitor) prepareReactors(peers *peers.PeerSet) error { _ = consensusmgr.NewManager(m.sw, m.chain, peers, dispatcher) fastSyncDB := dbm.NewDB("fastsync", m.nodeCfg.DBBackend, m.nodeCfg.DBDir()) // add ProtocolReactor to handle msgs - _, err := chainmgr.NewManager(m.nodeCfg, m.sw, m.chain, m.txPool, dispatcher, peers, fastSyncDB) - if err != nil { + if _, err := chainmgr.NewManager(m.nodeCfg, m.sw, m.chain, m.txPool, dispatcher, peers, fastSyncDB); err != nil { return err } - // TODO: clean up?? only start reactors?? - m.sw.Start() - - // for label, reactor := range m.sw.GetReactors() { - // log.Debug("start reactor: (%s:%v)", label, reactor) - // if _, err := reactor.Start(); err != nil { - // return - // } - // } - - // m.sw.GetSecurity().RegisterFilter(m.sw.GetNodeInfo()) - // m.sw.GetSecurity().RegisterFilter(m.sw.GetPeers()) - // if err := m.sw.GetSecurity().Start(); err != nil { - // return - // } - - return nil -} - -func (m *monitor) checkStatusRoutine() { - peers := peers.NewPeerSet(m.sw) - if err := m.prepareReactors(peers); err != nil { - log.Fatal(err) - } - - ticker := time.NewTicker(time.Duration(m.cfg.CheckFreqSeconds) * time.Second) - for ; true; <-ticker.C { - for _, reactor := range m.sw.GetReactors() { - for _, peer := range m.sw.GetPeers().List() { - log.Debug("AddPeer %v for reactor %v", peer, reactor) - // TODO: if not in sw - reactor.AddPeer(peer) - } - } - - for _, peerInfo := range peers.GetPeerInfos() { - log.Info(peerInfo) + for label, reactor := range m.sw.GetReactors() { + log.WithFields(log.Fields{"label": label, "reactor": reactor}).Debug("start reactor") + if _, err := reactor.Start(); err != nil { + return nil } } -} -// TODO: -// implement logic first, and then refactor -// /home/gavin/work/go/src/github.com/vapor/ -// p2p/test_util.go -// p2p/switch_test.go - -// TODO: get lantency -// TODO: get best_height -// TODO: decide check_height("best best_height" - "confirmations") -// TODO: get blockhash by check_height, get latency -// TODO: update lantency, active_time and status + m.sw.GetSecurity().RegisterFilter(m.sw.GetNodeInfo()) + m.sw.GetSecurity().RegisterFilter(m.sw.GetPeers()) + return m.sw.GetSecurity().Start() +}