Add LZ4 compression to pg_dump

Expand pg_dump's compression streaming and file APIs to support the lz4
algorithm. The newly added compress_lz4.{c,h} files cover all the
functionality of the aforementioned APIs. Minor changes were necessary
in various pg_backup_* files, where code for the 'lz4' file suffix has
been added, as well as pg_dump's compression option parsing.

Author: Georgios Kokolatos
Reviewed-by: Michael Paquier, Rachel Heaton, Justin Pryzby, Shi Yu, Tomas Vondra
Discussion: https://postgr.es/m/faUNEOpts9vunEaLnmxmG-DldLSg_ql137OC3JYDmgrOMHm1RvvWY2IdBkv_CRxm5spCCb_OmKNk2T03TMm0fBEWveFF9wA1WizPuAgB7Ss%3D%40protonmail.com
This commit is contained in:
Tomas Vondra 2023-02-23 21:19:19 +01:00
parent e0b3074e89
commit 0da243fed0
12 changed files with 782 additions and 22 deletions

View File

@ -330,9 +330,10 @@ PostgreSQL documentation
machine-readable format that <application>pg_restore</application> machine-readable format that <application>pg_restore</application>
can read. A directory format archive can be manipulated with can read. A directory format archive can be manipulated with
standard Unix tools; for example, files in an uncompressed archive standard Unix tools; for example, files in an uncompressed archive
can be compressed with the <application>gzip</application> tool. can be compressed with the <application>gzip</application> or
This format is compressed by default and also supports parallel <application>lz4</application>tool.
dumps. This format is compressed by default using <literal>gzip</literal>
and also supports parallel dumps.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
@ -654,7 +655,7 @@ PostgreSQL documentation
<para> <para>
Specify the compression method and/or the compression level to use. Specify the compression method and/or the compression level to use.
The compression method can be set to <literal>gzip</literal> or The compression method can be set to <literal>gzip</literal> or
<literal>none</literal> for no compression. <literal>lz4</literal> or <literal>none</literal> for no compression.
A compression detail string can optionally be specified. If the A compression detail string can optionally be specified. If the
detail string is an integer, it specifies the compression level. detail string is an integer, it specifies the compression level.
Otherwise, it should be a comma-separated list of items, each of the Otherwise, it should be a comma-separated list of items, each of the
@ -675,8 +676,8 @@ PostgreSQL documentation
individual table-data segments, and the default is to compress using individual table-data segments, and the default is to compress using
<literal>gzip</literal> at a moderate level. For plain text output, <literal>gzip</literal> at a moderate level. For plain text output,
setting a nonzero compression level causes the entire output file to be compressed, setting a nonzero compression level causes the entire output file to be compressed,
as though it had been fed through <application>gzip</application>; but the default as though it had been fed through <application>gzip</application> or
is not to compress. <application>lz4</application>; but the default is not to compress.
</para> </para>
<para> <para>
The tar archive format currently does not support compression at all. The tar archive format currently does not support compression at all.

View File

@ -17,6 +17,7 @@ top_builddir = ../../..
include $(top_builddir)/src/Makefile.global include $(top_builddir)/src/Makefile.global
export GZIP_PROGRAM=$(GZIP) export GZIP_PROGRAM=$(GZIP)
export LZ4
export with_icu export with_icu
override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS) override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
@ -26,6 +27,7 @@ OBJS = \
$(WIN32RES) \ $(WIN32RES) \
compress_gzip.o \ compress_gzip.o \
compress_io.o \ compress_io.o \
compress_lz4.o \
compress_none.o \ compress_none.o \
dumputils.o \ dumputils.o \
parallel.o \ parallel.o \

View File

