OSDN Git Service

update master (#487)
[bytom/bytom.git] / blockchain / reactor.go
index 6ce6723..ab346c4 100755 (executable)
@@ -2,108 +2,53 @@ package blockchain
 
 import (
        "context"
-       "fmt"
-       "net/http"
        "reflect"
        "time"
 
        log "github.com/sirupsen/logrus"
        cmn "github.com/tendermint/tmlibs/common"
 
-       "github.com/bytom/blockchain/accesstoken"
-       "github.com/bytom/blockchain/account"
-       "github.com/bytom/blockchain/asset"
-       "github.com/bytom/blockchain/pseudohsm"
        "github.com/bytom/blockchain/txfeed"
-       "github.com/bytom/blockchain/wallet"
-       "github.com/bytom/encoding/json"
-       "github.com/bytom/errors"
        "github.com/bytom/mining/cpuminer"
+       "github.com/bytom/mining/miningpool"
        "github.com/bytom/p2p"
+       "github.com/bytom/p2p/trust"
        "github.com/bytom/protocol"
-       "github.com/bytom/protocol/bc/legacy"
+       "github.com/bytom/protocol/bc"
+       protocolTypes "github.com/bytom/protocol/bc/types"
        "github.com/bytom/types"
+       "github.com/bytom/wallet"
 )
 
 const (
        // BlockchainChannel is a channel for blocks and status updates
        BlockchainChannel = byte(0x40)
+       maxNewBlockChSize = int(1024)
 
-       defaultChannelCapacity      = 100
-       trySyncIntervalMS           = 100
        statusUpdateIntervalSeconds = 10
        maxBlockchainResponseSize   = 22020096 + 2
        crosscoreRPCPrefix          = "/rpc/"
 )
 
-const (
-       // SUCCESS indicates the rpc calling is successful.
-       SUCCESS = "success"
-       // FAIL indicated the rpc calling is failed.
-       FAIL = "fail"
-)
-
-// Response describes the response standard.
-type Response struct {
-       Status string      `json:"status,omitempty"`
-       Msg    string      `json:"msg,omitempty"`
-       Data   interface{} `json:"data,omitempty"`
-}
-
-//NewSuccessResponse success response
-func NewSuccessResponse(data interface{}) Response {
-       return Response{Status: SUCCESS, Data: data}
-}
-
-//NewErrorResponse error response
-func NewErrorResponse(err error) Response {
-       return Response{Status: FAIL, Msg: err.Error()}
-}
-
-//BlockchainReactor handles long-term catchup syncing.
+// BlockchainReactor handles long-term catchup syncing.
 type BlockchainReactor struct {
        p2p.BaseReactor
 
        chain         *protocol.Chain
        wallet        *wallet.Wallet
-       accounts      *account.Manager
-       assets        *asset.Registry
-       accessTokens  *accesstoken.CredentialStore
-       txFeedTracker *txfeed.Tracker
+       TxFeedTracker *txfeed.Tracker // TODO: move it out from BlockchainReactor
        blockKeeper   *blockKeeper
        txPool        *protocol.TxPool
-       hsm           *pseudohsm.HSM
        mining        *cpuminer.CPUMiner
-       mux           *http.ServeMux
+       miningPool    *miningpool.MiningPool
        sw            *p2p.Switch
-       handler       http.Handler
        evsw          types.EventSwitch
+       newBlockCh    chan *bc.Hash
        miningEnable  bool
 }
 
-func batchRecover(ctx context.Context, v *interface{}) {
-       if r := recover(); r != nil {
-               var err error
-               if recoveredErr, ok := r.(error); ok {
-                       err = recoveredErr
-               } else {
-                       err = fmt.Errorf("panic with %T", r)
-               }
-               err = errors.Wrap(err)
-               *v = err
-       }
-
-       if *v == nil {
-               return
-       }
-       // Convert errors into error responses (including errors
-       // from recovered panics above).
-       if err, ok := (*v).(error); ok {
-               *v = errorFormatter.Format(err)
-       }
-}
-
-func (bcr *BlockchainReactor) info(ctx context.Context) (map[string]interface{}, error) {
+// Info return the server information
+func (bcr *BlockchainReactor) Info(ctx context.Context) (map[string]interface{}, error) {
        return map[string]interface{}{
                "is_configured": false,
                "version":       "0.001",
@@ -113,77 +58,28 @@ func (bcr *BlockchainReactor) info(ctx context.Context) (map[string]interface{},
        }, nil
 }
 
-func maxBytes(h http.Handler) http.Handler {
-       const maxReqSize = 1e7 // 10MB
-       return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
-               // A block can easily be bigger than maxReqSize, but everything
-               // else should be pretty small.
-               if req.URL.Path != crosscoreRPCPrefix+"signer/sign-block" {
-                       req.Body = http.MaxBytesReader(w, req.Body, maxReqSize)
-               }
-               h.ServeHTTP(w, req)
-       })
-}
-
-// Used as a request object for api queries
-type requestQuery struct {
-       Filter       string        `json:"filter,omitempty"`
-       FilterParams []interface{} `json:"filter_params,omitempty"`
-       SumBy        []string      `json:"sum_by,omitempty"`
-       PageSize     int           `json:"page_size"`
-
-       // AscLongPoll and Timeout are used by /list-transactions
-       // to facilitate notifications.
-       AscLongPoll bool          `json:"ascending_with_long_poll,omitempty"`
-       Timeout     json.Duration `json:"timeout"`
-
-       // After is a completely opaque cursor, indicating that only
-       // items in the result set after the one identified by `After`
-       // should be included. It has no relationship to time.
-       After string `json:"after"`
-
-       // These two are used for time-range queries like /list-transactions
-       StartTimeMS uint64 `json:"start_time,omitempty"`
-       EndTimeMS   uint64 `json:"end_time,omitempty"`
-
-       // This is used for point-in-time queries like /list-balances
-       // TODO(bobg): Different request structs for endpoints with different needs
-       TimestampMS uint64 `json:"timestamp,omitempty"`
-
-       // This is used for filtering results from /list-access-tokens
-       // Value must be "client" or "network"
-       Type string `json:"type"`
-
-       // Aliases is used to filter results from /mockshm/list-keys
-       Aliases []string `json:"aliases,omitempty"`
-}
-
-// Used as a response object for api queries
-type page struct {
-       Items    interface{}  `json:"items"`
-       Next     requestQuery `json:"next"`
-       LastPage bool         `json:"last_page"`
-       After    string       `json:"after"`
-}
-
 // NewBlockchainReactor returns the reactor of whole blockchain.
-func NewBlockchainReactor(chain *protocol.Chain, txPool *protocol.TxPool, accounts *account.Manager, assets *asset.Registry, sw *p2p.Switch, hsm *pseudohsm.HSM, wallet *wallet.Wallet, txfeeds *txfeed.Tracker, accessTokens *accesstoken.CredentialStore, miningEnable bool) *BlockchainReactor {
-       mining := cpuminer.NewCPUMiner(chain, accounts, txPool)
+func NewBlockchainReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, wallet *wallet.Wallet, txfeeds *txfeed.Tracker, miningEnable bool) *BlockchainReactor {
+       newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
        bcr := &BlockchainReactor{
                chain:         chain,
                wallet:        wallet,
-               accounts:      accounts,
-               assets:        assets,
                blockKeeper:   newBlockKeeper(chain, sw),
                txPool:        txPool,
-               mining:        mining,
-               mux:           http.NewServeMux(),
                sw:            sw,
-               hsm:           hsm,
-               txFeedTracker: txfeeds,
-               accessTokens:  accessTokens,
+               TxFeedTracker: txfeeds,
                miningEnable:  miningEnable,
+               newBlockCh:    newBlockCh,
        }
+
+       if wallet == nil {
+               bcr.mining = cpuminer.NewCPUMiner(chain, nil, txPool, newBlockCh)
+               bcr.miningPool = miningpool.NewMiningPool(chain, nil, txPool, newBlockCh)
+       } else {
+               bcr.mining = cpuminer.NewCPUMiner(chain, wallet.AccountMgr, txPool, newBlockCh)
+               bcr.miningPool = miningpool.NewMiningPool(chain, wallet.AccountMgr, txPool, newBlockCh)
+       }
+
        bcr.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcr)
        return bcr
 }
