OSDN Git Service

init push for add newBlockCh for p2p (#443)
[bytom/bytom.git] / blockchain / reactor.go
1 package blockchain
2
3 import (
4         "context"
5         "net/http"
6         "reflect"
7         "time"
8
9         log "github.com/sirupsen/logrus"
10         cmn "github.com/tendermint/tmlibs/common"
11
12         "github.com/bytom/blockchain/accesstoken"
13         "github.com/bytom/blockchain/account"
14         "github.com/bytom/blockchain/asset"
15         "github.com/bytom/blockchain/pseudohsm"
16         "github.com/bytom/blockchain/txfeed"
17         "github.com/bytom/blockchain/wallet"
18         "github.com/bytom/mining/cpuminer"
19         "github.com/bytom/mining/miningpool"
20         "github.com/bytom/p2p"
21         "github.com/bytom/p2p/trust"
22         "github.com/bytom/protocol"
23         "github.com/bytom/protocol/bc"
24         protocolTypes "github.com/bytom/protocol/bc/types"
25         "github.com/bytom/types"
26 )
27
const (
	// BlockchainChannel is the p2p channel ID used for blocks and status updates.
	BlockchainChannel byte = 0x40
	// maxNewBlockChSize is the buffer capacity of the new-block hash channel
	// created in NewBlockchainReactor.
	maxNewBlockChSize int = 1024

	// statusUpdateIntervalSeconds is the period, in seconds, of the status
	// ticker driven by syncRoutine.
	statusUpdateIntervalSeconds = 10
	// maxBlockchainResponseSize equals 21 MiB plus 2 bytes (22020098).
	maxBlockchainResponseSize = 21*1024*1024 + 2
	// crosscoreRPCPrefix is the URL path prefix prepended to RPC routes.
	crosscoreRPCPrefix = "/rpc/"
)
37
const (
	// SUCCESS is the Status value of a Response whose RPC call succeeded.
	SUCCESS = "success"
	// FAIL is the Status value of a Response whose RPC call failed.
	FAIL = "fail"
)

// Response is the standard reply shape for RPC calls. Every field is tagged
// omitempty, so zero-valued fields are left out of the encoded JSON.
type Response struct {
	Status string      `json:"status,omitempty"`
	Msg    string      `json:"msg,omitempty"`
	Data   interface{} `json:"data,omitempty"`
}

// NewSuccessResponse returns a Response with Status SUCCESS carrying data.
func NewSuccessResponse(data interface{}) Response {
	return Response{Status: SUCCESS, Data: data}
}

// NewErrorResponse returns a Response with Status FAIL whose Msg is the text
// of err. A nil err yields a FAIL response with an empty Msg instead of
// panicking on err.Error().
func NewErrorResponse(err error) Response {
	if err == nil {
		return Response{Status: FAIL}
	}
	return Response{Status: FAIL, Msg: err.Error()}
}
61
62 //BlockchainReactor handles long-term catchup syncing.
63 type BlockchainReactor struct {
64         p2p.BaseReactor
65
66         chain         *protocol.Chain
67         wallet        *wallet.Wallet
68         accounts      *account.Manager
69         assets        *asset.Registry
70         accessTokens  *accesstoken.CredentialStore
71         txFeedTracker *txfeed.Tracker
72         blockKeeper   *blockKeeper
73         txPool        *protocol.TxPool
74         hsm           *pseudohsm.HSM
75         mining        *cpuminer.CPUMiner
76         miningPool    *miningpool.MiningPool
77         mux           *http.ServeMux
78         sw            *p2p.Switch
79         handler       http.Handler
80         evsw          types.EventSwitch
81         newBlockCh    chan *bc.Hash
82         miningEnable  bool
83 }
84
85 func (bcr *BlockchainReactor) info(ctx context.Context) (map[string]interface{}, error) {
86         return map[string]interface{}{
87                 "is_configured": false,
88                 "version":       "0.001",
89                 "build_commit":  "----",
90                 "build_date":    "------",
91                 "build_config":  "---------",
92         }, nil
93 }
94
95 func maxBytes(h http.Handler) http.Handler {
96         const maxReqSize = 1e7 // 10MB
97         return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
98                 // A block can easily be bigger than maxReqSize, but everything
99                 // else should be pretty small.
100                 if req.URL.Path != crosscoreRPCPrefix+"signer/sign-block" {
101                         req.Body = http.MaxBytesReader(w, req.Body, maxReqSize)
102                 }
103                 h.ServeHTTP(w, req)
104         })
105 }
106
107 // NewBlockchainReactor returns the reactor of whole blockchain.
108 func NewBlockchainReactor(chain *protocol.Chain, txPool *protocol.TxPool, accounts *account.Manager, assets *asset.Registry, sw *p2p.Switch, hsm *pseudohsm.HSM, wallet *wallet.Wallet, txfeeds *txfeed.Tracker, accessTokens *accesstoken.CredentialStore, miningEnable bool) *BlockchainReactor {
109         newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
110         bcr := &BlockchainReactor{
111                 chain:         chain,
112                 wallet:        wallet,
113                 accounts:      accounts,
114                 assets:        assets,
115                 blockKeeper:   newBlockKeeper(chain, sw),
116                 txPool:        txPool,
117                 mining:        cpuminer.NewCPUMiner(chain, accounts, txPool, newBlockCh),
118                 miningPool:    miningpool.NewMiningPool(chain, accounts, txPool, newBlockCh),
119                 mux:           http.NewServeMux(),
120                 sw:            sw,
121                 hsm:           hsm,
122                 txFeedTracker: txfeeds,
123                 accessTokens:  accessTokens,
124                 miningEnable:  miningEnable,
125                 newBlockCh:    newBlockCh,
126         }
127         bcr.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcr)
128         return bcr
129 }
130
131 // OnStart implements BaseService
132 func (bcr *BlockchainReactor) OnStart() error {
133         bcr.BaseReactor.OnStart()
134         bcr.BuildHandler()
135
136         if bcr.miningEnable {
137                 bcr.mining.Start()
138         }
139         go bcr.syncRoutine()
140         return nil
141 }
142
143 // OnStop implements BaseService
144 func (bcr *BlockchainReactor) OnStop() {
145         bcr.BaseReactor.OnStop()
146         if bcr.miningEnable {
147                 bcr.mining.Stop()
148         }
149         bcr.blockKeeper.Stop()
150 }
151
152 // GetChannels implements Reactor
153 func (bcr *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
154         return []*p2p.ChannelDescriptor{
155                 {
156                         ID:                BlockchainChannel,
157                         Priority:          5,
158                         SendQueueCapacity: 100,
159                 },
160         }
161 }
162
163 // AddPeer implements Reactor by sending our state to peer.
164 func (bcr *BlockchainReactor) AddPeer(peer *p2p.Peer) {
165         peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}})
166 }
167
168 // RemovePeer implements Reactor by removing peer from the pool.
169 func (bcr *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
170         bcr.blockKeeper.RemovePeer(peer.Key)
171 }
172
173 // Receive implements Reactor by handling 4 types of messages (look below).
174 func (bcr *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
175         var tm *trust.TrustMetric
176         key := src.Connection().RemoteAddress.IP.String()
177         if tm = bcr.sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil {
178                 log.Errorf("Can't get peer trust metric")
179                 return
180         }
181
182         _, msg, err := DecodeMessage(msgBytes)
183         if err != nil {
184                 log.Errorf("Error decoding messagek %v", err)
185                 return
186         }
187         log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
188
189         switch msg := msg.(type) {
190         case *BlockRequestMessage:
191                 var block *protocolTypes.Block
192                 var err error
193                 if msg.Height != 0 {
194                         block, err = bcr.chain.GetBlockByHeight(msg.Height)
195                 } else {
196                         block, err = bcr.chain.GetBlockByHash(msg.GetHash())
197                 }
198                 if err != nil {
199                         log.Errorf("Fail on BlockRequestMessage get block: %v", err)
200                         return
201                 }
202                 response, err := NewBlockResponseMessage(block)
203                 if err != nil {
204                         log.Errorf("Fail on BlockRequestMessage create resoinse: %v", err)
205                         return
206                 }
207                 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
208
209         case *BlockResponseMessage:
210                 bcr.blockKeeper.AddBlock(msg.GetBlock(), src)
211
212         case *StatusRequestMessage:
213                 block := bcr.chain.BestBlock()
214                 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(block)})
215
216         case *StatusResponseMessage:
217                 bcr.blockKeeper.SetPeerHeight(src.Key, msg.Height, msg.GetHash())
218
219         case *TransactionNotifyMessage:
220                 tx := msg.GetTransaction()
221                 if err := bcr.chain.ValidateTx(tx); err != nil {
222                         bcr.sw.AddScamPeer(src)
223                 }
224
225         default:
226                 log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
227         }
228 }
229
230 // Handle messages from the poolReactor telling the reactor what to do.
231 // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
232 // (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
233 func (bcr *BlockchainReactor) syncRoutine() {
234         statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
235         newTxCh := bcr.txPool.GetNewTxCh()
236
237         for {
238                 select {
239                 case blockHash := <-bcr.newBlockCh:
240                         block, err := bcr.chain.GetBlockByHash(blockHash)
241                         if err != nil {
242                                 log.Errorf("Error get block from newBlockCh %v", err)
243                         }
244                         log.WithFields(log.Fields{"Hash": blockHash, "height": block.Height}).Info("Boardcast my new block")
245                 case newTx := <-newTxCh:
246                         bcr.txFeedTracker.TxFilter(newTx)
247                         go bcr.BroadcastTransaction(newTx)
248                 case _ = <-statusUpdateTicker.C:
249                         go bcr.BroadcastStatusResponse()
250
251                         if bcr.miningEnable {
252                                 // mining if and only if block sync is finished
253                                 if bcr.blockKeeper.IsCaughtUp() {
254                                         bcr.mining.Start()
255                                 } else {
256                                         bcr.mining.Stop()
257                                 }
258                         }
259                 case <-bcr.Quit:
260                         return
261                 }
262         }
263 }
264
265 // BroadcastStatusResponse broadcasts `BlockStore` height.
266 func (bcr *BlockchainReactor) BroadcastStatusResponse() {
267         block := bcr.chain.BestBlock()
268         bcr.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(block)})
269 }
270
271 // BroadcastTransaction broadcats `BlockStore` transaction.
272 func (bcr *BlockchainReactor) BroadcastTransaction(tx *protocolTypes.Tx) error {
273         msg, err := NewTransactionNotifyMessage(tx)
274         if err != nil {
275                 return err
276         }
277         bcr.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{msg})
278         return nil
279 }