mirror of https://github.com/postgres/postgres
Fix race in parallel hash join batch cleanup, take II.
With unlucky timing and parallel_leader_participation=off (not the
default), PHJ could attempt to access per-batch shared state just as it
was being freed. There was code intended to prevent that by checking
for a cleared pointer, but it was racy. Fix, by introducing an extra
barrier phase. The new phase PHJ_BUILD_RUNNING means that it's safe to
access the per-batch state to find a batch to help with, and
PHJ_BUILD_DONE means that it is too late. The last to detach will free
the array of per-batch state as before, but now it will also atomically
advance the phase, so that late attachers can avoid the hazard. This
mirrors the way per-batch hash tables are freed (see phases
PHJ_BATCH_PROBING and PHJ_BATCH_DONE).
An earlier attempt to fix this (commit 3b8981b6
, later reverted) missed
one special case. When the inner side is empty (the "empty inner
optimization), the build barrier would only make it to
PHJ_BUILD_HASHING_INNER phase before workers attempted to detach from
the hashtable. In that case, fast-forward the build barrier to
PHJ_BUILD_RUNNING before proceeding, so that our later assertions hold
and we can still negotiate who is cleaning up.
Revealed by build farm failures, where BarrierAttach() failed a sanity
check assertion, because the memory had been clobbered by dsa_free().
In non-assert builds, the result could be a segmentation fault.
Back-patch to all supported releases.
Author: Thomas Munro <thomas.munro@gmail.com>
Author: Melanie Plageman <melanieplageman@gmail.com>
Reported-by: Michael Paquier <michael@paquier.xyz>
Reported-by: David Geier <geidav.pg@gmail.com>
Tested-by: David Geier <geidav.pg@gmail.com>
Discussion: https://postgr.es/m/20200929061142.GA29096%40paquier.xyz
This commit is contained in:
parent
ef719e7b32
commit
8d578b9b2e
|
@ -333,14 +333,21 @@ MultiExecParallelHash(HashState *node)
|
||||||
hashtable->nbuckets = pstate->nbuckets;
|
hashtable->nbuckets = pstate->nbuckets;
|
||||||
hashtable->log2_nbuckets = my_log2(hashtable->nbuckets);
|
hashtable->log2_nbuckets = my_log2(hashtable->nbuckets);
|
||||||
hashtable->totalTuples = pstate->total_tuples;
|
hashtable->totalTuples = pstate->total_tuples;
|
||||||
ExecParallelHashEnsureBatchAccessors(hashtable);
|
|
||||||
|
/*
|
||||||
|
* Unless we're completely done and the batch state has been freed, make
|
||||||
|
* sure we have accessors.
|
||||||
|
*/
|
||||||
|
if (BarrierPhase(build_barrier) < PHJ_BUILD_DONE)
|
||||||
|
ExecParallelHashEnsureBatchAccessors(hashtable);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* The next synchronization point is in ExecHashJoin's HJ_BUILD_HASHTABLE
|
* The next synchronization point is in ExecHashJoin's HJ_BUILD_HASHTABLE
|
||||||
* case, which will bring the build phase to PHJ_BUILD_DONE (if it isn't
|
* case, which will bring the build phase to PHJ_BUILD_RUNNING (if it
|
||||||
* there already).
|
* isn't there already).
|
||||||
*/
|
*/
|
||||||
Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
|
Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
|
||||||
|
BarrierPhase(build_barrier) == PHJ_BUILD_RUNNING ||
|
||||||
BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
|
BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -620,7 +627,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
|
||||||
/*
|
/*
|
||||||
* The next Parallel Hash synchronization point is in
|
* The next Parallel Hash synchronization point is in
|
||||||
* MultiExecParallelHash(), which will progress it all the way to
|
* MultiExecParallelHash(), which will progress it all the way to
|
||||||
* PHJ_BUILD_DONE. The caller must not return control from this
|
* PHJ_BUILD_RUNNING. The caller must not return control from this
|
||||||
* executor node between now and then.
|
* executor node between now and then.
|
||||||
*/
|
*/
|
||||||
}
|
}
|
||||||
|
@ -3054,14 +3061,11 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* It's possible for a backend to start up very late so that the whole
|
* We should never see a state where the batch-tracking array is freed,
|
||||||
* join is finished and the shm state for tracking batches has already
|
* because we should have given up sooner if we join when the build
|
||||||
* been freed by ExecHashTableDetach(). In that case we'll just leave
|
* barrier has reached the PHJ_BUILD_DONE phase.
|
||||||
* hashtable->batches as NULL so that ExecParallelHashJoinNewBatch() gives
|
|
||||||
* up early.
|
|
||||||
*/
|
*/
|
||||||
if (!DsaPointerIsValid(pstate->batches))
|
Assert(DsaPointerIsValid(pstate->batches));
|
||||||
return;
|
|
||||||
|
|
||||||
/* Use hash join memory context. */
|
/* Use hash join memory context. */
|
||||||
oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
|
oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
|
||||||
|
@ -3181,9 +3185,18 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
|
||||||
void
|
void
|
||||||
ExecHashTableDetach(HashJoinTable hashtable)
|
ExecHashTableDetach(HashJoinTable hashtable)
|
||||||
{
|
{
|
||||||
if (hashtable->parallel_state)
|
ParallelHashJoinState *pstate = hashtable->parallel_state;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If we're involved in a parallel query, we must either have gotten all
|
||||||
|
* the way to PHJ_BUILD_RUNNING, or joined too late and be in
|
||||||
|
* PHJ_BUILD_DONE.
|
||||||
|
*/
|
||||||
|
Assert(!pstate ||
|
||||||
|
BarrierPhase(&pstate->build_barrier) >= PHJ_BUILD_RUNNING);
|
||||||
|
|
||||||
|
if (pstate && BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_RUNNING)
|
||||||
{
|
{
|
||||||
ParallelHashJoinState *pstate = hashtable->parallel_state;
|
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
/* Make sure any temporary files are closed. */
|
/* Make sure any temporary files are closed. */
|
||||||
|
@ -3199,17 +3212,22 @@ ExecHashTableDetach(HashJoinTable hashtable)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* If we're last to detach, clean up shared memory. */
|
/* If we're last to detach, clean up shared memory. */
|
||||||
if (BarrierDetach(&pstate->build_barrier))
|
if (BarrierArriveAndDetach(&pstate->build_barrier))
|
||||||
{
|
{
|
||||||
|
/*
|
||||||
|
* Late joining processes will see this state and give up
|
||||||
|
* immediately.
|
||||||
|
*/
|
||||||
|
Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_DONE);
|
||||||
|
|
||||||
if (DsaPointerIsValid(pstate->batches))
|
if (DsaPointerIsValid(pstate->batches))
|
||||||
{
|
{
|
||||||
dsa_free(hashtable->area, pstate->batches);
|
dsa_free(hashtable->area, pstate->batches);
|
||||||
pstate->batches = InvalidDsaPointer;
|
pstate->batches = InvalidDsaPointer;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
hashtable->parallel_state = NULL;
|
|
||||||
}
|
}
|
||||||
|
hashtable->parallel_state = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -45,7 +45,8 @@
|
||||||
* PHJ_BUILD_ALLOCATING -- one sets up the batches and table 0
|
* PHJ_BUILD_ALLOCATING -- one sets up the batches and table 0
|
||||||
* PHJ_BUILD_HASHING_INNER -- all hash the inner rel
|
* PHJ_BUILD_HASHING_INNER -- all hash the inner rel
|
||||||
* PHJ_BUILD_HASHING_OUTER -- (multi-batch only) all hash the outer
|
* PHJ_BUILD_HASHING_OUTER -- (multi-batch only) all hash the outer
|
||||||
* PHJ_BUILD_DONE -- building done, probing can begin
|
* PHJ_BUILD_RUNNING -- building done, probing can begin
|
||||||
|
* PHJ_BUILD_DONE -- all work complete, one frees batches
|
||||||
*
|
*
|
||||||
* While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
|
* While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
|
||||||
* be used repeatedly as required to coordinate expansions in the number of
|
* be used repeatedly as required to coordinate expansions in the number of
|
||||||
|
@ -73,7 +74,7 @@
|
||||||
* batches whenever it encounters them while scanning and probing, which it
|
* batches whenever it encounters them while scanning and probing, which it
|
||||||
* can do because it processes batches in serial order.
|
* can do because it processes batches in serial order.
|
||||||
*
|
*
|
||||||
* Once PHJ_BUILD_DONE is reached, backends then split up and process
|
* Once PHJ_BUILD_RUNNING is reached, backends then split up and process
|
||||||
* different batches, or gang up and work together on probing batches if there
|
* different batches, or gang up and work together on probing batches if there
|
||||||
* aren't enough to go around. For each batch there is a separate barrier
|
* aren't enough to go around. For each batch there is a separate barrier
|
||||||
* with the following phases:
|
* with the following phases:
|
||||||
|
@ -95,11 +96,16 @@
|
||||||
*
|
*
|
||||||
* To avoid deadlocks, we never wait for any barrier unless it is known that
|
* To avoid deadlocks, we never wait for any barrier unless it is known that
|
||||||
* all other backends attached to it are actively executing the node or have
|
* all other backends attached to it are actively executing the node or have
|
||||||
* already arrived. Practically, that means that we never return a tuple
|
* finished. Practically, that means that we never emit a tuple while attached
|
||||||
* while attached to a barrier, unless the barrier has reached its final
|
* to a barrier, unless the barrier has reached a phase that means that no
|
||||||
* state. In the slightly special case of the per-batch barrier, we return
|
* process will wait on it again. We emit tuples while attached to the build
|
||||||
* tuples while in PHJ_BATCH_PROBING phase, but that's OK because we use
|
* barrier in phase PHJ_BUILD_RUNNING, and to a per-batch barrier in phase
|
||||||
* BarrierArriveAndDetach() to advance it to PHJ_BATCH_DONE without waiting.
|
* PHJ_BATCH_PROBING. These are advanced to PHJ_BUILD_DONE and PHJ_BATCH_DONE
|
||||||
|
* respectively without waiting, using BarrierArriveAndDetach(). The last to
|
||||||
|
* detach receives a different return value so that it knows that it's safe to
|
||||||
|
* clean up. Any straggler process that attaches after that phase is reached
|
||||||
|
* will see that it's too late to participate or access the relevant shared
|
||||||
|
* memory objects.
|
||||||
*
|
*
|
||||||
*-------------------------------------------------------------------------
|
*-------------------------------------------------------------------------
|
||||||
*/
|
*/
|
||||||
|
@ -296,7 +302,21 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
|
||||||
* outer relation.
|
* outer relation.
|
||||||
*/
|
*/
|
||||||
if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
|
if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
|
||||||
|
{
|
||||||
|
if (parallel)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Advance the build barrier to PHJ_BUILD_RUNNING
|
||||||
|
* before proceeding so we can negotiate resource
|
||||||
|
* cleanup.
|
||||||
|
*/
|
||||||
|
Barrier *build_barrier = ¶llel_state->build_barrier;
|
||||||
|
|
||||||
|
while (BarrierPhase(build_barrier) < PHJ_BUILD_RUNNING)
|
||||||
|
BarrierArriveAndWait(build_barrier, 0);
|
||||||
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* need to remember whether nbatch has increased since we
|
* need to remember whether nbatch has increased since we
|
||||||
|
@ -317,6 +337,7 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
|
||||||
|
|
||||||
build_barrier = ¶llel_state->build_barrier;
|
build_barrier = ¶llel_state->build_barrier;
|
||||||
Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
|
Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
|
||||||
|
BarrierPhase(build_barrier) == PHJ_BUILD_RUNNING ||
|
||||||
BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
|
BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
|
||||||
if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
|
if (BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER)
|
||||||
{
|
{
|
||||||
|
@ -329,9 +350,18 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
|
||||||
BarrierArriveAndWait(build_barrier,
|
BarrierArriveAndWait(build_barrier,
|
||||||
WAIT_EVENT_HASH_BUILD_HASH_OUTER);
|
WAIT_EVENT_HASH_BUILD_HASH_OUTER);
|
||||||
}
|
}
|
||||||
Assert(BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
|
else if (BarrierPhase(build_barrier) == PHJ_BUILD_DONE)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* If we attached so late that the job is finished and
|
||||||
|
* the batch state has been freed, we can return
|
||||||
|
* immediately.
|
||||||
|
*/
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
/* Each backend should now select a batch to work on. */
|
/* Each backend should now select a batch to work on. */
|
||||||
|
Assert(BarrierPhase(build_barrier) == PHJ_BUILD_RUNNING);
|
||||||
hashtable->curbatch = -1;
|
hashtable->curbatch = -1;
|
||||||
node->hj_JoinState = HJ_NEED_NEW_BATCH;
|
node->hj_JoinState = HJ_NEED_NEW_BATCH;
|
||||||
|
|
||||||
|
@ -1090,14 +1120,6 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
|
||||||
int start_batchno;
|
int start_batchno;
|
||||||
int batchno;
|
int batchno;
|
||||||
|
|
||||||
/*
|
|
||||||
* If we started up so late that the batch tracking array has been freed
|
|
||||||
* already by ExecHashTableDetach(), then we are finished. See also
|
|
||||||
* ExecParallelHashEnsureBatchAccessors().
|
|
||||||
*/
|
|
||||||
if (hashtable->batches == NULL)
|
|
||||||
return false;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If we were already attached to a batch, remember not to bother checking
|
* If we were already attached to a batch, remember not to bother checking
|
||||||
* it again, and detach from it (possibly freeing the hash table if we are
|
* it again, and detach from it (possibly freeing the hash table if we are
|
||||||
|
|
|
@ -258,7 +258,8 @@ typedef struct ParallelHashJoinState
|
||||||
#define PHJ_BUILD_ALLOCATING 1
|
#define PHJ_BUILD_ALLOCATING 1
|
||||||
#define PHJ_BUILD_HASHING_INNER 2
|
#define PHJ_BUILD_HASHING_INNER 2
|
||||||
#define PHJ_BUILD_HASHING_OUTER 3
|
#define PHJ_BUILD_HASHING_OUTER 3
|
||||||
#define PHJ_BUILD_DONE 4
|
#define PHJ_BUILD_RUNNING 4
|
||||||
|
#define PHJ_BUILD_DONE 5
|
||||||
|
|
||||||
/* The phases for probing each batch, used by for batch_barrier. */
|
/* The phases for probing each batch, used by for batch_barrier. */
|
||||||
#define PHJ_BATCH_ELECTING 0
|
#define PHJ_BATCH_ELECTING 0
|
||||||
|
|
Loading…
Reference in New Issue