OSDN Git Service

Add consensus message transfer bbft_msg
authorYahtoo Ma <yahtoo.ma@gmail.com>
Wed, 22 May 2019 11:15:21 +0000 (19:15 +0800)
committerYahtoo Ma <yahtoo.ma@gmail.com>
Wed, 22 May 2019 11:15:21 +0000 (19:15 +0800)
19 files changed:
api/api.go
api/nodeinfo.go
netsync/bbft/handle.go [new file with mode: 0644]
netsync/bbft/message.go [new file with mode: 0644]
netsync/bbft/reactor.go [new file with mode: 0644]
netsync/block_fetcher.go
netsync/block_keeper.go
netsync/handle.go
netsync/message.go
netsync/message_broadcast.go [new file with mode: 0644]
netsync/message_test.go
netsync/peer.go [deleted file]
netsync/peers/peer.go [new file with mode: 0644]
netsync/peers/peer_set.go [new file with mode: 0644]
netsync/peers/transfer.go [new file with mode: 0644]
netsync/protocol_reactor.go
netsync/tool_test.go
netsync/tx_keeper.go
test.go [new file with mode: 0644]

index 2396703..1574151 100644 (file)
@@ -24,7 +24,7 @@ import (
        "github.com/vapor/net/http/httpjson"
        "github.com/vapor/net/http/static"
        "github.com/vapor/net/websocket"
-       "github.com/vapor/netsync"
+       "github.com/vapor/netsync/peers"
        "github.com/vapor/p2p"
        "github.com/vapor/protocol"
        "github.com/vapor/wallet"
@@ -173,9 +173,9 @@ type NetSync interface {
        IsCaughtUp() bool
        PeerCount() int
        GetNetwork() string
-       BestPeer() *netsync.PeerInfo
+       BestPeer() *peers.PeerInfo
        DialPeerWithAddress(addr *p2p.NetAddress) error
-       GetPeerInfos() []*netsync.PeerInfo
+       GetPeerInfos() []*peers.PeerInfo
        StopPeer(peerID string) error
 }
 
index 6558aaa..7ddde83 100644 (file)
@@ -5,7 +5,7 @@ import (
        "net"
 
        "github.com/vapor/errors"
-       "github.com/vapor/netsync"
+       "github.com/vapor/netsync/peers"
        "github.com/vapor/p2p"
        "github.com/vapor/version"
 )
@@ -53,7 +53,7 @@ func (a *API) GetNodeInfo() *NetInfo {
 }
 
 // return the currently connected peers with net address
-func (a *API) getPeerInfoByAddr(addr string) *netsync.PeerInfo {
+func (a *API) getPeerInfoByAddr(addr string) *peers.PeerInfo {
        peerInfos := a.sync.GetPeerInfos()
        for _, peerInfo := range peerInfos {
                if peerInfo.RemoteAddr == addr {
@@ -69,7 +69,7 @@ func (a *API) disconnectPeerById(peerID string) error {
 }
 
 // connect peer b y net address
-func (a *API) connectPeerByIpAndPort(ip string, port uint16) (*netsync.PeerInfo, error) {
+func (a *API) connectPeerByIpAndPort(ip string, port uint16) (*peers.PeerInfo, error) {
        netIp := net.ParseIP(ip)
        if netIp == nil {
                return nil, errors.New("invalid ip address")
diff --git a/netsync/bbft/handle.go b/netsync/bbft/handle.go
new file mode 100644 (file)
index 0000000..9495bb7
--- /dev/null
@@ -0,0 +1,57 @@
+package bbft
+
+import (
+       "net"
+
+       log "github.com/sirupsen/logrus"
+
+       "github.com/vapor/consensus"
+)
+
+type consensusManger struct {
+       peers *peerSet
+}
+
+//BasePeer is the interface for connection level peer
+type BasePeer interface {
+       Addr() net.Addr
+       ID() string
+       ServiceFlag() consensus.ServiceFlag
+       TrySend(byte, interface{}) bool
+       IsLAN() bool
+}
+
+func (cm *consensusManger) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
+       peer := cm.peers.getPeer(basePeer.ID())
+       if peer == nil {
+               return
+       }
+
+       log.WithFields(log.Fields{
+               "module":  logModule,
+               "peer":    basePeer.Addr(),
+               "type":    reflect.TypeOf(msg),
+               "message": msg.String(),
+       }).Info("receive message from peer")
+
+       switch msg := msg.(type) {
+       case *BlockProposeMessage:
+               cm.handleBlockProposeMsg(peer, msg)
+
+       case *BlockSigMessage:
+               cm.handleBlockSigMsg(peer, msg)
+
+       default:
+               log.WithFields(log.Fields{
+                       "module":       logModule,
+                       "peer":         basePeer.Addr(),
+                       "message_type": reflect.TypeOf(msg),
+               }).Error("unhandled message type")
+       }
+}
+
+func (cm *consensusManger) handleBlockProposeMsg(peer *peer, msg *BlockProposeMessage) {
+}
+
+func (cm *consensusManger) handleBlockSigMsg(peer *peer, msg *BlockSigMessage) {
+}
diff --git a/netsync/bbft/message.go b/netsync/bbft/message.go
new file mode 100644 (file)
index 0000000..f6144c0
--- /dev/null
@@ -0,0 +1,55 @@
+package bbft
+
+import (
+       "bytes"
+       "errors"
+
+       "github.com/tendermint/go-wire"
+)
+
+//Consensus msg byte
+const (
+       ConsensusChannel = byte(0x50)
+
+       BlockSigByte     = byte(0x10)
+       BlockProposeByte = byte(0x11)
+
+       maxBlockchainResponseSize = 22020096 + 2
+)
+
+//BlockchainMessage is a generic message for this reactor.
+type ConsensusMessage interface {
+       String() string
+}
+
+var _ = wire.RegisterInterface(
+       struct{ ConsensusMessage }{},
+       wire.ConcreteType{&BlockSigMessage{}, BlockSigByte},
+       wire.ConcreteType{&BlockProposeMessage{}, BlockProposeByte},
+)
+
+//DecodeMessage decode msg
+func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
+       msgType = bz[0]
+       n := int(0)
+       r := bytes.NewReader(bz)
+       msg = wire.ReadBinary(struct{ ConsensusMessage }{}, r, maxBlockchainResponseSize, &n, &err).(struct{ ConsensusMessage }).ConsensusMessage
+       if err != nil && n != len(bz) {
+               err = errors.New("DecodeMessage() had bytes left over")
+       }
+       return
+}
+
+type BlockSigMessage struct {
+}
+
+func (bs *BlockSigMessage) String() string {
+       return ""
+}
+
+type BlockProposeMessage struct {
+}
+
+func (bp *BlockProposeMessage) String() string {
+       return ""
+}
diff --git a/netsync/bbft/reactor.go b/netsync/bbft/reactor.go
new file mode 100644 (file)
index 0000000..0e630f6
--- /dev/null
@@ -0,0 +1,66 @@
+package bbft
+
+import (
+       log "github.com/sirupsen/logrus"
+
+       "github.com/vapor/p2p"
+       "github.com/vapor/p2p/connection"
+)
+
+const logModule = "bbft"
+
+type ConsensusReactor struct {
+       p2p.BaseReactor
+       c     *consensus
+       peers *peerSet
+}
+
+func NewConsensusReactor(peers *peerSet) *ConsensusReactor {
+       cr := &ConsensusReactor{
+               peers: peers,
+       }
+       cr.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", cr)
+       return cr
+}
+
+// GetChannels implements Reactor
+func (cr *ConsensusReactor) GetChannels() []*connection.ChannelDescriptor {
+       return []*connection.ChannelDescriptor{
+               {
+                       ID:                ConsensusChannel,
+                       Priority:          10,
+                       SendQueueCapacity: 100,
+               },
+       }
+}
+
+// OnStart implements BaseService
+func (cr *ConsensusReactor) OnStart() error {
+       cr.BaseReactor.OnStart()
+       return nil
+}
+
+// OnStop implements BaseService
+func (cr *ConsensusReactor) OnStop() {
+       cr.BaseReactor.OnStop()
+}
+
+// AddPeer implements Reactor by sending our state to peer.
+func (cr *ConsensusReactor) AddPeer(peer *p2p.Peer) error {
+       return nil
+}
+
+// RemovePeer implements Reactor by removing peer from the pool.
+func (cr *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
+}
+
+// Receive implements Reactor by handling 4 types of messages (look below).
+func (cr *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
+       msgType, msg, err := DecodeMessage(msgBytes)
+       if err != nil {
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on reactor decoding message")
+               return
+       }
+
+       cr.c.processMsg(src, msgType, msg)
+}
index c48cd0b..c69c03e 100644 (file)
@@ -4,6 +4,7 @@ import (
        log "github.com/sirupsen/logrus"
        "gopkg.in/karalabe/cookiejar.v2/collections/prque"
 
+       "github.com/vapor/netsync/peers"
        "github.com/vapor/protocol/bc"
 )
 
@@ -17,7 +18,7 @@ const (
 // and scheduling them for retrieval.
 type blockFetcher struct {
        chain Chain
-       peers *peerSet
+       peers *peers.PeerSet
 
        newBlockCh chan *blockMsg
        queue      *prque.Prque
@@ -25,7 +26,7 @@ type blockFetcher struct {
 }
 
 //NewBlockFetcher creates a block fetcher to retrieve blocks of the new mined.
-func newBlockFetcher(chain Chain, peers *peerSet) *blockFetcher {
+func newBlockFetcher(chain Chain, peers *peers.PeerSet) *blockFetcher {
        f := &blockFetcher{
                chain:      chain,
                peers:      peers,
@@ -75,12 +76,7 @@ func (f *blockFetcher) add(msg *blockMsg) {
 func (f *blockFetcher) insert(msg *blockMsg) {
        isOrphan, err := f.chain.ProcessBlock(msg.block)
        if err != nil {
-               peer := f.peers.getPeer(msg.peerID)
-               if peer == nil {
-                       return
-               }
-
-               f.peers.addBanScore(msg.peerID, 20, 0, err.Error())
+               f.peers.AddBanScore(msg.peerID, 20, 0, err.Error())
                return
        }
 
@@ -88,12 +84,14 @@ func (f *blockFetcher) insert(msg *blockMsg) {
                return
        }
 
-       if err := f.peers.broadcastMinedBlock(msg.block); err != nil {
+       minedMsg, _ := newMinedBlockBroadcastMsg(msg.block, BlockchainChannel)
+       if err := f.peers.BroadcastMsg(minedMsg); err != nil {
+               //if err := f.peers.BroadcastMinedBlock(msg.block); err != nil {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockFetcher fail on broadcast new block")
                return
        }
-
-       if err := f.peers.broadcastNewStatus(msg.block); err != nil {
+       headMsg,_:=newStatusBroadcastMsg(&msg.block.BlockHeader,BlockchainChannel)
+       if err := f.peers.BroadcastMsg(headMsg); err != nil {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockFetcher fail on broadcast new status")
                return
        }
index 9b61081..d772258 100644 (file)
@@ -8,6 +8,7 @@ import (
 
        "github.com/vapor/consensus"
        "github.com/vapor/errors"
+       "github.com/vapor/netsync/peers"
        "github.com/vapor/protocol/bc"
        "github.com/vapor/protocol/bc/types"
 )
@@ -27,7 +28,6 @@ var (
        errAppendHeaders  = errors.New("fail to append list due to order dismatch")
        errRequestTimeout = errors.New("request timeout")
        errPeerDropped    = errors.New("Peer dropped")
-       errPeerMisbehave  = errors.New("peer is misbehave")
 )
 
 type blockMsg struct {
@@ -47,9 +47,9 @@ type headersMsg struct {
 
 type blockKeeper struct {
        chain Chain
-       peers *peerSet
+       peers *peers.PeerSet
 
-       syncPeer         *peer
+       syncPeerID       string
        blockProcessCh   chan *blockMsg
        blocksProcessCh  chan *blocksMsg
        headersProcessCh chan *headersMsg
@@ -57,7 +57,7 @@ type blockKeeper struct {
        headerList *list.List
 }
 
-func newBlockKeeper(chain Chain, peers *peerSet) *blockKeeper {
+func newBlockKeeper(chain Chain, peers *peers.PeerSet) *blockKeeper {
        bk := &blockKeeper{
                chain:            chain,
                peers:            peers,
@@ -117,7 +117,7 @@ func (bk *blockKeeper) fastBlockSync(checkPoint *consensus.Checkpoint) error {
        lastHeader := bk.headerList.Back().Value.(*types.BlockHeader)
        for ; lastHeader.Hash() != checkPoint.Hash; lastHeader = bk.headerList.Back().Value.(*types.BlockHeader) {
                if lastHeader.Height >= checkPoint.Height {
-                       return errors.Wrap(errPeerMisbehave, "peer is not in the checkpoint branch")
+                       return errors.Wrap(peers.ErrPeerMisbehave, "peer is not in the checkpoint branch")
                }
 
                lastHash := lastHeader.Hash()
@@ -127,7 +127,7 @@ func (bk *blockKeeper) fastBlockSync(checkPoint *consensus.Checkpoint) error {
                }
 
                if len(headers) == 0 {
-                       return errors.Wrap(errPeerMisbehave, "requireHeaders return empty list")
+                       return errors.Wrap(peers.ErrPeerMisbehave, "requireHeaders return empty list")
                }
 
                if err := bk.appendHeaderList(headers); err != nil {
@@ -144,7 +144,7 @@ func (bk *blockKeeper) fastBlockSync(checkPoint *consensus.Checkpoint) error {
                }
 
                if len(blocks) == 0 {
-                       return errors.Wrap(errPeerMisbehave, "requireBlocks return empty list")
+                       return errors.Wrap(peers.ErrPeerMisbehave, "requireBlocks return empty list")
                }
 
                for _, block := range blocks {
@@ -271,7 +271,9 @@ func (bk *blockKeeper) regularBlockSync(wantHeight uint64) error {
 }
 
 func (bk *blockKeeper) requireBlock(height uint64) (*types.Block, error) {
-       if ok := bk.syncPeer.getBlockByHeight(height); !ok {
+       //send get block msg
+       msg := struct{ BlockchainMessage }{NewGetBlockMessage(height, [32]byte{})}
+       if ok := bk.peers.SendMsg(bk.syncPeerID, BlockchainChannel, msg); !ok {
                return nil, errPeerDropped
        }
 
@@ -281,7 +283,7 @@ func (bk *blockKeeper) requireBlock(height uint64) (*types.Block, error) {
        for {
                select {
                case msg := <-bk.blockProcessCh:
-                       if msg.peerID != bk.syncPeer.ID() {
+                       if msg.peerID != bk.syncPeerID {
                                continue
                        }
                        if msg.block.Height != height {
@@ -295,7 +297,9 @@ func (bk *blockKeeper) requireBlock(height uint64) (*types.Block, error) {
 }
 
 func (bk *blockKeeper) requireBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
-       if ok := bk.syncPeer.getBlocks(locator, stopHash); !ok {
+       //send get blocks msg
+       msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
+       if ok := bk.peers.SendMsg(bk.syncPeerID, BlockchainChannel, msg); !ok {
                return nil, errPeerDropped
        }
 
@@ -305,7 +309,7 @@ func (bk *blockKeeper) requireBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*
        for {
                select {
                case msg := <-bk.blocksProcessCh:
-                       if msg.peerID != bk.syncPeer.ID() {
+                       if msg.peerID != bk.syncPeerID {
                                continue
                        }
                        return msg.blocks, nil
@@ -316,7 +320,9 @@ func (bk *blockKeeper) requireBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*
 }
 
 func (bk *blockKeeper) requireHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
-       if ok := bk.syncPeer.getHeaders(locator, stopHash); !ok {
+       //send get headers msg
+       msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
+       if ok := bk.peers.SendMsg(bk.syncPeerID, BlockchainChannel, msg); !ok {
                return nil, errPeerDropped
        }
 
@@ -326,7 +332,7 @@ func (bk *blockKeeper) requireHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]
        for {
                select {
                case msg := <-bk.headersProcessCh:
-                       if msg.peerID != bk.syncPeer.ID() {
+                       if msg.peerID != bk.syncPeerID {
                                continue
                        }
                        return msg.headers, nil
@@ -348,29 +354,29 @@ func (bk *blockKeeper) resetHeaderState() {
 
 func (bk *blockKeeper) startSync() bool {
        checkPoint := bk.nextCheckpoint()
-       peer := bk.peers.bestPeer(consensus.SFFastSync | consensus.SFFullNode)
-       if peer != nil && checkPoint != nil && peer.Height() >= checkPoint.Height {
-               bk.syncPeer = peer
+       bestPeerID, bestPeerHeight := bk.peers.BestPeerInfo(consensus.SFFastSync | consensus.SFFullNode)
+       if bestPeerID != "" && checkPoint != nil && bestPeerHeight >= checkPoint.Height {
+               bk.syncPeerID = bestPeerID
                if err := bk.fastBlockSync(checkPoint); err != nil {
                        log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on fastBlockSync")
-                       bk.peers.errorHandler(peer.ID(), err)
+                       bk.peers.ErrorHandler(bestPeerID, err)
                        return false
                }
                return true
        }
 
        blockHeight := bk.chain.BestBlockHeight()
-       peer = bk.peers.bestPeer(consensus.SFFullNode)
-       if peer != nil && peer.Height() > blockHeight {
-               bk.syncPeer = peer
+       bestPeerID, bestPeerHeight = bk.peers.BestPeerInfo(consensus.SFFullNode)
+       if bestPeerID != "" && bestPeerHeight > blockHeight {
+               bk.syncPeerID = bestPeerID
                targetHeight := blockHeight + maxBlockPerMsg
-               if targetHeight > peer.Height() {
-                       targetHeight = peer.Height()
+               if targetHeight > bestPeerHeight {
+                       targetHeight = bestPeerHeight
                }
 
                if err := bk.regularBlockSync(targetHeight); err != nil {
                        log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on regularBlockSync")
-                       bk.peers.errorHandler(peer.ID(), err)
+                       bk.peers.ErrorHandler(bestPeerID, err)
                        return false
                }
                return true
@@ -392,8 +398,9 @@ func (bk *blockKeeper) syncWorker() {
                if err != nil {
                        log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker get best block")
                }
+               headMsg, _ := newStatusBroadcastMsg(&block.BlockHeader, BlockchainChannel)
 
-               if err = bk.peers.broadcastNewStatus(block); err != nil {
+               if err = bk.peers.BroadcastMsg(headMsg); err != nil {
                        log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new status")
                }
        }
index 7dc35b9..9fe7592 100644 (file)
@@ -9,6 +9,7 @@ import (
        cfg "github.com/vapor/config"
        "github.com/vapor/consensus"
        "github.com/vapor/event"
+       "github.com/vapor/netsync/peers"
        "github.com/vapor/p2p"
        core "github.com/vapor/protocol"
        "github.com/vapor/protocol/bc"
@@ -16,10 +17,8 @@ import (
 )
 
 const (
-       logModule             = "netsync"
-       maxTxChanSize         = 10000
-       maxFilterAddressSize  = 50
-       maxFilterAddressCount = 1000
+       logModule      = "netsync"
+       txsMsgMaxTxNum = 1024
 )
 
 var (
@@ -59,7 +58,7 @@ type SyncManager struct {
        txPool       *core.TxPool
        blockFetcher *blockFetcher
        blockKeeper  *blockKeeper
-       peers        *peerSet
+       peers        *peers.PeerSet
 
        txSyncCh chan *txSyncMsg
        quitSync chan struct{}
@@ -70,7 +69,7 @@ type SyncManager struct {
        txMsgSub        *event.Subscription
 }
 
-// CreateSyncManager create sync manager and set switch.
+// NewSyncManager create sync manager and set switch.
 func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
        sw, err := p2p.NewSwitch(config)
        if err != nil {
@@ -80,13 +79,13 @@ func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, dispat
        return newSyncManager(config, sw, chain, txPool, dispatcher)
 }
 
-//NewSyncManager create a sync manager
+//newSyncManager create a sync manager
 func newSyncManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
        genesisHeader, err := chain.GetHeaderByHeight(0)
        if err != nil {
                return nil, err
        }
-       peers := newPeerSet(sw)
+       peers := peers.NewPeerSet(sw)
        manager := &SyncManager{
                sw:              sw,
                genesisHash:     genesisHeader.Hash(),
@@ -102,25 +101,28 @@ func newSyncManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxP
        }
 
        if !config.VaultMode {
-               protocolReactor := NewProtocolReactor(manager, peers)
+               protocolReactor := NewProtocolReactor(manager)
                manager.sw.AddReactor("PROTOCOL", protocolReactor)
        }
        return manager, nil
 }
 
-func (sm *SyncManager) AddPeer(peer BasePeer) {
-       sm.peers.addPeer(peer)
+//AddPeer add peer to SyncManager PeerSet
+func (sm *SyncManager) AddPeer(peer peers.BasePeer) {
+       sm.peers.AddPeer(peer)
 }
 
 //BestPeer return the highest p2p peerInfo
-func (sm *SyncManager) BestPeer() *PeerInfo {
-       bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
-       if bestPeer != nil {
-               return bestPeer.getPeerInfo()
+func (sm *SyncManager) BestPeer() *peers.PeerInfo {
+       peerID, bestHeight := sm.peers.BestPeerInfo(consensus.SFFullNode)
+       if peerID == "" || bestHeight == 0 {
+               return nil
        }
-       return nil
+
+       return sm.peers.GetPeerInfo(peerID)
 }
 
+//DialPeerWithAddress
 func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error {
        if sm.config.VaultMode {
                return errVaultModeDialPeer
@@ -129,61 +131,67 @@ func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error {
        return sm.sw.DialPeerWithAddress(addr)
 }
 
+//GetNetwork return chain id
 func (sm *SyncManager) GetNetwork() string {
        return sm.config.ChainID
 }
 
 //GetPeerInfos return peer info of all peers
-func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
-       return sm.peers.getPeerInfos()
+func (sm *SyncManager) GetPeerInfos() []*peers.PeerInfo {
+       return sm.peers.GetPeerInfos()
 }
 
 //IsCaughtUp check wheather the peer finish the sync
 func (sm *SyncManager) IsCaughtUp() bool {
-       peer := sm.peers.bestPeer(consensus.SFFullNode)
-       return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
+       bestPeerID, bestPeerHeight := sm.peers.BestPeerInfo(consensus.SFFullNode)
+       return bestPeerID == "" || bestPeerHeight <= sm.chain.BestBlockHeight()
+}
+
+//RemovePeer del peer from SyncManager PeerSet then disconnect with peer
+func (sm *SyncManager) RemovePeer(peer peers.BasePeer) {
+       sm.peers.RemovePeer(peer.ID())
 }
 
 //StopPeer try to stop peer by given ID
 func (sm *SyncManager) StopPeer(peerID string) error {
-       if peer := sm.peers.getPeer(peerID); peer == nil {
-               return errors.New("peerId not exist")
-       }
-       sm.peers.removePeer(peerID)
+       sm.peers.RemovePeer(peerID)
        return nil
 }
 
-func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
+func (sm *SyncManager) handleBlockMsg(peerID string, msg *BlockMessage) {
        block, err := msg.GetBlock()
        if err != nil {
                return
        }
-       sm.blockKeeper.processBlock(peer.ID(), block)
+       sm.blockKeeper.processBlock(peerID, block)
 }
 
-func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
+func (sm *SyncManager) handleBlocksMsg(peerID string, msg *BlocksMessage) {
        blocks, err := msg.GetBlocks()
        if err != nil {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
                return
        }
 
-       sm.blockKeeper.processBlocks(peer.ID(), blocks)
+       sm.blockKeeper.processBlocks(peerID, blocks)
 }
 
-func (sm *SyncManager) handleFilterAddMsg(peer *peer, msg *FilterAddMessage) {
-       peer.addFilterAddress(msg.Address)
+func (sm *SyncManager) handleFilterAddMsg(peerID string, msg *FilterAddMessage) {
+       var addresses [][]byte
+       addresses = append(addresses, msg.Address)
+       sm.peers.AddFilterAddresses(peerID, addresses)
+
 }
 
-func (sm *SyncManager) handleFilterClearMsg(peer *peer) {
-       peer.filterAdds.Clear()
+func (sm *SyncManager) handleFilterClearMsg(peerID string) {
+       sm.peers.ClearFilterAdds(peerID)
 }
 
-func (sm *SyncManager) handleFilterLoadMsg(peer *peer, msg *FilterLoadMessage) {
-       peer.addFilterAddresses(msg.Addresses)
+func (sm *SyncManager) handleFilterLoadMsg(peerID string, msg *FilterLoadMessage) {
+       sm.peers.AddFilterAddresses(peerID, msg.Addresses)
 }
 
-func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
+func (sm *SyncManager) handleGetBlockMsg(peerID string, msg *GetBlockMessage) {
        var block *types.Block
        var err error
        if msg.Height != 0 {
@@ -195,17 +203,19 @@ func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
                return
        }
-
-       ok, err := peer.sendBlock(block)
-       if !ok {
-               sm.peers.removePeer(peer.ID())
-       }
+       blockMsg, err := NewBlockMessage(block)
        if err != nil {
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on create block message")
+               return
+       }
+       ok := sm.peers.SendMsg(peerID, BlockchainChannel, blockMsg)
+       if !ok {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
        }
+       //todo:mark block
 }
 
-func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
+func (sm *SyncManager) handleGetBlocksMsg(peerID string, msg *GetBlocksMessage) {
        blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
        if err != nil || len(blocks) == 0 {
                return
@@ -216,43 +226,49 @@ func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
        for _, block := range blocks {
                rawData, err := block.MarshalText()
                if err != nil {
-                       log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
+                       log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on handleGetBlocksMsg marshal block")
                        continue
                }
 
-               if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
+               if totalSize+len(rawData) > MaxBlockchainResponseSize/2 {
                        break
                }
                totalSize += len(rawData)
                sendBlocks = append(sendBlocks, block)
        }
-
-       ok, err := peer.sendBlocks(sendBlocks)
-       if !ok {
-               sm.peers.removePeer(peer.ID())
-       }
+       blocksMsg, err := NewBlocksMessage(blocks)
        if err != nil {
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Warn("failed on create blocks msg")
+               return
+       }
+       ok := sm.peers.SendMsg(peerID, BlockchainChannel, blocksMsg)
+       if !ok {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
        }
+       //todo: mark block
 }
 
-func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
+func (sm *SyncManager) handleGetHeadersMsg(peerID string, msg *GetHeadersMessage) {
        headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
        if err != nil || len(headers) == 0 {
-               log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("failed on handleGetHeadersMsg locateHeaders")
                return
        }
 
-       ok, err := peer.sendHeaders(headers)
-       if !ok {
-               sm.peers.removePeer(peer.ID())
-       }
+       headersMsg, err := NewHeadersMessage(headers)
        if err != nil {
-               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Warn("fail on create headers msg")
+               return
+       }
+
+       ok := sm.peers.SendMsg(peerID, BlockchainChannel, headersMsg)
+
+       if !ok {
+               log.WithFields(log.Fields{"module": logModule}).Error("fail on handleGetHeadersMsg sentBlock")
        }
 }
 
-func (sm *SyncManager) handleGetMerkleBlockMsg(peer *peer, msg *GetMerkleBlockMessage) {
+func (sm *SyncManager) handleGetMerkleBlockMsg(peerID string, msg *GetMerkleBlockMessage) {
        var err error
        var block *types.Block
        if msg.Height != 0 {
@@ -272,28 +288,42 @@ func (sm *SyncManager) handleGetMerkleBlockMsg(peer *peer, msg *GetMerkleBlockMe
                return
        }
 
-       ok, err := peer.sendMerkleBlock(block, txStatus)
-       if err != nil {
-               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
+       merkleBlockMsg := NewMerkleBlockMessage()
+       if err := merkleBlockMsg.SetRawBlockHeader(block.BlockHeader); err != nil {
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg set block header")
                return
        }
 
+       relatedTxs, relatedStatuses := sm.peers.GetRelatedTxAndStatus(peerID, block.Transactions, txStatus)
+
+       txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
+       if err := merkleBlockMsg.SetTxInfo(txHashes, txFlags, relatedTxs); err != nil {
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg set tx info")
+               return
+       }
+
+       statusHashes := types.GetStatusMerkleTreeProof(txStatus.VerifyStatus, txFlags)
+       if err := merkleBlockMsg.SetStatusInfo(statusHashes, relatedStatuses); err != nil {
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg set status info")
+               return
+       }
+       ok := sm.peers.SendMsg(peerID, BlockchainChannel, merkleBlockMsg)
        if !ok {
-               sm.peers.removePeer(peer.ID())
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
        }
 }
 
-func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
+func (sm *SyncManager) handleHeadersMsg(peerID string, msg *HeadersMessage) {
        headers, err := msg.GetHeaders()
        if err != nil {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
                return
        }
 
-       sm.blockKeeper.processHeaders(peer.ID(), headers)
+       sm.blockKeeper.processHeaders(peerID, headers)
 }
 
-func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
+func (sm *SyncManager) handleMineBlockMsg(peerID string, msg *MineBlockMessage) {
        block, err := msg.GetMineBlock()
        if err != nil {
                log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleMineBlockMsg GetMineBlock")
@@ -301,52 +331,49 @@ func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
        }
 
        hash := block.Hash()
-       peer.markBlock(&hash)
-       sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
-       peer.setStatus(block.Height, &hash)
+       sm.peers.MarkBlock(peerID, &hash)
+       sm.blockFetcher.processNewBlock(&blockMsg{peerID: peerID, block: block})
+       sm.peers.SetStatus(peerID, block.Height, &hash)
 }
 
-func (sm *SyncManager) handleStatusMsg(basePeer BasePeer, msg *StatusMessage) {
-       if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
-               peer.setStatus(msg.Height, msg.GetHash())
-               return
-       }
+func (sm *SyncManager) handleStatusMsg(peerID string, msg *StatusMessage) {
+       sm.peers.SetStatus(peerID, msg.Height, msg.GetHash())
 }
 
-func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
+func (sm *SyncManager) handleTransactionMsg(peerID string, msg *TransactionMessage) {
        tx, err := msg.GetTransaction()
        if err != nil {
-               sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
+               sm.peers.AddBanScore(peerID, 0, 10, "fail on get tx from message")
                return
        }
 
        if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
-               sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
+               sm.peers.AddBanScore(peerID, 10, 0, "fail on validate tx transaction")
        }
-       sm.peers.markTx(peer.ID(), tx.ID)
 }
 
-func (sm *SyncManager) handleTransactionsMsg(peer *peer, msg *TransactionsMessage) {
+func (sm *SyncManager) handleTransactionsMsg(peerID string, msg *TransactionsMessage) {
        txs, err := msg.GetTransactions()
        if err != nil {
-               sm.peers.addBanScore(peer.ID(), 0, 20, "fail on get txs from message")
+               sm.peers.AddBanScore(peerID, 0, 20, "fail on get txs from message")
                return
        }
 
        if len(txs) > txsMsgMaxTxNum {
-               sm.peers.addBanScore(peer.ID(), 20, 0, "exceeded the maximum tx number limit")
+               sm.peers.AddBanScore(peerID, 20, 0, "exceeded the maximum tx number limit")
                return
        }
 
        for _, tx := range txs {
                if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && !isOrphan {
-                       sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
+                       sm.peers.AddBanScore(peerID, 10, 0, "fail on validate tx transaction")
                        return
                }
-               sm.peers.markTx(peer.ID(), tx.ID)
+               sm.peers.MarkTx(peerID, &tx.ID)
        }
 }
 
+//IsListening
 func (sm *SyncManager) IsListening() bool {
        if sm.config.VaultMode {
                return false
@@ -361,82 +388,78 @@ func (sm *SyncManager) PeerCount() int {
        return len(sm.sw.Peers().List())
 }
 
-func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
-       peer := sm.peers.getPeer(basePeer.ID())
-       if peer == nil {
+func (sm *SyncManager) processMsg(peerID string, msgType byte, msg BlockchainMessage) {
+       if sm.peers.GetPeer(peerID) == nil {
                return
        }
 
        log.WithFields(log.Fields{
                "module":  logModule,
-               "peer":    basePeer.Addr(),
+               "peer":    peerID,
                "type":    reflect.TypeOf(msg),
                "message": msg.String(),
        }).Info("receive message from peer")
 
        switch msg := msg.(type) {
        case *GetBlockMessage:
-               sm.handleGetBlockMsg(peer, msg)
+               sm.handleGetBlockMsg(peerID, msg)
 
        case *BlockMessage:
-               sm.handleBlockMsg(peer, msg)
+               sm.handleBlockMsg(peerID, msg)
 
        case *StatusMessage:
-               sm.handleStatusMsg(basePeer, msg)
+               sm.handleStatusMsg(peerID, msg)
 
        case *TransactionMessage:
-               sm.handleTransactionMsg(peer, msg)
+               sm.handleTransactionMsg(peerID, msg)
 
        case *TransactionsMessage:
-               sm.handleTransactionsMsg(peer, msg)
+               sm.handleTransactionsMsg(peerID, msg)
 
        case *MineBlockMessage:
-               sm.handleMineBlockMsg(peer, msg)
+               sm.handleMineBlockMsg(peerID, msg)
 
        case *GetHeadersMessage:
-               sm.handleGetHeadersMsg(peer, msg)
+               sm.handleGetHeadersMsg(peerID, msg)
 
        case *HeadersMessage:
-               sm.handleHeadersMsg(peer, msg)
+               sm.handleHeadersMsg(peerID, msg)
 
        case *GetBlocksMessage:
-               sm.handleGetBlocksMsg(peer, msg)
+               sm.handleGetBlocksMsg(peerID, msg)
 
        case *BlocksMessage:
-               sm.handleBlocksMsg(peer, msg)
+               sm.handleBlocksMsg(peerID, msg)
 
        case *FilterLoadMessage:
-               sm.handleFilterLoadMsg(peer, msg)
+               sm.handleFilterLoadMsg(peerID, msg)
 
        case *FilterAddMessage:
-               sm.handleFilterAddMsg(peer, msg)
+               sm.handleFilterAddMsg(peerID, msg)
 
        case *FilterClearMessage:
-               sm.handleFilterClearMsg(peer)
+               sm.handleFilterClearMsg(peerID)
 
        case *GetMerkleBlockMessage:
-               sm.handleGetMerkleBlockMsg(peer, msg)
+               sm.handleGetMerkleBlockMsg(peerID, msg)
 
        default:
                log.WithFields(log.Fields{
                        "module":       logModule,
-                       "peer":         basePeer.Addr(),
+                       "peerID":       peerID,
                        "message_type": reflect.TypeOf(msg),
                }).Error("unhandled message type")
        }
 }
 
-func (sm *SyncManager) SendStatus(peer BasePeer) error {
-       p := sm.peers.getPeer(peer.ID())
-       if p == nil {
-               return errors.New("invalid peer")
-       }
-
-       if err := p.sendStatus(sm.chain.BestBlockHeader()); err != nil {
-               sm.peers.removePeer(p.ID())
-               return err
+func (sm *SyncManager) SendStatus(peerID string) bool {
+       msg := NewStatusMessage(sm.chain.BestBlockHeader())
+       ok := sm.peers.SendMsg(peerID, BlockchainChannel, msg)
+       if !ok {
+               log.WithFields(log.Fields{"module": logModule}).Error("fail on send status msg")
        }
-       return nil
+       //todo: mark status
+       return ok
 }
 
 func (sm *SyncManager) Start() error {
@@ -487,8 +510,9 @@ func (sm *SyncManager) minedBroadcastLoop() {
                                log.WithFields(log.Fields{"module": logModule}).Error("event type error")
                                continue
                        }
-
-                       if err := sm.peers.broadcastMinedBlock(&ev.Block); err != nil {
+                       minedMsg, _ := newMinedBlockBroadcastMsg(&ev.Block, BlockchainChannel)
+                       if err := sm.peers.BroadcastMsg(minedMsg); err != nil {
+                               //if err := sm.peers.broadcastMinedBlock(&ev.Block); err != nil {
                                log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast mine block")
                                continue
                        }
index b4f702a..bb4a3b8 100644 (file)
@@ -7,7 +7,7 @@ import (
        "errors"
        "fmt"
 
-       "github.com/tendermint/go-wire"
+       wire "github.com/tendermint/go-wire"
 
        "github.com/vapor/protocol/bc"
        "github.com/vapor/protocol/bc/types"
@@ -33,8 +33,7 @@ const (
        MerkleRequestByte   = byte(0x60)
        MerkleResponseByte  = byte(0x61)
 
-       maxBlockchainResponseSize = 22020096 + 2
-       txsMsgMaxTxNum            = 1024
+       MaxBlockchainResponseSize = 22020096 + 2
 )
 
 //BlockchainMessage is a generic message for this reactor.
@@ -66,7 +65,7 @@ func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
        msgType = bz[0]
        n := int(0)
        r := bytes.NewReader(bz)
-       msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, maxBlockchainResponseSize, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage
+       msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, MaxBlockchainResponseSize, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage
        if err != nil && n != len(bz) {
                err = errors.New("DecodeMessage() had bytes left over")
        }
@@ -79,6 +78,10 @@ type GetBlockMessage struct {
        RawHash [32]byte
 }
 
+func NewGetBlockMessage(height uint64,hash [32]byte) *GetBlockMessage {
+       return &GetBlockMessage{Height: height,RawHash:hash}
+}
+
 //GetHash reutrn the hash of the request
 func (m *GetBlockMessage) GetHash() *bc.Hash {
        hash := bc.NewHash(m.RawHash)
@@ -453,7 +456,7 @@ type MerkleBlockMessage struct {
        Flags          []byte
 }
 
-func (m *MerkleBlockMessage) setRawBlockHeader(bh types.BlockHeader) error {
+func (m *MerkleBlockMessage) SetRawBlockHeader(bh types.BlockHeader) error {
        rawHeader, err := bh.MarshalText()
        if err != nil {
                return err
@@ -463,7 +466,7 @@ func (m *MerkleBlockMessage) setRawBlockHeader(bh types.BlockHeader) error {
        return nil
 }
 
-func (m *MerkleBlockMessage) setTxInfo(txHashes []*bc.Hash, txFlags []uint8, relatedTxs []*types.Tx) error {
+func (m *MerkleBlockMessage) SetTxInfo(txHashes []*bc.Hash, txFlags []uint8, relatedTxs []*types.Tx) error {
        for _, txHash := range txHashes {
                m.TxHashes = append(m.TxHashes, txHash.Byte32())
        }
@@ -479,7 +482,7 @@ func (m *MerkleBlockMessage) setTxInfo(txHashes []*bc.Hash, txFlags []uint8, rel
        return nil
 }
 
-func (m *MerkleBlockMessage) setStatusInfo(statusHashes []*bc.Hash, relatedStatuses []*bc.TxVerifyResult) error {
+func (m *MerkleBlockMessage) SetStatusInfo(statusHashes []*bc.Hash, relatedStatuses []*bc.TxVerifyResult) error {
        for _, statusHash := range statusHashes {
                m.StatusHashes = append(m.StatusHashes, statusHash.Byte32())
        }
diff --git a/netsync/message_broadcast.go b/netsync/message_broadcast.go
new file mode 100644 (file)
index 0000000..d0e4de5
--- /dev/null
@@ -0,0 +1,122 @@
+package netsync
+
+import (
+       "github.com/vapor/netsync/peers"
+       "github.com/vapor/protocol/bc/types"
+)
+
+//type BroadcastProcess interface {
+//     filter(ps *PeerSet) []*Peer
+//     mark(*Peer)
+//     getChan() byte
+//     getMsg() interface{}
+//     msgString() string
+//}
+
+type minedBlockBroadcastMsg struct {
+       block     *types.Block
+       msg       *MineBlockMessage
+       transChan byte
+}
+
+func newMinedBlockBroadcastMsg(block *types.Block, transChan byte) (*minedBlockBroadcastMsg, error) {
+       msg, err := NewMinedBlockMessage(block)
+       if err != nil {
+               return nil, err
+       }
+       return &minedBlockBroadcastMsg{block: block, msg: msg, transChan: transChan}, nil
+}
+
+func (m *minedBlockBroadcastMsg) GetChan() byte {
+       return m.transChan
+}
+
+func (m *minedBlockBroadcastMsg) GetMsg() interface{} {
+       return m.msg
+}
+
+func (m *minedBlockBroadcastMsg) MsgString() string {
+       return m.msg.String()
+}
+
+func (m *minedBlockBroadcastMsg) Mark(ps *peers.PeerSet, peers []string) {
+       hash := m.block.Hash()
+       height := m.block.Height
+       for _, peer := range peers {
+               ps.MarkBlock(peer, &hash)
+               ps.MarkStatus(peer, height)
+       }
+}
+
+func (m *minedBlockBroadcastMsg) Filter(ps *peers.PeerSet) []string {
+       //TODO: SPV NODE FILTER
+       return ps.PeersWithoutBlock(m.block.Hash())
+}
+
+type statusBroadcastMsg struct {
+       header    *types.BlockHeader
+       msg       *StatusMessage
+       transChan byte
+}
+
+func newStatusBroadcastMsg(header *types.BlockHeader, transChan byte) (*statusBroadcastMsg, error) {
+       msg := NewStatusMessage(header)
+       return &statusBroadcastMsg{header: header, msg: msg, transChan: transChan}, nil
+}
+
+func (s *statusBroadcastMsg) GetChan() byte {
+       return s.transChan
+}
+
+func (s *statusBroadcastMsg) GetMsg() interface{} {
+       return s.msg
+}
+
+func (s *statusBroadcastMsg) MsgString() string {
+       return s.msg.String()
+}
+
+func (s *statusBroadcastMsg) Filter(ps *peers.PeerSet) []string {
+       return ps.PeersWithoutNewStatus(s.header.Height)
+}
+
+func (s *statusBroadcastMsg) Mark(ps *peers.PeerSet, peers []string) {
+       height := s.header.Height
+       for _, peer := range peers {
+               ps.MarkStatus(peer, height)
+       }
+}
+
+type txBroadcastMsg struct {
+       tx        *types.Tx
+       msg       *TransactionMessage
+       transChan byte
+}
+
+func newTxBroadcastMsg(tx *types.Tx, transChan byte) (*txBroadcastMsg, error) {
+       msg, _ := NewTransactionMessage(tx)
+       return &txBroadcastMsg{tx: tx, msg: msg, transChan: transChan}, nil
+}
+
+func (t *txBroadcastMsg) GetChan() byte {
+       return t.transChan
+}
+
+func (t *txBroadcastMsg) GetMsg() interface{} {
+       return t.msg
+}
+
+func (t *txBroadcastMsg) MsgString() string {
+       return t.msg.String()
+}
+
+func (t *txBroadcastMsg) Filter(ps *peers.PeerSet) []string {
+       //TODO:                 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
+       return ps.PeersWithoutTx(t.tx.ID)
+}
+
+func (t *txBroadcastMsg) Mark(ps *peers.PeerSet, peers []string) {
+       for _, peer := range peers {
+               ps.MarkTx(peer, &t.tx.ID)
+       }
+}
index 14bb71f..207f99b 100644 (file)
@@ -5,7 +5,6 @@ import (
        "testing"
 
        "github.com/davecgh/go-spew/spew"
-
        "github.com/vapor/consensus"
        "github.com/vapor/protocol/bc"
        "github.com/vapor/protocol/bc/types"
@@ -40,7 +39,14 @@ var txs = []*types.Tx{
 }
 
 func TestTransactionMessage(t *testing.T) {
-       for _, tx := range txs {
+       wantStrings := [5]string{
+               "{tx_size: 104, tx_hash: 8ec07dcaa0d8966ebf7d4483cff08cf2f91b7fd054d74b9b36e6701851b6987f}",
+               "{tx_size: 106, tx_hash: b36797fadd33aa38794d4fef61a13214e17cd5441aee38f8cbdfe5a7863ee5ff}",
+               "{tx_size: 108, tx_hash: bf93b35da3c58f26896657f7cab5034536b6ab70b3a32e1de07bff6f94318a94}",
+               "{tx_size: 108, tx_hash: 82ba46e4d13ea302977564de1ea4053e0820f0c1296d404a05ed0adc3e3bac38}",
+               "{tx_size: 108, tx_hash: fb6640b67c734e124e34ce65f52f877f192c36caad0830e312799cc337749bd9}",
+       }
+       for i, tx := range txs {
                txMsg, err := NewTransactionMessage(tx)
                if err != nil {
                        t.Fatalf("create tx msg err:%s", err)
@@ -53,6 +59,10 @@ func TestTransactionMessage(t *testing.T) {
                if !reflect.DeepEqual(*tx.Tx, *gotTx.Tx) {
                        t.Errorf("txs msg test err: got %s\nwant %s", spew.Sdump(tx.Tx), spew.Sdump(gotTx.Tx))
                }
+
+               if txMsg.String() != wantStrings[i] {
+                       t.Errorf("index:%d txs msg string test err. got:%s want:%s", i, txMsg.String(), wantStrings[i])
+               }
        }
 }
 
@@ -71,6 +81,11 @@ func TestTransactionsMessage(t *testing.T) {
                t.Fatal("txs msg test err: number of txs not match ")
        }
 
+       wantString := "{tx_num: 5}"
+       if txsMsg.String() != wantString {
+               t.Errorf("txs msg string test err. got:%s want:%s", txsMsg.String(), wantString)
+       }
+
        for i, tx := range txs {
                if !reflect.DeepEqual(tx.Tx, gotTxs[i].Tx) {
                        t.Errorf("txs msg test err: got %s\nwant %s", spew.Sdump(tx.Tx), spew.Sdump(gotTxs[i].Tx))
@@ -82,7 +97,7 @@ var testBlock = &types.Block{
        BlockHeader: types.BlockHeader{
                Version:   1,
                Height:    0,
-               Timestamp: 1528945000000,
+               Timestamp: 1528945000,
                BlockCommitment: types.BlockCommitment{
                        TransactionsMerkleRoot: bc.Hash{V0: uint64(0x11)},
                        TransactionStatusHash:  bc.Hash{V0: uint64(0x55)},
@@ -105,18 +120,28 @@ func TestBlockMessage(t *testing.T) {
                t.Errorf("block msg test err: got %s\nwant %s", spew.Sdump(gotBlock.BlockHeader), spew.Sdump(testBlock.BlockHeader))
        }
 
+       wantString := "{block_height: 0, block_hash: f59514e2541488a38bc2667940bc2c24027e4a3a371d884b55570d036997bb57}"
+       if blockMsg.String() != wantString {
+               t.Errorf("block msg test err. got:%s want:%s", blockMsg.String(), wantString)
+       }
+
        blockMsg.RawBlock[1] = blockMsg.RawBlock[1] + 0x1
        _, err = blockMsg.GetBlock()
        if err == nil {
                t.Fatalf("get mine block err")
        }
+
+       wantString = "{err: wrong message}"
+       if blockMsg.String() != wantString {
+               t.Errorf("block msg test err. got:%s want:%s", blockMsg.String(), wantString)
+       }
 }
 
 var testHeaders = []*types.BlockHeader{
        {
                Version:   1,
                Height:    0,
-               Timestamp: 1528945000000,
+               Timestamp: 1528945000,
                BlockCommitment: types.BlockCommitment{
                        TransactionsMerkleRoot: bc.Hash{V0: uint64(0x11)},
                        TransactionStatusHash:  bc.Hash{V0: uint64(0x55)},
@@ -125,7 +150,7 @@ var testHeaders = []*types.BlockHeader{
        {
                Version:   1,
                Height:    1,
-               Timestamp: 1528945000000,
+               Timestamp: 1528945000,
                BlockCommitment: types.BlockCommitment{
                        TransactionsMerkleRoot: bc.Hash{V0: uint64(0x11)},
                        TransactionStatusHash:  bc.Hash{V0: uint64(0x55)},
@@ -134,7 +159,7 @@ var testHeaders = []*types.BlockHeader{
        {
                Version:   1,
                Height:    3,
-               Timestamp: 1528945000000,
+               Timestamp: 1528945000,
                BlockCommitment: types.BlockCommitment{
                        TransactionsMerkleRoot: bc.Hash{V0: uint64(0x11)},
                        TransactionStatusHash:  bc.Hash{V0: uint64(0x55)},
@@ -156,14 +181,75 @@ func TestHeadersMessage(t *testing.T) {
        if !reflect.DeepEqual(gotHeaders, testHeaders) {
                t.Errorf("headers msg test err: got %s\nwant %s", spew.Sdump(gotHeaders), spew.Sdump(testHeaders))
        }
+
+       wantString := "{header_length: 3}"
+       if headersMsg.String() != wantString {
+               t.Errorf("headers msg test string err. got:%s want:%s", headersMsg.String(), wantString)
+       }
+
 }
 
 func TestGetBlockMessage(t *testing.T) {
-       getBlockMsg := GetBlockMessage{RawHash: [32]byte{0x01}}
-       gotHash := getBlockMsg.GetHash()
+       testCase := []struct {
+               height     uint64
+               rawHash    [32]byte
+               wantString string
+       }{
+               {
+                       height:     uint64(100),
+                       rawHash:    [32]byte{0x01},
+                       wantString: "{height: 100}",
+               },
+               {
+                       height:     uint64(0),
+                       rawHash:    [32]byte{0x01},
+                       wantString: "{hash: 0100000000000000000000000000000000000000000000000000000000000000}",
+               },
+       }
+       for i, c := range testCase {
+               getBlockMsg := NewGetBlockMessage(c.height, c.rawHash)
+               gotHash := getBlockMsg.GetHash()
+
+               if !reflect.DeepEqual(gotHash.Byte32(), c.rawHash) {
+                       t.Errorf("index:%d test get block msg err. got: %s want: %s", i, spew.Sdump(gotHash.Byte32()), spew.Sdump(c.rawHash))
+               }
+
+               if getBlockMsg.Height != c.height {
+                       t.Errorf("index:%d test get block msg err. got: %d want: %d", i, getBlockMsg.Height, c.height)
+               }
+               if getBlockMsg.String() != c.wantString {
+                       t.Errorf("index:%d test get block msg string err. got: %s want: %s", i, getBlockMsg.String(), c.wantString)
+               }
+
+       }
+}
+
+type testGetBlocksMessage struct {
+       blockLocator []*bc.Hash
+       stopHash     *bc.Hash
+}
+
+func TestGetBlocksMessage(t *testing.T) {
+       testMsg := testGetBlocksMessage{
+               blockLocator: []*bc.Hash{{V0: 0x01}, {V0: 0x02}, {V0: 0x03}},
+               stopHash:     &bc.Hash{V0: 0xaa, V2: 0x55},
+       }
+
+       getBlocksMsg := NewGetBlocksMessage(testMsg.blockLocator, testMsg.stopHash)
+       gotBlockLocator := getBlocksMsg.GetBlockLocator()
+       gotStopHash := getBlocksMsg.GetStopHash()
+
+       if !reflect.DeepEqual(gotBlockLocator, testMsg.blockLocator) {
+               t.Errorf("get headers msg test err: got %s\nwant %s", spew.Sdump(gotBlockLocator), spew.Sdump(testMsg.blockLocator))
+       }
+
+       if !reflect.DeepEqual(gotStopHash, testMsg.stopHash) {
+               t.Errorf("get headers msg test err: got %s\nwant %s", spew.Sdump(gotStopHash), spew.Sdump(testMsg.stopHash))
+       }
 
-       if !reflect.DeepEqual(gotHash.Byte32(), getBlockMsg.RawHash) {
-               t.Errorf("get block msg test err: got %s\nwant %s", spew.Sdump(gotHash.Byte32()), spew.Sdump(getBlockMsg.RawHash))
+       wantString := "{stop_hash: 00000000000000aa000000000000000000000000000000550000000000000000}"
+       if getBlocksMsg.String() != wantString {
+               t.Errorf("get headers msg string test err: got:%s want:%s", getBlocksMsg.String(), wantString)
        }
 }
 
@@ -188,6 +274,11 @@ func TestGetHeadersMessage(t *testing.T) {
        if !reflect.DeepEqual(testMsg.stopHash, gotStopHash) {
                t.Errorf("get headers msg test err: got %s\nwant %s", spew.Sdump(gotStopHash), spew.Sdump(testMsg.stopHash))
        }
+
+       wantString := "{stop_hash: 00000000000000aa000000000000000000000000000000550000000000000000}"
+       if getHeadersMsg.String() != wantString {
+               t.Errorf("get headers msg string test err: got:%s want:%s", getHeadersMsg.String(), wantString)
+       }
 }
 
 var testBlocks = []*types.Block{
@@ -195,7 +286,7 @@ var testBlocks = []*types.Block{
                BlockHeader: types.BlockHeader{
                        Version:   1,
                        Height:    0,
-                       Timestamp: 1528945000000,
+                       Timestamp: 1528945000,
                        BlockCommitment: types.BlockCommitment{
                                TransactionsMerkleRoot: bc.Hash{V0: uint64(0x11)},
                                TransactionStatusHash:  bc.Hash{V0: uint64(0x55)},
@@ -206,7 +297,7 @@ var testBlocks = []*types.Block{
                BlockHeader: types.BlockHeader{
                        Version:   1,
                        Height:    0,
-                       Timestamp: 1528945000000,
+                       Timestamp: 1528945000,
                        BlockCommitment: types.BlockCommitment{
                                TransactionsMerkleRoot: bc.Hash{V0: uint64(0x11)},
                                TransactionStatusHash:  bc.Hash{V0: uint64(0x55)},
@@ -230,6 +321,11 @@ func TestBlocksMessage(t *testing.T) {
                        t.Errorf("block msg test err: got %s\nwant %s", spew.Sdump(gotBlock.BlockHeader), spew.Sdump(testBlock.BlockHeader))
                }
        }
+
+       wantString := "{blocks_length: 2}"
+       if blocksMsg.String() != wantString {
+               t.Errorf("block msg string test err: got:%s want:%s", blocksMsg.String(), wantString)
+       }
 }
 
 func TestStatusMessage(t *testing.T) {
@@ -238,4 +334,137 @@ func TestStatusMessage(t *testing.T) {
        if !reflect.DeepEqual(*gotHash, testBlock.Hash()) {
                t.Errorf("status response msg test err: got %s\nwant %s", spew.Sdump(*gotHash), spew.Sdump(testBlock.Hash()))
        }
+
+       wantString := "{height: 0, hash: f59514e2541488a38bc2667940bc2c24027e4a3a371d884b55570d036997bb57}"
+       if statusResponseMsg.String() != wantString {
+               t.Errorf("status response msg string test err: got:%s want:%s", statusResponseMsg.String(), wantString)
+       }
+}
+
+func TestMinedBlockMessage(t *testing.T) {
+       blockMsg, err := NewMinedBlockMessage(testBlock)
+       if err != nil {
+               t.Fatalf("create new mine block msg err:%s", err)
+       }
+
+       gotBlock, err := blockMsg.GetMineBlock()
+       if err != nil {
+               t.Fatalf("got block err:%s", err)
+       }
+
+       if !reflect.DeepEqual(gotBlock.BlockHeader, testBlock.BlockHeader) {
+               t.Errorf("block msg test err: got %s\nwant %s", spew.Sdump(gotBlock.BlockHeader), spew.Sdump(testBlock.BlockHeader))
+       }
+
+       wantString := "{block_height: 0, block_hash: f59514e2541488a38bc2667940bc2c24027e4a3a371d884b55570d036997bb57}"
+       if blockMsg.String() != wantString {
+               t.Errorf("block msg test err. got:%s want:%s", blockMsg.String(), wantString)
+       }
+
+       blockMsg.RawBlock[1] = blockMsg.RawBlock[1] + 0x1
+       _, err = blockMsg.GetMineBlock()
+       if err == nil {
+               t.Fatalf("get mine block err")
+       }
+
+       wantString = "{err: wrong message}"
+       if blockMsg.String() != wantString {
+               t.Errorf("block msg test err. got:%s want:%s", blockMsg.String(), wantString)
+       }
+}
+
+func TestFilterLoadMessage(t *testing.T) {
+       filterLoadMsg := &FilterLoadMessage{
+               Addresses: [][]byte{{0x01}, {0x01, 0x02}},
+       }
+
+       wantString := "{addresses_length: 2}"
+       if filterLoadMsg.String() != wantString {
+               t.Errorf("filter load msg test err. got:%s want:%s", filterLoadMsg.String(), wantString)
+       }
+}
+
+func TestFilterAddMessage(t *testing.T) {
+       filterAddMessage := &FilterAddMessage{
+               Address: []byte{0x01, 0x02, 0x03},
+       }
+
+       wantString := "{address: 010203}"
+       if filterAddMessage.String() != wantString {
+               t.Errorf("filter add msg test err. got:%s want:%s", filterAddMessage.String(), wantString)
+       }
+}
+
+func TestFilterClearMessage(t *testing.T) {
+       filterClearMessage := &FilterClearMessage{}
+
+       wantString := "{}"
+       if filterClearMessage.String() != wantString {
+               t.Errorf("filter clear msg test err. got:%s want:%s", filterClearMessage.String(), wantString)
+       }
+}
+
+func TestGetMerkleBlockMessage(t *testing.T) {
+       testCase := []struct {
+               height     uint64
+               rawHash    [32]byte
+               wantString string
+       }{
+               {
+                       height:     uint64(100),
+                       rawHash:    [32]byte{0x01},
+                       wantString: "{height: 100}",
+               },
+               {
+                       height:     uint64(0),
+                       rawHash:    [32]byte{0x01},
+                       wantString: "{hash: 0100000000000000000000000000000000000000000000000000000000000000}",
+               },
+       }
+       for i, c := range testCase {
+               getMerkleBlockMsg := &GetMerkleBlockMessage{
+                       Height:  c.height,
+                       RawHash: c.rawHash,
+               }
+               gotHash := getMerkleBlockMsg.GetHash()
+
+               if !reflect.DeepEqual(gotHash.Byte32(), c.rawHash) {
+                       t.Errorf("index:%d test get merkle block msg err. got: %s want: %s", i, spew.Sdump(gotHash.Byte32()), spew.Sdump(c.rawHash))
+               }
+
+               if getMerkleBlockMsg.Height != c.height {
+                       t.Errorf("index:%d test get merkle block msg err. got: %d want: %d", i, getMerkleBlockMsg.Height, c.height)
+               }
+               if getMerkleBlockMsg.String() != c.wantString {
+                       t.Errorf("index:%d test get merkle block msg string err. got: %s want: %s", i, getMerkleBlockMsg.String(), c.wantString)
+               }
+       }
+}
+
+func TestMerkleBlockMessage(t *testing.T) {
+       blockHeader := types.BlockHeader{
+               Version:   1,
+               Height:    0,
+               Timestamp: 1528945000,
+               BlockCommitment: types.BlockCommitment{
+                       TransactionsMerkleRoot: bc.Hash{V0: uint64(0x11)},
+                       TransactionStatusHash:  bc.Hash{V0: uint64(0x55)},
+               },
+       }
+       txHashes := []*bc.Hash{{V0: 123, V1: 234, V2: 345, V3: 456}}
+       txFlags := []uint8{0x1, 0x2}
+       relatedTxs := txs
+       statusHashes := []*bc.Hash{{V0: 123, V1: 234, V2: 345, V3: 456}}
+       relatedStatuses := []*bc.TxVerifyResult{{StatusFail: false}, {StatusFail: true}}
+       merkleBlockMsg := NewMerkleBlockMessage()
+       merkleBlockMsg.SetRawBlockHeader(blockHeader)
+       merkleBlockMsg.SetTxInfo(txHashes, txFlags, relatedTxs)
+       merkleBlockMsg.SetStatusInfo(statusHashes, relatedStatuses)
+       if !reflect.DeepEqual(merkleBlockMsg.Flags, txFlags) {
+               t.Errorf("test get merkle block msg err. got: %s want: %s", merkleBlockMsg.Flags, txFlags)
+       }
+       wantString := "{}"
+       if merkleBlockMsg.String() != wantString {
+               t.Errorf("merkle block msg test err. got:%s want:%s", merkleBlockMsg.String(), wantString)
+       }
 }
diff --git a/netsync/peer.go b/netsync/peer.go
deleted file mode 100644 (file)
index 794501b..0000000
+++ /dev/null
@@ -1,549 +0,0 @@
-package netsync
-
-import (
-       "encoding/hex"
-       "net"
-       "reflect"
-       "sync"
-
-       log "github.com/sirupsen/logrus"
-       "github.com/tendermint/tmlibs/flowrate"
-       "gopkg.in/fatih/set.v0"
-
-       "github.com/vapor/consensus"
-       "github.com/vapor/errors"
-       "github.com/vapor/p2p/trust"
-       "github.com/vapor/protocol/bc"
-       "github.com/vapor/protocol/bc/types"
-)
-
-const (
-       maxKnownTxs         = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
-       maxKnownBlocks      = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
-       defaultBanThreshold = uint32(100)
-)
-
-var errSendStatusMsg = errors.New("send status msg fail")
-
-//BasePeer is the interface for connection level peer
-type BasePeer interface {
-       Addr() net.Addr
-       ID() string
-       ServiceFlag() consensus.ServiceFlag
-       TrafficStatus() (*flowrate.Status, *flowrate.Status)
-       TrySend(byte, interface{}) bool
-       IsLAN() bool
-}
-
-//BasePeerSet is the intergace for connection level peer manager
-type BasePeerSet interface {
-       AddBannedPeer(string) error
-       StopPeerGracefully(string)
-}
-
-// PeerInfo indicate peer status snap
-type PeerInfo struct {
-       ID                  string `json:"peer_id"`
-       RemoteAddr          string `json:"remote_addr"`
-       Height              uint64 `json:"height"`
-       Ping                string `json:"ping"`
-       Duration            string `json:"duration"`
-       TotalSent           int64  `json:"total_sent"`
-       TotalReceived       int64  `json:"total_received"`
-       AverageSentRate     int64  `json:"average_sent_rate"`
-       AverageReceivedRate int64  `json:"average_received_rate"`
-       CurrentSentRate     int64  `json:"current_sent_rate"`
-       CurrentReceivedRate int64  `json:"current_received_rate"`
-}
-
-type peer struct {
-       BasePeer
-       mtx         sync.RWMutex
-       services    consensus.ServiceFlag
-       height      uint64
-       hash        *bc.Hash
-       banScore    trust.DynamicBanScore
-       knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
-       knownBlocks *set.Set // Set of block hashes known to be known by this peer
-       knownStatus uint64   // Set of chain status known to be known by this peer
-       filterAdds  *set.Set // Set of addresses that the spv node cares about.
-}
-
-func newPeer(basePeer BasePeer) *peer {
-       return &peer{
-               BasePeer:    basePeer,
-               services:    basePeer.ServiceFlag(),
-               knownTxs:    set.New(),
-               knownBlocks: set.New(),
-               filterAdds:  set.New(),
-       }
-}
-
-func (p *peer) Height() uint64 {
-       p.mtx.RLock()
-       defer p.mtx.RUnlock()
-       return p.height
-}
-
-func (p *peer) addBanScore(persistent, transient uint32, reason string) bool {
-       score := p.banScore.Increase(persistent, transient)
-       if score > defaultBanThreshold {
-               log.WithFields(log.Fields{
-                       "module":  logModule,
-                       "address": p.Addr(),
-                       "score":   score,
-                       "reason":  reason,
-               }).Errorf("banning and disconnecting")
-               return true
-       }
-
-       warnThreshold := defaultBanThreshold >> 1
-       if score > warnThreshold {
-               log.WithFields(log.Fields{
-                       "module":  logModule,
-                       "address": p.Addr(),
-                       "score":   score,
-                       "reason":  reason,
-               }).Warning("ban score increasing")
-       }
-       return false
-}
-
-func (p *peer) addFilterAddress(address []byte) {
-       p.mtx.Lock()
-       defer p.mtx.Unlock()
-
-       if p.filterAdds.Size() >= maxFilterAddressCount {
-               log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
-               return
-       }
-       if len(address) > maxFilterAddressSize {
-               log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
-               return
-       }
-
-       p.filterAdds.Add(hex.EncodeToString(address))
-}
-
-func (p *peer) addFilterAddresses(addresses [][]byte) {
-       if !p.filterAdds.IsEmpty() {
-               p.filterAdds.Clear()
-       }
-       for _, address := range addresses {
-               p.addFilterAddress(address)
-       }
-}
-
-func (p *peer) getBlockByHeight(height uint64) bool {
-       msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
-       return p.TrySend(BlockchainChannel, msg)
-}
-
-func (p *peer) getBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
-       msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
-       return p.TrySend(BlockchainChannel, msg)
-}
-
-func (p *peer) getHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
-       msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
-       return p.TrySend(BlockchainChannel, msg)
-}
-
-func (p *peer) getPeerInfo() *PeerInfo {
-       p.mtx.RLock()
-       defer p.mtx.RUnlock()
-
-       sentStatus, receivedStatus := p.TrafficStatus()
-       ping := sentStatus.Idle - receivedStatus.Idle
-       if receivedStatus.Idle > sentStatus.Idle {
-               ping = -ping
-       }
-
-       return &PeerInfo{
-               ID:                  p.ID(),
-               RemoteAddr:          p.Addr().String(),
-               Height:              p.height,
-               Ping:                ping.String(),
-               Duration:            sentStatus.Duration.String(),
-               TotalSent:           sentStatus.Bytes,
-               TotalReceived:       receivedStatus.Bytes,
-               AverageSentRate:     sentStatus.AvgRate,
-               AverageReceivedRate: receivedStatus.AvgRate,
-               CurrentSentRate:     sentStatus.CurRate,
-               CurrentReceivedRate: receivedStatus.CurRate,
-       }
-}
-
-func (p *peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
-       var relatedTxs []*types.Tx
-       var relatedStatuses []*bc.TxVerifyResult
-       for i, tx := range txs {
-               if p.isRelatedTx(tx) {
-                       relatedTxs = append(relatedTxs, tx)
-                       relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
-               }
-       }
-       return relatedTxs, relatedStatuses
-}
-
-func (p *peer) isRelatedTx(tx *types.Tx) bool {
-       for _, input := range tx.Inputs {
-               switch inp := input.TypedInput.(type) {
-               case *types.SpendInput:
-                       if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
-                               return true
-                       }
-               }
-       }
-       for _, output := range tx.Outputs {
-               if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram())) {
-                       return true
-               }
-       }
-       return false
-}
-
-func (p *peer) isSPVNode() bool {
-       return !p.services.IsEnable(consensus.SFFullNode)
-}
-
-func (p *peer) markBlock(hash *bc.Hash) {
-       p.mtx.Lock()
-       defer p.mtx.Unlock()
-
-       for p.knownBlocks.Size() >= maxKnownBlocks {
-               p.knownBlocks.Pop()
-       }
-       p.knownBlocks.Add(hash.String())
-}
-
-func (p *peer) markNewStatus(height uint64) {
-       p.mtx.Lock()
-       defer p.mtx.Unlock()
-
-       p.knownStatus = height
-}
-
-func (p *peer) markTransaction(hash *bc.Hash) {
-       p.mtx.Lock()
-       defer p.mtx.Unlock()
-
-       for p.knownTxs.Size() >= maxKnownTxs {
-               p.knownTxs.Pop()
-       }
-       p.knownTxs.Add(hash.String())
-}
-
-func (p *peer) sendBlock(block *types.Block) (bool, error) {
-       msg, err := NewBlockMessage(block)
-       if err != nil {
-               return false, errors.Wrap(err, "fail on NewBlockMessage")
-       }
-
-       ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
-       if ok {
-               blcokHash := block.Hash()
-               p.knownBlocks.Add(blcokHash.String())
-       }
-       return ok, nil
-}
-
-func (p *peer) sendBlocks(blocks []*types.Block) (bool, error) {
-       msg, err := NewBlocksMessage(blocks)
-       if err != nil {
-               return false, errors.Wrap(err, "fail on NewBlocksMessage")
-       }
-
-       if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
-               return ok, nil
-       }
-
-       for _, block := range blocks {
-               blcokHash := block.Hash()
-               p.knownBlocks.Add(blcokHash.String())
-       }
-       return true, nil
-}
-
-func (p *peer) sendHeaders(headers []*types.BlockHeader) (bool, error) {
-       msg, err := NewHeadersMessage(headers)
-       if err != nil {
-               return false, errors.New("fail on NewHeadersMessage")
-       }
-
-       ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
-       return ok, nil
-}
-
-func (p *peer) sendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
-       msg := NewMerkleBlockMessage()
-       if err := msg.setRawBlockHeader(block.BlockHeader); err != nil {
-               return false, err
-       }
-
-       relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
-
-       txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
-       if err := msg.setTxInfo(txHashes, txFlags, relatedTxs); err != nil {
-               return false, nil
-       }
-
-       statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
-       if err := msg.setStatusInfo(statusHashes, relatedStatuses); err != nil {
-               return false, nil
-       }
-
-       ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
-       return ok, nil
-}
-
-func (p *peer) sendTransactions(txs []*types.Tx) error {
-       validTxs := make([]*types.Tx, 0, len(txs))
-       for i, tx := range txs {
-               if p.isSPVNode() && !p.isRelatedTx(tx) || p.knownTxs.Has(tx.ID.String()) {
-                       continue
-               }
-
-               validTxs = append(validTxs, tx)
-               if len(validTxs) != txsMsgMaxTxNum && i != len(txs)-1 {
-                       continue
-               }
-
-               msg, err := NewTransactionsMessage(validTxs)
-               if err != nil {
-                       return err
-               }
-
-               if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
-                       return errors.New("failed to send txs msg")
-               }
-
-               for _, validTx := range validTxs {
-                       p.knownTxs.Add(validTx.ID.String())
-               }
-
-               validTxs = make([]*types.Tx, 0, len(txs))
-       }
-
-       return nil
-}
-
-func (p *peer) sendStatus(header *types.BlockHeader) error {
-       msg := NewStatusMessage(header)
-       if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
-               return errSendStatusMsg
-       }
-       p.markNewStatus(header.Height)
-       return nil
-}
-
-func (p *peer) setStatus(height uint64, hash *bc.Hash) {
-       p.mtx.Lock()
-       defer p.mtx.Unlock()
-       p.height = height
-       p.hash = hash
-}
-
-type peerSet struct {
-       BasePeerSet
-       mtx   sync.RWMutex
-       peers map[string]*peer
-}
-
-// newPeerSet creates a new peer set to track the active participants.
-func newPeerSet(basePeerSet BasePeerSet) *peerSet {
-       return &peerSet{
-               BasePeerSet: basePeerSet,
-               peers:       make(map[string]*peer),
-       }
-}
-
-func (ps *peerSet) addBanScore(peerID string, persistent, transient uint32, reason string) {
-       ps.mtx.Lock()
-       peer := ps.peers[peerID]
-       ps.mtx.Unlock()
-
-       if peer == nil {
-               return
-       }
-       if ban := peer.addBanScore(persistent, transient, reason); !ban {
-               return
-       }
-       if err := ps.AddBannedPeer(peer.Addr().String()); err != nil {
-               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on add ban peer")
-       }
-       ps.removePeer(peerID)
-}
-
-func (ps *peerSet) addPeer(peer BasePeer) {
-       ps.mtx.Lock()
-       defer ps.mtx.Unlock()
-
-       if _, ok := ps.peers[peer.ID()]; !ok {
-               ps.peers[peer.ID()] = newPeer(peer)
-               return
-       }
-       log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
-}
-
-func (ps *peerSet) bestPeer(flag consensus.ServiceFlag) *peer {
-       ps.mtx.RLock()
-       defer ps.mtx.RUnlock()
-
-       var bestPeer *peer
-       for _, p := range ps.peers {
-               if !p.services.IsEnable(flag) {
-                       continue
-               }
-               if bestPeer == nil || p.height > bestPeer.height || (p.height == bestPeer.height && p.IsLAN()) {
-                       bestPeer = p
-               }
-       }
-       return bestPeer
-}
-
-func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
-       msg, err := NewMinedBlockMessage(block)
-       if err != nil {
-               return errors.Wrap(err, "fail on broadcast mined block")
-       }
-
-       hash := block.Hash()
-       peers := ps.peersWithoutBlock(&hash)
-       for _, peer := range peers {
-               if peer.isSPVNode() {
-                       continue
-               }
-               if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
-                       log.WithFields(log.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Warning("send message to peer error")
-                       ps.removePeer(peer.ID())
-                       continue
-               }
-               peer.markBlock(&hash)
-               peer.markNewStatus(block.Height)
-       }
-       return nil
-}
-
-func (ps *peerSet) broadcastNewStatus(bestBlock *types.Block) error {
-       msg := NewStatusMessage(&bestBlock.BlockHeader)
-       peers := ps.peersWithoutNewStatus(bestBlock.Height)
-       for _, peer := range peers {
-               if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
-                       ps.removePeer(peer.ID())
-                       continue
-               }
-
-               peer.markNewStatus(bestBlock.Height)
-       }
-       return nil
-}
-
-func (ps *peerSet) broadcastTx(tx *types.Tx) error {
-       msg, err := NewTransactionMessage(tx)
-       if err != nil {
-               return errors.Wrap(err, "fail on broadcast tx")
-       }
-
-       peers := ps.peersWithoutTx(&tx.ID)
-       for _, peer := range peers {
-               if peer.isSPVNode() && !peer.isRelatedTx(tx) {
-                       continue
-               }
-               if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
-                       log.WithFields(log.Fields{
-                               "module":  logModule,
-                               "peer":    peer.Addr(),
-                               "type":    reflect.TypeOf(msg),
-                               "message": msg.String(),
-                       }).Warning("send message to peer error")
-                       ps.removePeer(peer.ID())
-                       continue
-               }
-               peer.markTransaction(&tx.ID)
-       }
-       return nil
-}
-
-func (ps *peerSet) errorHandler(peerID string, err error) {
-       if errors.Root(err) == errPeerMisbehave {
-               ps.addBanScore(peerID, 20, 0, err.Error())
-       } else {
-               ps.removePeer(peerID)
-       }
-}
-
-// Peer retrieves the registered peer with the given id.
-func (ps *peerSet) getPeer(id string) *peer {
-       ps.mtx.RLock()
-       defer ps.mtx.RUnlock()
-       return ps.peers[id]
-}
-
-func (ps *peerSet) getPeerInfos() []*PeerInfo {
-       ps.mtx.RLock()
-       defer ps.mtx.RUnlock()
-
-       result := []*PeerInfo{}
-       for _, peer := range ps.peers {
-               result = append(result, peer.getPeerInfo())
-       }
-       return result
-}
-
-func (ps *peerSet) markTx(peerID string, txHash bc.Hash) {
-       ps.mtx.Lock()
-       peer := ps.peers[peerID]
-       ps.mtx.Unlock()
-
-       if peer == nil {
-               return
-       }
-       peer.markTransaction(&txHash)
-}
-
-func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
-       ps.mtx.RLock()
-       defer ps.mtx.RUnlock()
-
-       peers := []*peer{}
-       for _, peer := range ps.peers {
-               if !peer.knownBlocks.Has(hash.String()) {
-                       peers = append(peers, peer)
-               }
-       }
-       return peers
-}
-
-func (ps *peerSet) peersWithoutNewStatus(height uint64) []*peer {
-       ps.mtx.RLock()
-       defer ps.mtx.RUnlock()
-
-       var peers []*peer
-       for _, peer := range ps.peers {
-               if peer.knownStatus < height {
-                       peers = append(peers, peer)
-               }
-       }
-       return peers
-}
-
-func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
-       ps.mtx.RLock()
-       defer ps.mtx.RUnlock()
-
-       peers := []*peer{}
-       for _, peer := range ps.peers {
-               if !peer.knownTxs.Has(hash.String()) {
-                       peers = append(peers, peer)
-               }
-       }
-       return peers
-}
-
-func (ps *peerSet) removePeer(peerID string) {
-       ps.mtx.Lock()
-       delete(ps.peers, peerID)
-       ps.mtx.Unlock()
-       ps.StopPeerGracefully(peerID)
-}
diff --git a/netsync/peers/peer.go b/netsync/peers/peer.go
new file mode 100644 (file)
index 0000000..1b50561
--- /dev/null
@@ -0,0 +1,249 @@
+package peers
+
+import (
+       "encoding/hex"
+       "net"
+       "sync"
+
+       log "github.com/sirupsen/logrus"
+       "github.com/tendermint/tmlibs/flowrate"
+       "gopkg.in/fatih/set.v0"
+
+       "github.com/vapor/consensus"
+       "github.com/vapor/errors"
+       "github.com/vapor/p2p/trust"
+       "github.com/vapor/protocol/bc"
+       "github.com/vapor/protocol/bc/types"
+)
+
+const (
+       maxKnownTxs           = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
+       maxKnownBlocks        = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
+       defaultBanThreshold   = uint32(100)
+       maxFilterAddressSize  = 50
+       maxFilterAddressCount = 1000
+
+       logModule = "peers"
+)
+
+var (
+       errSendStatusMsg = errors.New("send status msg fail")
+       ErrPeerMisbehave = errors.New("peer is misbehave")
+)
+
+//BasePeer is the interface for connection level peer
+type BasePeer interface {
+       Addr() net.Addr
+       ID() string
+       ServiceFlag() consensus.ServiceFlag
+       TrafficStatus() (*flowrate.Status, *flowrate.Status)
+       TrySend(byte, interface{}) bool
+       IsLAN() bool
+}
+
+// PeerInfo indicate peer status snap
+type PeerInfo struct {
+       ID                  string `json:"peer_id"`
+       RemoteAddr          string `json:"remote_addr"`
+       Height              uint64 `json:"height"`
+       Ping                string `json:"ping"`
+       Duration            string `json:"duration"`
+       TotalSent           int64  `json:"total_sent"`
+       TotalReceived       int64  `json:"total_received"`
+       AverageSentRate     int64  `json:"average_sent_rate"`
+       AverageReceivedRate int64  `json:"average_received_rate"`
+       CurrentSentRate     int64  `json:"current_sent_rate"`
+       CurrentReceivedRate int64  `json:"current_received_rate"`
+}
+
+type Peer struct {
+       BasePeer
+       mtx         sync.RWMutex
+       services    consensus.ServiceFlag
+       height      uint64
+       hash        *bc.Hash
+       banScore    trust.DynamicBanScore
+       knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
+       knownBlocks *set.Set // Set of block hashes known to be known by this peer
+       knownStatus uint64   // Set of chain status known to be known by this peer
+       filterAdds  *set.Set // Set of addresses that the spv node cares about.
+}
+
+func newPeer(basePeer BasePeer) *Peer {
+       return &Peer{
+               BasePeer:    basePeer,
+               services:    basePeer.ServiceFlag(),
+               knownTxs:    set.New(),
+               knownBlocks: set.New(),
+               filterAdds:  set.New(),
+       }
+}
+
+func (p *Peer) bestHeight() uint64 {
+       p.mtx.RLock()
+       defer p.mtx.RUnlock()
+       return p.height
+}
+
+func (p *Peer) addBanScore(persistent, transient uint32, reason string) bool {
+       score := p.banScore.Increase(persistent, transient)
+       if score > defaultBanThreshold {
+               log.WithFields(log.Fields{
+                       "module":  logModule,
+                       "address": p.Addr(),
+                       "score":   score,
+                       "reason":  reason,
+               }).Errorf("banning and disconnecting")
+               return true
+       }
+
+       warnThreshold := defaultBanThreshold >> 1
+       if score > warnThreshold {
+               log.WithFields(log.Fields{
+                       "module":  logModule,
+                       "address": p.Addr(),
+                       "score":   score,
+                       "reason":  reason,
+               }).Warning("ban score increasing")
+       }
+       return false
+}
+
+func (p *Peer) addFilterAddress(address []byte) {
+       p.mtx.Lock()
+       defer p.mtx.Unlock()
+
+       if p.filterAdds.Size() >= maxFilterAddressCount {
+               log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
+               return
+       }
+       if len(address) > maxFilterAddressSize {
+               log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
+               return
+       }
+
+       p.filterAdds.Add(hex.EncodeToString(address))
+}
+
+func (p *Peer) addFilterAddresses(addresses [][]byte) {
+       if !p.filterAdds.IsEmpty() {
+               p.filterAdds.Clear()
+       }
+       for _, address := range addresses {
+               p.addFilterAddress(address)
+       }
+}
+
+//func (p *Peer) GetBlockByHeight(height uint64) bool {
+//     return p.TrySend(BlockchainChannel, msg)
+//}
+
+//func (p *Peer) getBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
+//     return p.TrySend(BlockchainChannel, msg)
+//}
+
+//func (p *Peer) getHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
+//     msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
+//     return p.TrySend(BlockchainChannel, msg)
+//}
+
+func (p *Peer) getPeerInfo() *PeerInfo {
+       p.mtx.RLock()
+       defer p.mtx.RUnlock()
+
+       sentStatus, receivedStatus := p.TrafficStatus()
+       ping := sentStatus.Idle - receivedStatus.Idle
+       if receivedStatus.Idle > sentStatus.Idle {
+               ping = -ping
+       }
+
+       return &PeerInfo{
+               ID:                  p.ID(),
+               RemoteAddr:          p.Addr().String(),
+               Height:              p.height,
+               Ping:                ping.String(),
+               Duration:            sentStatus.Duration.String(),
+               TotalSent:           sentStatus.Bytes,
+               TotalReceived:       receivedStatus.Bytes,
+               AverageSentRate:     sentStatus.AvgRate,
+               AverageReceivedRate: receivedStatus.AvgRate,
+               CurrentSentRate:     sentStatus.CurRate,
+               CurrentReceivedRate: receivedStatus.CurRate,
+       }
+}
+
+func (p *Peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
+       var relatedTxs []*types.Tx
+       var relatedStatuses []*bc.TxVerifyResult
+       for i, tx := range txs {
+               if p.isRelatedTx(tx) {
+                       relatedTxs = append(relatedTxs, tx)
+                       relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
+               }
+       }
+       return relatedTxs, relatedStatuses
+}
+
+func (p *Peer) isRelatedTx(tx *types.Tx) bool {
+       for _, input := range tx.Inputs {
+               switch inp := input.TypedInput.(type) {
+               case *types.SpendInput:
+                       if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
+                               return true
+                       }
+               }
+       }
+       for _, output := range tx.Outputs {
+               if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram())) {
+                       return true
+               }
+       }
+       return false
+}
+
+func (p *Peer) isSPVNode() bool {
+       return !p.services.IsEnable(consensus.SFFullNode)
+}
+
+func (p *Peer) markBlock(hash *bc.Hash) {
+       p.mtx.Lock()
+       defer p.mtx.Unlock()
+
+       for p.knownBlocks.Size() >= maxKnownBlocks {
+               p.knownBlocks.Pop()
+       }
+       p.knownBlocks.Add(hash.String())
+}
+
+func (p *Peer) markNewStatus(height uint64) {
+       p.mtx.Lock()
+       defer p.mtx.Unlock()
+
+       p.knownStatus = height
+}
+
+func (p *Peer) markTransaction(hash *bc.Hash) {
+       p.mtx.Lock()
+       defer p.mtx.Unlock()
+
+       for p.knownTxs.Size() >= maxKnownTxs {
+               p.knownTxs.Pop()
+       }
+       p.knownTxs.Add(hash.String())
+}
+
+//func (p *Peer) sendStatus(header *types.BlockHeader) error {
+//     msg := NewStatusMessage(header)
+//     if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+//             return errSendStatusMsg
+//     }
+//     p.markNewStatus(header.Height)
+//     return nil
+//}
+
+func (p *Peer) setStatus(height uint64, hash *bc.Hash) {
+       p.mtx.Lock()
+       defer p.mtx.Unlock()
+       p.height = height
+       p.hash = hash
+}
diff --git a/netsync/peers/peer_set.go b/netsync/peers/peer_set.go
new file mode 100644 (file)
index 0000000..7db0cf4
--- /dev/null
@@ -0,0 +1,149 @@
+package peers
+
+import (
+       "sync"
+
+       log "github.com/sirupsen/logrus"
+
+       "github.com/vapor/consensus"
+       "github.com/vapor/errors"
+       "github.com/vapor/protocol/bc"
+)
+
+/*
+//添加删除查询节点
+AddPeer(peer BasePeer)
+RemovePeer(peerID string)
+GetPeer(id string) *peer
+BestPeerInfo(flag consensus.ServiceFlag) (string, uint64)
+GetPeerInfo(peerID string) *PeerInfo
+GetPeerInfos() []*PeerInfo
+SetStatus(peerID string, height uint64, hash *bc.Hash)
+
+//节点错误处理
+AddBanScore(peerID string, persistent, transient uint32, reason string)
+ErrorHandler(peerID string, err error)
+*/
+
+//BasePeerSet is the intergace for connection level peer manager
+type BasePeerSet interface {
+       AddBannedPeer(string) error
+       StopPeerGracefully(string)
+}
+
+type PeerSet struct {
+       BasePeerSet
+       mtx   sync.RWMutex
+       peers map[string]*Peer
+}
+
+// NewPeerSet creates a new Peer set to track the active participants.
+func NewPeerSet(basePeerSet BasePeerSet) *PeerSet {
+       return &PeerSet{
+               BasePeerSet: basePeerSet,
+               peers:       make(map[string]*Peer),
+       }
+}
+
+func (ps *PeerSet) AddBanScore(peerID string, persistent, transient uint32, reason string) {
+       ps.mtx.Lock()
+       peer := ps.peers[peerID]
+       ps.mtx.Unlock()
+
+       if peer == nil {
+               return
+       }
+       if ban := peer.addBanScore(persistent, transient, reason); !ban {
+               return
+       }
+       if err := ps.AddBannedPeer(peer.Addr().String()); err != nil {
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on add ban peer")
+       }
+       ps.RemovePeer(peerID)
+}
+
+func (ps *PeerSet) AddPeer(peer BasePeer) {
+       ps.mtx.Lock()
+       defer ps.mtx.Unlock()
+
+       if _, ok := ps.peers[peer.ID()]; !ok {
+               ps.peers[peer.ID()] = newPeer(peer)
+               return
+       }
+       log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
+}
+
+func (ps *PeerSet) BestPeerInfo(flag consensus.ServiceFlag) (string, uint64) {
+       ps.mtx.RLock()
+       defer ps.mtx.RUnlock()
+
+       var bestPeer *Peer
+       for _, p := range ps.peers {
+               if !p.services.IsEnable(flag) {
+                       continue
+               }
+               if bestPeer == nil || p.height > bestPeer.height || (p.height == bestPeer.height && p.IsLAN()) {
+                       bestPeer = p
+               }
+       }
+       if bestPeer == nil {
+               return "", 0
+       }
+       return bestPeer.ID(), bestPeer.bestHeight()
+}
+
+func (ps *PeerSet) ErrorHandler(peerID string, err error) {
+       if errors.Root(err) == ErrPeerMisbehave {
+               ps.AddBanScore(peerID, 20, 0, err.Error())
+       } else {
+               ps.RemovePeer(peerID)
+       }
+}
+
+// Peer retrieves the registered peer with the given id.
+func (ps *PeerSet) GetPeer(id string) *Peer {
+       ps.mtx.RLock()
+       defer ps.mtx.RUnlock()
+       return ps.peers[id]
+}
+
+func (ps *PeerSet) GetPeerInfo(peerID string) *PeerInfo {
+       peer := ps.GetPeer(peerID)
+       if peer == nil {
+               return nil
+       }
+
+       return peer.getPeerInfo()
+}
+
+func (ps *PeerSet) GetPeerInfos() []*PeerInfo {
+       ps.mtx.RLock()
+       defer ps.mtx.RUnlock()
+
+       result := []*PeerInfo{}
+       for _, peer := range ps.peers {
+               result = append(result, peer.getPeerInfo())
+       }
+       return result
+}
+
+func (ps *PeerSet) RemovePeer(peerID string) {
+       peer := ps.GetPeer(peerID)
+       if peer == nil {
+               return
+       }
+
+       ps.mtx.Lock()
+       delete(ps.peers, peerID)
+       ps.mtx.Unlock()
+       ps.StopPeerGracefully(peerID)
+}
+
+func (ps *PeerSet) SetStatus(peerID string, height uint64, hash *bc.Hash) {
+       peer := ps.GetPeer(peerID)
+       if peer == nil {
+               return
+       }
+
+       peer.setStatus(height, hash)
+}
diff --git a/netsync/peers/transfer.go b/netsync/peers/transfer.go
new file mode 100644 (file)
index 0000000..a4e78ab
--- /dev/null
@@ -0,0 +1,225 @@
+package peers
+
+import (
+       "reflect"
+
+       log "github.com/sirupsen/logrus"
+
+       "github.com/vapor/protocol/bc"
+       "github.com/vapor/protocol/bc/types"
+)
+
+//防止重传
+//MarkBlock(peerID string, hash *bc.Hash)
+//MarkTx(peerID string, hash *bc.Hash)
+//peersWithoutBlock(hash *bc.Hash) []*peer
+//peersWithoutNewStatus(height uint64) []*peer
+//peersWithoutTx(hash *bc.Hash) []*peer
+
+//过滤有效交易
+//AddFilterAddresses(peerID string, addresses [][]byte)
+//ClearFilterAdds(peerID string)
+//FilterValidTxs(peerID string, txs []*types.Tx)
+//GetRelatedTxAndStatus(peerID string, txs []*types.Tx, txStatuses *bc.TransactionStatus)
+
+//传输
+//BroadcastMinedBlock(block *types.Block)
+//BroadcastNewStatus(bestBlock *types.Block) error
+//BroadcastTx(tx *types.Tx) error
+//SendMsg(peerID string, msgChannel byte, msg interface{})
+
+func (ps *PeerSet) AddFilterAddresses(peerID string, addresses [][]byte) {
+       peer := ps.GetPeer(peerID)
+       if peer == nil {
+               return
+       }
+       peer.addFilterAddresses(addresses)
+}
+
+type BroadcastMsg interface {
+       Filter(ps *PeerSet) []string
+       Mark(ps *PeerSet, peers []string)
+       GetChan() byte
+       GetMsg() interface{}
+       MsgString() string
+}
+
+func (ps *PeerSet) BroadcastMsg(bm BroadcastMsg) error {
+       peers := bm.Filter(ps)
+       peersSuccess := make([]string, 0)
+       for _, peer := range peers {
+               if ok := ps.SendMsg(peer, bm.GetChan(), bm.GetMsg()); !ok {
+                       log.WithFields(log.Fields{"module": logModule, "peer": peer, "type": reflect.TypeOf(bm.GetMsg()), "message": bm.MsgString()}).Warning("send message to peer error")
+                       continue
+               }
+               peersSuccess = append(peersSuccess, peer)
+       }
+       bm.Mark(ps, peersSuccess)
+       return nil
+}
+
+//func (ps *PeerSet) BroadcastMinedBlock(block *types.Block) error {
+//     msg, err := NewMinedBlockMessage(block)
+//     if err != nil {
+//             return errors.Wrap(err, "fail on broadcast mined block")
+//     }
+//
+//     hash := block.Hash()
+//     peers := ps.peersWithoutBlock(&hash)
+//     for _, peer := range peers {
+//             if peer.isSPVNode() {
+//                     continue
+//             }
+//
+//             if ok := ps.SendMsg(peer.ID(), BlockchainChannel, msg); !ok {
+//                     log.WithFields(log.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Warning("send message to peer error")
+//                     continue
+//             }
+//
+//             peer.markBlock(&hash)
+//             peer.markNewStatus(block.Height)
+//     }
+//     return nil
+//}
+
+//func (ps *PeerSet) BroadcastNewStatus(bestBlock *types.Block) error {
+//     msg := NewStatusMessage(&bestBlock.BlockHeader)
+//     peers := ps.peersWithoutNewStatus(bestBlock.Height)
+//     for _, peer := range peers {
+//             if ok := ps.SendMsg(peer.ID(), BlockchainChannel, msg); !ok {
+//                     log.WithFields(log.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Warning("send message to peer error")
+//                     continue
+//             }
+//             peer.markNewStatus(bestBlock.Height)
+//     }
+//     return nil
+//}
+
+//func (ps *PeerSet) BroadcastTx(tx *types.Tx) error {
+//     msg, err := NewTransactionMessage(tx)
+//     if err != nil {
+//             return errors.Wrap(err, "fail on broadcast tx")
+//     }
+//
+//     peers := ps.peersWithoutTx(&tx.ID)
+//     for _, peer := range peers {
+//             if peer.isSPVNode() && !peer.isRelatedTx(tx) {
+//                     continue
+//             }
+//             if ok := ps.SendMsg(peer.ID(), BlockchainChannel, msg); !ok {
+//                     log.WithFields(log.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Warning("send message to peer error")
+//                     continue
+//             }
+//             peer.markTransaction(&tx.ID)
+//     }
+//     return nil
+//}
+
+func (ps *PeerSet) ClearFilterAdds(peerID string) {
+       peer := ps.GetPeer(peerID)
+       if peer == nil {
+               return
+       }
+       peer.filterAdds.Clear()
+}
+
+func (ps *PeerSet) FilterValidTxs(peerID string, txs []*types.Tx) []*types.Tx {
+       validTxs := make([]*types.Tx, 0, len(txs))
+       peer := ps.GetPeer(peerID)
+       if peer == nil {
+               return validTxs
+       }
+
+       for _, tx := range txs {
+               if peer.isSPVNode() && !peer.isRelatedTx(tx) || peer.knownTxs.Has(tx.ID.String()) {
+                       continue
+               }
+
+               validTxs = append(validTxs, tx)
+       }
+       return validTxs
+}
+
+func (ps *PeerSet) GetRelatedTxAndStatus(peerID string, txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
+       peer := ps.GetPeer(peerID)
+       if peer == nil {
+               return nil, nil
+       }
+       return peer.getRelatedTxAndStatus(txs, txStatuses)
+}
+
+func (ps *PeerSet) MarkBlock(peerID string, hash *bc.Hash) {
+       peer := ps.GetPeer(peerID)
+       if peer == nil {
+               return
+       }
+       peer.markBlock(hash)
+}
+
+func (ps *PeerSet) MarkStatus(peerID string, height uint64) {
+       peer := ps.GetPeer(peerID)
+       if peer == nil {
+               return
+       }
+       peer.markNewStatus(height)
+}
+
+func (ps *PeerSet) MarkTx(peerID string, hash *bc.Hash) {
+       peer := ps.GetPeer(peerID)
+       if peer == nil {
+               return
+       }
+       peer.markTransaction(hash)
+}
+
+func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string {
+       ps.mtx.RLock()
+       defer ps.mtx.RUnlock()
+
+       var peers []string
+       for _, peer := range ps.peers {
+               if !peer.knownBlocks.Has(hash.String()) {
+                       peers = append(peers, peer.ID())
+               }
+       }
+       return peers
+}
+
+func (ps *PeerSet) PeersWithoutNewStatus(height uint64) []string {
+       ps.mtx.RLock()
+       defer ps.mtx.RUnlock()
+
+       var peers []string
+       for _, peer := range ps.peers {
+               if peer.knownStatus < height {
+                       peers = append(peers, peer.ID())
+               }
+       }
+       return peers
+}
+
+func (ps *PeerSet) PeersWithoutTx(hash bc.Hash) []string {
+       ps.mtx.RLock()
+       defer ps.mtx.RUnlock()
+
+       var peers []string
+       for _, peer := range ps.peers {
+               if !peer.knownTxs.Has(hash.String()) {
+                       peers = append(peers, peer.ID())
+               }
+       }
+       return peers
+}
+
+func (ps *PeerSet) SendMsg(peerID string, msgChannel byte, msg interface{}) bool {
+       peer := ps.GetPeer(peerID)
+       if peer == nil {
+               return false
+       }
+
+       ok := peer.TrySend(msgChannel, msg)
+       if !ok {
+               ps.RemovePeer(peerID)
+       }
+       return ok
+}
index 8a6c610..66784aa 100644 (file)
@@ -1,8 +1,6 @@
 package netsync
 
 import (
-       "time"
-
        log "github.com/sirupsen/logrus"
 
        "github.com/vapor/errors"
@@ -10,29 +8,21 @@ import (
        "github.com/vapor/p2p/connection"
 )
 
-const (
-       handshakeTimeout    = 10 * time.Second
-       handshakeCheckPerid = 500 * time.Millisecond
-)
-
 var (
-       errProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
-       errStatusRequest            = errors.New("Status request error")
+       errSendStatusMsg = errors.New("Status msg send error")
 )
 
 //ProtocolReactor handles new coming protocol message.
 type ProtocolReactor struct {
        p2p.BaseReactor
 
-       sm    *SyncManager
-       peers *peerSet
+       sm *SyncManager
 }
 
 // NewProtocolReactor returns the reactor of whole blockchain.
-func NewProtocolReactor(sm *SyncManager, peers *peerSet) *ProtocolReactor {
+func NewProtocolReactor(sm *SyncManager) *ProtocolReactor {
        pr := &ProtocolReactor{
-               sm:    sm,
-               peers: peers,
+               sm: sm,
        }
        pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
        return pr
@@ -63,8 +53,8 @@ func (pr *ProtocolReactor) OnStop() {
 // AddPeer implements Reactor by sending our state to peer.
 func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
        pr.sm.AddPeer(peer)
-       if err := pr.sm.SendStatus(peer); err != nil {
-               return err
+       if ok := pr.sm.SendStatus(peer.ID()); !ok {
+               return errSendStatusMsg
        }
        pr.sm.syncTransactions(peer.Key)
        return nil
@@ -72,7 +62,7 @@ func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
 
 // RemovePeer implements Reactor by removing peer from the pool.
 func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
-       pr.peers.removePeer(peer.Key)
+       pr.sm.RemovePeer(peer)
 }
 
 // Receive implements Reactor by handling 4 types of messages (look below).
@@ -83,5 +73,5 @@ func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
                return
        }
 
-       pr.sm.processMsg(src, msgType, msg)
+       pr.sm.processMsg(src.ID(), msgType, msg)
 }
index e817930..4a02580 100644 (file)
@@ -9,6 +9,7 @@ import (
        "github.com/tendermint/tmlibs/flowrate"
 
        "github.com/vapor/consensus"
+       "github.com/vapor/netsync/peers"
        "github.com/vapor/protocol/bc"
        "github.com/vapor/protocol/bc/types"
        "github.com/vapor/test/mock"
@@ -65,8 +66,8 @@ func (p *P2PPeer) TrySend(b byte, msg interface{}) bool {
        if p.async {
                p.msgCh <- msgBytes
        } else {
-               msgType, msg, _ := DecodeMessage(msgBytes)
-               p.remoteNode.processMsg(p.srcPeer, msgType, msg)
+               msgType, msg, _ := message.DecodeMessage(msgBytes)
+               p.remoteNode.processMsg(p.srcPeer.ID(), msgType, msg)
        }
        return true
 }
@@ -77,8 +78,8 @@ func (p *P2PPeer) setAsync(b bool) {
 
 func (p *P2PPeer) postMan() {
        for msgBytes := range p.msgCh {
-               msgType, msg, _ := DecodeMessage(msgBytes)
-               p.remoteNode.processMsg(p.srcPeer, msgType, msg)
+               msgType, msg, _ := message.DecodeMessage(msgBytes)
+               p.remoteNode.processMsg(p.srcPeer.ID(), msgType, msg)
        }
 }
 
@@ -151,7 +152,7 @@ func mockBlocks(startBlock *types.Block, height uint64) []*types.Block {
 
 func mockSync(blocks []*types.Block) *SyncManager {
        chain := mock.NewChain()
-       peers := newPeerSet(NewPeerSet())
+       peers := peers.NewPeerSet(NewPeerSet())
        chain.SetBestBlockHeader(&blocks[len(blocks)-1].BlockHeader)
        for _, block := range blocks {
                chain.SetBlockByHeight(block.Height, block)
index 5b95ba9..ce6bbcb 100644 (file)
@@ -6,6 +6,7 @@ import (
 
        log "github.com/sirupsen/logrus"
 
+       "github.com/vapor/errors"
        core "github.com/vapor/protocol"
        "github.com/vapor/protocol/bc/types"
 )
@@ -22,15 +23,21 @@ type txSyncMsg struct {
 }
 
 func (sm *SyncManager) syncTransactions(peerID string) {
-       pending := sm.txPool.GetTransactions()
-       if len(pending) == 0 {
+       txDescs := sm.txPool.GetTransactions()
+       if len(txDescs) == 0 {
                return
        }
-
-       txs := make([]*types.Tx, len(pending))
-       for i, batch := range pending {
+       //*TxDesc slice -> *types.Tx slice
+       txs := make([]*types.Tx, len(txDescs))
+       for i, batch := range txDescs {
                txs[i] = batch.Tx
        }
+       //get valid txs
+       validTxs := sm.peers.FilterValidTxs(peerID, txs)
+       if len(validTxs) == 0 {
+               return
+       }
+
        sm.txSyncCh <- &txSyncMsg{peerID, txs}
 }
 
@@ -50,7 +57,8 @@ func (sm *SyncManager) txBroadcastLoop() {
                        }
 
                        if ev.TxMsg.MsgType == core.MsgNewTx {
-                               if err := sm.peers.broadcastTx(ev.TxMsg.Tx); err != nil {
+                               txMsg, _ := newTxBroadcastMsg(ev.TxMsg.Tx, BlockchainChannel)
+                               if err := sm.peers.BroadcastMsg(txMsg); err != nil {
                                        log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast new tx.")
                                        continue
                                }
@@ -72,7 +80,7 @@ func (sm *SyncManager) txSyncLoop() {
 
        // send starts a sending a pack of transactions from the sync.
        send := func(msg *txSyncMsg) {
-               peer := sm.peers.getPeer(msg.peerID)
+               peer := sm.peers.GetPeer(msg.peerID)
                if peer == nil {
                        delete(pending, msg.peerID)
                        return
@@ -100,9 +108,16 @@ func (sm *SyncManager) txSyncLoop() {
                }).Debug("txSyncLoop sending transactions")
                sending = true
                go func() {
-                       err := peer.sendTransactions(sendTxs)
+                       txsMsg, err := NewTransactionsMessage(sendTxs)
                        if err != nil {
-                               sm.peers.removePeer(msg.peerID)
+                               log.WithFields(log.Fields{"module": logModule, "error": err}).Debug("failed create transactions msg")
+                               done <- err
+                               return
+                       }
+
+                       if ok := sm.peers.SendMsg(msg.peerID, BlockchainChannel, txsMsg); !ok {
+                               sm.peers.RemovePeer(msg.peerID)
+                               err = errors.New("send txsMsg err")
                        }
                        done <- err
                }()
diff --git a/test.go b/test.go
new file mode 100644 (file)
index 0000000..0288019
--- /dev/null
+++ b/test.go
@@ -0,0 +1 @@
+package vapor