OSDN Git Service

fk
authorHAOYUatHZ <haoyu@protonmail.com>
Mon, 19 Aug 2019 07:37:41 +0000 (15:37 +0800)
committerHAOYUatHZ <haoyu@protonmail.com>
Mon, 19 Aug 2019 07:37:41 +0000 (15:37 +0800)
toolbar/precog/monitor/connection.go
toolbar/precog/monitor/discover.go
toolbar/precog/monitor/monitor.go
toolbar/precog/monitor/stats.go

index ec78dc4..5d067e9 100644 (file)
@@ -20,10 +20,11 @@ func (m *monitor) connectNodesRoutine() {
 }
 
 func (m *monitor) dialNodes() error {
-       // m.Lock()
        log.Info("Start to reconnect to nodes...")
+       dbTx := m.db.Begin()
        var nodes []*orm.Node
-       if err := m.db.Model(&orm.Node{}).Find(&nodes).Error; err != nil {
+       if err := dbTx.Model(&orm.Node{}).Find(&nodes).Error; err != nil {
+               dbTx.Rollback()
                return err
        }
 
@@ -46,6 +47,6 @@ func (m *monitor) dialNodes() error {
        // connected peers will be skipped in switch.DialPeers()
        m.sw.DialPeers(addresses)
        log.Info("DialPeers done.")
-       // m.Unlock()
+       dbTx.Commit()
        return nil
 }
index 0e9ef47..5459688 100644 (file)
@@ -33,16 +33,18 @@ func (m *monitor) collectDiscoveredNodes() {
                        continue
                }
                log.Info("discover new node: ", node)
-               // m.Lock()
-               if err := m.upSertNode(&config.Node{
+
+               dbTx := m.db.Begin()
+               if err := m.upSertNode(dbTx, &config.Node{
                        PublicKey: node.ID.String(),
                        Host:      node.IP.String(),
                        Port:      node.TCP,
                }); err != nil {
+                       dbTx.Rollback()
                        log.Error(err)
+               } else {
+                       nodeMap[node.ID.String()] = node
+                       dbTx.Commit()
                }
-
-               nodeMap[node.ID.String()] = node
-               // m.Unlock()
        }
 }
index 4a302f7..38010c9 100644 (file)
@@ -103,13 +103,16 @@ func makePath() (string, error) {
 func (m *monitor) Run() {
        defer os.RemoveAll(m.nodeCfg.DBPath)
 
+       dbTx := m.db.Begin()
        var seeds []string
        for _, node := range m.cfg.Nodes {
                seeds = append(seeds, fmt.Sprintf("%s:%d", node.Host, node.Port))
-               if err := m.upSertNode(&node); err != nil {
+               if err := m.upSertNode(dbTx, &node); err != nil {
+                       dbTx.Rollback()
                        log.Error(err)
                }
        }
+       dbTx.Commit()
        m.nodeCfg.P2P.Seeds = strings.Join(seeds, ",")
        if err := m.makeSwitch(); err != nil {
                log.Fatal(err)
@@ -177,7 +180,7 @@ func (m *monitor) checkStatusRoutine() {
        bestHeight := uint64(0)
        ticker := time.NewTicker(time.Duration(m.cfg.CheckFreqSeconds) * time.Second)
        for range ticker.C {
-               // m.Lock()
+               dbTx := m.db.Begin()
                log.Info("connected peers: ", m.sw.GetPeers().List())
 
                for _, peer := range m.sw.GetPeers().List() {
@@ -200,8 +203,6 @@ func (m *monitor) checkStatusRoutine() {
                        if peerInfo.Height > bestHeight {
                                bestHeight = peerInfo.Height
                        }
-
-                       m.savePeerInfo(peerInfo)
                }
                log.Info("bestHeight: ", bestHeight)
 
@@ -220,6 +221,14 @@ func (m *monitor) checkStatusRoutine() {
                        peers.RemovePeer(p.ID())
                }
                log.Info("Disonnect all peers.")
-               // m.Unlock()
+
+               for _, peerInfo := range peers.GetPeerInfos() {
+                       if err := m.savePeerInfo(dbTx, peerInfo); err != nil {
+                               dbTx.Rollback()
+                               log.Error(err)
+                               break
+                       }
+               }
+               dbTx.Commit()
        }
 }
index 88c819e..6943532 100644 (file)
@@ -21,13 +21,13 @@ import (
 // TODO: update lantency, active_time and status
 
 // create or update: https://github.com/jinzhu/gorm/issues/1307
-func (m *monitor) upSertNode(node *config.Node) error {
+func (m *monitor) upSertNode(dbTx *gorm.DB, node *config.Node) error {
        if node.XPub != nil {
                node.PublicKey = fmt.Sprintf("%v", node.XPub.PublicKey().String())
        }
 
        ormNode := &orm.Node{PublicKey: node.PublicKey}
-       if err := m.db.Where(&orm.Node{PublicKey: node.PublicKey}).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
+       if err := dbTx.Where(&orm.Node{PublicKey: node.PublicKey}).First(ormNode).Error; err != nil && err != gorm.ErrRecordNotFound {
                return err
        }
 
@@ -39,7 +39,7 @@ func (m *monitor) upSertNode(node *config.Node) error {
        }
        ormNode.Host = node.Host
        ormNode.Port = node.Port
-       return m.db.Where(&orm.Node{PublicKey: ormNode.PublicKey}).
+       return dbTx.Where(&orm.Node{PublicKey: ormNode.PublicKey}).
                Assign(&orm.Node{
                        Xpub:  ormNode.Xpub,
                        Alias: ormNode.Alias,
@@ -48,14 +48,14 @@ func (m *monitor) upSertNode(node *config.Node) error {
                }).FirstOrCreate(ormNode).Error
 }
 
-func (m *monitor) savePeerInfo(peerInfo *peers.PeerInfo) error {
+func (m *monitor) savePeerInfo(dbTx *gorm.DB, peerInfo *peers.PeerInfo) error {
        xPub := &chainkd.XPub{}
        if err := xPub.UnmarshalText([]byte(peerInfo.ID)); err != nil {
                return err
        }
 
        ormNode := &orm.Node{}
-       if err := m.db.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).
+       if err := dbTx.Model(&orm.Node{}).Where(&orm.Node{PublicKey: xPub.PublicKey().String()}).
                UpdateColumn(&orm.Node{
                        Alias:      peerInfo.Moniker,
                        Xpub:       peerInfo.ID,
@@ -74,7 +74,7 @@ func (m *monitor) savePeerInfo(peerInfo *peers.PeerInfo) error {
                // PingTimes     uint64
                // PongTimes     uint64
        }
-       if err := m.db.Model(&orm.NodeLiveness{}).Where("node_id = ? AND status != ?", ormNode.ID, common.NodeOfflineStatus).
+       if err := dbTx.Model(&orm.NodeLiveness{}).Where("node_id = ? AND status != ?", ormNode.ID, common.NodeOfflineStatus).
                UpdateColumn(&orm.NodeLiveness{
                        BestHeight:    ormNodeLiveness.BestHeight,
                        AvgLantencyMS: ormNodeLiveness.AvgLantencyMS,