OSDN Git Service

1945c751e18aa0f25c5f8d10ea3701be9d68c166
[pg-rex/syncrep.git] / src / backend / commands / async.c
1 /*-------------------------------------------------------------------------
2  *
3  * async.c
4  *        Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
5  *
6  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *        $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.90 2002/09/02 02:47:01 momjian Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14
15 /*-------------------------------------------------------------------------
16  * New Async Notification Model:
17  * 1. Multiple backends on same machine.  Multiple backends listening on
18  *        one relation.  (Note: "listening on a relation" is not really the
19  *        right way to think about it, since the notify names need not have
20  *        anything to do with the names of relations actually in the database.
21  *        But this terminology is all over the code and docs, and I don't feel
22  *        like trying to replace it.)
23  *
24  * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
25  *        ie, each relname/listenerPID pair.  The "notification" field of the
26  *        tuple is zero when no NOTIFY is pending for that listener, or the PID
27  *        of the originating backend when a cross-backend NOTIFY is pending.
28  *        (We skip writing to pg_listener when doing a self-NOTIFY, so the
29  *        notification field should never be equal to the listenerPID field.)
30  *
31  * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
32  *        relname to a list of outstanding NOTIFY requests.  Actual processing
33  *        happens if and only if we reach transaction commit.  At that time (in
34  *        routine AtCommit_Notify) we scan pg_listener for matching relnames.
35  *        If the listenerPID in a matching tuple is ours, we just send a notify
36  *        message to our own front end.  If it is not ours, and "notification"
37  *        is not already nonzero, we set notification to our own PID and send a
38  *        SIGUSR2 signal to the receiving process (indicated by listenerPID).
39  *        BTW: if the signal operation fails, we presume that the listener backend
40  *        crashed without removing this tuple, and remove the tuple for it.
41  *
42  * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
43  *        notify processing immediately if this backend is idle (ie, it is
44  *        waiting for a frontend command and is not within a transaction block).
45  *        Otherwise the handler may only set a flag, which will cause the
46  *        processing to occur just before we next go idle.
47  *
48  * 5. Inbound-notify processing consists of scanning pg_listener for tuples
49  *        matching our own listenerPID and having nonzero notification fields.
50  *        For each such tuple, we send a message to our frontend and clear the
51  *        notification field.  BTW: this routine has to start/commit its own
52  *        transaction, since by assumption it is only called from outside any
53  *        transaction.
54  *
55  * Although we grab AccessExclusiveLock on pg_listener for any operation,
56  * the lock is never held very long, so it shouldn't cause too much of
57  * a performance problem.
58  *
59  * An application that listens on the same relname it notifies will get
60  * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
61  * by comparing be_pid in the NOTIFY message to the application's own backend's
62  * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
63  * frontend during startup.)  The above design guarantees that notifies from
64  * other backends will never be missed by ignoring self-notifies.  Note,
65  * however, that we do *not* guarantee that a separate frontend message will
66  * be sent for every outside NOTIFY.  Since there is only room for one
67  * originating PID in pg_listener, outside notifies occurring at about the
68  * same time may be collapsed into a single message bearing the PID of the
69  * first outside backend to perform the NOTIFY.
70  *-------------------------------------------------------------------------
71  */
72
73 #include "postgres.h"
74
75 #include <unistd.h>
76 #include <signal.h>
77 #include <errno.h>
78 #include <netinet/in.h>
79
80 #include "access/heapam.h"
81 #include "catalog/catname.h"
82 #include "catalog/pg_listener.h"
83 #include "commands/async.h"
84 #include "libpq/libpq.h"
85 #include "libpq/pqformat.h"
86 #include "miscadmin.h"
87 #include "storage/ipc.h"
88 #include "tcop/tcopprot.h"
89 #include "utils/fmgroids.h"
90 #include "utils/ps_status.h"
91 #include "utils/syscache.h"
92
93
94 /* stuff that we really ought not be touching directly :-( */
95 extern TransactionState CurrentTransactionState;
96
97
98 /*
99  * State for outbound notifies consists of a list of all relnames NOTIFYed
100  * in the current transaction.  We do not actually perform a NOTIFY until
101  * and unless the transaction commits.  pendingNotifies is NIL if no
102  * NOTIFYs have been done in the current transaction.  The List nodes and
103  * referenced strings are all palloc'd in TopTransactionContext.
104  */
105 static List *pendingNotifies = NIL;
106
107 /*
108  * State for inbound notifies consists of two flags: one saying whether
109  * the signal handler is currently allowed to call ProcessIncomingNotify
110  * directly, and one saying whether the signal has occurred but the handler
111  * was not allowed to call ProcessIncomingNotify at the time.
112  *
113  * NB: the "volatile" on these declarations is critical!  If your compiler
114  * does not grok "volatile", you'd be best advised to compile this file
115  * with all optimization turned off.
116  */
117 static volatile int notifyInterruptEnabled = 0;
118 static volatile int notifyInterruptOccurred = 0;
119
120 /* True if we've registered an on_shmem_exit cleanup */
121 static bool unlistenExitRegistered = false;
122
123 bool            Trace_notify = false;
124
125
126 static void Async_UnlistenAll(void);
127 static void Async_UnlistenOnExit(void);
128 static void ProcessIncomingNotify(void);
129 static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
130 static bool AsyncExistsPendingNotify(const char *relname);
131 static void ClearPendingNotifies(void);
132
133
134 /*
135  *--------------------------------------------------------------
136  * Async_Notify
137  *
138  *              This is executed by the SQL notify command.
139  *
140  *              Adds the relation to the list of pending notifies.
141  *              Actual notification happens during transaction commit.
142  *              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
143  *
144  * Results:
145  *              XXX
146  *
147  *--------------------------------------------------------------
148  */
149 void
150 Async_Notify(char *relname)
151 {
152         if (Trace_notify)
153                 elog(LOG, "Async_Notify: %s", relname);
154
155         /* no point in making duplicate entries in the list ... */
156         if (!AsyncExistsPendingNotify(relname))
157         {
158                 /*
159                  * The name list needs to live until end of transaction, so store
160                  * it in the top transaction context.
161                  */
162                 MemoryContext oldcontext;
163
164                 oldcontext = MemoryContextSwitchTo(TopTransactionContext);
165
166                 pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
167
168                 MemoryContextSwitchTo(oldcontext);
169         }
170 }
171
172 /*
173  *--------------------------------------------------------------
174  * Async_Listen
175  *
176  *              This is executed by the SQL listen command.
177  *
178  *              Register a backend (identified by its Unix PID) as listening
179  *              on the specified relation.
180  *
181  * Results:
182  *              XXX
183  *
184  * Side effects:
185  *              pg_listener is updated.
186  *
187  *--------------------------------------------------------------
188  */
189 void
190 Async_Listen(char *relname, int pid)
191 {
192         Relation        lRel;
193         HeapScanDesc scan;
194         HeapTuple       tuple;
195         Datum           values[Natts_pg_listener];
196         char            nulls[Natts_pg_listener];
197         int                     i;
198         bool            alreadyListener = false;
199
200         if (Trace_notify)
201                 elog(LOG, "Async_Listen: %s", relname);
202
203         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
204
205         /* Detect whether we are already listening on this relname */
206         scan = heap_beginscan(lRel, SnapshotNow, 0, (ScanKey) NULL);
207         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
208         {
209                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
210
211                 if (listener->listenerpid == pid &&
212                   strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
213                 {
214                         alreadyListener = true;
215                         /* No need to scan the rest of the table */
216                         break;
217                 }
218         }
219         heap_endscan(scan);
220
221         if (alreadyListener)
222         {
223                 heap_close(lRel, AccessExclusiveLock);
224                 elog(WARNING, "Async_Listen: We are already listening on %s", relname);
225                 return;
226         }
227
228         /*
229          * OK to insert a new tuple
230          */
231
232         for (i = 0; i < Natts_pg_listener; i++)
233         {
234                 nulls[i] = ' ';
235                 values[i] = PointerGetDatum(NULL);
236         }
237
238         i = 0;
239         values[i++] = (Datum) relname;
240         values[i++] = (Datum) pid;
241         values[i++] = (Datum) 0;        /* no notifies pending */
242
243         tuple = heap_formtuple(RelationGetDescr(lRel), values, nulls);
244         simple_heap_insert(lRel, tuple);
245
246 #ifdef NOT_USED                                 /* currently there are no indexes */
247         CatalogUpdateIndexes(lRel, tuple);
248 #endif
249
250         heap_freetuple(tuple);
251
252         heap_close(lRel, AccessExclusiveLock);
253
254         /*
255          * now that we are listening, make sure we will unlisten before dying.
256          */
257         if (!unlistenExitRegistered)
258         {
259                 on_shmem_exit(Async_UnlistenOnExit, 0);
260                 unlistenExitRegistered = true;
261         }
262 }
263
264 /*
265  *--------------------------------------------------------------
266  * Async_Unlisten
267  *
268  *              This is executed by the SQL unlisten command.
269  *
270  *              Remove the backend from the list of listening backends
271  *              for the specified relation.
272  *
273  * Results:
274  *              XXX
275  *
276  * Side effects:
277  *              pg_listener is updated.
278  *
279  *--------------------------------------------------------------
280  */
281 void
282 Async_Unlisten(char *relname, int pid)
283 {
284         Relation        lRel;
285         HeapScanDesc scan;
286         HeapTuple       tuple;
287
288         /* Handle specially the `unlisten "*"' command */
289         if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0))
290         {
291                 Async_UnlistenAll();
292                 return;
293         }
294
295         if (Trace_notify)
296                 elog(LOG, "Async_Unlisten %s", relname);
297
298         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
299
300         scan = heap_beginscan(lRel, SnapshotNow, 0, (ScanKey) NULL);
301         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
302         {
303                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
304
305                 if (listener->listenerpid == pid &&
306                   strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
307                 {
308                         /* Found the matching tuple, delete it */
309                         simple_heap_delete(lRel, &tuple->t_self);
310
311                         /*
312                          * We assume there can be only one match, so no need to scan
313                          * the rest of the table
314                          */
315                         break;
316                 }
317         }
318         heap_endscan(scan);
319
320         heap_close(lRel, AccessExclusiveLock);
321
322         /*
323          * We do not complain about unlistening something not being listened;
324          * should we?
325          */
326 }
327
328 /*
329  *--------------------------------------------------------------
330  * Async_UnlistenAll
331  *
332  *              Unlisten all relations for this backend.
333  *
334  *              This is invoked by UNLISTEN "*" command, and also at backend exit.
335  *
336  * Results:
337  *              XXX
338  *
339  * Side effects:
340  *              pg_listener is updated.
341  *
342  *--------------------------------------------------------------
343  */
344 static void
345 Async_UnlistenAll(void)
346 {
347         Relation        lRel;
348         TupleDesc       tdesc;
349         HeapScanDesc scan;
350         HeapTuple       lTuple;
351         ScanKeyData key[1];
352
353         if (Trace_notify)
354                 elog(LOG, "Async_UnlistenAll");
355
356         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
357         tdesc = RelationGetDescr(lRel);
358
359         /* Find and delete all entries with my listenerPID */
360         ScanKeyEntryInitialize(&key[0], 0,
361                                                    Anum_pg_listener_pid,
362                                                    F_INT4EQ,
363                                                    Int32GetDatum(MyProcPid));
364         scan = heap_beginscan(lRel, SnapshotNow, 1, key);
365
366         while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
367                 simple_heap_delete(lRel, &lTuple->t_self);
368
369         heap_endscan(scan);
370         heap_close(lRel, AccessExclusiveLock);
371 }
372
373 /*
374  *--------------------------------------------------------------
375  * Async_UnlistenOnExit
376  *
377  *              Clean up the pg_listener table at backend exit.
378  *
379  *              This is executed if we have done any LISTENs in this backend.
380  *              It might not be necessary anymore, if the user UNLISTENed everything,
381  *              but we don't try to detect that case.
382  *
383  * Results:
384  *              XXX
385  *
386  * Side effects:
387  *              pg_listener is updated if necessary.
388  *
389  *--------------------------------------------------------------
390  */
391 static void
392 Async_UnlistenOnExit(void)
393 {
394         /*
395          * We need to start/commit a transaction for the unlisten, but if
396          * there is already an active transaction we had better abort that one
397          * first.  Otherwise we'd end up committing changes that probably
398          * ought to be discarded.
399          */
400         AbortOutOfAnyTransaction();
401         /* Now we can do the unlisten */
402         StartTransactionCommand(true);
403         Async_UnlistenAll();
404         CommitTransactionCommand(true);
405 }
406
407 /*
408  *--------------------------------------------------------------
409  * AtCommit_Notify
410  *
411  *              This is called at transaction commit.
412  *
413  *              If there are outbound notify requests in the pendingNotifies list,
414  *              scan pg_listener for matching tuples, and either signal the other
415  *              backend or send a message to our own frontend.
416  *
417  *              NOTE: we are still inside the current transaction, therefore can
418  *              piggyback on its committing of changes.
419  *
420  * Results:
421  *              XXX
422  *
423  * Side effects:
424  *              Tuples in pg_listener that have matching relnames and other peoples'
425  *              listenerPIDs are updated with a nonzero notification field.
426  *
427  *--------------------------------------------------------------
428  */
429 void
430 AtCommit_Notify(void)
431 {
432         Relation        lRel;
433         TupleDesc       tdesc;
434         HeapScanDesc scan;
435         HeapTuple       lTuple,
436                                 rTuple;
437         Datum           value[Natts_pg_listener];
438         char            repl[Natts_pg_listener],
439                                 nulls[Natts_pg_listener];
440
441         if (pendingNotifies == NIL)
442                 return;                                 /* no NOTIFY statements in this
443                                                                  * transaction */
444
445         /*
446          * NOTIFY is disabled if not normal processing mode. This test used to
447          * be in xact.c, but it seems cleaner to do it here.
448          */
449         if (!IsNormalProcessingMode())
450         {
451                 ClearPendingNotifies();
452                 return;
453         }
454
455         if (Trace_notify)
456                 elog(LOG, "AtCommit_Notify");
457
458         /* preset data to update notify column to MyProcPid */
459         nulls[0] = nulls[1] = nulls[2] = ' ';
460         repl[0] = repl[1] = repl[2] = ' ';
461         repl[Anum_pg_listener_notify - 1] = 'r';
462         value[0] = value[1] = value[2] = (Datum) 0;
463         value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
464
465         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
466         tdesc = RelationGetDescr(lRel);
467         scan = heap_beginscan(lRel, SnapshotNow, 0, (ScanKey) NULL);
468
469         while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
470         {
471                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
472                 char       *relname = NameStr(listener->relname);
473                 int32           listenerPID = listener->listenerpid;
474
475                 if (!AsyncExistsPendingNotify(relname))
476                         continue;
477
478                 if (listenerPID == MyProcPid)
479                 {
480                         /*
481                          * Self-notify: no need to bother with table update. Indeed,
482                          * we *must not* clear the notification field in this path, or
483                          * we could lose an outside notify, which'd be bad for
484                          * applications that ignore self-notify messages.
485                          */
486
487                         if (Trace_notify)
488                                 elog(LOG, "AtCommit_Notify: notifying self");
489
490                         NotifyMyFrontEnd(relname, listenerPID);
491                 }
492                 else
493                 {
494                         if (Trace_notify)
495                                 elog(LOG, "AtCommit_Notify: notifying pid %d",
496                                          listenerPID);
497
498                         /*
499                          * If someone has already notified this listener, we don't
500                          * bother modifying the table, but we do still send a SIGUSR2
501                          * signal, just in case that backend missed the earlier signal
502                          * for some reason.  It's OK to send the signal first, because
503                          * the other guy can't read pg_listener until we unlock it.
504                          */
505                         if (kill(listenerPID, SIGUSR2) < 0)
506                         {
507                                 /*
508                                  * Get rid of pg_listener entry if it refers to a PID that
509                                  * no longer exists.  Presumably, that backend crashed
510                                  * without deleting its pg_listener entries. This code
511                                  * used to only delete the entry if errno==ESRCH, but as
512                                  * far as I can see we should just do it for any failure
513                                  * (certainly at least for EPERM too...)
514                                  */
515                                 simple_heap_delete(lRel, &lTuple->t_self);
516                         }
517                         else if (listener->notification == 0)
518                         {
519                                 rTuple = heap_modifytuple(lTuple, lRel,
520                                                                                   value, nulls, repl);
521                                 simple_heap_update(lRel, &lTuple->t_self, rTuple);
522
523 #ifdef NOT_USED                                 /* currently there are no indexes */
524                                 CatalogUpdateIndexes(lRel, rTuple);
525 #endif
526                         }
527                 }
528         }
529
530         heap_endscan(scan);
531
532         /*
533          * We do NOT release the lock on pg_listener here; we need to hold it
534          * until end of transaction (which is about to happen, anyway) to
535          * ensure that notified backends see our tuple updates when they look.
536          * Else they might disregard the signal, which would make the
537          * application programmer very unhappy.
538          */
539         heap_close(lRel, NoLock);
540
541         ClearPendingNotifies();
542
543         if (Trace_notify)
544                 elog(LOG, "AtCommit_Notify: done");
545 }
546
547 /*
548  *--------------------------------------------------------------
549  * AtAbort_Notify
550  *
551  *              This is called at transaction abort.
552  *
553  *              Gets rid of pending outbound notifies that we would have executed
554  *              if the transaction got committed.
555  *
556  * Results:
557  *              XXX
558  *
559  *--------------------------------------------------------------
560  */
561 void
562 AtAbort_Notify(void)
563 {
564         ClearPendingNotifies();
565 }
566
567 /*
568  *--------------------------------------------------------------
569  * Async_NotifyHandler
570  *
571  *              This is the signal handler for SIGUSR2.
572  *
573  *              If we are idle (notifyInterruptEnabled is set), we can safely invoke
574  *              ProcessIncomingNotify directly.  Otherwise, just set a flag
575  *              to do it later.
576  *
577  * Results:
578  *              none
579  *
580  * Side effects:
581  *              per above
582  *--------------------------------------------------------------
583  */
584 void
585 Async_NotifyHandler(SIGNAL_ARGS)
586 {
587         int                     save_errno = errno;
588
589         /*
590          * Note: this is a SIGNAL HANDLER.      You must be very wary what you do
591          * here. Some helpful soul had this routine sprinkled with TPRINTFs,
592          * which would likely lead to corruption of stdio buffers if they were
593          * ever turned on.
594          */
595
596         if (notifyInterruptEnabled)
597         {
598                 /*
599                  * I'm not sure whether some flavors of Unix might allow another
600                  * SIGUSR2 occurrence to recursively interrupt this routine. To
601                  * cope with the possibility, we do the same sort of dance that
602                  * EnableNotifyInterrupt must do --- see that routine for
603                  * comments.
604                  */
605                 notifyInterruptEnabled = 0;             /* disable any recursive signal */
606                 notifyInterruptOccurred = 1;    /* do at least one iteration */
607                 for (;;)
608                 {
609                         notifyInterruptEnabled = 1;
610                         if (!notifyInterruptOccurred)
611                                 break;
612                         notifyInterruptEnabled = 0;
613                         if (notifyInterruptOccurred)
614                         {
615                                 /* Here, it is finally safe to do stuff. */
616                                 if (Trace_notify)
617                                         elog(LOG, "Async_NotifyHandler: perform async notify");
618
619                                 ProcessIncomingNotify();
620
621                                 if (Trace_notify)
622                                         elog(LOG, "Async_NotifyHandler: done");
623                         }
624                 }
625         }
626         else
627         {
628                 /*
629                  * In this path it is NOT SAFE to do much of anything, except
630                  * this:
631                  */
632                 notifyInterruptOccurred = 1;
633         }
634
635         errno = save_errno;
636 }
637
638 /*
639  * --------------------------------------------------------------
640  * EnableNotifyInterrupt
641  *
642  *              This is called by the PostgresMain main loop just before waiting
643  *              for a frontend command.  If we are truly idle (ie, *not* inside
644  *              a transaction block), then process any pending inbound notifies,
645  *              and enable the signal handler to process future notifies directly.
646  *
647  *              NOTE: the signal handler starts out disabled, and stays so until
648  *              PostgresMain calls this the first time.
649  * --------------------------------------------------------------
650  */
651 void
652 EnableNotifyInterrupt(void)
653 {
654         if (CurrentTransactionState->blockState != TRANS_DEFAULT)
655                 return;                                 /* not really idle */
656
657         /*
658          * This code is tricky because we are communicating with a signal
659          * handler that could interrupt us at any point.  If we just checked
660          * notifyInterruptOccurred and then set notifyInterruptEnabled, we
661          * could fail to respond promptly to a signal that happens in between
662          * those two steps.  (A very small time window, perhaps, but Murphy's
663          * Law says you can hit it...)  Instead, we first set the enable flag,
664          * then test the occurred flag.  If we see an unserviced interrupt has
665          * occurred, we re-clear the enable flag before going off to do the
666          * service work.  (That prevents re-entrant invocation of
667          * ProcessIncomingNotify() if another interrupt occurs.) If an
668          * interrupt comes in between the setting and clearing of
669          * notifyInterruptEnabled, then it will have done the service work and
670          * left notifyInterruptOccurred zero, so we have to check again after
671          * clearing enable.  The whole thing has to be in a loop in case
672          * another interrupt occurs while we're servicing the first. Once we
673          * get out of the loop, enable is set and we know there is no
674          * unserviced interrupt.
675          *
676          * NB: an overenthusiastic optimizing compiler could easily break this
677          * code.  Hopefully, they all understand what "volatile" means these
678          * days.
679          */
680         for (;;)
681         {
682                 notifyInterruptEnabled = 1;
683                 if (!notifyInterruptOccurred)
684                         break;
685                 notifyInterruptEnabled = 0;
686                 if (notifyInterruptOccurred)
687                 {
688                         if (Trace_notify)
689                                 elog(LOG, "EnableNotifyInterrupt: perform async notify");
690
691                         ProcessIncomingNotify();
692
693                         if (Trace_notify)
694                                 elog(LOG, "EnableNotifyInterrupt: done");
695                 }
696         }
697 }
698
699 /*
700  * --------------------------------------------------------------
701  * DisableNotifyInterrupt
702  *
703  *              This is called by the PostgresMain main loop just after receiving
704  *              a frontend command.  Signal handler execution of inbound notifies
705  *              is disabled until the next EnableNotifyInterrupt call.
706  * --------------------------------------------------------------
707  */
708 void
709 DisableNotifyInterrupt(void)
710 {
711         notifyInterruptEnabled = 0;
712 }
713
714 /*
715  * --------------------------------------------------------------
716  * ProcessIncomingNotify
717  *
718  *              Deal with arriving NOTIFYs from other backends.
719  *              This is called either directly from the SIGUSR2 signal handler,
720  *              or the next time control reaches the outer idle loop.
721  *              Scan pg_listener for arriving notifies, report them to my front end,
722  *              and clear the notification field in pg_listener until next time.
723  *
724  *              NOTE: since we are outside any transaction, we must create our own.
725  *
726  * Results:
727  *              XXX
728  *
729  * --------------------------------------------------------------
730  */
731 static void
732 ProcessIncomingNotify(void)
733 {
734         Relation        lRel;
735         TupleDesc       tdesc;
736         ScanKeyData key[1];
737         HeapScanDesc scan;
738         HeapTuple       lTuple,
739                                 rTuple;
740         Datum           value[Natts_pg_listener];
741         char            repl[Natts_pg_listener],
742                                 nulls[Natts_pg_listener];
743
744         if (Trace_notify)
745                 elog(LOG, "ProcessIncomingNotify");
746
747         set_ps_display("async_notify");
748
749         notifyInterruptOccurred = 0;
750
751         StartTransactionCommand(true);
752
753         lRel = heap_openr(ListenerRelationName, AccessExclusiveLock);
754         tdesc = RelationGetDescr(lRel);
755
756         /* Scan only entries with my listenerPID */
757         ScanKeyEntryInitialize(&key[0], 0,
758                                                    Anum_pg_listener_pid,
759                                                    F_INT4EQ,
760                                                    Int32GetDatum(MyProcPid));
761         scan = heap_beginscan(lRel, SnapshotNow, 1, key);
762
763         /* Prepare data for rewriting 0 into notification field */
764         nulls[0] = nulls[1] = nulls[2] = ' ';
765         repl[0] = repl[1] = repl[2] = ' ';
766         repl[Anum_pg_listener_notify - 1] = 'r';
767         value[0] = value[1] = value[2] = (Datum) 0;
768         value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
769
770         while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
771         {
772                 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
773                 char       *relname = NameStr(listener->relname);
774                 int32           sourcePID = listener->notification;
775
776                 if (sourcePID != 0)
777                 {
778                         /* Notify the frontend */
779
780                         if (Trace_notify)
781                                 elog(LOG, "ProcessIncomingNotify: received %s from %d",
782                                          relname, (int) sourcePID);
783
784                         NotifyMyFrontEnd(relname, sourcePID);
785                         /* Rewrite the tuple with 0 in notification column */
786                         rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl);
787                         simple_heap_update(lRel, &lTuple->t_self, rTuple);
788
789 #ifdef NOT_USED                                 /* currently there are no indexes */
790                         CatalogUpdateIndexes(lRel, rTuple);
791 #endif
792                 }
793         }
794         heap_endscan(scan);
795
796         /*
797          * We do NOT release the lock on pg_listener here; we need to hold it
798          * until end of transaction (which is about to happen, anyway) to
799          * ensure that other backends see our tuple updates when they look.
800          * Otherwise, a transaction started after this one might mistakenly
801          * think it doesn't need to send this backend a new NOTIFY.
802          */
803         heap_close(lRel, NoLock);
804
805         CommitTransactionCommand(true);
806
807         /*
808          * Must flush the notify messages to ensure frontend gets them
809          * promptly.
810          */
811         pq_flush();
812
813         set_ps_display("idle");
814
815         if (Trace_notify)
816                 elog(LOG, "ProcessIncomingNotify: done");
817 }
818
819 /*
820  * Send NOTIFY message to my front end.
821  */
822 static void
823 NotifyMyFrontEnd(char *relname, int32 listenerPID)
824 {
825         if (whereToSendOutput == Remote)
826         {
827                 StringInfoData buf;
828
829                 pq_beginmessage(&buf);
830                 pq_sendbyte(&buf, 'A');
831                 pq_sendint(&buf, listenerPID, sizeof(int32));
832                 pq_sendstring(&buf, relname);
833                 pq_endmessage(&buf);
834
835                 /*
836                  * NOTE: we do not do pq_flush() here.  For a self-notify, it will
837                  * happen at the end of the transaction, and for incoming notifies
838                  * ProcessIncomingNotify will do it after finding all the
839                  * notifies.
840                  */
841         }
842         else
843                 elog(INFO, "NOTIFY for %s", relname);
844 }
845
846 /* Does pendingNotifies include the given relname? */
847 static bool
848 AsyncExistsPendingNotify(const char *relname)
849 {
850         List       *p;
851
852         foreach(p, pendingNotifies)
853         {
854                 /* Use NAMEDATALEN for relname comparison.        DZ - 26-08-1996 */
855                 if (strncmp((const char *) lfirst(p), relname, NAMEDATALEN) == 0)
856                         return true;
857         }
858
859         return false;
860 }
861
862 /* Clear the pendingNotifies list. */
863 static void
864 ClearPendingNotifies(void)
865 {
866         /*
867          * We used to have to explicitly deallocate the list members and
868          * nodes, because they were malloc'd.  Now, since we know they are
869          * palloc'd in TopTransactionContext, we need not do that --- they'll
870          * go away automatically at transaction exit.  We need only reset the
871          * list head pointer.
872          */
873         pendingNotifies = NIL;
874 }