diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 613e9c387b..656eabfa00 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -37,7 +37,8 @@ submake-isolation:
submake-test_decoding:
$(MAKE) -C $(top_builddir)/contrib/test_decoding
-REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel binary prepared
+REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel \
+ binary prepared replorigin
regresscheck: all | submake-regress submake-test_decoding temp-install
$(MKDIR_P) regression_output
diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out
new file mode 100644
index 0000000000..c0f512579c
--- /dev/null
+++ b/contrib/test_decoding/expected/replorigin.out
@@ -0,0 +1,141 @@
+-- predictability
+SET synchronous_commit = on;
+CREATE TABLE origin_tbl(id serial primary key, data text);
+CREATE TABLE target_tbl(id serial primary key, data text);
+SELECT pg_replication_origin_create('test_decoding: regression_slot');
+ pg_replication_origin_create
+------------------------------
+ 1
+(1 row)
+
+-- ensure duplicate creations fail
+SELECT pg_replication_origin_create('test_decoding: regression_slot');
+ERROR: duplicate key value violates unique constraint "pg_replication_origin_roname_index"
+DETAIL: Key (roname)=(test_decoding: regression_slot) already exists.
+--ensure deletions work (once)
+SELECT pg_replication_origin_create('test_decoding: temp');
+ pg_replication_origin_create
+------------------------------
+ 2
+(1 row)
+
+SELECT pg_replication_origin_drop('test_decoding: temp');
+ pg_replication_origin_drop
+----------------------------
+
+(1 row)
+
+SELECT pg_replication_origin_drop('test_decoding: temp');
+ERROR: cache lookup failed for replication origin 'test_decoding: temp'
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column?
+----------
+ init
+(1 row)
+
+-- origin tx
+INSERT INTO origin_tbl(data) VALUES ('will be replicated and decoded and decoded again');
+INSERT INTO target_tbl(data)
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+-- as is normal, the insert into target_tbl shows up
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ BEGIN
+ table public.target_tbl: INSERT: id[integer]:1 data[text]:'BEGIN'
+ table public.target_tbl: INSERT: id[integer]:2 data[text]:'table public.origin_tbl: INSERT: id[integer]:1 data[text]:''will be replicated and decoded and decoded again'''
+ table public.target_tbl: INSERT: id[integer]:3 data[text]:'COMMIT'
+ COMMIT
+(5 rows)
+
+INSERT INTO origin_tbl(data) VALUES ('will be replicated, but not decoded again');
+-- mark session as replaying
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
+ pg_replication_origin_session_setup
+-------------------------------------
+
+(1 row)
+
+-- ensure we prevent duplicate setup
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
+ERROR: cannot setup replication origin when one is already setup
+BEGIN;
+-- setup transaction origin
+SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');
+ pg_replication_origin_xact_setup
+----------------------------------
+
+(1 row)
+
+INSERT INTO target_tbl(data)
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+COMMIT;
+-- check replication progress for the session is correct
+SELECT pg_replication_origin_session_progress(false);
+ pg_replication_origin_session_progress
+----------------------------------------
+ 0/AABBCCDD
+(1 row)
+
+SELECT pg_replication_origin_session_progress(true);
+ pg_replication_origin_session_progress
+----------------------------------------
+ 0/AABBCCDD
+(1 row)
+
+SELECT pg_replication_origin_session_reset();
+ pg_replication_origin_session_reset
+-------------------------------------
+
+(1 row)
+
+SELECT local_id, external_id, remote_lsn, local_lsn <> '0/0' FROM pg_replication_origin_status;
+ local_id | external_id | remote_lsn | ?column?
+----------+--------------------------------+------------+----------
+ 1 | test_decoding: regression_slot | 0/AABBCCDD | t
+(1 row)
+
+-- check replication progress identified by name is correct
+SELECT pg_replication_origin_progress('test_decoding: regression_slot', false);
+ pg_replication_origin_progress
+--------------------------------
+ 0/AABBCCDD
+(1 row)
+
+SELECT pg_replication_origin_progress('test_decoding: regression_slot', true);
+ pg_replication_origin_progress
+--------------------------------
+ 0/AABBCCDD
+(1 row)
+
+-- ensure reset requires previously setup state
+SELECT pg_replication_origin_session_reset();
+ERROR: no replication origin is configured
+-- and magically the replayed xact will be filtered!
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+ data
+------
+(0 rows)
+
+--but new original changes still show up
+INSERT INTO origin_tbl(data) VALUES ('will be replicated');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+ data
+--------------------------------------------------------------------------------
+ BEGIN
+ table public.origin_tbl: INSERT: id[integer]:3 data[text]:'will be replicated'
+ COMMIT
+(3 rows)
+
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
+SELECT pg_replication_origin_drop('test_decoding: regression_slot');
+ pg_replication_origin_drop
+----------------------------
+
+(1 row)
+
diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql
new file mode 100644
index 0000000000..e12404e106
--- /dev/null
+++ b/contrib/test_decoding/sql/replorigin.sql
@@ -0,0 +1,64 @@
+-- predictability
+SET synchronous_commit = on;
+
+CREATE TABLE origin_tbl(id serial primary key, data text);
+CREATE TABLE target_tbl(id serial primary key, data text);
+
+SELECT pg_replication_origin_create('test_decoding: regression_slot');
+-- ensure duplicate creations fail
+SELECT pg_replication_origin_create('test_decoding: regression_slot');
+
+--ensure deletions work (once)
+SELECT pg_replication_origin_create('test_decoding: temp');
+SELECT pg_replication_origin_drop('test_decoding: temp');
+SELECT pg_replication_origin_drop('test_decoding: temp');
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+-- origin tx
+INSERT INTO origin_tbl(data) VALUES ('will be replicated and decoded and decoded again');
+INSERT INTO target_tbl(data)
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- as is normal, the insert into target_tbl shows up
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+INSERT INTO origin_tbl(data) VALUES ('will be replicated, but not decoded again');
+
+-- mark session as replaying
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
+
+-- ensure we prevent duplicate setup
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
+
+BEGIN;
+-- setup transaction origin
+SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');
+INSERT INTO target_tbl(data)
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+COMMIT;
+
+-- check replication progress for the session is correct
+SELECT pg_replication_origin_session_progress(false);
+SELECT pg_replication_origin_session_progress(true);
+
+SELECT pg_replication_origin_session_reset();
+
+SELECT local_id, external_id, remote_lsn, local_lsn <> '0/0' FROM pg_replication_origin_status;
+
+-- check replication progress identified by name is correct
+SELECT pg_replication_origin_progress('test_decoding: regression_slot', false);
+SELECT pg_replication_origin_progress('test_decoding: regression_slot', true);
+
+-- ensure reset requires previously setup state
+SELECT pg_replication_origin_session_reset();
+
+-- and magically the replayed xact will be filtered!
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+
+--but new original changes still show up
+INSERT INTO origin_tbl(data) VALUES ('will be replicated');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+
+SELECT pg_drop_replication_slot('regression_slot');
+SELECT pg_replication_origin_drop('test_decoding: regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 963d5df9da..bca03ee21b 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -21,6 +21,7 @@
#include "replication/output_plugin.h"
#include "replication/logical.h"
+#include "replication/origin.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
@@ -43,6 +44,7 @@ typedef struct
bool include_timestamp;
bool skip_empty_xacts;
bool xact_wrote_changes;
+ bool only_local;
} TestDecodingData;
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -59,6 +61,8 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
static void pg_decode_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, Relation rel,
ReorderBufferChange *change);
+static bool pg_decode_filter(LogicalDecodingContext *ctx,
+ RepOriginId origin_id);
void
_PG_init(void)
@@ -76,6 +80,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->begin_cb = pg_decode_begin_txn;
cb->change_cb = pg_decode_change;
cb->commit_cb = pg_decode_commit_txn;
+ cb->filter_by_origin_cb = pg_decode_filter;
cb->shutdown_cb = pg_decode_shutdown;
}
@@ -97,6 +102,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
data->include_xids = true;
data->include_timestamp = false;
data->skip_empty_xacts = false;
+ data->only_local = false;
ctx->output_plugin_private = data;
@@ -155,6 +161,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
+ else if (strcmp(elem->defname, "only-local") == 0)
+ {
+
+ if (elem->arg == NULL)
+ data->only_local = true;
+ else if (!parse_bool(strVal(elem->arg), &data->only_local))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("could not parse value \"%s\" for parameter \"%s\"",
+ strVal(elem->arg), elem->defname)));
+ }
else
{
ereport(ERROR,
@@ -223,6 +240,17 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginWrite(ctx, true);
}
+static bool
+pg_decode_filter(LogicalDecodingContext *ctx,
+ RepOriginId origin_id)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+
+ if (data->only_local && origin_id != InvalidRepOriginId)
+ return true;
+ return false;
+}
+
/*
* Print literal `outputstr' already represented as string of type `typid'
* into stringbuf `s'.
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 898865eea1..4b79958b35 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -238,6 +238,16 @@
query rewrite rules
+
+ pg_replication_origin
+ registered replication origins
+
+
+
+ pg_replication_origin_status
+ information about replication origins, including replication progress
+
+
pg_replication_slotsreplication slot information
@@ -5337,6 +5347,119 @@
+
+ pg_replication_origin
+
+
+ pg_replication_origin
+
+
+
+ The pg_replication_origin catalog contains
+ all replication origins created. For more on replication origins
+ see .
+
+
+
+
+ pg_replication_origin Columns
+
+
+
+
+ Name
+ Type
+ References
+ Description
+
+
+
+
+
+ roident
+ Oid
+
+ A unique, cluster-wide identifier for the replication
+ origin. Should never leave the system.
+
+
+
+ roname
+ text
+
+ The external, user defined, name of a replication
+ origin.
+
+
+
+
+
+
+
+ pg_replication_origin_status
+
+
+ pg_replication_origin_status
+
+
+
+ The pg_replication_origin_status view
+ contains information about how far replay for a certain origin has
+ progressed. For more on replication origins
+ see .
+
+
+
+
+ pg_replication_origin_status Columns
+
+
+
+
+ Name
+ Type
+ References
+ Description
+
+
+
+
+
+ local_id
+ Oid
+ pg_replication_origin.roident
+ internal node identifier
+
+
+
+ external_id
+ text
+ pg_replication_origin.roname
+ external node identifier
+
+
+
+ remote_lsn
+ pg_lsn
+
+ The origin node's LSN up to which data has been replicated.
+
+
+
+
+ local_lsn
+ pg_lsn
+
+ This node's LSN that at
+ which remote_lsn has been replicated. Used to
+ flush commit records before persisting data to disk when using
+ asynchronous commits.
+
+
+
+
+
+
pg_replication_slots
diff --git a/doc/src/sgml/filelist.sgml b/doc/src/sgml/filelist.sgml
index 26aa7ee50e..6268d5496b 100644
--- a/doc/src/sgml/filelist.sgml
+++ b/doc/src/sgml/filelist.sgml
@@ -95,6 +95,7 @@
+
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 0053d7d410..dcade93e43 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -16879,11 +16879,13 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
Replication Functions
- The functions shown in are
- for controlling and interacting with replication features.
- See
- and for information about the
- underlying features. Use of these functions is restricted to superusers.
+ The functions shown
+ in are for
+ controlling and interacting with replication features.
+ See ,
+ ,
+ for information about the underlying features. Use of these
+ functions is restricted to superusers.
@@ -17040,6 +17042,195 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
on future calls.
+
+
+
+
+ pg_replication_origin_create
+
+ pg_replication_origin_create(node_nametext)
+
+
+ internal_idoid
+
+
+ Create a replication origin with the the passed in external
+ name, and create an internal id for it.
+
+
+
+
+
+
+ pg_replication_origin_drop
+
+ pg_replication_origin_drop(node_nametext)
+
+
+ void
+
+
+ Delete a previously created replication origin, including the
+ associated replay progress.
+
+
+
+
+
+
+ pg_replication_origin_oid
+
+ pg_replication_origin_oid(node_nametext)
+
+
+ internal_idoid
+
+
+ Lookup replication origin by name and return the internal
+ oid. If no corresponding replication origin is found a error
+ is thrown.
+
+
+
+
+
+
+ pg_replication_origin_session_setup
+
+ pg_replication_origin_setup_session(node_nametext)
+
+
+ void
+
+
+ Configure the current session to be replaying from the passed in
+ origin, allowing replay progress to be tracked. Use
+ pg_replication_origin_session_reset to revert.
+ Can only be used if no previous origin is configured.
+
+
+
+
+
+
+ pg_replication_origin_session_reset
+
+ pg_replication_origin_session_reset()
+
+
+ void
+
+
+ Cancel the effects
+ of pg_replication_origin_session_setup().
+
+
+
+
+
+
+ pg_replication_session_is_setup
+
+ pg_replication_session_is_setup()
+
+
+ bool
+
+
+ Has a replication origin been configured in the current session?
+
+
+
+
+
+
+ pg_replication_origin_session_progress
+
+ pg_replication_origin_progress(flushbool)
+
+
+ pg_lsn
+
+
+ Return the replay position for the replication origin configured in
+ the current session. The parameter flush
+ determines whether the corresponding local transaction will be
+ guaranteed to have been flushed to disk or not.
+
+
+
+
+
+
+ pg_replication_origin_xact_setup
+
+ pg_replication_origin_xact_setup(origin_lsnpg_lsn, origin_timestamptimestamptz)
+
+
+ void
+
+
+ Mark the current transaction to be replaying a transaction that has
+ committed at the passed in LSN and timestamp. Can
+ only be called when a replication origin has previously been
+ configured using
+ pg_replication_origin_session_setup().
+
+
+
+
+
+
+ pg_replication_origin_xact_reset
+
+ pg_replication_origin_xact_reset()
+
+
+ void
+
+
+ Cancel the effects of
+ pg_replication_origin_xact_setup().
+
+
+
+
+
+
+ pg_replication_origin_advance
+
+ pg_replication_origin_advance(node_nametext, pospg_lsn)
+
+
+ void
+
+
+ Set replication progress for the passed in node to the passed in
+ position. This primarily is useful for setting up the initial position
+ or a new position after configuration changes and similar. Be aware
+ that careless use of this function can lead to inconsistently
+ replicated data.
+
+
+
+
+
+
+ pg_replication_origin_progress
+
+ pg_replication_origin_progress(node_nametext, flushbool)
+
+
+ pg_lsn
+
+
+ Return the replay position for the passed in replication origin. The
+ parameter flush determines whether the
+ corresponding local transaction will be guaranteed to have been
+ flushed to disk or not.
+
+
+
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 0810a2d1f9..f817af3ea8 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -363,6 +363,7 @@ typedef struct OutputPluginCallbacks
LogicalDecodeBeginCB begin_cb;
LogicalDecodeChangeCB change_cb;
LogicalDecodeCommitCB commit_cb;
+ LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
} OutputPluginCallbacks;
@@ -370,7 +371,8 @@ typedef void (*LogicalOutputPluginInit)(struct OutputPluginCallbacks *cb);
The begin_cb, change_cb
and commit_cb callbacks are required,
- while startup_cb
+ while startup_cb,
+ filter_by_origin_cb
and shutdown_cb are optional.
@@ -569,6 +571,37 @@ typedef void (*LogicalDecodeChangeCB) (
+
+
+ Origin Filter Callback
+
+
+ The optional filter_by_origin_cb callback
+ is called to determine wheter data that has been replayed
+ from origin_id is of interest to the
+ output plugin.
+
+typedef bool (*LogicalDecodeChangeCB) (
+ struct LogicalDecodingContext *ctx,
+ RepNodeId origin_id
+);
+
+ The ctx parameter has the same contents
+ as for the other callbacks. No information but the origin is
+ available. To signal that changes originating on the passed in
+ node are irrelevant, return true, causing them to be filtered
+ away; false otherwise. The other callbacks will not be called
+ for transactions and changes that have been filtered away.
+
+
+ This is useful when implementing cascading or multi directional
+ replication solutions. Filtering by the origin allows to
+ prevent replicating the same changes back and forth in such
+ setups. While transactions and changes also carry information
+ about the origin, filtering via this callback is noticeably
+ more efficient.
+
+
diff --git a/doc/src/sgml/postgres.sgml b/doc/src/sgml/postgres.sgml
index e378d6978d..4a45138bf7 100644
--- a/doc/src/sgml/postgres.sgml
+++ b/doc/src/sgml/postgres.sgml
@@ -220,6 +220,7 @@
&spi;
&bgworker;
&logicaldecoding;
+ &replication-origins;
diff --git a/doc/src/sgml/replication-origins.sgml b/doc/src/sgml/replication-origins.sgml
new file mode 100644
index 0000000000..c531022911
--- /dev/null
+++ b/doc/src/sgml/replication-origins.sgml
@@ -0,0 +1,93 @@
+
+
+ Replication Progress Tracking
+
+ Replication Progress Tracking
+
+
+ Replication Origins
+
+
+
+ Replication origins are intended to make it easier to implement
+ logical replication solutions on top
+ of . They provide a solution to two
+ common problems:
+
+ How to safely keep track of replication progress
+ How to change replication behavior, based on the
+ origin of a row; e.g. to avoid loops in bi-directional replication
+ setups
+
+
+
+
+ Replication origins consist out of a name and a oid. The name, which
+ is what should be used to refer to the origin across systems, is
+ free-form text. It should be used in a way that makes conflicts
+ between replication origins created by different replication
+ solutions unlikely; e.g. by prefixing the replication solution's
+ name to it. The oid is used only to avoid having to store the long
+ version in situations where space efficiency is important. It should
+ never be shared between systems.
+
+
+
+ Replication origins can be created using the
+ pg_replication_origin_create();
+ dropped using
+ pg_replication_origin_drop();
+ and seen in the
+ pg_replication_origin
+ catalog.
+
+
+
+ When replicating from one system to another (independent of the fact that
+ those two might be in the same cluster, or even same database) one
+ nontrivial part of building a replication solution is to keep track of
+ replay progress in a safe manner. When the applying process, or the whole
+ cluster, dies, it needs to be possible to find out up to where data has
+ successfully been replicated. Naive solutions to this like updating a row in
+ a table for every replayed transaction have problems like runtime overhead
+ bloat.
+
+
+
+ Using the replication origin infrastructure a session can be
+ marked as replaying from a remote node (using the
+ pg_replication_origin_session_setup()
+ function. Additionally the LSN and commit
+ timestamp of every source transaction can be configured on a per
+ transaction basis using
+ pg_replication_origin_xact-setup().
+ If that's done replication progress will be persist in a crash safe
+ manner. Replay progress for all replication origins can be seen in the
+
+ pg_replication_origin_status
+ view. A individual origin's progress, e.g. when resuming
+ replication, can be acquired using
+ pg_replication_origin_progress()
+ for any origin or
+ pg_replication_origin_session_progress()
+ for the origin configured in the current session.
+
+
+
+ In more complex replication topologies than replication from exactly one
+ system to one other, another problem can be that, that it is hard to avoid
+ replicating replayed rows again. That can lead both to cycles in the
+ replication and inefficiencies. Replication origins provide a optional
+ mechanism to recognize and prevent that. When configured using the functions
+ referenced in the previous paragraph, every change and transaction passed to
+ output plugin callbacks (see )
+ generated by the session is tagged with the replication origin of the
+ generating session. This allows to treat them differently in the output
+ plugin, e.g. ignoring all but locally originating rows. Additionally
+ the
+ filter_by_origin_cb callback can be used
+ to filter the logical decoding change stream based on the
+ source. While less flexible, filtering via that callback is
+ considerably more efficient.
+
+
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 457cd708fd..b504ccd05c 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2189,6 +2189,9 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
(char *) heaptup->t_data + SizeofHeapTupleHeader,
heaptup->t_len - SizeofHeapTupleHeader);
+ /* filtering by origin on a row level is much more efficient */
+ XLogIncludeOrigin();
+
recptr = XLogInsert(RM_HEAP_ID, info);
PageSetLSN(page, recptr);
@@ -2499,6 +2502,10 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags);
XLogRegisterBufData(0, tupledata, totaldatalen);
+
+ /* filtering by origin on a row level is much more efficient */
+ XLogIncludeOrigin();
+
recptr = XLogInsert(RM_HEAP2_ID, info);
PageSetLSN(page, recptr);
@@ -2920,6 +2927,9 @@ l1:
- SizeofHeapTupleHeader);
}
+ /* filtering by origin on a row level is much more efficient */
+ XLogIncludeOrigin();
+
recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE);
PageSetLSN(page, recptr);
@@ -4650,6 +4660,8 @@ failed:
tuple->t_data->t_infomask2);
XLogRegisterData((char *) &xlrec, SizeOfHeapLock);
+ /* we don't decode row locks atm, so no need to log the origin */
+
recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK);
PageSetLSN(page, recptr);
@@ -5429,6 +5441,8 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
XLogRegisterBufData(0, (char *) htup + htup->t_hoff, newlen);
+ /* inplace updates aren't decoded atm, don't log the origin */
+
recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INPLACE);
PageSetLSN(page, recptr);
@@ -6787,6 +6801,9 @@ log_heap_update(Relation reln, Buffer oldbuf,
old_key_tuple->t_len - SizeofHeapTupleHeader);
}
+ /* filtering by origin on a row level is much more efficient */
+ XLogIncludeOrigin();
+
recptr = XLogInsert(RM_HEAP_ID, info);
return recptr;
@@ -6860,6 +6877,8 @@ log_heap_new_cid(Relation relation, HeapTuple tup)
XLogBeginInsert();
XLogRegisterData((char *) &xlrec, SizeOfHeapNewCid);
+ /* will be looked at irrespective of origin */
+
recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_NEW_CID);
return recptr;
diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index d18e8ec998..c72a1f245d 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -9,8 +9,8 @@ top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o gindesc.o gistdesc.o \
- hashdesc.o heapdesc.o \
- mxactdesc.o nbtdesc.o relmapdesc.o seqdesc.o smgrdesc.o spgdesc.o \
+ hashdesc.o heapdesc.o mxactdesc.o nbtdesc.o relmapdesc.o \
+ replorigindesc.o seqdesc.o smgrdesc.o spgdesc.o \
standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/rmgrdesc/replorigindesc.c b/src/backend/access/rmgrdesc/replorigindesc.c
new file mode 100644
index 0000000000..19bae9a0f8
--- /dev/null
+++ b/src/backend/access/rmgrdesc/replorigindesc.c
@@ -0,0 +1,61 @@
+/*-------------------------------------------------------------------------
+ *
+ * replorigindesc.c
+ * rmgr descriptor routines for replication/logical/replication_origin.c
+ *
+ * Portions Copyright (c) 2015, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/rmgrdesc/replorigindesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "replication/origin.h"
+
+void
+replorigin_desc(StringInfo buf, XLogReaderState *record)
+{
+ char *rec = XLogRecGetData(record);
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+ switch (info)
+ {
+ case XLOG_REPLORIGIN_SET:
+ {
+ xl_replorigin_set *xlrec;
+ xlrec = (xl_replorigin_set *) rec;
+
+ appendStringInfo(buf, "set %u; lsn %X/%X; force: %d",
+ xlrec->node_id,
+ (uint32) (xlrec->remote_lsn >> 32),
+ (uint32) xlrec->remote_lsn,
+ xlrec->force);
+ break;
+ }
+ case XLOG_REPLORIGIN_DROP:
+ {
+ xl_replorigin_drop *xlrec;
+ xlrec = (xl_replorigin_drop *) rec;
+
+ appendStringInfo(buf, "drop %u", xlrec->node_id);
+ break;
+ }
+ }
+}
+
+const char *
+replorigin_identify(uint8 info)
+{
+ switch (info)
+ {
+ case XLOG_REPLORIGIN_SET:
+ return "SET";
+ case XLOG_REPLORIGIN_DROP:
+ return "DROP";
+ default:
+ return NULL;
+ }
+}
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index b036b6d524..3297e1d379 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -101,6 +101,16 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
data += sizeof(xl_xact_twophase);
}
+
+ if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+ {
+ xl_xact_origin *xl_origin = (xl_xact_origin *) data;
+
+ parsed->origin_lsn = xl_origin->origin_lsn;
+ parsed->origin_timestamp = xl_origin->origin_timestamp;
+
+ data += sizeof(xl_xact_origin);
+ }
}
void
@@ -156,7 +166,7 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
}
static void
-xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec)
+xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId origin_id)
{
xl_xact_parsed_commit parsed;
int i;
@@ -218,6 +228,15 @@ xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec)
if (XactCompletionForceSyncCommit(parsed.xinfo))
appendStringInfo(buf, "; sync");
+
+ if (parsed.xinfo & XACT_XINFO_HAS_ORIGIN)
+ {
+ appendStringInfo(buf, "; origin: node %u, lsn %X/%X, at %s",
+ origin_id,
+ (uint32)(parsed.origin_lsn >> 32),
+ (uint32)parsed.origin_lsn,
+ timestamptz_to_str(parsed.origin_timestamp));
+ }
}
static void
@@ -274,7 +293,8 @@ xact_desc(StringInfo buf, XLogReaderState *record)
{
xl_xact_commit *xlrec = (xl_xact_commit *) rec;
- xact_desc_commit(buf, XLogRecGetInfo(record), xlrec);
+ xact_desc_commit(buf, XLogRecGetInfo(record), xlrec,
+ XLogRecGetOrigin(record));
}
else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED)
{
diff --git a/src/backend/access/transam/commit_ts.c b/src/backend/access/transam/commit_ts.c
index dc23ab27b6..40042a5fd5 100644
--- a/src/backend/access/transam/commit_ts.c
+++ b/src/backend/access/transam/commit_ts.c
@@ -49,18 +49,18 @@
*/
/*
- * We need 8+4 bytes per xact. Note that enlarging this struct might mean
+ * We need 8+2 bytes per xact. Note that enlarging this struct might mean
* the largest possible file name is more than 5 chars long; see
* SlruScanDirectory.
*/
typedef struct CommitTimestampEntry
{
TimestampTz time;
- CommitTsNodeId nodeid;
+ RepOriginId nodeid;
} CommitTimestampEntry;
#define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
- sizeof(CommitTsNodeId))
+ sizeof(RepOriginId))
#define COMMIT_TS_XACTS_PER_PAGE \
(BLCKSZ / SizeOfCommitTimestampEntry)
@@ -93,43 +93,18 @@ CommitTimestampShared *commitTsShared;
/* GUC variable */
bool track_commit_timestamp;
-static CommitTsNodeId default_node_id = InvalidCommitTsNodeId;
-
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
TransactionId *subxids, TimestampTz ts,
- CommitTsNodeId nodeid, int pageno);
+ RepOriginId nodeid, int pageno);
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
- CommitTsNodeId nodeid, int slotno);
+ RepOriginId nodeid, int slotno);
static int ZeroCommitTsPage(int pageno, bool writeXlog);
static bool CommitTsPagePrecedes(int page1, int page2);
static void WriteZeroPageXlogRec(int pageno);
static void WriteTruncateXlogRec(int pageno);
static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
TransactionId *subxids, TimestampTz timestamp,
- CommitTsNodeId nodeid);
-
-
-/*
- * CommitTsSetDefaultNodeId
- *
- * Set default nodeid for current backend.
- */
-void
-CommitTsSetDefaultNodeId(CommitTsNodeId nodeid)
-{
- default_node_id = nodeid;
-}
-
-/*
- * CommitTsGetDefaultNodeId
- *
- * Set default nodeid for current backend.
- */
-CommitTsNodeId
-CommitTsGetDefaultNodeId(void)
-{
- return default_node_id;
-}
+ RepOriginId nodeid);
/*
* TransactionTreeSetCommitTsData
@@ -156,7 +131,7 @@ CommitTsGetDefaultNodeId(void)
void
TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
TransactionId *subxids, TimestampTz timestamp,
- CommitTsNodeId nodeid, bool do_xlog)
+ RepOriginId nodeid, bool do_xlog)
{
int i;
TransactionId headxid;
@@ -234,7 +209,7 @@ TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
static void
SetXidCommitTsInPage(TransactionId xid, int nsubxids,
TransactionId *subxids, TimestampTz ts,
- CommitTsNodeId nodeid, int pageno)
+ RepOriginId nodeid, int pageno)
{
int slotno;
int i;
@@ -259,7 +234,7 @@ SetXidCommitTsInPage(TransactionId xid, int nsubxids,
*/
static void
TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
- CommitTsNodeId nodeid, int slotno)
+ RepOriginId nodeid, int slotno)
{
int entryno = TransactionIdToCTsEntry(xid);
CommitTimestampEntry entry;
@@ -282,7 +257,7 @@ TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
*/
bool
TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
- CommitTsNodeId *nodeid)
+ RepOriginId *nodeid)
{
int pageno = TransactionIdToCTsPage(xid);
int entryno = TransactionIdToCTsEntry(xid);
@@ -322,7 +297,7 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
if (ts)
*ts = 0;
if (nodeid)
- *nodeid = InvalidCommitTsNodeId;
+ *nodeid = InvalidRepOriginId;
return false;
}
@@ -373,7 +348,7 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
* as NULL if not wanted.
*/
TransactionId
-GetLatestCommitTsData(TimestampTz *ts, CommitTsNodeId *nodeid)
+GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
{
TransactionId xid;
@@ -503,7 +478,7 @@ CommitTsShmemInit(void)
commitTsShared->xidLastCommit = InvalidTransactionId;
TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
- commitTsShared->dataLastCommit.nodeid = InvalidCommitTsNodeId;
+ commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
}
else
Assert(found);
@@ -857,7 +832,7 @@ WriteTruncateXlogRec(int pageno)
static void
WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
TransactionId *subxids, TimestampTz timestamp,
- CommitTsNodeId nodeid)
+ RepOriginId nodeid)
{
xl_commit_ts_set record;
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index acd825fad4..7c4d773ce0 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -23,6 +23,7 @@
#include "commands/dbcommands_xlog.h"
#include "commands/sequence.h"
#include "commands/tablespace.h"
+#include "replication/origin.h"
#include "storage/standby.h"
#include "utils/relmapper.h"
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 1495bb499f..511bcbbc51 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -40,8 +40,10 @@
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
+#include "replication/logical.h"
#include "replication/walsender.h"
#include "replication/syncrep.h"
+#include "replication/origin.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
@@ -1073,21 +1075,27 @@ RecordTransactionCommit(void)
nmsgs, invalMessages,
RelcacheInitFileInval, forceSyncCommit,
InvalidTransactionId /* plain commit */);
- }
- /*
- * We only need to log the commit timestamp separately if the node
- * identifier is a valid value; the commit record above already contains
- * the timestamp info otherwise, and will be used to load it.
- */
- if (markXidCommitted)
- {
- CommitTsNodeId node_id;
+ /*
+ * Record plain commit ts if not replaying remote actions, or if no
+ * timestamp is configured.
+ */
+ if (replorigin_sesssion_origin == InvalidRepOriginId ||
+ replorigin_sesssion_origin == DoNotReplicateId ||
+ replorigin_sesssion_origin_timestamp == 0)
+ replorigin_sesssion_origin_timestamp = xactStopTimestamp;
+ else
+ replorigin_session_advance(replorigin_sesssion_origin_lsn,
+ XactLastRecEnd);
- node_id = CommitTsGetDefaultNodeId();
+ /*
+ * We don't need to WAL log origin or timestamp here, the commit
+ * record contains all the necessary information and will redo the SET
+ * action during replay.
+ */
TransactionTreeSetCommitTsData(xid, nchildren, children,
- xactStopTimestamp,
- node_id, node_id != InvalidCommitTsNodeId);
+ replorigin_sesssion_origin_timestamp,
+ replorigin_sesssion_origin, false);
}
/*
@@ -1176,9 +1184,11 @@ RecordTransactionCommit(void)
if (wrote_xlog && markXidCommitted)
SyncRepWaitForLSN(XactLastRecEnd);
+ /* remember end of last commit record */
+ XactLastCommitEnd = XactLastRecEnd;
+
/* Reset XactLastRecEnd until the next transaction writes something */
XactLastRecEnd = 0;
-
cleanup:
/* Clean up local data */
if (rels)
@@ -4611,6 +4621,7 @@ XactLogCommitRecord(TimestampTz commit_time,
xl_xact_relfilenodes xl_relfilenodes;
xl_xact_invals xl_invals;
xl_xact_twophase xl_twophase;
+ xl_xact_origin xl_origin;
uint8 info;
@@ -4668,6 +4679,15 @@ XactLogCommitRecord(TimestampTz commit_time,
xl_twophase.xid = twophase_xid;
}
+ /* dump transaction origin information */
+ if (replorigin_sesssion_origin != InvalidRepOriginId)
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
+
+ xl_origin.origin_lsn = replorigin_sesssion_origin_lsn;
+ xl_origin.origin_timestamp = replorigin_sesssion_origin_timestamp;
+ }
+
if (xl_xinfo.xinfo != 0)
info |= XLOG_XACT_HAS_INFO;
@@ -4709,6 +4729,12 @@ XactLogCommitRecord(TimestampTz commit_time,
if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
+ XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
+
+ /* we allow filtering by xacts */
+ XLogIncludeOrigin();
+
return XLogInsert(RM_XACT_ID, info);
}
@@ -4806,10 +4832,12 @@ XactLogAbortRecord(TimestampTz abort_time,
static void
xact_redo_commit(xl_xact_parsed_commit *parsed,
TransactionId xid,
- XLogRecPtr lsn)
+ XLogRecPtr lsn,
+ RepOriginId origin_id)
{
TransactionId max_xid;
int i;
+ TimestampTz commit_time;
max_xid = TransactionIdLatest(xid, parsed->nsubxacts, parsed->subxacts);
@@ -4829,9 +4857,16 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
LWLockRelease(XidGenLock);
}
+ Assert(!!(parsed->xinfo & XACT_XINFO_HAS_ORIGIN) == (origin_id != InvalidRepOriginId));
+
+ if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+ commit_time = parsed->origin_timestamp;
+ else
+ commit_time = parsed->xact_time;
+
/* Set the transaction commit timestamp and metadata */
TransactionTreeSetCommitTsData(xid, parsed->nsubxacts, parsed->subxacts,
- parsed->xact_time, InvalidCommitTsNodeId,
+ commit_time, origin_id,
false);
if (standbyState == STANDBY_DISABLED)
@@ -4892,6 +4927,13 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
StandbyReleaseLockTree(xid, 0, NULL);
}
+ if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+ {
+ /* recover apply progress */
+ replorigin_advance(origin_id, parsed->origin_lsn, lsn,
+ false /* backward */, false /* WAL */);
+ }
+
/* Make sure files supposed to be dropped are dropped */
if (parsed->nrels > 0)
{
@@ -5047,13 +5089,13 @@ xact_redo(XLogReaderState *record)
{
Assert(!TransactionIdIsValid(parsed.twophase_xid));
xact_redo_commit(&parsed, XLogRecGetXid(record),
- record->EndRecPtr);
+ record->EndRecPtr, XLogRecGetOrigin(record));
}
else
{
Assert(TransactionIdIsValid(parsed.twophase_xid));
xact_redo_commit(&parsed, parsed.twophase_xid,
- record->EndRecPtr);
+ record->EndRecPtr, XLogRecGetOrigin(record));
RemoveTwoPhaseFile(parsed.twophase_xid, false);
}
}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 2580996102..da7b6c2fad 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -44,6 +44,7 @@
#include "postmaster/startup.h"
#include "replication/logical.h"
#include "replication/slot.h"
+#include "replication/origin.h"
#include "replication/snapbuild.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
@@ -295,6 +296,7 @@ static TimeLineID curFileTLI;
static XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr;
XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr;
+XLogRecPtr XactLastCommitEnd = InvalidXLogRecPtr;
/*
* RedoRecPtr is this backend's local copy of the REDO record pointer
@@ -6211,6 +6213,11 @@ StartupXLOG(void)
*/
StartupMultiXact();
+ /*
+ * Recover knowledge about replay progress of known replication partners.
+ */
+ StartupReplicationOrigin();
+
/*
* Initialize unlogged LSN. On a clean shutdown, it's restored from the
* control file. On recovery, all unlogged relations are blown away, so
@@ -8394,6 +8401,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
CheckPointSnapBuild();
CheckPointLogicalRewriteHeap();
CheckPointBuffers(flags); /* performs all required fsyncs */
+ CheckPointReplicationOrigin();
/* We deliberately delay 2PC checkpointing as long as possible */
CheckPointTwoPhase(checkPointRedo);
}
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index 618f8792f8..0cdb6af052 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -26,6 +26,7 @@
#include "catalog/pg_control.h"
#include "common/pg_lzcompress.h"
#include "miscadmin.h"
+#include "replication/origin.h"
#include "storage/bufmgr.h"
#include "storage/proc.h"
#include "utils/memutils.h"
@@ -72,6 +73,9 @@ static XLogRecData *mainrdata_head;
static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
static uint32 mainrdata_len; /* total # of bytes in chain */
+/* Should te in-progress insertion log the origin */
+static bool include_origin = false;
+
/*
* These are used to hold the record header while constructing a record.
* 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
@@ -83,10 +87,12 @@ static uint32 mainrdata_len; /* total # of bytes in chain */
static XLogRecData hdr_rdt;
static char *hdr_scratch = NULL;
+#define SizeOfXlogOrigin (sizeof(RepOriginId) + sizeof(char))
+
#define HEADER_SCRATCH_SIZE \
(SizeOfXLogRecord + \
MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
- SizeOfXLogRecordDataHeaderLong)
+ SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin)
/*
* An array of XLogRecData structs, to hold registered data.
@@ -193,6 +199,7 @@ XLogResetInsertion(void)
max_registered_block_id = 0;
mainrdata_len = 0;
mainrdata_last = (XLogRecData *) &mainrdata_head;
+ include_origin = false;
begininsert_called = false;
}
@@ -374,6 +381,16 @@ XLogRegisterBufData(uint8 block_id, char *data, int len)
regbuf->rdata_len += len;
}
+/*
+ * Should this record include the replication origin if one is set up?
+ */
+void
+XLogIncludeOrigin(void)
+{
+ Assert(begininsert_called);
+ include_origin = true;
+}
+
/*
* Insert an XLOG record having the specified RMID and info bytes, with the
* body of the record being the data and buffer references registered earlier
@@ -678,6 +695,14 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
scratch += sizeof(BlockNumber);
}
+ /* followed by the record's origin, if any */
+ if (include_origin && replorigin_sesssion_origin != InvalidRepOriginId)
+ {
+ *(scratch++) = XLR_BLOCK_ID_ORIGIN;
+ memcpy(scratch, &replorigin_sesssion_origin, sizeof(replorigin_sesssion_origin));
+ scratch += sizeof(replorigin_sesssion_origin);
+ }
+
/* followed by main data, if any */
if (mainrdata_len > 0)
{
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 77be1b8ef3..3661e7229a 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -21,6 +21,7 @@
#include "access/xlogreader.h"
#include "catalog/pg_control.h"
#include "common/pg_lzcompress.h"
+#include "replication/origin.h"
static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
@@ -975,6 +976,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
ResetDecoder(state);
state->decoded_record = record;
+ state->record_origin = InvalidRepOriginId;
ptr = (char *) record;
ptr += SizeOfXLogRecord;
@@ -1009,6 +1011,10 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
break; /* by convention, the main data fragment is
* always last */
}
+ else if (block_id == XLR_BLOCK_ID_ORIGIN)
+ {
+ COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
+ }
else if (block_id <= XLR_MAX_BLOCK_ID)
{
/* XLogRecordBlockHeader */
diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile
index c73f20d6a5..37d05d1acc 100644
--- a/src/backend/catalog/Makefile
+++ b/src/backend/catalog/Makefile
@@ -39,7 +39,7 @@ POSTGRES_BKI_SRCS = $(addprefix $(top_srcdir)/src/include/catalog/,\
pg_ts_config.h pg_ts_config_map.h pg_ts_dict.h \
pg_ts_parser.h pg_ts_template.h pg_extension.h \
pg_foreign_data_wrapper.h pg_foreign_server.h pg_user_mapping.h \
- pg_foreign_table.h pg_policy.h \
+ pg_foreign_table.h pg_policy.h pg_replication_origin.h \
pg_default_acl.h pg_seclabel.h pg_shseclabel.h pg_collation.h pg_range.h \
pg_transform.h \
toasting.h indexing.h \
diff --git a/src/backend/catalog/catalog.c b/src/backend/catalog/catalog.c
index e9d3cdcc9d..fa2aa27eff 100644
--- a/src/backend/catalog/catalog.c
+++ b/src/backend/catalog/catalog.c
@@ -32,6 +32,7 @@
#include "catalog/pg_namespace.h"
#include "catalog/pg_pltemplate.h"
#include "catalog/pg_db_role_setting.h"
+#include "catalog/pg_replication_origin.h"
#include "catalog/pg_shdepend.h"
#include "catalog/pg_shdescription.h"
#include "catalog/pg_shseclabel.h"
@@ -224,7 +225,8 @@ IsSharedRelation(Oid relationId)
relationId == SharedDependRelationId ||
relationId == SharedSecLabelRelationId ||
relationId == TableSpaceRelationId ||
- relationId == DbRoleSettingRelationId)
+ relationId == DbRoleSettingRelationId ||
+ relationId == ReplicationOriginRelationId)
return true;
/* These are their indexes (see indexing.h) */
if (relationId == AuthIdRolnameIndexId ||
@@ -240,7 +242,9 @@ IsSharedRelation(Oid relationId)
relationId == SharedSecLabelObjectIndexId ||
relationId == TablespaceOidIndexId ||
relationId == TablespaceNameIndexId ||
- relationId == DbRoleSettingDatidRolidIndexId)
+ relationId == DbRoleSettingDatidRolidIndexId ||
+ relationId == ReplicationOriginIdentIndex ||
+ relationId == ReplicationOriginNameIndex)
return true;
/* These are their toast tables and toast indexes (see toasting.h) */
if (relationId == PgShdescriptionToastTable ||
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 4c35ef4349..2ad01f4cb4 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -778,6 +778,13 @@ CREATE VIEW pg_user_mappings AS
REVOKE ALL on pg_user_mapping FROM public;
+
+CREATE VIEW pg_replication_origin_status AS
+ SELECT *
+ FROM pg_show_replication_origin_status();
+
+REVOKE ALL ON pg_replication_origin_status FROM public;
+
--
-- We have a few function definitions in here, too.
-- At some point there might be enough to justify breaking them out into
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 310a45c5c0..8adea13bf4 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
-OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o snapbuild.o
+OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o origin.o \
+ snapbuild.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index eb7293f2f3..88424964ef 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -40,6 +40,7 @@
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
+#include "replication/origin.h"
#include "replication/snapbuild.h"
#include "storage/standby.h"
@@ -131,6 +132,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
case RM_SPGIST_ID:
case RM_BRIN_ID:
case RM_COMMIT_TS_ID:
+ case RM_REPLORIGIN_ID:
break;
case RM_NEXT_ID:
elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
@@ -422,6 +424,15 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
}
}
+static inline bool
+FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
+{
+ if (ctx->callbacks.filter_by_origin_cb == NULL)
+ return false;
+
+ return filter_by_origin_cb_wrapper(ctx, origin_id);
+}
+
/*
* Consolidated commit record handling between the different form of commit
* records.
@@ -430,8 +441,17 @@ static void
DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
xl_xact_parsed_commit *parsed, TransactionId xid)
{
+ XLogRecPtr origin_lsn = InvalidXLogRecPtr;
+ XLogRecPtr commit_time = InvalidXLogRecPtr;
+ XLogRecPtr origin_id = InvalidRepOriginId;
int i;
+ if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+ {
+ origin_lsn = parsed->origin_lsn;
+ commit_time = parsed->origin_timestamp;
+ }
+
/*
* Process invalidation messages, even if we're not interested in the
* transaction's contents, since the various caches need to always be
@@ -452,12 +472,13 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* the reorderbuffer to forget the content of the (sub-)transactions
* if not.
*
- * There basically two reasons we might not be interested in this
+ * There can be several reasons we might not be interested in this
* transaction:
* 1) We might not be interested in decoding transactions up to this
* LSN. This can happen because we previously decoded it and now just
* are restarting or if we haven't assembled a consistent snapshot yet.
* 2) The transaction happened in another database.
+ * 3) The output plugin is not interested in the origin.
*
* We can't just use ReorderBufferAbort() here, because we need to execute
* the transaction's invalidations. This currently won't be needed if
@@ -472,7 +493,8 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
* ---
*/
if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
- (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database))
+ (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
+ FilterByOrigin(ctx, origin_id))
{
for (i = 0; i < parsed->nsubxacts; i++)
{
@@ -492,7 +514,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
/* replay actions of all transaction + subtransactions in order */
ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
- parsed->xact_time);
+ commit_time, origin_id, origin_lsn);
}
/*
@@ -537,8 +559,13 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (target_node.dbNode != ctx->slot->data.database)
return;
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_INSERT;
+ change->origin_id = XLogRecGetOrigin(r);
memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
@@ -579,8 +606,13 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (target_node.dbNode != ctx->slot->data.database)
return;
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_UPDATE;
+ change->origin_id = XLogRecGetOrigin(r);
memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
@@ -628,8 +660,13 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (target_node.dbNode != ctx->slot->data.database)
return;
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_DELETE;
+ change->origin_id = XLogRecGetOrigin(r);
memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
@@ -673,6 +710,10 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (rnode.dbNode != ctx->slot->data.database)
return;
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
data = tupledata;
@@ -685,6 +726,8 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_INSERT;
+ change->origin_id = XLogRecGetOrigin(r);
+
memcpy(&change->data.tp.relnode, &rnode, sizeof(RelFileNode));
/*
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 774ebbc749..45d143686a 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -39,6 +39,7 @@
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/reorderbuffer.h"
+#include "replication/origin.h"
#include "replication/snapbuild.h"
#include "storage/proc.h"
@@ -720,6 +721,34 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
error_context_stack = errcallback.previous;
}
+bool
+filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
+{
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+ bool ret;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "shutdown";
+ state.report_location = InvalidXLogRecPtr;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = false;
+
+ /* do the actual work: call callback */
+ ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+
+ return ret;
+}
+
/*
* Set the required catalog xmin horizon for historic snapshots in the current
* replication slot.
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
new file mode 100644
index 0000000000..ab9ae0b6c2
--- /dev/null
+++ b/src/backend/replication/logical/origin.c
@@ -0,0 +1,1485 @@
+/*-------------------------------------------------------------------------
+ *
+ * origin.c
+ * Logical replication progress tracking support.
+ *
+ * Copyright (c) 2013-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/origin.c
+ *
+ * NOTES
+ *
+ * This file provides the following:
+ * * An infrastructure to name nodes in a replication setup
+ * * A facility to efficiently store and persist replication progress in a
+ * efficient and durable manner.
+ *
+ * Replication origin consist out of a descriptive, user defined, external
+ * name and a short, thus space efficient, internal 2 byte one. This split
+ * exists because replication origin have to be stored in WAL and shared
+ * memory and long descriptors would be inefficient. For now only use 2 bytes
+ * for the internal id of a replication origin as it seems unlikely that there
+ * soon will be more than 65k nodes in one replication setup; and using only
+ * two bytes allow us to be more space efficient.
+ *
+ * Replication progress is tracked in a shared memory table
+ * (ReplicationStates) that's dumped to disk every checkpoint. Entries
+ * ('slots') in this table are identified by the internal id. That's the case
+ * because it allows to increase replication progress during crash
+ * recovery. To allow doing so we store the original LSN (from the originating
+ * system) of a transaction in the commit record. That allows to recover the
+ * precise replayed state after crash recovery; without requiring synchronous
+ * commits. Allowing logical replication to use asynchronous commit is
+ * generally good for performance, but especially important as it allows a
+ * single threaded replay process to keep up with a source that has multiple
+ * backends generating changes concurrently. For efficiency and simplicity
+ * reasons a backend can setup one replication origin that's from then used as
+ * the source of changes produced by the backend, until reset again.
+ *
+ * This infrastructure is intended to be used in cooperation with logical
+ * decoding. When replaying from a remote system the configured origin is
+ * provided to output plugins, allowing prevention of replication loops and
+ * other filtering.
+ *
+ * There are several levels of locking at work:
+ *
+ * * To create and drop replication origins a exclusive lock on
+ * pg_replication_slot is required for the duration. That allows us to
+ * safely and conflict free assign new origins using a dirty snapshot.
+ *
+ * * When creating a in-memory replication progress slot the ReplicationOirgin
+ * LWLock has to be held exclusively; when iterating over the replication
+ * progress a shared lock has to be held, the same when advancing the
+ * replication progress of a individual backend that has not setup as the
+ * session's replication origin.
+ *
+ * * When manipulating or looking at the remote_lsn and local_lsn fields of a
+ * replication progress slot that slot's lwlock has to be held. That's
+ * primarily because we do not assume 8 byte writes (the LSN) is atomic on
+ * all our platforms, but it also simplifies memory ordering concerns
+ * between the remote and local lsn. We use a lwlock instead of a spinlock
+ * so it's less harmful to hold the lock over a WAL write
+ * (c.f. AdvanceReplicationProgress).
+ *
+ * ---------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include
+#include
+
+#include "funcapi.h"
+#include "miscadmin.h"
+
+#include "access/genam.h"
+#include "access/heapam.h"
+#include "access/htup_details.h"
+#include "access/xact.h"
+
+#include "catalog/indexing.h"
+
+#include "nodes/execnodes.h"
+
+#include "replication/origin.h"
+#include "replication/logical.h"
+
+#include "storage/fd.h"
+#include "storage/ipc.h"
+#include "storage/lmgr.h"
+#include "storage/copydir.h"
+
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
+#include "utils/pg_lsn.h"
+#include "utils/rel.h"
+#include "utils/syscache.h"
+#include "utils/tqual.h"
+
+/*
+ * Replay progress of a single remote node.
+ */
+typedef struct ReplicationState
+{
+ /*
+ * Local identifier for the remote node.
+ */
+ RepOriginId roident;
+
+ /*
+ * Location of the latest commit from the remote side.
+ */
+ XLogRecPtr remote_lsn;
+
+ /*
+ * Remember the local lsn of the commit record so we can XLogFlush() to it
+ * during a checkpoint so we know the commit record actually is safe on
+ * disk.
+ */
+ XLogRecPtr local_lsn;
+
+ /*
+ * Slot is setup in backend?
+ */
+ pid_t acquired_by;
+
+ /*
+ * Lock protecting remote_lsn and local_lsn.
+ */
+ LWLock lock;
+} ReplicationState;
+
+/*
+ * On disk version of ReplicationState.
+ */
+typedef struct ReplicationStateOnDisk
+{
+ RepOriginId roident;
+ XLogRecPtr remote_lsn;
+} ReplicationStateOnDisk;
+
+
+typedef struct ReplicationStateCtl
+{
+ int tranche_id;
+ LWLockTranche tranche;
+ ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
+} ReplicationStateCtl;
+
+/* external variables */
+RepOriginId replorigin_sesssion_origin = InvalidRepOriginId; /* assumed identity */
+XLogRecPtr replorigin_sesssion_origin_lsn = InvalidXLogRecPtr;
+TimestampTz replorigin_sesssion_origin_timestamp = 0;
+
+/*
+ * Base address into a shared memory array of replication states of size
+ * max_replication_slots.
+ *
+ * XXX: Should we use a separate variable to size this rather than
+ * max_replication_slots?
+ */
+static ReplicationState *replication_states;
+static ReplicationStateCtl *replication_states_ctl;
+
+/*
+ * Backend-local, cached element from ReplicationStates for use in a backend
+ * replaying remote commits, so we don't have to search ReplicationStates for
+ * the backends current RepOriginId.
+ */
+static ReplicationState *session_replication_state = NULL;
+
+/* Magic for on disk files. */
+#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
+
+static void
+replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
+{
+ if (!superuser())
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("only superusers can query or manipulate replication origins")));
+
+ if (check_slots && max_replication_slots == 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
+
+ if (!recoveryOK && RecoveryInProgress())
+ ereport(ERROR,
+ (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
+ errmsg("cannot manipulate replication origins during recovery")));
+
+}
+
+
+/* ---------------------------------------------------------------------------
+ * Functions for working with replication origins themselves.
+ * ---------------------------------------------------------------------------
+ */
+
+/*
+ * Check for a persistent replication origin identified by name.
+ *
+ * Returns InvalidOid if the node isn't known yet and missing_ok is true.
+ */
+RepOriginId
+replorigin_by_name(char *roname, bool missing_ok)
+{
+ Form_pg_replication_origin ident;
+ Oid roident = InvalidOid;
+ HeapTuple tuple;
+ Datum roname_d;
+
+ roname_d = CStringGetTextDatum(roname);
+
+ tuple = SearchSysCache1(REPLORIGNAME, roname_d);
+ if (HeapTupleIsValid(tuple))
+ {
+ ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
+ roident = ident->roident;
+ ReleaseSysCache(tuple);
+ }
+ else if (!missing_ok)
+ elog(ERROR, "cache lookup failed for replication origin '%s'",
+ roname);
+
+ return roident;
+}
+
+/*
+ * Create a replication origin.
+ *
+ * Needs to be called in a transaction.
+ */
+RepOriginId
+replorigin_create(char *roname)
+{
+ Oid roident;
+ HeapTuple tuple = NULL;
+ Relation rel;
+ Datum roname_d;
+ SnapshotData SnapshotDirty;
+ SysScanDesc scan;
+ ScanKeyData key;
+
+ roname_d = CStringGetTextDatum(roname);
+
+ Assert(IsTransactionState());
+
+ /*
+ * We need the numeric replication origin to be 16bit wide, so we cannot
+ * rely on the normal oid allocation. Instead we simply scan
+ * pg_replication_origin for the first unused id. That's not particularly
+ * efficient, but this should be an fairly infrequent operation - we can
+ * easily spend a bit more code on this when it turns out it needs to be
+ * faster.
+ *
+ * We handle concurrency by taking an exclusive lock (allowing reads!)
+ * over the table for the duration of the search. Because we use a "dirty
+ * snapshot" we can read rows that other in-progress sessions have
+ * written, even though they would be invisible with normal snapshots. Due
+ * to the exclusive lock there's no danger that new rows can appear while
+ * we're checking.
+ */
+ InitDirtySnapshot(SnapshotDirty);
+
+ rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
+
+ for (roident = InvalidOid + 1; roident < UINT16_MAX; roident++)
+ {
+ bool nulls[Natts_pg_replication_origin];
+ Datum values[Natts_pg_replication_origin];
+ bool collides;
+ CHECK_FOR_INTERRUPTS();
+
+ ScanKeyInit(&key,
+ Anum_pg_replication_origin_roident,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(roident));
+
+ scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
+ true /* indexOK */,
+ &SnapshotDirty,
+ 1, &key);
+
+ collides = HeapTupleIsValid(systable_getnext(scan));
+
+ systable_endscan(scan);
+
+ if (!collides)
+ {
+ /*
+ * Ok, found an unused roident, insert the new row and do a CCI,
+ * so our callers can look it up if they want to.
+ */
+ memset(&nulls, 0, sizeof(nulls));
+
+ values[Anum_pg_replication_origin_roident -1] = ObjectIdGetDatum(roident);
+ values[Anum_pg_replication_origin_roname - 1] = roname_d;
+
+ tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
+ simple_heap_insert(rel, tuple);
+ CatalogUpdateIndexes(rel, tuple);
+ CommandCounterIncrement();
+ break;
+ }
+ }
+
+ /* now release lock again, */
+ heap_close(rel, ExclusiveLock);
+
+ if (tuple == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+ errmsg("no free replication oid could be found")));
+
+ heap_freetuple(tuple);
+ return roident;
+}
+
+
+/*
+ * Drop replication origin.
+ *
+ * Needs to be called in a transaction.
+ */
+void
+replorigin_drop(RepOriginId roident)
+{
+ HeapTuple tuple = NULL;
+ Relation rel;
+ int i;
+
+ Assert(IsTransactionState());
+
+ rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
+
+ /* cleanup the slot state info */
+ LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
+
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationState *state = &replication_states[i];
+
+ /* found our slot */
+ if (state->roident == roident)
+ {
+ if (state->acquired_by != 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("cannot drop replication origin with oid %d, in use by pid %d",
+ state->roident,
+ state->acquired_by)));
+ }
+
+ /* first WAL log */
+ {
+ xl_replorigin_drop xlrec;
+
+ xlrec.node_id = roident;
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
+ XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
+ }
+
+ /* then reset the in-memory entry */
+ state->roident = InvalidRepOriginId;
+ state->remote_lsn = InvalidXLogRecPtr;
+ state->local_lsn = InvalidXLogRecPtr;
+ break;
+ }
+ }
+ LWLockRelease(ReplicationOriginLock);
+
+ tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
+ simple_heap_delete(rel, &tuple->t_self);
+ ReleaseSysCache(tuple);
+
+ CommandCounterIncrement();
+
+ /* now release lock again, */
+ heap_close(rel, ExclusiveLock);
+}
+
+
+/*
+ * Lookup replication origin via it's oid and return the name.
+ *
+ * The external name is palloc'd in the calling context.
+ *
+ * Returns true if the origin is known, false otherwise.
+ */
+bool
+replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
+{
+ HeapTuple tuple;
+ Form_pg_replication_origin ric;
+
+ Assert(OidIsValid((Oid) roident));
+ Assert(roident != InvalidRepOriginId);
+ Assert(roident != DoNotReplicateId);
+
+ tuple = SearchSysCache1(REPLORIGIDENT,
+ ObjectIdGetDatum((Oid) roident));
+
+ if (HeapTupleIsValid(tuple))
+ {
+ ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
+ *roname = text_to_cstring(&ric->roname);
+ ReleaseSysCache(tuple);
+
+ return true;
+ }
+ else
+ {
+ *roname = NULL;
+
+ if (!missing_ok)
+ elog(ERROR, "cache lookup failed for replication origin with oid %u",
+ roident);
+
+ return false;
+ }
+}
+
+
+/* ---------------------------------------------------------------------------
+ * Functions for handling replication progress.
+ * ---------------------------------------------------------------------------
+ */
+
+Size
+ReplicationOriginShmemSize(void)
+{
+ Size size = 0;
+
+ /*
+ * XXX: max_replication_slots is arguablethe wrong thing to use here, here
+ * we keep the replay state of *remote* transactions. But for now it seems
+ * sufficient to reuse it, lest we introduce a separate guc.
+ */
+ if (max_replication_slots == 0)
+ return size;
+
+ size = add_size(size, offsetof(ReplicationStateCtl, states));
+
+ size = add_size(size,
+ mul_size(max_replication_slots, sizeof(ReplicationState)));
+ return size;
+}
+
+void
+ReplicationOriginShmemInit(void)
+{
+ bool found;
+
+ if (max_replication_slots == 0)
+ return;
+
+ replication_states_ctl = (ReplicationStateCtl *)
+ ShmemInitStruct("ReplicationOriginState",
+ ReplicationOriginShmemSize(),
+ &found);
+ replication_states = replication_states_ctl->states;
+
+ if (!found)
+ {
+ int i;
+
+ replication_states_ctl->tranche_id = LWLockNewTrancheId();
+ replication_states_ctl->tranche.name = "ReplicationOrigins";
+ replication_states_ctl->tranche.array_base =
+ &replication_states[0].lock;
+ replication_states_ctl->tranche.array_stride =
+ sizeof(ReplicationState);
+
+ MemSet(replication_states, 0, ReplicationOriginShmemSize());
+
+ for (i = 0; i < max_replication_slots; i++)
+ LWLockInitialize(&replication_states[i].lock,
+ replication_states_ctl->tranche_id);
+ }
+
+ LWLockRegisterTranche(replication_states_ctl->tranche_id,
+ &replication_states_ctl->tranche);
+}
+
+/* ---------------------------------------------------------------------------
+ * Perform a checkpoint of each replication origin's progress with respect to
+ * the replayed remote_lsn. Make sure that all transactions we refer to in the
+ * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
+ * if the transactions were originally committed asynchronously.
+ *
+ * We store checkpoints in the following format:
+ * +-------+------------------------+------------------+-----+--------+
+ * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
+ * +-------+------------------------+------------------+-----+--------+
+ *
+ * So its just the magic, followed by the statically sized
+ * ReplicationStateOnDisk structs. Note that the maximum number of
+ * ReplicationStates is determined by max_replication_slots.
+ * ---------------------------------------------------------------------------
+ */
+void
+CheckPointReplicationOrigin(void)
+{
+ const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
+ const char *path = "pg_logical/replorigin_checkpoint";
+ int tmpfd;
+ int i;
+ uint32 magic = REPLICATION_STATE_MAGIC;
+ pg_crc32c crc;
+
+ if (max_replication_slots == 0)
+ return;
+
+ INIT_CRC32C(crc);
+
+ /* make sure no old temp file is remaining */
+ if (unlink(tmppath) < 0 && errno != ENOENT)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not remove file \"%s\": %m",
+ path)));
+
+ /*
+ * no other backend can perform this at the same time, we're protected by
+ * CheckpointLock.
+ */
+ tmpfd = OpenTransientFile((char *) tmppath,
+ O_CREAT | O_EXCL | O_WRONLY | PG_BINARY,
+ S_IRUSR | S_IWUSR);
+ if (tmpfd < 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not create file \"%s\": %m",
+ tmppath)));
+
+ /* write magic */
+ if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
+ {
+ CloseTransientFile(tmpfd);
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not write to file \"%s\": %m",
+ tmppath)));
+ }
+ COMP_CRC32C(crc, &magic, sizeof(magic));
+
+ /* prevent concurrent creations/drops */
+ LWLockAcquire(ReplicationOriginLock, LW_SHARED);
+
+ /* write actual data */
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationStateOnDisk disk_state;
+ ReplicationState *curstate = &replication_states[i];
+ XLogRecPtr local_lsn;
+
+ if (curstate->roident == InvalidRepOriginId)
+ continue;
+
+ LWLockAcquire(&curstate->lock, LW_SHARED);
+
+ disk_state.roident = curstate->roident;
+
+ disk_state.remote_lsn = curstate->remote_lsn;
+ local_lsn = curstate->local_lsn;
+
+ LWLockRelease(&curstate->lock);
+
+ /* make sure we only write out a commit that's persistent */
+ XLogFlush(local_lsn);
+
+ if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
+ sizeof(disk_state))
+ {
+ CloseTransientFile(tmpfd);
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not write to file \"%s\": %m",
+ tmppath)));
+ }
+
+ COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
+ }
+
+ LWLockRelease(ReplicationOriginLock);
+
+ /* write out the CRC */
+ FIN_CRC32C(crc);
+ if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
+ {
+ CloseTransientFile(tmpfd);
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not write to file \"%s\": %m",
+ tmppath)));
+ }
+
+ /* fsync the temporary file */
+ if (pg_fsync(tmpfd) != 0)
+ {
+ CloseTransientFile(tmpfd);
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not fsync file \"%s\": %m",
+ tmppath)));
+ }
+
+ CloseTransientFile(tmpfd);
+
+ /* rename to permanent file, fsync file and directory */
+ if (rename(tmppath, path) != 0)
+ {
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not rename file \"%s\" to \"%s\": %m",
+ tmppath, path)));
+ }
+
+ fsync_fname((char *) path, false);
+ fsync_fname("pg_logical", true);
+}
+
+/*
+ * Recover replication replay status from checkpoint data saved earlier by
+ * CheckPointReplicationOrigin.
+ *
+ * This only needs to be called at startup and *not* during every checkpoint
+ * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
+ * state thereafter can be recovered by looking at commit records.
+ */
+void
+StartupReplicationOrigin(void)
+{
+ const char *path = "pg_logical/replorigin_checkpoint";
+ int fd;
+ int readBytes;
+ uint32 magic = REPLICATION_STATE_MAGIC;
+ int last_state = 0;
+ pg_crc32c file_crc;
+ pg_crc32c crc;
+
+ /* don't want to overwrite already existing state */
+#ifdef USE_ASSERT_CHECKING
+ static bool already_started = false;
+ Assert(!already_started);
+ already_started = true;
+#endif
+
+ if (max_replication_slots == 0)
+ return;
+
+ INIT_CRC32C(crc);
+
+ elog(DEBUG2, "starting up replication origin progress state");
+
+ fd = OpenTransientFile((char *) path, O_RDONLY | PG_BINARY, 0);
+
+ /*
+ * might have had max_replication_slots == 0 last run, or we just brought up a
+ * standby.
+ */
+ if (fd < 0 && errno == ENOENT)
+ return;
+ else if (fd < 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\": %m",
+ path)));
+
+ /* verify magic, thats written even if nothing was active */
+ readBytes = read(fd, &magic, sizeof(magic));
+ if (readBytes != sizeof(magic))
+ ereport(PANIC,
+ (errmsg("could not read file \"%s\": %m",
+ path)));
+ COMP_CRC32C(crc, &magic, sizeof(magic));
+
+ if (magic != REPLICATION_STATE_MAGIC)
+ ereport(PANIC,
+ (errmsg("replication checkpoint has wrong magic %u instead of %u",
+ magic, REPLICATION_STATE_MAGIC)));
+
+ /* we can skip locking here, no other access is possible */
+
+ /* recover individual states, until there are no more to be found */
+ while (true)
+ {
+ ReplicationStateOnDisk disk_state;
+
+ readBytes = read(fd, &disk_state, sizeof(disk_state));
+
+ /* no further data */
+ if (readBytes == sizeof(crc))
+ {
+ /* not pretty, but simple ... */
+ file_crc = *(pg_crc32c*) &disk_state;
+ break;
+ }
+
+ if (readBytes < 0)
+ {
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not read file \"%s\": %m",
+ path)));
+ }
+
+ if (readBytes != sizeof(disk_state))
+ {
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not read file \"%s\": read %d of %zu",
+ path, readBytes, sizeof(disk_state))));
+ }
+
+ COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
+
+ if (last_state == max_replication_slots)
+ ereport(PANIC,
+ (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+ errmsg("no free replication state could be found, increase max_replication_slots")));
+
+ /* copy data to shared memory */
+ replication_states[last_state].roident = disk_state.roident;
+ replication_states[last_state].remote_lsn = disk_state.remote_lsn;
+ last_state++;
+
+ elog(LOG, "recovered replication state of node %u to %X/%X",
+ disk_state.roident,
+ (uint32)(disk_state.remote_lsn >> 32),
+ (uint32)disk_state.remote_lsn);
+ }
+
+ /* now check checksum */
+ FIN_CRC32C(crc);
+ if (file_crc != crc)
+ ereport(PANIC,
+ (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+ errmsg("replication_slot_checkpoint has wrong checksum %u, expected %u",
+ crc, file_crc)));
+
+ CloseTransientFile(fd);
+}
+
+void
+replorigin_redo(XLogReaderState *record)
+{
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+ switch (info)
+ {
+ case XLOG_REPLORIGIN_SET:
+ {
+ xl_replorigin_set *xlrec =
+ (xl_replorigin_set *) XLogRecGetData(record);
+
+ replorigin_advance(xlrec->node_id,
+ xlrec->remote_lsn, record->EndRecPtr,
+ xlrec->force /* backward */,
+ false /* WAL log */);
+ break;
+ }
+ case XLOG_REPLORIGIN_DROP:
+ {
+ xl_replorigin_drop *xlrec;
+ int i;
+
+ xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
+
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationState *state = &replication_states[i];
+
+ /* found our slot */
+ if (state->roident == xlrec->node_id)
+ {
+ /* reset entry */
+ state->roident = InvalidRepOriginId;
+ state->remote_lsn = InvalidXLogRecPtr;
+ state->local_lsn = InvalidXLogRecPtr;
+ break;
+ }
+ }
+ break;
+ }
+ default:
+ elog(PANIC, "replorigin_redo: unknown op code %u", info);
+ }
+}
+
+
+/*
+ * Tell the replication origin progress machinery that a commit from 'node'
+ * that originated at the LSN remote_commit on the remote node was replayed
+ * successfully and that we don't need to do so again. In combination with
+ * setting up replorigin_sesssion_origin_lsn and replorigin_sesssion_origin that ensures we
+ * won't loose knowledge about that after a crash if the the transaction had a
+ * persistent effect (think of asynchronous commits).
+ *
+ * local_commit needs to be a local LSN of the commit so that we can make sure
+ * uppon a checkpoint that enough WAL has been persisted to disk.
+ *
+ * Needs to be called with a RowExclusiveLock on pg_replication_origin,
+ * unless running in recovery.
+ */
+void
+replorigin_advance(RepOriginId node,
+ XLogRecPtr remote_commit, XLogRecPtr local_commit,
+ bool go_backward, bool wal_log)
+{
+ int i;
+ ReplicationState *replication_state = NULL;
+ ReplicationState *free_state = NULL;
+
+ Assert(node != InvalidRepOriginId);
+
+ /* we don't track DoNotReplicateId */
+ if (node == DoNotReplicateId)
+ return;
+
+ /*
+ * XXX: For the case where this is called by WAL replay, it'd be more
+ * efficient to restore into a backend local hashtable and only dump into
+ * shmem after recovery is finished. Let's wait with implementing that
+ * till it's shown to be a measurable expense
+ */
+
+ /* Lock exclusively, as we may have to create a new table entry. */
+ LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
+
+ /*
+ * Search for either an existing slot for the origin, or a free one we can
+ * use.
+ */
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationState *curstate = &replication_states[i];
+
+ /* remember where to insert if necessary */
+ if (curstate->roident == InvalidRepOriginId &&
+ free_state == NULL)
+ {
+ free_state = curstate;
+ continue;
+ }
+
+ /* not our slot */
+ if (curstate->roident != node)
+ {
+ continue;
+ }
+
+ /* ok, found slot */
+ replication_state = curstate;
+
+ LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
+
+ /* Make sure it's not used by somebody else */
+ if (replication_state->acquired_by != 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("replication origin with oid %d is already active for pid %d",
+ replication_state->roident,
+ replication_state->acquired_by)));
+ }
+
+ break;
+ }
+
+ if (replication_state == NULL && free_state == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+ errmsg("no free replication state slot could be found for replication origin with oid %u",
+ node),
+ errhint("Increase max_replication_slots and try again.")));
+
+ if (replication_state == NULL)
+ {
+ /* initialize new slot */
+ LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
+ replication_state = free_state;
+ Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
+ Assert(replication_state->local_lsn == InvalidXLogRecPtr);
+ replication_state->roident = node;
+ }
+
+ Assert(replication_state->roident != InvalidRepOriginId);
+
+ /*
+ * If somebody "forcefully" sets this slot, WAL log it, so it's durable
+ * and the standby gets the message. Primarily this will be called during
+ * WAL replay (of commit records) where no WAL logging is necessary.
+ */
+ if (wal_log)
+ {
+ xl_replorigin_set xlrec;
+ xlrec.remote_lsn = remote_commit;
+ xlrec.node_id = node;
+ xlrec.force = go_backward;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
+
+ XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
+ }
+
+ /*
+ * Due to - harmless - race conditions during a checkpoint we could see
+ * values here that are older than the ones we already have in
+ * memory. Don't overwrite those.
+ */
+ if (go_backward || replication_state->remote_lsn < remote_commit)
+ replication_state->remote_lsn = remote_commit;
+ if (local_commit != InvalidXLogRecPtr &&
+ (go_backward || replication_state->local_lsn < local_commit))
+ replication_state->local_lsn = local_commit;
+ LWLockRelease(&replication_state->lock);
+
+ /*
+ * Release *after* changing the LSNs, slot isn't acquired and thus could
+ * otherwise be dropped anytime.
+ */
+ LWLockRelease(ReplicationOriginLock);
+}
+
+
+XLogRecPtr
+replorigin_get_progress(RepOriginId node, bool flush)
+{
+ int i;
+ XLogRecPtr local_lsn = InvalidXLogRecPtr;
+ XLogRecPtr remote_lsn = InvalidXLogRecPtr;
+
+ /* prevent slots from being concurrently dropped */
+ LWLockAcquire(ReplicationOriginLock, LW_SHARED);
+
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationState *state;
+
+ state = &replication_states[i];
+
+ if (state->roident == node)
+ {
+ LWLockAcquire(&state->lock, LW_SHARED);
+
+ remote_lsn = state->remote_lsn;
+ local_lsn = state->local_lsn;
+
+ LWLockRelease(&state->lock);
+
+ break;
+ }
+ }
+
+ LWLockRelease(ReplicationOriginLock);
+
+ if (flush && local_lsn != InvalidXLogRecPtr)
+ XLogFlush(local_lsn);
+
+ return remote_lsn;
+}
+
+/*
+ * Tear down a (possibly) configured session replication origin during process
+ * exit.
+ */
+static void
+ReplicationOriginExitCleanup(int code, Datum arg)
+{
+
+ LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
+
+ if (session_replication_state != NULL &&
+ session_replication_state->acquired_by == MyProcPid)
+ {
+ session_replication_state->acquired_by = 0;
+ session_replication_state = NULL;
+ }
+
+ LWLockRelease(ReplicationOriginLock);
+}
+
+/*
+ * Setup a replication origin in the shared memory struct if it doesn't
+ * already exists and cache access to the specific ReplicationSlot so the
+ * array doesn't have to be searched when calling
+ * replorigin_session_advance().
+ *
+ * Obviously only one such cached origin can exist per process and the current
+ * cached value can only be set again after the previous value is torn down
+ * with replorigin_session_reset().
+ */
+void
+replorigin_session_setup(RepOriginId node)
+{
+ static bool registered_cleanup;
+ int i;
+ int free_slot = -1;
+
+ if (!registered_cleanup)
+ {
+ on_shmem_exit(ReplicationOriginExitCleanup, 0);
+ registered_cleanup = true;
+ }
+
+ Assert(max_replication_slots > 0);
+
+ if (session_replication_state != NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot setup replication origin when one is already setup")));
+
+ /* Lock exclusively, as we may have to create a new table entry. */
+ LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
+
+ /*
+ * Search for either an existing slot for the origin, or a free one we can
+ * use.
+ */
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationState *curstate = &replication_states[i];
+
+ /* remember where to insert if necessary */
+ if (curstate->roident == InvalidRepOriginId &&
+ free_slot == -1)
+ {
+ free_slot = i;
+ continue;
+ }
+
+ /* not our slot */
+ if (curstate->roident != node)
+ continue;
+
+ else if (curstate->acquired_by != 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("replication identiefer %d is already active for pid %d",
+ curstate->roident, curstate->acquired_by)));
+ }
+
+ /* ok, found slot */
+ session_replication_state = curstate;
+ }
+
+
+ if (session_replication_state == NULL && free_slot == -1)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+ errmsg("no free replication state slot could be found for replication origin with oid %u",
+ node),
+ errhint("Increase max_replication_slots and try again.")));
+ else if (session_replication_state == NULL)
+ {
+ /* initialize new slot */
+ session_replication_state = &replication_states[free_slot];
+ Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
+ Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
+ session_replication_state->roident = node;
+ }
+
+
+ Assert(session_replication_state->roident != InvalidRepOriginId);
+
+ session_replication_state->acquired_by = MyProcPid;
+
+ LWLockRelease(ReplicationOriginLock);
+}
+
+/*
+ * Reset replay state previously setup in this session.
+ *
+ * This function may only be called if a origin was setup with
+ * replorigin_session_setup().
+ */
+void
+replorigin_session_reset(void)
+{
+ Assert(max_replication_slots != 0);
+
+ if (session_replication_state == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("no replication origin is configured")));
+
+ LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
+
+ session_replication_state->acquired_by = 0;
+ session_replication_state = NULL;
+
+ LWLockRelease(ReplicationOriginLock);
+}
+
+/*
+ * Do the same work replorigin_advance() does, just on the session's
+ * configured origin.
+ *
+ * This is noticeably cheaper than using replorigin_advance().
+ */
+void
+replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
+{
+ Assert(session_replication_state != NULL);
+ Assert(session_replication_state->roident != InvalidRepOriginId);
+
+ LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
+ if (session_replication_state->local_lsn < local_commit)
+ session_replication_state->local_lsn = local_commit;
+ if (session_replication_state->remote_lsn < remote_commit)
+ session_replication_state->remote_lsn = remote_commit;
+ LWLockRelease(&session_replication_state->lock);
+}
+
+/*
+ * Ask the machinery about the point up to which we successfully replayed
+ * changes from a already setup replication origin.
+ */
+XLogRecPtr
+replorigin_session_get_progress(bool flush)
+{
+ XLogRecPtr remote_lsn;
+ XLogRecPtr local_lsn;
+
+ Assert(session_replication_state != NULL);
+
+ LWLockAcquire(&session_replication_state->lock, LW_SHARED);
+ remote_lsn = session_replication_state->remote_lsn;
+ local_lsn = session_replication_state->local_lsn;
+ LWLockRelease(&session_replication_state->lock);
+
+ if (flush && local_lsn != InvalidXLogRecPtr)
+ XLogFlush(local_lsn);
+
+ return remote_lsn;
+}
+
+
+
+/* ---------------------------------------------------------------------------
+ * SQL functions for working with replication origin.
+ *
+ * These mostly should be fairly short wrappers around more generic functions.
+ * ---------------------------------------------------------------------------
+ */
+
+/*
+ * Create replication origin for the passed in name, and return the assigned
+ * oid.
+ */
+Datum
+pg_replication_origin_create(PG_FUNCTION_ARGS)
+{
+ char *name;
+ RepOriginId roident;
+
+ replorigin_check_prerequisites(false, false);
+
+ name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+ roident = replorigin_create(name);
+
+ pfree(name);
+
+ PG_RETURN_OID(roident);
+}
+
+/*
+ * Drop replication origin.
+ */
+Datum
+pg_replication_origin_drop(PG_FUNCTION_ARGS)
+{
+ char *name;
+ RepOriginId roident;
+
+ replorigin_check_prerequisites(false, false);
+
+ name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+
+ roident = replorigin_by_name(name, false);
+ Assert(OidIsValid(roident));
+
+ replorigin_drop(roident);
+
+ pfree(name);
+
+ PG_RETURN_VOID();
+}
+
+/*
+ * Return oid of a replication origin.
+ */
+Datum
+pg_replication_origin_oid(PG_FUNCTION_ARGS)
+{
+ char *name;
+ RepOriginId roident;
+
+ replorigin_check_prerequisites(false, false);
+
+ name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+ roident = replorigin_by_name(name, true);
+
+ pfree(name);
+
+ if (OidIsValid(roident))
+ PG_RETURN_OID(roident);
+ PG_RETURN_NULL();
+}
+
+/*
+ * Setup a replication origin for this session.
+ */
+Datum
+pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
+{
+ char *name;
+ RepOriginId origin;
+
+ replorigin_check_prerequisites(true, false);
+
+ name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+ origin = replorigin_by_name(name, false);
+ replorigin_session_setup(origin);
+
+ replorigin_sesssion_origin = origin;
+
+ pfree(name);
+
+ PG_RETURN_VOID();
+}
+
+/*
+ * Reset previously setup origin in this session
+ */
+Datum
+pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
+{
+ replorigin_check_prerequisites(true, false);
+
+ replorigin_session_reset();
+
+ /* FIXME */
+ replorigin_sesssion_origin = InvalidRepOriginId;
+ replorigin_sesssion_origin_lsn = InvalidXLogRecPtr;
+ replorigin_sesssion_origin_timestamp = 0;
+
+ PG_RETURN_VOID();
+}
+
+/*
+ * Has a replication origin been setup for this session.
+ */
+Datum
+pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
+{
+ replorigin_check_prerequisites(false, false);
+
+ PG_RETURN_BOOL(replorigin_sesssion_origin != InvalidRepOriginId);
+}
+
+
+/*
+ * Return the replication progress for origin setup in the current session.
+ *
+ * If 'flush' is set to true it is ensured that the returned value corresponds
+ * to a local transaction that has been flushed. this is useful if asychronous
+ * commits are used when replaying replicated transactions.
+ */
+Datum
+pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
+{
+ XLogRecPtr remote_lsn = InvalidXLogRecPtr;
+ bool flush = PG_GETARG_BOOL(0);
+
+ replorigin_check_prerequisites(true, false);
+
+ if (session_replication_state == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("no replication origin is configured")));
+
+ remote_lsn = replorigin_session_get_progress(flush);
+
+ if (remote_lsn == InvalidXLogRecPtr)
+ PG_RETURN_NULL();
+
+ PG_RETURN_LSN(remote_lsn);
+}
+
+Datum
+pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
+{
+ XLogRecPtr location = PG_GETARG_LSN(0);
+
+ replorigin_check_prerequisites(true, false);
+
+ if (session_replication_state == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("no replication origin is configured")));
+
+ replorigin_sesssion_origin_lsn = location;
+ replorigin_sesssion_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
+
+ PG_RETURN_VOID();
+}
+
+Datum
+pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
+{
+ replorigin_check_prerequisites(true, false);
+
+ replorigin_sesssion_origin_lsn = InvalidXLogRecPtr;
+ replorigin_sesssion_origin_timestamp = 0;
+
+ PG_RETURN_VOID();
+}
+
+
+Datum
+pg_replication_origin_advance(PG_FUNCTION_ARGS)
+{
+ text *name = PG_GETARG_TEXT_P(0);
+ XLogRecPtr remote_commit = PG_GETARG_LSN(1);
+ RepOriginId node;
+
+ replorigin_check_prerequisites(true, false);
+
+ /* lock to prevent the replication origin from vanishing */
+ LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+
+ node = replorigin_by_name(text_to_cstring(name), false);
+
+ /*
+ * Can't sensibly pass a local commit to be flushed at checkpoint - this
+ * xact hasn't committed yet. This is why this function should be used to
+ * set up the intial replication state, but not for replay.
+ */
+ replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
+ true /* go backward */, true /* wal log */);
+
+ UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+
+ PG_RETURN_VOID();
+}
+
+
+/*
+ * Return the replication progress for an individual replication origin.
+ *
+ * If 'flush' is set to true it is ensured that the returned value corresponds
+ * to a local transaction that has been flushed. this is useful if asychronous
+ * commits are used when replaying replicated transactions.
+ */
+Datum
+pg_replication_origin_progress(PG_FUNCTION_ARGS)
+{
+ char *name;
+ bool flush;
+ RepOriginId roident;
+ XLogRecPtr remote_lsn = InvalidXLogRecPtr;
+
+ replorigin_check_prerequisites(true, true);
+
+ name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
+ flush = PG_GETARG_BOOL(1);
+
+ roident = replorigin_by_name(name, false);
+ Assert(OidIsValid(roident));
+
+ remote_lsn = replorigin_get_progress(roident, flush);
+
+ if (remote_lsn == InvalidXLogRecPtr)
+ PG_RETURN_NULL();
+
+ PG_RETURN_LSN(remote_lsn);
+}
+
+
+Datum
+pg_show_replication_origin_status(PG_FUNCTION_ARGS)
+{
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ TupleDesc tupdesc;
+ Tuplestorestate *tupstore;
+ MemoryContext per_query_ctx;
+ MemoryContext oldcontext;
+ int i;
+#define REPLICATION_ORIGIN_PROGRESS_COLS 4
+
+ /* we we want to return 0 rows if slot is set to zero */
+ replorigin_check_prerequisites(false, true);
+
+ if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("set-valued function called in context that cannot accept a set")));
+ if (!(rsinfo->allowedModes & SFRM_Materialize))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("materialize mode required, but it is not allowed in this context")));
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ if (tupdesc->natts != REPLICATION_ORIGIN_PROGRESS_COLS)
+ elog(ERROR, "wrong function definition");
+
+ per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+ oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+ tupstore = tuplestore_begin_heap(true, false, work_mem);
+ rsinfo->returnMode = SFRM_Materialize;
+ rsinfo->setResult = tupstore;
+ rsinfo->setDesc = tupdesc;
+
+ MemoryContextSwitchTo(oldcontext);
+
+
+ /* prevent slots from being concurrently dropped */
+ LWLockAcquire(ReplicationOriginLock, LW_SHARED);
+
+ /*
+ * Iterate through all possible replication_states, display if they are
+ * filled. Note that we do not take any locks, so slightly corrupted/out
+ * of date values are a possibility.
+ */
+ for (i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationState *state;
+ Datum values[REPLICATION_ORIGIN_PROGRESS_COLS];
+ bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
+ char *roname;
+
+ state = &replication_states[i];
+
+ /* unused slot, nothing to display */
+ if (state->roident == InvalidRepOriginId)
+ continue;
+
+ memset(values, 0, sizeof(values));
+ memset(nulls, 1, sizeof(nulls));
+
+ values[0] = ObjectIdGetDatum(state->roident);
+ nulls[0] = false;
+
+ /*
+ * We're not preventing the origin to be dropped concurrently, so
+ * silently accept that it might be gone.
+ */
+ if (replorigin_by_oid(state->roident, true,
+ &roname))
+ {
+ values[1] = CStringGetTextDatum(roname);
+ nulls[1] = false;
+ }
+
+ LWLockAcquire(&state->lock, LW_SHARED);
+
+ values[ 2] = LSNGetDatum(state->remote_lsn);
+ nulls[2] = false;
+
+ values[3] = LSNGetDatum(state->local_lsn);
+ nulls[3] = false;
+
+ LWLockRelease(&state->lock);
+
+ tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+ }
+
+ tuplestore_donestoring(tupstore);
+
+ LWLockRelease(ReplicationOriginLock);
+
+#undef REPLICATION_ORIGIN_PROGRESS_COLS
+
+ return (Datum) 0;
+}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index dc855830c4..c9c1d1036e 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1255,7 +1255,8 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
void
ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
- TimestampTz commit_time)
+ TimestampTz commit_time,
+ RepOriginId origin_id, XLogRecPtr origin_lsn)
{
ReorderBufferTXN *txn;
volatile Snapshot snapshot_now;
@@ -1273,6 +1274,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
txn->final_lsn = commit_lsn;
txn->end_lsn = end_lsn;
txn->commit_time = commit_time;
+ txn->origin_id = origin_id;
+ txn->origin_lsn = origin_lsn;
/* serialize the last bunch of changes if we need start earlier anyway */
if (txn->nentries_mem != txn->nentries)
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 16b9808686..32ac58f7d1 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -31,6 +31,7 @@
#include "replication/slot.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
+#include "replication/origin.h"
#include "storage/bufmgr.h"
#include "storage/dsm.h"
#include "storage/ipc.h"
@@ -132,6 +133,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
size = add_size(size, CheckpointerShmemSize());
size = add_size(size, AutoVacuumShmemSize());
size = add_size(size, ReplicationSlotsShmemSize());
+ size = add_size(size, ReplicationOriginShmemSize());
size = add_size(size, WalSndShmemSize());
size = add_size(size, WalRcvShmemSize());
size = add_size(size, BTreeShmemSize());
@@ -238,6 +240,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
CheckpointerShmemInit();
AutoVacuumShmemInit();
ReplicationSlotsShmemInit();
+ ReplicationOriginShmemInit();
WalSndShmemInit();
WalRcvShmemInit();
diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c
index 644bbcc167..f58e1cebf2 100644
--- a/src/backend/utils/cache/syscache.c
+++ b/src/backend/utils/cache/syscache.c
@@ -54,6 +54,7 @@
#include "catalog/pg_shdepend.h"
#include "catalog/pg_shdescription.h"
#include "catalog/pg_shseclabel.h"
+#include "catalog/pg_replication_origin.h"
#include "catalog/pg_statistic.h"
#include "catalog/pg_tablespace.h"
#include "catalog/pg_transform.h"
@@ -621,6 +622,28 @@ static const struct cachedesc cacheinfo[] = {
},
128
},
+ {ReplicationOriginRelationId, /* REPLORIGIDENT */
+ ReplicationOriginIdentIndex,
+ 1,
+ {
+ Anum_pg_replication_origin_roident,
+ 0,
+ 0,
+ 0
+ },
+ 16
+ },
+ {ReplicationOriginRelationId, /* REPLORIGNAME */
+ ReplicationOriginNameIndex,
+ 1,
+ {
+ Anum_pg_replication_origin_roname,
+ 0,
+ 0,
+ 0
+ },
+ 16
+ },
{RewriteRelationId, /* RULERELNAME */
RewriteRelRulenameIndexId,
2,
diff --git a/src/bin/pg_resetxlog/pg_resetxlog.c b/src/bin/pg_resetxlog/pg_resetxlog.c
index a0805d86b0..4a22575736 100644
--- a/src/bin/pg_resetxlog/pg_resetxlog.c
+++ b/src/bin/pg_resetxlog/pg_resetxlog.c
@@ -56,6 +56,8 @@
#include "common/restricted_token.h"
#include "storage/large_object.h"
#include "pg_getopt.h"
+#include "replication/logical.h"
+#include "replication/origin.h"
static ControlFileData ControlFile; /* pg_control values */
@@ -1091,6 +1093,7 @@ WriteEmptyXLOG(void)
record->xl_tot_len = SizeOfXLogRecord + SizeOfXLogRecordDataHeaderShort + sizeof(CheckPoint);
record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
record->xl_rmid = RM_XLOG_ID;
+
recptr += SizeOfXLogRecord;
*(recptr++) = XLR_BLOCK_ID_DATA_SHORT;
*(recptr++) = sizeof(CheckPoint);
diff --git a/src/include/access/commit_ts.h b/src/include/access/commit_ts.h
index 93d1217f76..ad44db357a 100644
--- a/src/include/access/commit_ts.h
+++ b/src/include/access/commit_ts.h
@@ -13,6 +13,7 @@
#include "access/xlog.h"
#include "datatype/timestamp.h"
+#include "replication/origin.h"
#include "utils/guc.h"
@@ -21,18 +22,13 @@ extern PGDLLIMPORT bool track_commit_timestamp;
extern bool check_track_commit_timestamp(bool *newval, void **extra,
GucSource source);
-typedef uint32 CommitTsNodeId;
-#define InvalidCommitTsNodeId 0
-
-extern void CommitTsSetDefaultNodeId(CommitTsNodeId nodeid);
-extern CommitTsNodeId CommitTsGetDefaultNodeId(void);
extern void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
TransactionId *subxids, TimestampTz timestamp,
- CommitTsNodeId nodeid, bool do_xlog);
+ RepOriginId nodeid, bool do_xlog);
extern bool TransactionIdGetCommitTsData(TransactionId xid,
- TimestampTz *ts, CommitTsNodeId *nodeid);
+ TimestampTz *ts, RepOriginId *nodeid);
extern TransactionId GetLatestCommitTsData(TimestampTz *ts,
- CommitTsNodeId *nodeid);
+ RepOriginId *nodeid);
extern Size CommitTsShmemBuffers(void);
extern Size CommitTsShmemSize(void);
@@ -58,7 +54,7 @@ extern void AdvanceOldestCommitTs(TransactionId oldestXact);
typedef struct xl_commit_ts_set
{
TimestampTz timestamp;
- CommitTsNodeId nodeid;
+ RepOriginId nodeid;
TransactionId mainxid;
/* subxact Xids follow */
} xl_commit_ts_set;
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index 48f04c6171..47033da017 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -44,3 +44,4 @@ PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL)
PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup)
PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
+PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL)
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 8da6aa952f..cad1bb1d31 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -131,6 +131,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
#define XACT_XINFO_HAS_RELFILENODES (1U << 2)
#define XACT_XINFO_HAS_INVALS (1U << 3)
#define XACT_XINFO_HAS_TWOPHASE (1U << 4)
+#define XACT_XINFO_HAS_ORIGIN (1U << 5)
/*
* Also stored in xinfo, these indicating a variety of additional actions that
@@ -217,6 +218,12 @@ typedef struct xl_xact_twophase
} xl_xact_twophase;
#define MinSizeOfXactInvals offsetof(xl_xact_invals, msgs)
+typedef struct xl_xact_origin
+{
+ XLogRecPtr origin_lsn;
+ TimestampTz origin_timestamp;
+} xl_xact_origin;
+
typedef struct xl_xact_commit
{
TimestampTz xact_time; /* time of commit */
@@ -227,6 +234,7 @@ typedef struct xl_xact_commit
/* xl_xact_relfilenodes follows if XINFO_HAS_RELFILENODES */
/* xl_xact_invals follows if XINFO_HAS_INVALS */
/* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */
+ /* xl_xact_origin follows if XINFO_HAS_ORIGIN */
} xl_xact_commit;
#define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz))
@@ -267,6 +275,9 @@ typedef struct xl_xact_parsed_commit
SharedInvalidationMessage *msgs;
TransactionId twophase_xid; /* only for 2PC */
+
+ XLogRecPtr origin_lsn;
+ TimestampTz origin_timestamp;
} xl_xact_parsed_commit;
typedef struct xl_xact_parsed_abort
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 2b1f42389c..f08b6767ed 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -85,6 +85,7 @@ typedef enum
} RecoveryTargetType;
extern XLogRecPtr XactLastRecEnd;
+extern PGDLLIMPORT XLogRecPtr XactLastCommitEnd;
extern bool reachedConsistency;
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index deca1de67b..75cf435e90 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -31,7 +31,7 @@
/*
* Each page of XLOG file has a header like this:
*/
-#define XLOG_PAGE_MAGIC 0xD083 /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD085 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData
{
diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h
index 6638c1d422..18a3e7ca90 100644
--- a/src/include/access/xlogdefs.h
+++ b/src/include/access/xlogdefs.h
@@ -44,6 +44,12 @@ typedef uint64 XLogSegNo;
*/
typedef uint32 TimeLineID;
+/*
+ * Replication origin id - this is located in this file to avoid having to
+ * include origin.h in a bunch of xlog related places.
+ */
+typedef uint16 RepOriginId;
+
/*
* Because O_DIRECT bypasses the kernel buffers, and because we never
* read those buffers except during crash recovery or if wal_level != minimal,
diff --git a/src/include/access/xloginsert.h b/src/include/access/xloginsert.h
index 6864c95b2c..ac609298cc 100644
--- a/src/include/access/xloginsert.h
+++ b/src/include/access/xloginsert.h
@@ -39,6 +39,7 @@
/* prototypes for public functions in xloginsert.c: */
extern void XLogBeginInsert(void);
+extern void XLogIncludeOrigin(void);
extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info);
extern void XLogEnsureRecordSpace(int nbuffers, int ndatas);
extern void XLogRegisterData(char *data, int len);
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 609bfe3e40..5164abec75 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -127,6 +127,8 @@ struct XLogReaderState
uint32 main_data_len; /* main data portion's length */
uint32 main_data_bufsz; /* allocated size of the buffer */
+ RepOriginId record_origin;
+
/* information about blocks referenced by the record. */
DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1];
@@ -186,6 +188,7 @@ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
#define XLogRecGetInfo(decoder) ((decoder)->decoded_record->xl_info)
#define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid)
#define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid)
+#define XLogRecGetOrigin(decoder) ((decoder)->record_origin)
#define XLogRecGetData(decoder) ((decoder)->main_data)
#define XLogRecGetDataLen(decoder) ((decoder)->main_data_len)
#define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0)
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index b487ae0cc8..7a049f0e97 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -212,5 +212,6 @@ typedef struct XLogRecordDataHeaderLong
#define XLR_BLOCK_ID_DATA_SHORT 255
#define XLR_BLOCK_ID_DATA_LONG 254
+#define XLR_BLOCK_ID_ORIGIN 253
#endif /* XLOGRECORD_H */
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index b36e4edd84..e8334025e1 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 201504261
+#define CATALOG_VERSION_NO 201504291
#endif
diff --git a/src/include/catalog/indexing.h b/src/include/catalog/indexing.h
index a234bde293..71e0010a6f 100644
--- a/src/include/catalog/indexing.h
+++ b/src/include/catalog/indexing.h
@@ -310,6 +310,12 @@ DECLARE_UNIQUE_INDEX(pg_policy_oid_index, 3257, on pg_policy using btree(oid oid
DECLARE_UNIQUE_INDEX(pg_policy_polrelid_polname_index, 3258, on pg_policy using btree(polrelid oid_ops, polname name_ops));
#define PolicyPolrelidPolnameIndexId 3258
+DECLARE_UNIQUE_INDEX(pg_replication_origin_roiident_index, 6001, on pg_replication_origin using btree(roident oid_ops));
+#define ReplicationOriginIdentIndex 6001
+
+DECLARE_UNIQUE_INDEX(pg_replication_origin_roname_index, 6002, on pg_replication_origin using btree(roname varchar_pattern_ops));
+#define ReplicationOriginNameIndex 6002
+
/* last step of initialization script: build the indexes declared above */
BUILD_INDICES
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index e97e6b1944..55c246e73d 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5203,6 +5203,42 @@ DESCR("for use by pg_upgrade");
DATA(insert OID = 3591 ( binary_upgrade_create_empty_extension PGNSP PGUID 12 1 0 0 0 f f f f f f v 7 0 2278 "25 25 16 25 1028 1009 1009" _null_ _null_ _null_ _null_ _null_ binary_upgrade_create_empty_extension _null_ _null_ _null_ ));
DESCR("for use by pg_upgrade");
+/* replication/origin.h */
+DATA(insert OID = 6003 ( pg_replication_origin_create PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 26 "25" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_create _null_ _null_ _null_ ));
+DESCR("create a replication origin");
+
+DATA(insert OID = 6004 ( pg_replication_origin_drop PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 2278 "25" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_drop _null_ _null_ _null_ ));
+DESCR("drop replication origin identified by its name");
+
+DATA(insert OID = 6005 ( pg_replication_origin_oid PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 26 "25" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_oid _null_ _null_ _null_ ));
+DESCR("translate the replication origin's name to its id");
+
+DATA(insert OID = 6006 ( pg_replication_origin_session_setup PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 2278 "25" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_session_setup _null_ _null_ _null_ ));
+DESCR("configure session to maintain replication progress tracking for the passed in origin");
+
+DATA(insert OID = 6007 ( pg_replication_origin_session_reset PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 2278 "" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_session_reset _null_ _null_ _null_ ));
+DESCR("teardown configured replication progress tracking");
+
+DATA(insert OID = 6008 ( pg_replication_origin_session_is_setup PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 16 "" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_session_is_setup _null_ _null_ _null_ ));
+DESCR("is a replication origin configured in this session");
+
+DATA(insert OID = 6009 ( pg_replication_origin_session_progress PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 3220 "16" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_session_progress _null_ _null_ _null_ ));
+DESCR("get the replication progress of the current session");
+
+DATA(insert OID = 6010 ( pg_replication_origin_xact_setup PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 2278 "3220 1184" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_xact_setup _null_ _null_ _null_ ));
+DESCR("setup the transaction's origin lsn and timestamp");
+
+DATA(insert OID = 6011 ( pg_replication_origin_xact_reset PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 2278 "3220 1184" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_xact_reset _null_ _null_ _null_ ));
+DESCR("reset the transaction's origin lsn and timestamp");
+
+DATA(insert OID = 6012 ( pg_replication_origin_advance PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 2278 "25 3220" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_advance _null_ _null_ _null_ ));
+DESCR("advance replication itentifier to specific location");
+
+DATA(insert OID = 6013 ( pg_replication_origin_progress PGNSP PGUID 12 1 0 0 0 f f f f t f v 2 0 3220 "25 16" _null_ _null_ _null_ _null_ _null_ pg_replication_origin_progress _null_ _null_ _null_ ));
+DESCR("get an individual replication origin's replication progress");
+
+DATA(insert OID = 6014 ( pg_show_replication_origin_status PGNSP PGUID 12 1 100 0 0 f f f f f t v 0 0 2249 "" "{26,25,3220,3220}" "{o,o,o,o}" "{local_id, external_id, remote_lsn, local_lsn}" _null_ _null_ pg_show_replication_origin_status _null_ _null_ _null_ ));
+DESCR("get progress for all replication origins");
/*
* Symbolic values for provolatile column: these indicate whether the result
diff --git a/src/include/catalog/pg_replication_origin.h b/src/include/catalog/pg_replication_origin.h
new file mode 100644
index 0000000000..3483809034
--- /dev/null
+++ b/src/include/catalog/pg_replication_origin.h
@@ -0,0 +1,70 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_replication_origin.h
+ * Persistent replication origin registry
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/catalog/pg_replication_origin.h
+ *
+ * NOTES
+ * the genbki.pl script reads this file and generates .bki
+ * information from the DATA() statements.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_REPLICATION_ORIGIN_H
+#define PG_REPLICATION_ORIGIN_H
+
+#include "catalog/genbki.h"
+#include "access/xlogdefs.h"
+
+/* ----------------
+ * pg_replication_origin. cpp turns this into
+ * typedef struct FormData_pg_replication_origin
+ * ----------------
+ */
+#define ReplicationOriginRelationId 6000
+
+CATALOG(pg_replication_origin,6000) BKI_SHARED_RELATION BKI_WITHOUT_OIDS
+{
+ /*
+ * Locally known id that get included into WAL.
+ *
+ * This should never leave the system.
+ *
+ * Needs to fit into a uint16, so we don't waste too much space in WAL
+ * records. For this reason we don't use a normal Oid column here, since
+ * we need to handle allocation of new values manually.
+ */
+ Oid roident;
+
+ /*
+ * Variable-length fields start here, but we allow direct access to
+ * roname.
+ */
+
+ /* external, free-format, name */
+ text roname BKI_FORCE_NOT_NULL;
+
+#ifdef CATALOG_VARLEN /* further variable-length fields */
+#endif
+} FormData_pg_replication_origin;
+
+typedef FormData_pg_replication_origin *Form_pg_replication_origin;
+
+/* ----------------
+ * compiler constants for pg_replication_origin
+ * ----------------
+ */
+#define Natts_pg_replication_origin 2
+#define Anum_pg_replication_origin_roident 1
+#define Anum_pg_replication_origin_roname 2
+
+/* ----------------
+ * pg_replication_origin has no initial contents
+ * ----------------
+ */
+
+#endif /* PG_REPLICATION_ORIGIN_H */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index cce4394d4e..dfdbe6535f 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -97,4 +97,6 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
XLogRecPtr restart_lsn);
extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
+extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
+
#endif
diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h
new file mode 100644
index 0000000000..ca26bc3e64
--- /dev/null
+++ b/src/include/replication/origin.h
@@ -0,0 +1,86 @@
+/*-------------------------------------------------------------------------
+ * origin.h
+ * Exports from replication/logical/origin.c
+ *
+ * Copyright (c) 2013-2015, PostgreSQL Global Development Group
+ *
+ * src/include/replication/origin.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_ORIGIN_H
+#define PG_ORIGIN_H
+
+#include "access/xlogdefs.h"
+#include "catalog/pg_replication_origin.h"
+#include "replication/logical.h"
+
+typedef struct xl_replorigin_set
+{
+ XLogRecPtr remote_lsn;
+ RepOriginId node_id;
+ bool force;
+} xl_replorigin_set;
+
+typedef struct xl_replorigin_drop
+{
+ RepOriginId node_id;
+} xl_replorigin_drop;
+
+#define XLOG_REPLORIGIN_SET 0x00
+#define XLOG_REPLORIGIN_DROP 0x10
+
+#define InvalidRepOriginId 0
+#define DoNotReplicateId UINT16_MAX
+
+extern PGDLLIMPORT RepOriginId replorigin_sesssion_origin;
+extern PGDLLIMPORT XLogRecPtr replorigin_sesssion_origin_lsn;
+extern PGDLLIMPORT TimestampTz replorigin_sesssion_origin_timestamp;
+
+/* API for querying & manipulating replication origins */
+extern RepOriginId replorigin_by_name(char *name, bool missing_ok);
+extern RepOriginId replorigin_create(char *name);
+extern void replorigin_drop(RepOriginId roident);
+extern bool replorigin_by_oid(RepOriginId roident, bool missing_ok,
+ char **roname);
+
+/* API for querying & manipulating replication progress tracking */
+extern void replorigin_advance(RepOriginId node,
+ XLogRecPtr remote_commit,
+ XLogRecPtr local_commit,
+ bool go_backward, bool wal_log);
+extern XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush);
+
+extern void replorigin_session_advance(XLogRecPtr remote_commit,
+ XLogRecPtr local_commit);
+extern void replorigin_session_setup(RepOriginId node);
+extern void replorigin_session_reset(void);
+extern XLogRecPtr replorigin_session_get_progress(bool flush);
+
+/* Checkpoint/Startup integration */
+extern void CheckPointReplicationOrigin(void);
+extern void StartupReplicationOrigin(void);
+
+/* WAL logging */
+void replorigin_redo(XLogReaderState *record);
+void replorigin_desc(StringInfo buf, XLogReaderState *record);
+const char * replorigin_identify(uint8 info);
+
+/* shared memory allocation */
+extern Size ReplicationOriginShmemSize(void);
+extern void ReplicationOriginShmemInit(void);
+
+/* SQL callable functions */
+extern Datum pg_replication_origin_create(PG_FUNCTION_ARGS);
+extern Datum pg_replication_origin_drop(PG_FUNCTION_ARGS);
+extern Datum pg_replication_origin_oid(PG_FUNCTION_ARGS);
+extern Datum pg_replication_origin_session_setup(PG_FUNCTION_ARGS);
+extern Datum pg_replication_origin_session_reset(PG_FUNCTION_ARGS);
+extern Datum pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS);
+extern Datum pg_replication_origin_session_progress(PG_FUNCTION_ARGS);
+extern Datum pg_replication_origin_xact_setup(PG_FUNCTION_ARGS);
+extern Datum pg_replication_origin_xact_reset(PG_FUNCTION_ARGS);
+extern Datum pg_replication_origin_advance(PG_FUNCTION_ARGS);
+extern Datum pg_replication_origin_progress(PG_FUNCTION_ARGS);
+extern Datum pg_show_replication_origin_status(PG_FUNCTION_ARGS);
+
+#endif /* PG_ORIGIN_H */
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 0935c1bac3..bec1a56017 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -73,6 +73,13 @@ typedef void (*LogicalDecodeCommitCB) (
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
+/*
+ * Filter changes by origin.
+ */
+typedef bool (*LogicalDecodeFilterByOriginCB) (
+ struct LogicalDecodingContext *,
+ RepOriginId origin_id);
+
/*
* Called to shutdown an output plugin.
*/
@@ -89,6 +96,7 @@ typedef struct OutputPluginCallbacks
LogicalDecodeBeginCB begin_cb;
LogicalDecodeChangeCB change_cb;
LogicalDecodeCommitCB commit_cb;
+ LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
} OutputPluginCallbacks;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index f1e0f57e7c..6a5528a734 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -68,6 +68,8 @@ typedef struct ReorderBufferChange
/* The type of change. */
enum ReorderBufferChangeType action;
+ RepOriginId origin_id;
+
/*
* Context data for the change, which part of the union is valid depends
* on action/action_internal.
@@ -166,6 +168,10 @@ typedef struct ReorderBufferTXN
*/
XLogRecPtr restart_decoding_lsn;
+ /* origin of the change that caused this transaction */
+ RepOriginId origin_id;
+ XLogRecPtr origin_lsn;
+
/*
* Commit time, only known when we read the actual commit record.
*/
@@ -339,7 +345,7 @@ void ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *);
void ReorderBufferCommit(ReorderBuffer *, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
- TimestampTz commit_time);
+ TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
void ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
void ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index e3c2efc1f3..cff3b99922 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -134,8 +134,9 @@ extern PGDLLIMPORT LWLockPadded *MainLWLockArray;
#define ReplicationSlotControlLock (&MainLWLockArray[37].lock)
#define CommitTsControlLock (&MainLWLockArray[38].lock)
#define CommitTsLock (&MainLWLockArray[39].lock)
+#define ReplicationOriginLock (&MainLWLockArray[40].lock)
-#define NUM_INDIVIDUAL_LWLOCKS 40
+#define NUM_INDIVIDUAL_LWLOCKS 41
/*
* It's a bit odd to declare NUM_BUFFER_PARTITIONS and NUM_LOCK_PARTITIONS
diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h
index ff9a4f2af3..6634099cbe 100644
--- a/src/include/utils/syscache.h
+++ b/src/include/utils/syscache.h
@@ -77,6 +77,8 @@ enum SysCacheIdentifier
RANGETYPE,
RELNAMENSP,
RELOID,
+ REPLORIGIDENT,
+ REPLORIGNAME,
RULERELNAME,
STATRELATTINH,
TABLESPACEOID,
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 25095e5b70..f7f016be21 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1390,6 +1390,11 @@ pg_prepared_xacts| SELECT p.transaction,
FROM ((pg_prepared_xact() p(transaction, gid, prepared, ownerid, dbid)
LEFT JOIN pg_authid u ON ((p.ownerid = u.oid)))
LEFT JOIN pg_database d ON ((p.dbid = d.oid)));
+pg_replication_origin_status| SELECT pg_show_replication_origin_status.local_id,
+ pg_show_replication_origin_status.external_id,
+ pg_show_replication_origin_status.remote_lsn,
+ pg_show_replication_origin_status.local_lsn
+ FROM pg_show_replication_origin_status() pg_show_replication_origin_status(local_id, external_id, remote_lsn, local_lsn);
pg_replication_slots| SELECT l.slot_name,
l.plugin,
l.slot_type,
diff --git a/src/test/regress/expected/sanity_check.out b/src/test/regress/expected/sanity_check.out
index 52c9e778cb..eb0bc88ef1 100644
--- a/src/test/regress/expected/sanity_check.out
+++ b/src/test/regress/expected/sanity_check.out
@@ -121,6 +121,7 @@ pg_pltemplate|t
pg_policy|t
pg_proc|t
pg_range|t
+pg_replication_origin|t
pg_rewrite|t
pg_seclabel|t
pg_shdepend|t