OSDN Git Service

Optimize p2p transfer business layer logic (#501)
[bytom/bytom.git] / netsync / protocol_reactor.go
1 package netsync
2
3 import (
4         "reflect"
5         "strings"
6         "time"
7
8         log "github.com/sirupsen/logrus"
9         cmn "github.com/tendermint/tmlibs/common"
10
11         "github.com/bytom/errors"
12         "github.com/bytom/p2p"
13         "github.com/bytom/p2p/trust"
14         "github.com/bytom/protocol"
15         "github.com/bytom/protocol/bc"
16         "github.com/bytom/protocol/bc/types"
17 )
18
// BlockchainChannel is the p2p channel ID used for blocks and status updates.
const BlockchainChannel = byte(0x40)

// protocolHandshakeTimeout is how long AddPeer waits for a peer to answer the
// initial status request before giving up.
const protocolHandshakeTimeout = 10 * time.Second
24
// ErrProtocolHandshakeTimeout is returned by AddPeer when a peer does not
// deliver its status before the protocol handshake timeout expires.
var ErrProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
29
// Response is the standard response envelope. Every field is optional and is
// left out of the JSON encoding when it holds its zero value.
type Response struct {
	// Status is encoded under the "status" key.
	Status string `json:"status,omitempty"`
	// Msg is encoded under the "msg" key.
	Msg string `json:"msg,omitempty"`
	// Data is an arbitrary payload encoded under the "data" key.
	Data interface{} `json:"data,omitempty"`
}
36
37 type initalPeerStatus struct {
38         peerID string
39         height uint64
40         hash   *bc.Hash
41 }
42
43 //ProtocolReactor handles new coming protocol message.
44 type ProtocolReactor struct {
45         p2p.BaseReactor
46
47         chain       *protocol.Chain
48         blockKeeper *blockKeeper
49         txPool      *protocol.TxPool
50         sw          *p2p.Switch
51         fetcher     *Fetcher
52         peers       *peerSet
53
54         newPeerCh      chan struct{}
55         quitReqBlockCh chan *string
56         txSyncCh       chan *txsync
57         peerStatusCh   chan *initalPeerStatus
58 }
59
60 // NewProtocolReactor returns the reactor of whole blockchain.
61 func NewProtocolReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, blockPeer *blockKeeper, fetcher *Fetcher, peers *peerSet, newPeerCh chan struct{}, txSyncCh chan *txsync, quitReqBlockCh chan *string) *ProtocolReactor {
62         pr := &ProtocolReactor{
63                 chain:          chain,
64                 blockKeeper:    blockPeer,
65                 txPool:         txPool,
66                 sw:             sw,
67                 fetcher:        fetcher,
68                 peers:          peers,
69                 newPeerCh:      newPeerCh,
70                 txSyncCh:       txSyncCh,
71                 quitReqBlockCh: quitReqBlockCh,
72                 peerStatusCh:   make(chan *initalPeerStatus),
73         }
74         pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
75         return pr
76 }
77
78 // GetChannels implements Reactor
79 func (pr *ProtocolReactor) GetChannels() []*p2p.ChannelDescriptor {
80         return []*p2p.ChannelDescriptor{
81                 &p2p.ChannelDescriptor{
82                         ID:                BlockchainChannel,
83                         Priority:          5,
84                         SendQueueCapacity: 100,
85                 },
86         }
87 }
88
89 // OnStart implements BaseService
90 func (pr *ProtocolReactor) OnStart() error {
91         pr.BaseReactor.OnStart()
92         return nil
93 }
94
95 // OnStop implements BaseService
96 func (pr *ProtocolReactor) OnStop() {
97         pr.BaseReactor.OnStop()
98 }
99
100 // syncTransactions starts sending all currently pending transactions to the given peer.
101 func (pr *ProtocolReactor) syncTransactions(p *peer) {
102         pending := pr.txPool.GetTransactions()
103         if len(pending) == 0 {
104                 return
105         }
106         txs := make([]*types.Tx, len(pending))
107         for i, batch := range pending {
108                 txs[i] = batch.Tx
109         }
110         pr.txSyncCh <- &txsync{p, txs}
111 }
112
113 // AddPeer implements Reactor by sending our state to peer.
114 func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
115         peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}})
116         handshakeWait := time.NewTimer(protocolHandshakeTimeout)
117         for {
118                 select {
119                 case status := <-pr.peerStatusCh:
120                         if strings.Compare(status.peerID, peer.Key) == 0 {
121                                 pr.peers.AddPeer(peer)
122                                 pr.peers.SetPeerStatus(status.peerID, status.height, status.hash)
123                                 pr.syncTransactions(pr.peers.Peer(peer.Key))
124                                 pr.newPeerCh <- struct{}{}
125                                 return nil
126                         }
127                 case <-handshakeWait.C:
128                         return ErrProtocolHandshakeTimeout
129                 }
130         }
131 }
132
133 // RemovePeer implements Reactor by removing peer from the pool.
134 func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
135         pr.quitReqBlockCh <- &peer.Key
136         pr.peers.RemovePeer(peer.Key)
137 }
138
139 // Receive implements Reactor by handling 4 types of messages (look below).
140 func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
141         var tm *trust.TrustMetric
142         key := src.Connection().RemoteAddress.IP.String()
143         if tm = pr.sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil {
144                 log.Errorf("Can't get peer trust metric")
145                 return
146         }
147
148         _, msg, err := DecodeMessage(msgBytes)
149         if err != nil {
150                 log.Errorf("Error decoding messagek %v", err)
151                 return
152         }
153         log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
154
155         switch msg := msg.(type) {
156         case *BlockRequestMessage:
157                 var block *types.Block
158                 var err error
159                 if msg.Height != 0 {
160                         block, err = pr.chain.GetBlockByHeight(msg.Height)
161                 } else {
162                         block, err = pr.chain.GetBlockByHash(msg.GetHash())
163                 }
164                 if err != nil {
165                         log.Errorf("Fail on BlockRequestMessage get block: %v", err)
166                         return
167                 }
168                 response, err := NewBlockResponseMessage(block)
169                 if err != nil {
170                         log.Errorf("Fail on BlockRequestMessage create resoinse: %v", err)
171                         return
172                 }
173                 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
174
175         case *BlockResponseMessage:
176                 log.Info("BlockResponseMessage height:", msg.GetBlock().Height)
177                 pr.blockKeeper.AddBlock(msg.GetBlock(), src.Key)
178
179         case *StatusRequestMessage:
180                 block := pr.chain.BestBlock()
181                 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(block)})
182
183         case *StatusResponseMessage:
184                 peerStatus := &initalPeerStatus{
185                         peerID: src.Key,
186                         height: msg.Height,
187                         hash:   msg.GetHash(),
188                 }
189                 pr.peerStatusCh <- peerStatus
190
191         case *TransactionNotifyMessage:
192                 tx, err := msg.GetTransaction()
193                 if err != nil {
194                         log.Errorf("Error decoding new tx %v", err)
195                         return
196                 }
197                 pr.blockKeeper.AddTx(tx, src.Key)
198
199         case *MineBlockMessage:
200                 block, err := msg.GetMineBlock()
201                 if err != nil {
202                         log.Errorf("Error decoding mined block %v", err)
203                         return
204                 }
205                 // Mark the peer as owning the block and schedule it for import
206                 hash := block.Hash()
207                 pr.peers.MarkBlock(src.Key, &hash)
208                 pr.fetcher.Enqueue(src.Key, block)
209                 pr.peers.SetPeerStatus(src.Key, block.Height, &hash)
210
211         default:
212                 log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
213         }
214 }