diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index c72a1f2..600b544 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -10,7 +10,7 @@ include $(top_builddir)/src/Makefile.global
 
 OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o gindesc.o gistdesc.o \
 	   hashdesc.o heapdesc.o mxactdesc.o nbtdesc.o relmapdesc.o \
-	   replorigindesc.o seqdesc.o smgrdesc.o spgdesc.o \
+	   replorigindesc.o replslotdesc.o seqdesc.o smgrdesc.o spgdesc.o \
 	   standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/rmgrdesc/replslotdesc.c b/src/backend/access/rmgrdesc/replslotdesc.c
new file mode 100644
index 0000000..a49ebf7
--- /dev/null
+++ b/src/backend/access/rmgrdesc/replslotdesc.c
@@ -0,0 +1,73 @@
+/*-------------------------------------------------------------------------
+ *
+ * replslotdesc.c
+ *	  rmgr descriptor routines for replication/slot.c
+ *
+ * Portions Copyright (c) 2015, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/rmgrdesc/replslotdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "replication/slot_xlog.h"
+
+/*
+ * Append a human-readable description of a replication slot WAL record to
+ * buf.  Every record type is described by its operation followed by the
+ * name of the slot the record refers to.
+ */
+void
+replslot_desc(StringInfo buf, XLogReaderState *record)
+{
+	char	   *rec = XLogRecGetData(record);
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	switch (info)
+	{
+		case XLOG_REPLSLOT_UPDATE:
+			{
+				ReplicationSlotInWAL xlrec = (ReplicationSlotInWAL) rec;
+
+				appendStringInfo(buf, "update %s", NameStr(xlrec->name));
+				break;
+			}
+		case XLOG_REPLSLOT_CREATE:
+			{
+				ReplicationSlotInWAL xlrec = (ReplicationSlotInWAL) rec;
+
+				appendStringInfo(buf, "create %s", NameStr(xlrec->name));
+				break;
+			}
+		case XLOG_REPLSLOT_DROP:
+			{
+				xl_replslot_drop *xlrec = (xl_replslot_drop *) rec;
+
+				appendStringInfo(buf, "drop %s", NameStr(xlrec->name));
+				break;
+			}
+	}
+}
+
+/*
+ * Return the symbolic name of a replication slot WAL record type, or NULL
+ * if info is not one of the XLOG_REPLSLOT_* codes.
+ */
+const char *
+replslot_identify(uint8 info)
+{
+	switch (info)
+	{
+		case XLOG_REPLSLOT_UPDATE:
+			return "UPDATE";
+		case XLOG_REPLSLOT_CREATE:
+			return "CREATE";
+		case XLOG_REPLSLOT_DROP:
+			return "DROP";
+		default:
+			return NULL;
+	}
+}
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 7c4d773..0bd5796 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -24,6 +24,7 @@
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
 #include "replication/origin.h"
+#include "replication/slot_xlog.h"
 #include "storage/standby.h"
 #include "utils/relmapper.h"
 
diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c
index 6cbe65e..2c5e743 100644
--- a/src/backend/commands/dbcommands.c
+++ b/src/backend/commands/dbcommands.c
@@ -2114,6 +2114,9 @@ dbase_redo(XLogReaderState *record)
 		/* Clean out the xlog relcache too */
 		XLogDropDatabase(xlrec->db_id);
 
