OSDN Git Service

Endeavor to make pgstats buffer process (a) safe and (b) useful.
author: Tom Lane <tgl@sss.pgh.pa.us>
Sun, 5 Aug 2001 02:06:50 +0000 (02:06 +0000)
committer: Tom Lane <tgl@sss.pgh.pa.us>
Sun, 5 Aug 2001 02:06:50 +0000 (02:06 +0000)
Make sure it exits immediately when collector process dies --- in old code,
buffer process would hang around and compete with the new buffer process
for packets.  Make sure it doesn't block on writing the pipe when the
collector falls more than a pipeload behind.  Avoid leaking pgstats FDs
into every backend.

src/backend/postmaster/pgstat.c
src/backend/postmaster/postmaster.c
src/include/miscadmin.h
src/include/pgstat.h

index 6c571aa..bac549a 100644 (file)
@@ -16,7 +16,7 @@
  *
  *     Copyright (c) 2001, PostgreSQL Global Development Group
  *
- *     $Header: /cvsroot/pgsql/src/backend/postmaster/pgstat.c,v 1.5 2001/08/04 00:14:43 tgl Exp $
+ *     $Header: /cvsroot/pgsql/src/backend/postmaster/pgstat.c,v 1.6 2001/08/05 02:06:50 tgl Exp $
  * ----------
  */
 #include "postgres.h"
@@ -66,7 +66,7 @@ bool  pgstat_collect_blocklevel               = false;
 static int                                     pgStatSock = -1;
 static int                                     pgStatPipe[2];
 static struct sockaddr_in      pgStatAddr;
-static int                                     pgStatPmPipe[2];
+static int                                     pgStatPmPipe[2] = { -1, -1 };
 
 static int                                     pgStatRunning = 0;
 static int                                     pgStatPid;
@@ -97,6 +97,7 @@ static char                                   pgStat_fname[MAXPGPATH];
  */
 static void            pgstat_main(int real_argc, char *real_argv[]);
 static void            pgstat_recvbuffer(int real_argc, char *real_argv[]);
+static void            pgstat_die(SIGNAL_ARGS);
 
 static int             pgstat_add_backend(PgStat_MsgHdr *msg);
 static void            pgstat_sub_backend(int procpid);
@@ -166,7 +167,7 @@ pgstat_init(void)
                return 0;
 
        /*
-        * Create the UDP socket for receiving statistic messages
+        * Create the UDP socket for sending and receiving statistic messages
         */
        if ((pgStatSock = socket(PF_INET, SOCK_DGRAM, 0)) < 0)
        {
@@ -198,7 +199,24 @@ pgstat_init(void)
        }
 
        /*
-        * Set the socket to non-blocking IO
+        * Connect the socket to its own address.  This saves a few cycles
+        * by not having to respecify the target address on every send.
+        * This also provides a kernel-level check that only packets from
+        * this same address will be received.
+        */
+       if (connect(pgStatSock, (struct sockaddr *)&pgStatAddr, alen) < 0)
+       {
+               perror("PGSTAT: connect(2)");
+               close(pgStatSock);
+               pgStatSock = -1;
+               return -1;
+       }
+
+       /*
+        * Set the socket to non-blocking IO.  This ensures that if the
+        * collector falls behind (despite the buffering process), statistics
+        * messages will be discarded; backends won't block waiting to send
+        * messages to the collector.
         */
        if (fcntl(pgStatSock, F_SETFL, O_NONBLOCK) < 0)
        {
@@ -253,9 +271,9 @@ pgstat_start(int real_argc, char *real_argv[])
        }
 
        /*
-        * Then fork off the collector.
+        * Then fork off the collector.  Remember its PID for pgstat_ispgstat.
         */
-       switch(pgStatPid = (int)fork())
+       switch ((pgStatPid = (int)fork()))
        {
                case -1:
                        perror("PGSTAT: fork(2)");
@@ -270,6 +288,9 @@ pgstat_start(int real_argc, char *real_argv[])
                        return 0;
        }
 
+       /* in postmaster child ... */
+       ClosePostmasterPorts(false);
+
        pgstat_main(real_argc, real_argv);
 
        exit(0);
