OSDN Git Service

Substantial rewrite of async.c to avoid problems with non-reentrant stdio
author: Tom Lane <tgl@sss.pgh.pa.us>
Tue, 6 Oct 1998 02:40:09 +0000 (02:40 +0000)
committer: Tom Lane <tgl@sss.pgh.pa.us>
Tue, 6 Oct 1998 02:40:09 +0000 (02:40 +0000)
and possibly other problems.  Minor changes in xact.c and postgres.c's
main loop to support new handling of async NOTIFY.

src/backend/access/transam/xact.c
src/backend/commands/async.c
src/backend/tcop/postgres.c
src/backend/utils/misc/trace.c
src/include/access/xact.h
src/include/commands/async.h
src/include/utils/trace.h

index d082c80..24db5be 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.23 1998/09/01 04:27:19 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.24 1998/10/06 02:39:58 tgl Exp $
  *
  * NOTES
  *             Transaction aborts can now occur two ways:
@@ -901,6 +901,9 @@ CommitTransaction()
        /* handle commit for large objects [ PA, 7/17/98 ] */
        _lo_commit();
 
+       /* NOTIFY commit must also come before lower-level cleanup */
+       AtCommit_Notify();
+
        CloseSequences();
        DestroyTempRels();
        AtEOXact_portals();
@@ -916,10 +919,6 @@ CommitTransaction()
         * ----------------
         */
        s->state = TRANS_DEFAULT;
-       {                                                       /* want this after commit */
-               if (IsNormalProcessingMode())
-                       Async_NotifyAtCommit();
-       }
 
        /*
         * Let others to know about no transaction in progress - vadim
@@ -967,6 +966,7 @@ AbortTransaction()
         *      do abort processing
         * ----------------
         */
+       AtAbort_Notify();
        CloseSequences();
        AtEOXact_portals();
        RecordTransactionAbort();
@@ -982,17 +982,6 @@ AbortTransaction()
         * ----------------
         */
        s->state = TRANS_DEFAULT;
