OSDN Git Service

clean
[bytom/vapor.git] / toolbar / precog / monitor / stats.go
1 package monitor
2
3 import (
4         "database/sql"
5         "fmt"
6         "time"
7
8         "github.com/jinzhu/gorm"
9         log "github.com/sirupsen/logrus"
10
11         "github.com/vapor/crypto/ed25519/chainkd"
12         "github.com/vapor/netsync/peers"
13         "github.com/vapor/toolbar/precog/common"
14         "github.com/vapor/toolbar/precog/config"
15         "github.com/vapor/toolbar/precog/database/orm"
16 )
17
18 // create or update: https://github.com/jinzhu/gorm/issues/1307
19 func (m *monitor) upsertNode(node *config.Node) error {
20         if node.XPub != nil {
21                 node.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String())
22         }
23
24         ormNode := &orm.Node{
25                 IP:   node.IP,
26                 Port: node.Port,
27         }
28         if err := m.db.Where(ormNode).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
29                 return err
30         }
31
32         ormNode.PublicKey = node.PublicKey
33         if node.XPub != nil {
34                 ormNode.Xpub = node.XPub.String()
35         }
36         return m.db.Save(ormNode).Error
37 }
38
39 // TODO: maybe return connected nodes here for checkStatus
40 func (m *monitor) processDialResults() error {
41         var ormNodes []*orm.Node
42         if err := m.db.Model(&orm.Node{}).Find(&ormNodes).Error; err != nil {
43                 return err
44         }
45
46         publicKeyMap := make(map[string]*orm.Node, len(ormNodes))
47         for _, ormNode := range ormNodes {
48                 publicKeyMap[ormNode.PublicKey] = ormNode
49         }
50
51         connMap := make(map[string]bool, len(ormNodes))
52         // connected peers
53         for _, peer := range m.sw.GetPeers().List() {
54                 xPub := &chainkd.XPub{}
55                 if err := xPub.UnmarshalText([]byte(peer.Key)); err != nil {
56                         log.WithFields(log.Fields{"xpub": peer.Key}).Error("unmarshal xpub")
57                         continue
58                 }
59
60                 publicKey := xPub.PublicKey().String()
61                 connMap[publicKey] = true
62                 if err := m.processConnectedPeer(publicKeyMap[publicKey]); err != nil {
63                         log.WithFields(log.Fields{"peer publicKey": publicKey, "err": err}).Error("processConnectedPeer")
64                 }
65         }
66
67         // offline peers
68         for _, ormNode := range ormNodes {
69                 if _, ok := connMap[ormNode.PublicKey]; ok {
70                         continue
71                 }
72
73                 if err := m.processOfflinePeer(ormNode); err != nil {
74                         log.WithFields(log.Fields{"peer publicKey": ormNode.PublicKey, "err": err}).Error("processOfflinePeer")
75                 }
76         }
77
78         return nil
79 }
80
81 func (m *monitor) processConnectedPeer(ormNode *orm.Node) error {
82         ormNodeLiveness := &orm.NodeLiveness{NodeID: ormNode.ID}
83         err := m.db.Preload("Node").Where(ormNodeLiveness).Last(ormNodeLiveness).Error
84         if err != nil && err != gorm.ErrRecordNotFound {
85                 return err
86         }
87
88         ormNodeLiveness.PongTimes += 1
89         if ormNode.Status == common.NodeOfflineStatus {
90                 ormNode.Status = common.NodeUnknownStatus
91         }
92         ormNodeLiveness.Node = ormNode
93         return m.db.Save(ormNodeLiveness).Error
94 }
95
96 func (m *monitor) processOfflinePeer(ormNode *orm.Node) error {
97         ormNode.Status = common.NodeOfflineStatus
98         return m.db.Save(ormNode).Error
99 }
100
101 func (m *monitor) processPeerInfos(peerInfos []*peers.PeerInfo) {
102         for _, peerInfo := range peerInfos {
103                 dbTx := m.db.Begin()
104                 if err := m.processPeerInfo(dbTx, peerInfo); err != nil {
105                         log.WithFields(log.Fields{"peerInfo": peerInfo, "err": err}).Error("processPeerInfo")
106                         dbTx.Rollback()
107                 } else {
108                         dbTx.Commit()
109                 }
110         }
111 }
112
113 func (m *monitor) processPeerInfo(dbTx *gorm.DB, peerInfo *peers.PeerInfo) error {
114         xPub := &chainkd.XPub{}
115         if err := xPub.UnmarshalText([]byte(peerInfo.ID)); err != nil {
116                 return err
117         }
118
119         ormNode := &orm.Node{}
120         if err := dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).First(ormNode).Error; err != nil {
121                 return err
122         }
123
124         if ormNode.Status == common.NodeOfflineStatus {
125                 return fmt.Errorf("node %s status error", ormNode.PublicKey)
126         }
127
128         log.WithFields(log.Fields{"ping": peerInfo.Ping}).Debug("peerInfo")
129         ping, err := time.ParseDuration(peerInfo.Ping)
130         if err != nil {
131                 return err
132         }
133
134         now := time.Now()
135         yesterday := now.Add(-24 * time.Hour)
136         var ormNodeLivenesses []*orm.NodeLiveness
137         if err := dbTx.Preload("Node").Model(&orm.NodeLiveness{}).
138                 Where("node_id = ? AND updated_at >= ?", ormNode.ID, yesterday).
139                 Order(fmt.Sprintf("created_at %s", "DESC")).
140                 Find(&ormNodeLivenesses).Error; err != nil {
141                 return err
142         }
143
144         // update latest liveness
145         latestLiveness := ormNodeLivenesses[0]
146         lantencyMS := ping.Nanoseconds() / 1000
147         if lantencyMS != 0 {
148                 ormNode.AvgLantencyMS = sql.NullInt64{
149                         Int64: (ormNode.AvgLantencyMS.Int64*int64(latestLiveness.PongTimes) + lantencyMS) / int64(latestLiveness.PongTimes+1),
150                         Valid: true,
151                 }
152         }
153         latestLiveness.PongTimes += 1
154         if peerInfo.Height != 0 {
155                 latestLiveness.BestHeight = peerInfo.Height
156         }
157         if err := dbTx.Save(latestLiveness).Error; err != nil {
158                 return err
159         }
160
161         // calc LatestDailyUptimeMinutes
162         total := 0 * time.Minute
163         ormNodeLivenesses[0].UpdatedAt = now
164         for _, ormNodeLiveness := range ormNodeLivenesses {
165                 if ormNodeLiveness.CreatedAt.Before(yesterday) {
166                         ormNodeLiveness.CreatedAt = yesterday
167                 }
168
169                 total += ormNodeLiveness.UpdatedAt.Sub(ormNodeLiveness.CreatedAt)
170         }
171
172         return dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).
173                 UpdateColumn(&orm.Node{
174                         Alias:                    peerInfo.Moniker,
175                         Xpub:                     peerInfo.ID,
176                         BestHeight:               peerInfo.Height,
177                         LatestDailyUptimeMinutes: uint64(total.Minutes()),
178                 }).First(ormNode).Error
179 }