}
func (m *monitor) dialNodes() error {
- // m.Lock()
log.Info("Start to reconnect to nodes...")
+ dbTx := m.db.Begin()
var nodes []*orm.Node
- if err := m.db.Model(&orm.Node{}).Find(&nodes).Error; err != nil {
+ if err := dbTx.Model(&orm.Node{}).Find(&nodes).Error; err != nil {
+ dbTx.Rollback()
return err
}
// connected peers will be skipped in switch.DialPeers()
m.sw.DialPeers(addresses)
log.Info("DialPeers done.")
- // m.Unlock()
+ dbTx.Commit()
return nil
}
continue
}
log.Info("discover new node: ", node)
- // m.Lock()
- if err := m.upSertNode(&config.Node{
+
+ dbTx := m.db.Begin()
+ if err := m.upSertNode(dbTx, &config.Node{
PublicKey: node.ID.String(),
Host: node.IP.String(),
Port: node.TCP,
}); err != nil {
+ dbTx.Rollback()
log.Error(err)
+ } else {
+ nodeMap[node.ID.String()] = node
+ dbTx.Commit()
}
-
- nodeMap[node.ID.String()] = node
- // m.Unlock()
}
}
func (m *monitor) Run() {
defer os.RemoveAll(m.nodeCfg.DBPath)
+ dbTx := m.db.Begin()
var seeds []string
for _, node := range m.cfg.Nodes {
seeds = append(seeds, fmt.Sprintf("%s:%d", node.Host, node.Port))
- if err := m.upSertNode(&node); err != nil {
+ if err := m.upSertNode(dbTx, &node); err != nil {
+ dbTx.Rollback()
log.Error(err)
}
}
+ dbTx.Commit()
m.nodeCfg.P2P.Seeds = strings.Join(seeds, ",")
if err := m.makeSwitch(); err != nil {
log.Fatal(err)
bestHeight := uint64(0)
ticker := time.NewTicker(time.Duration(m.cfg.CheckFreqSeconds) * time.Second)
for range ticker.C {
- // m.Lock()
+ dbTx := m.db.Begin()
log.Info("connected peers: ", m.sw.GetPeers().List())
for _, peer := range m.sw.GetPeers().List() {
if peerInfo.Height > bestHeight {
bestHeight = peerInfo.Height
}
-
- m.savePeerInfo(peerInfo)
}
log.Info("bestHeight: ", bestHeight)
peers.RemovePeer(p.ID())
}
log.Info("Disonnect all peers.")
- // m.Unlock()
+
+ for _, peerInfo := range peers.GetPeerInfos() {
+ if err := m.savePeerInfo(dbTx, peerInfo); err != nil {
+ dbTx.Rollback()
+ log.Error(err)
+ break
+ }
+ }
+ dbTx.Commit()
}
}
// TODO: update lantency, active_time and status
// create or update: https://github.com/jinzhu/gorm/issues/1307
-func (m *monitor) upSertNode(node *config.Node) error {
+func (m *monitor) upSertNode(dbTx *gorm.DB, node *config.Node) error {
if node.XPub != nil {
node.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String())
}
ormNode := &orm.Node{PublicKey: node.PublicKey}
- if err := m.db.Where(&orm.Node{PublicKey: node.PublicKey}).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
+ if err := dbTx.Where(&orm.Node{PublicKey: node.PublicKey}).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
return err
}
}
ormNode.Host = node.Host
ormNode.Port = node.Port
- return m.db.Where(&orm.Node{PublicKey: ormNode.PublicKey}).
+ return dbTx.Where(&orm.Node{PublicKey: ormNode.PublicKey}).
Assign(&orm.Node{
Xpub: ormNode.Xpub,
Alias: ormNode.Alias,
}).FirstOrCreate(ormNode).Error
}
-func (m *monitor) savePeerInfo(peerInfo *peers.PeerInfo) error {
+func (m *monitor) savePeerInfo(dbTx *gorm.DB, peerInfo *peers.PeerInfo) error {
xPub := &chainkd.XPub{}
if err := xPub.UnmarshalText([]byte(peerInfo.ID)); err != nil {
return err
}
ormNode := &orm.Node{}
- if err := m.db.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).
+ if err := dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).
UpdateColumn(&orm.Node{
Alias: peerInfo.Moniker,
Xpub: peerInfo.ID,
// PingTimes uint64
// PongTimes uint64
}
- if err := m.db.Model(&orm.NodeLiveness{}).Where("node_id = ? AND status != ?", ormNode.ID, common.NodeOfflineStatus).
+ if err := dbTx.Model(&orm.NodeLiveness{}).Where("node_id = ? AND status != ?", ormNode.ID, common.NodeOfflineStatus).
UpdateColumn(&orm.NodeLiveness{
BestHeight: ormNodeLiveness.BestHeight,
AvgLantencyMS: ormNodeLiveness.AvgLantencyMS,