@@ -297,6 +318,25 @@ pgstat_ispgstat(int pid)
 
 
 /* ----------
+ * pgstat_close_sockets() -
+ *
+ *     Called when postmaster forks a non-pgstat child process, to close off
+ *     file descriptors that should not be held open in child processes.
+ * ----------
+ */
+void
+pgstat_close_sockets(void)
+{
+       if (pgStatPmPipe[0] >= 0)
+               close(pgStatPmPipe[0]);
+       pgStatPmPipe[0] = -1;
+       if (pgStatPmPipe[1] >= 0)
+               close(pgStatPmPipe[1]);
+       pgStatPmPipe[1] = -1;
+}
+
+
+/* ----------
  * pgstat_beterm() -
  *
  *     Called from postmaster to tell collector a backend terminated.
@@ -307,7 +347,7 @@ pgstat_beterm(int pid)
 {
        PgStat_MsgBeterm                msg;
 
-       if (!pgstat_collect_startcollector)
+       if (!pgstat_collect_startcollector || pgStatSock < 0)
                return;
 
        msg.m_hdr.m_type                = PGSTAT_MTYPE_BETERM;
@@ -1030,8 +1070,8 @@ pgstat_send(void *msg, int len)
 
        ((PgStat_MsgHdr *)msg)->m_size = len;
 
-       sendto(pgStatSock, msg, len, 0, 
-                       (struct sockaddr *)&pgStatAddr, sizeof(pgStatAddr));
+       send(pgStatSock, msg, len, 0);
+       /* We deliberately ignore any error from send() */
 }
 
 
@@ -1044,7 +1084,8 @@ pgstat_send(void *msg, int len)
 /* ----------
  * pgstat_main() -
  *
- *     The statistics collector itself.
+ *     Start up the statistics collector itself.  This is the body of the
+ *     postmaster child process.
  * ----------
  */
 static void
