OSDN Git Service

c79b22097c1159bdd5d81bd0cd1c480a34dce8ad
[bytom/bytom.git] / contract / trace_scheduler.go
1 package contract
2
3 import (
4         "errors"
5         "math"
6         "sync"
7         "time"
8
9         "github.com/bytom/bytom/protocol/bc"
10         log "github.com/sirupsen/logrus"
11         "golang.org/x/sync/semaphore"
12 )
13
// errInstQueueOverflow is returned by addNewJob when no semaphore unit can be
// acquired for a new instance, i.e. the pending-instance queue is full.
var errInstQueueOverflow = errors.New("instance queue is overflow")
15
// traceScheduler queues contract instances and processes them in the
// background loop started by newTraceScheduler.
type traceScheduler struct {
        // weighted bounds the queue: addNewJob acquires one unit per instance
        // and releaseInstances gives the units back.
        weighted      *semaphore.Weighted
        // instances maps an instance's TraceID to its *Instance while queued.
        instances     *sync.Map
        // tracerService receives the in-sync instances in finishJobs.
        // NOTE(review): not set by newTraceScheduler — presumably assigned
        // elsewhere before the first job finishes; confirm.
        tracerService *TracerService
        // infra provides the Chain and Repository used by the scheduler.
        infra         *Infrastructure
        // tracer is replaced with a fresh tracer at the start of every
        // processing round in processLoop.
        tracer        *tracer
}
23
24 func newTraceScheduler(infra *Infrastructure) *traceScheduler {
25         scheduler := &traceScheduler{
26                 weighted:  semaphore.NewWeighted(1000),
27                 instances: new(sync.Map),
28                 infra:     infra,
29         }
30         go scheduler.processLoop()
31         return scheduler
32 }
33
34 func (t *traceScheduler) addNewJob(instance *Instance) error {
35         if !t.weighted.TryAcquire(1) {
36                 return errInstQueueOverflow
37         }
38
39         t.instances.Store(instance.TraceID, instance)
40         return nil
41 }
42
43 func (t *traceScheduler) processLoop() {
44         ticker := time.NewTicker(6 * time.Second)
45         defer ticker.Stop()
46
47         for range ticker.C {
48                 jobs, beginHeight := t.prepareJobs()
49                 if len(jobs) == 0 {
50                         continue
51                 }
52
53                 t.tracer = newTracer(nil)
54
55                 var prevHash *bc.Hash
56                 catchedJobs := make(map[bc.Hash][]*Instance)
57                 for height := beginHeight + 1; ; height++ {
58                         if ok, err := t.tryAttach(height, prevHash, jobs, catchedJobs); err != nil {
59                                 log.WithField("err", err).Error("try attach on trace scheduler")
60                                 break
61                         } else if !ok {
62                                 if err := t.detach(prevHash, catchedJobs); err != nil {
63                                         log.WithField("err", err).Error("detach on trace scheduler")
64                                         break
65                                 }
66                                 height -= 2
67                         }
68                         if bestHeight := t.infra.Chain.BestBlockHeight(); height == bestHeight {
69                                 if err := t.finishJobs(jobs, catchedJobs, *prevHash); err != nil {
70                                         log.WithField("err", err).Error("finish jobs")
71                                         break
72                                 }
73                         }
74                 }
75         }
76 }
77
78 func (t *traceScheduler) prepareJobs() (map[bc.Hash][]*Instance, uint64) {
79         var beginHeight uint64 = math.MaxUint64
80         hashToJobs := make(map[bc.Hash][]*Instance)
81         t.instances.Range(func(_, value interface{}) bool {
82                 inst := value.(*Instance)
83                 hashToJobs[inst.ScannedHash] = append(hashToJobs[inst.ScannedHash], inst)
84                 if inst.ScannedHeight < beginHeight {
85                         beginHeight = inst.ScannedHeight
86                 }
87                 return true
88         })
89         return hashToJobs, beginHeight
90 }
91
92 func (t *traceScheduler) tryAttach(height uint64, prevHash *bc.Hash, jobs, catchedJobs map[bc.Hash][]*Instance) (bool, error) {
93         block, err := t.infra.Chain.GetBlockByHeight(height)
94         if err != nil {
95                 return false, err
96         }
97
98         if prevHash != nil && block.PreviousBlockHash != *prevHash {
99                 return false, nil
100         }
101
102         if instances, ok := jobs[block.PreviousBlockHash]; ok {
103                 t.tracer.addInstances(instances)
104                 catchedJobs[block.PreviousBlockHash] = instances
105         }
106
107         t.tracer.applyBlock(block)
108         *prevHash = block.Hash()
109         return true, nil
110 }
111
112 func (t *traceScheduler) detach(prevHash *bc.Hash, catchedJobs map[bc.Hash][]*Instance) error {
113         prevBlock, err := t.infra.Chain.GetBlockByHash(prevHash)
114         if err != nil {
115                 return err
116         }
117
118         if instances, ok := catchedJobs[prevBlock.Hash()]; ok {
119                 for _, inst := range instances {
120                         t.tracer.removeInstance(inst.TraceID)
121                 }
122                 delete(catchedJobs, prevBlock.Hash())
123         }
124
125         t.tracer.detachBlock(prevBlock)
126         *prevHash = prevBlock.PreviousBlockHash
127         return nil
128 }
129
130 func (t *traceScheduler) finishJobs(jobs, catchedJobs map[bc.Hash][]*Instance, scannedHash bc.Hash) error {
131         var inSyncInstances, offChainInstances []*Instance
132         for hash, instances := range jobs {
133                 if _, ok := catchedJobs[hash]; !ok {
134                         offChainInstances = append(offChainInstances, instances...)
135                         for _, inst := range instances {
136                                 inst.Status = OffChain
137                         }
138                 } else {
139                         inSyncInstances = append(inSyncInstances, instances...)
140                 }
141         }
142
143         if err := t.infra.Repository.SaveInstances(offChainInstances); err != nil {
144                 return err
145         }
146
147         t.releaseInstances(offChainInstances)
148
149         if ok := t.tracerService.takeOverInstances(inSyncInstances, scannedHash); ok {
150                 t.releaseInstances(inSyncInstances)
151         }
152         return nil
153 }
154
155 func (t *traceScheduler) releaseInstances(instances []*Instance) {
156         t.weighted.Release(int64(len(instances)))
157         for _, inst := range instances {
158                 t.instances.Delete(inst.TraceID)
159         }
160 }