Large object dumps vs older pg_restore

Started by Tom Lane · almost 16 years ago · 4 messages
#1 Tom Lane
tgl@sss.pgh.pa.us

I've been looking at the proposed patch for pg_dump with large objects,
and I'm afraid it breaks things even worse than they're broken in HEAD.

Currently, when pg_restore is processing a BLOBS item, it emits
lo_create() followed by lo_write() calls, and conditionally precedes
that by lo_unlink() if --clean was specified. It pretty much has to
drive all that off the one archive entry since there are no others
(excepting BLOB COMMENTS which is unhelpful since it appears later).
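
In script output, that sequence comes out roughly as follows for one blob
(OID 16384 is a made-up example, 131072 is INV_WRITE, and the data literal
is elided):

    SELECT CASE WHEN EXISTS(SELECT 1 FROM pg_catalog.pg_largeobject
           WHERE loid = '16384')
           THEN pg_catalog.lo_unlink('16384') END;   -- only with --clean
    SELECT pg_catalog.lo_open(pg_catalog.lo_create('16384'), 131072);
    SELECT pg_catalog.lowrite(0, '...');             -- repeated per chunk
    SELECT pg_catalog.lo_close(0);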

The patch wants to emit a separate BLOB item for each blob, upon which
we can hang ownership, ACL, and comment information in the same ways
that we deal with these things for other sorts of database objects.
That's good since it means that switches like --no-owner will work
properly. The trouble is that this means the responsibility to do
lo_unlink and lo_create has to be taken out from processing of
the BLOBS item.
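
Concretely, each such per-blob entry would carry commands roughly along
these lines (OID, owner, and grantee are hypothetical):

    SELECT pg_catalog.lo_create(16384);
    COMMENT ON LARGE OBJECT 16384 IS 'some description';
    ALTER LARGE OBJECT 16384 OWNER TO joe;
    GRANT SELECT ON LARGE OBJECT 16384 TO public;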

The way the patch does that is that it renames BLOBS to BLOB DATA,
and drives the do-we-create decision off the name of the archive
entry.  This breaks compatibility of the archive to prior pg_restore
versions.  An 8.4 pg_restore will have no idea that it doesn't know
how to process the archive, but nonetheless will emit wrong results
--- in particular, you get a lo_create() from the BLOB item and then
another from BLOB DATA, so the restore fails completely.  There are
other bad effects too because 8.4 doesn't really know quite what
BLOB DATA is, but it needs to because there are various places that
have to treat that specially.
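
That is, against such an archive an 8.4 pg_restore would end up emitting
something like (hypothetical OID again):

    SELECT pg_catalog.lo_create(16384);   -- from the BLOB item
    SELECT pg_catalog.lo_open(pg_catalog.lo_create('16384'), 131072);
    -- from BLOB DATA; the second lo_create fails since the OID exists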

Probably the only way we can make this design work is to bump the
archive version number so that older pg_restores will fail. (Whereupon
there is no need to rename the entry type BTW.) This is slightly
annoying but it's not like we've not done it multiple times before.

If we wanted to keep backwards compatibility, we'd have to leave
the lo_create responsibility with the BLOBS item, and have the
BLOB metadata items be things that just add ACLs/ownership/comments
without doing the actual create, and have to be processed after
BLOBS instead of before it. This is probably workable but it
doesn't seem to me that it's accomplishing the goal of making blobs
work like normal objects.
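
Under that alternative, a per-blob metadata item would carry only
something like (same hypothetical names as above):

    ALTER LARGE OBJECT 16384 OWNER TO joe;
    GRANT SELECT ON LARGE OBJECT 16384 TO public;
    COMMENT ON LARGE OBJECT 16384 IS 'some description';

with the lo_create() still being driven off the BLOBS item.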

So, any objections to bumping the version number?

regards, tom lane

#2 Andrew Dunstan
andrew@dunslane.net
In reply to: Tom Lane (#1)
Re: Large object dumps vs older pg_restore

Tom Lane wrote:

[snip]

> Probably the only way we can make this design work is to bump the
> archive version number so that older pg_restores will fail. (Whereupon
> there is no need to rename the entry type BTW.) This is slightly
> annoying but it's not like we've not done it multiple times before.
>
> If we wanted to keep backwards compatibility, we'd have to leave
> the lo_create responsibility with the BLOBS item, and have the
> BLOB metadata items be things that just add ACLs/ownership/comments
> without doing the actual create, and have to be processed after
> BLOBS instead of before it. This is probably workable but it
> doesn't seem to me that it's accomplishing the goal of making blobs
> work like normal objects.
>
> So, any objections to bumping the version number?

When I read the snipped part of this email, my immediate thought was "Why
aren't we bumping the archive version number?"

So +1 for this course of action.

cheers

andrew

#3 KaiGai Kohei
kaigai@ak.jp.nec.com
In reply to: Tom Lane (#1)
Re: Large object dumps vs older pg_restore

(2010/02/18 4:54), Tom Lane wrote:

> I've been looking at the proposed patch for pg_dump with large objects,
> and I'm afraid it breaks things even worse than they're broken in HEAD.
>
> Currently, when pg_restore is processing a BLOBS item, it emits
> lo_create() followed by lo_write() calls, and conditionally precedes
> that by lo_unlink() if --clean was specified. It pretty much has to
> drive all that off the one archive entry since there are no others
> (excepting BLOB COMMENTS which is unhelpful since it appears later).
>
> The patch wants to emit a separate BLOB item for each blob, upon which
> we can hang ownership, ACL, and comment information in the same ways
> that we deal with these things for other sorts of database objects.
> That's good since it means that switches like --no-owner will work
> properly. The trouble is that this means the responsibility to do
> lo_unlink and lo_create has to be taken out from processing of
> the BLOBS item.
>
> The way the patch does that is that it renames BLOBS to BLOB DATA,
> and drives the do-we-create decision off the name of the archive
> entry.  This breaks compatibility of the archive to prior pg_restore
> versions.  An 8.4 pg_restore will have no idea that it doesn't know
> how to process the archive, but nonetheless will emit wrong results
> --- in particular, you get a lo_create() from the BLOB item and then
> another from BLOB DATA, so the restore fails completely.  There are
> other bad effects too because 8.4 doesn't really know quite what
> BLOB DATA is, but it needs to because there are various places that
> have to treat that specially.
>
> Probably the only way we can make this design work is to bump the
> archive version number so that older pg_restores will fail. (Whereupon
> there is no need to rename the entry type BTW.) This is slightly
> annoying but it's not like we've not done it multiple times before.

IIRC, Itagaki-san also suggested aborting when we try to restore the
dumped database image into an older PostgreSQL, so the checks in
DropBlobIfExists() have been modified accordingly. This lets pg_restore
abort right at the start of the restore, which seems fair enough to me.

> If we wanted to keep backwards compatibility, we'd have to leave
> the lo_create responsibility with the BLOBS item, and have the
> BLOB metadata items be things that just add ACLs/ownership/comments
> without doing the actual create, and have to be processed after
> BLOBS instead of before it. This is probably workable but it
> doesn't seem to me that it's accomplishing the goal of making blobs
> work like normal objects.

That is the approach I tried to implement at first, but it is difficult
to handle the various pg_restore options correctly that way.

Thanks,
--
OSS Platform Development Division, NEC
KaiGai Kohei <kaigai@ak.jp.nec.com>

#4 KaiGai Kohei
kaigai@ak.jp.nec.com
In reply to: KaiGai Kohei (#3)
1 attachment(s)
Re: Large object dumps vs older pg_restore

The attached patch has been updated to apply to the latest tree, and it
bumps the version number of the archive file.

[kaigai@saba pg_dump]$ ./pg_dump -Ft postgres > ~/hoge.tar

[kaigai@saba pg_dump]$ /usr/bin/pg_restore --version
pg_restore (PostgreSQL) 8.4.2
[kaigai@saba pg_dump]$ /usr/bin/pg_restore -d test ~/hoge.tar
pg_restore: [archiver] unsupported version (1.12) in file header
--> unable to restore using older pg_restore

[kaigai@saba pg_dump]$ ./pg_restore --version
pg_restore (PostgreSQL) 9.0devel
[kaigai@saba pg_dump]$ ./pg_restore -d test ~/hoge.tar

[kaigai@saba pg_dump]$ psql test
psql (9.0devel)
Type "help" for help.

test=# \lo_list
                  Large objects
  ID   | Owner  |          Description
-------+--------+--------------------------------
 16384 | kaigai |
 16385 | ymj    |
 16386 | tak    | this is a small large object
 16387 | kaigai | this is a small large object 2
(4 rows)

test=# select oid, * from pg_largeobject_metadata;
  oid  | lomowner |             lomacl
-------+----------+---------------------------------
 16387 |       10 |
 16384 |       10 | {kaigai=rw/kaigai,ymj=r/kaigai}
 16385 |    16388 | {ymj=rw/ymj,tak=r/ymj}
 16386 |    16389 | {tak=rw/tak,ymj=r/tak}
(4 rows)

Thanks,

(2010/02/18 9:12), KaiGai Kohei wrote:

> (2010/02/18 4:54), Tom Lane wrote:
>
>> I've been looking at the proposed patch for pg_dump with large objects,
>> and I'm afraid it breaks things even worse than they're broken in HEAD.
>>
>> Currently, when pg_restore is processing a BLOBS item, it emits
>> lo_create() followed by lo_write() calls, and conditionally precedes
>> that by lo_unlink() if --clean was specified. It pretty much has to
>> drive all that off the one archive entry since there are no others
>> (excepting BLOB COMMENTS which is unhelpful since it appears later).
>>
>> The patch wants to emit a separate BLOB item for each blob, upon which
>> we can hang ownership, ACL, and comment information in the same ways
>> that we deal with these things for other sorts of database objects.
>> That's good since it means that switches like --no-owner will work
>> properly. The trouble is that this means the responsibility to do
>> lo_unlink and lo_create has to be taken out from processing of
>> the BLOBS item.
>>
>> The way the patch does that is that it renames BLOBS to BLOB DATA,
>> and drives the do-we-create decision off the name of the archive
>> entry.  This breaks compatibility of the archive to prior pg_restore
>> versions.  An 8.4 pg_restore will have no idea that it doesn't know
>> how to process the archive, but nonetheless will emit wrong results
>> --- in particular, you get a lo_create() from the BLOB item and then
>> another from BLOB DATA, so the restore fails completely.  There are
>> other bad effects too because 8.4 doesn't really know quite what
>> BLOB DATA is, but it needs to because there are various places that
>> have to treat that specially.
>>
>> Probably the only way we can make this design work is to bump the
>> archive version number so that older pg_restores will fail. (Whereupon
>> there is no need to rename the entry type BTW.) This is slightly
>> annoying but it's not like we've not done it multiple times before.
>
> IIRC, Itagaki-san also suggested aborting when we try to restore the
> dumped database image into an older PostgreSQL, so the checks in
> DropBlobIfExists() have been modified accordingly. This lets pg_restore
> abort right at the start of the restore, which seems fair enough to me.
>
>> If we wanted to keep backwards compatibility, we'd have to leave
>> the lo_create responsibility with the BLOBS item, and have the
>> BLOB metadata items be things that just add ACLs/ownership/comments
>> without doing the actual create, and have to be processed after
>> BLOBS instead of before it. This is probably workable but it
>> doesn't seem to me that it's accomplishing the goal of making blobs
>> work like normal objects.
>
> That is the approach I tried to implement at first, but it is difficult
> to handle the various pg_restore options correctly that way.
>
> Thanks,

--
OSS Platform Development Division, NEC
KaiGai Kohei <kaigai@ak.jp.nec.com>

Attachments:

pgsql-fix-pg_dump-blob-privs.8.patch (application/octet-stream)
 src/bin/pg_dump/pg_backup_archiver.c |   48 ++++--
 src/bin/pg_dump/pg_backup_archiver.h |   10 +-
 src/bin/pg_dump/pg_backup_custom.c   |   11 +-
 src/bin/pg_dump/pg_backup_db.c       |   20 +--
 src/bin/pg_dump/pg_backup_files.c    |   11 +-
 src/bin/pg_dump/pg_backup_null.c     |   17 ++-
 src/bin/pg_dump/pg_backup_tar.c      |   11 +-
 src/bin/pg_dump/pg_dump.c            |  321 +++++++++++++++-------------------
 src/bin/pg_dump/pg_dump.h            |   12 +-
 src/bin/pg_dump/pg_dump_sort.c       |   12 +-
 10 files changed, 240 insertions(+), 233 deletions(-)

diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 24c0fd4..22fef19 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -329,7 +329,7 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt)
 			AH->currentTE = te;
 
 			reqs = _tocEntryRequired(te, ropt, false /* needn't drop ACLs */ );
-			if (((reqs & REQ_SCHEMA) != 0) && te->dropStmt)
+			if (((reqs & (REQ_SCHEMA|REQ_DATA)) != 0) && te->dropStmt)
 			{
 				/* We want the schema */
 				ahlog(AH, 1, "dropping %s %s\n", te->desc, te->tag);
@@ -381,7 +381,8 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt)
 		/* Work out what, if anything, we want from this entry */
 		reqs = _tocEntryRequired(te, ropt, true);
 
-		if ((reqs & REQ_SCHEMA) != 0)	/* We want the schema */
+		/* Access privileges for both of schema and data */
+		if ((reqs & (REQ_SCHEMA|REQ_DATA)) != 0)
 		{
 			ahlog(AH, 1, "setting owner and privileges for %s %s\n",
 				  te->desc, te->tag);
@@ -520,6 +521,7 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
 				_printTocEntry(AH, te, ropt, true, false);
 
 				if (strcmp(te->desc, "BLOBS") == 0 ||
+					strcmp(te->desc, "BLOB DATA") == 0 ||
 					strcmp(te->desc, "BLOB COMMENTS") == 0)
 				{
 					ahlog(AH, 1, "restoring %s\n", te->desc);
@@ -903,7 +905,7 @@ EndRestoreBlobs(ArchiveHandle *AH)
  * Called by a format handler to initiate restoration of a blob
  */
 void
-StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
+StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop, bool compat)
 {
 	Oid			loOid;
 
@@ -919,19 +921,24 @@ StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
 
 	if (AH->connection)
 	{
-		loOid = lo_create(AH->connection, oid);
-		if (loOid == 0 || loOid != oid)
-			die_horribly(AH, modulename, "could not create large object %u\n",
-						 oid);
-
+		if (compat)
+		{
+			loOid = lo_create(AH->connection, oid);
+			if (loOid == 0 || loOid != oid)
+				die_horribly(AH, modulename, "could not create large object %u\n",
+							 oid);
+		}
 		AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
 		if (AH->loFd == -1)
 			die_horribly(AH, modulename, "could not open large object\n");
 	}
 	else
 	{
-		ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
-				 oid, INV_WRITE);
+		if (compat)
+			ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n", oid, INV_WRITE);
+		else
+			ahprintf(AH, "SELECT pg_catalog.lo_open(%u, %d);\n",
+					 oid, INV_WRITE);
 	}
 
 	AH->writingBlob = 1;
@@ -1940,7 +1947,8 @@ WriteDataChunks(ArchiveHandle *AH)
 			AH->currToc = te;
 			/* printf("Writing data for %d (%x)\n", te->id, te); */
 
-			if (strcmp(te->desc, "BLOBS") == 0)
+			if (strcmp(te->desc, "BLOBS") == 0 ||
+				strcmp(te->desc, "BLOB DATA") == 0)
 			{
 				startPtr = AH->StartBlobsPtr;
 				endPtr = AH->EndBlobsPtr;
@@ -2077,6 +2085,7 @@ ReadToc(ArchiveHandle *AH)
 				te->section = SECTION_NONE;
 			else if (strcmp(te->desc, "TABLE DATA") == 0 ||
 					 strcmp(te->desc, "BLOBS") == 0 ||
+					 strcmp(te->desc, "BLOB DATA") == 0 ||
 					 strcmp(te->desc, "BLOB COMMENTS") == 0)
 				te->section = SECTION_DATA;
 			else if (strcmp(te->desc, "CONSTRAINT") == 0 ||
@@ -2286,9 +2295,14 @@ _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls)
 	if (!te->hadDumper)
 	{
 		/*
-		 * Special Case: If 'SEQUENCE SET' then it is considered a data entry
+		 * Special Case: If 'SEQUENCE SET', 'BLOB ITEM' or 'ACL' for large
+		 * objects, then it is considered a data entry
+		 *
+		 * XXX - we assume te->tag is not numeric except for large objects.
 		 */
-		if (strcmp(te->desc, "SEQUENCE SET") == 0)
+		if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
+			strcmp(te->desc, "BLOB ITEM") == 0 ||
+			(strcmp(te->desc, "ACL") == 0 && atooid(te->tag) > 0))
 			res = res & REQ_DATA;
 		else
 			res = res & ~REQ_DATA;
@@ -2713,6 +2727,13 @@ _getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH)
 		return;
 	}
 
+	/* Use ALTER LARGE OBJECT for BLOB ITEM */
+	if (strcmp(type, "BLOB ITEM") == 0)
+	{
+		appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
+		return;
+	}
+
 	write_msg(modulename, "WARNING: don't know how to set owner for object type %s\n",
 			  type);
 }
@@ -2824,6 +2845,7 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isDat
 		strlen(te->owner) > 0 && strlen(te->dropStmt) > 0)
 	{
 		if (strcmp(te->desc, "AGGREGATE") == 0 ||
+			strcmp(te->desc, "BLOB ITEM") == 0 ||
 			strcmp(te->desc, "CONVERSION") == 0 ||
 			strcmp(te->desc, "DATABASE") == 0 ||
 			strcmp(te->desc, "DOMAIN") == 0 ||
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index c09cec5..ae3589a 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -62,7 +62,7 @@ typedef z_stream *z_streamp;
 #endif
 
 #define K_VERS_MAJOR 1
-#define K_VERS_MINOR 11
+#define K_VERS_MINOR 12
 #define K_VERS_REV 0
 
 /* Data block types */
@@ -87,8 +87,10 @@ typedef z_stream *z_streamp;
 #define K_VERS_1_10 (( (1 * 256 + 10) * 256 + 0) * 256 + 0)		/* add tablespace */
 #define K_VERS_1_11 (( (1 * 256 + 11) * 256 + 0) * 256 + 0)		/* add toc section
 																 * indicator */
-
-#define K_VERS_MAX (( (1 * 256 + 11) * 256 + 255) * 256 + 0)
+#define K_VERS_1_12 (( (1 * 256 + 12) * 256 + 0) * 256 + 0)		/* BLOB ITEM &
+																 * BLOB DATA
+																 * sections */
+#define K_VERS_MAX (( (1 * 256 + 12) * 256 + 255) * 256 + 0)
 
 
 /* Flags to indicate disposition of offsets stored in files */
@@ -359,7 +361,7 @@ int			ReadOffset(ArchiveHandle *, pgoff_t *);
 size_t		WriteOffset(ArchiveHandle *, pgoff_t, int);
 
 extern void StartRestoreBlobs(ArchiveHandle *AH);
-extern void StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop);
+extern void StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop, bool compat);
 extern void EndRestoreBlob(ArchiveHandle *AH, Oid oid);
 extern void EndRestoreBlobs(ArchiveHandle *AH);
 
diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c
index ea16c0b..fc815cf 100644
--- a/src/bin/pg_dump/pg_backup_custom.c
+++ b/src/bin/pg_dump/pg_backup_custom.c
@@ -54,7 +54,7 @@ static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
 static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
 static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
 static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
-static void _LoadBlobs(ArchiveHandle *AH, bool drop);
+static void _LoadBlobs(ArchiveHandle *AH, bool drop, bool compat);
 static void _Clone(ArchiveHandle *AH);
 static void _DeClone(ArchiveHandle *AH);
 
@@ -498,7 +498,10 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
 			break;
 
 		case BLK_BLOBS:
-			_LoadBlobs(AH, ropt->dropSchema);
+			if (strcmp(te->desc, "BLOBS") == 0)
+				_LoadBlobs(AH, ropt->dropSchema, true);
+			else
+				_LoadBlobs(AH, false, false);
 			break;
 
 		default:				/* Always have a default */
@@ -619,7 +622,7 @@ _PrintData(ArchiveHandle *AH)
 }
 
 static void
-_LoadBlobs(ArchiveHandle *AH, bool drop)
+_LoadBlobs(ArchiveHandle *AH, bool drop, bool compat)
 {
 	Oid			oid;
 
@@ -628,7 +631,7 @@ _LoadBlobs(ArchiveHandle *AH, bool drop)
 	oid = ReadInt(AH);
 	while (oid != 0)
 	{
-		StartRestoreBlob(AH, oid, drop);
+		StartRestoreBlob(AH, oid, drop, compat);
 		_PrintData(AH);
 		EndRestoreBlob(AH, oid);
 		oid = ReadInt(AH);
diff --git a/src/bin/pg_dump/pg_backup_db.c b/src/bin/pg_dump/pg_backup_db.c
index e557290..23384bd 100644
--- a/src/bin/pg_dump/pg_backup_db.c
+++ b/src/bin/pg_dump/pg_backup_db.c
@@ -12,6 +12,7 @@
 
 #include "pg_backup_db.h"
 #include "dumputils.h"
+#include "libpq/libpq-fs.h"
 
 #include <unistd.h>
 
@@ -704,17 +705,14 @@ void
 DropBlobIfExists(ArchiveHandle *AH, Oid oid)
 {
 	/* Call lo_unlink only if exists to avoid not-found error. */
-	if (PQserverVersion(AH->connection) >= 90000)
-	{
-		ahprintf(AH, "SELECT pg_catalog.lo_unlink(oid) "
-					 "FROM pg_catalog.pg_largeobject_metadata "
-					 "WHERE oid = %u;\n", oid);
-	}
-	else
-	{
-		ahprintf(AH, "SELECT CASE WHEN EXISTS(SELECT 1 FROM pg_catalog.pg_largeobject WHERE loid = '%u') THEN pg_catalog.lo_unlink('%u') END;\n",
-				 oid, oid);
-	}
+	if (AH->connection &&
+		PQserverVersion(AH->connection) < 90000)
+		die_horribly(AH, NULL,
+					 "could not restore large object into older server");
+
+	ahprintf(AH, "SELECT pg_catalog.lo_unlink(oid) "
+			 "FROM pg_catalog.pg_largeobject_metadata "
+			 "WHERE oid = %u;\n", oid);
 }
 
 static bool
diff --git a/src/bin/pg_dump/pg_backup_files.c b/src/bin/pg_dump/pg_backup_files.c
index 1faac0a..6406fcb 100644
--- a/src/bin/pg_dump/pg_backup_files.c
+++ b/src/bin/pg_dump/pg_backup_files.c
@@ -66,7 +66,7 @@ typedef struct
 } lclTocEntry;
 
 static const char *modulename = gettext_noop("file archiver");
-static void _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt);
+static void _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt, bool compat);
 static void _getBlobTocEntry(ArchiveHandle *AH, Oid *oid, char *fname);
 
 /*
@@ -330,7 +330,9 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
 		return;
 
 	if (strcmp(te->desc, "BLOBS") == 0)
-		_LoadBlobs(AH, ropt);
+		_LoadBlobs(AH, ropt, true);
+	else if (strcmp(te->desc, "BLOB DATA") == 0)
+		_LoadBlobs(AH, ropt, false);
 	else
 		_PrintFileData(AH, tctx->filename, ropt);
 }
@@ -365,10 +367,11 @@ _getBlobTocEntry(ArchiveHandle *AH, Oid *oid, char fname[K_STD_BUF_SIZE])
 }
 
 static void
-_LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt)
+_LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt, bool compat)
 {
 	Oid			oid;
 	lclContext *ctx = (lclContext *) AH->formatData;
+	char		drop = (compat ? ropt->dropSchema : false);
 	char		fname[K_STD_BUF_SIZE];
 
 	StartRestoreBlobs(AH);
@@ -382,7 +385,7 @@ _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt)
 
 	while (oid != 0)
 	{
-		StartRestoreBlob(AH, oid, ropt->dropSchema);
+		StartRestoreBlob(AH, oid, drop, compat);
 		_PrintFileData(AH, fname, ropt);
 		EndRestoreBlob(AH, oid);
 		_getBlobTocEntry(AH, &oid, fname);
diff --git a/src/bin/pg_dump/pg_backup_null.c b/src/bin/pg_dump/pg_backup_null.c
index 4217210..0c1b693 100644
--- a/src/bin/pg_dump/pg_backup_null.c
+++ b/src/bin/pg_dump/pg_backup_null.c
@@ -147,14 +147,19 @@ _StartBlobs(ArchiveHandle *AH, TocEntry *te)
 static void
 _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
 {
+	bool	compat = (strcmp(te->desc, "BLOBS") == 0 ? true : false);
+
 	if (oid == 0)
 		die_horribly(AH, NULL, "invalid OID for large object\n");
 
-	if (AH->ropt->dropSchema)
+	if (compat && AH->ropt->dropSchema)
 		DropBlobIfExists(AH, oid);
 
-	ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
-			 oid, INV_WRITE);
+	if (compat)
+		ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
+				 oid, INV_WRITE);
+	else
+		ahprintf(AH, "SELECT pg_catalog.lo_open(%u, %d);\n", oid, INV_WRITE);
 
 	AH->WriteDataPtr = _WriteBlobData;
 }
@@ -195,12 +200,14 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
 	{
 		AH->currToc = te;
 
-		if (strcmp(te->desc, "BLOBS") == 0)
+		if (strcmp(te->desc, "BLOBS") == 0 ||
+			strcmp(te->desc, "BLOB DATA") == 0)
 			_StartBlobs(AH, te);
 
 		(*te->dataDumper) ((Archive *) AH, te->dataDumperArg);
 
-		if (strcmp(te->desc, "BLOBS") == 0)
+		if (strcmp(te->desc, "BLOBS") == 0 ||
+			strcmp(te->desc, "BLOB DATA") == 0)
 			_EndBlobs(AH, te);
 
 		AH->currToc = NULL;
diff --git a/src/bin/pg_dump/pg_backup_tar.c b/src/bin/pg_dump/pg_backup_tar.c
index 5cbc365..8cda3e1 100644
--- a/src/bin/pg_dump/pg_backup_tar.c
+++ b/src/bin/pg_dump/pg_backup_tar.c
@@ -100,7 +100,7 @@ typedef struct
 
 static const char *modulename = gettext_noop("tar archiver");
 
-static void _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt);
+static void _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt, bool compat);
 
 static TAR_MEMBER *tarOpen(ArchiveHandle *AH, const char *filename, char mode);
 static void tarClose(ArchiveHandle *AH, TAR_MEMBER *TH);
@@ -696,19 +696,22 @@ _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
 	}
 
 	if (strcmp(te->desc, "BLOBS") == 0)
-		_LoadBlobs(AH, ropt);
+		_LoadBlobs(AH, ropt, true);
+	else if (strcmp(te->desc, "BLOB DATA") == 0)
+		_LoadBlobs(AH, ropt, false);
 	else
 		_PrintFileData(AH, tctx->filename, ropt);
 }
 
 static void
-_LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt)
+_LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt, bool compat)
 {
 	Oid			oid;
 	lclContext *ctx = (lclContext *) AH->formatData;
 	TAR_MEMBER *th;
 	size_t		cnt;
 	bool		foundBlob = false;
+	bool		drop = (compat ? ropt->dropSchema : false);
 	char		buf[4096];
 
 	StartRestoreBlobs(AH);
@@ -725,7 +728,7 @@ _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt)
 			{
 				ahlog(AH, 1, "restoring large object OID %u\n", oid);
 
-				StartRestoreBlob(AH, oid, ropt->dropSchema);
+				StartRestoreBlob(AH, oid, drop, compat);
 
 				while ((cnt = tarRead(buf, 4095, th)) > 0)
 				{
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 428b962..ea95d7d 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -190,9 +190,9 @@ static void selectSourceSchema(const char *schemaName);
 static char *getFormattedTypeName(Oid oid, OidOptions opts);
 static char *myFormatType(const char *typname, int32 typmod);
 static const char *fmtQualifiedId(const char *schema, const char *id);
-static bool hasBlobs(Archive *AH);
-static int	dumpBlobs(Archive *AH, void *arg);
-static int	dumpBlobComments(Archive *AH, void *arg);
+static void getBlobs(Archive *AH);
+static void dumpBlobItem(Archive *AH, BlobInfo *binfo);
+static int  dumpBlobData(Archive *AH, void *arg);
 static void dumpDatabase(Archive *AH);
 static void dumpEncoding(Archive *AH);
 static void dumpStdStrings(Archive *AH);
@@ -701,25 +701,8 @@ main(int argc, char **argv)
 			getTableDataFKConstraints();
 	}
 
-	if (outputBlobs && hasBlobs(g_fout))
-	{
-		/* Add placeholders to allow correct sorting of blobs */
-		DumpableObject *blobobj;
-		DumpableObject *blobcobj;
-
-		blobobj = (DumpableObject *) malloc(sizeof(DumpableObject));
-		blobobj->objType = DO_BLOBS;
-		blobobj->catId = nilCatalogId;
-		AssignDumpId(blobobj);
-		blobobj->name = strdup("BLOBS");
-
-		blobcobj = (DumpableObject *) malloc(sizeof(DumpableObject));
-		blobcobj->objType = DO_BLOB_COMMENTS;
-		blobcobj->catId = nilCatalogId;
-		AssignDumpId(blobcobj);
-		blobcobj->name = strdup("BLOB COMMENTS");
-		addObjectDependency(blobcobj, blobobj->dumpId);
-	}
+	if (outputBlobs)
+		getBlobs(g_fout);
 
 	/*
 	 * Collect dependency data to assist in ordering the objects.
@@ -1938,43 +1921,149 @@ dumpStdStrings(Archive *AH)
 
 
 /*
- * hasBlobs:
+ * getBlobs:
  *	Test whether database contains any large objects
  */
-static bool
-hasBlobs(Archive *AH)
+static void
+getBlobs(Archive *AH)
 {
-	bool		result;
-	const char *blobQry;
-	PGresult   *res;
+	PQExpBuffer		blobQry = createPQExpBuffer();
+	BlobInfo	   *binfo;
+	DumpableObject *bdata;
+	PGresult	   *res;
+	int				i;
+
+	/* Verbose message */
+	if (g_verbose)
+		write_msg(NULL, "reading binary large objects\n");
 
 	/* Make sure we are in proper schema */
 	selectSourceSchema("pg_catalog");
 
 	/* Check for BLOB OIDs */
 	if (AH->remoteVersion >= 90000)
-		blobQry = "SELECT oid FROM pg_largeobject_metadata LIMIT 1";
+		appendPQExpBuffer(blobQry,
+						  "SELECT oid, (%s lomowner), lomacl,"
+						  " obj_description(oid, 'pg_largeobject')"
+						  " FROM pg_largeobject_metadata",
+						  username_subquery);
+	else if (AH->remoteVersion >= 70200)
+		appendPQExpBuffer(blobQry,
+						  "SELECT DISTINCT loid, NULL, NULL,"
+						  " obj_description(loid, 'pg_largeobject')"
+						  " FROM pg_largeobject");
 	else if (AH->remoteVersion >= 70100)
-		blobQry = "SELECT loid FROM pg_largeobject LIMIT 1";
+		appendPQExpBuffer(blobQry,
+						  "SELECT DISTINCT loid, NULL, NULL,"
+						  " obj_description(loid)"
+						  " FROM pg_largeobject");
 	else
-		blobQry = "SELECT oid FROM pg_class WHERE relkind = 'l' LIMIT 1";
+		appendPQExpBuffer(blobQry,
+						  "SELECT DISTINCT oid, NULL, NULL,"
+						  " obj_description(oid)"
+						  " FROM pg_class WHERE relkind = 'l'");
 
-	res = PQexec(g_conn, blobQry);
-	check_sql_result(res, g_conn, blobQry, PGRES_TUPLES_OK);
+	res = PQexec(g_conn, blobQry->data);
+	check_sql_result(res, g_conn, blobQry->data, PGRES_TUPLES_OK);
 
-	result = PQntuples(res) > 0;
+	/*
+	 * Now, a large object has its own "BLOB ITEM" section to
+	 * declare itself.
+	 */
+	for (i = 0; i < PQntuples(res); i++)
+	{
+		binfo = (BlobInfo *) malloc(sizeof(BlobInfo));
+		binfo->dobj.objType = DO_BLOB_ITEM;
+		binfo->dobj.catId = nilCatalogId;
+		AssignDumpId(&binfo->dobj);
+
+		binfo->dobj.name = strdup(PQgetvalue(res, i, 0));
+		binfo->rolname = strdup(PQgetvalue(res, i, 1));
+		binfo->blobacl = strdup(PQgetvalue(res, i, 2));
+		binfo->blobdescr = strdup(PQgetvalue(res, i, 3));
+	}
+
+	/*
+	 * If we have a large object at least, "BLOB DATA" section
+	 * is also necessary.
+	 */
+	if (PQntuples(res) > 0)
+	{
+		bdata = (DumpableObject *) malloc(sizeof(DumpableObject));
+		bdata->objType = DO_BLOB_DATA;
+		bdata->catId = nilCatalogId;
+		AssignDumpId(bdata);
+		bdata->name = strdup("BLOBS");
+	}
 
 	PQclear(res);
+}
 
-	return result;
+/*
+ * dumpBlobItem
+ *
+ * dump a definition of the given large object
+ */
+static void
+dumpBlobItem(Archive *AH, BlobInfo *binfo)
+{
+	PQExpBuffer		aquery = createPQExpBuffer();
+	PQExpBuffer		bquery = createPQExpBuffer();
+	PQExpBuffer		dquery = createPQExpBuffer();
+
+	/*
+	 * Cleanup a large object
+	 */
+	appendPQExpBuffer(dquery,
+					  "SELECT pg_catalog.lo_unlink(oid) "
+					  "FROM pg_catalog.pg_largeobject_metadata "
+					  "WHERE oid = %s;\n", binfo->dobj.name);
+	/*
+	 * Create an empty large object
+	 */
+	appendPQExpBuffer(bquery,
+					  "SELECT pg_catalog.lo_create(%s);\n",
+					  binfo->dobj.name);
+	/*
+	 * Create a comment on large object, if necessary
+	 */
+	if (strlen(binfo->blobdescr) > 0)
+	{
+		appendPQExpBuffer(bquery, "\nCOMMENT ON LARGE OBJECT %s IS ",
+						  binfo->dobj.name);
+		appendStringLiteralAH(bquery, binfo->blobdescr, AH);
+		appendPQExpBuffer(bquery, ";\n");
+	}
+
+	ArchiveEntry(AH, binfo->dobj.catId, binfo->dobj.dumpId,
+				 binfo->dobj.name,
+				 NULL, NULL,
+				 binfo->rolname, false,
+				 "BLOB ITEM", SECTION_DATA,
+				 bquery->data, dquery->data, NULL,
+				 binfo->dobj.dependencies, binfo->dobj.nDeps,
+				 NULL, NULL);
+
+	/*
+	 * Dump access privileges, if necessary
+	 */
+	dumpACL(AH, binfo->dobj.catId, binfo->dobj.dumpId,
+			"LARGE OBJECT",
+			binfo->dobj.name, NULL,
+			binfo->dobj.name, NULL,
+			binfo->rolname, binfo->blobacl);
+
+	destroyPQExpBuffer(aquery);
+	destroyPQExpBuffer(bquery);
+	destroyPQExpBuffer(dquery);
 }
 
 /*
- * dumpBlobs:
- *	dump all blobs
+ * dumpBlobData:
+ *	dump all the data contents of large object
  */
 static int
-dumpBlobs(Archive *AH, void *arg)
+dumpBlobData(Archive *AH, void *arg)
 {
 	const char *blobQry;
 	const char *blobFetchQry;
@@ -2022,7 +2111,7 @@ dumpBlobs(Archive *AH, void *arg)
 			loFd = lo_open(g_conn, blobOid, INV_READ);
 			if (loFd == -1)
 			{
-				write_msg(NULL, "dumpBlobs(): could not open large object: %s",
+				write_msg(NULL, "dumpBlobData(): could not open large object: %s",
 						  PQerrorMessage(g_conn));
 				exit_nicely();
 			}
@@ -2035,7 +2124,7 @@ dumpBlobs(Archive *AH, void *arg)
 				cnt = lo_read(g_conn, loFd, buf, LOBBUFSIZE);
 				if (cnt < 0)
 				{
-					write_msg(NULL, "dumpBlobs(): error reading large object: %s",
+					write_msg(NULL, "dumpBlobData(): error reading large object: %s",
 							  PQerrorMessage(g_conn));
 					exit_nicely();
 				}
@@ -2054,134 +2143,6 @@ dumpBlobs(Archive *AH, void *arg)
 	return 1;
 }
 
-/*
- * dumpBlobComments
- *	dump all blob properties.
- *  It has "BLOB COMMENTS" tag due to the historical reason, but note
- *  that it is the routine to dump all the properties of blobs.
- *
- * Since we don't provide any way to be selective about dumping blobs,
- * there's no need to be selective about their comments either.  We put
- * all the comments into one big TOC entry.
- */
-static int
-dumpBlobComments(Archive *AH, void *arg)
-{
-	const char *blobQry;
-	const char *blobFetchQry;
-	PQExpBuffer cmdQry = createPQExpBuffer();
-	PGresult   *res;
-	int			i;
-
-	if (g_verbose)
-		write_msg(NULL, "saving large object properties\n");
-
-	/* Make sure we are in proper schema */
-	selectSourceSchema("pg_catalog");
-
-	/* Cursor to get all BLOB comments */
-	if (AH->remoteVersion >= 90000)
-		blobQry = "DECLARE blobcmt CURSOR FOR SELECT oid, "
-			"obj_description(oid, 'pg_largeobject'), "
-			"pg_get_userbyid(lomowner), lomacl "
-			"FROM pg_largeobject_metadata";
-	else if (AH->remoteVersion >= 70300)
-		blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, "
-			"obj_description(loid, 'pg_largeobject'), NULL, NULL "
-			"FROM (SELECT DISTINCT loid FROM "
-			"pg_description d JOIN pg_largeobject l ON (objoid = loid) "
-			"WHERE classoid = 'pg_largeobject'::regclass) ss";
-	else if (AH->remoteVersion >= 70200)
-		blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, "
-			"obj_description(loid, 'pg_largeobject'), NULL, NULL "
-			"FROM (SELECT DISTINCT loid FROM pg_largeobject) ss";
-	else if (AH->remoteVersion >= 70100)
-		blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, "
-			"obj_description(loid), NULL, NULL "
-			"FROM (SELECT DISTINCT loid FROM pg_largeobject) ss";
-	else
-		blobQry = "DECLARE blobcmt CURSOR FOR SELECT oid, "
-			"	( "
-			"		SELECT description "
-			"		FROM pg_description pd "
-			"		WHERE pd.objoid=pc.oid "
-			"	), NULL, NULL "
-			"FROM pg_class pc WHERE relkind = 'l'";
-
-	res = PQexec(g_conn, blobQry);
-	check_sql_result(res, g_conn, blobQry, PGRES_COMMAND_OK);
-
-	/* Command to fetch from cursor */
-	blobFetchQry = "FETCH 100 IN blobcmt";
-
-	do
-	{
-		PQclear(res);
-
-		/* Do a fetch */
-		res = PQexec(g_conn, blobFetchQry);
-		check_sql_result(res, g_conn, blobFetchQry, PGRES_TUPLES_OK);
-
-		/* Process the tuples, if any */
-		for (i = 0; i < PQntuples(res); i++)
-		{
-			Oid			blobOid = atooid(PQgetvalue(res, i, 0));
-			char	   *lo_comment = PQgetvalue(res, i, 1);
-			char	   *lo_owner = PQgetvalue(res, i, 2);
-			char	   *lo_acl = PQgetvalue(res, i, 3);
-			char		lo_name[32];
-
-			resetPQExpBuffer(cmdQry);
-
-			/* comment on the blob */
-			if (!PQgetisnull(res, i, 1))
-			{
-				appendPQExpBuffer(cmdQry,
-								  "COMMENT ON LARGE OBJECT %u IS ", blobOid);
-				appendStringLiteralAH(cmdQry, lo_comment, AH);
-				appendPQExpBuffer(cmdQry, ";\n");
-			}
-
-			/* dump blob ownership, if necessary */
-			if (!PQgetisnull(res, i, 2))
-			{
-				appendPQExpBuffer(cmdQry,
-								  "ALTER LARGE OBJECT %u OWNER TO %s;\n",
-								  blobOid, lo_owner);
-			}
-
-			/* dump blob privileges, if necessary */
-			if (!PQgetisnull(res, i, 3) &&
-				!dataOnly && !aclsSkip)
-			{
-				snprintf(lo_name, sizeof(lo_name), "%u", blobOid);
-				if (!buildACLCommands(lo_name, NULL, "LARGE OBJECT",
-									  lo_acl, lo_owner, "",
-									  AH->remoteVersion, cmdQry))
-				{
-					write_msg(NULL, "could not parse ACL (%s) for "
-							  "large object %u", lo_acl, blobOid);
-					exit_nicely();
-				}
-			}
-
-			if (cmdQry->len > 0)
-			{
-				appendPQExpBuffer(cmdQry, "\n");
-				archputs(cmdQry->data, AH);
-			}
-		}
-	} while (PQntuples(res) > 0);
-
-	PQclear(res);
-
-	archputs("\n", AH);
-
-	destroyPQExpBuffer(cmdQry);
-
-	return 1;
-}
-
 static void
 binary_upgrade_set_type_oids_by_type_oid(PQExpBuffer upgrade_buffer,
 											   Oid pg_type_oid)
@@ -6530,21 +6491,16 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj)
 		case DO_DEFAULT_ACL:
 			dumpDefaultACL(fout, (DefaultACLInfo *) dobj);
 			break;
-		case DO_BLOBS:
-			ArchiveEntry(fout, dobj->catId, dobj->dumpId,
-						 dobj->name, NULL, NULL, "",
-						 false, "BLOBS", SECTION_DATA,
-						 "", "", NULL,
-						 dobj->dependencies, dobj->nDeps,
-						 dumpBlobs, NULL);
+		case DO_BLOB_ITEM:
+			dumpBlobItem(fout, (BlobInfo *) dobj);
 			break;
-		case DO_BLOB_COMMENTS:
+		case DO_BLOB_DATA:
 			ArchiveEntry(fout, dobj->catId, dobj->dumpId,
 						 dobj->name, NULL, NULL, "",
-						 false, "BLOB COMMENTS", SECTION_DATA,
+						 false, "BLOB DATA", SECTION_DATA,
 						 "", "", NULL,
 						 dobj->dependencies, dobj->nDeps,
-						 dumpBlobComments, NULL);
+						 dumpBlobData, NULL);
 			break;
 	}
 }
@@ -10400,8 +10356,13 @@ dumpACL(Archive *fout, CatalogId objCatId, DumpId objDumpId,
 {
 	PQExpBuffer sql;
 
-	/* Do nothing if ACL dump is not enabled */
-	if (dataOnly || aclsSkip)
+	/*
+	 * Do nothing if ACL dump is not enabled
+	 *
+	 * Note that the caller has to check necessity to dump ACLs
+	 * depending on --data-only / --schema-only.
+	 */
+	if (aclsSkip)
 		return;
 
 	sql = createPQExpBuffer();
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 1e65fac..3776d84 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -115,8 +115,8 @@ typedef enum
 	DO_FDW,
 	DO_FOREIGN_SERVER,
 	DO_DEFAULT_ACL,
-	DO_BLOBS,
-	DO_BLOB_COMMENTS
+	DO_BLOB_DATA,
+	DO_BLOB_ITEM,
 } DumpableObjectType;
 
 typedef struct _dumpableObject
@@ -443,6 +443,14 @@ typedef struct _defaultACLInfo
 	char	   *defaclacl;
 } DefaultACLInfo;
 
+typedef struct _blobInfo
+{
+	DumpableObject	dobj;
+	char	   *rolname;
+	char	   *blobacl;
+	char	   *blobdescr;
+} BlobInfo;
+
 /* global decls */
 extern bool force_quotes;		/* double-quotes for identifiers flag */
 extern bool g_verbose;			/* verbose flag */
diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c
index d67afd0..6d5ccaa 100644
--- a/src/bin/pg_dump/pg_dump_sort.c
+++ b/src/bin/pg_dump/pg_dump_sort.c
@@ -92,8 +92,8 @@ static const int newObjectTypePriority[] =
 	14,							/* DO_FDW */
 	15,							/* DO_FOREIGN_SERVER */
 	27,							/* DO_DEFAULT_ACL */
-	20,							/* DO_BLOBS */
-	21							/* DO_BLOB_COMMENTS */
+	21,							/* DO_BLOB_DATA */
+	20,							/* DO_BLOB_ITEM */
 };
 
 
@@ -1157,14 +1157,14 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize)
 					 "DEFAULT ACL %s  (ID %d OID %u)",
 					 obj->name, obj->dumpId, obj->catId.oid);
 			return;
-		case DO_BLOBS:
+		case DO_BLOB_DATA:
 			snprintf(buf, bufsize,
-					 "BLOBS  (ID %d)",
+					 "BLOB DATA  (ID %d)",
 					 obj->dumpId);
 			return;
-		case DO_BLOB_COMMENTS:
+		case DO_BLOB_ITEM:
 			snprintf(buf, bufsize,
-					 "BLOB COMMENTS  (ID %d)",
+					 "BLOB ITEM  (ID %d)",
 					 obj->dumpId);
 			return;
 	}