OSDN Git Service

fix log
[bytom/vapor.git] / toolbar / precog / monitor / stats.go
1 package monitor
2
3 import (
4         "database/sql"
5         "fmt"
6         "time"
7
8         "github.com/jinzhu/gorm"
9         log "github.com/sirupsen/logrus"
10
11         "github.com/vapor/crypto/ed25519/chainkd"
12         "github.com/vapor/netsync/peers"
13         "github.com/vapor/p2p"
14         "github.com/vapor/toolbar/precog/common"
15         "github.com/vapor/toolbar/precog/config"
16         "github.com/vapor/toolbar/precog/database/orm"
17 )
18
19 // create or update: https://github.com/jinzhu/gorm/issues/1307
20 func (m *monitor) upSertNode(node *config.Node) error {
21         if node.XPub != nil {
22                 node.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String())
23         }
24
25         ormNode := &orm.Node{PublicKey: node.PublicKey}
26         if err := m.db.Where(&orm.Node{PublicKey: node.PublicKey}).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
27                 return err
28         }
29
30         if node.Alias != "" {
31                 ormNode.Alias = node.Alias
32         }
33         if node.XPub != nil {
34                 ormNode.Xpub = node.XPub.String()
35         }
36         ormNode.IP = node.IP
37         ormNode.Port = node.Port
38         return m.db.Where(&orm.Node{PublicKey: ormNode.PublicKey}).
39                 Assign(&orm.Node{
40                         Xpub:  ormNode.Xpub,
41                         Alias: ormNode.Alias,
42                         IP:    ormNode.IP,
43                         Port:  ormNode.Port,
44                 }).FirstOrCreate(ormNode).Error
45 }
46
47 func (m *monitor) processDialResults() error {
48         var ormNodes []*orm.Node
49         if err := m.db.Model(&orm.Node{}).Find(&ormNodes).Error; err != nil {
50                 return err
51         }
52
53         publicKeyMap := make(map[string]*orm.Node, len(ormNodes))
54         for _, ormNode := range ormNodes {
55                 publicKeyMap[ormNode.PublicKey] = ormNode
56         }
57
58         connMap := make(map[string]bool, len(ormNodes))
59         // connected peers
60         for _, peer := range m.sw.GetPeers().List() {
61                 xPub := &chainkd.XPub{}
62                 if err := xPub.UnmarshalText([]byte(peer.Key)); err != nil {
63                         log.WithFields(log.Fields{"xpub": peer.Key}).Error("unmarshal xpub")
64                         continue
65                 }
66
67                 publicKey := xPub.PublicKey().String()
68                 connMap[publicKey] = true
69                 if err := m.processConnectedPeer(publicKeyMap[publicKey], peer); err != nil {
70                         log.WithFields(log.Fields{
71                                 "peer publicKey": publicKey,
72                                 "err":            err,
73                         }).Error("processConnectedPeer")
74                 }
75         }
76
77         // offline peers
78         for _, ormNode := range ormNodes {
79                 if _, ok := connMap[ormNode.PublicKey]; ok {
80                         continue
81                 }
82
83                 if err := m.processOfflinePeer(ormNode); err != nil {
84                         log.WithFields(log.Fields{
85                                 "peer publicKey": ormNode.PublicKey,
86                                 "err":            err,
87                         }).Error("processOfflinePeer")
88                 }
89         }
90
91         return nil
92 }
93
94 func (m *monitor) processConnectedPeer(ormNode *orm.Node, peer *p2p.Peer) error {
95         ormNodeLiveness := &orm.NodeLiveness{}
96         err := m.db.Model(&orm.NodeLiveness{}).Joins("join nodes on nodes.id = node_livenesses.node_id").
97                 Where("nodes.public_key = ? AND status != ?", ormNode.PublicKey, common.NodeOfflineStatus).Last(ormNodeLiveness).Error
98         if err == nil {
99                 return m.db.Model(&orm.NodeLiveness{}).Where(ormNodeLiveness).UpdateColumn(&orm.NodeLiveness{
100                         PingTimes: ormNodeLiveness.PingTimes + 1,
101                 }).Error
102         } else if err != gorm.ErrRecordNotFound {
103                 return err
104         }
105
106         // gorm.ErrRecordNotFound
107         return m.db.Create(&orm.NodeLiveness{
108                 NodeID:    ormNode.ID,
109                 PingTimes: 1,
110                 Status:    common.NodeUnknownStatus,
111         }).Error
112 }
113
114 func (m *monitor) processOfflinePeer(ormNode *orm.Node) error {
115         return m.db.Model(&orm.NodeLiveness{}).
116                 Where(&orm.NodeLiveness{NodeID: ormNode.ID}).
117                 UpdateColumn(&orm.NodeLiveness{
118                         Status: common.NodeOfflineStatus,
119                 }).Error
120 }
121
122 func (m *monitor) processPeerInfos(peerInfos []*peers.PeerInfo) error {
123         for _, peerInfo := range peerInfos {
124                 dbTx := m.db.Begin()
125                 if err := m.processPeerInfo(dbTx, peerInfo); err != nil {
126                         log.WithFields(log.Fields{
127                                 "peerInfo": peerInfo,
128                                 "err":      err,
129                         }).Error("processPeerInfo")
130                         dbTx.Rollback()
131                 } else {
132                         dbTx.Commit()
133                 }
134         }
135
136         return nil
137 }
138
139 func (m *monitor) processPeerInfo(dbTx *gorm.DB, peerInfo *peers.PeerInfo) error {
140         xPub := &chainkd.XPub{}
141         if err := xPub.UnmarshalText([]byte(peerInfo.ID)); err != nil {
142                 return err
143         }
144
145         ormNode := &orm.Node{}
146         if err := dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).First(ormNode).Error; err != nil {
147                 return err
148         }
149
150         log.WithFields(log.Fields{"ping": peerInfo.Ping}).Debug("peerInfo")
151         ping, err := time.ParseDuration(peerInfo.Ping)
152         if err != nil {
153                 return err
154         }
155
156         now := time.Now()
157         yesterday := now.Add(-24 * time.Hour)
158         var ormNodeLivenesses []*orm.NodeLiveness
159         if err := dbTx.Model(&orm.NodeLiveness{}).
160                 Where("node_id = ? AND updated_at >= ?", ormNode.ID, yesterday).
161                 Order(fmt.Sprintf("created_at %s", "DESC")).
162                 Find(&ormNodeLivenesses).Error; err != nil {
163                 return err
164         }
165
166         // update latest liveness
167         latestLiveness := ormNodeLivenesses[0]
168         if latestLiveness.Status == common.NodeOfflineStatus {
169                 return fmt.Errorf("node %s latest liveness status error", ormNode.PublicKey)
170         }
171
172         lantencyMS := ping.Nanoseconds() / 1000
173         if lantencyMS != 0 {
174                 latestLiveness.AvgLantencyMS = sql.NullInt64{
175                         Int64: (latestLiveness.AvgLantencyMS.Int64*int64(latestLiveness.PongTimes) + lantencyMS) / int64(latestLiveness.PongTimes+1),
176                         Valid: true,
177                 }
178         }
179         latestLiveness.PongTimes += 1
180         if peerInfo.Height != 0 {
181                 latestLiveness.BestHeight = peerInfo.Height
182         }
183         if err := dbTx.Save(latestLiveness).Error; err != nil {
184                 return err
185         }
186
187         // calc LatestDailyUptimeMinutes
188         total := 0 * time.Minute
189         ormNodeLivenesses[0].UpdatedAt = now
190         for _, ormNodeLiveness := range ormNodeLivenesses {
191                 if ormNodeLiveness.CreatedAt.Before(yesterday) {
192                         ormNodeLiveness.CreatedAt = yesterday
193                 }
194
195                 total += ormNodeLiveness.UpdatedAt.Sub(ormNodeLiveness.CreatedAt)
196         }
197
198         return dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).
199                 UpdateColumn(&orm.Node{
200                         Alias:                    peerInfo.Moniker,
201                         Xpub:                     peerInfo.ID,
202                         BestHeight:               peerInfo.Height,
203                         LatestDailyUptimeMinutes: uint64(total.Minutes()),
204                 }).First(ormNode).Error
205 }