diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 5b4bd71694..505e38376c 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -131,6 +131,9 @@ static const struct
 	},
 	{
 		"ParallelApplyWorkerMain", ParallelApplyWorkerMain
+	},
+	{
+		"TablesyncWorkerMain", TablesyncWorkerMain
 	}
 };
 
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 6fb96148f4..1d4e83c4c1 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -942,7 +942,7 @@ ParallelApplyWorkerMain(Datum main_arg)
 	MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
 		MyLogicalRepWorker->reply_time = 0;
 
-	InitializeApplyWorker();
+	InitializeLogRepWorker();
 
 	InitializingApplyWorker = false;
 
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 542af7d863..e231fa7f95 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -459,24 +459,30 @@ retry:
 	snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
 
 	if (is_parallel_apply_worker)
+	{
 		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
-	else
-		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
-
-	if (OidIsValid(relid))
 		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication worker for subscription %u sync %u", subid, relid);
-	else if (is_parallel_apply_worker)
-		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication parallel apply worker for subscription %u", subid);
-	else
-		snprintf(bgw.bgw_name, BGW_MAXLEN,
-				 "logical replication apply worker for subscription %u", subid);
-
-	if (is_parallel_apply_worker)
+				 "logical replication parallel apply worker for subscription %u",
+				 subid);
 		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
+	}
+	else if (OidIsValid(relid))
+	{
+		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
+		snprintf(bgw.bgw_name, BGW_MAXLEN,
+				 "logical replication tablesync worker for subscription %u sync %u",
+				 subid,
+				 relid);
+		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication tablesync worker");
+	}
 	else
-		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
+	{
+		snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
+		snprintf(bgw.bgw_name, BGW_MAXLEN,
+				 "logical replication apply worker for subscription %u",
+				 subid);
+		snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication apply worker");
+	}
 
 	bgw.bgw_restart_time = BGW_NEVER_RESTART;
 	bgw.bgw_notify_pid = MyProcPid;
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 6d461654ab..651a775065 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -106,6 +106,7 @@
 #include "pgstat.h"
 #include "replication/logicallauncher.h"
 #include "replication/logicalrelation.h"
+#include "replication/logicalworker.h"
 #include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
 #include "replication/slot.h"
@@ -1241,7 +1242,7 @@ ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
  *
  * The returned slot name is palloc'ed in current memory context.
  */
-char *
+static char *
 LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 {
 	char	   *slotname;
@@ -1584,6 +1585,94 @@ FetchTableStates(bool *started_tx)
 	return has_subrels;
 }
 
