From be73511ccaa67adfb09b1f93588970c3898664b0 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 4 Apr 2026 16:48:14 +1300
Subject: [PATCH 2/4] libarchive: Provide astreamer_libarchive.c.

This allows modern tar files (and potentially other unrelated formats) to
be consumed from a file, with support for various compression
algorithms.

This astreamer is unusual in that it produces data rather than having
data pushed into it with astreamer_content().  The wrapper
astreamer_pull() is used to signal that difference, though it just calls
astreamer_content() with NULL data.

(Proof-of-concept)
---
 src/fe_utils/Makefile               |   4 +
 src/fe_utils/astreamer_libarchive.c | 257 ++++++++++++++++++++++++++++
 src/fe_utils/meson.build            |   4 +
 src/include/fe_utils/astreamer.h    |  12 ++
 src/tools/pgindent/typedefs.list    |   1 +
 5 files changed, 278 insertions(+)
 create mode 100644 src/fe_utils/astreamer_libarchive.c

diff --git a/src/fe_utils/Makefile b/src/fe_utils/Makefile
index cbfbf93ac69..f6c88a73ee7 100644
--- a/src/fe_utils/Makefile
+++ b/src/fe_utils/Makefile
@@ -40,6 +40,10 @@ OBJS = \
 	string_utils.o \
 	version.o
 
+ifeq ($(with_libarchive), yes)
+OBJS += astreamer_libarchive.o
+endif
+
 ifeq ($(PORTNAME), win32)
 override CPPFLAGS += -DFD_SETSIZE=1024
 endif
