7 "github.com/sirupsen/logrus"
9 "github.com/bytom/bytom/protocol/bc"
10 "github.com/bytom/bytom/protocol/bc/types"
// maxAdvanceTraceBlockNum is the largest allowed gap between the best chain
// height and the height of the block passed to CreateInstance. A larger gap
// makes CreateInstance fail with errGivenTxTooEarly.
13 const maxAdvanceTraceBlockNum = 3600
// errGivenTxTooEarly is returned by CreateInstance when the requested block is
// more than maxAdvanceTraceBlockNum blocks behind the best chain height.
16 errGivenTxTooEarly = errors.New("given tx exceed the max num of blocks ahead")
// errTxAndBlockIsMismatch is returned by CreateInstance after the findTx
// lookup; presumably when the tx hash is not found in the given block.
17 errTxAndBlockIsMismatch = errors.New("given tx hash and block hash is mismatch")
// errTxNotIncludeContract is returned by CreateInstance when parseTransfers
// yields no transfers for the transaction.
18 errTxNotIncludeContract = errors.New("input of tx not include utxo contract")
// TraceService tracks contract instances against the chain: it applies and
// detaches blocks through its tracer, records unconfirmed transactions, and
// hands instances that still need catching up to a scheduler.
21 type TraceService struct {
// scheduler receives instances through addNewJob (see addNewTraceJob and
// dispatchInstances) and is started with the service in NewTraceService.
25 scheduler *traceScheduler
// unconfirmedIndex maps the hash of each output UTXO of an unconfirmed
// transaction to the TreeNode created for that transaction.
26 unconfirmedIndex map[bc.Hash]*TreeNode
// endedInstances is a set, keyed by TraceID, of instances seen with status
// Ended; processEndedInstances adds to it and prunes it.
27 endedInstances map[string]bool
// NewTraceService builds a TraceService from infra. It loads all persisted
// instances, reads the stored chain status, creates a scheduler, splits the
// instances between the tracer and the scheduler with dispatchInstances, and
// starts the scheduler with the new service.
//
// Failures while loading instances or saving the initial chain status are
// logged with logrus Fatal, which terminates the process.
32 func NewTraceService(infra *Infrastructure) *TraceService {
33 allInstances, err := infra.Repository.LoadInstances()
35 logrus.WithField("err", err).Fatal("load instances from db")
38 chainStatus := infra.Repository.GetChainStatus()
// NOTE(review): this branch runs only when chainStatus is nil, yet the next
// statement writes to chainStatus.BlockHeight / chainStatus.BlockHash. If
// GetChainStatus returns a pointer this dereferences nil and panics on first
// start-up (no stored status). A status value probably has to be allocated
// before it is filled from BestChain — confirm the return type and fix.
39 if chainStatus == nil {
40 chainStatus.BlockHeight, chainStatus.BlockHash = infra.Chain.BestChain()
// Persist the freshly initialised status so later start-ups find it.
41 if err := infra.Repository.SaveChainStatus(chainStatus); err != nil {
42 logrus.WithField("err", err).Fatal("init chain status for trace service")
46 scheduler := newTraceScheduler(infra)
// Instances returned here are tracked by the tracer immediately; the others
// are either queued on the scheduler or dropped (see dispatchInstances).
47 inSyncInstances := dispatchInstances(allInstances, scheduler, infra.Chain.FinalizedHeight())
49 service := &TraceService{
51 tracer: newTracer(inSyncInstances),
53 unconfirmedIndex: make(map[bc.Hash]*TreeNode),
54 endedInstances: make(map[string]bool),
// The service starts from the stored chain status, not from the chain's
// current best block.
55 bestHeight: chainStatus.BlockHeight,
56 bestHash: chainStatus.BlockHash,
58 scheduler.start(service)
// dispatchInstances sorts the loaded instances by status:
//   - InSync instances are collected into the result slice;
//   - Ended instances are collected only when EndedHeight < finalizedHeight;
//   - Lagging instances are queued on the scheduler via addNewJob, and a
//     failure to queue is logged with logrus Fatal.
//
// Instances with any other status are ignored. The collected slice is
// presumably the return value (the declared result type is []*Instance).
62 func dispatchInstances(instances []*Instance, scheduler *traceScheduler, finalizedHeight uint64) []*Instance {
63 var result []*Instance
64 for _, inst := range instances {
65 if inst.Status == InSync {
66 result = append(result, inst)
67 } else if inst.Status == Ended {
// NOTE(review): this keeps Ended instances whose EndedHeight is below the
// finalized height, while processEndedInstances removes instances from the
// tracer once finalizedHeight >= EndedHeight. The two conditions look
// opposed — confirm which direction is intended.
68 if inst.EndedHeight < finalizedHeight {
69 result = append(result, inst)
71 } else if inst.Status == Lagging {
72 if err := scheduler.addNewJob(inst); err != nil {
73 logrus.WithField("err", err).Fatal("add new job when init tracer")
// BestHeight reports a block height for the service as a uint64; presumably
// the bestHeight field maintained by the service — confirm against the body.
80 func (t *TraceService) BestHeight() uint64 {
// BestHash reports a block hash for the service; presumably the bestHash
// field updated by ApplyBlock and DetachBlock — confirm against the body.
86 func (t *TraceService) BestHash() bc.Hash {
// ApplyBlock feeds block to the tracer, registers any instances that ended as
// a result, records the block's hash as the service's best hash, and persists
// the changed instances together with the current best height and hash.
// It returns the error from the repository save, if any.
92 func (t *TraceService) ApplyBlock(block *types.Block) error {
96 newInstances := t.tracer.applyBlock(block)
// Track newly ended instances and prune those already finalized.
97 t.processEndedInstances(newInstances)
99 t.bestHash = block.Hash()
100 return t.infra.Repository.SaveInstancesWithStatus(newInstances, t.bestHeight, t.bestHash)
// DetachBlock rolls block back out of the tracer, re-examines the tracked
// ended instances, moves the service's best hash to the block's parent, and
// persists the changed instances with the current best height and hash.
// It returns the error from the repository save, if any.
103 func (t *TraceService) DetachBlock(block *types.Block) error {
107 newInstances := t.tracer.detachBlock(block)
// Passing nil registers nothing new; it only re-checks instances already in
// endedInstances, dropping those whose status is no longer Ended.
108 t.processEndedInstances(nil)
110 t.bestHash = block.PreviousBlockHash
111 return t.infra.Repository.SaveInstancesWithStatus(newInstances, t.bestHeight, t.bestHash)
// AddUnconfirmedTx records an unconfirmed transaction against the instances
// it extends. For each transfer parsed from tx it builds a TreeNode holding
// the transfer's output UTXOs and attaches it either to the traced instance
// that owns the transfer's first input UTXO, or to the unconfirmed parent
// node that produced that UTXO. Attached nodes are added to unconfirmedIndex
// under each of their output UTXO hashes.
114 func (t *TraceService) AddUnconfirmedTx(tx *types.Tx) {
115 transfers := parseTransfers(tx)
116 for _, transfer := range transfers {
117 inUTXOs, outUTXOs := transfer.inUTXOs, transfer.outUTXOs
// A transfer without inputs or without outputs cannot be linked to anything;
// this guard also makes the inUTXOs[0] accesses below safe.
118 if len(inUTXOs) == 0 || len(outUTXOs) == 0 {
122 treeNode := &TreeNode{TxHash: tx.ID, UTXOs: outUTXOs}
// Case 1: the first input is a UTXO of an instance known to the tracer.
123 if inst := t.tracer.index.getByUTXO(inUTXOs[0].Hash); inst != nil {
124 inst.Unconfirmed = append(inst.Unconfirmed, treeNode)
125 t.addToUnconfirmedIndex(treeNode, outUTXOs)
// Case 2: the first input was produced by another unconfirmed transaction.
129 if parent, ok := t.unconfirmedIndex[inUTXOs[0].Hash]; ok {
130 parent.Children = append(parent.Children, treeNode)
131 t.addToUnconfirmedIndex(treeNode, outUTXOs)
// CreateInstance starts tracing the contract transfers of the transaction
// txHash contained in block blockHash. It returns the trace IDs of the
// instances it created, one per transfer parsed from the transaction.
//
// Errors visible here: errGivenTxTooEarly when the block is too far behind
// the best chain height, errTxAndBlockIsMismatch after the findTx lookup, and
// errTxNotIncludeContract when the transaction has no transfers.
136 func (t *TraceService) CreateInstance(txHash, blockHash bc.Hash) ([]string, error) {
137 block, err := t.infra.Chain.GetBlockByHash(&blockHash)
// NOTE(review): if these heights are unsigned and block.Height can exceed
// bestHeight (e.g. a block not on the best chain), the subtraction wraps
// around and the request is rejected as too early — confirm the types and
// whether that case needs an explicit check.
142 if bestHeight, _ := t.infra.Chain.BestChain(); bestHeight-block.Height > maxAdvanceTraceBlockNum {
143 return nil, errGivenTxTooEarly
146 tx := findTx(block, txHash)
148 return nil, errTxAndBlockIsMismatch
151 transfers := parseTransfers(tx)
152 if len(transfers) == 0 {
153 return nil, errTxNotIncludeContract
156 var traceIDs []string
157 for _, transfer := range transfers {
158 inst := newInstance(transfer, block)
// The ID is collected before the job is registered.
159 traceIDs = append(traceIDs, inst.TraceID)
160 if err := t.addNewTraceJob(inst); err != nil {
// RemoveInstance deletes the instance identified by traceID from the
// repository and then from the in-memory tracer.
167 func (t *TraceService) RemoveInstance(traceID string) error {
// NOTE(review): any result of Repository.RemoveInstance is discarded here;
// if it returns an error, a failed delete goes unnoticed — confirm.
171 t.infra.Repository.RemoveInstance(traceID)
172 t.tracer.removeInstance(traceID)
// GetInstance looks up the instance identified by traceID in the repository
// and returns the repository's result and error unchanged.
176 func (t *TraceService) GetInstance(traceID string) (*Instance, error) {
177 return t.infra.Repository.GetInstance(traceID)
// takeOverInstances moves instances into the service's tracer. It compares
// blockHash with the service's best hash, examines each instance that has not
// ended, saves the instances, adds them to the tracer, and registers any
// ended ones.
//
// The bool result presumably reports whether the hand-over happened — confirm
// against the return statements.
180 func (t *TraceService) takeOverInstances(instances []*Instance, blockHash bc.Hash) bool {
// The caller's view of the chain tip must match the service's.
184 if blockHash != t.bestHash {
188 for _, inst := range instances {
189 if inst.Status != Ended {
// A save failure is logged at Error level rather than being fatal.
194 if err := t.infra.Repository.SaveInstances(instances); err != nil {
195 logrus.WithField("err", err).Error("save instances when take over instances")
199 t.tracer.addInstances(instances)
200 t.processEndedInstances(instances)
// processEndedInstances maintains the endedInstances set in two steps. It
// first records every given instance whose status is Ended. It then walks the
// whole set: entries whose instance is no longer Ended are forgotten, and
// entries whose EndedHeight is at or below the chain's finalized height are
// forgotten and also removed from the tracer.
//
// instances may be nil, in which case only the second step does any work.
204 func (t *TraceService) processEndedInstances(instances []*Instance) {
205 for _, inst := range instances {
206 if inst.Status == Ended {
207 t.endedInstances[inst.TraceID] = true
211 finalizedHeight := t.infra.Chain.FinalizedHeight()
// Deleting entries from a map while ranging over it is permitted in Go.
212 for traceID := range t.endedInstances {
// NOTE(review): the result of getInstance is dereferenced without a nil
// check; confirm it cannot return nil for an ID still in endedInstances
// (e.g. after RemoveInstance removed it from the tracer).
213 inst := t.tracer.getInstance(traceID)
214 if inst.Status != Ended {
215 delete(t.endedInstances, traceID)
216 } else if finalizedHeight >= inst.EndedHeight {
217 delete(t.endedInstances, traceID)
218 t.tracer.removeInstance(traceID)
// addNewTraceJob persists inst and, unless it has already ended, queues it on
// the scheduler.
223 func (t *TraceService) addNewTraceJob(inst *Instance) error {
// Save first, so the instance is persisted before the scheduler sees it.
224 if err := t.infra.Repository.SaveInstances([]*Instance{inst}); err != nil {
// Ended instances are saved but not scheduled.
228 if inst.Status != Ended {
229 if err := t.scheduler.addNewJob(inst); err != nil {
// addToUnconfirmedIndex registers treeNode in unconfirmedIndex under the hash
// of every UTXO in utxos, overwriting any existing entry for the same hash.
236 func (t *TraceService) addToUnconfirmedIndex(treeNode *TreeNode, utxos []*UTXO) {
237 for _, utxo := range utxos {
238 t.unconfirmedIndex[utxo.Hash] = treeNode
242 func findTx(block *types.Block, txHash bc.Hash) *types.Tx {
243 for _, tx := range block.Transactions {