-       {
-
-               /*
-                * We need to do this in case another process notified us while we
-                * are in the middle of an aborted transaction.  We need to notify
-                * our frontend after we finish the current transaction. -- jw,
-                * 1/3/94
-                */
-               if (IsNormalProcessingMode())
-                       Async_NotifyAtAbort();
-       }
 }
 
 /* --------------------------------
@@ -1455,6 +1444,30 @@ UserAbortTransactionBlock()
        s->blockState = TBLOCK_ENDABORT;
 }
 
+/* --------------------------------
+ *             AbortOutOfAnyTransaction
+ *
+ * This routine is provided for error recovery purposes.  It aborts any
+ * active transaction or transaction block, leaving the system in a known
+ * idle state.
+ * --------------------------------
+ */
+void
+AbortOutOfAnyTransaction()
+{
+       TransactionState s = CurrentTransactionState;
+
+       /*
+        * Get out of any low-level transaction
+        */
+       if (s->state != TRANS_DEFAULT)
+               AbortTransaction();
+       /*
+        * Now reset the high-level state
+        */
+       s->blockState = TBLOCK_DEFAULT;
+}
+
 bool
 IsTransactionBlock()
 {
index 1212197..a8c447c 100644 (file)
@@ -1,38 +1,79 @@
 /*-------------------------------------------------------------------------
  *
  * async.c--
- *       Asynchronous notification
+ *       Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.40 1998/09/01 04:27:42 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.41 1998/10/06 02:39:59 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
-/* New Async Notification Model:
+
+/*-------------------------------------------------------------------------
+ * New Async Notification Model:
  * 1. Multiple backends on same machine.  Multiple backends listening on
- *       one relation.
- *
- * 2. One of the backend does a 'notify <relname>'.  For all backends that
- *       are listening to this relation (all notifications take place at the
- *       end of commit),
- *       2.a  If the process is the same as the backend process that issued
- *                notification (we are notifying something that we are listening),
- *                signal the corresponding frontend over the comm channel.
- *       2.b  For all other listening processes, we send kill(SIGUSR2) to wake up
- *                the listening backend.
- * 3. Upon receiving a kill(SIGUSR2) signal from another backend process
- *       notifying that one of the relation that we are listening is being
- *       notified, we can be in either of two following states:
- *       3.a  We are sleeping, wake up and signal our frontend.
- *       3.b  We are in middle of another transaction, wait until the end of
- *                of the current transaction and signal our frontend.
- * 4. Each frontend receives this notification and processes accordingly.
- *
- * -- jw, 12/28/93
- *
+ *       one relation.  (Note: "listening on a relation" is not really the
+ *       right way to think about it, since the notify names need not have
+ *       anything to do with the names of relations actually in the database.
+ *       But this terminology is all over the code and docs, and I don't feel
+ *       like trying to replace it.)
+ *
+ * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
+ *       ie, each relname/listenerPID pair.  The "notification" field of the
+ *       tuple is zero when no NOTIFY is pending for that listener, or the PID
+ *       of the originating backend when a cross-backend NOTIFY is pending.
+ *       (We skip writing to pg_listener when doing a self-NOTIFY, so the
+ *       notification field should never be equal to the listenerPID field.)
+ *
+ * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
+ *       relname to a list of outstanding NOTIFY requests.  Actual processing
+ *       happens if and only if we reach transaction commit.  At that time (in
+ *       routine AtCommit_Notify) we scan pg_listener for matching relnames.
+ *    If the listenerPID in a matching tuple is ours, we just send a notify
+ *       message to our own front end.  If it is not ours, and "notification"
+ *       is not already nonzero, we set notification to our own PID and send a
+ *       SIGUSR2 signal to the receiving process (indicated by listenerPID).
+ *       BTW: if the signal operation fails, we presume that the listener backend
+ *    crashed without removing this tuple, and remove the tuple for it.
+ *
+ * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
+ *       notify processing immediately if this backend is idle (ie, it is
+ *       waiting for a frontend command and is not within a transaction block).
+ *    Otherwise the handler may only set a flag, which will cause the
+ *       processing to occur just before we next go idle.
+ *
+ * 5. Inbound-notify processing consists of scanning pg_listener for tuples
+ *       matching our own listenerPID and having nonzero notification fields.
+ *       For each such tuple, we send a message to our frontend and clear the
+ *       notification field.  BTW: this routine has to start/commit its own
+ *       transaction, since by assumption it is only called from outside any
+ *       transaction.
+ *
+ * Note that the system's use of pg_listener is confined to very short
+ * intervals at the end of a transaction that contains NOTIFY statements,
+ * or during the transaction caused by an inbound SIGUSR2.  So the fact that
+ * pg_listener is a global resource shouldn't cause too much performance
+ * problem.  But application authors ought to be discouraged from doing
+ * LISTEN or UNLISTEN near the start of a long transaction --- that would
+ * result in holding the pg_listener write lock for a long time, possibly
+ * blocking unrelated activity.  It could even lead to deadlock against another
+ * transaction that touches the same user tables and then tries to NOTIFY.
+ * Probably best to do LISTEN or UNLISTEN outside of transaction blocks.
+ *
+ * An application that listens on the same relname it notifies will get
+ * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
+ * by comparing be_pid in the NOTIFY message to the application's own backend's
+ * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
+ * frontend during startup.)  The above design guarantees that notifies from
+ * other backends will never be missed by ignoring self-notifies.  Note,
+ * however, that we do *not* guarantee that a separate frontend message will
+ * be sent for every outside NOTIFY.  Since there is only room for one
+ * originating PID in pg_listener, outside notifies occurring at about the
+ * same time may be collapsed into a single message bearing the PID of the
+ * first outside backend to perform the NOTIFY.
+ *-------------------------------------------------------------------------
  */
 
 #include <unistd.h>
 
 #include "postgres.h"
 
+#include "commands/async.h"
 #include "access/heapam.h"
 #include "access/relscan.h"
 #include "access/xact.h"
 #include "catalog/catname.h"
 #include "catalog/pg_listener.h"
-#include "commands/async.h"
 #include "fmgr.h"
 #include "lib/dllist.h"
 #include "libpq/libpq.h"
 #include "miscadmin.h"
-#include "nodes/memnodes.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "tcop/dest.h"
-#include "utils/mcxt.h"
 #include "utils/syscache.h"
 #include <utils/trace.h>
 #include <utils/ps_status.h>
 
-#define NotifyUnlock pg_options[OPT_NOTIFYUNLOCK]
-#define NotifyHack      pg_options[OPT_NOTIFYHACK]
-
+/* stuff that we really ought not be touching directly :-( */
 extern TransactionState CurrentTransactionState;
 extern CommandDest whereToSendOutput;
 
-GlobalMemory notifyContext = NULL;
-
-static int     notifyFrontEndPending = 0;
-static int     notifyIssued = 0;
+/*
+ * State for outbound notifies consists of a list of all relnames NOTIFYed
+ * in the current transaction.  We do not actually perform a NOTIFY until
+ * and unless the transaction commits.  pendingNotifies is NULL if no
+ * NOTIFYs have been done in the current transaction.
+ */
 static Dllist *pendingNotifies = NULL;
 
-static int     AsyncExistsPendingNotify(char *);
-static void ClearPendingNotify(void);
-static void Async_NotifyFrontEnd(void);
-static void Async_NotifyFrontEnd_Aux(void);
-void           Async_Unlisten(char *relname, int pid);
-static void Async_UnlistenOnExit(int code, char *relname);
-static void Async_UnlistenAll(void);
-
 /*
- *--------------------------------------------------------------
- * Async_NotifyHandler --
- *
- *             This is the signal handler for SIGUSR2.  When the backend
- *             is signaled, the backend can be in two states.
- *             1. If the backend is in the middle of another transaction,
- *                we set the flag, notifyFrontEndPending, and wait until
- *                the end of the transaction to notify the front end.
- *             2. If the backend is not in the middle of another transaction,
- *                we notify the front end immediately.
- *
- *             -- jw, 12/28/93
- * Results:
- *             none
- *
- * Side effects:
- *             none
+ * State for inbound notifies consists of two flags: one saying whether
+ * the signal handler is currently allowed to call ProcessIncomingNotify
+ * directly, and one saying whether the signal has occurred but the handler
+ * was not allowed to call ProcessIncomingNotify at the time.
+ *
+ * NB: the "volatile" on these declarations is critical!  If your compiler
+ * does not grok "volatile", you'd be best advised to compile this file
+ * with all optimization turned off.
  */
-void
-Async_NotifyHandler(SIGNAL_ARGS)
-{
-       TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler");
+static volatile int    notifyInterruptEnabled = 0;
+static volatile int    notifyInterruptOccurred = 0;
 
-       if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
-               (CurrentTransactionState->blockState == TRANS_DEFAULT))
-       {
-               TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: "
-                               "waking up sleeping backend process");
-               PS_SET_STATUS("async_notify");
-               Async_NotifyFrontEnd();
-               PS_SET_STATUS("idle");
-       }
-       else
-       {
-               TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: "
-                        "process in middle of transaction, state=%d, blockstate=%d",
-                               CurrentTransactionState->state,
-                               CurrentTransactionState->blockState);
-               notifyFrontEndPending = 1;
-               TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: notify frontend pending");
-       }
+/* True if we've registered an on_shmem_exit cleanup (or at least tried to). */
+static int     unlistenExitRegistered = 0;
+
+
+static void Async_UnlistenAll(void);
+static void Async_UnlistenOnExit(void);
+static void ProcessIncomingNotify(void);
+static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
+static int     AsyncExistsPendingNotify(char *relname);
+static void ClearPendingNotifies(void);
 
