X-Git-Url: http://git.osdn.net/view?a=blobdiff_plain;f=blockchain%2Freactor.go;h=ab346c4b1c38eb34e1cc304bcf264838ea6fbb4b;hb=01180396e6e36d20c8a666286c985637e6186bfd;hp=6ce6723bd0e565c1723ac85ad8cd86121cacfdec;hpb=6bb0c9be56ba66ce925dd2587f4f535f26566732;p=bytom%2Fbytom.git diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 6ce6723b..ab346c4b 100755 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -2,108 +2,53 @@ package blockchain import ( "context" - "fmt" - "net/http" "reflect" "time" log "github.com/sirupsen/logrus" cmn "github.com/tendermint/tmlibs/common" - "github.com/bytom/blockchain/accesstoken" - "github.com/bytom/blockchain/account" - "github.com/bytom/blockchain/asset" - "github.com/bytom/blockchain/pseudohsm" "github.com/bytom/blockchain/txfeed" - "github.com/bytom/blockchain/wallet" - "github.com/bytom/encoding/json" - "github.com/bytom/errors" "github.com/bytom/mining/cpuminer" + "github.com/bytom/mining/miningpool" "github.com/bytom/p2p" + "github.com/bytom/p2p/trust" "github.com/bytom/protocol" - "github.com/bytom/protocol/bc/legacy" + "github.com/bytom/protocol/bc" + protocolTypes "github.com/bytom/protocol/bc/types" "github.com/bytom/types" + "github.com/bytom/wallet" ) const ( // BlockchainChannel is a channel for blocks and status updates BlockchainChannel = byte(0x40) + maxNewBlockChSize = int(1024) - defaultChannelCapacity = 100 - trySyncIntervalMS = 100 statusUpdateIntervalSeconds = 10 maxBlockchainResponseSize = 22020096 + 2 crosscoreRPCPrefix = "/rpc/" ) -const ( - // SUCCESS indicates the rpc calling is successful. - SUCCESS = "success" - // FAIL indicated the rpc calling is failed. - FAIL = "fail" -) - -// Response describes the response standard. -type Response struct { - Status string `json:"status,omitempty"` - Msg string `json:"msg,omitempty"` - Data interface{} `json:"data,omitempty"` -} - -//NewSuccessResponse success response -func NewSuccessResponse(data interface{}) Response { - return Response{Status: SUCCESS, Data: data} -} - -//NewErrorResponse error response -func NewErrorResponse(err error) Response { - return Response{Status: FAIL, Msg: err.Error()} -} - -//BlockchainReactor handles long-term catchup syncing. +// BlockchainReactor handles long-term catchup syncing. type BlockchainReactor struct { p2p.BaseReactor chain *protocol.Chain wallet *wallet.Wallet - accounts *account.Manager - assets *asset.Registry - accessTokens *accesstoken.CredentialStore - txFeedTracker *txfeed.Tracker + TxFeedTracker *txfeed.Tracker // TODO: move it out from BlockchainReactor blockKeeper *blockKeeper txPool *protocol.TxPool - hsm *pseudohsm.HSM mining *cpuminer.CPUMiner - mux *http.ServeMux + miningPool *miningpool.MiningPool sw *p2p.Switch - handler http.Handler evsw types.EventSwitch + newBlockCh chan *bc.Hash miningEnable bool } -func batchRecover(ctx context.Context, v *interface{}) { - if r := recover(); r != nil { - var err error - if recoveredErr, ok := r.(error); ok { - err = recoveredErr - } else { - err = fmt.Errorf("panic with %T", r) - } - err = errors.Wrap(err) - *v = err - } - - if *v == nil { - return - } - // Convert errors into error responses (including errors - // from recovered panics above). - if err, ok := (*v).(error); ok { - *v = errorFormatter.Format(err) - } -} - -func (bcr *BlockchainReactor) info(ctx context.Context) (map[string]interface{}, error) { +// Info return the server information +func (bcr *BlockchainReactor) Info(ctx context.Context) (map[string]interface{}, error) { return map[string]interface{}{ "is_configured": false, "version": "0.001", @@ -113,77 +58,28 @@ func (bcr *BlockchainReactor) info(ctx context.Context) (map[string]interface{}, }, nil } -func maxBytes(h http.Handler) http.Handler { - const maxReqSize = 1e7 // 10MB - return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - // A block can easily be bigger than maxReqSize, but everything - // else should be pretty small. - if req.URL.Path != crosscoreRPCPrefix+"signer/sign-block" { - req.Body = http.MaxBytesReader(w, req.Body, maxReqSize) - } - h.ServeHTTP(w, req) - }) -} - -// Used as a request object for api queries -type requestQuery struct { - Filter string `json:"filter,omitempty"` - FilterParams []interface{} `json:"filter_params,omitempty"` - SumBy []string `json:"sum_by,omitempty"` - PageSize int `json:"page_size"` - - // AscLongPoll and Timeout are used by /list-transactions - // to facilitate notifications. - AscLongPoll bool `json:"ascending_with_long_poll,omitempty"` - Timeout json.Duration `json:"timeout"` - - // After is a completely opaque cursor, indicating that only - // items in the result set after the one identified by `After` - // should be included. It has no relationship to time. - After string `json:"after"` - - // These two are used for time-range queries like /list-transactions - StartTimeMS uint64 `json:"start_time,omitempty"` - EndTimeMS uint64 `json:"end_time,omitempty"` - - // This is used for point-in-time queries like /list-balances - // TODO(bobg): Different request structs for endpoints with different needs - TimestampMS uint64 `json:"timestamp,omitempty"` - - // This is used for filtering results from /list-access-tokens - // Value must be "client" or "network" - Type string `json:"type"` - - // Aliases is used to filter results from /mockshm/list-keys - Aliases []string `json:"aliases,omitempty"` -} - -// Used as a response object for api queries -type page struct { - Items interface{} `json:"items"` - Next requestQuery `json:"next"` - LastPage bool `json:"last_page"` - After string `json:"after"` -} - // NewBlockchainReactor returns the reactor of whole blockchain. -func NewBlockchainReactor(chain *protocol.Chain, txPool *protocol.TxPool, accounts *account.Manager, assets *asset.Registry, sw *p2p.Switch, hsm *pseudohsm.HSM, wallet *wallet.Wallet, txfeeds *txfeed.Tracker, accessTokens *accesstoken.CredentialStore, miningEnable bool) *BlockchainReactor { - mining := cpuminer.NewCPUMiner(chain, accounts, txPool) +func NewBlockchainReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, wallet *wallet.Wallet, txfeeds *txfeed.Tracker, miningEnable bool) *BlockchainReactor { + newBlockCh := make(chan *bc.Hash, maxNewBlockChSize) bcr := &BlockchainReactor{ chain: chain, wallet: wallet, - accounts: accounts, - assets: assets, blockKeeper: newBlockKeeper(chain, sw), txPool: txPool, - mining: mining, - mux: http.NewServeMux(), sw: sw, - hsm: hsm, - txFeedTracker: txfeeds, - accessTokens: accessTokens, + TxFeedTracker: txfeeds, miningEnable: miningEnable, + newBlockCh: newBlockCh, } + + if wallet == nil { + bcr.mining = cpuminer.NewCPUMiner(chain, nil, txPool, newBlockCh) + bcr.miningPool = miningpool.NewMiningPool(chain, nil, txPool, newBlockCh) + } else { + bcr.mining = cpuminer.NewCPUMiner(chain, wallet.AccountMgr, txPool, newBlockCh) + bcr.miningPool = miningpool.NewMiningPool(chain, wallet.AccountMgr, txPool, newBlockCh) + } + bcr.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcr) return bcr } @@ -191,7 +87,6 @@ func NewBlockchainReactor(chain *protocol.Chain, txPool *protocol.TxPool, accoun // OnStart implements BaseService func (bcr *BlockchainReactor) OnStart() error { bcr.BaseReactor.OnStart() - bcr.BuildHandler() if bcr.miningEnable { bcr.mining.Start() @@ -212,7 +107,7 @@ func (bcr *BlockchainReactor) OnStop() { // GetChannels implements Reactor func (bcr *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { return []*p2p.ChannelDescriptor{ - &p2p.ChannelDescriptor{ + { ID: BlockchainChannel, Priority: 5, SendQueueCapacity: 100, @@ -232,6 +127,13 @@ func (bcr *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { // Receive implements Reactor by handling 4 types of messages (look below). func (bcr *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { + var tm *trust.TrustMetric + key := src.Connection().RemoteAddress.IP.String() + if tm = bcr.sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil { + log.Errorf("Can't get peer trust metric") + return + } + _, msg, err := DecodeMessage(msgBytes) if err != nil { log.Errorf("Error decoding messagek %v", err) @@ -241,7 +143,7 @@ func (bcr *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) switch msg := msg.(type) { case *BlockRequestMessage: - var block *legacy.Block + var block *protocolTypes.Block var err error if msg.Height != 0 { block, err = bcr.chain.GetBlockByHeight(msg.Height) @@ -252,7 +154,6 @@ func (bcr *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) log.Errorf("Fail on BlockRequestMessage get block: %v", err) return } - response, err := NewBlockResponseMessage(block) if err != nil { log.Errorf("Fail on BlockRequestMessage create resoinse: %v", err) @@ -261,7 +162,7 @@ func (bcr *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response}) case *BlockResponseMessage: - bcr.blockKeeper.AddBlock(msg.GetBlock(), src.Key) + bcr.blockKeeper.AddBlock(msg.GetBlock(), src) case *StatusRequestMessage: block := bcr.chain.BestBlock() @@ -273,7 +174,7 @@ func (bcr *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) case *TransactionNotifyMessage: tx := msg.GetTransaction() if err := bcr.chain.ValidateTx(tx); err != nil { - log.Errorf("TransactionNotifyMessage: %v", err) + bcr.sw.AddScamPeer(src) } default: @@ -290,8 +191,14 @@ func (bcr *BlockchainReactor) syncRoutine() { for { select { + case blockHash := <-bcr.newBlockCh: + block, err := bcr.chain.GetBlockByHash(blockHash) + if err != nil { + log.Errorf("Error get block from newBlockCh %v", err) + } + log.WithFields(log.Fields{"Hash": blockHash, "height": block.Height}).Info("Boardcast my new block") case newTx := <-newTxCh: - bcr.txFeedTracker.TxFilter(newTx) + bcr.TxFeedTracker.TxFilter(newTx) go bcr.BroadcastTransaction(newTx) case _ = <-statusUpdateTicker.C: go bcr.BroadcastStatusResponse() @@ -317,7 +224,7 @@ func (bcr *BlockchainReactor) BroadcastStatusResponse() { } // BroadcastTransaction broadcats `BlockStore` transaction. -func (bcr *BlockchainReactor) BroadcastTransaction(tx *legacy.Tx) error { +func (bcr *BlockchainReactor) BroadcastTransaction(tx *protocolTypes.Tx) error { msg, err := NewTransactionNotifyMessage(tx) if err != nil { return err