From ae1b7961d4c0d821d7a56f70b41c91fc530bbf91 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Fri, 1 Mar 2019 20:13:23 -0800
Subject: [PATCH v14 2/2] tableam: Add pg_dump support.

TODO:
- matviews and partitioned tables can also have a table AM, but so far only
  plain tables (RELKIND_RELATION) get their AM recorded in the dump

Author: Dmitry Dolgov, with some changes by me.
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/bin/pg_dump/pg_backup_archiver.c        | 57 ++++++++++++++++++++-
 src/bin/pg_dump/pg_backup_archiver.h        |  6 ++-
 src/bin/pg_dump/pg_dump.c                   | 14 ++++-
 src/bin/pg_dump/pg_dump.h                   |  1 +
 src/test/modules/test_pg_dump/t/001_base.pl | 39 ++++++++++++++
 5 files changed, 114 insertions(+), 3 deletions(-)

diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 0f1afeacf79..62bf1493aa2 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -85,6 +85,7 @@ static void _becomeUser(ArchiveHandle *AH, const char *user);
 static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
 static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
 static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
+static void _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam);
 static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
 static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
 static void processSearchPathEntry(ArchiveHandle *AH, TocEntry *te);
@@ -1090,6 +1091,7 @@ ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId,
 	newToc->tag = pg_strdup(opts->tag);
 	newToc->namespace = opts->namespace ? pg_strdup(opts->namespace) : NULL;
 	newToc->tablespace = opts->tablespace ? pg_strdup(opts->tablespace) : NULL;
+	newToc->tableam = opts->tableam ? pg_strdup(opts->tableam) : NULL;
 	newToc->owner = pg_strdup(opts->owner);
 	newToc->desc = pg_strdup(opts->description);
 	newToc->defn = pg_strdup(opts->createStmt);
@@ -2350,6 +2352,7 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
 	AH->currUser = NULL;		/* unknown */
 	AH->currSchema = NULL;		/* ditto */
 	AH->currTablespace = NULL;	/* ditto */
+	AH->currTableAm = NULL;	/* ditto */
 
 	AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
 
@@ -2576,6 +2579,7 @@ WriteToc(ArchiveHandle *AH)
 		WriteStr(AH, te->copyStmt);
 		WriteStr(AH, te->namespace);
 		WriteStr(AH, te->tablespace);
+		WriteStr(AH, te->tableam);
 		WriteStr(AH, te->owner);
 		WriteStr(AH, "false");
 
@@ -2678,6 +2682,9 @@ ReadToc(ArchiveHandle *AH)
 		if (AH->version >= K_VERS_1_10)
 			te->tablespace = ReadStr(AH);
 
+		if (AH->version >= K_VERS_1_14)
+			te->tableam = ReadStr(AH);
+
 		te->owner = ReadStr(AH);
 		if (AH->version < K_VERS_1_9 || strcmp(ReadStr(AH), "true") == 0)
 			write_msg(modulename,
@@ -3431,6 +3438,48 @@ _selectTablespace(ArchiveHandle *AH, const char *tablespace)
 	destroyPQExpBuffer(qry);
 }
 
+/*
+ * Issue SET default_table_access_method for a TOC entry's table AM, unless
+ * tableam is NULL or equals AH->currTableAm; remembers it in currTableAm.
+ */
+static void
+_selectTableAccessMethod(ArchiveHandle *AH, const char *tableam)
+{
+	PQExpBuffer cmd;
+	const char *want = tableam;
+	const char *have = AH->currTableAm;
+
+	if (!want)
+		return;
+
+	if (have && strcmp(want, have) == 0)
+		return;
+
+	cmd = createPQExpBuffer();
+	appendPQExpBuffer(cmd, "SET default_table_access_method = %s;", fmtId(want));
+
+	if (RestoringToDB(AH))
+	{
+		PGresult   *res = PQexec(AH->connection, cmd->data);
+
+		if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
+			warn_or_exit_horribly(AH, modulename,
+								  "could not set default_table_access_method: %s",
+								  PQerrorMessage(AH->connection));
+
+		PQclear(res);
+	}
+	else
+		ahprintf(AH, "%s\n\n", cmd->data);
+
+	destroyPQExpBuffer(cmd);
+
+	/* Free the previously remembered AM name so it is not leaked */
+	if (AH->currTableAm)
+		free(AH->currTableAm);
+	AH->currTableAm = pg_strdup(want);
+}
+
 /*
  * Extract an object description for a TOC entry, and append it to buf.
  *
@@ -3526,10 +3575,11 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
 {
 	RestoreOptions *ropt = AH->public.ropt;
 
-	/* Select owner, schema, and tablespace as necessary */
+	/* Select owner, schema, tablespace and default AM as necessary */
 	_becomeOwner(AH, te);
 	_selectOutputSchema(AH, te->namespace);
 	_selectTablespace(AH, te->tablespace);
