Use a more granular approach to follow update chains

Instead of simply checking the KEYS_UPDATED bit, we need to check
whether each lock held on the future version of the tuple conflicts with
the lock we're trying to acquire.

Per bug report  by Tomonari Katsumata
This commit is contained in:
Alvaro Herrera 2013-11-27 17:50:33 -03:00
parent e4828e9ccb
commit 247c76a989

@ -4794,6 +4794,83 @@ l5:
*result_xmax = new_xmax; *result_xmax = new_xmax;
} }
/*
* Subroutine for heap_lock_updated_tuple_rec.
*
* Given an hypothetical multixact status held by the transaction identified
* with the given xid, does the current transaction need to wait, fail, or can
* it continue if it wanted to acquire a lock of the given mode? "needwait"
* is set to true if waiting is necessary; if it can continue, then
* HeapTupleMayBeUpdated is returned. In case of a conflict, a different
* HeapTupleSatisfiesUpdate return code is returned.
*
* The held status is said to be hypothetical because it might correspond to a
* lock held by a single Xid, i.e. not a real MultiXactId; we express it this
* way for simplicity of API.
*/
static HTSU_Result
test_lockmode_for_conflict(MultiXactStatus status, TransactionId xid,
LockTupleMode mode, bool *needwait)
{
MultiXactStatus wantedstatus;
*needwait = false;
wantedstatus = get_mxact_status_for_lock(mode, false);
/*
* Note: we *must* check TransactionIdIsInProgress before
* TransactionIdDidAbort/Commit; see comment at top of tqual.c for an
* explanation.
*/
if (TransactionIdIsCurrentTransactionId(xid))
{
/*
* Updated by our own transaction? Just return failure. This shouldn't
* normally happen.
*/
return HeapTupleSelfUpdated;
}
else if (TransactionIdIsInProgress(xid))
{
/*
* If the locking transaction is running, what we do depends on whether
* the lock modes conflict: if they do, then we must wait for it to
* finish; otherwise we can fall through to lock this tuple version
* without waiting.
*/
if (DoLockModesConflict(LOCKMODE_from_mxstatus(status),
LOCKMODE_from_mxstatus(wantedstatus)))
{
*needwait = true;
}
/*
* If we set needwait above, then this value doesn't matter; otherwise,
* this value signals to caller that it's okay to proceed.
*/
return HeapTupleMayBeUpdated;
}
else if (TransactionIdDidAbort(xid))
return HeapTupleMayBeUpdated;
else if (TransactionIdDidCommit(xid))
{
/*
* If the updating transaction committed, what we do depends on whether
* the lock modes conflict: if they do, then we must report error to
* caller. But if they don't, we can fall through to lock it.
*/
if (DoLockModesConflict(LOCKMODE_from_mxstatus(status),
LOCKMODE_from_mxstatus(wantedstatus)))
/* bummer */
return HeapTupleUpdated;
return HeapTupleMayBeUpdated;
}
/* Not in progress, not aborted, not committed -- must have crashed */
return HeapTupleMayBeUpdated;
}
/* /*
* Recursive part of heap_lock_updated_tuple * Recursive part of heap_lock_updated_tuple
@ -4811,7 +4888,8 @@ heap_lock_updated_tuple_rec(Relation rel, ItemPointer tid, TransactionId xid,
Buffer buf; Buffer buf;
uint16 new_infomask, uint16 new_infomask,
new_infomask2, new_infomask2,
old_infomask; old_infomask,
old_infomask2;
TransactionId xmax, TransactionId xmax,
new_xmax; new_xmax;
TransactionId priorXmax = InvalidTransactionId; TransactionId priorXmax = InvalidTransactionId;
@ -4853,45 +4931,107 @@ l4:
} }
old_infomask = mytup.t_data->t_infomask; old_infomask = mytup.t_data->t_infomask;
old_infomask2 = mytup.t_data->t_infomask2;
xmax = HeapTupleHeaderGetRawXmax(mytup.t_data); xmax = HeapTupleHeaderGetRawXmax(mytup.t_data);
/* /*
* If this tuple is updated and the key has been modified (or * If this tuple version has been updated or locked by some concurrent
* deleted), what we do depends on the status of the updating * transaction(s), what we do depends on whether our lock mode
* transaction: if it's live, we sleep until it finishes; if it has * conflicts with what those other transactions hold, and also on the
* committed, we have to fail (i.e. return HeapTupleUpdated); if it * status of them.
* aborted, we ignore it. For updates that didn't touch the key, we
* can just plough ahead.
*/ */
if (!(old_infomask & HEAP_XMAX_INVALID) && if (!(old_infomask & HEAP_XMAX_INVALID))
(mytup.t_data->t_infomask2 & HEAP_KEYS_UPDATED))
{ {
TransactionId update_xid; TransactionId rawxmax;
bool needwait;
/* rawxmax = HeapTupleHeaderGetRawXmax(mytup.t_data);
* Note: we *must* check TransactionIdIsInProgress before if (old_infomask & HEAP_XMAX_IS_MULTI)
* TransactionIdDidAbort/Commit; see comment at top of tqual.c for
* an explanation.
*/
update_xid = HeapTupleHeaderGetUpdateXid(mytup.t_data);
if (TransactionIdIsCurrentTransactionId(update_xid))
{ {
UnlockReleaseBuffer(buf); int nmembers;
return HeapTupleSelfUpdated; int i;
} MultiXactMember *members;
else if (TransactionIdIsInProgress(update_xid))
nmembers = GetMultiXactIdMembers(rawxmax, &members, false, false);
for (i = 0; i < nmembers; i++)
{
HTSU_Result res;
res = test_lockmode_for_conflict(members[i].status,
members[i].xid,
mode, &needwait);
if (needwait)
{ {
LockBuffer(buf, BUFFER_LOCK_UNLOCK); LockBuffer(buf, BUFFER_LOCK_UNLOCK);
/* No LockTupleTuplock here -- see heap_lock_updated_tuple */ XactLockTableWait(members[i].xid);
XactLockTableWait(update_xid); pfree(members);
goto l4; goto l4;
} }
else if (TransactionIdDidAbort(update_xid)) if (res != HeapTupleMayBeUpdated)
; /* okay to proceed */
else if (TransactionIdDidCommit(update_xid))
{ {
UnlockReleaseBuffer(buf); UnlockReleaseBuffer(buf);
return HeapTupleUpdated; pfree(members);
return res;
}
}
if (members)
pfree(members);
}
else
{
HTSU_Result res;
MultiXactStatus status;
/*
* For a non-multi Xmax, we first need to compute the
* corresponding MultiXactStatus by using the infomask bits.
*/
if (HEAP_XMAX_IS_LOCKED_ONLY(old_infomask))
{
if (HEAP_XMAX_IS_KEYSHR_LOCKED(old_infomask))
status = MultiXactStatusForKeyShare;
else if (HEAP_XMAX_IS_SHR_LOCKED(old_infomask))
status = MultiXactStatusForShare;
else if (HEAP_XMAX_IS_EXCL_LOCKED(old_infomask))
{
if (old_infomask2 & HEAP_KEYS_UPDATED)
status = MultiXactStatusForUpdate;
else
status = MultiXactStatusForNoKeyUpdate;
}
else
{
/*
* LOCK_ONLY present alone (a pg_upgraded tuple
* marked as share-locked in the old cluster) shouldn't
* be seen in the middle of an update chain.
*/
elog(ERROR, "invalid lock status in tuple");
}
}
else
{
/* it's an update, but which kind? */
if (old_infomask2 & HEAP_KEYS_UPDATED)
status = MultiXactStatusUpdate;
else
status = MultiXactStatusNoKeyUpdate;
}
res = test_lockmode_for_conflict(status, rawxmax, mode,
&needwait);
if (needwait)
{
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
XactLockTableWait(rawxmax);
goto l4;
}
if (res != HeapTupleMayBeUpdated)
{
UnlockReleaseBuffer(buf);
return res;
}
} }
} }