type ChainService interface {
BestChain() (uint64, bc.Hash)
+ FinalizedHeight() uint64
GetBlockByHash(*bc.Hash) (*types.Block, error)
GetBlockByHeight(uint64) (*types.Block, error)
BlockWaiter(height uint64) <-chan struct{}
const (
Lagging Status = iota + 1
InSync
- Finalized
+ Ended
OffChain
)
UTXOs []*UTXO `json:"utxos"`
TxHash *bc.Hash `json:"tx_hash"`
Status Status `json:"status"`
+ EndedHeight uint64 `json:"ended_height"`
ScannedHash bc.Hash `json:"scanned_hash"`
ScannedHeight uint64 `json:"scanned_height"`
Unconfirmed []*TreeNode
}
-func newInstance(inUTXOs, outUTXOs []*UTXO, txHash bc.Hash, block *types.Block) *Instance {
+func newInstance(t *transfer, block *types.Block) *Instance {
inst := &Instance{
TraceID: uuid.New().String(),
- TxHash: &txHash,
- UTXOs: outUTXOs,
+ TxHash: &t.txHash,
+ UTXOs: t.outUTXOs,
+ Status: Lagging,
ScannedHeight: block.Height,
ScannedHash: block.Hash(),
}
- inst.Status = Lagging
- if len(outUTXOs) == 0 {
- inst.Status = Finalized
- inst.UTXOs = inUTXOs
+ if len(t.outUTXOs) == 0 {
+ inst.Status = Ended
+ inst.UTXOs = t.inUTXOs
}
return inst
}
-func (i *Instance) transferTo(newUTXOs []*UTXO, txHash bc.Hash) *Instance {
+func (i *Instance) transferTo(t *transfer, blockHeight uint64) *Instance {
inst := &Instance{
TraceID: i.TraceID,
Status: i.Status,
Unconfirmed: i.Unconfirmed,
- UTXOs: newUTXOs,
- TxHash: &txHash,
+ UTXOs: t.outUTXOs,
+ TxHash: &t.txHash,
}
- if len(newUTXOs) == 0 {
- inst.Status = Finalized
- inst.UTXOs = i.UTXOs
+ if len(t.outUTXOs) == 0 {
+ inst.Status = Ended
+ inst.EndedHeight = blockHeight
+ inst.UTXOs = t.inUTXOs
}
- inst.confirmTx(txHash)
+ inst.confirmTx(t.txHash)
return inst
}
-func (i *Instance) rollbackTo(newUTXOs []*UTXO) *Instance {
+func (i *Instance) rollbackTo(t *transfer) *Instance {
return &Instance{
TraceID: i.TraceID,
Status: InSync,
- UTXOs: newUTXOs,
+ UTXOs: t.inUTXOs,
TxHash: nil,
Unconfirmed: nil,
}
return instances
}
-func (i *instanceIndex) getByID(id string) *Instance {
- return i.traceIdToInst[id]
+func (i *instanceIndex) getByID(traceID string) *Instance {
+ return i.traceIdToInst[traceID]
}
func (i *instanceIndex) getByUTXO(utxoHash bc.Hash) *Instance {
instances *sync.Map
tracerService *TraceService
infra *Infrastructure
+
tracer *tracer
+ currentHeight uint64
+ currentHash bc.Hash
}
func newTraceScheduler(infra *Infrastructure) *traceScheduler {
t.tracer = newTracer(jobs[beginHash])
- for height, blockHash := beginHeight, beginHash; ; height++ {
- if bestHeight := t.tracerService.BestHeight(); height == bestHeight {
- if err := t.finishJobs(jobs, blockHash); err != nil {
+ for t.currentHeight, t.currentHash = beginHeight, beginHash; ; {
+ if bestHeight := t.tracerService.BestHeight(); t.currentHeight == bestHeight {
+ if err := t.finishJobs(jobs); err != nil {
log.WithField("err", err).Error("finish jobs")
break
}
}
- if ok, err := t.tryAttach(height+1, &blockHash, jobs); err != nil {
+ if ok, err := t.tryAttach(jobs); err != nil {
log.WithField("err", err).Error("try attach on trace scheduler")
break
} else if !ok {
- if err := t.detach(&blockHash, jobs); err != nil {
+ if err := t.detach(jobs); err != nil {
log.WithField("err", err).Error("detach on trace scheduler")
break
}
- height -= 2
}
}
}
return hashToJobs, beginHeight, beginHash
}
-func (t *traceScheduler) tryAttach(height uint64, blockHash *bc.Hash, jobs map[bc.Hash][]*Instance) (bool, error) {
- block, err := t.infra.Chain.GetBlockByHeight(height)
+func (t *traceScheduler) tryAttach(jobs map[bc.Hash][]*Instance) (bool, error) {
+ block, err := t.infra.Chain.GetBlockByHeight(t.currentHeight+1)
if err != nil {
return false, err
}
- if block.PreviousBlockHash != *blockHash {
+ if block.PreviousBlockHash != t.currentHash {
return false, nil
}
t.tracer.applyBlock(block)
- *blockHash = block.Hash()
+ t.currentHeight++
+ t.currentHash = block.Hash()
if instances, ok := jobs[block.Hash()]; ok {
t.tracer.addInstances(instances)
return true, nil
}
-func (t *traceScheduler) detach(blockHash *bc.Hash, jobs map[bc.Hash][]*Instance) error {
- block, err := t.infra.Chain.GetBlockByHash(blockHash)
+func (t *traceScheduler) detach(jobs map[bc.Hash][]*Instance) error {
+ block, err := t.infra.Chain.GetBlockByHash(&t.currentHash)
if err != nil {
return err
}
}
t.tracer.detachBlock(block)
- *blockHash = block.PreviousBlockHash
+ t.currentHeight--
+ t.currentHash = block.PreviousBlockHash
return nil
}
-func (t *traceScheduler) finishJobs(jobs map[bc.Hash][]*Instance, scannedHash bc.Hash) error {
+func (t *traceScheduler) finishJobs(jobs map[bc.Hash][]*Instance) error {
inSyncInstances := t.tracer.allInstances()
inSyncMap := make(map[string]bool)
for _, inst := range inSyncInstances {
t.releaseInstances(offChainInstances)
- if ok := t.tracerService.takeOverInstances(inSyncInstances, scannedHash); ok {
+ if ok := t.tracerService.takeOverInstances(inSyncInstances, t.currentHash); ok {
t.releaseInstances(inSyncInstances)
}
return nil
infra *Infrastructure
scheduler *traceScheduler
unconfirmedIndex map[bc.Hash]*TreeNode
+ endedInstances map[string]bool
bestHeight uint64
bestHash bc.Hash
}
}
scheduler := newTraceScheduler(infra)
- inSyncInstances := dispatchInstances(allInstances, scheduler)
+ inSyncInstances := dispatchInstances(allInstances, scheduler, infra.Chain.FinalizedHeight())
service := &TraceService{
infra: infra,
tracer: newTracer(inSyncInstances),
scheduler: scheduler,
unconfirmedIndex: make(map[bc.Hash]*TreeNode),
+ endedInstances: make(map[string]bool),
bestHeight: chainStatus.BlockHeight,
bestHash: chainStatus.BlockHash,
}
return service
}
-func dispatchInstances(instances []*Instance, scheduler *traceScheduler) []*Instance {
+func dispatchInstances(instances []*Instance, scheduler *traceScheduler, finalizedHeight uint64) []*Instance {
var result []*Instance
for _, inst := range instances {
if inst.Status == InSync {
result = append(result, inst)
+ } else if inst.Status == Ended {
+ if inst.EndedHeight < finalizedHeight {
+ result = append(result, inst)
+ }
} else if inst.Status == Lagging {
if err := scheduler.addNewJob(inst); err != nil {
logrus.WithField("err", err).Fatal("add new job when init tracer")
defer t.Unlock()
newInstances := t.tracer.applyBlock(block)
+ t.processEndedInstances(newInstances)
t.bestHeight++
t.bestHash = block.Hash()
return t.infra.Repository.SaveInstancesWithStatus(newInstances, t.bestHeight, t.bestHash)
defer t.Unlock()
newInstances := t.tracer.detachBlock(block)
+ t.processEndedInstances(nil)
t.bestHeight--
t.bestHash = block.PreviousBlockHash
return t.infra.Repository.SaveInstancesWithStatus(newInstances, t.bestHeight, t.bestHash)
}
treeNode := &TreeNode{TxHash: tx.ID, UTXOs: outUTXOs}
- if inst := t.tracer.table.getByUTXO(inUTXOs[0].Hash); inst != nil {
+ if inst := t.tracer.index.getByUTXO(inUTXOs[0].Hash); inst != nil {
inst.Unconfirmed = append(inst.Unconfirmed, treeNode)
t.addToUnconfirmedIndex(treeNode, outUTXOs)
return
var traceIDs []string
for _, transfer := range transfers {
- inst := newInstance(transfer.inUTXOs, transfer.outUTXOs, txHash, block)
+ inst := newInstance(transfer, block)
traceIDs = append(traceIDs, inst.TraceID)
- if err := t.addNewTraceJob(inst, block); err != nil {
+ if err := t.addNewTraceJob(inst); err != nil {
return nil, err
}
}
}
for _, inst := range instances {
- inst.Status = InSync
+ if inst.Status != Ended {
+ inst.Status = InSync
+ }
}
if err := t.infra.Repository.SaveInstances(instances); err != nil {
}
t.tracer.addInstances(instances)
+ t.processEndedInstances(instances)
return true
}
-func (t *TraceService) addNewTraceJob(inst *Instance, block *types.Block) error {
+func (t *TraceService) processEndedInstances(instances []*Instance) {
+ for _, inst := range instances {
+ if inst.Status == Ended {
+ t.endedInstances[inst.TraceID] = true
+ }
+ }
+
+ finalizedHeight := t.infra.Chain.FinalizedHeight()
+ for traceID := range t.endedInstances {
+ inst := t.tracer.getInstance(traceID)
+ if inst.Status != Ended {
+ delete(t.endedInstances, traceID)
+ } else if finalizedHeight >= inst.EndedHeight {
+ delete(t.endedInstances, traceID)
+ t.tracer.removeInstance(traceID)
+ }
+ }
+}
+
+func (t *TraceService) addNewTraceJob(inst *Instance) error {
if err := t.infra.Repository.SaveInstances([]*Instance{inst}); err != nil {
return err
}
- if inst.Status != Finalized {
+ if inst.Status != Ended {
if err := t.scheduler.addNewJob(inst); err != nil {
return err
}
"encoding/hex"
"github.com/bytom/bytom/consensus/segwit"
+ "github.com/bytom/bytom/protocol/bc"
"github.com/bytom/bytom/protocol/bc/types"
)
type tracer struct {
- table *instanceIndex
+ index *instanceIndex
}
func newTracer(instances []*Instance) *tracer {
- table := newInstanceIndex()
+ index := newInstanceIndex()
for _, inst := range instances {
- table.save(inst)
+ index.save(inst)
}
- return &tracer{table: table}
+ return &tracer{index: index}
+}
+
+func (t *tracer) getInstance(traceID string) *Instance {
+ return t.index.getByID(traceID)
}
func (t *tracer) allInstances() []*Instance {
- return t.table.getAll()
+ return t.index.getAll()
}
func (t *tracer) addInstances(instances []*Instance) {
for _, inst := range instances {
- t.table.save(inst)
+ t.index.save(inst)
}
}
func (t *tracer) removeInstance(traceID string) {
- t.table.remove(traceID)
+ t.index.remove(traceID)
}
func (t *tracer) applyBlock(block *types.Block) []*Instance {
continue
}
- if inst := t.table.getByUTXO(transfer.inUTXOs[0].Hash); inst != nil {
- newInst := inst.transferTo(transfer.outUTXOs, tx.ID)
+ if inst := t.index.getByUTXO(transfer.inUTXOs[0].Hash); inst != nil {
+ newInst := inst.transferTo(transfer, block.Height)
newInstances = append(newInstances, newInst)
}
}
transfers := parseTransfers(tx)
for _, transfer := range transfers {
utxos := append(transfer.outUTXOs, transfer.inUTXOs...)
- if inst := t.table.getByUTXO(utxos[0].Hash); inst != nil {
- newInst := inst.rollbackTo(transfer.inUTXOs)
+ if inst := t.index.getByUTXO(utxos[0].Hash); inst != nil {
+ newInst := inst.rollbackTo(transfer)
newInstances = append(newInstances, newInst)
}
}
}
type transfer struct {
+ txHash bc.Hash
inUTXOs []*UTXO
outUTXOs []*UTXO
}
var transfers []*transfer
for program, utxos := range groupInUTXOs {
outUTXOs := groupOutUTXOs[program]
- transfers = append(transfers, &transfer{inUTXOs: utxos, outUTXOs: outUTXOs})
+ transfers = append(transfers, &transfer{txHash: tx.ID, inUTXOs: utxos, outUTXOs: outUTXOs})
}
for program, utxos := range groupOutUTXOs {
if _, ok := groupInUTXOs[program]; !ok {
- transfers = append(transfers, &transfer{outUTXOs: utxos})
+ transfers = append(transfers, &transfer{txHash: tx.ID, outUTXOs: utxos})
}
}
return transfers
func (t *tracer) saveInstances(instances []*Instance) {
for _, inst := range instances {
- t.table.save(inst)
+ t.index.save(inst)
}
}
return c.bestBlockHeader.Height, c.bestBlockHeader.Hash()
}
+func (c *Chain) FinalizedHeight() uint64 {
+ finalizedHeight, _ := c.casper.LastFinalized()
+ return finalizedHeight
+}
+
// AllValidators return all validators has vote num
func (c *Chain) AllValidators(blockHash *bc.Hash) ([]*state.Validator, error) {
parentCheckpoint, err := c.casper.ParentCheckpoint(blockHash)