diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c index f3c7937a1d..9fe4ac8126 100644 --- a/src/bin/pg_basebackup/pg_recvlogical.c +++ b/src/bin/pg_basebackup/pg_recvlogical.c @@ -32,6 +32,14 @@ /* Time to sleep between reconnection attempts */ #define RECONNECT_SLEEP_TIME 5 +typedef enum +{ + STREAM_STOP_NONE, + STREAM_STOP_END_OF_WAL, + STREAM_STOP_KEEPALIVE, + STREAM_STOP_SIGNAL +} StreamStopReason; + /* Global Options */ static char *outfile = NULL; static int verbose = 0; @@ -55,6 +63,7 @@ static const char *plugin = "test_decoding"; /* Global State */ static int outfd = -1; static volatile sig_atomic_t time_to_abort = false; +static volatile sig_atomic_t stop_reason = STREAM_STOP_NONE; static volatile sig_atomic_t output_reopen = false; static bool output_isfile; static TimestampTz output_last_fsync = -1; @@ -66,7 +75,8 @@ static void usage(void); static void StreamLogicalLog(void); static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now); static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos, - bool keepalive, XLogRecPtr lsn); + StreamStopReason reason, + XLogRecPtr lsn); static void usage(void) @@ -207,9 +217,11 @@ StreamLogicalLog(void) TimestampTz last_status = -1; int i; PQExpBuffer query; + XLogRecPtr cur_record_lsn; output_written_lsn = InvalidXLogRecPtr; output_fsync_lsn = InvalidXLogRecPtr; + cur_record_lsn = InvalidXLogRecPtr; /* * Connect in replication mode to the server @@ -275,7 +287,8 @@ StreamLogicalLog(void) int bytes_written; TimestampTz now; int hdr_len; - XLogRecPtr cur_record_lsn = InvalidXLogRecPtr; + + cur_record_lsn = InvalidXLogRecPtr; if (copybuf != NULL) { @@ -487,7 +500,7 @@ StreamLogicalLog(void) if (endposReached) { - prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr); + stop_reason = STREAM_STOP_KEEPALIVE; time_to_abort = true; break; } @@ -527,7 +540,7 @@ StreamLogicalLog(void) */ if (!flushAndSendFeedback(conn, &now)) goto error; - prepareToTerminate(conn, endpos, false, cur_record_lsn); + stop_reason = STREAM_STOP_END_OF_WAL; time_to_abort = true; break; } @@ -572,12 +585,16 @@ StreamLogicalLog(void) /* endpos was exactly the record we just processed, we're done */ if (!flushAndSendFeedback(conn, &now)) goto error; - prepareToTerminate(conn, endpos, false, cur_record_lsn); + stop_reason = STREAM_STOP_END_OF_WAL; time_to_abort = true; break; } } + /* Clean up connection state if stream has been aborted */ + if (time_to_abort) + prepareToTerminate(conn, endpos, stop_reason, cur_record_lsn); + res = PQgetResult(conn); if (PQresultStatus(res) == PGRES_COPY_OUT) { @@ -656,6 +673,7 @@ error: static void sigexit_handler(SIGNAL_ARGS) { + stop_reason = STREAM_STOP_SIGNAL; time_to_abort = true; } @@ -1021,18 +1039,31 @@ flushAndSendFeedback(PGconn *conn, TimestampTz *now) * retry on failure. */ static void -prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn) +prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason, + XLogRecPtr lsn) { (void) PQputCopyEnd(conn, NULL); (void) PQflush(conn); if (verbose) { - if (keepalive) - pg_log_info("end position %X/%X reached by keepalive", - LSN_FORMAT_ARGS(endpos)); - else - pg_log_info("end position %X/%X reached by WAL record at %X/%X", - LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn)); + switch (reason) + { + case STREAM_STOP_SIGNAL: + pg_log_info("received interrupt signal, exiting"); + break; + case STREAM_STOP_KEEPALIVE: + pg_log_info("end position %X/%X reached by keepalive", + LSN_FORMAT_ARGS(endpos)); + break; + case STREAM_STOP_END_OF_WAL: + Assert(!XLogRecPtrIsInvalid(lsn)); + pg_log_info("end position %X/%X reached by WAL record at %X/%X", + LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn)); + break; + case STREAM_STOP_NONE: + Assert(false); + break; + } } } diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index e941fb6c82..a1cf01e38e 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2639,6 +2639,7 @@ Step StopList StrategyNumber StreamCtl +StreamStopReason String StringInfo StringInfoData