OSDN Git Service

Replace the basic log in p2p (#60)
authorLiu-Cheng Xu <xuliuchengxlc@gmail.com>
Fri, 20 Oct 2017 01:34:21 +0000 (09:34 +0800)
committerGuanghua Guo <1536310027@qq.com>
Fri, 20 Oct 2017 01:34:21 +0000 (09:34 +0800)
p2p/connection.go
p2p/peer.go
p2p/peer_test.go
p2p/pex_reactor.go
p2p/switch.go

index 36f15ab..dfdcc42 100644 (file)
@@ -10,6 +10,7 @@ import (
        "sync/atomic"
        "time"
 
+       log "github.com/sirupsen/logrus"
        wire "github.com/tendermint/go-wire"
        cmn "github.com/tendermint/tmlibs/common"
        flow "github.com/tendermint/tmlibs/flowrate"
@@ -177,10 +178,10 @@ func (c *MConnection) String() string {
 }
 
 func (c *MConnection) flush() {
-       c.Logger.Debug("Flush", "conn", c)
+       log.WithField("conn", c).Debug("Flush")
        err := c.bufWriter.Flush()
        if err != nil {
-               c.Logger.Error("MConnection flush failed", "error", err)
+               log.WithField("error", err).Error("MConnection flush failed")
        }
 }
 
@@ -208,12 +209,16 @@ func (c *MConnection) Send(chID byte, msg interface{}) bool {
                return false
        }
 
-       c.Logger.Debug("Send", "channel", chID, "conn", c, "msg", msg) //, "bytes", wire.BinaryBytes(msg))
+       log.WithFields(log.Fields{
+               "chID": chID,
+               "conn": c,
+               "msg":  msg,
+       }).Debug("Send")
 
        // Send message to channel.
        channel, ok := c.channelsIdx[chID]
        if !ok {
-               c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
+               log.WithField("chID", chID).Error(cmn.Fmt("Cannot send bytes, unknown channel"))
                return false
        }
 
@@ -225,7 +230,11 @@ func (c *MConnection) Send(chID byte, msg interface{}) bool {
                default:
                }
        } else {
-               c.Logger.Error("Send failed", "channel", chID, "conn", c, "msg", msg)
+               log.WithFields(log.Fields{
+                       "chID": chID,
+                       "conn": c,
+                       "msg":  msg,
+               }).Error("Send failed")
        }
        return success
 }
@@ -237,12 +246,16 @@ func (c *MConnection) TrySend(chID byte, msg interface{}) bool {
                return false
        }
 
-       c.Logger.Debug("TrySend", "channel", chID, "conn", c, "msg", msg)
+       log.WithFields(log.Fields{
+               "chID": chID,
+               "conn": c,
+               "msg":  msg,
+       }).Debug("TrySend")
 
        // Send message to channel.
        channel, ok := c.channelsIdx[chID]
        if !ok {
-               c.Logger.Error(cmn.Fmt("Cannot send bytes, unknown channel %X", chID))
+               log.WithField("chID", chID).Error(cmn.Fmt("cannot send bytes, unknown channel"))
                return false
        }
 
@@ -267,7 +280,7 @@ func (c *MConnection) CanSend(chID byte) bool {
 
        channel, ok := c.channelsIdx[chID]
        if !ok {
-               c.Logger.Error(cmn.Fmt("Unknown channel %X", chID))
+               log.WithField("chID", chID).Error(cmn.Fmt("Unknown channel"))
                return false
        }
        return channel.canSend()
@@ -291,12 +304,12 @@ FOR_LOOP:
                                channel.updateStats()
                        }
                case <-c.pingTimer.Ch:
-                       c.Logger.Debug("Send Ping")
+                       log.Debug("Send Ping")
                        wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
                        c.sendMonitor.Update(int(n))
                        c.flush()
                case <-c.pong:
-                       c.Logger.Debug("Send Pong")
+                       log.Debug("Send Pong")
                        wire.WriteByte(packetTypePong, c.bufWriter, &n, &err)
                        c.sendMonitor.Update(int(n))
                        c.flush()
