OSDN Git Service

add query API
[bytom/bytom.git] / blockchain / reactor.go
1 package blockchain
2
3 import (
4         "bytes"
5         "context"
6         "reflect"
7     "time"
8         "net/http"
9         "fmt"
10
11         wire "github.com/tendermint/go-wire"
12         "github.com/bytom/p2p"
13         "github.com/bytom/types"
14     "github.com/bytom/protocol/bc/legacy"
15     "github.com/bytom/protocol"
16         "github.com/bytom/blockchain/query"
17         "github.com/bytom/encoding/json"
18         cmn "github.com/tendermint/tmlibs/common"
19         "github.com/bytom/blockchain/txdb"
20         "github.com/bytom/blockchain/account"
21         "github.com/bytom/blockchain/asset"
22         "github.com/bytom/blockchain/txfeed"
23         "github.com/bytom/log"
24         //"github.com/bytom/net/http/gzip"
25         "github.com/bytom/net/http/httpjson"
26         //"github.com/bytom/net/http/limit"
27         "github.com/bytom/net/http/static"
28         "github.com/bytom/generated/dashboard"
29         "github.com/bytom/errors"
30         "github.com/bytom/blockchain/txbuilder"
31 )
32
33 const (
34         // BlockchainChannel is a channel for blocks and status updates (`BlockStore` height)
35         BlockchainChannel = byte(0x40)
36
37         defaultChannelCapacity = 100
38         defaultSleepIntervalMS = 500
39         trySyncIntervalMS      = 100
40         // stop syncing when last block's time is
41         // within this much of the system time.
42         // stopSyncingDurationMinutes = 10
43
44         // ask for best height every 10s
45         statusUpdateIntervalSeconds = 10
46         // check if we should switch to consensus reactor
47         switchToConsensusIntervalSeconds = 1
48         maxBlockchainResponseSize        = 22020096 + 2
49         crosscoreRPCPrefix = "/rpc/"
50 )
51
52 // BlockchainReactor handles long-term catchup syncing.
53 type BlockchainReactor struct {
54         p2p.BaseReactor
55
56         chain        *protocol.Chain
57         store        *txdb.Store
58         accounts         *account.Manager
59         assets       *asset.Registry
60         txFeeds          *txfeed.TxFeed
61         indexer         *query.Indexer
62         pool         *BlockPool
63         mux          *http.ServeMux
64         handler      http.Handler
65         fastSync     bool
66         requestsCh   chan BlockRequest
67         timeoutsCh   chan string
68         submitter    txbuilder.Submitter
69
70         evsw types.EventSwitch
71 }
72
73 func batchRecover(ctx context.Context, v *interface{}) {
74         if r := recover(); r != nil {
75                 var err error
76                 if recoveredErr, ok := r.(error); ok {
77                         err = recoveredErr
78                 } else {
79                         err = fmt.Errorf("panic with %T", r)
80                 }
81                 err = errors.Wrap(err)
82                 *v = err
83         }
84
85         if *v == nil {
86                 return
87         }
88         // Convert errors into error responses (including errors
89         // from recovered panics above).
90         if err, ok := (*v).(error); ok {
91                 errorFormatter.Log(ctx, err)
92                 *v = errorFormatter.Format(err)
93         }
94 }
95
96 func jsonHandler(f interface{}) http.Handler {
97     h, err := httpjson.Handler(f, errorFormatter.Write)
98         if err != nil {
99                 panic(err)
100         }
101         return h
102 }
103
104 func alwaysError(err error) http.Handler {
105         return jsonHandler(func() error { return err })
106 }
107
108 func (bcr *BlockchainReactor) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
109     bcr.handler.ServeHTTP(rw, req)
110 }
111
112 func (bcr *BlockchainReactor) info(ctx context.Context) (map[string]interface{}, error) {
113     //if a.config == nil {
114                 // never configured
115         log.Printf(ctx, "-------info-----")
116         return map[string]interface{}{
117                 "is_configured": false,
118                 "version":       "0.001",
119                 "build_commit":  "----",
120                 "build_date":    "------",
121                 "build_config":  "---------",
122         }, nil
123         //}
124 }
125
126 func (bcr *BlockchainReactor) createblockkey(ctx context.Context) {
127         log.Printf(ctx,"creat-block-key")
128 }
129
130 func webAssetsHandler(next http.Handler) http.Handler {
131         mux := http.NewServeMux()
132         mux.Handle("/dashboard/", http.StripPrefix("/dashboard/", static.Handler{
133                 Assets:  dashboard.Files,
134                 Default: "index.html",
135         }))
136         mux.Handle("/", next)
137         return mux
138 }
139
140 func maxBytes(h http.Handler) http.Handler {
141     const maxReqSize = 1e7 // 10MB
142         return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
143                 // A block can easily be bigger than maxReqSize, but everything
144                 // else should be pretty small.
145                 if req.URL.Path != crosscoreRPCPrefix+"signer/sign-block" {
146                         req.Body = http.MaxBytesReader(w, req.Body, maxReqSize)
147                 }
148                 h.ServeHTTP(w, req)
149         })
150 }
151
152 func (bcr *BlockchainReactor) BuildHander() {
153         m := bcr.mux
154         m.Handle("/create-account", jsonHandler(bcr.createAccount))
155         m.Handle("/create-asset", jsonHandler(bcr.createAsset))
156         m.Handle("/update-account-tags",jsonHandler(bcr.updateAccountTags))
157         m.Handle("/update-asset-tags",jsonHandler(bcr.updateAssetTags))
158         m.Handle("/build-transaction", jsonHandler(bcr.build))
159         m.Handle("/create-control-program",jsonHandler(bcr.createControlProgram))
160         m.Handle("/create-account-receiver", jsonHandler(bcr.createAccountReceiver))
161         m.Handle("/create-transaction-feed", jsonHandler(bcr.createTxFeed))
162         m.Handle("/get-transaction-feed", jsonHandler(bcr.getTxFeed))
163         m.Handle("/update-transaction-feed", jsonHandler(bcr.updateTxFeed))
164         m.Handle("/delete-transaction-feed", jsonHandler(bcr.deleteTxFeed))
165         m.Handle("/list-accounts", jsonHandler(bcr.listAccounts))
166         m.Handle("/list-assets", jsonHandler(bcr.listAssets))
167         m.Handle("/list-transaction-feeds", jsonHandler(bcr.listTxFeeds))
168         m.Handle("/list-transactions", jsonHandler(bcr.listTransactions))
169         m.Handle("/list-balances", jsonHandler(bcr.listBalances))
170         m.Handle("/list-unspent-outputs", jsonHandler(bcr.listUnspentOutputs))
171         m.Handle("/", alwaysError(errors.New("not Found")))
172         m.Handle("/info", jsonHandler(bcr.info))
173         m.Handle("/create-block-key", jsonHandler(bcr.createblockkey))
174
175     latencyHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
176                 if l := latency(m, req); l != nil {
177                         defer l.RecordSince(time.Now())
178                 }
179                 m.ServeHTTP(w, req)
180                 })
181         handler := maxBytes(latencyHandler) // TODO(tessr): consider moving this to non-core specific mux
182         handler = webAssetsHandler(handler)
183 /*      handler = healthHandler(handler)
184         for _, l := range a.requestLimits {
185                 handler = limit.Handler(handler, alwaysError(errRateLimited), l.perSecond, l.burst, l.key)
186         }
187         handler = gzip.Handler{Handler: handler}
188         handler = coreCounter(handler)
189         handler = timeoutContextHandler(handler)
190         if a.config != nil && a.config.BlockchainId != nil {
191                 handler = blockchainIDHandler(handler, a.config.BlockchainId.String())
192         }
193         */
194         bcr.handler = handler
195 }
196
197 // Used as a request object for api queries
198 type requestQuery struct {
199         Filter       string        `json:"filter,omitempty"`
200         FilterParams []interface{} `json:"filter_params,omitempty"`
201         SumBy        []string      `json:"sum_by,omitempty"`
202         PageSize     int           `json:"page_size"`
203
204         // AscLongPoll and Timeout are used by /list-transactions
205         // to facilitate notifications.
206         AscLongPoll bool          `json:"ascending_with_long_poll,omitempty"`
207         Timeout     json.Duration `json:"timeout"`
208
209         // After is a completely opaque cursor, indicating that only
210         // items in the result set after the one identified by `After`
211         // should be included. It has no relationship to time.
212         After string `json:"after"`
213
214         // These two are used for time-range queries like /list-transactions
215         StartTimeMS uint64 `json:"start_time,omitempty"`
216         EndTimeMS   uint64 `json:"end_time,omitempty"`
217
218         // This is used for point-in-time queries like /list-balances
219         // TODO(bobg): Different request structs for endpoints with different needs
220         TimestampMS uint64 `json:"timestamp,omitempty"`
221
222         // This is used for filtering results from /list-access-tokens
223         // Value must be "client" or "network"
224         Type string `json:"type"`
225
226         // Aliases is used to filter results from /mockshm/list-keys
227         Aliases []string `json:"aliases,omitempty"`
228 }
229
230 // Used as a response object for api queries
231 type page struct {
232         Items    interface{}  `json:"items"`
233         Next     requestQuery `json:"next"`
234         LastPage bool         `json:"last_page"`
235 }
236
237 func NewBlockchainReactor(store *txdb.Store, chain *protocol.Chain, accounts *account.Manager, assets *asset.Registry, fastSync bool) *BlockchainReactor {
238     requestsCh    := make(chan BlockRequest, defaultChannelCapacity)
239     timeoutsCh    := make(chan string, defaultChannelCapacity)
240     pool := NewBlockPool(
241         store.Height()+1,
242         requestsCh,
243         timeoutsCh,
244     )
245     bcR := &BlockchainReactor {
246         chain:         chain,
247         store:         store,
248                 accounts:      accounts,
249                 assets:            assets,
250         pool:          pool,
251                 mux:           http.NewServeMux(),
252         fastSync:      fastSync,
253         requestsCh:    requestsCh,
254         timeoutsCh:   timeoutsCh,
255     }
256     bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
257     return bcR
258 }
259
260 // OnStart implements BaseService
261 func (bcR *BlockchainReactor) OnStart() error {
262         bcR.BaseReactor.OnStart()
263         bcR.BuildHander()
264     if bcR.fastSync {
265         _, err := bcR.pool.Start()
266         if err != nil {
267             return err
268         }
269         go bcR.poolRoutine()
270     }
271         return nil
272 }
273
274 // OnStop implements BaseService
275 func (bcR *BlockchainReactor) OnStop() {
276         bcR.BaseReactor.OnStop()
277 }
278
279 // GetChannels implements Reactor
280 func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
281         return []*p2p.ChannelDescriptor{
282                 &p2p.ChannelDescriptor{
283                         ID:                BlockchainChannel,
284                         Priority:          5,
285                         SendQueueCapacity: 100,
286                 },
287         }
288 }
289
290 // AddPeer implements Reactor by sending our state to peer.
291 func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) {
292         if !peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}}) {
293                 // doing nothing, will try later in `poolRoutine`
294         }
295 }
296
297 // RemovePeer implements Reactor by removing peer from the pool.
298 func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
299         bcR.pool.RemovePeer(peer.Key)
300 }
301
302 // Receive implements Reactor by handling 4 types of messages (look below).
303 func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
304         _, msg, err := DecodeMessage(msgBytes)
305         if err != nil {
306                 bcR.Logger.Error("Error decoding message", "error", err)
307                 return
308         }
309
310         bcR.Logger.Info("Receive", "src", src, "chID", chID, "msg", msg)
311
312         switch msg := msg.(type) {
313         case *bcBlockRequestMessage:
314                 // Got a request for a block. Respond with block if we have it.
315                 block, _:= bcR.store.GetBlock(msg.Height)
316                 if block != nil {
317                         msg := &bcBlockResponseMessage{Block: block}
318                         queued := src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
319                         if !queued {
320                                 // queue is full, just ignore.
321                         }
322                 } else {
323                         // TODO peer is asking for things we don't have.
324                 }
325         case *bcBlockResponseMessage:
326                 // Got a block.
327                 bcR.pool.AddBlock(src.Key, msg.Block, len(msgBytes))
328         case *bcStatusRequestMessage:
329                 // Send peer our state.
330                 queued := src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.store.Height()}})
331                 if !queued {
332                         // sorry
333                 }
334         case *bcStatusResponseMessage:
335                 // Got a peer status. Unverified.
336                 bcR.pool.SetPeerHeight(src.Key, msg.Height)
337         default:
338                 bcR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
339         }
340 }
341
342
343 // Handle messages from the poolReactor telling the reactor what to do.
344 // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
345 // (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
346 func (bcR *BlockchainReactor) poolRoutine() {
347
348         trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
349         statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
350         //switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
351
352 FOR_LOOP:
353         for {
354                 select {
355                 case request := <-bcR.requestsCh: // chan BlockRequest
356                         peer := bcR.Switch.Peers().Get(request.PeerID)
357                         if peer == nil {
358                                 continue FOR_LOOP // Peer has since been disconnected.
359                         }
360                         msg := &bcBlockRequestMessage{request.Height}
361                         queued := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
362                         if !queued {
363                                 // We couldn't make the request, send-queue full.
364                                 // The pool handles timeouts, just let it go.
365                                 continue FOR_LOOP
366                         }
367                 case peerID := <-bcR.timeoutsCh: // chan string
368                         // Peer timed out.
369                         peer := bcR.Switch.Peers().Get(peerID)
370                         if peer != nil {
371                                 bcR.Switch.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
372                         }
373                 case _ = <-statusUpdateTicker.C:
374                         // ask for status updates
375                         go bcR.BroadcastStatusRequest()
376                 /*case _ = <-switchToConsensusTicker.C:
377                         height, numPending, _ := bcR.pool.GetStatus()
378                         outbound, inbound, _ := bcR.Switch.NumPeers()
379                         bcR.Logger.Info("Consensus ticker", "numPending", numPending, "total", len(bcR.pool.requesters),
380                                 "outbound", outbound, "inbound", inbound)
381                         if bcR.pool.IsCaughtUp() {
382                                 bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
383                                 bcR.pool.Stop()
384
385                                 conR := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
386                                 conR.SwitchToConsensus(bcR.state)
387
388                                 break FOR_LOOP
389                         }*/
390                 case _ = <-trySyncTicker.C: // chan time
391                         // This loop can be slow as long as it's doing syncing work.
392                 SYNC_LOOP:
393                         for i := 0; i < 10; i++ {
394                                 // See if there are any blocks to sync.
395                                 first, second := bcR.pool.PeekTwoBlocks()
396                                 bcR.Logger.Info("TrySync peeked", "first", first, "second", second)
397                                 if first == nil || second == nil {
398                                         // We need both to sync the first block.
399                                         break SYNC_LOOP
400                                 }
401                             bcR.pool.PopRequest()
402                 bcR.store.SaveBlock(first)
403                         }
404                         continue FOR_LOOP
405                 case <-bcR.Quit:
406                         break FOR_LOOP
407                 }
408         }
409 }
410
411 // BroadcastStatusRequest broadcasts `BlockStore` height.
412 func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
413         bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusRequestMessage{bcR.store.Height()}})
414         return nil
415 }
416
417
418 /*
419 // SetEventSwitch implements events.Eventable
420 func (bcR *BlockchainReactor) SetEventSwitch(evsw types.EventSwitch) {
421         bcR.evsw = evsw
422 }
423 */
424
425 //-----------------------------------------------------------------------------
426 // Messages
427
428 const (
429         msgTypeBlockRequest   = byte(0x10)
430         msgTypeBlockResponse  = byte(0x11)
431         msgTypeStatusResponse = byte(0x20)
432         msgTypeStatusRequest  = byte(0x21)
433 )
434
435 // BlockchainMessage is a generic message for this reactor.
436 type BlockchainMessage interface{}
437
438 var _ = wire.RegisterInterface(
439         struct{ BlockchainMessage }{},
440         wire.ConcreteType{&bcBlockRequestMessage{}, msgTypeBlockRequest},
441         wire.ConcreteType{&bcBlockResponseMessage{}, msgTypeBlockResponse},
442         wire.ConcreteType{&bcStatusResponseMessage{}, msgTypeStatusResponse},
443         wire.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest},
444 )
445
446 // DecodeMessage decodes BlockchainMessage.
447 // TODO: ensure that bz is completely read.
448 func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
449         msgType = bz[0]
450         n := int(0)
451         r := bytes.NewReader(bz)
452         msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, maxBlockchainResponseSize, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage
453         if err != nil && n != len(bz) {
454                 err = errors.New("DecodeMessage() had bytes left over")
455         }
456         return
457 }
458
459 //-----------------------------------
460
461 type bcBlockRequestMessage struct {
462         Height uint64
463 }
464
465 func (m *bcBlockRequestMessage) String() string {
466         return cmn.Fmt("[bcBlockRequestMessage %v]", m.Height)
467 }
468
469 //-------------------------------------
470
471 // NOTE: keep up-to-date with maxBlockchainResponseSize
472 type bcBlockResponseMessage struct {
473         Block *legacy.Block
474 }
475
476 func (m *bcBlockResponseMessage) String() string {
477         return cmn.Fmt("[bcBlockResponseMessage %v]", m.Block.Height)
478 }
479
480 //-------------------------------------
481
482 type bcStatusRequestMessage struct {
483         Height uint64
484 }
485
486 func (m *bcStatusRequestMessage) String() string {
487         return cmn.Fmt("[bcStatusRequestMessage %v]", m.Height)
488 }
489
490 //-------------------------------------
491
492 type bcStatusResponseMessage struct {
493         Height uint64
494 }
495
496 func (m *bcStatusResponseMessage) String() string {
497         return cmn.Fmt("[bcStatusResponseMessage %v]", m.Height)
498 }