OSDN Git Service

70746e8033ddc3bcab6c93149b72d546382f398b
[bytom/vapor.git] / toolbar / precog / monitor / stats.go
1 package monitor
2
3 import (
4         "database/sql"
5         "fmt"
6         "time"
7
8         "github.com/jinzhu/gorm"
9         log "github.com/sirupsen/logrus"
10
11         "github.com/vapor/crypto/ed25519/chainkd"
12         "github.com/vapor/netsync/peers"
13         "github.com/vapor/p2p"
14         "github.com/vapor/toolbar/precog/common"
15         "github.com/vapor/toolbar/precog/config"
16         "github.com/vapor/toolbar/precog/database/orm"
17 )
18
19 func (m *monitor) upsertNode(node *config.Node) error {
20         ormNode := &orm.Node{
21                 IP:   node.IP,
22                 Port: node.Port,
23         }
24         if err := m.db.Where(ormNode).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
25                 return err
26         }
27
28         ormNode.PublicKey = node.PublicKey
29         if node.XPub != nil {
30                 ormNode.Xpub = node.XPub.String()
31                 ormNode.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String())
32         }
33         return m.db.Save(ormNode).Error
34 }
35
36 func (m *monitor) processDialResults(peerList []*p2p.Peer) error {
37         var ormNodes []*orm.Node
38         if err := m.db.Model(&orm.Node{}).Find(&ormNodes).Error; err != nil {
39                 return err
40         }
41
42         addressMap := make(map[string]*orm.Node, len(ormNodes))
43         for _, ormNode := range ormNodes {
44                 addressMap[fmt.Sprintf("%s:%d", ormNode.IP, ormNode.Port)] = ormNode
45         }
46
47         connMap := make(map[string]bool, len(ormNodes))
48         // connected peers
49         for _, peer := range peerList {
50                 connMap[peer.ListenAddr] = true
51                 if err := m.processConnectedPeer(addressMap[peer.ListenAddr]); err != nil {
52                         log.WithFields(log.Fields{"peer listenAddr": peer.ListenAddr, "err": err}).Error("processConnectedPeer")
53                 }
54         }
55
56         // offline peers
57         for _, ormNode := range ormNodes {
58                 if _, ok := connMap[peer.ListenAddr]; ok {
59                         continue
60                 }
61
62                 if err := m.processOfflinePeer(ormNode); err != nil {
63                         log.WithFields(log.Fields{"peer publicKey": ormNode.PublicKey, "err": err}).Error("processOfflinePeer")
64                 }
65         }
66
67         return nil
68 }
69
70 func (m *monitor) processConnectedPeer(ormNode *orm.Node) error {
71         ormNodeLiveness := &orm.NodeLiveness{NodeID: ormNode.ID}
72         err := m.db.Preload("Node").Where(ormNodeLiveness).Last(ormNodeLiveness).Error
73         if err != nil && err != gorm.ErrRecordNotFound {
74                 return err
75         }
76
77         ormNodeLiveness.PongTimes += 1
78         if ormNode.Status == common.NodeOfflineStatus {
79                 ormNode.Status = common.NodeUnknownStatus
80         }
81         ormNodeLiveness.Node = ormNode
82         return m.db.Save(ormNodeLiveness).Error
83 }
84
85 func (m *monitor) processOfflinePeer(ormNode *orm.Node) error {
86         ormNode.Status = common.NodeOfflineStatus
87         return m.db.Save(ormNode).Error
88 }
89
90 func (m *monitor) processPeerInfos(peerInfos []*peers.PeerInfo) {
91         for _, peerInfo := range peerInfos {
92                 dbTx := m.db.Begin()
93                 if err := m.processPeerInfo(dbTx, peerInfo); err != nil {
94                         log.WithFields(log.Fields{"peerInfo": peerInfo, "err": err}).Error("processPeerInfo")
95                         dbTx.Rollback()
96                 } else {
97                         dbTx.Commit()
98                 }
99         }
100 }
101
102 func (m *monitor) processPeerInfo(dbTx *gorm.DB, peerInfo *peers.PeerInfo) error {
103         xPub := &chainkd.XPub{}
104         if err := xPub.UnmarshalText([]byte(peerInfo.ID)); err != nil {
105                 return err
106         }
107
108         ormNode := &orm.Node{}
109         if err := dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).First(ormNode).Error; err != nil {
110                 return err
111         }
112
113         if ormNode.Status == common.NodeOfflineStatus {
114                 return fmt.Errorf("node %s status error", ormNode.PublicKey)
115         }
116
117         log.WithFields(log.Fields{"ping": peerInfo.Ping}).Debug("peerInfo")
118         ping, err := time.ParseDuration(peerInfo.Ping)
119         if err != nil {
120                 return err
121         }
122
123         now := time.Now()
124         yesterday := now.Add(-24 * time.Hour)
125         var ormNodeLivenesses []*orm.NodeLiveness
126         if err := dbTx.Preload("Node").Model(&orm.NodeLiveness{}).
127                 Where("node_id = ? AND updated_at >= ?", ormNode.ID, yesterday).
128                 Order(fmt.Sprintf("created_at %s", "DESC")).
129                 Find(&ormNodeLivenesses).Error; err != nil {
130                 return err
131         }
132
133         // update latest liveness
134         latestLiveness := ormNodeLivenesses[0]
135         lantencyMS := ping.Nanoseconds() / 1000
136         if lantencyMS != 0 {
137                 ormNode.AvgLantencyMS = sql.NullInt64{
138                         Int64: (ormNode.AvgLantencyMS.Int64*int64(latestLiveness.PongTimes) + lantencyMS) / int64(latestLiveness.PongTimes+1),
139                         Valid: true,
140                 }
141         }
142         latestLiveness.PongTimes += 1
143         if peerInfo.Height != 0 {
144                 latestLiveness.BestHeight = peerInfo.Height
145         }
146         if err := dbTx.Save(latestLiveness).Error; err != nil {
147                 return err
148         }
149
150         // calc LatestDailyUptimeMinutes
151         total := 0 * time.Minute
152         ormNodeLivenesses[0].UpdatedAt = now
153         for _, ormNodeLiveness := range ormNodeLivenesses {
154                 if ormNodeLiveness.CreatedAt.Before(yesterday) {
155                         ormNodeLiveness.CreatedAt = yesterday
156                 }
157
158                 total += ormNodeLiveness.UpdatedAt.Sub(ormNodeLiveness.CreatedAt)
159         }
160
161         return dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).
162                 UpdateColumn(&orm.Node{
163                         Alias:                    peerInfo.Moniker,
164                         Xpub:                     peerInfo.ID,
165                         BestHeight:               peerInfo.Height,
166                         LatestDailyUptimeMinutes: uint64(total.Minutes()),
167                 }).First(ormNode).Error
168 }