mirror of https://github.com/postgres/postgres
Reduce memory consumption for pending invalidation messages.
The existing data structures in inval.c are fairly inefficient for the common case of a command or subtransaction that registers a small number of cache invalidation events. While this doesn't matter if we commit right away, it can build up to a lot of bloat in a transaction that contains many DDL operations. By making a few more assumptions about the expected use-case, we can switch to a representation using densely-packed arrays. Although this eliminates some data-copying, it doesn't seem to make much difference time-wise. But the space consumption decreases substantially. Patch by me; thanks to Nathan Bossart for review. Discussion: https://postgr.es/m/2380555.1622395376@sss.pgh.pa.us
This commit is contained in:
parent
069d33d0c5
commit
3aafc030a5
|
@ -71,11 +71,6 @@
|
|||
* manipulating the init file is in relcache.c, but we keep track of the
|
||||
* need for it here.
|
||||
*
|
||||
* The request lists proper are kept in CurTransactionContext of their
|
||||
* creating (sub)transaction, since they can be forgotten on abort of that
|
||||
* transaction but must be kept till top-level commit otherwise. For
|
||||
* simplicity we keep the controlling list-of-lists in TopTransactionContext.
|
||||
*
|
||||
* Currently, inval messages are sent without regard for the possibility
|
||||
* that the object described by the catalog tuple might be a session-local
|
||||
* object such as a temporary table. This is because (1) this code has
|
||||
|
@ -106,7 +101,6 @@
|
|||
#include "catalog/catalog.h"
|
||||
#include "catalog/pg_constraint.h"
|
||||
#include "miscadmin.h"
|
||||
#include "port/pg_bitutils.h"
|
||||
#include "storage/sinval.h"
|
||||
#include "storage/smgr.h"
|
||||
#include "utils/catcache.h"
|
||||
|
@ -121,35 +115,86 @@
|
|||
|
||||
|
||||
/*
|
||||
* To minimize palloc traffic, we keep pending requests in successively-
|
||||
* larger chunks (a slightly more sophisticated version of an expansible
|
||||
* array). All request types can be stored as SharedInvalidationMessage
|
||||
* records. The ordering of requests within a list is never significant.
|
||||
* Pending requests are stored as ready-to-send SharedInvalidationMessages.
|
||||
* We keep the messages themselves in arrays in TopTransactionContext
|
||||
* (there are separate arrays for catcache and relcache messages). Control
|
||||
* information is kept in a chain of TransInvalidationInfo structs, also
|
||||
* allocated in TopTransactionContext. (We could keep a subtransaction's
|
||||
* TransInvalidationInfo in its CurTransactionContext; but that's more
|
||||
* wasteful not less so, since in very many scenarios it'd be the only
|
||||
* allocation in the subtransaction's CurTransactionContext.)
|
||||
*
|
||||
* We can store the message arrays densely, and yet avoid moving data around
|
||||
* within an array, because within any one subtransaction we need only
|
||||
* distinguish between messages emitted by prior commands and those emitted
|
||||
* by the current command. Once a command completes and we've done local
|
||||
* processing on its messages, we can fold those into the prior-commands
|
||||
* messages just by changing array indexes in the TransInvalidationInfo
|
||||
* struct. Similarly, we need distinguish messages of prior subtransactions
|
||||
* from those of the current subtransaction only until the subtransaction
|
||||
* completes, after which we adjust the array indexes in the parent's
|
||||
* TransInvalidationInfo to include the subtransaction's messages.
|
||||
*
|
||||
* The ordering of the individual messages within a command's or
|
||||
* subtransaction's output is not considered significant, although this
|
||||
* implementation happens to preserve the order in which they were queued.
|
||||
* (Previous versions of this code did not preserve it.)
|
||||
*
|
||||
* For notational convenience, control information is kept in two-element
|
||||
* arrays, the first for catcache messages and the second for relcache
|
||||
* messages.
|
||||
*/
|
||||
typedef struct InvalidationChunk
|
||||
{
|
||||
struct InvalidationChunk *next; /* list link */
|
||||
int nitems; /* # items currently stored in chunk */
|
||||
int maxitems; /* size of allocated array in this chunk */
|
||||
SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER];
|
||||
} InvalidationChunk;
|
||||
#define CatCacheMsgs 0
|
||||
#define RelCacheMsgs 1
|
||||
|
||||
typedef struct InvalidationListHeader
|
||||
/* Pointers to main arrays in TopTransactionContext */
|
||||
typedef struct InvalMessageArray
|
||||
{
|
||||
InvalidationChunk *cclist; /* list of chunks holding catcache msgs */
|
||||
InvalidationChunk *rclist; /* list of chunks holding relcache msgs */
|
||||
} InvalidationListHeader;
|
||||
SharedInvalidationMessage *msgs; /* palloc'd array (can be expanded) */
|
||||
int maxmsgs; /* current allocated size of array */
|
||||
} InvalMessageArray;
|
||||
|
||||
static InvalMessageArray InvalMessageArrays[2];
|
||||
|
||||
/* Control information for one logical group of messages */
|
||||
typedef struct InvalidationMsgsGroup
|
||||
{
|
||||
int firstmsg[2]; /* first index in relevant array */
|
||||
int nextmsg[2]; /* last+1 index */
|
||||
} InvalidationMsgsGroup;
|
||||
|
||||
/* Macros to help preserve InvalidationMsgsGroup abstraction */
|
||||
#define SetSubGroupToFollow(targetgroup, priorgroup, subgroup) \
|
||||
do { \
|
||||
(targetgroup)->firstmsg[subgroup] = \
|
||||
(targetgroup)->nextmsg[subgroup] = \
|
||||
(priorgroup)->nextmsg[subgroup]; \
|
||||
} while (0)
|
||||
|
||||
#define SetGroupToFollow(targetgroup, priorgroup) \
|
||||
do { \
|
||||
SetSubGroupToFollow(targetgroup, priorgroup, CatCacheMsgs); \
|
||||
SetSubGroupToFollow(targetgroup, priorgroup, RelCacheMsgs); \
|
||||
} while (0)
|
||||
|
||||
#define NumMessagesInSubGroup(group, subgroup) \
|
||||
((group)->nextmsg[subgroup] - (group)->firstmsg[subgroup])
|
||||
|
||||
#define NumMessagesInGroup(group) \
|
||||
(NumMessagesInSubGroup(group, CatCacheMsgs) + \
|
||||
NumMessagesInSubGroup(group, RelCacheMsgs))
|
||||
|
||||
|
||||
/*----------------
|
||||
* Invalidation info is divided into two lists:
|
||||
* Invalidation messages are divided into two groups:
|
||||
* 1) events so far in current command, not yet reflected to caches.
|
||||
* 2) events in previous commands of current transaction; these have
|
||||
* been reflected to local caches, and must be either broadcast to
|
||||
* other backends or rolled back from local cache when we commit
|
||||
* or abort the transaction.
|
||||
* Actually, we need two such lists for each level of nested transaction,
|
||||
* Actually, we need such groups for each level of nested transaction,
|
||||
* so that we can discard events from an aborted subtransaction. When
|
||||
* a subtransaction commits, we append its lists to the parent's lists.
|
||||
* a subtransaction commits, we append its events to the parent's groups.
|
||||
*
|
||||
* The relcache-file-invalidated flag can just be a simple boolean,
|
||||
* since we only act on it at transaction commit; we don't care which
|
||||
|
@ -165,11 +210,11 @@ typedef struct TransInvalidationInfo
|
|||
/* Subtransaction nesting depth */
|
||||
int my_level;
|
||||
|
||||
/* head of current-command event list */
|
||||
InvalidationListHeader CurrentCmdInvalidMsgs;
|
||||
/* Events emitted by current command */
|
||||
InvalidationMsgsGroup CurrentCmdInvalidMsgs;
|
||||
|
||||
/* head of previous-commands event list */
|
||||
InvalidationListHeader PriorCmdInvalidMsgs;
|
||||
/* Events emitted by previous commands of this (sub)transaction */
|
||||
InvalidationMsgsGroup PriorCmdInvalidMsgs;
|
||||
|
||||
/* init file must be invalidated? */
|
||||
bool RelcacheInitFileInval;
|
||||
|
@ -177,10 +222,6 @@ typedef struct TransInvalidationInfo
|
|||
|
||||
static TransInvalidationInfo *transInvalInfo = NULL;
|
||||
|
||||
static SharedInvalidationMessage *SharedInvalidMessagesArray;
|
||||
static int numSharedInvalidMessagesArray;
|
||||
static int maxSharedInvalidMessagesArray;
|
||||
|
||||
/* GUC storage */
|
||||
int debug_discard_caches = 0;
|
||||
|
||||
|
@ -218,124 +259,118 @@ static struct RELCACHECALLBACK
|
|||
static int relcache_callback_count = 0;
|
||||
|
||||
/* ----------------------------------------------------------------
|
||||
* Invalidation list support functions
|
||||
*
|
||||
* These three routines encapsulate processing of the "chunked"
|
||||
* representation of what is logically just a list of messages.
|
||||
* Invalidation subgroup support functions
|
||||
* ----------------------------------------------------------------
|
||||
*/
|
||||
|
||||
/*
|
||||
* AddInvalidationMessage
|
||||
* Add an invalidation message to a list (of chunks).
|
||||
* Add an invalidation message to a (sub)group.
|
||||
*
|
||||
* Note that we do not pay any great attention to maintaining the original
|
||||
* ordering of the messages.
|
||||
* The group must be the last active one, since we assume we can add to the
|
||||
* end of the relevant InvalMessageArray.
|
||||
*
|
||||
* subgroup must be CatCacheMsgs or RelCacheMsgs.
|
||||
*/
|
||||
static void
|
||||
AddInvalidationMessage(InvalidationChunk **listHdr,
|
||||
SharedInvalidationMessage *msg)
|
||||
AddInvalidationMessage(InvalidationMsgsGroup *group, int subgroup,
|
||||
const SharedInvalidationMessage *msg)
|
||||
{
|
||||
InvalidationChunk *chunk = *listHdr;
|
||||
InvalMessageArray *ima = &InvalMessageArrays[subgroup];
|
||||
int nextindex = group->nextmsg[subgroup];
|
||||
|
||||
if (chunk == NULL)
|
||||
if (nextindex >= ima->maxmsgs)
|
||||
{
|
||||
/* First time through; create initial chunk */
|
||||
#define FIRSTCHUNKSIZE 32
|
||||
chunk = (InvalidationChunk *)
|
||||
MemoryContextAlloc(CurTransactionContext,
|
||||
offsetof(InvalidationChunk, msgs) +
|
||||
FIRSTCHUNKSIZE * sizeof(SharedInvalidationMessage));
|
||||
chunk->nitems = 0;
|
||||
chunk->maxitems = FIRSTCHUNKSIZE;
|
||||
chunk->next = *listHdr;
|
||||
*listHdr = chunk;
|
||||
}
|
||||
else if (chunk->nitems >= chunk->maxitems)
|
||||
{
|
||||
/* Need another chunk; double size of last chunk */
|
||||
int chunksize = 2 * chunk->maxitems;
|
||||
if (ima->msgs == NULL)
|
||||
{
|
||||
/* Create new storage array in TopTransactionContext */
|
||||
int reqsize = 32; /* arbitrary */
|
||||
|
||||
chunk = (InvalidationChunk *)
|
||||
MemoryContextAlloc(CurTransactionContext,
|
||||
offsetof(InvalidationChunk, msgs) +
|
||||
chunksize * sizeof(SharedInvalidationMessage));
|
||||
chunk->nitems = 0;
|
||||
chunk->maxitems = chunksize;
|
||||
chunk->next = *listHdr;
|
||||
*listHdr = chunk;
|
||||
ima->msgs = (SharedInvalidationMessage *)
|
||||
MemoryContextAlloc(TopTransactionContext,
|
||||
reqsize * sizeof(SharedInvalidationMessage));
|
||||
ima->maxmsgs = reqsize;
|
||||
Assert(nextindex == 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Enlarge storage array */
|
||||
int reqsize = 2 * ima->maxmsgs;
|
||||
|
||||
ima->msgs = (SharedInvalidationMessage *)
|
||||
repalloc(ima->msgs,
|
||||
reqsize * sizeof(SharedInvalidationMessage));
|
||||
ima->maxmsgs = reqsize;
|
||||
}
|
||||
}
|
||||
/* Okay, add message to current chunk */
|
||||
chunk->msgs[chunk->nitems] = *msg;
|
||||
chunk->nitems++;
|
||||
/* Okay, add message to current group */
|
||||
ima->msgs[nextindex] = *msg;
|
||||
group->nextmsg[subgroup]++;
|
||||
}
|
||||
|
||||
/*
|
||||
* Append one list of invalidation message chunks to another, resetting
|
||||
* the source chunk-list pointer to NULL.
|
||||
* Append one subgroup of invalidation messages to another, resetting
|
||||
* the source subgroup to empty.
|
||||
*/
|
||||
static void
|
||||
AppendInvalidationMessageList(InvalidationChunk **destHdr,
|
||||
InvalidationChunk **srcHdr)
|
||||
AppendInvalidationMessageSubGroup(InvalidationMsgsGroup *dest,
|
||||
InvalidationMsgsGroup *src,
|
||||
int subgroup)
|
||||
{
|
||||
InvalidationChunk *chunk = *srcHdr;
|
||||
/* Messages must be adjacent in main array */
|
||||
Assert(dest->nextmsg[subgroup] == src->firstmsg[subgroup]);
|
||||
|
||||
if (chunk == NULL)
|
||||
return; /* nothing to do */
|
||||
/* ... which makes this easy: */
|
||||
dest->nextmsg[subgroup] = src->nextmsg[subgroup];
|
||||
|
||||
while (chunk->next != NULL)
|
||||
chunk = chunk->next;
|
||||
|
||||
chunk->next = *destHdr;
|
||||
|
||||
*destHdr = *srcHdr;
|
||||
|
||||
*srcHdr = NULL;
|
||||
/*
|
||||
* This is handy for some callers and irrelevant for others. But we do it
|
||||
* always, reasoning that it's bad to leave different groups pointing at
|
||||
* the same fragment of the message array.
|
||||
*/
|
||||
SetSubGroupToFollow(src, dest, subgroup);
|
||||
}
|
||||
|
||||
/*
|
||||
* Process a list of invalidation messages.
|
||||
* Process a subgroup of invalidation messages.
|
||||
*
|
||||
* This is a macro that executes the given code fragment for each message in
|
||||
* a message chunk list. The fragment should refer to the message as *msg.
|
||||
* a message subgroup. The fragment should refer to the message as *msg.
|
||||
*/
|
||||
#define ProcessMessageList(listHdr, codeFragment) \
|
||||
#define ProcessMessageSubGroup(group, subgroup, codeFragment) \
|
||||
do { \
|
||||
InvalidationChunk *_chunk; \
|
||||
for (_chunk = (listHdr); _chunk != NULL; _chunk = _chunk->next) \
|
||||
int _msgindex = (group)->firstmsg[subgroup]; \
|
||||
int _endmsg = (group)->nextmsg[subgroup]; \
|
||||
for (; _msgindex < _endmsg; _msgindex++) \
|
||||
{ \
|
||||
int _cindex; \
|
||||
for (_cindex = 0; _cindex < _chunk->nitems; _cindex++) \
|
||||
{ \
|
||||
SharedInvalidationMessage *msg = &_chunk->msgs[_cindex]; \
|
||||
codeFragment; \
|
||||
} \
|
||||
SharedInvalidationMessage *msg = \
|
||||
&InvalMessageArrays[subgroup].msgs[_msgindex]; \
|
||||
codeFragment; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
/*
|
||||
* Process a list of invalidation messages group-wise.
|
||||
* Process a subgroup of invalidation messages as an array.
|
||||
*
|
||||
* As above, but the code fragment can handle an array of messages.
|
||||
* The fragment should refer to the messages as msgs[], with n entries.
|
||||
*/
|
||||
#define ProcessMessageListMulti(listHdr, codeFragment) \
|
||||
#define ProcessMessageSubGroupMulti(group, subgroup, codeFragment) \
|
||||
do { \
|
||||
InvalidationChunk *_chunk; \
|
||||
for (_chunk = (listHdr); _chunk != NULL; _chunk = _chunk->next) \
|
||||
{ \
|
||||
SharedInvalidationMessage *msgs = _chunk->msgs; \
|
||||
int n = _chunk->nitems; \
|
||||
int n = NumMessagesInSubGroup(group, subgroup); \
|
||||
if (n > 0) { \
|
||||
SharedInvalidationMessage *msgs = \
|
||||
&InvalMessageArrays[subgroup].msgs[(group)->firstmsg[subgroup]]; \
|
||||
codeFragment; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
|
||||
/* ----------------------------------------------------------------
|
||||
* Invalidation set support functions
|
||||
* Invalidation group support functions
|
||||
*
|
||||
* These routines understand about the division of a logical invalidation
|
||||
* list into separate physical lists for catcache and relcache entries.
|
||||
* group into separate physical arrays for catcache and relcache entries.
|
||||
* ----------------------------------------------------------------
|
||||
*/
|
||||
|
||||
|
@ -343,7 +378,7 @@ AppendInvalidationMessageList(InvalidationChunk **destHdr,
|
|||
* Add a catcache inval entry
|
||||
*/
|
||||
static void
|
||||
AddCatcacheInvalidationMessage(InvalidationListHeader *hdr,
|
||||
AddCatcacheInvalidationMessage(InvalidationMsgsGroup *group,
|
||||
int id, uint32 hashValue, Oid dbId)
|
||||
{
|
||||
SharedInvalidationMessage msg;
|
||||
|
@ -364,14 +399,14 @@ AddCatcacheInvalidationMessage(InvalidationListHeader *hdr,
|
|||
*/
|
||||
VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
|
||||
|
||||
AddInvalidationMessage(&hdr->cclist, &msg);
|
||||
AddInvalidationMessage(group, CatCacheMsgs, &msg);
|
||||
}
|
||||
|
||||
/*
|
||||
* Add a whole-catalog inval entry
|
||||
*/
|
||||
static void
|
||||
AddCatalogInvalidationMessage(InvalidationListHeader *hdr,
|
||||
AddCatalogInvalidationMessage(InvalidationMsgsGroup *group,
|
||||
Oid dbId, Oid catId)
|
||||
{
|
||||
SharedInvalidationMessage msg;
|
||||
|
@ -382,14 +417,14 @@ AddCatalogInvalidationMessage(InvalidationListHeader *hdr,
|
|||
/* check AddCatcacheInvalidationMessage() for an explanation */
|
||||
VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
|
||||
|
||||
AddInvalidationMessage(&hdr->cclist, &msg);
|
||||
AddInvalidationMessage(group, CatCacheMsgs, &msg);
|
||||
}
|
||||
|
||||
/*
|
||||
* Add a relcache inval entry
|
||||
*/
|
||||
static void
|
||||
AddRelcacheInvalidationMessage(InvalidationListHeader *hdr,
|
||||
AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group,
|
||||
Oid dbId, Oid relId)
|
||||
{
|
||||
SharedInvalidationMessage msg;
|
||||
|
@ -399,11 +434,11 @@ AddRelcacheInvalidationMessage(InvalidationListHeader *hdr,
|
|||
* it will never change. InvalidOid for relId means all relations so we
|
||||
* don't need to add individual ones when it is present.
|
||||
*/
|
||||
ProcessMessageList(hdr->rclist,
|
||||
if (msg->rc.id == SHAREDINVALRELCACHE_ID &&
|
||||
(msg->rc.relId == relId ||
|
||||
msg->rc.relId == InvalidOid))
|
||||
return);
|
||||
ProcessMessageSubGroup(group, RelCacheMsgs,
|
||||
if (msg->rc.id == SHAREDINVALRELCACHE_ID &&
|
||||
(msg->rc.relId == relId ||
|
||||
msg->rc.relId == InvalidOid))
|
||||
return);
|
||||
|
||||
/* OK, add the item */
|
||||
msg.rc.id = SHAREDINVALRELCACHE_ID;
|
||||
|
@ -412,24 +447,26 @@ AddRelcacheInvalidationMessage(InvalidationListHeader *hdr,
|
|||
/* check AddCatcacheInvalidationMessage() for an explanation */
|
||||
VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
|
||||
|
||||
AddInvalidationMessage(&hdr->rclist, &msg);
|
||||
AddInvalidationMessage(group, RelCacheMsgs, &msg);
|
||||
}
|
||||
|
||||
/*
|
||||
* Add a snapshot inval entry
|
||||
*
|
||||
* We put these into the relcache subgroup for simplicity.
|
||||
*/
|
||||
static void
|
||||
AddSnapshotInvalidationMessage(InvalidationListHeader *hdr,
|
||||
AddSnapshotInvalidationMessage(InvalidationMsgsGroup *group,
|
||||
Oid dbId, Oid relId)
|
||||
{
|
||||
SharedInvalidationMessage msg;
|
||||
|
||||
/* Don't add a duplicate item */
|
||||
/* We assume dbId need not be checked because it will never change */
|
||||
ProcessMessageList(hdr->rclist,
|
||||
if (msg->sn.id == SHAREDINVALSNAPSHOT_ID &&
|
||||
msg->sn.relId == relId)
|
||||
return);
|
||||
ProcessMessageSubGroup(group, RelCacheMsgs,
|
||||
if (msg->sn.id == SHAREDINVALSNAPSHOT_ID &&
|
||||
msg->sn.relId == relId)
|
||||
return);
|
||||
|
||||
/* OK, add the item */
|
||||
msg.sn.id = SHAREDINVALSNAPSHOT_ID;
|
||||
|
@ -438,33 +475,33 @@ AddSnapshotInvalidationMessage(InvalidationListHeader *hdr,
|
|||
/* check AddCatcacheInvalidationMessage() for an explanation */
|
||||
VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
|
||||
|
||||
AddInvalidationMessage(&hdr->rclist, &msg);
|
||||
AddInvalidationMessage(group, RelCacheMsgs, &msg);
|
||||
}
|
||||
|
||||
/*
|
||||
* Append one list of invalidation messages to another, resetting
|
||||
* the source list to empty.
|
||||
* Append one group of invalidation messages to another, resetting
|
||||
* the source group to empty.
|
||||
*/
|
||||
static void
|
||||
AppendInvalidationMessages(InvalidationListHeader *dest,
|
||||
InvalidationListHeader *src)
|
||||
AppendInvalidationMessages(InvalidationMsgsGroup *dest,
|
||||
InvalidationMsgsGroup *src)
|
||||
{
|
||||
AppendInvalidationMessageList(&dest->cclist, &src->cclist);
|
||||
AppendInvalidationMessageList(&dest->rclist, &src->rclist);
|
||||
AppendInvalidationMessageSubGroup(dest, src, CatCacheMsgs);
|
||||
AppendInvalidationMessageSubGroup(dest, src, RelCacheMsgs);
|
||||
}
|
||||
|
||||
/*
|
||||
* Execute the given function for all the messages in an invalidation list.
|
||||
* The list is not altered.
|
||||
* Execute the given function for all the messages in an invalidation group.
|
||||
* The group is not altered.
|
||||
*
|
||||
* catcache entries are processed first, for reasons mentioned above.
|
||||
*/
|
||||
static void
|
||||
ProcessInvalidationMessages(InvalidationListHeader *hdr,
|
||||
ProcessInvalidationMessages(InvalidationMsgsGroup *group,
|
||||
void (*func) (SharedInvalidationMessage *msg))
|
||||
{
|
||||
ProcessMessageList(hdr->cclist, func(msg));
|
||||
ProcessMessageList(hdr->rclist, func(msg));
|
||||
ProcessMessageSubGroup(group, CatCacheMsgs, func(msg));
|
||||
ProcessMessageSubGroup(group, RelCacheMsgs, func(msg));
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -472,11 +509,11 @@ ProcessInvalidationMessages(InvalidationListHeader *hdr,
|
|||
* rather than just one at a time.
|
||||
*/
|
||||
static void
|
||||
ProcessInvalidationMessagesMulti(InvalidationListHeader *hdr,
|
||||
ProcessInvalidationMessagesMulti(InvalidationMsgsGroup *group,
|
||||
void (*func) (const SharedInvalidationMessage *msgs, int n))
|
||||
{
|
||||
ProcessMessageListMulti(hdr->cclist, func(msgs, n));
|
||||
ProcessMessageListMulti(hdr->rclist, func(msgs, n));
|
||||
ProcessMessageSubGroupMulti(group, CatCacheMsgs, func(msgs, n));
|
||||
ProcessMessageSubGroupMulti(group, RelCacheMsgs, func(msgs, n));
|
||||
}
|
||||
|
||||
/* ----------------------------------------------------------------
|
||||
|
@ -731,7 +768,7 @@ AcceptInvalidationMessages(void)
|
|||
|
||||
/*
|
||||
* PrepareInvalidationState
|
||||
* Initialize inval lists for the current (sub)transaction.
|
||||
* Initialize inval data for the current (sub)transaction.
|
||||
*/
|
||||
static void
|
||||
PrepareInvalidationState(void)
|
||||
|
@ -748,12 +785,45 @@ PrepareInvalidationState(void)
|
|||
myInfo->parent = transInvalInfo;
|
||||
myInfo->my_level = GetCurrentTransactionNestLevel();
|
||||
|
||||
/*
|
||||
* If there's any previous entry, this one should be for a deeper nesting
|
||||
* level.
|
||||
*/
|
||||
Assert(transInvalInfo == NULL ||
|
||||
myInfo->my_level > transInvalInfo->my_level);
|
||||
/* Now, do we have a previous stack entry? */
|
||||
if (transInvalInfo != NULL)
|
||||
{
|
||||
/* Yes; this one should be for a deeper nesting level. */
|
||||
Assert(myInfo->my_level > transInvalInfo->my_level);
|
||||
|
||||
/*
|
||||
* The parent (sub)transaction must not have any current (i.e.,
|
||||
* not-yet-locally-processed) messages. If it did, we'd have a
|
||||
* semantic problem: the new subtransaction presumably ought not be
|
||||
* able to see those events yet, but since the CommandCounter is
|
||||
* linear, that can't work once the subtransaction advances the
|
||||
* counter. This is a convenient place to check for that, as well as
|
||||
* being important to keep management of the message arrays simple.
|
||||
*/
|
||||
if (NumMessagesInGroup(&transInvalInfo->CurrentCmdInvalidMsgs) != 0)
|
||||
elog(ERROR, "cannot start a subtransaction when there are unprocessed inval messages");
|
||||
|
||||
/*
|
||||
* MemoryContextAllocZero set firstmsg = nextmsg = 0 in each group,
|
||||
* which is fine for the first (sub)transaction, but otherwise we need
|
||||
* to update them to follow whatever is already in the arrays.
|
||||
*/
|
||||
SetGroupToFollow(&myInfo->PriorCmdInvalidMsgs,
|
||||
&transInvalInfo->CurrentCmdInvalidMsgs);
|
||||
SetGroupToFollow(&myInfo->CurrentCmdInvalidMsgs,
|
||||
&myInfo->PriorCmdInvalidMsgs);
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Here, we need only clear any array pointers left over from a prior
|
||||
* transaction.
|
||||
*/
|
||||
InvalMessageArrays[CatCacheMsgs].msgs = NULL;
|
||||
InvalMessageArrays[CatCacheMsgs].maxmsgs = 0;
|
||||
InvalMessageArrays[RelCacheMsgs].msgs = NULL;
|
||||
InvalMessageArrays[RelCacheMsgs].maxmsgs = 0;
|
||||
}
|
||||
|
||||
transInvalInfo = myInfo;
|
||||
}
|
||||
|
@ -777,47 +847,8 @@ PostPrepare_Inval(void)
|
|||
}
|
||||
|
||||
/*
|
||||
* Collect invalidation messages into SharedInvalidMessagesArray array.
|
||||
*/
|
||||
static void
|
||||
MakeSharedInvalidMessagesArray(const SharedInvalidationMessage *msgs, int n)
|
||||
{
|
||||
/*
|
||||
* Initialise array first time through in each commit
|
||||
*/
|
||||
if (SharedInvalidMessagesArray == NULL)
|
||||
{
|
||||
maxSharedInvalidMessagesArray = FIRSTCHUNKSIZE;
|
||||
numSharedInvalidMessagesArray = 0;
|
||||
|
||||
/*
|
||||
* Although this is being palloc'd we don't actually free it directly.
|
||||
* We're so close to EOXact that we now we're going to lose it anyhow.
|
||||
*/
|
||||
SharedInvalidMessagesArray = palloc(maxSharedInvalidMessagesArray
|
||||
* sizeof(SharedInvalidationMessage));
|
||||
}
|
||||
|
||||
if ((numSharedInvalidMessagesArray + n) > maxSharedInvalidMessagesArray)
|
||||
{
|
||||
maxSharedInvalidMessagesArray = pg_nextpower2_32(numSharedInvalidMessagesArray + n);
|
||||
|
||||
SharedInvalidMessagesArray = repalloc(SharedInvalidMessagesArray,
|
||||
maxSharedInvalidMessagesArray
|
||||
* sizeof(SharedInvalidationMessage));
|
||||
}
|
||||
|
||||
/*
|
||||
* Append the next chunk onto the array
|
||||
*/
|
||||
memcpy(SharedInvalidMessagesArray + numSharedInvalidMessagesArray,
|
||||
msgs, n * sizeof(SharedInvalidationMessage));
|
||||
numSharedInvalidMessagesArray += n;
|
||||
}
|
||||
|
||||
/*
|
||||
* xactGetCommittedInvalidationMessages() is executed by
|
||||
* RecordTransactionCommit() to add invalidation messages onto the
|
||||
* xactGetCommittedInvalidationMessages() is called by
|
||||
* RecordTransactionCommit() to collect invalidation messages to add to the
|
||||
* commit record. This applies only to commit message types, never to
|
||||
* abort records. Must always run before AtEOXact_Inval(), since that
|
||||
* removes the data we need to see.
|
||||
|
@ -832,7 +863,9 @@ int
|
|||
xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
|
||||
bool *RelcacheInitFileInval)
|
||||
{
|
||||
MemoryContext oldcontext;
|
||||
SharedInvalidationMessage *msgarray;
|
||||
int nummsgs;
|
||||
int nmsgs;
|
||||
|
||||
/* Quick exit if we haven't done anything with invalidation messages. */
|
||||
if (transInvalInfo == NULL)
|
||||
|
@ -853,27 +886,48 @@ xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
|
|||
*RelcacheInitFileInval = transInvalInfo->RelcacheInitFileInval;
|
||||
|
||||
/*
|
||||
* Walk through TransInvalidationInfo to collect all the messages into a
|
||||
* single contiguous array of invalidation messages. It must be contiguous
|
||||
* so we can copy directly into WAL message. Maintain the order that they
|
||||
* would be processed in by AtEOXact_Inval(), to ensure emulated behaviour
|
||||
* in redo is as similar as possible to original. We want the same bugs,
|
||||
* if any, not new ones.
|
||||
* Collect all the pending messages into a single contiguous array of
|
||||
* invalidation messages, to simplify what needs to happen while building
|
||||
* the commit WAL message. Maintain the order that they would be
|
||||
* processed in by AtEOXact_Inval(), to ensure emulated behaviour in redo
|
||||
* is as similar as possible to original. We want the same bugs, if any,
|
||||
* not new ones.
|
||||
*/
|
||||
oldcontext = MemoryContextSwitchTo(CurTransactionContext);
|
||||
nummsgs = NumMessagesInGroup(&transInvalInfo->PriorCmdInvalidMsgs) +
|
||||
NumMessagesInGroup(&transInvalInfo->CurrentCmdInvalidMsgs);
|
||||
|
||||
ProcessInvalidationMessagesMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
|
||||
MakeSharedInvalidMessagesArray);
|
||||
ProcessInvalidationMessagesMulti(&transInvalInfo->PriorCmdInvalidMsgs,
|
||||
MakeSharedInvalidMessagesArray);
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
*msgs = msgarray = (SharedInvalidationMessage *)
|
||||
MemoryContextAlloc(CurTransactionContext,
|
||||
nummsgs * sizeof(SharedInvalidationMessage));
|
||||
|
||||
Assert(!(numSharedInvalidMessagesArray > 0 &&
|
||||
SharedInvalidMessagesArray == NULL));
|
||||
nmsgs = 0;
|
||||
ProcessMessageSubGroupMulti(&transInvalInfo->PriorCmdInvalidMsgs,
|
||||
CatCacheMsgs,
|
||||
(memcpy(msgarray + nmsgs,
|
||||
msgs,
|
||||
n * sizeof(SharedInvalidationMessage)),
|
||||
nmsgs += n));
|
||||
ProcessMessageSubGroupMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
|
||||
CatCacheMsgs,
|
||||
(memcpy(msgarray + nmsgs,
|
||||
msgs,
|
||||
n * sizeof(SharedInvalidationMessage)),
|
||||
nmsgs += n));
|
||||
ProcessMessageSubGroupMulti(&transInvalInfo->PriorCmdInvalidMsgs,
|
||||
RelCacheMsgs,
|
||||
(memcpy(msgarray + nmsgs,
|
||||
msgs,
|
||||
n * sizeof(SharedInvalidationMessage)),
|
||||
nmsgs += n));
|
||||
ProcessMessageSubGroupMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
|
||||
RelCacheMsgs,
|
||||
(memcpy(msgarray + nmsgs,
|
||||
msgs,
|
||||
n * sizeof(SharedInvalidationMessage)),
|
||||
nmsgs += n));
|
||||
Assert(nmsgs == nummsgs);
|
||||
|
||||
*msgs = SharedInvalidMessagesArray;
|
||||
|
||||
return numSharedInvalidMessagesArray;
|
||||
return nmsgs;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -942,7 +996,7 @@ ProcessCommittedInvalidationMessages(SharedInvalidationMessage *msgs,
|
|||
* about CurrentCmdInvalidMsgs too, since those changes haven't touched
|
||||
* the caches yet.
|
||||
*
|
||||
* In any case, reset the various lists to empty. We need not physically
|
||||
* In any case, reset our state to empty. We need not physically
|
||||
* free memory here, since TopTransactionContext is about to be emptied
|
||||
* anyway.
|
||||
*
|
||||
|
@ -986,8 +1040,6 @@ AtEOXact_Inval(bool isCommit)
|
|||
|
||||
/* Need not free anything explicitly */
|
||||
transInvalInfo = NULL;
|
||||
SharedInvalidMessagesArray = NULL;
|
||||
numSharedInvalidMessagesArray = 0;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -1043,10 +1095,21 @@ AtEOSubXact_Inval(bool isCommit)
|
|||
return;
|
||||
}
|
||||
|
||||
/* Pass up my inval messages to parent */
|
||||
/*
|
||||
* Pass up my inval messages to parent. Notice that we stick them in
|
||||
* PriorCmdInvalidMsgs, not CurrentCmdInvalidMsgs, since they've
|
||||
* already been locally processed. (This would trigger the Assert in
|
||||
* AppendInvalidationMessageSubGroup if the parent's
|
||||
* CurrentCmdInvalidMsgs isn't empty; but we already checked that in
|
||||
* PrepareInvalidationState.)
|
||||
*/
|
||||
AppendInvalidationMessages(&myInfo->parent->PriorCmdInvalidMsgs,
|
||||
&myInfo->PriorCmdInvalidMsgs);
|
||||
|
||||
/* Must readjust parent's CurrentCmdInvalidMsgs indexes now */
|
||||
SetGroupToFollow(&myInfo->parent->CurrentCmdInvalidMsgs,
|
||||
&myInfo->parent->PriorCmdInvalidMsgs);
|
||||
|
||||
/* Pending relcache inval becomes parent's problem too */
|
||||
if (myInfo->RelcacheInitFileInval)
|
||||
myInfo->parent->RelcacheInitFileInval = true;
|
||||
|
@ -1514,31 +1577,24 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
|
|||
/*
|
||||
* LogLogicalInvalidations
|
||||
*
|
||||
* Emit WAL for invalidations. This is currently only used for logging
|
||||
* invalidations at the command end or at commit time if any invalidations
|
||||
* are pending.
|
||||
* Emit WAL for invalidations caused by the current command.
|
||||
*
|
||||
* This is currently only used for logging invalidations at the command end
|
||||
* or at commit time if any invalidations are pending.
|
||||
*/
|
||||
void
|
||||
LogLogicalInvalidations()
|
||||
LogLogicalInvalidations(void)
|
||||
{
|
||||
xl_xact_invals xlrec;
|
||||
SharedInvalidationMessage *invalMessages;
|
||||
int nmsgs = 0;
|
||||
InvalidationMsgsGroup *group;
|
||||
int nmsgs;
|
||||
|
||||
/* Quick exit if we haven't done anything with invalidation messages. */
|
||||
if (transInvalInfo == NULL)
|
||||
return;
|
||||
|
||||
ProcessInvalidationMessagesMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
|
||||
MakeSharedInvalidMessagesArray);
|
||||
|
||||
Assert(!(numSharedInvalidMessagesArray > 0 &&
|
||||
SharedInvalidMessagesArray == NULL));
|
||||
|
||||
invalMessages = SharedInvalidMessagesArray;
|
||||
nmsgs = numSharedInvalidMessagesArray;
|
||||
SharedInvalidMessagesArray = NULL;
|
||||
numSharedInvalidMessagesArray = 0;
|
||||
group = &transInvalInfo->CurrentCmdInvalidMsgs;
|
||||
nmsgs = NumMessagesInGroup(group);
|
||||
|
||||
if (nmsgs > 0)
|
||||
{
|
||||
|
@ -1549,10 +1605,12 @@ LogLogicalInvalidations()
|
|||
/* perform insertion */
|
||||
XLogBeginInsert();
|
||||
XLogRegisterData((char *) (&xlrec), MinSizeOfXactInvals);
|
||||
XLogRegisterData((char *) invalMessages,
|
||||
nmsgs * sizeof(SharedInvalidationMessage));
|
||||
ProcessMessageSubGroupMulti(group, CatCacheMsgs,
|
||||
XLogRegisterData((char *) msgs,
|
||||
n * sizeof(SharedInvalidationMessage)));
|
||||
ProcessMessageSubGroupMulti(group, RelCacheMsgs,
|
||||
XLogRegisterData((char *) msgs,
|
||||
n * sizeof(SharedInvalidationMessage)));
|
||||
XLogInsert(RM_XACT_ID, XLOG_XACT_INVALIDATIONS);
|
||||
|
||||
pfree(invalMessages);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue