OSDN Git Service

add query API
[bytom/bytom.git] / blockchain / transact.go
1 package blockchain
2
3 import (
4         "context"
5         "encoding/json"
6         "sync"
7         "time"
8
9         //"github.com/bytom/blockchain/fetch"
10         "github.com/bytom/blockchain/txbuilder"
11         chainjson "github.com/bytom/encoding/json"
12         "github.com/bytom/errors"
13         //"github.com/bytom/log"
14         "github.com/bytom/net/http/httperror"
15         "github.com/bytom/net/http/reqid"
16         //"github.com/bytom/protocol/bc"
17         "github.com/bytom/protocol/bc/legacy"
18 )
19
20 var defaultTxTTL = 5 * time.Minute
21
22 func (a *BlockchainReactor) actionDecoder(action string) (func([]byte) (txbuilder.Action, error), bool) {
23         var decoder func([]byte) (txbuilder.Action, error)
24         switch action {
25         case "control_account":
26                 decoder = a.accounts.DecodeControlAction
27         case "control_program":
28                 decoder = txbuilder.DecodeControlProgramAction
29         case "control_receiver":
30                 decoder = txbuilder.DecodeControlReceiverAction
31         case "issue":
32                 decoder = a.assets.DecodeIssueAction
33         case "retire":
34                 decoder = txbuilder.DecodeRetireAction
35         case "spend_account":
36                 decoder = a.accounts.DecodeSpendAction
37         case "spend_account_unspent_output":
38                 decoder = a.accounts.DecodeSpendUTXOAction
39         case "set_transaction_reference_data":
40                 decoder = txbuilder.DecodeSetTxRefDataAction
41         default:
42                 return nil, false
43         }
44         return decoder, true
45 }
46
47 func (a *BlockchainReactor) buildSingle(ctx context.Context, req *buildRequest) (*txbuilder.Template, error) {
48         err := a.filterAliases(ctx, req)
49         if err != nil {
50                 return nil, err
51         }
52         actions := make([]txbuilder.Action, 0, len(req.Actions))
53         for i, act := range req.Actions {
54                 typ, ok := act["type"].(string)
55                 if !ok {
56                         return nil, errors.WithDetailf(errBadActionType, "no action type provided on action %d", i)
57                 }
58                 decoder, ok := a.actionDecoder(typ)
59                 if !ok {
60                         return nil, errors.WithDetailf(errBadActionType, "unknown action type %q on action %d", typ, i)
61                 }
62
63                 // Remarshal to JSON, the action may have been modified when we
64                 // filtered aliases.
65                 b, err := json.Marshal(act)
66                 if err != nil {
67                         return nil, err
68                 }
69                 a, err := decoder(b)
70                 if err != nil {
71                         return nil, errors.WithDetailf(errBadAction, "%s on action %d", err.Error(), i)
72                 }
73                 actions = append(actions, a)
74         }
75
76         ttl := req.TTL.Duration
77         if ttl == 0 {
78                 ttl = defaultTxTTL
79         }
80         maxTime := time.Now().Add(ttl)
81         tpl, err := txbuilder.Build(ctx, req.Tx, actions, maxTime)
82         if errors.Root(err) == txbuilder.ErrAction {
83                 // Format each of the inner errors contained in the data.
84                 var formattedErrs []httperror.Response
85                 for _, innerErr := range errors.Data(err)["actions"].([]error) {
86                         resp := errorFormatter.Format(innerErr)
87                         formattedErrs = append(formattedErrs, resp)
88                 }
89                 err = errors.WithData(err, "actions", formattedErrs)
90         }
91         if err != nil {
92                 return nil, err
93         }
94
95         // ensure null is never returned for signing instructions
96         if tpl.SigningInstructions == nil {
97                 tpl.SigningInstructions = []*txbuilder.SigningInstruction{}
98         }
99         return tpl, nil
100 }
101
102 // POST /build-transaction
103 func (a *BlockchainReactor) build(ctx context.Context, buildReqs []*buildRequest) (interface{}, error) {
104         responses := make([]interface{}, len(buildReqs))
105         var wg sync.WaitGroup
106         wg.Add(len(responses))
107
108         for i := 0; i < len(responses); i++ {
109                 go func(i int) {
110                         subctx := reqid.NewSubContext(ctx, reqid.New())
111                         defer wg.Done()
112                         defer batchRecover(subctx, &responses[i])
113
114                         tmpl, err := a.buildSingle(subctx, buildReqs[i])
115                         if err != nil {
116                                 responses[i] = err
117                         } else {
118                                 responses[i] = tmpl
119                         }
120                 }(i)
121         }
122
123         wg.Wait()
124         return responses, nil
125 }
126
127 func (a *BlockchainReactor) submitSingle(ctx context.Context, tpl *txbuilder.Template, waitUntil string) (interface{}, error) {
128         if tpl.Transaction == nil {
129                 return nil, errors.Wrap(txbuilder.ErrMissingRawTx)
130         }
131
132         err := a.finalizeTxWait(ctx, tpl, waitUntil)
133         if err != nil {
134                 return nil, errors.Wrapf(err, "tx %s", tpl.Transaction.ID.String())
135         }
136
137         return map[string]string{"id": tpl.Transaction.ID.String()}, nil
138 }
139
140 /*
141 // recordSubmittedTx records a lower bound height at which the tx
142 // was first submitted to the tx pool. If this request fails for
143 // some reason, a retry will know to look for the transaction in
144 // blocks starting at this height.
145 //
146 // If the tx has already been submitted, it returns the existing
147 // height.
148 func recordSubmittedTx(ctx context.Context, db pg.DB, txHash bc.Hash, currentHeight uint64) (uint64, error) {
149         const insertQ = `
150                 INSERT INTO submitted_txs (tx_hash, height) VALUES($1, $2)
151                 ON CONFLICT DO NOTHING
152         `
153         res, err := db.Exec(ctx, insertQ, txHash.Bytes(), currentHeight)
154         if err != nil {
155                 return 0, err
156         }
157         inserted, err := res.RowsAffected()
158         if err != nil {
159                 return 0, err
160         }
161         if inserted == 1 {
162                 return currentHeight, nil
163         }
164
165         // The insert didn't affect any rows, meaning there was already an entry
166         // for this transaction hash.
167         const selectQ = `
168                 SELECT height FROM submitted_txs WHERE tx_hash = $1
169         `
170         var height uint64
171         err = db.QueryRow(ctx, selectQ, txHash.Bytes()).Scan(&height)
172         return height, err
173 }
174 */
175
176 /*
177 // cleanUpSubmittedTxs will periodically delete records of submitted txs
178 // older than a day. This function blocks and only exits when its context
179 // is cancelled.
180 func cleanUpSubmittedTxs(ctx context.Context, db pg.DB) {
181         ticker := time.NewTicker(15 * time.Minute)
182         for {
183                 select {
184                 case <-ticker.C:
185                         // TODO(jackson): We could avoid expensive bulk deletes by partitioning
186                         // the table and DROP-ing tables of expired rows. Partitioning doesn't
187                         // play well with ON CONFLICT clauses though, so we would need to rework
188                         // how we guarantee uniqueness.
189                         const q = `DELETE FROM submitted_txs WHERE submitted_at < now() - interval '1 day'`
190                         _, err := db.Exec(ctx, q)
191                         if err != nil {
192                                 log.Error(ctx, err)
193                         }
194                 case <-ctx.Done():
195                         ticker.Stop()
196                         return
197                 }
198         }
199 }
200 */
201
202 // finalizeTxWait calls FinalizeTx and then waits for confirmation of
203 // the transaction.  A nil error return means the transaction is
204 // confirmed on the blockchain.  ErrRejected means a conflicting tx is
205 // on the blockchain.  context.DeadlineExceeded means ctx is an
206 // expiring context that timed out.
207 func (a *BlockchainReactor) finalizeTxWait(ctx context.Context, txTemplate *txbuilder.Template, waitUntil string) error {
208         // Use the current generator height as the lower bound of the block height
209         // that the transaction may appear in.
210         localHeight := a.chain.Height()
211         generatorHeight := localHeight
212
213         // Remember this height in case we retry this submit call.
214         /*height, err := recordSubmittedTx(ctx, a.db, txTemplate.Transaction.ID, generatorHeight)
215         if err != nil {
216                 return errors.Wrap(err, "saving tx submitted height")
217         }*/
218
219         err := txbuilder.FinalizeTx(ctx, a.chain, a.submitter, txTemplate.Transaction)
220         if err != nil {
221                 return err
222         }
223         if waitUntil == "none" {
224                 return nil
225         }
226
227         _, err = a.waitForTxInBlock(ctx, txTemplate.Transaction, generatorHeight)
228         if err != nil {
229                 return err
230         }
231         if waitUntil == "confirmed" {
232                 return nil
233         }
234         /*
235
236                 select {
237                 case <-ctx.Done():
238                         return ctx.Err()
239                 case <-a.pinStore.AllWaiter(height):
240                 }
241         */
242
243         return nil
244 }
245
246 func (a *BlockchainReactor) waitForTxInBlock(ctx context.Context, tx *legacy.Tx, height uint64) (uint64, error) {
247         for {
248                 height++
249                 select {
250                 case <-ctx.Done():
251                         return 0, ctx.Err()
252
253                 case <-a.chain.BlockWaiter(height):
254                         b, err := a.chain.GetBlock(height)
255                         if err != nil {
256                                 return 0, errors.Wrap(err, "getting block that just landed")
257                         }
258                         for _, confirmed := range b.Transactions {
259                                 if confirmed.ID == tx.ID {
260                                         // confirmed
261                                         return height, nil
262                                 }
263                         }
264
265                         if tx.MaxTime > 0 && tx.MaxTime < b.TimestampMS {
266                                 return 0, errors.Wrap(txbuilder.ErrRejected, "transaction max time exceeded")
267                         }
268
269                         // might still be in pool or might be rejected; we can't
270                         // tell definitively until its max time elapses.
271
272                         // Re-insert into the pool in case it was dropped.
273                         err = txbuilder.FinalizeTx(ctx, a.chain, a.submitter, tx)
274                         if err != nil {
275                                 return 0, err
276                         }
277
278                         // TODO(jackson): Do simple rejection checks like checking if
279                         // the tx's blockchain prevouts still exist in the state tree.
280                 }
281         }
282 }
283
284 type submitArg struct {
285         Transactions []txbuilder.Template
286         wait         chainjson.Duration
287         WaitUntil    string `json:"wait_until"` // values none, confirmed, processed. default: processed
288 }
289
290 // POST /submit-transaction
291 func (a *BlockchainReactor) submit(ctx context.Context, x submitArg) (interface{}, error) {
292         // Setup a timeout for the provided wait duration.
293         timeout := x.wait.Duration
294         if timeout <= 0 {
295                 timeout = 30 * time.Second
296         }
297         ctx, cancel := context.WithTimeout(ctx, timeout)
298         defer cancel()
299
300         responses := make([]interface{}, len(x.Transactions))
301         var wg sync.WaitGroup
302         wg.Add(len(responses))
303         for i := range responses {
304                 go func(i int) {
305                         subctx := reqid.NewSubContext(ctx, reqid.New())
306                         defer wg.Done()
307                         defer batchRecover(subctx, &responses[i])
308
309                         tx, err := a.submitSingle(subctx, &x.Transactions[i], x.WaitUntil)
310                         if err != nil {
311                                 responses[i] = err
312                         } else {
313                                 responses[i] = tx
314                         }
315                 }(i)
316         }
317
318         wg.Wait()
319         return responses, nil
320 }