+		/* Drop any logical failover slots for this database */
+		ReplicationSlotsDropDBSlots(xlrec->db_id);
+
 		/* And remove the physical files */
 		if (!rmtree(dst_path, true))
 			ereport(WARNING,
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 9f60687..01e1b8e 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -135,6 +135,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 		case RM_BRIN_ID:
 		case RM_COMMIT_TS_ID:
 		case RM_REPLORIGIN_ID:
+		case RM_REPLSLOT_ID:
 			break;
 		case RM_NEXT_ID:
 			elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1ce9081..29c3f8a 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -85,16 +85,19 @@ CheckLogicalDecodingRequirements(void)
 				 errmsg("logical decoding requires a database connection")));
 
 	/* ----
-	 * TODO: We got
to change that someday soon...
+	 * TODO:
 	 *
-	 * There's basically three things missing to allow this:
+	 * There are some things missing to allow this:
 	 * 1) We need to be able to correctly and quickly identify the timeline a
 	 *	  LSN belongs to
-	 * 2) We need to force hot_standby_feedback to be enabled at all times so
-	 *	  the primary cannot remove rows we need.
-	 * 3) support dropping replication slots referring to a database, in
-	 *	  dbase_redo. There can't be any active ones due to HS recovery
-	 *	  conflicts, so that should be relatively easy.
+	 * 2) To prevent removal of rows we need, hot_standby_feedback would have
+	 *	  to be enhanced so it sends both xmin and catalog_xmin to the master.
+	 *	  A standby slot can't write WAL, so we wouldn't be able to use it
+	 *	  directly for failover, without some very complex state interactions
+	 *	  via master.
+	 *
+	 * So this doesn't seem likely to change anytime soon.
+	 *
 	 * ----
 	 */
 	if (RecoveryInProgress())
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index c39e957..44fcc02 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -26,6 +26,16 @@
  * While the server is running, the state data is also cached in memory for
  * efficiency.
  *
+ * Originally, replication slots were unique to a single node, which meant
+ * they couldn't easily be used across replication failover. Global slots
+ * could come in various designs, the simplest of which is "failover slots".
+ * Any slot created on a master node generates WAL records that maintain
+ * a copy of the slot on standby nodes. If a standby node is promoted, the
+ * failover slot allows access to be restarted just as if the original
+ * master node was being accessed, allowing for the timeline change.
+ * Global slots may cause problems with name collisions with incautious
+ * choices of naming convention, which requires some additional checking.
+ *
  * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
  * or free a slot. ReplicationSlotControlLock must be taken in shared mode
  * to iterate over the slots, and in exclusive mode to change the in_use flag
@@ -44,6 +54,7 @@
 #include "common/string.h"
 #include "miscadmin.h"
 #include "replication/slot.h"
+#include "replication/slot_xlog.h"
 #include "storage/fd.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
@@ -97,12 +108,17 @@ ReplicationSlot *MyReplicationSlot = NULL;
 int			max_replication_slots = 0;	/* the maximum number of replication
 										 * slots */
 
-static void ReplicationSlotDropAcquired(void);
+static void ReplicationSlotDropAcquired(bool deactivate);
 
 /* internal persistency functions */
 static void RestoreSlotFromDisk(const char *name);
 static void CreateSlotOnDisk(ReplicationSlot *slot);
-static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel);
+static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel,
+						   bool create, bool redo);
+
+/* internal redo functions */
+static void ReplicationSlotRedoUpdate(ReplicationSlotInWAL xlrec);
+static void ReplicationSlotRedoCreate(ReplicationSlotInWAL xlrec);
 
 /*
  * Report shared-memory space needed by ReplicationSlotShmemInit.
@@ -346,6 +362,21 @@ ReplicationSlotAcquire(const char *name)
 				(errcode(ERRCODE_OBJECT_IN_USE),
 				 errmsg("replication slot \"%s\" is already active for PID %d",
 						name, active_pid)));
+
+	/*
+	 * On a standby, failover slots are reserved for use after failover.  The
+	 * startup process (InRecovery) is exempt: replaying XLOG_REPLSLOT_DROP
+	 * goes through ReplicationSlotDrop(), which acquires the slot first.
+	 *
+	 * NOTE(review): if the lookup above has already marked the slot active
+	 * for this backend, that needs to be undone before raising this error
+	 * -- confirm.
+	 */
+	if (RecoveryInProgress() && !InRecovery && slot->data.failover)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_IN_USE),
+				 errmsg("replication slot \"%s\" is reserved for use after failover",
+					  name)));
 
 	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = slot;
@@ -367,9 +388,9 @@ ReplicationSlotRelease(void)
 		/*
 		 * Delete the slot. There is no !PANIC case where this is allowed to
 		 * fail, all that may happen is an incomplete cleanup of the on-disk
-		 * data.
+		 * data. Ensure we deactivate the slot also.
*/
-		ReplicationSlotDropAcquired();
+		ReplicationSlotDropAcquired(true);
 	}
 	else
 	{
@@ -397,15 +418,21 @@ ReplicationSlotDrop(const char *name)
 
 	ReplicationSlotAcquire(name);
 
-	ReplicationSlotDropAcquired();
+	/*
+	 * Ensure we deactivate slot
+	 */
+	ReplicationSlotDropAcquired(true);
 }
 
 /*
  * Permanently drop the currently acquired replication slot which will be
  * released by the point this function returns.
+ *
+ * If deactivate is true, take ReplicationSlotControlLock and clear the
+ * slot's active_pid and in_use; if false, both are left for the caller.
  */
 static void
