OSDN Git Service

fix
[bytom/vapor.git] / toolbar / precog / monitor / monitor.go
1 package monitor
2
3 import (
4         // "encoding/binary"
5         // "encoding/hex"
6         // "io/ioutil"
7         "fmt"
8         "net"
9         "os"
10         "strings"
11         "time"
12
13         "github.com/jinzhu/gorm"
14         log "github.com/sirupsen/logrus"
15         // dbm "github.com/vapor/database/leveldb"
16
17         vaporCfg "github.com/vapor/config"
18         "github.com/vapor/consensus"
19         "github.com/vapor/crypto/ed25519/chainkd"
20         "github.com/vapor/event"
21         "github.com/vapor/p2p"
22         // conn "github.com/vapor/p2p/connection"
23         "github.com/vapor/netsync/peers"
24         // "github.com/vapor/consensus"
25         // "github.com/vapor/crypto/sha3pool"
26         "github.com/vapor/netsync/consensusmgr"
27         "github.com/vapor/p2p/discover/dht"
28         "github.com/vapor/p2p/discover/mdns"
29         "github.com/vapor/p2p/signlib"
30         "github.com/vapor/toolbar/precog/config"
31         "github.com/vapor/toolbar/precog/database/orm"
32 )
33
34 var (
35         nodesToDiscv = 150
36         discvFreqSec = 60
37 )
38
39 type monitor struct {
40         cfg     *config.Config
41         db      *gorm.DB
42         nodeCfg *vaporCfg.Config
43         sw      *p2p.Switch
44         discvCh chan *dht.Node
45         privKey chainkd.XPrv
46 }
47
48 func NewMonitor(cfg *config.Config, db *gorm.DB) *monitor {
49         //TODO: for test
50         cfg.CheckFreqSeconds = 1
51
52         nodeCfg := &vaporCfg.Config{
53                 BaseConfig: vaporCfg.DefaultBaseConfig(),
54                 P2P:        vaporCfg.DefaultP2PConfig(),
55                 Federation: vaporCfg.DefaultFederationConfig(),
56         }
57         nodeCfg.DBPath = "vapor_precog_data"
58         nodeCfg.ChainID = "mainnet"
59         discvCh := make(chan *dht.Node)
60         privKey, err := signlib.NewPrivKey()
61         if err != nil {
62                 log.Fatal(err)
63         }
64
65         return &monitor{
66                 cfg:     cfg,
67                 db:      db,
68                 nodeCfg: nodeCfg,
69                 discvCh: discvCh,
70                 privKey: privKey.(chainkd.XPrv),
71         }
72 }
73
74 func (m *monitor) Run() {
75         defer os.RemoveAll(m.nodeCfg.DBPath)
76
77         var seeds []string
78         for _, node := range m.cfg.Nodes {
79                 seeds = append(seeds, fmt.Sprintf("%s:%d", node.Host, node.Port))
80                 if err := m.upSertNode(&node); err != nil {
81                         log.Error(err)
82                 }
83         }
84         m.nodeCfg.P2P.Seeds = strings.Join(seeds, ",")
85         if err := m.makeSwitch(); err != nil {
86                 log.Fatal(err)
87         }
88
89         go m.discoveryRoutine()
90         go m.collectDiscoveredNodes()
91         go m.connectNodesRoutine()
92         go m.checkStatusRoutine()
93 }
94
95 // create or update: https://github.com/jinzhu/gorm/issues/1307
96 func (m *monitor) upSertNode(node *config.Node) error {
97         if node.XPub != nil {
98                 node.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String())
99         }
100
101         ormNode := &orm.Node{PublicKey: node.PublicKey}
102         if err := m.db.Where(&orm.Node{PublicKey: node.PublicKey}).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
103                 return err
104         }
105
106         if node.Alias != "" {
107                 ormNode.Alias = node.Alias
108         }
109         if node.XPub != nil {
110                 ormNode.Xpub = node.XPub.String()
111         }
112         ormNode.Host = node.Host
113         ormNode.Port = node.Port
114         return m.db.Where(&orm.Node{PublicKey: ormNode.PublicKey}).
115                 Assign(&orm.Node{
116                         Xpub:  ormNode.Xpub,
117                         Alias: ormNode.Alias,
118                         Host:  ormNode.Host,
119                         Port:  ormNode.Port,
120                 }).FirstOrCreate(ormNode).Error
121 }
122
123 func (m *monitor) makeSwitch() error {
124         l, listenAddr := p2p.GetListener(m.nodeCfg.P2P)
125         discv, err := dht.NewDiscover(m.nodeCfg, m.privKey, l.ExternalAddress().Port, m.cfg.NetworkID)
126         if err != nil {
127                 return err
128         }
129
130         // no need for lanDiscv, but passing &mdns.LANDiscover{} will cause NilPointer
131         lanDiscv := mdns.NewLANDiscover(mdns.NewProtocol(), int(l.ExternalAddress().Port))
132         sw, err := p2p.NewSwitch(m.nodeCfg, discv, lanDiscv, l, m.privKey, listenAddr, m.cfg.NetworkID)
133         if err != nil {
134                 return err
135         }
136
137         m.sw = sw
138         return nil
139 }
140
141 func (m *monitor) discoveryRoutine() {
142         ticker := time.NewTicker(time.Duration(discvFreqSec) * time.Second)
143         for range ticker.C {
144                 nodes := make([]*dht.Node, nodesToDiscv)
145                 n := m.sw.GetDiscv().ReadRandomNodes(nodes)
146                 for i := 0; i < n; i++ {
147                         m.discvCh <- nodes[i]
148                 }
149         }
150 }
151
152 func (m *monitor) collectDiscoveredNodes() {
153         // nodeMap maps a node's public key to the node itself
154         nodeMap := make(map[string]*dht.Node)
155         for node := range m.discvCh {
156                 if n, ok := nodeMap[node.ID.String()]; ok && n.String() == node.String() {
157                         continue
158                 }
159                 log.Info("discover new node: ", node)
160
161                 if err := m.upSertNode(&config.Node{
162                         PublicKey: node.ID.String(),
163                         Host:      node.IP.String(),
164                         Port:      node.TCP,
165                 }); err != nil {
166                         log.Error(err)
167                 }
168
169                 nodeMap[node.ID.String()] = node
170         }
171 }
172
173 func (m *monitor) connectNodesRoutine() {
174         // TODO: change name?
175         ticker := time.NewTicker(time.Duration(m.cfg.CheckFreqSeconds) * time.Second)
176         for ; true; <-ticker.C {
177                 if err := m.dialNodes(); err != nil {
178                         log.Error(err)
179                 }
180         }
181 }
182
183 func (m *monitor) dialNodes() error {
184         var nodes []*orm.Node
185         if err := m.db.Model(&orm.Node{}).Find(&nodes).Error; err != nil {
186                 return err
187         }
188
189         addresses := make([]*p2p.NetAddress, 0)
190         for i := 0; i < len(nodes); i++ {
191                 ips, err := net.LookupIP(nodes[i].Host)
192                 if err != nil {
193                         log.Error(err)
194                         continue
195                 }
196                 if len(ips) == 0 {
197                         log.Errorf("fail to look up ip for %s", nodes[i].Host)
198                         continue
199                 }
200
201                 address := p2p.NewNetAddressIPPort(ips[0], nodes[i].Port)
202                 addresses = append(addresses, address)
203         }
204
205         m.sw.DialPeers(addresses)
206         return nil
207 }
208
209 func (m *monitor) checkStatusRoutine() {
210
211         peers := peers.NewPeerSet(m.sw)
212         dispatcher := event.NewDispatcher()
213         // consensusMgr := consensusmgr.NewManager(sw, chain, peers, dispatcher)
214         consensusMgr := consensusmgr.NewManager(m.sw, nil, peers, dispatcher)
215         consensusMgr.Start()
216
217         // TODO: change name?
218         ticker := time.NewTicker(time.Duration(m.cfg.CheckFreqSeconds) * time.Second)
219         for ; true; <-ticker.C {
220                 log.Debug("p2p.peer list", m.sw.GetPeers().List())
221                 // TODO: SFSPV?
222                 log.Info("best", peers.BestPeer(consensus.SFFullNode))
223         }
224 }
225
226 // TODO:
227 // implement logic first, and then refactor
228 // /home/gavin/work/go/src/github.com/vapor/
229 // p2p/test_util.go
230 // p2p/switch_test.go
231 // syncManager
232 // notificationMgr
233
234 // TODO: dial nodes, get lantency & best_height
235 // TODO: decide check_height("best best_height" - "confirmations")
236 // TODO: get blockhash by check_height, get latency
237 // TODO: update lantency, active_time and status