OSDN Git Service

Introduce new backend libpq functions which send data to client
authorMasaoFujii <masao.fujii@gmail.com>
Tue, 30 Nov 2010 01:47:15 +0000 (10:47 +0900)
committerMasaoFujii <masao.fujii@gmail.com>
Tue, 30 Nov 2010 11:17:15 +0000 (20:17 +0900)
if writable without blocking.

These functions are required to introduce the replication timeout
to walsender process.

src/backend/libpq/pqcomm.c
src/backend/port/win32/socket.c
src/include/libpq/libpq.h

index 8535066..b2454b4 100644 (file)
@@ -56,6 +56,8 @@
  *             pq_putbytes             - send bytes to connection (not flushed until pq_flush)
  *             pq_flush                - flush pending output
  *             pq_getbyte_if_available - get a byte if available without blocking
+ *             pq_putbytes_if_writable - send bytes to connection if writable without blocking
+ *             pq_flush_if_writable    - flush pending output if writable without blocking
  *
  * message-level I/O (and old-style-COPY-OUT cruft):
  *             pq_putmessage   - send a normal message (suppressed in COPY OUT mode)
@@ -112,6 +114,7 @@ static char sock_path[MAXPGPATH];
 
 static char PqSendBuffer[PQ_BUFFER_SIZE];
 static int     PqSendPointer;          /* Next index to store a byte in PqSendBuffer */
+static int     PqSendStart;            /* Next index to send a byte in PqSendBuffer */
 
 static char PqRecvBuffer[PQ_BUFFER_SIZE];
 static int     PqRecvPointer;          /* Next index to read a byte from PqRecvBuffer */
