1 /*-------------------------------------------------------------------------
4 * Asynchronous notification: NOTIFY, LISTEN, UNLISTEN
6 * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
12 *-------------------------------------------------------------------------
15 /*-------------------------------------------------------------------------
16 * New Async Notification Model:
17 * 1. Multiple backends on same machine. Multiple backends listening on
18 * one relation. (Note: "listening on a relation" is not really the
19 * right way to think about it, since the notify names need not have
20 * anything to do with the names of relations actually in the database.
21 * But this terminology is all over the code and docs, and I don't feel
22 * like trying to replace it.)
24 * 2. There is a tuple in relation "pg_listener" for each active LISTEN,
25 * ie, each relname/listenerPID pair. The "notification" field of the
26 * tuple is zero when no NOTIFY is pending for that listener, or the PID
27 * of the originating backend when a cross-backend NOTIFY is pending.
28 * (We skip writing to pg_listener when doing a self-NOTIFY, so the
29 * notification field should never be equal to the listenerPID field.)
31 * 3. The NOTIFY statement itself (routine Async_Notify) just adds the target
32 * relname to a list of outstanding NOTIFY requests. Actual processing
33 * happens if and only if we reach transaction commit. At that time (in
34 * routine AtCommit_Notify) we scan pg_listener for matching relnames.
35 * If the listenerPID in a matching tuple is ours, we just send a notify
36 * message to our own front end. If it is not ours, and "notification"
37 * is not already nonzero, we set notification to our own PID and send a
38 * SIGUSR2 signal to the receiving process (indicated by listenerPID).
39 * BTW: if the signal operation fails, we presume that the listener backend
40 * crashed without removing this tuple, and remove the tuple for it.
42 * 4. Upon receipt of a SIGUSR2 signal, the signal handler can call inbound-
43 * notify processing immediately if this backend is idle (ie, it is
44 * waiting for a frontend command and is not within a transaction block).
45 * Otherwise the handler may only set a flag, which will cause the
46 * processing to occur just before we next go idle.
48 * 5. Inbound-notify processing consists of scanning pg_listener for tuples
49 * matching our own listenerPID and having nonzero notification fields.
50 * For each such tuple, we send a message to our frontend and clear the
51 * notification field. BTW: this routine has to start/commit its own
52 * transaction, since by assumption it is only called from outside any
53 * transaction.
55 * Like NOTIFY, LISTEN and UNLISTEN just add the desired action to a list
56 * of pending actions. If we reach transaction commit, the changes are
57 * applied to pg_listener just before executing any pending NOTIFYs. This
58 * method is necessary because to avoid race conditions, we must hold lock
59 * on pg_listener from when we insert a new listener tuple until we commit.
60 * To do that and not create undue hazard of deadlock, we don't want to
61 * touch pg_listener until we are otherwise done with the transaction;
62 * in particular it'd be uncool to still be taking user-commanded locks
63 * while holding the pg_listener lock.
65 * Although we grab ExclusiveLock on pg_listener for any operation,
66 * the lock is never held very long, so it shouldn't cause too much of
67 * a performance problem. (Previously we used AccessExclusiveLock, but
68 * there's no real reason to forbid concurrent reads.)
70 * An application that listens on the same relname it notifies will get
71 * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
72 * by comparing be_pid in the NOTIFY message to the application's own backend's
73 * PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the
74 * frontend during startup.) The above design guarantees that notifies from
75 * other backends will never be missed by ignoring self-notifies. Note,
76 * however, that we do *not* guarantee that a separate frontend message will
77 * be sent for every outside NOTIFY. Since there is only room for one
78 * originating PID in pg_listener, outside notifies occurring at about the
79 * same time may be collapsed into a single message bearing the PID of the
80 * first outside backend to perform the NOTIFY.
81 *-------------------------------------------------------------------------
89 #include "access/heapam.h"
90 #include "access/twophase_rmgr.h"
91 #include "access/xact.h"
92 #include "catalog/pg_listener.h"
93 #include "commands/async.h"
94 #include "libpq/libpq.h"
95 #include "libpq/pqformat.h"
96 #include "miscadmin.h"
97 #include "storage/ipc.h"
98 #include "storage/sinval.h"
99 #include "tcop/tcopprot.h"
100 #include "utils/builtins.h"
101 #include "utils/fmgroids.h"
102 #include "utils/memutils.h"
103 #include "utils/ps_status.h"
104 #include "utils/tqual.h"
/*
 * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
 * all actions requested in the current transaction.  As explained above,
 * we don't actually modify pg_listener until we reach transaction commit.
 *
 * The list is kept in CurTransactionContext.  In subtransactions, each
 * subtransaction has its own list in its own CurTransactionContext, but
 * successful subtransactions attach their lists to their parent's list.
 * Failed subtransactions simply discard their lists.
 */
