9 log "github.com/sirupsen/logrus"
10 cmn "github.com/tendermint/tmlibs/common"
12 "github.com/bytom/errors"
13 "github.com/bytom/p2p"
14 "github.com/bytom/protocol"
15 "github.com/bytom/protocol/bc"
16 "github.com/bytom/protocol/bc/types"
20 // BlockchainChannel is a channel for blocks and status updates
// It is the channel ID used by GetChannels and by every TrySend in this file.
21 BlockchainChannel = byte(0x40)
// protocolHandshakeTimeout is the duration of the handshake timer created in
// AddPeer; when it fires AddPeer returns ErrProtocolHandshakeTimeout.
22 protocolHandshakeTimeout = time.Second * 10
// handshakeRetryTicker is the period of the retry ticker created in AddPeer.
23 handshakeRetryTicker = 4 * time.Second
27 // ErrProtocolHandshakeTimeout is returned by AddPeer when the handshake timer fires before a matching peer status arrives.
28 ErrProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
// ErrStatusRequest is returned by AddPeer when TrySend of the status request reports failure.
29 ErrStatusRequest = errors.New("Status request error")
// ErrDiffGenesisHash is returned by AddPeer when the peer's reported genesis hash differs from the local one.
30 ErrDiffGenesisHash = errors.New("Different genesis hash")
33 // Response describes the response standard.
// Every field is omitted from the JSON encoding when it holds its zero value.
34 type Response struct {
// Status is encoded under the "status" key.
35 Status string `json:"status,omitempty"`
// Msg is encoded under the "msg" key.
36 Msg string `json:"msg,omitempty"`
// Data is an arbitrary payload encoded under the "data" key.
37 Data interface{} `json:"data,omitempty"`
// initalPeerStatus is the status record that Receive builds from a
// StatusResponseMessage and hands to AddPeer over peerStatusCh.
// The field list is not visible in this view; AddPeer reads peerID, height,
// hash and genesisHash from it.
40 type initalPeerStatus struct {
47 // ProtocolReactor handles incoming protocol messages.
// NOTE(review): several fields used by the methods (chain, peers, fetcher,
// txSyncCh, genesisHash, the embedded BaseReactor) are not visible in this view.
48 type ProtocolReactor struct {
// blockKeeper receives blocks (AddBlock) and transactions (AddTx) delivered by peers.
52 blockKeeper *blockKeeper
// txPool supplies the pending transactions that syncTransactions forwards to a new peer.
53 txPool *protocol.TxPool
// handshakeMu is locked for the full duration of AddPeer.
57 handshakeMu sync.Mutex
// newPeerCh is signalled by AddPeer once a peer has been registered.
60 newPeerCh chan struct{}
// quitReqBlockCh receives a pointer to the peer key from RemovePeer.
61 quitReqBlockCh chan *string
// peerStatusCh carries status responses from Receive to a waiting AddPeer.
63 peerStatusCh chan *initalPeerStatus
66 // NewProtocolReactor returns the reactor of whole blockchain.
// Visible here: blockPeer is stored as the blockKeeper field, quitReqBlockCh is
// stored as given, peerStatusCh is created unbuffered, the BaseReactor is
// initialised under the name "ProtocolReactor", and the hash of the block at
// height 0 is cached as genesisHash.
67 func NewProtocolReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, blockPeer *blockKeeper, fetcher *Fetcher, peers *peerSet, newPeerCh chan struct{}, txSyncCh chan *txsync, quitReqBlockCh chan *string) *ProtocolReactor {
68 pr := &ProtocolReactor{
70 blockKeeper: blockPeer,
77 quitReqBlockCh: quitReqBlockCh,
// No capacity is given, so a send on this channel waits for a receiver.
78 peerStatusCh: make(chan *initalPeerStatus),
80 pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
// NOTE(review): the error from GetBlockByHeight(0) is discarded; if the lookup
// can fail, genesisBlock.Hash() below is called on whatever was returned — confirm.
81 genesisBlock, _ := pr.chain.GetBlockByHeight(0)
82 pr.genesisHash = genesisBlock.Hash()
87 // GetChannels implements Reactor
// The visible descriptor uses BlockchainChannel as its ID and a send queue
// capacity of 100. NOTE(review): any further descriptor fields are not visible
// in this view.
88 func (pr *ProtocolReactor) GetChannels() []*p2p.ChannelDescriptor {
89 return []*p2p.ChannelDescriptor{
90 &p2p.ChannelDescriptor{
91 ID: BlockchainChannel,
93 SendQueueCapacity: 100,
98 // OnStart implements BaseService
// It delegates to the embedded BaseReactor.
99 func (pr *ProtocolReactor) OnStart() error {
// NOTE(review): any value returned by BaseReactor.OnStart is not used on this line — confirm that is intended.
100 pr.BaseReactor.OnStart()
104 // OnStop implements BaseService
// It delegates to the embedded BaseReactor.
105 func (pr *ProtocolReactor) OnStop() {
106 pr.BaseReactor.OnStop()
109 // syncTransactions starts sending all currently pending transactions to the given peer.
// It reads the pending set from txPool, builds a []*types.Tx of the same
// length, and hands the peer together with that slice to txSyncCh.
110 func (pr *ProtocolReactor) syncTransactions(p *peer) {
114 pending := pr.txPool.GetTransactions()
// NOTE(review): the body of this empty-pool branch is not visible in this view.
115 if len(pending) == 0 {
// Sized to len(pending) so each entry can be filled by index in the loop below.
118 txs := make([]*types.Tx, len(pending))
119 for i, batch := range pending {
// Hand the batch to whoever consumes txSyncCh; the consumer is outside this view.
122 pr.txSyncCh <- &txsync{p, txs}
125 // AddPeer implements Reactor by sending our state to peer.
// It sends a StatusRequestMessage to the peer and then waits on peerStatusCh
// for a status whose peerID equals peer.Key. On a match it compares genesis
// hashes, registers the peer in pr.peers, records the reported height and hash,
// calls syncTransactions for the peer and signals newPeerCh.
//
// Errors visible here: ErrStatusRequest when TrySend reports failure,
// ErrDiffGenesisHash on a genesis mismatch, ErrProtocolHandshakeTimeout when
// the handshake timer fires, and errPeerDropped on guarded paths whose
// conditions are not visible in this view.
126 func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
// The mutex is released only when AddPeer returns, so handshakes run one at a time.
127 pr.handshakeMu.Lock()
128 defer pr.handshakeMu.Unlock()
// NOTE(review): the condition guarding this return is not visible in this view.
130 return errPeerDropped
// Initial status request; TrySend returning false is treated as a handshake failure.
132 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
133 return ErrStatusRequest
// NOTE(review): time.Tick returns no handle for stopping the ticker, and
// handshakeWait is not stopped on the return paths visible here; consider
// time.NewTicker plus deferred Stop calls — confirm against the Go version in use.
135 retryTicker := time.Tick(handshakeRetryTicker)
136 handshakeWait := time.NewTimer(protocolHandshakeTimeout)
139 case status := <-pr.peerStatusCh:
// Only a status reported by the peer being added is acted on.
// NOTE(review): no handling for a non-matching status is visible here — confirm it is meant to be dropped.
140 if status.peerID == peer.Key {
// Genesis hashes are compared by their string form.
141 if strings.Compare(pr.genesisHash.String(), status.genesisHash.String()) != 0 {
142 log.Info("Remote peer genesis block hash:", status.genesisHash.String(), " local hash:", pr.genesisHash.String())
143 return ErrDiffGenesisHash
145 pr.peers.AddPeer(peer)
146 pr.peers.SetPeerStatus(status.peerID, status.height, status.hash)
// Fetch the peer-set entry for this peer; it is the value passed to syncTransactions.
147 prPeer, ok := pr.peers.Peer(peer.Key)
// NOTE(review): presumably guarded by a check on ok; the condition is not visible in this view.
149 return errPeerDropped
151 pr.syncTransactions(prPeer)
// Signal that a new peer has been registered.
152 pr.newPeerCh <- struct{}{}
// NOTE(review): the surrounding case and guard for this return are not visible in this view.
157 return errPeerDropped
// Re-send of the status request; presumably driven by retryTicker (the case line is not visible).
159 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
160 return ErrStatusRequest
// The handshake timer fired before a matching status was processed.
162 case <-handshakeWait.C:
163 return ErrProtocolHandshakeTimeout
168 // RemovePeer implements Reactor by removing peer from the pool.
// It offers a pointer to the peer's key on quitReqBlockCh and then removes the
// peer from pr.peers. The reason argument is not used on the lines visible here.
169 func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
// NOTE(review): the select/default lines are not visible in this view; the
// warning text below suggests the send is attempted without blocking.
171 case pr.quitReqBlockCh <- &peer.Key:
173 log.Warning("quitReqBlockCh is full")
175 pr.peers.RemovePeer(peer.Key)
178 // Receive implements Reactor by decoding the message and dispatching on its type (six message types are handled below).
// chID is not consulted on the lines visible here. Failures are logged rather
// than returned, since the method has no return value.
179 func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
// The first value returned by DecodeMessage is discarded.
180 _, msg, err := DecodeMessage(msgBytes)
// NOTE(review): "messagek" in the log text below looks like a typo for "message".
182 log.Errorf("Error decoding messagek %v", err)
// Every decoded message is logged at Info level together with the sender's key.
185 log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
187 switch msg := msg.(type) {
// Block request: look the block up and send it back to the requester.
188 case *BlockRequestMessage:
189 var block *types.Block
// The block is fetched by height or by hash; the condition selecting between
// the two lookups is not visible in this view.
192 block, err = pr.chain.GetBlockByHeight(msg.Height)
194 block, err = pr.chain.GetBlockByHash(msg.GetHash())
197 log.Errorf("Fail on BlockRequestMessage get block: %v", err)
200 response, err := NewBlockResponseMessage(block)
// NOTE(review): "resoinse" in the log text below looks like a typo for "response".
202 log.Errorf("Fail on BlockRequestMessage create resoinse: %v", err)
// The result of TrySend is not checked here.
205 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
// Block response: pass the block and the sender's key to the block keeper.
207 case *BlockResponseMessage:
208 log.Info("BlockResponseMessage height:", msg.GetBlock().Height)
209 pr.blockKeeper.AddBlock(msg.GetBlock(), src.Key)
// Status request: reply with the best block header and the local genesis hash.
211 case *StatusRequestMessage:
212 blockHeader := pr.chain.BestBlockHeader()
213 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(blockHeader, &pr.genesisHash)})
// Status response: forward the reported status to AddPeer via peerStatusCh.
215 case *StatusResponseMessage:
216 peerStatus := &initalPeerStatus{
220 genesisHash: msg.GetGenesisHash(),
// NOTE(review): NewProtocolReactor creates peerStatusCh unbuffered, so this
// send waits until AddPeer is receiving — confirm a late or unsolicited
// status response cannot stall Receive.
222 pr.peerStatusCh <- peerStatus
// Transaction notification: decode the transaction and hand it to the block keeper.
224 case *TransactionNotifyMessage:
225 tx, err := msg.GetTransaction()
227 log.Errorf("Error decoding new tx %v", err)
230 pr.blockKeeper.AddTx(tx, src.Key)
// Mined block: decode it, then record and schedule it as described below.
232 case *MineBlockMessage:
233 block, err := msg.GetMineBlock()
235 log.Errorf("Error decoding mined block %v", err)
238 // Mark the peer as owning the block and schedule it for import
// hash is defined on a line not visible in this view.
240 pr.peers.MarkBlock(src.Key, &hash)
241 pr.fetcher.Enqueue(src.Key, block)
242 pr.peers.SetPeerStatus(src.Key, block.Height, &hash)
// Presumably the default branch (its label is not visible): log the unexpected message type.
245 log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))