-       TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: done");
-}
 
 /*
  *--------------------------------------------------------------
@@ -136,253 +146,40 @@ Async_NotifyHandler(SIGNAL_ARGS)
  *             This is executed by the SQL notify command.
  *
  *             Adds the relation to the list of pending notifies.
- *             All notification happens at end of commit.
- *             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
- *
- *             All notification of backend processes happens here,
- *             then each backend notifies its corresponding front end at
- *             the end of commit.
- *
- *             -- jw, 12/28/93
+ *             Actual notification happens during transaction commit.
+ *             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  *
  * Results:
  *             XXX
  *
- * Side effects:
- *             All tuples for relname in pg_listener are updated.
- *
  *--------------------------------------------------------------
  */
 void
 Async_Notify(char *relname)
 {
-
-       HeapTuple       lTuple,
-                               rTuple;
-       Relation        lRel;
-       HeapScanDesc sRel;
-       TupleDesc       tdesc;
-       ScanKeyData key;
-       Datum           d,
-                               value[3];
-       bool            isnull;
-       char            repl[3],
-                               nulls[3];
-
        char       *notifyName;
 
        TPRINTF(TRACE_NOTIFY, "Async_Notify: %s", relname);
 
-       if (!pendingNotifies)
-               pendingNotifies = DLNewList();
-
        /*
-        * Allocate memory from the global malloc pool because it needs to be
-        * referenced also when the transaction is finished.  DZ - 26-08-1996
+        * We allocate list memory from the global malloc pool to ensure that
+        * it will live until we want to use it.  This is probably not necessary
+        * any longer, since we will use it before the end of the transaction.
+        * DLList only knows how to use malloc() anyway, but we could probably
+        * palloc() the strings...
         */
+       if (!pendingNotifies)
+               pendingNotifies = DLNewList();
        notifyName = strdup(relname);
        DLAddHead(pendingNotifies, DLNewElem(notifyName));
-
-       ScanKeyEntryInitialize(&key, 0,
-                                                  Anum_pg_listener_relname,
-                                                  F_NAMEEQ,
-                                                  PointerGetDatum(notifyName));
-
-       lRel = heap_openr(ListenerRelationName);
-       tdesc = RelationGetDescr(lRel);
-       RelationSetLockForWrite(lRel);
-       sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, &key);
-
-       nulls[0] = nulls[1] = nulls[2] = ' ';
-       repl[0] = repl[1] = repl[2] = ' ';
-       repl[Anum_pg_listener_notify - 1] = 'r';
-       value[0] = value[1] = value[2] = (Datum) 0;
-       value[Anum_pg_listener_notify - 1] = Int32GetDatum(1);
-
-       while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
-       {
-               d = heap_getattr(lTuple, Anum_pg_listener_notify, tdesc, &isnull);
-               if (!DatumGetInt32(d))
-               {
-                       rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
-                       heap_replace(lRel, &lTuple->t_ctid, rTuple);
-                       /* notify is really issued only if a tuple has been changed */
-                       notifyIssued = 1;
-               }
-       }
-       heap_endscan(sRel);
-
        /*
-        * Note: if the write lock is unset we can get multiple tuples with
-        * same oid if other backends notify the same relation. Use this
-        * option at your own risk.
+        * NOTE: we could check to see if pendingNotifies already has an entry
+        * for relname, and thus avoid making duplicate entries.  However, most
+        * apps probably don't notify the same name multiple times per transaction,
+        * so we'd likely just be wasting cycles to make such a check.
+        * AsyncExistsPendingNotify() doesn't really care whether the list
+        * contains duplicates...
         */
