OSDN Git Service

clean
[bytom/vapor.git] / toolbar / precog / monitor / monitor.go
1 package monitor
2
3 import (
4         // "encoding/binary"
5         // "encoding/hex"
6         // "io/ioutil"
7         "fmt"
8         "os"
9         // "strings"
10         "time"
11
12         "github.com/jinzhu/gorm"
13         log "github.com/sirupsen/logrus"
14         // dbm "github.com/vapor/database/leveldb"
15
16         vaporCfg "github.com/vapor/config"
17         // "github.com/vapor/crypto/ed25519/chainkd"
18         "github.com/vapor/p2p"
19         // conn "github.com/vapor/p2p/connection"
20         // "github.com/vapor/consensus"
21         // "github.com/vapor/crypto/sha3pool"
22         "github.com/vapor/p2p/discover/dht"
23         // "github.com/vapor/p2p/discover/mdns"
24         "github.com/vapor/p2p/signlib"
25         "github.com/vapor/toolbar/precog/config"
26         "github.com/vapor/toolbar/precog/database/orm"
27 )
28
29 var (
30         nodesToDiscv = 150
31         discvFreqSec = 1
32 )
33
34 type monitor struct {
35         cfg     *config.Config
36         db      *gorm.DB
37         nodeCfg *vaporCfg.Config
38         discvCh chan *dht.Node
39 }
40
41 func NewMonitor(cfg *config.Config, db *gorm.DB) *monitor {
42         nodeCfg := &vaporCfg.Config{
43                 BaseConfig: vaporCfg.DefaultBaseConfig(),
44                 P2P:        vaporCfg.DefaultP2PConfig(),
45                 Federation: vaporCfg.DefaultFederationConfig(),
46         }
47         nodeCfg.DBPath = "vapor_precog_data"
48         nodeCfg.ChainID = "mainnet"
49         discvCh := make(chan *dht.Node)
50
51         return &monitor{
52                 cfg:     cfg,
53                 db:      db,
54                 nodeCfg: nodeCfg,
55                 discvCh: discvCh,
56         }
57 }
58
59 func (m *monitor) Run() {
60         defer os.RemoveAll(m.nodeCfg.DBPath)
61
62         for _, node := range m.cfg.Nodes {
63                 m.upSertNode(&node)
64         }
65
66         go m.discovery()
67         go m.collectDiscv()
68
69         ticker := time.NewTicker(time.Duration(m.cfg.CheckFreqSeconds) * time.Second)
70         for ; true; <-ticker.C {
71                 // TODO: lock?
72                 m.monitorRountine()
73         }
74 }
75
76 // create or update: https://github.com/jinzhu/gorm/issues/1307
77 func (m *monitor) upSertNode(node *config.Node) error {
78         if node.XPub != nil {
79                 node.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String())
80         }
81
82         ormNode := &orm.Node{PublicKey: node.PublicKey}
83         if err := m.db.Where(&orm.Node{PublicKey: node.PublicKey}).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
84                 return err
85         }
86
87         if node.Alias != "" {
88                 ormNode.Alias = node.Alias
89         }
90         if node.XPub != nil {
91                 ormNode.Xpub = node.XPub.String()
92         }
93         ormNode.Host = node.Host
94         ormNode.Port = node.Port
95         return m.db.Where(&orm.Node{PublicKey: ormNode.PublicKey}).
96                 Assign(&orm.Node{
97                         Xpub:  ormNode.Xpub,
98                         Alias: ormNode.Alias,
99                         Host:  ormNode.Host,
100                         Port:  ormNode.Port,
101                 }).FirstOrCreate(ormNode).Error
102 }
103
104 func (m *monitor) discovery() {
105         swPrivKey, err := signlib.NewPrivKey()
106         if err != nil {
107                 log.Fatal(err)
108         }
109
110         l, _ := p2p.GetListener(m.nodeCfg.P2P)
111         discv, err := dht.NewDiscover(m.nodeCfg, swPrivKey, l.ExternalAddress().Port, m.cfg.NetworkID)
112         if err != nil {
113                 log.Fatal(err)
114         }
115
116         ticker := time.NewTicker(time.Duration(discvFreqSec) * time.Second)
117         for range ticker.C {
118                 nodes := make([]*dht.Node, nodesToDiscv)
119                 n := discv.ReadRandomNodes(nodes)
120                 for i := 0; i < n; i++ {
121                         m.discvCh <- nodes[i]
122                 }
123         }
124 }
125
126 // whatz the pubKey?
127 func (m *monitor) collectDiscv() {
128         // nodeMap maps a node's public key to the node itself
129         nodeMap := make(map[string]*dht.Node)
130         for node := range m.discvCh {
131                 if n, ok := nodeMap[node.ID.String()]; ok && n.String() == node.String() {
132                         continue
133                 }
134                 log.Info("discover new node: ", node)
135
136                 m.upSertNode(&config.Node{
137                         PublicKey: node.ID.String(),
138                         Host:      node.IP.String(),
139                         Port:      node.TCP,
140                 })
141                 nodeMap[node.ID.String()] = node
142         }
143 }
144
145 func (m *monitor) monitorRountine() error {
146         // TODO: dail nodes, get lantency & best_height
147         // TODO: decide check_height("best best_height" - "confirmations")
148         // TODO: get blockhash by check_height, get latency
149         // TODO: update lantency, active_time and status
150         return nil
151 }
152
153 // TODO:
154 // implement logic first, and then refactor
155 // /home/gavin/work/go/src/github.com/vapor/
156 // p2p/test_util.go
157 // p2p/switch_test.go
158 // syncManager
159 // notificationMgr
160 /*
161 func (m *monitor) discovery() {
162         sw, err := m.makeSwitch()
163         if err != nil {
164                 log.Fatal(err)
165         }
166
167         sw.Start()
168 }
169 */
170
171 /*
172 func (m *monitor) makeSwitch() (*p2p.Switch, error) {
173         swPrivKey, err := signlib.NewPrivKey()
174         if err != nil {
175                 return nil, err
176         }
177
178         l, listenAddr := p2p.GetListener(m.nodeCfg.P2P)
179         discv, err := dht.NewDiscover(m.nodeCfg, swPrivKey, l.ExternalAddress().Port, m.cfg.NetworkID)
180         if err != nil {
181                 return nil, err
182         }
183
184         // no need for lanDiscv, but passing &mdns.LANDiscover{} will cause NilPointer
185         lanDiscv := mdns.NewLANDiscover(mdns.NewProtocol(), int(l.ExternalAddress().Port))
186         return p2p.NewSwitch(m.nodeCfg, discv, lanDiscv, l, swPrivKey, listenAddr, m.cfg.NetworkID)
187 }
188 */