diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 898c497d12..0d48dfa494 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -52,6 +52,7 @@ #include "postmaster/bgworker.h" #include "postmaster/postmaster.h" +#include "postmaster/walwriter.h" #include "replication/decode.h" #include "replication/logical.h" @@ -1027,6 +1028,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) bool endofstream = false; TimestampTz last_recv_timestamp = GetCurrentTimestamp(); bool ping_sent = false; + long wait_time; CHECK_FOR_INTERRUPTS(); @@ -1114,11 +1116,11 @@ LogicalRepApplyLoop(XLogRecPtr last_received) len = walrcv_receive(wrconn, &buf, &fd); } - - /* confirm all writes at once */ - send_feedback(last_received, false, false); } + /* confirm all writes so far */ + send_feedback(last_received, false, false); + if (!in_remote_transaction) { /* @@ -1147,12 +1149,21 @@ LogicalRepApplyLoop(XLogRecPtr last_received) } /* - * Wait for more data or latch. + * Wait for more data or latch. If we have unflushed transactions, + * wake up after WalWriterDelay to see if they've been flushed yet (in + * which case we should send a feedback message). Otherwise, there's + * no particular urgency about waking up unless we get data or a + * signal. */ + if (!dlist_is_empty(&lsn_mapping)) + wait_time = WalWriterDelay; + else + wait_time = NAPTIME_PER_CYCLE; + rc = WaitLatchOrSocket(MyLatch, WL_SOCKET_READABLE | WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, - fd, NAPTIME_PER_CYCLE, + fd, wait_time, WAIT_EVENT_LOGICAL_APPLY_MAIN); /* Emergency bailout if postmaster has died */