OSDN Git Service

fix deadlock
authorHAOYUatHZ <haoyu@protonmail.com>
Tue, 27 Aug 2019 08:45:39 +0000 (16:45 +0800)
committerHAOYUatHZ <haoyu@protonmail.com>
Tue, 27 Aug 2019 08:45:39 +0000 (16:45 +0800)
toolbar/precog/monitor/discover.go
toolbar/precog/monitor/monitor.go

index 07bf6e1..a7e553d 100644 (file)
@@ -1,6 +1,7 @@
 package monitor
 
 import (
+       "sync"
        "time"
 
        log "github.com/sirupsen/logrus"
@@ -14,34 +15,37 @@ var (
        discvFreqSec = 60
 )
 
-func (m *monitor) discoveryRoutine() {
+func (m *monitor) discoveryRoutine(discvWg *sync.WaitGroup) {
        ticker := time.NewTicker(time.Duration(discvFreqSec) * time.Second)
        for range ticker.C {
+               m.Lock()
                nodes := make([]*dht.Node, nodesToDiscv)
-               n := m.sw.GetDiscv().ReadRandomNodes(nodes)
-               for i := 0; i < n; i++ {
+               num := m.sw.GetDiscv().ReadRandomNodes(nodes)
+               for i := 0; i < num; i++ {
+                       if n, ok := m.discvMap[nodes[i].ID.String()]; ok && n.String() == nodes[i].String() {
+                               continue
+                       }
+
+                       log.Infof("discover new node: %v", nodes[i])
+                       discvWg.Add(1)
                        m.discvCh <- nodes[i]
                }
+               discvWg.Wait()
+               m.Unlock()
        }
 }
 
-func (m *monitor) collectDiscoveredNodes() {
-       // nodeMap maps a node's public key to the node itself
-       nodeMap := make(map[string]*dht.Node)
+func (m *monitor) collectDiscoveredNodes(discvWg *sync.WaitGroup) {
        for node := range m.discvCh {
-               if n, ok := nodeMap[node.ID.String()]; ok && n.String() == node.String() {
-                       continue
-               }
-               log.Infof("discover new node: %v", node)
-
                if err := m.upSertNode(&config.Node{
                        PublicKey: node.ID.String(),
                        Host:      node.IP.String(),
                        Port:      node.TCP,
                }); err == nil {
-                       nodeMap[node.ID.String()] = node
+                       m.discvMap[node.ID.String()] = node
                } else {
                        log.Error(err)
                }
+               discvWg.Done()
        }
 }
index a90e8d2..c52adb7 100644 (file)
@@ -28,15 +28,18 @@ import (
 
 type monitor struct {
        *sync.RWMutex
-       cfg           *config.Config
-       db            *gorm.DB
-       nodeCfg       *vaporCfg.Config
-       sw            *p2p.Switch
-       privKey       chainkd.XPrv
-       chain         *mock.Chain
-       txPool        *mock.Mempool
-       discvCh       chan *dht.Node
-       dialCh        chan struct{}
+       cfg     *config.Config
+       db      *gorm.DB
+       nodeCfg *vaporCfg.Config
+       sw      *p2p.Switch
+       privKey chainkd.XPrv
+       chain   *mock.Chain
+       txPool  *mock.Mempool
+       // discvMap maps a node's public key to the node itself
+       discvMap map[string]*dht.Node
+       discvCh  chan *dht.Node
+       dialCh   chan struct{}
+       // TODO: maybe remove?
        checkStatusCh chan struct{}
 }
 
@@ -75,6 +78,7 @@ func NewMonitor(cfg *config.Config, db *gorm.DB) *monitor {
                privKey:       privKey.(chainkd.XPrv),
                chain:         chain,
                txPool:        txPool,
+               discvMap:      make(map[string]*dht.Node),
                discvCh:       make(chan *dht.Node),
                dialCh:        make(chan struct{}, 1),
                checkStatusCh: make(chan struct{}, 1),
@@ -116,8 +120,9 @@ func (m *monitor) Run() {
        }
 
        m.dialCh <- struct{}{}
-       go m.discoveryRoutine()
-       go m.collectDiscoveredNodes()
+       var discvWg sync.WaitGroup
+       go m.discoveryRoutine(&discvWg)
+       go m.collectDiscoveredNodes(&discvWg)
        go m.connectNodesRoutine()
        go m.checkStatusRoutine()
 }