-ReplicationSlotDropAcquired(void)
+ReplicationSlotDropAcquired(bool deactivate)
 {
 	char		path[MAXPGPATH];
 	char		tmppath[MAXPGPATH];
@@ -423,6 +450,18 @@ ReplicationSlotDropAcquired(void)
 	 */
 	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
 
+	/* Record the drop of a failover slot in XLOG if we aren't replaying WAL */
+	if (slot->data.failover && XLogInsertAllowed())
+	{
+		xl_replslot_drop xlrec;
+
+		memcpy(&xlrec.name, &slot->data.name, sizeof(xlrec.name));
+
+		XLogBeginInsert();
+		XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
+		(void) XLogInsert(RM_REPLSLOT_ID, XLOG_REPLSLOT_DROP);
+	}
+
 	/* Generate pathnames. */
 	sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
 	sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
@@ -451,7 +490,11 @@ ReplicationSlotDropAcquired(void)
 	}
 	else
 	{
-		bool		fail_softly = slot->data.persistency == RS_EPHEMERAL;
+		bool		fail_softly = false;
+
+		if (RecoveryInProgress() ||
+			slot->data.persistency == RS_EPHEMERAL)
+			fail_softly = true;
 
 		SpinLockAcquire(&slot->mutex);
 		slot->active_pid = 0;
@@ -470,10 +513,13 @@ ReplicationSlotDropAcquired(void)
 	 * and nobody can be attached to this slot and thus access it without
 	 * scanning the array.
*/
-	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
-	slot->active_pid = 0;
-	slot->in_use = false;
-	LWLockRelease(ReplicationSlotControlLock);
+	if (deactivate)
+	{
+		LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
+		slot->active_pid = 0;
+		slot->in_use = false;
+		LWLockRelease(ReplicationSlotControlLock);
+	}
 
 	/*
 	 * Slot is dead and doesn't prevent resource removal anymore, recompute
@@ -511,7 +557,7 @@ ReplicationSlotSave(void)
 	Assert(MyReplicationSlot != NULL);
 
 	sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
-	SaveSlotToPath(MyReplicationSlot, path, ERROR);
+	SaveSlotToPath(MyReplicationSlot, path, ERROR, false, false);
 }
 
 /*
@@ -739,6 +785,59 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
 	return false;
 }
 
+/*
+ * ReplicationSlotsDropDBSlots -- drop all replication slots that belong to
+ * the database with OID dboid.
+ *
+ * Called by dbase_redo() when the removal of a database is replayed.  The
+ * caller must not have a slot acquired.
+ */
+void
+ReplicationSlotsDropDBSlots(Oid dboid)
+{
+	int			i;
+
+	Assert(MyReplicationSlot == NULL);
+
+restart:
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+		bool		was_active;
+
+		/* Array entries that are not in use hold no meaningful data. */
+		if (!s->in_use || s->data.database != dboid)
+			continue;
+
+		/*
+		 * There should be no connections to this dbid, therefore all slots
+		 * for this dbid should be logical and inactive.
+		 */
+		Assert(SlotIsLogical(s));
+
+		/* Acquire the replication slot */
+		SpinLockAcquire(&s->mutex);
+		was_active = (s->active_pid != 0);
+		if (!was_active)
+			s->active_pid = MyProcPid;
+		SpinLockRelease(&s->mutex);
+		Assert(!was_active);
+		MyReplicationSlot = s;
+
+		/*
+		 * ReplicationSlotDropAcquired() takes LWLocks itself, and clearing
+		 * in_use requires ReplicationSlotControlLock in exclusive mode.  So
+		 * release our lock, let it deactivate the slot, and rescan the
+		 * array from the start.
+		 */
+		LWLockRelease(ReplicationSlotControlLock);
+		ReplicationSlotDropAcquired(true);
+		MyReplicationSlot = NULL;
+		goto restart;
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+}
 
 /*
  * Check whether the server's configuration supports using replication
@@ -860,7 +945,7 @@ CheckPointReplicationSlots(void)
 
 		/* save the slot to disk, locking is handled in SaveSlotToPath() */
 		sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
-		SaveSlotToPath(s, path, LOG);
+		SaveSlotToPath(s, path, LOG, false, false);
 	}
 	LWLockRelease(ReplicationSlotAllocationLock);
 }