+/*
+ * Execute the initial sync with error handling. Disable the subscription,
+ * if it's required.
+ *
+ * Allocate the slot name in long-lived context on return. Note that we don't
+ * handle FATAL errors which are probably because of system resource error and
+ * are not repeatable.
+ */
+static void
+start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
+{
+	char	   *sync_slotname = NULL;
+
+	Assert(am_tablesync_worker());
+
+	PG_TRY();
+	{
+		/* Call initial sync. */
+		sync_slotname = LogicalRepSyncTableStart(origin_startpos);
+	}
+	PG_CATCH();
+	{
+		if (MySubscription->disableonerr)
+			DisableSubscriptionAndExit();
+		else
+		{
+			/*
+			 * Report the worker failed during table synchronization. Abort
+			 * the current transaction so that the stats message is sent in an
+			 * idle state.
+			 */
+			AbortOutOfAnyTransaction();
+			pgstat_report_subscription_error(MySubscription->oid, false);
+
+			PG_RE_THROW();
+		}
+	}
+	PG_END_TRY();
+
+	/* allocate slot name in long-lived context */
+	*slotname = MemoryContextStrdup(ApplyContext, sync_slotname);
+	pfree(sync_slotname);
+}
+
+/*
+ * Runs the tablesync worker.
+ *
+ * It starts syncing tables. After a successful sync, sets streaming options
+ * and starts streaming to catch up with the apply worker.
+ */
+static void
+run_tablesync_worker(void)
+{
+	char		originname[NAMEDATALEN];
+	XLogRecPtr	origin_startpos = InvalidXLogRecPtr;
+	char	   *slotname = NULL;
+	WalRcvStreamOptions options;
+
+	start_table_sync(&origin_startpos, &slotname);
+
+	ReplicationOriginNameForLogicalRep(MySubscription->oid,
+									   MyLogicalRepWorker->relid,
+									   originname,
+									   sizeof(originname));
+
+	set_apply_error_context_origin(originname);
+
+	set_stream_options(&options, slotname, &origin_startpos);
+
+	walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
+
+	/* Apply the changes till we catch up with the apply worker. */
+	start_apply(origin_startpos);
+}
+
+/* Logical Replication Tablesync worker entry point */
+void
+TablesyncWorkerMain(Datum main_arg)
+{
+	int			worker_slot = DatumGetInt32(main_arg);
+
+	SetupApplyOrSyncWorker(worker_slot);
+
+	run_tablesync_worker();
+
+	finish_sync_worker();
+}
+
 /*
  * If the subscription has no tables then return false.
  *
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 832b1cf764..a9f5fa7dfc 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -396,8 +396,6 @@ static void stream_close_file(void);
 
 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
 
-static void DisableSubscriptionAndExit(void);
-
 static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
 static void apply_handle_insert_internal(ApplyExecutionData *edata,
 										 ResultRelInfo *relinfo,
@@ -4327,6 +4325,57 @@ stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
 	stream_stop_internal(xid);
 }
 
+/*
+ * Sets streaming options including replication slot name and origin start
+ * position. Workers need these options for logical replication.
+ */
+void
+set_stream_options(WalRcvStreamOptions *options,
+				   char *slotname,
+				   XLogRecPtr *origin_startpos)
+{
+	int			server_version;
+
+	options->logical = true;
+	options->startpoint = *origin_startpos;
+	options->slotname = slotname;
+
+	server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
+	options->proto.logical.proto_version =
+		server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
+		server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
+		server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
+		LOGICALREP_PROTO_VERSION_NUM;
+
+	options->proto.logical.publication_names = MySubscription->publications;
+	options->proto.logical.binary = MySubscription->binary;
+
+	/*
+	 * Assign the appropriate option value for streaming option according to
+	 * the 'streaming' mode and the publisher's ability to support that mode.
+	 */
+	if (server_version >= 160000 &&
+		MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
+	{
+		options->proto.logical.streaming_str = "parallel";
+		MyLogicalRepWorker->parallel_apply = true;
+	}
+	else if (server_version >= 140000 &&
+			 MySubscription->stream != LOGICALREP_STREAM_OFF)
+	{
+		options->proto.logical.streaming_str = "on";
+		MyLogicalRepWorker->parallel_apply = false;
+	}
+	else
+	{
+		options->proto.logical.streaming_str = NULL;
+		MyLogicalRepWorker->parallel_apply = false;
+	}
+
+	options->proto.logical.twophase = false;
+	options->proto.logical.origin = pstrdup(MySubscription->origin);
+}
+
 /*
  * Cleanup the memory for subxacts and reset the related variables.
  */
@@ -4361,57 +4410,13 @@ TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
 }
 
 /*
- * Execute the initial sync with error handling. Disable the subscription,
- * if it's required.
- *
- * Allocate the slot name in long-lived context on return. Note that we don't
- * handle FATAL errors which are probably because of system resource error and
- * are not repeatable.
- */
-static void
-start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
-{
-	char	   *syncslotname = NULL;
-
-	Assert(am_tablesync_worker());
-
-	PG_TRY();
-	{
-		/* Call initial sync. */
-		syncslotname = LogicalRepSyncTableStart(origin_startpos);
-	}
-	PG_CATCH();
-	{
-		if (MySubscription->disableonerr)
-			DisableSubscriptionAndExit();
-		else
-		{
-			/*
-			 * Report the worker failed during table synchronization. Abort
-			 * the current transaction so that the stats message is sent in an
-			 * idle state.
-			 */
-			AbortOutOfAnyTransaction();
-			pgstat_report_subscription_error(MySubscription->oid, false);
-
-			PG_RE_THROW();
-		}
-	}
-	PG_END_TRY();
-
-	/* allocate slot name in long-lived context */
-	*myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
-	pfree(syncslotname);
-}
-
-/*
- * Run the apply loop with error handling. Disable the subscription,
- * if necessary.
+ * Common function to run the apply loop with error handling. Disable the
+ * subscription, if necessary.
  *
  * Note that we don't handle FATAL errors which are probably because
  * of system resource error and are not repeatable.
  */