-       if (NotifyUnlock)
-               RelationUnsetLockForWrite(lRel);
-
-       heap_close(lRel);
-
-       TPRINTF(TRACE_NOTIFY, "Async_Notify: done %s", relname);
-}
-
-/*
- *--------------------------------------------------------------
- * Async_NotifyAtCommit --
- *
- *             This is called at transaction commit.
- *
- *             Signal our corresponding frontend process on relations that
- *             were notified.  Signal all other backend process that
- *             are listening also.
- *
- *             -- jw, 12/28/93
- *
- * Results:
- *             XXX
- *
- * Side effects:
- *             Tuples in pg_listener that has our listenerpid are updated so
- *             that the notification is 0.  We do not want to notify frontend
- *             more than once.
- *
- *             -- jw, 12/28/93
- *
- *--------------------------------------------------------------
- */
-void
-Async_NotifyAtCommit()
-{
-       HeapTuple       lTuple;
-       Relation        lRel;
-       HeapScanDesc sRel;
-       TupleDesc       tdesc;
-       ScanKeyData key;
-       Datum           d;
-       bool            isnull;
-       extern TransactionState CurrentTransactionState;
-
-       if (!pendingNotifies)
-               pendingNotifies = DLNewList();
-
-       if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
-               (CurrentTransactionState->blockState == TRANS_DEFAULT))
-       {
-               if (notifyIssued)
-               {
-                       /* 'notify <relname>' issued by us */
-                       notifyIssued = 0;
-                       StartTransactionCommand();
-                       TPRINTF(TRACE_NOTIFY, "Async_NotifyAtCommit");
-                       ScanKeyEntryInitialize(&key, 0,
-                                                                  Anum_pg_listener_notify,
-                                                                  F_INT4EQ,
-                                                                  Int32GetDatum(1));
-                       lRel = heap_openr(ListenerRelationName);
-                       RelationSetLockForWrite(lRel);
-                       sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, &key);
-                       tdesc = RelationGetDescr(lRel);
-
-                       while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
-                       {
-                               d = heap_getattr(lTuple, Anum_pg_listener_relname,
-                                                                tdesc, &isnull);
-
-                               if (AsyncExistsPendingNotify((char *) DatumGetPointer(d)))
-                               {
-                                       d = heap_getattr(lTuple, Anum_pg_listener_pid,
-                                                                        tdesc, &isnull);
-
-                                       if (MyProcPid == DatumGetInt32(d))
-                                       {
-                                               notifyFrontEndPending = 1;
-                                               TPRINTF(TRACE_NOTIFY,
-                                                               "Async_NotifyAtCommit: notifying self");
-                                       }
-                                       else
-                                       {
-                                               TPRINTF(TRACE_NOTIFY,
-                                                               "Async_NotifyAtCommit: notifying pid %d",
-                                                               DatumGetInt32(d));
-#ifdef HAVE_KILL
-                                               if (kill(DatumGetInt32(d), SIGUSR2) < 0)
-                                               {
-                                                       if (errno == ESRCH)
-                                                               heap_delete(lRel, &lTuple->t_ctid);
-                                               }
-#endif
-                                       }
-                               }
-                       }
-                       heap_endscan(sRel);
-                       heap_close(lRel);
-
-                       /*
-                        * Notify the frontend inside the current transaction while we
-                        * still have a valid write lock on pg_listeners. This avoid
-                        * waiting until all other backends have finished with
-                        * pg_listener.
-                        */
-                       if (notifyFrontEndPending)
-                       {
-                               /* The aux version is called inside transaction */
-                               Async_NotifyFrontEnd_Aux();
-                       }
-
-                       TPRINTF(TRACE_NOTIFY, "Async_NotifyAtCommit: done");
-                       CommitTransactionCommand();
-               }
-               else
-               {
-
-                       /*
-                        * No notifies issued by us. If notifyFrontEndPending has been
-                        * set by Async_NotifyHandler notify the frontend of pending
-                        * notifies from other backends.
-                        */
-                       if (notifyFrontEndPending)
-                               Async_NotifyFrontEnd();
-               }
-
-               ClearPendingNotify();
-       }
-}
-
-/*
- *--------------------------------------------------------------
- * Async_NotifyAtAbort --
- *
- *             This is called at transaction commit.
- *
- *             Gets rid of pending notifies.  List elements are automatically
- *             freed through memory context.
- *
- *
- * Results:
- *             XXX
- *
- * Side effects:
- *             XXX
- *
- *--------------------------------------------------------------
- */
-void
-Async_NotifyAtAbort()
-{
-       if (pendingNotifies)
-       {
-               ClearPendingNotify();
-               DLFreeList(pendingNotifies);
-       }
-       pendingNotifies = DLNewList();
-       notifyIssued = 0;
-
-       if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
-               (CurrentTransactionState->blockState == TRANS_DEFAULT))
-       {
-               /* don't forget to notify front end */
-               if (notifyFrontEndPending)
-                       Async_NotifyFrontEnd();
-       }
 }
 
 /*
@@ -394,108 +191,94 @@ Async_NotifyAtAbort()
  *             Register a backend (identified by its Unix PID) as listening
  *             on the specified relation.
  *
- *             One listener per relation, pg_listener relation is keyed
- *             on (relname,pid) to provide multiple listeners in future.
- *
  * Results:
- *             pg_listeners is updated.
+ *             XXX
  *
  * Side effects:
- *             XXX
+ *             pg_listener is updated.
  *
  *--------------------------------------------------------------
  */
 void
 Async_Listen(char *relname, int pid)
 {
-       Datum           values[Natts_pg_listener];
-       char            nulls[Natts_pg_listener];
+       Relation        lRel;
        TupleDesc       tdesc;
        HeapScanDesc scan;
        HeapTuple       tuple,
                                newtup;
-       Relation        lDesc;
+       Datum           values[Natts_pg_listener];
+       char            nulls[Natts_pg_listener];
        Datum           d;
        int                     i;
        bool            isnull;
        int                     alreadyListener = 0;
-       char       *relnamei;
        TupleDesc       tupDesc;
 
-       if (whereToSendOutput != Remote)
-       {
-               elog(NOTICE, "Async_Listen: "
-                        "listen not available on interactive sessions");
-               return;
-       }
-
        TPRINTF(TRACE_NOTIFY, "Async_Listen: %s", relname);
-       for (i = 0; i < Natts_pg_listener; i++)
-       {
-               nulls[i] = ' ';
-               values[i] = PointerGetDatum(NULL);
-       }
 
-       i = 0;
-       values[i++] = (Datum) relname;
-       values[i++] = (Datum) pid;
-       values[i++] = (Datum) 0;        /* no notifies pending */
-
-       lDesc = heap_openr(ListenerRelationName);
-       RelationSetLockForWrite(lDesc);
+       lRel = heap_openr(ListenerRelationName);
+       RelationSetLockForWrite(lRel);
+       tdesc = RelationGetDescr(lRel);
 
-       /* is someone already listening.  One listener per relation */
-       tdesc = RelationGetDescr(lDesc);
-       scan = heap_beginscan(lDesc, 0, SnapshotNow, 0, (ScanKey) NULL);
+       /* Detect whether we are already listening on this relname */
+       scan = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
        while (HeapTupleIsValid(tuple = heap_getnext(scan, 0)))
        {
-               d = heap_getattr(tuple, Anum_pg_listener_relname, tdesc,
-                                                &isnull);
-               relnamei = DatumGetPointer(d);
-               if (!strncmp(relnamei, relname, NAMEDATALEN))
+               d = heap_getattr(tuple, Anum_pg_listener_relname, tdesc, &isnull);
+               if (!strncmp((char *) DatumGetPointer(d), relname, NAMEDATALEN))
                {
                        d = heap_getattr(tuple, Anum_pg_listener_pid, tdesc, &isnull);
-                       pid = DatumGetInt32(d);
-                       if (pid == MyProcPid)
+                       if (DatumGetInt32(d) == pid)
+                       {
                                alreadyListener = 1;
-               }
-               if (alreadyListener)
-               {
-                       /* No need to scan the rest of the table */
-                       break;
+                               /* No need to scan the rest of the table */
+                               break;
+                       }
                }
        }
        heap_endscan(scan);
 
        if (alreadyListener)
        {
-               elog(NOTICE, "Async_Listen: We are already listening on %s",
-                        relname);
-               RelationUnsetLockForWrite(lDesc);
-               heap_close(lDesc);
+               elog(NOTICE, "Async_Listen: We are already listening on %s", relname);
+               RelationUnsetLockForWrite(lRel);
+               heap_close(lRel);
                return;
        }
 
-       tupDesc = lDesc->rd_att;
-       newtup = heap_formtuple(tupDesc, values, nulls);
-       heap_insert(lDesc, newtup);
-       pfree(newtup);
-
        /*
-        * if (alreadyListener) { elog(NOTICE,"Async_Listen: already one
-        * listener on %s (possibly dead)",relname); }
+        * OK to insert a new tuple
         */
 
-       RelationUnsetLockForWrite(lDesc);
-       heap_close(lDesc);
+       for (i = 0; i < Natts_pg_listener; i++)
+       {
+               nulls[i] = ' ';
+               values[i] = PointerGetDatum(NULL);
+       }
+
+       i = 0;
+       values[i++] = (Datum) relname;
+       values[i++] = (Datum) pid;
+       values[i++] = (Datum) 0;        /* no notifies pending */
+
+       tupDesc = lRel->rd_att;
+       newtup = heap_formtuple(tupDesc, values, nulls);
+       heap_insert(lRel, newtup);
+       pfree(newtup);
+
+       RelationUnsetLockForWrite(lRel);
+       heap_close(lRel);
 
        /*
-        * now that we are listening, we should make a note to ourselves to
-        * unlisten prior to dying.
+        * now that we are listening, make sure we will unlisten before dying.
         */
-       relnamei = malloc(NAMEDATALEN);         /* persists to process exit */
-       StrNCpy(relnamei, relname, NAMEDATALEN);
-       on_shmem_exit(Async_UnlistenOnExit, (caddr_t) relnamei);
+       if (! unlistenExitRegistered)
+       {
+               if (on_shmem_exit(Async_UnlistenOnExit, (caddr_t) NULL) < 0)
+                       elog(NOTICE, "Async_Listen: out of shmem_exit slots");
+               unlistenExitRegistered = 1;
+       }
 }
 
 /*
@@ -508,17 +291,17 @@ Async_Listen(char *relname, int pid)
  *             for the specified relation.
  *
  * Results:
- *             pg_listeners is updated.
+ *             XXX
  *
  * Side effects:
- *             XXX
+ *             pg_listener is updated.
  *
  *--------------------------------------------------------------
  */
 void
 Async_Unlisten(char *relname, int pid)
 {
-       Relation        lDesc;
+       Relation        lRel;
        HeapTuple       lTuple;
 
        /* Handle specially the `unlisten "*"' command */
@@ -530,17 +313,21 @@ Async_Unlisten(char *relname, int pid)
 
        TPRINTF(TRACE_NOTIFY, "Async_Unlisten %s", relname);
 
+       /* Note we assume there can be only one matching tuple. */
        lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
                                                                 Int32GetDatum(pid),
                                                                 0, 0);
        if (lTuple != NULL)
        {
-               lDesc = heap_openr(ListenerRelationName);
-               RelationSetLockForWrite(lDesc);
-               heap_delete(lDesc, &lTuple->t_ctid);
-               RelationUnsetLockForWrite(lDesc);
-               heap_close(lDesc);
+               lRel = heap_openr(ListenerRelationName);
+               RelationSetLockForWrite(lRel);
+               heap_delete(lRel, &lTuple->t_ctid);
+               RelationUnsetLockForWrite(lRel);
+               heap_close(lRel);
        }
