diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index ab5f3719fc..c5b2541319 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -227,17 +227,22 @@ textarray_to_stringlist(ArrayType *textarray) /* * Set the state of a subscription table. * + * If update_only is true and the record for given table doesn't exist, do + * nothing. This can be used to avoid inserting a new record that was deleted + * by someone else. Generally, subscription DDL commands should use false, + * workers should use true. + * * The insert-or-update logic in this function is not concurrency safe so it * might raise an error in rare circumstances. But if we took a stronger lock * such as ShareRowExclusiveLock, we would risk more deadlocks. */ Oid SetSubscriptionRelState(Oid subid, Oid relid, char state, - XLogRecPtr sublsn) + XLogRecPtr sublsn, bool update_only) { Relation rel; HeapTuple tup; - Oid subrelid; + Oid subrelid = InvalidOid; bool nulls[Natts_pg_subscription_rel]; Datum values[Natts_pg_subscription_rel]; @@ -252,7 +257,7 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state, * If the record for given table does not exist yet create new record, * otherwise update the existing one. */ - if (!HeapTupleIsValid(tup)) + if (!HeapTupleIsValid(tup) && !update_only) { /* Form the tuple. */ memset(values, 0, sizeof(values)); @@ -272,7 +277,7 @@ SetSubscriptionRelState(Oid subid, Oid relid, char state, heap_freetuple(tup); } - else + else if (HeapTupleIsValid(tup)) { bool replaces[Natts_pg_subscription_rel]; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index ad98b38efe..49737a9042 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -451,7 +451,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) rv->schemaname, rv->relname); SetSubscriptionRelState(subid, relid, table_state, - InvalidXLogRecPtr); + InvalidXLogRecPtr, false); } ereport(NOTICE, @@ -574,7 +574,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) { SetSubscriptionRelState(sub->oid, relid, copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, - InvalidXLogRecPtr); + InvalidXLogRecPtr, false); ereport(NOTICE, (errmsg("added subscription for table %s.%s", quote_identifier(rv->schemaname), diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 6fe39d2023..f57ae6ee2d 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -287,7 +287,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) SetSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, - MyLogicalRepWorker->relstate_lsn); + MyLogicalRepWorker->relstate_lsn, + true); walrcv_endstreaming(wrconn, &tli); finish_sync_worker(); @@ -414,7 +415,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } SetSubscriptionRelState(MyLogicalRepWorker->subid, rstate->relid, rstate->state, - rstate->lsn); + rstate->lsn, true); } } else @@ -845,7 +846,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) SetSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, - MyLogicalRepWorker->relstate_lsn); + MyLogicalRepWorker->relstate_lsn, + true); CommitTransactionCommand(); pgstat_report_stat(false); @@ -932,7 +934,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) SetSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, SUBREL_STATE_SYNCDONE, - *origin_startpos); + *origin_startpos, + true); finish_sync_worker(); } break; diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index 391f96b76e..f5f6191676 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -71,7 +71,7 @@ typedef struct SubscriptionRelState } SubscriptionRelState; extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state, - XLogRecPtr sublsn); + XLogRecPtr sublsn, bool update_only); extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn, bool missing_ok); extern void RemoveSubscriptionRel(Oid subid, Oid relid);