static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg);
static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
-static bool XLogSend(bool *caughtup);
+static bool XLogSend(bool *caughtup, bool *pending);
static void ProcessStreamMsgs(StringInfo inMsg);
static void RegisterWalSndWaiter(BackendId backendId, XLogRecPtr record,
default:
ereport(FATAL,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("invalid standby closing message type %d",
+ errmsg("invalid standby message type %d",
firstchar)));
}
}
{
StringInfoData input_message;
bool caughtup = false;
+ bool pending = false;
initStringInfo(&input_message);
*/
if (ready_to_stop)
{
- if (!XLogSend(&caughtup))
+ if (!XLogSend(&caughtup, &pending))
break;
- if (caughtup)
+ if (caughtup && !pending)
shutdown_requested = true;
}
}
/*
- * If we had sent all accumulated WAL in last round, nap for the
- * configured time before retrying.
+ * If we had sent all accumulated WAL in last round or could not
+ * flush pending WAL in output buffer because the socket was not
+ * writable, nap for the configured time before retrying.
*/
- if (caughtup || WalSndOutTail > 0)
+ if (caughtup || pending)
{
/*
* Even if we wrote all the WAL that was available when we started
*/
ResetLatch(&MyWalSnd->latch);
- if (!XLogSend(&caughtup))
+ if (!XLogSend(&caughtup, &pending))
break;
- if ((caughtup || WalSndOutTail > 0) && !got_SIGHUP &&
- !ready_to_stop && !shutdown_requested)
+ if ((caughtup || pending) && !got_SIGHUP && !ready_to_stop &&
+ !shutdown_requested)
{
+ bool check_timeout;
+ long sleeptime;
+ int res;
+
/*
* XXX: We don't really need the periodic wakeups anymore,
* WaitLatchOrSocket should reliably wake up as soon as
* something interesting happens.
*/
+ /*
+ * Check for replication timeout if it's enabled and we need
+ * to wait until the socket has become writable to flush
+ * pending WAL in output buffer or until the Ack message
+ * from the standby has become available.
+ */
+ if (replication_timeout > 0 &&
+ (pending ||
+ (rplMode != REPLICATION_MODE_ASYNC &&
+ XLByteLT(ackdPtr, sentPtr))))
+ {
+ sleeptime = replication_timeout;
+ check_timeout = true;
+ }
+ else
+ {
+ sleeptime = WalSndDelay;
+ check_timeout = false;
+ }
+
/* Sleep */
- WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
- true, (WalSndOutTail > 0),
- WalSndDelay * 1000L);
+ res = WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
+ true, (WalSndOutTail > 0),
+ sleeptime * 1000L);
+
+ if (res == 0 && check_timeout)
+ {
+ /*
+ * Since typically expiration of replication timeout means
+ * communication problem, we don't send the error message
+ * to the standby.
+ */
+ ereport(COMMERROR,
+ (errmsg("terminating walsender process due to replication timeout")));
+ break;
+ }
}
/* Process messages received from the standby */
else
{
/* Attempt to send the log once every loop */
- if (!XLogSend(&caughtup))
+ if (!XLogSend(&caughtup, &pending))
break;
}
}
* If there is no unsent WAL remaining, *caughtup is set to true, otherwise
* *caughtup is set to false.
*
+ * If there is pending WAL in output buffer, *pending is set to true,
+ * otherwise *pending is set to false.
+ *
* Returns true if OK, false if trouble.
*/
static bool
-XLogSend(bool *caughtup)
+XLogSend(bool *caughtup, bool *pending)
{
XLogRecPtr SendRqstPtr;
XLogRecPtr startptr;
int res;
WalDataMessageHeader msghdr;
- /* Attempt to flush the unsent data in WalSndOutBuffer and PqSendBuffer */
- if (WalSndOutTail > 0)
+ /* Attempt to flush pending WAL in output buffer */
+ if (*pending)
{
if (WalSndOutHead != WalSndOutTail)
{
if (res == 0)
return true;
- WalSndOutHead = WalSndOutTail = 0;
goto updt;
}
if (WalSndOutHead != WalSndOutTail)
{
*caughtup = false;
+ *pending = true;
return true;
}
if (res == 0)
{
*caughtup = false;
+ *pending = true;
return true;
}
- WalSndOutHead = WalSndOutTail = 0;
updt:
+ WalSndOutHead = WalSndOutTail = 0;
+ *pending = false;
+
sentPtr = endptr;
/* Update shared memory status */