mirror of https://github.com/postgres/postgres
libpq-be-fe-helpers.h: wrap new cancel APIs
Commit 61461a300c
introduced new functions to libpq for cancelling
queries. This commit introduces a helper function that backend-side
libraries and extensions can use to invoke those. This function takes a
timeout and can itself be interrupted while it is waiting for a cancel
request to be sent and processed, instead of being blocked.
This replaces the usage of the old functions in postgres_fdw and dblink.
Finally, it also adds some test coverage for the cancel support in
postgres_fdw.
Author: Jelte Fennema-Nio <postgres@jeltef.nl>
Discussion: https://postgr.es/m/CAGECzQT_VgOWWENUqvUV9xQmbaCyXjtRRAYO8W07oqashk_N+g@mail.gmail.com
This commit is contained in:
parent
427005742b
commit
2466d6654f
|
@ -1347,25 +1347,16 @@ Datum
|
||||||
dblink_cancel_query(PG_FUNCTION_ARGS)
|
dblink_cancel_query(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
PGconn *conn;
|
PGconn *conn;
|
||||||
PGcancelConn *cancelConn;
|
|
||||||
char *msg;
|
char *msg;
|
||||||
|
TimestampTz endtime;
|
||||||
|
|
||||||
dblink_init();
|
dblink_init();
|
||||||
conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
|
conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
|
||||||
cancelConn = PQcancelCreate(conn);
|
endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
|
||||||
|
30000);
|
||||||
PG_TRY();
|
msg = libpqsrv_cancel(conn, endtime);
|
||||||
{
|
if (msg == NULL)
|
||||||
if (!PQcancelBlocking(cancelConn))
|
|
||||||
msg = pchomp(PQcancelErrorMessage(cancelConn));
|
|
||||||
else
|
|
||||||
msg = "OK";
|
msg = "OK";
|
||||||
}
|
|
||||||
PG_FINALLY();
|
|
||||||
{
|
|
||||||
PQcancelFinish(cancelConn);
|
|
||||||
}
|
|
||||||
PG_END_TRY();
|
|
||||||
|
|
||||||
PG_RETURN_TEXT_P(cstring_to_text(msg));
|
PG_RETURN_TEXT_P(cstring_to_text(msg));
|
||||||
}
|
}
|
||||||
|
|
|
@ -133,7 +133,7 @@ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
|
||||||
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
|
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
|
||||||
static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
|
static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
|
||||||
static bool pgfdw_cancel_query(PGconn *conn);
|
static bool pgfdw_cancel_query(PGconn *conn);
|
||||||
static bool pgfdw_cancel_query_begin(PGconn *conn);
|
static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime);
|
||||||
static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
|
static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
|
||||||
bool consume_input);
|
bool consume_input);
|
||||||
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
|
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
|
||||||
|
@ -1315,36 +1315,31 @@ pgfdw_cancel_query(PGconn *conn)
|
||||||
endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
|
endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
|
||||||
CONNECTION_CLEANUP_TIMEOUT);
|
CONNECTION_CLEANUP_TIMEOUT);
|
||||||
|
|
||||||
if (!pgfdw_cancel_query_begin(conn))
|
if (!pgfdw_cancel_query_begin(conn, endtime))
|
||||||
return false;
|
return false;
|
||||||
return pgfdw_cancel_query_end(conn, endtime, false);
|
return pgfdw_cancel_query_end(conn, endtime, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool
|
/*
|
||||||
pgfdw_cancel_query_begin(PGconn *conn)
|
* Submit a cancel request to the given connection, waiting only until
|
||||||
{
|
* the given time.
|
||||||
PGcancel *cancel;
|
*
|
||||||
char errbuf[256];
|
* We sleep interruptibly until we receive confirmation that the cancel
|
||||||
|
* request has been accepted, and if it is, return true; if the timeout
|
||||||
/*
|
* lapses without that, or the request fails for whatever reason, return
|
||||||
* Issue cancel request. Unfortunately, there's no good way to limit the
|
* false.
|
||||||
* amount of time that we might block inside PQgetCancel().
|
|
||||||
*/
|
*/
|
||||||
if ((cancel = PQgetCancel(conn)))
|
static bool
|
||||||
{
|
pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
|
||||||
if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
|
{
|
||||||
{
|
char *errormsg = libpqsrv_cancel(conn, endtime);
|
||||||
ereport(WARNING,
|
|
||||||
(errcode(ERRCODE_CONNECTION_FAILURE),
|
|
||||||
errmsg("could not send cancel request: %s",
|
|
||||||
errbuf)));
|
|
||||||
PQfreeCancel(cancel);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
PQfreeCancel(cancel);
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
if (errormsg != NULL)
|
||||||
|
ereport(WARNING,
|
||||||
|
errcode(ERRCODE_CONNECTION_FAILURE),
|
||||||
|
errmsg("could not send cancel request: %s", errormsg));
|
||||||
|
|
||||||
|
return errormsg == NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
|
@ -1685,7 +1680,11 @@ pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
|
||||||
*/
|
*/
|
||||||
if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
|
if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
|
||||||
{
|
{
|
||||||
if (!pgfdw_cancel_query_begin(entry->conn))
|
TimestampTz endtime;
|
||||||
|
|
||||||
|
endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
|
||||||
|
CONNECTION_CLEANUP_TIMEOUT);
|
||||||
|
if (!pgfdw_cancel_query_begin(entry->conn, endtime))
|
||||||
return false; /* Unable to cancel running query */
|
return false; /* Unable to cancel running query */
|
||||||
*cancel_requested = lappend(*cancel_requested, entry);
|
*cancel_requested = lappend(*cancel_requested, entry);
|
||||||
}
|
}
|
||||||
|
|
|
@ -2739,6 +2739,21 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c
|
||||||
(10 rows)
|
(10 rows)
|
||||||
|
|
||||||
ALTER VIEW v4 OWNER TO regress_view_owner;
|
ALTER VIEW v4 OWNER TO regress_view_owner;
|
||||||
|
-- Make sure this big CROSS JOIN query is pushed down
|
||||||
|
EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
||||||
|
Foreign Scan
|
||||||
|
Output: (count(*))
|
||||||
|
Relations: Aggregate on ((((public.ft1) INNER JOIN (public.ft2)) INNER JOIN (public.ft4)) INNER JOIN (public.ft5))
|
||||||
|
Remote SQL: SELECT count(*) FROM ((("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (TRUE)) INNER JOIN "S 1"."T 3" r4 ON (TRUE)) INNER JOIN "S 1"."T 4" r6 ON (TRUE))
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
-- Make sure query cancellation works
|
||||||
|
SET statement_timeout = '10ms';
|
||||||
|
select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long
|
||||||
|
ERROR: canceling statement due to statement timeout
|
||||||
|
RESET statement_timeout;
|
||||||
-- ====================================================================
|
-- ====================================================================
|
||||||
-- Check that userid to use when querying the remote table is correctly
|
-- Check that userid to use when querying the remote table is correctly
|
||||||
-- propagated into foreign rels present in subqueries under an UNION ALL
|
-- propagated into foreign rels present in subqueries under an UNION ALL
|
||||||
|
|
|
@ -737,6 +737,13 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c
|
||||||
SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 10 LIMIT 10;
|
SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 10 LIMIT 10;
|
||||||
ALTER VIEW v4 OWNER TO regress_view_owner;
|
ALTER VIEW v4 OWNER TO regress_view_owner;
|
||||||
|
|
||||||
|
-- Make sure this big CROSS JOIN query is pushed down
|
||||||
|
EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5;
|
||||||
|
-- Make sure query cancellation works
|
||||||
|
SET statement_timeout = '10ms';
|
||||||
|
select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long
|
||||||
|
RESET statement_timeout;
|
||||||
|
|
||||||
-- ====================================================================
|
-- ====================================================================
|
||||||
-- Check that userid to use when querying the remote table is correctly
|
-- Check that userid to use when querying the remote table is correctly
|
||||||
-- propagated into foreign rels present in subqueries under an UNION ALL
|
-- propagated into foreign rels present in subqueries under an UNION ALL
|
||||||
|
|
|
@ -44,6 +44,8 @@
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
#include "storage/fd.h"
|
#include "storage/fd.h"
|
||||||
#include "storage/latch.h"
|
#include "storage/latch.h"
|
||||||
|
#include "utils/timestamp.h"
|
||||||
|
#include "utils/wait_event.h"
|
||||||
|
|
||||||
|
|
||||||
static inline void libpqsrv_connect_prepare(void);
|
static inline void libpqsrv_connect_prepare(void);
|
||||||
|
@ -365,4 +367,91 @@ libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
|
||||||
return PQgetResult(conn);
|
return PQgetResult(conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Submit a cancel request to the given connection, waiting only until
|
||||||
|
* the given time.
|
||||||
|
*
|
||||||
|
* We sleep interruptibly until we receive confirmation that the cancel
|
||||||
|
* request has been accepted, and if it is, return NULL; if the cancel
|
||||||
|
* request fails, return an error message string (which is not to be
|
||||||
|
* freed).
|
||||||
|
*
|
||||||
|
* For other problems (to wit: OOM when strdup'ing an error message from
|
||||||
|
* libpq), this function can ereport(ERROR).
|
||||||
|
*
|
||||||
|
* Note: this function leaks a string's worth of memory when reporting
|
||||||
|
* libpq errors. Make sure to call it in a transient memory context.
|
||||||
|
*/
|
||||||
|
static inline char *
|
||||||
|
libpqsrv_cancel(PGconn *conn, TimestampTz endtime)
|
||||||
|
{
|
||||||
|
PGcancelConn *cancel_conn;
|
||||||
|
char *error = NULL;
|
||||||
|
|
||||||
|
cancel_conn = PQcancelCreate(conn);
|
||||||
|
if (cancel_conn == NULL)
|
||||||
|
return _("out of memory");
|
||||||
|
|
||||||
|
/* In what follows, do not leak any PGcancelConn on any errors. */
|
||||||
|
|
||||||
|
PG_TRY();
|
||||||
|
{
|
||||||
|
if (!PQcancelStart(cancel_conn))
|
||||||
|
{
|
||||||
|
error = pchomp(PQcancelErrorMessage(cancel_conn));
|
||||||
|
goto exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (;;)
|
||||||
|
{
|
||||||
|
PostgresPollingStatusType pollres;
|
||||||
|
TimestampTz now;
|
||||||
|
long cur_timeout;
|
||||||
|
int waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
|
||||||
|
|
||||||
|
pollres = PQcancelPoll(cancel_conn);
|
||||||
|
if (pollres == PGRES_POLLING_OK)
|
||||||
|
break; /* success! */
|
||||||
|
|
||||||
|
/* If timeout has expired, give up, else get sleep time. */
|
||||||
|
now = GetCurrentTimestamp();
|
||||||
|
cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
|
||||||
|
if (cur_timeout <= 0)
|
||||||
|
{
|
||||||
|
error = _("cancel request timed out");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (pollres)
|
||||||
|
{
|
||||||
|
case PGRES_POLLING_READING:
|
||||||
|
waitEvents |= WL_SOCKET_READABLE;
|
||||||
|
break;
|
||||||
|
case PGRES_POLLING_WRITING:
|
||||||
|
waitEvents |= WL_SOCKET_WRITEABLE;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
error = pchomp(PQcancelErrorMessage(cancel_conn));
|
||||||
|
goto exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Sleep until there's something to do */
|
||||||
|
WaitLatchOrSocket(MyLatch, waitEvents, PQcancelSocket(cancel_conn),
|
||||||
|
cur_timeout, PG_WAIT_CLIENT);
|
||||||
|
|
||||||
|
ResetLatch(MyLatch);
|
||||||
|
|
||||||
|
CHECK_FOR_INTERRUPTS();
|
||||||
|
}
|
||||||
|
exit: ;
|
||||||
|
}
|
||||||
|
PG_FINALLY();
|
||||||
|
{
|
||||||
|
PQcancelFinish(cancel_conn);
|
||||||
|
}
|
||||||
|
PG_END_TRY();
|
||||||
|
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
#endif /* LIBPQ_BE_FE_HELPERS_H */
|
#endif /* LIBPQ_BE_FE_HELPERS_H */
|
||||||
|
|
Loading…
Reference in New Issue