diff --git a/configure b/configure
index 6f659a5..f2ef62d 100755
--- a/configure
+++ b/configure
@@ -698,6 +698,7 @@ ELF_SYS
 EGREP
 GREP
 with_zlib
+with_zstd
 with_system_tzdata
 with_libxslt
 with_libxml
@@ -862,6 +863,7 @@ with_libxml
 with_libxslt
 with_system_tzdata
 with_zlib
+with_zstd
 with_gnu_ld
 enable_largefile
 enable_float4_byval
@@ -8016,6 +8018,86 @@ fi
 
 
 #
+# ZStd
+#
+
+
+
+# Check whether --with-zstd was given.
+if test "${with_zstd+set}" = set; then :
+  withval=$with_zstd;
+  case $withval in
+    yes)
+      ;;
+    no)
+      :
+      ;;
+    *)
+      as_fn_error $? "no argument expected for --with-zstd option" "$LINENO" 5
+      ;;
+  esac
+
+else
+  with_zstd=no
+
+fi
+
+
+
+
+if test "$with_zstd" = yes ; then
+  { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_compress in -lzstd" >&5
+$as_echo_n "checking for ZSTD_compress in -lzstd... " >&6; }
+if ${ac_cv_lib_zstd_ZSTD_compress+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  ac_check_lib_save_LIBS=$LIBS
+LIBS="-lzstd  $LIBS"
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h.  */
+
+/* Override any GCC internal prototype to avoid an error.
+   Use char because int might match the return type of a GCC
+   builtin and then its argument prototype would still apply.  */
+#ifdef __cplusplus
+extern "C"
+#endif
+char ZSTD_compress ();
+int
+main ()
+{
+return ZSTD_compress ();
+  ;
+  return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+  ac_cv_lib_zstd_ZSTD_compress=yes
+else
+  ac_cv_lib_zstd_ZSTD_compress=no
+fi
+rm -f core conftest.err conftest.$ac_objext \
+    conftest$ac_exeext conftest.$ac_ext
+LIBS=$ac_check_lib_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zstd_ZSTD_compress" >&5
+$as_echo "$ac_cv_lib_zstd_ZSTD_compress" >&6; }
+if test "x$ac_cv_lib_zstd_ZSTD_compress" = xyes; then :
+  cat >>confdefs.h <<_ACEOF
+#define HAVE_LIBZSTD 1
+_ACEOF
+
+  LIBS="-lzstd $LIBS"
+
+else
+  as_fn_error $? "library 'zstd' is required for ZSTD support" "$LINENO" 5
+fi
+
+fi
+
+
+
+#
 # Elf
 #
 
diff --git a/src/Makefile.global.in b/src/Makefile.global.in
index 3bbdf17..08b8ab2 100644
--- a/src/Makefile.global.in
+++ b/src/Makefile.global.in
@@ -195,6 +195,7 @@ with_llvm	= @with_llvm@
 with_system_tzdata = @with_system_tzdata@
 with_uuid	= @with_uuid@
 with_zlib	= @with_zlib@
+with_zstd   = @with_zstd@
 enable_rpath	= @enable_rpath@
 enable_nls	= @enable_nls@
 enable_debug	= @enable_debug@
diff --git a/src/backend/Makefile b/src/backend/Makefile
index 6450c5a..1522c70 100644
--- a/src/backend/Makefile
+++ b/src/backend/Makefile
@@ -50,6 +50,14 @@ ifeq ($(with_systemd),yes)
 LIBS += -lsystemd
 endif
 
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+endif
+
+ifeq ($(with_zlib),yes)
+LIBS += -lz
+endif
+
 ##########################################################################
 
 all: submake-libpgport submake-schemapg postgres $(POSTGRES_IMP)
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index a4f6d4d..3a6f54b 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -95,6 +95,7 @@
 #include "storage/ipc.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
+#include  "common/zpq_stream.h"
 
 /*
  * Cope with the various platform-specific ways to spell TCP keepalive socket
@@ -143,6 +144,9 @@ static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
 static int	PqRecvPointer;		/* Next index to read a byte from PqRecvBuffer */
 static int	PqRecvLength;		/* End of data available in PqRecvBuffer */
 
+static ZpqStream* PqStream;
+
+
 /*
  * Message status
  */
@@ -185,6 +189,31 @@ PQcommMethods *PqCommMethods = &PqCommSocketMethods;
 
 WaitEventSet *FeBeWaitSet;
 
+/* --------------------------------
+ *		pq_configure - configure connection using port settings
+ *
+ * Right now only compression is toggled here.  Returns 0 on success, -1 if
+ * the 'z' message could not be sent or zpq_create() yielded no stream.
+ * --------------------------------
+ */
+int
+pq_configure(Port *port)
+{
+	char		compression = 'z';	/* Request compression message */
+	int			rc;
+
+	if (!port->use_compression)
+		return 0;
+
+	/* Switch on compression at client side; retry the write on EINTR */
+	socket_set_nonblocking(false);
+	while ((rc = secure_write(MyProcPort, &compression, 1)) < 0 && errno == EINTR);
+	if (rc != 1)
+		return -1;
+	/* initialize compression; NOTE(review): assumes NULL result means failure */
+	PqStream = zpq_create((zpq_tx_func)secure_write, (zpq_rx_func)secure_read, MyProcPort);
+	return (PqStream != NULL) ? 0 : -1;
+}
 
 /* --------------------------------
  *		pq_init - initialize libpq at backend startup
@@ -225,6 +254,7 @@ pq_init(void)
 					  NULL, NULL);
 	AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
 	AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL, NULL);
+
 }
 
 /* --------------------------------
@@ -282,6 +312,9 @@ socket_close(int code, Datum arg)
 		free(MyProcPort->gss);
 #endif							/* ENABLE_GSS || ENABLE_SSPI */
 
+		/* Release compression streams */
+		zpq_free(PqStream);
+
 		/*
 		 * Cleanly shut down SSL layer.  Nowhere else does a postmaster child
 		 * call this, so this is safe when interrupting BackendInitialize().
@@ -932,12 +965,14 @@ socket_set_nonblocking(bool nonblocking)
 /* --------------------------------
  *		pq_recvbuf - load some bytes into the input buffer
  *
- *		returns 0 if OK, EOF if trouble
+ *		returns number of read bytes (0 if nowait and no data is ready), EOF if trouble
  * --------------------------------
  */
 static int
-pq_recvbuf(void)
+pq_recvbuf(bool nowait)
 {
+	int			   r;
+
 	if (PqRecvPointer > 0)
 	{
 		if (PqRecvLength > PqRecvPointer)
@@ -953,21 +988,34 @@ pq_recvbuf(void)
 	}
 
-	/* Ensure that we're in blocking mode */
+	/* Put the socket in blocking or non-blocking mode, as the caller asked */
-	socket_set_nonblocking(false);
+	socket_set_nonblocking(nowait);
 
 	/* Can fill buffer from PqRecvLength and upwards */
 	for (;;)
 	{
-		int			r;
-
-		r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
-						PQ_RECV_BUFFER_SIZE - PqRecvLength);
+		size_t processed = 0;
+		r = PqStream
+			? zpq_read(PqStream, PqRecvBuffer + PqRecvLength,
+					   PQ_RECV_BUFFER_SIZE - PqRecvLength, &processed)
+			: secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
+						  PQ_RECV_BUFFER_SIZE - PqRecvLength);
+		PqRecvLength += processed;
 
 		if (r < 0)
 		{
+			if (r == ZPQ_DECOMPRESS_ERROR)
+			{
+				ereport(COMMERROR,
+						(errcode_for_socket_access(),
+						 errmsg("Failed to decompress data: %s", zpq_error(PqStream))));
+				return EOF;
+			}
 			if (errno == EINTR)
 				continue;		/* Ok if interrupted */
 
+			if (nowait && (errno == EAGAIN || errno == EWOULDBLOCK))
+				return 0;
+
 			/*
 			 * Careful: an ereport() that tries to write to the client would
 			 * cause recursion to here, leading to stack overflow and core
@@ -988,7 +1036,7 @@ pq_recvbuf(void)
 		}
 		/* r contains number of bytes read, so just incr length */
 		PqRecvLength += r;
-		return 0;
+		return r;
 	}
 }
 
