8 "github.com/jinzhu/gorm"
9 log "github.com/sirupsen/logrus"
11 "github.com/vapor/crypto/ed25519/chainkd"
12 "github.com/vapor/netsync/peers"
13 "github.com/vapor/p2p"
14 "github.com/vapor/toolbar/precog/common"
15 "github.com/vapor/toolbar/precog/config"
16 "github.com/vapor/toolbar/precog/database/orm"
19 // create or update: https://github.com/jinzhu/gorm/issues/1307
20 func (m *monitor) upsertNode(node *config.Node) error {
22 node.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String())
25 ormNode := &orm.Node{PublicKey: node.PublicKey}
26 if err := m.db.Where(&orm.Node{PublicKey: node.PublicKey}).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
31 ormNode.Xpub = node.XPub.String()
34 ormNode.Port = node.Port
35 return m.db.Where(&orm.Node{PublicKey: ormNode.PublicKey}).
40 }).FirstOrCreate(ormNode).Error
43 func (m *monitor) processDialResults() error {
44 var ormNodes []*orm.Node
45 if err := m.db.Model(&orm.Node{}).Find(&ormNodes).Error; err != nil {
49 publicKeyMap := make(map[string]*orm.Node, len(ormNodes))
50 for _, ormNode := range ormNodes {
51 publicKeyMap[ormNode.PublicKey] = ormNode
54 connMap := make(map[string]bool, len(ormNodes))
56 for _, peer := range m.sw.GetPeers().List() {
57 xPub := &chainkd.XPub{}
58 if err := xPub.UnmarshalText([]byte(peer.Key)); err != nil {
59 log.WithFields(log.Fields{"xpub": peer.Key}).Error("unmarshal xpub")
63 publicKey := xPub.PublicKey().String()
64 connMap[publicKey] = true
65 if err := m.processConnectedPeer(publicKeyMap[publicKey], peer); err != nil {
66 log.WithFields(log.Fields{"peer publicKey": publicKey, "err": err}).Error("processConnectedPeer")
71 for _, ormNode := range ormNodes {
72 if _, ok := connMap[ormNode.PublicKey]; ok {
76 if err := m.processOfflinePeer(ormNode); err != nil {
77 log.WithFields(log.Fields{"peer publicKey": ormNode.PublicKey, "err": err}).Error("processOfflinePeer")
84 func (m *monitor) processConnectedPeer(ormNode *orm.Node, peer *p2p.Peer) error {
85 ormNodeLiveness := &orm.NodeLiveness{}
86 err := m.db.Model(&orm.NodeLiveness{}).Joins("join nodes on nodes.id = node_livenesses.node_id").
87 Where("nodes.public_key = ? AND status != ?", ormNode.PublicKey, common.NodeOfflineStatus).Last(ormNodeLiveness).Error
89 return m.db.Model(&orm.NodeLiveness{}).Where(ormNodeLiveness).UpdateColumn(&orm.NodeLiveness{
90 PingTimes: ormNodeLiveness.PingTimes + 1,
92 } else if err != gorm.ErrRecordNotFound {
96 // gorm.ErrRecordNotFound
97 return m.db.Create(&orm.NodeLiveness{
100 Status: common.NodeUnknownStatus,
104 func (m *monitor) processOfflinePeer(ormNode *orm.Node) error {
105 return m.db.Model(&orm.NodeLiveness{}).Where(&orm.NodeLiveness{NodeID: ormNode.ID}).UpdateColumn(&orm.NodeLiveness{Status: common.NodeOfflineStatus}).Error
108 func (m *monitor) processPeerInfos(peerInfos []*peers.PeerInfo) {
109 for _, peerInfo := range peerInfos {
111 if err := m.processPeerInfo(dbTx, peerInfo); err != nil {
112 log.WithFields(log.Fields{"peerInfo": peerInfo, "err": err}).Error("processPeerInfo")
120 func (m *monitor) processPeerInfo(dbTx *gorm.DB, peerInfo *peers.PeerInfo) error {
121 xPub := &chainkd.XPub{}
122 if err := xPub.UnmarshalText([]byte(peerInfo.ID)); err != nil {
126 ormNode := &orm.Node{}
127 if err := dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).First(ormNode).Error; err != nil {
131 log.WithFields(log.Fields{"ping": peerInfo.Ping}).Debug("peerInfo")
132 ping, err := time.ParseDuration(peerInfo.Ping)
138 yesterday := now.Add(-24 * time.Hour)
139 var ormNodeLivenesses []*orm.NodeLiveness
140 if err := dbTx.Model(&orm.NodeLiveness{}).
141 Where("node_id = ? AND updated_at >= ?", ormNode.ID, yesterday).
142 Order(fmt.Sprintf("created_at %s", "DESC")).
143 Find(&ormNodeLivenesses).Error; err != nil {
147 // update latest liveness
148 latestLiveness := ormNodeLivenesses[0]
149 if latestLiveness.Status == common.NodeOfflineStatus {
150 return fmt.Errorf("node %s latest liveness status error", ormNode.PublicKey)
153 lantencyMS := ping.Nanoseconds() / 1000
155 latestLiveness.AvgLantencyMS = sql.NullInt64{
156 Int64: (latestLiveness.AvgLantencyMS.Int64*int64(latestLiveness.PongTimes) + lantencyMS) / int64(latestLiveness.PongTimes+1),
160 latestLiveness.PongTimes += 1
161 if peerInfo.Height != 0 {
162 latestLiveness.BestHeight = peerInfo.Height
164 if err := dbTx.Save(latestLiveness).Error; err != nil {
168 // calc LatestDailyUptimeMinutes
169 total := 0 * time.Minute
170 ormNodeLivenesses[0].UpdatedAt = now
171 for _, ormNodeLiveness := range ormNodeLivenesses {
172 if ormNodeLiveness.CreatedAt.Before(yesterday) {
173 ormNodeLiveness.CreatedAt = yesterday
176 total += ormNodeLiveness.UpdatedAt.Sub(ormNodeLiveness.CreatedAt)
179 return dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).
180 UpdateColumn(&orm.Node{
181 Alias: peerInfo.Moniker,
183 BestHeight: peerInfo.Height,
184 LatestDailyUptimeMinutes: uint64(total.Minutes()),
185 }).First(ormNode).Error