+       /* We do not complain about unlistening something not being listened;
+        * should we?
+        */
 }
 
 /*
@@ -549,187 +336,487 @@ Async_Unlisten(char *relname, int pid)
  *
  *             Unlisten all relations for this backend.
  *
+ *             This is invoked by UNLISTEN "*" command, and also at backend exit.
+ *
  * Results:
- *             pg_listeners is updated.
+ *             XXX
  *
  * Side effects:
- *             XXX
+ *             pg_listener is updated.
  *
  *--------------------------------------------------------------
  */
 static void
 Async_UnlistenAll()
 {
-       HeapTuple       lTuple;
        Relation        lRel;
-       HeapScanDesc sRel;
        TupleDesc       tdesc;
+       HeapScanDesc sRel;
+       HeapTuple       lTuple;
        ScanKeyData key[1];
 
        TPRINTF(TRACE_NOTIFY, "Async_UnlistenAll");
+
+       lRel = heap_openr(ListenerRelationName);
+       RelationSetLockForWrite(lRel);
+       tdesc = RelationGetDescr(lRel);
+
+       /* Find and delete all entries with my listenerPID */
        ScanKeyEntryInitialize(&key[0], 0,
                                                   Anum_pg_listener_pid,
                                                   F_INT4EQ,
                                                   Int32GetDatum(MyProcPid));
-       lRel = heap_openr(ListenerRelationName);
-       RelationSetLockForWrite(lRel);
-       tdesc = RelationGetDescr(lRel);
        sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
 
        while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
                heap_delete(lRel, &lTuple->t_ctid);
+
        heap_endscan(sRel);
        RelationUnsetLockForWrite(lRel);
        heap_close(lRel);
-       TPRINTF(TRACE_NOTIFY, "Async_UnlistenAll: done");
 }
 
 /*
- * --------------------------------------------------------------
+ *--------------------------------------------------------------
  * Async_UnlistenOnExit --
  *
- *             This is called at backend exit for each registered listen.
+ *             Clean up the pg_listener table at backend exit.
+ *
+ *             This is executed if we have done any LISTENs in this backend.
+ *             It might not be necessary anymore, if the user UNLISTENed everything,
+ *             but we don't try to detect that case.
  *
  * Results:
  *             XXX
  *
- * --------------------------------------------------------------
- */
-static void
-Async_UnlistenOnExit(int code, /* from exitpg */
-                                        char *relname)
-{
-       Async_Unlisten((char *) relname, MyProcPid);
-}
-
-/*
- * --------------------------------------------------------------
- * Async_NotifyFrontEnd --
- *
- *             This is called outside transactions. The real work is done
- *             by Async_NotifyFrontEnd_Aux().
+ * Side effects:
+ *             pg_listener is updated if necessary.
  *
- * --------------------------------------------------------------
+ *--------------------------------------------------------------
  */
 static void
-Async_NotifyFrontEnd()
+Async_UnlistenOnExit()
 {
+       /*
+        * We need to start/commit a transaction for the unlisten,
+        * but if there is already an active transaction we had better
+        * abort that one first.  Otherwise we'd end up committing changes
+        * that probably ought to be discarded.
+        */
+       AbortOutOfAnyTransaction();
+       /* Now we can do the unlisten */
        StartTransactionCommand();
-       Async_NotifyFrontEnd_Aux();
+       Async_UnlistenAll();
        CommitTransactionCommand();
 }
 
 /*
- * --------------------------------------------------------------
- * Async_NotifyFrontEnd_Aux --
+ *--------------------------------------------------------------
+ * AtCommit_Notify --
  *
- *             This must be called inside a transaction block.
+ *             This is called at transaction commit.
  *
- *             Perform an asynchronous notification to front end over
- *             portal comm channel.  The name of the relation which contains the
- *             data is sent to the front end.
+ *             If there are outbound notify requests in the pendingNotifies list,
+ *             scan pg_listener for matching tuples, and either signal the other
+ *             backend or send a message to our own frontend.
  *
- *             We remove the notification flag from the pg_listener tuple
- *             associated with our process.
+ *             NOTE: we are still inside the current transaction, therefore can
+ *             piggyback on its committing of changes.
  *
  * Results:
  *             XXX
  *
- * --------------------------------------------------------------
+ * Side effects:
+ *             Tuples in pg_listener that have matching relnames and other people's
+ *             listenerPIDs are updated with a nonzero notification field.
+ *
+ *--------------------------------------------------------------
  */
-static void
-Async_NotifyFrontEnd_Aux()
+void
+AtCommit_Notify()
 {
-       HeapTuple       lTuple,
-                               rTuple;
        Relation        lRel;
-       HeapScanDesc sRel;
        TupleDesc       tdesc;
-       ScanKeyData key[2];
+       HeapScanDesc sRel;
+       HeapTuple       lTuple,
+                               rTuple;
        Datum           d,
-                               value[3];
-       char            repl[3],
-                               nulls[3];
+                               value[Natts_pg_listener];
+       char            repl[Natts_pg_listener],
+                               nulls[Natts_pg_listener];
        bool            isnull;
+       char       *relname;
+       int32           listenerPID;
 
-#define MAX_DONE 64
+       if (!pendingNotifies)
+               return;                                 /* no NOTIFY statements in this transaction */
 
-       char       *done[MAX_DONE];
-       int                     ndone = 0;
-       int                     i;
+       /* NOTIFY is disabled if not normal processing mode.
+        * This test used to be in xact.c, but it seems cleaner to do it here.
+        */
+       if (! IsNormalProcessingMode())
+       {
+               ClearPendingNotifies();
+               return;
+       }
 
-       notifyFrontEndPending = 0;
+       TPRINTF(TRACE_NOTIFY, "AtCommit_Notify");
 
-       TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd");
-       StartTransactionCommand();
-       ScanKeyEntryInitialize(&key[0], 0,
-                                                  Anum_pg_listener_notify,
-                                                  F_INT4EQ,
-                                                  Int32GetDatum(1));
-       ScanKeyEntryInitialize(&key[1], 0,
-                                                  Anum_pg_listener_pid,
-                                                  F_INT4EQ,
-                                                  Int32GetDatum(MyProcPid));
        lRel = heap_openr(ListenerRelationName);
        RelationSetLockForWrite(lRel);
        tdesc = RelationGetDescr(lRel);
-       sRel = heap_beginscan(lRel, 0, SnapshotNow, 2, key);
+       sRel = heap_beginscan(lRel, 0, SnapshotNow, 0, (ScanKey) NULL);
 
+       /* preset data to update notify column to MyProcPid */
        nulls[0] = nulls[1] = nulls[2] = ' ';
        repl[0] = repl[1] = repl[2] = ' ';
        repl[Anum_pg_listener_notify - 1] = 'r';
        value[0] = value[1] = value[2] = (Datum) 0;
-       value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
+       value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
 
        while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
        {
-               d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc,
-                                                &isnull);
+               d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull);
+               relname = (char *) DatumGetPointer(d);
 
