OSDN Git Service

Optimized code format
[bytom/bytom.git] / netsync / block_keeper.go
1 package netsync
2
3 import (
4         "time"
5
6         log "github.com/sirupsen/logrus"
7
8         "github.com/bytom/errors"
9         "github.com/bytom/p2p"
10         "github.com/bytom/protocol"
11         "github.com/bytom/protocol/bc/types"
12 )
13
14 const (
15         maxKnownTxs    = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
16         maxKnownBlocks = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
17
18         syncTimeout        = 30 * time.Second
19         requestRetryTicker = 15 * time.Second
20
21         maxBlocksPending = 1024
22         maxtxsPending    = 32768
23         maxQuitReq       = 256
24 )
25
26 var (
27         errGetBlockTimeout = errors.New("Get block Timeout")
28         errPeerDropped     = errors.New("Peer dropped")
29         errGetBlockByHash  = errors.New("Get block by hash error")
30         errBroadcastStatus = errors.New("Broadcast new status block error")
31         errReqBlock        = errors.New("Request block error")
32 )
33
34 //TODO: add retry mechanism
35 type blockKeeper struct {
36         chain *protocol.Chain
37         sw    *p2p.Switch
38         peers *peerSet
39
40         pendingProcessCh chan *blockPending
41         txsProcessCh     chan *txsNotify
42         quitReqBlockCh   chan *string
43 }
44
45 func newBlockKeeper(chain *protocol.Chain, sw *p2p.Switch, peers *peerSet, quitReqBlockCh chan *string) *blockKeeper {
46         bk := &blockKeeper{
47                 chain:            chain,
48                 sw:               sw,
49                 peers:            peers,
50                 pendingProcessCh: make(chan *blockPending, maxBlocksPending),
51                 txsProcessCh:     make(chan *txsNotify, maxtxsPending),
52                 quitReqBlockCh:   quitReqBlockCh,
53         }
54         go bk.txsProcessWorker()
55         return bk
56 }
57
58 func (bk *blockKeeper) AddBlock(block *types.Block, peerID string) {
59         bk.pendingProcessCh <- &blockPending{block: block, peerID: peerID}
60 }
61
62 func (bk *blockKeeper) AddTx(tx *types.Tx, peerID string) {
63         bk.txsProcessCh <- &txsNotify{tx: tx, peerID: peerID}
64 }
65
66 func (bk *blockKeeper) IsCaughtUp() bool {
67         _, height := bk.peers.BestPeer()
68         return bk.chain.BestBlockHeight() < height
69 }
70
71 func (bk *blockKeeper) BlockRequestWorker(peerID string, maxPeerHeight uint64) error {
72         num := bk.chain.BestBlockHeight() + 1
73         currentHash := bk.chain.BestBlockHash()
74         orphanNum := uint64(0)
75         reqNum := uint64(0)
76         isOrphan := false
77         for num <= maxPeerHeight && num > 0 {
78                 if isOrphan {
79                         reqNum = orphanNum
80                 } else {
81                         reqNum = num
82                 }
83                 block, err := bk.BlockRequest(peerID, reqNum)
84                 if errors.Root(err) == errPeerDropped || errors.Root(err) == errGetBlockTimeout || errors.Root(err) == errReqBlock {
85                         log.WithField("Peer abnormality. PeerID: ", peerID).Info(err)
86                         peer := bk.peers.Peer(peerID)
87                         if peer == nil {
88                                 log.Info("peer is not registered")
89                                 break
90                         }
91                         log.Info("Peer communication abnormality")
92                         bk.sw.StopPeerGracefully(peer.Peer)
93                         break
94                 }
95                 isOrphan, err = bk.chain.ProcessBlock(block)
96                 if err != nil {
97                         bk.sw.AddScamPeer(bk.peers.Peer(peerID).getPeer())
98                         log.WithField("hash:", block.Hash()).Errorf("blockKeeper fail process block %v ", err)
99                         break
100                 }
101                 if isOrphan {
102                         orphanNum = block.Height - 1
103                         continue
104                 }
105                 num++
106         }
107         bestHash := bk.chain.BestBlockHash()
108         log.Info("Block sync complete. height:", bk.chain.BestBlockHeight(), " hash:", bestHash)
109         if currentHash.String() != bestHash.String() {
110                 log.Info("Broadcast new chain status.")
111
112                 block, err := bk.chain.GetBlockByHash(bestHash)
113                 if err != nil {
114                         log.Errorf("Failed on sync complete broadcast status get block %v", err)
115                         return errGetBlockByHash
116                 }
117
118                 if err := bk.peers.BroadcastNewStatus(block); err != nil {
119                         log.Errorf("Failed on broadcast new status block %v", err)
120                         return errBroadcastStatus
121                 }
122         }
123         return nil
124 }
125
126 func (bk *blockKeeper) blockRequest(peerID string, height uint64) error {
127         return bk.peers.requestBlockByHeight(peerID, height)
128 }
129
130 func (bk *blockKeeper) BlockRequest(peerID string, height uint64) (*types.Block, error) {
131         var block *types.Block
132
133         if err := bk.blockRequest(peerID, height); err != nil {
134                 return nil, errReqBlock
135         }
136         retryTicker := time.Tick(requestRetryTicker)
137         syncWait := time.NewTimer(syncTimeout)
138
139         for {
140                 select {
141                 case pendingResponse := <-bk.pendingProcessCh:
142                         block = pendingResponse.block
143                         if pendingResponse.peerID != peerID {
144                                 log.Warning("From different peer")
145                                 continue
146                         }
147                         if block.Height != height {
148                                 log.Warning("Block height error")
149                                 continue
150                         }
151                         return block, nil
152                 case <-retryTicker:
153                         if err := bk.blockRequest(peerID, height); err != nil {
154                                 return nil, errReqBlock
155                         }
156                 case <-syncWait.C:
157                         log.Warning("Request block timeout")
158                         return nil, errGetBlockTimeout
159                 case peerid := <-bk.quitReqBlockCh:
160                         if *peerid == peerID {
161                                 log.Info("Quite block request worker")
162                                 return nil, errPeerDropped
163                         }
164                 }
165         }
166 }
167
168 func (bk *blockKeeper) txsProcessWorker() {
169         for txsResponse := range bk.txsProcessCh {
170                 tx := txsResponse.tx
171                 log.Info("Receive new tx from remote peer. TxID:", tx.ID.String())
172                 bk.peers.MarkTransaction(txsResponse.peerID, &tx.ID)
173                 if isOrphan, err := bk.chain.ValidateTx(tx); err != nil && isOrphan == false {
174                         bk.sw.AddScamPeer(bk.peers.Peer(txsResponse.peerID).getPeer())
175                 }
176         }
177 }