"github.com/vapor/net/http/httpjson"
"github.com/vapor/net/http/static"
"github.com/vapor/net/websocket"
- "github.com/vapor/netsync"
+ "github.com/vapor/netsync/peers"
"github.com/vapor/p2p"
"github.com/vapor/protocol"
"github.com/vapor/wallet"
IsCaughtUp() bool
PeerCount() int
GetNetwork() string
- BestPeer() *netsync.PeerInfo
+ BestPeer() *peers.PeerInfo
DialPeerWithAddress(addr *p2p.NetAddress) error
- GetPeerInfos() []*netsync.PeerInfo
+ GetPeerInfos() []*peers.PeerInfo
StopPeer(peerID string) error
}
"net"
"github.com/vapor/errors"
- "github.com/vapor/netsync"
+ "github.com/vapor/netsync/peers"
"github.com/vapor/p2p"
"github.com/vapor/version"
)
}
// return the currently connected peers with net address
-func (a *API) getPeerInfoByAddr(addr string) *netsync.PeerInfo {
+func (a *API) getPeerInfoByAddr(addr string) *peers.PeerInfo {
peerInfos := a.sync.GetPeerInfos()
for _, peerInfo := range peerInfos {
if peerInfo.RemoteAddr == addr {
}
// connect peer b y net address
-func (a *API) connectPeerByIpAndPort(ip string, port uint16) (*netsync.PeerInfo, error) {
+func (a *API) connectPeerByIpAndPort(ip string, port uint16) (*peers.PeerInfo, error) {
netIp := net.ParseIP(ip)
if netIp == nil {
return nil, errors.New("invalid ip address")
--- /dev/null
+package bbft
+
+import (
+ "net"
+
+ log "github.com/sirupsen/logrus"
+
+ "github.com/vapor/consensus"
+)
+
+type consensusManger struct {
+ peers *peerSet
+}
+
+//BasePeer is the interface for connection level peer
+type BasePeer interface {
+ Addr() net.Addr
+ ID() string
+ ServiceFlag() consensus.ServiceFlag
+ TrySend(byte, interface{}) bool
+ IsLAN() bool
+}
+
+func (cm *consensusManger) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
+ peer := cm.peers.getPeer(basePeer.ID())
+ if peer == nil {
+ return
+ }
+
+ log.WithFields(log.Fields{
+ "module": logModule,
+ "peer": basePeer.Addr(),
+ "type": reflect.TypeOf(msg),
+ "message": msg.String(),
+ }).Info("receive message from peer")
+
+ switch msg := msg.(type) {
+ case *BlockProposeMessage:
+ cm.handleBlockProposeMsg(peer, msg)
+
+ case *BlockSigMessage:
+ cm.handleBlockSigMsg(peer, msg)
+
+ default:
+ log.WithFields(log.Fields{
+ "module": logModule,
+ "peer": basePeer.Addr(),
+ "message_type": reflect.TypeOf(msg),
+ }).Error("unhandled message type")
+ }
+}
+
+func (cm *consensusManger) handleBlockProposeMsg(peer *peer, msg *BlockProposeMessage) {
+}
+
+func (cm *consensusManger) handleBlockSigMsg(peer *peer, msg *BlockSigMessage) {
+}
--- /dev/null
+package bbft
+
+import (
+ "bytes"
+ "errors"
+
+ "github.com/tendermint/go-wire"
+)
+
+//Consensus msg byte
+const (
+ ConsensusChannel = byte(0x50)
+
+ BlockSigByte = byte(0x10)
+ BlockProposeByte = byte(0x11)
+
+ maxBlockchainResponseSize = 22020096 + 2
+)
+
+//BlockchainMessage is a generic message for this reactor.
+type ConsensusMessage interface {
+ String() string
+}
+
+var _ = wire.RegisterInterface(
+ struct{ ConsensusMessage }{},
+ wire.ConcreteType{&BlockSigMessage{}, BlockSigByte},
+ wire.ConcreteType{&BlockProposeMessage{}, BlockProposeByte},
+)
+
+//DecodeMessage decode msg
+func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
+ msgType = bz[0]
+ n := int(0)
+ r := bytes.NewReader(bz)
+ msg = wire.ReadBinary(struct{ ConsensusMessage }{}, r, maxBlockchainResponseSize, &n, &err).(struct{ ConsensusMessage }).ConsensusMessage
+ if err != nil && n != len(bz) {
+ err = errors.New("DecodeMessage() had bytes left over")
+ }
+ return
+}
+
+type BlockSigMessage struct {
+}
+
+func (bs *BlockSigMessage) String() string {
+ return ""
+}
+
+type BlockProposeMessage struct {
+}
+
+func (bp *BlockProposeMessage) String() string {
+ return ""
+}
--- /dev/null
+package bbft
+
+import (
+ log "github.com/sirupsen/logrus"
+
+ "github.com/vapor/p2p"
+ "github.com/vapor/p2p/connection"
+)
+
+const logModule = "bbft"
+
+type ConsensusReactor struct {
+ p2p.BaseReactor
+ c *consensus
+ peers *peerSet
+}
+
+func NewConsensusReactor(peers *peerSet) *ConsensusReactor {
+ cr := &ConsensusReactor{
+ peers: peers,
+ }
+ cr.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", cr)
+ return cr
+}
+
+// GetChannels implements Reactor
+func (cr *ConsensusReactor) GetChannels() []*connection.ChannelDescriptor {
+ return []*connection.ChannelDescriptor{
+ {
+ ID: ConsensusChannel,
+ Priority: 10,
+ SendQueueCapacity: 100,
+ },
+ }
+}
+
+// OnStart implements BaseService
+func (cr *ConsensusReactor) OnStart() error {
+ cr.BaseReactor.OnStart()
+ return nil
+}
+
+// OnStop implements BaseService
+func (cr *ConsensusReactor) OnStop() {
+ cr.BaseReactor.OnStop()
+}
+
+// AddPeer implements Reactor by sending our state to peer.
+func (cr *ConsensusReactor) AddPeer(peer *p2p.Peer) error {
+ return nil
+}
+
+// RemovePeer implements Reactor by removing peer from the pool.
+func (cr *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
+}
+
+// Receive implements Reactor by handling 4 types of messages (look below).
+func (cr *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
+ msgType, msg, err := DecodeMessage(msgBytes)
+ if err != nil {
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on reactor decoding message")
+ return
+ }
+
+ cr.c.processMsg(src, msgType, msg)
+}
log "github.com/sirupsen/logrus"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
+ "github.com/vapor/netsync/peers"
"github.com/vapor/protocol/bc"
)
// and scheduling them for retrieval.
type blockFetcher struct {
chain Chain
- peers *peerSet
+ peers *peers.PeerSet
newBlockCh chan *blockMsg
queue *prque.Prque
}
//NewBlockFetcher creates a block fetcher to retrieve blocks of the new mined.
-func newBlockFetcher(chain Chain, peers *peerSet) *blockFetcher {
+func newBlockFetcher(chain Chain, peers *peers.PeerSet) *blockFetcher {
f := &blockFetcher{
chain: chain,
peers: peers,
func (f *blockFetcher) insert(msg *blockMsg) {
isOrphan, err := f.chain.ProcessBlock(msg.block)
if err != nil {
- peer := f.peers.getPeer(msg.peerID)
- if peer == nil {
- return
- }
-
- f.peers.addBanScore(msg.peerID, 20, 0, err.Error())
+ f.peers.AddBanScore(msg.peerID, 20, 0, err.Error())
return
}
return
}
- if err := f.peers.broadcastMinedBlock(msg.block); err != nil {
+ minedMsg, _ := newMinedBlockBroadcastMsg(msg.block, BlockchainChannel)
+ if err := f.peers.BroadcastMsg(minedMsg); err != nil {
+ //if err := f.peers.BroadcastMinedBlock(msg.block); err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockFetcher fail on broadcast new block")
return
}
-
- if err := f.peers.broadcastNewStatus(msg.block); err != nil {
+ headMsg,_:=newStatusBroadcastMsg(&msg.block.BlockHeader,BlockchainChannel)
+ if err := f.peers.BroadcastMsg(headMsg); err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Error("blockFetcher fail on broadcast new status")
return
}
"github.com/vapor/consensus"
"github.com/vapor/errors"
+ "github.com/vapor/netsync/peers"
"github.com/vapor/protocol/bc"
"github.com/vapor/protocol/bc/types"
)
errAppendHeaders = errors.New("fail to append list due to order dismatch")
errRequestTimeout = errors.New("request timeout")
errPeerDropped = errors.New("Peer dropped")
- errPeerMisbehave = errors.New("peer is misbehave")
)
type blockMsg struct {
type blockKeeper struct {
chain Chain
- peers *peerSet
+ peers *peers.PeerSet
- syncPeer *peer
+ syncPeerID string
blockProcessCh chan *blockMsg
blocksProcessCh chan *blocksMsg
headersProcessCh chan *headersMsg
headerList *list.List
}
-func newBlockKeeper(chain Chain, peers *peerSet) *blockKeeper {
+func newBlockKeeper(chain Chain, peers *peers.PeerSet) *blockKeeper {
bk := &blockKeeper{
chain: chain,
peers: peers,
lastHeader := bk.headerList.Back().Value.(*types.BlockHeader)
for ; lastHeader.Hash() != checkPoint.Hash; lastHeader = bk.headerList.Back().Value.(*types.BlockHeader) {
if lastHeader.Height >= checkPoint.Height {
- return errors.Wrap(errPeerMisbehave, "peer is not in the checkpoint branch")
+ return errors.Wrap(peers.ErrPeerMisbehave, "peer is not in the checkpoint branch")
}
lastHash := lastHeader.Hash()
}
if len(headers) == 0 {
- return errors.Wrap(errPeerMisbehave, "requireHeaders return empty list")
+ return errors.Wrap(peers.ErrPeerMisbehave, "requireHeaders return empty list")
}
if err := bk.appendHeaderList(headers); err != nil {
}
if len(blocks) == 0 {
- return errors.Wrap(errPeerMisbehave, "requireBlocks return empty list")
+ return errors.Wrap(peers.ErrPeerMisbehave, "requireBlocks return empty list")
}
for _, block := range blocks {
}
func (bk *blockKeeper) requireBlock(height uint64) (*types.Block, error) {
- if ok := bk.syncPeer.getBlockByHeight(height); !ok {
+ //send get block msg
+ msg := struct{ BlockchainMessage }{NewGetBlockMessage(height, [32]byte{})}
+ if ok := bk.peers.SendMsg(bk.syncPeerID, BlockchainChannel, msg); !ok {
return nil, errPeerDropped
}
for {
select {
case msg := <-bk.blockProcessCh:
- if msg.peerID != bk.syncPeer.ID() {
+ if msg.peerID != bk.syncPeerID {
continue
}
if msg.block.Height != height {
}
func (bk *blockKeeper) requireBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
- if ok := bk.syncPeer.getBlocks(locator, stopHash); !ok {
+ //send get blocks msg
+ msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
+ if ok := bk.peers.SendMsg(bk.syncPeerID, BlockchainChannel, msg); !ok {
return nil, errPeerDropped
}
for {
select {
case msg := <-bk.blocksProcessCh:
- if msg.peerID != bk.syncPeer.ID() {
+ if msg.peerID != bk.syncPeerID {
continue
}
return msg.blocks, nil
}
func (bk *blockKeeper) requireHeaders(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.BlockHeader, error) {
- if ok := bk.syncPeer.getHeaders(locator, stopHash); !ok {
+ //send get headers msg
+ msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
+ if ok := bk.peers.SendMsg(bk.syncPeerID, BlockchainChannel, msg); !ok {
return nil, errPeerDropped
}
for {
select {
case msg := <-bk.headersProcessCh:
- if msg.peerID != bk.syncPeer.ID() {
+ if msg.peerID != bk.syncPeerID {
continue
}
return msg.headers, nil
func (bk *blockKeeper) startSync() bool {
checkPoint := bk.nextCheckpoint()
- peer := bk.peers.bestPeer(consensus.SFFastSync | consensus.SFFullNode)
- if peer != nil && checkPoint != nil && peer.Height() >= checkPoint.Height {
- bk.syncPeer = peer
+ bestPeerID, bestPeerHeight := bk.peers.BestPeerInfo(consensus.SFFastSync | consensus.SFFullNode)
+ if bestPeerID != "" && checkPoint != nil && bestPeerHeight >= checkPoint.Height {
+ bk.syncPeerID = bestPeerID
if err := bk.fastBlockSync(checkPoint); err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on fastBlockSync")
- bk.peers.errorHandler(peer.ID(), err)
+ bk.peers.ErrorHandler(bestPeerID, err)
return false
}
return true
}
blockHeight := bk.chain.BestBlockHeight()
- peer = bk.peers.bestPeer(consensus.SFFullNode)
- if peer != nil && peer.Height() > blockHeight {
- bk.syncPeer = peer
+ bestPeerID, bestPeerHeight = bk.peers.BestPeerInfo(consensus.SFFullNode)
+ if bestPeerID != "" && bestPeerHeight > blockHeight {
+ bk.syncPeerID = bestPeerID
targetHeight := blockHeight + maxBlockPerMsg
- if targetHeight > peer.Height() {
- targetHeight = peer.Height()
+ if targetHeight > bestPeerHeight {
+ targetHeight = bestPeerHeight
}
if err := bk.regularBlockSync(targetHeight); err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on regularBlockSync")
- bk.peers.errorHandler(peer.ID(), err)
+ bk.peers.ErrorHandler(bestPeerID, err)
return false
}
return true
if err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker get best block")
}
+ headMsg, _ := newStatusBroadcastMsg(&block.BlockHeader, BlockchainChannel)
- if err = bk.peers.broadcastNewStatus(block); err != nil {
+ if err = bk.peers.BroadcastMsg(headMsg); err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new status")
}
}
cfg "github.com/vapor/config"
"github.com/vapor/consensus"
"github.com/vapor/event"
+ "github.com/vapor/netsync/peers"
"github.com/vapor/p2p"
core "github.com/vapor/protocol"
"github.com/vapor/protocol/bc"
)
const (
- logModule = "netsync"
- maxTxChanSize = 10000
- maxFilterAddressSize = 50
- maxFilterAddressCount = 1000
+ logModule = "netsync"
+ txsMsgMaxTxNum = 1024
)
var (
txPool *core.TxPool
blockFetcher *blockFetcher
blockKeeper *blockKeeper
- peers *peerSet
+ peers *peers.PeerSet
txSyncCh chan *txSyncMsg
quitSync chan struct{}
txMsgSub *event.Subscription
}
-// CreateSyncManager create sync manager and set switch.
+// NewSyncManager create sync manager and set switch.
func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
sw, err := p2p.NewSwitch(config)
if err != nil {
return newSyncManager(config, sw, chain, txPool, dispatcher)
}
-//NewSyncManager create a sync manager
+//newSyncManager create a sync manager
func newSyncManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
genesisHeader, err := chain.GetHeaderByHeight(0)
if err != nil {
return nil, err
}
- peers := newPeerSet(sw)
+ peers := peers.NewPeerSet(sw)
manager := &SyncManager{
sw: sw,
genesisHash: genesisHeader.Hash(),
}
if !config.VaultMode {
- protocolReactor := NewProtocolReactor(manager, peers)
+ protocolReactor := NewProtocolReactor(manager)
manager.sw.AddReactor("PROTOCOL", protocolReactor)
}
return manager, nil
}
-func (sm *SyncManager) AddPeer(peer BasePeer) {
- sm.peers.addPeer(peer)
+//AddPeer add peer to SyncManager PeerSet
+func (sm *SyncManager) AddPeer(peer peers.BasePeer) {
+ sm.peers.AddPeer(peer)
}
//BestPeer return the highest p2p peerInfo
-func (sm *SyncManager) BestPeer() *PeerInfo {
- bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
- if bestPeer != nil {
- return bestPeer.getPeerInfo()
+func (sm *SyncManager) BestPeer() *peers.PeerInfo {
+ peerID, bestHeight := sm.peers.BestPeerInfo(consensus.SFFullNode)
+ if peerID == "" || bestHeight == 0 {
+ return nil
}
- return nil
+
+ return sm.peers.GetPeerInfo(peerID)
}
+//DialPeerWithAddress
func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error {
if sm.config.VaultMode {
return errVaultModeDialPeer
return sm.sw.DialPeerWithAddress(addr)
}
+//GetNetwork return chain id
func (sm *SyncManager) GetNetwork() string {
return sm.config.ChainID
}
//GetPeerInfos return peer info of all peers
-func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
- return sm.peers.getPeerInfos()
+func (sm *SyncManager) GetPeerInfos() []*peers.PeerInfo {
+ return sm.peers.GetPeerInfos()
}
//IsCaughtUp check wheather the peer finish the sync
func (sm *SyncManager) IsCaughtUp() bool {
- peer := sm.peers.bestPeer(consensus.SFFullNode)
- return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
+ bestPeerID, bestPeerHeight := sm.peers.BestPeerInfo(consensus.SFFullNode)
+ return bestPeerID == "" || bestPeerHeight <= sm.chain.BestBlockHeight()
+}
+
+//RemovePeer del peer from SyncManager PeerSet then disconnect with peer
+func (sm *SyncManager) RemovePeer(peer peers.BasePeer) {
+ sm.peers.RemovePeer(peer.ID())
}
//StopPeer try to stop peer by given ID
func (sm *SyncManager) StopPeer(peerID string) error {
- if peer := sm.peers.getPeer(peerID); peer == nil {
- return errors.New("peerId not exist")
- }
- sm.peers.removePeer(peerID)
+ sm.peers.RemovePeer(peerID)
return nil
}
-func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
+func (sm *SyncManager) handleBlockMsg(peerID string, msg *BlockMessage) {
block, err := msg.GetBlock()
if err != nil {
return
}
- sm.blockKeeper.processBlock(peer.ID(), block)
+ sm.blockKeeper.processBlock(peerID, block)
}
-func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
+func (sm *SyncManager) handleBlocksMsg(peerID string, msg *BlocksMessage) {
blocks, err := msg.GetBlocks()
if err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
return
}
- sm.blockKeeper.processBlocks(peer.ID(), blocks)
+ sm.blockKeeper.processBlocks(peerID, blocks)
}
-func (sm *SyncManager) handleFilterAddMsg(peer *peer, msg *FilterAddMessage) {
- peer.addFilterAddress(msg.Address)
+func (sm *SyncManager) handleFilterAddMsg(peerID string, msg *FilterAddMessage) {
+ var addresses [][]byte
+ addresses = append(addresses, msg.Address)
+ sm.peers.AddFilterAddresses(peerID, addresses)
+
}
-func (sm *SyncManager) handleFilterClearMsg(peer *peer) {
- peer.filterAdds.Clear()
+func (sm *SyncManager) handleFilterClearMsg(peerID string) {
+ sm.peers.ClearFilterAdds(peerID)
}
-func (sm *SyncManager) handleFilterLoadMsg(peer *peer, msg *FilterLoadMessage) {
- peer.addFilterAddresses(msg.Addresses)
+func (sm *SyncManager) handleFilterLoadMsg(peerID string, msg *FilterLoadMessage) {
+ sm.peers.AddFilterAddresses(peerID, msg.Addresses)
}
-func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
+func (sm *SyncManager) handleGetBlockMsg(peerID string, msg *GetBlockMessage) {
var block *types.Block
var err error
if msg.Height != 0 {
log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
return
}
-
- ok, err := peer.sendBlock(block)
- if !ok {
- sm.peers.removePeer(peer.ID())
- }
+ blockMsg, err := NewBlockMessage(block)
if err != nil {
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on create block message")
+ return
+ }
+ ok := sm.peers.SendMsg(peerID, BlockchainChannel, blockMsg)
+ if !ok {
log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
}
+ //todo:mark block
}
-func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
+func (sm *SyncManager) handleGetBlocksMsg(peerID string, msg *GetBlocksMessage) {
blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
if err != nil || len(blocks) == 0 {
return
for _, block := range blocks {
rawData, err := block.MarshalText()
if err != nil {
- log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("failed on handleGetBlocksMsg marshal block")
continue
}
- if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
+ if totalSize+len(rawData) > MaxBlockchainResponseSize/2 {
break
}
totalSize += len(rawData)
sendBlocks = append(sendBlocks, block)
}
-
- ok, err := peer.sendBlocks(sendBlocks)
- if !ok {
- sm.peers.removePeer(peer.ID())
- }
+ blocksMsg, err := NewBlocksMessage(blocks)
if err != nil {
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Warn("failed on create blocks msg")
+ return
+ }
+ ok := sm.peers.SendMsg(peerID, BlockchainChannel, blocksMsg)
+ if !ok {
log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
}
+ //todo: mark block
}
-func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
+func (sm *SyncManager) handleGetHeadersMsg(peerID string, msg *GetHeadersMessage) {
headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
if err != nil || len(headers) == 0 {
- log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("failed on handleGetHeadersMsg locateHeaders")
return
}
- ok, err := peer.sendHeaders(headers)
- if !ok {
- sm.peers.removePeer(peer.ID())
- }
+ headersMsg, err := NewHeadersMessage(headers)
if err != nil {
- log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Warn("fail on create headers msg")
+ return
+ }
+
+ ok := sm.peers.SendMsg(peerID, BlockchainChannel, headersMsg)
+
+ if !ok {
+ log.WithFields(log.Fields{"module": logModule}).Error("fail on handleGetHeadersMsg sentBlock")
}
}
-func (sm *SyncManager) handleGetMerkleBlockMsg(peer *peer, msg *GetMerkleBlockMessage) {
+func (sm *SyncManager) handleGetMerkleBlockMsg(peerID string, msg *GetMerkleBlockMessage) {
var err error
var block *types.Block
if msg.Height != 0 {
return
}
- ok, err := peer.sendMerkleBlock(block, txStatus)
- if err != nil {
- log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
+ merkleBlockMsg := NewMerkleBlockMessage()
+ if err := merkleBlockMsg.SetRawBlockHeader(block.BlockHeader); err != nil {
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg set block header")
return
}
+ relatedTxs, relatedStatuses := sm.peers.GetRelatedTxAndStatus(peerID, block.Transactions, txStatus)
+
+ txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
+ if err := merkleBlockMsg.SetTxInfo(txHashes, txFlags, relatedTxs); err != nil {
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg set tx info")
+ return
+ }
+
+ statusHashes := types.GetStatusMerkleTreeProof(txStatus.VerifyStatus, txFlags)
+ if err := merkleBlockMsg.SetStatusInfo(statusHashes, relatedStatuses); err != nil {
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg set status info")
+ return
+ }
+ ok := sm.peers.SendMsg(peerID, BlockchainChannel, merkleBlockMsg)
if !ok {
- sm.peers.removePeer(peer.ID())
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
}
}
-func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
+func (sm *SyncManager) handleHeadersMsg(peerID string, msg *HeadersMessage) {
headers, err := msg.GetHeaders()
if err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
return
}
- sm.blockKeeper.processHeaders(peer.ID(), headers)
+ sm.blockKeeper.processHeaders(peerID, headers)
}
-func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
+func (sm *SyncManager) handleMineBlockMsg(peerID string, msg *MineBlockMessage) {
block, err := msg.GetMineBlock()
if err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleMineBlockMsg GetMineBlock")
}
hash := block.Hash()
- peer.markBlock(&hash)
- sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
- peer.setStatus(block.Height, &hash)
+ sm.peers.MarkBlock(peerID, &hash)
+ sm.blockFetcher.processNewBlock(&blockMsg{peerID: peerID, block: block})
+ sm.peers.SetStatus(peerID, block.Height, &hash)
}
-func (sm *SyncManager) handleStatusMsg(basePeer BasePeer, msg *StatusMessage) {
- if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
- peer.setStatus(msg.Height, msg.GetHash())
- return
- }
+func (sm *SyncManager) handleStatusMsg(peerID string, msg *StatusMessage) {
+ sm.peers.SetStatus(peerID, msg.Height, msg.GetHash())
}
-func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
+func (sm *SyncManager) handleTransactionMsg(peerID string, msg *TransactionMessage) {
tx, err := msg.GetTransaction()
if err != nil {
- sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
+ sm.peers.AddBanScore(peerID, 0, 10, "fail on get tx from message")
return
}
if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
- sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
+ sm.peers.AddBanScore(peerID, 10, 0, "fail on validate tx transaction")
}
- sm.peers.markTx(peer.ID(), tx.ID)
}
-func (sm *SyncManager) handleTransactionsMsg(peer *peer, msg *TransactionsMessage) {
+func (sm *SyncManager) handleTransactionsMsg(peerID string, msg *TransactionsMessage) {
txs, err := msg.GetTransactions()
if err != nil {
- sm.peers.addBanScore(peer.ID(), 0, 20, "fail on get txs from message")
+ sm.peers.AddBanScore(peerID, 0, 20, "fail on get txs from message")
return
}
if len(txs) > txsMsgMaxTxNum {
- sm.peers.addBanScore(peer.ID(), 20, 0, "exceeded the maximum tx number limit")
+ sm.peers.AddBanScore(peerID, 20, 0, "exceeded the maximum tx number limit")
return
}
for _, tx := range txs {
if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && !isOrphan {
- sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
+ sm.peers.AddBanScore(peerID, 10, 0, "fail on validate tx transaction")
return
}
- sm.peers.markTx(peer.ID(), tx.ID)
+ sm.peers.MarkTx(peerID, &tx.ID)
}
}
+//IsListening
func (sm *SyncManager) IsListening() bool {
if sm.config.VaultMode {
return false
return len(sm.sw.Peers().List())
}
-func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
- peer := sm.peers.getPeer(basePeer.ID())
- if peer == nil {
+func (sm *SyncManager) processMsg(peerID string, msgType byte, msg BlockchainMessage) {
+ if sm.peers.GetPeer(peerID) == nil {
return
}
log.WithFields(log.Fields{
"module": logModule,
- "peer": basePeer.Addr(),
+ "peer": peerID,
"type": reflect.TypeOf(msg),
"message": msg.String(),
}).Info("receive message from peer")
switch msg := msg.(type) {
case *GetBlockMessage:
- sm.handleGetBlockMsg(peer, msg)
+ sm.handleGetBlockMsg(peerID, msg)
case *BlockMessage:
- sm.handleBlockMsg(peer, msg)
+ sm.handleBlockMsg(peerID, msg)
case *StatusMessage:
- sm.handleStatusMsg(basePeer, msg)
+ sm.handleStatusMsg(peerID, msg)
case *TransactionMessage:
- sm.handleTransactionMsg(peer, msg)
+ sm.handleTransactionMsg(peerID, msg)
case *TransactionsMessage:
- sm.handleTransactionsMsg(peer, msg)
+ sm.handleTransactionsMsg(peerID, msg)
case *MineBlockMessage:
- sm.handleMineBlockMsg(peer, msg)
+ sm.handleMineBlockMsg(peerID, msg)
case *GetHeadersMessage:
- sm.handleGetHeadersMsg(peer, msg)
+ sm.handleGetHeadersMsg(peerID, msg)
case *HeadersMessage:
- sm.handleHeadersMsg(peer, msg)
+ sm.handleHeadersMsg(peerID, msg)
case *GetBlocksMessage:
- sm.handleGetBlocksMsg(peer, msg)
+ sm.handleGetBlocksMsg(peerID, msg)
case *BlocksMessage:
- sm.handleBlocksMsg(peer, msg)
+ sm.handleBlocksMsg(peerID, msg)
case *FilterLoadMessage:
- sm.handleFilterLoadMsg(peer, msg)
+ sm.handleFilterLoadMsg(peerID, msg)
case *FilterAddMessage:
- sm.handleFilterAddMsg(peer, msg)
+ sm.handleFilterAddMsg(peerID, msg)
case *FilterClearMessage:
- sm.handleFilterClearMsg(peer)
+ sm.handleFilterClearMsg(peerID)
case *GetMerkleBlockMessage:
- sm.handleGetMerkleBlockMsg(peer, msg)
+ sm.handleGetMerkleBlockMsg(peerID, msg)
default:
log.WithFields(log.Fields{
"module": logModule,
- "peer": basePeer.Addr(),
+ "peerID": peerID,
"message_type": reflect.TypeOf(msg),
}).Error("unhandled message type")
}
}
-func (sm *SyncManager) SendStatus(peer BasePeer) error {
- p := sm.peers.getPeer(peer.ID())
- if p == nil {
- return errors.New("invalid peer")
- }
-
- if err := p.sendStatus(sm.chain.BestBlockHeader()); err != nil {
- sm.peers.removePeer(p.ID())
- return err
+func (sm *SyncManager) SendStatus(peerID string) bool {
+ msg := NewStatusMessage(sm.chain.BestBlockHeader())
+ ok := sm.peers.SendMsg(peerID, BlockchainChannel, msg)
+ if !ok {
+ log.WithFields(log.Fields{"module": logModule}).Error("fail on send status msg")
}
- return nil
+ //todo: mark status
+ return ok
}
func (sm *SyncManager) Start() error {
log.WithFields(log.Fields{"module": logModule}).Error("event type error")
continue
}
-
- if err := sm.peers.broadcastMinedBlock(&ev.Block); err != nil {
+ minedMsg, _ := newMinedBlockBroadcastMsg(&ev.Block, BlockchainChannel)
+ if err := sm.peers.BroadcastMsg(minedMsg); err != nil {
+ //if err := sm.peers.broadcastMinedBlock(&ev.Block); err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast mine block")
continue
}
"errors"
"fmt"
- "github.com/tendermint/go-wire"
+ wire "github.com/tendermint/go-wire"
"github.com/vapor/protocol/bc"
"github.com/vapor/protocol/bc/types"
MerkleRequestByte = byte(0x60)
MerkleResponseByte = byte(0x61)
- maxBlockchainResponseSize = 22020096 + 2
- txsMsgMaxTxNum = 1024
+ MaxBlockchainResponseSize = 22020096 + 2
)
//BlockchainMessage is a generic message for this reactor.
msgType = bz[0]
n := int(0)
r := bytes.NewReader(bz)
- msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, maxBlockchainResponseSize, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage
+ msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, MaxBlockchainResponseSize, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage
if err != nil && n != len(bz) {
err = errors.New("DecodeMessage() had bytes left over")
}
RawHash [32]byte
}
+func NewGetBlockMessage(height uint64,hash [32]byte) *GetBlockMessage {
+ return &GetBlockMessage{Height: height,RawHash:hash}
+}
+
//GetHash reutrn the hash of the request
func (m *GetBlockMessage) GetHash() *bc.Hash {
hash := bc.NewHash(m.RawHash)
Flags []byte
}
-func (m *MerkleBlockMessage) setRawBlockHeader(bh types.BlockHeader) error {
+func (m *MerkleBlockMessage) SetRawBlockHeader(bh types.BlockHeader) error {
rawHeader, err := bh.MarshalText()
if err != nil {
return err
return nil
}
-func (m *MerkleBlockMessage) setTxInfo(txHashes []*bc.Hash, txFlags []uint8, relatedTxs []*types.Tx) error {
+func (m *MerkleBlockMessage) SetTxInfo(txHashes []*bc.Hash, txFlags []uint8, relatedTxs []*types.Tx) error {
for _, txHash := range txHashes {
m.TxHashes = append(m.TxHashes, txHash.Byte32())
}
return nil
}
-func (m *MerkleBlockMessage) setStatusInfo(statusHashes []*bc.Hash, relatedStatuses []*bc.TxVerifyResult) error {
+func (m *MerkleBlockMessage) SetStatusInfo(statusHashes []*bc.Hash, relatedStatuses []*bc.TxVerifyResult) error {
for _, statusHash := range statusHashes {
m.StatusHashes = append(m.StatusHashes, statusHash.Byte32())
}
--- /dev/null
+package netsync
+
+import (
+ "github.com/vapor/netsync/peers"
+ "github.com/vapor/protocol/bc/types"
+)
+
+//type BroadcastProcess interface {
+// filter(ps *PeerSet) []*Peer
+// mark(*Peer)
+// getChan() byte
+// getMsg() interface{}
+// msgString() string
+//}
+
+type minedBlockBroadcastMsg struct {
+ block *types.Block
+ msg *MineBlockMessage
+ transChan byte
+}
+
+func newMinedBlockBroadcastMsg(block *types.Block, transChan byte) (*minedBlockBroadcastMsg, error) {
+ msg, err := NewMinedBlockMessage(block)
+ if err != nil {
+ return nil, err
+ }
+ return &minedBlockBroadcastMsg{block: block, msg: msg, transChan: transChan}, nil
+}
+
+func (m *minedBlockBroadcastMsg) GetChan() byte {
+ return m.transChan
+}
+
+func (m *minedBlockBroadcastMsg) GetMsg() interface{} {
+ return m.msg
+}
+
+func (m *minedBlockBroadcastMsg) MsgString() string {
+ return m.msg.String()
+}
+
+func (m *minedBlockBroadcastMsg) Mark(ps *peers.PeerSet, peers []string) {
+ hash := m.block.Hash()
+ height := m.block.Height
+ for _, peer := range peers {
+ ps.MarkBlock(peer, &hash)
+ ps.MarkStatus(peer, height)
+ }
+}
+
+func (m *minedBlockBroadcastMsg) Filter(ps *peers.PeerSet) []string {
+ //TODO: SPV NODE FILTER
+ return ps.PeersWithoutBlock(m.block.Hash())
+}
+
+type statusBroadcastMsg struct {
+ header *types.BlockHeader
+ msg *StatusMessage
+ transChan byte
+}
+
+func newStatusBroadcastMsg(header *types.BlockHeader, transChan byte) (*statusBroadcastMsg, error) {
+ msg := NewStatusMessage(header)
+ return &statusBroadcastMsg{header: header, msg: msg, transChan: transChan}, nil
+}
+
+func (s *statusBroadcastMsg) GetChan() byte {
+ return s.transChan
+}
+
+func (s *statusBroadcastMsg) GetMsg() interface{} {
+ return s.msg
+}
+
+func (s *statusBroadcastMsg) MsgString() string {
+ return s.msg.String()
+}
+
+func (s *statusBroadcastMsg) Filter(ps *peers.PeerSet) []string {
+ return ps.PeersWithoutNewStatus(s.header.Height)
+}
+
+func (s *statusBroadcastMsg) Mark(ps *peers.PeerSet, peers []string) {
+ height := s.header.Height
+ for _, peer := range peers {
+ ps.MarkStatus(peer, height)
+ }
+}
+
+type txBroadcastMsg struct {
+ tx *types.Tx
+ msg *TransactionMessage
+ transChan byte
+}
+
+func newTxBroadcastMsg(tx *types.Tx, transChan byte) (*txBroadcastMsg, error) {
+ msg, _ := NewTransactionMessage(tx)
+ return &txBroadcastMsg{tx: tx, msg: msg, transChan: transChan}, nil
+}
+
+func (t *txBroadcastMsg) GetChan() byte {
+ return t.transChan
+}
+
+func (t *txBroadcastMsg) GetMsg() interface{} {
+ return t.msg
+}
+
+func (t *txBroadcastMsg) MsgString() string {
+ return t.msg.String()
+}
+
+func (t *txBroadcastMsg) Filter(ps *peers.PeerSet) []string {
+ //TODO: if peer.isSPVNode() && !peer.isRelatedTx(tx) {
+ return ps.PeersWithoutTx(t.tx.ID)
+}
+
+func (t *txBroadcastMsg) Mark(ps *peers.PeerSet, peers []string) {
+ for _, peer := range peers {
+ ps.MarkTx(peer, &t.tx.ID)
+ }
+}
"testing"
"github.com/davecgh/go-spew/spew"
-
"github.com/vapor/consensus"
"github.com/vapor/protocol/bc"
"github.com/vapor/protocol/bc/types"
}
func TestTransactionMessage(t *testing.T) {
- for _, tx := range txs {
+ wantStrings := [5]string{
+ "{tx_size: 104, tx_hash: 8ec07dcaa0d8966ebf7d4483cff08cf2f91b7fd054d74b9b36e6701851b6987f}",
+ "{tx_size: 106, tx_hash: b36797fadd33aa38794d4fef61a13214e17cd5441aee38f8cbdfe5a7863ee5ff}",
+ "{tx_size: 108, tx_hash: bf93b35da3c58f26896657f7cab5034536b6ab70b3a32e1de07bff6f94318a94}",
+ "{tx_size: 108, tx_hash: 82ba46e4d13ea302977564de1ea4053e0820f0c1296d404a05ed0adc3e3bac38}",
+ "{tx_size: 108, tx_hash: fb6640b67c734e124e34ce65f52f877f192c36caad0830e312799cc337749bd9}",
+ }
+ for i, tx := range txs {
txMsg, err := NewTransactionMessage(tx)
if err != nil {
t.Fatalf("create tx msg err:%s", err)
if !reflect.DeepEqual(*tx.Tx, *gotTx.Tx) {
t.Errorf("txs msg test err: got %s\nwant %s", spew.Sdump(tx.Tx), spew.Sdump(gotTx.Tx))
}
+
+ if txMsg.String() != wantStrings[i] {
+ t.Errorf("index:%d txs msg string test err. got:%s want:%s", i, txMsg.String(), wantStrings[i])
+ }
}
}
t.Fatal("txs msg test err: number of txs not match ")
}
+ wantString := "{tx_num: 5}"
+ if txsMsg.String() != wantString {
+ t.Errorf("txs msg string test err. got:%s want:%s", txsMsg.String(), wantString)
+ }
+
for i, tx := range txs {
if !reflect.DeepEqual(tx.Tx, gotTxs[i].Tx) {
t.Errorf("txs msg test err: got %s\nwant %s", spew.Sdump(tx.Tx), spew.Sdump(gotTxs[i].Tx))
BlockHeader: types.BlockHeader{
Version: 1,
Height: 0,
- Timestamp: 1528945000000,
+ Timestamp: 1528945000,
BlockCommitment: types.BlockCommitment{
TransactionsMerkleRoot: bc.Hash{V0: uint64(0x11)},
TransactionStatusHash: bc.Hash{V0: uint64(0x55)},
t.Errorf("block msg test err: got %s\nwant %s", spew.Sdump(gotBlock.BlockHeader), spew.Sdump(testBlock.BlockHeader))
}
+ wantString := "{block_height: 0, block_hash: f59514e2541488a38bc2667940bc2c24027e4a3a371d884b55570d036997bb57}"
+ if blockMsg.String() != wantString {
+ t.Errorf("block msg test err. got:%s want:%s", blockMsg.String(), wantString)
+ }
+
blockMsg.RawBlock[1] = blockMsg.RawBlock[1] + 0x1
_, err = blockMsg.GetBlock()
if err == nil {
t.Fatalf("get mine block err")
}
+
+ wantString = "{err: wrong message}"
+ if blockMsg.String() != wantString {
+ t.Errorf("block msg test err. got:%s want:%s", blockMsg.String(), wantString)
+ }
}
var testHeaders = []*types.BlockHeader{
{
Version: 1,
Height: 0,
- Timestamp: 1528945000000,
+ Timestamp: 1528945000,
BlockCommitment: types.BlockCommitment{
TransactionsMerkleRoot: bc.Hash{V0: uint64(0x11)},
TransactionStatusHash: bc.Hash{V0: uint64(0x55)},
{
Version: 1,
Height: 1,
- Timestamp: 1528945000000,
+ Timestamp: 1528945000,
BlockCommitment: types.BlockCommitment{
TransactionsMerkleRoot: bc.Hash{V0: uint64(0x11)},
TransactionStatusHash: bc.Hash{V0: uint64(0x55)},
{
Version: 1,
Height: 3,
- Timestamp: 1528945000000,
+ Timestamp: 1528945000,
BlockCommitment: types.BlockCommitment{
TransactionsMerkleRoot: bc.Hash{V0: uint64(0x11)},
TransactionStatusHash: bc.Hash{V0: uint64(0x55)},
if !reflect.DeepEqual(gotHeaders, testHeaders) {
t.Errorf("headers msg test err: got %s\nwant %s", spew.Sdump(gotHeaders), spew.Sdump(testHeaders))
}
+
+ wantString := "{header_length: 3}"
+ if headersMsg.String() != wantString {
+ t.Errorf("headers msg test string err. got:%s want:%s", headersMsg.String(), wantString)
+ }
+
}
func TestGetBlockMessage(t *testing.T) {
- getBlockMsg := GetBlockMessage{RawHash: [32]byte{0x01}}
- gotHash := getBlockMsg.GetHash()
+ testCase := []struct {
+ height uint64
+ rawHash [32]byte
+ wantString string
+ }{
+ {
+ height: uint64(100),
+ rawHash: [32]byte{0x01},
+ wantString: "{height: 100}",
+ },
+ {
+ height: uint64(0),
+ rawHash: [32]byte{0x01},
+ wantString: "{hash: 0100000000000000000000000000000000000000000000000000000000000000}",
+ },
+ }
+ for i, c := range testCase {
+ getBlockMsg := NewGetBlockMessage(c.height, c.rawHash)
+ gotHash := getBlockMsg.GetHash()
+
+ if !reflect.DeepEqual(gotHash.Byte32(), c.rawHash) {
+ t.Errorf("index:%d test get block msg err. got: %s want: %s", i, spew.Sdump(gotHash.Byte32()), spew.Sdump(c.rawHash))
+ }
+
+ if getBlockMsg.Height != c.height {
+ t.Errorf("index:%d test get block msg err. got: %d want: %d", i, getBlockMsg.Height, c.height)
+ }
+ if getBlockMsg.String() != c.wantString {
+ t.Errorf("index:%d test get block msg string err. got: %s want: %s", i, getBlockMsg.String(), c.wantString)
+ }
+
+ }
+}
+
+type testGetBlocksMessage struct {
+ blockLocator []*bc.Hash
+ stopHash *bc.Hash
+}
+
+func TestGetBlocksMessage(t *testing.T) {
+ testMsg := testGetBlocksMessage{
+ blockLocator: []*bc.Hash{{V0: 0x01}, {V0: 0x02}, {V0: 0x03}},
+ stopHash: &bc.Hash{V0: 0xaa, V2: 0x55},
+ }
+
+ getBlocksMsg := NewGetBlocksMessage(testMsg.blockLocator, testMsg.stopHash)
+ gotBlockLocator := getBlocksMsg.GetBlockLocator()
+ gotStopHash := getBlocksMsg.GetStopHash()
+
+ if !reflect.DeepEqual(gotBlockLocator, testMsg.blockLocator) {
+ t.Errorf("get headers msg test err: got %s\nwant %s", spew.Sdump(gotBlockLocator), spew.Sdump(testMsg.blockLocator))
+ }
+
+ if !reflect.DeepEqual(gotStopHash, testMsg.stopHash) {
+ t.Errorf("get headers msg test err: got %s\nwant %s", spew.Sdump(gotStopHash), spew.Sdump(testMsg.stopHash))
+ }
- if !reflect.DeepEqual(gotHash.Byte32(), getBlockMsg.RawHash) {
- t.Errorf("get block msg test err: got %s\nwant %s", spew.Sdump(gotHash.Byte32()), spew.Sdump(getBlockMsg.RawHash))
+ wantString := "{stop_hash: 00000000000000aa000000000000000000000000000000550000000000000000}"
+ if getBlocksMsg.String() != wantString {
+ t.Errorf("get headers msg string test err: got:%s want:%s", getBlocksMsg.String(), wantString)
}
}
if !reflect.DeepEqual(testMsg.stopHash, gotStopHash) {
t.Errorf("get headers msg test err: got %s\nwant %s", spew.Sdump(gotStopHash), spew.Sdump(testMsg.stopHash))
}
+
+ wantString := "{stop_hash: 00000000000000aa000000000000000000000000000000550000000000000000}"
+ if getHeadersMsg.String() != wantString {
+ t.Errorf("get headers msg string test err: got:%s want:%s", getHeadersMsg.String(), wantString)
+ }
}
var testBlocks = []*types.Block{
BlockHeader: types.BlockHeader{
Version: 1,
Height: 0,
- Timestamp: 1528945000000,
+ Timestamp: 1528945000,
BlockCommitment: types.BlockCommitment{
TransactionsMerkleRoot: bc.Hash{V0: uint64(0x11)},
TransactionStatusHash: bc.Hash{V0: uint64(0x55)},
BlockHeader: types.BlockHeader{
Version: 1,
Height: 0,
- Timestamp: 1528945000000,
+ Timestamp: 1528945000,
BlockCommitment: types.BlockCommitment{
TransactionsMerkleRoot: bc.Hash{V0: uint64(0x11)},
TransactionStatusHash: bc.Hash{V0: uint64(0x55)},
t.Errorf("block msg test err: got %s\nwant %s", spew.Sdump(gotBlock.BlockHeader), spew.Sdump(testBlock.BlockHeader))
}
}
+
+ wantString := "{blocks_length: 2}"
+ if blocksMsg.String() != wantString {
+ t.Errorf("block msg string test err: got:%s want:%s", blocksMsg.String(), wantString)
+ }
}
func TestStatusMessage(t *testing.T) {
if !reflect.DeepEqual(*gotHash, testBlock.Hash()) {
t.Errorf("status response msg test err: got %s\nwant %s", spew.Sdump(*gotHash), spew.Sdump(testBlock.Hash()))
}
+
+ wantString := "{height: 0, hash: f59514e2541488a38bc2667940bc2c24027e4a3a371d884b55570d036997bb57}"
+ if statusResponseMsg.String() != wantString {
+ t.Errorf("status response msg string test err: got:%s want:%s", statusResponseMsg.String(), wantString)
+ }
+}
+
+func TestMinedBlockMessage(t *testing.T) {
+ blockMsg, err := NewMinedBlockMessage(testBlock)
+ if err != nil {
+ t.Fatalf("create new mine block msg err:%s", err)
+ }
+
+ gotBlock, err := blockMsg.GetMineBlock()
+ if err != nil {
+ t.Fatalf("got block err:%s", err)
+ }
+
+ if !reflect.DeepEqual(gotBlock.BlockHeader, testBlock.BlockHeader) {
+ t.Errorf("block msg test err: got %s\nwant %s", spew.Sdump(gotBlock.BlockHeader), spew.Sdump(testBlock.BlockHeader))
+ }
+
+ wantString := "{block_height: 0, block_hash: f59514e2541488a38bc2667940bc2c24027e4a3a371d884b55570d036997bb57}"
+ if blockMsg.String() != wantString {
+ t.Errorf("block msg test err. got:%s want:%s", blockMsg.String(), wantString)
+ }
+
+ blockMsg.RawBlock[1] = blockMsg.RawBlock[1] + 0x1
+ _, err = blockMsg.GetMineBlock()
+ if err == nil {
+ t.Fatalf("get mine block err")
+ }
+
+ wantString = "{err: wrong message}"
+ if blockMsg.String() != wantString {
+ t.Errorf("block msg test err. got:%s want:%s", blockMsg.String(), wantString)
+ }
+}
+
+func TestFilterLoadMessage(t *testing.T) {
+ filterLoadMsg := &FilterLoadMessage{
+ Addresses: [][]byte{{0x01}, {0x01, 0x02}},
+ }
+
+ wantString := "{addresses_length: 2}"
+ if filterLoadMsg.String() != wantString {
+ t.Errorf("filter load msg test err. got:%s want:%s", filterLoadMsg.String(), wantString)
+ }
+}
+
+func TestFilterAddMessage(t *testing.T) {
+ filterAddMessage := &FilterAddMessage{
+ Address: []byte{0x01, 0x02, 0x03},
+ }
+
+ wantString := "{address: 010203}"
+ if filterAddMessage.String() != wantString {
+ t.Errorf("filter add msg test err. got:%s want:%s", filterAddMessage.String(), wantString)
+ }
+}
+
+func TestFilterClearMessage(t *testing.T) {
+ filterClearMessage := &FilterClearMessage{}
+
+ wantString := "{}"
+ if filterClearMessage.String() != wantString {
+ t.Errorf("filter clear msg test err. got:%s want:%s", filterClearMessage.String(), wantString)
+ }
+}
+
+func TestGetMerkleBlockMessage(t *testing.T) {
+ testCase := []struct {
+ height uint64
+ rawHash [32]byte
+ wantString string
+ }{
+ {
+ height: uint64(100),
+ rawHash: [32]byte{0x01},
+ wantString: "{height: 100}",
+ },
+ {
+ height: uint64(0),
+ rawHash: [32]byte{0x01},
+ wantString: "{hash: 0100000000000000000000000000000000000000000000000000000000000000}",
+ },
+ }
+ for i, c := range testCase {
+ getMerkleBlockMsg := &GetMerkleBlockMessage{
+ Height: c.height,
+ RawHash: c.rawHash,
+ }
+ gotHash := getMerkleBlockMsg.GetHash()
+
+ if !reflect.DeepEqual(gotHash.Byte32(), c.rawHash) {
+ t.Errorf("index:%d test get merkle block msg err. got: %s want: %s", i, spew.Sdump(gotHash.Byte32()), spew.Sdump(c.rawHash))
+ }
+
+ if getMerkleBlockMsg.Height != c.height {
+ t.Errorf("index:%d test get merkle block msg err. got: %d want: %d", i, getMerkleBlockMsg.Height, c.height)
+ }
+ if getMerkleBlockMsg.String() != c.wantString {
+ t.Errorf("index:%d test get merkle block msg string err. got: %s want: %s", i, getMerkleBlockMsg.String(), c.wantString)
+ }
+ }
+}
+
+func TestMerkleBlockMessage(t *testing.T) {
+ blockHeader := types.BlockHeader{
+ Version: 1,
+ Height: 0,
+ Timestamp: 1528945000,
+ BlockCommitment: types.BlockCommitment{
+ TransactionsMerkleRoot: bc.Hash{V0: uint64(0x11)},
+ TransactionStatusHash: bc.Hash{V0: uint64(0x55)},
+ },
+ }
+ txHashes := []*bc.Hash{{V0: 123, V1: 234, V2: 345, V3: 456}}
+ txFlags := []uint8{0x1, 0x2}
+ relatedTxs := txs
+ statusHashes := []*bc.Hash{{V0: 123, V1: 234, V2: 345, V3: 456}}
+ relatedStatuses := []*bc.TxVerifyResult{{StatusFail: false}, {StatusFail: true}}
+ merkleBlockMsg := NewMerkleBlockMessage()
+ merkleBlockMsg.SetRawBlockHeader(blockHeader)
+ merkleBlockMsg.SetTxInfo(txHashes, txFlags, relatedTxs)
+ merkleBlockMsg.SetStatusInfo(statusHashes, relatedStatuses)
+ if !reflect.DeepEqual(merkleBlockMsg.Flags, txFlags) {
+ t.Errorf("test get merkle block msg err. got: %s want: %s", merkleBlockMsg.Flags, txFlags)
+ }
+ wantString := "{}"
+ if merkleBlockMsg.String() != wantString {
+ t.Errorf("merkle block msg test err. got:%s want:%s", merkleBlockMsg.String(), wantString)
+ }
}
+++ /dev/null
-package netsync
-
-import (
- "encoding/hex"
- "net"
- "reflect"
- "sync"
-
- log "github.com/sirupsen/logrus"
- "github.com/tendermint/tmlibs/flowrate"
- "gopkg.in/fatih/set.v0"
-
- "github.com/vapor/consensus"
- "github.com/vapor/errors"
- "github.com/vapor/p2p/trust"
- "github.com/vapor/protocol/bc"
- "github.com/vapor/protocol/bc/types"
-)
-
-const (
- maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
- maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
- defaultBanThreshold = uint32(100)
-)
-
-var errSendStatusMsg = errors.New("send status msg fail")
-
-//BasePeer is the interface for connection level peer
-type BasePeer interface {
- Addr() net.Addr
- ID() string
- ServiceFlag() consensus.ServiceFlag
- TrafficStatus() (*flowrate.Status, *flowrate.Status)
- TrySend(byte, interface{}) bool
- IsLAN() bool
-}
-
-//BasePeerSet is the intergace for connection level peer manager
-type BasePeerSet interface {
- AddBannedPeer(string) error
- StopPeerGracefully(string)
-}
-
-// PeerInfo indicate peer status snap
-type PeerInfo struct {
- ID string `json:"peer_id"`
- RemoteAddr string `json:"remote_addr"`
- Height uint64 `json:"height"`
- Ping string `json:"ping"`
- Duration string `json:"duration"`
- TotalSent int64 `json:"total_sent"`
- TotalReceived int64 `json:"total_received"`
- AverageSentRate int64 `json:"average_sent_rate"`
- AverageReceivedRate int64 `json:"average_received_rate"`
- CurrentSentRate int64 `json:"current_sent_rate"`
- CurrentReceivedRate int64 `json:"current_received_rate"`
-}
-
-type peer struct {
- BasePeer
- mtx sync.RWMutex
- services consensus.ServiceFlag
- height uint64
- hash *bc.Hash
- banScore trust.DynamicBanScore
- knownTxs *set.Set // Set of transaction hashes known to be known by this peer
- knownBlocks *set.Set // Set of block hashes known to be known by this peer
- knownStatus uint64 // Set of chain status known to be known by this peer
- filterAdds *set.Set // Set of addresses that the spv node cares about.
-}
-
-func newPeer(basePeer BasePeer) *peer {
- return &peer{
- BasePeer: basePeer,
- services: basePeer.ServiceFlag(),
- knownTxs: set.New(),
- knownBlocks: set.New(),
- filterAdds: set.New(),
- }
-}
-
-func (p *peer) Height() uint64 {
- p.mtx.RLock()
- defer p.mtx.RUnlock()
- return p.height
-}
-
-func (p *peer) addBanScore(persistent, transient uint32, reason string) bool {
- score := p.banScore.Increase(persistent, transient)
- if score > defaultBanThreshold {
- log.WithFields(log.Fields{
- "module": logModule,
- "address": p.Addr(),
- "score": score,
- "reason": reason,
- }).Errorf("banning and disconnecting")
- return true
- }
-
- warnThreshold := defaultBanThreshold >> 1
- if score > warnThreshold {
- log.WithFields(log.Fields{
- "module": logModule,
- "address": p.Addr(),
- "score": score,
- "reason": reason,
- }).Warning("ban score increasing")
- }
- return false
-}
-
-func (p *peer) addFilterAddress(address []byte) {
- p.mtx.Lock()
- defer p.mtx.Unlock()
-
- if p.filterAdds.Size() >= maxFilterAddressCount {
- log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
- return
- }
- if len(address) > maxFilterAddressSize {
- log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
- return
- }
-
- p.filterAdds.Add(hex.EncodeToString(address))
-}
-
-func (p *peer) addFilterAddresses(addresses [][]byte) {
- if !p.filterAdds.IsEmpty() {
- p.filterAdds.Clear()
- }
- for _, address := range addresses {
- p.addFilterAddress(address)
- }
-}
-
-func (p *peer) getBlockByHeight(height uint64) bool {
- msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
- return p.TrySend(BlockchainChannel, msg)
-}
-
-func (p *peer) getBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
- msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
- return p.TrySend(BlockchainChannel, msg)
-}
-
-func (p *peer) getHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
- msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
- return p.TrySend(BlockchainChannel, msg)
-}
-
-func (p *peer) getPeerInfo() *PeerInfo {
- p.mtx.RLock()
- defer p.mtx.RUnlock()
-
- sentStatus, receivedStatus := p.TrafficStatus()
- ping := sentStatus.Idle - receivedStatus.Idle
- if receivedStatus.Idle > sentStatus.Idle {
- ping = -ping
- }
-
- return &PeerInfo{
- ID: p.ID(),
- RemoteAddr: p.Addr().String(),
- Height: p.height,
- Ping: ping.String(),
- Duration: sentStatus.Duration.String(),
- TotalSent: sentStatus.Bytes,
- TotalReceived: receivedStatus.Bytes,
- AverageSentRate: sentStatus.AvgRate,
- AverageReceivedRate: receivedStatus.AvgRate,
- CurrentSentRate: sentStatus.CurRate,
- CurrentReceivedRate: receivedStatus.CurRate,
- }
-}
-
-func (p *peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
- var relatedTxs []*types.Tx
- var relatedStatuses []*bc.TxVerifyResult
- for i, tx := range txs {
- if p.isRelatedTx(tx) {
- relatedTxs = append(relatedTxs, tx)
- relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
- }
- }
- return relatedTxs, relatedStatuses
-}
-
-func (p *peer) isRelatedTx(tx *types.Tx) bool {
- for _, input := range tx.Inputs {
- switch inp := input.TypedInput.(type) {
- case *types.SpendInput:
- if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
- return true
- }
- }
- }
- for _, output := range tx.Outputs {
- if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram())) {
- return true
- }
- }
- return false
-}
-
-func (p *peer) isSPVNode() bool {
- return !p.services.IsEnable(consensus.SFFullNode)
-}
-
-func (p *peer) markBlock(hash *bc.Hash) {
- p.mtx.Lock()
- defer p.mtx.Unlock()
-
- for p.knownBlocks.Size() >= maxKnownBlocks {
- p.knownBlocks.Pop()
- }
- p.knownBlocks.Add(hash.String())
-}
-
-func (p *peer) markNewStatus(height uint64) {
- p.mtx.Lock()
- defer p.mtx.Unlock()
-
- p.knownStatus = height
-}
-
-func (p *peer) markTransaction(hash *bc.Hash) {
- p.mtx.Lock()
- defer p.mtx.Unlock()
-
- for p.knownTxs.Size() >= maxKnownTxs {
- p.knownTxs.Pop()
- }
- p.knownTxs.Add(hash.String())
-}
-
-func (p *peer) sendBlock(block *types.Block) (bool, error) {
- msg, err := NewBlockMessage(block)
- if err != nil {
- return false, errors.Wrap(err, "fail on NewBlockMessage")
- }
-
- ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
- if ok {
- blcokHash := block.Hash()
- p.knownBlocks.Add(blcokHash.String())
- }
- return ok, nil
-}
-
-func (p *peer) sendBlocks(blocks []*types.Block) (bool, error) {
- msg, err := NewBlocksMessage(blocks)
- if err != nil {
- return false, errors.Wrap(err, "fail on NewBlocksMessage")
- }
-
- if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
- return ok, nil
- }
-
- for _, block := range blocks {
- blcokHash := block.Hash()
- p.knownBlocks.Add(blcokHash.String())
- }
- return true, nil
-}
-
-func (p *peer) sendHeaders(headers []*types.BlockHeader) (bool, error) {
- msg, err := NewHeadersMessage(headers)
- if err != nil {
- return false, errors.New("fail on NewHeadersMessage")
- }
-
- ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
- return ok, nil
-}
-
-func (p *peer) sendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
- msg := NewMerkleBlockMessage()
- if err := msg.setRawBlockHeader(block.BlockHeader); err != nil {
- return false, err
- }
-
- relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
-
- txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
- if err := msg.setTxInfo(txHashes, txFlags, relatedTxs); err != nil {
- return false, nil
- }
-
- statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
- if err := msg.setStatusInfo(statusHashes, relatedStatuses); err != nil {
- return false, nil
- }
-
- ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
- return ok, nil
-}
-
-func (p *peer) sendTransactions(txs []*types.Tx) error {
- validTxs := make([]*types.Tx, 0, len(txs))
- for i, tx := range txs {
- if p.isSPVNode() && !p.isRelatedTx(tx) || p.knownTxs.Has(tx.ID.String()) {
- continue
- }
-
- validTxs = append(validTxs, tx)
- if len(validTxs) != txsMsgMaxTxNum && i != len(txs)-1 {
- continue
- }
-
- msg, err := NewTransactionsMessage(validTxs)
- if err != nil {
- return err
- }
-
- if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
- return errors.New("failed to send txs msg")
- }
-
- for _, validTx := range validTxs {
- p.knownTxs.Add(validTx.ID.String())
- }
-
- validTxs = make([]*types.Tx, 0, len(txs))
- }
-
- return nil
-}
-
-func (p *peer) sendStatus(header *types.BlockHeader) error {
- msg := NewStatusMessage(header)
- if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
- return errSendStatusMsg
- }
- p.markNewStatus(header.Height)
- return nil
-}
-
-func (p *peer) setStatus(height uint64, hash *bc.Hash) {
- p.mtx.Lock()
- defer p.mtx.Unlock()
- p.height = height
- p.hash = hash
-}
-
-type peerSet struct {
- BasePeerSet
- mtx sync.RWMutex
- peers map[string]*peer
-}
-
-// newPeerSet creates a new peer set to track the active participants.
-func newPeerSet(basePeerSet BasePeerSet) *peerSet {
- return &peerSet{
- BasePeerSet: basePeerSet,
- peers: make(map[string]*peer),
- }
-}
-
-func (ps *peerSet) addBanScore(peerID string, persistent, transient uint32, reason string) {
- ps.mtx.Lock()
- peer := ps.peers[peerID]
- ps.mtx.Unlock()
-
- if peer == nil {
- return
- }
- if ban := peer.addBanScore(persistent, transient, reason); !ban {
- return
- }
- if err := ps.AddBannedPeer(peer.Addr().String()); err != nil {
- log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on add ban peer")
- }
- ps.removePeer(peerID)
-}
-
-func (ps *peerSet) addPeer(peer BasePeer) {
- ps.mtx.Lock()
- defer ps.mtx.Unlock()
-
- if _, ok := ps.peers[peer.ID()]; !ok {
- ps.peers[peer.ID()] = newPeer(peer)
- return
- }
- log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
-}
-
-func (ps *peerSet) bestPeer(flag consensus.ServiceFlag) *peer {
- ps.mtx.RLock()
- defer ps.mtx.RUnlock()
-
- var bestPeer *peer
- for _, p := range ps.peers {
- if !p.services.IsEnable(flag) {
- continue
- }
- if bestPeer == nil || p.height > bestPeer.height || (p.height == bestPeer.height && p.IsLAN()) {
- bestPeer = p
- }
- }
- return bestPeer
-}
-
-func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
- msg, err := NewMinedBlockMessage(block)
- if err != nil {
- return errors.Wrap(err, "fail on broadcast mined block")
- }
-
- hash := block.Hash()
- peers := ps.peersWithoutBlock(&hash)
- for _, peer := range peers {
- if peer.isSPVNode() {
- continue
- }
- if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
- log.WithFields(log.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Warning("send message to peer error")
- ps.removePeer(peer.ID())
- continue
- }
- peer.markBlock(&hash)
- peer.markNewStatus(block.Height)
- }
- return nil
-}
-
-func (ps *peerSet) broadcastNewStatus(bestBlock *types.Block) error {
- msg := NewStatusMessage(&bestBlock.BlockHeader)
- peers := ps.peersWithoutNewStatus(bestBlock.Height)
- for _, peer := range peers {
- if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
- ps.removePeer(peer.ID())
- continue
- }
-
- peer.markNewStatus(bestBlock.Height)
- }
- return nil
-}
-
-func (ps *peerSet) broadcastTx(tx *types.Tx) error {
- msg, err := NewTransactionMessage(tx)
- if err != nil {
- return errors.Wrap(err, "fail on broadcast tx")
- }
-
- peers := ps.peersWithoutTx(&tx.ID)
- for _, peer := range peers {
- if peer.isSPVNode() && !peer.isRelatedTx(tx) {
- continue
- }
- if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
- log.WithFields(log.Fields{
- "module": logModule,
- "peer": peer.Addr(),
- "type": reflect.TypeOf(msg),
- "message": msg.String(),
- }).Warning("send message to peer error")
- ps.removePeer(peer.ID())
- continue
- }
- peer.markTransaction(&tx.ID)
- }
- return nil
-}
-
-func (ps *peerSet) errorHandler(peerID string, err error) {
- if errors.Root(err) == errPeerMisbehave {
- ps.addBanScore(peerID, 20, 0, err.Error())
- } else {
- ps.removePeer(peerID)
- }
-}
-
-// Peer retrieves the registered peer with the given id.
-func (ps *peerSet) getPeer(id string) *peer {
- ps.mtx.RLock()
- defer ps.mtx.RUnlock()
- return ps.peers[id]
-}
-
-func (ps *peerSet) getPeerInfos() []*PeerInfo {
- ps.mtx.RLock()
- defer ps.mtx.RUnlock()
-
- result := []*PeerInfo{}
- for _, peer := range ps.peers {
- result = append(result, peer.getPeerInfo())
- }
- return result
-}
-
-func (ps *peerSet) markTx(peerID string, txHash bc.Hash) {
- ps.mtx.Lock()
- peer := ps.peers[peerID]
- ps.mtx.Unlock()
-
- if peer == nil {
- return
- }
- peer.markTransaction(&txHash)
-}
-
-func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
- ps.mtx.RLock()
- defer ps.mtx.RUnlock()
-
- peers := []*peer{}
- for _, peer := range ps.peers {
- if !peer.knownBlocks.Has(hash.String()) {
- peers = append(peers, peer)
- }
- }
- return peers
-}
-
-func (ps *peerSet) peersWithoutNewStatus(height uint64) []*peer {
- ps.mtx.RLock()
- defer ps.mtx.RUnlock()
-
- var peers []*peer
- for _, peer := range ps.peers {
- if peer.knownStatus < height {
- peers = append(peers, peer)
- }
- }
- return peers
-}
-
-func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
- ps.mtx.RLock()
- defer ps.mtx.RUnlock()
-
- peers := []*peer{}
- for _, peer := range ps.peers {
- if !peer.knownTxs.Has(hash.String()) {
- peers = append(peers, peer)
- }
- }
- return peers
-}
-
-func (ps *peerSet) removePeer(peerID string) {
- ps.mtx.Lock()
- delete(ps.peers, peerID)
- ps.mtx.Unlock()
- ps.StopPeerGracefully(peerID)
-}
--- /dev/null
+package peers
+
+import (
+ "encoding/hex"
+ "net"
+ "sync"
+
+ log "github.com/sirupsen/logrus"
+ "github.com/tendermint/tmlibs/flowrate"
+ "gopkg.in/fatih/set.v0"
+
+ "github.com/vapor/consensus"
+ "github.com/vapor/errors"
+ "github.com/vapor/p2p/trust"
+ "github.com/vapor/protocol/bc"
+ "github.com/vapor/protocol/bc/types"
+)
+
+const (
+ maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
+ maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
+ defaultBanThreshold = uint32(100)
+ maxFilterAddressSize = 50
+ maxFilterAddressCount = 1000
+
+ logModule = "peers"
+)
+
+var (
+ errSendStatusMsg = errors.New("send status msg fail")
+ ErrPeerMisbehave = errors.New("peer is misbehave")
+)
+
+//BasePeer is the interface for connection level peer
+type BasePeer interface {
+ Addr() net.Addr
+ ID() string
+ ServiceFlag() consensus.ServiceFlag
+ TrafficStatus() (*flowrate.Status, *flowrate.Status)
+ TrySend(byte, interface{}) bool
+ IsLAN() bool
+}
+
+// PeerInfo indicate peer status snap
+type PeerInfo struct {
+ ID string `json:"peer_id"`
+ RemoteAddr string `json:"remote_addr"`
+ Height uint64 `json:"height"`
+ Ping string `json:"ping"`
+ Duration string `json:"duration"`
+ TotalSent int64 `json:"total_sent"`
+ TotalReceived int64 `json:"total_received"`
+ AverageSentRate int64 `json:"average_sent_rate"`
+ AverageReceivedRate int64 `json:"average_received_rate"`
+ CurrentSentRate int64 `json:"current_sent_rate"`
+ CurrentReceivedRate int64 `json:"current_received_rate"`
+}
+
+type Peer struct {
+ BasePeer
+ mtx sync.RWMutex
+ services consensus.ServiceFlag
+ height uint64
+ hash *bc.Hash
+ banScore trust.DynamicBanScore
+ knownTxs *set.Set // Set of transaction hashes known to be known by this peer
+ knownBlocks *set.Set // Set of block hashes known to be known by this peer
+ knownStatus uint64 // Set of chain status known to be known by this peer
+ filterAdds *set.Set // Set of addresses that the spv node cares about.
+}
+
+func newPeer(basePeer BasePeer) *Peer {
+ return &Peer{
+ BasePeer: basePeer,
+ services: basePeer.ServiceFlag(),
+ knownTxs: set.New(),
+ knownBlocks: set.New(),
+ filterAdds: set.New(),
+ }
+}
+
+func (p *Peer) bestHeight() uint64 {
+ p.mtx.RLock()
+ defer p.mtx.RUnlock()
+ return p.height
+}
+
+func (p *Peer) addBanScore(persistent, transient uint32, reason string) bool {
+ score := p.banScore.Increase(persistent, transient)
+ if score > defaultBanThreshold {
+ log.WithFields(log.Fields{
+ "module": logModule,
+ "address": p.Addr(),
+ "score": score,
+ "reason": reason,
+ }).Errorf("banning and disconnecting")
+ return true
+ }
+
+ warnThreshold := defaultBanThreshold >> 1
+ if score > warnThreshold {
+ log.WithFields(log.Fields{
+ "module": logModule,
+ "address": p.Addr(),
+ "score": score,
+ "reason": reason,
+ }).Warning("ban score increasing")
+ }
+ return false
+}
+
+func (p *Peer) addFilterAddress(address []byte) {
+ p.mtx.Lock()
+ defer p.mtx.Unlock()
+
+ if p.filterAdds.Size() >= maxFilterAddressCount {
+ log.WithField("module", logModule).Warn("the count of filter addresses is greater than limit")
+ return
+ }
+ if len(address) > maxFilterAddressSize {
+ log.WithField("module", logModule).Warn("the size of filter address is greater than limit")
+ return
+ }
+
+ p.filterAdds.Add(hex.EncodeToString(address))
+}
+
+func (p *Peer) addFilterAddresses(addresses [][]byte) {
+ if !p.filterAdds.IsEmpty() {
+ p.filterAdds.Clear()
+ }
+ for _, address := range addresses {
+ p.addFilterAddress(address)
+ }
+}
+
+//func (p *Peer) GetBlockByHeight(height uint64) bool {
+// return p.TrySend(BlockchainChannel, msg)
+//}
+
+//func (p *Peer) getBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
+// return p.TrySend(BlockchainChannel, msg)
+//}
+
+//func (p *Peer) getHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
+// msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
+// return p.TrySend(BlockchainChannel, msg)
+//}
+
+func (p *Peer) getPeerInfo() *PeerInfo {
+ p.mtx.RLock()
+ defer p.mtx.RUnlock()
+
+ sentStatus, receivedStatus := p.TrafficStatus()
+ ping := sentStatus.Idle - receivedStatus.Idle
+ if receivedStatus.Idle > sentStatus.Idle {
+ ping = -ping
+ }
+
+ return &PeerInfo{
+ ID: p.ID(),
+ RemoteAddr: p.Addr().String(),
+ Height: p.height,
+ Ping: ping.String(),
+ Duration: sentStatus.Duration.String(),
+ TotalSent: sentStatus.Bytes,
+ TotalReceived: receivedStatus.Bytes,
+ AverageSentRate: sentStatus.AvgRate,
+ AverageReceivedRate: receivedStatus.AvgRate,
+ CurrentSentRate: sentStatus.CurRate,
+ CurrentReceivedRate: receivedStatus.CurRate,
+ }
+}
+
+func (p *Peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
+ var relatedTxs []*types.Tx
+ var relatedStatuses []*bc.TxVerifyResult
+ for i, tx := range txs {
+ if p.isRelatedTx(tx) {
+ relatedTxs = append(relatedTxs, tx)
+ relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
+ }
+ }
+ return relatedTxs, relatedStatuses
+}
+
+func (p *Peer) isRelatedTx(tx *types.Tx) bool {
+ for _, input := range tx.Inputs {
+ switch inp := input.TypedInput.(type) {
+ case *types.SpendInput:
+ if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
+ return true
+ }
+ }
+ }
+ for _, output := range tx.Outputs {
+ if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram())) {
+ return true
+ }
+ }
+ return false
+}
+
+func (p *Peer) isSPVNode() bool {
+ return !p.services.IsEnable(consensus.SFFullNode)
+}
+
+func (p *Peer) markBlock(hash *bc.Hash) {
+ p.mtx.Lock()
+ defer p.mtx.Unlock()
+
+ for p.knownBlocks.Size() >= maxKnownBlocks {
+ p.knownBlocks.Pop()
+ }
+ p.knownBlocks.Add(hash.String())
+}
+
+func (p *Peer) markNewStatus(height uint64) {
+ p.mtx.Lock()
+ defer p.mtx.Unlock()
+
+ p.knownStatus = height
+}
+
+func (p *Peer) markTransaction(hash *bc.Hash) {
+ p.mtx.Lock()
+ defer p.mtx.Unlock()
+
+ for p.knownTxs.Size() >= maxKnownTxs {
+ p.knownTxs.Pop()
+ }
+ p.knownTxs.Add(hash.String())
+}
+
+//func (p *Peer) sendStatus(header *types.BlockHeader) error {
+// msg := NewStatusMessage(header)
+// if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+// return errSendStatusMsg
+// }
+// p.markNewStatus(header.Height)
+// return nil
+//}
+
+func (p *Peer) setStatus(height uint64, hash *bc.Hash) {
+ p.mtx.Lock()
+ defer p.mtx.Unlock()
+ p.height = height
+ p.hash = hash
+}
--- /dev/null
+package peers
+
+import (
+ "sync"
+
+ log "github.com/sirupsen/logrus"
+
+ "github.com/vapor/consensus"
+ "github.com/vapor/errors"
+ "github.com/vapor/protocol/bc"
+)
+
+/*
+//添加删除查询节点
+AddPeer(peer BasePeer)
+RemovePeer(peerID string)
+GetPeer(id string) *peer
+BestPeerInfo(flag consensus.ServiceFlag) (string, uint64)
+GetPeerInfo(peerID string) *PeerInfo
+GetPeerInfos() []*PeerInfo
+SetStatus(peerID string, height uint64, hash *bc.Hash)
+
+//节点错误处理
+AddBanScore(peerID string, persistent, transient uint32, reason string)
+ErrorHandler(peerID string, err error)
+*/
+
+//BasePeerSet is the intergace for connection level peer manager
+type BasePeerSet interface {
+ AddBannedPeer(string) error
+ StopPeerGracefully(string)
+}
+
+type PeerSet struct {
+ BasePeerSet
+ mtx sync.RWMutex
+ peers map[string]*Peer
+}
+
+// NewPeerSet creates a new Peer set to track the active participants.
+func NewPeerSet(basePeerSet BasePeerSet) *PeerSet {
+ return &PeerSet{
+ BasePeerSet: basePeerSet,
+ peers: make(map[string]*Peer),
+ }
+}
+
+func (ps *PeerSet) AddBanScore(peerID string, persistent, transient uint32, reason string) {
+ ps.mtx.Lock()
+ peer := ps.peers[peerID]
+ ps.mtx.Unlock()
+
+ if peer == nil {
+ return
+ }
+ if ban := peer.addBanScore(persistent, transient, reason); !ban {
+ return
+ }
+ if err := ps.AddBannedPeer(peer.Addr().String()); err != nil {
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on add ban peer")
+ }
+ ps.RemovePeer(peerID)
+}
+
+func (ps *PeerSet) AddPeer(peer BasePeer) {
+ ps.mtx.Lock()
+ defer ps.mtx.Unlock()
+
+ if _, ok := ps.peers[peer.ID()]; !ok {
+ ps.peers[peer.ID()] = newPeer(peer)
+ return
+ }
+ log.WithField("module", logModule).Warning("add existing peer to blockKeeper")
+}
+
+func (ps *PeerSet) BestPeerInfo(flag consensus.ServiceFlag) (string, uint64) {
+ ps.mtx.RLock()
+ defer ps.mtx.RUnlock()
+
+ var bestPeer *Peer
+ for _, p := range ps.peers {
+ if !p.services.IsEnable(flag) {
+ continue
+ }
+ if bestPeer == nil || p.height > bestPeer.height || (p.height == bestPeer.height && p.IsLAN()) {
+ bestPeer = p
+ }
+ }
+ if bestPeer == nil {
+ return "", 0
+ }
+ return bestPeer.ID(), bestPeer.bestHeight()
+}
+
+func (ps *PeerSet) ErrorHandler(peerID string, err error) {
+ if errors.Root(err) == ErrPeerMisbehave {
+ ps.AddBanScore(peerID, 20, 0, err.Error())
+ } else {
+ ps.RemovePeer(peerID)
+ }
+}
+
+// Peer retrieves the registered peer with the given id.
+func (ps *PeerSet) GetPeer(id string) *Peer {
+ ps.mtx.RLock()
+ defer ps.mtx.RUnlock()
+ return ps.peers[id]
+}
+
+func (ps *PeerSet) GetPeerInfo(peerID string) *PeerInfo {
+ peer := ps.GetPeer(peerID)
+ if peer == nil {
+ return nil
+ }
+
+ return peer.getPeerInfo()
+}
+
+func (ps *PeerSet) GetPeerInfos() []*PeerInfo {
+ ps.mtx.RLock()
+ defer ps.mtx.RUnlock()
+
+ result := []*PeerInfo{}
+ for _, peer := range ps.peers {
+ result = append(result, peer.getPeerInfo())
+ }
+ return result
+}
+
+func (ps *PeerSet) RemovePeer(peerID string) {
+ peer := ps.GetPeer(peerID)
+ if peer == nil {
+ return
+ }
+
+ ps.mtx.Lock()
+ delete(ps.peers, peerID)
+ ps.mtx.Unlock()
+ ps.StopPeerGracefully(peerID)
+}
+
+func (ps *PeerSet) SetStatus(peerID string, height uint64, hash *bc.Hash) {
+ peer := ps.GetPeer(peerID)
+ if peer == nil {
+ return
+ }
+
+ peer.setStatus(height, hash)
+}
--- /dev/null
+package peers
+
+import (
+ "reflect"
+
+ log "github.com/sirupsen/logrus"
+
+ "github.com/vapor/protocol/bc"
+ "github.com/vapor/protocol/bc/types"
+)
+
+//防止重传
+//MarkBlock(peerID string, hash *bc.Hash)
+//MarkTx(peerID string, hash *bc.Hash)
+//peersWithoutBlock(hash *bc.Hash) []*peer
+//peersWithoutNewStatus(height uint64) []*peer
+//peersWithoutTx(hash *bc.Hash) []*peer
+
+//过滤有效交易
+//AddFilterAddresses(peerID string, addresses [][]byte)
+//ClearFilterAdds(peerID string)
+//FilterValidTxs(peerID string, txs []*types.Tx)
+//GetRelatedTxAndStatus(peerID string, txs []*types.Tx, txStatuses *bc.TransactionStatus)
+
+//传输
+//BroadcastMinedBlock(block *types.Block)
+//BroadcastNewStatus(bestBlock *types.Block) error
+//BroadcastTx(tx *types.Tx) error
+//SendMsg(peerID string, msgChannel byte, msg interface{})
+
+func (ps *PeerSet) AddFilterAddresses(peerID string, addresses [][]byte) {
+ peer := ps.GetPeer(peerID)
+ if peer == nil {
+ return
+ }
+ peer.addFilterAddresses(addresses)
+}
+
+type BroadcastMsg interface {
+ Filter(ps *PeerSet) []string
+ Mark(ps *PeerSet, peers []string)
+ GetChan() byte
+ GetMsg() interface{}
+ MsgString() string
+}
+
+func (ps *PeerSet) BroadcastMsg(bm BroadcastMsg) error {
+ peers := bm.Filter(ps)
+ peersSuccess := make([]string, 0)
+ for _, peer := range peers {
+ if ok := ps.SendMsg(peer, bm.GetChan(), bm.GetMsg()); !ok {
+ log.WithFields(log.Fields{"module": logModule, "peer": peer, "type": reflect.TypeOf(bm.GetMsg()), "message": bm.MsgString()}).Warning("send message to peer error")
+ continue
+ }
+ peersSuccess = append(peersSuccess, peer)
+ }
+ bm.Mark(ps, peersSuccess)
+ return nil
+}
+
+//func (ps *PeerSet) BroadcastMinedBlock(block *types.Block) error {
+// msg, err := NewMinedBlockMessage(block)
+// if err != nil {
+// return errors.Wrap(err, "fail on broadcast mined block")
+// }
+//
+// hash := block.Hash()
+// peers := ps.peersWithoutBlock(&hash)
+// for _, peer := range peers {
+// if peer.isSPVNode() {
+// continue
+// }
+//
+// if ok := ps.SendMsg(peer.ID(), BlockchainChannel, msg); !ok {
+// log.WithFields(log.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Warning("send message to peer error")
+// continue
+// }
+//
+// peer.markBlock(&hash)
+// peer.markNewStatus(block.Height)
+// }
+// return nil
+//}
+
+//func (ps *PeerSet) BroadcastNewStatus(bestBlock *types.Block) error {
+// msg := NewStatusMessage(&bestBlock.BlockHeader)
+// peers := ps.peersWithoutNewStatus(bestBlock.Height)
+// for _, peer := range peers {
+// if ok := ps.SendMsg(peer.ID(), BlockchainChannel, msg); !ok {
+// log.WithFields(log.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Warning("send message to peer error")
+// continue
+// }
+// peer.markNewStatus(bestBlock.Height)
+// }
+// return nil
+//}
+
+//func (ps *PeerSet) BroadcastTx(tx *types.Tx) error {
+// msg, err := NewTransactionMessage(tx)
+// if err != nil {
+// return errors.Wrap(err, "fail on broadcast tx")
+// }
+//
+// peers := ps.peersWithoutTx(&tx.ID)
+// for _, peer := range peers {
+// if peer.isSPVNode() && !peer.isRelatedTx(tx) {
+// continue
+// }
+// if ok := ps.SendMsg(peer.ID(), BlockchainChannel, msg); !ok {
+// log.WithFields(log.Fields{"module": logModule, "peer": peer.Addr(), "type": reflect.TypeOf(msg), "message": msg.String()}).Warning("send message to peer error")
+// continue
+// }
+// peer.markTransaction(&tx.ID)
+// }
+// return nil
+//}
+
+func (ps *PeerSet) ClearFilterAdds(peerID string) {
+ peer := ps.GetPeer(peerID)
+ if peer == nil {
+ return
+ }
+ peer.filterAdds.Clear()
+}
+
+func (ps *PeerSet) FilterValidTxs(peerID string, txs []*types.Tx) []*types.Tx {
+ validTxs := make([]*types.Tx, 0, len(txs))
+ peer := ps.GetPeer(peerID)
+ if peer == nil {
+ return validTxs
+ }
+
+ for _, tx := range txs {
+ if peer.isSPVNode() && !peer.isRelatedTx(tx) || peer.knownTxs.Has(tx.ID.String()) {
+ continue
+ }
+
+ validTxs = append(validTxs, tx)
+ }
+ return validTxs
+}
+
+func (ps *PeerSet) GetRelatedTxAndStatus(peerID string, txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
+ peer := ps.GetPeer(peerID)
+ if peer == nil {
+ return nil, nil
+ }
+ return peer.getRelatedTxAndStatus(txs, txStatuses)
+}
+
+func (ps *PeerSet) MarkBlock(peerID string, hash *bc.Hash) {
+ peer := ps.GetPeer(peerID)
+ if peer == nil {
+ return
+ }
+ peer.markBlock(hash)
+}
+
+func (ps *PeerSet) MarkStatus(peerID string, height uint64) {
+ peer := ps.GetPeer(peerID)
+ if peer == nil {
+ return
+ }
+ peer.markNewStatus(height)
+}
+
+func (ps *PeerSet) MarkTx(peerID string, hash *bc.Hash) {
+ peer := ps.GetPeer(peerID)
+ if peer == nil {
+ return
+ }
+ peer.markTransaction(hash)
+}
+
+func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string {
+ ps.mtx.RLock()
+ defer ps.mtx.RUnlock()
+
+ var peers []string
+ for _, peer := range ps.peers {
+ if !peer.knownBlocks.Has(hash.String()) {
+ peers = append(peers, peer.ID())
+ }
+ }
+ return peers
+}
+
+func (ps *PeerSet) PeersWithoutNewStatus(height uint64) []string {
+ ps.mtx.RLock()
+ defer ps.mtx.RUnlock()
+
+ var peers []string
+ for _, peer := range ps.peers {
+ if peer.knownStatus < height {
+ peers = append(peers, peer.ID())
+ }
+ }
+ return peers
+}
+
+func (ps *PeerSet) PeersWithoutTx(hash bc.Hash) []string {
+ ps.mtx.RLock()
+ defer ps.mtx.RUnlock()
+
+ var peers []string
+ for _, peer := range ps.peers {
+ if !peer.knownTxs.Has(hash.String()) {
+ peers = append(peers, peer.ID())
+ }
+ }
+ return peers
+}
+
+func (ps *PeerSet) SendMsg(peerID string, msgChannel byte, msg interface{}) bool {
+ peer := ps.GetPeer(peerID)
+ if peer == nil {
+ return false
+ }
+
+ ok := peer.TrySend(msgChannel, msg)
+ if !ok {
+ ps.RemovePeer(peerID)
+ }
+ return ok
+}
package netsync
import (
- "time"
-
log "github.com/sirupsen/logrus"
"github.com/vapor/errors"
"github.com/vapor/p2p/connection"
)
-const (
- handshakeTimeout = 10 * time.Second
- handshakeCheckPerid = 500 * time.Millisecond
-)
-
var (
- errProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
- errStatusRequest = errors.New("Status request error")
+ errSendStatusMsg = errors.New("Status msg send error")
)
//ProtocolReactor handles new coming protocol message.
type ProtocolReactor struct {
p2p.BaseReactor
- sm *SyncManager
- peers *peerSet
+ sm *SyncManager
}
// NewProtocolReactor returns the reactor of whole blockchain.
-func NewProtocolReactor(sm *SyncManager, peers *peerSet) *ProtocolReactor {
+func NewProtocolReactor(sm *SyncManager) *ProtocolReactor {
pr := &ProtocolReactor{
- sm: sm,
- peers: peers,
+ sm: sm,
}
pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
return pr
// AddPeer implements Reactor by sending our state to peer.
func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
pr.sm.AddPeer(peer)
- if err := pr.sm.SendStatus(peer); err != nil {
- return err
+ if ok := pr.sm.SendStatus(peer.ID()); !ok {
+ return errSendStatusMsg
}
pr.sm.syncTransactions(peer.Key)
return nil
// RemovePeer implements Reactor by removing peer from the pool.
func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
- pr.peers.removePeer(peer.Key)
+ pr.sm.RemovePeer(peer)
}
// Receive implements Reactor by handling 4 types of messages (look below).
return
}
- pr.sm.processMsg(src, msgType, msg)
+ pr.sm.processMsg(src.ID(), msgType, msg)
}
"github.com/tendermint/tmlibs/flowrate"
"github.com/vapor/consensus"
+ "github.com/vapor/netsync/peers"
"github.com/vapor/protocol/bc"
"github.com/vapor/protocol/bc/types"
"github.com/vapor/test/mock"
if p.async {
p.msgCh <- msgBytes
} else {
- msgType, msg, _ := DecodeMessage(msgBytes)
- p.remoteNode.processMsg(p.srcPeer, msgType, msg)
+ msgType, msg, _ := message.DecodeMessage(msgBytes)
+ p.remoteNode.processMsg(p.srcPeer.ID(), msgType, msg)
}
return true
}
func (p *P2PPeer) postMan() {
for msgBytes := range p.msgCh {
- msgType, msg, _ := DecodeMessage(msgBytes)
- p.remoteNode.processMsg(p.srcPeer, msgType, msg)
+ msgType, msg, _ := message.DecodeMessage(msgBytes)
+ p.remoteNode.processMsg(p.srcPeer.ID(), msgType, msg)
}
}
func mockSync(blocks []*types.Block) *SyncManager {
chain := mock.NewChain()
- peers := newPeerSet(NewPeerSet())
+ peers := peers.NewPeerSet(NewPeerSet())
chain.SetBestBlockHeader(&blocks[len(blocks)-1].BlockHeader)
for _, block := range blocks {
chain.SetBlockByHeight(block.Height, block)
log "github.com/sirupsen/logrus"
+ "github.com/vapor/errors"
core "github.com/vapor/protocol"
"github.com/vapor/protocol/bc/types"
)
}
func (sm *SyncManager) syncTransactions(peerID string) {
- pending := sm.txPool.GetTransactions()
- if len(pending) == 0 {
+ txDescs := sm.txPool.GetTransactions()
+ if len(txDescs) == 0 {
return
}
-
- txs := make([]*types.Tx, len(pending))
- for i, batch := range pending {
+ //*TxDesc slice -> *types.Tx slice
+ txs := make([]*types.Tx, len(txDescs))
+ for i, batch := range txDescs {
txs[i] = batch.Tx
}
+ //get valid txs
+ validTxs := sm.peers.FilterValidTxs(peerID, txs)
+ if len(validTxs) == 0 {
+ return
+ }
+
sm.txSyncCh <- &txSyncMsg{peerID, txs}
}
}
if ev.TxMsg.MsgType == core.MsgNewTx {
- if err := sm.peers.broadcastTx(ev.TxMsg.Tx); err != nil {
+ txMsg, _ := newTxBroadcastMsg(ev.TxMsg.Tx, BlockchainChannel)
+ if err := sm.peers.BroadcastMsg(txMsg); err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast new tx.")
continue
}
// send starts a sending a pack of transactions from the sync.
send := func(msg *txSyncMsg) {
- peer := sm.peers.getPeer(msg.peerID)
+ peer := sm.peers.GetPeer(msg.peerID)
if peer == nil {
delete(pending, msg.peerID)
return
}).Debug("txSyncLoop sending transactions")
sending = true
go func() {
- err := peer.sendTransactions(sendTxs)
+ txsMsg, err := NewTransactionsMessage(sendTxs)
if err != nil {
- sm.peers.removePeer(msg.peerID)
+ log.WithFields(log.Fields{"module": logModule, "error": err}).Debug("failed create transactions msg")
+ done <- err
+ return
+ }
+
+ if ok := sm.peers.SendMsg(msg.peerID, BlockchainChannel, txsMsg); !ok {
+ sm.peers.RemovePeer(msg.peerID)
+ err = errors.New("send txsMsg err")
}
done <- err
}()
--- /dev/null
+package vapor