libpq-be-fe-helpers.h: wrap new cancel APIs

Commit 61461a300c introduced new functions to libpq for cancelling
queries.  This commit introduces a helper function that backend-side
libraries and extensions can use to invoke those.  This function takes a
timeout and can itself be interrupted while it is waiting for a cancel
request to be sent and processed, instead of being blocked.

This replaces the usage of the old functions in postgres_fdw and dblink.

Finally, it also adds some test coverage for the cancel support in
postgres_fdw.

Author: Jelte Fennema-Nio <postgres@jeltef.nl>
Discussion: https://postgr.es/m/CAGECzQT_VgOWWENUqvUV9xQmbaCyXjtRRAYO8W07oqashk_N+g@mail.gmail.com
This commit is contained in:
Alvaro Herrera 2024-03-28 11:31:03 +01:00
parent 427005742b
commit 2466d6654f
No known key found for this signature in database
GPG Key ID: 1C20ACB9D5C564AE
5 changed files with 140 additions and 39 deletions

View File

@ -1347,25 +1347,16 @@ Datum
dblink_cancel_query(PG_FUNCTION_ARGS) dblink_cancel_query(PG_FUNCTION_ARGS)
{ {
PGconn *conn; PGconn *conn;
PGcancelConn *cancelConn;
char *msg; char *msg;
TimestampTz endtime;
dblink_init(); dblink_init();
conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
cancelConn = PQcancelCreate(conn); endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
30000);
PG_TRY(); msg = libpqsrv_cancel(conn, endtime);
{ if (msg == NULL)
if (!PQcancelBlocking(cancelConn))
msg = pchomp(PQcancelErrorMessage(cancelConn));
else
msg = "OK"; msg = "OK";
}
PG_FINALLY();
{
PQcancelFinish(cancelConn);
}
PG_END_TRY();
PG_RETURN_TEXT_P(cstring_to_text(msg)); PG_RETURN_TEXT_P(cstring_to_text(msg));
} }

View File

@ -133,7 +133,7 @@ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry); static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel); static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
static bool pgfdw_cancel_query(PGconn *conn); static bool pgfdw_cancel_query(PGconn *conn);
static bool pgfdw_cancel_query_begin(PGconn *conn); static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime);
static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
bool consume_input); bool consume_input);
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
@ -1315,36 +1315,31 @@ pgfdw_cancel_query(PGconn *conn)
endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
CONNECTION_CLEANUP_TIMEOUT); CONNECTION_CLEANUP_TIMEOUT);
if (!pgfdw_cancel_query_begin(conn)) if (!pgfdw_cancel_query_begin(conn, endtime))
return false; return false;
return pgfdw_cancel_query_end(conn, endtime, false); return pgfdw_cancel_query_end(conn, endtime, false);
} }
static bool /*
pgfdw_cancel_query_begin(PGconn *conn) * Submit a cancel request to the given connection, waiting only until
{ * the given time.
PGcancel *cancel; *
char errbuf[256]; * We sleep interruptibly until we receive confirmation that the cancel
* request has been accepted, and if it is, return true; if the timeout
/* * lapses without that, or the request fails for whatever reason, return
* Issue cancel request. Unfortunately, there's no good way to limit the * false.
* amount of time that we might block inside PQgetCancel().
*/ */
if ((cancel = PQgetCancel(conn))) static bool
{ pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
if (!PQcancel(cancel, errbuf, sizeof(errbuf))) {
{ char *errormsg = libpqsrv_cancel(conn, endtime);
ereport(WARNING,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not send cancel request: %s",
errbuf)));
PQfreeCancel(cancel);
return false;
}
PQfreeCancel(cancel);
}
return true; if (errormsg != NULL)
ereport(WARNING,
errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not send cancel request: %s", errormsg));
return errormsg == NULL;
} }
static bool static bool
@ -1685,7 +1680,11 @@ pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
*/ */
if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE) if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
{ {
if (!pgfdw_cancel_query_begin(entry->conn)) TimestampTz endtime;
endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
CONNECTION_CLEANUP_TIMEOUT);
if (!pgfdw_cancel_query_begin(entry->conn, endtime))
return false; /* Unable to cancel running query */ return false; /* Unable to cancel running query */
*cancel_requested = lappend(*cancel_requested, entry); *cancel_requested = lappend(*cancel_requested, entry);
} }

View File

