OSDN Git Service

fix bug for end to end test
[bytom/bytom.git] / protocol / protocol.go
1 package protocol
2
3 import (
4         "context"
5         "sync"
6         "time"
7
8         "github.com/bytom/blockchain/txdb"
9         "github.com/bytom/errors"
10         "github.com/bytom/protocol/bc"
11         "github.com/bytom/protocol/bc/legacy"
12         "github.com/bytom/protocol/state"
13 )
14
15 // maxCachedValidatedTxs is the max number of validated txs to cache.
16 const maxCachedValidatedTxs = 1000
17
18 var (
19         // ErrTheDistantFuture is returned when waiting for a blockheight
20         // too far in excess of the tip of the blockchain.
21         ErrTheDistantFuture = errors.New("block height too far in future")
22 )
23
24 // Store provides storage for blockchain data: blocks and state tree
25 // snapshots.
26 //
27 // Note, this is different from a state snapshot. A state snapshot
28 // provides access to the state at a given point in time -- outputs
29 // and issuance memory. The Chain type uses Store to load state
30 // from storage and persist validated data.
31 type Store interface {
32         BlockExist(*bc.Hash) bool
33
34         GetBlock(*bc.Hash) (*legacy.Block, error)
35         GetMainchain(*bc.Hash) (map[uint64]*bc.Hash, error)
36         GetSnapshot(*bc.Hash) (*state.Snapshot, error)
37         GetStoreStatus() txdb.BlockStoreStateJSON
38
39         SaveBlock(*legacy.Block) error
40         SaveMainchain(map[uint64]*bc.Hash, *bc.Hash) error
41         SaveSnapshot(*state.Snapshot, *bc.Hash) error
42         SaveStoreStatus(uint64, *bc.Hash)
43 }
44
45 type OrphanManage struct {
46         //TODO: add orphan cached block limit
47         orphan     map[bc.Hash]*legacy.Block
48         preOrphans map[bc.Hash][]*bc.Hash
49         mtx        sync.RWMutex
50 }
51
52 func NewOrphanManage() *OrphanManage {
53         return &OrphanManage{
54                 orphan:     make(map[bc.Hash]*legacy.Block),
55                 preOrphans: make(map[bc.Hash][]*bc.Hash),
56         }
57 }
58
59 func (o *OrphanManage) BlockExist(hash *bc.Hash) bool {
60         o.mtx.RLock()
61         _, ok := o.orphan[*hash]
62         o.mtx.RUnlock()
63         return ok
64 }
65
66 func (o *OrphanManage) Add(block *legacy.Block) {
67         blockHash := block.Hash()
68         o.mtx.Lock()
69         defer o.mtx.Unlock()
70
71         if _, ok := o.orphan[blockHash]; ok {
72                 return
73         }
74
75         o.orphan[blockHash] = block
76         o.preOrphans[block.PreviousBlockHash] = append(o.preOrphans[block.PreviousBlockHash], &blockHash)
77 }
78
79 func (o *OrphanManage) Delete(hash *bc.Hash) {
80         o.mtx.Lock()
81         defer o.mtx.Unlock()
82         block, ok := o.orphan[*hash]
83         if !ok {
84                 return
85         }
86         delete(o.orphan, *hash)
87
88         preOrphans, ok := o.preOrphans[block.PreviousBlockHash]
89         if !ok || len(preOrphans) == 1 {
90                 delete(o.preOrphans, block.PreviousBlockHash)
91                 return
92         }
93
94         for i, preOrphan := range preOrphans {
95                 if preOrphan == hash {
96                         o.preOrphans[block.PreviousBlockHash] = append(preOrphans[:i], preOrphans[i+1:]...)
97                         return
98                 }
99         }
100 }
101
102 func (o *OrphanManage) Get(hash *bc.Hash) (*legacy.Block, bool) {
103         o.mtx.RLock()
104         block, ok := o.orphan[*hash]
105         o.mtx.RUnlock()
106         return block, ok
107 }
108
109 // Chain provides a complete, minimal blockchain database. It
110 // delegates the underlying storage to other objects, and uses
111 // validation logic from package validation to decide what
112 // objects can be safely stored.
113 type Chain struct {
114         InitialBlockHash  bc.Hash
115         MaxIssuanceWindow time.Duration // only used by generators
116
117         orphanManage *OrphanManage
118         txPool       *TxPool
119
120         state struct {
121                 cond      sync.Cond
122                 block     *legacy.Block
123                 height    uint64
124                 hash      *bc.Hash
125                 mainChain map[uint64]*bc.Hash
126                 snapshot  *state.Snapshot
127         }
128         store Store
129 }
130
131 // NewChain returns a new Chain using store as the underlying storage.
132 func NewChain(initialBlockHash bc.Hash, store Store, txPool *TxPool) (*Chain, error) {
133         c := &Chain{
134                 InitialBlockHash: initialBlockHash,
135                 orphanManage:     NewOrphanManage(),
136                 store:            store,
137                 txPool:           txPool,
138         }
139         c.state.cond.L = new(sync.Mutex)
140         storeStatus := store.GetStoreStatus()
141         c.state.height = storeStatus.Height
142
143         if c.state.height == 0 {
144                 c.state.snapshot = state.Empty()
145                 c.state.mainChain = make(map[uint64]*bc.Hash)
146                 return c, nil
147         }
148
149         c.state.hash = storeStatus.Hash
150         var err error
151         if c.state.block, err = store.GetBlock(storeStatus.Hash); err != nil {
152                 return nil, err
153         }
154         if c.state.snapshot, err = store.GetSnapshot(storeStatus.Hash); err != nil {
155                 return nil, err
156         }
157         if c.state.mainChain, err = store.GetMainchain(storeStatus.Hash); err != nil {
158                 return nil, err
159         }
160         return c, nil
161 }
162
163 // Height returns the current height of the blockchain.
164 func (c *Chain) Height() uint64 {
165         c.state.cond.L.Lock()
166         defer c.state.cond.L.Unlock()
167         return c.state.height
168 }
169
170 func (c *Chain) InMainchain(block *legacy.Block) bool {
171         c.state.cond.L.Lock()
172         hash, ok := c.state.mainChain[block.Height]
173         c.state.cond.L.Unlock()
174         if !ok {
175                 return false
176         }
177         return *hash == block.Hash()
178 }
179
180 // TimestampMS returns the latest known block timestamp.
181 func (c *Chain) TimestampMS() uint64 {
182         c.state.cond.L.Lock()
183         defer c.state.cond.L.Unlock()
184         if c.state.block == nil {
185                 return 0
186         }
187         return c.state.block.TimestampMS
188 }
189
190 // State returns the most recent state available. It will not be current
191 // unless the current process is the leader. Callers should examine the
192 // returned block header's height if they need to verify the current state.
193 func (c *Chain) State() (*legacy.Block, *state.Snapshot) {
194         c.state.cond.L.Lock()
195         defer c.state.cond.L.Unlock()
196         return c.state.block, c.state.snapshot
197 }
198
199 func (c *Chain) setState(block *legacy.Block, s *state.Snapshot, m map[uint64]*bc.Hash) error {
200         if block.AssetsMerkleRoot != s.Tree.RootHash() {
201                 return ErrBadStateRoot
202         }
203
204         c.state.cond.L.Lock()
205         blockHash := block.Hash()
206         c.state.block = block
207         c.state.height = block.Height
208         c.state.hash = &blockHash
209         c.state.snapshot = s
210         for k, v := range m {
211                 c.state.mainChain[k] = v
212         }
213         c.state.cond.L.Unlock()
214
215         if err := c.store.SaveSnapshot(c.state.snapshot, &blockHash); err != nil {
216                 return err
217         }
218         if err := c.store.SaveMainchain(c.state.mainChain, &blockHash); err != nil {
219                 return err
220         }
221         c.store.SaveStoreStatus(block.Height, &blockHash)
222
223         c.state.cond.Broadcast()
224         return nil
225 }
226
227 // BlockSoonWaiter returns a channel that
228 // waits for the block at the given height,
229 // but it is an error to wait for a block far in the future.
230 // WaitForBlockSoon will timeout if the context times out.
231 // To wait unconditionally, the caller should use WaitForBlock.
232 func (c *Chain) BlockSoonWaiter(ctx context.Context, height uint64) <-chan error {
233         ch := make(chan error, 1)
234
235         go func() {
236                 const slop = 3
237                 if height > c.Height()+slop {
238                         ch <- ErrTheDistantFuture
239                         return
240                 }
241
242                 select {
243                 case <-c.BlockWaiter(height):
244                         ch <- nil
245                 case <-ctx.Done():
246                         ch <- ctx.Err()
247                 }
248         }()
249
250         return ch
251 }
252
253 // BlockWaiter returns a channel that
254 // waits for the block at the given height.
255 func (c *Chain) BlockWaiter(height uint64) <-chan struct{} {
256         ch := make(chan struct{}, 1)
257         go func() {
258                 c.state.cond.L.Lock()
259                 defer c.state.cond.L.Unlock()
260                 for c.state.height < height {
261                         c.state.cond.Wait()
262                 }
263                 ch <- struct{}{}
264         }()
265
266         return ch
267 }