diff --git a/src/fe_utils/astreamer_libarchive.c b/src/fe_utils/astreamer_libarchive.c
new file mode 100644
index 00000000000..d57853171f4
--- /dev/null
+++ b/src/fe_utils/astreamer_libarchive.c
@@ -0,0 +1,257 @@
+/*-------------------------------------------------------------------------
+ *
+ * astreamer_libarchive.c
+ *
+ * This module reads from archives using https://www.libarchive.org/.
+ *
+ * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  src/fe_utils/astreamer_libarchive.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <archive.h>
+#include <archive_entry.h>
+
+#include "common/logging.h"
+#include "fe_utils/astreamer.h"
+
+/* This is the data size we'll try to stream at once. */
+#define ASTREAMER_LIBARCHIVE_READER_BUFFER_SIZE (128 * 1024)
+
+typedef struct astreamer_libarchive_reader
+{
+	astreamer	base;			/* must be first: cast from astreamer * */
+	astreamer_member member;	/* describes the entry being streamed */
+	struct archive *archive;	/* libarchive read handle */
+	bool		end_of_file;	/* next pull must read an entry header */
+	bool		end_of_archive; /* ARCHIVE_EOF seen; nothing more to emit */
+	char		data[ASTREAMER_LIBARCHIVE_READER_BUFFER_SIZE];
+} astreamer_libarchive_reader;
+
+static void astreamer_libarchive_reader_content(astreamer *streamer,
+												astreamer_member *member,
+												const char *data, int len,
+												astreamer_archive_context context);
+static void astreamer_libarchive_reader_finalize(astreamer *streamer);
+static void astreamer_libarchive_reader_free(astreamer *streamer);
+
+static const astreamer_ops astreamer_libarchive_reader_ops = {
+	.content = astreamer_libarchive_reader_content,
+	.finalize = astreamer_libarchive_reader_finalize,
+	.free = astreamer_libarchive_reader_free
+};
+
+/*
+ * Create an astreamer that decodes 'pathname' with libarchive and feeds its
+ * contents to 'next'.  This streamer is a source that must be the first in
+ * the chain; content is produced by calling astreamer_pull(streamer).  Any
+ * failure to set up libarchive or open the file is reported with pg_fatal().
+ */
+astreamer *
+astreamer_libarchive_reader_new_pathname(astreamer *next,
+										 const char *pathname)
+{
+	astreamer_libarchive_reader *streamer;
+	int			r;
+
+	streamer = palloc0_object(astreamer_libarchive_reader);
+	*((const astreamer_ops **) &streamer->base.bbs_ops) =
+		&astreamer_libarchive_reader_ops;
+	streamer->base.bbs_next = next;
+
+	/* Prepare to read tar archives with any known compression filter. */
+	streamer->archive = archive_read_new();
+	if (streamer->archive == NULL)
+		pg_fatal("out of memory");
+	if (archive_read_support_format_tar(streamer->archive) != ARCHIVE_OK)
+		pg_fatal("libarchive: could not initialize tar format: %s",
+				 archive_error_string(streamer->archive));
+	if (archive_read_support_filter_all(streamer->archive) != ARCHIVE_OK)
+		pg_fatal("libarchive: could not initialize tar filter: %s",
+				 archive_error_string(streamer->archive));
+
+	/* Open the file; the last argument is libarchive's read block size. */
+	r = archive_read_open_filename(streamer->archive,
+								   pathname,
+								   ASTREAMER_LIBARCHIVE_READER_BUFFER_SIZE);
+	if (r != ARCHIVE_OK)
+		pg_fatal("libarchive: could not open \"%s\": %s",
+				 pathname,
+				 archive_error_string(streamer->archive));
+
+	/* Start by wanting a new file. */
+	streamer->end_of_file = true;
+	streamer->end_of_archive = false;
+
+	return &streamer->base;
+}
+
+/*
+ * Fill in an astreamer member given a libarchive entry.  The member is reused
+ * for every entry, so stale type flags and link target are overwritten too.
+ */
+static void
+astreamer_libarchive_reader_fill_member(astreamer_member *member,
+										struct archive_entry *entry)
+{
+	const char *name = archive_entry_pathname(entry);
+	const char *target = archive_entry_symlink(entry);
+	unsigned int filetype = archive_entry_filetype(entry);
+
+	/* libarchive returns NULL for a name it cannot produce. */
+	if (name == NULL)
+		pg_fatal("libarchive: could not get pathname of archive member");
+	strlcpy(member->pathname, name, sizeof(member->pathname));
+	member->size = archive_entry_size(entry);
+	member->mode = archive_entry_mode(entry);
+	member->uid = archive_entry_uid(entry);
+	member->gid = archive_entry_gid(entry);
+
+	/* Set exactly one type flag, or none for other file types. */
+	member->is_regular = (filetype == AE_IFREG);
+	member->is_directory = (filetype == AE_IFDIR);
+	member->is_symlink = (filetype == AE_IFLNK);
+
+	/* Only symlinks have a target, and it too may be unavailable. */
+	strlcpy(member->linktarget, member->is_symlink && target ? target : "",
+			sizeof(member->linktarget));
+}
+
+static void
+astreamer_libarchive_reader_content(astreamer *streamer,
+									astreamer_member *member,
+									const char *data_ignored,
+									int len_ignored,
+									astreamer_archive_context context)
+{
+	astreamer_libarchive_reader *mystreamer;
+	ssize_t		size;
+
+	/*
+	 * Emit the next piece of the archive; reached via astreamer_pull().
+	 *
+	 * If libarchive had a non-blocking or push API (cf discussion in
+	 * libarchive issue #1268), then we could push raw data in here, like
+	 * astreamer_tar_parser.
+	 *
+	 * Given only a blocking interface, we have to ask it to pull data into
+	 * our astreamer pipeline.  The amount it reads at once is bounded by
+	 * ASTREAMER_LIBARCHIVE_READER_BUFFER_SIZE, and we'll return control after
+	 * emitting just one data chunk so that the caller has the chance to
+	 * give up early.
+	 */
+	Assert(member == NULL);
+	Assert(data_ignored == NULL);
+	Assert(len_ignored == 0);
+	Assert(context == ASTREAMER_UNKNOWN);
+
+	mystreamer = (astreamer_libarchive_reader *) streamer;
+
+	while (!mystreamer->end_of_archive)
+	{
+		/* Do we need a new file? */
+		if (mystreamer->end_of_file)
+		{
+			struct archive_entry *entry;
+
+			/* Start next file, or discover end of archive. */
+			switch (archive_read_next_header(mystreamer->archive, &entry))
+			{
+				case ARCHIVE_RETRY:
+					continue;
+				case ARCHIVE_FATAL:
+					pg_fatal("libarchive: %s",
+							 archive_error_string(mystreamer->archive));
+					break;
+				case ARCHIVE_WARN:
+					pg_log_warning("libarchive: %s",
+								   archive_error_string(mystreamer->archive));
+					pg_fallthrough;
+				case ARCHIVE_OK:
+					/* Send file header, then fall through to send one chunk. */
+					mystreamer->end_of_file = false;
+					astreamer_libarchive_reader_fill_member(&mystreamer->member,
+															entry);
+					astreamer_content(mystreamer->base.bbs_next,
+									  &mystreamer->member,
+									  NULL,
+									  0,
+									  ASTREAMER_MEMBER_HEADER);
+					break;
+				case ARCHIVE_EOF:
+					/* End of archive. */
+					mystreamer->end_of_archive = true;
+					astreamer_content(mystreamer->base.bbs_next,
+									  NULL,
+									  NULL,
+									  0,
+									  ASTREAMER_ARCHIVE_TRAILER);
+					return;
+				default:
+					pg_fatal("unexpected result from archive_read_next_header()");
+					break;
+			}
+		}
+
+		/* Stream a chunk of data, or discover end of file. */
+		Assert(!mystreamer->end_of_file);
+		size = archive_read_data(mystreamer->archive,
+								 mystreamer->data,
+								 sizeof(mystreamer->data));
+		switch (size)
+		{
+			case ARCHIVE_RETRY:
+				continue;
+			case ARCHIVE_WARN:
+				pg_log_warning("libarchive: %s",
+							   archive_error_string(mystreamer->archive));
+				continue;
+			default:
+				/* Other negative results (e.g. ARCHIVE_FAILED) are fatal. */
+				if (size < 0)
+					pg_fatal("libarchive: %s",
+							 archive_error_string(mystreamer->archive));
+				break;
+		}
+
+		if (size == 0)
+		{
+			/* Send trailer, and go around to start another file. */
+			mystreamer->end_of_file = true;
+			astreamer_content(mystreamer->base.bbs_next,
+							  &mystreamer->member,
+							  NULL,
+							  0,
+							  ASTREAMER_MEMBER_TRAILER);
+			continue;
+		}
+
+		/* Stream large chunk and return. */
+		astreamer_content(mystreamer->base.bbs_next,
+						  &mystreamer->member,
+						  mystreamer->data,
+						  size,
+						  ASTREAMER_MEMBER_CONTENTS);
+		return;
+	}
+}
+
+static void
+astreamer_libarchive_reader_finalize(astreamer *streamer)
+{
+	astreamer_finalize(streamer->bbs_next); /* forward to next astreamer */
+}
+
+static void
+astreamer_libarchive_reader_free(astreamer *streamer)
+{
+	astreamer_libarchive_reader *mystreamer;
+
+	mystreamer = (astreamer_libarchive_reader *) streamer;
+	archive_free(mystreamer->archive);	/* release the libarchive handle */
+	pfree(streamer);			/* NOTE(review): bbs_next is not freed here */
+}
diff --git a/src/fe_utils/meson.build b/src/fe_utils/meson.build
index 86befca192e..6b95c36e9a5 100644
--- a/src/fe_utils/meson.build
+++ b/src/fe_utils/meson.build
@@ -21,6 +21,10 @@ fe_utils_sources = files(
   'version.c',
 )
 
