postgres_fdw: Add connection status check to postgres_fdw_get_connections().

This commit extends the postgres_fdw_get_connections() function
to check if connections are closed. This is useful for detecting closed
postgres_fdw connections that could prevent successful transaction
commits. Users can roll back transactions immediately upon detecting
closed connections, avoiding unnecessary processing of failed
transactions.

This feature is available only on systems supporting the non-standard
POLLRDHUP extension to the poll system call, including Linux.

Author: Hayato Kuroda
Reviewed-by: Shinya Kato, Zhihong Yu, Kyotaro Horiguchi, Andres Freund
Reviewed-by: Onder Kalaci, Takamichi Osumi, Vignesh C, Tom Lane, Ted Yu
Reviewed-by: Katsuragi Yuta, Peter Smith, Shubham Khanna, Fujii Masao
Discussion: https://postgr.es/m/TYAPR01MB58662809E678253B90E82CE5F5889@TYAPR01MB5866.jpnprd01.prod.outlook.com
This commit is contained in:
Fujii Masao 2024-07-26 22:16:39 +09:00
parent c297a47c5f
commit 857df3cef7
5 changed files with 199 additions and 19 deletions

View File

@ -12,6 +12,10 @@
*/
#include "postgres.h"
#if HAVE_POLL_H
#include <poll.h>
#endif
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/pg_user_mapping.h"
@ -171,6 +175,8 @@ static bool UserMappingPasswordRequired(UserMapping *user);
static bool disconnect_cached_connections(Oid serverid);
static void postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
enum pgfdwVersion api_version);
static int pgfdw_conn_check(PGconn *conn);
static bool pgfdw_conn_checkable(void);
/*
* Get a PGconn which can be used to execute queries on the remote PostgreSQL
@ -1991,14 +1997,14 @@ pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
/* Number of output arguments (columns) for various API versions */
#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1 2
#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2 3
#define POSTGRES_FDW_GET_CONNECTIONS_COLS 3 /* maximum of above */
#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2 4
#define POSTGRES_FDW_GET_CONNECTIONS_COLS 4 /* maximum of above */
/*
* Internal function used by postgres_fdw_get_connections variants.
*
* For API version 1.1, this function returns a set of records with
* the following values:
* For API version 1.1, this function takes no input parameter and
* returns a set of records with the following values:
*
* - server_name - server name of active connection. In case the foreign server
* is dropped but still the connection is active, then the server name will
@ -2006,10 +2012,12 @@ pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
* - valid - true/false representing whether the connection is valid or not.
* Note that connections can become invalid in pgfdw_inval_callback.
*
* For API version 1.2 and later, this function returns the following
* additional value along with the two values from version 1.1:
* For API version 1.2 and later, this function takes an input parameter
* to check a connection status and returns the following
* additional values along with the two values from version 1.1:
*
* - used_in_xact - true if the connection is used in the current transaction.
* - closed: true if the connection is closed.
*
* No records are returned when there are no cached connections at all.
*/
@ -2101,8 +2109,19 @@ postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
if (api_version >= PGFDW_V1_2)
{
bool check_conn = PG_GETARG_BOOL(0);
/* Is this connection used in the current transaction? */
values[2] = BoolGetDatum(entry->xact_depth > 0);
/*
* If a connection status check is requested and supported, return
* whether the connection is closed. Otherwise, return NULL.
*/
if (check_conn && pgfdw_conn_checkable())
values[3] = BoolGetDatum(pgfdw_conn_check(entry->conn) != 0);
else
nulls[3] = true;
}
tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
@ -2258,3 +2277,56 @@ disconnect_cached_connections(Oid serverid)
return result;
}
/*
* Check if the remote server closed the connection.
*
* Returns 1 if the connection is closed, -1 if an error occurred,
* and 0 if it's not closed or if the connection check is unavailable
* on this platform.
*/
static int
pgfdw_conn_check(PGconn *conn)
{
int sock = PQsocket(conn);
if (PQstatus(conn) != CONNECTION_OK || sock == -1)
return -1;
#if (defined(HAVE_POLL) && defined(POLLRDHUP))
{
struct pollfd input_fd;
int result;
input_fd.fd = sock;
input_fd.events = POLLRDHUP;
input_fd.revents = 0;
do
result = poll(&input_fd, 1, 0);
while (result < 0 && errno == EINTR);
if (result < 0)
return -1;
return (input_fd.revents & POLLRDHUP) ? 1 : 0;
}
#else
return 0;
#endif
}
/*
* Check if connection status checking is available on this platform.
*
* Returns true if available, false otherwise.
*/
static bool
pgfdw_conn_checkable(void)
{
#if (defined(HAVE_POLL) && defined(POLLRDHUP))
return true;
#else
return false;
#endif
}

View File