+	_selectTableAccessMethod(AH, te->tableam);
 
 	/* Emit header comment for item */
 	if (!AH->noTocComments)
@@ -4006,6 +4056,9 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
 	if (AH->currTablespace)
 		free(AH->currTablespace);
 	AH->currTablespace = NULL;
+	if (AH->currTableAm)
+		free(AH->currTableAm);
+	AH->currTableAm = NULL;
 }
 
 /*
@@ -4891,6 +4944,8 @@ DeCloneArchive(ArchiveHandle *AH)
 		free(AH->currSchema);
 	if (AH->currTablespace)
 		free(AH->currTablespace);
+	if (AH->currTableAm)
+		free(AH->currTableAm);
 	if (AH->savedPassword)
 		free(AH->savedPassword);
 
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index ebf3d209ea1..2015b735ae3 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -94,6 +94,7 @@ typedef z_stream *z_streamp;
 													 * entries */
 #define K_VERS_1_13 MAKE_ARCHIVE_VERSION(1, 13, 0)	/* change search_path
 													 * behavior */
+#define K_VERS_1_14 MAKE_ARCHIVE_VERSION(1, 14, 0)	/* add tableam */
 
 /*
  * Current archive version number (the format we can output)
@@ -102,7 +103,7 @@ typedef z_stream *z_streamp;
  * https://postgr.es/m/20190227123217.GA27552@alvherre.pgsql
  */
 #define K_VERS_MAJOR 1
-#define K_VERS_MINOR 13
+#define K_VERS_MINOR 14
 #define K_VERS_REV 0
 #define K_VERS_SELF MAKE_ARCHIVE_VERSION(K_VERS_MAJOR, K_VERS_MINOR, K_VERS_REV);
 
@@ -352,6 +353,7 @@ struct _archiveHandle
 	char	   *currUser;		/* current username, or NULL if unknown */
 	char	   *currSchema;		/* current schema, or NULL */
 	char	   *currTablespace; /* current tablespace, or NULL */
+	char	   *currTableAm;	/* current table access method, or NULL */
 
 	void	   *lo_buf;
 	size_t		lo_buf_used;
@@ -378,6 +380,7 @@ struct _tocEntry
 	char	   *namespace;		/* null or empty string if not in a schema */
 	char	   *tablespace;		/* null if not in a tablespace; empty string
 								 * means use database default */
+	char	   *tableam;		/* table access method, only for TABLE tags */
 	char	   *owner;
 	char	   *desc;
 	char	   *defn;
@@ -416,6 +419,7 @@ typedef struct _archiveOpts
 	const char *tag;
 	const char *namespace;
 	const char *tablespace;
+	const char *tableam;
 	const char *owner;
 	const char *description;
 	teSection	section;
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 5d830383485..9e541b3b0ac 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -5856,6 +5856,7 @@ getTables(Archive *fout, int *numTables)
 	int			i_partkeydef;
 	int			i_ispartition;
 	int			i_partbound;
+	int			i_amname;
 
 	/*
 	 * Find all the tables and table-like objects.
@@ -5941,7 +5942,7 @@ getTables(Archive *fout, int *numTables)
 						  "tc.relfrozenxid AS tfrozenxid, "
 						  "tc.relminmxid AS tminmxid, "
 						  "c.relpersistence, c.relispopulated, "
-						  "c.relreplident, c.relpages, "
+						  "c.relreplident, c.relpages, am.amname, "
 						  "CASE WHEN c.reloftype <> 0 THEN c.reloftype::pg_catalog.regtype ELSE NULL END AS reloftype, "
 						  "d.refobjid AS owning_tab, "
 						  "d.refobjsubid AS owning_col, "
@@ -5972,6 +5973,7 @@ getTables(Archive *fout, int *numTables)
 						  "d.objsubid = 0 AND "
 						  "d.refclassid = c.tableoid AND d.deptype IN ('a', 'i')) "
 						  "LEFT JOIN pg_class tc ON (c.reltoastrelid = tc.oid) "
+						  "LEFT JOIN pg_am am ON (c.relam = am.oid) "
 						  "LEFT JOIN pg_init_privs pip ON "
 						  "(c.oid = pip.objoid "
 						  "AND pip.classoid = 'pg_class'::regclass "
@@ -6439,6 +6441,7 @@ getTables(Archive *fout, int *numTables)
 	i_partkeydef = PQfnumber(res, "partkeydef");
 	i_ispartition = PQfnumber(res, "ispartition");
 	i_partbound = PQfnumber(res, "partbound");
+	i_amname = PQfnumber(res, "amname");
 
 	if (dopt->lockWaitTimeout)
 	{
@@ -6508,6 +6511,10 @@ getTables(Archive *fout, int *numTables)
 		else
 			tblinfo[i].checkoption = pg_strdup(PQgetvalue(res, i, i_checkoption));
 		tblinfo[i].toast_reloptions = pg_strdup(PQgetvalue(res, i, i_toastreloptions));
+		if (PQgetisnull(res, i, i_amname))
+			tblinfo[i].amname = NULL;
+		else
+			tblinfo[i].amname = pg_strdup(PQgetvalue(res, i, i_amname));
 
 		/* other fields were zeroed above */
 
