OSDN Git Service

new repo
[bytom/vapor.git] / vendor / github.com / btcsuite / btcd / cmd / addblock / import.go
1 // Copyright (c) 2013-2016 The btcsuite developers
2 // Use of this source code is governed by an ISC
3 // license that can be found in the LICENSE file.
4
5 package main
6
7 import (
8         "encoding/binary"
9         "fmt"
10         "io"
11         "sync"
12         "time"
13
14         "github.com/btcsuite/btcd/blockchain"
15         "github.com/btcsuite/btcd/blockchain/indexers"
16         "github.com/btcsuite/btcd/chaincfg/chainhash"
17         "github.com/btcsuite/btcd/database"
18         "github.com/btcsuite/btcd/wire"
19         "github.com/btcsuite/btcutil"
20 )
21
22 var zeroHash = chainhash.Hash{}
23
24 // importResults houses the stats and result as an import operation.
25 type importResults struct {
26         blocksProcessed int64
27         blocksImported  int64
28         err             error
29 }
30
31 // blockImporter houses information about an ongoing import from a block data
32 // file to the block database.
33 type blockImporter struct {
34         db                database.DB
35         chain             *blockchain.BlockChain
36         r                 io.ReadSeeker
37         processQueue      chan []byte
38         doneChan          chan bool
39         errChan           chan error
40         quit              chan struct{}
41         wg                sync.WaitGroup
42         blocksProcessed   int64
43         blocksImported    int64
44         receivedLogBlocks int64
45         receivedLogTx     int64
46         lastHeight        int64
47         lastBlockTime     time.Time
48         lastLogTime       time.Time
49 }
50
51 // readBlock reads the next block from the input file.
52 func (bi *blockImporter) readBlock() ([]byte, error) {
53         // The block file format is:
54         //  <network> <block length> <serialized block>
55         var net uint32
56         err := binary.Read(bi.r, binary.LittleEndian, &net)
57         if err != nil {
58                 if err != io.EOF {
59                         return nil, err
60                 }
61
62                 // No block and no error means there are no more blocks to read.
63                 return nil, nil
64         }
65         if net != uint32(activeNetParams.Net) {
66                 return nil, fmt.Errorf("network mismatch -- got %x, want %x",
67                         net, uint32(activeNetParams.Net))
68         }
69
70         // Read the block length and ensure it is sane.
71         var blockLen uint32
72         if err := binary.Read(bi.r, binary.LittleEndian, &blockLen); err != nil {
73                 return nil, err
74         }
75         if blockLen > wire.MaxBlockPayload {
76                 return nil, fmt.Errorf("block payload of %d bytes is larger "+
77                         "than the max allowed %d bytes", blockLen,
78                         wire.MaxBlockPayload)
79         }
80
81         serializedBlock := make([]byte, blockLen)
82         if _, err := io.ReadFull(bi.r, serializedBlock); err != nil {
83                 return nil, err
84         }
85
86         return serializedBlock, nil
87 }
88
89 // processBlock potentially imports the block into the database.  It first
90 // deserializes the raw block while checking for errors.  Already known blocks
91 // are skipped and orphan blocks are considered errors.  Finally, it runs the
92 // block through the chain rules to ensure it follows all rules and matches
93 // up to the known checkpoint.  Returns whether the block was imported along
94 // with any potential errors.
95 func (bi *blockImporter) processBlock(serializedBlock []byte) (bool, error) {
96         // Deserialize the block which includes checks for malformed blocks.
97         block, err := btcutil.NewBlockFromBytes(serializedBlock)
98         if err != nil {
99                 return false, err
100         }
101
102         // update progress statistics
103         bi.lastBlockTime = block.MsgBlock().Header.Timestamp
104         bi.receivedLogTx += int64(len(block.MsgBlock().Transactions))
105
106         // Skip blocks that already exist.
107         blockHash := block.Hash()
108         exists, err := bi.chain.HaveBlock(blockHash)
109         if err != nil {
110                 return false, err
111         }
112         if exists {
113                 return false, nil
114         }
115
116         // Don't bother trying to process orphans.
117         prevHash := &block.MsgBlock().Header.PrevBlock
118         if !prevHash.IsEqual(&zeroHash) {
119                 exists, err := bi.chain.HaveBlock(prevHash)
120                 if err != nil {
121                         return false, err
122                 }
123                 if !exists {
124                         return false, fmt.Errorf("import file contains block "+
125                                 "%v which does not link to the available "+
126                                 "block chain", prevHash)
127                 }
128         }
129
130         // Ensure the blocks follows all of the chain rules and match up to the
131         // known checkpoints.
132         isMainChain, isOrphan, err := bi.chain.ProcessBlock(block,
133                 blockchain.BFFastAdd)
134         if err != nil {
135                 return false, err
136         }
137         if !isMainChain {
138                 return false, fmt.Errorf("import file contains an block that "+
139                         "does not extend the main chain: %v", blockHash)
140         }
141         if isOrphan {
142                 return false, fmt.Errorf("import file contains an orphan "+
143                         "block: %v", blockHash)
144         }
145
146         return true, nil
147 }
148
149 // readHandler is the main handler for reading blocks from the import file.
150 // This allows block processing to take place in parallel with block reads.
151 // It must be run as a goroutine.
152 func (bi *blockImporter) readHandler() {
153 out:
154         for {
155                 // Read the next block from the file and if anything goes wrong
156                 // notify the status handler with the error and bail.
157                 serializedBlock, err := bi.readBlock()
158                 if err != nil {
159                         bi.errChan <- fmt.Errorf("Error reading from input "+
160                                 "file: %v", err.Error())
161                         break out
162                 }
163
164                 // A nil block with no error means we're done.
165                 if serializedBlock == nil {
166                         break out
167                 }
168
169                 // Send the block or quit if we've been signalled to exit by
170                 // the status handler due to an error elsewhere.
171                 select {
172                 case bi.processQueue <- serializedBlock:
173                 case <-bi.quit:
174                         break out
175                 }
176         }
177
178         // Close the processing channel to signal no more blocks are coming.
179         close(bi.processQueue)
180         bi.wg.Done()
181 }
182
183 // logProgress logs block progress as an information message.  In order to
184 // prevent spam, it limits logging to one message every cfg.Progress seconds
185 // with duration and totals included.
186 func (bi *blockImporter) logProgress() {
187         bi.receivedLogBlocks++
188
189         now := time.Now()
190         duration := now.Sub(bi.lastLogTime)
191         if duration < time.Second*time.Duration(cfg.Progress) {
192                 return
193         }
194
195         // Truncate the duration to 10s of milliseconds.
196         durationMillis := int64(duration / time.Millisecond)
197         tDuration := 10 * time.Millisecond * time.Duration(durationMillis/10)
198
199         // Log information about new block height.
200         blockStr := "blocks"
201         if bi.receivedLogBlocks == 1 {
202                 blockStr = "block"
203         }
204         txStr := "transactions"
205         if bi.receivedLogTx == 1 {
206                 txStr = "transaction"
207         }
208         log.Infof("Processed %d %s in the last %s (%d %s, height %d, %s)",
209                 bi.receivedLogBlocks, blockStr, tDuration, bi.receivedLogTx,
210                 txStr, bi.lastHeight, bi.lastBlockTime)
211
212         bi.receivedLogBlocks = 0
213         bi.receivedLogTx = 0
214         bi.lastLogTime = now
215 }
216
217 // processHandler is the main handler for processing blocks.  This allows block
218 // processing to take place in parallel with block reads from the import file.
219 // It must be run as a goroutine.
220 func (bi *blockImporter) processHandler() {
221 out:
222         for {
223                 select {
224                 case serializedBlock, ok := <-bi.processQueue:
225                         // We're done when the channel is closed.
226                         if !ok {
227                                 break out
228                         }
229
230                         bi.blocksProcessed++
231                         bi.lastHeight++
232                         imported, err := bi.processBlock(serializedBlock)
233                         if err != nil {
234                                 bi.errChan <- err
235                                 break out
236                         }
237
238                         if imported {
239                                 bi.blocksImported++
240                         }
241
242                         bi.logProgress()
243
244                 case <-bi.quit:
245                         break out
246                 }
247         }
248         bi.wg.Done()
249 }
250
251 // statusHandler waits for updates from the import operation and notifies
252 // the passed doneChan with the results of the import.  It also causes all
253 // goroutines to exit if an error is reported from any of them.
254 func (bi *blockImporter) statusHandler(resultsChan chan *importResults) {
255         select {
256         // An error from either of the goroutines means we're done so signal
257         // caller with the error and signal all goroutines to quit.
258         case err := <-bi.errChan:
259                 resultsChan <- &importResults{
260                         blocksProcessed: bi.blocksProcessed,
261                         blocksImported:  bi.blocksImported,
262                         err:             err,
263                 }
264                 close(bi.quit)
265
266         // The import finished normally.
267         case <-bi.doneChan:
268                 resultsChan <- &importResults{
269                         blocksProcessed: bi.blocksProcessed,
270                         blocksImported:  bi.blocksImported,
271                         err:             nil,
272                 }
273         }
274 }
275
276 // Import is the core function which handles importing the blocks from the file
277 // associated with the block importer to the database.  It returns a channel
278 // on which the results will be returned when the operation has completed.
279 func (bi *blockImporter) Import() chan *importResults {
280         // Start up the read and process handling goroutines.  This setup allows
281         // blocks to be read from disk in parallel while being processed.
282         bi.wg.Add(2)
283         go bi.readHandler()
284         go bi.processHandler()
285
286         // Wait for the import to finish in a separate goroutine and signal
287         // the status handler when done.
288         go func() {
289                 bi.wg.Wait()
290                 bi.doneChan <- true
291         }()
292
293         // Start the status handler and return the result channel that it will
294         // send the results on when the import is done.
295         resultChan := make(chan *importResults)
296         go bi.statusHandler(resultChan)
297         return resultChan
298 }
299
300 // newBlockImporter returns a new importer for the provided file reader seeker
301 // and database.
302 func newBlockImporter(db database.DB, r io.ReadSeeker) (*blockImporter, error) {
303         // Create the transaction and address indexes if needed.
304         //
305         // CAUTION: the txindex needs to be first in the indexes array because
306         // the addrindex uses data from the txindex during catchup.  If the
307         // addrindex is run first, it may not have the transactions from the
308         // current block indexed.
309         var indexes []indexers.Indexer
310         if cfg.TxIndex || cfg.AddrIndex {
311                 // Enable transaction index if address index is enabled since it
312                 // requires it.
313                 if !cfg.TxIndex {
314                         log.Infof("Transaction index enabled because it is " +
315                                 "required by the address index")
316                         cfg.TxIndex = true
317                 } else {
318                         log.Info("Transaction index is enabled")
319                 }
320                 indexes = append(indexes, indexers.NewTxIndex(db))
321         }
322         if cfg.AddrIndex {
323                 log.Info("Address index is enabled")
324                 indexes = append(indexes, indexers.NewAddrIndex(db, activeNetParams))
325         }
326
327         // Create an index manager if any of the optional indexes are enabled.
328         var indexManager blockchain.IndexManager
329         if len(indexes) > 0 {
330                 indexManager = indexers.NewManager(db, indexes)
331         }
332
333         chain, err := blockchain.New(&blockchain.Config{
334                 DB:           db,
335                 ChainParams:  activeNetParams,
336                 TimeSource:   blockchain.NewMedianTime(),
337                 IndexManager: indexManager,
338         })
339         if err != nil {
340                 return nil, err
341         }
342
343         return &blockImporter{
344                 db:           db,
345                 r:            r,
346                 processQueue: make(chan []byte, 2),
347                 doneChan:     make(chan bool),
348                 errChan:      make(chan error),
349                 quit:         make(chan struct{}),
350                 chain:        chain,
351                 lastLogTime:  time.Now(),
352         }, nil
353 }