1 /*-------------------------------------------------------------------------
5 * This file contains the libpq-specific parts of walreceiver. It's
6 * loaded as a dynamic module to avoid linking the main server binary with
9 * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
13 * $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.12 2010/07/06 19:18:57 momjian Exp $
15 *-------------------------------------------------------------------------
23 #include "access/xlog.h"
24 #include "miscadmin.h"
25 #include "replication/walreceiver.h"
26 #include "utils/builtins.h"
31 #ifdef HAVE_SYS_POLL_H
34 #ifdef HAVE_SYS_SELECT_H
35 #include <sys/select.h>
/*
 * Module-level state shared by the walreceiver interface functions below.
 */
42 /* Current connection to the primary, if any */
43 static PGconn *streamConn = NULL;
/*
 * Set to true by libpqrcv_connect() after the START_REPLICATION command, and
 * reset to false by libpqrcv_receive() and libpqrcv_disconnect(). While it is
 * true, the next libpqrcv_receive() call skips the socket wait so that data
 * libpq may already have buffered is returned first.
 */
44 static bool justconnected = false;
46 /* Buffer for currently read records */
/*
 * Filled by PQgetCopyData() in libpqrcv_receive(); the *buffer pointer handed
 * back to callers points into this allocation, just past the message-type
 * byte.
 */
47 static char *recvBuf = NULL;
49 /* Prototypes for interface functions */
/*
 * These four are installed into walreceiver's walrcv_connect, walrcv_receive,
 * walrcv_send and walrcv_disconnect function pointers when the module loads.
 */
50 static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
51 static bool libpqrcv_receive(int timeout, unsigned char *type,
52 char **buffer, int *len);
53 static void libpqrcv_send(const char *buffer, int nbytes);
54 static void libpqrcv_disconnect(void);
56 /* Prototypes for private functions */
/*
 * libpq_select() waits for streamConn's socket to become readable;
 * libpqrcv_PQexec() is a signal-interruptible stand-in for PQexec().
 */
57 static bool libpq_select(int timeout_ms);
58 static PGresult *libpqrcv_PQexec(const char *query);
61 * Module load callback
/*
 * Register this module's libpq-based implementations with walreceiver by
 * filling in its walrcv_* function pointers. If any of those pointers is
 * already set, the module is treated as loaded twice and elog(ERROR) is
 * raised instead of silently replacing the existing hooks.
 */
66 /* Tell walreceiver how to reach us */
67 if (walrcv_connect != NULL || walrcv_receive != NULL ||
68 walrcv_send != NULL || walrcv_disconnect != NULL)
69 elog(ERROR, "libpqwalreceiver already loaded");
70 walrcv_connect = libpqrcv_connect;
71 walrcv_receive = libpqrcv_receive;
72 walrcv_send = libpqrcv_send;
73 walrcv_disconnect = libpqrcv_disconnect;
77 * Establish the connection to the primary server for XLOG streaming
/*
 * libpqrcv_connect - open a replication connection to the primary and start
 * WAL streaming at 'startpoint'.
 *
 * The connection string is the caller's conninfo extended with
 * "dbname=replication replication=true". Presumably conninfo is the %s
 * argument on the snprintf line not shown in this view.
 *
 * Steps:
 * 1. Run IDENTIFY_SYSTEM on the primary.
 * 2. Check that its system identifier and timeline match ours.
 * 3. Issue START_REPLICATION at startpoint.
 *
 * Each failure is reported through ereport(); the level is on lines not
 * visible here (presumably ERROR). On success, ThisTimeLineID is set to the
 * primary's timeline, and justconnected is set so that the first
 * libpqrcv_receive() call does not wait on the socket.
 */
80 libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
/*
 * +37 = strlen(" dbname=replication replication=true") (36) + 1 for the
 * terminating NUL appended by snprintf below.
 */
82 char conninfo_repl[MAXCONNINFO + 37];
/* a UINT64_FORMAT-rendered 64-bit identifier needs at most 20 digits + NUL */
84 char standby_sysid[32];
85 TimeLineID primary_tli;
86 TimeLineID standby_tli;
91 * Connect using deliberately undocumented parameter: replication. The
92 * database name is ignored by the server in replication mode, but specify
93 * "replication" for .pgpass lookup.
95 snprintf(conninfo_repl, sizeof(conninfo_repl),
96 "%s dbname=replication replication=true",
99 streamConn = PQconnectdb(conninfo_repl);
100 if (PQstatus(streamConn) != CONNECTION_OK)
102 (errmsg("could not connect to the primary server: %s",
103 PQerrorMessage(streamConn))));
106 * Get the system identifier and timeline ID as a DataRow message from the
109 res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
110 if (PQresultStatus(res) != PGRES_TUPLES_OK)
114 (errmsg("could not receive database system identifier and timeline ID from "
115 "the primary server: %s",
116 PQerrorMessage(streamConn))));
/* IDENTIFY_SYSTEM must return exactly one row of (system identifier, timeline ID) */
118 if (PQnfields(res) != 2 || PQntuples(res) != 1)
120 int ntuples = PQntuples(res);
121 int nfields = PQnfields(res);
125 (errmsg("invalid response from primary server"),
126 errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
/*
 * primary_sysid points into the PGresult storage returned by PQgetvalue(),
 * so it is only valid until res is cleared. The pg_atoi() size argument of 4
 * presumably matches a 32-bit TimeLineID -- verify.
 */
129 primary_sysid = PQgetvalue(res, 0, 0);
130 primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
133 * Confirm that the system identifier of the primary is the same as ours.
/*
 * The identifiers are compared as text. This assumes the primary renders its
 * identifier with the same UINT64_FORMAT decimal formatting.
 */
135 snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
136 GetSystemIdentifier());
/*
 * NOTE(review): the errdetail of the mismatch error reads primary_sysid,
 * which lives inside res. If the lines not shown here PQclear(res) before
 * that ereport, copy the string first (e.g. pstrdup) to avoid reading freed
 * memory.
 */
