10 "github.com/jinzhu/gorm"
11 log "github.com/sirupsen/logrus"
13 "github.com/vapor/netsync/peers"
14 "github.com/vapor/p2p"
15 "github.com/vapor/toolbar/precog/common"
16 "github.com/vapor/toolbar/precog/config"
17 "github.com/vapor/toolbar/precog/database/orm"
20 func (m *monitor) upsertNode(node *config.Node) error {
25 if err := m.db.Where(ormNode).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
29 ormNode.PublicKey = node.PublicKey
31 ormNode.Xpub = node.XPub.String()
32 ormNode.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String())
34 return m.db.Save(ormNode).Error
37 func parseRemoteAddr(remoteAddr string) (string, uint16, error) {
38 host, portStr, err := net.SplitHostPort(remoteAddr)
43 port, err := strconv.Atoi(portStr)
48 return host, uint16(port), nil
51 func (m *monitor) processDialResults(peerList []*p2p.Peer) error {
52 var ormNodes []*orm.Node
53 if err := m.db.Model(&orm.Node{}).Find(&ormNodes).Error; err != nil {
57 addressMap := make(map[string]*orm.Node, len(ormNodes))
58 for _, ormNode := range ormNodes {
59 addressMap[fmt.Sprintf("%s:%d", ormNode.IP, ormNode.Port)] = ormNode
62 connMap := make(map[string]bool, len(ormNodes))
64 for _, peer := range peerList {
65 connMap[peer.RemoteAddr] = true
66 if err := m.processConnectedPeer(addressMap[peer.RemoteAddr]); err != nil {
67 log.WithFields(log.Fields{"peer remoteAddr": peer.RemoteAddr, "err": err}).Error("processConnectedPeer")
72 for _, ormNode := range ormNodes {
73 if _, ok := connMap[fmt.Sprintf("%s:%d", ormNode.IP, ormNode.Port)]; ok {
77 if err := m.processOfflinePeer(ormNode); err != nil {
78 log.WithFields(log.Fields{"peer publicKey": ormNode.PublicKey, "err": err}).Error("processOfflinePeer")
85 func (m *monitor) processConnectedPeer(ormNode *orm.Node) error {
86 ormNodeLiveness := &orm.NodeLiveness{NodeID: ormNode.ID}
87 err := m.db.Preload("Node").Where(ormNodeLiveness).Last(ormNodeLiveness).Error
88 if err != nil && err != gorm.ErrRecordNotFound {
92 ormNodeLiveness.PongTimes += 1
93 if ormNode.Status == common.NodeOfflineStatus {
94 ormNode.Status = common.NodeUnknownStatus
96 ormNodeLiveness.Node = ormNode
97 return m.db.Save(ormNodeLiveness).Error
100 func (m *monitor) processOfflinePeer(ormNode *orm.Node) error {
101 ormNode.Status = common.NodeOfflineStatus
102 return m.db.Save(ormNode).Error
105 func (m *monitor) processPeerInfos(peerInfos []*peers.PeerInfo) {
106 for _, peerInfo := range peerInfos {
108 if err := m.processPeerInfo(dbTx, peerInfo); err != nil {
109 log.WithFields(log.Fields{"peerInfo": peerInfo, "err": err}).Error("processPeerInfo")
117 func (m *monitor) processPeerInfo(dbTx *gorm.DB, peerInfo *peers.PeerInfo) error {
118 ip, port, err := parseRemoteAddr(peerInfo.RemoteAddr)
123 ormNode := &orm.Node{IP: ip, Port: uint16(port)}
124 if err := dbTx.Where(ormNode).First(ormNode).Error; err != nil {
128 if ormNode.Status == common.NodeOfflineStatus {
129 return fmt.Errorf("node %s:%d status error", ormNode.IP, ormNode.Port)
132 log.WithFields(log.Fields{"ping": peerInfo.Ping}).Debug("peerInfo")
133 ping, err := time.ParseDuration(peerInfo.Ping)
139 yesterday := now.Add(-24 * time.Hour)
140 var ormNodeLivenesses []*orm.NodeLiveness
141 if err := dbTx.Preload("Node").Model(&orm.NodeLiveness{}).
142 Where("node_id = ? AND updated_at >= ?", ormNode.ID, yesterday).
143 Order(fmt.Sprintf("created_at %s", "DESC")).Find(&ormNodeLivenesses).Error; err != nil {
147 // update latest liveness
148 latestLiveness := ormNodeLivenesses[0]
149 rttMS := ping.Nanoseconds() / 1000000
150 if rttMS > 0 && rttMS < 2000 {
151 ormNode.Status = common.NodeHealthyStatus
152 } else if rttMS > 2000 {
153 ormNode.Status = common.NodeCongestedStatus
156 ormNode.AvgRttMS = sql.NullInt64{
157 Int64: (ormNode.AvgRttMS.Int64*int64(latestLiveness.PongTimes) + rttMS) / int64(latestLiveness.PongTimes+1),
161 latestLiveness.PongTimes += 1
162 if peerInfo.Height != 0 {
163 latestLiveness.BestHeight = peerInfo.Height
164 ormNode.BestHeight = peerInfo.Height
166 if err := dbTx.Save(latestLiveness).Error; err != nil {
170 // calc LatestDailyUptimeMinutes
171 total := 0 * time.Minute
172 ormNodeLivenesses[0].UpdatedAt = now
173 for _, ormNodeLiveness := range ormNodeLivenesses {
174 if ormNodeLiveness.CreatedAt.Before(yesterday) {
175 ormNodeLiveness.CreatedAt = yesterday
178 total += ormNodeLiveness.UpdatedAt.Sub(ormNodeLiveness.CreatedAt)
180 ormNode.LatestDailyUptimeMinutes = uint64(total.Minutes())
181 ormNode.Alias = peerInfo.Moniker
182 ormNode.Xpub = peerInfo.ID
183 return dbTx.Save(ormNode).Error