OSDN Git Service

001cf222bcc76bb7150690eaa18cafdf2083aaca
[bytom/bytom.git] / netsync / protocol_reactor.go
1 package netsync
2
3 import (
4         "reflect"
5         "strings"
6         "sync"
7         "time"
8
9         log "github.com/sirupsen/logrus"
10         cmn "github.com/tendermint/tmlibs/common"
11
12         "github.com/bytom/errors"
13         "github.com/bytom/p2p"
14         "github.com/bytom/protocol"
15         "github.com/bytom/protocol/bc"
16         "github.com/bytom/protocol/bc/types"
17 )
18
const (
	// BlockchainChannel is the p2p channel ID this reactor registers in
	// GetChannels and uses for every block, transaction and status message.
	BlockchainChannel byte = 0x40

	// protocolHandshakeTimeout bounds how long AddPeer waits for a peer's
	// status before giving up on the handshake.
	protocolHandshakeTimeout = 10 * time.Second
	// handshakeRetryTicker is the interval at which AddPeer re-sends the
	// status request while the handshake is still pending.
	handshakeRetryTicker = 4 * time.Second
)
25
var (
	// ErrProtocolHandshakeTimeout is returned by AddPeer when the peer's status
	// does not arrive within protocolHandshakeTimeout.
	ErrProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
	// ErrStatusRequest is returned by AddPeer when a status request could not
	// be queued for sending to the peer.
	ErrStatusRequest            = errors.New("Status request error")
	// ErrDiffGenesisHash is returned by AddPeer when the peer reports a genesis
	// block hash that differs from the local one.
	ErrDiffGenesisHash          = errors.New("Different genesis hash")
)
32
// Response describes the response standard.
//
// All three fields are tagged omitempty, so a field holding its zero value is
// left out of the JSON encoding.
type Response struct {
	// Status is encoded under the JSON key "status".
	Status string      `json:"status,omitempty"`
	// Msg is encoded under the JSON key "msg".
	Msg    string      `json:"msg,omitempty"`
	// Data is an arbitrary payload encoded under the JSON key "data".
	Data   interface{} `json:"data,omitempty"`
}
39
// initalPeerStatus carries the contents of a StatusResponseMessage from
// Receive to the AddPeer call that is waiting on peerStatusCh.
type initalPeerStatus struct {
	// peerID is the key of the peer that sent the status response.
	peerID      string
	// height is the block height reported by the peer.
	height      uint64
	// hash is the block hash reported by the peer alongside height.
	hash        *bc.Hash
	// genesisHash is the peer's genesis block hash; AddPeer compares it with
	// the local genesis hash.
	genesisHash *bc.Hash
}
46
// ProtocolReactor handles new coming protocol message.
//
// It performs the status handshake with newly connected peers (AddPeer),
// dispatches incoming messages on BlockchainChannel (Receive) and notifies the
// rest of the sync machinery through the channels below.
type ProtocolReactor struct {
	p2p.BaseReactor

	// chain supplies the genesis block, the best block header and the blocks
	// served in answer to block requests.
	chain       *protocol.Chain
	// blockKeeper receives the blocks and transactions delivered by peers.
	blockKeeper *blockKeeper
	// txPool is the source of pending transactions sent to a new peer.
	txPool      *protocol.TxPool
	// sw is the p2p switch passed to the constructor.
	// NOTE(review): not referenced by any method visible in this file.
	sw          *p2p.Switch
	// fetcher is handed the mined blocks announced by peers.
	fetcher     *Fetcher
	// peers tracks the peers that completed the handshake and their status.
	peers       *peerSet
	// handshakeMu serializes AddPeer so only one handshake runs at a time.
	handshakeMu sync.Mutex
	// genesisHash is the hash of the local block at height 0.
	genesisHash bc.Hash

	// newPeerCh is signaled once for every peer that completes the handshake.
	newPeerCh      chan struct{}
	// quitReqBlockCh receives the key of each removed peer (non-blocking send).
	quitReqBlockCh chan *string
	// txSyncCh receives the pending-transaction batch destined for a new peer.
	txSyncCh       chan *txsync
	// peerStatusCh carries status responses from Receive to AddPeer; it is
	// created unbuffered by NewProtocolReactor.
	peerStatusCh   chan *initalPeerStatus
}
65
66 // NewProtocolReactor returns the reactor of whole blockchain.
67 func NewProtocolReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, blockPeer *blockKeeper, fetcher *Fetcher, peers *peerSet, newPeerCh chan struct{}, txSyncCh chan *txsync, quitReqBlockCh chan *string) *ProtocolReactor {
68         pr := &ProtocolReactor{
69                 chain:          chain,
70                 blockKeeper:    blockPeer,
71                 txPool:         txPool,
72                 sw:             sw,
73                 fetcher:        fetcher,
74                 peers:          peers,
75                 newPeerCh:      newPeerCh,
76                 txSyncCh:       txSyncCh,
77                 quitReqBlockCh: quitReqBlockCh,
78                 peerStatusCh:   make(chan *initalPeerStatus),
79         }
80         pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
81         genesisBlock, _ := pr.chain.GetBlockByHeight(0)
82         pr.genesisHash = genesisBlock.Hash()
83
84         return pr
85 }
86
87 // GetChannels implements Reactor
88 func (pr *ProtocolReactor) GetChannels() []*p2p.ChannelDescriptor {
89         return []*p2p.ChannelDescriptor{
90                 &p2p.ChannelDescriptor{
91                         ID:                BlockchainChannel,
92                         Priority:          5,
93                         SendQueueCapacity: 100,
94                 },
95         }
96 }
97
98 // OnStart implements BaseService
99 func (pr *ProtocolReactor) OnStart() error {
100         pr.BaseReactor.OnStart()
101         return nil
102 }
103
104 // OnStop implements BaseService
105 func (pr *ProtocolReactor) OnStop() {
106         pr.BaseReactor.OnStop()
107 }
108
109 // syncTransactions starts sending all currently pending transactions to the given peer.
110 func (pr *ProtocolReactor) syncTransactions(p *peer) {
111         if p == nil {
112                 return
113         }
114         pending := pr.txPool.GetTransactions()
115         if len(pending) == 0 {
116                 return
117         }
118         txs := make([]*types.Tx, len(pending))
119         for i, batch := range pending {
120                 txs[i] = batch.Tx
121         }
122         pr.txSyncCh <- &txsync{p, txs}
123 }
124
125 // AddPeer implements Reactor by sending our state to peer.
126 func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
127         pr.handshakeMu.Lock()
128         defer pr.handshakeMu.Unlock()
129         if peer == nil {
130                 return errPeerDropped
131         }
132         if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
133                 return ErrStatusRequest
134         }
135         retryTicker := time.Tick(handshakeRetryTicker)
136         handshakeWait := time.NewTimer(protocolHandshakeTimeout)
137         for {
138                 select {
139                 case status := <-pr.peerStatusCh:
140                         if status.peerID == peer.Key {
141                                 if strings.Compare(pr.genesisHash.String(), status.genesisHash.String()) != 0 {
142                                         log.Info("Remote peer genesis block hash:", status.genesisHash.String(), " local hash:", pr.genesisHash.String())
143                                         return ErrDiffGenesisHash
144                                 }
145                                 pr.peers.AddPeer(peer)
146                                 pr.peers.SetPeerStatus(status.peerID, status.height, status.hash)
147                                 prPeer, ok := pr.peers.Peer(peer.Key)
148                                 if !ok {
149                                         return errPeerDropped
150                                 }
151                                 pr.syncTransactions(prPeer)
152                                 pr.newPeerCh <- struct{}{}
153                                 return nil
154                         }
155                 case <-retryTicker:
156                         if peer == nil {
157                                 return errPeerDropped
158                         }
159                         if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
160                                 return ErrStatusRequest
161                         }
162                 case <-handshakeWait.C:
163                         return ErrProtocolHandshakeTimeout
164                 }
165         }
166 }
167
168 // RemovePeer implements Reactor by removing peer from the pool.
169 func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
170         select {
171         case pr.quitReqBlockCh <- &peer.Key:
172         default:
173                 log.Warning("quitReqBlockCh is full")
174         }
175         pr.peers.RemovePeer(peer.Key)
176 }
177
178 // Receive implements Reactor by handling 4 types of messages (look below).
179 func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
180         _, msg, err := DecodeMessage(msgBytes)
181         if err != nil {
182                 log.Errorf("Error decoding messagek %v", err)
183                 return
184         }
185         log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
186
187         switch msg := msg.(type) {
188         case *BlockRequestMessage:
189                 var block *types.Block
190                 var err error
191                 if msg.Height != 0 {
192                         block, err = pr.chain.GetBlockByHeight(msg.Height)
193                 } else {
194                         block, err = pr.chain.GetBlockByHash(msg.GetHash())
195                 }
196                 if err != nil {
197                         log.Errorf("Fail on BlockRequestMessage get block: %v", err)
198                         return
199                 }
200                 response, err := NewBlockResponseMessage(block)
201                 if err != nil {
202                         log.Errorf("Fail on BlockRequestMessage create resoinse: %v", err)
203                         return
204                 }
205                 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
206
207         case *BlockResponseMessage:
208                 log.Info("BlockResponseMessage height:", msg.GetBlock().Height)
209                 pr.blockKeeper.AddBlock(msg.GetBlock(), src.Key)
210
211         case *StatusRequestMessage:
212                 blockHeader := pr.chain.BestBlockHeader()
213                 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(blockHeader, &pr.genesisHash)})
214
215         case *StatusResponseMessage:
216                 peerStatus := &initalPeerStatus{
217                         peerID:      src.Key,
218                         height:      msg.Height,
219                         hash:        msg.GetHash(),
220                         genesisHash: msg.GetGenesisHash(),
221                 }
222                 pr.peerStatusCh <- peerStatus
223
224         case *TransactionNotifyMessage:
225                 tx, err := msg.GetTransaction()
226                 if err != nil {
227                         log.Errorf("Error decoding new tx %v", err)
228                         return
229                 }
230                 pr.blockKeeper.AddTx(tx, src.Key)
231
232         case *MineBlockMessage:
233                 block, err := msg.GetMineBlock()
234                 if err != nil {
235                         log.Errorf("Error decoding mined block %v", err)
236                         return
237                 }
238                 // Mark the peer as owning the block and schedule it for import
239                 hash := block.Hash()
240                 pr.peers.MarkBlock(src.Key, &hash)
241                 pr.fetcher.Enqueue(src.Key, block)
242                 pr.peers.SetPeerStatus(src.Key, block.Height, &hash)
243
244         default:
245                 log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
246         }
247 }