OSDN Git Service

Use nil better to len for goleveldb.Get return
[bytom/bytom.git] / blockchain / pin / pin.go
1 package pin
2
3 import (
4         "context"
5         "encoding/json"
6         "sort"
7         "sync"
8
9         "github.com/bytom/errors"
10         "github.com/bytom/log"
11         "github.com/bytom/protocol"
12         "github.com/bytom/protocol/bc/legacy"
13
14         dbm "github.com/tendermint/tmlibs/db"
15 )
16
17 const processorWorkers = 10
18
19 type Store struct {
20         DB dbm.DB
21
22         mu   sync.Mutex
23         cond sync.Cond
24         pins map[string]*pin
25 }
26
27 func NewStore(db dbm.DB) *Store {
28         s := &Store{
29                 DB:   db,
30                 pins: make(map[string]*pin),
31         }
32         s.cond.L = &s.mu
33         return s
34 }
35
36 func (s *Store) ProcessBlocks(ctx context.Context, c *protocol.Chain, pinName string, cb func(context.Context, *legacy.Block) error) {
37         p := <-s.pin(pinName)
38         height := p.getHeight()
39         for {
40                 select {
41                 case <-ctx.Done():
42                         log.Error(ctx, ctx.Err())
43                         return
44                 case <-c.BlockWaiter(height + 1):
45                         select {
46                         case <-ctx.Done():
47                                 log.Error(ctx, ctx.Err())
48                                 return
49                         case p.sem <- true:
50                                 go p.processBlock(ctx, c, height+1, cb)
51                                 height++
52                         }
53                 }
54         }
55 }
56
57 func (s *Store) CreatePin(ctx context.Context, name string, height uint64) error {
58         s.mu.Lock()
59         defer s.mu.Unlock()
60         if _, ok := s.pins[name]; ok {
61                 return nil
62         }
63
64         block_processor, err := json.Marshal(&struct {
65                 Name   string
66                 Height uint64
67         }{Name: name,
68                 Height: height})
69         if err != nil {
70                 return errors.Wrap(err, "failed marshal block_processor")
71         }
72         if len(block_processor) > 0 {
73                 s.DB.Set(json.RawMessage("blp"+name), block_processor)
74         }
75
76         s.pins[name] = newPin(s.DB, name, height)
77         s.cond.Broadcast()
78         return nil
79 }
80
81 func (s *Store) Height(name string) uint64 {
82         p := <-s.pin(name)
83         return p.getHeight()
84 }
85
86 func (s *Store) LoadAll(ctx context.Context) error {
87         s.mu.Lock()
88         defer s.mu.Unlock()
89
90         var block_processor = struct {
91                 Name   string
92                 Height uint64
93         }{}
94         iter := s.DB.Iterator()
95         for iter.Next() {
96                 key := string(iter.Key())
97                 if key[:3] != "blp" {
98                         continue
99                 }
100                 err := json.Unmarshal(iter.Value(), &block_processor)
101                 if err != nil {
102                         return errors.New("failed unmarshal this block_processor.")
103                 }
104
105                 s.pins[block_processor.Name] = newPin(s.DB,
106                         block_processor.Name,
107                         block_processor.Height)
108
109         }
110
111         s.cond.Broadcast()
112         return nil
113 }
114
115 func (s *Store) pin(name string) <-chan *pin {
116         ch := make(chan *pin, 1)
117         go func() {
118                 s.mu.Lock()
119                 defer s.mu.Unlock()
120                 for s.pins[name] == nil {
121                         s.cond.Wait()
122                 }
123                 ch <- s.pins[name]
124         }()
125         return ch
126 }
127
128 func (s *Store) PinWaiter(pinName string, height uint64) <-chan struct{} {
129         ch := make(chan struct{}, 1)
130         p := <-s.pin(pinName)
131         go func() {
132                 p.mu.Lock()
133                 defer p.mu.Unlock()
134                 for p.height < height {
135                         p.cond.Wait()
136                 }
137                 ch <- struct{}{}
138         }()
139         return ch
140 }
141
142 func (s *Store) AllWaiter(height uint64) <-chan struct{} {
143         ch := make(chan struct{}, 1)
144         go func() {
145                 var pins []string
146                 s.mu.Lock()
147                 for name := range s.pins {
148                         pins = append(pins, name)
149                 }
150                 s.mu.Unlock()
151                 for _, name := range pins {
152                         <-s.PinWaiter(name, height)
153                 }
154                 ch <- struct{}{}
155         }()
156         return ch
157 }
158
159 type pin struct {
160         mu        sync.Mutex
161         cond      sync.Cond
162         height    uint64
163         completed []uint64
164
165         db   dbm.DB
166         name string
167         sem  chan bool
168 }
169
170 func newPin(db dbm.DB, name string, height uint64) *pin {
171         p := &pin{db: db, name: name, height: height, sem: make(chan bool, processorWorkers)}
172         p.cond.L = &p.mu
173         return p
174 }
175
176 func (p *pin) getHeight() uint64 {
177         p.mu.Lock()
178         defer p.mu.Unlock()
179         return p.height
180 }
181
182 func (p *pin) processBlock(ctx context.Context, c *protocol.Chain, height uint64, cb func(context.Context, *legacy.Block) error) {
183         defer func() { <-p.sem }()
184         for {
185                 block, err := c.GetBlock(height)
186                 if err != nil {
187                         log.Error(ctx, err)
188                         continue
189                 }
190
191                 err = cb(ctx, block)
192                 if err != nil {
193                         log.Error(ctx, errors.Wrapf(err, "pin %q callback", p.name))
194                         continue
195                 }
196
197                 err = p.complete(ctx, block.Height)
198                 if err != nil {
199                         log.Error(ctx, err)
200                 }
201                 break
202         }
203 }
204
205 func (p *pin) complete(ctx context.Context, height uint64) error {
206         p.mu.Lock()
207         defer p.mu.Unlock()
208
209         p.completed = append(p.completed, height)
210         sort.Sort(uint64s(p.completed))
211
212         var (
213                 max = p.height
214                 i   int
215         )
216         for i = 0; i < len(p.completed); i++ {
217                 if p.completed[i] <= max {
218                         continue
219                 } else if p.completed[i] > max+1 {
220                         break
221                 }
222                 max = p.completed[i]
223         }
224
225         if max == p.height {
226                 return nil
227         }
228
229         var (
230                 block_processor = struct {
231                         Name   string
232                         Height uint64
233                 }{}
234                 err error
235         )
236
237         bytes := p.db.Get(json.RawMessage("blp" + p.name))
238         if bytes != nil {
239                 err = json.Unmarshal(bytes, &block_processor)
240                 if err == nil && block_processor.Height >= max {
241                         goto Noupdate
242                 }
243         }
244
245         block_processor.Name = p.name
246         block_processor.Height = max
247
248         bytes, err = json.Marshal(&block_processor)
249         if err != nil {
250                 goto Noupdate
251         }
252         if len(bytes) > 0 {
253                 p.db.Set(json.RawMessage("blp"+p.name), bytes)
254         }
255
256 Noupdate:
257         p.completed = p.completed[i:]
258         p.height = max
259         p.cond.Broadcast()
260
261         return nil
262 }
263
264 type uint64s []uint64
265
266 func (a uint64s) Len() int           { return len(a) }
267 func (a uint64s) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
268 func (a uint64s) Less(i, j int) bool { return a[i] < a[j] }