OSDN Git Service

fix_ended_instance (#2111)
authorPoseidon <shenao.78@163.com>
Sat, 18 Sep 2021 06:01:03 +0000 (14:01 +0800)
committerGitHub <noreply@github.com>
Sat, 18 Sep 2021 06:01:03 +0000 (14:01 +0800)
* fix_ended_instance

* fix take over instance

contract/infrastructure.go
contract/instance.go
contract/trace_scheduler.go
contract/trace_service.go
contract/tracer.go
protocol/protocol.go

index e86f210..bb78e93 100644 (file)
@@ -16,6 +16,7 @@ func NewInfrastructure(chain ChainService, repository Repository) *Infrastructur
 
 type ChainService interface {
        BestChain() (uint64, bc.Hash)
+       FinalizedHeight() uint64
        GetBlockByHash(*bc.Hash) (*types.Block, error)
        GetBlockByHeight(uint64) (*types.Block, error)
        BlockWaiter(height uint64) <-chan struct{}
index f8dffbd..18c76f9 100644 (file)
@@ -11,7 +11,7 @@ type Status uint8
 const (
        Lagging Status = iota + 1
        InSync
-       Finalized
+       Ended
        OffChain
 )
 
@@ -26,48 +26,50 @@ type Instance struct {
        UTXOs         []*UTXO  `json:"utxos"`
        TxHash        *bc.Hash `json:"tx_hash"`
        Status        Status   `json:"status"`
+       EndedHeight   uint64   `json:"ended_height"`
        ScannedHash   bc.Hash  `json:"scanned_hash"`
        ScannedHeight uint64   `json:"scanned_height"`
        Unconfirmed   []*TreeNode
 }
 
-func newInstance(inUTXOs, outUTXOs []*UTXO, txHash bc.Hash, block *types.Block) *Instance {
+func newInstance(t *transfer, block *types.Block) *Instance {
        inst := &Instance{
                TraceID:       uuid.New().String(),
-               TxHash:        &txHash,
-               UTXOs:         outUTXOs,
+               TxHash:        &t.txHash,
+               UTXOs:         t.outUTXOs,
+               Status:        Lagging,
                ScannedHeight: block.Height,
                ScannedHash:   block.Hash(),
        }
-       inst.Status = Lagging
-       if len(outUTXOs) == 0 {
-               inst.Status = Finalized
-               inst.UTXOs = inUTXOs
+       if len(t.outUTXOs) == 0 {
+               inst.Status = Ended
+               inst.UTXOs = t.inUTXOs
        }
        return inst
 }
 
-func (i *Instance) transferTo(newUTXOs []*UTXO, txHash bc.Hash) *Instance {
+func (i *Instance) transferTo(t *transfer, blockHeight uint64) *Instance {
        inst := &Instance{
                TraceID:     i.TraceID,
                Status:      i.Status,
                Unconfirmed: i.Unconfirmed,
-               UTXOs:       newUTXOs,
-               TxHash:      &txHash,
+               UTXOs:       t.outUTXOs,
+               TxHash:      &t.txHash,
        }
-       if len(newUTXOs) == 0 {
-               inst.Status = Finalized
-               inst.UTXOs = i.UTXOs
+       if len(t.outUTXOs) == 0 {
+               inst.Status = Ended
+               inst.EndedHeight = blockHeight
+               inst.UTXOs = t.inUTXOs
        }
-       inst.confirmTx(txHash)
+       inst.confirmTx(t.txHash)
        return inst
 }
 
-func (i *Instance) rollbackTo(newUTXOs []*UTXO) *Instance {
+func (i *Instance) rollbackTo(t *transfer) *Instance {
        return &Instance{
                TraceID:     i.TraceID,
                Status:      InSync,
-               UTXOs:       newUTXOs,
+               UTXOs:       t.inUTXOs,
                TxHash:      nil,
                Unconfirmed: nil,
        }
@@ -103,8 +105,8 @@ func (i *instanceIndex) getAll() []*Instance {
        return instances
 }
 
-func (i *instanceIndex) getByID(id string) *Instance {
-       return i.traceIdToInst[id]
+func (i *instanceIndex) getByID(traceID string) *Instance {
+       return i.traceIdToInst[traceID]
 }
 
 func (i *instanceIndex) getByUTXO(utxoHash bc.Hash) *Instance {
index e79f21b..9a9a943 100644 (file)
@@ -18,7 +18,10 @@ type traceScheduler struct {
        instances     *sync.Map
        tracerService *TraceService
        infra         *Infrastructure
+
        tracer        *tracer
+       currentHeight uint64
+       currentHash   bc.Hash
 }
 
 func newTraceScheduler(infra *Infrastructure) *traceScheduler {
@@ -56,23 +59,22 @@ func (t *traceScheduler) processLoop() {
 
                t.tracer = newTracer(jobs[beginHash])
 
-               for height, blockHash := beginHeight, beginHash; ; height++ {
-                       if bestHeight := t.tracerService.BestHeight(); height == bestHeight {
-                               if err := t.finishJobs(jobs, blockHash); err != nil {
+               for t.currentHeight, t.currentHash = beginHeight, beginHash; ; {
+                       if bestHeight := t.tracerService.BestHeight(); t.currentHeight == bestHeight {
+                               if err := t.finishJobs(jobs); err != nil {
                                        log.WithField("err", err).Error("finish jobs")
                                        break
                                }
                        }
 
-                       if ok, err := t.tryAttach(height+1, &blockHash, jobs); err != nil {
+                       if ok, err := t.tryAttach(jobs); err != nil {
                                log.WithField("err", err).Error("try attach on trace scheduler")
                                break
                        } else if !ok {
-                               if err := t.detach(&blockHash, jobs); err != nil {
+                               if err := t.detach(jobs); err != nil {
                                        log.WithField("err", err).Error("detach on trace scheduler")
                                        break
                                }
-                               height -= 2
                        }
                }
        }
@@ -93,18 +95,19 @@ func (t *traceScheduler) prepareJobs() (map[bc.Hash][]*Instance, uint64, bc.Hash
        return hashToJobs, beginHeight, beginHash
 }
 
-func (t *traceScheduler) tryAttach(height uint64, blockHash *bc.Hash, jobs map[bc.Hash][]*Instance) (bool, error) {
-       block, err := t.infra.Chain.GetBlockByHeight(height)
+func (t *traceScheduler) tryAttach(jobs map[bc.Hash][]*Instance) (bool, error) {
+       block, err := t.infra.Chain.GetBlockByHeight(t.currentHeight+1)
        if err != nil {
                return false, err
        }
 
-       if block.PreviousBlockHash != *blockHash {
+       if block.PreviousBlockHash != t.currentHash {
                return false, nil
        }
 
        t.tracer.applyBlock(block)
-       *blockHash = block.Hash()
+       t.currentHeight++
+       t.currentHash = block.Hash()
 
        if instances, ok := jobs[block.Hash()]; ok {
                t.tracer.addInstances(instances)
@@ -112,8 +115,8 @@ func (t *traceScheduler) tryAttach(height uint64, blockHash *bc.Hash, jobs map[b
        return true, nil
 }
 
-func (t *traceScheduler) detach(blockHash *bc.Hash, jobs map[bc.Hash][]*Instance) error {
-       block, err := t.infra.Chain.GetBlockByHash(blockHash)
+func (t *traceScheduler) detach(jobs map[bc.Hash][]*Instance) error {
+       block, err := t.infra.Chain.GetBlockByHash(&t.currentHash)
        if err != nil {
                return err
        }
@@ -125,11 +128,12 @@ func (t *traceScheduler) detach(blockHash *bc.Hash, jobs map[bc.Hash][]*Instance
        }
 
        t.tracer.detachBlock(block)
-       *blockHash = block.PreviousBlockHash
+       t.currentHeight--
+       t.currentHash = block.PreviousBlockHash
        return nil
 }
 
-func (t *traceScheduler) finishJobs(jobs map[bc.Hash][]*Instance, scannedHash bc.Hash) error {
+func (t *traceScheduler) finishJobs(jobs map[bc.Hash][]*Instance) error {
        inSyncInstances := t.tracer.allInstances()
        inSyncMap := make(map[string]bool)
        for _, inst := range inSyncInstances {
@@ -152,7 +156,7 @@ func (t *traceScheduler) finishJobs(jobs map[bc.Hash][]*Instance, scannedHash bc
 
        t.releaseInstances(offChainInstances)
 
-       if ok := t.tracerService.takeOverInstances(inSyncInstances, scannedHash); ok {
+       if ok := t.tracerService.takeOverInstances(inSyncInstances, t.currentHash); ok {
                t.releaseInstances(inSyncInstances)
        }
        return nil
index cc0a5d4..3104ea1 100644 (file)
@@ -24,6 +24,7 @@ type TraceService struct {
        infra            *Infrastructure
        scheduler        *traceScheduler
        unconfirmedIndex map[bc.Hash]*TreeNode
+       endedInstances   map[string]bool
        bestHeight       uint64
        bestHash         bc.Hash
 }
@@ -43,13 +44,14 @@ func NewTraceService(infra *Infrastructure) *TraceService {
        }
 
        scheduler := newTraceScheduler(infra)
-       inSyncInstances := dispatchInstances(allInstances, scheduler)
+       inSyncInstances := dispatchInstances(allInstances, scheduler, infra.Chain.FinalizedHeight())
 
        service := &TraceService{
                infra:            infra,
                tracer:           newTracer(inSyncInstances),
                scheduler:        scheduler,
                unconfirmedIndex: make(map[bc.Hash]*TreeNode),
+               endedInstances:   make(map[string]bool),
                bestHeight:       chainStatus.BlockHeight,
                bestHash:         chainStatus.BlockHash,
        }
@@ -57,11 +59,15 @@ func NewTraceService(infra *Infrastructure) *TraceService {
        return service
 }
 
-func dispatchInstances(instances []*Instance, scheduler *traceScheduler) []*Instance {
+func dispatchInstances(instances []*Instance, scheduler *traceScheduler, finalizedHeight uint64) []*Instance {
        var result []*Instance
        for _, inst := range instances {
                if inst.Status == InSync {
                        result = append(result, inst)
+               } else if inst.Status == Ended {
+                       if inst.EndedHeight < finalizedHeight {
+                               result = append(result, inst)
+                       }
                } else if inst.Status == Lagging {
                        if err := scheduler.addNewJob(inst); err != nil {
                                logrus.WithField("err", err).Fatal("add new job when init tracer")
@@ -88,6 +94,7 @@ func (t *TraceService) ApplyBlock(block *types.Block) error {
        defer t.Unlock()
 
        newInstances := t.tracer.applyBlock(block)
+       t.processEndedInstances(newInstances)
        t.bestHeight++
        t.bestHash = block.Hash()
        return t.infra.Repository.SaveInstancesWithStatus(newInstances, t.bestHeight, t.bestHash)
@@ -98,6 +105,7 @@ func (t *TraceService) DetachBlock(block *types.Block) error {
        defer t.Unlock()
 
        newInstances := t.tracer.detachBlock(block)
+       t.processEndedInstances(nil)
        t.bestHeight--
        t.bestHash = block.PreviousBlockHash
        return t.infra.Repository.SaveInstancesWithStatus(newInstances, t.bestHeight, t.bestHash)
@@ -112,7 +120,7 @@ func (t *TraceService) AddUnconfirmedTx(tx *types.Tx) {
                }
 
                treeNode := &TreeNode{TxHash: tx.ID, UTXOs: outUTXOs}
-               if inst := t.tracer.table.getByUTXO(inUTXOs[0].Hash); inst != nil {
+               if inst := t.tracer.index.getByUTXO(inUTXOs[0].Hash); inst != nil {
                        inst.Unconfirmed = append(inst.Unconfirmed, treeNode)
                        t.addToUnconfirmedIndex(treeNode, outUTXOs)
                        return
@@ -147,9 +155,9 @@ func (t *TraceService) CreateInstance(txHash, blockHash bc.Hash) ([]string, erro
 
        var traceIDs []string
        for _, transfer := range transfers {
-               inst := newInstance(transfer.inUTXOs, transfer.outUTXOs, txHash, block)
+               inst := newInstance(transfer, block)
                traceIDs = append(traceIDs, inst.TraceID)
-               if err := t.addNewTraceJob(inst, block); err != nil {
+               if err := t.addNewTraceJob(inst); err != nil {
                        return nil, err
                }
        }
@@ -178,7 +186,9 @@ func (t *TraceService) takeOverInstances(instances []*Instance, blockHash bc.Has
        }
 
        for _, inst := range instances {
-               inst.Status = InSync
+               if inst.Status != Ended {
+                       inst.Status = InSync
+               }
        }
 
        if err := t.infra.Repository.SaveInstances(instances); err != nil {
@@ -187,15 +197,35 @@ func (t *TraceService) takeOverInstances(instances []*Instance, blockHash bc.Has
        }
 
        t.tracer.addInstances(instances)
+       t.processEndedInstances(instances)
        return true
 }
 
-func (t *TraceService) addNewTraceJob(inst *Instance, block *types.Block) error {
+func (t *TraceService) processEndedInstances(instances []*Instance) {
+       for _, inst := range instances {
+               if inst.Status == Ended {
+                       t.endedInstances[inst.TraceID] = true
+               }
+       }
+
+       finalizedHeight := t.infra.Chain.FinalizedHeight()
+       for traceID := range t.endedInstances {
+               inst := t.tracer.getInstance(traceID)
+               if inst.Status != Ended {
+                       delete(t.endedInstances, traceID)
+               } else if finalizedHeight >= inst.EndedHeight {
+                       delete(t.endedInstances, traceID)
+                       t.tracer.removeInstance(traceID)
+               }
+       }
+}
+
+func (t *TraceService) addNewTraceJob(inst *Instance) error {
        if err := t.infra.Repository.SaveInstances([]*Instance{inst}); err != nil {
                return err
        }
 
-       if inst.Status != Finalized {
+       if inst.Status != Ended {
                if err := t.scheduler.addNewJob(inst); err != nil {
                        return err
                }
index f10951c..3e5492b 100644 (file)
@@ -4,33 +4,38 @@ import (
        "encoding/hex"
 
        "github.com/bytom/bytom/consensus/segwit"
+       "github.com/bytom/bytom/protocol/bc"
        "github.com/bytom/bytom/protocol/bc/types"
 )
 
 type tracer struct {
-       table *instanceIndex
+       index *instanceIndex
 }
 
 func newTracer(instances []*Instance) *tracer {
-       table := newInstanceIndex()
+       index := newInstanceIndex()
        for _, inst := range instances {
-               table.save(inst)
+               index.save(inst)
        }
-       return &tracer{table: table}
+       return &tracer{index: index}
+}
+
+func (t *tracer) getInstance(traceID string) *Instance {
+       return t.index.getByID(traceID)
 }
 
 func (t *tracer) allInstances() []*Instance {
-       return t.table.getAll()
+       return t.index.getAll()
 }
 
 func (t *tracer) addInstances(instances []*Instance) {
        for _, inst := range instances {
-               t.table.save(inst)
+               t.index.save(inst)
        }
 }
 
 func (t *tracer) removeInstance(traceID string) {
-       t.table.remove(traceID)
+       t.index.remove(traceID)
 }
 
 func (t *tracer) applyBlock(block *types.Block) []*Instance {
@@ -42,8 +47,8 @@ func (t *tracer) applyBlock(block *types.Block) []*Instance {
                                continue
                        }
 
-                       if inst := t.table.getByUTXO(transfer.inUTXOs[0].Hash); inst != nil {
-                               newInst := inst.transferTo(transfer.outUTXOs, tx.ID)
+                       if inst := t.index.getByUTXO(transfer.inUTXOs[0].Hash); inst != nil {
+                               newInst := inst.transferTo(transfer, block.Height)
                                newInstances = append(newInstances, newInst)
                        }
                }
@@ -59,8 +64,8 @@ func (t *tracer) detachBlock(block *types.Block) []*Instance {
                transfers := parseTransfers(tx)
                for _, transfer := range transfers {
                        utxos := append(transfer.outUTXOs, transfer.inUTXOs...)
-                       if inst := t.table.getByUTXO(utxos[0].Hash); inst != nil {
-                               newInst := inst.rollbackTo(transfer.inUTXOs)
+                       if inst := t.index.getByUTXO(utxos[0].Hash); inst != nil {
+                               newInst := inst.rollbackTo(transfer)
                                newInstances = append(newInstances, newInst)
                        }
                }
@@ -70,6 +75,7 @@ func (t *tracer) detachBlock(block *types.Block) []*Instance {
 }
 
 type transfer struct {
+       txHash   bc.Hash
        inUTXOs  []*UTXO
        outUTXOs []*UTXO
 }
@@ -82,11 +88,11 @@ func parseTransfers(tx *types.Tx) []*transfer {
        var transfers []*transfer
        for program, utxos := range groupInUTXOs {
                outUTXOs := groupOutUTXOs[program]
-               transfers = append(transfers, &transfer{inUTXOs: utxos, outUTXOs: outUTXOs})
+               transfers = append(transfers, &transfer{txHash: tx.ID, inUTXOs: utxos, outUTXOs: outUTXOs})
        }
        for program, utxos := range groupOutUTXOs {
                if _, ok := groupInUTXOs[program]; !ok {
-                       transfers = append(transfers, &transfer{outUTXOs: utxos})
+                       transfers = append(transfers, &transfer{txHash: tx.ID, outUTXOs: utxos})
                }
        }
        return transfers
@@ -123,6 +129,6 @@ func isContract(program []byte) bool {
 
 func (t *tracer) saveInstances(instances []*Instance) {
        for _, inst := range instances {
-               t.table.save(inst)
+               t.index.save(inst)
        }
 }
index 7aed23a..f1429be 100644 (file)
@@ -145,6 +145,11 @@ func (c *Chain) BestChain() (uint64, bc.Hash) {
        return c.bestBlockHeader.Height, c.bestBlockHeader.Hash()
 }
 
+func (c *Chain) FinalizedHeight() uint64 {
+       finalizedHeight, _ := c.casper.LastFinalized()
+       return finalizedHeight
+}
+
 // AllValidators return all validators has vote num
 func (c *Chain) AllValidators(blockHash *bc.Hash) ([]*state.Validator, error) {
        parentCheckpoint, err := c.casper.ParentCheckpoint(blockHash)