@@ -1153,6 +1156,56 @@ internal_putbytes(const char *s, size_t len)
 }
 
 /* --------------------------------
+ *             pq_putbytes_if_writable - send bytes to connection (not flushed
+ *                     until pq_flush), if writable
+ *
+ * Returns the number of bytes written without blocking, or EOF if trouble.
+ * --------------------------------
+ */
+int
+pq_putbytes_if_writable(const char *s, size_t len)
+{
+       size_t          amount;
+       size_t          nwritten = 0;
+
+       /* Should not be called by old-style COPY OUT */
+       Assert(!DoingCopyOut);
+       /* No-op if reentrant call */
+       if (PqCommBusy)
+               return 0;
+       PqCommBusy = true;
+
+       while (len > 0)
+       {
+               /* If buffer is full, then flush it out */
+               if (PqSendPointer >= PQ_BUFFER_SIZE)
+               {
+                       int             r;
+
+                       r = pq_flush_if_writable();
+                       if (r == 0)
+                               break;
+                       if (r == EOF)
+                       {
+                               PqCommBusy = false;
+                               return r;
+                       }
+               }
+               amount = PQ_BUFFER_SIZE - PqSendPointer;
+               if (amount > len)
+                       amount = len;
+               memcpy(PqSendBuffer + PqSendPointer, s, amount);
+               PqSendPointer += amount;
+               s += amount;
+               len -= amount;
+               nwritten += amount;
+       }
+
+       PqCommBusy = false;
+       return (int) nwritten;
+}
+
+/* --------------------------------
  *             pq_flush                - flush pending output
  *
  *             returns 0 if OK, EOF if trouble
@@ -1224,6 +1277,120 @@ internal_flush(void)
        return 0;
 }
 
+/* --------------------------------
+ *             pq_flush_if_writable - flush pending output if writable
+ *
+ * Returns 1 if OK, 0 if pending output cannot be written without blocking,
+ * or EOF if trouble.
+ * --------------------------------
+ */
+int
+pq_flush_if_writable(void)
+{
+       static int      last_reported_send_errno = 0;
+
+       char       *bufptr = PqSendBuffer + PqSendStart;
+       char       *bufend = PqSendBuffer + PqSendPointer;
+
+       while (bufptr < bufend)
+       {
+               int                     r;
+
+               /* Temporarily put the socket into non-blocking mode */
+#ifdef WIN32
+               pgwin32_noblock = 1;
+#else
+               if (!pg_set_noblock(MyProcPort->sock))
+                       ereport(ERROR,
+                                       (errmsg("could not set socket to non-blocking mode: %m")));
+#endif
+               MyProcPort->noblock = true;
+               PG_TRY();
+               {
+                       r = secure_write(MyProcPort, bufptr, bufend - bufptr);
+
+                       if (r < 0)
+                       {
+                               /*
+                                * Ok if no data writable without blocking or interrupted (though
+                                * EINTR really shouldn't happen with a non-blocking socket).
+                                * Report other errors.
+                                */
+                               if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
+                                       r = 0;
+                               else
+                               {
+                                       if (errno != last_reported_send_errno)
+                                       {
+                                               /*
+                                                * Careful: an ereport() that tries to write to the
+                                                * client would cause recursion to here, leading to
+                                                * stack overflow and core dump!  This message must
+                                                * go *only* to the postmaster log.
+                                                *
+                                                * If a client disconnects while we're in the midst
+                                                * of output, we might write quite a bit of data before
+                                                * we get to a safe query abort point.  So, suppress
+                                                * duplicate log messages.
+                                                */
+                                               last_reported_send_errno = errno;
+                                               ereport(COMMERROR,
+                                                               (errcode_for_socket_access(),
+                                                                errmsg("could not send data to client: %m")));
+                                       }
+
+                                       /*
+                                        * We drop the buffered data anyway so that processing can
+                                        * continue, even though we'll probably quit soon.
+                                        */
+                                       PqSendStart = PqSendPointer = 0;
+                                       r = EOF;
+                               }
+                       }
+                       else if (r == 0)
+                       {
+                               /* EOF detected */
+                               r = EOF;
+                       }
+               }
+               PG_CATCH();
+               {
+                       /*
+                        * The rest of the backend code assumes the socket is in blocking
+                        * mode, so treat failure as FATAL.
+                        */
+#ifdef WIN32
+                       pgwin32_noblock = 0;
+#else
+                       if (!pg_set_block(MyProcPort->sock))
+                               ereport(FATAL,
+                                               (errmsg("could not set socket to blocking mode: %m")));
+#endif
+                       MyProcPort->noblock = false;
+                       PG_RE_THROW();
+               }
+               PG_END_TRY();
+#ifdef WIN32
+               pgwin32_noblock = 0;
+#else
+               if (!pg_set_block(MyProcPort->sock))
+                       ereport(FATAL,
+                                       (errmsg("could not set socket to blocking mode: %m")));
+#endif
+               MyProcPort->noblock = false;
+
+               if (r == 0 || r == EOF)
+                       return r;
+
+               last_reported_send_errno = 0;   /* reset after any successful send */
+               bufptr += r;
+               PqSendStart += r;
+       }
+
+       PqSendStart = PqSendPointer = 0;
+       return 1;
+}
+
 
 /* --------------------------------
  * Message-level I/O routines begin here.
index 6c7d327..db92c47 100644 (file)
@@ -14,7 +14,8 @@
 #include "postgres.h"
 
 /*
- * Indicate if pgwin32_recv() should operate in non-blocking mode.
+ * Indicate if pgwin32_recv() and pgwin32_send() should operate
+ * in non-blocking mode.
  *
  * Since the socket emulation layer always sets the actual socket to
  * non-blocking mode in order to be able to deliver signals, we must
@@ -399,6 +400,16 @@ pgwin32_send(SOCKET s, char *buf, int len, int flags)
                        return -1;
                }
 
+               if (pgwin32_noblock)
+               {
+                       /*
+                        * No data sent, and we are in "emulated non-blocking mode", so
+                        * return indicating that we'd block if we were to continue.
+                        */
+                       errno = EWOULDBLOCK;
+                       return -1;
+               }
+
                /* No error, zero bytes (win2000+) or error+WSAEWOULDBLOCK (<=nt4) */
 
                if (pgwin32_waitforsinglesocket(s, FD_WRITE | FD_CLOSE, INFINITE) == 0)
index 978d9a9..fb3277f 100644 (file)
@@ -59,7 +59,9 @@ extern int    pq_getbyte(void);
 extern int     pq_peekbyte(void);
 extern int     pq_getbyte_if_available(unsigned char *c);
 extern int     pq_putbytes(const char *s, size_t len);
+extern int     pq_putbytes_if_writable(const char *s, size_t len);
 extern int     pq_flush(void);
+extern int     pq_flush_if_writable(void);
 extern int     pq_putmessage(char msgtype, const char *s, size_t len);
 extern void pq_startcopyout(void);
 extern void pq_endcopyout(bool errorAbort);