package monitor
import (
- "sync"
"time"
log "github.com/sirupsen/logrus"
discvFreqSec = 60
)
-func (m *monitor) discoveryRoutine(discvWg *sync.WaitGroup) {
+func (m *monitor) discoveryRoutine() {
ticker := time.NewTicker(time.Duration(discvFreqSec) * time.Second)
for range ticker.C {
m.Lock()
+
nodes := make([]*dht.Node, nodesToDiscv)
num := m.sw.GetDiscv().ReadRandomNodes(nodes)
- for i := 0; i < num; i++ {
- node := nodes[i]
+ for _, node := range nodes[:num] {
if n, ok := m.discvMap[node.ID.String()]; ok && n.String() == node.String() {
continue
}
log.Infof("discover new node: %v", node)
- discvWg.Add(1)
- m.discvCh <- node
+ m.saveDiscoveredNode(node)
}
- discvWg.Wait()
+
m.Unlock()
}
}
-func (m *monitor) collectDiscoveredNodes(discvWg *sync.WaitGroup) {
- for node := range m.discvCh {
- if err := m.upSertNode(&config.Node{
- PublicKey: node.ID.String(),
- Host: node.IP.String(),
- Port: node.TCP,
- }); err == nil {
- m.discvMap[node.ID.String()] = node
- } else {
- log.Error(err)
- }
- discvWg.Done()
+func (m *monitor) saveDiscoveredNode(node *dht.Node) {
+ if err := m.upSertNode(&config.Node{
+ PublicKey: node.ID.String(),
+ Host: node.IP.String(),
+ Port: node.TCP,
+ }); err == nil {
+ m.discvMap[node.ID.String()] = node
+ } else {
+ log.Error(err)
}
}
txPool *mock.Mempool
// discvMap maps a node's public key to the node itself
discvMap map[string]*dht.Node
- discvCh chan *dht.Node
dialCh chan struct{}
// TODO: maybe remove?
checkStatusCh chan struct{}
chain: chain,
txPool: txPool,
discvMap: make(map[string]*dht.Node),
- discvCh: make(chan *dht.Node),
dialCh: make(chan struct{}, 1),
checkStatusCh: make(chan struct{}, 1),
}
}
m.dialCh <- struct{}{}
- var discvWg sync.WaitGroup
- go m.discoveryRoutine(&discvWg)
- go m.collectDiscoveredNodes(&discvWg)
+ go m.discoveryRoutine()
go m.connectNodesRoutine()
go m.checkStatusRoutine()
}