* pq_putbytes - send bytes to connection (not flushed until pq_flush)
* pq_flush - flush pending output
* pq_getbyte_if_available - get a byte if available without blocking
+ * pq_putbytes_if_writable - send bytes to connection if writable without blocking
+ * pq_flush_if_writable - flush pending output if writable without blocking
*
* message-level I/O (and old-style-COPY-OUT cruft):
* pq_putmessage - send a normal message (suppressed in COPY OUT mode)
static char PqSendBuffer[PQ_BUFFER_SIZE];
static int PqSendPointer; /* Next index to store a byte in PqSendBuffer */
+static int PqSendStart; /* Next index to send a byte in PqSendBuffer */
static char PqRecvBuffer[PQ_BUFFER_SIZE];
static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */
}
/* --------------------------------
+ * pq_putbytes_if_writable - send bytes to connection (not flushed
+ * until pq_flush), if writable
+ *
+ * Returns the number of bytes written without blocking, or EOF if trouble.
+ * --------------------------------
+ */
+int
+pq_putbytes_if_writable(const char *s, size_t len)
+{
+ size_t amount;
+ size_t nwritten = 0;
+
+ /* Should not be called by old-style COPY OUT */
+ Assert(!DoingCopyOut);
+ /* No-op if reentrant call */
+ if (PqCommBusy)
+ return 0;
+ PqCommBusy = true;
+
+ while (len > 0)
+ {
+ /* If buffer is full, then flush it out */
+ if (PqSendPointer >= PQ_BUFFER_SIZE)
+ {
+ int r;
+
+ r = pq_flush_if_writable();
+ if (r == 0)
+ break;
+ if (r == EOF)
+ {
+ PqCommBusy = false;
+ return r;
+ }
+ }
+ amount = PQ_BUFFER_SIZE - PqSendPointer;
+ if (amount > len)
+ amount = len;
+ memcpy(PqSendBuffer + PqSendPointer, s, amount);
+ PqSendPointer += amount;
+ s += amount;
+ len -= amount;
+ nwritten += amount;
+ }
+
+ PqCommBusy = false;
+ return (int) nwritten;
+}
+
+/* --------------------------------
* pq_flush - flush pending output
*
* returns 0 if OK, EOF if trouble
return 0;
}
+/* --------------------------------
+ * pq_flush_if_writable - flush pending output if writable
+ *
+ * Returns 1 if OK, 0 if pending output cannot be written without blocking,
+ * or EOF if trouble.
+ * --------------------------------
+ */
+int
+pq_flush_if_writable(void)
+{
+ static int last_reported_send_errno = 0;
+
+ char *bufptr = PqSendBuffer + PqSendStart;
+ char *bufend = PqSendBuffer + PqSendPointer;
+
+ while (bufptr < bufend)
+ {
+ int r;
+
+ /* Temporarily put the socket into non-blocking mode */
+#ifdef WIN32
+ pgwin32_noblock = 1;
+#else
+ if (!pg_set_noblock(MyProcPort->sock))
+ ereport(ERROR,
+ (errmsg("could not set socket to non-blocking mode: %m")));
+#endif
+ MyProcPort->noblock = true;
+ PG_TRY();
+ {
+ r = secure_write(MyProcPort, bufptr, bufend - bufptr);
+
+ if (r < 0)
+ {
+ /*
+ * Ok if no data writable without blocking or interrupted (though
+ * EINTR really shouldn't happen with a non-blocking socket).
+ * Report other errors.
+ */
+ if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
+ r = 0;
+ else
+ {
+ if (errno != last_reported_send_errno)
+ {
+ /*
+ * Careful: an ereport() that tries to write to the
+ * client would cause recursion to here, leading to
+ * stack overflow and core dump! This message must
+ * go *only* to the postmaster log.
+ *
+ * If a client disconnects while we're in the midst
+ * of output, we might write quite a bit of data before
+ * we get to a safe query abort point. So, suppress
+ * duplicate log messages.
+ */
+ last_reported_send_errno = errno;
+ ereport(COMMERROR,
+ (errcode_for_socket_access(),
+ errmsg("could not send data to client: %m")));
+ }
+
+ /*
+ * We drop the buffered data anyway so that processing can
+ * continue, even though we'll probably quit soon.
+ */
+ PqSendStart = PqSendPointer = 0;
+ r = EOF;
+ }
+ }
+ else if (r == 0)
+ {
+ /* EOF detected */
+ r = EOF;
+ }
+ }
+ PG_CATCH();
+ {
+ /*
+ * The rest of the backend code assumes the socket is in blocking
+ * mode, so treat failure as FATAL.
+ */
+#ifdef WIN32
+ pgwin32_noblock = 0;
+#else
+ if (!pg_set_block(MyProcPort->sock))
+ ereport(FATAL,
+ (errmsg("could not set socket to blocking mode: %m")));
+#endif
+ MyProcPort->noblock = false;
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+#ifdef WIN32
+ pgwin32_noblock = 0;
+#else
+ if (!pg_set_block(MyProcPort->sock))
+ ereport(FATAL,
+ (errmsg("could not set socket to blocking mode: %m")));
+#endif
+ MyProcPort->noblock = false;
+
+ if (r == 0 || r == EOF)
+ return r;
+
+ last_reported_send_errno = 0; /* reset after any successful send */
+ bufptr += r;
+ PqSendStart += r;
+ }
+
+ PqSendStart = PqSendPointer = 0;
+ return 1;
+}
+
/* --------------------------------
* Message-level I/O routines begin here.