@@ -1052,10 +1093,11 @@ pgstat_main(int real_argc, char *real_argv[])
 {
        PgStat_Msg      msg;
        fd_set                  rfds;
+       int                             readPipe;
+       int                             pmPipe = pgStatPmPipe[0];
        int                             maxfd;
        int                             nready;
-       int                             len;
-       int                             dlen;
+       int                             len = 0;
        struct timeval  timeout;
        struct timeval  next_statwrite;
        bool                    need_statwrite;
@@ -1067,9 +1109,11 @@ pgstat_main(int real_argc, char *real_argv[])
         * as well.
         */
        close(pgStatPmPipe[1]);
+       pgStatPmPipe[1] = -1;
 
        /*
-        * Ignore all signals usually bound to some action in the postmaster
+        * Ignore all signals usually bound to some action in the postmaster,
+        * except for SIGCHLD --- see pgstat_recvbuffer.
         */
        pqsignal(SIGHUP, SIG_IGN);
        pqsignal(SIGINT, SIG_IGN);
@@ -1079,15 +1123,22 @@ pgstat_main(int real_argc, char *real_argv[])
        pqsignal(SIGPIPE, SIG_IGN);
        pqsignal(SIGUSR1, SIG_IGN);
        pqsignal(SIGUSR2, SIG_IGN);
-       pqsignal(SIGCHLD, SIG_DFL);
+       pqsignal(SIGCHLD, pgstat_die);
        pqsignal(SIGTTIN, SIG_DFL);
        pqsignal(SIGTTOU, SIG_DFL);
        pqsignal(SIGCONT, SIG_DFL);
        pqsignal(SIGWINCH, SIG_DFL);
 
        /*
-        * Start a buffering subprocess to read from the socket, so
+        * Start a buffering process to read from the socket, so
         * we have a little more time to process incoming messages.
+        *
+        * NOTE: the process structure is: postmaster is parent of buffer process
+        * is parent of collector process.  This way, the buffer can detect
+        * collector failure via SIGCHLD, whereas otherwise it wouldn't notice
+        * collector failure until it tried to write on the pipe.  That would mean
+        * that after the postmaster started a new collector, we'd have two buffer
+        * processes competing to read from the UDP socket --- not good.
         */
        if (pipe(pgStatPipe) < 0)
        {
@@ -1095,26 +1146,32 @@ pgstat_main(int real_argc, char *real_argv[])
                exit(1);
        }
 
-       switch(fork())
+       switch (fork())
        {
                case -1:
                        perror("PGSTAT: fork(2)");
                        exit(1);
 
                case 0:
-                       close(pgStatPipe[0]);
-                       /* child process should die if can't pipe to parent collector */
-                       pqsignal(SIGPIPE, SIG_DFL);
-                       pgstat_recvbuffer(real_argc, real_argv);
-                       exit(2);
-
-               default:
+                       /* child becomes collector process */
                        close(pgStatPipe[1]);
                        close(pgStatSock);
                        break;
+
+               default:
+                       /* parent becomes buffer process */
+                       close(pgStatPipe[0]);
+                       pgstat_recvbuffer(real_argc, real_argv);
+                       exit(0);
        }
 
        /*
+        * In the child we can have default SIGCHLD handling (in case we
+        * want to call system() here...)
+        */
+       pqsignal(SIGCHLD, SIG_DFL);
+
+       /*
         * Identify myself via ps
         *
         * WARNING: On some platforms the environment will be moved around to
@@ -1164,9 +1221,11 @@ pgstat_main(int real_argc, char *real_argv[])
        }
        memset(pgStatBeTable, 0, sizeof(PgStat_StatBeEntry) * MaxBackends);
 
+       readPipe = pgStatPipe[0];
+
        /*
         * Process incoming messages and handle all the reporting stuff
-        * until the postmaster waves us good bye.
+        * until there are no more messages.
         */
        for (;;)
        {
@@ -1196,13 +1255,13 @@ pgstat_main(int real_argc, char *real_argv[])
                 * Setup the descriptor set for select(2)
                 */
                FD_ZERO(&rfds);
-               FD_SET(pgStatPipe[0], &rfds);
-               FD_SET(pgStatPmPipe[0], &rfds);
+               FD_SET(readPipe, &rfds);
+               FD_SET(pmPipe, &rfds);
 
-               if (pgStatPipe[0] > pgStatPmPipe[0])
-                       maxfd = pgStatPipe[0];
+               if (readPipe > pmPipe)
+                       maxfd = readPipe;
                else
-                       maxfd = pgStatPmPipe[0];
+                       maxfd = pmPipe;
 
                /*
                 * Now wait for something to do.
@@ -1211,6 +1270,8 @@ pgstat_main(int real_argc, char *real_argv[])
                                                (need_statwrite) ? &timeout : NULL);
                if (nready < 0)
                {
+                       if (errno == EINTR)
+                               continue;
                        perror("PGSTAT: select(2)");
                        exit(1);
                }
@@ -1230,103 +1291,92 @@ pgstat_main(int real_argc, char *real_argv[])
                /*
                 * Check if there is a new statistics message to collect.
                 */
-               if (FD_ISSET(pgStatPipe[0], &rfds))
+               if (FD_ISSET(readPipe, &rfds))
                {
                        /*
-                        * If this is the first message after we wrote the stats
-                        * file the last time, setup the timeout that it'd be
-                        * written.
+                        * We may need to issue multiple read calls in case the
+                        * buffer process didn't write the message in a single write,
+                        * which is possible since it dumps its buffer bytewise.
+                        * In any case, we'd need two reads since we don't know the
+                        * message length initially.
                         */
-                       if (!need_statwrite)
-                       {
-                               gettimeofday(&next_statwrite, NULL);
-                               next_statwrite.tv_usec += ((PGSTAT_STAT_INTERVAL) * 1000);
-                               next_statwrite.tv_sec  += (next_statwrite.tv_usec / 1000000);
-                               next_statwrite.tv_usec %= 1000000;
-                               need_statwrite = TRUE;
-                       }
+                       int             nread = 0;
+                       int             targetlen = sizeof(PgStat_MsgHdr); /* initial */
 
-                       /*
-                        * Read the header.
-                        */
-                       len = read(pgStatPipe[0], &msg, sizeof(PgStat_MsgHdr));
-                       if (len < 0)
+                       while (nread < targetlen)
                        {
-                               perror("PGSTAT: read(2)");
-                               exit(1);
-                       }
-                       if (len == 0)
-                       {
-                               return;
-                       }
-                       if (len != sizeof(PgStat_MsgHdr))
-                       {
-                               fprintf(stderr, "PGSTAT: short read(2)");
-                               exit(1);
-                       }
-
-                       /*
-                        * And the body. We need to do it in two steps because
-                        * we don't know the length.
-                        */
-                       dlen = msg.msg_hdr.m_size - sizeof(PgStat_MsgHdr);
-                       if (dlen > 0)
-                       {
-                               len = read(pgStatPipe[0], 
-                                               ((char *)&msg) + sizeof(PgStat_MsgHdr), dlen);
+                               len = read(readPipe,
+                                                  ((char *) &msg) + nread,
+                                                  targetlen - nread);
                                if (len < 0)
                                {
+                                       if (errno == EINTR)
+                                               continue;
                                        perror("PGSTAT: read(2)");
                                        exit(1);
                                }
-                               if (len == 0)
-                               {
-                                       return;
-                               }
-                               if (len != dlen)
+                               if (len == 0)   /* EOF on the pipe! */
+                                       break;
+                               nread += len;
+                               if (nread == sizeof(PgStat_MsgHdr))
                                {
-                                       fprintf(stderr, "PGSTAT: short read(2)");
-                                       exit(1);
+                                       /* we have the header, compute actual msg length */
+                                       targetlen = msg.msg_hdr.m_size;
+                                       if (targetlen < (int) sizeof(PgStat_MsgHdr) ||
+                                               targetlen > (int) sizeof(msg))
+                                       {
+                                               /*
+                                                * Bogus message length implies that we got out
+                                                * of sync with the buffer process somehow.
+                                                * Abort so that we can restart both processes.
+                                                */
+                                               fprintf(stderr, "PGSTAT: bogus message length\n");
+                                               exit(1);
+                                       }
                                }
                        }
+                       /*
+                        * EOF on the pipe implies that the buffer process exited.
+                        * Fall out of outer loop.
+                        */
+                       if (len == 0)
+                               break;
 
                        /*
-                        * Distribute the message to the specific function
-                        * handling it.
+                        * Distribute the message to the specific function handling it.
                         */
-                       len += sizeof(PgStat_MsgHdr);
                        switch (msg.msg_hdr.m_type)
                        {
                                case PGSTAT_MTYPE_DUMMY:
                                        break;
 
                                case PGSTAT_MTYPE_BESTART:
-                                       pgstat_recv_bestart((PgStat_MsgBestart *)&msg, len);
+                                       pgstat_recv_bestart((PgStat_MsgBestart *)&msg, nread);
                                        break;
 
                                case PGSTAT_MTYPE_BETERM:
-                                       pgstat_recv_beterm((PgStat_MsgBeterm *)&msg, len);
+                                       pgstat_recv_beterm((PgStat_MsgBeterm *)&msg, nread);
                                        break;
 
                                case PGSTAT_MTYPE_TABSTAT:
-                                       pgstat_recv_tabstat((PgStat_MsgTabstat *)&msg, len);
+                                       pgstat_recv_tabstat((PgStat_MsgTabstat *)&msg, nread);
                                        break;
 
                                case PGSTAT_MTYPE_TABPURGE:
-                                       pgstat_recv_tabpurge((PgStat_MsgTabpurge *)&msg, len);
+                                       pgstat_recv_tabpurge((PgStat_MsgTabpurge *)&msg, nread);
                                        break;
 
                                case PGSTAT_MTYPE_ACTIVITY:
-                                       pgstat_recv_activity((PgStat_MsgActivity *)&msg, len);
+                                       pgstat_recv_activity((PgStat_MsgActivity *)&msg, nread);
                                        break;
 
                                case PGSTAT_MTYPE_DROPDB:
-                                       pgstat_recv_dropdb((PgStat_MsgDropdb *)&msg, len);
+                                       pgstat_recv_dropdb((PgStat_MsgDropdb *)&msg, nread);
                                        break;
 
                                case PGSTAT_MTYPE_RESETCOUNTER:
                                        pgstat_recv_resetcounter((PgStat_MsgResetcounter *)&msg,
-                                                                                               len);
+                                                                                        nread);
                                        break;
 
                                default:
@@ -1334,36 +1384,55 @@ pgstat_main(int real_argc, char *real_argv[])
                        }
 
                        /*
-                        * Globally count messages and start over.
+                        * Globally count messages.
                         */
                        pgStatNumMessages++;
-                       continue;
+
+                       /*
+                        * If this is the first message after we wrote the stats
+                        * file the last time, setup the timeout that it'd be
+                        * written.
+                        */
+                       if (!need_statwrite)
+                       {
+                               gettimeofday(&next_statwrite, NULL);
+                               next_statwrite.tv_usec += ((PGSTAT_STAT_INTERVAL) * 1000);
+                               next_statwrite.tv_sec  += (next_statwrite.tv_usec / 1000000);
+                               next_statwrite.tv_usec %= 1000000;
+                               need_statwrite = TRUE;
+                       }
                }
 
                /*
-                * If the postmaster pipe is ready for reading this means that
-                * the kernel must have closed it because of the termination
-                * of the postmaster (he never really writes to it). Give up
-                * then, but save the final stats in case we want to reuse
-                * them at startup in the future.
+                * Note that we do NOT check for postmaster exit inside the loop;
+                * only EOF on the buffer pipe causes us to fall out.  This ensures
+                * we don't exit prematurely if there are still a few messages in
+                * the buffer or pipe at postmaster shutdown.
                 */
-               if (FD_ISSET(pgStatPmPipe[0], &rfds))
-               {
-                       pgstat_write_statsfile();
-                       return;
-               }
        }