@@ -191,7 +87,6 @@ func NewBlockchainReactor(chain *protocol.Chain, txPool *protocol.TxPool, accoun
 // OnStart implements BaseService
 func (bcr *BlockchainReactor) OnStart() error {
        bcr.BaseReactor.OnStart()
-       bcr.BuildHandler()
 
        if bcr.miningEnable {
                bcr.mining.Start()
@@ -212,7 +107,7 @@ func (bcr *BlockchainReactor) OnStop() {
 // GetChannels implements Reactor
 func (bcr *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
        return []*p2p.ChannelDescriptor{
-               &p2p.ChannelDescriptor{
+               {
                        ID:                BlockchainChannel,
                        Priority:          5,
                        SendQueueCapacity: 100,
@@ -232,6 +127,13 @@ func (bcr *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
 
 // Receive implements Reactor by handling 4 types of messages (look below).
 func (bcr *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
+       var tm *trust.TrustMetric
+       key := src.Connection().RemoteAddress.IP.String()
+       if tm = bcr.sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil {
+               log.Errorf("Can't get peer trust metric")
+               return
+       }
+
        _, msg, err := DecodeMessage(msgBytes)
        if err != nil {
                log.Errorf("Error decoding messagek %v", err)
@@ -241,7 +143,7 @@ func (bcr *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
 
        switch msg := msg.(type) {
        case *BlockRequestMessage:
-               var block *legacy.Block
+               var block *protocolTypes.Block
                var err error
                if msg.Height != 0 {
                        block, err = bcr.chain.GetBlockByHeight(msg.Height)
@@ -252,7 +154,6 @@ func (bcr *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
                        log.Errorf("Fail on BlockRequestMessage get block: %v", err)
                        return
                }
-
                response, err := NewBlockResponseMessage(block)
                if err != nil {
                        log.Errorf("Fail on BlockRequestMessage create resoinse: %v", err)
@@ -261,7 +162,7 @@ func (bcr *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
                src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
 
        case *BlockResponseMessage:
-               bcr.blockKeeper.AddBlock(msg.GetBlock(), src.Key)
+               bcr.blockKeeper.AddBlock(msg.GetBlock(), src)
 
        case *StatusRequestMessage:
                block := bcr.chain.BestBlock()
@@ -273,7 +174,7 @@ func (bcr *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
        case *TransactionNotifyMessage:
                tx := msg.GetTransaction()
                if err := bcr.chain.ValidateTx(tx); err != nil {
-                       log.Errorf("TransactionNotifyMessage: %v", err)
+                       bcr.sw.AddScamPeer(src)
                }
 
        default:
@@ -290,8 +191,14 @@ func (bcr *BlockchainReactor) syncRoutine() {
 
        for {
                select {
+               case blockHash := <-bcr.newBlockCh:
+                       block, err := bcr.chain.GetBlockByHash(blockHash)
+                       if err != nil {
+                               log.Errorf("Error get block from newBlockCh %v", err)
+                       }
+                       log.WithFields(log.Fields{"Hash": blockHash, "height": block.Height}).Info("Boardcast my new block")
                case newTx := <-newTxCh:
-                       bcr.txFeedTracker.TxFilter(newTx)
+                       bcr.TxFeedTracker.TxFilter(newTx)
                        go bcr.BroadcastTransaction(newTx)
                case _ = <-statusUpdateTicker.C:
                        go bcr.BroadcastStatusResponse()
@@ -317,7 +224,7 @@ func (bcr *BlockchainReactor) BroadcastStatusResponse() {
 }
 
 // BroadcastTransaction broadcats `BlockStore` transaction.
-func (bcr *BlockchainReactor) BroadcastTransaction(tx *legacy.Tx) error {
+func (bcr *BlockchainReactor) BroadcastTransaction(tx *protocolTypes.Tx) error {
        msg, err := NewTransactionNotifyMessage(tx)
        if err != nil {
                return err