@@ -964,7 +1049,7 @@ CreateSlotOnDisk(ReplicationSlot *slot)
 
 	/* Write the actual state file. */
 	slot->dirty = true;			/* signal that we really need to write */
-	SaveSlotToPath(slot, tmppath, ERROR);
+	SaveSlotToPath(slot, tmppath, ERROR, true, false);
 
 	/* Rename the directory into place. */
 	if (rename(tmppath, path) != 0)
@@ -990,7 +1075,8 @@ CreateSlotOnDisk(ReplicationSlot *slot)
  * Shared functionality between saving and creating a replication slot.
  */
 static void
-SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
+SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel,
+			   bool create, bool redo)
 {
 	char		tmppath[MAXPGPATH];
 	char		path[MAXPGPATH];
@@ -998,15 +1084,18 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
 	ReplicationSlotOnDisk cp;
 	bool		was_dirty;
 
-	/* first check whether there's something to write out */
-	SpinLockAcquire(&slot->mutex);
-	was_dirty = slot->dirty;
-	slot->just_dirtied = false;
-	SpinLockRelease(&slot->mutex);
+	if (!redo)
+	{
+		/* first check whether there's something to write out */
+		SpinLockAcquire(&slot->mutex);
+		was_dirty = slot->dirty;
+		slot->just_dirtied = false;
+		SpinLockRelease(&slot->mutex);
 
-	/* and don't do anything if there's nothing to write */
-	if (!was_dirty)
-		return;
+		/* and don't do anything if there's nothing to write */
+		if (!was_dirty)
+			return;
+	}
 
 	LWLockAcquire(slot->io_in_progress_lock, LW_EXCLUSIVE);
 
@@ -1039,6 +1128,21 @@ SaveSlotToPath(ReplicationSlot *slot, const
char *dir, int elevel)
 
 	SpinLockRelease(&slot->mutex);
 
+	/*
+	 * If needed, record this action in WAL
+	 */
+	if (!redo &&
+		slot->data.failover &&
+		!RecoveryInProgress())
+	{
+		XLogBeginInsert();
+		XLogRegisterData((char *) (&cp.slotdata), sizeof(ReplicationSlotPersistentData));
+		if (create)
+			(void) XLogInsert(RM_REPLSLOT_ID, XLOG_REPLSLOT_CREATE);
+		else
+			(void) XLogInsert(RM_REPLSLOT_ID, XLOG_REPLSLOT_UPDATE);
+	}
+
 	COMP_CRC32C(cp.checksum,
 				(char *) (&cp) + SnapBuildOnDiskNotChecksummedSize,
 				SnapBuildOnDiskChecksummedSize);
@@ -1279,3 +1383,203 @@ RestoreSlotFromDisk(const char *name)
 				(errmsg("too many replication slots active before shutdown"),
 				 errhint("Increase max_replication_slots and try again.")));
 }