+
+       /*
+        * Okay, we saw EOF on the buffer pipe, so there are no more messages to
+        * process.  If the buffer process quit because of postmaster shutdown,
+        * we want to save the final stats to reuse at next startup.  But if the
+        * buffer process failed, it seems best not to (there may even now be a
+        * new collector firing up, and we don't want it to read a partially-
+        * rewritten stats file).  We can tell whether the postmaster is still
+        * alive by checking to see if the postmaster pipe is still open.  If it
+        * is read-ready (ie, EOF), the postmaster must have quit.
+        */
+       if (FD_ISSET(pmPipe, &rfds))
+               pgstat_write_statsfile();
 }
 
 
 /* ----------
  * pgstat_recvbuffer() -
  *
- *     This is a special receive buffer started by the statistics
- *     collector itself and running in a separate process. It's only
+ *     This is the body of the separate buffering process. Its only
  *     purpose is to receive messages from the UDP socket as fast as
- *     possible and forward them over a pipe into the collector
- *     itself.
+ *     possible and forward them over a pipe into the collector itself.
+ *     If the collector is slow to absorb messages, they are buffered here.
  * ----------
  */
 static void
@@ -1371,16 +1440,21 @@ pgstat_recvbuffer(int real_argc, char *real_argv[])
 {
        fd_set                          rfds;
        fd_set                          wfds;
+       int                                     writePipe = pgStatPipe[1];
+       int                                     pmPipe = pgStatPmPipe[0];
        int                                     maxfd;
        int                                     nready;
        int                                     len;
-       PgStat_Msg                 *msgbuffer = NULL;
-       int                                     msg_recv = 0;
-       int                                     msg_send = 0;
-       int                                     msg_have = 0;
+       int                                     xfr;
+       int                                     frm;
+       PgStat_Msg                      input_buffer;
+       char                       *msgbuffer;
+       int                                     msg_send = 0; /* next send index in buffer */
+       int                                     msg_recv = 0; /* next receive index */
+       int                                     msg_have = 0; /* number of bytes stored */
        struct sockaddr_in      fromaddr;
        int                                     fromlen;
-       int                                     overflow = 0;
+       bool                            overflow = false;
 
        /*
         * Identify myself via ps
@@ -1392,10 +1466,29 @@ pgstat_recvbuffer(int real_argc, char *real_argv[])
        set_ps_display("");
 
        /*
+        * We want to die if our child collector process does.  There are two ways
+        * we might notice that it has died: receive SIGCHLD, or get a write
+        * failure on the pipe leading to the child.  We can set SIGPIPE to kill
+        * us here.  Our SIGCHLD handler was already set up before we forked (must
+        * do it that way, else it's a race condition).
+        */
+       pqsignal(SIGPIPE, SIG_DFL);
+       PG_SETMASK(&UnBlockSig);
+
+       /*
+        * Set the write pipe to nonblock mode, so that we cannot block when
+        * the collector falls behind.
+        */
+       if (fcntl(writePipe, F_SETFL, O_NONBLOCK) < 0)
+       {
+               perror("PGSTATBUFF: fcntl(2)");
+               exit(1);
+       }
+
+       /*
         * Allocate the message buffer
         */
-       msgbuffer = (PgStat_Msg *)malloc(sizeof(PgStat_Msg) *
-                               PGSTAT_RECVBUFFERSZ);
+       msgbuffer = (char *) malloc(PGSTAT_RECVBUFFERSZ);
        if (msgbuffer == NULL)
        {
                perror("PGSTATBUFF: malloc()");
@@ -1415,22 +1508,21 @@ pgstat_recvbuffer(int real_argc, char *real_argv[])
                 * As long as we have buffer space we add the socket
                 * to the read descriptor set.
                 */
-               if (msg_have < PGSTAT_RECVBUFFERSZ)
+               if (msg_have <= (int) (PGSTAT_RECVBUFFERSZ - sizeof(PgStat_Msg)))
                {
                        FD_SET(pgStatSock, &rfds);
                        maxfd = pgStatSock;
-                       overflow = 0;
+                       overflow = false;
                }
                else
                {
-                       if (overflow == 0)
+                       if (!overflow)
                        {
-                               fprintf(stderr, "PGSTAT: Warning - receive buffer full\n");
-                               overflow = 1;
+                               fprintf(stderr, "PGSTATBUFF: Warning - receive buffer full\n");
+                               overflow = true;
                        }
                }
 
-
                /*
                 * If we have messages to write out, we add the pipe
                 * to the write descriptor set. Otherwise, we check if
@@ -1438,24 +1530,25 @@ pgstat_recvbuffer(int real_argc, char *real_argv[])
                 */
                if (msg_have > 0)
                {
-                       FD_SET(pgStatPipe[1], &wfds);
-                       if (pgStatPipe[1] > maxfd)
-                               maxfd = pgStatPipe[1];
+                       FD_SET(writePipe, &wfds);
+                       if (writePipe > maxfd)
+                               maxfd = writePipe;
                }
                else
                {
-                       FD_SET(pgStatPmPipe[0], &rfds);
-                       if (pgStatPmPipe[0] > maxfd)
-                               maxfd = pgStatPmPipe[0];
+                       FD_SET(pmPipe, &rfds);
+                       if (pmPipe > maxfd)
+                               maxfd = pmPipe;
                }
 
-
                /*
                 * Wait for some work to do.
                 */
                nready = select(maxfd + 1, &rfds, &wfds, NULL, NULL);
                if (nready < 0)
                {
+                       if (errno == EINTR)
+                               continue;
                        perror("PGSTATBUFF: select(2)");
                        exit(1);
                }
@@ -1468,8 +1561,8 @@ pgstat_recvbuffer(int real_argc, char *real_argv[])
                {
                        fromlen = sizeof(fromaddr);
                        len = recvfrom(pgStatSock, 
-                                               &msgbuffer[msg_recv], sizeof(PgStat_Msg), 0,
-                                               (struct sockaddr *)&fromaddr, &fromlen);
+                                                  &input_buffer, sizeof(PgStat_Msg), 0,
+                                                  (struct sockaddr *) &fromaddr, &fromlen);
                        if (len < 0)
                        {
                                perror("PGSTATBUFF: recvfrom(2)");
@@ -1485,13 +1578,15 @@ pgstat_recvbuffer(int real_argc, char *real_argv[])
                        /*
                         * The received length must match the length in the header
                         */
-                       if (msgbuffer[msg_recv].msg_hdr.m_size != len)
+                       if (input_buffer.msg_hdr.m_size != len)
                                continue;
 
                        /*
                         * The source address of the packet must be our own socket.
                         * This ensures that only real hackers or our own backends
-                        * tell us something.
+                        * tell us something.  (This should be redundant with a
+                        * kernel-level check due to having used connect(), but
+                        * let's do it anyway.)
                         */
                        if (fromaddr.sin_addr.s_addr != pgStatAddr.sin_addr.s_addr)
                                continue;
@@ -1499,44 +1594,67 @@ pgstat_recvbuffer(int real_argc, char *real_argv[])
                                continue;
                        
                        /*
-                        * O.K. - we accept this message.
+                        * O.K. - we accept this message.  Copy it to the circular
+                        * msgbuffer.
                         */
-                       msg_have++;
-                       msg_recv++;
-                       if (msg_recv == PGSTAT_RECVBUFFERSZ)
-                               msg_recv = 0;
+                       frm = 0;
+                       while (len > 0)
+                       {
+                               xfr = PGSTAT_RECVBUFFERSZ - msg_recv;
+                               if (xfr > len)
+                                       xfr = len;
+                               Assert(xfr > 0);
+                               memcpy(msgbuffer + msg_recv,
+                                          ((char *) &input_buffer) + frm,
+                                          xfr);
+                               msg_recv += xfr;
+                               if (msg_recv == PGSTAT_RECVBUFFERSZ)
+                                       msg_recv = 0;
+                               msg_have += xfr;
+                               frm += xfr;
+                               len -= xfr;
+                       }
                }
 
                /*
-                * If the collector is ready to receive, write a buffered
-                * message into his pipe.
+                * If the collector is ready to receive, write some data into his
+                * pipe.  We may or may not be able to write all that we have.
+                *
+                * NOTE: if what we have is less than PIPE_BUF bytes but more than
+                * the space available in the pipe buffer, most kernels will refuse
+                * to write any of it, and will return EAGAIN.  This means we will
+                * busy-loop until the situation changes (either because the collector
+                * caught up, or because more data arrives so that we have more than
+                * PIPE_BUF bytes buffered).  This is not good, but is there any way
+                * around it?  We have no way to tell when the collector has
+                * caught up...
                 */
-               if (FD_ISSET(pgStatPipe[1], &wfds))
+               if (FD_ISSET(writePipe, &wfds))
                {
-                       len = write(pgStatPipe[1], &msgbuffer[msg_send], 
-                                               msgbuffer[msg_send].msg_hdr.m_size);
+                       xfr = PGSTAT_RECVBUFFERSZ - msg_send;
+                       if (xfr > msg_have)
+                               xfr = msg_have;
+                       Assert(xfr > 0);
+                       len = write(writePipe, msgbuffer + msg_send, xfr);
                        if (len < 0)
                        {
+                               if (errno == EINTR || errno == EAGAIN)
+                                       continue;       /* not enough space in pipe */
                                perror("PGSTATBUFF: write(2)");
                                exit(1);
                        }
-                       if (len != msgbuffer[msg_send].msg_hdr.m_size)
-                       {
-                               fprintf(stderr, "PGSTATBUFF: short write(2)");
-                               exit(1);
-                       }
-
-                       msg_have--;
-                       msg_send++;
+                       /* NB: len < xfr is okay */
+                       msg_send += len;
                        if (msg_send == PGSTAT_RECVBUFFERSZ)
                                msg_send = 0;
+                       msg_have -= len;
                }
 
                /*
                 * Make sure we forwarded all messages before we check for
                 * Postmaster termination.
                 */
-               if (FD_ISSET(pgStatSock, &rfds) || FD_ISSET(pgStatPipe[1], &wfds))
+               if (msg_have != 0 || FD_ISSET(pgStatSock, &rfds))
                        continue;
 
                /*
@@ -1544,16 +1662,22 @@ pgstat_recvbuffer(int real_argc, char *real_argv[])
                 * the kernel must have closed it on exit() (the postmaster
                 * never really writes to it). So we've done our job.
                 */
-               if (FD_ISSET(pgStatPmPipe[0], &rfds))
+               if (FD_ISSET(pmPipe, &rfds))
                        exit(0);
        }
 }
 
