
9f8377f7a added code to allow COPY FROM insert a column's default value when the input matches the DEFAULT string specified in the COPY command. Here we fix some inefficient code which needlessly palloc0'd an array to store if we should use the default value or input value for the given column. This array was being palloc0'd and pfree'd once per row. It's much more efficient to allocate this once and just reset the values once per row. Reported-by: Masahiko Sawada Author: Masahiko Sawada Discussion: https://postgr.es/m/CAD21AoDvDmUQeJtZrau1ovnT_smN940%3DKp6mszNGK3bq9yRN6g%40mail.gmail.com Backpatch-through: 16, where 9f8377f7a was introduced.
1774 lines
53 KiB
C
1774 lines
53 KiB
C
/*-------------------------------------------------------------------------
|
|
*
|
|
* copyfrom.c
|
|
* COPY <table> FROM file/program/client
|
|
*
|
|
* This file contains routines needed to efficiently load tuples into a
|
|
* table. That includes looking up the correct partition, firing triggers,
|
|
* calling the table AM function to insert the data, and updating indexes.
|
|
* Reading data from the input file or client and parsing it into Datums
|
|
* is handled in copyfromparse.c.
|
|
*
|
|
* Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
|
|
* Portions Copyright (c) 1994, Regents of the University of California
|
|
*
|
|
*
|
|
* IDENTIFICATION
|
|
* src/backend/commands/copyfrom.c
|
|
*
|
|
*-------------------------------------------------------------------------
|
|
*/
|
|
#include "postgres.h"
|
|
|
|
#include <ctype.h>
|
|
#include <unistd.h>
|
|
#include <sys/stat.h>
|
|
|
|
#include "access/heapam.h"
|
|
#include "access/htup_details.h"
|
|
#include "access/tableam.h"
|
|
#include "access/xact.h"
|
|
#include "access/xlog.h"
|
|
#include "catalog/namespace.h"
|
|
#include "commands/copy.h"
|
|
#include "commands/copyfrom_internal.h"
|
|
#include "commands/progress.h"
|
|
#include "commands/trigger.h"
|
|
#include "executor/execPartition.h"
|
|
#include "executor/executor.h"
|
|
#include "executor/nodeModifyTable.h"
|
|
#include "executor/tuptable.h"
|
|
#include "foreign/fdwapi.h"
|
|
#include "libpq/libpq.h"
|
|
#include "libpq/pqformat.h"
|
|
#include "miscadmin.h"
|
|
#include "optimizer/optimizer.h"
|
|
#include "pgstat.h"
|
|
#include "rewrite/rewriteHandler.h"
|
|
#include "storage/fd.h"
|
|
#include "tcop/tcopprot.h"
|
|
#include "utils/lsyscache.h"
|
|
#include "utils/memutils.h"
|
|
#include "utils/portal.h"
|
|
#include "utils/rel.h"
|
|
#include "utils/snapmgr.h"
|
|
|
|
/*
|
|
* No more than this many tuples per CopyMultiInsertBuffer
|
|
*
|
|
* Caution: Don't make this too big, as we could end up with this many
|
|
* CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
|
|
* multiInsertBuffers list. Increasing this can cause quadratic growth in
|
|
* memory requirements during copies into partitioned tables with a large
|
|
* number of partitions.
|
|
*/
|
|
#define MAX_BUFFERED_TUPLES 1000
|
|
|
|
/*
|
|
* Flush buffers if there are >= this many bytes, as counted by the input
|
|
* size, of tuples stored.
|
|
*/
|
|
#define MAX_BUFFERED_BYTES 65535
|
|
|
|
/* Trim the list of buffers back down to this number after flushing */
|
|
#define MAX_PARTITION_BUFFERS 32
|
|
|
|
/* Stores multi-insert data related to a single relation in CopyFrom. */
|
|
typedef struct CopyMultiInsertBuffer
|
|
{
|
|
TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
|
|
ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */
|
|
BulkInsertState bistate; /* BulkInsertState for this rel if plain
|
|
* table; NULL if foreign table */
|
|
int nused; /* number of 'slots' containing tuples */
|
|
uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
|
|
* stream */
|
|
} CopyMultiInsertBuffer;
|
|
|
|
/*
|
|
* Stores one or many CopyMultiInsertBuffers and details about the size and
|
|
* number of tuples which are stored in them. This allows multiple buffers to
|
|
* exist at once when COPYing into a partitioned table.
|
|
*/
|
|
typedef struct CopyMultiInsertInfo
|
|
{
|
|
List *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
|
|
int bufferedTuples; /* number of tuples buffered over all buffers */
|
|
int bufferedBytes; /* number of bytes from all buffered tuples */
|
|
CopyFromState cstate; /* Copy state for this CopyMultiInsertInfo */
|
|
EState *estate; /* Executor state used for COPY */
|
|
CommandId mycid; /* Command Id used for COPY */
|
|
int ti_options; /* table insert options */
|
|
} CopyMultiInsertInfo;
|
|
|
|
|
|
/* non-export function prototypes */
|
|
static char *limit_printout_length(const char *str);
|
|
|
|
static void ClosePipeFromProgram(CopyFromState cstate);
|
|
|
|
/*
|
|
* error context callback for COPY FROM
|
|
*
|
|
* The argument for the error context must be CopyFromState.
|
|
*/
|
|
void
|
|
CopyFromErrorCallback(void *arg)
|
|
{
|
|
CopyFromState cstate = (CopyFromState) arg;
|
|
|
|
if (cstate->relname_only)
|
|
{
|
|
errcontext("COPY %s",
|
|
cstate->cur_relname);
|
|
return;
|
|
}
|
|
if (cstate->opts.binary)
|
|
{
|
|
/* can't usefully display the data */
|
|
if (cstate->cur_attname)
|
|
errcontext("COPY %s, line %llu, column %s",
|
|
cstate->cur_relname,
|
|
(unsigned long long) cstate->cur_lineno,
|
|
cstate->cur_attname);
|
|
else
|
|
errcontext("COPY %s, line %llu",
|
|
cstate->cur_relname,
|
|
(unsigned long long) cstate->cur_lineno);
|
|
}
|
|
else
|
|
{
|
|
if (cstate->cur_attname && cstate->cur_attval)
|
|
{
|
|
/* error is relevant to a particular column */
|
|
char *attval;
|
|
|
|
attval = limit_printout_length(cstate->cur_attval);
|
|
errcontext("COPY %s, line %llu, column %s: \"%s\"",
|
|
cstate->cur_relname,
|
|
(unsigned long long) cstate->cur_lineno,
|
|
cstate->cur_attname,
|
|
attval);
|
|
pfree(attval);
|
|
}
|
|
else if (cstate->cur_attname)
|
|
{
|
|
/* error is relevant to a particular column, value is NULL */
|
|
errcontext("COPY %s, line %llu, column %s: null input",
|
|
cstate->cur_relname,
|
|
(unsigned long long) cstate->cur_lineno,
|
|
cstate->cur_attname);
|
|
}
|
|
else
|
|
{
|
|
/*
|
|
* Error is relevant to a particular line.
|
|
*
|
|
* If line_buf still contains the correct line, print it.
|
|
*/
|
|
if (cstate->line_buf_valid)
|
|
{
|
|
char *lineval;
|
|
|
|
lineval = limit_printout_length(cstate->line_buf.data);
|
|
errcontext("COPY %s, line %llu: \"%s\"",
|
|
cstate->cur_relname,
|
|
(unsigned long long) cstate->cur_lineno, lineval);
|
|
pfree(lineval);
|
|
}
|
|
else
|
|
{
|
|
errcontext("COPY %s, line %llu",
|
|
cstate->cur_relname,
|
|
(unsigned long long) cstate->cur_lineno);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Make sure we don't print an unreasonable amount of COPY data in a message.
|
|
*
|
|
* Returns a pstrdup'd copy of the input.
|
|
*/
|
|
static char *
|
|
limit_printout_length(const char *str)
|
|
{
|
|
#define MAX_COPY_DATA_DISPLAY 100
|
|
|
|
int slen = strlen(str);
|
|
int len;
|
|
char *res;
|
|
|
|
/* Fast path if definitely okay */
|
|
if (slen <= MAX_COPY_DATA_DISPLAY)
|
|
return pstrdup(str);
|
|
|
|
/* Apply encoding-dependent truncation */
|
|
len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY);
|
|
|
|
/*
|
|
* Truncate, and add "..." to show we truncated the input.
|
|
*/
|
|
res = (char *) palloc(len + 4);
|
|
memcpy(res, str, len);
|
|
strcpy(res + len, "...");
|
|
|
|
return res;
|
|
}
|
|
|
|
/*
|
|
* Allocate memory and initialize a new CopyMultiInsertBuffer for this
|
|
* ResultRelInfo.
|
|
*/
|
|
static CopyMultiInsertBuffer *
|
|
CopyMultiInsertBufferInit(ResultRelInfo *rri)
|
|
{
|
|
CopyMultiInsertBuffer *buffer;
|
|
|
|
buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
|
|
memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
|
|
buffer->resultRelInfo = rri;
|
|
buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
|
|
buffer->nused = 0;
|
|
|
|
return buffer;
|
|
}
|
|
|
|
/*
|
|
* Make a new buffer for this ResultRelInfo.
|
|
*/
|
|
static inline void
|
|
CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
|
|
ResultRelInfo *rri)
|
|
{
|
|
CopyMultiInsertBuffer *buffer;
|
|
|
|
buffer = CopyMultiInsertBufferInit(rri);
|
|
|
|
/* Setup back-link so we can easily find this buffer again */
|
|
rri->ri_CopyMultiInsertBuffer = buffer;
|
|
/* Record that we're tracking this buffer */
|
|
miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
|
|
}
|
|
|
|
/*
|
|
* Initialize an already allocated CopyMultiInsertInfo.
|
|
*
|
|
* If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
|
|
* for that table.
|
|
*/
|
|
static void
|
|
CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
|
|
CopyFromState cstate, EState *estate, CommandId mycid,
|
|
int ti_options)
|
|
{
|
|
miinfo->multiInsertBuffers = NIL;
|
|
miinfo->bufferedTuples = 0;
|
|
miinfo->bufferedBytes = 0;
|
|
miinfo->cstate = cstate;
|
|
miinfo->estate = estate;
|
|
miinfo->mycid = mycid;
|
|
miinfo->ti_options = ti_options;
|
|
|
|
/*
|
|
* Only setup the buffer when not dealing with a partitioned table.
|
|
* Buffers for partitioned tables will just be setup when we need to send
|
|
* tuples their way for the first time.
|
|
*/
|
|
if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
|
|
CopyMultiInsertInfoSetupBuffer(miinfo, rri);
|
|
}
|
|
|
|
/*
|
|
* Returns true if the buffers are full
|
|
*/
|
|
static inline bool
|
|
CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
|
|
{
|
|
if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
|
|
miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
|
|
return true;
|
|
return false;
|
|
}
|
|
|
|
/*
|
|
* Returns true if we have no buffered tuples
|
|
*/
|
|
static inline bool
|
|
CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
|
|
{
|
|
return miinfo->bufferedTuples == 0;
|
|
}
|
|
|
|
/*
|
|
* Write the tuples stored in 'buffer' out to the table.
|
|
*/
|
|
static inline void
|
|
CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
|
|
CopyMultiInsertBuffer *buffer,
|
|
int64 *processed)
|
|
{
|
|
CopyFromState cstate = miinfo->cstate;
|
|
EState *estate = miinfo->estate;
|
|
int nused = buffer->nused;
|
|
ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
|
|
TupleTableSlot **slots = buffer->slots;
|
|
int i;
|
|
|
|
if (resultRelInfo->ri_FdwRoutine)
|
|
{
|
|
int batch_size = resultRelInfo->ri_BatchSize;
|
|
int sent = 0;
|
|
|
|
Assert(buffer->bistate == NULL);
|
|
|
|
/* Ensure that the FDW supports batching and it's enabled */
|
|
Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
|
|
Assert(batch_size > 1);
|
|
|
|
/*
|
|
* We suppress error context information other than the relation name,
|
|
* if one of the operations below fails.
|
|
*/
|
|
Assert(!cstate->relname_only);
|
|
cstate->relname_only = true;
|
|
|
|
while (sent < nused)
|
|
{
|
|
int size = (batch_size < nused - sent) ? batch_size : (nused - sent);
|
|
int inserted = size;
|
|
TupleTableSlot **rslots;
|
|
|
|
/* insert into foreign table: let the FDW do it */
|
|
rslots =
|
|
resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
|
|
resultRelInfo,
|
|
&slots[sent],
|
|
NULL,
|
|
&inserted);
|
|
|
|
sent += size;
|
|
|
|
/* No need to do anything if there are no inserted rows */
|
|
if (inserted <= 0)
|
|
continue;
|
|
|
|
/* Triggers on foreign tables should not have transition tables */
|
|
Assert(resultRelInfo->ri_TrigDesc == NULL ||
|
|
resultRelInfo->ri_TrigDesc->trig_insert_new_table == false);
|
|
|
|
/* Run AFTER ROW INSERT triggers */
|
|
if (resultRelInfo->ri_TrigDesc != NULL &&
|
|
resultRelInfo->ri_TrigDesc->trig_insert_after_row)
|
|
{
|
|
Oid relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
|
|
|
|
for (i = 0; i < inserted; i++)
|
|
{
|
|
TupleTableSlot *slot = rslots[i];
|
|
|
|
/*
|
|
* AFTER ROW Triggers might reference the tableoid column,
|
|
* so (re-)initialize tts_tableOid before evaluating them.
|
|
*/
|
|
slot->tts_tableOid = relid;
|
|
|
|
ExecARInsertTriggers(estate, resultRelInfo,
|
|
slot, NIL,
|
|
cstate->transition_capture);
|
|
}
|
|
}
|
|
|
|
/* Update the row counter and progress of the COPY command */
|
|
*processed += inserted;
|
|
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
|
|
*processed);
|
|
}
|
|
|
|
for (i = 0; i < nused; i++)
|
|
ExecClearTuple(slots[i]);
|
|
|
|
/* reset relname_only */
|
|
cstate->relname_only = false;
|
|
}
|
|
else
|
|
{
|
|
CommandId mycid = miinfo->mycid;
|
|
int ti_options = miinfo->ti_options;
|
|
bool line_buf_valid = cstate->line_buf_valid;
|
|
uint64 save_cur_lineno = cstate->cur_lineno;
|
|
MemoryContext oldcontext;
|
|
|
|
Assert(buffer->bistate != NULL);
|
|
|
|
/*
|
|
* Print error context information correctly, if one of the operations
|
|
* below fails.
|
|
*/
|
|
cstate->line_buf_valid = false;
|
|
|
|
/*
|
|
* table_multi_insert may leak memory, so switch to short-lived memory
|
|
* context before calling it.
|
|
*/
|
|
oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
|
|
table_multi_insert(resultRelInfo->ri_RelationDesc,
|
|
slots,
|
|
nused,
|
|
mycid,
|
|
ti_options,
|
|
buffer->bistate);
|
|
MemoryContextSwitchTo(oldcontext);
|
|
|
|
for (i = 0; i < nused; i++)
|
|
{
|
|
/*
|
|
* If there are any indexes, update them for all the inserted
|
|
* tuples, and run AFTER ROW INSERT triggers.
|
|
*/
|
|
if (resultRelInfo->ri_NumIndices > 0)
|
|
{
|
|
List *recheckIndexes;
|
|
|
|
cstate->cur_lineno = buffer->linenos[i];
|
|
recheckIndexes =
|
|
ExecInsertIndexTuples(resultRelInfo,
|
|
buffer->slots[i], estate, false,
|
|
false, NULL, NIL, false);
|
|
ExecARInsertTriggers(estate, resultRelInfo,
|
|
slots[i], recheckIndexes,
|
|
cstate->transition_capture);
|
|
list_free(recheckIndexes);
|
|
}
|
|
|
|
/*
|
|
* There's no indexes, but see if we need to run AFTER ROW INSERT
|
|
* triggers anyway.
|
|
*/
|
|
else if (resultRelInfo->ri_TrigDesc != NULL &&
|
|
(resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
|
|
resultRelInfo->ri_TrigDesc->trig_insert_new_table))
|
|
{
|
|
cstate->cur_lineno = buffer->linenos[i];
|
|
ExecARInsertTriggers(estate, resultRelInfo,
|
|
slots[i], NIL,
|
|
cstate->transition_capture);
|
|
}
|
|
|
|
ExecClearTuple(slots[i]);
|
|
}
|
|
|
|
/* Update the row counter and progress of the COPY command */
|
|
*processed += nused;
|
|
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
|
|
*processed);
|
|
|
|
/* reset cur_lineno and line_buf_valid to what they were */
|
|
cstate->line_buf_valid = line_buf_valid;
|
|
cstate->cur_lineno = save_cur_lineno;
|
|
}
|
|
|
|
/* Mark that all slots are free */
|
|
buffer->nused = 0;
|
|
}
|
|
|
|
/*
|
|
* Drop used slots and free member for this buffer.
|
|
*
|
|
* The buffer must be flushed before cleanup.
|
|
*/
|
|
static inline void
|
|
CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
|
|
CopyMultiInsertBuffer *buffer)
|
|
{
|
|
ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
|
|
int i;
|
|
|
|
/* Ensure buffer was flushed */
|
|
Assert(buffer->nused == 0);
|
|
|
|
/* Remove back-link to ourself */
|
|
resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
|
|
|
|
if (resultRelInfo->ri_FdwRoutine == NULL)
|
|
{
|
|
Assert(buffer->bistate != NULL);
|
|
FreeBulkInsertState(buffer->bistate);
|
|
}
|
|
else
|
|
Assert(buffer->bistate == NULL);
|
|
|
|
/* Since we only create slots on demand, just drop the non-null ones. */
|
|
for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
|
|
ExecDropSingleTupleTableSlot(buffer->slots[i]);
|
|
|
|
if (resultRelInfo->ri_FdwRoutine == NULL)
|
|
table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
|
|
miinfo->ti_options);
|
|
|
|
pfree(buffer);
|
|
}
|
|
|
|
/*
|
|
* Write out all stored tuples in all buffers out to the tables.
|
|
*
|
|
* Once flushed we also trim the tracked buffers list down to size by removing
|
|
* the buffers created earliest first.
|
|
*
|
|
* Callers should pass 'curr_rri' as the ResultRelInfo that's currently being
|
|
* used. When cleaning up old buffers we'll never remove the one for
|
|
* 'curr_rri'.
|
|
*/
|
|
static inline void
|
|
CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri,
|
|
int64 *processed)
|
|
{
|
|
ListCell *lc;
|
|
|
|
foreach(lc, miinfo->multiInsertBuffers)
|
|
{
|
|
CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
|
|
|
|
CopyMultiInsertBufferFlush(miinfo, buffer, processed);
|
|
}
|
|
|
|
miinfo->bufferedTuples = 0;
|
|
miinfo->bufferedBytes = 0;
|
|
|
|
/*
|
|
* Trim the list of tracked buffers down if it exceeds the limit. Here we
|
|
* remove buffers starting with the ones we created first. It seems less
|
|
* likely that these older ones will be needed than the ones that were
|
|
* just created.
|
|
*/
|
|
while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
|
|
{
|
|
CopyMultiInsertBuffer *buffer;
|
|
|
|
buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
|
|
|
|
/*
|
|
* We never want to remove the buffer that's currently being used, so
|
|
* if we happen to find that then move it to the end of the list.
|
|
*/
|
|
if (buffer->resultRelInfo == curr_rri)
|
|
{
|
|
miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
|
|
miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
|
|
buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
|
|
}
|
|
|
|
CopyMultiInsertBufferCleanup(miinfo, buffer);
|
|
miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Cleanup allocated buffers and free memory
|
|
*/
|
|
static inline void
|
|
CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
|
|
{
|
|
ListCell *lc;
|
|
|
|
foreach(lc, miinfo->multiInsertBuffers)
|
|
CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
|
|
|
|
list_free(miinfo->multiInsertBuffers);
|
|
}
|
|
|
|
/*
|
|
* Get the next TupleTableSlot that the next tuple should be stored in.
|
|
*
|
|
* Callers must ensure that the buffer is not full.
|
|
*
|
|
* Note: 'miinfo' is unused but has been included for consistency with the
|
|
* other functions in this area.
|
|
*/
|
|
static inline TupleTableSlot *
|
|
CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
|
|
ResultRelInfo *rri)
|
|
{
|
|
CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
|
|
int nused = buffer->nused;
|
|
|
|
Assert(buffer != NULL);
|
|
Assert(nused < MAX_BUFFERED_TUPLES);
|
|
|
|
if (buffer->slots[nused] == NULL)
|
|
buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
|
|
return buffer->slots[nused];
|
|
}
|
|
|
|
/*
|
|
* Record the previously reserved TupleTableSlot that was reserved by
|
|
* CopyMultiInsertInfoNextFreeSlot as being consumed.
|
|
*/
|
|
static inline void
|
|
CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
|
|
TupleTableSlot *slot, int tuplen, uint64 lineno)
|
|
{
|
|
CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
|
|
|
|
Assert(buffer != NULL);
|
|
Assert(slot == buffer->slots[buffer->nused]);
|
|
|
|
/* Store the line number so we can properly report any errors later */
|
|
buffer->linenos[buffer->nused] = lineno;
|
|
|
|
/* Record this slot as being used */
|
|
buffer->nused++;
|
|
|
|
/* Update how many tuples are stored and their size */
|
|
miinfo->bufferedTuples++;
|
|
miinfo->bufferedBytes += tuplen;
|
|
}
|
|
|
|
/*
|
|
* Copy FROM file to relation.
|
|
*/
|
|
uint64
|
|
CopyFrom(CopyFromState cstate)
|
|
{
|
|
ResultRelInfo *resultRelInfo;
|
|
ResultRelInfo *target_resultRelInfo;
|
|
ResultRelInfo *prevResultRelInfo = NULL;
|
|
EState *estate = CreateExecutorState(); /* for ExecConstraints() */
|
|
ModifyTableState *mtstate;
|
|
ExprContext *econtext;
|
|
TupleTableSlot *singleslot = NULL;
|
|
MemoryContext oldcontext = CurrentMemoryContext;
|
|
|
|
PartitionTupleRouting *proute = NULL;
|
|
ErrorContextCallback errcallback;
|
|
CommandId mycid = GetCurrentCommandId(true);
|
|
int ti_options = 0; /* start with default options for insert */
|
|
BulkInsertState bistate = NULL;
|
|
CopyInsertMethod insertMethod;
|
|
CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */
|
|
int64 processed = 0;
|
|
int64 excluded = 0;
|
|
bool has_before_insert_row_trig;
|
|
bool has_instead_insert_row_trig;
|
|
bool leafpart_use_multi_insert = false;
|
|
|
|
Assert(cstate->rel);
|
|
Assert(list_length(cstate->range_table) == 1);
|
|
|
|
/*
|
|
* The target must be a plain, foreign, or partitioned relation, or have
|
|
* an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
|
|
* allowed on views, so we only hint about them in the view case.)
|
|
*/
|
|
if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
|
|
cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
|
|
cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
|
|
!(cstate->rel->trigdesc &&
|
|
cstate->rel->trigdesc->trig_insert_instead_row))
|
|
{
|
|
if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
|
errmsg("cannot copy to view \"%s\"",
|
|
RelationGetRelationName(cstate->rel)),
|
|
errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
|
|
else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
|
errmsg("cannot copy to materialized view \"%s\"",
|
|
RelationGetRelationName(cstate->rel))));
|
|
else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
|
errmsg("cannot copy to sequence \"%s\"",
|
|
RelationGetRelationName(cstate->rel))));
|
|
else
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
|
errmsg("cannot copy to non-table relation \"%s\"",
|
|
RelationGetRelationName(cstate->rel))));
|
|
}
|
|
|
|
/*
|
|
* If the target file is new-in-transaction, we assume that checking FSM
|
|
* for free space is a waste of time. This could possibly be wrong, but
|
|
* it's unlikely.
|
|
*/
|
|
if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
|
|
(cstate->rel->rd_createSubid != InvalidSubTransactionId ||
|
|
cstate->rel->rd_firstRelfilelocatorSubid != InvalidSubTransactionId))
|
|
ti_options |= TABLE_INSERT_SKIP_FSM;
|
|
|
|
/*
|
|
* Optimize if new relation storage was created in this subxact or one of
|
|
* its committed children and we won't see those rows later as part of an
|
|
* earlier scan or command. The subxact test ensures that if this subxact
|
|
* aborts then the frozen rows won't be visible after xact cleanup. Note
|
|
* that the stronger test of exactly which subtransaction created it is
|
|
* crucial for correctness of this optimization. The test for an earlier
|
|
* scan or command tolerates false negatives. FREEZE causes other sessions
|
|
* to see rows they would not see under MVCC, and a false negative merely
|
|
* spreads that anomaly to the current session.
|
|
*/
|
|
if (cstate->opts.freeze)
|
|
{
|
|
/*
|
|
* We currently disallow COPY FREEZE on partitioned tables. The
|
|
* reason for this is that we've simply not yet opened the partitions
|
|
* to determine if the optimization can be applied to them. We could
|
|
* go and open them all here, but doing so may be quite a costly
|
|
* overhead for small copies. In any case, we may just end up routing
|
|
* tuples to a small number of partitions. It seems better just to
|
|
* raise an ERROR for partitioned tables.
|
|
*/
|
|
if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
|
|
{
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
errmsg("cannot perform COPY FREEZE on a partitioned table")));
|
|
}
|
|
|
|
/*
|
|
* Tolerate one registration for the benefit of FirstXactSnapshot.
|
|
* Scan-bearing queries generally create at least two registrations,
|
|
* though relying on that is fragile, as is ignoring ActiveSnapshot.
|
|
* Clear CatalogSnapshot to avoid counting its registration. We'll
|
|
* still detect ongoing catalog scans, each of which separately
|
|
* registers the snapshot it uses.
|
|
*/
|
|
InvalidateCatalogSnapshot();
|
|
if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
|
|
errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
|
|
|
|
if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
|
|
cstate->rel->rd_newRelfilelocatorSubid != GetCurrentSubTransactionId())
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
|
errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
|
|
|
|
ti_options |= TABLE_INSERT_FROZEN;
|
|
}
|
|
|
|
/*
|
|
* We need a ResultRelInfo so we can use the regular executor's
|
|
* index-entry-making machinery. (There used to be a huge amount of code
|
|
* here that basically duplicated execUtils.c ...)
|
|
*/
|
|
ExecInitRangeTable(estate, cstate->range_table, cstate->rteperminfos);
|
|
resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo);
|
|
ExecInitResultRelation(estate, resultRelInfo, 1);
|
|
|
|
/* Verify the named relation is a valid target for INSERT */
|
|
CheckValidResultRel(resultRelInfo, CMD_INSERT);
|
|
|
|
ExecOpenIndices(resultRelInfo, false);
|
|
|
|
/*
|
|
* Set up a ModifyTableState so we can let FDW(s) init themselves for
|
|
* foreign-table result relation(s).
|
|
*/
|
|
mtstate = makeNode(ModifyTableState);
|
|
mtstate->ps.plan = NULL;
|
|
mtstate->ps.state = estate;
|
|
mtstate->operation = CMD_INSERT;
|
|
mtstate->mt_nrels = 1;
|
|
mtstate->resultRelInfo = resultRelInfo;
|
|
mtstate->rootResultRelInfo = resultRelInfo;
|
|
|
|
if (resultRelInfo->ri_FdwRoutine != NULL &&
|
|
resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
|
|
resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
|
|
resultRelInfo);
|
|
|
|
/*
|
|
* Also, if the named relation is a foreign table, determine if the FDW
|
|
* supports batch insert and determine the batch size (a FDW may support
|
|
* batching, but it may be disabled for the server/table).
|
|
*
|
|
* If the FDW does not support batching, we set the batch size to 1.
|
|
*/
|
|
if (resultRelInfo->ri_FdwRoutine != NULL &&
|
|
resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
|
|
resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
|
|
resultRelInfo->ri_BatchSize =
|
|
resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
|
|
else
|
|
resultRelInfo->ri_BatchSize = 1;
|
|
|
|
Assert(resultRelInfo->ri_BatchSize >= 1);
|
|
|
|
/* Prepare to catch AFTER triggers. */
|
|
AfterTriggerBeginQuery();
|
|
|
|
/*
|
|
* If there are any triggers with transition tables on the named relation,
|
|
* we need to be prepared to capture transition tuples.
|
|
*
|
|
* Because partition tuple routing would like to know about whether
|
|
* transition capture is active, we also set it in mtstate, which is
|
|
* passed to ExecFindPartition() below.
|
|
*/
|
|
cstate->transition_capture = mtstate->mt_transition_capture =
|
|
MakeTransitionCaptureState(cstate->rel->trigdesc,
|
|
RelationGetRelid(cstate->rel),
|
|
CMD_INSERT);
|
|
|
|
/*
|
|
* If the named relation is a partitioned table, initialize state for
|
|
* CopyFrom tuple routing.
|
|
*/
|
|
if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
|
|
proute = ExecSetupPartitionTupleRouting(estate, cstate->rel);
|
|
|
|
if (cstate->whereClause)
|
|
cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
|
|
&mtstate->ps);
|
|
|
|
/*
|
|
* It's generally more efficient to prepare a bunch of tuples for
|
|
* insertion, and insert them in one
|
|
* table_multi_insert()/ExecForeignBatchInsert() call, than call
|
|
* table_tuple_insert()/ExecForeignInsert() separately for every tuple.
|
|
* However, there are a number of reasons why we might not be able to do
|
|
* this. These are explained below.
|
|
*/
|
|
if (resultRelInfo->ri_TrigDesc != NULL &&
|
|
(resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
|
|
resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
|
|
{
|
|
/*
|
|
* Can't support multi-inserts when there are any BEFORE/INSTEAD OF
|
|
* triggers on the table. Such triggers might query the table we're
|
|
* inserting into and act differently if the tuples that have already
|
|
* been processed and prepared for insertion are not there.
|
|
*/
|
|
insertMethod = CIM_SINGLE;
|
|
}
|
|
else if (resultRelInfo->ri_FdwRoutine != NULL &&
|
|
resultRelInfo->ri_BatchSize == 1)
|
|
{
|
|
/*
|
|
* Can't support multi-inserts to a foreign table if the FDW does not
|
|
* support batching, or it's disabled for the server or foreign table.
|
|
*/
|
|
insertMethod = CIM_SINGLE;
|
|
}
|
|
else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
|
|
resultRelInfo->ri_TrigDesc->trig_insert_new_table)
|
|
{
|
|
/*
|
|
* For partitioned tables we can't support multi-inserts when there
|
|
* are any statement level insert triggers. It might be possible to
|
|
* allow partitioned tables with such triggers in the future, but for
|
|
* now, CopyMultiInsertInfoFlush expects that any after row insert and
|
|
* statement level insert triggers are on the same relation.
|
|
*/
|
|
insertMethod = CIM_SINGLE;
|
|
}
|
|
else if (cstate->volatile_defexprs)
|
|
{
|
|
/*
|
|
* Can't support multi-inserts if there are any volatile default
|
|
* expressions in the table. Similarly to the trigger case above,
|
|
* such expressions may query the table we're inserting into.
|
|
*
|
|
* Note: It does not matter if any partitions have any volatile
|
|
* default expressions as we use the defaults from the target of the
|
|
* COPY command.
|
|
*/
|
|
insertMethod = CIM_SINGLE;
|
|
}
|
|
else if (contain_volatile_functions(cstate->whereClause))
|
|
{
|
|
/*
|
|
* Can't support multi-inserts if there are any volatile function
|
|
* expressions in WHERE clause. Similarly to the trigger case above,
|
|
* such expressions may query the table we're inserting into.
|
|
*/
|
|
insertMethod = CIM_SINGLE;
|
|
}
|
|
else
|
|
{
|
|
/*
|
|
* For partitioned tables, we may still be able to perform bulk
|
|
* inserts. However, the possibility of this depends on which types
|
|
* of triggers exist on the partition. We must disable bulk inserts
|
|
* if the partition is a foreign table that can't use batching or it
|
|
* has any before row insert or insert instead triggers (same as we
|
|
* checked above for the parent table). Since the partition's
|
|
* resultRelInfos are initialized only when we actually need to insert
|
|
* the first tuple into them, we must have the intermediate insert
|
|
* method of CIM_MULTI_CONDITIONAL to flag that we must later
|
|
* determine if we can use bulk-inserts for the partition being
|
|
* inserted into.
|
|
*/
|
|
if (proute)
|
|
insertMethod = CIM_MULTI_CONDITIONAL;
|
|
else
|
|
insertMethod = CIM_MULTI;
|
|
|
|
CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
|
|
estate, mycid, ti_options);
|
|
}
|
|
|
|
/*
|
|
* If not using batch mode (which allocates slots as needed) set up a
|
|
* tuple slot too. When inserting into a partitioned table, we also need
|
|
* one, even if we might batch insert, to read the tuple in the root
|
|
* partition's form.
|
|
*/
|
|
if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
|
|
{
|
|
singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
|
|
&estate->es_tupleTable);
|
|
bistate = GetBulkInsertState();
|
|
}
|
|
|
|
has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
|
|
resultRelInfo->ri_TrigDesc->trig_insert_before_row);
|
|
|
|
has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
|
|
resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
|
|
|
|
/*
|
|
* Check BEFORE STATEMENT insertion triggers. It's debatable whether we
|
|
* should do this for COPY, since it's not really an "INSERT" statement as
|
|
* such. However, executing these triggers maintains consistency with the
|
|
* EACH ROW triggers that we already fire on COPY.
|
|
*/
|
|
ExecBSInsertTriggers(estate, resultRelInfo);
|
|
|
|
econtext = GetPerTupleExprContext(estate);
|
|
|
|
/* Set up callback to identify error line number */
|
|
errcallback.callback = CopyFromErrorCallback;
|
|
errcallback.arg = (void *) cstate;
|
|
errcallback.previous = error_context_stack;
|
|
error_context_stack = &errcallback;
|
|
|
|
for (;;)
|
|
{
|
|
TupleTableSlot *myslot;
|
|
bool skip_tuple;
|
|
|
|
CHECK_FOR_INTERRUPTS();
|
|
|
|
/*
|
|
* Reset the per-tuple exprcontext. We do this after every tuple, to
|
|
* clean-up after expression evaluations etc.
|
|
*/
|
|
ResetPerTupleExprContext(estate);
|
|
|
|
/* select slot to (initially) load row into */
|
|
if (insertMethod == CIM_SINGLE || proute)
|
|
{
|
|
myslot = singleslot;
|
|
Assert(myslot != NULL);
|
|
}
|
|
else
|
|
{
|
|
Assert(resultRelInfo == target_resultRelInfo);
|
|
Assert(insertMethod == CIM_MULTI);
|
|
|
|
myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
|
|
resultRelInfo);
|
|
}
|
|
|
|
/*
|
|
* Switch to per-tuple context before calling NextCopyFrom, which does
|
|
* evaluate default expressions etc. and requires per-tuple context.
|
|
*/
|
|
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
|
|
|
|
ExecClearTuple(myslot);
|
|
|
|
/* Directly store the values/nulls array in the slot */
|
|
if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
|
|
break;
|
|
|
|
ExecStoreVirtualTuple(myslot);
|
|
|
|
/*
|
|
* Constraints and where clause might reference the tableoid column,
|
|
* so (re-)initialize tts_tableOid before evaluating them.
|
|
*/
|
|
myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
|
|
|
|
/* Triggers and stuff need to be invoked in query context. */
|
|
MemoryContextSwitchTo(oldcontext);
|
|
|
|
if (cstate->whereClause)
|
|
{
|
|
econtext->ecxt_scantuple = myslot;
|
|
/* Skip items that don't match COPY's WHERE clause */
|
|
if (!ExecQual(cstate->qualexpr, econtext))
|
|
{
|
|
/*
|
|
* Report that this tuple was filtered out by the WHERE
|
|
* clause.
|
|
*/
|
|
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
|
|
++excluded);
|
|
continue;
|
|
}
|
|
}
|
|
|
|
/* Determine the partition to insert the tuple into */
|
|
if (proute)
|
|
{
|
|
TupleConversionMap *map;
|
|
|
|
/*
|
|
* Attempt to find a partition suitable for this tuple.
|
|
* ExecFindPartition() will raise an error if none can be found or
|
|
* if the found partition is not suitable for INSERTs.
|
|
*/
|
|
resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
|
|
proute, myslot, estate);
|
|
|
|
if (prevResultRelInfo != resultRelInfo)
|
|
{
|
|
/* Determine which triggers exist on this partition */
|
|
has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
|
|
resultRelInfo->ri_TrigDesc->trig_insert_before_row);
|
|
|
|
has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
|
|
resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
|
|
|
|
/*
|
|
* Disable multi-inserts when the partition has BEFORE/INSTEAD
|
|
* OF triggers, or if the partition is a foreign table that
|
|
* can't use batching.
|
|
*/
|
|
leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
|
|
!has_before_insert_row_trig &&
|
|
!has_instead_insert_row_trig &&
|
|
(resultRelInfo->ri_FdwRoutine == NULL ||
|
|
resultRelInfo->ri_BatchSize > 1);
|
|
|
|
/* Set the multi-insert buffer to use for this partition. */
|
|
if (leafpart_use_multi_insert)
|
|
{
|
|
if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
|
|
CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
|
|
resultRelInfo);
|
|
}
|
|
else if (insertMethod == CIM_MULTI_CONDITIONAL &&
|
|
!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
|
|
{
|
|
/*
|
|
* Flush pending inserts if this partition can't use
|
|
* batching, so rows are visible to triggers etc.
|
|
*/
|
|
CopyMultiInsertInfoFlush(&multiInsertInfo,
|
|
resultRelInfo,
|
|
&processed);
|
|
}
|
|
|
|
if (bistate != NULL)
|
|
ReleaseBulkInsertStatePin(bistate);
|
|
prevResultRelInfo = resultRelInfo;
|
|
}
|
|
|
|
/*
|
|
* If we're capturing transition tuples, we might need to convert
|
|
* from the partition rowtype to root rowtype. But if there are no
|
|
* BEFORE triggers on the partition that could change the tuple,
|
|
* we can just remember the original unconverted tuple to avoid a
|
|
* needless round trip conversion.
|
|
*/
|
|
if (cstate->transition_capture != NULL)
|
|
cstate->transition_capture->tcs_original_insert_tuple =
|
|
!has_before_insert_row_trig ? myslot : NULL;
|
|
|
|
/*
|
|
* We might need to convert from the root rowtype to the partition
|
|
* rowtype.
|
|
*/
|
|
map = ExecGetRootToChildMap(resultRelInfo, estate);
|
|
if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
|
|
{
|
|
/* non batch insert */
|
|
if (map != NULL)
|
|
{
|
|
TupleTableSlot *new_slot;
|
|
|
|
new_slot = resultRelInfo->ri_PartitionTupleSlot;
|
|
myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
/*
|
|
* Prepare to queue up tuple for later batch insert into
|
|
* current partition.
|
|
*/
|
|
TupleTableSlot *batchslot;
|
|
|
|
/* no other path available for partitioned table */
|
|
Assert(insertMethod == CIM_MULTI_CONDITIONAL);
|
|
|
|
batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
|
|
resultRelInfo);
|
|
|
|
if (map != NULL)
|
|
myslot = execute_attr_map_slot(map->attrMap, myslot,
|
|
batchslot);
|
|
else
|
|
{
|
|
/*
|
|
* This looks more expensive than it is (Believe me, I
|
|
* optimized it away. Twice.). The input is in virtual
|
|
* form, and we'll materialize the slot below - for most
|
|
* slot types the copy performs the work materialization
|
|
* would later require anyway.
|
|
*/
|
|
ExecCopySlot(batchslot, myslot);
|
|
myslot = batchslot;
|
|
}
|
|
}
|
|
|
|
/* ensure that triggers etc see the right relation */
|
|
myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
|
|
}
|
|
|
|
skip_tuple = false;
|
|
|
|
/* BEFORE ROW INSERT Triggers */
|
|
if (has_before_insert_row_trig)
|
|
{
|
|
if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
|
|
skip_tuple = true; /* "do nothing" */
|
|
}
|
|
|
|
if (!skip_tuple)
|
|
{
|
|
/*
|
|
* If there is an INSTEAD OF INSERT ROW trigger, let it handle the
|
|
* tuple. Otherwise, proceed with inserting the tuple into the
|
|
* table or foreign table.
|
|
*/
|
|
if (has_instead_insert_row_trig)
|
|
{
|
|
ExecIRInsertTriggers(estate, resultRelInfo, myslot);
|
|
}
|
|
else
|
|
{
|
|
/* Compute stored generated columns */
|
|
if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
|
|
resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
|
|
ExecComputeStoredGenerated(resultRelInfo, estate, myslot,
|
|
CMD_INSERT);
|
|
|
|
/*
|
|
* If the target is a plain table, check the constraints of
|
|
* the tuple.
|
|
*/
|
|
if (resultRelInfo->ri_FdwRoutine == NULL &&
|
|
resultRelInfo->ri_RelationDesc->rd_att->constr)
|
|
ExecConstraints(resultRelInfo, myslot, estate);
|
|
|
|
/*
|
|
* Also check the tuple against the partition constraint, if
|
|
* there is one; except that if we got here via tuple-routing,
|
|
* we don't need to if there's no BR trigger defined on the
|
|
* partition.
|
|
*/
|
|
if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition &&
|
|
(proute == NULL || has_before_insert_row_trig))
|
|
ExecPartitionCheck(resultRelInfo, myslot, estate, true);
|
|
|
|
/* Store the slot in the multi-insert buffer, when enabled. */
|
|
if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
|
|
{
|
|
/*
|
|
* The slot previously might point into the per-tuple
|
|
* context. For batching it needs to be longer lived.
|
|
*/
|
|
ExecMaterializeSlot(myslot);
|
|
|
|
/* Add this tuple to the tuple buffer */
|
|
CopyMultiInsertInfoStore(&multiInsertInfo,
|
|
resultRelInfo, myslot,
|
|
cstate->line_buf.len,
|
|
cstate->cur_lineno);
|
|
|
|
/*
|
|
* If enough inserts have queued up, then flush all
|
|
* buffers out to their tables.
|
|
*/
|
|
if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
|
|
CopyMultiInsertInfoFlush(&multiInsertInfo,
|
|
resultRelInfo,
|
|
&processed);
|
|
|
|
/*
|
|
* We delay updating the row counter and progress of the
|
|
* COPY command until after writing the tuples stored in
|
|
* the buffer out to the table, as in single insert mode.
|
|
* See CopyMultiInsertBufferFlush().
|
|
*/
|
|
continue; /* next tuple please */
|
|
}
|
|
else
|
|
{
|
|
List *recheckIndexes = NIL;
|
|
|
|
/* OK, store the tuple */
|
|
if (resultRelInfo->ri_FdwRoutine != NULL)
|
|
{
|
|
myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
|
|
resultRelInfo,
|
|
myslot,
|
|
NULL);
|
|
|
|
if (myslot == NULL) /* "do nothing" */
|
|
continue; /* next tuple please */
|
|
|
|
/*
|
|
* AFTER ROW Triggers might reference the tableoid
|
|
* column, so (re-)initialize tts_tableOid before
|
|
* evaluating them.
|
|
*/
|
|
myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
|
|
}
|
|
else
|
|
{
|
|
/* OK, store the tuple and create index entries for it */
|
|
table_tuple_insert(resultRelInfo->ri_RelationDesc,
|
|
myslot, mycid, ti_options, bistate);
|
|
|
|
if (resultRelInfo->ri_NumIndices > 0)
|
|
recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
|
|
myslot,
|
|
estate,
|
|
false,
|
|
false,
|
|
NULL,
|
|
NIL,
|
|
false);
|
|
}
|
|
|
|
/* AFTER ROW INSERT Triggers */
|
|
ExecARInsertTriggers(estate, resultRelInfo, myslot,
|
|
recheckIndexes, cstate->transition_capture);
|
|
|
|
list_free(recheckIndexes);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* We count only tuples not suppressed by a BEFORE INSERT trigger
|
|
* or FDW; this is the same definition used by nodeModifyTable.c
|
|
* for counting tuples inserted by an INSERT command. Update
|
|
* progress of the COPY command as well.
|
|
*/
|
|
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
|
|
++processed);
|
|
}
|
|
}
|
|
|
|
/* Flush any remaining buffered tuples */
|
|
if (insertMethod != CIM_SINGLE)
|
|
{
|
|
if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
|
|
CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
|
|
}
|
|
|
|
/* Done, clean up */
|
|
error_context_stack = errcallback.previous;
|
|
|
|
if (bistate != NULL)
|
|
FreeBulkInsertState(bistate);
|
|
|
|
MemoryContextSwitchTo(oldcontext);
|
|
|
|
/* Execute AFTER STATEMENT insertion triggers */
|
|
ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
|
|
|
|
/* Handle queued AFTER triggers */
|
|
AfterTriggerEndQuery(estate);
|
|
|
|
ExecResetTupleTable(estate->es_tupleTable, false);
|
|
|
|
/* Allow the FDW to shut down */
|
|
if (target_resultRelInfo->ri_FdwRoutine != NULL &&
|
|
target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
|
|
target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
|
|
target_resultRelInfo);
|
|
|
|
/* Tear down the multi-insert buffer data */
|
|
if (insertMethod != CIM_SINGLE)
|
|
CopyMultiInsertInfoCleanup(&multiInsertInfo);
|
|
|
|
/* Close all the partitioned tables, leaf partitions, and their indices */
|
|
if (proute)
|
|
ExecCleanupTupleRouting(mtstate, proute);
|
|
|
|
/* Close the result relations, including any trigger target relations */
|
|
ExecCloseResultRelations(estate);
|
|
ExecCloseRangeTableRelations(estate);
|
|
|
|
FreeExecutorState(estate);
|
|
|
|
return processed;
|
|
}
|
|
|
|
/*
|
|
* Setup to read tuples from a file for COPY FROM.
|
|
*
|
|
* 'rel': Used as a template for the tuples
|
|
* 'whereClause': WHERE clause from the COPY FROM command
|
|
* 'filename': Name of server-local file to read, NULL for STDIN
|
|
* 'is_program': true if 'filename' is program to execute
|
|
* 'data_source_cb': callback that provides the input data
|
|
* 'attnamelist': List of char *, columns to include. NIL selects all cols.
|
|
* 'options': List of DefElem. See copy_opt_item in gram.y for selections.
|
|
*
|
|
* Returns a CopyFromState, to be passed to NextCopyFrom and related functions.
|
|
*/
|
|
CopyFromState
|
|
BeginCopyFrom(ParseState *pstate,
|
|
Relation rel,
|
|
Node *whereClause,
|
|
const char *filename,
|
|
bool is_program,
|
|
copy_data_source_cb data_source_cb,
|
|
List *attnamelist,
|
|
List *options)
|
|
{
|
|
CopyFromState cstate;
|
|
bool pipe = (filename == NULL);
|
|
TupleDesc tupDesc;
|
|
AttrNumber num_phys_attrs,
|
|
num_defaults;
|
|
FmgrInfo *in_functions;
|
|
Oid *typioparams;
|
|
Oid in_func_oid;
|
|
int *defmap;
|
|
ExprState **defexprs;
|
|
MemoryContext oldcontext;
|
|
bool volatile_defexprs;
|
|
const int progress_cols[] = {
|
|
PROGRESS_COPY_COMMAND,
|
|
PROGRESS_COPY_TYPE,
|
|
PROGRESS_COPY_BYTES_TOTAL
|
|
};
|
|
int64 progress_vals[] = {
|
|
PROGRESS_COPY_COMMAND_FROM,
|
|
0,
|
|
0
|
|
};
|
|
|
|
/* Allocate workspace and zero all fields */
|
|
cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
|
|
|
|
/*
|
|
* We allocate everything used by a cstate in a new memory context. This
|
|
* avoids memory leaks during repeated use of COPY in a query.
|
|
*/
|
|
cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
|
|
"COPY",
|
|
ALLOCSET_DEFAULT_SIZES);
|
|
|
|
oldcontext = MemoryContextSwitchTo(cstate->copycontext);
|
|
|
|
/* Extract options from the statement node tree */
|
|
ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
|
|
|
|
/* Process the target relation */
|
|
cstate->rel = rel;
|
|
|
|
tupDesc = RelationGetDescr(cstate->rel);
|
|
|
|
/* process common options or initialization */
|
|
|
|
/* Generate or convert list of attributes to process */
|
|
cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
|
|
|
|
num_phys_attrs = tupDesc->natts;
|
|
|
|
/* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
|
|
cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
|
|
if (cstate->opts.force_notnull)
|
|
{
|
|
List *attnums;
|
|
ListCell *cur;
|
|
|
|
attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull);
|
|
|
|
foreach(cur, attnums)
|
|
{
|
|
int attnum = lfirst_int(cur);
|
|
Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
|
|
|
|
if (!list_member_int(cstate->attnumlist, attnum))
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
|
|
errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
|
|
NameStr(attr->attname))));
|
|
cstate->opts.force_notnull_flags[attnum - 1] = true;
|
|
}
|
|
}
|
|
|
|
/* Convert FORCE_NULL name list to per-column flags, check validity */
|
|
cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
|
|
if (cstate->opts.force_null)
|
|
{
|
|
List *attnums;
|
|
ListCell *cur;
|
|
|
|
attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null);
|
|
|
|
foreach(cur, attnums)
|
|
{
|
|
int attnum = lfirst_int(cur);
|
|
Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
|
|
|
|
if (!list_member_int(cstate->attnumlist, attnum))
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
|
|
errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
|
|
NameStr(attr->attname))));
|
|
cstate->opts.force_null_flags[attnum - 1] = true;
|
|
}
|
|
}
|
|
|
|
/* Convert convert_selectively name list to per-column flags */
|
|
if (cstate->opts.convert_selectively)
|
|
{
|
|
List *attnums;
|
|
ListCell *cur;
|
|
|
|
cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
|
|
|
|
attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select);
|
|
|
|
foreach(cur, attnums)
|
|
{
|
|
int attnum = lfirst_int(cur);
|
|
Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
|
|
|
|
if (!list_member_int(cstate->attnumlist, attnum))
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
|
|
errmsg_internal("selected column \"%s\" not referenced by COPY",
|
|
NameStr(attr->attname))));
|
|
cstate->convert_select_flags[attnum - 1] = true;
|
|
}
|
|
}
|
|
|
|
/* Use client encoding when ENCODING option is not specified. */
|
|
if (cstate->opts.file_encoding < 0)
|
|
cstate->file_encoding = pg_get_client_encoding();
|
|
else
|
|
cstate->file_encoding = cstate->opts.file_encoding;
|
|
|
|
/*
|
|
* Look up encoding conversion function.
|
|
*/
|
|
if (cstate->file_encoding == GetDatabaseEncoding() ||
|
|
cstate->file_encoding == PG_SQL_ASCII ||
|
|
GetDatabaseEncoding() == PG_SQL_ASCII)
|
|
{
|
|
cstate->need_transcoding = false;
|
|
}
|
|
else
|
|
{
|
|
cstate->need_transcoding = true;
|
|
cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding,
|
|
GetDatabaseEncoding());
|
|
}
|
|
|
|
cstate->copy_src = COPY_FILE; /* default */
|
|
|
|
cstate->whereClause = whereClause;
|
|
|
|
/* Initialize state variables */
|
|
cstate->eol_type = EOL_UNKNOWN;
|
|
cstate->cur_relname = RelationGetRelationName(cstate->rel);
|
|
cstate->cur_lineno = 0;
|
|
cstate->cur_attname = NULL;
|
|
cstate->cur_attval = NULL;
|
|
cstate->relname_only = false;
|
|
|
|
/*
|
|
* Allocate buffers for the input pipeline.
|
|
*
|
|
* attribute_buf and raw_buf are used in both text and binary modes, but
|
|
* input_buf and line_buf only in text mode.
|
|
*/
|
|
cstate->raw_buf = palloc(RAW_BUF_SIZE + 1);
|
|
cstate->raw_buf_index = cstate->raw_buf_len = 0;
|
|
cstate->raw_reached_eof = false;
|
|
|
|
if (!cstate->opts.binary)
|
|
{
|
|
/*
|
|
* If encoding conversion is needed, we need another buffer to hold
|
|
* the converted input data. Otherwise, we can just point input_buf
|
|
* to the same buffer as raw_buf.
|
|
*/
|
|
if (cstate->need_transcoding)
|
|
{
|
|
cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
|
|
cstate->input_buf_index = cstate->input_buf_len = 0;
|
|
}
|
|
else
|
|
cstate->input_buf = cstate->raw_buf;
|
|
cstate->input_reached_eof = false;
|
|
|
|
initStringInfo(&cstate->line_buf);
|
|
}
|
|
|
|
initStringInfo(&cstate->attribute_buf);
|
|
|
|
/* Assign range table and rteperminfos, we'll need them in CopyFrom. */
|
|
if (pstate)
|
|
{
|
|
cstate->range_table = pstate->p_rtable;
|
|
cstate->rteperminfos = pstate->p_rteperminfos;
|
|
}
|
|
|
|
tupDesc = RelationGetDescr(cstate->rel);
|
|
num_phys_attrs = tupDesc->natts;
|
|
num_defaults = 0;
|
|
volatile_defexprs = false;
|
|
|
|
/*
|
|
* Pick up the required catalog information for each attribute in the
|
|
* relation, including the input function, the element type (to pass to
|
|
* the input function), and info about defaults and constraints. (Which
|
|
* input function we use depends on text/binary format choice.)
|
|
*/
|
|
in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
|
|
typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
|
|
defmap = (int *) palloc(num_phys_attrs * sizeof(int));
|
|
defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
|
|
|
|
for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
|
|
{
|
|
Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
|
|
|
|
/* We don't need info for dropped attributes */
|
|
if (att->attisdropped)
|
|
continue;
|
|
|
|
/* Fetch the input function and typioparam info */
|
|
if (cstate->opts.binary)
|
|
getTypeBinaryInputInfo(att->atttypid,
|
|
&in_func_oid, &typioparams[attnum - 1]);
|
|
else
|
|
getTypeInputInfo(att->atttypid,
|
|
&in_func_oid, &typioparams[attnum - 1]);
|
|
fmgr_info(in_func_oid, &in_functions[attnum - 1]);
|
|
|
|
/* Get default info if available */
|
|
defexprs[attnum - 1] = NULL;
|
|
|
|
if (!att->attgenerated)
|
|
{
|
|
Expr *defexpr = (Expr *) build_column_default(cstate->rel,
|
|
attnum);
|
|
|
|
if (defexpr != NULL)
|
|
{
|
|
/* Run the expression through planner */
|
|
defexpr = expression_planner(defexpr);
|
|
|
|
/* Initialize executable expression in copycontext */
|
|
defexprs[attnum - 1] = ExecInitExpr(defexpr, NULL);
|
|
|
|
/* if NOT copied from input */
|
|
/* use default value if one exists */
|
|
if (!list_member_int(cstate->attnumlist, attnum))
|
|
{
|
|
defmap[num_defaults] = attnum - 1;
|
|
num_defaults++;
|
|
}
|
|
|
|
/*
|
|
* If a default expression looks at the table being loaded,
|
|
* then it could give the wrong answer when using
|
|
* multi-insert. Since database access can be dynamic this is
|
|
* hard to test for exactly, so we use the much wider test of
|
|
* whether the default expression is volatile. We allow for
|
|
* the special case of when the default expression is the
|
|
* nextval() of a sequence which in this specific case is
|
|
* known to be safe for use with the multi-insert
|
|
* optimization. Hence we use this special case function
|
|
* checker rather than the standard check for
|
|
* contain_volatile_functions().
|
|
*/
|
|
if (!volatile_defexprs)
|
|
volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
|
|
}
|
|
}
|
|
}
|
|
|
|
cstate->defaults = (bool *) palloc0(tupDesc->natts * sizeof(bool));
|
|
|
|
/* initialize progress */
|
|
pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
|
|
cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
|
|
cstate->bytes_processed = 0;
|
|
|
|
/* We keep those variables in cstate. */
|
|
cstate->in_functions = in_functions;
|
|
cstate->typioparams = typioparams;
|
|
cstate->defmap = defmap;
|
|
cstate->defexprs = defexprs;
|
|
cstate->volatile_defexprs = volatile_defexprs;
|
|
cstate->num_defaults = num_defaults;
|
|
cstate->is_program = is_program;
|
|
|
|
if (data_source_cb)
|
|
{
|
|
progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
|
|
cstate->copy_src = COPY_CALLBACK;
|
|
cstate->data_source_cb = data_source_cb;
|
|
}
|
|
else if (pipe)
|
|
{
|
|
progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
|
|
Assert(!is_program); /* the grammar does not allow this */
|
|
if (whereToSendOutput == DestRemote)
|
|
ReceiveCopyBegin(cstate);
|
|
else
|
|
cstate->copy_file = stdin;
|
|
}
|
|
else
|
|
{
|
|
cstate->filename = pstrdup(filename);
|
|
|
|
if (cstate->is_program)
|
|
{
|
|
progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
|
|
cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
|
|
if (cstate->copy_file == NULL)
|
|
ereport(ERROR,
|
|
(errcode_for_file_access(),
|
|
errmsg("could not execute command \"%s\": %m",
|
|
cstate->filename)));
|
|
}
|
|
else
|
|
{
|
|
struct stat st;
|
|
|
|
progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
|
|
cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
|
|
if (cstate->copy_file == NULL)
|
|
{
|
|
/* copy errno because ereport subfunctions might change it */
|
|
int save_errno = errno;
|
|
|
|
ereport(ERROR,
|
|
(errcode_for_file_access(),
|
|
errmsg("could not open file \"%s\" for reading: %m",
|
|
cstate->filename),
|
|
(save_errno == ENOENT || save_errno == EACCES) ?
|
|
errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
|
|
"You may want a client-side facility such as psql's \\copy.") : 0));
|
|
}
|
|
|
|
if (fstat(fileno(cstate->copy_file), &st))
|
|
ereport(ERROR,
|
|
(errcode_for_file_access(),
|
|
errmsg("could not stat file \"%s\": %m",
|
|
cstate->filename)));
|
|
|
|
if (S_ISDIR(st.st_mode))
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
|
errmsg("\"%s\" is a directory", cstate->filename)));
|
|
|
|
progress_vals[2] = st.st_size;
|
|
}
|
|
}
|
|
|
|
pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
|
|
|
|
if (cstate->opts.binary)
|
|
{
|
|
/* Read and verify binary header */
|
|
ReceiveCopyBinaryHeader(cstate);
|
|
}
|
|
|
|
/* create workspace for CopyReadAttributes results */
|
|
if (!cstate->opts.binary)
|
|
{
|
|
AttrNumber attr_count = list_length(cstate->attnumlist);
|
|
|
|
cstate->max_fields = attr_count;
|
|
cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
|
|
}
|
|
|
|
MemoryContextSwitchTo(oldcontext);
|
|
|
|
return cstate;
|
|
}
|
|
|
|
/*
|
|
* Clean up storage and release resources for COPY FROM.
|
|
*/
|
|
void
|
|
EndCopyFrom(CopyFromState cstate)
|
|
{
|
|
/* No COPY FROM related resources except memory. */
|
|
if (cstate->is_program)
|
|
{
|
|
ClosePipeFromProgram(cstate);
|
|
}
|
|
else
|
|
{
|
|
if (cstate->filename != NULL && FreeFile(cstate->copy_file))
|
|
ereport(ERROR,
|
|
(errcode_for_file_access(),
|
|
errmsg("could not close file \"%s\": %m",
|
|
cstate->filename)));
|
|
}
|
|
|
|
pgstat_progress_end_command();
|
|
|
|
MemoryContextDelete(cstate->copycontext);
|
|
pfree(cstate);
|
|
}
|
|
|
|
/*
|
|
* Closes the pipe from an external program, checking the pclose() return code.
|
|
*/
|
|
static void
|
|
ClosePipeFromProgram(CopyFromState cstate)
|
|
{
|
|
int pclose_rc;
|
|
|
|
Assert(cstate->is_program);
|
|
|
|
pclose_rc = ClosePipeStream(cstate->copy_file);
|
|
if (pclose_rc == -1)
|
|
ereport(ERROR,
|
|
(errcode_for_file_access(),
|
|
errmsg("could not close pipe to external command: %m")));
|
|
else if (pclose_rc != 0)
|
|
{
|
|
/*
|
|
* If we ended a COPY FROM PROGRAM before reaching EOF, then it's
|
|
* expectable for the called program to fail with SIGPIPE, and we
|
|
* should not report that as an error. Otherwise, SIGPIPE indicates a
|
|
* problem.
|
|
*/
|
|
if (!cstate->raw_reached_eof &&
|
|
wait_result_is_signal(pclose_rc, SIGPIPE))
|
|
return;
|
|
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
|
|
errmsg("program \"%s\" failed",
|
|
cstate->filename),
|
|
errdetail_internal("%s", wait_result_to_str(pclose_rc))));
|
|
}
|
|
}
|