From c81a2f301700d8a8356b2c3081903677626eb7f7 Mon Sep 17 00:00:00 2001 From: MasaoFujii Date: Wed, 8 Sep 2010 16:39:09 +0900 Subject: [PATCH] Add interface function 'libpqrcv_send' to allow walreceiver to send messages back to the master. --- .../replication/libpqwalreceiver/libpqwalreceiver.c | 19 ++++++++++++++++++- src/backend/replication/walreceiver.c | 3 ++- src/include/replication/walreceiver.h | 3 +++ src/interfaces/libpq/fe-exec.c | 6 +++++- 4 files changed, 28 insertions(+), 3 deletions(-) diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index e7581160cc..786bc43a51 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -50,6 +50,7 @@ static char *recvBuf = NULL; static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint); static bool libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len); +static void libpqrcv_send(const char *buffer, int nbytes); static void libpqrcv_disconnect(void); /* Prototypes for private functions */ @@ -64,10 +65,11 @@ _PG_init(void) { /* Tell walreceiver how to reach us */ if (walrcv_connect != NULL || walrcv_receive != NULL || - walrcv_disconnect != NULL) + walrcv_send != NULL || walrcv_disconnect != NULL) elog(ERROR, "libpqwalreceiver already loaded"); walrcv_connect = libpqrcv_connect; walrcv_receive = libpqrcv_receive; + walrcv_send = libpqrcv_send; walrcv_disconnect = libpqrcv_disconnect; } @@ -398,3 +400,18 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len) return true; } + +/* + * Send a message to XLOG stream. + * + * ereports on error. + */ +static void +libpqrcv_send(const char *buffer, int nbytes) +{ + if (PQputCopyData(streamConn, buffer, nbytes) <= 0 || + PQflush(streamConn)) + ereport(ERROR, + (errmsg("could not send data to WAL stream: %s", + PQerrorMessage(streamConn)))); +} diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index ca5fddaee3..f61db0425a 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -57,6 +57,7 @@ bool am_walreceiver; /* libpqreceiver hooks to these when loaded */ walrcv_connect_type walrcv_connect = NULL; walrcv_receive_type walrcv_receive = NULL; +walrcv_send_type walrcv_send = NULL; walrcv_disconnect_type walrcv_disconnect = NULL; #define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */ @@ -247,7 +248,7 @@ WalReceiverMain(void) /* Load the libpq-specific functions */ load_file("libpqwalreceiver", false); if (walrcv_connect == NULL || walrcv_receive == NULL || - walrcv_disconnect == NULL) + walrcv_send == NULL || walrcv_disconnect == NULL) elog(ERROR, "libpqwalreceiver didn't initialize correctly"); /* diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 53b2bb3abd..e8cae3cb8b 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -84,6 +84,9 @@ typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type, char **buffer, int *len); extern PGDLLIMPORT walrcv_receive_type walrcv_receive; +typedef void (*walrcv_send_type) (const char *buffer, int nbytes); +extern PGDLLIMPORT walrcv_send_type walrcv_send; + typedef void (*walrcv_disconnect_type) (void); extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect; diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index b20587f0e4..3171836a0e 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -2002,6 +2002,9 @@ PQnotifies(PGconn *conn) /* * PQputCopyData - send some data to the backend during COPY IN * + * This function can be called by walreceiver even during COPY OUT + * to send a message to the master. + * * Returns 1 if successful, 0 if data could not be sent (only possible * in nonblock mode), or -1 if an error occurs. */ @@ -2010,7 +2013,8 @@ PQputCopyData(PGconn *conn, const char *buffer, int nbytes) { if (!conn) return -1; - if (conn->asyncStatus != PGASYNC_COPY_IN) + if (conn->asyncStatus != PGASYNC_COPY_IN && + conn->asyncStatus != PGASYNC_COPY_OUT) { printfPQExpBuffer(&conn->errorMessage, libpq_gettext("no COPY in progress\n")); -- 2.11.0