diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml index b7aa128f7f..f790c56003 100644 --- a/doc/src/sgml/ref/pg_basebackup.sgml +++ b/doc/src/sgml/ref/pg_basebackup.sgml @@ -382,6 +382,18 @@ PostgreSQL documentation + + + + + + This option causes the replication slot specified by the + option --slot to be created before starting the + backup. In this case, an error is raised if the slot already exists. + + + + @@ -462,6 +474,10 @@ PostgreSQL documentation the server does not remove any necessary WAL data in the time between the end of the base backup and the start of streaming replication. + + The specified replication slot has to exist unless the + option is also used. + If this option is not specified and the server supports temporary replication slots (version 10 and later), then a temporary replication diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 537978090e..dac7299ff4 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -93,6 +93,8 @@ static pg_time_t last_progress_report = 0; static int32 maxrate = 0; /* no limit by default */ static char *replication_slot = NULL; static bool temp_replication_slot = true; +static bool create_slot = false; +static bool no_slot = false; static bool success = false; static bool made_new_pgdata = false; @@ -346,6 +348,7 @@ usage(void) printf(_("\nGeneral options:\n")); printf(_(" -c, --checkpoint=fast|spread\n" " set fast or spread checkpointing\n")); + printf(_(" -C, --create-slot create replication slot\n")); printf(_(" -l, --label=LABEL set backup label\n")); printf(_(" -n, --no-clean do not clean up after errors\n")); printf(_(" -N, --no-sync do not wait for changes to be written safely to disk\n")); @@ -466,7 +469,6 @@ typedef struct char xlog[MAXPGPATH]; /* directory or tarfile depending on mode */ char *sysidentifier; int timeline; - bool temp_slot; } logstreamer_param; static int @@ -492,9 +494,6 @@ LogStreamerMain(logstreamer_param *param) stream.mark_done = true; stream.partial_suffix = NULL; stream.replication_slot = replication_slot; - stream.temp_slot = param->temp_slot; - if (stream.temp_slot && !stream.replication_slot) - stream.replication_slot = psprintf("pg_basebackup_%d", (int) PQbackendPID(param->bgconn)); if (format == 'p') stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync); @@ -583,9 +582,29 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier) /* Temporary replication slots are only supported in 10 and newer */ if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_TEMP_SLOTS) - param->temp_slot = false; - else - param->temp_slot = temp_replication_slot; + temp_replication_slot = false; + + /* + * Create replication slot if requested + */ + if (temp_replication_slot && !replication_slot) + replication_slot = psprintf("pg_basebackup_%d", (int) PQbackendPID(param->bgconn)); + if (temp_replication_slot || create_slot) + { + if (!CreateReplicationSlot(param->bgconn, replication_slot, NULL, + temp_replication_slot, true, true, false)) + disconnect_and_exit(1); + + if (verbose) + { + if (temp_replication_slot) + fprintf(stderr, _("%s: created temporary replication slot \"%s\"\n"), + progname, replication_slot); + else + fprintf(stderr, _("%s: created replication slot \"%s\"\n"), + progname, replication_slot); + } + } if (format == 'p') { @@ -2079,6 +2098,7 @@ main(int argc, char **argv) {"pgdata", required_argument, NULL, 'D'}, {"format", required_argument, NULL, 'F'}, {"checkpoint", required_argument, NULL, 'c'}, + {"create-slot", no_argument, NULL, 'C'}, {"max-rate", required_argument, NULL, 'r'}, {"write-recovery-conf", no_argument, NULL, 'R'}, {"slot", required_argument, NULL, 'S'}, @@ -2105,7 +2125,6 @@ main(int argc, char **argv) int c; int option_index; - bool no_slot = false; progname = get_progname(argv[0]); set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup")); @@ -2127,11 +2146,14 @@ main(int argc, char **argv) atexit(cleanup_directories_atexit); - while ((c = getopt_long(argc, argv, "D:F:r:RT:X:l:nNzZ:d:c:h:p:U:s:S:wWvP", + while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWvP", long_options, &option_index)) != -1) { switch (c) { + case 'C': + create_slot = true; + break; case 'D': basedir = pg_strdup(optarg); break; @@ -2348,6 +2370,29 @@ main(int argc, char **argv) temp_replication_slot = false; } + if (create_slot) + { + if (!replication_slot) + { + fprintf(stderr, + _("%s: --create-slot needs a slot to be specified using --slot\n"), + progname); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + + if (no_slot) + { + fprintf(stderr, + _("%s: --create-slot and --no-slot are incompatible options\n"), + progname); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + } + if (xlog_dir) { if (format != 'p') diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c index fbac0df93d..888ae6c571 100644 --- a/src/bin/pg_basebackup/pg_receivewal.c +++ b/src/bin/pg_basebackup/pg_receivewal.c @@ -431,7 +431,6 @@ StreamLog(void) stream.do_sync); stream.partial_suffix = ".partial"; stream.replication_slot = replication_slot; - stream.temp_slot = false; ReceiveXlogStream(conn, &stream); @@ -728,7 +727,7 @@ main(int argc, char **argv) _("%s: creating replication slot \"%s\"\n"), progname, replication_slot); - if (!CreateReplicationSlot(conn, replication_slot, NULL, true, + if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false, slot_exists_ok)) disconnect_and_exit(1); disconnect_and_exit(0); diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c index 6811a55e76..3109d0f99f 100644 --- a/src/bin/pg_basebackup/pg_recvlogical.c +++ b/src/bin/pg_basebackup/pg_recvlogical.c @@ -979,8 +979,8 @@ main(int argc, char **argv) _("%s: creating replication slot \"%s\"\n"), progname, replication_slot); - if (!CreateReplicationSlot(conn, replication_slot, plugin, - false, slot_exists_ok)) + if (!CreateReplicationSlot(conn, replication_slot, plugin, false, + false, false, slot_exists_ok)) disconnect_and_exit(1); startpos = InvalidXLogRecPtr; } diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index 65931f6454..07509cb825 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -522,24 +522,6 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream) PQclear(res); } - /* - * Create temporary replication slot if one is needed - */ - if (stream->temp_slot) - { - snprintf(query, sizeof(query), - "CREATE_REPLICATION_SLOT \"%s\" TEMPORARY PHYSICAL RESERVE_WAL", - stream->replication_slot); - res = PQexec(conn, query); - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - fprintf(stderr, _("%s: could not create temporary replication slot \"%s\": %s"), - progname, stream->replication_slot, PQerrorMessage(conn)); - PQclear(res); - return false; - } - } - /* * initialize flush position to starting point, it's the caller's * responsibility that that's sane. diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h index bb786ce289..5b8c33fc26 100644 --- a/src/bin/pg_basebackup/receivelog.h +++ b/src/bin/pg_basebackup/receivelog.h @@ -47,7 +47,6 @@ typedef struct StreamCtl WalWriteMethod *walmethod; /* How to write the WAL */ char *partial_suffix; /* Suffix appended to partially received files */ char *replication_slot; /* Replication slot to use, or NULL */ - bool temp_slot; /* Create temporary replication slot */ } StreamCtl; diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c index df17f60596..81fef8cd51 100644 --- a/src/bin/pg_basebackup/streamutil.c +++ b/src/bin/pg_basebackup/streamutil.c @@ -398,7 +398,8 @@ RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, */ bool CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, - bool is_physical, bool slot_exists_ok) + bool is_temporary, bool is_physical, bool reserve_wal, + bool slot_exists_ok) { PQExpBuffer query; PGresult *res; @@ -410,13 +411,18 @@ CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin, Assert(slot_name != NULL); /* Build query */ + appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\"", slot_name); + if (is_temporary) + appendPQExpBuffer(query, " TEMPORARY"); if (is_physical) - appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" PHYSICAL", - slot_name); + { + appendPQExpBuffer(query, " PHYSICAL"); + if (reserve_wal) + appendPQExpBuffer(query, " RESERVE_WAL"); + } else { - appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"", - slot_name, plugin); + appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin); if (PQserverVersion(conn) >= 100000) /* pg_recvlogical doesn't use an exported snapshot, so suppress */ appendPQExpBuffer(query, " NOEXPORT_SNAPSHOT"); diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h index ec227712d5..908fd68c2b 100644 --- a/src/bin/pg_basebackup/streamutil.h +++ b/src/bin/pg_basebackup/streamutil.h @@ -33,8 +33,9 @@ extern PGconn *GetConnection(void); /* Replication commands */ extern bool CreateReplicationSlot(PGconn *conn, const char *slot_name, - const char *plugin, bool is_physical, - bool slot_exists_ok); + const char *plugin, bool is_temporary, + bool is_physical, bool reserve_wal, + bool slot_exists_ok); extern bool DropReplicationSlot(PGconn *conn, const char *slot_name); extern bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, diff --git a/src/bin/pg_basebackup/t/010_pg_basebackup.pl b/src/bin/pg_basebackup/t/010_pg_basebackup.pl index cce14b83e1..6a8be09f4c 100644 --- a/src/bin/pg_basebackup/t/010_pg_basebackup.pl +++ b/src/bin/pg_basebackup/t/010_pg_basebackup.pl @@ -4,7 +4,7 @@ use Cwd; use Config; use PostgresNode; use TestLib; -use Test::More tests => 72; +use Test::More tests => 78; program_help_ok('pg_basebackup'); program_version_ok('pg_basebackup'); @@ -259,9 +259,32 @@ $node->command_fails( [ 'pg_basebackup', '-D', "$tempdir/backupxs_sl_fail", '-X', 'stream', '-S', - 'slot1' ], + 'slot0' ], 'pg_basebackup fails with nonexistent replication slot'); +$node->command_fails( + [ 'pg_basebackup', '-D', "$tempdir/backupxs_slot", '-C' ], + 'pg_basebackup -C fails without slot name'); + +$node->command_fails( + [ 'pg_basebackup', '-D', "$tempdir/backupxs_slot", '-C', '-S', 'slot0', '--no-slot' ], + 'pg_basebackup fails with -C -S --no-slot'); + +$node->command_ok( + [ 'pg_basebackup', '-D', "$tempdir/backupxs_slot", '-C', '-S', 'slot0' ], + 'pg_basebackup -C runs'); + +is($node->safe_psql('postgres', q{SELECT slot_name FROM pg_replication_slots WHERE slot_name = 'slot0'}), + 'slot0', + 'replication slot was created'); +isnt($node->safe_psql('postgres', q{SELECT restart_lsn FROM pg_replication_slots WHERE slot_name = 'slot0'}), + '', + 'restart LSN of new slot is not null'); + +$node->command_fails( + [ 'pg_basebackup', '-D', "$tempdir/backupxs_slot1", '-C', '-S', 'slot0' ], + 'pg_basebackup fails with -C -S and a previously existing slot'); + $node->safe_psql('postgres', q{SELECT * FROM pg_create_physical_replication_slot('slot1')}); my $lsn = $node->safe_psql('postgres',