diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 4211d31f30..bf4c61ccfb 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -27074,6 +27074,21 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset prepared with . + + + + pg_log_standby_snapshot + + pg_log_standby_snapshot () + pg_lsn + + + Take a snapshot of running transactions and write it to WAL, without + having to wait bgwriter or checkpointer to log one. This is useful for + logical decoding on standby, as logical slot creation has to wait + until such a record is replayed on the standby. + + diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 4e912b4bd4..ebe0376e3e 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -316,6 +316,33 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU may consume changes from a slot at any given time. + + A logical replication slot can also be created on a hot standby. To prevent + VACUUM from removing required rows from the system + catalogs, hot_standby_feedback should be set on the + standby. In spite of that, if any required rows get removed, the slot gets + invalidated. It's highly recommended to use a physical slot between the primary + and the standby. Otherwise, hot_standby_feedback will work, but only while the + connection is alive (for example a node restart would break it). Then, the + primary may delete system catalog rows that could be needed by the logical + decoding on the standby (as it does not know about the catalog_xmin on the + standby). Existing logical slots on standby also get invalidated if wal_level + on primary is reduced to less than 'logical'. This is done as soon as the + standby detects such a change in the WAL stream. It means, that for walsenders + that are lagging (if any), some WAL records up to the wal_level parameter change + on the primary won't be decoded. + + + + Creation of a logical slot requires information about all the currently + running transactions. On the primary, this information is available + directly, but on a standby, this information has to be obtained from + primary. Thus, slot creation may need to wait for some activity to happen + on the primary. If the primary is idle, creating a logical slot on + standby may take noticeable time. This can be sped up by calling the + pg_log_standby_snapshot on the primary. + + Replication slots persist across crashes and know nothing about the state diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 1b7c2f23a4..b540ee293b 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -4469,6 +4469,17 @@ LocalProcessControlFile(bool reset) ReadControlFile(); } +/* + * Get the wal_level from the control file. For a standby, this value should be + * considered as its active wal_level, because it may be different from what + * was originally configured on standby. + */ +WalLevel +GetActiveWalLevelOnStandby(void) +{ + return ControlFile->wal_level; +} + /* * Initialization of shared memory for XLOG */ diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c index c07daa874f..52f827a902 100644 --- a/src/backend/access/transam/xlogfuncs.c +++ b/src/backend/access/transam/xlogfuncs.c @@ -31,6 +31,7 @@ #include "storage/fd.h" #include "storage/ipc.h" #include "storage/smgr.h" +#include "storage/standby.h" #include "utils/builtins.h" #include "utils/guc.h" #include "utils/memutils.h" @@ -196,6 +197,36 @@ pg_switch_wal(PG_FUNCTION_ARGS) PG_RETURN_LSN(switchpoint); } +/* + * pg_log_standby_snapshot: call LogStandbySnapshot() + * + * Permission checking for this function is managed through the normal + * GRANT system. + */ +Datum +pg_log_standby_snapshot(PG_FUNCTION_ARGS) +{ + XLogRecPtr recptr; + + if (RecoveryInProgress()) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("recovery is in progress"), + errhint("pg_log_standby_snapshot() cannot be executed during recovery."))); + + if (!XLogStandbyInfoActive()) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("pg_log_standby_snapshot() can only be used if wal_level >= replica"))); + + recptr = LogStandbySnapshot(); + + /* + * As a convenience, return the WAL location of the last inserted record + */ + PG_RETURN_LSN(recptr); +} + /* * pg_create_restore_point: a named point for restore * diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql index 83ca893444..b7c65ea37d 100644 --- a/src/backend/catalog/system_functions.sql +++ b/src/backend/catalog/system_functions.sql @@ -644,6 +644,8 @@ REVOKE EXECUTE ON FUNCTION pg_create_restore_point(text) FROM public; REVOKE EXECUTE ON FUNCTION pg_switch_wal() FROM public; +REVOKE EXECUTE ON FUNCTION pg_log_standby_snapshot() FROM public; + REVOKE EXECUTE ON FUNCTION pg_wal_replay_pause() FROM public; REVOKE EXECUTE ON FUNCTION pg_wal_replay_resume() FROM public; diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 8fe7bb65f1..5508cc2177 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -152,11 +152,39 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * can restart from there. */ break; + case XLOG_PARAMETER_CHANGE: + { + xl_parameter_change *xlrec = + (xl_parameter_change *) XLogRecGetData(buf->record); + + /* + * If wal_level on the primary is reduced to less than + * logical, we want to prevent existing logical slots from + * being used. Existing logical slots on the standby get + * invalidated when this WAL record is replayed; and further, + * slot creation fails when wal_level is not sufficient; but + * all these operations are not synchronized, so a logical + * slot may creep in while the wal_level is being + * reduced. Hence this extra check. + */ + if (xlrec->wal_level < WAL_LEVEL_LOGICAL) + { + /* + * This can occur only on a standby, as a primary would + * not allow to restart after changing wal_level < logical + * if there is pre-existing logical slot. + */ + Assert(RecoveryInProgress()); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical decoding on a standby requires wal_level to be at least logical on the primary"))); + } + break; + } case XLOG_NOOP: case XLOG_NEXTOID: case XLOG_SWITCH: case XLOG_BACKUP_END: - case XLOG_PARAMETER_CHANGE: case XLOG_RESTORE_POINT: case XLOG_FPW_CHANGE: case XLOG_FPI_FOR_HINT: diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 6ecea3c49c..82dae95080 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -124,23 +124,21 @@ CheckLogicalDecodingRequirements(void) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("logical decoding requires a database connection"))); - /* ---- - * TODO: We got to change that someday soon... - * - * There's basically three things missing to allow this: - * 1) We need to be able to correctly and quickly identify the timeline a - * LSN belongs to - * 2) We need to force hot_standby_feedback to be enabled at all times so - * the primary cannot remove rows we need. - * 3) support dropping replication slots referring to a database, in - * dbase_redo. There can't be any active ones due to HS recovery - * conflicts, so that should be relatively easy. - * ---- - */ if (RecoveryInProgress()) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("logical decoding cannot be used while in recovery"))); + { + /* + * This check may have race conditions, but whenever + * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we + * verify that there are no existing logical replication slots. And to + * avoid races around creating a new slot, + * CheckLogicalDecodingRequirements() is called once before creating + * the slot, and once when logical decoding is initially starting up. + */ + if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical decoding on a standby requires wal_level to be at least logical on the primary"))); + } } /* @@ -342,6 +340,12 @@ CreateInitDecodingContext(const char *plugin, LogicalDecodingContext *ctx; MemoryContext old_context; + /* + * On a standby, this check is also required while creating the + * slot. Check the comments in the function. + */ + CheckLogicalDecodingRequirements(); + /* shorter lines... */ slot = MyReplicationSlot; diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index cc79d6713b..41848f0ac6 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -41,6 +41,7 @@ #include "access/transam.h" #include "access/xlog_internal.h" +#include "access/xlogrecovery.h" #include "common/file_utils.h" #include "common/string.h" #include "miscadmin.h" @@ -1192,37 +1193,28 @@ ReplicationSlotReserveWal(void) /* * For logical slots log a standby snapshot and start logical decoding * at exactly that position. That allows the slot to start up more - * quickly. + * quickly. But on a standby we cannot do WAL writes, so just use the + * replay pointer; effectively, an attempt to create a logical slot on + * standby will cause it to wait for an xl_running_xact record to be + * logged independently on the primary, so that a snapshot can be + * built using the record. * - * That's not needed (or indeed helpful) for physical slots as they'll - * start replay at the last logged checkpoint anyway. Instead return - * the location of the last redo LSN. While that slightly increases - * the chance that we have to retry, it's where a base backup has to - * start replay at. + * None of this is needed (or indeed helpful) for physical slots as + * they'll start replay at the last logged checkpoint anyway. Instead + * return the location of the last redo LSN. While that slightly + * increases the chance that we have to retry, it's where a base + * backup has to start replay at. */ - if (!RecoveryInProgress() && SlotIsLogical(slot)) - { - XLogRecPtr flushptr; - - /* start at current insert position */ - restart_lsn = GetXLogInsertRecPtr(); - SpinLockAcquire(&slot->mutex); - slot->data.restart_lsn = restart_lsn; - SpinLockRelease(&slot->mutex); - - /* make sure we have enough information to start */ - flushptr = LogStandbySnapshot(); - - /* and make sure it's fsynced to disk */ - XLogFlush(flushptr); - } - else - { + if (SlotIsPhysical(slot)) restart_lsn = GetRedoRecPtr(); - SpinLockAcquire(&slot->mutex); - slot->data.restart_lsn = restart_lsn; - SpinLockRelease(&slot->mutex); - } + else if (RecoveryInProgress()) + restart_lsn = GetXLogReplayRecPtr(NULL); + else + restart_lsn = GetXLogInsertRecPtr(); + + SpinLockAcquire(&slot->mutex); + slot->data.restart_lsn = restart_lsn; + SpinLockRelease(&slot->mutex); /* prevent WAL removal as fast as possible */ ReplicationSlotsComputeRequiredLSN(); @@ -1238,6 +1230,17 @@ ReplicationSlotReserveWal(void) if (XLogGetLastRemovedSegno() < segno) break; } + + if (!RecoveryInProgress() && SlotIsLogical(slot)) + { + XLogRecPtr flushptr; + + /* make sure we have enough information to start */ + flushptr = LogStandbySnapshot(); + + /* and make sure it's fsynced to disk */ + XLogFlush(flushptr); + } } /* diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 5423cf0a17..45b8b3684f 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -906,23 +906,34 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req int count; WALReadError errinfo; XLogSegNo segno; - TimeLineID currTLI = GetWALInsertionTimeLine(); + TimeLineID currTLI; /* - * Since logical decoding is only permitted on a primary server, we know - * that the current timeline ID can't be changing any more. If we did this - * on a standby, we'd have to worry about the values we compute here - * becoming invalid due to a promotion or timeline change. + * Make sure we have enough WAL available before retrieving the current + * timeline. This is needed to determine am_cascading_walsender accurately + * which is needed to determine the current timeline. */ + flushptr = WalSndWaitForWal(targetPagePtr + reqLen); + + /* + * Since logical decoding is also permitted on a standby server, we need + * to check if the server is in recovery to decide how to get the current + * timeline ID (so that it also cover the promotion or timeline change + * cases). + */ + am_cascading_walsender = RecoveryInProgress(); + + if (am_cascading_walsender) + GetXLogReplayRecPtr(&currTLI); + else + currTLI = GetWALInsertionTimeLine(); + XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI); sendTimeLineIsHistoric = (state->currTLI != currTLI); sendTimeLine = state->currTLI; sendTimeLineValidUpto = state->currTLIValidUntil; sendTimeLineNextTLI = state->nextTLI; - /* make sure we have enough WAL available */ - flushptr = WalSndWaitForWal(targetPagePtr + reqLen); - /* fail if not (implies we are going to shut down) */ if (flushptr < targetPagePtr + reqLen) return -1; @@ -937,9 +948,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req cur_page, targetPagePtr, XLOG_BLCKSZ, - state->seg.ws_tli, /* Pass the current TLI because only - * WalSndSegmentOpen controls whether new - * TLI is needed. */ + currTLI, /* Pass the current TLI because only + * WalSndSegmentOpen controls whether new TLI + * is needed. */ &errinfo)) WALReadRaiseError(&errinfo); @@ -3076,10 +3087,14 @@ XLogSendLogical(void) * If first time through in this session, initialize flushPtr. Otherwise, * we only need to update flushPtr if EndRecPtr is past it. */ - if (flushPtr == InvalidXLogRecPtr) - flushPtr = GetFlushRecPtr(NULL); - else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr) - flushPtr = GetFlushRecPtr(NULL); + if (flushPtr == InvalidXLogRecPtr || + logical_decoding_ctx->reader->EndRecPtr >= flushPtr) + { + if (am_cascading_walsender) + flushPtr = GetStandbyFlushRecPtr(NULL); + else + flushPtr = GetFlushRecPtr(NULL); + } /* If EndRecPtr is still past our flushPtr, it means we caught up. */ if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr) @@ -3170,7 +3185,8 @@ GetStandbyFlushRecPtr(TimeLineID *tli) receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI); replayPtr = GetXLogReplayRecPtr(&replayTLI); - *tli = replayTLI; + if (tli) + *tli = replayTLI; result = replayPtr; if (receiveTLI == replayTLI && receivePtr > replayPtr) diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index cfe5409738..48ca852381 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -230,6 +230,7 @@ extern void XLOGShmemInit(void); extern void BootStrapXLOG(void); extern void InitializeWalConsistencyChecking(void); extern void LocalProcessControlFile(bool reset); +extern WalLevel GetActiveWalLevelOnStandby(void); extern void StartupXLOG(void); extern void ShutdownXLOG(int code, Datum arg); extern void CreateCheckPoint(int flags); diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 33a77fe6ae..54f3ddcd97 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -57,6 +57,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202304074 +#define CATALOG_VERSION_NO 202304075 #endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 067bee8198..b516cee8bd 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -6432,6 +6432,9 @@ { oid => '2848', descr => 'switch to new wal file', proname => 'pg_switch_wal', provolatile => 'v', prorettype => 'pg_lsn', proargtypes => '', prosrc => 'pg_switch_wal' }, +{ oid => '9658', descr => 'log details of the current snapshot to WAL', + proname => 'pg_log_standby_snapshot', provolatile => 'v', prorettype => 'pg_lsn', + proargtypes => '', prosrc => 'pg_log_standby_snapshot' }, { oid => '3098', descr => 'create a named restore point', proname => 'pg_create_restore_point', provolatile => 'v', prorettype => 'pg_lsn', proargtypes => 'text',