typedef enum
{
    LISTEN_LISTEN,
    LISTEN_UNLISTEN,
    LISTEN_UNLISTEN_ALL
} ListenActionKind;

/*
 * One queued LISTEN/UNLISTEN request.  The struct is allocated with extra
 * room after it so that condname can hold the whole condition name; the
 * one declared byte accounts for the terminating null.
 */
typedef struct
{
    ListenActionKind action;
    char        condname[1];    /* actually, as long as needed */
} ListenAction;
130 static List
*pendingActions
= NIL
; /* list of ListenAction */
132 static List
*upperPendingActions
= NIL
; /* list of upper-xact lists */
135 * State for outbound notifies consists of a list of all relnames NOTIFYed
136 * in the current transaction. We do not actually perform a NOTIFY until
137 * and unless the transaction commits. pendingNotifies is NIL if no
138 * NOTIFYs have been done in the current transaction.
140 * The list is kept in CurTransactionContext. In subtransactions, each
141 * subtransaction has its own list in its own CurTransactionContext, but
142 * successful subtransactions attach their lists to their parent's list.
143 * Failed subtransactions simply discard their lists.
145 * Note: the action and notify lists do not interact within a transaction.
146 * In particular, if a transaction does NOTIFY and then LISTEN on the same
147 * condition name, it will get a self-notify at commit. This is a bit odd
148 * but is consistent with our historical behavior.
150 static List
*pendingNotifies
= NIL
; /* list of C strings */
152 static List
*upperPendingNotifies
= NIL
; /* list of upper-xact lists */
/*
 * State for inbound notifies consists of two flags: one saying whether
 * the signal handler is currently allowed to call ProcessIncomingNotify
 * directly, and one saying whether the signal has occurred but the handler
 * was not allowed to call ProcessIncomingNotify at the time.
 *
 * NB: the "volatile" on these declarations is critical!  If your compiler
 * does not grok "volatile", you'd be best advised to compile this file
 * with all optimization turned off.
 */

/* nonzero: the handler may run ProcessIncomingNotify itself */
static volatile sig_atomic_t notifyInterruptEnabled = 0;

/* nonzero: a signal arrived and has not been serviced yet */
static volatile sig_atomic_t notifyInterruptOccurred = 0;
/* Set once Async_UnlistenOnExit has been registered with on_shmem_exit */
static bool unlistenExitRegistered = false;