@@ -1003,7 +1051,7 @@ pq_getbyte(void)
 
 	while (PqRecvPointer >= PqRecvLength)
 	{
-		if (pq_recvbuf())		/* If nothing in buffer, then recv some */
+		if (pq_recvbuf(false) == EOF)		/* If nothing in buffer, then recv some */
 			return EOF;			/* Failed to recv data */
 	}
 	return (unsigned char) PqRecvBuffer[PqRecvPointer++];
@@ -1022,7 +1070,7 @@ pq_peekbyte(void)
 
 	while (PqRecvPointer >= PqRecvLength)
 	{
-		if (pq_recvbuf())		/* If nothing in buffer, then recv some */
+		if (pq_recvbuf(false) == EOF)		/* If nothing in buffer, then recv some */
 			return EOF;			/* Failed to recv data */
 	}
 	return (unsigned char) PqRecvBuffer[PqRecvPointer];
@@ -1043,44 +1091,11 @@ pq_getbyte_if_available(unsigned char *c)
 
 	Assert(PqCommReadingMsg);
 
-	if (PqRecvPointer < PqRecvLength)
+	if (PqRecvPointer < PqRecvLength || (r = pq_recvbuf(true)) > 0)
 	{
 		*c = PqRecvBuffer[PqRecvPointer++];
 		return 1;
 	}
