Refactor sending of DataRow messages in replication protocol

Some routines open-coded the construction of DataRow messages.  Use
TupOutputState struct and associated functions instead, which was
already done in some places.

SendTimeLineHistory() is a bit more complicated and isn't converted by
this.

Reviewed-by: Nathan Bossart <nathandbossart@gmail.com>
Discussion: https://www.postgresql.org/message-id/flat/7e4fdbdc-699c-4cd0-115d-fb78a957fc22@enterprisedb.com
This commit is contained in:
Peter Eisentraut 2022-07-06 08:28:02 +02:00
parent b55f62abb2
commit 16d52fc89d
2 changed files with 33 additions and 47 deletions

View File

@ -121,6 +121,17 @@ printsimple(TupleTableSlot *slot, DestReceiver *self)
} }
break; break;
case OIDOID:
{
Oid num = ObjectIdGetDatum(value);
char str[10]; /* 10 digits */
int len;
len = pg_ultoa_n(num, str);
pq_sendcountedtext(&buf, str, len, false);
}
break;
default: default:
elog(ERROR, "unsupported type OID: %u", attr->atttypid); elog(ERROR, "unsupported type OID: %u", attr->atttypid);
} }

View File

@ -27,11 +27,13 @@
#include "access/tupdesc.h" #include "access/tupdesc.h"
#include "catalog/pg_type_d.h" #include "catalog/pg_type_d.h"
#include "executor/executor.h"
#include "libpq/libpq.h" #include "libpq/libpq.h"
#include "libpq/pqformat.h" #include "libpq/pqformat.h"
#include "replication/basebackup.h" #include "replication/basebackup.h"
#include "replication/basebackup_sink.h" #include "replication/basebackup_sink.h"
#include "tcop/dest.h" #include "tcop/dest.h"
#include "utils/builtins.h"
#include "utils/timestamp.h" #include "utils/timestamp.h"
typedef struct bbsink_copystream typedef struct bbsink_copystream
@ -86,7 +88,6 @@ static void SendCopyOutResponse(void);
static void SendCopyDone(void); static void SendCopyDone(void);
static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli); static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
static void SendTablespaceList(List *tablespaces); static void SendTablespaceList(List *tablespaces);
static void send_int8_string(StringInfoData *buf, int64 intval);
static const bbsink_ops bbsink_copystream_ops = { static const bbsink_ops bbsink_copystream_ops = {
.begin_backup = bbsink_copystream_begin_backup, .begin_backup = bbsink_copystream_begin_backup,
@ -339,10 +340,10 @@ static void
SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli) SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
{ {
DestReceiver *dest; DestReceiver *dest;
TupOutputState *tstate;
TupleDesc tupdesc; TupleDesc tupdesc;
StringInfoData buf; Datum values[2];
char str[MAXFNAMELEN]; bool nulls[2] = {0};
Size len;
dest = CreateDestReceiver(DestRemoteSimple); dest = CreateDestReceiver(DestRemoteSimple);
@ -355,22 +356,14 @@ SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "tli", INT8OID, -1, 0); TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "tli", INT8OID, -1, 0);
/* send RowDescription */ /* send RowDescription */
dest->rStartup(dest, CMD_SELECT, tupdesc); tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
/* Data row */ /* Data row */
pq_beginmessage(&buf, 'D'); values[0]= CStringGetTextDatum(psprintf("%X/%X", LSN_FORMAT_ARGS(ptr)));
pq_sendint16(&buf, 2); /* number of columns */ values[1] = Int64GetDatum(tli);
do_tup_output(tstate, values, nulls);
len = snprintf(str, sizeof(str), end_tup_output(tstate);
"%X/%X", LSN_FORMAT_ARGS(ptr));
pq_sendint32(&buf, len);
pq_sendbytes(&buf, str, len);
len = snprintf(str, sizeof(str), "%u", tli);
pq_sendint32(&buf, len);
pq_sendbytes(&buf, str, len);
pq_endmessage(&buf);
/* Send a CommandComplete message */ /* Send a CommandComplete message */
pq_puttextmessage('C', "SELECT"); pq_puttextmessage('C', "SELECT");
@ -383,8 +376,8 @@ static void
SendTablespaceList(List *tablespaces) SendTablespaceList(List *tablespaces)
{ {
DestReceiver *dest; DestReceiver *dest;
TupOutputState *tstate;
TupleDesc tupdesc; TupleDesc tupdesc;
StringInfoData buf;
ListCell *lc; ListCell *lc;
dest = CreateDestReceiver(DestRemoteSimple); dest = CreateDestReceiver(DestRemoteSimple);
@ -395,51 +388,33 @@ SendTablespaceList(List *tablespaces)
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "size", INT8OID, -1, 0); TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "size", INT8OID, -1, 0);
/* send RowDescription */ /* send RowDescription */
dest->rStartup(dest, CMD_SELECT, tupdesc); tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
/* Construct and send the directory information */ /* Construct and send the directory information */
foreach(lc, tablespaces) foreach(lc, tablespaces)
{ {
tablespaceinfo *ti = lfirst(lc); tablespaceinfo *ti = lfirst(lc);
Datum values[3];
bool nulls[3] = {0};
/* Send one datarow message */ /* Send one datarow message */
pq_beginmessage(&buf, 'D');
pq_sendint16(&buf, 3); /* number of columns */
if (ti->path == NULL) if (ti->path == NULL)
{ {
pq_sendint32(&buf, -1); /* Length = -1 ==> NULL */ nulls[0] = true;
pq_sendint32(&buf, -1); nulls[1] = true;
} }
else else
{ {
Size len; values[0] = ObjectIdGetDatum(strtoul(ti->oid, NULL, 10));
values[1] = CStringGetTextDatum(ti->path);
len = strlen(ti->oid);
pq_sendint32(&buf, len);
pq_sendbytes(&buf, ti->oid, len);
len = strlen(ti->path);
pq_sendint32(&buf, len);
pq_sendbytes(&buf, ti->path, len);
} }
if (ti->size >= 0) if (ti->size >= 0)
send_int8_string(&buf, ti->size / 1024); values[2] = Int64GetDatum(ti->size / 1024);
else else
pq_sendint32(&buf, -1); /* NULL */ nulls[2] = true;
pq_endmessage(&buf); do_tup_output(tstate, values, nulls);
} }
}
/* end_tup_output(tstate);
* Send a 64-bit integer as a string via the wire protocol.
*/
static void
send_int8_string(StringInfoData *buf, int64 intval)
{
char is[32];
sprintf(is, INT64_FORMAT, intval);
pq_sendint32(buf, strlen(is));
pq_sendbytes(buf, is, strlen(is));
} }