+
+/*
+ * Replay an XLOG_REPLSLOT_UPDATE record: copy the positions carried by the
+ * record into the in-use slot of the same name and write the slot out.
+ *
+ * A WARNING is emitted if no in-use slot has that name.
+ */
+static void
+ReplicationSlotRedoUpdate(ReplicationSlotInWAL xlrec)
+{
+	bool		found = false;
+	ReplicationSlot *slot;
+	int			i;
+
+	/*
+	 * Prevent any slot from being created/dropped while we're active. As we
+	 * explicitly do *not* want to block iterating over replication_slots or
+	 * acquiring a slot we cannot take the control lock - but that's OK,
+	 * because holding ReplicationSlotAllocationLock is strictly stronger, and
+	 * enough to guarantee that nobody can change the in_use bits on us.
+	 */
+	LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		slot = &ReplicationSlotCtl->replication_slots[i];
+
+		/* Only a slot that is in use and has the same name is the target. */
+		if (!slot->in_use ||
+			strcmp(NameStr(xlrec->name), NameStr(slot->data.name)) != 0)
+			continue;
+
+		SpinLockAcquire(&slot->mutex);
+
+		/* update the persistent data */
+		slot->data.xmin = xlrec->xmin;
+		slot->data.catalog_xmin = xlrec->catalog_xmin;
+		slot->data.restart_lsn = xlrec->restart_lsn;
+		slot->data.restart_tli = xlrec->restart_tli;
+		slot->data.confirmed_flush = xlrec->confirmed_flush;
+
+		/* update in memory state */
+		slot->effective_xmin = xlrec->xmin;
+		slot->effective_catalog_xmin = xlrec->catalog_xmin;
+
+		SpinLockRelease(&slot->mutex);
+
+		found = true;
+		break;
+	}
+
+	if (found)
+	{
+		char		path[MAXPGPATH];
+
+		sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
+		SaveSlotToPath(slot, path, WARNING, false, true);
+	}
+
+	LWLockRelease(ReplicationSlotAllocationLock);
+
+	if (!found)
+		ereport(WARNING,
+				(errmsg("WAL record cannot find failover slot")));
+}
+
+/*
+ * Replay an XLOG_REPLSLOT_CREATE record.
+ *
+ * An in-use slot with the same name is overwritten in place; otherwise the
+ * first free array entry is claimed, its on-disk state is created and the
+ * entry is marked in use.  A WARNING is emitted if no entry is free.
+ */
+static void
+ReplicationSlotRedoCreate(ReplicationSlotInWAL xlrec)
+{
+	ReplicationSlot *slot = NULL;
+	bool		found_duplicate = false;
+	int			i;
+
+	/*
+	 * An array entry may get allocated below, so the allocation lock has to
+	 * be held in exclusive mode.
+	 */
+	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+		if (!s->in_use)
+		{
+			/* Find first available slot, but keep on scanning... */
+			if (slot == NULL)
+				slot = s;
+			continue;
+		}
+
+		/* Check for any duplicates */
+		if (strcmp(NameStr(xlrec->name), NameStr(s->data.name)) == 0)
+		{
+			slot = s;
+			found_duplicate = true;
+			break;
+		}
+	}
+
+	if (slot == NULL)
+	{
+		LWLockRelease(ReplicationSlotAllocationLock);
+		ereport(WARNING,
+				(errmsg("all replication slots are in use"),
+				 errhint("Free one or increase max_replication_slots.")));
+		return;
+	}
+
+	SpinLockAcquire(&slot->mutex);
+
+	/* restore the entire set of persistent data */
+	memcpy(&slot->data, xlrec, sizeof(ReplicationSlotPersistentData));
+
+	/* initialize in memory state */
+	slot->effective_xmin = xlrec->xmin;
+	slot->effective_catalog_xmin = xlrec->catalog_xmin;
+	slot->candidate_catalog_xmin = InvalidTransactionId;
+	slot->candidate_xmin_lsn = InvalidXLogRecPtr;
+	slot->candidate_restart_lsn = InvalidXLogRecPtr;
+	slot->candidate_restart_valid = InvalidXLogRecPtr;
+
+	SpinLockRelease(&slot->mutex);
+
+	if (found_duplicate)
+	{
+		char		path[MAXPGPATH];
+
+		/* An in-use slot should have its directory; rewrite its state. */
+		sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
+		SaveSlotToPath(slot, path, WARNING, true, true);
+	}
+	else
+	{
+		/*
+		 * A new slot has no directory yet.  CreateSlotOnDisk() writes the
+		 * state under a temporary path and renames the directory into
+		 * place.  Failures there are reported at ERROR.
+		 */
+		CreateSlotOnDisk(slot);
+
+		/* Only now make the slot visible to everybody else. */
+		LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
+		slot->in_use = true;
+		LWLockRelease(ReplicationSlotControlLock);
+	}
+
+	LWLockRelease(ReplicationSlotAllocationLock);
+}
+
+/*
+ * Redo routine for replication slot WAL records.
+ */
+void
+replslot_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	switch (info)
+	{
+			/*
+			 * Update the values for an existing failover slot.
+			 */
+		case XLOG_REPLSLOT_UPDATE:
+			ReplicationSlotRedoUpdate((ReplicationSlotInWAL) XLogRecGetData(record));
+			break;
+
+			/*
+			 * Create a new failover slot. If there is already an existing
+			 * failover slot of that name, it is overwritten in place.
+			 */
+		case XLOG_REPLSLOT_CREATE:
+			ReplicationSlotRedoCreate((ReplicationSlotInWAL) XLogRecGetData(record));
+			break;
+
+			/*
+			 * Drop an existing failover slot.
+			 *
+			 * NOTE(review): ReplicationSlotDrop() acquires the slot first;
+			 * confirm the startup process may acquire failover slots.
+			 */
+		case XLOG_REPLSLOT_DROP:
+			{
+				xl_replslot_drop *xlrec =
+				(xl_replslot_drop *) XLogRecGetData(record);
+
+				ReplicationSlotDrop(NameStr(xlrec->name));
+
+				break;
+			}
+
+		default:
+			elog(PANIC, "replslot_redo: unknown op code %u", info);
+	}
+}
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index b3c8140..ee1b3e1 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -18,6 +18,7 @@
 
 #include "access/htup_details.h"
 #include "replication/slot.h"
