8 "github.com/jinzhu/gorm"
9 log "github.com/sirupsen/logrus"
11 "github.com/vapor/crypto/ed25519/chainkd"
12 "github.com/vapor/netsync/peers"
13 "github.com/vapor/p2p"
14 "github.com/vapor/toolbar/precog/common"
15 "github.com/vapor/toolbar/precog/config"
16 "github.com/vapor/toolbar/precog/database/orm"
19 // create or update: https://github.com/jinzhu/gorm/issues/1307
20 func (m *monitor) upSertNode(node *config.Node) error {
22 node.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String())
25 ormNode := &orm.Node{PublicKey: node.PublicKey}
26 if err := m.db.Where(&orm.Node{PublicKey: node.PublicKey}).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
31 ormNode.Alias = node.Alias
34 ormNode.Xpub = node.XPub.String()
36 ormNode.Host = node.Host
37 ormNode.Port = node.Port
38 return m.db.Where(&orm.Node{PublicKey: ormNode.PublicKey}).
44 }).FirstOrCreate(ormNode).Error
47 func (m *monitor) processDialResults() error {
48 var ormNodes []*orm.Node
49 if err := m.db.Model(&orm.Node{}).Find(&ormNodes).Error; err != nil {
53 publicKeyMap := make(map[string]*orm.Node, len(ormNodes))
54 for _, ormNode := range ormNodes {
55 publicKeyMap[ormNode.PublicKey] = ormNode
58 connMap := make(map[string]bool, len(ormNodes))
60 for _, peer := range m.sw.GetPeers().List() {
61 xPub := &chainkd.XPub{}
62 if err := xPub.UnmarshalText([]byte(peer.Key)); err != nil {
67 publicKey := xPub.PublicKey().String()
68 connMap[publicKey] = true
69 if err := m.processConnectedPeer(publicKeyMap[publicKey], peer); err != nil {
75 for _, ormNode := range ormNodes {
76 if _, ok := connMap[ormNode.PublicKey]; ok {
80 if err := m.processOfflinePeer(ormNode); err != nil {
88 func (m *monitor) processConnectedPeer(ormNode *orm.Node, peer *p2p.Peer) error {
89 ormNodeLiveness := &orm.NodeLiveness{}
90 err := m.db.Model(&orm.NodeLiveness{}).Joins("join nodes on nodes.id = node_livenesses.node_id").
91 Where("nodes.public_key = ? AND status != ?", ormNode.PublicKey, common.NodeOfflineStatus).Last(ormNodeLiveness).Error
93 return m.db.Model(&orm.NodeLiveness{}).Where(ormNodeLiveness).UpdateColumn(&orm.NodeLiveness{
94 PingTimes: ormNodeLiveness.PingTimes + 1,
96 } else if err != gorm.ErrRecordNotFound {
100 // gorm.ErrRecordNotFound
101 return m.db.Create(&orm.NodeLiveness{
104 Status: common.NodeUnknownStatus,
108 func (m *monitor) processOfflinePeer(ormNode *orm.Node) error {
109 return m.db.Model(&orm.NodeLiveness{}).
110 Where(&orm.NodeLiveness{NodeID: ormNode.ID}).
111 UpdateColumn(&orm.NodeLiveness{
112 Status: common.NodeOfflineStatus,
116 func (m *monitor) processPeerInfos(peerInfos []*peers.PeerInfo) error {
117 for _, peerInfo := range peerInfos {
119 if err := m.processPeerInfo(dbTx, peerInfo); err != nil {
130 // TODO: fix pong time
131 func (m *monitor) processPeerInfo(dbTx *gorm.DB, peerInfo *peers.PeerInfo) error {
132 xPub := &chainkd.XPub{}
133 if err := xPub.UnmarshalText([]byte(peerInfo.ID)); err != nil {
137 ormNode := &orm.Node{}
138 if err := dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).First(ormNode).Error; err != nil {
142 log.Debugf("peerInfo Ping: %v", peerInfo.Ping)
143 ping, err := time.ParseDuration(peerInfo.Ping)
145 log.Debugf("Parse ping time err: %v", err)
149 ormNodeLiveness := &orm.NodeLiveness{
151 BestHeight: ormNode.BestHeight,
152 AvgLantencyMS: sql.NullInt64{Int64: ping.Nanoseconds() / 1000, Valid: true},
156 if err := dbTx.Model(&orm.NodeLiveness{}).Where("node_id = ? AND status != ?", ormNode.ID, common.NodeOfflineStatus).
157 UpdateColumn(&orm.NodeLiveness{
158 BestHeight: ormNodeLiveness.BestHeight,
159 AvgLantencyMS: ormNodeLiveness.AvgLantencyMS,
160 }).FirstOrCreate(ormNodeLiveness).Error; err != nil {
164 if err := dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).
165 UpdateColumn(&orm.Node{
166 Alias: peerInfo.Moniker,
168 BestHeight: peerInfo.Height,
169 // LatestDailyUptimeMinutes uint64
170 }).First(ormNode).Error; err != nil {