int replication_timeout = 0; /* maximum time to wait for the Ack from the standby */
/*
+ * Buffer for WAL sending
+ *
+ * WalSndOutBuffer is a work area in which the output message is constructed.
+ * It's used in just so we can avoid re-palloc'ing the buffer on each cycle.
+ * It must be of size 6 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
+ */
+static char *WalSndOutBuffer;
+static int WalSndOutHead; /* head of pending output */
+static int WalSndOutTail; /* tail of pending output */
+
+/*
* These variables are used similarly to openLogFile/Id/Seg/Off,
* but for walsender to read the XLOG.
*/
static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg);
static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
-static bool XLogSend(char *msgbuf, bool *caughtup);
+static bool XLogSend(bool *caughtup, bool *pending);
static void ProcessStreamMsgs(StringInfo inMsg);
static void RegisterWalSndWaiter(BackendId backendId, XLogRecPtr record,
default:
ereport(FATAL,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("invalid standby closing message type %d",
+ errmsg("invalid standby message type %d",
firstchar)));
}
}
WalSndLoop(void)
{
StringInfoData input_message;
- char *output_message;
bool caughtup = false;
+ bool pending = false;
initStringInfo(&input_message);
* just once to reduce palloc overhead. The buffer must be made large
* enough for maximum-sized messages.
*/
- output_message = palloc(1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
+ WalSndOutBuffer = palloc(6 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
+ WalSndOutHead = WalSndOutTail = 0;
/* Loop forever, unless we get an error */
for (;;)
*/
if (ready_to_stop)
{
- if (!XLogSend(output_message, &caughtup))
+ if (!XLogSend(&caughtup, &pending))
break;
- if (caughtup)
+ if (caughtup && !pending)
shutdown_requested = true;
}
}
/*
- * If we had sent all accumulated WAL in last round, nap for the
- * configured time before retrying.
+ * If we had sent all accumulated WAL in last round or could not
+ * flush pending WAL in output buffer because the socket was not
+ * writable, nap for the configured time before retrying.
*/
- if (caughtup)
+ if (caughtup || pending)
{
/*
* Even if we wrote all the WAL that was available when we started
*/
ResetLatch(&MyWalSnd->latch);
- if (!XLogSend(output_message, &caughtup))
+ if (!XLogSend(&caughtup, &pending))
break;
- if (caughtup && !got_SIGHUP && !ready_to_stop && !shutdown_requested)
+ if ((caughtup || pending) && !got_SIGHUP && !ready_to_stop &&
+ !shutdown_requested)
{
+ bool check_timeout;
+ long sleeptime;
+ int res;
+
/*
* XXX: We don't really need the periodic wakeups anymore,
* WaitLatchOrSocket should reliably wake up as soon as
* something interesting happens.
*/
+ /*
+ * Check for replication timeout if it's enabled and we need
+ * to wait until the socket has become writable to flush
+ * pending WAL in output buffer or until the Ack message
+ * from the standby has become available.
+ */
+ if (replication_timeout > 0 &&
+ (pending ||
+ (rplMode != REPLICATION_MODE_ASYNC &&
+ XLByteLT(ackdPtr, sentPtr))))
+ {
+ sleeptime = replication_timeout;
+ check_timeout = true;
+ }
+ else
+ {
+ sleeptime = WalSndDelay;
+ check_timeout = false;
+ }
+
/* Sleep */
- WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
- WalSndDelay * 1000L);
+ res = WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
+ true, (WalSndOutTail > 0),
+ sleeptime * 1000L);
+
+ if (res == 0 && check_timeout)
+ {
+ /*
+ * Since typically expiration of replication timeout means
+ * communication problem, we don't send the error message
+ * to the standby.
+ */
+ ereport(COMMERROR,
+ (errmsg("terminating walsender process due to replication timeout")));
+ break;
+ }
}
/* Process messages received from the standby */
else
{
/* Attempt to send the log once every loop */
- if (!XLogSend(output_message, &caughtup))
+ if (!XLogSend(&caughtup, &pending))
break;
}
}
* Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
* but not yet sent to the client, and send it.
*
- * msgbuf is a work area in which the output message is constructed. It's
- * passed in just so we can avoid re-palloc'ing the buffer on each cycle.
- * It must be of size 1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
- *
* If there is no unsent WAL remaining, *caughtup is set to true, otherwise
* *caughtup is set to false.
*
+ * If there is pending WAL in output buffer, *pending is set to true,
+ * otherwise *pending is set to false.
+ *
* Returns true if OK, false if trouble.
*/
static bool
-XLogSend(char *msgbuf, bool *caughtup)
+XLogSend(bool *caughtup, bool *pending)
{
XLogRecPtr SendRqstPtr;
XLogRecPtr startptr;
- XLogRecPtr endptr;
+ static XLogRecPtr endptr;
Size nbytes;
+ uint32 n32;
+ int res;
WalDataMessageHeader msghdr;
+ /* Attempt to flush pending WAL in output buffer */
+ if (*pending)
+ {
+ if (WalSndOutHead != WalSndOutTail)
+ {
+ res = pq_putbytes_if_writable(WalSndOutBuffer + WalSndOutHead,
+ WalSndOutTail - WalSndOutHead);
+ if (res == EOF)
+ return false;
+ WalSndOutHead += res;
+ if (WalSndOutHead != WalSndOutTail)
+ return true;
+ }
+
+ res = pq_flush_if_writable();
+ if (res == EOF)
+ return false;
+ if (res == 0)
+ return true;
+
+ goto updt;
+ }
+
/*
* Attempt to send all data that's already been written out and fsync'd to
* disk. We cannot go further than what's been written out given the
/*
* OK to read and send the slice.
*/
- msgbuf[0] = 'w';
+ WalSndOutBuffer[0] = 'd';
+ WalSndOutBuffer[5] = 'w';
+ WalSndOutHead = 0;
+ WalSndOutTail = 6 + sizeof(WalDataMessageHeader) + nbytes;
+
+ n32 = htonl((uint32) WalSndOutTail - 1);
+ memcpy(WalSndOutBuffer + 1, &n32, 4);
/*
* Read the log directly into the output buffer to avoid extra memcpy
* calls.
*/
- XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
+ XLogRead(WalSndOutBuffer + 6 + sizeof(WalDataMessageHeader), startptr, nbytes);
/*
* We fill the message header last so that the send timestamp is taken as
msghdr.walEnd = SendRqstPtr;
msghdr.sendTime = GetCurrentTimestamp();
- memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
+ memcpy(WalSndOutBuffer + 6, &msghdr, sizeof(WalDataMessageHeader));
- pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
+ res = pq_putbytes_if_writable(WalSndOutBuffer, WalSndOutTail);
+ if (res == EOF)
+ return false;
+
+ WalSndOutHead = res;
+ if (WalSndOutHead != WalSndOutTail)
+ {
+ *caughtup = false;
+ *pending = true;
+ return true;
+ }
/* Flush pending output to the client */
- if (pq_flush())
+ res = pq_flush_if_writable();
+ if (res == EOF)
return false;
+ if (res == 0)
+ {
+ *caughtup = false;
+ *pending = true;
+ return true;
+ }
+
+updt:
+ WalSndOutHead = WalSndOutTail = 0;
+ *pending = false;
sentPtr = endptr;