func (m *monitor) discoveryRoutine() {
ticker := time.NewTicker(time.Duration(discvFreqSec) * time.Second)
for range ticker.C {
+ m.Lock()
+
nodes := make([]*dht.Node, nodesToDiscv)
- n := m.sw.GetDiscv().ReadRandomNodes(nodes)
- for i := 0; i < n; i++ {
- m.discvCh <- nodes[i]
+ num := m.sw.GetDiscv().ReadRandomNodes(nodes)
+ for _, node := range nodes[:num] {
+ if n, ok := m.discvMap[node.ID.String()]; ok && n.String() == node.String() {
+ continue
+ }
+
+ log.WithFields(log.Fields{"new node": node}).Info("discover")
+ m.saveDiscoveredNode(node)
}
+
+ m.Unlock()
}
}
-func (m *monitor) collectDiscoveredNodes() {
- // nodeMap maps a node's public key to the node itself
- nodeMap := make(map[string]*dht.Node)
- for node := range m.discvCh {
- if n, ok := nodeMap[node.ID.String()]; ok && n.String() == node.String() {
- continue
- }
- log.Info("discover new node: ", node)
-
- if err := m.upSertNode(&config.Node{
- PublicKey: node.ID.String(),
- Host: node.IP.String(),
- Port: node.TCP,
- }); err == nil {
- nodeMap[node.ID.String()] = node
- } else {
- log.Error(err)
- }
+func (m *monitor) saveDiscoveredNode(node *dht.Node) {
+ if err := m.upSertNode(&config.Node{
+ PublicKey: node.ID.String(),
+ IP: node.IP.String(),
+ Port: node.TCP,
+ }); err == nil {
+ m.discvMap[node.ID.String()] = node
+ } else {
+ log.WithFields(log.Fields{
+ "node": node,
+ "err": err,
+ }).Error("upSertNode")
}
}