From 1bac5d02bbcc9c34dcb44b358bc27f3d204bb584 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryzbyj@telsasoft.com>
Date: Fri, 25 Dec 2020 00:34:01 -0600
Subject: [PATCH 05/20] Use cf* abstraction in archiver and tar

..rather than direct, conditional calls to gzopen/fopen.  To support this,
add cfdopen(), which wraps an already-open file descriptor in a cfp.

See also: bf9aa490db24b2334b3595ee33653bf2fe39208c
---
 src/bin/pg_dump/compress_io.c        |  53 +++++++++++++
 src/bin/pg_dump/compress_io.h        |   5 ++
 src/bin/pg_dump/pg_backup_archiver.c | 109 +++++++++------------------
 src/bin/pg_dump/pg_backup_archiver.h |  16 ++--
 src/bin/pg_dump/pg_backup_tar.c      |  85 +++++----------------
 5 files changed, 117 insertions(+), 151 deletions(-)

diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index 21957d68f3..d66d6f60f5 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -540,6 +540,58 @@ cfopen(const char *path, const char *mode, Compress *compression)
 	}
 }
 
+/*
+ * Wrap an open file descriptor with the given compression, returning an
+ * opaque cfp or NULL on failure.  Don't put a compression level in "mode".
+ */
+cfp *
+cfdopen(int fd, const char *mode, Compress *compression)
+{
+	cfp		   *fp = pg_malloc0(sizeof(cfp));
+
+	switch (compression->alg)
+	{
+#ifdef HAVE_LIBZ
+	case COMPR_ALG_LIBZ:
+		if (compression->level != Z_DEFAULT_COMPRESSION)
+		{
+			/* user has specified a compression level, so tell zlib to use it */
+			char		mode_compression[32];
+
+			snprintf(mode_compression, sizeof(mode_compression), "%s%d",
+					 mode, compression->level);
+			fp->compressedfp = gzdopen(fd, mode_compression);
+		}
+		else
+		{
+			/* don't specify a level, just use the zlib default */
+			fp->compressedfp = gzdopen(fd, mode);
+		}
+
+		if (fp->compressedfp == NULL)
+		{
+			free_keep_errno(fp);
+			fp = NULL;
+		}
+		return fp;
+#endif
+
+	case COMPR_ALG_NONE:
+		fp->uncompressedfp = fdopen(fd, mode);
+		if (fp->uncompressedfp == NULL)
+		{
+			free_keep_errno(fp);
+			fp = NULL;
+		}
+		else
+			setvbuf(fp->uncompressedfp, NULL, _IONBF, 0);
+		return fp;
+
+	default:
+		/* Should not happen */
+		fatal("requested compression not available in this installation");
+	}
+}
 
 int
 cfread(void *ptr, int size, cfp *fp)
@@ -616,6 +668,7 @@ cfgets(cfp *fp, char *buf, int len)
 	return fgets(buf, len, fp->uncompressedfp);
 }
 
