OSDN Git Service

Fix LISTEN/NOTIFY race condition reported by Gavin Sherry. While a
[pg-rex/syncrep.git] / src / backend / commands / async.c
1 /*-------------------------------------------------------------------------
2  *
3  * async.c
4  *        Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
5  *
6  * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *        $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.100 2003/09/15 23:33:39 tgl Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14
15 /*-------------------------------------------------------------------------
16  * New Async Notification Model:
17  * 1. Multiple backends on same machine.  Multiple backends listening on
18  *        one relation.  (Note: "listening on a relation" is not really the
19  *        right way to think about it, since the notify names need not have
20  *        anything to do with the names of relations actually in the database.
21  *        But this terminology is all over the code and docs, and I don't feel
22  *        like trying to replace it.)
23  *
24  * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
25  *        ie, each relname/listenerPID pair.  The "notification" field of the
26  *        tuple is zero when no NOTIFY is pending for that listener, or the PID
27  *        of the originating backend when a cross-backend NOTIFY is pending.
28  *        (We skip writing to pg_listener when doing a self-NOTIFY, so the
29  *        notification field should never be equal to the listenerPID field.)
30  *
31  * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
32  *        relname to a list of outstanding NOTIFY requests.  Actual processing
33  *        happens if and only if we reach transaction commit.  At that time (in
34  *        routine AtCommit_Notify) we scan pg_listener for matching relnames.
35  *        If the listenerPID in a matching tuple is ours, we just send a notify
36  *        message to our own front end.  If it is not ours, and "notification"
37  *        is not already nonzero, we set notification to our own PID and send a
38  *        SIGUSR2 signal to the receiving process (indicated by listenerPID).
39  *        BTW: if the signal operation fails, we presume that the listener backend
40  *        crashed without removing this tuple, and remove the tuple for it.
41  *
42  * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
43  *        notify processing immediately if this backend is idle (ie, it is
44  *        waiting for a frontend command and is not within a transaction block).
45  *        Otherwise the handler may only set a flag, which will cause the
46  *        processing to occur just before we next go idle.
47  *
48  * 5. Inbound-notify processing consists of scanning pg_listener for tuples
49  *        matching our own listenerPID and having nonzero notification fields.
50  *        For each such tuple, we send a message to our frontend and clear the
51  *        notification field.  BTW: this routine has to start/commit its own
52  *        transaction, since by assumption it is only called from outside any
53  *        transaction.
54  *
55  * Although we grab AccessExclusiveLock on pg_listener for any operation,
56  * the lock is never held very long, so it shouldn't cause too much of
57  * a performance problem.
58  *
59  * An application that listens on the same relname it notifies will get
60  * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
61  * by comparing be_pid in the NOTIFY message to the application's own backend's
62  * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
63  * frontend during startup.)  The above design guarantees that notifies from
64  * other backends will never be missed by ignoring self-notifies.  Note,
65  * however, that we do *not* guarantee that a separate frontend message will
66  * be sent for every outside NOTIFY.  Since there is only room for one
67  * originating PID in pg_listener, outside notifies occurring at about the
68  * same time may be collapsed into a single message bearing the PID of the
69  * first outside backend to perform the NOTIFY.
70  *-------------------------------------------------------------------------
71  */
72
73 #include "postgres.h"
74
75 #include <unistd.h>
76 #include <signal.h>
77 #include <errno.h>
78 #include <netinet/in.h>
79
80 #include "access/heapam.h"
81 #include "catalog/catname.h"
82 #include "catalog/pg_listener.h"
83 #include "commands/async.h"
84 #include "libpq/libpq.h"
85 #include "libpq/pqformat.h"
86 #include "miscadmin.h"
87 #include "storage/ipc.h"
88 #include "tcop/tcopprot.h"
89 #include "utils/fmgroids.h"
90 #include "utils/ps_status.h"
91 #include "utils/syscache.h"
92
93
94 /* stuff that we really ought not be touching directly :-( */
95 extern TransactionState CurrentTransactionState;
96
97
98 /*
99  * State for outbound notifies consists of a list of all relnames NOTIFYed
100  * in the current transaction.  We do not actually perform a NOTIFY until
101  * and unless the transaction commits.  pendingNotifies is NIL if no
102  * NOTIFYs have been done in the current transaction.  The List nodes and
103  * referenced strings are all palloc'd in TopTransactionContext.
104  */
105 static List *pendingNotifies = NIL;
106
107 /*
108  * State for inbound notifies consists of two flags: one saying whether
109  * the signal handler is currently allowed to call ProcessIncomingNotify
110  * directly, and one saying whether the signal has occurred but the handler
111  * was not allowed to call ProcessIncomingNotify at the time.
112  *
113  * NB: the "volatile" on these declarations is critical!  If your compiler
114  * does not grok "volatile", you'd be best advised to compile this file
115  * with all optimization turned off.
116  */
117 static volatile int notifyInterruptEnabled = 0;
118 static volatile int notifyInterruptOccurred = 0;
119
120 /* True if we've registered an on_shmem_exit cleanup */
121 static bool unlistenExitRegistered = false;
122
123 bool            Trace_notify = false;
124
125
126 static void Async_UnlistenAll(void);
127 static void Async_UnlistenOnExit(void);
128 static void ProcessIncomingNotify(void);
129 static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
130 static bool AsyncExistsPendingNotify(const char *relname);
131 static void ClearPendingNotifies(void);
132
133
134 /*
135  *--------------------------------------------------------------
136  * Async_Notify
137  *
138  *              This is executed by the SQL notify command.
139  *
140  *              Adds the relation to the list of pending notifies.
141  *              Actual notification happens during transaction commit.
142  *              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
143  *
144  * Results:
145  *              XXX
146  *
147  *--------------------------------------------------------------
148  */
149 void
150 Async_Notify(char *relname)
151 {
152         if (Trace_notify)
153                 elog(DEBUG1, "Async_Notify(%s)", relname);
154
155         /* no point in making duplicate entries in the list ... */
156         if (!AsyncExistsPendingNotify(relname))
157         {
158                 /*
159                  * The name list needs to live until end of transaction, so store
160                  * it in the top transaction context.
161                  */
162                 MemoryContext oldcontext;
163
164                 oldcontext = MemoryContextSwitchTo(TopTransactionContext);
165
166                 pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
167
168                 MemoryContextSwitchTo(oldcontext);
169         }
170 }
171
172 /*
173  *--------------------------------------------------------------
174  * Async_Listen
175  *
176  *              This is executed by the SQL listen command.
177  *
178  *              Register a backend (identified by its Unix PID) as listening
179  *              on the specified relation.
180  *
181  * Results:
182  *              XXX
183  *
184  * Side effects:
185  *              pg_listener is updated.
186  *
187  *--------------------------------------------------------------
188  */
189 void
190 Async_Listen(char *relname, int pid)
191 {
192         Relation        lRel;
193         HeapScanDesc scan;
194         HeapTuple       tuple;
195         Datum           values[Natts_pg_listener];
196         char            nulls[Natts_pg_listener];
197         int                     i;
198         bool            alreadyListener = false;
199
200         if (Trace_notify)
201                 elog(DEBUG1, "Async_Listen(%s,%d)", relname, pid);
202
203         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
204
205         /* Detect whether we are already listening on this relname */
206         scan = heap_beginscan(lRel, SnapshotNow, 0, (ScanKey) NULL);
207         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
208         {
209                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
210
211                 if (listener->listenerpid == pid &&
212                   strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
213                 {
214                         alreadyListener = true;
215                         /* No need to scan the rest of the table */
216                         break;
217                 }
218         }
219         heap_endscan(scan);
220
221         if (alreadyListener)
222         {
223                 heap_close(lRel, AccessExclusiveLock);
224                 return;
225         }
226
227         /*
228          * OK to insert a new tuple
229          */
230
231         for (i = 0; i < Natts_pg_listener; i++)
232         {
233                 nulls[i] = ' ';
234                 values[i] = PointerGetDatum(NULL);
235         }
236
237         i = 0;
238         values[i++] = (Datum) relname;
239         values[i++] = (Datum) pid;
240         values[i++] = (Datum) 0;        /* no notifies pending */
241
242         tuple = heap_formtuple(RelationGetDescr(lRel), values, nulls);
243         simple_heap_insert(lRel, tuple);
244
245 #ifdef NOT_USED                                 /* currently there are no indexes */
246         CatalogUpdateIndexes(lRel, tuple);
247 #endif
248
249         heap_freetuple(tuple);
250
251         heap_close(lRel, AccessExclusiveLock);
252
253         /*
254          * now that we are listening, make sure we will unlisten before dying.
255          */
256         if (!unlistenExitRegistered)
257         {
258                 on_shmem_exit(Async_UnlistenOnExit, 0);
259                 unlistenExitRegistered = true;
260         }
261 }
262
263 /*
264  *--------------------------------------------------------------
265  * Async_Unlisten
266  *
267  *              This is executed by the SQL unlisten command.
268  *
269  *              Remove the backend from the list of listening backends
270  *              for the specified relation.
271  *
272  * Results:
273  *              XXX
274  *
275  * Side effects:
276  *              pg_listener is updated.
277  *
278  *--------------------------------------------------------------
279  */
280 void
281 Async_Unlisten(char *relname, int pid)
282 {
283         Relation        lRel;
284         HeapScanDesc scan;
285         HeapTuple       tuple;
286
287         /* Handle specially the `unlisten "*"' command */
288         if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0))
289         {
290                 Async_UnlistenAll();
291                 return;
292         }
293
294         if (Trace_notify)
295                 elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, pid);
296
297         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
298
299         scan = heap_beginscan(lRel, SnapshotNow, 0, (ScanKey) NULL);
300         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
301         {
302                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
303
304                 if (listener->listenerpid == pid &&
305                   strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
306                 {
307                         /* Found the matching tuple, delete it */
308                         simple_heap_delete(lRel, &tuple->t_self);
309
310                         /*
311                          * We assume there can be only one match, so no need to scan
312                          * the rest of the table
313                          */
314                         break;
315                 }
316         }
317         heap_endscan(scan);
318
319         heap_close(lRel, AccessExclusiveLock);
320
321         /*
322          * We do not complain about unlistening something not being listened;
323          * should we?
324          */
325 }
326
327 /*
328  *--------------------------------------------------------------
329  * Async_UnlistenAll
330  *
331  *              Unlisten all relations for this backend.
332  *
333  *              This is invoked by UNLISTEN "*" command, and also at backend exit.
334  *
335  * Results:
336  *              XXX
337  *
338  * Side effects:
339  *              pg_listener is updated.
340  *
341  *--------------------------------------------------------------
342  */
343 static void
344 Async_UnlistenAll(void)
345 {
346         Relation        lRel;
347         TupleDesc       tdesc;
348         HeapScanDesc scan;
349         HeapTuple       lTuple;
350         ScanKeyData key[1];
351
352         if (Trace_notify)
353                 elog(DEBUG1, "Async_UnlistenAll");
354
355         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
356         tdesc = RelationGetDescr(lRel);
357
358         /* Find and delete all entries with my listenerPID */
359         ScanKeyEntryInitialize(&key[0], 0,
360                                                    Anum_pg_listener_pid,
361                                                    F_INT4EQ,
362                                                    Int32GetDatum(MyProcPid));
363         scan = heap_beginscan(lRel, SnapshotNow, 1, key);
364
365         while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
366                 simple_heap_delete(lRel, &lTuple->t_self);
367
368         heap_endscan(scan);
369         heap_close(lRel, AccessExclusiveLock);
370 }
371
372 /*
373  *--------------------------------------------------------------
374  * Async_UnlistenOnExit
375  *
376  *              Clean up the pg_listener table at backend exit.
377  *
378  *              This is executed if we have done any LISTENs in this backend.
379  *              It might not be necessary anymore, if the user UNLISTENed everything,
380  *              but we don't try to detect that case.
381  *
382  * Results:
383  *              XXX
384  *
385  * Side effects:
386  *              pg_listener is updated if necessary.
387  *
388  *--------------------------------------------------------------
389  */
390 static void
391 Async_UnlistenOnExit(void)
392 {
393         /*
394          * We need to start/commit a transaction for the unlisten, but if
395          * there is already an active transaction we had better abort that one
396          * first.  Otherwise we'd end up committing changes that probably
397          * ought to be discarded.
398          */
399         AbortOutOfAnyTransaction();
400         /* Now we can do the unlisten */
401         StartTransactionCommand();
402         Async_UnlistenAll();
403         CommitTransactionCommand();
404 }
405
406 /*
407  *--------------------------------------------------------------
408  * AtCommit_Notify
409  *
410  *              This is called at transaction commit.
411  *
412  *              If there are outbound notify requests in the pendingNotifies list,
413  *              scan pg_listener for matching tuples, and either signal the other
414  *              backend or send a message to our own frontend.
415  *
416  *              NOTE: we are still inside the current transaction, therefore can
417  *              piggyback on its committing of changes.
418  *
419  * Results:
420  *              XXX
421  *
422  * Side effects:
423  *              Tuples in pg_listener that have matching relnames and other peoples'
424  *              listenerPIDs are updated with a nonzero notification field.
425  *
426  *--------------------------------------------------------------
427  */
428 void
429 AtCommit_Notify(void)
430 {
431         Relation        lRel;
432         TupleDesc       tdesc;
433         HeapScanDesc scan;
434         HeapTuple       lTuple,
435                                 rTuple;
436         Datum           value[Natts_pg_listener];
437         char            repl[Natts_pg_listener],
438                                 nulls[Natts_pg_listener];
439
440         if (pendingNotifies == NIL)
441                 return;                                 /* no NOTIFY statements in this
442                                                                  * transaction */
443
444         /*
445          * NOTIFY is disabled if not normal processing mode. This test used to
446          * be in xact.c, but it seems cleaner to do it here.
447          */
448         if (!IsNormalProcessingMode())
449         {
450                 ClearPendingNotifies();
451                 return;
452         }
453
454         if (Trace_notify)
455                 elog(DEBUG1, "AtCommit_Notify");
456
457         /* preset data to update notify column to MyProcPid */
458         nulls[0] = nulls[1] = nulls[2] = ' ';
459         repl[0] = repl[1] = repl[2] = ' ';
460         repl[Anum_pg_listener_notify - 1] = 'r';
461         value[0] = value[1] = value[2] = (Datum) 0;
462         value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
463
464         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
465         tdesc = RelationGetDescr(lRel);
466         scan = heap_beginscan(lRel, SnapshotNow, 0, (ScanKey) NULL);
467
468         while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
469         {
470                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
471                 char       *relname = NameStr(listener->relname);
472                 int32           listenerPID = listener->listenerpid;
473
474                 if (!AsyncExistsPendingNotify(relname))
475                         continue;
476
477                 if (listenerPID == MyProcPid)
478                 {
479                         /*
480                          * Self-notify: no need to bother with table update. Indeed,
481                          * we *must not* clear the notification field in this path, or
482                          * we could lose an outside notify, which'd be bad for
483                          * applications that ignore self-notify messages.
484                          */
485
486                         if (Trace_notify)
487                                 elog(DEBUG1, "AtCommit_Notify: notifying self");
488
489                         NotifyMyFrontEnd(relname, listenerPID);
490                 }
491                 else
492                 {
493                         if (Trace_notify)
494                                 elog(DEBUG1, "AtCommit_Notify: notifying pid %d",
495                                          listenerPID);
496
497                         /*
498                          * If someone has already notified this listener, we don't
499                          * bother modifying the table, but we do still send a SIGUSR2
500                          * signal, just in case that backend missed the earlier signal
501                          * for some reason.  It's OK to send the signal first, because
502                          * the other guy can't read pg_listener until we unlock it.
503                          */
504                         if (kill(listenerPID, SIGUSR2) < 0)
505                         {
506                                 /*
507                                  * Get rid of pg_listener entry if it refers to a PID that
508                                  * no longer exists.  Presumably, that backend crashed
509                                  * without deleting its pg_listener entries. This code
510                                  * used to only delete the entry if errno==ESRCH, but as
511                                  * far as I can see we should just do it for any failure
512                                  * (certainly at least for EPERM too...)
513                                  */
514                                 simple_heap_delete(lRel, &lTuple->t_self);
515                         }
516                         else if (listener->notification == 0)
517                         {
518                                 ItemPointerData ctid;
519                                 int                     result;
520
521                                 rTuple = heap_modifytuple(lTuple, lRel,
522                                                                                   value, nulls, repl);
523                                 /*
524                                  * We cannot use simple_heap_update here because the tuple
525                                  * could have been modified by an uncommitted transaction;
526                                  * specifically, since UNLISTEN releases exclusive lock on
527                                  * the table before commit, the other guy could already have
528                                  * tried to unlisten.  There are no other cases where we
529                                  * should be able to see an uncommitted update or delete.
530                                  * Therefore, our response to a HeapTupleBeingUpdated result
531                                  * is just to ignore it.  We do *not* wait for the other
532                                  * guy to commit --- that would risk deadlock, and we don't
533                                  * want to block while holding the table lock anyway for
534                                  * performance reasons.  We also ignore HeapTupleUpdated,
535                                  * which could occur if the other guy commits between our
536                                  * heap_getnext and heap_update calls.
537                                  */
538                                 result = heap_update(lRel, &lTuple->t_self, rTuple,
539                                                                          &ctid,
540                                                                          GetCurrentCommandId(),
541                                                                          false /* no wait for commit */);
542                                 switch (result)
543                                 {
544                                         case HeapTupleSelfUpdated:
545                                                 /* Tuple was already updated in current command? */
546                                                 elog(ERROR, "tuple already updated by self");
547                                                 break;
548
549                                         case HeapTupleMayBeUpdated:
550                                                 /* done successfully */
551
552 #ifdef NOT_USED                                 /* currently there are no indexes */
553                                                 CatalogUpdateIndexes(lRel, rTuple);
554 #endif
555                                                 break;
556
557                                         case HeapTupleBeingUpdated:
558                                                 /* ignore uncommitted tuples */
559                                                 break;
560
561                                         case HeapTupleUpdated:
562                                                 /* ignore just-committed tuples */
563                                                 break;
564
565                                         default:
566                                                 elog(ERROR, "unrecognized heap_update status: %u",
567                                                          result);
568                                                 break;
569                                 }
570                         }
571                 }
572         }
573
574         heap_endscan(scan);
575
576         /*
577          * We do NOT release the lock on pg_listener here; we need to hold it
578          * until end of transaction (which is about to happen, anyway) to
579          * ensure that notified backends see our tuple updates when they look.
580          * Else they might disregard the signal, which would make the
581          * application programmer very unhappy.
582          */
583         heap_close(lRel, NoLock);
584
585         ClearPendingNotifies();
586
587         if (Trace_notify)
588                 elog(DEBUG1, "AtCommit_Notify: done");
589 }
590
591 /*
592  *--------------------------------------------------------------
593  * AtAbort_Notify
594  *
595  *              This is called at transaction abort.
596  *
597  *              Gets rid of pending outbound notifies that we would have executed
598  *              if the transaction got committed.
599  *
600  * Results:
601  *              XXX
602  *
603  *--------------------------------------------------------------
604  */
605 void
606 AtAbort_Notify(void)
607 {
608         ClearPendingNotifies();
609 }
610
611 /*
612  *--------------------------------------------------------------
613  * Async_NotifyHandler
614  *
615  *              This is the signal handler for SIGUSR2.
616  *
617  *              If we are idle (notifyInterruptEnabled is set), we can safely invoke
618  *              ProcessIncomingNotify directly.  Otherwise, just set a flag
619  *              to do it later.
620  *
621  * Results:
622  *              none
623  *
624  * Side effects:
625  *              per above
626  *--------------------------------------------------------------
627  */
628 void
629 Async_NotifyHandler(SIGNAL_ARGS)
630 {
631         int                     save_errno = errno;
632
633         /*
634          * Note: this is a SIGNAL HANDLER.      You must be very wary what you do
635          * here. Some helpful soul had this routine sprinkled with TPRINTFs,
636          * which would likely lead to corruption of stdio buffers if they were
637          * ever turned on.
638          */
639
640         /* Don't joggle the elbow of proc_exit */
641         if (proc_exit_inprogress)
642                 return;
643
644         if (notifyInterruptEnabled)
645         {
646                 bool            save_ImmediateInterruptOK = ImmediateInterruptOK;
647
648                 /*
649                  * We may be called while ImmediateInterruptOK is true; turn it
650                  * off while messing with the NOTIFY state.  (We would have to
651                  * save and restore it anyway, because PGSemaphore operations
652                  * inside ProcessIncomingNotify() might reset it.)
653                  */
654                 ImmediateInterruptOK = false;
655
656                 /*
657                  * I'm not sure whether some flavors of Unix might allow another
658                  * SIGUSR2 occurrence to recursively interrupt this routine. To
659                  * cope with the possibility, we do the same sort of dance that
660                  * EnableNotifyInterrupt must do --- see that routine for
661                  * comments.
662                  */
663                 notifyInterruptEnabled = 0;             /* disable any recursive signal */
664                 notifyInterruptOccurred = 1;    /* do at least one iteration */
665                 for (;;)
666                 {
667                         notifyInterruptEnabled = 1;
668                         if (!notifyInterruptOccurred)
669                                 break;
670                         notifyInterruptEnabled = 0;
671                         if (notifyInterruptOccurred)
672                         {
673                                 /* Here, it is finally safe to do stuff. */
674                                 if (Trace_notify)
675                                         elog(DEBUG1, "Async_NotifyHandler: perform async notify");
676
677                                 ProcessIncomingNotify();
678
679                                 if (Trace_notify)
680                                         elog(DEBUG1, "Async_NotifyHandler: done");
681                         }
682                 }
683
684                 /*
685                  * Restore ImmediateInterruptOK, and check for interrupts if
686                  * needed.
687                  */
688                 ImmediateInterruptOK = save_ImmediateInterruptOK;
689                 if (save_ImmediateInterruptOK)
690                         CHECK_FOR_INTERRUPTS();
691         }
692         else
693         {
694                 /*
695                  * In this path it is NOT SAFE to do much of anything, except
696                  * this:
697                  */
698                 notifyInterruptOccurred = 1;
699         }
700
701         errno = save_errno;
702 }
703
704 /*
705  * --------------------------------------------------------------
706  * EnableNotifyInterrupt
707  *
708  *              This is called by the PostgresMain main loop just before waiting
709  *              for a frontend command.  If we are truly idle (ie, *not* inside
710  *              a transaction block), then process any pending inbound notifies,
711  *              and enable the signal handler to process future notifies directly.
712  *
713  *              NOTE: the signal handler starts out disabled, and stays so until
714  *              PostgresMain calls this the first time.
715  * --------------------------------------------------------------
716  */
717 void
718 EnableNotifyInterrupt(void)
719 {
720         if (CurrentTransactionState->blockState != TRANS_DEFAULT)
721                 return;                                 /* not really idle */
722
723         /*
724          * This code is tricky because we are communicating with a signal
725          * handler that could interrupt us at any point.  If we just checked
726          * notifyInterruptOccurred and then set notifyInterruptEnabled, we
727          * could fail to respond promptly to a signal that happens in between
728          * those two steps.  (A very small time window, perhaps, but Murphy's
729          * Law says you can hit it...)  Instead, we first set the enable flag,
730          * then test the occurred flag.  If we see an unserviced interrupt has
731          * occurred, we re-clear the enable flag before going off to do the
732          * service work.  (That prevents re-entrant invocation of
733          * ProcessIncomingNotify() if another interrupt occurs.) If an
734          * interrupt comes in between the setting and clearing of
735          * notifyInterruptEnabled, then it will have done the service work and
736          * left notifyInterruptOccurred zero, so we have to check again after
737          * clearing enable.  The whole thing has to be in a loop in case
738          * another interrupt occurs while we're servicing the first. Once we
739          * get out of the loop, enable is set and we know there is no
740          * unserviced interrupt.
741          *
742          * NB: an overenthusiastic optimizing compiler could easily break this
743          * code.  Hopefully, they all understand what "volatile" means these
744          * days.
745          */
746         for (;;)
747         {
748                 notifyInterruptEnabled = 1;
749                 if (!notifyInterruptOccurred)
750                         break;
751                 notifyInterruptEnabled = 0;
752                 if (notifyInterruptOccurred)
753                 {
754                         if (Trace_notify)
755                                 elog(DEBUG1, "EnableNotifyInterrupt: perform async notify");
756
757                         ProcessIncomingNotify();
758
759                         if (Trace_notify)
760                                 elog(DEBUG1, "EnableNotifyInterrupt: done");
761                 }
762         }
763 }
764
765 /*
766  * --------------------------------------------------------------
767  * DisableNotifyInterrupt
768  *
769  *              This is called by the PostgresMain main loop just after receiving
770  *              a frontend command.  Signal handler execution of inbound notifies
771  *              is disabled until the next EnableNotifyInterrupt call.
772  * --------------------------------------------------------------
773  */
774 void
775 DisableNotifyInterrupt(void)
776 {
777         notifyInterruptEnabled = 0;
778 }
779
780 /*
781  * --------------------------------------------------------------
782  * ProcessIncomingNotify
783  *
784  *              Deal with arriving NOTIFYs from other backends.
785  *              This is called either directly from the SIGUSR2 signal handler,
786  *              or the next time control reaches the outer idle loop.
787  *              Scan pg_listener for arriving notifies, report them to my front end,
788  *              and clear the notification field in pg_listener until next time.
789  *
790  *              NOTE: since we are outside any transaction, we must create our own.
791  *
792  * Results:
793  *              XXX
794  *
795  * --------------------------------------------------------------
796  */
797 static void
798 ProcessIncomingNotify(void)
799 {
800         Relation        lRel;
801         TupleDesc       tdesc;
802         ScanKeyData key[1];
803         HeapScanDesc scan;
804         HeapTuple       lTuple,
805                                 rTuple;
806         Datum           value[Natts_pg_listener];
807         char            repl[Natts_pg_listener],
808                                 nulls[Natts_pg_listener];
809
810         if (Trace_notify)
811                 elog(DEBUG1, "ProcessIncomingNotify");
812
813         set_ps_display("async_notify");
814
815         notifyInterruptOccurred = 0;
816
817         StartTransactionCommand();
818
819         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
820         tdesc = RelationGetDescr(lRel);
821
822         /* Scan only entries with my listenerPID */
823         ScanKeyEntryInitialize(&key[0], 0,
824                                                    Anum_pg_listener_pid,
825                                                    F_INT4EQ,
826                                                    Int32GetDatum(MyProcPid));
827         scan = heap_beginscan(lRel, SnapshotNow, 1, key);
828
829         /* Prepare data for rewriting 0 into notification field */
830         nulls[0] = nulls[1] = nulls[2] = ' ';
831         repl[0] = repl[1] = repl[2] = ' ';
832         repl[Anum_pg_listener_notify - 1] = 'r';
833         value[0] = value[1] = value[2] = (Datum) 0;
834         value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
835
836         while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
837         {
838                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
839                 char       *relname = NameStr(listener->relname);
840                 int32           sourcePID = listener->notification;
841
842                 if (sourcePID != 0)
843                 {
844                         /* Notify the frontend */
845
846                         if (Trace_notify)
847                                 elog(DEBUG1, "ProcessIncomingNotify: received %s from %d",
848                                          relname, (int) sourcePID);
849
850                         NotifyMyFrontEnd(relname, sourcePID);
851                         /*
852                          * Rewrite the tuple with 0 in notification column.
853                          *
854                          * simple_heap_update is safe here because no one else would
855                          * have tried to UNLISTEN us, so there can be no uncommitted
856                          * changes.
857                          */
858                         rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
859                         simple_heap_update(lRel, &lTuple->t_self, rTuple);
860
861 #ifdef NOT_USED                                 /* currently there are no indexes */
862                         CatalogUpdateIndexes(lRel, rTuple);
863 #endif
864                 }
865         }
866         heap_endscan(scan);
867
868         /*
869          * We do NOT release the lock on pg_listener here; we need to hold it
870          * until end of transaction (which is about to happen, anyway) to
871          * ensure that other backends see our tuple updates when they look.
872          * Otherwise, a transaction started after this one might mistakenly
873          * think it doesn't need to send this backend a new NOTIFY.
874          */
875         heap_close(lRel, NoLock);
876
877         CommitTransactionCommand();
878
879         /*
880          * Must flush the notify messages to ensure frontend gets them
881          * promptly.
882          */
883         pq_flush();
884
885         set_ps_display("idle");
886
887         if (Trace_notify)
888                 elog(DEBUG1, "ProcessIncomingNotify: done");
889 }
890
891 /*
892  * Send NOTIFY message to my front end.
893  */
894 static void
895 NotifyMyFrontEnd(char *relname, int32 listenerPID)
896 {
897         if (whereToSendOutput == Remote)
898         {
899                 StringInfoData buf;
900
901                 pq_beginmessage(&buf, 'A');
902                 pq_sendint(&buf, listenerPID, sizeof(int32));
903                 pq_sendstring(&buf, relname);
904                 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
905                 {
906                         /* XXX Add parameter string here later */
907                         pq_sendstring(&buf, "");
908                 }
909                 pq_endmessage(&buf);
910
911                 /*
912                  * NOTE: we do not do pq_flush() here.  For a self-notify, it will
913                  * happen at the end of the transaction, and for incoming notifies
914                  * ProcessIncomingNotify will do it after finding all the
915                  * notifies.
916                  */
917         }
918         else
919                 elog(INFO, "NOTIFY for %s", relname);
920 }
921
922 /* Does pendingNotifies include the given relname? */
923 static bool
924 AsyncExistsPendingNotify(const char *relname)
925 {
926         List       *p;
927
928         foreach(p, pendingNotifies)
929         {
930                 /* Use NAMEDATALEN for relname comparison.        DZ - 26-08-1996 */
931                 if (strncmp((const char *) lfirst(p), relname, NAMEDATALEN) == 0)
932                         return true;
933         }
934
935         return false;
936 }
937
938 /* Clear the pendingNotifies list. */
939 static void
940 ClearPendingNotifies(void)
941 {
942         /*
943          * We used to have to explicitly deallocate the list members and
944          * nodes, because they were malloc'd.  Now, since we know they are
945          * palloc'd in TopTransactionContext, we need not do that --- they'll
946          * go away automatically at transaction exit.  We need only reset the
947          * list head pointer.
948          */
949         pendingNotifies = NIL;
950 }