OSDN Git Service

when wallet disable, wallet related api redirect to error (#483)
[bytom/bytom.git] / blockchain / reactor.go
1 package blockchain
2
3 import (
4         "context"
5         "reflect"
6         "time"
7
8         log "github.com/sirupsen/logrus"
9         cmn "github.com/tendermint/tmlibs/common"
10
11         "github.com/bytom/blockchain/txfeed"
12         "github.com/bytom/mining/cpuminer"
13         "github.com/bytom/mining/miningpool"
14         "github.com/bytom/p2p"
15         "github.com/bytom/p2p/trust"
16         "github.com/bytom/protocol"
17         "github.com/bytom/protocol/bc"
18         protocolTypes "github.com/bytom/protocol/bc/types"
19         "github.com/bytom/types"
20         "github.com/bytom/wallet"
21 )
22
// Channel identifier and sizing limits used by the blockchain reactor.
const (
	// BlockchainChannel is a channel for blocks and status updates
	BlockchainChannel byte = 0x40
	// maxNewBlockChSize is the buffer capacity of the channel that carries
	// hashes of newly produced blocks to the reactor.
	maxNewBlockChSize int = 1024

	// statusUpdateIntervalSeconds is the period, in seconds, of the status
	// ticker. It is left untyped so it can be multiplied by time.Second.
	statusUpdateIntervalSeconds = 10
	// maxBlockchainResponseSize equals 22020096 + 2 (22020096 is 21 MiB).
	// NOTE(review): not referenced in the visible part of this file; the
	// meaning of the 2 extra bytes should be confirmed against its users.
	maxBlockchainResponseSize = 21*1024*1024 + 2
	// crosscoreRPCPrefix is not referenced in the visible part of this file.
	crosscoreRPCPrefix = "/rpc/"
)
32
33 // BlockchainReactor handles long-term catchup syncing.
34 type BlockchainReactor struct {
35         p2p.BaseReactor
36
37         chain         *protocol.Chain
38         wallet        *wallet.Wallet
39         TxFeedTracker *txfeed.Tracker // TODO: move it out from BlockchainReactor
40         blockKeeper   *blockKeeper
41         txPool        *protocol.TxPool
42         mining        *cpuminer.CPUMiner
43         miningPool    *miningpool.MiningPool
44         sw            *p2p.Switch
45         evsw          types.EventSwitch
46         newBlockCh    chan *bc.Hash
47         miningEnable  bool
48 }
49
50 // Info return the server information
51 func (bcr *BlockchainReactor) Info(ctx context.Context) (map[string]interface{}, error) {
52         return map[string]interface{}{
53                 "is_configured": false,
54                 "version":       "0.001",
55                 "build_commit":  "----",
56                 "build_date":    "------",
57                 "build_config":  "---------",
58         }, nil
59 }
60
61 // NewBlockchainReactor returns the reactor of whole blockchain.
62 func NewBlockchainReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, wallet *wallet.Wallet, txfeeds *txfeed.Tracker, miningEnable bool) *BlockchainReactor {
63         newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
64         bcr := &BlockchainReactor{
65                 chain:         chain,
66                 wallet:        wallet,
67                 blockKeeper:   newBlockKeeper(chain, sw),
68                 txPool:        txPool,
69                 sw:            sw,
70                 TxFeedTracker: txfeeds,
71                 miningEnable:  miningEnable,
72                 newBlockCh:    newBlockCh,
73         }
74
75         if wallet == nil {
76                 bcr.mining = cpuminer.NewCPUMiner(chain, nil, txPool, newBlockCh)
77                 bcr.miningPool = miningpool.NewMiningPool(chain, nil, txPool, newBlockCh)
78         } else {
79                 bcr.mining = cpuminer.NewCPUMiner(chain, wallet.AccountMgr, txPool, newBlockCh)
80                 bcr.miningPool = miningpool.NewMiningPool(chain, wallet.AccountMgr, txPool, newBlockCh)
81         }
82
83         bcr.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcr)
84         return bcr
85 }
86
87 // OnStart implements BaseService
88 func (bcr *BlockchainReactor) OnStart() error {
89         bcr.BaseReactor.OnStart()
90
91         if bcr.miningEnable {
92                 bcr.mining.Start()
93         }
94         go bcr.syncRoutine()
95         return nil
96 }
97
98 // OnStop implements BaseService
99 func (bcr *BlockchainReactor) OnStop() {
100         bcr.BaseReactor.OnStop()
101         if bcr.miningEnable {
102                 bcr.mining.Stop()
103         }
104         bcr.blockKeeper.Stop()
105 }
106
107 // GetChannels implements Reactor
108 func (bcr *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
109         return []*p2p.ChannelDescriptor{
110                 {
111                         ID:                BlockchainChannel,
112                         Priority:          5,
113                         SendQueueCapacity: 100,
114                 },
115         }
116 }
117
118 // AddPeer implements Reactor by sending our state to peer.
119 func (bcr *BlockchainReactor) AddPeer(peer *p2p.Peer) {
120         peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}})
121 }
122
123 // RemovePeer implements Reactor by removing peer from the pool.
124 func (bcr *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
125         bcr.blockKeeper.RemovePeer(peer.Key)
126 }
127
128 // Receive implements Reactor by handling 4 types of messages (look below).
129 func (bcr *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
130         var tm *trust.TrustMetric
131         key := src.Connection().RemoteAddress.IP.String()
132         if tm = bcr.sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil {
133                 log.Errorf("Can't get peer trust metric")
134                 return
135         }
136
137         _, msg, err := DecodeMessage(msgBytes)
138         if err != nil {
139                 log.Errorf("Error decoding messagek %v", err)
140                 return
141         }
142         log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
143
144         switch msg := msg.(type) {
145         case *BlockRequestMessage:
146                 var block *protocolTypes.Block
147                 var err error
148                 if msg.Height != 0 {
149                         block, err = bcr.chain.GetBlockByHeight(msg.Height)
150                 } else {
151                         block, err = bcr.chain.GetBlockByHash(msg.GetHash())
152                 }
153                 if err != nil {
154                         log.Errorf("Fail on BlockRequestMessage get block: %v", err)
155                         return
156                 }
157                 response, err := NewBlockResponseMessage(block)
158                 if err != nil {
159                         log.Errorf("Fail on BlockRequestMessage create resoinse: %v", err)
160                         return
161                 }
162                 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
163
164         case *BlockResponseMessage:
165                 bcr.blockKeeper.AddBlock(msg.GetBlock(), src)
166
167         case *StatusRequestMessage:
168                 block := bcr.chain.BestBlock()
169                 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(block)})
170
171         case *StatusResponseMessage:
172                 bcr.blockKeeper.SetPeerHeight(src.Key, msg.Height, msg.GetHash())
173
174         case *TransactionNotifyMessage:
175                 tx := msg.GetTransaction()
176                 if err := bcr.chain.ValidateTx(tx); err != nil {
177                         bcr.sw.AddScamPeer(src)
178                 }
179
180         default:
181                 log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
182         }
183 }
184
185 // Handle messages from the poolReactor telling the reactor what to do.
186 // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
187 // (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
188 func (bcr *BlockchainReactor) syncRoutine() {
189         statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
190         newTxCh := bcr.txPool.GetNewTxCh()
191
192         for {
193                 select {
194                 case blockHash := <-bcr.newBlockCh:
195                         block, err := bcr.chain.GetBlockByHash(blockHash)
196                         if err != nil {
197                                 log.Errorf("Error get block from newBlockCh %v", err)
198                         }
199                         log.WithFields(log.Fields{"Hash": blockHash, "height": block.Height}).Info("Boardcast my new block")
200                 case newTx := <-newTxCh:
201                         bcr.TxFeedTracker.TxFilter(newTx)
202                         go bcr.BroadcastTransaction(newTx)
203                 case _ = <-statusUpdateTicker.C:
204                         go bcr.BroadcastStatusResponse()
205
206                         if bcr.miningEnable {
207                                 // mining if and only if block sync is finished
208                                 if bcr.blockKeeper.IsCaughtUp() {
209                                         bcr.mining.Start()
210                                 } else {
211                                         bcr.mining.Stop()
212                                 }
213                         }
214                 case <-bcr.Quit:
215                         return
216                 }
217         }
218 }
219
220 // BroadcastStatusResponse broadcasts `BlockStore` height.
221 func (bcr *BlockchainReactor) BroadcastStatusResponse() {
222         block := bcr.chain.BestBlock()
223         bcr.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(block)})
224 }
225
226 // BroadcastTransaction broadcats `BlockStore` transaction.
227 func (bcr *BlockchainReactor) BroadcastTransaction(tx *protocolTypes.Tx) error {
228         msg, err := NewTransactionNotifyMessage(tx)
229         if err != nil {
230                 return err
231         }
232         bcr.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{msg})
233         return nil
234 }