OSDN Git Service

Revert "remove rpc package (#474)" (#475)
[bytom/bytom.git] / blockchain / reactor.go
1 package blockchain
2
3 import (
4         "context"
5         "net/http"
6         "reflect"
7         "time"
8
9         log "github.com/sirupsen/logrus"
10         cmn "github.com/tendermint/tmlibs/common"
11
12         "github.com/bytom/blockchain/txfeed"
13         "github.com/bytom/wallet"
14         "github.com/bytom/mining/cpuminer"
15         "github.com/bytom/mining/miningpool"
16         "github.com/bytom/p2p"
17         "github.com/bytom/p2p/trust"
18         "github.com/bytom/protocol"
19         "github.com/bytom/protocol/bc"
20         protocolTypes "github.com/bytom/protocol/bc/types"
21         "github.com/bytom/types"
22 )
23
const (
	// BlockchainChannel is the p2p channel ID used for blocks and status updates.
	BlockchainChannel byte = 0x40
	// maxNewBlockChSize is the buffer capacity of the reactor's new-block channel.
	maxNewBlockChSize int = 1024

	// statusUpdateIntervalSeconds is the period, in seconds, of the status ticker.
	statusUpdateIntervalSeconds = 10
	// maxBlockchainResponseSize is 21 MiB (22020096 bytes) plus 2 bytes.
	maxBlockchainResponseSize = 22020096 + 2
	// crosscoreRPCPrefix is the URL path prefix of the RPC endpoints.
	crosscoreRPCPrefix = "/rpc/"
)
33
// Status values carried in Response.Status.
const (
	// SUCCESS indicates that the RPC call succeeded.
	SUCCESS = "success"
	// FAIL indicates that the RPC call failed.
	FAIL = "fail"
)
40
// Response is the standard envelope for RPC replies. Every field is
// omitted from the JSON encoding when it holds its zero value.
type Response struct {
	// Status is the outcome of the call (see SUCCESS and FAIL).
	Status string `json:"status,omitempty"`
	// Msg is a free-form message accompanying the status.
	Msg string `json:"msg,omitempty"`
	// Data is the call-specific payload.
	Data interface{} `json:"data,omitempty"`
}
47
48 //BlockchainReactor handles long-term catchup syncing.
49 type BlockchainReactor struct {
50         p2p.BaseReactor
51
52         chain         *protocol.Chain
53         wallet        *wallet.Wallet
54         txFeedTracker *txfeed.Tracker
55         blockKeeper   *blockKeeper
56         txPool        *protocol.TxPool
57         mining        *cpuminer.CPUMiner
58         miningPool    *miningpool.MiningPool
59         sw            *p2p.Switch
60         evsw          types.EventSwitch
61         newBlockCh    chan *bc.Hash
62         miningEnable  bool
63 }
64
65 func (bcr *BlockchainReactor) info(ctx context.Context) (map[string]interface{}, error) {
66         return map[string]interface{}{
67                 "is_configured": false,
68                 "version":       "0.001",
69                 "build_commit":  "----",
70                 "build_date":    "------",
71                 "build_config":  "---------",
72         }, nil
73 }
74
75 func maxBytes(h http.Handler) http.Handler {
76         const maxReqSize = 1e7 // 10MB
77         return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
78                 // A block can easily be bigger than maxReqSize, but everything
79                 // else should be pretty small.
80                 if req.URL.Path != crosscoreRPCPrefix+"signer/sign-block" {
81                         req.Body = http.MaxBytesReader(w, req.Body, maxReqSize)
82                 }
83                 h.ServeHTTP(w, req)
84         })
85 }
86
87 // NewBlockchainReactor returns the reactor of whole blockchain.
88 func NewBlockchainReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, wallet *wallet.Wallet, txfeeds *txfeed.Tracker, miningEnable bool) *BlockchainReactor {
89         newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
90         bcr := &BlockchainReactor{
91                 chain:         chain,
92                 wallet:        wallet,
93                 blockKeeper:   newBlockKeeper(chain, sw),
94                 txPool:        txPool,
95                 sw:            sw,
96                 txFeedTracker: txfeeds,
97                 miningEnable:  miningEnable,
98                 newBlockCh:    newBlockCh,
99         }
100
101         if wallet == nil {
102                 bcr.mining = cpuminer.NewCPUMiner(chain, nil, txPool, newBlockCh)
103                 bcr.miningPool = miningpool.NewMiningPool(chain, nil, txPool, newBlockCh)
104         } else {
105                 bcr.mining = cpuminer.NewCPUMiner(chain, wallet.AccountMgr, txPool, newBlockCh)
106                 bcr.miningPool = miningpool.NewMiningPool(chain, wallet.AccountMgr, txPool, newBlockCh)
107         }
108
109         bcr.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcr)
110         return bcr
111 }
112
113 // OnStart implements BaseService
114 func (bcr *BlockchainReactor) OnStart() error {
115         bcr.BaseReactor.OnStart()
116
117         if bcr.miningEnable {
118                 bcr.mining.Start()
119         }
120         go bcr.syncRoutine()
121         return nil
122 }
123
124 // OnStop implements BaseService
125 func (bcr *BlockchainReactor) OnStop() {
126         bcr.BaseReactor.OnStop()
127         if bcr.miningEnable {
128                 bcr.mining.Stop()
129         }
130         bcr.blockKeeper.Stop()
131 }
132
133 // GetChannels implements Reactor
134 func (bcr *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
135         return []*p2p.ChannelDescriptor{
136                 {
137                         ID:                BlockchainChannel,
138                         Priority:          5,
139                         SendQueueCapacity: 100,
140                 },
141         }
142 }
143
144 // AddPeer implements Reactor by sending our state to peer.
145 func (bcr *BlockchainReactor) AddPeer(peer *p2p.Peer) {
146         peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}})
147 }
148
149 // RemovePeer implements Reactor by removing peer from the pool.
150 func (bcr *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
151         bcr.blockKeeper.RemovePeer(peer.Key)
152 }
153
154 // Receive implements Reactor by handling 4 types of messages (look below).
155 func (bcr *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
156         var tm *trust.TrustMetric
157         key := src.Connection().RemoteAddress.IP.String()
158         if tm = bcr.sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil {
159                 log.Errorf("Can't get peer trust metric")
160                 return
161         }
162
163         _, msg, err := DecodeMessage(msgBytes)
164         if err != nil {
165                 log.Errorf("Error decoding messagek %v", err)
166                 return
167         }
168         log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
169
170         switch msg := msg.(type) {
171         case *BlockRequestMessage:
172                 var block *protocolTypes.Block
173                 var err error
174                 if msg.Height != 0 {
175                         block, err = bcr.chain.GetBlockByHeight(msg.Height)
176                 } else {
177                         block, err = bcr.chain.GetBlockByHash(msg.GetHash())
178                 }
179                 if err != nil {
180                         log.Errorf("Fail on BlockRequestMessage get block: %v", err)
181                         return
182                 }
183                 response, err := NewBlockResponseMessage(block)
184                 if err != nil {
185                         log.Errorf("Fail on BlockRequestMessage create resoinse: %v", err)
186                         return
187                 }
188                 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
189
190         case *BlockResponseMessage:
191                 bcr.blockKeeper.AddBlock(msg.GetBlock(), src)
192
193         case *StatusRequestMessage:
194                 block := bcr.chain.BestBlock()
195                 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(block)})
196
197         case *StatusResponseMessage:
198                 bcr.blockKeeper.SetPeerHeight(src.Key, msg.Height, msg.GetHash())
199
200         case *TransactionNotifyMessage:
201                 tx := msg.GetTransaction()
202                 if err := bcr.chain.ValidateTx(tx); err != nil {
203                         bcr.sw.AddScamPeer(src)
204                 }
205
206         default:
207                 log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
208         }
209 }
210
211 // Handle messages from the poolReactor telling the reactor what to do.
212 // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
213 // (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
214 func (bcr *BlockchainReactor) syncRoutine() {
215         statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
216         newTxCh := bcr.txPool.GetNewTxCh()
217
218         for {
219                 select {
220                 case blockHash := <-bcr.newBlockCh:
221                         block, err := bcr.chain.GetBlockByHash(blockHash)
222                         if err != nil {
223                                 log.Errorf("Error get block from newBlockCh %v", err)
224                         }
225                         log.WithFields(log.Fields{"Hash": blockHash, "height": block.Height}).Info("Boardcast my new block")
226                 case newTx := <-newTxCh:
227                         bcr.txFeedTracker.TxFilter(newTx)
228                         go bcr.BroadcastTransaction(newTx)
229                 case _ = <-statusUpdateTicker.C:
230                         go bcr.BroadcastStatusResponse()
231
232                         if bcr.miningEnable {
233                                 // mining if and only if block sync is finished
234                                 if bcr.blockKeeper.IsCaughtUp() {
235                                         bcr.mining.Start()
236                                 } else {
237                                         bcr.mining.Stop()
238                                 }
239                         }
240                 case <-bcr.Quit:
241                         return
242                 }
243         }
244 }
245
246 // BroadcastStatusResponse broadcasts `BlockStore` height.
247 func (bcr *BlockchainReactor) BroadcastStatusResponse() {
248         block := bcr.chain.BestBlock()
249         bcr.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(block)})
250 }
251
252 // BroadcastTransaction broadcats `BlockStore` transaction.
253 func (bcr *BlockchainReactor) BroadcastTransaction(tx *protocolTypes.Tx) error {
254         msg, err := NewTransactionNotifyMessage(tx)
255         if err != nil {
256                 return err
257         }
258         bcr.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{msg})
259         return nil
260 }