mirror of https://github.com/postgres/postgres
Introduce a new smgr bulk loading facility.
The new facility makes it easier to optimize bulk loading, as the logic
for buffering, WAL-logging, and syncing the relation only needs to be
implemented once. It's also less error-prone: we have had a number of
bugs in how a relation is fsync'd - or not - at the end of a bulk
loading operation. By centralizing that logic to one place, we only
need to write it correctly once.

The new facility is faster for small relations: instead of calling
smgrimmedsync(), we register the fsync to happen at the next
checkpoint, which avoids the fsync latency. That can make a big
difference if you are e.g. restoring a schema-only dump with lots of
relations.

It is also slightly more efficient with large relations, as the WAL
logging is performed multiple pages at a time. That avoids some WAL
header overhead. The sorted GiST index build did that already; this
moves the buffering to the new facility.

The changes to the pageinspect GiST test need an explanation: before
this patch, the sorted GiST index build set the LSN on every page to
the special GistBuildLSN value, not the LSN of the WAL record, even
though the pages were WAL-logged. There was no particular need for it;
it just happened naturally when we wrote out the pages before
WAL-logging them. Now we WAL-log the pages first, like in the B-tree
build, so the pages are stamped with the record's real LSN. When the
build is not WAL-logged, we still use GistBuildLSN. To make the test
output predictable, use an unlogged index.

Reviewed-by: Andres Freund
Discussion: https://www.postgresql.org/message-id/30e8f366-58b3-b239-c521-422122dd5150%40iki.fi
parent e612384fc7
commit 8af2565248
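In outline, every caller converted in this commit follows the same
start / get-buffer / write / finish cycle. The following is a minimal
usage sketch assembled from the calls visible in the diff below; the
populate_rel helper, its rel argument, and the two-page loop are
illustrative placeholders, not code from the patch:

/*
 * Minimal usage sketch (not part of the commit): populate a new,
 * exclusively-locked relation with two standard pages.
 */
#include "postgres.h"

#include "storage/bufpage.h"
#include "storage/bulk_write.h"
#include "utils/rel.h"

static void
populate_rel(Relation rel)		/* hypothetical helper */
{
	BulkWriteState *bulkstate;

	/* One writer per relation fork; WAL-logging is decided here. */
	bulkstate = smgr_bulk_start_rel(rel, MAIN_FORKNUM);

	for (BlockNumber blkno = 0; blkno < 2; blkno++)
	{
		BulkWriteBuffer buf;

		/* Get a buffer from the bulk writer and fill it in. */
		buf = smgr_bulk_get_buf(bulkstate);
		PageInit((Page) buf, BLCKSZ, 0);

		/*
		 * Hand the page back; the writer takes ownership, WAL-logs the
		 * pages in batches, and writes them out with skipFsync=true.
		 */
		smgr_bulk_write(bulkstate, blkno, buf, true);
	}

	/* Flush pending writes; fsync now or at the next checkpoint. */
	smgr_bulk_finish(bulkstate);
}

smgr_bulk_write() batches up to XLR_MAX_BLOCK_ID pages per WAL record,
and smgr_bulk_finish() either registers the sync with the checkpointer
or, if a checkpoint intervened, falls back to smgrimmedsync() itself.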
--- a/contrib/pageinspect/expected/gist.out
+++ b/contrib/pageinspect/expected/gist.out
@@ -1,13 +1,6 @@
--- The gist_page_opaque_info() function prints the page's LSN. Normally,
--- that's constant 1 (GistBuildLSN) on every page of a freshly built GiST
--- index. But with wal_level=minimal, the whole relation is dumped to WAL at
--- the end of the transaction if it's smaller than wal_skip_threshold, which
--- updates the LSNs. Wrap the tests on gist_page_opaque_info() in the
--- same transaction with the CREATE INDEX so that we see the LSNs before
--- they are possibly overwritten at end of transaction.
-BEGIN;
--- Create a test table and GiST index.
-CREATE TABLE test_gist AS SELECT point(i,i) p, i::text t FROM
+-- The gist_page_opaque_info() function prints the page's LSN.
+-- Use an unlogged index, so that the LSN is predictable.
+CREATE UNLOGGED TABLE test_gist AS SELECT point(i,i) p, i::text t FROM
 generate_series(1,1000) i;
 CREATE INDEX test_gist_idx ON test_gist USING gist (p);
 -- Page 0 is the root, the rest are leaf pages
@@ -29,7 +22,6 @@ SELECT * FROM gist_page_opaque_info(get_raw_page('test_gist_idx', 2));
 0/1 | 0/0 | 1 | {leaf}
 (1 row)

-COMMIT;
 SELECT * FROM gist_page_items(get_raw_page('test_gist_idx', 0), 'test_gist_idx');
  itemoffset | ctid | itemlen | dead | keys
 ------------+-----------+---------+------+-------------------------------
--- a/contrib/pageinspect/sql/gist.sql
+++ b/contrib/pageinspect/sql/gist.sql
@@ -1,14 +1,6 @@
--- The gist_page_opaque_info() function prints the page's LSN. Normally,
--- that's constant 1 (GistBuildLSN) on every page of a freshly built GiST
--- index. But with wal_level=minimal, the whole relation is dumped to WAL at
--- the end of the transaction if it's smaller than wal_skip_threshold, which
--- updates the LSNs. Wrap the tests on gist_page_opaque_info() in the
--- same transaction with the CREATE INDEX so that we see the LSNs before
--- they are possibly overwritten at end of transaction.
-BEGIN;
-
--- Create a test table and GiST index.
-CREATE TABLE test_gist AS SELECT point(i,i) p, i::text t FROM
+-- The gist_page_opaque_info() function prints the page's LSN.
+-- Use an unlogged index, so that the LSN is predictable.
+CREATE UNLOGGED TABLE test_gist AS SELECT point(i,i) p, i::text t FROM
 generate_series(1,1000) i;
 CREATE INDEX test_gist_idx ON test_gist USING gist (p);

@@ -17,8 +9,6 @@ SELECT * FROM gist_page_opaque_info(get_raw_page('test_gist_idx', 0));
 SELECT * FROM gist_page_opaque_info(get_raw_page('test_gist_idx', 1));
 SELECT * FROM gist_page_opaque_info(get_raw_page('test_gist_idx', 2));

-COMMIT;
-
 SELECT * FROM gist_page_items(get_raw_page('test_gist_idx', 0), 'test_gist_idx');
 SELECT * FROM gist_page_items(get_raw_page('test_gist_idx', 1), 'test_gist_idx') LIMIT 5;

--- a/src/backend/access/gist/gistbuild.c
+++ b/src/backend/access/gist/gistbuild.c
@@ -43,7 +43,8 @@
 #include "miscadmin.h"
 #include "optimizer/optimizer.h"
 #include "storage/bufmgr.h"
-#include "storage/smgr.h"
+#include "storage/bulk_write.h"
+
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/tuplesort.h"
@@ -106,11 +107,8 @@ typedef struct
    Tuplesortstate *sortstate;  /* state data for tuplesort.c */

    BlockNumber pages_allocated;
-   BlockNumber pages_written;

-   int         ready_num_pages;
-   BlockNumber ready_blknos[XLR_MAX_BLOCK_ID];
-   Page        ready_pages[XLR_MAX_BLOCK_ID];
+   BulkWriteState *bulkstate;
 } GISTBuildState;

 #define GIST_SORTED_BUILD_PAGE_NUM 4
@@ -142,7 +140,6 @@ static void gist_indexsortbuild_levelstate_add(GISTBuildState *state,
                                               IndexTuple itup);
 static void gist_indexsortbuild_levelstate_flush(GISTBuildState *state,
                                                  GistSortedBuildLevelState *levelstate);
-static void gist_indexsortbuild_flush_ready_pages(GISTBuildState *state);

 static void gistInitBuffering(GISTBuildState *buildstate);
 static int calculatePagesPerBuffer(GISTBuildState *buildstate, int levelStep);
@@ -405,27 +402,18 @@ gist_indexsortbuild(GISTBuildState *state)
 {
    IndexTuple  itup;
    GistSortedBuildLevelState *levelstate;
-   Page        page;
+   BulkWriteBuffer rootbuf;

-   state->pages_allocated = 0;
-   state->pages_written = 0;
-   state->ready_num_pages = 0;
+   /* Reserve block 0 for the root page */
+   state->pages_allocated = 1;

-   /*
-    * Write an empty page as a placeholder for the root page. It will be
-    * replaced with the real root page at the end.
-    */
-   page = palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, MCXT_ALLOC_ZERO);
-   smgrextend(RelationGetSmgr(state->indexrel), MAIN_FORKNUM, GIST_ROOT_BLKNO,
-              page, true);
-   state->pages_allocated++;
-   state->pages_written++;
+   state->bulkstate = smgr_bulk_start_rel(state->indexrel, MAIN_FORKNUM);

    /* Allocate a temporary buffer for the first leaf page batch. */
    levelstate = palloc0(sizeof(GistSortedBuildLevelState));
-   levelstate->pages[0] = page;
+   levelstate->pages[0] = palloc(BLCKSZ);
    levelstate->parent = NULL;
-   gistinitpage(page, F_LEAF);
+   gistinitpage(levelstate->pages[0], F_LEAF);

    /*
     * Fill index pages with tuples in the sorted order.
@@ -455,31 +443,15 @@ gist_indexsortbuild(GISTBuildState *state)
        levelstate = parent;
    }

-   gist_indexsortbuild_flush_ready_pages(state);
-
    /* Write out the root */
    PageSetLSN(levelstate->pages[0], GistBuildLSN);
-   PageSetChecksumInplace(levelstate->pages[0], GIST_ROOT_BLKNO);
-   smgrwrite(RelationGetSmgr(state->indexrel), MAIN_FORKNUM, GIST_ROOT_BLKNO,
-             levelstate->pages[0], true);
-   if (RelationNeedsWAL(state->indexrel))
-       log_newpage(&state->indexrel->rd_locator, MAIN_FORKNUM, GIST_ROOT_BLKNO,
-                   levelstate->pages[0], true);
+   rootbuf = smgr_bulk_get_buf(state->bulkstate);
+   memcpy(rootbuf, levelstate->pages[0], BLCKSZ);
+   smgr_bulk_write(state->bulkstate, GIST_ROOT_BLKNO, rootbuf, true);

-   pfree(levelstate->pages[0]);
    pfree(levelstate);

-   /*
-    * When we WAL-logged index pages, we must nonetheless fsync index files.
-    * Since we're building outside shared buffers, a CHECKPOINT occurring
-    * during the build has no way to flush the previously written data to
-    * disk (indeed it won't know the index even exists). A crash later on
-    * would replay WAL from the checkpoint, therefore it wouldn't replay our
-    * earlier WAL entries. If we do not fsync those pages here, they might
-    * still not be on disk when the crash occurs.
-    */
-   if (RelationNeedsWAL(state->indexrel))
-       smgrimmedsync(RelationGetSmgr(state->indexrel), MAIN_FORKNUM);
+   smgr_bulk_finish(state->bulkstate);
 }

 /*
@@ -509,8 +481,7 @@ gist_indexsortbuild_levelstate_add(GISTBuildState *state,
        levelstate->current_page++;

        if (levelstate->pages[levelstate->current_page] == NULL)
-           levelstate->pages[levelstate->current_page] =
-               palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
+           levelstate->pages[levelstate->current_page] = palloc0(BLCKSZ);

        newPage = levelstate->pages[levelstate->current_page];
        gistinitpage(newPage, old_page_flags);
@@ -573,6 +544,7 @@ gist_indexsortbuild_levelstate_flush(GISTBuildState *state,
    for (; dist != NULL; dist = dist->next)
    {
        char       *data;
+       BulkWriteBuffer buf;
        Page        target;

        /* check once per page */
@@ -580,7 +552,8 @@ gist_indexsortbuild_levelstate_flush(GISTBuildState *state,

        /* Create page and copy data */
        data = (char *) (dist->list);
-       target = palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, MCXT_ALLOC_ZERO);
+       buf = smgr_bulk_get_buf(state->bulkstate);
+       target = (Page) buf;
        gistinitpage(target, isleaf ? F_LEAF : 0);
        for (int i = 0; i < dist->block.num; i++)
        {
@@ -593,20 +566,6 @@ gist_indexsortbuild_levelstate_flush(GISTBuildState *state,
        }
        union_tuple = dist->itup;

-       if (state->ready_num_pages == XLR_MAX_BLOCK_ID)
-           gist_indexsortbuild_flush_ready_pages(state);
-
-       /*
-        * The page is now complete. Assign a block number to it, and add it
-        * to the list of finished pages. (We don't write it out immediately,
-        * because we want to WAL-log the pages in batches.)
-        */
-       blkno = state->pages_allocated++;
-       state->ready_blknos[state->ready_num_pages] = blkno;
-       state->ready_pages[state->ready_num_pages] = target;
-       state->ready_num_pages++;
-       ItemPointerSetBlockNumber(&(union_tuple->t_tid), blkno);
-
        /*
        * Set the right link to point to the previous page. This is just for
        * debugging purposes: GiST only follows the right link if a page is
@@ -621,6 +580,15 @@ gist_indexsortbuild_levelstate_flush(GISTBuildState *state,
        */
        if (levelstate->last_blkno)
            GistPageGetOpaque(target)->rightlink = levelstate->last_blkno;
+
+       /*
+        * The page is now complete. Assign a block number to it, and pass it
+        * to the bulk writer.
+        */
+       blkno = state->pages_allocated++;
+       PageSetLSN(target, GistBuildLSN);
+       smgr_bulk_write(state->bulkstate, blkno, buf, true);
+       ItemPointerSetBlockNumber(&(union_tuple->t_tid), blkno);
        levelstate->last_blkno = blkno;

        /*
@@ -631,7 +599,7 @@ gist_indexsortbuild_levelstate_flush(GISTBuildState *state,
    if (parent == NULL)
    {
        parent = palloc0(sizeof(GistSortedBuildLevelState));
-       parent->pages[0] = (Page) palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
+       parent->pages[0] = palloc(BLCKSZ);
        parent->parent = NULL;
        gistinitpage(parent->pages[0], 0);

@@ -641,39 +609,6 @@ gist_indexsortbuild_levelstate_flush(GISTBuildState *state,
    }
 }

-static void
-gist_indexsortbuild_flush_ready_pages(GISTBuildState *state)
-{
-   if (state->ready_num_pages == 0)
-       return;
-
-   for (int i = 0; i < state->ready_num_pages; i++)
-   {
-       Page        page = state->ready_pages[i];
-       BlockNumber blkno = state->ready_blknos[i];
-
-       /* Currently, the blocks must be buffered in order. */
-       if (blkno != state->pages_written)
-           elog(ERROR, "unexpected block number to flush GiST sorting build");
-
-       PageSetLSN(page, GistBuildLSN);
-       PageSetChecksumInplace(page, blkno);
-       smgrextend(RelationGetSmgr(state->indexrel), MAIN_FORKNUM, blkno, page,
-                  true);
-
-       state->pages_written++;
-   }
-
-   if (RelationNeedsWAL(state->indexrel))
-       log_newpages(&state->indexrel->rd_locator, MAIN_FORKNUM, state->ready_num_pages,
-                    state->ready_blknos, state->ready_pages, true);
-
-   for (int i = 0; i < state->ready_num_pages; i++)
-       pfree(state->ready_pages[i]);
-
-   state->ready_num_pages = 0;
-}
-

 /*-------------------------------------------------------------------------
  * Routines for non-sorted build
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -87,8 +87,8 @@
 * is optimized for bulk inserting a lot of tuples, knowing that we have
 * exclusive access to the heap. raw_heap_insert builds new pages in
 * local storage. When a page is full, or at the end of the process,
- * we insert it to WAL as a single record and then write it to disk
- * directly through smgr. Note, however, that any data sent to the new
+ * we insert it to WAL as a single record and then write it to disk with
+ * the bulk smgr writer. Note, however, that any data sent to the new
 * heap's TOAST table will go through the normal bufmgr.
 *
 *
@@ -119,9 +119,9 @@
 #include "replication/logical.h"
 #include "replication/slot.h"
 #include "storage/bufmgr.h"
+#include "storage/bulk_write.h"
 #include "storage/fd.h"
 #include "storage/procarray.h"
-#include "storage/smgr.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"

@@ -133,9 +133,9 @@ typedef struct RewriteStateData
 {
    Relation    rs_old_rel;     /* source heap */
    Relation    rs_new_rel;     /* destination heap */
-   Page        rs_buffer;      /* page currently being built */
+   BulkWriteState *rs_bulkstate;   /* writer for the destination */
+   BulkWriteBuffer rs_buffer;  /* page currently being built */
    BlockNumber rs_blockno;     /* block where page will go */
-   bool        rs_buffer_valid;    /* T if any tuples in buffer */
    bool        rs_logical_rewrite; /* do we need to do logical rewriting */
    TransactionId rs_oldest_xmin;   /* oldest xmin used by caller to determine
                                     * tuple visibility */
@@ -255,14 +255,14 @@ begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xm

    state->rs_old_rel = old_heap;
    state->rs_new_rel = new_heap;
-   state->rs_buffer = (Page) palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
+   state->rs_buffer = NULL;
    /* new_heap needn't be empty, just locked */
    state->rs_blockno = RelationGetNumberOfBlocks(new_heap);
-   state->rs_buffer_valid = false;
    state->rs_oldest_xmin = oldest_xmin;
    state->rs_freeze_xid = freeze_xid;
    state->rs_cutoff_multi = cutoff_multi;
    state->rs_cxt = rw_cxt;
+   state->rs_bulkstate = smgr_bulk_start_rel(new_heap, MAIN_FORKNUM);

    /* Initialize hash tables used to track update chains */
    hash_ctl.keysize = sizeof(TidHashKey);
@@ -314,30 +314,13 @@ end_heap_rewrite(RewriteState state)
    }

    /* Write the last page, if any */
-   if (state->rs_buffer_valid)
+   if (state->rs_buffer)
    {
-       if (RelationNeedsWAL(state->rs_new_rel))
-           log_newpage(&state->rs_new_rel->rd_locator,
-                       MAIN_FORKNUM,
-                       state->rs_blockno,
-                       state->rs_buffer,
-                       true);
-
-       PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);
-
-       smgrextend(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM,
-                  state->rs_blockno, state->rs_buffer, true);
+       smgr_bulk_write(state->rs_bulkstate, state->rs_blockno, state->rs_buffer, true);
+       state->rs_buffer = NULL;
    }

-   /*
-    * When we WAL-logged rel pages, we must nonetheless fsync them. The
-    * reason is the same as in storage.c's RelationCopyStorage(): we're
-    * writing data that's not in shared buffers, and so a CHECKPOINT
-    * occurring during the rewriteheap operation won't have fsync'd data we
-    * wrote before the checkpoint.
-    */
-   if (RelationNeedsWAL(state->rs_new_rel))
-       smgrimmedsync(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM);
+   smgr_bulk_finish(state->rs_bulkstate);

    logical_end_heap_rewrite(state);

@@ -611,7 +594,7 @@ rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple)
 static void
 raw_heap_insert(RewriteState state, HeapTuple tup)
 {
-   Page        page = state->rs_buffer;
+   Page        page;
    Size        pageFreeSpace,
                saveFreeSpace;
    Size        len;
@@ -664,7 +647,8 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
                                                           HEAP_DEFAULT_FILLFACTOR);

    /* Now we can check to see if there's enough free space already. */
-   if (state->rs_buffer_valid)
+   page = (Page) state->rs_buffer;
+   if (page)
    {
        pageFreeSpace = PageGetHeapFreeSpace(page);

@@ -675,35 +659,19 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
            * contains a tuple. Hence, unlike RelationGetBufferForTuple(),
            * enforce saveFreeSpace unconditionally.
            */
-
-           /* XLOG stuff */
-           if (RelationNeedsWAL(state->rs_new_rel))
-               log_newpage(&state->rs_new_rel->rd_locator,
-                           MAIN_FORKNUM,
-                           state->rs_blockno,
-                           page,
-                           true);
-
-           /*
-            * Now write the page. We say skipFsync = true because there's no
-            * need for smgr to schedule an fsync for this write; we'll do it
-            * ourselves in end_heap_rewrite.
-            */
-           PageSetChecksumInplace(page, state->rs_blockno);
-
-           smgrextend(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM,
-                      state->rs_blockno, page, true);
+           smgr_bulk_write(state->rs_bulkstate, state->rs_blockno, state->rs_buffer, true);
+           state->rs_buffer = NULL;
+           page = NULL;

            state->rs_blockno++;
-           state->rs_buffer_valid = false;
        }
    }

-   if (!state->rs_buffer_valid)
+   if (!page)
    {
        /* Initialize a new empty page */
+       state->rs_buffer = smgr_bulk_get_buf(state->rs_bulkstate);
+       page = (Page) state->rs_buffer;
        PageInit(page, BLCKSZ, 0);
-       state->rs_buffer_valid = true;
    }

    /* And now we can insert the tuple into the page */
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -29,11 +29,11 @@
 #include "nodes/execnodes.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
+#include "storage/bulk_write.h"
 #include "storage/condition_variable.h"
 #include "storage/indexfsm.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
-#include "storage/smgr.h"
 #include "utils/builtins.h"
 #include "utils/index_selfuncs.h"
 #include "utils/memutils.h"
@@ -154,32 +154,17 @@ void
 btbuildempty(Relation index)
 {
    bool        allequalimage = _bt_allequalimage(index, false);
-   Buffer      metabuf;
-   Page        metapage;
+   BulkWriteState *bulkstate;
+   BulkWriteBuffer metabuf;

-   /*
-    * Initialize the metapage.
-    *
-    * Regular index build bypasses the buffer manager and uses smgr functions
-    * directly, with an smgrimmedsync() call at the end. That makes sense
-    * when the index is large, but for an empty index, it's better to use the
-    * buffer cache to avoid the smgrimmedsync().
-    */
-   metabuf = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
-   Assert(BufferGetBlockNumber(metabuf) == BTREE_METAPAGE);
-   _bt_lockbuf(index, metabuf, BT_WRITE);
-
-   START_CRIT_SECTION();
+   bulkstate = smgr_bulk_start_rel(index, INIT_FORKNUM);

-   metapage = BufferGetPage(metabuf);
-   _bt_initmetapage(metapage, P_NONE, 0, allequalimage);
-   MarkBufferDirty(metabuf);
-   log_newpage_buffer(metabuf, true);
-
-   END_CRIT_SECTION();
-
-   _bt_unlockbuf(index, metabuf);
-   ReleaseBuffer(metabuf);
+   /* Construct metapage. */
+   metabuf = smgr_bulk_get_buf(bulkstate);
+   _bt_initmetapage((Page) metabuf, P_NONE, 0, allequalimage);
+   smgr_bulk_write(bulkstate, BTREE_METAPAGE, metabuf, true);
+
+   smgr_bulk_finish(bulkstate);
 }

 /*
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -23,13 +23,8 @@
 * many upper pages if the keys are reasonable-size) without risking a lot of
 * cascading splits during early insertions.
 *
- * Formerly the index pages being built were kept in shared buffers, but
- * that is of no value (since other backends have no interest in them yet)
- * and it created locking problems for CHECKPOINT, because the upper-level
- * pages were held exclusive-locked for long periods. Now we just build
- * the pages in local memory and smgrwrite or smgrextend them as we finish
- * them. They will need to be re-read into shared buffers on first use after
- * the build finishes.
+ * We use the bulk smgr loading facility to bypass the buffer cache and
+ * WAL-log the pages efficiently.
 *
 * This code isn't concerned about the FSM at all. The caller is responsible
 * for initializing that.
@@ -57,7 +52,7 @@
 #include "executor/instrument.h"
 #include "miscadmin.h"
 #include "pgstat.h"
-#include "storage/smgr.h"
+#include "storage/bulk_write.h"
 #include "tcop/tcopprot.h"     /* pgrminclude ignore */
 #include "utils/rel.h"
 #include "utils/sortsupport.h"
@@ -234,7 +229,7 @@ typedef struct BTBuildState
 */
 typedef struct BTPageState
 {
-   Page        btps_page;      /* workspace for page building */
+   BulkWriteBuffer btps_buf;   /* workspace for page building */
    BlockNumber btps_blkno;     /* block # to write this page at */
    IndexTuple  btps_lowkey;    /* page's strict lower bound pivot tuple */
    OffsetNumber btps_lastoff;  /* last item offset loaded */
@@ -251,11 +246,9 @@ typedef struct BTWriteState
 {
    Relation    heap;
    Relation    index;
+   BulkWriteState *bulkstate;
    BTScanInsert inskey;        /* generic insertion scankey */
-   bool        btws_use_wal;   /* dump pages to WAL? */
    BlockNumber btws_pages_alloced; /* # pages allocated */
-   BlockNumber btws_pages_written; /* # pages written out */
-   Page        btws_zeropage;  /* workspace for filling zeroes */
 } BTWriteState;


@@ -267,7 +260,7 @@ static void _bt_spool(BTSpool *btspool, ItemPointer self,
 static void _bt_leafbuild(BTSpool *btspool, BTSpool *btspool2);
 static void _bt_build_callback(Relation index, ItemPointer tid, Datum *values,
                               bool *isnull, bool tupleIsAlive, void *state);
-static Page _bt_blnewpage(uint32 level);
+static BulkWriteBuffer _bt_blnewpage(BTWriteState *wstate, uint32 level);
 static BTPageState *_bt_pagestate(BTWriteState *wstate, uint32 level);
 static void _bt_slideleft(Page rightmostpage);
 static void _bt_sortaddtup(Page page, Size itemsize,
@@ -569,12 +562,9 @@ _bt_leafbuild(BTSpool *btspool, BTSpool *btspool2)
    wstate.inskey = _bt_mkscankey(wstate.index, NULL);
    /* _bt_mkscankey() won't set allequalimage without metapage */
    wstate.inskey->allequalimage = _bt_allequalimage(wstate.index, true);
-   wstate.btws_use_wal = RelationNeedsWAL(wstate.index);

    /* reserve the metapage */
    wstate.btws_pages_alloced = BTREE_METAPAGE + 1;
-   wstate.btws_pages_written = 0;
-   wstate.btws_zeropage = NULL;    /* until needed */

    pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE,
                                 PROGRESS_BTREE_PHASE_LEAF_LOAD);
@@ -613,13 +603,15 @@ _bt_build_callback(Relation index,
 /*
 * allocate workspace for a new, clean btree page, not linked to any siblings.
 */
-static Page
-_bt_blnewpage(uint32 level)
+static BulkWriteBuffer
+_bt_blnewpage(BTWriteState *wstate, uint32 level)
 {
+   BulkWriteBuffer buf;
    Page        page;
    BTPageOpaque opaque;

-   page = (Page) palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
+   buf = smgr_bulk_get_buf(wstate->bulkstate);
+   page = (Page) buf;

    /* Zero the page and set up standard page header info */
    _bt_pageinit(page, BLCKSZ);
@@ -634,63 +626,17 @@ _bt_blnewpage(uint32 level)
    /* Make the P_HIKEY line pointer appear allocated */
    ((PageHeader) page)->pd_lower += sizeof(ItemIdData);

-   return page;
+   return buf;
 }

 /*
 * emit a completed btree page, and release the working storage.
 */
 static void
-_bt_blwritepage(BTWriteState *wstate, Page page, BlockNumber blkno)
+_bt_blwritepage(BTWriteState *wstate, BulkWriteBuffer buf, BlockNumber blkno)
 {
-   /* XLOG stuff */
-   if (wstate->btws_use_wal)
-   {
-       /* We use the XLOG_FPI record type for this */
-       log_newpage(&wstate->index->rd_locator, MAIN_FORKNUM, blkno, page, true);
-   }
-
-   /*
-    * If we have to write pages nonsequentially, fill in the space with
-    * zeroes until we come back and overwrite. This is not logically
-    * necessary on standard Unix filesystems (unwritten space will read as
-    * zeroes anyway), but it should help to avoid fragmentation. The dummy
-    * pages aren't WAL-logged though.
-    */
-   while (blkno > wstate->btws_pages_written)
-   {
-       if (!wstate->btws_zeropage)
-           wstate->btws_zeropage = (Page) palloc_aligned(BLCKSZ,
-                                                         PG_IO_ALIGN_SIZE,
-                                                         MCXT_ALLOC_ZERO);
-       /* don't set checksum for all-zero page */
-       smgrextend(RelationGetSmgr(wstate->index), MAIN_FORKNUM,
-                  wstate->btws_pages_written++,
-                  wstate->btws_zeropage,
-                  true);
-   }
-
-   PageSetChecksumInplace(page, blkno);
-
-   /*
-    * Now write the page. There's no need for smgr to schedule an fsync for
-    * this write; we'll do it ourselves before ending the build.
-    */
-   if (blkno == wstate->btws_pages_written)
-   {
-       /* extending the file... */
-       smgrextend(RelationGetSmgr(wstate->index), MAIN_FORKNUM, blkno,
-                  page, true);
-       wstate->btws_pages_written++;
-   }
-   else
-   {
-       /* overwriting a block we zero-filled before */
-       smgrwrite(RelationGetSmgr(wstate->index), MAIN_FORKNUM, blkno,
-                 page, true);
-   }
-
-   pfree(page);
+   smgr_bulk_write(wstate->bulkstate, blkno, buf, true);
+   /* smgr_bulk_write took ownership of 'buf' */
 }

 /*
@@ -703,7 +649,7 @@ _bt_pagestate(BTWriteState *wstate, uint32 level)
    BTPageState *state = (BTPageState *) palloc0(sizeof(BTPageState));

    /* create initial page for level */
-   state->btps_page = _bt_blnewpage(level);
+   state->btps_buf = _bt_blnewpage(wstate, level);

    /* and assign it a page position */
    state->btps_blkno = wstate->btws_pages_alloced++;
@@ -839,6 +785,7 @@ static void
 _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup,
             Size truncextra)
 {
+   BulkWriteBuffer nbuf;
    Page        npage;
    BlockNumber nblkno;
    OffsetNumber last_off;
@@ -853,7 +800,8 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup,
    */
    CHECK_FOR_INTERRUPTS();

-   npage = state->btps_page;
+   nbuf = state->btps_buf;
+   npage = (Page) nbuf;
    nblkno = state->btps_blkno;
    last_off = state->btps_lastoff;
    last_truncextra = state->btps_lastextra;
@@ -909,6 +857,7 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup,
        /*
        * Finish off the page and write it out.
        */
+       BulkWriteBuffer obuf = nbuf;
        Page        opage = npage;
        BlockNumber oblkno = nblkno;
        ItemId      ii;
@@ -916,7 +865,8 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup,
        IndexTuple  oitup;

        /* Create new page of same level */
-       npage = _bt_blnewpage(state->btps_level);
+       nbuf = _bt_blnewpage(wstate, state->btps_level);
+       npage = (Page) nbuf;

        /* and assign it a page position */
        nblkno = wstate->btws_pages_alloced++;
@@ -1028,10 +978,10 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup,
        }

        /*
-        * Write out the old page. We never need to touch it again, so we can
-        * free the opage workspace too.
+        * Write out the old page. _bt_blwritepage takes ownership of the
+        * 'opage' buffer.
        */
-       _bt_blwritepage(wstate, opage, oblkno);
+       _bt_blwritepage(wstate, obuf, oblkno);

        /*
        * Reset last_off to point to new page
@@ -1064,7 +1014,7 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup,
    _bt_sortaddtup(npage, itupsz, itup, last_off,
                   !isleaf && last_off == P_FIRSTKEY);

-   state->btps_page = npage;
+   state->btps_buf = nbuf;
    state->btps_blkno = nblkno;
    state->btps_lastoff = last_off;
 }
@@ -1116,7 +1066,7 @@ _bt_uppershutdown(BTWriteState *wstate, BTPageState *state)
    BTPageState *s;
    BlockNumber rootblkno = P_NONE;
    uint32      rootlevel = 0;
-   Page        metapage;
+   BulkWriteBuffer metabuf;

    /*
     * Each iteration of this loop completes one more level of the tree.
@@ -1127,7 +1077,7 @@ _bt_uppershutdown(BTWriteState *wstate, BTPageState *state)
        BTPageOpaque opaque;

        blkno = s->btps_blkno;
-       opaque = BTPageGetOpaque(s->btps_page);
+       opaque = BTPageGetOpaque((Page) s->btps_buf);

        /*
        * We have to link the last page on this level to somewhere.
@@ -1161,9 +1111,9 @@ _bt_uppershutdown(BTWriteState *wstate, BTPageState *state)
        * This is the rightmost page, so the ItemId array needs to be slid
        * back one slot. Then we can dump out the page.
        */
-       _bt_slideleft(s->btps_page);
-       _bt_blwritepage(wstate, s->btps_page, s->btps_blkno);
-       s->btps_page = NULL;    /* writepage freed the workspace */
+       _bt_slideleft((Page) s->btps_buf);
+       _bt_blwritepage(wstate, s->btps_buf, s->btps_blkno);
+       s->btps_buf = NULL;     /* writepage took ownership of the buffer */
    }

    /*
@@ -1172,10 +1122,10 @@ _bt_uppershutdown(BTWriteState *wstate, BTPageState *state)
    * set to point to "P_NONE"). This changes the index to the "valid" state
    * by filling in a valid magic number in the metapage.
    */
-   metapage = (Page) palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
-   _bt_initmetapage(metapage, rootblkno, rootlevel,
+   metabuf = smgr_bulk_get_buf(wstate->bulkstate);
+   _bt_initmetapage((Page) metabuf, rootblkno, rootlevel,
                     wstate->inskey->allequalimage);
-   _bt_blwritepage(wstate, metapage, BTREE_METAPAGE);
+   _bt_blwritepage(wstate, metabuf, BTREE_METAPAGE);
 }

 /*
@@ -1197,6 +1147,8 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
    int64       tuples_done = 0;
    bool        deduplicate;

+   wstate->bulkstate = smgr_bulk_start_rel(wstate->index, MAIN_FORKNUM);
+
    deduplicate = wstate->inskey->allequalimage && !btspool->isunique &&
        BTGetDeduplicateItems(wstate->index);

@@ -1352,7 +1304,7 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
        */
        dstate->maxpostingsize = MAXALIGN_DOWN((BLCKSZ * 10 / 100)) -
            sizeof(ItemIdData);
-       Assert(dstate->maxpostingsize <= BTMaxItemSize(state->btps_page) &&
+       Assert(dstate->maxpostingsize <= BTMaxItemSize((Page) state->btps_buf) &&
               dstate->maxpostingsize <= INDEX_SIZE_MASK);
        dstate->htids = palloc(dstate->maxpostingsize);

@@ -1422,18 +1374,7 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)

    /* Close down final pages and write the metapage */
    _bt_uppershutdown(wstate, state);
-
-   /*
-    * When we WAL-logged index pages, we must nonetheless fsync index files.
-    * Since we're building outside shared buffers, a CHECKPOINT occurring
-    * during the build has no way to flush the previously written data to
-    * disk (indeed it won't know the index even exists). A crash later on
-    * would replay WAL from the checkpoint, therefore it wouldn't replay our
-    * earlier WAL entries. If we do not fsync those pages here, they might
-    * still not be on disk when the crash occurs.
-    */
-   if (wstate->btws_use_wal)
-       smgrimmedsync(RelationGetSmgr(wstate->index), MAIN_FORKNUM);
+   smgr_bulk_finish(wstate->bulkstate);
 }

 /*
--- a/src/backend/access/spgist/spginsert.c
+++ b/src/backend/access/spgist/spginsert.c
@@ -25,7 +25,7 @@
 #include "catalog/index.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
-#include "storage/smgr.h"
+#include "storage/bulk_write.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"

@@ -155,42 +155,27 @@ spgbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 void
 spgbuildempty(Relation index)
 {
-   Buffer      metabuffer,
-               rootbuffer,
-               nullbuffer;
+   BulkWriteState *bulkstate;
+   BulkWriteBuffer buf;

-   /*
-    * Initialize the meta page and root pages
-    */
-   metabuffer = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
-   LockBuffer(metabuffer, BUFFER_LOCK_EXCLUSIVE);
-   rootbuffer = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
-   LockBuffer(rootbuffer, BUFFER_LOCK_EXCLUSIVE);
-   nullbuffer = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
-   LockBuffer(nullbuffer, BUFFER_LOCK_EXCLUSIVE);
+   bulkstate = smgr_bulk_start_rel(index, INIT_FORKNUM);

-   Assert(BufferGetBlockNumber(metabuffer) == SPGIST_METAPAGE_BLKNO);
-   Assert(BufferGetBlockNumber(rootbuffer) == SPGIST_ROOT_BLKNO);
-   Assert(BufferGetBlockNumber(nullbuffer) == SPGIST_NULL_BLKNO);
+   /* Construct metapage. */
+   buf = smgr_bulk_get_buf(bulkstate);
+   SpGistInitMetapage((Page) buf);
+   smgr_bulk_write(bulkstate, SPGIST_METAPAGE_BLKNO, buf, true);

-   START_CRIT_SECTION();
+   /* Likewise for the root page. */
+   buf = smgr_bulk_get_buf(bulkstate);
+   SpGistInitPage((Page) buf, SPGIST_LEAF);
+   smgr_bulk_write(bulkstate, SPGIST_ROOT_BLKNO, buf, true);

-   SpGistInitMetapage(BufferGetPage(metabuffer));
-   MarkBufferDirty(metabuffer);
-   SpGistInitBuffer(rootbuffer, SPGIST_LEAF);
-   MarkBufferDirty(rootbuffer);
-   SpGistInitBuffer(nullbuffer, SPGIST_LEAF | SPGIST_NULLS);
-   MarkBufferDirty(nullbuffer);
+   /* Likewise for the null-tuples root page. */
+   buf = smgr_bulk_get_buf(bulkstate);
+   SpGistInitPage((Page) buf, SPGIST_LEAF | SPGIST_NULLS);
+   smgr_bulk_write(bulkstate, SPGIST_NULL_BLKNO, buf, true);

-   log_newpage_buffer(metabuffer, true);
-   log_newpage_buffer(rootbuffer, true);
-   log_newpage_buffer(nullbuffer, true);
-
-   END_CRIT_SECTION();
-
-   UnlockReleaseBuffer(metabuffer);
-   UnlockReleaseBuffer(rootbuffer);
-   UnlockReleaseBuffer(nullbuffer);
+   smgr_bulk_finish(bulkstate);
 }

 /*
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -28,6 +28,7 @@
 #include "catalog/storage.h"
 #include "catalog/storage_xlog.h"
 #include "miscadmin.h"
+#include "storage/bulk_write.h"
 #include "storage/freespace.h"
 #include "storage/smgr.h"
 #include "utils/hsearch.h"
@@ -451,14 +452,11 @@ void
 RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
                    ForkNumber forkNum, char relpersistence)
 {
-   PGIOAlignedBlock buf;
-   Page        page;
    bool        use_wal;
    bool        copying_initfork;
    BlockNumber nblocks;
    BlockNumber blkno;
+   BulkWriteState *bulkstate;

-   page = (Page) buf.data;
-
    /*
     * The init fork for an unlogged relation in many respects has to be
@@ -477,16 +475,21 @@ RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
    use_wal = XLogIsNeeded() &&
        (relpersistence == RELPERSISTENCE_PERMANENT || copying_initfork);

+   bulkstate = smgr_bulk_start_smgr(dst, forkNum, use_wal);
+
    nblocks = smgrnblocks(src, forkNum);

    for (blkno = 0; blkno < nblocks; blkno++)
    {
+       BulkWriteBuffer buf;
+
        /* If we got a cancel signal during the copy of the data, quit */
        CHECK_FOR_INTERRUPTS();

-       smgrread(src, forkNum, blkno, buf.data);
+       buf = smgr_bulk_get_buf(bulkstate);
+       smgrread(src, forkNum, blkno, (Page) buf);

-       if (!PageIsVerifiedExtended(page, blkno,
+       if (!PageIsVerifiedExtended((Page) buf, blkno,
                                    PIV_LOG_WARNING | PIV_REPORT_STAT))
        {
            /*
@@ -507,34 +510,13 @@ RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
        }

        /*
-        * WAL-log the copied page. Unfortunately we don't know what kind of a
-        * page this is, so we have to log the full page including any unused
-        * space.
+        * Queue the page for WAL-logging and writing out. Unfortunately we
+        * don't know what kind of a page this is, so we have to log the full
+        * page including any unused space.
        */
-       if (use_wal)
-           log_newpage(&dst->smgr_rlocator.locator, forkNum, blkno, page, false);
-
-       PageSetChecksumInplace(page, blkno);
-
-       /*
-        * Now write the page. We say skipFsync = true because there's no
-        * need for smgr to schedule an fsync for this write; we'll do it
-        * ourselves below.
-        */
-       smgrextend(dst, forkNum, blkno, buf.data, true);
+       smgr_bulk_write(bulkstate, blkno, buf, false);
    }

-   /*
-    * When we WAL-logged rel pages, we must nonetheless fsync them. The
-    * reason is that since we're copying outside shared buffers, a CHECKPOINT
-    * occurring during the copy has no way to flush the previously written
-    * data to disk (indeed it won't know the new rel even exists). A crash
-    * later on would replay WAL from the checkpoint, therefore it wouldn't
-    * replay our earlier WAL entries. If we do not fsync those pages here,
-    * they might still not be on disk when the crash occurs.
-    */
-   if (use_wal || copying_initfork)
-       smgrimmedsync(dst, forkNum);
+   smgr_bulk_finish(bulkstate);
 }

 /*
--- a/src/backend/storage/smgr/Makefile
+++ b/src/backend/storage/smgr/Makefile
@@ -13,6 +13,7 @@ top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global

 OBJS = \
+	bulk_write.o \
 	md.o \
 	smgr.o

--- /dev/null
+++ b/src/backend/storage/smgr/bulk_write.c
@@ -0,0 +1,298 @@
+/*-------------------------------------------------------------------------
+ *
+ * bulk_write.c
+ *	  Efficiently and reliably populate a new relation
+ *
+ * The assumption is that no other backends access the relation while we are
+ * loading it, so we can take some shortcuts. Do not mix operations through
+ * the regular buffer manager and the bulk loading interface!
+ *
+ * We bypass the buffer manager to avoid the locking overhead, and call
+ * smgrextend() directly. A downside is that the pages will need to be
+ * re-read into shared buffers on first use after the build finishes. That's
+ * usually a good tradeoff for large relations, and for small relations, the
+ * overhead isn't very significant compared to creating the relation in the
+ * first place.
+ *
+ * The pages are WAL-logged if needed. To save on WAL header overhead, we
+ * WAL-log several pages in one record.
+ *
+ * One tricky point is that because we bypass the buffer manager, we need to
+ * register the relation for fsyncing at the next checkpoint ourselves, and
+ * make sure that the relation is correctly fsync'd by us or the checkpointer
+ * even if a checkpoint happens concurrently.
+ *
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/storage/smgr/bulk_write.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/xloginsert.h"
+#include "access/xlogrecord.h"
+#include "storage/bufmgr.h"
+#include "storage/bufpage.h"
+#include "storage/bulk_write.h"
+#include "storage/proc.h"
+#include "storage/smgr.h"
+#include "utils/rel.h"
+
+#define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID
+
+static const PGIOAlignedBlock zero_buffer = {{0}};	/* worth BLCKSZ */
+
+typedef struct PendingWrite
+{
+	BulkWriteBuffer buf;
+	BlockNumber blkno;
+	bool		page_std;
+} PendingWrite;
+
+/*
+ * Bulk writer state for one relation fork.
+ */
+typedef struct BulkWriteState
+{
+	/* Information about the target relation we're writing */
+	SMgrRelation smgr;
+	ForkNumber	forknum;
+	bool		use_wal;
+
+	/* We keep several writes queued, and WAL-log them in batches */
+	int			npending;
+	PendingWrite pending_writes[MAX_PENDING_WRITES];
+
+	/* Current size of the relation */
+	BlockNumber pages_written;
+
+	/* The RedoRecPtr at the time that the bulk operation started */
+	XLogRecPtr	start_RedoRecPtr;
+
+	MemoryContext memcxt;
+} BulkWriteState;
+
+static void smgr_bulk_flush(BulkWriteState *bulkstate);
+
+/*
+ * Start a bulk write operation on a relation fork.
+ */
+BulkWriteState *
+smgr_bulk_start_rel(Relation rel, ForkNumber forknum)
+{
+	return smgr_bulk_start_smgr(RelationGetSmgr(rel),
+								forknum,
+								RelationNeedsWAL(rel) || forknum == INIT_FORKNUM);
+}
+
+/*
+ * Start a bulk write operation on a relation fork.
+ *
+ * This is like smgr_bulk_start_rel, but can be used without a relcache entry.
+ */
+BulkWriteState *
+smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
+{
+	BulkWriteState *state;
+
+	state = palloc(sizeof(BulkWriteState));
+	state->smgr = smgr;
+	state->forknum = forknum;
+	state->use_wal = use_wal;
+
+	state->npending = 0;
+	state->pages_written = 0;
+
+	state->start_RedoRecPtr = GetRedoRecPtr();
+
+	/*
+	 * Remember the memory context. We will use it to allocate all the
+	 * buffers later.
+	 */
+	state->memcxt = CurrentMemoryContext;
+
+	return state;
+}
+
+/*
+ * Finish bulk write operation.
+ *
+ * This WAL-logs and flushes any remaining pending writes to disk, and fsyncs
+ * the relation if needed.
+ */
+void
+smgr_bulk_finish(BulkWriteState *bulkstate)
+{
+	/* WAL-log and flush any remaining pages */
+	smgr_bulk_flush(bulkstate);
+
+	/*
+	 * When we wrote out the pages, we passed skipFsync=true to avoid the
+	 * overhead of registering all the writes with the checkpointer. Register
+	 * the whole relation now.
+	 *
+	 * There is one hole in that idea: If a checkpoint occurred while we were
+	 * writing the pages, it already missed fsyncing the pages we had written
+	 * before the checkpoint started. A crash later on would replay the WAL
+	 * starting from the checkpoint, therefore it wouldn't replay our earlier
+	 * WAL records. So if a checkpoint started after the bulk write, fsync
+	 * the files now.
+	 */
+	if (!SmgrIsTemp(bulkstate->smgr))
+	{
+		/*
+		 * Prevent a checkpoint from starting between the GetRedoRecPtr() and
+		 * smgrregistersync() calls.
+		 */
+		Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
+		MyProc->delayChkptFlags |= DELAY_CHKPT_START;
+
+		if (bulkstate->start_RedoRecPtr != GetRedoRecPtr())
+		{
+			/*
+			 * A checkpoint occurred and it didn't know about our writes, so
+			 * fsync() the relation ourselves.
+			 */
+			MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
+			smgrimmedsync(bulkstate->smgr, bulkstate->forknum);
+			elog(DEBUG1, "flushed relation because a checkpoint occurred concurrently");
+		}
+		else
+		{
+			smgrregistersync(bulkstate->smgr, bulkstate->forknum);
+			MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
+		}
+	}
+}
+
+static int
+buffer_cmp(const void *a, const void *b)
+{
+	const PendingWrite *bufa = (const PendingWrite *) a;
+	const PendingWrite *bufb = (const PendingWrite *) b;
+
+	/* We should not see duplicated writes for the same block */
+	Assert(bufa->blkno != bufb->blkno);
+	if (bufa->blkno > bufb->blkno)
+		return 1;
+	else
+		return -1;
+}
+
+/*
+ * Finish all the pending writes.
+ */
+static void
+smgr_bulk_flush(BulkWriteState *bulkstate)
+{
+	int			npending = bulkstate->npending;
+	PendingWrite *pending_writes = bulkstate->pending_writes;
+
+	if (npending == 0)
+		return;
+
+	if (npending > 1)
+		qsort(pending_writes, npending, sizeof(PendingWrite), buffer_cmp);
+
+	if (bulkstate->use_wal)
+	{
+		BlockNumber blknos[MAX_PENDING_WRITES];
+		Page		pages[MAX_PENDING_WRITES];
+		bool		page_std = true;
+
+		for (int i = 0; i < npending; i++)
+		{
+			blknos[i] = pending_writes[i].blkno;
+			pages[i] = pending_writes[i].buf->data;
+
+			/*
+			 * If any of the pages use !page_std, we log them all as such.
+			 * That's a bit wasteful, but in practice, a mix of standard and
+			 * non-standard page layout is rare. None of the built-in AMs do
+			 * that.
+			 */
+			if (!pending_writes[i].page_std)
+				page_std = false;
+		}
+		log_newpages(&bulkstate->smgr->smgr_rlocator.locator, bulkstate->forknum,
+					 npending, blknos, pages, page_std);
+	}
+
+	for (int i = 0; i < npending; i++)
+	{
+		BlockNumber blkno = pending_writes[i].blkno;
|
||||||
|
Page page = pending_writes[i].buf->data;
|
||||||
|
|
||||||
|
PageSetChecksumInplace(page, blkno);
|
||||||
|
|
||||||
|
if (blkno >= bulkstate->pages_written)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* If we have to write pages nonsequentially, fill in the space
|
||||||
|
* with zeroes until we come back and overwrite. This is not
|
||||||
|
* logically necessary on standard Unix filesystems (unwritten
|
||||||
|
* space will read as zeroes anyway), but it should help to avoid
|
||||||
|
* fragmentation. The dummy pages aren't WAL-logged though.
|
||||||
|
*/
|
||||||
|
while (blkno > bulkstate->pages_written)
|
||||||
|
{
|
||||||
|
/* don't set checksum for all-zero page */
|
||||||
|
smgrextend(bulkstate->smgr, bulkstate->forknum,
|
||||||
|
bulkstate->pages_written++,
|
||||||
|
&zero_buffer,
|
||||||
|
true);
|
||||||
|
}
|
||||||
|
|
||||||
|
smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
|
||||||
|
bulkstate->pages_written = pending_writes[i].blkno + 1;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
|
||||||
|
pfree(page);
|
||||||
|
}
|
||||||
|
|
||||||
|
bulkstate->npending = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Queue write of 'buf'.
|
||||||
|
*
|
||||||
|
* NB: this takes ownership of 'buf'!
|
||||||
|
*
|
||||||
|
* You are only allowed to write a given block once as part of one bulk write
|
||||||
|
* operation.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std)
|
||||||
|
{
|
||||||
|
PendingWrite *w;
|
||||||
|
|
||||||
|
w = &bulkstate->pending_writes[bulkstate->npending++];
|
||||||
|
w->buf = buf;
|
||||||
|
w->blkno = blocknum;
|
||||||
|
w->page_std = page_std;
|
||||||
|
|
||||||
|
if (bulkstate->npending == MAX_PENDING_WRITES)
|
||||||
|
smgr_bulk_flush(bulkstate);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Allocate a new buffer which can later be written with smgr_bulk_write().
|
||||||
|
*
|
||||||
|
* There is no function to free the buffer. When you pass it to
|
||||||
|
* smgr_bulk_write(), it takes ownership and frees it when it's no longer
|
||||||
|
* needed.
|
||||||
|
*
|
||||||
|
* This is currently implemented as a simple palloc, but could be implemented
|
||||||
|
* using a ring buffer or larger chunks in the future, so don't rely on it.
|
||||||
|
*/
|
||||||
|
BulkWriteBuffer
|
||||||
|
smgr_bulk_get_buf(BulkWriteState *bulkstate)
|
||||||
|
{
|
||||||
|
return MemoryContextAllocAligned(bulkstate->memcxt, BLCKSZ, PG_IO_ALIGN_SIZE, 0);
|
||||||
|
}
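
For orientation, here is a minimal caller sketch, assuming only the API defined above; the function name and the page-filling step are invented for illustration and are not part of this commit:

#include "postgres.h"

#include "storage/bufpage.h"
#include "storage/bulk_write.h"

/* Hypothetical example caller, not part of this commit. */
static void
example_bulk_populate(Relation rel)
{
	BulkWriteState *bulkstate;
	BulkWriteBuffer buf;

	/* WAL usage is decided from the relation's persistence */
	bulkstate = smgr_bulk_start_rel(rel, MAIN_FORKNUM);

	/* Reserve a buffer and initialize it as a standard page */
	buf = smgr_bulk_get_buf(bulkstate);
	PageInit((Page) buf->data, BLCKSZ, 0);
	/* ... place tuples on the page here ... */

	/* Queue block 0; ownership of 'buf' passes to the bulk writer */
	smgr_bulk_write(bulkstate, 0, buf, true);

	/* WAL-log and write out pending pages, and arrange the fsync */
	smgr_bulk_finish(bulkstate);
}

Note that the caller never touches smgrwrite(), log_newpages(), or the sync machinery directly; batching, WAL-logging, and fsync registration all happen inside smgr_bulk_finish() and smgr_bulk_flush().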
@ -1236,6 +1236,49 @@ mdtruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
	}
}

/*
 * mdregistersync() -- Mark whole relation as needing fsync
 */
void
mdregistersync(SMgrRelation reln, ForkNumber forknum)
{
	int			segno;
	int			min_inactive_seg;

	/*
	 * NOTE: mdnblocks makes sure we have opened all active segments, so that
	 * the loop below will get them all!
	 */
	mdnblocks(reln, forknum);

	min_inactive_seg = segno = reln->md_num_open_segs[forknum];

	/*
	 * Temporarily open inactive segments, then close them after sync.  There
	 * may be some inactive segments left opened after error, but that is
	 * harmless.  We don't bother to clean them up and take a risk of further
	 * trouble.  The next mdclose() will soon close them.
	 */
	while (_mdfd_openseg(reln, forknum, segno, 0) != NULL)
		segno++;

	while (segno > 0)
	{
		MdfdVec    *v = &reln->md_seg_fds[forknum][segno - 1];

		register_dirty_segment(reln, forknum, v);

		/* Close inactive segments immediately */
		if (segno > min_inactive_seg)
		{
			FileClose(v->mdfd_vfd);
			_fdvec_resize(reln, forknum, segno - 1);
		}

		segno--;
	}
}

/*
 * mdimmedsync() -- Immediately sync a relation to stable storage.
 *

@ -1255,7 +1298,7 @@ mdimmedsync(SMgrRelation reln, ForkNumber forknum)

	/*
	 * NOTE: mdnblocks makes sure we have opened all active segments, so that
	 * the loop below will get them all!
	 */
	mdnblocks(reln, forknum);
@ -1,6 +1,7 @@
# Copyright (c) 2022-2024, PostgreSQL Global Development Group

backend_sources += files(
  'bulk_write.c',
  'md.c',
  'smgr.c',
)
@ -102,6 +102,7 @@ typedef struct f_smgr
	void		(*smgr_truncate) (SMgrRelation reln, ForkNumber forknum,
								  BlockNumber nblocks);
	void		(*smgr_immedsync) (SMgrRelation reln, ForkNumber forknum);
	void		(*smgr_registersync) (SMgrRelation reln, ForkNumber forknum);
} f_smgr;

static const f_smgr smgrsw[] = {

@ -123,6 +124,7 @@ static const f_smgr smgrsw[] = {
		.smgr_nblocks = mdnblocks,
		.smgr_truncate = mdtruncate,
		.smgr_immedsync = mdimmedsync,
		.smgr_registersync = mdregistersync,
	}
};
@ -616,6 +618,14 @@ smgrreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 * on disk at return, only dumped out to the kernel.  However,
 * provisions will be made to fsync the write before the next checkpoint.
 *
 * NB: The mechanism to ensure fsync at next checkpoint assumes that there is
 * something that prevents a concurrent checkpoint from "racing ahead" of the
 * write.  One way to prevent that is by holding a lock on the buffer; the
 * buffer manager's writes are protected by that.  The bulk writer facility
 * in bulk_write.c checks the redo pointer and calls smgrimmedsync() if a
 * checkpoint happened; that relies on the fact that no other backend can be
 * concurrently modifying the page.
 *
 * skipFsync indicates that the caller will make other provisions to
 * fsync the relation, so we needn't bother.  Temporary relations also
 * do not require fsync.
@ -733,6 +743,24 @@ smgrtruncate(SMgrRelation reln, ForkNumber *forknum, int nforks, BlockNumber *nb
	}
}

/*
 * smgrregistersync() -- Request a relation to be sync'd at next checkpoint
 *
 * This can be used after calling smgrwrite() or smgrextend() with skipFsync =
 * true, to register the fsyncs that were skipped earlier.
 *
 * Note: be mindful that a checkpoint could already have happened between the
 * smgrwrite or smgrextend calls and this!  In that case, the checkpoint
 * already missed fsyncing this relation, and you should use smgrimmedsync
 * instead.  Most callers should use the bulk loading facility in
 * bulk_write.c, which handles all that.
 */
void
smgrregistersync(SMgrRelation reln, ForkNumber forknum)
{
	smgrsw[reln->smgr_which].smgr_registersync(reln, forknum);
}
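
As a usage note, the pattern this function enables looks roughly like the sketch below. This is an illustration, not code from this commit, and it deliberately omits the DELAY_CHKPT_START interlock that bulk_write.c uses to close the window between the redo-pointer check and the registration:

#include "postgres.h"

#include "access/xlog.h"
#include "storage/smgr.h"

/* Hypothetical sketch, not part of this commit. */
static void
example_write_then_register(SMgrRelation reln, ForkNumber forknum,
							BlockNumber blkno, const void *page)
{
	XLogRecPtr	start_redo = GetRedoRecPtr();

	/* skipFsync = true: don't queue a per-page fsync request */
	smgrextend(reln, forknum, blkno, page, true);

	if (start_redo != GetRedoRecPtr())
		smgrimmedsync(reln, forknum);	/* a checkpoint already missed us */
	else
		smgrregistersync(reln, forknum);	/* fsync at next checkpoint */
}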

/*
 * smgrimmedsync() -- Force the specified relation to stable storage.
 *

@ -755,6 +783,9 @@ smgrtruncate(SMgrRelation reln, ForkNumber *forknum, int nforks, BlockNumber *nb
 * Note that you need to do FlushRelationBuffers() first if there is
 * any possibility that there are dirty buffers for the relation;
 * otherwise the sync is not very meaningful.
 *
 * Most callers should use the bulk loading facility in bulk_write.c
 * instead of calling this directly.
 */
void
smgrimmedsync(SMgrRelation reln, ForkNumber forknum)
@ -0,0 +1,40 @@
/*-------------------------------------------------------------------------
 *
 * bulk_write.h
 *	  Efficiently and reliably populate a new relation
 *
 *
 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/include/storage/bulk_write.h
 *
 *-------------------------------------------------------------------------
 */
#ifndef BULK_WRITE_H
#define BULK_WRITE_H

#include "storage/smgr.h"
#include "utils/rel.h"

typedef struct BulkWriteState BulkWriteState;

/*
 * Temporary buffer to hold a page until it's written out.  Use
 * smgr_bulk_get_buf() to reserve one of these.  This is a separate typedef to
 * distinguish it from other block-sized buffers passed around in the system.
 */
typedef PGIOAlignedBlock *BulkWriteBuffer;

/* forward declared from smgr.h */
struct SMgrRelationData;

extern BulkWriteState *smgr_bulk_start_rel(Relation rel, ForkNumber forknum);
extern BulkWriteState *smgr_bulk_start_smgr(struct SMgrRelationData *smgr, ForkNumber forknum, bool use_wal);

extern BulkWriteBuffer smgr_bulk_get_buf(BulkWriteState *bulkstate);
extern void smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std);

extern void smgr_bulk_finish(BulkWriteState *bulkstate);

#endif							/* BULK_WRITE_H */
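
One design note on the BulkWriteBuffer typedef, with a tiny illustrative fragment of my own (it assumes a bulkstate and blocknum from surrounding code, and is not from this commit): because the type is a pointer to a whole I/O-aligned block rather than a bare Page, callers can't accidentally pass an arbitrary, possibly unaligned page pointer to smgr_bulk_write(); the page contents are reached through the data member.

	BulkWriteBuffer buf = smgr_bulk_get_buf(bulkstate); /* aligned, owned buffer */
	Page		page = (Page) buf->data;	/* view its contents as a page */

	PageInit(page, BLCKSZ, 0);
	smgr_bulk_write(bulkstate, blocknum, buf, true);	/* hands back ownership */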
@ -43,6 +43,7 @@ extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum);
extern void mdtruncate(SMgrRelation reln, ForkNumber forknum,
					   BlockNumber nblocks);
extern void mdimmedsync(SMgrRelation reln, ForkNumber forknum);
extern void mdregistersync(SMgrRelation reln, ForkNumber forknum);

extern void ForgetDatabaseSyncRequests(Oid dbid);
extern void DropRelationFiles(RelFileLocator *delrels, int ndelrels, bool isRedo);
@ -106,6 +106,7 @@ extern BlockNumber smgrnblocks_cached(SMgrRelation reln, ForkNumber forknum);
extern void smgrtruncate(SMgrRelation reln, ForkNumber *forknum,
						 int nforks, BlockNumber *nblocks);
extern void smgrimmedsync(SMgrRelation reln, ForkNumber forknum);
extern void smgrregistersync(SMgrRelation reln, ForkNumber forknum);
extern void AtEOXact_SMgr(void);
extern bool ProcessBarrierSmgrRelease(void);
@ -333,6 +333,8 @@ BuildAccumulator
BuiltinScript
BulkInsertState
BulkInsertStateData
BulkWriteBuffer
BulkWriteState
CACHESIGN
CAC_state
CCFastEqualFN

@ -2018,6 +2020,7 @@ PendingFsyncEntry
PendingRelDelete
PendingRelSync
PendingUnlinkEntry
PendingWrite
PendingWriteback
PerLockTagEntry
PerlInterpreter