OSDN Git Service

fix AnnotatedOutput#AssetDefinition type (#525)
[bytom/bytom.git] / blockchain / reactor.go
1 package blockchain
2
3 import (
4         "context"
5         "reflect"
6         "time"
7
8         log "github.com/sirupsen/logrus"
9         cmn "github.com/tendermint/tmlibs/common"
10
11         "github.com/bytom/account"
12         "github.com/bytom/blockchain/txfeed"
13         "github.com/bytom/mining/cpuminer"
14         "github.com/bytom/mining/miningpool"
15         "github.com/bytom/p2p"
16         "github.com/bytom/p2p/trust"
17         "github.com/bytom/protocol"
18         "github.com/bytom/protocol/bc"
19         protocolTypes "github.com/bytom/protocol/bc/types"
20         "github.com/bytom/types"
21 )
22
23 const (
24         // BlockchainChannel is a channel for blocks and status updates
25         BlockchainChannel = byte(0x40)
26         maxNewBlockChSize = int(1024)
27
28         statusUpdateIntervalSeconds = 10
29         maxBlockchainResponseSize   = 22020096 + 2
30 )
31
32 // BlockchainReactor handles long-term catchup syncing.
33 type BlockchainReactor struct {
34         p2p.BaseReactor
35
36         chain         *protocol.Chain
37         TxFeedTracker *txfeed.Tracker // TODO: move it out from BlockchainReactor
38         blockKeeper   *blockKeeper
39         txPool        *protocol.TxPool
40         mining        *cpuminer.CPUMiner
41         miningPool    *miningpool.MiningPool
42         sw            *p2p.Switch
43         evsw          types.EventSwitch
44         newBlockCh    chan *bc.Hash
45         miningEnable  bool
46 }
47
48 // Info return the server information
49 func (bcr *BlockchainReactor) Info(ctx context.Context) (map[string]interface{}, error) {
50         return map[string]interface{}{
51                 "is_configured": false,
52                 "version":       "0.001",
53                 "build_commit":  "----",
54                 "build_date":    "------",
55                 "build_config":  "---------",
56         }, nil
57 }
58
59 // NewBlockchainReactor returns the reactor of whole blockchain.
60 func NewBlockchainReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, accountMgr *account.Manager, txfeeds *txfeed.Tracker, miningEnable bool) *BlockchainReactor {
61         newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
62         bcr := &BlockchainReactor{
63                 chain:         chain,
64                 blockKeeper:   newBlockKeeper(chain, sw),
65                 txPool:        txPool,
66                 sw:            sw,
67                 TxFeedTracker: txfeeds,
68                 miningEnable:  miningEnable,
69                 newBlockCh:    newBlockCh,
70         }
71
72         bcr.mining = cpuminer.NewCPUMiner(chain, accountMgr, txPool, newBlockCh)
73         bcr.miningPool = miningpool.NewMiningPool(chain, accountMgr, txPool, newBlockCh)
74
75         bcr.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcr)
76         return bcr
77 }
78
79 // OnStart implements BaseService
80 func (bcr *BlockchainReactor) OnStart() error {
81         bcr.BaseReactor.OnStart()
82
83         if bcr.miningEnable {
84                 bcr.mining.Start()
85         }
86         go bcr.syncRoutine()
87         return nil
88 }
89
90 // OnStop implements BaseService
91 func (bcr *BlockchainReactor) OnStop() {
92         bcr.BaseReactor.OnStop()
93         if bcr.miningEnable {
94                 bcr.mining.Stop()
95         }
96         bcr.blockKeeper.Stop()
97 }
98
99 // GetChannels implements Reactor
100 func (bcr *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
101         return []*p2p.ChannelDescriptor{
102                 {
103                         ID:                BlockchainChannel,
104                         Priority:          5,
105                         SendQueueCapacity: 100,
106                 },
107         }
108 }
109
110 // AddPeer implements Reactor by sending our state to peer.
111 func (bcr *BlockchainReactor) AddPeer(peer *p2p.Peer) {
112         peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}})
113 }
114
115 // RemovePeer implements Reactor by removing peer from the pool.
116 func (bcr *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
117         bcr.blockKeeper.RemovePeer(peer.Key)
118 }
119
120 // Receive implements Reactor by handling 4 types of messages (look below).
121 func (bcr *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
122         var tm *trust.TrustMetric
123         key := src.Connection().RemoteAddress.IP.String()
124         if tm = bcr.sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil {
125                 log.Errorf("Can't get peer trust metric")
126                 return
127         }
128
129         _, msg, err := DecodeMessage(msgBytes)
130         if err != nil {
131                 log.Errorf("Error decoding messagek %v", err)
132                 return
133         }
134         log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
135
136         switch msg := msg.(type) {
137         case *BlockRequestMessage:
138                 var block *protocolTypes.Block
139                 var err error
140                 if msg.Height != 0 {
141                         block, err = bcr.chain.GetBlockByHeight(msg.Height)
142                 } else {
143                         block, err = bcr.chain.GetBlockByHash(msg.GetHash())
144                 }
145                 if err != nil {
146                         log.Errorf("Fail on BlockRequestMessage get block: %v", err)
147                         return
148                 }
149                 response, err := NewBlockResponseMessage(block)
150                 if err != nil {
151                         log.Errorf("Fail on BlockRequestMessage create resoinse: %v", err)
152                         return
153                 }
154                 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
155
156         case *BlockResponseMessage:
157                 bcr.blockKeeper.AddBlock(msg.GetBlock(), src)
158
159         case *StatusRequestMessage:
160                 block := bcr.chain.BestBlock()
161                 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(block)})
162
163         case *StatusResponseMessage:
164                 bcr.blockKeeper.SetPeerHeight(src.Key, msg.Height, msg.GetHash())
165
166         case *TransactionNotifyMessage:
167                 tx := msg.GetTransaction()
168                 if err := bcr.chain.ValidateTx(tx); err != nil {
169                         bcr.sw.AddScamPeer(src)
170                 }
171
172         default:
173                 log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
174         }
175 }
176
177 // Handle messages from the poolReactor telling the reactor what to do.
178 // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
179 // (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
180 func (bcr *BlockchainReactor) syncRoutine() {
181         statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
182         newTxCh := bcr.txPool.GetNewTxCh()
183
184         for {
185                 select {
186                 case blockHash := <-bcr.newBlockCh:
187                         block, err := bcr.chain.GetBlockByHash(blockHash)
188                         if err != nil {
189                                 log.Errorf("Error get block from newBlockCh %v", err)
190                         }
191                         log.WithFields(log.Fields{"Hash": blockHash, "height": block.Height}).Info("Boardcast my new block")
192                 case newTx := <-newTxCh:
193                         bcr.TxFeedTracker.TxFilter(newTx)
194                         go bcr.BroadcastTransaction(newTx)
195                 case _ = <-statusUpdateTicker.C:
196                         go bcr.BroadcastStatusResponse()
197
198                         if bcr.miningEnable {
199                                 // mining if and only if block sync is finished
200                                 if bcr.blockKeeper.IsCaughtUp() {
201                                         bcr.mining.Start()
202                                 } else {
203                                         bcr.mining.Stop()
204                                 }
205                         }
206                 case <-bcr.Quit:
207                         return
208                 }
209         }
210 }
211
212 // BroadcastStatusResponse broadcasts `BlockStore` height.
213 func (bcr *BlockchainReactor) BroadcastStatusResponse() {
214         block := bcr.chain.BestBlock()
215         bcr.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(block)})
216 }
217
218 // BroadcastTransaction broadcats `BlockStore` transaction.
219 func (bcr *BlockchainReactor) BroadcastTransaction(tx *protocolTypes.Tx) error {
220         msg, err := NewTransactionNotifyMessage(tx)
221         if err != nil {
222                 return err
223         }
224         bcr.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{msg})
225         return nil
226 }