8 log "github.com/sirupsen/logrus"
9 cmn "github.com/tendermint/tmlibs/common"
11 "github.com/bytom/account"
12 "github.com/bytom/blockchain/txfeed"
13 "github.com/bytom/mining/cpuminer"
14 "github.com/bytom/mining/miningpool"
15 "github.com/bytom/p2p"
16 "github.com/bytom/p2p/trust"
17 "github.com/bytom/protocol"
18 "github.com/bytom/protocol/bc"
19 protocolTypes "github.com/bytom/protocol/bc/types"
20 "github.com/bytom/types"
24 // BlockchainChannel is a channel for blocks and status updates
// It is the channel ID reported by GetChannels and passed to the Send, TrySend
// and Broadcast calls made by BlockchainReactor.
25 BlockchainChannel = byte(0x40)
// maxNewBlockChSize is the buffer capacity of the block-hash channel created in
// NewBlockchainReactor.
26 maxNewBlockChSize = int(1024)
// statusUpdateIntervalSeconds is the period, in seconds, of the status ticker
// created in syncRoutine.
28 statusUpdateIntervalSeconds = 10
// maxBlockchainResponseSize evaluates to 21 MiB (22020096 bytes) plus 2.
// NOTE(review): no use of this constant is visible in this section -- confirm
// where the limit is applied.
29 maxBlockchainResponseSize = 22020096 + 2
32 // BlockchainReactor handles long-term catchup syncing.
// NOTE(review): not every field declaration is visible in this view; the
// methods also reference chain, sw, miningEnable, BaseReactor and Switch.
33 type BlockchainReactor struct {
// TxFeedTracker filters each new pool transaction in syncRoutine (TxFilter).
37 TxFeedTracker *txfeed.Tracker // TODO: move it out from BlockchainReactor
// blockKeeper receives blocks and peer heights from Receive, is asked
// IsCaughtUp by syncRoutine, and is stopped in OnStop.
38 blockKeeper *blockKeeper
// txPool supplies the new-transaction channel read by syncRoutine.
39 txPool *protocol.TxPool
// mining and miningPool are both constructed in NewBlockchainReactor and are
// handed newBlockCh.
40 mining *cpuminer.CPUMiner
41 miningPool *miningpool.MiningPool
// NOTE(review): no use of evsw is visible in this section.
43 evsw types.EventSwitch
// newBlockCh carries block hashes; syncRoutine receives from it.
44 newBlockCh chan *bc.Hash
48 // Info returns the server information as a string-keyed map.
// The entries visible here are fixed placeholders: "is_configured" is always
// false and the build_* values are dash strings. ctx is not read in the lines
// shown.
// NOTE(review): part of the map literal and the returned error value are not
// visible in this view.
49 func (bcr *BlockchainReactor) Info(ctx context.Context) (map[string]interface{}, error) {
50 return map[string]interface{}{
51 "is_configured": false,
53 "build_commit": "----",
54 "build_date": "------",
55 "build_config": "---------",
59 // NewBlockchainReactor returns the reactor of whole blockchain.
// It creates a buffered block-hash channel, builds a block keeper from chain
// and sw, constructs a CPU miner and a mining pool, and registers the reactor
// under the name "BlockchainReactor".
// NOTE(review): some struct-literal fields and the return statement are not
// visible in this view.
60 func NewBlockchainReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, accountMgr *account.Manager, txfeeds *txfeed.Tracker, miningEnable bool) *BlockchainReactor {
// Buffered with maxNewBlockChSize slots; the same channel is stored on the
// reactor and passed to both the CPU miner and the mining pool below.
61 newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
62 bcr := &BlockchainReactor{
64 blockKeeper: newBlockKeeper(chain, sw),
67 TxFeedTracker: txfeeds,
68 miningEnable: miningEnable,
69 newBlockCh: newBlockCh,
// NOTE(review): presumably the miner and pool publish hashes of newly produced
// blocks on newBlockCh for syncRoutine to pick up -- confirm in their packages.
72 bcr.mining = cpuminer.NewCPUMiner(chain, accountMgr, txPool, newBlockCh)
73 bcr.miningPool = miningpool.NewMiningPool(chain, accountMgr, txPool, newBlockCh)
// The base reactor value is built with bcr itself as the implementation and
// copied into the embedded field.
75 bcr.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcr)
79 // OnStart implements BaseService
// It first delegates to the embedded BaseReactor; anything that call returns
// is discarded here.
// NOTE(review): the rest of the body and the returned error are not visible
// in this view.
80 func (bcr *BlockchainReactor) OnStart() error {
81 bcr.BaseReactor.OnStart()
90 // OnStop implements BaseService
// It delegates to the embedded BaseReactor and later stops the block keeper.
// NOTE(review): the statements between these two calls are not visible in
// this view.
91 func (bcr *BlockchainReactor) OnStop() {
92 bcr.BaseReactor.OnStop()
96 bcr.blockKeeper.Stop()
99 // GetChannels implements Reactor
// It returns the channel descriptors for this reactor. The descriptor shown
// uses BlockchainChannel as its ID and a send queue capacity of 100.
// NOTE(review): other descriptor fields are not visible in this view.
100 func (bcr *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
101 return []*p2p.ChannelDescriptor{
103 ID: BlockchainChannel,
105 SendQueueCapacity: 100,
110 // AddPeer implements Reactor by sending a status request to the new peer.
// The message is an empty StatusRequestMessage on BlockchainChannel; the
// result of peer.Send is not checked.
111 func (bcr *BlockchainReactor) AddPeer(peer *p2p.Peer) {
112 peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}})
115 // RemovePeer implements Reactor by removing the peer from the block keeper.
// The peer is identified by peer.Key; reason is not used in the lines shown.
116 func (bcr *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
117 bcr.blockKeeper.RemovePeer(peer.Key)
120 // Receive implements Reactor by handling 5 types of messages (look below).
// The visible cases are BlockRequestMessage, BlockResponseMessage,
// StatusRequestMessage, StatusResponseMessage and TransactionNotifyMessage;
// any other type is logged as unknown. chID is not read in the lines shown.
// NOTE(review): several control-flow lines (conditions, returns, closing
// braces) are not visible in this view, so the notes below describe only the
// statements that are shown.
121 func (bcr *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
// Look up the sender's trust metric, keyed by its remote IP address string.
122 var tm *trust.TrustMetric
123 key := src.Connection().RemoteAddress.IP.String()
124 if tm = bcr.sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil {
125 log.Errorf("Can't get peer trust metric")
// Decode the raw payload; the first return value is ignored.
129 _, msg, err := DecodeMessage(msgBytes)
// NOTE(review): the log text below contains a typo ("messagek").
131 log.Errorf("Error decoding messagek %v", err)
// Every decoded message is logged at Info level with the sender's key.
134 log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
136 switch msg := msg.(type) {
137 case *BlockRequestMessage:
// The requested block is loaded either by height or by hash.
// NOTE(review): the condition choosing between the two is not visible here.
138 var block *protocolTypes.Block
141 block, err = bcr.chain.GetBlockByHeight(msg.Height)
143 block, err = bcr.chain.GetBlockByHash(msg.GetHash())
146 log.Errorf("Fail on BlockRequestMessage get block: %v", err)
149 response, err := NewBlockResponseMessage(block)
// NOTE(review): the log text below contains a typo ("resoinse").
151 log.Errorf("Fail on BlockRequestMessage create resoinse: %v", err)
// Reply to the requester with TrySend; its result is not checked.
154 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
156 case *BlockResponseMessage:
// Hand the received block, together with its sender, to the block keeper.
157 bcr.blockKeeper.AddBlock(msg.GetBlock(), src)
159 case *StatusRequestMessage:
// Answer with a status response built from the chain's best block.
160 block := bcr.chain.BestBlock()
161 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(block)})
163 case *StatusResponseMessage:
// Record the height and hash the peer reported.
164 bcr.blockKeeper.SetPeerHeight(src.Key, msg.Height, msg.GetHash())
166 case *TransactionNotifyMessage:
// Validate the announced transaction; a validation error marks the sender as
// a scam peer on the switch.
167 tx := msg.GetTransaction()
168 if err := bcr.chain.ValidateTx(tx); err != nil {
169 bcr.sw.AddScamPeer(src)
// NOTE(review): presumably the default branch of the switch -- the case label
// is not visible in this view.
173 log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
177 // Handle messages from the poolReactor telling the reactor what to do.
178 // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
179 // (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
// The visible cases react to three event sources: block hashes arriving on
// newBlockCh, new transactions from the tx pool, and a periodic status ticker.
// NOTE(review): the loop/select framing, the error branch bodies and the
// mining start/stop statements are not visible in this view; no Stop call on
// the ticker is visible either.
180 func (bcr *BlockchainReactor) syncRoutine() {
// The ticker fires every statusUpdateIntervalSeconds seconds.
181 statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
182 newTxCh := bcr.txPool.GetNewTxCh()
186 case blockHash := <-bcr.newBlockCh:
// Load the block for the announced hash and log its height.
187 block, err := bcr.chain.GetBlockByHash(blockHash)
189 log.Errorf("Error get block from newBlockCh %v", err)
// NOTE(review): the log text below contains a typo ("Boardcast").
191 log.WithFields(log.Fields{"Hash": blockHash, "height": block.Height}).Info("Boardcast my new block")
192 case newTx := <-newTxCh:
// Filter the transaction through the tx feed tracker, then broadcast it in
// its own goroutine; the error returned by BroadcastTransaction is dropped.
193 bcr.TxFeedTracker.TxFilter(newTx)
194 go bcr.BroadcastTransaction(newTx)
195 case _ = <-statusUpdateTicker.C:
// Broadcast the current status in its own goroutine on every tick.
196 go bcr.BroadcastStatusResponse()
198 if bcr.miningEnable {
199 // mining if and only if block sync is finished
200 if bcr.blockKeeper.IsCaughtUp() {
212 // BroadcastStatusResponse broadcasts a status response for the best block.
// The message is built from bcr.chain.BestBlock() and sent with
// bcr.Switch.Broadcast on BlockchainChannel; the result of Broadcast is not
// checked.
213 func (bcr *BlockchainReactor) BroadcastStatusResponse() {
214 block := bcr.chain.BestBlock()
215 bcr.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(block)})
218 // BroadcastTransaction broadcasts a transaction notification for tx.
// It wraps tx in a TransactionNotifyMessage and sends it with
// bcr.Switch.Broadcast on BlockchainChannel.
// NOTE(review): the handling of err from NewTransactionNotifyMessage and the
// final return value are not visible in this view.
219 func (bcr *BlockchainReactor) BroadcastTransaction(tx *protocolTypes.Tx) error {
220 msg, err := NewTransactionNotifyMessage(tx)
224 bcr.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{msg})