9 "github.com/bytom/bytom/protocol/bc"
10 log "github.com/sirupsen/logrus"
11 "golang.org/x/sync/semaphore"
// errInstQueueOverflow is returned by addNewJob when no free slot can be
// acquired on the scheduler's semaphore for another instance.
14 var errInstQueueOverflow = errors.New("instance queue is overflow")
// traceScheduler keeps a bounded set of instances that still have to be
// scanned and works through them in processLoop.
16 type traceScheduler struct {
// weighted bounds the number of queued instances: addNewJob acquires one slot
// per instance and releaseInstances gives the slots back.
17 weighted *semaphore.Weighted
// tracerService is offered the in-sync instances in finishJobs
// (via takeOverInstances).
19 tracerService *TracerService
// newTraceScheduler builds a traceScheduler with a semaphore of weight 1000
// and an empty instance map, and starts processLoop in its own goroutine.
24 func newTraceScheduler(infra *Infrastructure) *traceScheduler {
25 scheduler := &traceScheduler{
// 1000 is the queue capacity: addNewJob takes one unit per instance and
// fails once all units are held.
26 weighted: semaphore.NewWeighted(1000),
27 instances: new(sync.Map),
// NOTE(review): the goroutine is started without any visible stop or wait
// mechanism — confirm its lifetime is meant to match the process.
30 go scheduler.processLoop()
// addNewJob queues instance for scanning, stored under its TraceID.
// It returns errInstQueueOverflow when a semaphore slot cannot be acquired.
34 func (t *traceScheduler) addNewJob(instance *Instance) error {
// TryAcquire does not block, so a full queue is reported to the caller
// immediately instead of waiting for a slot.
35 if !t.weighted.TryAcquire(1) {
36 return errInstQueueOverflow
39 t.instances.Store(instance.TraceID, instance)
// processLoop is the scheduler's background worker (started from
// newTraceScheduler). It gathers the queued jobs, walks block heights upward
// from the lowest scanned height while attaching blocks to the tracer, and
// calls finishJobs once the walk reaches the chain's best block height.
// Errors from tryAttach, detach and finishJobs are logged here, not returned.
43 func (t *traceScheduler) processLoop() {
// NOTE(review): presumably this ticker paces one scheduling pass every six
// seconds — confirm against the surrounding loop.
44 ticker := time.NewTicker(6 * time.Second)
// jobs groups the queued instances by scanned hash; beginHeight is the
// smallest scanned height among them.
48 jobs, beginHeight := t.prepareJobs()
// The tracer is created with nil here; instances are added to it later by
// tryAttach through addInstances.
53 t.tracer = newTracer(nil)
// catchedJobs collects the job groups that tryAttach has added to the tracer.
56 catchedJobs := make(map[bc.Hash][]*Instance)
// Scanning resumes at the block right after the lowest scanned height.
57 for height := beginHeight + 1; ; height++ {
58 if ok, err := t.tryAttach(height, prevHash, jobs, catchedJobs); err != nil {
59 log.WithField("err", err).Error("try attach on trace scheduler")
// NOTE(review): presumably reached when tryAttach reports that the block at
// this height does not follow prevHash, so the tracer steps back one block —
// confirm.
62 if err := t.detach(prevHash, catchedJobs); err != nil {
63 log.WithField("err", err).Error("detach on trace scheduler")
// The walk has caught up with the best block: settle the jobs, passing the
// hash of the last applied block as the scanned hash.
68 if bestHeight := t.infra.Chain.BestBlockHeight(); height == bestHeight {
69 if err := t.finishJobs(jobs, catchedJobs, *prevHash); err != nil {
70 log.WithField("err", err).Error("finish jobs")
// prepareJobs iterates over the queued instances and returns them grouped by
// ScannedHash, together with the smallest ScannedHeight found.
// NOTE(review): when no instance is queued the returned height appears to stay
// at math.MaxUint64, and the caller computes beginHeight + 1 — confirm the
// caller guards against the empty case.
78 func (t *traceScheduler) prepareJobs() (map[bc.Hash][]*Instance, uint64) {
// Start at the largest uint64 so any real scanned height compares lower.
79 var beginHeight uint64 = math.MaxUint64
80 hashToJobs := make(map[bc.Hash][]*Instance)
81 t.instances.Range(func(_, value interface{}) bool {
// Values are stored as *Instance by addNewJob.
82 inst := value.(*Instance)
83 hashToJobs[inst.ScannedHash] = append(hashToJobs[inst.ScannedHash], inst)
// Keep the running minimum of the scanned heights.
84 if inst.ScannedHeight < beginHeight {
85 beginHeight = inst.ScannedHeight
89 return hashToJobs, beginHeight
// tryAttach fetches the block at height and applies it to the tracer, then
// sets *prevHash to that block's hash. Before the block is applied, the jobs
// keyed by the block's previous block hash (if any) are added to the tracer
// and recorded in catchedJobs under the same key.
// NOTE(review): the bool result presumably tells whether the block was
// attached, being false when it does not follow prevHash — confirm.
92 func (t *traceScheduler) tryAttach(height uint64, prevHash *bc.Hash, jobs, catchedJobs map[bc.Hash][]*Instance) (bool, error) {
93 block, err := t.infra.Chain.GetBlockByHeight(height)
// The block is expected to extend the block applied last; a mismatch means
// the chain at this height no longer follows prevHash.
98 if prevHash != nil && block.PreviousBlockHash != *prevHash {
// Instances whose scanned hash is this block's parent join the tracer right
// before the block is applied.
102 if instances, ok := jobs[block.PreviousBlockHash]; ok {
103 t.tracer.addInstances(instances)
104 catchedJobs[block.PreviousBlockHash] = instances
107 t.tracer.applyBlock(block)
// NOTE(review): prevHash is nil-checked above but dereferenced here — confirm
// that nil never reaches this line.
108 *prevHash = block.Hash()
// detach steps the tracer back by one block. It loads the block identified by
// prevHash, removes from the tracer the instances recorded in catchedJobs
// under that block's hash, detaches the block from the tracer, and finally
// moves *prevHash to the block's parent hash.
112 func (t *traceScheduler) detach(prevHash *bc.Hash, catchedJobs map[bc.Hash][]*Instance) error {
113 prevBlock, err := t.infra.Chain.GetBlockByHash(prevHash)
// catchedJobs is keyed by scanned hash (see tryAttach), so this entry holds
// the instances whose scanned block is the one being rolled back. They are
// dropped from the tracer and from catchedJobs.
118 if instances, ok := catchedJobs[prevBlock.Hash()]; ok {
119 for _, inst := range instances {
120 t.tracer.removeInstance(inst.TraceID)
122 delete(catchedJobs, prevBlock.Hash())
125 t.tracer.detachBlock(prevBlock)
// Continue from the parent of the detached block.
126 *prevHash = prevBlock.PreviousBlockHash
// finishJobs settles the jobs of one pass. Job groups whose hash is missing
// from catchedJobs are collected as off-chain instances and marked OffChain;
// these are saved through the repository and released from the queue. The
// in-sync instances are offered to the tracer service together with
// scannedHash and are released only when it reports success.
130 func (t *traceScheduler) finishJobs(jobs, catchedJobs map[bc.Hash][]*Instance, scannedHash bc.Hash) error {
131 var inSyncInstances, offChainInstances []*Instance
132 for hash, instances := range jobs {
// A group that is not in catchedJobs was not attached to the tracer (or was
// removed again by detach).
133 if _, ok := catchedJobs[hash]; !ok {
134 offChainInstances = append(offChainInstances, instances...)
135 for _, inst := range instances {
136 inst.Status = OffChain
// NOTE(review): presumably the alternative branch, i.e. groups that are
// present in catchedJobs — confirm.
139 inSyncInstances = append(inSyncInstances, instances...)
// Persist the OffChain status before the instances leave the queue.
143 if err := t.infra.Repository.SaveInstances(offChainInstances); err != nil {
147 t.releaseInstances(offChainInstances)
// In-sync instances stay queued unless the tracer service takes them over.
149 if ok := t.tracerService.takeOverInstances(inSyncInstances, scannedHash); ok {
150 t.releaseInstances(inSyncInstances)
// releaseInstances gives back one semaphore slot per instance and deletes
// each instance from the queue by its TraceID.
155 func (t *traceScheduler) releaseInstances(instances []*Instance) {
// addNewJob acquires one unit per instance, so len(instances) units are
// returned here.
156 t.weighted.Release(int64(len(instances)))
157 for _, inst := range instances {
158 t.instances.Delete(inst.TraceID)