diff --git a/doc/src/sgml/bgworker.sgml b/doc/src/sgml/bgworker.sgml index fe43bc0c76..899519188b 100644 --- a/doc/src/sgml/bgworker.sgml +++ b/doc/src/sgml/bgworker.sgml @@ -284,15 +284,13 @@ typedef struct BackgroundWorker - If a background worker sends asynchronous notifications with the - NOTIFY command via the Server Programming Interface - (SPI), it should call - ProcessCompletedNotifies explicitly after committing - the enclosing transaction so that any notifications can be delivered. If a - background worker registers to receive asynchronous notifications with - the LISTEN through SPI, the worker - will log those notifications, but there is no programmatic way for the - worker to intercept and respond to those notifications. + Background workers can send asynchronous notification messages, either by + using the NOTIFY command via SPI, + or directly via Async_Notify(). Such notifications + will be sent at transaction commit. + Background workers should not register to receive asynchronous + notifications with the LISTEN command, as there is no + infrastructure for a worker to consume such notifications. diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 387f80419a..6597ec45a9 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -2269,7 +2269,17 @@ CommitTransaction(void) */ smgrDoPendingDeletes(true); + /* + * Send out notification signals to other backends (and do other + * post-commit NOTIFY cleanup). This must not happen until after our + * transaction is fully done from the viewpoint of other backends. + */ AtCommit_Notify(); + + /* + * Everything after this should be purely internal-to-this-backend + * cleanup. + */ AtEOXact_GUC(true, 1); AtEOXact_SPI(true); AtEOXact_Enum(); diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 4b16fb5682..409fece1e0 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -68,17 +68,27 @@ * CommitTransaction() which will then do the actual transaction commit. * * After commit we are called another time (AtCommit_Notify()). Here we - * make the actual updates to the effective listen state (listenChannels). + * make any actual updates to the effective listen state (listenChannels). + * Then we signal any backends that may be interested in our messages + * (including our own backend, if listening). This is done by + * SignalBackends(), which scans the list of listening backends and sends a + * PROCSIG_NOTIFY_INTERRUPT signal to every listening backend (we don't + * know which backend is listening on which channel so we must signal them + * all). We can exclude backends that are already up to date, though, and + * we can also exclude backends that are in other databases (unless they + * are way behind and should be kicked to make them advance their + * pointers). * - * Finally, after we are out of the transaction altogether, we check if - * we need to signal listening backends. In SignalBackends() we scan the - * list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal - * to every listening backend (we don't know which backend is listening on - * which channel so we must signal them all). We can exclude backends that - * are already up to date, though, and we can also exclude backends that - * are in other databases (unless they are way behind and should be kicked - * to make them advance their pointers). We don't bother with a - * self-signal either, but just process the queue directly. + * Finally, after we are out of the transaction altogether and about to go + * idle, we scan the queue for messages that need to be sent to our + * frontend (which might be notifies from other backends, or self-notifies + * from our own). This step is not part of the CommitTransaction sequence + * for two important reasons. First, we could get errors while sending + * data to our frontend, and it's really bad for errors to happen in + * post-commit cleanup. Second, in cases where a procedure issues commits + * within a single frontend command, we don't want to send notifies to our + * frontend until the command is done; but notifies to other backends + * should go out immediately after each commit. * * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler * sets the process's latch, which triggers the event to be processed @@ -426,11 +436,8 @@ static bool unlistenExitRegistered = false; /* True if we're currently registered as a listener in asyncQueueControl */ static bool amRegisteredListener = false; -/* has this backend sent notifications in the current transaction? */ -static bool backendHasSentNotifications = false; - /* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */ -static bool backendTryAdvanceTail = false; +static bool tryAdvanceTail = false; /* GUC parameter */ bool Trace_notify = false; @@ -459,7 +466,7 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, char *page_buffer, Snapshot snapshot); static void asyncQueueAdvanceTail(void); -static void ProcessIncomingNotify(void); +static void ProcessIncomingNotify(bool flush); static bool AsyncExistsPendingNotify(Notification *n); static void AddEventToPendingNotifies(Notification *n); static uint32 notification_hash(const void *key, Size keysize); @@ -950,8 +957,6 @@ PreCommit_Notify(void) AccessExclusiveLock); /* Now push the notifications into the queue */ - backendHasSentNotifications = true; - nextNotify = list_head(pendingNotifies->events); while (nextNotify != NULL) { @@ -976,6 +981,8 @@ PreCommit_Notify(void) nextNotify = asyncQueueAddEntries(nextNotify); LWLockRelease(NotifyQueueLock); } + + /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */ } } @@ -985,6 +992,11 @@ PreCommit_Notify(void) * This is called at transaction commit, after committing to clog. * * Update listenChannels and clear transaction-local state. + * + * If we issued any notifications in the transaction, send signals to + * listening backends (possibly including ourselves) to process them. + * Also, if we filled enough queue pages with new notifies, try to + * advance the queue tail pointer. */ void AtCommit_Notify(void) @@ -1027,6 +1039,29 @@ AtCommit_Notify(void) if (amRegisteredListener && listenChannels == NIL) asyncQueueUnregister(); + /* + * Send signals to listening backends. We need do this only if there are + * pending notifies, which were previously added to the shared queue by + * PreCommit_Notify(). + */ + if (pendingNotifies != NULL) + SignalBackends(); + + /* + * If it's time to try to advance the global tail pointer, do that. + * + * (It might seem odd to do this in the sender, when more than likely the + * listeners won't yet have read the messages we just sent. However, + * there's less contention if only the sender does it, and there is little + * need for urgency in advancing the global tail. So this typically will + * be clearing out messages that were sent some time ago.) + */ + if (tryAdvanceTail) + { + tryAdvanceTail = false; + asyncQueueAdvanceTail(); + } + /* And clean up */ ClearPendingActionsAndNotifies(); } @@ -1200,82 +1235,17 @@ Exec_UnlistenAllCommit(void) } /* - * ProcessCompletedNotifies --- send out signals and self-notifies + * ProcessCompletedNotifies --- nowadays this does nothing * - * This is called from postgres.c just before going idle at the completion - * of a transaction. If we issued any notifications in the just-completed - * transaction, send signals to other backends to process them, and also - * process the queue ourselves to send messages to our own frontend. - * Also, if we filled enough queue pages with new notifies, try to advance - * the queue tail pointer. - * - * The reason that this is not done in AtCommit_Notify is that there is - * a nonzero chance of errors here (for example, encoding conversion errors - * while trying to format messages to our frontend). An error during - * AtCommit_Notify would be a PANIC condition. The timing is also arranged - * to ensure that a transaction's self-notifies are delivered to the frontend - * before it gets the terminating ReadyForQuery message. - * - * Note that we send signals and process the queue even if the transaction - * eventually aborted. This is because we need to clean out whatever got - * added to the queue. - * - * NOTE: we are outside of any transaction here. + * This routine used to send signals and handle self-notifies, + * but that functionality has been moved elsewhere. + * We'd delete it entirely, except that the documentation used to instruct + * background-worker authors to call it. To avoid an ABI break in stable + * branches, keep it as a no-op routine. */ void ProcessCompletedNotifies(void) { - MemoryContext caller_context; - - /* Nothing to do if we didn't send any notifications */ - if (!backendHasSentNotifications) - return; - - /* - * We reset the flag immediately; otherwise, if any sort of error occurs - * below, we'd be locked up in an infinite loop, because control will come - * right back here after error cleanup. - */ - backendHasSentNotifications = false; - - /* - * We must preserve the caller's memory context (probably MessageContext) - * across the transaction we do here. - */ - caller_context = CurrentMemoryContext; - - if (Trace_notify) - elog(DEBUG1, "ProcessCompletedNotifies"); - - /* - * We must run asyncQueueReadAllNotifications inside a transaction, else - * bad things happen if it gets an error. - */ - StartTransactionCommand(); - - /* Send signals to other backends */ - SignalBackends(); - - if (listenChannels != NIL) - { - /* Read the queue ourselves, and send relevant stuff to the frontend */ - asyncQueueReadAllNotifications(); - } - - /* - * If it's time to try to advance the global tail pointer, do that. - */ - if (backendTryAdvanceTail) - { - backendTryAdvanceTail = false; - asyncQueueAdvanceTail(); - } - - CommitTransactionCommand(); - - MemoryContextSwitchTo(caller_context); - - /* We don't need pq_flush() here since postgres.c will do one shortly */ } /* @@ -1543,7 +1513,7 @@ asyncQueueAddEntries(ListCell *nextNotify) * pointer (we don't want to actually do that right here). */ if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0) - backendTryAdvanceTail = true; + tryAdvanceTail = true; /* And exit the loop */ break; @@ -1658,8 +1628,6 @@ asyncQueueFillWarning(void) /* * Send signals to listening backends. * - * We never signal our own process; that should be handled by our caller. - * * Normally we signal only backends in our own database, since only those * backends could be interested in notifies we send. However, if there's * notify traffic in our database but no traffic in another database that @@ -1668,6 +1636,9 @@ asyncQueueFillWarning(void) * advance their queue position pointers, allowing the global tail to advance. * * Since we know the BackendId and the Pid the signaling is quite cheap. + * + * This is called during CommitTransaction(), so it's important for it + * to have very low probability of failure. */ static void SignalBackends(void) @@ -1682,8 +1653,7 @@ SignalBackends(void) * list of target PIDs. * * XXX in principle these pallocs could fail, which would be bad. Maybe - * preallocate the arrays? But in practice this is only run in trivial - * transactions, so there should surely be space available. + * preallocate the arrays? They're not that large, though. */ pids = (int32 *) palloc(MaxBackends * sizeof(int32)); ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId)); @@ -1696,8 +1666,6 @@ SignalBackends(void) QueuePosition pos; Assert(pid != InvalidPid); - if (pid == MyProcPid) - continue; /* never signal self */ pos = QUEUE_BACKEND_POS(i); if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId) { @@ -1730,6 +1698,16 @@ SignalBackends(void) { int32 pid = pids[i]; + /* + * If we are signaling our own process, no need to involve the kernel; + * just set the flag directly. + */ + if (pid == MyProcPid) + { + notifyInterruptPending = true; + continue; + } + /* * Note: assuming things aren't broken, a signal failure here could * only occur if the target backend exited since we released @@ -1910,15 +1888,20 @@ HandleNotifyInterrupt(void) * via the process's latch, and this routine will get called. * If we are truly idle (ie, *not* inside a transaction block), * process the incoming notifies. + * + * If "flush" is true, force any frontend messages out immediately. + * This can be false when being called at the end of a frontend command, + * since we'll flush after sending ReadyForQuery. */ void -ProcessNotifyInterrupt(void) +ProcessNotifyInterrupt(bool flush) { if (IsTransactionOrTransactionBlock()) return; /* not really idle */ + /* Loop in case another signal arrives while sending messages */ while (notifyInterruptPending) - ProcessIncomingNotify(); + ProcessIncomingNotify(flush); } @@ -2180,6 +2163,9 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, /* * Advance the shared queue tail variable to the minimum of all the * per-backend tail pointers. Truncate pg_notify space if possible. + * + * This is (usually) called during CommitTransaction(), so it's important for + * it to have very low probability of failure. */ static void asyncQueueAdvanceTail(void) @@ -2253,17 +2239,16 @@ asyncQueueAdvanceTail(void) /* * ProcessIncomingNotify * - * Deal with arriving NOTIFYs from other backends as soon as it's safe to - * do so. This used to be called from the PROCSIG_NOTIFY_INTERRUPT - * signal handler, but isn't anymore. + * Scan the queue for arriving notifications and report them to the front + * end. The notifications might be from other sessions, or our own; + * there's no need to distinguish here. * - * Scan the queue for arriving notifications and report them to my front - * end. + * If "flush" is true, force any frontend messages out immediately. * * NOTE: since we are outside any transaction, we must create our own. */ static void -ProcessIncomingNotify(void) +ProcessIncomingNotify(bool flush) { /* We *must* reset the flag */ notifyInterruptPending = false; @@ -2288,9 +2273,11 @@ ProcessIncomingNotify(void) CommitTransactionCommand(); /* - * Must flush the notify messages to ensure frontend gets them promptly. + * If this isn't an end-of-command case, we must flush the notify messages + * to ensure frontend gets them promptly. */ - pq_flush(); + if (flush) + pq_flush(); set_ps_display("idle"); @@ -2315,9 +2302,9 @@ NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid) pq_endmessage(&buf); /* - * NOTE: we do not do pq_flush() here. For a self-notify, it will - * happen at the end of the transaction, and for incoming notifies - * ProcessIncomingNotify will do it after finding all the notifies. + * NOTE: we do not do pq_flush() here. Some level of caller will + * handle it later, allowing this message to be combined into a packet + * with other ones. */ } else diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 8cea10c901..9d2d43c452 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -504,7 +504,7 @@ ProcessClientReadInterrupt(bool blocked) /* Process notify interrupts, if any */ if (notifyInterruptPending) - ProcessNotifyInterrupt(); + ProcessNotifyInterrupt(true); } else if (ProcDiePending) { @@ -4371,17 +4371,15 @@ PostgresMain(int argc, char *argv[], } else { - /* Send out notify signals and transmit self-notifies */ - ProcessCompletedNotifies(); - /* - * Also process incoming notifies, if any. This is mostly to - * ensure stable behavior in tests: if any notifies were - * received during the just-finished transaction, they'll be - * seen by the client before ReadyForQuery is. + * Process incoming notifies (including self-notifies), if + * any, and send relevant messages to the client. Doing it + * here helps ensure stable behavior in tests: if any notifies + * were received during the just-finished transaction, they'll + * be seen by the client before ReadyForQuery is. */ if (notifyInterruptPending) - ProcessNotifyInterrupt(); + ProcessNotifyInterrupt(false); pgstat_report_stat(false); diff --git a/src/include/commands/async.h b/src/include/commands/async.h index 9217f66b91..85bfb24768 100644 --- a/src/include/commands/async.h +++ b/src/include/commands/async.h @@ -49,6 +49,6 @@ extern void ProcessCompletedNotifies(void); extern void HandleNotifyInterrupt(void); /* process interrupts */ -extern void ProcessNotifyInterrupt(void); +extern void ProcessNotifyInterrupt(bool flush); #endif /* ASYNC_H */