/* Exported flag (not static) */
bool        Trace_notify = false;
173 static void queue_listen(ListenActionKind action
, const char *condname
);
174 static void Async_UnlistenOnExit(int code
, Datum arg
);
175 static void Exec_Listen(Relation lRel
, const char *relname
);
176 static void Exec_Unlisten(Relation lRel
, const char *relname
);
177 static void Exec_UnlistenAll(Relation lRel
);
178 static void Send_Notify(Relation lRel
);
179 static void ProcessIncomingNotify(void);
180 static void NotifyMyFrontEnd(char *relname
, int32 listenerPID
);
181 static bool AsyncExistsPendingNotify(const char *relname
);
182 static void ClearPendingActionsAndNotifies(void);
188 * This is executed by the SQL notify command.
190 * Adds the relation to the list of pending notifies.
191 * Actual notification happens during transaction commit.
192 * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
195 Async_Notify(const char *relname
)
198 elog(DEBUG1
, "Async_Notify(%s)", relname
);
200 /* no point in making duplicate entries in the list ... */
201 if (!AsyncExistsPendingNotify(relname
))
204 * The name list needs to live until end of transaction, so store it
205 * in the transaction context.
207 MemoryContext oldcontext
;
209 oldcontext
= MemoryContextSwitchTo(CurTransactionContext
);
212 * Ordering of the list isn't important. We choose to put new entries
213 * on the front, as this might make duplicate-elimination a tad faster
214 * when the same condition is signaled many times in a row.
216 pendingNotifies
= lcons(pstrdup(relname
), pendingNotifies
);
218 MemoryContextSwitchTo(oldcontext
);
224 * Common code for listen, unlisten, unlisten all commands.
226 * Adds the request to the list of pending actions.
227 * Actual update of pg_listener happens during transaction commit.
228 * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
231 queue_listen(ListenActionKind action
, const char *condname
)
233 MemoryContext oldcontext
;
234 ListenAction
*actrec
;
237 * Unlike Async_Notify, we don't try to collapse out duplicates. It would
238 * be too complicated to ensure we get the right interactions of
239 * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there
240 * would be any performance benefit anyway in sane applications.
242 oldcontext
= MemoryContextSwitchTo(CurTransactionContext
);
244 /* space for terminating null is included in sizeof(ListenAction) */
245 actrec
= (ListenAction
*) palloc(sizeof(ListenAction
) + strlen(condname
));
246 actrec
->action
= action
;
247 strcpy(actrec
->condname
, condname
);
249 pendingActions
= lappend(pendingActions
, actrec
);
251 MemoryContextSwitchTo(oldcontext
);
257 * This is executed by the SQL listen command.
260 Async_Listen(const char *relname
)
263 elog(DEBUG1
, "Async_Listen(%s,%d)", relname
, MyProcPid
);
265 queue_listen(LISTEN_LISTEN
, relname
);
271 * This is executed by the SQL unlisten command.
274 Async_Unlisten(const char *relname
)
277 elog(DEBUG1
, "Async_Unlisten(%s,%d)", relname
, MyProcPid
);
279 /* If we couldn't possibly be listening, no need to queue anything */
280 if (pendingActions
== NIL
&& !unlistenExitRegistered
)
283 queue_listen(LISTEN_UNLISTEN
, relname
);
289 * This is invoked by UNLISTEN * command, and also at backend exit.
292 Async_UnlistenAll(void)
295 elog(DEBUG1
, "Async_UnlistenAll(%d)", MyProcPid
);
297 /* If we couldn't possibly be listening, no need to queue anything */
298 if (pendingActions
== NIL
&& !unlistenExitRegistered
)
301 queue_listen(LISTEN_UNLISTEN_ALL
, "");
305 * Async_UnlistenOnExit
307 * Clean up the pg_listener table at backend exit.
309 * This is executed if we have done any LISTENs in this backend.
310 * It might not be necessary anymore, if the user UNLISTENed everything,
311 * but we don't try to detect that case.
314 Async_UnlistenOnExit(int code
, Datum arg
)
317 * We need to start/commit a transaction for the unlisten, but if there is
318 * already an active transaction we had better abort that one first.
319 * Otherwise we'd end up committing changes that probably ought to be
322 AbortOutOfAnyTransaction();
323 /* Now we can do the unlisten */
324 StartTransactionCommand();
326 CommitTransactionCommand();
332 * This is called at the prepare phase of a two-phase
333 * transaction. Save the state for possible commit later.
336 AtPrepare_Notify(void)
340 /* It's not sensible to have any pending LISTEN/UNLISTEN actions */
343 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
344 errmsg("cannot PREPARE a transaction that has executed LISTEN or UNLISTEN")));
346 /* We can deal with pending NOTIFY though */
347 foreach(p
, pendingNotifies
)
349 const char *relname
= (const char *) lfirst(p
);
351 RegisterTwoPhaseRecord(TWOPHASE_RM_NOTIFY_ID
, 0,
352 relname
, strlen(relname
) + 1);
356 * We can clear the state immediately, rather than needing a separate
357 * PostPrepare call, because if the transaction fails we'd just discard
360 ClearPendingActionsAndNotifies();
366 * This is called at transaction commit.
368 * If there are pending LISTEN/UNLISTEN actions, insert or delete
369 * tuples in pg_listener accordingly.
371 * If there are outbound notify requests in the pendingNotifies list,
372 * scan pg_listener for matching tuples, and either signal the other
373 * backend or send a message to our own frontend.
375 * NOTE: we are still inside the current transaction, therefore can
376 * piggyback on its committing of changes.
379 AtCommit_Notify(void)
384 if (pendingActions
== NIL
&& pendingNotifies
== NIL
)
385 return; /* no relevant statements in this xact */
388 * NOTIFY is disabled if not normal processing mode. This test used to be
389 * in xact.c, but it seems cleaner to do it here.
391 if (!IsNormalProcessingMode())
393 ClearPendingActionsAndNotifies();
398 elog(DEBUG1
, "AtCommit_Notify");
400 /* Acquire ExclusiveLock on pg_listener */
401 lRel
= heap_open(ListenerRelationId
, ExclusiveLock
);
403 /* Perform any pending listen/unlisten actions */
404 foreach(p
, pendingActions
)
406 ListenAction
*actrec
= (ListenAction
*) lfirst(p
);
408 switch (actrec
->action
)
411 Exec_Listen(lRel
, actrec
->condname
);
413 case LISTEN_UNLISTEN
:
414 Exec_Unlisten(lRel
, actrec
->condname
);
416 case LISTEN_UNLISTEN_ALL
:
417 Exec_UnlistenAll(lRel
);
421 /* We must CCI after each action in case of conflicting actions */
422 CommandCounterIncrement();
425 /* Perform any pending notifies */
430 * We do NOT release the lock on pg_listener here; we need to hold it
431 * until end of transaction (which is about to happen, anyway) to ensure
432 * that notified backends see our tuple updates when they look. Else they
433 * might disregard the signal, which would make the application programmer
434 * very unhappy. Also, this prevents race conditions when we have just
435 * inserted a listening tuple.
437 heap_close(lRel
, NoLock
);
439 ClearPendingActionsAndNotifies();
442 elog(DEBUG1
, "AtCommit_Notify: done");
446 * Exec_Listen --- subroutine for AtCommit_Notify
448 * Register the current backend as listening on the specified relation.
451 Exec_Listen(Relation lRel
, const char *relname
)
455 Datum values
[Natts_pg_listener
];
456 bool nulls
[Natts_pg_listener
];
458 bool alreadyListener
= false;
461 elog(DEBUG1
, "Exec_Listen(%s,%d)", relname
, MyProcPid
);
463 /* Detect whether we are already listening on this relname */
464 scan
= heap_beginscan(lRel
, SnapshotNow
, 0, NULL
);
465 while ((tuple
= heap_getnext(scan
, ForwardScanDirection
)) != NULL
)
467 Form_pg_listener listener
= (Form_pg_listener
) GETSTRUCT(tuple
);
469 if (listener
->listenerpid
== MyProcPid
&&
470 strncmp(NameStr(listener
->relname
), relname
, NAMEDATALEN
) == 0)
472 alreadyListener
= true;
473 /* No need to scan the rest of the table */
483 * OK to insert a new tuple
485 memset(nulls
, false, sizeof(nulls
));
487 namestrcpy(&condname
, relname
);
488 values
[Anum_pg_listener_relname
- 1] = NameGetDatum(&condname
);
489 values
[Anum_pg_listener_pid
- 1] = Int32GetDatum(MyProcPid
);
490 values
[Anum_pg_listener_notify
- 1] = Int32GetDatum(0); /* no notifies pending */
492 tuple
= heap_form_tuple(RelationGetDescr(lRel
), values
, nulls
);
494 simple_heap_insert(lRel
, tuple
);
496 #ifdef NOT_USED /* currently there are no indexes */
497 CatalogUpdateIndexes(lRel
, tuple
);
500 heap_freetuple(tuple
);
503 * now that we are listening, make sure we will unlisten before dying.
505 if (!unlistenExitRegistered
)
507 on_shmem_exit(Async_UnlistenOnExit
, 0);
508 unlistenExitRegistered
= true;
513 * Exec_Unlisten --- subroutine for AtCommit_Notify
515 * Remove the current backend from the list of listening backends
516 * for the specified relation.
519 Exec_Unlisten(Relation lRel
, const char *relname
)
525 elog(DEBUG1
, "Exec_Unlisten(%s,%d)", relname
, MyProcPid
);
527 scan
= heap_beginscan(lRel
, SnapshotNow
, 0, NULL
);
528 while ((tuple
= heap_getnext(scan
, ForwardScanDirection
)) != NULL
)
530 Form_pg_listener listener
= (Form_pg_listener
) GETSTRUCT(tuple
);
532 if (listener
->listenerpid
== MyProcPid
&&
533 strncmp(NameStr(listener
->relname
), relname
, NAMEDATALEN
) == 0)
535 /* Found the matching tuple, delete it */
536 simple_heap_delete(lRel
, &tuple
->t_self
);
539 * We assume there can be only one match, so no need to scan the
548 * We do not complain about unlistening something not being listened;
554 * Exec_UnlistenAll --- subroutine for AtCommit_Notify
556 * Update pg_listener to unlisten all relations for this backend.
559 Exec_UnlistenAll(Relation lRel
)
566 elog(DEBUG1
, "Exec_UnlistenAll");
568 /* Find and delete all entries with my listenerPID */
570 Anum_pg_listener_pid
,
571 BTEqualStrategyNumber
, F_INT4EQ
,
572 Int32GetDatum(MyProcPid
));
573 scan
= heap_beginscan(lRel
, SnapshotNow
, 1, key
);
575 while ((lTuple
= heap_getnext(scan
, ForwardScanDirection
)) != NULL
)
576 simple_heap_delete(lRel
, &lTuple
->t_self
);
582 * Send_Notify --- subroutine for AtCommit_Notify
584 * Scan pg_listener for tuples matching our pending notifies, and
585 * either signal the other backend or send a message to our own frontend.
588 Send_Notify(Relation lRel
)
590 TupleDesc tdesc
= RelationGetDescr(lRel
);
594 Datum value
[Natts_pg_listener
];
595 bool repl
[Natts_pg_listener
],
596 nulls
[Natts_pg_listener
];
598 /* preset data to update notify column to MyProcPid */
599 memset(nulls
, false, sizeof(nulls
));
600 memset(repl
, false, sizeof(repl
));
601 repl
[Anum_pg_listener_notify
- 1] = true;
602 memset(value
, 0, sizeof(value
));
603 value
[Anum_pg_listener_notify
- 1] = Int32GetDatum(MyProcPid
);
605 scan
= heap_beginscan(lRel
, SnapshotNow
, 0, NULL
);
607 while ((lTuple
= heap_getnext(scan
, ForwardScanDirection
)) != NULL
)
609 Form_pg_listener listener
= (Form_pg_listener
) GETSTRUCT(lTuple
);
610 char *relname
= NameStr(listener
->relname
);
611 int32 listenerPID
= listener
->listenerpid
;
613 if (!AsyncExistsPendingNotify(relname
))
616 if (listenerPID
== MyProcPid
)
619 * Self-notify: no need to bother with table update. Indeed, we
620 * *must not* clear the notification field in this path, or we
621 * could lose an outside notify, which'd be bad for applications
622 * that ignore self-notify messages.
625 elog(DEBUG1
, "AtCommit_Notify: notifying self");
627 NotifyMyFrontEnd(relname
, listenerPID
);
632 elog(DEBUG1
, "AtCommit_Notify: notifying pid %d",
636 * If someone has already notified this listener, we don't bother
637 * modifying the table, but we do still send a SIGUSR2 signal,
638 * just in case that backend missed the earlier signal for some
639 * reason. It's OK to send the signal first, because the other
640 * guy can't read pg_listener until we unlock it.
642 if (kill(listenerPID
, SIGUSR2
) < 0)
645 * Get rid of pg_listener entry if it refers to a PID that no
646 * longer exists. Presumably, that backend crashed without
647 * deleting its pg_listener entries. This code used to only
648 * delete the entry if errno==ESRCH, but as far as I can see
649 * we should just do it for any failure (certainly at least
652 simple_heap_delete(lRel
, &lTuple
->t_self
);
654 else if (listener
->notification
== 0)
656 /* Rewrite the tuple with my PID in notification column */
657 rTuple
= heap_modify_tuple(lTuple
, tdesc
, value
, nulls
, repl
);
658 simple_heap_update(lRel
, &lTuple
->t_self
, rTuple
);
660 #ifdef NOT_USED /* currently there are no indexes */
661 CatalogUpdateIndexes(lRel
, rTuple
);
/*
 * AtAbort_Notify
 *
 *      This is called at transaction abort.
 *
 *      Gets rid of pending actions and outbound notifies that we would have
 *      executed if the transaction got committed.
 *
 * NOTE(review): the signature line is absent from this listing; the name
 * and the (void) -> void shape are inferred from the sibling At*_Notify
 * routines and should be confirmed against commands/async.h.
 */
void
AtAbort_Notify(void)
{
    ClearPendingActionsAndNotifies();
}
685 * AtSubStart_Notify() --- Take care of subtransaction start.
687 * Push empty state for the new subtransaction.
690 AtSubStart_Notify(void)
692 MemoryContext old_cxt
;
694 /* Keep the list-of-lists in TopTransactionContext for simplicity */
695 old_cxt
= MemoryContextSwitchTo(TopTransactionContext
);
697 upperPendingActions
= lcons(pendingActions
, upperPendingActions
);
699 Assert(list_length(upperPendingActions
) ==
700 GetCurrentTransactionNestLevel() - 1);
702 pendingActions
= NIL
;
704 upperPendingNotifies
= lcons(pendingNotifies
, upperPendingNotifies
);
706 Assert(list_length(upperPendingNotifies
) ==
707 GetCurrentTransactionNestLevel() - 1);
709 pendingNotifies
= NIL
;
711 MemoryContextSwitchTo(old_cxt
);
715 * AtSubCommit_Notify() --- Take care of subtransaction commit.
717 * Reassign all items in the pending lists to the parent transaction.
720 AtSubCommit_Notify(void)
722 List
*parentPendingActions
;
723 List
*parentPendingNotifies
;
725 parentPendingActions
= (List
*) linitial(upperPendingActions
);
726 upperPendingActions
= list_delete_first(upperPendingActions
);
728 Assert(list_length(upperPendingActions
) ==
729 GetCurrentTransactionNestLevel() - 2);
732 * Mustn't try to eliminate duplicates here --- see queue_listen()
734 pendingActions
= list_concat(parentPendingActions
, pendingActions
);
736 parentPendingNotifies
= (List
*) linitial(upperPendingNotifies
);
737 upperPendingNotifies
= list_delete_first(upperPendingNotifies
);
739 Assert(list_length(upperPendingNotifies
) ==
740 GetCurrentTransactionNestLevel() - 2);
743 * We could try to eliminate duplicates here, but it seems not worthwhile.
745 pendingNotifies
= list_concat(parentPendingNotifies
, pendingNotifies
);
749 * AtSubAbort_Notify() --- Take care of subtransaction abort.
752 AtSubAbort_Notify(void)
754 int my_level
= GetCurrentTransactionNestLevel();
757 * All we have to do is pop the stack --- the actions/notifies made in
758 * this subxact are no longer interesting, and the space will be freed
759 * when CurTransactionContext is recycled.
761 * This routine could be called more than once at a given nesting level if
762 * there is trouble during subxact abort. Avoid dumping core by using
763 * GetCurrentTransactionNestLevel as the indicator of how far we need to
766 while (list_length(upperPendingActions
) > my_level
- 2)
768 pendingActions
= (List
*) linitial(upperPendingActions
);
769 upperPendingActions
= list_delete_first(upperPendingActions
);
772 while (list_length(upperPendingNotifies
) > my_level
- 2)
774 pendingNotifies
= (List
*) linitial(upperPendingNotifies
);
775 upperPendingNotifies
= list_delete_first(upperPendingNotifies
);
780 * NotifyInterruptHandler
782 * This is the signal handler for SIGUSR2.
784 * If we are idle (notifyInterruptEnabled is set), we can safely invoke
785 * ProcessIncomingNotify directly. Otherwise, just set a flag
789 NotifyInterruptHandler(SIGNAL_ARGS
)
791 int save_errno
= errno
;
794 * Note: this is a SIGNAL HANDLER. You must be very wary what you do
795 * here. Some helpful soul had this routine sprinkled with TPRINTFs, which
796 * would likely lead to corruption of stdio buffers if they were ever
800 /* Don't joggle the elbow of proc_exit */
801 if (proc_exit_inprogress
)
804 if (notifyInterruptEnabled
)
806 bool save_ImmediateInterruptOK
= ImmediateInterruptOK
;
809 * We may be called while ImmediateInterruptOK is true; turn it off
810 * while messing with the NOTIFY state. (We would have to save and
811 * restore it anyway, because PGSemaphore operations inside
812 * ProcessIncomingNotify() might reset it.)
814 ImmediateInterruptOK
= false;
817 * I'm not sure whether some flavors of Unix might allow another
818 * SIGUSR2 occurrence to recursively interrupt this routine. To cope
819 * with the possibility, we do the same sort of dance that
820 * EnableNotifyInterrupt must do --- see that routine for comments.
822 notifyInterruptEnabled
= 0; /* disable any recursive signal */
823 notifyInterruptOccurred
= 1; /* do at least one iteration */
826 notifyInterruptEnabled
= 1;
827 if (!notifyInterruptOccurred
)
829 notifyInterruptEnabled
= 0;
830 if (notifyInterruptOccurred
)
832 /* Here, it is finally safe to do stuff. */
834 elog(DEBUG1
, "NotifyInterruptHandler: perform async notify");
836 ProcessIncomingNotify();
839 elog(DEBUG1
, "NotifyInterruptHandler: done");
844 * Restore ImmediateInterruptOK, and check for interrupts if needed.
846 ImmediateInterruptOK
= save_ImmediateInterruptOK
;
847 if (save_ImmediateInterruptOK
)
848 CHECK_FOR_INTERRUPTS();
853 * In this path it is NOT SAFE to do much of anything, except this:
855 notifyInterruptOccurred
= 1;
862 * EnableNotifyInterrupt
864 * This is called by the PostgresMain main loop just before waiting
865 * for a frontend command. If we are truly idle (ie, *not* inside
866 * a transaction block), then process any pending inbound notifies,
867 * and enable the signal handler to process future notifies directly.
869 * NOTE: the signal handler starts out disabled, and stays so until
870 * PostgresMain calls this the first time.
873 EnableNotifyInterrupt(void)
875 if (IsTransactionOrTransactionBlock())
876 return; /* not really idle */
879 * This code is tricky because we are communicating with a signal handler
880 * that could interrupt us at any point. If we just checked
881 * notifyInterruptOccurred and then set notifyInterruptEnabled, we could
882 * fail to respond promptly to a signal that happens in between those two
883 * steps. (A very small time window, perhaps, but Murphy's Law says you
884 * can hit it...) Instead, we first set the enable flag, then test the
885 * occurred flag. If we see an unserviced interrupt has occurred, we
886 * re-clear the enable flag before going off to do the service work. (That
887 * prevents re-entrant invocation of ProcessIncomingNotify() if another
888 * interrupt occurs.) If an interrupt comes in between the setting and
889 * clearing of notifyInterruptEnabled, then it will have done the service
890 * work and left notifyInterruptOccurred zero, so we have to check again
891 * after clearing enable. The whole thing has to be in a loop in case
892 * another interrupt occurs while we're servicing the first. Once we get
893 * out of the loop, enable is set and we know there is no unserviced
896 * NB: an overenthusiastic optimizing compiler could easily break this
897 * code. Hopefully, they all understand what "volatile" means these days.
901 notifyInterruptEnabled
= 1;
902 if (!notifyInterruptOccurred
)
904 notifyInterruptEnabled
= 0;
905 if (notifyInterruptOccurred
)
908 elog(DEBUG1
, "EnableNotifyInterrupt: perform async notify");
910 ProcessIncomingNotify();
913 elog(DEBUG1
, "EnableNotifyInterrupt: done");
919 * DisableNotifyInterrupt
921 * This is called by the PostgresMain main loop just after receiving
922 * a frontend command. Signal handler execution of inbound notifies
923 * is disabled until the next EnableNotifyInterrupt call.
925 * The SIGUSR1 signal handler also needs to call this, so as to
926 * prevent conflicts if one signal interrupts the other. So we
927 * must return the previous state of the flag.
930 DisableNotifyInterrupt(void)
932 bool result
= (notifyInterruptEnabled
!= 0);
934 notifyInterruptEnabled
= 0;
940 * ProcessIncomingNotify
942 * Deal with arriving NOTIFYs from other backends.
943 * This is called either directly from the SIGUSR2 signal handler,
944 * or the next time control reaches the outer idle loop.
945 * Scan pg_listener for arriving notifies, report them to my front end,
946 * and clear the notification field in pg_listener until next time.
948 * NOTE: since we are outside any transaction, we must create our own.
951 ProcessIncomingNotify(void)
959 Datum value
[Natts_pg_listener
];
960 bool repl
[Natts_pg_listener
],
961 nulls
[Natts_pg_listener
];
962 bool catchup_enabled
;
964 /* Must prevent SIGUSR1 interrupt while I am running */
965 catchup_enabled
= DisableCatchupInterrupt();
968 elog(DEBUG1
, "ProcessIncomingNotify");
970 set_ps_display("notify interrupt", false);
972 notifyInterruptOccurred
= 0;
974 StartTransactionCommand();
976 lRel
= heap_open(ListenerRelationId
, ExclusiveLock
);
977 tdesc
= RelationGetDescr(lRel
);
979 /* Scan only entries with my listenerPID */
981 Anum_pg_listener_pid
,
982 BTEqualStrategyNumber
, F_INT4EQ
,
983 Int32GetDatum(MyProcPid
));
984 scan
= heap_beginscan(lRel
, SnapshotNow
, 1, key
);
986 /* Prepare data for rewriting 0 into notification field */
987 memset(nulls
, false, sizeof(nulls
));
988 memset(repl
, false, sizeof(repl
));
989 repl
[Anum_pg_listener_notify
- 1] = true;
990 memset(value
, 0, sizeof(value
));
991 value
[Anum_pg_listener_notify
- 1] = Int32GetDatum(0);
993 while ((lTuple
= heap_getnext(scan
, ForwardScanDirection
)) != NULL
)
995 Form_pg_listener listener
= (Form_pg_listener
) GETSTRUCT(lTuple
);
996 char *relname
= NameStr(listener
->relname
);
997 int32 sourcePID
= listener
->notification
;
1001 /* Notify the frontend */
1004 elog(DEBUG1
, "ProcessIncomingNotify: received %s from %d",
1005 relname
, (int) sourcePID
);
1007 NotifyMyFrontEnd(relname
, sourcePID
);
1010 * Rewrite the tuple with 0 in notification column.
1012 rTuple
= heap_modify_tuple(lTuple
, tdesc
, value
, nulls
, repl
);
1013 simple_heap_update(lRel
, &lTuple
->t_self
, rTuple
);
1015 #ifdef NOT_USED /* currently there are no indexes */
1016 CatalogUpdateIndexes(lRel
, rTuple
);
1023 * We do NOT release the lock on pg_listener here; we need to hold it
1024 * until end of transaction (which is about to happen, anyway) to ensure
1025 * that other backends see our tuple updates when they look. Otherwise, a
1026 * transaction started after this one might mistakenly think it doesn't
1027 * need to send this backend a new NOTIFY.
1029 heap_close(lRel
, NoLock
);
1031 CommitTransactionCommand();
1034 * Must flush the notify messages to ensure frontend gets them promptly.
1038 set_ps_display("idle", false);
1041 elog(DEBUG1
, "ProcessIncomingNotify: done");
1043 if (catchup_enabled
)
1044 EnableCatchupInterrupt();
1048 * Send NOTIFY message to my front end.
1051 NotifyMyFrontEnd(char *relname
, int32 listenerPID
)
1053 if (whereToSendOutput
== DestRemote
)
1057 pq_beginmessage(&buf
, 'A');
1058 pq_sendint(&buf
, listenerPID
, sizeof(int32
));
1059 pq_sendstring(&buf
, relname
);
1060 if (PG_PROTOCOL_MAJOR(FrontendProtocol
) >= 3)
1062 /* XXX Add parameter string here later */
1063 pq_sendstring(&buf
, "");
1065 pq_endmessage(&buf
);
1068 * NOTE: we do not do pq_flush() here. For a self-notify, it will
1069 * happen at the end of the transaction, and for incoming notifies
1070 * ProcessIncomingNotify will do it after finding all the notifies.
1074 elog(INFO
, "NOTIFY for %s", relname
);
1077 /* Does pendingNotifies include the given relname? */
1079 AsyncExistsPendingNotify(const char *relname
)
1083 foreach(p
, pendingNotifies
)
1085 const char *prelname
= (const char *) lfirst(p
);
1087 if (strcmp(prelname
, relname
) == 0)
1094 /* Clear the pendingActions and pendingNotifies lists. */
1096 ClearPendingActionsAndNotifies(void)
1099 * We used to have to explicitly deallocate the list members and nodes,
1100 * because they were malloc'd. Now, since we know they are palloc'd in
1101 * CurTransactionContext, we need not do that --- they'll go away
1102 * automatically at transaction exit. We need only reset the list head
1105 pendingActions
= NIL
;
1106 pendingNotifies
= NIL
;
1110 * 2PC processing routine for COMMIT PREPARED case.
1112 * (We don't have to do anything for ROLLBACK PREPARED.)
1115 notify_twophase_postcommit(TransactionId xid
, uint16 info
,
1116 void *recdata
, uint32 len
)
1119 * Set up to issue the NOTIFY at the end of my own current transaction.
1120 * (XXX this has some issues if my own transaction later rolls back, or if
1121 * there is any significant delay before I commit. OK for now because we
1122 * disallow COMMIT PREPARED inside a transaction block.)
1124 Async_Notify((char *) recdata
);