Include replication origins in SQL functions for commit timestamp
This includes two changes: - Addition of a new function pg_xact_commit_timestamp_origin() able, for a given transaction ID, to return the commit timestamp and replication origin of this transaction. An equivalent function existed in pglogical. - Addition of the replication origin to pg_last_committed_xact(). The commit timestamp manager includes already APIs able to return the replication origin of a transaction on top of its commit timestamp, but the code paths for replication origins were never stressed as those functions have never looked for a replication origin, and the SQL functions available have never included this information since their introduction in 73c986a. While on it, refactor a test of modules/commit_ts/ to use tstzrange() to check that a transaction timestamp is within the wanted range, making the test a bit easier to read. Bump catalog version. Author: Movead Li Reviewed-by: Madan Kumar, Michael Paquier Discussion: https://postgr.es/m/2020051116430836450630@highgo.ca
This commit is contained in:
parent
cd22d3cdb9
commit
b1e48bbe64
@ -23397,6 +23397,21 @@ SELECT collation for ('foo' COLLATE "de_DE");
|
||||
</para></entry>
|
||||
</row>
|
||||
|
||||
<row>
|
||||
<entry role="func_table_entry"><para role="func_signature">
|
||||
<indexterm>
|
||||
<primary>pg_xact_commit_timestamp_origin</primary>
|
||||
</indexterm>
|
||||
<function>pg_xact_commit_timestamp_origin</function> ( <type>xid</type> )
|
||||
<returnvalue>record</returnvalue>
|
||||
( <parameter>timestamp</parameter> <type>timestamp with time zone</type>,
|
||||
<parameter>roident</parameter> <type>oid</type>)
|
||||
</para>
|
||||
<para>
|
||||
Returns the commit timestamp and replication origin of a transaction.
|
||||
</para></entry>
|
||||
</row>
|
||||
|
||||
<row>
|
||||
<entry role="func_table_entry"><para role="func_signature">
|
||||
<indexterm>
|
||||
@ -23405,11 +23420,12 @@ SELECT collation for ('foo' COLLATE "de_DE");
|
||||
<function>pg_last_committed_xact</function> ()
|
||||
<returnvalue>record</returnvalue>
|
||||
( <parameter>xid</parameter> <type>xid</type>,
|
||||
<parameter>timestamp</parameter> <type>timestamp with time zone</type> )
|
||||
<parameter>timestamp</parameter> <type>timestamp with time zone</type>,
|
||||
<parameter>roident</parameter> <type>oid</type> )
|
||||
</para>
|
||||
<para>
|
||||
Returns the transaction ID and commit timestamp of the latest
|
||||
committed transaction.
|
||||
Returns the transaction ID, commit timestamp and replication origin
|
||||
of the latest committed transaction.
|
||||
</para></entry>
|
||||
</row>
|
||||
</tbody>
|
||||
|
@ -361,7 +361,7 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
|
||||
* is concerned, anyway; it's up to the caller to ensure the value is useful
|
||||
* for its purposes.)
|
||||
*
|
||||
* ts and extra are filled with the corresponding data; they can be passed
|
||||
* ts and nodeid are filled with the corresponding data; they can be passed
|
||||
* as NULL if not wanted.
|
||||
*/
|
||||
TransactionId
|
||||
@ -417,28 +417,38 @@ pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* pg_last_committed_xact
|
||||
*
|
||||
* SQL-callable wrapper to obtain some information about the latest
|
||||
* committed transaction: transaction ID, timestamp and replication
|
||||
* origin.
|
||||
*/
|
||||
Datum
|
||||
pg_last_committed_xact(PG_FUNCTION_ARGS)
|
||||
{
|
||||
TransactionId xid;
|
||||
RepOriginId nodeid;
|
||||
TimestampTz ts;
|
||||
Datum values[2];
|
||||
bool nulls[2];
|
||||
Datum values[3];
|
||||
bool nulls[3];
|
||||
TupleDesc tupdesc;
|
||||
HeapTuple htup;
|
||||
|
||||
/* and construct a tuple with our data */
|
||||
xid = GetLatestCommitTsData(&ts, NULL);
|
||||
xid = GetLatestCommitTsData(&ts, &nodeid);
|
||||
|
||||
/*
|
||||
* Construct a tuple descriptor for the result row. This must match this
|
||||
* function's pg_proc entry!
|
||||
*/
|
||||
tupdesc = CreateTemplateTupleDesc(2);
|
||||
tupdesc = CreateTemplateTupleDesc(3);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
|
||||
XIDOID, -1, 0);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
|
||||
TIMESTAMPTZOID, -1, 0);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "roident",
|
||||
OIDOID, -1, 0);
|
||||
tupdesc = BlessTupleDesc(tupdesc);
|
||||
|
||||
if (!TransactionIdIsNormal(xid))
|
||||
@ -452,6 +462,9 @@ pg_last_committed_xact(PG_FUNCTION_ARGS)
|
||||
|
||||
values[1] = TimestampTzGetDatum(ts);
|
||||
nulls[1] = false;
|
||||
|
||||
values[2] = ObjectIdGetDatum((Oid) nodeid);
|
||||
nulls[2] = false;
|
||||
}
|
||||
|
||||
htup = heap_form_tuple(tupdesc, values, nulls);
|
||||
@ -459,6 +472,54 @@ pg_last_committed_xact(PG_FUNCTION_ARGS)
|
||||
PG_RETURN_DATUM(HeapTupleGetDatum(htup));
|
||||
}
|
||||
|
||||
/*
|
||||
* pg_xact_commit_timestamp_origin
|
||||
*
|
||||
* SQL-callable wrapper to obtain commit timestamp and replication origin
|
||||
* of a given transaction.
|
||||
*/
|
||||
Datum
|
||||
pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
|
||||
{
|
||||
TransactionId xid = PG_GETARG_UINT32(0);
|
||||
RepOriginId nodeid;
|
||||
TimestampTz ts;
|
||||
Datum values[2];
|
||||
bool nulls[2];
|
||||
TupleDesc tupdesc;
|
||||
HeapTuple htup;
|
||||
bool found;
|
||||
|
||||
found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
|
||||
|
||||
/*
|
||||
* Construct a tuple descriptor for the result row. This must match this
|
||||
* function's pg_proc entry!
|
||||
*/
|
||||
tupdesc = CreateTemplateTupleDesc(2);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "timestamp",
|
||||
TIMESTAMPTZOID, -1, 0);
|
||||
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "roident",
|
||||
OIDOID, -1, 0);
|
||||
tupdesc = BlessTupleDesc(tupdesc);
|
||||
|
||||
if (!found)
|
||||
{
|
||||
memset(nulls, true, sizeof(nulls));
|
||||
}
|
||||
else
|
||||
{
|
||||
values[0] = TimestampTzGetDatum(ts);
|
||||
nulls[0] = false;
|
||||
|
||||
values[1] = ObjectIdGetDatum((Oid) nodeid);
|
||||
nulls[1] = false;
|
||||
}
|
||||
|
||||
htup = heap_form_tuple(tupdesc, values, nulls);
|
||||
|
||||
PG_RETURN_DATUM(HeapTupleGetDatum(htup));
|
||||
}
|
||||
|
||||
/*
|
||||
* Number of shared CommitTS buffers.
|
||||
|
@ -53,6 +53,6 @@
|
||||
*/
|
||||
|
||||
/* yyyymmddN */
|
||||
#define CATALOG_VERSION_NO 202007072
|
||||
#define CATALOG_VERSION_NO 202007121
|
||||
|
||||
#endif
|
||||
|
@ -5946,12 +5946,21 @@
|
||||
prorettype => 'timestamptz', proargtypes => 'xid',
|
||||
prosrc => 'pg_xact_commit_timestamp' },
|
||||
|
||||
{ oid => '8456',
|
||||
descr => 'get commit timestamp and replication origin of a transaction',
|
||||
proname => 'pg_xact_commit_timestamp_origin', provolatile => 'v',
|
||||
prorettype => 'record', proargtypes => 'xid',
|
||||
proallargtypes => '{xid,timestamptz,oid}', proargmodes => '{i,o,o}',
|
||||
proargnames => '{xid,timestamp,roident}',
|
||||
prosrc => 'pg_xact_commit_timestamp_origin' },
|
||||
|
||||
{ oid => '3583',
|
||||
descr => 'get transaction Id and commit timestamp of latest transaction commit',
|
||||
descr => 'get transaction Id, commit timestamp and replication origin of latest transaction commit',
|
||||
proname => 'pg_last_committed_xact', provolatile => 'v',
|
||||
prorettype => 'record', proargtypes => '',
|
||||
proallargtypes => '{xid,timestamptz}', proargmodes => '{o,o}',
|
||||
proargnames => '{xid,timestamp}', prosrc => 'pg_last_committed_xact' },
|
||||
proallargtypes => '{xid,timestamptz,oid}', proargmodes => '{o,o,o}',
|
||||
proargnames => '{xid,timestamp,roident}',
|
||||
prosrc => 'pg_last_committed_xact' },
|
||||
|
||||
{ oid => '3537', descr => 'get identification of SQL object',
|
||||
proname => 'pg_describe_object', provolatile => 's', prorettype => 'text',
|
||||
|
@ -39,9 +39,94 @@ SELECT pg_xact_commit_timestamp('2'::xid);
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT x.xid::text::bigint > 0, x.timestamp > '-infinity'::timestamptz, x.timestamp <= now() FROM pg_last_committed_xact() x;
|
||||
?column? | ?column? | ?column?
|
||||
----------+----------+----------
|
||||
t | t | t
|
||||
SELECT x.xid::text::bigint > 0 as xid_valid,
|
||||
x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range,
|
||||
roident != 0 AS valid_roident
|
||||
FROM pg_last_committed_xact() x;
|
||||
xid_valid | ts_in_range | valid_roident
|
||||
-----------+-------------+---------------
|
||||
t | t | f
|
||||
(1 row)
|
||||
|
||||
-- Test non-normal transaction ids.
|
||||
SELECT * FROM pg_xact_commit_timestamp_origin(NULL); -- ok, NULL
|
||||
timestamp | roident
|
||||
-----------+---------
|
||||
|
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM pg_xact_commit_timestamp_origin('0'::xid); -- error
|
||||
ERROR: cannot retrieve commit timestamp for transaction 0
|
||||
SELECT * FROM pg_xact_commit_timestamp_origin('1'::xid); -- ok, NULL
|
||||
timestamp | roident
|
||||
-----------+---------
|
||||
|
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM pg_xact_commit_timestamp_origin('2'::xid); -- ok, NULL
|
||||
timestamp | roident
|
||||
-----------+---------
|
||||
|
|
||||
(1 row)
|
||||
|
||||
-- Test transaction without replication origin
|
||||
SELECT txid_current() as txid_no_origin \gset
|
||||
SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range,
|
||||
roident != 0 AS valid_roident
|
||||
FROM pg_last_committed_xact() x;
|
||||
ts_in_range | valid_roident
|
||||
-------------+---------------
|
||||
t | f
|
||||
(1 row)
|
||||
|
||||
SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range,
|
||||
roident != 0 AS valid_roident
|
||||
FROM pg_xact_commit_timestamp_origin(:'txid_no_origin') x;
|
||||
ts_in_range | valid_roident
|
||||
-------------+---------------
|
||||
t | f
|
||||
(1 row)
|
||||
|
||||
-- Test transaction with replication origin
|
||||
SELECT pg_replication_origin_create('test_commit_ts: get_origin') != 0
|
||||
AS valid_roident;
|
||||
valid_roident
|
||||
---------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT pg_replication_origin_session_setup('test_commit_ts: get_origin');
|
||||
pg_replication_origin_session_setup
|
||||
-------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT txid_current() as txid_with_origin \gset
|
||||
SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range, r.roname
|
||||
FROM pg_last_committed_xact() x, pg_replication_origin r
|
||||
WHERE r.roident = x.roident;
|
||||
ts_in_range | roname
|
||||
-------------+----------------------------
|
||||
t | test_commit_ts: get_origin
|
||||
(1 row)
|
||||
|
||||
SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range, r.roname
|
||||
FROM pg_xact_commit_timestamp_origin(:'txid_with_origin') x, pg_replication_origin r
|
||||
WHERE r.roident = x.roident;
|
||||
ts_in_range | roname
|
||||
-------------+----------------------------
|
||||
t | test_commit_ts: get_origin
|
||||
(1 row)
|
||||
|
||||
SELECT pg_replication_origin_session_reset();
|
||||
pg_replication_origin_session_reset
|
||||
-------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT pg_replication_origin_drop('test_commit_ts: get_origin');
|
||||
pg_replication_origin_drop
|
||||
----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
|
@ -34,6 +34,79 @@ SELECT pg_xact_commit_timestamp('2'::xid);
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT x.xid::text::bigint > 0, x.timestamp > '-infinity'::timestamptz, x.timestamp <= now() FROM pg_last_committed_xact() x;
|
||||
SELECT x.xid::text::bigint > 0 as xid_valid,
|
||||
x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range,
|
||||
roident != 0 AS valid_roident
|
||||
FROM pg_last_committed_xact() x;
|
||||
ERROR: could not get commit timestamp data
|
||||
HINT: Make sure the configuration parameter "track_commit_timestamp" is set.
|
||||
-- Test non-normal transaction ids.
|
||||
SELECT * FROM pg_xact_commit_timestamp_origin(NULL); -- ok, NULL
|
||||
timestamp | roident
|
||||
-----------+---------
|
||||
|
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM pg_xact_commit_timestamp_origin('0'::xid); -- error
|
||||
ERROR: cannot retrieve commit timestamp for transaction 0
|
||||
SELECT * FROM pg_xact_commit_timestamp_origin('1'::xid); -- ok, NULL
|
||||
timestamp | roident
|
||||
-----------+---------
|
||||
|
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM pg_xact_commit_timestamp_origin('2'::xid); -- ok, NULL
|
||||
timestamp | roident
|
||||
-----------+---------
|
||||
|
|
||||
(1 row)
|
||||
|
||||
-- Test transaction without replication origin
|
||||
SELECT txid_current() as txid_no_origin \gset
|
||||
SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range,
|
||||
roident != 0 AS valid_roident
|
||||
FROM pg_last_committed_xact() x;
|
||||
ERROR: could not get commit timestamp data
|
||||
HINT: Make sure the configuration parameter "track_commit_timestamp" is set.
|
||||
SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range,
|
||||
roident != 0 AS valid_roident
|
||||
FROM pg_xact_commit_timestamp_origin(:'txid_no_origin') x;
|
||||
ERROR: could not get commit timestamp data
|
||||
HINT: Make sure the configuration parameter "track_commit_timestamp" is set.
|
||||
-- Test transaction with replication origin
|
||||
SELECT pg_replication_origin_create('test_commit_ts: get_origin') != 0
|
||||
AS valid_roident;
|
||||
valid_roident
|
||||
---------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT pg_replication_origin_session_setup('test_commit_ts: get_origin');
|
||||
pg_replication_origin_session_setup
|
||||
-------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT txid_current() as txid_with_origin \gset
|
||||
SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range, r.roname
|
||||
FROM pg_last_committed_xact() x, pg_replication_origin r
|
||||
WHERE r.roident = x.roident;
|
||||
ERROR: could not get commit timestamp data
|
||||
HINT: Make sure the configuration parameter "track_commit_timestamp" is set.
|
||||
SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range, r.roname
|
||||
FROM pg_xact_commit_timestamp_origin(:'txid_with_origin') x, pg_replication_origin r
|
||||
WHERE r.roident = x.roident;
|
||||
ERROR: could not get commit timestamp data
|
||||
HINT: Make sure the configuration parameter "track_commit_timestamp" is set.
|
||||
SELECT pg_replication_origin_session_reset();
|
||||
pg_replication_origin_session_reset
|
||||
-------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT pg_replication_origin_drop('test_commit_ts: get_origin');
|
||||
pg_replication_origin_drop
|
||||
----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
|
@ -21,4 +21,37 @@ SELECT pg_xact_commit_timestamp('0'::xid);
|
||||
SELECT pg_xact_commit_timestamp('1'::xid);
|
||||
SELECT pg_xact_commit_timestamp('2'::xid);
|
||||
|
||||
SELECT x.xid::text::bigint > 0, x.timestamp > '-infinity'::timestamptz, x.timestamp <= now() FROM pg_last_committed_xact() x;
|
||||
SELECT x.xid::text::bigint > 0 as xid_valid,
|
||||
x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range,
|
||||
roident != 0 AS valid_roident
|
||||
FROM pg_last_committed_xact() x;
|
||||
|
||||
-- Test non-normal transaction ids.
|
||||
SELECT * FROM pg_xact_commit_timestamp_origin(NULL); -- ok, NULL
|
||||
SELECT * FROM pg_xact_commit_timestamp_origin('0'::xid); -- error
|
||||
SELECT * FROM pg_xact_commit_timestamp_origin('1'::xid); -- ok, NULL
|
||||
SELECT * FROM pg_xact_commit_timestamp_origin('2'::xid); -- ok, NULL
|
||||
|
||||
-- Test transaction without replication origin
|
||||
SELECT txid_current() as txid_no_origin \gset
|
||||
SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range,
|
||||
roident != 0 AS valid_roident
|
||||
FROM pg_last_committed_xact() x;
|
||||
SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range,
|
||||
roident != 0 AS valid_roident
|
||||
FROM pg_xact_commit_timestamp_origin(:'txid_no_origin') x;
|
||||
|
||||
-- Test transaction with replication origin
|
||||
SELECT pg_replication_origin_create('test_commit_ts: get_origin') != 0
|
||||
AS valid_roident;
|
||||
SELECT pg_replication_origin_session_setup('test_commit_ts: get_origin');
|
||||
SELECT txid_current() as txid_with_origin \gset
|
||||
SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range, r.roname
|
||||
FROM pg_last_committed_xact() x, pg_replication_origin r
|
||||
WHERE r.roident = x.roident;
|
||||
SELECT x.timestamp <@ tstzrange('-infinity'::timestamptz, now()) AS ts_in_range, r.roname
|
||||
FROM pg_xact_commit_timestamp_origin(:'txid_with_origin') x, pg_replication_origin r
|
||||
WHERE r.roident = x.roident;
|
||||
|
||||
SELECT pg_replication_origin_session_reset();
|
||||
SELECT pg_replication_origin_drop('test_commit_ts: get_origin');
|
||||
|
Loading…
x
Reference in New Issue
Block a user