+if libarchive.found()
+  fe_utils_sources += 'astreamer_libarchive.c'
+endif
+
 psqlscan = custom_target('psqlscan',
   input: 'psqlscan.l',
   output: 'psqlscan.c',
diff --git a/src/include/fe_utils/astreamer.h b/src/include/fe_utils/astreamer.h
index 8329e4efbc5..c6c54e954e9 100644
--- a/src/include/fe_utils/astreamer.h
+++ b/src/include/fe_utils/astreamer.h
@@ -142,6 +142,13 @@ astreamer_content(astreamer *streamer, astreamer_member *member,
 	streamer->bbs_ops->content(streamer, member, data, len, context);
 }
 
+/* Ask a source astreamer to produce data; it receives no input of its own. */
+static inline void
+astreamer_pull(astreamer *streamer)
+{
+	astreamer_content(streamer, NULL, NULL, 0, ASTREAMER_UNKNOWN);
+}
+
 /* Finalize a astreamer. */
 static inline void
 astreamer_finalize(astreamer *streamer)
@@ -228,4 +235,9 @@ extern astreamer *astreamer_tar_parser_new(astreamer *next);
 extern astreamer *astreamer_tar_terminator_new(astreamer *next);
 extern astreamer *astreamer_tar_archiver_new(astreamer *next);
 
+#ifdef USE_LIBARCHIVE
+extern astreamer *astreamer_libarchive_reader_new_pathname(astreamer *next,
+														   const char *pathname);
+#endif
+
 #endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index c72f6c59573..1bb3a2bafd4 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3564,6 +3564,7 @@ astreamer_archive_context
 astreamer_extractor
 astreamer_gzip_decompressor
 astreamer_gzip_writer
+astreamer_libarchive_reader
 astreamer_lz4_frame
 astreamer_member
 astreamer_ops
-- 
2.53.0