-static void
+void
 start_apply(XLogRecPtr origin_startpos)
 {
 	PG_TRY();
@@ -4439,13 +4444,118 @@ start_apply(XLogRecPtr origin_startpos)
 }
 
 /*
- * Common initialization for leader apply worker and parallel apply worker.
+ * Runs the leader apply worker.
+ *
+ * It sets up replication origin, streaming options and then starts streaming.
+ */
+static void
+run_apply_worker(void)
+{
+	char		originname[NAMEDATALEN];
+	XLogRecPtr	origin_startpos = InvalidXLogRecPtr;
+	char	   *slotname = NULL;
+	WalRcvStreamOptions options;
+	RepOriginId originid;
+	TimeLineID	startpointTLI;
+	char	   *err;
+	bool		must_use_password;
+
+	slotname = MySubscription->slotname;
+
+	/*
+	 * This shouldn't happen if the subscription is enabled, but guard against
+	 * DDL bugs or manual catalog changes. (libpqwalreceiver will crash if
+	 * slot is NULL.)
+	 */
+	if (!slotname)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("subscription has no replication slot set")));
+
+	/* Setup replication origin tracking. */
+	StartTransactionCommand();
+	ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
+									   originname, sizeof(originname));
+	originid = replorigin_by_name(originname, true);
+	if (!OidIsValid(originid))
+		originid = replorigin_create(originname);
+	replorigin_session_setup(originid, 0);
+	replorigin_session_origin = originid;
+	origin_startpos = replorigin_session_get_progress(false);
+
+	/* Is the use of a password mandatory? */
+	must_use_password = MySubscription->passwordrequired &&
+		!superuser_arg(MySubscription->owner);
+
+	/* Note that the superuser_arg call can access the DB */
+	CommitTransactionCommand();
+
+	LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
+											must_use_password,
+											MySubscription->name, &err);
+
+	if (LogRepWorkerWalRcvConn == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("could not connect to the publisher: %s", err)));
+
+	/*
+	 * We don't really use the output identify_system for anything but it does
+	 * some initializations on the upstream so let's still call it.
+	 */
+	(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
+
+	set_apply_error_context_origin(originname);
+
+	set_stream_options(&options, slotname, &origin_startpos);
+
+	/*
+	 * Even when the two_phase mode is requested by the user, it remains as
+	 * the tri-state PENDING until all tablesyncs have reached READY state.
+	 * Only then, can it become ENABLED.
+	 *
+	 * Note: If the subscription has no tables then leave the state as
+	 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
+	 * work.
+	 */
+	if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
+		AllTablesyncsReady())
+	{
+		/* Start streaming with two_phase enabled */
+		options.proto.logical.twophase = true;
+		walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
+
+		StartTransactionCommand();
+		UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
+		MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
+		CommitTransactionCommand();
+	}
+	else
+	{
+		walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
+	}
+
+	ereport(DEBUG1,
+			(errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
+							 MySubscription->name,
+							 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
+							 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
+							 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
+							 "?")));
+
+	/* Run the main loop. */
+	start_apply(origin_startpos);
+}
+
+/*
+ * Common initialization for leader apply worker, parallel apply worker and
+ * tablesync worker.
  *
  * Initialize the database connection, in-memory subscription and necessary
  * config options.
  */
 void
-InitializeApplyWorker(void)
+InitializeLogRepWorker(void)
 {
 	MemoryContext oldctx;
 
@@ -4518,22 +4628,15 @@ InitializeApplyWorker(void)
 	CommitTransactionCommand();
 }
 
-/* Logical Replication Apply worker entry point */
+/* Common function to setup the leader apply or tablesync worker. */
 void
