diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index f26269b5ea..ee01df589f 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -75,8 +75,10 @@ * list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal * to every listening backend (we don't know which backend is listening on * which channel so we must signal them all). We can exclude backends that - * are already up to date, though. We don't bother with a self-signal - * either, but just process the queue directly. + * are already up to date, though, and we can also exclude backends that + * are in other databases (unless they are way behind and should be kicked + * to make them advance their pointers). We don't bother with a + * self-signal either, but just process the queue directly. * * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler * sets the process's latch, which triggers the event to be processed @@ -89,13 +91,14 @@ * Inbound-notify processing consists of reading all of the notifications * that have arrived since scanning last time. We read every notification * until we reach either a notification from an uncommitted transaction or - * the head pointer's position. Then we check if we were the laziest - * backend: if our pointer is set to the same position as the global tail - * pointer is set, then we move the global tail pointer ahead to where the - * second-laziest backend is (in general, we take the MIN of the current - * head position and all active backends' new tail pointers). Whenever we - * move the global tail pointer we also truncate now-unused pages (i.e., - * delete files in pg_notify/ that are no longer used). + * the head pointer's position. + * + * 6. To avoid SLRU wraparound and limit disk space consumption, the tail + * pointer needs to be advanced so that old pages can be truncated. + * This is relatively expensive (notably, it requires an exclusive lock), + * so we don't want to do it often. We make sending backends do this work + * if they advanced the queue head into a new page, but only once every + * QUEUE_CLEANUP_DELAY pages. * * An application that listens on the same channel it notifies will get * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful, @@ -211,6 +214,19 @@ typedef struct QueuePosition (x).page != (y).page ? (x) : \ (x).offset > (y).offset ? (x) : (y)) +/* + * Parameter determining how often we try to advance the tail pointer: + * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is + * also the distance by which a backend in another database needs to be + * behind before we'll decide we need to wake it up to advance its pointer. + * + * Resist the temptation to make this really large. While that would save + * work in some places, it would add cost in others. In particular, this + * should likely be less than NUM_ASYNC_BUFFERS, to ensure that backends + * catch up before the pages they'll need to read fall out of SLRU cache. + */ +#define QUEUE_CLEANUP_DELAY 4 + /* * Struct describing a listening backend's status */ @@ -252,8 +268,8 @@ typedef struct QueueBackendStatus typedef struct AsyncQueueControl { QueuePosition head; /* head points to the next free location */ - QueuePosition tail; /* the global tail is equivalent to the pos of - * the "slowest" backend */ + QueuePosition tail; /* tail must be <= the queue position of every + * listening backend */ BackendId firstListener; /* id of first listener, or InvalidBackendId */ TimestampTz lastQueueFillWarn; /* time of last queue-full msg */ QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER]; @@ -402,10 +418,14 @@ static bool amRegisteredListener = false; /* has this backend sent notifications in the current transaction? */ static bool backendHasSentNotifications = false; +/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */ +static bool backendTryAdvanceTail = false; + /* GUC parameter */ bool Trace_notify = false; /* local function prototypes */ +static int asyncQueuePageDiff(int p, int q); static bool asyncQueuePagePrecedes(int p, int q); static void queue_listen(ListenActionKind action, const char *channel); static void Async_UnlistenOnExit(int code, Datum arg); @@ -421,7 +441,7 @@ static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe); static ListCell *asyncQueueAddEntries(ListCell *nextNotify); static double asyncQueueUsage(void); static void asyncQueueFillWarning(void); -static bool SignalBackends(void); +static void SignalBackends(void); static void asyncQueueReadAllNotifications(void); static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, @@ -436,10 +456,11 @@ static int notification_match(const void *key1, const void *key2, Size keysize); static void ClearPendingActionsAndNotifies(void); /* - * We will work on the page range of 0..QUEUE_MAX_PAGE. + * Compute the difference between two queue page numbers (i.e., p - q), + * accounting for wraparound. */ -static bool -asyncQueuePagePrecedes(int p, int q) +static int +asyncQueuePageDiff(int p, int q) { int diff; @@ -455,7 +476,14 @@ asyncQueuePagePrecedes(int p, int q) diff -= QUEUE_MAX_PAGE + 1; else if (diff < -((QUEUE_MAX_PAGE + 1) / 2)) diff += QUEUE_MAX_PAGE + 1; - return diff < 0; + return diff; +} + +/* Is p < q, accounting for wraparound? */ +static bool +asyncQueuePagePrecedes(int p, int q) +{ + return asyncQueuePageDiff(p, q) < 0; } /* @@ -1051,8 +1079,6 @@ Exec_ListenPreCommit(void) * notification to the frontend. Also, although our transaction might * have executed NOTIFY, those message(s) aren't queued yet so we can't * see them in the queue. - * - * This will also advance the global tail pointer if possible. */ if (!QUEUE_POS_EQUAL(max, head)) asyncQueueReadAllNotifications(); @@ -1138,6 +1164,8 @@ Exec_UnlistenAllCommit(void) * of a transaction. If we issued any notifications in the just-completed * transaction, send signals to other backends to process them, and also * process the queue ourselves to send messages to our own frontend. + * Also, if we filled enough queue pages with new notifies, try to advance + * the queue tail pointer. * * The reason that this is not done in AtCommit_Notify is that there is * a nonzero chance of errors here (for example, encoding conversion errors @@ -1156,7 +1184,6 @@ void ProcessCompletedNotifies(void) { MemoryContext caller_context; - bool signalled; /* Nothing to do if we didn't send any notifications */ if (!backendHasSentNotifications) @@ -1185,23 +1212,20 @@ ProcessCompletedNotifies(void) StartTransactionCommand(); /* Send signals to other backends */ - signalled = SignalBackends(); + SignalBackends(); if (listenChannels != NIL) { /* Read the queue ourselves, and send relevant stuff to the frontend */ asyncQueueReadAllNotifications(); } - else if (!signalled) + + /* + * If it's time to try to advance the global tail pointer, do that. + */ + if (backendTryAdvanceTail) { - /* - * If we found no other listening backends, and we aren't listening - * ourselves, then we must execute asyncQueueAdvanceTail to flush the - * queue, because ain't nobody else gonna do it. This prevents queue - * overflow when we're sending useless notifies to nobody. (A new - * listener could have joined since we looked, but if so this is - * harmless.) - */ + backendTryAdvanceTail = false; asyncQueueAdvanceTail(); } @@ -1242,8 +1266,6 @@ IsListeningOn(const char *channel) static void asyncQueueUnregister(void) { - bool advanceTail; - Assert(listenChannels == NIL); /* else caller error */ if (!amRegisteredListener) /* nothing to do */ @@ -1253,10 +1275,7 @@ asyncQueueUnregister(void) * Need exclusive lock here to manipulate list links. */ LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE); - /* check if entry is valid and oldest ... */ - advanceTail = (MyProcPid == QUEUE_BACKEND_PID(MyBackendId)) && - QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL); - /* ... then mark it invalid */ + /* Mark our entry as invalid */ QUEUE_BACKEND_PID(MyBackendId) = InvalidPid; QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid; /* and remove it from the list */ @@ -1278,10 +1297,6 @@ asyncQueueUnregister(void) /* mark ourselves as no longer listed in the global array */ amRegisteredListener = false; - - /* If we were the laziest backend, try to advance the tail pointer */ - if (advanceTail) - asyncQueueAdvanceTail(); } /* @@ -1467,6 +1482,15 @@ asyncQueueAddEntries(ListCell *nextNotify) * page without overrunning the queue. */ slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(queue_head)); + + /* + * If the new page address is a multiple of QUEUE_CLEANUP_DELAY, + * set flag to remember that we should try to advance the tail + * pointer (we don't want to actually do that right here). + */ + if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0) + backendTryAdvanceTail = true; + /* And exit the loop */ break; } @@ -1570,31 +1594,30 @@ asyncQueueFillWarning(void) } /* - * Send signals to all listening backends (except our own). + * Send signals to listening backends. * - * Returns true if we sent at least one signal. + * We never signal our own process; that should be handled by our caller. * - * Since we need EXCLUSIVE lock anyway we also check the position of the other - * backends and in case one is already up-to-date we don't signal it. - * This can happen if concurrent notifying transactions have sent a signal and - * the signaled backend has read the other notifications and ours in the same - * step. + * Normally we signal only backends in our own database, since only those + * backends could be interested in notifies we send. However, if there's + * notify traffic in our database but no traffic in another database that + * does have listener(s), those listeners will fall further and further + * behind. Waken them anyway if they're far enough behind, so that they'll + * advance their queue position pointers, allowing the global tail to advance. * * Since we know the BackendId and the Pid the signalling is quite cheap. */ -static bool +static void SignalBackends(void) { - bool signalled = false; int32 *pids; BackendId *ids; int count; - int32 pid; /* - * Identify all backends that are listening and not already up-to-date. We - * don't want to send signals while holding the AsyncQueueLock, so we just - * build a list of target PIDs. + * Identify backends that we need to signal. We don't want to send + * signals while holding the AsyncQueueLock, so this loop just builds a + * list of target PIDs. * * XXX in principle these pallocs could fail, which would be bad. Maybe * preallocate the arrays? But in practice this is only run in trivial @@ -1607,26 +1630,43 @@ SignalBackends(void) LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE); for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i)) { - pid = QUEUE_BACKEND_PID(i); - Assert(pid != InvalidPid); - if (pid != MyProcPid) - { - QueuePosition pos = QUEUE_BACKEND_POS(i); + int32 pid = QUEUE_BACKEND_PID(i); + QueuePosition pos; - if (!QUEUE_POS_EQUAL(pos, QUEUE_HEAD)) - { - pids[count] = pid; - ids[count] = i; - count++; - } + Assert(pid != InvalidPid); + if (pid == MyProcPid) + continue; /* never signal self */ + pos = QUEUE_BACKEND_POS(i); + if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId) + { + /* + * Always signal listeners in our own database, unless they're + * already caught up (unlikely, but possible). + */ + if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD)) + continue; } + else + { + /* + * Listeners in other databases should be signaled only if they + * are far behind. + */ + if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD), + QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY) + continue; + } + /* OK, need to signal this one */ + pids[count] = pid; + ids[count] = i; + count++; } LWLockRelease(AsyncQueueLock); /* Now send signals */ for (int i = 0; i < count; i++) { - pid = pids[i]; + int32 pid = pids[i]; /* * Note: assuming things aren't broken, a signal failure here could @@ -1636,14 +1676,10 @@ SignalBackends(void) */ if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0) elog(DEBUG3, "could not signal backend with PID %d: %m", pid); - else - signalled = true; } pfree(pids); pfree(ids); - - return signalled; } /* @@ -1844,7 +1880,6 @@ asyncQueueReadAllNotifications(void) QueuePosition oldpos; QueuePosition head; Snapshot snapshot; - bool advanceTail; /* page_buffer must be adequately aligned, so use a union */ union @@ -1966,13 +2001,8 @@ asyncQueueReadAllNotifications(void) /* Update shared state */ LWLockAcquire(AsyncQueueLock, LW_SHARED); QUEUE_BACKEND_POS(MyBackendId) = pos; - advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL); LWLockRelease(AsyncQueueLock); - /* If we were the laziest backend, try to advance the tail pointer */ - if (advanceTail) - asyncQueueAdvanceTail(); - PG_RE_THROW(); } PG_END_TRY(); @@ -1980,13 +2010,8 @@ asyncQueueReadAllNotifications(void) /* Update shared state */ LWLockAcquire(AsyncQueueLock, LW_SHARED); QUEUE_BACKEND_POS(MyBackendId) = pos; - advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL); LWLockRelease(AsyncQueueLock); - /* If we were the laziest backend, try to advance the tail pointer */ - if (advanceTail) - asyncQueueAdvanceTail(); - /* Done with snapshot */ UnregisterSnapshot(snapshot); }