-
-	/* Put the socket into non-blocking mode */
-	socket_set_nonblocking(true);
-
-	r = secure_read(MyProcPort, c, 1);
-	if (r < 0)
-	{
-		/*
-		 * Ok if no data available without blocking or interrupted (though
-		 * EINTR really shouldn't happen with a non-blocking socket). Report
-		 * other errors.
-		 */
-		if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
-			r = 0;
-		else
-		{
-			/*
-			 * Careful: an ereport() that tries to write to the client would
-			 * cause recursion to here, leading to stack overflow and core
-			 * dump!  This message must go *only* to the postmaster log.
-			 */
-			ereport(COMMERROR,
-					(errcode_for_socket_access(),
-					 errmsg("could not receive data from client: %m")));
-			r = EOF;
-		}
-	}
-	else if (r == 0)
-	{
-		/* EOF detected */
-		r = EOF;
-	}
-
 	return r;
 }
 
@@ -1101,7 +1116,7 @@ pq_getbytes(char *s, size_t len)
 	{
 		while (PqRecvPointer >= PqRecvLength)
 		{
-			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
+			if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv some */
 				return EOF;		/* Failed to recv data */
 		}
 		amount = PqRecvLength - PqRecvPointer;
@@ -1135,7 +1150,7 @@ pq_discardbytes(size_t len)
 	{
 		while (PqRecvPointer >= PqRecvLength)
 		{
-			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
+			if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv some */
 				return EOF;		/* Failed to recv data */
 		}
 		amount = PqRecvLength - PqRecvPointer;
@@ -1176,7 +1191,7 @@ pq_getstring(StringInfo s)
 	{
 		while (PqRecvPointer >= PqRecvLength)
 		{
-			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
+			if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv some */
 				return EOF;		/* Failed to recv data */
 		}
 
@@ -1426,13 +1441,18 @@ internal_flush(void)
 	char	   *bufptr = PqSendBuffer + PqSendStart;
 	char	   *bufend = PqSendBuffer + PqSendPointer;
 
-	while (bufptr < bufend)
+	while (bufptr < bufend || zpq_buffered(PqStream) != 0) /* has more data to flush or unsent data in internal compression buffer */
 	{
-		int			r;
-
-		r = secure_write(MyProcPort, bufptr, bufend - bufptr);
-
-		if (r <= 0)
+		int		r;
+		size_t  processed = 0;
+		size_t  available = bufend - bufptr;
+		r = PqStream
+			? zpq_write(PqStream, bufptr, available, &processed)
+			: secure_write(MyProcPort, bufptr, available);
+		bufptr += processed;
+		PqSendStart += processed;
+
+		if (r < 0 || (r == 0 && available))
 		{
 			if (errno == EINTR)
 				continue;		/* Ok if we were interrupted */
@@ -1480,7 +1500,6 @@ internal_flush(void)
 		bufptr += r;
 		PqSendStart += r;
 	}
-
 	PqSendStart = PqSendPointer = 0;
 	return 0;
 }
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 660f318..295d756 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -2101,6 +2101,16 @@ retry1:
 				port->database_name = pstrdup(valptr);
 			else if (strcmp(nameptr, "user") == 0)
 				port->user_name = pstrdup(valptr);
+			else if (strcmp(nameptr, "compression") == 0)
+			{
+				if (!parse_bool(valptr, &port->use_compression))
+					ereport(FATAL,
+							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+							 errmsg("invalid boolean value for parameter \"%s\": \"%s\"",
+									"compression",
+									valptr),
+							 errhint("Valid values are: \"false\", \"off\", 0, \"true\", \"on\", 1.")));
+			}
 			else if (strcmp(nameptr, "options") == 0)
 				port->cmdline_options = pstrdup(valptr);
 			else if (strcmp(nameptr, "replication") == 0)
@@ -4305,6 +4315,14 @@ BackendInitialize(Port *port)
 	if (status != STATUS_OK)
 		proc_exit(0);
 
+	if (pq_configure(port))
+	{
+		ereport(COMMERROR,
+				(errcode_for_socket_access(),
+				 errmsg("failed to send compression message: %m")));
+		proc_exit(0);
+	}
+
 	/*
 	 * Now that we have the user and database name, we can set the process
 	 * title for ps.  It's good to do this as early as possible in startup.
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index a15aa06..769a06d 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -183,6 +183,7 @@ int			nclients = 1;		/* number of clients */
 int			nthreads = 1;		/* number of threads */
 bool		is_connect;			/* establish connection for each transaction */
 bool		is_latencies;		/* report per-command latencies */
+bool        libpq_compression;  /* use libpq compression */
 int			main_pid;			/* main process id used in log filename */
 
 char	   *pghost = "";
@@ -575,6 +576,7 @@ usage(void)
 		   "  -h, --host=HOSTNAME      database server host or socket directory\n"
 		   "  -p, --port=PORT          database server port number\n"
 		   "  -U, --username=USERNAME  connect as specified database user\n"
+		   "  -Z, --compression        use libpq compression\n"
 		   "  -V, --version            output version information, then exit\n"
 		   "  -?, --help               show this help, then exit\n"
 		   "\n"
@@ -1092,7 +1094,7 @@ doConnect(void)
 	 */
 	do
 	{
-#define PARAMS_ARRAY_SIZE	7
+#define PARAMS_ARRAY_SIZE	8
 
 		const char *keywords[PARAMS_ARRAY_SIZE];
 		const char *values[PARAMS_ARRAY_SIZE];
@@ -1109,8 +1111,10 @@ doConnect(void)
 		values[4] = dbName;
 		keywords[5] = "fallback_application_name";
 		values[5] = progname;
-		keywords[6] = NULL;
-		values[6] = NULL;
+		keywords[6] = "compression";
+		values[6] = libpq_compression ? "on" : "off";
+		keywords[7] = NULL;
+		values[7] = NULL;
 
 		new_pass = false;
 
@@ -4442,6 +4446,7 @@ main(int argc, char **argv)
 		{"builtin", required_argument, NULL, 'b'},
 		{"client", required_argument, NULL, 'c'},
 		{"connect", no_argument, NULL, 'C'},
+		{"compression", no_argument, NULL, 'Z'},
 		{"debug", no_argument, NULL, 'd'},
 		{"define", required_argument, NULL, 'D'},
 		{"file", required_argument, NULL, 'f'},
@@ -4543,12 +4548,15 @@ main(int argc, char **argv)
 	state = (CState *) pg_malloc(sizeof(CState));
 	memset(state, 0, sizeof(CState));
 
-	while ((c = getopt_long(argc, argv, "iI:h:nvp:dqb:SNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1)
+	while ((c = getopt_long(argc, argv, "iI:h:nvp:dqb:SNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:Z", long_options, &optindex)) != -1)
 	{
 		char	   *script;
 
 		switch (c)
 		{
+			case 'Z':
+				libpq_compression = true;
+				break;
 			case 'i':
 				is_init_mode = true;
 				break;
diff --git a/src/bin/psql/startup.c b/src/bin/psql/startup.c
index be57574..9271716 100644
--- a/src/bin/psql/startup.c
+++ b/src/bin/psql/startup.c
@@ -75,6 +75,7 @@ struct adhoc_opts
 	bool		no_psqlrc;
 	bool		single_txn;
 	bool		list_dbs;
+	bool        compression;
 	SimpleActionList actions;
 };
 
@@ -237,8 +238,10 @@ main(int argc, char *argv[])
 		values[5] = pset.progname;
 		keywords[6] = "client_encoding";
 		values[6] = (pset.notty || getenv("PGCLIENTENCODING")) ? NULL : "auto";
-		keywords[7] = NULL;
-		values[7] = NULL;
+		keywords[7] = "compression";
+		values[7] = options.compression ? "on" : "off";
+		keywords[8] = NULL;
+		values[8] = NULL;
 
 		new_pass = false;
 		pset.db = PQconnectdbParams(keywords, values, true);
@@ -436,6 +439,7 @@ parse_psql_options(int argc, char *argv[], struct adhoc_opts *options)
 		{"echo-all", no_argument, NULL, 'a'},
 		{"no-align", no_argument, NULL, 'A'},
 		{"command", required_argument, NULL, 'c'},
+		{"compression", no_argument, NULL, 'Z'},
 		{"dbname", required_argument, NULL, 'd'},
 		{"echo-queries", no_argument, NULL, 'e'},
 		{"echo-errors", no_argument, NULL, 'b'},
@@ -476,7 +480,7 @@ parse_psql_options(int argc, char *argv[], struct adhoc_opts *options)
 
 	memset(options, 0, sizeof *options);
 
-	while ((c = getopt_long(argc, argv, "aAbc:d:eEf:F:h:HlL:no:p:P:qR:sStT:U:v:VwWxXz?01",
+	while ((c = getopt_long(argc, argv, "aAbc:d:eEf:F:h:HlL:no:p:P:qR:sStT:U:v:VwWxXz?01Z",
 							long_options, &optindex)) != -1)
 	{
 		switch (c)
@@ -540,6 +544,9 @@ parse_psql_options(int argc, char *argv[], struct adhoc_opts *options)
 			case 'p':
 				options->port = pg_strdup(optarg);
 				break;
+			case 'Z':
+			    options->compression = true;
+				break;
 			case 'P':
 				{
 					char	   *value;
diff --git a/src/bin/scripts/pg_isready.c b/src/bin/scripts/pg_isready.c
index f7ad7b4..5d035cd 100644
--- a/src/bin/scripts/pg_isready.c
+++ b/src/bin/scripts/pg_isready.c
@@ -34,7 +34,7 @@ main(int argc, char **argv)
 	const char *pghostaddr_str = NULL;
 	const char *pgport_str = NULL;
 
-#define PARAMS_ARRAY_SIZE	7
+#define PARAMS_ARRAY_SIZE	8
 
 	const char *keywords[PARAMS_ARRAY_SIZE];
 	const char *values[PARAMS_ARRAY_SIZE];
diff --git a/src/common/Makefile b/src/common/Makefile
index 80e78d7..1548196 100644
--- a/src/common/Makefile
+++ b/src/common/Makefile
@@ -43,7 +43,7 @@ override CPPFLAGS += -DVAL_LIBS="\"$(LIBS)\""
 OBJS_COMMON = base64.o config_info.o controldata_utils.o exec.o ip.o \
 	keywords.o md5.o pg_lzcompress.o pgfnames.o psprintf.o relpath.o \
 	rmtree.o saslprep.o scram-common.o string.o unicode_norm.o \
-	username.o wait_error.o
+	username.o wait_error.o zpq_stream.o
 
 ifeq ($(with_openssl),yes)
 OBJS_COMMON += sha2_openssl.o
diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h
index 7698cd1..5203f2d 100644
--- a/src/include/libpq/libpq-be.h
+++ b/src/include/libpq/libpq-be.h
@@ -182,6 +182,8 @@ typedef struct Port
 	char	   *peer_cn;
 	bool		peer_cert_valid;
 
+	bool        use_compression;
+	
 	/*
 	 * OpenSSL structures. (Keep these last so that the locations of other
 	 * fields are the same whether or not you build with OpenSSL.)
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 997947b..4ae548a 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -61,6 +61,7 @@ extern void StreamClose(pgsocket sock);
 extern void TouchSocketFiles(void);
 extern void RemoveSocketFiles(void);
 extern void pq_init(void);
+extern int  pq_configure(Port* port);
 extern int	pq_getbytes(char *s, size_t len);
 extern int	pq_getstring(StringInfo s);
 extern void pq_startmsgread(void);
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index 625cd21..305a53b 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -365,6 +365,9 @@
 /* Define to 1 if you have the `z' library (-lz). */
 #undef HAVE_LIBZ
 
+/* Define to 1 if you have the `zstd' library (-lzstd). */
+#undef HAVE_LIBZSTD
+
 /* Define to 1 if the system has the type `locale_t'. */
 #undef HAVE_LOCALE_T
 
diff --git a/src/interfaces/libpq/Makefile b/src/interfaces/libpq/Makefile
index abe0a50..e14c1f7 100644
--- a/src/interfaces/libpq/Makefile
+++ b/src/interfaces/libpq/Makefile
@@ -29,6 +29,10 @@ endif
 # platforms require special flags.
 LIBS := $(LIBS:-lpgport=)
 
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+endif
+
 # We can't use Makefile variables here because the MSVC build system scrapes
 # OBJS from this file.
 OBJS=	fe-auth.o fe-auth-scram.o fe-connect.o fe-exec.o fe-misc.o fe-print.o fe-lobj.o \
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 39c1999..67462eb 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -72,6 +72,7 @@ static int ldapServiceLookup(const char *purl, PQconninfoOption *options,
 
 #include "common/ip.h"
 #include "common/scram-common.h"
+#include "common/zpq_stream.h"
 #include "mb/pg_wchar.h"
 #include "port/pg_bswap.h"
 
@@ -269,6 +270,10 @@ static const internalPQconninfoOption PQconninfoOptions[] = {
 		21,	/* sizeof("tls-server-end-point") == 21 */
 	offsetof(struct pg_conn, scram_channel_binding)},
 
+	{"compression", "COMPRESSION", "0", NULL,
+		"ZSTD-Compression", "", 1,
+	offsetof(struct pg_conn, compression)},
+
 	/*
 	 * ssl options are allowed even without client SSL support because the
 	 * client can still handle SSL modes "disable" and "allow". Other
@@ -325,6 +330,10 @@ static const internalPQconninfoOption PQconninfoOptions[] = {
 		"Replication", "D", 5,
 	offsetof(struct pg_conn, replication)},
 
+	{"compression", NULL, NULL, NULL,
+		"Compression", "Z", 5,
+	offsetof(struct pg_conn, compression)},
+
 	{"target_session_attrs", "PGTARGETSESSIONATTRS",
 		DefaultTargetSessionAttrs, NULL,
 		"Target-Session-Attrs", "", 11, /* sizeof("read-write") = 11 */
@@ -430,6 +439,10 @@ pgthreadlock_t pg_g_threadlock = default_threadlock;
 void
 pqDropConnection(PGconn *conn, bool flushInput)
 {
+	/* Release compression streams */
+	zpq_free(conn->zstream);
+	conn->zstream = NULL;
+
 	/* Drop any SSL state */
 	pqsecure_close(conn);
 
@@ -2648,11 +2661,23 @@ keep_going:						/* We will come back to here until there is
 				 */
 				conn->inCursor = conn->inStart;
 
-				/* Read type byte */
-				if (pqGetc(&beresp, conn))
+				while (1)
 				{
-					/* We'll come back when there is more data */
-					return PGRES_POLLING_READING;
+					/* Read type byte */
+					if (pqGetc(&beresp, conn))
+					{
+						/* We'll come back when there is more data */
+						return PGRES_POLLING_READING;
+					}
+
+					if (beresp == 'z') /* Switch on compression */
+					{
+						/* mark byte consumed */
+						conn->inStart = conn->inCursor;
+						Assert(!conn->zstream);
+						conn->zstream = zpq_create((zpq_tx_func)pqsecure_write, (zpq_rx_func)pqsecure_read, conn);
+					} else
+						break;
 				}
 
 				/*
@@ -3462,6 +3487,8 @@ freePGconn(PGconn *conn)
 		free(conn->dbName);
 	if (conn->replication)
 		free(conn->replication);
+	if (conn->compression)
+		free(conn->compression);
 	if (conn->pguser)
 		free(conn->pguser);
 	if (conn->pgpass)
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index 2a6637f..488f8d2 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -53,11 +53,12 @@
 #include "port/pg_bswap.h"
 #include "pg_config_paths.h"
 
+#include  <common/zpq_stream.h>
 
 static int	pqPutMsgBytes(const void *buf, size_t len, PGconn *conn);
 static int	pqSendSome(PGconn *conn, int len);
-static int pqSocketCheck(PGconn *conn, int forRead, int forWrite,
-			  time_t end_time);
+static int  pqSocketCheck(PGconn *conn, int forRead, int forWrite,
+						  time_t end_time);
 static int	pqSocketPoll(int sock, int forRead, int forWrite, time_t end_time);
 
 /*
@@ -630,6 +631,7 @@ pqReadData(PGconn *conn)
 {
 	int			someread = 0;
 	int			nread;
+	size_t      processed;
 
 	if (conn->sock == PGINVALID_SOCKET)
 	{
@@ -678,10 +680,23 @@ pqReadData(PGconn *conn)
 
 	/* OK, try to read some data */
 retry3:
-	nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
-						  conn->inBufSize - conn->inEnd);
+	processed = 0;
+	nread = conn->zstream
+		? zpq_read(conn->zstream, conn->inBuffer + conn->inEnd,
+				   conn->inBufSize - conn->inEnd, &processed)
+		: pqsecure_read(conn, conn->inBuffer + conn->inEnd,
+						conn->inBufSize - conn->inEnd);
+	conn->inEnd += processed;
 	if (nread < 0)
 	{
+		if (nread == ZPQ_DECOMPRESS_ERROR)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("decompress error: %s\n"),
+							  zpq_error(conn->zstream));
+			return -1;
+		}
+
 		if (SOCK_ERRNO == EINTR)
 			goto retry3;
 		/* Some systems return EAGAIN/EWOULDBLOCK for no data */
@@ -768,10 +783,24 @@ retry3:
 	 * arrived.
 	 */
 retry4:
-	nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
-						  conn->inBufSize - conn->inEnd);
+	processed = 0;
+	nread = conn->zstream
+		? zpq_read(conn->zstream, conn->inBuffer + conn->inEnd,
+				   conn->inBufSize - conn->inEnd, &processed)
+		: pqsecure_read(conn, conn->inBuffer + conn->inEnd,
+						conn->inBufSize - conn->inEnd);
+	conn->inEnd += processed;
+
 	if (nread < 0)
 	{
+		if (nread == ZPQ_DECOMPRESS_ERROR)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("decompress error: %s\n"),
+							  zpq_error(conn->zstream));
+			return -1;
+		}
+
 		if (SOCK_ERRNO == EINTR)
 			goto retry4;
 		/* Some systems return EAGAIN/EWOULDBLOCK for no data */
@@ -842,12 +871,14 @@ pqSendSome(PGconn *conn, int len)
 	}
 
 	/* while there's still data to send */
-	while (len > 0)
+	while (len > 0 || zpq_buffered(conn->zstream))
 	{
 		int			sent;
-
+		size_t      processed = 0;
+		sent = conn->zstream
+			? zpq_write(conn->zstream, ptr, len, &processed)
 #ifndef WIN32
-		sent = pqsecure_write(conn, ptr, len);
+			: pqsecure_write(conn, ptr, len);
 #else
 
 		/*
@@ -855,8 +886,11 @@ pqSendSome(PGconn *conn, int len)
 		 * failure-point appears to be different in different versions of
 		 * Windows, but 64k should always be safe.
 		 */
-		sent = pqsecure_write(conn, ptr, Min(len, 65536));
+			: pqsecure_write(conn, ptr, Min(len, 65536));
 #endif
+		ptr += processed;
+		len -= processed;
+		remaining -= processed;
 
 		if (sent < 0)
 		{
@@ -896,7 +930,7 @@ pqSendSome(PGconn *conn, int len)
 			remaining -= sent;
 		}
 
-		if (len > 0)
+		if (len > 0 || sent < 0 || zpq_buffered(conn->zstream))
 		{
 			/*
 			 * We didn't send it all, wait till we can send more.
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index d3ca5d2..fe82457 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -2171,6 +2171,8 @@ build_startup_packet(const PGconn *conn, char *packet,
 		ADD_STARTUP_OPTION("database", conn->dbName);
 	if (conn->replication && conn->replication[0])
 		ADD_STARTUP_OPTION("replication", conn->replication);
+	if (conn->compression && conn->compression[0])
+		ADD_STARTUP_OPTION("compression", conn->compression);
 	if (conn->pgoptions && conn->pgoptions[0])
 		ADD_STARTUP_OPTION("options", conn->pgoptions);
 	if (conn->send_appname)
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index eba23dc..c9bf84c 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -40,6 +40,7 @@
 /* include stuff common to fe and be */
 #include "getaddrinfo.h"
 #include "libpq/pqcomm.h"
+#include "common/zpq_stream.h"
 /* include stuff found in fe only */
 #include "pqexpbuffer.h"
 
@@ -357,6 +358,7 @@ struct pg_conn
 	char	   *sslrootcert;	/* root certificate filename */
 	char	   *sslcrl;			/* certificate revocation list filename */
 	char	   *requirepeer;	/* required peer credentials for local sockets */
+	char	   *compression;    /* stream compression (0 or 1) */
 
 #if defined(ENABLE_GSS) || defined(ENABLE_SSPI)
 	char	   *krbsrvname;		/* Kerberos service name */
@@ -495,6 +497,9 @@ struct pg_conn
 
 	/* Buffer for receiving various parts of messages */
 	PQExpBufferData workBuffer; /* expansible string */
+
+	/* Compression stream */
+	ZpqStream* zstream;
 };
 
 /* PGcancel stores all data necessary to cancel a connection. A copy of this
