8 log "github.com/sirupsen/logrus"
9 cmn "github.com/tendermint/tmlibs/common"
11 "github.com/bytom/blockchain/txfeed"
12 "github.com/bytom/mining/cpuminer"
13 "github.com/bytom/mining/miningpool"
14 "github.com/bytom/p2p"
15 "github.com/bytom/p2p/trust"
16 "github.com/bytom/protocol"
17 "github.com/bytom/protocol/bc"
18 protocolTypes "github.com/bytom/protocol/bc/types"
19 "github.com/bytom/types"
20 "github.com/bytom/wallet"
24 // BlockchainChannel is a channel for blocks and status updates
25 BlockchainChannel = byte(0x40)
26 maxNewBlockChSize = int(1024)
28 statusUpdateIntervalSeconds = 10
29 maxBlockchainResponseSize = 22020096 + 2
30 crosscoreRPCPrefix = "/rpc/"
33 // BlockchainReactor handles long-term catchup syncing.
34 type BlockchainReactor struct {
39 TxFeedTracker *txfeed.Tracker // TODO: move it out from BlockchainReactor
40 blockKeeper *blockKeeper
41 txPool *protocol.TxPool
42 mining *cpuminer.CPUMiner
43 miningPool *miningpool.MiningPool
45 evsw types.EventSwitch
46 newBlockCh chan *bc.Hash
50 // Info return the server information
51 func (bcr *BlockchainReactor) Info(ctx context.Context) (map[string]interface{}, error) {
52 return map[string]interface{}{
53 "is_configured": false,
55 "build_commit": "----",
56 "build_date": "------",
57 "build_config": "---------",
61 // NewBlockchainReactor returns the reactor of whole blockchain.
62 func NewBlockchainReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, wallet *wallet.Wallet, txfeeds *txfeed.Tracker, miningEnable bool) *BlockchainReactor {
63 newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
64 bcr := &BlockchainReactor{
67 blockKeeper: newBlockKeeper(chain, sw),
70 TxFeedTracker: txfeeds,
71 miningEnable: miningEnable,
72 newBlockCh: newBlockCh,
76 bcr.mining = cpuminer.NewCPUMiner(chain, nil, txPool, newBlockCh)
77 bcr.miningPool = miningpool.NewMiningPool(chain, nil, txPool, newBlockCh)
79 bcr.mining = cpuminer.NewCPUMiner(chain, wallet.AccountMgr, txPool, newBlockCh)
80 bcr.miningPool = miningpool.NewMiningPool(chain, wallet.AccountMgr, txPool, newBlockCh)
83 bcr.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcr)
87 // OnStart implements BaseService
88 func (bcr *BlockchainReactor) OnStart() error {
89 bcr.BaseReactor.OnStart()
98 // OnStop implements BaseService
99 func (bcr *BlockchainReactor) OnStop() {
100 bcr.BaseReactor.OnStop()
101 if bcr.miningEnable {
104 bcr.blockKeeper.Stop()
107 // GetChannels implements Reactor
108 func (bcr *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
109 return []*p2p.ChannelDescriptor{
111 ID: BlockchainChannel,
113 SendQueueCapacity: 100,
118 // AddPeer implements Reactor by sending our state to peer.
119 func (bcr *BlockchainReactor) AddPeer(peer *p2p.Peer) {
120 peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}})
123 // RemovePeer implements Reactor by removing peer from the pool.
124 func (bcr *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
125 bcr.blockKeeper.RemovePeer(peer.Key)
128 // Receive implements Reactor by handling 4 types of messages (look below).
129 func (bcr *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
130 var tm *trust.TrustMetric
131 key := src.Connection().RemoteAddress.IP.String()
132 if tm = bcr.sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil {
133 log.Errorf("Can't get peer trust metric")
137 _, msg, err := DecodeMessage(msgBytes)
139 log.Errorf("Error decoding messagek %v", err)
142 log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
144 switch msg := msg.(type) {
145 case *BlockRequestMessage:
146 var block *protocolTypes.Block
149 block, err = bcr.chain.GetBlockByHeight(msg.Height)
151 block, err = bcr.chain.GetBlockByHash(msg.GetHash())
154 log.Errorf("Fail on BlockRequestMessage get block: %v", err)
157 response, err := NewBlockResponseMessage(block)
159 log.Errorf("Fail on BlockRequestMessage create resoinse: %v", err)
162 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
164 case *BlockResponseMessage:
165 bcr.blockKeeper.AddBlock(msg.GetBlock(), src)
167 case *StatusRequestMessage:
168 block := bcr.chain.BestBlock()
169 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(block)})
171 case *StatusResponseMessage:
172 bcr.blockKeeper.SetPeerHeight(src.Key, msg.Height, msg.GetHash())
174 case *TransactionNotifyMessage:
175 tx := msg.GetTransaction()
176 if err := bcr.chain.ValidateTx(tx); err != nil {
177 bcr.sw.AddScamPeer(src)
181 log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
185 // Handle messages from the poolReactor telling the reactor what to do.
186 // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
187 // (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
188 func (bcr *BlockchainReactor) syncRoutine() {
189 statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
190 newTxCh := bcr.txPool.GetNewTxCh()
194 case blockHash := <-bcr.newBlockCh:
195 block, err := bcr.chain.GetBlockByHash(blockHash)
197 log.Errorf("Error get block from newBlockCh %v", err)
199 log.WithFields(log.Fields{"Hash": blockHash, "height": block.Height}).Info("Boardcast my new block")
200 case newTx := <-newTxCh:
201 bcr.TxFeedTracker.TxFilter(newTx)
202 go bcr.BroadcastTransaction(newTx)
203 case _ = <-statusUpdateTicker.C:
204 go bcr.BroadcastStatusResponse()
206 if bcr.miningEnable {
207 // mining if and only if block sync is finished
208 if bcr.blockKeeper.IsCaughtUp() {
220 // BroadcastStatusResponse broadcasts `BlockStore` height.
221 func (bcr *BlockchainReactor) BroadcastStatusResponse() {
222 block := bcr.chain.BestBlock()
223 bcr.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(block)})
226 // BroadcastTransaction broadcats `BlockStore` transaction.
227 func (bcr *BlockchainReactor) BroadcastTransaction(tx *protocolTypes.Tx) error {
228 msg, err := NewTransactionNotifyMessage(tx)
232 bcr.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{msg})