OSDN Git Service

clean
[bytom/vapor.git] / toolbar / precog / monitor / stats.go
index 44ae8b1..f16b09e 100644 (file)
@@ -10,40 +10,33 @@ import (
 
        "github.com/vapor/crypto/ed25519/chainkd"
        "github.com/vapor/netsync/peers"
-       "github.com/vapor/p2p"
        "github.com/vapor/toolbar/precog/common"
        "github.com/vapor/toolbar/precog/config"
        "github.com/vapor/toolbar/precog/database/orm"
 )
 
 // create or update: https://github.com/jinzhu/gorm/issues/1307
-func (m *monitor) upSertNode(node *config.Node) error {
+func (m *monitor) upsertNode(node *config.Node) error {
        if node.XPub != nil {
                node.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String())
        }
 
-       ormNode := &orm.Node{PublicKey: node.PublicKey}
-       if err := m.db.Where(&orm.Node{PublicKey: node.PublicKey}).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
+       ormNode := &orm.Node{
+               IP:   node.IP,
+               Port: node.Port,
+       }
+       if err := m.db.Where(ormNode).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
                return err
        }
 
-       if node.Alias != "" {
-               ormNode.Alias = node.Alias
-       }
+       ormNode.PublicKey = node.PublicKey
        if node.XPub != nil {
                ormNode.Xpub = node.XPub.String()
        }
-       ormNode.IP = node.IP
-       ormNode.Port = node.Port
-       return m.db.Where(&orm.Node{PublicKey: ormNode.PublicKey}).
-               Assign(&orm.Node{
-                       Xpub:  ormNode.Xpub,
-                       Alias: ormNode.Alias,
-                       IP:    ormNode.IP,
-                       Port:  ormNode.Port,
-               }).FirstOrCreate(ormNode).Error
+       return m.db.Save(ormNode).Error
 }
 
+// TODO: maybe return connected nodes here for checkStatus
 func (m *monitor) processDialResults() error {
        var ormNodes []*orm.Node
        if err := m.db.Model(&orm.Node{}).Find(&ormNodes).Error; err != nil {
@@ -60,14 +53,14 @@ func (m *monitor) processDialResults() error {
        for _, peer := range m.sw.GetPeers().List() {
                xPub := &chainkd.XPub{}
                if err := xPub.UnmarshalText([]byte(peer.Key)); err != nil {
-                       log.Error(err)
+                       log.WithFields(log.Fields{"xpub": peer.Key}).Error("unmarshal xpub")
                        continue
                }
 
                publicKey := xPub.PublicKey().String()
                connMap[publicKey] = true
-               if err := m.processConnectedPeer(publicKeyMap[publicKey], peer); err != nil {
-                       log.Error(err)
+               if err := m.processConnectedPeer(publicKeyMap[publicKey]); err != nil {
+                       log.WithFields(log.Fields{"peer publicKey": publicKey, "err": err}).Error("processConnectedPeer")
                }
        }
 
@@ -78,53 +71,43 @@ func (m *monitor) processDialResults() error {
                }
 
                if err := m.processOfflinePeer(ormNode); err != nil {
-                       log.Error(err)
+                       log.WithFields(log.Fields{"peer publicKey": ormNode.PublicKey, "err": err}).Error("processOfflinePeer")
                }
        }
 
        return nil
 }
 
-func (m *monitor) processConnectedPeer(ormNode *orm.Node, peer *p2p.Peer) error {
-       ormNodeLiveness := &orm.NodeLiveness{}
-       err := m.db.Model(&orm.NodeLiveness{}).Joins("join nodes on nodes.id = node_livenesses.node_id").
-               Where("nodes.public_key = ? AND status != ?", ormNode.PublicKey, common.NodeOfflineStatus).Last(ormNodeLiveness).Error
-       if err == nil {
-               return m.db.Model(&orm.NodeLiveness{}).Where(ormNodeLiveness).UpdateColumn(&orm.NodeLiveness{
-                       PingTimes: ormNodeLiveness.PingTimes + 1,
-               }).Error
-       } else if err != gorm.ErrRecordNotFound {
+func (m *monitor) processConnectedPeer(ormNode *orm.Node) error {
+       ormNodeLiveness := &orm.NodeLiveness{NodeID: ormNode.ID}
+       err := m.db.Preload("Node").Where(ormNodeLiveness).Last(ormNodeLiveness).Error
+       if err != nil && err != gorm.ErrRecordNotFound {
                return err
        }
 
-       // gorm.ErrRecordNotFound
-       return m.db.Create(&orm.NodeLiveness{
-               NodeID:    ormNode.ID,
-               PingTimes: 1,
-               Status:    common.NodeUnknownStatus,
-       }).Error
+       ormNodeLiveness.PongTimes += 1
+       if ormNode.Status == common.NodeOfflineStatus {
+               ormNode.Status = common.NodeUnknownStatus
+       }
+       ormNodeLiveness.Node = ormNode
+       return m.db.Save(ormNodeLiveness).Error
 }
 
 func (m *monitor) processOfflinePeer(ormNode *orm.Node) error {
-       return m.db.Model(&orm.NodeLiveness{}).
-               Where(&orm.NodeLiveness{NodeID: ormNode.ID}).
-               UpdateColumn(&orm.NodeLiveness{
-                       Status: common.NodeOfflineStatus,
-               }).Error
+       ormNode.Status = common.NodeOfflineStatus
+       return m.db.Save(ormNode).Error
 }
 
-func (m *monitor) processPeerInfos(peerInfos []*peers.PeerInfo) error {
+func (m *monitor) processPeerInfos(peerInfos []*peers.PeerInfo) {
        for _, peerInfo := range peerInfos {
                dbTx := m.db.Begin()
                if err := m.processPeerInfo(dbTx, peerInfo); err != nil {
-                       log.Error(err)
+                       log.WithFields(log.Fields{"peerInfo": peerInfo, "err": err}).Error("processPeerInfo")
                        dbTx.Rollback()
                } else {
                        dbTx.Commit()
                }
        }
-
-       return nil
 }
 
 func (m *monitor) processPeerInfo(dbTx *gorm.DB, peerInfo *peers.PeerInfo) error {
@@ -138,7 +121,11 @@ func (m *monitor) processPeerInfo(dbTx *gorm.DB, peerInfo *peers.PeerInfo) error
                return err
        }
 
-       log.Debugf("peerInfo ping: %v", peerInfo.Ping)
+       if ormNode.Status == common.NodeOfflineStatus {
+               return fmt.Errorf("node %s status error", ormNode.PublicKey)
+       }
+
+       log.WithFields(log.Fields{"ping": peerInfo.Ping}).Debug("peerInfo")
        ping, err := time.ParseDuration(peerInfo.Ping)
        if err != nil {
                return err
@@ -147,7 +134,7 @@ func (m *monitor) processPeerInfo(dbTx *gorm.DB, peerInfo *peers.PeerInfo) error
        now := time.Now()
        yesterday := now.Add(-24 * time.Hour)
        var ormNodeLivenesses []*orm.NodeLiveness
-       if err := dbTx.Model(&orm.NodeLiveness{}).
+       if err := dbTx.Preload("Node").Model(&orm.NodeLiveness{}).
                Where("node_id = ? AND updated_at >= ?", ormNode.ID, yesterday).
                Order(fmt.Sprintf("created_at %s", "DESC")).
                Find(&ormNodeLivenesses).Error; err != nil {
@@ -156,14 +143,10 @@ func (m *monitor) processPeerInfo(dbTx *gorm.DB, peerInfo *peers.PeerInfo) error
 
        // update latest liveness
        latestLiveness := ormNodeLivenesses[0]
-       if latestLiveness.Status == common.NodeOfflineStatus {
-               return fmt.Errorf("node %s latest liveness status error", ormNode.PublicKey)
-       }
-
        lantencyMS := ping.Nanoseconds() / 1000
        if lantencyMS != 0 {
-               latestLiveness.AvgLantencyMS = sql.NullInt64{
-                       Int64: (latestLiveness.AvgLantencyMS.Int64*int64(latestLiveness.PongTimes) + lantencyMS) / int64(latestLiveness.PongTimes+1),
+               ormNode.AvgLantencyMS = sql.NullInt64{
+                       Int64: (ormNode.AvgLantencyMS.Int64*int64(latestLiveness.PongTimes) + lantencyMS) / int64(latestLiveness.PongTimes+1),
                        Valid: true,
                }
        }