7 log "github.com/sirupsen/logrus"
9 "github.com/bytom/bytom/protocol/bc"
10 "github.com/bytom/bytom/protocol/bc/types"
13 const maxAdvanceTraceBlockNum = 3600
16 errGivenTxTooEarly = errors.New("given tx exceed the max num of blocks ahead")
17 errTxAndBlockIsMismatch = errors.New("given tx hash and block hash is mismatch")
18 errTxNotIncludeContract = errors.New("input of tx not include utxo contract")
21 type TraceService struct {
25 scheduler *traceScheduler
26 unconfirmedIndex map[bc.Hash]*TreeNode
27 endedInstances map[string]bool
32 func NewTraceService(infra *Infrastructure) *TraceService {
33 allInstances, err := infra.Repository.LoadInstances()
35 log.WithFields(log.Fields{"module": logModule, "err": err}).Fatal("load instances from db")
38 chainStatus := infra.Repository.GetChainStatus()
39 if chainStatus == nil {
40 bestHeight, bestHash := infra.Chain.BestChain()
41 chainStatus = &ChainStatus{BlockHeight: bestHeight, BlockHash: bestHash}
42 if err := infra.Repository.SaveChainStatus(chainStatus); err != nil {
43 log.WithFields(log.Fields{"module": logModule, "err": err}).Fatal("init chain status for trace service")
47 scheduler := newTraceScheduler(infra)
48 inSyncInstances := dispatchInstances(allInstances, scheduler, infra.Chain.FinalizedHeight())
50 service := &TraceService{
52 tracer: newTracer(inSyncInstances),
54 unconfirmedIndex: make(map[bc.Hash]*TreeNode),
55 endedInstances: make(map[string]bool),
56 bestHeight: chainStatus.BlockHeight,
57 bestHash: chainStatus.BlockHash,
59 scheduler.start(service)
63 func dispatchInstances(instances []*Instance, scheduler *traceScheduler, finalizedHeight uint64) []*Instance {
64 var result []*Instance
65 for _, inst := range instances {
66 if inst.Status == InSync {
67 result = append(result, inst)
68 } else if inst.Status == Ended {
69 if inst.EndedHeight < finalizedHeight {
70 result = append(result, inst)
72 } else if inst.Status == Lagging {
73 if err := scheduler.addNewJob(inst); err != nil {
74 log.WithFields(log.Fields{"module": logModule, "err": err}).Fatal("add new job when init tracer")
81 func (t *TraceService) BestHeight() uint64 {
87 func (t *TraceService) BestHash() bc.Hash {
93 func (t *TraceService) ApplyBlock(block *types.Block) error {
97 newInstances := t.tracer.applyBlock(block)
98 t.processEndedInstances(newInstances)
100 t.bestHash = block.Hash()
101 return t.infra.Repository.SaveInstancesWithStatus(newInstances, t.bestHeight, t.bestHash)
104 func (t *TraceService) DetachBlock(block *types.Block) error {
108 newInstances := t.tracer.detachBlock(block)
109 t.processEndedInstances(nil)
111 t.bestHash = block.PreviousBlockHash
112 return t.infra.Repository.SaveInstancesWithStatus(newInstances, t.bestHeight, t.bestHash)
115 func (t *TraceService) AddUnconfirmedTx(tx *types.Tx) {
116 transfers := parseTransfers(tx)
117 for _, transfer := range transfers {
118 inUTXOs, outUTXOs := transfer.inUTXOs, transfer.outUTXOs
119 if len(inUTXOs) == 0 || len(outUTXOs) == 0 {
123 treeNode := &TreeNode{TxHash: tx.ID, UTXOs: outUTXOs}
124 if inst := t.tracer.index.getByUTXO(inUTXOs[0].Hash); inst != nil {
125 inst.Unconfirmed = append(inst.Unconfirmed, treeNode)
126 t.addToUnconfirmedIndex(treeNode, outUTXOs)
130 if parent, ok := t.unconfirmedIndex[inUTXOs[0].Hash]; ok {
131 parent.Children = append(parent.Children, treeNode)
132 t.addToUnconfirmedIndex(treeNode, outUTXOs)
137 func (t *TraceService) CreateInstance(txHash, blockHash bc.Hash) ([]string, error) {
138 block, err := t.infra.Chain.GetBlockByHash(&blockHash)
143 if bestHeight, _ := t.infra.Chain.BestChain(); bestHeight-block.Height > maxAdvanceTraceBlockNum {
144 return nil, errGivenTxTooEarly
147 tx := findTx(block, txHash)
149 return nil, errTxAndBlockIsMismatch
152 transfers := parseTransfers(tx)
153 if len(transfers) == 0 {
154 return nil, errTxNotIncludeContract
157 var traceIDs []string
158 for _, transfer := range transfers {
159 inst := newInstance(transfer, block)
160 traceIDs = append(traceIDs, inst.TraceID)
161 if err := t.addNewTraceJob(inst); err != nil {
168 func (t *TraceService) RemoveInstance(traceID string) error {
172 t.infra.Repository.RemoveInstance(traceID)
173 t.tracer.removeInstance(traceID)
177 func (t *TraceService) GetInstance(traceID string) (*Instance, error) {
178 return t.infra.Repository.GetInstance(traceID)
181 func (t *TraceService) takeOverInstances(instances []*Instance, blockHash bc.Hash) bool {
185 if blockHash != t.bestHash {
189 for _, inst := range instances {
190 if inst.Status != Ended {
195 if err := t.infra.Repository.SaveInstances(instances); err != nil {
196 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("save instances when take over instances")
200 t.tracer.addInstances(instances)
201 t.processEndedInstances(instances)
205 func (t *TraceService) processEndedInstances(instances []*Instance) {
206 for _, inst := range instances {
207 if inst.Status == Ended {
208 t.endedInstances[inst.TraceID] = true
212 finalizedHeight := t.infra.Chain.FinalizedHeight()
213 for traceID := range t.endedInstances {
214 inst := t.tracer.getInstance(traceID)
215 if inst.Status != Ended {
216 delete(t.endedInstances, traceID)
217 } else if finalizedHeight >= inst.EndedHeight {
218 delete(t.endedInstances, traceID)
219 t.tracer.removeInstance(traceID)
224 func (t *TraceService) addNewTraceJob(inst *Instance) error {
225 if err := t.infra.Repository.SaveInstances([]*Instance{inst}); err != nil {
229 if inst.Status != Ended {
230 if err := t.scheduler.addNewJob(inst); err != nil {
237 func (t *TraceService) addToUnconfirmedIndex(treeNode *TreeNode, utxos []*UTXO) {
238 for _, utxo := range utxos {
239 t.unconfirmedIndex[utxo.Hash] = treeNode
243 func findTx(block *types.Block, txHash bc.Hash) *types.Tx {
244 for _, tx := range block.Transactions {