OSDN Git Service

fix ban node failed (#256)
[bytom/vapor.git] / netsync / chainmgr / block_keeper.go
1 package chainmgr
2
3 import (
4         "time"
5
6         log "github.com/sirupsen/logrus"
7
8         "github.com/vapor/consensus"
9         "github.com/vapor/errors"
10         "github.com/vapor/netsync/peers"
11         "github.com/vapor/p2p/security"
12         "github.com/vapor/protocol/bc"
13         "github.com/vapor/protocol/bc/types"
14 )
15
// syncCycle is the interval between two rounds of the sync worker.
const syncCycle = 5 * time.Second

// Sync types returned by checkSyncType. The values start at 1, so the zero
// value of an int does not correspond to any of them.
const (
	noNeedSync = iota + 1
	fastSyncType
	regularSyncType
)
23
var (
	// syncTimeout is a 30 second duration. It is not referenced in this part
	// of the file; presumably it bounds how long a sync request may take and
	// is a var so it can be overridden (e.g. in tests) — TODO confirm.
	syncTimeout = 30 * time.Second

	// errRequestTimeout and errPeerDropped are sentinel errors that are not
	// used in this part of the file. NOTE(review): judging by their messages
	// they are returned by the fetch/fast-sync code — confirm against callers.
	errRequestTimeout = errors.New("request timeout")
	errPeerDropped    = errors.New("Peer dropped")
)
30
31 type FastSync interface {
32         locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error)
33         locateHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64, maxNum uint64) ([]*types.BlockHeader, error)
34         process() error
35         setSyncPeer(peer *peers.Peer)
36 }
37
38 type Fetcher interface {
39         processBlock(peerID string, block *types.Block)
40         processBlocks(peerID string, blocks []*types.Block)
41         processHeaders(peerID string, headers []*types.BlockHeader)
42         requireBlock(peerID string, height uint64) (*types.Block, error)
43 }
44
// blockMsg pairs a single block with the ID of a peer.
// NOTE(review): presumably the peer the block was received from — it is not
// used in this part of the file, confirm against the fetcher code.
type blockMsg struct {
	block  *types.Block
	peerID string
}
49
// blocksMsg pairs a batch of blocks with the ID of a peer.
// NOTE(review): presumably the peer the blocks were received from — it is not
// used in this part of the file, confirm against the fetcher code.
type blocksMsg struct {
	blocks []*types.Block
	peerID string
}
54
// headersMsg pairs a batch of block headers with the ID of a peer.
// NOTE(review): presumably the peer the headers were received from — it is
// not used in this part of the file, confirm against the fetcher code.
type headersMsg struct {
	headers []*types.BlockHeader
	peerID  string
}
59
// blockKeeper periodically checks whether the local chain is behind its peers
// and, if so, runs a fast or regular block sync against the selected peer.
type blockKeeper struct {
	chain      Chain           // local chain that synced blocks are applied to
	fastSync   FastSync        // fast sync component; also answers locator queries
	msgFetcher Fetcher         // requests blocks from peers and receives their replies
	peers      *peers.PeerSet  // peer set used to pick sync peers and report errors
	syncPeer   *peers.Peer     // peer used by regularBlockSync; set by checkSyncType

	quit chan struct{} // closed by stop to terminate syncWorker
}
69
70 func newBlockKeeper(chain Chain, peers *peers.PeerSet) *blockKeeper {
71         msgFetcher := newMsgFetcher(peers)
72         return &blockKeeper{
73                 chain:      chain,
74                 fastSync:   newFastSync(chain, msgFetcher, peers),
75                 msgFetcher: msgFetcher,
76                 peers:      peers,
77                 quit:       make(chan struct{}),
78         }
79 }
80
81 func (bk *blockKeeper) locateBlocks(locator []*bc.Hash, stopHash *bc.Hash) ([]*types.Block, error) {
82         return bk.fastSync.locateBlocks(locator, stopHash)
83 }
84
85 func (bk *blockKeeper) locateHeaders(locator []*bc.Hash, stopHash *bc.Hash, skip uint64, maxNum uint64) ([]*types.BlockHeader, error) {
86         return bk.fastSync.locateHeaders(locator, stopHash, skip, maxNum)
87 }
88
89 func (bk *blockKeeper) processBlock(peerID string, block *types.Block) {
90         bk.msgFetcher.processBlock(peerID, block)
91 }
92
93 func (bk *blockKeeper) processBlocks(peerID string, blocks []*types.Block) {
94         bk.msgFetcher.processBlocks(peerID, blocks)
95 }
96
97 func (bk *blockKeeper) processHeaders(peerID string, headers []*types.BlockHeader) {
98         bk.msgFetcher.processHeaders(peerID, headers)
99 }
100
101 func (bk *blockKeeper) regularBlockSync() error {
102         peerHeight := bk.syncPeer.Height()
103         bestHeight := bk.chain.BestBlockHeight()
104         i := bestHeight + 1
105         for i <= peerHeight {
106                 block, err := bk.msgFetcher.requireBlock(bk.syncPeer.ID(), i)
107                 if err != nil {
108                         bk.peers.ErrorHandler(bk.syncPeer.ID(), security.LevelConnException, err)
109                         return err
110                 }
111
112                 isOrphan, err := bk.chain.ProcessBlock(block)
113                 if err != nil {
114                         bk.peers.ErrorHandler(bk.syncPeer.ID(), security.LevelMsgIllegal, err)
115                         return err
116                 }
117
118                 if isOrphan {
119                         i--
120                         continue
121                 }
122                 i = bk.chain.BestBlockHeight() + 1
123         }
124         log.WithFields(log.Fields{"module": logModule, "height": bk.chain.BestBlockHeight()}).Info("regular sync success")
125         return nil
126 }
127
128 func (bk *blockKeeper) start() {
129         go bk.syncWorker()
130 }
131
132 func (bk *blockKeeper) checkSyncType() int {
133         peer := bk.peers.BestIrreversiblePeer(consensus.SFFullNode | consensus.SFFastSync)
134         if peer == nil {
135                 log.WithFields(log.Fields{"module": logModule}).Debug("can't find fast sync peer")
136                 return noNeedSync
137         }
138
139         bestHeight := bk.chain.BestBlockHeight()
140
141         if peerIrreversibleHeight := peer.IrreversibleHeight(); peerIrreversibleHeight >= bestHeight+minGapStartFastSync {
142                 bk.fastSync.setSyncPeer(peer)
143                 return fastSyncType
144         }
145
146         peer = bk.peers.BestPeer(consensus.SFFullNode)
147         if peer == nil {
148                 log.WithFields(log.Fields{"module": logModule}).Debug("can't find sync peer")
149                 return noNeedSync
150         }
151
152         peerHeight := peer.Height()
153         if peerHeight > bestHeight {
154                 bk.syncPeer = peer
155                 return regularSyncType
156         }
157
158         return noNeedSync
159 }
160
161 func (bk *blockKeeper) startSync() bool {
162         switch bk.checkSyncType() {
163         case fastSyncType:
164                 if err := bk.fastSync.process(); err != nil {
165                         log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("failed on fast sync")
166                         return false
167                 }
168         case regularSyncType:
169                 if err := bk.regularBlockSync(); err != nil {
170                         log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on regularBlockSync")
171                         return false
172                 }
173         }
174
175         return true
176 }
177
178 func (bk *blockKeeper) stop() {
179         close(bk.quit)
180 }
181
182 func (bk *blockKeeper) syncWorker() {
183         syncTicker := time.NewTicker(syncCycle)
184         defer syncTicker.Stop()
185
186         for {
187                 select {
188                 case <-syncTicker.C:
189                         if update := bk.startSync(); !update {
190                                 continue
191                         }
192
193                         if err := bk.peers.BroadcastNewStatus(bk.chain.BestBlockHeader(), bk.chain.LastIrreversibleHeader()); err != nil {
194                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on syncWorker broadcast new status")
195                         }
196                 case <-bk.quit:
197                         return
198                 }
199         }
200 }