OSDN Git Service

add btm descriptor in log & Add issue amout as a argment
[bytom/bytom.git] / blockchain / transact.go
1 package blockchain
2
3 import (
4         "context"
5         "encoding/json"
6         "sync"
7         "time"
8
9         "github.com/bytom/blockchain/txbuilder"
10         chainjson "github.com/bytom/encoding/json"
11         "github.com/bytom/errors"
12         "github.com/bytom/net/http/httperror"
13         "github.com/bytom/net/http/reqid"
14         "github.com/bytom/protocol/bc/legacy"
15         "github.com/bytom/log"
16 )
17
18 var defaultTxTTL = 5 * time.Minute
19
20 func (a *BlockchainReactor) actionDecoder(action string) (func([]byte) (txbuilder.Action, error), bool) {
21         var decoder func([]byte) (txbuilder.Action, error)
22         switch action {
23         case "control_account":
24                 decoder = a.accounts.DecodeControlAction
25         case "control_program":
26                 decoder = txbuilder.DecodeControlProgramAction
27         case "control_receiver":
28                 decoder = txbuilder.DecodeControlReceiverAction
29         case "issue":
30                 decoder = a.assets.DecodeIssueAction
31         case "retire":
32                 decoder = txbuilder.DecodeRetireAction
33         case "spend_account":
34                 decoder = a.accounts.DecodeSpendAction
35         case "spend_account_unspent_output":
36                 decoder = a.accounts.DecodeSpendUTXOAction
37         case "set_transaction_reference_data":
38                 decoder = txbuilder.DecodeSetTxRefDataAction
39         default:
40                 return nil, false
41         }
42         return decoder, true
43 }
44
45 func (a *BlockchainReactor) buildSingle(ctx context.Context, req *BuildRequest) (*txbuilder.Template, error) {
46         err := a.filterAliases(ctx, req)
47         if err != nil {
48                 return nil, err
49         }
50         actions := make([]txbuilder.Action, 0, len(req.Actions))
51         for i, act := range req.Actions {
52                 typ, ok := act["type"].(string)
53                 if !ok {
54                         return nil, errors.WithDetailf(errBadActionType, "no action type provided on action %d", i)
55                 }
56                 decoder, ok := a.actionDecoder(typ)
57                 if !ok {
58                         return nil, errors.WithDetailf(errBadActionType, "unknown action type %q on action %d", typ, i)
59                 }
60
61                 // Remarshal to JSON, the action may have been modified when we
62                 // filtered aliases.
63                 b, err := json.Marshal(act)
64                 if err != nil {
65                         return nil, err
66                 }
67                 action, err := decoder(b)
68                 if err != nil {
69                         return nil, errors.WithDetailf(errBadAction, "%s on action %d", err.Error(), i)
70                 }
71                 actions = append(actions, action)
72         }
73
74         ttl := req.TTL.Duration
75         if ttl == 0 {
76                 ttl = defaultTxTTL
77         }
78         maxTime := time.Now().Add(ttl)
79
80         tpl, err := txbuilder.Build(ctx, req.Tx, actions, maxTime)
81         if errors.Root(err) == txbuilder.ErrAction {
82                 // Format each of the inner errors contained in the data.
83                 var formattedErrs []httperror.Response
84                 for _, innerErr := range errors.Data(err)["actions"].([]error) {
85                         resp := errorFormatter.Format(innerErr)
86                         formattedErrs = append(formattedErrs, resp)
87                 }
88                 err = errors.WithData(err, "actions", formattedErrs)
89         }
90         if err != nil {
91                 return nil, err
92         }
93
94         // ensure null is never returned for signing instructions
95         if tpl.SigningInstructions == nil {
96                 tpl.SigningInstructions = []*txbuilder.SigningInstruction{}
97         }
98         return tpl, nil
99 }
100
101 // POST /build-transaction
102 func (a *BlockchainReactor) build(ctx context.Context, buildReqs []*BuildRequest) (interface{}, error) {
103         responses := make([]interface{}, len(buildReqs))
104         var wg sync.WaitGroup
105         wg.Add(len(responses))
106
107         for i := 0; i < len(responses); i++ {
108                 go func(i int) {
109                         subctx := reqid.NewSubContext(ctx, reqid.New())
110                         defer wg.Done()
111                         defer batchRecover(subctx, &responses[i])
112
113                         tmpl, err := a.buildSingle(subctx, buildReqs[i])
114                         if err != nil {
115                                 responses[i] = err
116                         } else {
117                                 responses[i] = tmpl
118                         }
119                 }(i)
120         }
121
122         wg.Wait()
123         return responses, nil
124 }
125
126 func (a *BlockchainReactor) submitSingle(ctx context.Context, tpl *txbuilder.Template, waitUntil string) (interface{}, error) {
127         if tpl.Transaction == nil {
128                 return nil, errors.Wrap(txbuilder.ErrMissingRawTx)
129         }
130
131         err := a.finalizeTxWait(ctx, tpl, waitUntil)
132         if err != nil {
133                 return nil, errors.Wrapf(err, "tx %s", tpl.Transaction.ID.String())
134         }
135
136         return map[string]string{"id": tpl.Transaction.ID.String()}, nil
137 }
138
139 /*
140 // recordSubmittedTx records a lower bound height at which the tx
141 // was first submitted to the tx pool. If this request fails for
142 // some reason, a retry will know to look for the transaction in
143 // blocks starting at this height.
144 //
145 // If the tx has already been submitted, it returns the existing
146 // height.
147 func recordSubmittedTx(ctx context.Context, db pg.DB, txHash bc.Hash, currentHeight uint64) (uint64, error) {
148         const insertQ = `
149                 INSERT INTO submitted_txs (tx_hash, height) VALUES($1, $2)
150                 ON CONFLICT DO NOTHING
151         `
152         res, err := db.Exec(ctx, insertQ, txHash.Bytes(), currentHeight)
153         if err != nil {
154                 return 0, err
155         }
156         inserted, err := res.RowsAffected()
157         if err != nil {
158                 return 0, err
159         }
160         if inserted == 1 {
161                 return currentHeight, nil
162         }
163
164         // The insert didn't affect any rows, meaning there was already an entry
165         // for this transaction hash.
166         const selectQ = `
167                 SELECT height FROM submitted_txs WHERE tx_hash = $1
168         `
169         var height uint64
170         err = db.QueryRow(ctx, selectQ, txHash.Bytes()).Scan(&height)
171         return height, err
172 }
173 */
174
175 /*
176 // cleanUpSubmittedTxs will periodically delete records of submitted txs
177 // older than a day. This function blocks and only exits when its context
178 // is cancelled.
179 func cleanUpSubmittedTxs(ctx context.Context, db pg.DB) {
180         ticker := time.NewTicker(15 * time.Minute)
181         for {
182                 select {
183                 case <-ticker.C:
184                         // TODO(jackson): We could avoid expensive bulk deletes by partitioning
185                         // the table and DROP-ing tables of expired rows. Partitioning doesn't
186                         // play well with ON CONFLICT clauses though, so we would need to rework
187                         // how we guarantee uniqueness.
188                         const q = `DELETE FROM submitted_txs WHERE submitted_at < now() - interval '1 day'`
189                         _, err := db.Exec(ctx, q)
190                         if err != nil {
191                                 log.Error(ctx, err)
192                         }
193                 case <-ctx.Done():
194                         ticker.Stop()
195                         return
196                 }
197         }
198 }
199 */
200
201 // finalizeTxWait calls FinalizeTx and then waits for confirmation of
202 // the transaction.  A nil error return means the transaction is
203 // confirmed on the blockchain.  ErrRejected means a conflicting tx is
204 // on the blockchain.  context.DeadlineExceeded means ctx is an
205 // expiring context that timed out.
206 func (a *BlockchainReactor) finalizeTxWait(ctx context.Context, txTemplate *txbuilder.Template, waitUntil string) error {
207         // Use the current generator height as the lower bound of the block height
208         // that the transaction may appear in.
209         localHeight := a.chain.Height()
210         generatorHeight := localHeight
211
212         log.Printf(ctx, "localHeight:%v\n", localHeight)
213         // Remember this height in case we retry this submit call.
214         /*height, err := recordSubmittedTx(ctx, a.db, txTemplate.Transaction.ID, generatorHeight)
215         if err != nil {
216                 return errors.Wrap(err, "saving tx submitted height")
217         }*/
218
219         err := txbuilder.FinalizeTx(ctx, a.chain, txTemplate.Transaction)
220         if err != nil {
221                 return err
222         }
223         if waitUntil == "none" {
224                 return nil
225         }
226
227         _, err = a.waitForTxInBlock(ctx, txTemplate.Transaction, generatorHeight)
228         if err != nil {
229                 return err
230         }
231         if waitUntil == "confirmed" {
232                 return nil
233         }
234         /*
235
236                 select {
237                 case <-ctx.Done():
238                         return ctx.Err()
239                 case <-a.pinStore.AllWaiter(height):
240                 }
241         */
242
243         return nil
244 }
245
246 func (a *BlockchainReactor) waitForTxInBlock(ctx context.Context, tx *legacy.Tx, height uint64) (uint64, error) {
247         log.Printf(ctx, "waitForTxInBlock function.")
248         for {
249                 height++
250                 select {
251                 case <-ctx.Done():
252                         return 0, ctx.Err()
253
254                 case <-a.chain.BlockWaiter(height):
255                         b, err := a.chain.GetBlock(height)
256                         if err != nil {
257                                 return 0, errors.Wrap(err, "getting block that just landed")
258                         }
259                         for _, confirmed := range b.Transactions {
260                                 if confirmed.ID == tx.ID {
261                                         // confirmed
262                                         return height, nil
263                                 }
264                         }
265
266                         if tx.MaxTime > 0 && tx.MaxTime < b.TimestampMS {
267                                 return 0, errors.Wrap(txbuilder.ErrRejected, "transaction max time exceeded")
268                         }
269
270                         // might still be in pool or might be rejected; we can't
271                         // tell definitively until its max time elapses.
272
273                         // Re-insert into the pool in case it was dropped.
274                         err = txbuilder.FinalizeTx(ctx, a.chain, tx)
275                         if err != nil {
276                                 return 0, err
277                         }
278
279                         // TODO(jackson): Do simple rejection checks like checking if
280                         // the tx's blockchain prevouts still exist in the state tree.
281                 }
282         }
283 }
284
285 type SubmitArg struct {
286         Transactions []txbuilder.Template
287         Wait         chainjson.Duration
288         WaitUntil    string `json:"wait_until"` // values none, confirmed, processed. default: processed
289 }
290
291 // POST /submit-transaction
292 func (a *BlockchainReactor) submit(ctx context.Context, x SubmitArg) (interface{}, error) {
293         // Setup a timeout for the provided wait duration.
294         timeout := x.Wait.Duration
295         if timeout <= 0 {
296                 timeout = 30 * time.Second
297         }
298         ctx, cancel := context.WithTimeout(ctx, timeout)
299         defer cancel()
300
301         responses := make([]interface{}, len(x.Transactions))
302         var wg sync.WaitGroup
303         wg.Add(len(responses))
304         for i := range responses {
305                 go func(i int) {
306                         subctx := reqid.NewSubContext(ctx, reqid.New())
307                         defer wg.Done()
308                         defer batchRecover(subctx, &responses[i])
309
310                         tx, err := a.submitSingle(subctx, &x.Transactions[i], x.WaitUntil)
311                         log.Printf(ctx, "-----tx:%v\n", tx)
312                         if err != nil {
313                                 responses[i] = err
314                         } else {
315                                 responses[i] = tx
316                         }
317                 }(i)
318         }
319
320         wg.Wait()
321         return responses, nil
322 }