/*-------------------------------------------------------------------------
 *
 * heapam_xlog.c
 *	  WAL replay logic for heap access method.
 *
 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/access/heap/heapam_xlog.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/bufmask.h"
#include "access/heapam.h"
#include "access/visibilitymap.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "storage/freespace.h"
#include "storage/standby.h"


/*
 * Replay XLOG_HEAP2_PRUNE_* records.
 */
static void
heap_xlog_prune_freeze(XLogReaderState *record)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    char       *maindataptr = XLogRecGetData(record);
    xl_heap_prune xlrec;
    Buffer      buffer;
    RelFileLocator rlocator;
    BlockNumber blkno;
    XLogRedoAction action;

    XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);
    memcpy(&xlrec, maindataptr, SizeOfHeapPrune);
    maindataptr += SizeOfHeapPrune;

    /*
     * We will take an ordinary exclusive lock or a cleanup lock depending on
     * whether the XLHP_CLEANUP_LOCK flag is set.  With an ordinary exclusive
     * lock, we better not be doing anything that requires moving existing
     * tuple data.
     */
    Assert((xlrec.flags & XLHP_CLEANUP_LOCK) != 0 ||
           (xlrec.flags & (XLHP_HAS_REDIRECTIONS | XLHP_HAS_DEAD_ITEMS)) == 0);

    /*
     * We are about to remove and/or freeze tuples.  In Hot Standby mode,
     * ensure that there are no queries running for which the removed tuples
     * are still visible or which still consider the frozen xids as running.
     * The conflict horizon XID comes after xl_heap_prune.
     */
    if ((xlrec.flags & XLHP_HAS_CONFLICT_HORIZON) != 0)
    {
        TransactionId snapshot_conflict_horizon;

        /* memcpy() because snapshot_conflict_horizon is stored unaligned */
        memcpy(&snapshot_conflict_horizon, maindataptr, sizeof(TransactionId));
        maindataptr += sizeof(TransactionId);

        if (InHotStandby)
            ResolveRecoveryConflictWithSnapshot(snapshot_conflict_horizon,
                                                (xlrec.flags & XLHP_IS_CATALOG_REL) != 0,
                                                rlocator);
    }

    /*
     * If we have a full-page image, restore it and we're done.
     */
    action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL,
                                           (xlrec.flags & XLHP_CLEANUP_LOCK) != 0,
                                           &buffer);
    if (action == BLK_NEEDS_REDO)
    {
        Page        page = (Page) BufferGetPage(buffer);
        OffsetNumber *redirected;
        OffsetNumber *nowdead;
        OffsetNumber *nowunused;
        int         nredirected;
        int         ndead;
        int         nunused;
        int         nplans;
        Size        datalen;
        xlhp_freeze_plan *plans;
        OffsetNumber *frz_offsets;
        char       *dataptr = XLogRecGetBlockData(record, 0, &datalen);

        heap_xlog_deserialize_prune_and_freeze(dataptr, xlrec.flags,
                                               &nplans, &plans, &frz_offsets,
                                               &nredirected, &redirected,
                                               &ndead, &nowdead,
                                               &nunused, &nowunused);
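
        /*
         * At this point the block data has been unpacked into parallel
         * arrays: nplans freeze plans, each applying to plans[p].ntuples
         * offsets consumed consecutively from frz_offsets, plus the
         * redirected (old/new offset pairs), now-dead, and now-unused item
         * arrays.  Which pieces are present is driven by the XLHP_HAS_*
         * flags; see the xlhp_* structs in heapam_xlog.h for the
         * authoritative serialized layout.
         */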

        /*
         * Update all line pointers per the record, and repair fragmentation
         * if needed.
         */
        if (nredirected > 0 || ndead > 0 || nunused > 0)
            heap_page_prune_execute(buffer,
                                    (xlrec.flags & XLHP_CLEANUP_LOCK) == 0,
                                    redirected, nredirected,
                                    nowdead, ndead,
                                    nowunused, nunused);

        /* Freeze tuples */
        for (int p = 0; p < nplans; p++)
        {
            HeapTupleFreeze frz;

            /*
             * Convert freeze plan representation from WAL record into
             * per-tuple format used by heap_execute_freeze_tuple
             */
            frz.xmax = plans[p].xmax;
            frz.t_infomask2 = plans[p].t_infomask2;
            frz.t_infomask = plans[p].t_infomask;
            frz.frzflags = plans[p].frzflags;
            frz.offset = InvalidOffsetNumber;   /* unused, but be tidy */

            for (int i = 0; i < plans[p].ntuples; i++)
            {
                OffsetNumber offset = *(frz_offsets++);
                ItemId      lp;
                HeapTupleHeader tuple;

                lp = PageGetItemId(page, offset);
                tuple = (HeapTupleHeader) PageGetItem(page, lp);
                heap_execute_freeze_tuple(tuple, &frz);
            }
        }

        /* There should be no more data */
        Assert((char *) frz_offsets == dataptr + datalen);

        /*
         * Note: we don't worry about updating the page's prunability hints.
         * At worst this will cause an extra prune cycle to occur soon.
         */
        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }

    /*
     * If we released any space or line pointers, update the free space map.
     *
     * Do this regardless of a full-page image being applied, since the FSM
     * data is not in the page anyway.
     */
    if (BufferIsValid(buffer))
    {
        if (xlrec.flags & (XLHP_HAS_REDIRECTIONS | XLHP_HAS_DEAD_ITEMS |
                           XLHP_HAS_NOW_UNUSED_ITEMS))
        {
            Size        freespace = PageGetHeapFreeSpace(BufferGetPage(buffer));

            UnlockReleaseBuffer(buffer);
            XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
        }
        else
            UnlockReleaseBuffer(buffer);
    }
}

/*
 * Replay XLOG_HEAP2_VISIBLE records.
 *
 * The critical integrity requirement here is that we must never end up with
 * a situation where the visibility map bit is set, and the page-level
 * PD_ALL_VISIBLE bit is clear.  If that were to occur, then a subsequent
 * page modification would fail to clear the visibility map bit.
 */
static void
heap_xlog_visible(XLogReaderState *record)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    xl_heap_visible *xlrec = (xl_heap_visible *) XLogRecGetData(record);
    Buffer      vmbuffer = InvalidBuffer;
    Buffer      buffer;
    Page        page;
    RelFileLocator rlocator;
    BlockNumber blkno;
    XLogRedoAction action;

    Assert((xlrec->flags & VISIBILITYMAP_XLOG_VALID_BITS) == xlrec->flags);

    XLogRecGetBlockTag(record, 1, &rlocator, NULL, &blkno);

    /*
     * If there are any Hot Standby transactions running that have an xmin
     * horizon old enough that this page isn't all-visible for them, they
     * might incorrectly decide that an index-only scan can skip a heap
     * fetch.
     *
     * NB: It might be better to throw some kind of "soft" conflict here that
     * forces any index-only scan that is in flight to perform heap fetches,
     * rather than killing the transaction outright.
     */
    if (InHotStandby)
        ResolveRecoveryConflictWithSnapshot(xlrec->snapshotConflictHorizon,
                                            xlrec->flags & VISIBILITYMAP_XLOG_CATALOG_REL,
                                            rlocator);

    /*
     * Read the heap page, if it still exists.  If the heap file has been
     * dropped or truncated later in recovery, we don't need to update the
     * page, but we'd better still update the visibility map.
     */
    action = XLogReadBufferForRedo(record, 1, &buffer);
    if (action == BLK_NEEDS_REDO)
    {
        /*
         * We don't bump the LSN of the heap page when setting the visibility
         * map bit (unless checksums or wal_log_hints is enabled, in which
         * case we must).  This exposes us to torn page hazards, but since
         * we're not inspecting the existing page contents in any way, we
         * don't care.
         */
        page = BufferGetPage(buffer);

        PageSetAllVisible(page);

        if (XLogHintBitIsNeeded())
            PageSetLSN(page, lsn);

        MarkBufferDirty(buffer);
    }
    else if (action == BLK_RESTORED)
    {
        /*
         * If heap block was backed up, we already restored it and there's
         * nothing more to do.  (This can only happen with checksums or
         * wal_log_hints enabled.)
         */
    }

    if (BufferIsValid(buffer))
    {
        Size        space = PageGetFreeSpace(BufferGetPage(buffer));

        UnlockReleaseBuffer(buffer);

        /*
         * Since FSM is not WAL-logged and only updated heuristically, it
         * easily becomes stale in standbys.  If the standby is later
         * promoted and runs VACUUM, it will skip updating individual free
         * space figures for pages that became all-visible (or all-frozen,
         * depending on the vacuum mode), which is troublesome when
         * FreeSpaceMapVacuum propagates too optimistic free space values to
         * upper FSM layers; later inserters try to use such pages only to
         * find out that they are unusable.  This can cause long stalls when
         * there are many such pages.
         *
         * Forestall those problems by updating FSM's idea about a page that
         * is becoming all-visible or all-frozen.
         *
         * Do this regardless of a full-page image being applied, since the
         * FSM data is not in the page anyway.
         */
        if (xlrec->flags & VISIBILITYMAP_VALID_BITS)
            XLogRecordPageWithFreeSpace(rlocator, blkno, space);
    }

    /*
     * Even if we skipped the heap page update due to the LSN interlock, it's
     * still safe to update the visibility map.  Any WAL record that clears
     * the visibility map bit does so before checking the page LSN, so any
     * bits that need to be cleared will still be cleared.
     */
    if (XLogReadBufferForRedoExtended(record, 0, RBM_ZERO_ON_ERROR, false,
                                      &vmbuffer) == BLK_NEEDS_REDO)
    {
        Page        vmpage = BufferGetPage(vmbuffer);
        Relation    reln;
        uint8       vmbits;

        /* initialize the page if it was read as zeros */
        if (PageIsNew(vmpage))
            PageInit(vmpage, BLCKSZ, 0);

        /* remove VISIBILITYMAP_XLOG_* */
        vmbits = xlrec->flags & VISIBILITYMAP_VALID_BITS;

        /*
         * XLogReadBufferForRedoExtended locked the buffer.  But
         * visibilitymap_set will handle locking itself.
         */
        LockBuffer(vmbuffer, BUFFER_LOCK_UNLOCK);

        reln = CreateFakeRelcacheEntry(rlocator);
        visibilitymap_pin(reln, blkno, &vmbuffer);

        visibilitymap_set(reln, blkno, InvalidBuffer, lsn, vmbuffer,
                          xlrec->snapshotConflictHorizon, vmbits);

        ReleaseBuffer(vmbuffer);
        FreeFakeRelcacheEntry(reln);
    }
    else if (BufferIsValid(vmbuffer))
        UnlockReleaseBuffer(vmbuffer);
}

/*
 * Given an "infobits" field from an XLog record, set the correct bits in the
 * given infomask and infomask2 for the tuple touched by the record.
 *
 * (This is the reverse of compute_infobits).
 */
static void
fix_infomask_from_infobits(uint8 infobits, uint16 *infomask,
                           uint16 *infomask2)
{
    *infomask &= ~(HEAP_XMAX_IS_MULTI | HEAP_XMAX_LOCK_ONLY |
                   HEAP_XMAX_KEYSHR_LOCK | HEAP_XMAX_EXCL_LOCK);
    *infomask2 &= ~HEAP_KEYS_UPDATED;

    if (infobits & XLHL_XMAX_IS_MULTI)
        *infomask |= HEAP_XMAX_IS_MULTI;
    if (infobits & XLHL_XMAX_LOCK_ONLY)
        *infomask |= HEAP_XMAX_LOCK_ONLY;
    if (infobits & XLHL_XMAX_EXCL_LOCK)
        *infomask |= HEAP_XMAX_EXCL_LOCK;
    /* note HEAP_XMAX_SHR_LOCK isn't considered here */
    if (infobits & XLHL_XMAX_KEYSHR_LOCK)
        *infomask |= HEAP_XMAX_KEYSHR_LOCK;

    if (infobits & XLHL_KEYS_UPDATED)
        *infomask2 |= HEAP_KEYS_UPDATED;
}
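
/*
 * For example: an infobits value of XLHL_XMAX_EXCL_LOCK | XLHL_KEYS_UPDATED
 * clears the lock-related xmax bits and then sets HEAP_XMAX_EXCL_LOCK in
 * *infomask and HEAP_KEYS_UPDATED in *infomask2.  A shared lock needs no
 * XLHL bit of its own because HEAP_XMAX_SHR_LOCK is defined as the
 * combination of HEAP_XMAX_EXCL_LOCK and HEAP_XMAX_KEYSHR_LOCK, so setting
 * both of those bits reproduces it.
 */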

/*
 * Replay XLOG_HEAP_DELETE records.
 */
static void
heap_xlog_delete(XLogReaderState *record)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    xl_heap_delete *xlrec = (xl_heap_delete *) XLogRecGetData(record);
    Buffer      buffer;
    Page        page;
    ItemId      lp = NULL;
    HeapTupleHeader htup;
    BlockNumber blkno;
    RelFileLocator target_locator;
    ItemPointerData target_tid;

    XLogRecGetBlockTag(record, 0, &target_locator, NULL, &blkno);
    ItemPointerSetBlockNumber(&target_tid, blkno);
    ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum);

    /*
     * The visibility map may need to be fixed even if the heap page is
     * already up-to-date.
     */
    if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED)
    {
        Relation    reln = CreateFakeRelcacheEntry(target_locator);
        Buffer      vmbuffer = InvalidBuffer;

        visibilitymap_pin(reln, blkno, &vmbuffer);
        visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS);
        ReleaseBuffer(vmbuffer);
        FreeFakeRelcacheEntry(reln);
    }

    if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
    {
        page = BufferGetPage(buffer);

        if (PageGetMaxOffsetNumber(page) >= xlrec->offnum)
            lp = PageGetItemId(page, xlrec->offnum);

        if (PageGetMaxOffsetNumber(page) < xlrec->offnum || !ItemIdIsNormal(lp))
            elog(PANIC, "invalid lp");

        htup = (HeapTupleHeader) PageGetItem(page, lp);

        htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
        htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
        HeapTupleHeaderClearHotUpdated(htup);
        fix_infomask_from_infobits(xlrec->infobits_set,
                                   &htup->t_infomask, &htup->t_infomask2);
        if (!(xlrec->flags & XLH_DELETE_IS_SUPER))
            HeapTupleHeaderSetXmax(htup, xlrec->xmax);
        else
            HeapTupleHeaderSetXmin(htup, InvalidTransactionId);
        HeapTupleHeaderSetCmax(htup, FirstCommandId, false);

        /* Mark the page as a candidate for pruning */
        PageSetPrunable(page, XLogRecGetXid(record));

        if (xlrec->flags & XLH_DELETE_ALL_VISIBLE_CLEARED)
            PageClearAllVisible(page);

        /* Make sure t_ctid is set correctly */
        if (xlrec->flags & XLH_DELETE_IS_PARTITION_MOVE)
            HeapTupleHeaderSetMovedPartitions(htup);
        else
            htup->t_ctid = target_tid;
        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);
}

/*
 * Replay XLOG_HEAP_INSERT records.
 */
static void
heap_xlog_insert(XLogReaderState *record)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    xl_heap_insert *xlrec = (xl_heap_insert *) XLogRecGetData(record);
    Buffer      buffer;
    Page        page;
    union
    {
        HeapTupleHeaderData hdr;
        char        data[MaxHeapTupleSize];
    }           tbuf;
    HeapTupleHeader htup;
    xl_heap_header xlhdr;
    uint32      newlen;
    Size        freespace = 0;
    RelFileLocator target_locator;
    BlockNumber blkno;
    ItemPointerData target_tid;
    XLogRedoAction action;

    XLogRecGetBlockTag(record, 0, &target_locator, NULL, &blkno);
    ItemPointerSetBlockNumber(&target_tid, blkno);
    ItemPointerSetOffsetNumber(&target_tid, xlrec->offnum);

    /*
     * The visibility map may need to be fixed even if the heap page is
     * already up-to-date.
     */
    if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
    {
        Relation    reln = CreateFakeRelcacheEntry(target_locator);
        Buffer      vmbuffer = InvalidBuffer;

        visibilitymap_pin(reln, blkno, &vmbuffer);
        visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS);
        ReleaseBuffer(vmbuffer);
        FreeFakeRelcacheEntry(reln);
    }
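
    /*
     * (As in the other replay routines in this file, a fake relcache entry
     * is used above: no real relation cache exists during recovery, and
     * CreateFakeRelcacheEntry() builds a minimal Relation from just the
     * RelFileLocator, which is all the visibilitymap_* calls need.)
     */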

    /*
     * If we inserted the first and only tuple on the page, re-initialize the
     * page from scratch.
     */
    if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE)
    {
        buffer = XLogInitBufferForRedo(record, 0);
        page = BufferGetPage(buffer);
        PageInit(page, BufferGetPageSize(buffer), 0);
        action = BLK_NEEDS_REDO;
    }
    else
        action = XLogReadBufferForRedo(record, 0, &buffer);
    if (action == BLK_NEEDS_REDO)
    {
        Size        datalen;
        char       *data;

        page = BufferGetPage(buffer);

        if (PageGetMaxOffsetNumber(page) + 1 < xlrec->offnum)
            elog(PANIC, "invalid max offset number");

        data = XLogRecGetBlockData(record, 0, &datalen);

        newlen = datalen - SizeOfHeapHeader;
        Assert(datalen > SizeOfHeapHeader && newlen <= MaxHeapTupleSize);
        memcpy((char *) &xlhdr, data, SizeOfHeapHeader);
        data += SizeOfHeapHeader;

        htup = &tbuf.hdr;
        MemSet((char *) htup, 0, SizeofHeapTupleHeader);
        /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
        memcpy((char *) htup + SizeofHeapTupleHeader,
               data,
               newlen);
        newlen += SizeofHeapTupleHeader;
        htup->t_infomask2 = xlhdr.t_infomask2;
        htup->t_infomask = xlhdr.t_infomask;
        htup->t_hoff = xlhdr.t_hoff;
        HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
        HeapTupleHeaderSetCmin(htup, FirstCommandId);
        htup->t_ctid = target_tid;

        if (PageAddItem(page, (Item) htup, newlen, xlrec->offnum,
                        true, true) == InvalidOffsetNumber)
            elog(PANIC, "failed to add tuple");

        freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */

        PageSetLSN(page, lsn);

        if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
            PageClearAllVisible(page);

        /* XLH_INSERT_ALL_FROZEN_SET implies that all tuples are visible */
        if (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)
            PageSetAllVisible(page);

        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);

    /*
     * If the page is running low on free space, update the FSM as well.
     * Arbitrarily, our definition of "low" is less than 20%.  We can't do
     * much better than that without knowing the fill-factor for the table.
     *
     * XXX: Don't do this if the page was restored from full page image.  We
     * don't bother to update the FSM in that case, it doesn't need to be
     * totally accurate anyway.
     */
    if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5)
        XLogRecordPageWithFreeSpace(target_locator, blkno, freespace);
}

/*
 * Replay XLOG_HEAP2_MULTI_INSERT records.
 */
static void
heap_xlog_multi_insert(XLogReaderState *record)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    xl_heap_multi_insert *xlrec;
    RelFileLocator rlocator;
    BlockNumber blkno;
    Buffer      buffer;
    Page        page;
    union
    {
        HeapTupleHeaderData hdr;
        char        data[MaxHeapTupleSize];
    }           tbuf;
    HeapTupleHeader htup;
    uint32      newlen;
    Size        freespace = 0;
    int         i;
    bool        isinit = (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE) != 0;
    XLogRedoAction action;

    /*
     * Insertion doesn't overwrite MVCC data, so no conflict processing is
     * required.
     */
    xlrec = (xl_heap_multi_insert *) XLogRecGetData(record);

    XLogRecGetBlockTag(record, 0, &rlocator, NULL, &blkno);

    /* check that the mutually exclusive flags are not both set */
    Assert(!((xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED) &&
             (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)));
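
    /*
     * (The two flags are inherently contradictory: ALL_VISIBLE_CLEARED says
     * the insertion knocked a previously all-visible page out of that state,
     * while ALL_FROZEN_SET says the same insertion leaves the page
     * all-frozen, which presupposes all-visible.)
     */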

    /*
     * The visibility map may need to be fixed even if the heap page is
     * already up-to-date.
     */
    if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
    {
        Relation    reln = CreateFakeRelcacheEntry(rlocator);
        Buffer      vmbuffer = InvalidBuffer;

        visibilitymap_pin(reln, blkno, &vmbuffer);
        visibilitymap_clear(reln, blkno, vmbuffer, VISIBILITYMAP_VALID_BITS);
        ReleaseBuffer(vmbuffer);
        FreeFakeRelcacheEntry(reln);
    }

    if (isinit)
    {
        buffer = XLogInitBufferForRedo(record, 0);
        page = BufferGetPage(buffer);
        PageInit(page, BufferGetPageSize(buffer), 0);
        action = BLK_NEEDS_REDO;
    }
    else
        action = XLogReadBufferForRedo(record, 0, &buffer);
    if (action == BLK_NEEDS_REDO)
    {
        char       *tupdata;
        char       *endptr;
        Size        len;

        /* Tuples are stored as block data */
        tupdata = XLogRecGetBlockData(record, 0, &len);
        endptr = tupdata + len;

        page = (Page) BufferGetPage(buffer);

        for (i = 0; i < xlrec->ntuples; i++)
        {
            OffsetNumber offnum;
            xl_multi_insert_tuple *xlhdr;

            /*
             * If we're reinitializing the page, the tuples are stored in
             * order from FirstOffsetNumber.  Otherwise there's an array of
             * offsets in the WAL record, and the tuples come after that.
             */
            if (isinit)
                offnum = FirstOffsetNumber + i;
            else
                offnum = xlrec->offsets[i];
            if (PageGetMaxOffsetNumber(page) + 1 < offnum)
                elog(PANIC, "invalid max offset number");

            xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(tupdata);
            tupdata = ((char *) xlhdr) + SizeOfMultiInsertTuple;

            newlen = xlhdr->datalen;
            Assert(newlen <= MaxHeapTupleSize);
            htup = &tbuf.hdr;
            MemSet((char *) htup, 0, SizeofHeapTupleHeader);
            /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
            memcpy((char *) htup + SizeofHeapTupleHeader,
                   (char *) tupdata,
                   newlen);
            tupdata += newlen;

            newlen += SizeofHeapTupleHeader;
            htup->t_infomask2 = xlhdr->t_infomask2;
            htup->t_infomask = xlhdr->t_infomask;
            htup->t_hoff = xlhdr->t_hoff;
            HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
            HeapTupleHeaderSetCmin(htup, FirstCommandId);
            ItemPointerSetBlockNumber(&htup->t_ctid, blkno);
            ItemPointerSetOffsetNumber(&htup->t_ctid, offnum);

            offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
            if (offnum == InvalidOffsetNumber)
                elog(PANIC, "failed to add tuple");
        }
        if (tupdata != endptr)
            elog(PANIC, "total tuple length mismatch");

        freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */

        PageSetLSN(page, lsn);

        if (xlrec->flags & XLH_INSERT_ALL_VISIBLE_CLEARED)
            PageClearAllVisible(page);

        /* XLH_INSERT_ALL_FROZEN_SET implies that all tuples are visible */
        if (xlrec->flags & XLH_INSERT_ALL_FROZEN_SET)
            PageSetAllVisible(page);

        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);

    /*
     * If the page is running low on free space, update the FSM as well.
     * Arbitrarily, our definition of "low" is less than 20%.  We can't do
     * much better than that without knowing the fill-factor for the table.
     *
     * XXX: Don't do this if the page was restored from full page image.  We
     * don't bother to update the FSM in that case, it doesn't need to be
     * totally accurate anyway.
     */
    if (action == BLK_NEEDS_REDO && freespace < BLCKSZ / 5)
        XLogRecordPageWithFreeSpace(rlocator, blkno, freespace);
}
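
/*
 * For reference: with the default 8192-byte BLCKSZ, the "low on free space"
 * threshold used above (and in heap_xlog_insert and heap_xlog_update) works
 * out to BLCKSZ / 5 = 1638 bytes of remaining free space.
 */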

/*
 * Replay XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE records.
 */
static void
heap_xlog_update(XLogReaderState *record, bool hot_update)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
    RelFileLocator rlocator;
    BlockNumber oldblk;
    BlockNumber newblk;
    ItemPointerData newtid;
    Buffer      obuffer,
                nbuffer;
    Page        page;
    OffsetNumber offnum;
    ItemId      lp = NULL;
    HeapTupleData oldtup;
    HeapTupleHeader htup;
    uint16      prefixlen = 0,
                suffixlen = 0;
    char       *newp;
    union
    {
        HeapTupleHeaderData hdr;
        char        data[MaxHeapTupleSize];
    }           tbuf;
    xl_heap_header xlhdr;
    uint32      newlen;
    Size        freespace = 0;
    XLogRedoAction oldaction;
    XLogRedoAction newaction;

    /* initialize to keep the compiler quiet */
    oldtup.t_data = NULL;
    oldtup.t_len = 0;

    XLogRecGetBlockTag(record, 0, &rlocator, NULL, &newblk);
    if (XLogRecGetBlockTagExtended(record, 1, NULL, NULL, &oldblk, NULL))
    {
        /* HOT updates are never done across pages */
        Assert(!hot_update);
    }
    else
        oldblk = newblk;

    ItemPointerSet(&newtid, newblk, xlrec->new_offnum);

    /*
     * The visibility map may need to be fixed even if the heap page is
     * already up-to-date.
     */
    if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED)
    {
        Relation    reln = CreateFakeRelcacheEntry(rlocator);
        Buffer      vmbuffer = InvalidBuffer;

        visibilitymap_pin(reln, oldblk, &vmbuffer);
        visibilitymap_clear(reln, oldblk, vmbuffer, VISIBILITYMAP_VALID_BITS);
        ReleaseBuffer(vmbuffer);
        FreeFakeRelcacheEntry(reln);
    }

    /*
     * In normal operation, it is important to lock the two pages in
     * page-number order, to avoid possible deadlocks against other update
     * operations going the other way.  However, during WAL replay there can
     * be no other update happening, so we don't need to worry about that.
     * But we *do* need to worry that we don't expose an inconsistent state
     * to Hot Standby queries --- so the original page can't be unlocked
     * before we've added the new tuple to the new page.
     */

    /* Deal with old tuple version */
    oldaction = XLogReadBufferForRedo(record, (oldblk == newblk) ? 0 : 1,
                                      &obuffer);
    if (oldaction == BLK_NEEDS_REDO)
    {
        page = BufferGetPage(obuffer);
        offnum = xlrec->old_offnum;
        if (PageGetMaxOffsetNumber(page) >= offnum)
            lp = PageGetItemId(page, offnum);

        if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
            elog(PANIC, "invalid lp");

        htup = (HeapTupleHeader) PageGetItem(page, lp);

        oldtup.t_data = htup;
        oldtup.t_len = ItemIdGetLength(lp);

        htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
        htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
        if (hot_update)
            HeapTupleHeaderSetHotUpdated(htup);
        else
            HeapTupleHeaderClearHotUpdated(htup);
        fix_infomask_from_infobits(xlrec->old_infobits_set, &htup->t_infomask,
                                   &htup->t_infomask2);
        HeapTupleHeaderSetXmax(htup, xlrec->old_xmax);
        HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
        /* Set forward chain link in t_ctid */
        htup->t_ctid = newtid;

        /* Mark the page as a candidate for pruning */
        PageSetPrunable(page, XLogRecGetXid(record));

        if (xlrec->flags & XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED)
            PageClearAllVisible(page);

        PageSetLSN(page, lsn);
        MarkBufferDirty(obuffer);
    }

    /*
     * Read the page the new tuple goes into, if different from old.
     */
    if (oldblk == newblk)
    {
        nbuffer = obuffer;
        newaction = oldaction;
    }
    else if (XLogRecGetInfo(record) & XLOG_HEAP_INIT_PAGE)
    {
        nbuffer = XLogInitBufferForRedo(record, 0);
        page = (Page) BufferGetPage(nbuffer);
        PageInit(page, BufferGetPageSize(nbuffer), 0);
        newaction = BLK_NEEDS_REDO;
    }
    else
        newaction = XLogReadBufferForRedo(record, 0, &nbuffer);

    /*
     * The visibility map may need to be fixed even if the heap page is
     * already up-to-date.
     */
    if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED)
    {
        Relation    reln = CreateFakeRelcacheEntry(rlocator);
        Buffer      vmbuffer = InvalidBuffer;

        visibilitymap_pin(reln, newblk, &vmbuffer);
        visibilitymap_clear(reln, newblk, vmbuffer, VISIBILITYMAP_VALID_BITS);
        ReleaseBuffer(vmbuffer);
        FreeFakeRelcacheEntry(reln);
    }

    /* Deal with new tuple */
    if (newaction == BLK_NEEDS_REDO)
    {
        char       *recdata;
        char       *recdata_end;
        Size        datalen;
        Size        tuplen;

        recdata = XLogRecGetBlockData(record, 0, &datalen);
        recdata_end = recdata + datalen;

        page = BufferGetPage(nbuffer);

        offnum = xlrec->new_offnum;
        if (PageGetMaxOffsetNumber(page) + 1 < offnum)
            elog(PANIC, "invalid max offset number");

        if (xlrec->flags & XLH_UPDATE_PREFIX_FROM_OLD)
        {
            Assert(newblk == oldblk);
            memcpy(&prefixlen, recdata, sizeof(uint16));
            recdata += sizeof(uint16);
        }
        if (xlrec->flags & XLH_UPDATE_SUFFIX_FROM_OLD)
        {
            Assert(newblk == oldblk);
            memcpy(&suffixlen, recdata, sizeof(uint16));
            recdata += sizeof(uint16);
        }

        memcpy((char *) &xlhdr, recdata, SizeOfHeapHeader);
        recdata += SizeOfHeapHeader;

        tuplen = recdata_end - recdata;
        Assert(tuplen <= MaxHeapTupleSize);

        htup = &tbuf.hdr;
        MemSet((char *) htup, 0, SizeofHeapTupleHeader);

        /*
         * Reconstruct the new tuple using the prefix and/or suffix from the
         * old tuple, and the data stored in the WAL record.
         */
        newp = (char *) htup + SizeofHeapTupleHeader;
        if (prefixlen > 0)
        {
            int         len;

            /* copy bitmap [+ padding] [+ oid] from WAL record */
            len = xlhdr.t_hoff - SizeofHeapTupleHeader;
            memcpy(newp, recdata, len);
            recdata += len;
            newp += len;

            /* copy prefix from old tuple */
            memcpy(newp, (char *) oldtup.t_data + oldtup.t_data->t_hoff, prefixlen);
            newp += prefixlen;

            /* copy new tuple data from WAL record */
            len = tuplen - (xlhdr.t_hoff - SizeofHeapTupleHeader);
            memcpy(newp, recdata, len);
            recdata += len;
            newp += len;
        }
        else
        {
            /*
             * copy bitmap [+ padding] [+ oid] + data from record, all in one
             * go
             */
            memcpy(newp, recdata, tuplen);
            recdata += tuplen;
            newp += tuplen;
        }
        Assert(recdata == recdata_end);

        /* copy suffix from old tuple */
        if (suffixlen > 0)
            memcpy(newp, (char *) oldtup.t_data + oldtup.t_len - suffixlen, suffixlen);

        newlen = SizeofHeapTupleHeader + tuplen + prefixlen + suffixlen;
        htup->t_infomask2 = xlhdr.t_infomask2;
        htup->t_infomask = xlhdr.t_infomask;
        htup->t_hoff = xlhdr.t_hoff;

        HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
        HeapTupleHeaderSetCmin(htup, FirstCommandId);
        HeapTupleHeaderSetXmax(htup, xlrec->new_xmax);
        /* Make sure there is no forward chain link in t_ctid */
        htup->t_ctid = newtid;

        offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
        if (offnum == InvalidOffsetNumber)
            elog(PANIC, "failed to add tuple");

        if (xlrec->flags & XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED)
            PageClearAllVisible(page);

        freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */

        PageSetLSN(page, lsn);
        MarkBufferDirty(nbuffer);
    }

    if (BufferIsValid(nbuffer) && nbuffer != obuffer)
        UnlockReleaseBuffer(nbuffer);
    if (BufferIsValid(obuffer))
        UnlockReleaseBuffer(obuffer);

    /*
     * If the new page is running low on free space, update the FSM as well.
     * Arbitrarily, our definition of "low" is less than 20%.  We can't do
     * much better than that without knowing the fill-factor for the table.
     *
     * However, don't update the FSM on HOT updates, because after crash
     * recovery, either the old or the new tuple will certainly be dead and
     * prunable.  After pruning, the page will have roughly as much free
     * space as it did before the update, assuming the new tuple is about the
     * same size as the old one.
     *
     * XXX: Don't do this if the page was restored from full page image.  We
     * don't bother to update the FSM in that case, it doesn't need to be
     * totally accurate anyway.
     */
    if (newaction == BLK_NEEDS_REDO && !hot_update && freespace < BLCKSZ / 5)
        XLogRecordPageWithFreeSpace(rlocator, newblk, freespace);
}

/*
 * Replay XLOG_HEAP_CONFIRM records.
 */
static void
heap_xlog_confirm(XLogReaderState *record)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    xl_heap_confirm *xlrec = (xl_heap_confirm *) XLogRecGetData(record);
    Buffer      buffer;
    Page        page;
    OffsetNumber offnum;
    ItemId      lp = NULL;
    HeapTupleHeader htup;

    if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
    {
        page = BufferGetPage(buffer);

        offnum = xlrec->offnum;
        if (PageGetMaxOffsetNumber(page) >= offnum)
            lp = PageGetItemId(page, offnum);

        if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
            elog(PANIC, "invalid lp");

        htup = (HeapTupleHeader) PageGetItem(page, lp);

        /*
         * Confirm tuple as actually inserted
         */
        ItemPointerSet(&htup->t_ctid, BufferGetBlockNumber(buffer), offnum);

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);
}

/*
 * Replay XLOG_HEAP_LOCK records.
 */
static void
heap_xlog_lock(XLogReaderState *record)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    xl_heap_lock *xlrec = (xl_heap_lock *) XLogRecGetData(record);
    Buffer      buffer;
    Page        page;
    OffsetNumber offnum;
    ItemId      lp = NULL;
    HeapTupleHeader htup;

    /*
     * The visibility map may need to be fixed even if the heap page is
     * already up-to-date.
     */
    if (xlrec->flags & XLH_LOCK_ALL_FROZEN_CLEARED)
    {
        RelFileLocator rlocator;
        Buffer      vmbuffer = InvalidBuffer;
        BlockNumber block;
        Relation    reln;

        XLogRecGetBlockTag(record, 0, &rlocator, NULL, &block);
        reln = CreateFakeRelcacheEntry(rlocator);

        visibilitymap_pin(reln, block, &vmbuffer);
        visibilitymap_clear(reln, block, vmbuffer, VISIBILITYMAP_ALL_FROZEN);

        ReleaseBuffer(vmbuffer);
        FreeFakeRelcacheEntry(reln);
    }

    if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
    {
        page = (Page) BufferGetPage(buffer);

        offnum = xlrec->offnum;
        if (PageGetMaxOffsetNumber(page) >= offnum)
            lp = PageGetItemId(page, offnum);

        if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
            elog(PANIC, "invalid lp");

        htup = (HeapTupleHeader) PageGetItem(page, lp);

        htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
        htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
        fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask,
                                   &htup->t_infomask2);

        /*
         * Clear relevant update flags, but only if the modified infomask
         * says there's no update.
         */
        if (HEAP_XMAX_IS_LOCKED_ONLY(htup->t_infomask))
        {
            HeapTupleHeaderClearHotUpdated(htup);
            /* Make sure there is no forward chain link in t_ctid */
            ItemPointerSet(&htup->t_ctid, BufferGetBlockNumber(buffer), offnum);
        }
        HeapTupleHeaderSetXmax(htup, xlrec->xmax);
        HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);
}

/*
 * Replay XLOG_HEAP2_LOCK_UPDATED records.
 */
static void
heap_xlog_lock_updated(XLogReaderState *record)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    xl_heap_lock_updated *xlrec;
    Buffer      buffer;
    Page        page;
    OffsetNumber offnum;
    ItemId      lp = NULL;
    HeapTupleHeader htup;

    xlrec = (xl_heap_lock_updated *) XLogRecGetData(record);
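
    /*
     * XLOG_HEAP2_LOCK_UPDATED is written when a later version in an update
     * chain gets locked (presumably via the lock-updated-tuple path of tuple
     * locking on the primary); replay is a trimmed-down variant of
     * heap_xlog_lock() that only adjusts xmax and the infomask bits.
     */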

    /*
     * The visibility map may need to be fixed even if the heap page is
     * already up-to-date.
     */
    if (xlrec->flags & XLH_LOCK_ALL_FROZEN_CLEARED)
    {
        RelFileLocator rlocator;
        Buffer      vmbuffer = InvalidBuffer;
        BlockNumber block;
        Relation    reln;

        XLogRecGetBlockTag(record, 0, &rlocator, NULL, &block);
        reln = CreateFakeRelcacheEntry(rlocator);

        visibilitymap_pin(reln, block, &vmbuffer);
        visibilitymap_clear(reln, block, vmbuffer, VISIBILITYMAP_ALL_FROZEN);

        ReleaseBuffer(vmbuffer);
        FreeFakeRelcacheEntry(reln);
    }

    if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
    {
        page = BufferGetPage(buffer);

        offnum = xlrec->offnum;
        if (PageGetMaxOffsetNumber(page) >= offnum)
            lp = PageGetItemId(page, offnum);

        if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
            elog(PANIC, "invalid lp");

        htup = (HeapTupleHeader) PageGetItem(page, lp);

        htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
        htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
        fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask,
                                   &htup->t_infomask2);
        HeapTupleHeaderSetXmax(htup, xlrec->xmax);

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);
}

/*
 * Replay XLOG_HEAP_INPLACE records.
 */
static void
heap_xlog_inplace(XLogReaderState *record)
{
    XLogRecPtr  lsn = record->EndRecPtr;
    xl_heap_inplace *xlrec = (xl_heap_inplace *) XLogRecGetData(record);
    Buffer      buffer;
    Page        page;
    OffsetNumber offnum;
    ItemId      lp = NULL;
    HeapTupleHeader htup;
    uint32      oldlen;
    Size        newlen;

    if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
    {
        char       *newtup = XLogRecGetBlockData(record, 0, &newlen);

        page = BufferGetPage(buffer);

        offnum = xlrec->offnum;
        if (PageGetMaxOffsetNumber(page) >= offnum)
            lp = PageGetItemId(page, offnum);

        if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
            elog(PANIC, "invalid lp");

        htup = (HeapTupleHeader) PageGetItem(page, lp);

        oldlen = ItemIdGetLength(lp) - htup->t_hoff;
        if (oldlen != newlen)
            elog(PANIC, "wrong tuple length");

        memcpy((char *) htup + htup->t_hoff, newtup, newlen);

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer);
    }
    if (BufferIsValid(buffer))
        UnlockReleaseBuffer(buffer);
}

void
heap_redo(XLogReaderState *record)
{
    uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;

    /*
     * These operations don't overwrite MVCC data so no conflict processing
     * is required.  The ones in heap2 rmgr do.
     */

    switch (info & XLOG_HEAP_OPMASK)
    {
        case XLOG_HEAP_INSERT:
            heap_xlog_insert(record);
            break;
        case XLOG_HEAP_DELETE:
            heap_xlog_delete(record);
            break;
        case XLOG_HEAP_UPDATE:
            heap_xlog_update(record, false);
            break;
        case XLOG_HEAP_TRUNCATE:

            /*
             * TRUNCATE is a no-op because the actions are already logged as
             * SMGR WAL records.  TRUNCATE WAL record only exists for logical
             * decoding.
             */
            break;
        case XLOG_HEAP_HOT_UPDATE:
            heap_xlog_update(record, true);
            break;
        case XLOG_HEAP_CONFIRM:
            heap_xlog_confirm(record);
            break;
        case XLOG_HEAP_LOCK:
            heap_xlog_lock(record);
            break;
        case XLOG_HEAP_INPLACE:
            heap_xlog_inplace(record);
            break;
        default:
            elog(PANIC, "heap_redo: unknown op code %u", info);
    }
}

void
heap2_redo(XLogReaderState *record)
{
    uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;

    switch (info & XLOG_HEAP_OPMASK)
    {
        case XLOG_HEAP2_PRUNE_ON_ACCESS:
        case XLOG_HEAP2_PRUNE_VACUUM_SCAN:
        case XLOG_HEAP2_PRUNE_VACUUM_CLEANUP:
            heap_xlog_prune_freeze(record);
            break;
        case XLOG_HEAP2_VISIBLE:
            heap_xlog_visible(record);
            break;
        case XLOG_HEAP2_MULTI_INSERT:
            heap_xlog_multi_insert(record);
            break;
        case XLOG_HEAP2_LOCK_UPDATED:
            heap_xlog_lock_updated(record);
            break;
        case XLOG_HEAP2_NEW_CID:

            /*
             * Nothing to do on a real replay, only used during logical
             * decoding.
             */
            break;
        case XLOG_HEAP2_REWRITE:
            heap_xlog_logical_rewrite(record);
            break;
        default:
            elog(PANIC, "heap2_redo: unknown op code %u", info);
    }
}

/*
 * Mask a heap page before performing consistency checks on it.
 */
void
heap_mask(char *pagedata, BlockNumber blkno)
{
    Page        page = (Page) pagedata;
    OffsetNumber off;

    mask_page_lsn_and_checksum(page);

    mask_page_hint_bits(page);
    mask_unused_space(page);

    for (off = 1; off <= PageGetMaxOffsetNumber(page); off++)
    {
        ItemId      iid = PageGetItemId(page, off);
        char       *page_item;

        page_item = (char *) (page + ItemIdGetOffset(iid));

        if (ItemIdIsNormal(iid))
        {
            HeapTupleHeader page_htup = (HeapTupleHeader) page_item;

            /*
             * If xmin of a tuple is not yet frozen, we should ignore
             * differences in hint bits, since they can be set without
             * emitting WAL.
             */
            if (!HeapTupleHeaderXminFrozen(page_htup))
                page_htup->t_infomask &= ~HEAP_XACT_MASK;
            else
            {
                /* Still we need to mask xmax hint bits. */
                page_htup->t_infomask &= ~HEAP_XMAX_INVALID;
                page_htup->t_infomask &= ~HEAP_XMAX_COMMITTED;
            }

            /*
             * During replay, we set Command Id to FirstCommandId.  Hence,
             * mask it.  See heap_xlog_insert() for details.
             */
            page_htup->t_choice.t_heap.t_field3.t_cid = MASK_MARKER;

            /*
             * For a speculative tuple, heap_insert() does not set ctid in
             * the caller-passed heap tuple itself, leaving the ctid field to
             * contain a speculative token value - a per-backend
             * monotonically increasing identifier.  Besides, it does not
             * WAL-log ctid under any circumstances.
             *
             * During redo, heap_xlog_insert() sets t_ctid to current block
             * number and self offset number.  It doesn't care about any
             * speculative insertions on the primary.  Hence, we set t_ctid
             * to current block number and self offset number to ignore any
             * inconsistency.
             */
            if (HeapTupleHeaderIsSpeculative(page_htup))
                ItemPointerSet(&page_htup->t_ctid, blkno, off);

            /*
             * NB: Not ignoring ctid changes due to the tuple having moved
             * (i.e. HeapTupleHeaderIndicatesMovedPartitions), because that's
             * important information that needs to be in-sync between primary
             * and standby, and thus is WAL logged.
             */
        }

        /*
         * Ignore any padding bytes after the tuple, when the length of the
         * item is not MAXALIGNed.
         */
        if (ItemIdHasStorage(iid))
        {
            int         len = ItemIdGetLength(iid);
            int         padlen = MAXALIGN(len) - len;

            if (padlen > 0)
                memset(page_item + len, MASK_MARKER, padlen);
        }
    }
}
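
/*
 * (heap_mask() is invoked by the wal_consistency_checking machinery, which
 * compares a freshly replayed page against the full-page image carried in
 * the WAL record; the masking above blanks out fields -- hint bits, command
 * ids, unused space, speculative ctids -- that can legitimately differ
 * between primary and standby without indicating corruption.)
 */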