8 "github.com/jinzhu/gorm"
9 log "github.com/sirupsen/logrus"
11 "github.com/vapor/crypto/ed25519/chainkd"
12 "github.com/vapor/netsync/peers"
13 "github.com/vapor/p2p"
14 "github.com/vapor/toolbar/precog/common"
15 "github.com/vapor/toolbar/precog/config"
16 "github.com/vapor/toolbar/precog/database/orm"
19 func (m *monitor) upsertNode(node *config.Node) error {
24 if err := m.db.Where(ormNode).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
28 ormNode.PublicKey = node.PublicKey
30 ormNode.Xpub = node.XPub.String()
31 ormNode.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String())
33 return m.db.Save(ormNode).Error
36 func (m *monitor) processDialResults(peerList []*p2p.Peer) error {
37 var ormNodes []*orm.Node
38 if err := m.db.Model(&orm.Node{}).Find(&ormNodes).Error; err != nil {
42 addressMap := make(map[string]*orm.Node, len(ormNodes))
43 for _, ormNode := range ormNodes {
44 addressMap[fmt.Sprintf("%s:%d", ormNode.IP, ormNode.Port)] = ormNode
47 connMap := make(map[string]bool, len(ormNodes))
49 for _, peer := range peerList {
50 connMap[peer.ListenAddr] = true
51 if err := m.processConnectedPeer(addressMap[peer.ListenAddr]); err != nil {
52 log.WithFields(log.Fields{"peer listenAddr": peer.ListenAddr, "err": err}).Error("processConnectedPeer")
57 for _, ormNode := range ormNodes {
58 if _, ok := connMap[peer.ListenAddr]; ok {
62 if err := m.processOfflinePeer(ormNode); err != nil {
63 log.WithFields(log.Fields{"peer publicKey": ormNode.PublicKey, "err": err}).Error("processOfflinePeer")
70 func (m *monitor) processConnectedPeer(ormNode *orm.Node) error {
71 ormNodeLiveness := &orm.NodeLiveness{NodeID: ormNode.ID}
72 err := m.db.Preload("Node").Where(ormNodeLiveness).Last(ormNodeLiveness).Error
73 if err != nil && err != gorm.ErrRecordNotFound {
77 ormNodeLiveness.PongTimes += 1
78 if ormNode.Status == common.NodeOfflineStatus {
79 ormNode.Status = common.NodeUnknownStatus
81 ormNodeLiveness.Node = ormNode
82 return m.db.Save(ormNodeLiveness).Error
85 func (m *monitor) processOfflinePeer(ormNode *orm.Node) error {
86 ormNode.Status = common.NodeOfflineStatus
87 return m.db.Save(ormNode).Error
90 func (m *monitor) processPeerInfos(peerInfos []*peers.PeerInfo) {
91 for _, peerInfo := range peerInfos {
93 if err := m.processPeerInfo(dbTx, peerInfo); err != nil {
94 log.WithFields(log.Fields{"peerInfo": peerInfo, "err": err}).Error("processPeerInfo")
102 func (m *monitor) processPeerInfo(dbTx *gorm.DB, peerInfo *peers.PeerInfo) error {
103 xPub := &chainkd.XPub{}
104 if err := xPub.UnmarshalText([]byte(peerInfo.ID)); err != nil {
108 ormNode := &orm.Node{}
109 if err := dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).First(ormNode).Error; err != nil {
113 if ormNode.Status == common.NodeOfflineStatus {
114 return fmt.Errorf("node %s status error", ormNode.PublicKey)
117 log.WithFields(log.Fields{"ping": peerInfo.Ping}).Debug("peerInfo")
118 ping, err := time.ParseDuration(peerInfo.Ping)
124 yesterday := now.Add(-24 * time.Hour)
125 var ormNodeLivenesses []*orm.NodeLiveness
126 if err := dbTx.Preload("Node").Model(&orm.NodeLiveness{}).
127 Where("node_id = ? AND updated_at >= ?", ormNode.ID, yesterday).
128 Order(fmt.Sprintf("created_at %s", "DESC")).
129 Find(&ormNodeLivenesses).Error; err != nil {
133 // update latest liveness
134 latestLiveness := ormNodeLivenesses[0]
135 lantencyMS := ping.Nanoseconds() / 1000
137 ormNode.AvgLantencyMS = sql.NullInt64{
138 Int64: (ormNode.AvgLantencyMS.Int64*int64(latestLiveness.PongTimes) + lantencyMS) / int64(latestLiveness.PongTimes+1),
142 latestLiveness.PongTimes += 1
143 if peerInfo.Height != 0 {
144 latestLiveness.BestHeight = peerInfo.Height
146 if err := dbTx.Save(latestLiveness).Error; err != nil {
150 // calc LatestDailyUptimeMinutes
151 total := 0 * time.Minute
152 ormNodeLivenesses[0].UpdatedAt = now
153 for _, ormNodeLiveness := range ormNodeLivenesses {
154 if ormNodeLiveness.CreatedAt.Before(yesterday) {
155 ormNodeLiveness.CreatedAt = yesterday
158 total += ormNodeLiveness.UpdatedAt.Sub(ormNodeLiveness.CreatedAt)
161 return dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).
162 UpdateColumn(&orm.Node{
163 Alias: peerInfo.Moniker,
165 BestHeight: peerInfo.Height,
166 LatestDailyUptimeMinutes: uint64(total.Minutes()),
167 }).First(ormNode).Error