OSDN Git Service

13c8818ecfddc336d74ce015a3d765e4597ef5f1
[bytom/vapor.git] / toolbar / precog / monitor / stats.go
1 package monitor
2
3 import (
4         "database/sql"
5         "fmt"
6         "net"
7         "strconv"
8         "time"
9
10         "github.com/jinzhu/gorm"
11         log "github.com/sirupsen/logrus"
12
13         "github.com/vapor/netsync/peers"
14         "github.com/vapor/p2p"
15         "github.com/vapor/toolbar/precog/common"
16         "github.com/vapor/toolbar/precog/config"
17         "github.com/vapor/toolbar/precog/database/orm"
18 )
19
20 func (m *monitor) upsertNode(node *config.Node) error {
21         ormNode := &orm.Node{
22                 IP:   node.IP,
23                 Port: node.Port,
24         }
25         if err := m.db.Where(ormNode).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
26                 return err
27         }
28
29         ormNode.PublicKey = node.PublicKey
30         if node.XPub != nil {
31                 ormNode.Xpub = node.XPub.String()
32                 ormNode.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String())
33         }
34         return m.db.Save(ormNode).Error
35 }
36
37 func parseRemoteAddr(remoteAddr string) (string, uint16, error) {
38         host, portStr, err := net.SplitHostPort(remoteAddr)
39         if err != nil {
40                 return "", 0, err
41         }
42
43         port, err := strconv.Atoi(portStr)
44         if err != nil {
45                 return "", 0, err
46         }
47
48         return host, uint16(port), nil
49 }
50
51 func (m *monitor) processDialResults(peerList []*p2p.Peer) error {
52         var ormNodes []*orm.Node
53         if err := m.db.Model(&orm.Node{}).Find(&ormNodes).Error; err != nil {
54                 return err
55         }
56
57         addressMap := make(map[string]*orm.Node, len(ormNodes))
58         for _, ormNode := range ormNodes {
59                 addressMap[fmt.Sprintf("%s:%d", ormNode.IP, ormNode.Port)] = ormNode
60         }
61
62         connMap := make(map[string]bool, len(ormNodes))
63         // connected peers
64         for _, peer := range peerList {
65                 connMap[peer.RemoteAddr] = true
66                 if err := m.processConnectedPeer(addressMap[peer.RemoteAddr]); err != nil {
67                         log.WithFields(log.Fields{"peer remoteAddr": peer.RemoteAddr, "err": err}).Error("processConnectedPeer")
68                 }
69         }
70
71         // offline peers
72         for _, ormNode := range ormNodes {
73                 if _, ok := connMap[fmt.Sprintf("%s:%d", ormNode.IP, ormNode.Port)]; ok {
74                         continue
75                 }
76
77                 if err := m.processOfflinePeer(ormNode); err != nil {
78                         log.WithFields(log.Fields{"peer publicKey": ormNode.PublicKey, "err": err}).Error("processOfflinePeer")
79                 }
80         }
81
82         return nil
83 }
84
85 func (m *monitor) processConnectedPeer(ormNode *orm.Node) error {
86         ormNodeLiveness := &orm.NodeLiveness{NodeID: ormNode.ID}
87         err := m.db.Preload("Node").Where(ormNodeLiveness).Last(ormNodeLiveness).Error
88         if err != nil && err != gorm.ErrRecordNotFound {
89                 return err
90         }
91
92         ormNodeLiveness.PongTimes += 1
93         if ormNode.Status == common.NodeOfflineStatus {
94                 ormNode.Status = common.NodeUnknownStatus
95         }
96         ormNodeLiveness.Node = ormNode
97         return m.db.Save(ormNodeLiveness).Error
98 }
99
100 func (m *monitor) processOfflinePeer(ormNode *orm.Node) error {
101         ormNode.Status = common.NodeOfflineStatus
102         return m.db.Save(ormNode).Error
103 }
104
105 func (m *monitor) processPeerInfos(peerInfos []*peers.PeerInfo) {
106         for _, peerInfo := range peerInfos {
107                 dbTx := m.db.Begin()
108                 if err := m.processPeerInfo(dbTx, peerInfo); err != nil {
109                         log.WithFields(log.Fields{"peerInfo": peerInfo, "err": err}).Error("processPeerInfo")
110                         dbTx.Rollback()
111                 } else {
112                         dbTx.Commit()
113                 }
114         }
115 }
116
117 func (m *monitor) processPeerInfo(dbTx *gorm.DB, peerInfo *peers.PeerInfo) error {
118         ip, port, err := parseRemoteAddr(peerInfo.RemoteAddr)
119         if err != nil {
120                 return err
121         }
122
123         ormNode := &orm.Node{IP: ip, Port: uint16(port)}
124         if err := dbTx.Where(ormNode).First(ormNode).Error; err != nil {
125                 return err
126         }
127
128         if ormNode.Status == common.NodeOfflineStatus {
129                 return fmt.Errorf("node %s:%d status error", ormNode.IP, ormNode.Port)
130         }
131
132         log.WithFields(log.Fields{"ping": peerInfo.Ping}).Debug("peerInfo")
133         ping, err := time.ParseDuration(peerInfo.Ping)
134         if err != nil {
135                 return err
136         }
137
138         now := time.Now()
139         yesterday := now.Add(-24 * time.Hour)
140         var ormNodeLivenesses []*orm.NodeLiveness
141         if err := dbTx.Preload("Node").Model(&orm.NodeLiveness{}).
142                 Where("node_id = ? AND updated_at >= ?", ormNode.ID, yesterday).
143                 Order(fmt.Sprintf("created_at %s", "DESC")).Find(&ormNodeLivenesses).Error; err != nil {
144                 return err
145         }
146
147         // update latest liveness
148         latestLiveness := ormNodeLivenesses[0]
149         rttMS := ping.Nanoseconds() / 1000000
150         if rttMS > 0 && rttMS < 2000 {
151                 ormNode.Status = common.NodeHealthyStatus
152         } else if rttMS > 2000 {
153                 ormNode.Status = common.NodeCongestedStatus
154         }
155         if rttMS != 0 {
156                 ormNode.AvgRttMS = sql.NullInt64{
157                         Int64: (ormNode.AvgRttMS.Int64*int64(latestLiveness.PongTimes) + rttMS) / int64(latestLiveness.PongTimes+1),
158                         Valid: true,
159                 }
160         }
161         latestLiveness.PongTimes += 1
162         if peerInfo.Height != 0 {
163                 latestLiveness.BestHeight = peerInfo.Height
164                 ormNode.BestHeight = peerInfo.Height
165         }
166         if err := dbTx.Save(latestLiveness).Error; err != nil {
167                 return err
168         }
169
170         // calc LatestDailyUptimeMinutes
171         total := 0 * time.Minute
172         ormNodeLivenesses[0].UpdatedAt = now
173         for _, ormNodeLiveness := range ormNodeLivenesses {
174                 if ormNodeLiveness.CreatedAt.Before(yesterday) {
175                         ormNodeLiveness.CreatedAt = yesterday
176                 }
177
178                 total += ormNodeLiveness.UpdatedAt.Sub(ormNodeLiveness.CreatedAt)
179         }
180         ormNode.LatestDailyUptimeMinutes = uint64(total.Minutes())
181         ormNode.Alias = peerInfo.Moniker
182         ormNode.Xpub = peerInfo.ID
183         return dbTx.Save(ormNode).Error
184 }