OSDN Git Service

Merge pull request #802 from Bytom/dev
[bytom/bytom.git] / netsync / sync.go
1 // Copyright 2015 The go-ethereum Authors
2 // This file is part of the go-ethereum library.
3 //
4 // The go-ethereum library is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU Lesser General Public License as published by
6 // the Free Software Foundation, either version 3 of the License, or
7 // (at your option) any later version.
8 //
9 // The go-ethereum library is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU Lesser General Public License for more details.
13 //
14 // You should have received a copy of the GNU Lesser General Public License
15 // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16
17 package netsync
18
19 import (
20         "math/rand"
21         "sync/atomic"
22         "time"
23
24         log "github.com/sirupsen/logrus"
25
26         "github.com/bytom/common"
27         "github.com/bytom/protocol/bc/types"
28 )
29
30 const (
31         forceSyncCycle      = 10 * time.Second // Time interval to force syncs, even if few peers are available
32         minDesiredPeerCount = 5                // Amount of peers desired to start syncing
33
34         // This is the target size for the packs of transactions sent by txsyncLoop.
35         // A pack can get larger than this if a single transactions exceeds this size.
36         txsyncPackSize = 100 * 1024
37 )
38
39 type txsync struct {
40         p   *peer
41         txs []*types.Tx
42 }
43
44 // syncer is responsible for periodically synchronising with the network, both
45 // downloading hashes and blocks as well as handling the announcement handler.
46 func (sm *SyncManager) syncer() {
47         // Start and ensure cleanup of sync mechanisms
48         sm.fetcher.Start()
49         defer sm.fetcher.Stop()
50         //defer sm.downloader.Terminate()
51
52         // Wait for different events to fire synchronisation operations
53         forceSync := time.NewTicker(forceSyncCycle)
54         defer forceSync.Stop()
55
56         for {
57                 select {
58                 case <-sm.newPeerCh:
59                         log.Info("New peer connected.")
60                         // Make sure we have peers to select from, then sync
61                         if sm.sw.Peers().Size() < minDesiredPeerCount {
62                                 break
63                         }
64                         go sm.synchronise()
65
66                 case <-forceSync.C:
67                         // Force a sync even if not enough peers are present
68                         go sm.synchronise()
69
70                 case <-sm.quitSync:
71                         return
72                 }
73         }
74 }
75
76 // synchronise tries to sync up our local block chain with a remote peer.
77 func (sm *SyncManager) synchronise() {
78         log.Info("bk peer num:", sm.blockKeeper.peers.Len(), " sw peer num:", sm.sw.Peers().Size(), " ", sm.sw.Peers().List())
79         // Make sure only one goroutine is ever allowed past this point at once
80         if !atomic.CompareAndSwapInt32(&sm.synchronising, 0, 1) {
81                 log.Info("Synchronising ...")
82                 return
83         }
84         defer atomic.StoreInt32(&sm.synchronising, 0)
85         for len(sm.dropPeerCh) > 0 {
86                 <-sm.dropPeerCh
87         }
88
89         peer, bestHeight := sm.peers.BestPeer()
90         // Short circuit if no peers are available
91         if peer == nil {
92                 return
93         }
94
95         if ok := sm.Switch().Peers().Has(peer.Key); !ok {
96                 log.Info("Peer disconnected")
97                 sm.sw.StopPeerGracefully(peer)
98                 return
99         }
100
101         if bestHeight > sm.chain.BestBlockHeight() {
102                 log.Info("sync peer:", peer.Addr(), " height:", bestHeight)
103                 sm.blockKeeper.BlockRequestWorker(peer.Key, bestHeight)
104         }
105 }
106
107 // txsyncLoop takes care of the initial transaction sync for each new
108 // connection. When a new peer appears, we relay all currently pending
109 // transactions. In order to minimise egress bandwidth usage, we send
110 // the transactions in small packs to one peer at a time.
111 func (sm *SyncManager) txsyncLoop() {
112         var (
113                 pending = make(map[string]*txsync)
114                 sending = false               // whether a send is active
115                 pack    = new(txsync)         // the pack that is being sent
116                 done    = make(chan error, 1) // result of the send
117         )
118
119         // send starts a sending a pack of transactions from the sync.
120         send := func(s *txsync) {
121                 // Fill pack with transactions up to the target size.
122                 size := common.StorageSize(0)
123                 pack.p = s.p
124                 pack.txs = pack.txs[:0]
125                 for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
126                         pack.txs = append(pack.txs, s.txs[i])
127                         size += common.StorageSize(s.txs[i].SerializedSize)
128                 }
129                 // Remove the transactions that will be sent.
130                 s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
131                 if len(s.txs) == 0 {
132                         delete(pending, s.p.swPeer.Key)
133                 }
134                 // Send the pack in the background.
135                 log.Info("Sending batch of transactions. ", "count:", len(pack.txs), " bytes:", size)
136                 sending = true
137                 go func() { done <- pack.p.SendTransactions(pack.txs) }()
138         }
139
140         // pick chooses the next pending sync.
141         pick := func() *txsync {
142                 if len(pending) == 0 {
143                         return nil
144                 }
145                 n := rand.Intn(len(pending)) + 1
146                 for _, s := range pending {
147                         if n--; n == 0 {
148                                 return s
149                         }
150                 }
151                 return nil
152         }
153
154         for {
155                 select {
156                 case s := <-sm.txSyncCh:
157                         pending[s.p.swPeer.Key] = s
158                         if !sending {
159                                 send(s)
160                         }
161                 case err := <-done:
162                         sending = false
163                         // Stop tracking peers that cause send failures.
164                         if err != nil {
165                                 log.Info("Transaction send failed", "err", err)
166                                 delete(pending, pack.p.swPeer.Key)
167                         }
168                         // Schedule the next send.
169                         if s := pick(); s != nil {
170                                 send(s)
171                         }
172                 case <-sm.quitSync:
173                         return
174                 }
175         }
176 }