From: HAOYUatHZ Date: Tue, 27 Aug 2019 08:45:39 +0000 (+0800) Subject: fix deadlock X-Git-Url: http://git.osdn.net/view?p=bytom%2Fvapor.git;a=commitdiff_plain;h=27b0aeba0095c8fce84d729021a7ca3b7d551c7d fix deadlock --- diff --git a/toolbar/precog/monitor/discover.go b/toolbar/precog/monitor/discover.go index 07bf6e17..a7e553df 100644 --- a/toolbar/precog/monitor/discover.go +++ b/toolbar/precog/monitor/discover.go @@ -1,6 +1,7 @@ package monitor import ( + "sync" "time" log "github.com/sirupsen/logrus" @@ -14,34 +15,37 @@ var ( discvFreqSec = 60 ) -func (m *monitor) discoveryRoutine() { +func (m *monitor) discoveryRoutine(discvWg *sync.WaitGroup) { ticker := time.NewTicker(time.Duration(discvFreqSec) * time.Second) for range ticker.C { + m.Lock() nodes := make([]*dht.Node, nodesToDiscv) - n := m.sw.GetDiscv().ReadRandomNodes(nodes) - for i := 0; i < n; i++ { + num := m.sw.GetDiscv().ReadRandomNodes(nodes) + for i := 0; i < num; i++ { + if n, ok := m.discvMap[nodes[i].ID.String()]; ok && n.String() == nodes[i].String() { + continue + } + + log.Infof("discover new node: %v", nodes[i]) + discvWg.Add(1) m.discvCh <- nodes[i] } + discvWg.Wait() + m.Unlock() } } -func (m *monitor) collectDiscoveredNodes() { - // nodeMap maps a node's public key to the node itself - nodeMap := make(map[string]*dht.Node) +func (m *monitor) collectDiscoveredNodes(discvWg *sync.WaitGroup) { for node := range m.discvCh { - if n, ok := nodeMap[node.ID.String()]; ok && n.String() == node.String() { - continue - } - log.Infof("discover new node: %v", node) - if err := m.upSertNode(&config.Node{ PublicKey: node.ID.String(), Host: node.IP.String(), Port: node.TCP, }); err == nil { - nodeMap[node.ID.String()] = node + m.discvMap[node.ID.String()] = node } else { log.Error(err) } + discvWg.Done() } } diff --git a/toolbar/precog/monitor/monitor.go b/toolbar/precog/monitor/monitor.go index a90e8d2e..c52adb79 100644 --- a/toolbar/precog/monitor/monitor.go +++ b/toolbar/precog/monitor/monitor.go @@ -28,15 +28,18 @@ import ( type monitor struct { *sync.RWMutex - cfg *config.Config - db *gorm.DB - nodeCfg *vaporCfg.Config - sw *p2p.Switch - privKey chainkd.XPrv - chain *mock.Chain - txPool *mock.Mempool - discvCh chan *dht.Node - dialCh chan struct{} + cfg *config.Config + db *gorm.DB + nodeCfg *vaporCfg.Config + sw *p2p.Switch + privKey chainkd.XPrv + chain *mock.Chain + txPool *mock.Mempool + // discvMap maps a node's public key to the node itself + discvMap map[string]*dht.Node + discvCh chan *dht.Node + dialCh chan struct{} + // TODO: maybe remove? checkStatusCh chan struct{} } @@ -75,6 +78,7 @@ func NewMonitor(cfg *config.Config, db *gorm.DB) *monitor { privKey: privKey.(chainkd.XPrv), chain: chain, txPool: txPool, + discvMap: make(map[string]*dht.Node), discvCh: make(chan *dht.Node), dialCh: make(chan struct{}, 1), checkStatusCh: make(chan struct{}, 1), @@ -116,8 +120,9 @@ func (m *monitor) Run() { } m.dialCh <- struct{}{} - go m.discoveryRoutine() - go m.collectDiscoveredNodes() + var discvWg sync.WaitGroup + go m.discoveryRoutine(&discvWg) + go m.collectDiscoveredNodes(&discvWg) go m.connectNodesRoutine() go m.checkStatusRoutine() }