1 // Copyright (c) 2013-2016 The btcsuite developers
2 // Use of this source code is governed by an ISC
3 // license that can be found in the LICENSE file.
14 "github.com/btcsuite/btcd/blockchain"
15 "github.com/btcsuite/btcd/blockchain/indexers"
16 "github.com/btcsuite/btcd/chaincfg/chainhash"
17 "github.com/btcsuite/btcd/database"
18 "github.com/btcsuite/btcd/wire"
19 "github.com/btcsuite/btcutil"
22 var zeroHash = chainhash.Hash{}
24 // importResults houses the stats and result as an import operation.
25 type importResults struct {
31 // blockImporter houses information about an ongoing import from a block data
32 // file to the block database.
33 type blockImporter struct {
35 chain *blockchain.BlockChain
37 processQueue chan []byte
44 receivedLogBlocks int64
47 lastBlockTime time.Time
51 // readBlock reads the next block from the input file.
52 func (bi *blockImporter) readBlock() ([]byte, error) {
53 // The block file format is:
54 // <network> <block length> <serialized block>
56 err := binary.Read(bi.r, binary.LittleEndian, &net)
62 // No block and no error means there are no more blocks to read.
65 if net != uint32(activeNetParams.Net) {
66 return nil, fmt.Errorf("network mismatch -- got %x, want %x",
67 net, uint32(activeNetParams.Net))
70 // Read the block length and ensure it is sane.
72 if err := binary.Read(bi.r, binary.LittleEndian, &blockLen); err != nil {
75 if blockLen > wire.MaxBlockPayload {
76 return nil, fmt.Errorf("block payload of %d bytes is larger "+
77 "than the max allowed %d bytes", blockLen,
81 serializedBlock := make([]byte, blockLen)
82 if _, err := io.ReadFull(bi.r, serializedBlock); err != nil {
86 return serializedBlock, nil
89 // processBlock potentially imports the block into the database. It first
90 // deserializes the raw block while checking for errors. Already known blocks
91 // are skipped and orphan blocks are considered errors. Finally, it runs the
92 // block through the chain rules to ensure it follows all rules and matches
93 // up to the known checkpoint. Returns whether the block was imported along
94 // with any potential errors.
95 func (bi *blockImporter) processBlock(serializedBlock []byte) (bool, error) {
96 // Deserialize the block which includes checks for malformed blocks.
97 block, err := btcutil.NewBlockFromBytes(serializedBlock)
102 // update progress statistics
103 bi.lastBlockTime = block.MsgBlock().Header.Timestamp
104 bi.receivedLogTx += int64(len(block.MsgBlock().Transactions))
106 // Skip blocks that already exist.
107 blockHash := block.Hash()
108 exists, err := bi.chain.HaveBlock(blockHash)
116 // Don't bother trying to process orphans.
117 prevHash := &block.MsgBlock().Header.PrevBlock
118 if !prevHash.IsEqual(&zeroHash) {
119 exists, err := bi.chain.HaveBlock(prevHash)
124 return false, fmt.Errorf("import file contains block "+
125 "%v which does not link to the available "+
126 "block chain", prevHash)
130 // Ensure the blocks follows all of the chain rules and match up to the
131 // known checkpoints.
132 isMainChain, isOrphan, err := bi.chain.ProcessBlock(block,
133 blockchain.BFFastAdd)
138 return false, fmt.Errorf("import file contains an block that "+
139 "does not extend the main chain: %v", blockHash)
142 return false, fmt.Errorf("import file contains an orphan "+
143 "block: %v", blockHash)
149 // readHandler is the main handler for reading blocks from the import file.
150 // This allows block processing to take place in parallel with block reads.
151 // It must be run as a goroutine.
152 func (bi *blockImporter) readHandler() {
155 // Read the next block from the file and if anything goes wrong
156 // notify the status handler with the error and bail.
157 serializedBlock, err := bi.readBlock()
159 bi.errChan <- fmt.Errorf("Error reading from input "+
160 "file: %v", err.Error())
164 // A nil block with no error means we're done.
165 if serializedBlock == nil {
169 // Send the block or quit if we've been signalled to exit by
170 // the status handler due to an error elsewhere.
172 case bi.processQueue <- serializedBlock:
178 // Close the processing channel to signal no more blocks are coming.
179 close(bi.processQueue)
183 // logProgress logs block progress as an information message. In order to
184 // prevent spam, it limits logging to one message every cfg.Progress seconds
185 // with duration and totals included.
186 func (bi *blockImporter) logProgress() {
187 bi.receivedLogBlocks++
190 duration := now.Sub(bi.lastLogTime)
191 if duration < time.Second*time.Duration(cfg.Progress) {
195 // Truncate the duration to 10s of milliseconds.
196 durationMillis := int64(duration / time.Millisecond)
197 tDuration := 10 * time.Millisecond * time.Duration(durationMillis/10)
199 // Log information about new block height.
201 if bi.receivedLogBlocks == 1 {
204 txStr := "transactions"
205 if bi.receivedLogTx == 1 {
206 txStr = "transaction"
208 log.Infof("Processed %d %s in the last %s (%d %s, height %d, %s)",
209 bi.receivedLogBlocks, blockStr, tDuration, bi.receivedLogTx,
210 txStr, bi.lastHeight, bi.lastBlockTime)
212 bi.receivedLogBlocks = 0
217 // processHandler is the main handler for processing blocks. This allows block
218 // processing to take place in parallel with block reads from the import file.
219 // It must be run as a goroutine.
220 func (bi *blockImporter) processHandler() {
224 case serializedBlock, ok := <-bi.processQueue:
225 // We're done when the channel is closed.
232 imported, err := bi.processBlock(serializedBlock)
251 // statusHandler waits for updates from the import operation and notifies
252 // the passed doneChan with the results of the import. It also causes all
253 // goroutines to exit if an error is reported from any of them.
254 func (bi *blockImporter) statusHandler(resultsChan chan *importResults) {
256 // An error from either of the goroutines means we're done so signal
257 // caller with the error and signal all goroutines to quit.
258 case err := <-bi.errChan:
259 resultsChan <- &importResults{
260 blocksProcessed: bi.blocksProcessed,
261 blocksImported: bi.blocksImported,
266 // The import finished normally.
268 resultsChan <- &importResults{
269 blocksProcessed: bi.blocksProcessed,
270 blocksImported: bi.blocksImported,
276 // Import is the core function which handles importing the blocks from the file
277 // associated with the block importer to the database. It returns a channel
278 // on which the results will be returned when the operation has completed.
279 func (bi *blockImporter) Import() chan *importResults {
280 // Start up the read and process handling goroutines. This setup allows
281 // blocks to be read from disk in parallel while being processed.
284 go bi.processHandler()
286 // Wait for the import to finish in a separate goroutine and signal
287 // the status handler when done.
293 // Start the status handler and return the result channel that it will
294 // send the results on when the import is done.
295 resultChan := make(chan *importResults)
296 go bi.statusHandler(resultChan)
300 // newBlockImporter returns a new importer for the provided file reader seeker
302 func newBlockImporter(db database.DB, r io.ReadSeeker) (*blockImporter, error) {
303 // Create the transaction and address indexes if needed.
305 // CAUTION: the txindex needs to be first in the indexes array because
306 // the addrindex uses data from the txindex during catchup. If the
307 // addrindex is run first, it may not have the transactions from the
308 // current block indexed.
309 var indexes []indexers.Indexer
310 if cfg.TxIndex || cfg.AddrIndex {
311 // Enable transaction index if address index is enabled since it
314 log.Infof("Transaction index enabled because it is " +
315 "required by the address index")
318 log.Info("Transaction index is enabled")
320 indexes = append(indexes, indexers.NewTxIndex(db))
323 log.Info("Address index is enabled")
324 indexes = append(indexes, indexers.NewAddrIndex(db, activeNetParams))
327 // Create an index manager if any of the optional indexes are enabled.
328 var indexManager blockchain.IndexManager
329 if len(indexes) > 0 {
330 indexManager = indexers.NewManager(db, indexes)
333 chain, err := blockchain.New(&blockchain.Config{
335 ChainParams: activeNetParams,
336 TimeSource: blockchain.NewMedianTime(),
337 IndexManager: indexManager,
343 return &blockImporter{
346 processQueue: make(chan []byte, 2),
347 doneChan: make(chan bool),
348 errChan: make(chan error),
349 quit: make(chan struct{}),
351 lastLogTime: time.Now(),