-ApplyWorkerMain(Datum main_arg)
+SetupApplyOrSyncWorker(int worker_slot)
 {
-	int			worker_slot = DatumGetInt32(main_arg);
-	char		originname[NAMEDATALEN];
-	XLogRecPtr	origin_startpos = InvalidXLogRecPtr;
-	char	   *myslotname = NULL;
-	WalRcvStreamOptions options;
-	int			server_version;
-
-	InitializingApplyWorker = true;
-
 	/* Attach to slot */
 	logicalrep_worker_attach(worker_slot);
 
+	Assert(am_tablesync_worker() || am_leader_apply_worker());
+
 	/* Setup signal handling */
 	pqsignal(SIGHUP, SignalHandlerForConfigReload);
 	pqsignal(SIGTERM, die);
@@ -4551,79 +4654,12 @@ ApplyWorkerMain(Datum main_arg)
 	/* Load the libpq-specific functions */
 	load_file("libpqwalreceiver", false);
 
-	InitializeApplyWorker();
-
-	InitializingApplyWorker = false;
+	InitializeLogRepWorker();
 
 	/* Connect to the origin and start the replication. */
 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
 
-	if (am_tablesync_worker())
-	{
-		start_table_sync(&origin_startpos, &myslotname);
-
-		ReplicationOriginNameForLogicalRep(MySubscription->oid,
-										   MyLogicalRepWorker->relid,
-										   originname,
-										   sizeof(originname));
-		set_apply_error_context_origin(originname);
-	}
-	else
-	{
-		/* This is the leader apply worker */
-		RepOriginId originid;
-		TimeLineID	startpointTLI;
-		char	   *err;
-		bool		must_use_password;
-
-		myslotname = MySubscription->slotname;
-
-		/*
-		 * This shouldn't happen if the subscription is enabled, but guard
-		 * against DDL bugs or manual catalog changes. (libpqwalreceiver will
-		 * crash if slot is NULL.)
-		 */
-		if (!myslotname)
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("subscription has no replication slot set")));
-
-		/* Setup replication origin tracking. */
-		StartTransactionCommand();
-		ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
-										   originname, sizeof(originname));
-		originid = replorigin_by_name(originname, true);
-		if (!OidIsValid(originid))
-			originid = replorigin_create(originname);
-		replorigin_session_setup(originid, 0);
-		replorigin_session_origin = originid;
-		origin_startpos = replorigin_session_get_progress(false);
-
-		/* Is the use of a password mandatory? */
-		must_use_password = MySubscription->passwordrequired &&
-			!superuser_arg(MySubscription->owner);
-
-		/* Note that the superuser_arg call can access the DB */
-		CommitTransactionCommand();
-
-		LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
-												must_use_password,
-												MySubscription->name, &err);
-		if (LogRepWorkerWalRcvConn == NULL)
-			ereport(ERROR,
-					(errcode(ERRCODE_CONNECTION_FAILURE),
-					 errmsg("could not connect to the publisher: %s", err)));
-
-		/*
-		 * We don't really use the output identify_system for anything but it
-		 * does some initializations on the upstream so let's still call it.
-		 */
-		(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
-
-		set_apply_error_context_origin(originname);
-	}
-
 	/*
 	 * Setup callback for syscache so that we know when something changes in
 	 * the subscription relation state.
@@ -4631,91 +4667,21 @@ ApplyWorkerMain(Datum main_arg)
 	CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
 								  invalidate_syncing_table_states,
 								  (Datum) 0);
+}
 
-	/* Build logical replication streaming options. */
-	options.logical = true;
-	options.startpoint = origin_startpos;
-	options.slotname = myslotname;
+/* Logical Replication Apply worker entry point */
+void
+ApplyWorkerMain(Datum main_arg)
+{
+	int			worker_slot = DatumGetInt32(main_arg);
 
-	server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
-	options.proto.logical.proto_version =
-		server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
-		server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
-		server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
-		LOGICALREP_PROTO_VERSION_NUM;
+	InitializingApplyWorker = true;
 
-	options.proto.logical.publication_names = MySubscription->publications;
-	options.proto.logical.binary = MySubscription->binary;
+	SetupApplyOrSyncWorker(worker_slot);
 
-	/*
-	 * Assign the appropriate option value for streaming option according to
-	 * the 'streaming' mode and the publisher's ability to support that mode.
-	 */
-	if (server_version >= 160000 &&
-		MySubscription->stream == LOGICALREP_STREAM_PARALLEL)
-	{
-		options.proto.logical.streaming_str = "parallel";
-		MyLogicalRepWorker->parallel_apply = true;
-	}
-	else if (server_version >= 140000 &&
-			 MySubscription->stream != LOGICALREP_STREAM_OFF)
-	{
-		options.proto.logical.streaming_str = "on";
-		MyLogicalRepWorker->parallel_apply = false;
-	}
-	else
-	{
-		options.proto.logical.streaming_str = NULL;
-		MyLogicalRepWorker->parallel_apply = false;
-	}
+	InitializingApplyWorker = false;
 
-	options.proto.logical.twophase = false;
-	options.proto.logical.origin = pstrdup(MySubscription->origin);
-
-	if (!am_tablesync_worker())
-	{
-		/*
-		 * Even when the two_phase mode is requested by the user, it remains
-		 * as the tri-state PENDING until all tablesyncs have reached READY
-		 * state. Only then, can it become ENABLED.
-		 *
-		 * Note: If the subscription has no tables then leave the state as
-		 * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to
-		 * work.
-		 */
-		if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
-			AllTablesyncsReady())
-		{
-			/* Start streaming with two_phase enabled */
-			options.proto.logical.twophase = true;
-			walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
-
-			StartTransactionCommand();
-			UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED);
-			MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED;
-			CommitTransactionCommand();
-		}
-		else
-		{
-			walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
-		}
-
-		ereport(DEBUG1,
-				(errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s",
-								 MySubscription->name,
-								 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" :
-								 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" :
-								 MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" :
-								 "?")));
-	}
-	else
-	{
-		/* Start normal logical streaming replication. */
-		walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
-	}
-
-	/* Run the main loop. */
-	start_apply(origin_startpos);
+	run_apply_worker();
 
 	proc_exit(0);
 }
