OSDN Git Service

Add block fast sync function (#1104)
authoryahtoo <yahtoo.ma@gmail.com>
Thu, 19 Jul 2018 11:09:05 +0000 (19:09 +0800)
committerPaladz <yzhu101@uottawa.ca>
Thu, 19 Jul 2018 11:09:05 +0000 (19:09 +0800)
* Add block headers handler for fast sync

* Add testnet checkpoint

* Add debug info

* Fix GetHeadersMessage bug

* Add debug info

* Fix HeadersMessage bug

* Add HeadersMessage handler

* Fast sync call BlockRequestWorker download block

* Add debug info for measure time

* Add getBlocks for fast sync

* Fix code logic errors

* Del unused variable headersFirstMode

* Fix hash string compare error

* Fix code logic errors

* Fix blocksSend logic error

* Optimize log printing

* Add max blocksMsg package size limit

* Modify fast sync node selection logic

* Fix multi checkpoint fast sync error

* Del unused code prevGetHdrsMtx

* Add fast sync txs merkle root check

* Add fast sync complete new status broadcast

* Change variable hash format

* edit the code format

* delete the unused checkpoint

* edit netsync message byte

* Fix Duplicate Byte BlockRequestMessage

* Add testnet checkpoint

* tmp save

* edit the handle logic

* edit the fast sync

* move tx process out of blockKeeper

* elegant the message

* elegant the peer.go

* elegant block_keeper

* elegant the block_fetcher

* fix api node info bug

* delete unused data

* edit the tx_keeper

* edit ban method

* make sure both size sync tx

* add response bool on every send

* handle all the peer dc

* merge with dev

* fix bug on merge with dev

* add max block peer msg limit

* limit the max send block size

* fix the web wallet display bug

* fix hand shake timepout error

* fix logic if statement

* edit the services flag bug

* add error message log

* edit blocks/headers, marshal/unmarshal

* edit the message field name

* fix sync logic bug

* fix sync bug

* fix a code logic error

19 files changed:
api/nodeinfo.go
common/types.go
consensus/general.go
mining/mining.go
mining/sort.go
mining/tensority/algorithm.go
netsync/block_fetcher.go [new file with mode: 0644]
netsync/block_keeper.go
netsync/fetcher.go [deleted file]
netsync/handle.go
netsync/message.go
netsync/peer.go
netsync/protocol_reactor.go
netsync/sync.go [deleted file]
netsync/tx_keeper.go [new file with mode: 0644]
p2p/peer.go
p2p/pex/pex_reactor.go
p2p/switch.go
protocol/block.go

index 40ef409..b2b022e 100644 (file)
@@ -1,13 +1,13 @@
 package api
 
 import (
-       "net"
        "context"
+       "net"
 
-       "github.com/bytom/version"
-       "github.com/bytom/netsync"
        "github.com/bytom/errors"
+       "github.com/bytom/netsync"
        "github.com/bytom/p2p"
+       "github.com/bytom/version"
 )
 
 // NetInfo indicate net information
@@ -26,24 +26,25 @@ type NetInfo struct {
 func (a *API) GetNodeInfo() *NetInfo {
        info := &NetInfo{
                Listening:    a.sync.Switch().IsListening(),
-               Syncing:      a.sync.BlockKeeper().IsCaughtUp(),
+               Syncing:      !a.sync.IsCaughtUp(),
                Mining:       a.cpuMiner.IsMining(),
                PeerCount:    len(a.sync.Switch().Peers().List()),
                CurrentBlock: a.chain.BestBlockHeight(),
                NetWorkID:    a.sync.NodeInfo().Network,
                Version:      version.Version,
        }
-       _, info.HighestBlock = a.sync.Peers().BestPeer()
+       if bestPeer := a.sync.BestPeer(); bestPeer != nil {
+               info.HighestBlock = bestPeer.Height
+       }
        if info.CurrentBlock > info.HighestBlock {
                info.HighestBlock = info.CurrentBlock
        }
        return info
 }
 
-
 // return the currently connected peers with net address
 func (a *API) getPeerInfoByAddr(addr string) *netsync.PeerInfo {
-       peerInfos := a.sync.Peers().GetPeerInfos()
+       peerInfos := a.sync.GetPeerInfos()
        for _, peerInfo := range peerInfos {
                if peerInfo.RemoteAddr == addr {
                        return peerInfo
@@ -53,13 +54,8 @@ func (a *API) getPeerInfoByAddr(addr string) *netsync.PeerInfo {
 }
 
 // disconnect peer by the peer id
-func (a *API) disconnectPeerById(peerId string) error {
-       if peer, ok := a.sync.Peers().Peer(peerId); ok {
-               swPeer := peer.GetPeer()
-               a.sync.Switch().StopPeerGracefully(swPeer)
-               return nil
-       }
-       return errors.New("peerId not exist")
+func (a *API) disconnectPeerById(peerID string) error {
+       return a.sync.StopPeer(peerID)
 }
 
 // connect peer b y net address
@@ -100,7 +96,7 @@ func (a *API) IsMining() bool {
 
 // return the peers of current node
 func (a *API) listPeers() Response {
-       return NewSuccessResponse(a.sync.Peers().GetPeerInfos())
+       return NewSuccessResponse(a.sync.GetPeerInfos())
 }
 
 // disconnect peer
index 4804144..246b8d7 100644 (file)
@@ -27,6 +27,7 @@ func BytesToHash(b []byte) Hash {
        h.SetBytes(b)
        return h
 }
+
 func StringToHash(s string) Hash { return BytesToHash([]byte(s)) }
 func BigToHash(b *big.Int) Hash  { return BytesToHash(b.Bytes()) }
 func HexToHash(s string) Hash    { return BytesToHash(FromHex(s)) }
index c140a04..08544ae 100644 (file)
@@ -119,7 +119,11 @@ var MainNetParams = Params{
 var TestNetParams = Params{
        Name:            "test",
        Bech32HRPSegwit: "tm",
-       Checkpoints:     []Checkpoint{},
+       Checkpoints: []Checkpoint{
+               {1000, bc.NewHash([32]byte{0x28, 0x29, 0xc9, 0xe2, 0x19, 0x0f, 0xfe, 0xb6, 0xf2, 0x73, 0xde, 0x1a, 0xe8, 0x1f, 0xa6, 0xbc, 0x15, 0xaa, 0x08, 0xa9, 0xb8, 0x4c, 0x43, 0x25, 0x9a, 0xa0, 0x24, 0x8a, 0xd8, 0x55, 0x73, 0xca})},
+               {1500, bc.NewHash([32]byte{0xc2, 0x94, 0x02, 0xd1, 0x6c, 0xa8, 0x18, 0xc4, 0x95, 0x67, 0x48, 0xb8, 0xa8, 0x42, 0xa2, 0x69, 0x8e, 0xdd, 0xb2, 0xa9, 0xb6, 0xd9, 0x30, 0xf4, 0x12, 0xdb, 0x0f, 0x1e, 0x58, 0xc9, 0x69, 0x3b})},
+               {10303, bc.NewHash([32]byte{0x3e, 0x94, 0x5d, 0x35, 0x70, 0x30, 0xd4, 0x3b, 0x3d, 0xe3, 0xdd, 0x80, 0x67, 0x29, 0x9a, 0x5e, 0x09, 0xf9, 0xfb, 0x2b, 0xad, 0x5f, 0x92, 0xc8, 0x69, 0xd1, 0x42, 0x39, 0x74, 0x9a, 0xd1, 0x1c})},
+       },
 }
 
 // SoloNetParams is the config for test-net
index 1bc3e9c..b06417f 100644 (file)
@@ -98,7 +98,7 @@ func NewBlockTemplate(c *protocol.Chain, txPool *protocol.TxPool, accountManager
        b.Transactions = []*types.Tx{nil}
 
        txs := txPool.GetTransactions()
-       sort.Sort(ByTime(txs))
+       sort.Sort(byTime(txs))
        for _, txDesc := range txs {
                tx := txDesc.Tx.Tx
                gasOnlyTx := false
index 90065b5..507f50d 100644 (file)
@@ -2,8 +2,8 @@ package mining
 
 import "github.com/bytom/protocol"
 
-type ByTime []*protocol.TxDesc
+type byTime []*protocol.TxDesc
 
-func (a ByTime) Len() int           { return len(a) }
-func (a ByTime) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
-func (a ByTime) Less(i, j int) bool { return a[i].Added.Unix() < a[j].Added.Unix() }
+func (a byTime) Len() int           { return len(a) }
+func (a byTime) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
+func (a byTime) Less(i, j int) bool { return a[i].Added.Unix() < a[j].Added.Unix() }
index 7906051..967f2d0 100644 (file)
@@ -38,6 +38,12 @@ func (a *Cache) AddCache(hash, seed, result *bc.Hash) {
        a.lruCache.Add(*key, result)
 }
 
+// RemoveCache clean the cached result
+func (a *Cache) RemoveCache(hash, seed *bc.Hash) {
+       key := calcCacheKey(hash, seed)
+       a.lruCache.Remove(key)
+}
+
 // Hash is the real entry for call tensority algorithm
 func (a *Cache) Hash(hash, seed *bc.Hash) *bc.Hash {
        key := calcCacheKey(hash, seed)
diff --git a/netsync/block_fetcher.go b/netsync/block_fetcher.go
new file mode 100644 (file)
index 0000000..b55b5c7
--- /dev/null
@@ -0,0 +1,91 @@
+package netsync
+
+import (
+       log "github.com/sirupsen/logrus"
+       "gopkg.in/karalabe/cookiejar.v2/collections/prque"
+
+       "github.com/bytom/protocol"
+       "github.com/bytom/protocol/bc"
+)
+
+const (
+       maxBlockDistance = 64
+       maxMsgSetSize    = 128
+       newBlockChSize   = 64
+)
+
+// blockFetcher is responsible for accumulating block announcements from various peers
+// and scheduling them for retrieval.
+type blockFetcher struct {
+       chain *protocol.Chain
+       peers *peerSet
+
+       newBlockCh chan *blockMsg
+       queue      *prque.Prque
+       msgSet     map[bc.Hash]*blockMsg
+}
+
+//NewBlockFetcher creates a block fetcher to retrieve blocks of the new mined.
+func newBlockFetcher(chain *protocol.Chain, peers *peerSet) *blockFetcher {
+       f := &blockFetcher{
+               chain:      chain,
+               peers:      peers,
+               newBlockCh: make(chan *blockMsg, newBlockChSize),
+               queue:      prque.New(),
+               msgSet:     make(map[bc.Hash]*blockMsg),
+       }
+       go f.blockProcessor()
+       return f
+}
+
+func (f *blockFetcher) blockProcessor() {
+       for {
+               height := f.chain.BestBlockHeight()
+               for !f.queue.Empty() {
+                       msg := f.queue.PopItem().(*blockMsg)
+                       if msg.block.Height > height+1 {
+                               f.queue.Push(msg, -float32(msg.block.Height))
+                               break
+                       }
+
+                       f.insert(msg)
+                       delete(f.msgSet, msg.block.Hash())
+               }
+               f.add(<-f.newBlockCh)
+       }
+}
+
+func (f *blockFetcher) add(msg *blockMsg) {
+       bestHeight := f.chain.BestBlockHeight()
+       if len(f.msgSet) > maxMsgSetSize || bestHeight > msg.block.Height || msg.block.Height-bestHeight > maxBlockDistance {
+               return
+       }
+
+       blockHash := msg.block.Hash()
+       if _, ok := f.msgSet[blockHash]; !ok {
+               f.msgSet[blockHash] = msg
+               f.queue.Push(msg, -float32(msg.block.Height))
+               log.WithField("block height", msg.block.Height).Debug("fetcher receive mine block")
+       }
+}
+
+func (f *blockFetcher) insert(msg *blockMsg) {
+       if _, err := f.chain.ProcessBlock(msg.block); err != nil {
+               peer := f.peers.getPeer(msg.peerID)
+               if peer == nil {
+                       return
+               }
+
+               f.peers.addBanScore(msg.peerID, 20, 0, err.Error())
+               return
+       }
+
+       if err := f.peers.broadcastMinedBlock(msg.block); err != nil {
+               log.WithField("err", err).Error("fail on fetcher broadcast new block")
+               return
+       }
+}
+
+func (f *blockFetcher) processNewBlock(msg *blockMsg) {
+       f.newBlockCh <- msg
+}
index 23cd56c..3998050 100644 (file)
 package netsync
 
 import (
+       "container/list"
        "time"
 
        log "github.com/sirupsen/logrus"
 
        "github.com/bytom/consensus"
        "github.com/bytom/errors"
-       "github.com/bytom/p2p"
+       "github.com/bytom/mining/tensority"
        "github.com/bytom/protocol"
+       "github.com/bytom/protocol/bc"
        "github.com/bytom/protocol/bc/types"
 )
 
 const (
-       maxKnownTxs    = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
-       maxKnownBlocks = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
-
-       syncTimeout        = 30 * time.Second
-       requestRetryTicker = 15 * time.Second
-
-       maxBlocksPending = 1024
-       maxtxsPending    = 32768
-       maxQuitReq       = 256
-
-       maxTxChanSize = 10000 // txChanSize is the size of channel listening to Txpool newTxCh
+       syncTimeout           = 30 * time.Second
+       syncCycle             = 5 * time.Second
+       blockProcessChSize    = 1024
+       blocksProcessChSize   = 128
+       headersProcessChSize  = 1024
+       maxBlockPerMsg        = 128
+       maxBlockHeadersPerMsg = 2048
 )
 
 var (
-       errGetBlockTimeout = errors.New("Get block Timeout")
-       errPeerDropped     = errors.New("Peer dropped")
-       errGetBlockByHash  = errors.New("Get block by hash error")
-       errBroadcastStatus = errors.New("Broadcast new status block error")
-       errReqBlock        = errors.New("Request block error")
-       errPeerNotRegister = errors.New("peer is not registered")
+       errRequestTimeout = errors.New("request timeout")
+       errPeerDropped    = errors.New("Peer dropped")
+       errPeerMisbehave  = errors.New("peer is misbehave")
 )
 
-//TODO: add retry mechanism
+type blockMsg struct {
+       block  *types.Block
+       peerID string
+}
+
+type blocksMsg struct {
+       blocks []*types.Block
+       peerID string
+}
+
+type headersMsg struct {
+       headers []*types.BlockHeader
+       peerID  string
+}
+
 type blockKeeper struct {
        chain *protocol.Chain
-       sw    *p2p.Switch
        peers *peerSet
 
-       pendingProcessCh chan *blockPending
-       txsProcessCh     chan *txsNotify
-       quitReqBlockCh   chan *string
+       syncPeer         *peer
+       blockProcessCh   chan *blockMsg
+       blocksProcessCh  chan *blocksMsg
+       headersProcessCh chan *headersMsg
+
+       headerList *list.List
 }
 
-func newBlockKeeper(chain *protocol.Chain, sw *p2p.Switch, peers *peerSet, quitReqBlockCh chan *string) *blockKeeper {
+func newBlockKeeper(chain *protocol.Chain, peers *peerSet) *blockKeeper {
        bk := &blockKeeper{
                chain:            chain,
-               sw:               sw,
                peers:            peers,
-               pendingProcessCh: make(chan *blockPending, maxBlocksPending),
-               txsProcessCh:     make(chan *txsNotify, maxtxsPending),
-               quitReqBlockCh:   quitReqBlockCh,
+               blockProcessCh:   make(chan *blockMsg, blockProcessChSize),
+               blocksProcessCh:  make(chan *blocksMsg, blocksProcessChSize),
+               headersProcessCh: make(chan *headersMsg, headersProcessChSize),
+               headerList:       list.New(),
        }
-       go bk.txsProcessWorker()
+       bk.resetHeaderState()
+       go bk.syncWorker()
        return bk
 }
 
-func (bk *blockKeeper) AddBlock(block *types.Block, peerID string) {
-       bk.pendingProcessCh <- &blockPending{block: block, peerID: peerID}
+func (bk *blockKeeper) appendHeaderList(headers []*types.BlockHeader) error {
+       for _, header := range headers {
+               prevHeader := bk.headerList.Back().Value.(*types.BlockHeader)
+               if prevHeader.Hash() != header.PreviousBlockHash {
+                       return errors.New("fail to append list due to order dismatch")
+               }
+               bk.headerList.PushBack(header)
+       }
+       return nil
 }
 
-func (bk *blockKeeper) AddTx(tx *types.Tx, peerID string) {
-       bk.txsProcessCh <- &txsNotify{tx: tx, peerID: peerID}
-}
+func (bk *blockKeeper) blockLocator() []*bc.Hash {
+       header := bk.chain.BestBlockHeader()
+       locator := []*bc.Hash{}
 
-func (bk *blockKeeper) IsCaughtUp() bool {
-       _, height := bk.peers.BestPeer()
-       return bk.chain.BestBlockHeight() < height
-}
+       step := uint64(1)
+       for header != nil {
+               headerHash := header.Hash()
+               locator = append(locator, &headerHash)
+               if header.Height == 0 {
+                       break
+               }
 
-func (bk *blockKeeper) BlockRequestWorker(peerID string, maxPeerHeight uint64) error {
-       num := bk.chain.BestBlockHeight() + 1
-       currentHash := bk.chain.BestBlockHash()
-       orphanNum := uint64(0)
-       reqNum := uint64(0)
-       isOrphan := false
-       bkPeer, ok := bk.peers.Peer(peerID)
-       if !ok {
-               log.Info("peer is not registered")
-               return errPeerNotRegister
-       }
-       swPeer := bkPeer.GetPeer()
-       for 0 < num && num <= maxPeerHeight {
-               if isOrphan {
-                       reqNum = orphanNum
+               var err error
+               if header.Height < step {
+                       header, err = bk.chain.GetHeaderByHeight(0)
                } else {
-                       reqNum = num
-               }
-               block, err := bk.BlockRequest(peerID, reqNum)
-               if errors.Root(err) == errPeerDropped || errors.Root(err) == errGetBlockTimeout || errors.Root(err) == errReqBlock {
-                       log.WithField("Peer abnormality. PeerID: ", peerID).Info(err)
-                       if bkPeer == nil {
-                               log.Info("peer is not registered")
-                               break
-                       }
-                       log.Info("Block keeper request block error. Stop peer.")
-                       bk.sw.StopPeerGracefully(swPeer)
-                       break
+                       header, err = bk.chain.GetHeaderByHeight(header.Height - step)
                }
-               isOrphan, err = bk.chain.ProcessBlock(block)
                if err != nil {
-                       if bkPeer == nil {
-                               log.Info("peer is deleted")
-                               break
-                       }
-                       if ban := bkPeer.addBanScore(20, 0, "block process error"); ban {
-                               bk.sw.AddBannedPeer(swPeer)
-                               bk.sw.StopPeerGracefully(swPeer)
-                       }
-                       log.WithField("hash:", block.Hash()).Errorf("blockKeeper fail process block %v ", err)
+                       log.WithField("err", err).Error("blockKeeper fail on get blockLocator")
                        break
                }
-               if isOrphan {
-                       orphanNum = block.Height - 1
-                       continue
+
+               if len(locator) > 10 {
+                       step *= 2
                }
-               num++
        }
-       bestHash := bk.chain.BestBlockHash()
-       log.Info("Block sync complete. height:", bk.chain.BestBlockHeight(), " hash:", bestHash)
-       if currentHash.String() != bestHash.String() {
-               log.Info("Broadcast new chain status.")
+       return locator
+}
 
-               block, err := bk.chain.GetBlockByHash(bestHash)
+func (bk *blockKeeper) fastBlockSync(checkPoint *consensus.Checkpoint) error {
+       bk.resetHeaderState()
+       lastHeader := bk.headerList.Back().Value.(*types.BlockHeader)
+       for ; lastHeader.Hash() != checkPoint.Hash; lastHeader = bk.headerList.Back().Value.(*types.BlockHeader) {
+               if lastHeader.Height >= checkPoint.Height {
+                       return errors.Wrap(errPeerMisbehave, "peer is not in the checkpoint branch")
+               }
+
+               lastHash := lastHeader.Hash()
+               headers, err := bk.requireHeaders([]*bc.Hash{&lastHash}, &checkPoint.Hash)
                if err != nil {
-                       log.Errorf("Failed on sync complete broadcast status get block %v", err)
-                       return errGetBlockByHash
+                       return err
+               }
+
+               if len(headers) == 0 {
+                       return errors.Wrap(errPeerMisbehave, "requireHeaders return empty list")
                }
 
-               peers, err := bk.peers.BroadcastNewStatus(block)
+               if err := bk.appendHeaderList(headers); err != nil {
+                       return err
+               }
+       }
+
+       fastHeader := bk.headerList.Front()
+       for bk.chain.BestBlockHeight() < checkPoint.Height {
+               locator := bk.blockLocator()
+               blocks, err := bk.requireBlocks(locator, &checkPoint.Hash)
                if err != nil {
-                       log.Errorf("Failed on broadcast new status block %v", err)
-                       return errBroadcastStatus
+                       return err
+               }
+
+               if len(blocks) == 0 {
+                       return errors.Wrap(errPeerMisbehave, "requireBlocks return empty list")
                }
-               for _, peer := range peers {
-                       if peer == nil {
-                               return errPeerNotRegister
+
+               for _, block := range blocks {
+                       if fastHeader = fastHeader.Next(); fastHeader == nil {
+                               return errors.New("get block than is higher than checkpoint")
+                       }
+
+                       blockHash := block.Hash()
+                       if blockHash != fastHeader.Value.(*types.BlockHeader).Hash() {
+                               return errPeerMisbehave
+                       }
+
+                       seed, err := bk.chain.CalcNextSeed(&block.PreviousBlockHash)
+                       if err != nil {
+                               return errors.Wrap(err, "fail on fastBlockSync calculate seed")
+                       }
+
+                       tensority.AIHash.AddCache(&blockHash, seed, &bc.Hash{})
+                       _, err = bk.chain.ProcessBlock(block)
+                       tensority.AIHash.RemoveCache(&blockHash, seed)
+                       if err != nil {
+                               return errors.Wrap(err, "fail on fastBlockSync process block")
                        }
-                       swPeer := peer.GetPeer()
-                       log.Info("Block keeper broadcast block error. Stop peer.")
-                       bk.sw.StopPeerGracefully(swPeer)
                }
        }
        return nil
 }
 
-func (bk *blockKeeper) blockRequest(peerID string, height uint64) error {
-       return bk.peers.requestBlockByHeight(peerID, height)
+func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
+       headers, err := bk.locateHeaders(locator, stopHash)
+       if err != nil {
+               return nil, err
+       }
+
+       blocks := []*types.Block{}
+       for i, header := range headers {
+               if i > maxBlockPerMsg {
+                       break
+               }
+
+               headerHash := header.Hash()
+               block, err := bk.chain.GetBlockByHash(&headerHash)
+               if err != nil {
+                       return nil, err
+               }
+
+               blocks = append(blocks, block)
+       }
+       return blocks, nil
+}
+
+func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
+       stopHeader, err := bk.chain.GetHeaderByHash(stopHash)
+       if err != nil {
+               return nil, err
+       }
+
+       startHeader, err := bk.chain.GetHeaderByHeight(0)
+       if err != nil {
+               return nil, err
+       }
+
+       for _, hash := range locator {
+               header, err := bk.chain.GetHeaderByHash(hash)
+               if err == nil && bk.chain.InMainChain(header.Hash()) {
+                       startHeader = header
+                       break
+               }
+       }
+
+       totalHeaders := stopHeader.Height - startHeader.Height
+       if totalHeaders > maxBlockHeadersPerMsg {
+               totalHeaders = maxBlockHeadersPerMsg
+       }
+
+       headers := []*types.BlockHeader{}
+       for i := uint64(1); i <= totalHeaders; i++ {
+               header, err := bk.chain.GetHeaderByHeight(startHeader.Height + i)
+               if err != nil {
+                       return nil, err
+               }
+
+               headers = append(headers, header)
+       }
+       return headers, nil
 }
 
 func (bk *blockKeeper) nextCheckpoint() *consensus.Checkpoint {
@@ -169,57 +249,151 @@ func (bk *blockKeeper) nextCheckpoint() *consensus.Checkpoint {
        return nextCheckpoint
 }
 
-func (bk *blockKeeper) BlockRequest(peerID string, height uint64) (*types.Block, error) {
-       var block *types.Block
+func (bk *blockKeeper) processBlock(peerID string, block *types.Block) {
+       bk.blockProcessCh <- &blockMsg{block: block, peerID: peerID}
+}
+
+func (bk *blockKeeper) processBlocks(peerID string, blocks []*types.Block) {
+       bk.blocksProcessCh <- &blocksMsg{blocks: blocks, peerID: peerID}
+}
+
+func (bk *blockKeeper) processHeaders(peerID string, headers []*types.BlockHeader) {
+       bk.headersProcessCh <- &headersMsg{headers: headers, peerID: peerID}
+}
+
+func (bk *blockKeeper) regularBlockSync(wantHeight uint64) error {
+       i := bk.chain.BestBlockHeight() + 1
+       for i <= wantHeight {
+               block, err := bk.requireBlock(i)
+               if err != nil {
+                       return err
+               }
 
-       if err := bk.blockRequest(peerID, height); err != nil {
-               return nil, errReqBlock
+               isOrphan, err := bk.chain.ProcessBlock(block)
+               if err != nil {
+                       return err
+               }
+
+               if isOrphan {
+                       i--
+                       continue
+               }
+               i = bk.chain.BestBlockHeight() + 1
        }
-       retryTicker := time.Tick(requestRetryTicker)
-       syncWait := time.NewTimer(syncTimeout)
+       return nil
+}
 
+func (bk *blockKeeper) requireBlock(height uint64) (*types.Block, error) {
+       if ok := bk.syncPeer.getBlockByHeight(height); !ok {
+               return nil, errPeerDropped
+       }
+
+       waitTicker := time.NewTimer(syncTimeout)
        for {
                select {
-               case pendingResponse := <-bk.pendingProcessCh:
-                       block = pendingResponse.block
-                       if pendingResponse.peerID != peerID {
-                               log.Warning("From different peer")
+               case msg := <-bk.blockProcessCh:
+                       if msg.peerID != bk.syncPeer.ID() {
                                continue
                        }
-                       if block.Height != height {
-                               log.Warning("Block height error")
+                       if msg.block.Height != height {
                                continue
                        }
-                       return block, nil
-               case <-retryTicker:
-                       if err := bk.blockRequest(peerID, height); err != nil {
-                               return nil, errReqBlock
-                       }
-               case <-syncWait.C:
-                       log.Warning("Request block timeout")
-                       return nil, errGetBlockTimeout
-               case peerid := <-bk.quitReqBlockCh:
-                       if *peerid == peerID {
-                               log.Info("Quite block request worker")
-                               return nil, errPeerDropped
+                       return msg.block, nil
+               case <-waitTicker.C:
+                       return nil, errors.Wrap(errRequestTimeout, "requireBlock")
+               }
+       }
+}
+
+func (bk *blockKeeper) requireBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
+       if ok := bk.syncPeer.getBlocks(locator, stopHash); !ok {
+               return nil, errPeerDropped
+       }
+
+       waitTicker := time.NewTimer(syncTimeout)
+       for {
+               select {
+               case msg := <-bk.blocksProcessCh:
+                       if msg.peerID != bk.syncPeer.ID() {
+                               continue
                        }
+                       return msg.blocks, nil
+               case <-waitTicker.C:
+                       return nil, errors.Wrap(errRequestTimeout, "requireBlocks")
                }
        }
 }
 
-func (bk *blockKeeper) txsProcessWorker() {
-       for txsResponse := range bk.txsProcessCh {
-               tx := txsResponse.tx
-               log.Info("Receive new tx from remote peer. TxID:", tx.ID.String())
-               bk.peers.MarkTransaction(txsResponse.peerID, &tx.ID)
-               if isOrphan, err := bk.chain.ValidateTx(tx); err != nil && isOrphan == false {
-                       if bkPeer, ok := bk.peers.Peer(txsResponse.peerID); ok {
-                               swPeer := bkPeer.GetPeer()
-                               if ban := bkPeer.addBanScore(10, 0, "tx error"); ban {
-                                       bk.sw.AddBannedPeer(swPeer)
-                                       bk.sw.StopPeerGracefully(swPeer)
-                               }
+func (bk *blockKeeper) requireHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
+       if ok := bk.syncPeer.getHeaders(locator, stopHash); !ok {
+               return nil, errPeerDropped
+       }
+
+       waitTicker := time.NewTimer(syncTimeout)
+       for {
+               select {
+               case msg := <-bk.headersProcessCh:
+                       if msg.peerID != bk.syncPeer.ID() {
+                               continue
                        }
+                       return msg.headers, nil
+               case <-waitTicker.C:
+                       return nil, errors.Wrap(errRequestTimeout, "requireHeaders")
+               }
+       }
+}
+
+// resetHeaderState sets the headers-first mode state to values appropriate for
+// syncing from a new peer.
+func (bk *blockKeeper) resetHeaderState() {
+       header := bk.chain.BestBlockHeader()
+       bk.headerList.Init()
+       if bk.nextCheckpoint() != nil {
+               bk.headerList.PushBack(header)
+       }
+}
+
+func (bk *blockKeeper) startSync() bool {
+       checkPoint := bk.nextCheckpoint()
+       peer := bk.peers.bestPeer(consensus.SFFastSync | consensus.SFFullNode)
+       if peer != nil && checkPoint != nil && peer.Height() >= checkPoint.Height {
+               bk.syncPeer = peer
+               if err := bk.fastBlockSync(checkPoint); err != nil {
+                       log.WithField("err", err).Warning("fail on fastBlockSync")
+                       bk.peers.addBanScore(peer.ID(), 0, 40, err.Error())
+                       return false
+               }
+               return true
+       }
+
+       peer = bk.peers.bestPeer(consensus.SFFullNode)
+       if peer != nil && peer.Height() > bk.chain.BestBlockHeight() {
+               bk.syncPeer = peer
+               if err := bk.regularBlockSync(peer.Height()); err != nil {
+                       log.WithField("err", err).Warning("fail on regularBlockSync")
+                       bk.peers.addBanScore(peer.ID(), 0, 40, err.Error())
+                       return false
+               }
+               return true
+       }
+       return false
+}
+
+func (bk *blockKeeper) syncWorker() {
+       syncTicker := time.NewTicker(syncCycle)
+       for {
+               <-syncTicker.C
+               if update := bk.startSync(); !update {
+                       continue
+               }
+
+               block, err := bk.chain.GetBlockByHeight(bk.chain.BestBlockHeight())
+               if err != nil {
+                       log.WithField("err", err).Error("fail on syncWorker get best block")
+               }
+
+               if err := bk.peers.broadcastMinedBlock(block); err != nil {
+                       log.WithField("err", err).Error("fail on syncWorker broadcast new block")
                }
        }
 }
diff --git a/netsync/fetcher.go b/netsync/fetcher.go
deleted file mode 100644 (file)
index be16e33..0000000
+++ /dev/null
@@ -1,184 +0,0 @@
-package netsync
-
-import (
-       "errors"
-
-       log "github.com/sirupsen/logrus"
-       "gopkg.in/karalabe/cookiejar.v2/collections/prque"
-
-       "github.com/bytom/p2p"
-       core "github.com/bytom/protocol"
-       "github.com/bytom/protocol/bc"
-       "github.com/bytom/protocol/bc/types"
-)
-
-const (
-       maxQueueDist = 1024 //32 // Maximum allowed distance from the chain head to queue
-)
-
-var (
-       errTerminated = errors.New("terminated")
-)
-
-// Fetcher is responsible for accumulating block announcements from various peers
-// and scheduling them for retrieval.
-type Fetcher struct {
-       chain *core.Chain
-       sw    *p2p.Switch
-       peers *peerSet
-
-       // Various event channels
-       newMinedBlock chan *blockPending
-       quit          chan struct{}
-
-       // Block cache
-       queue  *prque.Prque              // Queue containing the import operations (block number sorted)
-       queued map[bc.Hash]*blockPending // Set of already queued blocks (to dedup imports)
-}
-
-//NewFetcher New creates a block fetcher to retrieve blocks of the new mined.
-func NewFetcher(chain *core.Chain, sw *p2p.Switch, peers *peerSet) *Fetcher {
-       return &Fetcher{
-               chain:         chain,
-               sw:            sw,
-               peers:         peers,
-               newMinedBlock: make(chan *blockPending),
-               quit:          make(chan struct{}),
-               queue:         prque.New(),
-               queued:        make(map[bc.Hash]*blockPending),
-       }
-}
-
-// Start boots up the announcement based synchroniser, accepting and processing
-// hash notifications and block fetches until termination requested.
-func (f *Fetcher) Start() {
-       go f.loop()
-}
-
-// Stop terminates the announcement based synchroniser, canceling all pending
-// operations.
-func (f *Fetcher) Stop() {
-       close(f.quit)
-}
-
-// Enqueue tries to fill gaps the the fetcher's future import queue.
-func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
-       op := &blockPending{
-               peerID: peer,
-               block:  block,
-       }
-       select {
-       case f.newMinedBlock <- op:
-               return nil
-       case <-f.quit:
-               return errTerminated
-       }
-}
-
-// Loop is the main fetcher loop, checking and processing various notification
-// events.
-func (f *Fetcher) loop() {
-       for {
-               // Import any queued blocks that could potentially fit
-               height := f.chain.BestBlockHeight()
-               for !f.queue.Empty() {
-                       op := f.queue.PopItem().(*blockPending)
-                       // If too high up the chain or phase, continue later
-                       number := op.block.Height
-                       if number > height+1 {
-                               f.queue.Push(op, -float32(op.block.Height))
-                               break
-                       }
-                       // Otherwise if fresh and still unknown, try and import
-                       hash := op.block.Hash()
-                       block, _ := f.chain.GetBlockByHash(&hash)
-                       if block != nil {
-                               f.forgetBlock(hash)
-                               continue
-                       }
-                       if op.block.PreviousBlockHash.String() != f.chain.BestBlockHash().String() {
-                               f.forgetBlock(hash)
-                               continue
-                       }
-                       f.insert(op.peerID, op.block)
-               }
-               // Wait for an outside event to occur
-               select {
-               case <-f.quit:
-                       // Fetcher terminating, abort all operations
-                       return
-
-               case op := <-f.newMinedBlock:
-                       // A direct block insertion was requested, try and fill any pending gaps
-                       f.enqueue(op.peerID, op.block)
-               }
-       }
-}
-
-// enqueue schedules a new future import operation, if the block to be imported
-// has not yet been seen.
-func (f *Fetcher) enqueue(peer string, block *types.Block) {
-       hash := block.Hash()
-
-       //TODO: Ensure the peer isn't DOSing us
-       // Discard any past or too distant blocks
-       if dist := int64(block.Height) - int64(f.chain.BestBlockHeight()); dist < 0 || dist > maxQueueDist {
-               log.Info("Discarded propagated block, too far away", " peer: ", peer, "number: ", block.Height, "distance: ", dist)
-               return
-       }
-       // Schedule the block for future importing
-       if _, ok := f.queued[hash]; !ok {
-               op := &blockPending{
-                       peerID: peer,
-                       block:  block,
-               }
-               f.queued[hash] = op
-               f.queue.Push(op, -float32(block.Height))
-               log.Info("Queued receive mine block.", " peer:", peer, " number:", block.Height, " queued:", f.queue.Size())
-       }
-}
-
-// insert spawns a new goroutine to run a block insertion into the chain. If the
-// block's number is at the same height as the current import phase, it updates
-// the phase states accordingly.
-func (f *Fetcher) insert(peerID string, block *types.Block) {
-       // Run the import on a new thread
-       log.Info("Importing propagated block", " from peer: ", peerID, " height: ", block.Height)
-       // Run the actual import and log any issues
-       if _, err := f.chain.ProcessBlock(block); err != nil {
-               log.Info("Propagated block import failed", " from peer: ", peerID, " height: ", block.Height, "err: ", err)
-               fPeer, ok := f.peers.Peer(peerID)
-               if !ok {
-                       return
-               }
-               swPeer := fPeer.GetPeer()
-               if ban := fPeer.addBanScore(20, 0, "block process error"); ban {
-                       f.sw.AddBannedPeer(swPeer)
-                       f.sw.StopPeerGracefully(swPeer)
-               }
-               return
-       }
-       // If import succeeded, broadcast the block
-       log.Info("success process a block from new mined blocks cache. block height: ", block.Height)
-       peers, err := f.peers.BroadcastMinedBlock(block)
-       if err != nil {
-               log.Errorf("Broadcast mine block error. %v", err)
-               return
-       }
-       for _, fPeer := range peers {
-               if fPeer == nil {
-                       continue
-               }
-               swPeer := fPeer.GetPeer()
-               log.Info("Fetcher broadcast block error. Stop peer.")
-               f.sw.StopPeerGracefully(swPeer)
-       }
-}
-
-// forgetBlock removes all traces of a queued block from the fetcher's internal
-// state.
-func (f *Fetcher) forgetBlock(hash bc.Hash) {
-       if insert := f.queued[hash]; insert != nil {
-               delete(f.queued, hash)
-       }
-}
index c678619..033a93b 100644 (file)
@@ -2,6 +2,7 @@ package netsync
 
 import (
        "encoding/hex"
+       "errors"
        "net"
        "path"
        "strconv"
@@ -22,50 +23,55 @@ import (
        "github.com/bytom/version"
 )
 
+const (
+       maxTxChanSize = 10000
+)
+
 //SyncManager Sync Manager is responsible for the business layer information synchronization
 type SyncManager struct {
-       sw *p2p.Switch
-
-       privKey     crypto.PrivKeyEd25519 // local node's p2p key
-       chain       *core.Chain
-       txPool      *core.TxPool
-       fetcher     *Fetcher
-       blockKeeper *blockKeeper
-       peers       *peerSet
-
-       newTxCh       chan *types.Tx
-       newBlockCh    chan *bc.Hash
-       newPeerCh     chan struct{}
-       txSyncCh      chan *txsync
-       dropPeerCh    chan *string
-       quitSync      chan struct{}
-       config        *cfg.Config
-       synchronising int32
+       sw          *p2p.Switch
+       genesisHash bc.Hash
+
+       privKey      crypto.PrivKeyEd25519 // local node's p2p key
+       chain        *core.Chain
+       txPool       *core.TxPool
+       blockFetcher *blockFetcher
+       blockKeeper  *blockKeeper
+       peers        *peerSet
+
+       newTxCh    chan *types.Tx
+       newBlockCh chan *bc.Hash
+       txSyncCh   chan *txSyncMsg
+       quitSync   chan struct{}
+       config     *cfg.Config
 }
 
 //NewSyncManager create a sync manager
 func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
+       genesisHeader, err := chain.GetHeaderByHeight(0)
+       if err != nil {
+               return nil, err
+       }
+
        sw := p2p.NewSwitch(config)
-       peers := newPeerSet()
-       dropPeerCh := make(chan *string, maxQuitReq)
+       peers := newPeerSet(sw)
        manager := &SyncManager{
-               sw:          sw,
-               txPool:      txPool,
-               chain:       chain,
-               privKey:     crypto.GenPrivKeyEd25519(),
-               fetcher:     NewFetcher(chain, sw, peers),
-               blockKeeper: newBlockKeeper(chain, sw, peers, dropPeerCh),
-               peers:       peers,
-               newTxCh:     make(chan *types.Tx, maxTxChanSize),
-               newBlockCh:  newBlockCh,
-               newPeerCh:   make(chan struct{}),
-               txSyncCh:    make(chan *txsync),
-               dropPeerCh:  dropPeerCh,
-               quitSync:    make(chan struct{}),
-               config:      config,
-       }
-
-       protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh)
+               sw:           sw,
+               genesisHash:  genesisHeader.Hash(),
+               txPool:       txPool,
+               chain:        chain,
+               privKey:      crypto.GenPrivKeyEd25519(),
+               blockFetcher: newBlockFetcher(chain, peers),
+               blockKeeper:  newBlockKeeper(chain, peers),
+               peers:        peers,
+               newTxCh:      make(chan *types.Tx, maxTxChanSize),
+               newBlockCh:   newBlockCh,
+               txSyncCh:     make(chan *txSyncMsg),
+               quitSync:     make(chan struct{}),
+               config:       config,
+       }
+
+       protocolReactor := NewProtocolReactor(manager, manager.peers)
        manager.sw.AddReactor("PROTOCOL", protocolReactor)
 
        // Create & add listener
@@ -89,6 +95,199 @@ func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool,
        return manager, nil
 }
 
+//BestPeer return the highest p2p peerInfo
+func (sm *SyncManager) BestPeer() *PeerInfo {
+       bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
+       if bestPeer != nil {
+               return bestPeer.getPeerInfo()
+       }
+       return nil
+}
+
+// GetNewTxCh return a unconfirmed transaction feed channel
+func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
+       return sm.newTxCh
+}
+
+//GetPeerInfos return peer info of all peers
+func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
+       return sm.peers.getPeerInfos()
+}
+
+//IsCaughtUp check wheather the peer finish the sync
+func (sm *SyncManager) IsCaughtUp() bool {
+       peer := sm.peers.bestPeer(consensus.SFFullNode)
+       return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
+}
+
+//NodeInfo get P2P peer node info
+func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
+       return sm.sw.NodeInfo()
+}
+
+//StopPeer try to stop peer by given ID
+func (sm *SyncManager) StopPeer(peerID string) error {
+       if peer := sm.peers.getPeer(peerID); peer == nil {
+               return errors.New("peerId not exist")
+       }
+       sm.peers.removePeer(peerID)
+       return nil
+}
+
+//Switch get sync manager switch
+func (sm *SyncManager) Switch() *p2p.Switch {
+       return sm.sw
+}
+
+func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
+       sm.blockKeeper.processBlock(peer.ID(), msg.GetBlock())
+}
+
+func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
+       blocks, err := msg.GetBlocks()
+       if err != nil {
+               log.WithField("err", err).Debug("fail on handleBlocksMsg GetBlocks")
+               return
+       }
+
+       sm.blockKeeper.processBlocks(peer.ID(), blocks)
+}
+
+func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
+       var block *types.Block
+       var err error
+       if msg.Height != 0 {
+               block, err = sm.chain.GetBlockByHeight(msg.Height)
+       } else {
+               block, err = sm.chain.GetBlockByHash(msg.GetHash())
+       }
+       if err != nil {
+               log.WithField("err", err).Warning("fail on handleGetBlockMsg get block from chain")
+               return
+       }
+
+       ok, err := peer.sendBlock(block)
+       if !ok {
+               sm.peers.removePeer(peer.ID())
+       }
+       if err != nil {
+               log.WithField("err", err).Error("fail on handleGetBlockMsg sentBlock")
+       }
+}
+
+func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
+       blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
+       if err != nil || len(blocks) == 0 {
+               return
+       }
+
+       totalSize := 0
+       sendBlocks := []*types.Block{}
+       for _, block := range blocks {
+               rawData, err := block.MarshalText()
+               if err != nil {
+                       log.WithField("err", err).Error("fail on handleGetBlocksMsg marshal block")
+                       continue
+               }
+
+               if totalSize+len(rawData) > maxBlockchainResponseSize-16 {
+                       break
+               }
+               totalSize += len(rawData)
+               sendBlocks = append(sendBlocks, block)
+       }
+
+       ok, err := peer.sendBlocks(sendBlocks)
+       if !ok {
+               sm.peers.removePeer(peer.ID())
+       }
+       if err != nil {
+               log.WithField("err", err).Error("fail on handleGetBlocksMsg sentBlock")
+       }
+}
+
+func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
+       headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
+       if err != nil || len(headers) == 0 {
+               log.WithField("err", err).Debug("fail on handleGetHeadersMsg locateHeaders")
+               return
+       }
+
+       ok, err := peer.sendHeaders(headers)
+       if !ok {
+               sm.peers.removePeer(peer.ID())
+       }
+       if err != nil {
+               log.WithField("err", err).Error("fail on handleGetHeadersMsg sentBlock")
+       }
+}
+
+func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
+       headers, err := msg.GetHeaders()
+       if err != nil {
+               log.WithField("err", err).Debug("fail on handleHeadersMsg GetHeaders")
+               return
+       }
+
+       sm.blockKeeper.processHeaders(peer.ID(), headers)
+}
+
+func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
+       block, err := msg.GetMineBlock()
+       if err != nil {
+               log.WithField("err", err).Warning("fail on handleMineBlockMsg GetMineBlock")
+               return
+       }
+
+       hash := block.Hash()
+       peer.markBlock(&hash)
+       sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
+       peer.setStatus(block.Height, &hash)
+}
+
+func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
+       bestHeader := sm.chain.BestBlockHeader()
+       genesisBlock, err := sm.chain.GetBlockByHeight(0)
+       if err != nil {
+               log.WithField("err", err).Error("fail on handleStatusRequestMsg get genesis")
+       }
+
+       genesisHash := genesisBlock.Hash()
+       msg := NewStatusResponseMessage(bestHeader, &genesisHash)
+       if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+               sm.peers.removePeer(peer.ID())
+       }
+}
+
+func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusResponseMessage) {
+       if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
+               peer.setStatus(msg.Height, msg.GetHash())
+               return
+       }
+
+       if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
+               log.WithFields(log.Fields{
+                       "remote genesis": genesisHash.String(),
+                       "local genesis":  sm.genesisHash.String(),
+               }).Warn("fail hand shake due to differnt genesis")
+               return
+       }
+
+       sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
+}
+
+func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
+       tx, err := msg.GetTransaction()
+       if err != nil {
+               sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
+               return
+       }
+
+       if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && isOrphan == false {
+               sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
+       }
+}
+
 // Defaults to tcp
 func protocolAndAddress(listenAddr string) (string, string) {
        p, address := "tcp", listenAddr
@@ -132,14 +331,8 @@ func (sm *SyncManager) Start() {
        }
        // broadcast transactions
        go sm.txBroadcastLoop()
-
-       // broadcast mined blocks
        go sm.minedBroadcastLoop()
-
-       // start sync handlers
-       go sm.syncer()
-
-       go sm.txsyncLoop()
+       go sm.txSyncLoop()
 }
 
 //Stop stop sync manager
@@ -180,29 +373,6 @@ func initDiscover(config *cfg.Config, priv *crypto.PrivKeyEd25519, port uint16)
        return ntab, nil
 }
 
-func (sm *SyncManager) txBroadcastLoop() {
-       for {
-               select {
-               case newTx := <-sm.newTxCh:
-                       peers, err := sm.peers.BroadcastTx(newTx)
-                       if err != nil {
-                               log.Errorf("Broadcast new tx error. %v", err)
-                               return
-                       }
-                       for _, smPeer := range peers {
-                               if smPeer == nil {
-                                       continue
-                               }
-                               swPeer := smPeer.GetPeer()
-                               log.Info("Tx broadcast error. Stop Peer.")
-                               sm.sw.StopPeerGracefully(swPeer)
-                       }
-               case <-sm.quitSync:
-                       return
-               }
-       }
-}
-
 func (sm *SyncManager) minedBroadcastLoop() {
        for {
                select {
@@ -212,46 +382,12 @@ func (sm *SyncManager) minedBroadcastLoop() {
                                log.Errorf("Failed on mined broadcast loop get block %v", err)
                                return
                        }
-                       peers, err := sm.peers.BroadcastMinedBlock(block)
-                       if err != nil {
+                       if err := sm.peers.broadcastMinedBlock(block); err != nil {
                                log.Errorf("Broadcast mine block error. %v", err)
                                return
                        }
-                       for _, smPeer := range peers {
-                               if smPeer == nil {
-                                       continue
-                               }
-                               swPeer := smPeer.GetPeer()
-                               log.Info("New mined block broadcast error. Stop Peer.")
-                               sm.sw.StopPeerGracefully(swPeer)
-                       }
                case <-sm.quitSync:
                        return
                }
        }
 }
-
-//NodeInfo get P2P peer node info
-func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
-       return sm.sw.NodeInfo()
-}
-
-//BlockKeeper get block keeper
-func (sm *SyncManager) BlockKeeper() *blockKeeper {
-       return sm.blockKeeper
-}
-
-//Peers get sync manager peer set
-func (sm *SyncManager) Peers() *peerSet {
-       return sm.peers
-}
-
-//Switch get sync manager switch
-func (sm *SyncManager) Switch() *p2p.Switch {
-       return sm.sw
-}
-
-// GetNewTxCh return a unconfirmed transaction feed channel
-func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
-       return sm.newTxCh
-}
index ec6e1c9..b413651 100644 (file)
@@ -2,6 +2,7 @@ package netsync
 
 import (
        "bytes"
+       "encoding/json"
        "errors"
        "fmt"
 
@@ -11,41 +12,41 @@ import (
        "github.com/bytom/protocol/bc/types"
 )
 
-//protocol msg
+//protocol msg byte
 const (
-       BlockRequestByte   = byte(0x10)
-       BlockResponseByte  = byte(0x11)
-       StatusRequestByte  = byte(0x20)
-       StatusResponseByte = byte(0x21)
-       NewTransactionByte = byte(0x30)
-       NewMineBlockByte   = byte(0x40)
+       BlockchainChannel = byte(0x40)
+
+       BlockRequestByte    = byte(0x10)
+       BlockResponseByte   = byte(0x11)
+       HeadersRequestByte  = byte(0x12)
+       HeadersResponseByte = byte(0x13)
+       BlocksRequestByte   = byte(0x14)
+       BlocksResponseByte  = byte(0x15)
+       StatusRequestByte   = byte(0x20)
+       StatusResponseByte  = byte(0x21)
+       NewTransactionByte  = byte(0x30)
+       NewMineBlockByte    = byte(0x40)
 
        maxBlockchainResponseSize = 22020096 + 2
 )
 
-// BlockchainMessage is a generic message for this reactor.
+//BlockchainMessage is a generic message for this reactor.
 type BlockchainMessage interface{}
 
 var _ = wire.RegisterInterface(
        struct{ BlockchainMessage }{},
-       wire.ConcreteType{&BlockRequestMessage{}, BlockRequestByte},
-       wire.ConcreteType{&BlockResponseMessage{}, BlockResponseByte},
+       wire.ConcreteType{&GetBlockMessage{}, BlockRequestByte},
+       wire.ConcreteType{&BlockMessage{}, BlockResponseByte},
+       wire.ConcreteType{&GetHeadersMessage{}, HeadersRequestByte},
+       wire.ConcreteType{&HeadersMessage{}, HeadersResponseByte},
+       wire.ConcreteType{&GetBlocksMessage{}, BlocksRequestByte},
+       wire.ConcreteType{&BlocksMessage{}, BlocksResponseByte},
        wire.ConcreteType{&StatusRequestMessage{}, StatusRequestByte},
        wire.ConcreteType{&StatusResponseMessage{}, StatusResponseByte},
-       wire.ConcreteType{&TransactionNotifyMessage{}, NewTransactionByte},
+       wire.ConcreteType{&TransactionMessage{}, NewTransactionByte},
        wire.ConcreteType{&MineBlockMessage{}, NewMineBlockByte},
 )
 
-type blockPending struct {
-       block  *types.Block
-       peerID string
-}
-
-type txsNotify struct {
-       tx     *types.Tx
-       peerID string
-}
-
 //DecodeMessage decode msg
 func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
        msgType = bz[0]
@@ -58,43 +59,43 @@ func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
        return
 }
 
-//BlockRequestMessage request blocks from remote peers by height/hash
-type BlockRequestMessage struct {
+//GetBlockMessage request blocks from remote peers by height/hash
+type GetBlockMessage struct {
        Height  uint64
        RawHash [32]byte
 }
 
-//GetHash get hash
-func (m *BlockRequestMessage) GetHash() *bc.Hash {
+//GetHash reutrn the hash of the request
+func (m *GetBlockMessage) GetHash() *bc.Hash {
        hash := bc.NewHash(m.RawHash)
        return &hash
 }
 
 //String convert msg to string
-func (m *BlockRequestMessage) String() string {
+func (m *GetBlockMessage) String() string {
        if m.Height > 0 {
-               return fmt.Sprintf("BlockRequestMessage{Height: %d}", m.Height)
+               return fmt.Sprintf("GetBlockMessage{Height: %d}", m.Height)
        }
        hash := m.GetHash()
-       return fmt.Sprintf("BlockRequestMessage{Hash: %s}", hash.String())
+       return fmt.Sprintf("GetBlockMessage{Hash: %s}", hash.String())
 }
 
-//BlockResponseMessage response get block msg
-type BlockResponseMessage struct {
+//BlockMessage response get block msg
+type BlockMessage struct {
        RawBlock []byte
 }
 
-//NewBlockResponseMessage construct bock response msg
-func NewBlockResponseMessage(block *types.Block) (*BlockResponseMessage, error) {
+//NewBlockMessage construct bock response msg
+func NewBlockMessage(block *types.Block) (*BlockMessage, error) {
        rawBlock, err := block.MarshalText()
        if err != nil {
                return nil, err
        }
-       return &BlockResponseMessage{RawBlock: rawBlock}, nil
+       return &BlockMessage{RawBlock: rawBlock}, nil
 }
 
 //GetBlock get block from msg
-func (m *BlockResponseMessage) GetBlock() *types.Block {
+func (m *BlockMessage) GetBlock() *types.Block {
        block := &types.Block{
                BlockHeader:  types.BlockHeader{},
                Transactions: []*types.Tx{},
@@ -104,36 +105,140 @@ func (m *BlockResponseMessage) GetBlock() *types.Block {
 }
 
 //String convert msg to string
-func (m *BlockResponseMessage) String() string {
-       return fmt.Sprintf("BlockResponseMessage{Size: %d}", len(m.RawBlock))
+func (m *BlockMessage) String() string {
+       return fmt.Sprintf("BlockMessage{Size: %d}", len(m.RawBlock))
 }
 
-//TransactionNotifyMessage notify new tx msg
-type TransactionNotifyMessage struct {
-       RawTx []byte
+//GetHeadersMessage is one of the bytom msg type
+type GetHeadersMessage struct {
+       RawBlockLocator [][32]byte
+       RawStopHash     [32]byte
 }
 
-//NewTransactionNotifyMessage construct notify new tx msg
-func NewTransactionNotifyMessage(tx *types.Tx) (*TransactionNotifyMessage, error) {
-       rawTx, err := tx.TxData.MarshalText()
-       if err != nil {
-               return nil, err
+//NewGetHeadersMessage return a new GetHeadersMessage
+func NewGetHeadersMessage(blockLocator []*bc.Hash, stopHash *bc.Hash) *GetHeadersMessage {
+       msg := &GetHeadersMessage{
+               RawStopHash: stopHash.Byte32(),
+       }
+       for _, hash := range blockLocator {
+               msg.RawBlockLocator = append(msg.RawBlockLocator, hash.Byte32())
        }
-       return &TransactionNotifyMessage{RawTx: rawTx}, nil
+       return msg
 }
 
-//GetTransaction get tx from msg
-func (m *TransactionNotifyMessage) GetTransaction() (*types.Tx, error) {
-       tx := &types.Tx{}
-       if err := tx.UnmarshalText(m.RawTx); err != nil {
-               return nil, err
+//GetBlockLocator return the locator of the msg
+func (msg *GetHeadersMessage) GetBlockLocator() []*bc.Hash {
+       blockLocator := []*bc.Hash{}
+       for _, rawHash := range msg.RawBlockLocator {
+               hash := bc.NewHash(rawHash)
+               blockLocator = append(blockLocator, &hash)
        }
-       return tx, nil
+       return blockLocator
 }
 
-//String
-func (m *TransactionNotifyMessage) String() string {
-       return fmt.Sprintf("TransactionNotifyMessage{Size: %d}", len(m.RawTx))
+//GetStopHash return the stop hash of the msg
+func (msg *GetHeadersMessage) GetStopHash() *bc.Hash {
+       hash := bc.NewHash(msg.RawStopHash)
+       return &hash
+}
+
+//HeadersMessage is one of the bytom msg type
+type HeadersMessage struct {
+       RawHeaders [][]byte
+}
+
+//NewHeadersMessage create a new HeadersMessage
+func NewHeadersMessage(headers []*types.BlockHeader) (*HeadersMessage, error) {
+       RawHeaders := [][]byte{}
+       for _, header := range headers {
+               data, err := json.Marshal(header)
+               if err != nil {
+                       return nil, err
+               }
+
+               RawHeaders = append(RawHeaders, data)
+       }
+       return &HeadersMessage{RawHeaders: RawHeaders}, nil
+}
+
+//GetHeaders return the headers in the msg
+func (msg *HeadersMessage) GetHeaders() ([]*types.BlockHeader, error) {
+       headers := []*types.BlockHeader{}
+       for _, data := range msg.RawHeaders {
+               header := &types.BlockHeader{}
+               if err := json.Unmarshal(data, header); err != nil {
+                       return nil, err
+               }
+
+               headers = append(headers, header)
+       }
+       return headers, nil
+}
+
+//GetBlocksMessage is one of the bytom msg type
+type GetBlocksMessage struct {
+       RawBlockLocator [][32]byte
+       RawStopHash     [32]byte
+}
+
+//NewGetBlocksMessage create a new GetBlocksMessage
+func NewGetBlocksMessage(blockLocator []*bc.Hash, stopHash *bc.Hash) *GetBlocksMessage {
+       msg := &GetBlocksMessage{
+               RawStopHash: stopHash.Byte32(),
+       }
+       for _, hash := range blockLocator {
+               msg.RawBlockLocator = append(msg.RawBlockLocator, hash.Byte32())
+       }
+       return msg
+}
+
+//GetBlockLocator return the locator of the msg
+func (msg *GetBlocksMessage) GetBlockLocator() []*bc.Hash {
+       blockLocator := []*bc.Hash{}
+       for _, rawHash := range msg.RawBlockLocator {
+               hash := bc.NewHash(rawHash)
+               blockLocator = append(blockLocator, &hash)
+       }
+       return blockLocator
+}
+
+//GetStopHash return the stop hash of the msg
+func (msg *GetBlocksMessage) GetStopHash() *bc.Hash {
+       hash := bc.NewHash(msg.RawStopHash)
+       return &hash
+}
+
+//BlocksMessage is one of the bytom msg type
+type BlocksMessage struct {
+       RawBlocks [][]byte
+}
+
+//NewBlocksMessage create a new BlocksMessage
+func NewBlocksMessage(blocks []*types.Block) (*BlocksMessage, error) {
+       rawBlocks := [][]byte{}
+       for _, block := range blocks {
+               data, err := json.Marshal(block)
+               if err != nil {
+                       return nil, err
+               }
+
+               rawBlocks = append(rawBlocks, data)
+       }
+       return &BlocksMessage{RawBlocks: rawBlocks}, nil
+}
+
+//GetBlocks returns the blocks in the msg
+func (msg *BlocksMessage) GetBlocks() ([]*types.Block, error) {
+       blocks := []*types.Block{}
+       for _, data := range msg.RawBlocks {
+               block := &types.Block{}
+               if err := json.Unmarshal(data, block); err != nil {
+                       return nil, err
+               }
+
+               blocks = append(blocks, block)
+       }
+       return blocks, nil
 }
 
 //StatusRequestMessage status request msg
@@ -179,6 +284,34 @@ func (m *StatusResponseMessage) String() string {
        return fmt.Sprintf("StatusResponseMessage{Height: %d, Best hash: %s, Genesis hash: %s}", m.Height, hash.String(), genesisHash.String())
 }
 
+//TransactionMessage notify new tx msg
+type TransactionMessage struct {
+       RawTx []byte
+}
+
+//NewTransactionMessage construct notify new tx msg
+func NewTransactionMessage(tx *types.Tx) (*TransactionMessage, error) {
+       rawTx, err := tx.TxData.MarshalText()
+       if err != nil {
+               return nil, err
+       }
+       return &TransactionMessage{RawTx: rawTx}, nil
+}
+
+//GetTransaction get tx from msg
+func (m *TransactionMessage) GetTransaction() (*types.Tx, error) {
+       tx := &types.Tx{}
+       if err := tx.UnmarshalText(m.RawTx); err != nil {
+               return nil, err
+       }
+       return tx, nil
+}
+
+//String
+func (m *TransactionMessage) String() string {
+       return fmt.Sprintf("TransactionMessage{Size: %d}", len(m.RawTx))
+}
+
 //MineBlockMessage new mined block msg
 type MineBlockMessage struct {
        RawBlock []byte
index 677676c..14ccc0b 100644 (file)
@@ -1,7 +1,7 @@
 package netsync
 
 import (
-       "strconv"
+       "net"
        "sync"
 
        log "github.com/sirupsen/logrus"
@@ -9,414 +9,332 @@ import (
 
        "github.com/bytom/consensus"
        "github.com/bytom/errors"
-       "github.com/bytom/p2p"
        "github.com/bytom/p2p/trust"
        "github.com/bytom/protocol/bc"
        "github.com/bytom/protocol/bc/types"
 )
 
-var (
-       errClosed            = errors.New("peer set is closed")
-       errAlreadyRegistered = errors.New("peer is already registered")
-       errNotRegistered     = errors.New("peer is not registered")
-)
-
 const (
-       defaultVersion      = 1
+       maxKnownTxs         = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
+       maxKnownBlocks      = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
        defaultBanThreshold = uint64(100)
 )
 
-type peer struct {
-       mtx      sync.RWMutex
-       version  int // Protocol version negotiated
-       services consensus.ServiceFlag
-       id       string
-       height   uint64
-       hash     *bc.Hash
-       banScore trust.DynamicBanScore
-
-       swPeer *p2p.Peer
+//BasePeer is the interface for connection level peer
+type BasePeer interface {
+       Addr() net.Addr
+       ID() string
+       ServiceFlag() consensus.ServiceFlag
+       TrySend(byte, interface{}) bool
+}
 
-       knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
-       knownBlocks *set.Set // Set of block hashes known to be known by this peer
+//BasePeerSet is the intergace for connection level peer manager
+type BasePeerSet interface {
+       AddBannedPeer(string) error
+       StopPeerGracefully(string)
 }
 
-// PeerInfo indicate peer information
+// PeerInfo indicate peer status snap
 type PeerInfo struct {
-       Id         string `json:"id"`
+       ID         string `json:"id"`
        RemoteAddr string `json:"remote_addr"`
        Height     uint64 `json:"height"`
        Delay      uint32 `json:"delay"`
 }
 
-func newPeer(height uint64, hash *bc.Hash, Peer *p2p.Peer) *peer {
-       services := consensus.SFFullNode
-       if len(Peer.Other) != 0 {
-               if serviceFlag, err := strconv.ParseUint(Peer.Other[0], 10, 64); err != nil {
-                       services = consensus.ServiceFlag(serviceFlag)
-               }
-       }
+type peer struct {
+       BasePeer
+       mtx         sync.RWMutex
+       services    consensus.ServiceFlag
+       height      uint64
+       hash        *bc.Hash
+       banScore    trust.DynamicBanScore
+       knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
+       knownBlocks *set.Set // Set of block hashes known to be known by this peer
+}
 
+func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
        return &peer{
-               version:     defaultVersion,
-               services:    services,
-               id:          Peer.Key,
+               BasePeer:    basePeer,
+               services:    basePeer.ServiceFlag(),
                height:      height,
                hash:        hash,
-               swPeer:      Peer,
                knownTxs:    set.New(),
                knownBlocks: set.New(),
        }
 }
 
-func (p *peer) GetStatus() (height uint64, hash *bc.Hash) {
+func (p *peer) Height() uint64 {
        p.mtx.RLock()
        defer p.mtx.RUnlock()
-       return p.height, p.hash
+       return p.height
 }
 
-func (p *peer) SetStatus(height uint64, hash *bc.Hash) {
-       p.mtx.Lock()
-       defer p.mtx.Unlock()
+func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
+       score := p.banScore.Increase(persistent, transient)
+       if score > defaultBanThreshold {
+               log.WithFields(log.Fields{"address": p.Addr(), "score": score, "reason": reason}).Errorf("banning and disconnecting")
+               return true
+       }
 
-       p.height = height
-       p.hash = hash
+       warnThreshold := defaultBanThreshold >> 1
+       if score > warnThreshold {
+               log.WithFields(log.Fields{"address": p.Addr(), "score": score, "reason": reason}).Warning("ban score increasing")
+       }
+       return false
 }
 
-func (p *peer) requestBlockByHash(hash *bc.Hash) error {
-       msg := &BlockRequestMessage{RawHash: hash.Byte32()}
-       p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
-       return nil
+func (p *peer) getBlockByHeight(height uint64) bool {
+       msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
+       return p.TrySend(BlockchainChannel, msg)
 }
 
-func (p *peer) requestBlockByHeight(height uint64) error {
-       msg := &BlockRequestMessage{Height: height}
-       p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
-       return nil
+func (p *peer) getBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
+       msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
+       return p.TrySend(BlockchainChannel, msg)
 }
 
-func (p *peer) SendTransactions(txs []*types.Tx) error {
-       for _, tx := range txs {
-               msg, err := NewTransactionNotifyMessage(tx)
-               if err != nil {
-                       return errors.New("Failed construction tx msg")
-               }
-               hash := &tx.ID
-               p.knownTxs.Add(hash.String())
-               if p.swPeer == nil {
-                       return errPeerDropped
-               }
-               p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
-       }
-       return nil
+func (p *peer) getHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
+       msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
+       return p.TrySend(BlockchainChannel, msg)
 }
 
-func (p *peer) GetPeer() *p2p.Peer {
-       p.mtx.RLock()
-       defer p.mtx.RUnlock()
-
-       return p.swPeer
-}
-
-
-func (p *peer) GetPeerInfo() *PeerInfo {
+func (p *peer) getPeerInfo() *PeerInfo {
        p.mtx.RLock()
        defer p.mtx.RUnlock()
        return &PeerInfo{
-               Id:         p.id,
-               RemoteAddr: p.swPeer.RemoteAddr,
+               ID:         p.ID(),
+               RemoteAddr: p.Addr().String(),
                Height:     p.height,
-               Delay:      0, // TODO
        }
 }
 
-// MarkTransaction marks a transaction as known for the peer, ensuring that it
-// will never be propagated to this particular peer.
-func (p *peer) MarkTransaction(hash *bc.Hash) {
+func (p *peer) markBlock(hash *bc.Hash) {
        p.mtx.Lock()
        defer p.mtx.Unlock()
 
-       // If we reached the memory allowance, drop a previously known transaction hash
-       for p.knownTxs.Size() >= maxKnownTxs {
-               p.knownTxs.Pop()
+       for p.knownBlocks.Size() >= maxKnownBlocks {
+               p.knownBlocks.Pop()
        }
-       p.knownTxs.Add(hash.String())
+       p.knownBlocks.Add(hash.String())
 }
 
-// MarkBlock marks a block as known for the peer, ensuring that the block will
-// never be propagated to this particular peer.
-func (p *peer) MarkBlock(hash *bc.Hash) {
+func (p *peer) markTransaction(hash *bc.Hash) {
        p.mtx.Lock()
        defer p.mtx.Unlock()
 
-       // If we reached the memory allowance, drop a previously known block hash
-       for p.knownBlocks.Size() >= maxKnownBlocks {
-               p.knownBlocks.Pop()
+       for p.knownTxs.Size() >= maxKnownTxs {
+               p.knownTxs.Pop()
        }
-       p.knownBlocks.Add(hash.String())
+       p.knownTxs.Add(hash.String())
 }
 
-// addBanScore increases the persistent and decaying ban score fields by the
-// values passed as parameters. If the resulting score exceeds half of the ban
-// threshold, a warning is logged including the reason provided. Further, if
-// the score is above the ban threshold, the peer will be banned and
-// disconnected.
-func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
-       warnThreshold := defaultBanThreshold >> 1
-       if transient == 0 && persistent == 0 {
-               // The score is not being increased, but a warning message is still
-               // logged if the score is above the warn threshold.
-               score := p.banScore.Int()
-               if score > warnThreshold {
-                       log.Infof("Misbehaving peer %s: %s -- ban score is %d, "+"it was not increased this time", p.id, reason, score)
-               }
-               return false
-       }
-       score := p.banScore.Increase(persistent, transient)
-       if score > warnThreshold {
-               log.Infof("Misbehaving peer %s: %s -- ban score increased to %d", p.id, reason, score)
-               if score > defaultBanThreshold {
-                       log.Errorf("Misbehaving peer %s -- banning and disconnecting", p.id)
-                       return true
-               }
+func (p *peer) sendBlock(block *types.Block) (bool, error) {
+       msg, err := NewBlockMessage(block)
+       if err != nil {
+               return false, errors.Wrap(err, "fail on NewBlockMessage")
        }
-       return false
-}
 
-type peerSet struct {
-       peers  map[string]*peer
-       lock   sync.RWMutex
-       closed bool
-}
-
-// newPeerSet creates a new peer set to track the active participants.
-func newPeerSet() *peerSet {
-       return &peerSet{
-               peers: make(map[string]*peer),
+       ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+       if ok {
+               blcokHash := block.Hash()
+               p.knownBlocks.Add(blcokHash.String())
        }
+       return ok, nil
 }
 
-// Register injects a new peer into the working set, or returns an error if the
-// peer is already known.
-func (ps *peerSet) Register(p *peer) error {
-       ps.lock.Lock()
-       defer ps.lock.Unlock()
+func (p *peer) sendBlocks(blocks []*types.Block) (bool, error) {
+       msg, err := NewBlocksMessage(blocks)
+       if err != nil {
+               return false, errors.Wrap(err, "fail on NewBlocksMessage")
+       }
 
-       if ps.closed {
-               return errClosed
+       if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+               return ok, nil
        }
-       if _, ok := ps.peers[p.id]; ok {
-               return errAlreadyRegistered
+
+       for _, block := range blocks {
+               blcokHash := block.Hash()
+               p.knownBlocks.Add(blcokHash.String())
        }
-       ps.peers[p.id] = p
-       return nil
+       return true, nil
 }
 
-// Unregister removes a remote peer from the active set, disabling any further
-// actions to/from that particular entity.
-func (ps *peerSet) Unregister(id string) error {
-       ps.lock.Lock()
-       defer ps.lock.Unlock()
-
-       if _, ok := ps.peers[id]; !ok {
-               return errNotRegistered
+func (p *peer) sendHeaders(headers []*types.BlockHeader) (bool, error) {
+       msg, err := NewHeadersMessage(headers)
+       if err != nil {
+               return false, errors.New("fail on NewHeadersMessage")
        }
-       delete(ps.peers, id)
-       return nil
-}
 
-// Peer retrieves the registered peer with the given id.
-func (ps *peerSet) Peer(id string) (*peer, bool) {
-       ps.lock.RLock()
-       defer ps.lock.RUnlock()
-       p, ok := ps.peers[id]
-       return p, ok
+       ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+       return ok, nil
 }
 
+func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
+       for _, tx := range txs {
+               msg, err := NewTransactionMessage(tx)
+               if err != nil {
+                       return false, errors.Wrap(err, "failed to tx msg")
+               }
 
-// getPeerInfos return all peer information of current node
-func (ps *peerSet) GetPeerInfos() []*PeerInfo {
-       var peerInfos []*PeerInfo
-       for _, peer := range ps.peers {
-               peerInfos = append(peerInfos, peer.GetPeerInfo())
+               if p.knownTxs.Has(tx.ID.String()) {
+                       continue
+               }
+               if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+                       return ok, nil
+               }
+               p.knownTxs.Add(tx.ID.String())
        }
-       return peerInfos
+       return true, nil
 }
 
-// Len returns if the current number of peers in the set.
-func (ps *peerSet) Len() int {
-       ps.lock.RLock()
-       defer ps.lock.RUnlock()
-
-       return len(ps.peers)
+func (p *peer) setStatus(height uint64, hash *bc.Hash) {
+       p.mtx.Lock()
+       defer p.mtx.Unlock()
+       p.height = height
+       p.hash = hash
 }
 
-// MarkTransaction marks a transaction as known for the peer, ensuring that it
-// will never be propagated to this particular peer.
-func (ps *peerSet) MarkTransaction(peerID string, hash *bc.Hash) {
-       ps.lock.RLock()
-       defer ps.lock.RUnlock()
+type peerSet struct {
+       BasePeerSet
+       mtx   sync.RWMutex
+       peers map[string]*peer
+}
 
-       if peer, ok := ps.peers[peerID]; ok {
-               peer.MarkTransaction(hash)
+// newPeerSet creates a new peer set to track the active participants.
+func newPeerSet(basePeerSet BasePeerSet) *peerSet {
+       return &peerSet{
+               BasePeerSet: basePeerSet,
+               peers:       make(map[string]*peer),
        }
 }
 
-// MarkBlock marks a block as known for the peer, ensuring that the block will
-// never be propagated to this particular peer.
-func (ps *peerSet) MarkBlock(peerID string, hash *bc.Hash) {
-       ps.lock.RLock()
-       defer ps.lock.RUnlock()
+func (ps *peerSet) addBanScore(peerID string, persistent, transient uint64, reason string) {
+       ps.mtx.Lock()
+       peer := ps.peers[peerID]
+       ps.mtx.Unlock()
 
-       if peer, ok := ps.peers[peerID]; ok {
-               peer.MarkBlock(hash)
+       if peer == nil {
+               return
        }
+       if ban := peer.addBanScore(persistent, transient, reason); !ban {
+               return
+       }
+       if err := ps.AddBannedPeer(peer.Addr().String()); err != nil {
+               log.WithField("err", err).Error("fail on add ban peer")
+       }
+       ps.removePeer(peerID)
 }
 
-// PeersWithoutBlock retrieves a list of peers that do not have a given block in
-// their set of known hashes.
-func (ps *peerSet) PeersWithoutBlock(hash *bc.Hash) []*peer {
-       ps.lock.RLock()
-       defer ps.lock.RUnlock()
+func (ps *peerSet) addPeer(peer BasePeer, height uint64, hash *bc.Hash) {
+       ps.mtx.Lock()
+       defer ps.mtx.Unlock()
 
-       list := make([]*peer, 0, len(ps.peers))
-       for _, p := range ps.peers {
-               if !p.knownBlocks.Has(hash.String()) {
-                       list = append(list, p)
-               }
+       if _, ok := ps.peers[peer.ID()]; !ok {
+               ps.peers[peer.ID()] = newPeer(height, hash, peer)
+               return
        }
-       return list
+       log.WithField("ID", peer.ID()).Warning("add existing peer to blockKeeper")
 }
 
-// PeersWithoutTx retrieves a list of peers that do not have a given transaction
-// in their set of known hashes.
-func (ps *peerSet) PeersWithoutTx(hash *bc.Hash) []*peer {
-       ps.lock.RLock()
-       defer ps.lock.RUnlock()
+func (ps *peerSet) bestPeer(flag consensus.ServiceFlag) *peer {
+       ps.mtx.RLock()
+       defer ps.mtx.RUnlock()
 
-       list := make([]*peer, 0, len(ps.peers))
+       var bestPeer *peer
        for _, p := range ps.peers {
-               if !p.knownTxs.Has(hash.String()) {
-                       list = append(list, p)
+               if !p.services.IsEnable(flag) {
+                       continue
+               }
+               if bestPeer == nil || p.height > bestPeer.height {
+                       bestPeer = p
                }
        }
-       return list
+       return bestPeer
 }
 
-// BestPeer retrieves the known peer with the currently highest total difficulty.
-func (ps *peerSet) BestPeer() (*p2p.Peer, uint64) {
-       ps.lock.RLock()
-       defer ps.lock.RUnlock()
-
-       var bestPeer *p2p.Peer
-       var bestHeight uint64
+func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
+       msg, err := NewMinedBlockMessage(block)
+       if err != nil {
+               return errors.Wrap(err, "fail on broadcast mined block")
+       }
 
-       for _, p := range ps.peers {
-               if bestPeer == nil || p.height > bestHeight {
-                       bestPeer, bestHeight = p.swPeer, p.height
+       hash := block.Hash()
+       peers := ps.peersWithoutBlock(&hash)
+       for _, peer := range peers {
+               if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+                       ps.removePeer(peer.ID())
+                       continue
                }
+               peer.markBlock(&hash)
        }
-
-       return bestPeer, bestHeight
+       return nil
 }
 
-// Close disconnects all peers.
-// No new peers can be registered after Close has returned.
-func (ps *peerSet) Close() {
-       ps.lock.Lock()
-       defer ps.lock.Unlock()
-
-       for _, p := range ps.peers {
-               p.swPeer.CloseConn()
+func (ps *peerSet) broadcastTx(tx *types.Tx) error {
+       msg, err := NewTransactionMessage(tx)
+       if err != nil {
+               return errors.Wrap(err, "fail on broadcast tx")
        }
-       ps.closed = true
-}
 
-func (ps *peerSet) AddPeer(peer *p2p.Peer) {
-       ps.lock.Lock()
-       defer ps.lock.Unlock()
-
-       if _, ok := ps.peers[peer.Key]; !ok {
-               keeperPeer := newPeer(0, nil, peer)
-               ps.peers[peer.Key] = keeperPeer
-               log.WithFields(log.Fields{"ID": peer.Key}).Info("Add new peer to blockKeeper")
-               return
+       peers := ps.peersWithoutTx(&tx.ID)
+       for _, peer := range peers {
+               if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+                       ps.removePeer(peer.ID())
+                       continue
+               }
+               peer.markTransaction(&tx.ID)
        }
-       log.WithField("ID", peer.Key).Warning("Add existing peer to blockKeeper")
+       return nil
 }
 
-func (ps *peerSet) RemovePeer(peerID string) {
-       ps.lock.Lock()
-       defer ps.lock.Unlock()
-
-       delete(ps.peers, peerID)
-       log.WithField("ID", peerID).Info("Delete peer from peerset")
+// Peer retrieves the registered peer with the given id.
+func (ps *peerSet) getPeer(id string) *peer {
+       ps.mtx.RLock()
+       defer ps.mtx.RUnlock()
+       return ps.peers[id]
 }
 
-func (ps *peerSet) SetPeerStatus(peerID string, height uint64, hash *bc.Hash) {
-       ps.lock.Lock()
-       defer ps.lock.Unlock()
+func (ps *peerSet) getPeerInfos() []*PeerInfo {
+       ps.mtx.RLock()
+       defer ps.mtx.RUnlock()
 
-       if peer, ok := ps.peers[peerID]; ok {
-               peer.SetStatus(height, hash)
-       }
-}
-
-func (ps *peerSet) requestBlockByHash(peerID string, hash *bc.Hash) error {
-       peer, ok := ps.Peer(peerID)
-       if !ok {
-               return errors.New("Can't find peer. ")
+       result := []*PeerInfo{}
+       for _, peer := range ps.peers {
+               result = append(result, peer.getPeerInfo())
        }
-       return peer.requestBlockByHash(hash)
+       return result
 }
 
-func (ps *peerSet) requestBlockByHeight(peerID string, height uint64) error {
-       peer, ok := ps.Peer(peerID)
-       if !ok {
-               return errors.New("Can't find peer. ")
-       }
-       return peer.requestBlockByHeight(height)
-}
+func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
+       ps.mtx.RLock()
+       defer ps.mtx.RUnlock()
 
-func (ps *peerSet) BroadcastMinedBlock(block *types.Block) ([]*peer, error) {
-       msg, err := NewMinedBlockMessage(block)
-       if err != nil {
-               return nil, errors.New("Failed construction block msg")
-       }
-       hash := block.Hash()
-       peers := ps.PeersWithoutBlock(&hash)
-       abnormalPeers := make([]*peer, 0)
-       for _, peer := range peers {
-               if ok := peer.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
-                       abnormalPeers = append(abnormalPeers, peer)
-                       continue
-               }
-               if p, ok := ps.Peer(peer.id); ok {
-                       p.MarkBlock(&hash)
+       peers := []*peer{}
+       for _, peer := range ps.peers {
+               if !peer.knownBlocks.Has(hash.String()) {
+                       peers = append(peers, peer)
                }
        }
-       return abnormalPeers, nil
+       return peers
 }
 
-func (ps *peerSet) BroadcastNewStatus(block *types.Block) ([]*peer, error) {
-       return ps.BroadcastMinedBlock(block)
-}
+func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
+       ps.mtx.RLock()
+       defer ps.mtx.RUnlock()
 
-func (ps *peerSet) BroadcastTx(tx *types.Tx) ([]*peer, error) {
-       msg, err := NewTransactionNotifyMessage(tx)
-       if err != nil {
-               return nil, errors.New("Failed construction tx msg")
-       }
-       peers := ps.PeersWithoutTx(&tx.ID)
-       abnormalPeers := make([]*peer, 0)
-       for _, peer := range peers {
-               if ok := peer.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
-                       abnormalPeers = append(abnormalPeers, peer)
-                       continue
-               }
-               if p, ok := ps.Peer(peer.id); ok {
-                       p.MarkTransaction(&tx.ID)
+       peers := []*peer{}
+       for _, peer := range ps.peers {
+               if !peer.knownTxs.Has(hash.String()) {
+                       peers = append(peers, peer)
                }
        }
-       return abnormalPeers, nil
+       return peers
+}
+
+func (ps *peerSet) removePeer(peerID string) {
+       ps.mtx.Lock()
+       delete(ps.peers, peerID)
+       ps.mtx.Unlock()
+       ps.StopPeerGracefully(peerID)
 }
index 3698482..cc45b88 100644 (file)
@@ -2,86 +2,40 @@ package netsync
 
 import (
        "reflect"
-       "strings"
-       "sync"
        "time"
 
        log "github.com/sirupsen/logrus"
-       cmn "github.com/tendermint/tmlibs/common"
 
        "github.com/bytom/errors"
        "github.com/bytom/p2p"
        "github.com/bytom/p2p/connection"
-       "github.com/bytom/protocol"
-       "github.com/bytom/protocol/bc"
-       "github.com/bytom/protocol/bc/types"
 )
 
 const (
-       // BlockchainChannel is a channel for blocks and status updates
-       BlockchainChannel        = byte(0x40)
-       protocolHandshakeTimeout = time.Second * 10
-       handshakeRetryTicker     = 4 * time.Second
+       handshakeTimeout    = 10 * time.Second
+       handshakeCheckPerid = 500 * time.Millisecond
 )
 
 var (
-       //ErrProtocolHandshakeTimeout peers handshake timeout
-       ErrProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
-       ErrStatusRequest            = errors.New("Status request error")
-       ErrDiffGenesisHash          = errors.New("Different genesis hash")
+       errProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
+       errStatusRequest            = errors.New("Status request error")
 )
 
-// Response describes the response standard.
-type Response struct {
-       Status string      `json:"status,omitempty"`
-       Msg    string      `json:"msg,omitempty"`
-       Data   interface{} `json:"data,omitempty"`
-}
-
-type initalPeerStatus struct {
-       peerID      string
-       height      uint64
-       hash        *bc.Hash
-       genesisHash *bc.Hash
-}
-
 //ProtocolReactor handles new coming protocol message.
 type ProtocolReactor struct {
        p2p.BaseReactor
 
-       chain       *protocol.Chain
-       blockKeeper *blockKeeper
-       txPool      *protocol.TxPool
-       sw          *p2p.Switch
-       fetcher     *Fetcher
-       peers       *peerSet
-       handshakeMu sync.Mutex
-       genesisHash bc.Hash
-
-       newPeerCh      chan struct{}
-       quitReqBlockCh chan *string
-       txSyncCh       chan *txsync
-       peerStatusCh   chan *initalPeerStatus
+       sm    *SyncManager
+       peers *peerSet
 }
 
 // NewProtocolReactor returns the reactor of whole blockchain.
-func NewProtocolReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, blockPeer *blockKeeper, fetcher *Fetcher, peers *peerSet, newPeerCh chan struct{}, txSyncCh chan *txsync, quitReqBlockCh chan *string) *ProtocolReactor {
+func NewProtocolReactor(sm *SyncManager, peers *peerSet) *ProtocolReactor {
        pr := &ProtocolReactor{
-               chain:          chain,
-               blockKeeper:    blockPeer,
-               txPool:         txPool,
-               sw:             sw,
-               fetcher:        fetcher,
-               peers:          peers,
-               newPeerCh:      newPeerCh,
-               txSyncCh:       txSyncCh,
-               quitReqBlockCh: quitReqBlockCh,
-               peerStatusCh:   make(chan *initalPeerStatus),
+               sm:    sm,
+               peers: peers,
        }
        pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
-       genesisBlock, _ := pr.chain.GetBlockByHeight(0)
-       pr.genesisHash = genesisBlock.Hash()
-
        return pr
 }
 
@@ -107,142 +61,78 @@ func (pr *ProtocolReactor) OnStop() {
        pr.BaseReactor.OnStop()
 }
 
-// syncTransactions starts sending all currently pending transactions to the given peer.
-func (pr *ProtocolReactor) syncTransactions(p *peer) {
-       if p == nil {
-               return
-       }
-       pending := pr.txPool.GetTransactions()
-       if len(pending) == 0 {
-               return
-       }
-       txs := make([]*types.Tx, len(pending))
-       for i, batch := range pending {
-               txs[i] = batch.Tx
-       }
-       pr.txSyncCh <- &txsync{p, txs}
-}
-
 // AddPeer implements Reactor by sending our state to peer.
 func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
-       pr.handshakeMu.Lock()
-       defer pr.handshakeMu.Unlock()
-       if peer == nil {
-               return errPeerDropped
-       }
        if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
-               return ErrStatusRequest
+               return errStatusRequest
        }
-       retryTicker := time.Tick(handshakeRetryTicker)
-       handshakeWait := time.NewTimer(protocolHandshakeTimeout)
+
+       checkTicker := time.NewTimer(handshakeCheckPerid)
+       timeoutTicker := time.NewTimer(handshakeTimeout)
        for {
                select {
-               case status := <-pr.peerStatusCh:
-                       if status.peerID == peer.Key {
-                               if strings.Compare(pr.genesisHash.String(), status.genesisHash.String()) != 0 {
-                                       log.Info("Remote peer genesis block hash:", status.genesisHash.String(), " local hash:", pr.genesisHash.String())
-                                       return ErrDiffGenesisHash
-                               }
-                               pr.peers.AddPeer(peer)
-                               pr.peers.SetPeerStatus(status.peerID, status.height, status.hash)
-                               prPeer, ok := pr.peers.Peer(peer.Key)
-                               if !ok {
-                                       return errPeerDropped
-                               }
-                               pr.syncTransactions(prPeer)
-                               pr.newPeerCh <- struct{}{}
+               case <-checkTicker.C:
+                       if exist := pr.peers.getPeer(peer.Key); exist != nil {
+                               pr.sm.syncTransactions(peer.Key)
                                return nil
                        }
-               case <-retryTicker:
-                       if peer == nil {
-                               return errPeerDropped
-                       }
-                       if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
-                               return ErrStatusRequest
-                       }
-               case <-handshakeWait.C:
-                       return ErrProtocolHandshakeTimeout
+
+               case <-timeoutTicker.C:
+                       return errProtocolHandshakeTimeout
                }
        }
 }
 
 // RemovePeer implements Reactor by removing peer from the pool.
 func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
-       select {
-       case pr.quitReqBlockCh <- &peer.Key:
-       default:
-               log.Warning("quitReqBlockCh is full")
-       }
-       pr.peers.RemovePeer(peer.Key)
+       pr.peers.removePeer(peer.Key)
 }
 
 // Receive implements Reactor by handling 4 types of messages (look below).
 func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
-       _, msg, err := DecodeMessage(msgBytes)
+       msgType, msg, err := DecodeMessage(msgBytes)
        if err != nil {
-               log.Errorf("Error decoding message %v", err)
+               log.WithField("err", err).Errorf("fail on reactor decoding message")
+               return
+       }
+
+       peer := pr.peers.getPeer(src.Key)
+       if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
                return
        }
-       log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
 
        switch msg := msg.(type) {
-       case *BlockRequestMessage:
-               var block *types.Block
-               var err error
-               if msg.Height != 0 {
-                       block, err = pr.chain.GetBlockByHeight(msg.Height)
-               } else {
-                       block, err = pr.chain.GetBlockByHash(msg.GetHash())
-               }
-               if err != nil {
-                       log.Errorf("Fail on BlockRequestMessage get block: %v", err)
-                       return
-               }
-               response, err := NewBlockResponseMessage(block)
-               if err != nil {
-                       log.Errorf("Fail on BlockRequestMessage create response: %v", err)
-                       return
-               }
-               src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
+       case *GetBlockMessage:
+               pr.sm.handleGetBlockMsg(peer, msg)
 
-       case *BlockResponseMessage:
-               log.Info("BlockResponseMessage height:", msg.GetBlock().Height)
-               pr.blockKeeper.AddBlock(msg.GetBlock(), src.Key)
+       case *BlockMessage:
+               pr.sm.handleBlockMsg(peer, msg)
 
        case *StatusRequestMessage:
-               blockHeader := pr.chain.BestBlockHeader()
-               src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(blockHeader, &pr.genesisHash)})
+               pr.sm.handleStatusRequestMsg(src)
 
        case *StatusResponseMessage:
-               peerStatus := &initalPeerStatus{
-                       peerID:      src.Key,
-                       height:      msg.Height,
-                       hash:        msg.GetHash(),
-                       genesisHash: msg.GetGenesisHash(),
-               }
-               pr.peerStatusCh <- peerStatus
+               pr.sm.handleStatusResponseMsg(src, msg)
 
-       case *TransactionNotifyMessage:
-               tx, err := msg.GetTransaction()
-               if err != nil {
-                       log.Errorf("Error decoding new tx %v", err)
-                       return
-               }
-               pr.blockKeeper.AddTx(tx, src.Key)
+       case *TransactionMessage:
+               pr.sm.handleTransactionMsg(peer, msg)
 
        case *MineBlockMessage:
-               block, err := msg.GetMineBlock()
-               if err != nil {
-                       log.Errorf("Error decoding mined block %v", err)
-                       return
-               }
-               // Mark the peer as owning the block and schedule it for import
-               hash := block.Hash()
-               pr.peers.MarkBlock(src.Key, &hash)
-               pr.fetcher.Enqueue(src.Key, block)
-               pr.peers.SetPeerStatus(src.Key, block.Height, &hash)
+               pr.sm.handleMineBlockMsg(peer, msg)
+
+       case *GetHeadersMessage:
+               pr.sm.handleGetHeadersMsg(peer, msg)
+
+       case *HeadersMessage:
+               pr.sm.handleHeadersMsg(peer, msg)
+
+       case *GetBlocksMessage:
+               pr.sm.handleGetBlocksMsg(peer, msg)
+
+       case *BlocksMessage:
+               pr.sm.handleBlocksMsg(peer, msg)
 
        default:
-               log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
+               log.Errorf("unknown message type %v", reflect.TypeOf(msg))
        }
 }
diff --git a/netsync/sync.go b/netsync/sync.go
deleted file mode 100644 (file)
index 7e3554e..0000000
+++ /dev/null
@@ -1,176 +0,0 @@
-// Copyright 2015 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package netsync
-
-import (
-       "math/rand"
-       "sync/atomic"
-       "time"
-
-       log "github.com/sirupsen/logrus"
-
-       "github.com/bytom/common"
-       "github.com/bytom/protocol/bc/types"
-)
-
-const (
-       forceSyncCycle      = 10 * time.Second // Time interval to force syncs, even if few peers are available
-       minDesiredPeerCount = 5                // Amount of peers desired to start syncing
-
-       // This is the target size for the packs of transactions sent by txsyncLoop.
-       // A pack can get larger than this if a single transactions exceeds this size.
-       txsyncPackSize = 100 * 1024
-)
-
-type txsync struct {
-       p   *peer
-       txs []*types.Tx
-}
-
-// syncer is responsible for periodically synchronising with the network, both
-// downloading hashes and blocks as well as handling the announcement handler.
-func (sm *SyncManager) syncer() {
-       // Start and ensure cleanup of sync mechanisms
-       sm.fetcher.Start()
-       defer sm.fetcher.Stop()
-       //defer sm.downloader.Terminate()
-
-       // Wait for different events to fire synchronisation operations
-       forceSync := time.NewTicker(forceSyncCycle)
-       defer forceSync.Stop()
-
-       for {
-               select {
-               case <-sm.newPeerCh:
-                       log.Info("New peer connected.")
-                       // Make sure we have peers to select from, then sync
-                       if sm.sw.Peers().Size() < minDesiredPeerCount {
-                               break
-                       }
-                       go sm.synchronise()
-
-               case <-forceSync.C:
-                       // Force a sync even if not enough peers are present
-                       go sm.synchronise()
-
-               case <-sm.quitSync:
-                       return
-               }
-       }
-}
-
-// synchronise tries to sync up our local block chain with a remote peer.
-func (sm *SyncManager) synchronise() {
-       log.Debug("bk peer num:", sm.blockKeeper.peers.Len(), " sw peer num:", sm.sw.Peers().Size(), " ", sm.sw.Peers().List())
-       // Make sure only one goroutine is ever allowed past this point at once
-       if !atomic.CompareAndSwapInt32(&sm.synchronising, 0, 1) {
-               log.Info("Synchronising ...")
-               return
-       }
-       defer atomic.StoreInt32(&sm.synchronising, 0)
-       for len(sm.dropPeerCh) > 0 {
-               <-sm.dropPeerCh
-       }
-
-       peer, bestHeight := sm.peers.BestPeer()
-       // Short circuit if no peers are available
-       if peer == nil {
-               return
-       }
-
-       if ok := sm.Switch().Peers().Has(peer.Key); !ok {
-               log.Info("Peer disconnected")
-               sm.sw.StopPeerGracefully(peer)
-               return
-       }
-
-       if bestHeight > sm.chain.BestBlockHeight() {
-               log.Info("sync peer:", peer.Addr(), " height:", bestHeight)
-               sm.blockKeeper.BlockRequestWorker(peer.Key, bestHeight)
-       }
-}
-
-// txsyncLoop takes care of the initial transaction sync for each new
-// connection. When a new peer appears, we relay all currently pending
-// transactions. In order to minimise egress bandwidth usage, we send
-// the transactions in small packs to one peer at a time.
-func (sm *SyncManager) txsyncLoop() {
-       var (
-               pending = make(map[string]*txsync)
-               sending = false               // whether a send is active
-               pack    = new(txsync)         // the pack that is being sent
-               done    = make(chan error, 1) // result of the send
-       )
-
-       // send starts a sending a pack of transactions from the sync.
-       send := func(s *txsync) {
-               // Fill pack with transactions up to the target size.
-               size := common.StorageSize(0)
-               pack.p = s.p
-               pack.txs = pack.txs[:0]
-               for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
-                       pack.txs = append(pack.txs, s.txs[i])
-                       size += common.StorageSize(s.txs[i].SerializedSize)
-               }
-               // Remove the transactions that will be sent.
-               s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
-               if len(s.txs) == 0 {
-                       delete(pending, s.p.swPeer.Key)
-               }
-               // Send the pack in the background.
-               log.Info("Sending batch of transactions. ", "count:", len(pack.txs), " bytes:", size)
-               sending = true
-               go func() { done <- pack.p.SendTransactions(pack.txs) }()
-       }
-
-       // pick chooses the next pending sync.
-       pick := func() *txsync {
-               if len(pending) == 0 {
-                       return nil
-               }
-               n := rand.Intn(len(pending)) + 1
-               for _, s := range pending {
-                       if n--; n == 0 {
-                               return s
-                       }
-               }
-               return nil
-       }
-
-       for {
-               select {
-               case s := <-sm.txSyncCh:
-                       pending[s.p.swPeer.Key] = s
-                       if !sending {
-                               send(s)
-                       }
-               case err := <-done:
-                       sending = false
-                       // Stop tracking peers that cause send failures.
-                       if err != nil {
-                               log.Info("Transaction send failed", "err", err)
-                               delete(pending, pack.p.swPeer.Key)
-                       }
-                       // Schedule the next send.
-                       if s := pick(); s != nil {
-                               send(s)
-                       }
-               case <-sm.quitSync:
-                       return
-               }
-       }
-}
diff --git a/netsync/tx_keeper.go b/netsync/tx_keeper.go
new file mode 100644 (file)
index 0000000..1a8cc67
--- /dev/null
@@ -0,0 +1,127 @@
+package netsync
+
+import (
+       "math/rand"
+
+       log "github.com/sirupsen/logrus"
+
+       "github.com/bytom/protocol/bc/types"
+)
+
+const (
+       // This is the target size for the packs of transactions sent by txSyncLoop.
+       // A pack can get larger than this if a single transactions exceeds this size.
+       txSyncPackSize = 100 * 1024
+)
+
+type txSyncMsg struct {
+       peerID string
+       txs    []*types.Tx
+}
+
+func (sm *SyncManager) syncTransactions(peerID string) {
+       pending := sm.txPool.GetTransactions()
+       if len(pending) == 0 {
+               return
+       }
+
+       txs := make([]*types.Tx, len(pending))
+       for i, batch := range pending {
+               txs[i] = batch.Tx
+       }
+       sm.txSyncCh <- &txSyncMsg{peerID, txs}
+}
+
+func (sm *SyncManager) txBroadcastLoop() {
+       for {
+               select {
+               case newTx := <-sm.newTxCh:
+                       if err := sm.peers.broadcastTx(newTx); err != nil {
+                               log.Errorf("Broadcast new tx error. %v", err)
+                               return
+                       }
+               case <-sm.quitSync:
+                       return
+               }
+       }
+}
+
+// txSyncLoop takes care of the initial transaction sync for each new
+// connection. When a new peer appears, we relay all currently pending
+// transactions. In order to minimise egress bandwidth usage, we send
+// the transactions in small packs to one peer at a time.
+func (sm *SyncManager) txSyncLoop() {
+       pending := make(map[string]*txSyncMsg)
+       sending := false            // whether a send is active
+       done := make(chan error, 1) // result of the send
+
+       // send starts a sending a pack of transactions from the sync.
+       send := func(msg *txSyncMsg) {
+               peer := sm.peers.getPeer(msg.peerID)
+               if peer == nil {
+                       delete(pending, msg.peerID)
+               }
+
+               totalSize := uint64(0)
+               sendTxs := []*types.Tx{}
+               for i := 0; i < len(msg.txs) && totalSize < txSyncPackSize; i++ {
+                       sendTxs = append(sendTxs, msg.txs[i])
+                       totalSize += msg.txs[i].SerializedSize
+               }
+
+               copy(msg.txs, msg.txs[len(sendTxs):])
+               if len(msg.txs) == 0 {
+                       delete(pending, msg.peerID)
+               }
+
+               // Send the pack in the background.
+               log.WithFields(log.Fields{
+                       "count": len(sendTxs),
+                       "bytes": totalSize,
+                       "peer":  msg.peerID,
+               }).Debug("txSyncLoop sending transactions")
+               sending = true
+               go func() {
+                       ok, err := peer.sendTransactions(sendTxs)
+                       if !ok {
+                               sm.peers.removePeer(msg.peerID)
+                       }
+                       done <- err
+               }()
+       }
+
+       // pick chooses the next pending sync.
+       pick := func() *txSyncMsg {
+               if len(pending) == 0 {
+                       return nil
+               }
+
+               n := rand.Intn(len(pending)) + 1
+               for _, s := range pending {
+                       if n--; n == 0 {
+                               return s
+                       }
+               }
+               return nil
+       }
+
+       for {
+               select {
+               case msg := <-sm.txSyncCh:
+                       pending[msg.peerID] = msg
+                       if !sending {
+                               send(msg)
+                       }
+
+               case err := <-done:
+                       sending = false
+                       if err != nil {
+                               log.WithField("err", err).Warning("fail on txSyncLoop sending")
+                       }
+
+                       if s := pick(); s != nil {
+                               send(s)
+                       }
+               }
+       }
+}
index 32620c1..2f44281 100644 (file)
@@ -3,6 +3,7 @@ package p2p
 import (
        "fmt"
        "net"
+       "strconv"
        "time"
 
        "github.com/pkg/errors"
@@ -12,6 +13,7 @@ import (
        cmn "github.com/tendermint/tmlibs/common"
 
        cfg "github.com/bytom/config"
+       "github.com/bytom/consensus"
        "github.com/bytom/p2p/connection"
 )
 
@@ -158,6 +160,10 @@ func (pc *peerConn) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duratio
        return peerNodeInfo, nil
 }
 
+func (p *Peer) ID() string {
+       return p.Key
+}
+
 // IsOutbound returns true if the connection is outbound, false otherwise.
 func (p *Peer) IsOutbound() bool {
        return p.outbound
@@ -177,6 +183,18 @@ func (p *Peer) Send(chID byte, msg interface{}) bool {
        return p.mconn.Send(chID, msg)
 }
 
+func (p *Peer) ServiceFlag() consensus.ServiceFlag {
+       services := consensus.SFFullNode
+       if len(p.Other) == 0 {
+               return services
+       }
+
+       if serviceFlag, err := strconv.ParseUint(p.Other[0], 10, 64); err == nil {
+               services = consensus.ServiceFlag(serviceFlag)
+       }
+       return services
+}
+
 // String representation.
 func (p *Peer) String() string {
        if p.outbound {
index ee80708..b50274f 100644 (file)
@@ -70,7 +70,7 @@ func (r *PEXReactor) AddPeer(p *p2p.Peer) error {
 
        if r.SendAddrs(p, nodes) {
                <-time.After(1 * time.Second)
-               r.Switch.StopPeerGracefully(p)
+               r.Switch.StopPeerGracefully(p.Key)
        }
        return errors.New("addPeer: reach the max peer, exchange then close")
 }
@@ -80,7 +80,7 @@ func (r *PEXReactor) Receive(chID byte, p *p2p.Peer, rawMsg []byte) {
        _, msg, err := DecodeMessage(rawMsg)
        if err != nil {
                log.WithField("error", err).Error("failed to decoding pex message")
-               r.Switch.StopPeerGracefully(p)
+               r.Switch.StopPeerGracefully(p.Key)
                return
        }
 
@@ -116,7 +116,7 @@ func (r *PEXReactor) SendAddrs(p *p2p.Peer, nodes []*discover.Node) bool {
 
        ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
        if !ok {
-               r.Switch.StopPeerGracefully(p)
+               r.Switch.StopPeerGracefully(p.Key)
        }
        return ok
 }
index 77057d0..5cbf59c 100644 (file)
@@ -107,12 +107,11 @@ func (sw *Switch) OnStop() {
 }
 
 //AddBannedPeer add peer to blacklist
-func (sw *Switch) AddBannedPeer(peer *Peer) error {
+func (sw *Switch) AddBannedPeer(ip string) error {
        sw.mtx.Lock()
        defer sw.mtx.Unlock()
 
-       key := peer.NodeInfo.RemoteAddrHost()
-       sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
+       sw.bannedPeer[ip] = time.Now().Add(defaultBanDuration)
        datajson, err := json.Marshal(sw.bannedPeer)
        if err != nil {
                return err
@@ -263,8 +262,10 @@ func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
 }
 
 // StopPeerGracefully disconnect from a peer gracefully.
-func (sw *Switch) StopPeerGracefully(peer *Peer) {
-       sw.stopAndRemovePeer(peer, nil)
+func (sw *Switch) StopPeerGracefully(peerID string) {
+       if peer := sw.peers.Get(peerID); peer != nil {
+               sw.stopAndRemovePeer(peer, nil)
+       }
 }
 
 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
@@ -365,9 +366,9 @@ func (sw *Switch) startInitPeer(peer *Peer) error {
 }
 
 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
+       sw.peers.Remove(peer)
        for _, reactor := range sw.reactors {
                reactor.RemovePeer(peer, reason)
        }
-       sw.peers.Remove(peer)
        peer.Stop()
 }
index 872ef4f..72a2dd7 100644 (file)
@@ -28,7 +28,7 @@ func (c *Chain) GetBlockByHash(hash *bc.Hash) (*types.Block, error) {
        return c.store.GetBlock(hash)
 }
 
-// GetBlockByHeight return a block by given height
+// GetBlockByHeight return a block header by given height
 func (c *Chain) GetBlockByHeight(height uint64) (*types.Block, error) {
        node := c.index.NodeByHeight(height)
        if node == nil {
@@ -37,6 +37,24 @@ func (c *Chain) GetBlockByHeight(height uint64) (*types.Block, error) {
        return c.store.GetBlock(&node.Hash)
 }
 
+// GetHeaderByHash return a block header by given hash
+func (c *Chain) GetHeaderByHash(hash *bc.Hash) (*types.BlockHeader, error) {
+       node := c.index.GetNode(hash)
+       if node == nil {
+               return nil, errors.New("can't find block header in given hash")
+       }
+       return node.BlockHeader(), nil
+}
+
+// GetHeaderByHeight return a block header by given height
+func (c *Chain) GetHeaderByHeight(height uint64) (*types.BlockHeader, error) {
+       node := c.index.NodeByHeight(height)
+       if node == nil {
+               return nil, errors.New("can't find block header in given height")
+       }
+       return node.BlockHeader(), nil
+}
+
 func (c *Chain) calcReorganizeNodes(node *state.BlockNode) ([]*state.BlockNode, []*state.BlockNode) {
        var attachNodes []*state.BlockNode
        var detachNodes []*state.BlockNode