package api
import (
- "net"
"context"
+ "net"
- "github.com/bytom/version"
- "github.com/bytom/netsync"
"github.com/bytom/errors"
+ "github.com/bytom/netsync"
"github.com/bytom/p2p"
+ "github.com/bytom/version"
)
// NetInfo indicate net information
func (a *API) GetNodeInfo() *NetInfo {
info := &NetInfo{
Listening: a.sync.Switch().IsListening(),
- Syncing: a.sync.BlockKeeper().IsCaughtUp(),
+ Syncing: !a.sync.IsCaughtUp(),
Mining: a.cpuMiner.IsMining(),
PeerCount: len(a.sync.Switch().Peers().List()),
CurrentBlock: a.chain.BestBlockHeight(),
NetWorkID: a.sync.NodeInfo().Network,
Version: version.Version,
}
- _, info.HighestBlock = a.sync.Peers().BestPeer()
+ if bestPeer := a.sync.BestPeer(); bestPeer != nil {
+ info.HighestBlock = bestPeer.Height
+ }
if info.CurrentBlock > info.HighestBlock {
info.HighestBlock = info.CurrentBlock
}
return info
}
-
// return the currently connected peers with net address
func (a *API) getPeerInfoByAddr(addr string) *netsync.PeerInfo {
- peerInfos := a.sync.Peers().GetPeerInfos()
+ peerInfos := a.sync.GetPeerInfos()
for _, peerInfo := range peerInfos {
if peerInfo.RemoteAddr == addr {
return peerInfo
}
// disconnect peer by the peer id
-func (a *API) disconnectPeerById(peerId string) error {
- if peer, ok := a.sync.Peers().Peer(peerId); ok {
- swPeer := peer.GetPeer()
- a.sync.Switch().StopPeerGracefully(swPeer)
- return nil
- }
- return errors.New("peerId not exist")
+func (a *API) disconnectPeerById(peerID string) error {
+ return a.sync.StopPeer(peerID)
}
// connect peer b y net address
// return the peers of current node
func (a *API) listPeers() Response {
- return NewSuccessResponse(a.sync.Peers().GetPeerInfos())
+ return NewSuccessResponse(a.sync.GetPeerInfos())
}
// disconnect peer
h.SetBytes(b)
return h
}
+
func StringToHash(s string) Hash { return BytesToHash([]byte(s)) }
func BigToHash(b *big.Int) Hash { return BytesToHash(b.Bytes()) }
func HexToHash(s string) Hash { return BytesToHash(FromHex(s)) }
var TestNetParams = Params{
Name: "test",
Bech32HRPSegwit: "tm",
- Checkpoints: []Checkpoint{},
+ Checkpoints: []Checkpoint{
+ {1000, bc.NewHash([32]byte{0x28, 0x29, 0xc9, 0xe2, 0x19, 0x0f, 0xfe, 0xb6, 0xf2, 0x73, 0xde, 0x1a, 0xe8, 0x1f, 0xa6, 0xbc, 0x15, 0xaa, 0x08, 0xa9, 0xb8, 0x4c, 0x43, 0x25, 0x9a, 0xa0, 0x24, 0x8a, 0xd8, 0x55, 0x73, 0xca})},
+ {1500, bc.NewHash([32]byte{0xc2, 0x94, 0x02, 0xd1, 0x6c, 0xa8, 0x18, 0xc4, 0x95, 0x67, 0x48, 0xb8, 0xa8, 0x42, 0xa2, 0x69, 0x8e, 0xdd, 0xb2, 0xa9, 0xb6, 0xd9, 0x30, 0xf4, 0x12, 0xdb, 0x0f, 0x1e, 0x58, 0xc9, 0x69, 0x3b})},
+ {10303, bc.NewHash([32]byte{0x3e, 0x94, 0x5d, 0x35, 0x70, 0x30, 0xd4, 0x3b, 0x3d, 0xe3, 0xdd, 0x80, 0x67, 0x29, 0x9a, 0x5e, 0x09, 0xf9, 0xfb, 0x2b, 0xad, 0x5f, 0x92, 0xc8, 0x69, 0xd1, 0x42, 0x39, 0x74, 0x9a, 0xd1, 0x1c})},
+ },
}
// SoloNetParams is the config for test-net
b.Transactions = []*types.Tx{nil}
txs := txPool.GetTransactions()
- sort.Sort(ByTime(txs))
+ sort.Sort(byTime(txs))
for _, txDesc := range txs {
tx := txDesc.Tx.Tx
gasOnlyTx := false
import "github.com/bytom/protocol"
-type ByTime []*protocol.TxDesc
+type byTime []*protocol.TxDesc
-func (a ByTime) Len() int { return len(a) }
-func (a ByTime) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
-func (a ByTime) Less(i, j int) bool { return a[i].Added.Unix() < a[j].Added.Unix() }
+func (a byTime) Len() int { return len(a) }
+func (a byTime) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
+func (a byTime) Less(i, j int) bool { return a[i].Added.Unix() < a[j].Added.Unix() }
a.lruCache.Add(*key, result)
}
+// RemoveCache clean the cached result
+func (a *Cache) RemoveCache(hash, seed *bc.Hash) {
+ key := calcCacheKey(hash, seed)
+ a.lruCache.Remove(key)
+}
+
// Hash is the real entry for call tensority algorithm
func (a *Cache) Hash(hash, seed *bc.Hash) *bc.Hash {
key := calcCacheKey(hash, seed)
--- /dev/null
+package netsync
+
+import (
+ log "github.com/sirupsen/logrus"
+ "gopkg.in/karalabe/cookiejar.v2/collections/prque"
+
+ "github.com/bytom/protocol"
+ "github.com/bytom/protocol/bc"
+)
+
+const (
+ maxBlockDistance = 64
+ maxMsgSetSize = 128
+ newBlockChSize = 64
+)
+
+// blockFetcher is responsible for accumulating block announcements from various peers
+// and scheduling them for retrieval.
+type blockFetcher struct {
+ chain *protocol.Chain
+ peers *peerSet
+
+ newBlockCh chan *blockMsg
+ queue *prque.Prque
+ msgSet map[bc.Hash]*blockMsg
+}
+
+//NewBlockFetcher creates a block fetcher to retrieve blocks of the new mined.
+func newBlockFetcher(chain *protocol.Chain, peers *peerSet) *blockFetcher {
+ f := &blockFetcher{
+ chain: chain,
+ peers: peers,
+ newBlockCh: make(chan *blockMsg, newBlockChSize),
+ queue: prque.New(),
+ msgSet: make(map[bc.Hash]*blockMsg),
+ }
+ go f.blockProcessor()
+ return f
+}
+
+func (f *blockFetcher) blockProcessor() {
+ for {
+ height := f.chain.BestBlockHeight()
+ for !f.queue.Empty() {
+ msg := f.queue.PopItem().(*blockMsg)
+ if msg.block.Height > height+1 {
+ f.queue.Push(msg, -float32(msg.block.Height))
+ break
+ }
+
+ f.insert(msg)
+ delete(f.msgSet, msg.block.Hash())
+ }
+ f.add(<-f.newBlockCh)
+ }
+}
+
+func (f *blockFetcher) add(msg *blockMsg) {
+ bestHeight := f.chain.BestBlockHeight()
+ if len(f.msgSet) > maxMsgSetSize || bestHeight > msg.block.Height || msg.block.Height-bestHeight > maxBlockDistance {
+ return
+ }
+
+ blockHash := msg.block.Hash()
+ if _, ok := f.msgSet[blockHash]; !ok {
+ f.msgSet[blockHash] = msg
+ f.queue.Push(msg, -float32(msg.block.Height))
+ log.WithField("block height", msg.block.Height).Debug("fetcher receive mine block")
+ }
+}
+
+func (f *blockFetcher) insert(msg *blockMsg) {
+ if _, err := f.chain.ProcessBlock(msg.block); err != nil {
+ peer := f.peers.getPeer(msg.peerID)
+ if peer == nil {
+ return
+ }
+
+ f.peers.addBanScore(msg.peerID, 20, 0, err.Error())
+ return
+ }
+
+ if err := f.peers.broadcastMinedBlock(msg.block); err != nil {
+ log.WithField("err", err).Error("fail on fetcher broadcast new block")
+ return
+ }
+}
+
+func (f *blockFetcher) processNewBlock(msg *blockMsg) {
+ f.newBlockCh <- msg
+}
package netsync
import (
+ "container/list"
"time"
log "github.com/sirupsen/logrus"
"github.com/bytom/consensus"
"github.com/bytom/errors"
- "github.com/bytom/p2p"
+ "github.com/bytom/mining/tensority"
"github.com/bytom/protocol"
+ "github.com/bytom/protocol/bc"
"github.com/bytom/protocol/bc/types"
)
const (
- maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
- maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
-
- syncTimeout = 30 * time.Second
- requestRetryTicker = 15 * time.Second
-
- maxBlocksPending = 1024
- maxtxsPending = 32768
- maxQuitReq = 256
-
- maxTxChanSize = 10000 // txChanSize is the size of channel listening to Txpool newTxCh
+ syncTimeout = 30 * time.Second
+ syncCycle = 5 * time.Second
+ blockProcessChSize = 1024
+ blocksProcessChSize = 128
+ headersProcessChSize = 1024
+ maxBlockPerMsg = 128
+ maxBlockHeadersPerMsg = 2048
)
var (
- errGetBlockTimeout = errors.New("Get block Timeout")
- errPeerDropped = errors.New("Peer dropped")
- errGetBlockByHash = errors.New("Get block by hash error")
- errBroadcastStatus = errors.New("Broadcast new status block error")
- errReqBlock = errors.New("Request block error")
- errPeerNotRegister = errors.New("peer is not registered")
+ errRequestTimeout = errors.New("request timeout")
+ errPeerDropped = errors.New("Peer dropped")
+ errPeerMisbehave = errors.New("peer is misbehave")
)
-//TODO: add retry mechanism
+type blockMsg struct {
+ block *types.Block
+ peerID string
+}
+
+type blocksMsg struct {
+ blocks []*types.Block
+ peerID string
+}
+
+type headersMsg struct {
+ headers []*types.BlockHeader
+ peerID string
+}
+
type blockKeeper struct {
chain *protocol.Chain
- sw *p2p.Switch
peers *peerSet
- pendingProcessCh chan *blockPending
- txsProcessCh chan *txsNotify
- quitReqBlockCh chan *string
+ syncPeer *peer
+ blockProcessCh chan *blockMsg
+ blocksProcessCh chan *blocksMsg
+ headersProcessCh chan *headersMsg
+
+ headerList *list.List
}
-func newBlockKeeper(chain *protocol.Chain, sw *p2p.Switch, peers *peerSet, quitReqBlockCh chan *string) *blockKeeper {
+func newBlockKeeper(chain *protocol.Chain, peers *peerSet) *blockKeeper {
bk := &blockKeeper{
chain: chain,
- sw: sw,
peers: peers,
- pendingProcessCh: make(chan *blockPending, maxBlocksPending),
- txsProcessCh: make(chan *txsNotify, maxtxsPending),
- quitReqBlockCh: quitReqBlockCh,
+ blockProcessCh: make(chan *blockMsg, blockProcessChSize),
+ blocksProcessCh: make(chan *blocksMsg, blocksProcessChSize),
+ headersProcessCh: make(chan *headersMsg, headersProcessChSize),
+ headerList: list.New(),
}
- go bk.txsProcessWorker()
+ bk.resetHeaderState()
+ go bk.syncWorker()
return bk
}
-func (bk *blockKeeper) AddBlock(block *types.Block, peerID string) {
- bk.pendingProcessCh <- &blockPending{block: block, peerID: peerID}
+func (bk *blockKeeper) appendHeaderList(headers []*types.BlockHeader) error {
+ for _, header := range headers {
+ prevHeader := bk.headerList.Back().Value.(*types.BlockHeader)
+ if prevHeader.Hash() != header.PreviousBlockHash {
+ return errors.New("fail to append list due to order dismatch")
+ }
+ bk.headerList.PushBack(header)
+ }
+ return nil
}
-func (bk *blockKeeper) AddTx(tx *types.Tx, peerID string) {
- bk.txsProcessCh <- &txsNotify{tx: tx, peerID: peerID}
-}
+func (bk *blockKeeper) blockLocator() []*bc.Hash {
+ header := bk.chain.BestBlockHeader()
+ locator := []*bc.Hash{}
-func (bk *blockKeeper) IsCaughtUp() bool {
- _, height := bk.peers.BestPeer()
- return bk.chain.BestBlockHeight() < height
-}
+ step := uint64(1)
+ for header != nil {
+ headerHash := header.Hash()
+ locator = append(locator, &headerHash)
+ if header.Height == 0 {
+ break
+ }
-func (bk *blockKeeper) BlockRequestWorker(peerID string, maxPeerHeight uint64) error {
- num := bk.chain.BestBlockHeight() + 1
- currentHash := bk.chain.BestBlockHash()
- orphanNum := uint64(0)
- reqNum := uint64(0)
- isOrphan := false
- bkPeer, ok := bk.peers.Peer(peerID)
- if !ok {
- log.Info("peer is not registered")
- return errPeerNotRegister
- }
- swPeer := bkPeer.GetPeer()
- for 0 < num && num <= maxPeerHeight {
- if isOrphan {
- reqNum = orphanNum
+ var err error
+ if header.Height < step {
+ header, err = bk.chain.GetHeaderByHeight(0)
} else {
- reqNum = num
- }
- block, err := bk.BlockRequest(peerID, reqNum)
- if errors.Root(err) == errPeerDropped || errors.Root(err) == errGetBlockTimeout || errors.Root(err) == errReqBlock {
- log.WithField("Peer abnormality. PeerID: ", peerID).Info(err)
- if bkPeer == nil {
- log.Info("peer is not registered")
- break
- }
- log.Info("Block keeper request block error. Stop peer.")
- bk.sw.StopPeerGracefully(swPeer)
- break
+ header, err = bk.chain.GetHeaderByHeight(header.Height - step)
}
- isOrphan, err = bk.chain.ProcessBlock(block)
if err != nil {
- if bkPeer == nil {
- log.Info("peer is deleted")
- break
- }
- if ban := bkPeer.addBanScore(20, 0, "block process error"); ban {
- bk.sw.AddBannedPeer(swPeer)
- bk.sw.StopPeerGracefully(swPeer)
- }
- log.WithField("hash:", block.Hash()).Errorf("blockKeeper fail process block %v ", err)
+ log.WithField("err", err).Error("blockKeeper fail on get blockLocator")
break
}
- if isOrphan {
- orphanNum = block.Height - 1
- continue
+
+ if len(locator) > 10 {
+ step *= 2
}
- num++
}
- bestHash := bk.chain.BestBlockHash()
- log.Info("Block sync complete. height:", bk.chain.BestBlockHeight(), " hash:", bestHash)
- if currentHash.String() != bestHash.String() {
- log.Info("Broadcast new chain status.")
+ return locator
+}
- block, err := bk.chain.GetBlockByHash(bestHash)
+func (bk *blockKeeper) fastBlockSync(checkPoint *consensus.Checkpoint) error {
+ bk.resetHeaderState()
+ lastHeader := bk.headerList.Back().Value.(*types.BlockHeader)
+ for ; lastHeader.Hash() != checkPoint.Hash; lastHeader = bk.headerList.Back().Value.(*types.BlockHeader) {
+ if lastHeader.Height >= checkPoint.Height {
+ return errors.Wrap(errPeerMisbehave, "peer is not in the checkpoint branch")
+ }
+
+ lastHash := lastHeader.Hash()
+ headers, err := bk.requireHeaders([]*bc.Hash{&lastHash}, &checkPoint.Hash)
if err != nil {
- log.Errorf("Failed on sync complete broadcast status get block %v", err)
- return errGetBlockByHash
+ return err
+ }
+
+ if len(headers) == 0 {
+ return errors.Wrap(errPeerMisbehave, "requireHeaders return empty list")
}
- peers, err := bk.peers.BroadcastNewStatus(block)
+ if err := bk.appendHeaderList(headers); err != nil {
+ return err
+ }
+ }
+
+ fastHeader := bk.headerList.Front()
+ for bk.chain.BestBlockHeight() < checkPoint.Height {
+ locator := bk.blockLocator()
+ blocks, err := bk.requireBlocks(locator, &checkPoint.Hash)
if err != nil {
- log.Errorf("Failed on broadcast new status block %v", err)
- return errBroadcastStatus
+ return err
+ }
+
+ if len(blocks) == 0 {
+ return errors.Wrap(errPeerMisbehave, "requireBlocks return empty list")
}
- for _, peer := range peers {
- if peer == nil {
- return errPeerNotRegister
+
+ for _, block := range blocks {
+ if fastHeader = fastHeader.Next(); fastHeader == nil {
+ return errors.New("get block than is higher than checkpoint")
+ }
+
+ blockHash := block.Hash()
+ if blockHash != fastHeader.Value.(*types.BlockHeader).Hash() {
+ return errPeerMisbehave
+ }
+
+ seed, err := bk.chain.CalcNextSeed(&block.PreviousBlockHash)
+ if err != nil {
+ return errors.Wrap(err, "fail on fastBlockSync calculate seed")
+ }
+
+ tensority.AIHash.AddCache(&blockHash, seed, &bc.Hash{})
+ _, err = bk.chain.ProcessBlock(block)
+ tensority.AIHash.RemoveCache(&blockHash, seed)
+ if err != nil {
+ return errors.Wrap(err, "fail on fastBlockSync process block")
}
- swPeer := peer.GetPeer()
- log.Info("Block keeper broadcast block error. Stop peer.")
- bk.sw.StopPeerGracefully(swPeer)
}
}
return nil
}
-func (bk *blockKeeper) blockRequest(peerID string, height uint64) error {
- return bk.peers.requestBlockByHeight(peerID, height)
+func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
+ headers, err := bk.locateHeaders(locator, stopHash)
+ if err != nil {
+ return nil, err
+ }
+
+ blocks := []*types.Block{}
+ for i, header := range headers {
+ if i > maxBlockPerMsg {
+ break
+ }
+
+ headerHash := header.Hash()
+ block, err := bk.chain.GetBlockByHash(&headerHash)
+ if err != nil {
+ return nil, err
+ }
+
+ blocks = append(blocks, block)
+ }
+ return blocks, nil
+}
+
+func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
+ stopHeader, err := bk.chain.GetHeaderByHash(stopHash)
+ if err != nil {
+ return nil, err
+ }
+
+ startHeader, err := bk.chain.GetHeaderByHeight(0)
+ if err != nil {
+ return nil, err
+ }
+
+ for _, hash := range locator {
+ header, err := bk.chain.GetHeaderByHash(hash)
+ if err == nil && bk.chain.InMainChain(header.Hash()) {
+ startHeader = header
+ break
+ }
+ }
+
+ totalHeaders := stopHeader.Height - startHeader.Height
+ if totalHeaders > maxBlockHeadersPerMsg {
+ totalHeaders = maxBlockHeadersPerMsg
+ }
+
+ headers := []*types.BlockHeader{}
+ for i := uint64(1); i <= totalHeaders; i++ {
+ header, err := bk.chain.GetHeaderByHeight(startHeader.Height + i)
+ if err != nil {
+ return nil, err
+ }
+
+ headers = append(headers, header)
+ }
+ return headers, nil
}
func (bk *blockKeeper) nextCheckpoint() *consensus.Checkpoint {
return nextCheckpoint
}
-func (bk *blockKeeper) BlockRequest(peerID string, height uint64) (*types.Block, error) {
- var block *types.Block
+func (bk *blockKeeper) processBlock(peerID string, block *types.Block) {
+ bk.blockProcessCh <- &blockMsg{block: block, peerID: peerID}
+}
+
+func (bk *blockKeeper) processBlocks(peerID string, blocks []*types.Block) {
+ bk.blocksProcessCh <- &blocksMsg{blocks: blocks, peerID: peerID}
+}
+
+func (bk *blockKeeper) processHeaders(peerID string, headers []*types.BlockHeader) {
+ bk.headersProcessCh <- &headersMsg{headers: headers, peerID: peerID}
+}
+
+func (bk *blockKeeper) regularBlockSync(wantHeight uint64) error {
+ i := bk.chain.BestBlockHeight() + 1
+ for i <= wantHeight {
+ block, err := bk.requireBlock(i)
+ if err != nil {
+ return err
+ }
- if err := bk.blockRequest(peerID, height); err != nil {
- return nil, errReqBlock
+ isOrphan, err := bk.chain.ProcessBlock(block)
+ if err != nil {
+ return err
+ }
+
+ if isOrphan {
+ i--
+ continue
+ }
+ i = bk.chain.BestBlockHeight() + 1
}
- retryTicker := time.Tick(requestRetryTicker)
- syncWait := time.NewTimer(syncTimeout)
+ return nil
+}
+func (bk *blockKeeper) requireBlock(height uint64) (*types.Block, error) {
+ if ok := bk.syncPeer.getBlockByHeight(height); !ok {
+ return nil, errPeerDropped
+ }
+
+ waitTicker := time.NewTimer(syncTimeout)
for {
select {
- case pendingResponse := <-bk.pendingProcessCh:
- block = pendingResponse.block
- if pendingResponse.peerID != peerID {
- log.Warning("From different peer")
+ case msg := <-bk.blockProcessCh:
+ if msg.peerID != bk.syncPeer.ID() {
continue
}
- if block.Height != height {
- log.Warning("Block height error")
+ if msg.block.Height != height {
continue
}
- return block, nil
- case <-retryTicker:
- if err := bk.blockRequest(peerID, height); err != nil {
- return nil, errReqBlock
- }
- case <-syncWait.C:
- log.Warning("Request block timeout")
- return nil, errGetBlockTimeout
- case peerid := <-bk.quitReqBlockCh:
- if *peerid == peerID {
- log.Info("Quite block request worker")
- return nil, errPeerDropped
+ return msg.block, nil
+ case <-waitTicker.C:
+ return nil, errors.Wrap(errRequestTimeout, "requireBlock")
+ }
+ }
+}
+
+func (bk *blockKeeper) requireBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
+ if ok := bk.syncPeer.getBlocks(locator, stopHash); !ok {
+ return nil, errPeerDropped
+ }
+
+ waitTicker := time.NewTimer(syncTimeout)
+ for {
+ select {
+ case msg := <-bk.blocksProcessCh:
+ if msg.peerID != bk.syncPeer.ID() {
+ continue
}
+ return msg.blocks, nil
+ case <-waitTicker.C:
+ return nil, errors.Wrap(errRequestTimeout, "requireBlocks")
}
}
}
-func (bk *blockKeeper) txsProcessWorker() {
- for txsResponse := range bk.txsProcessCh {
- tx := txsResponse.tx
- log.Info("Receive new tx from remote peer. TxID:", tx.ID.String())
- bk.peers.MarkTransaction(txsResponse.peerID, &tx.ID)
- if isOrphan, err := bk.chain.ValidateTx(tx); err != nil && isOrphan == false {
- if bkPeer, ok := bk.peers.Peer(txsResponse.peerID); ok {
- swPeer := bkPeer.GetPeer()
- if ban := bkPeer.addBanScore(10, 0, "tx error"); ban {
- bk.sw.AddBannedPeer(swPeer)
- bk.sw.StopPeerGracefully(swPeer)
- }
+func (bk *blockKeeper) requireHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
+ if ok := bk.syncPeer.getHeaders(locator, stopHash); !ok {
+ return nil, errPeerDropped
+ }
+
+ waitTicker := time.NewTimer(syncTimeout)
+ for {
+ select {
+ case msg := <-bk.headersProcessCh:
+ if msg.peerID != bk.syncPeer.ID() {
+ continue
}
+ return msg.headers, nil
+ case <-waitTicker.C:
+ return nil, errors.Wrap(errRequestTimeout, "requireHeaders")
+ }
+ }
+}
+
+// resetHeaderState sets the headers-first mode state to values appropriate for
+// syncing from a new peer.
+func (bk *blockKeeper) resetHeaderState() {
+ header := bk.chain.BestBlockHeader()
+ bk.headerList.Init()
+ if bk.nextCheckpoint() != nil {
+ bk.headerList.PushBack(header)
+ }
+}
+
+func (bk *blockKeeper) startSync() bool {
+ checkPoint := bk.nextCheckpoint()
+ peer := bk.peers.bestPeer(consensus.SFFastSync | consensus.SFFullNode)
+ if peer != nil && checkPoint != nil && peer.Height() >= checkPoint.Height {
+ bk.syncPeer = peer
+ if err := bk.fastBlockSync(checkPoint); err != nil {
+ log.WithField("err", err).Warning("fail on fastBlockSync")
+ bk.peers.addBanScore(peer.ID(), 0, 40, err.Error())
+ return false
+ }
+ return true
+ }
+
+ peer = bk.peers.bestPeer(consensus.SFFullNode)
+ if peer != nil && peer.Height() > bk.chain.BestBlockHeight() {
+ bk.syncPeer = peer
+ if err := bk.regularBlockSync(peer.Height()); err != nil {
+ log.WithField("err", err).Warning("fail on regularBlockSync")
+ bk.peers.addBanScore(peer.ID(), 0, 40, err.Error())
+ return false
+ }
+ return true
+ }
+ return false
+}
+
+func (bk *blockKeeper) syncWorker() {
+ syncTicker := time.NewTicker(syncCycle)
+ for {
+ <-syncTicker.C
+ if update := bk.startSync(); !update {
+ continue
+ }
+
+ block, err := bk.chain.GetBlockByHeight(bk.chain.BestBlockHeight())
+ if err != nil {
+ log.WithField("err", err).Error("fail on syncWorker get best block")
+ }
+
+ if err := bk.peers.broadcastMinedBlock(block); err != nil {
+ log.WithField("err", err).Error("fail on syncWorker broadcast new block")
}
}
}
+++ /dev/null
-package netsync
-
-import (
- "errors"
-
- log "github.com/sirupsen/logrus"
- "gopkg.in/karalabe/cookiejar.v2/collections/prque"
-
- "github.com/bytom/p2p"
- core "github.com/bytom/protocol"
- "github.com/bytom/protocol/bc"
- "github.com/bytom/protocol/bc/types"
-)
-
-const (
- maxQueueDist = 1024 //32 // Maximum allowed distance from the chain head to queue
-)
-
-var (
- errTerminated = errors.New("terminated")
-)
-
-// Fetcher is responsible for accumulating block announcements from various peers
-// and scheduling them for retrieval.
-type Fetcher struct {
- chain *core.Chain
- sw *p2p.Switch
- peers *peerSet
-
- // Various event channels
- newMinedBlock chan *blockPending
- quit chan struct{}
-
- // Block cache
- queue *prque.Prque // Queue containing the import operations (block number sorted)
- queued map[bc.Hash]*blockPending // Set of already queued blocks (to dedup imports)
-}
-
-//NewFetcher New creates a block fetcher to retrieve blocks of the new mined.
-func NewFetcher(chain *core.Chain, sw *p2p.Switch, peers *peerSet) *Fetcher {
- return &Fetcher{
- chain: chain,
- sw: sw,
- peers: peers,
- newMinedBlock: make(chan *blockPending),
- quit: make(chan struct{}),
- queue: prque.New(),
- queued: make(map[bc.Hash]*blockPending),
- }
-}
-
-// Start boots up the announcement based synchroniser, accepting and processing
-// hash notifications and block fetches until termination requested.
-func (f *Fetcher) Start() {
- go f.loop()
-}
-
-// Stop terminates the announcement based synchroniser, canceling all pending
-// operations.
-func (f *Fetcher) Stop() {
- close(f.quit)
-}
-
-// Enqueue tries to fill gaps the the fetcher's future import queue.
-func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
- op := &blockPending{
- peerID: peer,
- block: block,
- }
- select {
- case f.newMinedBlock <- op:
- return nil
- case <-f.quit:
- return errTerminated
- }
-}
-
-// Loop is the main fetcher loop, checking and processing various notification
-// events.
-func (f *Fetcher) loop() {
- for {
- // Import any queued blocks that could potentially fit
- height := f.chain.BestBlockHeight()
- for !f.queue.Empty() {
- op := f.queue.PopItem().(*blockPending)
- // If too high up the chain or phase, continue later
- number := op.block.Height
- if number > height+1 {
- f.queue.Push(op, -float32(op.block.Height))
- break
- }
- // Otherwise if fresh and still unknown, try and import
- hash := op.block.Hash()
- block, _ := f.chain.GetBlockByHash(&hash)
- if block != nil {
- f.forgetBlock(hash)
- continue
- }
- if op.block.PreviousBlockHash.String() != f.chain.BestBlockHash().String() {
- f.forgetBlock(hash)
- continue
- }
- f.insert(op.peerID, op.block)
- }
- // Wait for an outside event to occur
- select {
- case <-f.quit:
- // Fetcher terminating, abort all operations
- return
-
- case op := <-f.newMinedBlock:
- // A direct block insertion was requested, try and fill any pending gaps
- f.enqueue(op.peerID, op.block)
- }
- }
-}
-
-// enqueue schedules a new future import operation, if the block to be imported
-// has not yet been seen.
-func (f *Fetcher) enqueue(peer string, block *types.Block) {
- hash := block.Hash()
-
- //TODO: Ensure the peer isn't DOSing us
- // Discard any past or too distant blocks
- if dist := int64(block.Height) - int64(f.chain.BestBlockHeight()); dist < 0 || dist > maxQueueDist {
- log.Info("Discarded propagated block, too far away", " peer: ", peer, "number: ", block.Height, "distance: ", dist)
- return
- }
- // Schedule the block for future importing
- if _, ok := f.queued[hash]; !ok {
- op := &blockPending{
- peerID: peer,
- block: block,
- }
- f.queued[hash] = op
- f.queue.Push(op, -float32(block.Height))
- log.Info("Queued receive mine block.", " peer:", peer, " number:", block.Height, " queued:", f.queue.Size())
- }
-}
-
-// insert spawns a new goroutine to run a block insertion into the chain. If the
-// block's number is at the same height as the current import phase, it updates
-// the phase states accordingly.
-func (f *Fetcher) insert(peerID string, block *types.Block) {
- // Run the import on a new thread
- log.Info("Importing propagated block", " from peer: ", peerID, " height: ", block.Height)
- // Run the actual import and log any issues
- if _, err := f.chain.ProcessBlock(block); err != nil {
- log.Info("Propagated block import failed", " from peer: ", peerID, " height: ", block.Height, "err: ", err)
- fPeer, ok := f.peers.Peer(peerID)
- if !ok {
- return
- }
- swPeer := fPeer.GetPeer()
- if ban := fPeer.addBanScore(20, 0, "block process error"); ban {
- f.sw.AddBannedPeer(swPeer)
- f.sw.StopPeerGracefully(swPeer)
- }
- return
- }
- // If import succeeded, broadcast the block
- log.Info("success process a block from new mined blocks cache. block height: ", block.Height)
- peers, err := f.peers.BroadcastMinedBlock(block)
- if err != nil {
- log.Errorf("Broadcast mine block error. %v", err)
- return
- }
- for _, fPeer := range peers {
- if fPeer == nil {
- continue
- }
- swPeer := fPeer.GetPeer()
- log.Info("Fetcher broadcast block error. Stop peer.")
- f.sw.StopPeerGracefully(swPeer)
- }
-}
-
-// forgetBlock removes all traces of a queued block from the fetcher's internal
-// state.
-func (f *Fetcher) forgetBlock(hash bc.Hash) {
- if insert := f.queued[hash]; insert != nil {
- delete(f.queued, hash)
- }
-}
import (
"encoding/hex"
+ "errors"
"net"
"path"
"strconv"
"github.com/bytom/version"
)
+const (
+ maxTxChanSize = 10000
+)
+
//SyncManager Sync Manager is responsible for the business layer information synchronization
type SyncManager struct {
- sw *p2p.Switch
-
- privKey crypto.PrivKeyEd25519 // local node's p2p key
- chain *core.Chain
- txPool *core.TxPool
- fetcher *Fetcher
- blockKeeper *blockKeeper
- peers *peerSet
-
- newTxCh chan *types.Tx
- newBlockCh chan *bc.Hash
- newPeerCh chan struct{}
- txSyncCh chan *txsync
- dropPeerCh chan *string
- quitSync chan struct{}
- config *cfg.Config
- synchronising int32
+ sw *p2p.Switch
+ genesisHash bc.Hash
+
+ privKey crypto.PrivKeyEd25519 // local node's p2p key
+ chain *core.Chain
+ txPool *core.TxPool
+ blockFetcher *blockFetcher
+ blockKeeper *blockKeeper
+ peers *peerSet
+
+ newTxCh chan *types.Tx
+ newBlockCh chan *bc.Hash
+ txSyncCh chan *txSyncMsg
+ quitSync chan struct{}
+ config *cfg.Config
}
//NewSyncManager create a sync manager
func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
+ genesisHeader, err := chain.GetHeaderByHeight(0)
+ if err != nil {
+ return nil, err
+ }
+
sw := p2p.NewSwitch(config)
- peers := newPeerSet()
- dropPeerCh := make(chan *string, maxQuitReq)
+ peers := newPeerSet(sw)
manager := &SyncManager{
- sw: sw,
- txPool: txPool,
- chain: chain,
- privKey: crypto.GenPrivKeyEd25519(),
- fetcher: NewFetcher(chain, sw, peers),
- blockKeeper: newBlockKeeper(chain, sw, peers, dropPeerCh),
- peers: peers,
- newTxCh: make(chan *types.Tx, maxTxChanSize),
- newBlockCh: newBlockCh,
- newPeerCh: make(chan struct{}),
- txSyncCh: make(chan *txsync),
- dropPeerCh: dropPeerCh,
- quitSync: make(chan struct{}),
- config: config,
- }
-
- protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh)
+ sw: sw,
+ genesisHash: genesisHeader.Hash(),
+ txPool: txPool,
+ chain: chain,
+ privKey: crypto.GenPrivKeyEd25519(),
+ blockFetcher: newBlockFetcher(chain, peers),
+ blockKeeper: newBlockKeeper(chain, peers),
+ peers: peers,
+ newTxCh: make(chan *types.Tx, maxTxChanSize),
+ newBlockCh: newBlockCh,
+ txSyncCh: make(chan *txSyncMsg),
+ quitSync: make(chan struct{}),
+ config: config,
+ }
+
+ protocolReactor := NewProtocolReactor(manager, manager.peers)
manager.sw.AddReactor("PROTOCOL", protocolReactor)
// Create & add listener
return manager, nil
}
+//BestPeer return the highest p2p peerInfo
+func (sm *SyncManager) BestPeer() *PeerInfo {
+ bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
+ if bestPeer != nil {
+ return bestPeer.getPeerInfo()
+ }
+ return nil
+}
+
+// GetNewTxCh return a unconfirmed transaction feed channel
+func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
+ return sm.newTxCh
+}
+
+//GetPeerInfos return peer info of all peers
+func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
+ return sm.peers.getPeerInfos()
+}
+
+//IsCaughtUp check wheather the peer finish the sync
+func (sm *SyncManager) IsCaughtUp() bool {
+ peer := sm.peers.bestPeer(consensus.SFFullNode)
+ return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
+}
+
+//NodeInfo get P2P peer node info
+func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
+ return sm.sw.NodeInfo()
+}
+
+//StopPeer try to stop peer by given ID
+func (sm *SyncManager) StopPeer(peerID string) error {
+ if peer := sm.peers.getPeer(peerID); peer == nil {
+ return errors.New("peerId not exist")
+ }
+ sm.peers.removePeer(peerID)
+ return nil
+}
+
+//Switch get sync manager switch
+func (sm *SyncManager) Switch() *p2p.Switch {
+ return sm.sw
+}
+
+func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
+ sm.blockKeeper.processBlock(peer.ID(), msg.GetBlock())
+}
+
+func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
+ blocks, err := msg.GetBlocks()
+ if err != nil {
+ log.WithField("err", err).Debug("fail on handleBlocksMsg GetBlocks")
+ return
+ }
+
+ sm.blockKeeper.processBlocks(peer.ID(), blocks)
+}
+
+func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
+ var block *types.Block
+ var err error
+ if msg.Height != 0 {
+ block, err = sm.chain.GetBlockByHeight(msg.Height)
+ } else {
+ block, err = sm.chain.GetBlockByHash(msg.GetHash())
+ }
+ if err != nil {
+ log.WithField("err", err).Warning("fail on handleGetBlockMsg get block from chain")
+ return
+ }
+
+ ok, err := peer.sendBlock(block)
+ if !ok {
+ sm.peers.removePeer(peer.ID())
+ }
+ if err != nil {
+ log.WithField("err", err).Error("fail on handleGetBlockMsg sentBlock")
+ }
+}
+
+func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
+ blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
+ if err != nil || len(blocks) == 0 {
+ return
+ }
+
+ totalSize := 0
+ sendBlocks := []*types.Block{}
+ for _, block := range blocks {
+ rawData, err := block.MarshalText()
+ if err != nil {
+ log.WithField("err", err).Error("fail on handleGetBlocksMsg marshal block")
+ continue
+ }
+
+ if totalSize+len(rawData) > maxBlockchainResponseSize-16 {
+ break
+ }
+ totalSize += len(rawData)
+ sendBlocks = append(sendBlocks, block)
+ }
+
+ ok, err := peer.sendBlocks(sendBlocks)
+ if !ok {
+ sm.peers.removePeer(peer.ID())
+ }
+ if err != nil {
+ log.WithField("err", err).Error("fail on handleGetBlocksMsg sentBlock")
+ }
+}
+
+func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
+ headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
+ if err != nil || len(headers) == 0 {
+ log.WithField("err", err).Debug("fail on handleGetHeadersMsg locateHeaders")
+ return
+ }
+
+ ok, err := peer.sendHeaders(headers)
+ if !ok {
+ sm.peers.removePeer(peer.ID())
+ }
+ if err != nil {
+ log.WithField("err", err).Error("fail on handleGetHeadersMsg sentBlock")
+ }
+}
+
+func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
+ headers, err := msg.GetHeaders()
+ if err != nil {
+ log.WithField("err", err).Debug("fail on handleHeadersMsg GetHeaders")
+ return
+ }
+
+ sm.blockKeeper.processHeaders(peer.ID(), headers)
+}
+
+func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
+ block, err := msg.GetMineBlock()
+ if err != nil {
+ log.WithField("err", err).Warning("fail on handleMineBlockMsg GetMineBlock")
+ return
+ }
+
+ hash := block.Hash()
+ peer.markBlock(&hash)
+ sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
+ peer.setStatus(block.Height, &hash)
+}
+
+func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
+ bestHeader := sm.chain.BestBlockHeader()
+ genesisBlock, err := sm.chain.GetBlockByHeight(0)
+ if err != nil {
+ log.WithField("err", err).Error("fail on handleStatusRequestMsg get genesis")
+ }
+
+ genesisHash := genesisBlock.Hash()
+ msg := NewStatusResponseMessage(bestHeader, &genesisHash)
+ if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+ sm.peers.removePeer(peer.ID())
+ }
+}
+
+func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusResponseMessage) {
+ if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
+ peer.setStatus(msg.Height, msg.GetHash())
+ return
+ }
+
+ if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
+ log.WithFields(log.Fields{
+ "remote genesis": genesisHash.String(),
+ "local genesis": sm.genesisHash.String(),
+ }).Warn("fail hand shake due to differnt genesis")
+ return
+ }
+
+ sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
+}
+
+func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
+ tx, err := msg.GetTransaction()
+ if err != nil {
+ sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
+ return
+ }
+
+ if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && isOrphan == false {
+ sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
+ }
+}
+
// Defaults to tcp
func protocolAndAddress(listenAddr string) (string, string) {
p, address := "tcp", listenAddr
}
// broadcast transactions
go sm.txBroadcastLoop()
-
- // broadcast mined blocks
go sm.minedBroadcastLoop()
-
- // start sync handlers
- go sm.syncer()
-
- go sm.txsyncLoop()
+ go sm.txSyncLoop()
}
//Stop stop sync manager
return ntab, nil
}
-func (sm *SyncManager) txBroadcastLoop() {
- for {
- select {
- case newTx := <-sm.newTxCh:
- peers, err := sm.peers.BroadcastTx(newTx)
- if err != nil {
- log.Errorf("Broadcast new tx error. %v", err)
- return
- }
- for _, smPeer := range peers {
- if smPeer == nil {
- continue
- }
- swPeer := smPeer.GetPeer()
- log.Info("Tx broadcast error. Stop Peer.")
- sm.sw.StopPeerGracefully(swPeer)
- }
- case <-sm.quitSync:
- return
- }
- }
-}
-
func (sm *SyncManager) minedBroadcastLoop() {
for {
select {
log.Errorf("Failed on mined broadcast loop get block %v", err)
return
}
- peers, err := sm.peers.BroadcastMinedBlock(block)
- if err != nil {
+ if err := sm.peers.broadcastMinedBlock(block); err != nil {
log.Errorf("Broadcast mine block error. %v", err)
return
}
- for _, smPeer := range peers {
- if smPeer == nil {
- continue
- }
- swPeer := smPeer.GetPeer()
- log.Info("New mined block broadcast error. Stop Peer.")
- sm.sw.StopPeerGracefully(swPeer)
- }
case <-sm.quitSync:
return
}
}
}
-
-//NodeInfo get P2P peer node info
-func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
- return sm.sw.NodeInfo()
-}
-
-//BlockKeeper get block keeper
-func (sm *SyncManager) BlockKeeper() *blockKeeper {
- return sm.blockKeeper
-}
-
-//Peers get sync manager peer set
-func (sm *SyncManager) Peers() *peerSet {
- return sm.peers
-}
-
-//Switch get sync manager switch
-func (sm *SyncManager) Switch() *p2p.Switch {
- return sm.sw
-}
-
-// GetNewTxCh return a unconfirmed transaction feed channel
-func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
- return sm.newTxCh
-}
import (
"bytes"
+ "encoding/json"
"errors"
"fmt"
"github.com/bytom/protocol/bc/types"
)
-//protocol msg
+//protocol msg byte
const (
- BlockRequestByte = byte(0x10)
- BlockResponseByte = byte(0x11)
- StatusRequestByte = byte(0x20)
- StatusResponseByte = byte(0x21)
- NewTransactionByte = byte(0x30)
- NewMineBlockByte = byte(0x40)
+ BlockchainChannel = byte(0x40)
+
+ BlockRequestByte = byte(0x10)
+ BlockResponseByte = byte(0x11)
+ HeadersRequestByte = byte(0x12)
+ HeadersResponseByte = byte(0x13)
+ BlocksRequestByte = byte(0x14)
+ BlocksResponseByte = byte(0x15)
+ StatusRequestByte = byte(0x20)
+ StatusResponseByte = byte(0x21)
+ NewTransactionByte = byte(0x30)
+ NewMineBlockByte = byte(0x40)
maxBlockchainResponseSize = 22020096 + 2
)
-// BlockchainMessage is a generic message for this reactor.
+//BlockchainMessage is a generic message for this reactor.
type BlockchainMessage interface{}
var _ = wire.RegisterInterface(
struct{ BlockchainMessage }{},
- wire.ConcreteType{&BlockRequestMessage{}, BlockRequestByte},
- wire.ConcreteType{&BlockResponseMessage{}, BlockResponseByte},
+ wire.ConcreteType{&GetBlockMessage{}, BlockRequestByte},
+ wire.ConcreteType{&BlockMessage{}, BlockResponseByte},
+ wire.ConcreteType{&GetHeadersMessage{}, HeadersRequestByte},
+ wire.ConcreteType{&HeadersMessage{}, HeadersResponseByte},
+ wire.ConcreteType{&GetBlocksMessage{}, BlocksRequestByte},
+ wire.ConcreteType{&BlocksMessage{}, BlocksResponseByte},
wire.ConcreteType{&StatusRequestMessage{}, StatusRequestByte},
wire.ConcreteType{&StatusResponseMessage{}, StatusResponseByte},
- wire.ConcreteType{&TransactionNotifyMessage{}, NewTransactionByte},
+ wire.ConcreteType{&TransactionMessage{}, NewTransactionByte},
wire.ConcreteType{&MineBlockMessage{}, NewMineBlockByte},
)
-type blockPending struct {
- block *types.Block
- peerID string
-}
-
-type txsNotify struct {
- tx *types.Tx
- peerID string
-}
-
//DecodeMessage decode msg
func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
msgType = bz[0]
return
}
-//BlockRequestMessage request blocks from remote peers by height/hash
-type BlockRequestMessage struct {
+//GetBlockMessage request blocks from remote peers by height/hash
+type GetBlockMessage struct {
Height uint64
RawHash [32]byte
}
-//GetHash get hash
-func (m *BlockRequestMessage) GetHash() *bc.Hash {
+//GetHash reutrn the hash of the request
+func (m *GetBlockMessage) GetHash() *bc.Hash {
hash := bc.NewHash(m.RawHash)
return &hash
}
//String convert msg to string
-func (m *BlockRequestMessage) String() string {
+func (m *GetBlockMessage) String() string {
if m.Height > 0 {
- return fmt.Sprintf("BlockRequestMessage{Height: %d}", m.Height)
+ return fmt.Sprintf("GetBlockMessage{Height: %d}", m.Height)
}
hash := m.GetHash()
- return fmt.Sprintf("BlockRequestMessage{Hash: %s}", hash.String())
+ return fmt.Sprintf("GetBlockMessage{Hash: %s}", hash.String())
}
-//BlockResponseMessage response get block msg
-type BlockResponseMessage struct {
+//BlockMessage response get block msg
+type BlockMessage struct {
RawBlock []byte
}
-//NewBlockResponseMessage construct bock response msg
-func NewBlockResponseMessage(block *types.Block) (*BlockResponseMessage, error) {
+//NewBlockMessage construct bock response msg
+func NewBlockMessage(block *types.Block) (*BlockMessage, error) {
rawBlock, err := block.MarshalText()
if err != nil {
return nil, err
}
- return &BlockResponseMessage{RawBlock: rawBlock}, nil
+ return &BlockMessage{RawBlock: rawBlock}, nil
}
//GetBlock get block from msg
-func (m *BlockResponseMessage) GetBlock() *types.Block {
+func (m *BlockMessage) GetBlock() *types.Block {
block := &types.Block{
BlockHeader: types.BlockHeader{},
Transactions: []*types.Tx{},
}
//String convert msg to string
-func (m *BlockResponseMessage) String() string {
- return fmt.Sprintf("BlockResponseMessage{Size: %d}", len(m.RawBlock))
+func (m *BlockMessage) String() string {
+ return fmt.Sprintf("BlockMessage{Size: %d}", len(m.RawBlock))
}
-//TransactionNotifyMessage notify new tx msg
-type TransactionNotifyMessage struct {
- RawTx []byte
+//GetHeadersMessage is one of the bytom msg type
+type GetHeadersMessage struct {
+ RawBlockLocator [][32]byte
+ RawStopHash [32]byte
}
-//NewTransactionNotifyMessage construct notify new tx msg
-func NewTransactionNotifyMessage(tx *types.Tx) (*TransactionNotifyMessage, error) {
- rawTx, err := tx.TxData.MarshalText()
- if err != nil {
- return nil, err
+//NewGetHeadersMessage return a new GetHeadersMessage
+func NewGetHeadersMessage(blockLocator []*bc.Hash, stopHash *bc.Hash) *GetHeadersMessage {
+ msg := &GetHeadersMessage{
+ RawStopHash: stopHash.Byte32(),
+ }
+ for _, hash := range blockLocator {
+ msg.RawBlockLocator = append(msg.RawBlockLocator, hash.Byte32())
}
- return &TransactionNotifyMessage{RawTx: rawTx}, nil
+ return msg
}
-//GetTransaction get tx from msg
-func (m *TransactionNotifyMessage) GetTransaction() (*types.Tx, error) {
- tx := &types.Tx{}
- if err := tx.UnmarshalText(m.RawTx); err != nil {
- return nil, err
+//GetBlockLocator return the locator of the msg
+func (msg *GetHeadersMessage) GetBlockLocator() []*bc.Hash {
+ blockLocator := []*bc.Hash{}
+ for _, rawHash := range msg.RawBlockLocator {
+ hash := bc.NewHash(rawHash)
+ blockLocator = append(blockLocator, &hash)
}
- return tx, nil
+ return blockLocator
}
-//String
-func (m *TransactionNotifyMessage) String() string {
- return fmt.Sprintf("TransactionNotifyMessage{Size: %d}", len(m.RawTx))
+//GetStopHash return the stop hash of the msg
+func (msg *GetHeadersMessage) GetStopHash() *bc.Hash {
+ hash := bc.NewHash(msg.RawStopHash)
+ return &hash
+}
+
+//HeadersMessage is one of the bytom msg type
+type HeadersMessage struct {
+ RawHeaders [][]byte
+}
+
+//NewHeadersMessage create a new HeadersMessage
+func NewHeadersMessage(headers []*types.BlockHeader) (*HeadersMessage, error) {
+ RawHeaders := [][]byte{}
+ for _, header := range headers {
+ data, err := json.Marshal(header)
+ if err != nil {
+ return nil, err
+ }
+
+ RawHeaders = append(RawHeaders, data)
+ }
+ return &HeadersMessage{RawHeaders: RawHeaders}, nil
+}
+
+//GetHeaders return the headers in the msg
+func (msg *HeadersMessage) GetHeaders() ([]*types.BlockHeader, error) {
+ headers := []*types.BlockHeader{}
+ for _, data := range msg.RawHeaders {
+ header := &types.BlockHeader{}
+ if err := json.Unmarshal(data, header); err != nil {
+ return nil, err
+ }
+
+ headers = append(headers, header)
+ }
+ return headers, nil
+}
+
+//GetBlocksMessage is one of the bytom msg type
+type GetBlocksMessage struct {
+ RawBlockLocator [][32]byte
+ RawStopHash [32]byte
+}
+
+//NewGetBlocksMessage create a new GetBlocksMessage
+func NewGetBlocksMessage(blockLocator []*bc.Hash, stopHash *bc.Hash) *GetBlocksMessage {
+ msg := &GetBlocksMessage{
+ RawStopHash: stopHash.Byte32(),
+ }
+ for _, hash := range blockLocator {
+ msg.RawBlockLocator = append(msg.RawBlockLocator, hash.Byte32())
+ }
+ return msg
+}
+
+//GetBlockLocator return the locator of the msg
+func (msg *GetBlocksMessage) GetBlockLocator() []*bc.Hash {
+ blockLocator := []*bc.Hash{}
+ for _, rawHash := range msg.RawBlockLocator {
+ hash := bc.NewHash(rawHash)
+ blockLocator = append(blockLocator, &hash)
+ }
+ return blockLocator
+}
+
+//GetStopHash return the stop hash of the msg
+func (msg *GetBlocksMessage) GetStopHash() *bc.Hash {
+ hash := bc.NewHash(msg.RawStopHash)
+ return &hash
+}
+
+//BlocksMessage is one of the bytom msg type
+type BlocksMessage struct {
+ RawBlocks [][]byte
+}
+
+//NewBlocksMessage create a new BlocksMessage
+func NewBlocksMessage(blocks []*types.Block) (*BlocksMessage, error) {
+ rawBlocks := [][]byte{}
+ for _, block := range blocks {
+ data, err := json.Marshal(block)
+ if err != nil {
+ return nil, err
+ }
+
+ rawBlocks = append(rawBlocks, data)
+ }
+ return &BlocksMessage{RawBlocks: rawBlocks}, nil
+}
+
+//GetBlocks returns the blocks in the msg
+func (msg *BlocksMessage) GetBlocks() ([]*types.Block, error) {
+ blocks := []*types.Block{}
+ for _, data := range msg.RawBlocks {
+ block := &types.Block{}
+ if err := json.Unmarshal(data, block); err != nil {
+ return nil, err
+ }
+
+ blocks = append(blocks, block)
+ }
+ return blocks, nil
}
//StatusRequestMessage status request msg
return fmt.Sprintf("StatusResponseMessage{Height: %d, Best hash: %s, Genesis hash: %s}", m.Height, hash.String(), genesisHash.String())
}
+//TransactionMessage notify new tx msg
+type TransactionMessage struct {
+ RawTx []byte
+}
+
+//NewTransactionMessage construct notify new tx msg
+func NewTransactionMessage(tx *types.Tx) (*TransactionMessage, error) {
+ rawTx, err := tx.TxData.MarshalText()
+ if err != nil {
+ return nil, err
+ }
+ return &TransactionMessage{RawTx: rawTx}, nil
+}
+
+//GetTransaction get tx from msg
+func (m *TransactionMessage) GetTransaction() (*types.Tx, error) {
+ tx := &types.Tx{}
+ if err := tx.UnmarshalText(m.RawTx); err != nil {
+ return nil, err
+ }
+ return tx, nil
+}
+
+//String
+func (m *TransactionMessage) String() string {
+ return fmt.Sprintf("TransactionMessage{Size: %d}", len(m.RawTx))
+}
+
//MineBlockMessage new mined block msg
type MineBlockMessage struct {
RawBlock []byte
package netsync
import (
- "strconv"
+ "net"
"sync"
log "github.com/sirupsen/logrus"
"github.com/bytom/consensus"
"github.com/bytom/errors"
- "github.com/bytom/p2p"
"github.com/bytom/p2p/trust"
"github.com/bytom/protocol/bc"
"github.com/bytom/protocol/bc/types"
)
-var (
- errClosed = errors.New("peer set is closed")
- errAlreadyRegistered = errors.New("peer is already registered")
- errNotRegistered = errors.New("peer is not registered")
-)
-
const (
- defaultVersion = 1
+ maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
+ maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
defaultBanThreshold = uint64(100)
)
-type peer struct {
- mtx sync.RWMutex
- version int // Protocol version negotiated
- services consensus.ServiceFlag
- id string
- height uint64
- hash *bc.Hash
- banScore trust.DynamicBanScore
-
- swPeer *p2p.Peer
+//BasePeer is the interface for connection level peer
+type BasePeer interface {
+ Addr() net.Addr
+ ID() string
+ ServiceFlag() consensus.ServiceFlag
+ TrySend(byte, interface{}) bool
+}
- knownTxs *set.Set // Set of transaction hashes known to be known by this peer
- knownBlocks *set.Set // Set of block hashes known to be known by this peer
+//BasePeerSet is the intergace for connection level peer manager
+type BasePeerSet interface {
+ AddBannedPeer(string) error
+ StopPeerGracefully(string)
}
-// PeerInfo indicate peer information
+// PeerInfo indicate peer status snap
type PeerInfo struct {
- Id string `json:"id"`
+ ID string `json:"id"`
RemoteAddr string `json:"remote_addr"`
Height uint64 `json:"height"`
Delay uint32 `json:"delay"`
}
-func newPeer(height uint64, hash *bc.Hash, Peer *p2p.Peer) *peer {
- services := consensus.SFFullNode
- if len(Peer.Other) != 0 {
- if serviceFlag, err := strconv.ParseUint(Peer.Other[0], 10, 64); err != nil {
- services = consensus.ServiceFlag(serviceFlag)
- }
- }
+type peer struct {
+ BasePeer
+ mtx sync.RWMutex
+ services consensus.ServiceFlag
+ height uint64
+ hash *bc.Hash
+ banScore trust.DynamicBanScore
+ knownTxs *set.Set // Set of transaction hashes known to be known by this peer
+ knownBlocks *set.Set // Set of block hashes known to be known by this peer
+}
+func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
return &peer{
- version: defaultVersion,
- services: services,
- id: Peer.Key,
+ BasePeer: basePeer,
+ services: basePeer.ServiceFlag(),
height: height,
hash: hash,
- swPeer: Peer,
knownTxs: set.New(),
knownBlocks: set.New(),
}
}
-func (p *peer) GetStatus() (height uint64, hash *bc.Hash) {
+func (p *peer) Height() uint64 {
p.mtx.RLock()
defer p.mtx.RUnlock()
- return p.height, p.hash
+ return p.height
}
-func (p *peer) SetStatus(height uint64, hash *bc.Hash) {
- p.mtx.Lock()
- defer p.mtx.Unlock()
+func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
+ score := p.banScore.Increase(persistent, transient)
+ if score > defaultBanThreshold {
+ log.WithFields(log.Fields{"address": p.Addr(), "score": score, "reason": reason}).Errorf("banning and disconnecting")
+ return true
+ }
- p.height = height
- p.hash = hash
+ warnThreshold := defaultBanThreshold >> 1
+ if score > warnThreshold {
+ log.WithFields(log.Fields{"address": p.Addr(), "score": score, "reason": reason}).Warning("ban score increasing")
+ }
+ return false
}
-func (p *peer) requestBlockByHash(hash *bc.Hash) error {
- msg := &BlockRequestMessage{RawHash: hash.Byte32()}
- p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
- return nil
+func (p *peer) getBlockByHeight(height uint64) bool {
+ msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
+ return p.TrySend(BlockchainChannel, msg)
}
-func (p *peer) requestBlockByHeight(height uint64) error {
- msg := &BlockRequestMessage{Height: height}
- p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
- return nil
+func (p *peer) getBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
+ msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
+ return p.TrySend(BlockchainChannel, msg)
}
-func (p *peer) SendTransactions(txs []*types.Tx) error {
- for _, tx := range txs {
- msg, err := NewTransactionNotifyMessage(tx)
- if err != nil {
- return errors.New("Failed construction tx msg")
- }
- hash := &tx.ID
- p.knownTxs.Add(hash.String())
- if p.swPeer == nil {
- return errPeerDropped
- }
- p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
- }
- return nil
+func (p *peer) getHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
+ msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
+ return p.TrySend(BlockchainChannel, msg)
}
-func (p *peer) GetPeer() *p2p.Peer {
- p.mtx.RLock()
- defer p.mtx.RUnlock()
-
- return p.swPeer
-}
-
-
-func (p *peer) GetPeerInfo() *PeerInfo {
+func (p *peer) getPeerInfo() *PeerInfo {
p.mtx.RLock()
defer p.mtx.RUnlock()
return &PeerInfo{
- Id: p.id,
- RemoteAddr: p.swPeer.RemoteAddr,
+ ID: p.ID(),
+ RemoteAddr: p.Addr().String(),
Height: p.height,
- Delay: 0, // TODO
}
}
-// MarkTransaction marks a transaction as known for the peer, ensuring that it
-// will never be propagated to this particular peer.
-func (p *peer) MarkTransaction(hash *bc.Hash) {
+func (p *peer) markBlock(hash *bc.Hash) {
p.mtx.Lock()
defer p.mtx.Unlock()
- // If we reached the memory allowance, drop a previously known transaction hash
- for p.knownTxs.Size() >= maxKnownTxs {
- p.knownTxs.Pop()
+ for p.knownBlocks.Size() >= maxKnownBlocks {
+ p.knownBlocks.Pop()
}
- p.knownTxs.Add(hash.String())
+ p.knownBlocks.Add(hash.String())
}
-// MarkBlock marks a block as known for the peer, ensuring that the block will
-// never be propagated to this particular peer.
-func (p *peer) MarkBlock(hash *bc.Hash) {
+func (p *peer) markTransaction(hash *bc.Hash) {
p.mtx.Lock()
defer p.mtx.Unlock()
- // If we reached the memory allowance, drop a previously known block hash
- for p.knownBlocks.Size() >= maxKnownBlocks {
- p.knownBlocks.Pop()
+ for p.knownTxs.Size() >= maxKnownTxs {
+ p.knownTxs.Pop()
}
- p.knownBlocks.Add(hash.String())
+ p.knownTxs.Add(hash.String())
}
-// addBanScore increases the persistent and decaying ban score fields by the
-// values passed as parameters. If the resulting score exceeds half of the ban
-// threshold, a warning is logged including the reason provided. Further, if
-// the score is above the ban threshold, the peer will be banned and
-// disconnected.
-func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
- warnThreshold := defaultBanThreshold >> 1
- if transient == 0 && persistent == 0 {
- // The score is not being increased, but a warning message is still
- // logged if the score is above the warn threshold.
- score := p.banScore.Int()
- if score > warnThreshold {
- log.Infof("Misbehaving peer %s: %s -- ban score is %d, "+"it was not increased this time", p.id, reason, score)
- }
- return false
- }
- score := p.banScore.Increase(persistent, transient)
- if score > warnThreshold {
- log.Infof("Misbehaving peer %s: %s -- ban score increased to %d", p.id, reason, score)
- if score > defaultBanThreshold {
- log.Errorf("Misbehaving peer %s -- banning and disconnecting", p.id)
- return true
- }
+func (p *peer) sendBlock(block *types.Block) (bool, error) {
+ msg, err := NewBlockMessage(block)
+ if err != nil {
+ return false, errors.Wrap(err, "fail on NewBlockMessage")
}
- return false
-}
-type peerSet struct {
- peers map[string]*peer
- lock sync.RWMutex
- closed bool
-}
-
-// newPeerSet creates a new peer set to track the active participants.
-func newPeerSet() *peerSet {
- return &peerSet{
- peers: make(map[string]*peer),
+ ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+ if ok {
+ blcokHash := block.Hash()
+ p.knownBlocks.Add(blcokHash.String())
}
+ return ok, nil
}
-// Register injects a new peer into the working set, or returns an error if the
-// peer is already known.
-func (ps *peerSet) Register(p *peer) error {
- ps.lock.Lock()
- defer ps.lock.Unlock()
+func (p *peer) sendBlocks(blocks []*types.Block) (bool, error) {
+ msg, err := NewBlocksMessage(blocks)
+ if err != nil {
+ return false, errors.Wrap(err, "fail on NewBlocksMessage")
+ }
- if ps.closed {
- return errClosed
+ if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+ return ok, nil
}
- if _, ok := ps.peers[p.id]; ok {
- return errAlreadyRegistered
+
+ for _, block := range blocks {
+ blcokHash := block.Hash()
+ p.knownBlocks.Add(blcokHash.String())
}
- ps.peers[p.id] = p
- return nil
+ return true, nil
}
-// Unregister removes a remote peer from the active set, disabling any further
-// actions to/from that particular entity.
-func (ps *peerSet) Unregister(id string) error {
- ps.lock.Lock()
- defer ps.lock.Unlock()
-
- if _, ok := ps.peers[id]; !ok {
- return errNotRegistered
+func (p *peer) sendHeaders(headers []*types.BlockHeader) (bool, error) {
+ msg, err := NewHeadersMessage(headers)
+ if err != nil {
+ return false, errors.New("fail on NewHeadersMessage")
}
- delete(ps.peers, id)
- return nil
-}
-// Peer retrieves the registered peer with the given id.
-func (ps *peerSet) Peer(id string) (*peer, bool) {
- ps.lock.RLock()
- defer ps.lock.RUnlock()
- p, ok := ps.peers[id]
- return p, ok
+ ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+ return ok, nil
}
+func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
+ for _, tx := range txs {
+ msg, err := NewTransactionMessage(tx)
+ if err != nil {
+ return false, errors.Wrap(err, "failed to tx msg")
+ }
-// getPeerInfos return all peer information of current node
-func (ps *peerSet) GetPeerInfos() []*PeerInfo {
- var peerInfos []*PeerInfo
- for _, peer := range ps.peers {
- peerInfos = append(peerInfos, peer.GetPeerInfo())
+ if p.knownTxs.Has(tx.ID.String()) {
+ continue
+ }
+ if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+ return ok, nil
+ }
+ p.knownTxs.Add(tx.ID.String())
}
- return peerInfos
+ return true, nil
}
-// Len returns if the current number of peers in the set.
-func (ps *peerSet) Len() int {
- ps.lock.RLock()
- defer ps.lock.RUnlock()
-
- return len(ps.peers)
+func (p *peer) setStatus(height uint64, hash *bc.Hash) {
+ p.mtx.Lock()
+ defer p.mtx.Unlock()
+ p.height = height
+ p.hash = hash
}
-// MarkTransaction marks a transaction as known for the peer, ensuring that it
-// will never be propagated to this particular peer.
-func (ps *peerSet) MarkTransaction(peerID string, hash *bc.Hash) {
- ps.lock.RLock()
- defer ps.lock.RUnlock()
+type peerSet struct {
+ BasePeerSet
+ mtx sync.RWMutex
+ peers map[string]*peer
+}
- if peer, ok := ps.peers[peerID]; ok {
- peer.MarkTransaction(hash)
+// newPeerSet creates a new peer set to track the active participants.
+func newPeerSet(basePeerSet BasePeerSet) *peerSet {
+ return &peerSet{
+ BasePeerSet: basePeerSet,
+ peers: make(map[string]*peer),
}
}
-// MarkBlock marks a block as known for the peer, ensuring that the block will
-// never be propagated to this particular peer.
-func (ps *peerSet) MarkBlock(peerID string, hash *bc.Hash) {
- ps.lock.RLock()
- defer ps.lock.RUnlock()
+func (ps *peerSet) addBanScore(peerID string, persistent, transient uint64, reason string) {
+ ps.mtx.Lock()
+ peer := ps.peers[peerID]
+ ps.mtx.Unlock()
- if peer, ok := ps.peers[peerID]; ok {
- peer.MarkBlock(hash)
+ if peer == nil {
+ return
}
+ if ban := peer.addBanScore(persistent, transient, reason); !ban {
+ return
+ }
+ if err := ps.AddBannedPeer(peer.Addr().String()); err != nil {
+ log.WithField("err", err).Error("fail on add ban peer")
+ }
+ ps.removePeer(peerID)
}
-// PeersWithoutBlock retrieves a list of peers that do not have a given block in
-// their set of known hashes.
-func (ps *peerSet) PeersWithoutBlock(hash *bc.Hash) []*peer {
- ps.lock.RLock()
- defer ps.lock.RUnlock()
+func (ps *peerSet) addPeer(peer BasePeer, height uint64, hash *bc.Hash) {
+ ps.mtx.Lock()
+ defer ps.mtx.Unlock()
- list := make([]*peer, 0, len(ps.peers))
- for _, p := range ps.peers {
- if !p.knownBlocks.Has(hash.String()) {
- list = append(list, p)
- }
+ if _, ok := ps.peers[peer.ID()]; !ok {
+ ps.peers[peer.ID()] = newPeer(height, hash, peer)
+ return
}
- return list
+ log.WithField("ID", peer.ID()).Warning("add existing peer to blockKeeper")
}
-// PeersWithoutTx retrieves a list of peers that do not have a given transaction
-// in their set of known hashes.
-func (ps *peerSet) PeersWithoutTx(hash *bc.Hash) []*peer {
- ps.lock.RLock()
- defer ps.lock.RUnlock()
+func (ps *peerSet) bestPeer(flag consensus.ServiceFlag) *peer {
+ ps.mtx.RLock()
+ defer ps.mtx.RUnlock()
- list := make([]*peer, 0, len(ps.peers))
+ var bestPeer *peer
for _, p := range ps.peers {
- if !p.knownTxs.Has(hash.String()) {
- list = append(list, p)
+ if !p.services.IsEnable(flag) {
+ continue
+ }
+ if bestPeer == nil || p.height > bestPeer.height {
+ bestPeer = p
}
}
- return list
+ return bestPeer
}
-// BestPeer retrieves the known peer with the currently highest total difficulty.
-func (ps *peerSet) BestPeer() (*p2p.Peer, uint64) {
- ps.lock.RLock()
- defer ps.lock.RUnlock()
-
- var bestPeer *p2p.Peer
- var bestHeight uint64
+func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
+ msg, err := NewMinedBlockMessage(block)
+ if err != nil {
+ return errors.Wrap(err, "fail on broadcast mined block")
+ }
- for _, p := range ps.peers {
- if bestPeer == nil || p.height > bestHeight {
- bestPeer, bestHeight = p.swPeer, p.height
+ hash := block.Hash()
+ peers := ps.peersWithoutBlock(&hash)
+ for _, peer := range peers {
+ if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+ ps.removePeer(peer.ID())
+ continue
}
+ peer.markBlock(&hash)
}
-
- return bestPeer, bestHeight
+ return nil
}
-// Close disconnects all peers.
-// No new peers can be registered after Close has returned.
-func (ps *peerSet) Close() {
- ps.lock.Lock()
- defer ps.lock.Unlock()
-
- for _, p := range ps.peers {
- p.swPeer.CloseConn()
+func (ps *peerSet) broadcastTx(tx *types.Tx) error {
+ msg, err := NewTransactionMessage(tx)
+ if err != nil {
+ return errors.Wrap(err, "fail on broadcast tx")
}
- ps.closed = true
-}
-func (ps *peerSet) AddPeer(peer *p2p.Peer) {
- ps.lock.Lock()
- defer ps.lock.Unlock()
-
- if _, ok := ps.peers[peer.Key]; !ok {
- keeperPeer := newPeer(0, nil, peer)
- ps.peers[peer.Key] = keeperPeer
- log.WithFields(log.Fields{"ID": peer.Key}).Info("Add new peer to blockKeeper")
- return
+ peers := ps.peersWithoutTx(&tx.ID)
+ for _, peer := range peers {
+ if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+ ps.removePeer(peer.ID())
+ continue
+ }
+ peer.markTransaction(&tx.ID)
}
- log.WithField("ID", peer.Key).Warning("Add existing peer to blockKeeper")
+ return nil
}
-func (ps *peerSet) RemovePeer(peerID string) {
- ps.lock.Lock()
- defer ps.lock.Unlock()
-
- delete(ps.peers, peerID)
- log.WithField("ID", peerID).Info("Delete peer from peerset")
+// Peer retrieves the registered peer with the given id.
+func (ps *peerSet) getPeer(id string) *peer {
+ ps.mtx.RLock()
+ defer ps.mtx.RUnlock()
+ return ps.peers[id]
}
-func (ps *peerSet) SetPeerStatus(peerID string, height uint64, hash *bc.Hash) {
- ps.lock.Lock()
- defer ps.lock.Unlock()
+func (ps *peerSet) getPeerInfos() []*PeerInfo {
+ ps.mtx.RLock()
+ defer ps.mtx.RUnlock()
- if peer, ok := ps.peers[peerID]; ok {
- peer.SetStatus(height, hash)
- }
-}
-
-func (ps *peerSet) requestBlockByHash(peerID string, hash *bc.Hash) error {
- peer, ok := ps.Peer(peerID)
- if !ok {
- return errors.New("Can't find peer. ")
+ result := []*PeerInfo{}
+ for _, peer := range ps.peers {
+ result = append(result, peer.getPeerInfo())
}
- return peer.requestBlockByHash(hash)
+ return result
}
-func (ps *peerSet) requestBlockByHeight(peerID string, height uint64) error {
- peer, ok := ps.Peer(peerID)
- if !ok {
- return errors.New("Can't find peer. ")
- }
- return peer.requestBlockByHeight(height)
-}
+func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
+ ps.mtx.RLock()
+ defer ps.mtx.RUnlock()
-func (ps *peerSet) BroadcastMinedBlock(block *types.Block) ([]*peer, error) {
- msg, err := NewMinedBlockMessage(block)
- if err != nil {
- return nil, errors.New("Failed construction block msg")
- }
- hash := block.Hash()
- peers := ps.PeersWithoutBlock(&hash)
- abnormalPeers := make([]*peer, 0)
- for _, peer := range peers {
- if ok := peer.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
- abnormalPeers = append(abnormalPeers, peer)
- continue
- }
- if p, ok := ps.Peer(peer.id); ok {
- p.MarkBlock(&hash)
+ peers := []*peer{}
+ for _, peer := range ps.peers {
+ if !peer.knownBlocks.Has(hash.String()) {
+ peers = append(peers, peer)
}
}
- return abnormalPeers, nil
+ return peers
}
-func (ps *peerSet) BroadcastNewStatus(block *types.Block) ([]*peer, error) {
- return ps.BroadcastMinedBlock(block)
-}
+func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
+ ps.mtx.RLock()
+ defer ps.mtx.RUnlock()
-func (ps *peerSet) BroadcastTx(tx *types.Tx) ([]*peer, error) {
- msg, err := NewTransactionNotifyMessage(tx)
- if err != nil {
- return nil, errors.New("Failed construction tx msg")
- }
- peers := ps.PeersWithoutTx(&tx.ID)
- abnormalPeers := make([]*peer, 0)
- for _, peer := range peers {
- if ok := peer.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
- abnormalPeers = append(abnormalPeers, peer)
- continue
- }
- if p, ok := ps.Peer(peer.id); ok {
- p.MarkTransaction(&tx.ID)
+ peers := []*peer{}
+ for _, peer := range ps.peers {
+ if !peer.knownTxs.Has(hash.String()) {
+ peers = append(peers, peer)
}
}
- return abnormalPeers, nil
+ return peers
+}
+
+func (ps *peerSet) removePeer(peerID string) {
+ ps.mtx.Lock()
+ delete(ps.peers, peerID)
+ ps.mtx.Unlock()
+ ps.StopPeerGracefully(peerID)
}
import (
"reflect"
- "strings"
- "sync"
"time"
log "github.com/sirupsen/logrus"
- cmn "github.com/tendermint/tmlibs/common"
"github.com/bytom/errors"
"github.com/bytom/p2p"
"github.com/bytom/p2p/connection"
- "github.com/bytom/protocol"
- "github.com/bytom/protocol/bc"
- "github.com/bytom/protocol/bc/types"
)
const (
- // BlockchainChannel is a channel for blocks and status updates
- BlockchainChannel = byte(0x40)
- protocolHandshakeTimeout = time.Second * 10
- handshakeRetryTicker = 4 * time.Second
+ handshakeTimeout = 10 * time.Second
+ handshakeCheckPerid = 500 * time.Millisecond
)
var (
- //ErrProtocolHandshakeTimeout peers handshake timeout
- ErrProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
- ErrStatusRequest = errors.New("Status request error")
- ErrDiffGenesisHash = errors.New("Different genesis hash")
+ errProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
+ errStatusRequest = errors.New("Status request error")
)
-// Response describes the response standard.
-type Response struct {
- Status string `json:"status,omitempty"`
- Msg string `json:"msg,omitempty"`
- Data interface{} `json:"data,omitempty"`
-}
-
-type initalPeerStatus struct {
- peerID string
- height uint64
- hash *bc.Hash
- genesisHash *bc.Hash
-}
-
//ProtocolReactor handles new coming protocol message.
type ProtocolReactor struct {
p2p.BaseReactor
- chain *protocol.Chain
- blockKeeper *blockKeeper
- txPool *protocol.TxPool
- sw *p2p.Switch
- fetcher *Fetcher
- peers *peerSet
- handshakeMu sync.Mutex
- genesisHash bc.Hash
-
- newPeerCh chan struct{}
- quitReqBlockCh chan *string
- txSyncCh chan *txsync
- peerStatusCh chan *initalPeerStatus
+ sm *SyncManager
+ peers *peerSet
}
// NewProtocolReactor returns the reactor of whole blockchain.
-func NewProtocolReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, blockPeer *blockKeeper, fetcher *Fetcher, peers *peerSet, newPeerCh chan struct{}, txSyncCh chan *txsync, quitReqBlockCh chan *string) *ProtocolReactor {
+func NewProtocolReactor(sm *SyncManager, peers *peerSet) *ProtocolReactor {
pr := &ProtocolReactor{
- chain: chain,
- blockKeeper: blockPeer,
- txPool: txPool,
- sw: sw,
- fetcher: fetcher,
- peers: peers,
- newPeerCh: newPeerCh,
- txSyncCh: txSyncCh,
- quitReqBlockCh: quitReqBlockCh,
- peerStatusCh: make(chan *initalPeerStatus),
+ sm: sm,
+ peers: peers,
}
pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
- genesisBlock, _ := pr.chain.GetBlockByHeight(0)
- pr.genesisHash = genesisBlock.Hash()
-
return pr
}
pr.BaseReactor.OnStop()
}
-// syncTransactions starts sending all currently pending transactions to the given peer.
-func (pr *ProtocolReactor) syncTransactions(p *peer) {
- if p == nil {
- return
- }
- pending := pr.txPool.GetTransactions()
- if len(pending) == 0 {
- return
- }
- txs := make([]*types.Tx, len(pending))
- for i, batch := range pending {
- txs[i] = batch.Tx
- }
- pr.txSyncCh <- &txsync{p, txs}
-}
-
// AddPeer implements Reactor by sending our state to peer.
func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
- pr.handshakeMu.Lock()
- defer pr.handshakeMu.Unlock()
- if peer == nil {
- return errPeerDropped
- }
if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
- return ErrStatusRequest
+ return errStatusRequest
}
- retryTicker := time.Tick(handshakeRetryTicker)
- handshakeWait := time.NewTimer(protocolHandshakeTimeout)
+
+ checkTicker := time.NewTimer(handshakeCheckPerid)
+ timeoutTicker := time.NewTimer(handshakeTimeout)
for {
select {
- case status := <-pr.peerStatusCh:
- if status.peerID == peer.Key {
- if strings.Compare(pr.genesisHash.String(), status.genesisHash.String()) != 0 {
- log.Info("Remote peer genesis block hash:", status.genesisHash.String(), " local hash:", pr.genesisHash.String())
- return ErrDiffGenesisHash
- }
- pr.peers.AddPeer(peer)
- pr.peers.SetPeerStatus(status.peerID, status.height, status.hash)
- prPeer, ok := pr.peers.Peer(peer.Key)
- if !ok {
- return errPeerDropped
- }
- pr.syncTransactions(prPeer)
- pr.newPeerCh <- struct{}{}
+ case <-checkTicker.C:
+ if exist := pr.peers.getPeer(peer.Key); exist != nil {
+ pr.sm.syncTransactions(peer.Key)
return nil
}
- case <-retryTicker:
- if peer == nil {
- return errPeerDropped
- }
- if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
- return ErrStatusRequest
- }
- case <-handshakeWait.C:
- return ErrProtocolHandshakeTimeout
+
+ case <-timeoutTicker.C:
+ return errProtocolHandshakeTimeout
}
}
}
// RemovePeer implements Reactor by removing peer from the pool.
func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
- select {
- case pr.quitReqBlockCh <- &peer.Key:
- default:
- log.Warning("quitReqBlockCh is full")
- }
- pr.peers.RemovePeer(peer.Key)
+ pr.peers.removePeer(peer.Key)
}
// Receive implements Reactor by handling 4 types of messages (look below).
func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
- _, msg, err := DecodeMessage(msgBytes)
+ msgType, msg, err := DecodeMessage(msgBytes)
if err != nil {
- log.Errorf("Error decoding message %v", err)
+ log.WithField("err", err).Errorf("fail on reactor decoding message")
+ return
+ }
+
+ peer := pr.peers.getPeer(src.Key)
+ if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
return
}
- log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
switch msg := msg.(type) {
- case *BlockRequestMessage:
- var block *types.Block
- var err error
- if msg.Height != 0 {
- block, err = pr.chain.GetBlockByHeight(msg.Height)
- } else {
- block, err = pr.chain.GetBlockByHash(msg.GetHash())
- }
- if err != nil {
- log.Errorf("Fail on BlockRequestMessage get block: %v", err)
- return
- }
- response, err := NewBlockResponseMessage(block)
- if err != nil {
- log.Errorf("Fail on BlockRequestMessage create response: %v", err)
- return
- }
- src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
+ case *GetBlockMessage:
+ pr.sm.handleGetBlockMsg(peer, msg)
- case *BlockResponseMessage:
- log.Info("BlockResponseMessage height:", msg.GetBlock().Height)
- pr.blockKeeper.AddBlock(msg.GetBlock(), src.Key)
+ case *BlockMessage:
+ pr.sm.handleBlockMsg(peer, msg)
case *StatusRequestMessage:
- blockHeader := pr.chain.BestBlockHeader()
- src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(blockHeader, &pr.genesisHash)})
+ pr.sm.handleStatusRequestMsg(src)
case *StatusResponseMessage:
- peerStatus := &initalPeerStatus{
- peerID: src.Key,
- height: msg.Height,
- hash: msg.GetHash(),
- genesisHash: msg.GetGenesisHash(),
- }
- pr.peerStatusCh <- peerStatus
+ pr.sm.handleStatusResponseMsg(src, msg)
- case *TransactionNotifyMessage:
- tx, err := msg.GetTransaction()
- if err != nil {
- log.Errorf("Error decoding new tx %v", err)
- return
- }
- pr.blockKeeper.AddTx(tx, src.Key)
+ case *TransactionMessage:
+ pr.sm.handleTransactionMsg(peer, msg)
case *MineBlockMessage:
- block, err := msg.GetMineBlock()
- if err != nil {
- log.Errorf("Error decoding mined block %v", err)
- return
- }
- // Mark the peer as owning the block and schedule it for import
- hash := block.Hash()
- pr.peers.MarkBlock(src.Key, &hash)
- pr.fetcher.Enqueue(src.Key, block)
- pr.peers.SetPeerStatus(src.Key, block.Height, &hash)
+ pr.sm.handleMineBlockMsg(peer, msg)
+
+ case *GetHeadersMessage:
+ pr.sm.handleGetHeadersMsg(peer, msg)
+
+ case *HeadersMessage:
+ pr.sm.handleHeadersMsg(peer, msg)
+
+ case *GetBlocksMessage:
+ pr.sm.handleGetBlocksMsg(peer, msg)
+
+ case *BlocksMessage:
+ pr.sm.handleBlocksMsg(peer, msg)
default:
- log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
+ log.Errorf("unknown message type %v", reflect.TypeOf(msg))
}
}
+++ /dev/null
-// Copyright 2015 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package netsync
-
-import (
- "math/rand"
- "sync/atomic"
- "time"
-
- log "github.com/sirupsen/logrus"
-
- "github.com/bytom/common"
- "github.com/bytom/protocol/bc/types"
-)
-
-const (
- forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available
- minDesiredPeerCount = 5 // Amount of peers desired to start syncing
-
- // This is the target size for the packs of transactions sent by txsyncLoop.
- // A pack can get larger than this if a single transactions exceeds this size.
- txsyncPackSize = 100 * 1024
-)
-
-type txsync struct {
- p *peer
- txs []*types.Tx
-}
-
-// syncer is responsible for periodically synchronising with the network, both
-// downloading hashes and blocks as well as handling the announcement handler.
-func (sm *SyncManager) syncer() {
- // Start and ensure cleanup of sync mechanisms
- sm.fetcher.Start()
- defer sm.fetcher.Stop()
- //defer sm.downloader.Terminate()
-
- // Wait for different events to fire synchronisation operations
- forceSync := time.NewTicker(forceSyncCycle)
- defer forceSync.Stop()
-
- for {
- select {
- case <-sm.newPeerCh:
- log.Info("New peer connected.")
- // Make sure we have peers to select from, then sync
- if sm.sw.Peers().Size() < minDesiredPeerCount {
- break
- }
- go sm.synchronise()
-
- case <-forceSync.C:
- // Force a sync even if not enough peers are present
- go sm.synchronise()
-
- case <-sm.quitSync:
- return
- }
- }
-}
-
-// synchronise tries to sync up our local block chain with a remote peer.
-func (sm *SyncManager) synchronise() {
- log.Debug("bk peer num:", sm.blockKeeper.peers.Len(), " sw peer num:", sm.sw.Peers().Size(), " ", sm.sw.Peers().List())
- // Make sure only one goroutine is ever allowed past this point at once
- if !atomic.CompareAndSwapInt32(&sm.synchronising, 0, 1) {
- log.Info("Synchronising ...")
- return
- }
- defer atomic.StoreInt32(&sm.synchronising, 0)
- for len(sm.dropPeerCh) > 0 {
- <-sm.dropPeerCh
- }
-
- peer, bestHeight := sm.peers.BestPeer()
- // Short circuit if no peers are available
- if peer == nil {
- return
- }
-
- if ok := sm.Switch().Peers().Has(peer.Key); !ok {
- log.Info("Peer disconnected")
- sm.sw.StopPeerGracefully(peer)
- return
- }
-
- if bestHeight > sm.chain.BestBlockHeight() {
- log.Info("sync peer:", peer.Addr(), " height:", bestHeight)
- sm.blockKeeper.BlockRequestWorker(peer.Key, bestHeight)
- }
-}
-
-// txsyncLoop takes care of the initial transaction sync for each new
-// connection. When a new peer appears, we relay all currently pending
-// transactions. In order to minimise egress bandwidth usage, we send
-// the transactions in small packs to one peer at a time.
-func (sm *SyncManager) txsyncLoop() {
- var (
- pending = make(map[string]*txsync)
- sending = false // whether a send is active
- pack = new(txsync) // the pack that is being sent
- done = make(chan error, 1) // result of the send
- )
-
- // send starts a sending a pack of transactions from the sync.
- send := func(s *txsync) {
- // Fill pack with transactions up to the target size.
- size := common.StorageSize(0)
- pack.p = s.p
- pack.txs = pack.txs[:0]
- for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
- pack.txs = append(pack.txs, s.txs[i])
- size += common.StorageSize(s.txs[i].SerializedSize)
- }
- // Remove the transactions that will be sent.
- s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
- if len(s.txs) == 0 {
- delete(pending, s.p.swPeer.Key)
- }
- // Send the pack in the background.
- log.Info("Sending batch of transactions. ", "count:", len(pack.txs), " bytes:", size)
- sending = true
- go func() { done <- pack.p.SendTransactions(pack.txs) }()
- }
-
- // pick chooses the next pending sync.
- pick := func() *txsync {
- if len(pending) == 0 {
- return nil
- }
- n := rand.Intn(len(pending)) + 1
- for _, s := range pending {
- if n--; n == 0 {
- return s
- }
- }
- return nil
- }
-
- for {
- select {
- case s := <-sm.txSyncCh:
- pending[s.p.swPeer.Key] = s
- if !sending {
- send(s)
- }
- case err := <-done:
- sending = false
- // Stop tracking peers that cause send failures.
- if err != nil {
- log.Info("Transaction send failed", "err", err)
- delete(pending, pack.p.swPeer.Key)
- }
- // Schedule the next send.
- if s := pick(); s != nil {
- send(s)
- }
- case <-sm.quitSync:
- return
- }
- }
-}
--- /dev/null
+package netsync
+
+import (
+ "math/rand"
+
+ log "github.com/sirupsen/logrus"
+
+ "github.com/bytom/protocol/bc/types"
+)
+
+const (
+ // This is the target size for the packs of transactions sent by txSyncLoop.
+ // A pack can get larger than this if a single transactions exceeds this size.
+ txSyncPackSize = 100 * 1024
+)
+
+type txSyncMsg struct {
+ peerID string
+ txs []*types.Tx
+}
+
+func (sm *SyncManager) syncTransactions(peerID string) {
+ pending := sm.txPool.GetTransactions()
+ if len(pending) == 0 {
+ return
+ }
+
+ txs := make([]*types.Tx, len(pending))
+ for i, batch := range pending {
+ txs[i] = batch.Tx
+ }
+ sm.txSyncCh <- &txSyncMsg{peerID, txs}
+}
+
+func (sm *SyncManager) txBroadcastLoop() {
+ for {
+ select {
+ case newTx := <-sm.newTxCh:
+ if err := sm.peers.broadcastTx(newTx); err != nil {
+ log.Errorf("Broadcast new tx error. %v", err)
+ return
+ }
+ case <-sm.quitSync:
+ return
+ }
+ }
+}
+
+// txSyncLoop takes care of the initial transaction sync for each new
+// connection. When a new peer appears, we relay all currently pending
+// transactions. In order to minimise egress bandwidth usage, we send
+// the transactions in small packs to one peer at a time.
+func (sm *SyncManager) txSyncLoop() {
+ pending := make(map[string]*txSyncMsg)
+ sending := false // whether a send is active
+ done := make(chan error, 1) // result of the send
+
+ // send starts a sending a pack of transactions from the sync.
+ send := func(msg *txSyncMsg) {
+ peer := sm.peers.getPeer(msg.peerID)
+ if peer == nil {
+ delete(pending, msg.peerID)
+ }
+
+ totalSize := uint64(0)
+ sendTxs := []*types.Tx{}
+ for i := 0; i < len(msg.txs) && totalSize < txSyncPackSize; i++ {
+ sendTxs = append(sendTxs, msg.txs[i])
+ totalSize += msg.txs[i].SerializedSize
+ }
+
+ copy(msg.txs, msg.txs[len(sendTxs):])
+ if len(msg.txs) == 0 {
+ delete(pending, msg.peerID)
+ }
+
+ // Send the pack in the background.
+ log.WithFields(log.Fields{
+ "count": len(sendTxs),
+ "bytes": totalSize,
+ "peer": msg.peerID,
+ }).Debug("txSyncLoop sending transactions")
+ sending = true
+ go func() {
+ ok, err := peer.sendTransactions(sendTxs)
+ if !ok {
+ sm.peers.removePeer(msg.peerID)
+ }
+ done <- err
+ }()
+ }
+
+ // pick chooses the next pending sync.
+ pick := func() *txSyncMsg {
+ if len(pending) == 0 {
+ return nil
+ }
+
+ n := rand.Intn(len(pending)) + 1
+ for _, s := range pending {
+ if n--; n == 0 {
+ return s
+ }
+ }
+ return nil
+ }
+
+ for {
+ select {
+ case msg := <-sm.txSyncCh:
+ pending[msg.peerID] = msg
+ if !sending {
+ send(msg)
+ }
+
+ case err := <-done:
+ sending = false
+ if err != nil {
+ log.WithField("err", err).Warning("fail on txSyncLoop sending")
+ }
+
+ if s := pick(); s != nil {
+ send(s)
+ }
+ }
+ }
+}
import (
"fmt"
"net"
+ "strconv"
"time"
"github.com/pkg/errors"
cmn "github.com/tendermint/tmlibs/common"
cfg "github.com/bytom/config"
+ "github.com/bytom/consensus"
"github.com/bytom/p2p/connection"
)
return peerNodeInfo, nil
}
+func (p *Peer) ID() string {
+ return p.Key
+}
+
// IsOutbound returns true if the connection is outbound, false otherwise.
func (p *Peer) IsOutbound() bool {
return p.outbound
return p.mconn.Send(chID, msg)
}
+func (p *Peer) ServiceFlag() consensus.ServiceFlag {
+ services := consensus.SFFullNode
+ if len(p.Other) == 0 {
+ return services
+ }
+
+ if serviceFlag, err := strconv.ParseUint(p.Other[0], 10, 64); err == nil {
+ services = consensus.ServiceFlag(serviceFlag)
+ }
+ return services
+}
+
// String representation.
func (p *Peer) String() string {
if p.outbound {
if r.SendAddrs(p, nodes) {
<-time.After(1 * time.Second)
- r.Switch.StopPeerGracefully(p)
+ r.Switch.StopPeerGracefully(p.Key)
}
return errors.New("addPeer: reach the max peer, exchange then close")
}
_, msg, err := DecodeMessage(rawMsg)
if err != nil {
log.WithField("error", err).Error("failed to decoding pex message")
- r.Switch.StopPeerGracefully(p)
+ r.Switch.StopPeerGracefully(p.Key)
return
}
ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
if !ok {
- r.Switch.StopPeerGracefully(p)
+ r.Switch.StopPeerGracefully(p.Key)
}
return ok
}
}
//AddBannedPeer add peer to blacklist
-func (sw *Switch) AddBannedPeer(peer *Peer) error {
+func (sw *Switch) AddBannedPeer(ip string) error {
sw.mtx.Lock()
defer sw.mtx.Unlock()
- key := peer.NodeInfo.RemoteAddrHost()
- sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
+ sw.bannedPeer[ip] = time.Now().Add(defaultBanDuration)
datajson, err := json.Marshal(sw.bannedPeer)
if err != nil {
return err
}
// StopPeerGracefully disconnect from a peer gracefully.
-func (sw *Switch) StopPeerGracefully(peer *Peer) {
- sw.stopAndRemovePeer(peer, nil)
+func (sw *Switch) StopPeerGracefully(peerID string) {
+ if peer := sw.peers.Get(peerID); peer != nil {
+ sw.stopAndRemovePeer(peer, nil)
+ }
}
func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
}
func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
+ sw.peers.Remove(peer)
for _, reactor := range sw.reactors {
reactor.RemovePeer(peer, reason)
}
- sw.peers.Remove(peer)
peer.Stop()
}
return c.store.GetBlock(hash)
}
-// GetBlockByHeight return a block by given height
+// GetBlockByHeight return a block header by given height
func (c *Chain) GetBlockByHeight(height uint64) (*types.Block, error) {
node := c.index.NodeByHeight(height)
if node == nil {
return c.store.GetBlock(&node.Hash)
}
+// GetHeaderByHash return a block header by given hash
+func (c *Chain) GetHeaderByHash(hash *bc.Hash) (*types.BlockHeader, error) {
+ node := c.index.GetNode(hash)
+ if node == nil {
+ return nil, errors.New("can't find block header in given hash")
+ }
+ return node.BlockHeader(), nil
+}
+
+// GetHeaderByHeight return a block header by given height
+func (c *Chain) GetHeaderByHeight(height uint64) (*types.BlockHeader, error) {
+ node := c.index.NodeByHeight(height)
+ if node == nil {
+ return nil, errors.New("can't find block header in given height")
+ }
+ return node.BlockHeader(), nil
+}
+
func (c *Chain) calcReorganizeNodes(node *state.BlockNode) ([]*state.BlockNode, []*state.BlockNode) {
var attachNodes []*state.BlockNode
var detachNodes []*state.BlockNode