diff --git a/src/backend/access/common/printtup.c b/src/backend/access/common/printtup.c index 1939ff5155..d9664aa6c6 100644 --- a/src/backend/access/common/printtup.c +++ b/src/backend/access/common/printtup.c @@ -26,9 +26,9 @@ static void printtup_startup(DestReceiver *self, int operation, TupleDesc typeinfo); -static void printtup(TupleTableSlot *slot, DestReceiver *self); -static void printtup_20(TupleTableSlot *slot, DestReceiver *self); -static void printtup_internal_20(TupleTableSlot *slot, DestReceiver *self); +static bool printtup(TupleTableSlot *slot, DestReceiver *self); +static bool printtup_20(TupleTableSlot *slot, DestReceiver *self); +static bool printtup_internal_20(TupleTableSlot *slot, DestReceiver *self); static void printtup_shutdown(DestReceiver *self); static void printtup_destroy(DestReceiver *self); @@ -299,7 +299,7 @@ printtup_prepare_info(DR_printtup *myState, TupleDesc typeinfo, int numAttrs) * printtup --- print a tuple in protocol 3.0 * ---------------- */ -static void +static bool printtup(TupleTableSlot *slot, DestReceiver *self) { TupleDesc typeinfo = slot->tts_tupleDescriptor; @@ -376,13 +376,15 @@ printtup(TupleTableSlot *slot, DestReceiver *self) /* Return to caller's context, and flush row's temporary memory */ MemoryContextSwitchTo(oldcontext); MemoryContextReset(myState->tmpcontext); + + return true; } /* ---------------- * printtup_20 --- print a tuple in protocol 2.0 * ---------------- */ -static void +static bool printtup_20(TupleTableSlot *slot, DestReceiver *self) { TupleDesc typeinfo = slot->tts_tupleDescriptor; @@ -452,6 +454,8 @@ printtup_20(TupleTableSlot *slot, DestReceiver *self) /* Return to caller's context, and flush row's temporary memory */ MemoryContextSwitchTo(oldcontext); MemoryContextReset(myState->tmpcontext); + + return true; } /* ---------------- @@ -528,7 +532,7 @@ debugStartup(DestReceiver *self, int operation, TupleDesc typeinfo) * debugtup - print one tuple for an interactive backend * ---------------- */ -void +bool debugtup(TupleTableSlot *slot, DestReceiver *self) { TupleDesc typeinfo = slot->tts_tupleDescriptor; @@ -553,6 +557,8 @@ debugtup(TupleTableSlot *slot, DestReceiver *self) printatt((unsigned) i + 1, typeinfo->attrs[i], value); } printf("\t----\n"); + + return true; } /* ---------------- @@ -564,7 +570,7 @@ debugtup(TupleTableSlot *slot, DestReceiver *self) * This is largely same as printtup_20, except we use binary formatting. * ---------------- */ -static void +static bool printtup_internal_20(TupleTableSlot *slot, DestReceiver *self) { TupleDesc typeinfo = slot->tts_tupleDescriptor; @@ -636,4 +642,6 @@ printtup_internal_20(TupleTableSlot *slot, DestReceiver *self) /* Return to caller's context, and flush row's temporary memory */ MemoryContextSwitchTo(oldcontext); MemoryContextReset(myState->tmpcontext); + + return true; } diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 3201476c9e..28dcd34001 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -4454,7 +4454,7 @@ copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo) /* * copy_dest_receive --- receive one tuple */ -static void +static bool copy_dest_receive(TupleTableSlot *slot, DestReceiver *self) { DR_copy *myState = (DR_copy *) self; @@ -4466,6 +4466,8 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self) /* And send the data */ CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull); myState->processed++; + + return true; } /* diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index cb7a145ee5..5a853c48a8 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -62,7 +62,7 @@ typedef struct static ObjectAddress CreateAsReladdr = {InvalidOid, InvalidOid, 0}; static void intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo); -static void intorel_receive(TupleTableSlot *slot, DestReceiver *self); +static bool intorel_receive(TupleTableSlot *slot, DestReceiver *self); static void intorel_shutdown(DestReceiver *self); static void intorel_destroy(DestReceiver *self); @@ -482,7 +482,7 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) /* * intorel_receive --- receive one tuple */ -static void +static bool intorel_receive(TupleTableSlot *slot, DestReceiver *self) { DR_intorel *myState = (DR_intorel *) self; @@ -507,6 +507,8 @@ intorel_receive(TupleTableSlot *slot, DestReceiver *self) myState->bistate); /* We know this is a newly created relation, so there are no indexes */ + + return true; } /* diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index f00aab39e7..62e61a2674 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -56,7 +56,7 @@ typedef struct static int matview_maintenance_depth = 0; static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo); -static void transientrel_receive(TupleTableSlot *slot, DestReceiver *self); +static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self); static void transientrel_shutdown(DestReceiver *self); static void transientrel_destroy(DestReceiver *self); static void refresh_matview_datafill(DestReceiver *dest, Query *query, @@ -467,7 +467,7 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo) /* * transientrel_receive --- receive one tuple */ -static void +static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self) { DR_transientrel *myState = (DR_transientrel *) self; @@ -486,6 +486,8 @@ transientrel_receive(TupleTableSlot *slot, DestReceiver *self) myState->bistate); /* We know this is a newly created relation, so there are no indexes */ + + return true; } /* diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index ac0230411c..b5ced388d2 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -1593,7 +1593,15 @@ ExecutePlan(EState *estate, * practice, this is probably always the case at this point.) */ if (sendTuples) - (*dest->receiveSlot) (slot, dest); + { + /* + * If we are not able to send the tuple, we assume the destination + * has closed and no more tuples can be sent. If that's the case, + * end the loop. + */ + if (!((*dest->receiveSlot) (slot, dest))) + break; + } /* * Count tuples processed, if this is a SELECT. (For other operation diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c index 2b81f60a51..533050dc85 100644 --- a/src/backend/executor/execTuples.c +++ b/src/backend/executor/execTuples.c @@ -1266,7 +1266,7 @@ do_tup_output(TupOutputState *tstate, Datum *values, bool *isnull) ExecStoreVirtualTuple(slot); /* send the tuple to the receiver */ - (*tstate->dest->receiveSlot) (slot, tstate->dest); + (void) (*tstate->dest->receiveSlot) (slot, tstate->dest); /* clean up */ ExecClearTuple(slot); diff --git a/src/backend/executor/functions.c b/src/backend/executor/functions.c index 6e14c9d296..cd93c045dc 100644 --- a/src/backend/executor/functions.c +++ b/src/backend/executor/functions.c @@ -167,7 +167,7 @@ static Datum postquel_get_single_result(TupleTableSlot *slot, static void sql_exec_error_callback(void *arg); static void ShutdownSQLFunction(Datum arg); static void sqlfunction_startup(DestReceiver *self, int operation, TupleDesc typeinfo); -static void sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self); +static bool sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self); static void sqlfunction_shutdown(DestReceiver *self); static void sqlfunction_destroy(DestReceiver *self); @@ -1904,7 +1904,7 @@ sqlfunction_startup(DestReceiver *self, int operation, TupleDesc typeinfo) /* * sqlfunction_receive --- receive one tuple */ -static void +static bool sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self) { DR_sqlfunction *myState = (DR_sqlfunction *) self; @@ -1914,6 +1914,8 @@ sqlfunction_receive(TupleTableSlot *slot, DestReceiver *self) /* Store the filtered tuple into the tuplestore */ tuplestore_puttupleslot(myState->tstore, slot); + + return true; } /* diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c index 23cb6f407d..7ccabdb44b 100644 --- a/src/backend/executor/spi.c +++ b/src/backend/executor/spi.c @@ -1774,7 +1774,7 @@ spi_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo) * store tuple retrieved by Executor into SPITupleTable * of current SPI procedure */ -void +bool spi_printtup(TupleTableSlot *slot, DestReceiver *self) { SPITupleTable *tuptable; @@ -1809,6 +1809,8 @@ spi_printtup(TupleTableSlot *slot, DestReceiver *self) (tuptable->free)--; MemoryContextSwitchTo(oldcxt); + + return true; } /* diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c index 383b5352cb..8abb1f16e4 100644 --- a/src/backend/executor/tqueue.c +++ b/src/backend/executor/tqueue.c @@ -115,12 +115,13 @@ static RemapInfo *BuildRemapInfo(TupleDesc tupledesc); * type over a range type over a range type over an array type over a record, * or something like that. */ -static void +static bool tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self) { TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; TupleDesc tupledesc = slot->tts_tupleDescriptor; HeapTuple tuple; + shm_mq_result result; /* * Test to see whether the tupledesc has changed; if so, set up for the @@ -195,7 +196,16 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self) } /* Send the tuple itself. */ - shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false); + result = shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false); + + if (result == SHM_MQ_DETACHED) + return false; + else if (result != SHM_MQ_SUCCESS) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("unable to send tuples"))); + + return true; } /* diff --git a/src/backend/executor/tstoreReceiver.c b/src/backend/executor/tstoreReceiver.c index 516440ad32..8f1e1b3f50 100644 --- a/src/backend/executor/tstoreReceiver.c +++ b/src/backend/executor/tstoreReceiver.c @@ -37,8 +37,8 @@ typedef struct } TStoreState; -static void tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self); -static void tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self); +static bool tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self); +static bool tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self); /* @@ -90,19 +90,21 @@ tstoreStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo) * Receive a tuple from the executor and store it in the tuplestore. * This is for the easy case where we don't have to detoast. */ -static void +static bool tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self) { TStoreState *myState = (TStoreState *) self; tuplestore_puttupleslot(myState->tstore, slot); + + return true; } /* * Receive a tuple from the executor and store it in the tuplestore. * This is for the case where we have to detoast any toasted values. */ -static void +static bool tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self) { TStoreState *myState = (TStoreState *) self; @@ -152,6 +154,8 @@ tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self) /* And release any temporary detoasted values */ for (i = 0; i < nfree; i++) pfree(DatumGetPointer(myState->tofree[i])); + + return true; } /* diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c index 2c7dc6e526..de45cbc4fb 100644 --- a/src/backend/tcop/dest.c +++ b/src/backend/tcop/dest.c @@ -45,9 +45,10 @@ * dummy DestReceiver functions * ---------------- */ -static void +static bool donothingReceive(TupleTableSlot *slot, DestReceiver *self) { + return true; } static void diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index fcdc4c347c..3f6cb12b4e 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -1109,7 +1109,13 @@ RunFromStore(Portal portal, ScanDirection direction, uint64 count, if (!ok) break; - (*dest->receiveSlot) (slot, dest); + /* + * If we are not able to send the tuple, we assume the destination + * has closed and no more tuples can be sent. If that's the case, + * end the loop. + */ + if (!((*dest->receiveSlot) (slot, dest))) + break; ExecClearTuple(slot); diff --git a/src/include/access/printtup.h b/src/include/access/printtup.h index 64dde01cd1..608c564287 100644 --- a/src/include/access/printtup.h +++ b/src/include/access/printtup.h @@ -25,11 +25,11 @@ extern void SendRowDescriptionMessage(TupleDesc typeinfo, List *targetlist, extern void debugStartup(DestReceiver *self, int operation, TupleDesc typeinfo); -extern void debugtup(TupleTableSlot *slot, DestReceiver *self); +extern bool debugtup(TupleTableSlot *slot, DestReceiver *self); /* XXX these are really in executor/spi.c */ extern void spi_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo); -extern void spi_printtup(TupleTableSlot *slot, DestReceiver *self); +extern bool spi_printtup(TupleTableSlot *slot, DestReceiver *self); #endif /* PRINTTUP_H */ diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h index 4e42d61c37..dd80433f74 100644 --- a/src/include/tcop/dest.h +++ b/src/include/tcop/dest.h @@ -104,7 +104,9 @@ typedef enum * pointers that the executor must call. * * Note: the receiveSlot routine must be passed a slot containing a TupleDesc - * identical to the one given to the rStartup routine. + * identical to the one given to the rStartup routine. It returns bool where + * a "true" value means "continue processing" and a "false" value means + * "stop early, just as if we'd reached the end of the scan". * ---------------- */ typedef struct _DestReceiver DestReceiver; @@ -112,7 +114,7 @@ typedef struct _DestReceiver DestReceiver; struct _DestReceiver { /* Called for each tuple to be output: */ - void (*receiveSlot) (TupleTableSlot *slot, + bool (*receiveSlot) (TupleTableSlot *slot, DestReceiver *self); /* Per-executor-run initialization and shutdown: */ void (*rStartup) (DestReceiver *self,