OSDN Git Service
(root)
/
bytom
/
vapor.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Mov (#518)
[bytom/vapor.git]
/
netsync
/
chainmgr
/
handle.go
diff --git
a/netsync/chainmgr/handle.go
b/netsync/chainmgr/handle.go
index
e57e60a
..
35d1f9a
100644
(file)
--- a/
netsync/chainmgr/handle.go
+++ b/
netsync/chainmgr/handle.go
@@
-3,6
+3,7
@@
package chainmgr
import (
"errors"
"reflect"
import (
"errors"
"reflect"
+ "time"
log "github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
@@
-38,6
+39,7
@@
type Chain interface {
ValidateTx(*types.Tx) (bool, error)
}
ValidateTx(*types.Tx) (bool, error)
}
+// Switch is the interface for network layer
type Switch interface {
AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
Start() (bool, error)
type Switch interface {
AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
Start() (bool, error)
@@
-50,6
+52,7
@@
type Switch interface {
// Mempool is the interface for Bytom mempool
type Mempool interface {
GetTransactions() []*core.TxDesc
// Mempool is the interface for Bytom mempool
type Mempool interface {
GetTransactions() []*core.TxDesc
+ IsDust(tx *types.Tx) bool
}
//Manager is responsible for the business layer information synchronization
}
//Manager is responsible for the business layer information synchronization
@@
-89,6
+92,7
@@
func NewManager(config *cfg.Config, sw Switch, chain Chain, mempool Mempool, dis
return manager, nil
}
return manager, nil
}
+// AddPeer add the network layer peer to logic layer
func (m *Manager) AddPeer(peer peers.BasePeer) {
m.peers.AddPeer(peer)
}
func (m *Manager) AddPeer(peer peers.BasePeer) {
m.peers.AddPeer(peer)
}
@@
-153,7
+157,12
@@
func (m *Manager) handleGetBlockMsg(peer *peers.Peer, msg *msgs.GetBlockMessage)
}
func (m *Manager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) {
}
func (m *Manager) handleGetBlocksMsg(peer *peers.Peer, msg *msgs.GetBlocksMessage) {
- blocks, err := m.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
+ endTime := time.Now().Add(requireBlocksTimeout / 2)
+ isTimeout := func() bool {
+ return time.Now().After(endTime)
+ }
+
+ blocks, err := m.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash(), isTimeout)
if err != nil || len(blocks) == 0 {
return
}
if err != nil || len(blocks) == 0 {
return
}
@@
-254,6
+263,11
@@
func (m *Manager) handleTransactionMsg(peer *peers.Peer, msg *msgs.TransactionMe
return
}
return
}
+ if m.mempool.IsDust(tx) {
+ m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "receive dust tx msg")
+ return
+ }
+
m.peers.MarkTx(peer.ID(), tx.ID)
if isOrphan, err := m.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
m.peers.MarkTx(peer.ID(), tx.ID)
if isOrphan, err := m.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
@@
-273,6
+287,11
@@
func (m *Manager) handleTransactionsMsg(peer *peers.Peer, msg *msgs.Transactions
}
for _, tx := range txs {
}
for _, tx := range txs {
+ if m.mempool.IsDust(tx) {
+ m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "receive dust txs msg")
+ continue
+ }
+
m.peers.MarkTx(peer.ID(), tx.ID)
if isOrphan, err := m.chain.ValidateTx(tx); err != nil && !isOrphan {
m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
m.peers.MarkTx(peer.ID(), tx.ID)
if isOrphan, err := m.chain.ValidateTx(tx); err != nil && !isOrphan {
m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
@@
-343,10
+362,12
@@
func (m *Manager) processMsg(basePeer peers.BasePeer, msgType byte, msg msgs.Blo
}
}
}
}
+// RemovePeer delete peer for peer set
func (m *Manager) RemovePeer(peerID string) {
m.peers.RemovePeer(peerID)
}
func (m *Manager) RemovePeer(peerID string) {
m.peers.RemovePeer(peerID)
}
+// SendStatus sent the current self status to remote peer
func (m *Manager) SendStatus(peer peers.BasePeer) error {
p := m.peers.GetPeer(peer.ID())
if p == nil {
func (m *Manager) SendStatus(peer peers.BasePeer) error {
p := m.peers.GetPeer(peer.ID())
if p == nil {
@@
-360,6
+381,7
@@
func (m *Manager) SendStatus(peer peers.BasePeer) error {
return nil
}
return nil
}
+// Start the network logic layer
func (m *Manager) Start() error {
var err error
m.txMsgSub, err = m.eventDispatcher.Subscribe(core.TxMsgEvent{})
func (m *Manager) Start() error {
var err error
m.txMsgSub, err = m.eventDispatcher.Subscribe(core.TxMsgEvent{})