cmn "github.com/tendermint/tmlibs/common"
"github.com/bytom/accesstoken"
- "github.com/bytom/blockchain"
+ "github.com/bytom/blockchain/txfeed"
cfg "github.com/bytom/config"
"github.com/bytom/dashboard"
"github.com/bytom/errors"
+ "github.com/bytom/mining/cpuminer"
+ "github.com/bytom/mining/miningpool"
"github.com/bytom/net/http/authn"
"github.com/bytom/net/http/gzip"
"github.com/bytom/net/http/httpjson"
"github.com/bytom/net/http/static"
+ "github.com/bytom/netsync"
"github.com/bytom/protocol"
"github.com/bytom/wallet"
)
// API is the scheduling center for server
type API struct {
- bcr *blockchain.BlockchainReactor
- wallet *wallet.Wallet
- accessTokens *accesstoken.CredentialStore
- chain *protocol.Chain
- server *http.Server
- handler http.Handler
+ sync *netsync.SyncManager
+ wallet *wallet.Wallet
+ accessTokens *accesstoken.CredentialStore
+ chain *protocol.Chain
+ server *http.Server
+ handler http.Handler
+ txFeedTracker *txfeed.Tracker
+ cpuMiner *cpuminer.CPUMiner
+ miningPool *miningpool.MiningPool
}
func (a *API) initServer(config *cfg.Config) {
}
// NewAPI create and initialize the API
-func NewAPI(bcr *blockchain.BlockchainReactor, wallet *wallet.Wallet, chain *protocol.Chain, config *cfg.Config, token *accesstoken.CredentialStore) *API {
+func NewAPI(sync *netsync.SyncManager, wallet *wallet.Wallet, txfeeds *txfeed.Tracker, cpuMiner *cpuminer.CPUMiner, miningPool *miningpool.MiningPool, chain *protocol.Chain, config *cfg.Config, token *accesstoken.CredentialStore) *API {
api := &API{
- bcr: bcr,
- wallet: wallet,
- chain: chain,
- accessTokens: token,
+ sync: sync,
+ wallet: wallet,
+ chain: chain,
+ accessTokens: token,
+ txFeedTracker: txfeeds,
+ cpuMiner: cpuMiner,
+ miningPool: miningPool,
}
api.buildHandler()
api.initServer(config)
m.Handle("/", alwaysError(errors.New("not Found")))
m.Handle("/error", jsonHandler(a.walletError))
- m.Handle("/info", jsonHandler(a.bcr.Info))
m.Handle("/net-info", jsonHandler(a.getNetInfo))
m.Handle("/create-access-token", jsonHandler(a.createAccessToken))
log "github.com/sirupsen/logrus"
"github.com/bytom/blockchain/query"
- "github.com/bytom/consensus"
+ "github.com/bytom/wallet"
"github.com/bytom/consensus/difficulty"
chainjson "github.com/bytom/encoding/json"
"github.com/bytom/protocol/bc"
"github.com/bytom/protocol/bc/types"
- "github.com/bytom/wallet"
)
-// return network infomation
-func (a *API) getNetInfo() Response {
- return NewSuccessResponse(a.bcr.GetNodeInfo())
-}
// return best block hash
func (a *API) getBestBlockHash() Response {
return NewSuccessResponse(blockHeight)
}
-// return is in mining or not
-func (a *API) isMining() Response {
- IsMining := map[string]bool{"isMining": a.bcr.IsMining()}
- return NewSuccessResponse(IsMining)
-}
-
-// return gasRate
-func (a *API) gasRate() Response {
- gasrate := map[string]int64{"gasRate": consensus.VMGasRate}
- return NewSuccessResponse(gasrate)
-}
"context"
"github.com/bytom/protocol/bc/types"
+ "github.com/bytom/protocol/bc"
)
func (a *API) getWork() Response {
- work, err := a.bcr.GetWork()
+ work, err := a.GetWork()
if err != nil {
return NewErrorResponse(err)
}
return NewSuccessResponse(work)
}
+
type SubmitWorkReq struct {
BlockHeader *types.BlockHeader `json:"block_header"`
}
func (a *API) submitWork(ctx context.Context, req *SubmitWorkReq) Response {
- if err := a.bcr.SubmitWork(req.BlockHeader); err != nil {
+ if err := a.SubmitWork(req.BlockHeader); err != nil {
return NewErrorResponse(err)
}
return NewSuccessResponse(true)
}
return NewSuccessResponse(resp)
}
+
+// GetWorkResp is resp struct for API
+type GetWorkResp struct {
+ BlockHeader *types.BlockHeader `json:"block_header"`
+ Seed *bc.Hash `json:"seed"`
+}
+
+func (a *API) GetWork() (*GetWorkResp, error) {
+ bh, err := a.miningPool.GetWork()
+ if err != nil {
+ return nil, err
+ }
+
+ seed, err := a.chain.GetSeed(bh.Height, &bh.PreviousBlockHash)
+ if err != nil {
+ return nil, err
+ }
+
+ return &GetWorkResp{
+ BlockHeader: bh,
+ Seed: seed,
+ }, nil
+}
+
+func (a *API) SubmitWork(bh *types.BlockHeader) error {
+ return a.miningPool.SubmitWork(bh)
+}
--- /dev/null
+package api
+
+type NetInfo struct {
+ Listening bool `json:"listening"`
+ Syncing bool `json:"syncing"`
+ Mining bool `json:"mining"`
+ PeerCount int `json:"peer_count"`
+ CurrentBlock uint64 `json:"current_block"`
+ HighestBlock uint64 `json:"highest_block"`
+}
+
+func (a *API) GetNodeInfo() *NetInfo {
+ info := &NetInfo{
+ Listening: a.sync.Switch().IsListening(),
+ Syncing: a.sync.BlockKeeper().IsCaughtUp(),
+ Mining: a.cpuMiner.IsMining(),
+ PeerCount: len(a.sync.Switch().Peers().List()),
+ CurrentBlock: a.chain.Height(),
+ }
+ _, info.HighestBlock = a.sync.Peers().BestPeer()
+ if info.CurrentBlock > info.HighestBlock {
+ info.HighestBlock = info.CurrentBlock
+ }
+ return info
+}
+
+// return network infomation
+func (a *API) getNetInfo() Response {
+ return NewSuccessResponse(a.GetNodeInfo())
+}
+
+// return is in mining or not
+func (a *API) isMining() Response {
+ IsMining := map[string]bool{"isMining": a.IsMining()}
+ return NewSuccessResponse(IsMining)
+}
+
+func (a *API) IsMining() bool {
+ return a.cpuMiner.IsMining()
+}
"github.com/bytom/account"
"github.com/bytom/blockchain/query"
+ "github.com/bytom/consensus"
)
// POST /list-accounts
return NewSuccessResponse(UTXOs)
}
+
+// return gasRate
+func (a *API) gasRate() Response {
+ gasrate := map[string]int64{"gasRate": consensus.VMGasRate}
+ return NewSuccessResponse(gasrate)
+}
log "github.com/sirupsen/logrus"
+ "github.com/bytom/errors"
"github.com/bytom/blockchain/txfeed"
)
Alias string `json:"alias"`
Filter string `json:"filter"`
}) Response {
- if err := a.bcr.TxFeedTracker.Create(ctx, in.Alias, in.Filter); err != nil {
+ if err := a.txFeedTracker.Create(ctx, in.Alias, in.Filter); err != nil {
log.WithField("error", err).Error("Add TxFeed Failed")
return NewErrorResponse(err)
}
Alias string `json:"alias,omitempty"`
}) Response {
var tmpTxFeed interface{}
- rawTxfeed, err := a.bcr.GetTxFeedByAlias(ctx, in.Alias)
+ rawTxfeed, err := a.GetTxFeedByAlias(ctx, in.Alias)
if err != nil {
return NewErrorResponse(err)
}
func (a *API) deleteTxFeed(ctx context.Context, in struct {
Alias string `json:"alias,omitempty"`
}) Response {
- if err := a.bcr.TxFeedTracker.Delete(ctx, in.Alias); err != nil {
+ if err := a.txFeedTracker.Delete(ctx, in.Alias); err != nil {
return NewErrorResponse(err)
}
return NewSuccessResponse(nil)
Alias string `json:"alias"`
Filter string `json:"filter"`
}) Response {
- if err := a.bcr.TxFeedTracker.Delete(ctx, in.Alias); err != nil {
+ if err := a.txFeedTracker.Delete(ctx, in.Alias); err != nil {
return NewErrorResponse(err)
}
- if err := a.bcr.TxFeedTracker.Create(ctx, in.Alias, in.Filter); err != nil {
+ if err := a.txFeedTracker.Create(ctx, in.Alias, in.Filter); err != nil {
log.WithField("error", err).Error("Update TxFeed Failed")
return NewErrorResponse(err)
}
txFeed := txfeed.TxFeed{}
txFeeds := make([]txfeed.TxFeed, 0)
- iter := a.bcr.TxFeedTracker.DB.Iterator()
+ iter := a.txFeedTracker.DB.Iterator()
defer iter.Release()
for iter.Next() {
return NewSuccessResponse(txFeeds)
}
+
+func (a *API) GetTxFeedByAlias(ctx context.Context, filter string) ([]byte, error) {
+ jf, err := json.Marshal(filter)
+ if err != nil {
+ return nil, err
+ }
+
+ value := a.txFeedTracker.DB.Get(jf)
+ if value == nil {
+ return nil, errors.New("No transaction feed")
+ }
+
+ return value, nil
+}
+++ /dev/null
-package blockchain
-
-import (
- "errors"
- "sync"
- "time"
-
- log "github.com/sirupsen/logrus"
-
- "github.com/bytom/p2p"
- "github.com/bytom/protocol"
- "github.com/bytom/protocol/bc"
- "github.com/bytom/protocol/bc/types"
-)
-
-type blockKeeperPeer struct {
- mtx sync.RWMutex
- height uint64
- hash *bc.Hash
-}
-
-func newBlockKeeperPeer(height uint64, hash *bc.Hash) *blockKeeperPeer {
- return &blockKeeperPeer{
- height: height,
- hash: hash,
- }
-}
-
-func (p *blockKeeperPeer) GetStatus() (height uint64, hash *bc.Hash) {
- p.mtx.RLock()
- defer p.mtx.RUnlock()
- return p.height, p.hash
-}
-
-func (p *blockKeeperPeer) SetStatus(height uint64, hash *bc.Hash) {
- p.mtx.Lock()
- defer p.mtx.Unlock()
-
- p.height = height
- p.hash = hash
-}
-
-type pendingResponse struct {
- block *types.Block
- src *p2p.Peer
-}
-
-//TODO: add retry mechanism
-type blockKeeper struct {
- mtx sync.RWMutex
- chainHeight uint64
- maxPeerHeight uint64
- chainUpdateCh <-chan struct{}
- peerUpdateCh chan struct{}
- done chan bool
-
- chain *protocol.Chain
- sw *p2p.Switch
- peers map[string]*blockKeeperPeer
- pendingProcessCh chan *pendingResponse
-}
-
-func newBlockKeeper(chain *protocol.Chain, sw *p2p.Switch) *blockKeeper {
- chainHeight := chain.Height()
- bk := &blockKeeper{
- chainHeight: chainHeight,
- maxPeerHeight: uint64(0),
- chainUpdateCh: chain.BlockWaiter(chainHeight + 1),
- peerUpdateCh: make(chan struct{}, 1000),
- done: make(chan bool, 1),
-
- chain: chain,
- sw: sw,
- peers: make(map[string]*blockKeeperPeer),
- pendingProcessCh: make(chan *pendingResponse),
- }
- go bk.blockProcessWorker()
- go bk.blockRequestWorker()
- return bk
-}
-
-func (bk *blockKeeper) Stop() {
- bk.done <- true
-}
-
-func (bk *blockKeeper) AddBlock(block *types.Block, src *p2p.Peer) {
- bk.pendingProcessCh <- &pendingResponse{block: block, src: src}
-}
-
-func (bk *blockKeeper) IsCaughtUp() bool {
- bk.mtx.RLock()
- defer bk.mtx.RUnlock()
- return bk.chainHeight >= bk.maxPeerHeight
-}
-
-func (bk *blockKeeper) RemovePeer(peerID string) {
- bk.mtx.Lock()
- delete(bk.peers, peerID)
- bk.mtx.Unlock()
- log.WithField("ID", peerID).Info("Delete peer from blockKeeper")
-}
-
-func (bk *blockKeeper) requestBlockByHash(peerID string, hash *bc.Hash) error {
- peer := bk.sw.Peers().Get(peerID)
- if peer == nil {
- return errors.New("can't find peer in peer pool")
- }
- msg := &BlockRequestMessage{RawHash: hash.Byte32()}
- peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
- return nil
-}
-
-func (bk *blockKeeper) requestBlockByHeight(peerID string, height uint64) error {
- peer := bk.sw.Peers().Get(peerID)
- if peer == nil {
- return errors.New("can't find peer in peer pool")
- }
- msg := &BlockRequestMessage{Height: height}
- peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
- return nil
-}
-
-func (bk *blockKeeper) SetPeerHeight(peerID string, height uint64, hash *bc.Hash) {
- bk.mtx.Lock()
- defer bk.mtx.Unlock()
-
- if height > bk.maxPeerHeight {
- bk.maxPeerHeight = height
- bk.peerUpdateCh <- struct{}{}
- }
-
- if peer, ok := bk.peers[peerID]; ok {
- peer.SetStatus(height, hash)
- return
- }
- peer := newBlockKeeperPeer(height, hash)
- bk.peers[peerID] = peer
- log.WithFields(log.Fields{"ID": peerID, "Height": height}).Info("Add new peer to blockKeeper")
-}
-
-func (bk *blockKeeper) RequestBlockByHeight(height uint64) {
- bk.mtx.RLock()
- defer bk.mtx.RUnlock()
-
- for peerID, peer := range bk.peers {
- if peerHeight, _ := peer.GetStatus(); peerHeight > bk.chainHeight {
- bk.requestBlockByHeight(peerID, height)
- }
- }
-}
-
-func (bk *blockKeeper) blockRequestWorker() {
- for {
- select {
- case <-bk.chainUpdateCh:
- chainHeight := bk.chain.Height()
- bk.mtx.Lock()
- if bk.chainHeight < chainHeight {
- bk.chainHeight = chainHeight
- }
- bk.chainUpdateCh = bk.chain.BlockWaiter(bk.chainHeight + 1)
- bk.mtx.Unlock()
-
- case <-bk.peerUpdateCh:
- bk.mtx.RLock()
- chainHeight := bk.chainHeight
- maxPeerHeight := bk.maxPeerHeight
- bk.mtx.RUnlock()
-
- for i := chainHeight + 1; i <= maxPeerHeight; i++ {
- bk.RequestBlockByHeight(i)
- waiter := bk.chain.BlockWaiter(i)
- retryTicker := time.Tick(15 * time.Second)
-
- retryLoop:
- for {
- select {
- case <-waiter:
- break retryLoop
- case <-retryTicker:
- bk.RequestBlockByHeight(i)
- }
- }
- }
-
- case <-bk.done:
- return
- }
- }
-}
-
-func (bk *blockKeeper) blockProcessWorker() {
- for pendingResponse := range bk.pendingProcessCh {
-
- block := pendingResponse.block
- blockHash := block.Hash()
- isOrphan, err := bk.chain.ProcessBlock(block)
- if err != nil {
- bk.sw.AddScamPeer(pendingResponse.src)
- log.WithField("hash", blockHash.String()).Errorf("blockKeeper fail process block %v", err)
- continue
- }
- log.WithFields(log.Fields{
- "height": block.Height,
- "hash": blockHash.String(),
- "isOrphan": isOrphan,
- }).Info("blockKeeper processed block")
-
- if isOrphan {
- bk.requestBlockByHash(pendingResponse.src.Key, &block.PreviousBlockHash)
- }
- }
-}
+++ /dev/null
-package blockchain
-
-type NetInfo struct {
- Listening bool `json:"listening"`
- Syncing bool `json:"syncing"`
- Mining bool `json:"mining"`
- PeerCount int `json:"peer_count"`
- CurrentBlock uint64 `json:"current_block"`
- HighestBlock uint64 `json:"highest_block"`
-}
-
-func (bcr *BlockchainReactor) GetNodeInfo() *NetInfo {
- return &NetInfo{
- Listening: bcr.sw.IsListening(),
- Syncing: bcr.blockKeeper.IsCaughtUp(),
- Mining: bcr.mining.IsMining(),
- PeerCount: len(bcr.sw.Peers().List()),
- CurrentBlock: bcr.blockKeeper.chainHeight,
- HighestBlock: bcr.blockKeeper.maxPeerHeight,
- }
-}
-
-func (bcr *BlockchainReactor) IsMining() bool {
- return bcr.mining.IsMining()
-}
+++ /dev/null
-package blockchain
-
-import (
- "github.com/bytom/protocol/bc"
- "github.com/bytom/protocol/bc/types"
-)
-
-// GetWorkResp is resp struct for API
-type GetWorkResp struct {
- BlockHeader *types.BlockHeader `json:"block_header"`
- Seed *bc.Hash `json:"seed"`
-}
-
-func (bcr *BlockchainReactor) GetWork() (*GetWorkResp, error) {
- bh, err := bcr.miningPool.GetWork()
- if err != nil {
- return nil, err
- }
-
- seed, err := bcr.chain.GetSeed(bh.Height, &bh.PreviousBlockHash)
- if err != nil {
- return nil, err
- }
-
- return &GetWorkResp{
- BlockHeader: bh,
- Seed: seed,
- }, nil
-}
-
-func (bcr *BlockchainReactor) SubmitWork(bh *types.BlockHeader) error {
- return bcr.miningPool.SubmitWork(bh)
-}
+++ /dev/null
-package blockchain
-
-import (
- "context"
- "reflect"
- "time"
-
- log "github.com/sirupsen/logrus"
- cmn "github.com/tendermint/tmlibs/common"
-
- "github.com/bytom/account"
- "github.com/bytom/blockchain/txfeed"
- "github.com/bytom/mining/cpuminer"
- "github.com/bytom/mining/miningpool"
- "github.com/bytom/p2p"
- "github.com/bytom/p2p/trust"
- "github.com/bytom/protocol"
- "github.com/bytom/protocol/bc"
- protocolTypes "github.com/bytom/protocol/bc/types"
- "github.com/bytom/types"
-)
-
-const (
- // BlockchainChannel is a channel for blocks and status updates
- BlockchainChannel = byte(0x40)
- maxNewBlockChSize = int(1024)
-
- statusUpdateIntervalSeconds = 10
- maxBlockchainResponseSize = 22020096 + 2
-)
-
-// BlockchainReactor handles long-term catchup syncing.
-type BlockchainReactor struct {
- p2p.BaseReactor
-
- chain *protocol.Chain
- TxFeedTracker *txfeed.Tracker // TODO: move it out from BlockchainReactor
- blockKeeper *blockKeeper
- txPool *protocol.TxPool
- mining *cpuminer.CPUMiner
- miningPool *miningpool.MiningPool
- sw *p2p.Switch
- evsw types.EventSwitch
- newBlockCh chan *bc.Hash
- miningEnable bool
-}
-
-// Info return the server information
-func (bcr *BlockchainReactor) Info(ctx context.Context) (map[string]interface{}, error) {
- return map[string]interface{}{
- "is_configured": false,
- "version": "0.001",
- "build_commit": "----",
- "build_date": "------",
- "build_config": "---------",
- }, nil
-}
-
-// NewBlockchainReactor returns the reactor of whole blockchain.
-func NewBlockchainReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, accountMgr *account.Manager, txfeeds *txfeed.Tracker, miningEnable bool) *BlockchainReactor {
- newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
- bcr := &BlockchainReactor{
- chain: chain,
- blockKeeper: newBlockKeeper(chain, sw),
- txPool: txPool,
- sw: sw,
- TxFeedTracker: txfeeds,
- miningEnable: miningEnable,
- newBlockCh: newBlockCh,
- }
-
- bcr.mining = cpuminer.NewCPUMiner(chain, accountMgr, txPool, newBlockCh)
- bcr.miningPool = miningpool.NewMiningPool(chain, accountMgr, txPool, newBlockCh)
-
- bcr.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcr)
- return bcr
-}
-
-// OnStart implements BaseService
-func (bcr *BlockchainReactor) OnStart() error {
- bcr.BaseReactor.OnStart()
-
- if bcr.miningEnable {
- bcr.mining.Start()
- }
- go bcr.syncRoutine()
- return nil
-}
-
-// OnStop implements BaseService
-func (bcr *BlockchainReactor) OnStop() {
- bcr.BaseReactor.OnStop()
- if bcr.miningEnable {
- bcr.mining.Stop()
- }
- bcr.blockKeeper.Stop()
-}
-
-// GetChannels implements Reactor
-func (bcr *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
- return []*p2p.ChannelDescriptor{
- {
- ID: BlockchainChannel,
- Priority: 5,
- SendQueueCapacity: 100,
- },
- }
-}
-
-// AddPeer implements Reactor by sending our state to peer.
-func (bcr *BlockchainReactor) AddPeer(peer *p2p.Peer) {
- peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}})
-}
-
-// RemovePeer implements Reactor by removing peer from the pool.
-func (bcr *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
- bcr.blockKeeper.RemovePeer(peer.Key)
-}
-
-// Receive implements Reactor by handling 4 types of messages (look below).
-func (bcr *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
- var tm *trust.TrustMetric
- key := src.Connection().RemoteAddress.IP.String()
- if tm = bcr.sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil {
- log.Errorf("Can't get peer trust metric")
- return
- }
-
- _, msg, err := DecodeMessage(msgBytes)
- if err != nil {
- log.Errorf("Error decoding messagek %v", err)
- return
- }
- log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
-
- switch msg := msg.(type) {
- case *BlockRequestMessage:
- var block *protocolTypes.Block
- var err error
- if msg.Height != 0 {
- block, err = bcr.chain.GetBlockByHeight(msg.Height)
- } else {
- block, err = bcr.chain.GetBlockByHash(msg.GetHash())
- }
- if err != nil {
- log.Errorf("Fail on BlockRequestMessage get block: %v", err)
- return
- }
- response, err := NewBlockResponseMessage(block)
- if err != nil {
- log.Errorf("Fail on BlockRequestMessage create resoinse: %v", err)
- return
- }
- src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
-
- case *BlockResponseMessage:
- bcr.blockKeeper.AddBlock(msg.GetBlock(), src)
-
- case *StatusRequestMessage:
- block := bcr.chain.BestBlock()
- src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(block)})
-
- case *StatusResponseMessage:
- bcr.blockKeeper.SetPeerHeight(src.Key, msg.Height, msg.GetHash())
-
- case *TransactionNotifyMessage:
- tx := msg.GetTransaction()
- if err := bcr.chain.ValidateTx(tx); err != nil {
- bcr.sw.AddScamPeer(src)
- }
-
- default:
- log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
- }
-}
-
-// Handle messages from the poolReactor telling the reactor what to do.
-// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
-// (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
-func (bcr *BlockchainReactor) syncRoutine() {
- statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
- newTxCh := bcr.txPool.GetNewTxCh()
-
- for {
- select {
- case blockHash := <-bcr.newBlockCh:
- block, err := bcr.chain.GetBlockByHash(blockHash)
- if err != nil {
- log.Errorf("Error get block from newBlockCh %v", err)
- }
- log.WithFields(log.Fields{"Hash": blockHash, "height": block.Height}).Info("Boardcast my new block")
- case newTx := <-newTxCh:
- bcr.TxFeedTracker.TxFilter(newTx)
- go bcr.BroadcastTransaction(newTx)
- case _ = <-statusUpdateTicker.C:
- go bcr.BroadcastStatusResponse()
-
- if bcr.miningEnable {
- // mining if and only if block sync is finished
- if bcr.blockKeeper.IsCaughtUp() {
- bcr.mining.Start()
- } else {
- bcr.mining.Stop()
- }
- }
- case <-bcr.Quit:
- return
- }
- }
-}
-
-// BroadcastStatusResponse broadcasts `BlockStore` height.
-func (bcr *BlockchainReactor) BroadcastStatusResponse() {
- block := bcr.chain.BestBlock()
- bcr.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(block)})
-}
-
-// BroadcastTransaction broadcats `BlockStore` transaction.
-func (bcr *BlockchainReactor) BroadcastTransaction(tx *protocolTypes.Tx) error {
- msg, err := NewTransactionNotifyMessage(tx)
- if err != nil {
- return err
- }
- bcr.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{msg})
- return nil
-}
+++ /dev/null
-package blockchain
-
-import (
- "context"
-
- log "github.com/sirupsen/logrus"
-
- "github.com/bytom/blockchain/txbuilder"
- "github.com/bytom/errors"
- "github.com/bytom/protocol/bc/types"
-)
-
-// finalizeTxWait calls FinalizeTx and then waits for confirmation of
-// the transaction. A nil error return means the transaction is
-// confirmed on the blockchain. ErrRejected means a conflicting tx is
-// on the blockchain. context.DeadlineExceeded means ctx is an
-// expiring context that timed out.
-func (bcr *BlockchainReactor) finalizeTxWait(ctx context.Context, txTemplate *txbuilder.Template, waitUntil string) error {
- // Use the current generator height as the lower bound of the block height
- // that the transaction may appear in.
- localHeight := bcr.chain.Height()
- //generatorHeight := localHeight
-
- log.WithField("localHeight", localHeight).Info("Starting to finalize transaction")
-
- err := txbuilder.FinalizeTx(ctx, bcr.chain, txTemplate.Transaction)
- if err != nil {
- return err
- }
- if waitUntil == "none" {
- return nil
- }
-
- //TODO:complete finalizeTxWait
- //height, err := a.waitForTxInBlock(ctx, txTemplate.Transaction, generatorHeight)
- if err != nil {
- return err
- }
- if waitUntil == "confirmed" {
- return nil
- }
-
- return nil
-}
-
-func (bcr *BlockchainReactor) waitForTxInBlock(ctx context.Context, tx *types.Tx, height uint64) (uint64, error) {
- log.Printf("waitForTxInBlock function")
- for {
- height++
- select {
- case <-ctx.Done():
- return 0, ctx.Err()
-
- case <-bcr.chain.BlockWaiter(height):
- b, err := bcr.chain.GetBlockByHeight(height)
- if err != nil {
- return 0, errors.Wrap(err, "getting block that just landed")
- }
- for _, confirmed := range b.Transactions {
- if confirmed.ID == tx.ID {
- // confirmed
- return height, nil
- }
- }
-
- // might still be in pool or might be rejected; we can't
- // tell definitively until its max time elapses.
- // Re-insert into the pool in case it was dropped.
- err = txbuilder.FinalizeTx(ctx, bcr.chain, tx)
- if err != nil {
- return 0, err
- }
-
- // TODO(jackson): Do simple rejection checks like checking if
- // the tx's blockchain prevouts still exist in the state tree.
- }
- }
-}
return err
}
- err = c.ValidateTx(tx)
+ _, err = c.ValidateTx(tx)
if errors.Root(err) == protocol.ErrBadTx {
return errors.Sub(ErrRejected, err)
}
+++ /dev/null
-package blockchain
-
-import (
- "context"
- "encoding/json"
-
- "github.com/bytom/errors"
-)
-
-func (bcr *BlockchainReactor) GetTxFeedByAlias(ctx context.Context, filter string) ([]byte, error) {
- jf, err := json.Marshal(filter)
- if err != nil {
- return nil, err
- }
-
- value := bcr.TxFeedTracker.DB.Get(jf)
- if value == nil {
- return nil, errors.New("No transaction feed")
- }
-
- return value, nil
-}
if _, err := n.Start(); err != nil {
return fmt.Errorf("Failed to start node: %v", err)
} else {
- log.WithField("nodeInfo", n.Switch().NodeInfo()).Info("Started node")
+ log.WithField("nodeInfo", n.SyncManager().Switch().NodeInfo()).Info("Started node")
}
// Trap signal, run forever.
"os"
"github.com/bytom/api"
- "github.com/bytom/blockchain"
"github.com/bytom/consensus/difficulty"
"github.com/bytom/protocol/bc"
"github.com/bytom/protocol/bc/types"
fmt.Println(err)
os.Exit(1)
}
- resp := &blockchain.GetWorkResp{}
+ resp := &api.GetWorkResp{}
if err = json.Unmarshal(rawData, resp); err != nil {
fmt.Println(err)
os.Exit(1)
MaxNumPeers: 50,
HandshakeTimeout: 30,
DialTimeout: 3,
+ PexReactor: true,
}
}
BlockHeader: types.BlockHeader{
Version: 1,
Height: 0,
- Nonce: 4216085,
+ Nonce: 4216193,
Timestamp: 1516788453,
BlockCommitment: types.BlockCommitment{
TransactionsMerkleRoot: merkleRoot,
TransactionStatusHash: txStatusHash,
},
- Bits: 2305843009222082559,
+ Bits: 2305843009214532812,
},
Transactions: []*types.Tx{genesisCoinbaseTx},
}
InitialBlockSubsidy = uint64(1470000000000000000)
// config for pow mining
- PowMinBits = uint64(2161727821138738707)
+ PowMinBits = uint64(2305843009213861724)
BlocksPerRetarget = uint64(1024)
TargetSecondsPerBlock = uint64(60)
SeedPerRetarget = uint64(128)
CoinbaseArbitrarySizeLimit = 128
VMGasRate = int64(1000)
- StorageGasRate = int64(10)
+ StorageGasRate = int64(0)
MaxGasAmount = int64(100000)
DefaultGasCredit = int64(80000)
--- /dev/null
+package netsync
+
+import (
+ "strings"
+ "time"
+
+ log "github.com/sirupsen/logrus"
+
+ "github.com/bytom/errors"
+ "github.com/bytom/p2p"
+ "github.com/bytom/protocol"
+ "github.com/bytom/protocol/bc/types"
+)
+
+const (
+ maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
+ maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
+
+ syncTimeout = 30 * time.Second
+ requestRetryTicker = 15 * time.Second
+
+ maxBlocksPending = 1024
+ maxtxsPending = 32768
+ maxQuitReq = 256
+)
+
+var (
+ errGetBlockTimeout = errors.New("Get block Timeout")
+ errPeerDropped = errors.New("Peer dropped")
+ errCommAbnorm = errors.New("Peer communication abnormality")
+ errScamPeer = errors.New("Scam peer")
+ errReqBlock = errors.New("Request block error")
+)
+
+//TODO: add retry mechanism
+type blockKeeper struct {
+ chain *protocol.Chain
+ sw *p2p.Switch
+ peers *peerSet
+
+ pendingProcessCh chan *blockPending
+ txsProcessCh chan *txsNotify
+ quitReqBlockCh chan *string
+}
+
+func newBlockKeeper(chain *protocol.Chain, sw *p2p.Switch, peers *peerSet, quitReqBlockCh chan *string) *blockKeeper {
+ bk := &blockKeeper{
+ chain: chain,
+ sw: sw,
+ peers: peers,
+ pendingProcessCh: make(chan *blockPending, maxBlocksPending),
+ txsProcessCh: make(chan *txsNotify, maxtxsPending),
+ quitReqBlockCh: quitReqBlockCh,
+ }
+ go bk.txsProcessWorker()
+ return bk
+}
+
+func (bk *blockKeeper) AddBlock(block *types.Block, peerID string) {
+ bk.pendingProcessCh <- &blockPending{block: block, peerID: peerID}
+}
+
+func (bk *blockKeeper) AddTx(tx *types.Tx, peerID string) {
+ bk.txsProcessCh <- &txsNotify{tx: tx, peerID: peerID}
+}
+
+func (bk *blockKeeper) IsCaughtUp() bool {
+ _, height := bk.peers.BestPeer()
+ return bk.chain.Height() < height
+}
+
+func (bk *blockKeeper) BlockRequestWorker(peerID string, maxPeerHeight uint64) error {
+ num := bk.chain.Height() + 1
+ orphanNum := uint64(0)
+ reqNum := uint64(0)
+ isOrphan := false
+ for num <= maxPeerHeight && num > 0 {
+ if isOrphan {
+ reqNum = orphanNum
+ } else {
+ reqNum = num
+ }
+ block, err := bk.BlockRequest(peerID, reqNum)
+ if errors.Root(err) == errPeerDropped || errors.Root(err) == errGetBlockTimeout || errors.Root(err) == errReqBlock {
+ log.WithField("Peer abnormality. PeerID: ", peerID).Info(err)
+ bk.peers.DropPeer(peerID)
+ return errCommAbnorm
+ }
+ isOrphan, err = bk.chain.ProcessBlock(block)
+ if err != nil {
+ bk.sw.AddScamPeer(bk.peers.Peer(peerID).getPeer())
+ log.WithField("hash: ", block.Hash()).Errorf("blockKeeper fail process block %v", err)
+ return errScamPeer
+ }
+ if isOrphan {
+ orphanNum = block.Height - 1
+ continue
+ }
+ num++
+ }
+ return nil
+}
+
+func (bk *blockKeeper) blockRequest(peerID string, height uint64) error {
+ return bk.peers.requestBlockByHeight(peerID, height)
+}
+
+func (bk *blockKeeper) BlockRequest(peerID string, height uint64) (*types.Block, error) {
+ var block *types.Block
+
+ if err := bk.blockRequest(peerID, height); err != nil {
+ return nil, errReqBlock
+ }
+ retryTicker := time.Tick(requestRetryTicker)
+ syncWait := time.NewTimer(syncTimeout)
+
+ for {
+ select {
+ case pendingResponse := <-bk.pendingProcessCh:
+ block = pendingResponse.block
+ if strings.Compare(pendingResponse.peerID, peerID) != 0 {
+ log.Warning("From different peer")
+ continue
+ }
+ if block.Height != height {
+ log.Warning("Block height error")
+ continue
+ }
+ return block, nil
+ case <-retryTicker:
+ if err := bk.blockRequest(peerID, height); err != nil {
+ return nil, errReqBlock
+ }
+ case <-syncWait.C:
+ log.Warning("Request block timeout")
+ return nil, errGetBlockTimeout
+ case peerid := <-bk.quitReqBlockCh:
+ if strings.Compare(*peerid, peerID) == 0 {
+ log.Info("Quite block request worker")
+ return nil, errPeerDropped
+ }
+ }
+ }
+}
+
+func (bk *blockKeeper) txsProcessWorker() {
+ for txsResponse := range bk.txsProcessCh {
+ tx := txsResponse.tx
+ log.Info("Receive new tx from remote peer. TxID:", tx.ID.String())
+ bk.peers.MarkTransaction(txsResponse.peerID, &tx.ID)
+ if isOrphan, err := bk.chain.ValidateTx(tx); err != nil && isOrphan == false {
+ bk.sw.AddScamPeer(bk.peers.Peer(txsResponse.peerID).getPeer())
+ }
+ }
+}
--- /dev/null
+package netsync
+
+import (
+ "errors"
+
+ log "github.com/sirupsen/logrus"
+ "gopkg.in/karalabe/cookiejar.v2/collections/prque"
+
+ "github.com/bytom/p2p"
+ core "github.com/bytom/protocol"
+ "github.com/bytom/protocol/bc"
+ "github.com/bytom/protocol/bc/types"
+ "strings"
+)
+
+const (
+ maxQueueDist = 1024 //32 // Maximum allowed distance from the chain head to queue
+)
+
+var (
+ errTerminated = errors.New("terminated")
+)
+
+// Fetcher is responsible for accumulating block announcements from various peers
+// and scheduling them for retrieval.
+type Fetcher struct {
+ chain *core.Chain
+ sw *p2p.Switch
+ peers *peerSet
+
+ // Various event channels
+ newMinedBlock chan *blockPending
+ quit chan struct{}
+
+ // Block cache
+ queue *prque.Prque // Queue containing the import operations (block number sorted)
+ queues map[string]int // Per peer block counts to prevent memory exhaustion
+ queued map[bc.Hash]*blockPending // Set of already queued blocks (to dedup imports)
+}
+
+//NewFetcher New creates a block fetcher to retrieve blocks of the new mined.
+func NewFetcher(chain *core.Chain, sw *p2p.Switch, peers *peerSet) *Fetcher {
+ return &Fetcher{
+ chain: chain,
+ sw: sw,
+ peers: peers,
+ newMinedBlock: make(chan *blockPending),
+ quit: make(chan struct{}),
+ queue: prque.New(),
+ queues: make(map[string]int),
+ queued: make(map[bc.Hash]*blockPending),
+ }
+}
+
+// Start boots up the announcement based synchroniser, accepting and processing
+// hash notifications and block fetches until termination requested.
+func (f *Fetcher) Start() {
+ go f.loop()
+}
+
+// Stop terminates the announcement based synchroniser, canceling all pending
+// operations.
+func (f *Fetcher) Stop() {
+ close(f.quit)
+}
+
+// Enqueue tries to fill gaps the the fetcher's future import queue.
+func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
+ op := &blockPending{
+ peerID: peer,
+ block: block,
+ }
+ select {
+ case f.newMinedBlock <- op:
+ return nil
+ case <-f.quit:
+ return errTerminated
+ }
+}
+
+// Loop is the main fetcher loop, checking and processing various notification
+// events.
+func (f *Fetcher) loop() {
+ for {
+ // Import any queued blocks that could potentially fit
+ height := f.chain.Height()
+ for !f.queue.Empty() {
+ op := f.queue.PopItem().(*blockPending)
+ // If too high up the chain or phase, continue later
+ number := op.block.Height
+ if number > height+1 {
+ f.queue.Push(op, -float32(op.block.Height))
+ break
+ }
+ // Otherwise if fresh and still unknown, try and import
+ hash := op.block.Hash()
+ block, _ := f.chain.GetBlockByHash(&hash)
+ if block != nil {
+ f.forgetBlock(hash)
+ continue
+ }
+ if strings.Compare(op.block.PreviousBlockHash.String(), f.chain.BestBlockHash().String()) != 0 {
+ f.forgetBlock(hash)
+ continue
+ }
+ f.insert(op.peerID, op.block)
+ }
+ // Wait for an outside event to occur
+ select {
+ case <-f.quit:
+ // Fetcher terminating, abort all operations
+ return
+
+ case op := <-f.newMinedBlock:
+ // A direct block insertion was requested, try and fill any pending gaps
+ f.enqueue(op.peerID, op.block)
+ }
+ }
+}
+
+// enqueue schedules a new future import operation, if the block to be imported
+// has not yet been seen.
+func (f *Fetcher) enqueue(peer string, block *types.Block) {
+ hash := block.Hash()
+
+ //TODO: Ensure the peer isn't DOSing us
+ // Discard any past or too distant blocks
+ if dist := int64(block.Height) - int64(f.chain.Height()); dist < 0 || dist > maxQueueDist {
+ log.Info("Discarded propagated block, too far away", " peer: ", peer, "number: ", block.Height, "distance: ", dist)
+ return
+ }
+ // Schedule the block for future importing
+ if _, ok := f.queued[hash]; !ok {
+ op := &blockPending{
+ peerID: peer,
+ block: block,
+ }
+ f.queued[hash] = op
+ f.queue.Push(op, -float32(block.Height))
+ log.Info("Queued propagated block.", " peer: ", peer, "number: ", block.Height, "queued: ", f.queue.Size())
+ }
+}
+
+// insert spawns a new goroutine to run a block insertion into the chain. If the
+// block's number is at the same height as the current import phase, it updates
+// the phase states accordingly.
+func (f *Fetcher) insert(peer string, block *types.Block) {
+ // Run the import on a new thread
+ log.Info("Importing propagated block", " from peer: ", peer, " height: ", block.Height)
+ // Run the actual import and log any issues
+ if _, err := f.chain.ProcessBlock(block); err != nil {
+ log.Info("Propagated block import failed", " from peer: ", peer, " height: ", block.Height, "err: ", err)
+ return
+ }
+ // If import succeeded, broadcast the block
+ log.Info("success process a block from new mined blocks cache. block height: ", block.Height)
+ go f.peers.BroadcastMinedBlock(block)
+}
+
+// forgetBlock removes all traces of a queued block from the fetcher's internal
+// state.
+func (f *Fetcher) forgetBlock(hash bc.Hash) {
+ if insert := f.queued[hash]; insert != nil {
+ f.queues[insert.peerID]--
+ if f.queues[insert.peerID] == 0 {
+ delete(f.queues, insert.peerID)
+ }
+ delete(f.queued, hash)
+ }
+}
--- /dev/null
+package netsync
+
+import (
+ "strings"
+
+ log "github.com/sirupsen/logrus"
+ "github.com/tendermint/go-crypto"
+ "github.com/tendermint/go-wire"
+ cmn "github.com/tendermint/tmlibs/common"
+ dbm "github.com/tendermint/tmlibs/db"
+
+ cfg "github.com/bytom/config"
+ "github.com/bytom/p2p"
+ core "github.com/bytom/protocol"
+ "github.com/bytom/protocol/bc"
+ "github.com/bytom/version"
+)
+
+//SyncManager Sync Manager is responsible for the business layer information synchronization
+type SyncManager struct {
+ networkID uint64
+ sw *p2p.Switch
+ addrBook *p2p.AddrBook // known peers
+
+ privKey crypto.PrivKeyEd25519 // local node's p2p key
+ chain *core.Chain
+ txPool *core.TxPool
+ fetcher *Fetcher
+ blockKeeper *blockKeeper
+ peers *peerSet
+
+ newBlockCh chan *bc.Hash
+ newPeerCh chan struct{}
+ txSyncCh chan *txsync
+ dropPeerCh chan *string
+ quitSync chan struct{}
+ config *cfg.Config
+ synchronising int32
+}
+
+//NewSyncManager create a sync manager
+func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
+ // Create the protocol manager with the base fields
+ manager := &SyncManager{
+ txPool: txPool,
+ chain: chain,
+ privKey: crypto.GenPrivKeyEd25519(),
+ config: config,
+ quitSync: make(chan struct{}),
+ newBlockCh: newBlockCh,
+ newPeerCh: make(chan struct{}),
+ txSyncCh: make(chan *txsync),
+ dropPeerCh: make(chan *string, maxQuitReq),
+ peers: newPeerSet(),
+ }
+
+ trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
+ manager.sw = p2p.NewSwitch(config.P2P, trustHistoryDB)
+
+ manager.blockKeeper = newBlockKeeper(manager.chain, manager.sw, manager.peers, manager.dropPeerCh)
+ manager.fetcher = NewFetcher(chain, manager.sw, manager.peers)
+
+ protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh)
+ manager.sw.AddReactor("PROTOCOL", protocolReactor)
+
+ // Optionally, start the pex reactor
+ //var addrBook *p2p.AddrBook
+ if config.P2P.PexReactor {
+ manager.addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
+ pexReactor := p2p.NewPEXReactor(manager.addrBook)
+ manager.sw.AddReactor("PEX", pexReactor)
+ }
+
+ return manager, nil
+}
+
+// Defaults to tcp
+func protocolAndAddress(listenAddr string) (string, string) {
+ p, address := "tcp", listenAddr
+ parts := strings.SplitN(address, "://", 2)
+ if len(parts) == 2 {
+ p, address = parts[0], parts[1]
+ }
+ return p, address
+}
+
+func (sm *SyncManager) makeNodeInfo() *p2p.NodeInfo {
+ nodeInfo := &p2p.NodeInfo{
+ PubKey: sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
+ Moniker: sm.config.Moniker,
+ Network: "bytom",
+ Version: version.Version,
+ Other: []string{
+ cmn.Fmt("wire_version=%v", wire.Version),
+ cmn.Fmt("p2p_version=%v", p2p.Version),
+ },
+ }
+
+ if !sm.sw.IsListening() {
+ return nodeInfo
+ }
+
+ p2pListener := sm.sw.Listeners()[0]
+ p2pHost := p2pListener.ExternalAddress().IP.String()
+ p2pPort := p2pListener.ExternalAddress().Port
+
+ // We assume that the rpcListener has the same ExternalAddress.
+ // This is probably true because both P2P and RPC listeners use UPnP,
+ // except of course if the rpc is only bound to localhost
+ nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
+ return nodeInfo
+}
+
+func (sm *SyncManager) netStart() error {
+ // Create & add listener
+ p, address := protocolAndAddress(sm.config.P2P.ListenAddress)
+
+ l := p2p.NewDefaultListener(p, address, sm.config.P2P.SkipUPNP, nil)
+
+ sm.sw.AddListener(l)
+
+ // Start the switch
+ sm.sw.SetNodeInfo(sm.makeNodeInfo())
+ sm.sw.SetNodePrivKey(sm.privKey)
+ _, err := sm.sw.Start()
+ if err != nil {
+ return err
+ }
+
+ // If seeds exist, add them to the address book and dial out
+ if sm.config.P2P.Seeds != "" {
+ // dial out
+ seeds := strings.Split(sm.config.P2P.Seeds, ",")
+ if err := sm.DialSeeds(seeds); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+//Start start sync manager service
+func (sm *SyncManager) Start() {
+ sm.netStart()
+ // broadcast transactions
+ go sm.txBroadcastLoop()
+
+ // broadcast mined blocks
+ go sm.minedBroadcastLoop()
+
+ // start sync handlers
+ go sm.syncer()
+
+ go sm.txsyncLoop()
+}
+
+//Stop stop sync manager
+func (sm *SyncManager) Stop() {
+ close(sm.quitSync)
+ sm.sw.Stop()
+}
+
+func (sm *SyncManager) txBroadcastLoop() {
+ newTxCh := sm.txPool.GetNewTxCh()
+ for {
+ select {
+ case newTx := <-newTxCh:
+ sm.peers.BroadcastTx(newTx)
+
+ case <-sm.quitSync:
+ return
+ }
+ }
+}
+
+func (sm *SyncManager) minedBroadcastLoop() {
+ for {
+ select {
+ case blockHash := <-sm.newBlockCh:
+ block, err := sm.chain.GetBlockByHash(blockHash)
+ if err != nil {
+ log.Errorf("Failed on mined broadcast loop get block %v", err)
+ return
+ }
+ sm.peers.BroadcastMinedBlock(block)
+ case <-sm.quitSync:
+ return
+ }
+ }
+}
+
+//NodeInfo get P2P peer node info
+func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
+ return sm.sw.NodeInfo()
+}
+
+//BlockKeeper get block keeper
+func (sm *SyncManager) BlockKeeper() *blockKeeper {
+ return sm.blockKeeper
+}
+
+//Peers get sync manager peer set
+func (sm *SyncManager) Peers() *peerSet {
+ return sm.peers
+}
+
+//DialSeeds dial seed peers
+func (sm *SyncManager) DialSeeds(seeds []string) error {
+ return sm.sw.DialSeeds(sm.addrBook, seeds)
+}
+
+//Switch get sync manager switch
+func (sm *SyncManager) Switch() *p2p.Switch {
+ return sm.sw
+}
+
+func (sm *SyncManager) removePeer(peerID string) {
+ sm.peers.DropPeer(peerID)
+ log.Debug("Removing peer", "peerID:", peerID)
+}
-package blockchain
+package netsync
import (
"bytes"
"errors"
"fmt"
- wire "github.com/tendermint/go-wire"
+ "github.com/tendermint/go-wire"
"github.com/bytom/protocol/bc"
"github.com/bytom/protocol/bc/types"
)
+//protocol msg
const (
- // BlockRequestByte means block request message
- BlockRequestByte = byte(0x10)
- // BlockResponseByte means block response message
- BlockResponseByte = byte(0x11)
- // StatusRequestByte means status request message
- StatusRequestByte = byte(0x20)
- // StatusResponseByte means status response message
+ BlockRequestByte = byte(0x10)
+ BlockResponseByte = byte(0x11)
+ StatusRequestByte = byte(0x20)
StatusResponseByte = byte(0x21)
- // NewTransactionByte means transaction notify message
NewTransactionByte = byte(0x30)
+ NewMineBlockByte = byte(0x40)
+
+ maxBlockchainResponseSize = 22020096 + 2
)
// BlockchainMessage is a generic message for this reactor.
wire.ConcreteType{&StatusRequestMessage{}, StatusRequestByte},
wire.ConcreteType{&StatusResponseMessage{}, StatusResponseByte},
wire.ConcreteType{&TransactionNotifyMessage{}, NewTransactionByte},
+ wire.ConcreteType{&MineBlockMessage{}, NewMineBlockByte},
)
-// DecodeMessage decode receive messages
+type blockPending struct {
+ block *types.Block
+ peerID string
+}
+
+type txsNotify struct {
+ tx *types.Tx
+ peerID string
+}
+
+//DecodeMessage decode msg
func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
msgType = bz[0]
n := int(0)
return
}
-// BlockRequestMessage is block request message struct
+//BlockRequestMessage request blocks from remote peers by height/hash
type BlockRequestMessage struct {
Height uint64
RawHash [32]byte
}
-// GetHash return block hash
+//GetHash get hash
func (m *BlockRequestMessage) GetHash() *bc.Hash {
hash := bc.NewHash(m.RawHash)
return &hash
}
+//String convert msg to string
func (m *BlockRequestMessage) String() string {
if m.Height > 0 {
return fmt.Sprintf("BlockRequestMessage{Height: %d}", m.Height)
return fmt.Sprintf("BlockRequestMessage{Hash: %s}", hash.String())
}
-// BlockResponseMessage is block response message struct
+//BlockResponseMessage response get block msg
type BlockResponseMessage struct {
RawBlock []byte
}
-// NewBlockResponseMessage produce new BlockResponseMessage instance
+//NewBlockResponseMessage construct bock response msg
func NewBlockResponseMessage(block *types.Block) (*BlockResponseMessage, error) {
rawBlock, err := block.MarshalText()
if err != nil {
return &BlockResponseMessage{RawBlock: rawBlock}, nil
}
-// GetBlock return block struct
+//GetBlock get block from msg
func (m *BlockResponseMessage) GetBlock() *types.Block {
block := &types.Block{
BlockHeader: types.BlockHeader{},
return block
}
+//String convert msg to string
func (m *BlockResponseMessage) String() string {
return fmt.Sprintf("BlockResponseMessage{Size: %d}", len(m.RawBlock))
}
-// TransactionNotifyMessage is transaction notify message struct
+//TransactionNotifyMessage notify new tx msg
type TransactionNotifyMessage struct {
RawTx []byte
}
-// NewTransactionNotifyMessage produce new TransactionNotifyMessage instance
+//NewTransactionNotifyMessage construct notify new tx msg
func NewTransactionNotifyMessage(tx *types.Tx) (*TransactionNotifyMessage, error) {
rawTx, err := tx.TxData.MarshalText()
if err != nil {
return &TransactionNotifyMessage{RawTx: rawTx}, nil
}
-// GetTransaction return Tx struct
-func (m *TransactionNotifyMessage) GetTransaction() *types.Tx {
+//GetTransaction get tx from msg
+func (m *TransactionNotifyMessage) GetTransaction() (*types.Tx, error) {
tx := &types.Tx{}
- tx.UnmarshalText(m.RawTx)
- return tx
+ if err := tx.UnmarshalText(m.RawTx); err != nil {
+ return nil, err
+ }
+ return tx, nil
}
+//String
func (m *TransactionNotifyMessage) String() string {
return fmt.Sprintf("TransactionNotifyMessage{Size: %d}", len(m.RawTx))
}
-// StatusRequestMessage is status request message struct
+//StatusRequestMessage status request msg
type StatusRequestMessage struct{}
+//String
func (m *StatusRequestMessage) String() string {
return "StatusRequestMessage"
}
-// StatusResponseMessage is status response message struct
+//StatusResponseMessage get status response msg
type StatusResponseMessage struct {
Height uint64
RawHash [32]byte
}
-// NewStatusResponseMessage produce new StatusResponseMessage instance
+//NewStatusResponseMessage construct get status response msg
func NewStatusResponseMessage(block *types.Block) *StatusResponseMessage {
return &StatusResponseMessage{
Height: block.Height,
}
}
-// GetHash return hash pointer
+//GetHash get hash from msg
func (m *StatusResponseMessage) GetHash() *bc.Hash {
hash := bc.NewHash(m.RawHash)
return &hash
}
+//String convert msg to string
func (m *StatusResponseMessage) String() string {
hash := m.GetHash()
return fmt.Sprintf("StatusResponseMessage{Height: %d, Hash: %s}", m.Height, hash.String())
}
+
+//MineBlockMessage new mined block msg
+type MineBlockMessage struct {
+ RawBlock []byte
+}
+
+//NewMinedBlockMessage construct new mined block msg
+func NewMinedBlockMessage(block *types.Block) (*MineBlockMessage, error) {
+ rawBlock, err := block.MarshalText()
+ if err != nil {
+ return nil, err
+ }
+ return &MineBlockMessage{RawBlock: rawBlock}, nil
+}
+
+//GetMineBlock get mine block from msg
+func (m *MineBlockMessage) GetMineBlock() (*types.Block, error) {
+ block := &types.Block{}
+ if err:=block.UnmarshalText(m.RawBlock);err!=nil{
+ return nil, err
+ }
+ return block, nil
+}
+
+//String convert msg to string
+func (m *MineBlockMessage) String() string {
+ return fmt.Sprintf("NewMineBlockMessage{Size: %d}", len(m.RawBlock))
+}
--- /dev/null
+package netsync
+
+import (
+ "sync"
+
+ log "github.com/sirupsen/logrus"
+ "gopkg.in/fatih/set.v0"
+
+ "github.com/bytom/errors"
+ "github.com/bytom/p2p"
+ "github.com/bytom/protocol/bc"
+ "github.com/bytom/protocol/bc/types"
+)
+
+var (
+ errClosed = errors.New("peer set is closed")
+ errAlreadyRegistered = errors.New("peer is already registered")
+ errNotRegistered = errors.New("peer is not registered")
+)
+
+const defaultVersion = 1
+
+type peer struct {
+ mtx sync.RWMutex
+ version int // Protocol version negotiated
+ id string
+ height uint64
+ hash *bc.Hash
+ *p2p.Peer
+
+ knownTxs *set.Set // Set of transaction hashes known to be known by this peer
+ knownBlocks *set.Set // Set of block hashes known to be known by this peer
+}
+
+func newPeer(height uint64, hash *bc.Hash, Peer *p2p.Peer) *peer {
+ return &peer{
+ version: defaultVersion,
+ height: height,
+ hash: hash,
+ Peer: Peer,
+ knownTxs: set.New(),
+ knownBlocks: set.New(),
+ }
+}
+
+func (p *peer) GetStatus() (height uint64, hash *bc.Hash) {
+ p.mtx.RLock()
+ defer p.mtx.RUnlock()
+ return p.height, p.hash
+}
+
+func (p *peer) SetStatus(height uint64, hash *bc.Hash) {
+ p.mtx.Lock()
+ defer p.mtx.Unlock()
+
+ p.height = height
+ p.hash = hash
+}
+
+func (p *peer) requestBlockByHash(hash *bc.Hash) error {
+ msg := &BlockRequestMessage{RawHash: hash.Byte32()}
+ p.Peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+ return nil
+}
+
+func (p *peer) requestBlockByHeight(height uint64) error {
+ msg := &BlockRequestMessage{Height: height}
+ p.Peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+ return nil
+}
+
+func (p *peer) SendTransactions(txs []*types.Tx) error {
+ for _, tx := range txs {
+ msg, err := NewTransactionNotifyMessage(tx)
+ if err != nil {
+ return errors.New("Failed construction tx msg")
+ }
+ hash := &tx.ID
+ p.knownTxs.Add(hash.String())
+ p.Peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+ }
+ return nil
+}
+
+func (p *peer) getPeer() *p2p.Peer {
+ p.mtx.RLock()
+ defer p.mtx.RUnlock()
+
+ return p.Peer
+}
+
+// MarkTransaction marks a transaction as known for the peer, ensuring that it
+// will never be propagated to this particular peer.
+func (p *peer) MarkTransaction(hash *bc.Hash) {
+ p.mtx.Lock()
+ defer p.mtx.Unlock()
+
+ // If we reached the memory allowance, drop a previously known transaction hash
+ for p.knownTxs.Size() >= maxKnownTxs {
+ p.knownTxs.Pop()
+ }
+ p.knownTxs.Add(hash.String())
+}
+
+// MarkBlock marks a block as known for the peer, ensuring that the block will
+// never be propagated to this particular peer.
+func (p *peer) MarkBlock(hash *bc.Hash) {
+ p.mtx.Lock()
+ defer p.mtx.Unlock()
+
+ // If we reached the memory allowance, drop a previously known block hash
+ for p.knownBlocks.Size() >= maxKnownBlocks {
+ p.knownBlocks.Pop()
+ }
+ p.knownBlocks.Add(hash.String())
+}
+
+type peerSet struct {
+ peers map[string]*peer
+ lock sync.RWMutex
+ closed bool
+}
+
+// newPeerSet creates a new peer set to track the active participants.
+func newPeerSet() *peerSet {
+ return &peerSet{
+ peers: make(map[string]*peer),
+ }
+}
+
+// Register injects a new peer into the working set, or returns an error if the
+// peer is already known.
+func (ps *peerSet) Register(p *peer) error {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ if ps.closed {
+ return errClosed
+ }
+ if _, ok := ps.peers[p.id]; ok {
+ return errAlreadyRegistered
+ }
+ ps.peers[p.id] = p
+ return nil
+}
+
+// Unregister removes a remote peer from the active set, disabling any further
+// actions to/from that particular entity.
+func (ps *peerSet) Unregister(id string) error {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ if _, ok := ps.peers[id]; !ok {
+ return errNotRegistered
+ }
+ delete(ps.peers, id)
+ return nil
+}
+
+func (ps *peerSet) DropPeer(id string) error {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ peer, ok := ps.peers[id]
+ if !ok {
+ return errNotRegistered
+ }
+ delete(ps.peers, id)
+ peer.CloseConn()
+ return nil
+}
+
+// Peer retrieves the registered peer with the given id.
+func (ps *peerSet) Peer(id string) *peer {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ return ps.peers[id]
+}
+
+// Len returns if the current number of peers in the set.
+func (ps *peerSet) Len() int {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ return len(ps.peers)
+}
+
+// MarkTransaction marks a transaction as known for the peer, ensuring that it
+// will never be propagated to this particular peer.
+func (ps *peerSet) MarkTransaction(peerID string, hash *bc.Hash) {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ if peer, ok := ps.peers[peerID]; ok {
+ peer.MarkTransaction(hash)
+ }
+}
+
+// MarkBlock marks a block as known for the peer, ensuring that the block will
+// never be propagated to this particular peer.
+func (ps *peerSet) MarkBlock(peerID string, hash *bc.Hash) {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ if peer, ok := ps.peers[peerID]; ok {
+ peer.MarkBlock(hash)
+ }
+}
+
+// PeersWithoutBlock retrieves a list of peers that do not have a given block in
+// their set of known hashes.
+func (ps *peerSet) PeersWithoutBlock(hash *bc.Hash) []*peer {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ list := make([]*peer, 0, len(ps.peers))
+ for _, p := range ps.peers {
+ if !p.knownBlocks.Has(hash.String()) {
+ list = append(list, p)
+ }
+ }
+ return list
+}
+
+// PeersWithoutTx retrieves a list of peers that do not have a given transaction
+// in their set of known hashes.
+func (ps *peerSet) PeersWithoutTx(hash *bc.Hash) []*peer {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ list := make([]*peer, 0, len(ps.peers))
+ for _, p := range ps.peers {
+ if !p.knownTxs.Has(hash.String()) {
+ list = append(list, p)
+ }
+ }
+ return list
+}
+
+// BestPeer retrieves the known peer with the currently highest total difficulty.
+func (ps *peerSet) BestPeer() (*p2p.Peer, uint64) {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ var bestPeer *p2p.Peer
+ var bestHeight uint64
+
+ for _, p := range ps.peers {
+ if bestPeer == nil || p.height > bestHeight {
+ bestPeer, bestHeight = p.Peer, p.height
+ }
+ }
+
+ return bestPeer, bestHeight
+}
+
+// Close disconnects all peers.
+// No new peers can be registered after Close has returned.
+func (ps *peerSet) Close() {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ for _, p := range ps.peers {
+ p.CloseConn()
+ }
+ ps.closed = true
+}
+
+func (ps *peerSet) AddPeer(peer *p2p.Peer) {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ if _, ok := ps.peers[peer.Key]; !ok {
+ keeperPeer := newPeer(0, nil, peer)
+ ps.peers[peer.Key] = keeperPeer
+ log.WithFields(log.Fields{"ID": peer.Key}).Info("Add new peer to blockKeeper")
+ return
+ }
+ log.WithField("ID", peer.Key).Warning("Add existing peer to blockKeeper")
+}
+
+func (ps *peerSet) RemovePeer(peerID string) {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ delete(ps.peers, peerID)
+ log.WithField("ID", peerID).Info("Delete peer from peerset")
+}
+
+func (ps *peerSet) SetPeerStatus(peerID string, height uint64, hash *bc.Hash) {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ if peer, ok := ps.peers[peerID]; ok {
+ peer.SetStatus(height, hash)
+ }
+}
+
+func (ps *peerSet) requestBlockByHash(peerID string, hash *bc.Hash) error {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ peer, ok := ps.peers[peerID]
+ if !ok {
+ return errors.New("Can't find peer. ")
+ }
+ return peer.requestBlockByHash(hash)
+}
+
+func (ps *peerSet) requestBlockByHeight(peerID string, height uint64) error {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ peer, ok := ps.peers[peerID]
+ if !ok {
+ return errors.New("Can't find peer. ")
+ }
+ return peer.requestBlockByHeight(height)
+}
+
+func (ps *peerSet) BroadcastMinedBlock(block *types.Block) error {
+ msg, err := NewMinedBlockMessage(block)
+ if err != nil {
+ return errors.New("Failed construction block msg")
+ }
+ hash := block.Hash()
+ peers := ps.PeersWithoutBlock(&hash)
+ for _, peer := range peers {
+ ps.MarkBlock(peer.Key, &hash)
+ peer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg})
+ }
+ return nil
+}
+
+func (ps *peerSet) BroadcastTx(tx *types.Tx) error {
+ msg, err := NewTransactionNotifyMessage(tx)
+ if err != nil {
+ return errors.New("Failed construction tx msg")
+ }
+ peers := ps.PeersWithoutTx(&tx.ID)
+ for _, peer := range peers {
+ ps.peers[peer.Key].MarkTransaction(&tx.ID)
+ peer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg})
+ }
+ return nil
+}
--- /dev/null
+package netsync
+
+import (
+ "reflect"
+ "strings"
+ "time"
+
+ log "github.com/sirupsen/logrus"
+ cmn "github.com/tendermint/tmlibs/common"
+
+ "github.com/bytom/errors"
+ "github.com/bytom/p2p"
+ "github.com/bytom/p2p/trust"
+ "github.com/bytom/protocol"
+ "github.com/bytom/protocol/bc"
+ "github.com/bytom/protocol/bc/types"
+)
+
+const (
+ // BlockchainChannel is a channel for blocks and status updates
+ BlockchainChannel = byte(0x40)
+ protocolHandshakeTimeout = time.Second * 10
+)
+
+var (
+ //ErrProtocolHandshakeTimeout peers handshake timeout
+ ErrProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
+)
+
+// Response describes the response standard.
+type Response struct {
+ Status string `json:"status,omitempty"`
+ Msg string `json:"msg,omitempty"`
+ Data interface{} `json:"data,omitempty"`
+}
+
+type initalPeerStatus struct {
+ peerID string
+ height uint64
+ hash *bc.Hash
+}
+
+//ProtocolReactor handles new coming protocol message.
+type ProtocolReactor struct {
+ p2p.BaseReactor
+
+ chain *protocol.Chain
+ blockKeeper *blockKeeper
+ txPool *protocol.TxPool
+ sw *p2p.Switch
+ fetcher *Fetcher
+ peers *peerSet
+
+ newPeerCh chan struct{}
+ quitReqBlockCh chan *string
+ txSyncCh chan *txsync
+ peerStatusCh chan *initalPeerStatus
+}
+
+// NewProtocolReactor returns the reactor of whole blockchain.
+func NewProtocolReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, blockPeer *blockKeeper, fetcher *Fetcher, peers *peerSet, newPeerCh chan struct{}, txSyncCh chan *txsync, quitReqBlockCh chan *string) *ProtocolReactor {
+ pr := &ProtocolReactor{
+ chain: chain,
+ blockKeeper: blockPeer,
+ txPool: txPool,
+ sw: sw,
+ fetcher: fetcher,
+ peers: peers,
+ newPeerCh: newPeerCh,
+ txSyncCh: txSyncCh,
+ quitReqBlockCh: quitReqBlockCh,
+ peerStatusCh: make(chan *initalPeerStatus),
+ }
+ pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
+ return pr
+}
+
+// GetChannels implements Reactor
+func (pr *ProtocolReactor) GetChannels() []*p2p.ChannelDescriptor {
+ return []*p2p.ChannelDescriptor{
+ &p2p.ChannelDescriptor{
+ ID: BlockchainChannel,
+ Priority: 5,
+ SendQueueCapacity: 100,
+ },
+ }
+}
+
+// OnStart implements BaseService
+func (pr *ProtocolReactor) OnStart() error {
+ pr.BaseReactor.OnStart()
+ return nil
+}
+
+// OnStop implements BaseService
+func (pr *ProtocolReactor) OnStop() {
+ pr.BaseReactor.OnStop()
+}
+
+// syncTransactions starts sending all currently pending transactions to the given peer.
+func (pr *ProtocolReactor) syncTransactions(p *peer) {
+ pending := pr.txPool.GetTransactions()
+ if len(pending) == 0 {
+ return
+ }
+ txs := make([]*types.Tx, len(pending))
+ for i, batch := range pending {
+ txs[i] = batch.Tx
+ }
+ pr.txSyncCh <- &txsync{p, txs}
+}
+
+// AddPeer implements Reactor by sending our state to peer.
+func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
+ peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}})
+ handshakeWait := time.NewTimer(protocolHandshakeTimeout)
+ for {
+ select {
+ case status := <-pr.peerStatusCh:
+ if strings.Compare(status.peerID, peer.Key) == 0 {
+ pr.peers.AddPeer(peer)
+ pr.peers.SetPeerStatus(status.peerID, status.height, status.hash)
+ pr.syncTransactions(pr.peers.Peer(peer.Key))
+ pr.newPeerCh <- struct{}{}
+ return nil
+ }
+ case <-handshakeWait.C:
+ return ErrProtocolHandshakeTimeout
+ }
+ }
+}
+
+// RemovePeer implements Reactor by removing peer from the pool.
+func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
+ pr.quitReqBlockCh <- &peer.Key
+ pr.peers.RemovePeer(peer.Key)
+}
+
+// Receive implements Reactor by handling 4 types of messages (look below).
+func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
+ var tm *trust.TrustMetric
+ key := src.Connection().RemoteAddress.IP.String()
+ if tm = pr.sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil {
+ log.Errorf("Can't get peer trust metric")
+ return
+ }
+
+ _, msg, err := DecodeMessage(msgBytes)
+ if err != nil {
+ log.Errorf("Error decoding messagek %v", err)
+ return
+ }
+ log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
+
+ switch msg := msg.(type) {
+ case *BlockRequestMessage:
+ var block *types.Block
+ var err error
+ if msg.Height != 0 {
+ block, err = pr.chain.GetBlockByHeight(msg.Height)
+ } else {
+ block, err = pr.chain.GetBlockByHash(msg.GetHash())
+ }
+ if err != nil {
+ log.Errorf("Fail on BlockRequestMessage get block: %v", err)
+ return
+ }
+ response, err := NewBlockResponseMessage(block)
+ if err != nil {
+ log.Errorf("Fail on BlockRequestMessage create resoinse: %v", err)
+ return
+ }
+ src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
+
+ case *BlockResponseMessage:
+ log.Info("BlockResponseMessage height:", msg.GetBlock().Height)
+ pr.blockKeeper.AddBlock(msg.GetBlock(), src.Key)
+
+ case *StatusRequestMessage:
+ block := pr.chain.BestBlock()
+ src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(block)})
+
+ case *StatusResponseMessage:
+ peerStatus := &initalPeerStatus{
+ peerID: src.Key,
+ height: msg.Height,
+ hash: msg.GetHash(),
+ }
+ pr.peerStatusCh <- peerStatus
+
+ case *TransactionNotifyMessage:
+ tx, err := msg.GetTransaction()
+ if err != nil {
+ log.Errorf("Error decoding new tx %v", err)
+ return
+ }
+ pr.blockKeeper.AddTx(tx, src.Key)
+
+ case *MineBlockMessage:
+ block, err := msg.GetMineBlock()
+ if err != nil {
+ log.Errorf("Error decoding mined block %v", err)
+ return
+ }
+ // Mark the peer as owning the block and schedule it for import
+ hash := block.Hash()
+ pr.peers.MarkBlock(src.Key, &hash)
+ pr.fetcher.Enqueue(src.Key, block)
+ pr.peers.SetPeerStatus(src.Key, block.Height, &hash)
+
+ default:
+ log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
+ }
+}
--- /dev/null
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package netsync
+
+import (
+ "sync/atomic"
+ "time"
+ "math/rand"
+
+ log "github.com/sirupsen/logrus"
+
+ "github.com/bytom/common"
+ "github.com/bytom/protocol/bc/types"
+)
+
+const (
+ forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available
+ minDesiredPeerCount = 5 // Amount of peers desired to start syncing
+
+ // This is the target size for the packs of transactions sent by txsyncLoop.
+ // A pack can get larger than this if a single transactions exceeds this size.
+ txsyncPackSize = 100 * 1024
+)
+
+type txsync struct {
+ p *peer
+ txs []*types.Tx
+}
+
+// syncer is responsible for periodically synchronising with the network, both
+// downloading hashes and blocks as well as handling the announcement handler.
+func (sm *SyncManager) syncer() {
+ // Start and ensure cleanup of sync mechanisms
+ sm.fetcher.Start()
+ defer sm.fetcher.Stop()
+ //defer sm.downloader.Terminate()
+
+ // Wait for different events to fire synchronisation operations
+ forceSync := time.NewTicker(forceSyncCycle)
+ defer forceSync.Stop()
+
+ for {
+ select {
+ case <-sm.newPeerCh:
+ log.Info("New peer connected.")
+ // Make sure we have peers to select from, then sync
+ if sm.sw.Peers().Size() < minDesiredPeerCount {
+ break
+ }
+ go sm.synchronise()
+
+ case <-forceSync.C:
+ // Force a sync even if not enough peers are present
+ go sm.synchronise()
+
+ case <-sm.quitSync:
+ return
+ }
+ }
+}
+
+// synchronise tries to sync up our local block chain with a remote peer.
+func (sm *SyncManager) synchronise() {
+ // Make sure only one goroutine is ever allowed past this point at once
+ if !atomic.CompareAndSwapInt32(&sm.synchronising, 0, 1) {
+ log.Info("Synchronising ...")
+ return
+ }
+ defer atomic.StoreInt32(&sm.synchronising, 0)
+
+ peer, bestHeight := sm.peers.BestPeer()
+ // Short circuit if no peers are available
+ if peer == nil {
+ return
+ }
+ if bestHeight > sm.chain.Height() {
+ sm.blockKeeper.BlockRequestWorker(peer.Key, bestHeight)
+ }
+}
+
+// txsyncLoop takes care of the initial transaction sync for each new
+// connection. When a new peer appears, we relay all currently pending
+// transactions. In order to minimise egress bandwidth usage, we send
+// the transactions in small packs to one peer at a time.
+func (sm *SyncManager) txsyncLoop() {
+ var (
+ pending = make(map[string]*txsync)
+ sending = false // whether a send is active
+ pack = new(txsync) // the pack that is being sent
+ done = make(chan error, 1) // result of the send
+ )
+
+ // send starts a sending a pack of transactions from the sync.
+ send := func(s *txsync) {
+ // Fill pack with transactions up to the target size.
+ size := common.StorageSize(0)
+ pack.p = s.p
+ pack.txs = pack.txs[:0]
+ for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
+ pack.txs = append(pack.txs, s.txs[i])
+ size += common.StorageSize(s.txs[i].SerializedSize)
+ }
+ // Remove the transactions that will be sent.
+ s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
+ if len(s.txs) == 0 {
+ delete(pending, s.p.Key)
+ }
+ // Send the pack in the background.
+ log.Info("Sending batch of transactions. ", "count:", len(pack.txs), " bytes:", size)
+ sending = true
+ go func() { done <- pack.p.SendTransactions(pack.txs) }()
+ }
+
+ // pick chooses the next pending sync.
+ pick := func() *txsync {
+ if len(pending) == 0 {
+ return nil
+ }
+ n := rand.Intn(len(pending)) + 1
+ for _, s := range pending {
+ if n--; n == 0 {
+ return s
+ }
+ }
+ return nil
+ }
+
+ for {
+ select {
+ case s := <-sm.txSyncCh:
+ pending[s.p.Key] = s
+ if !sending {
+ send(s)
+ }
+ case err := <-done:
+ sending = false
+ // Stop tracking peers that cause send failures.
+ if err != nil {
+ log.Info("Transaction send failed", "err", err)
+ delete(pending, pack.p.Key)
+ }
+ // Schedule the next send.
+ if s := pick(); s != nil {
+ send(s)
+ }
+ case <-sm.quitSync:
+ return
+ }
+ }
+}
"context"
"net/http"
_ "net/http/pprof"
- "strings"
"time"
log "github.com/sirupsen/logrus"
- "github.com/tendermint/go-crypto"
- "github.com/tendermint/go-wire"
cmn "github.com/tendermint/tmlibs/common"
dbm "github.com/tendermint/tmlibs/db"
browser "github.com/toqueteos/webbrowser"
"github.com/bytom/account"
"github.com/bytom/api"
"github.com/bytom/asset"
- bc "github.com/bytom/blockchain"
"github.com/bytom/blockchain/pseudohsm"
"github.com/bytom/blockchain/txfeed"
cfg "github.com/bytom/config"
"github.com/bytom/crypto/ed25519/chainkd"
"github.com/bytom/database/leveldb"
"github.com/bytom/env"
- "github.com/bytom/p2p"
+ "github.com/bytom/mining/cpuminer"
+ "github.com/bytom/mining/miningpool"
+ "github.com/bytom/netsync"
"github.com/bytom/protocol"
+ "github.com/bytom/protocol/bc"
"github.com/bytom/types"
- "github.com/bytom/version"
w "github.com/bytom/wallet"
)
const (
webAddress = "http://127.0.0.1:9888"
expireReservationsPeriod = time.Second
+ maxNewBlockChSize = 1024
)
type Node struct {
// config
config *cfg.Config
- // network
- privKey crypto.PrivKeyEd25519 // local node's p2p key
- sw *p2p.Switch // p2p connections
- addrBook *p2p.AddrBook // known peers
+ syncManager *netsync.SyncManager
- evsw types.EventSwitch // pub/sub for services
- bcReactor *bc.BlockchainReactor
+ evsw types.EventSwitch // pub/sub for services
+ //bcReactor *bc.BlockchainReactor
wallet *w.Wallet
accessTokens *accesstoken.CredentialStore
api *api.API
chain *protocol.Chain
+ txfeed *txfeed.Tracker
+ cpuMiner *cpuminer.CPUMiner
+ miningPool *miningpool.MiningPool
+ miningEnable bool
}
func NewNode(config *cfg.Config) *Node {
tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
accessTokens := accesstoken.NewStore(tokenDB)
- privKey := crypto.GenPrivKeyEd25519()
-
// Make event switch
eventSwitch := types.NewEventSwitch()
_, err := eventSwitch.Start()
cmn.Exit(cmn.Fmt("Failed to start switch: %v", err))
}
- trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
-
- sw := p2p.NewSwitch(config.P2P, trustHistoryDB)
-
genesisBlock := cfg.GenerateGenesisBlock()
txPool := protocol.NewTxPool()
// Clean up expired UTXO reservations periodically.
go accounts.ExpireReservations(ctx, expireReservationsPeriod)
}
+ newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
- bcReactor := bc.NewBlockchainReactor(chain, txPool, sw, accounts, txFeed, config.Mining)
-
- sw.AddReactor("BLOCKCHAIN", bcReactor)
-
- // Optionally, start the pex reactor
- var addrBook *p2p.AddrBook
- if config.P2P.PexReactor {
- addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
- pexReactor := p2p.NewPEXReactor(addrBook)
- sw.AddReactor("PEX", pexReactor)
- }
+ syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
// run the profile server
profileHost := config.ProfListenAddress
}
node := &Node{
- config: config,
-
- privKey: privKey,
- sw: sw,
- addrBook: addrBook,
-
+ config: config,
+ syncManager: syncManager,
evsw: eventSwitch,
- bcReactor: bcReactor,
accessTokens: accessTokens,
wallet: wallet,
chain: chain,
+ txfeed: txFeed,
+ miningEnable: config.Mining,
}
+
+ node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, newBlockCh)
+ node.miningPool = miningpool.NewMiningPool(chain, accounts, txPool, newBlockCh)
+
node.BaseService = *cmn.NewBaseService(nil, "Node", node)
return node
}
func (n *Node) initAndstartApiServer() {
- n.api = api.NewAPI(n.bcReactor, n.wallet, n.chain, n.config, n.accessTokens)
+ n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.miningPool, n.chain, n.config, n.accessTokens)
listenAddr := env.String("LISTEN", n.config.ApiAddress)
n.api.StartServer(*listenAddr)
}
func (n *Node) OnStart() error {
- // Create & add listener
- p, address := ProtocolAndAddress(n.config.P2P.ListenAddress)
- l := p2p.NewDefaultListener(p, address, n.config.P2P.SkipUPNP, nil)
- n.sw.AddListener(l)
-
- // Start the switch
- n.sw.SetNodeInfo(n.makeNodeInfo())
- n.sw.SetNodePrivKey(n.privKey)
- _, err := n.sw.Start()
- if err != nil {
- return err
+ if n.miningEnable {
+ n.cpuMiner.Start()
}
-
- // If seeds exist, add them to the address book and dial out
- if n.config.P2P.Seeds != "" {
- // dial out
- seeds := strings.Split(n.config.P2P.Seeds, ",")
- if err := n.DialSeeds(seeds); err != nil {
- return err
- }
- }
-
+ n.syncManager.Start()
n.initAndstartApiServer()
if !n.config.Web.Closed {
lanchWebBroser()
func (n *Node) OnStop() {
n.BaseService.OnStop()
-
+ if n.miningEnable {
+ n.cpuMiner.Stop()
+ }
+ n.syncManager.Stop()
log.Info("Stopping Node")
// TODO: gracefully disconnect from peers.
- n.sw.Stop()
-
}
func (n *Node) RunForever() {
})
}
-// Add a Listener to accept inbound peer connections.
-// Add listeners before starting the Node.
-// The first listener is the primary listener (in NodeInfo)
-func (n *Node) AddListener(l p2p.Listener) {
- n.sw.AddListener(l)
-}
-
-func (n *Node) Switch() *p2p.Switch {
- return n.sw
-}
-
func (n *Node) EventSwitch() types.EventSwitch {
return n.evsw
}
-func (n *Node) makeNodeInfo() *p2p.NodeInfo {
- nodeInfo := &p2p.NodeInfo{
- PubKey: n.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
- Moniker: n.config.Moniker,
- Network: "bytom",
- Version: version.Version,
- Other: []string{
- cmn.Fmt("wire_version=%v", wire.Version),
- cmn.Fmt("p2p_version=%v", p2p.Version),
- },
- }
-
- if !n.sw.IsListening() {
- return nodeInfo
- }
-
- p2pListener := n.sw.Listeners()[0]
- p2pHost := p2pListener.ExternalAddress().IP.String()
- p2pPort := p2pListener.ExternalAddress().Port
- //rpcListenAddr := n.config.RPC.ListenAddress
-
- // We assume that the rpcListener has the same ExternalAddress.
- // This is probably true because both P2P and RPC listeners use UPnP,
- // except of course if the rpc is only bound to localhost
- nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
- //nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))
- return nodeInfo
-}
-
-//------------------------------------------------------------------------------
-
-func (n *Node) NodeInfo() *p2p.NodeInfo {
- return n.sw.NodeInfo()
-}
-
-func (n *Node) DialSeeds(seeds []string) error {
- return n.sw.DialSeeds(n.addrBook, seeds)
+func (n *Node) SyncManager() *netsync.SyncManager {
+ return n.syncManager
}
-// Defaults to tcp
-func ProtocolAndAddress(listenAddr string) (string, string) {
- p, address := "tcp", listenAddr
- parts := strings.SplitN(address, "://", 2)
- if len(parts) == 2 {
- p, address = parts[0], parts[1]
- }
- return p, address
+func (n *Node) MiningPool() *miningpool.MiningPool {
+ return n.miningPool
}
//------------------------------------------------------------------------------
"math/rand"
"reflect"
"time"
+ "strings"
log "github.com/sirupsen/logrus"
wire "github.com/tendermint/go-wire"
cmn "github.com/tendermint/tmlibs/common"
+ "github.com/bytom/errors"
)
const (
// AddPeer implements Reactor by adding peer to the address book (if inbound)
// or by requesting more addresses (if outbound).
-func (r *PEXReactor) AddPeer(p *Peer) {
+func (r *PEXReactor) AddPeer(p *Peer) error {
if p.IsOutbound() {
// For outbound peers, the address is already in the books.
// Either it was added in DialSeeds or when we
"addr": p.ListenAddr,
"error": err,
}).Error("Error in AddPeer: Invalid peer address")
- return
+ return errors.New("Error in AddPeer: Invalid peer address")
}
r.book.AddAddress(addr, addr)
}
+ return nil
}
// RemovePeer implements Reactor.
}
_, alreadySelected := toDial[try.IP.String()]
alreadyDialing := r.Switch.IsDialing(try)
- alreadyConnected := r.Switch.Peers().Has(try.IP.String())
+ var alreadyConnected bool
+ for _, v := range r.Switch.Peers().list {
+ if strings.Compare(v.mconn.RemoteAddress.String(), try.String()) == 0 {
+ alreadyConnected = true
+ break
+ }
+ }
if alreadySelected || alreadyDialing || alreadyConnected {
continue
} else {
SetSwitch(*Switch)
GetChannels() []*ChannelDescriptor
- AddPeer(peer *Peer)
+ AddPeer(peer *Peer) error
RemovePeer(peer *Peer, reason interface{})
Receive(chID byte, peer *Peer, msgBytes []byte)
}
// Start peer
if sw.IsRunning() {
- sw.startInitPeer(peer)
+ if err := sw.startInitPeer(peer); err != nil {
+ return err
+ }
}
// Add the peer to .peers.
sw.filterConnByPubKey = f
}
-func (sw *Switch) startInitPeer(peer *Peer) {
+func (sw *Switch) startInitPeer(peer *Peer) error {
peer.Start() // spawn send/recv routines
for _, reactor := range sw.reactors {
- reactor.AddPeer(peer)
+ if err := reactor.AddPeer(peer); err != nil {
+ return err
+ }
}
+ return nil
}
// Dial a list of seeds asynchronously in random order
return
}
-func (sw *Switch) Peers() IPeerSet {
+func (sw *Switch) Peers() *PeerSet {
return sw.peers
}
return err
}
chainChanges[a.Height] = &attachBlock.ID
+ // TODO: put this action in better place
+ c.orphanManage.Delete(&attachBlock.ID)
}
return c.setState(block, utxoView, chainChanges)
blockHash := block.Hash()
if c.BlockExist(&blockHash) {
log.WithField("hash", blockHash.String()).Info("Skip process due to block already been handled")
- return false, nil
+ return c.orphanManage.BlockExist(&blockHash), nil
}
if !c.store.BlockExist(&block.PreviousBlockHash) {
c.orphanManage.Add(block)
// ValidateTx validates the given transaction. A cache holds
// per-transaction validation results and is consulted before
// performing full validation.
-func (c *Chain) ValidateTx(tx *types.Tx) error {
+func (c *Chain) ValidateTx(tx *types.Tx) (bool,error) {
newTx := tx.Tx
block := types.MapBlock(c.BestBlock())
if ok := c.txPool.HaveTransaction(&newTx.ID); ok {
- return c.txPool.GetErrCache(&newTx.ID)
+ return false, c.txPool.GetErrCache(&newTx.ID)
}
// validate the UTXO
view := c.txPool.GetTransactionUTXO(tx.Tx)
if err := c.GetTransactionsUtxo(view, []*bc.Tx{newTx}); err != nil {
c.txPool.AddErrCache(&newTx.ID, err)
- return err
+ return false, err
}
if err := view.ApplyTransaction(block, newTx, false); err != nil {
- c.txPool.AddErrCache(&newTx.ID, err)
- return err
+ return true, err
}
// validate the BVM contract
if err != nil {
if gasStatus == nil || !gasStatus.GasVaild {
c.txPool.AddErrCache(&newTx.ID, err)
- return err
+ return false, err
}
gasOnlyTx = true
}
_, err = c.txPool.AddTransaction(tx, gasOnlyTx, block.BlockHeader.Height, gasStatus.BTMValue)
- return err
+ return false, err
}
--- /dev/null
+The MIT License (MIT)
+
+Copyright (c) 2013 Fatih Arslan
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
--- /dev/null
+# Set [![GoDoc](http://img.shields.io/badge/go-documentation-blue.svg?style=flat-square)](https://godoc.org/gopkg.in/fatih/set.v0) [![Build Status](http://img.shields.io/travis/fatih/set.svg?style=flat-square)](https://travis-ci.org/fatih/set)
+
+Set is a basic and simple, hash-based, **Set** data structure implementation
+in Go (Golang).
+
+Set provides both threadsafe and non-threadsafe implementations of a generic
+set data structure. The thread safety encompasses all operations on one set.
+Operations on multiple sets are consistent in that the elements of each set
+used was valid at exactly one point in time between the start and the end of
+the operation. Because it's thread safe, you can use it concurrently with your
+goroutines.
+
+For usage see examples below or click on the godoc badge.
+
+## Install and Usage
+
+Install the package with:
+
+```bash
+go get gopkg.in/fatih/set.v0
+```
+
+Import it with:
+
+```go
+import "gopkg.in/fatih/set.v0"
+```
+
+and use `set` as the package name inside the code.
+
+## Examples
+
+#### Initialization of a new Set
+
+```go
+
+// create a set with zero items
+s := set.New()
+s := set.NewNonTS() // non thread-safe version
+
+// ... or with some initial values
+s := set.New("istanbul", "frankfurt", 30.123, "san francisco", 1234)
+s := set.NewNonTS("kenya", "ethiopia", "sumatra")
+
+```
+
+#### Basic Operations
+
+```go
+// add items
+s.Add("istanbul")
+s.Add("istanbul") // nothing happens if you add duplicate item
+
+// add multiple items
+s.Add("ankara", "san francisco", 3.14)
+
+// remove item
+s.Remove("frankfurt")
+s.Remove("frankfurt") // nothing happes if you remove a nonexisting item
+
+// remove multiple items
+s.Remove("barcelona", 3.14, "ankara")
+
+// removes an arbitary item and return it
+item := s.Pop()
+
+// create a new copy
+other := s.Copy()
+
+// remove all items
+s.Clear()
+
+// number of items in the set
+len := s.Size()
+
+// return a list of items
+items := s.List()
+
+// string representation of set
+fmt.Printf("set is %s", s.String())
+
+```
+
+#### Check Operations
+
+```go
+// check for set emptiness, returns true if set is empty
+s.IsEmpty()
+
+// check for a single item exist
+s.Has("istanbul")
+
+// ... or for multiple items. This will return true if all of the items exist.
+s.Has("istanbul", "san francisco", 3.14)
+
+// create two sets for the following checks...
+s := s.New("1", "2", "3", "4", "5")
+t := s.New("1", "2", "3")
+
+
+// check if they are the same
+if !s.IsEqual(t) {
+ fmt.Println("s is not equal to t")
+}
+
+// if s contains all elements of t
+if s.IsSubset(t) {
+ fmt.Println("t is a subset of s")
+}
+
+// ... or if s is a superset of t
+if t.IsSuperset(s) {
+ fmt.Println("s is a superset of t")
+}
+
+
+```
+
+#### Set Operations
+
+
+```go
+// let us initialize two sets with some values
+a := set.New("ankara", "berlin", "san francisco")
+b := set.New("frankfurt", "berlin")
+
+// creates a new set with the items in a and b combined.
+// [frankfurt, berlin, ankara, san francisco]
+c := set.Union(a, b)
+
+// contains items which is in both a and b
+// [berlin]
+c := set.Intersection(a, b)
+
+// contains items which are in a but not in b
+// [ankara, san francisco]
+c := set.Difference(a, b)
+
+// contains items which are in one of either, but not in both.
+// [frankfurt, ankara, san francisco]
+c := set.SymmetricDifference(a, b)
+
+```
+
+```go
+// like Union but saves the result back into a.
+a.Merge(b)
+
+// removes the set items which are in b from a and saves the result back into a.
+a.Separate(b)
+
+```
+
+#### Multiple Set Operations
+
+```go
+a := set.New("1", "3", "4", "5")
+b := set.New("2", "3", "4", "5")
+c := set.New("4", "5", "6", "7")
+
+// creates a new set with items in a, b and c
+// [1 2 3 4 5 6 7]
+u := set.Union(a, b, c)
+
+// creates a new set with items in a but not in b and c
+// [1]
+u := set.Difference(a, b, c)
+
+// creates a new set with items that are common to a, b and c
+// [5]
+u := set.Intersection(a, b, c)
+```
+
+#### Helper methods
+
+The Slice functions below are a convenient way to extract or convert your Set data
+into basic data types.
+
+
+```go
+// create a set of mixed types
+s := set.New("ankara", "5", "8", "san francisco", 13, 21)
+
+
+// convert s into a slice of strings (type is []string)
+// [ankara 5 8 san francisco]
+t := set.StringSlice(s)
+
+
+// u contains a slice of ints (type is []int)
+// [13, 21]
+u := set.IntSlice(s)
+
+```
+
+#### Concurrent safe usage
+
+Below is an example of a concurrent way that uses set. We call ten functions
+concurrently and wait until they are finished. It basically creates a new
+string for each goroutine and adds it to our set.
+
+```go
+package main
+
+import (
+ "fmt"
+ "github.com/fatih/set"
+ "strconv"
+ "sync"
+)
+
+func main() {
+ var wg sync.WaitGroup // this is just for waiting until all goroutines finish
+
+ // Initialize our thread safe Set
+ s := set.New()
+
+ // Add items concurrently (item1, item2, and so on)
+ for i := 0; i < 10; i++ {
+ wg.Add(1)
+ go func(i int) {
+ item := "item" + strconv.Itoa(i)
+ fmt.Println("adding", item)
+ s.Add(item)
+ wg.Done()
+ }(i)
+ }
+
+ // Wait until all concurrent calls finished and print our set
+ wg.Wait()
+ fmt.Println(s)
+}
+```
+
+## Credits
+
+ * [Fatih Arslan](https://github.com/fatih)
+ * [Arne Hormann](https://github.com/arnehormann)
+ * [Sam Boyer](https://github.com/sdboyer)
+ * [Ralph Loizzo](https://github.com/friartech)
+
+## License
+
+The MIT License (MIT) - see LICENSE.md for more details
+
--- /dev/null
+// Package set provides both threadsafe and non-threadsafe implementations of
+// a generic set data structure. In the threadsafe set, safety encompasses all
+// operations on one set. Operations on multiple sets are consistent in that
+// the elements of each set used was valid at exactly one point in time
+// between the start and the end of the operation.
+package set
+
+// Interface is describing a Set. Sets are an unordered, unique list of values.
+type Interface interface {
+ New(items ...interface{}) Interface
+ Add(items ...interface{})
+ Remove(items ...interface{})
+ Pop() interface{}
+ Has(items ...interface{}) bool
+ Size() int
+ Clear()
+ IsEmpty() bool
+ IsEqual(s Interface) bool
+ IsSubset(s Interface) bool
+ IsSuperset(s Interface) bool
+ Each(func(interface{}) bool)
+ String() string
+ List() []interface{}
+ Copy() Interface
+ Merge(s Interface)
+ Separate(s Interface)
+}
+
+// helpful to not write everywhere struct{}{}
+var keyExists = struct{}{}
+
+// Union is the merger of multiple sets. It returns a new set with all the
+// elements present in all the sets that are passed.
+//
+// The dynamic type of the returned set is determined by the first passed set's
+// implementation of the New() method.
+func Union(set1, set2 Interface, sets ...Interface) Interface {
+ u := set1.Copy()
+ set2.Each(func(item interface{}) bool {
+ u.Add(item)
+ return true
+ })
+ for _, set := range sets {
+ set.Each(func(item interface{}) bool {
+ u.Add(item)
+ return true
+ })
+ }
+
+ return u
+}
+
+// Difference returns a new set which contains items which are in in the first
+// set but not in the others. Unlike the Difference() method you can use this
+// function separately with multiple sets.
+func Difference(set1, set2 Interface, sets ...Interface) Interface {
+ s := set1.Copy()
+ s.Separate(set2)
+ for _, set := range sets {
+ s.Separate(set) // seperate is thread safe
+ }
+ return s
+}
+
+// Intersection returns a new set which contains items that only exist in all given sets.
+func Intersection(set1, set2 Interface, sets ...Interface) Interface {
+ all := Union(set1, set2, sets...)
+ result := Union(set1, set2, sets...)
+
+ all.Each(func(item interface{}) bool {
+ if !set1.Has(item) || !set2.Has(item) {
+ result.Remove(item)
+ }
+
+ for _, set := range sets {
+ if !set.Has(item) {
+ result.Remove(item)
+ }
+ }
+ return true
+ })
+ return result
+}
+
+// SymmetricDifference returns a new set which s is the difference of items which are in
+// one of either, but not in both.
+func SymmetricDifference(s Interface, t Interface) Interface {
+ u := Difference(s, t)
+ v := Difference(t, s)
+ return Union(u, v)
+}
+
+// StringSlice is a helper function that returns a slice of strings of s. If
+// the set contains mixed types of items only items of type string are returned.
+func StringSlice(s Interface) []string {
+ slice := make([]string, 0)
+ for _, item := range s.List() {
+ v, ok := item.(string)
+ if !ok {
+ continue
+ }
+
+ slice = append(slice, v)
+ }
+ return slice
+}
+
+// IntSlice is a helper function that returns a slice of ints of s. If
+// the set contains mixed types of items only items of type int are returned.
+func IntSlice(s Interface) []int {
+ slice := make([]int, 0)
+ for _, item := range s.List() {
+ v, ok := item.(int)
+ if !ok {
+ continue
+ }
+
+ slice = append(slice, v)
+ }
+ return slice
+}
--- /dev/null
+package set
+
+import (
+ "fmt"
+ "strings"
+)
+
+// Provides a common set baseline for both threadsafe and non-ts Sets.
+type set struct {
+ m map[interface{}]struct{} // struct{} doesn't take up space
+}
+
+// SetNonTS defines a non-thread safe set data structure.
+type SetNonTS struct {
+ set
+}
+
+// NewNonTS creates and initialize a new non-threadsafe Set.
+// It accepts a variable number of arguments to populate the initial set.
+// If nothing is passed a SetNonTS with zero size is created.
+func NewNonTS(items ...interface{}) *SetNonTS {
+ s := &SetNonTS{}
+ s.m = make(map[interface{}]struct{})
+
+ // Ensure interface compliance
+ var _ Interface = s
+
+ s.Add(items...)
+ return s
+}
+
+// New creates and initalizes a new Set interface. It accepts a variable
+// number of arguments to populate the initial set. If nothing is passed a
+// zero size Set based on the struct is created.
+func (s *set) New(items ...interface{}) Interface {
+ return NewNonTS(items...)
+}
+
+// Add includes the specified items (one or more) to the set. The underlying
+// Set s is modified. If passed nothing it silently returns.
+func (s *set) Add(items ...interface{}) {
+ if len(items) == 0 {
+ return
+ }
+
+ for _, item := range items {
+ s.m[item] = keyExists
+ }
+}
+
+// Remove deletes the specified items from the set. The underlying Set s is
+// modified. If passed nothing it silently returns.
+func (s *set) Remove(items ...interface{}) {
+ if len(items) == 0 {
+ return
+ }
+
+ for _, item := range items {
+ delete(s.m, item)
+ }
+}
+
+// Pop deletes and return an item from the set. The underlying Set s is
+// modified. If set is empty, nil is returned.
+func (s *set) Pop() interface{} {
+ for item := range s.m {
+ delete(s.m, item)
+ return item
+ }
+ return nil
+}
+
+// Has looks for the existence of items passed. It returns false if nothing is
+// passed. For multiple items it returns true only if all of the items exist.
+func (s *set) Has(items ...interface{}) bool {
+ // assume checked for empty item, which not exist
+ if len(items) == 0 {
+ return false
+ }
+
+ has := true
+ for _, item := range items {
+ if _, has = s.m[item]; !has {
+ break
+ }
+ }
+ return has
+}
+
+// Size returns the number of items in a set.
+func (s *set) Size() int {
+ return len(s.m)
+}
+
+// Clear removes all items from the set.
+func (s *set) Clear() {
+ s.m = make(map[interface{}]struct{})
+}
+
+// IsEmpty reports whether the Set is empty.
+func (s *set) IsEmpty() bool {
+ return s.Size() == 0
+}
+
+// IsEqual test whether s and t are the same in size and have the same items.
+func (s *set) IsEqual(t Interface) bool {
+ // Force locking only if given set is threadsafe.
+ if conv, ok := t.(*Set); ok {
+ conv.l.RLock()
+ defer conv.l.RUnlock()
+ }
+
+ // return false if they are no the same size
+ if sameSize := len(s.m) == t.Size(); !sameSize {
+ return false
+ }
+
+ equal := true
+ t.Each(func(item interface{}) bool {
+ _, equal = s.m[item]
+ return equal // if false, Each() will end
+ })
+
+ return equal
+}
+
+// IsSubset tests whether t is a subset of s.
+func (s *set) IsSubset(t Interface) (subset bool) {
+ subset = true
+
+ t.Each(func(item interface{}) bool {
+ _, subset = s.m[item]
+ return subset
+ })
+
+ return
+}
+
+// IsSuperset tests whether t is a superset of s.
+func (s *set) IsSuperset(t Interface) bool {
+ return t.IsSubset(s)
+}
+
+// Each traverses the items in the Set, calling the provided function for each
+// set member. Traversal will continue until all items in the Set have been
+// visited, or if the closure returns false.
+func (s *set) Each(f func(item interface{}) bool) {
+ for item := range s.m {
+ if !f(item) {
+ break
+ }
+ }
+}
+
+// String returns a string representation of s
+func (s *set) String() string {
+ t := make([]string, 0, len(s.List()))
+ for _, item := range s.List() {
+ t = append(t, fmt.Sprintf("%v", item))
+ }
+
+ return fmt.Sprintf("[%s]", strings.Join(t, ", "))
+}
+
+// List returns a slice of all items. There is also StringSlice() and
+// IntSlice() methods for returning slices of type string or int.
+func (s *set) List() []interface{} {
+ list := make([]interface{}, 0, len(s.m))
+
+ for item := range s.m {
+ list = append(list, item)
+ }
+
+ return list
+}
+
+// Copy returns a new Set with a copy of s.
+func (s *set) Copy() Interface {
+ return NewNonTS(s.List()...)
+}
+
+// Merge is like Union, however it modifies the current set it's applied on
+// with the given t set.
+func (s *set) Merge(t Interface) {
+ t.Each(func(item interface{}) bool {
+ s.m[item] = keyExists
+ return true
+ })
+}
+
+// it's not the opposite of Merge.
+// Separate removes the set items containing in t from set s. Please aware that
+func (s *set) Separate(t Interface) {
+ s.Remove(t.List()...)
+}
--- /dev/null
+package set
+
+import (
+ "sync"
+)
+
+// Set defines a thread safe set data structure.
+type Set struct {
+ set
+ l sync.RWMutex // we name it because we don't want to expose it
+}
+
+// New creates and initialize a new Set. It's accept a variable number of
+// arguments to populate the initial set. If nothing passed a Set with zero
+// size is created.
+func New(items ...interface{}) *Set {
+ s := &Set{}
+ s.m = make(map[interface{}]struct{})
+
+ // Ensure interface compliance
+ var _ Interface = s
+
+ s.Add(items...)
+ return s
+}
+
+// New creates and initalizes a new Set interface. It accepts a variable
+// number of arguments to populate the initial set. If nothing is passed a
+// zero size Set based on the struct is created.
+func (s *Set) New(items ...interface{}) Interface {
+ return New(items...)
+}
+
+// Add includes the specified items (one or more) to the set. The underlying
+// Set s is modified. If passed nothing it silently returns.
+func (s *Set) Add(items ...interface{}) {
+ if len(items) == 0 {
+ return
+ }
+
+ s.l.Lock()
+ defer s.l.Unlock()
+
+ for _, item := range items {
+ s.m[item] = keyExists
+ }
+}
+
+// Remove deletes the specified items from the set. The underlying Set s is
+// modified. If passed nothing it silently returns.
+func (s *Set) Remove(items ...interface{}) {
+ if len(items) == 0 {
+ return
+ }
+
+ s.l.Lock()
+ defer s.l.Unlock()
+
+ for _, item := range items {
+ delete(s.m, item)
+ }
+}
+
+// Pop deletes and return an item from the set. The underlying Set s is
+// modified. If set is empty, nil is returned.
+func (s *Set) Pop() interface{} {
+ s.l.RLock()
+ for item := range s.m {
+ s.l.RUnlock()
+ s.l.Lock()
+ delete(s.m, item)
+ s.l.Unlock()
+ return item
+ }
+ s.l.RUnlock()
+ return nil
+}
+
+// Has looks for the existence of items passed. It returns false if nothing is
+// passed. For multiple items it returns true only if all of the items exist.
+func (s *Set) Has(items ...interface{}) bool {
+ // assume checked for empty item, which not exist
+ if len(items) == 0 {
+ return false
+ }
+
+ s.l.RLock()
+ defer s.l.RUnlock()
+
+ has := true
+ for _, item := range items {
+ if _, has = s.m[item]; !has {
+ break
+ }
+ }
+ return has
+}
+
+// Size returns the number of items in a set.
+func (s *Set) Size() int {
+ s.l.RLock()
+ defer s.l.RUnlock()
+
+ l := len(s.m)
+ return l
+}
+
+// Clear removes all items from the set.
+func (s *Set) Clear() {
+ s.l.Lock()
+ defer s.l.Unlock()
+
+ s.m = make(map[interface{}]struct{})
+}
+
+// IsEqual test whether s and t are the same in size and have the same items.
+func (s *Set) IsEqual(t Interface) bool {
+ s.l.RLock()
+ defer s.l.RUnlock()
+
+ // Force locking only if given set is threadsafe.
+ if conv, ok := t.(*Set); ok {
+ conv.l.RLock()
+ defer conv.l.RUnlock()
+ }
+
+ // return false if they are no the same size
+ if sameSize := len(s.m) == t.Size(); !sameSize {
+ return false
+ }
+
+ equal := true
+ t.Each(func(item interface{}) bool {
+ _, equal = s.m[item]
+ return equal // if false, Each() will end
+ })
+
+ return equal
+}
+
+// IsSubset tests whether t is a subset of s.
+func (s *Set) IsSubset(t Interface) (subset bool) {
+ s.l.RLock()
+ defer s.l.RUnlock()
+
+ subset = true
+
+ t.Each(func(item interface{}) bool {
+ _, subset = s.m[item]
+ return subset
+ })
+
+ return
+}
+
+// Each traverses the items in the Set, calling the provided function for each
+// set member. Traversal will continue until all items in the Set have been
+// visited, or if the closure returns false.
+func (s *Set) Each(f func(item interface{}) bool) {
+ s.l.RLock()
+ defer s.l.RUnlock()
+
+ for item := range s.m {
+ if !f(item) {
+ break
+ }
+ }
+}
+
+// List returns a slice of all items. There is also StringSlice() and
+// IntSlice() methods for returning slices of type string or int.
+func (s *Set) List() []interface{} {
+ s.l.RLock()
+ defer s.l.RUnlock()
+
+ list := make([]interface{}, 0, len(s.m))
+
+ for item := range s.m {
+ list = append(list, item)
+ }
+
+ return list
+}
+
+// Copy returns a new Set with a copy of s.
+func (s *Set) Copy() Interface {
+ return New(s.List()...)
+}
+
+// Merge is like Union, however it modifies the current set it's applied on
+// with the given t set.
+func (s *Set) Merge(t Interface) {
+ s.l.Lock()
+ defer s.l.Unlock()
+
+ t.Each(func(item interface{}) bool {
+ s.m[item] = keyExists
+ return true
+ })
+}
--- /dev/null
+// CookieJar - A contestant's algorithm toolbox
+// Copyright (c) 2013 Peter Szilagyi. All rights reserved.
+//
+// CookieJar is dual licensed: use of this source code is governed by a BSD
+// license that can be found in the LICENSE file. Alternatively, the CookieJar
+// toolbox may be used in accordance with the terms and conditions contained
+// in a signed written agreement between you and the author(s).
+
+// Package prque implements a priority queue data structure supporting arbitrary
+// value types and float priorities.
+//
+// The reasoning behind using floats for the priorities vs. ints or interfaces
+// was larger flexibility without sacrificing too much performance or code
+// complexity.
+//
+// If you would like to use a min-priority queue, simply negate the priorities.
+//
+// Internally the queue is based on the standard heap package working on a
+// sortable version of the block based stack.
+package prque
+
+import (
+ "container/heap"
+)
+
+// Priority queue data structure.
+type Prque struct {
+ cont *sstack
+}
+
+// Creates a new priority queue.
+func New() *Prque {
+ return &Prque{newSstack()}
+}
+
+// Pushes a value with a given priority into the queue, expanding if necessary.
+func (p *Prque) Push(data interface{}, priority float32) {
+ heap.Push(p.cont, &item{data, priority})
+}
+
+// Pops the value with the greates priority off the stack and returns it.
+// Currently no shrinking is done.
+func (p *Prque) Pop() (interface{}, float32) {
+ item := heap.Pop(p.cont).(*item)
+ return item.value, item.priority
+}
+
+// Pops only the item from the queue, dropping the associated priority value.
+func (p *Prque) PopItem() interface{} {
+ return heap.Pop(p.cont).(*item).value
+}
+
+// Checks whether the priority queue is empty.
+func (p *Prque) Empty() bool {
+ return p.cont.Len() == 0
+}
+
+// Returns the number of element in the priority queue.
+func (p *Prque) Size() int {
+ return p.cont.Len()
+}
+
+// Clears the contents of the priority queue.
+func (p *Prque) Reset() {
+ *p = *New()
+}
--- /dev/null
+// CookieJar - A contestant's algorithm toolbox
+// Copyright (c) 2013 Peter Szilagyi. All rights reserved.
+//
+// CookieJar is dual licensed: use of this source code is governed by a BSD
+// license that can be found in the LICENSE file. Alternatively, the CookieJar
+// toolbox may be used in accordance with the terms and conditions contained
+// in a signed written agreement between you and the author(s).
+
+package prque
+
+// The size of a block of data
+const blockSize = 4096
+
+// A prioritized item in the sorted stack.
+type item struct {
+ value interface{}
+ priority float32
+}
+
+// Internal sortable stack data structure. Implements the Push and Pop ops for
+// the stack (heap) functionality and the Len, Less and Swap methods for the
+// sortability requirements of the heaps.
+type sstack struct {
+ size int
+ capacity int
+ offset int
+
+ blocks [][]*item
+ active []*item
+}
+
+// Creates a new, empty stack.
+func newSstack() *sstack {
+ result := new(sstack)
+ result.active = make([]*item, blockSize)
+ result.blocks = [][]*item{result.active}
+ result.capacity = blockSize
+ return result
+}
+
+// Pushes a value onto the stack, expanding it if necessary. Required by
+// heap.Interface.
+func (s *sstack) Push(data interface{}) {
+ if s.size == s.capacity {
+ s.active = make([]*item, blockSize)
+ s.blocks = append(s.blocks, s.active)
+ s.capacity += blockSize
+ s.offset = 0
+ } else if s.offset == blockSize {
+ s.active = s.blocks[s.size/blockSize]
+ s.offset = 0
+ }
+ s.active[s.offset] = data.(*item)
+ s.offset++
+ s.size++
+}
+
+// Pops a value off the stack and returns it. Currently no shrinking is done.
+// Required by heap.Interface.
+func (s *sstack) Pop() (res interface{}) {
+ s.size--
+ s.offset--
+ if s.offset < 0 {
+ s.offset = blockSize - 1
+ s.active = s.blocks[s.size/blockSize]
+ }
+ res, s.active[s.offset] = s.active[s.offset], nil
+ return
+}
+
+// Returns the length of the stack. Required by sort.Interface.
+func (s *sstack) Len() int {
+ return s.size
+}
+
+// Compares the priority of two elements of the stack (higher is first).
+// Required by sort.Interface.
+func (s *sstack) Less(i, j int) bool {
+ return s.blocks[i/blockSize][i%blockSize].priority > s.blocks[j/blockSize][j%blockSize].priority
+}
+
+// Swaps two elements in the stack. Required by sort.Interface.
+func (s *sstack) Swap(i, j int) {
+ ib, io, jb, jo := i/blockSize, i%blockSize, j/blockSize, j%blockSize
+ s.blocks[ib][io], s.blocks[jb][jo] = s.blocks[jb][jo], s.blocks[ib][io]
+}
+
+// Resets the stack, effectively clearing its contents.
+func (s *sstack) Reset() {
+ *s = *newSstack()
+}