+#include "replication/slot_xlog.h"
 #include "replication/logical.h"
 #include "replication/logicalfuncs.h"
 #include "utils/builtins.h"
diff --git a/src/bin/pg_xlogdump/rmgrdesc.c b/src/bin/pg_xlogdump/rmgrdesc.c
index 5b88a8d..07efbe7 100644
--- a/src/bin/pg_xlogdump/rmgrdesc.c
+++ b/src/bin/pg_xlogdump/rmgrdesc.c
@@ -26,6 +26,7 @@
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
 #include "replication/origin.h"
+#include "replication/slot_xlog.h"
 #include "rmgrdesc.h"
 #include "storage/standby.h"
 #include "utils/relmapper.h"
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index c083216..d944747 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -45,3 +45,4 @@ PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_start
 PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
 PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
 PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL)
+PG_RMGR(RM_REPLSLOT_ID, "ReplicationSlot", replslot_redo, replslot_desc, replslot_identify, NULL, NULL)
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index
20dd7a2..134ced2 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -4,6 +4,7 @@ * * Copyright (c) 2012-2015, PostgreSQL Global Development Group * + * src/include/replication/slot.h *------------------------------------------------------------------------- */ #ifndef SLOT_H @@ -11,69 +12,12 @@ #include "fmgr.h" #include "access/xlog.h" -#include "access/xlogreader.h" +#include "replication/slot_xlog.h" #include "storage/lwlock.h" #include "storage/shmem.h" #include "storage/spin.h" /* - * Behaviour of replication slots, upon release or crash. - * - * Slots marked as PERSISTENT are crashsafe and will not be dropped when - * released. Slots marked as EPHEMERAL will be dropped when released or after - * restarts. - * - * EPHEMERAL slots can be made PERSISTENT by calling ReplicationSlotPersist(). - */ -typedef enum ReplicationSlotPersistency -{ - RS_PERSISTENT, - RS_EPHEMERAL -} ReplicationSlotPersistency; - -/* - * On-Disk data of a replication slot, preserved across restarts. - */ -typedef struct ReplicationSlotPersistentData -{ - /* The slot's identifier */ - NameData name; - - /* database the slot is active on */ - Oid database; - - /* - * The slot's behaviour when being dropped (or restored after a crash). - */ - ReplicationSlotPersistency persistency; - - /* - * xmin horizon for data - * - * NB: This may represent a value that hasn't been written to disk yet; - * see notes for effective_xmin, below. - */ - TransactionId xmin; - - /* - * xmin horizon for catalog tuples - * - * NB: This may represent a value that hasn't been written to disk yet; - * see notes for effective_xmin, below. - */ - TransactionId catalog_xmin; - - /* oldest LSN that might be required by this replication slot */ - XLogRecPtr restart_lsn; - - /* oldest LSN that the client has acked receipt for */ - XLogRecPtr confirmed_flush; - - /* plugin name */ - NameData plugin; -} ReplicationSlotPersistentData; - -/* * Shared memory state of a single replication slot. 
*/
 typedef struct ReplicationSlot
