OSDN Git Service

Allow bidirectional copy messages in streaming replication mode.
[pg-rex/syncrep.git] / src / backend / replication / libpqwalreceiver / libpqwalreceiver.c
1 /*-------------------------------------------------------------------------
2  *
3  * libpqwalreceiver.c
4  *
5  * This file contains the libpq-specific parts of walreceiver. It's
6  * loaded as a dynamic module to avoid linking the main server binary with
7  * libpq.
8  *
9  * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
10  *
11  *
12  * IDENTIFICATION
13  *        $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.12 2010/07/06 19:18:57 momjian Exp $
14  *
15  *-------------------------------------------------------------------------
16  */
17 #include "postgres.h"
18
19 #include <unistd.h>
20 #include <sys/time.h>
21
22 #include "libpq-fe.h"
23 #include "access/xlog.h"
24 #include "miscadmin.h"
25 #include "replication/walreceiver.h"
26 #include "utils/builtins.h"
27
28 #ifdef HAVE_POLL_H
29 #include <poll.h>
30 #endif
31 #ifdef HAVE_SYS_POLL_H
32 #include <sys/poll.h>
33 #endif
34 #ifdef HAVE_SYS_SELECT_H
35 #include <sys/select.h>
36 #endif
37
38 PG_MODULE_MAGIC;
39
40 void            _PG_init(void);
41
42 /* Current connection to the primary, if any */
43 static PGconn *streamConn = NULL;
44 static bool justconnected = false;
45
46 /* Buffer for currently read records */
47 static char *recvBuf = NULL;
48
49 /* Prototypes for interface functions */
50 static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
51 static bool libpqrcv_receive(int timeout, unsigned char *type,
52                                  char **buffer, int *len);
53 static void libpqrcv_send(const char *buffer, int nbytes);
54 static void libpqrcv_disconnect(void);
55
56 /* Prototypes for private functions */
57 static bool libpq_select(int timeout_ms);
58 static PGresult *libpqrcv_PQexec(const char *query);
59
60 /*
61  * Module load callback
62  */
63 void
64 _PG_init(void)
65 {
66         /* Tell walreceiver how to reach us */
67         if (walrcv_connect != NULL || walrcv_receive != NULL ||
68                 walrcv_send != NULL || walrcv_disconnect != NULL)
69                 elog(ERROR, "libpqwalreceiver already loaded");
70         walrcv_connect = libpqrcv_connect;
71         walrcv_receive = libpqrcv_receive;
72         walrcv_send = libpqrcv_send;
73         walrcv_disconnect = libpqrcv_disconnect;
74 }
75
76 /*
77  * Establish the connection to the primary server for XLOG streaming
78  */
79 static bool
80 libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
81 {
82         char            conninfo_repl[MAXCONNINFO + 37];
83         char       *primary_sysid;
84         char            standby_sysid[32];
85         TimeLineID      primary_tli;
86         TimeLineID      standby_tli;
87         PGresult   *res;
88         char            cmd[64];
89
90         /*
91          * Connect using deliberately undocumented parameter: replication. The
92          * database name is ignored by the server in replication mode, but specify
93          * "replication" for .pgpass lookup.
94          */
95         snprintf(conninfo_repl, sizeof(conninfo_repl),
96                          "%s dbname=replication replication=true",
97                          conninfo);
98
99         streamConn = PQconnectdb(conninfo_repl);
100         if (PQstatus(streamConn) != CONNECTION_OK)
101                 ereport(ERROR,
102                                 (errmsg("could not connect to the primary server: %s",
103                                                 PQerrorMessage(streamConn))));
104
105         /*
106          * Get the system identifier and timeline ID as a DataRow message from the
107          * primary server.
108          */
109         res = libpqrcv_PQexec("IDENTIFY_SYSTEM");
110         if (PQresultStatus(res) != PGRES_TUPLES_OK)
111         {
112                 PQclear(res);
113                 ereport(ERROR,
114                                 (errmsg("could not receive database system identifier and timeline ID from "
115                                                 "the primary server: %s",
116                                                 PQerrorMessage(streamConn))));
117         }
118         if (PQnfields(res) != 2 || PQntuples(res) != 1)
119         {
120                 int                     ntuples = PQntuples(res);
121                 int                     nfields = PQnfields(res);
122
123                 PQclear(res);
124                 ereport(ERROR,
125                                 (errmsg("invalid response from primary server"),
126                                  errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
127                                                    ntuples, nfields)));
128         }
129         primary_sysid = PQgetvalue(res, 0, 0);
130         primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
131
132         /*
133          * Confirm that the system identifier of the primary is the same as ours.
134          */
135         snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
136                          GetSystemIdentifier());
137         if (strcmp(primary_sysid, standby_sysid) != 0)
138         {
139                 PQclear(res);
140                 ereport(ERROR,
141                                 (errmsg("database system identifier differs between the primary and standby"),
142                                  errdetail("The primary's identifier is %s, the standby's identifier is %s.",
143                                                    primary_sysid, standby_sysid)));
144         }
145
146         /*
147          * Confirm that the current timeline of the primary is the same as the
148          * recovery target timeline.
149          */
150         standby_tli = GetRecoveryTargetTLI();
151         PQclear(res);
152         if (primary_tli != standby_tli)
153                 ereport(ERROR,
154                                 (errmsg("timeline %u of the primary does not match recovery target timeline %u",
155                                                 primary_tli, standby_tli)));
156         ThisTimeLineID = primary_tli;
157
158         /* Start streaming from the point requested by startup process */
159         snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
160                          startpoint.xlogid, startpoint.xrecoff);
161         res = libpqrcv_PQexec(cmd);
162         if (PQresultStatus(res) != PGRES_COPY_BOTH)
163         {
164                 PQclear(res);
165                 ereport(ERROR,
166                                 (errmsg("could not start WAL streaming: %s",
167                                                 PQerrorMessage(streamConn))));
168         }
169         PQclear(res);
170
171         justconnected = true;
172         ereport(LOG,
173                 (errmsg("streaming replication successfully connected to primary")));
174
175         return true;
176 }
177
178 /*
179  * Wait until we can read WAL stream, or timeout.
180  *
181  * Returns true if data has become available for reading, false if timed out
182  * or interrupted by signal.
183  *
184  * This is based on pqSocketCheck.
185  */
186 static bool
187 libpq_select(int timeout_ms)
188 {
189         int                     ret;
190
191         Assert(streamConn != NULL);
192         if (PQsocket(streamConn) < 0)
193                 ereport(ERROR,
194                                 (errcode_for_socket_access(),
195                                  errmsg("socket not open")));
196
197         /* We use poll(2) if available, otherwise select(2) */
198         {
199 #ifdef HAVE_POLL
200                 struct pollfd input_fd;
201
202                 input_fd.fd = PQsocket(streamConn);
203                 input_fd.events = POLLIN | POLLERR;
204                 input_fd.revents = 0;
205
206                 ret = poll(&input_fd, 1, timeout_ms);
207 #else                                                   /* !HAVE_POLL */
208
209                 fd_set          input_mask;
210                 struct timeval timeout;
211                 struct timeval *ptr_timeout;
212
213                 FD_ZERO(&input_mask);
214                 FD_SET(PQsocket(streamConn), &input_mask);
215
216                 if (timeout_ms < 0)
217                         ptr_timeout = NULL;
218                 else
219                 {
220                         timeout.tv_sec = timeout_ms / 1000;
221                         timeout.tv_usec = (timeout_ms % 1000) * 1000;
222                         ptr_timeout = &timeout;
223                 }
224
225                 ret = select(PQsocket(streamConn) + 1, &input_mask,
226                                          NULL, NULL, ptr_timeout);
227 #endif   /* HAVE_POLL */
228         }
229
230         if (ret == 0 || (ret < 0 && errno == EINTR))
231                 return false;
232         if (ret < 0)
233                 ereport(ERROR,
234                                 (errcode_for_socket_access(),
235                                  errmsg("select() failed: %m")));
236         return true;
237 }
238
239 /*
240  * Send a query and wait for the results by using the asynchronous libpq
241  * functions and the backend version of select().
242  *
243  * We must not use the regular blocking libpq functions like PQexec()
244  * since they are uninterruptible by signals on some platforms, such as
245  * Windows.
246  *
247  * We must also not use vanilla select() here since it cannot handle the
248  * signal emulation layer on Windows.
249  *
250  * The function is modeled on PQexec() in libpq, but only implements
251  * those parts that are in use in the walreceiver.
252  *
253  * Queries are always executed on the connection in streamConn.
254  */
255 static PGresult *
256 libpqrcv_PQexec(const char *query)
257 {
258         PGresult   *result = NULL;
259         PGresult   *lastResult = NULL;
260
261         /*
262          * PQexec() silently discards any prior query results on the connection.
263          * This is not required for walreceiver since it's expected that walsender
264          * won't generate any such junk results.
265          */
266
267         /*
268          * Submit a query. Since we don't use non-blocking mode, this also can
269          * block. But its risk is relatively small, so we ignore that for now.
270          */
271         if (!PQsendQuery(streamConn, query))
272                 return NULL;
273
274         for (;;)
275         {
276                 /*
277                  * Receive data until PQgetResult is ready to get the result without
278                  * blocking.
279                  */
280                 while (PQisBusy(streamConn))
281                 {
282                         /*
283                          * We don't need to break down the sleep into smaller increments,
284                          * and check for interrupts after each nap, since we can just
285                          * elog(FATAL) within SIGTERM signal handler if the signal arrives
286                          * in the middle of establishment of replication connection.
287                          */
288                         if (!libpq_select(-1))
289                                 continue;               /* interrupted */
290                         if (PQconsumeInput(streamConn) == 0)
291                                 return NULL;    /* trouble */
292                 }
293
294                 /*
295                  * Emulate the PQexec()'s behavior of returning the last result when
296                  * there are many. Since walsender will never generate multiple
297                  * results, we skip the concatenation of error messages.
298                  */
299                 result = PQgetResult(streamConn);
300                 if (result == NULL)
301                         break;                          /* query is complete */
302
303                 PQclear(lastResult);
304                 lastResult = result;
305
306                 if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
307                         PQresultStatus(lastResult) == PGRES_COPY_OUT ||
308                         PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
309                         PQstatus(streamConn) == CONNECTION_BAD)
310                         break;
311         }
312
313         return lastResult;
314 }
315
316 /*
317  * Disconnect connection to primary, if any.
318  */
319 static void
320 libpqrcv_disconnect(void)
321 {
322         PQfinish(streamConn);
323         streamConn = NULL;
324         justconnected = false;
325 }
326
327 /*
328  * Receive a message available from XLOG stream, blocking for
329  * maximum of 'timeout' ms.
330  *
331  * Returns:
332  *
333  *       True if data was received. *type, *buffer and *len are set to
334  *       the type of the received data, buffer holding it, and length,
335  *       respectively.
336  *
337  *       False if no data was available within timeout, or wait was interrupted
338  *       by signal.
339  *
340  * The buffer returned is only valid until the next call of this function or
341  * libpq_connect/disconnect.
342  *
343  * ereports on error.
344  */
345 static bool
346 libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
347 {
348         int                     rawlen;
349
350         if (recvBuf != NULL)
351                 PQfreemem(recvBuf);
352         recvBuf = NULL;
353
354         /*
355          * If the caller requested to block, wait for data to arrive. But if this
356          * is the first call after connecting, don't wait, because there might
357          * already be some data in libpq buffer that we haven't returned to
358          * caller.
359          */
360         if (timeout > 0 && !justconnected)
361         {
362                 if (!libpq_select(timeout))
363                         return false;
364
365                 if (PQconsumeInput(streamConn) == 0)
366                         ereport(ERROR,
367                                         (errmsg("could not receive data from WAL stream: %s",
368                                                         PQerrorMessage(streamConn))));
369         }
370         justconnected = false;
371
372         /* Receive CopyData message */
373         rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
374         if (rawlen == 0)                        /* no data available yet, then return */
375                 return false;
376         if (rawlen == -1)                       /* end-of-streaming or error */
377         {
378                 PGresult   *res;
379
380                 res = PQgetResult(streamConn);
381                 if (PQresultStatus(res) == PGRES_COMMAND_OK)
382                 {
383                         PQclear(res);
384                         ereport(ERROR,
385                                         (errmsg("replication terminated by primary server")));
386                 }
387                 PQclear(res);
388                 ereport(ERROR,
389                                 (errmsg("could not receive data from WAL stream: %s",
390                                                 PQerrorMessage(streamConn))));
391         }
392         if (rawlen < -1)
393                 ereport(ERROR,
394                                 (errmsg("could not receive data from WAL stream: %s",
395                                                 PQerrorMessage(streamConn))));
396
397         /* Return received messages to caller */
398         *type = *((unsigned char *) recvBuf);
399         *buffer = recvBuf + sizeof(*type);
400         *len = rawlen - sizeof(*type);
401
402         return true;
403 }
404
405 /*
406  * Send a message to XLOG stream.
407  *
408  * ereports on error.
409  */
410 static void
411 libpqrcv_send(const char *buffer, int nbytes)
412 {
413         if (PQputCopyData(streamConn, buffer, nbytes) <= 0 ||
414                 PQflush(streamConn))
415                 ereport(ERROR,
416                                 (errmsg("could not send data to WAL stream: %s",
417                                                 PQerrorMessage(streamConn))));
418 }