Add support for COPY TO callback functions
This is useful as a way for extensions to process COPY TO rows in the way they see fit (say auditing, analytics, backend, etc.) without the need to invoke an external process running as the OS user running the backend through PROGRAM that requires superuser rights. COPY FROM already provides a similar callback for logical replication. For COPY TO, the callback is triggered when we are ready to send a row in CopySendEndOfRow(), which is the same code path as when sending a row to a frontend or a pipe/file. A small test module, test_copy_callbacks, is added to provide some coverage for this facility. Author: Bilva Sanaba, Nathan Bossart Discussion: https://postgr.es/m/253C21D1-FCEB-41D9-A2AF-E6517015B7D7@amazon.com
This commit is contained in:
parent
0e87dfe464
commit
9fcdf2c787
@ -310,7 +310,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
|
|||||||
|
|
||||||
cstate = BeginCopyTo(pstate, rel, query, relid,
|
cstate = BeginCopyTo(pstate, rel, query, relid,
|
||||||
stmt->filename, stmt->is_program,
|
stmt->filename, stmt->is_program,
|
||||||
stmt->attlist, stmt->options);
|
NULL, stmt->attlist, stmt->options);
|
||||||
*processed = DoCopyTo(cstate); /* copy from database to file */
|
*processed = DoCopyTo(cstate); /* copy from database to file */
|
||||||
EndCopyTo(cstate);
|
EndCopyTo(cstate);
|
||||||
}
|
}
|
||||||
|
@ -51,6 +51,7 @@ typedef enum CopyDest
|
|||||||
{
|
{
|
||||||
COPY_FILE, /* to file (or a piped program) */
|
COPY_FILE, /* to file (or a piped program) */
|
||||||
COPY_FRONTEND, /* to frontend */
|
COPY_FRONTEND, /* to frontend */
|
||||||
|
COPY_CALLBACK /* to callback function */
|
||||||
} CopyDest;
|
} CopyDest;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -85,6 +86,7 @@ typedef struct CopyToStateData
|
|||||||
List *attnumlist; /* integer list of attnums to copy */
|
List *attnumlist; /* integer list of attnums to copy */
|
||||||
char *filename; /* filename, or NULL for STDOUT */
|
char *filename; /* filename, or NULL for STDOUT */
|
||||||
bool is_program; /* is 'filename' a program to popen? */
|
bool is_program; /* is 'filename' a program to popen? */
|
||||||
|
copy_data_dest_cb data_dest_cb; /* function for writing data */
|
||||||
|
|
||||||
CopyFormatOptions opts;
|
CopyFormatOptions opts;
|
||||||
Node *whereClause; /* WHERE condition (or NULL) */
|
Node *whereClause; /* WHERE condition (or NULL) */
|
||||||
@ -247,6 +249,9 @@ CopySendEndOfRow(CopyToState cstate)
|
|||||||
/* Dump the accumulated row as one CopyData message */
|
/* Dump the accumulated row as one CopyData message */
|
||||||
(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
|
(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
|
||||||
break;
|
break;
|
||||||
|
case COPY_CALLBACK:
|
||||||
|
cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Update the progress */
|
/* Update the progress */
|
||||||
@ -336,6 +341,17 @@ EndCopy(CopyToState cstate)
|
|||||||
|
|
||||||
/*
|
/*
|
||||||
* Setup CopyToState to read tuples from a table or a query for COPY TO.
|
* Setup CopyToState to read tuples from a table or a query for COPY TO.
|
||||||
|
*
|
||||||
|
* 'rel': Relation to be copied
|
||||||
|
* 'raw_query': Query whose results are to be copied
|
||||||
|
* 'queryRelId': OID of base relation to convert to a query (for RLS)
|
||||||
|
* 'filename': Name of server-local file to write, NULL for STDOUT
|
||||||
|
* 'is_program': true if 'filename' is program to execute
|
||||||
|
* 'data_dest_cb': Callback that processes the output data
|
||||||
|
* 'attnamelist': List of char *, columns to include. NIL selects all cols.
|
||||||
|
* 'options': List of DefElem. See copy_opt_item in gram.y for selections.
|
||||||
|
*
|
||||||
|
* Returns a CopyToState, to be passed to DoCopyTo() and related functions.
|
||||||
*/
|
*/
|
||||||
CopyToState
|
CopyToState
|
||||||
BeginCopyTo(ParseState *pstate,
|
BeginCopyTo(ParseState *pstate,
|
||||||
@ -344,11 +360,12 @@ BeginCopyTo(ParseState *pstate,
|
|||||||
Oid queryRelId,
|
Oid queryRelId,
|
||||||
const char *filename,
|
const char *filename,
|
||||||
bool is_program,
|
bool is_program,
|
||||||
|
copy_data_dest_cb data_dest_cb,
|
||||||
List *attnamelist,
|
List *attnamelist,
|
||||||
List *options)
|
List *options)
|
||||||
{
|
{
|
||||||
CopyToState cstate;
|
CopyToState cstate;
|
||||||
bool pipe = (filename == NULL);
|
bool pipe = (filename == NULL && data_dest_cb == NULL);
|
||||||
TupleDesc tupDesc;
|
TupleDesc tupDesc;
|
||||||
int num_phys_attrs;
|
int num_phys_attrs;
|
||||||
MemoryContext oldcontext;
|
MemoryContext oldcontext;
|
||||||
@ -656,7 +673,13 @@ BeginCopyTo(ParseState *pstate,
|
|||||||
|
|
||||||
cstate->copy_dest = COPY_FILE; /* default */
|
cstate->copy_dest = COPY_FILE; /* default */
|
||||||
|
|
||||||
if (pipe)
|
if (data_dest_cb)
|
||||||
|
{
|
||||||
|
progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
|
||||||
|
cstate->copy_dest = COPY_CALLBACK;
|
||||||
|
cstate->data_dest_cb = data_dest_cb;
|
||||||
|
}
|
||||||
|
else if (pipe)
|
||||||
{
|
{
|
||||||
progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
|
progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
|
||||||
|
|
||||||
@ -765,11 +788,13 @@ EndCopyTo(CopyToState cstate)
|
|||||||
|
|
||||||
/*
|
/*
|
||||||
* Copy from relation or query TO file.
|
* Copy from relation or query TO file.
|
||||||
|
*
|
||||||
|
* Returns the number of rows processed.
|
||||||
*/
|
*/
|
||||||
uint64
|
uint64
|
||||||
DoCopyTo(CopyToState cstate)
|
DoCopyTo(CopyToState cstate)
|
||||||
{
|
{
|
||||||
bool pipe = (cstate->filename == NULL);
|
bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
|
||||||
bool fe_copy = (pipe && whereToSendOutput == DestRemote);
|
bool fe_copy = (pipe && whereToSendOutput == DestRemote);
|
||||||
TupleDesc tupDesc;
|
TupleDesc tupDesc;
|
||||||
int num_phys_attrs;
|
int num_phys_attrs;
|
||||||
|
@ -66,6 +66,7 @@ typedef struct CopyFromStateData *CopyFromState;
|
|||||||
typedef struct CopyToStateData *CopyToState;
|
typedef struct CopyToStateData *CopyToState;
|
||||||
|
|
||||||
typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
|
typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
|
||||||
|
typedef void (*copy_data_dest_cb) (void *data, int len);
|
||||||
|
|
||||||
extern void DoCopy(ParseState *pstate, const CopyStmt *stmt,
|
extern void DoCopy(ParseState *pstate, const CopyStmt *stmt,
|
||||||
int stmt_location, int stmt_len,
|
int stmt_location, int stmt_len,
|
||||||
@ -91,7 +92,7 @@ extern DestReceiver *CreateCopyDestReceiver(void);
|
|||||||
*/
|
*/
|
||||||
extern CopyToState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *raw_query,
|
extern CopyToState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *raw_query,
|
||||||
Oid queryRelId, const char *filename, bool is_program,
|
Oid queryRelId, const char *filename, bool is_program,
|
||||||
List *attnamelist, List *options);
|
copy_data_dest_cb data_dest_cb, List *attnamelist, List *options);
|
||||||
extern void EndCopyTo(CopyToState cstate);
|
extern void EndCopyTo(CopyToState cstate);
|
||||||
extern uint64 DoCopyTo(CopyToState cstate);
|
extern uint64 DoCopyTo(CopyToState cstate);
|
||||||
extern List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
|
extern List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
|
||||||
|
@ -15,6 +15,7 @@ SUBDIRS = \
|
|||||||
snapshot_too_old \
|
snapshot_too_old \
|
||||||
spgist_name_ops \
|
spgist_name_ops \
|
||||||
test_bloomfilter \
|
test_bloomfilter \
|
||||||
|
test_copy_callbacks \
|
||||||
test_ddl_deparse \
|
test_ddl_deparse \
|
||||||
test_extensions \
|
test_extensions \
|
||||||
test_ginpostinglist \
|
test_ginpostinglist \
|
||||||
|
@ -9,6 +9,7 @@ subdir('snapshot_too_old')
|
|||||||
subdir('spgist_name_ops')
|
subdir('spgist_name_ops')
|
||||||
subdir('ssl_passphrase_callback')
|
subdir('ssl_passphrase_callback')
|
||||||
subdir('test_bloomfilter')
|
subdir('test_bloomfilter')
|
||||||
|
subdir('test_copy_callbacks')
|
||||||
subdir('test_ddl_deparse')
|
subdir('test_ddl_deparse')
|
||||||
subdir('test_extensions')
|
subdir('test_extensions')
|
||||||
subdir('test_ginpostinglist')
|
subdir('test_ginpostinglist')
|
||||||
|
4
src/test/modules/test_copy_callbacks/.gitignore
vendored
Normal file
4
src/test/modules/test_copy_callbacks/.gitignore
vendored
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
# Generated subdirectories
|
||||||
|
/log/
|
||||||
|
/results/
|
||||||
|
/tmp_check/
|
23
src/test/modules/test_copy_callbacks/Makefile
Normal file
23
src/test/modules/test_copy_callbacks/Makefile
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
# src/test/modules/test_copy_callbacks/Makefile
|
||||||
|
|
||||||
|
MODULE_big = test_copy_callbacks
|
||||||
|
OBJS = \
|
||||||
|
$(WIN32RES) \
|
||||||
|
test_copy_callbacks.o
|
||||||
|
PGFILEDESC = "test_copy_callbacks - test COPY callbacks"
|
||||||
|
|
||||||
|
EXTENSION = test_copy_callbacks
|
||||||
|
DATA = test_copy_callbacks--1.0.sql
|
||||||
|
|
||||||
|
REGRESS = test_copy_callbacks
|
||||||
|
|
||||||
|
ifdef USE_PGXS
|
||||||
|
PG_CONFIG = pg_config
|
||||||
|
PGXS := $(shell $(PG_CONFIG) --pgxs)
|
||||||
|
include $(PGXS)
|
||||||
|
else
|
||||||
|
subdir = src/test/modules/test_copy_callbacks
|
||||||
|
top_builddir = ../../../..
|
||||||
|
include $(top_builddir)/src/Makefile.global
|
||||||
|
include $(top_srcdir)/contrib/contrib-global.mk
|
||||||
|
endif
|
@ -0,0 +1,13 @@
|
|||||||
|
CREATE EXTENSION test_copy_callbacks;
|
||||||
|
CREATE TABLE public.test (a INT, b INT, c INT);
|
||||||
|
INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
|
||||||
|
SELECT test_copy_to_callback('public.test'::pg_catalog.regclass);
|
||||||
|
NOTICE: COPY TO callback called with data "1 2 3" and length 5
|
||||||
|
NOTICE: COPY TO callback called with data "12 34 56" and length 8
|
||||||
|
NOTICE: COPY TO callback called with data "123 456 789" and length 11
|
||||||
|
NOTICE: COPY TO callback has processed 3 rows
|
||||||
|
test_copy_to_callback
|
||||||
|
-----------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
34
src/test/modules/test_copy_callbacks/meson.build
Normal file
34
src/test/modules/test_copy_callbacks/meson.build
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
# FIXME: prevent install during main install, but not during test :/
|
||||||
|
|
||||||
|
test_copy_callbacks_sources = files(
|
||||||
|
'test_copy_callbacks.c',
|
||||||
|
)
|
||||||
|
|
||||||
|
if host_system == 'windows'
|
||||||
|
test_copy_callbacks_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
|
||||||
|
'--NAME', 'test_copy_callbacks',
|
||||||
|
'--FILEDESC', 'test_copy_callbacks - test COPY callbacks',])
|
||||||
|
endif
|
||||||
|
|
||||||
|
test_copy_callbacks = shared_module('test_copy_callbacks',
|
||||||
|
test_copy_callbacks_sources,
|
||||||
|
kwargs: pg_mod_args,
|
||||||
|
)
|
||||||
|
testprep_targets += test_copy_callbacks
|
||||||
|
|
||||||
|
install_data(
|
||||||
|
'test_copy_callbacks.control',
|
||||||
|
'test_copy_callbacks--1.0.sql',
|
||||||
|
kwargs: contrib_data_args,
|
||||||
|
)
|
||||||
|
|
||||||
|
tests += {
|
||||||
|
'name': 'test_copy_callbacks',
|
||||||
|
'sd': meson.current_source_dir(),
|
||||||
|
'bd': meson.current_build_dir(),
|
||||||
|
'regress': {
|
||||||
|
'sql': [
|
||||||
|
'test_copy_callbacks',
|
||||||
|
],
|
||||||
|
},
|
||||||
|
}
|
@ -0,0 +1,4 @@
|
|||||||
|
CREATE EXTENSION test_copy_callbacks;
|
||||||
|
CREATE TABLE public.test (a INT, b INT, c INT);
|
||||||
|
INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
|
||||||
|
SELECT test_copy_to_callback('public.test'::pg_catalog.regclass);
|
@ -0,0 +1,8 @@
|
|||||||
|
/* src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql */
|
||||||
|
|
||||||
|
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
|
||||||
|
\echo Use "CREATE EXTENSION test_copy_callbacks" to load this file. \quit
|
||||||
|
|
||||||
|
CREATE FUNCTION test_copy_to_callback(pg_catalog.regclass)
|
||||||
|
RETURNS pg_catalog.void
|
||||||
|
AS 'MODULE_PATHNAME' LANGUAGE C;
|
51
src/test/modules/test_copy_callbacks/test_copy_callbacks.c
Normal file
51
src/test/modules/test_copy_callbacks/test_copy_callbacks.c
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
/*--------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* test_copy_callbacks.c
|
||||||
|
* Code for testing COPY callbacks.
|
||||||
|
*
|
||||||
|
* Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
|
||||||
|
* Portions Copyright (c) 1994, Regents of the University of California
|
||||||
|
*
|
||||||
|
* IDENTIFICATION
|
||||||
|
* src/test/modules/test_copy_callbacks/test_copy_callbacks.c
|
||||||
|
*
|
||||||
|
* -------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "postgres.h"
|
||||||
|
|
||||||
|
#include "access/table.h"
|
||||||
|
#include "commands/copy.h"
|
||||||
|
#include "fmgr.h"
|
||||||
|
#include "utils/rel.h"
|
||||||
|
|
||||||
|
PG_MODULE_MAGIC;
|
||||||
|
|
||||||
|
static void
|
||||||
|
to_cb(void *data, int len)
|
||||||
|
{
|
||||||
|
ereport(NOTICE,
|
||||||
|
(errmsg("COPY TO callback called with data \"%s\" and length %d",
|
||||||
|
(char *) data, len)));
|
||||||
|
}
|
||||||
|
|
||||||
|
PG_FUNCTION_INFO_V1(test_copy_to_callback);
|
||||||
|
Datum
|
||||||
|
test_copy_to_callback(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
Relation rel = table_open(PG_GETARG_OID(0), AccessShareLock);
|
||||||
|
CopyToState cstate;
|
||||||
|
int64 processed;
|
||||||
|
|
||||||
|
cstate = BeginCopyTo(NULL, rel, NULL, RelationGetRelid(rel), NULL, NULL,
|
||||||
|
to_cb, NIL, NIL);
|
||||||
|
processed = DoCopyTo(cstate);
|
||||||
|
EndCopyTo(cstate);
|
||||||
|
|
||||||
|
ereport(NOTICE, (errmsg("COPY TO callback has processed %lld rows",
|
||||||
|
(long long) processed)));
|
||||||
|
|
||||||
|
table_close(rel, NoLock);
|
||||||
|
|
||||||
|
PG_RETURN_VOID();
|
||||||
|
}
|
@ -0,0 +1,4 @@
|
|||||||
|
comment = 'Test code for COPY callbacks'
|
||||||
|
default_version = '1.0'
|
||||||
|
module_pathname = '$libdir/test_copy_callbacks'
|
||||||
|
relocatable = true
|
@ -3177,6 +3177,7 @@ compare_context
|
|||||||
config_var_value
|
config_var_value
|
||||||
contain_aggs_of_level_context
|
contain_aggs_of_level_context
|
||||||
convert_testexpr_context
|
convert_testexpr_context
|
||||||
|
copy_data_dest_cb
|
||||||
copy_data_source_cb
|
copy_data_source_cb
|
||||||
core_YYSTYPE
|
core_YYSTYPE
|
||||||
core_yy_extra_type
|
core_yy_extra_type
|
||||||
|
Loading…
x
Reference in New Issue
Block a user