Use dlists instead of SHM_QUEUE for syncrep queue
Part of a series to remove SHM_QUEUE. ilist.h style lists are more widely used and have an easier to use interface. Reviewed-by: Thomas Munro <thomas.munro@gmail.com> (in an older version) Discussion: https://postgr.es/m/20221120055930.t6kl3tyivzhlrzu2@awork3.anarazel.de Discussion: https://postgr.es/m/20200211042229.msv23badgqljrdg2@alap3.anarazel.de
This commit is contained in:
parent
5764f611e1
commit
12605414a7
@ -182,7 +182,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
|
|||||||
else
|
else
|
||||||
mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
|
mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
|
||||||
|
|
||||||
Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
|
Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
|
||||||
Assert(WalSndCtl != NULL);
|
Assert(WalSndCtl != NULL);
|
||||||
|
|
||||||
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
|
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
|
||||||
@ -318,7 +318,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
|
|||||||
* assertions, but better safe than sorry).
|
* assertions, but better safe than sorry).
|
||||||
*/
|
*/
|
||||||
pg_read_barrier();
|
pg_read_barrier();
|
||||||
Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
|
Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
|
||||||
MyProc->syncRepState = SYNC_REP_NOT_WAITING;
|
MyProc->syncRepState = SYNC_REP_NOT_WAITING;
|
||||||
MyProc->waitLSN = 0;
|
MyProc->waitLSN = 0;
|
||||||
|
|
||||||
@ -339,31 +339,32 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
|
|||||||
static void
|
static void
|
||||||
SyncRepQueueInsert(int mode)
|
SyncRepQueueInsert(int mode)
|
||||||
{
|
{
|
||||||
PGPROC *proc;
|
dlist_head *queue;
|
||||||
|
dlist_iter iter;
|
||||||
|
|
||||||
Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
|
Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
|
||||||
proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
|
queue = &WalSndCtl->SyncRepQueue[mode];
|
||||||
&(WalSndCtl->SyncRepQueue[mode]),
|
|
||||||
offsetof(PGPROC, syncRepLinks));
|
|
||||||
|
|
||||||
while (proc)
|
dlist_reverse_foreach(iter, queue)
|
||||||
{
|
{
|
||||||
|
PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Stop at the queue element that we should after to ensure the queue
|
* Stop at the queue element that we should insert after to ensure the
|
||||||
* is ordered by LSN.
|
* queue is ordered by LSN.
|
||||||
*/
|
*/
|
||||||
if (proc->waitLSN < MyProc->waitLSN)
|
if (proc->waitLSN < MyProc->waitLSN)
|
||||||
break;
|
{
|
||||||
|
dlist_insert_after(&proc->syncRepLinks, &MyProc->syncRepLinks);
|
||||||
proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
|
return;
|
||||||
&(proc->syncRepLinks),
|
}
|
||||||
offsetof(PGPROC, syncRepLinks));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (proc)
|
/*
|
||||||
SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
|
* If we get here, the list was either empty, or this process needs to be
|
||||||
else
|
* at the head.
|
||||||
SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
|
*/
|
||||||
|
dlist_push_head(queue, &MyProc->syncRepLinks);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -373,8 +374,8 @@ static void
|
|||||||
SyncRepCancelWait(void)
|
SyncRepCancelWait(void)
|
||||||
{
|
{
|
||||||
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
|
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
|
||||||
if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
|
if (!dlist_node_is_detached(&MyProc->syncRepLinks))
|
||||||
SHMQueueDelete(&(MyProc->syncRepLinks));
|
dlist_delete_thoroughly(&MyProc->syncRepLinks);
|
||||||
MyProc->syncRepState = SYNC_REP_NOT_WAITING;
|
MyProc->syncRepState = SYNC_REP_NOT_WAITING;
|
||||||
LWLockRelease(SyncRepLock);
|
LWLockRelease(SyncRepLock);
|
||||||
}
|
}
|
||||||
@ -386,13 +387,13 @@ SyncRepCleanupAtProcExit(void)
|
|||||||
* First check if we are removed from the queue without the lock to not
|
* First check if we are removed from the queue without the lock to not
|
||||||
* slow down backend exit.
|
* slow down backend exit.
|
||||||
*/
|
*/
|
||||||
if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
|
if (!dlist_node_is_detached(&MyProc->syncRepLinks))
|
||||||
{
|
{
|
||||||
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
|
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
|
||||||
|
|
||||||
/* maybe we have just been removed, so recheck */
|
/* maybe we have just been removed, so recheck */
|
||||||
if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
|
if (!dlist_node_is_detached(&MyProc->syncRepLinks))
|
||||||
SHMQueueDelete(&(MyProc->syncRepLinks));
|
dlist_delete_thoroughly(&MyProc->syncRepLinks);
|
||||||
|
|
||||||
LWLockRelease(SyncRepLock);
|
LWLockRelease(SyncRepLock);
|
||||||
}
|
}
|
||||||
@ -879,20 +880,17 @@ static int
|
|||||||
SyncRepWakeQueue(bool all, int mode)
|
SyncRepWakeQueue(bool all, int mode)
|
||||||
{
|
{
|
||||||
volatile WalSndCtlData *walsndctl = WalSndCtl;
|
volatile WalSndCtlData *walsndctl = WalSndCtl;
|
||||||
PGPROC *proc = NULL;
|
|
||||||
PGPROC *thisproc = NULL;
|
|
||||||
int numprocs = 0;
|
int numprocs = 0;
|
||||||
|
dlist_mutable_iter iter;
|
||||||
|
|
||||||
Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
|
Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
|
||||||
Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
|
Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
|
||||||
Assert(SyncRepQueueIsOrderedByLSN(mode));
|
Assert(SyncRepQueueIsOrderedByLSN(mode));
|
||||||
|
|
||||||
proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
|
dlist_foreach_modify(iter, &WalSndCtl->SyncRepQueue[mode])
|
||||||
&(WalSndCtl->SyncRepQueue[mode]),
|
|
||||||
offsetof(PGPROC, syncRepLinks));
|
|
||||||
|
|
||||||
while (proc)
|
|
||||||
{
|
{
|
||||||
|
PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Assume the queue is ordered by LSN
|
* Assume the queue is ordered by LSN
|
||||||
*/
|
*/
|
||||||
@ -900,18 +898,9 @@ SyncRepWakeQueue(bool all, int mode)
|
|||||||
return numprocs;
|
return numprocs;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Move to next proc, so we can delete thisproc from the queue.
|
* Remove from queue.
|
||||||
* thisproc is valid, proc may be NULL after this.
|
|
||||||
*/
|
*/
|
||||||
thisproc = proc;
|
dlist_delete_thoroughly(&proc->syncRepLinks);
|
||||||
proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
|
|
||||||
&(proc->syncRepLinks),
|
|
||||||
offsetof(PGPROC, syncRepLinks));
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Remove thisproc from queue.
|
|
||||||
*/
|
|
||||||
SHMQueueDelete(&(thisproc->syncRepLinks));
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SyncRepWaitForLSN() reads syncRepState without holding the lock, so
|
* SyncRepWaitForLSN() reads syncRepState without holding the lock, so
|
||||||
@ -924,12 +913,12 @@ SyncRepWakeQueue(bool all, int mode)
|
|||||||
* Set state to complete; see SyncRepWaitForLSN() for discussion of
|
* Set state to complete; see SyncRepWaitForLSN() for discussion of
|
||||||
* the various states.
|
* the various states.
|
||||||
*/
|
*/
|
||||||
thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;
|
proc->syncRepState = SYNC_REP_WAIT_COMPLETE;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Wake only when we have set state and removed from queue.
|
* Wake only when we have set state and removed from queue.
|
||||||
*/
|
*/
|
||||||
SetLatch(&(thisproc->procLatch));
|
SetLatch(&(proc->procLatch));
|
||||||
|
|
||||||
numprocs++;
|
numprocs++;
|
||||||
}
|
}
|
||||||
@ -983,19 +972,17 @@ SyncRepUpdateSyncStandbysDefined(void)
|
|||||||
static bool
|
static bool
|
||||||
SyncRepQueueIsOrderedByLSN(int mode)
|
SyncRepQueueIsOrderedByLSN(int mode)
|
||||||
{
|
{
|
||||||
PGPROC *proc = NULL;
|
|
||||||
XLogRecPtr lastLSN;
|
XLogRecPtr lastLSN;
|
||||||
|
dlist_iter iter;
|
||||||
|
|
||||||
Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
|
Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
|
||||||
|
|
||||||
lastLSN = 0;
|
lastLSN = 0;
|
||||||
|
|
||||||
proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
|
dlist_foreach(iter, &WalSndCtl->SyncRepQueue[mode])
|
||||||
&(WalSndCtl->SyncRepQueue[mode]),
|
|
||||||
offsetof(PGPROC, syncRepLinks));
|
|
||||||
|
|
||||||
while (proc)
|
|
||||||
{
|
{
|
||||||
|
PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Check the queue is ordered by LSN and that multiple procs don't
|
* Check the queue is ordered by LSN and that multiple procs don't
|
||||||
* have matching LSNs
|
* have matching LSNs
|
||||||
@ -1004,10 +991,6 @@ SyncRepQueueIsOrderedByLSN(int mode)
|
|||||||
return false;
|
return false;
|
||||||
|
|
||||||
lastLSN = proc->waitLSN;
|
lastLSN = proc->waitLSN;
|
||||||
|
|
||||||
proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
|
|
||||||
&(proc->syncRepLinks),
|
|
||||||
offsetof(PGPROC, syncRepLinks));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
@ -3275,7 +3275,7 @@ WalSndShmemInit(void)
|
|||||||
MemSet(WalSndCtl, 0, WalSndShmemSize());
|
MemSet(WalSndCtl, 0, WalSndShmemSize());
|
||||||
|
|
||||||
for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
|
for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
|
||||||
SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
|
dlist_init(&(WalSndCtl->SyncRepQueue[i]));
|
||||||
|
|
||||||
for (i = 0; i < max_wal_senders; i++)
|
for (i = 0; i < max_wal_senders; i++)
|
||||||
{
|
{
|
||||||
|
@ -410,7 +410,7 @@ InitProcess(void)
|
|||||||
/* Initialize fields for sync rep */
|
/* Initialize fields for sync rep */
|
||||||
MyProc->waitLSN = 0;
|
MyProc->waitLSN = 0;
|
||||||
MyProc->syncRepState = SYNC_REP_NOT_WAITING;
|
MyProc->syncRepState = SYNC_REP_NOT_WAITING;
|
||||||
SHMQueueElemInit(&(MyProc->syncRepLinks));
|
dlist_node_init(&MyProc->syncRepLinks);
|
||||||
|
|
||||||
/* Initialize fields for group XID clearing. */
|
/* Initialize fields for group XID clearing. */
|
||||||
MyProc->procArrayGroupMember = false;
|
MyProc->procArrayGroupMember = false;
|
||||||
|
@ -13,6 +13,7 @@
|
|||||||
#define _WALSENDER_PRIVATE_H
|
#define _WALSENDER_PRIVATE_H
|
||||||
|
|
||||||
#include "access/xlog.h"
|
#include "access/xlog.h"
|
||||||
|
#include "lib/ilist.h"
|
||||||
#include "nodes/nodes.h"
|
#include "nodes/nodes.h"
|
||||||
#include "replication/syncrep.h"
|
#include "replication/syncrep.h"
|
||||||
#include "storage/latch.h"
|
#include "storage/latch.h"
|
||||||
@ -89,7 +90,7 @@ typedef struct
|
|||||||
* Synchronous replication queue with one queue per request type.
|
* Synchronous replication queue with one queue per request type.
|
||||||
* Protected by SyncRepLock.
|
* Protected by SyncRepLock.
|
||||||
*/
|
*/
|
||||||
SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE];
|
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE];
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Current location of the head of the queue. All waiters should have a
|
* Current location of the head of the queue. All waiters should have a
|
||||||
|
@ -248,7 +248,7 @@ struct PGPROC
|
|||||||
*/
|
*/
|
||||||
XLogRecPtr waitLSN; /* waiting for this LSN or higher */
|
XLogRecPtr waitLSN; /* waiting for this LSN or higher */
|
||||||
int syncRepState; /* wait state for sync rep */
|
int syncRepState; /* wait state for sync rep */
|
||||||
SHM_QUEUE syncRepLinks; /* list link if process is in syncrep queue */
|
dlist_node syncRepLinks; /* list link if process is in syncrep queue */
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* All PROCLOCK objects for locks held or awaited by this backend are
|
* All PROCLOCK objects for locks held or awaited by this backend are
|
||||||
|
Loading…
x
Reference in New Issue
Block a user