import (
"context"
- "fmt"
- "net/http"
"reflect"
"time"
log "github.com/sirupsen/logrus"
cmn "github.com/tendermint/tmlibs/common"
- "github.com/bytom/blockchain/accesstoken"
- "github.com/bytom/blockchain/account"
- "github.com/bytom/blockchain/asset"
- "github.com/bytom/blockchain/pseudohsm"
"github.com/bytom/blockchain/txfeed"
- "github.com/bytom/blockchain/wallet"
- "github.com/bytom/encoding/json"
- "github.com/bytom/errors"
"github.com/bytom/mining/cpuminer"
+ "github.com/bytom/mining/miningpool"
"github.com/bytom/p2p"
+ "github.com/bytom/p2p/trust"
"github.com/bytom/protocol"
- "github.com/bytom/protocol/bc/legacy"
+ "github.com/bytom/protocol/bc"
+ protocolTypes "github.com/bytom/protocol/bc/types"
"github.com/bytom/types"
+ "github.com/bytom/wallet"
)
const (
// BlockchainChannel is a channel for blocks and status updates
BlockchainChannel = byte(0x40)
+ maxNewBlockChSize = int(1024)
- defaultChannelCapacity = 100
- trySyncIntervalMS = 100
statusUpdateIntervalSeconds = 10
maxBlockchainResponseSize = 22020096 + 2
crosscoreRPCPrefix = "/rpc/"
)
-const (
- // SUCCESS indicates the rpc calling is successful.
- SUCCESS = "success"
- // FAIL indicated the rpc calling is failed.
- FAIL = "fail"
-)
-
-// Response describes the response standard.
-type Response struct {
- Status string `json:"status,omitempty"`
- Msg string `json:"msg,omitempty"`
- Data interface{} `json:"data,omitempty"`
-}
-
-//NewSuccessResponse success response
-func NewSuccessResponse(data interface{}) Response {
- return Response{Status: SUCCESS, Data: data}
-}
-
-//NewErrorResponse error response
-func NewErrorResponse(err error) Response {
- return Response{Status: FAIL, Msg: err.Error()}
-}
-
-//BlockchainReactor handles long-term catchup syncing.
+// BlockchainReactor handles long-term catchup syncing.
type BlockchainReactor struct {
p2p.BaseReactor
chain *protocol.Chain
wallet *wallet.Wallet
- accounts *account.Manager
- assets *asset.Registry
- accessTokens *accesstoken.CredentialStore
- txFeedTracker *txfeed.Tracker
+ TxFeedTracker *txfeed.Tracker // TODO: move it out from BlockchainReactor
blockKeeper *blockKeeper
txPool *protocol.TxPool
- hsm *pseudohsm.HSM
mining *cpuminer.CPUMiner
- mux *http.ServeMux
+ miningPool *miningpool.MiningPool
sw *p2p.Switch
- handler http.Handler
evsw types.EventSwitch
+ newBlockCh chan *bc.Hash
miningEnable bool
}
-func batchRecover(ctx context.Context, v *interface{}) {
- if r := recover(); r != nil {
- var err error
- if recoveredErr, ok := r.(error); ok {
- err = recoveredErr
- } else {
- err = fmt.Errorf("panic with %T", r)
- }
- err = errors.Wrap(err)
- *v = err
- }
-
- if *v == nil {
- return
- }
- // Convert errors into error responses (including errors
- // from recovered panics above).
- if err, ok := (*v).(error); ok {
- *v = errorFormatter.Format(err)
- }
-}
-
-func (bcr *BlockchainReactor) info(ctx context.Context) (map[string]interface{}, error) {
+// Info return the server information
+func (bcr *BlockchainReactor) Info(ctx context.Context) (map[string]interface{}, error) {
return map[string]interface{}{
"is_configured": false,
"version": "0.001",
}, nil
}
-func maxBytes(h http.Handler) http.Handler {
- const maxReqSize = 1e7 // 10MB
- return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
- // A block can easily be bigger than maxReqSize, but everything
- // else should be pretty small.
- if req.URL.Path != crosscoreRPCPrefix+"signer/sign-block" {
- req.Body = http.MaxBytesReader(w, req.Body, maxReqSize)
- }
- h.ServeHTTP(w, req)
- })
-}
-
-// Used as a request object for api queries
-type requestQuery struct {
- Filter string `json:"filter,omitempty"`
- FilterParams []interface{} `json:"filter_params,omitempty"`
- SumBy []string `json:"sum_by,omitempty"`
- PageSize int `json:"page_size"`
-
- // AscLongPoll and Timeout are used by /list-transactions
- // to facilitate notifications.
- AscLongPoll bool `json:"ascending_with_long_poll,omitempty"`
- Timeout json.Duration `json:"timeout"`
-
- // After is a completely opaque cursor, indicating that only
- // items in the result set after the one identified by `After`
- // should be included. It has no relationship to time.
- After string `json:"after"`
-
- // These two are used for time-range queries like /list-transactions
- StartTimeMS uint64 `json:"start_time,omitempty"`
- EndTimeMS uint64 `json:"end_time,omitempty"`
-
- // This is used for point-in-time queries like /list-balances
- // TODO(bobg): Different request structs for endpoints with different needs
- TimestampMS uint64 `json:"timestamp,omitempty"`
-
- // This is used for filtering results from /list-access-tokens
- // Value must be "client" or "network"
- Type string `json:"type"`
-
- // Aliases is used to filter results from /mockshm/list-keys
- Aliases []string `json:"aliases,omitempty"`
-}
-
-// Used as a response object for api queries
-type page struct {
- Items interface{} `json:"items"`
- Next requestQuery `json:"next"`
- LastPage bool `json:"last_page"`
- After string `json:"after"`
-}
-
// NewBlockchainReactor returns the reactor of whole blockchain.
-func NewBlockchainReactor(chain *protocol.Chain, txPool *protocol.TxPool, accounts *account.Manager, assets *asset.Registry, sw *p2p.Switch, hsm *pseudohsm.HSM, wallet *wallet.Wallet, txfeeds *txfeed.Tracker, accessTokens *accesstoken.CredentialStore, miningEnable bool) *BlockchainReactor {
- mining := cpuminer.NewCPUMiner(chain, accounts, txPool)
+func NewBlockchainReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, wallet *wallet.Wallet, txfeeds *txfeed.Tracker, miningEnable bool) *BlockchainReactor {
+ newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
bcr := &BlockchainReactor{
chain: chain,
wallet: wallet,
- accounts: accounts,
- assets: assets,
blockKeeper: newBlockKeeper(chain, sw),
txPool: txPool,
- mining: mining,
- mux: http.NewServeMux(),
sw: sw,
- hsm: hsm,
- txFeedTracker: txfeeds,
- accessTokens: accessTokens,
+ TxFeedTracker: txfeeds,
miningEnable: miningEnable,
+ newBlockCh: newBlockCh,
}
+
+ if wallet == nil {
+ bcr.mining = cpuminer.NewCPUMiner(chain, nil, txPool, newBlockCh)
+ bcr.miningPool = miningpool.NewMiningPool(chain, nil, txPool, newBlockCh)
+ } else {
+ bcr.mining = cpuminer.NewCPUMiner(chain, wallet.AccountMgr, txPool, newBlockCh)
+ bcr.miningPool = miningpool.NewMiningPool(chain, wallet.AccountMgr, txPool, newBlockCh)
+ }
+
bcr.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcr)
return bcr
}
// OnStart implements BaseService
func (bcr *BlockchainReactor) OnStart() error {
bcr.BaseReactor.OnStart()
- bcr.BuildHandler()
if bcr.miningEnable {
bcr.mining.Start()
// GetChannels implements Reactor
func (bcr *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
return []*p2p.ChannelDescriptor{
- &p2p.ChannelDescriptor{
+ {
ID: BlockchainChannel,
Priority: 5,
SendQueueCapacity: 100,
// Receive implements Reactor by handling 4 types of messages (look below).
func (bcr *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
+ var tm *trust.TrustMetric
+ key := src.Connection().RemoteAddress.IP.String()
+ if tm = bcr.sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil {
+ log.Errorf("Can't get peer trust metric")
+ return
+ }
+
_, msg, err := DecodeMessage(msgBytes)
if err != nil {
log.Errorf("Error decoding messagek %v", err)
switch msg := msg.(type) {
case *BlockRequestMessage:
- var block *legacy.Block
+ var block *protocolTypes.Block
var err error
if msg.Height != 0 {
block, err = bcr.chain.GetBlockByHeight(msg.Height)
log.Errorf("Fail on BlockRequestMessage get block: %v", err)
return
}
-
response, err := NewBlockResponseMessage(block)
if err != nil {
log.Errorf("Fail on BlockRequestMessage create resoinse: %v", err)
src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
case *BlockResponseMessage:
- bcr.blockKeeper.AddBlock(msg.GetBlock(), src.Key)
+ bcr.blockKeeper.AddBlock(msg.GetBlock(), src)
case *StatusRequestMessage:
block := bcr.chain.BestBlock()
case *TransactionNotifyMessage:
tx := msg.GetTransaction()
if err := bcr.chain.ValidateTx(tx); err != nil {
- log.Errorf("TransactionNotifyMessage: %v", err)
+ bcr.sw.AddScamPeer(src)
}
default:
for {
select {
+ case blockHash := <-bcr.newBlockCh:
+ block, err := bcr.chain.GetBlockByHash(blockHash)
+ if err != nil {
+ log.Errorf("Error get block from newBlockCh %v", err)
+ }
+ log.WithFields(log.Fields{"Hash": blockHash, "height": block.Height}).Info("Boardcast my new block")
case newTx := <-newTxCh:
- bcr.txFeedTracker.TxFilter(newTx)
+ bcr.TxFeedTracker.TxFilter(newTx)
go bcr.BroadcastTransaction(newTx)
case _ = <-statusUpdateTicker.C:
go bcr.BroadcastStatusResponse()
}
// BroadcastTransaction broadcats `BlockStore` transaction.
-func (bcr *BlockchainReactor) BroadcastTransaction(tx *legacy.Tx) error {
+func (bcr *BlockchainReactor) BroadcastTransaction(tx *protocolTypes.Tx) error {
msg, err := NewTransactionNotifyMessage(tx)
if err != nil {
return err