-               /*
-                * This hack deletes duplicate tuples which can be left in the
-                * table if the NotifyUnlock option is set. I'm further
-                * investigating this.  -- dz
-                */
-               if (NotifyHack)
+               if (AsyncExistsPendingNotify(relname))
                {
-                       for (i = 0; i < ndone; i++)
+                       d = heap_getattr(lTuple, Anum_pg_listener_pid, tdesc, &isnull);
+                       listenerPID = DatumGetInt32(d);
+
+                       if (listenerPID == MyProcPid)
+                       {
+                               /* Self-notify: no need to bother with table update.
+                                * Indeed, we *must not* clear the notification field in
+                                * this path, or we could lose an outside notify, which'd be
+                                * bad for applications that ignore self-notify messages.
+                                */
+                               TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: notifying self");
+                               NotifyMyFrontEnd(relname, listenerPID);
+                       }
+                       else
                        {
-                               if (strcmp(DatumGetName(d)->data, done[i]) == 0)
+                               TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: notifying pid %d",
+                                               listenerPID);
+                               /*
+                                * If someone has already notified this listener,
+                                * we don't bother modifying the table, but we do still send
+                                * a SIGUSR2 signal, just in case that backend missed the
+                                * earlier signal for some reason.  It's OK to send the signal
+                                * first, because the other guy can't read pg_listener until
+                                * we unlock it.
+                                */
+#ifdef HAVE_KILL
+                               if (kill(listenerPID, SIGUSR2) < 0)
                                {
-                                       TPRINTF(TRACE_NOTIFY,
-                                                       "Async_NotifyFrontEnd: duplicate %s",
-                                                       DatumGetName(d)->data);
+                                       /* Get rid of pg_listener entry if it refers to a PID
+                                        * that no longer exists.  Presumably, that backend
+                                        * crashed without deleting its pg_listener entries.
+                                        * This code used to only delete the entry if errno==ESRCH,
+                                        * but as far as I can see we should just do it for any
+                                        * failure (certainly at least for EPERM too...)
+                                        */
                                        heap_delete(lRel, &lTuple->t_ctid);
-                                       continue;
                                }
+                               else
+#endif
+                               {
+                                       d = heap_getattr(lTuple, Anum_pg_listener_notify,
+                                                                        tdesc, &isnull);
+                                       if (DatumGetInt32(d) == 0)
+                                       {
+                                               rTuple = heap_modifytuple(lTuple, lRel,
+                                                                                                 value, nulls, repl);
+                                               heap_replace(lRel, &lTuple->t_ctid, rTuple);
+                                       }
+                               }
+                       }
+               }
+       }
+
+       heap_endscan(sRel);
+       /*
+        * We do not do RelationUnsetLockForWrite(lRel) here, because the
+        * transaction is about to be committed anyway.
+        */
+       heap_close(lRel);
+
+       ClearPendingNotifies();
+
+       TPRINTF(TRACE_NOTIFY, "AtCommit_Notify: done");
+}
+
+/*
+ *--------------------------------------------------------------
+ * AtAbort_Notify --
+ *
+ *             This is called at transaction abort.
+ *
+ *             Gets rid of pending outbound notifies that we would have executed
+ *             if the transaction got committed.
+ *
+ * Results:
+ *             XXX
+ *
+ *--------------------------------------------------------------
+ */
+void
+AtAbort_Notify()
+{
+       ClearPendingNotifies();
+}
+
+/*
+ *--------------------------------------------------------------
+ * Async_NotifyHandler --
+ *
+ *             This is the signal handler for SIGUSR2.
+ *
+ *             If we are idle (notifyInterruptEnabled is set), we can safely invoke
+ *             ProcessIncomingNotify directly.  Otherwise, just set a flag
+ *             to do it later.
+ *
+ * Results:
+ *             none
+ *
+ * Side effects:
+ *             per above
+ *--------------------------------------------------------------
+ */
+
+void
+Async_NotifyHandler(SIGNAL_ARGS)
+{
+       /*
+        * Note: this is a SIGNAL HANDLER.  You must be very wary of what you do here.
+        * Some helpful soul had this routine sprinkled with TPRINTFs, which would
+        * likely lead to corruption of stdio buffers if they were ever turned on.
+        */
+
+       if (notifyInterruptEnabled)
+       {
+               /* I'm not sure whether some flavors of Unix might allow another
+                * SIGUSR2 occurrence to recursively interrupt this routine.
+                * To cope with the possibility, we do the same sort of dance that
+                * EnableNotifyInterrupt must do --- see that routine for comments.
+                */
+               notifyInterruptEnabled = 0;             /* disable any recursive signal */
+               notifyInterruptOccurred = 1;    /* do at least one iteration */
+               for (;;)
+               {
+                       notifyInterruptEnabled = 1;
+                       if (! notifyInterruptOccurred)
+                               break;
+                       notifyInterruptEnabled = 0;
+                       if (notifyInterruptOccurred)
+                       {
+                               /* Here, it is finally safe to do stuff. */
+                               TPRINTF(TRACE_NOTIFY,
+                                               "Async_NotifyHandler: perform async notify");
+                               ProcessIncomingNotify();
+                               TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: done");
                        }
-                       if (ndone < MAX_DONE)
-                               done[ndone++] = pstrdup(DatumGetName(d)->data);
                }
+       }
+       else
+       {
+               /* In this path it is NOT SAFE to do much of anything, except this: */
+               notifyInterruptOccurred = 1;
+       }
+}
+
+/*
+ * --------------------------------------------------------------
+ * EnableNotifyInterrupt --
+ *
+ *             This is called by the PostgresMain main loop just before waiting
+ *             for a frontend command.  If we are truly idle (ie, *not* inside
+ *             a transaction block), then process any pending inbound notifies,
+ *             and enable the signal handler to process future notifies directly.
+ *
+ *             NOTE: the signal handler starts out disabled, and stays so until
+ *             PostgresMain calls this the first time.
+ * --------------------------------------------------------------
+ */
+
+void
+EnableNotifyInterrupt(void)
+{
+       if (CurrentTransactionState->blockState != TRANS_DEFAULT)
+               return;                                 /* not really idle */
+
+       /*
+        * This code is tricky because we are communicating with a signal
+        * handler that could interrupt us at any point.  If we just checked
+        * notifyInterruptOccurred and then set notifyInterruptEnabled, we
+        * could fail to respond promptly to a signal that happens in between
+        * those two steps.  (A very small time window, perhaps, but Murphy's
+        * Law says you can hit it...)  Instead, we first set the enable flag,
+        * then test the occurred flag.  If we see an unserviced interrupt
+        * has occurred, we re-clear the enable flag before going off to do
+        * the service work.  (That prevents re-entrant invocation of
+        * ProcessIncomingNotify() if another interrupt occurs.)
+        * If an interrupt comes in between the setting and clearing of
+        * notifyInterruptEnabled, then it will have done the service
+        * work and left notifyInterruptOccurred zero, so we have to check
+        * again after clearing enable.  The whole thing has to be in a loop
+        * in case another interrupt occurs while we're servicing the first.
+        * Once we get out of the loop, enable is set and we know there is no
+        * unserviced interrupt.
+        *
+        * NB: an overenthusiastic optimizing compiler could easily break this
+        * code.  Hopefully, they all understand what "volatile" means these days.
+        */
+       for (;;)
+       {
+               notifyInterruptEnabled = 1;
+               if (! notifyInterruptOccurred)
+                       break;
+               notifyInterruptEnabled = 0;
+               if (notifyInterruptOccurred)
+               {
+                       TPRINTF(TRACE_NOTIFY,
+                                       "EnableNotifyInterrupt: perform async notify");
+                       ProcessIncomingNotify();
+                       TPRINTF(TRACE_NOTIFY, "EnableNotifyInterrupt: done");
+               }
+       }
+}
+
+/*
+ * --------------------------------------------------------------
+ * DisableNotifyInterrupt --
+ *
+ *             This is called by the PostgresMain main loop just after receiving
+ *             a frontend command.  Signal handler execution of inbound notifies
+ *             is disabled until the next EnableNotifyInterrupt call.
+ * --------------------------------------------------------------
+ */
+
+void
+DisableNotifyInterrupt(void)
+{
+       notifyInterruptEnabled = 0;
+}
+
+/*
+ * --------------------------------------------------------------
+ * ProcessIncomingNotify --
+ *
+ *             Deal with arriving NOTIFYs from other backends.
+ *             This is called either directly from the SIGUSR2 signal handler,
+ *             or the next time control reaches the outer idle loop.
+ *             Scan pg_listener for arriving notifies, report them to my front end,
+ *             and clear the notification field in pg_listener until next time.
+ *
+ *             NOTE: since we are outside any transaction, we must create our own.
+ *
+ * Results:
+ *             XXX
+ *
+ * --------------------------------------------------------------
+ */
+static void
+ProcessIncomingNotify(void)
+{
+       Relation        lRel;
+       TupleDesc       tdesc;
+       ScanKeyData key[1];
+       HeapScanDesc sRel;
+       HeapTuple       lTuple,
+                               rTuple;
+       Datum           d,
+                               value[Natts_pg_listener];
+       char            repl[Natts_pg_listener],
+                               nulls[Natts_pg_listener];
+       bool            isnull;
+       char       *relname;
+       int32           sourcePID;
+
+       TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify");
+       PS_SET_STATUS("async_notify");
+
+       notifyInterruptOccurred = 0;
+
+       StartTransactionCommand();
 
-               rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
-               heap_replace(lRel, &lTuple->t_ctid, rTuple);
+       lRel = heap_openr(ListenerRelationName);
+       RelationSetLockForWrite(lRel);
+       tdesc = RelationGetDescr(lRel);
+
+       /* Scan only entries with my listenerPID */
+       ScanKeyEntryInitialize(&key[0], 0,
+                                                  Anum_pg_listener_pid,
+                                                  F_INT4EQ,
+                                                  Int32GetDatum(MyProcPid));
+       sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key);
 
