OSDN Git Service

wip
[bytom/vapor.git] / toolbar / precog / monitor / stats.go
1 package monitor
2
3 import (
4         "database/sql"
5         "fmt"
6         "time"
7
8         "github.com/jinzhu/gorm"
9         log "github.com/sirupsen/logrus"
10
11         "github.com/vapor/crypto/ed25519/chainkd"
12         "github.com/vapor/netsync/peers"
13         "github.com/vapor/p2p"
14         "github.com/vapor/toolbar/precog/common"
15         "github.com/vapor/toolbar/precog/config"
16         "github.com/vapor/toolbar/precog/database/orm"
17 )
18
19 // create or update: https://github.com/jinzhu/gorm/issues/1307
20 func (m *monitor) upSertNode(node *config.Node) error {
21         if node.XPub != nil {
22                 node.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String())
23         }
24
25         ormNode := &orm.Node{PublicKey: node.PublicKey}
26         if err := m.db.Where(&orm.Node{PublicKey: node.PublicKey}).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
27                 return err
28         }
29
30         if node.Alias != "" {
31                 ormNode.Alias = node.Alias
32         }
33         if node.XPub != nil {
34                 ormNode.Xpub = node.XPub.String()
35         }
36         ormNode.Host = node.Host
37         ormNode.Port = node.Port
38         return m.db.Where(&orm.Node{PublicKey: ormNode.PublicKey}).
39                 Assign(&orm.Node{
40                         Xpub:  ormNode.Xpub,
41                         Alias: ormNode.Alias,
42                         Host:  ormNode.Host,
43                         Port:  ormNode.Port,
44                 }).FirstOrCreate(ormNode).Error
45 }
46
47 func (m *monitor) processDialResults() error {
48         log.Info("================================================================")
49         var ormNodes []*orm.Node
50         if err := m.db.Model(&orm.Node{}).Find(&ormNodes).Error; err != nil {
51                 return err
52         }
53
54         xPub := &chainkd.XPub{}
55         connMap := make(map[string]bool, len(ormNodes))
56         // connected peers
57         for _, peer := range m.sw.GetPeers().List() {
58                 if err := xPub.UnmarshalText([]byte(peer.Key)); err != nil {
59                         log.Error(err)
60                         continue
61                 }
62
63                 publicKey := xPub.PublicKey().String()
64                 connMap[publicKey] = true
65                 if err := m.processConnectedPeer(publicKey, peer); err != nil {
66                         log.Error(err)
67                 }
68         }
69
70         // offline peers
71         for _, ormNode := range ormNodes {
72                 if _, ok := connMap[ormNode.PublicKey]; ok {
73                         continue
74                 }
75
76                 if err := m.processOfflinePeer(ormNode); err != nil {
77                         log.Error(err)
78                 }
79         }
80
81         return nil
82 }
83
84 // TODO: add start time here
85 func (m *monitor) processConnectedPeer(publicKey string, peer *p2p.Peer) error {
86         ormNodeLiveness := &orm.NodeLiveness{}
87         if err := m.db.Model(&orm.NodeLiveness{}).
88                 Joins("join nodes on nodes.id = node_livenesses.node_id").
89                 Where("nodes.public_key = ?", publicKey).First(ormNodeLiveness).Error; err != nil {
90                 return err
91         }
92
93         return nil
94 }
95
96 func (m *monitor) processOfflinePeer(ormNode *orm.Node) error {
97         return m.db.Model(&orm.NodeLiveness{}).
98                 Where(&orm.NodeLiveness{NodeID: ormNode.ID}).
99                 UpdateColumn(&orm.NodeLiveness{
100                         Status: common.NodeOfflineStatus,
101                 }).Error
102 }
103
104 func (m *monitor) processPeerInfos(peerInfos []*peers.PeerInfo) error {
105         for _, peerInfo := range peerInfos {
106                 dbTx := m.db.Begin()
107                 if err := m.processPeerInfo(dbTx, peerInfo); err != nil {
108                         log.Error(err)
109                         dbTx.Rollback()
110                 } else {
111                         dbTx.Commit()
112                 }
113         }
114
115         return nil
116 }
117
118 func (m *monitor) processPeerInfo(dbTx *gorm.DB, peerInfo *peers.PeerInfo) error {
119         xPub := &chainkd.XPub{}
120         if err := xPub.UnmarshalText([]byte(peerInfo.ID)); err != nil {
121                 return err
122         }
123
124         ormNode := &orm.Node{}
125         if err := dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).First(ormNode).Error; err != nil {
126                 return err
127         }
128
129         log.Debugf("peerInfo.Ping: %v", peerInfo.Ping)
130         ping, err := time.ParseDuration(peerInfo.Ping)
131         if err != nil {
132                 log.Debugf("Parse ping time err: %v", err)
133         }
134
135         // TODO: preload?
136         ormNodeLiveness := &orm.NodeLiveness{
137                 NodeID:        ormNode.ID,
138                 BestHeight:    ormNode.BestHeight,
139                 AvgLantencyMS: sql.NullInt64{Int64: ping.Nanoseconds() / 1000, Valid: true},
140                 // PingTimes     uint64
141                 // PongTimes     uint64
142         }
143         if err := dbTx.Model(&orm.NodeLiveness{}).Where("node_id = ? AND status != ?", ormNode.ID, common.NodeOfflineStatus).
144                 UpdateColumn(&orm.NodeLiveness{
145                         BestHeight:    ormNodeLiveness.BestHeight,
146                         AvgLantencyMS: ormNodeLiveness.AvgLantencyMS,
147                 }).FirstOrCreate(ormNodeLiveness).Error; err != nil {
148                 return err
149         }
150
151         if err := dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).
152                 UpdateColumn(&orm.Node{
153                         Alias:      peerInfo.Moniker,
154                         Xpub:       peerInfo.ID,
155                         BestHeight: peerInfo.Height,
156                         // LatestDailyUptimeMinutes uint64
157                 }).First(ormNode).Error; err != nil {
158                 return err
159         }
160
161         return nil
162 }