
Author: Kyotaro Horiguchi Discussion: https://postgr.es/m/20191030.123428.18823202335157111.horikyota.ntt@gmail.com
1215 lines
34 KiB
C
1215 lines
34 KiB
C
/*-------------------------------------------------------------------------
|
|
*
|
|
* syncrep.c
|
|
*
|
|
* Synchronous replication is new as of PostgreSQL 9.1.
|
|
*
|
|
* If requested, transaction commits wait until their commit LSNs are
|
|
* acknowledged by the synchronous standbys.
|
|
*
|
|
* This module contains the code for waiting and release of backends.
|
|
* All code in this module executes on the primary. The core streaming
|
|
* replication transport remains within WALreceiver/WALsender modules.
|
|
*
|
|
* The essence of this design is that it isolates all logic about
|
|
* waiting/releasing onto the primary. The primary defines which standbys
|
|
* it wishes to wait for. The standbys are completely unaware of the
|
|
* durability requirements of transactions on the primary, reducing the
|
|
* complexity of the code and streamlining both standby operations and
|
|
* network bandwidth because there is no requirement to ship
|
|
* per-transaction state information.
|
|
*
|
|
* Replication is either synchronous or not synchronous (async). If it is
|
|
* async, we just fastpath out of here. If it is sync, then we wait for
|
|
* the write, flush or apply location on the standby before releasing
|
|
* the waiting backend. Further complexity in that interaction is
|
|
* expected in later releases.
|
|
*
|
|
* The best performing way to manage the waiting backends is to have a
|
|
* single ordered queue of waiting backends, so that we can avoid
|
|
* searching through all waiters each time we receive a reply.
|
|
*
|
|
* In 9.5 or before only a single standby could be considered as
|
|
* synchronous. In 9.6 we support priority-based multiple synchronous
|
|
* standbys. In 10.0 quorum-based multiple synchronous standbys are also
|
|
* supported. The number of synchronous standbys that transactions
|
|
* must wait for replies from is specified in synchronous_standby_names.
|
|
* This parameter also specifies a list of standby names and the method
|
|
* (FIRST or ANY) to choose synchronous standbys from the listed ones.
|
|
*
|
|
* The method FIRST specifies a priority-based synchronous replication
|
|
* and makes transaction commits wait until their WAL records are
|
|
* replicated to the requested number of synchronous standbys chosen based
|
|
* on their priorities. The standbys whose names appear earlier in the list
|
|
* are given higher priority and will be considered as synchronous.
|
|
* Other standby servers appearing later in this list represent potential
|
|
* synchronous standbys. If any of the current synchronous standbys
|
|
* disconnects for whatever reason, it will be replaced immediately with
|
|
* the next-highest-priority standby.
|
|
*
|
|
* The method ANY specifies a quorum-based synchronous replication
|
|
* and makes transaction commits wait until their WAL records are
|
|
* replicated to at least the requested number of synchronous standbys
|
|
* in the list. All the standbys appearing in the list are considered as
|
|
* candidates for quorum synchronous standbys.
|
|
*
|
|
* If neither FIRST nor ANY is specified, FIRST is used as the method.
|
|
* This is for backward compatibility with 9.6 or before where only a
|
|
* priority-based sync replication was supported.
|
|
*
|
|
* Before the standbys chosen from synchronous_standby_names can
|
|
* become the synchronous standbys they must have caught up with
|
|
* the primary; that may take some time. Once caught up,
|
|
* the standbys which are considered as synchronous at that moment
|
|
* will release waiters from the queue.
|
|
*
|
|
* Portions Copyright (c) 2010-2019, PostgreSQL Global Development Group
|
|
*
|
|
* IDENTIFICATION
|
|
* src/backend/replication/syncrep.c
|
|
*
|
|
*-------------------------------------------------------------------------
|
|
*/
|
|
#include "postgres.h"
|
|
|
|
#include <unistd.h>
|
|
|
|
#include "access/xact.h"
|
|
#include "miscadmin.h"
|
|
#include "pgstat.h"
|
|
#include "replication/syncrep.h"
|
|
#include "replication/walsender.h"
|
|
#include "replication/walsender_private.h"
|
|
#include "storage/pmsignal.h"
|
|
#include "storage/proc.h"
|
|
#include "tcop/tcopprot.h"
|
|
#include "utils/builtins.h"
|
|
#include "utils/ps_status.h"
|
|
|
|
/* User-settable parameters for sync rep */
char	   *SyncRepStandbyNames;

/*
 * True when synchronous_standby_names is set to a non-empty string.  Note
 * that SyncRepWaitForLSN() consults the shared copy
 * WalSndCtl->sync_standbys_defined instead of this macro.
 */
#define SyncStandbysDefined() \
	(SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')

/*
 * When true, SyncRepReleaseWaiters() logs a LOG message the next time this
 * walsender is found to be managing a synchronous standby, then clears it.
 * It is set back to true whenever this walsender stops being sync.
 */
static bool announce_next_takeover = true;

/*
 * Parsed synchronous_standby_names for this process.  NULL means sync
 * replication is not requested (see SyncRepGetSyncStandbys).
 */
SyncRepConfigData *SyncRepConfig = NULL;

/*
 * Which per-mode queue / shared release LSN SyncRepWaitForLSN() uses.
 * NOTE(review): presumably assigned from synchronous_commit elsewhere in
 * this file (not visible in this chunk) — confirm.
 */
static int	SyncRepWaitMode = SYNC_REP_NO_WAIT;

/* Wait-queue maintenance for user backends. */
static void SyncRepQueueInsert(int mode);
static void SyncRepCancelWait(void);
static int	SyncRepWakeQueue(bool all, int mode);

/* Computing the positions confirmed by the current sync standbys. */
static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
								 XLogRecPtr *flushPtr,
								 XLogRecPtr *applyPtr,
								 bool *am_sync);
