package monitor
import (
+ "sync"
"time"
log "github.com/sirupsen/logrus"
discvFreqSec = 60
)
-func (m *monitor) discoveryRoutine() {
+func (m *monitor) discoveryRoutine(discvWg *sync.WaitGroup) {
ticker := time.NewTicker(time.Duration(discvFreqSec) * time.Second)
for range ticker.C {
+ m.Lock()
nodes := make([]*dht.Node, nodesToDiscv)
- n := m.sw.GetDiscv().ReadRandomNodes(nodes)
- for i := 0; i < n; i++ {
+ num := m.sw.GetDiscv().ReadRandomNodes(nodes)
+ for i := 0; i < num; i++ {
+ if n, ok := m.discvMap[nodes[i].ID.String()]; ok && n.String() == nodes[i].String() {
+ continue
+ }
+
+ log.Infof("discover new node: %v", nodes[i])
+ discvWg.Add(1)
m.discvCh <- nodes[i]
}
+ discvWg.Wait()
+ m.Unlock()
}
}
-func (m *monitor) collectDiscoveredNodes() {
- // nodeMap maps a node's public key to the node itself
- nodeMap := make(map[string]*dht.Node)
+func (m *monitor) collectDiscoveredNodes(discvWg *sync.WaitGroup) {
for node := range m.discvCh {
- if n, ok := nodeMap[node.ID.String()]; ok && n.String() == node.String() {
- continue
- }
- log.Infof("discover new node: %v", node)
-
if err := m.upSertNode(&config.Node{
PublicKey: node.ID.String(),
Host: node.IP.String(),
Port: node.TCP,
}); err == nil {
- nodeMap[node.ID.String()] = node
+ m.discvMap[node.ID.String()] = node
} else {
log.Error(err)
}
+ discvWg.Done()
}
}
type monitor struct {
*sync.RWMutex
- cfg *config.Config
- db *gorm.DB
- nodeCfg *vaporCfg.Config
- sw *p2p.Switch
- privKey chainkd.XPrv
- chain *mock.Chain
- txPool *mock.Mempool
- discvCh chan *dht.Node
- dialCh chan struct{}
+ cfg *config.Config
+ db *gorm.DB
+ nodeCfg *vaporCfg.Config
+ sw *p2p.Switch
+ privKey chainkd.XPrv
+ chain *mock.Chain
+ txPool *mock.Mempool
+ // discvMap maps a node's public key to the node itself
+ discvMap map[string]*dht.Node
+ discvCh chan *dht.Node
+ dialCh chan struct{}
+ // TODO: maybe remove?
checkStatusCh chan struct{}
}
privKey: privKey.(chainkd.XPrv),
chain: chain,
txPool: txPool,
+ discvMap: make(map[string]*dht.Node),
discvCh: make(chan *dht.Node),
dialCh: make(chan struct{}, 1),
checkStatusCh: make(chan struct{}, 1),
}
m.dialCh <- struct{}{}
- go m.discoveryRoutine()
- go m.collectDiscoveredNodes()
+ var discvWg sync.WaitGroup
+ go m.discoveryRoutine(&discvWg)
+ go m.collectDiscoveredNodes(&discvWg)
go m.connectNodesRoutine()
go m.checkStatusRoutine()
}