Move the HTSU_Result enum definition into snapshot.h, to avoid including
[PostgreSQL.git] / src / backend / commands / async.c
blob583b34d90f969a8efc74d638b59b5535ac9d49ea
/*-------------------------------------------------------------------------
 *
 * async.c
 *	  Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
 *
 * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  $PostgreSQL$
 *
 *-------------------------------------------------------------------------
 */

/*-------------------------------------------------------------------------
 * New Async Notification Model:
 * 1. Multiple backends on same machine.  Multiple backends listening on
 *	  one relation.  (Note: "listening on a relation" is not really the
 *	  right way to think about it, since the notify names need not have
 *	  anything to do with the names of relations actually in the database.
 *	  But this terminology is all over the code and docs, and I don't feel
 *	  like trying to replace it.)
 *
 * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
 *	  i.e., each relname/listenerPID pair.  The "notification" field of the
 *	  tuple is zero when no NOTIFY is pending for that listener, or the PID
 *	  of the originating backend when a cross-backend NOTIFY is pending.
 *	  (We skip writing to pg_listener when doing a self-NOTIFY, so the
 *	  notification field should never be equal to the listenerPID field.)
 *
 * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
 *	  relname to a list of outstanding NOTIFY requests.  Actual processing
 *	  happens if and only if we reach transaction commit.  At that time (in
 *	  routine AtCommit_Notify) we scan pg_listener for matching relnames.
 *	  If the listenerPID in a matching tuple is ours, we just send a notify
 *	  message to our own front end.  If it is not ours, we send a SIGUSR2
 *	  signal to the receiving process (indicated by listenerPID) and, if
 *	  "notification" is still zero, set it to our own PID.  The signal is
 *	  sent even when "notification" is already nonzero, in case the listener
 *	  missed an earlier signal.
 *	  BTW: if the signal operation fails, we presume that the listener backend
 *	  crashed without removing this tuple, and remove the tuple for it.
 *
 * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
 *	  notify processing immediately if this backend is idle (i.e., it is
 *	  waiting for a frontend command and is not within a transaction block).
 *	  Otherwise the handler may only set a flag, which will cause the
 *	  processing to occur just before we next go idle.
 *
 * 5. Inbound-notify processing consists of scanning pg_listener for tuples
 *	  matching our own listenerPID and having nonzero notification fields.
 *	  For each such tuple, we send a message to our frontend and clear the
 *	  notification field.  BTW: this routine has to start/commit its own
 *	  transaction, since by assumption it is only called from outside any
 *	  transaction.
 *
 * Like NOTIFY, LISTEN and UNLISTEN just add the desired action to a list
 * of pending actions.  If we reach transaction commit, the changes are
 * applied to pg_listener just before executing any pending NOTIFYs.  This
 * method is necessary because to avoid race conditions, we must hold lock
 * on pg_listener from when we insert a new listener tuple until we commit.
 * To do that and not create undue hazard of deadlock, we don't want to
 * touch pg_listener until we are otherwise done with the transaction;
 * in particular it'd be uncool to still be taking user-commanded locks
 * while holding the pg_listener lock.
 *
 * Although we grab ExclusiveLock on pg_listener for any operation,
 * the lock is never held very long, so it shouldn't cause too much of
 * a performance problem.  (Previously we used AccessExclusiveLock, but
 * there's no real reason to forbid concurrent reads.)
 *
 * An application that listens on the same relname it notifies will get
 * NOTIFY messages for its own NOTIFYs.  These can be ignored, if not useful,
 * by comparing be_pid in the NOTIFY message to the application's own backend's
 * PID.  (As of FE/BE protocol 2.0, the backend's PID is provided to the
 * frontend during startup.)  The above design guarantees that notifies from
 * other backends are never lost because the application ignores its
 * self-notifies.  Note, however, that we do *not* guarantee that a separate
 * frontend message will be sent for every outside NOTIFY.  Since there is
 * only room for one originating PID in pg_listener, outside notifies
 * occurring at about the same time may be collapsed into a single message
 * bearing the PID of the first outside backend to perform the NOTIFY.
 *-------------------------------------------------------------------------
 */
