Sketch of a Hook into the Logging Collector
Hello all,
I have some needs that seem to justify changing Postgres slightly, to
give user programs much more power over how logging output is
processed. Neither the log collector nor the syslog output can satisfy
those needs well as-is.
I am approaching this from the angle of increasing power by exposing
the log collector ("syslogger") pipe protocol.
Included is a patch whose general aesthetic is to touch as little of
the logging code as possible while putting as much power as possible
into a contrib module and a stand-alone sample program. That said, a
more satisfying treatment, even one that is slightly more invasive to
the logging system, would be very welcome.
The general idea here is that the logging collector pipe protocol is
reasonable enough for non-Postgres programs to consume easily, as-is.
The proof of concept consists of three things:
* A hook into the log collector. It is small but a little weird
compared to the other hooks in the system.
* A contrib, pg_logcollectdup. This contrib lets one forward logs to
a named pipe specified in postgresql.conf.
* A stand-alone program, pg_logcollect_sample, that renders protocol
traffic as newline-separated records, with its binary fields converted
to readable text.
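For readers unfamiliar with the pipe protocol, the framing these three pieces rely on can be sketched in a few lines of C. This is a minimal sketch, assuming the header layout documented in syslogger.h and copied into pg_logcollect_sample.c (two NUL bytes, a 16-bit payload length, a 32-bit writer PID, and one t/f/T/F flag byte, nine bytes in all) and native byte order; encode_chunk and decode_chunk are hypothetical names, not part of the patch.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Header layout per syslogger.h: nuls[2], len (uint16), pid (int32), is_last */
#define HDR_SIZE 9

/*
 * Hypothetical helper: frame one payload fragment into out[], which must have
 * room for HDR_SIZE + len bytes. 't'/'T' marks the last fragment of a message
 * and 'f'/'F' one with a continuation to come; lower case is a plain-text
 * record, upper case a CSV record. Fields are copied in native byte order,
 * since writer and log collector share one machine. Returns the chunk length.
 */
static size_t
encode_chunk(char *out, const char *payload, uint16_t len, int32_t pid,
             char flag)
{
    out[0] = '\0';
    out[1] = '\0';
    memcpy(out + 2, &len, sizeof len);
    memcpy(out + 4, &pid, sizeof pid);
    out[8] = flag;
    memcpy(out + HDR_SIZE, payload, len);
    return HDR_SIZE + (size_t) len;
}

/*
 * Hypothetical helper: inspect the first avail bytes of buf. Returns the
 * chunk length if a complete, valid chunk starts there (filling in the header
 * fields), 0 if more bytes are needed, and -1 if buf does not start with a
 * protocol header. memcpy avoids unaligned loads of len and pid. A real
 * reader should also reject len > PIPE_MAX_PAYLOAD.
 */
static int
decode_chunk(const char *buf, size_t avail, uint16_t *len, int32_t *pid,
             char *flag)
{
    if (avail < HDR_SIZE)
        return 0;
    memcpy(len, buf + 2, sizeof *len);
    memcpy(pid, buf + 4, sizeof *pid);
    *flag = buf[8];
    if (buf[0] != '\0' || buf[1] != '\0' || *len == 0 || *pid == 0 ||
        (*flag != 't' && *flag != 'f' && *flag != 'T' && *flag != 'F'))
        return -1;
    if (avail < HDR_SIZE + (size_t) *len)
        return 0;
    return (int) (HDR_SIZE + *len);
}
```

A consumer would call decode_chunk() repeatedly on its read buffer, advancing past each complete chunk and treating a -1 result as non-protocol data, which is the same loop pg_logcollect_sample.c runs.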
The patch is also available from https://github.com/fdr/postgres.git
in the branch 'logdup'. I rebase that branch at will for the time being.
I have a few specific dissatisfactions with this approach, but I'd
rather hear yours. I also don't like the name.
A demo of configuring all of the above and seeing some output follows.
I can paste this in one go on my machine. It creates /tmp/pg-logdup
and /tmp/pgdata-logdup. All spawned processes can be listed
afterwards with 'jobs'.
# Install postgres and pg_logcollectdup contrib
./configure --prefix=/tmp/pg-logdup
make -sj16 install
cd contrib/pg_logcollectdup/
make -s install
/tmp/pg-logdup/bin/initdb -D /tmp/pgdata-logdup
# Configure postgresql.conf
echo "logging_collector = on" >> \
/tmp/pgdata-logdup/postgresql.conf
echo "shared_preload_libraries = 'pg_logcollectdup'" >> \
/tmp/pgdata-logdup/postgresql.conf
echo "logcollectdup.destination = '/tmp/pgdata-logdup/log-pipe'" >> \
/tmp/pgdata-logdup/postgresql.conf
# Set up destination pipe
mkfifo /tmp/pgdata-logdup/log-pipe
# Build sample pipe formatter
make pg_logcollect_sample
# Run it in the background
./pg_logcollect_sample \
< /tmp/pgdata-logdup/log-pipe \
> /tmp/pgdata-logdup/duplogs.txt &
# Run Postgres with a non-default port for convenience
/tmp/pg-logdup/bin/postgres -D /tmp/pgdata-logdup --port=2345 &
# Prevent race against file creation, then look at the logs
sleep 1
cat /tmp/pgdata-logdup/duplogs.txt
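pg_logcollect_sample deliberately makes no effort to defragment messages, so a long message shows up as several 'f' lines followed by a 't' line, possibly interleaved with other backends' chunks. A consumer that wants whole messages has to buffer fragments per writer PID until it sees the terminating chunk, much as the log collector itself does in syslogger.c. The following is a minimal sketch under stated assumptions: chunks have already been decoded into header fields and payload, at most MAX_PENDING writers are mid-message at once, and feed_chunk, emit_fn, and Pending are hypothetical names, not part of the patch.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical bound on concurrently fragmenting writers, for brevity */
#define MAX_PENDING 16

typedef struct
{
    int32_t pid;    /* writer PID; 0 marks a free slot */
    char   *data;   /* fragment bytes accumulated so far */
    size_t  len;
} Pending;

static Pending pending[MAX_PENDING];

/* Callback receiving each reassembled message (not NUL-terminated) */
typedef void (*emit_fn) (int32_t pid, const char *msg, size_t len);

/*
 * Feed one decoded chunk. 'f'/'F' fragments are appended to the writer's
 * pending buffer; a 't'/'T' fragment completes the message, which is passed
 * to emit() and then discarded. Returns 0 on success, or -1 if the table is
 * full or memory runs out (already-buffered data is kept in that case).
 */
static int
feed_chunk(int32_t pid, char flag, const char *payload, size_t len,
           emit_fn emit)
{
    Pending *slot = NULL;
    Pending *free_slot = NULL;
    int      last = (flag == 't' || flag == 'T');
    char    *grown;
    int      i;

    for (i = 0; i < MAX_PENDING; i++)
    {
        if (pending[i].pid == pid)
        {
            slot = &pending[i];
            break;
        }
        if (pending[i].pid == 0 && free_slot == NULL)
            free_slot = &pending[i];
    }

    /* Fast path: a one-chunk message with nothing pending for this PID */
    if (slot == NULL && last)
    {
        emit(pid, payload, len);
        return 0;
    }

    if (slot == NULL && free_slot == NULL)
        return -1;

    grown = realloc(slot ? slot->data : NULL, (slot ? slot->len : 0) + len);
    if (grown == NULL)
        return -1;

    if (slot == NULL)
    {
        slot = free_slot;
        slot->pid = pid;
        slot->len = 0;
    }
    memcpy(grown + slot->len, payload, len);
    slot->data = grown;
    slot->len += len;

    if (last)
    {
        emit(pid, slot->data, slot->len);
        free(slot->data);
        slot->pid = 0;
        slot->data = NULL;
        slot->len = 0;
    }
    return 0;
}
```

The fixed-size table keeps the sketch short; a long-running consumer would more likely grow it dynamically and also discard buffers for PIDs that exit mid-message.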
--
fdr
Attachments:
log-collector-extension-v1.patch (application/octet-stream)
*** a/contrib/Makefile
--- b/contrib/Makefile
***************
*** 32,37 **** SUBDIRS = \
--- 32,38 ----
pg_archivecleanup \
pg_buffercache \
pg_freespacemap \
+ pg_logcollectdup \
pg_standby \
pg_stat_statements \
pg_test_fsync \
*** /dev/null
--- b/contrib/pg_logcollectdup/Makefile
***************
*** 0 ****
--- 1,15 ----
+ MODULE_big = pg_logcollectdup
+ OBJS = pg_logcollectdup.o
+
+ EXTENSION = pg_logcollectdup
+
+ ifdef USE_PGXS
+ PG_CONFIG = pg_config
+ PGXS := $(shell $(PG_CONFIG) --pgxs)
+ include $(PGXS)
+ else
+ subdir = contrib/pg_logcollectdup
+ top_builddir = ../..
+ include $(top_builddir)/src/Makefile.global
+ include $(top_srcdir)/contrib/contrib-global.mk
+ endif
*** /dev/null
--- b/contrib/pg_logcollectdup/pg_logcollect_sample.c
***************
*** 0 ****
--- 1,207 ----
+ /*
+ * pg_logcollect_sample.c
+ *
+ * Implements a stand-alone program that reads log collector (aka syslogger)
+ * pipe traffic and prints it as readable character sequences (NUL bytes
+ * and binary numbers converted), without making any effort to defragment it.
+ * It's mostly intended to be a demonstration or sample, although it may be
+ * useful in its own right.
+ *
+ * Notably, this program does not link against Postgres at all.
+ */
+ #include <errno.h>
+ #include <limits.h>
+ #include <stdio.h>
+ #include <string.h>
+ #include <unistd.h>
+
+ /* Taken from postgres/src/include/c.h */
+ typedef unsigned short uint16;
+ typedef signed int int32;
+
+ /* PIPE_CHUNK_SIZE definition taken from syslogger.h */
+
+ /*
+ * Primitive protocol structure for writing to syslogger pipe(s). The idea
+ * here is to divide long messages into chunks that are not more than
+ * PIPE_BUF bytes long, which according to POSIX spec must be written into
+ * the pipe atomically. The pipe reader then uses the protocol headers to
+ * reassemble the parts of a message into a single string. The reader can
+ * also cope with non-protocol data coming down the pipe, though we cannot
+ * guarantee long strings won't get split apart.
+ *
+ * We use non-nul bytes in is_last to make the protocol a tiny bit
+ * more robust against finding a false double nul byte prologue. But
+ * we still might find it in the len and/or pid bytes unless we're careful.
+ */
+
+ #ifdef PIPE_BUF
+ /* Are there any systems with PIPE_BUF > 64K? Unlikely, but ... */
+ #if PIPE_BUF > 65536
+ #define PIPE_CHUNK_SIZE 65536
+ #else
+ #define PIPE_CHUNK_SIZE ((int) PIPE_BUF)
+ #endif
+ #else /* not defined */
+ /* POSIX says the value of PIPE_BUF must be at least 512, so use that */
+ #define PIPE_CHUNK_SIZE 512
+ #endif
+
+
+ /* End PIPE_CHUNK_SIZE define */
+
+ /* Constants defined by both the protocol and the system's PIPE_BUF */
+ #define PIPE_HEADER_SIZE (9)
+ #define PIPE_MAX_PAYLOAD ((PIPE_CHUNK_SIZE - PIPE_HEADER_SIZE))
+
+ static ssize_t safe_read(int fd, void *buf, size_t count);
+ static void printInput(char *logbuffer, int *bytes_in_logbuffer);
+
+ /*
+ * read, but retry as long as one receives EINTR.
+ */
+ ssize_t
+ safe_read(int fd, void *buf, size_t count)
+ {
+ const int save_errno = errno;
+ ssize_t numRead;
+
+ readAgain:
+ errno = 0;
+ numRead = read(fd, buf, count);
+
+ if (numRead < 0 && errno == EINTR)
+ goto readAgain;
+
+ /*
+ * If read() succeeds, then restore the old errno to avoid clearing errors
+ * on behalf of the caller. If it fails, then leave errno alone, since
+ * that's what read normally does anyway.
+ */
+ if (errno == 0)
+ errno = save_errno;
+
+ return numRead;
+ }
+
+ /*
+ * Print input data, using code taken from syslogger.c but stripped of most of
+ * its more interesting functionality except stepping through the input buffer
+ * and printing it in a more palatable human-readable format.
+ */
+ static void
+ printInput(char *logBuf, int *logBufLen)
+ {
+ char *cursor = logBuf;
+
+ /* While there is enough data for a header, process it */
+ while (*logBufLen >= PIPE_HEADER_SIZE)
+ {
+ char *nuls = cursor;
+ uint16 *len = (void *) (cursor + 2);
+ int32 *pid = (void *) (cursor + 4);
+ char *fmt = cursor + 8;
+ char *payload = cursor + 9;
+
+ /*
+ * Sometimes non-protocol traffic (e.g. from libraries that write directly
+ * to stderr) ends up piped out of a Postgres process. Here, detect the
+ * Postgres format as obeyed by ereport/elog and handle if possible.
+ * The header looks like this:
+ *
+ * [NUL] [NUL] [DATALEN]*2 [PID Integer Fragment]*4 [t|T|f|F]
+ *
+ * The last byte deserves more explanation:
+ *
+ * * If capitalized, this is a CSV formatted record. If lower case,
+ * the log record respects the user-specified or default
+ * formatting.
+ *
+ * * If 't' or 'T', then this is the last fragment (termination) for
+ * a log message.
+ *
+ * * If 'f' or 'F', then this is a fragment that has a continuation
+ * yet to come.
+ *
+ * This also means a protocol-abiding chunk is at least ten bytes
+ * long (the nine-byte header plus a non-empty payload) and at most
+ * PIPE_CHUNK_SIZE, since protocol traffic relies on the atomicity of
+ * pipe writes no larger than PIPE_BUF.
+ */
+ if (nuls[0] == '\0' && nuls[1] == '\0' &&
+ *len > 0 && *len <= PIPE_MAX_PAYLOAD &&
+ *pid != 0 &&
+ (*fmt == 't' || *fmt == 'f' ||
+ *fmt == 'T' || *fmt == 'F'))
+ {
+ const int chunkLen = PIPE_HEADER_SIZE + *len;
+ if (*logBufLen < chunkLen)
+ break; /* incomplete chunk: wait for more input */
+ printf("LEN=%d PID=%d FMT=%c %.*s\n", *len, *pid, *fmt,
+ (int) *len, payload);
+ /* Finished processing this chunk */
+ cursor += chunkLen;
+ *logBufLen -= chunkLen;
+ }
+ else
+ {
+ int protoCur;
+
+ /*
+ * Process non-protocol data, but just in case, look for the
+ * beginning of some protocol traffic and re-start the formatting
+ * routine if that happens.
+ *
+ * It is expected that in many scenarios, a non-protocol message
+ * will arrive all in one read(), and we want to respect the read()
+ * boundary if possible.
+ *
+ * NB: Skip looking at the first byte, because the previous branch
+ * would have already spotted a valid chunk that is aligned
+ * properly, and not found inside some arbitrary data.
+ */
+ for (protoCur = 1; protoCur < *logBufLen; protoCur += 1)
+ {
+ if (cursor[protoCur] == '\0')
+ break;
+ }
+
+ printf("LEN= PID= FMT= %.*s\n", protoCur, cursor);
+
+ cursor += protoCur;
+ *logBufLen -= protoCur;
+ }
+ }
+
+ /* Don't have a full chunk, so left-align what remains in the buffer */
+ if (*logBufLen > 0 && cursor != logBuf)
+ memmove(logBuf, cursor, *logBufLen);
+ }
+
+ int
+ main(void)
+ {
+ char buf[2 * PIPE_CHUNK_SIZE];
+ int bufLen = 0;
+
+ while (1)
+ {
+ ssize_t numRead;
+ /* Append after any partial chunk left over from the previous pass */
+ numRead = safe_read(0, buf + bufLen, sizeof buf - bufLen);
+
+ /* Handle EOF */
+ if (numRead == 0)
+ return 0;
+
+ /* Exit if read doesn't work for whatever reason */
+ if (numRead < 0)
+ return 1;
+
+ bufLen += numRead;
+
+ printInput(buf, &bufLen);
+ fflush(stdout);
+
+ }
+ }
*** /dev/null
--- b/contrib/pg_logcollectdup/pg_logcollectdup.c
***************
*** 0 ****
--- 1,263 ----
+ /*
+ * pg_logcollectdup.c
+ *
+ * Implements a module, loaded via shared_preload_libraries, that forwards a
+ * copy of the log collector ("syslogger") pipe protocol traffic to a file
+ * system path of one's choice, provided "logcollectdup.destination" is set in
+ * postgresql.conf and the log collector is enabled. The path is most useful
+ * as a named pipe created with mkfifo, so that a completely separate program
+ * can handle the protocol traffic.
+ */
+
+ #include "postgres.h"
+
+ #include <fcntl.h>
+ #include <sys/stat.h>
+ #include <unistd.h>
+
+ #include "postmaster/syslogger.h"
+ #include "utils/guc.h"
+
+ PG_MODULE_MAGIC;
+
+ /* GUC-configured destination of the log pages */
+ static char *destination;
+
+ static ProcessLogCollect_hook_type prev_ProcessLogCollect = NULL;
+
+ static void openDestFd(char *dest, int *currentFd);
+ static void closeDestFd(int *currentFd);
+ static void logcollectdup_ProcessLogCollect(char *buf, int len);
+ static void call_ProcessLogCollect(char *buf, int len);
+
+ /*
+ * File descriptor that log collector traffic is written to. It is reset
+ * if a write fails.
+ */
+ static int currentFd = -1;
+
+ void _PG_init(void);
+ void _PG_fini(void);
+
+ /*
+ * _PG_init() - library load-time initialization
+ *
+ * DO NOT make this static nor change its name!
+ *
+ * Init the module: define our GUC and install the hook.
+ */
+ void
+ _PG_init(void)
+ {
+ /*
+ * Define the GUC unconditionally. If postgresql.conf already sets
+ * logcollectdup.destination, DefineCustomStringVariable() adopts the
+ * placeholder value that was created while the file was read, and a
+ * configuration reload (SIGHUP) in the log collector keeps
+ * "destination" current.
+ */
+ DefineCustomStringVariable("logcollectdup.destination",
+ "Path to send log collector bytes to.",
+ "",
+ &destination,
+ "",
+ PGC_SUSET,
+ GUC_NOT_IN_SAMPLE,
+ NULL,
+ NULL,
+ NULL);
+ /* Warn about any unrecognized logcollectdup.* settings */
+ EmitWarningsOnPlaceholders("logcollectdup");
+
+ prev_ProcessLogCollect = ProcessLogCollect_hook;
+ ProcessLogCollect_hook = logcollectdup_ProcessLogCollect;
+ }
+
+ /*
+ * Open the destination file descriptor for writing, refusing to return until
+ * there is success, writing the fd value into *fd.
+ *
+ * That may sound extreme, but considering that the log collector would also
+ * cause logging processes to block were it to halt or close and the whole
+ * point of this module is to allow some other process to obtain a copy of the
+ * protocol traffic, it seems reasonable.
+ */
+ static void
+ openDestFd(char *dest, int *fd)
+ {
+ const int save_errno = errno;
+ struct stat st;
+
+ /* Spin until the pipe can be opened */
+ while (dest != NULL && *fd < 0)
+ {
+ *fd = open(dest, O_WRONLY);
+
+ /* Test the return value: errno alone is not a reliable failure signal */
+ if (*fd < 0)
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("pg_logcollectdup cannot open destination"),
+ errdetail("The open() request failed with "
+ "the message: %s.", strerror(errno))));
+
+ /*
+ * Wait a while to not spin too aggressively while things are
+ * messed up.
+ */
+ sleep(1);
+ }
+
+ /* Must have a valid file descriptor here */
+ Assert(*fd >= 0);
+
+ /*
+ * Check if the opened file descriptor is a pipe. If it isn't, or if
+ * fstat() fails so that we cannot tell, whine, close, and invalidate
+ * the file descriptor.
+ */
+ if (fstat(*fd, &st) < 0 ||
+ !S_ISFIFO(st.st_mode))
+ {
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("pg_logcollectdup only supports writing into "
+ "named pipes")));
+ closeDestFd(fd);
+ }
+
+ errno = save_errno;
+ }
+
+ /*
+ * Close the passed file descriptor and invalidate it.
+ */
+ static void
+ closeDestFd(int *fd)
+ {
+ const int save_errno = errno;
+
+ do
+ {
+ errno = 0;
+
+ /*
+ * Ignore errors except EINTR: other than EINTR, there is no
+ * obvious handling one can do from a failed close() that matters
+ * in this case.
+ */
+ close(*fd);
+
+ if (errno == EINTR)
+ continue;
+
+ if (errno == EBADF)
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("pg_logcollectdup attempted to close an "
+ "invalid file descriptor"),
+ errdetail("The file descriptor that failed a close "
+ "attempt was %d, and it failed with "
+ "the reason: %s.",
+ *fd, strerror(errno))));
+
+ /* Exit the EINTR retry loop */
+ *fd = -1;
+ } while (*fd >= 0);
+
+ errno = save_errno;
+ }
+
+ static void
+ logcollectdup_ProcessLogCollect(char *buf, int len)
+ {
+ int save_errno = errno;
+ int bytesWritten;
+
+ do
+ {
+ if ((destination == NULL || destination[0] == '\0') && currentFd < 0)
+ {
+ /*
+ * No destination defined (the GUC defaults to an empty string) and no
+ * file descriptor open: the extension was loaded but not configured.
+ */
+ goto exit;
+ }
+ else if (currentFd < 0)
+ {
+ /*
+ * Destination defined, but no file descriptor open yet. Open the
+ * file descriptor very insistently; when this returns it must be
+ * open, which also means backends that need to log *will block*
+ * until this succeeds.
+ */
+ openDestFd(destination, &currentFd);
+ }
+ else if (destination == NULL || destination[0] == '\0')
+ {
+ /*
+ * Destination undefined, but a file descriptor is still open.
+ * This can be the result of a SIGHUP/configuration change, so
+ * close and invalidate the file descriptor.
+ *
+ * Invalidates currentFd, continuing the retry loop.
+ */
+ closeDestFd(&currentFd);
+ }
+ } while (currentFd < 0);
+
+ /*
+ * Write the whole buffer. A read from the syslogger pipe can return more
+ * than PIPE_BUF bytes, so a write to our pipe is not guaranteed to be
+ * atomic: a signal can interrupt it after a partial transfer. Loop
+ * until every byte is written, retrying on EINTR and reopening the
+ * destination on any other failure.
+ */
+ bytesWritten = 0;
+ while (bytesWritten < len)
+ {
+ ssize_t rc;
+
+ errno = 0;
+ rc = write(currentFd, buf + bytesWritten, len - bytesWritten);
+ if (rc >= 0)
+ {
+ bytesWritten += rc;
+ continue;
+ }
+ if (errno == EINTR)
+ continue;
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("pg_logcollectdup cannot write to pipe"),
+ errdetail("The write failed with the message: %s.",
+ strerror(errno))));
+ /* Toss out the old file descriptor and insistently get a new one. */
+ closeDestFd(&currentFd);
+ openDestFd(destination, &currentFd);
+ }
+
+ exit:
+ errno = save_errno;
+ call_ProcessLogCollect(buf, len);
+ }
+
+ static void
+ call_ProcessLogCollect(char *buf, int len)
+ {
+ if (prev_ProcessLogCollect != NULL)
+ prev_ProcessLogCollect(buf, len);
+ else
+ standard_ProcessLogCollect(buf, len);
+ }
+
+ /*
+ * Module unload callback
+ */
+ void
+ _PG_fini(void)
+ {
+ /* Uninstall hook */
+ ProcessLogCollect_hook = prev_ProcessLogCollect;
+ }
*** /dev/null
--- b/contrib/pg_logcollectdup/pg_logcollectdup.control
***************
*** 0 ****
--- 1,4 ----
+ comment = 'forward log collector pipe traffic to a path'
+ default_version = '1.0'
+ module_pathname = '$libdir/pg_logcollectdup'
+ relocatable = true
*** a/src/backend/postmaster/syslogger.c
--- b/src/backend/postmaster/syslogger.c
***************
*** 64,69 ****
--- 64,70 ----
*/
#define READ_BUF_SIZE (2 * PIPE_CHUNK_SIZE)
+ ProcessLogCollect_hook_type ProcessLogCollect_hook = NULL;
/*
* GUC parameters. Logging_collector cannot be changed after postmaster
***************
*** 460,465 **** SysLoggerMain(int argc, char *argv[])
--- 461,471 ----
bytesRead = read(syslogPipe[0],
logbuffer + bytes_in_logbuffer,
sizeof(logbuffer) - bytes_in_logbuffer);
+
+ if (ProcessLogCollect_hook != NULL && bytesRead > 0)
+ ProcessLogCollect_hook(logbuffer + bytes_in_logbuffer,
+ bytesRead);
+
if (bytesRead < 0)
{
if (errno != EINTR)
***************
*** 768,773 **** syslogger_parseArgs(int argc, char *argv[])
--- 774,785 ----
* --------------------------------
*/
+ void
+ standard_ProcessLogCollect(char *buf, int len)
+ {
+ /* No-op for now */
+ }
+
/*
* Process data received through the syslogger pipe.
*
*** a/src/include/postmaster/syslogger.h
--- b/src/include/postmaster/syslogger.h
***************
*** 14,19 ****
--- 14,23 ----
#include <limits.h> /* for PIPE_BUF */
+ /* Hook for plugins to see raw log collector pipe data as it is read */
+ typedef void (*ProcessLogCollect_hook_type) (char *buf, int len);
+ extern PGDLLIMPORT ProcessLogCollect_hook_type ProcessLogCollect_hook;
+ extern void standard_ProcessLogCollect(char *buf, int len);
/*
* Primitive protocol structure for writing to syslogger pipe(s). The idea
On Sat, Dec 8, 2012 at 10:40 AM, Daniel Farina <daniel@heroku.com> wrote:
> * A contrib, pg_logcollectdup. This contrib lets one forward logs to
> a named pipe specified in postgresql.conf.
I have revised this part in the attached patch. The change adds error
handling for a case of user error; the previous demo script and
narrative still apply.
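In v2, the FIFO check lives in checkFifoOrInvalidate(), which calls fstat() on the freshly opened descriptor and tests S_ISFIFO; if fstat() fails or the file is not a FIFO, the descriptor is closed and invalidated so that the open loop retries. The check itself can be sketched stand-alone. fd_is_fifo is a hypothetical name, and the sketch assumes only POSIX fstat(). Testing the already-open descriptor, rather than calling stat() on the path beforehand, leaves no window in which the path could be swapped for something else between the check and the open.

```c
#include <sys/stat.h>

/*
 * Hypothetical helper: returns 1 if fd refers to a FIFO (a named pipe or an
 * anonymous pipe), 0 if it refers to anything else, and -1 (errno set by
 * fstat) if the descriptor cannot be examined.
 */
static int
fd_is_fifo(int fd)
{
    struct stat st;

    if (fstat(fd, &st) < 0)
        return -1;
    return S_ISFIFO(st.st_mode) ? 1 : 0;
}
```

Distinguishing "cannot tell" (-1) from "not a FIFO" (0) lets a caller word its warning accurately, even though both cases lead to closing the descriptor.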
--
fdr
Attachments:
log-collector-extension-v2.patch (application/octet-stream)
*** a/contrib/Makefile
--- b/contrib/Makefile
***************
*** 32,37 **** SUBDIRS = \
--- 32,38 ----
pg_archivecleanup \
pg_buffercache \
pg_freespacemap \
+ pg_logcollectdup \
pg_standby \
pg_stat_statements \
pg_test_fsync \
*** /dev/null
--- b/contrib/pg_logcollectdup/Makefile
***************
*** 0 ****
--- 1,15 ----
+ MODULE_big = pg_logcollectdup
+ OBJS = pg_logcollectdup.o
+
+ EXTENSION = pg_logcollectdup
+
+ ifdef USE_PGXS
+ PG_CONFIG = pg_config
+ PGXS := $(shell $(PG_CONFIG) --pgxs)
+ include $(PGXS)
+ else
+ subdir = contrib/pg_logcollectdup
+ top_builddir = ../..
+ include $(top_builddir)/src/Makefile.global
+ include $(top_srcdir)/contrib/contrib-global.mk
+ endif
*** /dev/null
--- b/contrib/pg_logcollectdup/pg_logcollect_sample.c
***************
*** 0 ****
--- 1,207 ----
+ /*
+ * pg_logcollect_sample.c
+ *
+ * Implements a stand-alone program that reads log collector (aka syslogger)
+ * pipe traffic and prints it as readable character sequences (NUL bytes
+ * and binary numbers converted), without making any effort to defragment it.
+ * It's mostly intended to be a demonstration or sample, although it may be
+ * useful in its own right.
+ *
+ * Notably, this program does not link against Postgres at all.
+ */
+ #include <errno.h>
+ #include <limits.h>
+ #include <stdio.h>
+ #include <string.h>
+ #include <unistd.h>
+
+ /* Taken from postgres/src/include/c.h */
+ typedef unsigned short uint16;
+ typedef signed int int32;
+
+ /* PIPE_CHUNK_SIZE definition taken from syslogger.h */
+
+ /*
+ * Primitive protocol structure for writing to syslogger pipe(s). The idea
+ * here is to divide long messages into chunks that are not more than
+ * PIPE_BUF bytes long, which according to POSIX spec must be written into
+ * the pipe atomically. The pipe reader then uses the protocol headers to
+ * reassemble the parts of a message into a single string. The reader can
+ * also cope with non-protocol data coming down the pipe, though we cannot
+ * guarantee long strings won't get split apart.
+ *
+ * We use non-nul bytes in is_last to make the protocol a tiny bit
+ * more robust against finding a false double nul byte prologue. But
+ * we still might find it in the len and/or pid bytes unless we're careful.
+ */
+
+ #ifdef PIPE_BUF
+ /* Are there any systems with PIPE_BUF > 64K? Unlikely, but ... */
+ #if PIPE_BUF > 65536
+ #define PIPE_CHUNK_SIZE 65536
+ #else
+ #define PIPE_CHUNK_SIZE ((int) PIPE_BUF)
+ #endif
+ #else /* not defined */
+ /* POSIX says the value of PIPE_BUF must be at least 512, so use that */
+ #define PIPE_CHUNK_SIZE 512
+ #endif
+
+
+ /* End PIPE_CHUNK_SIZE define */
+
+ /* Constants defined by both the protocol and the system's PIPE_BUF */
+ #define PIPE_HEADER_SIZE (9)
+ #define PIPE_MAX_PAYLOAD ((PIPE_CHUNK_SIZE - PIPE_HEADER_SIZE))
+
+ static ssize_t safe_read(int fd, void *buf, size_t count);
+ static void printInput(char *logbuffer, int *bytes_in_logbuffer);
+
+ /*
+ * read, but retry as long as one receives EINTR.
+ */
+ ssize_t
+ safe_read(int fd, void *buf, size_t count)
+ {
+ const int save_errno = errno;
+ ssize_t numRead;
+
+ readAgain:
+ errno = 0;
+ numRead = read(fd, buf, count);
+
+ if (numRead < 0 && errno == EINTR)
+ goto readAgain;
+
+ /*
+ * If read() succeeds, then restore the old errno to avoid clearing errors
+ * on behalf of the caller. If it fails, then leave errno alone, since
+ * that's what read normally does anyway.
+ */
+ if (errno == 0)
+ errno = save_errno;
+
+ return numRead;
+ }
+
+ /*
+ * Print input data, using code taken from syslogger.c but stripped of most of
+ * its more interesting functionality except stepping through the input buffer
+ * and printing it in a more palatable human-readable format.
+ */
+ static void
+ printInput(char *logBuf, int *logBufLen)
+ {
+ char *cursor = logBuf;
+
+ /* While there is enough data for a header, process it */
+ while (*logBufLen >= PIPE_HEADER_SIZE)
+ {
+ char *nuls = cursor;
+ uint16 *len = (void *) (cursor + 2);
+ int32 *pid = (void *) (cursor + 4);
+ char *fmt = cursor + 8;
+ char *payload = cursor + 9;
+
+ /*
+ * Sometimes non-protocol traffic (e.g. from libraries that write directly
+ * to stderr) ends up piped out of a Postgres process. Here, detect the
+ * Postgres format as obeyed by ereport/elog and handle if possible.
+ * The header looks like this:
+ *
+ * [NUL] [NUL] [DATALEN]*2 [PID Integer Fragment]*4 [t|T|f|F]
+ *
+ * The last byte deserves more explanation:
+ *
+ * * If capitalized, this is a CSV formatted record. If lower case,
+ * the log record respects the user-specified or default
+ * formatting.
+ *
+ * * If 't' or 'T', then this is the last fragment (termination) for
+ * a log message.
+ *
+ * * If 'f' or 'F', then this is a fragment that has a continuation
+ * yet to come.
+ *
+ * This also means a protocol-abiding chunk is at least ten bytes
+ * long (the nine-byte header plus a non-empty payload) and at most
+ * PIPE_CHUNK_SIZE, since protocol traffic relies on the atomicity of
+ * pipe writes no larger than PIPE_BUF.
+ */
+ if (nuls[0] == '\0' && nuls[1] == '\0' &&
+ *len > 0 && *len <= PIPE_MAX_PAYLOAD &&
+ *pid != 0 &&
+ (*fmt == 't' || *fmt == 'f' ||
+ *fmt == 'T' || *fmt == 'F'))
+ {
+ const int chunkLen = PIPE_HEADER_SIZE + *len;
+ if (*logBufLen < chunkLen)
+ break; /* incomplete chunk: wait for more input */
+ printf("LEN=%d PID=%d FMT=%c %.*s\n", *len, *pid, *fmt,
+ (int) *len, payload);
+ /* Finished processing this chunk */
+ cursor += chunkLen;
+ *logBufLen -= chunkLen;
+ }
+ else
+ {
+ int protoCur;
+
+ /*
+ * Process non-protocol data, but just in case, look for the
+ * beginning of some protocol traffic and re-start the formatting
+ * routine if that happens.
+ *
+ * It is expected that in many scenarios, a non-protocol message
+ * will arrive all in one read(), and we want to respect the read()
+ * boundary if possible.
+ *
+ * NB: Skip looking at the first byte, because the previous branch
+ * would have already spotted a valid chunk that is aligned
+ * properly, and not found inside some arbitrary data.
+ */
+ for (protoCur = 1; protoCur < *logBufLen; protoCur += 1)
+ {
+ if (cursor[protoCur] == '\0')
+ break;
+ }
+
+ printf("LEN= PID= FMT= %.*s\n", protoCur, cursor);
+
+ cursor += protoCur;
+ *logBufLen -= protoCur;
+ }
+ }
+
+ /* Don't have a full chunk, so left-align what remains in the buffer */
+ if (*logBufLen > 0 && cursor != logBuf)
+ memmove(logBuf, cursor, *logBufLen);
+ }
+
+ int
+ main(void)
+ {
+ char buf[2 * PIPE_CHUNK_SIZE];
+ int bufLen = 0;
+
+ while (1)
+ {
+ ssize_t numRead;
+ /* Append after any partial chunk left over from the previous pass */
+ numRead = safe_read(0, buf + bufLen, sizeof buf - bufLen);
+
+ /* Handle EOF */
+ if (numRead == 0)
+ return 0;
+
+ /* Exit if read doesn't work for whatever reason */
+ if (numRead < 0)
+ return 1;
+
+ bufLen += numRead;
+
+ printInput(buf, &bufLen);
+ fflush(stdout);
+
+ }
+ }
*** /dev/null
--- b/contrib/pg_logcollectdup/pg_logcollectdup.c
***************
*** 0 ****
--- 1,306 ----
+ /*
+ * pg_logcollectdup.c
+ *
+ * Implements a module, loaded via shared_preload_libraries, that forwards a
+ * copy of the log collector ("syslogger") pipe protocol traffic to a file
+ * system path of one's choice, provided "logcollectdup.destination" is set in
+ * postgresql.conf and the log collector is enabled. The path is most useful
+ * as a named pipe created with mkfifo, so that a completely separate program
+ * can handle the protocol traffic.
+ */
+
+ #include "postgres.h"
+
+ #include <fcntl.h>
+ #include <sys/stat.h>
+ #include <unistd.h>
+
+ #include "postmaster/syslogger.h"
+ #include "utils/guc.h"
+
+ PG_MODULE_MAGIC;
+
+ /* GUC-configured destination of the log pages */
+ static char *destination;
+
+ static ProcessLogCollect_hook_type prev_ProcessLogCollect = NULL;
+
+ static void openDestFd(char *dest, int *currentFd);
+ static void closeDestFd(int *currentFd);
+ static void logcollectdup_ProcessLogCollect(char *buf, int len);
+ static void call_ProcessLogCollect(char *buf, int len);
+
+ /*
+ * File descriptor that log collector traffic is written to. It is reset
+ * if a write fails.
+ */
+ static int currentFd = -1;
+
+ void _PG_init(void);
+ void _PG_fini(void);
+
+ /*
+ * _PG_init() - library load-time initialization
+ *
+ * DO NOT make this static nor change its name!
+ *
+ * Init the module: define our GUC and install the hook.
+ */
+ void
+ _PG_init(void)
+ {
+ /*
+ * Define the GUC unconditionally. If postgresql.conf already sets
+ * logcollectdup.destination, DefineCustomStringVariable() adopts the
+ * placeholder value that was created while the file was read, and a
+ * configuration reload (SIGHUP) in the log collector keeps
+ * "destination" current.
+ */
+ DefineCustomStringVariable("logcollectdup.destination",
+ "Path to send log collector bytes to.",
+ "",
+ &destination,
+ "",
+ PGC_SUSET,
+ GUC_NOT_IN_SAMPLE,
+ NULL,
+ NULL,
+ NULL);
+ /* Warn about any unrecognized logcollectdup.* settings */
+ EmitWarningsOnPlaceholders("logcollectdup");
+
+ prev_ProcessLogCollect = ProcessLogCollect_hook;
+ ProcessLogCollect_hook = logcollectdup_ProcessLogCollect;
+ }
+
+ /*
+ * Checks whether the file descriptor pointed to by *fd is a fifo; if it is
+ * not, logs a problem, closes it, and invalidates it.
+ */
+ static bool
+ checkFifoOrInvalidate(int *fd)
+ {
+ const int save_errno = errno;
+ int statRes;
+ struct stat st;
+
+ /*
+ * If the file descriptor is not valid, it's definitely not a
+ * fifo, and there's no need to invalidate it, either.
+ */
+ if (*fd < 0)
+ return false;
+
+ errno = 0;
+ statRes = fstat(*fd, &st);
+
+ if (statRes < 0)
+ {
+ /*
+ * Couldn't fstat. Without confirmation that the file
+ * descriptor points to a pipe, assume it is not one.
+ */
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("pg_logcollectdup could not get information "
+ "about its output pipe"),
+ errdetail("The fstat() request failed with the message: %s.",
+ strerror(errno))));
+ goto closeFail;
+ }
+ else if (statRes == 0 && !S_ISFIFO(st.st_mode))
+ {
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("pg_logcollectdup only supports writing into "
+ "named pipes")));
+ goto closeFail;
+ }
+
+ /* Success: it's a valid fifo. */
+ Assert(statRes == 0);
+ Assert(S_ISFIFO(st.st_mode));
+ errno = save_errno;
+ return true;
+
+ closeFail:
+ closeDestFd(fd);
+ errno = save_errno;
+ return false;
+ }
+
+ /*
+ * Open the destination file descriptor for writing, refusing to return until
+ * there is success, writing the fd value into *fd.
+ *
+ * That may sound extreme, but considering that the log collector would also
+ * cause logging processes to block were it to halt or close and the whole
+ * point of this module is to allow some other process to obtain a copy of the
+ * protocol traffic, it seems reasonable.
+ */
+ static void
+ openDestFd(char *dest, int *fd)
+ {
+ const int save_errno = errno;
+
+ /* Spin until the pipe can be opened */
+ while (dest != NULL && *fd < 0)
+ {
+ *fd = open(dest, O_WRONLY);
+
+ /* Test the return value: errno alone is not a reliable failure signal */
+ if (*fd < 0)
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("pg_logcollectdup cannot open destination"),
+ errdetail("The open() request failed with "
+ "the message: %s.", strerror(errno))));
+
+ /*
+ * Even if the file descriptor was opened successfully, it
+ * might not be a pipe. If it isn't a pipe, whine, close, and
+ * invalidate the file descriptor.
+ */
+ checkFifoOrInvalidate(fd);
+ if (*fd < 0)
+ sleep(1); /* back off only while no usable pipe is open */
+ }
+
+ /* Must have a valid file descriptor here */
+ Assert(*fd >= 0);
+
+ errno = save_errno;
+ }
+
+ /*
+ * Close the passed file descriptor and invalidate it.
+ */
+ static void
+ closeDestFd(int *fd)
+ {
+ const int save_errno = errno;
+
+ do
+ {
+ errno = 0;
+
+ /*
+ * Ignore errors except EINTR: other than EINTR, there is no
+ * obvious handling one can do from a failed close() that matters
+ * in this case.
+ */
+ close(*fd);
+
+ if (errno == EINTR)
+ continue;
+
+ if (errno == EBADF)
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("pg_logcollectdup attempted to close an "
+ "invalid file descriptor"),
+ errdetail("The file descriptor that failed a close "
+ "attempt was %d, and it failed with "
+ "the reason: %s.",
+ *fd, strerror(errno))));
+
+ /* Exit the EINTR retry loop */
+ *fd = -1;
+ } while (*fd >= 0);
+
+ errno = save_errno;
+ }
+
+ static void
+ logcollectdup_ProcessLogCollect(char *buf, int len)
+ {
+ int save_errno = errno;
+ int bytesWritten;
+
+ do
+ {
+ if ((destination == NULL || destination[0] == '\0') && currentFd < 0)
+ {
+ /*
+ * No destination defined (the GUC defaults to an empty string) and no
+ * file descriptor open: the extension was loaded but not configured.
+ */
+ goto exit;
+ }
+ else if (currentFd < 0)
+ {
+ /*
+ * Destination defined, but no file descriptor open yet. Open the
+ * file descriptor very insistently; when this returns it must be
+ * open, which also means backends that need to log *will block*
+ * until this succeeds.
+ */
+ openDestFd(destination, &currentFd);
+ }
+ else if (destination == NULL || destination[0] == '\0')
+ {
+ /*
+ * Destination undefined, but a file descriptor is still open.
+ * This can be the result of a SIGHUP/configuration change, so
+ * close and invalidate the file descriptor.
+ *
+ * Invalidates currentFd, continuing the retry loop.
+ */
+ closeDestFd(&currentFd);
+ }
+ } while (currentFd < 0);
+
+ /*
+ * Write the whole buffer. A read from the syslogger pipe can return more
+ * than PIPE_BUF bytes, so a write to our pipe is not guaranteed to be
+ * atomic: a signal can interrupt it after a partial transfer. Loop
+ * until every byte is written, retrying on EINTR and reopening the
+ * destination on any other failure.
+ */
+ bytesWritten = 0;
+ while (bytesWritten < len)
+ {
+ ssize_t rc;
+
+ errno = 0;
+ rc = write(currentFd, buf + bytesWritten, len - bytesWritten);
+ if (rc >= 0)
+ {
+ bytesWritten += rc;
+ continue;
+ }
+ if (errno == EINTR)
+ continue;
+ ereport(WARNING,
+ (errcode_for_file_access(),
+ errmsg("pg_logcollectdup cannot write to pipe"),
+ errdetail("The write failed with the message: %s.",
+ strerror(errno))));
+ /* Toss out the old file descriptor and insistently get a new one. */
+ closeDestFd(&currentFd);
+ openDestFd(destination, &currentFd);
+ }
+
+ exit:
+ errno = save_errno;
+ call_ProcessLogCollect(buf, len);
+ }
+
+ static void
+ call_ProcessLogCollect(char *buf, int len)
+ {
+ if (prev_ProcessLogCollect != NULL)
+ prev_ProcessLogCollect(buf, len);
+ else
+ standard_ProcessLogCollect(buf, len);
+ }
+
+ /*
+ * Module unload callback
+ */
+ void
+ _PG_fini(void)
+ {
+ /* Uninstall hook */
+ ProcessLogCollect_hook = prev_ProcessLogCollect;
+ }
*** /dev/null
--- b/contrib/pg_logcollectdup/pg_logcollectdup.control
***************
*** 0 ****
--- 1,4 ----
+ comment = 'forward log collector pipe traffic to a path'
+ default_version = '1.0'
+ module_pathname = '$libdir/pg_logcollectdup'
+ relocatable = true
*** a/src/backend/postmaster/syslogger.c
--- b/src/backend/postmaster/syslogger.c
***************
*** 64,69 ****
--- 64,70 ----
*/
#define READ_BUF_SIZE (2 * PIPE_CHUNK_SIZE)
+ ProcessLogCollect_hook_type ProcessLogCollect_hook = NULL;
/*
* GUC parameters. Logging_collector cannot be changed after postmaster
***************
*** 460,465 **** SysLoggerMain(int argc, char *argv[])
--- 461,471 ----
bytesRead = read(syslogPipe[0],
logbuffer + bytes_in_logbuffer,
sizeof(logbuffer) - bytes_in_logbuffer);
+
+ if (ProcessLogCollect_hook != NULL && bytesRead > 0)
+ ProcessLogCollect_hook(logbuffer + bytes_in_logbuffer,
+ bytesRead);
+
if (bytesRead < 0)
{
if (errno != EINTR)
***************
*** 768,773 **** syslogger_parseArgs(int argc, char *argv[])
--- 774,785 ----
* --------------------------------
*/
+ void
+ standard_ProcessLogCollect(char *buf, int len)
+ {
+ /* No-op for now */
+ }
+
/*
* Process data received through the syslogger pipe.
*
*** a/src/include/postmaster/syslogger.h
--- b/src/include/postmaster/syslogger.h
***************
*** 14,19 ****
--- 14,23 ----
#include <limits.h> /* for PIPE_BUF */
+ /* Hook for plugins to see raw log collector pipe data as it is read */
+ typedef void (*ProcessLogCollect_hook_type) (char *buf, int len);
+ extern PGDLLIMPORT ProcessLogCollect_hook_type ProcessLogCollect_hook;
+ extern void standard_ProcessLogCollect(char *buf, int len);
/*
* Primitive protocol structure for writing to syslogger pipe(s). The idea
On Sat, Dec 8, 2012 at 10:40 AM, Daniel Farina <daniel@heroku.com> wrote:
> Hello all,
> I am approaching this from the angle of increasing power by exposing
> the log collector ("syslogger") pipe protocol.
I just spotted a better, already-committed patch. Thanks to Hannu for
pointing it out:
https://commitfest.postgresql.org/action/patch_view?id=717
I'll retract this patch, unless someone finds it interesting for some reason.
--
fdr
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers