Fix crashes on plans with multiple Gather (Merge) nodes.
es_query_dsa turns out to be broken by design, because it supposes that there is only one DSA for the whole query, whereas there is actually one per Gather (Merge) node. For now, work around that problem by setting and clearing the pointer around the sections of code that might need it. It's probably a better idea to get rid of es_query_dsa altogether in favor of having each node keep track individually of which DSA is relevant, but that seems like more than we would want to back-patch. Thomas Munro, reviewed and tested by Andreas Seltenreich, Amit Kapila, and by me. Discussion: http://postgr.es/m/CAEepm=1U6as=brnVvMNixEV2tpi8NuyQoTmO8Qef0-VV+=7MDA@mail.gmail.com
This commit is contained in:
parent
7731c32087
commit
fd7c0fa732
@ -330,7 +330,7 @@ EstimateParamExecSpace(EState *estate, Bitmapset *params)
|
|||||||
* parameter array) and then the datum as serialized by datumSerialize().
|
* parameter array) and then the datum as serialized by datumSerialize().
|
||||||
*/
|
*/
|
||||||
static dsa_pointer
|
static dsa_pointer
|
||||||
SerializeParamExecParams(EState *estate, Bitmapset *params)
|
SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
|
||||||
{
|
{
|
||||||
Size size;
|
Size size;
|
||||||
int nparams;
|
int nparams;
|
||||||
@ -341,8 +341,8 @@ SerializeParamExecParams(EState *estate, Bitmapset *params)
|
|||||||
|
|
||||||
/* Allocate enough space for the current parameter values. */
|
/* Allocate enough space for the current parameter values. */
|
||||||
size = EstimateParamExecSpace(estate, params);
|
size = EstimateParamExecSpace(estate, params);
|
||||||
handle = dsa_allocate(estate->es_query_dsa, size);
|
handle = dsa_allocate(area, size);
|
||||||
start_address = dsa_get_address(estate->es_query_dsa, handle);
|
start_address = dsa_get_address(area, handle);
|
||||||
|
|
||||||
/* First write the number of parameters as a 4-byte integer. */
|
/* First write the number of parameters as a 4-byte integer. */
|
||||||
nparams = bms_num_members(params);
|
nparams = bms_num_members(params);
|
||||||
@ -736,12 +736,6 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
|
|||||||
LWTRANCHE_PARALLEL_QUERY_DSA,
|
LWTRANCHE_PARALLEL_QUERY_DSA,
|
||||||
pcxt->seg);
|
pcxt->seg);
|
||||||
|
|
||||||
/*
|
|
||||||
* Make the area available to executor nodes running in the leader.
|
|
||||||
* See also ParallelQueryMain which makes it available to workers.
|
|
||||||
*/
|
|
||||||
estate->es_query_dsa = pei->area;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Serialize parameters, if any, using DSA storage. We don't dare use
|
* Serialize parameters, if any, using DSA storage. We don't dare use
|
||||||
* the main parallel query DSM for this because we might relaunch
|
* the main parallel query DSM for this because we might relaunch
|
||||||
@ -750,7 +744,8 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
|
|||||||
*/
|
*/
|
||||||
if (!bms_is_empty(sendParams))
|
if (!bms_is_empty(sendParams))
|
||||||
{
|
{
|
||||||
pei->param_exec = SerializeParamExecParams(estate, sendParams);
|
pei->param_exec = SerializeParamExecParams(estate, sendParams,
|
||||||
|
pei->area);
|
||||||
fpes->param_exec = pei->param_exec;
|
fpes->param_exec = pei->param_exec;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -763,7 +758,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
|
|||||||
d.pcxt = pcxt;
|
d.pcxt = pcxt;
|
||||||
d.instrumentation = instrumentation;
|
d.instrumentation = instrumentation;
|
||||||
d.nnodes = 0;
|
d.nnodes = 0;
|
||||||
|
|
||||||
|
/* Install our DSA area while initializing the plan. */
|
||||||
|
estate->es_query_dsa = pei->area;
|
||||||
ExecParallelInitializeDSM(planstate, &d);
|
ExecParallelInitializeDSM(planstate, &d);
|
||||||
|
estate->es_query_dsa = NULL;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Make sure that the world hasn't shifted under our feet. This could
|
* Make sure that the world hasn't shifted under our feet. This could
|
||||||
@ -832,19 +831,22 @@ ExecParallelReinitialize(PlanState *planstate,
|
|||||||
/* Free any serialized parameters from the last round. */
|
/* Free any serialized parameters from the last round. */
|
||||||
if (DsaPointerIsValid(fpes->param_exec))
|
if (DsaPointerIsValid(fpes->param_exec))
|
||||||
{
|
{
|
||||||
dsa_free(estate->es_query_dsa, fpes->param_exec);
|
dsa_free(pei->area, fpes->param_exec);
|
||||||
fpes->param_exec = InvalidDsaPointer;
|
fpes->param_exec = InvalidDsaPointer;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Serialize current parameter values if required. */
|
/* Serialize current parameter values if required. */
|
||||||
if (!bms_is_empty(sendParams))
|
if (!bms_is_empty(sendParams))
|
||||||
{
|
{
|
||||||
pei->param_exec = SerializeParamExecParams(estate, sendParams);
|
pei->param_exec = SerializeParamExecParams(estate, sendParams,
|
||||||
|
pei->area);
|
||||||
fpes->param_exec = pei->param_exec;
|
fpes->param_exec = pei->param_exec;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Traverse plan tree and let each child node reset associated state. */
|
/* Traverse plan tree and let each child node reset associated state. */
|
||||||
|
estate->es_query_dsa = pei->area;
|
||||||
ExecParallelReInitializeDSM(planstate, pei->pcxt);
|
ExecParallelReInitializeDSM(planstate, pei->pcxt);
|
||||||
|
estate->es_query_dsa = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -277,7 +277,13 @@ gather_getnext(GatherState *gatherstate)
|
|||||||
|
|
||||||
if (gatherstate->need_to_scan_locally)
|
if (gatherstate->need_to_scan_locally)
|
||||||
{
|
{
|
||||||
|
EState *estate = gatherstate->ps.state;
|
||||||
|
|
||||||
|
/* Install our DSA area while executing the plan. */
|
||||||
|
estate->es_query_dsa =
|
||||||
|
gatherstate->pei ? gatherstate->pei->area : NULL;
|
||||||
outerTupleSlot = ExecProcNode(outerPlan);
|
outerTupleSlot = ExecProcNode(outerPlan);
|
||||||
|
estate->es_query_dsa = NULL;
|
||||||
|
|
||||||
if (!TupIsNull(outerTupleSlot))
|
if (!TupIsNull(outerTupleSlot))
|
||||||
return outerTupleSlot;
|
return outerTupleSlot;
|
||||||
|
@ -637,8 +637,12 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
|
|||||||
{
|
{
|
||||||
PlanState *outerPlan = outerPlanState(gm_state);
|
PlanState *outerPlan = outerPlanState(gm_state);
|
||||||
TupleTableSlot *outerTupleSlot;
|
TupleTableSlot *outerTupleSlot;
|
||||||
|
EState *estate = gm_state->ps.state;
|
||||||
|
|
||||||
|
/* Install our DSA area while executing the plan. */
|
||||||
|
estate->es_query_dsa = gm_state->pei ? gm_state->pei->area : NULL;
|
||||||
outerTupleSlot = ExecProcNode(outerPlan);
|
outerTupleSlot = ExecProcNode(outerPlan);
|
||||||
|
estate->es_query_dsa = NULL;
|
||||||
|
|
||||||
if (!TupIsNull(outerTupleSlot))
|
if (!TupIsNull(outerTupleSlot))
|
||||||
{
|
{
|
||||||
|
Loading…
x
Reference in New Issue
Block a user