11 wire "github.com/tendermint/go-wire"
12 "github.com/bytom/p2p"
13 "github.com/bytom/types"
14 "github.com/bytom/protocol/bc/legacy"
15 "github.com/bytom/protocol"
16 "github.com/bytom/blockchain/query"
17 "github.com/bytom/encoding/json"
18 cmn "github.com/tendermint/tmlibs/common"
19 "github.com/bytom/blockchain/txdb"
20 "github.com/bytom/blockchain/account"
21 "github.com/bytom/blockchain/asset"
22 "github.com/bytom/blockchain/txfeed"
23 "github.com/bytom/log"
24 //"github.com/bytom/net/http/gzip"
25 "github.com/bytom/net/http/httpjson"
26 //"github.com/bytom/net/http/limit"
27 "github.com/bytom/net/http/static"
28 "github.com/bytom/generated/dashboard"
29 "github.com/bytom/errors"
30 "github.com/bytom/blockchain/txbuilder"
34 // BlockchainChannel is a channel for blocks and status updates (`BlockStore` height)
35 BlockchainChannel = byte(0x40)
37 defaultChannelCapacity = 100
38 defaultSleepIntervalMS = 500
39 trySyncIntervalMS = 100
40 // stop syncing when last block's time is
41 // within this much of the system time.
42 // stopSyncingDurationMinutes = 10
44 // ask for best height every 10s
45 statusUpdateIntervalSeconds = 10
46 // check if we should switch to consensus reactor
47 switchToConsensusIntervalSeconds = 1
48 maxBlockchainResponseSize = 22020096 + 2
49 crosscoreRPCPrefix = "/rpc/"
52 // BlockchainReactor handles long-term catchup syncing.
53 type BlockchainReactor struct {
58 accounts *account.Manager
59 assets *asset.Registry
60 txFeeds *txfeed.TxFeed
61 indexer *query.Indexer
66 requestsCh chan BlockRequest
67 timeoutsCh chan string
68 submitter txbuilder.Submitter
70 evsw types.EventSwitch
73 func batchRecover(ctx context.Context, v *interface{}) {
74 if r := recover(); r != nil {
76 if recoveredErr, ok := r.(error); ok {
79 err = fmt.Errorf("panic with %T", r)
81 err = errors.Wrap(err)
88 // Convert errors into error responses (including errors
89 // from recovered panics above).
90 if err, ok := (*v).(error); ok {
91 errorFormatter.Log(ctx, err)
92 *v = errorFormatter.Format(err)
96 func jsonHandler(f interface{}) http.Handler {
97 h, err := httpjson.Handler(f, errorFormatter.Write)
104 func alwaysError(err error) http.Handler {
105 return jsonHandler(func() error { return err })
108 func (bcr *BlockchainReactor) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
109 bcr.handler.ServeHTTP(rw, req)
112 func (bcr *BlockchainReactor) info(ctx context.Context) (map[string]interface{}, error) {
113 //if a.config == nil {
115 log.Printf(ctx, "-------info-----")
116 return map[string]interface{}{
117 "is_configured": false,
119 "build_commit": "----",
120 "build_date": "------",
121 "build_config": "---------",
126 func (bcr *BlockchainReactor) createblockkey(ctx context.Context) {
127 log.Printf(ctx,"creat-block-key")
130 func webAssetsHandler(next http.Handler) http.Handler {
131 mux := http.NewServeMux()
132 mux.Handle("/dashboard/", http.StripPrefix("/dashboard/", static.Handler{
133 Assets: dashboard.Files,
134 Default: "index.html",
136 mux.Handle("/", next)
140 func maxBytes(h http.Handler) http.Handler {
141 const maxReqSize = 1e7 // 10MB
142 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
143 // A block can easily be bigger than maxReqSize, but everything
144 // else should be pretty small.
145 if req.URL.Path != crosscoreRPCPrefix+"signer/sign-block" {
146 req.Body = http.MaxBytesReader(w, req.Body, maxReqSize)
152 func (bcr *BlockchainReactor) BuildHander() {
154 m.Handle("/create-account", jsonHandler(bcr.createAccount))
155 m.Handle("/create-asset", jsonHandler(bcr.createAsset))
156 m.Handle("/update-account-tags",jsonHandler(bcr.updateAccountTags))
157 m.Handle("/update-asset-tags",jsonHandler(bcr.updateAssetTags))
158 m.Handle("/build-transaction", jsonHandler(bcr.build))
159 m.Handle("/create-control-program",jsonHandler(bcr.createControlProgram))
160 m.Handle("/create-account-receiver", jsonHandler(bcr.createAccountReceiver))
161 m.Handle("/create-transaction-feed", jsonHandler(bcr.createTxFeed))
162 m.Handle("/get-transaction-feed", jsonHandler(bcr.getTxFeed))
163 m.Handle("/update-transaction-feed", jsonHandler(bcr.updateTxFeed))
164 m.Handle("/delete-transaction-feed", jsonHandler(bcr.deleteTxFeed))
165 m.Handle("/list-accounts", jsonHandler(bcr.listAccounts))
166 m.Handle("/list-assets", jsonHandler(bcr.listAssets))
167 m.Handle("/list-transaction-feeds", jsonHandler(bcr.listTxFeeds))
168 m.Handle("/list-transactions", jsonHandler(bcr.listTransactions))
169 m.Handle("/list-balances", jsonHandler(bcr.listBalances))
170 m.Handle("/list-unspent-outputs", jsonHandler(bcr.listUnspentOutputs))
171 m.Handle("/", alwaysError(errors.New("not Found")))
172 m.Handle("/info", jsonHandler(bcr.info))
173 m.Handle("/create-block-key", jsonHandler(bcr.createblockkey))
175 latencyHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
176 if l := latency(m, req); l != nil {
177 defer l.RecordSince(time.Now())
181 handler := maxBytes(latencyHandler) // TODO(tessr): consider moving this to non-core specific mux
182 handler = webAssetsHandler(handler)
183 /* handler = healthHandler(handler)
184 for _, l := range a.requestLimits {
185 handler = limit.Handler(handler, alwaysError(errRateLimited), l.perSecond, l.burst, l.key)
187 handler = gzip.Handler{Handler: handler}
188 handler = coreCounter(handler)
189 handler = timeoutContextHandler(handler)
190 if a.config != nil && a.config.BlockchainId != nil {
191 handler = blockchainIDHandler(handler, a.config.BlockchainId.String())
194 bcr.handler = handler
197 // Used as a request object for api queries
198 type requestQuery struct {
199 Filter string `json:"filter,omitempty"`
200 FilterParams []interface{} `json:"filter_params,omitempty"`
201 SumBy []string `json:"sum_by,omitempty"`
202 PageSize int `json:"page_size"`
204 // AscLongPoll and Timeout are used by /list-transactions
205 // to facilitate notifications.
206 AscLongPoll bool `json:"ascending_with_long_poll,omitempty"`
207 Timeout json.Duration `json:"timeout"`
209 // After is a completely opaque cursor, indicating that only
210 // items in the result set after the one identified by `After`
211 // should be included. It has no relationship to time.
212 After string `json:"after"`
214 // These two are used for time-range queries like /list-transactions
215 StartTimeMS uint64 `json:"start_time,omitempty"`
216 EndTimeMS uint64 `json:"end_time,omitempty"`
218 // This is used for point-in-time queries like /list-balances
219 // TODO(bobg): Different request structs for endpoints with different needs
220 TimestampMS uint64 `json:"timestamp,omitempty"`
222 // This is used for filtering results from /list-access-tokens
223 // Value must be "client" or "network"
224 Type string `json:"type"`
226 // Aliases is used to filter results from /mockshm/list-keys
227 Aliases []string `json:"aliases,omitempty"`
230 // Used as a response object for api queries
232 Items interface{} `json:"items"`
233 Next requestQuery `json:"next"`
234 LastPage bool `json:"last_page"`
237 func NewBlockchainReactor(store *txdb.Store, chain *protocol.Chain, accounts *account.Manager, assets *asset.Registry, fastSync bool) *BlockchainReactor {
238 requestsCh := make(chan BlockRequest, defaultChannelCapacity)
239 timeoutsCh := make(chan string, defaultChannelCapacity)
240 pool := NewBlockPool(
245 bcR := &BlockchainReactor {
251 mux: http.NewServeMux(),
253 requestsCh: requestsCh,
254 timeoutsCh: timeoutsCh,
256 bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
260 // OnStart implements BaseService
261 func (bcR *BlockchainReactor) OnStart() error {
262 bcR.BaseReactor.OnStart()
265 _, err := bcR.pool.Start()
274 // OnStop implements BaseService
275 func (bcR *BlockchainReactor) OnStop() {
276 bcR.BaseReactor.OnStop()
279 // GetChannels implements Reactor
280 func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
281 return []*p2p.ChannelDescriptor{
282 &p2p.ChannelDescriptor{
283 ID: BlockchainChannel,
285 SendQueueCapacity: 100,
290 // AddPeer implements Reactor by sending our state to peer.
291 func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) {
292 if !peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}}) {
293 // doing nothing, will try later in `poolRoutine`
297 // RemovePeer implements Reactor by removing peer from the pool.
298 func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
299 bcR.pool.RemovePeer(peer.Key)
302 // Receive implements Reactor by handling 4 types of messages (look below).
303 func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
304 _, msg, err := DecodeMessage(msgBytes)
306 bcR.Logger.Error("Error decoding message", "error", err)
310 bcR.Logger.Info("Receive", "src", src, "chID", chID, "msg", msg)
312 switch msg := msg.(type) {
313 case *bcBlockRequestMessage:
314 // Got a request for a block. Respond with block if we have it.
315 block, _:= bcR.store.GetBlock(msg.Height)
317 msg := &bcBlockResponseMessage{Block: block}
318 queued := src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
320 // queue is full, just ignore.
323 // TODO peer is asking for things we don't have.
325 case *bcBlockResponseMessage:
327 bcR.pool.AddBlock(src.Key, msg.Block, len(msgBytes))
328 case *bcStatusRequestMessage:
329 // Send peer our state.
330 queued := src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}})
334 case *bcStatusResponseMessage:
335 // Got a peer status. Unverified.
336 bcR.pool.SetPeerHeight(src.Key, msg.Height)
338 bcR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
343 // Handle messages from the poolReactor telling the reactor what to do.
344 // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
345 // (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
346 func (bcR *BlockchainReactor) poolRoutine() {
348 trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
349 statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
350 //switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
355 case request := <-bcR.requestsCh: // chan BlockRequest
356 peer := bcR.Switch.Peers().Get(request.PeerID)
358 continue FOR_LOOP // Peer has since been disconnected.
360 msg := &bcBlockRequestMessage{request.Height}
361 queued := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
363 // We couldn't make the request, send-queue full.
364 // The pool handles timeouts, just let it go.
367 case peerID := <-bcR.timeoutsCh: // chan string
369 peer := bcR.Switch.Peers().Get(peerID)
371 bcR.Switch.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
373 case _ = <-statusUpdateTicker.C:
374 // ask for status updates
375 go bcR.BroadcastStatusRequest()
376 /*case _ = <-switchToConsensusTicker.C:
377 height, numPending, _ := bcR.pool.GetStatus()
378 outbound, inbound, _ := bcR.Switch.NumPeers()
379 bcR.Logger.Info("Consensus ticker", "numPending", numPending, "total", len(bcR.pool.requesters),
380 "outbound", outbound, "inbound", inbound)
381 if bcR.pool.IsCaughtUp() {
382 bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
385 conR := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
386 conR.SwitchToConsensus(bcR.state)
390 case _ = <-trySyncTicker.C: // chan time
391 // This loop can be slow as long as it's doing syncing work.
393 for i := 0; i < 10; i++ {
394 // See if there are any blocks to sync.
395 first, second := bcR.pool.PeekTwoBlocks()
396 bcR.Logger.Info("TrySync peeked", "first", first, "second", second)
397 if first == nil || second == nil {
398 // We need both to sync the first block.
401 bcR.pool.PopRequest()
402 bcR.store.SaveBlock(first)
411 // BroadcastStatusRequest broadcasts `BlockStore` height.
412 func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
413 bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusRequestMessage{bcR.store.Height()}})
419 // SetEventSwitch implements events.Eventable
420 func (bcR *BlockchainReactor) SetEventSwitch(evsw types.EventSwitch) {
425 //-----------------------------------------------------------------------------
429 msgTypeBlockRequest = byte(0x10)
430 msgTypeBlockResponse = byte(0x11)
431 msgTypeStatusResponse = byte(0x20)
432 msgTypeStatusRequest = byte(0x21)
435 // BlockchainMessage is a generic message for this reactor.
436 type BlockchainMessage interface{}
438 var _ = wire.RegisterInterface(
439 struct{ BlockchainMessage }{},
440 wire.ConcreteType{&bcBlockRequestMessage{}, msgTypeBlockRequest},
441 wire.ConcreteType{&bcBlockResponseMessage{}, msgTypeBlockResponse},
442 wire.ConcreteType{&bcStatusResponseMessage{}, msgTypeStatusResponse},
443 wire.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest},
446 // DecodeMessage decodes BlockchainMessage.
447 // TODO: ensure that bz is completely read.
448 func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
451 r := bytes.NewReader(bz)
452 msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, maxBlockchainResponseSize, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage
453 if err != nil && n != len(bz) {
454 err = errors.New("DecodeMessage() had bytes left over")
459 //-----------------------------------
461 type bcBlockRequestMessage struct {
465 func (m *bcBlockRequestMessage) String() string {
466 return cmn.Fmt("[bcBlockRequestMessage %v]", m.Height)
469 //-------------------------------------
471 // NOTE: keep up-to-date with maxBlockchainResponseSize
472 type bcBlockResponseMessage struct {
476 func (m *bcBlockResponseMessage) String() string {
477 return cmn.Fmt("[bcBlockResponseMessage %v]", m.Block.Height)
480 //-------------------------------------
482 type bcStatusRequestMessage struct {
486 func (m *bcStatusRequestMessage) String() string {
487 return cmn.Fmt("[bcStatusRequestMessage %v]", m.Height)
490 //-------------------------------------
492 type bcStatusResponseMessage struct {
496 func (m *bcStatusResponseMessage) String() string {
497 return cmn.Fmt("[bcStatusResponseMessage %v]", m.Height)