+/* Close the given compressed or uncompressed stream; return 0 on success. */
 int
 cfclose(cfp *fp)
 {
diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h
index fb9d659acc..318a6b5340 100644
--- a/src/bin/pg_dump/compress_io.h
+++ b/src/bin/pg_dump/compress_io.h
@@ -58,6 +62,7 @@ extern const struct compressLibs compresslibs[];
 typedef struct cfp cfp;
 
 extern cfp *cfopen(const char *path, const char *mode, Compress *compression);
+extern cfp *cfdopen(int fd, const char *mode, Compress *compression);
 extern cfp *cfopen_read(const char *path, const char *mode, Compress *compression);
 extern cfp *cfopen_write(const char *path, const char *mode, Compress *compression);
 extern int	cfread(void *ptr, int size, cfp *fp);
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 3eb6c55600..bd06fbb787 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -39,17 +39,11 @@
 #include "pg_backup_archiver.h"
 #include "pg_backup_db.h"
 #include "pg_backup_utils.h"
+#include "compress_io.h"
 
 #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
 #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
 
-/* state needed to save/restore an archive's output target */
-typedef struct _outputContext
-{
-	void	   *OF;
-	int			gzOut;
-} OutputContext;
-
 /*
  * State for tracking TocEntrys that are ready to process during a parallel
  * restore.  (This used to be a list, and we still call it that, though now
@@ -99,8 +93,8 @@ static int	RestoringToDB(ArchiveHandle *AH);
 static void dump_lo_buf(ArchiveHandle *AH);
 static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
 static void SetOutput(ArchiveHandle *AH, const char *filename, Compress *compress);
-static OutputContext SaveOutput(ArchiveHandle *AH);
-static void RestoreOutput(ArchiveHandle *AH, OutputContext savedContext);
+static cfp *SaveOutput(ArchiveHandle *AH);
+static void RestoreOutput(ArchiveHandle *AH, cfp *savedContext);
 
 static int	restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel);
 static void restore_toc_entries_prefork(ArchiveHandle *AH,
@@ -270,10 +264,8 @@ CloseArchive(Archive *AHX)
 	AH->ClosePtr(AH);
 
-	/* Close the output */
-	if (AH->gzOut)
-		res = GZCLOSE(AH->OF);
-	else if (AH->OF != stdout)
-		res = fclose(AH->OF);
+	/* Close the output.  AH->OF is a cfp, so it is never stdout itself. */
+	if (AH->OF != NULL)
+		res = cfclose(AH->OF);
 
 	if (res != 0)
 		fatal("could not close output file: %m");
@@ -355,7 +347,7 @@ RestoreArchive(Archive *AHX)
 	RestoreOptions *ropt = AH->public.ropt;
 	bool		parallel_mode;
 	TocEntry   *te;
-	OutputContext sav;
+	cfp			*sav;
 
 	AH->stage = STAGE_INITIALIZING;
 
@@ -1120,7 +1112,7 @@ PrintTOCSummary(Archive *AHX)
 	RestoreOptions *ropt = AH->public.ropt;
 	TocEntry   *te;
 	teSection	curSection;
-	OutputContext sav;
+	cfp			*sav;
 	const char *fmtName;
 	char		stamp_str[64];
 
@@ -1492,6 +1484,7 @@ archprintf(Archive *AH, const char *fmt,...)
 static void
 SetOutput(ArchiveHandle *AH, const char *filename, Compress *compression)
 {
+	char		fmode[14];
 	int			fn;
 
 	if (filename)
@@ -1511,38 +1504,23 @@ SetOutput(ArchiveHandle *AH, const char *filename, Compress *compression)
 	else
 		fn = fileno(stdout);
 
-	/* If compression explicitly requested, use gzopen */
-#ifdef HAVE_LIBZ
-	if (compression->alg != COMPR_ALG_NONE)
+	/*
+	 * Use a plain stdio open mode.  The compression level must not be put
+	 * into it: cfdopen() and cfopen() append the level themselves when it
+	 * isn't the default, and fdopen()/fopen() don't accept one.
+	 */
+	snprintf(fmode, sizeof(fmode), "%s",
+			 AH->mode == archModeAppend ? PG_BINARY_A : PG_BINARY_W);
+
+	if (fn >= 0)
 	{
-		char		fmode[14];
-
-		/* Don't use PG_BINARY_x since this is zlib */
-		sprintf(fmode, "wb%d", compression->level);
-		if (fn >= 0)
-			AH->OF = gzdopen(dup(fn), fmode);
-		else
-			AH->OF = gzopen(filename, fmode);
-		AH->gzOut = 1;
+		/* Output to an already-open descriptor, such as stdout */
+		AH->OF = cfdopen(dup(fn), fmode, compression);
 	}
 	else
-#endif
-	{							/* Use fopen */
-		if (AH->mode == archModeAppend)
-		{
-			if (fn >= 0)
-				AH->OF = fdopen(dup(fn), PG_BINARY_A);
-			else
-				AH->OF = fopen(filename, PG_BINARY_A);
-		}
-		else
-		{
-			if (fn >= 0)
-				AH->OF = fdopen(dup(fn), PG_BINARY_W);
-			else
-				AH->OF = fopen(filename, PG_BINARY_W);
-		}
-		AH->gzOut = 0;
+	{
+		Assert(filename != NULL);
+		AH->OF = cfopen(filename, fmode, compression);
 	}
 
 	if (!AH->OF)
@@ -1554,32 +1531,22 @@ SetOutput(ArchiveHandle *AH, const char *filename, Compress *compression)
 	}
 }
 
-static OutputContext
+/* Return the current output stream, for a later RestoreOutput() */
+static cfp *
 SaveOutput(ArchiveHandle *AH)
 {
-	OutputContext sav;
-
-	sav.OF = AH->OF;
-	sav.gzOut = AH->gzOut;
-
-	return sav;
+	return AH->OF;
 }
 
 static void
-RestoreOutput(ArchiveHandle *AH, OutputContext savedContext)
+RestoreOutput(ArchiveHandle *AH, cfp *savedContext)
 {
 	int			res;
-
-	if (AH->gzOut)
-		res = GZCLOSE(AH->OF);
-	else
-		res = fclose(AH->OF);
-
+	res = cfclose(AH->OF);
 	if (res != 0)
 		fatal("could not close output file: %m");
 
-	AH->gzOut = savedContext.gzOut;
-	AH->OF = savedContext.OF;
+	AH->OF = savedContext;
 }
 
 
@@ -1703,22 +1670,18 @@ ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
 
 		bytes_written = size * nmemb;
 	}
