8 log "github.com/sirupsen/logrus"
9 cmn "github.com/tendermint/tmlibs/common"
11 "github.com/bytom/errors"
12 "github.com/bytom/p2p"
13 "github.com/bytom/p2p/trust"
14 "github.com/bytom/protocol"
15 "github.com/bytom/protocol/bc"
16 "github.com/bytom/protocol/bc/types"
20 // BlockchainChannel is the p2p channel ID this reactor registers in
// GetChannels and uses for the status and block messages it sends.
21 BlockchainChannel = byte(0x40)
// protocolHandshakeTimeout is the duration of the timer AddPeer starts while
// waiting for a new peer's status.
22 protocolHandshakeTimeout = time.Second * 10
26 // ErrProtocolHandshakeTimeout is returned by AddPeer when the handshake timer
// fires while it is still waiting for the new peer's status.
27 ErrProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
30 // Response describes the standard response envelope. Every field is tagged
// omitempty, so empty fields are left out of the encoded JSON.
31 type Response struct {
32 Status string `json:"status,omitempty"`
33 Msg string `json:"msg,omitempty"`
34 Data interface{} `json:"data,omitempty"`
// initalPeerStatus is the value Receive builds when a StatusResponseMessage
// arrives and sends on peerStatusCh; AddPeer reads its peerID, height and hash.
37 type initalPeerStatus struct {
43 // ProtocolReactor handles incoming protocol messages. It registers
// BlockchainChannel through GetChannels.
44 type ProtocolReactor struct {
// blockKeeper is handed the blocks and transactions received from peers.
48 blockKeeper *blockKeeper
// txPool supplies the pending transactions that syncTransactions forwards.
49 txPool *protocol.TxPool
// newPeerCh is signalled by AddPeer after a peer's status has been accepted.
54 newPeerCh chan struct{}
// quitReqBlockCh receives the key of every peer passed to RemovePeer.
55 quitReqBlockCh chan *string
// peerStatusCh carries peer status values from Receive to AddPeer.
57 peerStatusCh chan *initalPeerStatus
60 // NewProtocolReactor returns the reactor of the whole blockchain, built from
// the supplied dependencies. blockPeer is stored as the reactor's blockKeeper,
// and peerStatusCh is created here as an unbuffered channel.
61 func NewProtocolReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, blockPeer *blockKeeper, fetcher *Fetcher, peers *peerSet, newPeerCh chan struct{}, txSyncCh chan *txsync, quitReqBlockCh chan *string) *ProtocolReactor {
62 pr := &ProtocolReactor{
64 blockKeeper: blockPeer,
71 quitReqBlockCh: quitReqBlockCh,
72 peerStatusCh: make(chan *initalPeerStatus),
// The base reactor is created with the name "ProtocolReactor" and is given pr.
74 pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
78 // GetChannels implements Reactor. It returns the channel descriptor for
// BlockchainChannel, whose send queue capacity is set to 100.
79 func (pr *ProtocolReactor) GetChannels() []*p2p.ChannelDescriptor {
80 return []*p2p.ChannelDescriptor{
81 &p2p.ChannelDescriptor{
82 ID: BlockchainChannel,
84 SendQueueCapacity: 100,
89 // OnStart implements BaseService by starting the reactor's BaseReactor.
// NOTE(review): whatever BaseReactor.OnStart returns is not inspected here.
90 func (pr *ProtocolReactor) OnStart() error {
91 pr.BaseReactor.OnStart()
95 // OnStop implements BaseService by stopping the reactor's BaseReactor.
96 func (pr *ProtocolReactor) OnStop() {
97 pr.BaseReactor.OnStop()
100 // syncTransactions starts sending all currently pending transactions to the given peer.
// It reads the pending set from txPool, builds a []*types.Tx of the same
// length and hands the peer together with that slice to txSyncCh.
101 func (pr *ProtocolReactor) syncTransactions(p *peer) {
102 pending := pr.txPool.GetTransactions()
// An empty pool is special-cased before any slice is allocated.
103 if len(pending) == 0 {
106 txs := make([]*types.Tx, len(pending))
107 for i, batch := range pending {
// Presumably consumed by a separate tx-sync routine; verify against the reader of txSyncCh.
110 pr.txSyncCh <- &txsync{p, txs}
113 // AddPeer implements Reactor. It sends the new peer a StatusRequestMessage and
// then waits for a status on peerStatusCh. When a status whose peerID equals
// peer.Key arrives, the peer is added to the peer set, its height and hash are
// recorded, pending transactions are synced to it and newPeerCh is signalled.
// If the handshake timer (protocolHandshakeTimeout) fires instead,
// ErrProtocolHandshakeTimeout is returned.
114 func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
// Ask the peer for its status; Receive forwards the reply onto peerStatusCh.
115 peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}})
116 handshakeWait := time.NewTimer(protocolHandshakeTimeout)
119 case status := <-pr.peerStatusCh:
// Only a status reported for this very peer is acted upon.
120 if strings.Compare(status.peerID, peer.Key) == 0 {
121 pr.peers.AddPeer(peer)
122 pr.peers.SetPeerStatus(status.peerID, status.height, status.hash)
123 pr.syncTransactions(pr.peers.Peer(peer.Key))
124 pr.newPeerCh <- struct{}{}
127 case <-handshakeWait.C:
128 return ErrProtocolHandshakeTimeout
133 // RemovePeer implements Reactor. It sends the peer's key on quitReqBlockCh
// and then removes the peer from the peer set. The reason argument is not
// used in the statements shown here.
134 func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
135 pr.quitReqBlockCh <- &peer.Key
136 pr.peers.RemovePeer(peer.Key)
139 // Receive implements Reactor by handling six types of messages:
// BlockRequestMessage, BlockResponseMessage, StatusRequestMessage,
// StatusResponseMessage, TransactionNotifyMessage and MineBlockMessage.
// Unrecognised message types are logged.
140 func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
141 var tm *trust.TrustMetric
// The sender's trust metric is looked up by its remote IP address.
142 key := src.Connection().RemoteAddress.IP.String()
143 if tm = pr.sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil {
144 log.Errorf("Can't get peer trust metric")
148 _, msg, err := DecodeMessage(msgBytes)
150 log.Errorf("Error decoding messagek %v", err)
153 log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
155 switch msg := msg.(type) {
// A peer asks for a block, fetched either by height or by hash, and is sent
// the result as a block response.
156 case *BlockRequestMessage:
157 var block *types.Block
160 block, err = pr.chain.GetBlockByHeight(msg.Height)
162 block, err = pr.chain.GetBlockByHash(msg.GetHash())
165 log.Errorf("Fail on BlockRequestMessage get block: %v", err)
168 response, err := NewBlockResponseMessage(block)
170 log.Errorf("Fail on BlockRequestMessage create resoinse: %v", err)
173 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
// A peer delivered a block; pass it to the block keeper together with the
// sender's key.
175 case *BlockResponseMessage:
176 log.Info("BlockResponseMessage height:", msg.GetBlock().Height)
177 pr.blockKeeper.AddBlock(msg.GetBlock(), src.Key)
// A peer asks for our status; reply with one built from the chain's best block.
179 case *StatusRequestMessage:
180 block := pr.chain.BestBlock()
181 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(block)})
// A peer reported its status; forward it on peerStatusCh, which AddPeer reads.
183 case *StatusResponseMessage:
184 peerStatus := &initalPeerStatus{
189 pr.peerStatusCh <- peerStatus
// A peer announced a transaction; decode it and pass it to the block keeper.
191 case *TransactionNotifyMessage:
192 tx, err := msg.GetTransaction()
194 log.Errorf("Error decoding new tx %v", err)
197 pr.blockKeeper.AddTx(tx, src.Key)
// A peer announced a mined block; decode it before scheduling the import.
199 case *MineBlockMessage:
200 block, err := msg.GetMineBlock()
202 log.Errorf("Error decoding mined block %v", err)
205 // Mark the peer as owning the block and schedule it for import
207 pr.peers.MarkBlock(src.Key, &hash)
208 pr.fetcher.Enqueue(src.Key, block)
209 pr.peers.SetPeerStatus(src.Key, block.Height, &hash)
212 log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))