From dc145ccbe47146bdf9dc86910d3135e7e64873ea Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryzbyj@telsasoft.com>
Date: Sun, 14 Mar 2021 17:12:07 -0500
Subject: [PATCH 4/6] Use GUC hooks to support compression 'level'..

..which is useful for zstd and zlib, but less so for lz4.
---
 src/backend/access/transam/xlog.c       |  20 ++++
 src/backend/access/transam/xloginsert.c |   7 +-
 src/backend/access/transam/xlogreader.c | 129 ++++++++++++++++++++++++
 src/backend/utils/misc/guc.c            |  39 ++-----
 src/include/access/xlog.h               |  11 ++
 src/include/access/xlogreader.h         |   2 +
 6 files changed, 175 insertions(+), 33 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 9aa4d3eaea..198e6906ee 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -121,6 +121,8 @@ bool		EnableHotStandby = false;
 bool		fullPageWrites = true;
 bool		wal_log_hints = false;
 int			wal_compression = WAL_COMPRESSION_ZSTD;
+char		*wal_compression_string = ""; /* Overwritten by GUC */
+int			wal_compression_level = 1;
 char	   *wal_consistency_checking_string = NULL;
 bool	   *wal_consistency_checking = NULL;
 bool		wal_init_zero = true;
@@ -7923,6 +7925,24 @@ assign_xlog_sync_method(int new_sync_method, void *extra)
 	}
 }
 
+bool
+check_wal_compression(char **newval, void **extra, GucSource source)
+{
+	int			ignored_level;	/* only the validity of *newval matters here */
+	int			method_id = get_compression_level(*newval, &ignored_level);
+
+	/* get_compression_level() returns -1 for any setting it rejects */
+	return method_id != -1;
+}
+
+/* GUC assign hook: refresh the integer method ID and level from the new string */
+void
+assign_wal_compression(const char *newval, void *extra)
+{
+	int			method_id = get_compression_level(newval, &wal_compression_level);
+	wal_compression = method_id;
+	Assert(method_id >= 0);		/* -1 would mean newval failed to parse */
+}
 
 /*
  * Issue appropriate kind of fsync (if any) for an XLOG output file.
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index 5f9d07156a..0e0cfc199c 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -887,8 +887,8 @@ XLogCompressBackupBlock(char *page, uint16 hole_offset, uint16 hole_length,
 
 		case WAL_COMPRESSION_LZ4:
 #ifdef USE_LZ4
-			len = LZ4_compress_default(source, dest, orig_len,
-									   COMPRESS_BUFSIZE);
+			len = LZ4_compress_fast(source, dest, orig_len,
+									   COMPRESS_BUFSIZE, wal_compression_level);
 			if (len <= 0)
 				len = -1;		/* failure */
 #else
@@ -898,9 +898,8 @@ XLogCompressBackupBlock(char *page, uint16 hole_offset, uint16 hole_length,
 
 		case WAL_COMPRESSION_ZSTD:
 #ifdef USE_ZSTD
-			/* Uses level=1, not ZSTD_CLEVEL_DEFAULT */
 			len = ZSTD_compress(dest, COMPRESS_BUFSIZE, source, orig_len,
-								1);
+								wal_compression_level);
 			if (ZSTD_isError(len))
 				len = -1;
 #else
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index d2d882d0d1..c548898eb0 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -18,6 +18,8 @@
 #include "postgres.h"
 
 #include <unistd.h>
+#include <limits.h>
+
 #ifdef USE_LZ4
 #include <lz4.h>
 #endif
@@ -53,6 +55,36 @@ static void ResetDecoder(XLogReaderState *state);
 static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
 							   int segsize, const char *waldir);
 
