"github.com/vapor/crypto/ed25519/chainkd"
"github.com/vapor/netsync/peers"
- "github.com/vapor/p2p"
"github.com/vapor/toolbar/precog/common"
"github.com/vapor/toolbar/precog/config"
"github.com/vapor/toolbar/precog/database/orm"
)
// create or update: https://github.com/jinzhu/gorm/issues/1307
-func (m *monitor) upSertNode(node *config.Node) error {
+func (m *monitor) upsertNode(node *config.Node) error {
if node.XPub != nil {
node.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String())
}
- ormNode := &orm.Node{PublicKey: node.PublicKey}
- if err := m.db.Where(&orm.Node{PublicKey: node.PublicKey}).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
+ ormNode := &orm.Node{
+ IP: node.IP,
+ Port: node.Port,
+ }
+ if err := m.db.Where(ormNode).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
return err
}
- if node.Alias != "" {
- ormNode.Alias = node.Alias
- }
+ ormNode.PublicKey = node.PublicKey
if node.XPub != nil {
ormNode.Xpub = node.XPub.String()
}
- ormNode.IP = node.IP
- ormNode.Port = node.Port
- return m.db.Where(&orm.Node{PublicKey: ormNode.PublicKey}).
- Assign(&orm.Node{
- Xpub: ormNode.Xpub,
- Alias: ormNode.Alias,
- IP: ormNode.IP,
- Port: ormNode.Port,
- }).FirstOrCreate(ormNode).Error
+ return m.db.Save(ormNode).Error
}
+// TODO: maybe return connected nodes here for checkStatus
func (m *monitor) processDialResults() error {
var ormNodes []*orm.Node
if err := m.db.Model(&orm.Node{}).Find(&ormNodes).Error; err != nil {
for _, peer := range m.sw.GetPeers().List() {
xPub := &chainkd.XPub{}
if err := xPub.UnmarshalText([]byte(peer.Key)); err != nil {
- log.Error(err)
+ log.WithFields(log.Fields{"xpub": peer.Key}).Error("unmarshal xpub")
continue
}
publicKey := xPub.PublicKey().String()
connMap[publicKey] = true
- if err := m.processConnectedPeer(publicKeyMap[publicKey], peer); err != nil {
- log.Error(err)
+ if err := m.processConnectedPeer(publicKeyMap[publicKey]); err != nil {
+ log.WithFields(log.Fields{"peer publicKey": publicKey, "err": err}).Error("processConnectedPeer")
}
}
}
if err := m.processOfflinePeer(ormNode); err != nil {
- log.Error(err)
+ log.WithFields(log.Fields{"peer publicKey": ormNode.PublicKey, "err": err}).Error("processOfflinePeer")
}
}
return nil
}
-func (m *monitor) processConnectedPeer(ormNode *orm.Node, peer *p2p.Peer) error {
- ormNodeLiveness := &orm.NodeLiveness{}
- err := m.db.Model(&orm.NodeLiveness{}).Joins("join nodes on nodes.id = node_livenesses.node_id").
- Where("nodes.public_key = ? AND status != ?", ormNode.PublicKey, common.NodeOfflineStatus).Last(ormNodeLiveness).Error
- if err == nil {
- return m.db.Model(&orm.NodeLiveness{}).Where(ormNodeLiveness).UpdateColumn(&orm.NodeLiveness{
- PingTimes: ormNodeLiveness.PingTimes + 1,
- }).Error
- } else if err != gorm.ErrRecordNotFound {
+func (m *monitor) processConnectedPeer(ormNode *orm.Node) error {
+ ormNodeLiveness := &orm.NodeLiveness{NodeID: ormNode.ID}
+ err := m.db.Preload("Node").Where(ormNodeLiveness).Last(ormNodeLiveness).Error
+ if err != nil && err != gorm.ErrRecordNotFound {
return err
}
- // gorm.ErrRecordNotFound
- return m.db.Create(&orm.NodeLiveness{
- NodeID: ormNode.ID,
- PingTimes: 1,
- Status: common.NodeUnknownStatus,
- }).Error
+ ormNodeLiveness.PongTimes += 1
+ if ormNode.Status == common.NodeOfflineStatus {
+ ormNode.Status = common.NodeUnknownStatus
+ }
+ ormNodeLiveness.Node = ormNode
+ return m.db.Save(ormNodeLiveness).Error
}
func (m *monitor) processOfflinePeer(ormNode *orm.Node) error {
- return m.db.Model(&orm.NodeLiveness{}).
- Where(&orm.NodeLiveness{NodeID: ormNode.ID}).
- UpdateColumn(&orm.NodeLiveness{
- Status: common.NodeOfflineStatus,
- }).Error
+ ormNode.Status = common.NodeOfflineStatus
+ return m.db.Save(ormNode).Error
}
-func (m *monitor) processPeerInfos(peerInfos []*peers.PeerInfo) error {
+func (m *monitor) processPeerInfos(peerInfos []*peers.PeerInfo) {
for _, peerInfo := range peerInfos {
dbTx := m.db.Begin()
if err := m.processPeerInfo(dbTx, peerInfo); err != nil {
- log.Error(err)
+ log.WithFields(log.Fields{"peerInfo": peerInfo, "err": err}).Error("processPeerInfo")
dbTx.Rollback()
} else {
dbTx.Commit()
}
}
-
- return nil
}
func (m *monitor) processPeerInfo(dbTx *gorm.DB, peerInfo *peers.PeerInfo) error {
return err
}
- log.Debugf("peerInfo ping: %v", peerInfo.Ping)
+ if ormNode.Status == common.NodeOfflineStatus {
+ return fmt.Errorf("node %s status error", ormNode.PublicKey)
+ }
+
+ log.WithFields(log.Fields{"ping": peerInfo.Ping}).Debug("peerInfo")
ping, err := time.ParseDuration(peerInfo.Ping)
if err != nil {
return err
now := time.Now()
yesterday := now.Add(-24 * time.Hour)
var ormNodeLivenesses []*orm.NodeLiveness
- if err := dbTx.Model(&orm.NodeLiveness{}).
+ if err := dbTx.Preload("Node").Model(&orm.NodeLiveness{}).
Where("node_id = ? AND updated_at >= ?", ormNode.ID, yesterday).
Order(fmt.Sprintf("created_at %s", "DESC")).
Find(&ormNodeLivenesses).Error; err != nil {
// update latest liveness
latestLiveness := ormNodeLivenesses[0]
- if latestLiveness.Status == common.NodeOfflineStatus {
- return fmt.Errorf("node %s latest liveness status error", ormNode.PublicKey)
- }
-
lantencyMS := ping.Nanoseconds() / 1000
if lantencyMS != 0 {
- latestLiveness.AvgLantencyMS = sql.NullInt64{
- Int64: (latestLiveness.AvgLantencyMS.Int64*int64(latestLiveness.PongTimes) + lantencyMS) / int64(latestLiveness.PongTimes+1),
+ ormNode.AvgLantencyMS = sql.NullInt64{
+ Int64: (ormNode.AvgLantencyMS.Int64*int64(latestLiveness.PongTimes) + lantencyMS) / int64(latestLiveness.PongTimes+1),
Valid: true,
}
}