@ -53,7 +53,7 @@
* InitDiscoverCompressFileHandle tries to infer the compression by the * InitDiscoverCompressFileHandle tries to infer the compression by the
* filename suffix. If the suffix is not yet known then it tries to simply * filename suffix. If the suffix is not yet known then it tries to simply
* open the file and if it fails, it tries to open the same file with the .gz * open the file and if it fails, it tries to open the same file with the .gz
* suffix. * suffix, and then again with the .lz4 suffix.
* *
* IDENTIFICATION * IDENTIFICATION
* src/bin/pg_dump/compress_io.c * src/bin/pg_dump/compress_io.c
@ -67,6 +67,7 @@
#include "compress_gzip.h" #include "compress_gzip.h"
#include "compress_io.h" #include "compress_io.h"
#include "compress_lz4.h"
#include "compress_none.h" #include "compress_none.h"
#include "pg_backup_utils.h" #include "pg_backup_utils.h"
@ -93,6 +94,10 @@ supports_compression(const pg_compress_specification compression_spec)
if (algorithm == PG_COMPRESSION_GZIP) if (algorithm == PG_COMPRESSION_GZIP)
supported = true; supported = true;
#endif #endif
#ifdef USE_LZ4
if (algorithm == PG_COMPRESSION_LZ4)
supported = true;
#endif
if (!supported) if (!supported)
return psprintf("this build does not support compression with %s", return psprintf("this build does not support compression with %s",
@ -123,6 +128,8 @@ AllocateCompressor(const pg_compress_specification compression_spec,
InitCompressorNone(cs, compression_spec); InitCompressorNone(cs, compression_spec);
else if (compression_spec.algorithm == PG_COMPRESSION_GZIP) else if (compression_spec.algorithm == PG_COMPRESSION_GZIP)
InitCompressorGzip(cs, compression_spec); InitCompressorGzip(cs, compression_spec);
else if (compression_spec.algorithm == PG_COMPRESSION_LZ4)
InitCompressorLZ4(cs, compression_spec);
return cs; return cs;
} }
@ -187,6 +194,8 @@ InitCompressFileHandle(const pg_compress_specification compression_spec)
InitCompressFileHandleNone(CFH, compression_spec); InitCompressFileHandleNone(CFH, compression_spec);
else if (compression_spec.algorithm == PG_COMPRESSION_GZIP) else if (compression_spec.algorithm == PG_COMPRESSION_GZIP)
InitCompressFileHandleGzip(CFH, compression_spec); InitCompressFileHandleGzip(CFH, compression_spec);
else if (compression_spec.algorithm == PG_COMPRESSION_LZ4)
InitCompressFileHandleLZ4(CFH, compression_spec);
return CFH; return CFH;
} }
@ -196,11 +205,11 @@ InitCompressFileHandle(const pg_compress_specification compression_spec)
* be either "r" or "rb". * be either "r" or "rb".
* *
* If the file at 'path' contains the suffix of a supported compression method, * If the file at 'path' contains the suffix of a supported compression method,
* currently this includes only ".gz", then this compression will be used * currently this includes ".gz" and ".lz4", then this compression will be used
* throughout. Otherwise the compression will be inferred by iteratively trying * throughout. Otherwise the compression will be inferred by iteratively trying
* to open the file at 'path', first as is, then by appending known compression * to open the file at 'path', first as is, then by appending known compression
* suffixes. So if you pass "foo" as 'path', this will open either "foo" or * suffixes. So if you pass "foo" as 'path', this will open either "foo" or
* "foo.gz", trying in that order. * "foo.gz" or "foo.lz4", trying in that order.
* *
* On failure, return NULL with an error code in errno. * On failure, return NULL with an error code in errno.
*/ */
@ -238,6 +247,17 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode)
if (exists) if (exists)
compression_spec.algorithm = PG_COMPRESSION_GZIP; compression_spec.algorithm = PG_COMPRESSION_GZIP;
} }
#endif
#ifdef USE_LZ4
if (!exists)
{
free_keep_errno(fname);
fname = psprintf("%s.lz4", path);
exists = (stat(fname, &st) == 0);
if (exists)
compression_spec.algorithm = PG_COMPRESSION_LZ4;
}
#endif #endif
} }

View File

@ -0,0 +1,626 @@
/*-------------------------------------------------------------------------
*
* compress_lz4.c
* Routines for archivers to write a LZ4 compressed data stream.
*
* Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/bin/pg_dump/compress_lz4.c
*
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include "pg_backup_utils.h"
#include "compress_lz4.h"
#ifdef USE_LZ4
#include <lz4.h>
#include <lz4frame.h>
#define LZ4_OUT_SIZE (4 * 1024)
#define LZ4_IN_SIZE (16 * 1024)
/*
* LZ4F_HEADER_SIZE_MAX first appeared in v1.7.5 of the library.
* Redefine it for installations with a lesser version.
*/
#ifndef LZ4F_HEADER_SIZE_MAX
#define LZ4F_HEADER_SIZE_MAX 32
#endif
/*----------------------
* Compressor API
*----------------------
*/
typedef struct LZ4CompressorState
{
char *outbuf;
size_t outsize;
} LZ4CompressorState;
/* Private routines that support LZ4 compressed data I/O */
static void ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs);
static void WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
const void *data, size_t dLen);
static void EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs);
static void
ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs)
{
LZ4_streamDecode_t lz4StreamDecode;
char *buf;
char *decbuf;
size_t buflen;
size_t cnt;
buflen = LZ4_IN_SIZE;
buf = pg_malloc(buflen);
decbuf = pg_malloc(buflen);
LZ4_setStreamDecode(&lz4StreamDecode, NULL, 0);
while ((cnt = cs->readF(AH, &buf, &buflen)))
{
int decBytes = LZ4_decompress_safe_continue(&lz4StreamDecode,
buf, decbuf,
cnt, buflen);
ahwrite(decbuf, 1, decBytes, AH);
}
pg_free(buf);
pg_free(decbuf);
}
static void
WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
const void *data, size_t dLen)
{
LZ4CompressorState *LZ4cs = (LZ4CompressorState *) cs->private_data;
size_t compressed;
size_t requiredsize = LZ4_compressBound(dLen);
if (requiredsize > LZ4cs->outsize)
{
LZ4cs->outbuf = pg_realloc(LZ4cs->outbuf, requiredsize);
LZ4cs->outsize = requiredsize;
}
compressed = LZ4_compress_default(data, LZ4cs->outbuf,
dLen, LZ4cs->outsize);
if (compressed <= 0)
pg_fatal("failed to LZ4 compress data");
cs->writeF(AH, LZ4cs->outbuf, compressed);
}
static void
EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs)
{
LZ4CompressorState *LZ4cs;
LZ4cs = (LZ4CompressorState *) cs->private_data;
if (LZ4cs)
{
pg_free(LZ4cs->outbuf);
pg_free(LZ4cs);
cs->private_data = NULL;
}
}
/*
* Public routines that support LZ4 compressed data I/O
*/
void
InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compression_spec)
{
cs->readData = ReadDataFromArchiveLZ4;
cs->writeData = WriteDataToArchiveLZ4;
cs->end = EndCompressorLZ4;
cs->compression_spec = compression_spec;
/* Will be lazy init'd */
cs->private_data = pg_malloc0(sizeof(LZ4CompressorState));
}
/*----------------------
* Compress File API
*----------------------
*/
/*
* State needed for LZ4 (de)compression using the CompressFileHandle API.
*/
typedef struct LZ4File
{
FILE *fp;
LZ4F_preferences_t prefs;
LZ4F_compressionContext_t ctx;
LZ4F_decompressionContext_t dtx;
bool inited;
bool compressing;
size_t buflen;
char *buffer;
size_t overflowalloclen;
size_t overflowlen;
char *overflowbuf;
size_t errcode;
} LZ4File;
/*
* LZ4 equivalent to feof() or gzeof(). The end of file is reached if there
* is no decompressed output in the overflow buffer and the end of the file
* is reached.
*/
static int
LZ4File_eof(CompressFileHandle *CFH)
{
LZ4File *fs = (LZ4File *) CFH->private_data;
return fs->overflowlen == 0 && feof(fs->fp);
}
static const char *
LZ4File_get_error(CompressFileHandle *CFH)
{
LZ4File *fs = (LZ4File *) CFH->private_data;
const char *errmsg;
if (LZ4F_isError(fs->errcode))
errmsg = LZ4F_getErrorName(fs->errcode);
else
errmsg = strerror(errno);
return errmsg;
}
/*
* Prepare an already alloc'ed LZ4File struct for subsequent calls.
*
* It creates the necessary contexts for the operations. When compressing,
* it additionally writes the LZ4 header in the output stream.
*/
static int
LZ4File_init(LZ4File *fs, int size, bool compressing)
{
size_t status;
if (fs->inited)
return 0;
fs->compressing = compressing;
fs->inited = true;
if (fs->compressing)
{
fs->buflen = LZ4F_compressBound(LZ4_IN_SIZE, &fs->prefs);
if (fs->buflen < LZ4F_HEADER_SIZE_MAX)
fs->buflen = LZ4F_HEADER_SIZE_MAX;
status = LZ4F_createCompressionContext(&fs->ctx, LZ4F_VERSION);
if (LZ4F_isError(status))
{
fs->errcode = status;
return 1;
}
fs->buffer = pg_malloc(fs->buflen);
status = LZ4F_compressBegin(fs->ctx, fs->buffer, fs->buflen,
&fs->prefs);
if (LZ4F_isError(status))
{
fs->errcode = status;
return 1;
}
if (fwrite(fs->buffer, 1, status, fs->fp) != status)
{
errno = (errno) ? errno : ENOSPC;
return 1;
}
}
else
{
status = LZ4F_createDecompressionContext(&fs->dtx, LZ4F_VERSION);
if (LZ4F_isError(status))
{
fs->errcode = status;
return 1;
}
fs->buflen = size > LZ4_OUT_SIZE ? size : LZ4_OUT_SIZE;
fs->buffer = pg_malloc(fs->buflen);
fs->overflowalloclen = fs->buflen;
fs->overflowbuf = pg_malloc(fs->overflowalloclen);
fs->overflowlen = 0;
}
return 0;
}
/*
* Read already decompressed content from the overflow buffer into 'ptr' up to
* 'size' bytes, if available. If the eol_flag is set, then stop at the first
* occurrence of the new line char prior to 'size' bytes.
*
* Any unread content in the overflow buffer is moved to the beginning.
*/
static int
LZ4File_read_overflow(LZ4File *fs, void *ptr, int size, bool eol_flag)
{
char *p;
int readlen = 0;
if (fs->overflowlen == 0)
return 0;
if (fs->overflowlen >= size)
readlen = size;
else
readlen = fs->overflowlen;
if (eol_flag && (p = memchr(fs->overflowbuf, '\n', readlen)))
/* Include the line terminating char */
readlen = p - fs->overflowbuf + 1;
memcpy(ptr, fs->overflowbuf, readlen);
fs->overflowlen -= readlen;
if (fs->overflowlen > 0)
memmove(fs->overflowbuf, fs->overflowbuf + readlen, fs->overflowlen);
return readlen;
}
/*
* The workhorse for reading decompressed content out of an LZ4 compressed
* stream.
*
* It will read up to 'ptrsize' decompressed content, or up to the new line
* char if found first when the eol_flag is set. It is possible that the
* decompressed output generated by reading any compressed input via the
* LZ4F API, exceeds 'ptrsize'. Any exceeding decompressed content is stored
* at an overflow buffer within LZ4File. Of course, when the function is
* called, it will first try to consume any decompressed content already
* present in the overflow buffer, before decompressing new content.
*/
static int
LZ4File_read_internal(LZ4File *fs, void *ptr, int ptrsize, bool eol_flag)
{
size_t dsize = 0;
size_t rsize;
size_t size = ptrsize;
bool eol_found = false;
void *readbuf;
/* Lazy init */
if (LZ4File_init(fs, size, false /* decompressing */ ))
return -1;
/* Verify that there is enough space in the outbuf */
if (size > fs->buflen)
{
fs->buflen = size;
fs->buffer = pg_realloc(fs->buffer, size);
}
/* use already decompressed content if available */
dsize = LZ4File_read_overflow(fs, ptr, size, eol_flag);
if (dsize == size || (eol_flag && memchr(ptr, '\n', dsize)))
return dsize;
readbuf = pg_malloc(size);
do
{
char *rp;
char *rend;
rsize = fread(readbuf, 1, size, fs->fp);
if (rsize < size && !feof(fs->fp))
return -1;
rp = (char *) readbuf;
rend = (char *) readbuf + rsize;
while (rp < rend)
{
size_t status;
size_t outlen = fs->buflen;
size_t read_remain = rend - rp;
memset(fs->buffer, 0, outlen);
status = LZ4F_decompress(fs->dtx, fs->buffer, &outlen,
rp, &read_remain, NULL);
if (LZ4F_isError(status))
{
fs->errcode = status;
return -1;
}
rp += read_remain;
/*
* fill in what space is available in ptr if the eol flag is set,
* either skip if one already found or fill up to EOL if present
* in the outbuf
*/
if (outlen > 0 && dsize < size && eol_found == false)
{
char *p;
size_t lib = (!eol_flag) ? size - dsize : size - 1 - dsize;
size_t len = outlen < lib ? outlen : lib;
if (eol_flag &&
(p = memchr(fs->buffer, '\n', outlen)) &&
(size_t) (p - fs->buffer + 1) <= len)
{
len = p - fs->buffer + 1;
eol_found = true;
}
memcpy((char *) ptr + dsize, fs->buffer, len);
dsize += len;
/* move what did not fit, if any, at the beginning of the buf */
if (len < outlen)
memmove(fs->buffer, fs->buffer + len, outlen - len);
outlen -= len;
}
/* if there is available output, save it */
if (outlen > 0)
{
while (fs->overflowlen + outlen > fs->overflowalloclen)
{
fs->overflowalloclen *= 2;
fs->overflowbuf = pg_realloc(fs->overflowbuf,
fs->overflowalloclen);
}
memcpy(fs->overflowbuf + fs->overflowlen, fs->buffer, outlen);
fs->overflowlen += outlen;
}
}
} while (rsize == size && dsize < size && eol_found == 0);
pg_free(readbuf);
return (int) dsize;
}
/*
* Compress size bytes from ptr and write them to the stream.
*/
static size_t
LZ4File_write(const void *ptr, size_t size, CompressFileHandle *CFH)
{
LZ4File *fs = (LZ4File *) CFH->private_data;
size_t status;
int remaining = size;
/* Lazy init */
if (LZ4File_init(fs, size, true))
return -1;
while (remaining > 0)
{
int chunk = remaining < LZ4_IN_SIZE ? remaining : LZ4_IN_SIZE;
remaining -= chunk;
status = LZ4F_compressUpdate(fs->ctx, fs->buffer, fs->buflen,
ptr, chunk, NULL);
if (LZ4F_isError(status))
{
fs->errcode = status;
return -1;
}
if (fwrite(fs->buffer, 1, status, fs->fp) != status)
{
errno = (errno) ? errno : ENOSPC;
return 1;
}
}
return size;
}
/*
* fread() equivalent implementation for LZ4 compressed files.
*/
static size_t
LZ4File_read(void *ptr, size_t size, CompressFileHandle *CFH)
{
LZ4File *fs = (LZ4File *) CFH->private_data;
int ret;
ret = LZ4File_read_internal(fs, ptr, size, false);
if (ret != size && !LZ4File_eof(CFH))
pg_fatal("could not read from input file: %s", LZ4File_get_error(CFH));
return ret;
}
/*
* fgetc() equivalent implementation for LZ4 compressed files.
*/
static int
LZ4File_getc(CompressFileHandle *CFH)
{
LZ4File *fs = (LZ4File *) CFH->private_data;
unsigned char c;
if (LZ4File_read_internal(fs, &c, 1, false) != 1)
{
if (!LZ4File_eof(CFH))
pg_fatal("could not read from input file: %s", LZ4File_get_error(CFH));
else
pg_fatal("could not read from input file: end of file");
}
return c;
}
/*
* fgets() equivalent implementation for LZ4 compressed files.
*/
static char *
LZ4File_gets(char *ptr, int size, CompressFileHandle *CFH)
{
LZ4File *fs = (LZ4File *) CFH->private_data;
size_t dsize;
dsize = LZ4File_read_internal(fs, ptr, size, true);
if (dsize < 0)
pg_fatal("could not read from input file: %s", LZ4File_get_error(CFH));
/* Done reading */
if (dsize == 0)
return NULL;
return ptr;
}
/*
* Finalize (de)compression of a stream. When compressing it will write any
* remaining content and/or generated footer from the LZ4 API.
*/
static int
LZ4File_close(CompressFileHandle *CFH)
{
FILE *fp;
LZ4File *fs = (LZ4File *) CFH->private_data;
size_t status;
int ret;
fp = fs->fp;
if (fs->inited)
{
if (fs->compressing)
{
status = LZ4F_compressEnd(fs->ctx, fs->buffer, fs->buflen, NULL);
if (LZ4F_isError(status))
pg_fatal("failed to end compression: %s",
LZ4F_getErrorName(status));
else if ((ret = fwrite(fs->buffer, 1, status, fs->fp)) != status)
{
errno = (errno) ? errno : ENOSPC;
WRITE_ERROR_EXIT;
}
status = LZ4F_freeCompressionContext(fs->ctx);
if (LZ4F_isError(status))
pg_fatal("failed to end compression: %s",
LZ4F_getErrorName(status));
}
else
{
status = LZ4F_freeDecompressionContext(fs->dtx);
if (LZ4F_isError(status))
pg_fatal("failed to end decompression: %s",
LZ4F_getErrorName(status));
pg_free(fs->overflowbuf);
}
pg_free(fs->buffer);
}
pg_free(fs);
return fclose(fp);
}
static int
LZ4File_open(const char *path, int fd, const char *mode,
CompressFileHandle *CFH)
{
FILE *fp;
LZ4File *lz4fp = (LZ4File *) CFH->private_data;
if (fd >= 0)
fp = fdopen(fd, mode);
else
fp = fopen(path, mode);
if (fp == NULL)
{
lz4fp->errcode = errno;
return 1;
}
lz4fp->fp = fp;
return 0;
}
static int
LZ4File_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
{
char *fname;
int ret;
fname = psprintf("%s.lz4", path);
ret = CFH->open_func(fname, -1, mode, CFH);
pg_free(fname);
return ret;
}
/*
* Public routines
*/
void
InitCompressFileHandleLZ4(CompressFileHandle *CFH,
const pg_compress_specification compression_spec)
{
LZ4File *lz4fp;
CFH->open_func = LZ4File_open;
CFH->open_write_func = LZ4File_open_write;
CFH->read_func = LZ4File_read;
CFH->write_func = LZ4File_write;
CFH->gets_func = LZ4File_gets;
CFH->getc_func = LZ4File_getc;
CFH->eof_func = LZ4File_eof;
CFH->close_func = LZ4File_close;
CFH->get_error_func = LZ4File_get_error;
CFH->compression_spec = compression_spec;
lz4fp = pg_malloc0(sizeof(*lz4fp));
if (CFH->compression_spec.level >= 0)
lz4fp->prefs.compressionLevel = CFH->compression_spec.level;
CFH->private_data = lz4fp;
}
#else /* USE_LZ4 */
void
InitCompressorLZ4(CompressorState *cs,
const pg_compress_specification compression_spec)
{
pg_fatal("this build does not support compression with %s", "LZ4");
}
void
InitCompressFileHandleLZ4(CompressFileHandle *CFH,
const pg_compress_specification compression_spec)
{
pg_fatal("this build does not support compression with %s", "LZ4");
}
#endif /* USE_LZ4 */