+/* Values accepted by wal_compression; get_compression_level() scans in order */
+static const struct {
+	const char *name;
+	enum WalCompression compress_id; /* The internal ID */
+	bool has_level; /* If it accepts a numeric "level" */
+	int min_level, dfl_level, max_level; /* left zero when !has_level */
+} wal_compression_options[] = {
+	{"pglz", WAL_COMPRESSION_PGLZ, false},
+#ifdef USE_LZ4
+	{"lz4", WAL_COMPRESSION_LZ4, false}, /* XXX */
+#endif
+
+#ifdef USE_ZSTD
+	/* XXX: the minimum level depends on the version */
+	/* Must precede "zstd": the prefix match would read "zstd-fast-N" as zstd */
+	/* Level 0 is excluded: zstd treats it as its default level, not a fast one */
+	{"zstd-fast", WAL_COMPRESSION_ZSTD, true, -7, -1, -1},
+	{"zstd", WAL_COMPRESSION_ZSTD, true, -7, 1, 22},
+#endif
+
+	{"on", WAL_COMPRESSION_PGLZ, false},
+	{"off", WAL_COMPRESSION_NONE, false},
+	{"true", WAL_COMPRESSION_PGLZ, false},
+	{"false", WAL_COMPRESSION_NONE, false},
+	{"yes", WAL_COMPRESSION_PGLZ, false},
+	{"no", WAL_COMPRESSION_NONE, false},
+	{"1", WAL_COMPRESSION_PGLZ, false},
+	{"0", WAL_COMPRESSION_NONE, false},
+};
+
 /* size of the buffer allocated for error message. */
 #define MAX_ERRORMSG_LEN 1000
 
@@ -1578,6 +1610,103 @@ XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len)
 	}
 }
 
+/*
+ * Parse a wal_compression setting of the form "method" or "method-level".
+ * Method names are matched case-insensitively.  Returns the compression ID
+ * and stores the level in *level, or returns -1 if the input is not accepted.
+ */
+int
+get_compression_level(const char *in, int *level)
+{
+	for (int idx=0; idx < lengthof(wal_compression_options); ++idx)
+	{
+		int len;
+		long tmp;
+		char *end;
+
+		if (pg_strcasecmp(in, wal_compression_options[idx].name) == 0)
+		{
+			/* bare method name, no -level suffix: use the table's default level */
+			*level = wal_compression_options[idx].dfl_level;
+			return wal_compression_options[idx].compress_id;
+		}
+
+		len = strlen(wal_compression_options[idx].name);
+		if (pg_strncasecmp(in, wal_compression_options[idx].name, len) != 0)
+			continue;
+		if (in[len] != '-')
+			continue;
+
+		/* it has a -level suffix, but level is not allowed */
+		if (!wal_compression_options[idx].has_level)
+		{
+#ifndef FRONTEND
+			GUC_check_errdetail("Compression method does not accept a compression level");
+#endif
+			return -1;
+		}
+
+		in += len + 1;		/* step past "name-" to the level itself */
+		len = strlen(in);
+		errno = 0;
+		/* pg_strtoint16 throws an error, which we don't want */
+		/* option_parse_int is frontend only */
+		tmp = strtol(in, &end, 0);
+		if (end != in+len || end == in ||
+				(errno != 0 && tmp == 0) ||
+				(errno == ERANGE && (tmp == LONG_MIN || tmp == LONG_MAX)))
+		{
+#ifndef FRONTEND
+			GUC_check_errdetail("Could not parse compression level: %s", in);
+#endif
+			return -1;
+		}
+
+		/*
+		 * For convenience, allow specification of zstd-fast-N, which is
+		 * interpreted as a negative compression level.
+		 */
+		if (strncmp(wal_compression_options[idx].name, "zstd-fast", 9) == 0 &&
+			tmp > 0)
+			tmp = -tmp;
+
+		if (tmp < wal_compression_options[idx].min_level ||
+				tmp > wal_compression_options[idx].max_level)
+		{
+#ifndef FRONTEND
+			GUC_check_errdetail("Compression level is outside of allowed range: %d...%d",
+					wal_compression_options[idx].min_level,
+					wal_compression_options[idx].max_level);
+#endif
+			return -1;
+		}
+
+		*level = tmp;
+		return wal_compression_options[idx].compress_id;
+	}
+
+#ifndef FRONTEND
+	/* XXX: this is trying to distinguish between invalid and unsupported algorithms? */
+	if (strcmp(in, "zlib") == 0 && false)
+		GUC_check_errdetail("Compression method is not supported by this build.");
+	else {
+		StringInfoData all_methods;
+
+		initStringInfo(&all_methods);
+		for (int idx=0; idx < lengthof(wal_compression_options); ++idx)
+		{
+			if (idx > 0)
+				appendStringInfoString(&all_methods, ", ");
+			appendStringInfoString(&all_methods, wal_compression_options[idx].name);
+		}
+
+		GUC_check_errdetail("Supported compression methods are: %s", all_methods.data);
+		pfree(all_methods.data);
+	}
+#endif
+	return -1;
+}
+
 /*
  * Restore a full-page image from a backup block attached to an XLOG record.
  *
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 96854b6a2b..9d3fb67fad 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -546,25 +546,6 @@ static struct config_enum_entry default_toast_compression_options[] = {
 	{NULL, 0, false}
 };
 
-static const struct config_enum_entry wal_compression_options[] = {
-	{"pglz", WAL_COMPRESSION_PGLZ, false},
-#ifdef USE_LZ4
-	{"lz4", WAL_COMPRESSION_LZ4, false},
-#endif
-#ifdef USE_ZSTD
-	{"zstd", WAL_COMPRESSION_ZSTD, false},
-#endif
-	{"on", WAL_COMPRESSION_PGLZ, false},
-	{"off", WAL_COMPRESSION_NONE, false},
-	{"true", WAL_COMPRESSION_PGLZ, true},
-	{"false", WAL_COMPRESSION_NONE, true},
-	{"yes", WAL_COMPRESSION_PGLZ, true},
-	{"no", WAL_COMPRESSION_NONE, true},
-	{"1", WAL_COMPRESSION_PGLZ, true},
-	{"0", WAL_COMPRESSION_NONE, true},
-	{NULL, 0, false}
-};
-
 /*
  * Options for enum values stored in other modules
  */
