OSDN Git Service

add trace log module (#2116)
[bytom/bytom.git] / contract / trace_service.go
1 package contract
2
3 import (
4         "errors"
5         "sync"
6
7         log "github.com/sirupsen/logrus"
8
9         "github.com/bytom/bytom/protocol/bc"
10         "github.com/bytom/bytom/protocol/bc/types"
11 )
12
13 const maxAdvanceTraceBlockNum = 3600
14
15 var (
16         errGivenTxTooEarly      = errors.New("given tx exceed the max num of blocks ahead")
17         errTxAndBlockIsMismatch = errors.New("given tx hash and block hash is mismatch")
18         errTxNotIncludeContract = errors.New("input of tx not include utxo contract")
19 )
20
21 type TraceService struct {
22         sync.RWMutex
23         tracer           *tracer
24         infra            *Infrastructure
25         scheduler        *traceScheduler
26         unconfirmedIndex map[bc.Hash]*TreeNode
27         endedInstances   map[string]bool
28         bestHeight       uint64
29         bestHash         bc.Hash
30 }
31
32 func NewTraceService(infra *Infrastructure) *TraceService {
33         allInstances, err := infra.Repository.LoadInstances()
34         if err != nil {
35                 log.WithFields(log.Fields{"module": logModule, "err": err}).Fatal("load instances from db")
36         }
37
38         chainStatus := infra.Repository.GetChainStatus()
39         if chainStatus == nil {
40                 bestHeight, bestHash := infra.Chain.BestChain()
41                 chainStatus = &ChainStatus{BlockHeight: bestHeight, BlockHash: bestHash}
42                 if err := infra.Repository.SaveChainStatus(chainStatus); err != nil {
43                         log.WithFields(log.Fields{"module": logModule, "err": err}).Fatal("init chain status for trace service")
44                 }
45         }
46
47         scheduler := newTraceScheduler(infra)
48         inSyncInstances := dispatchInstances(allInstances, scheduler, infra.Chain.FinalizedHeight())
49
50         service := &TraceService{
51                 infra:            infra,
52                 tracer:           newTracer(inSyncInstances),
53                 scheduler:        scheduler,
54                 unconfirmedIndex: make(map[bc.Hash]*TreeNode),
55                 endedInstances:   make(map[string]bool),
56                 bestHeight:       chainStatus.BlockHeight,
57                 bestHash:         chainStatus.BlockHash,
58         }
59         scheduler.start(service)
60         return service
61 }
62
63 func dispatchInstances(instances []*Instance, scheduler *traceScheduler, finalizedHeight uint64) []*Instance {
64         var result []*Instance
65         for _, inst := range instances {
66                 if inst.Status == InSync {
67                         result = append(result, inst)
68                 } else if inst.Status == Ended {
69                         if inst.EndedHeight < finalizedHeight {
70                                 result = append(result, inst)
71                         }
72                 } else if inst.Status == Lagging {
73                         if err := scheduler.addNewJob(inst); err != nil {
74                                 log.WithFields(log.Fields{"module": logModule, "err": err}).Fatal("add new job when init tracer")
75                         }
76                 }
77         }
78         return result
79 }
80
81 func (t *TraceService) BestHeight() uint64 {
82         t.RLock()
83         defer t.RUnlock()
84         return t.bestHeight
85 }
86
87 func (t *TraceService) BestHash() bc.Hash {
88         t.RLock()
89         defer t.RUnlock()
90         return t.bestHash
91 }
92
93 func (t *TraceService) ApplyBlock(block *types.Block) error {
94         t.Lock()
95         defer t.Unlock()
96
97         newInstances := t.tracer.applyBlock(block)
98         t.processEndedInstances(newInstances)
99         t.bestHeight++
100         t.bestHash = block.Hash()
101         return t.infra.Repository.SaveInstancesWithStatus(newInstances, t.bestHeight, t.bestHash)
102 }
103
104 func (t *TraceService) DetachBlock(block *types.Block) error {
105         t.Lock()
106         defer t.Unlock()
107
108         newInstances := t.tracer.detachBlock(block)
109         t.processEndedInstances(nil)
110         t.bestHeight--
111         t.bestHash = block.PreviousBlockHash
112         return t.infra.Repository.SaveInstancesWithStatus(newInstances, t.bestHeight, t.bestHash)
113 }
114
115 func (t *TraceService) AddUnconfirmedTx(tx *types.Tx) {
116         transfers := parseTransfers(tx)
117         for _, transfer := range transfers {
118                 inUTXOs, outUTXOs := transfer.inUTXOs, transfer.outUTXOs
119                 if len(inUTXOs) == 0 || len(outUTXOs) == 0 {
120                         return
121                 }
122
123                 treeNode := &TreeNode{TxHash: tx.ID, UTXOs: outUTXOs}
124                 if inst := t.tracer.index.getByUTXO(inUTXOs[0].Hash); inst != nil {
125                         inst.Unconfirmed = append(inst.Unconfirmed, treeNode)
126                         t.addToUnconfirmedIndex(treeNode, outUTXOs)
127                         return
128                 }
129
130                 if parent, ok := t.unconfirmedIndex[inUTXOs[0].Hash]; ok {
131                         parent.Children = append(parent.Children, treeNode)
132                         t.addToUnconfirmedIndex(treeNode, outUTXOs)
133                 }
134         }
135 }
136
137 func (t *TraceService) CreateInstance(txHash, blockHash bc.Hash) ([]string, error) {
138         block, err := t.infra.Chain.GetBlockByHash(&blockHash)
139         if err != nil {
140                 return nil, err
141         }
142
143         if bestHeight, _ := t.infra.Chain.BestChain(); bestHeight-block.Height > maxAdvanceTraceBlockNum {
144                 return nil, errGivenTxTooEarly
145         }
146
147         tx := findTx(block, txHash)
148         if tx == nil {
149                 return nil, errTxAndBlockIsMismatch
150         }
151
152         transfers := parseTransfers(tx)
153         if len(transfers) == 0 {
154                 return nil, errTxNotIncludeContract
155         }
156
157         var traceIDs []string
158         for _, transfer := range transfers {
159                 inst := newInstance(transfer, block)
160                 traceIDs = append(traceIDs, inst.TraceID)
161                 if err := t.addNewTraceJob(inst); err != nil {
162                         return nil, err
163                 }
164         }
165         return traceIDs, nil
166 }
167
168 func (t *TraceService) RemoveInstance(traceID string) error {
169         t.Lock()
170         defer t.Unlock()
171
172         t.infra.Repository.RemoveInstance(traceID)
173         t.tracer.removeInstance(traceID)
174         return nil
175 }
176
177 func (t *TraceService) GetInstance(traceID string) (*Instance, error) {
178         return t.infra.Repository.GetInstance(traceID)
179 }
180
181 func (t *TraceService) takeOverInstances(instances []*Instance, blockHash bc.Hash) bool {
182         t.Lock()
183         defer t.Unlock()
184
185         if blockHash != t.bestHash {
186                 return false
187         }
188
189         for _, inst := range instances {
190                 if inst.Status != Ended {
191                         inst.Status = InSync
192                 }
193         }
194
195         if err := t.infra.Repository.SaveInstances(instances); err != nil {
196                 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("save instances when take over instances")
197                 return false
198         }
199
200         t.tracer.addInstances(instances)
201         t.processEndedInstances(instances)
202         return true
203 }
204
205 func (t *TraceService) processEndedInstances(instances []*Instance) {
206         for _, inst := range instances {
207                 if inst.Status == Ended {
208                         t.endedInstances[inst.TraceID] = true
209                 }
210         }
211
212         finalizedHeight := t.infra.Chain.FinalizedHeight()
213         for traceID := range t.endedInstances {
214                 inst := t.tracer.getInstance(traceID)
215                 if inst.Status != Ended {
216                         delete(t.endedInstances, traceID)
217                 } else if finalizedHeight >= inst.EndedHeight {
218                         delete(t.endedInstances, traceID)
219                         t.tracer.removeInstance(traceID)
220                 }
221         }
222 }
223
224 func (t *TraceService) addNewTraceJob(inst *Instance) error {
225         if err := t.infra.Repository.SaveInstances([]*Instance{inst}); err != nil {
226                 return err
227         }
228
229         if inst.Status != Ended {
230                 if err := t.scheduler.addNewJob(inst); err != nil {
231                         return err
232                 }
233         }
234         return nil
235 }
236
237 func (t *TraceService) addToUnconfirmedIndex(treeNode *TreeNode, utxos []*UTXO) {
238         for _, utxo := range utxos {
239                 t.unconfirmedIndex[utxo.Hash] = treeNode
240         }
241 }
242
243 func findTx(block *types.Block, txHash bc.Hash) *types.Tx {
244         for _, tx := range block.Transactions {
245                 if tx.ID == txHash {
246                         return tx
247                 }
248         }
249         return nil
250 }