static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
									   XLogRecPtr *flushPtr,
									   XLogRecPtr *applyPtr,
									   List *sync_standbys);
static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
										  XLogRecPtr *flushPtr,
										  XLogRecPtr *applyPtr,
										  List *sync_standbys, uint8 nth);

/* Choosing which connected standbys count as synchronous. */
static int	SyncRepGetStandbyPriority(void);
static List *SyncRepGetSyncStandbysPriority(bool *am_sync);
static List *SyncRepGetSyncStandbysQuorum(bool *am_sync);

/* qsort comparator ordering LSNs newest-first. */
static int	cmp_lsn(const void *a, const void *b);

#ifdef USE_ASSERT_CHECKING
/* Debug-only check used in Assert()s on the wait queues. */
static bool SyncRepQueueIsOrderedByLSN(int mode);
#endif
|
|
|
|
/*
|
|
* ===========================================================
|
|
* Synchronous Replication functions for normal user backends
|
|
* ===========================================================
|
|
*/
|
|
|
|
/*
 * Wait for synchronous replication, if requested by user.
 *
 * Initially backends start in state SYNC_REP_NOT_WAITING and then
 * change that state to SYNC_REP_WAITING before adding ourselves
 * to the wait queue. During SyncRepWakeQueue() a WALSender changes
 * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
 * This backend then resets its state to SYNC_REP_NOT_WAITING.
 *
 * 'lsn' represents the LSN to wait for. 'commit' indicates whether this LSN
 * represents a commit record. If it doesn't, then we wait only for the WAL
 * to be flushed if synchronous_commit is set to the higher level of
 * remote_apply, because only commit records provide apply feedback.
 *
 * Returns immediately in three cases:
 * - SyncRepRequested() is false;
 * - WalSndCtl->sync_standbys_defined is not set;
 * - the shared release LSN for the chosen mode already covers 'lsn'.
 *
 * Otherwise the wait normally ends when a walsender marks us
 * SYNC_REP_WAIT_COMPLETE.  It is cut short, with our queue entry removed via
 * SyncRepCancelWait(), on:
 * - a pending die request (WARNING issued);
 * - a query cancel (WARNING issued);
 * - postmaster death.
 * In the die and postmaster-death cases whereToSendOutput is set to
 * DestNone, and ProcDiePending is left (or made) true.
 */
