From bc0e962f09ec59795c49046924445193190702a4 Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Fri, 15 Mar 2024 18:45:02 -0400
Subject: [PATCH v11 5/6] Improve storage efficiency for BLOB ACLs.

Since we are already requiring the blobs of a metadata group to share
the same ACL, it's not terribly hard to store only one copy of that
ACL, and then make pg_restore regenerate the appropriate commands for
each blob.  This saves space in the dump file not only by removing
duplicative SQL command strings, but by not needing a separate TOC
entry for each blob's ACL.  In turn, that reduces client-side
memory requirements for handling many blobs.

TOC entries that need special processing are labeled as "ACL"/"LARGE
OBJECTS nnn..nnn".  If we have a blob with a unique ACL, continue to
label it as "ACL"/"LARGE OBJECT nnn".  We don't actually have to make
such a distinction, but it saves a few cycles during restore for the
easy case, and it seems like a good idea to not change the TOC
contents unnecessarily.

We already bumped the archive file format version number earlier
in this patch series, so it's not necessary to do so again, even
though this adds another thing that pg_restore needs to know.
---
 src/bin/pg_dump/pg_backup_archiver.c | 23 +++++--
 src/bin/pg_dump/pg_backup_archiver.h |  1 +
 src/bin/pg_dump/pg_backup_db.c       | 83 ++++++++++++++++++++++++
 src/bin/pg_dump/pg_dump.c            | 97 ++++++++++++++++++----------
 4 files changed, 164 insertions(+), 40 deletions(-)

diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 29a358d16d..c7a6c918a6 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -3111,11 +3111,11 @@ _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH)
 			strcmp(te->desc, "BLOB") == 0 ||
 			strcmp(te->desc, "BLOB METADATA") == 0 ||
 			(strcmp(te->desc, "ACL") == 0 &&
-			 strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
+			 strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
 			(strcmp(te->desc, "COMMENT") == 0 &&
-			 strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
+			 strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
 			(strcmp(te->desc, "SECURITY LABEL") == 0 &&
-			 strncmp(te->tag, "LARGE OBJECT ", 13) == 0))
+			 strncmp(te->tag, "LARGE OBJECT", 12) == 0))
 			res = res & REQ_DATA;
 		else
 			res = res & ~REQ_DATA;
@@ -3152,11 +3152,11 @@ _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH)
 			  (strcmp(te->desc, "BLOB") == 0 ||
 			   strcmp(te->desc, "BLOB METADATA") == 0 ||
 			   (strcmp(te->desc, "ACL") == 0 &&
-				strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
+				strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
 			   (strcmp(te->desc, "COMMENT") == 0 &&
-				strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
+				strncmp(te->tag, "LARGE OBJECT", 12) == 0) ||
 			   (strcmp(te->desc, "SECURITY LABEL") == 0 &&
-				strncmp(te->tag, "LARGE OBJECT ", 13) == 0))))
+				strncmp(te->tag, "LARGE OBJECT", 12) == 0))))
 			res = res & REQ_SCHEMA;
 	}
 
@@ -3737,7 +3737,7 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
 
 	/*
 	 * Actually print the definition.  Normally we can just print the defn
-	 * string if any, but we have two special cases:
+	 * string if any, but we have three special cases:
 	 *
 	 * 1. A crude hack for suppressing AUTHORIZATION clause that old pg_dump
 	 * versions put into CREATE SCHEMA.  Don't mutate the variant for schema
@@ -3746,6 +3746,10 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
 	 *
 	 * 2. BLOB METADATA entries need special processing since their defn
 	 * strings are just lists of OIDs, not complete SQL commands.
+	 *
+	 * 3. ACL LARGE OBJECTS entries need special processing because they
+	 * contain only one copy of the ACL GRANT/REVOKE commands, which we must
+	 * apply to each large object listed in the associated BLOB METADATA.
 	 */
 	if (ropt->noOwner &&
 		strcmp(te->desc, "SCHEMA") == 0 && strncmp(te->defn, "--", 2) != 0)
@@ -3756,6 +3760,11 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
 	{
 		IssueCommandPerBlob(AH, te, "SELECT pg_catalog.lo_create('", "')");
 	}
