diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index f753c6e232..8c64d42dda 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -58,6 +58,7 @@ typedef struct ConnCacheEntry bool have_prep_stmt; /* have we prepared any stmts in this xact? */ bool have_error; /* have any subxacts aborted in this xact? */ bool changing_xact_state; /* xact state change in process */ + bool parallel_commit; /* do we commit (sub)xacts in parallel? */ bool invalidated; /* true if reconnect is pending */ bool keep_connections; /* setting value of keep_connections * server option */ @@ -92,6 +93,9 @@ static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user); static void disconnect_pg_server(ConnCacheEntry *entry); static void check_conn_params(const char **keywords, const char **values, UserMapping *user); static void configure_remote_session(PGconn *conn); +static void do_sql_command_begin(PGconn *conn, const char *sql); +static void do_sql_command_end(PGconn *conn, const char *sql, + bool consume_input); static void begin_remote_xact(ConnCacheEntry *entry); static void pgfdw_xact_callback(XactEvent event, void *arg); static void pgfdw_subxact_callback(SubXactEvent event, @@ -100,6 +104,7 @@ static void pgfdw_subxact_callback(SubXactEvent event, void *arg); static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue); static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry); +static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel); static bool pgfdw_cancel_query(PGconn *conn); static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors); @@ -107,6 +112,9 @@ static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result, bool *timed_out); static void pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql, bool toplevel); +static void pgfdw_finish_pre_commit_cleanup(List *pending_entries); +static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, + int curlevel); static bool UserMappingPasswordRequired(UserMapping *user); static bool disconnect_cached_connections(Oid serverid); @@ -316,14 +324,20 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user) * is changed will be closed and re-made later. * * By default, all the connections to any foreign servers are kept open. + * + * Also determine whether to commit (sub)transactions opened on the remote + * server in parallel at (sub)transaction end. */ entry->keep_connections = true; + entry->parallel_commit = false; foreach(lc, server->options) { DefElem *def = (DefElem *) lfirst(lc); if (strcmp(def->defname, "keep_connections") == 0) entry->keep_connections = defGetBoolean(def); + else if (strcmp(def->defname, "parallel_commit") == 0) + entry->parallel_commit = defGetBoolean(def); } /* Now try to make the connection */ @@ -622,10 +636,30 @@ configure_remote_session(PGconn *conn) */ void do_sql_command(PGconn *conn, const char *sql) +{ + do_sql_command_begin(conn, sql); + do_sql_command_end(conn, sql, false); +} + +static void +do_sql_command_begin(PGconn *conn, const char *sql) +{ + if (!PQsendQuery(conn, sql)) + pgfdw_report_error(ERROR, NULL, conn, false, sql); +} + +static void +do_sql_command_end(PGconn *conn, const char *sql, bool consume_input) { PGresult *res; - if (!PQsendQuery(conn, sql)) + /* + * If requested, consume whatever data is available from the socket. + * (Note that if all data is available, this allows pgfdw_get_result to + * call PQgetResult without forcing the overhead of WaitLatchOrSocket, + * which would be large compared to the overhead of PQconsumeInput.) + */ + if (consume_input && !PQconsumeInput(conn)) pgfdw_report_error(ERROR, NULL, conn, false, sql); res = pgfdw_get_result(conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) @@ -888,6 +922,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) { HASH_SEQ_STATUS scan; ConnCacheEntry *entry; + List *pending_entries = NIL; /* Quick exit if no connections were touched in this transaction. */ if (!xact_got_connection) @@ -925,6 +960,12 @@ pgfdw_xact_callback(XactEvent event, void *arg) /* Commit all remote transactions during pre-commit */ entry->changing_xact_state = true; + if (entry->parallel_commit) + { + do_sql_command_begin(entry->conn, "COMMIT TRANSACTION"); + pending_entries = lappend(pending_entries, entry); + continue; + } do_sql_command(entry->conn, "COMMIT TRANSACTION"); entry->changing_xact_state = false; @@ -981,23 +1022,15 @@ pgfdw_xact_callback(XactEvent event, void *arg) } /* Reset state to show we're out of a transaction */ - entry->xact_depth = 0; + pgfdw_reset_xact_state(entry, true); + } - /* - * If the connection isn't in a good idle state, it is marked as - * invalid or keep_connections option of its server is disabled, then - * discard it to recover. Next GetConnection will open a new - * connection. - */ - if (PQstatus(entry->conn) != CONNECTION_OK || - PQtransactionStatus(entry->conn) != PQTRANS_IDLE || - entry->changing_xact_state || - entry->invalidated || - !entry->keep_connections) - { - elog(DEBUG3, "discarding connection %p", entry->conn); - disconnect_pg_server(entry); - } + /* If there are any pending connections, finish cleaning them up */ + if (pending_entries) + { + Assert(event == XACT_EVENT_PARALLEL_PRE_COMMIT || + event == XACT_EVENT_PRE_COMMIT); + pgfdw_finish_pre_commit_cleanup(pending_entries); } /* @@ -1021,6 +1054,7 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, HASH_SEQ_STATUS scan; ConnCacheEntry *entry; int curlevel; + List *pending_entries = NIL; /* Nothing to do at subxact start, nor after commit. */ if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB || @@ -1063,6 +1097,12 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, /* Commit all remote subtransactions during pre-commit */ snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel); entry->changing_xact_state = true; + if (entry->parallel_commit) + { + do_sql_command_begin(entry->conn, sql); + pending_entries = lappend(pending_entries, entry); + continue; + } do_sql_command(entry->conn, sql); entry->changing_xact_state = false; } @@ -1076,7 +1116,14 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, } /* OK, we're outta that level of subtransaction */ - entry->xact_depth--; + pgfdw_reset_xact_state(entry, false); + } + + /* If there are any pending connections, finish cleaning them up */ + if (pending_entries) + { + Assert(event == SUBXACT_EVENT_PRE_COMMIT_SUB); + pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel); } } @@ -1169,6 +1216,40 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry) server->servername))); } +/* + * Reset state to show we're out of a (sub)transaction. + */ +static void +pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel) +{ + if (toplevel) + { + /* Reset state to show we're out of a transaction */ + entry->xact_depth = 0; + + /* + * If the connection isn't in a good idle state, it is marked as + * invalid or keep_connections option of its server is disabled, then + * discard it to recover. Next GetConnection will open a new + * connection. + */ + if (PQstatus(entry->conn) != CONNECTION_OK || + PQtransactionStatus(entry->conn) != PQTRANS_IDLE || + entry->changing_xact_state || + entry->invalidated || + !entry->keep_connections) + { + elog(DEBUG3, "discarding connection %p", entry->conn); + disconnect_pg_server(entry); + } + } + else + { + /* Reset state to show we're out of a subtransaction */ + entry->xact_depth--; + } +} + /* * Cancel the currently-in-progress query (whose query text we do not have) * and ignore the result. Returns true if we successfully cancel the query @@ -1456,6 +1537,112 @@ pgfdw_abort_cleanup(ConnCacheEntry *entry, const char *sql, bool toplevel) entry->changing_xact_state = false; } +/* + * Finish pre-commit cleanup of connections on each of which we've sent a + * COMMIT command to the remote server. + */ +static void +pgfdw_finish_pre_commit_cleanup(List *pending_entries) +{ + ConnCacheEntry *entry; + List *pending_deallocs = NIL; + ListCell *lc; + + Assert(pending_entries); + + /* + * Get the result of the COMMIT command for each of the pending entries + */ + foreach(lc, pending_entries) + { + entry = (ConnCacheEntry *) lfirst(lc); + + Assert(entry->changing_xact_state); + /* + * We might already have received the result on the socket, so pass + * consume_input=true to try to consume it first + */ + do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true); + entry->changing_xact_state = false; + + /* Do a DEALLOCATE ALL in parallel if needed */ + if (entry->have_prep_stmt && entry->have_error) + { + /* Ignore errors (see notes in pgfdw_xact_callback) */ + if (PQsendQuery(entry->conn, "DEALLOCATE ALL")) + { + pending_deallocs = lappend(pending_deallocs, entry); + continue; + } + } + entry->have_prep_stmt = false; + entry->have_error = false; + + pgfdw_reset_xact_state(entry, true); + } + + /* No further work if no pending entries */ + if (!pending_deallocs) + return; + + /* + * Get the result of the DEALLOCATE command for each of the pending + * entries + */ + foreach(lc, pending_deallocs) + { + PGresult *res; + + entry = (ConnCacheEntry *) lfirst(lc); + + /* Ignore errors (see notes in pgfdw_xact_callback) */ + while ((res = PQgetResult(entry->conn)) != NULL) + { + PQclear(res); + /* Stop if the connection is lost (else we'll loop infinitely) */ + if (PQstatus(entry->conn) == CONNECTION_BAD) + break; + } + entry->have_prep_stmt = false; + entry->have_error = false; + + pgfdw_reset_xact_state(entry, true); + } +} + +/* + * Finish pre-subcommit cleanup of connections on each of which we've sent a + * RELEASE command to the remote server. + */ +static void +pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel) +{ + ConnCacheEntry *entry; + char sql[100]; + ListCell *lc; + + Assert(pending_entries); + + /* + * Get the result of the RELEASE command for each of the pending entries + */ + snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel); + foreach(lc, pending_entries) + { + entry = (ConnCacheEntry *) lfirst(lc); + + Assert(entry->changing_xact_state); + /* + * We might already have received the result on the socket, so pass + * consume_input=true to try to consume it first + */ + do_sql_command_end(entry->conn, sql, true); + entry->changing_xact_state = false; + + pgfdw_reset_xact_state(entry, false); + } +} + /* * List active foreign server connections. * diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 057342083c..f210f91188 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -9509,7 +9509,7 @@ DO $d$ END; $d$; ERROR: invalid option "password" -HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, sslsni, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, truncatable, fetch_size, batch_size, async_capable, keep_connections +HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, sslsni, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, truncatable, fetch_size, batch_size, async_capable, parallel_commit, keep_connections CONTEXT: SQL statement "ALTER SERVER loopback_nopw OPTIONS (ADD password 'dummypw')" PL/pgSQL function inline_code_block line 3 at EXECUTE -- If we add a password for our user mapping instead, we should get a different @@ -10933,3 +10933,79 @@ SELECT pg_terminate_backend(pid, 180000) FROM pg_stat_activity --Clean up RESET postgres_fdw.application_name; RESET debug_discard_caches; +-- =================================================================== +-- test parallel commit +-- =================================================================== +ALTER SERVER loopback OPTIONS (ADD parallel_commit 'true'); +ALTER SERVER loopback2 OPTIONS (ADD parallel_commit 'true'); +CREATE TABLE ploc1 (f1 int, f2 text); +CREATE FOREIGN TABLE prem1 (f1 int, f2 text) + SERVER loopback OPTIONS (table_name 'ploc1'); +CREATE TABLE ploc2 (f1 int, f2 text); +CREATE FOREIGN TABLE prem2 (f1 int, f2 text) + SERVER loopback2 OPTIONS (table_name 'ploc2'); +BEGIN; +INSERT INTO prem1 VALUES (101, 'foo'); +INSERT INTO prem2 VALUES (201, 'bar'); +COMMIT; +SELECT * FROM prem1; + f1 | f2 +-----+----- + 101 | foo +(1 row) + +SELECT * FROM prem2; + f1 | f2 +-----+----- + 201 | bar +(1 row) + +BEGIN; +SAVEPOINT s; +INSERT INTO prem1 VALUES (102, 'foofoo'); +INSERT INTO prem2 VALUES (202, 'barbar'); +RELEASE SAVEPOINT s; +COMMIT; +SELECT * FROM prem1; + f1 | f2 +-----+-------- + 101 | foo + 102 | foofoo +(2 rows) + +SELECT * FROM prem2; + f1 | f2 +-----+-------- + 201 | bar + 202 | barbar +(2 rows) + +-- This tests executing DEALLOCATE ALL against foreign servers in parallel +-- during pre-commit +BEGIN; +SAVEPOINT s; +INSERT INTO prem1 VALUES (103, 'baz'); +INSERT INTO prem2 VALUES (203, 'qux'); +ROLLBACK TO SAVEPOINT s; +RELEASE SAVEPOINT s; +INSERT INTO prem1 VALUES (104, 'bazbaz'); +INSERT INTO prem2 VALUES (204, 'quxqux'); +COMMIT; +SELECT * FROM prem1; + f1 | f2 +-----+-------- + 101 | foo + 102 | foofoo + 104 | bazbaz +(3 rows) + +SELECT * FROM prem2; + f1 | f2 +-----+-------- + 201 | bar + 202 | barbar + 204 | quxqux +(3 rows) + +ALTER SERVER loopback OPTIONS (DROP parallel_commit); +ALTER SERVER loopback2 OPTIONS (DROP parallel_commit); diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c index 2c6b2894b9..572591a558 100644 --- a/contrib/postgres_fdw/option.c +++ b/contrib/postgres_fdw/option.c @@ -121,6 +121,7 @@ postgres_fdw_validator(PG_FUNCTION_ARGS) strcmp(def->defname, "updatable") == 0 || strcmp(def->defname, "truncatable") == 0 || strcmp(def->defname, "async_capable") == 0 || + strcmp(def->defname, "parallel_commit") == 0 || strcmp(def->defname, "keep_connections") == 0) { /* these accept only boolean values */ @@ -249,6 +250,7 @@ InitPgFdwOptions(void) /* async_capable is available on both server and table */ {"async_capable", ForeignServerRelationId, false}, {"async_capable", ForeignTableRelationId, false}, + {"parallel_commit", ForeignServerRelationId, false}, {"keep_connections", ForeignServerRelationId, false}, {"password_required", UserMappingRelationId, false}, diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 6c9f579c41..95b6b7192e 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -3515,3 +3515,49 @@ SELECT pg_terminate_backend(pid, 180000) FROM pg_stat_activity --Clean up RESET postgres_fdw.application_name; RESET debug_discard_caches; + +-- =================================================================== +-- test parallel commit +-- =================================================================== +ALTER SERVER loopback OPTIONS (ADD parallel_commit 'true'); +ALTER SERVER loopback2 OPTIONS (ADD parallel_commit 'true'); + +CREATE TABLE ploc1 (f1 int, f2 text); +CREATE FOREIGN TABLE prem1 (f1 int, f2 text) + SERVER loopback OPTIONS (table_name 'ploc1'); +CREATE TABLE ploc2 (f1 int, f2 text); +CREATE FOREIGN TABLE prem2 (f1 int, f2 text) + SERVER loopback2 OPTIONS (table_name 'ploc2'); + +BEGIN; +INSERT INTO prem1 VALUES (101, 'foo'); +INSERT INTO prem2 VALUES (201, 'bar'); +COMMIT; +SELECT * FROM prem1; +SELECT * FROM prem2; + +BEGIN; +SAVEPOINT s; +INSERT INTO prem1 VALUES (102, 'foofoo'); +INSERT INTO prem2 VALUES (202, 'barbar'); +RELEASE SAVEPOINT s; +COMMIT; +SELECT * FROM prem1; +SELECT * FROM prem2; + +-- This tests executing DEALLOCATE ALL against foreign servers in parallel +-- during pre-commit +BEGIN; +SAVEPOINT s; +INSERT INTO prem1 VALUES (103, 'baz'); +INSERT INTO prem2 VALUES (203, 'qux'); +ROLLBACK TO SAVEPOINT s; +RELEASE SAVEPOINT s; +INSERT INTO prem1 VALUES (104, 'bazbaz'); +INSERT INTO prem2 VALUES (204, 'quxqux'); +COMMIT; +SELECT * FROM prem1; +SELECT * FROM prem2; + +ALTER SERVER loopback OPTIONS (DROP parallel_commit); +ALTER SERVER loopback2 OPTIONS (DROP parallel_commit); diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml index dc57fe4b0d..8ebf0dc3a0 100644 --- a/doc/src/sgml/postgres-fdw.sgml +++ b/doc/src/sgml/postgres-fdw.sgml @@ -456,6 +456,52 @@ OPTIONS (ADD password_required 'false'); + + Transaction Management Options + + + When multiple remote (sub)transactions are involved in a local + (sub)transaction, by default postgres_fdw commits + those remote (sub)transactions one by one when the local (sub)transaction + commits. + Performance can be improved with the following option: + + + + + + parallel_commit (boolean) + + + This option controls whether postgres_fdw commits + remote (sub)transactions opened on a foreign server in a local + (sub)transaction in parallel when the local (sub)transaction commits. + This option can only be specified for foreign servers, not per-table. + The default is false. + + + + If multiple foreign servers with this option enabled are involved in + a local (sub)transaction, multiple remote (sub)transactions opened on + those foreign servers in the local (sub)transaction are committed in + parallel across those foreign servers when the local (sub)transaction + commits. + + + + For a foreign server with this option enabled, if many remote + (sub)transactions are opened on the foreign server in a local + (sub)transaction, this option might increase the remote server’s load + when the local (sub)transaction commits, so be careful when using this + option. + + + + + + + + Updatability Options