OSDN Git Service

2dd27294dbce723b45d4a23ec71ea795e4068758
[bytom/vapor.git] / toolbar / precog / monitor / stats.go
1 package monitor
2
3 import (
4         "database/sql"
5         "fmt"
6         "time"
7
8         "github.com/jinzhu/gorm"
9         log "github.com/sirupsen/logrus"
10
11         "github.com/vapor/crypto/ed25519/chainkd"
12         "github.com/vapor/netsync/peers"
13         "github.com/vapor/toolbar/precog/common"
14         "github.com/vapor/toolbar/precog/config"
15         "github.com/vapor/toolbar/precog/database/orm"
16 )
17
18 // create or update: https://github.com/jinzhu/gorm/issues/1307
19 func (m *monitor) upsertNode(node *config.Node) error {
20         if node.XPub != nil {
21                 node.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String())
22         }
23
24         ormNode := &orm.Node{PublicKey: node.PublicKey}
25         if err := m.db.Where(&orm.Node{PublicKey: node.PublicKey}).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
26                 return err
27         }
28
29         if node.XPub != nil {
30                 ormNode.Xpub = node.XPub.String()
31         }
32         ormNode.IP = node.IP
33         ormNode.Port = node.Port
34         return m.db.Where(&orm.Node{PublicKey: ormNode.PublicKey}).
35                 Assign(&orm.Node{
36                         Xpub: ormNode.Xpub,
37                         IP:   ormNode.IP,
38                         Port: ormNode.Port,
39                 }).FirstOrCreate(ormNode).Error
40 }
41
42 // TODO: maybe return connected nodes here for checkStatus
43 func (m *monitor) processDialResults() error {
44         var ormNodes []*orm.Node
45         if err := m.db.Model(&orm.Node{}).Find(&ormNodes).Error; err != nil {
46                 return err
47         }
48
49         publicKeyMap := make(map[string]*orm.Node, len(ormNodes))
50         for _, ormNode := range ormNodes {
51                 publicKeyMap[ormNode.PublicKey] = ormNode
52         }
53
54         connMap := make(map[string]bool, len(ormNodes))
55         // connected peers
56         for _, peer := range m.sw.GetPeers().List() {
57                 xPub := &chainkd.XPub{}
58                 if err := xPub.UnmarshalText([]byte(peer.Key)); err != nil {
59                         log.WithFields(log.Fields{"xpub": peer.Key}).Error("unmarshal xpub")
60                         continue
61                 }
62
63                 publicKey := xPub.PublicKey().String()
64                 connMap[publicKey] = true
65                 if err := m.processConnectedPeer(publicKeyMap[publicKey]); err != nil {
66                         log.WithFields(log.Fields{"peer publicKey": publicKey, "err": err}).Error("processConnectedPeer")
67                 }
68         }
69
70         // offline peers
71         for _, ormNode := range ormNodes {
72                 if _, ok := connMap[ormNode.PublicKey]; ok {
73                         continue
74                 }
75
76                 if err := m.processOfflinePeer(ormNode); err != nil {
77                         log.WithFields(log.Fields{"peer publicKey": ormNode.PublicKey, "err": err}).Error("processOfflinePeer")
78                 }
79         }
80
81         return nil
82 }
83
84 func (m *monitor) processConnectedPeer(ormNode *orm.Node) error {
85         ormNodeLiveness := &orm.NodeLiveness{NodeID: ormNode.ID}
86         err := m.db.Preload("Node").Where(ormNodeLiveness).Last(ormNodeLiveness).Error
87         if err != nil && err != gorm.ErrRecordNotFound {
88                 return err
89         }
90
91         ormNodeLiveness.PongTimes += 1
92         if ormNode.Status == common.NodeOfflineStatus {
93                 ormNode.Status = common.NodeUnknownStatus
94         }
95         ormNodeLiveness.Node = ormNode
96         return m.db.Save(ormNodeLiveness).Error
97 }
98
99 func (m *monitor) processOfflinePeer(ormNode *orm.Node) error {
100         ormNode.Status = common.NodeOfflineStatus
101         return m.db.Save(ormNode).Error
102 }
103
104 func (m *monitor) processPeerInfos(peerInfos []*peers.PeerInfo) {
105         for _, peerInfo := range peerInfos {
106                 dbTx := m.db.Begin()
107                 if err := m.processPeerInfo(dbTx, peerInfo); err != nil {
108                         log.WithFields(log.Fields{"peerInfo": peerInfo, "err": err}).Error("processPeerInfo")
109                         dbTx.Rollback()
110                 } else {
111                         dbTx.Commit()
112                 }
113         }
114 }
115
116 func (m *monitor) processPeerInfo(dbTx *gorm.DB, peerInfo *peers.PeerInfo) error {
117         xPub := &chainkd.XPub{}
118         if err := xPub.UnmarshalText([]byte(peerInfo.ID)); err != nil {
119                 return err
120         }
121
122         ormNode := &orm.Node{}
123         if err := dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).First(ormNode).Error; err != nil {
124                 return err
125         }
126
127         if ormNode.Status == common.NodeOfflineStatus {
128                 return fmt.Errorf("node %s status error", ormNode.PublicKey)
129         }
130
131         log.WithFields(log.Fields{"ping": peerInfo.Ping}).Debug("peerInfo")
132         ping, err := time.ParseDuration(peerInfo.Ping)
133         if err != nil {
134                 return err
135         }
136
137         now := time.Now()
138         yesterday := now.Add(-24 * time.Hour)
139         var ormNodeLivenesses []*orm.NodeLiveness
140         if err := dbTx.Preload("Node").Model(&orm.NodeLiveness{}).
141                 Where("node_id = ? AND updated_at >= ?", ormNode.ID, yesterday).
142                 Order(fmt.Sprintf("created_at %s", "DESC")).
143                 Find(&ormNodeLivenesses).Error; err != nil {
144                 return err
145         }
146
147         // update latest liveness
148         latestLiveness := ormNodeLivenesses[0]
149         lantencyMS := ping.Nanoseconds() / 1000
150         if lantencyMS != 0 {
151                 latestLiveness.AvgLantencyMS = sql.NullInt64{
152                         Int64: (latestLiveness.AvgLantencyMS.Int64*int64(latestLiveness.PongTimes) + lantencyMS) / int64(latestLiveness.PongTimes+1),
153                         Valid: true,
154                 }
155         }
156         latestLiveness.PongTimes += 1
157         if peerInfo.Height != 0 {
158                 latestLiveness.BestHeight = peerInfo.Height
159         }
160         if err := dbTx.Save(latestLiveness).Error; err != nil {
161                 return err
162         }
163
164         // calc LatestDailyUptimeMinutes
165         total := 0 * time.Minute
166         ormNodeLivenesses[0].UpdatedAt = now
167         for _, ormNodeLiveness := range ormNodeLivenesses {
168                 if ormNodeLiveness.CreatedAt.Before(yesterday) {
169                         ormNodeLiveness.CreatedAt = yesterday
170                 }
171
172                 total += ormNodeLiveness.UpdatedAt.Sub(ormNodeLiveness.CreatedAt)
173         }
174
175         return dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).
176                 UpdateColumn(&orm.Node{
177                         Alias:                    peerInfo.Moniker,
178                         Xpub:                     peerInfo.ID,
179                         BestHeight:               peerInfo.Height,
180                         LatestDailyUptimeMinutes: uint64(total.Minutes()),
181                 }).First(ormNode).Error
182 }