OSDN Git Service

clean
[bytom/vapor.git] / toolbar / precog / monitor / monitor.go
index 2794b88..8b99b7f 100644 (file)
@@ -2,8 +2,12 @@ package monitor
 
 import (
        // "encoding/binary"
-       "io/ioutil"
+       // "encoding/hex"
+       // "io/ioutil"
+       "fmt"
+       "net"
        "os"
+       // "strings"
        "time"
 
        "github.com/jinzhu/gorm"
@@ -11,6 +15,7 @@ import (
        // dbm "github.com/vapor/database/leveldb"
 
        vaporCfg "github.com/vapor/config"
+       "github.com/vapor/crypto/ed25519/chainkd"
        "github.com/vapor/p2p"
        // conn "github.com/vapor/p2p/connection"
        // "github.com/vapor/consensus"
@@ -22,102 +27,169 @@ import (
        "github.com/vapor/toolbar/precog/database/orm"
 )
 
-const vaporNetID = 10817814959495988245
+var (
+       nodesToDiscv = 150
+       discvFreqSec = 1
+)
 
 type monitor struct {
        cfg     *config.Config
        db      *gorm.DB
        nodeCfg *vaporCfg.Config
+       sw      *p2p.Switch
+       discvCh chan *dht.Node
+       privKey chainkd.XPrv
 }
 
 func NewMonitor(cfg *config.Config, db *gorm.DB) *monitor {
-       dirPath, err := ioutil.TempDir(".", "")
-       if err != nil {
-               log.Fatal(err)
-       }
-
        nodeCfg := &vaporCfg.Config{
                BaseConfig: vaporCfg.DefaultBaseConfig(),
                P2P:        vaporCfg.DefaultP2PConfig(),
                Federation: vaporCfg.DefaultFederationConfig(),
        }
-       nodeCfg.DBPath = dirPath
+       nodeCfg.DBPath = "vapor_precog_data"
+       nodeCfg.ChainID = "mainnet"
+       discvCh := make(chan *dht.Node)
+       privKey, err := signlib.NewPrivKey()
+       if err != nil {
+               log.Fatal(err)
+       }
 
        return &monitor{
                cfg:     cfg,
                db:      db,
                nodeCfg: nodeCfg,
+               discvCh: discvCh,
+               privKey: privKey.(chainkd.XPrv),
        }
 }
 
 func (m *monitor) Run() {
        defer os.RemoveAll(m.nodeCfg.DBPath)
 
-       m.updateBootstrapNodes()
-       go m.discovery()
-       ticker := time.NewTicker(time.Duration(m.cfg.CheckFreqSeconds) * time.Second)
-       for ; true; <-ticker.C {
-               // TODO: lock?
-               m.monitorRountine()
-       }
-}
-
-// create or update: https://github.com/jinzhu/gorm/issues/1307
-func (m *monitor) updateBootstrapNodes() {
        for _, node := range m.cfg.Nodes {
-               ormNode := &orm.Node{
-                       PublicKey: node.PublicKey.String(),
-                       Alias:     node.Alias,
-                       Host:      node.Host,
-                       Port:      node.Port,
-               }
-
-               if err := m.db.Where(&orm.Node{PublicKey: ormNode.PublicKey}).
-                       Assign(&orm.Node{
-                               Alias: node.Alias,
-                               Host:  node.Host,
-                               Port:  node.Port,
-                       }).FirstOrCreate(ormNode).Error; err != nil {
-                       log.Error(err)
-                       continue
-               }
+               m.upSertNode(&node)
        }
-}
 
-// TODO:
-// implement logic first, and then refactor
-// /home/gavin/work/go/src/github.com/vapor/
-// p2p/test_util.go
-// p2p/switch_test.go
-func (m *monitor) discovery() {
        sw, err := m.makeSwitch()
        if err != nil {
                log.Fatal(err)
        }
 
-       sw.Start()
+       m.sw = sw
+
+       go m.discovery()
+       go m.collectDiscv()
+       go m.dialRoutine()
 }
 
-func (m *monitor) makeSwitch() (*p2p.Switch, error) {
-       swPrivKey, err := signlib.NewPrivKey()
-       if err != nil {
-               return nil, err
+// create or update: https://github.com/jinzhu/gorm/issues/1307
+func (m *monitor) upSertNode(node *config.Node) error {
+       if node.XPub != nil {
+               node.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String())
+       }
+
+       ormNode := &orm.Node{PublicKey: node.PublicKey}
+       if err := m.db.Where(&orm.Node{PublicKey: node.PublicKey}).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
+               return err
        }
 
+       if node.Alias != "" {
+               ormNode.Alias = node.Alias
+       }
+       if node.XPub != nil {
+               ormNode.Xpub = node.XPub.String()
+       }
+       ormNode.Host = node.Host
+       ormNode.Port = node.Port
+       return m.db.Where(&orm.Node{PublicKey: ormNode.PublicKey}).
+               Assign(&orm.Node{
+                       Xpub:  ormNode.Xpub,
+                       Alias: ormNode.Alias,
+                       Host:  ormNode.Host,
+                       Port:  ormNode.Port,
+               }).FirstOrCreate(ormNode).Error
+}
+
+func (m *monitor) makeSwitch() (*p2p.Switch, error) {
        l, listenAddr := p2p.GetListener(m.nodeCfg.P2P)
-       discv, err := dht.NewDiscover(m.nodeCfg, swPrivKey, l.ExternalAddress().Port, vaporNetID)
+       discv, err := dht.NewDiscover(m.nodeCfg, m.privKey, l.ExternalAddress().Port, m.cfg.NetworkID)
        if err != nil {
                return nil, err
        }
 
+       // no need for lanDiscv, but passing &mdns.LANDiscover{} will cause NilPointer
        lanDiscv := mdns.NewLANDiscover(mdns.NewProtocol(), int(l.ExternalAddress().Port))
-       return p2p.NewSwitch(m.nodeCfg, discv, lanDiscv, l, swPrivKey, listenAddr, vaporNetID)
+       return p2p.NewSwitch(m.nodeCfg, discv, lanDiscv, l, m.privKey, listenAddr, m.cfg.NetworkID)
 }
 
-func (m *monitor) monitorRountine() error {
-       // TODO: dail nodes, get lantency & best_height
-       // TODO: decide check_height("best best_height" - "confirmations")
-       // TODO: get blockhash by check_height, get latency
-       // TODO: update lantency, active_time and status
-       return nil
+func (m *monitor) discovery() {
+       ticker := time.NewTicker(time.Duration(discvFreqSec) * time.Second)
+       for range ticker.C {
+               nodes := make([]*dht.Node, nodesToDiscv)
+               n := m.sw.GetDiscv().ReadRandomNodes(nodes)
+               for i := 0; i < n; i++ {
+                       m.discvCh <- nodes[i]
+               }
+       }
 }
+
+func (m *monitor) collectDiscv() {
+       // nodeMap maps a node's public key to the node itself
+       nodeMap := make(map[string]*dht.Node)
+       for node := range m.discvCh {
+               if n, ok := nodeMap[node.ID.String()]; ok && n.String() == node.String() {
+                       continue
+               }
+               log.Info("discover new node: ", node)
+
+               m.upSertNode(&config.Node{
+                       PublicKey: node.ID.String(),
+                       Host:      node.IP.String(),
+                       Port:      node.TCP,
+               })
+               nodeMap[node.ID.String()] = node
+       }
+}
+
+func (m *monitor) dialRoutine() {
+       // TODO: rm hardcode
+       m.cfg.CheckFreqSeconds = 60
+       ticker := time.NewTicker(time.Duration(m.cfg.CheckFreqSeconds) * time.Second)
+       for ; true; <-ticker.C {
+               m.dialNodes()
+       }
+}
+
+func (m *monitor) dialNodes() {
+       var nodes []*orm.Node
+       if err := m.db.Model(&orm.Node{}).Find(&nodes).Error; err != nil {
+               log.Error(err)
+               return
+       }
+
+       addresses := make([]*p2p.NetAddress, 0)
+       for i := 0; i < len(nodes); i++ {
+               ip, err := net.LookupIP(nodes[i].Host)
+               if err != nil || len(ip) == 0 {
+                       continue
+               }
+
+               address := p2p.NewNetAddressIPPort(ip[0], nodes[i].Port)
+               addresses = append(addresses, address)
+       }
+
+       m.sw.DialPeers(addresses)
+}
+
+// TODO:
+// implement logic first, and then refactor
+// /home/gavin/work/go/src/github.com/vapor/
+// p2p/test_util.go
+// p2p/switch_test.go
+// syncManager
+// notificationMgr
+// TODO: dail nodes, get lantency & best_height
+// TODO: decide check_height("best best_height" - "confirmations")
+// TODO: get blockhash by check_height, get latency
+// TODO: update lantency, active_time and status