// Copyright (c) 2013-2016 The btcsuite developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. package main import ( "encoding/binary" "fmt" "io" "sync" "time" "github.com/btcsuite/btcd/blockchain" "github.com/btcsuite/btcd/blockchain/indexers" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/database" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" ) var zeroHash = chainhash.Hash{} // importResults houses the stats and result as an import operation. type importResults struct { blocksProcessed int64 blocksImported int64 err error } // blockImporter houses information about an ongoing import from a block data // file to the block database. type blockImporter struct { db database.DB chain *blockchain.BlockChain r io.ReadSeeker processQueue chan []byte doneChan chan bool errChan chan error quit chan struct{} wg sync.WaitGroup blocksProcessed int64 blocksImported int64 receivedLogBlocks int64 receivedLogTx int64 lastHeight int64 lastBlockTime time.Time lastLogTime time.Time } // readBlock reads the next block from the input file. func (bi *blockImporter) readBlock() ([]byte, error) { // The block file format is: // var net uint32 err := binary.Read(bi.r, binary.LittleEndian, &net) if err != nil { if err != io.EOF { return nil, err } // No block and no error means there are no more blocks to read. return nil, nil } if net != uint32(activeNetParams.Net) { return nil, fmt.Errorf("network mismatch -- got %x, want %x", net, uint32(activeNetParams.Net)) } // Read the block length and ensure it is sane. var blockLen uint32 if err := binary.Read(bi.r, binary.LittleEndian, &blockLen); err != nil { return nil, err } if blockLen > wire.MaxBlockPayload { return nil, fmt.Errorf("block payload of %d bytes is larger "+ "than the max allowed %d bytes", blockLen, wire.MaxBlockPayload) } serializedBlock := make([]byte, blockLen) if _, err := io.ReadFull(bi.r, serializedBlock); err != nil { return nil, err } return serializedBlock, nil } // processBlock potentially imports the block into the database. It first // deserializes the raw block while checking for errors. Already known blocks // are skipped and orphan blocks are considered errors. Finally, it runs the // block through the chain rules to ensure it follows all rules and matches // up to the known checkpoint. Returns whether the block was imported along // with any potential errors. func (bi *blockImporter) processBlock(serializedBlock []byte) (bool, error) { // Deserialize the block which includes checks for malformed blocks. block, err := btcutil.NewBlockFromBytes(serializedBlock) if err != nil { return false, err } // update progress statistics bi.lastBlockTime = block.MsgBlock().Header.Timestamp bi.receivedLogTx += int64(len(block.MsgBlock().Transactions)) // Skip blocks that already exist. blockHash := block.Hash() exists, err := bi.chain.HaveBlock(blockHash) if err != nil { return false, err } if exists { return false, nil } // Don't bother trying to process orphans. prevHash := &block.MsgBlock().Header.PrevBlock if !prevHash.IsEqual(&zeroHash) { exists, err := bi.chain.HaveBlock(prevHash) if err != nil { return false, err } if !exists { return false, fmt.Errorf("import file contains block "+ "%v which does not link to the available "+ "block chain", prevHash) } } // Ensure the blocks follows all of the chain rules and match up to the // known checkpoints. isMainChain, isOrphan, err := bi.chain.ProcessBlock(block, blockchain.BFFastAdd) if err != nil { return false, err } if !isMainChain { return false, fmt.Errorf("import file contains an block that "+ "does not extend the main chain: %v", blockHash) } if isOrphan { return false, fmt.Errorf("import file contains an orphan "+ "block: %v", blockHash) } return true, nil } // readHandler is the main handler for reading blocks from the import file. // This allows block processing to take place in parallel with block reads. // It must be run as a goroutine. func (bi *blockImporter) readHandler() { out: for { // Read the next block from the file and if anything goes wrong // notify the status handler with the error and bail. serializedBlock, err := bi.readBlock() if err != nil { bi.errChan <- fmt.Errorf("Error reading from input "+ "file: %v", err.Error()) break out } // A nil block with no error means we're done. if serializedBlock == nil { break out } // Send the block or quit if we've been signalled to exit by // the status handler due to an error elsewhere. select { case bi.processQueue <- serializedBlock: case <-bi.quit: break out } } // Close the processing channel to signal no more blocks are coming. close(bi.processQueue) bi.wg.Done() } // logProgress logs block progress as an information message. In order to // prevent spam, it limits logging to one message every cfg.Progress seconds // with duration and totals included. func (bi *blockImporter) logProgress() { bi.receivedLogBlocks++ now := time.Now() duration := now.Sub(bi.lastLogTime) if duration < time.Second*time.Duration(cfg.Progress) { return } // Truncate the duration to 10s of milliseconds. durationMillis := int64(duration / time.Millisecond) tDuration := 10 * time.Millisecond * time.Duration(durationMillis/10) // Log information about new block height. blockStr := "blocks" if bi.receivedLogBlocks == 1 { blockStr = "block" } txStr := "transactions" if bi.receivedLogTx == 1 { txStr = "transaction" } log.Infof("Processed %d %s in the last %s (%d %s, height %d, %s)", bi.receivedLogBlocks, blockStr, tDuration, bi.receivedLogTx, txStr, bi.lastHeight, bi.lastBlockTime) bi.receivedLogBlocks = 0 bi.receivedLogTx = 0 bi.lastLogTime = now } // processHandler is the main handler for processing blocks. This allows block // processing to take place in parallel with block reads from the import file. // It must be run as a goroutine. func (bi *blockImporter) processHandler() { out: for { select { case serializedBlock, ok := <-bi.processQueue: // We're done when the channel is closed. if !ok { break out } bi.blocksProcessed++ bi.lastHeight++ imported, err := bi.processBlock(serializedBlock) if err != nil { bi.errChan <- err break out } if imported { bi.blocksImported++ } bi.logProgress() case <-bi.quit: break out } } bi.wg.Done() } // statusHandler waits for updates from the import operation and notifies // the passed doneChan with the results of the import. It also causes all // goroutines to exit if an error is reported from any of them. func (bi *blockImporter) statusHandler(resultsChan chan *importResults) { select { // An error from either of the goroutines means we're done so signal // caller with the error and signal all goroutines to quit. case err := <-bi.errChan: resultsChan <- &importResults{ blocksProcessed: bi.blocksProcessed, blocksImported: bi.blocksImported, err: err, } close(bi.quit) // The import finished normally. case <-bi.doneChan: resultsChan <- &importResults{ blocksProcessed: bi.blocksProcessed, blocksImported: bi.blocksImported, err: nil, } } } // Import is the core function which handles importing the blocks from the file // associated with the block importer to the database. It returns a channel // on which the results will be returned when the operation has completed. func (bi *blockImporter) Import() chan *importResults { // Start up the read and process handling goroutines. This setup allows // blocks to be read from disk in parallel while being processed. bi.wg.Add(2) go bi.readHandler() go bi.processHandler() // Wait for the import to finish in a separate goroutine and signal // the status handler when done. go func() { bi.wg.Wait() bi.doneChan <- true }() // Start the status handler and return the result channel that it will // send the results on when the import is done. resultChan := make(chan *importResults) go bi.statusHandler(resultChan) return resultChan } // newBlockImporter returns a new importer for the provided file reader seeker // and database. func newBlockImporter(db database.DB, r io.ReadSeeker) (*blockImporter, error) { // Create the transaction and address indexes if needed. // // CAUTION: the txindex needs to be first in the indexes array because // the addrindex uses data from the txindex during catchup. If the // addrindex is run first, it may not have the transactions from the // current block indexed. var indexes []indexers.Indexer if cfg.TxIndex || cfg.AddrIndex { // Enable transaction index if address index is enabled since it // requires it. if !cfg.TxIndex { log.Infof("Transaction index enabled because it is " + "required by the address index") cfg.TxIndex = true } else { log.Info("Transaction index is enabled") } indexes = append(indexes, indexers.NewTxIndex(db)) } if cfg.AddrIndex { log.Info("Address index is enabled") indexes = append(indexes, indexers.NewAddrIndex(db, activeNetParams)) } // Create an index manager if any of the optional indexes are enabled. var indexManager blockchain.IndexManager if len(indexes) > 0 { indexManager = indexers.NewManager(db, indexes) } chain, err := blockchain.New(&blockchain.Config{ DB: db, ChainParams: activeNetParams, TimeSource: blockchain.NewMedianTime(), IndexManager: indexManager, }) if err != nil { return nil, err } return &blockImporter{ db: db, r: r, processQueue: make(chan []byte, 2), doneChan: make(chan bool), errChan: make(chan error), quit: make(chan struct{}), chain: chain, lastLogTime: time.Now(), }, nil }