Define PG_LOGICAL_DIR for path pg_logical/ in data folder
This is similar to 2065ddf5e34c, but this time for pg_logical/ itself and its contents, like the paths for snapshots, mappings or origin checkpoints. Author: Bertrand Drouvot Reviewed-by: Ashutosh Bapat, Yugo Nagata, Michael Paquier Discussion: https://postgr.es/m/ZryVvjqS9SnV1GPP@ip-10-97-1-34.eu-west-3.compute.internal
This commit is contained in:
parent
2065ddf5e3
commit
c39afc38cf
@ -961,8 +961,8 @@ logical_rewrite_log_mapping(RewriteState state, TransactionId xid,
|
|||||||
dboid = MyDatabaseId;
|
dboid = MyDatabaseId;
|
||||||
|
|
||||||
snprintf(path, MAXPGPATH,
|
snprintf(path, MAXPGPATH,
|
||||||
"pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
|
"%s/" LOGICAL_REWRITE_FORMAT,
|
||||||
dboid, relid,
|
PG_LOGICAL_MAPPINGS_DIR, dboid, relid,
|
||||||
LSN_FORMAT_ARGS(state->rs_begin_lsn),
|
LSN_FORMAT_ARGS(state->rs_begin_lsn),
|
||||||
xid, GetCurrentTransactionId());
|
xid, GetCurrentTransactionId());
|
||||||
|
|
||||||
@ -1081,8 +1081,8 @@ heap_xlog_logical_rewrite(XLogReaderState *r)
|
|||||||
xlrec = (xl_heap_rewrite_mapping *) XLogRecGetData(r);
|
xlrec = (xl_heap_rewrite_mapping *) XLogRecGetData(r);
|
||||||
|
|
||||||
snprintf(path, MAXPGPATH,
|
snprintf(path, MAXPGPATH,
|
||||||
"pg_logical/mappings/" LOGICAL_REWRITE_FORMAT,
|
"%s/" LOGICAL_REWRITE_FORMAT,
|
||||||
xlrec->mapped_db, xlrec->mapped_rel,
|
PG_LOGICAL_MAPPINGS_DIR, xlrec->mapped_db, xlrec->mapped_rel,
|
||||||
LSN_FORMAT_ARGS(xlrec->start_lsn),
|
LSN_FORMAT_ARGS(xlrec->start_lsn),
|
||||||
xlrec->mapped_xid, XLogRecGetXid(r));
|
xlrec->mapped_xid, XLogRecGetXid(r));
|
||||||
|
|
||||||
@ -1158,7 +1158,7 @@ CheckPointLogicalRewriteHeap(void)
|
|||||||
XLogRecPtr redo;
|
XLogRecPtr redo;
|
||||||
DIR *mappings_dir;
|
DIR *mappings_dir;
|
||||||
struct dirent *mapping_de;
|
struct dirent *mapping_de;
|
||||||
char path[MAXPGPATH + 20];
|
char path[MAXPGPATH + sizeof(PG_LOGICAL_MAPPINGS_DIR)];
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We start of with a minimum of the last redo pointer. No new decoding
|
* We start of with a minimum of the last redo pointer. No new decoding
|
||||||
@ -1173,8 +1173,8 @@ CheckPointLogicalRewriteHeap(void)
|
|||||||
if (cutoff != InvalidXLogRecPtr && redo < cutoff)
|
if (cutoff != InvalidXLogRecPtr && redo < cutoff)
|
||||||
cutoff = redo;
|
cutoff = redo;
|
||||||
|
|
||||||
mappings_dir = AllocateDir("pg_logical/mappings");
|
mappings_dir = AllocateDir(PG_LOGICAL_MAPPINGS_DIR);
|
||||||
while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
|
while ((mapping_de = ReadDir(mappings_dir, PG_LOGICAL_MAPPINGS_DIR)) != NULL)
|
||||||
{
|
{
|
||||||
Oid dboid;
|
Oid dboid;
|
||||||
Oid relid;
|
Oid relid;
|
||||||
@ -1189,7 +1189,7 @@ CheckPointLogicalRewriteHeap(void)
|
|||||||
strcmp(mapping_de->d_name, "..") == 0)
|
strcmp(mapping_de->d_name, "..") == 0)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
snprintf(path, sizeof(path), "pg_logical/mappings/%s", mapping_de->d_name);
|
snprintf(path, sizeof(path), "%s/%s", PG_LOGICAL_MAPPINGS_DIR, mapping_de->d_name);
|
||||||
de_type = get_dirent_type(path, mapping_de, false, DEBUG1);
|
de_type = get_dirent_type(path, mapping_de, false, DEBUG1);
|
||||||
|
|
||||||
if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
|
if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
|
||||||
@ -1249,5 +1249,5 @@ CheckPointLogicalRewriteHeap(void)
|
|||||||
FreeDir(mappings_dir);
|
FreeDir(mappings_dir);
|
||||||
|
|
||||||
/* persist directory entries to disk */
|
/* persist directory entries to disk */
|
||||||
fsync_fname("pg_logical/mappings", true);
|
fsync_fname(PG_LOGICAL_MAPPINGS_DIR, true);
|
||||||
}
|
}
|
||||||
|
@ -95,6 +95,10 @@
|
|||||||
#include "utils/snapmgr.h"
|
#include "utils/snapmgr.h"
|
||||||
#include "utils/syscache.h"
|
#include "utils/syscache.h"
|
||||||
|
|
||||||
|
/* paths for replication origin checkpoint files */
|
||||||
|
#define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
|
||||||
|
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Replay progress of a single remote node.
|
* Replay progress of a single remote node.
|
||||||
*/
|
*/
|
||||||
@ -572,8 +576,8 @@ ReplicationOriginShmemInit(void)
|
|||||||
void
|
void
|
||||||
CheckPointReplicationOrigin(void)
|
CheckPointReplicationOrigin(void)
|
||||||
{
|
{
|
||||||
const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
|
const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
|
||||||
const char *path = "pg_logical/replorigin_checkpoint";
|
const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
|
||||||
int tmpfd;
|
int tmpfd;
|
||||||
int i;
|
int i;
|
||||||
uint32 magic = REPLICATION_STATE_MAGIC;
|
uint32 magic = REPLICATION_STATE_MAGIC;
|
||||||
@ -698,7 +702,7 @@ CheckPointReplicationOrigin(void)
|
|||||||
void
|
void
|
||||||
StartupReplicationOrigin(void)
|
StartupReplicationOrigin(void)
|
||||||
{
|
{
|
||||||
const char *path = "pg_logical/replorigin_checkpoint";
|
const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
|
||||||
int fd;
|
int fd;
|
||||||
int readBytes;
|
int readBytes;
|
||||||
uint32 magic = REPLICATION_STATE_MAGIC;
|
uint32 magic = REPLICATION_STATE_MAGIC;
|
||||||
|
@ -5081,7 +5081,7 @@ ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
|
|||||||
int readBytes;
|
int readBytes;
|
||||||
LogicalRewriteMappingData map;
|
LogicalRewriteMappingData map;
|
||||||
|
|
||||||
sprintf(path, "pg_logical/mappings/%s", fname);
|
sprintf(path, "%s/%s", PG_LOGICAL_MAPPINGS_DIR, fname);
|
||||||
fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
|
fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
|
||||||
if (fd < 0)
|
if (fd < 0)
|
||||||
ereport(ERROR,
|
ereport(ERROR,
|
||||||
@ -5197,8 +5197,8 @@ UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
|
|||||||
ListCell *file;
|
ListCell *file;
|
||||||
Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
|
Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
|
||||||
|
|
||||||
mapping_dir = AllocateDir("pg_logical/mappings");
|
mapping_dir = AllocateDir(PG_LOGICAL_MAPPINGS_DIR);
|
||||||
while ((mapping_de = ReadDir(mapping_dir, "pg_logical/mappings")) != NULL)
|
while ((mapping_de = ReadDir(mapping_dir, PG_LOGICAL_MAPPINGS_DIR)) != NULL)
|
||||||
{
|
{
|
||||||
Oid f_dboid;
|
Oid f_dboid;
|
||||||
Oid f_relid;
|
Oid f_relid;
|
||||||
|
@ -1654,7 +1654,8 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
|
|||||||
* unless the user used pg_resetwal or similar. If a user did so, there's
|
* unless the user used pg_resetwal or similar. If a user did so, there's
|
||||||
* no hope continuing to decode anyway.
|
* no hope continuing to decode anyway.
|
||||||
*/
|
*/
|
||||||
sprintf(path, "pg_logical/snapshots/%X-%X.snap",
|
sprintf(path, "%s/%X-%X.snap",
|
||||||
|
PG_LOGICAL_SNAPSHOTS_DIR,
|
||||||
LSN_FORMAT_ARGS(lsn));
|
LSN_FORMAT_ARGS(lsn));
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -1681,7 +1682,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
|
|||||||
* be safely on disk.
|
* be safely on disk.
|
||||||
*/
|
*/
|
||||||
fsync_fname(path, false);
|
fsync_fname(path, false);
|
||||||
fsync_fname("pg_logical/snapshots", true);
|
fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
|
||||||
|
|
||||||
builder->last_serialized_snapshot = lsn;
|
builder->last_serialized_snapshot = lsn;
|
||||||
goto out;
|
goto out;
|
||||||
@ -1697,7 +1698,8 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
|
|||||||
elog(DEBUG1, "serializing snapshot to %s", path);
|
elog(DEBUG1, "serializing snapshot to %s", path);
|
||||||
|
|
||||||
/* to make sure only we will write to this tempfile, include pid */
|
/* to make sure only we will write to this tempfile, include pid */
|
||||||
sprintf(tmppath, "pg_logical/snapshots/%X-%X.snap.%d.tmp",
|
sprintf(tmppath, "%s/%X-%X.snap.%d.tmp",
|
||||||
|
PG_LOGICAL_SNAPSHOTS_DIR,
|
||||||
LSN_FORMAT_ARGS(lsn), MyProcPid);
|
LSN_FORMAT_ARGS(lsn), MyProcPid);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -1818,7 +1820,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
|
|||||||
(errcode_for_file_access(),
|
(errcode_for_file_access(),
|
||||||
errmsg("could not close file \"%s\": %m", tmppath)));
|
errmsg("could not close file \"%s\": %m", tmppath)));
|
||||||
|
|
||||||
fsync_fname("pg_logical/snapshots", true);
|
fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We may overwrite the work from some other backend, but that's ok, our
|
* We may overwrite the work from some other backend, but that's ok, our
|
||||||
@ -1834,7 +1836,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
|
|||||||
|
|
||||||
/* make sure we persist */
|
/* make sure we persist */
|
||||||
fsync_fname(path, false);
|
fsync_fname(path, false);
|
||||||
fsync_fname("pg_logical/snapshots", true);
|
fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Now there's no way we can lose the dumped state anymore, remember this
|
* Now there's no way we can lose the dumped state anymore, remember this
|
||||||
@ -1871,7 +1873,8 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
|
|||||||
if (builder->state == SNAPBUILD_CONSISTENT)
|
if (builder->state == SNAPBUILD_CONSISTENT)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
sprintf(path, "pg_logical/snapshots/%X-%X.snap",
|
sprintf(path, "%s/%X-%X.snap",
|
||||||
|
PG_LOGICAL_SNAPSHOTS_DIR,
|
||||||
LSN_FORMAT_ARGS(lsn));
|
LSN_FORMAT_ARGS(lsn));
|
||||||
|
|
||||||
fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
|
fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
|
||||||
@ -1892,7 +1895,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
|
|||||||
* ----
|
* ----
|
||||||
*/
|
*/
|
||||||
fsync_fname(path, false);
|
fsync_fname(path, false);
|
||||||
fsync_fname("pg_logical/snapshots", true);
|
fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
|
||||||
|
|
||||||
|
|
||||||
/* read statically sized portion of snapshot */
|
/* read statically sized portion of snapshot */
|
||||||
@ -2074,7 +2077,7 @@ CheckPointSnapBuild(void)
|
|||||||
XLogRecPtr redo;
|
XLogRecPtr redo;
|
||||||
DIR *snap_dir;
|
DIR *snap_dir;
|
||||||
struct dirent *snap_de;
|
struct dirent *snap_de;
|
||||||
char path[MAXPGPATH + 21];
|
char path[MAXPGPATH + sizeof(PG_LOGICAL_SNAPSHOTS_DIR)];
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We start off with a minimum of the last redo pointer. No new
|
* We start off with a minimum of the last redo pointer. No new
|
||||||
@ -2090,8 +2093,8 @@ CheckPointSnapBuild(void)
|
|||||||
if (redo < cutoff)
|
if (redo < cutoff)
|
||||||
cutoff = redo;
|
cutoff = redo;
|
||||||
|
|
||||||
snap_dir = AllocateDir("pg_logical/snapshots");
|
snap_dir = AllocateDir(PG_LOGICAL_SNAPSHOTS_DIR);
|
||||||
while ((snap_de = ReadDir(snap_dir, "pg_logical/snapshots")) != NULL)
|
while ((snap_de = ReadDir(snap_dir, PG_LOGICAL_SNAPSHOTS_DIR)) != NULL)
|
||||||
{
|
{
|
||||||
uint32 hi;
|
uint32 hi;
|
||||||
uint32 lo;
|
uint32 lo;
|
||||||
@ -2102,7 +2105,7 @@ CheckPointSnapBuild(void)
|
|||||||
strcmp(snap_de->d_name, "..") == 0)
|
strcmp(snap_de->d_name, "..") == 0)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
snprintf(path, sizeof(path), "pg_logical/snapshots/%s", snap_de->d_name);
|
snprintf(path, sizeof(path), "%s/%s", PG_LOGICAL_SNAPSHOTS_DIR, snap_de->d_name);
|
||||||
de_type = get_dirent_type(path, snap_de, false, DEBUG1);
|
de_type = get_dirent_type(path, snap_de, false, DEBUG1);
|
||||||
|
|
||||||
if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
|
if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
|
||||||
@ -2162,7 +2165,8 @@ SnapBuildSnapshotExists(XLogRecPtr lsn)
|
|||||||
int ret;
|
int ret;
|
||||||
struct stat stat_buf;
|
struct stat stat_buf;
|
||||||
|
|
||||||
sprintf(path, "pg_logical/snapshots/%X-%X.snap",
|
sprintf(path, "%s/%X-%X.snap",
|
||||||
|
PG_LOGICAL_SNAPSHOTS_DIR,
|
||||||
LSN_FORMAT_ARGS(lsn));
|
LSN_FORMAT_ARGS(lsn));
|
||||||
|
|
||||||
ret = stat(path, &stat_buf);
|
ret = stat(path, &stat_buf);
|
||||||
|
@ -690,21 +690,23 @@ pg_ls_archive_statusdir(PG_FUNCTION_ARGS)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Function to return the list of files in the pg_logical/snapshots directory.
|
* Function to return the list of files in the PG_LOGICAL_SNAPSHOTS_DIR
|
||||||
|
* directory.
|
||||||
*/
|
*/
|
||||||
Datum
|
Datum
|
||||||
pg_ls_logicalsnapdir(PG_FUNCTION_ARGS)
|
pg_ls_logicalsnapdir(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
return pg_ls_dir_files(fcinfo, "pg_logical/snapshots", false);
|
return pg_ls_dir_files(fcinfo, PG_LOGICAL_SNAPSHOTS_DIR, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Function to return the list of files in the pg_logical/mappings directory.
|
* Function to return the list of files in the PG_LOGICAL_MAPPINGS_DIR
|
||||||
|
* directory.
|
||||||
*/
|
*/
|
||||||
Datum
|
Datum
|
||||||
pg_ls_logicalmapdir(PG_FUNCTION_ARGS)
|
pg_ls_logicalmapdir(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
return pg_ls_dir_files(fcinfo, "pg_logical/mappings", false);
|
return pg_ls_dir_files(fcinfo, PG_LOGICAL_MAPPINGS_DIR, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -18,6 +18,11 @@
|
|||||||
#include "utils/snapshot.h"
|
#include "utils/snapshot.h"
|
||||||
#include "utils/timestamp.h"
|
#include "utils/timestamp.h"
|
||||||
|
|
||||||
|
/* paths for logical decoding data (relative to installation's $PGDATA) */
|
||||||
|
#define PG_LOGICAL_DIR "pg_logical"
|
||||||
|
#define PG_LOGICAL_MAPPINGS_DIR PG_LOGICAL_DIR "/mappings"
|
||||||
|
#define PG_LOGICAL_SNAPSHOTS_DIR PG_LOGICAL_DIR "/snapshots"
|
||||||
|
|
||||||
/* GUC variables */
|
/* GUC variables */
|
||||||
extern PGDLLIMPORT int logical_decoding_work_mem;
|
extern PGDLLIMPORT int logical_decoding_work_mem;
|
||||||
extern PGDLLIMPORT int debug_logical_replication_streaming;
|
extern PGDLLIMPORT int debug_logical_replication_streaming;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user