9 log "github.com/sirupsen/logrus"
10 cmn "github.com/tendermint/tmlibs/common"
12 "github.com/bytom/blockchain/accesstoken"
13 "github.com/bytom/blockchain/account"
14 "github.com/bytom/blockchain/asset"
15 "github.com/bytom/blockchain/pseudohsm"
16 "github.com/bytom/blockchain/txfeed"
17 "github.com/bytom/blockchain/wallet"
18 "github.com/bytom/mining/cpuminer"
19 "github.com/bytom/mining/miningpool"
20 "github.com/bytom/p2p"
21 "github.com/bytom/p2p/trust"
22 "github.com/bytom/protocol"
23 "github.com/bytom/protocol/bc"
24 protocolTypes "github.com/bytom/protocol/bc/types"
25 "github.com/bytom/types"
29 // BlockchainChannel is a channel for blocks and status updates
30 BlockchainChannel = byte(0x40)
31 maxNewBlockChSize = int(1024)
33 statusUpdateIntervalSeconds = 10
34 maxBlockchainResponseSize = 22020096 + 2
35 crosscoreRPCPrefix = "/rpc/"
39 // SUCCESS indicates the rpc calling is successful.
41 // FAIL indicated the rpc calling is failed.
45 // Response describes the response standard.
46 type Response struct {
47 Status string `json:"status,omitempty"`
48 Msg string `json:"msg,omitempty"`
49 Data interface{} `json:"data,omitempty"`
52 //NewSuccessResponse success response
53 func NewSuccessResponse(data interface{}) Response {
54 return Response{Status: SUCCESS, Data: data}
57 //NewErrorResponse error response
58 func NewErrorResponse(err error) Response {
59 return Response{Status: FAIL, Msg: err.Error()}
62 //BlockchainReactor handles long-term catchup syncing.
63 type BlockchainReactor struct {
68 accounts *account.Manager
69 assets *asset.Registry
70 accessTokens *accesstoken.CredentialStore
71 txFeedTracker *txfeed.Tracker
72 blockKeeper *blockKeeper
73 txPool *protocol.TxPool
75 mining *cpuminer.CPUMiner
76 miningPool *miningpool.MiningPool
80 evsw types.EventSwitch
81 newBlockCh chan *bc.Hash
85 func (bcr *BlockchainReactor) info(ctx context.Context) (map[string]interface{}, error) {
86 return map[string]interface{}{
87 "is_configured": false,
89 "build_commit": "----",
90 "build_date": "------",
91 "build_config": "---------",
95 func maxBytes(h http.Handler) http.Handler {
96 const maxReqSize = 1e7 // 10MB
97 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
98 // A block can easily be bigger than maxReqSize, but everything
99 // else should be pretty small.
100 if req.URL.Path != crosscoreRPCPrefix+"signer/sign-block" {
101 req.Body = http.MaxBytesReader(w, req.Body, maxReqSize)
107 // NewBlockchainReactor returns the reactor of whole blockchain.
108 func NewBlockchainReactor(chain *protocol.Chain, txPool *protocol.TxPool, accounts *account.Manager, assets *asset.Registry, sw *p2p.Switch, hsm *pseudohsm.HSM, wallet *wallet.Wallet, txfeeds *txfeed.Tracker, accessTokens *accesstoken.CredentialStore, miningEnable bool) *BlockchainReactor {
109 newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
110 bcr := &BlockchainReactor{
115 blockKeeper: newBlockKeeper(chain, sw),
117 mining: cpuminer.NewCPUMiner(chain, accounts, txPool, newBlockCh),
118 miningPool: miningpool.NewMiningPool(chain, accounts, txPool, newBlockCh),
119 mux: http.NewServeMux(),
122 txFeedTracker: txfeeds,
123 accessTokens: accessTokens,
124 miningEnable: miningEnable,
125 newBlockCh: newBlockCh,
127 bcr.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcr)
131 // OnStart implements BaseService
132 func (bcr *BlockchainReactor) OnStart() error {
133 bcr.BaseReactor.OnStart()
136 if bcr.miningEnable {
143 // OnStop implements BaseService
144 func (bcr *BlockchainReactor) OnStop() {
145 bcr.BaseReactor.OnStop()
146 if bcr.miningEnable {
149 bcr.blockKeeper.Stop()
152 // GetChannels implements Reactor
153 func (bcr *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
154 return []*p2p.ChannelDescriptor{
156 ID: BlockchainChannel,
158 SendQueueCapacity: 100,
163 // AddPeer implements Reactor by sending our state to peer.
164 func (bcr *BlockchainReactor) AddPeer(peer *p2p.Peer) {
165 peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}})
168 // RemovePeer implements Reactor by removing peer from the pool.
169 func (bcr *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
170 bcr.blockKeeper.RemovePeer(peer.Key)
173 // Receive implements Reactor by handling 4 types of messages (look below).
174 func (bcr *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
175 var tm *trust.TrustMetric
176 key := src.Connection().RemoteAddress.IP.String()
177 if tm = bcr.sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil {
178 log.Errorf("Can't get peer trust metric")
182 _, msg, err := DecodeMessage(msgBytes)
184 log.Errorf("Error decoding messagek %v", err)
187 log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
189 switch msg := msg.(type) {
190 case *BlockRequestMessage:
191 var block *protocolTypes.Block
194 block, err = bcr.chain.GetBlockByHeight(msg.Height)
196 block, err = bcr.chain.GetBlockByHash(msg.GetHash())
199 log.Errorf("Fail on BlockRequestMessage get block: %v", err)
202 response, err := NewBlockResponseMessage(block)
204 log.Errorf("Fail on BlockRequestMessage create resoinse: %v", err)
207 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
209 case *BlockResponseMessage:
210 bcr.blockKeeper.AddBlock(msg.GetBlock(), src)
212 case *StatusRequestMessage:
213 block := bcr.chain.BestBlock()
214 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(block)})
216 case *StatusResponseMessage:
217 bcr.blockKeeper.SetPeerHeight(src.Key, msg.Height, msg.GetHash())
219 case *TransactionNotifyMessage:
220 tx := msg.GetTransaction()
221 if err := bcr.chain.ValidateTx(tx); err != nil {
222 bcr.sw.AddScamPeer(src)
226 log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
230 // Handle messages from the poolReactor telling the reactor what to do.
231 // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
232 // (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
233 func (bcr *BlockchainReactor) syncRoutine() {
234 statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
235 newTxCh := bcr.txPool.GetNewTxCh()
239 case blockHash := <-bcr.newBlockCh:
240 block, err := bcr.chain.GetBlockByHash(blockHash)
242 log.Errorf("Error get block from newBlockCh %v", err)
244 log.WithFields(log.Fields{"Hash": blockHash, "height": block.Height}).Info("Boardcast my new block")
245 case newTx := <-newTxCh:
246 bcr.txFeedTracker.TxFilter(newTx)
247 go bcr.BroadcastTransaction(newTx)
248 case _ = <-statusUpdateTicker.C:
249 go bcr.BroadcastStatusResponse()
251 if bcr.miningEnable {
252 // mining if and only if block sync is finished
253 if bcr.blockKeeper.IsCaughtUp() {
265 // BroadcastStatusResponse broadcasts `BlockStore` height.
266 func (bcr *BlockchainReactor) BroadcastStatusResponse() {
267 block := bcr.chain.BestBlock()
268 bcr.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(block)})
271 // BroadcastTransaction broadcats `BlockStore` transaction.
272 func (bcr *BlockchainReactor) BroadcastTransaction(tx *protocolTypes.Tx) error {
273 msg, err := NewTransactionNotifyMessage(tx)
277 bcr.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{msg})