+	else if (strcmp(te->desc, "ACL") == 0 &&
+			 strncmp(te->tag, "LARGE OBJECTS", 13) == 0)
+	{
+		IssueACLPerBlob(AH, te);
+	}
 	else
 	{
 		if (te->defn && strlen(te->defn) > 0)
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 1b9f142dea..d6104a7196 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -455,6 +455,7 @@ extern bool isValidTarHeader(char *header);
 extern void ReconnectToServer(ArchiveHandle *AH, const char *dbname);
 extern void IssueCommandPerBlob(ArchiveHandle *AH, TocEntry *te,
 								const char *cmdBegin, const char *cmdEnd);
+extern void IssueACLPerBlob(ArchiveHandle *AH, TocEntry *te);
 extern void DropLOIfExists(ArchiveHandle *AH, Oid oid);
 
 void		ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH);
diff --git a/src/bin/pg_dump/pg_backup_db.c b/src/bin/pg_dump/pg_backup_db.c
index 6b3bf174f2..a02841c405 100644
--- a/src/bin/pg_dump/pg_backup_db.c
+++ b/src/bin/pg_dump/pg_backup_db.c
@@ -586,6 +586,89 @@ IssueCommandPerBlob(ArchiveHandle *AH, TocEntry *te,
 	pg_free(buf);
 }
 
+/*
+ * Process a "LARGE OBJECTS" ACL TocEntry.
+ *
+ * To save space in the dump file, the TocEntry contains only one copy
+ * of the required GRANT/REVOKE commands, written to apply to the first
+ * blob in the group (although we do not depend on that detail here).
+ * We must expand the text to generate commands for all the blobs listed
+ * in the associated BLOB METADATA entry.
+ */
+void
+IssueACLPerBlob(ArchiveHandle *AH, TocEntry *te)
+{
+	TocEntry   *blobte = getTocEntryByDumpId(AH, te->dependencies[0]);
+	char	   *buf;
+	char	   *st;
+	char	   *st2;
+	char	   *en;
+	bool		inquotes;
+
+	if (!blobte)
+		pg_fatal("could not find entry for ID %d", te->dependencies[0]);
+	Assert(strcmp(blobte->desc, "BLOB METADATA") == 0);
+
+	/* Make a writable copy of the ACL commands string */
+	buf = pg_strdup(te->defn);
+
+	/*
+	 * We have to parse out the commands sufficiently to locate the blob OIDs
+	 * and find the command-ending semicolons.  The commands should not
+	 * contain anything hard to parse except for double-quoted role names,
+	 * which are easy to ignore.  Once we've split apart the first and second
+	 * halves of a command, apply IssueCommandPerBlob.  (This means the
+	 * updates on the blobs are interleaved if there's multiple commands, but
+	 * that should cause no trouble.)
+	 */
+	inquotes = false;
+	st = en = buf;
+	st2 = NULL;
+	while (*en)
+	{
+		/* Ignore double-quoted material */
+		if (*en == '"')
+			inquotes = !inquotes;
+		if (inquotes)
+		{
+			en++;
+			continue;
+		}
+		/* If we found "LARGE OBJECT", that's the end of the first half */
+		if (strncmp(en, "LARGE OBJECT ", 13) == 0)
+		{
+			/* Terminate the first-half string */
+			en += 13;
+			Assert(isdigit((unsigned char) *en));
+			*en++ = '\0';
+			/* Skip the rest of the blob OID */
+			while (isdigit((unsigned char) *en))
+				en++;
+			/* Second half starts here */
+			Assert(st2 == NULL);
+			st2 = en;
+		}
+		/* If we found semicolon, that's the end of the second half */
+		else if (*en == ';')
+		{
+			/* Terminate the second-half string */
+			*en++ = '\0';
+			Assert(st2 != NULL);
+			/* Issue this command for each blob */
+			IssueCommandPerBlob(AH, blobte, st, st2);
+			/* For neatness, skip whitespace before the next command */
+			while (isspace((unsigned char) *en))
+				en++;
+			/* Reset for new command */
+			st = en;
+			st2 = NULL;
+		}
+		else
+			en++;
+	}
+	pg_free(buf);
+}
+
 void
 DropLOIfExists(ArchiveHandle *AH, Oid oid)
 {
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 13431db769..d875fe756d 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -271,7 +271,7 @@ static void dumpDefaultACL(Archive *fout, const DefaultACLInfo *daclinfo);
 
 static DumpId dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId,
 					  const char *type, const char *name, const char *subname,
-					  const char *nspname, const char *owner,
+					  const char *nspname, const char *tag, const char *owner,
 					  const DumpableAcl *dacl);
 
 static void getDependencies(Archive *fout);
