Refactor sending of RowDescription messages in replication protocol
Some routines open-coded the construction of RowDescription messages. Instead, we have support for doing this using tuple descriptors and DestRemoteSimple, so use that instead. Reviewed-by: Nathan Bossart <nathandbossart@gmail.com> Discussion: https://www.postgresql.org/message-id/flat/7e4fdbdc-699c-4cd0-115d-fb78a957fc22@enterprisedb.com
This commit is contained in:
parent
f10a025cfe
commit
2ce648f750
@ -739,6 +739,15 @@ TupleDescInitBuiltinEntry(TupleDesc desc,
|
|||||||
att->attcollation = InvalidOid;
|
att->attcollation = InvalidOid;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
case OIDOID:
|
||||||
|
att->attlen = 4;
|
||||||
|
att->attbyval = true;
|
||||||
|
att->attalign = TYPALIGN_INT;
|
||||||
|
att->attstorage = TYPSTORAGE_PLAIN;
|
||||||
|
att->attcompression = InvalidCompressionMethod;
|
||||||
|
att->attcollation = InvalidOid;
|
||||||
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
elog(ERROR, "unsupported type %u", oidtypeid);
|
elog(ERROR, "unsupported type %u", oidtypeid);
|
||||||
}
|
}
|
||||||
|
@ -25,11 +25,13 @@
|
|||||||
*/
|
*/
|
||||||
#include "postgres.h"
|
#include "postgres.h"
|
||||||
|
|
||||||
|
#include "access/tupdesc.h"
|
||||||
#include "catalog/pg_type_d.h"
|
#include "catalog/pg_type_d.h"
|
||||||
#include "libpq/libpq.h"
|
#include "libpq/libpq.h"
|
||||||
#include "libpq/pqformat.h"
|
#include "libpq/pqformat.h"
|
||||||
#include "replication/basebackup.h"
|
#include "replication/basebackup.h"
|
||||||
#include "replication/basebackup_sink.h"
|
#include "replication/basebackup_sink.h"
|
||||||
|
#include "tcop/dest.h"
|
||||||
#include "utils/timestamp.h"
|
#include "utils/timestamp.h"
|
||||||
|
|
||||||
typedef struct bbsink_copystream
|
typedef struct bbsink_copystream
|
||||||
@ -336,35 +338,24 @@ SendCopyDone(void)
|
|||||||
static void
|
static void
|
||||||
SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
|
SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
|
||||||
{
|
{
|
||||||
|
DestReceiver *dest;
|
||||||
|
TupleDesc tupdesc;
|
||||||
StringInfoData buf;
|
StringInfoData buf;
|
||||||
char str[MAXFNAMELEN];
|
char str[MAXFNAMELEN];
|
||||||
Size len;
|
Size len;
|
||||||
|
|
||||||
pq_beginmessage(&buf, 'T'); /* RowDescription */
|
dest = CreateDestReceiver(DestRemoteSimple);
|
||||||
pq_sendint16(&buf, 2); /* 2 fields */
|
|
||||||
|
|
||||||
/* Field headers */
|
|
||||||
pq_sendstring(&buf, "recptr");
|
|
||||||
pq_sendint32(&buf, 0); /* table oid */
|
|
||||||
pq_sendint16(&buf, 0); /* attnum */
|
|
||||||
pq_sendint32(&buf, TEXTOID); /* type oid */
|
|
||||||
pq_sendint16(&buf, -1);
|
|
||||||
pq_sendint32(&buf, 0);
|
|
||||||
pq_sendint16(&buf, 0);
|
|
||||||
|
|
||||||
pq_sendstring(&buf, "tli");
|
|
||||||
pq_sendint32(&buf, 0); /* table oid */
|
|
||||||
pq_sendint16(&buf, 0); /* attnum */
|
|
||||||
|
|
||||||
|
tupdesc = CreateTemplateTupleDesc(2);
|
||||||
|
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "recptr", TEXTOID, -1, 0);
|
||||||
/*
|
/*
|
||||||
* int8 may seem like a surprising data type for this, but in theory int4
|
* int8 may seem like a surprising data type for this, but in theory int4
|
||||||
* would not be wide enough for this, as TimeLineID is unsigned.
|
* would not be wide enough for this, as TimeLineID is unsigned.
|
||||||
*/
|
*/
|
||||||
pq_sendint32(&buf, INT8OID); /* type oid */
|
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "tli", INT8OID, -1, 0);
|
||||||
pq_sendint16(&buf, 8);
|
|
||||||
pq_sendint32(&buf, 0);
|
/* send RowDescription */
|
||||||
pq_sendint16(&buf, 0);
|
dest->rStartup(dest, CMD_SELECT, tupdesc);
|
||||||
pq_endmessage(&buf);
|
|
||||||
|
|
||||||
/* Data row */
|
/* Data row */
|
||||||
pq_beginmessage(&buf, 'D');
|
pq_beginmessage(&buf, 'D');
|
||||||
@ -391,41 +382,22 @@ SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
|
|||||||
static void
|
static void
|
||||||
SendTablespaceList(List *tablespaces)
|
SendTablespaceList(List *tablespaces)
|
||||||
{
|
{
|
||||||
|
DestReceiver *dest;
|
||||||
|
TupleDesc tupdesc;
|
||||||
StringInfoData buf;
|
StringInfoData buf;
|
||||||
ListCell *lc;
|
ListCell *lc;
|
||||||
|
|
||||||
|
dest = CreateDestReceiver(DestRemoteSimple);
|
||||||
|
|
||||||
|
tupdesc = CreateTemplateTupleDesc(3);
|
||||||
|
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "spcoid", OIDOID, -1, 0);
|
||||||
|
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "spclocation", TEXTOID, -1, 0);
|
||||||
|
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "size", INT8OID, -1, 0);
|
||||||
|
|
||||||
|
/* send RowDescription */
|
||||||
|
dest->rStartup(dest, CMD_SELECT, tupdesc);
|
||||||
|
|
||||||
/* Construct and send the directory information */
|
/* Construct and send the directory information */
|
||||||
pq_beginmessage(&buf, 'T'); /* RowDescription */
|
|
||||||
pq_sendint16(&buf, 3); /* 3 fields */
|
|
||||||
|
|
||||||
/* First field - spcoid */
|
|
||||||
pq_sendstring(&buf, "spcoid");
|
|
||||||
pq_sendint32(&buf, 0); /* table oid */
|
|
||||||
pq_sendint16(&buf, 0); /* attnum */
|
|
||||||
pq_sendint32(&buf, OIDOID); /* type oid */
|
|
||||||
pq_sendint16(&buf, 4); /* typlen */
|
|
||||||
pq_sendint32(&buf, 0); /* typmod */
|
|
||||||
pq_sendint16(&buf, 0); /* format code */
|
|
||||||
|
|
||||||
/* Second field - spclocation */
|
|
||||||
pq_sendstring(&buf, "spclocation");
|
|
||||||
pq_sendint32(&buf, 0);
|
|
||||||
pq_sendint16(&buf, 0);
|
|
||||||
pq_sendint32(&buf, TEXTOID);
|
|
||||||
pq_sendint16(&buf, -1);
|
|
||||||
pq_sendint32(&buf, 0);
|
|
||||||
pq_sendint16(&buf, 0);
|
|
||||||
|
|
||||||
/* Third field - size */
|
|
||||||
pq_sendstring(&buf, "size");
|
|
||||||
pq_sendint32(&buf, 0);
|
|
||||||
pq_sendint16(&buf, 0);
|
|
||||||
pq_sendint32(&buf, INT8OID);
|
|
||||||
pq_sendint16(&buf, 8);
|
|
||||||
pq_sendint32(&buf, 0);
|
|
||||||
pq_sendint16(&buf, 0);
|
|
||||||
pq_endmessage(&buf);
|
|
||||||
|
|
||||||
foreach(lc, tablespaces)
|
foreach(lc, tablespaces)
|
||||||
{
|
{
|
||||||
tablespaceinfo *ti = lfirst(lc);
|
tablespaceinfo *ti = lfirst(lc);
|
||||||
|
@ -579,6 +579,8 @@ ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
|
|||||||
static void
|
static void
|
||||||
SendTimeLineHistory(TimeLineHistoryCmd *cmd)
|
SendTimeLineHistory(TimeLineHistoryCmd *cmd)
|
||||||
{
|
{
|
||||||
|
DestReceiver *dest;
|
||||||
|
TupleDesc tupdesc;
|
||||||
StringInfoData buf;
|
StringInfoData buf;
|
||||||
char histfname[MAXFNAMELEN];
|
char histfname[MAXFNAMELEN];
|
||||||
char path[MAXPGPATH];
|
char path[MAXPGPATH];
|
||||||
@ -587,36 +589,21 @@ SendTimeLineHistory(TimeLineHistoryCmd *cmd)
|
|||||||
off_t bytesleft;
|
off_t bytesleft;
|
||||||
Size len;
|
Size len;
|
||||||
|
|
||||||
|
dest = CreateDestReceiver(DestRemoteSimple);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Reply with a result set with one row, and two columns. The first col is
|
* Reply with a result set with one row, and two columns. The first col is
|
||||||
* the name of the history file, 2nd is the contents.
|
* the name of the history file, 2nd is the contents.
|
||||||
*/
|
*/
|
||||||
|
tupdesc = CreateTemplateTupleDesc(2);
|
||||||
|
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "filename", TEXTOID, -1, 0);
|
||||||
|
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "content", TEXTOID, -1, 0);
|
||||||
|
|
||||||
TLHistoryFileName(histfname, cmd->timeline);
|
TLHistoryFileName(histfname, cmd->timeline);
|
||||||
TLHistoryFilePath(path, cmd->timeline);
|
TLHistoryFilePath(path, cmd->timeline);
|
||||||
|
|
||||||
/* Send a RowDescription message */
|
/* Send a RowDescription message */
|
||||||
pq_beginmessage(&buf, 'T');
|
dest->rStartup(dest, CMD_SELECT, tupdesc);
|
||||||
pq_sendint16(&buf, 2); /* 2 fields */
|
|
||||||
|
|
||||||
/* first field */
|
|
||||||
pq_sendstring(&buf, "filename"); /* col name */
|
|
||||||
pq_sendint32(&buf, 0); /* table oid */
|
|
||||||
pq_sendint16(&buf, 0); /* attnum */
|
|
||||||
pq_sendint32(&buf, TEXTOID); /* type oid */
|
|
||||||
pq_sendint16(&buf, -1); /* typlen */
|
|
||||||
pq_sendint32(&buf, 0); /* typmod */
|
|
||||||
pq_sendint16(&buf, 0); /* format code */
|
|
||||||
|
|
||||||
/* second field */
|
|
||||||
pq_sendstring(&buf, "content"); /* col name */
|
|
||||||
pq_sendint32(&buf, 0); /* table oid */
|
|
||||||
pq_sendint16(&buf, 0); /* attnum */
|
|
||||||
pq_sendint32(&buf, TEXTOID); /* type oid */
|
|
||||||
pq_sendint16(&buf, -1); /* typlen */
|
|
||||||
pq_sendint32(&buf, 0); /* typmod */
|
|
||||||
pq_sendint16(&buf, 0); /* format code */
|
|
||||||
pq_endmessage(&buf);
|
|
||||||
|
|
||||||
/* Send a DataRow message */
|
/* Send a DataRow message */
|
||||||
pq_beginmessage(&buf, 'D');
|
pq_beginmessage(&buf, 'D');
|
||||||
|
Loading…
x
Reference in New Issue
Block a user