OSDN Git Service

Merge pull request #935 from Bytom/dev
[bytom/bytom.git] / netsync / block_keeper.go
1 package netsync
2
3 import (
4         "time"
5
6         log "github.com/sirupsen/logrus"
7
8         "github.com/bytom/errors"
9         "github.com/bytom/p2p"
10         "github.com/bytom/protocol"
11         "github.com/bytom/protocol/bc/types"
12 )
13
// Tuning limits and timeouts used by the block keeper.
const (
	maxKnownTxs    = 32768 // Maximum transaction hashes to keep in the known list (prevent DOS)
	maxKnownBlocks = 1024  // Maximum block hashes to keep in the known list (prevent DOS)

	syncTimeout        = 30 * time.Second // overall deadline BlockRequest waits for one requested block
	requestRetryTicker = 15 * time.Second // interval at which BlockRequest re-sends an unanswered request

	maxBlocksPending = 1024  // buffer size of blockKeeper.pendingProcessCh
	maxtxsPending    = 32768 // buffer size of blockKeeper.txsProcessCh
	maxQuitReq       = 256   // not referenced in this file; presumably sizes the quit-request channel built by the caller — TODO confirm
)
25
// Sentinel errors returned by the block keeper.
var (
	errGetBlockTimeout = errors.New("Get block Timeout")                // BlockRequest: syncTimeout expired before the block arrived
	errPeerDropped     = errors.New("Peer dropped")                     // BlockRequest: the peer's ID was received on quitReqBlockCh
	errGetBlockByHash  = errors.New("Get block by hash error")          // BlockRequestWorker: the new best block could not be loaded
	errBroadcastStatus = errors.New("Broadcast new status block error") // BlockRequestWorker: BroadcastNewStatus returned an error
	errReqBlock        = errors.New("Request block error")              // BlockRequest: sending the block request failed
	errPeerNotRegister = errors.New("peer is not registered")           // BlockRequestWorker: peer is unknown to the peer set
)
34
//TODO: add retry mechanism

