OSDN Git Service

fix commands
[bytom/shuttle.git] / vendor / github.com / bytom / netsync / protocol_reactor.go
diff --git a/vendor/github.com/bytom/netsync/protocol_reactor.go b/vendor/github.com/bytom/netsync/protocol_reactor.go
new file mode 100644 (file)
index 0000000..06d68d0
--- /dev/null
@@ -0,0 +1,101 @@
+package netsync
+
+import (
+       "time"
+
+       log "github.com/sirupsen/logrus"
+
+       "github.com/bytom/errors"
+       "github.com/bytom/p2p"
+       "github.com/bytom/p2p/connection"
+)
+
+const (
+       handshakeTimeout    = 10 * time.Second
+       handshakeCheckPerid = 500 * time.Millisecond
+)
+
+var (
+       errProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
+       errStatusRequest            = errors.New("Status request error")
+)
+
+//ProtocolReactor handles new coming protocol message.
+type ProtocolReactor struct {
+       p2p.BaseReactor
+
+       sm    *SyncManager
+       peers *peerSet
+}
+
+// NewProtocolReactor returns the reactor of whole blockchain.
+func NewProtocolReactor(sm *SyncManager, peers *peerSet) *ProtocolReactor {
+       pr := &ProtocolReactor{
+               sm:    sm,
+               peers: peers,
+       }
+       pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
+       return pr
+}
+
+// GetChannels implements Reactor
+func (pr *ProtocolReactor) GetChannels() []*connection.ChannelDescriptor {
+       return []*connection.ChannelDescriptor{
+               {
+                       ID:                BlockchainChannel,
+                       Priority:          5,
+                       SendQueueCapacity: 100,
+               },
+       }
+}
+
+// OnStart implements BaseService
+func (pr *ProtocolReactor) OnStart() error {
+       pr.BaseReactor.OnStart()
+       return nil
+}
+
+// OnStop implements BaseService
+func (pr *ProtocolReactor) OnStop() {
+       pr.BaseReactor.OnStop()
+}
+
+// AddPeer implements Reactor by sending our state to peer.
+func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
+       if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
+               return errStatusRequest
+       }
+
+       checkTicker := time.NewTicker(handshakeCheckPerid)
+       defer checkTicker.Stop()
+       timeout := time.NewTimer(handshakeTimeout)
+       defer timeout.Stop()
+       for {
+               select {
+               case <-checkTicker.C:
+                       if exist := pr.peers.getPeer(peer.Key); exist != nil {
+                               pr.sm.syncTransactions(peer.Key)
+                               return nil
+                       }
+
+               case <-timeout.C:
+                       return errProtocolHandshakeTimeout
+               }
+       }
+}
+
+// RemovePeer implements Reactor by removing peer from the pool.
+func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
+       pr.peers.removePeer(peer.Key)
+}
+
+// Receive implements Reactor by handling 4 types of messages (look below).
+func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
+       msgType, msg, err := DecodeMessage(msgBytes)
+       if err != nil {
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on reactor decoding message")
+               return
+       }
+
+       pr.sm.processMsg(src, msgType, msg)
+}