OSDN Git Service

72c2203bdde99d549033d83c5079d2c7dfde20fd
[bytom/vapor.git] / toolbar / precog / monitor / stats.go
1 package monitor
2
3 import (
4         "database/sql"
5         "fmt"
6         "time"
7
8         "github.com/jinzhu/gorm"
9         log "github.com/sirupsen/logrus"
10
11         "github.com/vapor/crypto/ed25519/chainkd"
12         "github.com/vapor/netsync/peers"
13         "github.com/vapor/p2p"
14         "github.com/vapor/toolbar/precog/common"
15         "github.com/vapor/toolbar/precog/config"
16         "github.com/vapor/toolbar/precog/database/orm"
17 )
18
19 // create or update: https://github.com/jinzhu/gorm/issues/1307
20 func (m *monitor) upSertNode(node *config.Node) error {
21         if node.XPub != nil {
22                 node.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String())
23         }
24
25         ormNode := &orm.Node{PublicKey: node.PublicKey}
26         if err := m.db.Where(&orm.Node{PublicKey: node.PublicKey}).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
27                 return err
28         }
29
30         if node.Alias != "" {
31                 ormNode.Alias = node.Alias
32         }
33         if node.XPub != nil {
34                 ormNode.Xpub = node.XPub.String()
35         }
36         ormNode.Host = node.Host
37         ormNode.Port = node.Port
38         return m.db.Where(&orm.Node{PublicKey: ormNode.PublicKey}).
39                 Assign(&orm.Node{
40                         Xpub:  ormNode.Xpub,
41                         Alias: ormNode.Alias,
42                         Host:  ormNode.Host,
43                         Port:  ormNode.Port,
44                 }).FirstOrCreate(ormNode).Error
45 }
46
47 func (m *monitor) processDialResults() error {
48         var ormNodes []*orm.Node
49         if err := m.db.Model(&orm.Node{}).Find(&ormNodes).Error; err != nil {
50                 return err
51         }
52
53         publicKeyMap := make(map[string]*orm.Node, len(ormNodes))
54         for _, ormNode := range ormNodes {
55                 publicKeyMap[ormNode.PublicKey] = ormNode
56         }
57
58         connMap := make(map[string]bool, len(ormNodes))
59         // connected peers
60         for _, peer := range m.sw.GetPeers().List() {
61                 xPub := &chainkd.XPub{}
62                 if err := xPub.UnmarshalText([]byte(peer.Key)); err != nil {
63                         log.Error(err)
64                         continue
65                 }
66
67                 publicKey := xPub.PublicKey().String()
68                 connMap[publicKey] = true
69                 if err := m.processConnectedPeer(publicKeyMap[publicKey], peer); err != nil {
70                         log.Error(err)
71                 }
72         }
73
74         // offline peers
75         for _, ormNode := range ormNodes {
76                 if _, ok := connMap[ormNode.PublicKey]; ok {
77                         continue
78                 }
79
80                 if err := m.processOfflinePeer(ormNode); err != nil {
81                         log.Error(err)
82                 }
83         }
84
85         return nil
86 }
87
88 func (m *monitor) processConnectedPeer(ormNode *orm.Node, peer *p2p.Peer) error {
89         ormNodeLiveness := &orm.NodeLiveness{}
90         err := m.db.Model(&orm.NodeLiveness{}).Joins("join nodes on nodes.id = node_livenesses.node_id").
91                 Where("nodes.public_key = ? AND status != ?", ormNode.PublicKey, common.NodeOfflineStatus).Last(ormNodeLiveness).Error
92         if err == nil {
93                 return m.db.Model(&orm.NodeLiveness{}).Where(ormNodeLiveness).UpdateColumn(&orm.NodeLiveness{
94                         PingTimes: ormNodeLiveness.PingTimes + 1,
95                 }).Error
96         } else if err != gorm.ErrRecordNotFound {
97                 return err
98         }
99
100         // gorm.ErrRecordNotFound
101         return m.db.Create(&orm.NodeLiveness{
102                 NodeID:    ormNode.ID,
103                 PingTimes: 1,
104                 Status:    common.NodeUnknownStatus,
105         }).Error
106 }
107
108 func (m *monitor) processOfflinePeer(ormNode *orm.Node) error {
109         return m.db.Model(&orm.NodeLiveness{}).
110                 Where(&orm.NodeLiveness{NodeID: ormNode.ID}).
111                 UpdateColumn(&orm.NodeLiveness{
112                         Status: common.NodeOfflineStatus,
113                 }).Error
114 }
115
116 func (m *monitor) processPeerInfos(peerInfos []*peers.PeerInfo) error {
117         for _, peerInfo := range peerInfos {
118                 dbTx := m.db.Begin()
119                 if err := m.processPeerInfo(dbTx, peerInfo); err != nil {
120                         log.Error(err)
121                         dbTx.Rollback()
122                 } else {
123                         dbTx.Commit()
124                 }
125         }
126
127         return nil
128 }
129
130 // TODO: fix pong time
131 func (m *monitor) processPeerInfo(dbTx *gorm.DB, peerInfo *peers.PeerInfo) error {
132         xPub := &chainkd.XPub{}
133         if err := xPub.UnmarshalText([]byte(peerInfo.ID)); err != nil {
134                 return err
135         }
136
137         ormNode := &orm.Node{}
138         if err := dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).First(ormNode).Error; err != nil {
139                 return err
140         }
141
142         log.Debugf("peerInfo Ping: %v", peerInfo.Ping)
143         ping, err := time.ParseDuration(peerInfo.Ping)
144         if err != nil {
145                 log.Debugf("Parse ping time err: %v", err)
146         }
147
148         // TODO: preload?
149         ormNodeLiveness := &orm.NodeLiveness{
150                 NodeID:        ormNode.ID,
151                 BestHeight:    ormNode.BestHeight,
152                 AvgLantencyMS: sql.NullInt64{Int64: ping.Nanoseconds() / 1000, Valid: true},
153                 // PingTimes     uint64
154                 // PongTimes     uint64
155         }
156         if err := dbTx.Model(&orm.NodeLiveness{}).Where("node_id = ? AND status != ?", ormNode.ID, common.NodeOfflineStatus).
157                 UpdateColumn(&orm.NodeLiveness{
158                         BestHeight:    ormNodeLiveness.BestHeight,
159                         AvgLantencyMS: ormNodeLiveness.AvgLantencyMS,
160                 }).FirstOrCreate(ormNodeLiveness).Error; err != nil {
161                 return err
162         }
163
164         if err := dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).
165                 UpdateColumn(&orm.Node{
166                         Alias:      peerInfo.Moniker,
167                         Xpub:       peerInfo.ID,
168                         BestHeight: peerInfo.Height,
169                         // LatestDailyUptimeMinutes uint64
170                 }).First(ormNode).Error; err != nil {
171                 return err
172         }
173
174         return nil
175 }