"golang.org/x/crypto/sha3"
)
+func DoubleSha256(b []byte) []byte {
+ hasher := sha3.New256()
+ hasher.Write(b)
+ sum := hasher.Sum(nil)
+ hasher.Reset()
+ hasher.Write(sum)
+ return hasher.Sum(nil)
+}
+
func Sha256(data ...[]byte) []byte {
d := sha3.New256()
for _, b := range data {
cfg "github.com/bytom/config"
"github.com/bytom/p2p"
+ "github.com/bytom/p2p/pex"
core "github.com/bytom/protocol"
"github.com/bytom/protocol/bc"
"github.com/bytom/version"
}
trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
- manager.sw = p2p.NewSwitch(config.P2P, trustHistoryDB)
+ addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
+ manager.sw = p2p.NewSwitch(config.P2P, addrBook, trustHistoryDB)
+
+ pexReactor := pex.NewPEXReactor(addrBook)
+ manager.sw.AddReactor("PEX", pexReactor)
manager.blockKeeper = newBlockKeeper(manager.chain, manager.sw, manager.peers, manager.dropPeerCh)
manager.fetcher = NewFetcher(chain, manager.sw, manager.peers)
-
protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh)
manager.sw.AddReactor("PROTOCOL", protocolReactor)
}
channel, ok := c.channelsIdx[pkt.ChannelID]
if !ok || channel == nil {
- if pkt.ChannelID == PexChannel {
- continue
- } else {
- cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
- }
+ cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
}
msgBytes, err := channel.recvMsgPacket(pkt)
if err != nil {
"time"
log "github.com/sirupsen/logrus"
- "github.com/tendermint/go-crypto"
+ tcrypto "github.com/tendermint/go-crypto"
cmn "github.com/tendermint/tmlibs/common"
+
+ "github.com/bytom/crypto"
+ "github.com/bytom/p2p"
)
// AddrBook - concurrency safe peer address manager.
mtx sync.RWMutex
rand *rand.Rand
- ourAddrs map[string]*NetAddress
+ ourAddrs map[string]*p2p.NetAddress
addrLookup map[string]*knownAddress // new & old
bucketsNew []map[string]*knownAddress
bucketsOld []map[string]*knownAddress
a := &AddrBook{
filePath: filePath,
routabilityStrict: routabilityStrict,
- key: crypto.CRandHex(24),
+ key: tcrypto.CRandHex(24),
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
- ourAddrs: make(map[string]*NetAddress),
+ ourAddrs: make(map[string]*p2p.NetAddress),
addrLookup: make(map[string]*knownAddress),
bucketsNew: make([]map[string]*knownAddress, newBucketCount),
bucketsOld: make([]map[string]*knownAddress, oldBucketCount),
}
// AddAddress add address to address book
-func (a *AddrBook) AddAddress(addr, src *NetAddress) error {
+func (a *AddrBook) AddAddress(addr, src *p2p.NetAddress) error {
a.mtx.Lock()
defer a.mtx.Unlock()
return a.addAddress(addr, src)
}
+// AddOurAddress one of our addresses.
+func (a *AddrBook) AddOurAddress(addr *p2p.NetAddress) {
+ a.mtx.Lock()
+ defer a.mtx.Unlock()
+
+ a.ourAddrs[addr.String()] = addr
+}
+
// GetSelection randomly selects some addresses (old & new). Suitable for peer-exchange protocols.
-func (a *AddrBook) GetSelection() []*NetAddress {
+func (a *AddrBook) GetSelection() []*p2p.NetAddress {
a.mtx.RLock()
defer a.mtx.RUnlock()
numAddresses := cmn.MaxInt(cmn.MinInt(minGetSelection, bookSize), bookSize*getSelectionPercent/100)
numAddresses = cmn.MinInt(maxGetSelection, numAddresses)
- allAddr := []*NetAddress{}
+ allAddr := []*p2p.NetAddress{}
for _, ka := range a.addrLookup {
allAddr = append(allAddr, ka.Addr)
}
}
// MarkGood marks the peer as good and moves it into an "old" bucket.
-func (a *AddrBook) MarkGood(addr *NetAddress) {
+func (a *AddrBook) MarkGood(addr *p2p.NetAddress) {
a.mtx.Lock()
defer a.mtx.Unlock()
}
// MarkAttempt marks that an attempt was made to connect to the address.
-func (a *AddrBook) MarkAttempt(addr *NetAddress) {
+func (a *AddrBook) MarkAttempt(addr *p2p.NetAddress) {
a.mtx.Lock()
defer a.mtx.Unlock()
}
// PickAddress picks a random address from random bucket
-func (a *AddrBook) PickAddress(bias int) *NetAddress {
+func (a *AddrBook) PickAddress(bias int) *p2p.NetAddress {
a.mtx.RLock()
defer a.mtx.RUnlock()
}
// RemoveAddress removes the address from the book.
-func (a *AddrBook) RemoveAddress(addr *NetAddress) {
+func (a *AddrBook) RemoveAddress(addr *p2p.NetAddress) {
a.mtx.Lock()
defer a.mtx.Unlock()
return a.size()
}
-func (a *AddrBook) addAddress(addr, src *NetAddress) error {
+func (a *AddrBook) addAddress(addr, src *p2p.NetAddress) error {
if addr == nil || src == nil {
return errors.New("can't add nil to address book")
}
return nil
}
-func (a *AddrBook) calcNewBucket(addr, src *NetAddress) int {
+func (a *AddrBook) calcNewBucket(addr, src *p2p.NetAddress) int {
data1 := []byte{}
data1 = append(data1, []byte(a.key)...)
data1 = append(data1, []byte(a.groupKey(addr))...)
data1 = append(data1, []byte(a.groupKey(src))...)
- hash1 := doubleSha256(data1)
+ hash1 := crypto.DoubleSha256(data1)
hash64 := binary.BigEndian.Uint64(hash1)
hash64 %= newBucketsPerGroup
var hashbuf [8]byte
data2 = append(data2, a.groupKey(src)...)
data2 = append(data2, hashbuf[:]...)
- hash2 := doubleSha256(data2)
+ hash2 := crypto.DoubleSha256(data2)
return int(binary.BigEndian.Uint64(hash2) % newBucketCount)
}
-func (a *AddrBook) calcOldBucket(addr *NetAddress) int {
+func (a *AddrBook) calcOldBucket(addr *p2p.NetAddress) int {
data1 := []byte{}
data1 = append(data1, []byte(a.key)...)
data1 = append(data1, []byte(addr.String())...)
- hash1 := doubleSha256(data1)
+ hash1 := crypto.DoubleSha256(data1)
hash64 := binary.BigEndian.Uint64(hash1)
hash64 %= oldBucketsPerGroup
var hashbuf [8]byte
data2 = append(data2, a.groupKey(addr)...)
data2 = append(data2, hashbuf[:]...)
- hash2 := doubleSha256(data2)
+ hash2 := crypto.DoubleSha256(data2)
return int(binary.BigEndian.Uint64(hash2) % oldBucketCount)
}
func (a *AddrBook) expireNew(bucketIdx int) {
- for addrStr, ka := range a.bucketsNew[bucketIdx] {
+ for _, ka := range a.bucketsNew[bucketIdx] {
if ka.isBad() {
- a.removeFromBucket(ka, bucketTypeNew, bucketIdx)
+ a.removeFromBucket(ka, bucketIdx)
return
}
}
}
}
-func (a *AddrBook) groupKey(na *NetAddress) string {
+func (a *AddrBook) groupKey(na *p2p.NetAddress) string {
if a.routabilityStrict && na.Local() {
return "local"
}
delete(bucket, ka.Addr.String())
if ka.removeBucketRef(bucketIdx) == 0 {
delete(a.addrLookup, ka.Addr.String())
- if ka.bucketType == bucketTypeNew {
+ if ka.BucketType == bucketTypeNew {
a.nNew--
} else {
a.nOld--
"encoding/json"
"os"
"time"
+
+ log "github.com/sirupsen/logrus"
+ cmn "github.com/tendermint/tmlibs/common"
)
type addrBookJSON struct {
Addrs []*knownAddress
}
-func (a *AddrBook) saveToFile() error {
+func (a *AddrBook) SaveToFile() error {
a.mtx.RLock()
defer a.mtx.RUnlock()
r, err := os.Open(a.filePath)
if err != nil {
- return error
+ return err
}
defer r.Close()
for {
select {
case <-ticker.C:
- if err := a.saveToFile(); err != nil {
+ if err := a.SaveToFile(); err != nil {
log.WithField("err", err).Error("failed to save AddrBook to file")
}
case <-a.Quit:
- a.saveToFile()
+ a.SaveToFile()
return
}
}
"fmt"
wire "github.com/tendermint/go-wire"
+
+ "github.com/bytom/p2p"
)
const (
func (m *pexRequestMessage) String() string { return "[pexRequest]" }
type pexAddrsMessage struct {
- Addrs []*NetAddress
+ Addrs []*p2p.NetAddress
}
func (m *pexAddrsMessage) String() string { return fmt.Sprintf("[pexAddrs %v]", m.Addrs) }
log "github.com/sirupsen/logrus"
cmn "github.com/tendermint/tmlibs/common"
+
+ "github.com/bytom/p2p"
)
const (
// PEXReactor handles peer exchange and ensures that an adequate number of peers are connected to the switch.
type PEXReactor struct {
- BaseReactor
+ p2p.BaseReactor
book *AddrBook
msgCountByPeer *cmn.CMap
}
book: b,
msgCountByPeer: cmn.NewCMap(),
}
- r.BaseReactor = *NewBaseReactor("PEXReactor", r)
+ r.BaseReactor = *p2p.NewBaseReactor("PEXReactor", r)
return r
}
}
// GetChannels implements Reactor
-func (r *PEXReactor) GetChannels() []*ChannelDescriptor {
- return []*ChannelDescriptor{&ChannelDescriptor{
+func (r *PEXReactor) GetChannels() []*p2p.ChannelDescriptor {
+ return []*p2p.ChannelDescriptor{&p2p.ChannelDescriptor{
ID: PexChannel,
Priority: 1,
SendQueueCapacity: 10,
}
// AddPeer adding peer to the address book
-func (r *PEXReactor) AddPeer(p *Peer) error {
+func (r *PEXReactor) AddPeer(p *p2p.Peer) error {
if p.IsOutbound() {
if r.book.NeedMoreAddrs() && !r.RequestAddrs(p) {
return errors.New("Send pex message fail")
return nil
}
- addr, err := NewNetAddressString(p.ListenAddr)
+ addr, err := p2p.NewNetAddressString(p.ListenAddr)
if err != nil {
return errors.New("addPeer: invalid peer address")
}
r.book.AddAddress(addr, addr)
- if r.Switch.peers.Size() >= r.Switch.config.MaxNumPeers {
+ if r.Switch.Peers().Size() >= r.Switch.Config.MaxNumPeers {
if r.SendAddrs(p, r.book.GetSelection()) {
<-time.After(1 * time.Second)
r.Switch.StopPeerGracefully(p)
}
// Receive implements Reactor by handling incoming PEX messages.
-func (r *PEXReactor) Receive(chID byte, p *Peer, rawMsg []byte) {
+func (r *PEXReactor) Receive(chID byte, p *p2p.Peer, rawMsg []byte) {
srcAddr := p.Connection().RemoteAddress
srcAddrStr := srcAddr.String()
r.incrementMsgCount(srcAddrStr)
switch msg := msg.(type) {
case *pexRequestMessage:
- if !r.SendAddrs(src, r.book.GetSelection()) {
+ if !r.SendAddrs(p, r.book.GetSelection()) {
log.Error("failed to send pex address message")
}
}
// RemovePeer implements Reactor.
-func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) {}
+func (r *PEXReactor) RemovePeer(p *p2p.Peer, reason interface{}) {}
// RequestPEX asks peer for more addresses.
-func (r *PEXReactor) RequestAddrs(p *Peer) bool {
+func (r *PEXReactor) RequestAddrs(p *p2p.Peer) bool {
ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
if !ok {
r.Switch.StopPeerGracefully(p)
}
// SendAddrs sends addrs to the peer.
-func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) bool {
+func (r *PEXReactor) SendAddrs(p *p2p.Peer, addrs []*p2p.NetAddress) bool {
ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
if !ok {
r.Switch.StopPeerGracefully(p)
return ok
}
-func (r *PEXReactor) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
- if _, err := r.Switch.DialPeerWithAddress(a, false); err != nil {
+func (r *PEXReactor) dialPeerWorker(a *p2p.NetAddress, wg *sync.WaitGroup) {
+ if err := r.Switch.DialPeerWithAddress(a); err != nil {
r.book.MarkAttempt(a)
} else {
r.book.MarkGood(a)
}
newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
- toDial := make(map[string]*NetAddress)
+ toDial := make(map[string]*p2p.NetAddress)
maxAttempts := numToDial * 3
+
+ connectedPeers := make(map[string]struct{})
+ for _, peer := range r.Switch.Peers().List() {
+ connectedPeers[peer.RemoteAddrHost()] = struct{}{}
+ }
+
for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ {
try := r.book.PickAddress(newBias)
if try == nil {
if dialling := r.Switch.IsDialing(try); dialling {
continue
}
- if connected := r.Switch.Peers().Has(try.ID); connected {
+ if _, ok := connectedPeers[try.IP.String()]; ok {
continue
}
case <-ticker.C:
r.ensurePeers()
case <-quickTicker.C:
- if r.Switch.peers.Size() < 3 {
+ if r.Switch.Peers().Size() < 3 {
r.ensurePeers()
}
case <-r.Quit:
ErrConnectBannedPeer = errors.New("Connect banned peer")
)
+// An AddrBook represents an address book from the pex package, which is used to store peer addresses.
+type AddrBook interface {
+ AddAddress(*NetAddress, *NetAddress) error
+ AddOurAddress(*NetAddress)
+ MarkGood(*NetAddress)
+ RemoveAddress(*NetAddress)
+ SaveToFile() error
+}
+
//-----------------------------------------------------------------------------
// Switch handles peer connections and exposes an API to receive incoming messages
type Switch struct {
cmn.BaseService
- config *cfg.P2PConfig
+ Config *cfg.P2PConfig
peerConfig *PeerConfig
listeners []Listener
reactors map[string]Reactor
dialing *cmn.CMap
nodeInfo *NodeInfo // our node info
nodePrivKey crypto.PrivKeyEd25519 // our node privkey
- addrBook *AddrBook
+ addrBook AddrBook
bannedPeer map[string]time.Time
db dbm.DB
mtx sync.Mutex
}
// NewSwitch creates a new Switch with the given config.
-func NewSwitch(config *cfg.P2PConfig, trustHistoryDB dbm.DB) *Switch {
+func NewSwitch(config *cfg.P2PConfig, addrBook AddrBook, trustHistoryDB dbm.DB) *Switch {
sw := &Switch{
- config: config,
+ Config: config,
peerConfig: DefaultPeerConfig(config),
reactors: make(map[string]Reactor),
chDescs: make([]*ChannelDescriptor, 0),
peers: NewPeerSet(),
dialing: cmn.NewCMap(),
nodeInfo: nil,
+ addrBook: addrBook,
db: trustHistoryDB,
}
sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
-
- // Optionally, start the pex reactor
- if config.PexReactor {
- sw.addrBook = NewAddrBook(config.AddrBookFile(), config.AddrBookStrict)
- pexReactor := NewPEXReactor(sw.addrBook, sw)
- sw.AddReactor("PEX", pexReactor)
- }
-
sw.bannedPeer = make(map[string]time.Time)
if datajson := sw.db.Get([]byte(bannedPeerKey)); datajson != nil {
if err := json.Unmarshal(datajson, &sw.bannedPeer); err != nil {
sw.addrBook.AddAddress(netAddr, ourAddr)
}
- sw.addrBook.Save()
+ sw.addrBook.SaveToFile()
}
//permute the list, dial them in random order.
// disconnect if we alrady have 2 * MaxNumPeers, we do this because we wanna address book get exchanged even if
// the connect is full. The pex will disconnect the peer after address exchange, the max connected peer won't
// be double of MaxNumPeers
- if sw.peers.Size() >= sw.config.MaxNumPeers*2 {
+ if sw.peers.Size() >= sw.Config.MaxNumPeers*2 {
inConn.Close()
log.Info("Ignoring inbound connection: already have enough peers.")
continue
}
func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
- peerConn, err := newInboundPeerConn(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.config)
+ peerConn, err := newInboundPeerConn(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.Config)
if err != nil {
conn.Close()
return err
privKey := crypto.GenPrivKeyEd25519()
// new switch, add reactors
// TODO: let the config be passed in?
- s := initSwitch(i, NewSwitch(cfg, nil))
+ s := initSwitch(i, NewSwitch(cfg, nil, nil))
s.SetNodeInfo(&NodeInfo{
PubKey: privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
Moniker: cmn.Fmt("switch%d", i),
+++ /dev/null
-package p2p
-
-import (
- "crypto/sha256"
-)
-
-// doubleSha256 calculates sha256(sha256(b)) and returns the resulting bytes.
-func doubleSha256(b []byte) []byte {
- hasher := sha256.New()
- hasher.Write(b)
- sum := hasher.Sum(nil)
- hasher.Reset()
- hasher.Write(sum)
- return hasher.Sum(nil)
-}