@@ -3278,7 +3278,7 @@ dumpDatabase(Archive *fout)
 
 	dumpACL(fout, dbDumpId, InvalidDumpId, "DATABASE",
 			qdatname, NULL, NULL,
-			dba, &dbdacl);
+			NULL, dba, &dbdacl);
 
 	/*
 	 * Now construct a DATABASE PROPERTIES archive entry to restore any
@@ -3754,12 +3754,11 @@ dumpLO(Archive *fout, const LoInfo *loinfo)
 								  .dropStmt = "-- dummy"));
 
 	/*
-	 * Dump per-blob comments, seclabels, and ACLs if any.  We assume these
-	 * are rare enough that it's okay to generate retail TOC entries for them.
+	 * Dump per-blob comments and seclabels if any.  We assume these are rare
+	 * enough that it's okay to generate retail TOC entries for them.
 	 */
 	if (loinfo->dobj.dump & (DUMP_COMPONENT_COMMENT |
-							 DUMP_COMPONENT_SECLABEL |
-							 DUMP_COMPONENT_ACL))
+							 DUMP_COMPONENT_SECLABEL))
 	{
 		for (int i = 0; i < loinfo->numlos; i++)
 		{
@@ -3780,11 +3779,39 @@ dumpLO(Archive *fout, const LoInfo *loinfo)
 				dumpSecLabel(fout, "LARGE OBJECT", namebuf,
 							 NULL, loinfo->rolname,
 							 catId, 0, loinfo->dobj.dumpId);
+		}
+	}
+
+	/*
+	 * Dump the ACLs if any (remember that all blobs in the group will have
+	 * the same ACL).  If there's just one blob, dump a simple ACL entry; if
+	 * there's more, make a "LARGE OBJECTS" entry that really contains only
+	 * the ACL for the first blob.  _printTocEntry() will be cued by the tag
+	 * string to emit a mutated version for each blob.
+	 */
+	if (loinfo->dobj.dump & DUMP_COMPONENT_ACL)
+	{
+		char		namebuf[32];
+
+		/* Build identifying info for the first blob */
+		snprintf(namebuf, sizeof(namebuf), "%u", loinfo->looids[0]);
+
+		if (loinfo->numlos > 1)
+		{
+			char		tagbuf[64];
+
+			snprintf(tagbuf, sizeof(tagbuf), "LARGE OBJECTS %u..%u",
+					 loinfo->looids[0], loinfo->looids[loinfo->numlos - 1]);
 
-			if (loinfo->dobj.dump & DUMP_COMPONENT_ACL)
-				dumpACL(fout, loinfo->dobj.dumpId, InvalidDumpId,
-						"LARGE OBJECT", namebuf, NULL,
-						NULL, loinfo->rolname, &loinfo->dacl);
+			dumpACL(fout, loinfo->dobj.dumpId, InvalidDumpId,
+					"LARGE OBJECT", namebuf, NULL, NULL,
+					tagbuf, loinfo->rolname, &loinfo->dacl);
+		}
+		else
+		{
+			dumpACL(fout, loinfo->dobj.dumpId, InvalidDumpId,
+					"LARGE OBJECT", namebuf, NULL, NULL,
+					NULL, loinfo->rolname, &loinfo->dacl);
 		}
 	}
 
@@ -10787,7 +10814,7 @@ dumpNamespace(Archive *fout, const NamespaceInfo *nspinfo)
 	if (nspinfo->dobj.dump & DUMP_COMPONENT_ACL)
 		dumpACL(fout, nspinfo->dobj.dumpId, InvalidDumpId, "SCHEMA",
 				qnspname, NULL, NULL,
-				nspinfo->rolname, &nspinfo->dacl);
+				NULL, nspinfo->rolname, &nspinfo->dacl);
 
 	free(qnspname);
 
@@ -11084,7 +11111,7 @@ dumpEnumType(Archive *fout, const TypeInfo *tyinfo)
 		dumpACL(fout, tyinfo->dobj.dumpId, InvalidDumpId, "TYPE",
 				qtypname, NULL,
 				tyinfo->dobj.namespace->dobj.name,
-				tyinfo->rolname, &tyinfo->dacl);
+				NULL, tyinfo->rolname, &tyinfo->dacl);
 
 	PQclear(res);
 	destroyPQExpBuffer(q);