137 if (strcmp(primary_sysid, standby_sysid) != 0)
141 (errmsg("database system identifier differs between the primary and standby"),
142 errdetail("The primary's identifier is %s, the standby's identifier is %s.",
143 primary_sysid, standby_sysid)));
147 * Confirm that the current timeline of the primary is the same as the
148 * recovery target timeline.
150 standby_tli = GetRecoveryTargetTLI();
152 if (primary_tli != standby_tli)
154 (errmsg("timeline %u of the primary does not match recovery target timeline %u",
155 primary_tli, standby_tli)));
156 ThisTimeLineID = primary_tli;
158 /* Start streaming from the point requested by startup process */
159 snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
160 startpoint.xlogid, startpoint.xrecoff);
161 res = libpqrcv_PQexec(cmd);
/* START_REPLICATION is expected to leave the connection in COPY BOTH mode */
162 if (PQresultStatus(res) != PGRES_COPY_BOTH)
166 (errmsg("could not start WAL streaming: %s",
167 PQerrorMessage(streamConn))));
/* makes the first libpqrcv_receive() skip its socket wait */
171 justconnected = true;
173 (errmsg("streaming replication successfully connected to primary")));
179 * Wait until we can read WAL stream, or timeout.
181 * Returns true if data has become available for reading, false if timed out
182 * or interrupted by signal.
184 * This is based on pqSocketCheck.
/*
 * libpq_select - wait up to timeout_ms milliseconds for streamConn's socket
 * to become readable.
 *
 * libpqrcv_PQexec() passes -1, which poll(2) treats as "wait indefinitely".
 * The select(2) branch presumably maps a negative timeout to a NULL timeval
 * pointer on lines not shown here. Reports "socket not open" if streamConn
 * has no socket.
 */
187 libpq_select(int timeout_ms)
191 Assert(streamConn != NULL);
192 if (PQsocket(streamConn) < 0)
194 (errcode_for_socket_access(),
195 errmsg("socket not open")));
197 /* We use poll(2) if available, otherwise select(2) */
200 struct pollfd input_fd;
202 input_fd.fd = PQsocket(streamConn);
/* POSIX ignores POLLERR in events; errors are always reported in revents */
203 input_fd.events = POLLIN | POLLERR;
204 input_fd.revents = 0;
206 ret = poll(&input_fd, 1, timeout_ms);
207 #else /* !HAVE_POLL */
210 struct timeval timeout;
211 struct timeval *ptr_timeout;
213 FD_ZERO(&input_mask);
214 FD_SET(PQsocket(streamConn), &input_mask);
/* split milliseconds into whole seconds plus microseconds for struct timeval */
220 timeout.tv_sec = timeout_ms / 1000;
221 timeout.tv_usec = (timeout_ms % 1000) * 1000;
222 ptr_timeout = &timeout;
225 ret = select(PQsocket(streamConn) + 1, &input_mask,
226 NULL, NULL, ptr_timeout);
227 #endif /* HAVE_POLL */
/*
 * A timeout (ret == 0) or EINTR counts as "no data" rather than an error.
 * NOTE(review): the failure message below says select() even when the poll()
 * branch above was compiled in. Consider naming the call that actually
 * failed.
 */
230 if (ret == 0 || (ret < 0 && errno == EINTR))
234 (errcode_for_socket_access(),
235 errmsg("select() failed: %m")));
240 * Send a query and wait for the results by using the asynchronous libpq
241 * functions and the backend version of select().
243 * We must not use the regular blocking libpq functions like PQexec()
244 * since they are uninterruptible by signals on some platforms, such as
247 * We must also not use vanilla select() here since it cannot handle the
248 * signal emulation layer on Windows.
250 * The function is modeled on PQexec() in libpq, but only implements
251 * those parts that are in use in the walreceiver.
253 * Queries are always executed on the connection in streamConn.
/*
 * libpqrcv_PQexec - run 'query' on streamConn and return its last PGresult.
 *
 * Waiting goes through libpq_select(), so the wait can be interrupted by
 * signals. Returns NULL when PQconsumeInput() fails. Presumably it also
 * returns NULL when PQsendQuery() fails; that return is on a line not shown
 * here. The loop stops early once the connection enters a COPY state or goes
 * bad, so callers (e.g. the START_REPLICATION path) receive the COPY result.
 */