// blockKeeper requests blocks from remote peers, feeds them to the local
// chain, and validates transactions received from peers.
type blockKeeper struct {
	chain *protocol.Chain // local chain that receives blocks and validates transactions
	sw    *p2p.Switch     // used to stop and ban misbehaving peers
	peers *peerSet        // set of known peers; used for lookups, block requests and status broadcasts

	pendingProcessCh chan *blockPending // blocks handed in through AddBlock, consumed by BlockRequest
	txsProcessCh     chan *txsNotify    // transactions handed in through AddTx, consumed by txsProcessWorker
	quitReqBlockCh   chan *string       // supplied by the caller; a received peer ID aborts the pending BlockRequest for that peer
}
45
46 func newBlockKeeper(chain *protocol.Chain, sw *p2p.Switch, peers *peerSet, quitReqBlockCh chan *string) *blockKeeper {
47         bk := &blockKeeper{
48                 chain:            chain,
49                 sw:               sw,
50                 peers:            peers,
51                 pendingProcessCh: make(chan *blockPending, maxBlocksPending),
52                 txsProcessCh:     make(chan *txsNotify, maxtxsPending),
53                 quitReqBlockCh:   quitReqBlockCh,
54         }
55         go bk.txsProcessWorker()
56         return bk
57 }
58
59 func (bk *blockKeeper) AddBlock(block *types.Block, peerID string) {
60         bk.pendingProcessCh <- &blockPending{block: block, peerID: peerID}
61 }
62
63 func (bk *blockKeeper) AddTx(tx *types.Tx, peerID string) {
64         bk.txsProcessCh <- &txsNotify{tx: tx, peerID: peerID}
65 }
66
67 func (bk *blockKeeper) IsCaughtUp() bool {
68         _, height := bk.peers.BestPeer()
69         return bk.chain.BestBlockHeight() < height
70 }
71
72 func (bk *blockKeeper) BlockRequestWorker(peerID string, maxPeerHeight uint64) error {
73         num := bk.chain.BestBlockHeight() + 1
74         currentHash := bk.chain.BestBlockHash()
75         orphanNum := uint64(0)
76         reqNum := uint64(0)
77         isOrphan := false
78         bkPeer, ok := bk.peers.Peer(peerID)
79         if !ok {
80                 log.Info("peer is not registered")
81                 return errPeerNotRegister
82         }
83         swPeer := bkPeer.getPeer()
84         for 0 < num && num <= maxPeerHeight {
85                 if isOrphan {
86                         reqNum = orphanNum
87                 } else {
88                         reqNum = num
89                 }
90                 block, err := bk.BlockRequest(peerID, reqNum)
91                 if errors.Root(err) == errPeerDropped || errors.Root(err) == errGetBlockTimeout || errors.Root(err) == errReqBlock {
92                         log.WithField("Peer abnormality. PeerID: ", peerID).Info(err)
93                         if bkPeer == nil {
94                                 log.Info("peer is not registered")
95                                 break
96                         }
97                         log.Info("Block keeper request block error. Stop peer.")
98                         bk.sw.StopPeerGracefully(swPeer)
99                         break
100                 }
101                 isOrphan, err = bk.chain.ProcessBlock(block)
102                 if err != nil {
103                         if bkPeer == nil {
104                                 log.Info("peer is deleted")
105                                 break
106                         }
107                         if ban := bkPeer.addBanScore(20, 0, "block process error"); ban {
108                                 bk.sw.AddBannedPeer(swPeer)
109                                 bk.sw.StopPeerGracefully(swPeer)
110                         }
111                         log.WithField("hash:", block.Hash()).Errorf("blockKeeper fail process block %v ", err)
112                         break
113                 }
114                 if isOrphan {
115                         orphanNum = block.Height - 1
116                         continue
117                 }
118                 num++
119         }
120         bestHash := bk.chain.BestBlockHash()
121         log.Info("Block sync complete. height:", bk.chain.BestBlockHeight(), " hash:", bestHash)
122         if currentHash.String() != bestHash.String() {
123                 log.Info("Broadcast new chain status.")
124
125                 block, err := bk.chain.GetBlockByHash(bestHash)
126                 if err != nil {
127                         log.Errorf("Failed on sync complete broadcast status get block %v", err)
128                         return errGetBlockByHash
129                 }
130
131                 peers, err := bk.peers.BroadcastNewStatus(block)
132                 if err != nil {
133                         log.Errorf("Failed on broadcast new status block %v", err)
134                         return errBroadcastStatus
135                 }
136                 for _, peer := range peers {
137                         if peer == nil {
138                                 return errPeerNotRegister
139                         }
140                         swPeer := peer.getPeer()
141                         log.Info("Block keeper broadcast block error. Stop peer.")
142                         bk.sw.StopPeerGracefully(swPeer)
143                 }
144         }
145         return nil
146 }
147
148 func (bk *blockKeeper) blockRequest(peerID string, height uint64) error {
149         return bk.peers.requestBlockByHeight(peerID, height)
150 }
151
152 func (bk *blockKeeper) BlockRequest(peerID string, height uint64) (*types.Block, error) {
153         var block *types.Block
154
155         if err := bk.blockRequest(peerID, height); err != nil {
156                 return nil, errReqBlock
157         }
158         retryTicker := time.Tick(requestRetryTicker)
159         syncWait := time.NewTimer(syncTimeout)
160
161         for {
162                 select {
163                 case pendingResponse := <-bk.pendingProcessCh:
164                         block = pendingResponse.block
165                         if pendingResponse.peerID != peerID {
166                                 log.Warning("From different peer")
167                                 continue
168                         }
169                         if block.Height != height {
170                                 log.Warning("Block height error")
171                                 continue
172                         }
173                         return block, nil
174                 case <-retryTicker:
175                         if err := bk.blockRequest(peerID, height); err != nil {
176                                 return nil, errReqBlock
177                         }
178                 case <-syncWait.C:
179                         log.Warning("Request block timeout")
180                         return nil, errGetBlockTimeout
181                 case peerid := <-bk.quitReqBlockCh:
182                         if *peerid == peerID {
183                                 log.Info("Quite block request worker")
184                                 return nil, errPeerDropped
185                         }
186                 }
187         }
188 }
189
190 func (bk *blockKeeper) txsProcessWorker() {
191         for txsResponse := range bk.txsProcessCh {
192                 tx := txsResponse.tx
193                 log.Info("Receive new tx from remote peer. TxID:", tx.ID.String())
194                 bk.peers.MarkTransaction(txsResponse.peerID, &tx.ID)
195                 if isOrphan, err := bk.chain.ValidateTx(tx); err != nil && isOrphan == false {
196                         if bkPeer, ok := bk.peers.Peer(txsResponse.peerID); ok {
197                                 swPeer := bkPeer.getPeer()
198                                 if ban := bkPeer.addBanScore(10, 0, "tx error"); ban {
199                                         bk.sw.AddBannedPeer(swPeer)
200                                         bk.sw.StopPeerGracefully(swPeer)
201                                 }
202                         }
203                 }
204         }
205 }