diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 362de12457..04813b506e 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -101,8 +101,8 @@ typedef struct SlotErrCallbackArg int attnum; } SlotErrCallbackArg; -static MemoryContext ApplyContext = NULL; -MemoryContext ApplyCacheContext = NULL; +static MemoryContext ApplyMessageContext = NULL; +MemoryContext ApplyContext = NULL; WalReceiverConn *wrconn = NULL; @@ -145,15 +145,16 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) /* * Make sure that we started local transaction. * - * Also switches to ApplyContext as necessary. + * Also switches to ApplyMessageContext as necessary. */ static bool ensure_transaction(void) { if (IsTransactionState()) { - if (CurrentMemoryContext != ApplyContext) - MemoryContextSwitchTo(ApplyContext); + if (CurrentMemoryContext != ApplyMessageContext) + MemoryContextSwitchTo(ApplyMessageContext); + return false; } @@ -162,7 +163,7 @@ ensure_transaction(void) if (!MySubscriptionValid) reread_subscription(); - MemoryContextSwitchTo(ApplyContext); + MemoryContextSwitchTo(ApplyMessageContext); return true; } @@ -961,7 +962,7 @@ store_flush_position(XLogRecPtr remote_lsn) FlushPosition *flushpos; /* Need to do this in permanent context */ - MemoryContextSwitchTo(ApplyCacheContext); + MemoryContextSwitchTo(ApplyContext); /* Track commit lsn */ flushpos = (FlushPosition *) palloc(sizeof(FlushPosition)); @@ -969,7 +970,7 @@ store_flush_position(XLogRecPtr remote_lsn) flushpos->remote_end = remote_lsn; dlist_push_tail(&lsn_mapping, &flushpos->node); - MemoryContextSwitchTo(ApplyContext); + MemoryContextSwitchTo(ApplyMessageContext); } @@ -993,12 +994,13 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply) static void LogicalRepApplyLoop(XLogRecPtr last_received) { - /* Init the ApplyContext which we use for easier cleanup. */ - ApplyContext = AllocSetContextCreate(TopMemoryContext, - "ApplyContext", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); + /* + * Init the ApplyMessageContext which we clean up after each + * replication protocol message. + */ + ApplyMessageContext = AllocSetContextCreate(ApplyContext, + "ApplyMessageContext", + ALLOCSET_DEFAULT_SIZES); /* mark as idle, before starting to loop */ pgstat_report_activity(STATE_IDLE, NULL); @@ -1013,7 +1015,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) TimestampTz last_recv_timestamp = GetCurrentTimestamp(); bool ping_sent = false; - MemoryContextSwitchTo(ApplyContext); + MemoryContextSwitchTo(ApplyMessageContext); len = walrcv_receive(wrconn, &buf, &fd); @@ -1045,7 +1047,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) ping_sent = false; /* Ensure we are reading the data into our memory context. */ - MemoryContextSwitchTo(ApplyContext); + MemoryContextSwitchTo(ApplyMessageContext); s.data = buf; s.len = len; @@ -1091,6 +1093,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) UpdateWorkerStats(last_received, timestamp, true); } /* other message types are purposefully ignored */ + + MemoryContextReset(ApplyMessageContext); } len = walrcv_receive(wrconn, &buf, &fd); @@ -1115,7 +1119,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) } /* Cleanup the memory. */ - MemoryContextResetAndDeleteChildren(ApplyContext); + MemoryContextResetAndDeleteChildren(ApplyMessageContext); MemoryContextSwitchTo(TopMemoryContext); /* Check if we need to exit the streaming loop. */ @@ -1258,7 +1262,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) if (!reply_message) { - MemoryContext oldctx = MemoryContextSwitchTo(ApplyCacheContext); + MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext); reply_message = makeStringInfo(); MemoryContextSwitchTo(oldctx); } @@ -1308,7 +1312,7 @@ reread_subscription(void) } /* Ensure allocations in permanent context. */ - oldctx = MemoryContextSwitchTo(ApplyCacheContext); + oldctx = MemoryContextSwitchTo(ApplyContext); newsub = GetSubscription(MyLogicalRepWorker->subid, true); @@ -1483,12 +1487,11 @@ ApplyWorkerMain(Datum main_arg) MyLogicalRepWorker->userid); /* Load the subscription into persistent memory context. */ - CreateCacheMemoryContext(); - ApplyCacheContext = AllocSetContextCreate(CacheMemoryContext, - "ApplyCacheContext", + ApplyContext = AllocSetContextCreate(TopMemoryContext, + "ApplyContext", ALLOCSET_DEFAULT_SIZES); StartTransactionCommand(); - oldctx = MemoryContextSwitchTo(ApplyCacheContext); + oldctx = MemoryContextSwitchTo(ApplyContext); MySubscription = GetSubscription(MyLogicalRepWorker->subid, false); MySubscriptionValid = true; MemoryContextSwitchTo(oldctx); @@ -1533,7 +1536,7 @@ ApplyWorkerMain(Datum main_arg) syncslotname = LogicalRepSyncTableStart(&origin_startpos); /* The slot name needs to be allocated in permanent memory context. */ - oldctx = MemoryContextSwitchTo(ApplyCacheContext); + oldctx = MemoryContextSwitchTo(ApplyContext); myslotname = pstrdup(syncslotname); MemoryContextSwitchTo(oldctx); diff --git a/src/backend/utils/mmgr/README b/src/backend/utils/mmgr/README index 480b1f89d0..387c337985 100644 --- a/src/backend/utils/mmgr/README +++ b/src/backend/utils/mmgr/README @@ -265,6 +265,17 @@ from prepared statements simply reference the prepared statements' trees, and don't actually need any storage allocated in their private contexts. +Logical Replication Worker Contexts +----------------------------------- + +ApplyContext --- permanent during whole lifetime of apply worker. It +is possible to use TopMemoryContext here as well, but for simplicity +of memory usage analysis we spin up different context. + +ApplyMessageContext --- short-lived context that is reset after each +logical replication protocol message is processed. + + Transient Contexts During Execution ----------------------------------- diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index f6fee102b2..26788fec5c 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -56,8 +56,8 @@ typedef struct LogicalRepWorker TimestampTz reply_time; } LogicalRepWorker; -/* Memory context for cached variables in apply worker. */ -extern MemoryContext ApplyCacheContext; +/* Main memory context for apply worker. Permanent during worker lifetime. */ +extern MemoryContext ApplyContext; /* libpqreceiver connection */ extern struct WalReceiverConn *wrconn;