void
SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
{
	char	   *new_status = NULL;
	const char *old_status;
	int			mode;

	/* Cap the level for anything other than commit to remote flush only. */
	if (commit)
		mode = SyncRepWaitMode;
	else
		mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);

	/*
	 * Fast exit if user has not requested sync replication.
	 */
	if (!SyncRepRequested())
		return;

	Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
	Assert(WalSndCtl != NULL);

	LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
	Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);

	/*
	 * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
	 * set. See SyncRepUpdateSyncStandbysDefined.
	 *
	 * Also check that the standby hasn't already replied. Unlikely race
	 * condition but we'll be fetching that cache line anyway so it's likely
	 * to be a low cost check.
	 */
	if (!WalSndCtl->sync_standbys_defined ||
		lsn <= WalSndCtl->lsn[mode])
	{
		LWLockRelease(SyncRepLock);
		return;
	}

	/*
	 * Set our waitLSN so WALSender will know when to wake us, and add
	 * ourselves to the queue.
	 */
	MyProc->waitLSN = lsn;
	MyProc->syncRepState = SYNC_REP_WAITING;
	SyncRepQueueInsert(mode);
	Assert(SyncRepQueueIsOrderedByLSN(mode));
	LWLockRelease(SyncRepLock);

	/* Alter ps display to show waiting for sync rep. */
	if (update_process_title)
	{
		int			len;

		old_status = get_ps_display(&len);

		/*
		 * The 32 spare bytes hold " waiting for " (13 chars) plus two hex
		 * halves of at most 8 digits each and the '/'; +1 for the NUL.
		 */
		new_status = (char *) palloc(len + 32 + 1);
		memcpy(new_status, old_status, len);
		sprintf(new_status + len, " waiting for %X/%X",
				(uint32) (lsn >> 32), (uint32) lsn);
		set_ps_display(new_status, false);

		/*
		 * Keep only the original text in new_status, so it can be used to
		 * restore the display once the wait is over (see end of function).
		 */
		new_status[len] = '\0'; /* truncate off " waiting ..." */
	}

	/*
	 * Wait for specified LSN to be confirmed.
	 *
	 * Each proc has its own wait latch, so we perform a normal latch
	 * check/wait loop here.
	 */
	for (;;)
	{
		int			rc;

		/* Must reset the latch before testing state. */
		ResetLatch(MyLatch);

		/*
		 * Acquiring the lock is not needed, the latch ensures proper
		 * barriers. If it looks like we're done, we must really be done,
		 * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
		 * it will never update it again, so we can't be seeing a stale value
		 * in that case.
		 */
		if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
			break;

		/*
		 * If a wait for synchronous replication is pending, we can neither
		 * acknowledge the commit nor raise ERROR or FATAL. The latter would
		 * lead the client to believe that the transaction aborted, which is
		 * not true: it's already committed locally. The former is no good
		 * either: the client has requested synchronous replication, and is
		 * entitled to assume that an acknowledged commit is also replicated,
		 * which might not be true. So in this case we issue a WARNING (which
		 * some clients may be able to interpret) and shut off further output.
		 * We do NOT reset ProcDiePending, so that the process will die after
		 * the commit is cleaned up.
		 */
		if (ProcDiePending)
		{
			ereport(WARNING,
					(errcode(ERRCODE_ADMIN_SHUTDOWN),
					 errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
			whereToSendOutput = DestNone;
			SyncRepCancelWait();
			break;
		}

		/*
		 * It's unclear what to do if a query cancel interrupt arrives. We
		 * can't actually abort at this point, but ignoring the interrupt
		 * altogether is not helpful, so we just terminate the wait with a
		 * suitable warning.
		 */
		if (QueryCancelPending)
		{
			QueryCancelPending = false;
			ereport(WARNING,
					(errmsg("canceling wait for synchronous replication due to user request"),
					 errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
			SyncRepCancelWait();
			break;
		}

		/*
		 * Wait on latch. Any condition that should wake us up will set the
		 * latch, so no need for timeout.
		 */
		rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
					   WAIT_EVENT_SYNC_REP);

		/*
		 * If the postmaster dies, we'll probably never get an acknowledgment,
		 * because all the wal sender processes will exit. So just bail out.
		 */
		if (rc & WL_POSTMASTER_DEATH)
		{
			ProcDiePending = true;
			whereToSendOutput = DestNone;
			SyncRepCancelWait();
			break;
		}
	}

	/*
	 * WalSender has checked our LSN and has removed us from queue. Clean up
	 * state and leave. It's OK to reset these shared memory fields without
	 * holding SyncRepLock, because any walsenders will ignore us anyway when
	 * we're not on the queue. We need a read barrier to make sure we see the
	 * changes to the queue link (this might be unnecessary without
	 * assertions, but better safe than sorry).
	 */
	pg_read_barrier();
	Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
	MyProc->syncRepState = SYNC_REP_NOT_WAITING;
	MyProc->waitLSN = 0;

	if (new_status)
	{
		/* Reset ps display */
		set_ps_display(new_status, false);
		pfree(new_status);
	}
}
|
|
|
|
/*
|
|
* Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
|
|
*
|
|
* Usually we will go at tail of queue, though it's possible that we arrive
|
|
* here out of order, so start at tail and work back to insertion point.
|
|
*/
|
|
static void
|
|
SyncRepQueueInsert(int mode)
|
|
{
|
|
PGPROC *proc;
|
|
|
|
Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
|
|
proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
|
|
&(WalSndCtl->SyncRepQueue[mode]),
|
|
offsetof(PGPROC, syncRepLinks));
|
|
|
|
while (proc)
|
|
{
|
|
/*
|
|
* Stop at the queue element that we should after to ensure the queue
|
|
* is ordered by LSN.
|
|
*/
|
|
if (proc->waitLSN < MyProc->waitLSN)
|
|
break;
|
|
|
|
proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
|
|
&(proc->syncRepLinks),
|
|
offsetof(PGPROC, syncRepLinks));
|
|
}
|
|
|
|
if (proc)
|
|
SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
|
|
else
|
|
SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
|
|
}
|
|
|
|
/*
|
|
* Acquire SyncRepLock and cancel any wait currently in progress.
|
|
*/
|
|
static void
|
|
SyncRepCancelWait(void)
|
|
{
|
|
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
|
|
if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
|
|
SHMQueueDelete(&(MyProc->syncRepLinks));
|
|
MyProc->syncRepState = SYNC_REP_NOT_WAITING;
|
|
LWLockRelease(SyncRepLock);
|
|
}
|
|
|
|
void
|
|
SyncRepCleanupAtProcExit(void)
|
|
{
|
|
if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
|
|
{
|
|
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
|
|
SHMQueueDelete(&(MyProc->syncRepLinks));
|
|
LWLockRelease(SyncRepLock);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* ===========================================================
|
|
* Synchronous Replication functions for wal sender processes
|
|
* ===========================================================
|
|
*/
|
|
|
|
/*
|
|
* Take any action required to initialise sync rep state from config
|
|
* data. Called at WALSender startup and after each SIGHUP.
|
|
*/
|
|
void
|
|
SyncRepInitConfig(void)
|
|
{
|
|
int priority;
|
|
|
|
/*
|
|
* Determine if we are a potential sync standby and remember the result
|
|
* for handling replies from standby.
|
|
*/
|
|
priority = SyncRepGetStandbyPriority();
|
|
if (MyWalSnd->sync_standby_priority != priority)
|
|
{
|
|
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
|
|
MyWalSnd->sync_standby_priority = priority;
|
|
LWLockRelease(SyncRepLock);
|
|
ereport(DEBUG1,
|
|
(errmsg("standby \"%s\" now has synchronous standby priority %u",
|
|
application_name, priority)));
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Update the LSNs on each queue based upon our latest state. This
|
|
* implements a simple policy of first-valid-sync-standby-releases-waiter.
|
|
*
|
|
* Other policies are possible, which would change what we do here and
|
|
* perhaps also which information we store as well.
|
|
*/
|
|
void
|
|
SyncRepReleaseWaiters(void)
|
|
{
|
|
volatile WalSndCtlData *walsndctl = WalSndCtl;
|
|
XLogRecPtr writePtr;
|
|
XLogRecPtr flushPtr;
|
|
XLogRecPtr applyPtr;
|
|
bool got_recptr;
|
|
bool am_sync;
|
|
int numwrite = 0;
|
|
int numflush = 0;
|
|
int numapply = 0;
|
|
|
|
/*
|
|
* If this WALSender is serving a standby that is not on the list of
|
|
* potential sync standbys then we have nothing to do. If we are still
|
|
* starting up, still running base backup or the current flush position is
|
|
* still invalid, then leave quickly also. Streaming or stopping WAL
|
|
* senders are allowed to release waiters.
|
|
*/
|
|
if (MyWalSnd->sync_standby_priority == 0 ||
|
|
(MyWalSnd->state != WALSNDSTATE_STREAMING &&
|
|
MyWalSnd->state != WALSNDSTATE_STOPPING) ||
|
|
XLogRecPtrIsInvalid(MyWalSnd->flush))
|
|
{
|
|
announce_next_takeover = true;
|
|
return;
|
|
}
|
|
|
|
/*
|
|
* We're a potential sync standby. Release waiters if there are enough
|
|
* sync standbys and we are considered as sync.
|
|
*/
|
|
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
|
|
|
|
/*
|
|
* Check whether we are a sync standby or not, and calculate the synced
|
|
* positions among all sync standbys.
|
|
*/
|
|
got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
|
|
|
|
/*
|
|
* If we are managing a sync standby, though we weren't prior to this,
|
|
* then announce we are now a sync standby.
|
|
*/
|
|
if (announce_next_takeover && am_sync)
|
|
{
|
|
announce_next_takeover = false;
|
|
|
|
if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
|
|
ereport(LOG,
|
|
(errmsg("standby \"%s\" is now a synchronous standby with priority %u",
|
|
application_name, MyWalSnd->sync_standby_priority)));
|
|
else
|
|
ereport(LOG,
|
|
(errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
|
|
application_name)));
|
|
}
|
|
|
|
/*
|
|
* If the number of sync standbys is less than requested or we aren't
|
|
* managing a sync standby then just leave.
|
|
*/
|
|
if (!got_recptr || !am_sync)
|
|
{
|
|
LWLockRelease(SyncRepLock);
|
|
announce_next_takeover = !am_sync;
|
|
return;
|
|
}
|
|
|
|
/*
|
|
* Set the lsn first so that when we wake backends they will release up to
|
|
* this location.
|
|
*/
|
|
if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
|
|
{
|
|
walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
|
|
numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
|
|
}
|
|
if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
|
|
{
|
|
walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
|
|
numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
|
|
}
|
|
if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
|
|
{
|
|
walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
|
|
numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
|
|
}
|
|
|
|
LWLockRelease(SyncRepLock);
|
|
|
|
elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
|
|
numwrite, (uint32) (writePtr >> 32), (uint32) writePtr,
|
|
numflush, (uint32) (flushPtr >> 32), (uint32) flushPtr,
|
|
numapply, (uint32) (applyPtr >> 32), (uint32) applyPtr);
|
|
}
|
|
|
|
/*
|
|
* Calculate the synced Write, Flush and Apply positions among sync standbys.
|
|
*
|
|
* Return false if the number of sync standbys is less than
|
|
* synchronous_standby_names specifies. Otherwise return true and
|
|
* store the positions into *writePtr, *flushPtr and *applyPtr.
|
|
*
|
|
* On return, *am_sync is set to true if this walsender is connecting to
|
|
* sync standby. Otherwise it's set to false.
|
|
*/
|
|
static bool
|
|
SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
|
|
XLogRecPtr *applyPtr, bool *am_sync)
|
|
{
|
|
List *sync_standbys;
|
|
|
|
*writePtr = InvalidXLogRecPtr;
|
|
*flushPtr = InvalidXLogRecPtr;
|
|
*applyPtr = InvalidXLogRecPtr;
|
|
*am_sync = false;
|
|
|
|
/* Get standbys that are considered as synchronous at this moment */
|
|
sync_standbys = SyncRepGetSyncStandbys(am_sync);
|
|
|
|
/*
|
|
* Quick exit if we are not managing a sync standby or there are not
|
|
* enough synchronous standbys.
|
|
*/
|
|
if (!(*am_sync) ||
|
|
SyncRepConfig == NULL ||
|
|
list_length(sync_standbys) < SyncRepConfig->num_sync)
|
|
{
|
|
list_free(sync_standbys);
|
|
return false;
|
|
}
|
|
|
|
/*
|
|
* In a priority-based sync replication, the synced positions are the
|
|
* oldest ones among sync standbys. In a quorum-based, they are the Nth
|
|
* latest ones.
|
|
*
|
|
* SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
|
|
* positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
|
|
* because it's a bit more efficient.
|
|
*
|
|
* XXX If the numbers of current and requested sync standbys are the same,
|
|
* we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
|
|
* positions even in a quorum-based sync replication.
|
|
*/
|
|
if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
|
|
{
|
|
SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
|
|
sync_standbys);
|
|
}
|
|
else
|
|
{
|
|
SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
|
|
sync_standbys, SyncRepConfig->num_sync);
|
|
}
|
|
|
|
list_free(sync_standbys);
|
|
return true;
|
|
}
|
|
|
|
/*
|
|
* Calculate the oldest Write, Flush and Apply positions among sync standbys.
|
|
*/
|
|
static void
|
|
SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
|
|
XLogRecPtr *applyPtr, List *sync_standbys)
|
|
{
|
|
ListCell *cell;
|
|
|
|
/*
|
|
* Scan through all sync standbys and calculate the oldest Write, Flush
|
|
* and Apply positions.
|
|
*/
|
|
foreach(cell, sync_standbys)
|
|
{
|
|
WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
|
|
XLogRecPtr write;
|
|
XLogRecPtr flush;
|
|
XLogRecPtr apply;
|
|
|
|
SpinLockAcquire(&walsnd->mutex);
|
|
write = walsnd->write;
|
|
flush = walsnd->flush;
|
|
apply = walsnd->apply;
|
|
SpinLockRelease(&walsnd->mutex);
|
|
|
|
if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
|
|
*writePtr = write;
|
|
if (XLogRecPtrIsInvalid(*flushPtr) || *flushPtr > flush)
|
|
*flushPtr = flush;
|
|
if (XLogRecPtrIsInvalid(*applyPtr) || *applyPtr > apply)
|
|
*applyPtr = apply;
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Calculate the Nth latest Write, Flush and Apply positions among sync
|
|
* standbys.
|
|
*/
|
|
static void
|
|
SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
|
|
XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth)
|
|
{
|
|
ListCell *cell;
|
|
XLogRecPtr *write_array;
|
|
XLogRecPtr *flush_array;
|
|
XLogRecPtr *apply_array;
|
|
int len;
|
|
int i = 0;
|
|
|
|
len = list_length(sync_standbys);
|
|
write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
|
|
flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
|
|
apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
|
|
|
|
foreach(cell, sync_standbys)
|
|
{
|
|
WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
|
|
|
|
SpinLockAcquire(&walsnd->mutex);
|
|
write_array[i] = walsnd->write;
|
|
flush_array[i] = walsnd->flush;
|
|
apply_array[i] = walsnd->apply;
|
|
SpinLockRelease(&walsnd->mutex);
|
|
|
|
i++;
|
|
}
|
|
|
|
/* Sort each array in descending order */
|
|
qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn);
|
|
qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn);
|
|
qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn);
|
|
|
|
/* Get Nth latest Write, Flush, Apply positions */
|
|
*writePtr = write_array[nth - 1];
|
|
*flushPtr = flush_array[nth - 1];
|
|
*applyPtr = apply_array[nth - 1];
|
|
|
|
pfree(write_array);
|
|
pfree(flush_array);
|
|
pfree(apply_array);
|
|
}
|
|
|
|
/*
|
|
* Compare lsn in order to sort array in descending order.
|
|
*/
|
|
static int
|
|
cmp_lsn(const void *a, const void *b)
|
|
{
|
|
XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
|
|
XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
|
|
|
|
if (lsn1 > lsn2)
|
|
return -1;
|
|
else if (lsn1 == lsn2)
|
|
return 0;
|
|
else
|
|
return 1;
|
|
}
|
|
|
|
/*
|
|
* Return the list of sync standbys, or NIL if no sync standby is connected.
|
|
*
|
|
* The caller must hold SyncRepLock.
|
|
*
|
|
* On return, *am_sync is set to true if this walsender is connecting to
|
|
* sync standby. Otherwise it's set to false.
|
|
*/
|
|
List *
|
|
SyncRepGetSyncStandbys(bool *am_sync)
|
|
{
|
|
/* Set default result */
|
|
if (am_sync != NULL)
|
|
*am_sync = false;
|
|
|
|
/* Quick exit if sync replication is not requested */
|
|
if (SyncRepConfig == NULL)
|
|
return NIL;
|
|
|
|
return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ?
|
|
SyncRepGetSyncStandbysPriority(am_sync) :
|
|
SyncRepGetSyncStandbysQuorum(am_sync);
|
|
}
|
|
|
|
/*
|
|
* Return the list of all the candidates for quorum sync standbys,
|
|
* or NIL if no such standby is connected.
|
|
*
|
|
* The caller must hold SyncRepLock. This function must be called only in
|
|
* a quorum-based sync replication.
|
|
*
|
|
* On return, *am_sync is set to true if this walsender is connecting to
|
|
* sync standby. Otherwise it's set to false.
|
|
*/
|
|
static List *
|
|
SyncRepGetSyncStandbysQuorum(bool *am_sync)
|
|
{
|
|
List *result = NIL;
|
|
int i;
|
|
volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
|
|
* rearrangement */
|
|
|
|
Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM);
|
|
|
|
for (i = 0; i < max_wal_senders; i++)
|
|
{
|
|
XLogRecPtr flush;
|
|
WalSndState state;
|
|
int pid;
|
|
|
|
walsnd = &WalSndCtl->walsnds[i];
|
|
|
|
SpinLockAcquire(&walsnd->mutex);
|
|
pid = walsnd->pid;
|
|
flush = walsnd->flush;
|
|
state = walsnd->state;
|
|
SpinLockRelease(&walsnd->mutex);
|
|
|
|
/* Must be active */
|
|
if (pid == 0)
|
|
continue;
|
|
|
|
/* Must be streaming or stopping */
|
|
if (state != WALSNDSTATE_STREAMING &&
|
|
state != WALSNDSTATE_STOPPING)
|
|
continue;
|
|
|
|
/* Must be synchronous */
|
|
if (walsnd->sync_standby_priority == 0)
|
|
continue;
|
|
|
|
/* Must have a valid flush position */
|
|
if (XLogRecPtrIsInvalid(flush))
|
|
continue;
|
|
|
|
/*
|
|
* Consider this standby as a candidate for quorum sync standbys and
|
|
* append it to the result.
|
|
*/
|
|
result = lappend_int(result, i);
|
|
if (am_sync != NULL && walsnd == MyWalSnd)
|
|
*am_sync = true;
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
/*
|
|
* Return the list of sync standbys chosen based on their priorities,
|
|
* or NIL if no sync standby is connected.
|
|
*
|
|
* If there are multiple standbys with the same priority,
|
|
* the first one found is selected preferentially.
|
|
*
|
|
* The caller must hold SyncRepLock. This function must be called only in
|
|
* a priority-based sync replication.
|
|
*
|
|
* On return, *am_sync is set to true if this walsender is connecting to
|
|
* sync standby. Otherwise it's set to false.
|
|
*/
|
|
static List *
|
|
SyncRepGetSyncStandbysPriority(bool *am_sync)
|
|
{
|
|
List *result = NIL;
|
|
List *pending = NIL;
|
|
int lowest_priority;
|
|
int next_highest_priority;
|
|
int this_priority;
|
|
int priority;
|
|
int i;
|
|
bool am_in_pending = false;
|
|
volatile WalSnd *walsnd; /* Use volatile pointer to prevent code
|
|
* rearrangement */
|
|
|
|
Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY);
|
|
|
|
lowest_priority = SyncRepConfig->nmembers;
|
|
next_highest_priority = lowest_priority + 1;
|
|
|
|
/*
|
|
* Find the sync standbys which have the highest priority (i.e, 1). Also
|
|
* store all the other potential sync standbys into the pending list, in
|
|
* order to scan it later and find other sync standbys from it quickly.
|
|
*/
|
|
for (i = 0; i < max_wal_senders; i++)
|
|
{
|
|
XLogRecPtr flush;
|
|
WalSndState state;
|
|
int pid;
|
|
|
|
walsnd = &WalSndCtl->walsnds[i];
|
|
|
|
SpinLockAcquire(&walsnd->mutex);
|
|
pid = walsnd->pid;
|
|
flush = walsnd->flush;
|
|
state = walsnd->state;
|
|
SpinLockRelease(&walsnd->mutex);
|
|
|
|
/* Must be active */
|
|
if (pid == 0)
|
|
continue;
|
|
|
|
/* Must be streaming or stopping */
|
|
if (state != WALSNDSTATE_STREAMING &&
|
|
state != WALSNDSTATE_STOPPING)
|
|
continue;
|
|
|
|
/* Must be synchronous */
|
|
this_priority = walsnd->sync_standby_priority;
|
|
if (this_priority == 0)
|
|
continue;
|
|
|
|
/* Must have a valid flush position */
|
|
if (XLogRecPtrIsInvalid(flush))
|
|
continue;
|
|
|
|
/*
|
|
* If the priority is equal to 1, consider this standby as sync and
|
|
* append it to the result. Otherwise append this standby to the
|
|
* pending list to check if it's actually sync or not later.
|
|
*/
|
|
if (this_priority == 1)
|
|
{
|
|
result = lappend_int(result, i);
|
|
if (am_sync != NULL && walsnd == MyWalSnd)
|
|
*am_sync = true;
|
|
if (list_length(result) == SyncRepConfig->num_sync)
|
|
{
|
|
list_free(pending);
|
|
return result; /* Exit if got enough sync standbys */
|
|
}
|
|
}
|
|
else
|
|
{
|
|
pending = lappend_int(pending, i);
|
|
if (am_sync != NULL && walsnd == MyWalSnd)
|
|
am_in_pending = true;
|
|
|
|
/*
|
|
* Track the highest priority among the standbys in the pending
|
|
* list, in order to use it as the starting priority for later
|
|
* scan of the list. This is useful to find quickly the sync
|
|
* standbys from the pending list later because we can skip
|
|
* unnecessary scans for the unused priorities.
|
|
*/
|
|
if (this_priority < next_highest_priority)
|
|
next_highest_priority = this_priority;
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Consider all pending standbys as sync if the number of them plus
|
|
* already-found sync ones is lower than the configuration requests.
|
|
*/
|
|
if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync)
|
|
{
|
|
/*
|
|
* Set *am_sync to true if this walsender is in the pending list
|
|
* because all pending standbys are considered as sync.
|
|
*/
|
|
if (am_sync != NULL && !(*am_sync))
|
|
*am_sync = am_in_pending;
|
|
|
|
result = list_concat(result, pending);
|
|
list_free(pending);
|
|
return result;
|
|
}
|
|
|
|
/*
|
|
* Find the sync standbys from the pending list.
|
|
*/
|
|
priority = next_highest_priority;
|
|
while (priority <= lowest_priority)
|
|
{
|
|
ListCell *cell;
|
|
|
|
next_highest_priority = lowest_priority + 1;
|
|
|
|
foreach(cell, pending)
|
|
{
|
|
i = lfirst_int(cell);
|
|
walsnd = &WalSndCtl->walsnds[i];
|
|
|
|
this_priority = walsnd->sync_standby_priority;
|
|
if (this_priority == priority)
|
|
{
|
|
result = lappend_int(result, i);
|
|
if (am_sync != NULL && walsnd == MyWalSnd)
|
|
*am_sync = true;
|
|
|
|
/*
|
|
* We should always exit here after the scan of pending list
|
|
* starts because we know that the list has enough elements to
|
|
* reach SyncRepConfig->num_sync.
|
|
*/
|
|
if (list_length(result) == SyncRepConfig->num_sync)
|
|
{
|
|
list_free(pending);
|
|
return result; /* Exit if got enough sync standbys */
|
|
}
|
|
|
|
/*
|
|
* Remove the entry for this sync standby from the list to
|
|
* prevent us from looking at the same entry again.
|
|
*/
|
|
pending = foreach_delete_current(pending, cell);
|
|
|
|
continue; /* don't adjust next_highest_priority */
|
|
}
|
|
|
|
if (this_priority < next_highest_priority)
|
|
next_highest_priority = this_priority;
|
|
}
|
|
|
|
priority = next_highest_priority;
|
|
}
|
|
|
|
/* never reached, but keep compiler quiet */
|
|
Assert(false);
|
|
return result;
|
|
}
|
|
|
|
/*
|
|
* Check if we are in the list of sync standbys, and if so, determine
|
|
* priority sequence. Return priority if set, or zero to indicate that
|
|
* we are not a potential sync standby.
|
|
*
|
|
* Compare the parameter SyncRepStandbyNames against the application_name
|
|
* for this WALSender, or allow any name if we find a wildcard "*".
|
|
*/
|
|
static int
|
|
SyncRepGetStandbyPriority(void)
|
|
{
|
|
const char *standby_name;
|
|
int priority;
|
|
bool found = false;
|
|
|
|
/*
|
|
* Since synchronous cascade replication is not allowed, we always set the
|
|
* priority of cascading walsender to zero.
|
|
*/
|
|
if (am_cascading_walsender)
|
|
return 0;
|
|
|
|
if (!SyncStandbysDefined() || SyncRepConfig == NULL)
|
|
return 0;
|
|
|
|
standby_name = SyncRepConfig->member_names;
|
|
for (priority = 1; priority <= SyncRepConfig->nmembers; priority++)
|
|
{
|
|
if (pg_strcasecmp(standby_name, application_name) == 0 ||
|
|
strcmp(standby_name, "*") == 0)
|
|
{
|
|
found = true;
|
|
break;
|
|
}
|
|
standby_name += strlen(standby_name) + 1;
|
|
}
|
|
|
|
if (!found)
|
|
return 0;
|
|
|
|
/*
|
|
* In quorum-based sync replication, all the standbys in the list have the
|
|
* same priority, one.
|
|
*/
|
|
return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1;
|
|
}
|
|
|
|
/*
 * Walk the specified queue from head. Set the state of any backends that
 * need to be woken, remove them from the queue, and then wake them.
 * Pass all = true to wake whole queue; otherwise, just wake up to
 * the walsender's LSN.
 *
 * Must hold SyncRepLock.
 *
 * When 'all' is false, the walk stops at the first waiter whose waitLSN is
 * beyond WalSndCtl->lsn[mode], the shared release point for this mode.  This
 * relies on the queue being sorted by waitLSN, which is asserted on entry.
 *
 * Returns the number of backends woken.
 */
static int
SyncRepWakeQueue(bool all, int mode)
{
	volatile WalSndCtlData *walsndctl = WalSndCtl;
	PGPROC	   *proc = NULL;
	PGPROC	   *thisproc = NULL;
	int			numprocs = 0;

	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
	Assert(SyncRepQueueIsOrderedByLSN(mode));

	proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
								   &(WalSndCtl->SyncRepQueue[mode]),
								   offsetof(PGPROC, syncRepLinks));

	while (proc)
	{
		/*
		 * Assume the queue is ordered by LSN, so once we meet a waiter we
		 * cannot release yet, no later waiter can be released either.
		 */
		if (!all && walsndctl->lsn[mode] < proc->waitLSN)
			return numprocs;

		/*
		 * Move to next proc, so we can delete thisproc from the queue.
		 * thisproc is valid, proc may be NULL after this.
		 */
		thisproc = proc;
		proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
									   &(proc->syncRepLinks),
									   offsetof(PGPROC, syncRepLinks));

		/*
		 * Remove thisproc from queue.
		 */
		SHMQueueDelete(&(thisproc->syncRepLinks));

		/*
		 * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
		 * make sure that it sees the queue link being removed before the
		 * syncRepState change.
		 */
		pg_write_barrier();

		/*
		 * Set state to complete; see SyncRepWaitForLSN() for discussion of
		 * the various states.
		 */
		thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;

		/*
		 * Wake only when we have set state and removed from queue.
		 */
		SetLatch(&(thisproc->procLatch));

		numprocs++;
	}

	return numprocs;
}
|
|
|
|
/*
|
|
* The checkpointer calls this as needed to update the shared
|
|
* sync_standbys_defined flag, so that backends don't remain permanently wedged
|
|
* if synchronous_standby_names is unset. It's safe to check the current value
|
|
* without the lock, because it's only ever updated by one process. But we
|
|
* must take the lock to change it.
|
|
*/
|
|
void
|
|
SyncRepUpdateSyncStandbysDefined(void)
|
|
{
|
|
bool sync_standbys_defined = SyncStandbysDefined();
|
|
|
|
if (sync_standbys_defined != WalSndCtl->sync_standbys_defined)
|
|
{
|
|
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
|
|
|
|
/*
|
|
* If synchronous_standby_names has been reset to empty, it's futile
|
|
* for backends to continue waiting. Since the user no longer wants
|
|
* synchronous replication, we'd better wake them up.
|
|
*/
|
|
if (!sync_standbys_defined)
|
|
{
|
|
int i;
|
|
|
|
for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
|
|
SyncRepWakeQueue(true, i);
|
|
}
|
|
|
|
/*
|
|
* Only allow people to join the queue when there are synchronous
|
|
* standbys defined. Without this interlock, there's a race
|
|
* condition: we might wake up all the current waiters; then, some
|
|
* backend that hasn't yet reloaded its config might go to sleep on
|
|
* the queue (and never wake up). This prevents that.
|
|
*/
|
|
WalSndCtl->sync_standbys_defined = sync_standbys_defined;
|
|
|
|
LWLockRelease(SyncRepLock);
|
|
}
|
|
}
|
|
|
|
#ifdef USE_ASSERT_CHECKING
/*
 * Assertion helper.  Returns true if the sync-rep queue for the given wait
 * mode lists its waiters in strictly increasing waitLSN order: sorted, with
 * no two waiters sharing an LSN.  The comparison starts from 0, so a waiter
 * with waitLSN 0 also makes this return false.  An empty queue is ordered.
 */
