Add support for LZ4 compression in pg_receivewal
pg_receivewal gains a new option, --compression-method=lz4, available when the code is compiled with --with-lz4. Similarly to gzip, this gives the possibility to compress archived WAL segments with LZ4. This option is not compatible with --compress. The implementation uses LZ4 frames, and is compatible with simple lz4 commands. Like gzip, using --synchronous ensures that any data will be flushed to disk within the current .partial segment, so as it is possible to retrieve as much WAL data as possible even from a non-completed segment (this requires completing the partial file with zeros up to the WAL segment size supported by the backend after decompression, but this is the same as gzip). The calculation of the streaming start LSN is able to transparently find and check LZ4-compressed segments. Contrary to gzip where the uncompressed size is directly stored in the object read, the LZ4 chunk protocol does not store the uncompressed data by default. There is contentSize that can be used with LZ4 frames by that would not help if using an archive that includes segments compressed with the defaults of a "lz4" command, where this is not stored. So, this commit has taken the most extensible approach by decompressing the already-archived segment to check its uncompressed size, through a blank output buffer in chunks of 64kB (no actual performance difference noticed with 8kB, 16kB or 32kB, and the operation in itself is actually fast). Tests have been added to verify the creation and correctness of the generated LZ4 files. The latter is achieved by the use of command "lz4", if found in the environment. The tar-based WAL method in walmethods.c, used now only by pg_basebackup, does not know yet about LZ4. Its code could be extended for this purpose. Author: Georgios Kokolatos Reviewed-by: Michael Paquier, Jian Guo, Magnus Hagander, Dilip Kumar Discussion: https://postgr.es/m/ZCm1J5vfyQ2E6dYvXz8si39HQ2gwxSZ3IpYaVgYa3lUwY88SLapx9EEnOf5uEwrddhx2twG7zYKjVeuP5MwZXCNPybtsGouDsAD1o2L_I5E=@pm.me
This commit is contained in:
parent
5cd7eb1f1c
commit
babbbb595d
doc/src/sgml/ref
src
@ -268,13 +268,15 @@ PostgreSQL documentation
|
||||
<listitem>
|
||||
<para>
|
||||
Enables compression of write-ahead logs using the specified method.
|
||||
Supported values <literal>gzip</literal>, and
|
||||
<literal>none</literal>.
|
||||
Supported values <literal>gzip</literal>, <literal>lz4</literal>
|
||||
(if <productname>PostgreSQL</productname> was compiled with
|
||||
<option>--with-lz4</option>), and <literal>none</literal>.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
The suffix <filename>.gz</filename> will automatically be added to
|
||||
all filenames when using <literal>gzip</literal>
|
||||
all filenames when using <literal>gzip</literal>, and the suffix
|
||||
<filename>.lz4</filename> is added when using <literal>lz4</literal>.
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
@ -350,6 +350,7 @@ XGETTEXT = @XGETTEXT@
|
||||
|
||||
GZIP = gzip
|
||||
BZIP2 = bzip2
|
||||
LZ4 = lz4
|
||||
|
||||
DOWNLOAD = wget -O $@ --no-use-server-timestamps
|
||||
#DOWNLOAD = curl -o $@
|
||||
|
@ -19,6 +19,7 @@ top_builddir = ../../..
|
||||
include $(top_builddir)/src/Makefile.global
|
||||
|
||||
# make these available to TAP test scripts
|
||||
export LZ4
|
||||
export TAR
|
||||
# Note that GZIP cannot be used directly as this environment variable is
|
||||
# used by the command "gzip" to pass down options, so stick with a different
|
||||
|
@ -32,6 +32,10 @@
|
||||
#include "receivelog.h"
|
||||
#include "streamutil.h"
|
||||
|
||||
#ifdef HAVE_LIBLZ4
|
||||
#include "lz4frame.h"
|
||||
#endif
|
||||
|
||||
/* Time to sleep between reconnection attempts */
|
||||
#define RECONNECT_SLEEP_TIME 5
|
||||
|
||||
@ -136,6 +140,15 @@ is_xlogfilename(const char *filename, bool *ispartial,
|
||||
return true;
|
||||
}
|
||||
|
||||
/* File looks like a completed LZ4-compressed WAL file */
|
||||
if (fname_len == XLOG_FNAME_LEN + strlen(".lz4") &&
|
||||
strcmp(filename + XLOG_FNAME_LEN, ".lz4") == 0)
|
||||
{
|
||||
*ispartial = false;
|
||||
*wal_compression_method = COMPRESSION_LZ4;
|
||||
return true;
|
||||
}
|
||||
|
||||
/* File looks like a partial uncompressed WAL file */
|
||||
if (fname_len == XLOG_FNAME_LEN + strlen(".partial") &&
|
||||
strcmp(filename + XLOG_FNAME_LEN, ".partial") == 0)
|
||||
@ -154,6 +167,15 @@ is_xlogfilename(const char *filename, bool *ispartial,
|
||||
return true;
|
||||
}
|
||||
|
||||
/* File looks like a partial LZ4-compressed WAL file */
|
||||
if (fname_len == XLOG_FNAME_LEN + strlen(".lz4.partial") &&
|
||||
strcmp(filename + XLOG_FNAME_LEN, ".lz4.partial") == 0)
|
||||
{
|
||||
*ispartial = true;
|
||||
*wal_compression_method = COMPRESSION_LZ4;
|
||||
return true;
|
||||
}
|
||||
|
||||
/* File does not look like something we know */
|
||||
return false;
|
||||
}
|
||||
@ -278,12 +300,20 @@ FindStreamingStart(uint32 *tli)
|
||||
/*
|
||||
* Check that the segment has the right size, if it's supposed to be
|
||||
* completed. For non-compressed segments just check the on-disk size
|
||||
* and see if it matches a completed segment. For gzip-compressed
|
||||
* and see if it matches a completed segment. For gzip-compressed
|
||||
* segments, look at the last 4 bytes of the compressed file, which is
|
||||
* where the uncompressed size is located for files with a size lower
|
||||
* than 4GB, and then compare it to the size of a completed segment.
|
||||
* The 4 last bytes correspond to the ISIZE member according to
|
||||
* http://www.zlib.org/rfc-gzip.html.
|
||||
*
|
||||
* For LZ4-compressed segments, uncompress the file in a throw-away
|
||||
* buffer keeping track of the uncompressed size, then compare it to
|
||||
* the size of a completed segment. Per its protocol, LZ4 does not
|
||||
* store the uncompressed size of an object by default. contentSize
|
||||
* is one possible way to do that, but we need to rely on a method
|
||||
* where WAL segments could have been compressed by a different source
|
||||
* than pg_receivewal, like an archive_command with lz4.
|
||||
*/
|
||||
if (!ispartial && wal_compression_method == COMPRESSION_NONE)
|
||||
{
|
||||
@ -350,6 +380,114 @@ FindStreamingStart(uint32 *tli)
|
||||
continue;
|
||||
}
|
||||
}
|
||||
else if (!ispartial && wal_compression_method == COMPRESSION_LZ4)
|
||||
{
|
||||
#ifdef HAVE_LIBLZ4
|
||||
#define LZ4_CHUNK_SZ 64 * 1024 /* 64kB as maximum chunk size read */
|
||||
int fd;
|
||||
ssize_t r;
|
||||
size_t uncompressed_size = 0;
|
||||
char fullpath[MAXPGPATH * 2];
|
||||
char *outbuf;
|
||||
char *readbuf;
|
||||
LZ4F_decompressionContext_t ctx = NULL;
|
||||
LZ4F_decompressOptions_t dec_opt;
|
||||
LZ4F_errorCode_t status;
|
||||
|
||||
memset(&dec_opt, 0, sizeof(dec_opt));
|
||||
snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
|
||||
|
||||
fd = open(fullpath, O_RDONLY | PG_BINARY, 0);
|
||||
if (fd < 0)
|
||||
{
|
||||
pg_log_error("could not open file \"%s\": %m", fullpath);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
status = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
|
||||
if (LZ4F_isError(status))
|
||||
{
|
||||
pg_log_error("could not create LZ4 decompression context: %s",
|
||||
LZ4F_getErrorName(status));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
outbuf = pg_malloc0(LZ4_CHUNK_SZ);
|
||||
readbuf = pg_malloc0(LZ4_CHUNK_SZ);
|
||||
do
|
||||
{
|
||||
char *readp;
|
||||
char *readend;
|
||||
|
||||
r = read(fd, readbuf, LZ4_CHUNK_SZ);
|
||||
if (r < 0)
|
||||
{
|
||||
pg_log_error("could not read file \"%s\": %m", fullpath);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
/* Done reading the file */
|
||||
if (r == 0)
|
||||
break;
|
||||
|
||||
/* Process one chunk */
|
||||
readp = readbuf;
|
||||
readend = readbuf + r;
|
||||
while (readp < readend)
|
||||
{
|
||||
size_t out_size = LZ4_CHUNK_SZ;
|
||||
size_t read_size = readend - readp;
|
||||
|
||||
memset(outbuf, 0, LZ4_CHUNK_SZ);
|
||||
status = LZ4F_decompress(ctx, outbuf, &out_size,
|
||||
readp, &read_size, &dec_opt);
|
||||
if (LZ4F_isError(status))
|
||||
{
|
||||
pg_log_error("could not decompress file \"%s\": %s",
|
||||
fullpath,
|
||||
LZ4F_getErrorName(status));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
readp += read_size;
|
||||
uncompressed_size += out_size;
|
||||
}
|
||||
|
||||
/*
|
||||
* No need to continue reading the file when the
|
||||
* uncompressed_size exceeds WalSegSz, even if there are still
|
||||
* data left to read. However, if uncompressed_size is equal
|
||||
* to WalSegSz, it should verify that there is no more data to
|
||||
* read.
|
||||
*/
|
||||
} while (uncompressed_size <= WalSegSz && r > 0);
|
||||
|
||||
close(fd);
|
||||
pg_free(outbuf);
|
||||
pg_free(readbuf);
|
||||
|
||||
status = LZ4F_freeDecompressionContext(ctx);
|
||||
if (LZ4F_isError(status))
|
||||
{
|
||||
pg_log_error("could not free LZ4 decompression context: %s",
|
||||
LZ4F_getErrorName(status));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
if (uncompressed_size != WalSegSz)
|
||||
{
|
||||
pg_log_warning("compressed segment file \"%s\" has incorrect uncompressed size %ld, skipping",
|
||||
dirent->d_name, uncompressed_size);
|
||||
continue;
|
||||
}
|
||||
#else
|
||||
pg_log_error("could not check file \"%s\"",
|
||||
dirent->d_name);
|
||||
pg_log_error("this build does not support compression with %s",
|
||||
"LZ4");
|
||||
exit(1);
|
||||
#endif
|
||||
}
|
||||
|
||||
/* Looks like a valid segment. Remember that we saw it. */
|
||||
if ((segno > high_segno) ||
|
||||
@ -650,6 +788,8 @@ main(int argc, char **argv)
|
||||
case 6:
|
||||
if (pg_strcasecmp(optarg, "gzip") == 0)
|
||||
compression_method = COMPRESSION_GZIP;
|
||||
else if (pg_strcasecmp(optarg, "lz4") == 0)
|
||||
compression_method = COMPRESSION_LZ4;
|
||||
else if (pg_strcasecmp(optarg, "none") == 0)
|
||||
compression_method = COMPRESSION_NONE;
|
||||
else
|
||||
@ -746,6 +886,22 @@ main(int argc, char **argv)
|
||||
pg_log_error("this build does not support compression with %s",
|
||||
"gzip");
|
||||
exit(1);
|
||||
#endif
|
||||
break;
|
||||
case COMPRESSION_LZ4:
|
||||
#ifdef HAVE_LIBLZ4
|
||||
if (compresslevel != 0)
|
||||
{
|
||||
pg_log_error("cannot use --compress with --compression-method=%s",
|
||||
"lz4");
|
||||
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
|
||||
progname);
|
||||
exit(1);
|
||||
}
|
||||
#else
|
||||
pg_log_error("this build does not support compression with %s",
|
||||
"LZ4");
|
||||
exit(1);
|
||||
#endif
|
||||
break;
|
||||
}
|
||||
|
@ -5,7 +5,7 @@ use strict;
|
||||
use warnings;
|
||||
use PostgreSQL::Test::Utils;
|
||||
use PostgreSQL::Test::Cluster;
|
||||
use Test::More tests => 37;
|
||||
use Test::More tests => 42;
|
||||
|
||||
program_help_ok('pg_receivewal');
|
||||
program_version_ok('pg_receivewal');
|
||||
@ -138,13 +138,69 @@ SKIP:
|
||||
"gzip verified the integrity of compressed WAL segments");
|
||||
}
|
||||
|
||||
# Check LZ4 compression if available
|
||||
SKIP:
|
||||
{
|
||||
skip "postgres was not built with LZ4 support", 5
|
||||
if (!check_pg_config("#define HAVE_LIBLZ4 1"));
|
||||
|
||||
# Generate more WAL including one completed, compressed segment.
|
||||
$primary->psql('postgres', 'SELECT pg_switch_wal();');
|
||||
$nextlsn =
|
||||
$primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
|
||||
chomp($nextlsn);
|
||||
$primary->psql('postgres', 'INSERT INTO test_table VALUES (3);');
|
||||
|
||||
# Stream up to the given position.
|
||||
$primary->command_ok(
|
||||
[
|
||||
'pg_receivewal', '-D',
|
||||
$stream_dir, '--verbose',
|
||||
'--endpos', $nextlsn,
|
||||
'--no-loop', '--compression-method',
|
||||
'lz4'
|
||||
],
|
||||
'streaming some WAL using --compression-method=lz4');
|
||||
|
||||
# Verify that the stored files are generated with their expected
|
||||
# names.
|
||||
my @lz4_wals = glob "$stream_dir/*.lz4";
|
||||
is(scalar(@lz4_wals), 1,
|
||||
"one WAL segment compressed with LZ4 was created");
|
||||
my @lz4_partial_wals = glob "$stream_dir/*.lz4.partial";
|
||||
is(scalar(@lz4_partial_wals),
|
||||
1, "one partial WAL segment compressed with LZ4 was created");
|
||||
|
||||
# Verify that the start streaming position is computed correctly by
|
||||
# comparing it with the partial file generated previously. The name
|
||||
# of the previous partial, now-completed WAL segment is updated, keeping
|
||||
# its base number.
|
||||
$partial_wals[0] =~ s/(\.gz)?\.partial$/.lz4/;
|
||||
is($lz4_wals[0] eq $partial_wals[0],
|
||||
1, "one partial WAL segment is now completed");
|
||||
# Update the list of partial wals with the current one.
|
||||
@partial_wals = @lz4_partial_wals;
|
||||
|
||||
# Check the integrity of the completed segment, if LZ4 is an available
|
||||
# command.
|
||||
my $lz4 = $ENV{LZ4};
|
||||
skip "program lz4 is not found in your system", 1
|
||||
if ( !defined $lz4
|
||||
|| $lz4 eq ''
|
||||
|| system_log($lz4, '--version') != 0);
|
||||
|
||||
my $lz4_is_valid = system_log($lz4, '-t', @lz4_wals);
|
||||
is($lz4_is_valid, 0,
|
||||
"lz4 verified the integrity of compressed WAL segments");
|
||||
}
|
||||
|
||||
# Verify that the start streaming position is computed and that the value is
|
||||
# correct regardless of whether ZLIB is available.
|
||||
# correct regardless of whether any compression is available.
|
||||
$primary->psql('postgres', 'SELECT pg_switch_wal();');
|
||||
$nextlsn =
|
||||
$primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
|
||||
chomp($nextlsn);
|
||||
$primary->psql('postgres', 'INSERT INTO test_table VALUES (3);');
|
||||
$primary->psql('postgres', 'INSERT INTO test_table VALUES (4);');
|
||||
$primary->command_ok(
|
||||
[
|
||||
'pg_receivewal', '-D', $stream_dir, '--verbose',
|
||||
@ -152,7 +208,7 @@ $primary->command_ok(
|
||||
],
|
||||
"streaming some WAL");
|
||||
|
||||
$partial_wals[0] =~ s/(\.gz)?.partial//;
|
||||
$partial_wals[0] =~ s/(\.gz|\.lz4)?.partial//;
|
||||
ok(-e $partial_wals[0], "check that previously partial WAL is now complete");
|
||||
|
||||
# Permissions on WAL files should be default
|
||||
@ -190,7 +246,7 @@ my $walfile_streamed = $primary->safe_psql(
|
||||
|
||||
# Switch to a new segment, to make sure that the segment retained by the
|
||||
# slot is still streamed. This may not be necessary, but play it safe.
|
||||
$primary->psql('postgres', 'INSERT INTO test_table VALUES (4);');
|
||||
$primary->psql('postgres', 'INSERT INTO test_table VALUES (5);');
|
||||
$primary->psql('postgres', 'SELECT pg_switch_wal();');
|
||||
$nextlsn =
|
||||
$primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
|
||||
@ -198,7 +254,7 @@ chomp($nextlsn);
|
||||
|
||||
# Add a bit more data to accelerate the end of the next pg_receivewal
|
||||
# commands.
|
||||
$primary->psql('postgres', 'INSERT INTO test_table VALUES (5);');
|
||||
$primary->psql('postgres', 'INSERT INTO test_table VALUES (6);');
|
||||
|
||||
# Check case where the slot does not exist.
|
||||
$primary->command_fails_like(
|
||||
@ -253,13 +309,13 @@ $standby->promote;
|
||||
# on the new timeline.
|
||||
my $walfile_after_promotion = $standby->safe_psql('postgres',
|
||||
"SELECT pg_walfile_name(pg_current_wal_insert_lsn());");
|
||||
$standby->psql('postgres', 'INSERT INTO test_table VALUES (6);');
|
||||
$standby->psql('postgres', 'INSERT INTO test_table VALUES (7);');
|
||||
$standby->psql('postgres', 'SELECT pg_switch_wal();');
|
||||
$nextlsn =
|
||||
$standby->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
|
||||
chomp($nextlsn);
|
||||
# This speeds up the operation.
|
||||
$standby->psql('postgres', 'INSERT INTO test_table VALUES (7);');
|
||||
$standby->psql('postgres', 'INSERT INTO test_table VALUES (8);');
|
||||
|
||||
# Now try to resume from the slot after the promotion.
|
||||
my $timeline_dir = $primary->basedir . '/timeline_wal';
|
||||
|
@ -17,6 +17,10 @@
|
||||
#include <sys/stat.h>
|
||||
#include <time.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#ifdef HAVE_LIBLZ4
|
||||
#include <lz4frame.h>
|
||||
#endif
|
||||
#ifdef HAVE_LIBZ
|
||||
#include <zlib.h>
|
||||
#endif
|
||||
@ -30,6 +34,9 @@
|
||||
/* Size of zlib buffer for .tar.gz */
|
||||
#define ZLIB_OUT_SIZE 4096
|
||||
|
||||
/* Size of LZ4 input chunk for .lz4 */
|
||||
#define LZ4_IN_SIZE 4096
|
||||
|
||||
/*-------------------------------------------------------------------------
|
||||
* WalDirectoryMethod - write wal to a directory looking like pg_wal
|
||||
*-------------------------------------------------------------------------
|
||||
@ -60,6 +67,11 @@ typedef struct DirectoryMethodFile
|
||||
#ifdef HAVE_LIBZ
|
||||
gzFile gzfp;
|
||||
#endif
|
||||
#ifdef HAVE_LIBLZ4
|
||||
LZ4F_compressionContext_t ctx;
|
||||
size_t lz4bufsize;
|
||||
void *lz4buf;
|
||||
#endif
|
||||
} DirectoryMethodFile;
|
||||
|
||||
static const char *
|
||||
@ -76,7 +88,8 @@ dir_get_file_name(const char *pathname, const char *temp_suffix)
|
||||
|
||||
snprintf(filename, MAXPGPATH, "%s%s%s",
|
||||
pathname,
|
||||
dir_data->compression_method == COMPRESSION_GZIP ? ".gz" : "",
|
||||
dir_data->compression_method == COMPRESSION_GZIP ? ".gz" :
|
||||
dir_data->compression_method == COMPRESSION_LZ4 ? ".lz4" : "",
|
||||
temp_suffix ? temp_suffix : "");
|
||||
|
||||
return filename;
|
||||
@ -92,6 +105,11 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
|
||||
#ifdef HAVE_LIBZ
|
||||
gzFile gzfp = NULL;
|
||||
#endif
|
||||
#ifdef HAVE_LIBLZ4
|
||||
LZ4F_compressionContext_t ctx = NULL;
|
||||
size_t lz4bufsize = 0;
|
||||
void *lz4buf = NULL;
|
||||
#endif
|
||||
|
||||
filename = dir_get_file_name(pathname, temp_suffix);
|
||||
snprintf(tmppath, sizeof(tmppath), "%s/%s",
|
||||
@ -126,6 +144,50 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
|
||||
}
|
||||
}
|
||||
#endif
|
||||
#ifdef HAVE_LIBLZ4
|
||||
if (dir_data->compression_method == COMPRESSION_LZ4)
|
||||
{
|
||||
size_t ctx_out;
|
||||
size_t header_size;
|
||||
|
||||
ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
|
||||
if (LZ4F_isError(ctx_out))
|
||||
{
|
||||
close(fd);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
lz4bufsize = LZ4F_compressBound(LZ4_IN_SIZE, NULL);
|
||||
lz4buf = pg_malloc0(lz4bufsize);
|
||||
|
||||
/* add the header */
|
||||
header_size = LZ4F_compressBegin(ctx, lz4buf, lz4bufsize, NULL);
|
||||
if (LZ4F_isError(header_size))
|
||||
{
|
||||
(void) LZ4F_freeCompressionContext(ctx);
|
||||
pg_free(lz4buf);
|
||||
close(fd);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
errno = 0;
|
||||
if (write(fd, lz4buf, header_size) != header_size)
|
||||
{
|
||||
int save_errno = errno;
|
||||
|
||||
(void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL);
|
||||
(void) LZ4F_freeCompressionContext(ctx);
|
||||
pg_free(lz4buf);
|
||||
close(fd);
|
||||
|
||||
/*
|
||||
* If write didn't set errno, assume problem is no disk space.
|
||||
*/
|
||||
errno = save_errno ? save_errno : ENOSPC;
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
/* Do pre-padding on non-compressed files */
|
||||
if (pad_to_size && dir_data->compression_method == COMPRESSION_NONE)
|
||||
@ -176,6 +238,16 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
|
||||
if (dir_data->compression_method == COMPRESSION_GZIP)
|
||||
gzclose(gzfp);
|
||||
else
|
||||
#endif
|
||||
#ifdef HAVE_LIBLZ4
|
||||
if (dir_data->compression_method == COMPRESSION_LZ4)
|
||||
{
|
||||
(void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL);
|
||||
(void) LZ4F_freeCompressionContext(ctx);
|
||||
pg_free(lz4buf);
|
||||
close(fd);
|
||||
}
|
||||
else
|
||||
#endif
|
||||
close(fd);
|
||||
return NULL;
|
||||
@ -187,6 +259,15 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
|
||||
if (dir_data->compression_method == COMPRESSION_GZIP)
|
||||
f->gzfp = gzfp;
|
||||
#endif
|
||||
#ifdef HAVE_LIBLZ4
|
||||
if (dir_data->compression_method == COMPRESSION_LZ4)
|
||||
{
|
||||
f->ctx = ctx;
|
||||
f->lz4buf = lz4buf;
|
||||
f->lz4bufsize = lz4bufsize;
|
||||
}
|
||||
#endif
|
||||
|
||||
f->fd = fd;
|
||||
f->currpos = 0;
|
||||
f->pathname = pg_strdup(pathname);
|
||||
@ -209,6 +290,43 @@ dir_write(Walfile f, const void *buf, size_t count)
|
||||
if (dir_data->compression_method == COMPRESSION_GZIP)
|
||||
r = (ssize_t) gzwrite(df->gzfp, buf, count);
|
||||
else
|
||||
#endif
|
||||
#ifdef HAVE_LIBLZ4
|
||||
if (dir_data->compression_method == COMPRESSION_LZ4)
|
||||
{
|
||||
size_t chunk;
|
||||
size_t remaining;
|
||||
const void *inbuf = buf;
|
||||
|
||||
remaining = count;
|
||||
while (remaining > 0)
|
||||
{
|
||||
size_t compressed;
|
||||
|
||||
if (remaining > LZ4_IN_SIZE)
|
||||
chunk = LZ4_IN_SIZE;
|
||||
else
|
||||
chunk = remaining;
|
||||
|
||||
remaining -= chunk;
|
||||
compressed = LZ4F_compressUpdate(df->ctx,
|
||||
df->lz4buf, df->lz4bufsize,
|
||||
inbuf, chunk,
|
||||
NULL);
|
||||
|
||||
if (LZ4F_isError(compressed))
|
||||
return -1;
|
||||
|
||||
if (write(df->fd, df->lz4buf, compressed) != compressed)
|
||||
return -1;
|
||||
|
||||
inbuf = ((char *) inbuf) + chunk;
|
||||
}
|
||||
|
||||
/* Our caller keeps track of the uncompressed size. */
|
||||
r = (ssize_t) count;
|
||||
}
|
||||
else
|
||||
#endif
|
||||
r = write(df->fd, buf, count);
|
||||
if (r > 0)
|
||||
@ -239,6 +357,25 @@ dir_close(Walfile f, WalCloseMethod method)
|
||||
if (dir_data->compression_method == COMPRESSION_GZIP)
|
||||
r = gzclose(df->gzfp);
|
||||
else
|
||||
#endif
|
||||
#ifdef HAVE_LIBLZ4
|
||||
if (dir_data->compression_method == COMPRESSION_LZ4)
|
||||
{
|
||||
size_t compressed;
|
||||
|
||||
compressed = LZ4F_compressEnd(df->ctx,
|
||||
df->lz4buf, df->lz4bufsize,
|
||||
NULL);
|
||||
|
||||
if (LZ4F_isError(compressed))
|
||||
return -1;
|
||||
|
||||
if (write(df->fd, df->lz4buf, compressed) != compressed)
|
||||
return -1;
|
||||
|
||||
r = close(df->fd);
|
||||
}
|
||||
else
|
||||
#endif
|
||||
r = close(df->fd);
|
||||
|
||||
@ -293,6 +430,12 @@ dir_close(Walfile f, WalCloseMethod method)
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef HAVE_LIBLZ4
|
||||
pg_free(df->lz4buf);
|
||||
/* supports free on NULL */
|
||||
LZ4F_freeCompressionContext(df->ctx);
|
||||
#endif
|
||||
|
||||
pg_free(df->pathname);
|
||||
pg_free(df->fullpath);
|
||||
if (df->temp_suffix)
|
||||
@ -317,6 +460,21 @@ dir_sync(Walfile f)
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
#ifdef HAVE_LIBLZ4
|
||||
if (dir_data->compression_method == COMPRESSION_LZ4)
|
||||
{
|
||||
DirectoryMethodFile *df = (DirectoryMethodFile *) f;
|
||||
size_t compressed;
|
||||
|
||||
/* Flush any internal buffers */
|
||||
compressed = LZ4F_flush(df->ctx, df->lz4buf, df->lz4bufsize, NULL);
|
||||
if (LZ4F_isError(compressed))
|
||||
return -1;
|
||||
|
||||
if (write(df->fd, df->lz4buf, compressed) != compressed)
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
|
||||
return fsync(((DirectoryMethodFile *) f)->fd);
|
||||
}
|
||||
|
@ -23,6 +23,7 @@ typedef enum
|
||||
typedef enum
|
||||
{
|
||||
COMPRESSION_GZIP,
|
||||
COMPRESSION_LZ4,
|
||||
COMPRESSION_NONE
|
||||
} WalCompressionMethod;
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user