+static void
+pgstat_die(SIGNAL_ARGS)
+{
+       exit(1);
+}
+
 
 /* ----------
  * pgstat_add_backend() -
  *
- *     Support function to keep our backen list up to date.
+ *     Support function to keep our backend list up to date.
  * ----------
  */
 static int
@@ -2414,7 +2538,7 @@ pgstat_recv_tabstat(PgStat_MsgTabstat *msg, int len)
 
        /*
         * If the database is marked for destroy, this is a delayed
-        * UDP packet and not worth beeing counted.
+        * UDP packet and not worth being counted.
         */
        if (dbentry->destroy > 0)
                return;
index 511f9bc..c303663 100644 (file)
@@ -37,7 +37,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/postmaster/postmaster.c,v 1.234 2001/08/04 00:14:43 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/postmaster/postmaster.c,v 1.235 2001/08/05 02:06:50 tgl Exp $
  *
  * NOTES
  *
@@ -227,7 +227,6 @@ extern int  optreset;
 static void pmdaemonize(int argc, char *argv[]);
 static Port *ConnCreate(int serverFd);
 static void ConnFree(Port *port);
-static void ClosePostmasterPorts(void);
 static void reset_shared(unsigned short port);
 static void SIGHUP_handler(SIGNAL_ARGS);
 static void pmdie(SIGNAL_ARGS);
