From a24d9d6d08105f3db232c0a1c54e2a466ce779fd Mon Sep 17 00:00:00 2001 From: HAOYUatHZ Date: Mon, 19 Aug 2019 15:37:41 +0800 Subject: [PATCH] fk --- toolbar/precog/monitor/connection.go | 7 ++++--- toolbar/precog/monitor/discover.go | 12 +++++++----- toolbar/precog/monitor/monitor.go | 19 ++++++++++++++----- toolbar/precog/monitor/stats.go | 12 ++++++------ 4 files changed, 31 insertions(+), 19 deletions(-) diff --git a/toolbar/precog/monitor/connection.go b/toolbar/precog/monitor/connection.go index ec78dc45..5d067e95 100644 --- a/toolbar/precog/monitor/connection.go +++ b/toolbar/precog/monitor/connection.go @@ -20,10 +20,11 @@ func (m *monitor) connectNodesRoutine() { } func (m *monitor) dialNodes() error { - // m.Lock() log.Info("Start to reconnect to nodes...") + dbTx := m.db.Begin() var nodes []*orm.Node - if err := m.db.Model(&orm.Node{}).Find(&nodes).Error; err != nil { + if err := dbTx.Model(&orm.Node{}).Find(&nodes).Error; err != nil { + dbTx.Rollback() return err } @@ -46,6 +47,6 @@ func (m *monitor) dialNodes() error { // connected peers will be skipped in switch.DialPeers() m.sw.DialPeers(addresses) log.Info("DialPeers done.") - // m.Unlock() + dbTx.Commit() return nil } diff --git a/toolbar/precog/monitor/discover.go b/toolbar/precog/monitor/discover.go index 0e9ef473..54596888 100644 --- a/toolbar/precog/monitor/discover.go +++ b/toolbar/precog/monitor/discover.go @@ -33,16 +33,18 @@ func (m *monitor) collectDiscoveredNodes() { continue } log.Info("discover new node: ", node) - // m.Lock() - if err := m.upSertNode(&config.Node{ + + dbTx := m.db.Begin() + if err := m.upSertNode(dbTx, &config.Node{ PublicKey: node.ID.String(), Host: node.IP.String(), Port: node.TCP, }); err != nil { + dbTx.Rollback() log.Error(err) + } else { + nodeMap[node.ID.String()] = node + dbTx.Commit() } - - nodeMap[node.ID.String()] = node - // m.Unlock() } } diff --git a/toolbar/precog/monitor/monitor.go b/toolbar/precog/monitor/monitor.go index 4a302f7f..38010c9b 100644 --- a/toolbar/precog/monitor/monitor.go +++ b/toolbar/precog/monitor/monitor.go @@ -103,13 +103,16 @@ func makePath() (string, error) { func (m *monitor) Run() { defer os.RemoveAll(m.nodeCfg.DBPath) + dbTx := m.db.Begin() var seeds []string for _, node := range m.cfg.Nodes { seeds = append(seeds, fmt.Sprintf("%s:%d", node.Host, node.Port)) - if err := m.upSertNode(&node); err != nil { + if err := m.upSertNode(dbTx, &node); err != nil { + dbTx.Rollback() log.Error(err) } } + dbTx.Commit() m.nodeCfg.P2P.Seeds = strings.Join(seeds, ",") if err := m.makeSwitch(); err != nil { log.Fatal(err) @@ -177,7 +180,7 @@ func (m *monitor) checkStatusRoutine() { bestHeight := uint64(0) ticker := time.NewTicker(time.Duration(m.cfg.CheckFreqSeconds) * time.Second) for range ticker.C { - // m.Lock() + dbTx := m.db.Begin() log.Info("connected peers: ", m.sw.GetPeers().List()) for _, peer := range m.sw.GetPeers().List() { @@ -200,8 +203,6 @@ func (m *monitor) checkStatusRoutine() { if peerInfo.Height > bestHeight { bestHeight = peerInfo.Height } - - m.savePeerInfo(peerInfo) } log.Info("bestHeight: ", bestHeight) @@ -220,6 +221,14 @@ func (m *monitor) checkStatusRoutine() { peers.RemovePeer(p.ID()) } log.Info("Disonnect all peers.") - // m.Unlock() + + for _, peerInfo := range peers.GetPeerInfos() { + if err := m.savePeerInfo(dbTx, peerInfo); err != nil { + dbTx.Rollback() + log.Error(err) + break + } + } + dbTx.Commit() } } diff --git a/toolbar/precog/monitor/stats.go b/toolbar/precog/monitor/stats.go index 88c819ec..69435329 100644 --- a/toolbar/precog/monitor/stats.go +++ b/toolbar/precog/monitor/stats.go @@ -21,13 +21,13 @@ import ( // TODO: update lantency, active_time and status // create or update: https://github.com/jinzhu/gorm/issues/1307 -func (m *monitor) upSertNode(node *config.Node) error { +func (m *monitor) upSertNode(dbTx *gorm.DB, node *config.Node) error { if node.XPub != nil { node.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String()) } ormNode := &orm.Node{PublicKey: node.PublicKey} - if err := m.db.Where(&orm.Node{PublicKey: node.PublicKey}).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound { + if err := dbTx.Where(&orm.Node{PublicKey: node.PublicKey}).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound { return err } @@ -39,7 +39,7 @@ func (m *monitor) upSertNode(node *config.Node) error { } ormNode.Host = node.Host ormNode.Port = node.Port - return m.db.Where(&orm.Node{PublicKey: ormNode.PublicKey}). + return dbTx.Where(&orm.Node{PublicKey: ormNode.PublicKey}). Assign(&orm.Node{ Xpub: ormNode.Xpub, Alias: ormNode.Alias, @@ -48,14 +48,14 @@ func (m *monitor) upSertNode(node *config.Node) error { }).FirstOrCreate(ormNode).Error } -func (m *monitor) savePeerInfo(peerInfo *peers.PeerInfo) error { +func (m *monitor) savePeerInfo(dbTx *gorm.DB, peerInfo *peers.PeerInfo) error { xPub := &chainkd.XPub{} if err := xPub.UnmarshalText([]byte(peerInfo.ID)); err != nil { return err } ormNode := &orm.Node{} - if err := m.db.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}). + if err := dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}). UpdateColumn(&orm.Node{ Alias: peerInfo.Moniker, Xpub: peerInfo.ID, @@ -74,7 +74,7 @@ func (m *monitor) savePeerInfo(peerInfo *peers.PeerInfo) error { // PingTimes uint64 // PongTimes uint64 } - if err := m.db.Model(&orm.NodeLiveness{}).Where("node_id = ? AND status != ?", ormNode.ID, common.NodeOfflineStatus). + if err := dbTx.Model(&orm.NodeLiveness{}).Where("node_id = ? AND status != ?", ormNode.ID, common.NodeOfflineStatus). UpdateColumn(&orm.NodeLiveness{ BestHeight: ormNodeLiveness.BestHeight, AvgLantencyMS: ormNodeLiveness.AvgLantencyMS, -- 2.11.0