From 9c4fc70af1bba238b639639a55df8f26d44e36fe Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryzbyj@telsasoft.com>
Date: Sun, 26 Feb 2023 11:41:47 -0600
Subject: [PATCH 4/4] WIP: +change to use LZ4 frame API

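Switch to the LZ4 frame API, which buffers input internally so that a
block (and its block header) is written only once a full block has
accumulated, rather than flushing one block per row.  This further
improves the compressed size.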

XXX: bump the archive version, since this changes the meaning of "lz4"
in the archive header.
---
 src/bin/pg_dump/compress_lz4.c | 102 +++++++++++++++++++++++++--------
 1 file changed, 79 insertions(+), 23 deletions(-)

diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c
index 32a8f668907..88a819c553b 100644
--- a/src/bin/pg_dump/compress_lz4.c
+++ b/src/bin/pg_dump/compress_lz4.c
@@ -40,7 +40,7 @@ typedef struct LZ4CompressorState
 {
 	char	   *outbuf;
 	size_t		outsize;
-	LZ4_stream_t *stream;
+	LZ4F_compressionContext_t ctx;	/* LZ4F context, created on first write */
 } LZ4CompressorState;
 
 /* Private routines that support LZ4 compressed data I/O */
@@ -52,29 +52,71 @@ static void EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs);
 static void
 ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs)
 {
-	LZ4_streamDecode_t lz4StreamDecode;
+	size_t		res;
 	char	   *buf;
 	char	   *decbuf;
 	size_t		buflen;
+	size_t		decbuflen;
 	size_t		cnt;
 
+	LZ4F_decompressOptions_t opts = {.stableDst = 0}; /* not stable */
+	LZ4F_decompressionContext_t dtx;
+
+	res = LZ4F_createDecompressionContext(&dtx, LZ4F_VERSION);
+	if (LZ4F_isError(res))
+		pg_fatal("failed to LZ4F_createDecompressionContext: %s",
+				LZ4F_getErrorName(res));
+
 	buflen = LZ4_IN_SIZE;
 	buf = pg_malloc(buflen);
-	decbuf = pg_malloc(buflen);
 
-	LZ4_setStreamDecode(&lz4StreamDecode, NULL, 0);
+	decbuflen = LZ4_IN_SIZE;
+	decbuf = pg_malloc(decbuflen);
 
 	while ((cnt = cs->readF(AH, &buf, &buflen)))
 	{
-		int			decBytes = LZ4_decompress_safe_continue(&lz4StreamDecode,
-															buf, decbuf,
-															cnt, buflen);
+		char	   *ptr = buf;
+		char	   *end = buf + cnt;
 
-		ahwrite(decbuf, 1, decBytes, AH);
+		/*
+		 * Feed this chunk to the decompressor.  Once the input is used up,
+		 * call again with no input while the output buffer keeps coming
+		 * back full and the frame is unfinished: the context may still
+		 * hold decoded bytes that did not fit, and they would be lost if
+		 * this turns out to be the last chunk.
+		 */
+		for (;;)
+		{
+			size_t		decBytes = decbuflen;
+			size_t		srcBytes = (size_t) (end - ptr);
+
+			res = LZ4F_decompress(dtx,
+					decbuf, &decBytes,
+					ptr, &srcBytes, &opts);
+			if (LZ4F_isError(res))
+				pg_fatal("failed to LZ4F_decompress: %s", LZ4F_getErrorName(res));
+
+			ptr += srcBytes;
+			if (decBytes > 0)
+				ahwrite(decbuf, 1, decBytes, AH);
+
+			/* done once all input is consumed and nothing can be pending */
+			if (ptr == end && (res == 0 || decBytes < decbuflen))
+				break;
+		}
 	}
 
+	/*
+	 * LZ4F_decompress() returns 0 only when a frame has been completely
+	 * decoded (and res is still 0 from context creation if there was no
+	 * data at all), so anything else means the data ended mid-frame.
+	 */
+	if (res != 0)
+		pg_fatal("unexpected end of LZ4 compressed data");
+
 	pg_free(buf);
 	pg_free(decbuf);
+	LZ4F_freeDecompressionContext(dtx);
 }
 
 static void
@@ -82,40 +124,80 @@ WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
 					  const void *data, size_t dLen)
 {
 	LZ4CompressorState *LZ4cs = (LZ4CompressorState *) cs->private_data;
-	size_t		compressed;
-	size_t		requiredsize = LZ4_compressBound(dLen);
+	size_t		requiredsize = LZ4F_compressBound(dLen, NULL);
+	size_t		res;
 
 	if (requiredsize > LZ4cs->outsize)
 	{
 		LZ4cs->outbuf = pg_realloc(LZ4cs->outbuf, requiredsize);
 		LZ4cs->outsize = requiredsize;
 	}
 
-	if (LZ4cs->stream == NULL)
-		LZ4cs->stream = LZ4_createStream();
-
-	compressed = LZ4_compress_fast_continue(LZ4cs->stream, data, LZ4cs->outbuf,
-			dLen, LZ4cs->outsize,
-			AH->compression_spec.level);
+	/*
+	 * On the first call, create the compression context and write the LZ4
+	 * frame header.  The context pointer doubles as the "frame started"
+	 * flag; this assumes the private state is zero-initialized when it is
+	 * allocated.
+	 */
+	if (LZ4cs->ctx == NULL)
+	{
+		LZ4F_preferences_t prefs = {
+			.compressionLevel = cs->compression_spec.level
+		};
+
+		res = LZ4F_createCompressionContext(&LZ4cs->ctx, LZ4F_VERSION);
+		if (LZ4F_isError(res))
+			pg_fatal("failed to LZ4F_createCompressionContext: %s", LZ4F_getErrorName(res));
+
+		res = LZ4F_compressBegin(LZ4cs->ctx, LZ4cs->outbuf, LZ4cs->outsize, &prefs);
+		if (LZ4F_isError(res))
+			pg_fatal("failed to LZ4F_compressBegin: %s", LZ4F_getErrorName(res));
+		cs->writeF(AH, LZ4cs->outbuf, res);
+	}
 
-	if (compressed <= 0)
-		pg_fatal("failed to LZ4 compress data");
+	/*
+	 * autoFlush is left off, so the context buffers input until a whole
+	 * block has accumulated and this call often writes nothing; whatever
+	 * is left over is flushed by LZ4F_compressEnd() in EndCompressorLZ4.
+	 */
+	res = LZ4F_compressUpdate(LZ4cs->ctx, LZ4cs->outbuf, LZ4cs->outsize,
+								 data, dLen, NULL);
+	if (LZ4F_isError(res))
+		pg_fatal("failed to LZ4F_compressUpdate: %s", LZ4F_getErrorName(res));
 
-	cs->writeF(AH, LZ4cs->outbuf, compressed);
+	if (res > 0)
+		cs->writeF(AH, LZ4cs->outbuf, res);
 }
 
 static void
 EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs)
 {
 	LZ4CompressorState *LZ4cs;
+	size_t		res;
 
 	LZ4cs = (LZ4CompressorState *) cs->private_data;
 
-	if (LZ4cs->stream != NULL)
-		LZ4_freeStream(LZ4cs->stream);
+	/*
+	 * Finish the frame if WriteDataToArchiveLZ4 started one: flush any data
+	 * still buffered in the context and write the end mark.  outbuf was
+	 * sized with LZ4F_compressBound(), which also covers LZ4F_compressEnd().
+	 */
+	if (LZ4cs->ctx != NULL)
+	{
+		res = LZ4F_compressEnd(LZ4cs->ctx, LZ4cs->outbuf, LZ4cs->outsize, NULL);
+		if (LZ4F_isError(res))
+			pg_fatal("failed to LZ4F_compressEnd: %s", LZ4F_getErrorName(res));
+
+		if (res > 0)
+			cs->writeF(AH, LZ4cs->outbuf, res);
+
+		res = LZ4F_freeCompressionContext(LZ4cs->ctx);
+		if (LZ4F_isError(res))
+			pg_fatal("failed to LZ4F_freeCompressionContext: %s", LZ4F_getErrorName(res));
+	}
 
 	pg_free(LZ4cs->outbuf);
 	pg_free(LZ4cs);
 }
 
 
-- 
2.34.1

