
All error messages use the American English spelling of recognize, apply to the single one not doing so to be consistent. Author: Daniel Gustafsson <daniel@yesql.se>
597 lines
16 KiB
C
597 lines
16 KiB
C
/*-------------------------------------------------------------------------
|
|
*
|
|
* pgoutput.c
|
|
* Logical Replication output plugin
|
|
*
|
|
* Copyright (c) 2012-2015, PostgreSQL Global Development Group
|
|
*
|
|
* IDENTIFICATION
|
|
* src/backend/replication/pgoutput/pgoutput.c
|
|
*
|
|
*-------------------------------------------------------------------------
|
|
*/
|
|
#include "postgres.h"
|
|
|
|
#include "catalog/pg_publication.h"
|
|
|
|
#include "replication/logical.h"
|
|
#include "replication/logicalproto.h"
|
|
#include "replication/origin.h"
|
|
#include "replication/pgoutput.h"
|
|
|
|
#include "utils/inval.h"
|
|
#include "utils/int8.h"
|
|
#include "utils/memutils.h"
|
|
#include "utils/syscache.h"
|
|
#include "utils/varlena.h"
|
|
|
|
PG_MODULE_MAGIC;
|
|
|
|
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
|
|
|
|
static void pgoutput_startup(LogicalDecodingContext * ctx,
|
|
OutputPluginOptions *opt, bool is_init);
|
|
static void pgoutput_shutdown(LogicalDecodingContext * ctx);
|
|
static void pgoutput_begin_txn(LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn);
|
|
static void pgoutput_commit_txn(LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
|
|
static void pgoutput_change(LogicalDecodingContext *ctx,
|
|
ReorderBufferTXN *txn, Relation rel,
|
|
ReorderBufferChange *change);
|
|
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
|
|
RepOriginId origin_id);
|
|
|
|
static bool publications_valid;
|
|
|
|
static List *LoadPublications(List *pubnames);
|
|
static void publication_invalidation_cb(Datum arg, int cacheid,
|
|
uint32 hashvalue);
|
|
|
|
/* Entry in the map used to remember which relation schemas we sent. */
|
|
typedef struct RelationSyncEntry
|
|
{
|
|
Oid relid; /* relation oid */
|
|
bool schema_sent; /* did we send the schema? */
|
|
bool replicate_valid;
|
|
PublicationActions pubactions;
|
|
} RelationSyncEntry;
|
|
|
|
/* Map used to remember which relation schemas we sent. */
|
|
static HTAB *RelationSyncCache = NULL;
|
|
|
|
static void init_rel_sync_cache(MemoryContext decoding_context);
|
|
static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
|
|
static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
|
|
static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
|
|
uint32 hashvalue);
|
|
|
|
/*
|
|
* Specify output plugin callbacks
|
|
*/
|
|
void
|
|
_PG_output_plugin_init(OutputPluginCallbacks *cb)
|
|
{
|
|
AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
|
|
|
|
cb->startup_cb = pgoutput_startup;
|
|
cb->begin_cb = pgoutput_begin_txn;
|
|
cb->change_cb = pgoutput_change;
|
|
cb->commit_cb = pgoutput_commit_txn;
|
|
cb->filter_by_origin_cb = pgoutput_origin_filter;
|
|
cb->shutdown_cb = pgoutput_shutdown;
|
|
}
|
|
|
|
static void
|
|
parse_output_parameters(List *options, uint32 *protocol_version,
|
|
List **publication_names)
|
|
{
|
|
ListCell *lc;
|
|
bool protocol_version_given = false;
|
|
bool publication_names_given = false;
|
|
|
|
foreach(lc, options)
|
|
{
|
|
DefElem *defel = (DefElem *) lfirst(lc);
|
|
|
|
Assert(defel->arg == NULL || IsA(defel->arg, String));
|
|
|
|
/* Check each param, whether or not we recognize it */
|
|
if (strcmp(defel->defname, "proto_version") == 0)
|
|
{
|
|
int64 parsed;
|
|
|
|
if (protocol_version_given)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
|
errmsg("conflicting or redundant options")));
|
|
protocol_version_given = true;
|
|
|
|
if (!scanint8(strVal(defel->arg), true, &parsed))
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
errmsg("invalid proto_version")));
|
|
|
|
if (parsed > PG_UINT32_MAX || parsed < 0)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
errmsg("proto_verson \"%s\" out of range",
|
|
strVal(defel->arg))));
|
|
|
|
*protocol_version = (uint32) parsed;
|
|
}
|
|
else if (strcmp(defel->defname, "publication_names") == 0)
|
|
{
|
|
if (publication_names_given)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_SYNTAX_ERROR),
|
|
errmsg("conflicting or redundant options")));
|
|
publication_names_given = true;
|
|
|
|
if (!SplitIdentifierString(strVal(defel->arg), ',',
|
|
publication_names))
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INVALID_NAME),
|
|
errmsg("invalid publication_names syntax")));
|
|
}
|
|
else
|
|
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Initialize this plugin
|
|
*/
|
|
static void
|
|
pgoutput_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
|
|
bool is_init)
|
|
{
|
|
PGOutputData *data = palloc0(sizeof(PGOutputData));
|
|
|
|
/* Create our memory context for private allocations. */
|
|
data->context = AllocSetContextCreate(ctx->context,
|
|
"logical replication output context",
|
|
ALLOCSET_DEFAULT_MINSIZE,
|
|
ALLOCSET_DEFAULT_INITSIZE,
|
|
ALLOCSET_DEFAULT_MAXSIZE);
|
|
|
|
ctx->output_plugin_private = data;
|
|
|
|
/* This plugin uses binary protocol. */
|
|
opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
|
|
|
|
/*
|
|
* This is replication start and not slot initialization.
|
|
*
|
|
* Parse and validate options passed by the client.
|
|
*/
|
|
if (!is_init)
|
|
{
|
|
/* Parse the params and ERROR if we see any we don't recognize */
|
|
parse_output_parameters(ctx->output_plugin_options,
|
|
&data->protocol_version,
|
|
&data->publication_names);
|
|
|
|
/* Check if we support requested protocol */
|
|
if (data->protocol_version != LOGICALREP_PROTO_VERSION_NUM)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
errmsg("client sent proto_version=%d but we only support protocol %d or lower",
|
|
data->protocol_version, LOGICALREP_PROTO_VERSION_NUM)));
|
|
|
|
if (data->protocol_version < LOGICALREP_PROTO_MIN_VERSION_NUM)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
errmsg("client sent proto_version=%d but we only support protocol %d or higher",
|
|
data->protocol_version, LOGICALREP_PROTO_MIN_VERSION_NUM)));
|
|
|
|
if (list_length(data->publication_names) < 1)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
errmsg("publication_names parameter missing")));
|
|
|
|
/* Init publication state. */
|
|
data->publications = NIL;
|
|
publications_valid = false;
|
|
CacheRegisterSyscacheCallback(PUBLICATIONOID,
|
|
publication_invalidation_cb,
|
|
(Datum) 0);
|
|
|
|
/* Initialize relation schema cache. */
|
|
init_rel_sync_cache(CacheMemoryContext);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* BEGIN callback
|
|
*/
|
|
static void
|
|
pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
|
|
{
|
|
bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
|
|
|
|
OutputPluginPrepareWrite(ctx, !send_replication_origin);
|
|
logicalrep_write_begin(ctx->out, txn);
|
|
|
|
if (send_replication_origin)
|
|
{
|
|
char *origin;
|
|
|
|
/* Message boundary */
|
|
OutputPluginWrite(ctx, false);
|
|
OutputPluginPrepareWrite(ctx, true);
|
|
|
|
/*
|
|
* XXX: which behaviour we want here?
|
|
*
|
|
* Alternatives:
|
|
* - don't send origin message if origin name not found
|
|
* (that's what we do now)
|
|
* - throw error - that will break replication, not good
|
|
* - send some special "unknown" origin
|
|
*/
|
|
if (replorigin_by_oid(txn->origin_id, true, &origin))
|
|
logicalrep_write_origin(ctx->out, origin, txn->origin_lsn);
|
|
}
|
|
|
|
OutputPluginWrite(ctx, true);
|
|
}
|
|
|
|
/*
|
|
* COMMIT callback
|
|
*/
|
|
static void
|
|
pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|
XLogRecPtr commit_lsn)
|
|
{
|
|
OutputPluginPrepareWrite(ctx, true);
|
|
logicalrep_write_commit(ctx->out, txn, commit_lsn);
|
|
OutputPluginWrite(ctx, true);
|
|
}
|
|
|
|
/*
|
|
* Sends the decoded DML over wire.
|
|
*/
|
|
static void
|
|
pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
|
Relation relation, ReorderBufferChange *change)
|
|
{
|
|
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
|
|
MemoryContext old;
|
|
RelationSyncEntry *relentry;
|
|
|
|
relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
|
|
|
|
/* First check the table filter */
|
|
switch (change->action)
|
|
{
|
|
case REORDER_BUFFER_CHANGE_INSERT:
|
|
if (!relentry->pubactions.pubinsert)
|
|
return;
|
|
break;
|
|
case REORDER_BUFFER_CHANGE_UPDATE:
|
|
if (!relentry->pubactions.pubupdate)
|
|
return;
|
|
break;
|
|
case REORDER_BUFFER_CHANGE_DELETE:
|
|
if (!relentry->pubactions.pubdelete)
|
|
return;
|
|
break;
|
|
default:
|
|
Assert(false);
|
|
}
|
|
|
|
/* Avoid leaking memory by using and resetting our own context */
|
|
old = MemoryContextSwitchTo(data->context);
|
|
|
|
/*
|
|
* Write the relation schema if the current schema haven't been sent yet.
|
|
*/
|
|
if (!relentry->schema_sent)
|
|
{
|
|
TupleDesc desc;
|
|
int i;
|
|
|
|
desc = RelationGetDescr(relation);
|
|
|
|
/*
|
|
* Write out type info if needed. We do that only for user created
|
|
* types.
|
|
*/
|
|
for (i = 0; i < desc->natts; i++)
|
|
{
|
|
Form_pg_attribute att = desc->attrs[i];
|
|
|
|
if (att->attisdropped)
|
|
continue;
|
|
|
|
if (att->atttypid < FirstNormalObjectId)
|
|
continue;
|
|
|
|
OutputPluginPrepareWrite(ctx, false);
|
|
logicalrep_write_typ(ctx->out, att->atttypid);
|
|
OutputPluginWrite(ctx, false);
|
|
}
|
|
|
|
OutputPluginPrepareWrite(ctx, false);
|
|
logicalrep_write_rel(ctx->out, relation);
|
|
OutputPluginWrite(ctx, false);
|
|
relentry->schema_sent = true;
|
|
}
|
|
|
|
/* Send the data */
|
|
switch (change->action)
|
|
{
|
|
case REORDER_BUFFER_CHANGE_INSERT:
|
|
OutputPluginPrepareWrite(ctx, true);
|
|
logicalrep_write_insert(ctx->out, relation,
|
|
&change->data.tp.newtuple->tuple);
|
|
OutputPluginWrite(ctx, true);
|
|
break;
|
|
case REORDER_BUFFER_CHANGE_UPDATE:
|
|
{
|
|
HeapTuple oldtuple = change->data.tp.oldtuple ?
|
|
&change->data.tp.oldtuple->tuple : NULL;
|
|
|
|
OutputPluginPrepareWrite(ctx, true);
|
|
logicalrep_write_update(ctx->out, relation, oldtuple,
|
|
&change->data.tp.newtuple->tuple);
|
|
OutputPluginWrite(ctx, true);
|
|
break;
|
|
}
|
|
case REORDER_BUFFER_CHANGE_DELETE:
|
|
if (change->data.tp.oldtuple)
|
|
{
|
|
OutputPluginPrepareWrite(ctx, true);
|
|
logicalrep_write_delete(ctx->out, relation,
|
|
&change->data.tp.oldtuple->tuple);
|
|
OutputPluginWrite(ctx, true);
|
|
}
|
|
else
|
|
elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
|
|
break;
|
|
default:
|
|
Assert(false);
|
|
}
|
|
|
|
/* Cleanup */
|
|
MemoryContextSwitchTo(old);
|
|
MemoryContextReset(data->context);
|
|
}
|
|
|
|
/*
|
|
* Currently we always forward.
|
|
*/
|
|
static bool
|
|
pgoutput_origin_filter(LogicalDecodingContext *ctx,
|
|
RepOriginId origin_id)
|
|
{
|
|
return false;
|
|
}
|
|
|
|
/*
|
|
* Shutdown the output plugin.
|
|
*
|
|
* Note, we don't need to clean the data->context as it's child context
|
|
* of the ctx->context so it will be cleaned up by logical decoding machinery.
|
|
*/
|
|
static void
|
|
pgoutput_shutdown(LogicalDecodingContext * ctx)
|
|
{
|
|
if (RelationSyncCache)
|
|
{
|
|
hash_destroy(RelationSyncCache);
|
|
RelationSyncCache = NULL;
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Load publications from the list of publication names.
|
|
*/
|
|
static List *
|
|
LoadPublications(List *pubnames)
|
|
{
|
|
List *result = NIL;
|
|
ListCell *lc;
|
|
|
|
foreach (lc, pubnames)
|
|
{
|
|
char *pubname = (char *) lfirst(lc);
|
|
Publication *pub = GetPublicationByName(pubname, false);
|
|
|
|
result = lappend(result, pub);
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
/*
|
|
* Publication cache invalidation callback.
|
|
*/
|
|
static void
|
|
publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
|
|
{
|
|
publications_valid = false;
|
|
|
|
/*
|
|
* Also invalidate per-relation cache so that next time the filtering
|
|
* info is checked it will be updated with the new publication
|
|
* settings.
|
|
*/
|
|
rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
|
|
}
|
|
|
|
/*
|
|
* Initialize the relation schema sync cache for a decoding session.
|
|
*
|
|
* The hash table is destroyed at the end of a decoding session. While
|
|
* relcache invalidations still exist and will still be invoked, they
|
|
* will just see the null hash table global and take no action.
|
|
*/
|
|
static void
|
|
init_rel_sync_cache(MemoryContext cachectx)
|
|
{
|
|
HASHCTL ctl;
|
|
MemoryContext old_ctxt;
|
|
|
|
if (RelationSyncCache != NULL)
|
|
return;
|
|
|
|
/* Make a new hash table for the cache */
|
|
MemSet(&ctl, 0, sizeof(ctl));
|
|
ctl.keysize = sizeof(Oid);
|
|
ctl.entrysize = sizeof(RelationSyncEntry);
|
|
ctl.hcxt = cachectx;
|
|
|
|
old_ctxt = MemoryContextSwitchTo(cachectx);
|
|
RelationSyncCache = hash_create("logical replication output relation cache",
|
|
128, &ctl,
|
|
HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
|
|
(void) MemoryContextSwitchTo(old_ctxt);
|
|
|
|
Assert(RelationSyncCache != NULL);
|
|
|
|
CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
|
|
CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
|
|
rel_sync_cache_publication_cb,
|
|
(Datum) 0);
|
|
}
|
|
|
|
/*
|
|
* Find or create entry in the relation schema cache.
|
|
*/
|
|
static RelationSyncEntry *
|
|
get_rel_sync_entry(PGOutputData *data, Oid relid)
|
|
{
|
|
RelationSyncEntry *entry;
|
|
bool found;
|
|
MemoryContext oldctx;
|
|
|
|
Assert(RelationSyncCache != NULL);
|
|
|
|
/* Find cached function info, creating if not found */
|
|
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
|
|
entry = (RelationSyncEntry *) hash_search(RelationSyncCache,
|
|
(void *) &relid,
|
|
HASH_ENTER, &found);
|
|
MemoryContextSwitchTo(oldctx);
|
|
Assert(entry != NULL);
|
|
|
|
/* Not found means schema wasn't sent */
|
|
if (!found || !entry->replicate_valid)
|
|
{
|
|
List *pubids = GetRelationPublications(relid);
|
|
ListCell *lc;
|
|
|
|
/* Reload publications if needed before use. */
|
|
if (!publications_valid)
|
|
{
|
|
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
|
|
if (data->publications)
|
|
list_free_deep(data->publications);
|
|
|
|
data->publications = LoadPublications(data->publication_names);
|
|
MemoryContextSwitchTo(oldctx);
|
|
publications_valid = true;
|
|
}
|
|
|
|
/*
|
|
* Build publication cache. We can't use one provided by relcache
|
|
* as relcache considers all publications given relation is in, but
|
|
* here we only need to consider ones that the subscriber requested.
|
|
*/
|
|
entry->pubactions.pubinsert = entry->pubactions.pubupdate =
|
|
entry->pubactions.pubdelete = false;
|
|
|
|
foreach(lc, data->publications)
|
|
{
|
|
Publication *pub = lfirst(lc);
|
|
|
|
if (pub->alltables || list_member_oid(pubids, pub->oid))
|
|
{
|
|
entry->pubactions.pubinsert |= pub->pubactions.pubinsert;
|
|
entry->pubactions.pubupdate |= pub->pubactions.pubupdate;
|
|
entry->pubactions.pubdelete |= pub->pubactions.pubdelete;
|
|
}
|
|
|
|
if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
|
|
entry->pubactions.pubdelete)
|
|
break;
|
|
}
|
|
|
|
list_free(pubids);
|
|
|
|
entry->replicate_valid = true;
|
|
}
|
|
|
|
if (!found)
|
|
entry->schema_sent = false;
|
|
|
|
return entry;
|
|
}
|
|
|
|
/*
|
|
* Relcache invalidation callback
|
|
*/
|
|
static void
|
|
rel_sync_cache_relation_cb(Datum arg, Oid relid)
|
|
{
|
|
RelationSyncEntry *entry;
|
|
|
|
/*
|
|
* We can get here if the plugin was used in SQL interface as the
|
|
* RelSchemaSyncCache is destroyed when the decoding finishes, but there
|
|
* is no way to unregister the relcache invalidation callback.
|
|
*/
|
|
if (RelationSyncCache == NULL)
|
|
return;
|
|
|
|
/*
|
|
* Nobody keeps pointers to entries in this hash table around outside
|
|
* logical decoding callback calls - but invalidation events can come in
|
|
* *during* a callback if we access the relcache in the callback. Because
|
|
* of that we must mark the cache entry as invalid but not remove it from
|
|
* the hash while it could still be referenced, then prune it at a later
|
|
* safe point.
|
|
*
|
|
* Getting invalidations for relations that aren't in the table is
|
|
* entirely normal, since there's no way to unregister for an
|
|
* invalidation event. So we don't care if it's found or not.
|
|
*/
|
|
entry = (RelationSyncEntry *) hash_search(RelationSyncCache, &relid,
|
|
HASH_FIND, NULL);
|
|
|
|
/*
|
|
* Reset schema sent status as the relation definition may have
|
|
* changed.
|
|
*/
|
|
if (entry != NULL)
|
|
entry->schema_sent = false;
|
|
}
|
|
|
|
/*
|
|
* Publication relation map syscache invalidation callback
|
|
*/
|
|
static void
|
|
rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
|
|
{
|
|
HASH_SEQ_STATUS status;
|
|
RelationSyncEntry *entry;
|
|
|
|
/*
|
|
* We can get here if the plugin was used in SQL interface as the
|
|
* RelSchemaSyncCache is destroyed when the decoding finishes, but there
|
|
* is no way to unregister the relcache invalidation callback.
|
|
*/
|
|
if (RelationSyncCache == NULL)
|
|
return;
|
|
|
|
/*
|
|
* There is no way to find which entry in our cache the hash belongs to
|
|
* so mark the whole cache as invalid.
|
|
*/
|
|
hash_seq_init(&status, RelationSyncCache);
|
|
while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
|
|
entry->replicate_valid = false;
|
|
}
|