X-Git-Url: http://git.osdn.net/view?a=blobdiff_plain;f=toolbar%2Fprecog%2Fmonitor%2Fstats.go;h=13c8818ecfddc336d74ce015a3d765e4597ef5f1;hb=e58860febe4cdc588aeab377c0c4dd658f4ba724;hp=e3fe0de7c3ade1d7aed6cd2d1cf0f3f718c786d3;hpb=a8ccbd941822183c87b5436281c2728572db10ec;p=bytom%2Fvapor.git diff --git a/toolbar/precog/monitor/stats.go b/toolbar/precog/monitor/stats.go index e3fe0de7..13c8818e 100644 --- a/toolbar/precog/monitor/stats.go +++ b/toolbar/precog/monitor/stats.go @@ -3,12 +3,13 @@ package monitor import ( "database/sql" "fmt" + "net" + "strconv" "time" "github.com/jinzhu/gorm" log "github.com/sirupsen/logrus" - "github.com/vapor/crypto/ed25519/chainkd" "github.com/vapor/netsync/peers" "github.com/vapor/p2p" "github.com/vapor/toolbar/precog/common" @@ -16,113 +17,96 @@ import ( "github.com/vapor/toolbar/precog/database/orm" ) -// create or update: https://github.com/jinzhu/gorm/issues/1307 -func (m *monitor) upSertNode(node *config.Node) error { - if node.XPub != nil { - node.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String()) +func (m *monitor) upsertNode(node *config.Node) error { + ormNode := &orm.Node{ + IP: node.IP, + Port: node.Port, } - - ormNode := &orm.Node{PublicKey: node.PublicKey} - if err := m.db.Where(&orm.Node{PublicKey: node.PublicKey}).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound { + if err := m.db.Where(ormNode).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound { return err } - if node.Alias != "" { - ormNode.Alias = node.Alias - } + ormNode.PublicKey = node.PublicKey if node.XPub != nil { ormNode.Xpub = node.XPub.String() + ormNode.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String()) } - ormNode.IP = node.IP - ormNode.Port = node.Port - return m.db.Where(&orm.Node{PublicKey: ormNode.PublicKey}). - Assign(&orm.Node{ - Xpub: ormNode.Xpub, - Alias: ormNode.Alias, - IP: ormNode.IP, - Port: ormNode.Port, - }).FirstOrCreate(ormNode).Error + return m.db.Save(ormNode).Error +} + +func parseRemoteAddr(remoteAddr string) (string, uint16, error) { + host, portStr, err := net.SplitHostPort(remoteAddr) + if err != nil { + return "", 0, err + } + + port, err := strconv.Atoi(portStr) + if err != nil { + return "", 0, err + } + + return host, uint16(port), nil } -func (m *monitor) processDialResults() error { +func (m *monitor) processDialResults(peerList []*p2p.Peer) error { var ormNodes []*orm.Node if err := m.db.Model(&orm.Node{}).Find(&ormNodes).Error; err != nil { return err } - publicKeyMap := make(map[string]*orm.Node, len(ormNodes)) + addressMap := make(map[string]*orm.Node, len(ormNodes)) for _, ormNode := range ormNodes { - publicKeyMap[ormNode.PublicKey] = ormNode + addressMap[fmt.Sprintf("%s:%d", ormNode.IP, ormNode.Port)] = ormNode } connMap := make(map[string]bool, len(ormNodes)) // connected peers - for _, peer := range m.sw.GetPeers().List() { - xPub := &chainkd.XPub{} - if err := xPub.UnmarshalText([]byte(peer.Key)); err != nil { - log.WithFields(log.Fields{"xpub": peer.Key}).Error("unmarshal xpub") - continue - } - - publicKey := xPub.PublicKey().String() - connMap[publicKey] = true - if err := m.processConnectedPeer(publicKeyMap[publicKey], peer); err != nil { - log.WithFields(log.Fields{ - "peer publicKey": publicKey, - "err": err, - }).Error("processConnectedPeer") + for _, peer := range peerList { + connMap[peer.RemoteAddr] = true + if err := m.processConnectedPeer(addressMap[peer.RemoteAddr]); err != nil { + log.WithFields(log.Fields{"peer remoteAddr": peer.RemoteAddr, "err": err}).Error("processConnectedPeer") } } // offline peers for _, ormNode := range ormNodes { - if _, ok := connMap[ormNode.PublicKey]; ok { + if _, ok := connMap[fmt.Sprintf("%s:%d", ormNode.IP, ormNode.Port)]; ok { continue } if err := m.processOfflinePeer(ormNode); err != nil { - log.WithFields(log.Fields{ - "peer publicKey": ormNode.PublicKey, - "err": err, - }).Error("processOfflinePeer") + log.WithFields(log.Fields{"peer publicKey": ormNode.PublicKey, "err": err}).Error("processOfflinePeer") } } return nil } -func (m *monitor) processConnectedPeer(ormNode *orm.Node, peer *p2p.Peer) error { - ormNodeLiveness := &orm.NodeLiveness{} - err := m.db.Model(&orm.NodeLiveness{}).Joins("join nodes on nodes.id = node_livenesses.node_id"). - Where("nodes.public_key = ? AND status != ?", ormNode.PublicKey, common.NodeOfflineStatus).Last(ormNodeLiveness).Error - if err == nil { - return m.db.Model(&orm.NodeLiveness{}).Where(ormNodeLiveness).UpdateColumn(&orm.NodeLiveness{ - PingTimes: ormNodeLiveness.PingTimes + 1, - }).Error - } else if err != gorm.ErrRecordNotFound { +func (m *monitor) processConnectedPeer(ormNode *orm.Node) error { + ormNodeLiveness := &orm.NodeLiveness{NodeID: ormNode.ID} + err := m.db.Preload("Node").Where(ormNodeLiveness).Last(ormNodeLiveness).Error + if err != nil && err != gorm.ErrRecordNotFound { return err } - // gorm.ErrRecordNotFound - return m.db.Create(&orm.NodeLiveness{ - NodeID: ormNode.ID, - PingTimes: 1, - Status: common.NodeUnknownStatus, - }).Error + ormNodeLiveness.PongTimes += 1 + if ormNode.Status == common.NodeOfflineStatus { + ormNode.Status = common.NodeUnknownStatus + } + ormNodeLiveness.Node = ormNode + return m.db.Save(ormNodeLiveness).Error } func (m *monitor) processOfflinePeer(ormNode *orm.Node) error { - return m.db.Model(&orm.NodeLiveness{}).Where(&orm.NodeLiveness{NodeID: ormNode.ID}).UpdateColumn(&orm.NodeLiveness{Status: common.NodeOfflineStatus}).Error + ormNode.Status = common.NodeOfflineStatus + return m.db.Save(ormNode).Error } func (m *monitor) processPeerInfos(peerInfos []*peers.PeerInfo) { for _, peerInfo := range peerInfos { dbTx := m.db.Begin() if err := m.processPeerInfo(dbTx, peerInfo); err != nil { - log.WithFields(log.Fields{ - "peerInfo": peerInfo, - "err": err, - }).Error("processPeerInfo") + log.WithFields(log.Fields{"peerInfo": peerInfo, "err": err}).Error("processPeerInfo") dbTx.Rollback() } else { dbTx.Commit() @@ -131,16 +115,20 @@ func (m *monitor) processPeerInfos(peerInfos []*peers.PeerInfo) { } func (m *monitor) processPeerInfo(dbTx *gorm.DB, peerInfo *peers.PeerInfo) error { - xPub := &chainkd.XPub{} - if err := xPub.UnmarshalText([]byte(peerInfo.ID)); err != nil { + ip, port, err := parseRemoteAddr(peerInfo.RemoteAddr) + if err != nil { return err } - ormNode := &orm.Node{} - if err := dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).First(ormNode).Error; err != nil { + ormNode := &orm.Node{IP: ip, Port: uint16(port)} + if err := dbTx.Where(ormNode).First(ormNode).Error; err != nil { return err } + if ormNode.Status == common.NodeOfflineStatus { + return fmt.Errorf("node %s:%d status error", ormNode.IP, ormNode.Port) + } + log.WithFields(log.Fields{"ping": peerInfo.Ping}).Debug("peerInfo") ping, err := time.ParseDuration(peerInfo.Ping) if err != nil { @@ -150,29 +138,30 @@ func (m *monitor) processPeerInfo(dbTx *gorm.DB, peerInfo *peers.PeerInfo) error now := time.Now() yesterday := now.Add(-24 * time.Hour) var ormNodeLivenesses []*orm.NodeLiveness - if err := dbTx.Model(&orm.NodeLiveness{}). + if err := dbTx.Preload("Node").Model(&orm.NodeLiveness{}). Where("node_id = ? AND updated_at >= ?", ormNode.ID, yesterday). - Order(fmt.Sprintf("created_at %s", "DESC")). - Find(&ormNodeLivenesses).Error; err != nil { + Order(fmt.Sprintf("created_at %s", "DESC")).Find(&ormNodeLivenesses).Error; err != nil { return err } // update latest liveness latestLiveness := ormNodeLivenesses[0] - if latestLiveness.Status == common.NodeOfflineStatus { - return fmt.Errorf("node %s latest liveness status error", ormNode.PublicKey) - } - - lantencyMS := ping.Nanoseconds() / 1000 - if lantencyMS != 0 { - latestLiveness.AvgLantencyMS = sql.NullInt64{ - Int64: (latestLiveness.AvgLantencyMS.Int64*int64(latestLiveness.PongTimes) + lantencyMS) / int64(latestLiveness.PongTimes+1), + rttMS := ping.Nanoseconds() / 1000000 + if rttMS > 0 && rttMS < 2000 { + ormNode.Status = common.NodeHealthyStatus + } else if rttMS > 2000 { + ormNode.Status = common.NodeCongestedStatus + } + if rttMS != 0 { + ormNode.AvgRttMS = sql.NullInt64{ + Int64: (ormNode.AvgRttMS.Int64*int64(latestLiveness.PongTimes) + rttMS) / int64(latestLiveness.PongTimes+1), Valid: true, } } latestLiveness.PongTimes += 1 if peerInfo.Height != 0 { latestLiveness.BestHeight = peerInfo.Height + ormNode.BestHeight = peerInfo.Height } if err := dbTx.Save(latestLiveness).Error; err != nil { return err @@ -188,12 +177,8 @@ func (m *monitor) processPeerInfo(dbTx *gorm.DB, peerInfo *peers.PeerInfo) error total += ormNodeLiveness.UpdatedAt.Sub(ormNodeLiveness.CreatedAt) } - - return dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}). - UpdateColumn(&orm.Node{ - Alias: peerInfo.Moniker, - Xpub: peerInfo.ID, - BestHeight: peerInfo.Height, - LatestDailyUptimeMinutes: uint64(total.Minutes()), - }).First(ormNode).Error + ormNode.LatestDailyUptimeMinutes = uint64(total.Minutes()) + ormNode.Alias = peerInfo.Moniker + ormNode.Xpub = peerInfo.ID + return dbTx.Save(ormNode).Error }