mirror of https://github.com/postgres/postgres
Add LZ4 compression to pg_dump
Expand pg_dump's compression streaming and file APIs to support the lz4 algorithm. The newly added compress_lz4.{c,h} files cover all the functionality of the aforementioned APIs. Minor changes were necessary in various pg_backup_* files, where code for the 'lz4' file suffix has been added, as well as pg_dump's compression option parsing. Author: Georgios Kokolatos Reviewed-by: Michael Paquier, Rachel Heaton, Justin Pryzby, Shi Yu, Tomas Vondra Discussion: https://postgr.es/m/faUNEOpts9vunEaLnmxmG-DldLSg_ql137OC3JYDmgrOMHm1RvvWY2IdBkv_CRxm5spCCb_OmKNk2T03TMm0fBEWveFF9wA1WizPuAgB7Ss%3D%40protonmail.com
This commit is contained in:
parent
e0b3074e89
commit
0da243fed0
|
@ -330,9 +330,10 @@ PostgreSQL documentation
|
|||
machine-readable format that <application>pg_restore</application>
|
||||
can read. A directory format archive can be manipulated with
|
||||
standard Unix tools; for example, files in an uncompressed archive
|
||||
can be compressed with the <application>gzip</application> tool.
|
||||
This format is compressed by default and also supports parallel
|
||||
dumps.
|
||||
can be compressed with the <application>gzip</application> or
|
||||
<application>lz4</application>tool.
|
||||
This format is compressed by default using <literal>gzip</literal>
|
||||
and also supports parallel dumps.
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
@ -654,7 +655,7 @@ PostgreSQL documentation
|
|||
<para>
|
||||
Specify the compression method and/or the compression level to use.
|
||||
The compression method can be set to <literal>gzip</literal> or
|
||||
<literal>none</literal> for no compression.
|
||||
<literal>lz4</literal> or <literal>none</literal> for no compression.
|
||||
A compression detail string can optionally be specified. If the
|
||||
detail string is an integer, it specifies the compression level.
|
||||
Otherwise, it should be a comma-separated list of items, each of the
|
||||
|
@ -675,8 +676,8 @@ PostgreSQL documentation
|
|||
individual table-data segments, and the default is to compress using
|
||||
<literal>gzip</literal> at a moderate level. For plain text output,
|
||||
setting a nonzero compression level causes the entire output file to be compressed,
|
||||
as though it had been fed through <application>gzip</application>; but the default
|
||||
is not to compress.
|
||||
as though it had been fed through <application>gzip</application> or
|
||||
<application>lz4</application>; but the default is not to compress.
|
||||
</para>
|
||||
<para>
|
||||
The tar archive format currently does not support compression at all.
|
||||
|
|
|
@ -17,6 +17,7 @@ top_builddir = ../../..
|
|||
include $(top_builddir)/src/Makefile.global
|
||||
|
||||
export GZIP_PROGRAM=$(GZIP)
|
||||
export LZ4
|
||||
export with_icu
|
||||
|
||||
override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
|
||||
|
@ -26,6 +27,7 @@ OBJS = \
|
|||
$(WIN32RES) \
|
||||
compress_gzip.o \
|
||||
compress_io.o \
|
||||
compress_lz4.o \
|
||||
compress_none.o \
|
||||
dumputils.o \
|
||||
parallel.o \
|
||||
|
|
|
@ -53,7 +53,7 @@
|
|||
* InitDiscoverCompressFileHandle tries to infer the compression by the
|
||||
* filename suffix. If the suffix is not yet known then it tries to simply
|
||||
* open the file and if it fails, it tries to open the same file with the .gz
|
||||
* suffix.
|
||||
* suffix, and then again with the .lz4 suffix.
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/bin/pg_dump/compress_io.c
|
||||
|
@ -67,6 +67,7 @@
|
|||
|
||||
#include "compress_gzip.h"
|
||||
#include "compress_io.h"
|
||||
#include "compress_lz4.h"
|
||||
#include "compress_none.h"
|
||||
#include "pg_backup_utils.h"
|
||||
|
||||
|
@ -93,6 +94,10 @@ supports_compression(const pg_compress_specification compression_spec)
|
|||
if (algorithm == PG_COMPRESSION_GZIP)
|
||||
supported = true;
|
||||
#endif
|
||||
#ifdef USE_LZ4
|
||||
if (algorithm == PG_COMPRESSION_LZ4)
|
||||
supported = true;
|
||||
#endif
|
||||
|
||||
if (!supported)
|
||||
return psprintf("this build does not support compression with %s",
|
||||
|
@ -123,6 +128,8 @@ AllocateCompressor(const pg_compress_specification compression_spec,
|
|||
InitCompressorNone(cs, compression_spec);
|
||||
else if (compression_spec.algorithm == PG_COMPRESSION_GZIP)
|
||||
InitCompressorGzip(cs, compression_spec);
|
||||
else if (compression_spec.algorithm == PG_COMPRESSION_LZ4)
|
||||
InitCompressorLZ4(cs, compression_spec);
|
||||
|
||||
return cs;
|
||||
}
|
||||
|
@ -187,6 +194,8 @@ InitCompressFileHandle(const pg_compress_specification compression_spec)
|
|||
InitCompressFileHandleNone(CFH, compression_spec);
|
||||
else if (compression_spec.algorithm == PG_COMPRESSION_GZIP)
|
||||
InitCompressFileHandleGzip(CFH, compression_spec);
|
||||
else if (compression_spec.algorithm == PG_COMPRESSION_LZ4)
|
||||
InitCompressFileHandleLZ4(CFH, compression_spec);
|
||||
|
||||
return CFH;
|
||||
}
|
||||
|
@ -196,11 +205,11 @@ InitCompressFileHandle(const pg_compress_specification compression_spec)
|
|||
* be either "r" or "rb".
|
||||
*
|
||||
* If the file at 'path' contains the suffix of a supported compression method,
|
||||
* currently this includes only ".gz", then this compression will be used
|
||||
* currently this includes ".gz" and ".lz4", then this compression will be used
|
||||
* throughout. Otherwise the compression will be inferred by iteratively trying
|
||||
* to open the file at 'path', first as is, then by appending known compression
|
||||
* suffixes. So if you pass "foo" as 'path', this will open either "foo" or
|
||||
* "foo.gz", trying in that order.
|
||||
* "foo.gz" or "foo.lz4", trying in that order.
|
||||
*
|
||||
* On failure, return NULL with an error code in errno.
|
||||
*/
|
||||
|
@ -238,6 +247,17 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode)
|
|||
if (exists)
|
||||
compression_spec.algorithm = PG_COMPRESSION_GZIP;
|
||||
}
|
||||
#endif
|
||||
#ifdef USE_LZ4
|
||||
if (!exists)
|
||||
{
|
||||
free_keep_errno(fname);
|
||||
fname = psprintf("%s.lz4", path);
|
||||
exists = (stat(fname, &st) == 0);
|
||||
|
||||
if (exists)
|
||||
compression_spec.algorithm = PG_COMPRESSION_LZ4;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,626 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* compress_lz4.c
|
||||
* Routines for archivers to write a LZ4 compressed data stream.
|
||||
*
|
||||
* Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/bin/pg_dump/compress_lz4.c
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres_fe.h"
|
||||
#include "pg_backup_utils.h"
|
||||
|
||||
#include "compress_lz4.h"
|
||||
|
||||
#ifdef USE_LZ4
|
||||
#include <lz4.h>
|
||||
#include <lz4frame.h>
|
||||
|
||||
#define LZ4_OUT_SIZE (4 * 1024)
|
||||
#define LZ4_IN_SIZE (16 * 1024)
|
||||
|
||||
/*
|
||||
* LZ4F_HEADER_SIZE_MAX first appeared in v1.7.5 of the library.
|
||||
* Redefine it for installations with a lesser version.
|
||||
*/
|
||||
#ifndef LZ4F_HEADER_SIZE_MAX
|
||||
#define LZ4F_HEADER_SIZE_MAX 32
|
||||
#endif
|
||||
|
||||
/*----------------------
|
||||
* Compressor API
|
||||
*----------------------
|
||||
*/
|
||||
|
||||
typedef struct LZ4CompressorState
|
||||
{
|
||||
char *outbuf;
|
||||
size_t outsize;
|
||||
} LZ4CompressorState;
|
||||
|
||||
/* Private routines that support LZ4 compressed data I/O */
|
||||
static void ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs);
|
||||
static void WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
|
||||
const void *data, size_t dLen);
|
||||
static void EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs);
|
||||
|
||||
static void
|
||||
ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs)
|
||||
{
|
||||
LZ4_streamDecode_t lz4StreamDecode;
|
||||
char *buf;
|
||||
char *decbuf;
|
||||
size_t buflen;
|
||||
size_t cnt;
|
||||
|
||||
buflen = LZ4_IN_SIZE;
|
||||
buf = pg_malloc(buflen);
|
||||
decbuf = pg_malloc(buflen);
|
||||
|
||||
LZ4_setStreamDecode(&lz4StreamDecode, NULL, 0);
|
||||
|
||||
while ((cnt = cs->readF(AH, &buf, &buflen)))
|
||||
{
|
||||
int decBytes = LZ4_decompress_safe_continue(&lz4StreamDecode,
|
||||
buf, decbuf,
|
||||
cnt, buflen);
|
||||
|
||||
ahwrite(decbuf, 1, decBytes, AH);
|
||||
}
|
||||
|
||||
pg_free(buf);
|
||||
pg_free(decbuf);
|
||||
}
|
||||
|
||||
static void
|
||||
WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
|
||||
const void *data, size_t dLen)
|
||||
{
|
||||
LZ4CompressorState *LZ4cs = (LZ4CompressorState *) cs->private_data;
|
||||
size_t compressed;
|
||||
size_t requiredsize = LZ4_compressBound(dLen);
|
||||
|
||||
if (requiredsize > LZ4cs->outsize)
|
||||
{
|
||||
LZ4cs->outbuf = pg_realloc(LZ4cs->outbuf, requiredsize);
|
||||
LZ4cs->outsize = requiredsize;
|
||||
}
|
||||
|
||||
compressed = LZ4_compress_default(data, LZ4cs->outbuf,
|
||||
dLen, LZ4cs->outsize);
|
||||
|
||||
if (compressed <= 0)
|
||||
pg_fatal("failed to LZ4 compress data");
|
||||
|
||||
cs->writeF(AH, LZ4cs->outbuf, compressed);
|
||||
}
|
||||
|
||||
static void
|
||||
EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs)
|
||||
{
|
||||
LZ4CompressorState *LZ4cs;
|
||||
|
||||
LZ4cs = (LZ4CompressorState *) cs->private_data;
|
||||
if (LZ4cs)
|
||||
{
|
||||
pg_free(LZ4cs->outbuf);
|
||||
pg_free(LZ4cs);
|
||||
cs->private_data = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Public routines that support LZ4 compressed data I/O
|
||||
*/
|
||||
void
|
||||
InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compression_spec)
|
||||
{
|
||||
cs->readData = ReadDataFromArchiveLZ4;
|
||||
cs->writeData = WriteDataToArchiveLZ4;
|
||||
cs->end = EndCompressorLZ4;
|
||||
|
||||
cs->compression_spec = compression_spec;
|
||||
|
||||
/* Will be lazy init'd */
|
||||
cs->private_data = pg_malloc0(sizeof(LZ4CompressorState));
|
||||
}
|
||||
|
||||
/*----------------------
|
||||
* Compress File API
|
||||
*----------------------
|
||||
*/
|
||||
|
||||
/*
|
||||
* State needed for LZ4 (de)compression using the CompressFileHandle API.
|
||||
*/
|
||||
typedef struct LZ4File
|
||||
{
|
||||
FILE *fp;
|
||||
|
||||
LZ4F_preferences_t prefs;
|
||||
|
||||
LZ4F_compressionContext_t ctx;
|
||||
LZ4F_decompressionContext_t dtx;
|
||||
|
||||
bool inited;
|
||||
bool compressing;
|
||||
|
||||
size_t buflen;
|
||||
char *buffer;
|
||||
|
||||
size_t overflowalloclen;
|
||||
size_t overflowlen;
|
||||
char *overflowbuf;
|
||||
|
||||
size_t errcode;
|
||||
} LZ4File;
|
||||
|
||||
/*
|
||||
* LZ4 equivalent to feof() or gzeof(). The end of file is reached if there
|
||||
* is no decompressed output in the overflow buffer and the end of the file
|
||||
* is reached.
|
||||
*/
|
||||
static int
|
||||
LZ4File_eof(CompressFileHandle *CFH)
|
||||
{
|
||||
LZ4File *fs = (LZ4File *) CFH->private_data;
|
||||
|
||||
return fs->overflowlen == 0 && feof(fs->fp);
|
||||
}
|
||||
|
||||
static const char *
|
||||
LZ4File_get_error(CompressFileHandle *CFH)
|
||||
{
|
||||
LZ4File *fs = (LZ4File *) CFH->private_data;
|
||||
const char *errmsg;
|
||||
|
||||
if (LZ4F_isError(fs->errcode))
|
||||
errmsg = LZ4F_getErrorName(fs->errcode);
|
||||
else
|
||||
errmsg = strerror(errno);
|
||||
|
||||
return errmsg;
|
||||
}
|
||||
|
||||
/*
|
||||
* Prepare an already alloc'ed LZ4File struct for subsequent calls.
|
||||
*
|
||||
* It creates the necessary contexts for the operations. When compressing,
|
||||
* it additionally writes the LZ4 header in the output stream.
|
||||
*/
|
||||
static int
|
||||
LZ4File_init(LZ4File *fs, int size, bool compressing)
|
||||
{
|
||||
size_t status;
|
||||
|
||||
if (fs->inited)
|
||||
return 0;
|
||||
|
||||
fs->compressing = compressing;
|
||||
fs->inited = true;
|
||||
|
||||
if (fs->compressing)
|
||||
{
|
||||
fs->buflen = LZ4F_compressBound(LZ4_IN_SIZE, &fs->prefs);
|
||||
if (fs->buflen < LZ4F_HEADER_SIZE_MAX)
|
||||
fs->buflen = LZ4F_HEADER_SIZE_MAX;
|
||||
|
||||
status = LZ4F_createCompressionContext(&fs->ctx, LZ4F_VERSION);
|
||||
if (LZ4F_isError(status))
|
||||
{
|
||||
fs->errcode = status;
|
||||
return 1;
|
||||
}
|
||||
|
||||
fs->buffer = pg_malloc(fs->buflen);
|
||||
status = LZ4F_compressBegin(fs->ctx, fs->buffer, fs->buflen,
|
||||
&fs->prefs);
|
||||
|
||||
if (LZ4F_isError(status))
|
||||
{
|
||||
fs->errcode = status;
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (fwrite(fs->buffer, 1, status, fs->fp) != status)
|
||||
{
|
||||
errno = (errno) ? errno : ENOSPC;
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
status = LZ4F_createDecompressionContext(&fs->dtx, LZ4F_VERSION);
|
||||
if (LZ4F_isError(status))
|
||||
{
|
||||
fs->errcode = status;
|
||||
return 1;
|
||||
}
|
||||
|
||||
fs->buflen = size > LZ4_OUT_SIZE ? size : LZ4_OUT_SIZE;
|
||||
fs->buffer = pg_malloc(fs->buflen);
|
||||
|
||||
fs->overflowalloclen = fs->buflen;
|
||||
fs->overflowbuf = pg_malloc(fs->overflowalloclen);
|
||||
fs->overflowlen = 0;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* Read already decompressed content from the overflow buffer into 'ptr' up to
|
||||
* 'size' bytes, if available. If the eol_flag is set, then stop at the first
|
||||
* occurrence of the new line char prior to 'size' bytes.
|
||||
*
|
||||
* Any unread content in the overflow buffer is moved to the beginning.
|
||||
*/
|
||||
static int
|
||||
LZ4File_read_overflow(LZ4File *fs, void *ptr, int size, bool eol_flag)
|
||||
{
|
||||
char *p;
|
||||
int readlen = 0;
|
||||
|
||||
if (fs->overflowlen == 0)
|
||||
return 0;
|
||||
|
||||
if (fs->overflowlen >= size)
|
||||
readlen = size;
|
||||
else
|
||||
readlen = fs->overflowlen;
|
||||
|
||||
if (eol_flag && (p = memchr(fs->overflowbuf, '\n', readlen)))
|
||||
/* Include the line terminating char */
|
||||
readlen = p - fs->overflowbuf + 1;
|
||||
|
||||
memcpy(ptr, fs->overflowbuf, readlen);
|
||||
fs->overflowlen -= readlen;
|
||||
|
||||
if (fs->overflowlen > 0)
|
||||
memmove(fs->overflowbuf, fs->overflowbuf + readlen, fs->overflowlen);
|
||||
|
||||
return readlen;
|
||||
}
|
||||
|
||||
/*
|
||||
* The workhorse for reading decompressed content out of an LZ4 compressed
|
||||
* stream.
|
||||
*
|
||||
* It will read up to 'ptrsize' decompressed content, or up to the new line
|
||||
* char if found first when the eol_flag is set. It is possible that the
|
||||
* decompressed output generated by reading any compressed input via the
|
||||
* LZ4F API, exceeds 'ptrsize'. Any exceeding decompressed content is stored
|
||||
* at an overflow buffer within LZ4File. Of course, when the function is
|
||||
* called, it will first try to consume any decompressed content already
|
||||
* present in the overflow buffer, before decompressing new content.
|
||||
*/
|
||||
static int
|
||||
LZ4File_read_internal(LZ4File *fs, void *ptr, int ptrsize, bool eol_flag)
|
||||
{
|
||||
size_t dsize = 0;
|
||||
size_t rsize;
|
||||
size_t size = ptrsize;
|
||||
bool eol_found = false;
|
||||
|
||||
void *readbuf;
|
||||
|
||||
/* Lazy init */
|
||||
if (LZ4File_init(fs, size, false /* decompressing */ ))
|
||||
return -1;
|
||||
|
||||
/* Verify that there is enough space in the outbuf */
|
||||
if (size > fs->buflen)
|
||||
{
|
||||
fs->buflen = size;
|
||||
fs->buffer = pg_realloc(fs->buffer, size);
|
||||
}
|
||||
|
||||
/* use already decompressed content if available */
|
||||
dsize = LZ4File_read_overflow(fs, ptr, size, eol_flag);
|
||||
if (dsize == size || (eol_flag && memchr(ptr, '\n', dsize)))
|
||||
return dsize;
|
||||
|
||||
readbuf = pg_malloc(size);
|
||||
|
||||
do
|
||||
{
|
||||
char *rp;
|
||||
char *rend;
|
||||
|
||||
rsize = fread(readbuf, 1, size, fs->fp);
|
||||
if (rsize < size && !feof(fs->fp))
|
||||
return -1;
|
||||
|
||||
rp = (char *) readbuf;
|
||||
rend = (char *) readbuf + rsize;
|
||||
|
||||
while (rp < rend)
|
||||
{
|
||||
size_t status;
|
||||
size_t outlen = fs->buflen;
|
||||
size_t read_remain = rend - rp;
|
||||
|
||||
memset(fs->buffer, 0, outlen);
|
||||
status = LZ4F_decompress(fs->dtx, fs->buffer, &outlen,
|
||||
rp, &read_remain, NULL);
|
||||
if (LZ4F_isError(status))
|
||||
{
|
||||
fs->errcode = status;
|
||||
return -1;
|
||||
}
|
||||
|
||||
rp += read_remain;
|
||||
|
||||
/*
|
||||
* fill in what space is available in ptr if the eol flag is set,
|
||||
* either skip if one already found or fill up to EOL if present
|
||||
* in the outbuf
|
||||
*/
|
||||
if (outlen > 0 && dsize < size && eol_found == false)
|
||||
{
|
||||
char *p;
|
||||
size_t lib = (!eol_flag) ? size - dsize : size - 1 - dsize;
|
||||
size_t len = outlen < lib ? outlen : lib;
|
||||
|
||||
if (eol_flag &&
|
||||
(p = memchr(fs->buffer, '\n', outlen)) &&
|
||||
(size_t) (p - fs->buffer + 1) <= len)
|
||||
{
|
||||
len = p - fs->buffer + 1;
|
||||
eol_found = true;
|
||||
}
|
||||
|
||||
memcpy((char *) ptr + dsize, fs->buffer, len);
|
||||
dsize += len;
|
||||
|
||||
/* move what did not fit, if any, at the beginning of the buf */
|
||||
if (len < outlen)
|
||||
memmove(fs->buffer, fs->buffer + len, outlen - len);
|
||||
outlen -= len;
|
||||
}
|
||||
|
||||
/* if there is available output, save it */
|
||||
if (outlen > 0)
|
||||
{
|
||||
while (fs->overflowlen + outlen > fs->overflowalloclen)
|
||||
{
|
||||
fs->overflowalloclen *= 2;
|
||||
fs->overflowbuf = pg_realloc(fs->overflowbuf,
|
||||
fs->overflowalloclen);
|
||||
}
|
||||
|
||||
memcpy(fs->overflowbuf + fs->overflowlen, fs->buffer, outlen);
|
||||
fs->overflowlen += outlen;
|
||||
}
|
||||
}
|
||||
} while (rsize == size && dsize < size && eol_found == 0);
|
||||
|
||||
pg_free(readbuf);
|
||||
|
||||
return (int) dsize;
|
||||
}
|
||||
|
||||
/*
|
||||
* Compress size bytes from ptr and write them to the stream.
|
||||
*/
|
||||
static size_t
|
||||
LZ4File_write(const void *ptr, size_t size, CompressFileHandle *CFH)
|
||||
{
|
||||
LZ4File *fs = (LZ4File *) CFH->private_data;
|
||||
size_t status;
|
||||
int remaining = size;
|
||||
|
||||
/* Lazy init */
|
||||
if (LZ4File_init(fs, size, true))
|
||||
return -1;
|
||||
|
||||
while (remaining > 0)
|
||||
{
|
||||
int chunk = remaining < LZ4_IN_SIZE ? remaining : LZ4_IN_SIZE;
|
||||
|
||||
remaining -= chunk;
|
||||
|
||||
status = LZ4F_compressUpdate(fs->ctx, fs->buffer, fs->buflen,
|
||||
ptr, chunk, NULL);
|
||||
if (LZ4F_isError(status))
|
||||
{
|
||||
fs->errcode = status;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (fwrite(fs->buffer, 1, status, fs->fp) != status)
|
||||
{
|
||||
errno = (errno) ? errno : ENOSPC;
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
/*
|
||||
* fread() equivalent implementation for LZ4 compressed files.
|
||||
*/
|
||||
static size_t
|
||||
LZ4File_read(void *ptr, size_t size, CompressFileHandle *CFH)
|
||||
{
|
||||
LZ4File *fs = (LZ4File *) CFH->private_data;
|
||||
int ret;
|
||||
|
||||
ret = LZ4File_read_internal(fs, ptr, size, false);
|
||||
if (ret != size && !LZ4File_eof(CFH))
|
||||
pg_fatal("could not read from input file: %s", LZ4File_get_error(CFH));
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
/*
|
||||
* fgetc() equivalent implementation for LZ4 compressed files.
|
||||
*/
|
||||
static int
|
||||
LZ4File_getc(CompressFileHandle *CFH)
|
||||
{
|
||||
LZ4File *fs = (LZ4File *) CFH->private_data;
|
||||
unsigned char c;
|
||||
|
||||
if (LZ4File_read_internal(fs, &c, 1, false) != 1)
|
||||
{
|
||||
if (!LZ4File_eof(CFH))
|
||||
pg_fatal("could not read from input file: %s", LZ4File_get_error(CFH));
|
||||
else
|
||||
pg_fatal("could not read from input file: end of file");
|
||||
}
|
||||
|
||||
return c;
|
||||
}
|
||||
|
||||
/*
|
||||
* fgets() equivalent implementation for LZ4 compressed files.
|
||||
*/
|
||||
static char *
|
||||
LZ4File_gets(char *ptr, int size, CompressFileHandle *CFH)
|
||||
{
|
||||
LZ4File *fs = (LZ4File *) CFH->private_data;
|
||||
size_t dsize;
|
||||
|
||||
dsize = LZ4File_read_internal(fs, ptr, size, true);
|
||||
if (dsize < 0)
|
||||
pg_fatal("could not read from input file: %s", LZ4File_get_error(CFH));
|
||||
|
||||
/* Done reading */
|
||||
if (dsize == 0)
|
||||
return NULL;
|
||||
|
||||
return ptr;
|
||||
}
|
||||
|
||||
/*
|
||||
* Finalize (de)compression of a stream. When compressing it will write any
|
||||
* remaining content and/or generated footer from the LZ4 API.
|
||||
*/
|
||||
static int
|
||||
LZ4File_close(CompressFileHandle *CFH)
|
||||
{
|
||||
FILE *fp;
|
||||
LZ4File *fs = (LZ4File *) CFH->private_data;
|
||||
size_t status;
|
||||
int ret;
|
||||
|
||||
fp = fs->fp;
|
||||
if (fs->inited)
|
||||
{
|
||||
if (fs->compressing)
|
||||
{
|
||||
status = LZ4F_compressEnd(fs->ctx, fs->buffer, fs->buflen, NULL);
|
||||
if (LZ4F_isError(status))
|
||||
pg_fatal("failed to end compression: %s",
|
||||
LZ4F_getErrorName(status));
|
||||
else if ((ret = fwrite(fs->buffer, 1, status, fs->fp)) != status)
|
||||
{
|
||||
errno = (errno) ? errno : ENOSPC;
|
||||
WRITE_ERROR_EXIT;
|
||||
}
|
||||
|
||||
status = LZ4F_freeCompressionContext(fs->ctx);
|
||||
if (LZ4F_isError(status))
|
||||
pg_fatal("failed to end compression: %s",
|
||||
LZ4F_getErrorName(status));
|
||||
}
|
||||
else
|
||||
{
|
||||
status = LZ4F_freeDecompressionContext(fs->dtx);
|
||||
if (LZ4F_isError(status))
|
||||
pg_fatal("failed to end decompression: %s",
|
||||
LZ4F_getErrorName(status));
|
||||
pg_free(fs->overflowbuf);
|
||||
}
|
||||
|
||||
pg_free(fs->buffer);
|
||||
}
|
||||
|
||||
pg_free(fs);
|
||||
|
||||
return fclose(fp);
|
||||
}
|
||||
|
||||
static int
|
||||
LZ4File_open(const char *path, int fd, const char *mode,
|
||||
CompressFileHandle *CFH)
|
||||
{
|
||||
FILE *fp;
|
||||
LZ4File *lz4fp = (LZ4File *) CFH->private_data;
|
||||
|
||||
if (fd >= 0)
|
||||
fp = fdopen(fd, mode);
|
||||
else
|
||||
fp = fopen(path, mode);
|
||||
if (fp == NULL)
|
||||
{
|
||||
lz4fp->errcode = errno;
|
||||
return 1;
|
||||
}
|
||||
|
||||
lz4fp->fp = fp;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int
|
||||
LZ4File_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
|
||||
{
|
||||
char *fname;
|
||||
int ret;
|
||||
|
||||
fname = psprintf("%s.lz4", path);
|
||||
ret = CFH->open_func(fname, -1, mode, CFH);
|
||||
pg_free(fname);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
/*
|
||||
* Public routines
|
||||
*/
|
||||
void
|
||||
InitCompressFileHandleLZ4(CompressFileHandle *CFH,
|
||||
const pg_compress_specification compression_spec)
|
||||
{
|
||||
LZ4File *lz4fp;
|
||||
|
||||
CFH->open_func = LZ4File_open;
|
||||
CFH->open_write_func = LZ4File_open_write;
|
||||
CFH->read_func = LZ4File_read;
|
||||
CFH->write_func = LZ4File_write;
|
||||
CFH->gets_func = LZ4File_gets;
|
||||
CFH->getc_func = LZ4File_getc;
|
||||
CFH->eof_func = LZ4File_eof;
|
||||
CFH->close_func = LZ4File_close;
|
||||
CFH->get_error_func = LZ4File_get_error;
|
||||
|
||||
CFH->compression_spec = compression_spec;
|
||||
lz4fp = pg_malloc0(sizeof(*lz4fp));
|
||||
if (CFH->compression_spec.level >= 0)
|
||||
lz4fp->prefs.compressionLevel = CFH->compression_spec.level;
|
||||
|
||||
CFH->private_data = lz4fp;
|
||||
}
|
||||
#else /* USE_LZ4 */
|
||||
void
|
||||
InitCompressorLZ4(CompressorState *cs,
|
||||
const pg_compress_specification compression_spec)
|
||||
{
|
||||
pg_fatal("this build does not support compression with %s", "LZ4");
|
||||
}
|
||||
|
||||
void
|
||||
InitCompressFileHandleLZ4(CompressFileHandle *CFH,
|
||||
const pg_compress_specification compression_spec)
|
||||
{
|
||||
pg_fatal("this build does not support compression with %s", "LZ4");
|
||||
}
|
||||
#endif /* USE_LZ4 */
|
|
@ -0,0 +1,24 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* compress_lz4.h
|
||||
* LZ4 interface to compress_io.c routines
|
||||
*
|
||||
* Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
|
||||
* Portions Copyright (c) 1994, Regents of the University of California
|
||||
*
|
||||
* IDENTIFICATION
|
||||
* src/bin/pg_dump/compress_lz4.h
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef _COMPRESS_LZ4_H_
|
||||
#define _COMPRESS_LZ4_H_
|
||||
|
||||
#include "compress_io.h"
|
||||
|
||||
extern void InitCompressorLZ4(CompressorState *cs,
|
||||
const pg_compress_specification compression_spec);
|
||||
extern void InitCompressFileHandleLZ4(CompressFileHandle *CFH,
|
||||
const pg_compress_specification compression_spec);
|
||||
|
||||
#endif /* _COMPRESS_LZ4_H_ */
|
|
@ -3,6 +3,7 @@
|
|||
pg_dump_common_sources = files(
|
||||
'compress_gzip.c',
|
||||
'compress_io.c',
|
||||
'compress_lz4.c',
|
||||
'compress_none.c',
|
||||
'dumputils.c',
|
||||
'parallel.c',
|
||||
|
@ -18,7 +19,7 @@ pg_dump_common_sources = files(
|
|||
pg_dump_common = static_library('libpgdump_common',
|
||||
pg_dump_common_sources,
|
||||
c_pch: pch_postgres_fe_h,
|
||||
dependencies: [frontend_code, libpq, zlib],
|
||||
dependencies: [frontend_code, libpq, lz4, zlib],
|
||||
kwargs: internal_lib_args,
|
||||
)
|
||||
|
||||
|
@ -86,7 +87,10 @@ tests += {
|
|||
'sd': meson.current_source_dir(),
|
||||
'bd': meson.current_build_dir(),
|
||||
'tap': {
|
||||
'env': {'GZIP_PROGRAM': gzip.path()},
|
||||
'env': {
|
||||
'GZIP_PROGRAM': gzip.path(),
|
||||
'LZ4': program_lz4.found() ? program_lz4.path() : '',
|
||||
},
|
||||
'tests': [
|
||||
't/001_basic.pl',
|
||||
't/002_pg_dump.pl',
|
||||
|
|
|
@ -2075,7 +2075,7 @@ _discoverArchiveFormat(ArchiveHandle *AH)
|
|||
|
||||
/*
|
||||
* Check if the specified archive is a directory. If so, check if
|
||||
* there's a "toc.dat" (or "toc.dat.gz") file in it.
|
||||
* there's a "toc.dat" (or "toc.dat.{gz,lz4}") file in it.
|
||||
*/
|
||||
if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
|
||||
{
|
||||
|
@ -2085,6 +2085,10 @@ _discoverArchiveFormat(ArchiveHandle *AH)
|
|||
#ifdef HAVE_LIBZ
|
||||
if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz"))
|
||||
return AH->format;
|
||||
#endif
|
||||
#ifdef USE_LZ4
|
||||
if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4"))
|
||||
return AH->format;
|
||||
#endif
|
||||
pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
|
||||
AH->fSpec);
|
||||
|
|
|
@ -779,10 +779,13 @@ _PrepParallelRestore(ArchiveHandle *AH)
|
|||
|
||||
if (stat(fname, &st) == 0)
|
||||
te->dataLength = st.st_size;
|
||||
else
|
||||
else if (AH->compression_spec.algorithm != PG_COMPRESSION_NONE)
|
||||
{
|
||||
/* It might be compressed */
|
||||
strlcat(fname, ".gz", sizeof(fname));
|
||||
if (AH->compression_spec.algorithm == PG_COMPRESSION_GZIP)
|
||||
strlcat(fname, ".gz", sizeof(fname));
|
||||
else if (AH->compression_spec.algorithm == PG_COMPRESSION_LZ4)
|
||||
strlcat(fname, ".lz4", sizeof(fname));
|
||||
|
||||
if (stat(fname, &st) == 0)
|
||||
te->dataLength = st.st_size;
|
||||
}
|
||||
|
|
|
@ -715,13 +715,12 @@ main(int argc, char **argv)
|
|||
case PG_COMPRESSION_NONE:
|
||||
/* fallthrough */
|
||||
case PG_COMPRESSION_GZIP:
|
||||
/* fallthrough */
|
||||
case PG_COMPRESSION_LZ4:
|
||||
break;
|
||||
case PG_COMPRESSION_ZSTD:
|
||||
pg_fatal("compression with %s is not yet supported", "ZSTD");
|
||||
break;
|
||||
case PG_COMPRESSION_LZ4:
|
||||
pg_fatal("compression with %s is not yet supported", "LZ4");
|
||||
break;
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@ -139,6 +139,80 @@ my %pgdump_runs = (
|
|||
args => [ '-d', "$tempdir/compression_gzip_plain.sql.gz", ],
|
||||
},
|
||||
},
|
||||
|
||||
# Do not use --no-sync to give test coverage for data sync.
|
||||
compression_lz4_custom => {
|
||||
test_key => 'compression',
|
||||
compile_option => 'lz4',
|
||||
dump_cmd => [
|
||||
'pg_dump', '--format=custom',
|
||||
'--compress=lz4', "--file=$tempdir/compression_lz4_custom.dump",
|
||||
'postgres',
|
||||
],
|
||||
restore_cmd => [
|
||||
'pg_restore',
|
||||
"--file=$tempdir/compression_lz4_custom.sql",
|
||||
"$tempdir/compression_lz4_custom.dump",
|
||||
],
|
||||
command_like => {
|
||||
command => [
|
||||
'pg_restore',
|
||||
'-l', "$tempdir/compression_lz4_custom.dump",
|
||||
],
|
||||
expected => qr/Compression: lz4/,
|
||||
name => 'data content is lz4 compressed'
|
||||
},
|
||||
},
|
||||
|
||||
# Do not use --no-sync to give test coverage for data sync.
|
||||
compression_lz4_dir => {
|
||||
test_key => 'compression',
|
||||
compile_option => 'lz4',
|
||||
dump_cmd => [
|
||||
'pg_dump', '--jobs=2',
|
||||
'--format=directory', '--compress=lz4:1',
|
||||
"--file=$tempdir/compression_lz4_dir", 'postgres',
|
||||
],
|
||||
# Give coverage for manually compressed blob.toc files during
|
||||
# restore.
|
||||
compress_cmd => {
|
||||
program => $ENV{'LZ4'},
|
||||
args => [
|
||||
'-z', '-f', '--rm',
|
||||
"$tempdir/compression_lz4_dir/blobs.toc",
|
||||
"$tempdir/compression_lz4_dir/blobs.toc.lz4",
|
||||
],
|
||||
},
|
||||
# Verify that data files were compressed
|
||||
glob_patterns => [
|
||||
"$tempdir/compression_lz4_dir/toc.dat",
|
||||
"$tempdir/compression_lz4_dir/*.dat.lz4",
|
||||
],
|
||||
restore_cmd => [
|
||||
'pg_restore', '--jobs=2',
|
||||
"--file=$tempdir/compression_lz4_dir.sql",
|
||||
"$tempdir/compression_lz4_dir",
|
||||
],
|
||||
},
|
||||
|
||||
compression_lz4_plain => {
|
||||
test_key => 'compression',
|
||||
compile_option => 'lz4',
|
||||
dump_cmd => [
|
||||
'pg_dump', '--format=plain', '--compress=lz4',
|
||||
"--file=$tempdir/compression_lz4_plain.sql.lz4", 'postgres',
|
||||
],
|
||||
# Decompress the generated file to run through the tests.
|
||||
compress_cmd => {
|
||||
program => $ENV{'LZ4'},
|
||||
args => [
|
||||
'-d', '-f',
|
||||
"$tempdir/compression_lz4_plain.sql.lz4",
|
||||
"$tempdir/compression_lz4_plain.sql",
|
||||
],
|
||||
},
|
||||
},
|
||||
|
||||
clean => {
|
||||
dump_cmd => [
|
||||
'pg_dump',
|
||||
|
@ -4175,11 +4249,11 @@ foreach my $run (sort keys %pgdump_runs)
|
|||
my $run_db = 'postgres';
|
||||
|
||||
# Skip command-level tests for gzip if there is no support for it.
|
||||
if ( defined($pgdump_runs{$run}->{compile_option})
|
||||
&& $pgdump_runs{$run}->{compile_option} eq 'gzip'
|
||||
&& !$supports_gzip)
|
||||
if ($pgdump_runs{$run}->{compile_option} &&
|
||||
($pgdump_runs{$run}->{compile_option} eq 'gzip' && !$supports_gzip) ||
|
||||
($pgdump_runs{$run}->{compile_option} eq 'lz4' && !$supports_lz4))
|
||||
{
|
||||
note "$run: skipped due to no gzip support";
|
||||
note "$run: skipped due to no $pgdump_runs{$run}->{compile_option} support";
|
||||
next;
|
||||
}
|
||||
|
||||
|
|
|
@ -152,6 +152,7 @@ do
|
|||
# as field names, which is unfortunate but we won't change it now.
|
||||
test "$f" = src/bin/pg_dump/compress_gzip.h && continue
|
||||
test "$f" = src/bin/pg_dump/compress_io.h && continue
|
||||
test "$f" = src/bin/pg_dump/compress_lz4.h && continue
|
||||
test "$f" = src/bin/pg_dump/compress_none.h && continue
|
||||
test "$f" = src/bin/pg_dump/parallel.h && continue
|
||||
test "$f" = src/bin/pg_dump/pg_backup_archiver.h && continue
|
||||
|
|
|
@ -1387,11 +1387,13 @@ LWLock
|
|||
LWLockHandle
|
||||
LWLockMode
|
||||
LWLockPadded
|
||||
LZ4CompressorState
|
||||
LZ4F_compressionContext_t
|
||||
LZ4F_decompressOptions_t
|
||||
LZ4F_decompressionContext_t
|
||||
LZ4F_errorCode_t
|
||||
LZ4F_preferences_t
|
||||
LZ4File
|
||||
LabelProvider
|
||||
LagTracker
|
||||
LargeObjectDesc
|
||||
|
|
Loading…
Reference in New Issue