@@ -12632,6 +12639,9 @@ dumpAccessMethod(Archive *fout, AccessMethodInfo *aminfo)
 		case AMTYPE_INDEX:
 			appendPQExpBuffer(q, "TYPE INDEX ");
 			break;
+		case AMTYPE_TABLE:
+			appendPQExpBuffer(q, "TYPE TABLE ");
+			break;
 		default:
 			write_msg(NULL, "WARNING: invalid type \"%c\" of access method \"%s\"\n",
 					  aminfo->amtype, qamname);
@@ -16072,6 +16082,8 @@ dumpTableSchema(Archive *fout, TableInfo *tbinfo)
 								  .namespace = tbinfo->dobj.namespace->dobj.name,
 								  .tablespace = (tbinfo->relkind == RELKIND_VIEW) ?
 								  NULL : tbinfo->reltablespace,
+								  .tableam = (tbinfo->relkind == RELKIND_RELATION) ?
+								  tbinfo->amname : NULL,
 								  .owner = tbinfo->rolname,
 								  .description = reltypename,
 								  .section = tbinfo->postponed_def ?
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 21d2ab05b0e..2e1b90acd0a 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -324,6 +324,7 @@ typedef struct _tableInfo
 	char	   *partkeydef;		/* partition key definition */
 	char	   *partbound;		/* partition bound definition */
 	bool		needs_override; /* has GENERATED ALWAYS AS IDENTITY */
+	char	   *amname;			/* relation access method */
 
 	/*
 	 * Stuff computed only for dumpable tables.
diff --git a/src/test/modules/test_pg_dump/t/001_base.pl b/src/test/modules/test_pg_dump/t/001_base.pl
index fb4ecf8acac..33f88b12f31 100644
--- a/src/test/modules/test_pg_dump/t/001_base.pl
+++ b/src/test/modules/test_pg_dump/t/001_base.pl
@@ -322,6 +322,19 @@ my %tests = (
 		like => { binary_upgrade => 1, },
 	},
 
+	'CREATE ACCESS METHOD regress_test_table_am' => {
+		create_order => 11,
+		create_sql   => 'CREATE ACCESS METHOD regress_table_am TYPE TABLE HANDLER heap_tableam_handler;',
+		regexp => qr/^
+			\QCREATE ACCESS METHOD regress_table_am TYPE TABLE HANDLER heap_tableam_handler;\E
+			\n/xm,
+		like => {
+			%full_runs,
+			schema_only			=> 1,
+			section_pre_data	=> 1,
+		},
+	},
+
 	'COMMENT ON EXTENSION test_pg_dump' => {
 		regexp => qr/^
 			\QCOMMENT ON EXTENSION test_pg_dump \E
@@ -537,6 +550,32 @@ my %tests = (
 			schema_only      => 1,
 			section_pre_data => 1,
 		},
+	},
+
+	'SET regress_pg_dump_table_am' => {
+		create_order => 12,
+		regexp => qr/^
+			\QSET default_table_access_method = regress_table_am;\E
+			\n/xm,
+		like => {
+			%full_runs,
+			schema_only			=> 1,
+			section_pre_data	=> 1,
+		},
+	},
+
+	'CREATE TABLE regress_pg_dump_table_am' => {
+		create_order => 13,
+		create_sql =>
+		  'CREATE TABLE regress_pg_dump_table_am (col1 int not null, col2 int) USING regress_table_am;',
+		regexp => qr/^
+			\QCREATE TABLE public.regress_pg_dump_table_am (\E
+			\n\s+\Qcol1 integer NOT NULL,\E
+			\n\s+\Qcol2 integer\E
+			\n\);\n/xm,
+		like => {    # same runs as the SET test, which is emitted for this table
+			%full_runs, schema_only => 1, section_pre_data => 1,
+		},
 	},);
 
 #########################################
-- 
2.21.0.dirty

