8 "github.com/jinzhu/gorm"
9 log "github.com/sirupsen/logrus"
11 "github.com/vapor/crypto/ed25519/chainkd"
12 "github.com/vapor/netsync/peers"
13 "github.com/vapor/p2p"
14 "github.com/vapor/toolbar/precog/common"
15 "github.com/vapor/toolbar/precog/config"
16 "github.com/vapor/toolbar/precog/database/orm"
19 // create or update: https://github.com/jinzhu/gorm/issues/1307
20 func (m *monitor) upSertNode(node *config.Node) error {
22 node.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String())
25 ormNode := &orm.Node{PublicKey: node.PublicKey}
26 if err := m.db.Where(&orm.Node{PublicKey: node.PublicKey}).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
31 ormNode.Alias = node.Alias
34 ormNode.Xpub = node.XPub.String()
36 ormNode.Host = node.Host
37 ormNode.Port = node.Port
38 return m.db.Where(&orm.Node{PublicKey: ormNode.PublicKey}).
44 }).FirstOrCreate(ormNode).Error
47 func (m *monitor) processDialResults() error {
48 log.Info("================================================================")
49 var ormNodes []*orm.Node
50 if err := m.db.Model(&orm.Node{}).Find(&ormNodes).Error; err != nil {
54 xPub := &chainkd.XPub{}
55 connMap := make(map[string]bool, len(ormNodes))
57 for _, peer := range m.sw.GetPeers().List() {
58 if err := xPub.UnmarshalText([]byte(peer.Key)); err != nil {
63 publicKey := xPub.PublicKey().String()
64 connMap[publicKey] = true
65 if err := m.processConnectedPeer(publicKey, peer); err != nil {
71 for _, ormNode := range ormNodes {
72 if _, ok := connMap[ormNode.PublicKey]; ok {
76 if err := m.processOfflinePeer(ormNode); err != nil {
84 // TODO: add start time here
85 func (m *monitor) processConnectedPeer(publicKey string, peer *p2p.Peer) error {
86 ormNodeLiveness := &orm.NodeLiveness{}
87 if err := m.db.Model(&orm.NodeLiveness{}).
88 Joins("join nodes on nodes.id = node_livenesses.node_id").
89 Where("nodes.public_key = ?", publicKey).First(ormNodeLiveness).Error; err != nil {
96 func (m *monitor) processOfflinePeer(ormNode *orm.Node) error {
97 return m.db.Model(&orm.NodeLiveness{}).
98 Where(&orm.NodeLiveness{NodeID: ormNode.ID}).
99 UpdateColumn(&orm.NodeLiveness{
100 Status: common.NodeOfflineStatus,
104 func (m *monitor) processPeerInfos(peerInfos []*peers.PeerInfo) error {
105 for _, peerInfo := range peerInfos {
107 if err := m.processPeerInfo(dbTx, peerInfo); err != nil {
118 func (m *monitor) processPeerInfo(dbTx *gorm.DB, peerInfo *peers.PeerInfo) error {
119 xPub := &chainkd.XPub{}
120 if err := xPub.UnmarshalText([]byte(peerInfo.ID)); err != nil {
124 ormNode := &orm.Node{}
125 if err := dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).First(ormNode).Error; err != nil {
129 log.Debugf("peerInfo.Ping: %v", peerInfo.Ping)
130 ping, err := time.ParseDuration(peerInfo.Ping)
132 log.Debugf("Parse ping time err: %v", err)
136 ormNodeLiveness := &orm.NodeLiveness{
138 BestHeight: ormNode.BestHeight,
139 AvgLantencyMS: sql.NullInt64{Int64: ping.Nanoseconds() / 1000, Valid: true},
143 if err := dbTx.Model(&orm.NodeLiveness{}).Where("node_id = ? AND status != ?", ormNode.ID, common.NodeOfflineStatus).
144 UpdateColumn(&orm.NodeLiveness{
145 BestHeight: ormNodeLiveness.BestHeight,
146 AvgLantencyMS: ormNodeLiveness.AvgLantencyMS,
147 }).FirstOrCreate(ormNodeLiveness).Error; err != nil {
151 if err := dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).
152 UpdateColumn(&orm.Node{
153 Alias: peerInfo.Moniker,
155 BestHeight: peerInfo.Height,
156 // LatestDailyUptimeMinutes uint64
157 }).First(ormNode).Error; err != nil {