9 "github.com/bytom/errors"
10 "github.com/bytom/log"
11 "github.com/bytom/protocol"
12 "github.com/bytom/protocol/bc/legacy"
14 dbm "github.com/tendermint/tmlibs/db"
17 const processorWorkers = 10
27 func NewStore(db dbm.DB) *Store {
30 pins: make(map[string]*pin),
36 func (s *Store) ProcessBlocks(ctx context.Context, c *protocol.Chain, pinName string, cb func(context.Context, *legacy.Block) error) {
38 height := p.getHeight()
42 log.Error(ctx, ctx.Err())
44 case <-c.BlockWaiter(height + 1):
47 log.Error(ctx, ctx.Err())
50 go p.processBlock(ctx, c, height+1, cb)
57 func (s *Store) CreatePin(ctx context.Context, name string, height uint64) error {
60 if _, ok := s.pins[name]; ok {
64 block_processor, err := json.Marshal(&struct {
70 return errors.Wrap(err, "failed marshal block_processor")
72 if len(block_processor) > 0 {
73 s.DB.Set(json.RawMessage("blp"+name), block_processor)
76 s.pins[name] = newPin(s.DB, name, height)
81 func (s *Store) Height(name string) uint64 {
86 func (s *Store) LoadAll(ctx context.Context) error {
90 var block_processor = struct {
94 iter := s.DB.Iterator()
96 key := string(iter.Key())
100 err := json.Unmarshal(iter.Value(), &block_processor)
102 return errors.New("failed unmarshal this block_processor.")
105 s.pins[block_processor.Name] = newPin(s.DB,
106 block_processor.Name,
107 block_processor.Height)
115 func (s *Store) pin(name string) <-chan *pin {
116 ch := make(chan *pin, 1)
120 for s.pins[name] == nil {
128 func (s *Store) PinWaiter(pinName string, height uint64) <-chan struct{} {
129 ch := make(chan struct{}, 1)
130 p := <-s.pin(pinName)
134 for p.height < height {
142 func (s *Store) AllWaiter(height uint64) <-chan struct{} {
143 ch := make(chan struct{}, 1)
147 for name := range s.pins {
148 pins = append(pins, name)
151 for _, name := range pins {
152 <-s.PinWaiter(name, height)
170 func newPin(db dbm.DB, name string, height uint64) *pin {
171 p := &pin{db: db, name: name, height: height, sem: make(chan bool, processorWorkers)}
176 func (p *pin) getHeight() uint64 {
182 func (p *pin) processBlock(ctx context.Context, c *protocol.Chain, height uint64, cb func(context.Context, *legacy.Block) error) {
183 defer func() { <-p.sem }()
185 block, err := c.GetBlock(height)
193 log.Error(ctx, errors.Wrapf(err, "pin %q callback", p.name))
197 err = p.complete(ctx, block.Height)
205 func (p *pin) complete(ctx context.Context, height uint64) error {
209 p.completed = append(p.completed, height)
210 sort.Sort(uint64s(p.completed))
216 for i = 0; i < len(p.completed); i++ {
217 if p.completed[i] <= max {
219 } else if p.completed[i] > max+1 {
230 block_processor = struct {
237 bytes := p.db.Get(json.RawMessage("blp" + p.name))
239 err = json.Unmarshal(bytes, &block_processor)
240 if err == nil && block_processor.Height >= max {
245 block_processor.Name = p.name
246 block_processor.Height = max
248 bytes, err = json.Marshal(&block_processor)
253 p.db.Set(json.RawMessage("blp"+p.name), bytes)
257 p.completed = p.completed[i:]
264 type uint64s []uint64
266 func (a uint64s) Len() int { return len(a) }
267 func (a uint64s) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
268 func (a uint64s) Less(i, j int) bool { return a[i] < a[j] }