OSDN Git Service

Format netsync module code directory (#88)
[bytom/vapor.git] / netsync / peers / peer.go
similarity index 51%
rename from netsync/peer.go
rename to netsync/peers/peer.go
index 794501b..ee5068a 100644 (file)
@@ -1,4 +1,4 @@
-package netsync
+package peers
 
 import (
        "encoding/hex"
@@ -12,18 +12,27 @@ import (
 
        "github.com/vapor/consensus"
        "github.com/vapor/errors"
+       msgs "github.com/vapor/netsync/messages"
        "github.com/vapor/p2p/trust"
        "github.com/vapor/protocol/bc"
        "github.com/vapor/protocol/bc/types"
 )
 
 const (
-       maxKnownTxs         = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
-       maxKnownBlocks      = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
-       defaultBanThreshold = uint32(100)
+       maxKnownTxs           = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
+       maxKnownSignatures    = 1024  // Maximum block signatures to keep in the known list (prevent DOS)
+       maxKnownBlocks        = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
+       defaultBanThreshold   = uint32(100)
+       maxFilterAddressSize  = 50
+       maxFilterAddressCount = 1000
+
+       logModule = "peers"
 )
 
-var errSendStatusMsg = errors.New("send status msg fail")
+var (
+       errSendStatusMsg = errors.New("send status msg fail")
+       ErrPeerMisbehave = errors.New("peer is misbehave")
+)
 
 //BasePeer is the interface for connection level peer
 type BasePeer interface {
@@ -41,6 +50,14 @@ type BasePeerSet interface {
        StopPeerGracefully(string)
 }
 
+type BroadcastMsg interface {
+       FilterTargetPeers(ps *PeerSet) []string
+       MarkSendRecord(ps *PeerSet, peers []string)
+       GetChan() byte
+       GetMsg() interface{}
+       MsgString() string
+}
+
 // PeerInfo indicate peer status snap
 type PeerInfo struct {
        ID                  string `json:"peer_id"`
@@ -56,36 +73,38 @@ type PeerInfo struct {
        CurrentReceivedRate int64  `json:"current_received_rate"`
 }
 
-type peer struct {
+type Peer struct {
        BasePeer
-       mtx         sync.RWMutex
-       services    consensus.ServiceFlag
-       height      uint64
-       hash        *bc.Hash
-       banScore    trust.DynamicBanScore
-       knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
-       knownBlocks *set.Set // Set of block hashes known to be known by this peer
-       knownStatus uint64   // Set of chain status known to be known by this peer
-       filterAdds  *set.Set // Set of addresses that the spv node cares about.
-}
-
-func newPeer(basePeer BasePeer) *peer {
-       return &peer{
-               BasePeer:    basePeer,
-               services:    basePeer.ServiceFlag(),
-               knownTxs:    set.New(),
-               knownBlocks: set.New(),
-               filterAdds:  set.New(),
-       }
-}
-
-func (p *peer) Height() uint64 {
+       mtx             sync.RWMutex
+       services        consensus.ServiceFlag
+       height          uint64
+       hash            *bc.Hash
+       banScore        trust.DynamicBanScore
+       knownTxs        *set.Set // Set of transaction hashes known to be known by this peer
+       knownBlocks     *set.Set // Set of block hashes known to be known by this peer
+       knownSignatures *set.Set // Set of block signatures known to be known by this peer
+       knownStatus     uint64   // Set of chain status known to be known by this peer
+       filterAdds      *set.Set // Set of addresses that the spv node cares about.
+}
+
+func newPeer(basePeer BasePeer) *Peer {
+       return &Peer{
+               BasePeer:        basePeer,
+               services:        basePeer.ServiceFlag(),
+               knownTxs:        set.New(),
+               knownBlocks:     set.New(),
+               knownSignatures: set.New(),
+               filterAdds:      set.New(),
+       }
+}
+
+func (p *Peer) Height() uint64 {
        p.mtx.RLock()
        defer p.mtx.RUnlock()
        return p.height
 }
 
-func (p *peer) addBanScore(persistent, transient uint32, reason string) bool {
+func (p *Peer) addBanScore(persistent, transient uint32, reason string) bool {
        score := p.banScore.Increase(persistent, transient)
        if score > defaultBanThreshold {
                log.WithFields(log.Fields{
@@ -109,7 +128,7 @@ func (p *peer) addBanScore(persistent, transient uint32, reason string) bool {
        return false
 }
 
-func (p *peer) addFilterAddress(address []byte) {
+func (p *Peer) AddFilterAddress(address []byte) {
        p.mtx.Lock()
        defer p.mtx.Unlock()
 
@@ -125,31 +144,35 @@ func (p *peer) addFilterAddress(address []byte) {
        p.filterAdds.Add(hex.EncodeToString(address))
 }
 
-func (p *peer) addFilterAddresses(addresses [][]byte) {
+func (p *Peer) AddFilterAddresses(addresses [][]byte) {
        if !p.filterAdds.IsEmpty() {
                p.filterAdds.Clear()
        }
        for _, address := range addresses {
-               p.addFilterAddress(address)
+               p.AddFilterAddress(address)
        }
 }
 
-func (p *peer) getBlockByHeight(height uint64) bool {
-       msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
-       return p.TrySend(BlockchainChannel, msg)
+func (p *Peer) FilterClear() {
+       p.filterAdds.Clear()
 }
 
-func (p *peer) getBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
-       msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
-       return p.TrySend(BlockchainChannel, msg)
+func (p *Peer) GetBlockByHeight(height uint64) bool {
+       msg := struct{ msgs.BlockchainMessage }{&msgs.GetBlockMessage{Height: height}}
+       return p.TrySend(msgs.BlockchainChannel, msg)
 }
 
-func (p *peer) getHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
-       msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
-       return p.TrySend(BlockchainChannel, msg)
+func (p *Peer) GetBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
+       msg := struct{ msgs.BlockchainMessage }{msgs.NewGetBlocksMessage(locator, stopHash)}
+       return p.TrySend(msgs.BlockchainChannel, msg)
 }
 
-func (p *peer) getPeerInfo() *PeerInfo {
+func (p *Peer) GetHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
+       msg := struct{ msgs.BlockchainMessage }{msgs.NewGetHeadersMessage(locator, stopHash)}
+       return p.TrySend(msgs.BlockchainChannel, msg)
+}
+
+func (p *Peer) GetPeerInfo() *PeerInfo {
        p.mtx.RLock()
        defer p.mtx.RUnlock()
 
@@ -174,7 +197,7 @@ func (p *peer) getPeerInfo() *PeerInfo {
        }
 }
 
-func (p *peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
+func (p *Peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
        var relatedTxs []*types.Tx
        var relatedStatuses []*bc.TxVerifyResult
        for i, tx := range txs {
@@ -186,7 +209,7 @@ func (p *peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.Transaction
        return relatedTxs, relatedStatuses
 }
 
-func (p *peer) isRelatedTx(tx *types.Tx) bool {
+func (p *Peer) isRelatedTx(tx *types.Tx) bool {
        for _, input := range tx.Inputs {
                switch inp := input.TypedInput.(type) {
                case *types.SpendInput:
@@ -203,11 +226,11 @@ func (p *peer) isRelatedTx(tx *types.Tx) bool {
        return false
 }
 
-func (p *peer) isSPVNode() bool {
+func (p *Peer) isSPVNode() bool {
        return !p.services.IsEnable(consensus.SFFullNode)
 }
 
-func (p *peer) markBlock(hash *bc.Hash) {
+func (p *Peer) MarkBlock(hash *bc.Hash) {
        p.mtx.Lock()
        defer p.mtx.Unlock()
 
@@ -217,14 +240,24 @@ func (p *peer) markBlock(hash *bc.Hash) {
        p.knownBlocks.Add(hash.String())
 }
 
-func (p *peer) markNewStatus(height uint64) {
+func (p *Peer) markNewStatus(height uint64) {
        p.mtx.Lock()
        defer p.mtx.Unlock()
 
        p.knownStatus = height
 }
 
-func (p *peer) markTransaction(hash *bc.Hash) {
+func (p *Peer) markSign(signature []byte) {
+       p.mtx.Lock()
+       defer p.mtx.Unlock()
+
+       for p.knownSignatures.Size() >= maxKnownSignatures {
+               p.knownSignatures.Pop()
+       }
+       p.knownSignatures.Add(signature)
+}
+
+func (p *Peer) markTransaction(hash *bc.Hash) {
        p.mtx.Lock()
        defer p.mtx.Unlock()
 
@@ -234,13 +267,39 @@ func (p *peer) markTransaction(hash *bc.Hash) {
        p.knownTxs.Add(hash.String())
 }
 
-func (p *peer) sendBlock(block *types.Block) (bool, error) {
-       msg, err := NewBlockMessage(block)
+func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string {
+       ps.mtx.RLock()
+       defer ps.mtx.RUnlock()
+
+       var peers []string
+       for _, peer := range ps.peers {
+               if !peer.knownBlocks.Has(hash.String()) {
+                       peers = append(peers, peer.ID())
+               }
+       }
+       return peers
+}
+
+func (ps *PeerSet) PeersWithoutSign(signature []byte) []string {
+       ps.mtx.RLock()
+       defer ps.mtx.RUnlock()
+
+       var peers []string
+       for _, peer := range ps.peers {
+               if !peer.knownSignatures.Has(signature) {
+                       peers = append(peers, peer.ID())
+               }
+       }
+       return peers
+}
+
+func (p *Peer) SendBlock(block *types.Block) (bool, error) {
+       msg, err := msgs.NewBlockMessage(block)
        if err != nil {
                return false, errors.Wrap(err, "fail on NewBlockMessage")
        }
 
-       ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+       ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
        if ok {
                blcokHash := block.Hash()
                p.knownBlocks.Add(blcokHash.String())
@@ -248,13 +307,13 @@ func (p *peer) sendBlock(block *types.Block) (bool, error) {
        return ok, nil
 }
 
-func (p *peer) sendBlocks(blocks []*types.Block) (bool, error) {
-       msg, err := NewBlocksMessage(blocks)
+func (p *Peer) SendBlocks(blocks []*types.Block) (bool, error) {
+       msg, err := msgs.NewBlocksMessage(blocks)
        if err != nil {
                return false, errors.Wrap(err, "fail on NewBlocksMessage")
        }
 
-       if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+       if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
                return ok, nil
        }
 
@@ -265,39 +324,39 @@ func (p *peer) sendBlocks(blocks []*types.Block) (bool, error) {
        return true, nil
 }
 
-func (p *peer) sendHeaders(headers []*types.BlockHeader) (bool, error) {
-       msg, err := NewHeadersMessage(headers)
+func (p *Peer) SendHeaders(headers []*types.BlockHeader) (bool, error) {
+       msg, err := msgs.NewHeadersMessage(headers)
        if err != nil {
                return false, errors.New("fail on NewHeadersMessage")
        }
 
-       ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+       ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
        return ok, nil
 }
 
-func (p *peer) sendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
-       msg := NewMerkleBlockMessage()
-       if err := msg.setRawBlockHeader(block.BlockHeader); err != nil {
+func (p *Peer) SendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
+       msg := msgs.NewMerkleBlockMessage()
+       if err := msg.SetRawBlockHeader(block.BlockHeader); err != nil {
                return false, err
        }
 
        relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
 
        txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
-       if err := msg.setTxInfo(txHashes, txFlags, relatedTxs); err != nil {
+       if err := msg.SetTxInfo(txHashes, txFlags, relatedTxs); err != nil {
                return false, nil
        }
 
        statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
-       if err := msg.setStatusInfo(statusHashes, relatedStatuses); err != nil {
+       if err := msg.SetStatusInfo(statusHashes, relatedStatuses); err != nil {
                return false, nil
        }
 
-       ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+       ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg})
        return ok, nil
 }
 
-func (p *peer) sendTransactions(txs []*types.Tx) error {
+func (p *Peer) SendTransactions(txs []*types.Tx) error {
        validTxs := make([]*types.Tx, 0, len(txs))
        for i, tx := range txs {
                if p.isSPVNode() && !p.isRelatedTx(tx) || p.knownTxs.Has(tx.ID.String()) {
@@ -305,16 +364,16 @@ func (p *peer) sendTransactions(txs []*types.Tx) error {
                }
 
                validTxs = append(validTxs, tx)
-               if len(validTxs) != txsMsgMaxTxNum && i != len(txs)-1 {
+               if len(validTxs) != msgs.TxsMsgMaxTxNum && i != len(txs)-1 {
                        continue
                }
 
-               msg, err := NewTransactionsMessage(validTxs)
+               msg, err := msgs.NewTransactionsMessage(validTxs)
                if err != nil {
                        return err
                }
 
-               if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+               if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
                        return errors.New("failed to send txs msg")
                }
 
@@ -328,37 +387,37 @@ func (p *peer) sendTransactions(txs []*types.Tx) error {
        return nil
 }
 
-func (p *peer) sendStatus(header *types.BlockHeader) error {
-       msg := NewStatusMessage(header)
-       if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+func (p *Peer) SendStatus(header *types.BlockHeader) error {
+       msg := msgs.NewStatusMessage(header)
+       if ok := p.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
                return errSendStatusMsg
        }
        p.markNewStatus(header.Height)
        return nil
 }
 
-func (p *peer) setStatus(height uint64, hash *bc.Hash) {
+func (p *Peer) SetStatus(height uint64, hash *bc.Hash) {
        p.mtx.Lock()
        defer p.mtx.Unlock()
        p.height = height
        p.hash = hash
 }
 
-type peerSet struct {
+type PeerSet struct {
        BasePeerSet
        mtx   sync.RWMutex
-       peers map[string]*peer
+       peers map[string]*Peer
 }
 
 // newPeerSet creates a new peer set to track the active participants.
-func newPeerSet(basePeerSet BasePeerSet) *peerSet {
-       return &peerSet{
+func NewPeerSet(basePeerSet BasePeerSet) *PeerSet {
+       return &PeerSet{
                BasePeerSet: basePeerSet,
-               peers:       make(map[string]*peer),
+               peers:       make(map[string]*Peer),
        }
 }
 
-func (ps *peerSet) addBanScore(peerID string, persistent, transient uint32, reason string) {
+func (ps *PeerSet) AddBanScore(peerID string, persistent, transient uint32, reason string) {
        ps.mtx.Lock()
        peer := ps.peers[peerID]
        ps.mtx.Unlock()
@@ -372,10 +431,10 @@ func (ps *peerSet) addBanScore(peerID string, persistent, transient uint32, reas
        if err := ps.AddBannedPeer(peer.Addr().String()); err != nil {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on add ban peer")
        }
-       ps.removePeer(peerID)
+       ps.RemovePeer(peerID)
 }
 
-func (ps *peerSet) addPeer(peer BasePeer) {
+func (ps *PeerSet) AddPeer(peer BasePeer) {
        ps.mtx.Lock()
        defer ps.mtx.Unlock()
 
@@ -386,11 +445,11 @@ func (ps *peerSet) addPeer(peer BasePeer) {
        log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
 }
 
-func (ps *peerSet) bestPeer(flag consensus.ServiceFlag) *peer {
+func (ps *PeerSet) BestPeer(flag consensus.ServiceFlag) *Peer {
        ps.mtx.RLock()
        defer ps.mtx.RUnlock()
 
-       var bestPeer *peer
+       var bestPeer *Peer
        for _, p := range ps.peers {
                if !p.services.IsEnable(flag) {
                        continue
@@ -402,35 +461,47 @@ func (ps *peerSet) bestPeer(flag consensus.ServiceFlag) *peer {
        return bestPeer
 }
 
-func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
-       msg, err := NewMinedBlockMessage(block)
-       if err != nil {
-               return errors.Wrap(err, "fail on broadcast mined block")
+//SendMsg send message to the target peer.
+func (ps *PeerSet) SendMsg(peerID string, msgChannel byte, msg interface{}) bool {
+       peer := ps.GetPeer(peerID)
+       if peer == nil {
+               return false
        }
 
-       hash := block.Hash()
-       peers := ps.peersWithoutBlock(&hash)
+       ok := peer.TrySend(msgChannel, msg)
+       if !ok {
+               ps.RemovePeer(peerID)
+       }
+       return ok
+}
+
+//BroadcastMsg Broadcast message to the target peers
+// and mark the message send record
+func (ps *PeerSet) BroadcastMsg(bm BroadcastMsg) error {
+       //filter target peers
+       peers := bm.FilterTargetPeers(ps)
+
+       //broadcast to target peers
+       peersSuccess := make([]string, 0)
        for _, peer := range peers {
-               if peer.isSPVNode() {
+               if ok := ps.SendMsg(peer, bm.GetChan(), bm.GetMsg()); !ok {
+                       log.WithFields(log.Fields{"module": logModule, "peer": peer, "type": reflect.TypeOf(bm.GetMsg()), "message": bm.MsgString()}).Warning("send message to peer error")
                        continue
                }
-               if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
-                       log.WithFields(log.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Warning("send message to peer error")
-                       ps.removePeer(peer.ID())
-                       continue
-               }
-               peer.markBlock(&hash)
-               peer.markNewStatus(block.Height)
+               peersSuccess = append(peersSuccess, peer)
        }
+
+       //mark the message send record
+       bm.MarkSendRecord(ps, peersSuccess)
        return nil
 }
 
-func (ps *peerSet) broadcastNewStatus(bestBlock *types.Block) error {
-       msg := NewStatusMessage(&bestBlock.BlockHeader)
+func (ps *PeerSet) BroadcastNewStatus(bestBlock *types.Block) error {
+       msg := msgs.NewStatusMessage(&bestBlock.BlockHeader)
        peers := ps.peersWithoutNewStatus(bestBlock.Height)
        for _, peer := range peers {
-               if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
-                       ps.removePeer(peer.ID())
+               if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
+                       ps.RemovePeer(peer.ID())
                        continue
                }
 
@@ -439,8 +510,8 @@ func (ps *peerSet) broadcastNewStatus(bestBlock *types.Block) error {
        return nil
 }
 
-func (ps *peerSet) broadcastTx(tx *types.Tx) error {
-       msg, err := NewTransactionMessage(tx)
+func (ps *PeerSet) BroadcastTx(tx *types.Tx) error {
+       msg, err := msgs.NewTransactionMessage(tx)
        if err != nil {
                return errors.Wrap(err, "fail on broadcast tx")
        }
@@ -450,14 +521,14 @@ func (ps *peerSet) broadcastTx(tx *types.Tx) error {
                if peer.isSPVNode() && !peer.isRelatedTx(tx) {
                        continue
                }
-               if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+               if ok := peer.TrySend(msgs.BlockchainChannel, struct{ msgs.BlockchainMessage }{msg}); !ok {
                        log.WithFields(log.Fields{
                                "module":  logModule,
                                "peer":    peer.Addr(),
                                "type":    reflect.TypeOf(msg),
                                "message": msg.String(),
                        }).Warning("send message to peer error")
-                       ps.removePeer(peer.ID())
+                       ps.RemovePeer(peer.ID())
                        continue
                }
                peer.markTransaction(&tx.ID)
@@ -465,33 +536,57 @@ func (ps *peerSet) broadcastTx(tx *types.Tx) error {
        return nil
 }
 
-func (ps *peerSet) errorHandler(peerID string, err error) {
-       if errors.Root(err) == errPeerMisbehave {
-               ps.addBanScore(peerID, 20, 0, err.Error())
+func (ps *PeerSet) ErrorHandler(peerID string, err error) {
+       if errors.Root(err) == ErrPeerMisbehave {
+               ps.AddBanScore(peerID, 20, 0, err.Error())
        } else {
-               ps.removePeer(peerID)
+               ps.RemovePeer(peerID)
        }
 }
 
 // Peer retrieves the registered peer with the given id.
-func (ps *peerSet) getPeer(id string) *peer {
+func (ps *PeerSet) GetPeer(id string) *Peer {
        ps.mtx.RLock()
        defer ps.mtx.RUnlock()
        return ps.peers[id]
 }
 
-func (ps *peerSet) getPeerInfos() []*PeerInfo {
+func (ps *PeerSet) GetPeerInfos() []*PeerInfo {
        ps.mtx.RLock()
        defer ps.mtx.RUnlock()
 
        result := []*PeerInfo{}
        for _, peer := range ps.peers {
-               result = append(result, peer.getPeerInfo())
+               result = append(result, peer.GetPeerInfo())
        }
        return result
 }
 
-func (ps *peerSet) markTx(peerID string, txHash bc.Hash) {
+func (ps *PeerSet) MarkBlock(peerID string, hash *bc.Hash) {
+       peer := ps.GetPeer(peerID)
+       if peer == nil {
+               return
+       }
+       peer.MarkBlock(hash)
+}
+
+func (ps *PeerSet) MarkBlockSignature(peerID string, signature []byte) {
+       peer := ps.GetPeer(peerID)
+       if peer == nil {
+               return
+       }
+       peer.markSign(signature)
+}
+
+func (ps *PeerSet) MarkStatus(peerID string, height uint64) {
+       peer := ps.GetPeer(peerID)
+       if peer == nil {
+               return
+       }
+       peer.markNewStatus(height)
+}
+
+func (ps *PeerSet) MarkTx(peerID string, txHash bc.Hash) {
        ps.mtx.Lock()
        peer := ps.peers[peerID]
        ps.mtx.Unlock()
@@ -502,11 +597,11 @@ func (ps *peerSet) markTx(peerID string, txHash bc.Hash) {
        peer.markTransaction(&txHash)
 }
 
-func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
+func (ps *PeerSet) peersWithoutBlock(hash *bc.Hash) []*Peer {
        ps.mtx.RLock()
        defer ps.mtx.RUnlock()
 
-       peers := []*peer{}
+       peers := []*Peer{}
        for _, peer := range ps.peers {
                if !peer.knownBlocks.Has(hash.String()) {
                        peers = append(peers, peer)
@@ -515,11 +610,11 @@ func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
        return peers
 }
 
-func (ps *peerSet) peersWithoutNewStatus(height uint64) []*peer {
+func (ps *PeerSet) peersWithoutNewStatus(height uint64) []*Peer {
        ps.mtx.RLock()
        defer ps.mtx.RUnlock()
 
-       var peers []*peer
+       var peers []*Peer
        for _, peer := range ps.peers {
                if peer.knownStatus < height {
                        peers = append(peers, peer)
@@ -528,11 +623,11 @@ func (ps *peerSet) peersWithoutNewStatus(height uint64) []*peer {
        return peers
 }
 
-func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
+func (ps *PeerSet) peersWithoutTx(hash *bc.Hash) []*Peer {
        ps.mtx.RLock()
        defer ps.mtx.RUnlock()
 
-       peers := []*peer{}
+       peers := []*Peer{}
        for _, peer := range ps.peers {
                if !peer.knownTxs.Has(hash.String()) {
                        peers = append(peers, peer)
@@ -541,9 +636,18 @@ func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
        return peers
 }
 
-func (ps *peerSet) removePeer(peerID string) {
+func (ps *PeerSet) RemovePeer(peerID string) {
        ps.mtx.Lock()
        delete(ps.peers, peerID)
        ps.mtx.Unlock()
        ps.StopPeerGracefully(peerID)
 }
+
+func (ps *PeerSet) SetStatus(peerID string, height uint64, hash *bc.Hash) {
+       peer := ps.GetPeer(peerID)
+       if peer == nil {
+               return
+       }
+
+       peer.SetStatus(height, hash)
+}