ThisTimeLineID = primary_tli;
/* Start streaming from the point requested by startup process */
- snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
- startpoint.xlogid, startpoint.xrecoff);
+ snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X MODE %s",
+ startpoint.xlogid, startpoint.xrecoff,
+ GetConfigOption("replication_mode", false));
res = libpqrcv_PQexec(cmd);
- if (PQresultStatus(res) != PGRES_COPY_OUT)
+ if (PQresultStatus(res) != PGRES_COPY_BOTH)
{
PQclear(res);
ereport(ERROR,
(errcode(ERRCODE_CANNOT_CONNECT_NOW),
errmsg("standby connections not allowed because wal_level=minimal")));
- /* Send a CopyXLogResponse message, and start streaming */
+ /* Verify that the specified replication mode is valid */
+ {
+ const struct config_enum_entry *entry;
+
+ for (entry = replication_mode_options; entry && entry->name; entry++)
+ {
+ if (strcmp(modestr, entry->name) == 0)
+ {
+ mode = entry->val;
+ break;
+ }
+ }
+ if (entry == NULL || entry->name == NULL)
+ ereport(FATAL,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid replication mode: %s", modestr)));
+ }
+
+ /* Change the state according to replication mode specified by standby */
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = MyWalSnd;
+
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->walSndState = (mode == REPLICATION_MODE_ASYNC) ?
+ WALSND_ASYNC : WALSND_CATCHUP;
+ SpinLockRelease(&walsnd->mutex);
+ }
+
+ /* Send a CopyBothResponse message, and start streaming */
pq_beginmessage(&buf, 'W');
+ pq_sendbyte(&buf, 0);
+ pq_sendint(&buf, 0, 2);
pq_endmessage(&buf);
pq_flush();