-	else if (AH->gzOut)
-		bytes_written = GZWRITE(ptr, size, nmemb, AH->OF);
 	else if (AH->CustomOutPtr)
 		bytes_written = AH->CustomOutPtr(AH, ptr, size * nmemb);
-
-	else
+	else if (RestoringToDB(AH))
 	{
 		/*
 		 * If we're doing a restore, and it's direct to DB, and we're
 		 * connected then send it to the DB.
 		 */
-		if (RestoringToDB(AH))
-			bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
-		else
-			bytes_written = fwrite(ptr, size, nmemb, AH->OF) * size;
+		bytes_written = ExecuteSqlCommandBuf(&AH->public, (const char *) ptr, size * nmemb);
 	}
+	else
+		bytes_written = cfwrite(ptr, size * nmemb, AH->OF);
 
 	if (bytes_written != size * nmemb)
 		WRITE_ERROR_EXIT;
@@ -2127,6 +2086,7 @@ _discoverArchiveFormat(ArchiveHandle *AH)
 		fh = stdin;
 		if (!fh)
 			fatal("could not open input file: %m");
+		setvbuf(fh, NULL, _IONBF, 0);	/* tar may read it via a dup()ed fd */
 	}
 
 	if ((cnt = fread(sig, 1, 5, fh)) != 5)
