mirror of https://github.com/postgres/postgres
libpq-be-fe-helpers.h: wrap new cancel APIs
Commit 61461a300c
introduced new functions to libpq for cancelling
queries. This commit introduces a helper function that backend-side
libraries and extensions can use to invoke those. This function takes a
timeout and can itself be interrupted while it is waiting for a cancel
request to be sent and processed, instead of being blocked.
This replaces the usage of the old functions in postgres_fdw and dblink.
Finally, it also adds some test coverage for the cancel support in
postgres_fdw.
Author: Jelte Fennema-Nio <postgres@jeltef.nl>
Discussion: https://postgr.es/m/CAGECzQT_VgOWWENUqvUV9xQmbaCyXjtRRAYO8W07oqashk_N+g@mail.gmail.com
This commit is contained in:
parent
427005742b
commit
2466d6654f
|
@ -1347,25 +1347,16 @@ Datum
|
|||
dblink_cancel_query(PG_FUNCTION_ARGS)
|
||||
{
|
||||
PGconn *conn;
|
||||
PGcancelConn *cancelConn;
|
||||
char *msg;
|
||||
TimestampTz endtime;
|
||||
|
||||
dblink_init();
|
||||
conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
|
||||
cancelConn = PQcancelCreate(conn);
|
||||
|
||||
PG_TRY();
|
||||
{
|
||||
if (!PQcancelBlocking(cancelConn))
|
||||
msg = pchomp(PQcancelErrorMessage(cancelConn));
|
||||
else
|
||||
msg = "OK";
|
||||
}
|
||||
PG_FINALLY();
|
||||
{
|
||||
PQcancelFinish(cancelConn);
|
||||
}
|
||||
PG_END_TRY();
|
||||
endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
|
||||
30000);
|
||||
msg = libpqsrv_cancel(conn, endtime);
|
||||
if (msg == NULL)
|
||||
msg = "OK";
|
||||
|
||||
PG_RETURN_TEXT_P(cstring_to_text(msg));
|
||||
}
|
||||
|
|
|
@ -133,7 +133,7 @@ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
|
|||
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
|
||||
static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
|
||||
static bool pgfdw_cancel_query(PGconn *conn);
|
||||
static bool pgfdw_cancel_query_begin(PGconn *conn);
|
||||
static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime);
|
||||
static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
|
||||
bool consume_input);
|
||||
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
|
||||
|
@ -1315,36 +1315,31 @@ pgfdw_cancel_query(PGconn *conn)
|
|||
endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
|
||||
CONNECTION_CLEANUP_TIMEOUT);
|
||||
|
||||
if (!pgfdw_cancel_query_begin(conn))
|
||||
if (!pgfdw_cancel_query_begin(conn, endtime))
|
||||
return false;
|
||||
return pgfdw_cancel_query_end(conn, endtime, false);
|
||||
}
|
||||
|
||||
/*
|
||||
* Submit a cancel request to the given connection, waiting only until
|
||||
* the given time.
|
||||
*
|
||||
* We sleep interruptibly until we receive confirmation that the cancel
|
||||
* request has been accepted, and if it is, return true; if the timeout
|
||||
* lapses without that, or the request fails for whatever reason, return
|
||||
* false.
|
||||
*/
|
||||
static bool
|
||||
pgfdw_cancel_query_begin(PGconn *conn)
|
||||
pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
|
||||
{
|
||||
PGcancel *cancel;
|
||||
char errbuf[256];
|
||||
char *errormsg = libpqsrv_cancel(conn, endtime);
|
||||
|
||||
/*
|
||||
* Issue cancel request. Unfortunately, there's no good way to limit the
|
||||
* amount of time that we might block inside PQgetCancel().
|
||||
*/
|
||||
if ((cancel = PQgetCancel(conn)))
|
||||
{
|
||||
if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
|
||||
{
|
||||
ereport(WARNING,
|
||||
(errcode(ERRCODE_CONNECTION_FAILURE),
|
||||
errmsg("could not send cancel request: %s",
|
||||
errbuf)));
|
||||
PQfreeCancel(cancel);
|
||||
return false;
|
||||
}
|
||||
PQfreeCancel(cancel);
|
||||
}
|
||||
if (errormsg != NULL)
|
||||
ereport(WARNING,
|
||||
errcode(ERRCODE_CONNECTION_FAILURE),
|
||||
errmsg("could not send cancel request: %s", errormsg));
|
||||
|
||||
return true;
|
||||
return errormsg == NULL;
|
||||
}
|
||||
|
||||
static bool
|
||||
|
@ -1685,7 +1680,11 @@ pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
|
|||
*/
|
||||
if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
|
||||
{
|
||||
if (!pgfdw_cancel_query_begin(entry->conn))
|
||||
TimestampTz endtime;
|
||||
|
||||
endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
|
||||
CONNECTION_CLEANUP_TIMEOUT);
|
||||
if (!pgfdw_cancel_query_begin(entry->conn, endtime))
|
||||
return false; /* Unable to cancel running query */
|
||||
*cancel_requested = lappend(*cancel_requested, entry);
|
||||
}
|
||||
|
|
|
@ -2739,6 +2739,21 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c
|
|||
(10 rows)
|
||||
|
||||
ALTER VIEW v4 OWNER TO regress_view_owner;
|
||||
-- Make sure this big CROSS JOIN query is pushed down
|
||||
EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5;
|
||||
QUERY PLAN
|
||||
---------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
||||
Foreign Scan
|
||||
Output: (count(*))
|
||||
Relations: Aggregate on ((((public.ft1) INNER JOIN (public.ft2)) INNER JOIN (public.ft4)) INNER JOIN (public.ft5))
|
||||
Remote SQL: SELECT count(*) FROM ((("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (TRUE)) INNER JOIN "S 1"."T 3" r4 ON (TRUE)) INNER JOIN "S 1"."T 4" r6 ON (TRUE))
|
||||
(4 rows)
|
||||
|
||||
-- Make sure query cancellation works
|
||||
SET statement_timeout = '10ms';
|
||||
select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long
|
||||
ERROR: canceling statement due to statement timeout
|
||||
RESET statement_timeout;
|
||||
-- ====================================================================
|
||||
-- Check that userid to use when querying the remote table is correctly
|
||||
-- propagated into foreign rels present in subqueries under an UNION ALL
|
||||
|
|
|
@ -737,6 +737,13 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c
|
|||
SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 10 LIMIT 10;
|
||||
ALTER VIEW v4 OWNER TO regress_view_owner;
|
||||
|
||||
-- Make sure this big CROSS JOIN query is pushed down
|
||||
EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5;
|
||||
-- Make sure query cancellation works
|
||||
SET statement_timeout = '10ms';
|
||||
select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long
|
||||
RESET statement_timeout;
|
||||
|
||||
-- ====================================================================
|
||||
-- Check that userid to use when querying the remote table is correctly
|
||||
-- propagated into foreign rels present in subqueries under an UNION ALL
|
||||
|
|
|
@ -44,6 +44,8 @@
|
|||
#include "miscadmin.h"
|
||||
#include "storage/fd.h"
|
||||
#include "storage/latch.h"
|
||||
#include "utils/timestamp.h"
|
||||
#include "utils/wait_event.h"
|
||||
|
||||
|
||||
static inline void libpqsrv_connect_prepare(void);
|
||||
|
@ -365,4 +367,91 @@ libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
|
|||
return PQgetResult(conn);
|
||||
}
|
||||
|
||||
/*
|
||||
* Submit a cancel request to the given connection, waiting only until
|
||||
* the given time.
|
||||
*
|
||||
* We sleep interruptibly until we receive confirmation that the cancel
|
||||
* request has been accepted, and if it is, return NULL; if the cancel
|
||||
* request fails, return an error message string (which is not to be
|
||||
* freed).
|
||||
*
|
||||
* For other problems (to wit: OOM when strdup'ing an error message from
|
||||
* libpq), this function can ereport(ERROR).
|
||||
*
|
||||
* Note: this function leaks a string's worth of memory when reporting
|
||||
* libpq errors. Make sure to call it in a transient memory context.
|
||||
*/
|
||||
static inline char *
|
||||
libpqsrv_cancel(PGconn *conn, TimestampTz endtime)
|
||||
{
|
||||
PGcancelConn *cancel_conn;
|
||||
char *error = NULL;
|
||||
|
||||
cancel_conn = PQcancelCreate(conn);
|
||||
if (cancel_conn == NULL)
|
||||
return _("out of memory");
|
||||
|
||||
/* In what follows, do not leak any PGcancelConn on any errors. */
|
||||
|
||||
PG_TRY();
|
||||
{
|
||||
if (!PQcancelStart(cancel_conn))
|
||||
{
|
||||
error = pchomp(PQcancelErrorMessage(cancel_conn));
|
||||
goto exit;
|
||||
}
|
||||
|
||||
for (;;)
|
||||
{
|
||||
PostgresPollingStatusType pollres;
|
||||
TimestampTz now;
|
||||
long cur_timeout;
|
||||
int waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
|
||||
|
||||
pollres = PQcancelPoll(cancel_conn);
|
||||
if (pollres == PGRES_POLLING_OK)
|
||||
break; /* success! */
|
||||
|
||||
/* If timeout has expired, give up, else get sleep time. */
|
||||
now = GetCurrentTimestamp();
|
||||
cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
|
||||
if (cur_timeout <= 0)
|
||||
{
|
||||
error = _("cancel request timed out");
|
||||
break;
|
||||
}
|
||||
|
||||
switch (pollres)
|
||||
{
|
||||
case PGRES_POLLING_READING:
|
||||
waitEvents |= WL_SOCKET_READABLE;
|
||||
break;
|
||||
case PGRES_POLLING_WRITING:
|
||||
waitEvents |= WL_SOCKET_WRITEABLE;
|
||||
break;
|
||||
default:
|
||||
error = pchomp(PQcancelErrorMessage(cancel_conn));
|
||||
goto exit;
|
||||
}
|
||||
|
||||
/* Sleep until there's something to do */
|
||||
WaitLatchOrSocket(MyLatch, waitEvents, PQcancelSocket(cancel_conn),
|
||||
cur_timeout, PG_WAIT_CLIENT);
|
||||
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
}
|
||||
exit: ;
|
||||
}
|
||||
PG_FINALLY();
|
||||
{
|
||||
PQcancelFinish(cancel_conn);
|
||||
}
|
||||
PG_END_TRY();
|
||||
|
||||
return error;
|
||||
}
|
||||
|
||||
#endif /* LIBPQ_BE_FE_HELPERS_H */
|
||||
|
|
Loading…
Reference in New Issue