diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 71ec74015c..e2337acc2a 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -6605,10 +6605,15 @@ log_heap_update(Relation reln, Buffer oldbuf, xl_heap_header_len xlhdr; xl_heap_header_len xlhdr_idx; uint8 info; + uint16 prefix_suffix[2]; + uint16 prefixlen = 0, + suffixlen = 0; XLogRecPtr recptr; - XLogRecData rdata[7]; + XLogRecData rdata[9]; Page page = BufferGetPage(newbuf); bool need_tuple_data = RelationIsLogicallyLogged(reln); + int nr; + Buffer newbufref; /* Caller should not call me on a non-WAL-logged relation */ Assert(RelationNeedsWAL(reln)); @@ -6618,6 +6623,57 @@ log_heap_update(Relation reln, Buffer oldbuf, else info = XLOG_HEAP_UPDATE; + /* + * If the old and new tuple are on the same page, we only need to log + * the parts of the new tuple that were changed. That saves on the amount + * of WAL we need to write. Currently, we just count any unchanged bytes + * in the beginning and end of the tuple. That's quick to check, and + * perfectly covers the common case that only one field is updated. + * + * We could do this even if the old and new tuple are on different pages, + * but only if we don't make a full-page image of the old page, which is + * difficult to know in advance. Also, if the old tuple is corrupt for + * some reason, it would allow the corruption to propagate to the new page, + * so it seems best to avoid. Under the general assumption that most + * updates tend to create the new tuple version on the same page, there + * isn't much to be gained by doing this across pages anyway. + * + * Skip this if we're taking a full-page image of the new page, as we don't + * include the new tuple in the WAL record in that case. Also disable if + * wal_level='logical', as logical decoding needs to be able to read the + * new tuple in whole from the WAL record alone. 
+ */ + if (oldbuf == newbuf && !need_tuple_data && + !XLogCheckBufferNeedsBackup(newbuf)) + { + char *oldp = (char *) oldtup->t_data + oldtup->t_data->t_hoff; + char *newp = (char *) newtup->t_data + newtup->t_data->t_hoff; + int oldlen = oldtup->t_len - oldtup->t_data->t_hoff; + int newlen = newtup->t_len - newtup->t_data->t_hoff; + + /* Check for common prefix between old and new tuple */ + for (prefixlen = 0; prefixlen < Min(oldlen, newlen); prefixlen++) + { + if (newp[prefixlen] != oldp[prefixlen]) + break; + } + /* + * Storing the length of the prefix takes 2 bytes, so we need to save + * at least 3 bytes or there's no point. + */ + if (prefixlen < 3) + prefixlen = 0; + + /* Same for suffix */ + for (suffixlen = 0; suffixlen < Min(oldlen, newlen) - prefixlen; suffixlen++) + { + if (newp[newlen - suffixlen - 1] != oldp[oldlen - suffixlen - 1]) + break; + } + if (suffixlen < 3) + suffixlen = 0; + } + xlrec.target.node = reln->rd_node; xlrec.target.tid = oldtup->t_self; xlrec.old_xmax = HeapTupleHeaderGetRawXmax(oldtup->t_data); @@ -6630,41 +6686,119 @@ log_heap_update(Relation reln, Buffer oldbuf, xlrec.newtid = newtup->t_self; if (new_all_visible_cleared) xlrec.flags |= XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED; + if (prefixlen > 0) + xlrec.flags |= XLOG_HEAP_PREFIX_FROM_OLD; + if (suffixlen > 0) + xlrec.flags |= XLOG_HEAP_SUFFIX_FROM_OLD; - rdata[0].data = (char *) &xlrec; - rdata[0].len = SizeOfHeapUpdate; - rdata[0].buffer = InvalidBuffer; + /* If new tuple is the single and first tuple on page... 
*/ + if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber && + PageGetMaxOffsetNumber(page) == FirstOffsetNumber) + { + info |= XLOG_HEAP_INIT_PAGE; + newbufref = InvalidBuffer; + } + else + newbufref = newbuf; + + rdata[0].data = NULL; + rdata[0].len = 0; + rdata[0].buffer = oldbuf; + rdata[0].buffer_std = true; rdata[0].next = &(rdata[1]); - rdata[1].data = NULL; - rdata[1].len = 0; - rdata[1].buffer = oldbuf; - rdata[1].buffer_std = true; + rdata[1].data = (char *) &xlrec; + rdata[1].len = SizeOfHeapUpdate; + rdata[1].buffer = InvalidBuffer; rdata[1].next = &(rdata[2]); + /* prefix and/or suffix length fields */ + if (prefixlen > 0 || suffixlen > 0) + { + if (prefixlen > 0 && suffixlen > 0) + { + prefix_suffix[0] = prefixlen; + prefix_suffix[1] = suffixlen; + rdata[2].data = (char *) &prefix_suffix; + rdata[2].len = 2 * sizeof(uint16); + } + else if (prefixlen > 0) + { + rdata[2].data = (char *) &prefixlen; + rdata[2].len = sizeof(uint16); + } + else + { + rdata[2].data = (char *) &suffixlen; + rdata[2].len = sizeof(uint16); + } + rdata[2].buffer = newbufref; + rdata[2].buffer_std = true; + rdata[2].next = &(rdata[3]); + nr = 3; + } + else + nr = 2; + xlhdr.header.t_infomask2 = newtup->t_data->t_infomask2; xlhdr.header.t_infomask = newtup->t_data->t_infomask; xlhdr.header.t_hoff = newtup->t_data->t_hoff; - xlhdr.t_len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits); + Assert(offsetof(HeapTupleHeaderData, t_bits) + prefixlen + suffixlen <= newtup->t_len); + xlhdr.t_len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits) - prefixlen - suffixlen; /* - * As with insert records, we need not store the rdata[2] segment - * if we decide to store the whole buffer instead unless we're - * doing logical decoding. + * As with insert records, we need not store this rdata segment if we + * decide to store the whole buffer instead, unless we're doing logical + * decoding. 
*/ - rdata[2].data = (char *) &xlhdr; - rdata[2].len = SizeOfHeapHeaderLen; - rdata[2].buffer = need_tuple_data ? InvalidBuffer : newbuf; - rdata[2].buffer_std = true; - rdata[2].next = &(rdata[3]); + rdata[nr].data = (char *) &xlhdr; + rdata[nr].len = SizeOfHeapHeaderLen; + rdata[nr].buffer = need_tuple_data ? InvalidBuffer : newbufref; + rdata[nr].buffer_std = true; + rdata[nr].next = &(rdata[nr + 1]); + nr++; - /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */ - rdata[3].data = (char *) newtup->t_data - + offsetof(HeapTupleHeaderData, t_bits); - rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits); - rdata[3].buffer = need_tuple_data ? InvalidBuffer : newbuf; - rdata[3].buffer_std = true; - rdata[3].next = NULL; + /* + * PG73FORMAT: write bitmap [+ padding] [+ oid] + data + * + * The 'data' doesn't include the common prefix or suffix. + */ + if (prefixlen == 0) + { + rdata[nr].data = ((char *) newtup->t_data) + offsetof(HeapTupleHeaderData, t_bits); + rdata[nr].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits) - suffixlen; + rdata[nr].buffer = need_tuple_data ? InvalidBuffer : newbufref; + rdata[nr].buffer_std = true; + rdata[nr].next = NULL; + nr++; + } + else + { + /* + * Have to write the null bitmap and data after the common prefix as + * two separate rdata entries. + */ + /* bitmap [+ padding] [+ oid] */ + if (newtup->t_data->t_hoff - offsetof(HeapTupleHeaderData, t_bits) > 0) + { + rdata[nr - 1].next = &(rdata[nr]); + rdata[nr].data = ((char *) newtup->t_data) + offsetof(HeapTupleHeaderData, t_bits); + rdata[nr].len = newtup->t_data->t_hoff - offsetof(HeapTupleHeaderData, t_bits); + rdata[nr].buffer = need_tuple_data ? 
InvalidBuffer : newbufref; + rdata[nr].buffer_std = true; + rdata[nr].next = NULL; + nr++; + } + + /* data after common prefix */ + rdata[nr - 1].next = &(rdata[nr]); + rdata[nr].data = ((char *) newtup->t_data) + newtup->t_data->t_hoff + prefixlen; + rdata[nr].len = newtup->t_len - newtup->t_data->t_hoff - prefixlen - suffixlen; + rdata[nr].buffer = need_tuple_data ? InvalidBuffer : newbufref; + rdata[nr].buffer_std = true; + rdata[nr].next = NULL; + nr++; + } /* * Separate storage for the FPW buffer reference of the new page in the @@ -6672,13 +6806,15 @@ log_heap_update(Relation reln, Buffer oldbuf, */ if (need_tuple_data) { - rdata[3].next = &(rdata[4]); + rdata[nr - 1].next = &(rdata[nr]); + + rdata[nr].data = NULL, + rdata[nr].len = 0; + rdata[nr].buffer = newbufref; + rdata[nr].buffer_std = true; + rdata[nr].next = NULL; + nr++; - rdata[4].data = NULL, - rdata[4].len = 0; - rdata[4].buffer = newbuf; - rdata[4].buffer_std = true; - rdata[4].next = NULL; xlrec.flags |= XLOG_HEAP_CONTAINS_NEW_TUPLE; /* We need to log a tuple identity */ @@ -6690,19 +6826,21 @@ log_heap_update(Relation reln, Buffer oldbuf, xlhdr_idx.header.t_hoff = old_key_tuple->t_data->t_hoff; xlhdr_idx.t_len = old_key_tuple->t_len; - rdata[4].next = &(rdata[5]); - rdata[5].data = (char *) &xlhdr_idx; - rdata[5].len = SizeOfHeapHeaderLen; - rdata[5].buffer = InvalidBuffer; - rdata[5].next = &(rdata[6]); + rdata[nr - 1].next = &(rdata[nr]); + rdata[nr].data = (char *) &xlhdr_idx; + rdata[nr].len = SizeOfHeapHeaderLen; + rdata[nr].buffer = InvalidBuffer; + rdata[nr].next = &(rdata[nr + 1]); + nr++; /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */ - rdata[6].data = (char *) old_key_tuple->t_data + rdata[nr].data = (char *) old_key_tuple->t_data + offsetof(HeapTupleHeaderData, t_bits); - rdata[6].len = old_key_tuple->t_len + rdata[nr].len = old_key_tuple->t_len - offsetof(HeapTupleHeaderData, t_bits); - rdata[6].buffer = InvalidBuffer; - rdata[6].next = NULL; + rdata[nr].buffer = 
InvalidBuffer; + rdata[nr].next = NULL; + nr++; if (reln->rd_rel->relreplident == REPLICA_IDENTITY_FULL) xlrec.flags |= XLOG_HEAP_CONTAINS_OLD_TUPLE; @@ -6711,19 +6849,6 @@ log_heap_update(Relation reln, Buffer oldbuf, } } - /* If new tuple is the single and first tuple on page... */ - if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber && - PageGetMaxOffsetNumber(page) == FirstOffsetNumber) - { - XLogRecData *rcur = &rdata[2]; - info |= XLOG_HEAP_INIT_PAGE; - while (rcur != NULL) - { - rcur->buffer = InvalidBuffer; - rcur = rcur->next; - } - } - recptr = XLogInsert(RM_HEAP_ID, info, rdata); return recptr; @@ -7750,17 +7875,25 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) Page page; OffsetNumber offnum; ItemId lp = NULL; + HeapTupleData oldtup; HeapTupleHeader htup; + char *recdata; + uint16 prefixlen = 0, + suffixlen = 0; + char *newp; struct { HeapTupleHeaderData hdr; char data[MaxHeapTupleSize]; } tbuf; xl_heap_header_len xlhdr; - int hsize; uint32 newlen; Size freespace; + /* initialize to keep the compiler quiet */ + oldtup.t_data = NULL; + oldtup.t_len = 0; + /* * The visibility map may need to be fixed even if the heap page is * already up-to-date. 
@@ -7827,6 +7960,9 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) htup = (HeapTupleHeader) PageGetItem(page, lp); + oldtup.t_data = htup; + oldtup.t_len = ItemIdGetLength(lp); + htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED); htup->t_infomask2 &= ~HEAP_KEYS_UPDATED; if (hot_update) @@ -7925,20 +8061,63 @@ newsame:; if (PageGetMaxOffsetNumber(page) + 1 < offnum) elog(PANIC, "heap_update_redo: invalid max offset number"); - hsize = SizeOfHeapUpdate + SizeOfHeapHeaderLen; + recdata = (char *) xlrec + SizeOfHeapUpdate; - memcpy((char *) &xlhdr, - (char *) xlrec + SizeOfHeapUpdate, - SizeOfHeapHeaderLen); - newlen = xlhdr.t_len; - Assert(newlen <= MaxHeapTupleSize); + if (xlrec->flags & XLOG_HEAP_PREFIX_FROM_OLD) + { + memcpy(&prefixlen, recdata, sizeof(uint16)); + recdata += sizeof(uint16); + } + if (xlrec->flags & XLOG_HEAP_SUFFIX_FROM_OLD) + { + memcpy(&suffixlen, recdata, sizeof(uint16)); + recdata += sizeof(uint16); + } + + memcpy((char *) &xlhdr, recdata, SizeOfHeapHeaderLen); + recdata += SizeOfHeapHeaderLen; + + Assert(xlhdr.t_len + prefixlen + suffixlen <= MaxHeapTupleSize); htup = &tbuf.hdr; MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData)); - /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */ - memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits), - (char *) xlrec + hsize, - newlen); - newlen += offsetof(HeapTupleHeaderData, t_bits); + + /* + * Reconstruct the new tuple using the prefix and/or suffix from the old + * tuple, and the data stored in the WAL record. 
+ */ + newp = (char *) htup + offsetof(HeapTupleHeaderData, t_bits); + if (prefixlen > 0) + { + int len; + + /* copy bitmap [+ padding] [+ oid] from WAL record */ + len = xlhdr.header.t_hoff - offsetof(HeapTupleHeaderData, t_bits); + memcpy(newp, recdata, len); + recdata += len; + newp += len; + + /* copy prefix from old tuple */ + memcpy(newp, (char *) oldtup.t_data + oldtup.t_data->t_hoff, prefixlen); + newp += prefixlen; + + /* copy new tuple data from WAL record */ + len = xlhdr.t_len - (xlhdr.header.t_hoff - offsetof(HeapTupleHeaderData, t_bits)); + memcpy(newp, recdata, len); + recdata += len; + newp += len; + } + else + { + /* copy bitmap [+ padding] [+ oid] + data from record, all in one go */ + memcpy(newp, recdata, xlhdr.t_len); + recdata += xlhdr.t_len; + newp += xlhdr.t_len; + } + /* copy suffix from old tuple */ + if (suffixlen > 0) + memcpy(newp, (char *) oldtup.t_data + oldtup.t_len - suffixlen, suffixlen); + + newlen = offsetof(HeapTupleHeaderData, t_bits) + xlhdr.t_len + prefixlen + suffixlen; htup->t_infomask2 = xlhdr.header.t_infomask2; htup->t_infomask = xlhdr.header.t_infomask; htup->t_hoff = xlhdr.header.t_hoff; diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index cdbe305f95..141edf4327 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -2335,6 +2335,29 @@ XLogRecPtrToBytePos(XLogRecPtr ptr) return result; } +/* + * Determine whether the buffer referenced has to be backed up. + * + * Since we don't yet have the insert lock, fullPageWrites and forcePageWrites + * could change later, so the result should be used for optimization purposes + * only. 
+ */ +bool +XLogCheckBufferNeedsBackup(Buffer buffer) +{ + bool doPageWrites; + Page page; + + page = BufferGetPage(buffer); + + doPageWrites = XLogCtl->Insert.fullPageWrites || XLogCtl->Insert.forcePageWrites; + + if (doPageWrites && PageGetLSN(page) <= RedoRecPtr) + return true; /* buffer requires backup */ + + return false; /* buffer does not need to be backed up */ +} + /* * Determine whether the buffer referenced by an XLogRecData item has to * be backed up, and if so fill a BkpBlock struct for it. In any case diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h index 194635952c..d6bc8f7f24 100644 --- a/src/include/access/heapam_xlog.h +++ b/src/include/access/heapam_xlog.h @@ -67,6 +67,8 @@ #define XLOG_HEAP_CONTAINS_OLD_TUPLE (1<<2) #define XLOG_HEAP_CONTAINS_OLD_KEY (1<<3) #define XLOG_HEAP_CONTAINS_NEW_TUPLE (1<<4) +#define XLOG_HEAP_PREFIX_FROM_OLD (1<<5) +#define XLOG_HEAP_SUFFIX_FROM_OLD (1<<6) /* convenience macro for checking whether any form of old tuple was logged */ #define XLOG_HEAP_CONTAINS_OLD \ @@ -179,7 +181,22 @@ typedef struct xl_heap_update ItemPointerData newtid; /* new inserted tuple id */ uint8 old_infobits_set; /* infomask bits to set on old tuple */ uint8 flags; - /* NEW TUPLE xl_heap_header AND TUPLE DATA FOLLOWS AT END OF STRUCT */ + + /* + * If XLOG_HEAP_PREFIX_FROM_OLD or XLOG_HEAP_SUFFIX_FROM_OLD flags are + * set, the prefix and/or suffix lengths come next, as one or two uint16s. + * + * After that, xl_heap_header_len and new tuple data follow. The new + * tuple data and length don't include the prefix and suffix, which are + * copied from the old tuple on replay. The new tuple data is omitted if + * a full-page image of the page was taken (unless the + * XLOG_HEAP_CONTAINS_NEW_TUPLE flag is set, in which case it's included + * anyway). + * + * If XLOG_HEAP_CONTAINS_OLD_TUPLE or XLOG_HEAP_CONTAINS_OLD_KEY flags are + * set, another xl_heap_header_len struct and tuple data for the old tuple + * follow. 
+ */ } xl_heap_update; #define SizeOfHeapUpdate (offsetof(xl_heap_update, flags) + sizeof(uint8)) diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index a238292b76..3509228466 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -279,6 +279,7 @@ typedef struct CheckpointStatsData extern CheckpointStatsData CheckpointStats; extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata); +extern bool XLogCheckBufferNeedsBackup(Buffer buffer); extern void XLogFlush(XLogRecPtr RecPtr); extern bool XLogBackgroundFlush(void); extern bool XLogNeedsFlush(XLogRecPtr RecPtr);