--- /dev/null
+package common
+
+import (
+ "errors"
+ "sync"
+)
+
+// OrderedSet is a set with limited capacity.
+// Items are evicted according to their insertion order.
+type OrderedSet struct {
+ capacity int
+ set map[interface{}]struct{}
+ queue []interface{}
+ start int
+ end int
+
+ lock sync.RWMutex
+}
+
+// NewOrderedSet creates an ordered set with given capacity
+func NewOrderedSet(capacity int) (*OrderedSet, error) {
+ if capacity < 1 {
+ return nil, errors.New("capacity must be a positive integer")
+ }
+
+ return &OrderedSet{
+ capacity: capacity,
+ set: map[interface{}]struct{}{},
+ queue: make([]interface{}, capacity),
+ end: -1,
+ }, nil
+}
+
+// Add inserts items into the set.
+// If capacity is reached, oldest items are evicted
+func (os *OrderedSet) Add(items ...interface{}) {
+ os.lock.Lock()
+ defer os.lock.Unlock()
+
+ for _, item := range items {
+ if _, ok := os.set[item]; ok {
+ continue
+ }
+
+ next := (os.end + 1) % os.capacity
+ if os.end != -1 && next == os.start {
+ delete(os.set, os.queue[os.start])
+ os.start = (os.start + 1) % os.capacity
+ }
+ os.end = next
+ os.queue[os.end] = item
+ os.set[item] = struct{}{}
+ }
+}
+
+// Has checks if certain items exists in the set
+func (os *OrderedSet) Has(item interface{}) bool {
+ os.lock.RLock()
+ defer os.lock.RUnlock()
+
+ _, ok := os.set[item]
+ return ok
+}
+
+// Size returns the size of the set
+func (os *OrderedSet) Size() int {
+ os.lock.RLock()
+ defer os.lock.RUnlock()
+
+ return len(os.set)
+}
log "github.com/sirupsen/logrus"
+ "github.com/vapor/common"
cfg "github.com/vapor/config"
"github.com/vapor/consensus"
dbm "github.com/vapor/database/leveldb"
)
const (
+ maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
+
logModule = "netsync"
)
eventDispatcher *event.Dispatcher
txMsgSub *event.Subscription
+ knownTxs *common.OrderedSet // Set of transaction hashes known so far
}
-//NewChainManager create a chain sync manager.
+//NewManager create a chain sync manager.
func NewManager(config *cfg.Config, sw Switch, chain Chain, mempool Mempool, dispatcher *event.Dispatcher, peers *peers.PeerSet, fastSyncDB dbm.DB) (*Manager, error) {
+ knownTxs, _ := common.NewOrderedSet(maxKnownTxs)
manager := &Manager{
sw: sw,
mempool: mempool,
quit: make(chan struct{}),
config: config,
eventDispatcher: dispatcher,
+ knownTxs: knownTxs,
}
if !config.VaultMode {
return
}
+ if m.knownTxs.Has(tx.ID.String()) {
+ return
+ }
+
+ m.knownTxs.Add(tx.ID.String())
m.peers.MarkTx(peer.ID(), tx.ID)
if isOrphan, err := m.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
}
for _, tx := range txs {
+ if m.knownTxs.Has(tx.ID.String()) {
+ continue
+ }
+
+ m.knownTxs.Add(tx.ID.String())
m.peers.MarkTx(peer.ID(), tx.ID)
if isOrphan, err := m.chain.ValidateTx(tx); err != nil && !isOrphan {
m.peers.ProcessIllegal(peer.ID(), security.LevelMsgIllegal, "fail on validate tx transaction")
}
if ev.TxMsg.MsgType == core.MsgNewTx {
- if err := m.peers.BroadcastTx(ev.TxMsg.Tx); err != nil {
+ tx := ev.TxMsg.Tx
+ m.knownTxs.Add(tx.ID.String())
+ if err := m.peers.BroadcastTx(tx); err != nil {
log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast new tx.")
continue
}