From 00c76cf21c42c17e60e73a87dea0d1b4e234d9da Mon Sep 17 00:00:00 2001 From: Michael Paquier Date: Thu, 12 Sep 2024 13:32:05 +0900 Subject: [PATCH] Move logic related to WAL replay of Heap/Heap2 into its own file This brings more clarity to heapam.c, by cleanly separating all the logic related to WAL replay and the rest of Heap and Heap2, similarly to other RMGRs like hash, btree, etc. The header reorganization is also nice in heapam.c, cutting half of the headers required. Author: Li Yong Reviewed-by: Sutou Kouhei, Michael Paquier Discussion: https://postgr.es/m/EFE55E65-D7BD-4C6A-B630-91F43FD0771B@ebay.com --- src/backend/access/heap/Makefile | 1 + src/backend/access/heap/heapam.c | 1339 ------------------------- src/backend/access/heap/heapam_xlog.c | 1339 +++++++++++++++++++++++++ src/backend/access/heap/meson.build | 1 + src/include/access/heapam.h | 25 + 5 files changed, 1366 insertions(+), 1339 deletions(-) create mode 100644 src/backend/access/heap/heapam_xlog.c diff --git a/src/backend/access/heap/Makefile b/src/backend/access/heap/Makefile index af0bd1888e..394534172f 100644 --- a/src/backend/access/heap/Makefile +++ b/src/backend/access/heap/Makefile @@ -16,6 +16,7 @@ OBJS = \ heapam.o \ heapam_handler.o \ heapam_visibility.o \ + heapam_xlog.o \ heaptoast.o \ hio.o \ pruneheap.o \ diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 91b20147a0..f167107257 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -31,42 +31,24 @@ */ #include "postgres.h" -#include "access/bufmask.h" #include "access/heapam.h" -#include "access/heapam_xlog.h" #include "access/heaptoast.h" #include "access/hio.h" #include "access/multixact.h" -#include "access/parallel.h" -#include "access/relscan.h" #include "access/subtrans.h" #include "access/syncscan.h" -#include "access/sysattr.h" -#include "access/tableam.h" -#include "access/transam.h" #include "access/valid.h" #include "access/visibilitymap.h" -#include "access/xact.h" -#include "access/xlog.h" #include "access/xloginsert.h" -#include "access/xlogutils.h" -#include "catalog/catalog.h" #include "commands/vacuum.h" -#include "miscadmin.h" #include "pgstat.h" -#include "port/atomics.h" #include "port/pg_bitutils.h" -#include "storage/bufmgr.h" -#include "storage/freespace.h" #include "storage/lmgr.h" #include "storage/predicate.h" #include "storage/procarray.h" -#include "storage/standby.h" #include "utils/datum.h" #include "utils/injection_point.h" #include "utils/inval.h" -#include "utils/relcache.h" -#include "utils/snapmgr.h" #include "utils/spccache.h" @@ -6811,30 +6793,6 @@ heap_prepare_freeze_tuple(HeapTupleHeader tuple, return freeze_xmin || replace_xvac || replace_xmax || freeze_xmax; } -/* - * heap_execute_freeze_tuple - * Execute the prepared freezing of a tuple with caller's freeze plan. - * - * Caller is responsible for ensuring that no other backend can access the - * storage underlying this tuple, either by holding an exclusive lock on the - * buffer containing it (which is what lazy VACUUM does), or by having it be - * in private storage (which is what CLUSTER and friends do). 
- */ -static inline void -heap_execute_freeze_tuple(HeapTupleHeader tuple, HeapTupleFreeze *frz) -{ - HeapTupleHeaderSetXmax(tuple, frz->xmax); - - if (frz->frzflags & XLH_FREEZE_XVAC) - HeapTupleHeaderSetXvac(tuple, FrozenTransactionId); - - if (frz->frzflags & XLH_INVALID_XVAC) - HeapTupleHeaderSetXvac(tuple, InvalidTransactionId); - - tuple->t_infomask = frz->t_infomask; - tuple->t_infomask2 = frz->t_infomask2; -} - /* * Perform xmin/xmax XID status sanity checks before actually executing freeze * plans. @@ -8745,1303 +8703,6 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required, return key_tuple; } -/* - * Replay XLOG_HEAP2_PRUNE_* records. - */ -static void -heap_xlog_prune_freeze(XLogReaderState *record) -{ - XLogRecPtr lsn = record->EndRecPtr; - char *maindataptr = XLogRecGetData(record); - xl_heap_prune xlrec; - Buffer buffer; - RelFileLocator rlocator; - BlockNumber blkno; - XLogRedoAction action; - - XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno); - memcpy(&xlrec, maindataptr, SizeOfHeapPrune); - maindataptr += SizeOfHeapPrune; - - /* - * We will take an ordinary exclusive lock or a cleanup lock depending on - * whether the XLHP_CLEANUP_LOCK flag is set. With an ordinary exclusive - * lock, we better not be doing anything that requires moving existing - * tuple data. - */ - Assert((xlrec.flags & XLHP_CLEANUP_LOCK) != 0 || - (xlrec.flags & (XLHP_HAS_REDIRECTIONS | XLHP_HAS_DEAD_ITEMS)) == 0); - - /* - * We are about to remove and/or freeze tuples. In Hot Standby mode, - * ensure that there are no queries running for which the removed tuples - * are still visible or which still consider the frozen xids as running. - * The conflict horizon XID comes after xl_heap_prune. - */ - if ((xlrec.flags & XLHP_HAS_CONFLICT_HORIZON) != 0) - { - TransactionId snapshot_conflict_horizon; - - /* memcpy() because snapshot_conflict_horizon is stored unaligned */ - memcpy(&snapshot_conflict_horizon, maindataptr, sizeof(TransactionId)); - maindataptr += sizeof(TransactionId); - - if (InHotStandby) - ResolveRecoveryConflictWithSnapshot(snapshot_conflict_horizon, - (xlrec.flags & XLHP_IS_CATALOG_REL) != 0, - rlocator); - } - - /* - * If we have a full-page image, restore it and we're done. - */ - action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, - (xlrec.flags & XLHP_CLEANUP_LOCK) != 0, - &buffer); - if (action == BLK_NEEDS_REDO) - { - Page page = (Page) BufferGetPage(buffer); - OffsetNumber *redirected; - OffsetNumber *nowdead; - OffsetNumber *nowunused; - int nredirected; - int ndead; - int nunused; - int nplans; - Size datalen; - xlhp_freeze_plan *plans; - OffsetNumber *frz_offsets; - char *dataptr = XLogRecGetBlockData(record, 0, &datalen); - - heap_xlog_deserialize_prune_and_freeze(dataptr, xlrec.flags, - &nplans, &plans, &frz_offsets, - &nredirected, &redirected, - &ndead, &nowdead, - &nunused, &nowunused); - - /* - * Update all line pointers per the record, and repair fragmentation - * if needed. 
- */ - if (nredirected > 0 || ndead > 0 || nunused > 0) - heap_page_prune_execute(buffer, - (xlrec.flags & XLHP_CLEANUP_LOCK) == 0, - redirected, nredirected, - nowdead, ndead, - nowunused, nunused); - - /* Freeze tuples */ - for (int p = 0; p < nplans; p++) - { - HeapTupleFreeze frz; - - /* - * Convert freeze plan representation from WAL record into - * per-tuple format used by heap_execute_freeze_tuple - */ - frz.xmax = plans[p].xmax; - frz.t_infomask2 = plans[p].t_infomask2; - frz.t_infomask = plans[p].t_infomask; - frz.frzflags = plans[p].frzflags; - frz.offset = InvalidOffsetNumber; /* unused, but be tidy */ - - for (int i = 0; i < plans[p].ntuples; i++) - { - OffsetNumber offset = *(frz_offsets++); - ItemId lp; - HeapTupleHeader tuple; - - lp = PageGetItemId(page, offset); - tuple = (HeapTupleHeader) PageGetItem(page, lp); - heap_execute_freeze_tuple(tuple, &frz); - } - } - - /* There should be no more data */ - Assert((char *) frz_offsets == dataptr + datalen); - - /* - * Note: we don't worry about updating the page's prunability hints. - * At worst this will cause an extra prune cycle to occur soon. - */ - - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - } - - /* - * If we released any space or line pointers, update the free space map. - * - * Do this regardless of a full-page image being applied, since the FSM - * data is not in the page anyway. - */ - if (BufferIsValid(buffer)) - { - if (xlrec.flags & (XLHP_HAS_REDIRECTIONS | - XLHP_HAS_DEAD_ITEMS | - XLHP_HAS_NOW_UNUSED_ITEMS)) - { - Size freespace = PageGetHeapFreeSpace(BufferGetPage(buffer)); - - UnlockReleaseBuffer(buffer); - - XLogRecordPageWithFreeSpace(rlocator, blkno, freespace); - } - else - UnlockReleaseBuffer(buffer); - } -} - -/* - * Replay XLOG_HEAP2_VISIBLE record. - * - * The critical integrity requirement here is that we must never end up with - * a situation where the visibility map bit is set, and the page-level - * PD_ALL_VISIBLE bit is clear. If that were to occur, then a subsequent - * page modification would fail to clear the visibility map bit. - */ -static void -heap_xlog_visible(XLogReaderState *record) -{ - XLogRecPtr lsn = record->EndRecPtr; - xl_heap_visible *xlrec = (xl_heap_visible *) XLogRecGetData(record); - Buffer vmbuffer = InvalidBuffer; - Buffer buffer; - Page page; - RelFileLocator rlocator; - BlockNumber blkno; - XLogRedoAction action; - - Assert((xlrec->flags & VISIBILITYMAP_XLOG_VALID_BITS) == xlrec->flags); - - XLogRecGetBlockTag(record, 1, &rlocator, NULL, &blkno); - - /* - * If there are any Hot Standby transactions running that have an xmin - * horizon old enough that this page isn't all-visible for them, they - * might incorrectly decide that an index-only scan can skip a heap fetch. - * - * NB: It might be better to throw some kind of "soft" conflict here that - * forces any index-only scan that is in flight to perform heap fetches, - * rather than killing the transaction outright. - */ - if (InHotStandby) - ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon, - xlrec->flags & VISIBILITYMAP_XLOG_CATALOG_REL, - rlocator); - - /* - * Read the heap page, if it still exists. If the heap file has dropped or - * truncated later in recovery, we don't need to update the page, but we'd - * better still update the visibility map. 
- */ - action = XLogReadBufferForRedo(record, 1, &buffer); - if (action == BLK_NEEDS_REDO) - { - /* - * We don't bump the LSN of the heap page when setting the visibility - * map bit (unless checksums or wal_hint_bits is enabled, in which - * case we must). This exposes us to torn page hazards, but since - * we're not inspecting the existing page contents in any way, we - * don't care. - */ - page = BufferGetPage(buffer); - - PageSetAllVisible(page); - - if (XLogHintBitIsNeeded()) - PageSetLSN(page, lsn); - - MarkBufferDirty(buffer); - } - else if (action == BLK_RESTORED) - { - /* - * If heap block was backed up, we already restored it and there's - * nothing more to do. (This can only happen with checksums or - * wal_log_hints enabled.) - */ - } - - if (BufferIsValid(buffer)) - { - Size space = PageGetFreeSpace(BufferGetPage(buffer)); - - UnlockReleaseBuffer(buffer); - - /* - * Since FSM is not WAL-logged and only updated heuristically, it - * easily becomes stale in standbys. If the standby is later promoted - * and runs VACUUM, it will skip updating individual free space - * figures for pages that became all-visible (or all-frozen, depending - * on the vacuum mode,) which is troublesome when FreeSpaceMapVacuum - * propagates too optimistic free space values to upper FSM layers; - * later inserters try to use such pages only to find out that they - * are unusable. This can cause long stalls when there are many such - * pages. - * - * Forestall those problems by updating FSM's idea about a page that - * is becoming all-visible or all-frozen. - * - * Do this regardless of a full-page image being applied, since the - * FSM data is not in the page anyway. - */ - if (xlrec->flags & VISIBILITYMAP_VALID_BITS) - XLogRecordPageWithFreeSpace(rlocator, blkno, space); - } - - /* - * Even if we skipped the heap page update due to the LSN interlock, it's - * still safe to update the visibility map. Any WAL record that clears - * the visibility map bit does so before checking the page LSN, so any - * bits that need to be cleared will still be cleared. - */ - if (XLogReadBufferForRedoExtended(record, 0, RBM_ZERO_ON_ERROR, false, - &vmbuffer) == BLK_NEEDS_REDO) - { - Page vmpage = BufferGetPage(vmbuffer); - Relation reln; - uint8 vmbits; - - /* initialize the page if it was read as zeros */ - if (PageIsNew(vmpage)) - PageInit(vmpage, BLCKSZ, 0); - - /* remove VISIBILITYMAP_XLOG_* */ - vmbits = xlrec->flags & VISIBILITYMAP_VALID_BITS; - - /* - * XLogReadBufferForRedoExtended locked the buffer. But - * visibilitymap_set will handle locking itself. - */ - LockBuffer(vmbuffer, BUFFER_LOCK_UNLOCK); - - reln = CreateFakeRelcacheEntry(rlocator); - visibilitymap_pin(reln, blkno, &vmbuffer); - - visibilitymap_set(reln, blkno, InvalidBuffer, lsn, vmbuffer, - xlrec->snapshotConflictHorizon, vmbits); - - ReleaseBuffer(vmbuffer); - FreeFakeRelcacheEntry(reln); - } - else if (BufferIsValid(vmbuffer)) - UnlockReleaseBuffer(vmbuffer); -} - -/* - * Given an "infobits" field from an XLog record, set the correct bits in the - * given infomask and infomask2 for the tuple touched by the record. - * - * (This is the reverse of compute_infobits). 
- */ -static void -fix_infomask_from_infobits(uint8 infobits, uint16 *infomask, uint16 *infomask2) -{ - *infomask &= ~(HEAP_XMAX_IS_MULTI | HEAP_XMAX_LOCK_ONLY | - HEAP_XMAX_KEYSHR_LOCK | HEAP_XMAX_EXCL_LOCK); - *infomask2 &= ~HEAP_KEYS_UPDATED; - - if (infobits & XLHL_XMAX_IS_MULTI) - *infomask |= HEAP_XMAX_IS_MULTI; - if (infobits & XLHL_XMAX_LOCK_ONLY) - *infomask |= HEAP_XMAX_LOCK_ONLY; - if (infobits & XLHL_XMAX_EXCL_LOCK) - *infomask |= HEAP_XMAX_EXCL_LOCK; - /* note HEAP_XMAX_SHR_LOCK isn't considered here */ - if (infobits & XLHL_XMAX_KEYSHR_LOCK) - *infomask |= HEAP_XMAX_KEYSHR_LOCK; - - if (infobits & XLHL_KEYS_UPDATED) - *infomask2 |= HEAP_KEYS_UPDATED; -} - -static void -heap_xlog_delete(XLogReaderState *record) -{ - XLogRecPtr lsn = record->EndRecPtr; - xl_heap_delete *xlrec = (xl_heap_delete *) XLogRecGetData(record); - Buffer buffer; - Page page; - ItemId lp = NULL; - HeapTupleHeader htup; - BlockNumber blkno; - RelFileLocator target_locator; - ItemPointerData target_tid; - - XLogRecGetBlockTag(record, 0, &target_locator, NULL, &blkno); - ItemPointerSetBlockNumber(&target_tid, blkno); - ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum); - - /* - * The visibility map may need to be fixed even if the heap page is - * already up-to-date. - */ - if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED) - { - Relation reln = CreateFakeRelcacheEntry(target_locator); - Buffer vmbuffer = InvalidBuffer; - - visibilitymap_pin(reln, blkno, &vmbuffer); - visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS); - ReleaseBuffer(vmbuffer); - FreeFakeRelcacheEntry(reln); - } - - if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) - { - page = BufferGetPage(buffer); - - if (PageGetMaxOffsetNumber(page) >= xlrec->offnum) - lp = PageGetItemId(page, xlrec->offnum); - - if (PageGetMaxOffsetNumber(page) < xlrec->offnum || !ItemIdIsNormal(lp)) - elog(PANIC, "invalid lp"); - - htup = (HeapTupleHeader) PageGetItem(page, lp); - - htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED); - htup->t_infomask2 &= ~HEAP_KEYS_UPDATED; - HeapTupleHeaderClearHotUpdated(htup); - fix_infomask_from_infobits(xlrec->infobits_set, - &htup->t_infomask, &htup->t_infomask2); - if (!(xlrec->flags & XLH_DELETE_IS_SUPER)) - HeapTupleHeaderSetXmax(htup, xlrec->xmax); - else - HeapTupleHeaderSetXmin(htup, InvalidTransactionId); - HeapTupleHeaderSetCmax(htup, FirstCommandId, false); - - /* Mark the page as a candidate for pruning */ - PageSetPrunable(page, XLogRecGetXid(record)); - - if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED) - PageClearAllVisible(page); - - /* Make sure t_ctid is set correctly */ - if (xlrec->flags & XLH_DELETE_IS_PARTITION_MOVE) - HeapTupleHeaderSetMovedPartitions(htup); - else - htup->t_ctid = target_tid; - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - } - if (BufferIsValid(buffer)) - UnlockReleaseBuffer(buffer); -} - -static void -heap_xlog_insert(XLogReaderState *record) -{ - XLogRecPtr lsn = record->EndRecPtr; - xl_heap_insert *xlrec = (xl_heap_insert *) XLogRecGetData(record); - Buffer buffer; - Page page; - union - { - HeapTupleHeaderData hdr; - char data[MaxHeapTupleSize]; - } tbuf; - HeapTupleHeader htup; - xl_heap_header xlhdr; - uint32 newlen; - Size freespace = 0; - RelFileLocator target_locator; - BlockNumber blkno; - ItemPointerData target_tid; - XLogRedoAction action; - - XLogRecGetBlockTag(record, 0, &target_locator, NULL, &blkno); - ItemPointerSetBlockNumber(&target_tid, blkno); - ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum); - - /* - * The 
visibility map may need to be fixed even if the heap page is - * already up-to-date. - */ - if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) - { - Relation reln = CreateFakeRelcacheEntry(target_locator); - Buffer vmbuffer = InvalidBuffer; - - visibilitymap_pin(reln, blkno, &vmbuffer); - visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS); - ReleaseBuffer(vmbuffer); - FreeFakeRelcacheEntry(reln); - } - - /* - * If we inserted the first and only tuple on the page, re-initialize the - * page from scratch. - */ - if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE) - { - buffer = XLogInitBufferForRedo(record, 0); - page = BufferGetPage(buffer); - PageInit(page, BufferGetPageSize(buffer), 0); - action = BLK_NEEDS_REDO; - } - else - action = XLogReadBufferForRedo(record, 0, &buffer); - if (action == BLK_NEEDS_REDO) - { - Size datalen; - char *data; - - page = BufferGetPage(buffer); - - if (PageGetMaxOffsetNumber(page) + 1 < xlrec->offnum) - elog(PANIC, "invalid max offset number"); - - data = XLogRecGetBlockData(record, 0, &datalen); - - newlen = datalen - SizeOfHeapHeader; - Assert(datalen > SizeOfHeapHeader && newlen <= MaxHeapTupleSize); - memcpy((char *) &xlhdr, data, SizeOfHeapHeader); - data += SizeOfHeapHeader; - - htup = &tbuf.hdr; - MemSet((char *) htup, 0, SizeofHeapTupleHeader); - /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */ - memcpy((char *) htup + SizeofHeapTupleHeader, - data, - newlen); - newlen += SizeofHeapTupleHeader; - htup->t_infomask2 = xlhdr.t_infomask2; - htup->t_infomask = xlhdr.t_infomask; - htup->t_hoff = xlhdr.t_hoff; - HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record)); - HeapTupleHeaderSetCmin(htup, FirstCommandId); - htup->t_ctid = target_tid; - - if (PageAddItem(page, (Item) htup, newlen, xlrec->offnum, - true, true) == InvalidOffsetNumber) - elog(PANIC, "failed to add tuple"); - - freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */ - - PageSetLSN(page, lsn); - - if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) - PageClearAllVisible(page); - - /* XLH_INSERT_ALL_FROZEN_SET implies that all tuples are visible */ - if (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET) - PageSetAllVisible(page); - - MarkBufferDirty(buffer); - } - if (BufferIsValid(buffer)) - UnlockReleaseBuffer(buffer); - - /* - * If the page is running low on free space, update the FSM as well. - * Arbitrarily, our definition of "low" is less than 20%. We can't do much - * better than that without knowing the fill-factor for the table. - * - * XXX: Don't do this if the page was restored from full page image. We - * don't bother to update the FSM in that case, it doesn't need to be - * totally accurate anyway. - */ - if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5) - XLogRecordPageWithFreeSpace(target_locator, blkno, freespace); -} - -/* - * Handles MULTI_INSERT record type. - */ -static void -heap_xlog_multi_insert(XLogReaderState *record) -{ - XLogRecPtr lsn = record->EndRecPtr; - xl_heap_multi_insert *xlrec; - RelFileLocator rlocator; - BlockNumber blkno; - Buffer buffer; - Page page; - union - { - HeapTupleHeaderData hdr; - char data[MaxHeapTupleSize]; - } tbuf; - HeapTupleHeader htup; - uint32 newlen; - Size freespace = 0; - int i; - bool isinit = (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE) != 0; - XLogRedoAction action; - - /* - * Insertion doesn't overwrite MVCC data, so no conflict processing is - * required. 
- */ - xlrec = (xl_heap_multi_insert *) XLogRecGetData(record); - - XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno); - - /* check that the mutually exclusive flags are not both set */ - Assert(!((xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) && - (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET))); - - /* - * The visibility map may need to be fixed even if the heap page is - * already up-to-date. - */ - if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) - { - Relation reln = CreateFakeRelcacheEntry(rlocator); - Buffer vmbuffer = InvalidBuffer; - - visibilitymap_pin(reln, blkno, &vmbuffer); - visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS); - ReleaseBuffer(vmbuffer); - FreeFakeRelcacheEntry(reln); - } - - if (isinit) - { - buffer = XLogInitBufferForRedo(record, 0); - page = BufferGetPage(buffer); - PageInit(page, BufferGetPageSize(buffer), 0); - action = BLK_NEEDS_REDO; - } - else - action = XLogReadBufferForRedo(record, 0, &buffer); - if (action == BLK_NEEDS_REDO) - { - char *tupdata; - char *endptr; - Size len; - - /* Tuples are stored as block data */ - tupdata = XLogRecGetBlockData(record, 0, &len); - endptr = tupdata + len; - - page = (Page) BufferGetPage(buffer); - - for (i = 0; i < xlrec->ntuples; i++) - { - OffsetNumber offnum; - xl_multi_insert_tuple *xlhdr; - - /* - * If we're reinitializing the page, the tuples are stored in - * order from FirstOffsetNumber. Otherwise there's an array of - * offsets in the WAL record, and the tuples come after that. - */ - if (isinit) - offnum = FirstOffsetNumber + i; - else - offnum = xlrec->offsets[i]; - if (PageGetMaxOffsetNumber(page) + 1 < offnum) - elog(PANIC, "invalid max offset number"); - - xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(tupdata); - tupdata = ((char *) xlhdr) + SizeOfMultiInsertTuple; - - newlen = xlhdr->datalen; - Assert(newlen <= MaxHeapTupleSize); - htup = &tbuf.hdr; - MemSet((char *) htup, 0, SizeofHeapTupleHeader); - /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */ - memcpy((char *) htup + SizeofHeapTupleHeader, - (char *) tupdata, - newlen); - tupdata += newlen; - - newlen += SizeofHeapTupleHeader; - htup->t_infomask2 = xlhdr->t_infomask2; - htup->t_infomask = xlhdr->t_infomask; - htup->t_hoff = xlhdr->t_hoff; - HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record)); - HeapTupleHeaderSetCmin(htup, FirstCommandId); - ItemPointerSetBlockNumber(&htup->t_ctid, blkno); - ItemPointerSetOffsetNumber(&htup->t_ctid, offnum); - - offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true); - if (offnum == InvalidOffsetNumber) - elog(PANIC, "failed to add tuple"); - } - if (tupdata != endptr) - elog(PANIC, "total tuple length mismatch"); - - freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */ - - PageSetLSN(page, lsn); - - if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) - PageClearAllVisible(page); - - /* XLH_INSERT_ALL_FROZEN_SET implies that all tuples are visible */ - if (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET) - PageSetAllVisible(page); - - MarkBufferDirty(buffer); - } - if (BufferIsValid(buffer)) - UnlockReleaseBuffer(buffer); - - /* - * If the page is running low on free space, update the FSM as well. - * Arbitrarily, our definition of "low" is less than 20%. We can't do much - * better than that without knowing the fill-factor for the table. - * - * XXX: Don't do this if the page was restored from full page image. We - * don't bother to update the FSM in that case, it doesn't need to be - * totally accurate anyway. 
- */ - if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5) - XLogRecordPageWithFreeSpace(rlocator, blkno, freespace); -} - -/* - * Handles UPDATE and HOT_UPDATE - */ -static void -heap_xlog_update(XLogReaderState *record, bool hot_update) -{ - XLogRecPtr lsn = record->EndRecPtr; - xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record); - RelFileLocator rlocator; - BlockNumber oldblk; - BlockNumber newblk; - ItemPointerData newtid; - Buffer obuffer, - nbuffer; - Page page; - OffsetNumber offnum; - ItemId lp = NULL; - HeapTupleData oldtup; - HeapTupleHeader htup; - uint16 prefixlen = 0, - suffixlen = 0; - char *newp; - union - { - HeapTupleHeaderData hdr; - char data[MaxHeapTupleSize]; - } tbuf; - xl_heap_header xlhdr; - uint32 newlen; - Size freespace = 0; - XLogRedoAction oldaction; - XLogRedoAction newaction; - - /* initialize to keep the compiler quiet */ - oldtup.t_data = NULL; - oldtup.t_len = 0; - - XLogRecGetBlockTag(record, 0, &rlocator, NULL, &newblk); - if (XLogRecGetBlockTagExtended(record, 1, NULL, NULL, &oldblk, NULL)) - { - /* HOT updates are never done across pages */ - Assert(!hot_update); - } - else - oldblk = newblk; - - ItemPointerSet(&newtid, newblk, xlrec->new_offnum); - - /* - * The visibility map may need to be fixed even if the heap page is - * already up-to-date. - */ - if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) - { - Relation reln = CreateFakeRelcacheEntry(rlocator); - Buffer vmbuffer = InvalidBuffer; - - visibilitymap_pin(reln, oldblk, &vmbuffer); - visibilitymap_clear(reln, oldblk, vmbuffer, VISIBILITYMAP_VALID_BITS); - ReleaseBuffer(vmbuffer); - FreeFakeRelcacheEntry(reln); - } - - /* - * In normal operation, it is important to lock the two pages in - * page-number order, to avoid possible deadlocks against other update - * operations going the other way. However, during WAL replay there can - * be no other update happening, so we don't need to worry about that. But - * we *do* need to worry that we don't expose an inconsistent state to Hot - * Standby queries --- so the original page can't be unlocked before we've - * added the new tuple to the new page. - */ - - /* Deal with old tuple version */ - oldaction = XLogReadBufferForRedo(record, (oldblk == newblk) ? 0 : 1, - &obuffer); - if (oldaction == BLK_NEEDS_REDO) - { - page = BufferGetPage(obuffer); - offnum = xlrec->old_offnum; - if (PageGetMaxOffsetNumber(page) >= offnum) - lp = PageGetItemId(page, offnum); - - if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) - elog(PANIC, "invalid lp"); - - htup = (HeapTupleHeader) PageGetItem(page, lp); - - oldtup.t_data = htup; - oldtup.t_len = ItemIdGetLength(lp); - - htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED); - htup->t_infomask2 &= ~HEAP_KEYS_UPDATED; - if (hot_update) - HeapTupleHeaderSetHotUpdated(htup); - else - HeapTupleHeaderClearHotUpdated(htup); - fix_infomask_from_infobits(xlrec->old_infobits_set, &htup->t_infomask, - &htup->t_infomask2); - HeapTupleHeaderSetXmax(htup, xlrec->old_xmax); - HeapTupleHeaderSetCmax(htup, FirstCommandId, false); - /* Set forward chain link in t_ctid */ - htup->t_ctid = newtid; - - /* Mark the page as a candidate for pruning */ - PageSetPrunable(page, XLogRecGetXid(record)); - - if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) - PageClearAllVisible(page); - - PageSetLSN(page, lsn); - MarkBufferDirty(obuffer); - } - - /* - * Read the page the new tuple goes into, if different from old. 
- */ - if (oldblk == newblk) - { - nbuffer = obuffer; - newaction = oldaction; - } - else if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE) - { - nbuffer = XLogInitBufferForRedo(record, 0); - page = (Page) BufferGetPage(nbuffer); - PageInit(page, BufferGetPageSize(nbuffer), 0); - newaction = BLK_NEEDS_REDO; - } - else - newaction = XLogReadBufferForRedo(record, 0, &nbuffer); - - /* - * The visibility map may need to be fixed even if the heap page is - * already up-to-date. - */ - if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) - { - Relation reln = CreateFakeRelcacheEntry(rlocator); - Buffer vmbuffer = InvalidBuffer; - - visibilitymap_pin(reln, newblk, &vmbuffer); - visibilitymap_clear(reln, newblk, vmbuffer, VISIBILITYMAP_VALID_BITS); - ReleaseBuffer(vmbuffer); - FreeFakeRelcacheEntry(reln); - } - - /* Deal with new tuple */ - if (newaction == BLK_NEEDS_REDO) - { - char *recdata; - char *recdata_end; - Size datalen; - Size tuplen; - - recdata = XLogRecGetBlockData(record, 0, &datalen); - recdata_end = recdata + datalen; - - page = BufferGetPage(nbuffer); - - offnum = xlrec->new_offnum; - if (PageGetMaxOffsetNumber(page) + 1 < offnum) - elog(PANIC, "invalid max offset number"); - - if (xlrec->flags & XLH_UPDATE_PREFIX_FROM_OLD) - { - Assert(newblk == oldblk); - memcpy(&prefixlen, recdata, sizeof(uint16)); - recdata += sizeof(uint16); - } - if (xlrec->flags & XLH_UPDATE_SUFFIX_FROM_OLD) - { - Assert(newblk == oldblk); - memcpy(&suffixlen, recdata, sizeof(uint16)); - recdata += sizeof(uint16); - } - - memcpy((char *) &xlhdr, recdata, SizeOfHeapHeader); - recdata += SizeOfHeapHeader; - - tuplen = recdata_end - recdata; - Assert(tuplen <= MaxHeapTupleSize); - - htup = &tbuf.hdr; - MemSet((char *) htup, 0, SizeofHeapTupleHeader); - - /* - * Reconstruct the new tuple using the prefix and/or suffix from the - * old tuple, and the data stored in the WAL record. 
- */ - newp = (char *) htup + SizeofHeapTupleHeader; - if (prefixlen > 0) - { - int len; - - /* copy bitmap [+ padding] [+ oid] from WAL record */ - len = xlhdr.t_hoff - SizeofHeapTupleHeader; - memcpy(newp, recdata, len); - recdata += len; - newp += len; - - /* copy prefix from old tuple */ - memcpy(newp, (char *) oldtup.t_data + oldtup.t_data->t_hoff, prefixlen); - newp += prefixlen; - - /* copy new tuple data from WAL record */ - len = tuplen - (xlhdr.t_hoff - SizeofHeapTupleHeader); - memcpy(newp, recdata, len); - recdata += len; - newp += len; - } - else - { - /* - * copy bitmap [+ padding] [+ oid] + data from record, all in one - * go - */ - memcpy(newp, recdata, tuplen); - recdata += tuplen; - newp += tuplen; - } - Assert(recdata == recdata_end); - - /* copy suffix from old tuple */ - if (suffixlen > 0) - memcpy(newp, (char *) oldtup.t_data + oldtup.t_len - suffixlen, suffixlen); - - newlen = SizeofHeapTupleHeader + tuplen + prefixlen + suffixlen; - htup->t_infomask2 = xlhdr.t_infomask2; - htup->t_infomask = xlhdr.t_infomask; - htup->t_hoff = xlhdr.t_hoff; - - HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record)); - HeapTupleHeaderSetCmin(htup, FirstCommandId); - HeapTupleHeaderSetXmax(htup, xlrec->new_xmax); - /* Make sure there is no forward chain link in t_ctid */ - htup->t_ctid = newtid; - - offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true); - if (offnum == InvalidOffsetNumber) - elog(PANIC, "failed to add tuple"); - - if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) - PageClearAllVisible(page); - - freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */ - - PageSetLSN(page, lsn); - MarkBufferDirty(nbuffer); - } - - if (BufferIsValid(nbuffer) && nbuffer != obuffer) - UnlockReleaseBuffer(nbuffer); - if (BufferIsValid(obuffer)) - UnlockReleaseBuffer(obuffer); - - /* - * If the new page is running low on free space, update the FSM as well. - * Arbitrarily, our definition of "low" is less than 20%. We can't do much - * better than that without knowing the fill-factor for the table. - * - * However, don't update the FSM on HOT updates, because after crash - * recovery, either the old or the new tuple will certainly be dead and - * prunable. After pruning, the page will have roughly as much free space - * as it did before the update, assuming the new tuple is about the same - * size as the old one. - * - * XXX: Don't do this if the page was restored from full page image. We - * don't bother to update the FSM in that case, it doesn't need to be - * totally accurate anyway. 
- */ - if (newaction == BLK_NEEDS_REDO && !hot_update && freespace < BLCKSZ / 5) - XLogRecordPageWithFreeSpace(rlocator, newblk, freespace); -} - -static void -heap_xlog_confirm(XLogReaderState *record) -{ - XLogRecPtr lsn = record->EndRecPtr; - xl_heap_confirm *xlrec = (xl_heap_confirm *) XLogRecGetData(record); - Buffer buffer; - Page page; - OffsetNumber offnum; - ItemId lp = NULL; - HeapTupleHeader htup; - - if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) - { - page = BufferGetPage(buffer); - - offnum = xlrec->offnum; - if (PageGetMaxOffsetNumber(page) >= offnum) - lp = PageGetItemId(page, offnum); - - if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) - elog(PANIC, "invalid lp"); - - htup = (HeapTupleHeader) PageGetItem(page, lp); - - /* - * Confirm tuple as actually inserted - */ - ItemPointerSet(&htup->t_ctid, BufferGetBlockNumber(buffer), offnum); - - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - } - if (BufferIsValid(buffer)) - UnlockReleaseBuffer(buffer); -} - -static void -heap_xlog_lock(XLogReaderState *record) -{ - XLogRecPtr lsn = record->EndRecPtr; - xl_heap_lock *xlrec = (xl_heap_lock *) XLogRecGetData(record); - Buffer buffer; - Page page; - OffsetNumber offnum; - ItemId lp = NULL; - HeapTupleHeader htup; - - /* - * The visibility map may need to be fixed even if the heap page is - * already up-to-date. - */ - if (xlrec->flags & XLH_LOCK_ALL_FROZEN_CLEARED) - { - RelFileLocator rlocator; - Buffer vmbuffer = InvalidBuffer; - BlockNumber block; - Relation reln; - - XLogRecGetBlockTag(record, 0, &rlocator, NULL, &block); - reln = CreateFakeRelcacheEntry(rlocator); - - visibilitymap_pin(reln, block, &vmbuffer); - visibilitymap_clear(reln, block, vmbuffer, VISIBILITYMAP_ALL_FROZEN); - - ReleaseBuffer(vmbuffer); - FreeFakeRelcacheEntry(reln); - } - - if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) - { - page = (Page) BufferGetPage(buffer); - - offnum = xlrec->offnum; - if (PageGetMaxOffsetNumber(page) >= offnum) - lp = PageGetItemId(page, offnum); - - if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) - elog(PANIC, "invalid lp"); - - htup = (HeapTupleHeader) PageGetItem(page, lp); - - htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED); - htup->t_infomask2 &= ~HEAP_KEYS_UPDATED; - fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask, - &htup->t_infomask2); - - /* - * Clear relevant update flags, but only if the modified infomask says - * there's no update. - */ - if (HEAP_XMAX_IS_LOCKED_ONLY(htup->t_infomask)) - { - HeapTupleHeaderClearHotUpdated(htup); - /* Make sure there is no forward chain link in t_ctid */ - ItemPointerSet(&htup->t_ctid, - BufferGetBlockNumber(buffer), - offnum); - } - HeapTupleHeaderSetXmax(htup, xlrec->xmax); - HeapTupleHeaderSetCmax(htup, FirstCommandId, false); - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - } - if (BufferIsValid(buffer)) - UnlockReleaseBuffer(buffer); -} - -static void -heap_xlog_lock_updated(XLogReaderState *record) -{ - XLogRecPtr lsn = record->EndRecPtr; - xl_heap_lock_updated *xlrec; - Buffer buffer; - Page page; - OffsetNumber offnum; - ItemId lp = NULL; - HeapTupleHeader htup; - - xlrec = (xl_heap_lock_updated *) XLogRecGetData(record); - - /* - * The visibility map may need to be fixed even if the heap page is - * already up-to-date. 
- */ - if (xlrec->flags & XLH_LOCK_ALL_FROZEN_CLEARED) - { - RelFileLocator rlocator; - Buffer vmbuffer = InvalidBuffer; - BlockNumber block; - Relation reln; - - XLogRecGetBlockTag(record, 0, &rlocator, NULL, &block); - reln = CreateFakeRelcacheEntry(rlocator); - - visibilitymap_pin(reln, block, &vmbuffer); - visibilitymap_clear(reln, block, vmbuffer, VISIBILITYMAP_ALL_FROZEN); - - ReleaseBuffer(vmbuffer); - FreeFakeRelcacheEntry(reln); - } - - if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) - { - page = BufferGetPage(buffer); - - offnum = xlrec->offnum; - if (PageGetMaxOffsetNumber(page) >= offnum) - lp = PageGetItemId(page, offnum); - - if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) - elog(PANIC, "invalid lp"); - - htup = (HeapTupleHeader) PageGetItem(page, lp); - - htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED); - htup->t_infomask2 &= ~HEAP_KEYS_UPDATED; - fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask, - &htup->t_infomask2); - HeapTupleHeaderSetXmax(htup, xlrec->xmax); - - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - } - if (BufferIsValid(buffer)) - UnlockReleaseBuffer(buffer); -} - -static void -heap_xlog_inplace(XLogReaderState *record) -{ - XLogRecPtr lsn = record->EndRecPtr; - xl_heap_inplace *xlrec = (xl_heap_inplace *) XLogRecGetData(record); - Buffer buffer; - Page page; - OffsetNumber offnum; - ItemId lp = NULL; - HeapTupleHeader htup; - uint32 oldlen; - Size newlen; - - if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) - { - char *newtup = XLogRecGetBlockData(record, 0, &newlen); - - page = BufferGetPage(buffer); - - offnum = xlrec->offnum; - if (PageGetMaxOffsetNumber(page) >= offnum) - lp = PageGetItemId(page, offnum); - - if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) - elog(PANIC, "invalid lp"); - - htup = (HeapTupleHeader) PageGetItem(page, lp); - - oldlen = ItemIdGetLength(lp) - htup->t_hoff; - if (oldlen != newlen) - elog(PANIC, "wrong tuple length"); - - memcpy((char *) htup + htup->t_hoff, newtup, newlen); - - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - } - if (BufferIsValid(buffer)) - UnlockReleaseBuffer(buffer); -} - -void -heap_redo(XLogReaderState *record) -{ - uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; - - /* - * These operations don't overwrite MVCC data so no conflict processing is - * required. The ones in heap2 rmgr do. - */ - - switch (info & XLOG_HEAP_OPMASK) - { - case XLOG_HEAP_INSERT: - heap_xlog_insert(record); - break; - case XLOG_HEAP_DELETE: - heap_xlog_delete(record); - break; - case XLOG_HEAP_UPDATE: - heap_xlog_update(record, false); - break; - case XLOG_HEAP_TRUNCATE: - - /* - * TRUNCATE is a no-op because the actions are already logged as - * SMGR WAL records. TRUNCATE WAL record only exists for logical - * decoding. 
- */ - break; - case XLOG_HEAP_HOT_UPDATE: - heap_xlog_update(record, true); - break; - case XLOG_HEAP_CONFIRM: - heap_xlog_confirm(record); - break; - case XLOG_HEAP_LOCK: - heap_xlog_lock(record); - break; - case XLOG_HEAP_INPLACE: - heap_xlog_inplace(record); - break; - default: - elog(PANIC, "heap_redo: unknown op code %u", info); - } -} - -void -heap2_redo(XLogReaderState *record) -{ - uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; - - switch (info & XLOG_HEAP_OPMASK) - { - case XLOG_HEAP2_PRUNE_ON_ACCESS: - case XLOG_HEAP2_PRUNE_VACUUM_SCAN: - case XLOG_HEAP2_PRUNE_VACUUM_CLEANUP: - heap_xlog_prune_freeze(record); - break; - case XLOG_HEAP2_VISIBLE: - heap_xlog_visible(record); - break; - case XLOG_HEAP2_MULTI_INSERT: - heap_xlog_multi_insert(record); - break; - case XLOG_HEAP2_LOCK_UPDATED: - heap_xlog_lock_updated(record); - break; - case XLOG_HEAP2_NEW_CID: - - /* - * Nothing to do on a real replay, only used during logical - * decoding. - */ - break; - case XLOG_HEAP2_REWRITE: - heap_xlog_logical_rewrite(record); - break; - default: - elog(PANIC, "heap2_redo: unknown op code %u", info); - } -} - -/* - * Mask a heap page before performing consistency checks on it. - */ -void -heap_mask(char *pagedata, BlockNumber blkno) -{ - Page page = (Page) pagedata; - OffsetNumber off; - - mask_page_lsn_and_checksum(page); - - mask_page_hint_bits(page); - mask_unused_space(page); - - for (off = 1; off <= PageGetMaxOffsetNumber(page); off++) - { - ItemId iid = PageGetItemId(page, off); - char *page_item; - - page_item = (char *) (page + ItemIdGetOffset(iid)); - - if (ItemIdIsNormal(iid)) - { - HeapTupleHeader page_htup = (HeapTupleHeader) page_item; - - /* - * If xmin of a tuple is not yet frozen, we should ignore - * differences in hint bits, since they can be set without - * emitting WAL. - */ - if (!HeapTupleHeaderXminFrozen(page_htup)) - page_htup->t_infomask &= ~HEAP_XACT_MASK; - else - { - /* Still we need to mask xmax hint bits. */ - page_htup->t_infomask &= ~HEAP_XMAX_INVALID; - page_htup->t_infomask &= ~HEAP_XMAX_COMMITTED; - } - - /* - * During replay, we set Command Id to FirstCommandId. Hence, mask - * it. See heap_xlog_insert() for details. - */ - page_htup->t_choice.t_heap.t_field3.t_cid = MASK_MARKER; - - /* - * For a speculative tuple, heap_insert() does not set ctid in the - * caller-passed heap tuple itself, leaving the ctid field to - * contain a speculative token value - a per-backend monotonically - * increasing identifier. Besides, it does not WAL-log ctid under - * any circumstances. - * - * During redo, heap_xlog_insert() sets t_ctid to current block - * number and self offset number. It doesn't care about any - * speculative insertions on the primary. Hence, we set t_ctid to - * current block number and self offset number to ignore any - * inconsistency. - */ - if (HeapTupleHeaderIsSpeculative(page_htup)) - ItemPointerSet(&page_htup->t_ctid, blkno, off); - - /* - * NB: Not ignoring ctid changes due to the tuple having moved - * (i.e. HeapTupleHeaderIndicatesMovedPartitions), because that's - * important information that needs to be in-sync between primary - * and standby, and thus is WAL logged. - */ - } - - /* - * Ignore any padding bytes after the tuple, when the length of the - * item is not MAXALIGNed. - */ - if (ItemIdHasStorage(iid)) - { - int len = ItemIdGetLength(iid); - int padlen = MAXALIGN(len) - len; - - if (padlen > 0) - memset(page_item + len, MASK_MARKER, padlen); - } - } -} - /* * HeapCheckForSerializableConflictOut * We are reading a tuple. 
If it's not visible, there may be a diff --git a/src/backend/access/heap/heapam_xlog.c b/src/backend/access/heap/heapam_xlog.c new file mode 100644 index 0000000000..6dae7233ec --- /dev/null +++ b/src/backend/access/heap/heapam_xlog.c @@ -0,0 +1,1339 @@ +/*------------------------------------------------------------------------- + * + * heapam_xlog.c + * WAL replay logic for heap access method. + * + * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/access/heap/heapam_xlog.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/bufmask.h" +#include "access/heapam.h" +#include "access/visibilitymap.h" +#include "access/xlog.h" +#include "access/xlogutils.h" +#include "storage/freespace.h" +#include "storage/standby.h" + + +/* + * Replay XLOG_HEAP2_PRUNE_* records. + */ +static void +heap_xlog_prune_freeze(XLogReaderState *record) +{ + XLogRecPtr lsn = record->EndRecPtr; + char *maindataptr = XLogRecGetData(record); + xl_heap_prune xlrec; + Buffer buffer; + RelFileLocator rlocator; + BlockNumber blkno; + XLogRedoAction action; + + XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno); + memcpy(&xlrec, maindataptr, SizeOfHeapPrune); + maindataptr += SizeOfHeapPrune; + + /* + * We will take an ordinary exclusive lock or a cleanup lock depending on + * whether the XLHP_CLEANUP_LOCK flag is set. With an ordinary exclusive + * lock, we better not be doing anything that requires moving existing + * tuple data. + */ + Assert((xlrec.flags & XLHP_CLEANUP_LOCK) != 0 || + (xlrec.flags & (XLHP_HAS_REDIRECTIONS | XLHP_HAS_DEAD_ITEMS)) == 0); + + /* + * We are about to remove and/or freeze tuples. In Hot Standby mode, + * ensure that there are no queries running for which the removed tuples + * are still visible or which still consider the frozen xids as running. + * The conflict horizon XID comes after xl_heap_prune. + */ + if ((xlrec.flags & XLHP_HAS_CONFLICT_HORIZON) != 0) + { + TransactionId snapshot_conflict_horizon; + + /* memcpy() because snapshot_conflict_horizon is stored unaligned */ + memcpy(&snapshot_conflict_horizon, maindataptr, sizeof(TransactionId)); + maindataptr += sizeof(TransactionId); + + if (InHotStandby) + ResolveRecoveryConflictWithSnapshot(snapshot_conflict_horizon, + (xlrec.flags & XLHP_IS_CATALOG_REL) != 0, + rlocator); + } + + /* + * If we have a full-page image, restore it and we're done. + */ + action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, + (xlrec.flags & XLHP_CLEANUP_LOCK) != 0, + &buffer); + if (action == BLK_NEEDS_REDO) + { + Page page = (Page) BufferGetPage(buffer); + OffsetNumber *redirected; + OffsetNumber *nowdead; + OffsetNumber *nowunused; + int nredirected; + int ndead; + int nunused; + int nplans; + Size datalen; + xlhp_freeze_plan *plans; + OffsetNumber *frz_offsets; + char *dataptr = XLogRecGetBlockData(record, 0, &datalen); + + heap_xlog_deserialize_prune_and_freeze(dataptr, xlrec.flags, + &nplans, &plans, &frz_offsets, + &nredirected, &redirected, + &ndead, &nowdead, + &nunused, &nowunused); + + /* + * Update all line pointers per the record, and repair fragmentation + * if needed. 
+ */ + if (nredirected > 0 || ndead > 0 || nunused > 0) + heap_page_prune_execute(buffer, + (xlrec.flags & XLHP_CLEANUP_LOCK) == 0, + redirected, nredirected, + nowdead, ndead, + nowunused, nunused); + + /* Freeze tuples */ + for (int p = 0; p < nplans; p++) + { + HeapTupleFreeze frz; + + /* + * Convert freeze plan representation from WAL record into + * per-tuple format used by heap_execute_freeze_tuple + */ + frz.xmax = plans[p].xmax; + frz.t_infomask2 = plans[p].t_infomask2; + frz.t_infomask = plans[p].t_infomask; + frz.frzflags = plans[p].frzflags; + frz.offset = InvalidOffsetNumber; /* unused, but be tidy */ + + for (int i = 0; i < plans[p].ntuples; i++) + { + OffsetNumber offset = *(frz_offsets++); + ItemId lp; + HeapTupleHeader tuple; + + lp = PageGetItemId(page, offset); + tuple = (HeapTupleHeader) PageGetItem(page, lp); + heap_execute_freeze_tuple(tuple, &frz); + } + } + + /* There should be no more data */ + Assert((char *) frz_offsets == dataptr + datalen); + + /* + * Note: we don't worry about updating the page's prunability hints. + * At worst this will cause an extra prune cycle to occur soon. + */ + + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + + /* + * If we released any space or line pointers, update the free space map. + * + * Do this regardless of a full-page image being applied, since the FSM + * data is not in the page anyway. + */ + if (BufferIsValid(buffer)) + { + if (xlrec.flags & (XLHP_HAS_REDIRECTIONS | + XLHP_HAS_DEAD_ITEMS | + XLHP_HAS_NOW_UNUSED_ITEMS)) + { + Size freespace = PageGetHeapFreeSpace(BufferGetPage(buffer)); + + UnlockReleaseBuffer(buffer); + + XLogRecordPageWithFreeSpace(rlocator, blkno, freespace); + } + else + UnlockReleaseBuffer(buffer); + } +} + +/* + * Replay XLOG_HEAP2_VISIBLE records. + * + * The critical integrity requirement here is that we must never end up with + * a situation where the visibility map bit is set, and the page-level + * PD_ALL_VISIBLE bit is clear. If that were to occur, then a subsequent + * page modification would fail to clear the visibility map bit. + */ +static void +heap_xlog_visible(XLogReaderState *record) +{ + XLogRecPtr lsn = record->EndRecPtr; + xl_heap_visible *xlrec = (xl_heap_visible *) XLogRecGetData(record); + Buffer vmbuffer = InvalidBuffer; + Buffer buffer; + Page page; + RelFileLocator rlocator; + BlockNumber blkno; + XLogRedoAction action; + + Assert((xlrec->flags & VISIBILITYMAP_XLOG_VALID_BITS) == xlrec->flags); + + XLogRecGetBlockTag(record, 1, &rlocator, NULL, &blkno); + + /* + * If there are any Hot Standby transactions running that have an xmin + * horizon old enough that this page isn't all-visible for them, they + * might incorrectly decide that an index-only scan can skip a heap fetch. + * + * NB: It might be better to throw some kind of "soft" conflict here that + * forces any index-only scan that is in flight to perform heap fetches, + * rather than killing the transaction outright. + */ + if (InHotStandby) + ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon, + xlrec->flags & VISIBILITYMAP_XLOG_CATALOG_REL, + rlocator); + + /* + * Read the heap page, if it still exists. If the heap file has dropped or + * truncated later in recovery, we don't need to update the page, but we'd + * better still update the visibility map. 
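For reference (editor's aside, not part of the original patch): the code after this comment branches on the result of XLogReadBufferForRedo(). The possible outcomes are given by the XLogRedoAction enum, reproduced here approximately as declared in access/xlogutils.h, to make the branches below easier to follow.

typedef enum
{
	BLK_NEEDS_REDO,				/* changes from the WAL record need to be applied */
	BLK_DONE,					/* block is already up-to-date, nothing to do */
	BLK_RESTORED,				/* block was restored from a full-page image */
	BLK_NOTFOUND,				/* block was not found (e.g. relation dropped or truncated) */
} XLogRedoAction;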
+ */ + action = XLogReadBufferForRedo(record, 1, &buffer); + if (action == BLK_NEEDS_REDO) + { + /* + * We don't bump the LSN of the heap page when setting the visibility + * map bit (unless checksums or wal_hint_bits is enabled, in which + * case we must). This exposes us to torn page hazards, but since + * we're not inspecting the existing page contents in any way, we + * don't care. + */ + page = BufferGetPage(buffer); + + PageSetAllVisible(page); + + if (XLogHintBitIsNeeded()) + PageSetLSN(page, lsn); + + MarkBufferDirty(buffer); + } + else if (action == BLK_RESTORED) + { + /* + * If heap block was backed up, we already restored it and there's + * nothing more to do. (This can only happen with checksums or + * wal_log_hints enabled.) + */ + } + + if (BufferIsValid(buffer)) + { + Size space = PageGetFreeSpace(BufferGetPage(buffer)); + + UnlockReleaseBuffer(buffer); + + /* + * Since FSM is not WAL-logged and only updated heuristically, it + * easily becomes stale in standbys. If the standby is later promoted + * and runs VACUUM, it will skip updating individual free space + * figures for pages that became all-visible (or all-frozen, depending + * on the vacuum mode,) which is troublesome when FreeSpaceMapVacuum + * propagates too optimistic free space values to upper FSM layers; + * later inserters try to use such pages only to find out that they + * are unusable. This can cause long stalls when there are many such + * pages. + * + * Forestall those problems by updating FSM's idea about a page that + * is becoming all-visible or all-frozen. + * + * Do this regardless of a full-page image being applied, since the + * FSM data is not in the page anyway. + */ + if (xlrec->flags & VISIBILITYMAP_VALID_BITS) + XLogRecordPageWithFreeSpace(rlocator, blkno, space); + } + + /* + * Even if we skipped the heap page update due to the LSN interlock, it's + * still safe to update the visibility map. Any WAL record that clears + * the visibility map bit does so before checking the page LSN, so any + * bits that need to be cleared will still be cleared. + */ + if (XLogReadBufferForRedoExtended(record, 0, RBM_ZERO_ON_ERROR, false, + &vmbuffer) == BLK_NEEDS_REDO) + { + Page vmpage = BufferGetPage(vmbuffer); + Relation reln; + uint8 vmbits; + + /* initialize the page if it was read as zeros */ + if (PageIsNew(vmpage)) + PageInit(vmpage, BLCKSZ, 0); + + /* remove VISIBILITYMAP_XLOG_* */ + vmbits = xlrec->flags & VISIBILITYMAP_VALID_BITS; + + /* + * XLogReadBufferForRedoExtended locked the buffer. But + * visibilitymap_set will handle locking itself. + */ + LockBuffer(vmbuffer, BUFFER_LOCK_UNLOCK); + + reln = CreateFakeRelcacheEntry(rlocator); + visibilitymap_pin(reln, blkno, &vmbuffer); + + visibilitymap_set(reln, blkno, InvalidBuffer, lsn, vmbuffer, + xlrec->snapshotConflictHorizon, vmbits); + + ReleaseBuffer(vmbuffer); + FreeFakeRelcacheEntry(reln); + } + else if (BufferIsValid(vmbuffer)) + UnlockReleaseBuffer(vmbuffer); +} + +/* + * Given an "infobits" field from an XLog record, set the correct bits in the + * given infomask and infomask2 for the tuple touched by the record. + * + * (This is the reverse of compute_infobits). 
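For context (editor's aside, not part of the original patch): the primary-side encoder that this routine reverses is compute_infobits() in heapam.c, which packs the lock-related infomask bits into the compact XLHL_* flags carried in the WAL record, roughly as follows:

static uint8
compute_infobits(uint16 infomask, uint16 infomask2)
{
	return
		((infomask & HEAP_XMAX_IS_MULTI) != 0 ? XLHL_XMAX_IS_MULTI : 0) |
		((infomask & HEAP_XMAX_LOCK_ONLY) != 0 ? XLHL_XMAX_LOCK_ONLY : 0) |
		((infomask & HEAP_XMAX_EXCL_LOCK) != 0 ? XLHL_XMAX_EXCL_LOCK : 0) |
		/* note that HEAP_XMAX_SHR_LOCK is intentionally not mapped */
		((infomask & HEAP_XMAX_KEYSHR_LOCK) != 0 ? XLHL_XMAX_KEYSHR_LOCK : 0) |
		((infomask2 & HEAP_KEYS_UPDATED) != 0 ? XLHL_KEYS_UPDATED : 0);
}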
+ */ +static void +fix_infomask_from_infobits(uint8 infobits, uint16 *infomask, uint16 *infomask2) +{ + *infomask &= ~(HEAP_XMAX_IS_MULTI | HEAP_XMAX_LOCK_ONLY | + HEAP_XMAX_KEYSHR_LOCK | HEAP_XMAX_EXCL_LOCK); + *infomask2 &= ~HEAP_KEYS_UPDATED; + + if (infobits & XLHL_XMAX_IS_MULTI) + *infomask |= HEAP_XMAX_IS_MULTI; + if (infobits & XLHL_XMAX_LOCK_ONLY) + *infomask |= HEAP_XMAX_LOCK_ONLY; + if (infobits & XLHL_XMAX_EXCL_LOCK) + *infomask |= HEAP_XMAX_EXCL_LOCK; + /* note HEAP_XMAX_SHR_LOCK isn't considered here */ + if (infobits & XLHL_XMAX_KEYSHR_LOCK) + *infomask |= HEAP_XMAX_KEYSHR_LOCK; + + if (infobits & XLHL_KEYS_UPDATED) + *infomask2 |= HEAP_KEYS_UPDATED; +} + +/* + * Replay XLOG_HEAP_DELETE records. + */ +static void +heap_xlog_delete(XLogReaderState *record) +{ + XLogRecPtr lsn = record->EndRecPtr; + xl_heap_delete *xlrec = (xl_heap_delete *) XLogRecGetData(record); + Buffer buffer; + Page page; + ItemId lp = NULL; + HeapTupleHeader htup; + BlockNumber blkno; + RelFileLocator target_locator; + ItemPointerData target_tid; + + XLogRecGetBlockTag(record, 0, &target_locator, NULL, &blkno); + ItemPointerSetBlockNumber(&target_tid, blkno); + ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum); + + /* + * The visibility map may need to be fixed even if the heap page is + * already up-to-date. + */ + if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED) + { + Relation reln = CreateFakeRelcacheEntry(target_locator); + Buffer vmbuffer = InvalidBuffer; + + visibilitymap_pin(reln, blkno, &vmbuffer); + visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS); + ReleaseBuffer(vmbuffer); + FreeFakeRelcacheEntry(reln); + } + + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) + { + page = BufferGetPage(buffer); + + if (PageGetMaxOffsetNumber(page) >= xlrec->offnum) + lp = PageGetItemId(page, xlrec->offnum); + + if (PageGetMaxOffsetNumber(page) < xlrec->offnum || !ItemIdIsNormal(lp)) + elog(PANIC, "invalid lp"); + + htup = (HeapTupleHeader) PageGetItem(page, lp); + + htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED); + htup->t_infomask2 &= ~HEAP_KEYS_UPDATED; + HeapTupleHeaderClearHotUpdated(htup); + fix_infomask_from_infobits(xlrec->infobits_set, + &htup->t_infomask, &htup->t_infomask2); + if (!(xlrec->flags & XLH_DELETE_IS_SUPER)) + HeapTupleHeaderSetXmax(htup, xlrec->xmax); + else + HeapTupleHeaderSetXmin(htup, InvalidTransactionId); + HeapTupleHeaderSetCmax(htup, FirstCommandId, false); + + /* Mark the page as a candidate for pruning */ + PageSetPrunable(page, XLogRecGetXid(record)); + + if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED) + PageClearAllVisible(page); + + /* Make sure t_ctid is set correctly */ + if (xlrec->flags & XLH_DELETE_IS_PARTITION_MOVE) + HeapTupleHeaderSetMovedPartitions(htup); + else + htup->t_ctid = target_tid; + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); +} + +/* + * Replay XLOG_HEAP_INSERT records. 
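For reference (editor's aside, not part of the original patch): the replay below decodes these WAL structures, approximately as declared in access/heapam_xlog.h — xl_heap_insert in the record's main data, then an xl_heap_header followed by the tuple data in backup block 0.

typedef struct xl_heap_insert
{
	OffsetNumber offnum;		/* inserted tuple's offset on the page */
	uint8		flags;
	/* xl_heap_header & TUPLE DATA follow in backup block 0 */
} xl_heap_insert;

typedef struct xl_heap_header
{
	uint16		t_infomask2;
	uint16		t_infomask;
	uint8		t_hoff;
} xl_heap_header;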
+ */ +static void +heap_xlog_insert(XLogReaderState *record) +{ + XLogRecPtr lsn = record->EndRecPtr; + xl_heap_insert *xlrec = (xl_heap_insert *) XLogRecGetData(record); + Buffer buffer; + Page page; + union + { + HeapTupleHeaderData hdr; + char data[MaxHeapTupleSize]; + } tbuf; + HeapTupleHeader htup; + xl_heap_header xlhdr; + uint32 newlen; + Size freespace = 0; + RelFileLocator target_locator; + BlockNumber blkno; + ItemPointerData target_tid; + XLogRedoAction action; + + XLogRecGetBlockTag(record, 0, &target_locator, NULL, &blkno); + ItemPointerSetBlockNumber(&target_tid, blkno); + ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum); + + /* + * The visibility map may need to be fixed even if the heap page is + * already up-to-date. + */ + if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) + { + Relation reln = CreateFakeRelcacheEntry(target_locator); + Buffer vmbuffer = InvalidBuffer; + + visibilitymap_pin(reln, blkno, &vmbuffer); + visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS); + ReleaseBuffer(vmbuffer); + FreeFakeRelcacheEntry(reln); + } + + /* + * If we inserted the first and only tuple on the page, re-initialize the + * page from scratch. + */ + if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE) + { + buffer = XLogInitBufferForRedo(record, 0); + page = BufferGetPage(buffer); + PageInit(page, BufferGetPageSize(buffer), 0); + action = BLK_NEEDS_REDO; + } + else + action = XLogReadBufferForRedo(record, 0, &buffer); + if (action == BLK_NEEDS_REDO) + { + Size datalen; + char *data; + + page = BufferGetPage(buffer); + + if (PageGetMaxOffsetNumber(page) + 1 < xlrec->offnum) + elog(PANIC, "invalid max offset number"); + + data = XLogRecGetBlockData(record, 0, &datalen); + + newlen = datalen - SizeOfHeapHeader; + Assert(datalen > SizeOfHeapHeader && newlen <= MaxHeapTupleSize); + memcpy((char *) &xlhdr, data, SizeOfHeapHeader); + data += SizeOfHeapHeader; + + htup = &tbuf.hdr; + MemSet((char *) htup, 0, SizeofHeapTupleHeader); + /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */ + memcpy((char *) htup + SizeofHeapTupleHeader, + data, + newlen); + newlen += SizeofHeapTupleHeader; + htup->t_infomask2 = xlhdr.t_infomask2; + htup->t_infomask = xlhdr.t_infomask; + htup->t_hoff = xlhdr.t_hoff; + HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record)); + HeapTupleHeaderSetCmin(htup, FirstCommandId); + htup->t_ctid = target_tid; + + if (PageAddItem(page, (Item) htup, newlen, xlrec->offnum, + true, true) == InvalidOffsetNumber) + elog(PANIC, "failed to add tuple"); + + freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */ + + PageSetLSN(page, lsn); + + if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) + PageClearAllVisible(page); + + /* XLH_INSERT_ALL_FROZEN_SET implies that all tuples are visible */ + if (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET) + PageSetAllVisible(page); + + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); + + /* + * If the page is running low on free space, update the FSM as well. + * Arbitrarily, our definition of "low" is less than 20%. We can't do much + * better than that without knowing the fill-factor for the table. + * + * XXX: Don't do this if the page was restored from full page image. We + * don't bother to update the FSM in that case, it doesn't need to be + * totally accurate anyway. + */ + if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5) + XLogRecordPageWithFreeSpace(target_locator, blkno, freespace); +} + +/* + * Replay XLOG_HEAP2_MULTI_INSERT records. 
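For reference (editor's aside, not part of the original patch): multi-insert replay consumes xl_heap_multi_insert from the record's main data, followed in block 0 by one xl_multi_insert_tuple header plus tuple data per tuple. The declarations are approximately as in access/heapam_xlog.h:

typedef struct xl_heap_multi_insert
{
	uint8		flags;
	uint16		ntuples;
	OffsetNumber offsets[FLEXIBLE_ARRAY_MEMBER];	/* omitted when XLOG_HEAP_INIT_PAGE is set */
} xl_heap_multi_insert;

typedef struct xl_multi_insert_tuple
{
	uint16		datalen;		/* size of tuple data that follows */
	uint16		t_infomask2;
	uint16		t_infomask;
	uint8		t_hoff;
	/* TUPLE DATA FOLLOWS AT END OF STRUCT */
} xl_multi_insert_tuple;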
+ */ +static void +heap_xlog_multi_insert(XLogReaderState *record) +{ + XLogRecPtr lsn = record->EndRecPtr; + xl_heap_multi_insert *xlrec; + RelFileLocator rlocator; + BlockNumber blkno; + Buffer buffer; + Page page; + union + { + HeapTupleHeaderData hdr; + char data[MaxHeapTupleSize]; + } tbuf; + HeapTupleHeader htup; + uint32 newlen; + Size freespace = 0; + int i; + bool isinit = (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE) != 0; + XLogRedoAction action; + + /* + * Insertion doesn't overwrite MVCC data, so no conflict processing is + * required. + */ + xlrec = (xl_heap_multi_insert *) XLogRecGetData(record); + + XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno); + + /* check that the mutually exclusive flags are not both set */ + Assert(!((xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) && + (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET))); + + /* + * The visibility map may need to be fixed even if the heap page is + * already up-to-date. + */ + if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) + { + Relation reln = CreateFakeRelcacheEntry(rlocator); + Buffer vmbuffer = InvalidBuffer; + + visibilitymap_pin(reln, blkno, &vmbuffer); + visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS); + ReleaseBuffer(vmbuffer); + FreeFakeRelcacheEntry(reln); + } + + if (isinit) + { + buffer = XLogInitBufferForRedo(record, 0); + page = BufferGetPage(buffer); + PageInit(page, BufferGetPageSize(buffer), 0); + action = BLK_NEEDS_REDO; + } + else + action = XLogReadBufferForRedo(record, 0, &buffer); + if (action == BLK_NEEDS_REDO) + { + char *tupdata; + char *endptr; + Size len; + + /* Tuples are stored as block data */ + tupdata = XLogRecGetBlockData(record, 0, &len); + endptr = tupdata + len; + + page = (Page) BufferGetPage(buffer); + + for (i = 0; i < xlrec->ntuples; i++) + { + OffsetNumber offnum; + xl_multi_insert_tuple *xlhdr; + + /* + * If we're reinitializing the page, the tuples are stored in + * order from FirstOffsetNumber. Otherwise there's an array of + * offsets in the WAL record, and the tuples come after that. 
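+ * Either way, each tuple in the block data begins at a SHORTALIGN'd
+ * xl_multi_insert_tuple header carrying its length, infomask bits and
+ * t_hoff, immediately followed by the tuple data itself.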
+ */ + if (isinit) + offnum = FirstOffsetNumber + i; + else + offnum = xlrec->offsets[i]; + if (PageGetMaxOffsetNumber(page) + 1 < offnum) + elog(PANIC, "invalid max offset number"); + + xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(tupdata); + tupdata = ((char *) xlhdr) + SizeOfMultiInsertTuple; + + newlen = xlhdr->datalen; + Assert(newlen <= MaxHeapTupleSize); + htup = &tbuf.hdr; + MemSet((char *) htup, 0, SizeofHeapTupleHeader); + /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */ + memcpy((char *) htup + SizeofHeapTupleHeader, + (char *) tupdata, + newlen); + tupdata += newlen; + + newlen += SizeofHeapTupleHeader; + htup->t_infomask2 = xlhdr->t_infomask2; + htup->t_infomask = xlhdr->t_infomask; + htup->t_hoff = xlhdr->t_hoff; + HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record)); + HeapTupleHeaderSetCmin(htup, FirstCommandId); + ItemPointerSetBlockNumber(&htup->t_ctid, blkno); + ItemPointerSetOffsetNumber(&htup->t_ctid, offnum); + + offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true); + if (offnum == InvalidOffsetNumber) + elog(PANIC, "failed to add tuple"); + } + if (tupdata != endptr) + elog(PANIC, "total tuple length mismatch"); + + freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */ + + PageSetLSN(page, lsn); + + if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) + PageClearAllVisible(page); + + /* XLH_INSERT_ALL_FROZEN_SET implies that all tuples are visible */ + if (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET) + PageSetAllVisible(page); + + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); + + /* + * If the page is running low on free space, update the FSM as well. + * Arbitrarily, our definition of "low" is less than 20%. We can't do much + * better than that without knowing the fill-factor for the table. + * + * XXX: Don't do this if the page was restored from full page image. We + * don't bother to update the FSM in that case, it doesn't need to be + * totally accurate anyway. + */ + if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5) + XLogRecordPageWithFreeSpace(rlocator, blkno, freespace); +} + +/* + * Replay XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE records. + */ +static void +heap_xlog_update(XLogReaderState *record, bool hot_update) +{ + XLogRecPtr lsn = record->EndRecPtr; + xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record); + RelFileLocator rlocator; + BlockNumber oldblk; + BlockNumber newblk; + ItemPointerData newtid; + Buffer obuffer, + nbuffer; + Page page; + OffsetNumber offnum; + ItemId lp = NULL; + HeapTupleData oldtup; + HeapTupleHeader htup; + uint16 prefixlen = 0, + suffixlen = 0; + char *newp; + union + { + HeapTupleHeaderData hdr; + char data[MaxHeapTupleSize]; + } tbuf; + xl_heap_header xlhdr; + uint32 newlen; + Size freespace = 0; + XLogRedoAction oldaction; + XLogRedoAction newaction; + + /* initialize to keep the compiler quiet */ + oldtup.t_data = NULL; + oldtup.t_len = 0; + + XLogRecGetBlockTag(record, 0, &rlocator, NULL, &newblk); + if (XLogRecGetBlockTagExtended(record, 1, NULL, NULL, &oldblk, NULL)) + { + /* HOT updates are never done across pages */ + Assert(!hot_update); + } + else + oldblk = newblk; + + ItemPointerSet(&newtid, newblk, xlrec->new_offnum); + + /* + * The visibility map may need to be fixed even if the heap page is + * already up-to-date. 
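+ * (The heap page and its visibility map page carry separate LSNs, so the
+ * heap block being newer than this record does not guarantee that the
+ * all-visible bit has already been cleared.)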
+ */ + if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) + { + Relation reln = CreateFakeRelcacheEntry(rlocator); + Buffer vmbuffer = InvalidBuffer; + + visibilitymap_pin(reln, oldblk, &vmbuffer); + visibilitymap_clear(reln, oldblk, vmbuffer, VISIBILITYMAP_VALID_BITS); + ReleaseBuffer(vmbuffer); + FreeFakeRelcacheEntry(reln); + } + + /* + * In normal operation, it is important to lock the two pages in + * page-number order, to avoid possible deadlocks against other update + * operations going the other way. However, during WAL replay there can + * be no other update happening, so we don't need to worry about that. But + * we *do* need to worry that we don't expose an inconsistent state to Hot + * Standby queries --- so the original page can't be unlocked before we've + * added the new tuple to the new page. + */ + + /* Deal with old tuple version */ + oldaction = XLogReadBufferForRedo(record, (oldblk == newblk) ? 0 : 1, + &obuffer); + if (oldaction == BLK_NEEDS_REDO) + { + page = BufferGetPage(obuffer); + offnum = xlrec->old_offnum; + if (PageGetMaxOffsetNumber(page) >= offnum) + lp = PageGetItemId(page, offnum); + + if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) + elog(PANIC, "invalid lp"); + + htup = (HeapTupleHeader) PageGetItem(page, lp); + + oldtup.t_data = htup; + oldtup.t_len = ItemIdGetLength(lp); + + htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED); + htup->t_infomask2 &= ~HEAP_KEYS_UPDATED; + if (hot_update) + HeapTupleHeaderSetHotUpdated(htup); + else + HeapTupleHeaderClearHotUpdated(htup); + fix_infomask_from_infobits(xlrec->old_infobits_set, &htup->t_infomask, + &htup->t_infomask2); + HeapTupleHeaderSetXmax(htup, xlrec->old_xmax); + HeapTupleHeaderSetCmax(htup, FirstCommandId, false); + /* Set forward chain link in t_ctid */ + htup->t_ctid = newtid; + + /* Mark the page as a candidate for pruning */ + PageSetPrunable(page, XLogRecGetXid(record)); + + if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) + PageClearAllVisible(page); + + PageSetLSN(page, lsn); + MarkBufferDirty(obuffer); + } + + /* + * Read the page the new tuple goes into, if different from old. + */ + if (oldblk == newblk) + { + nbuffer = obuffer; + newaction = oldaction; + } + else if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE) + { + nbuffer = XLogInitBufferForRedo(record, 0); + page = (Page) BufferGetPage(nbuffer); + PageInit(page, BufferGetPageSize(nbuffer), 0); + newaction = BLK_NEEDS_REDO; + } + else + newaction = XLogReadBufferForRedo(record, 0, &nbuffer); + + /* + * The visibility map may need to be fixed even if the heap page is + * already up-to-date. 
+ */ + if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) + { + Relation reln = CreateFakeRelcacheEntry(rlocator); + Buffer vmbuffer = InvalidBuffer; + + visibilitymap_pin(reln, newblk, &vmbuffer); + visibilitymap_clear(reln, newblk, vmbuffer, VISIBILITYMAP_VALID_BITS); + ReleaseBuffer(vmbuffer); + FreeFakeRelcacheEntry(reln); + } + + /* Deal with new tuple */ + if (newaction == BLK_NEEDS_REDO) + { + char *recdata; + char *recdata_end; + Size datalen; + Size tuplen; + + recdata = XLogRecGetBlockData(record, 0, &datalen); + recdata_end = recdata + datalen; + + page = BufferGetPage(nbuffer); + + offnum = xlrec->new_offnum; + if (PageGetMaxOffsetNumber(page) + 1 < offnum) + elog(PANIC, "invalid max offset number"); + + if (xlrec->flags & XLH_UPDATE_PREFIX_FROM_OLD) + { + Assert(newblk == oldblk); + memcpy(&prefixlen, recdata, sizeof(uint16)); + recdata += sizeof(uint16); + } + if (xlrec->flags & XLH_UPDATE_SUFFIX_FROM_OLD) + { + Assert(newblk == oldblk); + memcpy(&suffixlen, recdata, sizeof(uint16)); + recdata += sizeof(uint16); + } + + memcpy((char *) &xlhdr, recdata, SizeOfHeapHeader); + recdata += SizeOfHeapHeader; + + tuplen = recdata_end - recdata; + Assert(tuplen <= MaxHeapTupleSize); + + htup = &tbuf.hdr; + MemSet((char *) htup, 0, SizeofHeapTupleHeader); + + /* + * Reconstruct the new tuple using the prefix and/or suffix from the + * old tuple, and the data stored in the WAL record. + */ + newp = (char *) htup + SizeofHeapTupleHeader; + if (prefixlen > 0) + { + int len; + + /* copy bitmap [+ padding] [+ oid] from WAL record */ + len = xlhdr.t_hoff - SizeofHeapTupleHeader; + memcpy(newp, recdata, len); + recdata += len; + newp += len; + + /* copy prefix from old tuple */ + memcpy(newp, (char *) oldtup.t_data + oldtup.t_data->t_hoff, prefixlen); + newp += prefixlen; + + /* copy new tuple data from WAL record */ + len = tuplen - (xlhdr.t_hoff - SizeofHeapTupleHeader); + memcpy(newp, recdata, len); + recdata += len; + newp += len; + } + else + { + /* + * copy bitmap [+ padding] [+ oid] + data from record, all in one + * go + */ + memcpy(newp, recdata, tuplen); + recdata += tuplen; + newp += tuplen; + } + Assert(recdata == recdata_end); + + /* copy suffix from old tuple */ + if (suffixlen > 0) + memcpy(newp, (char *) oldtup.t_data + oldtup.t_len - suffixlen, suffixlen); + + newlen = SizeofHeapTupleHeader + tuplen + prefixlen + suffixlen; + htup->t_infomask2 = xlhdr.t_infomask2; + htup->t_infomask = xlhdr.t_infomask; + htup->t_hoff = xlhdr.t_hoff; + + HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record)); + HeapTupleHeaderSetCmin(htup, FirstCommandId); + HeapTupleHeaderSetXmax(htup, xlrec->new_xmax); + /* Make sure there is no forward chain link in t_ctid */ + htup->t_ctid = newtid; + + offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true); + if (offnum == InvalidOffsetNumber) + elog(PANIC, "failed to add tuple"); + + if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) + PageClearAllVisible(page); + + freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */ + + PageSetLSN(page, lsn); + MarkBufferDirty(nbuffer); + } + + if (BufferIsValid(nbuffer) && nbuffer != obuffer) + UnlockReleaseBuffer(nbuffer); + if (BufferIsValid(obuffer)) + UnlockReleaseBuffer(obuffer); + + /* + * If the new page is running low on free space, update the FSM as well. + * Arbitrarily, our definition of "low" is less than 20%. We can't do much + * better than that without knowing the fill-factor for the table. 
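+ * (With the default 8 kB BLCKSZ, that works out to less than BLCKSZ / 5 =
+ * 1638 bytes of free space remaining.)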
+ * + * However, don't update the FSM on HOT updates, because after crash + * recovery, either the old or the new tuple will certainly be dead and + * prunable. After pruning, the page will have roughly as much free space + * as it did before the update, assuming the new tuple is about the same + * size as the old one. + * + * XXX: Don't do this if the page was restored from full page image. We + * don't bother to update the FSM in that case, it doesn't need to be + * totally accurate anyway. + */ + if (newaction == BLK_NEEDS_REDO && !hot_update && freespace < BLCKSZ / 5) + XLogRecordPageWithFreeSpace(rlocator, newblk, freespace); +} + +/* + * Replay XLOG_HEAP_CONFIRM records. + */ +static void +heap_xlog_confirm(XLogReaderState *record) +{ + XLogRecPtr lsn = record->EndRecPtr; + xl_heap_confirm *xlrec = (xl_heap_confirm *) XLogRecGetData(record); + Buffer buffer; + Page page; + OffsetNumber offnum; + ItemId lp = NULL; + HeapTupleHeader htup; + + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) + { + page = BufferGetPage(buffer); + + offnum = xlrec->offnum; + if (PageGetMaxOffsetNumber(page) >= offnum) + lp = PageGetItemId(page, offnum); + + if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) + elog(PANIC, "invalid lp"); + + htup = (HeapTupleHeader) PageGetItem(page, lp); + + /* + * Confirm tuple as actually inserted + */ + ItemPointerSet(&htup->t_ctid, BufferGetBlockNumber(buffer), offnum); + + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); +} + +/* + * Replay XLOG_HEAP_LOCK records. + */ +static void +heap_xlog_lock(XLogReaderState *record) +{ + XLogRecPtr lsn = record->EndRecPtr; + xl_heap_lock *xlrec = (xl_heap_lock *) XLogRecGetData(record); + Buffer buffer; + Page page; + OffsetNumber offnum; + ItemId lp = NULL; + HeapTupleHeader htup; + + /* + * The visibility map may need to be fixed even if the heap page is + * already up-to-date. + */ + if (xlrec->flags & XLH_LOCK_ALL_FROZEN_CLEARED) + { + RelFileLocator rlocator; + Buffer vmbuffer = InvalidBuffer; + BlockNumber block; + Relation reln; + + XLogRecGetBlockTag(record, 0, &rlocator, NULL, &block); + reln = CreateFakeRelcacheEntry(rlocator); + + visibilitymap_pin(reln, block, &vmbuffer); + visibilitymap_clear(reln, block, vmbuffer, VISIBILITYMAP_ALL_FROZEN); + + ReleaseBuffer(vmbuffer); + FreeFakeRelcacheEntry(reln); + } + + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) + { + page = (Page) BufferGetPage(buffer); + + offnum = xlrec->offnum; + if (PageGetMaxOffsetNumber(page) >= offnum) + lp = PageGetItemId(page, offnum); + + if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) + elog(PANIC, "invalid lp"); + + htup = (HeapTupleHeader) PageGetItem(page, lp); + + htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED); + htup->t_infomask2 &= ~HEAP_KEYS_UPDATED; + fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask, + &htup->t_infomask2); + + /* + * Clear relevant update flags, but only if the modified infomask says + * there's no update. 
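+ * That is, when the new xmax is only a locker we clear the HOT-updated
+ * flag and point t_ctid back at the tuple itself, since there is no newer
+ * tuple version to chain to.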
+ */ + if (HEAP_XMAX_IS_LOCKED_ONLY(htup->t_infomask)) + { + HeapTupleHeaderClearHotUpdated(htup); + /* Make sure there is no forward chain link in t_ctid */ + ItemPointerSet(&htup->t_ctid, + BufferGetBlockNumber(buffer), + offnum); + } + HeapTupleHeaderSetXmax(htup, xlrec->xmax); + HeapTupleHeaderSetCmax(htup, FirstCommandId, false); + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); +} + +/* + * Replay XLOG_HEAP2_LOCK_UPDATED records. + */ +static void +heap_xlog_lock_updated(XLogReaderState *record) +{ + XLogRecPtr lsn = record->EndRecPtr; + xl_heap_lock_updated *xlrec; + Buffer buffer; + Page page; + OffsetNumber offnum; + ItemId lp = NULL; + HeapTupleHeader htup; + + xlrec = (xl_heap_lock_updated *) XLogRecGetData(record); + + /* + * The visibility map may need to be fixed even if the heap page is + * already up-to-date. + */ + if (xlrec->flags & XLH_LOCK_ALL_FROZEN_CLEARED) + { + RelFileLocator rlocator; + Buffer vmbuffer = InvalidBuffer; + BlockNumber block; + Relation reln; + + XLogRecGetBlockTag(record, 0, &rlocator, NULL, &block); + reln = CreateFakeRelcacheEntry(rlocator); + + visibilitymap_pin(reln, block, &vmbuffer); + visibilitymap_clear(reln, block, vmbuffer, VISIBILITYMAP_ALL_FROZEN); + + ReleaseBuffer(vmbuffer); + FreeFakeRelcacheEntry(reln); + } + + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) + { + page = BufferGetPage(buffer); + + offnum = xlrec->offnum; + if (PageGetMaxOffsetNumber(page) >= offnum) + lp = PageGetItemId(page, offnum); + + if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) + elog(PANIC, "invalid lp"); + + htup = (HeapTupleHeader) PageGetItem(page, lp); + + htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED); + htup->t_infomask2 &= ~HEAP_KEYS_UPDATED; + fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask, + &htup->t_infomask2); + HeapTupleHeaderSetXmax(htup, xlrec->xmax); + + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); +} + +/* + * Replay XLOG_HEAP_INPLACE records. + */ +static void +heap_xlog_inplace(XLogReaderState *record) +{ + XLogRecPtr lsn = record->EndRecPtr; + xl_heap_inplace *xlrec = (xl_heap_inplace *) XLogRecGetData(record); + Buffer buffer; + Page page; + OffsetNumber offnum; + ItemId lp = NULL; + HeapTupleHeader htup; + uint32 oldlen; + Size newlen; + + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) + { + char *newtup = XLogRecGetBlockData(record, 0, &newlen); + + page = BufferGetPage(buffer); + + offnum = xlrec->offnum; + if (PageGetMaxOffsetNumber(page) >= offnum) + lp = PageGetItemId(page, offnum); + + if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) + elog(PANIC, "invalid lp"); + + htup = (HeapTupleHeader) PageGetItem(page, lp); + + oldlen = ItemIdGetLength(lp) - htup->t_hoff; + if (oldlen != newlen) + elog(PANIC, "wrong tuple length"); + + memcpy((char *) htup + htup->t_hoff, newtup, newlen); + + PageSetLSN(page, lsn); + MarkBufferDirty(buffer); + } + if (BufferIsValid(buffer)) + UnlockReleaseBuffer(buffer); +} + +void +heap_redo(XLogReaderState *record) +{ + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + /* + * These operations don't overwrite MVCC data so no conflict processing is + * required. The ones in heap2 rmgr do. 
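+ * (For those heap2 records, conflict handling is done inside the
+ * individual replay routines, e.g. heap_xlog_prune_freeze() resolves
+ * recovery conflicts against the snapshot conflict horizon stored in the
+ * record.)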
+ */ + + switch (info & XLOG_HEAP_OPMASK) + { + case XLOG_HEAP_INSERT: + heap_xlog_insert(record); + break; + case XLOG_HEAP_DELETE: + heap_xlog_delete(record); + break; + case XLOG_HEAP_UPDATE: + heap_xlog_update(record, false); + break; + case XLOG_HEAP_TRUNCATE: + + /* + * TRUNCATE is a no-op because the actions are already logged as + * SMGR WAL records. TRUNCATE WAL record only exists for logical + * decoding. + */ + break; + case XLOG_HEAP_HOT_UPDATE: + heap_xlog_update(record, true); + break; + case XLOG_HEAP_CONFIRM: + heap_xlog_confirm(record); + break; + case XLOG_HEAP_LOCK: + heap_xlog_lock(record); + break; + case XLOG_HEAP_INPLACE: + heap_xlog_inplace(record); + break; + default: + elog(PANIC, "heap_redo: unknown op code %u", info); + } +} + +void +heap2_redo(XLogReaderState *record) +{ + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + switch (info & XLOG_HEAP_OPMASK) + { + case XLOG_HEAP2_PRUNE_ON_ACCESS: + case XLOG_HEAP2_PRUNE_VACUUM_SCAN: + case XLOG_HEAP2_PRUNE_VACUUM_CLEANUP: + heap_xlog_prune_freeze(record); + break; + case XLOG_HEAP2_VISIBLE: + heap_xlog_visible(record); + break; + case XLOG_HEAP2_MULTI_INSERT: + heap_xlog_multi_insert(record); + break; + case XLOG_HEAP2_LOCK_UPDATED: + heap_xlog_lock_updated(record); + break; + case XLOG_HEAP2_NEW_CID: + + /* + * Nothing to do on a real replay, only used during logical + * decoding. + */ + break; + case XLOG_HEAP2_REWRITE: + heap_xlog_logical_rewrite(record); + break; + default: + elog(PANIC, "heap2_redo: unknown op code %u", info); + } +} + +/* + * Mask a heap page before performing consistency checks on it. + */ +void +heap_mask(char *pagedata, BlockNumber blkno) +{ + Page page = (Page) pagedata; + OffsetNumber off; + + mask_page_lsn_and_checksum(page); + + mask_page_hint_bits(page); + mask_unused_space(page); + + for (off = 1; off <= PageGetMaxOffsetNumber(page); off++) + { + ItemId iid = PageGetItemId(page, off); + char *page_item; + + page_item = (char *) (page + ItemIdGetOffset(iid)); + + if (ItemIdIsNormal(iid)) + { + HeapTupleHeader page_htup = (HeapTupleHeader) page_item; + + /* + * If xmin of a tuple is not yet frozen, we should ignore + * differences in hint bits, since they can be set without + * emitting WAL. + */ + if (!HeapTupleHeaderXminFrozen(page_htup)) + page_htup->t_infomask &= ~HEAP_XACT_MASK; + else + { + /* Still we need to mask xmax hint bits. */ + page_htup->t_infomask &= ~HEAP_XMAX_INVALID; + page_htup->t_infomask &= ~HEAP_XMAX_COMMITTED; + } + + /* + * During replay, we set Command Id to FirstCommandId. Hence, mask + * it. See heap_xlog_insert() for details. + */ + page_htup->t_choice.t_heap.t_field3.t_cid = MASK_MARKER; + + /* + * For a speculative tuple, heap_insert() does not set ctid in the + * caller-passed heap tuple itself, leaving the ctid field to + * contain a speculative token value - a per-backend monotonically + * increasing identifier. Besides, it does not WAL-log ctid under + * any circumstances. + * + * During redo, heap_xlog_insert() sets t_ctid to current block + * number and self offset number. It doesn't care about any + * speculative insertions on the primary. Hence, we set t_ctid to + * current block number and self offset number to ignore any + * inconsistency. + */ + if (HeapTupleHeaderIsSpeculative(page_htup)) + ItemPointerSet(&page_htup->t_ctid, blkno, off); + + /* + * NB: Not ignoring ctid changes due to the tuple having moved + * (i.e. 
HeapTupleHeaderIndicatesMovedPartitions), because that's + * important information that needs to be in-sync between primary + * and standby, and thus is WAL logged. + */ + } + + /* + * Ignore any padding bytes after the tuple, when the length of the + * item is not MAXALIGNed. + */ + if (ItemIdHasStorage(iid)) + { + int len = ItemIdGetLength(iid); + int padlen = MAXALIGN(len) - len; + + if (padlen > 0) + memset(page_item + len, MASK_MARKER, padlen); + } + } +} diff --git a/src/backend/access/heap/meson.build b/src/backend/access/heap/meson.build index e00d5b4f0d..19a990208e 100644 --- a/src/backend/access/heap/meson.build +++ b/src/backend/access/heap/meson.build @@ -4,6 +4,7 @@ backend_sources += files( 'heapam.c', 'heapam_handler.c', 'heapam_visibility.c', + 'heapam_xlog.c', 'heaptoast.c', 'hio.c', 'pruneheap.c', diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 9e9aec88a6..b92eb506ec 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -14,6 +14,7 @@ #ifndef HEAPAM_H #define HEAPAM_H +#include "access/heapam_xlog.h" #include "access/relation.h" /* for backward compatibility */ #include "access/relscan.h" #include "access/sdir.h" @@ -422,4 +423,28 @@ extern bool ResolveCminCmaxDuringDecoding(struct HTAB *tuplecid_data, extern void HeapCheckForSerializableConflictOut(bool visible, Relation relation, HeapTuple tuple, Buffer buffer, Snapshot snapshot); +/* + * heap_execute_freeze_tuple + * Execute the prepared freezing of a tuple with caller's freeze plan. + * + * Caller is responsible for ensuring that no other backend can access the + * storage underlying this tuple, either by holding an exclusive lock on the + * buffer containing it (which is what lazy VACUUM does), or by having it be + * in private storage (which is what CLUSTER and friends do). + */ +static inline void +heap_execute_freeze_tuple(HeapTupleHeader tuple, HeapTupleFreeze *frz) +{ + HeapTupleHeaderSetXmax(tuple, frz->xmax); + + if (frz->frzflags & XLH_FREEZE_XVAC) + HeapTupleHeaderSetXvac(tuple, FrozenTransactionId); + + if (frz->frzflags & XLH_INVALID_XVAC) + HeapTupleHeaderSetXvac(tuple, InvalidTransactionId); + + tuple->t_infomask = frz->t_infomask; + tuple->t_infomask2 = frz->t_infomask2; +} + #endif /* HEAPAM_H */
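
For context on the helper moved into heapam.h above: callers first build one
freeze plan per tuple with heap_prepare_freeze_tuple() and then, while holding
the required buffer lock, execute the plans.  The sketch below is illustrative
only and mirrors how heapam.c applies freeze plans during VACUUM; the function
name apply_freeze_plans is made up for the example and is not part of the tree.

#include "postgres.h"

#include "access/heapam.h"
#include "access/htup_details.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"

/*
 * Hypothetical helper, for illustration only: execute previously prepared
 * freeze plans for the tuples of one heap page.  "tuples" holds one
 * HeapTupleFreeze entry per tuple to freeze; frz->offset is the tuple's
 * line pointer offset.  The caller must hold an exclusive (or cleanup)
 * lock on "buffer", per heap_execute_freeze_tuple()'s header comment.
 */
void
apply_freeze_plans(Buffer buffer, HeapTupleFreeze *tuples, int ntuples)
{
	Page		page = BufferGetPage(buffer);

	for (int i = 0; i < ntuples; i++)
	{
		HeapTupleFreeze *frz = &tuples[i];
		ItemId		itemid = PageGetItemId(page, frz->offset);
		HeapTupleHeader htup = (HeapTupleHeader) PageGetItem(page, itemid);

		heap_execute_freeze_tuple(htup, frz);
	}
}

On the primary, the WAL for such freezing is emitted separately as an
XLOG_HEAP2_PRUNE_* record carrying the freeze plans, which is what
heap_xlog_prune_freeze() in heapam_xlog.c replays.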