-               /* notifying the front end */
-               TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd: notifying %s",
-                               DatumGetName(d)->data);
+       /* Prepare data for rewriting 0 into notification field */
+       nulls[0] = nulls[1] = nulls[2] = ' ';
+       repl[0] = repl[1] = repl[2] = ' ';
+       repl[Anum_pg_listener_notify - 1] = 'r';
+       value[0] = value[1] = value[2] = (Datum) 0;
+       value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
 
-               if (whereToSendOutput == Remote)
+       while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0)))
+       {
+               d = heap_getattr(lTuple, Anum_pg_listener_notify, tdesc, &isnull);
+               sourcePID = DatumGetInt32(d);
+               if (sourcePID != 0)
                {
-                       pq_putnchar("A", 1);
-                       pq_putint((int32) MyProcPid, sizeof(int32));
-                       pq_putstr(DatumGetName(d)->data);
-                       pq_flush();
+                       d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull);
+                       relname = (char *) DatumGetPointer(d);
+                       /* Notify the frontend */
+                       TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify: received %s from %d",
+                                       relname, (int) sourcePID);
+                       NotifyMyFrontEnd(relname, sourcePID);
+                       /* Rewrite the tuple with 0 in notification column */
+                       rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
+                       heap_replace(lRel, &lTuple->t_ctid, rTuple);
                }
        }
        heap_endscan(sRel);
-       RelationUnsetLockForWrite(lRel);
+       /*
+        * We do not do RelationUnsetLockForWrite(lRel) here, because the
+        * transaction is about to be committed anyway.
+        */
        heap_close(lRel);
 
-       TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd: done");
+       CommitTransactionCommand();
+
+       /* Must flush the notify messages to ensure frontend gets them promptly. */
+       pq_flush();
+
+       PS_SET_STATUS("idle");
+       TPRINTF(TRACE_NOTIFY, "ProcessIncomingNotify: done");
 }
 