@@ -4643,6 +4624,16 @@ static struct config_string ConfigureNamesString[] =
 		check_wal_consistency_checking, assign_wal_consistency_checking, NULL
 	},
 
+	{
+		{"wal_compression", PGC_SUSET, WAL_SETTINGS,
+			gettext_noop("Compresses full-page writes written in WAL file with specified method."),
+			NULL
+		},
+		&wal_compression_string,
+		"zstd-6",
+		check_wal_compression, assign_wal_compression, NULL
+	},
+
 	{
 		{"jit_provider", PGC_POSTMASTER, CLIENT_CONN_PRELOAD,
 			gettext_noop("JIT provider to use."),
@@ -4894,16 +4885,6 @@ static struct config_enum ConfigureNamesEnum[] =
 		NULL, NULL, NULL
 	},
 
-	{
-		{"wal_compression", PGC_SUSET, WAL_SETTINGS,
-			gettext_noop("Compresses full-page writes written in WAL file with specified method."),
-			NULL
-		},
-		&wal_compression,
-		WAL_COMPRESSION_ZSTD, wal_compression_options,
-		NULL, NULL, NULL
-	},
-
 	{
 		{"wal_level", PGC_POSTMASTER, WAL_SETTINGS,
 			gettext_noop("Sets the level of information written to the WAL."),
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 09f6464331..521aac2ce4 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -16,6 +16,8 @@
 #include "datatype/timestamp.h"
 #include "lib/stringinfo.h"
 #include "nodes/pg_list.h"
+#include "storage/fd.h"
+#include "utils/guc.h"
 
 
 /* Sync methods */
@@ -51,6 +53,10 @@ extern char *wal_consistency_checking_string;
 extern bool log_checkpoints;
 extern bool track_wal_io_timing;
 
+extern char *wal_compression_string;
+extern int wal_compression;
+extern int wal_compression_level;
+
 extern int	CheckPointSegments;
 
 /* Archive modes */
@@ -248,6 +254,11 @@ extern void SetWalWriterSleeping(bool sleeping);
 extern void assign_max_wal_size(int newval, void *extra);
 extern void assign_checkpoint_completion_target(double newval, void *extra);
 
+/* GUC */
+extern bool check_wal_compression(char **newval, void **extra, GucSource source);
+extern void assign_wal_compression(const char *newval, void *extra);
+
+
 /*
  * Routines used by xlogrecovery.c to call back into xlog.c during recovery.
  */
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 477f0efe26..dd6c117ef7 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -337,4 +337,6 @@ extern bool XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
 							   RelFileNode *rnode, ForkNumber *forknum,
 							   BlockNumber *blknum);
 
+extern int get_compression_level(const char *in, int *level);
+
 #endif							/* XLOGREADER_H */
-- 
2.17.1

