-package netsync
+package chainmgr
import (
- "time"
+ "bytes"
log "github.com/sirupsen/logrus"
+ "github.com/tendermint/go-wire"
"github.com/vapor/errors"
+ msgs "github.com/vapor/netsync/messages"
"github.com/vapor/p2p"
"github.com/vapor/p2p/connection"
)
-const (
- handshakeTimeout = 10 * time.Second
- handshakeCheckPerid = 500 * time.Millisecond
-)
-
-var (
- errProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
- errStatusRequest = errors.New("Status request error")
-)
-
//ProtocolReactor handles new coming protocol message.
type ProtocolReactor struct {
p2p.BaseReactor
- sm *SyncManager
- peers *peerSet
+ cm *ChainManager
}
// NewProtocolReactor returns the reactor of whole blockchain.
-func NewProtocolReactor(sm *SyncManager, peers *peerSet) *ProtocolReactor {
+func NewProtocolReactor(cm *ChainManager) *ProtocolReactor {
pr := &ProtocolReactor{
- sm: sm,
- peers: peers,
+ cm: cm,
}
pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
return pr
func (pr *ProtocolReactor) GetChannels() []*connection.ChannelDescriptor {
return []*connection.ChannelDescriptor{
{
- ID: BlockchainChannel,
+ ID: msgs.BlockchainChannel,
Priority: 5,
SendQueueCapacity: 100,
},
// AddPeer implements Reactor by sending our state to peer.
func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
- pr.sm.AddPeer(peer)
- if err := pr.sm.SendStatus(peer); err != nil {
+ pr.cm.AddPeer(peer)
+ if err := pr.cm.SendStatus(peer); err != nil {
return err
}
- pr.sm.syncTransactions(peer.Key)
+ pr.cm.syncTransactions(peer.Key)
return nil
}
// RemovePeer implements Reactor by removing peer from the pool.
func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
- pr.peers.removePeer(peer.Key)
+ pr.cm.RemovePeer(peer.Key)
+}
+
+//decodeMessage decode msg
+func decodeMessage(bz []byte) (msgType byte, msg msgs.BlockchainMessage, err error) {
+ msgType = bz[0]
+ n := int(0)
+ r := bytes.NewReader(bz)
+ msg = wire.ReadBinary(struct{ msgs.BlockchainMessage }{}, r, msgs.MaxBlockchainResponseSize, &n, &err).(struct{ msgs.BlockchainMessage }).BlockchainMessage
+ if err != nil && n != len(bz) {
+ err = errors.New("DecodeMessage() had bytes left over")
+ }
+ return
}
// Receive implements Reactor by handling 4 types of messages (look below).
func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
- msgType, msg, err := DecodeMessage(msgBytes)
+ msgType, msg, err := decodeMessage(msgBytes)
if err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on reactor decoding message")
return
}
- pr.sm.processMsg(src, msgType, msg)
+ pr.cm.processMsg(src, msgType, msg)
}