static bool
SyncRepQueueIsOrderedByLSN(int mode)
{
	PGPROC	   *waiter;
	XLogRecPtr	prevLSN = 0;

	Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);

	for (waiter = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
										  &(WalSndCtl->SyncRepQueue[mode]),
										  offsetof(PGPROC, syncRepLinks));
		 waiter != NULL;
		 waiter = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
										  &(waiter->syncRepLinks),
										  offsetof(PGPROC, syncRepLinks)))
	{
		/* Out of order, or a duplicate LSN. */
		if (waiter->waitLSN <= prevLSN)
			return false;

		prevLSN = waiter->waitLSN;
	}

	return true;
}
#endif
|
|
|
|
/*
|
|
* ===========================================================
|
|
* Synchronous Replication functions executed by any process
|
|
* ===========================================================
|
|
*/
|
|
|
|
bool
|
|
check_synchronous_standby_names(char **newval, void **extra, GucSource source)
|
|
{
|
|
if (*newval != NULL && (*newval)[0] != '\0')
|
|
{
|
|
int parse_rc;
|
|
SyncRepConfigData *pconf;
|
|
|
|
/* Reset communication variables to ensure a fresh start */
|
|
syncrep_parse_result = NULL;
|
|
syncrep_parse_error_msg = NULL;
|
|
|
|
/* Parse the synchronous_standby_names string */
|
|
syncrep_scanner_init(*newval);
|
|
parse_rc = syncrep_yyparse();
|
|
syncrep_scanner_finish();
|
|
|
|
if (parse_rc != 0 || syncrep_parse_result == NULL)
|
|
{
|
|
GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
|
|
if (syncrep_parse_error_msg)
|
|
GUC_check_errdetail("%s", syncrep_parse_error_msg);
|
|
else
|
|
GUC_check_errdetail("synchronous_standby_names parser failed");
|
|
return false;
|
|
}
|
|
|
|
if (syncrep_parse_result->num_sync <= 0)
|
|
{
|
|
GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
|
|
syncrep_parse_result->num_sync);
|
|
return false;
|
|
}
|
|
|
|
/* GUC extra value must be malloc'd, not palloc'd */
|
|
pconf = (SyncRepConfigData *)
|
|
malloc(syncrep_parse_result->config_size);
|
|
if (pconf == NULL)
|
|
return false;
|
|
memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);
|
|
|
|
*extra = (void *) pconf;
|
|
|
|
/*
|
|
* We need not explicitly clean up syncrep_parse_result. It, and any
|
|
* other cruft generated during parsing, will be freed when the
|
|
* current memory context is deleted. (This code is generally run in
|
|
* a short-lived context used for config file processing, so that will
|
|
* not be very long.)
|
|
*/
|
|
}
|
|
else
|
|
*extra = NULL;
|
|
|
|
return true;
|
|
}
|
|
|
|
void
|
|
assign_synchronous_standby_names(const char *newval, void *extra)
|
|
{
|
|
SyncRepConfig = (SyncRepConfigData *) extra;
|
|
}
|
|
|
|
void
|
|
assign_synchronous_commit(int newval, void *extra)
|
|
{
|
|
switch (newval)
|
|
{
|
|
case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
|
|
SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
|
|
break;
|
|
case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
|
|
SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
|
|
break;
|
|
case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
|
|
SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
|
|
break;
|
|
default:
|
|
SyncRepWaitMode = SYNC_REP_NO_WAIT;
|
|
break;
|
|
}
|
|
}
|