@ -2739,6 +2739,21 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c
(10 rows) (10 rows)
ALTER VIEW v4 OWNER TO regress_view_owner; ALTER VIEW v4 OWNER TO regress_view_owner;
-- Make sure this big CROSS JOIN query is pushed down
EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------
Foreign Scan
Output: (count(*))
Relations: Aggregate on ((((public.ft1) INNER JOIN (public.ft2)) INNER JOIN (public.ft4)) INNER JOIN (public.ft5))
Remote SQL: SELECT count(*) FROM ((("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (TRUE)) INNER JOIN "S 1"."T 3" r4 ON (TRUE)) INNER JOIN "S 1"."T 4" r6 ON (TRUE))
(4 rows)
-- Make sure query cancellation works
SET statement_timeout = '10ms';
select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long
ERROR: canceling statement due to statement timeout
RESET statement_timeout;
-- ==================================================================== -- ====================================================================
-- Check that userid to use when querying the remote table is correctly -- Check that userid to use when querying the remote table is correctly
-- propagated into foreign rels present in subqueries under an UNION ALL -- propagated into foreign rels present in subqueries under an UNION ALL

View File

@ -737,6 +737,13 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c
SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 10 LIMIT 10; SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 10 LIMIT 10;
ALTER VIEW v4 OWNER TO regress_view_owner; ALTER VIEW v4 OWNER TO regress_view_owner;
-- Make sure this big CROSS JOIN query is pushed down
EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5;
-- Make sure query cancellation works
SET statement_timeout = '10ms';
select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long
RESET statement_timeout;
-- ==================================================================== -- ====================================================================
-- Check that userid to use when querying the remote table is correctly -- Check that userid to use when querying the remote table is correctly
-- propagated into foreign rels present in subqueries under an UNION ALL -- propagated into foreign rels present in subqueries under an UNION ALL

View File

@ -44,6 +44,8 @@
#include "miscadmin.h" #include "miscadmin.h"
#include "storage/fd.h" #include "storage/fd.h"
#include "storage/latch.h" #include "storage/latch.h"
#include "utils/timestamp.h"
#include "utils/wait_event.h"
static inline void libpqsrv_connect_prepare(void); static inline void libpqsrv_connect_prepare(void);
@ -365,4 +367,91 @@ libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
return PQgetResult(conn); return PQgetResult(conn);
} }
/*
 * Submit a cancel request to the given connection, waiting only until
 * the given time.
 *
 * conn is the connection whose query should be cancelled; endtime is the
 * absolute point in time after which we stop waiting for the request to
 * complete.
 *
 * We sleep interruptibly until we receive confirmation that the cancel
 * request has been accepted, and if it is, return NULL; if the cancel
 * request fails, or the deadline passes before it completes, return an
 * error message string (which is not to be freed).  A failure reported by
 * libpq takes precedence over an expired deadline, so that the caller gets
 * the actual reason rather than a generic timeout message.
 *
 * For other problems (to wit: OOM when copying an error message from
 * libpq), this function can ereport(ERROR).
 *
 * Note: this function leaks a string's worth of memory when reporting
 * libpq errors.  Make sure to call it in a transient memory context.
 */
static inline char *
libpqsrv_cancel(PGconn *conn, TimestampTz endtime)
{
	PGcancelConn *cancel_conn;
	char	   *error = NULL;

	cancel_conn = PQcancelCreate(conn);
	if (cancel_conn == NULL)
		return _("out of memory");

	/* In what follows, do not leak any PGcancelConn on any errors. */
	PG_TRY();
	{
		if (!PQcancelStart(cancel_conn))
		{
			error = pchomp(PQcancelErrorMessage(cancel_conn));
			goto exit;
		}

		for (;;)
		{
			PostgresPollingStatusType pollres;
			TimestampTz now;
			long		cur_timeout;
			int			waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;

			pollres = PQcancelPoll(cancel_conn);
			if (pollres == PGRES_POLLING_OK)
				break;			/* success! */

			/*
			 * Anything other than "keep reading" or "keep writing" means
			 * the request has failed.  Check this before looking at the
			 * clock: otherwise a failure noticed after the deadline would
			 * be reported as a timeout and the real libpq message lost.
			 */
			if (pollres != PGRES_POLLING_READING &&
				pollres != PGRES_POLLING_WRITING)
			{
				error = pchomp(PQcancelErrorMessage(cancel_conn));
				goto exit;
			}

			/* If timeout has expired, give up, else get sleep time. */
			now = GetCurrentTimestamp();
			cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
			if (cur_timeout <= 0)
			{
				error = _("cancel request timed out");
				break;
			}

			/* Wait for the socket direction libpq asked for. */
			if (pollres == PGRES_POLLING_READING)
				waitEvents |= WL_SOCKET_READABLE;
			else
				waitEvents |= WL_SOCKET_WRITEABLE;

			/* Sleep until there's something to do */
			WaitLatchOrSocket(MyLatch, waitEvents, PQcancelSocket(cancel_conn),
							  cur_timeout, PG_WAIT_CLIENT);
			ResetLatch(MyLatch);

			/* Let a pending query cancel or termination interrupt us. */
			CHECK_FOR_INTERRUPTS();
		}
exit:	;
	}
	PG_FINALLY();
	{
		/* Runs on both normal exit and error, releasing the cancel object. */
		PQcancelFinish(cancel_conn);
	}
	PG_END_TRY();

	return error;
}
#endif /* LIBPQ_BE_FE_HELPERS_H */ #endif /* LIBPQ_BE_FE_HELPERS_H */