View File

@ -0,0 +1,24 @@
/*-------------------------------------------------------------------------
*
* compress_lz4.h
* LZ4 interface to compress_io.c routines
*
* Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/bin/pg_dump/compress_lz4.h
*
*-------------------------------------------------------------------------
*/
#ifndef _COMPRESS_LZ4_H_
#define _COMPRESS_LZ4_H_
#include "compress_io.h"
extern void InitCompressorLZ4(CompressorState *cs,
const pg_compress_specification compression_spec);
extern void InitCompressFileHandleLZ4(CompressFileHandle *CFH,
const pg_compress_specification compression_spec);
#endif /* _COMPRESS_LZ4_H_ */

View File

@ -3,6 +3,7 @@
pg_dump_common_sources = files( pg_dump_common_sources = files(
'compress_gzip.c', 'compress_gzip.c',
'compress_io.c', 'compress_io.c',
'compress_lz4.c',
'compress_none.c', 'compress_none.c',
'dumputils.c', 'dumputils.c',
'parallel.c', 'parallel.c',
@ -18,7 +19,7 @@ pg_dump_common_sources = files(
pg_dump_common = static_library('libpgdump_common', pg_dump_common = static_library('libpgdump_common',
pg_dump_common_sources, pg_dump_common_sources,
c_pch: pch_postgres_fe_h, c_pch: pch_postgres_fe_h,
dependencies: [frontend_code, libpq, zlib], dependencies: [frontend_code, libpq, lz4, zlib],
kwargs: internal_lib_args, kwargs: internal_lib_args,
) )
@ -86,7 +87,10 @@ tests += {
'sd': meson.current_source_dir(), 'sd': meson.current_source_dir(),
'bd': meson.current_build_dir(), 'bd': meson.current_build_dir(),
'tap': { 'tap': {
'env': {'GZIP_PROGRAM': gzip.path()}, 'env': {
'GZIP_PROGRAM': gzip.path(),
'LZ4': program_lz4.found() ? program_lz4.path() : '',
},
'tests': [ 'tests': [
't/001_basic.pl', 't/001_basic.pl',
't/002_pg_dump.pl', 't/002_pg_dump.pl',

View File

@ -2075,7 +2075,7 @@ _discoverArchiveFormat(ArchiveHandle *AH)
/* /*
* Check if the specified archive is a directory. If so, check if * Check if the specified archive is a directory. If so, check if
* there's a "toc.dat" (or "toc.dat.gz") file in it. * there's a "toc.dat" (or "toc.dat.{gz,lz4}") file in it.
*/ */
if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode)) if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
{ {
@ -2085,6 +2085,10 @@ _discoverArchiveFormat(ArchiveHandle *AH)
#ifdef HAVE_LIBZ #ifdef HAVE_LIBZ
if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz")) if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz"))
return AH->format; return AH->format;
#endif
#ifdef USE_LZ4
if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4"))
return AH->format;
#endif #endif
pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)", pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
AH->fSpec); AH->fSpec);

View File

@ -779,10 +779,13 @@ _PrepParallelRestore(ArchiveHandle *AH)
if (stat(fname, &st) == 0) if (stat(fname, &st) == 0)
te->dataLength = st.st_size; te->dataLength = st.st_size;
else else if (AH->compression_spec.algorithm != PG_COMPRESSION_NONE)
{ {
/* It might be compressed */ if (AH->compression_spec.algorithm == PG_COMPRESSION_GZIP)
strlcat(fname, ".gz", sizeof(fname)); strlcat(fname, ".gz", sizeof(fname));
else if (AH->compression_spec.algorithm == PG_COMPRESSION_LZ4)
strlcat(fname, ".lz4", sizeof(fname));
if (stat(fname, &st) == 0) if (stat(fname, &st) == 0)
te->dataLength = st.st_size; te->dataLength = st.st_size;
} }

View File

@ -715,13 +715,12 @@ main(int argc, char **argv)
case PG_COMPRESSION_NONE: case PG_COMPRESSION_NONE:
/* fallthrough */ /* fallthrough */
case PG_COMPRESSION_GZIP: case PG_COMPRESSION_GZIP:
/* fallthrough */
case PG_COMPRESSION_LZ4:
break; break;
case PG_COMPRESSION_ZSTD: case PG_COMPRESSION_ZSTD:
pg_fatal("compression with %s is not yet supported", "ZSTD"); pg_fatal("compression with %s is not yet supported", "ZSTD");
break; break;
case PG_COMPRESSION_LZ4:
pg_fatal("compression with %s is not yet supported", "LZ4");
break;
} }
/* /*

View File

@ -139,6 +139,80 @@ my %pgdump_runs = (
args => [ '-d', "$tempdir/compression_gzip_plain.sql.gz", ], args => [ '-d', "$tempdir/compression_gzip_plain.sql.gz", ],
}, },
}, },
# Do not use --no-sync to give test coverage for data sync.
compression_lz4_custom => {
test_key => 'compression',
compile_option => 'lz4',
dump_cmd => [
'pg_dump', '--format=custom',
'--compress=lz4', "--file=$tempdir/compression_lz4_custom.dump",
'postgres',
],
restore_cmd => [
'pg_restore',
"--file=$tempdir/compression_lz4_custom.sql",
"$tempdir/compression_lz4_custom.dump",
],
command_like => {
command => [
'pg_restore',
'-l', "$tempdir/compression_lz4_custom.dump",
],
expected => qr/Compression: lz4/,
name => 'data content is lz4 compressed'
},
},
# Do not use --no-sync to give test coverage for data sync.
compression_lz4_dir => {
test_key => 'compression',
compile_option => 'lz4',
dump_cmd => [
'pg_dump', '--jobs=2',
'--format=directory', '--compress=lz4:1',
"--file=$tempdir/compression_lz4_dir", 'postgres',
],
# Give coverage for manually compressed blob.toc files during
# restore.
compress_cmd => {
program => $ENV{'LZ4'},
args => [
'-z', '-f', '--rm',
"$tempdir/compression_lz4_dir/blobs.toc",
"$tempdir/compression_lz4_dir/blobs.toc.lz4",
],
},
# Verify that data files were compressed
glob_patterns => [
"$tempdir/compression_lz4_dir/toc.dat",
"$tempdir/compression_lz4_dir/*.dat.lz4",
],
restore_cmd => [
'pg_restore', '--jobs=2',
"--file=$tempdir/compression_lz4_dir.sql",
"$tempdir/compression_lz4_dir",
],
},
compression_lz4_plain => {
test_key => 'compression',
compile_option => 'lz4',
dump_cmd => [
'pg_dump', '--format=plain', '--compress=lz4',
"--file=$tempdir/compression_lz4_plain.sql.lz4", 'postgres',
],
# Decompress the generated file to run through the tests.
compress_cmd => {
program => $ENV{'LZ4'},
args => [
'-d', '-f',
"$tempdir/compression_lz4_plain.sql.lz4",
"$tempdir/compression_lz4_plain.sql",
],
},
},
clean => { clean => {
dump_cmd => [ dump_cmd => [
'pg_dump', 'pg_dump',
@ -4175,11 +4249,11 @@ foreach my $run (sort keys %pgdump_runs)
my $run_db = 'postgres'; my $run_db = 'postgres';
# Skip command-level tests for gzip if there is no support for it. # Skip command-level tests for gzip if there is no support for it.
if ( defined($pgdump_runs{$run}->{compile_option}) if ($pgdump_runs{$run}->{compile_option} &&
&& $pgdump_runs{$run}->{compile_option} eq 'gzip' ($pgdump_runs{$run}->{compile_option} eq 'gzip' && !$supports_gzip) ||
&& !$supports_gzip) ($pgdump_runs{$run}->{compile_option} eq 'lz4' && !$supports_lz4))
{ {
note "$run: skipped due to no gzip support"; note "$run: skipped due to no $pgdump_runs{$run}->{compile_option} support";
next; next;
} }

View File

@ -152,6 +152,7 @@ do
# as field names, which is unfortunate but we won't change it now. # as field names, which is unfortunate but we won't change it now.
test "$f" = src/bin/pg_dump/compress_gzip.h && continue test "$f" = src/bin/pg_dump/compress_gzip.h && continue
test "$f" = src/bin/pg_dump/compress_io.h && continue test "$f" = src/bin/pg_dump/compress_io.h && continue
test "$f" = src/bin/pg_dump/compress_lz4.h && continue
test "$f" = src/bin/pg_dump/compress_none.h && continue test "$f" = src/bin/pg_dump/compress_none.h && continue
test "$f" = src/bin/pg_dump/parallel.h && continue test "$f" = src/bin/pg_dump/parallel.h && continue
test "$f" = src/bin/pg_dump/pg_backup_archiver.h && continue test "$f" = src/bin/pg_dump/pg_backup_archiver.h && continue

View File

@ -1387,11 +1387,13 @@ LWLock
LWLockHandle LWLockHandle
LWLockMode LWLockMode
LWLockPadded LWLockPadded
LZ4CompressorState
LZ4F_compressionContext_t LZ4F_compressionContext_t
LZ4F_decompressOptions_t LZ4F_decompressOptions_t
LZ4F_decompressionContext_t LZ4F_decompressionContext_t
LZ4F_errorCode_t LZ4F_errorCode_t
LZ4F_preferences_t LZ4F_preferences_t
LZ4File
LabelProvider LabelProvider
LagTracker LagTracker
LargeObjectDesc LargeObjectDesc