OSDN Git Service

Merge branch 'dev' into dev-verify
[bytom/bytom.git] / p2p / connection.go
index ec43e4a..0b966f0 100644 (file)
@@ -79,8 +79,8 @@ type MConnection struct {
 
        quit         chan struct{}
        flushTimer   *cmn.ThrottleTimer // flush writes as necessary but throttled.
-       pingTimer    *cmn.RepeatTimer   // send pings periodically
-       chStatsTimer *cmn.RepeatTimer   // update channel stats periodically
+       pingTimer    *time.Ticker       // send pings periodically
+       chStatsTimer *time.Ticker       // update channel stats periodically
 
        LocalAddress  *NetAddress
        RemoteAddress *NetAddress
@@ -124,6 +124,9 @@ func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onRec
                onError:     onError,
                config:      config,
 
+               pingTimer:    time.NewTicker(pingTimeout),
+               chStatsTimer: time.NewTicker(updateState),
+
                LocalAddress:  NewNetAddress(conn.LocalAddr()),
                RemoteAddress: NewNetAddress(conn.RemoteAddr()),
        }
@@ -150,8 +153,6 @@ func (c *MConnection) OnStart() error {
        c.BaseService.OnStart()
        c.quit = make(chan struct{})
        c.flushTimer = cmn.NewThrottleTimer("flush", flushThrottle)
-       c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout)
-       c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateState)
        go c.sendRoutine()
        go c.recvRoutine()
        return nil
@@ -160,8 +161,6 @@ func (c *MConnection) OnStart() error {
 func (c *MConnection) OnStop() {
        c.BaseService.OnStop()
        c.flushTimer.Stop()
-       c.pingTimer.Stop()
-       c.chStatsTimer.Stop()
        if c.quit != nil {
                close(c.quit)
        }
@@ -299,11 +298,11 @@ FOR_LOOP:
                        // NOTE: flushTimer.Set() must be called every time
                        // something is written to .bufWriter.
                        c.flush()
-               case <-c.chStatsTimer.Ch:
+               case <-c.chStatsTimer.C:
                        for _, channel := range c.channels {
                                channel.updateStats()
                        }
-               case <-c.pingTimer.Ch:
+               case <-c.pingTimer.C:
                        log.Debug("Send Ping")
                        wire.WriteByte(packetTypePing, c.bufWriter, &n, &err)
                        c.sendMonitor.Update(int(n))
@@ -412,15 +411,15 @@ FOR_LOOP:
                /*
                        // Peek into bufReader for debugging
                        if numBytes := c.bufReader.Buffered(); numBytes > 0 {
-                               log.Info("Peek connection buffer", "numBytes", numBytes, "bytes", log15.Lazy{func() []byte {
-                                       bytes, err := c.bufReader.Peek(MinInt(numBytes, 100))
-                                       if err == nil {
-                                               return bytes
-                                       } else {
-                                               log.Warn("Error peeking connection buffer", "error", err)
-                                               return nil
-                                       }
-                               }})
+                               log.Infof("Peek connection buffer numBytes:", numBytes)
+                               bytes, err := c.bufReader.Peek(cmn.MinInt(numBytes, 100))
+                               if err == nil {
+                                       log.Infof("bytes:", bytes)
+                               } else {
+                                       log.Warning("Error peeking connection buffer err:", err)
+                               }
+                       } else {
+                               log.Warning("Received bytes number is:", numBytes)
                        }
                */
 
@@ -435,6 +434,7 @@ FOR_LOOP:
                                        "conn":  c,
                                        "error": err,
                                }).Error("Connection failed @ recvRoutine (reading byte)")
+                               c.conn.Close()
                                c.stopForError(err)
                        }
                        break FOR_LOOP
@@ -465,7 +465,11 @@ FOR_LOOP:
                        }
                        channel, ok := c.channelsIdx[pkt.ChannelID]
                        if !ok || channel == nil {
-                               cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
+                               if pkt.ChannelID == PexChannel {
+                                       continue
+                               } else {
+                                       cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
+                               }
                        }
                        msgBytes, err := channel.recvMsgPacket(pkt)
                        if err != nil {
@@ -488,10 +492,6 @@ FOR_LOOP:
                default:
                        cmn.PanicSanity(cmn.Fmt("Unknown message type %X", pktType))
                }
-
-               // TODO: shouldn't this go in the sendRoutine?
-               // Better to send a ping packet when *we* haven't sent anything for a while.
-               c.pingTimer.Reset()
        }
 
        // Cleanup