OSDN Git Service

version 1.1.9 (#594)
[bytom/vapor.git] / netsync / chainmgr / block_keeper.go
1 package chainmgr
2
3 import (
4         "time"
5
6         log "github.com/sirupsen/logrus"
7
8         "github.com/bytom/vapor/consensus"
9         dbm "github.com/bytom/vapor/database/leveldb"
10         "github.com/bytom/vapor/errors"
11         "github.com/bytom/vapor/netsync/peers"
12         "github.com/bytom/vapor/p2p/security"
13         "github.com/bytom/vapor/protocol"
14         "github.com/bytom/vapor/protocol/bc"
15         "github.com/bytom/vapor/protocol/bc/types"
16 )
17
// syncCycle is the interval between two sync attempts of the sync worker.
const syncCycle = 5 * time.Second

// Sync types reported by checkSyncType. The enumeration starts at 1 so that
// the values stay identical to the historical ones (noNeedSync == 1,
// fastSyncType == 2, regularSyncType == 3).
const (
	noNeedSync = iota + 1
	fastSyncType
	regularSyncType
)
25
// Message and sync size limits. They are variables rather than constants, so
// they can be reassigned at runtime.
var (
	// maxNumOfBlocksPerMsg caps the number of headers locateBlocks asks for,
	// and therefore the number of blocks it returns.
	maxNumOfBlocksPerMsg uint64 = 64
	// maxNumOfHeadersPerMsg is the header-count limit per message
	// (not referenced in this part of the file).
	maxNumOfHeadersPerMsg uint64 = 1000
	// maxNumOfBlocksRegularSync caps how far above the local best height a
	// single regularBlockSync run will try to advance.
	maxNumOfBlocksRegularSync uint64 = 128
)
31
32 // Fetcher is the interface for fetch struct
33 type Fetcher interface {
34         processBlock(peerID string, block *types.Block)
35         processBlocks(peerID string, blocks []*types.Block)
36         processHeaders(peerID string, headers []*types.BlockHeader)
37         requireBlock(peerID string, height uint64) (*types.Block, error)
38 }
39
40 type blockMsg struct {
41         block  *types.Block
42         peerID string
43 }
44
45 type blocksMsg struct {
46         blocks []*types.Block
47         peerID string
48 }
49
50 type headersMsg struct {
51         headers []*types.BlockHeader
52         peerID  string
53 }
54
55 type blockKeeper struct {
56         chain      Chain
57         fastSync   *fastSync
58         msgFetcher Fetcher
59         peers      *peers.PeerSet
60         syncPeer   *peers.Peer
61
62         quit chan struct{}
63 }
64
65 func newBlockKeeper(chain Chain, peers *peers.PeerSet, fastSyncDB dbm.DB) *blockKeeper {
66         storage := newStorage(fastSyncDB)
67         msgFetcher := newMsgFetcher(storage, peers)
68         return &blockKeeper{
69                 chain:      chain,
70                 fastSync:   newFastSync(chain, msgFetcher, storage, peers),
71                 msgFetcher: msgFetcher,
72                 peers:      peers,
73                 quit:       make(chan struct{}),
74         }
75 }
76
77 func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash, isTimeout func() bool) ([]*types.Block, error) {
78         headers, err := bk.locateHeaders(locator, stopHash, 0, maxNumOfBlocksPerMsg)
79         if err != nil {
80                 return nil, err
81         }
82
83         blocks := []*types.Block{}
84         for _, header := range headers {
85                 headerHash := header.Hash()
86                 block, err := bk.chain.GetBlockByHash(&headerHash)
87                 if err != nil {
88                         return nil, err
89                 }
90
91                 blocks = append(blocks, block)
92                 if isTimeout() {
93                         break
94                 }
95         }
96         return blocks, nil
97 }
98
99 func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64, maxNum uint64) ([]*types.BlockHeader, error) {
100         startHeader, err := bk.chain.GetHeaderByHeight(0)
101         if err != nil {
102                 return nil, err
103         }
104
105         for _, hash := range locator {
106                 header, err := bk.chain.GetHeaderByHash(hash)
107                 if err == nil && bk.chain.InMainChain(header.Hash()) {
108                         startHeader = header
109                         break
110                 }
111         }
112
113         headers := make([]*types.BlockHeader, 0)
114         stopHeader, err := bk.chain.GetHeaderByHash(stopHash)
115         if err != nil {
116                 return headers, err
117         }
118
119         if !bk.chain.InMainChain(*stopHash) || stopHeader.Height < startHeader.Height {
120                 return headers, nil
121         }
122
123         headers = append(headers, startHeader)
124         if stopHeader.Height == startHeader.Height {
125                 return headers, nil
126         }
127
128         for num, index := uint64(0), startHeader.Height; num < maxNum-1; num++ {
129                 index += skip + 1
130                 if index >= stopHeader.Height {
131                         headers = append(headers, stopHeader)
132                         break
133                 }
134
135                 header, err := bk.chain.GetHeaderByHeight(index)
136                 if err != nil {
137                         return nil, err
138                 }
139
140                 headers = append(headers, header)
141         }
142
143         return headers, nil
144 }
145
146 func (bk *blockKeeper) processBlock(peerID string, block *types.Block) {
147         bk.msgFetcher.processBlock(peerID, block)
148 }
149
150 func (bk *blockKeeper) processBlocks(peerID string, blocks []*types.Block) {
151         bk.msgFetcher.processBlocks(peerID, blocks)
152 }
153
154 func (bk *blockKeeper) processHeaders(peerID string, headers []*types.BlockHeader) {
155         bk.msgFetcher.processHeaders(peerID, headers)
156 }
157
158 func (bk *blockKeeper) regularBlockSync() error {
159         peerHeight := bk.syncPeer.Height()
160         bestHeight := bk.chain.BestBlockHeight()
161         targetHeight := bestHeight + maxNumOfBlocksRegularSync
162         if targetHeight > peerHeight {
163                 targetHeight = peerHeight
164         }
165
166         for i := bestHeight + 1; i <= targetHeight; {
167                 block, err := bk.msgFetcher.requireBlock(bk.syncPeer.ID(), i)
168                 if err != nil {
169                         bk.peers.ProcessIllegal(bk.syncPeer.ID(), security.LevelConnException, err.Error())
170                         return err
171                 }
172
173                 isOrphan, err := bk.chain.ProcessBlock(block)
174                 if err != nil {
175                         if errors.Root(err) != protocol.ErrDoubleSignBlock {
176                                 bk.peers.ProcessIllegal(bk.syncPeer.ID(), security.LevelMsgIllegal, err.Error())
177                         }
178                         return err
179                 }
180
181                 if isOrphan {
182                         i--
183                         continue
184                 }
185
186                 //This code is used to preventing the sync peer return a dust block which will not change the node's chain status
187                 if bestHeight = bk.chain.BestBlockHeight(); i == bestHeight+1 {
188                         log.WithFields(log.Fields{"module": logModule, "height": i}).Warn("stop regular sync due to loop sync same height")
189                         return nil
190                 }
191
192                 i = bestHeight + 1
193         }
194         log.WithFields(log.Fields{"module": logModule, "height": bk.chain.BestBlockHeight()}).Info("regular sync success")
195         return nil
196 }
197
198 func (bk *blockKeeper) start() {
199         go bk.syncWorker()
200 }
201
202 func (bk *blockKeeper) checkSyncType() int {
203         bestHeight := bk.chain.BestBlockHeight()
204         peer := bk.peers.BestIrreversiblePeer(consensus.SFFullNode | consensus.SFFastSync)
205         if peer != nil {
206                 if peerIrreversibleHeight := peer.Height(); peerIrreversibleHeight >= bestHeight+minGapStartFastSync {
207                         bk.fastSync.setSyncPeer(peer)
208                         return fastSyncType
209                 }
210         }
211
212         peer = bk.peers.BestPeer(consensus.SFFullNode)
213         if peer == nil {
214                 log.WithFields(log.Fields{"module": logModule}).Debug("can't find sync peer")
215                 return noNeedSync
216         }
217
218         if peer.Height() > bestHeight {
219                 bk.syncPeer = peer
220                 return regularSyncType
221         }
222
223         return noNeedSync
224 }
225
226 func (bk *blockKeeper) startSync() bool {
227         switch bk.checkSyncType() {
228         case fastSyncType:
229                 if err := bk.fastSync.process(); err != nil {
230                         log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("failed on fast sync")
231                         return false
232                 }
233         case regularSyncType:
234                 if err := bk.regularBlockSync(); err != nil {
235                         log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on regularBlockSync")
236                         return false
237                 }
238         default:
239                 return false
240         }
241
242         return true
243 }
244
245 func (bk *blockKeeper) stop() {
246         close(bk.quit)
247 }
248
249 func (bk *blockKeeper) syncWorker() {
250         syncTicker := time.NewTicker(syncCycle)
251         defer syncTicker.Stop()
252
253         for {
254                 select {
255                 case <-syncTicker.C:
256                         if update := bk.startSync(); !update {
257                                 continue
258                         }
259
260                         if err := bk.peers.BroadcastNewStatus(bk.chain.BestBlockHeader(), bk.chain.LastIrreversibleHeader()); err != nil {
261                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new status")
262                         }
263                 case <-bk.quit:
264                         return
265                 }
266         }
267 }