84 #include "postgres.h"
86 #include <unistd.h>
87 #include <signal.h>
89 #include "access/heapam.h"
90 #include "access/twophase_rmgr.h"
91 #include "access/xact.h"
92 #include "catalog/pg_listener.h"
93 #include "commands/async.h"
94 #include "libpq/libpq.h"
95 #include "libpq/pqformat.h"
96 #include "miscadmin.h"
97 #include "storage/ipc.h"
98 #include "storage/sinval.h"
99 #include "tcop/tcopprot.h"
100 #include "utils/builtins.h"
101 #include "utils/fmgroids.h"
102 #include "utils/memutils.h"
103 #include "utils/ps_status.h"
104 #include "utils/tqual.h"
108 * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
109 * all actions requested in the current transaction. As explained above,
110 * we don't actually modify pg_listener until we reach transaction commit.
112 * The list is kept in CurTransactionContext. In subtransactions, each
113 * subtransaction has its own list in its own CurTransactionContext, but
114 * successful subtransactions attach their lists to their parent's list.
115 * Failed subtransactions simply discard their lists.
117 typedef enum
119 LISTEN_LISTEN,
120 LISTEN_UNLISTEN,
121 LISTEN_UNLISTEN_ALL
122 } ListenActionKind;
124 typedef struct
126 ListenActionKind action;
127 char condname[1]; /* actually, as long as needed */
128 } ListenAction;
130 static List *pendingActions = NIL; /* list of ListenAction */
132 static List *upperPendingActions = NIL; /* list of upper-xact lists */
135 * State for outbound notifies consists of a list of all relnames NOTIFYed
136 * in the current transaction. We do not actually perform a NOTIFY until
137 * and unless the transaction commits. pendingNotifies is NIL if no
138 * NOTIFYs have been done in the current transaction.
140 * The list is kept in CurTransactionContext. In subtransactions, each
141 * subtransaction has its own list in its own CurTransactionContext, but
142 * successful subtransactions attach their lists to their parent's list.
143 * Failed subtransactions simply discard their lists.
145 * Note: the action and notify lists do not interact within a transaction.
146 * In particular, if a transaction does NOTIFY and then LISTEN on the same
147 * condition name, it will get a self-notify at commit. This is a bit odd
148 * but is consistent with our historical behavior.
150 static List *pendingNotifies = NIL; /* list of C strings */
152 static List *upperPendingNotifies = NIL; /* list of upper-xact lists */
155 * State for inbound notifies consists of two flags: one saying whether
156 * the signal handler is currently allowed to call ProcessIncomingNotify
157 * directly, and one saying whether the signal has occurred but the handler
158 * was not allowed to call ProcessIncomingNotify at the time.
160 * NB: the "volatile" on these declarations is critical! If your compiler
161 * does not grok "volatile", you'd be best advised to compile this file
162 * with all optimization turned off.
164 static volatile sig_atomic_t notifyInterruptEnabled = 0;
165 static volatile sig_atomic_t notifyInterruptOccurred = 0;
167 /* True if we've registered an on_shmem_exit cleanup */
168 static bool unlistenExitRegistered = false;
170 bool Trace_notify = false;
173 static void queue_listen(ListenActionKind action, const char *condname);
174 static void Async_UnlistenOnExit(int code, Datum arg);
175 static void Exec_Listen(Relation lRel, const char *relname);
176 static void Exec_Unlisten(Relation lRel, const char *relname);
177 static void Exec_UnlistenAll(Relation lRel);
178 static void Send_Notify(Relation lRel);
179 static void ProcessIncomingNotify(void);
180 static void NotifyMyFrontEnd(char *relname, int32 listenerPID);
181 static bool AsyncExistsPendingNotify(const char *relname);
182 static void ClearPendingActionsAndNotifies(void);
186 * Async_Notify
188 * This is executed by the SQL notify command.
190 * Adds the relation to the list of pending notifies.
191 * Actual notification happens during transaction commit.
192 * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
194 void
195 Async_Notify(const char *relname)
197 if (Trace_notify)
198 elog(DEBUG1, "Async_Notify(%s)", relname);
200 /* no point in making duplicate entries in the list ... */
201 if (!AsyncExistsPendingNotify(relname))
204 * The name list needs to live until end of transaction, so store it
205 * in the transaction context.
207 MemoryContext oldcontext;
209 oldcontext = MemoryContextSwitchTo(CurTransactionContext);
212 * Ordering of the list isn't important. We choose to put new
213 * entries on the front, as this might make duplicate-elimination
214 * a tad faster when the same condition is signaled many times in
215 * a row.
217 pendingNotifies = lcons(pstrdup(relname), pendingNotifies);
219 MemoryContextSwitchTo(oldcontext);
224 * queue_listen
225 * Common code for listen, unlisten, unlisten all commands.
227 * Adds the request to the list of pending actions.
228 * Actual update of pg_listener happens during transaction commit.
229 * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
231 static void
232 queue_listen(ListenActionKind action, const char *condname)
234 MemoryContext oldcontext;
235 ListenAction *actrec;
238 * Unlike Async_Notify, we don't try to collapse out duplicates.
239 * It would be too complicated to ensure we get the right interactions
240 * of conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that
241 * there would be any performance benefit anyway in sane applications.
243 oldcontext = MemoryContextSwitchTo(CurTransactionContext);
245 /* space for terminating null is included in sizeof(ListenAction) */
246 actrec = (ListenAction *) palloc(sizeof(ListenAction) + strlen(condname));
247 actrec->action = action;
248 strcpy(actrec->condname, condname);
250 pendingActions = lappend(pendingActions, actrec);
252 MemoryContextSwitchTo(oldcontext);
256 * Async_Listen
258 * This is executed by the SQL listen command.
260 void
261 Async_Listen(const char *relname)
263 if (Trace_notify)
264 elog(DEBUG1, "Async_Listen(%s,%d)", relname, MyProcPid);
266 queue_listen(LISTEN_LISTEN, relname);
270 * Async_Unlisten
272 * This is executed by the SQL unlisten command.
274 void
275 Async_Unlisten(const char *relname)
277 /* Handle specially the `unlisten "*"' command */
278 if ((!relname) || (*relname == '\0') || (strcmp(relname, "*") == 0))
280 Async_UnlistenAll();
282 else
284 if (Trace_notify)
285 elog(DEBUG1, "Async_Unlisten(%s,%d)", relname, MyProcPid);
287 queue_listen(LISTEN_UNLISTEN, relname);
292 * Async_UnlistenAll
294 * This is invoked by UNLISTEN "*" command, and also at backend exit.
296 void
297 Async_UnlistenAll(void)
299 if (Trace_notify)
300 elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
302 queue_listen(LISTEN_UNLISTEN_ALL, "");
306 * Async_UnlistenOnExit
308 * Clean up the pg_listener table at backend exit.
310 * This is executed if we have done any LISTENs in this backend.
311 * It might not be necessary anymore, if the user UNLISTENed everything,
312 * but we don't try to detect that case.
314 static void
315 Async_UnlistenOnExit(int code, Datum arg)
318 * We need to start/commit a transaction for the unlisten, but if there is
319 * already an active transaction we had better abort that one first.
320 * Otherwise we'd end up committing changes that probably ought to be
321 * discarded.
323 AbortOutOfAnyTransaction();
324 /* Now we can do the unlisten */
325 StartTransactionCommand();
326 Async_UnlistenAll();
327 CommitTransactionCommand();
331 * AtPrepare_Notify
333 * This is called at the prepare phase of a two-phase
334 * transaction. Save the state for possible commit later.
336 void
337 AtPrepare_Notify(void)
339 ListCell *p;
341 /* It's not sensible to have any pending LISTEN/UNLISTEN actions */
342 if (pendingActions)
343 ereport(ERROR,
344 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
345 errmsg("cannot PREPARE a transaction that has executed LISTEN or UNLISTEN")));
347 /* We can deal with pending NOTIFY though */
348 foreach(p, pendingNotifies)
350 const char *relname = (const char *) lfirst(p);
352 RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID, 0,
353 relname, strlen(relname) + 1);
357 * We can clear the state immediately, rather than needing a separate
358 * PostPrepare call, because if the transaction fails we'd just discard
359 * the state anyway.
361 ClearPendingActionsAndNotifies();
365 * AtCommit_Notify
367 * This is called at transaction commit.
369 * If there are pending LISTEN/UNLISTEN actions, insert or delete
370 * tuples in pg_listener accordingly.
372 * If there are outbound notify requests in the pendingNotifies list,
373 * scan pg_listener for matching tuples, and either signal the other
374 * backend or send a message to our own frontend.
376 * NOTE: we are still inside the current transaction, therefore can
377 * piggyback on its committing of changes.
379 void
380 AtCommit_Notify(void)
382 Relation lRel;
383 ListCell *p;
385 if (pendingActions == NIL && pendingNotifies == NIL)
386 return; /* no relevant statements in this xact */
389 * NOTIFY is disabled if not normal processing mode. This test used to be
390 * in xact.c, but it seems cleaner to do it here.
392 if (!IsNormalProcessingMode())
394 ClearPendingActionsAndNotifies();
395 return;
398 if (Trace_notify)
399 elog(DEBUG1, "AtCommit_Notify");
401 /* Acquire ExclusiveLock on pg_listener */
402 lRel = heap_open(ListenerRelationId, ExclusiveLock);
404 /* Perform any pending listen/unlisten actions */
405 foreach(p, pendingActions)
407 ListenAction *actrec = (ListenAction *) lfirst(p);
409 switch (actrec->action)
411 case LISTEN_LISTEN:
412 Exec_Listen(lRel, actrec->condname);
413 break;
414 case LISTEN_UNLISTEN:
415 Exec_Unlisten(lRel, actrec->condname);
416 break;
417 case LISTEN_UNLISTEN_ALL:
418 Exec_UnlistenAll(lRel);
419 break;
422 /* We must CCI after each action in case of conflicting actions */
423 CommandCounterIncrement();
426 /* Perform any pending notifies */
427 if (pendingNotifies)
428 Send_Notify(lRel);
431 * We do NOT release the lock on pg_listener here; we need to hold it
432 * until end of transaction (which is about to happen, anyway) to ensure
433 * that notified backends see our tuple updates when they look. Else they
434 * might disregard the signal, which would make the application programmer
435 * very unhappy. Also, this prevents race conditions when we have just
436 * inserted a listening tuple.
438 heap_close(lRel, NoLock);
440 ClearPendingActionsAndNotifies();
442 if (Trace_notify)
443 elog(DEBUG1, "AtCommit_Notify: done");
447 * Exec_Listen --- subroutine for AtCommit_Notify
449 * Register the current backend as listening on the specified relation.
451 static void
452 Exec_Listen(Relation lRel, const char *relname)
454 HeapScanDesc scan;
455 HeapTuple tuple;
456 Datum values[Natts_pg_listener];
457 char nulls[Natts_pg_listener];
458 NameData condname;
459 bool alreadyListener = false;
461 if (Trace_notify)
462 elog(DEBUG1, "Exec_Listen(%s,%d)", relname, MyProcPid);
464 /* Detect whether we are already listening on this relname */
465 scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
466 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
468 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
470 if (listener->listenerpid == MyProcPid &&
471 strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
473 alreadyListener = true;
474 /* No need to scan the rest of the table */
475 break;
478 heap_endscan(scan);
480 if (alreadyListener)
481 return;
484 * OK to insert a new tuple
486 memset(nulls, ' ', sizeof(nulls));
488 namestrcpy(&condname, relname);
489 values[Anum_pg_listener_relname - 1] = NameGetDatum(&condname);
490 values[Anum_pg_listener_pid - 1] = Int32GetDatum(MyProcPid);
491 values[Anum_pg_listener_notify - 1] = Int32GetDatum(0); /* no notifies pending */
493 tuple = heap_formtuple(RelationGetDescr(lRel), values, nulls);
495 simple_heap_insert(lRel, tuple);
497 #ifdef NOT_USED /* currently there are no indexes */
498 CatalogUpdateIndexes(lRel, tuple);
499 #endif
501 heap_freetuple(tuple);
504 * now that we are listening, make sure we will unlisten before dying.
506 if (!unlistenExitRegistered)
508 on_shmem_exit(Async_UnlistenOnExit, 0);
509 unlistenExitRegistered = true;
514 * Exec_Unlisten --- subroutine for AtCommit_Notify
516 * Remove the current backend from the list of listening backends
517 * for the specified relation.
519 static void
520 Exec_Unlisten(Relation lRel, const char *relname)
522 HeapScanDesc scan;
523 HeapTuple tuple;
525 if (Trace_notify)
526 elog(DEBUG1, "Exec_Unlisten(%s,%d)", relname, MyProcPid);
528 scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
529 while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
531 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(tuple);
533 if (listener->listenerpid == MyProcPid &&
534 strncmp(NameStr(listener->relname), relname, NAMEDATALEN) == 0)
536 /* Found the matching tuple, delete it */
537 simple_heap_delete(lRel, &tuple->t_self);
540 * We assume there can be only one match, so no need to scan the
541 * rest of the table
543 break;
546 heap_endscan(scan);
549 * We do not complain about unlistening something not being listened;
550 * should we?
555 * Exec_UnlistenAll --- subroutine for AtCommit_Notify
557 * Update pg_listener to unlisten all relations for this backend.
559 static void
560 Exec_UnlistenAll(Relation lRel)
562 HeapScanDesc scan;
563 HeapTuple lTuple;
564 ScanKeyData key[1];
566 if (Trace_notify)
567 elog(DEBUG1, "Exec_UnlistenAll");
569 /* Find and delete all entries with my listenerPID */
570 ScanKeyInit(&key[0],
571 Anum_pg_listener_pid,
572 BTEqualStrategyNumber, F_INT4EQ,
573 Int32GetDatum(MyProcPid));
574 scan = heap_beginscan(lRel, SnapshotNow, 1, key);
576 while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
577 simple_heap_delete(lRel, &lTuple->t_self);
579 heap_endscan(scan);
583 * Send_Notify --- subroutine for AtCommit_Notify
585 * Scan pg_listener for tuples matching our pending notifies, and
586 * either signal the other backend or send a message to our own frontend.
588 static void
589 Send_Notify(Relation lRel)
591 TupleDesc tdesc = RelationGetDescr(lRel);
592 HeapScanDesc scan;
593 HeapTuple lTuple,
594 rTuple;
595 Datum value[Natts_pg_listener];
596 char repl[Natts_pg_listener],
597 nulls[Natts_pg_listener];
599 /* preset data to update notify column to MyProcPid */
600 nulls[0] = nulls[1] = nulls[2] = ' ';
601 repl[0] = repl[1] = repl[2] = ' ';
602 repl[Anum_pg_listener_notify - 1] = 'r';
603 value[0] = value[1] = value[2] = (Datum) 0;
604 value[Anum_pg_listener_notify - 1] = Int32GetDatum(MyProcPid);
606 scan = heap_beginscan(lRel, SnapshotNow, 0, NULL);
608 while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
610 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
611 char *relname = NameStr(listener->relname);
612 int32 listenerPID = listener->listenerpid;
614 if (!AsyncExistsPendingNotify(relname))
615 continue;
617 if (listenerPID == MyProcPid)
620 * Self-notify: no need to bother with table update. Indeed, we
621 * *must not* clear the notification field in this path, or we
622 * could lose an outside notify, which'd be bad for applications
623 * that ignore self-notify messages.
625 if (Trace_notify)
626 elog(DEBUG1, "AtCommit_Notify: notifying self");
628 NotifyMyFrontEnd(relname, listenerPID);
630 else
632 if (Trace_notify)
633 elog(DEBUG1, "AtCommit_Notify: notifying pid %d",
634 listenerPID);
637 * If someone has already notified this listener, we don't bother
638 * modifying the table, but we do still send a SIGUSR2 signal,
639 * just in case that backend missed the earlier signal for some
640 * reason. It's OK to send the signal first, because the other
641 * guy can't read pg_listener until we unlock it.
643 if (kill(listenerPID, SIGUSR2) < 0)
646 * Get rid of pg_listener entry if it refers to a PID that no
647 * longer exists. Presumably, that backend crashed without
648 * deleting its pg_listener entries. This code used to only
649 * delete the entry if errno==ESRCH, but as far as I can see
650 * we should just do it for any failure (certainly at least
651 * for EPERM too...)
653 simple_heap_delete(lRel, &lTuple->t_self);
655 else if (listener->notification == 0)
657 /* Rewrite the tuple with my PID in notification column */
658 rTuple = heap_modifytuple(lTuple, tdesc, value, nulls, repl);
659 simple_heap_update(lRel, &lTuple->t_self, rTuple);
661 #ifdef NOT_USED /* currently there are no indexes */
662 CatalogUpdateIndexes(lRel, rTuple);
663 #endif
668 heap_endscan(scan);
/*
 * AtAbort_Notify
 *
 *		Transaction-abort hook.  It forgets every queued LISTEN/UNLISTEN action
 *		and outbound NOTIFY, since those take effect only on commit.
 */
void
AtAbort_Notify(void)
{
	ClearPendingActionsAndNotifies();
}
686 * AtSubStart_Notify() --- Take care of subtransaction start.
688 * Push empty state for the new subtransaction.
690 void
691 AtSubStart_Notify(void)
693 MemoryContext old_cxt;
695 /* Keep the list-of-lists in TopTransactionContext for simplicity */
696 old_cxt = MemoryContextSwitchTo(TopTransactionContext);
698 upperPendingActions = lcons(pendingActions, upperPendingActions);
700 Assert(list_length(upperPendingActions) ==
701 GetCurrentTransactionNestLevel() - 1);
703 pendingActions = NIL;
705 upperPendingNotifies = lcons(pendingNotifies, upperPendingNotifies);
707 Assert(list_length(upperPendingNotifies) ==
708 GetCurrentTransactionNestLevel() - 1);
710 pendingNotifies = NIL;
712 MemoryContextSwitchTo(old_cxt);
716 * AtSubCommit_Notify() --- Take care of subtransaction commit.
718 * Reassign all items in the pending lists to the parent transaction.
720 void
721 AtSubCommit_Notify(void)
723 List *parentPendingActions;
724 List *parentPendingNotifies;
726 parentPendingActions = (List *) linitial(upperPendingActions);
727 upperPendingActions = list_delete_first(upperPendingActions);
729 Assert(list_length(upperPendingActions) ==
730 GetCurrentTransactionNestLevel() - 2);
733 * Mustn't try to eliminate duplicates here --- see queue_listen()
735 pendingActions = list_concat(parentPendingActions, pendingActions);
737 parentPendingNotifies = (List *) linitial(upperPendingNotifies);
738 upperPendingNotifies = list_delete_first(upperPendingNotifies);
740 Assert(list_length(upperPendingNotifies) ==
741 GetCurrentTransactionNestLevel() - 2);
744 * We could try to eliminate duplicates here, but it seems not worthwhile.
746 pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies);
750 * AtSubAbort_Notify() --- Take care of subtransaction abort.
752 void
753 AtSubAbort_Notify(void)
755 int my_level = GetCurrentTransactionNestLevel();
758 * All we have to do is pop the stack --- the actions/notifies made in this
759 * subxact are no longer interesting, and the space will be freed when
760 * CurTransactionContext is recycled.
762 * This routine could be called more than once at a given nesting level if
763 * there is trouble during subxact abort. Avoid dumping core by using
764 * GetCurrentTransactionNestLevel as the indicator of how far we need to
765 * prune the list.
767 while (list_length(upperPendingActions) > my_level - 2)
769 pendingActions = (List *) linitial(upperPendingActions);
770 upperPendingActions = list_delete_first(upperPendingActions);
773 while (list_length(upperPendingNotifies) > my_level - 2)
775 pendingNotifies = (List *) linitial(upperPendingNotifies);
776 upperPendingNotifies = list_delete_first(upperPendingNotifies);
781 * NotifyInterruptHandler
783 * This is the signal handler for SIGUSR2.
785 * If we are idle (notifyInterruptEnabled is set), we can safely invoke
786 * ProcessIncomingNotify directly. Otherwise, just set a flag
787 * to do it later.
789 void
790 NotifyInterruptHandler(SIGNAL_ARGS)
792 int save_errno = errno;
795 * Note: this is a SIGNAL HANDLER. You must be very wary what you do
796 * here. Some helpful soul had this routine sprinkled with TPRINTFs, which
797 * would likely lead to corruption of stdio buffers if they were ever
798 * turned on.
801 /* Don't joggle the elbow of proc_exit */
802 if (proc_exit_inprogress)
803 return;
805 if (notifyInterruptEnabled)
807 bool save_ImmediateInterruptOK = ImmediateInterruptOK;
810 * We may be called while ImmediateInterruptOK is true; turn it off
811 * while messing with the NOTIFY state. (We would have to save and
812 * restore it anyway, because PGSemaphore operations inside
813 * ProcessIncomingNotify() might reset it.)
815 ImmediateInterruptOK = false;
818 * I'm not sure whether some flavors of Unix might allow another
819 * SIGUSR2 occurrence to recursively interrupt this routine. To cope
820 * with the possibility, we do the same sort of dance that
821 * EnableNotifyInterrupt must do --- see that routine for comments.
823 notifyInterruptEnabled = 0; /* disable any recursive signal */
824 notifyInterruptOccurred = 1; /* do at least one iteration */
825 for (;;)
827 notifyInterruptEnabled = 1;
828 if (!notifyInterruptOccurred)
829 break;
830 notifyInterruptEnabled = 0;
831 if (notifyInterruptOccurred)
833 /* Here, it is finally safe to do stuff. */
834 if (Trace_notify)
835 elog(DEBUG1, "NotifyInterruptHandler: perform async notify");
837 ProcessIncomingNotify();
839 if (Trace_notify)
840 elog(DEBUG1, "NotifyInterruptHandler: done");
845 * Restore ImmediateInterruptOK, and check for interrupts if needed.
847 ImmediateInterruptOK = save_ImmediateInterruptOK;
848 if (save_ImmediateInterruptOK)
849 CHECK_FOR_INTERRUPTS();
851 else
854 * In this path it is NOT SAFE to do much of anything, except this:
856 notifyInterruptOccurred = 1;
859 errno = save_errno;
863 * EnableNotifyInterrupt
865 * This is called by the PostgresMain main loop just before waiting
866 * for a frontend command. If we are truly idle (ie, *not* inside
867 * a transaction block), then process any pending inbound notifies,
868 * and enable the signal handler to process future notifies directly.
870 * NOTE: the signal handler starts out disabled, and stays so until
871 * PostgresMain calls this the first time.
873 void
874 EnableNotifyInterrupt(void)
876 if (IsTransactionOrTransactionBlock())
877 return; /* not really idle */
880 * This code is tricky because we are communicating with a signal handler
881 * that could interrupt us at any point. If we just checked
882 * notifyInterruptOccurred and then set notifyInterruptEnabled, we could
883 * fail to respond promptly to a signal that happens in between those two
884 * steps. (A very small time window, perhaps, but Murphy's Law says you
885 * can hit it...) Instead, we first set the enable flag, then test the
886 * occurred flag. If we see an unserviced interrupt has occurred, we
887 * re-clear the enable flag before going off to do the service work. (That
888 * prevents re-entrant invocation of ProcessIncomingNotify() if another
889 * interrupt occurs.) If an interrupt comes in between the setting and
890 * clearing of notifyInterruptEnabled, then it will have done the service
891 * work and left notifyInterruptOccurred zero, so we have to check again
892 * after clearing enable. The whole thing has to be in a loop in case
893 * another interrupt occurs while we're servicing the first. Once we get
894 * out of the loop, enable is set and we know there is no unserviced
895 * interrupt.
897 * NB: an overenthusiastic optimizing compiler could easily break this
898 * code. Hopefully, they all understand what "volatile" means these days.
900 for (;;)
902 notifyInterruptEnabled = 1;
903 if (!notifyInterruptOccurred)
904 break;
905 notifyInterruptEnabled = 0;
906 if (notifyInterruptOccurred)
908 if (Trace_notify)
909 elog(DEBUG1, "EnableNotifyInterrupt: perform async notify");
911 ProcessIncomingNotify();
913 if (Trace_notify)
914 elog(DEBUG1, "EnableNotifyInterrupt: done");
920 * DisableNotifyInterrupt
922 * This is called by the PostgresMain main loop just after receiving
923 * a frontend command. Signal handler execution of inbound notifies
924 * is disabled until the next EnableNotifyInterrupt call.
926 * The SIGUSR1 signal handler also needs to call this, so as to
927 * prevent conflicts if one signal interrupts the other. So we
928 * must return the previous state of the flag.
930 bool
931 DisableNotifyInterrupt(void)
933 bool result = (notifyInterruptEnabled != 0);
935 notifyInterruptEnabled = 0;
937 return result;
941 * ProcessIncomingNotify
943 * Deal with arriving NOTIFYs from other backends.
944 * This is called either directly from the SIGUSR2 signal handler,
945 * or the next time control reaches the outer idle loop.
946 * Scan pg_listener for arriving notifies, report them to my front end,
947 * and clear the notification field in pg_listener until next time.
949 * NOTE: since we are outside any transaction, we must create our own.
951 static void
952 ProcessIncomingNotify(void)
954 Relation lRel;
955 TupleDesc tdesc;
956 ScanKeyData key[1];
957 HeapScanDesc scan;
958 HeapTuple lTuple,
959 rTuple;
960 Datum value[Natts_pg_listener];
961 char repl[Natts_pg_listener],
962 nulls[Natts_pg_listener];
963 bool catchup_enabled;
965 /* Must prevent SIGUSR1 interrupt while I am running */
966 catchup_enabled = DisableCatchupInterrupt();
968 if (Trace_notify)
969 elog(DEBUG1, "ProcessIncomingNotify");
971 set_ps_display("notify interrupt", false);
973 notifyInterruptOccurred = 0;
975 StartTransactionCommand();
977 lRel = heap_open(ListenerRelationId, ExclusiveLock);
978 tdesc = RelationGetDescr(lRel);
980 /* Scan only entries with my listenerPID */
981 ScanKeyInit(&key[0],
982 Anum_pg_listener_pid,
983 BTEqualStrategyNumber, F_INT4EQ,
984 Int32GetDatum(MyProcPid));
985 scan = heap_beginscan(lRel, SnapshotNow, 1, key);
987 /* Prepare data for rewriting 0 into notification field */
988 nulls[0] = nulls[1] = nulls[2] = ' ';
989 repl[0] = repl[1] = repl[2] = ' ';
990 repl[Anum_pg_listener_notify - 1] = 'r';
991 value[0] = value[1] = value[2] = (Datum) 0;
992 value[Anum_pg_listener_notify - 1] = Int32GetDatum(0);
994 while ((lTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
996 Form_pg_listener listener = (Form_pg_listener) GETSTRUCT(lTuple);
997 char *relname = NameStr(listener->relname);
998 int32 sourcePID = listener->notification;
1000 if (sourcePID != 0)
1002 /* Notify the frontend */
1004 if (Trace_notify)
1005 elog(DEBUG1, "ProcessIncomingNotify: received %s from %d",
1006 relname, (int) sourcePID);
1008 NotifyMyFrontEnd(relname, sourcePID);
1011 * Rewrite the tuple with 0 in notification column.
1013 rTuple = heap_modifytuple(lTuple, tdesc, value, nulls, repl);
1014 simple_heap_update(lRel, &lTuple->t_self, rTuple);
1016 #ifdef NOT_USED /* currently there are no indexes */
1017 CatalogUpdateIndexes(lRel, rTuple);
1018 #endif
1021 heap_endscan(scan);
1024 * We do NOT release the lock on pg_listener here; we need to hold it
1025 * until end of transaction (which is about to happen, anyway) to ensure
1026 * that other backends see our tuple updates when they look. Otherwise, a
1027 * transaction started after this one might mistakenly think it doesn't
1028 * need to send this backend a new NOTIFY.
1030 heap_close(lRel, NoLock);
1032 CommitTransactionCommand();
1035 * Must flush the notify messages to ensure frontend gets them promptly.
1037 pq_flush();
1039 set_ps_display("idle", false);
1041 if (Trace_notify)
1042 elog(DEBUG1, "ProcessIncomingNotify: done");
1044 if (catchup_enabled)
1045 EnableCatchupInterrupt();
1049 * Send NOTIFY message to my front end.
1051 static void
1052 NotifyMyFrontEnd(char *relname, int32 listenerPID)
1054 if (whereToSendOutput == DestRemote)
1056 StringInfoData buf;
1058 pq_beginmessage(&buf, 'A');
1059 pq_sendint(&buf, listenerPID, sizeof(int32));
1060 pq_sendstring(&buf, relname);
1061 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
1063 /* XXX Add parameter string here later */
1064 pq_sendstring(&buf, "");
1066 pq_endmessage(&buf);
1069 * NOTE: we do not do pq_flush() here. For a self-notify, it will
1070 * happen at the end of the transaction, and for incoming notifies
1071 * ProcessIncomingNotify will do it after finding all the notifies.
1074 else
1075 elog(INFO, "NOTIFY for %s", relname);
1078 /* Does pendingNotifies include the given relname? */
1079 static bool
1080 AsyncExistsPendingNotify(const char *relname)
1082 ListCell *p;
1084 foreach(p, pendingNotifies)
1086 const char *prelname = (const char *) lfirst(p);
1088 if (strcmp(prelname, relname) == 0)
1089 return true;
1092 return false;
1095 /* Clear the pendingActions and pendingNotifies lists. */
1096 static void
1097 ClearPendingActionsAndNotifies(void)
1100 * We used to have to explicitly deallocate the list members and nodes,
1101 * because they were malloc'd. Now, since we know they are palloc'd in
1102 * CurTransactionContext, we need not do that --- they'll go away
1103 * automatically at transaction exit. We need only reset the list head
1104 * pointers.
1106 pendingActions = NIL;
1107 pendingNotifies = NIL;
1111 * 2PC processing routine for COMMIT PREPARED case.
1113 * (We don't have to do anything for ROLLBACK PREPARED.)
1115 void
1116 notify_twophase_postcommit(TransactionId xid, uint16 info,
1117 void *recdata, uint32 len)
1120 * Set up to issue the NOTIFY at the end of my own current transaction.
1121 * (XXX this has some issues if my own transaction later rolls back, or if
1122 * there is any significant delay before I commit. OK for now because we
1123 * disallow COMMIT PREPARED inside a transaction block.)
1125 Async_Notify((char *) recdata);