6 log "github.com/sirupsen/logrus"
7 "gopkg.in/fatih/set.v0"
9 "github.com/bytom/errors"
10 "github.com/bytom/p2p"
11 "github.com/bytom/protocol/bc"
12 "github.com/bytom/protocol/bc/types"
16 errClosed = errors.New("peer set is closed")
17 errAlreadyRegistered = errors.New("peer is already registered")
18 errNotRegistered = errors.New("peer is not registered")
21 const defaultVersion = 1
25 version int // Protocol version negotiated
31 knownTxs *set.Set // Set of transaction hashes known to be known by this peer
32 knownBlocks *set.Set // Set of block hashes known to be known by this peer
35 func newPeer(height uint64, hash *bc.Hash, Peer *p2p.Peer) *peer {
37 version: defaultVersion,
42 knownBlocks: set.New(),
46 func (p *peer) GetStatus() (height uint64, hash *bc.Hash) {
49 return p.height, p.hash
52 func (p *peer) SetStatus(height uint64, hash *bc.Hash) {
60 func (p *peer) requestBlockByHash(hash *bc.Hash) error {
61 msg := &BlockRequestMessage{RawHash: hash.Byte32()}
62 p.Peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
66 func (p *peer) requestBlockByHeight(height uint64) error {
67 msg := &BlockRequestMessage{Height: height}
68 p.Peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
72 func (p *peer) SendTransactions(txs []*types.Tx) error {
73 for _, tx := range txs {
74 msg, err := NewTransactionNotifyMessage(tx)
76 return errors.New("Failed construction tx msg")
79 p.knownTxs.Add(hash.String())
80 p.Peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
85 func (p *peer) getPeer() *p2p.Peer {
92 // MarkTransaction marks a transaction as known for the peer, ensuring that it
93 // will never be propagated to this particular peer.
94 func (p *peer) MarkTransaction(hash *bc.Hash) {
98 // If we reached the memory allowance, drop a previously known transaction hash
99 for p.knownTxs.Size() >= maxKnownTxs {
102 p.knownTxs.Add(hash.String())
105 // MarkBlock marks a block as known for the peer, ensuring that the block will
106 // never be propagated to this particular peer.
107 func (p *peer) MarkBlock(hash *bc.Hash) {
111 // If we reached the memory allowance, drop a previously known block hash
112 for p.knownBlocks.Size() >= maxKnownBlocks {
115 p.knownBlocks.Add(hash.String())
118 type peerSet struct {
119 peers map[string]*peer
124 // newPeerSet creates a new peer set to track the active participants.
125 func newPeerSet() *peerSet {
127 peers: make(map[string]*peer),
131 // Register injects a new peer into the working set, or returns an error if the
132 // peer is already known.
133 func (ps *peerSet) Register(p *peer) error {
135 defer ps.lock.Unlock()
140 if _, ok := ps.peers[p.id]; ok {
141 return errAlreadyRegistered
147 // Unregister removes a remote peer from the active set, disabling any further
148 // actions to/from that particular entity.
149 func (ps *peerSet) Unregister(id string) error {
151 defer ps.lock.Unlock()
153 if _, ok := ps.peers[id]; !ok {
154 return errNotRegistered
160 func (ps *peerSet) DropPeer(id string) error {
162 defer ps.lock.Unlock()
164 peer, ok := ps.peers[id]
166 return errNotRegistered
172 // Peer retrieves the registered peer with the given id.
173 func (ps *peerSet) Peer(id string) *peer {
175 defer ps.lock.RUnlock()
180 // Len returns if the current number of peers in the set.
181 func (ps *peerSet) Len() int {
183 defer ps.lock.RUnlock()
188 // MarkTransaction marks a transaction as known for the peer, ensuring that it
189 // will never be propagated to this particular peer.
190 func (ps *peerSet) MarkTransaction(peerID string, hash *bc.Hash) {
192 defer ps.lock.RUnlock()
194 if peer, ok := ps.peers[peerID]; ok {
195 peer.MarkTransaction(hash)
199 // MarkBlock marks a block as known for the peer, ensuring that the block will
200 // never be propagated to this particular peer.
201 func (ps *peerSet) MarkBlock(peerID string, hash *bc.Hash) {
203 defer ps.lock.RUnlock()
205 if peer, ok := ps.peers[peerID]; ok {
210 // PeersWithoutBlock retrieves a list of peers that do not have a given block in
211 // their set of known hashes.
212 func (ps *peerSet) PeersWithoutBlock(hash *bc.Hash) []*peer {
214 defer ps.lock.RUnlock()
216 list := make([]*peer, 0, len(ps.peers))
217 for _, p := range ps.peers {
218 if !p.knownBlocks.Has(hash.String()) {
219 list = append(list, p)
225 // PeersWithoutTx retrieves a list of peers that do not have a given transaction
226 // in their set of known hashes.
227 func (ps *peerSet) PeersWithoutTx(hash *bc.Hash) []*peer {
229 defer ps.lock.RUnlock()
231 list := make([]*peer, 0, len(ps.peers))
232 for _, p := range ps.peers {
233 if !p.knownTxs.Has(hash.String()) {
234 list = append(list, p)
240 // BestPeer retrieves the known peer with the currently highest total difficulty.
241 func (ps *peerSet) BestPeer() (*p2p.Peer, uint64) {
243 defer ps.lock.RUnlock()
245 var bestPeer *p2p.Peer
246 var bestHeight uint64
248 for _, p := range ps.peers {
249 if bestPeer == nil || p.height > bestHeight {
250 bestPeer, bestHeight = p.Peer, p.height
254 return bestPeer, bestHeight
257 // Close disconnects all peers.
258 // No new peers can be registered after Close has returned.
259 func (ps *peerSet) Close() {
261 defer ps.lock.Unlock()
263 for _, p := range ps.peers {
269 func (ps *peerSet) AddPeer(peer *p2p.Peer) {
271 defer ps.lock.Unlock()
273 if _, ok := ps.peers[peer.Key]; !ok {
274 keeperPeer := newPeer(0, nil, peer)
275 ps.peers[peer.Key] = keeperPeer
276 log.WithFields(log.Fields{"ID": peer.Key}).Info("Add new peer to blockKeeper")
279 log.WithField("ID", peer.Key).Warning("Add existing peer to blockKeeper")
282 func (ps *peerSet) RemovePeer(peerID string) {
284 defer ps.lock.Unlock()
286 delete(ps.peers, peerID)
287 log.WithField("ID", peerID).Info("Delete peer from peerset")
290 func (ps *peerSet) SetPeerStatus(peerID string, height uint64, hash *bc.Hash) {
292 defer ps.lock.Unlock()
294 if peer, ok := ps.peers[peerID]; ok {
295 peer.SetStatus(height, hash)
299 func (ps *peerSet) requestBlockByHash(peerID string, hash *bc.Hash) error {
301 defer ps.lock.Unlock()
303 peer, ok := ps.peers[peerID]
305 return errors.New("Can't find peer. ")
307 return peer.requestBlockByHash(hash)
310 func (ps *peerSet) requestBlockByHeight(peerID string, height uint64) error {
312 defer ps.lock.Unlock()
314 peer, ok := ps.peers[peerID]
316 return errors.New("Can't find peer. ")
318 return peer.requestBlockByHeight(height)
321 func (ps *peerSet) BroadcastMinedBlock(block *types.Block) error {
322 msg, err := NewMinedBlockMessage(block)
324 return errors.New("Failed construction block msg")
327 peers := ps.PeersWithoutBlock(&hash)
328 for _, peer := range peers {
329 ps.MarkBlock(peer.Key, &hash)
330 peer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg})
335 func (ps *peerSet) BroadcastTx(tx *types.Tx) error {
336 msg, err := NewTransactionNotifyMessage(tx)
338 return errors.New("Failed construction tx msg")
340 peers := ps.PeersWithoutTx(&tx.ID)
341 for _, peer := range peers {
342 ps.peers[peer.Key].MarkTransaction(&tx.ID)
343 peer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg})