OSDN Git Service

7564eb68ad5cb7d851b5118497be7f580910496c
[bytom/vapor.git] / vendor / github.com / btcsuite / btcd / database / cmd / dbtool / insecureimport.go
1 // Copyright (c) 2015-2016 The btcsuite developers
2 // Use of this source code is governed by an ISC
3 // license that can be found in the LICENSE file.
4
5 package main
6
7 import (
8         "encoding/binary"
9         "fmt"
10         "io"
11         "os"
12         "sync"
13         "time"
14
15         "github.com/btcsuite/btcd/chaincfg/chainhash"
16         "github.com/btcsuite/btcd/database"
17         "github.com/btcsuite/btcd/wire"
18         "github.com/btcsuite/btcutil"
19 )
20
21 // importCmd defines the configuration options for the insecureimport command.
22 type importCmd struct {
23         InFile   string `short:"i" long:"infile" description:"File containing the block(s)"`
24         Progress int    `short:"p" long:"progress" description:"Show a progress message each time this number of seconds have passed -- Use 0 to disable progress announcements"`
25 }
26
27 var (
28         // importCfg defines the configuration options for the command.
29         importCfg = importCmd{
30                 InFile:   "bootstrap.dat",
31                 Progress: 10,
32         }
33
34         // zeroHash is a simply a hash with all zeros.  It is defined here to
35         // avoid creating it multiple times.
36         zeroHash = chainhash.Hash{}
37 )
38
39 // importResults houses the stats and result as an import operation.
40 type importResults struct {
41         blocksProcessed int64
42         blocksImported  int64
43         err             error
44 }
45
46 // blockImporter houses information about an ongoing import from a block data
47 // file to the block database.
48 type blockImporter struct {
49         db                database.DB
50         r                 io.ReadSeeker
51         processQueue      chan []byte
52         doneChan          chan bool
53         errChan           chan error
54         quit              chan struct{}
55         wg                sync.WaitGroup
56         blocksProcessed   int64
57         blocksImported    int64
58         receivedLogBlocks int64
59         receivedLogTx     int64
60         lastHeight        int64
61         lastBlockTime     time.Time
62         lastLogTime       time.Time
63 }
64
65 // readBlock reads the next block from the input file.
66 func (bi *blockImporter) readBlock() ([]byte, error) {
67         // The block file format is:
68         //  <network> <block length> <serialized block>
69         var net uint32
70         err := binary.Read(bi.r, binary.LittleEndian, &net)
71         if err != nil {
72                 if err != io.EOF {
73                         return nil, err
74                 }
75
76                 // No block and no error means there are no more blocks to read.
77                 return nil, nil
78         }
79         if net != uint32(activeNetParams.Net) {
80                 return nil, fmt.Errorf("network mismatch -- got %x, want %x",
81                         net, uint32(activeNetParams.Net))
82         }
83
84         // Read the block length and ensure it is sane.
85         var blockLen uint32
86         if err := binary.Read(bi.r, binary.LittleEndian, &blockLen); err != nil {
87                 return nil, err
88         }
89         if blockLen > wire.MaxBlockPayload {
90                 return nil, fmt.Errorf("block payload of %d bytes is larger "+
91                         "than the max allowed %d bytes", blockLen,
92                         wire.MaxBlockPayload)
93         }
94
95         serializedBlock := make([]byte, blockLen)
96         if _, err := io.ReadFull(bi.r, serializedBlock); err != nil {
97                 return nil, err
98         }
99
100         return serializedBlock, nil
101 }
102
103 // processBlock potentially imports the block into the database.  It first
104 // deserializes the raw block while checking for errors.  Already known blocks
105 // are skipped and orphan blocks are considered errors.  Returns whether the
106 // block was imported along with any potential errors.
107 //
108 // NOTE: This is not a safe import as it does not verify chain rules.
109 func (bi *blockImporter) processBlock(serializedBlock []byte) (bool, error) {
110         // Deserialize the block which includes checks for malformed blocks.
111         block, err := btcutil.NewBlockFromBytes(serializedBlock)
112         if err != nil {
113                 return false, err
114         }
115
116         // update progress statistics
117         bi.lastBlockTime = block.MsgBlock().Header.Timestamp
118         bi.receivedLogTx += int64(len(block.MsgBlock().Transactions))
119
120         // Skip blocks that already exist.
121         var exists bool
122         err = bi.db.View(func(tx database.Tx) error {
123                 exists, err = tx.HasBlock(block.Hash())
124                 return err
125         })
126         if err != nil {
127                 return false, err
128         }
129         if exists {
130                 return false, nil
131         }
132
133         // Don't bother trying to process orphans.
134         prevHash := &block.MsgBlock().Header.PrevBlock
135         if !prevHash.IsEqual(&zeroHash) {
136                 var exists bool
137                 err := bi.db.View(func(tx database.Tx) error {
138                         exists, err = tx.HasBlock(prevHash)
139                         return err
140                 })
141                 if err != nil {
142                         return false, err
143                 }
144                 if !exists {
145                         return false, fmt.Errorf("import file contains block "+
146                                 "%v which does not link to the available "+
147                                 "block chain", prevHash)
148                 }
149         }
150
151         // Put the blocks into the database with no checking of chain rules.
152         err = bi.db.Update(func(tx database.Tx) error {
153                 return tx.StoreBlock(block)
154         })
155         if err != nil {
156                 return false, err
157         }
158
159         return true, nil
160 }
161
162 // readHandler is the main handler for reading blocks from the import file.
163 // This allows block processing to take place in parallel with block reads.
164 // It must be run as a goroutine.
165 func (bi *blockImporter) readHandler() {
166 out:
167         for {
168                 // Read the next block from the file and if anything goes wrong
169                 // notify the status handler with the error and bail.
170                 serializedBlock, err := bi.readBlock()
171                 if err != nil {
172                         bi.errChan <- fmt.Errorf("Error reading from input "+
173                                 "file: %v", err.Error())
174                         break out
175                 }
176
177                 // A nil block with no error means we're done.
178                 if serializedBlock == nil {
179                         break out
180                 }
181
182                 // Send the block or quit if we've been signalled to exit by
183                 // the status handler due to an error elsewhere.
184                 select {
185                 case bi.processQueue <- serializedBlock:
186                 case <-bi.quit:
187                         break out
188                 }
189         }
190
191         // Close the processing channel to signal no more blocks are coming.
192         close(bi.processQueue)
193         bi.wg.Done()
194 }
195
196 // logProgress logs block progress as an information message.  In order to
197 // prevent spam, it limits logging to one message every importCfg.Progress
198 // seconds with duration and totals included.
199 func (bi *blockImporter) logProgress() {
200         bi.receivedLogBlocks++
201
202         now := time.Now()
203         duration := now.Sub(bi.lastLogTime)
204         if duration < time.Second*time.Duration(importCfg.Progress) {
205                 return
206         }
207
208         // Truncate the duration to 10s of milliseconds.
209         durationMillis := int64(duration / time.Millisecond)
210         tDuration := 10 * time.Millisecond * time.Duration(durationMillis/10)
211
212         // Log information about new block height.
213         blockStr := "blocks"
214         if bi.receivedLogBlocks == 1 {
215                 blockStr = "block"
216         }
217         txStr := "transactions"
218         if bi.receivedLogTx == 1 {
219                 txStr = "transaction"
220         }
221         log.Infof("Processed %d %s in the last %s (%d %s, height %d, %s)",
222                 bi.receivedLogBlocks, blockStr, tDuration, bi.receivedLogTx,
223                 txStr, bi.lastHeight, bi.lastBlockTime)
224
225         bi.receivedLogBlocks = 0
226         bi.receivedLogTx = 0
227         bi.lastLogTime = now
228 }
229
230 // processHandler is the main handler for processing blocks.  This allows block
231 // processing to take place in parallel with block reads from the import file.
232 // It must be run as a goroutine.
233 func (bi *blockImporter) processHandler() {
234 out:
235         for {
236                 select {
237                 case serializedBlock, ok := <-bi.processQueue:
238                         // We're done when the channel is closed.
239                         if !ok {
240                                 break out
241                         }
242
243                         bi.blocksProcessed++
244                         bi.lastHeight++
245                         imported, err := bi.processBlock(serializedBlock)
246                         if err != nil {
247                                 bi.errChan <- err
248                                 break out
249                         }
250
251                         if imported {
252                                 bi.blocksImported++
253                         }
254
255                         bi.logProgress()
256
257                 case <-bi.quit:
258                         break out
259                 }
260         }
261         bi.wg.Done()
262 }
263
264 // statusHandler waits for updates from the import operation and notifies
265 // the passed doneChan with the results of the import.  It also causes all
266 // goroutines to exit if an error is reported from any of them.
267 func (bi *blockImporter) statusHandler(resultsChan chan *importResults) {
268         select {
269         // An error from either of the goroutines means we're done so signal
270         // caller with the error and signal all goroutines to quit.
271         case err := <-bi.errChan:
272                 resultsChan <- &importResults{
273                         blocksProcessed: bi.blocksProcessed,
274                         blocksImported:  bi.blocksImported,
275                         err:             err,
276                 }
277                 close(bi.quit)
278
279         // The import finished normally.
280         case <-bi.doneChan:
281                 resultsChan <- &importResults{
282                         blocksProcessed: bi.blocksProcessed,
283                         blocksImported:  bi.blocksImported,
284                         err:             nil,
285                 }
286         }
287 }
288
289 // Import is the core function which handles importing the blocks from the file
290 // associated with the block importer to the database.  It returns a channel
291 // on which the results will be returned when the operation has completed.
292 func (bi *blockImporter) Import() chan *importResults {
293         // Start up the read and process handling goroutines.  This setup allows
294         // blocks to be read from disk in parallel while being processed.
295         bi.wg.Add(2)
296         go bi.readHandler()
297         go bi.processHandler()
298
299         // Wait for the import to finish in a separate goroutine and signal
300         // the status handler when done.
301         go func() {
302                 bi.wg.Wait()
303                 bi.doneChan <- true
304         }()
305
306         // Start the status handler and return the result channel that it will
307         // send the results on when the import is done.
308         resultChan := make(chan *importResults)
309         go bi.statusHandler(resultChan)
310         return resultChan
311 }
312
313 // newBlockImporter returns a new importer for the provided file reader seeker
314 // and database.
315 func newBlockImporter(db database.DB, r io.ReadSeeker) *blockImporter {
316         return &blockImporter{
317                 db:           db,
318                 r:            r,
319                 processQueue: make(chan []byte, 2),
320                 doneChan:     make(chan bool),
321                 errChan:      make(chan error),
322                 quit:         make(chan struct{}),
323                 lastLogTime:  time.Now(),
324         }
325 }
326
327 // Execute is the main entry point for the command.  It's invoked by the parser.
328 func (cmd *importCmd) Execute(args []string) error {
329         // Setup the global config options and ensure they are valid.
330         if err := setupGlobalConfig(); err != nil {
331                 return err
332         }
333
334         // Ensure the specified block file exists.
335         if !fileExists(cmd.InFile) {
336                 str := "The specified block file [%v] does not exist"
337                 return fmt.Errorf(str, cmd.InFile)
338         }
339
340         // Load the block database.
341         db, err := loadBlockDB()
342         if err != nil {
343                 return err
344         }
345         defer db.Close()
346
347         // Ensure the database is sync'd and closed on Ctrl+C.
348         addInterruptHandler(func() {
349                 log.Infof("Gracefully shutting down the database...")
350                 db.Close()
351         })
352
353         fi, err := os.Open(importCfg.InFile)
354         if err != nil {
355                 return err
356         }
357         defer fi.Close()
358
359         // Create a block importer for the database and input file and start it.
360         // The results channel returned from start will contain an error if
361         // anything went wrong.
362         importer := newBlockImporter(db, fi)
363
364         // Perform the import asynchronously and signal the main goroutine when
365         // done.  This allows blocks to be processed and read in parallel.  The
366         // results channel returned from Import contains the statistics about
367         // the import including an error if something went wrong.  This is done
368         // in a separate goroutine rather than waiting directly so the main
369         // goroutine can be signaled for shutdown by either completion, error,
370         // or from the main interrupt handler.  This is necessary since the main
371         // goroutine must be kept running long enough for the interrupt handler
372         // goroutine to finish.
373         go func() {
374                 log.Info("Starting import")
375                 resultsChan := importer.Import()
376                 results := <-resultsChan
377                 if results.err != nil {
378                         dbErr, ok := results.err.(database.Error)
379                         if !ok || ok && dbErr.ErrorCode != database.ErrDbNotOpen {
380                                 shutdownChannel <- results.err
381                                 return
382                         }
383                 }
384
385                 log.Infof("Processed a total of %d blocks (%d imported, %d "+
386                         "already known)", results.blocksProcessed,
387                         results.blocksImported,
388                         results.blocksProcessed-results.blocksImported)
389                 shutdownChannel <- nil
390         }()
391
392         // Wait for shutdown signal from either a normal completion or from the
393         // interrupt handler.
394         err = <-shutdownChannel
395         return err
396 }