@@ -11237,7 +11264,7 @@ dumpRangeType(Archive *fout, const TypeInfo *tyinfo)
 		dumpACL(fout, tyinfo->dobj.dumpId, InvalidDumpId, "TYPE",
 				qtypname, NULL,
 				tyinfo->dobj.namespace->dobj.name,
-				tyinfo->rolname, &tyinfo->dacl);
+				NULL, tyinfo->rolname, &tyinfo->dacl);
 
 	PQclear(res);
 	destroyPQExpBuffer(q);
@@ -11308,7 +11335,7 @@ dumpUndefinedType(Archive *fout, const TypeInfo *tyinfo)
 		dumpACL(fout, tyinfo->dobj.dumpId, InvalidDumpId, "TYPE",
 				qtypname, NULL,
 				tyinfo->dobj.namespace->dobj.name,
-				tyinfo->rolname, &tyinfo->dacl);
+				NULL, tyinfo->rolname, &tyinfo->dacl);
 
 	destroyPQExpBuffer(q);
 	destroyPQExpBuffer(delq);
@@ -11555,7 +11582,7 @@ dumpBaseType(Archive *fout, const TypeInfo *tyinfo)
 		dumpACL(fout, tyinfo->dobj.dumpId, InvalidDumpId, "TYPE",
 				qtypname, NULL,
 				tyinfo->dobj.namespace->dobj.name,
-				tyinfo->rolname, &tyinfo->dacl);
+				NULL, tyinfo->rolname, &tyinfo->dacl);
 
 	PQclear(res);
 	destroyPQExpBuffer(q);
@@ -11710,7 +11737,7 @@ dumpDomain(Archive *fout, const TypeInfo *tyinfo)
 		dumpACL(fout, tyinfo->dobj.dumpId, InvalidDumpId, "TYPE",
 				qtypname, NULL,
 				tyinfo->dobj.namespace->dobj.name,
-				tyinfo->rolname, &tyinfo->dacl);
+				NULL, tyinfo->rolname, &tyinfo->dacl);
 
 	/* Dump any per-constraint comments */
 	for (i = 0; i < tyinfo->nDomChecks; i++)
@@ -11924,7 +11951,7 @@ dumpCompositeType(Archive *fout, const TypeInfo *tyinfo)
 		dumpACL(fout, tyinfo->dobj.dumpId, InvalidDumpId, "TYPE",
 				qtypname, NULL,
 				tyinfo->dobj.namespace->dobj.name,
-				tyinfo->rolname, &tyinfo->dacl);
+				NULL, tyinfo->rolname, &tyinfo->dacl);
 
 	/* Dump any per-column comments */
 	if (tyinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
@@ -12200,7 +12227,7 @@ dumpProcLang(Archive *fout, const ProcLangInfo *plang)
 	if (plang->lanpltrusted && plang->dobj.dump & DUMP_COMPONENT_ACL)
 		dumpACL(fout, plang->dobj.dumpId, InvalidDumpId, "LANGUAGE",
 				qlanname, NULL, NULL,
-				plang->lanowner, &plang->dacl);
+				NULL, plang->lanowner, &plang->dacl);
 
 	free(qlanname);
 
@@ -12664,7 +12691,7 @@ dumpFunc(Archive *fout, const FuncInfo *finfo)
 		dumpACL(fout, finfo->dobj.dumpId, InvalidDumpId, keyword,
 				funcsig, NULL,
 				finfo->dobj.namespace->dobj.name,
-				finfo->rolname, &finfo->dacl);
+				NULL, finfo->rolname, &finfo->dacl);
 
 	PQclear(res);
 
@@ -14524,7 +14551,7 @@ dumpAgg(Archive *fout, const AggInfo *agginfo)
 		dumpACL(fout, agginfo->aggfn.dobj.dumpId, InvalidDumpId,
 				"FUNCTION", aggsig, NULL,
 				agginfo->aggfn.dobj.namespace->dobj.name,
-				agginfo->aggfn.rolname, &agginfo->aggfn.dacl);
+				NULL, agginfo->aggfn.rolname, &agginfo->aggfn.dacl);
 
 	free(aggsig);
 	free(aggfullsig);
@@ -14921,7 +14948,7 @@ dumpForeignDataWrapper(Archive *fout, const FdwInfo *fdwinfo)
 	/* Handle the ACL */
 	if (fdwinfo->dobj.dump & DUMP_COMPONENT_ACL)
 		dumpACL(fout, fdwinfo->dobj.dumpId, InvalidDumpId,
-				"FOREIGN DATA WRAPPER", qfdwname, NULL,
+				"FOREIGN DATA WRAPPER", qfdwname, NULL, NULL,
 				NULL, fdwinfo->rolname, &fdwinfo->dacl);
 
 	free(qfdwname);
