9 log "github.com/sirupsen/logrus"
10 cmn "github.com/tendermint/tmlibs/common"
12 "github.com/bytom/blockchain/txfeed"
13 "github.com/bytom/wallet"
14 "github.com/bytom/mining/cpuminer"
15 "github.com/bytom/mining/miningpool"
16 "github.com/bytom/p2p"
17 "github.com/bytom/p2p/trust"
18 "github.com/bytom/protocol"
19 "github.com/bytom/protocol/bc"
20 protocolTypes "github.com/bytom/protocol/bc/types"
21 "github.com/bytom/types"
25 // BlockchainChannel is a channel for blocks and status updates
26 BlockchainChannel = byte(0x40)
27 maxNewBlockChSize = int(1024)
29 statusUpdateIntervalSeconds = 10
30 maxBlockchainResponseSize = 22020096 + 2
31 crosscoreRPCPrefix = "/rpc/"
35 // SUCCESS indicates the rpc calling is successful.
37 // FAIL indicated the rpc calling is failed.
41 // Response describes the response standard.
42 type Response struct {
43 Status string `json:"status,omitempty"`
44 Msg string `json:"msg,omitempty"`
45 Data interface{} `json:"data,omitempty"`
48 //BlockchainReactor handles long-term catchup syncing.
49 type BlockchainReactor struct {
54 txFeedTracker *txfeed.Tracker
55 blockKeeper *blockKeeper
56 txPool *protocol.TxPool
57 mining *cpuminer.CPUMiner
58 miningPool *miningpool.MiningPool
60 evsw types.EventSwitch
61 newBlockCh chan *bc.Hash
65 func (bcr *BlockchainReactor) info(ctx context.Context) (map[string]interface{}, error) {
66 return map[string]interface{}{
67 "is_configured": false,
69 "build_commit": "----",
70 "build_date": "------",
71 "build_config": "---------",
75 func maxBytes(h http.Handler) http.Handler {
76 const maxReqSize = 1e7 // 10MB
77 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
78 // A block can easily be bigger than maxReqSize, but everything
79 // else should be pretty small.
80 if req.URL.Path != crosscoreRPCPrefix+"signer/sign-block" {
81 req.Body = http.MaxBytesReader(w, req.Body, maxReqSize)
87 // NewBlockchainReactor returns the reactor of whole blockchain.
88 func NewBlockchainReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, wallet *wallet.Wallet, txfeeds *txfeed.Tracker, miningEnable bool) *BlockchainReactor {
89 newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
90 bcr := &BlockchainReactor{
93 blockKeeper: newBlockKeeper(chain, sw),
96 txFeedTracker: txfeeds,
97 miningEnable: miningEnable,
98 newBlockCh: newBlockCh,
102 bcr.mining = cpuminer.NewCPUMiner(chain, nil, txPool, newBlockCh)
103 bcr.miningPool = miningpool.NewMiningPool(chain, nil, txPool, newBlockCh)
105 bcr.mining = cpuminer.NewCPUMiner(chain, wallet.AccountMgr, txPool, newBlockCh)
106 bcr.miningPool = miningpool.NewMiningPool(chain, wallet.AccountMgr, txPool, newBlockCh)
109 bcr.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcr)
113 // OnStart implements BaseService
114 func (bcr *BlockchainReactor) OnStart() error {
115 bcr.BaseReactor.OnStart()
117 if bcr.miningEnable {
124 // OnStop implements BaseService
125 func (bcr *BlockchainReactor) OnStop() {
126 bcr.BaseReactor.OnStop()
127 if bcr.miningEnable {
130 bcr.blockKeeper.Stop()
133 // GetChannels implements Reactor
134 func (bcr *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
135 return []*p2p.ChannelDescriptor{
137 ID: BlockchainChannel,
139 SendQueueCapacity: 100,
144 // AddPeer implements Reactor by sending our state to peer.
145 func (bcr *BlockchainReactor) AddPeer(peer *p2p.Peer) {
146 peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}})
149 // RemovePeer implements Reactor by removing peer from the pool.
150 func (bcr *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
151 bcr.blockKeeper.RemovePeer(peer.Key)
154 // Receive implements Reactor by handling 4 types of messages (look below).
155 func (bcr *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
156 var tm *trust.TrustMetric
157 key := src.Connection().RemoteAddress.IP.String()
158 if tm = bcr.sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil {
159 log.Errorf("Can't get peer trust metric")
163 _, msg, err := DecodeMessage(msgBytes)
165 log.Errorf("Error decoding messagek %v", err)
168 log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
170 switch msg := msg.(type) {
171 case *BlockRequestMessage:
172 var block *protocolTypes.Block
175 block, err = bcr.chain.GetBlockByHeight(msg.Height)
177 block, err = bcr.chain.GetBlockByHash(msg.GetHash())
180 log.Errorf("Fail on BlockRequestMessage get block: %v", err)
183 response, err := NewBlockResponseMessage(block)
185 log.Errorf("Fail on BlockRequestMessage create resoinse: %v", err)
188 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
190 case *BlockResponseMessage:
191 bcr.blockKeeper.AddBlock(msg.GetBlock(), src)
193 case *StatusRequestMessage:
194 block := bcr.chain.BestBlock()
195 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(block)})
197 case *StatusResponseMessage:
198 bcr.blockKeeper.SetPeerHeight(src.Key, msg.Height, msg.GetHash())
200 case *TransactionNotifyMessage:
201 tx := msg.GetTransaction()
202 if err := bcr.chain.ValidateTx(tx); err != nil {
203 bcr.sw.AddScamPeer(src)
207 log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
211 // Handle messages from the poolReactor telling the reactor what to do.
212 // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
213 // (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
214 func (bcr *BlockchainReactor) syncRoutine() {
215 statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
216 newTxCh := bcr.txPool.GetNewTxCh()
220 case blockHash := <-bcr.newBlockCh:
221 block, err := bcr.chain.GetBlockByHash(blockHash)
223 log.Errorf("Error get block from newBlockCh %v", err)
225 log.WithFields(log.Fields{"Hash": blockHash, "height": block.Height}).Info("Boardcast my new block")
226 case newTx := <-newTxCh:
227 bcr.txFeedTracker.TxFilter(newTx)
228 go bcr.BroadcastTransaction(newTx)
229 case _ = <-statusUpdateTicker.C:
230 go bcr.BroadcastStatusResponse()
232 if bcr.miningEnable {
233 // mining if and only if block sync is finished
234 if bcr.blockKeeper.IsCaughtUp() {
246 // BroadcastStatusResponse broadcasts `BlockStore` height.
247 func (bcr *BlockchainReactor) BroadcastStatusResponse() {
248 block := bcr.chain.BestBlock()
249 bcr.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(block)})
252 // BroadcastTransaction broadcats `BlockStore` transaction.
253 func (bcr *BlockchainReactor) BroadcastTransaction(tx *protocolTypes.Tx) error {
254 msg, err := NewTransactionNotifyMessage(tx)
258 bcr.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{msg})