@@ -1241,8 +1240,8 @@ ConnFree(Port *conn)
  * that are not needed by that child process.  The postmaster still has
  * them open, of course.
  */
-static void
-ClosePostmasterPorts(void)
+void
+ClosePostmasterPorts(bool pgstat_too)
 {
        /* Close the listen sockets */
        if (NetServer)
@@ -1252,6 +1251,9 @@ ClosePostmasterPorts(void)
        StreamClose(ServerSock_UNIX);
        ServerSock_UNIX = INVALID_SOCK;
 #endif
+       /* Close pgstat control sockets, unless we're starting pgstat itself */
+       if (pgstat_too)
+               pgstat_close_sockets();
 }
 
 
@@ -1900,7 +1902,7 @@ DoBackend(Port *port)
         */
 
        /* Close the postmaster's other sockets */
-       ClosePostmasterPorts();
+       ClosePostmasterPorts(true);
 
        /* Save port etc. for ps status */
        MyProcPort = port;
@@ -2224,7 +2226,7 @@ SSDataBase(int xlop)
                on_exit_reset();
 
                /* Close the postmaster's sockets */
-               ClosePostmasterPorts();
+               ClosePostmasterPorts(true);
 
                /* Set up command-line arguments for subprocess */
                av[ac++] = "postgres";
