diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 49924e476a..db4c9dbc23 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -310,7 +310,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, cstate = BeginCopyTo(pstate, rel, query, relid, stmt->filename, stmt->is_program, - stmt->attlist, stmt->options); + NULL, stmt->attlist, stmt->options); *processed = DoCopyTo(cstate); /* copy from database to file */ EndCopyTo(cstate); } diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c index fca29a9a10..2527e66059 100644 --- a/src/backend/commands/copyto.c +++ b/src/backend/commands/copyto.c @@ -51,6 +51,7 @@ typedef enum CopyDest { COPY_FILE, /* to file (or a piped program) */ COPY_FRONTEND, /* to frontend */ + COPY_CALLBACK /* to callback function */ } CopyDest; /* @@ -85,6 +86,7 @@ typedef struct CopyToStateData List *attnumlist; /* integer list of attnums to copy */ char *filename; /* filename, or NULL for STDOUT */ bool is_program; /* is 'filename' a program to popen? */ + copy_data_dest_cb data_dest_cb; /* function for writing data */ CopyFormatOptions opts; Node *whereClause; /* WHERE condition (or NULL) */ @@ -247,6 +249,9 @@ CopySendEndOfRow(CopyToState cstate) /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len); break; + case COPY_CALLBACK: + cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len); + break; } /* Update the progress */ @@ -336,6 +341,17 @@ EndCopy(CopyToState cstate) /* * Setup CopyToState to read tuples from a table or a query for COPY TO. + * + * 'rel': Relation to be copied + * 'raw_query': Query whose results are to be copied + * 'queryRelId': OID of base relation to convert to a query (for RLS) + * 'filename': Name of server-local file to write, NULL for STDOUT + * 'is_program': true if 'filename' is program to execute + * 'data_dest_cb': Callback that processes the output data + * 'attnamelist': List of char *, columns to include. NIL selects all cols. + * 'options': List of DefElem. See copy_opt_item in gram.y for selections. + * + * Returns a CopyToState, to be passed to DoCopyTo() and related functions. */ CopyToState BeginCopyTo(ParseState *pstate, @@ -344,11 +360,12 @@ BeginCopyTo(ParseState *pstate, Oid queryRelId, const char *filename, bool is_program, + copy_data_dest_cb data_dest_cb, List *attnamelist, List *options) { CopyToState cstate; - bool pipe = (filename == NULL); + bool pipe = (filename == NULL && data_dest_cb == NULL); TupleDesc tupDesc; int num_phys_attrs; MemoryContext oldcontext; @@ -656,7 +673,13 @@ BeginCopyTo(ParseState *pstate, cstate->copy_dest = COPY_FILE; /* default */ - if (pipe) + if (data_dest_cb) + { + progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK; + cstate->copy_dest = COPY_CALLBACK; + cstate->data_dest_cb = data_dest_cb; + } + else if (pipe) { progress_vals[1] = PROGRESS_COPY_TYPE_PIPE; @@ -765,11 +788,13 @@ EndCopyTo(CopyToState cstate) /* * Copy from relation or query TO file. + * + * Returns the number of rows processed. */ uint64 DoCopyTo(CopyToState cstate) { - bool pipe = (cstate->filename == NULL); + bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL); bool fe_copy = (pipe && whereToSendOutput == DestRemote); TupleDesc tupDesc; int num_phys_attrs; diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index 3f6677b132..b77b935005 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -66,6 +66,7 @@ typedef struct CopyFromStateData *CopyFromState; typedef struct CopyToStateData *CopyToState; typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); +typedef void (*copy_data_dest_cb) (void *data, int len); extern void DoCopy(ParseState *pstate, const CopyStmt *stmt, int stmt_location, int stmt_len, @@ -91,7 +92,7 @@ extern DestReceiver *CreateCopyDestReceiver(void); */ extern CopyToState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *raw_query, Oid queryRelId, const char *filename, bool is_program, - List *attnamelist, List *options); + copy_data_dest_cb data_dest_cb, List *attnamelist, List *options); extern void EndCopyTo(CopyToState cstate); extern uint64 DoCopyTo(CopyToState cstate); extern List *CopyGetAttnums(TupleDesc tupDesc, Relation rel, diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile index 6c31c8707c..7b3f292965 100644 --- a/src/test/modules/Makefile +++ b/src/test/modules/Makefile @@ -15,6 +15,7 @@ SUBDIRS = \ snapshot_too_old \ spgist_name_ops \ test_bloomfilter \ + test_copy_callbacks \ test_ddl_deparse \ test_extensions \ test_ginpostinglist \ diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build index a80e6e2ce2..c2e5f5ffd5 100644 --- a/src/test/modules/meson.build +++ b/src/test/modules/meson.build @@ -9,6 +9,7 @@ subdir('snapshot_too_old') subdir('spgist_name_ops') subdir('ssl_passphrase_callback') subdir('test_bloomfilter') +subdir('test_copy_callbacks') subdir('test_ddl_deparse') subdir('test_extensions') subdir('test_ginpostinglist') diff --git a/src/test/modules/test_copy_callbacks/.gitignore b/src/test/modules/test_copy_callbacks/.gitignore new file mode 100644 index 0000000000..5dcb3ff972 --- /dev/null +++ b/src/test/modules/test_copy_callbacks/.gitignore @@ -0,0 +1,4 @@ +# Generated subdirectories +/log/ +/results/ +/tmp_check/ diff --git a/src/test/modules/test_copy_callbacks/Makefile b/src/test/modules/test_copy_callbacks/Makefile new file mode 100644 index 0000000000..82e890150d --- /dev/null +++ b/src/test/modules/test_copy_callbacks/Makefile @@ -0,0 +1,23 @@ +# src/test/modules/test_copy_callbacks/Makefile + +MODULE_big = test_copy_callbacks +OBJS = \ + $(WIN32RES) \ + test_copy_callbacks.o +PGFILEDESC = "test_copy_callbacks - test COPY callbacks" + +EXTENSION = test_copy_callbacks +DATA = test_copy_callbacks--1.0.sql + +REGRESS = test_copy_callbacks + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/test_copy_callbacks +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out b/src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out new file mode 100644 index 0000000000..93ebeef130 --- /dev/null +++ b/src/test/modules/test_copy_callbacks/expected/test_copy_callbacks.out @@ -0,0 +1,13 @@ +CREATE EXTENSION test_copy_callbacks; +CREATE TABLE public.test (a INT, b INT, c INT); +INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789); +SELECT test_copy_to_callback('public.test'::pg_catalog.regclass); +NOTICE: COPY TO callback called with data "1 2 3" and length 5 +NOTICE: COPY TO callback called with data "12 34 56" and length 8 +NOTICE: COPY TO callback called with data "123 456 789" and length 11 +NOTICE: COPY TO callback has processed 3 rows + test_copy_to_callback +----------------------- + +(1 row) + diff --git a/src/test/modules/test_copy_callbacks/meson.build b/src/test/modules/test_copy_callbacks/meson.build new file mode 100644 index 0000000000..43eca8e3d9 --- /dev/null +++ b/src/test/modules/test_copy_callbacks/meson.build @@ -0,0 +1,34 @@ +# FIXME: prevent install during main install, but not during test :/ + +test_copy_callbacks_sources = files( + 'test_copy_callbacks.c', +) + +if host_system == 'windows' + test_copy_callbacks_sources += rc_lib_gen.process(win32ver_rc, extra_args: [ + '--NAME', 'test_copy_callbacks', + '--FILEDESC', 'test_copy_callbacks - test COPY callbacks',]) +endif + +test_copy_callbacks = shared_module('test_copy_callbacks', + test_copy_callbacks_sources, + kwargs: pg_mod_args, +) +testprep_targets += test_copy_callbacks + +install_data( + 'test_copy_callbacks.control', + 'test_copy_callbacks--1.0.sql', + kwargs: contrib_data_args, +) + +tests += { + 'name': 'test_copy_callbacks', + 'sd': meson.current_source_dir(), + 'bd': meson.current_build_dir(), + 'regress': { + 'sql': [ + 'test_copy_callbacks', + ], + }, +} diff --git a/src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql b/src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql new file mode 100644 index 0000000000..2deffba635 --- /dev/null +++ b/src/test/modules/test_copy_callbacks/sql/test_copy_callbacks.sql @@ -0,0 +1,4 @@ +CREATE EXTENSION test_copy_callbacks; +CREATE TABLE public.test (a INT, b INT, c INT); +INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789); +SELECT test_copy_to_callback('public.test'::pg_catalog.regclass); diff --git a/src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql b/src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql new file mode 100644 index 0000000000..215cf3fad6 --- /dev/null +++ b/src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql @@ -0,0 +1,8 @@ +/* src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql */ + +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION test_copy_callbacks" to load this file. \quit + +CREATE FUNCTION test_copy_to_callback(pg_catalog.regclass) + RETURNS pg_catalog.void + AS 'MODULE_PATHNAME' LANGUAGE C; diff --git a/src/test/modules/test_copy_callbacks/test_copy_callbacks.c b/src/test/modules/test_copy_callbacks/test_copy_callbacks.c new file mode 100644 index 0000000000..ecdbe4eee1 --- /dev/null +++ b/src/test/modules/test_copy_callbacks/test_copy_callbacks.c @@ -0,0 +1,51 @@ +/*-------------------------------------------------------------------------- + * + * test_copy_callbacks.c + * Code for testing COPY callbacks. + * + * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/test/modules/test_copy_callbacks/test_copy_callbacks.c + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/table.h" +#include "commands/copy.h" +#include "fmgr.h" +#include "utils/rel.h" + +PG_MODULE_MAGIC; + +static void +to_cb(void *data, int len) +{ + ereport(NOTICE, + (errmsg("COPY TO callback called with data \"%s\" and length %d", + (char *) data, len))); +} + +PG_FUNCTION_INFO_V1(test_copy_to_callback); +Datum +test_copy_to_callback(PG_FUNCTION_ARGS) +{ + Relation rel = table_open(PG_GETARG_OID(0), AccessShareLock); + CopyToState cstate; + int64 processed; + + cstate = BeginCopyTo(NULL, rel, NULL, RelationGetRelid(rel), NULL, NULL, + to_cb, NIL, NIL); + processed = DoCopyTo(cstate); + EndCopyTo(cstate); + + ereport(NOTICE, (errmsg("COPY TO callback has processed %lld rows", + (long long) processed))); + + table_close(rel, NoLock); + + PG_RETURN_VOID(); +} diff --git a/src/test/modules/test_copy_callbacks/test_copy_callbacks.control b/src/test/modules/test_copy_callbacks/test_copy_callbacks.control new file mode 100644 index 0000000000..b7ce3f12ff --- /dev/null +++ b/src/test/modules/test_copy_callbacks/test_copy_callbacks.control @@ -0,0 +1,4 @@ +comment = 'Test code for COPY callbacks' +default_version = '1.0' +module_pathname = '$libdir/test_copy_callbacks' +relocatable = true diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 97c9bc1861..d9b839c979 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -3177,6 +3177,7 @@ compare_context config_var_value contain_aggs_of_level_context convert_testexpr_context +copy_data_dest_cb copy_data_source_cb core_YYSTYPE core_yy_extra_type