[RFC][PATCH] Logical Replication/BDR prototype and architecture
Hi everyone,
This mail contains the high-level design description of how our prototype of
in-core logical replication works. The individual patches will be posted as
replies to this email. I obviously welcome all sorts of productive comments to
both the individual patches and the architecture.
Unless somebody objects I will add most of the individual patches marked as RFC to the
current commitfest. I hope that with comments stemming from that round we can
get several of the patches into the first or second commitfest. As soon as the
design is clear/accepted we will try very hard to get the following patches
into the second or third round.
If anybody disagrees with the procedural way we are trying to do this, please
raise a hand now.
I tried to find the right balance between keeping the description short enough
that anybody will read the design docs and verbose enough that it is
understandable. I can go into much more detail in any part if wanted.
Please keep in mind that those patches are *RFC* and a prototype and not
intended to be applied as-is. There is a short description of the individual
patches and their relevance at the end of the email.
Greetings,
Andres
========
=== Design goals for logical replication === :
- in core
- fast
- async
- robust
- multi-master
- modular
- as unintrusive as possible implementation wise
- basis for other technologies (sharding, replication into other DBMSs, ...)
For reasons why we think this is an important set of features please check out
the presentation from the in-core replication summit at pgcon:
http://wiki.postgresql.org/wiki/File:BDR_Presentation_PGCon2012.pdf
While you may argue that most of the above design goals are already provided by
various trigger based replication solutions like Londiste or Slony, we think
that that is not enough for various reasons:
- not in core (and thus less trustworthy)
- duplication of writes due to an additional log
- performance in general (check the end of the above presentation)
- complex to use because there is no native administration interface
We want to emphasize that this proposed architecture is based on the experience
of developing a minimal prototype which we developed with the above goals in
mind. While we obviously hope that a good part of it is reusable for the
community we definitely do *not* expect that the community accepts this
as-is. It is intended to be the basis upon which we, the community, can build
and design the future logical replication.
=== Basic architecture === :
Very broadly speaking there are several major pieces common to most approaches
to replication:
1. Source data generation
2. Transportation of that data
3. Applying the changes
4. Conflict resolution
1.:
We need a change stream that contains all required changes in the correct
order. Because that stream has to reflect changes made by multiple concurrent
backends, generating it needs synchronization, which raises obvious concurrency
and scalability issues. Reusing the WAL stream for this seems a good choice
since it is needed anyway and addresses those issues already. It further means
that we do not duplicate the writes and the locking already performed for its
maintenance. Any other stream generating component would introduce additional
scalability issues.
Unfortunately, in this case, the WAL is mostly a physical representation of the
changes and thus does not, by itself, contain the necessary information in a
convenient format to create logical changesets.
The biggest problem is that interpreting tuples in the WAL stream requires an
up-to-date system catalog and needs to be done in a compatible backend and
architecture. The requirement of an up-to-date catalog could be solved by
adding more data to the WAL stream but it seems to be likely that that would
require relatively intrusive & complex changes. Instead we chose to require a
synchronized catalog at the decoding site. That adds some complexity to use
cases like replicating into a different database or cross-version
replication. For those it is relatively straight-forward to develop a proxy pg
instance that only contains the catalog and does the transformation to textual
changes.
This also is the solution to the other big problem, the need to work around
architecture/version specific binary formats. The alternative, producing
cross-version, cross-architecture compatible binary changes or, even more so,
textual changes all the time, seems to be prohibitively expensive, both from a
CPU and a storage point of view and in terms of implementation effort.
The catalog on the site where changes originate can *not* be used for the
decoding because at the time we decode the WAL the catalog may have changed
from the state it was in when the WAL was generated. A possible solution for
this would be to have a fully versioned catalog but that again seems to be
rather complex and intrusive.
For some operations (UPDATE, DELETE) and corner-cases (e.g. full page writes)
additional data needs to be logged, but the additional amount of data isn't
that big. Requiring a primary-key for any change but INSERT seems to be a
sensible thing for now. The required changes are fully contained in heapam.c
and are pretty simple so far.
2.:
For transport of the non-decoded data from the originating site to the decoding
site we decided to reuse the infrastructure already provided by
walsender/walreceiver. We introduced a new command that, analogous to
START_REPLICATION, is called START_LOGICAL_REPLICATION that will stream out all
xlog records that pass through a filter.
The on-the-wire format stays the same. The filter currently simply filters out
all records which are not interesting for logical replication (indexes,
freezing, ...) and records that did not originate on the same system.
The requirement of filtering by 'origin' of a WAL record comes from the planned
multimaster support. Changes replayed locally that originate from another site
should not be replayed again there. If the WAL is plainly used without such a
filter that would cause loops. Instead we tag every WAL record with the "node
id" of the site that caused the change to happen and changes with a node's own
"node id" won't get applied again.
Currently filtered records get simply replaced by NOOP records and loads of
zeroes which obviously is not a sensible solution. The difficulty of actually
removing the records is that that would change the LSNs. We currently rely on
those though.
The filtering might very well get expanded to support partial replication and
such in future.
3.:
To sensibly apply changes out of the WAL stream we need to solve two things:
Reassemble transactions and apply them to the target database.
The logical stream from 1. via 2. consists of individual changes identified
by the relfilenode of the table and the xid of the transaction. Given
(sub)transactions, rollbacks, crash recovery and the like, those changes
obviously cannot be applied individually without completely losing the
pretence of consistency. To solve that we introduced a module, dubbed
ApplyCache, which does the reassembling. This module is *independent* of the
data source and of the method of applying changes so it can be reused for
replicating into a foreign system or similar.
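As a rough illustration of what "reassembling" means here, the following standalone sketch buffers changes per xid and only hands them to an apply callback once the commit is seen. Aborted transactions are simply dropped. All names and the fixed-size arrays are assumptions made for the example. The real ApplyCache in patch 08 is structured differently and also has to deal with subtransactions.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define SKETCH_MAX_XACTS   8
#define SKETCH_MAX_CHANGES 16

typedef struct SketchChange
{
    uint32_t relfilenode;      /* identifies the table */
    int      payload;          /* stand-in for the tuple data */
} SketchChange;

typedef struct SketchXact
{
    bool         in_use;
    uint32_t     xid;
    size_t       nchanges;
    SketchChange changes[SKETCH_MAX_CHANGES];
} SketchXact;

typedef struct SketchCache
{
    SketchXact xacts[SKETCH_MAX_XACTS];
} SketchCache;

typedef void (*sketch_apply_fn) (uint32_t xid, const SketchChange *change,
                                 void *arg);

static void
sketch_cache_init(SketchCache *cache)
{
    for (size_t i = 0; i < SKETCH_MAX_XACTS; i++)
        cache->xacts[i].in_use = false;
}

/* Find the buffer for xid, optionally claiming a free slot for it. */
static SketchXact *
sketch_lookup(SketchCache *cache, uint32_t xid, bool create)
{
    SketchXact *free_slot = NULL;

    for (size_t i = 0; i < SKETCH_MAX_XACTS; i++)
    {
        SketchXact *x = &cache->xacts[i];

        if (x->in_use && x->xid == xid)
            return x;
        if (!x->in_use && free_slot == NULL)
            free_slot = x;
    }
    if (!create || free_slot == NULL)
        return NULL;
    free_slot->in_use = true;
    free_slot->xid = xid;
    free_slot->nchanges = 0;
    return free_slot;
}

/* Buffer one change; returns false if the sketch's capacity is exhausted. */
static bool
sketch_add_change(SketchCache *cache, uint32_t xid, SketchChange change)
{
    SketchXact *x = sketch_lookup(cache, xid, true);

    if (x == NULL || x->nchanges == SKETCH_MAX_CHANGES)
        return false;
    x->changes[x->nchanges++] = change;
    return true;
}

/* Commit seen: replay the buffered changes in order, then forget them. */
static size_t
sketch_commit(SketchCache *cache, uint32_t xid, sketch_apply_fn apply,
              void *arg)
{
    SketchXact *x = sketch_lookup(cache, xid, false);
    size_t      n;

    assert(apply != NULL);
    if (x == NULL)
        return 0;
    for (size_t i = 0; i < x->nchanges; i++)
        apply(xid, &x->changes[i], arg);
    n = x->nchanges;
    x->in_use = false;
    return n;
}

/* Abort seen: the buffered changes must never reach the target. */
static void
sketch_abort(SketchCache *cache, uint32_t xid)
{
    SketchXact *x = sketch_lookup(cache, xid, false);

    if (x != NULL)
        x->in_use = false;
}

/* Example apply callback: adds each payload to the int arg points to. */
static void
sketch_sum_payload(uint32_t xid, const SketchChange *change, void *arg)
{
    (void) xid;
    *(int *) arg += change->payload;
}
```

The point of the sketch is only the ordering guarantee: interleaved changes of concurrent transactions are separated by xid, and nothing becomes visible to the apply side before the commit record has been decoded.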
Due to the overhead of planner/executor/toast reassembly/type conversion (yes,
we benchmarked!) we decided against statement generation for apply. Even when
using prepared statements the overhead is rather noticeable.
Instead we decided to use relatively low-level heapam.h/genam.h accesses to do
the apply. For now we decided to use only one process to do the applying;
parallelizing that seems to be too complex for the introduction of an already
complex feature.
In our tests the apply process could keep up with pgbench -c/j 20+ generating
changes. This will obviously heavily depend on the workload. A fully seek bound
workload will definitely not scale that well.
Just to reiterate: Plugging in another method to do the apply should be a
relatively simple matter of setting up three callbacks to a different function
(begin, apply_change, commit).
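The three-callback interface mentioned above could look roughly like the sketch below. The struct, the callback signatures and the counting backend are assumptions chosen to keep the example standalone. They are not the signatures used in the apply module patch.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical callback table; one instance per apply method. */
typedef struct SketchApplyCallbacks
{
    void (*begin) (uint32_t xid, void *state);
    void (*apply_change) (uint32_t xid, int change, void *state);
    void (*commit) (uint32_t xid, void *state);
} SketchApplyCallbacks;

/* Example backend that only counts what it is asked to do. */
typedef struct SketchCounter
{
    int begun;
    int changes;
    int committed;
} SketchCounter;

static void
counter_begin(uint32_t xid, void *state)
{
    (void) xid;
    ((SketchCounter *) state)->begun++;
}

static void
counter_change(uint32_t xid, int change, void *state)
{
    (void) xid;
    (void) change;
    ((SketchCounter *) state)->changes++;
}

static void
counter_commit(uint32_t xid, void *state)
{
    (void) xid;
    ((SketchCounter *) state)->committed++;
}

static const SketchApplyCallbacks sketch_counting_apply = {
    counter_begin, counter_change, counter_commit
};

/* Replays one reassembled transaction through whichever backend is set. */
static void
sketch_replay(const SketchApplyCallbacks *cb, uint32_t xid,
              const int *changes, int nchanges, void *state)
{
    assert(cb->begin && cb->apply_change && cb->commit);
    cb->begin(xid, state);
    for (int i = 0; i < nchanges; i++)
        cb->apply_change(xid, changes[i], state);
    cb->commit(xid, state);
}
```

A backend that generates SQL text for a foreign system would fill in the same three slots and leave the reassembly code untouched.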
Another complexity in this is how to synchronize the catalogs. We plan to use
command/event triggers and the oid preserving features from pg_upgrade to keep
the catalogs in-sync. We did not start working on that.
4.:
While we started to think about conflict resolution/avoidance we did not start
to work on it. We currently *cannot* handle conflicts. We think that the base
features/architecture should be agreed upon before starting with it.
Multimaster tests were done with sequences set up with INCREMENT 2 and different
start values on the two nodes.
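The reason this avoids key conflicts is plain arithmetic: two sequences with the same increment never produce the same value unless their start values differ by a multiple of that increment. A small sketch, assuming start values 1 and 2 (the concrete start values are my assumption, only INCREMENT 2 is given above):

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* n-th value (n = 0, 1, ...) of a sequence with the given START/INCREMENT. */
static int64_t
sketch_seq_value(int64_t start, int64_t increment, int64_t n)
{
    assert(n >= 0 && increment > 0);
    return start + n * increment;
}

/*
 * Two ascending sequences with the same increment share a value exactly
 * when their start values differ by a multiple of the increment.
 */
static bool
sketch_seqs_collide(int64_t start_a, int64_t start_b, int64_t increment)
{
    assert(increment > 0);
    return (start_a - start_b) % increment == 0;
}
```

Node A with START 1 hands out 1, 3, 5, 7, ... and node B with START 2 hands out 2, 4, 6, 8, ..., so inserts on both nodes cannot generate the same key. START 1 and START 3 would collide.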
=== Current Prototype ===
The current prototype consists of a series of patches that are split in
hopefully sensible and coherent parts to make reviewing of individual parts
possible.
It is also available in the 'cabal-rebasing' branch on
git.postgresql.org/users/andresfreund/postgres.git . The history of that branch
will be rewritten though.
01: wakeup handling: reduces replication lag, not very interesting in this context
02: Add zeroRecPtr: not very interesting either
03: new syscache for relfilenode. This would benefit from some syscache experienced eyes
04: embedded lists: This is a general facility, general review appreciated
05: preliminary bgworker support: This is not ready and just posted as it is
preliminary work for the other patches. Simon will post a real patch soon
06: XLogReader: Review definitely appreciated
07: logical data additions for WAL: Review definitely appreciated, I do not expect fundamental changes
08: ApplyCache: Important infrastructure for the patch, review definitely appreciated
09: Wal Decoding: Decode WAL generated with wal_level=logical into an ApplyCache
10: WAL with 'origin node': This is another important base-piece for logical rep
11: WAL segment handling changes: If the basic idea of adding a node_id to the
functions and adding a pg_lcr directory is acceptable the rest of the patch is
fairly boring/mechanical
12: walsender/walreceiver changes: Implement transport/filtering of logical
changes. Very relevant
13: shared memory/crash recovery state handling for logical rep: Very relevant
minus the TODO's in the commit message
14: apply module: review appreciated
15: apply process: somewhat dependent on the preliminary changes in 05, general
direction is visible, loads of detail work needed as soon as some design
decisions are agreed upon.
16: this document. Not very interesting after you've read it ;)
--
Andres Freund http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
From: Andres Freund <andres@anarazel.de>
The previous coding could miss xlog writeouts at several places. E.g. when wal
was written out by the background writer or even after a commit if
synchronous_commit=off.
This could lead to delays in sending data to the standby of up to 7 seconds.
To fix this, move the responsibility of notification to the layer where the
necessary information is actually present. We take some care not to do the
notification while we hold contended locks like WALInsertLock or WALWriteLock.
Document the preexisting fact that we rely on SetLatch to be safe from within
signal handlers and critical sections.
This removes the temporary bandaid from 2c8a4e9be2730342cbca85150a2a9d876aa77ff6
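For reviewers who want the idea without reading the diff: the pattern behind WalSndWakeupRequest()/WalSndWakeupProcess() is "remember while locked, notify after unlock". A standalone sketch follows. The sketch_ names, the boolean standing in for the lock and the counter standing in for SetLatch() are all illustrative and not part of the patch.

```c
#include <assert.h>
#include <stdbool.h>

static bool sketch_wakeup_pending = false;  /* set while the lock is held */
static bool sketch_lock_held = false;       /* stand-in for a contended lock */
static int  sketch_wakeups_sent = 0;        /* stand-in for SetLatch() calls */

/* Cheap: only records that somebody has to be woken up later. */
static void
sketch_wakeup_request(void)
{
    sketch_wakeup_pending = true;
}

/* Does the actual notification; must run outside the contended lock. */
static void
sketch_wakeup_process(void)
{
    assert(!sketch_lock_held);
    if (sketch_wakeup_pending)
    {
        sketch_wakeup_pending = false;
        sketch_wakeups_sent++;
    }
}

/* Shape of a caller such as a flush routine. */
static void
sketch_flush(bool wrote_something)
{
    sketch_lock_held = true;
    if (wrote_something)
        sketch_wakeup_request();    /* data hit the disk, note it */
    sketch_lock_held = false;

    sketch_wakeup_process();        /* notify after the lock is released */
}
```

Callers that wrote nothing pay only for one boolean test, and the walsenders are never signalled while the writer still holds the lock.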
---
src/backend/access/transam/twophase.c | 21 -----------------
src/backend/access/transam/xact.c | 7 ------
src/backend/access/transam/xlog.c | 24 +++++++++++++------
src/backend/port/unix_latch.c | 3 +++
src/backend/port/win32_latch.c | 4 ++++
src/backend/replication/walsender.c | 41 ++++++++++++++++++++++++++++++++-
src/include/replication/walsender.h | 2 ++
7 files changed, 66 insertions(+), 36 deletions(-)
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index b94fae3..bdb7bcd 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1042,13 +1042,6 @@ EndPrepare(GlobalTransaction gxact)
/* If we crash now, we have prepared: WAL replay will fix things */
- /*
- * Wake up all walsenders to send WAL up to the PREPARE record immediately
- * if replication is enabled
- */
- if (max_wal_senders > 0)
- WalSndWakeup();
-
/* write correct CRC and close file */
if ((write(fd, &statefile_crc, sizeof(pg_crc32))) != sizeof(pg_crc32))
{
@@ -2045,13 +2038,6 @@ RecordTransactionCommitPrepared(TransactionId xid,
/* Flush XLOG to disk */
XLogFlush(recptr);
- /*
- * Wake up all walsenders to send WAL up to the COMMIT PREPARED record
- * immediately if replication is enabled
- */
- if (max_wal_senders > 0)
- WalSndWakeup();
-
/* Mark the transaction committed in pg_clog */
TransactionIdCommitTree(xid, nchildren, children);
@@ -2133,13 +2119,6 @@ RecordTransactionAbortPrepared(TransactionId xid,
XLogFlush(recptr);
/*
- * Wake up all walsenders to send WAL up to the ABORT PREPARED record
- * immediately if replication is enabled
- */
- if (max_wal_senders > 0)
- WalSndWakeup();
-
- /*
* Mark the transaction aborted in clog. This is not absolutely necessary
* but we may as well do it while we are here.
*/
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 8f00186..3cc2bfa 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1141,13 +1141,6 @@ RecordTransactionCommit(void)
XLogFlush(XactLastRecEnd);
/*
- * Wake up all walsenders to send WAL up to the COMMIT record
- * immediately if replication is enabled
- */
- if (max_wal_senders > 0)
- WalSndWakeup();
-
- /*
* Now we may update the CLOG, if we wrote a COMMIT record above
*/
if (markXidCommitted)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index bcb71c4..166efb0 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1017,6 +1017,8 @@ begin:;
END_CRIT_SECTION();
+ /* wakeup the WalSnd now that we released the WALWriteLock */
+ WalSndWakeupProcess();
return RecPtr;
}
@@ -1218,6 +1220,9 @@ begin:;
END_CRIT_SECTION();
+ /* wake up the WalSnd now that we are outside contended locks */
+ WalSndWakeupProcess();
+
return RecPtr;
}
@@ -1822,6 +1827,10 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
if (finishing_seg || (xlog_switch && last_iteration))
{
issue_xlog_fsync(openLogFile, openLogId, openLogSeg);
+
+ /* signal that we need to wakeup WalSnd later */
+ WalSndWakeupRequest();
+
LogwrtResult.Flush = LogwrtResult.Write; /* end of page */
if (XLogArchivingActive())
@@ -1886,6 +1895,9 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
openLogOff = 0;
}
issue_xlog_fsync(openLogFile, openLogId, openLogSeg);
+
+ /* signal that we need to wakeup WalSnd later */
+ WalSndWakeupRequest();
}
LogwrtResult.Flush = LogwrtResult.Write;
}
@@ -2149,6 +2161,9 @@ XLogFlush(XLogRecPtr record)
END_CRIT_SECTION();
+ /* wakeup the WalSnd now that we released the WALWriteLock */
+ WalSndWakeupProcess();
+
/*
* If we still haven't flushed to the request point then we have a
* problem; most likely, the requested flush point is past end of XLOG.
@@ -2274,13 +2289,8 @@ XLogBackgroundFlush(void)
END_CRIT_SECTION();
- /*
- * If we wrote something then we have something to send to standbys also,
- * otherwise the replication delay become around 7s with just async
- * commit.
- */
- if (wrote_something)
- WalSndWakeup();
+ /* wakeup the WalSnd now that we released the WALWriteLock */
+ WalSndWakeupProcess();
return wrote_something;
}
diff --git a/src/backend/port/unix_latch.c b/src/backend/port/unix_latch.c
index 65b2fc5..335e9f6 100644
--- a/src/backend/port/unix_latch.c
+++ b/src/backend/port/unix_latch.c
@@ -418,6 +418,9 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
* NB: when calling this in a signal handler, be sure to save and restore
* errno around it. (That's standard practice in most signal handlers, of
* course, but we used to omit it in handlers that only set a flag.)
+ *
+ * NB: this function is called from critical sections and signal handlers so
+ * throwing an error is not a good idea.
*/
void
SetLatch(volatile Latch *latch)
diff --git a/src/backend/port/win32_latch.c b/src/backend/port/win32_latch.c
index eb46dca..1f1ed33 100644
--- a/src/backend/port/win32_latch.c
+++ b/src/backend/port/win32_latch.c
@@ -247,6 +247,10 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
return result;
}
+/*
+ * The comments above the unix implementation (unix_latch.c) of this function
+ * apply here as well.
+ */
void
SetLatch(volatile Latch *latch)
{
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 45a3b2e..e44c734 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -107,6 +107,11 @@ static StringInfoData reply_message;
*/
static TimestampTz last_reply_timestamp;
+/*
+ * State for WalSndWakeupRequest
+ */
+static bool wroteNewXlogData = false;
+
/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGHUP = false;
volatile sig_atomic_t walsender_shutdown_requested = false;
@@ -1424,7 +1429,12 @@ WalSndShmemInit(void)
}
}
-/* Wake up all walsenders */
+/*
+ * Wake up all walsenders
+ *
+ * This will be called inside critical sections, so throwing an error is not
+ * advisable.
+ */
void
WalSndWakeup(void)
{
@@ -1434,6 +1444,35 @@ WalSndWakeup(void)
SetLatch(&WalSndCtl->walsnds[i].latch);
}
+/*
+ * Remember that we want to wakeup walsenders later
+ *
+ * This is separated from doing the actual wakeup because the writeout is done
+ * while holding contended locks.
+ */
+void
+WalSndWakeupRequest(void)
+{
+ wroteNewXlogData = true;
+}
+
+/*
+ * wakeup walsenders if there is work to be done
+ */
+void
+WalSndWakeupProcess(void)
+{
+ if(wroteNewXlogData){
+ wroteNewXlogData = false;
+ /*
+ * Wake up all walsenders to send WAL up to the point where it is flushed
+ * safely to disk.
+ */
+ if (max_wal_senders > 0)
+ WalSndWakeup();
+ }
+}
+
/* Set state for current walsender (only called in walsender) */
void
WalSndSetState(WalSndState state)
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 128d2db..38191e7 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -31,6 +31,8 @@ extern void WalSndSignals(void);
extern Size WalSndShmemSize(void);
extern void WalSndShmemInit(void);
extern void WalSndWakeup(void);
+extern void WalSndWakeupRequest(void);
+extern void WalSndWakeupProcess(void);
extern void WalSndRqstFileReload(void);
extern Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS);
--
1.7.10.rc3.3.g19a6c.dirty
From: Andres Freund <andres@anarazel.de>
This is locally defined in lots of places and would get introduced frequently
in the next commits. It is expected that this can be defined in a header-only
manner as soon as the XLogInsert scalability groundwork from Heikki gets in.
---
src/backend/access/transam/xlogutils.c | 1 +
src/include/access/xlogdefs.h | 1 +
2 files changed, 2 insertions(+)
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 6ddcc59..3a2462b 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -51,6 +51,7 @@ typedef struct xl_invalid_page
static HTAB *invalid_page_tab = NULL;
+XLogRecPtr zeroRecPtr = {0, 0};
/* Report a reference to an invalid page */
static void
diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h
index 5e6d7e6..2768427 100644
--- a/src/include/access/xlogdefs.h
+++ b/src/include/access/xlogdefs.h
@@ -35,6 +35,7 @@ typedef struct XLogRecPtr
uint32 xrecoff; /* byte offset of location in log file */
} XLogRecPtr;
+extern XLogRecPtr zeroRecPtr;
#define XLogRecPtrIsInvalid(r) ((r).xrecoff == 0)
--
1.7.10.rc3.3.g19a6c.dirty
From: Andres Freund <andres@anarazel.de>
This patch is problematic because, formally, indexes used by syscaches need to
be unique. This one is not, because of the 0/InvalidOid entries for
nailed/shared catalog entries. Those values aren't allowed to be queried though.
It might be nicer to add infrastructure to do this properly; I just don't have
a clue what the best way for this would be.
---
src/backend/utils/cache/syscache.c | 11 +++++++++++
src/include/catalog/indexing.h | 2 ++
src/include/utils/syscache.h | 1 +
3 files changed, 14 insertions(+)
diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c
index c365ec7..9cfb013 100644
--- a/src/backend/utils/cache/syscache.c
+++ b/src/backend/utils/cache/syscache.c
@@ -588,6 +588,17 @@ static const struct cachedesc cacheinfo[] = {
},
1024
},
+ {RelationRelationId, /* RELFILENODE */
+ ClassRelfilenodeIndexId,
+ 1,
+ {
+ Anum_pg_class_relfilenode,
+ 0,
+ 0,
+ 0
+ },
+ 1024
+ },
{RewriteRelationId, /* RULERELNAME */
RewriteRelRulenameIndexId,
2,
diff --git a/src/include/catalog/indexing.h b/src/include/catalog/indexing.h
index 450ec25..5c9419b 100644
--- a/src/include/catalog/indexing.h
+++ b/src/include/catalog/indexing.h
@@ -106,6 +106,8 @@ DECLARE_UNIQUE_INDEX(pg_class_oid_index, 2662, on pg_class using btree(oid oid_o
#define ClassOidIndexId 2662
DECLARE_UNIQUE_INDEX(pg_class_relname_nsp_index, 2663, on pg_class using btree(relname name_ops, relnamespace oid_ops));
#define ClassNameNspIndexId 2663
+DECLARE_INDEX(pg_class_relfilenode_index, 2844, on pg_class using btree(relfilenode oid_ops));
+#define ClassRelfilenodeIndexId 2844
DECLARE_UNIQUE_INDEX(pg_collation_name_enc_nsp_index, 3164, on pg_collation using btree(collname name_ops, collencoding int4_ops, collnamespace oid_ops));
#define CollationNameEncNspIndexId 3164
diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h
index d59dd4e..63a5042 100644
--- a/src/include/utils/syscache.h
+++ b/src/include/utils/syscache.h
@@ -73,6 +73,7 @@ enum SysCacheIdentifier
RANGETYPE,
RELNAMENSP,
RELOID,
+ RELFILENODE,
RULERELNAME,
STATRELATTINH,
TABLESPACEOID,
--
1.7.10.rc3.3.g19a6c.dirty
From: Andres Freund <andres@anarazel.de>
Adds a singly and a doubly linked list which can easily be embedded into other
data structures and can be used without any additional allocations.
Problematic: It requires USE_INLINE to be used. It could be remade to fall back
to externally defined functions if that is not available, but that hardly
seems sensible in this day and age. Besides, the speed hit would be noticeable
and it is only used in new code which could be disabled on machines (given they
still exist) without proper support for inline functions.
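For those unfamiliar with embedded (intrusive) lists, here is a standalone sketch of the idea. It deliberately does not include ilist.h. The sketch_ names are made up and only mirror the shape of ilist_d_node, ilist_d_head and ilist_container from the patch.

```c
#include <assert.h>
#include <stddef.h>

typedef struct sketch_node
{
    struct sketch_node *prev;
    struct sketch_node *next;
} sketch_node;

typedef struct sketch_head
{
    sketch_node head;          /* sentinel; an empty list points at itself */
} sketch_head;

/* Get from an embedded node back to the struct that contains it. */
#define sketch_container(type, member, ptr) \
    ((type *) ((char *) (ptr) - offsetof(type, member)))

static void
sketch_init(sketch_head *h)
{
    h->head.next = h->head.prev = &h->head;
}

static void
sketch_push_back(sketch_head *h, sketch_node *n)
{
    n->next = &h->head;
    n->prev = h->head.prev;
    n->prev->next = n;
    h->head.prev = n;
}

static void
sketch_remove(sketch_node *n)
{
    assert(n->prev != NULL && n->next != NULL);
    n->prev->next = n->next;
    n->next->prev = n->prev;
}

/* The node lives inside the payload, so linking needs no allocation. */
typedef struct SketchChange
{
    int         payload;
    sketch_node node;
} SketchChange;

static int
sketch_sum(sketch_head *h)
{
    int sum = 0;

    for (sketch_node *cur = h->head.next; cur != &h->head; cur = cur->next)
        sum += sketch_container(SketchChange, node, cur)->payload;
    return sum;
}
```

Because the list node is a member of the payload struct, adding an element to a list or removing it is a handful of pointer assignments and can be done in places where allocating memory is not an option.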
---
src/include/utils/ilist.h | 253 +++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 253 insertions(+)
create mode 100644 src/include/utils/ilist.h
diff --git a/src/include/utils/ilist.h b/src/include/utils/ilist.h
new file mode 100644
index 0000000..81d540e
--- /dev/null
+++ b/src/include/utils/ilist.h
@@ -0,0 +1,253 @@
+#ifndef ILIST_H
+#define ILIST_H
+
+#ifdef __GNUC__
+#define unused_attr __attribute__((unused))
+#else
+#define unused_attr
+#endif
+
+#ifndef USE_INLINE
+#error "a compiler supporting static inlines is required"
+#endif
+
+#include <assert.h>
+
+typedef struct ilist_d_node ilist_d_node;
+
+struct ilist_d_node
+{
+ ilist_d_node* prev;
+ ilist_d_node* next;
+};
+
+typedef struct
+{
+ ilist_d_node head;
+} ilist_d_head;
+
+typedef struct ilist_s_node ilist_s_node;
+
+struct ilist_s_node
+{
+ ilist_s_node* next;
+};
+
+typedef struct
+{
+ ilist_s_node head;
+} ilist_s_head;
+
+#ifdef ILIST_DEBUG
+void ilist_d_check(ilist_d_head* head);
+#else
+static inline void ilist_d_check(ilist_d_head* head)
+{
+}
+#endif
+
+static inline void ilist_d_init(ilist_d_head *head)
+{
+ head->head.next = head->head.prev = &head->head;
+ ilist_d_check(head);
+}
+
+/*
+ * adds a node at the beginning of the list
+ */
+static inline void ilist_d_push_front(ilist_d_head *head, ilist_d_node *node)
+{
+ node->next = head->head.next;
+ node->prev = &head->head;
+ node->next->prev = node;
+ head->head.next = node;
+ ilist_d_check(head);
+}
+
+
+/*
+ * adds a node at the end of the list
+ */
+static inline void ilist_d_push_back(ilist_d_head *head, ilist_d_node *node)
+{
+ node->next = &head->head;
+ node->prev = head->head.prev;
+ node->prev->next = node;
+ head->head.prev = node;
+ ilist_d_check(head);
+}
+
+
+/*
+ * adds a node after another *in the same list*
+ */
+static inline void ilist_d_add_after(unused_attr ilist_d_head *head, ilist_d_node *after, ilist_d_node *node)
+{
+ node->prev = after;
+ node->next = after->next;
+ after->next = node;
+ node->next->prev = node;
+ ilist_d_check(head);
+}
+
+/*
+ * adds a node before another *in the same list*
+ */
+static inline void ilist_d_add_before(unused_attr ilist_d_head *head, ilist_d_node *before, ilist_d_node *node)
+{
+ node->prev = before->prev;
+ node->next = before;
+ before->prev = node;
+ node->prev->next = node;
+ ilist_d_check(head);
+}
+
+
+/*
+ * removes a node from a list
+ */
+static inline void ilist_d_remove(unused_attr ilist_d_head *head, ilist_d_node *node)
+{
+ ilist_d_check(head);
+ node->prev->next = node->next;
+ node->next->prev = node->prev;
+ ilist_d_check(head);
+}
+
+/*
+ * removes the first node from a list or returns NULL
+ */
+static inline ilist_d_node* ilist_d_pop_front(ilist_d_head *head)
+{
+ ilist_d_node* ret;
+
+ if (&head->head == head->head.next)
+ return NULL;
+
+ ret = head->head.next;
+ ilist_d_remove(head, head->head.next);
+ return ret;
+}
+
+
+static inline bool ilist_d_has_next(ilist_d_head *head, ilist_d_node *node)
+{
+ return node->next != &head->head;
+}
+
+static inline bool ilist_d_has_prev(ilist_d_head *head, ilist_d_node *node)
+{
+ return node->prev != &head->head;
+}
+
+static inline bool ilist_d_is_empty(ilist_d_head *head)
+{
+ return head->head.next == &head->head;
+}
+
+#define ilist_d_front(type, membername, ptr) ((&((ptr)->head) == (ptr)->head.next) ? \
+ NULL : ilist_container(type, membername, (ptr)->head.next))
+
+#define ilist_d_front_unchecked(type, membername, ptr) ilist_container(type, membername, (ptr)->head.next)
+
+#define ilist_d_back(type, membername, ptr) ((&((ptr)->head) == (ptr)->head.prev) ? \
+ NULL : ilist_container(type, membername, (ptr)->head.prev))
+
+#define ilist_container(type, membername, ptr) ((type*)((char*)(ptr) - offsetof(type, membername)))
+
+#define ilist_d_foreach(name, ptr) for(name = (ptr)->head.next; \
+ name != &(ptr)->head; \
+ name = name->next)
+
+#define ilist_d_foreach_modify(name, nxt, ptr) for(name = (ptr)->head.next, \
+ nxt = name->next; \
+ name != &(ptr)->head \
+ ; \
+ name = nxt, nxt = name->next)
+
+static inline void ilist_s_init(ilist_s_head *head)
+{
+ head->head.next = NULL;
+}
+
+static inline void ilist_s_push_front(ilist_s_head *head, ilist_s_node *node)
+{
+ node->next = head->head.next;
+ head->head.next = node;
+}
+
+/*
+ * fails if the list is empty
+ */
+static inline ilist_s_node* ilist_s_pop_front(ilist_s_head *head)
+{
+ ilist_s_node* front = head->head.next;
+ head->head.next = head->head.next->next;
+ return front;
+}
+
+/*
+ * removes a node from a list
+ * Attention: O(n)
+ */
+static inline void ilist_s_remove(ilist_s_head *head,
+ ilist_s_node *node)
+{
+ ilist_s_node *last = &head->head;
+ ilist_s_node *cur;
+#ifndef NDEBUG
+ bool found = false;
+#endif
+ while ((cur = last->next))
+ {
+ if (cur == node)
+ {
+ last->next = cur->next;
+#ifndef NDEBUG
+ found = true;
+#endif
+ break;
+ }
+ last = cur;
+ }
+ assert(found);
+}
+
+
+static inline void ilist_s_add_after(unused_attr ilist_s_head *head,
+ ilist_s_node *after, ilist_s_node *node)
+{
+ node->next = after->next;
+ after->next = node;
+}
+
+
+static inline bool ilist_s_is_empty(ilist_s_head *head)
+{
+ return head->head.next == NULL;
+}
+
+static inline bool ilist_s_has_next(unused_attr ilist_s_head* head,
+ ilist_s_node *node)
+{
+ return node->next != NULL;
+}
+
+
+#define ilist_s_front(type, membername, ptr) (ilist_s_is_empty(ptr) ? \
+ NULL : ilist_container(type, membername, (ptr)->head.next))
+
+#define ilist_s_front_unchecked(type, membername, ptr) \
+ ilist_container(type, membername, (ptr)->head.next)
+
+#define ilist_s_foreach(name, ptr) for(name = (ptr)->head.next; \
+ name != NULL; \
+ name = name->next)
+
+#define ilist_s_foreach_modify(name, nxt, ptr) for(name = (ptr)->head.next, \
+ nxt = name ? name->next : NULL; \
+ name != NULL; \
+ name = nxt, nxt = name ? name->next : NULL)
+
+
+#endif
--
1.7.10.rc3.3.g19a6c.dirty
From: Simon Riggs <simon@2ndquadrant.com>
Early prototype that allows for just 1 bgworker which calls a function called
do_applyprocess(). Expect major changes in this, but not in ways that would
affect the apply process.
---
src/backend/postmaster/Makefile | 4 +-
src/backend/postmaster/bgworker.c | 403 +++++++++++++++++++++++++
src/backend/postmaster/postmaster.c | 91 ++++--
src/backend/tcop/postgres.c | 5 +
src/backend/utils/init/miscinit.c | 5 +-
src/backend/utils/init/postinit.c | 3 +-
src/backend/utils/misc/guc.c | 37 ++-
src/backend/utils/misc/postgresql.conf.sample | 4 +
src/include/postmaster/bgworker.h | 29 ++
9 files changed, 550 insertions(+), 31 deletions(-)
create mode 100644 src/backend/postmaster/bgworker.c
create mode 100644 src/include/postmaster/bgworker.h
diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile
index 3056b09..7b23353 100644
--- a/src/backend/postmaster/Makefile
+++ b/src/backend/postmaster/Makefile
@@ -12,7 +12,7 @@ subdir = src/backend/postmaster
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-OBJS = autovacuum.o bgwriter.o fork_process.o pgarch.o pgstat.o postmaster.o \
- startup.o syslogger.o walwriter.o checkpointer.o
+OBJS = autovacuum.o bgworker.o bgwriter.o fork_process.o pgarch.o pgstat.o \
+ postmaster.o startup.o syslogger.o walwriter.o checkpointer.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
new file mode 100644
index 0000000..8144050
--- /dev/null
+++ b/src/backend/postmaster/bgworker.c
@@ -0,0 +1,403 @@
+/*-------------------------------------------------------------------------
+ *
+ * bgworker.c
+ *
+ * PostgreSQL Integrated Worker Daemon
+ *
+ * Background workers can execute arbitrary user code. A shared library
+ * can request creation of a worker using RequestAddinBGWorkerProcess().
+ *
+ * The worker process is forked from the postmaster and then attaches
+ * to shared memory similarly to an autovacuum worker and finally begins
+ * executing the supplied WorkerMain function.
+ *
+ * If the fork() call fails in the postmaster, it will try again later.
+ * Note that the failure can only be transient (fork failure due to
+ * high load, memory pressure, too many processes, etc); more permanent
+ * problems, like failure to connect to a database, are detected later in the
+ * worker and dealt with just by having the worker exit normally. Postmaster
+ * will launch a new worker again later.
+ *
+ * Note that there can be more than one worker in a database concurrently.
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/postmaster/bgworker.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <signal.h>
+#include <sys/types.h>
+#include <sys/time.h>
+#include <time.h>
+#include <unistd.h>
+
+#include "access/heapam.h"
+#include "access/reloptions.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "catalog/dependency.h"
+#include "catalog/namespace.h"
+#include "catalog/pg_database.h"
+#include "commands/dbcommands.h"
+#include "commands/vacuum.h"
+#include "libpq/pqsignal.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "postmaster/fork_process.h"
+#include "postmaster/postmaster.h"
+#include "storage/bufmgr.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/pmsignal.h"
+#include "storage/proc.h"
+#include "storage/procsignal.h"
+#include "storage/sinvaladt.h"
+#include "tcop/tcopprot.h"
+#include "utils/fmgroids.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/ps_status.h"
+#include "utils/rel.h"
+#include "utils/snapmgr.h"
+#include "utils/syscache.h"
+#include "utils/timestamp.h"
+#include "utils/tqual.h"
+
+
+/*
+ * GUC parameters
+ */
+int MaxWorkers;
+
+static int bgworker_addin_request = 0;
+static bool bgworker_addin_request_allowed = true;
+
+/* Flags to tell if we are in a worker process */
+static bool am_bgworker = false;
+
+/* Flags set by signal handlers */
+static volatile sig_atomic_t got_SIGHUP = false;
+static volatile sig_atomic_t got_SIGUSR2 = false;
+static volatile sig_atomic_t got_SIGTERM = false;
+
+static void bgworker_sigterm_handler(SIGNAL_ARGS);
+
+NON_EXEC_STATIC void BgWorkerMain(int argc, char *argv[]);
+
+static bool do_logicalapply(void);
+
+/********************************************************************
+ * BGWORKER CODE
+ ********************************************************************/
+
+/* SIGTERM: time to die */
+static void
+bgworker_sigterm_handler(SIGNAL_ARGS)
+{
+ int save_errno = errno;
+
+ got_SIGTERM = true;
+ if (MyProc)
+ SetLatch(&MyProc->procLatch);
+
+ errno = save_errno;
+}
+
+/*
+ * Main entry point for background worker process, to be called from the
+ * postmaster.
+ *
+ * This code is heavily based on autovacuum.c, q.v.
+ */
+int
+StartBgWorker(void)
+{
+ pid_t worker_pid;
+
+#ifdef EXEC_BACKEND
+ switch ((worker_pid = bgworker_forkexec()))
+#else
+ switch ((worker_pid = fork_process()))
+#endif
+ {
+ case -1:
+ ereport(LOG,
+ (errmsg("could not fork worker process: %m")));
+ return 0;
+
+#ifndef EXEC_BACKEND
+ case 0:
+ /* in postmaster child ... */
+ /* Close the postmaster's sockets */
+ ClosePostmasterPorts(false);
+
+ /* Lose the postmaster's on-exit routines */
+ on_exit_reset();
+
+ BgWorkerMain(0, NULL);
+ break;
+#endif
+ default:
+ return (int) worker_pid;
+ }
+
+ /* shouldn't get here */
+ return 0;
+}
+
+/*
+ * BgWorkerMain
+ */
+NON_EXEC_STATIC void
+BgWorkerMain(int argc, char *argv[])
+{
+ sigjmp_buf local_sigjmp_buf;
+ //Oid dbid = 12037; /* kluge to set dbid for "Postgres" */
+ bool init = false;
+
+ /* we are a postmaster subprocess now */
+ IsUnderPostmaster = true;
+ am_bgworker = true;
+
+ /* reset MyProcPid */
+ MyProcPid = getpid();
+
+ /* record Start Time for logging */
+ MyStartTime = time(NULL);
+
+ /* Identify myself via ps */
+ init_ps_display("worker process", "", "", "");
+
+ SetProcessingMode(InitProcessing);
+
+ /*
+ * If possible, make this process a group leader, so that the postmaster
+ * can signal any child processes too. (A worker probably never has any
+ * child processes, but for consistency we make all postmaster child
+ * processes do this.)
+ */
+#ifdef HAVE_SETSID
+ if (setsid() < 0)
+ elog(FATAL, "setsid() failed: %m");
+#endif
+
+ /*
+ * Set up signal handlers. We operate on databases much like a regular
+ * backend, so we use the same signal handling. See equivalent code in
+ * tcop/postgres.c.
+ *
+ * Currently, we don't pay attention to postgresql.conf changes that
+ * happen during a single daemon iteration, so we can ignore SIGHUP.
+ */
+ pqsignal(SIGHUP, SIG_IGN);
+
+ /*
+ * SIGINT is used to signal canceling the current action; SIGTERM
+ * means abort and exit cleanly, and SIGQUIT means abandon ship.
+ */
+ pqsignal(SIGINT, StatementCancelHandler);
+ pqsignal(SIGTERM, bgworker_sigterm_handler); // was die);
+ pqsignal(SIGQUIT, quickdie);
+ pqsignal(SIGALRM, handle_sig_alarm);
+
+ pqsignal(SIGPIPE, SIG_IGN);
+ pqsignal(SIGUSR1, procsignal_sigusr1_handler);
+ pqsignal(SIGUSR2, SIG_IGN);
+ pqsignal(SIGFPE, FloatExceptionHandler);
+ pqsignal(SIGCHLD, SIG_DFL);
+
+ /* Early initialization */
+ BaseInit();
+
+ /*
+ * Create a per-backend PGPROC struct in shared memory, except in the
+ * EXEC_BACKEND case where this was done in SubPostmasterMain. We must do
+ * this before we can use LWLocks (and in the EXEC_BACKEND case we already
+ * had to do some stuff with LWLocks).
+ */
+#ifndef EXEC_BACKEND
+ InitProcess();
+#endif
+
+ /*
+ * If an exception is encountered, processing resumes here.
+ *
+ * See notes in postgres.c about the design of this coding.
+ */
+ if (sigsetjmp(local_sigjmp_buf, 1) != 0)
+ {
+ /* Prevents interrupts while cleaning up */
+ HOLD_INTERRUPTS();
+
+ /* Report the error to the server log */
+ EmitErrorReport();
+
+ /*
+ * We can now go away. Note that because we called InitProcess, a
+ * callback was registered to do ProcKill, which will clean up
+ * necessary state.
+ */
+ proc_exit(0);
+ }
+
+ /* We can now handle ereport(ERROR) */
+ PG_exception_stack = &local_sigjmp_buf;
+
+ PG_SETMASK(&UnBlockSig);
+
+ /*
+ * Force zero_damaged_pages OFF in a worker process, even if it is set
+ * in postgresql.conf. We don't really want such a dangerous option being
+ * applied non-interactively.
+ */
+ SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE);
+
+ /*
+ * Force statement_timeout to zero to avoid a timeout setting from
+ * preventing regular maintenance from being executed.
+ */
+ SetConfigOption("statement_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE);
+
+ /*
+ * Force default_transaction_isolation to READ COMMITTED. We don't
+ * want to pay the overhead of serializable mode, nor add any risk
+ * of causing deadlocks or delaying other transactions.
+ */
+ SetConfigOption("default_transaction_isolation", "read committed",
+ PGC_SUSET, PGC_S_OVERRIDE);
+
+ /*
+ * Force synchronous replication off to allow regular maintenance even if
+ * we are waiting for standbys to connect. This is important to ensure we
+ * aren't blocked from performing anti-wraparound tasks.
+ */
+ if (synchronous_commit > SYNCHRONOUS_COMMIT_LOCAL_FLUSH)
+ SetConfigOption("synchronous_commit", "local",
+ PGC_SUSET, PGC_S_OVERRIDE);
+
+ for (;;)
+ {
+ bool not_idle = false;
+
+ /* the normal shutdown case */
+ if (got_SIGTERM)
+ break;
+
+ if (got_SIGHUP)
+ {
+ got_SIGHUP = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+
+ if (!init)
+ {
+ char dbname[NAMEDATALEN] = "postgres";
+
+ /*
+ * Connect to the selected database
+ *
+ * Note: the database name is hard-coded to "postgres" in this
+ * prototype; if that database does not exist, we'll fail and exit here.
+ *
+ * Note that MyProcPort is not setup correctly, so normal
+ * authentication will simply fail. This is bypassed by moving
+ * straight to superuser mode, using same trick as autovacuum.
+ */
+ InitPostgres(dbname, InvalidOid, NULL, NULL);
+ SetProcessingMode(NormalProcessing);
+ ereport(LOG,
+ (errmsg("starting worker process on database \"%s\"", dbname)));
+
+ if (PostAuthDelay)
+ pg_usleep(PostAuthDelay * 1000000L);
+
+ CurrentResourceOwner = ResourceOwnerCreate(NULL, "worker process");
+
+ init = true;
+ }
+
+ /*
+ * If we're initialised correctly we can call the worker code.
+ */
+ if (init)
+ not_idle = do_logicalapply();
+
+ if(!not_idle){
+ /* Just for test and can be removed. */
+ pg_usleep(100000L);
+ }
+ }
+
+ /* Normal exit from the bgworker is here */
+ ereport(LOG,
+ (errmsg("worker shutting down")));
+
+ /* All done, go away */
+ proc_exit(0);
+}
+
+bool
+IsWorkerProcess(void)
+{
+ return am_bgworker;
+}
+
+/*
+ * RequestAddinBgWorkerProcess
+ * Request a background worker process
+ *
+ * This is only useful if called from the _PG_init hook of a library that
+ * is loaded into the postmaster via shared_preload_libraries. Once
+ * shared memory has been allocated, calls will be ignored. (We could
+ * raise an error, but it seems better to make it a no-op, so that
+ * libraries containing such calls can be reloaded if needed.)
+ */
+void
+RequestAddinBgWorkerProcess(const char *WorkerName,
+ void *Main,
+ const char *DBname)
+{
+ if (IsUnderPostmaster || !bgworker_addin_request_allowed)
+ return; /* too late */
+ bgworker_addin_request++;
+}
+
+/*
+ * Compute number of BgWorkers to allocate.
+ */
+int
+NumBgWorkers(void)
+{
+ return 1;
+
+#ifdef UNUSED
+ int numWorkers = 0;
+
+ /*
+ * Include number of workers required by server, for example,
+ * parallel query worker tasks.
+ */
+
+ /*
+ * Add any requested by loadable modules.
+ */
+ bgworker_addin_request_allowed = false;
+ numWorkers += bgworker_addin_request;
+
+ return numWorkers;
+#endif
+}
+
+static bool
+do_logicalapply(void)
+{
+ elog(LOG, "doing logical apply");
+ return false;
+}
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index eeea933..71cfd6d 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -103,6 +103,7 @@
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
+#include "postmaster/bgworker.h"
#include "postmaster/fork_process.h"
#include "postmaster/pgarch.h"
#include "postmaster/postmaster.h"
@@ -131,7 +132,7 @@
* children we have and send them appropriate signals when necessary.
*
* "Special" children such as the startup, bgwriter and autovacuum launcher
- * tasks are not in this list. Autovacuum worker and walsender processes are
+ * tasks are not in this list. All worker and walsender processes are
* in it. Also, "dead_end" children are in it: these are children launched just
* for the purpose of sending a friendly rejection message to a would-be
* client. We must track them because they are attached to shared memory,
@@ -144,6 +145,7 @@ typedef struct bkend
long cancel_key; /* cancel key for cancels for this backend */
int child_slot; /* PMChildSlot for this backend, if any */
bool is_autovacuum; /* is it an autovacuum process? */
+ bool is_bgworker; /* is it a bgworker process? */
bool dead_end; /* is it going to send an error and quit? */
Dlelem elem; /* list link in BackendList */
} Backend;
@@ -216,6 +218,8 @@ static pid_t StartupPID = 0,
PgStatPID = 0,
SysLoggerPID = 0;
+static pid_t *BgWorkerPID; /* Array of PIDs of bg workers */
+
/* Startup/shutdown state */
#define NoShutdown 0
#define SmartShutdown 1
@@ -303,6 +307,8 @@ static volatile sig_atomic_t start_autovac_launcher = false;
/* the launcher needs to be signalled to communicate some condition */
static volatile bool avlauncher_needs_signal = false;
+static int NWorkers;
+
/*
* State for assigning random salts and cancel keys.
* Also, the global MyCancelKey passes the cancel key assigned to a given
@@ -366,12 +372,16 @@ static bool SignalSomeChildren(int signal, int targets);
#define BACKEND_TYPE_NORMAL 0x0001 /* normal backend */
#define BACKEND_TYPE_AUTOVAC 0x0002 /* autovacuum worker process */
#define BACKEND_TYPE_WALSND 0x0004 /* walsender process */
-#define BACKEND_TYPE_ALL 0x0007 /* OR of all the above */
+#define BACKEND_TYPE_BGWORKER 0x0008 /* general bgworker process */
+#define BACKEND_TYPE_ALL 0x000F /* OR of all the above */
+
+#define BACKEND_TYPE_WORKER (BACKEND_TYPE_AUTOVAC | BACKEND_TYPE_BGWORKER)
static int CountChildren(int target);
+static void StartBackgroundWorkers(void);
static bool CreateOptsFile(int argc, char *argv[], char *fullprogname);
static pid_t StartChildProcess(AuxProcType type);
-static void StartAutovacuumWorker(void);
+static int StartWorker(bool is_autovacuum);
static void InitPostmasterDeathWatchHandle(void);
#ifdef EXEC_BACKEND
@@ -1037,7 +1047,7 @@ PostmasterMain(int argc, char *argv[])
* handling setup of child processes. See tcop/postgres.c,
* bootstrap/bootstrap.c, postmaster/bgwriter.c, postmaster/walwriter.c,
* postmaster/autovacuum.c, postmaster/pgarch.c, postmaster/pgstat.c,
- * postmaster/syslogger.c and postmaster/checkpointer.c.
+ * postmaster/syslogger.c, postmaster/bgworker.c and postmaster/checkpointer.c.
*/
pqinitmask();
PG_SETMASK(&BlockSig);
@@ -1085,6 +1095,17 @@ PostmasterMain(int argc, char *argv[])
autovac_init();
/*
+ * Allocate background workers actually required.
+ */
+ NWorkers = NumBgWorkers();
+ if (NWorkers > 0)
+ {
+ BgWorkerPID = (pid_t *) MemoryContextAlloc(TopMemoryContext,
+ NWorkers * sizeof(pid_t));
+ memset(BgWorkerPID, 0, NWorkers * sizeof(pid_t));
+ }
+
+ /*
* Load configuration files for client authentication.
*/
if (!load_hba())
@@ -1428,6 +1449,10 @@ ServerLoop(void)
kill(AutoVacPID, SIGUSR2);
}
+ /* Check all the workers requested are running. */
+ if (pmState == PM_RUN)
+ StartBackgroundWorkers();
+
/*
* Touch the socket and lock file every 58 minutes, to ensure that
* they are not removed by overzealous /tmp-cleaning tasks. We assume
@@ -2133,8 +2158,8 @@ pmdie(SIGNAL_ARGS)
if (pmState == PM_RUN || pmState == PM_RECOVERY ||
pmState == PM_HOT_STANDBY || pmState == PM_STARTUP)
{
- /* autovacuum workers are told to shut down immediately */
- SignalSomeChildren(SIGTERM, BACKEND_TYPE_AUTOVAC);
+ /* workers are told to shut down immediately */
+ SignalSomeChildren(SIGTERM, BACKEND_TYPE_WORKER);
/* and the autovac launcher too */
if (AutoVacPID != 0)
signal_child(AutoVacPID, SIGTERM);
@@ -2203,9 +2228,9 @@ pmdie(SIGNAL_ARGS)
{
ereport(LOG,
(errmsg("aborting any active transactions")));
- /* shut down all backends and autovac workers */
+ /* shut down all backends and workers */
SignalSomeChildren(SIGTERM,
- BACKEND_TYPE_NORMAL | BACKEND_TYPE_AUTOVAC);
+ BACKEND_TYPE_NORMAL | BACKEND_TYPE_WORKER);
/* and the autovac launcher too */
if (AutoVacPID != 0)
signal_child(AutoVacPID, SIGTERM);
@@ -2396,6 +2421,7 @@ reaper(SIGNAL_ARGS)
PgArchPID = pgarch_start();
if (PgStatPID == 0)
PgStatPID = pgstat_start();
+ StartBackgroundWorkers();
/* at this point we are really open for business */
ereport(LOG,
@@ -2963,7 +2989,7 @@ PostmasterStateMachine(void)
* later after writing the checkpoint record, like the archiver
* process.
*/
- if (CountChildren(BACKEND_TYPE_NORMAL | BACKEND_TYPE_AUTOVAC) == 0 &&
+ if (CountChildren(BACKEND_TYPE_NORMAL | BACKEND_TYPE_WORKER) == 0 &&
StartupPID == 0 &&
WalReceiverPID == 0 &&
BgWriterPID == 0 &&
@@ -3202,6 +3228,8 @@ SignalSomeChildren(int signal, int target)
if (bp->is_autovacuum)
child = BACKEND_TYPE_AUTOVAC;
+ else if (bp->is_bgworker)
+ child = BACKEND_TYPE_BGWORKER;
else if (IsPostmasterChildWalSender(bp->child_slot))
child = BACKEND_TYPE_WALSND;
else
@@ -3224,7 +3252,7 @@ SignalSomeChildren(int signal, int target)
*
* returns: STATUS_ERROR if the fork failed, STATUS_OK otherwise.
*
- * Note: if you change this code, also consider StartAutovacuumWorker.
+ * Note: if you change this code, also consider StartWorker.
*/
static int
BackendStartup(Port *port)
@@ -3325,6 +3353,7 @@ BackendStartup(Port *port)
*/
bn->pid = pid;
bn->is_autovacuum = false;
+ bn->is_bgworker = false;
DLInitElem(&bn->elem, bn);
DLAddHead(BackendList, &bn->elem);
#ifdef EXEC_BACKEND
@@ -4302,7 +4331,7 @@ sigusr1_handler(SIGNAL_ARGS)
if (CheckPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER))
{
/* The autovacuum launcher wants us to start a worker process. */
- StartAutovacuumWorker();
+ (void) StartWorker(true);
}
if (CheckPostmasterSignal(PMSIGNAL_START_WALRECEIVER) &&
@@ -4448,6 +4477,8 @@ CountChildren(int target)
if (bp->is_autovacuum)
child = BACKEND_TYPE_AUTOVAC;
+ else if (bp->is_bgworker)
+ child = BACKEND_TYPE_BGWORKER;
else if (IsPostmasterChildWalSender(bp->child_slot))
child = BACKEND_TYPE_WALSND;
else
@@ -4570,16 +4601,16 @@ StartChildProcess(AuxProcType type)
}
/*
- * StartAutovacuumWorker
- * Start an autovac worker process.
+ * StartWorker
+ * Start a worker process either for autovacuum or more generally.
*
* This function is here because it enters the resulting PID into the
* postmaster's private backends list.
*
* NB -- this code very roughly matches BackendStartup.
*/
-static void
-StartAutovacuumWorker(void)
+static int
+StartWorker(bool is_autovacuum)
{
Backend *bn;
@@ -4608,22 +4639,27 @@ StartAutovacuumWorker(void)
bn->dead_end = false;
bn->child_slot = MyPMChildSlot = AssignPostmasterChildSlot();
- bn->pid = StartAutoVacWorker();
+ if (is_autovacuum)
+ bn->pid = StartAutoVacWorker();
+ else
+ bn->pid = StartBgWorker();
+
if (bn->pid > 0)
{
- bn->is_autovacuum = true;
+ bn->is_autovacuum = is_autovacuum;
+ bn->is_bgworker = !is_autovacuum;
DLInitElem(&bn->elem, bn);
DLAddHead(BackendList, &bn->elem);
#ifdef EXEC_BACKEND
ShmemBackendArrayAdd(bn);
#endif
/* all OK */
- return;
+ return bn->pid;
}
/*
* fork failed, fall through to report -- actual error message was
- * logged by StartAutoVacWorker
+ * logged by Start...Worker
*/
(void) ReleasePostmasterChildSlot(bn->child_slot);
free(bn);
@@ -4643,11 +4678,25 @@ StartAutovacuumWorker(void)
* quick succession between the autovac launcher and postmaster in case
* things get ugly.
*/
- if (AutoVacPID != 0)
+ if (is_autovacuum && AutoVacPID != 0)
{
AutoVacWorkerFailed();
avlauncher_needs_signal = true;
}
+
+ return 0;
+}
+
+static void
+StartBackgroundWorkers(void)
+{
+ int i;
+
+ for (i = 0; i < NWorkers; i++)
+ {
+ if (BgWorkerPID[i] == 0)
+ BgWorkerPID[i] = StartWorker(false);
+ }
}
/*
@@ -4687,7 +4736,7 @@ CreateOptsFile(int argc, char *argv[], char *fullprogname)
*
* This reports the number of entries needed in per-child-process arrays
* (the PMChildFlags array, and if EXEC_BACKEND the ShmemBackendArray).
- * These arrays include regular backends, autovac workers and walsenders,
+ * These arrays include regular backends, all workers and walsenders,
* but not special children nor dead_end children. This allows the arrays
* to have a fixed maximum size, to wit the same too-many-children limit
* enforced by canAcceptConnections(). The exact value isn't too critical
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 51b6df5..5aead05 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -56,6 +56,7 @@
#include "parser/analyze.h"
#include "parser/parser.h"
#include "postmaster/autovacuum.h"
+#include "postmaster/bgworker.h"
#include "postmaster/postmaster.h"
#include "replication/walsender.h"
#include "rewrite/rewriteHandler.h"
@@ -2841,6 +2842,10 @@ ProcessInterrupts(void)
ereport(FATAL,
(errcode(ERRCODE_ADMIN_SHUTDOWN),
errmsg("terminating autovacuum process due to administrator command")));
+ else if (IsWorkerProcess())
+ ereport(FATAL,
+ (errcode(ERRCODE_ADMIN_SHUTDOWN),
+ errmsg("terminating worker process due to administrator command")));
else if (RecoveryConflictPending && RecoveryConflictRetryable)
{
pgstat_report_recovery_conflict(RecoveryConflictReason);
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index fb376a0..f7ae60a 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -33,6 +33,7 @@
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "postmaster/autovacuum.h"
+#include "postmaster/bgworker.h"
#include "postmaster/postmaster.h"
#include "storage/fd.h"
#include "storage/ipc.h"
@@ -498,9 +499,9 @@ InitializeSessionUserIdStandalone(void)
{
/*
* This function should only be called in single-user mode and in
- * autovacuum workers.
+ * autovacuum or background workers.
*/
- AssertState(!IsUnderPostmaster || IsAutoVacuumWorkerProcess());
+ AssertState(!IsUnderPostmaster || IsAutoVacuumWorkerProcess() || IsWorkerProcess());
/* call only once */
AssertState(!OidIsValid(AuthenticatedUserId));
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 1baa67d..3208b5e7 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -36,6 +36,7 @@
#include "pgstat.h"
#include "postmaster/autovacuum.h"
#include "postmaster/postmaster.h"
+#include "postmaster/bgworker.h"
#include "replication/walsender.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
@@ -584,7 +585,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
* In standalone mode and in autovacuum worker processes, we use a fixed
* ID, otherwise we figure it out from the authenticated user name.
*/
- if (bootstrap || IsAutoVacuumWorkerProcess())
+ if (bootstrap || IsAutoVacuumWorkerProcess() || IsWorkerProcess())
{
InitializeSessionUserIdStandalone();
am_superuser = true;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index b756e58..93c798b 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -52,6 +52,7 @@
#include "parser/scansup.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
+#include "postmaster/bgworker.h"
#include "postmaster/bgwriter.h"
#include "postmaster/postmaster.h"
#include "postmaster/syslogger.h"
@@ -107,7 +108,7 @@
* removed, we still could not exceed INT_MAX/4 because some places compute
* 4*MaxBackends without any overflow check. This is rechecked in
* check_maxconnections, since MaxBackends is computed as MaxConnections
- * plus autovacuum_max_workers plus one (for the autovacuum launcher).
+ * plus max_workers plus autovacuum_max_workers plus one (for the autovacuum launcher).
*/
#define MAX_BACKENDS 0x7fffff
@@ -197,6 +198,8 @@ static const char *show_tcp_keepalives_interval(void);
static const char *show_tcp_keepalives_count(void);
static bool check_maxconnections(int *newval, void **extra, GucSource source);
static void assign_maxconnections(int newval, void *extra);
+static bool check_maxworkers(int *newval, void **extra, GucSource source);
+static void assign_maxworkers(int newval, void *extra);
static bool check_autovacuum_max_workers(int *newval, void **extra, GucSource source);
static void assign_autovacuum_max_workers(int newval, void *extra);
static bool check_effective_io_concurrency(int *newval, void **extra, GucSource source);
@@ -1605,6 +1608,16 @@ static struct config_int ConfigureNamesInt[] =
},
{
+ {"max_workers", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
+ gettext_noop("Sets the maximum number of background worker processes."),
+ NULL
+ },
+ &MaxWorkers,
+ 10, 1, MAX_BACKENDS,
+ check_maxworkers, assign_maxworkers, NULL
+ },
+
+ {
{"superuser_reserved_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
gettext_noop("Sets the number of connection slots reserved for superusers."),
NULL
@@ -8605,7 +8618,7 @@ show_tcp_keepalives_count(void)
static bool
check_maxconnections(int *newval, void **extra, GucSource source)
{
- if (*newval + autovacuum_max_workers + 1 > MAX_BACKENDS)
+ if (*newval + MaxWorkers + autovacuum_max_workers + 1 > MAX_BACKENDS)
return false;
return true;
}
@@ -8613,13 +8626,27 @@ check_maxconnections(int *newval, void **extra, GucSource source)
static void
assign_maxconnections(int newval, void *extra)
{
- MaxBackends = newval + autovacuum_max_workers + 1;
+ MaxBackends = newval + MaxWorkers + autovacuum_max_workers + 1;
+}
+
+static bool
+check_maxworkers(int *newval, void **extra, GucSource source)
+{
+ if (*newval + MaxConnections + autovacuum_max_workers + 1 > MAX_BACKENDS)
+ return false;
+ return true;
+}
+
+static void
+assign_maxworkers(int newval, void *extra)
+{
+ MaxBackends = newval + MaxConnections + autovacuum_max_workers + 1;
}
static bool
check_autovacuum_max_workers(int *newval, void **extra, GucSource source)
{
- if (MaxConnections + *newval + 1 > MAX_BACKENDS)
+ if (MaxConnections + MaxWorkers + *newval + 1 > MAX_BACKENDS)
return false;
return true;
}
@@ -8627,7 +8654,7 @@ check_autovacuum_max_workers(int *newval, void **extra, GucSource source)
static void
assign_autovacuum_max_workers(int newval, void *extra)
{
- MaxBackends = MaxConnections + newval + 1;
+ MaxBackends = MaxConnections + MaxWorkers + newval + 1;
}
static bool
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index fa75d00..ce3fc08 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -148,6 +148,10 @@
#bgwriter_lru_maxpages = 100 # 0-1000 max buffers written/round
#bgwriter_lru_multiplier = 2.0 # 0-10.0 multipler on buffers scanned/round
+# - Background Workers -
+#max_workers = 10 # max number of general worker subprocesses
+ # (change requires restart)
+
# - Asynchronous Behavior -
#effective_io_concurrency = 1 # 1-1000; 0 disables prefetching
diff --git a/src/include/postmaster/bgworker.h b/src/include/postmaster/bgworker.h
new file mode 100644
index 0000000..92d0a75
--- /dev/null
+++ b/src/include/postmaster/bgworker.h
@@ -0,0 +1,29 @@
+/*-------------------------------------------------------------------------
+ *
+ * bgworker.h
+ * header file for integrated background worker daemon
+ *
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/postmaster/bgworker.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef BGWORKER_H
+#define BGWORKER_H
+
+
+/* GUC variables */
+extern int MaxWorkers;
+
+extern int StartBgWorker(void);
+extern int NumBgWorkers(void);
+
+extern bool IsWorkerProcess(void);
+extern void RequestAddinBgWorkerProcess(const char *WorkerName,
+ void *Main,
+ const char *DBname);
+
+#endif /* BGWORKER_H */
--
1.7.10.rc3.3.g19a6c.dirty
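As an illustration of the accounting the guc.c changes above perform: each check/assign hook now sums max_connections, max_workers and autovacuum_max_workers, adds one slot for the autovacuum launcher, and rejects a setting that pushes the total past MAX_BACKENDS. A minimal standalone sketch of that arithmetic follows. The sketch_* names are made up for this mail and are not part of the patch; the sum is computed in a long only so the sketch itself cannot overflow.

```c
#include <assert.h>
#include <stdbool.h>

/* Same ceiling as MAX_BACKENDS in guc.c. */
#define SKETCH_MAX_BACKENDS 0x7fffff

/*
 * Hypothetical helper: total number of backend slots, mirroring what the
 * assign hooks store in MaxBackends. The trailing +1 is the autovacuum
 * launcher.
 */
long
sketch_total_backends(int max_connections, int max_workers,
                      int autovacuum_max_workers)
{
    assert(max_connections >= 0);
    assert(max_workers >= 0);
    assert(autovacuum_max_workers >= 0);

    return (long) max_connections + max_workers + autovacuum_max_workers + 1;
}

/*
 * Hypothetical helper: the test each check hook applies before accepting
 * a new value for any of the three settings.
 */
bool
sketch_within_limit(int max_connections, int max_workers,
                    int autovacuum_max_workers)
{
    return sketch_total_backends(max_connections, max_workers,
                                 autovacuum_max_workers) <= SKETCH_MAX_BACKENDS;
}
```

With max_connections = 100, max_workers = 10 and autovacuum_max_workers = 3 the total is 114, well within the limit. Because all three hooks share the same sum, whichever setting is processed last produces the same MaxBackends.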
From: Andres Freund <andres@anarazel.de>
Features:
- streaming reading/writing
- filtering
- reassembly of records
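To make "streaming reading/writing" a bit more concrete: before consuming anything, the reader checks that enough input is available before endptr and that the output buffer still has room below MAX_SEND_SIZE. If either check fails it returns and asks the caller for more input or for the output to be drained. A rough sketch of the two checks follows. It flattens XLogRecPtr into a single 64-bit byte position, and both the SketchReader struct and the value of SKETCH_MAX_SEND_SIZE are assumptions made for illustration only.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Assumption: stands in for MAX_SEND_SIZE; the real value comes from walprotocol.h. */
#define SKETCH_MAX_SEND_SIZE ((size_t) 8192 * 16)

/* Hypothetical, simplified stand-in for the position fields of XLogReaderState. */
typedef struct SketchReader
{
    uint64_t curptr;   /* next byte to be read */
    uint64_t endptr;   /* end of the data currently available */
    size_t   nbytes;   /* bytes already queued for output */
} SketchReader;

/*
 * Mirrors XLogReaderHasInput(): refuse if advancing by 'size' would reach
 * or pass endptr. Note the comparison is strict, as in the patch.
 */
bool
sketch_has_input(const SketchReader *r, size_t size)
{
    assert(r != NULL);
    return r->curptr + size < r->endptr;
}

/* Mirrors XLogReaderHasOutput(): refuse if the output buffer would overflow. */
bool
sketch_has_output(const SketchReader *r, size_t size)
{
    assert(r != NULL);
    return r->nbytes + size <= SKETCH_MAX_SEND_SIZE;
}

/* Mirrors XLogReaderHasSpace(): both conditions at once. */
bool
sketch_has_space(const SketchReader *r, size_t size)
{
    return sketch_has_input(r, size) && sketch_has_output(r, size);
}
```

A caller would loop: feed input until sketch_has_input() passes, flush output whenever sketch_has_output() fails, and only then copy the next chunk.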
Reusing the ReadRecord infrastructure from code that is not tightly integrated
into xlog.c is rather hard. It would require changes to integral parts of the
recovery code, which doesn't seem to be a good idea.
Missing:
- "compressing" the stream when removing uninteresting records
- writing out correct CRCs
- validating CRCs
- separating reader/writer
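Independent of the missing pieces listed above, the reader in the diff below relies on two bits of offset arithmetic: it rewinds the start position to the beginning of its WAL page, so that the page header can be interpreted, and between records it skips padding up to the next maximally aligned offset. A small sketch of both follows. The constants are assumptions (8 kB WAL blocks, 8-byte maximum alignment, as on typical 64-bit builds), and the sketch_* names are hypothetical.

```c
#include <assert.h>
#include <stdint.h>

/* Assumptions: default XLOG_BLCKSZ and a typical MAXIMUM_ALIGNOF. */
#define SKETCH_XLOG_BLCKSZ 8192u
#define SKETCH_MAXALIGN    8u

/* Round an offset down to the start of the WAL page that contains it. */
uint32_t
sketch_page_start(uint32_t xrecoff)
{
    return xrecoff - xrecoff % SKETCH_XLOG_BLCKSZ;
}

/*
 * Number of padding bytes between 'xrecoff' and the next aligned offset,
 * i.e. what MAXALIGN(xrecoff) - xrecoff computes in the patch.
 */
uint32_t
sketch_align_skip(uint32_t xrecoff)
{
    uint32_t aligned;

    /* The mask trick below only works for power-of-two alignments. */
    assert((SKETCH_MAXALIGN & (SKETCH_MAXALIGN - 1u)) == 0);

    aligned = (xrecoff + (SKETCH_MAXALIGN - 1u)) & ~(SKETCH_MAXALIGN - 1u);
    return aligned - xrecoff;
}
```

For example, an offset of 3 * 8192 + 100 rewinds to 24576, and a record ending at offset 13 is followed by 3 bytes of padding before the next record can start at 16.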
---
src/backend/access/transam/Makefile | 2 +-
src/backend/access/transam/xlogreader.c | 914 +++++++++++++++++++++++++++++++
src/include/access/xlogreader.h | 173 ++++++
3 files changed, 1088 insertions(+), 1 deletion(-)
create mode 100644 src/backend/access/transam/xlogreader.c
create mode 100644 src/include/access/xlogreader.h
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index f82f10e..660b5fc 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -13,7 +13,7 @@ top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = clog.o transam.o varsup.o xact.o rmgr.o slru.o subtrans.o multixact.o \
- twophase.o twophase_rmgr.o xlog.o xlogfuncs.o xlogutils.o
+ twophase.o twophase_rmgr.o xlog.o xlogfuncs.o xlogreader.o xlogutils.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
new file mode 100644
index 0000000..6f15d66
--- /dev/null
+++ b/src/backend/access/transam/xlogreader.c
@@ -0,0 +1,914 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogreader.c
+ *
+ * A somewhat generic xlog read interface
+ *
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/access/transam/xlogreader.c
+ *
+ *-------------------------------------------------------------------------
+ *
+ * FIXME:
+ * * CRC computation
+ * * separation of reader/writer
+ */
+
+#include "postgres.h"
+
+#include "access/xlog_internal.h"
+#include "access/transam.h"
+#include "catalog/pg_control.h"
+#include "access/xlogreader.h"
+
+/* FIXME */
+#include "replication/walsender_private.h"
+#include "replication/walprotocol.h"
+
+//#define VERBOSE_DEBUG
+
+XLogReaderState* XLogReaderAllocate(void)
+{
+ XLogReaderState* state = (XLogReaderState*)malloc(sizeof(XLogReaderState));
+ int i;
+
+ if (!state)
+ goto oom;
+
+ memset(&state->buf.record, 0, sizeof(XLogRecord));
+ state->buf.record_data_size = XLOG_BLCKSZ*8;
+ state->buf.record_data =
+ malloc(state->buf.record_data_size);
+
+ if (!state->buf.record_data)
+ goto oom;
+
+ if (!state)
+ goto oom;
+
+ for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
+ {
+ state->buf.bkp_block_data[i] =
+ malloc(BLCKSZ);
+
+ if (!state->buf.bkp_block_data[i])
+ goto oom;
+ }
+ XLogReaderReset(state);
+ return state;
+
+oom:
+ elog(ERROR, "could not allocate memory for XLogReaderState");
+ return 0;
+}
+
+void XLogReaderReset(XLogReaderState* state)
+{
+ state->in_record = false;
+ state->in_bkp_blocks = 0;
+ state->in_bkp_block_header = false;
+ state->in_skip = false;
+ state->remaining_size = 0;
+ state->nbytes = 0;
+ state->incomplete = false;
+ state->initialized = false;
+ state->needs_input = false;
+ state->needs_output = false;
+}
+
+static inline bool
+XLogReaderHasInput(XLogReaderState* state, Size size)
+{
+ XLogRecPtr tmp = state->curptr;
+ XLByteAdvance(tmp, size);
+ if (XLByteLE(state->endptr, tmp))
+ return false;
+ return true;
+}
+
+static inline bool
+XLogReaderHasOutput(XLogReaderState* state, Size size){
+ if (state->nbytes + size > MAX_SEND_SIZE)
+ return false;
+ return true;
+}
+
+static inline bool
+XLogReaderHasSpace(XLogReaderState* state, Size size)
+{
+ XLogRecPtr tmp = state->curptr;
+ XLByteAdvance(tmp, size);
+ if (XLByteLE(state->endptr, tmp))
+ return false;
+ else if (state->nbytes + size > MAX_SEND_SIZE)
+ return false;
+ return true;
+}
+
+void
+XLogReaderRead(XLogReaderState* state)
+{
+ XLogRecord* temp_record;
+
+ state->needs_input = false;
+ state->needs_output = false;
+
+ /*
+ * Do some basic sanity checking and setup if we're starting anew.
+ */
+ if (!state->initialized)
+ {
+ state->initialized = true;
+ /*
+ * we need to start reading at the beginning of the page to understand
+ * what we are currently reading. We will skip over that because we
+ * check curptr < startptr later.
+ */
+ state->curptr.xrecoff = state->curptr.xrecoff - state->curptr.xrecoff % XLOG_BLCKSZ;
+ Assert(state->curptr.xrecoff % XLOG_BLCKSZ == 0);
+ elog(LOG, "start reading from %X/%X, scrolled back to %X/%X",
+ state->startptr.xlogid, state->startptr.xrecoff,
+ state->curptr.xlogid, state->curptr.xrecoff);
+
+ }
+ else
+ {
+ /*
+ * We didn't finish reading the last time round. Since then new data
+ * could have been appended to the current page. So we need to update
+ * our copy of that.
+ *
+ * XXX: We could tie that to state->needs_input but that doesn't seem
+ * worth the complication atm.
+ */
+ XLogRecPtr rereadptr = state->curptr;
+ rereadptr.xrecoff -= rereadptr.xrecoff % XLOG_BLCKSZ;
+
+ XLByteAdvance(rereadptr, SizeOfXLogShortPHD);
+
+ if(!XLByteLE(rereadptr, state->endptr))
+ goto not_enough_input;
+
+ rereadptr.xrecoff -= rereadptr.xrecoff % XLOG_BLCKSZ;
+
+ state->read_page(state, state->cur_page, rereadptr);
+
+ state->page_header = (XLogPageHeader)state->cur_page;
+ state->page_header_size = XLogPageHeaderSize(state->page_header);
+
+ }
+
+#ifdef VERBOSE_DEBUG
+ elog(LOG, "starting reading for %X from %X",
+ state->startptr.xrecoff, state->curptr.xrecoff);
+#endif
+ while (XLByteLT(state->curptr, state->endptr))
+ {
+ uint32 len_in_block;
+ /* did we read a partial xlog record due to input/output constraints */
+ bool partial_read = false;
+ bool partial_write = false;
+
+#ifdef VERBOSE_DEBUG
+ elog(LOG, "one loop start: record: %u skip: %u bkb_block: %d in_bkp_header: %u xrecoff: %X/%X remaining: %u, off: %u",
+ state->in_record, state->in_skip,
+ state->in_bkp_blocks, state->in_bkp_block_header,
+ state->curptr.xlogid, state->curptr.xrecoff,
+ state->remaining_size,
+ state->curptr.xrecoff % XLOG_BLCKSZ);
+#endif
+
+ if (state->curptr.xrecoff % XLOG_BLCKSZ == 0)
+ {
+#ifdef VERBOSE_DEBUG
+ elog(LOG, "reading page header, at %X/%X",
+ state->curptr.xlogid, state->curptr.xrecoff);
+#endif
+ /* check whether we can read enough to see the short header */
+ if (!XLogReaderHasInput(state, SizeOfXLogShortPHD))
+ goto not_enough_input;
+
+ state->read_page(state, state->cur_page, state->curptr);
+ state->page_header = (XLogPageHeader)state->cur_page;
+ state->page_header_size = XLogPageHeaderSize(state->page_header);
+
+ /* check whether we have enough to read/write the full header */
+ if (!XLogReaderHasInput(state, state->page_header_size))
+ goto not_enough_input;
+
+ /* write out the page header only if we're somewhere interesting */
+ if (!XLByteLT(state->curptr, state->startptr))
+ {
+ if (!XLogReaderHasOutput(state, state->page_header_size))
+ goto not_enough_output;
+
+ state->writeout_data(state, state->cur_page, state->page_header_size);
+ }
+
+ XLByteAdvance(state->curptr, state->page_header_size);
+
+ if (XLByteLT(state->curptr, state->startptr))
+ {
+ /* don't interpret anything if we're before the start point */
+ }
+ else if (state->page_header->xlp_info & XLP_FIRST_IS_CONTRECORD)
+ {
+ XLogContRecord* temp_contrecord;
+
+ if(!XLogReaderHasInput(state, SizeOfXLogContRecord))
+ goto not_enough_input;
+
+ if(!XLogReaderHasOutput(state, SizeOfXLogContRecord))
+ goto not_enough_output;
+
+ temp_contrecord =
+ (XLogContRecord*)(state->cur_page
+ + state->curptr.xrecoff % XLOG_BLCKSZ);
+
+
+ state->writeout_data(state, (char*)temp_contrecord, SizeOfXLogContRecord);
+
+ XLByteAdvance(state->curptr, SizeOfXLogContRecord);
+
+ if (!state->in_record)
+ {
+ /* we need to support this case for initializing a cluster... */
+ elog(WARNING, "contrecord although we're not in a record at %X/%X, starting at %X/%X",
+ state->curptr.xlogid, state->curptr.xrecoff,
+ state->startptr.xlogid, state->startptr.xrecoff);
+ state->in_record = true;
+ state->in_skip = true;
+ state->remaining_size = temp_contrecord->xl_rem_len;
+ continue;
+ }
+
+
+ if(temp_contrecord->xl_rem_len < state->remaining_size)
+ elog(PANIC, "remaining length is smaller than to be read data: %u %u",
+ temp_contrecord->xl_rem_len, state->remaining_size
+ );
+
+ }
+ else
+ {
+ if (state->in_record)
+ {
+ elog(PANIC, "no contrecord although we're in a record");
+ }
+ }
+ }
+
+ if (!state->in_record)
+ {
+ /*
+ * a record must be stored aligned. So skip as far as we need to
+ * comply with that.
+ */
+ Size skiplen;
+ skiplen = MAXALIGN(state->curptr.xrecoff)
+ - state->curptr.xrecoff;
+
+ if (skiplen)
+ {
+ if (!XLogReaderHasSpace(state, skiplen))
+ {
+#ifdef VERBOSE_DEBUG
+ elog(LOG, "not aligning bc of space");
+#endif
+ /*
+ * We don't have enough space to read/write the alignment
+ * bytes, so fake up a skip-state
+ */
+ state->in_record = true;
+ state->in_skip = true;
+ state->remaining_size = skiplen;
+
+ if (!XLogReaderHasInput(state, skiplen))
+ goto not_enough_input;
+ goto not_enough_output;
+ }
+#ifdef VERBOSE_DEBUG
+ elog(LOG, "aligning from %X/%X to %X/%X",
+ state->curptr.xlogid, state->curptr.xrecoff,
+ state->curptr.xlogid, state->curptr.xrecoff + (uint32)skiplen);
+#endif
+ if (!XLByteLT(state->curptr, state->startptr))
+ state->writeout_data(state, NULL, skiplen);
+ XLByteAdvance(state->curptr, skiplen);
+ }
+ }
+
+ /* skip until we reach the part of the page we're interested in */
+ if (XLByteLT(state->curptr, state->startptr))
+ {
+
+ if (state->in_skip)
+ {
+ /* the code already handles that, we expect a contrecord */
+ }
+ else if ((state->curptr.xrecoff % XLOG_BLCKSZ) == state->page_header_size &&
+ state->page_header->xlp_info & XLP_FIRST_IS_CONTRECORD)
+ {
+
+ XLogContRecord* temp_contrecord = (XLogContRecord*)
+ (state->cur_page + state->curptr.xrecoff % XLOG_BLCKSZ);
+
+ /*
+ * we know we have enough space here because we didn't start
+ * writing out data yet because we're < startptr
+ */
+ Assert(XLogReaderHasSpace(state, SizeOfXLogContRecord));
+
+ XLByteAdvance(state->curptr, SizeOfXLogContRecord);
+
+#ifdef VERBOSE_DEBUG
+ elog(LOG, "skipping contrecord before start");
+#endif
+ state->in_skip = true;
+ state->in_record = true;
+ state->in_bkp_blocks = 0;
+ state->remaining_size = temp_contrecord->xl_rem_len;
+ }
+ else
+ {
+ Assert(!state->in_record);
+
+ /* read how much space we have left on the current page */
+ if(state->curptr.xrecoff % XLOG_BLCKSZ == 0)
+ len_in_block = 0;
+ else
+ len_in_block = XLOG_BLCKSZ - state->curptr.xrecoff % XLOG_BLCKSZ;
+
+ if(len_in_block < SizeOfXLogRecord)
+ {
+ XLByteAdvance(state->curptr, len_in_block);
+ continue;
+ }
+
+ /*
+ * now read the record information and start skipping till the
+ * record is over
+ */
+ temp_record = (XLogRecord*)(state->cur_page + (state->curptr.xrecoff % XLOG_BLCKSZ));
+
+#ifdef VERBOSE_DEBUG
+ elog(LOG, "skipping record before start %lu, tot %u at %X/%X off %d ",
+ temp_record->xl_tot_len - SizeOfXLogRecord,
+ temp_record->xl_tot_len,
+ state->curptr.xlogid, state->curptr.xrecoff,
+ state->curptr.xrecoff % XLOG_BLCKSZ);
+#endif
+
+ Assert(XLogReaderHasSpace(state, SizeOfXLogRecord));
+
+ XLByteAdvance(state->curptr, SizeOfXLogRecord);
+
+ state->in_skip = true;
+ state->in_record = true;
+ state->in_bkp_blocks = 0;
+ state->remaining_size = temp_record->xl_tot_len
+ - SizeOfXLogRecord;
+ }
+ }
+
+ /*
+ * ----------------------------------------
+ * start to read a record
+ *
+ * This will only happen if we're already behind state->startptr
+ * ----------------------------------------
+ */
+ if (!state->in_record)
+ {
+ /*
+ * if we're at the beginning of a page (after the page header) it
+ * could be that we're starting in a continuation of an earlier
+ * record. It's debatable whether that's a valid use-case. Support it
+ * for now but cry loudly.
+ */
+ if ((state->curptr.xrecoff % XLOG_BLCKSZ) == state->page_header_size &&
+ state->page_header->xlp_info & XLP_FIRST_IS_CONTRECORD)
+ {
+ XLogContRecord* temp_contrecord = (XLogContRecord*)
+ (state->cur_page + state->curptr.xrecoff % XLOG_BLCKSZ);
+
+ if (!XLogReaderHasInput(state, SizeOfXLogContRecord))
+ goto not_enough_input;
+
+ if (!XLogReaderHasOutput(state, SizeOfXLogContRecord))
+ goto not_enough_output;
+
+ state->writeout_data(state,
+ (char*)temp_contrecord,
+ SizeOfXLogContRecord);
+ XLByteAdvance(state->curptr, SizeOfXLogContRecord);
+
+ elog(PANIC, "hum, ho, first is contrecord, but trying to read the record afterwards %X/%X",
+ state->curptr.xlogid, state->curptr.xrecoff);
+
+ state->in_skip = true;
+ state->in_record = true;
+ state->in_bkp_blocks = 0;
+ state->remaining_size = temp_contrecord->xl_rem_len;
+ continue;
+ }
+
+ /* read how much space we have left on the current page */
+ if (state->curptr.xrecoff % XLOG_BLCKSZ == 0)
+ len_in_block = 0;
+ else
+ len_in_block = XLOG_BLCKSZ - state->curptr.xrecoff % XLOG_BLCKSZ;
+
+ /* if there is not enough space for the xlog header, skip to next page */
+ if (len_in_block < SizeOfXLogRecord)
+ {
+
+ if (!XLogReaderHasInput(state, len_in_block))
+ goto not_enough_input;
+
+ if (!XLogReaderHasOutput(state, len_in_block))
+ goto not_enough_output;
+
+ state->writeout_data(state,
+ NULL,
+ len_in_block);
+
+ XLByteAdvance(state->curptr, len_in_block);
+ continue;
+ }
+
+ temp_record = (XLogRecord*)(state->cur_page + (state->curptr.xrecoff % XLOG_BLCKSZ));
+
+ /*
+ * we quickly lose the original address of a record as we can skip
+ * records and such, so keep the original addresses.
+ */
+ state->buf.origptr = state->curptr;
+
+ /* we write out data as soon as we know whether we're writing out something sensible */
+ XLByteAdvance(state->curptr, SizeOfXLogRecord);
+
+ /* ----------------------------------------
+ * normally we don't look at the content of xlog records here,
+ * XLOG_SWITCH is a special case though, as everything left in that
+ * segment won't be sensible content.
+ * So skip to the next segment. For that we currently simply leave
+ * the loop as we don't have any mechanism to communicate that
+ * behaviour otherwise.
+ * ----------------------------------------
+ */
+ if (temp_record->xl_rmid == RM_XLOG_ID
+ && (temp_record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
+ {
+
+ /*
+ * writeout data so that this gap makes sense in the written
+ * out data
+ */
+ state->writeout_data(state,
+ (char*)temp_record,
+ SizeOfXLogRecord);
+
+ /*
+ * Pretend the current data extends to end of segment
+ *
+ * FIXME: This logic is copied from xlog.c but seems to
+ * disregard xrecoff wrapping around to the next xlogid?
+ */
+ state->curptr.xrecoff += XLogSegSize - 1;
+ state->curptr.xrecoff -= state->curptr.xrecoff % XLogSegSize;
+
+ state->in_record = false;
+ state->in_bkp_blocks = 0;
+ state->in_skip = false;
+ goto out;
+ }
+ /* ----------------------------------------
+ * Ok, we found interesting data. That means we need to do the full
+ * deal, reading the record, reading the BKP blocks afterward and
+ * then hand off the record to be processed.
+ * ----------------------------------------
+ */
+ else if (state->is_record_interesting(state, temp_record))
+ {
+ /*
+ * the rest of the record might be on another page so we need a
+ * copy instead just pointing into the current page.
+ */
+ memcpy(&state->buf.record,
+ temp_record,
+ sizeof(XLogRecord));/* really needs sizeof(XLogRecord) */
+
+ state->writeout_data(state,
+ (char*)temp_record,
+ SizeOfXLogRecord);
+ /*
+ * read till the record itself finished, after that we will
+ * continue with the bkp blocks et al
+ */
+ state->remaining_size = temp_record->xl_len;
+
+ state->in_record = true;
+ state->in_bkp_blocks = 0;
+ state->in_skip = false;
+
+#ifdef VERBOSE_DEBUG
+ elog(LOG, "found record at %X/%X, tx %u, rmid %hhu, len %u tot %u",
+ state->buf.origptr.xlogid, state->buf.origptr.xrecoff,
+ temp_record->xl_xid, temp_record->xl_rmid, temp_record->xl_len,
+ temp_record->xl_tot_len);
+#endif
+ }
+ /* ----------------------------------------
+ * ok, everybody agrees, the contents of the current record are
+ * just plain boring. So fake-up a record that replaces it by a
+ * NOOP record.
+ *
+ * FIXME: we should allow "compressing" the output here. That is
+ * write something that shows how long the record should be if
+ * everything is decompressed again. This can radically reduce
+ * space-usage over the wire.
+ * It could also be very useful for traditional SR by removing
+ * unneeded BKP blocks from being transferred.
+ * For that we would need to recompute CRCs though, which we
+ * currently don't support.
+ * ----------------------------------------
+ */
+ else
+ {
+ /*
+ * we need to fix up a fake record with correct length that can
+ * be written out.
+ */
+ /* needs space for padding to SizeOfXLogRecord */
+ XLogRecord spacer;
+
+ /*
+ * xl_tot_len contains the size of the XLogRecord itself, we
+ * read that already though.
+ */
+ state->remaining_size = temp_record->xl_tot_len
+ - SizeOfXLogRecord;
+
+ state->in_record = true;
+ state->in_bkp_blocks = 0;
+ state->in_skip = true;
+
+ /* FIXME: fixup the xl_prev of the next record */
+ spacer.xl_prev = state->buf.origptr;
+ spacer.xl_xid = InvalidTransactionId;
+ spacer.xl_tot_len = temp_record->xl_tot_len;
+ spacer.xl_len = temp_record->xl_tot_len - SizeOfXLogRecord;
+ spacer.xl_rmid = RM_XLOG_ID;
+ spacer.xl_info = XLOG_NOOP;
+
+ state->writeout_data(state,
+ (char*)&spacer,
+ SizeOfXLogRecord);
+ }
+ }
+ /*
+ * We read an interesting page and now want the BKP
+ * blocks. Unfortunately a bkp header is stored unaligned and can be
+ * split across pages. So we copy it to a bit more permanent location.
+ */
+ else if (state->in_bkp_blocks > 0
+ && state->remaining_size == 0)
+ {
+ Assert(!state->in_bkp_block_header);
+ Assert(state->buf.record.xl_info &
+ XLR_SET_BKP_BLOCK(XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks));
+
+ state->in_bkp_block_header = true;
+ state->remaining_size = sizeof(BkpBlock);
+ /* in_bkp_blocks will be changed upon completion */
+ state->in_skip = false;
+ }
+
+ Assert(state->in_record);
+
+ /* compute how much space on the current page is left */
+ if (state->curptr.xrecoff % XLOG_BLCKSZ == 0)
+ len_in_block = 0;
+ else
+ len_in_block = XLOG_BLCKSZ - state->curptr.xrecoff % XLOG_BLCKSZ;
+
+ /* we have more data available than we need, so read only as much as needed */
+ if(len_in_block > state->remaining_size)
+ len_in_block = state->remaining_size;
+
+ /*
+ * Handle constraints set by endptr and the size of the output buffer.
+ *
+ * Normally we use XLogReaderHasSpace for that, but that's not
+ * convenient because we want to read data in parts. So, open-code the
+ * logic for that here.
+ */
+ if (state->curptr.xlogid == state->endptr.xlogid &&
+ state->curptr.xrecoff + len_in_block > state->endptr.xrecoff)
+ {
+ Size cur_len = len_in_block;
+ len_in_block = state->endptr.xrecoff - state->curptr.xrecoff;
+ partial_read = true;
+ elog(LOG, "truncating len_in_block due to endptr %X/%X %lu to %i at %X/%X",
+ state->endptr.xlogid, state->endptr.xrecoff,
+ cur_len, len_in_block,
+ state->curptr.xlogid, state->curptr.xrecoff);
+ }
+ else if (len_in_block > (MAX_SEND_SIZE - state->nbytes))
+ {
+ Size cur_len = len_in_block;
+ len_in_block = MAX_SEND_SIZE - state->nbytes;
+ partial_write = true;
+ elog(LOG, "truncating len_in_block due to nbytes %lu to %i",
+ cur_len, len_in_block);
+ }
+
+ /* ----------------------------------------
+ * copy data to whatever we're currently reading.
+ * ----------------------------------------
+ */
+
+ /* nothing to copy if we're skipping */
+ if (state->in_skip)
+ {
+ /* writeout zero data */
+ if (!XLByteLT(state->curptr, state->startptr))
+ state->writeout_data(state, NULL, len_in_block);
+ }
+ /* copy data into the current bkp block */
+ else if (state->in_bkp_block_header)
+ {
+ int blockno = XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks;
+ BkpBlock* bkpb = &state->buf.bkp_block[blockno];
+ Assert(state->in_bkp_blocks);
+
+ memcpy((char*)bkpb + sizeof(BkpBlock) - state->remaining_size,
+ state->cur_page + state->curptr.xrecoff % XLOG_BLCKSZ,
+ len_in_block);
+
+ state->writeout_data(state,
+ state->cur_page + state->curptr.xrecoff % XLOG_BLCKSZ,
+ len_in_block);
+
+#ifdef VERBOSE_DEBUG
+ elog(LOG, "copying bkp header for %d of %u complete %lu at %X/%X rem %u",
+ blockno, len_in_block, sizeof(BkpBlock),
+ state->curptr.xlogid, state->curptr.xrecoff,
+ state->remaining_size);
+ if (state->remaining_size == len_in_block)
+ {
+ elog(LOG, "block off %u len %u", bkpb->hole_offset, bkpb->hole_length);
+ }
+#endif
+ }
+ else if (state->in_bkp_blocks)
+ {
+ int blockno = XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks;
+ BkpBlock* bkpb = &state->buf.bkp_block[blockno];
+ char* data = state->buf.bkp_block_data[blockno];
+
+ memcpy(data + BLCKSZ - bkpb->hole_length - state->remaining_size,
+ state->cur_page + state->curptr.xrecoff % XLOG_BLCKSZ,
+ len_in_block);
+
+ state->writeout_data(state,
+ state->cur_page + state->curptr.xrecoff % XLOG_BLCKSZ,
+ len_in_block);
+#ifdef VERBOSE_DEBUG
+ elog(LOG, "copying data for %d of %u complete %u",
+ blockno, len_in_block, state->remaining_size);
+#endif
+ }
+ /* read the (rest) of the XLogRecord's data */
+ else if (state->in_record)
+ {
+ if(state->buf.record_data_size < state->buf.record.xl_len){
+ state->buf.record_data_size = state->buf.record.xl_len;
+ state->buf.record_data =
+ realloc(state->buf.record_data, state->buf.record_data_size);
+ }
+
+ memcpy(state->buf.record_data
+ + state->buf.record.xl_len
+ - state->remaining_size,
+ state->cur_page + state->curptr.xrecoff % XLOG_BLCKSZ,
+ len_in_block);
+
+ state->writeout_data(state,
+ state->cur_page + state->curptr.xrecoff % XLOG_BLCKSZ,
+ len_in_block);
+ }
+
+ /* should handle wrapping around to next page */
+ XLByteAdvance(state->curptr, len_in_block);
+
+ state->remaining_size -= len_in_block;
+
+ /*
+ * ----------------------------------------
+ * we completed whatever we were reading. So, handle going to the next
+ * state.
+ * ----------------------------------------
+ */
+
+ if (state->remaining_size == 0)
+ {
+ /*
+ * in the in_skip case we already read backup blocks, so everything
+ * is finished.
+ */
+ if (state->in_skip)
+ {
+ state->in_record = false;
+ state->in_bkp_blocks = 0;
+ state->in_skip = false;
+ /* alignment is handled when starting to read a record */
+ }
+ /*
+ * We read the header of the current block. Start reading the
+ * content of that now.
+ */
+ else if (state->in_bkp_block_header)
+ {
+ BkpBlock* bkpb;
+ int blockno = XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks;
+
+ Assert(state->in_bkp_blocks);
+
+ bkpb = &state->buf.bkp_block[blockno];
+ state->remaining_size = BLCKSZ - bkpb->hole_length;
+ state->in_bkp_block_header = false;
+#ifdef VERBOSE_DEBUG
+ elog(LOG, "completed reading of header for %d, reading data now %u hole %u, off %u",
+ blockno, state->remaining_size, bkpb->hole_length,
+ bkpb->hole_offset);
+#endif
+ }
+ /*
+ * The current backup block is finished, more may be coming
+ */
+ else if (state->in_bkp_blocks)
+ {
+ int blockno = XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks;
+ BkpBlock* bkpb;
+ char* bkpb_data;
+
+ Assert(!state->in_bkp_block_header);
+
+ bkpb = &state->buf.bkp_block[blockno];
+ bkpb_data = state->buf.bkp_block_data[blockno];
+
+ /*
+ * reassemble block to its entirety by removing the bkp_hole
+ * "compression"
+ */
+ if(bkpb->hole_length){
+ memmove(bkpb_data + bkpb->hole_offset + bkpb->hole_length,
+ bkpb_data + bkpb->hole_offset,
+ BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
+ memset(bkpb_data + bkpb->hole_offset,
+ 0,
+ bkpb->hole_length);
+ }
+#if 0
+ elog(LOG, "finished with bkp block %d", blockno);
+#endif
+ state->in_bkp_blocks--;
+
+ state->in_skip = false;
+
+ /*
+ * only continue with in_record=true if we have bkp block
+ */
+ while (state->in_bkp_blocks)
+ {
+ if (state->buf.record.xl_info &
+ XLR_SET_BKP_BLOCK(XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks))
+ {
+ elog(LOG, "reading record %u", XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks);
+ break;
+ }
+ state->in_bkp_blocks--;
+ }
+
+ if (!state->in_bkp_blocks)
+ {
+ goto all_bkp_finished;
+ }
+ /* bkp blocks are stored without regard for alignment */
+ }
+ /*
+ * read a non-skipped record, start reading bkp blocks afterwards
+ */
+ else if (state->in_record)
+ {
+ state->in_record = true;
+ state->in_skip = false;
+ state->in_bkp_blocks = XLR_MAX_BKP_BLOCKS;
+
+ /*
+ * only continue with in_record=true if we have bkp block
+ */
+ while (state->in_bkp_blocks)
+ {
+ if (state->buf.record.xl_info &
+ XLR_SET_BKP_BLOCK(XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks))
+ {
+#ifdef VERBOSE_DEBUG
+ elog(LOG, "reading bkp block %u", XLR_MAX_BKP_BLOCKS - state->in_bkp_blocks);
+#endif
+ break;
+ }
+ state->in_bkp_blocks--;
+ }
+
+ if (!state->in_bkp_blocks)
+ {
+ goto all_bkp_finished;
+ }
+ /* bkp blocks are stored without regard for alignment */
+ }
+
+#ifdef VERBOSE_DEBUG
+ elog(LOG, "finish with record at %X/%X",
+ state->curptr.xlogid, state->curptr.xrecoff);
+#endif
+ }
+ /*
+ * Something could only be partially read inside a single block because
+ * of input or output space constraints. This case needs to be
+ * separate because otherwise we would treat it as a continuation which
+ * would obviously be wrong (we don't have a continuation record).
+ */
+ else if (partial_read)
+ {
+ partial_read = false;
+ goto not_enough_input;
+ }
+ else if (partial_write)
+ {
+ partial_write = false;
+ goto not_enough_output;
+ }
+ /*
+ * Data continues into the next block. Read the continuation record
+ * there and then continue.
+ */
+ else
+ {
+ }
+#ifdef VERBOSE_DEBUG
+ elog(LOG, "one loop: record: %u skip: %u bkb_block: %d in_bkp_header: %u xrecoff: %X/%X remaining: %u, off: %u",
+ state->in_record, state->in_skip,
+ state->in_bkp_blocks, state->in_bkp_block_header,
+ state->curptr.xlogid, state->curptr.xrecoff,
+ state->remaining_size,
+ state->curptr.xrecoff % XLOG_BLCKSZ);
+#endif
+ continue;
+
+ all_bkp_finished:
+ {
+ Assert(!state->in_skip);
+ Assert(!state->in_bkp_block_header);
+ Assert(!state->in_bkp_blocks);
+
+ state->finished_record(state, &state->buf);
+
+ state->in_record = false;
+
+ /* alignment is handled when starting to read a record */
+#ifdef VERBOSE_DEBUG
+ elog(LOG, "currently at %X/%X to %X/%X, wrote nbytes: %lu",
+ state->curptr.xlogid, state->curptr.xrecoff,
+ state->endptr.xlogid, state->endptr.xrecoff, state->nbytes);
+#endif
+ }
+ }
+
+out:
+ if (state->in_skip)
+ {
+ state->incomplete = true;
+ }
+ else if (state->in_record)
+ {
+ state->incomplete = true;
+ }
+ else
+ {
+ state->incomplete = false;
+ }
+ return;
+
+not_enough_input:
+ state->needs_input = true;
+ goto out;
+
+not_enough_output:
+ state->needs_output = true;
+ goto out;
+}
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
new file mode 100644
index 0000000..7df98cf
--- /dev/null
+++ b/src/include/access/xlogreader.h
@@ -0,0 +1,173 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogreader.h
+ *
+ * Generic xlog reading facility.
+ *
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
+ *
+ * src/include/access/xlogreader.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef _READXLOG_H
+#define _READXLOG_H
+
+#include "access/xlog_internal.h"
+
+typedef struct XLogRecordBuffer
+{
+ /* the record itself */
+ XLogRecord record;
+
+ /* the LSN at which that record was found */
+ XLogRecPtr origptr;
+
+ /* the data for xlog record */
+ char* record_data;
+ uint32 record_data_size;
+
+ BkpBlock bkp_block[XLR_MAX_BKP_BLOCKS];
+ char* bkp_block_data[XLR_MAX_BKP_BLOCKS];
+} XLogRecordBuffer;
+
+
+struct XLogReaderState;
+
+typedef bool (*XLogReaderStateInterestingCB)(struct XLogReaderState*, XLogRecord* r);
+typedef void (*XLogReaderStateWriteoutCB)(struct XLogReaderState*, char* data, Size len);
+typedef void (*XLogReaderStateFinishedRecordCB)(struct XLogReaderState*, XLogRecordBuffer* buf);
+typedef void (*XLogReaderStateReadPageCB)(struct XLogReaderState*, char* cur_page, XLogRecPtr at);
+
+typedef struct XLogReaderState
+{
+ /* ----------------------------------------
+ * Public parameters
+ * ----------------------------------------
+ */
+
+ /* callbacks */
+
+ /*
+ * Called to decide whether a xlog record is interesting and should be
+ * assembled, analyzed (finished_record) and written out or skipped.
+ */
+ XLogReaderStateInterestingCB is_record_interesting;
+
+ /*
+ * Write out data. This doesn't have to do anything if the data isn't needed
+ * later on.
+ */
+ XLogReaderStateWriteoutCB writeout_data;
+
+ /*
+ * Gets called after a record, including the backup blocks, has been fully
+ * reassembled.
+ */
+ XLogReaderStateFinishedRecordCB finished_record;
+
+ /*
+ * Data input function. Has to read XLOG_BLCKSZ blocks into cur_page
+ * although everything behind endptr does not have to be valid.
+ */
+ XLogReaderStateReadPageCB read_page;
+
+ /*
+ * this can be used by the caller to pass state to the callbacks without
+ * using global variables or such ugliness
+ */
+ void* private_data;
+
+
+ /* from where to where are we reading */
+
+ /* so we know where interesting data starts after scrolling back to the beginning of a page */
+ XLogRecPtr startptr;
+
+ /* continue up to here in this run */
+ XLogRecPtr endptr;
+
+
+ /* ----------------------------------------
+ * output parameters
+ * ----------------------------------------
+ */
+
+ /* we need new input data - a later endptr - to continue reading */
+ bool needs_input;
+
+ /* we need new output space to continue reading */
+ bool needs_output;
+
+ /* track our progress */
+ XLogRecPtr curptr;
+
+ /*
+ * are we in the middle of something? This is useful for the outside to
+ * know whether to start reading anew
+ */
+ bool incomplete;
+
+ /* ----------------------------------------
+ * private parameters
+ * ----------------------------------------
+ */
+
+ char cur_page[XLOG_BLCKSZ];
+ XLogPageHeader page_header;
+ uint32 page_header_size;
+ XLogRecordBuffer buf;
+
+
+ /* ----------------------------------------
+ * state machine variables
+ * ----------------------------------------
+ */
+
+ bool initialized;
+
+ /* are we currently reading a record */
+ bool in_record;
+
+ /* how many bkp blocks remain to be read */
+ int in_bkp_blocks;
+
+ /*
+ * the header of a bkp block can be split across pages, so we need to
+ * support reading that incrementally
+ */
+ int in_bkp_block_header;
+
+ /* we don't want to read this block, so keep track of that */
+ bool in_skip;
+
+ /* how much more to read in the current state */
+ uint32 remaining_size;
+
+ Size nbytes; /* size of sent data*/
+
+} XLogReaderState;
+
+/*
+ * Get a new XLogReader
+ *
+ * The 4 callbacks, startptr and endptr have to be set before the reader can be
+ * used.
+ */
+extern XLogReaderState* XLogReaderAllocate(void);
+
+/*
+ * Reset internal state so it can be used without continuing from the last
+ * state.
+ *
+ * The callbacks and private_data won't be reset
+ */
+extern void XLogReaderReset(XLogReaderState* state);
+
+/*
+ * Read the xlog and call the appropriate callbacks as far as possible within
+ * the constraints of input data (startptr, endptr) and output space.
+ */
+extern void XLogReaderRead(XLogReaderState* state);
+
+#endif
--
1.7.10.rc3.3.g19a6c.dirty
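
A note on the backup block reassembly in XLogReaderRead() above: the block image arrives with its "hole" removed, so the reader first collects BLCKSZ - hole_length contiguous bytes and then has to reopen the hole at hole_offset. The following is only a minimal standalone sketch of that step, not code from the patch; SKETCH_BLCKSZ and sketch_restore_hole() are hypothetical names chosen for illustration, and the block size is assumed to be 8192.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for BLCKSZ; the real value is a build-time setting. */
#define SKETCH_BLCKSZ 8192

/*
 * Reopen the hole in a backup block image.
 *
 * On entry page[0 .. SKETCH_BLCKSZ - hole_length) holds the logged bytes
 * back to back. On return the bytes that belong behind the hole have been
 * shifted up by hole_length and the hole itself is zero-filled.
 */
static void
sketch_restore_hole(char *page, size_t hole_offset, size_t hole_length)
{
    assert(hole_offset + hole_length <= SKETCH_BLCKSZ);

    if (hole_length == 0)
        return;

    /* move the tail to its final position; the ranges may overlap */
    memmove(page + hole_offset + hole_length,
            page + hole_offset,
            SKETCH_BLCKSZ - (hole_offset + hole_length));

    /* the hole contains no data, so zero it */
    memset(page + hole_offset, 0, hole_length);
}
```

The destination of the move lies after the hole and the source is the start of the hole, since everything after hole_offset was stored hole_length bytes too early. memmove rather than memcpy is used because source and destination overlap whenever the tail is longer than the hole.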
From: Andres Freund <andres@anarazel.de>
This adds a new wal_level value 'logical'
Missing cases:
- heap_multi_insert
- primary key changes for updates
- no primary key
- LOG_NEWPAGE
---
src/backend/access/heap/heapam.c | 135 ++++++++++++++++++++++++++++---
src/backend/access/transam/xlog.c | 1 +
src/backend/catalog/index.c | 74 +++++++++++++++++
src/bin/pg_controldata/pg_controldata.c | 2 +
src/include/access/xlog.h | 3 +-
src/include/catalog/index.h | 4 +
6 files changed, 207 insertions(+), 12 deletions(-)
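
To illustrate why the heapam.c changes below detach the tuple data from the buffer: as far as I understand XLogInsert, an rdata entry that references a buffer is omitted from the record when a full page image of that buffer is logged, while entries with InvalidBuffer are always written. The sketch below is a simplified standalone model of that rule, not the real XLogInsert; all Sketch* names and the byte counts (16, 5, 40) are made up for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef int SketchBuffer;
#define SKETCH_INVALID_BUFFER 0

/* Simplified stand-in for XLogRecData: payload length plus buffer reference. */
typedef struct SketchRecData
{
    size_t                len;
    SketchBuffer          buffer;
    struct SketchRecData *next;
} SketchRecData;

/*
 * Sum the payload that ends up in the record. Entries tied to fpw_buffer
 * are dropped because the full page image already contains their data.
 */
static size_t
sketch_logged_len(const SketchRecData *rdata, SketchBuffer fpw_buffer)
{
    size_t total = 0;

    for (; rdata != NULL; rdata = rdata->next)
    {
        bool covered = rdata->buffer != SKETCH_INVALID_BUFFER &&
                       rdata->buffer == fpw_buffer;

        if (!covered)
            total += rdata->len;
    }
    return total;
}

/* Build a chain shaped like the one in heap_insert and report its payload. */
static size_t
sketch_insert_payload(bool need_tuple, bool full_page_write)
{
    const SketchBuffer buf = 1;
    const SketchBuffer tupbuf = need_tuple ? SKETCH_INVALID_BUFFER : buf;

    /* empty entry that only registers the buffer (wal_level=logical case) */
    SketchRecData r3 = {0, buf, NULL};
    /* tuple data, hypothetical 40 bytes */
    SketchRecData r2 = {40, tupbuf, need_tuple ? &r3 : NULL};
    /* tuple header, hypothetical 5 bytes */
    SketchRecData r1 = {5, tupbuf, &r2};
    /* fixed part of the record, hypothetical 16 bytes, never buffer-bound */
    SketchRecData r0 = {16, SKETCH_INVALID_BUFFER, &r1};

    return sketch_logged_len(&r0, full_page_write ? buf : SKETCH_INVALID_BUFFER);
}
```

In this model the tuple disappears from the record only in the combination need_tuple = false with a full page write; with need_tuple = true the empty trailing entry keeps the buffer registered while the tuple bytes are always logged.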
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 9519e73..9149d53 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -52,6 +52,7 @@
#include "access/xact.h"
#include "access/xlogutils.h"
#include "catalog/catalog.h"
+#include "catalog/index.h"
#include "catalog/namespace.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -1937,10 +1938,19 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
xl_heap_insert xlrec;
xl_heap_header xlhdr;
XLogRecPtr recptr;
- XLogRecData rdata[3];
+ XLogRecData rdata[4];
Page page = BufferGetPage(buffer);
uint8 info = XLOG_HEAP_INSERT;
+ /*
+ * For the logical replication case we need the tuple even if we're
+ * doing a full page write. We could alternatively store a pointer into
+ * the fpw though.
+ * For that to work we add another rdata entry for the buffer in that
+ * case.
+ */
+ bool need_tuple = wal_level == WAL_LEVEL_LOGICAL;
+
xlrec.all_visible_cleared = all_visible_cleared;
xlrec.target.node = relation->rd_node;
xlrec.target.tid = heaptup->t_self;
@@ -1960,18 +1970,32 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
*/
rdata[1].data = (char *) &xlhdr;
rdata[1].len = SizeOfHeapHeader;
- rdata[1].buffer = buffer;
+ rdata[1].buffer = need_tuple ? InvalidBuffer : buffer;
rdata[1].buffer_std = true;
rdata[1].next = &(rdata[2]);
/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
rdata[2].data = (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits);
rdata[2].len = heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits);
- rdata[2].buffer = buffer;
+ rdata[2].buffer = need_tuple ? InvalidBuffer : buffer;
rdata[2].buffer_std = true;
rdata[2].next = NULL;
/*
+ * add record for the buffer without actual content that's removed if
+ * fpw is done for that buffer
+ */
+ if(need_tuple){
+ rdata[2].next = &(rdata[3]);
+
+ rdata[3].data = NULL;
+ rdata[3].len = 0;
+ rdata[3].buffer = buffer;
+ rdata[3].buffer_std = true;
+ rdata[3].next = NULL;
+ }
+
+ /*
* If this is the single and first tuple on page, we can reinit the
* page instead of restoring the whole thing. Set flag, and hide
* buffer references from XLogInsert.
@@ -1980,7 +2004,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
{
info |= XLOG_HEAP_INIT_PAGE;
- rdata[1].buffer = rdata[2].buffer = InvalidBuffer;
+ rdata[1].buffer = rdata[2].buffer = rdata[3].buffer = InvalidBuffer;
}
recptr = XLogInsert(RM_HEAP_ID, info, rdata);
@@ -2568,7 +2592,9 @@ l1:
{
xl_heap_delete xlrec;
XLogRecPtr recptr;
- XLogRecData rdata[2];
+ XLogRecData rdata[4];
+
+ bool need_tuple = wal_level == WAL_LEVEL_LOGICAL && relation->rd_id >= FirstNormalObjectId;
xlrec.all_visible_cleared = all_visible_cleared;
xlrec.target.node = relation->rd_node;
@@ -2584,6 +2610,73 @@ l1:
rdata[1].buffer_std = true;
rdata[1].next = NULL;
+ /*
+ * XXX: We could decide not to log changes when the origin is not the
+ * local node, that should reduce redundant logging.
+ */
+ if(need_tuple){
+ xl_heap_header xlhdr;
+
+ Oid indexoid = InvalidOid;
+ int16 pknratts;
+ int16 pkattnum[INDEX_MAX_KEYS];
+ Oid pktypoid[INDEX_MAX_KEYS];
+ Oid pkopclass[INDEX_MAX_KEYS];
+ TupleDesc desc = RelationGetDescr(relation);
+ Relation index_rel;
+ TupleDesc indexdesc;
+ int natt;
+
+ Datum idxvals[INDEX_MAX_KEYS];
+ bool idxisnull[INDEX_MAX_KEYS];
+ HeapTuple idxtuple;
+
+ MemSet(pkattnum, 0, sizeof(pkattnum));
+ MemSet(pktypoid, 0, sizeof(pktypoid));
+ MemSet(pkopclass, 0, sizeof(pkopclass));
+ MemSet(idxvals, 0, sizeof(idxvals));
+ MemSet(idxisnull, 0, sizeof(idxisnull));
+ relationFindPrimaryKey(relation, &indexoid, &pknratts, pkattnum, pktypoid, pkopclass);
+
+ if(!indexoid){
+ elog(WARNING, "Could not find primary key for table with oid %u",
+ relation->rd_id);
+ goto no_index_found;
+ }
+
+ index_rel = index_open(indexoid, AccessShareLock);
+
+ indexdesc = RelationGetDescr(index_rel);
+
+ for(natt = 0; natt < indexdesc->natts; natt++){
+ idxvals[natt] =
+ fastgetattr(&tp, pkattnum[natt], desc, &idxisnull[natt]);
+ Assert(!idxisnull[natt]);
+ }
+
+ idxtuple = heap_form_tuple(indexdesc, idxvals, idxisnull);
+
+ xlhdr.t_infomask2 = idxtuple->t_data->t_infomask2;
+ xlhdr.t_infomask = idxtuple->t_data->t_infomask;
+ xlhdr.t_hoff = idxtuple->t_data->t_hoff;
+
+ rdata[1].next = &(rdata[2]);
+ rdata[2].data = (char*)&xlhdr;
+ rdata[2].len = SizeOfHeapHeader;
+ rdata[2].buffer = InvalidBuffer;
+ rdata[2].next = NULL;
+
+ rdata[2].next = &(rdata[3]);
+ rdata[3].data = (char *) idxtuple->t_data + offsetof(HeapTupleHeaderData, t_bits);
+ rdata[3].len = idxtuple->t_len - offsetof(HeapTupleHeaderData, t_bits);
+ rdata[3].buffer = InvalidBuffer;
+ rdata[3].next = NULL;
+
+ index_close(index_rel, NoLock);
+ no_index_found:
+ ;
+ }
+
recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, rdata);
PageSetLSN(page, recptr);
@@ -4413,9 +4506,14 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
xl_heap_header xlhdr;
uint8 info;
XLogRecPtr recptr;
- XLogRecData rdata[4];
+ XLogRecData rdata[5];
Page page = BufferGetPage(newbuf);
+ /*
+ * As for XLOG_HEAP_INSERT, the tuple must be logged even with a full page write.
+ */
+ bool need_tuple = wal_level == WAL_LEVEL_LOGICAL;
+
/* Caller should not call me on a non-WAL-logged relation */
Assert(RelationNeedsWAL(reln));
@@ -4446,28 +4544,43 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
xlhdr.t_hoff = newtup->t_data->t_hoff;
/*
- * As with insert records, we need not store the rdata[2] segment if we
- * decide to store the whole buffer instead.
+ * As with insert's logging, we must not tie the tuple data to the buffer
+ * if we do logical replication; otherwise it would be omitted whenever
+ * the whole buffer is stored instead.
*/
rdata[2].data = (char *) &xlhdr;
rdata[2].len = SizeOfHeapHeader;
- rdata[2].buffer = newbuf;
+ rdata[2].buffer = need_tuple ? InvalidBuffer : newbuf;
rdata[2].buffer_std = true;
rdata[2].next = &(rdata[3]);
/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
rdata[3].data = (char *) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits);
rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
- rdata[3].buffer = newbuf;
+ rdata[3].buffer = need_tuple ? InvalidBuffer : newbuf;
rdata[3].buffer_std = true;
rdata[3].next = NULL;
+ /*
+ * separate storage for the buffer reference of the new page in the
+ * wal_level=logical case
+ */
+ if(need_tuple){
+ rdata[3].next = &(rdata[4]);
+
+ rdata[4].data = NULL;
+ rdata[4].len = 0;
+ rdata[4].buffer = newbuf;
+ rdata[4].buffer_std = true;
+ rdata[4].next = NULL;
+ }
+
/* If new tuple is the single and first tuple on page... */
if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber &&
PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
{
info |= XLOG_HEAP_INIT_PAGE;
- rdata[2].buffer = rdata[3].buffer = InvalidBuffer;
+ rdata[2].buffer = rdata[3].buffer = rdata[4].buffer = InvalidBuffer;
}
recptr = XLogInsert(RM_HEAP_ID, info, rdata);
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 166efb0..c6feed0 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -105,6 +105,7 @@ const struct config_enum_entry wal_level_options[] = {
{"minimal", WAL_LEVEL_MINIMAL, false},
{"archive", WAL_LEVEL_ARCHIVE, false},
{"hot_standby", WAL_LEVEL_HOT_STANDBY, false},
+ {"logical", WAL_LEVEL_LOGICAL, false},
{NULL, 0, false}
};
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 9e8b1cc..4cddcac 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -49,6 +49,7 @@
#include "nodes/nodeFuncs.h"
#include "optimizer/clauses.h"
#include "parser/parser.h"
+#include "parser/parse_relation.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
@@ -3311,3 +3312,76 @@ ResetReindexPending(void)
{
pendingReindexedIndexes = NIL;
}
+
+/*
+ * relationFindPrimaryKey
+ * Find primary key for a relation if it exists.
+ *
+ * If no primary key is found *indexOid is set to InvalidOid
+ *
+ * This is quite similar to tablecmds.c's transformFkeyGetPrimaryKey.
+ *
+ * XXX: It might be a good idea to change pg_class.relhaspkey into an bool to
+ * make this more efficient.
+ */
+void
+relationFindPrimaryKey(Relation pkrel, Oid *indexOid,
+ int16 *nratts, int16 *attnums, Oid *atttypids,
+ Oid *opclasses){
+ List *indexoidlist;
+ ListCell *indexoidscan;
+ HeapTuple indexTuple = NULL;
+ Datum indclassDatum;
+ bool isnull;
+ oidvector *indclass;
+ int i;
+ Form_pg_index indexStruct = NULL;
+
+ *indexOid = InvalidOid;
+
+ indexoidlist = RelationGetIndexList(pkrel);
+
+ foreach(indexoidscan, indexoidlist)
+ {
+ Oid indexoid = lfirst_oid(indexoidscan);
+
+ indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid));
+ if(!HeapTupleIsValid(indexTuple))
+ elog(ERROR, "cache lookup failed for index %u", indexoid);
+
+ indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
+ if(indexStruct->indisprimary && indexStruct->indimmediate)
+ {
+ *indexOid = indexoid;
+ break;
+ }
+ ReleaseSysCache(indexTuple);
+
+ }
+ list_free(indexoidlist);
+
+ if (!OidIsValid(*indexOid))
+ return;
+
+ /* Must get indclass the hard way */
+ indclassDatum = SysCacheGetAttr(INDEXRELID, indexTuple,
+ Anum_pg_index_indclass, &isnull);
+ Assert(!isnull);
+ indclass = (oidvector *) DatumGetPointer(indclassDatum);
+
+ *nratts = indexStruct->indnatts;
+ /*
+ * Now build the list of PK attributes from the indkey definition (we
+ * assume a primary key cannot have expressional elements)
+ */
+ for (i = 0; i < indexStruct->indnatts; i++)
+ {
+ int pkattno = indexStruct->indkey.values[i];
+
+ attnums[i] = pkattno;
+ atttypids[i] = attnumTypeId(pkrel, pkattno);
+ opclasses[i] = indclass->values[i];
+ }
+
+ ReleaseSysCache(indexTuple);
+}
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c
index c00183a..47715c9 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -82,6 +82,8 @@ wal_level_str(WalLevel wal_level)
return "archive";
case WAL_LEVEL_HOT_STANDBY:
return "hot_standby";
+ case WAL_LEVEL_LOGICAL:
+ return "logical";
}
return _("unrecognized wal_level");
}
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index df5f232..2843aca 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -199,7 +199,8 @@ typedef enum WalLevel
{
WAL_LEVEL_MINIMAL = 0,
WAL_LEVEL_ARCHIVE,
- WAL_LEVEL_HOT_STANDBY
+ WAL_LEVEL_HOT_STANDBY,
+ WAL_LEVEL_LOGICAL
} WalLevel;
extern int wal_level;
diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h
index 7c8198f..2ba0ac3 100644
--- a/src/include/catalog/index.h
+++ b/src/include/catalog/index.h
@@ -101,4 +101,8 @@ extern bool ReindexIsProcessingHeap(Oid heapOid);
extern bool ReindexIsProcessingIndex(Oid indexOid);
extern Oid IndexGetRelation(Oid indexId, bool missing_ok);
+extern void relationFindPrimaryKey(Relation pkrel, Oid *indexOid,
+ int16 *nratts, int16 *attnums, Oid *atttypids,
+ Oid *opclasses);
+
#endif /* INDEX_H */
--
1.7.10.rc3.3.g19a6c.dirty
From: Andres Freund <andres@anarazel.de>
The individual changes need to be identified by an xid. The xid can be a
subtransaction or a toplevel one; at commit those can be reintegrated by doing
a k-way mergesort between the individual transactions.
Callbacks for apply_begin, apply_change and apply_commit are provided to
retrieve complete transactions.
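To make the callback flow described above concrete, here is a minimal, self-contained sketch. It is not the patch's API: every Sketch* and sketch_* name, the fixed-size arrays and the counting callbacks are made up for illustration. The only idea taken from the patch is that changes are buffered per xid and handed to begin/apply_change/commit callbacks once the commit is seen.

```c
#include <assert.h>
#include <stddef.h>

#define SKETCH_MAX_TXNS    4
#define SKETCH_MAX_CHANGES 8

/* Hypothetical stand-in for ApplyCacheTXN: changes are plain ints here. */
typedef struct SketchTxn
{
    unsigned xid;
    int      nchanges;
    int      changes[SKETCH_MAX_CHANGES];
} SketchTxn;

/* Hypothetical stand-in for ApplyCache with its three callbacks. */
typedef struct SketchCache SketchCache;
struct SketchCache
{
    void (*begin)(SketchCache *, SketchTxn *);
    void (*apply_change)(SketchCache *, SketchTxn *, int);
    void (*commit)(SketchCache *, SketchTxn *);
    void *private_data;
    SketchTxn txns[SKETCH_MAX_TXNS];
    int       ntxns;
};

static SketchTxn *
sketch_find(SketchCache *c, unsigned xid)
{
    for (int i = 0; i < c->ntxns; i++)
        if (c->txns[i].xid == xid)
            return &c->txns[i];
    return NULL;
}

/* Buffer one change under its xid; returns -1 if the sketch runs out of room. */
int
sketch_add_change(SketchCache *c, unsigned xid, int change)
{
    SketchTxn *t = sketch_find(c, xid);

    if (t == NULL)
    {
        if (c->ntxns == SKETCH_MAX_TXNS)
            return -1;
        t = &c->txns[c->ntxns++];
        t->xid = xid;
        t->nchanges = 0;
    }
    if (t->nchanges == SKETCH_MAX_CHANGES)
        return -1;
    t->changes[t->nchanges++] = change;
    return 0;
}

/* On commit, replay the buffered changes through the callbacks, then forget the txn. */
void
sketch_commit(SketchCache *c, unsigned xid)
{
    SketchTxn *t = sketch_find(c, xid);

    assert(c->begin && c->apply_change && c->commit);
    if (t == NULL)
        return;                     /* transaction had no changes */
    c->begin(c, t);
    for (int i = 0; i < t->nchanges; i++)
        c->apply_change(c, t, t->changes[i]);
    c->commit(c, t);
    *t = c->txns[--c->ntxns];       /* drop the slot by moving the last one in */
}

/* Example consumer: counts callbacks and sums the changes it is handed. */
typedef struct SketchLog { int begins, commits, sum; unsigned last_xid; } SketchLog;

static void log_begin(SketchCache *c, SketchTxn *t)
{ SketchLog *l = c->private_data; l->begins++; l->last_xid = t->xid; }
static void log_change(SketchCache *c, SketchTxn *t, int change)
{ SketchLog *l = c->private_data; (void) t; l->sum += change; }
static void log_commit(SketchCache *c, SketchTxn *t)
{ SketchLog *l = c->private_data; (void) t; l->commits++; }
```

The consumer never sees a change before the commit of its transaction is known, which is why interleaved changes from concurrent xids can simply be queued as they are read.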
Missing:
- spill-to-disk
- correct subtransaction merge, current behaviour is simple/wrong
- DDL handling (?)
- resource usage controls
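The k-way merge mentioned above is not implemented in this patch yet. As a rough sketch of what it could look like, the following standalone code merges several per-(sub)transaction change lists, each already sorted by lsn, using a binary min-heap keyed on the head of each list. All names (SketchChange, SketchStream, sketch_merge_by_lsn) are hypothetical, and plain arrays are used instead of the ilist structures from the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified stand-ins for the patch's change and TXN structs. */
typedef struct SketchChange
{
    uint64_t lsn;       /* position of the change in the wal stream */
    int      payload;   /* placeholder for the decoded tuple data */
} SketchChange;

typedef struct SketchStream
{
    const SketchChange *changes;    /* already sorted by lsn */
    size_t nchanges;
    size_t pos;                     /* next change to emit */
} SketchStream;

static uint64_t
head_lsn(const SketchStream *s)
{
    return s->changes[s->pos].lsn;
}

/* Restore the min-heap property for the subtree rooted at index i. */
static void
sift_down(SketchStream **heap, size_t n, size_t i)
{
    for (;;)
    {
        size_t l = 2 * i + 1, r = l + 1, min = i;
        SketchStream *tmp;

        if (l < n && head_lsn(heap[l]) < head_lsn(heap[min]))
            min = l;
        if (r < n && head_lsn(heap[r]) < head_lsn(heap[min]))
            min = r;
        if (min == i)
            return;
        tmp = heap[i];
        heap[i] = heap[min];
        heap[min] = tmp;
        i = min;
    }
}

/*
 * Merge all streams into 'out' in lsn order and return the number of changes
 * written. 'heap' is caller-provided scratch space for nstreams pointers;
 * 'out' must have room for the sum of all nchanges.
 */
size_t
sketch_merge_by_lsn(SketchStream *streams, size_t nstreams,
                    SketchStream **heap, SketchChange *out)
{
    size_t nheap = 0, nout = 0, i;

    assert(streams != NULL && heap != NULL && out != NULL);

    /* Only non-empty streams take part in the merge. */
    for (i = 0; i < nstreams; i++)
    {
        streams[i].pos = 0;
        if (streams[i].nchanges > 0)
            heap[nheap++] = &streams[i];
    }
    for (i = nheap / 2; i-- > 0;)
        sift_down(heap, nheap, i);

    while (nheap > 0)
    {
        SketchStream *s = heap[0];

        /* Emit the globally smallest lsn, then advance that stream. */
        out[nout++] = s->changes[s->pos++];
        if (s->pos == s->nchanges)
            heap[0] = heap[--nheap];    /* stream exhausted, drop it */
        sift_down(heap, nheap, 0);
    }
    return nout;
}
```

With k streams and n changes in total this takes O(n log k) comparisons, which should matter mostly for transactions with many subtransactions.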
---
src/backend/replication/Makefile | 2 +
src/backend/replication/logical/Makefile | 19 ++
src/backend/replication/logical/applycache.c | 380 ++++++++++++++++++++++++++
src/include/replication/applycache.h | 185 +++++++++++++
4 files changed, 586 insertions(+)
create mode 100644 src/backend/replication/logical/Makefile
create mode 100644 src/backend/replication/logical/applycache.c
create mode 100644 src/include/replication/applycache.h
diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile
index 9d9ec87..ae7f6b1 100644
--- a/src/backend/replication/Makefile
+++ b/src/backend/replication/Makefile
@@ -17,6 +17,8 @@ override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \
repl_gram.o syncrep.o
+SUBDIRS = logical
+
include $(top_srcdir)/src/backend/common.mk
# repl_scanner is compiled as part of repl_gram
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
new file mode 100644
index 0000000..2eadab8
--- /dev/null
+++ b/src/backend/replication/logical/Makefile
@@ -0,0 +1,19 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+# Makefile for src/backend/replication/logical
+#
+# IDENTIFICATION
+# src/backend/replication/logical/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/replication/logical
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
+
+OBJS = applycache.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/applycache.c b/src/backend/replication/logical/applycache.c
new file mode 100644
index 0000000..b73b0ba
--- /dev/null
+++ b/src/backend/replication/logical/applycache.c
@@ -0,0 +1,380 @@
+/*-------------------------------------------------------------------------
+ *
+ * applycache.c
+ *
+ * PostgreSQL logical replay "cache" management
+ *
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/applycache.c
+ *
+ */
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/xact.h"
+#include "catalog/pg_class.h"
+#include "catalog/pg_control.h"
+#include "replication/applycache.h"
+
+#include "utils/ilist.h"
+#include "utils/memutils.h"
+#include "utils/relcache.h"
+#include "utils/syscache.h"
+
+const Size max_memtries = 1<<16;
+
+const size_t max_cached_changes = 1024;
+const size_t max_cached_tuplebufs = 1024; /* ~8MB */
+const size_t max_cached_transactions = 512;
+
+typedef struct ApplyCacheTXNByIdEnt
+{
+ TransactionId xid;
+ ApplyCacheTXN* txn;
+} ApplyCacheTXNByIdEnt;
+
+static ApplyCacheTXN* ApplyCacheGetTXN(ApplyCache *cache);
+static void ApplyCacheReturnTXN(ApplyCache *cache, ApplyCacheTXN* txn);
+
+static ApplyCacheTXN* ApplyCacheTXNByXid(ApplyCache*, TransactionId xid, bool create);
+
+
+ApplyCache*
+ApplyCacheAllocate(void)
+{
+ ApplyCache* cache = (ApplyCache*)malloc(sizeof(ApplyCache));
+ HASHCTL hash_ctl;
+
+ if (!cache)
+ elog(ERROR, "Could not allocate the ApplyCache");
+
+ memset(&hash_ctl, 0, sizeof(hash_ctl));
+
+ cache->context = AllocSetContextCreate(TopMemoryContext,
+ "ApplyCache",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ hash_ctl.keysize = sizeof(TransactionId);
+ hash_ctl.entrysize = sizeof(ApplyCacheTXNByIdEnt);
+ hash_ctl.hash = tag_hash;
+ hash_ctl.hcxt = cache->context;
+
+ cache->by_txn = hash_create("ApplyCacheByXid", 1000, &hash_ctl,
+ HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+
+ cache->nr_cached_transactions = 0;
+ cache->nr_cached_changes = 0;
+ cache->nr_cached_tuplebufs = 0;
+
+ ilist_d_init(&cache->cached_transactions);
+ ilist_d_init(&cache->cached_changes);
+ ilist_s_init(&cache->cached_tuplebufs);
+
+ return cache;
+}
+
+void ApplyCacheFree(ApplyCache* cache)
+{
+ /* FIXME: check for in-progress transactions */
+ /* FIXME: clean up cached transaction */
+ /* FIXME: clean up cached changes */
+ /* FIXME: clean up cached tuplebufs */
+ hash_destroy(cache->by_txn);
+ free(cache);
+}
+
+static ApplyCacheTXN* ApplyCacheGetTXN(ApplyCache *cache)
+{
+ ApplyCacheTXN* txn;
+
+ if (cache->nr_cached_transactions)
+ {
+ cache->nr_cached_transactions--;
+ txn = ilist_container(ApplyCacheTXN, node,
+ ilist_d_pop_front(&cache->cached_transactions));
+ }
+ else
+ {
+ txn = (ApplyCacheTXN*)
+ malloc(sizeof(ApplyCacheTXN));
+
+ if (!txn)
+ elog(ERROR, "Could not allocate a ApplyCacheTXN struct");
+ }
+
+ memset(txn, 0, sizeof(ApplyCacheTXN));
+ ilist_d_init(&txn->changes);
+ ilist_d_init(&txn->subtxns);
+ return txn;
+}
+
+void ApplyCacheReturnTXN(ApplyCache *cache, ApplyCacheTXN* txn)
+{
+ if(cache->nr_cached_transactions < max_cached_transactions){
+ cache->nr_cached_transactions++;
+ ilist_d_push_front(&cache->cached_transactions, &txn->node);
+ }
+ else{
+ free(txn);
+ }
+}
+
+ApplyCacheChange*
+ApplyCacheGetChange(ApplyCache* cache)
+{
+ ApplyCacheChange* change;
+
+ if (cache->nr_cached_changes)
+ {
+ cache->nr_cached_changes--;
+ change = ilist_container(ApplyCacheChange, node,
+ ilist_d_pop_front(&cache->cached_changes));
+ }
+ else
+ {
+ change = (ApplyCacheChange*)malloc(sizeof(ApplyCacheChange));
+
+ if (!change)
+ elog(ERROR, "Could not allocate a ApplyCacheChange struct");
+ }
+
+
+ memset(change, 0, sizeof(ApplyCacheChange));
+ return change;
+}
+
+void
+ApplyCacheReturnChange(ApplyCache* cache, ApplyCacheChange* change)
+{
+ if (change->newtuple)
+ ApplyCacheReturnTupleBuf(cache, change->newtuple);
+ if (change->oldtuple)
+ ApplyCacheReturnTupleBuf(cache, change->oldtuple);
+
+ if (change->table)
+ heap_freetuple(change->table);
+
+ if(cache->nr_cached_changes < max_cached_changes){
+ cache->nr_cached_changes++;
+ ilist_d_push_front(&cache->cached_changes, &change->node);
+ }
+ else{
+ free(change);
+ }
+}
+
+ApplyCacheTupleBuf*
+ApplyCacheGetTupleBuf(ApplyCache* cache)
+{
+ ApplyCacheTupleBuf* tuple;
+
+ if (cache->nr_cached_tuplebufs)
+ {
+ cache->nr_cached_tuplebufs--;
+ tuple = ilist_container(ApplyCacheTupleBuf, node,
+ ilist_s_pop_front(&cache->cached_tuplebufs));
+ }
+ else
+ {
+ tuple =
+ (ApplyCacheTupleBuf*)malloc(sizeof(ApplyCacheTupleBuf));
+
+ if (!tuple)
+ elog(ERROR, "Could not allocate a ApplyCacheTupleBuf struct");
+ }
+
+ return tuple;
+}
+
+void
+ApplyCacheReturnTupleBuf(ApplyCache* cache, ApplyCacheTupleBuf* tuple)
+{
+ if(cache->nr_cached_tuplebufs < max_cached_tuplebufs){
+ cache->nr_cached_tuplebufs++;
+ ilist_s_push_front(&cache->cached_tuplebufs, &tuple->node);
+ }
+ else{
+ free(tuple);
+ }
+}
+
+
+static
+ApplyCacheTXN*
+ApplyCacheTXNByXid(ApplyCache* cache, TransactionId xid, bool create)
+{
+ ApplyCacheTXNByIdEnt* ent;
+ bool found;
+
+ ent = (ApplyCacheTXNByIdEnt*)
+ hash_search(cache->by_txn,
+ (void *)&xid,
+ (create ? HASH_ENTER : HASH_FIND),
+ &found);
+
+ if (found)
+ {
+#ifdef VERBOSE_DEBUG
+ elog(LOG, "found cache entry for %u at %p", xid, ent);
+#endif
+ }
+ else
+ {
+#ifdef VERBOSE_DEBUG
+ elog(LOG, "didn't find cache entry for %u in %p at %p, creating %u",
+ xid, cache, ent, create);
+#endif
+ }
+
+ if (!found && !create)
+ return NULL;
+
+ if (!found)
+ {
+ ent->txn = ApplyCacheGetTXN(cache);
+ }
+
+ return ent->txn;
+}
+
+void
+ApplyCacheAddChange(ApplyCache* cache, TransactionId xid, XLogRecPtr lsn,
+ ApplyCacheChange* change)
+{
+ ApplyCacheTXN* txn = ApplyCacheTXNByXid(cache, xid, true);
+ txn->lsn = lsn;
+ ilist_d_push_back(&txn->changes, &change->node);
+}
+
+
+void
+ApplyCacheCommitChild(ApplyCache* cache, TransactionId xid,
+ TransactionId subxid, XLogRecPtr lsn)
+{
+ ApplyCacheTXN* txn;
+ ApplyCacheTXN* subtxn;
+
+ subtxn = ApplyCacheTXNByXid(cache, subxid, false);
+
+ /*
+ * No need to do anything if that subtxn didn't contain any changes
+ */
+ if (!subtxn)
+ return;
+
+ subtxn->lsn = lsn;
+
+ txn = ApplyCacheTXNByXid(cache, xid, true);
+
+ ilist_d_push_back(&txn->subtxns, &subtxn->node);
+}
+
+void
+ApplyCacheCommit(ApplyCache* cache, TransactionId xid, XLogRecPtr lsn)
+{
+ ApplyCacheTXN* txn = ApplyCacheTXNByXid(cache, xid, false);
+ ilist_d_node* cur_change, *next_change;
+ ilist_d_node* cur_txn, *next_txn;
+ bool found;
+
+ if (!txn)
+ return;
+
+ txn->lsn = lsn;
+
+ cache->begin(cache, txn);
+
+ /*
+ * FIXME:
+ * do a k-way mergesort of all changes ordered by xid
+ *
+ * For now we just iterate through all subtransactions and then through the
+ * main txn. But that's *WRONG*.
+ *
+ * The best way to do this is probably to model the current heads of all TXNs
+ * as a heap and always remove from the TXN with the smallest lsn until
+ * that's not the case anymore.
+ */
+ ilist_d_foreach_modify (cur_txn, next_txn, &txn->subtxns)
+ {
+ ApplyCacheTXN* subtxn = ilist_container(ApplyCacheTXN, node, cur_txn);
+
+ ilist_d_foreach_modify (cur_change, next_change, &subtxn->changes)
+ {
+ ApplyCacheChange* change =
+ ilist_container(ApplyCacheChange, node, cur_change);
+ cache->apply_change(cache, txn, subtxn, change);
+
+ ApplyCacheReturnChange(cache, change);
+ }
+ ApplyCacheReturnTXN(cache, subtxn);
+ }
+
+ ilist_d_foreach_modify (cur_change, next_change, &txn->changes)
+ {
+ ApplyCacheChange* change =
+ ilist_container(ApplyCacheChange, node, cur_change);
+ cache->apply_change(cache, txn, NULL, change);
+
+ ApplyCacheReturnChange(cache, change);
+ }
+
+ cache->commit(cache, txn);
+
+ /* now remove reference from cache */
+ hash_search(cache->by_txn,
+ (void *)&xid,
+ HASH_REMOVE,
+ &found);
+ Assert(found);
+
+ ApplyCacheReturnTXN(cache, txn);
+}
+
+void
+ApplyCacheAbort(ApplyCache* cache, TransactionId xid, XLogRecPtr lsn)
+{
+ ilist_d_node* cur_change, *next_change;
+ ilist_d_node* cur_txn, *next_txn;
+ ApplyCacheTXN* txn = ApplyCacheTXNByXid(cache, xid, false);
+ bool found;
+
+ /* no changes in this commit */
+ if (!txn)
+ return;
+
+ /* iterate through all subtransactions and free memory */
+ ilist_d_foreach_modify (cur_txn, next_txn, &txn->subtxns)
+ {
+ ApplyCacheTXN* subtxn = ilist_container(ApplyCacheTXN, node, cur_txn);
+ ilist_d_foreach_modify (cur_change, next_change, &subtxn->changes)
+ {
+ ApplyCacheChange* change =
+ ilist_container(ApplyCacheChange, node, cur_change);
+ ApplyCacheReturnChange(cache, change);
+ }
+ ApplyCacheReturnTXN(cache, subtxn);
+ }
+
+ ilist_d_foreach_modify (cur_change, next_change, &txn->changes)
+ {
+ ApplyCacheChange* change =
+ ilist_container(ApplyCacheChange, node, cur_change);
+ ApplyCacheReturnChange(cache, change);
+ }
+
+ /* now remove reference from cache */
+ hash_search(cache->by_txn,
+ (void *)&xid,
+ HASH_REMOVE,
+ &found);
+ Assert(found);
+
+ ApplyCacheReturnTXN(cache, txn);
+}
diff --git a/src/include/replication/applycache.h b/src/include/replication/applycache.h
new file mode 100644
index 0000000..4ceba63
--- /dev/null
+++ b/src/include/replication/applycache.h
@@ -0,0 +1,185 @@
+/*
+ * applycache.h
+ *
+ * PostgreSQL logical replay "cache" management
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/replication/applycache.h
+ */
+#ifndef APPLYCACHE_H
+#define APPLYCACHE_H
+
+#include "access/htup.h"
+#include "utils/hsearch.h"
+#include "utils/ilist.h"
+
+typedef struct ApplyCache ApplyCache;
+
+enum ApplyCacheChangeType
+{
+ APPLY_CACHE_CHANGE_INSERT,
+ APPLY_CACHE_CHANGE_UPDATE,
+ APPLY_CACHE_CHANGE_DELETE
+};
+
+typedef struct ApplyCacheTupleBuf
+{
+ /* position in preallocated list */
+ ilist_s_node node;
+
+ HeapTupleData tuple;
+ HeapTupleHeaderData header;
+ char data[MaxHeapTupleSize];
+} ApplyCacheTupleBuf;
+
+typedef struct ApplyCacheChange
+{
+ XLogRecPtr lsn;
+ enum ApplyCacheChangeType action;
+
+ ApplyCacheTupleBuf* newtuple;
+
+ ApplyCacheTupleBuf* oldtuple;
+
+ HeapTuple table;
+
+ /*
+ * While in use this is how a change is linked into a transaction,
+ * otherwise it's the preallocated list.
+ */
+ ilist_d_node node;
+} ApplyCacheChange;
+
+typedef struct ApplyCacheTXN
+{
+ TransactionId xid;
+
+ XLogRecPtr lsn;
+
+ /*
+ * How many ApplyCacheChange's do we have in this txn.
+ *
+ * Subtransactions are *not* included.
+ */
+ Size nentries;
+
+ /*
+ * How many of the above entries are stored in memory in contrast to being
+ * spilled to disk.
+ */
+ Size nentries_mem;
+
+ /*
+ * List of actual changes
+ */
+ ilist_d_head changes;
+
+ /*
+ * non-hierarchical list of subtransactions that are *not* aborted
+ */
+ ilist_d_head subtxns;
+
+ /*
+ * our position in a list of subtransactions while the TXN is in
+ * use. Otherwise it's the position in the list of preallocated
+ * transactions.
+ */
+ ilist_d_node node;
+} ApplyCacheTXN;
+
+
+/* XXX: we're currently passing the originating subtxn. Not sure that's necessary */
+typedef void (*ApplyCacheApplyChangeCB)(ApplyCache* cache, ApplyCacheTXN* txn, ApplyCacheTXN* subtxn, ApplyCacheChange* change);
+typedef void (*ApplyCacheBeginCB)(ApplyCache* cache, ApplyCacheTXN* txn);
+typedef void (*ApplyCacheCommitCB)(ApplyCache* cache, ApplyCacheTXN* txn);
+
+/*
+ * The max number of concurrent top-level transactions, or transactions where
+ * we don't know if they are top-level, can be calculated by:
+ * (max_connections + max_prepared_xacts + ?) * PGPROC_MAX_CACHED_SUBXIDS
+ */
+struct ApplyCache
+{
+ TransactionId last_txn;
+ ApplyCacheTXN *last_txn_cache;
+ HTAB *by_txn;
+
+ ApplyCacheBeginCB begin;
+ ApplyCacheApplyChangeCB apply_change;
+ ApplyCacheCommitCB commit;
+
+ void* private_data;
+
+ MemoryContext context;
+
+ /*
+ * we don't want to repeatedly (de-)allocate those structs, so cache them for reuse.
+ */
+ ilist_d_head cached_transactions;
+ size_t nr_cached_transactions;
+
+ ilist_d_head cached_changes;
+ size_t nr_cached_changes;
+
+ ilist_s_head cached_tuplebufs;
+ size_t nr_cached_tuplebufs;
+};
+
+
+ApplyCache*
+ApplyCacheAllocate(void);
+
+void
+ApplyCacheFree(ApplyCache*);
+
+ApplyCacheTupleBuf*
+ApplyCacheGetTupleBuf(ApplyCache*);
+
+void
+ApplyCacheReturnTupleBuf(ApplyCache* cache, ApplyCacheTupleBuf* tuple);
+
+/*
+ * Returns a (potentially preallocated) change struct. Its lifetime is managed
+ * by the applycache module.
+ *
+ * If not added to a transaction with ApplyCacheAddChange it needs to be
+ * returned via ApplyCacheReturnChange
+ *
+ * FIXME: better name
+ */
+ApplyCacheChange*
+ApplyCacheGetChange(ApplyCache*);
+
+/*
+ * Return an unused ApplyCacheChange struct
+ */
+void
+ApplyCacheReturnChange(ApplyCache*, ApplyCacheChange*);
+
+
+/*
+ * record the transaction as in-progress if not already done, add the current
+ * change.
+ *
+ * We have a one-entry cache for looking up the current ApplyCacheTXN so we
+ * don't need to do a full hash-lookup if the same xid is used
+ * sequentially. Them being used multiple times that way is rather frequent.
+ */
+void
+ApplyCacheAddChange(ApplyCache*, TransactionId, XLogRecPtr lsn, ApplyCacheChange*);
+
+/*
+ *
+ */
+void
+ApplyCacheCommit(ApplyCache*, TransactionId, XLogRecPtr lsn);
+
+void
+ApplyCacheCommitChild(ApplyCache*, TransactionId, TransactionId, XLogRecPtr lsn);
+
+void
+ApplyCacheAbort(ApplyCache*, TransactionId, XLogRecPtr lsn);
+
+#endif
--
1.7.10.rc3.3.g19a6c.dirty
From: Andres Freund <andres@anarazel.de>
This requires an up-to-date catalog and can thus only be run on a replica.
Missing:
- HEAP_NEWPAGE support
- HEAP2_MULTI_INSERT support
- DDL integration. *No* DDL, including TRUNCATE, is possible at the moment
---
src/backend/replication/logical/Makefile | 2 +-
src/backend/replication/logical/decode.c | 439 ++++++++++++++++++++++++++++++
src/include/replication/decode.h | 23 ++
3 files changed, 463 insertions(+), 1 deletion(-)
create mode 100644 src/backend/replication/logical/decode.c
create mode 100644 src/include/replication/decode.h
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 2eadab8..7dd9663 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -14,6 +14,6 @@ include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
-OBJS = applycache.o
+OBJS = applycache.o decode.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
new file mode 100644
index 0000000..7e07d50
--- /dev/null
+++ b/src/backend/replication/logical/decode.c
@@ -0,0 +1,439 @@
+/*-------------------------------------------------------------------------
+ *
+ * decode.c
+ *
+ * Decodes wal records from an xlogreader.h callback into an applycache
+ *
+ *
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/decode.c
+ *
+ */
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/transam.h"
+#include "access/xlog_internal.h"
+#include "access/xact.h"
+
+#include "replication/applycache.h"
+#include "replication/decode.h"
+
+#include "utils/memutils.h"
+#include "utils/syscache.h"
+#include "utils/lsyscache.h"
+
+static void DecodeXLogTuple(char* data, Size len,
+ HeapTuple table, ApplyCacheTupleBuf* tuple);
+
+static void DecodeInsert(ApplyCache *cache, XLogRecordBuffer* buf);
+
+static void DecodeUpdate(ApplyCache *cache, XLogRecordBuffer* buf);
+
+static void DecodeDelete(ApplyCache *cache, XLogRecordBuffer* buf);
+
+static void DecodeNewpage(ApplyCache *cache, XLogRecordBuffer* buf);
+static void DecodeMultiInsert(ApplyCache *cache, XLogRecordBuffer* buf);
+
+static void DecodeCommit(ApplyCache* cache, XLogRecordBuffer* buf, TransactionId xid,
+ TransactionId *sub_xids, int nsubxacts);
+
+
+void DecodeRecordIntoApplyCache(ApplyCache *cache, XLogRecordBuffer* buf)
+{
+ XLogRecord* r = &buf->record;
+ uint8 info = r->xl_info & ~XLR_INFO_MASK;
+
+ switch (r->xl_rmid)
+ {
+ case RM_HEAP_ID:
+ {
+ info &= XLOG_HEAP_OPMASK;
+ switch (info)
+ {
+ case XLOG_HEAP_INSERT:
+ DecodeInsert(cache, buf);
+ break;
+
+ /* no guarantee that we get a HOT update again, so handle it as a normal update */
+ case XLOG_HEAP_HOT_UPDATE:
+ case XLOG_HEAP_UPDATE:
+ DecodeUpdate(cache, buf);
+ break;
+
+ case XLOG_HEAP_NEWPAGE:
+ DecodeNewpage(cache, buf);
+ break;
+
+ case XLOG_HEAP_DELETE:
+ DecodeDelete(cache, buf);
+ break;
+ default:
+ break;
+ }
+ break;
+ }
+ case RM_HEAP2_ID:
+ {
+ info &= XLOG_HEAP_OPMASK;
+ switch (info)
+ {
+ case XLOG_HEAP2_MULTI_INSERT:
+ /* this also handles the XLOG_HEAP_INIT_PAGE case */
+ DecodeMultiInsert(cache, buf);
+ break;
+ default:
+ /* everything else here is just physical stuff we're not interested in */
+ break;
+ }
+ break;
+ }
+
+ case RM_XACT_ID:
+ {
+ switch (info)
+ {
+ case XLOG_XACT_COMMIT:
+ {
+ TransactionId *sub_xids;
+ xl_xact_commit *xlrec = (xl_xact_commit*)buf->record_data;
+
+ /* FIXME: this is not really allowed if there are no subtransactions */
+ sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
+ DecodeCommit(cache, buf, r->xl_xid, sub_xids, xlrec->nsubxacts);
+
+ break;
+ }
+ case XLOG_XACT_COMMIT_PREPARED:
+ {
+ TransactionId *sub_xids;
+ xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared*)buf->record_data;
+
+ sub_xids = (TransactionId *) &(xlrec->crec.xnodes[xlrec->crec.nrels]);
+
+ DecodeCommit(cache, buf, r->xl_xid, sub_xids,
+ xlrec->crec.nsubxacts);
+
+ break;
+ }
+ case XLOG_XACT_COMMIT_COMPACT:
+ {
+ xl_xact_commit_compact *xlrec = (xl_xact_commit_compact*)buf->record_data;
+ DecodeCommit(cache, buf, r->xl_xid, xlrec->subxacts,
+ xlrec->nsubxacts);
+ break;
+ }
+ case XLOG_XACT_ABORT:
+ case XLOG_XACT_ABORT_PREPARED:
+ {
+ TransactionId *sub_xids;
+ xl_xact_abort *xlrec = (xl_xact_abort*)buf->record_data;
+ int i;
+
+ /* FIXME: this is not really allowed if there are no subtransactions */
+ sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
+
+ for(i = 0; i < xlrec->nsubxacts; i++)
+ {
+ ApplyCacheAbort(cache, *sub_xids, buf->origptr);
+ sub_xids += 1;
+ }
+
+ /* TODO: check that this also contains not-yet-aborted subtxns */
+ ApplyCacheAbort(cache, r->xl_xid, buf->origptr);
+
+ elog(WARNING, "ABORT %u", r->xl_xid);
+ break;
+ }
+ case XLOG_XACT_ASSIGNMENT:
+ /*
+ * XXX: We could reassign transactions to the parent here
+ * to save space and effort when merging transactions at
+ * commit.
+ */
+ break;
+ case XLOG_XACT_PREPARE:
+ /*
+ * FIXME: we should replay the transaction and prepare it
+ * as well.
+ */
+ break;
+ default:
+ break;
+ ;
+ }
+ break;
+ }
+ default:
+ break;
+ }
+}
+
+static void
+DecodeCommit(ApplyCache* cache, XLogRecordBuffer* buf, TransactionId xid,
+ TransactionId *sub_xids, int nsubxacts)
+{
+ int i;
+
+ for (i = 0; i < nsubxacts; i++)
+ {
+ ApplyCacheCommitChild(cache, xid, *sub_xids, buf->origptr);
+ sub_xids++;
+ }
+
+ /* replay actions of all transaction + subtransactions in order */
+ ApplyCacheCommit(cache, xid, buf->origptr);
+}
+
+static void DecodeInsert(ApplyCache *cache, XLogRecordBuffer* buf)
+{
+ XLogRecord* r = &buf->record;
+ xl_heap_insert *xlrec = (xl_heap_insert *) buf->record_data;
+
+ Oid relfilenode = xlrec->target.node.relNode;
+
+ ApplyCacheChange* change;
+
+ if (r->xl_info & XLR_BKP_BLOCK_1
+ && r->xl_len < (SizeOfHeapInsert + SizeOfHeapHeader))
+ {
+ elog(FATAL, "huh, no tuple data on wal_level = logical?");
+ }
+
+ if(relfilenode == 0)
+ {
+ elog(ERROR, "nailed catalog changed");
+ }
+
+ change = ApplyCacheGetChange(cache);
+ change->action = APPLY_CACHE_CHANGE_INSERT;
+
+ /*
+ * Lookup the pg_class entry for the relfilenode to get the real oid
+ */
+ {
+ MemoryContext curctx = MemoryContextSwitchTo(TopMemoryContext);
+ change->table = SearchSysCacheCopy1(RELFILENODE,
+ relfilenode);
+ MemoryContextSwitchTo(curctx);
+ }
+
+ if (!HeapTupleIsValid(change->table))
+ {
+#ifdef SHOULD_BE_HANDLED_BETTER
+ elog(WARNING, "cache lookup failed for relfilenode %u, systable?",
+ relfilenode);
+#endif
+ ApplyCacheReturnChange(cache, change);
+ return;
+ }
+
+ if (HeapTupleGetOid(change->table) < FirstNormalObjectId)
+ {
+#ifdef VERBOSE_DEBUG
+ elog(LOG, "skipping change to systable");
+#endif
+ ApplyCacheReturnChange(cache, change);
+ return;
+ }
+
+#ifdef VERBOSE_DEBUG
+ {
+ /*for accessing the cache */
+ Form_pg_class class_form;
+ class_form = (Form_pg_class) GETSTRUCT(change->table);
+ elog(WARNING, "INSERT INTO \"%s\"", NameStr(class_form->relname));
+ }
+#endif
+
+ change->newtuple = ApplyCacheGetTupleBuf(cache);
+
+ DecodeXLogTuple((char*)xlrec + SizeOfHeapInsert,
+ r->xl_len - SizeOfHeapInsert,
+ change->table, change->newtuple);
+
+ ApplyCacheAddChange(cache, r->xl_xid, buf->origptr, change);
+}
+
+static void
+DecodeUpdate(ApplyCache *cache, XLogRecordBuffer* buf)
+{
+ XLogRecord* r = &buf->record;
+ xl_heap_update *xlrec = (xl_heap_update *) buf->record_data;
+
+ Oid relfilenode = xlrec->target.node.relNode;
+
+ ApplyCacheChange* change;
+
+ if ((r->xl_info & XLR_BKP_BLOCK_1 || r->xl_info & XLR_BKP_BLOCK_2) &&
+ (r->xl_len < (SizeOfHeapUpdate + SizeOfHeapHeader)))
+ {
+ elog(FATAL, "huh, no tuple data on wal_level = logical?");
+ }
+
+ change = ApplyCacheGetChange(cache);
+ change->action = APPLY_CACHE_CHANGE_UPDATE;
+
+ /*
+ * Lookup the pg_class entry for the relfilenode to get the real oid
+ */
+ {
+ MemoryContext curctx = MemoryContextSwitchTo(TopMemoryContext);
+ change->table = SearchSysCacheCopy1(RELFILENODE,
+ relfilenode);
+ MemoryContextSwitchTo(curctx);
+ }
+
+ if (!HeapTupleIsValid(change->table))
+ {
+#ifdef SHOULD_BE_HANDLED_BETTER
+ elog(WARNING, "cache lookup failed for relfilenode %u, systable?",
+ relfilenode);
+#endif
+ ApplyCacheReturnChange(cache, change);
+ return;
+ }
+
+ if (HeapTupleGetOid(change->table) < FirstNormalObjectId)
+ {
+#ifdef VERBOSE_DEBUG
+ elog(LOG, "skipping change to systable");
+#endif
+ ApplyCacheReturnChange(cache, change);
+ return;
+ }
+
+#ifdef VERBOSE_DEBUG
+ {
+ /*for accessing the cache */
+ Form_pg_class class_form;
+ class_form = (Form_pg_class) GETSTRUCT(change->table);
+ elog(WARNING, "UPDATE \"%s\"", NameStr(class_form->relname));
+ }
+#endif
+
+ /* FIXME: need to save the old tuple as well if we want primary key updates to work. */
+ change->newtuple = ApplyCacheGetTupleBuf(cache);
+
+ DecodeXLogTuple((char*)xlrec + SizeOfHeapUpdate,
+ r->xl_len - SizeOfHeapUpdate,
+ change->table, change->newtuple);
+
+ ApplyCacheAddChange(cache, r->xl_xid, buf->origptr, change);
+}
+
+static void DecodeDelete(ApplyCache *cache, XLogRecordBuffer* buf)
+{
+ XLogRecord* r = &buf->record;
+
+ xl_heap_delete *xlrec = (xl_heap_delete *) buf->record_data;
+
+ Oid relfilenode = xlrec->target.node.relNode;
+
+ ApplyCacheChange* change;
+
+ change = ApplyCacheGetChange(cache);
+ change->action = APPLY_CACHE_CHANGE_DELETE;
+
+ if (r->xl_len <= (SizeOfHeapDelete + SizeOfHeapHeader))
+ {
+ elog(FATAL, "huh, no primary key for a delete on wal_level = logical?");
+ }
+
+ /*
+ * Lookup the pg_class entry for the relfilenode to get the real oid
+ */
+ {
+ MemoryContext curctx = MemoryContextSwitchTo(TopMemoryContext);
+ change->table = SearchSysCacheCopy1(RELFILENODE,
+ relfilenode);
+ MemoryContextSwitchTo(curctx);
+ }
+
+ if (!HeapTupleIsValid(change->table))
+ {
+#ifdef SHOULD_BE_HANDLED_BETTER
+ elog(WARNING, "cache lookup failed for relfilenode %u, systable?",
+ relfilenode);
+#endif
+ ApplyCacheReturnChange(cache, change);
+ return;
+ }
+
+ if (HeapTupleGetOid(change->table) < FirstNormalObjectId)
+ {
+#ifdef VERBOSE_DEBUG
+ elog(LOG, "skipping change to systable");
+#endif
+ ApplyCacheReturnChange(cache, change);
+ return;
+ }
+
+#ifdef VERBOSE_DEBUG
+ {
+ /*for accessing the cache */
+ Form_pg_class class_form;
+ class_form = (Form_pg_class) GETSTRUCT(change->table);
+ elog(WARNING, "DELETE FROM \"%s\"", NameStr(class_form->relname));
+ }
+#endif
+
+ change->oldtuple = ApplyCacheGetTupleBuf(cache);
+
+ DecodeXLogTuple((char*)xlrec + SizeOfHeapDelete,
+ r->xl_len - SizeOfHeapDelete,
+ change->table, change->oldtuple);
+
+ ApplyCacheAddChange(cache, r->xl_xid, buf->origptr, change);
+}
+
+
+static void
+DecodeNewpage(ApplyCache *cache, XLogRecordBuffer* buf)
+{
+ elog(WARNING, "skipping XLOG_HEAP_NEWPAGE record because we are too dumb");
+}
+
+static void
+DecodeMultiInsert(ApplyCache *cache, XLogRecordBuffer* buf)
+{
+ elog(WARNING, "skipping XLOG_HEAP2_MULTI_INSERT record because we are too dumb");
+}
+
+
+static void DecodeXLogTuple(char* data, Size len,
+ HeapTuple table, ApplyCacheTupleBuf* tuple)
+{
+ xl_heap_header xlhdr;
+ int datalen = len - SizeOfHeapHeader;
+
+ Assert(datalen >= 0);
+ Assert(datalen <= MaxHeapTupleSize);
+
+ tuple->tuple.t_len = datalen + offsetof(HeapTupleHeaderData, t_bits);
+
+ /* not a disk based tuple */
+ ItemPointerSetInvalid(&tuple->tuple.t_self);
+
+ /* probably not needed, but ... (is it actually valid to set it?) */
+ tuple->tuple.t_tableOid = HeapTupleGetOid(table);
+ tuple->tuple.t_data = &tuple->header;
+
+ /* data is not stored aligned */
+ memcpy((char *) &xlhdr,
+ data,
+ SizeOfHeapHeader);
+
+ memset(&tuple->header, 0, sizeof(HeapTupleHeaderData));
+
+ memcpy((char *) &tuple->header + offsetof(HeapTupleHeaderData, t_bits),
+ data + SizeOfHeapHeader,
+ datalen);
+
+ tuple->header.t_infomask = xlhdr.t_infomask;
+ tuple->header.t_infomask2 = xlhdr.t_infomask2;
+ tuple->header.t_hoff = xlhdr.t_hoff;
+}
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
new file mode 100644
index 0000000..53088e2
--- /dev/null
+++ b/src/include/replication/decode.h
@@ -0,0 +1,23 @@
+/*-------------------------------------------------------------------------
+ * decode.h
+ * PostgreSQL WAL to logical transformation
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef DECODE_H
+#define DECODE_H
+
+#include "access/xlogreader.h"
+#include "replication/applycache.h"
+
+void DecodeRecordIntoApplyCache(ApplyCache *cache, XLogRecordBuffer* buf);
+
+typedef struct ReaderApplyState
+{
+ ApplyCache *apply_cache;
+} ReaderApplyState;
+
+#endif
--
1.7.10.rc3.3.g19a6c.dirty
From: Andres Freund <andres@anarazel.de>
One solution to avoid loops when doing wal based logical replication in
topologies which are more complex than one unidirectional transport is
introducing the concept of an 'origin_id' into the wal stream. Luckily there is
some padding in the XLogRecord struct that allows us to add that field without
further bloating the struct.
This solution was chosen because it allows for just about any topology and is
unobtrusive.
This adds a new configuration parameter multimaster_node_id which determines
the id used for wal originating in one cluster.
When applying changes from wal from another cluster, code can set the variable
current_replication_origin_id. This is a global variable because passing it
through everything which can generate wal would be far too intrusive.
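To make the idea above a bit more concrete, here is a minimal standalone sketch
of how an origin id can break replication loops. It is not code from this
patch: SketchRecord and all sketch_* functions are hypothetical stand-ins, and
the "never send a record back to the node it came from" rule is just one
possible filter. The right policy depends on the topology.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint16_t RepNodeId;             /* same width as in the patch */
#define InvalidMultimasterNodeId 0

/* Stand-in for the global the patch adds; set by whoever applies remote wal. */
static RepNodeId current_replication_origin_id = InvalidMultimasterNodeId;

/* Hypothetical, pared-down record header: only the origin matters here. */
typedef struct SketchRecord
{
    RepNodeId   xl_origin_id;
} SketchRecord;

/* Stamp a new record with the current origin, as XLogInsert does in the patch. */
static SketchRecord
sketch_insert(void)
{
    SketchRecord rec = { current_replication_origin_id };

    return rec;
}

/*
 * Apply a change received from 'remote' on node 'local'. The origin is
 * switched for the duration of the apply, so the wal written here still
 * carries the id of the node that originally produced the change.
 */
static SketchRecord
sketch_apply_remote(RepNodeId local, RepNodeId remote)
{
    SketchRecord rec;

    assert(remote != InvalidMultimasterNodeId && remote != local);
    current_replication_origin_id = remote;
    rec = sketch_insert();
    current_replication_origin_id = local;
    return rec;
}

/* Loop avoidance (assumed policy): never send a record back to its origin. */
static bool
sketch_should_send(const SketchRecord *rec, RepNodeId peer)
{
    return rec->xl_origin_id != peer;
}
```

In a ring A -> B -> C -> A this filter stops a change from A once it would be
sent to A again. A fully meshed setup would more likely forward only records
whose origin is the local node.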
---
src/backend/access/transam/xact.c | 48 +++++++++++++++++++------
src/backend/access/transam/xlog.c | 3 +-
src/backend/access/transam/xlogreader.c | 2 ++
src/backend/replication/logical/Makefile | 2 +-
src/backend/replication/logical/logical.c | 19 ++++++++++
src/backend/utils/misc/guc.c | 19 ++++++++++
src/backend/utils/misc/postgresql.conf.sample | 3 ++
src/include/access/xlog.h | 4 +--
src/include/access/xlogdefs.h | 2 ++
src/include/replication/logical.h | 22 ++++++++++++
10 files changed, 110 insertions(+), 14 deletions(-)
create mode 100644 src/backend/replication/logical/logical.c
create mode 100644 src/include/replication/logical.h
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 3cc2bfa..dc30a17 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -36,8 +36,9 @@
#include "libpq/be-fsstubs.h"
#include "miscadmin.h"
#include "pgstat.h"
-#include "replication/walsender.h"
+#include "replication/logical.h"
#include "replication/syncrep.h"
+#include "replication/walsender.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "storage/procarray.h"
@@ -4545,12 +4546,13 @@ xactGetCommittedChildren(TransactionId **ptr)
* actions for which the order of execution is critical.
*/
static void
-xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
- TransactionId *sub_xids, int nsubxacts,
- SharedInvalidationMessage *inval_msgs, int nmsgs,
- RelFileNode *xnodes, int nrels,
- Oid dbId, Oid tsId,
- uint32 xinfo)
+xact_redo_commit_internal(TransactionId xid, RepNodeId originating_node,
+ XLogRecPtr lsn, XLogRecPtr origin_lsn,
+ TransactionId *sub_xids, int nsubxacts,
+ SharedInvalidationMessage *inval_msgs, int nmsgs,
+ RelFileNode *xnodes, int nrels,
+ Oid dbId, Oid tsId,
+ uint32 xinfo)
{
TransactionId max_xid;
int i;
@@ -4659,8 +4661,13 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
* Utility function to call xact_redo_commit_internal after breaking down xlrec
*/
static void
+<<<<<<< HEAD
xact_redo_commit(xl_xact_commit *xlrec,
TransactionId xid, XLogRecPtr lsn)
+=======
+xact_redo_commit(xl_xact_commit *xlrec, RepNodeId originating_node,
+ TransactionId xid, XLogRecPtr lsn)
+>>>>>>> Introduce the concept that wal has a 'origin' node
{
TransactionId *subxacts;
SharedInvalidationMessage *inval_msgs;
@@ -4670,18 +4677,26 @@ xact_redo_commit(xl_xact_commit *xlrec,
/* invalidation messages array follows subxids */
inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
+<<<<<<< HEAD
xact_redo_commit_internal(xid, lsn, subxacts, xlrec->nsubxacts,
inval_msgs, xlrec->nmsgs,
xlrec->xnodes, xlrec->nrels,
xlrec->dbId,
xlrec->tsId,
xlrec->xinfo);
+=======
+ xact_redo_commit_internal(xid, originating_node, lsn, xlrec->origin_lsn,
+ subxacts, xlrec->nsubxacts, inval_msgs,
+ xlrec->nmsgs, xlrec->xnodes, xlrec->nrels,
+ xlrec->dbId, xlrec->tsId, xlrec->xinfo);
+>>>>>>> Introduce the concept that wal has a 'origin' node
}
/*
* Utility function to call xact_redo_commit_internal for compact form of message.
*/
static void
+<<<<<<< HEAD
xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
TransactionId xid, XLogRecPtr lsn)
{
@@ -4691,6 +4706,18 @@ xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
InvalidOid, /* dbId */
InvalidOid, /* tsId */
0); /* xinfo */
+=======
+xact_redo_commit_compact(xl_xact_commit_compact *xlrec, RepNodeId originating_node,
+ TransactionId xid, XLogRecPtr lsn)
+{
+ xact_redo_commit_internal(xid, originating_node, lsn, zeroRecPtr, xlrec->subxacts,
+ xlrec->nsubxacts,
+ NULL, 0, /* inval msgs */
+ NULL, 0, /* relfilenodes */
+ InvalidOid, /* dbId */
+ InvalidOid, /* tsId */
+ 0); /* xinfo */
+>>>>>>> Introduce the concept that wal has a 'origin' node
}
/*
@@ -4786,17 +4813,18 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record)
/* Backup blocks are not used in xact records */
Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+ /* FIXME: we probably shouldn't pass xl_origin_id at multiple places, hm */
if (info == XLOG_XACT_COMMIT_COMPACT)
{
xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) XLogRecGetData(record);
- xact_redo_commit_compact(xlrec, record->xl_xid, lsn);
+ xact_redo_commit_compact(xlrec, record->xl_origin_id, record->xl_xid, lsn);
}
else if (info == XLOG_XACT_COMMIT)
{
xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
- xact_redo_commit(xlrec, record->xl_xid, lsn);
+ xact_redo_commit(xlrec, record->xl_origin_id, record->xl_xid, lsn);
}
else if (info == XLOG_XACT_ABORT)
{
@@ -4814,7 +4842,7 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record)
{
xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record);
- xact_redo_commit(&xlrec->crec, xlrec->xid, lsn);
+ xact_redo_commit(&xlrec->crec, record->xl_origin_id, xlrec->xid, lsn);
RemoveTwoPhaseFile(xlrec->xid, false);
}
else if (info == XLOG_XACT_ABORT_PREPARED)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index c6feed0..504b4d0 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -42,6 +42,7 @@
#include "postmaster/startup.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
+#include "replication/logical.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/ipc.h"
@@ -1032,7 +1033,7 @@ begin:;
record->xl_len = len; /* doesn't include backup blocks */
record->xl_info = info;
record->xl_rmid = rmid;
-
+ record->xl_origin_id = current_replication_origin_id;
/* Now we can finish computing the record's CRC */
COMP_CRC32(rdata_crc, (char *) record + sizeof(pg_crc32),
SizeOfXLogRecord - sizeof(pg_crc32));
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 6f15d66..bacd31e 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -24,6 +24,7 @@
#include "access/xlogreader.h"
/* FIXME */
+#include "replication/logical.h" /* InvalidMultimasterNodeId */
#include "replication/walsender_private.h"
#include "replication/walprotocol.h"
@@ -563,6 +564,7 @@ XLogReaderRead(XLogReaderState* state)
spacer.xl_len = temp_record->xl_tot_len - SizeOfXLogRecord;
spacer.xl_rmid = RM_XLOG_ID;
spacer.xl_info = XLOG_NOOP;
+ spacer.xl_origin_id = InvalidMultimasterNodeId;
state->writeout_data(state,
(char*)&spacer,
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 7dd9663..c2d6d82 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -14,6 +14,6 @@ include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
-OBJS = applycache.o decode.o
+OBJS = applycache.o decode.o logical.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
new file mode 100644
index 0000000..4f34488
--- /dev/null
+++ b/src/backend/replication/logical/logical.c
@@ -0,0 +1,19 @@
+/*-------------------------------------------------------------------------
+ *
+ * logical.c
+ *
+ * Support functions for logical/multimaster replication
+ *
+ *
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/logical.c
+ *
+ */
+#include "postgres.h"
+#include "replication/logical.h"
+int guc_replication_origin_id = InvalidMultimasterNodeId;
+RepNodeId current_replication_origin_id = InvalidMultimasterNodeId;
+XLogRecPtr current_replication_origin_lsn = {0, 0};
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 93c798b..46b0657 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -60,6 +60,7 @@
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
+#include "replication/logical.h"
#include "storage/bufmgr.h"
#include "storage/standby.h"
#include "storage/fd.h"
@@ -198,6 +199,7 @@ static const char *show_tcp_keepalives_interval(void);
static const char *show_tcp_keepalives_count(void);
static bool check_maxconnections(int *newval, void **extra, GucSource source);
static void assign_maxconnections(int newval, void *extra);
+static void assign_replication_node_id(int newval, void *extra);
static bool check_maxworkers(int *newval, void **extra, GucSource source);
static void assign_maxworkers(int newval, void *extra);
static bool check_autovacuum_max_workers(int *newval, void **extra, GucSource source);
@@ -1598,6 +1600,16 @@ static struct config_int ConfigureNamesInt[] =
},
{
+ {"multimaster_node_id", PGC_POSTMASTER, REPLICATION_MASTER,
+ gettext_noop("node id for multimaster."),
+ NULL
+ },
+ &guc_replication_origin_id,
+ InvalidMultimasterNodeId, InvalidMultimasterNodeId, MaxMultimasterNodeId,
+ NULL, assign_replication_node_id, NULL
+ },
+
+ {
{"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
gettext_noop("Sets the maximum number of concurrent connections."),
NULL
@@ -8629,6 +8641,13 @@ assign_maxconnections(int newval, void *extra)
MaxBackends = newval + MaxWorkers + autovacuum_max_workers + 1;
}
+static void
+assign_replication_node_id(int newval, void *extra)
+{
+ guc_replication_origin_id = newval;
+ current_replication_origin_id = newval;
+}
+
static bool
check_maxworkers(int *newval, void **extra, GucSource source)
{
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index ce3fc08..12f8a3f 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -241,6 +241,9 @@
#hot_standby_feedback = off # send info from standby to prevent
# query conflicts
+# - Multi Master Servers -
+
+#multimaster_node_id = 0 #invalid node id
#------------------------------------------------------------------------------
# QUERY TUNING
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 2843aca..dd89cff 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -47,8 +47,8 @@ typedef struct XLogRecord
uint32 xl_len; /* total len of rmgr data */
uint8 xl_info; /* flag bits, see below */
RmgrId xl_rmid; /* resource manager for this record */
-
- /* Depending on MAXALIGN, there are either 2 or 6 wasted bytes here */
+ RepNodeId xl_origin_id; /* which node originally caused this record to be written */
+ /* Depending on MAXALIGN, there are either 0 or 4 wasted bytes here */
/* ACTUAL LOG DATA FOLLOWS AT END OF STRUCT */
diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h
index 2768427..6b6700a 100644
--- a/src/include/access/xlogdefs.h
+++ b/src/include/access/xlogdefs.h
@@ -84,6 +84,8 @@ extern XLogRecPtr zeroRecPtr;
*/
typedef uint32 TimeLineID;
+typedef uint16 RepNodeId;
+
/*
* Because O_DIRECT bypasses the kernel buffers, and because we never
* read those buffers except during crash recovery or if wal_level != minimal,
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
new file mode 100644
index 0000000..0698b61
--- /dev/null
+++ b/src/include/replication/logical.h
@@ -0,0 +1,22 @@
+/*
+ * logical.h
+ *
+ * PostgreSQL logical replication support
+ *
+ * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/replication/logical.h
+ */
+#ifndef LOGICAL_H
+#define LOGICAL_H
+
+#include "access/xlogdefs.h"
+
+extern int guc_replication_origin_id;
+extern RepNodeId current_replication_origin_id;
+extern XLogRecPtr current_replication_origin_lsn;
+
+#define InvalidMultimasterNodeId 0
+#define MaxMultimasterNodeId (2<<3)
+#endif
--
1.7.10.rc3.3.g19a6c.dirty
From: Andres Freund <andres@anarazel.de>
For that add a 'node_id' parameter to most commands dealing with wal
segments. A node_id that is 'InvalidMultimasterNodeId' references local wal,
every other node_id refers to wal in a new pg_lcr directory.
Using duplicated code would reduce the impact of that change but the long-term
code-maintenance burden outweighs that by a fair bit.
Besides the decision to add a 'node_id' parameter to several functions the
changes in this patch are fairly mechanical.
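As a quick illustration of the resulting file layout, the following standalone
sketch mirrors the path selection described above. sketch_segment_path and
SKETCH_MAXPATH are hypothetical names used only here. The directory names and
the format string follow the patch, but this is not the code from
xlog_internal.h.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

typedef uint16_t RepNodeId;
typedef uint32_t TimeLineID;

#define InvalidMultimasterNodeId 0
#define XLOGDIR "pg_xlog"
#define LCRDIR  "pg_lcr"
#define SKETCH_MAXPATH 1024     /* stand-in for MAXPGPATH */

/*
 * Build the path of a wal segment. Local wal (node_id == 0) lives in
 * pg_xlog, wal received from any other node lives in pg_lcr/<node_id>/.
 * Returns what snprintf returns, i.e. the length of the path.
 */
static int
sketch_segment_path(char *path, TimeLineID tli, RepNodeId node_id,
                    uint32_t log, uint32_t seg)
{
    assert(path != NULL);

    if (node_id == InvalidMultimasterNodeId)
        return snprintf(path, SKETCH_MAXPATH, XLOGDIR "/%08X%08X%08X",
                        (unsigned) tli, (unsigned) log, (unsigned) seg);

    return snprintf(path, SKETCH_MAXPATH, LCRDIR "/%u/%08X%08X%08X",
                    (unsigned) node_id, (unsigned) tli,
                    (unsigned) log, (unsigned) seg);
}
```

With tli = 1, log = 0 and seg = 2 this yields
pg_xlog/000000010000000000000002 for local wal and
pg_lcr/3/000000010000000000000002 for wal originating on node 3.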
---
src/backend/access/transam/xlog.c | 54 ++++++++++++++++-----------
src/backend/replication/basebackup.c | 4 +-
src/backend/replication/walreceiver.c | 2 +-
src/backend/replication/walsender.c | 9 +++--
src/bin/initdb/initdb.c | 1 +
src/bin/pg_resetxlog/pg_resetxlog.c | 2 +-
src/include/access/xlog.h | 2 +-
src/include/access/xlog_internal.h | 13 +++++--
src/include/replication/logical.h | 2 +
src/include/replication/walsender_private.h | 2 +-
10 files changed, 56 insertions(+), 35 deletions(-)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 504b4d0..0622726 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -635,8 +635,8 @@ static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
static bool AdvanceXLInsertBuffer(bool new_segment);
static bool XLogCheckpointNeeded(uint32 logid, uint32 logseg);
static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
-static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
- bool find_free, int *max_advance,
+static bool InstallXLogFileSegment(RepNodeId node_id, uint32 *log, uint32 *seg,
+ char *tmppath, bool find_free, int *max_advance,
bool use_lock);
static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
int source, bool notexistOk);
@@ -1736,8 +1736,8 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
/* create/use new log file */
use_existent = true;
- openLogFile = XLogFileInit(openLogId, openLogSeg,
- &use_existent, true);
+ openLogFile = XLogFileInit(InvalidMultimasterNodeId, openLogId,
+ openLogSeg, &use_existent, true);
openLogOff = 0;
}
@@ -2376,6 +2376,9 @@ XLogNeedsFlush(XLogRecPtr record)
* place. This should be TRUE except during bootstrap log creation. The
* caller must *not* hold the lock at call.
*
+ * node_id: if != InvalidMultimasterNodeId this xlog file is actually a LCR
+ * file
+ *
* Returns FD of opened file.
*
* Note: errors here are ERROR not PANIC because we might or might not be
@@ -2384,8 +2387,8 @@ XLogNeedsFlush(XLogRecPtr record)
* in a critical section.
*/
int
-XLogFileInit(uint32 log, uint32 seg,
- bool *use_existent, bool use_lock)
+XLogFileInit(RepNodeId node_id, uint32 log, uint32 seg,
+ bool *use_existent, bool use_lock)
{
char path[MAXPGPATH];
char tmppath[MAXPGPATH];
@@ -2396,7 +2399,7 @@ XLogFileInit(uint32 log, uint32 seg,
int fd;
int nbytes;
- XLogFilePath(path, ThisTimeLineID, log, seg);
+ XLogFilePath(path, ThisTimeLineID, node_id, log, seg);
/*
* Try to use existent file (checkpoint maker may have created it already)
@@ -2425,6 +2428,11 @@ XLogFileInit(uint32 log, uint32 seg,
*/
elog(DEBUG2, "creating and filling new WAL file");
+ /*
+ * FIXME: to be safe we need to create tempfile in the pg_lcr directory if
+ * its actually an lcr file because pg_lcr might be in a different
+ * partition.
+ */
snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
unlink(tmppath);
@@ -2493,7 +2501,7 @@ XLogFileInit(uint32 log, uint32 seg,
installed_log = log;
installed_seg = seg;
max_advance = XLOGfileslop;
- if (!InstallXLogFileSegment(&installed_log, &installed_seg, tmppath,
+ if (!InstallXLogFileSegment(node_id, &installed_log, &installed_seg, tmppath,
*use_existent, &max_advance,
use_lock))
{
@@ -2548,7 +2556,7 @@ XLogFileCopy(uint32 log, uint32 seg,
/*
* Open the source file
*/
- XLogFilePath(path, srcTLI, srclog, srcseg);
+ XLogFilePath(path, srcTLI, InvalidMultimasterNodeId, srclog, srcseg);
srcfd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
if (srcfd < 0)
ereport(ERROR,
@@ -2619,7 +2627,8 @@ XLogFileCopy(uint32 log, uint32 seg,
/*
* Now move the segment into place with its final name.
*/
- if (!InstallXLogFileSegment(&log, &seg, tmppath, false, NULL, false))
+ if (!InstallXLogFileSegment(InvalidMultimasterNodeId, &log, &seg, tmppath,
+ false, NULL, false))
elog(ERROR, "InstallXLogFileSegment should not have failed");
}
@@ -2653,14 +2662,14 @@ XLogFileCopy(uint32 log, uint32 seg,
* file into place.
*/
static bool
-InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
+InstallXLogFileSegment(RepNodeId node_id, uint32 *log, uint32 *seg, char *tmppath,
bool find_free, int *max_advance,
bool use_lock)
{
char path[MAXPGPATH];
struct stat stat_buf;
- XLogFilePath(path, ThisTimeLineID, *log, *seg);
+ XLogFilePath(path, ThisTimeLineID, node_id, *log, *seg);
/*
* We want to be sure that only one process does this at a time.
@@ -2687,7 +2696,7 @@ InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
}
NextLogSeg(*log, *seg);
(*max_advance)--;
- XLogFilePath(path, ThisTimeLineID, *log, *seg);
+ XLogFilePath(path, ThisTimeLineID, node_id, *log, *seg);
}
}
@@ -2736,7 +2745,7 @@ XLogFileOpen(uint32 log, uint32 seg)
char path[MAXPGPATH];
int fd;
- XLogFilePath(path, ThisTimeLineID, log, seg);
+ XLogFilePath(path, ThisTimeLineID, InvalidMultimasterNodeId, log, seg);
fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method),
S_IRUSR | S_IWUSR);
@@ -2783,7 +2792,7 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
case XLOG_FROM_PG_XLOG:
case XLOG_FROM_STREAM:
- XLogFilePath(path, tli, log, seg);
+ XLogFilePath(path, tli, InvalidMultimasterNodeId, log, seg);
restoredFromArchive = false;
break;
@@ -2804,7 +2813,7 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
bool reload = false;
struct stat statbuf;
- XLogFilePath(xlogfpath, tli, log, seg);
+ XLogFilePath(xlogfpath, tli, InvalidMultimasterNodeId, log, seg);
if (stat(xlogfpath, &statbuf) == 0)
{
if (unlink(xlogfpath) != 0)
@@ -2922,7 +2931,7 @@ XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, int sources)
}
/* Couldn't find it. For simplicity, complain about front timeline */
- XLogFilePath(path, recoveryTargetTLI, log, seg);
+ XLogFilePath(path, recoveryTargetTLI, InvalidMultimasterNodeId, log, seg);
errno = ENOENT;
ereport(emode,
(errcode_for_file_access(),
@@ -3366,7 +3375,8 @@ PreallocXlogFiles(XLogRecPtr endptr)
{
NextLogSeg(_logId, _logSeg);
use_existent = true;
- lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
+ lf = XLogFileInit(InvalidMultimasterNodeId, _logId, _logSeg,
+ &use_existent, true);
close(lf);
if (!use_existent)
CheckpointStats.ckpt_segs_added++;
@@ -3486,8 +3496,9 @@ RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr)
* separate archive directory.
*/
if (lstat(path, &statbuf) == 0 && S_ISREG(statbuf.st_mode) &&
- InstallXLogFileSegment(&endlogId, &endlogSeg, path,
- true, &max_advance, true))
+ InstallXLogFileSegment(InvalidMultimasterNodeId, &endlogId,
+ &endlogSeg, path, true,
+ &max_advance, true))
{
ereport(DEBUG2,
(errmsg("recycled transaction log file \"%s\"",
@@ -5255,7 +5266,8 @@ BootStrapXLOG(void)
/* Create first XLOG segment file */
use_existent = false;
- openLogFile = XLogFileInit(0, 1, &use_existent, false);
+ openLogFile = XLogFileInit(InvalidMultimasterNodeId, 0, 1,
+ &use_existent, false);
/* Write the first page with the initial record */
errno = 0;
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index 0bc88a4..47e4641 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -245,7 +245,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
char fn[MAXPGPATH];
int i;
- XLogFilePath(fn, ThisTimeLineID, logid, logseg);
+ XLogFilePath(fn, ThisTimeLineID, InvalidMultimasterNodeId, logid, logseg);
_tarWriteHeader(fn, NULL, &statbuf);
/* Send the actual WAL file contents, block-by-block */
@@ -264,7 +264,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
* http://lists.apple.com/archives/xcode-users/2003/Dec//msg000
* 51.html
*/
- XLogRead(buf, ptr, TAR_SEND_SIZE);
+ XLogRead(buf, InvalidMultimasterNodeId, ptr, TAR_SEND_SIZE);
if (pq_putmessage('d', buf, TAR_SEND_SIZE))
ereport(ERROR,
(errmsg("base backup could not send data, aborting backup")));
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 650b74f..e97196b 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -509,7 +509,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
/* Create/use new log file */
XLByteToSeg(recptr, recvId, recvSeg);
use_existent = true;
- recvFile = XLogFileInit(recvId, recvSeg, &use_existent, true);
+ recvFile = XLogFileInit(InvalidMultimasterNodeId, recvId, recvSeg, &use_existent, true);
recvOff = 0;
}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e44c734..8cd3a00 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -977,7 +977,7 @@ WalSndKill(int code, Datum arg)
* more than one.
*/
void
-XLogRead(char *buf, XLogRecPtr startptr, Size count)
+XLogRead(char *buf, RepNodeId node_id, XLogRecPtr startptr, Size count)
{
char *p;
XLogRecPtr recptr;
@@ -1009,8 +1009,8 @@ retry:
close(sendFile);
XLByteToSeg(recptr, sendId, sendSeg);
- XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);
-
+ XLogFilePath(path, ThisTimeLineID, node_id,
+ sendId, sendSeg);
sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
if (sendFile < 0)
{
@@ -1215,7 +1215,8 @@ XLogSend(char *msgbuf, bool *caughtup)
* Read the log directly into the output buffer to avoid extra memcpy
* calls.
*/
- XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
+ XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), InvalidMultimasterNodeId,
+ startptr, nbytes);
/*
* We fill the message header last so that the send timestamp is taken as
diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c
index 3789948..1f26382 100644
--- a/src/bin/initdb/initdb.c
+++ b/src/bin/initdb/initdb.c
@@ -2637,6 +2637,7 @@ main(int argc, char *argv[])
"global",
"pg_xlog",
"pg_xlog/archive_status",
+ "pg_lcr",
"pg_clog",
"pg_notify",
"pg_serial",
diff --git a/src/bin/pg_resetxlog/pg_resetxlog.c b/src/bin/pg_resetxlog/pg_resetxlog.c
index 65ba910..7ee3a3a 100644
--- a/src/bin/pg_resetxlog/pg_resetxlog.c
+++ b/src/bin/pg_resetxlog/pg_resetxlog.c
@@ -973,7 +973,7 @@ WriteEmptyXLOG(void)
/* Write the first page */
XLogFilePath(path, ControlFile.checkPointCopy.ThisTimeLineID,
- newXlogId, newXlogSeg);
+ InvalidMultimasterNodeId, newXlogId, newXlogSeg);
unlink(path);
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index dd89cff..3b02c0b 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -268,7 +268,7 @@ extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata);
extern void XLogFlush(XLogRecPtr RecPtr);
extern bool XLogBackgroundFlush(void);
extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
-extern int XLogFileInit(uint32 log, uint32 seg,
+extern int XLogFileInit(RepNodeId node_id, uint32 log, uint32 seg,
bool *use_existent, bool use_lock);
extern int XLogFileOpen(uint32 log, uint32 seg);
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 3328a50..deadddf 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -19,6 +19,7 @@
#include "access/xlog.h"
#include "fmgr.h"
#include "pgtime.h"
+#include "replication/logical.h"
#include "storage/block.h"
#include "storage/relfilenode.h"
@@ -216,14 +217,11 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader;
#define MAXFNAMELEN 64
#define XLogFileName(fname, tli, log, seg) \
- snprintf(fname, MAXFNAMELEN, "%08X%08X%08X", tli, log, seg)
+ snprintf(fname, MAXFNAMELEN, "%08X%08X%08X", tli, log, seg);
#define XLogFromFileName(fname, tli, log, seg) \
sscanf(fname, "%08X%08X%08X", tli, log, seg)
-#define XLogFilePath(path, tli, log, seg) \
- snprintf(path, MAXPGPATH, XLOGDIR "/%08X%08X%08X", tli, log, seg)
-
#define TLHistoryFileName(fname, tli) \
snprintf(fname, MAXFNAMELEN, "%08X.history", tli)
@@ -239,6 +237,13 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader;
#define BackupHistoryFilePath(path, tli, log, seg, offset) \
snprintf(path, MAXPGPATH, XLOGDIR "/%08X%08X%08X.%08X.backup", tli, log, seg, offset)
+/* FIXME: move to xlogutils.c, needs to fix sharing with receivexlog.c first though */
+static inline int XLogFilePath(char* path, TimeLineID tli, RepNodeId node_id, uint32 log, uint32 seg){
+ if(node_id == InvalidMultimasterNodeId)
+ return snprintf(path, MAXPGPATH, XLOGDIR "/%08X%08X%08X", tli, log, seg);
+ else
+ return snprintf(path, MAXPGPATH, LCRDIR "/%d/%08X%08X%08X", node_id, tli, log, seg);
+}
/*
* Method table for resource managers.
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 0698b61..8f44fad 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -19,4 +19,6 @@ extern XLogRecPtr current_replication_origin_lsn;
#define InvalidMultimasterNodeId 0
#define MaxMultimasterNodeId (2<<3)
+
+#define LCRDIR "pg_lcr"
#endif
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 66234cd..bc58ff4 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -95,7 +95,7 @@ extern WalSndCtlData *WalSndCtl;
extern void WalSndSetState(WalSndState state);
-extern void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+extern void XLogRead(char *buf, RepNodeId node_id, XLogRecPtr startptr, Size count);
/*
* Internal functions for parsing the replication grammar, in repl_gram.y and
--
1.7.10.rc3.3.g19a6c.dirty
From: Andres Freund <andres@anarazel.de>
In order to have restartable replication with minimal additional writes it's
very useful to know up to which point we have replayed/received changes from a
foreign node.
One representation of that is the lsn of changes at the originating cluster.
We need to keep track of the point up to which we received data and up to where
we applied data.
For that we added a field 'origin_lsn' to commit records. This allows us to
keep track of the apply position through crash recovery with minimal additional
io. We only added the field to non-compact commit records to reduce the
overhead in case logical replication is not used.
Checkpoints need to keep track of the apply/receive positions as well because
otherwise it would be hard to determine the lsn from where to restart
receive/apply after a shutdown/crash if no changes happened since the last
shutdown/crash.
While running, the startup process, the walreceiver and a (future) apply
process will need a coherent picture of those two states, so add shared memory
state to keep track of it. Currently this is represented in the walreceiver's
shared memory segment. This will likely need to change.
During crash recovery/physical replication the origin_lsn field of commit
records is used to update the shared memory, and thus the next checkpoint's,
notion of the apply state.
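The bookkeeping described above can be sketched roughly as follows. This is a
simplified standalone illustration, not the code in xact.c: SketchApplyState
and sketch_redo_commit are hypothetical names, the spinlock and the
LogicalWalReceiverActive() check are left out, and an all-zero origin_lsn is
assumed to mean "no origin position attached", as it does for compact commit
records in the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint16_t RepNodeId;
#define InvalidMultimasterNodeId 0
#define MaxMultimasterNodeId (2 << 3)

typedef struct XLogRecPtr
{
    uint32_t    xlogid;
    uint32_t    xrecoff;
} XLogRecPtr;

/* Stand-in for the per-node apply positions kept in shared memory. */
typedef struct SketchApplyState
{
    XLogRecPtr  apply[MaxMultimasterNodeId];
} SketchApplyState;

static bool
lsn_is_zero(XLogRecPtr p)
{
    return p.xlogid == 0 && p.xrecoff == 0;
}

/*
 * Called while replaying a commit record. If the record carries an origin
 * lsn, remember it as the apply position for the originating node. The next
 * checkpoint can then persist that position.
 */
static void
sketch_redo_commit(SketchApplyState *state, RepNodeId origin,
                   XLogRecPtr origin_lsn)
{
    assert(origin < MaxMultimasterNodeId);

    if (lsn_is_zero(origin_lsn))
        return;                 /* local or compact commit: nothing to record */

    state->apply[origin] = origin_lsn;
}
```

Because the position travels inside the commit record itself, replaying the
local wal after a crash is enough to rebuild the per-node apply positions.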
Missing:
- For correct crash recovery we need more state than the 'apply lsn' because
transactions on the originating side can overlap. At the lsn we just applied
many other transactions can be in progress. To correctly handle that we need to
keep track of the oldest start lsn of any transaction currently being
reassembled (c.f. ApplyCache). Then we can start to reassemble the ApplyCache
up from that point and throw away any transaction which committed before the
recorded/recovered apply lsn.
It should be sufficient to store that knowledge in shared memory and
checkpoint records.
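Since that part is not implemented yet, here is only a rough sketch of the
intended restart logic under simplifying assumptions. LSNs are flattened into a
single 64-bit integer, and SketchLsn, SketchXact and the sketch_* functions are
hypothetical names that do not exist in the patches.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t SketchLsn;     /* flattened lsn, the patch uses XLogRecPtr */

typedef struct SketchXact
{
    SketchLsn   start_lsn;      /* first record of the transaction */
    SketchLsn   commit_lsn;     /* its commit record on the origin */
} SketchXact;

/*
 * Where reassembly has to restart: the oldest start lsn of any transaction
 * that was still in progress, or the apply lsn itself if there was none.
 */
static SketchLsn
sketch_restart_lsn(const SketchLsn *inprogress_starts, int n,
                   SketchLsn apply_lsn)
{
    SketchLsn   restart = apply_lsn;

    for (int i = 0; i < n; i++)
    {
        if (inprogress_starts[i] < restart)
            restart = inprogress_starts[i];
    }
    return restart;
}

/*
 * After the restart a reassembled transaction is only applied if it
 * committed after the recovered apply lsn. Everything else was already
 * applied before the crash and is thrown away.
 */
static bool
sketch_must_apply(const SketchXact *xact, SketchLsn apply_lsn)
{
    assert(xact->start_lsn <= xact->commit_lsn);
    return xact->commit_lsn > apply_lsn;
}
```

With an apply lsn of 200 and in-progress transactions that started at 120, 90
and 150, reassembly would restart at 90. A transaction that committed at 180
is discarded, one that commits at 230 is applied.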
---
src/backend/access/transam/xact.c | 22 ++++++++-
src/backend/access/transam/xlog.c | 73 ++++++++++++++++++++++++++++
src/backend/replication/walreceiverfuncs.c | 8 +++
src/include/access/xact.h | 1 +
src/include/catalog/pg_control.h | 13 ++++-
src/include/replication/walreceiver.h | 13 +++++
6 files changed, 128 insertions(+), 2 deletions(-)
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index dc30a17..40ac965 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -39,11 +39,13 @@
#include "replication/logical.h"
#include "replication/syncrep.h"
#include "replication/walsender.h"
+#include "replication/walreceiver.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "storage/procarray.h"
#include "storage/sinvaladt.h"
#include "storage/smgr.h"
+#include "storage/spin.h"
#include "utils/combocid.h"
#include "utils/guc.h"
#include "utils/inval.h"
@@ -1015,7 +1017,8 @@ RecordTransactionCommit(void)
/*
* Do we need the long commit record? If not, use the compact format.
*/
- if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit)
+ if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit ||
+ (wal_level == WAL_LEVEL_LOGICAL && current_replication_origin_id != guc_replication_origin_id))
{
XLogRecData rdata[4];
int lastrdata = 0;
@@ -1037,6 +1040,8 @@ RecordTransactionCommit(void)
xlrec.nrels = nrels;
xlrec.nsubxacts = nchildren;
xlrec.nmsgs = nmsgs;
+ xlrec.origin_lsn = current_replication_origin_lsn;
+
rdata[0].data = (char *) (&xlrec);
rdata[0].len = MinSizeOfXactCommit;
rdata[0].buffer = InvalidBuffer;
@@ -4575,6 +4580,21 @@ xact_redo_commit_internal(TransactionId xid, RepNodeId originating_node,
LWLockRelease(XidGenLock);
}
+ /*
+ * record where we're at wrt recovery. We need that to know from where on
+ * to restart applying logical change records
+ */
+ if(LogicalWalReceiverActive() && !XLByteEQ(origin_lsn, zeroRecPtr))
+ {
+ /*
+ * probably we don't need the locking because no lcr receiver can run
+ * yet.
+ */
+ SpinLockAcquire(&WalRcv->mutex);
+ WalRcv->mm_applyState[originating_node] = origin_lsn;
+ SpinLockRelease(&WalRcv->mutex);
+ }
+
if (standbyState == STANDBY_DISABLED)
{
/*
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 0622726..20a4611 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5183,6 +5183,7 @@ BootStrapXLOG(void)
uint64 sysidentifier;
struct timeval tv;
pg_crc32 crc;
+ int i;
/*
* Select a hopefully-unique system identifier code for this installation.
@@ -5229,6 +5230,13 @@ BootStrapXLOG(void)
checkPoint.time = (pg_time_t) time(NULL);
checkPoint.oldestActiveXid = InvalidTransactionId;
+ for(i = InvalidMultimasterNodeId + 1; i < MaxMultimasterNodeId - 1;
+ i++){
+ checkPoint.logicalReceiveState[i] = zeroRecPtr;
+ checkPoint.logicalApplyState[i] = zeroRecPtr;
+ }
+
+
ShmemVariableCache->nextXid = checkPoint.nextXid;
ShmemVariableCache->nextOid = checkPoint.nextOid;
ShmemVariableCache->oidCount = 0;
@@ -6314,6 +6322,53 @@ StartupXLOG(void)
InRecovery = true;
}
+ /*
+ * setup shared memory state for logical wal receiver
+ *
+ * Do this unconditionally so enabling/disabling/enabling logical replay
+ * doesn't lose information due to rewriting pg_control
+ */
+ {
+ int i;
+
+ Assert(WalRcv);
+ /* locking is not really required here afaics, but ... */
+ SpinLockAcquire(&WalRcv->mutex);
+
+ for(i = InvalidMultimasterNodeId + 1; i < MaxMultimasterNodeId - 1;
+ i++)
+ {
+ XLogRecPtr* receiveState = &ControlFile->checkPointCopy.logicalReceiveState[i];
+ XLogRecPtr* applyState = &ControlFile->checkPointCopy.logicalApplyState[i];
+ if(i == guc_replication_origin_id && (
+ !XLByteEQ(*receiveState, zeroRecPtr) ||
+ !XLByteEQ(*applyState, zeroRecPtr))
+ )
+ {
+ elog(WARNING, "logical recovery state for own db. apply: %X/%X, receive %X/%X, origin %d",
+ applyState->xlogid, applyState->xrecoff,
+ receiveState->xlogid, receiveState->xrecoff,
+ guc_replication_origin_id);
+ WalRcv->mm_receiveState[i] = zeroRecPtr;
+ WalRcv->mm_applyState[i] = zeroRecPtr;
+ }
+ else{
+ WalRcv->mm_receiveState[i] = *receiveState;
+ WalRcv->mm_applyState[i] = *applyState;
+ }
+ }
+ SpinLockRelease(&WalRcv->mutex);
+
+ /* FIXME: remove at some point */
+ for(i = InvalidMultimasterNodeId + 1; i < MaxMultimasterNodeId - 1;
+ i++){
+ elog(LOG, "restored apply state for node %d to %X/%X, receive %X/%X",
+ i,
+ WalRcv->mm_applyState[i].xlogid, WalRcv->mm_applyState[i].xrecoff,
+ WalRcv->mm_receiveState[i].xlogid, WalRcv->mm_receiveState[i].xrecoff);
+ }
+ }
+
/* REDO */
if (InRecovery)
{
@@ -7906,6 +7961,24 @@ CreateCheckPoint(int flags)
&checkPoint.nextMultiOffset);
/*
+ * fill out where we are at wrt logical replay. Do this unconditionally so we
+ * don't lose information due to rewriting pg_control when toggling
+ * logical replay
+ */
+ {
+ int i;
+ SpinLockAcquire(&WalRcv->mutex);
+
+ for(i = InvalidMultimasterNodeId + 1; i < MaxMultimasterNodeId - 1;
+ i++){
+ checkPoint.logicalApplyState[i] = WalRcv->mm_applyState[i];
+ checkPoint.logicalReceiveState[i] = WalRcv->mm_receiveState[i];
+ }
+ SpinLockRelease(&WalRcv->mutex);
+ elog(LOG, "updated logical checkpoint data");
+ }
+
+ /*
* Having constructed the checkpoint record, ensure all shmem disk buffers
* and commit-log buffers are flushed to disk.
*
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 876196f..cb49282 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -64,6 +64,14 @@ WalRcvShmemInit(void)
MemSet(WalRcv, 0, WalRcvShmemSize());
WalRcv->walRcvState = WALRCV_STOPPED;
SpinLockInit(&WalRcv->mutex);
+
+ memset(&WalRcv->mm_receiveState,
+ 0, sizeof(WalRcv->mm_receiveState));
+ memset(&WalRcv->mm_applyState,
+ 0, sizeof(WalRcv->mm_applyState));
+
+ memset(&WalRcv->mm_receiveLatch,
+ 0, sizeof(WalRcv->mm_receiveLatch));
}
}
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index b12d2a0..2757782 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -137,6 +137,7 @@ typedef struct xl_xact_commit
int nmsgs; /* number of shared inval msgs */
Oid dbId; /* MyDatabaseId */
Oid tsId; /* MyDatabaseTableSpace */
+ XLogRecPtr origin_lsn; /* location of originating commit */
/* Array of RelFileNode(s) to drop at commit */
RelFileNode xnodes[1]; /* VARIABLE LENGTH ARRAY */
/* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index 5cff396..bc6316e 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -16,12 +16,13 @@
#define PG_CONTROL_H
#include "access/xlogdefs.h"
+#include "replication/logical.h"
#include "pgtime.h" /* for pg_time_t */
#include "utils/pg_crc.h"
/* Version identifier for this pg_control format */
-#define PG_CONTROL_VERSION 922
+#define PG_CONTROL_VERSION 923
/*
* Body of CheckPoint XLOG records. This is declared here because we keep
@@ -50,6 +51,13 @@ typedef struct CheckPoint
* it's set to InvalidTransactionId.
*/
TransactionId oldestActiveXid;
+
+ /*
+ * The replay state from every other node. This is only needed if wal_level
+ * >= logical and thus is only filled then.
+ */
+ XLogRecPtr logicalApplyState[MaxMultimasterNodeId - 1];
+ XLogRecPtr logicalReceiveState[MaxMultimasterNodeId - 1];
} CheckPoint;
/* XLOG info values for XLOG rmgr */
@@ -85,6 +93,9 @@ typedef enum DBState
* NOTE: try to keep this under 512 bytes so that it will fit on one physical
* sector of typical disk drives. This reduces the odds of corruption due to
* power failure midway through a write.
+ *
+ * FIXME: in order to allow many nodes in mm (which increases checkpoint size)
+ * we should change the writing of this to write(temp_file);fsync();rename();fsync();
*/
typedef struct ControlFileData
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index d21ec94..c9ab1be 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -14,6 +14,8 @@
#include "access/xlog.h"
#include "access/xlogdefs.h"
+#include "replication/logical.h"
+#include "storage/latch.h"
#include "storage/spin.h"
#include "pgtime.h"
@@ -90,6 +92,17 @@ typedef struct
char conninfo[MAXCONNINFO];
slock_t mutex; /* locks shared variables shown above */
+
+ /*
+ * replay point up to which we replayed for every node
+ * XXX: should possibly be dynamically sized?
+ * FIXME: should go to its own shm segment?
+ */
+ XLogRecPtr mm_receiveState[MaxMultimasterNodeId - 1];
+ XLogRecPtr mm_applyState[MaxMultimasterNodeId - 1];
+
+ Latch* mm_receiveLatch[MaxMultimasterNodeId - 1];
+
} WalRcvData;
extern WalRcvData *WalRcv;
--
1.7.10.rc3.3.g19a6c.dirty
From: Andres Freund <andres@anarazel.de>
A logical WALReceiver is started directly by Postmaster when we enter PM_RUN
state and the new parameter multimaster_conninfo is set. For now only one of
those is started, but the code doesn't rely on that. In future multiple ones
should be allowed.
To transfer the data, a new command, START_LOGICAL_REPLICATION, is introduced in
the walsender, reusing most of the infrastructure for START_REPLICATION. The
former uses the same on-the-wire format as the latter.
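As a rough illustration of the command selection described above, here is a
minimal standalone sketch of how the receiver side could format the two
commands. LsnPair and build_start_command are hypothetical names used only for
this example; the patch itself does this inline in libpqrcv_start with the real
XLogRecPtr type.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical stand-in for the two-part XLogRecPtr used in the patch. */
typedef struct
{
	uint32_t	xlogid;		/* log file number */
	uint32_t	xrecoff;	/* byte offset within the log file */
} LsnPair;

/*
 * Format the streaming command for the walsender. Physical streaming keeps
 * using START_REPLICATION, logical streaming uses the new command; both take
 * the start position as %X/%X. Returns the snprintf() result.
 */
static int
build_start_command(char *cmd, size_t cmdlen, LsnPair start, bool physical)
{
	assert(cmd != NULL && cmdlen > 0);

	return snprintf(cmd, cmdlen, "%s %X/%X",
					physical ? "START_REPLICATION" : "START_LOGICAL_REPLICATION",
					(unsigned int) start.xlogid,
					(unsigned int) start.xrecoff);
}
```

Even with both halves of the position fully populated, the longer command
stays well below the 64 byte buffer the patch uses.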
To make initialization possible, IDENTIFY_SYSTEM returns two new columns: node_id,
returning the multimaster_node_id, and last_checkpoint, returning the RedoRecPtr.
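For illustration, a sketch of how a client could pick the position, node id and
redo pointer out of the five text columns. This assumes the column order used
in the patch (systemid, timeline, xlogpos, node_id, last_checkpoint); LsnPair,
PeerInfo, parse_lsn and parse_identify_row are hypothetical names, not part of
the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical stand-in for the two-part XLogRecPtr used in the patch. */
typedef struct
{
	uint32_t	xlogid;
	uint32_t	xrecoff;
} LsnPair;

/* What the receiver learns about the other node from IDENTIFY_SYSTEM. */
typedef struct
{
	LsnPair		current;	/* column 3: xlogpos */
	long		node_id;	/* column 4: node_id */
	LsnPair		redo;		/* column 5: last_checkpoint */
} PeerInfo;

/* Parse a "%X/%X" position; trailing garbage is rejected. */
static bool
parse_lsn(const char *text, LsnPair *out)
{
	unsigned int hi;
	unsigned int lo;
	char		trailing;

	assert(text != NULL && out != NULL);

	/* exactly two conversions means both halves parsed and nothing follows */
	if (sscanf(text, "%X/%X%c", &hi, &lo, &trailing) != 2)
		return false;
	out->xlogid = hi;
	out->xrecoff = lo;
	return true;
}

/* cols points at the five column values of the single result row. */
static bool
parse_identify_row(const char **cols, PeerInfo *out)
{
	char	   *end;

	if (!parse_lsn(cols[2], &out->current))
		return false;
	out->node_id = strtol(cols[3], &end, 10);
	if (end == cols[3] || *end != '\0')
		return false;
	return parse_lsn(cols[4], &out->redo);
}
```

Unlike the plain sscanf() call in the patch, this sketch also rejects trailing
characters after the position, which is a design choice of the example only.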
The walreceiver writes the received data into the previously introduced
pg_lcr/$node_id directory.
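A small sketch of the two calculations involved when connecting to a node for
the first time, mirroring what WalReceiverMain does in this patch: the transfer
starts at the beginning of the segment containing the other side's redo
pointer, and the data lands in a per-node directory. The 16MB segment size and
the "pg_lcr" directory name are assumptions of this example, as are all the
identifiers.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

#define SKETCH_LCRDIR	"pg_lcr"					/* assumed directory name */
#define SKETCH_SEG_SIZE	(16u * 1024u * 1024u)		/* assumed 16MB segments */

/* Hypothetical stand-in for the two-part XLogRecPtr used in the patch. */
typedef struct
{
	uint32_t	xlogid;
	uint32_t	xrecoff;
} LsnPair;

/* Scroll a position back to the beginning of its segment. */
static LsnPair
segment_start(LsnPair pos)
{
	pos.xrecoff -= pos.xrecoff % SKETCH_SEG_SIZE;
	return pos;
}

/* Build the directory name for data received from one node. */
static int
lcr_dir_for_node(char *buf, size_t buflen, unsigned int node_id)
{
	assert(buf != NULL && buflen > 0);

	return snprintf(buf, buflen, "%s/%u", SKETCH_LCRDIR, node_id);
}
```

Starting at a segment boundary presumably keeps the received files aligned
with the sender's segments; the apply position is still initialized to the
unrounded redo pointer in the patch.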
Future Directions/TODO:
- pass node_ids we're interested in to START_LOGICAL_REPLICATION to allow
complex topologies
- allow passing filters to reduce the transfer volume
- compress the transferred data by actually removing uninteresting records
instead of replacing them by NOOP records. This adds some complexities
because we still need to map the received lsn to the requested lsn so we know
where to restart transferring data and such.
- check that wal on the sending side was generated with WAL_LEVEL_LOGICAL
---
src/backend/postmaster/postmaster.c | 10 +-
.../libpqwalreceiver/libpqwalreceiver.c | 104 ++++-
src/backend/replication/repl_gram.y | 19 +-
src/backend/replication/repl_scanner.l | 1 +
src/backend/replication/walreceiver.c | 165 +++++++-
src/backend/replication/walreceiverfuncs.c | 1 +
src/backend/replication/walsender.c | 422 +++++++++++++++-----
src/backend/utils/misc/guc.c | 9 +
src/backend/utils/misc/postgresql.conf.sample | 1 +
src/include/nodes/nodes.h | 1 +
src/include/nodes/replnodes.h | 10 +
src/include/replication/logical.h | 4 +
src/include/replication/walreceiver.h | 9 +-
13 files changed, 624 insertions(+), 132 deletions(-)
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 71cfd6d..13e9592 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -1449,6 +1449,11 @@ ServerLoop(void)
kill(AutoVacPID, SIGUSR2);
}
+ /* Restart walreceiver process in certain states only. */
+ if (WalReceiverPID == 0 && pmState == PM_RUN &&
+ LogicalWalReceiverActive())
+ WalReceiverPID = StartWalReceiver();
+
/* Check all the workers requested are running. */
if (pmState == PM_RUN)
StartBackgroundWorkers();
@@ -2169,7 +2174,8 @@ pmdie(SIGNAL_ARGS)
/* and the walwriter too */
if (WalWriterPID != 0)
signal_child(WalWriterPID, SIGTERM);
-
+ if (WalReceiverPID != 0)
+ signal_child(WalReceiverPID, SIGTERM);
/*
* If we're in recovery, we can't kill the startup process
* right away, because at present doing so does not release
@@ -2421,6 +2427,8 @@ reaper(SIGNAL_ARGS)
PgArchPID = pgarch_start();
if (PgStatPID == 0)
PgStatPID = pgstat_start();
+ if (WalReceiverPID == 0 && LogicalWalReceiverActive())
+ WalReceiverPID = StartWalReceiver();
StartBackgroundWorkers();
/* at this point we are really open for business */
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 979b66b..0ea3fce 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -46,7 +46,8 @@ static PGconn *streamConn = NULL;
static char *recvBuf = NULL;
/* Prototypes for interface functions */
-static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
+static bool libpqrcv_connect(char *conninfo, XLogRecPtr* redo, XLogRecPtr* where_at, bool startedDuringRecovery);
+static bool libpqrcv_start(char *conninfo, XLogRecPtr* startpoint, bool startedDuringRecovery);
static bool libpqrcv_receive(int timeout, unsigned char *type,
char **buffer, int *len);
static void libpqrcv_send(const char *buffer, int nbytes);
@@ -63,10 +64,12 @@ void
_PG_init(void)
{
/* Tell walreceiver how to reach us */
- if (walrcv_connect != NULL || walrcv_receive != NULL ||
- walrcv_send != NULL || walrcv_disconnect != NULL)
+ if (walrcv_connect != NULL || walrcv_start != NULL ||
+ walrcv_receive != NULL || walrcv_send != NULL ||
+ walrcv_disconnect != NULL)
elog(ERROR, "libpqwalreceiver already loaded");
walrcv_connect = libpqrcv_connect;
+ walrcv_start = libpqrcv_start;
walrcv_receive = libpqrcv_receive;
walrcv_send = libpqrcv_send;
walrcv_disconnect = libpqrcv_disconnect;
@@ -76,7 +79,7 @@ _PG_init(void)
* Establish the connection to the primary server for XLOG streaming
*/
static bool
-libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
+libpqrcv_connect(char *conninfo, XLogRecPtr* redo, XLogRecPtr* where_at, bool startedDuringRecovery)
{
char conninfo_repl[MAXCONNINFO + 75];
char *primary_sysid;
@@ -84,7 +87,8 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
TimeLineID primary_tli;
TimeLineID standby_tli;
PGresult *res;
- char cmd[64];
+
+ elog(LOG, "wal receiver connecting");
/*
* Connect using deliberately undocumented parameter: replication. The
@@ -96,10 +100,16 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
conninfo);
streamConn = PQconnectdb(conninfo_repl);
- if (PQstatus(streamConn) != CONNECTION_OK)
+ if (PQstatus(streamConn) != CONNECTION_OK){
+ /*
+ * FIXME: it's very annoying for development if the whole buffer is
+ * immediately filled. We need a better solution.
+ */
+ pg_usleep(1000000);
ereport(ERROR,
(errmsg("could not connect to the primary server: %s",
PQerrorMessage(streamConn))));
+ }
/*
* Get the system identifier and timeline ID as a DataRow message from the
@@ -114,7 +124,7 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
"the primary server: %s",
PQerrorMessage(streamConn))));
}
- if (PQnfields(res) != 3 || PQntuples(res) != 1)
+ if (PQnfields(res) != 5 || PQntuples(res) != 1)
{
int ntuples = PQntuples(res);
int nfields = PQnfields(res);
@@ -122,14 +132,40 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
PQclear(res);
ereport(ERROR,
(errmsg("invalid response from primary server"),
- errdetail("Expected 1 tuple with 3 fields, got %d tuples with %d fields.",
+ errdetail("Expected 1 tuple with 5 fields, got %d tuples with %d fields.",
ntuples, nfields)));
}
primary_sysid = PQgetvalue(res, 0, 0);
+
primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
+ /* FIXME: this should be already implemented nicely somewhere? */
+ if(sscanf(PQgetvalue(res, 0, 2),
+ "%X/%X", &where_at->xlogid, &where_at->xrecoff) != 2){
+ elog(FATAL, "couldn't parse the xlog address from the other side: %s",
+ PQgetvalue(res, 0, 2));
+ }
+
+ elog(LOG, "other end is currently at %X/%X",
+ where_at->xlogid, where_at->xrecoff);
+
+ receiving_from_node_id = pg_atoi(PQgetvalue(res, 0, 3), 4, 0);
+
+ /* FIXME: this should be already implemented nicely somewhere? */
+ if(sscanf(PQgetvalue(res, 0, 4),
+ "%X/%X", &redo->xlogid, &redo->xrecoff) != 2){
+ elog(FATAL, "couldn't parse the xlog address from the other side: %s",
+ PQgetvalue(res, 0, 4));
+ }
+
+ elog(LOG, "other end's redo is currently at %X/%X",
+ redo->xlogid, redo->xrecoff);
+
+
/*
* Confirm that the system identifier of the primary is the same as ours.
+ *
+ * FIXME: do we want that restriction for mm?
*/
snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
GetSystemIdentifier());
@@ -142,21 +178,49 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
primary_sysid, standby_sysid)));
}
- /*
- * Confirm that the current timeline of the primary is the same as the
- * recovery target timeline.
- */
- standby_tli = GetRecoveryTargetTLI();
PQclear(res);
- if (primary_tli != standby_tli)
- ereport(ERROR,
- (errmsg("timeline %u of the primary does not match recovery target timeline %u",
- primary_tli, standby_tli)));
- ThisTimeLineID = primary_tli;
/* Start streaming from the point requested by startup process */
- snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
- startpoint.xlogid, startpoint.xrecoff);
+ if (startedDuringRecovery)
+ {
+ /*
+ * Confirm that the current timeline of the primary is the same as the
+ * recovery target timeline.
+ */
+ standby_tli = GetRecoveryTargetTLI();
+ if (primary_tli != standby_tli)
+ ereport(ERROR,
+ (errmsg("timeline %u of the primary does not match recovery target timeline %u",
+ primary_tli, standby_tli)));
+ ThisTimeLineID = primary_tli;
+ }
+
+ return true;
+}
+
+/*
+ * start streaming data
+ */
+static bool
+libpqrcv_start(char *conninfo, XLogRecPtr* startpoint, bool startedDuringRecovery)
+{
+ PGresult *res;
+ char cmd[64];
+
+ if(startedDuringRecovery)
+ {
+ snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
+ startpoint->xlogid, startpoint->xrecoff);
+ }
+ else
+ {
+ /* ignore the timeline */
+ elog(LOG, "receiving_from_node_id: %u at %X/%X", receiving_from_node_id,
+ startpoint->xlogid, startpoint->xrecoff);
+ snprintf(cmd, sizeof(cmd), "START_LOGICAL_REPLICATION %X/%X",
+ startpoint->xlogid, startpoint->xrecoff);
+ }
+
res = libpqrcv_PQexec(cmd);
if (PQresultStatus(res) != PGRES_COPY_BOTH)
{
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index b6cfdac..b49ae6f 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -76,9 +76,10 @@ Node *replication_parse_result;
%token K_NOWAIT
%token K_WAL
%token K_START_REPLICATION
+%token K_START_LOGICAL_REPLICATION
%type <node> command
-%type <node> base_backup start_replication identify_system
+%type <node> base_backup start_replication start_logical_replication identify_system
%type <list> base_backup_opt_list
%type <defelt> base_backup_opt
%%
@@ -97,6 +98,7 @@ command:
identify_system
| base_backup
| start_replication
+ | start_logical_replication
;
/*
@@ -166,6 +168,21 @@ start_replication:
$$ = (Node *) cmd;
}
;
+
+/*
+ * START_LOGICAL_REPLICATION %X/%X
+ */
+start_logical_replication:
+ K_START_LOGICAL_REPLICATION RECPTR
+ {
+ StartLogicalReplicationCmd *cmd;
+
+ cmd = makeNode(StartLogicalReplicationCmd);
+ cmd->startpoint = $2;
+
+ $$ = (Node *) cmd;
+ }
+ ;
%%
#include "repl_scanner.c"
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 9d4edcf..f8be982 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -64,6 +64,7 @@ NOWAIT { return K_NOWAIT; }
PROGRESS { return K_PROGRESS; }
WAL { return K_WAL; }
START_REPLICATION { return K_START_REPLICATION; }
+START_LOGICAL_REPLICATION { return K_START_LOGICAL_REPLICATION; }
"," { return ','; }
";" { return ';'; }
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index e97196b..73a3021 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -44,6 +44,7 @@
#include "replication/walprotocol.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
+#include "replication/logical.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/procarray.h"
@@ -58,9 +59,12 @@ bool am_walreceiver;
/* GUC variable */
int wal_receiver_status_interval;
bool hot_standby_feedback;
+char *mm_conninfo = 0;
+RepNodeId receiving_from_node_id = InvalidMultimasterNodeId;
/* libpqreceiver hooks to these when loaded */
walrcv_connect_type walrcv_connect = NULL;
+walrcv_start_type walrcv_start = NULL;
walrcv_receive_type walrcv_receive = NULL;
walrcv_send_type walrcv_send = NULL;
walrcv_disconnect_type walrcv_disconnect = NULL;
@@ -93,9 +97,13 @@ static struct
XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
} LogstreamResult;
+XLogRecPtr curRecv;
+
static StandbyReplyMessage reply_message;
static StandbyHSFeedbackMessage feedback_message;
+static bool startedDuringRecovery; /* are we going to receive WAL data */
+
/*
* About SIGTERM handling:
*
@@ -122,6 +130,9 @@ static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(bool dying);
+
+static void LogicalWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
+
static void XLogWalRcvSendReply(void);
static void XLogWalRcvSendHSFeedback(void);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
@@ -170,13 +181,17 @@ void
WalReceiverMain(void)
{
char conninfo[MAXCONNINFO];
- XLogRecPtr startpoint;
+ XLogRecPtr startpoint = {0, 0};
+ XLogRecPtr other_end_at;
+ XLogRecPtr other_end_redo;
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
am_walreceiver = true;
+ elog(LOG, "wal receiver starting");
+
/*
* WalRcv should be set up already (if we are a backend, we inherit this
* by fork() or EXEC_BACKEND mechanism from the postmaster).
@@ -200,8 +215,11 @@ WalReceiverMain(void)
/* fall through */
case WALRCV_STOPPED:
- SpinLockRelease(&walrcv->mutex);
- proc_exit(1);
+ if (startedDuringRecovery)
+ {
+ SpinLockRelease(&walrcv->mutex);
+ proc_exit(1);
+ }
break;
case WALRCV_STARTING:
@@ -212,13 +230,35 @@ WalReceiverMain(void)
/* Shouldn't happen */
elog(PANIC, "walreceiver still running according to shared memory state");
}
- /* Advertise our PID so that the startup process can kill us */
+ /* Advertise our PID so that we can be killed */
walrcv->pid = MyProcPid;
walrcv->walRcvState = WALRCV_RUNNING;
- /* Fetch information required to start streaming */
- strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
- startpoint = walrcv->receiveStart;
+ /*
+ * Fetch information required to start streaming.
+ *
+ * During recovery the WALReceiver is started from the Startup process,
+ * by sending a postmaster signal. In normal running the Postmaster
+ * starts the WALReceiver directly. In that case the walrcv shmem struct
+ * is simply zeroed, so walrcv->startedDuringRecovery will show as false.
+ *
+ * The connection info required to access the upstream master comes from
+ * the multimaster_conninfo parameter, stored in the mm_conninfo variable.
+ *
+ * XXX The starting point for logical replication is not yet determined.
+ */
+ startedDuringRecovery = walrcv->startedDuringRecovery;
+ if (startedDuringRecovery)
+ {
+ strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
+ startpoint = walrcv->receiveStart;
+ }
+ else
+ {
+ elog(LOG, "logical replication starting: %s", mm_conninfo);
+ strlcpy(conninfo, (char *) mm_conninfo, MAXCONNINFO);
+ /* The startpoint for logical replay can only be determined after connecting */
+ }
/* Initialise to a sanish value */
walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = GetCurrentTimestamp();
@@ -262,8 +302,9 @@ WalReceiverMain(void)
/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);
- if (walrcv_connect == NULL || walrcv_receive == NULL ||
- walrcv_send == NULL || walrcv_disconnect == NULL)
+ if (walrcv_connect == NULL || walrcv_start == NULL ||
+ walrcv_receive == NULL || walrcv_send == NULL ||
+ walrcv_disconnect == NULL)
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
/*
@@ -277,7 +318,58 @@ WalReceiverMain(void)
/* Establish the connection to the primary for XLOG streaming */
EnableWalRcvImmediateExit();
- walrcv_connect(conninfo, startpoint);
+ walrcv_connect(conninfo, &other_end_redo, &other_end_at, startedDuringRecovery);
+
+ if(LogicalWalReceiverActive()){
+ char buf[MAXPGPATH];
+
+ if(RecoveryInProgress()){
+ elog(FATAL, "cannot have the logical receiver running while recovery is ongoing");
+ }
+
+ if(receiving_from_node_id == InvalidMultimasterNodeId)
+ elog(FATAL, "didn't setup/derive other node id");
+
+ Assert(WalRcv);
+
+ startpoint = WalRcv->mm_receiveState[receiving_from_node_id];
+
+ /*
+ * in this case we connect to some master we haven't received data
+ * from yet.
+ * FIXME: This means we would need to initialize the local cluster!
+ */
+ if(XLByteEQ(startpoint, zeroRecPtr)){
+ startpoint = other_end_redo;
+
+ /* we need to scroll back to the beginning of the segment */
+ startpoint.xrecoff -= startpoint.xrecoff % XLogSegSize;
+
+ WalRcv->mm_receiveState[receiving_from_node_id] = startpoint;
+
+ WalRcv->mm_applyState[receiving_from_node_id] = other_end_redo;
+
+ /* FIXME: this should be an ereport */
+ elog(LOG, "initializing recovery from logical node %d to %X/%X, transfer from %X/%X",
+ receiving_from_node_id,
+ other_end_at.xlogid, other_end_at.xrecoff,
+ startpoint.xlogid, startpoint.xrecoff);
+ }
+ else if(XLByteLT(other_end_at, startpoint)){
+ elog(FATAL, "something went wrong, the other side has a too small xlogid/xrecoff. Other: %X/%X, self: %X/%X",
+ other_end_at.xlogid, other_end_at.xrecoff,
+ startpoint.xlogid, startpoint.xrecoff);
+ }
+
+ /*
+ * the set of foreign nodes can increase all the time, so we just make
+ * sure the particular one we need exists.
+ */
+ snprintf(buf, MAXPGPATH-1, "%s/%u", LCRDIR, receiving_from_node_id);
+ pg_mkdir_p(buf, S_IRWXU);
+ }
+
+ walrcv_start(conninfo, &startpoint, startedDuringRecovery);
DisableWalRcvImmediateExit();
/* Loop until end-of-streaming or error */
@@ -298,7 +390,7 @@ WalReceiverMain(void)
* Exit walreceiver if we're not in recovery. This should not happen,
* but cross-check the status here.
*/
- if (!RecoveryInProgress())
+ if (!RecoveryInProgress() && !LogicalWalReceiverActive())
ereport(FATAL,
(errmsg("cannot continue WAL streaming, recovery has already ended")));
@@ -443,7 +535,17 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
buf += sizeof(WalDataMessageHeader);
len -= sizeof(WalDataMessageHeader);
- XLogWalRcvWrite(buf, len, msghdr.dataStart);
+
+ /*
+ * The WALReceiver connects either during recovery or during
+ * normal running. During recovery pure WAL data is
+ * received, whereas during normal running we receive Logical
+ * Change Records (LCRs), which are stored differently.
+ */
+ if (LogicalWalReceiverActive())
+ XLogWalRcvWrite(buf, len, msghdr.dataStart);
+ else
+ LogicalWalRcvWrite(buf, len, msghdr.dataStart);
break;
}
case 'k': /* Keepalive */
@@ -477,6 +579,10 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
int startoff;
int byteswritten;
+#ifdef VERBOSE_DEBUG
+ elog(LOG, "received data len %lu, at %X/%X",
+ nbytes, recptr.xlogid, recptr.xrecoff);
+#endif
while (nbytes > 0)
{
int segbytes;
@@ -509,7 +615,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
/* Create/use new log file */
XLByteToSeg(recptr, recvId, recvSeg);
use_existent = true;
- recvFile = XLogFileInit(InvalidMultimasterNodeId, recvId, recvSeg, &use_existent, true);
+ recvFile = XLogFileInit(receiving_from_node_id, recvId, recvSeg, &use_existent, true);
recvOff = 0;
}
@@ -585,6 +691,27 @@ XLogWalRcvFlush(bool dying)
{
walrcv->latestChunkStart = walrcv->receivedUpto;
walrcv->receivedUpto = LogstreamResult.Flush;
+
+ /* FIXME */
+ if(LogicalWalReceiverActive()){
+ if(XLByteLE(curRecv, LogstreamResult.Write)){
+ WalRcv->mm_receiveState[receiving_from_node_id] = curRecv;
+
+ if(WalRcv->mm_receiveLatch[receiving_from_node_id])
+ SetLatch(WalRcv->mm_receiveLatch[receiving_from_node_id]);
+#if 0
+ elog(LOG, "confirming flush to %X/%X",
+ curRecv.xlogid, curRecv.xrecoff);
+#endif
+ }
+ else{
+#if 0
+ elog(LOG, "not conf flush to %X/%X, wrote to %X/%X",
+ curRecv.xlogid, curRecv.xrecoff,
+ LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff);
+#endif
+ }
+ }
}
SpinLockRelease(&walrcv->mutex);
@@ -614,6 +741,15 @@ XLogWalRcvFlush(bool dying)
}
/*
+ * Handle LCR data.
+ */
+static void
+LogicalWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
+{
+ elog(LOG, "received msg of length %u", (uint) nbytes);
+}
+
+/*
* Send reply message to primary, indicating our current XLOG positions and
* the current time.
*/
@@ -750,6 +886,9 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
SpinLockRelease(&walrcv->mutex);
+ /* we need to store that in shmem */
+ curRecv = walEnd;
+
if (log_min_messages <= DEBUG2)
elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
timestamptz_to_str(sendTime),
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index cb49282..aa07746 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -207,6 +207,7 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
walrcv->conninfo[0] = '\0';
walrcv->walRcvState = WALRCV_STARTING;
walrcv->startTime = now;
+ walrcv->startedDuringRecovery = true;
/*
* If this is the first startup of walreceiver, we initialize receivedUpto
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 8cd3a00..d2e1c76 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -37,9 +37,13 @@
#include <signal.h>
#include <unistd.h>
+#include "access/xlogreader.h"
#include "access/transam.h"
+#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/pg_type.h"
+#include "catalog/pg_class.h"
+#include "catalog/pg_control.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
@@ -64,6 +68,7 @@
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "utils/timestamp.h"
+#include "utils/syscache.h"
/* Array of WalSnds in shared memory */
@@ -74,9 +79,12 @@ WalSnd *MyWalSnd = NULL;
/* Global state */
bool am_walsender = false; /* Am I a walsender process ? */
+
bool am_cascading_walsender = false; /* Am I cascading WAL to
* another standby ? */
+bool am_doing_logical = false; /* Am I sending logical changes instead of physical ones */
+
/* User-settable parameters for walsender */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
int replication_timeout = 60 * 1000; /* maximum time to send one
@@ -112,6 +120,12 @@ static TimestampTz last_reply_timestamp;
*/
static bool wroteNewXlogData = false;
+/*
+ * state for continuous reading of the local server's WAL for sending logical
+ * WAL
+ */
+static XLogReaderState* xlogreader_state = 0;
+
/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGHUP = false;
volatile sig_atomic_t walsender_shutdown_requested = false;
@@ -131,8 +145,19 @@ static void InitWalSnd(void);
static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg);
static void XLogSend(char *msgbuf, bool *caughtup);
+static Size XLogSendPhysical(char *msgbuf, bool *caughtup, XLogRecPtr startptr,
+ XLogRecPtr endptr);
+static Size XLogSendLogical(char *msgbuf, bool *caughtup, XLogRecPtr startptr,
+ XLogRecPtr endptr);
static void IdentifySystem(void);
static void StartReplication(StartReplicationCmd *cmd);
+static void StartLogicalReplication(StartLogicalReplicationCmd *cmd);
+
+static bool RecordRelevantForLogicalReplication(XLogReaderState* state, XLogRecord* r);
+static void ProcessRecord(XLogReaderState* state, XLogRecordBuffer* buf);
+static void WriteoutData(XLogReaderState* state, char* data, Size len);
+static void XLogReadPage(XLogReaderState* state, char *buf, XLogRecPtr startptr);
+
static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
@@ -293,8 +318,10 @@ IdentifySystem(void)
char sysid[32];
char tli[11];
char xpos[MAXFNAMELEN];
+ char node_id[MAXFNAMELEN];//FIXME
+ char redoptr_s[MAXFNAMELEN];
XLogRecPtr logptr;
-
+ XLogRecPtr redoptr = GetRedoRecPtr();
/*
* Reply with a result set with one row, three columns. First col is
* system ID, second is timeline ID, and third is current xlog location.
@@ -309,9 +336,14 @@ IdentifySystem(void)
snprintf(xpos, sizeof(xpos), "%X/%X",
logptr.xlogid, logptr.xrecoff);
+ snprintf(node_id, sizeof(node_id), "%i", guc_replication_origin_id);
+
+ snprintf(redoptr_s, sizeof(redoptr_s), "%X/%X",
+ redoptr.xlogid, redoptr.xrecoff);
+
/* Send a RowDescription message */
pq_beginmessage(&buf, 'T');
- pq_sendint(&buf, 3, 2); /* 3 fields */
+ pq_sendint(&buf, 5, 2); /* 5 fields */
/* first field */
pq_sendstring(&buf, "systemid"); /* col name */
@@ -332,24 +364,47 @@ IdentifySystem(void)
pq_sendint(&buf, 0, 2); /* format code */
/* third field */
- pq_sendstring(&buf, "xlogpos");
- pq_sendint(&buf, 0, 4);
- pq_sendint(&buf, 0, 2);
- pq_sendint(&buf, TEXTOID, 4);
- pq_sendint(&buf, -1, 2);
- pq_sendint(&buf, 0, 4);
- pq_sendint(&buf, 0, 2);
+ pq_sendstring(&buf, "xlogpos"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, TEXTOID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+
+ /* fourth field */
+ pq_sendstring(&buf, "node_id"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, INT4OID, 4); /* type oid */
+ pq_sendint(&buf, 4, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+
+ /* fifth field */
+ pq_sendstring(&buf, "last_checkpoint"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, TEXTOID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+
pq_endmessage(&buf);
/* Send a DataRow message */
pq_beginmessage(&buf, 'D');
- pq_sendint(&buf, 3, 2); /* # of columns */
+ pq_sendint(&buf, 5, 2); /* # of columns */
pq_sendint(&buf, strlen(sysid), 4); /* col1 len */
pq_sendbytes(&buf, (char *) &sysid, strlen(sysid));
pq_sendint(&buf, strlen(tli), 4); /* col2 len */
pq_sendbytes(&buf, (char *) tli, strlen(tli));
pq_sendint(&buf, strlen(xpos), 4); /* col3 len */
pq_sendbytes(&buf, (char *) xpos, strlen(xpos));
+ pq_sendint(&buf, strlen(node_id), 4); /* col4 len */
+ pq_sendbytes(&buf, (char *)node_id, strlen(node_id));
+ pq_sendint(&buf, strlen(redoptr_s), 4); /* col5 len */
+ pq_sendbytes(&buf, (char *)redoptr_s, strlen(redoptr_s));
pq_endmessage(&buf);
@@ -432,6 +487,8 @@ StartReplication(StartReplicationCmd *cmd)
pq_endmessage(&buf);
pq_flush();
+ am_doing_logical = false;
+
/*
* Initialize position to the received one, then the xlog records begin to
* be shipped from that position
@@ -440,6 +497,56 @@ StartReplication(StartReplicationCmd *cmd)
}
/*
+ * START_LOGICAL_REPLICATION
+ */
+static void
+StartLogicalReplication(StartLogicalReplicationCmd *cmd)
+{
+ StringInfoData buf;
+
+ /* XXX: see above */
+ MarkPostmasterChildWalSender();
+ SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
+
+ /* XXX: see above*/
+ if (am_cascading_walsender && !RecoveryInProgress())
+ {
+ ereport(LOG,
+ (errmsg("terminating walsender process to force cascaded standby "
+ "to update timeline and reconnect")));
+ walsender_ready_to_stop = true;
+ }
+
+ /* XXX: see above*/
+ WalSndSetState(WALSNDSTATE_CATCHUP);
+
+ /* Send a CopyBothResponse message, and start streaming */
+ pq_beginmessage(&buf, 'W');
+ pq_sendbyte(&buf, 0);
+ pq_sendint(&buf, 0, 2);
+ pq_endmessage(&buf);
+ pq_flush();
+
+ am_doing_logical = true;
+
+ sentPtr = cmd->startpoint;
+
+ if(!xlogreader_state){
+ xlogreader_state = XLogReaderAllocate();
+ xlogreader_state->is_record_interesting = RecordRelevantForLogicalReplication;
+ xlogreader_state->finished_record = ProcessRecord;
+ xlogreader_state->writeout_data = WriteoutData;
+ xlogreader_state->read_page = XLogReadPage;
+
+ /* FIXME: it would probably be better to handle this */
+ XLogReaderReset(xlogreader_state);
+ }
+
+ xlogreader_state->startptr = cmd->startpoint;
+ xlogreader_state->curptr = cmd->startpoint;
+}
+
+/*
* Execute an incoming replication command.
*/
static bool
@@ -483,6 +590,13 @@ HandleReplicationCommand(const char *cmd_string)
replication_started = true;
break;
+ case T_StartLogicalReplicationCmd:
+ StartLogicalReplication((StartLogicalReplicationCmd *) cmd_node);
+
+ /* break out of the loop */
+ replication_started = true;
+ break;
+
case T_BaseBackupCmd:
SendBaseBackup((BaseBackupCmd *) cmd_node);
@@ -1071,54 +1185,142 @@ retry:
p += readbytes;
}
- /*
- * After reading into the buffer, check that what we read was valid. We do
- * this after reading, because even though the segment was present when we
- * opened it, it might get recycled or removed while we read it. The
- * read() succeeds in that case, but the data we tried to read might
- * already have been overwritten with new WAL records.
- */
- XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
- XLByteToSeg(startptr, log, seg);
- if (log < lastRemovedLog ||
- (log == lastRemovedLog && seg <= lastRemovedSeg))
- {
- char filename[MAXFNAMELEN];
+ if(node_id == InvalidMultimasterNodeId){
+ /*
+ * After reading into the buffer, check that what we read was valid. We
+ * do this after reading, because even though the segment was present
+ * when we opened it, it might get recycled or removed while we read
+ * it. The read() succeeds in that case, but the data we tried to read
+ * might already have been overwritten with new WAL records.
+ */
+ XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
+ XLByteToSeg(startptr, log, seg);
+ if (log < lastRemovedLog ||
+ (log == lastRemovedLog && seg <= lastRemovedSeg))
+ {
+ char filename[MAXFNAMELEN];
- XLogFileName(filename, ThisTimeLineID, log, seg);
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("requested WAL segment %s has already been removed",
- filename)));
+ XLogFileName(filename, ThisTimeLineID, log, seg);
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("requested WAL segment %s has already been removed",
+ filename)));
+ }
+
+ /*
+ * During recovery, the currently-open WAL file might be replaced with
+ * the file of the same name retrieved from archive. So we always need
+ * to check what we read was valid after reading into the buffer. If
+ * it's invalid, we try to open and read the file again.
+ */
+ if (am_cascading_walsender)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = MyWalSnd;
+ bool reload;
+
+ SpinLockAcquire(&walsnd->mutex);
+ reload = walsnd->needreload;
+ walsnd->needreload = false;
+ SpinLockRelease(&walsnd->mutex);
+
+ if (reload && sendFile >= 0)
+ {
+ close(sendFile);
+ sendFile = -1;
+
+ goto retry;
+ }
+ }
}
+ else{
+ /* FIXME: check shm? */
+ }
+}
+static bool
+RecordRelevantForLogicalReplication(XLogReaderState* state, XLogRecord* r){
/*
- * During recovery, the currently-open WAL file might be replaced with the
- * file of the same name retrieved from archive. So we always need to
- * check what we read was valid after reading into the buffer. If it's
- * invalid, we try to open and read the file again.
+ * For now we only send out data that are originating locally which implies
+ * a star topology between all nodes. Later we might support more
+ * complicated models. For that filtering positively by wanted IDs sounds
+ * like a better idea.
*/
- if (am_cascading_walsender)
- {
- /* use volatile pointer to prevent code rearrangement */
- volatile WalSnd *walsnd = MyWalSnd;
- bool reload;
+ if(r->xl_origin_id != current_replication_origin_id)
+ return false;
+
+ switch(r->xl_rmid){
+ case RM_HEAP_ID:
+ case RM_HEAP2_ID:
+ case RM_XACT_ID:
+ case RM_XLOG_ID:
+ /* FIXME: filter additionally */
+ return true;
+ default:
+ return false;
+ }
+}
- SpinLockAcquire(&walsnd->mutex);
- reload = walsnd->needreload;
- walsnd->needreload = false;
- SpinLockRelease(&walsnd->mutex);
- if (reload && sendFile >= 0)
- {
- close(sendFile);
- sendFile = -1;
+static void
+XLogReadPage(XLogReaderState* state, char *buf, XLogRecPtr startptr)
+{
+ XLogPageHeader page_header;
- goto retry;
- }
+ Assert((startptr.xrecoff % XLOG_BLCKSZ) == 0);
+
+ /* elog(LOG, "Reading from %X/%X", startptr.xlogid, startptr.xrecoff); */
+
+ /* FIXME: more sensible implementation */
+ XLogRead(buf, InvalidMultimasterNodeId, startptr, XLOG_BLCKSZ);
+
+ page_header = (XLogPageHeader)buf;
+
+ if(page_header->xlp_magic != XLOG_PAGE_MAGIC){
+ elog(FATAL, "page header magic %x, should be %x", page_header->xlp_magic,
+ XLOG_PAGE_MAGIC);
}
}
+static void
+ProcessRecord(XLogReaderState* state, XLogRecordBuffer* buf){
+ //FIXME: process table relfilenode reassignments here
+}
+
+static void WriteoutData(XLogReaderState* state, char* data, Size len){
+ //FIXME: state->nbytes shouldn't be used in here
+ /* we want to writeout zeros */
+ if(data == 0)
+ memset((char*)state->private_data + state->nbytes, 0, len);
+ else
+ memcpy((char*)state->private_data + state->nbytes, data, len);
+ state->nbytes += len;
+}
+
+static Size
+XLogSendLogical(char *msgbuf, bool *caughtup, XLogRecPtr startptr,
+ XLogRecPtr endptr)
+{
+#ifdef BUGGY
+ if(!xlogreader_state->incomplete){
+ XLogReaderReset(xlogreader_state);
+ xlogreader_state->startptr = startptr;
+ xlogreader_state->curptr = startptr;
+ }
+#endif
+
+ xlogreader_state->endptr = endptr;
+ xlogreader_state->private_data = msgbuf;
+ xlogreader_state->nbytes = 0;//FIXME: this should go
+
+ XLogReaderRead(xlogreader_state);
+
+ //FIXME
+ sentPtr = xlogreader_state->curptr;
+
+ return xlogreader_state->nbytes;
+}
+
/*
* Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
* but not yet sent to the client, and buffer it in the libpq output
@@ -1136,10 +1338,11 @@ static void
XLogSend(char *msgbuf, bool *caughtup)
{
XLogRecPtr SendRqstPtr;
- XLogRecPtr startptr;
- XLogRecPtr endptr;
- Size nbytes;
+ XLogRecPtr startptr = sentPtr;
+ XLogRecPtr endptr = sentPtr;
+
WalDataMessageHeader msghdr;
+ Size nbytes = 0;
/*
* Attempt to send all data that's already been written out and fsync'd to
@@ -1155,44 +1358,17 @@ XLogSend(char *msgbuf, bool *caughtup)
if (XLByteLE(SendRqstPtr, sentPtr))
{
*caughtup = true;
+#if 0
+ elog(LOG, "caughtup %X/%X", SendRqstPtr.xlogid, SendRqstPtr.xrecoff);
+#endif
return;
}
- /*
- * Figure out how much to send in one message. If there's no more than
- * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
- * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
- *
- * The rounding is not only for performance reasons. Walreceiver relies on
- * the fact that we never split a WAL record across two messages. Since a
- * long WAL record is split at page boundary into continuation records,
- * page boundary is always a safe cut-off point. We also assume that
- * SendRqstPtr never points to the middle of a WAL record.
- */
- startptr = sentPtr;
- if (startptr.xrecoff >= XLogFileSize)
- {
- /*
- * crossing a logid boundary, skip the non-existent last log segment
- * in previous logical log file.
- */
- startptr.xlogid += 1;
- startptr.xrecoff = 0;
- }
-
- endptr = startptr;
+ /* FIXME: this is duplicated in physical transport */
XLByteAdvance(endptr, MAX_SEND_SIZE);
- if (endptr.xlogid != startptr.xlogid)
- {
- /* Don't cross a logfile boundary within one message */
- Assert(endptr.xlogid == startptr.xlogid + 1);
- endptr.xlogid = startptr.xlogid;
- endptr.xrecoff = XLogFileSize;
- }
/* if we went beyond SendRqstPtr, back off */
- if (XLByteLE(SendRqstPtr, endptr))
- {
+ if (XLByteLE(SendRqstPtr, endptr)){
endptr = SendRqstPtr;
*caughtup = true;
}
@@ -1203,34 +1379,39 @@ XLogSend(char *msgbuf, bool *caughtup)
*caughtup = false;
}
- nbytes = endptr.xrecoff - startptr.xrecoff;
- Assert(nbytes <= MAX_SEND_SIZE);
-
/*
* OK to read and send the slice.
*/
msgbuf[0] = 'w';
- /*
- * Read the log directly into the output buffer to avoid extra memcpy
- * calls.
- */
- XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), InvalidMultimasterNodeId,
- startptr, nbytes);
+ nbytes += 1 + sizeof(WalDataMessageHeader);
+
+ if(am_doing_logical)
+ nbytes += XLogSendLogical(msgbuf + nbytes, caughtup, sentPtr, endptr);
+ else
+ nbytes += XLogSendPhysical(msgbuf + nbytes, caughtup, sentPtr, endptr);
+
+#if 0
+ elog(LOG, "setting sentPtr to %X/%X, SendRqstPtr %X/%X, endptr %X/%X",
+ sentPtr.xlogid, sentPtr.xrecoff,
+ SendRqstPtr.xlogid, SendRqstPtr.xrecoff,
+ endptr.xlogid, endptr.xrecoff);
+#endif
/*
* We fill the message header last so that the send timestamp is taken as
* late as possible.
*/
msghdr.dataStart = startptr;
- msghdr.walEnd = SendRqstPtr;
+ msghdr.walEnd = sentPtr;
msghdr.sendTime = GetCurrentTimestamp();
+
memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
- pq_putmessage_noblock('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
+ pq_putmessage_noblock('d', msgbuf,
+ nbytes);
- sentPtr = endptr;
/* Update shared memory status */
{
@@ -1251,8 +1432,59 @@ XLogSend(char *msgbuf, bool *caughtup)
sentPtr.xlogid, sentPtr.xrecoff);
set_ps_display(activitymsg, false);
}
+}
+
+static Size
+XLogSendPhysical(char *msgbuf, bool *caughtup, XLogRecPtr startptr, XLogRecPtr endptr){
+ Size nbytes;
+
+ /*
+ * Figure out how much to send in one message. If there's no more than
+ * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
+ * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
+ *
+ * The rounding is not only for performance reasons. Walreceiver relies on
+ * the fact that we never split a WAL record across two messages. Since a
+ * long WAL record is split at page boundary into continuation records,
+ * page boundary is always a safe cut-off point. We also assume that
+ * endptr never points to the middle of a WAL record.
+ */
+ startptr = sentPtr;
+ if (startptr.xrecoff >= XLogFileSize)
+ {
+ /*
+ * crossing a logid boundary, skip the non-existent last log segment
+ * in previous logical log file.
+ *
+ * FIXME: Isn't getting to that point a bug in the XLByte arithmetic?
+ */
+ startptr.xlogid += 1;
+ startptr.xrecoff = 0;
+ }
+
+ endptr = startptr;
+ XLByteAdvance(endptr, MAX_SEND_SIZE);
+ if (endptr.xlogid != startptr.xlogid)
+ {
+ /* Don't cross a logfile boundary within one message */
+ Assert(endptr.xlogid == startptr.xlogid + 1);
+ endptr.xlogid = startptr.xlogid;
+ endptr.xrecoff = XLogFileSize;
+ }
+
+
+ nbytes = endptr.xrecoff - startptr.xrecoff;
+ Assert(nbytes <= MAX_SEND_SIZE);
+
+ /*
+ * Read the log directly into the output buffer to avoid extra memcpy
+ * calls.
+ */
+ XLogRead(msgbuf, InvalidMultimasterNodeId, startptr, nbytes);
+
+ sentPtr = endptr;
- return;
+ return nbytes;
}
/*
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 46b0657..6a58f96 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -3058,6 +3058,15 @@ static struct config_string ConfigureNamesString[] =
},
{
+ {"multimaster_conninfo", PGC_POSTMASTER, REPLICATION_MASTER,
+ gettext_noop("Connection string to upstream master."),
+ NULL
+ },
+ &mm_conninfo,
+ 0, NULL, NULL, NULL
+ },
+
+ {
{"default_text_search_config", PGC_USERSET, CLIENT_CONN_LOCALE,
gettext_noop("Sets default text search configuration."),
NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 12f8a3f..240c13d 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -243,6 +243,7 @@
# - Multi Master Servers -
+#multimaster_conninfo = 'host=myupstreammaster'
#multimaster_node_id = 0 #invalid node id
#------------------------------------------------------------------------------
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 1e16088..78b2f5f 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -403,6 +403,7 @@ typedef enum NodeTag
T_IdentifySystemCmd,
T_BaseBackupCmd,
T_StartReplicationCmd,
+ T_StartLogicalReplicationCmd,
/*
* TAGS FOR RANDOM OTHER STUFF
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 236a36d..fee111c 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -49,4 +49,14 @@ typedef struct StartReplicationCmd
XLogRecPtr startpoint;
} StartReplicationCmd;
+/* ----------------------
+ * START_LOGICAL_REPLICATION command
+ * ----------------------
+ */
+typedef struct StartLogicalReplicationCmd
+{
+ NodeTag type;
+ XLogRecPtr startpoint;
+} StartLogicalReplicationCmd;
+
#endif /* REPLNODES_H */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 8f44fad..fc9e120 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -13,6 +13,10 @@
#include "access/xlogdefs.h"
+/* user settable parameters for multi-master in postmaster */
+extern char *mm_conninfo; /* copied in walreceiver.h also */
+#define LogicalWalReceiverActive() (mm_conninfo != NULL)
+
extern int guc_replication_origin_id;
extern RepNodeId current_replication_origin_id;
extern XLogRecPtr current_replication_origin_lsn;
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index c9ab1be..b565190 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -22,6 +22,7 @@
extern bool am_walreceiver;
extern int wal_receiver_status_interval;
extern bool hot_standby_feedback;
+extern RepNodeId receiving_from_node_id;
/*
* MAXCONNINFO: maximum size of a connection string.
@@ -38,9 +39,9 @@ extern bool hot_standby_feedback;
*/
typedef enum
{
- WALRCV_STOPPED, /* stopped and mustn't start up again */
WALRCV_STARTING, /* launched, but the process hasn't
* initialized yet */
+ WALRCV_STOPPED, /* stopped and mustn't start up again */
WALRCV_RUNNING, /* walreceiver is running */
WALRCV_STOPPING /* requested to stop, but still running */
} WalRcvState;
@@ -55,6 +56,7 @@ typedef struct
*/
pid_t pid;
WalRcvState walRcvState;
+ bool startedDuringRecovery;
pg_time_t startTime;
/*
@@ -108,9 +110,12 @@ typedef struct
extern WalRcvData *WalRcv;
/* libpqwalreceiver hooks */
-typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint);
+typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr* redo, XLogRecPtr* where_at, bool startedDuringRecovery);
extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
+typedef bool (*walrcv_start_type) (char *conninfo, XLogRecPtr* startpoint, bool startedDuringRecovery);
+extern PGDLLIMPORT walrcv_start_type walrcv_start;
+
typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
char **buffer, int *len);
extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
--
1.7.10.rc3.3.g19a6c.dirty
From: Andres Freund <andres@anarazel.de>
We decided to use low-level functions to do the apply instead of producing SQL
statements containing the data (or using prepared statements) because both the
text conversion and the full executor introduce significant overhead, which is
unnecessary if you're using the same version of pg on the same
architecture.
There are loads of use cases, though, that require different methods of
applying, so the part doing the applying from an ApplyCache is just a bunch of
well-abstracted callbacks that get passed all the knowledge required to change
the data representation into other formats.
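
To illustrate the callback idea, here is a minimal, self-contained sketch. It
is not the patch's ApplyCache API: the Change, ApplyCallbacks and TextSink
types and all function names below are hypothetical stand-ins, and the text it
emits is unquoted pseudo-SQL meant only for illustration. It shows how the same
begin/change/commit stream could be rendered in another format simply by
plugging in a different set of callbacks.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical, simplified stand-ins; not the patch's ApplyCache types. */
typedef enum { CHANGE_INSERT, CHANGE_UPDATE, CHANGE_DELETE } ChangeAction;

typedef struct Change
{
    ChangeAction action;
    const char *table;   /* target table name */
    const char *key;     /* primary key value, already as text */
    const char *value;   /* new payload; unused for deletes */
} Change;

/* The consumer only sees these three hooks plus its own private state. */
typedef struct ApplyCallbacks
{
    void (*begin)(void *private_data);
    void (*apply_change)(void *private_data, const Change *change);
    void (*commit)(void *private_data);
    void *private_data;
} ApplyCallbacks;

/* One possible consumer: collect the changes as text in a fixed buffer. */
typedef struct TextSink
{
    char buf[512];
    size_t used;
} TextSink;

static void sink_append(TextSink *sink, const char *s)
{
    size_t len = strlen(s);

    /* silently drop output that does not fit; good enough for a sketch */
    if (sink->used + len < sizeof(sink->buf))
    {
        memcpy(sink->buf + sink->used, s, len + 1);
        sink->used += len;
    }
}

static void text_begin(void *private_data)  { sink_append(private_data, "BEGIN;\n"); }
static void text_commit(void *private_data) { sink_append(private_data, "COMMIT;\n"); }

static void text_change(void *private_data, const Change *c)
{
    char line[256];

    line[0] = '\0';
    switch (c->action)
    {
        case CHANGE_INSERT:
            snprintf(line, sizeof(line), "INSERT INTO %s VALUES (%s, %s);\n",
                     c->table, c->key, c->value);
            break;
        case CHANGE_UPDATE:
            snprintf(line, sizeof(line), "UPDATE %s SET value = %s WHERE key = %s;\n",
                     c->table, c->value, c->key);
            break;
        case CHANGE_DELETE:
            snprintf(line, sizeof(line), "DELETE FROM %s WHERE key = %s;\n",
                     c->table, c->key);
            break;
    }
    sink_append(private_data, line);
}

static ApplyCallbacks text_sink_callbacks(TextSink *sink)
{
    ApplyCallbacks cb = { text_begin, text_change, text_commit, sink };
    return cb;
}

/* Replays one transaction through whatever callbacks were plugged in. */
static void replay_txn(const ApplyCallbacks *cb, const Change *changes, size_t n)
{
    assert(cb->begin && cb->apply_change && cb->commit);

    cb->begin(cb->private_data);
    for (size_t i = 0; i < n; i++)
        cb->apply_change(cb->private_data, &changes[i]);
    cb->commit(cb->private_data);
}
```

A binary apply module would fill in the same three hooks with functions that
call the heap and index routines directly; replay_txn() would not need to
change.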
Missing:
- TOAST handling. For physical apply not much needs to be done because the
TOAST inserts will have been made beforehand. There needs to be an option in
ApplyCache that helps reassemble TOAST datums to make it easier to write
apply modules which convert to text.
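
As a rough idea of what such a reassembly helper might look like, here is a
small standalone sketch. Everything in it is an assumption made for
illustration: ToastChunk, ToastBuffer and the toast_buffer_* functions do not
exist in the patch, and the sketch assumes that the chunks of one value arrive
in ascending chunk sequence order and that compression is handled elsewhere.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical view of one decoded TOAST chunk insert. */
typedef struct ToastChunk
{
    unsigned    value_id;   /* which toasted value the chunk belongs to */
    int         seq;        /* position of the chunk within the value */
    const char *data;
    size_t      len;
} ToastChunk;

/* Accumulates the chunks of a single value into one contiguous datum. */
typedef struct ToastBuffer
{
    unsigned value_id;
    int      next_seq;      /* sequence number we expect next */
    char    *data;
    size_t   len;
} ToastBuffer;

static void toast_buffer_init(ToastBuffer *buf, unsigned value_id)
{
    buf->value_id = value_id;
    buf->next_seq = 0;
    buf->data = NULL;
    buf->len = 0;
}

/* Returns 0 on success, -1 on an unexpected chunk or allocation failure. */
static int toast_buffer_add(ToastBuffer *buf, const ToastChunk *chunk)
{
    if (chunk->value_id != buf->value_id || chunk->seq != buf->next_seq)
        return -1;

    if (chunk->len > 0)
    {
        char *grown = realloc(buf->data, buf->len + chunk->len);

        if (grown == NULL)
            return -1;      /* buf->data is still valid and unchanged */
        memcpy(grown + buf->len, chunk->data, chunk->len);
        buf->data = grown;
        buf->len += chunk->len;
    }
    buf->next_seq++;
    return 0;
}

static void toast_buffer_free(ToastBuffer *buf)
{
    free(buf->data);
    buf->data = NULL;
    buf->len = 0;
}
```

An apply module converting to text could then be handed the reassembled datum
in place of the TOAST pointer once the main tuple's change arrives.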
---
src/backend/replication/logical/Makefile | 2 +-
src/backend/replication/logical/apply.c | 313 ++++++++++++++++++++++++++++++
src/include/replication/apply.h | 24 +++
3 files changed, 338 insertions(+), 1 deletion(-)
create mode 100644 src/backend/replication/logical/apply.c
create mode 100644 src/include/replication/apply.h
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index c2d6d82..d0e0b13 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -14,6 +14,6 @@ include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
-OBJS = applycache.o decode.o logical.o
+OBJS = apply.o applycache.o decode.o logical.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/apply.c b/src/backend/replication/logical/apply.c
new file mode 100644
index 0000000..646bd54
--- /dev/null
+++ b/src/backend/replication/logical/apply.c
@@ -0,0 +1,313 @@
+/*-------------------------------------------------------------------------
+ *
+ * apply.c
+ *
+ * Support functions for logical/multimaster replication
+ *
+ *
+ * Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/apply.c
+ *
+ */
+#include "postgres.h"
+
+#include "access/xact.h"
+#include "access/heapam.h"
+#include "access/genam.h"
+
+#include "catalog/pg_control.h"
+#include "catalog/index.h"
+
+#include "executor/executor.h"
+
+#include "replication/applycache.h"
+#include "replication/apply.h"
+
+#include "utils/rel.h"
+#include "utils/snapmgr.h"
+#include "utils/lsyscache.h"
+
+
+
+static void
+UserTableUpdateIndexes(Relation heapRel, HeapTuple heapTuple);
+
+
+void apply_begin_txn(ApplyCache* cache, ApplyCacheTXN* txn)
+{
+ ApplyApplyCacheState *state = cache->private_data;
+
+ state->original_resource_owner = CurrentResourceOwner;
+
+ PreventTransactionChain(true, "Apply Process cannot be started inside a txn");
+
+ StartTransactionCommand();
+
+ PushActiveSnapshot(GetTransactionSnapshot());
+}
+
+void apply_commit_txn(ApplyCache* cache, ApplyCacheTXN* txn)
+{
+ ApplyApplyCacheState *state = cache->private_data;
+
+ current_replication_origin_lsn = txn->lsn;
+
+ PopActiveSnapshot();
+ CommitTransactionCommand();
+
+
+ /*
+ * for some reason after (Start|Commit)TransactionCommand we lose our
+ * resource owner, restore it.
+ * XXX: is that correct?
+ */
+ CurrentResourceOwner = state->original_resource_owner;
+
+ current_replication_origin_lsn.xlogid = 0;
+ current_replication_origin_lsn.xrecoff = 0;
+}
+
+
+void apply_change(ApplyCache* cache, ApplyCacheTXN* txn, ApplyCacheTXN* subtxn, ApplyCacheChange* change)
+{
+ /* for inserting */
+ Relation tuple_rel;
+
+ tuple_rel = heap_open(HeapTupleGetOid(change->table), RowExclusiveLock);
+
+ switch (change->action)
+ {
+ case APPLY_CACHE_CHANGE_INSERT:
+ {
+#ifdef VERBOSE_DEBUG
+ elog(LOG, "INSERT");
+#endif
+ simple_heap_insert(tuple_rel, &change->newtuple->tuple);
+
+ UserTableUpdateIndexes(tuple_rel, &change->newtuple->tuple);
+ break;
+ }
+ case APPLY_CACHE_CHANGE_UPDATE:
+ {
+ Oid indexoid = InvalidOid;
+ int16 pknratts;
+ int16 pkattnum[INDEX_MAX_KEYS];
+ Oid pktypoid[INDEX_MAX_KEYS];
+ Oid pkopclass[INDEX_MAX_KEYS];
+
+ ScanKeyData cur_skey[INDEX_MAX_KEYS];
+ int i;
+ bool isnull;
+ TupleDesc desc = RelationGetDescr(tuple_rel);
+
+ Relation index_rel;
+
+ HeapTuple old_tuple;
+ bool found = false;
+
+ IndexScanDesc scan;
+
+#ifdef VERBOSE_DEBUG
+ elog(LOG, "UPDATE");
+#endif
+ MemSet(pkattnum, 0, sizeof(pkattnum));
+ MemSet(pktypoid, 0, sizeof(pktypoid));
+ MemSet(pkopclass, 0, sizeof(pkopclass));
+
+ relationFindPrimaryKey(tuple_rel, &indexoid, &pknratts, pkattnum, pktypoid, pkopclass);
+
+ if (!OidIsValid(indexoid))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("there is no primary key for table \"%s\"",
+ RelationGetRelationName(tuple_rel))));
+
+ index_rel = index_open(indexoid, AccessShareLock);
+
+ for (i = 0; i < pknratts; i++)
+ {
+ Oid operator;
+ Oid opfamily;
+ RegProcedure regop;
+
+ opfamily = get_opclass_family(pkopclass[i]);
+
+ operator = get_opfamily_member(opfamily, pktypoid[i], pktypoid[i], BTEqualStrategyNumber);
+
+ regop = get_opcode(operator);
+
+ ScanKeyInit(&cur_skey[i],
+ pkattnum[i],
+ BTEqualStrategyNumber,
+ regop,
+ fastgetattr(&change->newtuple->tuple, pkattnum[i], desc, &isnull));
+
+ Assert(!isnull);
+ }
+
+ scan = index_beginscan(tuple_rel, index_rel, GetTransactionSnapshot(),
+ pknratts, 0);
+ index_rescan(scan, cur_skey, pknratts, NULL, 0);
+
+ while ((old_tuple = index_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ if (found)
+ {
+ elog(ERROR, "WTF, more than one tuple found via pk???");
+ }
+ found = true;
+
+ simple_heap_update(tuple_rel, &old_tuple->t_self, &change->newtuple->tuple);
+ }
+
+ if (!found)
+ elog(ERROR, "could not find tuple to update");
+
+ index_endscan(scan);
+
+ if (!HeapTupleIsHeapOnly(&change->newtuple->tuple))
+ UserTableUpdateIndexes(tuple_rel, &change->newtuple->tuple);
+
+ index_close(index_rel, NoLock);
+
+ break;
+ }
+ case APPLY_CACHE_CHANGE_DELETE:
+ {
+ Oid indexoid = InvalidOid;
+ int16 pknratts;
+ int16 pkattnum[INDEX_MAX_KEYS];
+ Oid pktypoid[INDEX_MAX_KEYS];
+ Oid pkopclass[INDEX_MAX_KEYS];
+
+ ScanKeyData cur_skey[INDEX_MAX_KEYS];
+ int i;
+ bool isnull;
+
+ Relation index_rel;
+
+ HeapTuple old_tuple;
+ bool found = false;
+
+ TupleDesc index_desc;
+
+ IndexScanDesc scan;
+
+#ifdef VERBOSE_DEBUG
+ elog(LOG, "DELETE coming");
+#endif
+ MemSet(pkattnum, 0, sizeof(pkattnum));
+ MemSet(pktypoid, 0, sizeof(pktypoid));
+ MemSet(pkopclass, 0, sizeof(pkopclass));
+
+ relationFindPrimaryKey(tuple_rel, &indexoid, &pknratts, pkattnum, pktypoid, pkopclass);
+
+ if (!OidIsValid(indexoid))
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("there is no primary key for table \"%s\"",
+ RelationGetRelationName(tuple_rel))));
+
+ index_rel = index_open(indexoid, AccessShareLock);
+ index_desc = RelationGetDescr(index_rel);
+
+ for (i = 0; i < pknratts; i++)
+ {
+ Oid operator;
+ Oid opfamily;
+ RegProcedure regop;
+
+ opfamily = get_opclass_family(pkopclass[i]);
+
+ operator = get_opfamily_member(opfamily, pktypoid[i], pktypoid[i], BTEqualStrategyNumber);
+
+ regop = get_opcode(operator);
+
+ ScanKeyInit(&cur_skey[i],
+ pkattnum[i],
+ BTEqualStrategyNumber,
+ regop,
+ fastgetattr(&change->oldtuple->tuple, i + 1, index_desc, &isnull));
+
+ Assert(!isnull);
+ }
+
+ scan = index_beginscan(tuple_rel, index_rel, GetTransactionSnapshot(),
+ pknratts, 0);
+ index_rescan(scan, cur_skey, pknratts, NULL, 0);
+
+
+ while ((old_tuple = index_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ if (found)
+ {
+ elog(ERROR, "WTF, more than one tuple found via pk???");
+ }
+ found = true;
+ simple_heap_delete(tuple_rel, &old_tuple->t_self);
+ }
+
+ if (!found)
+ elog(ERROR, "could not find tuple to delete");
+
+ index_endscan(scan);
+
+ index_close(index_rel, NoLock);
+
+ break;
+ }
+ }
+ /* FIXME: locking */
+
+ heap_close(tuple_rel, NoLock);
+ CommandCounterIncrement();
+}
+
+/*
+ * The state object used by CatalogOpenIndexes and friends is actually the
+ * same as the executor's ResultRelInfo, but we give it another type name
+ * to decouple callers from that fact.
+ */
+typedef struct ResultRelInfo *UserTableIndexState;
+
+static void
+UserTableUpdateIndexes(Relation heapRel, HeapTuple heapTuple)
+{
+ /* this is largely copied together from copy.c's CopyFrom */
+ EState *estate = CreateExecutorState();
+ ResultRelInfo *resultRelInfo;
+ List *recheckIndexes = NIL;
+ TupleDesc tupleDesc = RelationGetDescr(heapRel);
+
+ resultRelInfo = makeNode(ResultRelInfo);
+ resultRelInfo->ri_RangeTableIndex = 1; /* dummy */
+ resultRelInfo->ri_RelationDesc = heapRel;
+ resultRelInfo->ri_TrigInstrument = NULL;
+
+ ExecOpenIndices(resultRelInfo);
+
+ estate->es_result_relations = resultRelInfo;
+ estate->es_num_result_relations = 1;
+ estate->es_result_relation_info = resultRelInfo;
+
+ if (resultRelInfo->ri_NumIndices > 0)
+ {
+ TupleTableSlot *slot = ExecInitExtraTupleSlot(estate);
+ ExecSetSlotDescriptor(slot, tupleDesc);
+ ExecStoreTuple(heapTuple, slot, InvalidBuffer, false);
+
+ recheckIndexes = ExecInsertIndexTuples(slot, &heapTuple->t_self,
+ estate);
+ }
+
+ ExecResetTupleTable(estate->es_tupleTable, false);
+
+ ExecCloseIndices(resultRelInfo);
+
+ FreeExecutorState(estate);
+ /* FIXME: recheck the indexes */
+ list_free(recheckIndexes);
+}
diff --git a/src/include/replication/apply.h b/src/include/replication/apply.h
new file mode 100644
index 0000000..3b818c0
--- /dev/null
+++ b/src/include/replication/apply.h
@@ -0,0 +1,24 @@
+/*
+ * apply.h
+ *
+ * PostgreSQL logical replay
+ *
+ * Portions Copyright (c) 2012, PostgreSQL Global Development Group
+ *
+ * src/include/replication/apply.h
+ */
+#ifndef APPLY_H
+#define APPLY_H
+
+#include "utils/resowner.h"
+
+typedef struct ApplyApplyCacheState
+{
+ ResourceOwner original_resource_owner;
+} ApplyApplyCacheState;
+
+void apply_begin_txn(ApplyCache* cache, ApplyCacheTXN* txn);
+void apply_commit_txn(ApplyCache* cache, ApplyCacheTXN* txn);
+void apply_change(ApplyCache* cache, ApplyCacheTXN* txn, ApplyCacheTXN* subtxn, ApplyCacheChange* change);
+
+#endif
--
1.7.10.rc3.3.g19a6c.dirty
From: Andres Freund <andres@anarazel.de>
One apply process currently can only apply changes from one database in another
cluster (with a specific node_id).
Currently synchronous_commit=off is statically set in the apply process because
after a crash we can safely recover all changes which we didn't apply, so there
is no point in incurring the overhead of synchronous commits. This might be
problematic in combination with synchronous replication.
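
The reasoning can be illustrated with a toy model. This is only a sketch under
stated assumptions, not how the patch tracks its state: ApplyNode, UpstreamTxn
and the node_* functions are hypothetical, and the model assumes that the
applied data and the remembered upstream LSN always become durable together,
and that the upstream still has every change after that LSN available.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical model; none of these names exist in the patch. */
typedef struct UpstreamTxn
{
    uint64_t commit_lsn;    /* commit position on the upstream node */
    long     delta;         /* the "change": add delta to a counter */
} UpstreamTxn;

typedef struct ApplyNode
{
    /* state as seen by running sessions; may be lost in a crash */
    long     value;
    uint64_t applied_lsn;

    /* state that has reached disk; this is what survives a crash */
    long     durable_value;
    uint64_t durable_lsn;
} ApplyNode;

/* Models a later WAL flush: data and apply position are persisted together. */
static void node_flush(ApplyNode *n)
{
    n->durable_value = n->value;
    n->durable_lsn = n->applied_lsn;
}

/* Models a crash: everything not yet flushed is gone. */
static void node_crash(ApplyNode *n)
{
    n->value = n->durable_value;
    n->applied_lsn = n->durable_lsn;
}

/*
 * Applies every upstream transaction newer than the remembered position,
 * without forcing a flush per commit. Returns how many were applied.
 */
static size_t node_catch_up(ApplyNode *n, const UpstreamTxn *log, size_t nlog)
{
    size_t applied = 0;

    for (size_t i = 0; i < nlog; i++)
    {
        if (log[i].commit_lsn <= n->applied_lsn)
            continue;       /* already applied, skip */
        n->value += log[i].delta;
        n->applied_lsn = log[i].commit_lsn;
        applied++;
    }
    return applied;
}
```

In this model a crash only rolls the node back to an earlier consistent point,
and catching up from the durable position re-applies exactly the lost
transactions, which is why an asynchronous commit costs nothing but some
replay work.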
Missing/Todo:
- The foreign node_id currently is hardcoded (2, 1 depending on the local id)
as is the database (postgres). This obviously needs to change.
- Proper mainloop with error handling, PROCESS_INTERRUPTS and everything
- Start multiple apply processes (per node_id per database)
- Possibly switch databases during runtime?
---
src/backend/postmaster/bgworker.c | 10 +-
src/backend/replication/logical/logical.c | 194 +++++++++++++++++++++++++++++
src/include/replication/logical.h | 3 +
3 files changed, 198 insertions(+), 9 deletions(-)
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 8144050..bbb7e86 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -52,6 +52,7 @@
#include "postmaster/bgworker.h"
#include "postmaster/fork_process.h"
#include "postmaster/postmaster.h"
+#include "replication/logical.h"
#include "storage/bufmgr.h"
#include "storage/ipc.h"
#include "storage/latch.h"
@@ -91,8 +92,6 @@ static void bgworker_sigterm_handler(SIGNAL_ARGS);
NON_EXEC_STATIC void BgWorkerMain(int argc, char *argv[]);
-static bool do_logicalapply(void);
-
/********************************************************************
* BGWORKER CODE
********************************************************************/
@@ -394,10 +393,3 @@ NumBgWorkers(void)
return numWorkers;
#endif
}
-
-static bool
-do_logicalapply(void)
-{
- elog(LOG, "doing logical apply");
- return false;
-}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 4f34488..7fadafe 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -13,7 +13,201 @@
*
*/
#include "postgres.h"
+
+#include "access/xlogreader.h"
+
+#include "replication/applycache.h"
#include "replication/logical.h"
+#include "replication/apply.h"
+#include "replication/decode.h"
+#include "replication/walreceiver.h"
+/*FIXME: XLogRead*/
+#include "replication/walsender_private.h"
+
+#include "storage/ipc.h"
+#include "storage/proc.h"
+
int guc_replication_origin_id = InvalidMultimasterNodeId;
RepNodeId current_replication_origin_id = InvalidMultimasterNodeId;
XLogRecPtr current_replication_origin_lsn = {0, 0};
+
+static XLogReaderState* xlogreader_state = 0;
+
+static bool
+replay_record_is_interesting(XLogReaderState* state, XLogRecord* r)
+{
+ /*
+ * we filtered in the sender, so no filtering necessary atm.
+ *
+ * If we want to introduce per-table filtering after a proxy or such this
+ * would be the place.
+ */
+ return true;
+}
+
+static void
+replay_writeout_data(XLogReaderState* state, char* data, Size len)
+{
+ /* no data needs to persist after this */
+ return;
+}
+
+static void
+replay_finished_record(XLogReaderState* state, XLogRecordBuffer* buf)
+{
+ ReaderApplyState* apply_state = state->private_data;
+ ApplyCache *cache = apply_state->apply_cache;
+
+ DecodeRecordIntoApplyCache(cache, buf);
+}
+
+static void
+replay_read_page(XLogReaderState* state, char* cur_page, XLogRecPtr startptr)
+{
+ XLogPageHeader page_header;
+
+ Assert((startptr.xrecoff % XLOG_BLCKSZ) == 0);
+
+ /* FIXME: more sensible/efficient implementation */
+ XLogRead(cur_page, receiving_from_node_id, startptr, XLOG_BLCKSZ);
+
+ page_header = (XLogPageHeader)cur_page;
+
+ if (page_header->xlp_magic != XLOG_PAGE_MAGIC)
+ {
+ elog(FATAL, "page header magic %x, should be %x at %X/%X", page_header->xlp_magic,
+ XLOG_PAGE_MAGIC, startptr.xlogid, startptr.xrecoff);
+ }
+}
+
+bool
+do_logicalapply(void)
+{
+ XLogRecPtr *from;
+ XLogRecPtr *to;
+ ReaderApplyState *apply_state;
+ int res;
+
+ static bool initialized = false;
+
+ if (!initialized)
+ {
+ /*
+ * FIXME: We need a sensible implementation for choosing this.
+ */
+ if (guc_replication_origin_id == 1)
+ {
+ receiving_from_node_id = 2;
+ }
+ else
+ {
+ receiving_from_node_id = 1;
+ }
+ }
+
+ ResetLatch(&MyProc->procLatch);
+
+ SpinLockAcquire(&WalRcv->mutex);
+ from = &WalRcv->mm_applyState[receiving_from_node_id];
+ to = &WalRcv->mm_receiveState[receiving_from_node_id];
+ SpinLockRelease(&WalRcv->mutex);
+
+ if (XLByteEQ(*to, zeroRecPtr)){
+ /* shmem state not ready, walreceivers didn't start up yet */
+ return false;
+ }
+
+ if (!initialized)
+ {
+ ApplyCache *apply_cache;
+
+ initialized = true;
+
+ elog(LOG, "at node %u we're receiving from %u",
+ guc_replication_origin_id,
+ receiving_from_node_id);
+
+ /* FIXME: do we want to set that permanently? */
+ current_replication_origin_id = receiving_from_node_id;
+
+ /* we cannot lose anything due to this as we just restart replay */
+ SetConfigOption("synchronous_commit", "off",
+ PGC_SUSET, PGC_S_OVERRIDE);
+
+ WalRcv->mm_receiveLatch[receiving_from_node_id] = &MyProc->procLatch;
+
+ /* initialize xlogreader */
+ xlogreader_state = XLogReaderAllocate();
+ XLogReaderReset(xlogreader_state);
+
+ xlogreader_state->is_record_interesting = replay_record_is_interesting;
+ xlogreader_state->finished_record = replay_finished_record;
+ xlogreader_state->writeout_data = replay_writeout_data;
+ xlogreader_state->read_page = replay_read_page;
+ xlogreader_state->private_data = malloc(sizeof(ReaderApplyState));
+ if (!xlogreader_state->private_data)
+ elog(ERROR, "Could not allocate the ReaderApplyState struct");
+
+ xlogreader_state->startptr = *from;
+ xlogreader_state->curptr = *from;
+
+ apply_state = (ReaderApplyState*)xlogreader_state->private_data;
+
+ /*
+ * allocate an ApplyCache that will apply data using lowlevel calls
+ * without type conversion et al. This requires binary compatibility
+ * between both systems.
+ * XXX: This would be the place to hook different apply methods, like
+ * producing sql and applying it.
+ */
+ apply_cache = ApplyCacheAllocate();
+ apply_cache->begin = apply_begin_txn;
+ apply_cache->apply_change = apply_change;
+ apply_cache->commit = apply_commit_txn;
+ apply_state->apply_cache = apply_cache;
+
+ apply_cache->private_data = malloc(sizeof(ApplyApplyCacheState));
+ if (!apply_cache->private_data)
+ elog(ERROR, "Could not allocate the ApplyApplyCacheState struct");
+
+ elog(WARNING, "initialized");
+
+ }
+
+ if(XLByteLT(*to, *from))
+ {
+ goto wait;
+ }
+
+ xlogreader_state->endptr = *to;
+
+ XLogReaderRead(xlogreader_state);
+
+ SpinLockAcquire(&WalRcv->mutex);
+ /*
+ * FIXME: This is not enough to recover properly after a crash because we
+ * lose in-progress transactions. For that we need two pointers: one to
+ * remember the lsn we committed last and one for the lsn of the oldest
+ * in-progress transaction. Then we can start reading at the latter and
+ * just throw away everything which commits before the former.
+ */
+ WalRcv->mm_applyState[receiving_from_node_id] = xlogreader_state->curptr;
+ SpinLockRelease(&WalRcv->mutex);
+
+wait:
+ /*
+ * if we either need data to complete reading or have finished everything
+ * up to this point
+ */
+ if (xlogreader_state->needs_input || !xlogreader_state->incomplete)
+ {
+ res = WaitLatch(&MyProc->procLatch,
+ WL_LATCH_SET|WL_POSTMASTER_DEATH, 0);
+ if (res & WL_POSTMASTER_DEATH)
+ {
+ elog(WARNING, "got deathsig");
+ proc_exit(0);
+ }
+ }
+ return true;
+}
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index fc9e120..aa19ab9 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -25,4 +25,7 @@ extern XLogRecPtr current_replication_origin_lsn;
#define MaxMultimasterNodeId (2<<3)
#define LCRDIR "pg_lcr"
+
+bool do_logicalapply(void);
+
#endif
--
1.7.10.rc3.3.g19a6c.dirty
From: Andres Freund <andres@anarazel.de>
---
src/backend/replication/logical/DESIGN | 209 ++++++++++++++++++++++++++++++++
1 file changed, 209 insertions(+)
create mode 100644 src/backend/replication/logical/DESIGN
diff --git a/src/backend/replication/logical/DESIGN b/src/backend/replication/logical/DESIGN
new file mode 100644
index 0000000..2cf08ff
--- /dev/null
+++ b/src/backend/replication/logical/DESIGN
@@ -0,0 +1,209 @@
+=== Design goals for logical replication ===:
+- in core
+- fast
+- async
+- robust
+- multi-master
+- modular
+- as unintrusive as possible implementation wise
+- basis for other technologies (sharding, replication into other DBMSs, ...)
+
+For reasons why we think this is an important set of features please check out
+the presentation from the in-core replication summit at pgcon:
+http://wiki.postgresql.org/wiki/File:BDR_Presentation_PGCon2012.pdf
+
+While you may argue that most of the above design goals are already provided by
+various trigger-based replication solutions like Londiste or Slony, we think
+that that is not enough for various reasons:
+
+- not in core (and thus less trustworthy)
+- duplication of writes due to an additional log
+- performance in general (check the end of the above presentation)
+- complex to use because there is no native administration interface
+
+We want to emphasize that this proposed architecture is based on the experience
+of developing a minimal prototype with the above goals in mind. While we
+obviously hope that a good part of it is reusable for the community, we
+definitely do *not* expect that the community accepts this as-is. It is
+intended to be the basis upon which we, the community, can build and design
+the future logical replication.
+
+=== Basic architecture ===:
+Very broadly speaking there are several major pieces common to most approaches
+to replication:
+1. Source data generation
+2. Transportation of that data
+3. Applying the changes
+4. Conflict resolution
+
+
+1.:
+
+As we need a change stream that contains all required changes in the correct
+order, the requirement for this stream to reflect changes across multiple
+concurrent backends raises concurrency and scalability issues. Reusing the
+WAL stream for this seems a good choice since it is needed anyway and addresses
+those issues already, and it further means that we don't incur duplicate
+writes. Any other stream generating component would introduce additional
+scalability issues.
+
+We need a change stream that contains all required changes in the correct order
+which thus needs to be synchronized across concurrent backends which introduces
+obvious concurrency/scalability issues.
+Reusing the WAL stream for this seems a good choice since it is needed anyway
+and addresses those issues already, and it further means we don't duplicate the
+writes and locks already performed for its maintenance.
+
+Unfortunately, in this case, the WAL is mostly a physical representation of the
+changes and thus does not, by itself, contain the necessary information in a
+convenient format to create logical changesets.
+
+The biggest problem is that interpreting tuples in the WAL stream requires an
+up-to-date system catalog and needs to be done in a compatible backend and
+architecture. The requirement of an up-to-date catalog could be solved by
+adding more data to the WAL stream but it seems to be likely that that would
+require relatively intrusive & complex changes. Instead we chose to require a
+synchronized catalog at the decoding site. That adds some complexity to use
+cases like replicating into a different database or cross-version
+replication. For those it is relatively straight-forward to develop a proxy pg
+instance that only contains the catalog and does the transformation to textual
+changes.
+
+This also is the solution to the other big problem, the need to work around
+architecture/version specific binary formats. The alternative, producing
+cross-version, cross-architecture compatible binary changes or, even more so,
+textual changes all the time seems to be prohibitively expensive, both from a
+CPU and storage point of view and in terms of implementation effort.
+
+The catalog on the site where changes originate can *not* be used for the
+decoding because at the time we decode the WAL the catalog may have changed
+from the state it was in when the WAL was generated. A possible solution for
+this would be to have a fully versioned catalog but that again seems to be
+rather complex and intrusive.
+
+For some operations (UPDATE, DELETE) and corner-cases (e.g. full page writes)
+additional data needs to be logged, but the additional amount of data isn't
+that big. Requiring a primary-key for any change but INSERT seems to be a
+sensible thing for now. The required changes are fully contained in heapam.c
+and are pretty simple so far.
+
+2.:
+
+For transport of the non-decoded data from the originating site to the decoding
+site we decided to reuse the infrastructure already provided by
+walsender/walreceiver. We introduced a new command that, analogous to
+START_REPLICATION, is called START_LOGICAL_REPLICATION that will stream out all
+xlog records that pass through a filter.
+
+The on-the-wire format stays the same. The filter currently simply filters out
+all records which are not interesting for logical replication (indexes,
+freezing, ...) and records that did not originate on the same system.
+
+The requirement of filtering by 'origin' of a wal record comes from the planned
+multimaster support. Changes replayed locally that originate from another site
+should not be replayed again there. If the wal is plainly used without such a
+filter that would cause loops. Instead we tag every wal record with the "node
+id" of the site that caused the change to happen and changes with a node's own
+"node id" won't get applied again.
+
+Currently filtered records get simply replaced by NOOP records and loads of
+zeroes which obviously is not a sensible solution. The difficulty of actually
+removing the records is that that would change the LSNs. We currently rely on
+those though.
+
+The filtering might very well get expanded to support partial replication and
+such in future.
+
+
+3.:
+
+To sensibly apply changes out of the WAL stream we need to solve two things:
+Reassemble transactions and apply them to the target database.
+
+The logical stream from 1. via 2. consists of individual changes identified
+by the relfilenode of the table and the xid of the transaction. Given
+(sub)transactions, rollbacks, crash recovery and the like those changes
+obviously cannot be individually applied without fully losing the pretence
+of consistency. To solve that we introduced a module, dubbed
+ApplyCache which does the reassembling. This module is *independent* of the
+data source and of the method of applying changes so it can be reused for
+replicating into a foreign system or similar.
+
+Due to the overhead of planner/executor/toast reassembly/type conversion (yes,
+we benchmarked!) we decided against statement generation for apply. Even when
+using prepared statements the overhead is rather noticeable.
+
+Instead we decided to use relatively lowlevel heapam.h/genam.h accesses to do
+the apply. For now we decided to use only one process to do the applying;
+parallelizing that seems to be too complex for an introduction of an already
+complex feature.
+In our tests the apply process could keep up with pgbench -c/j 20+ generating
+changes. This will obviously heavily depend on the workload. A fully seek-bound
+workload will definitely not scale that well.
+
+Just to reiterate: Plugging in another method to do the apply should be a
+relatively simple matter of setting up three callbacks to a different function
+(begin, apply_change, commit).
+
+Another complexity in this is how to synchronize the catalogs. We plan to use
+command/event triggers and the oid preserving features from pg_upgrade to keep
+the catalogs in-sync. We did not start working on that.
+
+
+4.:
+
+While we started to think about conflict resolution/avoidance we did not start
+to work on it. We currently *cannot* handle conflicts. We think that the base
+features/architecture should be agreed upon before starting with it.
+
+Multimaster tests were done with sequences set up with INCREMENT 2 and different
+start values on the two nodes.
+
+=== Current Prototype ===
+
+The current prototype consists of a series of patches that are split in
+hopefully sensible and coherent parts to make reviewing of individual parts
+possible.
+
+It is also available in the 'cabal-rebasing' branch on
+git.postgresql.org/users/andresfreund/postgres.git . That branch will have its
+history rewritten though.
+
+01: wakeup handling: reduces replication lag, not very interesting in this context
+
+02: Add zeroRecPtr: not very interesting either
+
+03: new syscache for relfilenode. This would benefit from some syscache-experienced eyes
+
+04: embedded lists: This is a general facility, general review appreciated
+
+05: preliminary bgworker support: This is not ready and just posted as it is
+ preliminary work for the other patches. Simon will post a real patch soon
+
+06: XLogReader: Review definitely appreciated
+
+07: logical data additions for WAL: Review definitely appreciated, I do not expect fundamental changes
+
+08: ApplyCache: Important infrastructure for the patch, review definitely appreciated
+
+09: Wal Decoding: Decode WAL generated with wal_level=logical into an ApplyCache
+
+10: WAL with 'origin node': This is another important base-piece for logical rep
+
+11: WAL segment handling changes: If the basic idea of adding a node_id to the
+ functions and adding a pg_lcr directory is acceptable the rest of the patch is
+ fairly boring/mechanical
+
+12: walsender/walreceiver changes: Implement transport/filtering of logical
+ changes. Very relevant
+
+13: shared memory/crash recovery state handling for logical rep: Very relevant
+ minus the TODOs in the commit message
+
+14: apply module: review appreciated
+
+15: apply process: somewhat dependent on the preliminary changes in 05, general
+ direction is visible, loads of detail work needed as soon as some design
+ decisions are agreed upon.
+
+16: this document. Not very interesting after you've read it ;)
--
1.7.10.rc3.3.g19a6c.dirty
On Wed, Jun 13, 2012 at 6:28 AM, Andres Freund <andres@2ndquadrant.com> wrote:
+synchronized catalog at the decoding site. That adds some complexity to use
+cases like replicating into a different database or cross-version
+replication. For those it is relatively straight-forward to develop a proxy pg
+instance that only contains the catalog and does the transformation to textual
+changes.
wow. Anyways, could you elaborate a little on how this proxy
instance concept would work? Let's take the case where I have N
small-ish schema identical database shards that I want to aggregate
into a single warehouse -- something that HS/SR currently can't do.
There's a lot of ways to do that obviously but assuming the warehouse
would have to have a unique schema, could it be done in your
architecture?
merlin
Hi Merlin,
On Wednesday, June 13, 2012 04:21:12 PM Merlin Moncure wrote:
On Wed, Jun 13, 2012 at 6:28 AM, Andres Freund <andres@2ndquadrant.com>
wrote:
+synchronized catalog at the decoding site. That adds some complexity to use
+cases like replicating into a different database or cross-version
+replication. For those it is relatively straight-forward to develop a proxy pg
+instance that only contains the catalog and does the transformation to textual
+changes.
wow. Anyways, could you elaborate a little on how this proxy
instance concept would work?
To do the decoding into another form you need an up-to-date catalog + correct
binaries. So the idea would be to have a minimal instance which is just a copy
of the database with all the tables with an oid < FirstNormalObjectId i.e.
only the catalog tables. Then you can apply all xlog changes on system tables
using the existing infrastructure for HS (or use the command trigger
equivalent we need to build for BDR) and decode everything else into the
ApplyCache just as done in the patch. Then you would fill out the callbacks
for the ApplyCache (see patch 14/16 and 15/16 for an example) to do whatever
you want with the data. I.e. generate plain sql statements or run some
transform procedure.
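To give a rough idea of what "filling out the callbacks" could look like when
the goal is plain sql, here is a minimal, self-contained sketch. Everything in
it is made up for illustration: the Sketch*/sketch_* names, SqlSink and the
callback signatures are assumptions and do not match the real ApplyCache
structs from patches 14/16 and 15/16. It also assumes the table name and the
tuple data were already rendered as text via the catalog, which is the hard
part and is not shown.

```c
#include <assert.h>
#include <stdarg.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-ins; the ApplyCache structs in the patch look different. */
typedef enum { SKETCH_INSERT, SKETCH_UPDATE, SKETCH_DELETE } SketchChangeKind;

typedef struct SketchChange
{
    SketchChangeKind kind;
    const char *table;    /* assumed already resolved via the catalog */
    const char *data;     /* value list for INSERT, SET list for UPDATE */
    const char *key_cond; /* primary key condition for UPDATE/DELETE */
} SketchChange;

typedef struct SketchApplyCache SketchApplyCache;
struct SketchApplyCache
{
    /* the three callbacks named in the design: begin, apply_change, commit */
    void (*begin) (SketchApplyCache *cache, unsigned xid);
    void (*apply_change) (SketchApplyCache *cache, unsigned xid,
                          const SketchChange *change);
    void (*commit) (SketchApplyCache *cache, unsigned xid);
    void *private_data;
};

/* Output buffer used by the sql-producing callbacks (hypothetical). */
typedef struct SqlSink
{
    char buf[1024];
    size_t used;
} SqlSink;

/* Append formatted text to the sink, truncating if the buffer is full. */
static void
sink_printf(SqlSink *sink, const char *fmt, ...)
{
    size_t room = sizeof(sink->buf) - sink->used; /* includes the NUL */
    va_list ap;
    int n;

    va_start(ap, fmt);
    n = vsnprintf(sink->buf + sink->used, room, fmt, ap);
    va_end(ap);
    if (n > 0)
        sink->used += ((size_t) n < room) ? (size_t) n : room - 1;
}

static void
sql_begin(SketchApplyCache *cache, unsigned xid)
{
    (void) xid;
    sink_printf(cache->private_data, "BEGIN;\n");
}

static void
sql_apply_change(SketchApplyCache *cache, unsigned xid,
                 const SketchChange *change)
{
    SqlSink *sink = cache->private_data;

    (void) xid;
    switch (change->kind)
    {
        case SKETCH_INSERT:
            sink_printf(sink, "INSERT INTO %s VALUES (%s);\n",
                        change->table, change->data);
            break;
        case SKETCH_UPDATE:
            sink_printf(sink, "UPDATE %s SET %s WHERE %s;\n",
                        change->table, change->data, change->key_cond);
            break;
        case SKETCH_DELETE:
            sink_printf(sink, "DELETE FROM %s WHERE %s;\n",
                        change->table, change->key_cond);
            break;
    }
}

static void
sql_commit(SketchApplyCache *cache, unsigned xid)
{
    (void) xid;
    sink_printf(cache->private_data, "COMMIT;\n");
}

/* Wire the sql-producing callbacks into a cache and reset the sink. */
void
sketch_sql_cache_init(SketchApplyCache *cache, SqlSink *sink)
{
    assert(cache != NULL && sink != NULL);
    sink->used = 0;
    sink->buf[0] = '\0';
    cache->begin = sql_begin;
    cache->apply_change = sql_apply_change;
    cache->commit = sql_commit;
    cache->private_data = sink;
}
```

The reassembly code would then only ever call cache->begin, cache->apply_change
and cache->commit for a fully reassembled transaction, so swapping in a
different target is a matter of installing three other functions.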
Let's take the case where I have N small-ish schema identical database
shards that I want to aggregate into a single warehouse -- something that
HS/SR currently can't do.
There's a lot of ways to do that obviously but assuming the warehouse
would have to have a unique schema, could it be done in your
architecture?
Not sure what you mean by the warehouse having a unique schema? It has the
same schema as the OLTP counterparts? That would obviously be the easy case if
you take care and guarantee uniqueness of keys upfront. That basically would
be trivial ;)
It gets a bit more complex if you need to transform the data for the
warehouse. I don't plan to put in work to make that possible without some C
coding (filling out the callbacks and doing the work in there). It shouldn't
need much though.
Does that answer your question?
Andres
--
Andres Freund http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Training & Services
Andres Freund <andres@2ndquadrant.com> wrote:
This adds a new wal_level value 'logical'
Missing cases:
- heap_multi_insert
- primary key changes for updates
- no primary key
- LOG_NEWPAGE
First, Wow!
I look forward to the point where we can replace our trigger-based
replication with this! Your "missing cases" for primary key issues
would not cause us any pain for our current system, since we require
a primary key and don't support updates to PKs for replicated
tables. While I don't expect that the first cut of this will be able
to replace our replication-related functionality, I'm interested in
making sure it can be extended in that direction, so I have a couple
things to consider:
(1) For our usage, with dozens of source databases feeding into
multiple aggregate databases and interfaces, DDL replication is not
of much if any interest. It should be easy enough to ignore as long
as it is low volume, so that doesn't worry me too much; but if I'm
missing something and you run across any logical WAL logging for DDL
which does generate a lot of WAL traffic, it would be nice to have a
way to turn that off at generation time rather than filtering it or
ignoring it later. (Probably won't be an issue, just a heads-up.)
(2) To match the functionality we now have, we would need the
logical stream to include the *before* image of the whole tuple for
each row updated or deleted. I understand that this is not needed
for the use cases you are initially targeting; I just hope the
design leaves this option open without needing to disturb other use
cases. Perhaps this would require yet another wal_level value.
Perhaps rather than testing the current value directly for
determining whether to log something, the GUC processing could set
some booleans for faster testing and less code churn when the
initial implementation is expanded to support other use cases (like
ours).
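To make the boolean idea concrete, here is a minimal sketch of what I have in
mind. It is not taken from the patch: all names are hypothetical, the
"logical_full" level for whole before images is purely my assumption, and a
real version would hang off the existing GUC assign machinery rather than a
free-standing function.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical wal_level values; SKETCH_WAL_LOGICAL_FULL is an assumed
 * extra level that would log the whole before image.
 */
typedef enum SketchWalLevel
{
    SKETCH_WAL_MINIMAL,
    SKETCH_WAL_ARCHIVE,
    SKETCH_WAL_HOT_STANDBY,
    SKETCH_WAL_LOGICAL,
    SKETCH_WAL_LOGICAL_FULL
} SketchWalLevel;

/* Flags derived once when the setting changes, tested cheaply afterwards. */
static bool sketch_log_old_key_flag = false;
static bool sketch_log_old_tuple_flag = false;

/* Would be called from the GUC assign hook in a real implementation. */
void
sketch_assign_wal_level(SketchWalLevel level)
{
    sketch_log_old_key_flag = (level >= SKETCH_WAL_LOGICAL);
    sketch_log_old_tuple_flag = (level >= SKETCH_WAL_LOGICAL_FULL);
}

/* Call sites test a flag instead of comparing wal_level themselves. */
bool
sketch_log_old_key(void)
{
    return sketch_log_old_key_flag;
}

bool
sketch_log_old_tuple(void)
{
    return sketch_log_old_tuple_flag;
}
```

The point is that the code writing UPDATE/DELETE records would only ask
"do I need the old key?" or "do I need the old tuple?", so adding another
level later touches the assign function and not every call site.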
(3) Similar to point 2, it would be extremely desirable to be able
to determine table name and column names for the tuples in a stream
from that stream, without needing to query a hot standby or similar
digging into other sources of information. Not only will the
various source databases all have different OID values for the same
objects, and the aggregate targets have different values from each
other and the sources, but some targets don't have the tables at
all. I'm talking about our database transaction repository and the
interfaces to business partners which we currently drive off of the
same transaction stream which drives replication.
Would it be helpful or just a distraction if I were to provide a
more detailed description of our whole replication / transaction
store / interface area?
If it would be useful, I could also describe some other replication
patterns I have seen over the years. In particular, one which might
be interesting is where subsets of the data are distributed to
multiple standalone machines which have intermittent or unreliable
connections to a central site, which periodically collects data from
all the remote sites, recalculates distribution, and sends
transactions back out to those remote sites to add, remove, and
update rows based on the distribution rules and the new data.
-Kevin