11 log "github.com/sirupsen/logrus"
12 "github.com/tendermint/go-wire"
13 "golang.org/x/crypto/sha3"
15 "github.com/vapor/common"
16 "github.com/vapor/crypto/sha3pool"
17 "github.com/vapor/p2p/netutil"
18 "github.com/vapor/p2p/signlib"
// Sentinel errors of the discovery state machine. errInvalidEvent is what the
// node state handlers fall back to for an event they do not accept.
22 errInvalidEvent = errors.New("invalid in current state")
23 errNoQuery = errors.New("no pending query")
24 errWrongAddress = errors.New("unknown sender address")
28 autoRefreshInterval = 1 * time.Hour // period of the refreshTimer ticker in loop
29 bucketRefreshInterval = 1 * time.Minute // duration used for bucketRefreshTimer in loop
31 seedMaxAge = 5 * 24 * time.Hour // maximum age passed to db.querySeeds in refresh
36 printTestImgLogs = false // gates the "*R", "*MR" and "*W" debug lines in loop
39 // Network manages the table and all protocol interaction.
41 db *nodeDB // database of known nodes
43 netrestrict *netutil.Netlist // optional IP list; when non-nil, internNodeFromNeighbours rejects IPs it does not contain
45 closed chan struct{} // closed when loop is done
46 closeReq chan struct{} // 'request to close'
47 refreshReq chan []*Node // lookups ask for refresh on this channel
48 refreshResp chan (<-chan struct{}) // ...and get the channel to block on from this one
49 read chan ingressPacket // ingress packets arrive here
50 timeout chan timeoutEvent // timers armed by timedEvent deliver their event here
51 queryReq chan *findnodeQuery // lookups submit findnode queries on this channel
52 tableOpReq chan func() // functions submitted through reqTableOp
53 tableOpResp chan struct{} // loop sends here after receiving a tableOpReq
54 topicRegisterReq chan topicRegisterReq // requests sent by RegisterTopic
55 topicSearchReq chan topicSearchReq // requests sent by SearchTopic and re-queued by loop after a refresh
57 // State of the main loop.
60 ticketStore *ticketStore
62 nodes map[NodeID]*Node // tracks active nodes with state != known
63 timeoutTimers map[timeoutEvent]*time.Timer // pending timers, keyed by the event they will deliver
65 // Revalidation queues.
66 // Nodes put on these queues will be pinged eventually.
67 slowRevalidateQueue []*Node
68 fastRevalidateQueue []*Node
70 // Buffers for state transition.
71 sendBuf []*ingressPacket
74 // transport is implemented by the UDP transport.
75 // It is an interface so that tests can run without opening lots of UDP
76 // sockets and without generating a private key.
77 type transport interface {
78 sendPing(remote *Node, remoteAddr *net.UDPAddr, topics []Topic) (hash []byte) // the returned hash is kept as the node's pingEcho by Network.ping
79 sendNeighbours(remote *Node, nodes []*Node)
80 sendFindnodeHash(remote *Node, target common.Hash)
81 sendTopicRegister(remote *Node, topics []Topic, topicIdx int, pong []byte)
82 sendTopicNodes(remote *Node, queryHash common.Hash, nodes []*Node)
84 send(remote *Node, ptype nodeEvent, p interface{}) (hash []byte) // generic send; this file uses it for pong and topic query packets
86 localAddr() *net.UDPAddr // address handed to newTable by newNetwork
// findnodeQuery is one findnode request. reqQueryFindnode fills in remote,
// target and reply; results (or nil on abort/timeout) are sent on reply.
91 type findnodeQuery struct {
95 nresults int // counter for received nodes
// topicRegisterReq is the message RegisterTopic sends to the main loop: a
// boolean flag followed by the topic (RegisterTopic sends true, then false).
98 type topicRegisterReq struct {
// topicSearchReq is the message SearchTopic sends to the main loop; it is
// built there with the fields topic, found, lookup and delay.
103 type topicSearchReq struct {
// topicSearchResult carries the target and the nodes of a finished topic
// search lookup back to the main loop.
110 type topicSearchResult struct {
// timeoutEvent pairs an event with the node it concerns (constructed as
// timeoutEvent{ev, n}); it is the key type of Network.timeoutTimers.
115 type timeoutEvent struct {
// hash returns the digest computed by sha3pool.Sum256 over target, converted
// to a common.Hash.
120 func hash(target []byte) common.Hash {
122 sha3pool.Sum256(h[:], target)
123 return common.BytesToHash(h[:])
// newNetwork builds a Network on top of conn.
//
// The local node ID is copied from the leading nodeIDBits elements of
// ourPubkey.Bytes(). The routing table is created for that ID and
// conn.localAddr(). netrestrict is stored as given and may be nil.
126 func newNetwork(conn transport, ourPubkey signlib.PubKey, dbPath string, netrestrict *netutil.Netlist) (*Network, error) {
128 copy(ourID[:], ourPubkey.Bytes()[:nodeIDBits])
// The sentinel path "<no database>" skips opening a node database.
131 if dbPath != "<no database>" {
133 if db, err = newNodeDB(dbPath, Version, ourID); err != nil {
138 tab := newTable(ourID, conn.localAddr())
142 netrestrict: netrestrict,
144 topictab: newTopicTable(db, tab.self),
145 ticketStore: newTicketStore(),
146 refreshReq: make(chan []*Node),
147 refreshResp: make(chan (<-chan struct{})),
148 closed: make(chan struct{}),
149 closeReq: make(chan struct{}),
150 read: make(chan ingressPacket, 100), // buffered: room for 100 ingress packets
151 timeout: make(chan timeoutEvent),
152 timeoutTimers: make(map[timeoutEvent]*time.Timer),
153 tableOpReq: make(chan func()),
154 tableOpResp: make(chan struct{}),
155 queryReq: make(chan *findnodeQuery),
156 topicRegisterReq: make(chan topicRegisterReq),
157 topicSearchReq: make(chan topicSearchReq),
158 nodes: make(map[NodeID]*Node),
164 // Close terminates the network listener and flushes the node database.
165 func (net *Network) Close() {
// Signal the main loop through closeReq.
169 case net.closeReq <- struct{}{}:
174 // Self returns the local node.
175 // Callers must treat the returned node as read-only.
176 func (net *Network) Self() *Node {
// selfIP returns the IP address recorded for the local node in the table.
180 func (net *Network) selfIP() net.IP {
181 return net.tab.self.IP
184 // ReadRandomNodes fills the given slice with random nodes from the
185 // table. It will not write the same node more than once. The nodes in
186 // the slice are copies and can be modified by the caller.
// It returns the value reported by the table's readRandomNodes.
187 func (net *Network) ReadRandomNodes(buf []*Node) (n int) {
// The table is read inside a function submitted via reqTableOp.
188 net.reqTableOp(func() { n = net.tab.readRandomNodes(buf) })
192 // SetFallbackNodes sets the initial points of contact. These nodes
193 // are used to connect to the network if the table is empty and there
194 // are no known nodes in the database.
// It returns an error for the first node that fails validateComplete.
// The collected copies are passed to reqRefresh as the new nursery.
195 func (net *Network) SetFallbackNodes(nodes []*Node) error {
196 nursery := make([]*Node, 0, len(nodes))
197 for _, n := range nodes {
198 if err := n.validateComplete(); err != nil {
199 return fmt.Errorf("bad bootstrap/fallback node %q (%v)", n, err)
201 // Recompute cpy.sha because the node might not have been
202 // created by NewNode or ParseNode.
204 cpy.sha = hash(n.ID[:])
205 nursery = append(nursery, &cpy)
207 net.reqRefresh(nursery)
211 // Resolve searches for a specific node with the given ID.
212 // It returns nil if the node could not be found.
// The search is a lookup on hash(targetID) with stopOnMatch enabled; the
// result is then scanned for a node whose ID equals targetID.
213 func (net *Network) Resolve(targetID NodeID) *Node {
214 result := net.lookup(hash(targetID[:]), true)
215 for _, n := range result {
216 if n.ID == targetID {
223 // Lookup performs a network search for nodes close
224 // to the given target. It approaches the target by querying
225 // nodes that are closer to it on each iteration.
226 // The given target does not need to be an actual node
229 // The local node may be included in the result.
// It is a lookup on hash(targetID) with stopOnMatch disabled.
230 func (net *Network) Lookup(targetID NodeID) []*Node {
231 return net.lookup(hash(targetID[:]), false)
// lookup is the search routine behind Resolve, Lookup and the topic lookups
// started in loop. It starts from the local node, keeps fewer than alpha+1
// findnode queries pending, and pushes every newly seen node into a
// nodesByDistance result limited to bucketSize. It returns result.entries.
// With stopOnMatch set it returns as soon as a node whose sha equals target
// has been pushed.
234 func (net *Network) lookup(target common.Hash, stopOnMatch bool) []*Node {
236 asked = make(map[NodeID]bool)
237 seen = make(map[NodeID]bool)
238 reply = make(chan []*Node, alpha) // buffered with capacity alpha
239 result = nodesByDistance{target: target}
242 // Get initial answers from the local node.
243 result.push(net.tab.self, bucketSize)
245 // Ask the α closest nodes that we haven't asked yet.
246 for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {
247 n := result.entries[i]
251 net.reqQueryFindnode(n, target, reply)
254 if pendingQueries == 0 {
255 // We have asked all closest nodes, stop the search.
258 // Wait for the next reply.
260 case nodes := <-reply:
261 for _, n := range nodes {
262 if n != nil && !seen[n.ID] {
264 result.push(n, bucketSize)
265 if stopOnMatch && n.sha == target {
266 return result.entries
// No reply within respTimeout.
271 case <-time.After(respTimeout):
272 // forget all pending requests, start new ones
// A fresh channel means late replies to the old one are never read here.
274 reply = make(chan []*Node, alpha)
277 return result.entries
// RegisterTopic sends a topicRegisterReq with the flag set for topic to the
// main loop, and at a later point one with the flag cleared.
// NOTE(review): presumably the second request is sent once stop fires — the
// surrounding select statements are not visible here; confirm.
280 func (net *Network) RegisterTopic(topic Topic, stop <-chan struct{}) {
282 case net.topicRegisterReq <- topicRegisterReq{true, topic}:
290 case net.topicRegisterReq <- topicRegisterReq{false, topic}:
// SearchTopic reads search periods from setPeriod and forwards each one to
// the main loop as a topicSearchReq carrying topic, found, lookup and the
// received delay.
296 func (net *Network) SearchTopic(topic Topic, setPeriod <-chan time.Duration, found chan<- *Node, lookup chan<- bool) {
301 case delay, ok := <-setPeriod:
303 case net.topicSearchReq <- topicSearchReq{topic: topic, found: found, lookup: lookup, delay: delay}:
// reqRefresh submits nursery on refreshReq and, once the request has been
// accepted, returns the channel the main loop answers with on refreshResp.
314 func (net *Network) reqRefresh(nursery []*Node) <-chan struct{} {
316 case net.refreshReq <- nursery:
317 return <-net.refreshResp
// reqQueryFindnode wraps n, target and reply in a findnodeQuery and submits
// it to the main loop on queryReq.
323 func (net *Network) reqQueryFindnode(n *Node, target common.Hash, reply chan []*Node) bool {
324 q := &findnodeQuery{remote: n, target: target, reply: reply}
326 case net.queryReq <- q:
// reqReadPacket hands an ingress packet to the main loop through net.read.
333 func (net *Network) reqReadPacket(pkt ingressPacket) {
335 case net.read <- pkt:
// reqTableOp submits f to the main loop on tableOpReq. The called result
// reports whether f was run (NOTE(review): inferred from the result name;
// the assignment is not visible here).
340 func (net *Network) reqTableOp(f func()) (called bool) {
342 case net.tableOpReq <- f:
350 // TODO: external address handling.
// topicSearchInfo is the per-topic search state that loop keeps in its
// searchInfo map.
352 type topicSearchInfo struct {
353 lookupChn chan<- bool // receives the radius 'converged' flag after each search lookup
// maxSearchCount is the limit loop compares activeSearchCount against before
// starting another topic search lookup.
357 const maxSearchCount = 5
// loop is the main event loop of Network. The cases below handle ingress
// packets, state-transition timeouts, findnode queries, table operations,
// topic registration and topic search, the periodic statistics dump, and
// table refresh requests.
359 func (net *Network) loop() {
361 refreshTimer = time.NewTicker(autoRefreshInterval)
362 bucketRefreshTimer = time.NewTimer(bucketRefreshInterval)
363 refreshDone chan struct{} // closed when the 'refresh' lookup has ended
366 // Tracking the next ticket to register.
368 nextTicket *ticketRef
369 nextRegisterTimer *time.Timer
370 nextRegisterTime <-chan time.Time
373 if nextRegisterTimer != nil {
374 nextRegisterTimer.Stop()
377 bucketRefreshTimer.Stop()
// resetNextTicket asks the ticket store for the next filtered ticket and,
// when it differs from nextTicket, replaces nextRegisterTimer with a timer
// for the returned timeout.
379 resetNextTicket := func() {
380 ticket, timeout := net.ticketStore.nextFilteredTicket()
381 if nextTicket != ticket {
383 if nextRegisterTimer != nil {
384 nextRegisterTimer.Stop()
385 nextRegisterTime = nil
388 nextRegisterTimer = time.NewTimer(timeout)
389 nextRegisterTime = nextRegisterTimer.C
394 // Tracking registration and search lookups.
396 topicRegisterLookupTarget lookupInfo
397 topicRegisterLookupDone chan []*Node
398 topicRegisterLookupTick = time.NewTimer(0)
399 searchReqWhenRefreshDone []topicSearchReq
400 searchInfo = make(map[Topic]topicSearchInfo)
401 activeSearchCount int
403 topicSearchLookupDone := make(chan topicSearchResult, 100)
404 topicSearch := make(chan Topic, 100)
// Drain the zero-duration timer created above so its channel starts empty.
405 <-topicRegisterLookupTick.C
407 statsDump := time.NewTicker(10 * time.Second)
408 defer statsDump.Stop()
416 log.WithFields(log.Fields{"module": logModule}).Debug("close request")
419 // Ingress packet handling.
420 case pkt := <-net.read:
421 log.WithFields(log.Fields{"module": logModule}).Debug("read from net")
422 n := net.internNode(&pkt)
425 if err := net.handle(n, pkt.ev, &pkt); err != nil {
428 log.WithFields(log.Fields{"module": logModule, "node num": net.tab.count, "event": pkt.ev, "remote id": hex.EncodeToString(pkt.remoteID[:8]), "remote addr": pkt.remoteAddr, "pre state": prestate, "node state": n.state, "status": status}).Debug("handle ingress msg")
430 // TODO: persist state if n.state goes >= known, delete if it goes <= known
432 // State transition timeouts.
433 case timeout := <-net.timeout:
434 log.WithFields(log.Fields{"module": logModule}).Debug("net timeout")
435 if net.timeoutTimers[timeout] == nil {
436 // Stale timer (was aborted).
439 delete(net.timeoutTimers, timeout)
440 prestate := timeout.node.state
442 if err := net.handle(timeout.node, timeout.ev, nil); err != nil {
445 log.WithFields(log.Fields{"module": logModule, "node num": net.tab.count, "event": timeout.ev, "node id": hex.EncodeToString(timeout.node.ID[:8]), "node addr": timeout.node.addr(), "pre state": prestate, "node state": timeout.node.state, "status": status}).Debug("handle timeout")
// Findnode queries submitted through reqQueryFindnode.
448 case q := <-net.queryReq:
449 log.WithFields(log.Fields{"module": logModule}).Debug("net query request")
451 q.remote.deferQuery(q)
454 // Interacting with the table.
455 case f := <-net.tableOpReq:
456 log.WithFields(log.Fields{"module": logModule}).Debug("net table operate request")
458 net.tableOpResp <- struct{}{}
460 // Topic registration stuff.
461 case req := <-net.topicRegisterReq:
462 log.WithFields(log.Fields{"module": logModule, "topic": req.topic}).Debug("net topic register request")
464 net.ticketStore.removeRegisterTopic(req.topic)
467 net.ticketStore.addTopic(req.topic, true)
468 // If we're currently waiting idle (nothing to look up), give the ticket store a
469 // chance to start it sooner. This should speed up convergence of the radius
470 // determination for new topics.
471 // if topicRegisterLookupDone == nil {
472 if topicRegisterLookupTarget.target == (common.Hash{}) {
473 log.WithFields(log.Fields{"module": logModule, "topic": req.topic}).Debug("topic register lookup target null")
// NOTE(review): Timer.Stop reports true when the timer had NOT fired yet, in
// which case nothing is pending on C. The drain idiom documented for
// time.Timer is `if !t.Stop() { <-t.C }` — confirm this condition is intended.
474 if topicRegisterLookupTick.Stop() {
475 <-topicRegisterLookupTick.C
477 target, delay := net.ticketStore.nextRegisterLookup()
478 topicRegisterLookupTarget = target
479 topicRegisterLookupTick.Reset(delay)
// A registration lookup started by the tick case below has finished.
482 case nodes := <-topicRegisterLookupDone:
483 log.WithFields(log.Fields{"module": logModule}).Debug("topic register lookup done")
484 net.ticketStore.registerLookupDone(topicRegisterLookupTarget, nodes, func(n *Node) []byte {
485 net.ping(n, n.addr())
488 target, delay := net.ticketStore.nextRegisterLookup()
489 topicRegisterLookupTarget = target
490 topicRegisterLookupTick.Reset(delay)
491 topicRegisterLookupDone = nil
493 case <-topicRegisterLookupTick.C:
494 log.WithFields(log.Fields{"module": logModule}).Debug("topic register lookup tick")
// With no target set, only fetch the next target and re-arm the tick.
495 if (topicRegisterLookupTarget.target == common.Hash{}) {
496 target, delay := net.ticketStore.nextRegisterLookup()
497 topicRegisterLookupTarget = target
498 topicRegisterLookupTick.Reset(delay)
499 topicRegisterLookupDone = nil
// Otherwise run the lookup on its own goroutine; the result comes back on
// topicRegisterLookupDone.
501 topicRegisterLookupDone = make(chan []*Node)
502 target := topicRegisterLookupTarget.target
503 go func() { topicRegisterLookupDone <- net.lookup(target, false) }()
// The timer armed by resetNextTicket fired: register the pending ticket.
506 case <-nextRegisterTime:
507 log.WithFields(log.Fields{"module": logModule}).Debug("next register time")
508 net.ticketStore.ticketRegistered(*nextTicket)
509 net.conn.sendTopicRegister(nextTicket.t.node, nextTicket.t.topics, nextTicket.idx, nextTicket.t.pong)
511 case req := <-net.topicSearchReq:
// Search requests are processed only while no refresh is in progress;
// otherwise they are appended to searchReqWhenRefreshDone below.
512 if refreshDone == nil {
513 log.WithFields(log.Fields{"module": logModule, "topic": req.topic}).Debug("net topic rearch req") // NOTE(review): "rearch" looks like a typo for "search"
514 info, ok := searchInfo[req.topic]
// A zero delay removes the search for this topic.
516 if req.delay == time.Duration(0) {
517 delete(searchInfo, req.topic)
518 net.ticketStore.removeSearchTopic(req.topic)
520 info.period = req.delay
521 searchInfo[req.topic] = info
525 if req.delay != time.Duration(0) {
526 var info topicSearchInfo
527 info.period = req.delay
528 info.lookupChn = req.lookup
529 searchInfo[req.topic] = info
530 net.ticketStore.addSearchTopic(req.topic, req.found)
531 topicSearch <- req.topic
534 searchReqWhenRefreshDone = append(searchReqWhenRefreshDone, req)
537 case topic := <-topicSearch:
538 if activeSearchCount < maxSearchCount {
540 target := net.ticketStore.nextSearchLookup(topic)
542 nodes := net.lookup(target.target, false)
543 topicSearchLookupDone <- topicSearchResult{target: target, nodes: nodes}
546 period := searchInfo[topic].period
547 if period != time.Duration(0) {
554 case res := <-topicSearchLookupDone:
// Report the radius 'converged' flag to the searcher, if it gave a channel.
556 if lookupChn := searchInfo[res.target.topic].lookupChn; lookupChn != nil {
557 lookupChn <- net.ticketStore.radius[res.target.topic].converged
559 net.ticketStore.searchLookupDone(res.target, res.nodes, func(n *Node, topic Topic) []byte {
560 if n.state != nil && n.state.canQuery {
561 return net.conn.send(n, topicQueryPacket, topicQuery{Topic: topic}) // TODO: set expiration
563 if n.state == unknown {
564 net.ping(n, n.addr())
571 log.WithFields(log.Fields{"module": logModule}).Debug("stats dump clock")
572 /*r, ok := net.ticketStore.radius[testTopic]
574 fmt.Printf("(%x) no radius @ %v\n", net.tab.self.ID[:8], time.Now())
576 topics := len(net.ticketStore.tickets)
577 tickets := len(net.ticketStore.nodes)
578 rad := r.radius / (maxRadius/10000+1)
579 fmt.Printf("(%x) topics:%d radius:%d tickets:%d @ %v\n", net.tab.self.ID[:8], topics, rad, tickets, time.Now())
583 for topic, r := range net.ticketStore.radius {
584 if printTestImgLogs {
585 rad := r.radius / (maxRadius/1000000 + 1)
586 minrad := r.minRadius / (maxRadius/1000000 + 1)
587 log.WithFields(log.Fields{"module": logModule}).Debugf("*R %d %v %016x %v\n", tm/1000000, topic, net.tab.self.sha[:8], rad)
588 log.WithFields(log.Fields{"module": logModule}).Debugf("*MR %d %v %016x %v\n", tm/1000000, topic, net.tab.self.sha[:8], minrad)
591 for topic, t := range net.topictab.topics {
592 wp := t.wcl.nextWaitPeriod(tm)
593 if printTestImgLogs {
594 log.WithFields(log.Fields{"module": logModule}).Debugf("*W %d %v %016x %d\n", tm/1000000, topic, net.tab.self.sha[:8], wp/1000000)
598 // Periodic / lookup-initiated bucket refresh.
599 case <-refreshTimer.C:
600 log.WithFields(log.Fields{"module": logModule}).Debug("refresh timer clock")
601 // TODO: ideally we would start the refresh timer after
602 // fallback nodes have been set for the first time.
603 if refreshDone == nil {
604 refreshDone = make(chan struct{})
605 net.refresh(refreshDone)
607 case <-bucketRefreshTimer.C:
608 target := net.tab.chooseBucketRefreshTarget()
610 net.lookup(target, false)
611 bucketRefreshTimer.Reset(bucketRefreshInterval)
// Refresh requested through reqRefresh; a non-nil nursery replaces the
// current one, and the caller receives the channel of the running refresh.
613 case newNursery := <-net.refreshReq:
614 log.WithFields(log.Fields{"module": logModule}).Debug("net refresh request")
615 if newNursery != nil {
616 net.nursery = newNursery
618 if refreshDone == nil {
619 refreshDone = make(chan struct{})
620 net.refresh(refreshDone)
622 net.refreshResp <- refreshDone
624 log.WithFields(log.Fields{"module": logModule, "table size": net.tab.count}).Debug("net refresh done")
// With a non-empty table, replay the search requests that were parked while
// the refresh was running; further down a new refresh is started instead.
625 if net.tab.count != 0 {
627 list := searchReqWhenRefreshDone
628 searchReqWhenRefreshDone = nil
630 for _, req := range list {
631 net.topicSearchReq <- req
635 refreshDone = make(chan struct{})
636 net.refresh(refreshDone)
640 log.WithFields(log.Fields{"module": logModule}).Debug("loop stopped,shutting down")
644 if refreshDone != nil {
645 // TODO: wait for pending refresh.
648 // Cancel all pending timeouts.
649 for _, timer := range net.timeoutTimers {
658 // Everything below runs on the Network.loop goroutine
659 // and can modify Node, Table and Network at any time without locking.
// refresh seeds the table and starts a self lookup. Seeds are requested from
// the node database with querySeeds(seedCount, seedMaxAge). When no seed
// nodes are found, done is closed after a 10 second delay.
661 func (net *Network) refresh(done chan<- struct{}) {
664 seeds = net.db.querySeeds(seedCount, seedMaxAge)
670 log.WithFields(log.Fields{"module": logModule}).Debug("no seed nodes found")
671 time.AfterFunc(time.Second*10, func() { close(done) })
674 for _, n := range seeds {
// Map the database record onto the tracked node and start verifying it if
// it is still in state unknown.
675 n = net.internNodeFromDB(n)
676 if n.state == unknown {
677 net.transition(n, verifyinit)
679 // Force-add the seed node so Lookup does something.
680 // It will be deleted again if verification fails.
683 // Start self lookup to fill up the buckets.
685 net.Lookup(net.tab.self.ID)
// internNode returns the tracked Node for the sender of pkt. A node already
// present in net.nodes has its IP and ports overwritten from the packet's
// source address; otherwise a new Node is created and stored in net.nodes.
692 func (net *Network) internNode(pkt *ingressPacket) *Node {
693 if n := net.nodes[pkt.remoteID]; n != nil {
694 n.IP = pkt.remoteAddr.IP
695 n.UDP = uint16(pkt.remoteAddr.Port)
696 n.TCP = uint16(pkt.remoteAddr.Port) // TCP is set to the UDP source port as well
699 n := NewNode(pkt.remoteID, pkt.remoteAddr.IP, uint16(pkt.remoteAddr.Port), uint16(pkt.remoteAddr.Port))
701 net.nodes[pkt.remoteID] = n
// internNodeFromDB maps a node record loaded from the database onto the
// tracked Node with the same ID; when none is tracked yet, a new Node is
// built from the record's ID, IP, UDP and TCP fields.
705 func (net *Network) internNodeFromDB(dbn *Node) *Node {
706 if n := net.nodes[dbn.ID]; n != nil {
709 n := NewNode(dbn.ID, dbn.IP, dbn.UDP, dbn.TCP)
// internNodeFromNeighbours resolves a node record received in a neighbours
// reply from sender. It rejects the local node's own ID and UDP ports that
// are <= lowPort. A record seen for the first time is converted with
// nodeFromRPC and rejected when netrestrict is set and does not contain its
// IP. For an already tracked node whose endpoint differs from rn, the change
// is rejected with a "metadata mismatch" error if the node is in state known.
715 func (net *Network) internNodeFromNeighbours(sender *net.UDPAddr, rn rpcNode) (n *Node, err error) {
716 if rn.ID == net.tab.self.ID {
717 return nil, errors.New("is self")
719 if rn.UDP <= lowPort {
720 return nil, errors.New("low port")
724 // We haven't seen this node before.
725 n, err = nodeFromRPC(sender, rn)
// NOTE(review): n.IP is read before err from nodeFromRPC is examined; this
// relies on nodeFromRPC returning a non-nil node even on error — confirm.
726 if net.netrestrict != nil && !net.netrestrict.Contains(n.IP) {
727 return n, errors.New("not contained in netrestrict whitelist")
735 if !n.IP.Equal(rn.IP) || n.UDP != rn.UDP || n.TCP != rn.TCP {
736 if n.state == known {
737 // reject address change if node is known by us
738 err = fmt.Errorf("metadata mismatch: got %v, want %v", rn, n)
740 // accept otherwise; this will be handled more cleanly with signed ENRs
749 // nodeNetGuts is embedded in Node and holds the cached ID hash plus the
// per-node state machine fields used by Network.
750 type nodeNetGuts struct {
751 // This is a cached copy of sha3(ID) which is used for node
752 // distance calculations. This is part of Node in order to make it
753 // possible to write tests that need a node at a certain distance.
754 // In those tests, the content of sha will not actually correspond
758 // State machine fields. Access to these fields
759 // is restricted to the Network.loop goroutine.
761 pingEcho []byte // hash of last ping sent by us
762 pingTopics []Topic // topic set sent by us in last ping
763 deferredQueries []*findnodeQuery // queries that can't be sent yet
764 pendingNeighbours *findnodeQuery // current query, waiting for reply
// deferQuery appends q to the node's queue of deferred findnode queries.
768 func (n *nodeNetGuts) deferQuery(q *findnodeQuery) {
769 n.deferredQueries = append(n.deferredQueries, q)
// startNextQuery tries to start the oldest deferred query. The query is
// removed from the queue only when its start method reports true.
772 func (n *nodeNetGuts) startNextQuery(net *Network) {
773 if len(n.deferredQueries) == 0 {
776 nextq := n.deferredQueries[0]
777 if nextq.start(net) {
// Drop the first element in place, reusing the backing array.
778 n.deferredQueries = append(n.deferredQueries[:0], n.deferredQueries[1:]...)
// start attempts to issue the query. Queries addressed to the local node are
// answered straight from the table. For a remote node that can be queried
// and has no query outstanding, a findnode packet is sent and a
// neighboursTimeout is scheduled after respTimeout. A remote in state
// unknown is moved to verifyinit so the query can be sent later.
782 func (q *findnodeQuery) start(net *Network) bool {
783 // Satisfy queries against the local node directly.
784 if q.remote == net.tab.self {
785 log.WithFields(log.Fields{"module": logModule}).Debug("findnodeQuery self")
786 closest := net.tab.closest(q.target, bucketSize)
788 q.reply <- closest.entries
791 if q.remote.state.canQuery && q.remote.pendingNeighbours == nil {
792 log.WithFields(log.Fields{"module": logModule, "remote peer": q.remote.ID, "targetID": q.target}).Debug("find node query")
793 net.conn.sendFindnodeHash(q.remote, q.target)
794 net.timedEvent(respTimeout, q.remote, neighboursTimeout)
795 q.remote.pendingNeighbours = q
798 // If the node is not known yet, it won't accept queries.
799 // Initiate the transition to known.
800 // The request will be sent later when the node reaches known state.
801 if q.remote.state == unknown {
802 log.WithFields(log.Fields{"module": logModule, "id": q.remote.ID, "status": "unknown->verify init"}).Debug("find node query")
803 net.transition(q.remote, verifyinit)
808 // Node Events (the input to the state machine).
812 //go:generate stringer -type=nodeEvent
815 invalidEvent nodeEvent = iota // zero is reserved
817 // Packet type events.
818 // These correspond to packet types in the UDP protocol.
828 // Non-packet events.
829 // Event values in this category are allocated outside
830 // the packet type range (packet types are encoded as a single byte).
831 pongTimeout nodeEvent = iota + 256 // the +256 offset lifts the value above any single-byte packet type
836 // Node State Machine.
// nodeState describes one state of a node.
838 type nodeState struct {
// handle processes an event for a node in this state and returns the state
// to move to; Network.handle passes that state to transition.
840 handle func(*Network, *Node, nodeEvent, *ingressPacket) (next *nodeState, err error)
// enter is an optional hook; transition checks it for nil.
841 enter func(*Network, *Node)
// String returns a textual form of the state.
// NOTE(review): presumably the state's name field — body not visible; confirm.
845 func (s *nodeState) String() string {
// Node states. Each variable is assigned its *nodeState value further below.
851 verifyinit *nodeState
852 verifywait *nodeState
853 remoteverifywait *nodeState
856 unresponsive *nodeState
// unknown: on entry, deferred queries are dropped and a pending neighbours
// query is answered with nil. In handle, the visible path answers a ping,
// pings the sender back at the packet's source address and moves to
// verifywait; the fallback return stays in unknown with errInvalidEvent.
860 unknown = &nodeState{
862 enter: func(net *Network, n *Node) {
865 // Abort active queries.
866 for _, q := range n.deferredQueries {
869 n.deferredQueries = nil
870 if n.pendingNeighbours != nil {
871 n.pendingNeighbours.reply <- nil
872 n.pendingNeighbours = nil
876 handle: func(net *Network, n *Node, ev nodeEvent, pkt *ingressPacket) (*nodeState, error) {
879 net.handlePing(n, pkt)
880 net.ping(n, pkt.remoteAddr)
881 return verifywait, nil
883 return unknown, errInvalidEvent
// verifyinit: entering the state pings the node at its own address. In
// handle, the handlePing path moves to verifywait and the handleKnownPong
// path moves to remoteverifywait (passing on that call's error); the
// fallback return stays in verifyinit with errInvalidEvent.
888 verifyinit = &nodeState{
890 enter: func(net *Network, n *Node) {
891 net.ping(n, n.addr())
893 handle: func(net *Network, n *Node, ev nodeEvent, pkt *ingressPacket) (*nodeState, error) {
896 net.handlePing(n, pkt)
897 return verifywait, nil
899 err := net.handleKnownPong(n, pkt)
900 return remoteverifywait, err
904 return verifyinit, errInvalidEvent
// verifywait: the handlePing path stays in verifywait; a handleKnownPong
// path exists as well (its return is not visible here); the fallback return
// stays in verifywait with errInvalidEvent.
909 verifywait = &nodeState{
911 handle: func(net *Network, n *Node, ev nodeEvent, pkt *ingressPacket) (*nodeState, error) {
914 net.handlePing(n, pkt)
915 return verifywait, nil
917 err := net.handleKnownPong(n, pkt)
922 return verifywait, errInvalidEvent
// remoteverifywait: entering the state schedules a pingTimeout event after
// respTimeout. The handlePing path stays in remoteverifywait; the fallback
// return stays there too, with errInvalidEvent.
927 remoteverifywait = &nodeState{
928 name: "remoteverifywait",
929 enter: func(net *Network, n *Node) {
930 net.timedEvent(respTimeout, n, pingTimeout)
932 handle: func(net *Network, n *Node, ev nodeEvent, pkt *ingressPacket) (*nodeState, error) {
935 net.handlePing(n, pkt)
936 return remoteverifywait, nil
940 return remoteverifywait, errInvalidEvent
// NOTE(review): the opening line of this state literal is not visible; the
// references to `known` elsewhere suggest this is the known state — confirm.
// On entry the next deferred query is started and the node is added to the
// table; a node returned by tab.add that is in state known is moved to
// contested.
948 enter: func(net *Network, n *Node) {
950 n.startNextQuery(net)
951 // Insert into the table and start revalidation of the last node
952 // in the bucket if it is full.
953 last := net.tab.add(n)
954 if last != nil && last.state == known {
955 // TODO: do this asynchronously
956 net.transition(last, contested)
959 handle: func(net *Network, n *Node, ev nodeEvent, pkt *ingressPacket) (*nodeState, error) {
// The node record is written to the database at the start of handle.
960 if err := net.db.updateNode(n); err != nil {
966 net.handlePing(n, pkt)
969 err := net.handleKnownPong(n, pkt)
// Remaining events are delegated to handleQueryEvent.
972 return net.handleQueryEvent(n, ev, pkt)
// contested: entering the state pings the node at its own address. In
// handle, one path processes a pong via handleKnownPong, another removes the
// node with tab.deleteReplace and moves to unresponsive, the handlePing path
// stays in contested, and remaining events go to handleQueryEvent.
977 contested = &nodeState{
980 enter: func(net *Network, n *Node) {
982 net.ping(n, n.addr())
984 handle: func(net *Network, n *Node, ev nodeEvent, pkt *ingressPacket) (*nodeState, error) {
987 // Node is still alive.
988 err := net.handleKnownPong(n, pkt)
991 net.tab.deleteReplace(n)
992 return unresponsive, nil
994 net.handlePing(n, pkt)
995 return contested, nil
997 return net.handleQueryEvent(n, ev, pkt)
// unresponsive: handle first deletes the node from the database, then has
// handlePing and handleKnownPong paths and delegates remaining events to
// handleQueryEvent.
1002 unresponsive = &nodeState{
1003 name: "unresponsive",
1005 handle: func(net *Network, n *Node, ev nodeEvent, pkt *ingressPacket) (*nodeState, error) {
1006 net.db.deleteNode(n.ID)
1010 net.handlePing(n, pkt)
1013 err := net.handleKnownPong(n, pkt)
1016 return net.handleQueryEvent(n, ev, pkt)
1022 // handle processes packets sent by n and events related to n.
// The event is first validated with checkPacket. The node's current state
// then handles it, and the state it returns is applied with transition.
1023 func (net *Network) handle(n *Node, ev nodeEvent, pkt *ingressPacket) error {
1024 //fmt.Println("handle", n.addr().String(), n.state, ev)
1026 if err := net.checkPacket(n, ev, pkt); err != nil {
1027 //fmt.Println("check err:", err)
1030 // Start the background expiration goroutine after the first
1031 // successful communication. Subsequent calls have no effect if it
1032 // is already running. We do this here instead of somewhere else
1033 // so that the search for seed nodes also considers older nodes
1034 // that would otherwise be removed by the expirer.
1036 net.db.ensureExpirer()
1040 n.state = unknown //??? NOTE(review): unexplained reset to unknown — confirm when this applies
1042 next, err := n.state.handle(net, n, ev, pkt)
1043 net.transition(n, next)
1044 //fmt.Println("new state:", n.state)
// checkPacket validates an incoming event before it reaches the state
// machine. The visible check rejects a pong whose ReplyTok differs from the
// pingEcho recorded for n; date and version checks are still TODO.
1048 func (net *Network) checkPacket(n *Node, ev nodeEvent, pkt *ingressPacket) error {
1049 // Replay prevention checks.
1051 case pingPacket, findnodeHashPacket, neighborsPacket:
1052 // TODO: check date is > last date seen
1053 // TODO: check ping version
1055 if !bytes.Equal(pkt.data.(*pong).ReplyTok, n.pingEcho) {
1056 // fmt.Println("pong reply token mismatch")
1057 return fmt.Errorf("pong reply token mismatch")
1061 // Address validation.
1062 // TODO: Ideally we would do the following:
1063 // - reject all packets with wrong address except ping.
1064 // - for ping with new address, transition to verifywait but keep the
1065 // previous node (with old address) around. if the new one reaches known,
// transition moves n to next. Nothing happens when n is already in that
// state; otherwise the enter hook of next is considered if it is non-nil.
1070 func (net *Network) transition(n *Node, next *nodeState) {
1071 if n.state != next {
1073 if next.enter != nil {
1078 // TODO: persist/unpersist node
// timedEvent arms a timer that, after d, delivers timeoutEvent{ev, n} on
// net.timeout. The timer is stored in timeoutTimers under that event,
// replacing any timer previously stored for the same key.
1081 func (net *Network) timedEvent(d time.Duration, n *Node, ev nodeEvent) {
1082 timeout := timeoutEvent{ev, n}
1083 net.timeoutTimers[timeout] = time.AfterFunc(d, func() {
1085 case net.timeout <- timeout:
// abortTimedEvent looks up the timer registered for (ev, n) and removes its
// entry from timeoutTimers, so a late delivery is treated as stale by loop.
1091 func (net *Network) abortTimedEvent(n *Node, ev nodeEvent) {
1092 timer := net.timeoutTimers[timeoutEvent{ev, n}]
1095 delete(net.timeoutTimers, timeoutEvent{ev, n})
// ping sends a ping to n at addr. The guard at the top singles out nodes
// that already have a ping outstanding (pingEcho set) and the local node.
// Otherwise the current registration topic set is recorded in pingTopics,
// the hash returned by sendPing is stored as pingEcho, and a pongTimeout is
// scheduled after respTimeout.
1099 func (net *Network) ping(n *Node, addr *net.UDPAddr) {
1100 //fmt.Println("ping", n.addr().String(), n.ID.String(), n.sha.Hex())
1101 if n.pingEcho != nil || n.ID == net.tab.self.ID {
1102 //fmt.Println(" not sent")
1105 log.WithFields(log.Fields{"module": logModule, "node": n.ID}).Debug("Pinging remote node")
1106 n.pingTopics = net.ticketStore.regTopicSet()
1107 n.pingEcho = net.conn.sendPing(n, addr, n.pingTopics)
1108 net.timedEvent(respTimeout, n, pongTimeout)
// handlePing answers a ping from n. It adopts the TCP port announced in the
// ping, obtains a ticket for the ping's topics from the topic table, encodes
// the ticket into a pong and sends it.
1111 func (net *Network) handlePing(n *Node, pkt *ingressPacket) {
1112 log.WithFields(log.Fields{"module": logModule, "node": n.ID}).Debug("Handling remote ping")
1113 ping := pkt.data.(*ping)
1114 n.TCP = ping.From.TCP
1115 t := net.topictab.getTicket(n, ping.Topics)
1118 To: makeEndpoint(n.addr(), n.TCP), // TODO: maybe use known TCP port from DB
1120 Expiration: uint64(time.Now().Add(expiration).Unix()),
1122 ticketToPong(t, pong)
1123 net.conn.send(n, pongPacket, pong)
// handleKnownPong processes a pong from n. It cancels the pending
// pongTimeout, converts the pong into a ticket using the topics sent in the
// last ping, hands the ticket to the ticket store or logs the conversion
// failure, and records the pong time in the node database.
1126 func (net *Network) handleKnownPong(n *Node, pkt *ingressPacket) error {
1127 log.WithFields(log.Fields{"module": logModule, "node": n.ID}).Debug("Handling known pong")
1128 net.abortTimedEvent(n, pongTimeout)
1130 ticket, err := pongToTicket(now, n.pingTopics, n, pkt)
1132 // fmt.Printf("(%x) ticket: %+v\n", net.tab.self.ID[:8], pkt.data)
1133 net.ticketStore.addTicket(now, pkt.data.(*pong).ReplyTok, ticket)
1135 log.WithFields(log.Fields{"module": logModule, "error": err}).Debug("Failed to convert pong to ticket")
1139 net.db.updateLastPong(n.ID, time.Now())
// handleQueryEvent handles the query-related events shared by several
// states: findnode and findnodeHash requests, neighbours replies and their
// timeout, topic registration, topic queries and topic node replies.
// The fallback return keeps the current state and reports errInvalidEvent.
1143 func (net *Network) handleQueryEvent(n *Node, ev nodeEvent, pkt *ingressPacket) (*nodeState, error) {
1145 case findnodePacket:
// The target is hashed before the table is searched.
1146 results := net.tab.closest(hash(pkt.data.(*findnode).Target[:]), bucketSize).entries
1147 net.conn.sendNeighbours(n, results)
1149 case neighborsPacket:
1150 err := net.handleNeighboursPacket(n, pkt)
1152 case neighboursTimeout:
// Answer the waiting query with nil and clear it.
1153 if n.pendingNeighbours != nil {
1154 n.pendingNeighbours.reply <- nil
1155 n.pendingNeighbours = nil
1158 if n.queryTimeouts > maxFindnodeFailures && n.state == known {
1159 return contested, errors.New("too many timeouts")
1165 case findnodeHashPacket:
// Here the target is already a hash and is used as is.
1166 results := net.tab.closest(pkt.data.(*findnodeHash).Target, bucketSize).entries
1167 net.conn.sendNeighbours(n, results)
1169 case topicRegisterPacket:
1170 //fmt.Println("got topicRegisterPacket")
1171 regdata := pkt.data.(*topicRegister)
1172 pong, err := net.checkTopicRegister(regdata, net.conn.getNetID())
1175 return n.state, fmt.Errorf("bad waiting ticket: %v", err)
1177 net.topictab.useTicket(n, pong.TicketSerial, regdata.Topics, int(regdata.Idx), pong.Expiration, pong.WaitPeriods)
1179 case topicQueryPacket:
1180 // TODO: handle expiration
1181 topic := pkt.data.(*topicQuery).Topic
1182 results := net.topictab.getEntries(topic)
1183 if _, ok := net.ticketStore.tickets[topic]; ok {
1184 results = append(results, net.tab.self) // we're not registering in our own table but if we're advertising, return ourselves too
// At most 10 nodes are returned.
1186 if len(results) > 10 {
1187 results = results[:10]
// The hash of the query packet is sent back with the reply.
1189 var hash common.Hash
1190 copy(hash[:], pkt.hash)
1191 net.conn.sendTopicNodes(n, hash, results)
1193 case topicNodesPacket:
1194 p := pkt.data.(*topicNodes)
1195 if net.ticketStore.gotTopicNodes(n, p.Echo, p.Nodes) {
1197 if n.queryTimeouts > maxFindnodeFailures && n.state == known {
1198 return contested, errors.New("too many timeouts")
1204 return n.state, errInvalidEvent
// checkTopicRegister validates the pong embedded in a topic registration and
// returns it. The embedded packet must decode for netID, be a pong, carry
// the local node's ID, have a TopicHash equal to the wire hash of
// data.Topics, and data.Idx must index into data.Topics.
1208 func (net *Network) checkTopicRegister(data *topicRegister, netID uint64) (*pong, error) {
1209 var pongpkt ingressPacket
1210 if err := decodePacket(data.Pong, &pongpkt, netID); err != nil {
1213 if pongpkt.ev != pongPacket {
1214 return nil, errors.New("is not pong packet")
1216 if pongpkt.remoteID != net.tab.self.ID {
1217 return nil, errors.New("not signed by us")
1219 // check that we previously authorised all topics
1220 // that the other side is trying to register.
// NOTE(review): the error returned by wireHash is discarded here.
1221 hash, _, _ := wireHash(data.Topics)
1222 if hash != pongpkt.data.(*pong).TopicHash {
1223 return nil, errors.New("topic hash mismatch")
1225 if int(data.Idx) < 0 || int(data.Idx) >= len(data.Topics) {
1226 return nil, errors.New("topic index out of range")
1228 return pongpkt.data.(*pong), nil
// wireHash serialises x with wire.WriteBinary into hw; the byte count and
// any error are reported through the named results n and err.
// NOTE(review): hw is presumably a sha3 hasher whose sum becomes h — its
// creation is not visible here; confirm.
1231 func wireHash(x interface{}) (h common.Hash, n int, err error) {
1233 wire.WriteBinary(x, hw, &n, &err)
// handleNeighboursPacket processes a neighbours reply from n. The pending
// neighboursTimeout is cancelled, each received record is resolved with
// internNodeFromNeighbours (invalid ones are logged), nodes still in state
// unknown are moved to verifyinit, and the collected nodes are delivered on
// the waiting query's reply channel. Afterwards the next deferred query for
// n is started.
1238 func (net *Network) handleNeighboursPacket(n *Node, pkt *ingressPacket) error {
// A reply is only meaningful while a query is pending.
1239 if n.pendingNeighbours == nil {
1242 net.abortTimedEvent(n, neighboursTimeout)
1244 req := pkt.data.(*neighbors)
1245 nodes := make([]*Node, len(req.Nodes))
1246 for i, rn := range req.Nodes {
1247 nn, err := net.internNodeFromNeighbours(pkt.remoteAddr, rn)
1249 log.WithFields(log.Fields{"module": logModule, "ip": rn.IP, "id:": n.ID[:8], "addr:": pkt.remoteAddr, "error": err}).Debug("invalid neighbour")
1253 // Start validation of query results immediately.
1254 // This fills the table quickly.
1255 // TODO: generates way too many packets, maybe do it via queue.
1256 if nn.state == unknown {
1257 net.transition(nn, verifyinit)
1260 // TODO: don't ignore second packet
1261 n.pendingNeighbours.reply <- nodes
1262 n.pendingNeighbours = nil
1263 // Now that this query is done, start the next one.
1264 n.startNextQuery(net)