WAL-log inplace update before revealing it to other sessions.

A buffer lock won't stop a reader having already checked tuple
visibility.  If a vac_update_datfrozenid() and then a crash happened
during inplace update of a relfrozenxid value, datfrozenxid could
overtake relfrozenxid.  That could lead to "could not access status of
transaction" errors.  Back-patch to v12 (all supported versions).  In
v14 and earlier, this also back-patches the assertion removal from
commit 7fcf2faf9c7dd473208fd6d5565f88d7f733782b.

Discussion: https://postgr.es/m/20240620012908.92.nmisch@google.com
This commit is contained in:
Noah Misch 2024-10-25 06:51:03 -07:00
parent 4eac5a1fa7
commit 431e05181e
2 changed files with 46 additions and 16 deletions

View File

@ -203,6 +203,4 @@ Inplace updates create an exception to the rule that tuple data won't change
under a reader holding a pin. A reader of a heap_fetch() result tuple may under a reader holding a pin. A reader of a heap_fetch() result tuple may
witness a torn read. Current inplace-updated fields are aligned and are no witness a torn read. Current inplace-updated fields are aligned and are no
wider than four bytes, and current readers don't need consistency across wider than four bytes, and current readers don't need consistency across
fields. Hence, they get by with just fetching each field once. XXX such a fields. Hence, they get by with just fetching each field once.
caller may also read a value that has not reached WAL; see
systable_inplace_update_finish().

View File

@ -6295,6 +6295,8 @@ heap_inplace_update_and_unlock(Relation relation,
HeapTupleHeader htup = oldtup->t_data; HeapTupleHeader htup = oldtup->t_data;
uint32 oldlen; uint32 oldlen;
uint32 newlen; uint32 newlen;
char *dst;
char *src;
Assert(ItemPointerEquals(&oldtup->t_self, &tuple->t_self)); Assert(ItemPointerEquals(&oldtup->t_self, &tuple->t_self));
oldlen = oldtup->t_len - htup->t_hoff; oldlen = oldtup->t_len - htup->t_hoff;
@ -6302,6 +6304,9 @@ heap_inplace_update_and_unlock(Relation relation,
if (oldlen != newlen || htup->t_hoff != tuple->t_data->t_hoff) if (oldlen != newlen || htup->t_hoff != tuple->t_data->t_hoff)
elog(ERROR, "wrong tuple length"); elog(ERROR, "wrong tuple length");
dst = (char *) htup + htup->t_hoff;
src = (char *) tuple->t_data + tuple->t_data->t_hoff;
/* /*
* Construct shared cache inval if necessary. Note that because we only * Construct shared cache inval if necessary. Note that because we only
* pass the new version of the tuple, this mustn't be used for any * pass the new version of the tuple, this mustn't be used for any
@ -6320,15 +6325,15 @@ heap_inplace_update_and_unlock(Relation relation,
*/ */
PreInplace_Inval(); PreInplace_Inval();
/* NO EREPORT(ERROR) from here till changes are logged */
START_CRIT_SECTION();
memcpy((char *) htup + htup->t_hoff,
(char *) tuple->t_data + tuple->t_data->t_hoff,
newlen);
/*---------- /*----------
* XXX A crash here can allow datfrozenxid() to get ahead of relfrozenxid: * NO EREPORT(ERROR) from here till changes are complete
*
* Our buffer lock won't stop a reader having already pinned and checked
* visibility for this tuple. Hence, we write WAL first, then mutate the
* buffer. Like in MarkBufferDirtyHint() or RecordTransactionCommit(),
* checkpoint delay makes that acceptable. With the usual order of
* changes, a crash after memcpy() and before XLogInsert() could allow
* datfrozenxid to overtake relfrozenxid:
* *
* ["D" is a VACUUM (ONLY_DATABASE_STATS)] * ["D" is a VACUUM (ONLY_DATABASE_STATS)]
* ["R" is a VACUUM tbl] * ["R" is a VACUUM tbl]
@ -6338,14 +6343,28 @@ heap_inplace_update_and_unlock(Relation relation,
* D: raise pg_database.datfrozenxid, XLogInsert(), finish * D: raise pg_database.datfrozenxid, XLogInsert(), finish
* [crash] * [crash]
* [recovery restores datfrozenxid w/o relfrozenxid] * [recovery restores datfrozenxid w/o relfrozenxid]
*
* Like in MarkBufferDirtyHint() subroutine XLogSaveBufferForHint(), copy
* the buffer to the stack before logging. Here, that facilitates a FPI
* of the post-mutation block before we accept other sessions seeing it.
*/ */
Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
MarkBufferDirty(buffer); START_CRIT_SECTION();
MyProc->delayChkptFlags |= DELAY_CHKPT_START;
/* XLOG stuff */ /* XLOG stuff */
if (RelationNeedsWAL(relation)) if (RelationNeedsWAL(relation))
{ {
xl_heap_inplace xlrec; xl_heap_inplace xlrec;
PGAlignedBlock copied_buffer;
char *origdata = (char *) BufferGetBlock(buffer);
Page page = BufferGetPage(buffer);
uint16 lower = ((PageHeader) page)->pd_lower;
uint16 upper = ((PageHeader) page)->pd_upper;
uintptr_t dst_offset_in_block;
RelFileNode rnode;
ForkNumber forkno;
BlockNumber blkno;
XLogRecPtr recptr; XLogRecPtr recptr;
xlrec.offnum = ItemPointerGetOffsetNumber(&tuple->t_self); xlrec.offnum = ItemPointerGetOffsetNumber(&tuple->t_self);
@ -6353,16 +6372,28 @@ heap_inplace_update_and_unlock(Relation relation,
XLogBeginInsert(); XLogBeginInsert();
XLogRegisterData((char *) &xlrec, SizeOfHeapInplace); XLogRegisterData((char *) &xlrec, SizeOfHeapInplace);
XLogRegisterBuffer(0, buffer, REGBUF_STANDARD); /* register block matching what buffer will look like after changes */
XLogRegisterBufData(0, (char *) htup + htup->t_hoff, newlen); memcpy(copied_buffer.data, origdata, lower);
memcpy(copied_buffer.data + upper, origdata + upper, BLCKSZ - upper);
dst_offset_in_block = dst - origdata;
memcpy(copied_buffer.data + dst_offset_in_block, src, newlen);
BufferGetTag(buffer, &rnode, &forkno, &blkno);
Assert(forkno == MAIN_FORKNUM);
XLogRegisterBlock(0, &rnode, forkno, blkno, copied_buffer.data,
REGBUF_STANDARD);
XLogRegisterBufData(0, src, newlen);
/* inplace updates aren't decoded atm, don't log the origin */ /* inplace updates aren't decoded atm, don't log the origin */
recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INPLACE); recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INPLACE);
PageSetLSN(BufferGetPage(buffer), recptr); PageSetLSN(page, recptr);
} }
memcpy(dst, src, newlen);
MarkBufferDirty(buffer);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK); LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
/* /*
@ -6375,6 +6406,7 @@ heap_inplace_update_and_unlock(Relation relation,
*/ */
AtInplace_Inval(); AtInplace_Inval();
MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
END_CRIT_SECTION(); END_CRIT_SECTION();
UnlockTuple(relation, &tuple->t_self, InplaceUpdateTupleLock); UnlockTuple(relation, &tuple->t_self, InplaceUpdateTupleLock);