OSDN Git Service

Format netsync module code directory (#88)
[bytom/vapor.git] / netsync / chainmgr / protocol_reactor.go
similarity index 61%
rename from netsync/protocol_reactor.go
rename to netsync/chainmgr/protocol_reactor.go
index 8a6c610..86987fb 100644 (file)
@@ -1,38 +1,28 @@
-package netsync
+package chainmgr
 
 import (
-       "time"
+       "bytes"
 
        log "github.com/sirupsen/logrus"
+       "github.com/tendermint/go-wire"
 
        "github.com/vapor/errors"
+       msgs "github.com/vapor/netsync/messages"
        "github.com/vapor/p2p"
        "github.com/vapor/p2p/connection"
 )
 
-const (
-       handshakeTimeout    = 10 * time.Second
-       handshakeCheckPerid = 500 * time.Millisecond
-)
-
-var (
-       errProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
-       errStatusRequest            = errors.New("Status request error")
-)
-
 //ProtocolReactor handles new coming protocol message.
 type ProtocolReactor struct {
        p2p.BaseReactor
 
-       sm    *SyncManager
-       peers *peerSet
+       cm *ChainManager
 }
 
 // NewProtocolReactor returns the reactor of whole blockchain.
-func NewProtocolReactor(sm *SyncManager, peers *peerSet) *ProtocolReactor {
+func NewProtocolReactor(cm *ChainManager) *ProtocolReactor {
        pr := &ProtocolReactor{
-               sm:    sm,
-               peers: peers,
+               cm: cm,
        }
        pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
        return pr
@@ -42,7 +32,7 @@ func NewProtocolReactor(sm *SyncManager, peers *peerSet) *ProtocolReactor {
 func (pr *ProtocolReactor) GetChannels() []*connection.ChannelDescriptor {
        return []*connection.ChannelDescriptor{
                {
-                       ID:                BlockchainChannel,
+                       ID:                msgs.BlockchainChannel,
                        Priority:          5,
                        SendQueueCapacity: 100,
                },
@@ -62,26 +52,38 @@ func (pr *ProtocolReactor) OnStop() {
 
 // AddPeer implements Reactor by sending our state to peer.
 func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
-       pr.sm.AddPeer(peer)
-       if err := pr.sm.SendStatus(peer); err != nil {
+       pr.cm.AddPeer(peer)
+       if err := pr.cm.SendStatus(peer); err != nil {
                return err
        }
-       pr.sm.syncTransactions(peer.Key)
+       pr.cm.syncTransactions(peer.Key)
        return nil
 }
 
 // RemovePeer implements Reactor by removing peer from the pool.
 func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
-       pr.peers.removePeer(peer.Key)
+       pr.cm.RemovePeer(peer.Key)
+}
+
+//decodeMessage decode msg
+func decodeMessage(bz []byte) (msgType byte, msg msgs.BlockchainMessage, err error) {
+       msgType = bz[0]
+       n := int(0)
+       r := bytes.NewReader(bz)
+       msg = wire.ReadBinary(struct{ msgs.BlockchainMessage }{}, r, msgs.MaxBlockchainResponseSize, &n, &err).(struct{ msgs.BlockchainMessage }).BlockchainMessage
+       if err != nil && n != len(bz) {
+               err = errors.New("DecodeMessage() had bytes left over")
+       }
+       return
 }
 
 // Receive implements Reactor by handling 4 types of messages (look below).
 func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
-       msgType, msg, err := DecodeMessage(msgBytes)
+       msgType, msg, err := decodeMessage(msgBytes)
        if err != nil {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on reactor decoding message")
                return
        }
 
-       pr.sm.processMsg(src, msgType, msg)
+       pr.cm.processMsg(src, msgType, msg)
 }