diff --git a/src/backend/storage/lmgr/Makefile b/src/backend/storage/lmgr/Makefile index fca9a24a57..37cf81b1aa 100644 --- a/src/backend/storage/lmgr/Makefile +++ b/src/backend/storage/lmgr/Makefile @@ -4,7 +4,7 @@ # Makefile for storage/lmgr # # IDENTIFICATION -# $Header: /cvsroot/pgsql/src/backend/storage/lmgr/Makefile,v 1.14 2000/08/31 16:10:36 petere Exp $ +# $Header: /cvsroot/pgsql/src/backend/storage/lmgr/Makefile,v 1.15 2001/01/25 03:31:16 tgl Exp $ # #------------------------------------------------------------------------- @@ -12,7 +12,7 @@ subdir = src/backend/storage/lmgr top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global -OBJS = lmgr.o lock.o proc.o +OBJS = lmgr.o lock.o proc.o deadlock.o all: SUBSYS.o diff --git a/src/backend/storage/lmgr/README b/src/backend/storage/lmgr/README index af9fbc8421..72e0d16f12 100644 --- a/src/backend/storage/lmgr/README +++ b/src/backend/storage/lmgr/README @@ -1,4 +1,4 @@ -$Header: /cvsroot/pgsql/src/backend/storage/lmgr/README,v 1.6 2001/01/22 22:30:06 tgl Exp $ +$Header: /cvsroot/pgsql/src/backend/storage/lmgr/README,v 1.7 2001/01/25 03:31:16 tgl Exp $ There are two fundamental lock structures: the per-lockable-object LOCK struct, and the per-lock-holder HOLDER struct. A LOCK object exists @@ -373,7 +373,8 @@ time with "C before B", which won't move C far enough up. So we look for soft edges outgoing from C starting at the front of the wait queue. 5. The working data structures needed by the deadlock detection code can -be proven not to need more than MAXBACKENDS entries. Therefore the -working storage can be statically allocated instead of depending on -palloc(). This is a good thing, since if the deadlock detector could -fail for extraneous reasons, all the above safety proofs fall down. +be limited to numbers of entries computed from MaxBackends. Therefore, +we can allocate the worst-case space needed during backend startup. +This seems a safer approach than trying to allocate workspace on the fly; +we don't want to risk having the deadlock detector run out of memory, +else we really have no guarantees at all that deadlock will be detected. diff --git a/src/backend/storage/lmgr/deadlock.c b/src/backend/storage/lmgr/deadlock.c new file mode 100644 index 0000000000..aae635a6cc --- /dev/null +++ b/src/backend/storage/lmgr/deadlock.c @@ -0,0 +1,734 @@ +/*------------------------------------------------------------------------- + * + * deadlock.c + * POSTGRES deadlock detection code + * + * See src/backend/storage/lmgr/README for a description of the deadlock + * detection and resolution algorithms. 
+ * + * + * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/deadlock.c,v 1.1 2001/01/25 03:31:16 tgl Exp $ + * + * Interface: + * + * DeadLockCheck() + * InitDeadLockChecking() + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "miscadmin.h" +#include "storage/proc.h" +#include "utils/memutils.h" + + +/* One edge in the waits-for graph */ +typedef struct { + PROC *waiter; /* the waiting process */ + PROC *blocker; /* the process it is waiting for */ + int pred; /* workspace for TopoSort */ + int link; /* workspace for TopoSort */ +} EDGE; + +/* One potential reordering of a lock's wait queue */ +typedef struct { + LOCK *lock; /* the lock whose wait queue is described */ + PROC **procs; /* array of PROC *'s in new wait order */ + int nProcs; +} WAIT_ORDER; + + +static bool DeadLockCheckRecurse(PROC *proc); +static int TestConfiguration(PROC *startProc); +static bool FindLockCycle(PROC *checkProc, + EDGE *softEdges, int *nSoftEdges); +static bool FindLockCycleRecurse(PROC *checkProc, + EDGE *softEdges, int *nSoftEdges); +static bool ExpandConstraints(EDGE *constraints, int nConstraints); +static bool TopoSort(LOCK *lock, EDGE *constraints, int nConstraints, + PROC **ordering); +#ifdef DEBUG_DEADLOCK +static void PrintLockQueue(LOCK *lock, const char *info); +#endif + + +/* + * Working space for the deadlock detector + */ + +/* Workspace for FindLockCycle */ +static PROC **visitedProcs; /* Array of visited procs */ +static int nVisitedProcs; +/* Workspace for TopoSort */ +static PROC **topoProcs; /* Array of not-yet-output procs */ +static int *beforeConstraints; /* Counts of remaining before-constraints */ +static int *afterConstraints; /* List head for after-constraints */ +/* Output area for ExpandConstraints */ +static WAIT_ORDER *waitOrders; /* Array of proposed queue rearrangements */ +static int nWaitOrders; +static PROC **waitOrderProcs; /* Space for waitOrders queue contents */ +/* Current list of constraints being considered */ +static EDGE *curConstraints; +static int nCurConstraints; +static int maxCurConstraints; +/* Storage space for results from FindLockCycle */ +static EDGE *possibleConstraints; +static int nPossibleConstraints; +static int maxPossibleConstraints; + + +/* + * InitDeadLockChecking -- initialize deadlock checker during backend startup + * + * This does per-backend initialization of the deadlock checker; primarily, + * allocation of working memory for DeadLockCheck. We do this per-backend + * since there's no percentage in making the kernel do copy-on-write + * inheritance of workspace from the postmaster. We want to allocate the + * space at startup because the deadlock checker might be invoked when there's + * no free memory left. + */ +void +InitDeadLockChecking(void) +{ + MemoryContext oldcxt; + + /* Make sure allocations are permanent */ + oldcxt = MemoryContextSwitchTo(TopMemoryContext); + + /* + * FindLockCycle needs at most MaxBackends entries in visitedProcs[] + */ + visitedProcs = (PROC **) palloc(MaxBackends * sizeof(PROC *)); + + /* + * TopoSort needs to consider at most MaxBackends wait-queue entries, + * and it needn't run concurrently with FindLockCycle. 
+ */ + topoProcs = visitedProcs; /* re-use this space */ + beforeConstraints = (int *) palloc(MaxBackends * sizeof(int)); + afterConstraints = (int *) palloc(MaxBackends * sizeof(int)); + + /* + * We need to consider rearranging at most MaxBackends/2 wait queues + * (since it takes at least two waiters in a queue to create a soft edge), + * and the expanded form of the wait queues can't involve more than + * MaxBackends total waiters. + */ + waitOrders = (WAIT_ORDER *) palloc((MaxBackends/2) * sizeof(WAIT_ORDER)); + waitOrderProcs = (PROC **) palloc(MaxBackends * sizeof(PROC *)); + + /* + * Allow at most MaxBackends distinct constraints in a configuration. + * (Is this enough? In practice it seems it should be, but I don't quite + * see how to prove it. If we run out, we might fail to find a workable + * wait queue rearrangement even though one exists.) NOTE that this + * number limits the maximum recursion depth of DeadLockCheckRecurse. + * Making it really big might potentially allow a stack-overflow problem. + */ + maxCurConstraints = MaxBackends; + curConstraints = (EDGE *) palloc(maxCurConstraints * sizeof(EDGE)); + + /* + * Allow up to 3*MaxBackends constraints to be saved without having to + * re-run TestConfiguration. (This is probably more than enough, but + * we can survive if we run low on space by doing excess runs of + * TestConfiguration to re-compute constraint lists each time needed.) + * The last MaxBackends entries in possibleConstraints[] are reserved as + * output workspace for FindLockCycle. + */ + maxPossibleConstraints = MaxBackends * 4; + possibleConstraints = + (EDGE *) palloc(maxPossibleConstraints * sizeof(EDGE)); + + MemoryContextSwitchTo(oldcxt); +} + +/* + * DeadLockCheck -- Checks for deadlocks for a given process + * + * This code looks for deadlocks involving the given process. If any + * are found, it tries to rearrange lock wait queues to resolve the + * deadlock. If resolution is impossible, return TRUE --- the caller + * is then expected to abort the given proc's transaction. + * + * We can't block on user locks, so no sense testing for deadlock + * because there is no blocking, and no timer for the block. So, + * only look at regular locks. + * + * We must have already locked the master lock before being called. + * NOTE: although the lockctl structure appears to allow each lock + * table to have a different spinlock, all locks that can block had + * better use the same spinlock, else this code will not be adequately + * interlocked! 
+ */ +bool +DeadLockCheck(PROC *proc) +{ + int i, + j; + + /* Initialize to "no constraints" */ + nCurConstraints = 0; + nPossibleConstraints = 0; + nWaitOrders = 0; + + /* Search for deadlocks and possible fixes */ + if (DeadLockCheckRecurse(proc)) + return true; /* cannot find a non-deadlocked state */ + + /* Apply any needed rearrangements of wait queues */ + for (i = 0; i < nWaitOrders; i++) + { + LOCK *lock = waitOrders[i].lock; + PROC **procs = waitOrders[i].procs; + int nProcs = waitOrders[i].nProcs; + PROC_QUEUE *waitQueue = &(lock->waitProcs); + + Assert(nProcs == waitQueue->size); + +#ifdef DEBUG_DEADLOCK + PrintLockQueue(lock, "DeadLockCheck:"); +#endif + + /* Reset the queue and re-add procs in the desired order */ + ProcQueueInit(waitQueue); + for (j = 0; j < nProcs; j++) + { + SHMQueueInsertBefore(&(waitQueue->links), &(procs[j]->links)); + waitQueue->size++; + } + +#ifdef DEBUG_DEADLOCK + PrintLockQueue(lock, "rearranged to:"); +#endif + } + return false; +} + +/* + * DeadLockCheckRecurse -- recursively search for valid orderings + * + * curConstraints[] holds the current set of constraints being considered + * by an outer level of recursion. Add to this each possible solution + * constraint for any cycle detected at this level. + * + * Returns TRUE if no solution exists. Returns FALSE if a deadlock-free + * state is attainable, in which case waitOrders[] shows the required + * rearrangements of lock wait queues (if any). + */ +static bool +DeadLockCheckRecurse(PROC *proc) +{ + int nEdges; + int oldPossibleConstraints; + bool savedList; + int i; + + nEdges = TestConfiguration(proc); + if (nEdges < 0) + return true; /* hard deadlock --- no solution */ + if (nEdges == 0) + return false; /* good configuration found */ + if (nCurConstraints >= maxCurConstraints) + return true; /* out of room for active constraints? */ + oldPossibleConstraints = nPossibleConstraints; + if (nPossibleConstraints + nEdges + MaxBackends <= maxPossibleConstraints) + { + /* We can save the edge list in possibleConstraints[] */ + nPossibleConstraints += nEdges; + savedList = true; + } + else + { + /* Not room; will need to regenerate the edges on-the-fly */ + savedList = false; + } + /* + * Try each available soft edge as an addition to the configuration. + */ + for (i = 0; i < nEdges; i++) + { + if (!savedList && i > 0) + { + /* Regenerate the list of possible added constraints */ + if (nEdges != TestConfiguration(proc)) + elog(FATAL, "DeadLockCheckRecurse: inconsistent results"); + } + curConstraints[nCurConstraints] = + possibleConstraints[oldPossibleConstraints+i]; + nCurConstraints++; + if (!DeadLockCheckRecurse(proc)) + return false; /* found a valid solution! */ + /* give up on that added constraint, try again */ + nCurConstraints--; + } + nPossibleConstraints = oldPossibleConstraints; + return true; /* no solution found */ +} + + +/*-------------------- + * Test a configuration (current set of constraints) for validity. + * + * Returns: + * 0: the configuration is good (no deadlocks) + * -1: the configuration has a hard deadlock or is not self-consistent + * >0: the configuration has one or more soft deadlocks + * + * In the soft-deadlock case, one of the soft cycles is chosen arbitrarily + * and a list of its soft edges is returned beginning at + * possibleConstraints+nPossibleConstraints. The return value is the + * number of soft edges. 
+ *-------------------- + */ +static bool +TestConfiguration(PROC *startProc) +{ + int softFound = 0; + EDGE *softEdges = possibleConstraints + nPossibleConstraints; + int nSoftEdges; + int i; + + /* + * Make sure we have room for FindLockCycle's output. + */ + if (nPossibleConstraints + MaxBackends > maxPossibleConstraints) + return -1; + /* + * Expand current constraint set into wait orderings. Fail if the + * constraint set is not self-consistent. + */ + if (!ExpandConstraints(curConstraints, nCurConstraints)) + return -1; + /* + * Check for cycles involving startProc or any of the procs mentioned + * in constraints. We check startProc last because if it has a soft + * cycle still to be dealt with, we want to deal with that first. + */ + for (i = 0; i < nCurConstraints; i++) + { + if (FindLockCycle(curConstraints[i].waiter, softEdges, &nSoftEdges)) + { + if (nSoftEdges == 0) + return -1; /* hard deadlock detected */ + softFound = nSoftEdges; + } + if (FindLockCycle(curConstraints[i].blocker, softEdges, &nSoftEdges)) + { + if (nSoftEdges == 0) + return -1; /* hard deadlock detected */ + softFound = nSoftEdges; + } + } + if (FindLockCycle(startProc, softEdges, &nSoftEdges)) + { + if (nSoftEdges == 0) + return -1; /* hard deadlock detected */ + softFound = nSoftEdges; + } + return softFound; +} + + +/* + * FindLockCycle -- basic check for deadlock cycles + * + * Scan outward from the given proc to see if there is a cycle in the + * waits-for graph that includes this proc. Return TRUE if a cycle + * is found, else FALSE. If a cycle is found, we also return a list of + * the "soft edges", if any, included in the cycle. These edges could + * potentially be eliminated by rearranging wait queues. + * + * Since we need to be able to check hypothetical configurations that would + * exist after wait queue rearrangement, the routine pays attention to the + * table of hypothetical queue orders in waitOrders[]. These orders will + * be believed in preference to the actual ordering seen in the locktable. + */ +static bool +FindLockCycle(PROC *checkProc, + EDGE *softEdges, /* output argument */ + int *nSoftEdges) /* output argument */ +{ + nVisitedProcs = 0; + *nSoftEdges = 0; + return FindLockCycleRecurse(checkProc, softEdges, nSoftEdges); +} + +static bool +FindLockCycleRecurse(PROC *checkProc, + EDGE *softEdges, /* output argument */ + int *nSoftEdges) /* output argument */ +{ + PROC *proc; + LOCK *lock; + HOLDER *holder; + SHM_QUEUE *lockHolders; + LOCKMETHODTABLE *lockMethodTable; + LOCKMETHODCTL *lockctl; + PROC_QUEUE *waitQueue; + int queue_size; + int conflictMask; + int i; + int numLockModes, + lm; + + /* + * Have we already seen this proc? + */ + for (i = 0; i < nVisitedProcs; i++) + { + if (visitedProcs[i] == checkProc) + { + /* If we return to starting point, we have a deadlock cycle */ + if (i == 0) + return true; + /* + * Otherwise, we have a cycle but it does not include the start + * point, so say "no deadlock". + */ + return false; + } + } + /* Mark proc as seen */ + Assert(nVisitedProcs < MaxBackends); + visitedProcs[nVisitedProcs++] = checkProc; + /* + * If the proc is not waiting, we have no outgoing waits-for edges. 
+ */ + if (checkProc->links.next == INVALID_OFFSET) + return false; + lock = checkProc->waitLock; + if (lock == NULL) + return false; + lockMethodTable = GetLocksMethodTable(lock); + lockctl = lockMethodTable->ctl; + numLockModes = lockctl->numLockModes; + conflictMask = lockctl->conflictTab[checkProc->waitLockMode]; + /* + * Scan for procs that already hold conflicting locks. These are + * "hard" edges in the waits-for graph. + */ + lockHolders = &(lock->lockHolders); + + holder = (HOLDER *) SHMQueueNext(lockHolders, lockHolders, + offsetof(HOLDER, lockLink)); + + while (holder) + { + proc = (PROC *) MAKE_PTR(holder->tag.proc); + + /* A proc never blocks itself */ + if (proc != checkProc) + { + for (lm = 1; lm <= numLockModes; lm++) + { + if (holder->holding[lm] > 0 && + ((1 << lm) & conflictMask) != 0) + { + /* This proc hard-blocks checkProc */ + if (FindLockCycleRecurse(proc, softEdges, nSoftEdges)) + return true; + /* If no deadlock, we're done looking at this holder */ + break; + } + } + } + + holder = (HOLDER *) SHMQueueNext(lockHolders, &holder->lockLink, + offsetof(HOLDER, lockLink)); + } + + /* + * Scan for procs that are ahead of this one in the lock's wait queue. + * Those that have conflicting requests soft-block this one. This must + * be done after the hard-block search, since if another proc both + * hard- and soft-blocks this one, we want to call it a hard edge. + * + * If there is a proposed re-ordering of the lock's wait order, + * use that rather than the current wait order. + */ + for (i = 0; i < nWaitOrders; i++) + { + if (waitOrders[i].lock == lock) + break; + } + + if (i < nWaitOrders) + { + /* Use the given hypothetical wait queue order */ + PROC **procs = waitOrders[i].procs; + + queue_size = waitOrders[i].nProcs; + + for (i = 0; i < queue_size; i++) + { + proc = procs[i]; + + /* Done when we reach the target proc */ + if (proc == checkProc) + break; + + /* Is there a conflict with this guy's request? */ + if (((1 << proc->waitLockMode) & conflictMask) != 0) + { + /* This proc soft-blocks checkProc */ + if (FindLockCycleRecurse(proc, softEdges, nSoftEdges)) + { + /* Add this edge to the list of soft edges in the cycle */ + Assert(*nSoftEdges < MaxBackends); + softEdges[*nSoftEdges].waiter = checkProc; + softEdges[*nSoftEdges].blocker = proc; + (*nSoftEdges)++; + return true; + } + } + } + } + else + { + /* Use the true lock wait queue order */ + waitQueue = &(lock->waitProcs); + queue_size = waitQueue->size; + + proc = (PROC *) MAKE_PTR(waitQueue->links.next); + + while (queue_size-- > 0) + { + /* Done when we reach the target proc */ + if (proc == checkProc) + break; + + /* Is there a conflict with this guy's request? */ + if (((1 << proc->waitLockMode) & conflictMask) != 0) + { + /* This proc soft-blocks checkProc */ + if (FindLockCycleRecurse(proc, softEdges, nSoftEdges)) + { + /* Add this edge to the list of soft edges in the cycle */ + Assert(*nSoftEdges < MaxBackends); + softEdges[*nSoftEdges].waiter = checkProc; + softEdges[*nSoftEdges].blocker = proc; + (*nSoftEdges)++; + return true; + } + } + + proc = (PROC *) MAKE_PTR(proc->links.next); + } + } + + /* + * No conflict detected here. + */ + return false; +} + + +/* + * ExpandConstraints -- expand a list of constraints into a set of + * specific new orderings for affected wait queues + * + * Input is a list of soft edges to be reversed. The output is a list + * of nWaitOrders WAIT_ORDER structs in waitOrders[], with PROC array + * workspace in waitOrderProcs[]. 
+ * + * Returns TRUE if able to build an ordering that satisfies all the + * constraints, FALSE if not (there are contradictory constraints). + */ +static bool +ExpandConstraints(EDGE *constraints, + int nConstraints) +{ + int nWaitOrderProcs = 0; + int i, + j; + + nWaitOrders = 0; + /* + * Scan constraint list backwards. This is because the last-added + * constraint is the only one that could fail, and so we want to test + * it for inconsistency first. + */ + for (i = nConstraints; --i >= 0; ) + { + PROC *proc = constraints[i].waiter; + LOCK *lock = proc->waitLock; + + /* Did we already make a list for this lock? */ + for (j = nWaitOrders; --j >= 0; ) + { + if (waitOrders[j].lock == lock) + break; + } + if (j >= 0) + continue; + /* No, so allocate a new list */ + waitOrders[nWaitOrders].lock = lock; + waitOrders[nWaitOrders].procs = waitOrderProcs + nWaitOrderProcs; + waitOrders[nWaitOrders].nProcs = lock->waitProcs.size; + nWaitOrderProcs += lock->waitProcs.size; + Assert(nWaitOrderProcs <= MaxBackends); + /* + * Do the topo sort. TopoSort need not examine constraints after + * this one, since they must be for different locks. + */ + if (!TopoSort(lock, constraints, i+1, + waitOrders[nWaitOrders].procs)) + return false; + nWaitOrders++; + } + return true; +} + + +/* + * TopoSort -- topological sort of a wait queue + * + * Generate a re-ordering of a lock's wait queue that satisfies given + * constraints about certain procs preceding others. (Each such constraint + * is a fact of a partial ordering.) Minimize rearrangement of the queue + * not needed to achieve the partial ordering. + * + * This is a lot simpler and slower than, for example, the topological sort + * algorithm shown in Knuth's Volume 1. However, Knuth's method doesn't + * try to minimize the damage to the existing order. In practice we are + * not likely to be working with more than a few constraints, so the apparent + * slowness of the algorithm won't really matter. + * + * The initial queue ordering is taken directly from the lock's wait queue. + * The output is an array of PROC pointers, of length equal to the lock's + * wait queue length (the caller is responsible for providing this space). + * The partial order is specified by an array of EDGE structs. Each EDGE + * is one that we need to reverse, therefore the "waiter" must appear before + * the "blocker" in the output array. The EDGE array may well contain + * edges associated with other locks; these should be ignored. + * + * Returns TRUE if able to build an ordering that satisfies all the + * constraints, FALSE if not (there are contradictory constraints). + */ +static bool +TopoSort(LOCK *lock, + EDGE *constraints, + int nConstraints, + PROC **ordering) /* output argument */ +{ + PROC_QUEUE *waitQueue = &(lock->waitProcs); + int queue_size = waitQueue->size; + PROC *proc; + int i, + j, + k, + last; + + /* First, fill topoProcs[] array with the procs in their current order */ + proc = (PROC *) MAKE_PTR(waitQueue->links.next); + for (i = 0; i < queue_size; i++) + { + topoProcs[i] = proc; + proc = (PROC *) MAKE_PTR(proc->links.next); + } + + /* + * Scan the constraints, and for each proc in the array, generate a count + * of the number of constraints that say it must be before something else, + * plus a list of the constraints that say it must be after something else. + * The count for the j'th proc is stored in beforeConstraints[j], and the + * head of its list in afterConstraints[j]. 
Each constraint stores its + * list link in constraints[i].link (note any constraint will be in + * just one list). The array index for the before-proc of the i'th + * constraint is remembered in constraints[i].pred. + */ + MemSet(beforeConstraints, 0, queue_size * sizeof(int)); + MemSet(afterConstraints, 0, queue_size * sizeof(int)); + for (i = 0; i < nConstraints; i++) + { + proc = constraints[i].waiter; + /* Ignore constraint if not for this lock */ + if (proc->waitLock != lock) + continue; + /* Find the waiter proc in the array */ + for (j = queue_size; --j >= 0; ) + { + if (topoProcs[j] == proc) + break; + } + Assert(j >= 0); /* should have found a match */ + /* Find the blocker proc in the array */ + proc = constraints[i].blocker; + for (k = queue_size; --k >= 0; ) + { + if (topoProcs[k] == proc) + break; + } + Assert(k >= 0); /* should have found a match */ + beforeConstraints[j]++; /* waiter must come before */ + /* add this constraint to list of after-constraints for blocker */ + constraints[i].pred = j; + constraints[i].link = afterConstraints[k]; + afterConstraints[k] = i+1; + } + /*-------------------- + * Now scan the topoProcs array backwards. At each step, output the + * last proc that has no remaining before-constraints, and decrease + * the beforeConstraints count of each of the procs it was constrained + * against. + * i = index of ordering[] entry we want to output this time + * j = search index for topoProcs[] + * k = temp for scanning constraint list for proc j + * last = last non-null index in topoProcs (avoid redundant searches) + *-------------------- + */ + last = queue_size-1; + for (i = queue_size; --i >= 0; ) + { + /* Find next candidate to output */ + while (topoProcs[last] == NULL) + last--; + for (j = last; j >= 0; j--) + { + if (topoProcs[j] != NULL && beforeConstraints[j] == 0) + break; + } + /* If no available candidate, topological sort fails */ + if (j < 0) + return false; + /* Output candidate, and mark it done by zeroing topoProcs[] entry */ + ordering[i] = topoProcs[j]; + topoProcs[j] = NULL; + /* Update beforeConstraints counts of its predecessors */ + for (k = afterConstraints[j]; k > 0; k = constraints[k-1].link) + { + beforeConstraints[constraints[k-1].pred]--; + } + } + + /* Done */ + return true; +} + +#ifdef DEBUG_DEADLOCK +static void +PrintLockQueue(LOCK *lock, const char *info) +{ + PROC_QUEUE *waitQueue = &(lock->waitProcs); + int queue_size = waitQueue->size; + PROC *proc; + int i; + + printf("%s lock %lx queue ", info, MAKE_OFFSET(lock)); + proc = (PROC *) MAKE_PTR(waitQueue->links.next); + for (i = 0; i < queue_size; i++) + { + printf(" %d", proc->pid); + proc = (PROC *) MAKE_PTR(proc->links.next); + } + printf("\n"); + fflush(stdout); +} +#endif diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 3d77ab2b4d..08e023718e 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.80 2001/01/24 19:43:08 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.81 2001/01/25 03:31:16 tgl Exp $ * * NOTES * Outside modules can create a lock table and acquire/release @@ -24,16 +24,16 @@ * * LockAcquire(), LockRelease(), LockMethodTableInit(), * LockMethodTableRename(), LockReleaseAll, - * LockResolveConflicts(), GrantLock() + * LockCheckConflicts(), GrantLock() * *------------------------------------------------------------------------- */ +#include "postgres.h" + 
#include #include #include -#include "postgres.h" - #include "access/xact.h" #include "miscadmin.h" #include "storage/proc.h" @@ -44,7 +44,6 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode, LOCK *lock, HOLDER *holder); static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolding); -static int LockGetMyHeldLocks(SHMEM_OFFSET lockOffset, PROC *proc); static char *lock_types[] = { @@ -211,6 +210,18 @@ LockingDisabled(void) return LockingIsDisabled; } +/* + * Fetch the lock method table associated with a given lock + */ +LOCKMETHODTABLE * +GetLocksMethodTable(LOCK *lock) +{ + LOCKMETHOD lockmethod = LOCK_LOCKMETHOD(*lock); + + Assert(lockmethod > 0 && lockmethod < NumLockMethods); + return LockMethodTable[lockmethod]; +} + /* * LockMethodInit -- initialize the lock table's lock type @@ -559,7 +570,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, if (!holder) { SpinRelease(masterLock); - elog(NOTICE, "LockAcquire: holder table corrupted"); + elog(FATAL, "LockAcquire: holder table corrupted"); return FALSE; } @@ -623,11 +634,11 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0)); /* -------------------- - * If I'm the only one holding any lock on this object, then there - * cannot be a conflict. The same is true if I already hold this lock. + * If I already hold one or more locks of the requested type, + * just grant myself another one without blocking. * -------------------- */ - if (holder->nHolding == lock->nGranted || holder->holding[lockmode] != 0) + if (holder->holding[lockmode] > 0) { GrantLock(lock, holder, lockmode); HOLDER_PRINT("LockAcquire: owning", holder); @@ -637,11 +648,11 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, /* -------------------- * If this process (under any XID) is a holder of the lock, - * then there is no conflict, either. + * also grant myself another one without blocking. * -------------------- */ LockCountMyLocks(holder->tag.lock, MyProc, myHolding); - if (myHolding[lockmode] != 0) + if (myHolding[lockmode] > 0) { GrantLock(lock, holder, lockmode); HOLDER_PRINT("LockAcquire: my other XID owning", holder); @@ -649,42 +660,27 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, return TRUE; } - /* - * If lock requested conflicts with locks requested by waiters... + /* -------------------- + * If lock requested conflicts with locks requested by waiters, + * must join wait queue. Otherwise, check for conflict with + * already-held locks. (That's last because most complex check.) + * -------------------- */ if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask) - { - /* - * If my process doesn't hold any locks that conflict with waiters - * then force to sleep, so that prior waiters get first chance. 
- */ - for (i = 1; i <= lockMethodTable->ctl->numLockModes; i++) - { - if (myHolding[i] > 0 && - lockMethodTable->ctl->conflictTab[i] & lock->waitMask) - break; /* yes, there is a conflict */ - } - - if (i > lockMethodTable->ctl->numLockModes) - { - HOLDER_PRINT("LockAcquire: another proc already waiting", - holder); - status = STATUS_FOUND; - } - else - status = LockResolveConflicts(lockmethod, lockmode, - lock, holder, - MyProc, myHolding); - } + status = STATUS_FOUND; else - status = LockResolveConflicts(lockmethod, lockmode, - lock, holder, - MyProc, myHolding); + status = LockCheckConflicts(lockMethodTable, lockmode, + lock, holder, + MyProc, myHolding); if (status == STATUS_OK) - GrantLock(lock, holder, lockmode); - else if (status == STATUS_FOUND) { + /* No conflict with held or previously requested locks */ + GrantLock(lock, holder, lockmode); + } + else + { + Assert(status == STATUS_FOUND); #ifdef USER_LOCKS /* @@ -765,49 +761,50 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, } /* ---------------------------- - * LockResolveConflicts -- test for lock conflicts + * LockCheckConflicts -- test whether requested lock conflicts + * with those already granted + * + * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict. * * NOTES: - * Here's what makes this complicated: one transaction's - * locks don't conflict with one another. When many processes - * hold locks, each has to subtract off the other's locks when - * determining whether or not any new lock acquired conflicts with - * the old ones. + * Here's what makes this complicated: one process's locks don't + * conflict with one another, even if they are held under different + * transaction IDs (eg, session and xact locks do not conflict). + * So, we must subtract off our own locks when determining whether the + * requested new lock conflicts with those already held. * * The caller can optionally pass the process's total holding counts, if * known. If NULL is passed then these values will be computed internally. * ---------------------------- */ int -LockResolveConflicts(LOCKMETHOD lockmethod, - LOCKMODE lockmode, - LOCK *lock, - HOLDER *holder, - PROC *proc, - int *myHolding) /* myHolding[] array or NULL */ +LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable, + LOCKMODE lockmode, + LOCK *lock, + HOLDER *holder, + PROC *proc, + int *myHolding) /* myHolding[] array or NULL */ { - LOCKMETHODCTL *lockctl = LockMethodTable[lockmethod]->ctl; + LOCKMETHODCTL *lockctl = lockMethodTable->ctl; int numLockModes = lockctl->numLockModes; int bitmask; int i, tmpMask; int localHolding[MAX_LOCKMODES]; - Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0)); - /* ---------------------------- * first check for global conflicts: If no locks conflict - * with mine, then I get the lock. + * with my request, then I get the lock. * * Checking for conflict: lock->grantMask represents the types of * currently held locks. conflictTable[lockmode] has a bit - * set for each type of lock that conflicts with mine. Bitwise + * set for each type of lock that conflicts with request. Bitwise * compare tells if there is a conflict. * ---------------------------- */ if (!(lockctl->conflictTab[lockmode] & lock->grantMask)) { - HOLDER_PRINT("LockResolveConflicts: no conflict", holder); + HOLDER_PRINT("LockCheckConflicts: no conflict", holder); return STATUS_OK; } @@ -844,11 +841,11 @@ LockResolveConflicts(LOCKMETHOD lockmethod, if (!(lockctl->conflictTab[lockmode] & bitmask)) { /* no conflict. 
OK to get the lock */ - HOLDER_PRINT("LockResolveConflicts: resolved", holder); + HOLDER_PRINT("LockCheckConflicts: resolved", holder); return STATUS_OK; } - HOLDER_PRINT("LockResolveConflicts: conflicting", holder); + HOLDER_PRINT("LockCheckConflicts: conflicting", holder); return STATUS_FOUND; } @@ -889,33 +886,12 @@ LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolding) } } -/* - * LockGetMyHeldLocks -- compute bitmask of lock types held by a process - * for a given lockable object. - */ -static int -LockGetMyHeldLocks(SHMEM_OFFSET lockOffset, PROC *proc) -{ - int myHolding[MAX_LOCKMODES]; - int heldLocks = 0; - int i, - tmpMask; - - LockCountMyLocks(lockOffset, proc, myHolding); - - for (i = 1, tmpMask = 2; - i < MAX_LOCKMODES; - i++, tmpMask <<= 1) - { - if (myHolding[i] > 0) - heldLocks |= tmpMask; - } - return heldLocks; -} - /* * GrantLock -- update the lock and holder data structures to show * the lock request has been granted. + * + * NOTE: if proc was blocked, it also needs to be removed from the wait list + * and have its waitLock/waitHolder fields cleared. That's not done here. */ void GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode) @@ -936,6 +912,9 @@ GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode) /* * WaitOnLock -- wait to acquire a lock * + * Caller must have set MyProc->heldLocks to reflect locks already held + * on the lockable object by this process (under all XIDs). + * * The locktable spinlock must be held at entry. */ static int @@ -956,7 +935,7 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode, strcat(new_status, " waiting"); set_ps_display(new_status); - /* + /* ------------------- * NOTE: Think not to put any lock state cleanup after the call to * ProcSleep, in either the normal or failure path. The lock state * must be fully set by the lock grantor, or by HandleDeadLock if we @@ -965,12 +944,13 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode, * after someone else grants us the lock, but before we've noticed it. * Hence, after granting, the locktable state must fully reflect the * fact that we own the lock; we can't do additional work on return. + * ------------------- */ - if (ProcSleep(lockMethodTable->ctl, + if (ProcSleep(lockMethodTable, lockmode, lock, - holder) != NO_ERROR) + holder) != STATUS_OK) { /* ------------------- * We failed as a result of a deadlock, see HandleDeadLock(). @@ -992,14 +972,60 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode, return STATUS_OK; } +/*-------------------- + * Remove a proc from the wait-queue it is on + * (caller must know it is on one). + * + * Locktable lock must be held by caller. + * + * NB: this does not remove the process' holder object, nor the lock object, + * even though their counts might now have gone to zero. That will happen + * during a subsequent LockReleaseAll call, which we expect will happen + * during transaction cleanup. (Removal of a proc from its wait queue by + * this routine can only happen if we are aborting the transaction.) 
+ *-------------------- + */ +void +RemoveFromWaitQueue(PROC *proc) +{ + LOCK *waitLock = proc->waitLock; + LOCKMODE lockmode = proc->waitLockMode; + + /* Make sure proc is waiting */ + Assert(proc->links.next != INVALID_OFFSET); + Assert(waitLock); + Assert(waitLock->waitProcs.size > 0); + + /* Remove proc from lock's wait queue */ + SHMQueueDelete(&(proc->links)); + waitLock->waitProcs.size--; + + /* Undo increments of request counts by waiting process */ + Assert(waitLock->nRequested > 0); + Assert(waitLock->nRequested > proc->waitLock->nGranted); + waitLock->nRequested--; + Assert(waitLock->requested[lockmode] > 0); + waitLock->requested[lockmode]--; + /* don't forget to clear waitMask bit if appropriate */ + if (waitLock->granted[lockmode] == waitLock->requested[lockmode]) + waitLock->waitMask &= BITS_OFF[lockmode]; + + /* Clean up the proc's own state */ + proc->waitLock = NULL; + proc->waitHolder = NULL; + + /* See if any other waiters for the lock can be woken up now */ + ProcLockWakeup(GetLocksMethodTable(waitLock), waitLock); +} + /* * LockRelease -- look up 'locktag' in lock table 'lockmethod' and - * release it. + * release one 'lockmode' lock on it. * - * Side Effects: if the lock no longer conflicts with the highest - * priority waiting process, that process is granted the lock - * and awoken. (We have to grant the lock here to avoid a - * race between the waking process and any new process to + * Side Effects: find any waiting processes that are now wakable, + * grant them their requested locks and awaken them. + * (We have to grant the lock here to avoid a race between + * the waking process and any new process to * come along and request the lock.) */ bool @@ -1013,7 +1039,7 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, HOLDER *holder; HOLDERTAG holdertag; HTAB *holderTable; - bool wakeupNeeded = true; + bool wakeupNeeded = false; #ifdef LOCK_DEBUG if (lockmethod == USER_LOCKMETHOD && Trace_userlocks) @@ -1086,7 +1112,6 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, return FALSE; } HOLDER_PRINT("LockRelease: found", holder); - Assert(holder->tag.lock == MAKE_OFFSET(lock)); /* * Check that we are actually holding a lock of the type we want to @@ -1094,11 +1119,11 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, */ if (!(holder->holding[lockmode] > 0)) { - SpinRelease(masterLock); HOLDER_PRINT("LockRelease: WRONGTYPE", holder); + Assert(holder->holding[lockmode] >= 0); + SpinRelease(masterLock); elog(NOTICE, "LockRelease: you don't own a lock of type %s", lock_types[lockmode]); - Assert(holder->holding[lockmode] >= 0); return FALSE; } Assert(holder->nHolding > 0); @@ -1120,34 +1145,24 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, lock->grantMask &= BITS_OFF[lockmode]; } -#ifdef NOT_USED - /* -------------------------- - * If there are still active locks of the type I just released, no one - * should be woken up. Whoever is asleep will still conflict - * with the remaining locks. - * -------------------------- - */ - if (lock->granted[lockmode]) - wakeupNeeded = false; - else -#endif - - /* - * Above is not valid any more (due to MVCC lock modes). Actually - * we should compare granted[lockmode] with number of - * waiters holding lock of this type and try to wakeup only if - * these numbers are equal (and lock released conflicts with locks - * requested by waiters). For the moment we only check the last - * condition. 
- */ - if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask) - wakeupNeeded = true; - LOCK_PRINT("LockRelease: updated", lock, lockmode); Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0)); Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0)); Assert(lock->nGranted <= lock->nRequested); + /* -------------------------- + * We need only run ProcLockWakeup if the released lock conflicts with + * at least one of the lock types requested by waiter(s). Otherwise + * whatever conflict made them wait must still exist. NOTE: before MVCC, + * we could skip wakeup if lock->granted[lockmode] was still positive. + * But that's not true anymore, because the remaining granted locks might + * belong to some waiter, who could now be awakened because he doesn't + * conflict with his own locks. + * -------------------------- + */ + if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask) + wakeupNeeded = true; + if (lock->nRequested == 0) { /* ------------------ @@ -1161,8 +1176,13 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, (Pointer) &(lock->tag), HASH_REMOVE, &found); - Assert(lock && found); - wakeupNeeded = false; + if (!lock || !found) + { + SpinRelease(masterLock); + elog(NOTICE, "LockRelease: remove lock, table corrupted"); + return FALSE; + } + wakeupNeeded = false; /* should be false, but make sure */ } /* @@ -1192,12 +1212,11 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, } } + /* + * Wake up waiters if needed. + */ if (wakeupNeeded) - ProcLockWakeup(lockmethod, lock); -#ifdef LOCK_DEBUG - else if (LOCK_DEBUG_ENABLED(lock)) - elog(DEBUG, "LockRelease: no wakeup needed"); -#endif + ProcLockWakeup(lockMethodTable, lock); SpinRelease(masterLock); return TRUE; @@ -1310,8 +1329,8 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc, else { /* -------------- - * set nRequested to zero so that we can garbage collect the lock - * down below... + * This holder accounts for all the requested locks on the object, + * so we can be lazy and just zero things out. * -------------- */ lock->nRequested = 0; @@ -1347,7 +1366,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc, return FALSE; } - if (!lock->nRequested) + if (lock->nRequested == 0) { /* -------------------- * We've just released the last lock, so garbage-collect the @@ -1359,7 +1378,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc, lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) &(lock->tag), HASH_REMOVE, &found); - if ((!lock) || (!found)) + if (!lock || !found) { SpinRelease(masterLock); elog(NOTICE, "LockReleaseAll: cannot remove lock from HTAB"); @@ -1367,7 +1386,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc, } } else if (wakeupNeeded) - ProcLockWakeup(lockmethod, lock); + ProcLockWakeup(lockMethodTable, lock); next_item: holder = nextHolder; @@ -1412,245 +1431,6 @@ LockShmemSize(int maxBackends) return size; } -/* - * DeadLockCheck -- Checks for deadlocks for a given process - * - * This code takes a list of locks a process holds, and the lock that - * the process is sleeping on, and tries to find if any of the processes - * waiting on its locks hold the lock it is waiting for. If no deadlock - * is found, it goes on to look at all the processes waiting on their locks. - * - * We can't block on user locks, so no sense testing for deadlock - * because there is no blocking, and no timer for the block. So, - * only look at regular locks. - * - * We have already locked the master lock before being called. 
- */ -bool -DeadLockCheck(PROC *thisProc, LOCK *findlock) -{ - PROC *waitProc; - PROC_QUEUE *waitQueue; - SHM_QUEUE *procHolders = &(thisProc->procHolders); - HOLDER *holder; - HOLDER *nextHolder; - LOCKMETHODCTL *lockctl = LockMethodTable[DEFAULT_LOCKMETHOD]->ctl; - LOCK *lock; - int i, - j; - bool first_run = (thisProc == MyProc); - - static PROC *checked_procs[MAXBACKENDS]; - static int nprocs; - - /* initialize at start of recursion */ - if (first_run) - { - checked_procs[0] = thisProc; - nprocs = 1; - } - - /* - * Scan over all the locks held/awaited by thisProc. - */ - holder = (HOLDER *) SHMQueueNext(procHolders, procHolders, - offsetof(HOLDER, procLink)); - - while (holder) - { - /* Get link first, since we may unlink/delete this holder */ - nextHolder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink, - offsetof(HOLDER, procLink)); - - Assert(holder->tag.proc == MAKE_OFFSET(thisProc)); - - lock = (LOCK *) MAKE_PTR(holder->tag.lock); - - /* Ignore user locks */ - if (lock->tag.lockmethod != DEFAULT_LOCKMETHOD) - goto nxtl; - - HOLDER_PRINT("DeadLockCheck", holder); - LOCK_PRINT("DeadLockCheck", lock, 0); - - /* - * waitLock is always in procHolders of waiting proc, if !first_run - * then upper caller will handle waitProcs queue of waitLock. - */ - if (thisProc->waitLock == lock && !first_run) - goto nxtl; - - /* - * If we found proc holding findlock and sleeping on some my other - * lock then we have to check does it block me or another waiters. - */ - if (lock == findlock && !first_run) - { - int lm; - - Assert(holder->nHolding > 0); - for (lm = 1; lm <= lockctl->numLockModes; lm++) - { - if (holder->holding[lm] > 0 && - lockctl->conflictTab[lm] & findlock->waitMask) - return true; - } - - /* - * Else - get the next lock from thisProc's procHolders - */ - goto nxtl; - } - - waitQueue = &(lock->waitProcs); - waitProc = (PROC *) MAKE_PTR(waitQueue->links.next); - - /* - * Inner loop scans over all processes waiting for this lock. - * - * NOTE: loop must count down because we want to examine each item - * in the queue even if waitQueue->size decreases due to waking up - * some of the processes. - */ - for (i = waitQueue->size; --i >= 0; ) - { - Assert(waitProc->waitLock == lock); - if (waitProc == thisProc) - { - /* This should only happen at first level */ - Assert(waitProc == MyProc); - goto nextWaitProc; - } - if (lock == findlock) /* first_run also true */ - { - /* - * If I'm blocked by his heldLocks... - */ - if (lockctl->conflictTab[MyProc->waitLockMode] & waitProc->heldLocks) - { - /* and he blocked by me -> deadlock */ - if (lockctl->conflictTab[waitProc->waitLockMode] & MyProc->heldLocks) - return true; - /* we shouldn't look at procHolders of our blockers */ - goto nextWaitProc; - } - - /* - * If he isn't blocked by me and we request - * non-conflicting lock modes - no deadlock here because - * he isn't blocked by me in any sense (explicitly or - * implicitly). Note that we don't do like test if - * !first_run (when thisProc is holder and non-waiter on - * lock) and so we call DeadLockCheck below for every - * waitProc in thisProc->procHolders, even for waitProc-s - * un-blocked by thisProc. Should we? This could save us - * some time... - */ - if (!(lockctl->conflictTab[waitProc->waitLockMode] & MyProc->heldLocks) && - !(lockctl->conflictTab[waitProc->waitLockMode] & (1 << MyProc->waitLockMode))) - goto nextWaitProc; - } - - /* - * Skip this waiter if already checked. 
- */ - for (j = 0; j < nprocs; j++) - { - if (checked_procs[j] == waitProc) - goto nextWaitProc; - } - - /* Recursively check this process's procHolders. */ - Assert(nprocs < MAXBACKENDS); - checked_procs[nprocs++] = waitProc; - - if (DeadLockCheck(waitProc, findlock)) - { - int heldLocks; - - /* - * Ok, but is waitProc waiting for me (thisProc) ? - */ - if (thisProc->waitLock == lock) - { - Assert(first_run); - heldLocks = thisProc->heldLocks; - } - else - { - /* should we cache heldLocks to speed this up? */ - heldLocks = LockGetMyHeldLocks(holder->tag.lock, thisProc); - Assert(heldLocks != 0); - } - if (lockctl->conflictTab[waitProc->waitLockMode] & heldLocks) - { - /* - * Last attempt to avoid deadlock: try to wakeup myself. - */ - if (first_run) - { - if (LockResolveConflicts(DEFAULT_LOCKMETHOD, - MyProc->waitLockMode, - MyProc->waitLock, - MyProc->waitHolder, - MyProc, - NULL) == STATUS_OK) - { - GrantLock(MyProc->waitLock, - MyProc->waitHolder, - MyProc->waitLockMode); - ProcWakeup(MyProc, NO_ERROR); - return false; - } - } - return true; - } - - /* - * Hell! Is he blocked by any (other) holder ? - */ - if (LockResolveConflicts(DEFAULT_LOCKMETHOD, - waitProc->waitLockMode, - lock, - waitProc->waitHolder, - waitProc, - NULL) != STATUS_OK) - { - /* - * Blocked by others - no deadlock... - */ - LOCK_PRINT("DeadLockCheck: blocked by others", - lock, waitProc->waitLockMode); - goto nextWaitProc; - } - - /* - * Well - wakeup this guy! This is the case of - * implicit blocking: thisProc blocked someone who - * blocked waitProc by the fact that he/someone is - * already waiting for lock. We do this for - * anti-starving. - */ - GrantLock(lock, waitProc->waitHolder, waitProc->waitLockMode); - waitProc = ProcWakeup(waitProc, NO_ERROR); - /* - * Use next-proc link returned by ProcWakeup, since this - * proc's own links field is now cleared. - */ - continue; - } - -nextWaitProc: - waitProc = (PROC *) MAKE_PTR(waitProc->links.next); - } - -nxtl: - holder = nextHolder; - } - - /* if we got here, no deadlock */ - return false; -} #ifdef LOCK_DEBUG /* diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 377e9dbeb5..fd4c4b1485 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.96 2001/01/24 19:43:08 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.97 2001/01/25 03:31:16 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -18,7 +18,7 @@ * * * Interface (a): - * ProcSleep(), ProcWakeup(), ProcWakeupNext(), + * ProcSleep(), ProcWakeup(), * ProcQueueAlloc() -- create a shm queue for sleeping processes * ProcQueueInit() -- create a queue without allocing memory * @@ -47,8 +47,6 @@ * shared among backends (we keep a few sets of semaphores around). * This is so that we can support more backends. (system-wide semaphore * sets run out pretty fast.) 
-ay 4/95 - * - * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.96 2001/01/24 19:43:08 momjian Exp $ */ #include "postgres.h" @@ -257,7 +255,7 @@ InitProcess(void) } SHMQueueElemInit(&(MyProc->links)); - MyProc->errType = NO_ERROR; + MyProc->errType = STATUS_OK; MyProc->pid = MyProcPid; MyProc->databaseId = MyDatabaseId; MyProc->xid = InvalidTransactionId; @@ -284,7 +282,16 @@ InitProcess(void) (location != MAKE_OFFSET(MyProc))) elog(STOP, "InitProcess: ShmemPID table broken"); + /* + * Arrange to clean up at backend exit. + */ on_shmem_exit(ProcKill, 0); + + /* + * Now that we have a PROC, we could try to acquire locks, + * so initialize the deadlock checker. + */ + InitDeadLockChecking(); } /* @@ -304,50 +311,6 @@ ZeroProcSemaphore(PROC *proc) } } -/* - * Remove a proc from the wait-queue it is on - * (caller must know it is on one). - * Locktable lock must be held by caller. - * - * NB: this does not remove the process' holder object, nor the lock object, - * even though their counts might now have gone to zero. That will happen - * during a subsequent LockReleaseAll call, which we expect will happen - * during transaction cleanup. (Removal of a proc from its wait queue by - * this routine can only happen if we are aborting the transaction.) - */ -static void -RemoveFromWaitQueue(PROC *proc) -{ - LOCK *waitLock = proc->waitLock; - LOCKMODE lockmode = proc->waitLockMode; - - /* Make sure proc is waiting */ - Assert(proc->links.next != INVALID_OFFSET); - Assert(waitLock); - Assert(waitLock->waitProcs.size > 0); - - /* Remove proc from lock's wait queue */ - SHMQueueDelete(&(proc->links)); - waitLock->waitProcs.size--; - - /* Undo increments of request counts by waiting process */ - Assert(waitLock->nRequested > 0); - Assert(waitLock->nRequested > proc->waitLock->nGranted); - waitLock->nRequested--; - Assert(waitLock->requested[lockmode] > 0); - waitLock->requested[lockmode]--; - /* don't forget to clear waitMask bit if appropriate */ - if (waitLock->granted[lockmode] == waitLock->requested[lockmode]) - waitLock->waitMask &= ~(1 << lockmode); - - /* Clean up the proc's own state */ - proc->waitLock = NULL; - proc->waitHolder = NULL; - - /* See if any other waiters for the lock can be woken up now */ - ProcLockWakeup(LOCK_LOCKMETHOD(*waitLock), waitLock); -} - /* * Cancel any pending wait for lock, when aborting a transaction. * @@ -529,34 +492,34 @@ ProcQueueInit(PROC_QUEUE *queue) /* * ProcSleep -- put a process to sleep * - * P() on the semaphore should put us to sleep. The process - * semaphore is normally zero, so when we try to acquire it, we sleep. + * Caller must have set MyProc->heldLocks to reflect locks already held + * on the lockable object by this process (under all XIDs). * * Locktable's spinlock must be held at entry, and will be held * at exit. * - * Result is NO_ERROR if we acquired the lock, STATUS_ERROR if not (deadlock). + * Result: STATUS_OK if we acquired the lock, STATUS_ERROR if not (deadlock). * * ASSUME: that no one will fiddle with the queue until after * we release the spin lock. * * NOTES: The process queue is now a priority queue for locking. + * + * P() on the semaphore should put us to sleep. The process + * semaphore is normally zero, so when we try to acquire it, we sleep. 
*/ int -ProcSleep(LOCKMETHODCTL *lockctl, +ProcSleep(LOCKMETHODTABLE *lockMethodTable, LOCKMODE lockmode, LOCK *lock, HOLDER *holder) { - PROC_QUEUE *waitQueue = &(lock->waitProcs); + LOCKMETHODCTL *lockctl = lockMethodTable->ctl; SPINLOCK spinlock = lockctl->masterLock; - int myMask = (1 << lockmode); - int waitMask = lock->waitMask; + PROC_QUEUE *waitQueue = &(lock->waitProcs); + int myHeldLocks = MyProc->heldLocks; PROC *proc; int i; - int aheadGranted[MAX_LOCKMODES]; - bool selfConflict = (lockctl->conflictTab[lockmode] & myMask), - prevSame = false; #ifndef __BEOS__ struct itimerval timeval, dummy; @@ -564,64 +527,63 @@ ProcSleep(LOCKMETHODCTL *lockctl, bigtime_t time_interval; #endif - proc = (PROC *) MAKE_PTR(waitQueue->links.next); - - /* if we don't conflict with any waiter - be first in queue */ - if (!(lockctl->conflictTab[lockmode] & waitMask)) - goto ins; - - /* otherwise, determine where we should go into the queue */ - for (i = 1; i < MAX_LOCKMODES; i++) - aheadGranted[i] = lock->granted[i]; - (aheadGranted[lockmode])++; - - for (i = 0; i < waitQueue->size; i++) + /* ---------------------- + * Determine where to add myself in the wait queue. + * + * Normally I should go at the end of the queue. However, if I already + * hold locks that conflict with the request of any previous waiter, + * put myself in the queue just in front of the first such waiter. + * This is not a necessary step, since deadlock detection would move + * me to before that waiter anyway; but it's relatively cheap to detect + * such a conflict immediately, and avoid delaying till deadlock timeout. + * + * Special case: if I find I should go in front of the first waiter, + * and I do not conflict with already-held locks, then just grant myself + * the requested lock immediately. + * ---------------------- + */ + if (myHeldLocks != 0) { - LOCKMODE procWaitMode = proc->waitLockMode; - - /* must I wait for him ? */ - if (lockctl->conflictTab[lockmode] & proc->heldLocks) + proc = (PROC *) MAKE_PTR(waitQueue->links.next); + for (i = 0; i < waitQueue->size; i++) { - /* is he waiting for me ? */ - if (lockctl->conflictTab[procWaitMode] & MyProc->heldLocks) + /* Must he wait for me? */ + if (lockctl->conflictTab[proc->waitLockMode] & myHeldLocks) { - /* Yes, report deadlock failure */ - MyProc->errType = STATUS_ERROR; - return STATUS_ERROR; - } - /* I must go after him in queue - so continue loop */ - } - /* if he waits for me, go before him in queue */ - else if (lockctl->conflictTab[procWaitMode] & MyProc->heldLocks) - break; - /* if conflicting locks requested */ - else if (lockctl->conflictTab[procWaitMode] & myMask) - { - - /* - * If I request non self-conflicting lock and there are others - * requesting the same lock just before this guy - stop here. - */ - if (!selfConflict && prevSame) + /* Must I wait for him ? */ + if (lockctl->conflictTab[lockmode] & proc->heldLocks) + { + /* Yes, can report deadlock failure immediately */ + MyProc->errType = STATUS_ERROR; + return STATUS_ERROR; + } + if (i == 0) + { + /* I must go before first waiter. Check special case. */ + if (LockCheckConflicts(lockMethodTable, + lockmode, + lock, + holder, + MyProc, + NULL) == STATUS_OK) + { + /* Skip the wait and just grant myself the lock. 
*/ + GrantLock(lock, holder, lockmode); + return STATUS_OK; + } + } + /* Break out of loop to put myself before him */ break; + } + proc = (PROC *) MAKE_PTR(proc->links.next); } - - /* - * Last attempt to not move any further to the back of the queue: - * if we don't conflict with remaining waiters, stop here. - */ - else if (!(lockctl->conflictTab[lockmode] & waitMask)) - break; - - /* Move past this guy, and update state accordingly */ - prevSame = (procWaitMode == lockmode); - (aheadGranted[procWaitMode])++; - if (aheadGranted[procWaitMode] == lock->requested[procWaitMode]) - waitMask &= ~(1 << procWaitMode); - proc = (PROC *) MAKE_PTR(proc->links.next); + } + else + { + /* I hold no locks, so I can't push in front of anyone. */ + proc = (PROC *) &(waitQueue->links); } -ins:; /* ------------------- * Insert self into queue, ahead of the given proc (or at tail of queue). * ------------------- @@ -629,15 +591,14 @@ ins:; SHMQueueInsertBefore(&(proc->links), &(MyProc->links)); waitQueue->size++; - lock->waitMask |= myMask; + lock->waitMask |= (1 << lockmode); /* Set up wait information in PROC object, too */ MyProc->waitLock = lock; MyProc->waitHolder = holder; MyProc->waitLockMode = lockmode; - /* We assume the caller set up MyProc->heldLocks */ - MyProc->errType = NO_ERROR; /* initialize result for success */ + MyProc->errType = STATUS_OK; /* initialize result for success */ /* mark that we are waiting for a lock */ waitingForLock = true; @@ -662,7 +623,7 @@ ins:; * By delaying the check until we've waited for a bit, we can avoid * running the rather expensive deadlock-check code in most cases. * - * Need to zero out struct to set the interval and the micro seconds fields + * Need to zero out struct to set the interval and the microseconds fields * to 0. * -------------- */ @@ -768,89 +729,59 @@ ProcWakeup(PROC *proc, int errType) /* * ProcLockWakeup -- routine for waking up processes when a lock is - * released. + * released (or a prior waiter is aborted). Scan all waiters + * for lock, waken any that are no longer blocked. */ -int -ProcLockWakeup(LOCKMETHOD lockmethod, LOCK *lock) +void +ProcLockWakeup(LOCKMETHODTABLE *lockMethodTable, LOCK *lock) { - PROC_QUEUE *queue = &(lock->waitProcs); + LOCKMETHODCTL *lockctl = lockMethodTable->ctl; + PROC_QUEUE *waitQueue = &(lock->waitProcs); + int queue_size = waitQueue->size; PROC *proc; - int awoken = 0; - LOCKMODE last_lockmode = 0; - int queue_size = queue->size; + int conflictMask = 0; Assert(queue_size >= 0); - if (!queue_size) - return STATUS_NOT_FOUND; + if (queue_size == 0) + return; - proc = (PROC *) MAKE_PTR(queue->links.next); + proc = (PROC *) MAKE_PTR(waitQueue->links.next); while (queue_size-- > 0) { - if (proc->waitLockMode == last_lockmode) + LOCKMODE lockmode = proc->waitLockMode; + + /* + * Waken if (a) doesn't conflict with requests of earlier waiters, + * and (b) doesn't conflict with already-held locks. + */ + if (((1 << lockmode) & conflictMask) == 0 && + LockCheckConflicts(lockMethodTable, + lockmode, + lock, + proc->waitHolder, + proc, + NULL) == STATUS_OK) { + /* OK to waken */ + GrantLock(lock, proc->waitHolder, lockmode); + proc = ProcWakeup(proc, STATUS_OK); /* - * This proc will conflict as the previous one did, don't even - * try. + * ProcWakeup removes proc from the lock's waiting process queue + * and returns the next proc in chain; don't use proc's next-link, + * because it's been cleared. */ - goto nextProc; } - - /* - * Does this proc conflict with locks held by others ? 
- */ - if (LockResolveConflicts(lockmethod, - proc->waitLockMode, - lock, - proc->waitHolder, - proc, - NULL) != STATUS_OK) + else { - /* Yes. Quit if we already awoke at least one process. */ - if (awoken != 0) - break; - /* Otherwise, see if any later waiters can be awoken. */ - last_lockmode = proc->waitLockMode; - goto nextProc; + /* Cannot wake this guy. Add his request to conflict mask. */ + conflictMask |= lockctl->conflictTab[lockmode]; + proc = (PROC *) MAKE_PTR(proc->links.next); } - - /* - * OK to wake up this sleeping process. - */ - GrantLock(lock, proc->waitHolder, proc->waitLockMode); - proc = ProcWakeup(proc, NO_ERROR); - awoken++; - - /* - * ProcWakeup removes proc from the lock's waiting process queue - * and returns the next proc in chain; don't use proc's next-link, - * because it's been cleared. - */ - continue; - -nextProc: - proc = (PROC *) MAKE_PTR(proc->links.next); } - Assert(queue->size >= 0); - - if (awoken) - return STATUS_OK; - else - { - /* Something is still blocking us. May have deadlocked. */ -#ifdef LOCK_DEBUG - if (lock->tag.lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks) - { - elog(DEBUG, "ProcLockWakeup: lock(%lx) can't wake up any process", - MAKE_OFFSET(lock)); - if (Debug_deadlocks) - DumpAllLocks(); - } -#endif - return STATUS_NOT_FOUND; - } + Assert(waitQueue->size >= 0); } /* -------------------- @@ -900,7 +831,7 @@ HandleDeadLock(SIGNAL_ARGS) DumpAllLocks(); #endif - if (!DeadLockCheck(MyProc, MyProc->waitLock)) + if (!DeadLockCheck(MyProc)) { /* No deadlock, so keep waiting */ UnlockLockTable(); diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h index 39eefac0ca..0ada6eaccf 100644 --- a/src/include/storage/lock.h +++ b/src/include/storage/lock.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: lock.h,v 1.43 2001/01/24 19:43:27 momjian Exp $ + * $Id: lock.h,v 1.44 2001/01/25 03:31:16 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -247,6 +247,7 @@ typedef struct HOLDER extern void InitLocks(void); extern void LockDisable(bool status); extern bool LockingDisabled(void); +extern LOCKMETHODTABLE *GetLocksMethodTable(LOCK *lock); extern LOCKMETHOD LockMethodTableInit(char *tabName, LOCKMASK *conflictsP, int *prioP, int numModes, int maxBackends); extern LOCKMETHOD LockMethodTableRename(LOCKMETHOD lockmethod); @@ -256,12 +257,15 @@ extern bool LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, TransactionId xid, LOCKMODE lockmode); extern bool LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc, bool allxids, TransactionId xid); -extern int LockResolveConflicts(LOCKMETHOD lockmethod, LOCKMODE lockmode, - LOCK *lock, HOLDER *holder, PROC *proc, - int *myHolding); +extern int LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable, + LOCKMODE lockmode, + LOCK *lock, HOLDER *holder, PROC *proc, + int *myHolding); extern void GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode); +extern void RemoveFromWaitQueue(PROC *proc); extern int LockShmemSize(int maxBackends); -extern bool DeadLockCheck(PROC *thisProc, LOCK *findlock); +extern bool DeadLockCheck(PROC *proc); +extern void InitDeadLockChecking(void); #ifdef LOCK_DEBUG extern void DumpLocks(void); diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 3f8902e7d3..4dd5a8c2a6 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 
1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: proc.h,v 1.38 2001/01/24 19:43:28 momjian Exp $ + * $Id: proc.h,v 1.39 2001/01/25 03:31:16 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -41,7 +41,7 @@ struct proc SHM_QUEUE links; /* list link if process is in a list */ SEMA sem; /* ONE semaphore to sleep on */ - int errType; /* error code tells why we woke up */ + int errType; /* STATUS_OK or STATUS_ERROR after wakeup */ TransactionId xid; /* transaction currently being executed by * this proc */ @@ -86,13 +86,6 @@ do { \ if (MyProc) (MyProc->sLocks[(lock)])--; \ } while (0) -/* - * flags explaining why process woke up - */ -#define NO_ERROR 0 -#define ERR_TIMEOUT 1 -#define ERR_BUFFER_IO 2 - /* * There is one ProcGlobal struct for the whole installation. @@ -134,10 +127,10 @@ extern void ProcReleaseLocks(bool isCommit); extern bool ProcRemove(int pid); extern void ProcQueueInit(PROC_QUEUE *queue); -extern int ProcSleep(LOCKMETHODCTL *lockctl, LOCKMODE lockmode, +extern int ProcSleep(LOCKMETHODTABLE *lockMethodTable, LOCKMODE lockmode, LOCK *lock, HOLDER *holder); extern PROC *ProcWakeup(PROC *proc, int errType); -extern int ProcLockWakeup(LOCKMETHOD lockmethod, LOCK *lock); +extern void ProcLockWakeup(LOCKMETHODTABLE *lockMethodTable, LOCK *lock); extern void ProcReleaseSpins(PROC *proc); extern bool LockWaitCancel(void); extern void HandleDeadLock(SIGNAL_ARGS);
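
The heart of the queue-rearrangement logic added above is TopoSort(), a constraint-counting topological sort that honors "waiter must precede blocker" edges while disturbing the existing wait-queue order as little as possible. The standalone sketch below is not part of the patch: demo_topo_sort, DemoEdge, and the plain-integer queue representation are invented for illustration, and item IDs are assumed nonnegative. It shows the same bookkeeping (a per-item count of remaining "must precede" obligations, plus a 1-based linked list of constraints per blocker) applied to simple arrays, filling the output from the back just as TopoSort() does.

#include <stdio.h>
#include <string.h>
#include <stdbool.h>

#define DEMO_MAXQ 16

typedef struct
{
	int			waiter;		/* queue position that must come earlier */
	int			blocker;	/* queue position that must come later */
	int			link;		/* next constraint with same blocker, 0 = none */
} DemoEdge;

static bool
demo_topo_sort(const int queue[], int n,
			   DemoEdge edges[], int nedges,
			   int out[])
{
	int			items[DEMO_MAXQ];
	int			before[DEMO_MAXQ];	/* remaining "must precede" counts */
	int			after[DEMO_MAXQ];	/* 1-based head of per-blocker constraint list */
	int			i, j, k, last;

	if (n > DEMO_MAXQ)
		return false;
	memcpy(items, queue, n * sizeof(int));
	memset(before, 0, n * sizeof(int));
	memset(after, 0, n * sizeof(int));

	/* Count before-obligations and chain constraints by blocker */
	for (i = 0; i < nedges; i++)
	{
		before[edges[i].waiter]++;
		edges[i].link = after[edges[i].blocker];
		after[edges[i].blocker] = i + 1;
	}

	/*
	 * Fill the output from the back: emit the last item that no longer has
	 * to precede anything, then release the waiters it was blocking.
	 * (IDs are assumed nonnegative; -1 marks an already-emitted slot.)
	 */
	last = n - 1;
	for (i = n; --i >= 0;)
	{
		while (items[last] < 0)
			last--;
		for (j = last; j >= 0; j--)
		{
			if (items[j] >= 0 && before[j] == 0)
				break;
		}
		if (j < 0)
			return false;		/* contradictory constraints */
		out[i] = items[j];
		items[j] = -1;
		for (k = after[j]; k > 0; k = edges[k - 1].link)
			before[edges[k - 1].waiter]--;
	}
	return true;
}

int
main(void)
{
	/* queue is A(0) B(1) C(2); one soft edge says C must precede B */
	int			queue[] = {0, 1, 2};
	DemoEdge	edges[] = {{2, 1, 0}};
	int			out[3];

	if (demo_topo_sort(queue, 3, edges, 1, out))
		printf("new order: %d %d %d\n", out[0], out[1], out[2]);	/* 0 2 1 */
	return 0;
}

With queue A, B, C and the single constraint "C must precede B", the sketch yields A, C, B: B moves back just far enough and A keeps its place at the head, which is the minimal-disturbance behavior TopoSort()'s header comment asks for.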