Rename bbstreamer to astreamer.

I (rhaas) intended "bbstreamer" to stand for "base backup streamer,"
but that implies that this infrastructure can only ever be used by
pg_basebackup.  In fact, it is a generally useful way of streaming
data from a tar or compressed tar file, and it could be extended to
work with other archive formats as well if we ever wanted to do that.
Hence, rename it to "astreamer" (archive streamer) in preparation for
reusing the infrastructure from pg_verifybackup (and perhaps
eventually also other utilities, such as pg_combinebackup or
pg_waldump).

This is purely a renaming commit. Comment adjustments and relocation
of the actual code to someplace from which it can be reused are left
to future commits.

Amul Sul, reviewed by Sravan Kumar and by me.

Discussion: http://postgr.es/m/CAAJ_b94StvLWrc_p4q-f7n3OPfr6GhL8_XuAg2aAaYZp1tF-nw@mail.gmail.com
This commit is contained in:
Robert Haas 2024-08-05 09:35:42 -04:00
parent 66e94448ab
commit 3c90569811
13 changed files with 860 additions and 860 deletions

View File

@ -37,12 +37,12 @@ OBJS = \
BBOBJS = \
pg_basebackup.o \
bbstreamer_file.o \
bbstreamer_gzip.o \
bbstreamer_inject.o \
bbstreamer_lz4.o \
bbstreamer_tar.o \
bbstreamer_zstd.o
astreamer_file.o \
astreamer_gzip.o \
astreamer_inject.o \
astreamer_lz4.o \
astreamer_tar.o \
astreamer_zstd.o
all: pg_basebackup pg_createsubscriber pg_receivewal pg_recvlogical

View File

@ -0,0 +1,226 @@
/*-------------------------------------------------------------------------
*
* astreamer.h
*
* Each tar archive returned by the server is passed to one or more
* astreamer objects for further processing. The astreamer may do
* something simple, like write the archive to a file, perhaps after
* compressing it, but it can also do more complicated things, like
* annotating the byte stream to indicate which parts of the data
* correspond to tar headers or trailing padding, vs. which parts are
* payload data. A subsequent astreamer may use this information to
* make further decisions about how to process the data; for example,
* it might choose to modify the archive contents.
*
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/bin/pg_basebackup/astreamer.h
*-------------------------------------------------------------------------
*/
#ifndef ASTREAMER_H
#define ASTREAMER_H
#include "common/compression.h"
#include "lib/stringinfo.h"
#include "pqexpbuffer.h"
struct astreamer;
struct astreamer_ops;
typedef struct astreamer astreamer;
typedef struct astreamer_ops astreamer_ops;
/*
* Each chunk of archive data passed to a astreamer is classified into one
* of these categories. When data is first received from the remote server,
* each chunk will be categorized as ASTREAMER_UNKNOWN, and the chunks will
* be of whatever size the remote server chose to send.
*
* If the archive is parsed (e.g. see astreamer_tar_parser_new()), then all
* chunks should be labelled as one of the other types listed here. In
* addition, there should be exactly one ASTREAMER_MEMBER_HEADER chunk and
* exactly one ASTREAMER_MEMBER_TRAILER chunk per archive member, even if
* that means a zero-length call. There can be any number of
* ASTREAMER_MEMBER_CONTENTS chunks in between those calls. There
* should exactly ASTREAMER_ARCHIVE_TRAILER chunk, and it should follow the
* last ASTREAMER_MEMBER_TRAILER chunk.
*
* In theory, we could need other classifications here, such as a way of
* indicating an archive header, but the "tar" format doesn't need anything
* else, so for the time being there's no point.
*/
typedef enum
{
ASTREAMER_UNKNOWN,
ASTREAMER_MEMBER_HEADER,
ASTREAMER_MEMBER_CONTENTS,
ASTREAMER_MEMBER_TRAILER,
ASTREAMER_ARCHIVE_TRAILER,
} astreamer_archive_context;
/*
* Each chunk of data that is classified as ASTREAMER_MEMBER_HEADER,
* ASTREAMER_MEMBER_CONTENTS, or ASTREAMER_MEMBER_TRAILER should also
* pass a pointer to an instance of this struct. The details are expected
* to be present in the archive header and used to fill the struct, after
* which all subsequent calls for the same archive member are expected to
* pass the same details.
*/
typedef struct
{
char pathname[MAXPGPATH];
pgoff_t size;
mode_t mode;
uid_t uid;
gid_t gid;
bool is_directory;
bool is_link;
char linktarget[MAXPGPATH];
} astreamer_member;
/*
* Generally, each type of astreamer will define its own struct, but the
* first element should be 'astreamer base'. A astreamer that does not
* require any additional private data could use this structure directly.
*
* bbs_ops is a pointer to the astreamer_ops object which contains the
* function pointers appropriate to this type of astreamer.
*
* bbs_next is a pointer to the successor astreamer, for those types of
* astreamer which forward data to a successor. It need not be used and
* should be set to NULL when not relevant.
*
* bbs_buffer is a buffer for accumulating data for temporary storage. Each
* type of astreamer makes its own decisions about whether and how to use
* this buffer.
*/
struct astreamer
{
const astreamer_ops *bbs_ops;
astreamer *bbs_next;
StringInfoData bbs_buffer;
};
/*
* There are three callbacks for a astreamer. The 'content' callback is
* called repeatedly, as described in the astreamer_archive_context comments.
* Then, the 'finalize' callback is called once at the end, to give the
* astreamer a chance to perform cleanup such as closing files. Finally,
* because this code is running in a frontend environment where, as of this
* writing, there are no memory contexts, the 'free' callback is called to
* release memory. These callbacks should always be invoked using the static
* inline functions defined below.
*/
struct astreamer_ops
{
void (*content) (astreamer *streamer, astreamer_member *member,
const char *data, int len,
astreamer_archive_context context);
void (*finalize) (astreamer *streamer);
void (*free) (astreamer *streamer);
};
/* Send some content to a astreamer. */
static inline void
astreamer_content(astreamer *streamer, astreamer_member *member,
const char *data, int len,
astreamer_archive_context context)
{
Assert(streamer != NULL);
streamer->bbs_ops->content(streamer, member, data, len, context);
}
/* Finalize a astreamer. */
static inline void
astreamer_finalize(astreamer *streamer)
{
Assert(streamer != NULL);
streamer->bbs_ops->finalize(streamer);
}
/* Free a astreamer. */
static inline void
astreamer_free(astreamer *streamer)
{
Assert(streamer != NULL);
streamer->bbs_ops->free(streamer);
}
/*
* This is a convenience method for use when implementing a astreamer; it is
* not for use by outside callers. It adds the amount of data specified by
* 'nbytes' to the astreamer's buffer and adjusts '*len' and '*data'
* accordingly.
*/
static inline void
astreamer_buffer_bytes(astreamer *streamer, const char **data, int *len,
int nbytes)
{
Assert(nbytes <= *len);
appendBinaryStringInfo(&streamer->bbs_buffer, *data, nbytes);
*len -= nbytes;
*data += nbytes;
}
/*
* This is a convenience method for use when implementing a astreamer; it is
* not for use by outsider callers. It attempts to add enough data to the
* astreamer's buffer to reach a length of target_bytes and adjusts '*len'
* and '*data' accordingly. It returns true if the target length has been
* reached and false otherwise.
*/
static inline bool
astreamer_buffer_until(astreamer *streamer, const char **data, int *len,
int target_bytes)
{
int buflen = streamer->bbs_buffer.len;
if (buflen >= target_bytes)
{
/* Target length already reached; nothing to do. */
return true;
}
if (buflen + *len < target_bytes)
{
/* Not enough data to reach target length; buffer all of it. */
astreamer_buffer_bytes(streamer, data, len, *len);
return false;
}
/* Buffer just enough to reach the target length. */
astreamer_buffer_bytes(streamer, data, len, target_bytes - buflen);
return true;
}
/*
* Functions for creating astreamer objects of various types. See the header
* comments for each of these functions for details.
*/
extern astreamer *astreamer_plain_writer_new(char *pathname, FILE *file);
extern astreamer *astreamer_gzip_writer_new(char *pathname, FILE *file,
pg_compress_specification *compress);
extern astreamer *astreamer_extractor_new(const char *basepath,
const char *(*link_map) (const char *),
void (*report_output_file) (const char *));
extern astreamer *astreamer_gzip_decompressor_new(astreamer *next);
extern astreamer *astreamer_lz4_compressor_new(astreamer *next,
pg_compress_specification *compress);
extern astreamer *astreamer_lz4_decompressor_new(astreamer *next);
extern astreamer *astreamer_zstd_compressor_new(astreamer *next,
pg_compress_specification *compress);
extern astreamer *astreamer_zstd_decompressor_new(astreamer *next);
extern astreamer *astreamer_tar_parser_new(astreamer *next);
extern astreamer *astreamer_tar_terminator_new(astreamer *next);
extern astreamer *astreamer_tar_archiver_new(astreamer *next);
extern astreamer *astreamer_recovery_injector_new(astreamer *next,
bool is_recovery_guc_supported,
PQExpBuffer recoveryconfcontents);
extern void astreamer_inject_file(astreamer *streamer, char *pathname,
char *data, int len);
#endif

View File