256 libpqrcv_PQexec(const char *query)
258 PGresult *result = NULL;
259 PGresult *lastResult = NULL;
262 * PQexec() silently discards any prior query results on the connection.
263 * This is not required for walreceiver since it's expected that walsender
264 * won't generate any such junk results.
268 * Submit a query. Since we don't use non-blocking mode, this also can
269 * block. But its risk is relatively small, so we ignore that for now.
271 if (!PQsendQuery(streamConn, query))
277 * Receive data until PQgetResult is ready to get the result without
280 while (PQisBusy(streamConn))
283 * We don't need to break down the sleep into smaller increments,
284 * and check for interrupts after each nap, since we can just
285 * elog(FATAL) within SIGTERM signal handler if the signal arrives
286 * in the middle of establishment of replication connection.
288 if (!libpq_select(-1))
289 continue; /* interrupted */
/*
 * NOTE(review): if lastResult already holds a result from an earlier
 * iteration, returning NULL here drops it without PQclear(), which leaks it.
 * Confirm against the lines not shown; clearing lastResult first would fix
 * it.
 */
290 if (PQconsumeInput(streamConn) == 0)
291 return NULL; /* trouble */
295 * Emulate the PQexec()'s behavior of returning the last result when
296 * there are many. Since walsender will never generate multiple
297 * results, we skip the concatenation of error messages.
299 result = PQgetResult(streamConn);
301 break; /* query is complete */
/*
 * In a COPY state, libpq keeps reporting the COPY status instead of
 * returning NULL, so stop here and hand the COPY result back to the caller.
 * Also stop if the connection has gone bad.
 */
306 if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
307 PQresultStatus(lastResult) == PGRES_COPY_OUT ||
308 PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
309 PQstatus(streamConn) == CONNECTION_BAD)
317 * Disconnect connection to primary, if any.
320 libpqrcv_disconnect(void)
322 PQfinish(streamConn);
324 justconnected = false;
328 * Receive a message available from XLOG stream, blocking for at most
329 * 'timeout' ms (a timeout of 0 or less does not block).
333 * True if data was received. *type, *buffer and *len are set to
334 * the type of the received data, buffer holding it, and length,
337 * False if no data was available within timeout, or wait was interrupted
340 * The buffer returned is only valid until the next call of this function or
341 * libpqrcv_connect/libpqrcv_disconnect.
/*
 * libpqrcv_receive - fetch one CopyData message from the WAL stream.
 *
 * The socket wait and PQconsumeInput() only run when timeout > 0 and this is
 * not the first call after libpqrcv_connect(). Otherwise (including any
 * timeout <= 0) the call never blocks, and it can only return a message that
 * libpq has already buffered.
 *
 * On success:
 * - *type is the message's first byte;
 * - *buffer points just past that byte inside recvBuf;
 * - *len excludes that byte.
 *
 * End of streaming and receive errors are reported via ereport(). The level
 * is on lines not shown here (presumably ERROR).
 */
346 libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
355 * If the caller requested to block, wait for data to arrive. But if this
356 * is the first call after connecting, don't wait, because there might
357 * already be some data in libpq buffer that we haven't returned to
360 if (timeout > 0 && !justconnected)
362 if (!libpq_select(timeout))
365 if (PQconsumeInput(streamConn) == 0)
367 (errmsg("could not receive data from WAL stream: %s",
368 PQerrorMessage(streamConn))));
370 justconnected = false;
372 /* Receive CopyData message */
/* async = 1: return 0 instead of blocking when no complete message is buffered */
373 rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
374 if (rawlen == 0) /* no data available yet, then return */
/* -1: the COPY has ended; PQgetResult() tells a clean end (COMMAND_OK) from a failure */
376 if (rawlen == -1) /* end-of-streaming or error */
380 res = PQgetResult(streamConn);
381 if (PQresultStatus(res) == PGRES_COMMAND_OK)
385 (errmsg("replication terminated by primary server")));
389 (errmsg("could not receive data from WAL stream: %s",
390 PQerrorMessage(streamConn))));
/* presumably the remaining negative case (-2, libpq error), tested on a line not shown here */
394 (errmsg("could not receive data from WAL stream: %s",
395 PQerrorMessage(streamConn))));
397 /* Return received messages to caller */
398 *type = *((unsigned char *) recvBuf);
399 *buffer = recvBuf + sizeof(*type);
400 *len = rawlen - sizeof(*type);
406 * Send a message to XLOG stream.
411 libpqrcv_send(const char *buffer, int nbytes)
413 if (PQputCopyData(streamConn, buffer, nbytes) <= 0 ||
416 (errmsg("could not send data to WAL stream: %s",
417 PQerrorMessage(streamConn))));