diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 8e3b5df7dc..194a2f9998 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6076,6 +6076,23 @@ void SetRecoveryPause(bool recoveryPause) { SpinLockAcquire(&XLogCtl->info_lck); + + /* + * Wait for the application of the record being applied to finish, so that + * no records will be applied after this function returns. We don't need to + * wait when ending a pause. Anyway we are requesting a recovery pause, we + * don't mind a possible slow down of recovery by the info_lck here. + * We don't need to wait in the startup process. + */ + while(InRecovery && + recoveryPause && !XLogCtl->recoveryPause && + XLogCtl->replayEndRecPtr != XLogCtl->lastReplayedEndRecPtr) + { + SpinLockRelease(&XLogCtl->info_lck); + CHECK_FOR_INTERRUPTS(); + pg_usleep(10000L); /* 10 ms */ + SpinLockAcquire(&XLogCtl->info_lck); + } XLogCtl->recoveryPause = recoveryPause; SpinLockRelease(&XLogCtl->info_lck); } @@ -7262,6 +7279,7 @@ StartupXLOG(void) do { bool switchedTLI = false; + bool pause_requested = false; #ifdef WAL_DEBUG if (XLOG_DEBUG || @@ -7292,11 +7310,9 @@ StartupXLOG(void) * Note that we intentionally don't take the info_lck spinlock * here. We might therefore read a slightly stale value of * the recoveryPause flag, but it can't be very stale (no - * worse than the last spinlock we did acquire). Since a - * pause request is a pretty asynchronous thing anyway, - * possibly responding to it one WAL record later than we - * otherwise would is a minor issue, so it doesn't seem worth - * adding another spinlock cycle to prevent that. + * worse than the last spinlock we did acquire). We eventually + * make sure catching the pause request if any just before + * applying this record. */ if (((volatile XLogCtlData *) XLogCtl)->recoveryPause) recoveryPausesHere(false); @@ -7385,12 +7401,19 @@ StartupXLOG(void) /* * Update shared replayEndRecPtr before replaying this record, * so that XLogFlush will update minRecoveryPoint correctly. + * Also we check for the correct value of the recoveryPause + * flag here not to have redo overrun during a pause. See + * SetRecoveryPuase() for details. */ SpinLockAcquire(&XLogCtl->info_lck); XLogCtl->replayEndRecPtr = EndRecPtr; XLogCtl->replayEndTLI = ThisTimeLineID; + pause_requested = XLogCtl->recoveryPause; SpinLockRelease(&XLogCtl->info_lck); + if (pause_requested) + recoveryPausesHere(false); + /* * If we are attempting to enter Hot Standby mode, process * XIDs we see