package monitor
import (
- "sync"
"time"
log "github.com/sirupsen/logrus"
discvFreqSec = 60
)
-func (m *monitor) discoveryRoutine(discvWg *sync.WaitGroup) {
+func (m *monitor) discoveryRoutine() {
ticker := time.NewTicker(time.Duration(discvFreqSec) * time.Second)
-
for range ticker.C {
- nodes := make([]*dht.Node, nodesToDiscv)
- n := m.sw.GetDiscv().ReadRandomNodes(nodes)
m.Lock()
- for i := 0; i < n; i++ {
- m.discvCh <- nodes[i]
- discvWg.Add(1)
+
+ nodes := make([]*dht.Node, nodesToDiscv)
+ num := m.sw.GetDiscv().ReadRandomNodes(nodes)
+ for _, node := range nodes[:num] {
+ if n, ok := m.discvMap[node.ID.String()]; ok && n.String() == node.String() {
+ continue
+ }
+
+ log.WithFields(log.Fields{"new node": node}).Info("discover")
+ m.saveDiscoveredNode(node)
}
- discvWg.Wait()
+
m.Unlock()
}
}
-func (m *monitor) collectDiscoveredNodes(discvWg *sync.WaitGroup) {
- // nodeMap maps a node's public key to the node itself
- nodeMap := make(map[string]*dht.Node)
- for node := range m.discvCh {
- if n, ok := nodeMap[node.ID.String()]; ok && n.String() == node.String() {
- continue
- }
- log.Infof("discover new node: %v", node)
-
- if err := m.upSertNode(&config.Node{
- PublicKey: node.ID.String(),
- Host: node.IP.String(),
- Port: node.TCP,
- }); err == nil {
- nodeMap[node.ID.String()] = node
- } else {
- log.Error(err)
- }
-
- discvWg.Done()
+func (m *monitor) saveDiscoveredNode(node *dht.Node) {
+ if err := m.upSertNode(&config.Node{
+ PublicKey: node.ID.String(),
+ IP: node.IP.String(),
+ Port: node.TCP,
+ }); err == nil {
+ m.discvMap[node.ID.String()] = node
+ } else {
+ log.WithFields(log.Fields{
+ "node": node,
+ "err": err,
+ }).Error("upSertNode")
}
}