@@ -171,6 +115,7 @@ extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
 extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
+extern void ReplicationSlotsDropDBSlots(Oid dboid);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
diff --git a/src/include/replication/slot_xlog.h b/src/include/replication/slot_xlog.h
new file mode 100644
index 0000000..aef0312
--- /dev/null
+++ b/src/include/replication/slot_xlog.h
@@ -0,0 +1,113 @@
+/*-------------------------------------------------------------------------
+ * slot_xlog.h
+ *	   Replication slot management.
+ *
+ * Copyright (c) 2012-2015, PostgreSQL Global Development Group
+ *
+ * src/include/replication/slot_xlog.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef SLOT_XLOG_H
+#define SLOT_XLOG_H
+
+#include "fmgr.h"
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+
+/*
+ * Behaviour of replication slots, upon release or crash.
+ *
+ * Slots marked as PERSISTENT are crashsafe and will not be dropped when
+ * released. Slots marked as EPHEMERAL will be dropped when released or after
+ * restarts.
+ *
+ * EPHEMERAL slots can be made PERSISTENT by calling ReplicationSlotPersist().
+ */
+typedef enum ReplicationSlotPersistency
+{
+	RS_PERSISTENT,
+	RS_EPHEMERAL
+} ReplicationSlotPersistency;
+
+/*
+ * On-Disk data of a replication slot, preserved across restarts.
+ *
+ * NOTE(review): this struct is what gets written to the slot's state file;
+ * adding fields changes that layout -- confirm the file version is bumped.
+ */
+typedef struct ReplicationSlotPersistentData
+{
+	/* The slot's identifier */
+	NameData	name;
+
+	/* database the slot is active on */
+	Oid			database;
+
+	/*
+	 * The slot's behaviour when being dropped (or restored after a crash).
+	 */
+	ReplicationSlotPersistency persistency;
+
+	/*
+	 * Slots created on master become failover-slots and are maintained
+	 * on all standbys, but are only assignable after failover.
+	 */
+	bool		failover;
+
+	/*
+	 * xmin horizon for data
+	 *
+	 * NB: This may represent a value that hasn't been written to disk yet;
+	 * see notes for effective_xmin in struct ReplicationSlot.
+	 */
+	TransactionId xmin;
+
+	/*
+	 * xmin horizon for catalog tuples
+	 *
+	 * NB: This may represent a value that hasn't been written to disk yet;
+	 * see notes for effective_xmin in struct ReplicationSlot.
+	 */
+	TransactionId catalog_xmin;
+
+	/* oldest LSN that might be required by this replication slot */
+	XLogRecPtr	restart_lsn;
+
+	/* NOTE(review): presumably the timeline restart_lsn is on -- confirm */
+	TimeLineID	restart_tli;
+
+	/* oldest LSN that the client has acked receipt for */
+	XLogRecPtr	confirmed_flush;
+
+	/* plugin name */
+	NameData	plugin;
+} ReplicationSlotPersistentData;
+
+/* Pointer to the slot data carried by UPDATE and CREATE WAL records */
+typedef ReplicationSlotPersistentData *ReplicationSlotInWAL;
+
+/*
+ * WAL records for failover slots
+ *
+ * The low four bits of a record's info byte (XLR_INFO_MASK) are not
+ * available to resource managers; replslot_redo() and replslot_desc() mask
+ * them off before looking at the op code, so the op codes must live in the
+ * high four bits.
+ */
+#define XLOG_REPLSLOT_UPDATE	0x00
+#define XLOG_REPLSLOT_DROP		0x10
+#define XLOG_REPLSLOT_CREATE	0x20
+
+/* Payload of an XLOG_REPLSLOT_DROP record: the name of the dropped slot */
+typedef struct xl_replslot_drop
+{
+	NameData	name;
+} xl_replslot_drop;
+
+/* WAL logging */
+extern void replslot_redo(XLogReaderState *record);
+extern void replslot_desc(StringInfo buf, XLogReaderState *record);
+extern const char *replslot_identify(uint8 info);
+
+#endif   /* SLOT_XLOG_H */