OSDN Git Service

add txCh for synmanager
[bytom/bytom.git] / netsync / block_keeper.go
1 package netsync
2
3 import (
4         "time"
5
6         log "github.com/sirupsen/logrus"
7
8         "github.com/bytom/errors"
9         "github.com/bytom/p2p"
10         "github.com/bytom/protocol"
11         "github.com/bytom/protocol/bc/types"
12 )
13
// Limits and timing parameters used by the block keeper.
14 const (
15         maxKnownTxs    = 32768 // Maximum transaction hashes to keep in the known list (prevent DOS)
16         maxKnownBlocks = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
17
18         syncTimeout        = 30 * time.Second // total time BlockRequest waits for one requested block
19         requestRetryTicker = 15 * time.Second // interval at which BlockRequest re-sends the request
20
21         maxBlocksPending = 1024  // buffer size of blockKeeper.pendingProcessCh
22         maxtxsPending    = 32768 // buffer size of blockKeeper.txsProcessCh
23         maxQuitReq       = 256   // not referenced in this file; presumably the quitReqBlockCh buffer size (confirm where the channel is created)
24
25         // maxTxChanSize is the size of channel listening to Txpool newTxCh.
26         maxTxChanSize = 1000
27 )
28
// Sentinel errors returned by the block keeper.
29 var (
30         errGetBlockTimeout = errors.New("Get block Timeout")                // BlockRequest: syncTimeout elapsed without a matching block
31         errPeerDropped     = errors.New("Peer dropped")                     // BlockRequest: quit notification received for the requested peer
32         errGetBlockByHash  = errors.New("Get block by hash error")          // BlockRequestWorker: best block could not be loaded for the status broadcast
33         errBroadcastStatus = errors.New("Broadcast new status block error") // BlockRequestWorker: BroadcastNewStatus returned an error
34         errReqBlock        = errors.New("Request block error")              // BlockRequest: sending the block request failed
35         errPeerNotRegister = errors.New("peer is not registered")           // BlockRequestWorker: peer lookup failed or a nil peer was reported
36 )
37
// blockKeeper requests blocks from peers, hands received blocks and
// transactions to the chain, and stops or bans peers that misbehave.
38 //TODO: add retry mechanism
39 type blockKeeper struct {
40         chain *protocol.Chain // local chain: queried for best height/hash, fed via ProcessBlock and ValidateTx
41         sw    *p2p.Switch     // used to stop and ban peers
42         peers *peerSet        // peer lookup, block requests, status broadcast and tx marking
43
44         pendingProcessCh chan *blockPending // blocks queued by AddBlock, consumed by BlockRequest
45         txsProcessCh     chan *txsNotify    // transactions queued by AddTx, consumed by txsProcessWorker
46         quitReqBlockCh   chan *string       // supplied by the caller of newBlockKeeper; carries peer IDs that abort a pending BlockRequest
47 }
48
49 func newBlockKeeper(chain *protocol.Chain, sw *p2p.Switch, peers *peerSet, quitReqBlockCh chan *string) *blockKeeper {
50         bk := &blockKeeper{
51                 chain:            chain,
52                 sw:               sw,
53                 peers:            peers,
54                 pendingProcessCh: make(chan *blockPending, maxBlocksPending),
55                 txsProcessCh:     make(chan *txsNotify, maxtxsPending),
56                 quitReqBlockCh:   quitReqBlockCh,
57         }
58         go bk.txsProcessWorker()
59         return bk
60 }
61
62 func (bk *blockKeeper) AddBlock(block *types.Block, peerID string) {
63         bk.pendingProcessCh <- &blockPending{block: block, peerID: peerID}
64 }
65
66 func (bk *blockKeeper) AddTx(tx *types.Tx, peerID string) {
67         bk.txsProcessCh <- &txsNotify{tx: tx, peerID: peerID}
68 }
69
70 func (bk *blockKeeper) IsCaughtUp() bool {
71         _, height := bk.peers.BestPeer()
72         return bk.chain.BestBlockHeight() < height
73 }
74
75 func (bk *blockKeeper) BlockRequestWorker(peerID string, maxPeerHeight uint64) error {
76         num := bk.chain.BestBlockHeight() + 1
77         currentHash := bk.chain.BestBlockHash()
78         orphanNum := uint64(0)
79         reqNum := uint64(0)
80         isOrphan := false
81         bkPeer, ok := bk.peers.Peer(peerID)
82         if !ok {
83                 log.Info("peer is not registered")
84                 return errPeerNotRegister
85         }
86         swPeer := bkPeer.getPeer()
87         for 0 < num && num <= maxPeerHeight {
88                 if isOrphan {
89                         reqNum = orphanNum
90                 } else {
91                         reqNum = num
92                 }
93                 block, err := bk.BlockRequest(peerID, reqNum)
94                 if errors.Root(err) == errPeerDropped || errors.Root(err) == errGetBlockTimeout || errors.Root(err) == errReqBlock {
95                         log.WithField("Peer abnormality. PeerID: ", peerID).Info(err)
96                         if bkPeer == nil {
97                                 log.Info("peer is not registered")
98                                 break
99                         }
100                         log.Info("Block keeper request block error. Stop peer.")
101                         bk.sw.StopPeerGracefully(swPeer)
102                         break
103                 }
104                 isOrphan, err = bk.chain.ProcessBlock(block)
105                 if err != nil {
106                         if bkPeer == nil {
107                                 log.Info("peer is deleted")
108                                 break
109                         }
110                         if ban := bkPeer.addBanScore(20, 0, "block process error"); ban {
111                                 bk.sw.AddBannedPeer(swPeer)
112                                 bk.sw.StopPeerGracefully(swPeer)
113                         }
114                         log.WithField("hash:", block.Hash()).Errorf("blockKeeper fail process block %v ", err)
115                         break
116                 }
117                 if isOrphan {
118                         orphanNum = block.Height - 1
119                         continue
120                 }
121                 num++
122         }
123         bestHash := bk.chain.BestBlockHash()
124         log.Info("Block sync complete. height:", bk.chain.BestBlockHeight(), " hash:", bestHash)
125         if currentHash.String() != bestHash.String() {
126                 log.Info("Broadcast new chain status.")
127
128                 block, err := bk.chain.GetBlockByHash(bestHash)
129                 if err != nil {
130                         log.Errorf("Failed on sync complete broadcast status get block %v", err)
131                         return errGetBlockByHash
132                 }
133
134                 peers, err := bk.peers.BroadcastNewStatus(block)
135                 if err != nil {
136                         log.Errorf("Failed on broadcast new status block %v", err)
137                         return errBroadcastStatus
138                 }
139                 for _, peer := range peers {
140                         if peer == nil {
141                                 return errPeerNotRegister
142                         }
143                         swPeer := peer.getPeer()
144                         log.Info("Block keeper broadcast block error. Stop peer.")
145                         bk.sw.StopPeerGracefully(swPeer)
146                 }
147         }
148         return nil
149 }
150
151 func (bk *blockKeeper) blockRequest(peerID string, height uint64) error {
152         return bk.peers.requestBlockByHeight(peerID, height)
153 }
154
155 func (bk *blockKeeper) BlockRequest(peerID string, height uint64) (*types.Block, error) {
156         var block *types.Block
157
158         if err := bk.blockRequest(peerID, height); err != nil {
159                 return nil, errReqBlock
160         }
161         retryTicker := time.Tick(requestRetryTicker)
162         syncWait := time.NewTimer(syncTimeout)
163
164         for {
165                 select {
166                 case pendingResponse := <-bk.pendingProcessCh:
167                         block = pendingResponse.block
168                         if pendingResponse.peerID != peerID {
169                                 log.Warning("From different peer")
170                                 continue
171                         }
172                         if block.Height != height {
173                                 log.Warning("Block height error")
174                                 continue
175                         }
176                         return block, nil
177                 case <-retryTicker:
178                         if err := bk.blockRequest(peerID, height); err != nil {
179                                 return nil, errReqBlock
180                         }
181                 case <-syncWait.C:
182                         log.Warning("Request block timeout")
183                         return nil, errGetBlockTimeout
184                 case peerid := <-bk.quitReqBlockCh:
185                         if *peerid == peerID {
186                                 log.Info("Quite block request worker")
187                                 return nil, errPeerDropped
188                         }
189                 }
190         }
191 }
192
193 func (bk *blockKeeper) txsProcessWorker() {
194         for txsResponse := range bk.txsProcessCh {
195                 tx := txsResponse.tx
196                 log.Info("Receive new tx from remote peer. TxID:", tx.ID.String())
197                 bk.peers.MarkTransaction(txsResponse.peerID, &tx.ID)
198                 if isOrphan, err := bk.chain.ValidateTx(tx); err != nil && isOrphan == false {
199                         if bkPeer, ok := bk.peers.Peer(txsResponse.peerID); ok {
200                                 swPeer := bkPeer.getPeer()
201                                 if ban := bkPeer.addBanScore(10, 0, "tx error"); ban {
202                                         bk.sw.AddBannedPeer(swPeer)
203                                         bk.sw.StopPeerGracefully(swPeer)
204                                 }
205                         }
206                 }
207         }
208 }