OSDN Git Service

fk
[bytom/vapor.git] / toolbar / precog / monitor / monitor.go
1 package monitor
2
3 import (
4         // "encoding/binary"
5         // "encoding/hex"
6         "fmt"
7         "io/ioutil"
8         "net"
9         "os"
10         // "os/user"
11         "strings"
12         "time"
13
14         "github.com/jinzhu/gorm"
15         log "github.com/sirupsen/logrus"
16         // dbm "github.com/vapor/database/leveldb"
17
18         vaporCfg "github.com/vapor/config"
19         "github.com/vapor/consensus"
20         "github.com/vapor/crypto/ed25519/chainkd"
21         dbm "github.com/vapor/database/leveldb"
22         "github.com/vapor/event"
23         "github.com/vapor/p2p"
24         "github.com/vapor/protocol/bc/types"
25         // conn "github.com/vapor/p2p/connection"
26         "github.com/vapor/netsync/peers"
27         // "github.com/vapor/consensus"
28         // "github.com/vapor/crypto/sha3pool"
29         "github.com/vapor/netsync/chainmgr"
30         "github.com/vapor/netsync/consensusmgr"
31         "github.com/vapor/p2p/discover/dht"
32         "github.com/vapor/p2p/discover/mdns"
33         "github.com/vapor/p2p/signlib"
34         "github.com/vapor/test/mock"
35         "github.com/vapor/toolbar/precog/config"
36         "github.com/vapor/toolbar/precog/database/orm"
37 )
38
39 var (
40         nodesToDiscv = 150
41         discvFreqSec = 60
42 )
43
44 type monitor struct {
45         cfg     *config.Config
46         db      *gorm.DB
47         nodeCfg *vaporCfg.Config
48         sw      *p2p.Switch
49         discvCh chan *dht.Node
50         privKey chainkd.XPrv
51 }
52
53 // TODO: set SF myself?
54 func NewMonitor(cfg *config.Config, db *gorm.DB) *monitor {
55         //TODO: for test
56         cfg.CheckFreqSeconds = 1
57
58         tmpDir, err := ioutil.TempDir(".", "vpPrecog")
59         if err != nil {
60                 log.Fatalf("failed to create temporary data folder: %v", err)
61         }
62
63         nodeCfg := &vaporCfg.Config{
64                 BaseConfig: vaporCfg.DefaultBaseConfig(),
65                 P2P:        vaporCfg.DefaultP2PConfig(),
66                 Federation: vaporCfg.DefaultFederationConfig(),
67         }
68         nodeCfg.DBPath = tmpDir
69         nodeCfg.ChainID = "mainnet"
70         discvCh := make(chan *dht.Node)
71         privKey, err := signlib.NewPrivKey()
72         if err != nil {
73                 log.Fatal(err)
74         }
75
76         return &monitor{
77                 cfg:     cfg,
78                 db:      db,
79                 nodeCfg: nodeCfg,
80                 discvCh: discvCh,
81                 privKey: privKey.(chainkd.XPrv),
82         }
83 }
84
85 func (m *monitor) Run() {
86         defer os.RemoveAll(m.nodeCfg.DBPath)
87
88         var seeds []string
89         for _, node := range m.cfg.Nodes {
90                 seeds = append(seeds, fmt.Sprintf("%s:%d", node.Host, node.Port))
91                 if err := m.upSertNode(&node); err != nil {
92                         log.Error(err)
93                 }
94         }
95         m.nodeCfg.P2P.Seeds = strings.Join(seeds, ",")
96         if err := m.makeSwitch(); err != nil {
97                 log.Fatal(err)
98         }
99
100         go m.discoveryRoutine()
101         go m.collectDiscoveredNodes()
102         go m.connectNodesRoutine()
103         go m.checkStatusRoutine()
104 }
105
106 // create or update: https://github.com/jinzhu/gorm/issues/1307
107 func (m *monitor) upSertNode(node *config.Node) error {
108         if node.XPub != nil {
109                 node.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String())
110         }
111
112         ormNode := &orm.Node{PublicKey: node.PublicKey}
113         if err := m.db.Where(&orm.Node{PublicKey: node.PublicKey}).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
114                 return err
115         }
116
117         if node.Alias != "" {
118                 ormNode.Alias = node.Alias
119         }
120         if node.XPub != nil {
121                 ormNode.Xpub = node.XPub.String()
122         }
123         ormNode.Host = node.Host
124         ormNode.Port = node.Port
125         return m.db.Where(&orm.Node{PublicKey: ormNode.PublicKey}).
126                 Assign(&orm.Node{
127                         Xpub:  ormNode.Xpub,
128                         Alias: ormNode.Alias,
129                         Host:  ormNode.Host,
130                         Port:  ormNode.Port,
131                 }).FirstOrCreate(ormNode).Error
132 }
133
134 func (m *monitor) makeSwitch() error {
135         l, listenAddr := p2p.GetListener(m.nodeCfg.P2P)
136         discv, err := dht.NewDiscover(m.nodeCfg, m.privKey, l.ExternalAddress().Port, m.cfg.NetworkID)
137         if err != nil {
138                 return err
139         }
140
141         // no need for lanDiscv, but passing &mdns.LANDiscover{} will cause NilPointer
142         lanDiscv := mdns.NewLANDiscover(mdns.NewProtocol(), int(l.ExternalAddress().Port))
143         sw, err := p2p.NewSwitch(m.nodeCfg, discv, lanDiscv, l, m.privKey, listenAddr, m.cfg.NetworkID)
144         if err != nil {
145                 return err
146         }
147
148         m.sw = sw
149         return nil
150 }
151
152 func (m *monitor) discoveryRoutine() {
153         ticker := time.NewTicker(time.Duration(discvFreqSec) * time.Second)
154         for range ticker.C {
155                 nodes := make([]*dht.Node, nodesToDiscv)
156                 n := m.sw.GetDiscv().ReadRandomNodes(nodes)
157                 for i := 0; i < n; i++ {
158                         m.discvCh <- nodes[i]
159                 }
160         }
161 }
162
163 func (m *monitor) collectDiscoveredNodes() {
164         // nodeMap maps a node's public key to the node itself
165         nodeMap := make(map[string]*dht.Node)
166         for node := range m.discvCh {
167                 if n, ok := nodeMap[node.ID.String()]; ok && n.String() == node.String() {
168                         continue
169                 }
170                 log.Info("discover new node: ", node)
171
172                 if err := m.upSertNode(&config.Node{
173                         PublicKey: node.ID.String(),
174                         Host:      node.IP.String(),
175                         Port:      node.TCP,
176                 }); err != nil {
177                         log.Error(err)
178                 }
179
180                 nodeMap[node.ID.String()] = node
181         }
182 }
183
184 func (m *monitor) connectNodesRoutine() {
185         // TODO: change name?
186         ticker := time.NewTicker(time.Duration(m.cfg.CheckFreqSeconds) * time.Second)
187         for ; true; <-ticker.C {
188                 if err := m.dialNodes(); err != nil {
189                         log.Error(err)
190                 }
191         }
192 }
193
194 func (m *monitor) dialNodes() error {
195         var nodes []*orm.Node
196         if err := m.db.Model(&orm.Node{}).Find(&nodes).Error; err != nil {
197                 return err
198         }
199
200         addresses := make([]*p2p.NetAddress, 0)
201         for i := 0; i < len(nodes); i++ {
202                 ips, err := net.LookupIP(nodes[i].Host)
203                 if err != nil {
204                         log.Error(err)
205                         continue
206                 }
207                 if len(ips) == 0 {
208                         log.Errorf("fail to look up ip for %s", nodes[i].Host)
209                         continue
210                 }
211
212                 address := p2p.NewNetAddressIPPort(ips[0], nodes[i].Port)
213                 addresses = append(addresses, address)
214         }
215
216         m.sw.DialPeers(addresses)
217         return nil
218 }
219
220 func (m *monitor) getGenesisBlock() (*types.Block, error) {
221         genesisBlock := &types.Block{}
222         if err := genesisBlock.UnmarshalText([]byte("030100000000000000000000000000000000000000000000000000000000000000000082bfe3f4bf2d4052415e796436f587fac94677b20f027e910b70e2c220c411c0e87c37e0e1cc2ec9c377e5192668bc0a367e4a4764f11e7c725ecced1d7b6a492974fab1b6d5bc01000107010001012402220020f86826d640810eb08a2bfb706e0092273e05e9a7d3d71f9d53f4f6cc2e3d6c6a0001013b0039ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff00011600148c9d063ff74ee6d9ffa88d83aeb038068366c4c400")); err != nil {
223                 return nil, err
224         }
225
226         return genesisBlock, nil
227 }
228
229 func (m *monitor) checkStatusRoutine() {
230         txPool := &mock.Mempool{}
231         mockChain := mock.NewChain(txPool)
232         genesisBlock, err := m.getGenesisBlock()
233         if err != nil {
234                 log.Fatal(err)
235         }
236
237         mockChain.SetBlockByHeight(genesisBlock.BlockHeader.Height, genesisBlock)
238         mockChain.SetBestBlockHeader(&genesisBlock.BlockHeader)
239         dispatcher := event.NewDispatcher()
240         peers := peers.NewPeerSet(m.sw)
241         // add ConsensusReactor for consensusChannel
242         _ = consensusmgr.NewManager(m.sw, mockChain, peers, dispatcher)
243         fastSyncDB := dbm.NewDB("fastsync", m.nodeCfg.DBBackend, m.nodeCfg.DBDir())
244         // add ProtocolReactor to handle msgs
245         _, err = chainmgr.NewManager(m.nodeCfg, m.sw, mockChain, txPool, dispatcher, peers, fastSyncDB)
246         if err != nil {
247                 log.Fatal(err)
248         }
249
250         // TODO: clean up?? only start reactors??
251         m.sw.Start()
252
253         // for k, v := range m.sw.GetReactors() {
254         //      log.Debug("start", k, ",", v)
255         //      v.Start()
256         // }
257
258         ticker := time.NewTicker(time.Duration(m.cfg.CheckFreqSeconds) * time.Second)
259         for ; true; <-ticker.C {
260                 for _, v := range m.sw.GetReactors() {
261                         for _, peer := range m.sw.GetPeers().List() {
262                                 log.Debug("AddPeer for", v, peer)
263                                 // TODO: if not in sw
264                                 v.AddPeer(peer)
265                         }
266                 }
267
268                 // TODO: SFSPV?
269                 log.Debug("best", peers.BestPeer(consensus.SFFullNode))
270                 for _, peerInfo := range peers.GetPeerInfos() {
271                         log.Info(peerInfo)
272                 }
273         }
274 }
275
276 // TODO:
277 // implement logic first, and then refactor
278 // /home/gavin/work/go/src/github.com/vapor/
279 // p2p/test_util.go
280 // p2p/switch_test.go
281 // syncManager
282
283 // TODO: dial nodes
284 // TODO: get lantency
285 // TODO: get best_height
286 // TODO: decide check_height("best best_height" - "confirmations")
287 // TODO: get blockhash by check_height, get latency
288 // TODO: update lantency, active_time and status