@ -1,11 +1,11 @@
/*-------------------------------------------------------------------------
*
* bbstreamer_file.c
* astreamer_file.c
*
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/bin/pg_basebackup/bbstreamer_file.c
* src/bin/pg_basebackup/astreamer_file.c
*-------------------------------------------------------------------------
*/
@ -13,60 +13,60 @@
#include <unistd.h>
#include "bbstreamer.h"
#include "astreamer.h"
#include "common/file_perm.h"
#include "common/logging.h"
#include "common/string.h"
typedef struct bbstreamer_plain_writer
typedef struct astreamer_plain_writer
{
bbstreamer base;
astreamer base;
char *pathname;
FILE *file;
bool should_close_file;
} bbstreamer_plain_writer;
} astreamer_plain_writer;
typedef struct bbstreamer_extractor
typedef struct astreamer_extractor
{
bbstreamer base;
astreamer base;
char *basepath;
const char *(*link_map) (const char *);
void (*report_output_file) (const char *);
char filename[MAXPGPATH];
FILE *file;
} bbstreamer_extractor;
} astreamer_extractor;
static void bbstreamer_plain_writer_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context);
static void bbstreamer_plain_writer_finalize(bbstreamer *streamer);
static void bbstreamer_plain_writer_free(bbstreamer *streamer);
static void astreamer_plain_writer_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context);
static void astreamer_plain_writer_finalize(astreamer *streamer);
static void astreamer_plain_writer_free(astreamer *streamer);
static const bbstreamer_ops bbstreamer_plain_writer_ops = {
.content = bbstreamer_plain_writer_content,
.finalize = bbstreamer_plain_writer_finalize,
.free = bbstreamer_plain_writer_free
static const astreamer_ops astreamer_plain_writer_ops = {
.content = astreamer_plain_writer_content,
.finalize = astreamer_plain_writer_finalize,
.free = astreamer_plain_writer_free
};
static void bbstreamer_extractor_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context);
static void bbstreamer_extractor_finalize(bbstreamer *streamer);
static void bbstreamer_extractor_free(bbstreamer *streamer);
static void astreamer_extractor_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context);
static void astreamer_extractor_finalize(astreamer *streamer);
static void astreamer_extractor_free(astreamer *streamer);
static void extract_directory(const char *filename, mode_t mode);
static void extract_link(const char *filename, const char *linktarget);
static FILE *create_file_for_extract(const char *filename, mode_t mode);
static const bbstreamer_ops bbstreamer_extractor_ops = {
.content = bbstreamer_extractor_content,
.finalize = bbstreamer_extractor_finalize,
.free = bbstreamer_extractor_free
static const astreamer_ops astreamer_extractor_ops = {
.content = astreamer_extractor_content,
.finalize = astreamer_extractor_finalize,
.free = astreamer_extractor_free
};
/*
* Create a bbstreamer that just writes data to a file.
* Create a astreamer that just writes data to a file.
*
* The caller must specify a pathname and may specify a file. The pathname is
* used for error-reporting purposes either way. If file is NULL, the pathname
@ -74,14 +74,14 @@ static const bbstreamer_ops bbstreamer_extractor_ops = {
* for writing and closed when done. If file is not NULL, the data is written
* there.
*/
bbstreamer *
bbstreamer_plain_writer_new(char *pathname, FILE *file)
astreamer *
astreamer_plain_writer_new(char *pathname, FILE *file)
{
bbstreamer_plain_writer *streamer;
astreamer_plain_writer *streamer;
streamer = palloc0(sizeof(bbstreamer_plain_writer));
*((const bbstreamer_ops **) &streamer->base.bbs_ops) =
&bbstreamer_plain_writer_ops;
streamer = palloc0(sizeof(astreamer_plain_writer));
*((const astreamer_ops **) &streamer->base.bbs_ops) =
&astreamer_plain_writer_ops;
streamer->pathname = pstrdup(pathname);
streamer->file = file;
@ -101,13 +101,13 @@ bbstreamer_plain_writer_new(char *pathname, FILE *file)
* Write archive content to file.
*/
static void
bbstreamer_plain_writer_content(bbstreamer *streamer,
bbstreamer_member *member, const char *data,
int len, bbstreamer_archive_context context)
astreamer_plain_writer_content(astreamer *streamer,
astreamer_member *member, const char *data,
int len, astreamer_archive_context context)
{
bbstreamer_plain_writer *mystreamer;
astreamer_plain_writer *mystreamer;
mystreamer = (bbstreamer_plain_writer *) streamer;
mystreamer = (astreamer_plain_writer *) streamer;
if (len == 0)
return;
@ -128,11 +128,11 @@ bbstreamer_plain_writer_content(bbstreamer *streamer,
* the file if we opened it, but not if the caller provided it.
*/
static void
bbstreamer_plain_writer_finalize(bbstreamer *streamer)
astreamer_plain_writer_finalize(astreamer *streamer)
{
bbstreamer_plain_writer *mystreamer;
astreamer_plain_writer *mystreamer;
mystreamer = (bbstreamer_plain_writer *) streamer;
mystreamer = (astreamer_plain_writer *) streamer;
if (mystreamer->should_close_file && fclose(mystreamer->file) != 0)
pg_fatal("could not close file \"%s\": %m",
@ -143,14 +143,14 @@ bbstreamer_plain_writer_finalize(bbstreamer *streamer)
}
/*
* Free memory associated with this bbstreamer.
* Free memory associated with this astreamer.
*/
static void
bbstreamer_plain_writer_free(bbstreamer *streamer)
astreamer_plain_writer_free(astreamer *streamer)
{
bbstreamer_plain_writer *mystreamer;
astreamer_plain_writer *mystreamer;
mystreamer = (bbstreamer_plain_writer *) streamer;
mystreamer = (astreamer_plain_writer *) streamer;
Assert(!mystreamer->should_close_file);
Assert(mystreamer->base.bbs_next == NULL);
@ -160,13 +160,13 @@ bbstreamer_plain_writer_free(bbstreamer *streamer)
}
/*
* Create a bbstreamer that extracts an archive.
* Create a astreamer that extracts an archive.
*
* All pathnames in the archive are interpreted relative to basepath.
*
* Unlike e.g. bbstreamer_plain_writer_new() we can't do anything useful here
* Unlike e.g. astreamer_plain_writer_new() we can't do anything useful here
* with untyped chunks; we need typed chunks which follow the rules described
* in bbstreamer.h. Assuming we have that, we don't need to worry about the
* in astreamer.h. Assuming we have that, we don't need to worry about the
* original archive format; it's enough to just look at the member information
* provided and write to the corresponding file.
*
@ -179,16 +179,16 @@ bbstreamer_plain_writer_free(bbstreamer *streamer)
* new output file. The pathname to that file is passed as an argument. If
* NULL, the call is skipped.
*/
bbstreamer *
bbstreamer_extractor_new(const char *basepath,
const char *(*link_map) (const char *),
void (*report_output_file) (const char *))
astreamer *
astreamer_extractor_new(const char *basepath,
const char *(*link_map) (const char *),
void (*report_output_file) (const char *))
{
bbstreamer_extractor *streamer;
astreamer_extractor *streamer;
streamer = palloc0(sizeof(bbstreamer_extractor));
*((const bbstreamer_ops **) &streamer->base.bbs_ops) =
&bbstreamer_extractor_ops;
streamer = palloc0(sizeof(astreamer_extractor));
*((const astreamer_ops **) &streamer->base.bbs_ops) =
&astreamer_extractor_ops;
streamer->basepath = pstrdup(basepath);
streamer->link_map = link_map;
streamer->report_output_file = report_output_file;
@ -200,19 +200,19 @@ bbstreamer_extractor_new(const char *basepath,
* Extract archive contents to the filesystem.
*/
static void
bbstreamer_extractor_content(bbstreamer *streamer, bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context)
astreamer_extractor_content(astreamer *streamer, astreamer_member *member,
const char *data, int len,
astreamer_archive_context context)
{
bbstreamer_extractor *mystreamer = (bbstreamer_extractor *) streamer;
astreamer_extractor *mystreamer = (astreamer_extractor *) streamer;
int fnamelen;
Assert(member != NULL || context == BBSTREAMER_ARCHIVE_TRAILER);
Assert(context != BBSTREAMER_UNKNOWN);
Assert(member != NULL || context == ASTREAMER_ARCHIVE_TRAILER);
Assert(context != ASTREAMER_UNKNOWN);
switch (context)
{
case BBSTREAMER_MEMBER_HEADER:
case ASTREAMER_MEMBER_HEADER:
Assert(mystreamer->file == NULL);
/* Prepend basepath. */
@ -245,7 +245,7 @@ bbstreamer_extractor_content(bbstreamer *streamer, bbstreamer_member *member,
mystreamer->report_output_file(mystreamer->filename);
break;
case BBSTREAMER_MEMBER_CONTENTS:
case ASTREAMER_MEMBER_CONTENTS:
if (mystreamer->file == NULL)
break;
@ -260,14 +260,14 @@ bbstreamer_extractor_content(bbstreamer *streamer, bbstreamer_member *member,
}
break;
case BBSTREAMER_MEMBER_TRAILER:
case ASTREAMER_MEMBER_TRAILER:
if (mystreamer->file == NULL)
break;
fclose(mystreamer->file);
mystreamer->file = NULL;
break;
case BBSTREAMER_ARCHIVE_TRAILER:
case ASTREAMER_ARCHIVE_TRAILER:
break;
default:
@ -375,10 +375,10 @@ create_file_for_extract(const char *filename, mode_t mode)
* There's nothing to do here but sanity checking.
*/
static void
bbstreamer_extractor_finalize(bbstreamer *streamer)
astreamer_extractor_finalize(astreamer *streamer)
{
bbstreamer_extractor *mystreamer PG_USED_FOR_ASSERTS_ONLY
= (bbstreamer_extractor *) streamer;
astreamer_extractor *mystreamer PG_USED_FOR_ASSERTS_ONLY
= (astreamer_extractor *) streamer;
Assert(mystreamer->file == NULL);
}
@ -387,9 +387,9 @@ bbstreamer_extractor_finalize(bbstreamer *streamer)
* Free memory.
*/
static void
bbstreamer_extractor_free(bbstreamer *streamer)
astreamer_extractor_free(astreamer *streamer)
{
bbstreamer_extractor *mystreamer = (bbstreamer_extractor *) streamer;
astreamer_extractor *mystreamer = (astreamer_extractor *) streamer;
pfree(mystreamer->basepath);
pfree(mystreamer);

View File

@ -1,11 +1,11 @@
/*-------------------------------------------------------------------------
*
* bbstreamer_gzip.c
* astreamer_gzip.c
*
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/bin/pg_basebackup/bbstreamer_gzip.c
* src/bin/pg_basebackup/astreamer_gzip.c
*-------------------------------------------------------------------------
*/
@ -17,74 +17,74 @@
#include <zlib.h>
#endif
#include "bbstreamer.h"
#include "astreamer.h"
#include "common/file_perm.h"
#include "common/logging.h"
#include "common/string.h"
#ifdef HAVE_LIBZ
typedef struct bbstreamer_gzip_writer
typedef struct astreamer_gzip_writer
{
bbstreamer base;
astreamer base;
char *pathname;
gzFile gzfile;
} bbstreamer_gzip_writer;
} astreamer_gzip_writer;
typedef struct bbstreamer_gzip_decompressor
typedef struct astreamer_gzip_decompressor
{
bbstreamer base;
astreamer base;
z_stream zstream;
size_t bytes_written;
} bbstreamer_gzip_decompressor;
} astreamer_gzip_decompressor;
static void bbstreamer_gzip_writer_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context);
static void bbstreamer_gzip_writer_finalize(bbstreamer *streamer);
static void bbstreamer_gzip_writer_free(bbstreamer *streamer);
static void astreamer_gzip_writer_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context);
static void astreamer_gzip_writer_finalize(astreamer *streamer);
static void astreamer_gzip_writer_free(astreamer *streamer);
static const char *get_gz_error(gzFile gzf);
static const bbstreamer_ops bbstreamer_gzip_writer_ops = {
.content = bbstreamer_gzip_writer_content,
.finalize = bbstreamer_gzip_writer_finalize,
.free = bbstreamer_gzip_writer_free
static const astreamer_ops astreamer_gzip_writer_ops = {
.content = astreamer_gzip_writer_content,
.finalize = astreamer_gzip_writer_finalize,
.free = astreamer_gzip_writer_free
};
static void bbstreamer_gzip_decompressor_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context);
static void bbstreamer_gzip_decompressor_finalize(bbstreamer *streamer);
static void bbstreamer_gzip_decompressor_free(bbstreamer *streamer);
static void astreamer_gzip_decompressor_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context);
static void astreamer_gzip_decompressor_finalize(astreamer *streamer);
static void astreamer_gzip_decompressor_free(astreamer *streamer);
static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
static void gzip_pfree(void *opaque, void *address);
static const bbstreamer_ops bbstreamer_gzip_decompressor_ops = {
.content = bbstreamer_gzip_decompressor_content,
.finalize = bbstreamer_gzip_decompressor_finalize,
.free = bbstreamer_gzip_decompressor_free
static const astreamer_ops astreamer_gzip_decompressor_ops = {
.content = astreamer_gzip_decompressor_content,
.finalize = astreamer_gzip_decompressor_finalize,
.free = astreamer_gzip_decompressor_free
};
#endif
/*
* Create a bbstreamer that just compresses data using gzip, and then writes
* Create a astreamer that just compresses data using gzip, and then writes
* it to a file.
*
* As in the case of bbstreamer_plain_writer_new, pathname is always used
* As in the case of astreamer_plain_writer_new, pathname is always used
* for error reporting purposes; if file is NULL, it is also the opened and
* closed so that the data may be written there.
*/
bbstreamer *
bbstreamer_gzip_writer_new(char *pathname, FILE *file,
pg_compress_specification *compress)
astreamer *
astreamer_gzip_writer_new(char *pathname, FILE *file,
pg_compress_specification *compress)
{
#ifdef HAVE_LIBZ
bbstreamer_gzip_writer *streamer;
astreamer_gzip_writer *streamer;
streamer = palloc0(sizeof(bbstreamer_gzip_writer));
*((const bbstreamer_ops **) &streamer->base.bbs_ops) =
&bbstreamer_gzip_writer_ops;
streamer = palloc0(sizeof(astreamer_gzip_writer));
*((const astreamer_ops **) &streamer->base.bbs_ops) =
&astreamer_gzip_writer_ops;
streamer->pathname = pstrdup(pathname);
@ -123,13 +123,13 @@ bbstreamer_gzip_writer_new(char *pathname, FILE *file,
* Write archive content to gzip file.
*/
static void
bbstreamer_gzip_writer_content(bbstreamer *streamer,
bbstreamer_member *member, const char *data,
int len, bbstreamer_archive_context context)
astreamer_gzip_writer_content(astreamer *streamer,
astreamer_member *member, const char *data,
int len, astreamer_archive_context context)
{
bbstreamer_gzip_writer *mystreamer;
astreamer_gzip_writer *mystreamer;
mystreamer = (bbstreamer_gzip_writer *) streamer;
mystreamer = (astreamer_gzip_writer *) streamer;
if (len == 0)
return;
@ -151,16 +151,16 @@ bbstreamer_gzip_writer_content(bbstreamer *streamer,
*
* It makes no difference whether we opened the file or the caller did it,
* because libz provides no way of avoiding a close on the underlying file
* handle. Notice, however, that bbstreamer_gzip_writer_new() uses dup() to
* handle. Notice, however, that astreamer_gzip_writer_new() uses dup() to
* work around this issue, so that the behavior from the caller's viewpoint
* is the same as for bbstreamer_plain_writer.
* is the same as for astreamer_plain_writer.
*/
static void
bbstreamer_gzip_writer_finalize(bbstreamer *streamer)
astreamer_gzip_writer_finalize(astreamer *streamer)
{
bbstreamer_gzip_writer *mystreamer;
astreamer_gzip_writer *mystreamer;
mystreamer = (bbstreamer_gzip_writer *) streamer;
mystreamer = (astreamer_gzip_writer *) streamer;
errno = 0; /* in case gzclose() doesn't set it */
if (gzclose(mystreamer->gzfile) != 0)
@ -171,14 +171,14 @@ bbstreamer_gzip_writer_finalize(bbstreamer *streamer)
}
/*
* Free memory associated with this bbstreamer.
* Free memory associated with this astreamer.
*/
static void
bbstreamer_gzip_writer_free(bbstreamer *streamer)
astreamer_gzip_writer_free(astreamer *streamer)
{
bbstreamer_gzip_writer *mystreamer;
astreamer_gzip_writer *mystreamer;
mystreamer = (bbstreamer_gzip_writer *) streamer;
mystreamer = (astreamer_gzip_writer *) streamer;
Assert(mystreamer->base.bbs_next == NULL);
Assert(mystreamer->gzfile == NULL);
@ -208,18 +208,18 @@ get_gz_error(gzFile gzf)
* Create a new base backup streamer that performs decompression of gzip
* compressed blocks.
*/
bbstreamer *
bbstreamer_gzip_decompressor_new(bbstreamer *next)
astreamer *
astreamer_gzip_decompressor_new(astreamer *next)
{
#ifdef HAVE_LIBZ
bbstreamer_gzip_decompressor *streamer;
astreamer_gzip_decompressor *streamer;
z_stream *zs;
Assert(next != NULL);
streamer = palloc0(sizeof(bbstreamer_gzip_decompressor));
*((const bbstreamer_ops **) &streamer->base.bbs_ops) =
&bbstreamer_gzip_decompressor_ops;
streamer = palloc0(sizeof(astreamer_gzip_decompressor));
*((const astreamer_ops **) &streamer->base.bbs_ops) =
&astreamer_gzip_decompressor_ops;
streamer->base.bbs_next = next;
initStringInfo(&streamer->base.bbs_buffer);
@ -258,15 +258,15 @@ bbstreamer_gzip_decompressor_new(bbstreamer *next)
* to the next streamer.
*/
static void
bbstreamer_gzip_decompressor_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context)
astreamer_gzip_decompressor_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context)
{
bbstreamer_gzip_decompressor *mystreamer;
astreamer_gzip_decompressor *mystreamer;
z_stream *zs;
mystreamer = (bbstreamer_gzip_decompressor *) streamer;
mystreamer = (astreamer_gzip_decompressor *) streamer;
zs = &mystreamer->zstream;
zs->next_in = (const uint8 *) data;
@ -301,9 +301,9 @@ bbstreamer_gzip_decompressor_content(bbstreamer *streamer,
/* If output buffer is full then pass data to next streamer */
if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
{
bbstreamer_content(mystreamer->base.bbs_next, member,
mystreamer->base.bbs_buffer.data,
mystreamer->base.bbs_buffer.maxlen, context);
astreamer_content(mystreamer->base.bbs_next, member,
mystreamer->base.bbs_buffer.data,
mystreamer->base.bbs_buffer.maxlen, context);
mystreamer->bytes_written = 0;
}
}
@ -313,31 +313,31 @@ bbstreamer_gzip_decompressor_content(bbstreamer *streamer,
* End-of-stream processing.
*/
static void
bbstreamer_gzip_decompressor_finalize(bbstreamer *streamer)
astreamer_gzip_decompressor_finalize(astreamer *streamer)
{
bbstreamer_gzip_decompressor *mystreamer;
astreamer_gzip_decompressor *mystreamer;
mystreamer = (bbstreamer_gzip_decompressor *) streamer;
mystreamer = (astreamer_gzip_decompressor *) streamer;
/*
* End of the stream, if there is some pending data in output buffers then
* we must forward it to next streamer.
*/
bbstreamer_content(mystreamer->base.bbs_next, NULL,
mystreamer->base.bbs_buffer.data,
mystreamer->base.bbs_buffer.maxlen,
BBSTREAMER_UNKNOWN);
astreamer_content(mystreamer->base.bbs_next, NULL,
mystreamer->base.bbs_buffer.data,
mystreamer->base.bbs_buffer.maxlen,
ASTREAMER_UNKNOWN);
bbstreamer_finalize(mystreamer->base.bbs_next);
astreamer_finalize(mystreamer->base.bbs_next);
}
/*
* Free memory.
*/
static void
bbstreamer_gzip_decompressor_free(bbstreamer *streamer)
astreamer_gzip_decompressor_free(astreamer *streamer)
{
bbstreamer_free(streamer->bbs_next);
astreamer_free(streamer->bbs_next);
pfree(streamer->bbs_buffer.data);
pfree(streamer);
}

View File

@ -1,51 +1,51 @@
/*-------------------------------------------------------------------------
*
* bbstreamer_inject.c
* astreamer_inject.c
*
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/bin/pg_basebackup/bbstreamer_inject.c
* src/bin/pg_basebackup/astreamer_inject.c
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include "bbstreamer.h"
#include "astreamer.h"
#include "common/file_perm.h"
#include "common/logging.h"
typedef struct bbstreamer_recovery_injector
typedef struct astreamer_recovery_injector
{
bbstreamer base;
astreamer base;
bool skip_file;
bool is_recovery_guc_supported;
bool is_postgresql_auto_conf;
bool found_postgresql_auto_conf;
PQExpBuffer recoveryconfcontents;
bbstreamer_member member;
} bbstreamer_recovery_injector;
astreamer_member member;
} astreamer_recovery_injector;
static void bbstreamer_recovery_injector_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context);
static void bbstreamer_recovery_injector_finalize(bbstreamer *streamer);
static void bbstreamer_recovery_injector_free(bbstreamer *streamer);
static void astreamer_recovery_injector_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context);
static void astreamer_recovery_injector_finalize(astreamer *streamer);
static void astreamer_recovery_injector_free(astreamer *streamer);
static const bbstreamer_ops bbstreamer_recovery_injector_ops = {
.content = bbstreamer_recovery_injector_content,
.finalize = bbstreamer_recovery_injector_finalize,
.free = bbstreamer_recovery_injector_free
static const astreamer_ops astreamer_recovery_injector_ops = {
.content = astreamer_recovery_injector_content,
.finalize = astreamer_recovery_injector_finalize,
.free = astreamer_recovery_injector_free
};
/*
* Create a bbstreamer that can edit recoverydata into an archive stream.
* Create a astreamer that can edit recoverydata into an archive stream.
*
* The input should be a series of typed chunks (not BBSTREAMER_UNKNOWN) as
* per the conventions described in bbstreamer.h; the chunks forwarded to
* the next bbstreamer will be similarly typed, but the
* BBSTREAMER_MEMBER_HEADER chunks may be zero-length in cases where we've
* The input should be a series of typed chunks (not ASTREAMER_UNKNOWN) as
* per the conventions described in astreamer.h; the chunks forwarded to
* the next astreamer will be similarly typed, but the
* ASTREAMER_MEMBER_HEADER chunks may be zero-length in cases where we've
* edited the archive stream.
*
* Our goal is to do one of the following three things with the content passed
@ -61,16 +61,16 @@ static const bbstreamer_ops bbstreamer_recovery_injector_ops = {
* zero-length standby.signal file, dropping any file with that name from
* the archive.
*/
bbstreamer *
bbstreamer_recovery_injector_new(bbstreamer *next,
bool is_recovery_guc_supported,
PQExpBuffer recoveryconfcontents)
astreamer *
astreamer_recovery_injector_new(astreamer *next,
bool is_recovery_guc_supported,
PQExpBuffer recoveryconfcontents)
{
bbstreamer_recovery_injector *streamer;
astreamer_recovery_injector *streamer;
streamer = palloc0(sizeof(bbstreamer_recovery_injector));
*((const bbstreamer_ops **) &streamer->base.bbs_ops) =
&bbstreamer_recovery_injector_ops;
streamer = palloc0(sizeof(astreamer_recovery_injector));
*((const astreamer_ops **) &streamer->base.bbs_ops) =
&astreamer_recovery_injector_ops;
streamer->base.bbs_next = next;
streamer->is_recovery_guc_supported = is_recovery_guc_supported;
streamer->recoveryconfcontents = recoveryconfcontents;
@ -82,21 +82,21 @@ bbstreamer_recovery_injector_new(bbstreamer *next,
* Handle each chunk of tar content while injecting recovery configuration.
*/
static void
bbstreamer_recovery_injector_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context)
astreamer_recovery_injector_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context)
{
bbstreamer_recovery_injector *mystreamer;
astreamer_recovery_injector *mystreamer;
mystreamer = (bbstreamer_recovery_injector *) streamer;
Assert(member != NULL || context == BBSTREAMER_ARCHIVE_TRAILER);
mystreamer = (astreamer_recovery_injector *) streamer;
Assert(member != NULL || context == ASTREAMER_ARCHIVE_TRAILER);
switch (context)
{
case BBSTREAMER_MEMBER_HEADER:
case ASTREAMER_MEMBER_HEADER:
/* Must copy provided data so we have the option to modify it. */
memcpy(&mystreamer->member, member, sizeof(bbstreamer_member));
memcpy(&mystreamer->member, member, sizeof(astreamer_member));
/*
* On v12+, skip standby.signal and edit postgresql.auto.conf; on
@ -119,8 +119,8 @@ bbstreamer_recovery_injector_content(bbstreamer *streamer,
/*
* Zap data and len because the archive header is no
* longer valid; some subsequent bbstreamer must
* regenerate it if it's necessary.
* longer valid; some subsequent astreamer must regenerate
* it if it's necessary.
*/
data = NULL;
len = 0;
@ -135,26 +135,26 @@ bbstreamer_recovery_injector_content(bbstreamer *streamer,
return;
break;
case BBSTREAMER_MEMBER_CONTENTS:
case ASTREAMER_MEMBER_CONTENTS:
/* Do not forward if the file is to be skipped. */
if (mystreamer->skip_file)
return;
break;
case BBSTREAMER_MEMBER_TRAILER:
case ASTREAMER_MEMBER_TRAILER:
/* Do not forward it the file is to be skipped. */
if (mystreamer->skip_file)
return;
/* Append provided content to whatever we already sent. */
if (mystreamer->is_postgresql_auto_conf)
bbstreamer_content(mystreamer->base.bbs_next, member,
mystreamer->recoveryconfcontents->data,
mystreamer->recoveryconfcontents->len,
BBSTREAMER_MEMBER_CONTENTS);
astreamer_content(mystreamer->base.bbs_next, member,
mystreamer->recoveryconfcontents->data,
mystreamer->recoveryconfcontents->len,
ASTREAMER_MEMBER_CONTENTS);
break;
case BBSTREAMER_ARCHIVE_TRAILER:
case ASTREAMER_ARCHIVE_TRAILER:
if (mystreamer->is_recovery_guc_supported)
{
/*
@ -163,22 +163,22 @@ bbstreamer_recovery_injector_content(bbstreamer *streamer,
* member now.
*/
if (!mystreamer->found_postgresql_auto_conf)
bbstreamer_inject_file(mystreamer->base.bbs_next,
"postgresql.auto.conf",
mystreamer->recoveryconfcontents->data,
mystreamer->recoveryconfcontents->len);
astreamer_inject_file(mystreamer->base.bbs_next,
"postgresql.auto.conf",
mystreamer->recoveryconfcontents->data,
mystreamer->recoveryconfcontents->len);
/* Inject empty standby.signal file. */
bbstreamer_inject_file(mystreamer->base.bbs_next,
"standby.signal", "", 0);
astreamer_inject_file(mystreamer->base.bbs_next,
"standby.signal", "", 0);
}
else
{
/* Inject recovery.conf file with specified contents. */
bbstreamer_inject_file(mystreamer->base.bbs_next,
"recovery.conf",
mystreamer->recoveryconfcontents->data,
mystreamer->recoveryconfcontents->len);
astreamer_inject_file(mystreamer->base.bbs_next,
"recovery.conf",
mystreamer->recoveryconfcontents->data,
mystreamer->recoveryconfcontents->len);
}
/* Nothing to do here. */
@ -189,26 +189,26 @@ bbstreamer_recovery_injector_content(bbstreamer *streamer,
pg_fatal("unexpected state while injecting recovery settings");
}
bbstreamer_content(mystreamer->base.bbs_next, &mystreamer->member,
data, len, context);
astreamer_content(mystreamer->base.bbs_next, &mystreamer->member,
data, len, context);
}
/*
* End-of-stream processing for this bbstreamer.
* End-of-stream processing for this astreamer.
*/
static void
bbstreamer_recovery_injector_finalize(bbstreamer *streamer)
astreamer_recovery_injector_finalize(astreamer *streamer)
{
bbstreamer_finalize(streamer->bbs_next);
astreamer_finalize(streamer->bbs_next);
}
/*
* Free memory associated with this bbstreamer.
* Free memory associated with this astreamer.
*/
static void
bbstreamer_recovery_injector_free(bbstreamer *streamer)
astreamer_recovery_injector_free(astreamer *streamer)
{
bbstreamer_free(streamer->bbs_next);
astreamer_free(streamer->bbs_next);
pfree(streamer);
}
@ -216,10 +216,10 @@ bbstreamer_recovery_injector_free(bbstreamer *streamer)
* Inject a member into the archive with specified contents.
*/
void
bbstreamer_inject_file(bbstreamer *streamer, char *pathname, char *data,
int len)
astreamer_inject_file(astreamer *streamer, char *pathname, char *data,
int len)
{
bbstreamer_member member;
astreamer_member member;
strlcpy(member.pathname, pathname, MAXPGPATH);
member.size = len;
@ -238,12 +238,12 @@ bbstreamer_inject_file(bbstreamer *streamer, char *pathname, char *data,
/*
* We don't know here how to generate valid member headers and trailers
* for the archiving format in use, so if those are needed, some successor
* bbstreamer will have to generate them using the data from 'member'.
* astreamer will have to generate them using the data from 'member'.
*/
bbstreamer_content(streamer, &member, NULL, 0,
BBSTREAMER_MEMBER_HEADER);
bbstreamer_content(streamer, &member, data, len,
BBSTREAMER_MEMBER_CONTENTS);
bbstreamer_content(streamer, &member, NULL, 0,
BBSTREAMER_MEMBER_TRAILER);
astreamer_content(streamer, &member, NULL, 0,
ASTREAMER_MEMBER_HEADER);
astreamer_content(streamer, &member, data, len,
ASTREAMER_MEMBER_CONTENTS);
astreamer_content(streamer, &member, NULL, 0,
ASTREAMER_MEMBER_TRAILER);
}

View File

@ -1,11 +1,11 @@
/*-------------------------------------------------------------------------
*
* bbstreamer_lz4.c
* astreamer_lz4.c
*
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/bin/pg_basebackup/bbstreamer_lz4.c
* src/bin/pg_basebackup/astreamer_lz4.c
*-------------------------------------------------------------------------
*/
@ -17,15 +17,15 @@
#include <lz4frame.h>
#endif
#include "bbstreamer.h"
#include "astreamer.h"
#include "common/file_perm.h"
#include "common/logging.h"
#include "common/string.h"
#ifdef USE_LZ4
typedef struct bbstreamer_lz4_frame
typedef struct astreamer_lz4_frame
{
bbstreamer base;
astreamer base;
LZ4F_compressionContext_t cctx;
LZ4F_decompressionContext_t dctx;
@ -33,32 +33,32 @@ typedef struct bbstreamer_lz4_frame
size_t bytes_written;
bool header_written;
} bbstreamer_lz4_frame;
} astreamer_lz4_frame;
static void bbstreamer_lz4_compressor_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context);
static void bbstreamer_lz4_compressor_finalize(bbstreamer *streamer);
static void bbstreamer_lz4_compressor_free(bbstreamer *streamer);
static void astreamer_lz4_compressor_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context);
static void astreamer_lz4_compressor_finalize(astreamer *streamer);
static void astreamer_lz4_compressor_free(astreamer *streamer);
static const bbstreamer_ops bbstreamer_lz4_compressor_ops = {
.content = bbstreamer_lz4_compressor_content,
.finalize = bbstreamer_lz4_compressor_finalize,
.free = bbstreamer_lz4_compressor_free
static const astreamer_ops astreamer_lz4_compressor_ops = {
.content = astreamer_lz4_compressor_content,
.finalize = astreamer_lz4_compressor_finalize,
.free = astreamer_lz4_compressor_free
};
static void bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context);
static void bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer);
static void bbstreamer_lz4_decompressor_free(bbstreamer *streamer);
static void astreamer_lz4_decompressor_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context);
static void astreamer_lz4_decompressor_finalize(astreamer *streamer);
static void astreamer_lz4_decompressor_free(astreamer *streamer);
static const bbstreamer_ops bbstreamer_lz4_decompressor_ops = {
.content = bbstreamer_lz4_decompressor_content,
.finalize = bbstreamer_lz4_decompressor_finalize,
.free = bbstreamer_lz4_decompressor_free
static const astreamer_ops astreamer_lz4_decompressor_ops = {
.content = astreamer_lz4_decompressor_content,
.finalize = astreamer_lz4_decompressor_finalize,
.free = astreamer_lz4_decompressor_free
};
#endif
@ -66,19 +66,19 @@ static const bbstreamer_ops bbstreamer_lz4_decompressor_ops = {
* Create a new base backup streamer that performs lz4 compression of tar
* blocks.
*/
bbstreamer *
bbstreamer_lz4_compressor_new(bbstreamer *next, pg_compress_specification *compress)
astreamer *
astreamer_lz4_compressor_new(astreamer *next, pg_compress_specification *compress)
{
#ifdef USE_LZ4
bbstreamer_lz4_frame *streamer;
astreamer_lz4_frame *streamer;
LZ4F_errorCode_t ctxError;
LZ4F_preferences_t *prefs;
Assert(next != NULL);
streamer = palloc0(sizeof(bbstreamer_lz4_frame));
*((const bbstreamer_ops **) &streamer->base.bbs_ops) =
&bbstreamer_lz4_compressor_ops;
streamer = palloc0(sizeof(astreamer_lz4_frame));
*((const astreamer_ops **) &streamer->base.bbs_ops) =
&astreamer_lz4_compressor_ops;
streamer->base.bbs_next = next;
initStringInfo(&streamer->base.bbs_buffer);
@ -113,19 +113,19 @@ bbstreamer_lz4_compressor_new(bbstreamer *next, pg_compress_specification *compr
* of output buffer to next streamer and empty the buffer.
*/
static void
bbstreamer_lz4_compressor_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context)
astreamer_lz4_compressor_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context)
{
bbstreamer_lz4_frame *mystreamer;
astreamer_lz4_frame *mystreamer;
uint8 *next_in,
*next_out;
size_t out_bound,
compressed_size,
avail_out;
mystreamer = (bbstreamer_lz4_frame *) streamer;
mystreamer = (astreamer_lz4_frame *) streamer;
next_in = (uint8 *) data;
/* Write header before processing the first input chunk. */
@ -159,10 +159,10 @@ bbstreamer_lz4_compressor_content(bbstreamer *streamer,
out_bound = LZ4F_compressBound(len, &mystreamer->prefs);
if (avail_out < out_bound)
{
bbstreamer_content(mystreamer->base.bbs_next, member,
mystreamer->base.bbs_buffer.data,
mystreamer->bytes_written,
context);
astreamer_content(mystreamer->base.bbs_next, member,
mystreamer->base.bbs_buffer.data,
mystreamer->bytes_written,
context);
/* Enlarge buffer if it falls short of out bound. */
if (mystreamer->base.bbs_buffer.maxlen < out_bound)
@ -196,25 +196,25 @@ bbstreamer_lz4_compressor_content(bbstreamer *streamer,
* End-of-stream processing.
*/
static void
bbstreamer_lz4_compressor_finalize(bbstreamer *streamer)
astreamer_lz4_compressor_finalize(astreamer *streamer)
{
bbstreamer_lz4_frame *mystreamer;
astreamer_lz4_frame *mystreamer;
uint8 *next_out;
size_t footer_bound,
compressed_size,
avail_out;
mystreamer = (bbstreamer_lz4_frame *) streamer;
mystreamer = (astreamer_lz4_frame *) streamer;
/* Find out the footer bound and update the output buffer. */
footer_bound = LZ4F_compressBound(0, &mystreamer->prefs);
if ((mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written) <
footer_bound)
{
bbstreamer_content(mystreamer->base.bbs_next, NULL,
mystreamer->base.bbs_buffer.data,
mystreamer->bytes_written,
BBSTREAMER_UNKNOWN);
astreamer_content(mystreamer->base.bbs_next, NULL,
mystreamer->base.bbs_buffer.data,
mystreamer->bytes_written,
ASTREAMER_UNKNOWN);
/* Enlarge buffer if it falls short of footer bound. */
if (mystreamer->base.bbs_buffer.maxlen < footer_bound)
@ -243,24 +243,24 @@ bbstreamer_lz4_compressor_finalize(bbstreamer *streamer)
mystreamer->bytes_written += compressed_size;
bbstreamer_content(mystreamer->base.bbs_next, NULL,
mystreamer->base.bbs_buffer.data,
mystreamer->bytes_written,
BBSTREAMER_UNKNOWN);
astreamer_content(mystreamer->base.bbs_next, NULL,
mystreamer->base.bbs_buffer.data,
mystreamer->bytes_written,
ASTREAMER_UNKNOWN);
bbstreamer_finalize(mystreamer->base.bbs_next);
astreamer_finalize(mystreamer->base.bbs_next);
}
/*
* Free memory.
*/
static void
bbstreamer_lz4_compressor_free(bbstreamer *streamer)
astreamer_lz4_compressor_free(astreamer *streamer)
{
bbstreamer_lz4_frame *mystreamer;
astreamer_lz4_frame *mystreamer;
mystreamer = (bbstreamer_lz4_frame *) streamer;
bbstreamer_free(streamer->bbs_next);
mystreamer = (astreamer_lz4_frame *) streamer;
astreamer_free(streamer->bbs_next);
LZ4F_freeCompressionContext(mystreamer->cctx);
pfree(streamer->bbs_buffer.data);
pfree(streamer);
@ -271,18 +271,18 @@ bbstreamer_lz4_compressor_free(bbstreamer *streamer)
* Create a new base backup streamer that performs decompression of lz4
* compressed blocks.
*/
bbstreamer *
bbstreamer_lz4_decompressor_new(bbstreamer *next)
astreamer *
astreamer_lz4_decompressor_new(astreamer *next)
{
#ifdef USE_LZ4
bbstreamer_lz4_frame *streamer;
astreamer_lz4_frame *streamer;
LZ4F_errorCode_t ctxError;
Assert(next != NULL);
streamer = palloc0(sizeof(bbstreamer_lz4_frame));
*((const bbstreamer_ops **) &streamer->base.bbs_ops) =
&bbstreamer_lz4_decompressor_ops;
streamer = palloc0(sizeof(astreamer_lz4_frame));
*((const astreamer_ops **) &streamer->base.bbs_ops) =
&astreamer_lz4_decompressor_ops;
streamer->base.bbs_next = next;
initStringInfo(&streamer->base.bbs_buffer);
@ -307,18 +307,18 @@ bbstreamer_lz4_decompressor_new(bbstreamer *next)
* to the next streamer.
*/
static void
bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context)
astreamer_lz4_decompressor_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context)
{
bbstreamer_lz4_frame *mystreamer;
astreamer_lz4_frame *mystreamer;
uint8 *next_in,
*next_out;
size_t avail_in,
avail_out;
mystreamer = (bbstreamer_lz4_frame *) streamer;
mystreamer = (astreamer_lz4_frame *) streamer;
next_in = (uint8 *) data;
next_out = (uint8 *) mystreamer->base.bbs_buffer.data;
avail_in = len;
@ -366,10 +366,10 @@ bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
*/
if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
{
bbstreamer_content(mystreamer->base.bbs_next, member,
mystreamer->base.bbs_buffer.data,
mystreamer->base.bbs_buffer.maxlen,
context);
astreamer_content(mystreamer->base.bbs_next, member,
mystreamer->base.bbs_buffer.data,
mystreamer->base.bbs_buffer.maxlen,
context);
avail_out = mystreamer->base.bbs_buffer.maxlen;
mystreamer->bytes_written = 0;
@ -387,34 +387,34 @@ bbstreamer_lz4_decompressor_content(bbstreamer *streamer,
* End-of-stream processing.
*/
static void
bbstreamer_lz4_decompressor_finalize(bbstreamer *streamer)
astreamer_lz4_decompressor_finalize(astreamer *streamer)
{
bbstreamer_lz4_frame *mystreamer;
astreamer_lz4_frame *mystreamer;
mystreamer = (bbstreamer_lz4_frame *) streamer;
mystreamer = (astreamer_lz4_frame *) streamer;
/*
* End of the stream, if there is some pending data in output buffers then
* we must forward it to next streamer.
*/
bbstreamer_content(mystreamer->base.bbs_next, NULL,
mystreamer->base.bbs_buffer.data,
mystreamer->base.bbs_buffer.maxlen,
BBSTREAMER_UNKNOWN);
astreamer_content(mystreamer->base.bbs_next, NULL,
mystreamer->base.bbs_buffer.data,
mystreamer->base.bbs_buffer.maxlen,
ASTREAMER_UNKNOWN);
bbstreamer_finalize(mystreamer->base.bbs_next);
astreamer_finalize(mystreamer->base.bbs_next);
}
/*
* Free memory.
*/
static void
bbstreamer_lz4_decompressor_free(bbstreamer *streamer)
astreamer_lz4_decompressor_free(astreamer *streamer)
{
bbstreamer_lz4_frame *mystreamer;
astreamer_lz4_frame *mystreamer;
mystreamer = (bbstreamer_lz4_frame *) streamer;
bbstreamer_free(streamer->bbs_next);
mystreamer = (astreamer_lz4_frame *) streamer;
astreamer_free(streamer->bbs_next);
LZ4F_freeDecompressionContext(mystreamer->dctx);
pfree(streamer->bbs_buffer.data);
pfree(streamer);

View File

@ -1,13 +1,13 @@
/*-------------------------------------------------------------------------
*
* bbstreamer_tar.c
* astreamer_tar.c
*
* This module implements three types of tar processing. A tar parser
* expects unlabelled chunks of data (e.g. BBSTREAMER_UNKNOWN) and splits
* it into labelled chunks (any other value of bbstreamer_archive_context).
* expects unlabelled chunks of data (e.g. ASTREAMER_UNKNOWN) and splits
* it into labelled chunks (any other value of astreamer_archive_context).
* A tar archiver does the reverse: it takes a bunch of labelled chunks
* and produces a tarfile, optionally replacing member headers and trailers
* so that upstream bbstreamer objects can perform surgery on the tarfile
* so that upstream astreamer objects can perform surgery on the tarfile
* contents without knowing the details of the tar format. A tar terminator
* just adds two blocks of NUL bytes to the end of the file, since older
* server versions produce files with this terminator omitted.
@ -15,7 +15,7 @@
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/bin/pg_basebackup/bbstreamer_tar.c
* src/bin/pg_basebackup/astreamer_tar.c
*-------------------------------------------------------------------------
*/
@ -23,83 +23,83 @@
#include <time.h>
#include "bbstreamer.h"
#include "astreamer.h"
#include "common/logging.h"
#include "pgtar.h"
typedef struct bbstreamer_tar_parser
typedef struct astreamer_tar_parser
{
bbstreamer base;
bbstreamer_archive_context next_context;
bbstreamer_member member;
astreamer base;
astreamer_archive_context next_context;
astreamer_member member;
size_t file_bytes_sent;
size_t pad_bytes_expected;
} bbstreamer_tar_parser;
} astreamer_tar_parser;
typedef struct bbstreamer_tar_archiver
typedef struct astreamer_tar_archiver
{
bbstreamer base;
astreamer base;
bool rearchive_member;
} bbstreamer_tar_archiver;
} astreamer_tar_archiver;
static void bbstreamer_tar_parser_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context);
static void bbstreamer_tar_parser_finalize(bbstreamer *streamer);
static void bbstreamer_tar_parser_free(bbstreamer *streamer);
static bool bbstreamer_tar_header(bbstreamer_tar_parser *mystreamer);
static void astreamer_tar_parser_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context);
static void astreamer_tar_parser_finalize(astreamer *streamer);
static void astreamer_tar_parser_free(astreamer *streamer);
static bool astreamer_tar_header(astreamer_tar_parser *mystreamer);
static const bbstreamer_ops bbstreamer_tar_parser_ops = {
.content = bbstreamer_tar_parser_content,
.finalize = bbstreamer_tar_parser_finalize,
.free = bbstreamer_tar_parser_free
static const astreamer_ops astreamer_tar_parser_ops = {
.content = astreamer_tar_parser_content,
.finalize = astreamer_tar_parser_finalize,
.free = astreamer_tar_parser_free
};
static void bbstreamer_tar_archiver_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context);
static void bbstreamer_tar_archiver_finalize(bbstreamer *streamer);
static void bbstreamer_tar_archiver_free(bbstreamer *streamer);
static void astreamer_tar_archiver_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context);
static void astreamer_tar_archiver_finalize(astreamer *streamer);
static void astreamer_tar_archiver_free(astreamer *streamer);
static const bbstreamer_ops bbstreamer_tar_archiver_ops = {
.content = bbstreamer_tar_archiver_content,
.finalize = bbstreamer_tar_archiver_finalize,
.free = bbstreamer_tar_archiver_free
static const astreamer_ops astreamer_tar_archiver_ops = {
.content = astreamer_tar_archiver_content,
.finalize = astreamer_tar_archiver_finalize,
.free = astreamer_tar_archiver_free
};
static void bbstreamer_tar_terminator_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context);
static void bbstreamer_tar_terminator_finalize(bbstreamer *streamer);
static void bbstreamer_tar_terminator_free(bbstreamer *streamer);
static void astreamer_tar_terminator_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context);
static void astreamer_tar_terminator_finalize(astreamer *streamer);
static void astreamer_tar_terminator_free(astreamer *streamer);
static const bbstreamer_ops bbstreamer_tar_terminator_ops = {
.content = bbstreamer_tar_terminator_content,
.finalize = bbstreamer_tar_terminator_finalize,
.free = bbstreamer_tar_terminator_free
static const astreamer_ops astreamer_tar_terminator_ops = {
.content = astreamer_tar_terminator_content,
.finalize = astreamer_tar_terminator_finalize,
.free = astreamer_tar_terminator_free
};
/*
* Create a bbstreamer that can parse a stream of content as tar data.
* Create a astreamer that can parse a stream of content as tar data.
*
* The input should be a series of BBSTREAMER_UNKNOWN chunks; the bbstreamer
* The input should be a series of ASTREAMER_UNKNOWN chunks; the astreamer
* specified by 'next' will receive a series of typed chunks, as per the
* conventions described in bbstreamer.h.
* conventions described in astreamer.h.
*/
bbstreamer *
bbstreamer_tar_parser_new(bbstreamer *next)
astreamer *
astreamer_tar_parser_new(astreamer *next)
{
bbstreamer_tar_parser *streamer;
astreamer_tar_parser *streamer;
streamer = palloc0(sizeof(bbstreamer_tar_parser));
*((const bbstreamer_ops **) &streamer->base.bbs_ops) =
&bbstreamer_tar_parser_ops;
streamer = palloc0(sizeof(astreamer_tar_parser));
*((const astreamer_ops **) &streamer->base.bbs_ops) =
&astreamer_tar_parser_ops;
streamer->base.bbs_next = next;
initStringInfo(&streamer->base.bbs_buffer);
streamer->next_context = BBSTREAMER_MEMBER_HEADER;
streamer->next_context = ASTREAMER_MEMBER_HEADER;
return &streamer->base;
}
@ -108,29 +108,29 @@ bbstreamer_tar_parser_new(bbstreamer *next)
* Parse unknown content as tar data.
*/
static void
bbstreamer_tar_parser_content(bbstreamer *streamer, bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context)
astreamer_tar_parser_content(astreamer *streamer, astreamer_member *member,
const char *data, int len,
astreamer_archive_context context)
{
bbstreamer_tar_parser *mystreamer = (bbstreamer_tar_parser *) streamer;
astreamer_tar_parser *mystreamer = (astreamer_tar_parser *) streamer;
size_t nbytes;
/* Expect unparsed input. */
Assert(member == NULL);
Assert(context == BBSTREAMER_UNKNOWN);
Assert(context == ASTREAMER_UNKNOWN);
while (len > 0)
{
switch (mystreamer->next_context)
{
case BBSTREAMER_MEMBER_HEADER:
case ASTREAMER_MEMBER_HEADER:
/*
* If we're expecting an archive member header, accumulate a
* full block of data before doing anything further.
*/
if (!bbstreamer_buffer_until(streamer, &data, &len,
TAR_BLOCK_SIZE))
if (!astreamer_buffer_until(streamer, &data, &len,
TAR_BLOCK_SIZE))
return;
/*
@ -139,32 +139,32 @@ bbstreamer_tar_parser_content(bbstreamer *streamer, bbstreamer_member *member,
* thought was the next file header is actually the start of
* the archive trailer. Switch modes accordingly.
*/
if (bbstreamer_tar_header(mystreamer))
if (astreamer_tar_header(mystreamer))
{
if (mystreamer->member.size == 0)
{
/* No content; trailer is zero-length. */
bbstreamer_content(mystreamer->base.bbs_next,
&mystreamer->member,
NULL, 0,
BBSTREAMER_MEMBER_TRAILER);
astreamer_content(mystreamer->base.bbs_next,
&mystreamer->member,
NULL, 0,
ASTREAMER_MEMBER_TRAILER);
/* Expect next header. */
mystreamer->next_context = BBSTREAMER_MEMBER_HEADER;
mystreamer->next_context = ASTREAMER_MEMBER_HEADER;
}
else
{
/* Expect contents. */
mystreamer->next_context = BBSTREAMER_MEMBER_CONTENTS;
mystreamer->next_context = ASTREAMER_MEMBER_CONTENTS;
}
mystreamer->base.bbs_buffer.len = 0;
mystreamer->file_bytes_sent = 0;
}
else
mystreamer->next_context = BBSTREAMER_ARCHIVE_TRAILER;
mystreamer->next_context = ASTREAMER_ARCHIVE_TRAILER;
break;
case BBSTREAMER_MEMBER_CONTENTS:
case ASTREAMER_MEMBER_CONTENTS:
/*
* Send as much content as we have, but not more than the
@ -174,10 +174,10 @@ bbstreamer_tar_parser_content(bbstreamer *streamer, bbstreamer_member *member,
nbytes = mystreamer->member.size - mystreamer->file_bytes_sent;
nbytes = Min(nbytes, len);
Assert(nbytes > 0);
bbstreamer_content(mystreamer->base.bbs_next,
&mystreamer->member,
data, nbytes,
BBSTREAMER_MEMBER_CONTENTS);
astreamer_content(mystreamer->base.bbs_next,
&mystreamer->member,
data, nbytes,
ASTREAMER_MEMBER_CONTENTS);
mystreamer->file_bytes_sent += nbytes;
data += nbytes;
len -= nbytes;
@ -193,53 +193,53 @@ bbstreamer_tar_parser_content(bbstreamer *streamer, bbstreamer_member *member,
if (mystreamer->pad_bytes_expected == 0)
{
/* Trailer is zero-length. */
bbstreamer_content(mystreamer->base.bbs_next,
&mystreamer->member,
NULL, 0,
BBSTREAMER_MEMBER_TRAILER);
astreamer_content(mystreamer->base.bbs_next,
&mystreamer->member,
NULL, 0,
ASTREAMER_MEMBER_TRAILER);
/* Expect next header. */
mystreamer->next_context = BBSTREAMER_MEMBER_HEADER;
mystreamer->next_context = ASTREAMER_MEMBER_HEADER;
}
else
{
/* Trailer is not zero-length. */
mystreamer->next_context = BBSTREAMER_MEMBER_TRAILER;
mystreamer->next_context = ASTREAMER_MEMBER_TRAILER;
}
mystreamer->base.bbs_buffer.len = 0;
}
break;
case BBSTREAMER_MEMBER_TRAILER:
case ASTREAMER_MEMBER_TRAILER:
/*
* If we're expecting an archive member trailer, accumulate
* the expected number of padding bytes before sending
* anything onward.
*/
if (!bbstreamer_buffer_until(streamer, &data, &len,
mystreamer->pad_bytes_expected))
if (!astreamer_buffer_until(streamer, &data, &len,
mystreamer->pad_bytes_expected))
return;
/* OK, now we can send it. */
bbstreamer_content(mystreamer->base.bbs_next,
&mystreamer->member,
data, mystreamer->pad_bytes_expected,
BBSTREAMER_MEMBER_TRAILER);
astreamer_content(mystreamer->base.bbs_next,
&mystreamer->member,
data, mystreamer->pad_bytes_expected,
ASTREAMER_MEMBER_TRAILER);
/* Expect next file header. */
mystreamer->next_context = BBSTREAMER_MEMBER_HEADER;
mystreamer->next_context = ASTREAMER_MEMBER_HEADER;
mystreamer->base.bbs_buffer.len = 0;
break;
case BBSTREAMER_ARCHIVE_TRAILER:
case ASTREAMER_ARCHIVE_TRAILER:
/*
* We've seen an end-of-archive indicator, so anything more is
* buffered and sent as part of the archive trailer. But we
* don't expect more than 2 blocks.
*/
bbstreamer_buffer_bytes(streamer, &data, &len, len);
astreamer_buffer_bytes(streamer, &data, &len, len);
if (len > 2 * TAR_BLOCK_SIZE)
pg_fatal("tar file trailer exceeds 2 blocks");
return;
@ -255,14 +255,14 @@ bbstreamer_tar_parser_content(bbstreamer *streamer, bbstreamer_member *member,
* Parse a file header within a tar stream.
*
* The return value is true if we found a file header and passed it on to the
* next bbstreamer; it is false if we have reached the archive trailer.
* next astreamer; it is false if we have reached the archive trailer.
*/
static bool
bbstreamer_tar_header(bbstreamer_tar_parser *mystreamer)
astreamer_tar_header(astreamer_tar_parser *mystreamer)
{
bool has_nonzero_byte = false;
int i;
bbstreamer_member *member = &mystreamer->member;
astreamer_member *member = &mystreamer->member;
char *buffer = mystreamer->base.bbs_buffer.data;
Assert(mystreamer->base.bbs_buffer.len == TAR_BLOCK_SIZE);
@ -304,10 +304,10 @@ bbstreamer_tar_header(bbstreamer_tar_parser *mystreamer)
/* Compute number of padding bytes. */
mystreamer->pad_bytes_expected = tarPaddingBytesRequired(member->size);
/* Forward the entire header to the next bbstreamer. */
bbstreamer_content(mystreamer->base.bbs_next, member,
buffer, TAR_BLOCK_SIZE,
BBSTREAMER_MEMBER_HEADER);
/* Forward the entire header to the next astreamer. */
astreamer_content(mystreamer->base.bbs_next, member,
buffer, TAR_BLOCK_SIZE,
ASTREAMER_MEMBER_HEADER);
return true;
}
@ -316,50 +316,50 @@ bbstreamer_tar_header(bbstreamer_tar_parser *mystreamer)
* End-of-stream processing for a tar parser.
*/
static void
bbstreamer_tar_parser_finalize(bbstreamer *streamer)
astreamer_tar_parser_finalize(astreamer *streamer)
{
bbstreamer_tar_parser *mystreamer = (bbstreamer_tar_parser *) streamer;
astreamer_tar_parser *mystreamer = (astreamer_tar_parser *) streamer;
if (mystreamer->next_context != BBSTREAMER_ARCHIVE_TRAILER &&
(mystreamer->next_context != BBSTREAMER_MEMBER_HEADER ||
if (mystreamer->next_context != ASTREAMER_ARCHIVE_TRAILER &&
(mystreamer->next_context != ASTREAMER_MEMBER_HEADER ||
mystreamer->base.bbs_buffer.len > 0))
pg_fatal("COPY stream ended before last file was finished");
/* Send the archive trailer, even if empty. */
bbstreamer_content(streamer->bbs_next, NULL,
streamer->bbs_buffer.data, streamer->bbs_buffer.len,
BBSTREAMER_ARCHIVE_TRAILER);
astreamer_content(streamer->bbs_next, NULL,
streamer->bbs_buffer.data, streamer->bbs_buffer.len,
ASTREAMER_ARCHIVE_TRAILER);
/* Now finalize successor. */
bbstreamer_finalize(streamer->bbs_next);
astreamer_finalize(streamer->bbs_next);
}
/*
* Free memory associated with a tar parser.
*/
static void
bbstreamer_tar_parser_free(bbstreamer *streamer)
astreamer_tar_parser_free(astreamer *streamer)
{
pfree(streamer->bbs_buffer.data);
bbstreamer_free(streamer->bbs_next);
astreamer_free(streamer->bbs_next);
}
/*
* Create a bbstreamer that can generate a tar archive.
* Create a astreamer that can generate a tar archive.
*
* This is intended to be usable either for generating a brand-new tar archive
* or for modifying one on the fly. The input should be a series of typed
* chunks (i.e. not BBSTREAMER_UNKNOWN). See also the comments for
* bbstreamer_tar_parser_content.
* chunks (i.e. not ASTREAMER_UNKNOWN). See also the comments for
* astreamer_tar_parser_content.
*/
bbstreamer *
bbstreamer_tar_archiver_new(bbstreamer *next)
astreamer *
astreamer_tar_archiver_new(astreamer *next)
{
bbstreamer_tar_archiver *streamer;
astreamer_tar_archiver *streamer;
streamer = palloc0(sizeof(bbstreamer_tar_archiver));
*((const bbstreamer_ops **) &streamer->base.bbs_ops) =
&bbstreamer_tar_archiver_ops;
streamer = palloc0(sizeof(astreamer_tar_archiver));
*((const astreamer_ops **) &streamer->base.bbs_ops) =
&astreamer_tar_archiver_ops;
streamer->base.bbs_next = next;
return &streamer->base;
@ -368,36 +368,36 @@ bbstreamer_tar_archiver_new(bbstreamer *next)
/*
* Fix up the stream of input chunks to create a valid tar file.
*
* If a BBSTREAMER_MEMBER_HEADER chunk is of size 0, it is replaced with a
* If a ASTREAMER_MEMBER_HEADER chunk is of size 0, it is replaced with a
* newly-constructed tar header. If it is of size TAR_BLOCK_SIZE, it is
* passed through without change. Any other size is a fatal error (and
* indicates a bug).
*
* Whenever a new BBSTREAMER_MEMBER_HEADER chunk is constructed, the
* corresponding BBSTREAMER_MEMBER_TRAILER chunk is also constructed from
* Whenever a new ASTREAMER_MEMBER_HEADER chunk is constructed, the
* corresponding ASTREAMER_MEMBER_TRAILER chunk is also constructed from
* scratch. Specifically, we construct a block of zero bytes sufficient to
* pad out to a block boundary, as required by the tar format. Other
* BBSTREAMER_MEMBER_TRAILER chunks are passed through without change.
* ASTREAMER_MEMBER_TRAILER chunks are passed through without change.
*
* Any BBSTREAMER_MEMBER_CONTENTS chunks are passed through without change.
* Any ASTREAMER_MEMBER_CONTENTS chunks are passed through without change.
*
* The BBSTREAMER_ARCHIVE_TRAILER chunk is replaced with two
* The ASTREAMER_ARCHIVE_TRAILER chunk is replaced with two
* blocks of zero bytes. Not all tar programs require this, but apparently
* some do. The server does not supply this trailer. If no archive trailer is
* present, one will be added by bbstreamer_tar_parser_finalize.
* present, one will be added by astreamer_tar_parser_finalize.
*/
static void
bbstreamer_tar_archiver_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context)
astreamer_tar_archiver_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context)
{
bbstreamer_tar_archiver *mystreamer = (bbstreamer_tar_archiver *) streamer;
astreamer_tar_archiver *mystreamer = (astreamer_tar_archiver *) streamer;
char buffer[2 * TAR_BLOCK_SIZE];
Assert(context != BBSTREAMER_UNKNOWN);
Assert(context != ASTREAMER_UNKNOWN);
if (context == BBSTREAMER_MEMBER_HEADER && len != TAR_BLOCK_SIZE)
if (context == ASTREAMER_MEMBER_HEADER && len != TAR_BLOCK_SIZE)
{
Assert(len == 0);
@ -411,7 +411,7 @@ bbstreamer_tar_archiver_content(bbstreamer *streamer,
/* Also make a note to replace padding, in case size changed. */
mystreamer->rearchive_member = true;
}
else if (context == BBSTREAMER_MEMBER_TRAILER &&
else if (context == ASTREAMER_MEMBER_TRAILER &&
mystreamer->rearchive_member)
{
int pad_bytes = tarPaddingBytesRequired(member->size);
@ -424,7 +424,7 @@ bbstreamer_tar_archiver_content(bbstreamer *streamer,
/* Don't do this again unless we replace another header. */
mystreamer->rearchive_member = false;
}
else if (context == BBSTREAMER_ARCHIVE_TRAILER)
else if (context == ASTREAMER_ARCHIVE_TRAILER)
{
/* Trailer should always be two blocks of zero bytes. */
memset(buffer, 0, 2 * TAR_BLOCK_SIZE);
@ -432,40 +432,40 @@ bbstreamer_tar_archiver_content(bbstreamer *streamer,
len = 2 * TAR_BLOCK_SIZE;
}
bbstreamer_content(streamer->bbs_next, member, data, len, context);
astreamer_content(streamer->bbs_next, member, data, len, context);
}
/*
* End-of-stream processing for a tar archiver.
*/
static void
bbstreamer_tar_archiver_finalize(bbstreamer *streamer)
astreamer_tar_archiver_finalize(astreamer *streamer)
{
bbstreamer_finalize(streamer->bbs_next);
astreamer_finalize(streamer->bbs_next);
}
/*
* Free memory associated with a tar archiver.
*/
static void
bbstreamer_tar_archiver_free(bbstreamer *streamer)
astreamer_tar_archiver_free(astreamer *streamer)
{
bbstreamer_free(streamer->bbs_next);
astreamer_free(streamer->bbs_next);
pfree(streamer);
}
/*
* Create a bbstreamer that blindly adds two blocks of NUL bytes to the
* Create a astreamer that blindly adds two blocks of NUL bytes to the
* end of an incomplete tarfile that the server might send us.
*/
bbstreamer *
bbstreamer_tar_terminator_new(bbstreamer *next)
astreamer *
astreamer_tar_terminator_new(astreamer *next)
{
bbstreamer *streamer;
astreamer *streamer;
streamer = palloc0(sizeof(bbstreamer));
*((const bbstreamer_ops **) &streamer->bbs_ops) =
&bbstreamer_tar_terminator_ops;
streamer = palloc0(sizeof(astreamer));
*((const astreamer_ops **) &streamer->bbs_ops) =
&astreamer_tar_terminator_ops;
streamer->bbs_next = next;
return streamer;
@ -475,17 +475,17 @@ bbstreamer_tar_terminator_new(bbstreamer *next)
* Pass all the content through without change.
*/
static void
bbstreamer_tar_terminator_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context)
astreamer_tar_terminator_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context)
{
/* Expect unparsed input. */
Assert(member == NULL);
Assert(context == BBSTREAMER_UNKNOWN);
Assert(context == ASTREAMER_UNKNOWN);
/* Just forward it. */
bbstreamer_content(streamer->bbs_next, member, data, len, context);
astreamer_content(streamer->bbs_next, member, data, len, context);
}
/*
@ -493,22 +493,22 @@ bbstreamer_tar_terminator_content(bbstreamer *streamer,
* to supply.
*/
static void
bbstreamer_tar_terminator_finalize(bbstreamer *streamer)
astreamer_tar_terminator_finalize(astreamer *streamer)
{
char buffer[2 * TAR_BLOCK_SIZE];
memset(buffer, 0, 2 * TAR_BLOCK_SIZE);
bbstreamer_content(streamer->bbs_next, NULL, buffer,
2 * TAR_BLOCK_SIZE, BBSTREAMER_UNKNOWN);
bbstreamer_finalize(streamer->bbs_next);
astreamer_content(streamer->bbs_next, NULL, buffer,
2 * TAR_BLOCK_SIZE, ASTREAMER_UNKNOWN);
astreamer_finalize(streamer->bbs_next);
}
/*
* Free memory associated with a tar terminator.
*/
static void
bbstreamer_tar_terminator_free(bbstreamer *streamer)
astreamer_tar_terminator_free(astreamer *streamer)
{
bbstreamer_free(streamer->bbs_next);
astreamer_free(streamer->bbs_next);
pfree(streamer);
}

View File

@ -1,11 +1,11 @@
/*-------------------------------------------------------------------------
*
* bbstreamer_zstd.c
* astreamer_zstd.c
*
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/bin/pg_basebackup/bbstreamer_zstd.c
* src/bin/pg_basebackup/astreamer_zstd.c
*-------------------------------------------------------------------------
*/
@ -17,44 +17,44 @@
#include <zstd.h>
#endif
#include "bbstreamer.h"
#include "astreamer.h"
#include "common/logging.h"
#ifdef USE_ZSTD
typedef struct bbstreamer_zstd_frame
typedef struct astreamer_zstd_frame
{
bbstreamer base;
astreamer base;
ZSTD_CCtx *cctx;
ZSTD_DCtx *dctx;
ZSTD_outBuffer zstd_outBuf;
} bbstreamer_zstd_frame;
} astreamer_zstd_frame;
static void bbstreamer_zstd_compressor_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context);
static void bbstreamer_zstd_compressor_finalize(bbstreamer *streamer);
static void bbstreamer_zstd_compressor_free(bbstreamer *streamer);
static void astreamer_zstd_compressor_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context);
static void astreamer_zstd_compressor_finalize(astreamer *streamer);
static void astreamer_zstd_compressor_free(astreamer *streamer);
static const bbstreamer_ops bbstreamer_zstd_compressor_ops = {
.content = bbstreamer_zstd_compressor_content,
.finalize = bbstreamer_zstd_compressor_finalize,
.free = bbstreamer_zstd_compressor_free
static const astreamer_ops astreamer_zstd_compressor_ops = {
.content = astreamer_zstd_compressor_content,
.finalize = astreamer_zstd_compressor_finalize,
.free = astreamer_zstd_compressor_free
};
static void bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context);
static void bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer);
static void bbstreamer_zstd_decompressor_free(bbstreamer *streamer);
static void astreamer_zstd_decompressor_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context);
static void astreamer_zstd_decompressor_finalize(astreamer *streamer);
static void astreamer_zstd_decompressor_free(astreamer *streamer);
static const bbstreamer_ops bbstreamer_zstd_decompressor_ops = {
.content = bbstreamer_zstd_decompressor_content,
.finalize = bbstreamer_zstd_decompressor_finalize,
.free = bbstreamer_zstd_decompressor_free
static const astreamer_ops astreamer_zstd_decompressor_ops = {
.content = astreamer_zstd_decompressor_content,
.finalize = astreamer_zstd_decompressor_finalize,
.free = astreamer_zstd_decompressor_free
};
#endif
@ -62,19 +62,19 @@ static const bbstreamer_ops bbstreamer_zstd_decompressor_ops = {
* Create a new base backup streamer that performs zstd compression of tar
* blocks.
*/
bbstreamer *
bbstreamer_zstd_compressor_new(bbstreamer *next, pg_compress_specification *compress)
astreamer *
astreamer_zstd_compressor_new(astreamer *next, pg_compress_specification *compress)
{
#ifdef USE_ZSTD
bbstreamer_zstd_frame *streamer;
astreamer_zstd_frame *streamer;
size_t ret;
Assert(next != NULL);
streamer = palloc0(sizeof(bbstreamer_zstd_frame));
streamer = palloc0(sizeof(astreamer_zstd_frame));
*((const bbstreamer_ops **) &streamer->base.bbs_ops) =
&bbstreamer_zstd_compressor_ops;
*((const astreamer_ops **) &streamer->base.bbs_ops) =
&astreamer_zstd_compressor_ops;
streamer->base.bbs_next = next;
initStringInfo(&streamer->base.bbs_buffer);
@ -142,12 +142,12 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, pg_compress_specification *comp
* of output buffer to next streamer and empty the buffer.
*/
static void
bbstreamer_zstd_compressor_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context)
astreamer_zstd_compressor_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context)
{
bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
ZSTD_inBuffer inBuf = {data, len, 0};
while (inBuf.pos < inBuf.size)
@ -162,10 +162,10 @@ bbstreamer_zstd_compressor_content(bbstreamer *streamer,
if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
max_needed)
{
bbstreamer_content(mystreamer->base.bbs_next, member,
mystreamer->zstd_outBuf.dst,
mystreamer->zstd_outBuf.pos,
context);
astreamer_content(mystreamer->base.bbs_next, member,
mystreamer->zstd_outBuf.dst,
mystreamer->zstd_outBuf.pos,
context);
/* Reset the ZSTD output buffer. */
mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
@ -187,9 +187,9 @@ bbstreamer_zstd_compressor_content(bbstreamer *streamer,
* End-of-stream processing.
*/
static void
bbstreamer_zstd_compressor_finalize(bbstreamer *streamer)
astreamer_zstd_compressor_finalize(astreamer *streamer)
{
bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
size_t yet_to_flush;
do
@ -204,10 +204,10 @@ bbstreamer_zstd_compressor_finalize(bbstreamer *streamer)
if (mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos <
max_needed)
{
bbstreamer_content(mystreamer->base.bbs_next, NULL,
mystreamer->zstd_outBuf.dst,
mystreamer->zstd_outBuf.pos,
BBSTREAMER_UNKNOWN);
astreamer_content(mystreamer->base.bbs_next, NULL,
mystreamer->zstd_outBuf.dst,
mystreamer->zstd_outBuf.pos,
ASTREAMER_UNKNOWN);
/* Reset the ZSTD output buffer. */
mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
@ -227,23 +227,23 @@ bbstreamer_zstd_compressor_finalize(bbstreamer *streamer)
/* Make sure to pass any remaining bytes to the next streamer. */
if (mystreamer->zstd_outBuf.pos > 0)
bbstreamer_content(mystreamer->base.bbs_next, NULL,
mystreamer->zstd_outBuf.dst,
mystreamer->zstd_outBuf.pos,
BBSTREAMER_UNKNOWN);
astreamer_content(mystreamer->base.bbs_next, NULL,
mystreamer->zstd_outBuf.dst,
mystreamer->zstd_outBuf.pos,
ASTREAMER_UNKNOWN);
bbstreamer_finalize(mystreamer->base.bbs_next);
astreamer_finalize(mystreamer->base.bbs_next);
}
/*
* Free memory.
*/
static void
bbstreamer_zstd_compressor_free(bbstreamer *streamer)
astreamer_zstd_compressor_free(astreamer *streamer)
{
bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
bbstreamer_free(streamer->bbs_next);
astreamer_free(streamer->bbs_next);
ZSTD_freeCCtx(mystreamer->cctx);
pfree(streamer->bbs_buffer.data);
pfree(streamer);
@ -254,17 +254,17 @@ bbstreamer_zstd_compressor_free(bbstreamer *streamer)
* Create a new base backup streamer that performs decompression of zstd
* compressed blocks.
*/
bbstreamer *
bbstreamer_zstd_decompressor_new(bbstreamer *next)
astreamer *
astreamer_zstd_decompressor_new(astreamer *next)
{
#ifdef USE_ZSTD
bbstreamer_zstd_frame *streamer;
astreamer_zstd_frame *streamer;
Assert(next != NULL);
streamer = palloc0(sizeof(bbstreamer_zstd_frame));
*((const bbstreamer_ops **) &streamer->base.bbs_ops) =
&bbstreamer_zstd_decompressor_ops;
streamer = palloc0(sizeof(astreamer_zstd_frame));
*((const astreamer_ops **) &streamer->base.bbs_ops) =
&astreamer_zstd_decompressor_ops;
streamer->base.bbs_next = next;
initStringInfo(&streamer->base.bbs_buffer);
@ -293,12 +293,12 @@ bbstreamer_zstd_decompressor_new(bbstreamer *next)
* to the next streamer.
*/
static void
bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context)
astreamer_zstd_decompressor_content(astreamer *streamer,
astreamer_member *member,
const char *data, int len,
astreamer_archive_context context)
{
bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
ZSTD_inBuffer inBuf = {data, len, 0};
while (inBuf.pos < inBuf.size)
@ -311,10 +311,10 @@ bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
*/
if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
{
bbstreamer_content(mystreamer->base.bbs_next, member,
mystreamer->zstd_outBuf.dst,
mystreamer->zstd_outBuf.pos,
context);
astreamer_content(mystreamer->base.bbs_next, member,
mystreamer->zstd_outBuf.dst,
mystreamer->zstd_outBuf.pos,
context);
/* Reset the ZSTD output buffer. */
mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
@ -335,32 +335,32 @@ bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
* End-of-stream processing.
*/
static void
bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer)
astreamer_zstd_decompressor_finalize(astreamer *streamer)
{
bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
/*
* End of the stream, if there is some pending data in output buffers then
* we must forward it to next streamer.
*/
if (mystreamer->zstd_outBuf.pos > 0)
bbstreamer_content(mystreamer->base.bbs_next, NULL,
mystreamer->base.bbs_buffer.data,
mystreamer->base.bbs_buffer.maxlen,
BBSTREAMER_UNKNOWN);
astreamer_content(mystreamer->base.bbs_next, NULL,
mystreamer->base.bbs_buffer.data,
mystreamer->base.bbs_buffer.maxlen,
ASTREAMER_UNKNOWN);
bbstreamer_finalize(mystreamer->base.bbs_next);
astreamer_finalize(mystreamer->base.bbs_next);
}
/*
* Free memory.
*/
static void
bbstreamer_zstd_decompressor_free(bbstreamer *streamer)
astreamer_zstd_decompressor_free(astreamer *streamer)
{
bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
astreamer_zstd_frame *mystreamer = (astreamer_zstd_frame *) streamer;
bbstreamer_free(streamer->bbs_next);
astreamer_free(streamer->bbs_next);
ZSTD_freeDCtx(mystreamer->dctx);
pfree(streamer->bbs_buffer.data);
pfree(streamer);

View File

@ -1,226 +0,0 @@
/*-------------------------------------------------------------------------
*
* bbstreamer.h
*
* Each tar archive returned by the server is passed to one or more
* bbstreamer objects for further processing. The bbstreamer may do
* something simple, like write the archive to a file, perhaps after
* compressing it, but it can also do more complicated things, like
* annotating the byte stream to indicate which parts of the data
* correspond to tar headers or trailing padding, vs. which parts are
* payload data. A subsequent bbstreamer may use this information to
* make further decisions about how to process the data; for example,
* it might choose to modify the archive contents.
*
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/bin/pg_basebackup/bbstreamer.h
*-------------------------------------------------------------------------
*/
#ifndef BBSTREAMER_H
#define BBSTREAMER_H
#include "common/compression.h"
#include "lib/stringinfo.h"
#include "pqexpbuffer.h"
struct bbstreamer;
struct bbstreamer_ops;
typedef struct bbstreamer bbstreamer;
typedef struct bbstreamer_ops bbstreamer_ops;
/*
* Each chunk of archive data passed to a bbstreamer is classified into one
* of these categories. When data is first received from the remote server,
* each chunk will be categorized as BBSTREAMER_UNKNOWN, and the chunks will
* be of whatever size the remote server chose to send.
*
* If the archive is parsed (e.g. see bbstreamer_tar_parser_new()), then all
* chunks should be labelled as one of the other types listed here. In
* addition, there should be exactly one BBSTREAMER_MEMBER_HEADER chunk and
* exactly one BBSTREAMER_MEMBER_TRAILER chunk per archive member, even if
* that means a zero-length call. There can be any number of
* BBSTREAMER_MEMBER_CONTENTS chunks in between those calls. There
* should exactly BBSTREAMER_ARCHIVE_TRAILER chunk, and it should follow the
* last BBSTREAMER_MEMBER_TRAILER chunk.
*
* In theory, we could need other classifications here, such as a way of
* indicating an archive header, but the "tar" format doesn't need anything
* else, so for the time being there's no point.
*/
typedef enum
{
BBSTREAMER_UNKNOWN,
BBSTREAMER_MEMBER_HEADER,
BBSTREAMER_MEMBER_CONTENTS,
BBSTREAMER_MEMBER_TRAILER,
BBSTREAMER_ARCHIVE_TRAILER,
} bbstreamer_archive_context;
/*
* Each chunk of data that is classified as BBSTREAMER_MEMBER_HEADER,
* BBSTREAMER_MEMBER_CONTENTS, or BBSTREAMER_MEMBER_TRAILER should also
* pass a pointer to an instance of this struct. The details are expected
* to be present in the archive header and used to fill the struct, after
* which all subsequent calls for the same archive member are expected to
* pass the same details.
*/
typedef struct
{
char pathname[MAXPGPATH];
pgoff_t size;
mode_t mode;
uid_t uid;
gid_t gid;
bool is_directory;
bool is_link;
char linktarget[MAXPGPATH];
} bbstreamer_member;
/*
* Generally, each type of bbstreamer will define its own struct, but the
* first element should be 'bbstreamer base'. A bbstreamer that does not
* require any additional private data could use this structure directly.
*
* bbs_ops is a pointer to the bbstreamer_ops object which contains the
* function pointers appropriate to this type of bbstreamer.
*
* bbs_next is a pointer to the successor bbstreamer, for those types of
* bbstreamer which forward data to a successor. It need not be used and
* should be set to NULL when not relevant.
*
* bbs_buffer is a buffer for accumulating data for temporary storage. Each
* type of bbstreamer makes its own decisions about whether and how to use
* this buffer.
*/
struct bbstreamer
{
const bbstreamer_ops *bbs_ops;
bbstreamer *bbs_next;
StringInfoData bbs_buffer;
};
/*
* There are three callbacks for a bbstreamer. The 'content' callback is
* called repeatedly, as described in the bbstreamer_archive_context comments.
* Then, the 'finalize' callback is called once at the end, to give the
* bbstreamer a chance to perform cleanup such as closing files. Finally,
* because this code is running in a frontend environment where, as of this
* writing, there are no memory contexts, the 'free' callback is called to
* release memory. These callbacks should always be invoked using the static
* inline functions defined below.
*/
struct bbstreamer_ops
{
void (*content) (bbstreamer *streamer, bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context);
void (*finalize) (bbstreamer *streamer);
void (*free) (bbstreamer *streamer);
};
/* Send some content to a bbstreamer. */
static inline void
bbstreamer_content(bbstreamer *streamer, bbstreamer_member *member,
const char *data, int len,
bbstreamer_archive_context context)
{
Assert(streamer != NULL);
streamer->bbs_ops->content(streamer, member, data, len, context);
}
/* Finalize a bbstreamer. */
static inline void
bbstreamer_finalize(bbstreamer *streamer)
{
Assert(streamer != NULL);
streamer->bbs_ops->finalize(streamer);
}
/* Free a bbstreamer. */
static inline void
bbstreamer_free(bbstreamer *streamer)
{
Assert(streamer != NULL);
streamer->bbs_ops->free(streamer);
}
/*
* This is a convenience method for use when implementing a bbstreamer; it is
* not for use by outside callers. It adds the amount of data specified by
* 'nbytes' to the bbstreamer's buffer and adjusts '*len' and '*data'
* accordingly.
*/
static inline void
bbstreamer_buffer_bytes(bbstreamer *streamer, const char **data, int *len,
int nbytes)
{
Assert(nbytes <= *len);
appendBinaryStringInfo(&streamer->bbs_buffer, *data, nbytes);
*len -= nbytes;
*data += nbytes;
}
/*
* This is a convenience method for use when implementing a bbstreamer; it is
* not for use by outsider callers. It attempts to add enough data to the
* bbstreamer's buffer to reach a length of target_bytes and adjusts '*len'
* and '*data' accordingly. It returns true if the target length has been
* reached and false otherwise.
*/
static inline bool
bbstreamer_buffer_until(bbstreamer *streamer, const char **data, int *len,
int target_bytes)
{
int buflen = streamer->bbs_buffer.len;
if (buflen >= target_bytes)
{
/* Target length already reached; nothing to do. */
return true;
}
if (buflen + *len < target_bytes)
{
/* Not enough data to reach target length; buffer all of it. */
bbstreamer_buffer_bytes(streamer, data, len, *len);
return false;
}
/* Buffer just enough to reach the target length. */
bbstreamer_buffer_bytes(streamer, data, len, target_bytes - buflen);
return true;
}
/*
* Functions for creating bbstreamer objects of various types. See the header
* comments for each of these functions for details.
*/
extern bbstreamer *bbstreamer_plain_writer_new(char *pathname, FILE *file);
extern bbstreamer *bbstreamer_gzip_writer_new(char *pathname, FILE *file,
pg_compress_specification *compress);
extern bbstreamer *bbstreamer_extractor_new(const char *basepath,
const char *(*link_map) (const char *),
void (*report_output_file) (const char *));
extern bbstreamer *bbstreamer_gzip_decompressor_new(bbstreamer *next);
extern bbstreamer *bbstreamer_lz4_compressor_new(bbstreamer *next,
pg_compress_specification *compress);
extern bbstreamer *bbstreamer_lz4_decompressor_new(bbstreamer *next);
extern bbstreamer *bbstreamer_zstd_compressor_new(bbstreamer *next,
pg_compress_specification *compress);
extern bbstreamer *bbstreamer_zstd_decompressor_new(bbstreamer *next);
extern bbstreamer *bbstreamer_tar_parser_new(bbstreamer *next);
extern bbstreamer *bbstreamer_tar_terminator_new(bbstreamer *next);
extern bbstreamer *bbstreamer_tar_archiver_new(bbstreamer *next);
extern bbstreamer *bbstreamer_recovery_injector_new(bbstreamer *next,
bool is_recovery_guc_supported,
PQExpBuffer recoveryconfcontents);
extern void bbstreamer_inject_file(bbstreamer *streamer, char *pathname,
char *data, int len);
#endif

View File

@ -1,12 +1,12 @@
# Copyright (c) 2022-2024, PostgreSQL Global Development Group
common_sources = files(
'bbstreamer_file.c',
'bbstreamer_gzip.c',
'bbstreamer_inject.c',
'bbstreamer_lz4.c',
'bbstreamer_tar.c',
'bbstreamer_zstd.c',
'astreamer_file.c',
'astreamer_gzip.c',
'astreamer_inject.c',
'astreamer_lz4.c',
'astreamer_tar.c',
'astreamer_zstd.c',
'receivelog.c',
'streamutil.c',
'walmethods.c',

View File

@ -1,12 +1,12 @@
# src/bin/pg_basebackup/nls.mk
CATALOG_NAME = pg_basebackup
GETTEXT_FILES = $(FRONTEND_COMMON_GETTEXT_FILES) \
bbstreamer_file.c \
bbstreamer_gzip.c \
bbstreamer_inject.c \
bbstreamer_lz4.c \
bbstreamer_tar.c \
bbstreamer_zstd.c \
astreamer_file.c \
astreamer_gzip.c \
astreamer_inject.c \
astreamer_lz4.c \
astreamer_tar.c \
astreamer_zstd.c \
pg_basebackup.c \
pg_createsubscriber.c \
pg_receivewal.c \

View File

@ -26,8 +26,8 @@
#endif
#include "access/xlog_internal.h"
#include "astreamer.h"
#include "backup/basebackup.h"
#include "bbstreamer.h"
#include "common/compression.h"
#include "common/file_perm.h"
#include "common/file_utils.h"
@ -57,8 +57,8 @@ typedef struct ArchiveStreamState
{
int tablespacenum;
pg_compress_specification *compress;
bbstreamer *streamer;
bbstreamer *manifest_inject_streamer;
astreamer *streamer;
astreamer *manifest_inject_streamer;
PQExpBuffer manifest_buffer;
char manifest_filename[MAXPGPATH];
FILE *manifest_file;
@ -67,7 +67,7 @@ typedef struct ArchiveStreamState
typedef struct WriteTarState
{
int tablespacenum;
bbstreamer *streamer;
astreamer *streamer;
} WriteTarState;
typedef struct WriteManifestState
@ -199,11 +199,11 @@ static void verify_dir_is_empty_or_create(char *dirname, bool *created, bool *fo
static void progress_update_filename(const char *filename);
static void progress_report(int tablespacenum, bool force, bool finished);
static bbstreamer *CreateBackupStreamer(char *archive_name, char *spclocation,
bbstreamer **manifest_inject_streamer_p,
bool is_recovery_guc_supported,
bool expect_unterminated_tarfile,
pg_compress_specification *compress);
static astreamer *CreateBackupStreamer(char *archive_name, char *spclocation,
astreamer **manifest_inject_streamer_p,
bool is_recovery_guc_supported,
bool expect_unterminated_tarfile,
pg_compress_specification *compress);
static void ReceiveArchiveStreamChunk(size_t r, char *copybuf,
void *callback_data);
static char GetCopyDataByte(size_t r, char *copybuf, size_t *cursor);
@ -1053,19 +1053,19 @@ ReceiveCopyData(PGconn *conn, WriteDataCallback callback,
* the options selected by the user. We may just write the results directly
* to a file, or we might compress first, or we might extract the tar file
* and write each member separately. This function doesn't do any of that
* directly, but it works out what kind of bbstreamer we need to create so
* directly, but it works out what kind of astreamer we need to create so
* that the right stuff happens when, down the road, we actually receive
* the data.
*/
static bbstreamer *
static astreamer *
CreateBackupStreamer(char *archive_name, char *spclocation,
bbstreamer **manifest_inject_streamer_p,
astreamer **manifest_inject_streamer_p,
bool is_recovery_guc_supported,
bool expect_unterminated_tarfile,
pg_compress_specification *compress)
{
bbstreamer *streamer = NULL;
bbstreamer *manifest_inject_streamer = NULL;
astreamer *streamer = NULL;
astreamer *manifest_inject_streamer = NULL;
bool inject_manifest;
bool is_tar,
is_tar_gz,
@ -1160,9 +1160,9 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
directory = psprintf("%s/%s", basedir, spclocation);
else
directory = get_tablespace_mapping(spclocation);
streamer = bbstreamer_extractor_new(directory,
get_tablespace_mapping,
progress_update_filename);
streamer = astreamer_extractor_new(directory,
get_tablespace_mapping,
progress_update_filename);
}
else
{
@ -1188,27 +1188,27 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
}
if (compress->algorithm == PG_COMPRESSION_NONE)
streamer = bbstreamer_plain_writer_new(archive_filename,
archive_file);
streamer = astreamer_plain_writer_new(archive_filename,
archive_file);
else if (compress->algorithm == PG_COMPRESSION_GZIP)
{
strlcat(archive_filename, ".gz", sizeof(archive_filename));
streamer = bbstreamer_gzip_writer_new(archive_filename,
archive_file, compress);
streamer = astreamer_gzip_writer_new(archive_filename,
archive_file, compress);
}
else if (compress->algorithm == PG_COMPRESSION_LZ4)
{
strlcat(archive_filename, ".lz4", sizeof(archive_filename));
streamer = bbstreamer_plain_writer_new(archive_filename,
archive_file);
streamer = bbstreamer_lz4_compressor_new(streamer, compress);
streamer = astreamer_plain_writer_new(archive_filename,
archive_file);
streamer = astreamer_lz4_compressor_new(streamer, compress);
}
else if (compress->algorithm == PG_COMPRESSION_ZSTD)
{
strlcat(archive_filename, ".zst", sizeof(archive_filename));
streamer = bbstreamer_plain_writer_new(archive_filename,
archive_file);
streamer = bbstreamer_zstd_compressor_new(streamer, compress);
streamer = astreamer_plain_writer_new(archive_filename,
archive_file);
streamer = astreamer_zstd_compressor_new(streamer, compress);
}
else
{
@ -1222,7 +1222,7 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
* into it.
*/
if (must_parse_archive)
streamer = bbstreamer_tar_archiver_new(streamer);
streamer = astreamer_tar_archiver_new(streamer);
progress_update_filename(archive_filename);
}
@ -1241,9 +1241,9 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
if (spclocation == NULL && writerecoveryconf)
{
Assert(must_parse_archive);
streamer = bbstreamer_recovery_injector_new(streamer,
is_recovery_guc_supported,
recoveryconfcontents);
streamer = astreamer_recovery_injector_new(streamer,
is_recovery_guc_supported,
recoveryconfcontents);
}
/*
@ -1253,9 +1253,9 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
* we're talking to such a server we'll need to add the terminator here.
*/
if (must_parse_archive)
streamer = bbstreamer_tar_parser_new(streamer);
streamer = astreamer_tar_parser_new(streamer);
else if (expect_unterminated_tarfile)
streamer = bbstreamer_tar_terminator_new(streamer);
streamer = astreamer_tar_terminator_new(streamer);
/*
* If the user has requested a server compressed archive along with
@ -1264,11 +1264,11 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
if (format == 'p')
{
if (is_tar_gz)
streamer = bbstreamer_gzip_decompressor_new(streamer);
streamer = astreamer_gzip_decompressor_new(streamer);
else if (is_tar_lz4)
streamer = bbstreamer_lz4_decompressor_new(streamer);
streamer = astreamer_lz4_decompressor_new(streamer);
else if (is_tar_zstd)
streamer = bbstreamer_zstd_decompressor_new(streamer);
streamer = astreamer_zstd_decompressor_new(streamer);
}
/* Return the results. */
@ -1307,10 +1307,10 @@ ReceiveArchiveStream(PGconn *conn, pg_compress_specification *compress)
if (state.manifest_inject_streamer != NULL &&
state.manifest_buffer != NULL)
{
bbstreamer_inject_file(state.manifest_inject_streamer,
"backup_manifest",
state.manifest_buffer->data,
state.manifest_buffer->len);
astreamer_inject_file(state.manifest_inject_streamer,
"backup_manifest",
state.manifest_buffer->data,
state.manifest_buffer->len);
destroyPQExpBuffer(state.manifest_buffer);
state.manifest_buffer = NULL;
}
@ -1318,8 +1318,8 @@ ReceiveArchiveStream(PGconn *conn, pg_compress_specification *compress)
/* If there's still an archive in progress, end processing. */
if (state.streamer != NULL)
{
bbstreamer_finalize(state.streamer);
bbstreamer_free(state.streamer);
astreamer_finalize(state.streamer);
astreamer_free(state.streamer);
state.streamer = NULL;
}
}
@ -1383,8 +1383,8 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
/* End processing of any prior archive. */
if (state->streamer != NULL)
{
bbstreamer_finalize(state->streamer);
bbstreamer_free(state->streamer);
astreamer_finalize(state->streamer);
astreamer_free(state->streamer);
state->streamer = NULL;
}
@ -1437,8 +1437,8 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
else if (state->streamer != NULL)
{
/* Archive data. */
bbstreamer_content(state->streamer, NULL, copybuf + 1,
r - 1, BBSTREAMER_UNKNOWN);
astreamer_content(state->streamer, NULL, copybuf + 1,
r - 1, ASTREAMER_UNKNOWN);
}
else
pg_fatal("unexpected payload data");
@ -1600,7 +1600,7 @@ ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation,
bool tablespacenum, pg_compress_specification *compress)
{
WriteTarState state;
bbstreamer *manifest_inject_streamer;
astreamer *manifest_inject_streamer;
bool is_recovery_guc_supported;
bool expect_unterminated_tarfile;
@ -1636,16 +1636,16 @@ ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation,
pg_fatal("out of memory");
/* Inject it into the output tarfile. */
bbstreamer_inject_file(manifest_inject_streamer, "backup_manifest",
buf.data, buf.len);
astreamer_inject_file(manifest_inject_streamer, "backup_manifest",
buf.data, buf.len);
/* Free memory. */
termPQExpBuffer(&buf);
}
/* Cleanup. */
bbstreamer_finalize(state.streamer);
bbstreamer_free(state.streamer);
astreamer_finalize(state.streamer);
astreamer_free(state.streamer);
progress_report(tablespacenum, true, false);
@ -1663,7 +1663,7 @@ ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data)
{
WriteTarState *state = callback_data;
bbstreamer_content(state->streamer, NULL, copybuf, r, BBSTREAMER_UNKNOWN);
astreamer_content(state->streamer, NULL, copybuf, r, ASTREAMER_UNKNOWN);
totaldone += r;
progress_report(state->tablespacenum, false, false);

View File

@ -3317,19 +3317,19 @@ bbsink_shell
bbsink_state
bbsink_throttle
bbsink_zstd
bbstreamer
bbstreamer_archive_context
bbstreamer_extractor
bbstreamer_gzip_decompressor
bbstreamer_gzip_writer
bbstreamer_lz4_frame
bbstreamer_member
bbstreamer_ops
bbstreamer_plain_writer
bbstreamer_recovery_injector
bbstreamer_tar_archiver
bbstreamer_tar_parser
bbstreamer_zstd_frame
astreamer
astreamer_archive_context
astreamer_extractor
astreamer_gzip_decompressor
astreamer_gzip_writer
astreamer_lz4_frame
astreamer_member
astreamer_ops
astreamer_plain_writer
astreamer_recovery_injector
astreamer_tar_archiver
astreamer_tar_parser
astreamer_zstd_frame
bgworker_main_type
bh_node_type
binaryheap