@@ -2266,6 +2226,7 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
 		 SetupWorkerPtrType setupWorkerPtr)
 {
 	ArchiveHandle *AH;
+	Compress	nocompression = {.alg = COMPR_ALG_NONE};
 
 	pg_log_debug("allocating AH for %s, format %d",
 				 FileSpec ? FileSpec : "(stdio)", fmt);
@@ -2319,8 +2280,13 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
 	memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
 
-	/* Open stdout with no compression for AH output handle */
-	AH->gzOut = 0;
-	AH->OF = stdout;
+	/*
+	 * Open stdout with no compression for AH output handle.  Use a dup of the
+	 * descriptor, since AH->OF is later cfclose()d and that must not close
+	 * the process's stdout.
+	 */
+	AH->OF = cfdopen(dup(fileno(stdout)), "w", &nocompression);
+	if (AH->OF == NULL)
+		fatal("could not open output file: %m");
 
 	/*
 	 * On Windows, we need to use binary mode to read/write non-text files,
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 6e033d040e..9f511b49b9 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -30,6 +30,9 @@
 #include "pg_backup.h"
 #include "pqexpbuffer.h"
 
+/* Opaque stream type of compress_io.h, which can't be included (circular) */
+typedef struct cfp cfp;
+
 #define LOBBUFSIZE 16384
 
 /*
@@ -38,19 +41,11 @@
  */
 #ifdef HAVE_LIBZ
 #include <zlib.h>
-#define GZCLOSE(fh) gzclose(fh)
-#define GZWRITE(p, s, n, fh) gzwrite(fh, p, (n) * (s))
-#define GZREAD(p, s, n, fh) gzread(fh, p, (n) * (s))
-#define GZEOF(fh)	gzeof(fh)
 #else
-#define GZCLOSE(fh) fclose(fh)
-#define GZWRITE(p, s, n, fh) (fwrite(p, s, n, fh) * (s))
-#define GZREAD(p, s, n, fh) fread(p, s, n, fh)
-#define GZEOF(fh)	feof(fh)
+
 /* this is just the redefinition of a libz constant, in case zlib isn't
  * available */
 #define Z_DEFAULT_COMPRESSION (-1)
-
 typedef struct _z_stream
 {
 	void	   *next_in;
@@ -318,8 +313,7 @@ struct _archiveHandle
 
 	char	   *fSpec;			/* Archive File Spec */
 	FILE	   *FH;				/* General purpose file handle */
-	void	   *OF;
-	int			gzOut;			/* Output file */
+	cfp	   *OF;				/* Output file (compressed or not) */
 
 	struct _tocEntry *toc;		/* Header of circular list of TOC entries */
 	int			tocCount;		/* Number of TOC entries */
diff --git a/src/bin/pg_dump/pg_backup_tar.c b/src/bin/pg_dump/pg_backup_tar.c
index 4ba79ab924..16f4e0792a 100644
--- a/src/bin/pg_dump/pg_backup_tar.c
+++ b/src/bin/pg_dump/pg_backup_tar.c
@@ -66,12 +66,7 @@ static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
 
 typedef struct
 {
-#ifdef HAVE_LIBZ
-	gzFile		zFH;
-#else
-	FILE	   *zFH;
-#endif
-	FILE	   *nFH;
+	cfp			*FH;
 	FILE	   *tarFH;
 	FILE	   *tmpFH;
 	char	   *targetFile;
@@ -191,7 +186,7 @@ InitArchiveFmt_Tar(ArchiveHandle *AH)
 		 * Make unbuffered since we will dup() it, and the buffers screw each
 		 * other
 		 */
-		/* setvbuf(ctx->tarFH, NULL, _IONBF, 0); */
+		/* setvbuf(ctx->tarFH, NULL, _IONBF, 0); */
 
 		ctx->hasSeek = checkSeek(ctx->tarFH);
 
@@ -223,7 +218,7 @@ InitArchiveFmt_Tar(ArchiveHandle *AH)
 		 * Make unbuffered since we will dup() it, and the buffers screw each
 		 * other
 		 */
-		/* setvbuf(ctx->tarFH, NULL, _IONBF, 0); */
+		setvbuf(ctx->tarFH, NULL, _IONBF, 0);
 
 		ctx->tarFHpos = 0;
 
@@ -321,10 +316,6 @@ tarOpen(ArchiveHandle *AH, const char *filename, char mode)
 	lclContext *ctx = (lclContext *) AH->formatData;
 	TAR_MEMBER *tm;
 
-#ifdef HAVE_LIBZ
-	char		fmode[14];
-#endif
-
 	if (mode == 'r')
 	{
 		tm = _tarPositionTo(AH, filename);
@@ -345,16 +336,13 @@ tarOpen(ArchiveHandle *AH, const char *filename, char mode)
 			}
 		}
 
-#ifdef HAVE_LIBZ
-
 		if (AH->compression.alg == COMPR_ALG_NONE)
-			tm->nFH = ctx->tarFH;
+			tm->FH = cfdopen(dup(fileno(ctx->tarFH)), "rb", &AH->compression);
 		else
 			fatal("compression is not supported by tar archive format");
-		/* tm->zFH = gzdopen(dup(fileno(ctx->tarFH)), "rb"); */
-#else
-		tm->nFH = ctx->tarFH;
-#endif
+		/* Either dup() or fdopen() can fail here */
+		if (tm->FH == NULL)
+			fatal("could not open input file: %m");
 	}
 	else
 	{
@@ -406,21 +391,11 @@ tarOpen(ArchiveHandle *AH, const char *filename, char mode)
 
 		umask(old_umask);
 
-#ifdef HAVE_LIBZ
-
-		if (AH->compression.alg != COMPR_ALG_NONE)
-		{
-			sprintf(fmode, "wb%d", AH->compression.level);
-			tm->zFH = gzdopen(dup(fileno(tm->tmpFH)), fmode);
-			if (tm->zFH == NULL)
-				fatal("could not open temporary file");
-		}
-		else
-			tm->nFH = tm->tmpFH;
-#else
-
-		tm->nFH = tm->tmpFH;
-#endif
+		/* dup(): tarClose() closes FH before _tarAddFile() closes tmpFH */
+		tm->FH = cfdopen(dup(fileno(tm->tmpFH)), PG_BINARY_W,
+						 &AH->compression);
+		if (tm->FH == NULL)
+			fatal("could not open temporary file");
 
 		tm->AH = AH;
 		tm->targetFile = pg_strdup(filename);
@@ -435,12 +410,14 @@ tarOpen(ArchiveHandle *AH, const char *filename, char mode)
 static void
 tarClose(ArchiveHandle *AH, TAR_MEMBER *th)
 {
+	int			res;
+
 	/*
-	 * Close the GZ file since we dup'd. This will flush the buffers.
+	 * Close FH, which was opened on a dup'd descriptor; this flushes buffers.
 	 */
-	if (AH->compression.alg != COMPR_ALG_NONE)
-		if (GZCLOSE(th->zFH) != 0)
-			fatal("could not close tar member");
+	res = cfclose(th->FH);
+	if (res != 0)
+		fatal("could not close tar member");
 
 	if (th->mode == 'w')
 		_tarAddFile(AH, th);	/* This will close the temp file */
@@ -453,8 +430,7 @@ tarClose(ArchiveHandle *AH, TAR_MEMBER *th)
 	if (th->targetFile)
 		free(th->targetFile);
 
-	th->nFH = NULL;
-	th->zFH = NULL;
+	th->FH = NULL;
 }
 
 #ifdef __NOT_USED__
@@ -540,29 +516,9 @@ _tarReadRaw(ArchiveHandle *AH, void *buf, size_t len, TAR_MEMBER *th, FILE *fh)
 		}
 		else if (th)
 		{
-			if (th->zFH)
-			{
-				res = GZREAD(&((char *) buf)[used], 1, len, th->zFH);
-				if (res != len && !GZEOF(th->zFH))
-				{
-#ifdef HAVE_LIBZ
-					int			errnum;
-					const char *errmsg = gzerror(th->zFH, &errnum);
-
-					fatal("could not read from input file: %s",
-						  errnum == Z_ERRNO ? strerror(errno) : errmsg);
-#else
-					fatal("could not read from input file: %s",
-						  strerror(errno));
-#endif
-				}
-			}
-			else
-			{
-				res = fread(&((char *) buf)[used], 1, len, th->nFH);
-				if (res != len && !feof(th->nFH))
-					READ_ERROR_EXIT(th->nFH);
-			}
+			res = cfread(&((char *) buf)[used], len, th->FH);
+			if (res != len && !cfeof(th->FH))
+				fatal("could not read from input file: %m");
 		}
 	}
 
@@ -594,10 +550,7 @@ tarWrite(const void *buf, size_t len, TAR_MEMBER *th)
 {
 	size_t		res;
 
-	if (th->zFH != NULL)
-		res = GZWRITE(buf, 1, len, th->zFH);
-	else
-		res = fwrite(buf, 1, len, th->nFH);
+	res = cfwrite(buf, len, th->FH);
 
 	th->pos += res;
 	return res;
-- 
2.17.0

