1 // Copyright 2015 The go-ethereum Authors
2 // This file is part of the go-ethereum library.
4 // The go-ethereum library is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU Lesser General Public License as published by
6 // the Free Software Foundation, either version 3 of the License, or
7 // (at your option) any later version.
9 // The go-ethereum library is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU Lesser General Public License for more details.
14 // You should have received a copy of the GNU Lesser General Public License
15 // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
24 log "github.com/sirupsen/logrus"
26 "github.com/bytom/common"
27 "github.com/bytom/protocol/bc/types"
31 forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available
32 minDesiredPeerCount = 5 // Amount of peers desired to start syncing
34 // This is the target size for the packs of transactions sent by txsyncLoop.
35 // A pack can get larger than this if a single transactions exceeds this size.
36 txsyncPackSize = 100 * 1024
44 // syncer is responsible for periodically synchronising with the network, both
45 // downloading hashes and blocks as well as handling the announcement handler.
46 func (sm *SyncManager) syncer() {
47 // Start and ensure cleanup of sync mechanisms
49 defer sm.fetcher.Stop()
50 //defer sm.downloader.Terminate()
52 // Wait for different events to fire synchronisation operations
53 forceSync := time.NewTicker(forceSyncCycle)
54 defer forceSync.Stop()
59 log.Info("New peer connected.")
60 // Make sure we have peers to select from, then sync
61 if sm.sw.Peers().Size() < minDesiredPeerCount {
67 // Force a sync even if not enough peers are present
76 // synchronise tries to sync up our local block chain with a remote peer.
77 func (sm *SyncManager) synchronise() {
78 log.Info("bk peer num:", sm.blockKeeper.peers.Len(), " sw peer num:", sm.sw.Peers().Size(), " ", sm.sw.Peers().List())
79 // Make sure only one goroutine is ever allowed past this point at once
80 if !atomic.CompareAndSwapInt32(&sm.synchronising, 0, 1) {
81 log.Info("Synchronising ...")
84 defer atomic.StoreInt32(&sm.synchronising, 0)
86 peer, bestHeight := sm.peers.BestPeer()
87 // Short circuit if no peers are available
91 if bestHeight > sm.chain.BestBlockHeight() {
92 log.Info("sync peer:", peer.Addr(), " height:", bestHeight)
93 sm.blockKeeper.BlockRequestWorker(peer.Key, bestHeight)
97 // txsyncLoop takes care of the initial transaction sync for each new
98 // connection. When a new peer appears, we relay all currently pending
99 // transactions. In order to minimise egress bandwidth usage, we send
100 // the transactions in small packs to one peer at a time.
101 func (sm *SyncManager) txsyncLoop() {
103 pending = make(map[string]*txsync)
104 sending = false // whether a send is active
105 pack = new(txsync) // the pack that is being sent
106 done = make(chan error, 1) // result of the send
109 // send starts a sending a pack of transactions from the sync.
110 send := func(s *txsync) {
111 // Fill pack with transactions up to the target size.
112 size := common.StorageSize(0)
114 pack.txs = pack.txs[:0]
115 for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
116 pack.txs = append(pack.txs, s.txs[i])
117 size += common.StorageSize(s.txs[i].SerializedSize)
119 // Remove the transactions that will be sent.
120 s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
122 delete(pending, s.p.Key)
124 // Send the pack in the background.
125 log.Info("Sending batch of transactions. ", "count:", len(pack.txs), " bytes:", size)
127 go func() { done <- pack.p.SendTransactions(pack.txs) }()
130 // pick chooses the next pending sync.
131 pick := func() *txsync {
132 if len(pending) == 0 {
135 n := rand.Intn(len(pending)) + 1
136 for _, s := range pending {
146 case s := <-sm.txSyncCh:
153 // Stop tracking peers that cause send failures.
155 log.Info("Transaction send failed", "err", err)
156 delete(pending, pack.p.Key)
158 // Schedule the next send.
159 if s := pick(); s != nil {