OSDN Git Service

fix
[bytom/vapor.git] / toolbar / precog / monitor / stats.go
index 0ea4103..13c8818 100644 (file)
@@ -3,80 +3,182 @@ package monitor
 import (
        "database/sql"
        "fmt"
+       "net"
+       "strconv"
+       "time"
 
        "github.com/jinzhu/gorm"
        log "github.com/sirupsen/logrus"
 
-       "github.com/vapor/crypto/ed25519/chainkd"
        "github.com/vapor/netsync/peers"
+       "github.com/vapor/p2p"
        "github.com/vapor/toolbar/precog/common"
        "github.com/vapor/toolbar/precog/config"
        "github.com/vapor/toolbar/precog/database/orm"
 )
 
-// TODO: get lantency
-// TODO: get best_height
-// TODO: decide check_height("best best_height" - "confirmations")
-// TODO: get blockhash by check_height, get latency
-// TODO: update lantency, active_time and status
+func (m *monitor) upsertNode(node *config.Node) error {
+       ormNode := &orm.Node{
+               IP:   node.IP,
+               Port: node.Port,
+       }
+       if err := m.db.Where(ormNode).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
+               return err
+       }
 
-// create or update: https://github.com/jinzhu/gorm/issues/1307
-func (m *monitor) upSertNode(node *config.Node) error {
+       ormNode.PublicKey = node.PublicKey
        if node.XPub != nil {
-               node.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String())
+               ormNode.Xpub = node.XPub.String()
+               ormNode.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String())
        }
+       return m.db.Save(ormNode).Error
+}
+
+func parseRemoteAddr(remoteAddr string) (string, uint16, error) {
+       host, portStr, err := net.SplitHostPort(remoteAddr)
+       if err != nil {
+               return "", 0, err
+       }
+
+       port, err := strconv.Atoi(portStr)
+       if err != nil {
+               return "", 0, err
+       }
+
+       return host, uint16(port), nil
+}
 
