OSDN Git Service

Change p2p message send to trysend
[bytom/bytom.git] / netsync / block_keeper.go
1 package netsync
2
3 import (
4         "time"
5
6         log "github.com/sirupsen/logrus"
7
8         "github.com/bytom/errors"
9         "github.com/bytom/p2p"
10         "github.com/bytom/protocol"
11         "github.com/bytom/protocol/bc/types"
12 )
13
// Upper bounds for the "known" hash lists.
const (
	maxKnownTxs    = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
	maxKnownBlocks = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
)

// Timing of a single block request (see BlockRequest).
const (
	// syncTimeout is how long BlockRequest waits for a matching block
	// before giving up.
	syncTimeout = 30 * time.Second
	// requestRetryTicker is the interval at which BlockRequest re-sends
	// the request while it is still waiting.
	requestRetryTicker = 15 * time.Second
)

// Channel capacities.
const (
	// maxBlocksPending is the buffer size of blockKeeper.pendingProcessCh.
	maxBlocksPending = 1024
	// maxtxsPending is the buffer size of blockKeeper.txsProcessCh.
	maxtxsPending = 32768
	// maxQuitReq is not referenced in this file; presumably the buffer size
	// of the quit-request channel created by the caller — TODO confirm.
	maxQuitReq = 256
)
25
// Errors produced while requesting a block from a peer (see BlockRequest).
var (
	errReqBlock        = errors.New("Request block error")
	errGetBlockTimeout = errors.New("Get block Timeout")
	errPeerDropped     = errors.New("Peer dropped")
)