@@ -4724,7 +4690,7 @@ ApplyWorkerMain(Datum main_arg)
  * After error recovery, disable the subscription in a new transaction
  * and exit cleanly.
  */
-static void
+void
 DisableSubscriptionAndExit(void)
 {
 	/*
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 39588da79f..bbd71d0b42 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -18,6 +18,7 @@ extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending;
 
 extern void ApplyWorkerMain(Datum main_arg);
 extern void ParallelApplyWorkerMain(Datum main_arg);
+extern void TablesyncWorkerMain(Datum main_arg);
 
 extern bool IsLogicalWorker(void);
 extern bool IsLogicalParallelApplyWorker(void);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 343e781896..672a7117c0 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -19,6 +19,7 @@
 #include "datatype/timestamp.h"
 #include "miscadmin.h"
 #include "replication/logicalrelation.h"
+#include "replication/walreceiver.h"
 #include "storage/buffile.h"
 #include "storage/fileset.h"
 #include "storage/lock.h"
@@ -243,7 +244,6 @@ extern int	logicalrep_sync_worker_count(Oid subid);
 
 extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
 											   char *originname, Size szoriginname);
-extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
 
 extern bool AllTablesyncsReady(void);
 extern void UpdateTwoPhaseState(Oid suboid, char new_state);
@@ -265,7 +265,17 @@ extern void maybe_reread_subscription(void);
 
 extern void stream_cleanup_files(Oid subid, TransactionId xid);
 
-extern void InitializeApplyWorker(void);
+extern void set_stream_options(WalRcvStreamOptions *options,
+							   char *slotname,
+							   XLogRecPtr *origin_startpos);
+
+extern void start_apply(XLogRecPtr origin_startpos);
+
+extern void InitializeLogRepWorker(void);
+
+extern void SetupApplyOrSyncWorker(int worker_slot);
+
+extern void DisableSubscriptionAndExit(void);
 
 extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn);
 