OSDN Git Service

ff5d10913f6e3ad14f863a41b1b274a3e3e0e633
[bytom/bytom.git] / contract / trace_service.go
1 package contract
2
3 import (
4         "errors"
5         "sync"
6
7         "github.com/sirupsen/logrus"
8
9         "github.com/bytom/bytom/protocol/bc"
10         "github.com/bytom/bytom/protocol/bc/types"
11 )
12
13 const maxAdvanceTraceBlockNum = 3600
14
15 var (
16         errGivenTxTooEarly      = errors.New("given tx exceed the max num of blocks ahead")
17         errTxAndBlockIsMismatch = errors.New("given tx hash and block hash is mismatch")
18         errTxNotIncludeContract = errors.New("input of tx not include utxo contract")
19 )
20
21 type TraceService struct {
22         sync.RWMutex
23         tracer           *tracer
24         infra            *Infrastructure
25         scheduler        *traceScheduler
26         unconfirmedIndex map[bc.Hash]*TreeNode
27         endedInstances   map[string]bool
28         bestHeight       uint64
29         bestHash         bc.Hash
30 }
31
32 func NewTraceService(infra *Infrastructure) *TraceService {
33         allInstances, err := infra.Repository.LoadInstances()
34         if err != nil {
35                 logrus.WithField("err", err).Fatal("load instances from db")
36         }
37
38         chainStatus := infra.Repository.GetChainStatus()
39         if chainStatus == nil {
40                 chainStatus.BlockHeight, chainStatus.BlockHash = infra.Chain.BestChain()
41                 if err := infra.Repository.SaveChainStatus(chainStatus); err != nil {
42                         logrus.WithField("err", err).Fatal("init chain status for trace service")
43                 }
44         }
45
46         scheduler := newTraceScheduler(infra)
47         inSyncInstances := dispatchInstances(allInstances, scheduler, infra.Chain.FinalizedHeight())
48
49         service := &TraceService{
50                 infra:            infra,
51                 tracer:           newTracer(inSyncInstances),
52                 scheduler:        scheduler,
53                 unconfirmedIndex: make(map[bc.Hash]*TreeNode),
54                 endedInstances:   make(map[string]bool),
55                 bestHeight:       chainStatus.BlockHeight,
56                 bestHash:         chainStatus.BlockHash,
57         }
58         scheduler.start(service)
59         return service
60 }
61
62 func dispatchInstances(instances []*Instance, scheduler *traceScheduler, finalizedHeight uint64) []*Instance {
63         var result []*Instance
64         for _, inst := range instances {
65                 if inst.Status == InSync {
66                         result = append(result, inst)
67                 } else if inst.Status == Ended {
68                         if inst.EndedHeight < finalizedHeight {
69                                 result = append(result, inst)
70                         }
71                 } else if inst.Status == Lagging {
72                         if err := scheduler.addNewJob(inst); err != nil {
73                                 logrus.WithField("err", err).Fatal("add new job when init tracer")
74                         }
75                 }
76         }
77         return result
78 }
79
80 func (t *TraceService) BestHeight() uint64 {
81         t.RLock()
82         defer t.RUnlock()
83         return t.bestHeight
84 }
85
86 func (t *TraceService) BestHash() bc.Hash {
87         t.RLock()
88         defer t.RUnlock()
89         return t.bestHash
90 }
91
92 func (t *TraceService) ApplyBlock(block *types.Block) error {
93         t.Lock()
94         defer t.Unlock()
95
96         newInstances := t.tracer.applyBlock(block)
97         t.processEndedInstances(newInstances)
98         t.bestHeight++
99         t.bestHash = block.Hash()
100         return t.infra.Repository.SaveInstancesWithStatus(newInstances, t.bestHeight, t.bestHash)
101 }
102
103 func (t *TraceService) DetachBlock(block *types.Block) error {
104         t.Lock()
105         defer t.Unlock()
106
107         newInstances := t.tracer.detachBlock(block)
108         t.processEndedInstances(nil)
109         t.bestHeight--
110         t.bestHash = block.PreviousBlockHash
111         return t.infra.Repository.SaveInstancesWithStatus(newInstances, t.bestHeight, t.bestHash)
112 }
113
114 func (t *TraceService) AddUnconfirmedTx(tx *types.Tx) {
115         transfers := parseTransfers(tx)
116         for _, transfer := range transfers {
117                 inUTXOs, outUTXOs := transfer.inUTXOs, transfer.outUTXOs
118                 if len(inUTXOs) == 0 || len(outUTXOs) == 0 {
119                         return
120                 }
121
122                 treeNode := &TreeNode{TxHash: tx.ID, UTXOs: outUTXOs}
123                 if inst := t.tracer.index.getByUTXO(inUTXOs[0].Hash); inst != nil {
124                         inst.Unconfirmed = append(inst.Unconfirmed, treeNode)
125                         t.addToUnconfirmedIndex(treeNode, outUTXOs)
126                         return
127                 }
128
129                 if parent, ok := t.unconfirmedIndex[inUTXOs[0].Hash]; ok {
130                         parent.Children = append(parent.Children, treeNode)
131                         t.addToUnconfirmedIndex(treeNode, outUTXOs)
132                 }
133         }
134 }
135
136 func (t *TraceService) CreateInstance(txHash, blockHash bc.Hash) ([]string, error) {
137         block, err := t.infra.Chain.GetBlockByHash(&blockHash)
138         if err != nil {
139                 return nil, err
140         }
141
142         if bestHeight, _ := t.infra.Chain.BestChain(); bestHeight-block.Height > maxAdvanceTraceBlockNum {
143                 return nil, errGivenTxTooEarly
144         }
145
146         tx := findTx(block, txHash)
147         if tx == nil {
148                 return nil, errTxAndBlockIsMismatch
149         }
150
151         transfers := parseTransfers(tx)
152         if len(transfers) == 0 {
153                 return nil, errTxNotIncludeContract
154         }
155
156         var traceIDs []string
157         for _, transfer := range transfers {
158                 inst := newInstance(transfer, block)
159                 traceIDs = append(traceIDs, inst.TraceID)
160                 if err := t.addNewTraceJob(inst); err != nil {
161                         return nil, err
162                 }
163         }
164         return traceIDs, nil
165 }
166
167 func (t *TraceService) RemoveInstance(traceID string) error {
168         t.Lock()
169         defer t.Unlock()
170
171         t.infra.Repository.RemoveInstance(traceID)
172         t.tracer.removeInstance(traceID)
173         return nil
174 }
175
176 func (t *TraceService) GetInstance(traceID string) (*Instance, error) {
177         return t.infra.Repository.GetInstance(traceID)
178 }
179
180 func (t *TraceService) takeOverInstances(instances []*Instance, blockHash bc.Hash) bool {
181         t.Lock()
182         defer t.Unlock()
183
184         if blockHash != t.bestHash {
185                 return false
186         }
187
188         for _, inst := range instances {
189                 if inst.Status != Ended {
190                         inst.Status = InSync
191                 }
192         }
193
194         if err := t.infra.Repository.SaveInstances(instances); err != nil {
195                 logrus.WithField("err", err).Error("save instances when take over instances")
196                 return false
197         }
198
199         t.tracer.addInstances(instances)
200         t.processEndedInstances(instances)
201         return true
202 }
203
204 func (t *TraceService) processEndedInstances(instances []*Instance) {
205         for _, inst := range instances {
206                 if inst.Status == Ended {
207                         t.endedInstances[inst.TraceID] = true
208                 }
209         }
210
211         finalizedHeight := t.infra.Chain.FinalizedHeight()
212         for traceID := range t.endedInstances {
213                 inst := t.tracer.getInstance(traceID)
214                 if inst.Status != Ended {
215                         delete(t.endedInstances, traceID)
216                 } else if finalizedHeight >= inst.EndedHeight {
217                         delete(t.endedInstances, traceID)
218                         t.tracer.removeInstance(traceID)
219                 }
220         }
221 }
222
223 func (t *TraceService) addNewTraceJob(inst *Instance) error {
224         if err := t.infra.Repository.SaveInstances([]*Instance{inst}); err != nil {
225                 return err
226         }
227
228         if inst.Status != Ended {
229                 if err := t.scheduler.addNewJob(inst); err != nil {
230                         return err
231                 }
232         }
233         return nil
234 }
235
236 func (t *TraceService) addToUnconfirmedIndex(treeNode *TreeNode, utxos []*UTXO) {
237         for _, utxo := range utxos {
238                 t.unconfirmedIndex[utxo.Hash] = treeNode
239         }
240 }
241
242 func findTx(block *types.Block, txHash bc.Hash) *types.Tx {
243         for _, tx := range block.Transactions {
244                 if tx.ID == txHash {
245                         return tx
246                 }
247         }
248         return nil
249 }