9 log "github.com/sirupsen/logrus"
10 cmn "github.com/tendermint/tmlibs/common"
12 "github.com/bytom/errors"
13 "github.com/bytom/p2p"
14 "github.com/bytom/p2p/trust"
15 "github.com/bytom/protocol"
16 "github.com/bytom/protocol/bc"
17 "github.com/bytom/protocol/bc/types"
21 // BlockchainChannel is the p2p channel ID this reactor registers in GetChannels and uses for every message it sends.
22 BlockchainChannel = byte(0x40)
// protocolHandshakeTimeout bounds how long AddPeer waits for a peer's status
// before it gives up with ErrProtocolHandshakeTimeout.
23 protocolHandshakeTimeout = time.Second * 10
// handshakeRetryTicker is the period of the retry ticker that AddPeer creates
// while it waits for a peer's status.
24 handshakeRetryTicker = 4 * time.Second
28 // ErrProtocolHandshakeTimeout is returned by AddPeer when the handshake timer fires before the peer's status has been received.
29 ErrProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
// ErrStatusRequest is returned by AddPeer when sending a StatusRequestMessage
// to the peer reports failure.
30 ErrStatusRequest = errors.New("Status request error")
33 // Response describes the standard response envelope. Every field is tagged omitempty, so empty fields are left out of the JSON encoding.
34 type Response struct {
// Status is encoded under the "status" key.
35 Status string `json:"status,omitempty"`
// Msg is encoded under the "msg" key.
36 Msg string `json:"msg,omitempty"`
// Data is an arbitrary payload encoded under the "data" key.
37 Data interface{} `json:"data,omitempty"`
// initalPeerStatus is the value Receive sends on peerStatusCh when it handles a
// StatusResponseMessage. AddPeer reads its peerID, height and hash fields to
// match the status to the peer being added and to record that peer's status.
40 type initalPeerStatus struct {
46 // ProtocolReactor handles incoming protocol messages and runs the status handshake for newly added peers.
47 type ProtocolReactor struct {
// blockKeeper receives the blocks and transactions that Receive extracts from
// peer messages.
51 blockKeeper *blockKeeper
// txPool supplies the pending transactions that syncTransactions offers to a
// peer.
52 txPool *protocol.TxPool
// handshakeMu is held for the whole of AddPeer, so handshakes run one at a
// time.
56 handshakeMu sync.Mutex
// newPeerCh is signalled by AddPeer once a peer has completed the handshake.
58 newPeerCh chan struct{}
// quitReqBlockCh receives the key of each peer passed to RemovePeer.
59 quitReqBlockCh chan *string
// peerStatusCh carries peer statuses from Receive to a waiting AddPeer.
61 peerStatusCh chan *initalPeerStatus
64 // NewProtocolReactor builds a ProtocolReactor from the supplied collaborators and installs a p2p BaseReactor named "ProtocolReactor" on it.
65 func NewProtocolReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, blockPeer *blockKeeper, fetcher *Fetcher, peers *peerSet, newPeerCh chan struct{}, txSyncCh chan *txsync, quitReqBlockCh chan *string) *ProtocolReactor {
66 pr := &ProtocolReactor{
// The blockPeer argument is stored as the reactor's blockKeeper.
68 blockKeeper: blockPeer,
75 quitReqBlockCh: quitReqBlockCh,
// peerStatusCh is created unbuffered: a send on it completes only when a
// receiver such as AddPeer takes the value.
76 peerStatusCh: make(chan *initalPeerStatus),
// The reactor passes itself to the base reactor it embeds.
78 pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
82 // GetChannels implements Reactor. It returns the descriptor for BlockchainChannel, configured with a send queue capacity of 100.
83 func (pr *ProtocolReactor) GetChannels() []*p2p.ChannelDescriptor {
84 return []*p2p.ChannelDescriptor{
85 &p2p.ChannelDescriptor{
86 ID: BlockchainChannel,
88 SendQueueCapacity: 100,
93 // OnStart implements BaseService by starting the embedded BaseReactor.
94 func (pr *ProtocolReactor) OnStart() error {
// NOTE(review): the call below is a bare statement, so if BaseReactor.OnStart
// returns an error it is discarded here — confirm that is intended.
95 pr.BaseReactor.OnStart()
99 // OnStop implements BaseService by stopping the embedded BaseReactor.
100 func (pr *ProtocolReactor) OnStop() {
101 pr.BaseReactor.OnStop()
104 // syncTransactions collects the transactions currently pending in the pool and hands them, together with the given peer, to txSyncCh.
105 func (pr *ProtocolReactor) syncTransactions(p *peer) {
// Snapshot the pool's pending entries.
106 pending := pr.txPool.GetTransactions()
107 if len(pending) == 0 {
// One output slot per pending entry.
110 txs := make([]*types.Tx, len(pending))
111 for i, batch := range pending {
// Plain channel send: it blocks until txSyncCh can accept the value.
114 pr.txSyncCh <- &txsync{p, txs}
117 // AddPeer implements Reactor by sending our state to peer.
118 func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
119 pr.handshakeMu.Lock()
120 defer pr.handshakeMu.Unlock()
122 if ok := peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
123 return ErrStatusRequest
125 retryTicker := time.Tick(handshakeRetryTicker)
126 handshakeWait := time.NewTimer(protocolHandshakeTimeout)
129 case status := <-pr.peerStatusCh:
130 if status.peerID == peer.Key {
131 pr.peers.AddPeer(peer)
132 pr.peers.SetPeerStatus(status.peerID, status.height, status.hash)
133 pr.syncTransactions(pr.peers.Peer(peer.Key))
134 pr.newPeerCh <- struct{}{}
138 if ok := peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
139 return ErrStatusRequest
141 case <-handshakeWait.C:
142 return ErrProtocolHandshakeTimeout
147 // RemovePeer implements Reactor. It reports the peer's key on quitReqBlockCh and then removes the peer from pr.peers.
148 func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
// Plain channel send: it blocks until quitReqBlockCh can accept the value.
149 pr.quitReqBlockCh <- &peer.Key
150 pr.peers.RemovePeer(peer.Key)
153 // Receive implements Reactor. It decodes msgBytes and dispatches on the concrete message type; the cases below cover block request, block response, status request, status response, transaction notification and mined-block messages.
154 func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
155 var tm *trust.TrustMetric
// The trust metric is looked up by the remote IP address of the sender's
// connection.
156 key := src.Connection().RemoteAddress.IP.String()
157 if tm = pr.sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil {
158 log.Errorf("Can't get peer trust metric")
162 _, msg, err := DecodeMessage(msgBytes)
// NOTE(review): "messagek" in the log text below looks like a typo for
// "message".
164 log.Errorf("Error decoding messagek %v", err)
167 log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
169 switch msg := msg.(type) {
// The peer asks for a block; it is fetched from the chain by height or by
// hash and sent back as a block response.
170 case *BlockRequestMessage:
171 var block *types.Block
174 block, err = pr.chain.GetBlockByHeight(msg.Height)
176 block, err = pr.chain.GetBlockByHash(msg.GetHash())
179 log.Errorf("Fail on BlockRequestMessage get block: %v", err)
182 response, err := NewBlockResponseMessage(block)
// NOTE(review): "resoinse" in the log text below looks like a typo for
// "response".
184 log.Errorf("Fail on BlockRequestMessage create resoinse: %v", err)
187 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
// The peer delivers a block; it is handed to the block keeper together with
// the sender's key.
189 case *BlockResponseMessage:
190 log.Info("BlockResponseMessage height:", msg.GetBlock().Height)
191 pr.blockKeeper.AddBlock(msg.GetBlock(), src.Key)
// The peer asks for our status; reply with one built from the best block
// header.
193 case *StatusRequestMessage:
194 blockHeader := pr.chain.BestBlockHeader()
195 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(blockHeader)})
// The peer reports its status; forward it to the handshake in AddPeer.
197 case *StatusResponseMessage:
198 peerStatus := &initalPeerStatus{
// NOTE(review): peerStatusCh is created unbuffered in NewProtocolReactor, so
// this send waits until a receiver takes the value. If no AddPeer call is
// receiving, Receive would block here — confirm this cannot happen.
203 pr.peerStatusCh <- peerStatus
// The peer announces a transaction; it is decoded and handed to the block
// keeper together with the sender's key.
205 case *TransactionNotifyMessage:
206 tx, err := msg.GetTransaction()
208 log.Errorf("Error decoding new tx %v", err)
211 pr.blockKeeper.AddTx(tx, src.Key)
// The peer announces a newly mined block.
213 case *MineBlockMessage:
214 block, err := msg.GetMineBlock()
216 log.Errorf("Error decoding mined block %v", err)
219 // Mark the peer as owning the block and schedule it for import
221 pr.peers.MarkBlock(src.Key, &hash)
222 pr.fetcher.Enqueue(src.Key, block)
// Record the announced block's height and hash as the sender's status.
223 pr.peers.SetPeerStatus(src.Key, block.Height, &hash)
// Reached for any message type not handled above.
226 log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))