OSDN Git Service

fix
[bytom/vapor.git] / toolbar / precog / monitor / stats.go
1 package monitor
2
3 import (
4         "database/sql"
5         "fmt"
6         "time"
7
8         "github.com/jinzhu/gorm"
9         log "github.com/sirupsen/logrus"
10
11         "github.com/vapor/crypto/ed25519/chainkd"
12         "github.com/vapor/netsync/peers"
13         "github.com/vapor/p2p"
14         "github.com/vapor/toolbar/precog/common"
15         "github.com/vapor/toolbar/precog/config"
16         "github.com/vapor/toolbar/precog/database/orm"
17 )
18
19 // create or update: https://github.com/jinzhu/gorm/issues/1307
20 func (m *monitor) upsertNode(node *config.Node) error {
21         if node.XPub != nil {
22                 node.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String())
23         }
24
25         ormNode := &orm.Node{PublicKey: node.PublicKey}
26         if err := m.db.Where(&orm.Node{PublicKey: node.PublicKey}).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
27                 return err
28         }
29
30         if node.XPub != nil {
31                 ormNode.Xpub = node.XPub.String()
32         }
33         ormNode.IP = node.IP
34         ormNode.Port = node.Port
35         return m.db.Where(&orm.Node{PublicKey: ormNode.PublicKey}).
36                 Assign(&orm.Node{
37                         Xpub: ormNode.Xpub,
38                         IP:   ormNode.IP,
39                         Port: ormNode.Port,
40                 }).FirstOrCreate(ormNode).Error
41 }
42
43 // TODO: maybe return connected nodes here for checkStatus
44 func (m *monitor) processDialResults() error {
45         var ormNodes []*orm.Node
46         if err := m.db.Model(&orm.Node{}).Find(&ormNodes).Error; err != nil {
47                 return err
48         }
49
50         publicKeyMap := make(map[string]*orm.Node, len(ormNodes))
51         for _, ormNode := range ormNodes {
52                 publicKeyMap[ormNode.PublicKey] = ormNode
53         }
54
55         connMap := make(map[string]bool, len(ormNodes))
56         // connected peers
57         for _, peer := range m.sw.GetPeers().List() {
58                 xPub := &chainkd.XPub{}
59                 if err := xPub.UnmarshalText([]byte(peer.Key)); err != nil {
60                         log.WithFields(log.Fields{"xpub": peer.Key}).Error("unmarshal xpub")
61                         continue
62                 }
63
64                 publicKey := xPub.PublicKey().String()
65                 connMap[publicKey] = true
66                 if err := m.processConnectedPeer(publicKeyMap[publicKey]); err != nil {
67                         log.WithFields(log.Fields{"peer publicKey": publicKey, "err": err}).Error("processConnectedPeer")
68                 }
69         }
70
71         // offline peers
72         for _, ormNode := range ormNodes {
73                 if _, ok := connMap[ormNode.PublicKey]; ok {
74                         continue
75                 }
76
77                 if err := m.processOfflinePeer(ormNode); err != nil {
78                         log.WithFields(log.Fields{"peer publicKey": ormNode.PublicKey, "err": err}).Error("processOfflinePeer")
79                 }
80         }
81
82         return nil
83 }
84
85 func (m *monitor) processConnectedPeer(ormNode *orm.Node) error {
86         ormNodeLiveness := &orm.NodeLiveness{NodeID: ormNode.ID}
87         err := m.db.Preload("Node").Where(ormNodeLiveness).Last(ormNodeLiveness).Error
88         if err != nil && err != gorm.ErrRecordNotFound {
89                 return err
90         }
91
92         ormNodeLiveness.PongTimes += 1
93         ormNode.Status = common.NodeUnknownStatus
94         ormNodeLiveness.Node = ormNode
95         return m.db.Save(ormNodeLiveness).Error
96 }
97
98 func (m *monitor) processOfflinePeer(ormNode *orm.Node) error {
99         ormNode.Status = common.NodeOfflineStatus
100         return m.db.Save(ormNode).Error
101 }
102
103 func (m *monitor) processPeerInfos(peerInfos []*peers.PeerInfo) {
104         for _, peerInfo := range peerInfos {
105                 dbTx := m.db.Begin()
106                 if err := m.processPeerInfo(dbTx, peerInfo); err != nil {
107                         log.WithFields(log.Fields{"peerInfo": peerInfo, "err": err}).Error("processPeerInfo")
108                         dbTx.Rollback()
109                 } else {
110                         dbTx.Commit()
111                 }
112         }
113 }
114
115 func (m *monitor) processPeerInfo(dbTx *gorm.DB, peerInfo *peers.PeerInfo) error {
116         xPub := &chainkd.XPub{}
117         if err := xPub.UnmarshalText([]byte(peerInfo.ID)); err != nil {
118                 return err
119         }
120
121         ormNode := &orm.Node{}
122         if err := dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).First(ormNode).Error; err != nil {
123                 return err
124         }
125
126         log.WithFields(log.Fields{"ping": peerInfo.Ping}).Debug("peerInfo")
127         ping, err := time.ParseDuration(peerInfo.Ping)
128         if err != nil {
129                 return err
130         }
131
132         now := time.Now()
133         yesterday := now.Add(-24 * time.Hour)
134         var ormNodeLivenesses []*orm.NodeLiveness
135         if err := dbTx.Preload("Node").Model(&orm.NodeLiveness{}).
136                 Where("node_id = ? AND updated_at >= ?", ormNode.ID, yesterday).
137                 Order(fmt.Sprintf("created_at %s", "DESC")).
138                 Find(&ormNodeLivenesses).Error; err != nil {
139                 return err
140         }
141
142         // update latest liveness
143         latestLiveness := ormNodeLivenesses[0]
144         // if latestLiveness.Status == common.NodeOfflineStatus {
145         //      return fmt.Errorf("node %s latest liveness status error", ormNode.PublicKey)
146         // }
147
148         lantencyMS := ping.Nanoseconds() / 1000
149         if lantencyMS != 0 {
150                 latestLiveness.AvgLantencyMS = sql.NullInt64{
151                         Int64: (latestLiveness.AvgLantencyMS.Int64*int64(latestLiveness.PongTimes) + lantencyMS) / int64(latestLiveness.PongTimes+1),
152                         Valid: true,
153                 }
154         }
155         latestLiveness.PongTimes += 1
156         if peerInfo.Height != 0 {
157                 latestLiveness.BestHeight = peerInfo.Height
158         }
159         if err := dbTx.Save(latestLiveness).Error; err != nil {
160                 return err
161         }
162
163         // calc LatestDailyUptimeMinutes
164         total := 0 * time.Minute
165         ormNodeLivenesses[0].UpdatedAt = now
166         for _, ormNodeLiveness := range ormNodeLivenesses {
167                 if ormNodeLiveness.CreatedAt.Before(yesterday) {
168                         ormNodeLiveness.CreatedAt = yesterday
169                 }
170
171                 total += ormNodeLiveness.UpdatedAt.Sub(ormNodeLiveness.CreatedAt)
172         }
173
174         return dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).
175                 UpdateColumn(&orm.Node{
176                         Alias:                    peerInfo.Moniker,
177                         Xpub:                     peerInfo.ID,
178                         BestHeight:               peerInfo.Height,
179                         LatestDailyUptimeMinutes: uint64(total.Minutes()),
180                 }).First(ormNode).Error
181 }