index ce4255c..c3afaf8 100644 (file)
@@ -12,7 +12,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: miscadmin.h,v 1.87 2001/06/18 21:38:02 momjian Exp $
+ * $Id: miscadmin.h,v 1.88 2001/08/05 02:06:50 tgl Exp $
  *
  * NOTES
  *       some of the information in this file should be moved to
@@ -108,6 +108,7 @@ extern void ProcessInterrupts(void);
  * from postmaster/postmaster.c
  */
 extern int     PostmasterMain(int argc, char *argv[]);
+extern void ClosePostmasterPorts(bool pgstat_too);
 
 /*
  * from utils/init/globals.c
index fc9dc43..646cc27 100644 (file)
@@ -5,7 +5,7 @@
  *
  *     Copyright (c) 2001, PostgreSQL Global Development Group
  *
- *  $Id: pgstat.h,v 1.5 2001/08/04 00:14:43 tgl Exp $
+ *  $Id: pgstat.h,v 1.6 2001/08/05 02:06:50 tgl Exp $
  * ----------
  */
 #ifndef PGSTAT_H
  * ----------
  */
 #define PGSTAT_STAT_INTERVAL   500             /* How often to write the status        */
-                                                                               /* file in milliseconds.                        */
+                                                                               /* file, in milliseconds.                       */
 
 #define PGSTAT_DESTROY_DELAY   10000   /* How long to keep destroyed           */
                                                                                /* objects known to give delayed        */