@ -10464,10 +10464,10 @@ drop cascades to foreign table ft7
-- should be output as invalid connections. Also the server name for
-- loopback3 should be NULL because the server was dropped.
SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
server_name | valid | used_in_xact
-------------+-------+--------------
loopback | f | t
| f | t
server_name | valid | used_in_xact | closed
-------------+-------+--------------+--------
loopback | f | t |
| f | t |
(2 rows)
-- The invalid connections get closed in pgfdw_xact_callback during commit.
@ -12286,3 +12286,49 @@ ANALYZE analyze_table;
-- cleanup
DROP FOREIGN TABLE analyze_ftable;
DROP TABLE analyze_table;
-- ===================================================================
-- test for postgres_fdw_get_connections function with check_conn = true
-- ===================================================================
-- Disable debug_discard_caches in order to manage remote connections
SET debug_discard_caches TO '0';
-- The text of the error might vary across platforms, so only show SQLSTATE.
\set VERBOSITY sqlstate
SELECT 1 FROM postgres_fdw_disconnect_all();
?column?
----------
1
(1 row)
ALTER SERVER loopback OPTIONS (SET application_name 'fdw_conn_check');
SELECT 1 FROM ft1 LIMIT 1;
?column?
----------
1
(1 row)
-- Since the remote server is still connected, "closed" should be FALSE,
-- or NULL if the connection status check is not available.
SELECT CASE WHEN closed IS NOT true THEN 1 ELSE 0 END
FROM postgres_fdw_get_connections(true);
case
------
1
(1 row)
-- After terminating the remote backend, since the connection is closed,
-- "closed" should be TRUE, or NULL if the connection status check
-- is not available.
DO $$ BEGIN
PERFORM pg_terminate_backend(pid, 180000) FROM pg_stat_activity
WHERE application_name = 'fdw_conn_check';
END $$;
SELECT CASE WHEN closed IS NOT false THEN 1 ELSE 0 END
FROM postgres_fdw_get_connections(true);
case
------
1
(1 row)
-- Clean up
\set VERBOSITY default
RESET debug_discard_caches;

View File

@ -9,8 +9,9 @@ ALTER EXTENSION postgres_fdw DROP FUNCTION postgres_fdw_get_connections ();
/* Then we can drop it */
DROP FUNCTION postgres_fdw_get_connections ();
CREATE FUNCTION postgres_fdw_get_connections (OUT server_name text,
OUT valid boolean, OUT used_in_xact boolean)
CREATE FUNCTION postgres_fdw_get_connections (
IN check_conn boolean DEFAULT false, OUT server_name text,
OUT valid boolean, OUT used_in_xact boolean, OUT closed boolean)
RETURNS SETOF record
AS 'MODULE_PATHNAME', 'postgres_fdw_get_connections_1_2'
LANGUAGE C STRICT PARALLEL RESTRICTED;

View File

@ -4235,3 +4235,36 @@ ANALYZE analyze_table;
-- cleanup
DROP FOREIGN TABLE analyze_ftable;
DROP TABLE analyze_table;
-- ===================================================================
-- test for postgres_fdw_get_connections function with check_conn = true
-- ===================================================================
-- Disable debug_discard_caches in order to manage remote connections
SET debug_discard_caches TO '0';
-- The text of the error might vary across platforms, so only show SQLSTATE.
\set VERBOSITY sqlstate
SELECT 1 FROM postgres_fdw_disconnect_all();
ALTER SERVER loopback OPTIONS (SET application_name 'fdw_conn_check');
SELECT 1 FROM ft1 LIMIT 1;
-- Since the remote server is still connected, "closed" should be FALSE,
-- or NULL if the connection status check is not available.
SELECT CASE WHEN closed IS NOT true THEN 1 ELSE 0 END
FROM postgres_fdw_get_connections(true);
-- After terminating the remote backend, since the connection is closed,
-- "closed" should be TRUE, or NULL if the connection status check
-- is not available.
DO $$ BEGIN
PERFORM pg_terminate_backend(pid, 180000) FROM pg_stat_activity
WHERE application_name = 'fdw_conn_check';
END $$;
SELECT CASE WHEN closed IS NOT false THEN 1 ELSE 0 END
FROM postgres_fdw_get_connections(true);
-- Clean up
\set VERBOSITY default
RESET debug_discard_caches;

View File

@ -777,21 +777,39 @@ OPTIONS (ADD password_required 'false');
<variablelist>
<varlistentry>
<term><function>postgres_fdw_get_connections(OUT server_name text,
OUT valid boolean, OUT used_in_xact boolean)
<term><function>postgres_fdw_get_connections(
IN check_conn boolean DEFAULT false, OUT server_name text,
OUT valid boolean, OUT used_in_xact boolean, OUT closed boolean)
returns setof record</function></term>
<listitem>
<para>
This function returns information about all open connections postgres_fdw
has established from the local session to foreign servers. If there are
no open connections, no records are returned.
</para>
<para>
If <literal>check_conn</literal> is set to <literal>true</literal>,
the function checks the status of each connection and shows
the result in the <literal>closed</literal> column.
This feature is currently available only on systems that support
the non-standard <symbol>POLLRDHUP</symbol> extension to
the <symbol>poll</symbol> system call, including Linux.
This is useful to check if all connections used within
a transaction are still open. If any connections are closed,
the transaction cannot be committed successfully,
so it is better to roll back as soon as a closed connection is detected,
rather than continuing to the end. Users can roll back the transaction
immediately if the function reports connections where both
<literal>used_in_xact</literal> and <literal>closed</literal> are
<literal>true</literal>.
</para>
<para>
Example usage of the function:
<screen>
postgres=*# SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
server_name | valid | used_in_xact
-------------+-------+--------------
loopback1 | t | t
loopback2 | f | t
server_name | valid | used_in_xact | closed
-------------+-------+--------------+--------
loopback1 | t | t |
loopback2 | f | t |
</screen>
The output columns are described in
<xref linkend="postgres-fdw-get-connections-columns"/>.
@ -836,6 +854,16 @@ postgres=*# SELECT * FROM postgres_fdw_get_connections() ORDER BY 1;
True if this connection is used in the current transaction.
</entry>
</row>
<row>
<entry><structfield>closed</structfield></entry>
<entry><type>boolean</type></entry>
<entry>
True if this connection is closed, false otherwise.
<literal>NULL</literal> is returned if <literal>check_conn</literal>
is set to <literal>false</literal> or if the connection status check
is not available on this platform.
</entry>
</row>
</tbody>
</tgroup>
</table>