OSDN Git Service

Merge branch 'master' of https://github.com/Bytom/bytom-btmhash
[bytom/bytom.git] / protocol / protocol.go
1 package protocol
2
3 import (
4         "context"
5         "sync"
6         "time"
7
8         log "github.com/sirupsen/logrus"
9
10         "github.com/bytom/errors"
11         "github.com/bytom/protocol/bc"
12         "github.com/bytom/protocol/bc/legacy"
13         "github.com/bytom/protocol/state"
14 )
15
// maxCachedValidatedTxs caps how many validated transactions are kept
// in the cache.
const maxCachedValidatedTxs = 1000

// ErrTheDistantFuture is reported when a caller asks to wait for a
// block height that lies too far beyond the tip of the blockchain.
var ErrTheDistantFuture = errors.New("block height too far in future")
24
25 // Store provides storage for blockchain data: blocks and state tree
26 // snapshots.
27 //
28 // Note, this is different from a state snapshot. A state snapshot
29 // provides access to the state at a given point in time -- outputs
30 // and issuance memory. The Chain type uses Store to load state
31 // from storage and persist validated data.
32 type Store interface {
33         Height() uint64
34         GetBlock(uint64) (*legacy.Block, error)
35         LatestSnapshot(context.Context) (*state.Snapshot, uint64, error)
36
37         SaveBlock(*legacy.Block) error
38         FinalizeBlock(context.Context, uint64) error
39         SaveSnapshot(context.Context, uint64, *state.Snapshot) error
40 }
41
42 // Chain provides a complete, minimal blockchain database. It
43 // delegates the underlying storage to other objects, and uses
44 // validation logic from package validation to decide what
45 // objects can be safely stored.
46 type Chain struct {
47         InitialBlockHash  bc.Hash
48         MaxIssuanceWindow time.Duration // only used by generators
49
50         state struct {
51                 cond     sync.Cond // protects height, block, snapshot
52                 height   uint64
53                 block    *legacy.Block
54                 snapshot *state.Snapshot
55         }
56         store Store
57
58         lastQueuedSnapshot time.Time
59         pendingSnapshots   chan pendingSnapshot
60
61         txPool *TxPool
62 }
63
64 type pendingSnapshot struct {
65         height   uint64
66         snapshot *state.Snapshot
67 }
68
69 // NewChain returns a new Chain using store as the underlying storage.
70 func NewChain(ctx context.Context, initialBlockHash bc.Hash, store Store, txPool *TxPool, heights <-chan uint64) (*Chain, error) {
71         c := &Chain{
72                 InitialBlockHash: initialBlockHash,
73                 store:            store,
74                 pendingSnapshots: make(chan pendingSnapshot, 1),
75                 txPool:           txPool,
76         }
77         c.state.cond.L = new(sync.Mutex)
78
79         log.WithField("current height", store.Height()).Info("Resume from the database")
80         c.state.height = store.Height()
81
82         if c.state.height < 1 {
83                 c.state.snapshot = state.Empty()
84         } else {
85                 c.state.block, _ = store.GetBlock(c.state.height)
86                 c.state.snapshot, _, _ = store.LatestSnapshot(ctx)
87         }
88
89         // Note that c.height.n may still be zero here.
90         if heights != nil {
91                 go func() {
92                         for h := range heights {
93                                 c.setHeight(h)
94                         }
95                 }()
96         }
97
98         go func() {
99                 for {
100                         select {
101                         case <-ctx.Done():
102                                 return
103                         case ps := <-c.pendingSnapshots:
104                                 err := store.SaveSnapshot(ctx, ps.height, ps.snapshot)
105                                 if err != nil {
106                                         log.WithField("error", err).Error("Error occurs when saving snapshot")
107                                 }
108                         }
109                 }
110         }()
111
112         return c, nil
113 }
114
115 func (c *Chain) GetStore() *Store {
116         return &(c.store)
117 }
118
119 // Height returns the current height of the blockchain.
120 func (c *Chain) Height() uint64 {
121         c.state.cond.L.Lock()
122         defer c.state.cond.L.Unlock()
123         return c.state.height
124 }
125
126 // TimestampMS returns the latest known block timestamp.
127 func (c *Chain) TimestampMS() uint64 {
128         c.state.cond.L.Lock()
129         defer c.state.cond.L.Unlock()
130         if c.state.block == nil {
131                 return 0
132         }
133         return c.state.block.TimestampMS
134 }
135
136 // State returns the most recent state available. It will not be current
137 // unless the current process is the leader. Callers should examine the
138 // returned block header's height if they need to verify the current state.
139 func (c *Chain) State() (*legacy.Block, *state.Snapshot) {
140         c.state.cond.L.Lock()
141         defer c.state.cond.L.Unlock()
142         return c.state.block, c.state.snapshot
143 }
144
145 func (c *Chain) setState(b *legacy.Block, s *state.Snapshot) {
146         c.state.cond.L.Lock()
147         defer c.state.cond.L.Unlock()
148         c.state.block = b
149         c.state.snapshot = s
150         if b != nil && b.Height > c.state.height {
151                 c.state.height = b.Height
152                 c.state.cond.Broadcast()
153         }
154 }
155
156 // BlockSoonWaiter returns a channel that
157 // waits for the block at the given height,
158 // but it is an error to wait for a block far in the future.
159 // WaitForBlockSoon will timeout if the context times out.
160 // To wait unconditionally, the caller should use WaitForBlock.
161 func (c *Chain) BlockSoonWaiter(ctx context.Context, height uint64) <-chan error {
162         ch := make(chan error, 1)
163
164         go func() {
165                 const slop = 3
166                 if height > c.Height()+slop {
167                         ch <- ErrTheDistantFuture
168                         return
169                 }
170
171                 select {
172                 case <-c.BlockWaiter(height):
173                         ch <- nil
174                 case <-ctx.Done():
175                         ch <- ctx.Err()
176                 }
177         }()
178
179         return ch
180 }
181
182 // BlockWaiter returns a channel that
183 // waits for the block at the given height.
184 func (c *Chain) BlockWaiter(height uint64) <-chan struct{} {
185         ch := make(chan struct{}, 1)
186         go func() {
187                 c.state.cond.L.Lock()
188                 defer c.state.cond.L.Unlock()
189                 for c.state.height < height {
190                         c.state.cond.Wait()
191                 }
192                 ch <- struct{}{}
193         }()
194
195         return ch
196 }