// Errors returned by BlockRequestWorker itself.
var (
	errPeerNotRegister = errors.New("peer is not registered")
	errGetBlockByHash  = errors.New("Get block by hash error")
	errBroadcastStatus = errors.New("Broadcast new status block error")
)
34
35 //TODO: add retry mechanism
36 type blockKeeper struct {
37         chain *protocol.Chain
38         sw    *p2p.Switch
39         peers *peerSet
40
41         pendingProcessCh chan *blockPending
42         txsProcessCh     chan *txsNotify
43         quitReqBlockCh   chan *string
44 }
45
46 func newBlockKeeper(chain *protocol.Chain, sw *p2p.Switch, peers *peerSet, quitReqBlockCh chan *string) *blockKeeper {
47         bk := &blockKeeper{
48                 chain:            chain,
49                 sw:               sw,
50                 peers:            peers,
51                 pendingProcessCh: make(chan *blockPending, maxBlocksPending),
52                 txsProcessCh:     make(chan *txsNotify, maxtxsPending),
53                 quitReqBlockCh:   quitReqBlockCh,
54         }
55         go bk.txsProcessWorker()
56         return bk
57 }
58
59 func (bk *blockKeeper) AddBlock(block *types.Block, peerID string) {
60         bk.pendingProcessCh <- &blockPending{block: block, peerID: peerID}
61 }
62
63 func (bk *blockKeeper) AddTx(tx *types.Tx, peerID string) {
64         bk.txsProcessCh <- &txsNotify{tx: tx, peerID: peerID}
65 }
66
67 func (bk *blockKeeper) IsCaughtUp() bool {
68         _, height := bk.peers.BestPeer()
69         return bk.chain.BestBlockHeight() < height
70 }
71
72 func (bk *blockKeeper) BlockRequestWorker(peerID string, maxPeerHeight uint64) error {
73         num := bk.chain.BestBlockHeight() + 1
74         currentHash := bk.chain.BestBlockHash()
75         orphanNum := uint64(0)
76         reqNum := uint64(0)
77         isOrphan := false
78         bkPeer, ok := bk.peers.Peer(peerID)
79         if !ok {
80                 log.Info("peer is not registered")
81                 return errPeerNotRegister
82         }
83         swPeer := bkPeer.getPeer()
84         for num <= maxPeerHeight && num > 0 {
85                 if isOrphan {
86                         reqNum = orphanNum
87                 } else {
88                         reqNum = num
89                 }
90                 block, err := bk.BlockRequest(peerID, reqNum)
91                 if errors.Root(err) == errPeerDropped || errors.Root(err) == errGetBlockTimeout || errors.Root(err) == errReqBlock {
92                         log.WithField("Peer abnormality. PeerID: ", peerID).Info(err)
93                         if bkPeer == nil {
94                                 log.Info("peer is not registered")
95                                 break
96                         }
97                         if ban := bkPeer.addBanScore(0, 50, "block request error"); ban {
98                                 bk.sw.AddBannedPeer(swPeer)
99                         }
100                         bk.sw.StopPeerGracefully(swPeer)
101                         break
102                 }
103                 isOrphan, err = bk.chain.ProcessBlock(block)
104                 if err != nil {
105                         if bkPeer == nil {
106                                 log.Info("peer is deleted")
107                                 break
108                         }
109                         if ban := bkPeer.addBanScore(20, 0, "block process error"); ban {
110                                 bk.sw.AddBannedPeer(swPeer)
111                                 bk.sw.StopPeerGracefully(swPeer)
112                         }
113                         log.WithField("hash:", block.Hash()).Errorf("blockKeeper fail process block %v ", err)
114                         break
115                 }
116                 if isOrphan {
117                         orphanNum = block.Height - 1
118                         continue
119                 }
120                 num++
121         }
122         bestHash := bk.chain.BestBlockHash()
123         log.Info("Block sync complete. height:", bk.chain.BestBlockHeight(), " hash:", bestHash)
124         if currentHash.String() != bestHash.String() {
125                 log.Info("Broadcast new chain status.")
126
127                 block, err := bk.chain.GetBlockByHash(bestHash)
128                 if err != nil {
129                         log.Errorf("Failed on sync complete broadcast status get block %v", err)
130                         return errGetBlockByHash
131                 }
132
133                 peers, err := bk.peers.BroadcastNewStatus(block)
134                 if err != nil {
135                         log.Errorf("Failed on broadcast new status block %v", err)
136                         return errBroadcastStatus
137                 }
138                 for _, peer := range peers {
139                         if peer == nil {
140                                 return errPeerNotRegister
141                         }
142                         swPeer := peer.getPeer()
143                         if ban := peer.addBanScore(0, 50, "Broadcast block error"); ban {
144                                 bk.sw.AddBannedPeer(swPeer)
145                                 bk.sw.StopPeerGracefully(swPeer)
146                         }
147                 }
148         }
149         return nil
150 }
151
152 func (bk *blockKeeper) blockRequest(peerID string, height uint64) error {
153         return bk.peers.requestBlockByHeight(peerID, height)
154 }
155
156 func (bk *blockKeeper) BlockRequest(peerID string, height uint64) (*types.Block, error) {
157         var block *types.Block
158
159         if err := bk.blockRequest(peerID, height); err != nil {
160                 return nil, errReqBlock
161         }
162         retryTicker := time.Tick(requestRetryTicker)
163         syncWait := time.NewTimer(syncTimeout)
164
165         for {
166                 select {
167                 case pendingResponse := <-bk.pendingProcessCh:
168                         block = pendingResponse.block
169                         if pendingResponse.peerID != peerID {
170                                 log.Warning("From different peer")
171                                 continue
172                         }
173                         if block.Height != height {
174                                 log.Warning("Block height error")
175                                 continue
176                         }
177                         return block, nil
178                 case <-retryTicker:
179                         if err := bk.blockRequest(peerID, height); err != nil {
180                                 return nil, errReqBlock
181                         }
182                 case <-syncWait.C:
183                         log.Warning("Request block timeout")
184                         return nil, errGetBlockTimeout
185                 case peerid := <-bk.quitReqBlockCh:
186                         if *peerid == peerID {
187                                 log.Info("Quite block request worker")
188                                 return nil, errPeerDropped
189                         }
190                 }
191         }
192 }
193
194 func (bk *blockKeeper) txsProcessWorker() {
195         for txsResponse := range bk.txsProcessCh {
196                 tx := txsResponse.tx
197                 log.Info("Receive new tx from remote peer. TxID:", tx.ID.String())
198                 bk.peers.MarkTransaction(txsResponse.peerID, &tx.ID)
199                 if isOrphan, err := bk.chain.ValidateTx(tx); err != nil && isOrphan == false {
200                         if bkPeer, ok := bk.peers.Peer(txsResponse.peerID); ok {
201                                 swPeer := bkPeer.getPeer()
202                                 if ban := bkPeer.addBanScore(10, 0, "tx error"); ban {
203                                         bk.sw.AddBannedPeer(swPeer)
204                                         bk.sw.StopPeerGracefully(swPeer)
205                                 }
206                         }
207                 }
208         }
209 }