-       ormNode := &orm.Node{PublicKey: node.PublicKey}
-       if err := m.db.Where(&orm.Node{PublicKey: node.PublicKey}).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
+func (m *monitor) processDialResults(peerList []*p2p.Peer) error {
+       var ormNodes []*orm.Node
+       if err := m.db.Model(&orm.Node{}).Find(&ormNodes).Error; err != nil {
                return err
        }
 
-       if node.Alias != "" {
-               ormNode.Alias = node.Alias
+       addressMap := make(map[string]*orm.Node, len(ormNodes))
+       for _, ormNode := range ormNodes {
+               addressMap[fmt.Sprintf("%s:%d", ormNode.IP, ormNode.Port)] = ormNode
        }
-       if node.XPub != nil {
-               ormNode.Xpub = node.XPub.String()
+
+       connMap := make(map[string]bool, len(ormNodes))
+       // connected peers
+       for _, peer := range peerList {
+               connMap[peer.RemoteAddr] = true
+               if err := m.processConnectedPeer(addressMap[peer.RemoteAddr]); err != nil {
+                       log.WithFields(log.Fields{"peer remoteAddr": peer.RemoteAddr, "err": err}).Error("processConnectedPeer")
+               }
+       }
+
+       // offline peers
+       for _, ormNode := range ormNodes {
+               if _, ok := connMap[fmt.Sprintf("%s:%d", ormNode.IP, ormNode.Port)]; ok {
+                       continue
+               }
+
+               if err := m.processOfflinePeer(ormNode); err != nil {
+                       log.WithFields(log.Fields{"peer publicKey": ormNode.PublicKey, "err": err}).Error("processOfflinePeer")
+               }
        }
-       ormNode.Host = node.Host
-       ormNode.Port = node.Port
-       return m.db.Where(&orm.Node{PublicKey: ormNode.PublicKey}).
-               Assign(&orm.Node{
-                       Xpub:  ormNode.Xpub,
-                       Alias: ormNode.Alias,
-                       Host:  ormNode.Host,
-                       Port:  ormNode.Port,
-               }).FirstOrCreate(ormNode).Error
+
+       return nil
 }
 
-func (m *monitor) savePeerInfo(peerInfo *peers.PeerInfo) error {
-       xPub := &chainkd.XPub{}
-       if err := xPub.UnmarshalText([]byte(peerInfo.ID)); err != nil {
+func (m *monitor) processConnectedPeer(ormNode *orm.Node) error {
+       ormNodeLiveness := &orm.NodeLiveness{NodeID: ormNode.ID}
+       err := m.db.Preload("Node").Where(ormNodeLiveness).Last(ormNodeLiveness).Error
+       if err != nil && err != gorm.ErrRecordNotFound {
                return err
        }
 
-       ormNode := &orm.Node{}
-       if err := m.db.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).
-               UpdateColumn(&orm.Node{
-                       Alias:      peerInfo.Moniker,
-                       Xpub:       peerInfo.ID,
-                       BestHeight: peerInfo.Height,
-                       // LatestDailyUptimeMinutes uint64
-               }).First(ormNode).Error; err != nil {
+       ormNodeLiveness.PongTimes += 1
+       if ormNode.Status == common.NodeOfflineStatus {
+               ormNode.Status = common.NodeUnknownStatus
+       }
+       ormNodeLiveness.Node = ormNode
+       return m.db.Save(ormNodeLiveness).Error
+}
+
+func (m *monitor) processOfflinePeer(ormNode *orm.Node) error {
+       ormNode.Status = common.NodeOfflineStatus
+       return m.db.Save(ormNode).Error
+}
+
+func (m *monitor) processPeerInfos(peerInfos []*peers.PeerInfo) {
+       for _, peerInfo := range peerInfos {
+               dbTx := m.db.Begin()
+               if err := m.processPeerInfo(dbTx, peerInfo); err != nil {
+                       log.WithFields(log.Fields{"peerInfo": peerInfo, "err": err}).Error("processPeerInfo")
+                       dbTx.Rollback()
+               } else {
+                       dbTx.Commit()
+               }
+       }
+}
+
+func (m *monitor) processPeerInfo(dbTx *gorm.DB, peerInfo *peers.PeerInfo) error {
+       ip, port, err := parseRemoteAddr(peerInfo.RemoteAddr)
+       if err != nil {
+               return err
+       }
+
+       ormNode := &orm.Node{IP: ip, Port: uint16(port)}
+       if err := dbTx.Where(ormNode).First(ormNode).Error; err != nil {
+               return err
+       }
+
+       if ormNode.Status == common.NodeOfflineStatus {
+               return fmt.Errorf("node %s:%d status error", ormNode.IP, ormNode.Port)
+       }
+
+       log.WithFields(log.Fields{"ping": peerInfo.Ping}).Debug("peerInfo")
+       ping, err := time.ParseDuration(peerInfo.Ping)
+       if err != nil {
+               return err
+       }
+
+       now := time.Now()
+       yesterday := now.Add(-24 * time.Hour)
+       var ormNodeLivenesses []*orm.NodeLiveness
+       if err := dbTx.Preload("Node").Model(&orm.NodeLiveness{}).
+               Where("node_id = ? AND updated_at >= ?", ormNode.ID, yesterday).
+               Order(fmt.Sprintf("created_at %s", "DESC")).Find(&ormNodeLivenesses).Error; err != nil {
+               return err
+       }
+
+       // update latest liveness
+       latestLiveness := ormNodeLivenesses[0]
+       rttMS := ping.Nanoseconds() / 1000000
+       if rttMS > 0 && rttMS < 2000 {
+               ormNode.Status = common.NodeHealthyStatus
+       } else if rttMS > 2000 {
+               ormNode.Status = common.NodeCongestedStatus
+       }
+       if rttMS != 0 {
+               ormNode.AvgRttMS = sql.NullInt64{
+                       Int64: (ormNode.AvgRttMS.Int64*int64(latestLiveness.PongTimes) + rttMS) / int64(latestLiveness.PongTimes+1),
+                       Valid: true,
+               }
+       }
+       latestLiveness.PongTimes += 1
+       if peerInfo.Height != 0 {
+               latestLiveness.BestHeight = peerInfo.Height
+               ormNode.BestHeight = peerInfo.Height
+       }
+       if err := dbTx.Save(latestLiveness).Error; err != nil {
                return err
        }
 
-       log.Debug("peerInfo.Ping:", peerInfo.Ping)
+       // calc LatestDailyUptimeMinutes
+       total := 0 * time.Minute
+       ormNodeLivenesses[0].UpdatedAt = now
+       for _, ormNodeLiveness := range ormNodeLivenesses {
+               if ormNodeLiveness.CreatedAt.Before(yesterday) {
+                       ormNodeLiveness.CreatedAt = yesterday
+               }
 
-       ormNodeLiveness := &orm.NodeLiveness{
-               NodeID:        ormNode.ID,
-               BestHeight:    ormNode.BestHeight,
-               AvgLantencyMS: sql.NullInt64{Int64: 1, Valid: true},
-               // PingTimes     uint64
-               // PongTimes     uint64
+               total += ormNodeLiveness.UpdatedAt.Sub(ormNodeLiveness.CreatedAt)
        }
-       return m.db.Model(&orm.NodeLiveness{}).Where("node_id = ? AND status != ?", ormNode.ID, common.NodeOfflineStatus).
-               UpdateColumn(&orm.NodeLiveness{
-                       BestHeight:    ormNodeLiveness.BestHeight,
-                       AvgLantencyMS: ormNodeLiveness.AvgLantencyMS,
-               }).FirstOrCreate(ormNodeLiveness).Error
+       ormNode.LatestDailyUptimeMinutes = uint64(total.Minutes())
+       ormNode.Alias = peerInfo.Moniker
+       ormNode.Xpub = peerInfo.ID
+       return dbTx.Save(ormNode).Error
 }