diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c index e5eef9ea43..b3e2fc3036 100644 --- a/src/backend/access/rmgrdesc/xactdesc.c +++ b/src/backend/access/rmgrdesc/xactdesc.c @@ -102,6 +102,14 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars parsed->twophase_xid = xl_twophase->xid; data += sizeof(xl_xact_twophase); + + if (parsed->xinfo & XACT_XINFO_HAS_GID) + { + int gidlen; + strlcpy(parsed->twophase_gid, data, sizeof(parsed->twophase_gid)); /* bounded: GID is read from the WAL record */ + gidlen = strlen(data) + 1; /* length as written, including '\0' */ + data += MAXALIGN(gidlen); + } } if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) @@ -139,6 +147,16 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed) data += sizeof(xl_xact_xinfo); } + if (parsed->xinfo & XACT_XINFO_HAS_DBINFO) + { + xl_xact_dbinfo *xl_dbinfo = (xl_xact_dbinfo *) data; + + parsed->dbId = xl_dbinfo->dbId; + parsed->tsId = xl_dbinfo->tsId; + + data += sizeof(xl_xact_dbinfo); + } + if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS) { xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data; @@ -168,6 +186,27 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed) parsed->twophase_xid = xl_twophase->xid; data += sizeof(xl_xact_twophase); + + if (parsed->xinfo & XACT_XINFO_HAS_GID) + { + int gidlen; + strlcpy(parsed->twophase_gid, data, sizeof(parsed->twophase_gid)); /* bounded: GID is read from the WAL record */ + gidlen = strlen(data) + 1; /* length as written, including '\0' */ + data += MAXALIGN(gidlen); + } + } + + if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) + { + xl_xact_origin xl_origin; + + /* we're only guaranteed 4 byte alignment, so copy onto stack */ + memcpy(&xl_origin, data, sizeof(xl_origin)); + + parsed->origin_lsn = xl_origin.origin_lsn; + parsed->origin_timestamp = xl_origin.origin_timestamp; + + data += sizeof(xl_xact_origin); } } diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index c479c4881b..d6e4b7980f 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -144,11
+144,7 @@ int max_prepared_xacts = 0; * * typedef struct GlobalTransactionData *GlobalTransaction appears in * twophase.h - * - * Note that the max value of GIDSIZE must fit in the uint16 gidlen, - * specified in TwoPhaseFileHeader. */ -#define GIDSIZE 200 typedef struct GlobalTransactionData { @@ -211,12 +207,14 @@ static void RecordTransactionCommitPrepared(TransactionId xid, RelFileNode *rels, int ninvalmsgs, SharedInvalidationMessage *invalmsgs, - bool initfileinval); + bool initfileinval, + const char *gid); static void RecordTransactionAbortPrepared(TransactionId xid, int nchildren, TransactionId *children, int nrels, - RelFileNode *rels); + RelFileNode *rels, + const char *gid); static void ProcessRecords(char *bufptr, TransactionId xid, const TwoPhaseCallback callbacks[]); static void RemoveGXact(GlobalTransaction gxact); @@ -898,7 +896,7 @@ TwoPhaseGetDummyProc(TransactionId xid) /* * Header for a 2PC state file */ -#define TWOPHASE_MAGIC 0x57F94533 /* format identifier */ +#define TWOPHASE_MAGIC 0x57F94534 /* format identifier */ typedef struct TwoPhaseFileHeader { @@ -914,6 +912,8 @@ typedef struct TwoPhaseFileHeader int32 ninvalmsgs; /* number of cache invalidation messages */ bool initfileinval; /* does relcache init file need invalidation? 
*/ uint16 gidlen; /* length of the GID - GID follows the header */ + XLogRecPtr origin_lsn; /* lsn of this record at origin node */ + TimestampTz origin_timestamp; /* time of prepare at origin node */ } TwoPhaseFileHeader; /* @@ -1065,6 +1065,7 @@ EndPrepare(GlobalTransaction gxact) { TwoPhaseFileHeader *hdr; StateFileChunk *record; + bool replorigin; /* Add the end sentinel to the list of 2PC records */ RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0, @@ -1075,6 +1076,21 @@ EndPrepare(GlobalTransaction gxact) Assert(hdr->magic == TWOPHASE_MAGIC); hdr->total_len = records.total_len + sizeof(pg_crc32c); + replorigin = (replorigin_session_origin != InvalidRepOriginId && + replorigin_session_origin != DoNotReplicateId); + + if (replorigin) + { + Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr); + hdr->origin_lsn = replorigin_session_origin_lsn; + hdr->origin_timestamp = replorigin_session_origin_timestamp; + } + else + { + hdr->origin_lsn = InvalidXLogRecPtr; + hdr->origin_timestamp = 0; + } + /* * If the data size exceeds MaxAllocSize, we won't be able to read it in * ReadTwoPhaseFile. 
Check for that now, rather than fail in the case @@ -1107,7 +1123,16 @@ EndPrepare(GlobalTransaction gxact) XLogBeginInsert(); for (record = records.head; record != NULL; record = record->next) XLogRegisterData(record->data, record->len); + + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE); + + if (replorigin) + /* Move LSNs forward for this replication origin */ + replorigin_session_advance(replorigin_session_origin_lsn, + gxact->prepare_end_lsn); + XLogFlush(gxact->prepare_end_lsn); /* If we crash now, we have prepared: WAL replay will fix things */ @@ -1283,6 +1308,44 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings) return buf; } +/* + * ParsePrepareRecord: fill *parsed from 2PC state data that starts with a TwoPhaseFileHeader, followed by the GID and then the subxact, commit-rel, abort-rel and invalidation-message arrays, each MAXALIGNed. The array pointers stored in *parsed point into xlrec; only the GID is copied. + */ +void +ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed) +{ + TwoPhaseFileHeader *hdr; + char *bufptr; + + hdr = (TwoPhaseFileHeader *) xlrec; + bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader)); + + parsed->origin_lsn = hdr->origin_lsn; + parsed->origin_timestamp = hdr->origin_timestamp; + parsed->twophase_xid = hdr->xid; + parsed->dbId = hdr->database; + parsed->nsubxacts = hdr->nsubxacts; + parsed->nrels = hdr->ncommitrels; + parsed->nabortrels = hdr->nabortrels; + parsed->nmsgs = hdr->ninvalmsgs; + + strlcpy(parsed->twophase_gid, bufptr, sizeof(parsed->twophase_gid)); /* bound by the destination, not by the length stored in the record; always NUL-terminates */ + bufptr += MAXALIGN(hdr->gidlen); + + parsed->subxacts = (TransactionId *) bufptr; + bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId)); + + parsed->xnodes = (RelFileNode *) bufptr; + bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode)); + + parsed->abortnodes = (RelFileNode *) bufptr; + bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode)); + + parsed->msgs = (SharedInvalidationMessage *) bufptr; + bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage)); +} + + /* * Reads 2PC data from xlog.
During checkpoint this data will be moved to @@ -1435,11 +1498,12 @@ FinishPreparedTransaction(const char *gid, bool isCommit) hdr->nsubxacts, children, hdr->ncommitrels, commitrels, hdr->ninvalmsgs, invalmsgs, - hdr->initfileinval); + hdr->initfileinval, gid); else RecordTransactionAbortPrepared(xid, hdr->nsubxacts, children, - hdr->nabortrels, abortrels); + hdr->nabortrels, abortrels, + gid); ProcArrayRemove(proc, latestXid); @@ -1752,7 +1816,8 @@ restoreTwoPhaseData(void) if (buf == NULL) continue; - PrepareRedoAdd(buf, InvalidXLogRecPtr, InvalidXLogRecPtr); + PrepareRedoAdd(buf, InvalidXLogRecPtr, + InvalidXLogRecPtr, InvalidRepOriginId); } } LWLockRelease(TwoPhaseStateLock); @@ -2165,7 +2230,8 @@ RecordTransactionCommitPrepared(TransactionId xid, RelFileNode *rels, int ninvalmsgs, SharedInvalidationMessage *invalmsgs, - bool initfileinval) + bool initfileinval, + const char *gid) { XLogRecPtr recptr; TimestampTz committs = GetCurrentTimestamp(); @@ -2193,7 +2259,7 @@ RecordTransactionCommitPrepared(TransactionId xid, ninvalmsgs, invalmsgs, initfileinval, false, MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK, - xid); + xid, gid); if (replorigin) @@ -2255,7 +2321,8 @@ RecordTransactionAbortPrepared(TransactionId xid, int nchildren, TransactionId *children, int nrels, - RelFileNode *rels) + RelFileNode *rels, + const char *gid) { XLogRecPtr recptr; @@ -2278,7 +2345,7 @@ RecordTransactionAbortPrepared(TransactionId xid, nchildren, children, nrels, rels, MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK, - xid); + xid, gid); /* Always flush, since we're about to remove the 2PC state file */ XLogFlush(recptr); @@ -2309,7 +2376,8 @@ RecordTransactionAbortPrepared(TransactionId xid, * data, the entry is marked as located on disk. 
*/ void -PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn) +PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, + XLogRecPtr end_lsn, RepOriginId origin_id) { TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf; char *bufptr; @@ -2358,6 +2426,13 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn) Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts); TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact; + if (origin_id != InvalidRepOriginId) + { + /* recover apply progress */ + replorigin_advance(origin_id, hdr->origin_lsn, end_lsn, + false /* backward */ , false /* WAL */ ); + } + elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid); } diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index cfc62011b5..b88d4ccf74 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1226,7 +1226,7 @@ RecordTransactionCommit(void) nmsgs, invalMessages, RelcacheInitFileInval, forceSyncCommit, MyXactFlags, - InvalidTransactionId /* plain commit */ ); + InvalidTransactionId, NULL /* plain commit */ ); if (replorigin) /* Move LSNs forward for this replication origin */ @@ -1578,7 +1578,8 @@ RecordTransactionAbort(bool isSubXact) XactLogAbortRecord(xact_time, nchildren, children, nrels, rels, - MyXactFlags, InvalidTransactionId); + MyXactFlags, InvalidTransactionId, + NULL); /* * Report the latest async abort LSN, so that the WAL writer knows to @@ -5234,7 +5235,8 @@ XactLogCommitRecord(TimestampTz commit_time, int nrels, RelFileNode *rels, int nmsgs, SharedInvalidationMessage *msgs, bool relcacheInval, bool forceSync, - int xactflags, TransactionId twophase_xid) + int xactflags, TransactionId twophase_xid, + const char *twophase_gid) { xl_xact_commit xlrec; xl_xact_xinfo xl_xinfo; @@ -5246,6 +5248,7 @@ XactLogCommitRecord(TimestampTz commit_time, xl_xact_origin xl_origin; uint8 info; + int gidlen = 0; Assert(CritSectionCount > 0); @@ -5308,6 
+5311,13 @@ XactLogCommitRecord(TimestampTz commit_time, { xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE; xl_twophase.xid = twophase_xid; + Assert(twophase_gid != NULL); + + if (XLogLogicalInfoActive()) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_GID; + gidlen = strlen(twophase_gid) + 1; /* include '\0' */ + } } /* dump transaction origin information */ @@ -5358,7 +5368,16 @@ XactLogCommitRecord(TimestampTz commit_time, } if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE) + { XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase)); + if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID) + { + static const char zeroes[MAXIMUM_ALIGNOF] = { 0 }; + XLogRegisterData((char*) twophase_gid, gidlen); + if (MAXALIGN(gidlen) != gidlen) + XLogRegisterData((char*) zeroes, MAXALIGN(gidlen) - gidlen); + } + } if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN) XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin)); @@ -5379,15 +5398,19 @@ XLogRecPtr XactLogAbortRecord(TimestampTz abort_time, int nsubxacts, TransactionId *subxacts, int nrels, RelFileNode *rels, - int xactflags, TransactionId twophase_xid) + int xactflags, TransactionId twophase_xid, + const char *twophase_gid) { xl_xact_abort xlrec; xl_xact_xinfo xl_xinfo; xl_xact_subxacts xl_subxacts; xl_xact_relfilenodes xl_relfilenodes; xl_xact_twophase xl_twophase; + xl_xact_dbinfo xl_dbinfo; + xl_xact_origin xl_origin; uint8 info; + int gidlen = 0; Assert(CritSectionCount > 0); @@ -5423,6 +5446,31 @@ XactLogAbortRecord(TimestampTz abort_time, { xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE; xl_twophase.xid = twophase_xid; + Assert(twophase_gid != NULL); + + if (XLogLogicalInfoActive()) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_GID; + gidlen = strlen(twophase_gid) + 1; /* include '\0' */ + } + } + + if (TransactionIdIsValid(twophase_xid) && XLogLogicalInfoActive()) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO; + xl_dbinfo.dbId = MyDatabaseId; + xl_dbinfo.tsId = MyDatabaseTableSpace; + } + + /* dump transaction origin information only for abort 
prepared */ + if ( (replorigin_session_origin != InvalidRepOriginId) && + TransactionIdIsValid(twophase_xid) && + XLogLogicalInfoActive()) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN; + + xl_origin.origin_lsn = replorigin_session_origin_lsn; + xl_origin.origin_timestamp = replorigin_session_origin_timestamp; } if (xl_xinfo.xinfo != 0) @@ -5437,6 +5485,10 @@ XactLogAbortRecord(TimestampTz abort_time, if (xl_xinfo.xinfo != 0) XLogRegisterData((char *) (&xl_xinfo), sizeof(xl_xinfo)); + if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO) + XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo)); + + if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS) { XLogRegisterData((char *) (&xl_subxacts), @@ -5454,7 +5506,22 @@ XactLogAbortRecord(TimestampTz abort_time, } if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE) + { XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase)); + if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID) + { + static const char zeroes[MAXIMUM_ALIGNOF] = { 0 }; + XLogRegisterData((char*) twophase_gid, gidlen); + if (MAXALIGN(gidlen) != gidlen) + XLogRegisterData((char*) zeroes, MAXALIGN(gidlen) - gidlen); + } + } + + if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN) + XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin)); + + if (TransactionIdIsValid(twophase_xid)) + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); return XLogInsert(RM_XACT_ID, info); } @@ -5777,7 +5844,8 @@ xact_redo(XLogReaderState *record) LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); PrepareRedoAdd(XLogRecGetData(record), record->ReadRecPtr, - record->EndRecPtr); + record->EndRecPtr, + XLogRecGetOrigin(record)); LWLockRelease(TwoPhaseStateLock); } else if (info == XLOG_XACT_ASSIGNMENT) diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index 34d9470811..f05cde202f 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -15,6 +15,7 @@ #define TWOPHASE_H #include "access/xlogdefs.h" +#include "access/xact.h" #include "datatype/timestamp.h" #include 
"storage/lock.h" @@ -46,6 +47,8 @@ extern bool StandbyTransactionIdIsPrepared(TransactionId xid); extern TransactionId PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p); +extern void ParsePrepareRecord(uint8 info, char *xlrec, + xl_xact_parsed_prepare *parsed); extern void StandbyRecoverPreparedTransactions(void); extern void RecoverPreparedTransactions(void); @@ -54,7 +57,7 @@ extern void CheckPointTwoPhase(XLogRecPtr redo_horizon); extern void FinishPreparedTransaction(const char *gid, bool isCommit); extern void PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, - XLogRecPtr end_lsn); + XLogRecPtr end_lsn, RepOriginId origin_id); extern void PrepareRedoRemove(TransactionId xid, bool giveWarning); extern void restoreTwoPhaseData(void); #endif /* TWOPHASE_H */ diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 87ae2cd4df..a46396f2d9 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -21,6 +21,13 @@ #include "storage/sinval.h" #include "utils/datetime.h" +/* + * Maximum size of Global Transaction ID (including '\0'). + * + * Note that the max value of GIDSIZE must fit in the uint16 gidlen, + * specified in TwoPhaseFileHeader. 
+ */ +#define GIDSIZE 200 /* * Xact isolation levels @@ -156,6 +163,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, #define XACT_XINFO_HAS_TWOPHASE (1U << 4) #define XACT_XINFO_HAS_ORIGIN (1U << 5) #define XACT_XINFO_HAS_AE_LOCKS (1U << 6) +#define XACT_XINFO_HAS_GID (1U << 7) /* * Also stored in xinfo, these indicating a variety of additional actions that @@ -286,7 +294,6 @@ typedef struct xl_xact_abort typedef struct xl_xact_parsed_commit { TimestampTz xact_time; - uint32 xinfo; Oid dbId; /* MyDatabaseId */ @@ -302,16 +309,24 @@ typedef struct xl_xact_parsed_commit SharedInvalidationMessage *msgs; TransactionId twophase_xid; /* only for 2PC */ + char twophase_gid[GIDSIZE]; /* only for 2PC */ + int nabortrels; /* only for 2PC */ + RelFileNode *abortnodes; /* only for 2PC */ XLogRecPtr origin_lsn; TimestampTz origin_timestamp; } xl_xact_parsed_commit; +typedef xl_xact_parsed_commit xl_xact_parsed_prepare; + typedef struct xl_xact_parsed_abort { TimestampTz xact_time; uint32 xinfo; + Oid dbId; /* MyDatabaseId */ + Oid tsId; /* MyDatabaseTableSpace */ + int nsubxacts; TransactionId *subxacts; @@ -319,6 +334,10 @@ typedef struct xl_xact_parsed_abort RelFileNode *xnodes; TransactionId twophase_xid; /* only for 2PC */ + char twophase_gid[GIDSIZE]; /* only for 2PC */ + + XLogRecPtr origin_lsn; + TimestampTz origin_timestamp; } xl_xact_parsed_abort; @@ -386,12 +405,14 @@ extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time, int nmsgs, SharedInvalidationMessage *msgs, bool relcacheInval, bool forceSync, int xactflags, - TransactionId twophase_xid); + TransactionId twophase_xid, + const char *twophase_gid); extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time, int nsubxacts, TransactionId *subxacts, int nrels, RelFileNode *rels, - int xactflags, TransactionId twophase_xid); + int xactflags, TransactionId twophase_xid, + const char *twophase_gid); extern void xact_redo(XLogReaderState *record); /* xactdesc.c */