copy with compression progress n

Started by Andreas Pflug over 19 years ago, 6 messages
#1 Andreas Pflug
pgadmin@pse-consulting.de
1 attachment(s)

I've been playing around with COPYing large binary data, and implemented
a COMPRESSION transfer format. Server-side compression saves
significant bandwidth, which may be the major limiting factor when large
amounts of data are involved (e.g. in many cases where COPY TO/FROM
STDIN/STDOUT is used).
In addition, a progress notification can be enabled using a PROGRESS
<each n lines> option.

I tested this with a table containing 2000 rows with a highly
compressible bytea column (size 1.4GB, on-disk 138MB). Numbers are as
follows (8.2 HEAD psql):
Command                             Time   Output size
pg_dump -a -F c -t                  652s   146MB
\copy TO /dev/null                  322s
\copy TO /dev/null binary            24s
\copy TO /dev/null compression      108s
\copy TO /tmp/file binary            55s   1.4GB
\copy TO /tmp/file compression      108s   133MB
\copy TO STDOUT binary|gzip -1       69s   117MB

So the plain text copy format has a large overhead compared with the
binary formats for this data. OTOH, copying normal rows WITH BINARY may
bloat the result too. A typical test table gave these numbers:
COPY: 6014 Bytes
BINARY: 15071 Bytes
COMPRESSION: 2334 Bytes

The compression (pg_lzcompress) is less efficient than a binary copy
piped to gzip, as long as the data transfer of 1.4GB from server to
client isn't limited by network bandwidth. Apparently, pg_lzcompress
needs 53s (108s - 55s) to compress to 133MB, while gzip needs only 14s
(69s - 55s) for 117MB. It might be worth looking into optimizing that,
since it's used in tuptoaster. Still, when network traffic is involved,
it may be better to spend some time on the server to reduce the data
(e.g. for Slony, which uses COPY to start a replication, and is likely
to be operated over lines <1GBit/s).
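
As a rough illustration of the timing argument, the fragment below
subtracts the uncompressed binary-to-file run from the compressed runs
to get the compression overhead, and estimates the link speed below
which the compressed transfer would finish first. The model (total time
= server processing time + bytes / link speed) is an assumption made
here for illustration, not something measured in this thread; the
function names are hypothetical and 1.4GB is taken as 1400MB.

```c
/*
 * Seconds attributable to compression: a compressed run minus the
 * uncompressed binary run written to the same target.
 */
double compression_overhead_s(double compressed_run_s, double binary_run_s)
{
    return compressed_run_s - binary_run_s;
}

/*
 * Link speed (MB/s) at which compressed and uncompressed transfers take
 * equally long, under the ASSUMED model
 *     total = processing_time + size / link_speed.
 * Below this speed the compressed transfer wins.
 */
double break_even_mb_per_s(double raw_mb, double packed_mb,
                           double raw_time_s, double packed_time_s)
{
    return (raw_mb - packed_mb) / (packed_time_s - raw_time_s);
}
```

With the /dev/null timings (24s binary, 108s compressed, 1400MB raw,
133MB compressed), break_even_mb_per_s gives about 15 MB/s, i.e. roughly
120 Mbit/s, under that simplified model.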

The attached patch implements COPY ... WITH [BINARY] COMPRESSION
(compression implies BINARY). The copy data uses bit 17 of the flag
field to identify compressed data.
The PROGRESS <n> option to throw notices every n lines has a caveat: when
copying TO STDOUT, data transfer will cease after the first notice has
been sent. This may either mean "don't ereport(NOTICE) when COPYing data
to the client" or a bug somewhere.
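
For reference, the flags-field handling described above (OIDs in bit 16,
compression in bit 17, written in the patch as (1 << 16) and (2 << 16))
can be sketched as the standalone fragment below. The macro and function
names are hypothetical and do not appear in the patch; only the bit
positions and the "reject any other bit in 16..31" check are taken from
it.

```c
#include <stdbool.h>
#include <stdint.h>

/* Bits in the 32-bit flags field of the binary COPY header.
 * Names are hypothetical; positions follow the patch. */
#define COPY_FLAG_OIDS        (UINT32_C(1) << 16)  /* existing: OIDs included */
#define COPY_FLAG_COMPRESSION (UINT32_C(1) << 17)  /* proposed: same as (2 << 16) */

/* Build the flags word written by COPY TO. */
uint32_t copy_flags_encode(bool oids, bool compression)
{
    uint32_t flags = 0;

    if (oids)
        flags |= COPY_FLAG_OIDS;
    if (compression)
        flags |= COPY_FLAG_COMPRESSION;
    return flags;
}

/*
 * Decode the flags word read by COPY FROM.  Returns false if any other
 * bit in the range 16..31 is set, mirroring the header check in the patch.
 */
bool copy_flags_decode(uint32_t flags, bool *oids, bool *compression)
{
    *oids = (flags & COPY_FLAG_OIDS) != 0;
    *compression = (flags & COPY_FLAG_COMPRESSION) != 0;
    flags &= ~(COPY_FLAG_OIDS | COPY_FLAG_COMPRESSION);
    return (flags >> 16) == 0;
}
```

A reader without the patch masks out only bit 16, so it would see bit 17
as an unknown flag and reject the header rather than misread compressed
data.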

Regards,
Andreas

Attachments:

copy-compression.patch (text/plain)
Index: src/backend/commands/copy.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/commands/copy.c,v
retrieving revision 1.266
diff -c -r1.266 copy.c
*** src/backend/commands/copy.c	26 May 2006 22:50:02 -0000	1.266
--- src/backend/commands/copy.c	31 May 2006 08:52:42 -0000
***************
*** 47,53 ****
  #include "utils/memutils.h"
  #include "utils/relcache.h"
  #include "utils/syscache.h"
! 
  
  #define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
  #define OCTVALUE(c) ((c) - '0')
--- 47,53 ----
  #include "utils/memutils.h"
  #include "utils/relcache.h"
  #include "utils/syscache.h"
! #include "utils/pg_lzcompress.h"
  
  #define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
  #define OCTVALUE(c) ((c) - '0')
***************
*** 103,114 ****
--- 103,121 ----
  	int			client_encoding;	/* remote side's character encoding */
  	bool		need_transcoding;		/* client encoding diff from server? */
  	bool		encoding_embeds_ascii;	/* ASCII can be non-first byte? */
+     bool        do_compress;    /* compress data before writing to output */
+     bool        do_flush;       /* flush fe_msgbuf to copy target file/pipe */
+     bool        use_raw_buf;    /* use raw buffered data for CopyGetData */
  	uint64		processed;		/* # of tuples processed */
+ 	uint64		progress;		/* progress notice each # tuples processed */
+     
+ 	MemoryContext oldcontext;
  
  	/* parameters from the COPY command */
  	Relation	rel;			/* relation to copy to or from */
  	List	   *attnumlist;		/* integer list of attnums to copy */
  	bool		binary;			/* binary format? */
+ 	bool		compression;	/* binary compressed format? */
  	bool		oids;			/* include OIDs? */
  	bool		csv_mode;		/* Comma Separated Value format? */
  	bool		header_line;	/* CSV header line? */
***************
*** 153,162 ****
  	 * converts it.  Note: we guarantee that there is a \0 at
  	 * raw_buf[raw_buf_len].
  	 */
! #define RAW_BUF_SIZE 65536		/* we palloc RAW_BUF_SIZE+1 bytes */
  	char	   *raw_buf;
  	int			raw_buf_index;	/* next byte to process */
  	int			raw_buf_len;	/* total # of bytes stored */
  } CopyStateData;
  
  typedef CopyStateData *CopyState;
--- 160,170 ----
  	 * converts it.  Note: we guarantee that there is a \0 at
  	 * raw_buf[raw_buf_len].
  	 */
! #define RAW_BUF_SIZE 65536		/* initially, we palloc RAW_BUF_SIZE+1 bytes */
  	char	   *raw_buf;
  	int			raw_buf_index;	/* next byte to process */
  	int			raw_buf_len;	/* total # of bytes stored */
+     int         raw_buf_size;   /* actual raw_buf_size */
  } CopyStateData;
  
  typedef CopyStateData *CopyState;
***************
*** 260,265 ****
--- 268,276 ----
  static void CopySendEndOfRow(CopyState cstate);
  static int CopyGetData(CopyState cstate, void *databuf,
  			int minread, int maxread);
+ static bool CopyLoadRawBuf(CopyState cstate);
+ static int  CopyLoadBuf(CopyState cstate, void *databuf,
+ 			int minread, int maxread);
  static void CopySendInt32(CopyState cstate, int32 val);
  static bool CopyGetInt32(CopyState cstate, int32 *val);
  static void CopySendInt16(CopyState cstate, int16 val);
***************
*** 409,442 ****
  static void
  CopySendEndOfRow(CopyState cstate)
  {
  	StringInfo	fe_msgbuf = cstate->fe_msgbuf;
  
! 	switch (cstate->copy_dest)
  	{
! 		case COPY_FILE:
! 			if (!cstate->binary)
! 			{
! 				/* Default line termination depends on platform */
! #ifndef WIN32
! 				CopySendChar(cstate, '\n');
  #else
! 				CopySendString(cstate, "\r\n");
  #endif
! 			}
  
! 			(void) fwrite(fe_msgbuf->data, fe_msgbuf->len,
! 						  1, cstate->copy_file);
  			if (ferror(cstate->copy_file))
  				ereport(ERROR,
  						(errcode_for_file_access(),
  						 errmsg("could not write to COPY file: %m")));
  			break;
  		case COPY_OLD_FE:
! 			/* The FE/BE protocol uses \n as newline for all platforms */
! 			if (!cstate->binary)
! 				CopySendChar(cstate, '\n');
! 
! 			if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
  			{
  				/* no hope of recovering connection sync, so FATAL */
  				ereport(FATAL,
--- 420,497 ----
  static void
  CopySendEndOfRow(CopyState cstate)
  {
+     PGLZ_Header  *tmp=0;
  	StringInfo	fe_msgbuf = cstate->fe_msgbuf;
+ 	
+ 	void *data;
+ 	int len;
+ 	bool writeUncompressed = false;
  
! 	if (!cstate->binary)
  	{
! 		/* Default line termination depends on platform */
! #ifdef WIN32
! 		if (cstate->copy_dest == COPY_FILE)
! 		    CopySendString(cstate, "\r\n");
! 		else
  #else
! 			/* The FE/BE protocol uses \n as newline for all platforms */
! 		    CopySendChar(cstate, '\n');
  #endif
! 	}
! 			
! 
! 	if (cstate->do_compress)
! 	{
! 	    if (!cstate->do_flush && fe_msgbuf->len < RAW_BUF_SIZE)
! 		{
! 			/* Wait for some more data until we compress and write out */
! 			return;
! 		}
! 
! 		tmp = (PGLZ_Header *) palloc(PGLZ_MAX_OUTPUT(fe_msgbuf->len));
! 
! #if 1
! 		(void) pglz_compress(fe_msgbuf->data, fe_msgbuf->len, tmp, NULL);
! #else /* simulate non-compressible data, test compression performance */
! 		tmp->varsize = fe_msgbuf->len + sizeof(PGLZ_Header);
! 		tmp->rawsize = fe_msgbuf->len;
! #endif
! 		data = tmp;
! 
! 		if (PGLZ_IS_COMPRESSED(tmp))
! 		    len = tmp->varsize;
! 		else
! 		{
! 			// incompressible data
! 			len = sizeof(PGLZ_Header);
! 			writeUncompressed = true;
! 		}
! 	}
! 	else
! 	{
! 		data = fe_msgbuf->data;
! 		len = fe_msgbuf->len;
! 	}
  
! 	switch (cstate->copy_dest)
! 	{
! 		case COPY_FILE:
! 			(void) fwrite(data, len, 1, cstate->copy_file);
! 			if (ferror(cstate->copy_file))
! 				ereport(ERROR,
! 						(errcode_for_file_access(),
! 						 errmsg("could not write to COPY file: %m")));
! 			if (writeUncompressed)
! 			    (void) fwrite(fe_msgbuf->data, fe_msgbuf->len, 1, cstate->copy_file);
  			if (ferror(cstate->copy_file))
  				ereport(ERROR,
  						(errcode_for_file_access(),
  						 errmsg("could not write to COPY file: %m")));
  			break;
  		case COPY_OLD_FE:
! 			if (pq_putbytes(data, len) || 
! 				(writeUncompressed && pq_putbytes(fe_msgbuf->data, fe_msgbuf->len)))
  			{
  				/* no hope of recovering connection sync, so FATAL */
  				ereport(FATAL,
***************
*** 445,459 ****
  			}
  			break;
  		case COPY_NEW_FE:
- 			/* The FE/BE protocol uses \n as newline for all platforms */
- 			if (!cstate->binary)
- 				CopySendChar(cstate, '\n');
- 
  			/* Dump the accumulated row as one CopyData message */
! 			(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
  			break;
  	}
  
  	/* Reset fe_msgbuf to empty */
  	fe_msgbuf->len = 0;
  	fe_msgbuf->data[0] = '\0';
--- 500,515 ----
  			}
  			break;
  		case COPY_NEW_FE:
  			/* Dump the accumulated row as one CopyData message */
! 			(void) pq_putmessage('d', data, len);
! 			if (writeUncompressed)
! 			    (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
  			break;
  	}
  
+ 	if (tmp)
+ 	    pfree(tmp);
+ 
  	/* Reset fe_msgbuf to empty */
  	fe_msgbuf->len = 0;
  	fe_msgbuf->data[0] = '\0';
***************
*** 475,480 ****
--- 531,564 ----
  static int
  CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
  {
+     if (cstate->use_raw_buf)
+ 	{
+ 	    int bytesread = 0;
+ 
+ 		while (bytesread < minread)
+ 		{
+ 		    int nbytes = cstate->raw_buf_len - cstate->raw_buf_index;
+ 			if (nbytes > maxread-bytesread)
+ 			    nbytes = maxread-bytesread;
+ 
+ 			memcpy((char*)databuf + bytesread, cstate->raw_buf + cstate->raw_buf_index, nbytes);
+ 			cstate->raw_buf_index += nbytes;
+ 			bytesread += nbytes;
+ 
+ 			if (bytesread >= minread || !CopyLoadRawBuf(cstate))
+ 			    break;
+ 		}
+ 
+ 		return bytesread;
+ 	}
+ 	else
+ 	    return CopyLoadBuf(cstate, databuf, minread, maxread);
+ }
+ 
+ 
+ static int
+ CopyLoadBuf(CopyState cstate, void *databuf, int minread, int maxread)
+ {
  	int			bytesread = 0;
  
  	switch (cstate->copy_dest)
***************
*** 662,669 ****
  	else
  		nbytes = 0;				/* no data need be saved */
  
! 	inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
! 						  1, RAW_BUF_SIZE - nbytes);
  	nbytes += inbytes;
  	cstate->raw_buf[nbytes] = '\0';
  	cstate->raw_buf_index = 0;
--- 746,816 ----
  	else
  		nbytes = 0;				/* no data need be saved */
  
! 	if (cstate->do_compress)
! 	  {
! 		PGLZ_Header pglzHdr;
! 		inbytes = CopyLoadBuf(cstate, &pglzHdr, sizeof(PGLZ_Header), sizeof(PGLZ_Header));
! 
! 		if (inbytes != sizeof(PGLZ_Header))
! 		{
! 			ereport(ERROR,
! 					(errcode(ERRCODE_CONNECTION_FAILURE),
! 					 errmsg("not enough data")));
! 		}
! 		/* make sure raw_buf is big enough */
! 		if (cstate->raw_buf_size < pglzHdr.rawsize + nbytes)
! 		{
! 		    char *newbuf;
! 			MemoryContext rowContext;
! 
! 			cstate->raw_buf_size = pglzHdr.rawsize + nbytes;
! 
! 			/* raw_buf is allocated statement-wide */
! 		    rowContext = MemoryContextSwitchTo(cstate->oldcontext);
! 			newbuf=palloc(cstate->raw_buf_size+1);
! 		    MemoryContextSwitchTo(rowContext);
! 
! 			if (nbytes > 0)
! 			    memcpy(newbuf, cstate->raw_buf, nbytes);
! 
! 			pfree(cstate->raw_buf);
! 			cstate->raw_buf = newbuf;
! 		}
! 
! 		if (PGLZ_IS_COMPRESSED(&pglzHdr))
! 		{
! 			PGLZ_Header *tmp = (PGLZ_Header*)palloc(pglzHdr.varsize);
! 			memcpy(tmp, &pglzHdr, sizeof(PGLZ_Header));
! 
! 		    inbytes = CopyLoadBuf(cstate, (char*)tmp + sizeof(PGLZ_Header),
! 								  pglzHdr.varsize - sizeof(PGLZ_Header), pglzHdr.varsize - sizeof(PGLZ_Header));
! 			if (inbytes != pglzHdr.varsize-sizeof(PGLZ_Header))
! 			{
! 				ereport(ERROR,
! 						(errcode(ERRCODE_CONNECTION_FAILURE),
! 						 errmsg("not enough data")));
! 			}
! 			pglz_decompress(tmp, cstate->raw_buf + nbytes);
! 			inbytes = pglzHdr.rawsize;
! 		}
! 		else
! 		{
! 			/* not compressed */
! 		    inbytes = CopyLoadBuf(cstate, cstate->raw_buf + nbytes,
! 								  pglzHdr.rawsize, pglzHdr.rawsize);
! 			if (inbytes != pglzHdr.rawsize)
! 			{
! 				ereport(ERROR,
! 						(errcode(ERRCODE_CONNECTION_FAILURE),
! 						 errmsg("not enough data")));
! 			}
! 		}
! 	}
! 	else
! 	{
! 	    inbytes = CopyLoadBuf(cstate, cstate->raw_buf + nbytes,
! 							  cstate->raw_buf_size - nbytes, cstate->raw_buf_size - nbytes);
! 	}
  	nbytes += inbytes;
  	cstate->raw_buf[nbytes] = '\0';
  	cstate->raw_buf_index = 0;
***************
*** 733,738 ****
--- 880,902 ----
  						 errmsg("conflicting or redundant options")));
  			cstate->binary = intVal(defel->arg);
  		}
+ 		else if (strcmp(defel->defname, "compression") == 0)
+ 		{
+ 			if (cstate->compression)
+ 				ereport(ERROR,
+ 						(errcode(ERRCODE_SYNTAX_ERROR),
+ 						 errmsg("conflicting or redundant options")));
+ 			cstate->compression = intVal(defel->arg);
+ 			cstate->binary = intVal(defel->arg);
+ 		}
+ 		else if (strcmp(defel->defname, "progress") == 0)
+ 		{
+ 			if (cstate->progress)
+ 				ereport(ERROR,
+ 						(errcode(ERRCODE_SYNTAX_ERROR),
+ 						 errmsg("conflicting or redundant options")));
+ 			cstate->progress = intVal(defel->arg);
+ 		}
  		else if (strcmp(defel->defname, "oids") == 0)
  		{
  			if (cstate->oids)
***************
*** 1009,1015 ****
  	initStringInfo(&cstate->attribute_buf);
  	initStringInfo(&cstate->line_buf);
  	cstate->line_buf_converted = false;
! 	cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
  	cstate->raw_buf_index = cstate->raw_buf_len = 0;
  	cstate->processed = 0;
  
--- 1173,1180 ----
  	initStringInfo(&cstate->attribute_buf);
  	initStringInfo(&cstate->line_buf);
  	cstate->line_buf_converted = false;
! 	cstate->raw_buf_size=RAW_BUF_SIZE;
! 	cstate->raw_buf = (char *) palloc(cstate->raw_buf_size+1);
  	cstate->raw_buf_index = cstate->raw_buf_len = 0;
  	cstate->processed = 0;
  
***************
*** 1274,1287 ****
--- 1439,1462 ----
  
  		/* Signature */
  		CopySendData(cstate, (char *) BinarySignature, 11);
+ 
  		/* Flags field */
  		tmp = 0;
  		if (cstate->oids)
  			tmp |= (1 << 16);
+ 		if (cstate->compression)
+ 		    tmp |= (2 << 16);
  		CopySendInt32(cstate, tmp);
  		/* No header extension */
  		tmp = 0;
  		CopySendInt32(cstate, tmp);
+ 
+ 		if (cstate->compression)
+ 		{
+ 			CopySendEndOfRow(cstate);
+ 			/* from now on, rows will be compressed */
+ 		    cstate->do_compress = true;
+ 		}
  	}
  	else
  	{
***************
*** 1404,1413 ****
  		}
  
  		CopySendEndOfRow(cstate);
- 
  		MemoryContextSwitchTo(oldcontext);
! 		
  		cstate->processed++;
  	}
  
  	heap_endscan(scandesc);
--- 1579,1589 ----
  		}
  
  		CopySendEndOfRow(cstate);
  		MemoryContextSwitchTo(oldcontext);
! 
  		cstate->processed++;
+ 		if (cstate->progress && (cstate->processed % cstate->progress) == 0)
+ 			ereport(NOTICE, (errmsg("COPY " UINT64_FORMAT, cstate->processed)));
  	}
  
  	heap_endscan(scandesc);
***************
*** 1417,1422 ****
--- 1593,1599 ----
  		/* Generate trailer for a binary copy */
  		CopySendInt16(cstate, -1);
  		/* Need to flush out the trailer */
+ 		cstate->do_flush = true;
  		CopySendEndOfRow(cstate);
  	}
  
***************
*** 1563,1570 ****
  	int		   *defmap;
  	ExprState **defexprs;		/* array of default att expressions */
  	ExprContext *econtext;		/* used for ExecEvalExpr for default atts */
- 	MemoryContext oldcontext = CurrentMemoryContext;
  	ErrorContextCallback errcontext;
  
  	tupDesc = RelationGetDescr(cstate->rel);
  	attr = tupDesc->attrs;
--- 1740,1747 ----
  	int		   *defmap;
  	ExprState **defexprs;		/* array of default att expressions */
  	ExprContext *econtext;		/* used for ExecEvalExpr for default atts */
  	ErrorContextCallback errcontext;
+ 	cstate->oldcontext = CurrentMemoryContext;
  
  	tupDesc = RelationGetDescr(cstate->rel);
  	attr = tupDesc->attrs;
***************
*** 1677,1683 ****
  					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
  					 errmsg("invalid COPY file header (missing flags)")));
  		file_has_oids = (tmp & (1 << 16)) != 0;
! 		tmp &= ~(1 << 16);
  		if ((tmp >> 16) != 0)
  			ereport(ERROR,
  					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
--- 1854,1861 ----
  					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
  					 errmsg("invalid COPY file header (missing flags)")));
  		file_has_oids = (tmp & (1 << 16)) != 0;
! 		cstate->compression = (tmp & (2 << 16)) != 0;
! 		tmp &= ~(3 << 16);
  		if ((tmp >> 16) != 0)
  			ereport(ERROR,
  					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
***************
*** 1696,1701 ****
--- 1874,1883 ----
  						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
  						 errmsg("invalid COPY file header (wrong length)")));
  		}
+ 
+ 		if (cstate->compression)
+ 		    cstate->do_compress = true;
+ 		cstate->use_raw_buf = true;
  	}
  
  	if (file_has_oids && cstate->binary)
***************
*** 1913,1919 ****
  			HeapTupleSetOid(tuple, loaded_oid);
  
  		/* Triggers and stuff need to be invoked in query context. */
! 		MemoryContextSwitchTo(oldcontext);
  
  		skip_tuple = false;
  
--- 2095,2101 ----
  			HeapTupleSetOid(tuple, loaded_oid);
  
  		/* Triggers and stuff need to be invoked in query context. */
! 		MemoryContextSwitchTo(cstate->oldcontext);
  
  		skip_tuple = false;
  
***************
*** 1958,1970 ****
  			 * tuples inserted by an INSERT command.
  			 */
  			cstate->processed++;
  		}
  	}
  
  	/* Done, clean up */
  	error_context_stack = errcontext.previous;
  
! 	MemoryContextSwitchTo(oldcontext);
  
  	/* Execute AFTER STATEMENT insertion triggers */
  	ExecASInsertTriggers(estate, resultRelInfo);
--- 2140,2159 ----
  			 * tuples inserted by an INSERT command.
  			 */
  			cstate->processed++;
+ 		
+ 			if (cstate->progress && (cstate->processed % cstate->progress) == 0)
+ 			{
+ 				error_context_stack = errcontext.previous;
+ 				ereport(NOTICE, (errmsg("COPY " UINT64_FORMAT, cstate->processed)));
+ 				error_context_stack = &errcontext;
+ 			}
  		}
  	}
  
  	/* Done, clean up */
  	error_context_stack = errcontext.previous;
  
! 	MemoryContextSwitchTo(cstate->oldcontext);
  
  	/* Execute AFTER STATEMENT insertion triggers */
  	ExecASInsertTriggers(estate, resultRelInfo);
Index: src/backend/parser/gram.y
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/parser/gram.y,v
retrieving revision 2.545
diff -c -r2.545 gram.y
*** src/backend/parser/gram.y	27 May 2006 17:38:45 -0000	2.545
--- src/backend/parser/gram.y	31 May 2006 08:52:56 -0000
***************
*** 366,372 ****
  	CACHE CALLED CASCADE CASCADED CASE CAST CHAIN CHAR_P
  	CHARACTER CHARACTERISTICS CHECK CHECKPOINT CLASS CLOSE
  	CLUSTER COALESCE COLLATE COLUMN COMMENT COMMIT
! 	COMMITTED CONNECTION CONSTRAINT CONSTRAINTS CONVERSION_P CONVERT COPY CREATE CREATEDB
  	CREATEROLE CREATEUSER CROSS CSV CURRENT_DATE CURRENT_ROLE CURRENT_TIME
  	CURRENT_TIMESTAMP CURRENT_USER CURSOR CYCLE
  
--- 366,372 ----
  	CACHE CALLED CASCADE CASCADED CASE CAST CHAIN CHAR_P
  	CHARACTER CHARACTERISTICS CHECK CHECKPOINT CLASS CLOSE
  	CLUSTER COALESCE COLLATE COLUMN COMMENT COMMIT
! 	COMMITTED COMPRESSION CONNECTION CONSTRAINT CONSTRAINTS CONVERSION_P CONVERT COPY CREATE CREATEDB
  	CREATEROLE CREATEUSER CROSS CSV CURRENT_DATE CURRENT_ROLE CURRENT_TIME
  	CURRENT_TIMESTAMP CURRENT_USER CURSOR CYCLE
  
***************
*** 408,414 ****
  
  	PARTIAL PASSWORD PLACING POSITION
  	PRECISION PRESERVE PREPARE PREPARED PRIMARY
! 	PRIOR PRIVILEGES PROCEDURAL PROCEDURE
  
  	QUOTE
  
--- 408,414 ----
  
  	PARTIAL PASSWORD PLACING POSITION
  	PRECISION PRESERVE PREPARE PREPARED PRIMARY
! 	PRIOR PRIVILEGES PROCEDURAL PROCEDURE PROGRESS
  
  	QUOTE
  
***************
*** 1649,1654 ****
--- 1649,1662 ----
  				{
  					$$ = makeDefElem("binary", (Node *)makeInteger(TRUE));
  				}
+ 			| COMPRESSION
+ 				{
+ 					$$ = makeDefElem("compression", (Node *)makeInteger(TRUE));
+ 				}
+ 			| PROGRESS opt_as Iconst
+ 				{
+ 					$$ = makeDefElem("progress", (Node *)makeInteger($3));
+ 				}
  			| OIDS
  				{
  					$$ = makeDefElem("oids", (Node *)makeInteger(TRUE));
***************
*** 8369,8374 ****
--- 8377,8383 ----
  			| COMMENT
  			| COMMIT
  			| COMMITTED
+ 			| COMPRESSION
  			| CONNECTION
  			| CONSTRAINTS
  			| CONVERSION_P
***************
*** 8476,8481 ****
--- 8485,8491 ----
  			| PRIVILEGES
  			| PROCEDURAL
  			| PROCEDURE
+ 			| PROGRESS
  			| QUOTE
  			| READ
  			| REASSIGN
Index: src/backend/parser/keywords.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/parser/keywords.c,v
retrieving revision 1.171
diff -c -r1.171 keywords.c
*** src/backend/parser/keywords.c	5 Mar 2006 15:58:32 -0000	1.171
--- src/backend/parser/keywords.c	31 May 2006 08:52:57 -0000
***************
*** 83,88 ****
--- 83,89 ----
  	{"comment", COMMENT},
  	{"commit", COMMIT},
  	{"committed", COMMITTED},
+ 	{"compression", COMPRESSION},
  	{"connection", CONNECTION},
  	{"constraint", CONSTRAINT},
  	{"constraints", CONSTRAINTS},
***************
*** 267,272 ****
--- 268,274 ----
  	{"privileges", PRIVILEGES},
  	{"procedural", PROCEDURAL},
  	{"procedure", PROCEDURE},
+ 	{"progress", PROGRESS},
  	{"quote", QUOTE},
  	{"read", READ},
  	{"real", REAL},
Index: src/bin/psql/copy.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/bin/psql/copy.c,v
retrieving revision 1.61
diff -c -r1.61 copy.c
*** src/bin/psql/copy.c	26 May 2006 19:51:29 -0000	1.61
--- src/bin/psql/copy.c	31 May 2006 08:53:00 -0000
***************
*** 62,70 ****
--- 62,72 ----
  	bool		psql_inout;		/* true = use psql stdin/stdout */
  	bool		from;
  	bool		binary;
+     bool        compression;
  	bool		oids;
  	bool		csv_mode;
  	bool		header;
+ 	long 	    progress;
  	char	   *delim;
  	char	   *null;
  	char	   *quote;
***************
*** 139,145 ****
  	}
  
  	result->table = pg_strdup(token);
- 
  	token = strtokx(NULL, whitespace, ".,()", "\"",
  					0, false, pset.encoding);
  	if (!token)
--- 141,146 ----
***************
*** 283,288 ****
--- 284,297 ----
  				result->oids = true;
  			else if (pg_strcasecmp(token, "binary") == 0)
  				result->binary = true;
+ 			else if (pg_strcasecmp(token, "compression") == 0)
+ 				result->compression = true;
+ 			else if (pg_strcasecmp(token, "progress") == 0)
+ 			{
+ 				token = strtokx(NULL, whitespace, NULL, NULL,
+ 							0, false, pset.encoding);
+ 				result->progress = atol(token);
+ 			}
  			else if (pg_strcasecmp(token, "csv") == 0)
  				result->csv_mode = true;
  			else if (pg_strcasecmp(token, "header") == 0)
***************
*** 478,483 ****
--- 487,497 ----
  			appendPQExpBuffer(&query, " WITH NULL AS '%s'", options->null);
  	}
  
+ 	if (options->compression)
+ 		appendPQExpBuffer(&query, " COMPRESSION");
+ 	if (options->progress)
+ 	    appendPQExpBuffer(&query, " PROGRESS %ld", options->progress);
+ 
  	if (options->csv_mode)
  		appendPQExpBuffer(&query, " CSV");
  
Index: src/include/utils/pg_lzcompress.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/utils/pg_lzcompress.h,v
retrieving revision 1.11
diff -c -r1.11 pg_lzcompress.h
*** src/include/utils/pg_lzcompress.h	25 May 2005 21:40:42 -0000	1.11
--- src/include/utils/pg_lzcompress.h	31 May 2006 08:53:02 -0000
***************
*** 56,62 ****
   * ----------
   */
  #define PGLZ_IS_COMPRESSED(_lzdata)		((_lzdata)->varsize !=				\
! e										 (_lzdata)->rawsize +			e	\
  														sizeof(PGLZ_Header))
  
  /* ----------
--- 56,62 ----
   * ----------
   */
  #define PGLZ_IS_COMPRESSED(_lzdata)		((_lzdata)->varsize !=				\
! 										 (_lzdata)->rawsize +				\
  														sizeof(PGLZ_Header))
  
  /* ----------
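
The chunk framing that the patch above writes and reads (a PGLZ_Header,
followed either by compressed bytes or, for incompressible data, by the
raw bytes) can be summarized with the sketch below. ChunkHeader is a
hypothetical stand-in for PGLZ_Header, assumed here to consist of two
32-bit integers; the helper names are likewise not part of the patch.

```c
#include <stdbool.h>
#include <stdint.h>

/* Stand-in for PGLZ_Header (assumed layout: two 32-bit integers). */
typedef struct ChunkHeader
{
    int32_t varsize;    /* header size plus payload size as stored */
    int32_t rawsize;    /* size of the data once decompressed */
} ChunkHeader;

/* Same test as PGLZ_IS_COMPRESSED: stored size differs from raw size. */
bool chunk_is_compressed(const ChunkHeader *h)
{
    return h->varsize != h->rawsize + (int32_t) sizeof(ChunkHeader);
}

/* Number of payload bytes that follow the header in the COPY stream. */
int32_t chunk_payload_len(const ChunkHeader *h)
{
    if (chunk_is_compressed(h))
        return h->varsize - (int32_t) sizeof(ChunkHeader);
    return h->rawsize;      /* raw data sent as-is after the header */
}
```

The reader side first fetches the fixed-size header, then uses it to
decide how many bytes to read and whether to run them through
pglz_decompress.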
#2 Tom Lane
tgl@sss.pgh.pa.us
In reply to: Andreas Pflug (#1)
Re: copy with compression progress n

Andreas Pflug <pgadmin@pse-consulting.de> writes:

> The attached patch implements COPY ... WITH [BINARY] COMPRESSION
> (compression implies BINARY). The copy data uses bit 17 of the flag
> field to identify compressed data.

I think this is a pretty horrid idea, because it changes pg_lzcompress
from an unimportant implementation detail into a backup file format
that we have to support till the end of time. What happens if, say,
we need to abandon pg_lzcompress because we find out it has patent
problems?

It *might* be tolerable if we used gzip instead, but I really don't see
the argument for doing this inside the server at all: piping to gzip
seems like a perfectly acceptable solution, quite possibly with higher
performance than doing it all in a single process (which isn't going
to be able to use more than one CPU).

I don't see the argument for restricting it to binary only, either.

regards, tom lane

#3 Andreas Pflug
pgadmin@pse-consulting.de
In reply to: Tom Lane (#2)
Re: copy with compression progress n

Tom Lane wrote:

> Andreas Pflug <pgadmin@pse-consulting.de> writes:

>> The attached patch implements COPY ... WITH [BINARY] COMPRESSION
>> (compression implies BINARY). The copy data uses bit 17 of the flag
>> field to identify compressed data.

> I think this is a pretty horrid idea, because it changes pg_lzcompress
> from an unimportant implementation detail into a backup file format
> that we have to support till the end of time. What happens if, say,
> we need to abandon pg_lzcompress because we find out it has patent
> problems?

> It *might* be tolerable if we used gzip instead,

I used pg_lzcompress because it's present in the backend. I'm fine with
any other good compression algorithm.

> but I really don't see
> the argument for doing this inside the server at all: piping to gzip
> seems like a perfectly acceptable solution,

As I said, this holds only if it is possible to pipe the result into gzip
in a performant way. The issue already arises if psql or any other COPY
client (slony, pg_dump) is not on the same machine: network bandwidth
will limit throughput.

> quite possibly with higher
> performance than doing it all in a single process (which isn't going
> to be able to use more than one CPU).

Which is pretty normal for pgsql.

> I don't see the argument for restricting it to binary only, either.

That's not a restriction, but a consequence: compressed data is binary.
Marking it as binary will make it work with older frontends as well,
as long as they don't try to interpret the data. Actually, all 8.x psql
versions should work (with COPY STDxx, not \copy).

Do you have a comment about the progress notification and its impact on
copy to stdout?

Regards,
Andreas

#4 Tom Lane
tgl@sss.pgh.pa.us
In reply to: Andreas Pflug (#3)
Re: copy with compression progress n

Andreas Pflug <pgadmin@pse-consulting.de> writes:

> Do you have a comment about the progress notification and its impact on
> copy to stdout?

I didn't bother to comment on it because I think it's useless, as well
as broken for the stdout case. Anyone who actually sees a use for it
will have to comment on why they want it.

regards, tom lane

#5 Andreas Pflug
pgadmin@pse-consulting.de
In reply to: Tom Lane (#4)
Re: copy progress notification

Tom Lane wrote:

> Andreas Pflug <pgadmin@pse-consulting.de> writes:

>> Do you have a comment about the progress notification and its impact on
>> copy to stdout?

> I didn't bother to comment on it because I think it's useless,

It's useful to see anything at all, and to be able to estimate how long
the whole process will take. People might find it interesting to know
whether they should go for a cup of coffee or better come back the next
day...

> as well as broken for the stdout case.

I know it's broken, but why? Is using ereport when sending copy data
illegal by design? If not, it's not the feature that's broken but
something in cvs HEAD.

Regards,
Andreas

#6 Hannu Krosing
hannu@skype.net
In reply to: Andreas Pflug (#3)
Re: copy with compression progress n

One fine day, Wed, 2006-05-31 at 17:31, Andreas Pflug wrote:

> Tom Lane wrote:

>> Andreas Pflug <pgadmin@pse-consulting.de> writes:

>>> The attached patch implements COPY ... WITH [BINARY] COMPRESSION
>>> (compression implies BINARY). The copy data uses bit 17 of the flag
>>> field to identify compressed data.

>> I think this is a pretty horrid idea, because it changes pg_lzcompress
>> from an unimportant implementation detail into a backup file format
>> that we have to support till the end of time. What happens if, say,
>> we need to abandon pg_lzcompress because we find out it has patent
>> problems?

>> It *might* be tolerable if we used gzip instead,

> I used pg_lzcompress because it's present in the backend. I'm fine with
> any other good compression algorithm.

>> but I really don't see
>> the argument for doing this inside the server at all: piping to gzip
>> seems like a perfectly acceptable solution,

> As I said, this holds only if it is possible to pipe the result into gzip
> in a performant way. The issue already arises if psql or any other COPY
> client (slony, pg_dump) is not on the same machine: network bandwidth
> will limit throughput.

Maybe make up a way to pipe COPY result through some external process
(like gzip) on the server side without having shell access there.

To make it secure, the external process should probably be run from a
hardwired directory via chroot.

--
----------------
Hannu Krosing
Database Architect
Skype Technologies OÜ
Akadeemia tee 21 F, Tallinn, 12618, Estonia
