OSDN Git Service

fix join
[bytom/vapor.git] / toolbar / precog / monitor / stats.go
1 package monitor
2
3 import (
4         "database/sql"
5         "fmt"
6         "time"
7
8         "github.com/jinzhu/gorm"
9         log "github.com/sirupsen/logrus"
10
11         "github.com/vapor/crypto/ed25519/chainkd"
12         "github.com/vapor/netsync/peers"
13         "github.com/vapor/p2p"
14         "github.com/vapor/toolbar/precog/common"
15         "github.com/vapor/toolbar/precog/config"
16         "github.com/vapor/toolbar/precog/database/orm"
17 )
18
19 // create or update: https://github.com/jinzhu/gorm/issues/1307
20 func (m *monitor) upSertNode(node *config.Node) error {
21         if node.XPub != nil {
22                 node.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String())
23         }
24
25         ormNode := &orm.Node{PublicKey: node.PublicKey}
26         if err := m.db.Where(&orm.Node{PublicKey: node.PublicKey}).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
27                 return err
28         }
29
30         if node.Alias != "" {
31                 ormNode.Alias = node.Alias
32         }
33         if node.XPub != nil {
34                 ormNode.Xpub = node.XPub.String()
35         }
36         ormNode.Host = node.Host
37         ormNode.Port = node.Port
38         return m.db.Where(&orm.Node{PublicKey: ormNode.PublicKey}).
39                 Assign(&orm.Node{
40                         Xpub:  ormNode.Xpub,
41                         Alias: ormNode.Alias,
42                         Host:  ormNode.Host,
43                         Port:  ormNode.Port,
44                 }).FirstOrCreate(ormNode).Error
45 }
46
47 func (m *monitor) processDialResults() error {
48         for _, peer := range m.sw.GetPeers().List() {
49                 if err := m.processDialResult(peer); err != nil {
50                         log.Error(err)
51                 }
52         }
53         return nil
54 }
55
56 // TODO: add start time here
57 func (m *monitor) processDialResult(peer *p2p.Peer) error {
58         xPub := &chainkd.XPub{}
59         if err := xPub.UnmarshalText([]byte(peer.Key)); err != nil {
60                 return err
61         }
62
63         ormNodeLiveness := &orm.NodeLiveness{}
64         if err := m.db.Model(&orm.NodeLiveness{}).
65                 Joins("join nodes on nodes.id = node_livenesses.node_id").
66                 Where("nodes.public_key = ?", xPub.PublicKey().String()).First(ormNodeLiveness).Error; err != nil {
67                 return err
68         }
69
70         return nil
71 }
72
73 func (m *monitor) processPeerInfos(peerInfos []*peers.PeerInfo) error {
74         for _, peerInfo := range peerInfos {
75                 dbTx := m.db.Begin()
76                 if err := m.processPeerInfo(dbTx, peerInfo); err != nil {
77                         log.Error(err)
78                         dbTx.Rollback()
79                 } else {
80                         dbTx.Commit()
81                 }
82         }
83
84         return nil
85 }
86
87 func (m *monitor) processPeerInfo(dbTx *gorm.DB, peerInfo *peers.PeerInfo) error {
88         xPub := &chainkd.XPub{}
89         if err := xPub.UnmarshalText([]byte(peerInfo.ID)); err != nil {
90                 return err
91         }
92
93         ormNode := &orm.Node{}
94         if err := dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).First(ormNode).Error; err != nil {
95                 return err
96         }
97
98         log.Debugf("peerInfo.Ping: %v", peerInfo.Ping)
99         ping, err := time.ParseDuration(peerInfo.Ping)
100         if err != nil {
101                 log.Debugf("Parse ping time err: %v", err)
102         }
103
104         // TODO: preload?
105         ormNodeLiveness := &orm.NodeLiveness{
106                 NodeID:        ormNode.ID,
107                 BestHeight:    ormNode.BestHeight,
108                 AvgLantencyMS: sql.NullInt64{Int64: ping.Nanoseconds() / 1000, Valid: true},
109                 // PingTimes     uint64
110                 // PongTimes     uint64
111         }
112         if err := dbTx.Model(&orm.NodeLiveness{}).Where("node_id = ? AND status != ?", ormNode.ID, common.NodeOfflineStatus).
113                 UpdateColumn(&orm.NodeLiveness{
114                         BestHeight:    ormNodeLiveness.BestHeight,
115                         AvgLantencyMS: ormNodeLiveness.AvgLantencyMS,
116                 }).FirstOrCreate(ormNodeLiveness).Error; err != nil {
117                 return err
118         }
119
120         if err := dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).
121                 UpdateColumn(&orm.Node{
122                         Alias:      peerInfo.Moniker,
123                         Xpub:       peerInfo.ID,
124                         BestHeight: peerInfo.Height,
125                         // LatestDailyUptimeMinutes uint64
126                 }).First(ormNode).Error; err != nil {
127                 return err
128         }
129
130         return nil
131 }