diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 0d8624a..6cf0a3f 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -2790,30 +2790,28 @@ include_dir 'conf.d' - Specifies a comma-separated list of standby names that can support + Specifies a comma-separated list of standby names, along with their + respective count, separated by a hyphen, that support synchronous replication, as described in . - At any one time there will be at most one active synchronous standby; - transactions waiting for commit will be allowed to proceed after - this standby server confirms receipt of their data. - The synchronous standby will be the first standby named in this list - that is both currently connected and streaming data in real-time - (as shown by a state of streaming in the + Transactions waiting for commit will be allowed to proceed after the + specified number of standby servers confirm receipt of their data. + The synchronous standbys will be those named in this list that are both + currently connected and streaming data in real-time (as shown by a state + of streaming in the pg_stat_replication view). - Other standby servers appearing later in this list represent potential - synchronous standbys. If the current synchronous standby disconnects for whatever reason, - it will be replaced immediately with the next-highest-priority standby. - Specifying more than one standby name can allow very high availability. + it will be replaced immediately with another standby of the same name. The name of a standby server for this purpose is the application_name setting of the standby, as set in the primary_conninfo of the standby's WAL receiver. There is - no mechanism to enforce uniqueness. In case of duplicates one of the - matching standbys will be chosen to be the synchronous standby, though - exactly which one is indeterminate. + no mechanism to enforce uniqueness. 
For each specified standby_name, + only the specified count of standbys will be chosen to be synchronous + standbys, though exactly which ones are indeterminate; the rest will + represent potential synchronous standbys. The special entry * matches any application_name, including the default application name of walreceiver. diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml index a17f555..8cac7a8 100644 --- a/doc/src/sgml/high-availability.sgml +++ b/doc/src/sgml/high-availability.sgml @@ -1081,7 +1081,7 @@ primary_slot_name = 'node_a_slot' WAL record is then sent to the standby. The standby sends reply messages each time a new batch of WAL data is written to disk, unless wal_receiver_status_interval is set to zero on the standby. - If the standby is the first matching standby, as specified in + If the standby is a matching standby, as specified in synchronous_standby_names on the primary, the reply messages from that standby will be used to wake users waiting for confirmation that the commit record has been received. These parameters @@ -1167,11 +1167,10 @@ primary_slot_name = 'node_a_slot' The best solution for avoiding data loss is to ensure you don't lose - your last remaining synchronous standby. This can be achieved by naming multiple - potential synchronous standbys using synchronous_standby_names. - The first named standby will be used as the synchronous standby. Standbys - listed after this will take over the role of synchronous standby if the - first one should fail. + your last remaining synchronous standbys. This can be achieved by naming multiple + synchronous standbys using synchronous_standby_names. + The specified number of named standbys will be used as the synchronous standbys; + all the additional standbys are potential synchronous candidates. 
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 325239d..b34db25 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -5,7 +5,7 @@ * Synchronous replication is new as of PostgreSQL 9.1. * * If requested, transaction commits wait until their commit LSN is - * acknowledged by the synchronous standby. + * acknowledged by the synchronous standbys. * * This module contains the code for waiting and release of backends. * All code in this module executes on the primary. The core streaming @@ -29,11 +29,13 @@ * single ordered queue of waiting backends, so that we can avoid * searching the through all waiters each time we receive a reply. * - * In 9.1 we support only a single synchronous standby, chosen from a - * priority list of synchronous_standby_names. Before it can become the - * synchronous standby it must have caught up with the primary; that may - * take some time. Once caught up, the current highest priority standby - * will release waiters from the queue. + * In 9.5 we support multiple synchronous standbys, + * as defined in synchronous_standby_names. Before one standby can + * become a synchronous standby it must have caught up with the primary; + * that may take some time. + * + * Waiters will be released from the queue once the number of standbys + * specified in synchronous_standby_names have caught up. * * Portions Copyright (c) 2010-2015, PostgreSQL Global Development Group * @@ -60,6 +62,11 @@ /* User-settable parameters for sync rep */ char *SyncRepStandbyNames; +/* + * Synchronous standbys are defined when synchronous_standby_names + * is a non-NULL, non-empty string. The entries themselves are + * validated by check_synchronous_standby_names().
+ */ #define SyncStandbysDefined() \ (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0') @@ -197,7 +204,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) ereport(WARNING, (errcode(ERRCODE_ADMIN_SHUTDOWN), errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"), - errdetail("The transaction has already committed locally, but might not have been replicated to the standby."))); + errdetail("The transaction has already committed locally, but might not have been replicated to the standby(s)."))); whereToSendOutput = DestNone; SyncRepCancelWait(); break; @@ -214,7 +221,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) QueryCancelPending = false; ereport(WARNING, (errmsg("canceling wait for synchronous replication due to user request"), - errdetail("The transaction has already committed locally, but might not have been replicated to the standby."))); + errdetail("The transaction has already committed locally, but might not have been replicated to the standby(s)."))); SyncRepCancelWait(); break; } @@ -348,18 +355,78 @@ SyncRepInitConfig(void) } } + /* - * Find the WAL sender servicing the synchronous standby with the lowest - * priority value, or NULL if no synchronous standby is connected. If there - * are multiple standbys with the same lowest priority value, the first one - * found is selected. The caller must hold SyncRepLock. + * Obtain a palloc'd array containing positions of standbys currently + * considered as synchronous. Caller is responsible for freeing the + * data obtained and should as well take a necessary lock on SyncRepLock. 
+ */ -WalSnd * -SyncRepGetSynchronousStandby(void) +int * +SyncRepGetSynchronousStandby(int *num_sync, int *total_sync) { - WalSnd *result = NULL; - int result_priority = 0; - int i; + int *sync_nodes, + *group_count, + i = 0; + List *elemlist1; + ListCell *l; + char *rawstring; + + /* Initialize */ + *num_sync = 0; + *total_sync = 0; + + /* Leave if no synchronous nodes allowed */ + if (!SyncStandbysDefined()) + return NULL; + + /* Need a modifiable copy of string */ + rawstring = pstrdup(SyncRepStandbyNames); + + /* Parse string into list of identifiers */ + if (!SplitIdentifierString(rawstring, ',', &elemlist1)) + { + /* syntax error in list */ + pfree(rawstring); + list_free(elemlist1); + /* GUC machinery will have already complained - no need to do again */ + return 0; + } + + /* Allocate space for the count of each group */ + group_count = (int *) palloc(elemlist1->length * sizeof(int)); + + foreach(l, elemlist1) + { + char *standby_detail = (char *) lfirst(l); + List *elemlist2; + int value; + + if (!SplitIdentifierString(standby_detail, '-', &elemlist2)) + { + /* syntax error in list */ + pfree(rawstring); + list_free(elemlist1); + return 0; + } + + if (!parse_int(lsecond(elemlist2), &value, 0, NULL)) + { + /* syntax error in list */ + pfree(rawstring); + list_free(elemlist1); + return 0; + } + + /* Determine the number of nodes that can be synchronized. */ + *total_sync += value; + group_count[i++] = value; + } + + /* + * Make enough room for total_sync entries; the scan through WAL senders + * below stops once that many synchronous nodes have been found.
+ */ + sync_nodes = (int *) palloc((*total_sync) * sizeof(int)); for (i = 0; i < max_wal_senders; i++) { @@ -380,31 +447,31 @@ SyncRepGetSynchronousStandby(void) if (this_priority == 0) continue; - /* Must have a lower priority value than any previous ones */ - if (result != NULL && result_priority <= this_priority) - continue; + if (*num_sync == *total_sync) + break; /* Must have a valid flush position */ if (XLogRecPtrIsInvalid(walsnd->flush)) continue; - result = (WalSnd *) walsnd; - result_priority = this_priority; - /* - * If priority is equal to 1, there cannot be any other WAL senders - * with a lower priority, so we're done. + * We have a potential synchronous candidate, add it to the list of + * nodes already present */ - if (this_priority == 1) - return result; + if (group_count[this_priority - 1] != 0) + { + sync_nodes[*num_sync] = i; + (*num_sync)++; + group_count[this_priority - 1]--; + } } - return result; + return sync_nodes; } /* * Update the LSNs on each queue based upon our latest state. This - * implements a simple policy of first-valid-standby-releases-waiter. + * implements a simple policy of first-valid-standbys-release-waiter. * * Other policies are possible, which would change what we do here and what * perhaps also which information we store as well. @@ -413,9 +480,15 @@ void SyncRepReleaseWaiters(void) { volatile WalSndCtlData *walsndctl = WalSndCtl; - WalSnd *syncWalSnd; + int *sync_standbys; int numwrite = 0; int numflush = 0; + int num_sync = 0; + int total_sync = 0; + int i; + bool found = false; + XLogRecPtr min_write_pos; + XLogRecPtr min_flush_pos; /* * If this WALSender is serving a standby that is not on the list of @@ -430,48 +503,97 @@ SyncRepReleaseWaiters(void) /* * We're a potential sync standby. Release waiters if we are the highest - * priority standby. + * priority standby. If there are multiple standbys with same priorities + * then we use the first mentioned standbys. 
*/ LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); - syncWalSnd = SyncRepGetSynchronousStandby(); + sync_standbys = SyncRepGetSynchronousStandby(&num_sync, &total_sync); /* We should have found ourselves at least */ - Assert(syncWalSnd != NULL); + Assert(num_sync > 0); /* - * If we aren't managing the highest priority standby then just leave. + * If we aren't managing one of the standbys with highest priority then + * just leave. */ - if (syncWalSnd != MyWalSnd) + for (i = 0; i < num_sync; i++) + { + volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[i]]; + + if (walsndloc == MyWalSnd) + { + found = true; + break; + } + } + + /* + * We are definitely not one of the chosen standbys, but we could become + * one at the next takeover. + */ + if (!found) { LWLockRelease(SyncRepLock); + pfree(sync_standbys); announce_next_takeover = true; return; } /* + * Even if we are one of the chosen standbys, leave if fewer synchronous + * standbys are currently available than the number requested by the + * user. + */ + if (num_sync < total_sync) + { + LWLockRelease(SyncRepLock); + pfree(sync_standbys); + return; + } + + /* * Set the lsn first so that when we wake backends they will release up to - * this location. + * this location, of course only if all the standbys found as synchronous + * have already reached that point, so first find what are the oldest + * write and flush positions of all the standbys considered in sync... */ - if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < MyWalSnd->write) + min_write_pos = MyWalSnd->write; + min_flush_pos = MyWalSnd->flush; + for (i = 0; i < num_sync; i++) { - walsndctl->lsn[SYNC_REP_WAIT_WRITE] = MyWalSnd->write; + volatile WalSnd *walsndloc = &WalSndCtl->walsnds[sync_standbys[i]]; + + SpinLockAcquire(&walsndloc->mutex); + if (min_write_pos > walsndloc->write) + min_write_pos = walsndloc->write; + if (min_flush_pos > walsndloc->flush) + min_flush_pos = walsndloc->flush; + SpinLockRelease(&walsndloc->mutex); + } + + /* ...
And now update if necessary */ + if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < min_write_pos) + { + walsndctl->lsn[SYNC_REP_WAIT_WRITE] = min_write_pos; numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE); } - if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < MyWalSnd->flush) + if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < min_flush_pos) { - walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = MyWalSnd->flush; + walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = min_flush_pos; numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); } LWLockRelease(SyncRepLock); elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X", - numwrite, (uint32) (MyWalSnd->write >> 32), (uint32) MyWalSnd->write, - numflush, (uint32) (MyWalSnd->flush >> 32), (uint32) MyWalSnd->flush); + numwrite, (uint32) (walsndctl->lsn[SYNC_REP_WAIT_WRITE] >> 32), + (uint32) walsndctl->lsn[SYNC_REP_WAIT_WRITE], + numflush, (uint32) (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] >> 32), + (uint32) walsndctl->lsn[SYNC_REP_WAIT_FLUSH]); /* * If we are managing the highest priority standby, though we weren't - * prior to this, then announce we are now the sync standby. + * prior to this, then announce we are now a sync standby. 
*/ if (announce_next_takeover) { @@ -480,6 +602,9 @@ SyncRepReleaseWaiters(void) (errmsg("standby \"%s\" is now the synchronous standby with priority %u", application_name, MyWalSnd->sync_standby_priority))); } + + /* Clean up */ + pfree(sync_standbys); } /* @@ -494,7 +619,7 @@ static int SyncRepGetStandbyPriority(void) { char *rawstring; - List *elemlist; + List *elemlist1; ListCell *l; int priority = 0; bool found = false; @@ -506,24 +631,39 @@ SyncRepGetStandbyPriority(void) if (am_cascading_walsender) return 0; + /* If no synchronous nodes allowed, no cake for this WAL sender */ + if (!SyncStandbysDefined()) + return 0; + /* Need a modifiable copy of string */ rawstring = pstrdup(SyncRepStandbyNames); /* Parse string into list of identifiers */ - if (!SplitIdentifierString(rawstring, ',', &elemlist)) + if (!SplitIdentifierString(rawstring, ',', &elemlist1)) { /* syntax error in list */ pfree(rawstring); - list_free(elemlist); + list_free(elemlist1); /* GUC machinery will have already complained - no need to do again */ return 0; } - foreach(l, elemlist) + foreach(l, elemlist1) { - char *standby_name = (char *) lfirst(l); + char *standby_detail = (char *) lfirst(l); + char *standby_name; + List *elemlist2; + + if (!SplitIdentifierString(standby_detail, '-', &elemlist2)) + { + /* syntax error in list */ + pfree(rawstring); + list_free(elemlist1); + return 0; + } priority++; + standby_name = (char *) linitial(elemlist2); if (pg_strcasecmp(standby_name, application_name) == 0 || pg_strcasecmp(standby_name, "*") == 0) @@ -531,10 +671,12 @@ SyncRepGetStandbyPriority(void) found = true; break; } + + list_free(elemlist2); } pfree(rawstring); - list_free(elemlist); + list_free(elemlist1); return (found ? 
priority : 0); } @@ -688,21 +830,76 @@ bool check_synchronous_standby_names(char **newval, void **extra, GucSource source) { char *rawstring; - List *elemlist; + char *standby_name; + List *elemlist1, + *elemlist2 = NIL, + *standby_list = NIL; + ListCell *l; + bool ret_value = true; - /* Need a modifiable copy of string */ rawstring = pstrdup(*newval); - /* Parse string into list of identifiers */ - if (!SplitIdentifierString(rawstring, ',', &elemlist)) + if (!SplitIdentifierString(rawstring, ',', &elemlist1)) { /* syntax error in list */ GUC_check_errdetail("List syntax is invalid."); pfree(rawstring); - list_free(elemlist); + list_free(elemlist1); return false; } + foreach(l, elemlist1) + { + char *standby_detail = (char *) lfirst(l); + int value; + + if (!(SplitIdentifierString(standby_detail, '-', &elemlist2)) || + elemlist2->length != 2) + { + /* syntax error in list */ + GUC_check_errdetail("List syntax is invalid."); + ret_value = false; + break; + } + + standby_name = (char *) linitial(elemlist2); + + if (strcmp(standby_name, "*") == 0 && elemlist1->length > 1) + { + /* syntax error in list */ + GUC_check_errdetail("Entry for '*' cannot be combined with others"); + ret_value = false; + break; + } + + if (standby_list != NIL) + { + ListCell *s; + bool found = false; + + foreach(s, standby_list) + if (strcmp(lfirst(s), standby_name) == 0) + found = true; + + if (found) + { + GUC_check_errdetail("Standby names are repeated"); + ret_value = false; + break; + } + } + + standby_list = lappend(standby_list, standby_name); + + if (!(parse_int(lsecond(elemlist2), &value, 0, NULL)) || + value <= 0) + { + GUC_check_errdetail("Invalid integer value."); + ret_value = false; + break; + } + } + /* * Any additional validation of standby names should go here. 
* @@ -712,9 +909,10 @@ check_synchronous_standby_names(char **newval, void **extra, GucSource source) */ pfree(rawstring); - list_free(elemlist); - - return true; + list_free(elemlist1); + list_free(elemlist2); + list_free(standby_list); + return ret_value; } void diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 4a20569..e57178c 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2731,8 +2731,8 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) Tuplestorestate *tupstore; MemoryContext per_query_ctx; MemoryContext oldcontext; - WalSnd *sync_standby; - int i; + int *sync_standbys; + int i, num_sync = 0, total_sync = 0; /* check to see if caller supports us returning a tuplestore */ if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) @@ -2760,10 +2760,10 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) MemoryContextSwitchTo(oldcontext); /* - * Get the currently active synchronous standby. + * Get the list of synchronous standbys. 
*/ LWLockAcquire(SyncRepLock, LW_SHARED); - sync_standby = SyncRepGetSynchronousStandby(); + sync_standbys = SyncRepGetSynchronousStandby(&num_sync, &total_sync); LWLockRelease(SyncRepLock); for (i = 0; i < max_wal_senders; i++) @@ -2834,16 +2834,32 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) */ if (priority == 0) values[7] = CStringGetTextDatum("async"); - else if (walsnd == sync_standby) - values[7] = CStringGetTextDatum("sync"); else - values[7] = CStringGetTextDatum("potential"); + { + int j; + bool found = false; + + for (j = 0; j < num_sync; j++) + { + /* Found that this node is one in sync */ + if (i == sync_standbys[j]) + { + values[7] = CStringGetTextDatum("sync"); + found = true; + break; + } + } + if (!found) + values[7] = CStringGetTextDatum("potential"); + } } tuplestore_putvalues(tupstore, tupdesc, values, nulls); } /* clean up and return the tuplestore */ + if (sync_standbys) + pfree(sync_standbys); tuplestore_donestoring(tupstore); return (Datum) 0; diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 110983f..e5e32d1 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -239,7 +239,8 @@ # These settings are ignored on a standby server. 
#synchronous_standby_names = '' # standby servers that provide sync rep - # comma-separated list of application_name + # comma-separated list of application_name and + # their count, separated by a hyphen # from standby(s); '*' = all #vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index 71e2857..3ffec7b 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -49,7 +49,7 @@ extern void SyncRepUpdateSyncStandbysDefined(void); /* forward declaration to avoid pulling in walsender_private.h */ struct WalSnd; -extern struct WalSnd *SyncRepGetSynchronousStandby(void); +extern int *SyncRepGetSynchronousStandby(int *, int *); extern bool check_synchronous_standby_names(char **newval, void **extra, GucSource source); extern void assign_synchronous_commit(int newval, void *extra);