+/* Send NOTIFY message to my front end. */
+
+static void
+NotifyMyFrontEnd(char *relname, int32 listenerPID)
+{
+       if (whereToSendOutput == Remote)
+       {
+               pq_putnchar("A", 1);
+               pq_putint(listenerPID, sizeof(int32));
+               pq_putstr(relname);
+               /* NOTE: we do not do pq_flush() here.  For a self-notify, it will
+                * happen at the end of the transaction, and for incoming notifies
+                * ProcessIncomingNotify will do it after finding all the notifies.
+                */
+       }
+       else
+       {
+               elog(NOTICE, "NOTIFY for %s", relname);
+       }
+}
+
+/* Does pendingNotifies include the given relname?
+ *
+ * NB: not called unless pendingNotifies != NULL.
+ */
+
 static int
 AsyncExistsPendingNotify(char *relname)
 {
@@ -747,11 +834,26 @@ AsyncExistsPendingNotify(char *relname)
        return 0;
 }
 
+/* Clear the pendingNotifies list. */
+
 static void
-ClearPendingNotify()
+ClearPendingNotifies()
 {
        Dlelem     *p;
 
-       while ((p = DLRemHead(pendingNotifies)) != NULL)
-               free(DLE_VAL(p));
+       if (pendingNotifies)
+       {
+               /* Since the referenced strings are malloc'd, we have to scan the
+                * list and delete them individually.  If we used palloc for the
+                * strings then we could just do DLFreeList to get rid of both
+                * the list nodes and the list base...
+                */
+               while ((p = DLRemHead(pendingNotifies)) != NULL)
+               {
+                       free(DLE_VAL(p));
+                       DLFreeElem(p);
+               }
+               DLFreeList(pendingNotifies);
+               pendingNotifies = NULL;
+       }
 }
index 566b15c..8394fcd 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/tcop/postgres.c,v 1.90 1998/10/02 01:14:14 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/tcop/postgres.c,v 1.91 1998/10/06 02:40:01 tgl Exp $
  *
  * NOTES
  *       this is the "main" module of the postgres backend and
@@ -1511,7 +1511,7 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[])
        if (!IsUnderPostmaster)
        {
                puts("\nPOSTGRES backend interactive interface ");
-               puts("$Revision: 1.90 $ $Date: 1998/10/02 01:14:14 $\n");
+               puts("$Revision: 1.91 $ $Date: 1998/10/06 02:40:01 $\n");
        }
 
        /* ----------------
@@ -1559,7 +1559,16 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[])
                ReadyForQuery(whereToSendOutput);
 
                /* ----------------
-                *       (2) read a command.
+                *       (2) deal with pending asynchronous NOTIFY from other backends,
+                *   and enable async.c's signal handler to execute NOTIFY directly.
+                * ----------------
+                */
+               QueryCancel = false;    /* forget any earlier CANCEL signal */
+
+               EnableNotifyInterrupt();
+
+               /* ----------------
+                *       (3) read a command.
                 * ----------------
                 */
                MemSet(parser_input, 0, MAX_PARSE_BUFFER);
@@ -1569,7 +1578,13 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[])
                QueryCancel = false;    /* forget any earlier CANCEL signal */
 
                /* ----------------
-                *       (3) process the command.
+                *       (4) disable async.c's signal handler.
+                * ----------------
+                */
+               DisableNotifyInterrupt();
+
+               /* ----------------
+                *       (5) process the command.
                 * ----------------
                 */
                switch (firstchar)
@@ -1640,7 +1655,7 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[])
                }
 
                /* ----------------
-                *       (4) commit the current transaction
+                *       (6) commit the current transaction
                 *
                 *       Note: if we had an empty input buffer, then we didn't
                 *       call pg_exec_query, so we don't bother to commit this transaction.
index fb3289d..8adbb07 100644 (file)
@@ -70,10 +70,6 @@ static char *opt_names[] = {
        "syslog",                                       /* use syslog for error messages */
        "hostlookup",                           /* enable hostname lookup in ps_status */
        "showportnumber",                       /* show port number in ps_status */
-       "notifyunlock",                         /* enable unlock of pg_listener after
-                                                                * notify */
-       "notifyhack"                            /* enable notify hack to remove duplicate
-                                                                * tuples */
 };
 
 /*
index a612910..d006264 100644 (file)
@@ -6,7 +6,7 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: xact.h,v 1.15 1998/09/01 04:34:35 momjian Exp $
+ * $Id: xact.h,v 1.16 1998/10/06 02:40:06 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -107,6 +107,7 @@ extern void BeginTransactionBlock(void);
 extern void EndTransactionBlock(void);
 extern bool IsTransactionBlock(void);
 extern void UserAbortTransactionBlock(void);
+extern void AbortOutOfAnyTransaction(void);
 
 extern TransactionId DisabledTransactionId;
 
index 2c9d0a3..5494b0f 100644 (file)
@@ -1,27 +1,38 @@
 /*-------------------------------------------------------------------------
  *
  * async.h--
- *
- *
+ *       Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: async.h,v 1.9 1998/09/01 04:35:22 momjian Exp $
+ * $Id: async.h,v 1.10 1998/10/06 02:40:08 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #ifndef ASYNC_H
 #define ASYNC_H
 
-#include <nodes/memnodes.h>
+#include <postgres.h>
 
-extern void Async_NotifyHandler(SIGNAL_ARGS);
+/* notify-related SQL statements */
 extern void Async_Notify(char *relname);
-extern void Async_NotifyAtCommit(void);
-extern void Async_NotifyAtAbort(void);
 extern void Async_Listen(char *relname, int pid);
 extern void Async_Unlisten(char *relname, int pid);
 
-extern GlobalMemory notifyContext;
+/* perform (or cancel) outbound notify processing at transaction commit */
+extern void AtCommit_Notify(void);
+extern void AtAbort_Notify(void);
+
+/* signal handler for inbound notifies (SIGUSR2) */
+extern void Async_NotifyHandler(SIGNAL_ARGS);
+
+/*
+ * enable/disable processing of inbound notifies directly from signal handler.
+ * The enable routine first performs processing of any inbound notifies that
+ * have occurred since the last disable.  These are meant to be called ONLY
+ * from the appropriate places in PostgresMain().
+ */
+extern void EnableNotifyInterrupt(void);
+extern void DisableNotifyInterrupt(void);
 
 #endif  /* ASYNC_H */
index 8f71639..d978f16 100644 (file)
@@ -66,10 +66,6 @@ enum pg_option_enum
        OPT_SYSLOG,                                     /* use syslog for error messages */
        OPT_HOSTLOOKUP,                         /* enable hostname lookup in ps_status */
        OPT_SHOWPORTNUMBER,                     /* show port number in ps_status */
-       OPT_NOTIFYUNLOCK,                       /* enable unlock of pg_listener after
-                                                                * notify */
-       OPT_NOTIFYHACK,                         /* enable notify hack to remove duplicate
-                                                                * tuples */
 
        NUM_PG_OPTIONS                          /* must be the last item of enum */
 };