-                                                                               /* UDP packets time to arrive           */
+                                                                               /* UDP packets time to arrive,          */
                                                                                /* in milliseconds.                                     */
 
 #define PGSTAT_DESTROY_COUNT   (PGSTAT_DESTROY_DELAY                                   \
@@ -36,7 +36,7 @@
 
 
 /* ----------
- * How much of the actual query to send to the collector.
+ * How much of the actual query string to send to the collector.
  * ----------
  */
 #define PGSTAT_ACTIVITY_SIZE   256
 #define        PGSTAT_MTYPE_RESETCOUNTER       7
 
 /* ----------
- * TODO
- * For simplicity now, the number of messages buffered in
- * pgstat_recvbuffer(). Should be an amount of bytes used
- * for a gapless wraparound buffer.
+ * Amount of space reserved in pgstat_recvbuffer(), in bytes.
  * ----------
  */
-#define        PGSTAT_RECVBUFFERSZ             1024
+#define        PGSTAT_RECVBUFFERSZ             ((int) (1024 * sizeof(PgStat_Msg)))
 
 
 /* ----------
@@ -338,6 +335,7 @@ extern bool         pgstat_collect_blocklevel;
 extern int             pgstat_init(void);
 extern int             pgstat_start(int real_argc, char *real_argv[]);
 extern int             pgstat_ispgstat(int pid);
+extern void            pgstat_close_sockets(void);
 extern void            pgstat_beterm(int pid);
 
 /* ----------