@@ -15008,7 +15035,7 @@ dumpForeignServer(Archive *fout, const ForeignServerInfo *srvinfo)
 	/* Handle the ACL */
 	if (srvinfo->dobj.dump & DUMP_COMPONENT_ACL)
 		dumpACL(fout, srvinfo->dobj.dumpId, InvalidDumpId,
-				"FOREIGN SERVER", qsrvname, NULL,
+				"FOREIGN SERVER", qsrvname, NULL, NULL,
 				NULL, srvinfo->rolname, &srvinfo->dacl);
 
 	/* Dump user mappings */
@@ -15208,6 +15235,8 @@ dumpDefaultACL(Archive *fout, const DefaultACLInfo *daclinfo)
  * 'subname' is the formatted name of the sub-object, if any.  Must be quoted.
  *		(Currently we assume that subname is only provided for table columns.)
  * 'nspname' is the namespace the object is in (NULL if none).
+ * 'tag' is the tag to use for the ACL TOC entry; typically, this is NULL
+ *		to use the default for the object type.
  * 'owner' is the owner, NULL if there is no owner (for languages).
  * 'dacl' is the DumpableAcl struct for the object.
  *
@@ -15218,7 +15247,7 @@ dumpDefaultACL(Archive *fout, const DefaultACLInfo *daclinfo)
 static DumpId
 dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId,
 		const char *type, const char *name, const char *subname,
-		const char *nspname, const char *owner,
+		const char *nspname, const char *tag, const char *owner,
 		const DumpableAcl *dacl)
 {
 	DumpId		aclDumpId = InvalidDumpId;
@@ -15290,14 +15319,16 @@ dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId,
 
 	if (sql->len > 0)
 	{
-		PQExpBuffer tag = createPQExpBuffer();
+		PQExpBuffer tagbuf = createPQExpBuffer();
 		DumpId		aclDeps[2];
 		int			nDeps = 0;
 
-		if (subname)
-			appendPQExpBuffer(tag, "COLUMN %s.%s", name, subname);
+		if (tag)
+			appendPQExpBufferStr(tagbuf, tag);
+		else if (subname)
+			appendPQExpBuffer(tagbuf, "COLUMN %s.%s", name, subname);
 		else
-			appendPQExpBuffer(tag, "%s %s", type, name);
+			appendPQExpBuffer(tagbuf, "%s %s", type, name);
 
 		aclDeps[nDeps++] = objDumpId;
 		if (altDumpId != InvalidDumpId)
@@ -15306,7 +15337,7 @@ dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId,
 		aclDumpId = createDumpId();
 
 		ArchiveEntry(fout, nilCatalogId, aclDumpId,
-					 ARCHIVE_OPTS(.tag = tag->data,
+					 ARCHIVE_OPTS(.tag = tagbuf->data,
 								  .namespace = nspname,
 								  .owner = owner,
 								  .description = "ACL",
@@ -15315,7 +15346,7 @@ dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId,
 								  .deps = aclDeps,
 								  .nDeps = nDeps));
 
-		destroyPQExpBuffer(tag);
+		destroyPQExpBuffer(tagbuf);
 	}
 
 	destroyPQExpBuffer(sql);
@@ -15697,8 +15728,8 @@ dumpTable(Archive *fout, const TableInfo *tbinfo)
 		tableAclDumpId =
 			dumpACL(fout, tbinfo->dobj.dumpId, InvalidDumpId,
 					objtype, namecopy, NULL,
-					tbinfo->dobj.namespace->dobj.name, tbinfo->rolname,
-					&tbinfo->dacl);
+					tbinfo->dobj.namespace->dobj.name,
+					NULL, tbinfo->rolname, &tbinfo->dacl);
 	}
 
 	/*
@@ -15791,8 +15822,8 @@ dumpTable(Archive *fout, const TableInfo *tbinfo)
 			 */
 			dumpACL(fout, tbinfo->dobj.dumpId, tableAclDumpId,
 					"TABLE", namecopy, attnamecopy,
-					tbinfo->dobj.namespace->dobj.name, tbinfo->rolname,
-					&coldacl);
+					tbinfo->dobj.namespace->dobj.name,
+					NULL, tbinfo->rolname, &coldacl);
 			free(attnamecopy);
 		}
 		PQclear(res);
-- 
2.39.3