@@ -318,7 +331,10 @@ FOR_LOOP:
                        break FOR_LOOP
                }
                if err != nil {
-                       c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "error", err)
+                       log.WithFields(log.Fields{
+                               "conn":  c,
+                               "error": err,
+                       }).Error("Connection failed @ sendRoutine")
                        c.stopForError(err)
                        break FOR_LOOP
                }
@@ -373,7 +389,7 @@ func (c *MConnection) sendMsgPacket() bool {
        // Make & send a msgPacket from this channel
        n, err := leastChannel.writeMsgPacketTo(c.bufWriter)
        if err != nil {
-               c.Logger.Error("Failed to write msgPacket", "error", err)
+               log.WithField("error", err).Error("Failed to write msgPacket")
                c.stopForError(err)
                return true
        }
@@ -415,7 +431,10 @@ FOR_LOOP:
                c.recvMonitor.Update(int(n))
                if err != nil {
                        if c.IsRunning() {
-                               c.Logger.Error("Connection failed @ recvRoutine (reading byte)", "conn", c, "error", err)
+                               log.WithFields(log.Fields{
+                                       "conn":  c,
+                                       "error": err,
+                               }).Error("Connection failed @ recvRoutine (reading byte)")
                                c.stopForError(err)
                        }
                        break FOR_LOOP
@@ -425,18 +444,21 @@ FOR_LOOP:
                switch pktType {
                case packetTypePing:
                        // TODO: prevent abuse, as they cause flush()'s.
-                       c.Logger.Debug("Receive Ping")
+                       log.Debug("Receive Ping")
                        c.pong <- struct{}{}
                case packetTypePong:
                        // do nothing
-                       c.Logger.Debug("Receive Pong")
+                       log.Debug("Receive Pong")
                case packetTypeMsg:
                        pkt, n, err := msgPacket{}, int(0), error(nil)
                        wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err)
                        c.recvMonitor.Update(int(n))
                        if err != nil {
                                if c.IsRunning() {
-                                       c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "error", err)
+                                       log.WithFields(log.Fields{
+                                               "conn":  c,
+                                               "error": err,
+                                       }).Error("Connection failed @ recvRoutine")
                                        c.stopForError(err)
                                }
                                break FOR_LOOP
@@ -448,13 +470,20 @@ FOR_LOOP:
                        msgBytes, err := channel.recvMsgPacket(pkt)
                        if err != nil {
                                if c.IsRunning() {
-                                       c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "error", err)
+                                       log.WithFields(log.Fields{
+                                               "conn":  c,
+                                               "error": err,
+                                       }).Error("Connection failed @ recvRoutine")
                                        c.stopForError(err)
                                }
                                break FOR_LOOP
                        }
                        if msgBytes != nil {
                                c.Logger.Debug("Received bytes", "chID", pkt.ChannelID, "msgBytes", msgBytes)
+                               log.WithFields(log.Fields{
+                                       "channelID": pkt.ChannelID,
+                                       "msgBytes":  msgBytes,
+                               }).Debug("Received bytes")
                                c.onReceive(pkt.ChannelID, msgBytes)
                        }
                default:
@@ -626,7 +655,6 @@ func (ch *Channel) nextMsgPacket() msgPacket {
 // Not goroutine-safe
 func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
        packet := ch.nextMsgPacket()
-       // log.Debug("Write Msg Packet", "conn", ch.conn, "packet", packet)
        wire.WriteByte(packetTypeMsg, w, &n, &err)
        wire.WriteBinary(packet, w, &n, &err)
        if err == nil {
@@ -638,7 +666,6 @@ func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) {
 // Handles incoming msgPackets. Returns a msg bytes if msg is complete.
 // Not goroutine-safe
 func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) {
-       // log.Debug("Read Msg Packet", "conn", ch.conn, "packet", packet)
        if ch.desc.RecvMessageCapacity < len(ch.recving)+len(packet.Bytes) {
                return nil, wire.ErrBinaryReadOverflow
        }
index 2602206..89e0a28 100644 (file)
@@ -7,6 +7,7 @@ import (
        "time"
 
        "github.com/pkg/errors"
+       log "github.com/sirupsen/logrus"
        crypto "github.com/tendermint/go-crypto"
        wire "github.com/tendermint/go-wire"
        cmn "github.com/tendermint/tmlibs/common"
@@ -156,7 +157,7 @@ func (p *Peer) HandshakeTimeout(ourNodeInfo *NodeInfo, timeout time.Duration) er
                func() {
                        var n int
                        wire.ReadBinary(peerNodeInfo, p.conn, maxNodeInfoSize, &n, &err2)
-                       p.Logger.Info("Peer handshake", "peerNodeInfo", peerNodeInfo)
+                       log.WithField("peerNodeInfo", peerNodeInfo).Info("Peer handshake")
                })
        if err1 != nil {
                return errors.Wrap(err1, "Error during handshake/write")
index 0ac7763..fcd48f9 100644 (file)
@@ -1,7 +1,6 @@
 package p2p
 
 import (
-       golog "log"
        "net"
        "testing"
        "time"
@@ -9,6 +8,7 @@ import (
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
 
+       log "github.com/sirupsen/logrus"
        crypto "github.com/tendermint/go-crypto"
 )
 
@@ -116,7 +116,7 @@ func (p *remotePeer) PubKey() crypto.PubKeyEd25519 {
 func (p *remotePeer) Start() {
        l, e := net.Listen("tcp", "127.0.0.1:0") // any available address
        if e != nil {
-               golog.Fatalf("net.Listen tcp :0: %+v", e)
+               log.Fatalf("net.Listen tcp :0: %+v", e)
        }
        p.addr = NewNetAddress(l.Addr())
        p.quit = make(chan struct{})
@@ -131,11 +131,11 @@ func (p *remotePeer) accept(l net.Listener) {
        for {
                conn, err := l.Accept()
                if err != nil {
-                       golog.Fatalf("Failed to accept conn: %+v", err)
+                       log.Fatalf("Failed to accept conn: %+v", err)
                }
                peer, err := newInboundPeerWithConfig(conn, make(map[byte]Reactor), make([]*ChannelDescriptor, 0), func(p *Peer, r interface{}) {}, p.PrivKey, p.Config)
                if err != nil {
-                       golog.Fatalf("Failed to create a peer: %+v", err)
+                       log.Fatalf("Failed to create a peer: %+v", err)
                }
                err = peer.HandshakeTimeout(&NodeInfo{
                        PubKey:  p.PrivKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
@@ -144,7 +144,7 @@ func (p *remotePeer) accept(l net.Listener) {
                        Version: "123.123.123",
                }, 1*time.Second)
                if err != nil {
-                       golog.Fatalf("Failed to perform handshake: %+v", err)
+                       log.Fatalf("Failed to perform handshake: %+v", err)
                }
                select {
                case <-p.quit:
index 269a8d0..75c383f 100644 (file)
@@ -7,6 +7,7 @@ import (
        "reflect"
        "time"
 
+       log "github.com/sirupsen/logrus"
        wire "github.com/tendermint/go-wire"
        cmn "github.com/tendermint/tmlibs/common"
 )
@@ -105,7 +106,10 @@ func (r *PEXReactor) AddPeer(p *Peer) {
                addr, err := NewNetAddressString(p.ListenAddr)
                if err != nil {
                        // this should never happen
-                       r.Logger.Error("Error in AddPeer: invalid peer address", "addr", p.ListenAddr, "error", err)
+                       log.WithFields(log.Fields{
+                               "addr":  p.ListenAddr,
+                               "error": err,
+                       }).Error("Error in AddPeer: Invalid peer address")
                        return
                }
                r.book.AddAddress(addr, addr)
@@ -125,17 +129,17 @@ func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) {
 
        r.IncrementMsgCountForPeer(srcAddrStr)
        if r.ReachedMaxMsgCountForPeer(srcAddrStr) {
-               r.Logger.Error("Maximum number of messages reached for peer", "peer", srcAddrStr)
+               log.WithField("peer", srcAddrStr).Error("Maximum number of messages reached for peer")
                // TODO remove src from peers?
                return
        }
 
        _, msg, err := DecodeMessage(msgBytes)
        if err != nil {
-               r.Logger.Error("Error decoding message", "error", err)
+               log.WithField("error", err).Error("Error decoding message")
                return
        }
-       r.Logger.Info("Received message", "msg", msg)
+       log.WithField("msg", msg).Info("Reveived message")
 
        switch msg := msg.(type) {
        case *pexRequestMessage:
@@ -150,7 +154,7 @@ func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) {
                        }
                }
        default:
-               r.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
+               log.WithField("type", reflect.TypeOf(msg)).Error("Unknown message type")
        }
 }
 
@@ -230,7 +234,11 @@ func (r *PEXReactor) ensurePeersRoutine() {
 func (r *PEXReactor) ensurePeers() {
        numOutPeers, _, numDialing := r.Switch.NumPeers()
        numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
-       r.Logger.Info("Ensure peers", "numOutPeers", numOutPeers, "numDialing", numDialing, "numToDial", numToDial)
+       log.WithFields(log.Fields{
+               "numOutPeers": numOutPeers,
+               "numDialing":  numDialing,
+               "numToDial":   numToDial,
+       }).Info("Ensure peers")
        if numToDial <= 0 {
                return
        }
@@ -257,13 +265,9 @@ func (r *PEXReactor) ensurePeers() {
                        alreadyDialing := r.Switch.IsDialing(try)
                        alreadyConnected := r.Switch.Peers().Has(try.IP.String())
                        if alreadySelected || alreadyDialing || alreadyConnected {
-                               // r.Logger.Info("Cannot dial address", "addr", try,
-                               //      "alreadySelected", alreadySelected,
-                               //      "alreadyDialing", alreadyDialing,
-                               //  "alreadyConnected", alreadyConnected)
                                continue
                        } else {
-                               r.Logger.Info("Will dial address", "addr", try)
+                               log.WithField("addr", try).Info("Will dial address")
                                picked = try
                                break
                        }
@@ -289,7 +293,7 @@ func (r *PEXReactor) ensurePeers() {
                if peers := r.Switch.Peers().List(); len(peers) > 0 {
                        i := rand.Int() % len(peers)
                        peer := peers[i]
-                       r.Logger.Info("No addresses to dial. Sending pexRequest to random peer", "peer", peer)
+                       log.WithField("peer", peer).Info("No addresses to dial. Sending pexRequest to random peer")
                        r.RequestPEX(peer)
                }
        }
index 031a513..0979007 100644 (file)
@@ -8,6 +8,7 @@ import (
        "time"
 
        cfg "github.com/bytom/config"
+       log "github.com/sirupsen/logrus"
        crypto "github.com/tendermint/go-crypto"
        cmn "github.com/tendermint/tmlibs/common"
 )
@@ -238,7 +239,7 @@ func (sw *Switch) AddPeer(peer *Peer) error {
                return err
        }
 
-       sw.Logger.Info("Added peer", "peer", peer)
+       log.WithField("peer", peer).Info("Added peer")
        return nil
 }
 
@@ -309,9 +310,9 @@ func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error {
 func (sw *Switch) dialSeed(addr *NetAddress) {
        peer, err := sw.DialPeerWithAddress(addr, true)
        if err != nil {
-               sw.Logger.Error("Error dialing seed", "error", err)
+               log.WithField("error", err).Error("Error dialing seed")
        } else {
-               sw.Logger.Info("Connected to seed", "peer", peer)
+               log.WithField("peer", peer).Info("Connected to seed")
        }
 }
 
@@ -319,10 +320,13 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer,
        sw.dialing.Set(addr.IP.String(), addr)
        defer sw.dialing.Delete(addr.IP.String())
 
-       sw.Logger.Info("Dialing peer", "address", addr)
+       log.WithField("address", addr).Info("Dialing peer")
        peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
        if err != nil {
-               sw.Logger.Info("Failed to dial peer", "address", addr, "error", err)
+               log.WithFields(log.Fields{
+                       "address": addr,
+                       "error":   err,
+               }).Info("Failed to dial peer")
                return nil, err
        }
        peer.SetLogger(sw.Logger.With("peer", addr))
@@ -331,11 +335,17 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer,
        }
        err = sw.AddPeer(peer)
        if err != nil {
-               sw.Logger.Info("Failed to add peer", "address", addr, "error", err)
+               log.WithFields(log.Fields{
+                       "address": addr,
+                       "error":   err,
+               }).Info("Failed to add peer")
                peer.CloseConn()
                return nil, err
        }
-       sw.Logger.Info("Dialed and added peer", "address", addr, "peer", peer)
+       log.WithFields(log.Fields{
+               "address": addr,
+               "error":   err,
+       }).Info("Dialed and added peer")
        return peer, nil
 }
 
@@ -349,7 +359,10 @@ func (sw *Switch) IsDialing(addr *NetAddress) bool {
 // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
 func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool {
        successChan := make(chan bool, len(sw.peers.List()))
-       sw.Logger.Debug("Broadcast", "channel", chID, "msg", msg)
+       log.WithFields(log.Fields{
+               "chID": chID,
+               "msg":  msg,
+       }).Debug("Broadcast")
        for _, peer := range sw.peers.List() {
                go func(peer *Peer) {
                        success := peer.Send(chID, msg)
@@ -381,12 +394,15 @@ func (sw *Switch) Peers() IPeerSet {
 // TODO: make record depending on reason.
 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
        addr := NewNetAddress(peer.Addr())
-       sw.Logger.Info("Stopping peer for error", "peer", peer, "error", reason)
+       log.WithFields(log.Fields{
+               "peer":  peer,
+               "error": reason,
+       }).Info("Stopping peer due to error")
        sw.stopAndRemovePeer(peer, reason)
 
        if peer.IsPersistent() {
                go func() {
-                       sw.Logger.Info("Reconnecting to peer", "peer", peer)
+                       log.WithField("peer", peer).Info("Reconnecting to peer")
                        for i := 1; i < reconnectAttempts; i++ {
                                if !sw.IsRunning() {
                                        return
@@ -395,15 +411,21 @@ func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
                                peer, err := sw.DialPeerWithAddress(addr, true)
                                if err != nil {
                                        if i == reconnectAttempts {
-                                               sw.Logger.Info("Error reconnecting to peer. Giving up", "tries", i, "error", err)
+                                               log.WithFields(log.Fields{
+                                                       "retries": i,
+                                                       "error":   err,
+                                               }).Info("Error reconnecting to peer. Giving up")
                                                return
                                        }
-                                       sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "error", err)
+                                       log.WithFields(log.Fields{
+                                               "retries": i,
+                                               "error":   err,
+                                       }).Info("Error reconnecting to peer. Trying again")
                                        time.Sleep(reconnectInterval)
                                        continue
                                }
 
-                               sw.Logger.Info("Reconnected to peer", "peer", peer)
+                               log.WithField("peer", peer).Info("Reconnected to peer")
                                return
                        }
                }()
@@ -413,7 +435,7 @@ func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
 // Disconnect from a peer gracefully.
 // TODO: handle graceful disconnects.
 func (sw *Switch) StopPeerGracefully(peer *Peer) {
-       sw.Logger.Info("Stopping peer gracefully")
+       log.Info("Stopping peer gracefully")
        sw.stopAndRemovePeer(peer, nil)
 }
 
@@ -435,14 +457,21 @@ func (sw *Switch) listenerRoutine(l Listener) {
                // ignore connection if we already have enough
                maxPeers := sw.config.MaxNumPeers
                if maxPeers <= sw.peers.Size() {
-                       sw.Logger.Info("Ignoring inbound connection: already have enough peers", "address", inConn.RemoteAddr().String(), "numPeers", sw.peers.Size(), "max", maxPeers)
+                       log.WithFields(log.Fields{
+                               "address":  inConn.RemoteAddr().String(),
+                               "numPeers": sw.peers.Size(),
+                               "max":      maxPeers,
+                       }).Info("Ignoring inbound connection: already have enough peers")
                        continue
                }
 
                // New inbound connection!
                err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig)
                if err != nil {
-                       sw.Logger.Info("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "error", err)
+                       log.WithFields(log.Fields{
+                               "address": inConn.RemoteAddr().String(),
+                               "error":   err,
+                       }).Info("Ignoring inbound connection: error while adding peer")
                        continue
                }