Logical replication prefetch
There is a well-known Postgres problem: a logical replication subscriber
cannot catch up with the publisher, simply because LR changes are applied
by a single worker while at the publisher changes are made by multiple
concurrent backends. The problem is not specific to logical replication:
the physical replication stream is also replayed by a single process
(the startup process). But for physical replication Postgres now
implements prefetch: by looking at the blocks referenced by a WAL record
it is quite easy to predict which pages will be required for redo and
prefetch them. With logical replication the situation is much more
complicated.
My first idea was to implement parallel apply of transactions. But to do
that we need to track dependencies between transactions. Right now
Postgres can apply transactions in parallel, but only if they are
streamed (which is done only for large transactions), and it serializes
them by commits. It is possible to force parallel apply of short
transactions using `debug_logical_replication_streaming`, but then
performance is about 2x slower than sequential apply by a single worker.
By removing serialization by commits, it is possible to speed up apply
3x and make the subscriber apply changes faster than the publisher can
produce them, even with multiple clients. But this is possible only if
transactions are independent, and that can be enforced only by tracking
dependencies, which seems to be very non-trivial and invasive.
I have not completely given up on the dependency-tracking approach, but
decided to first try a simpler solution: prefetching. It is already used
for physical replication. Certainly, in the case of physical replication
it is much simpler, because each WAL record contains the list of
accessed blocks.
In the case of logical replication, prefetching can be done either by
prefetching access to the replica identity index (usually the primary
key), or by executing the replication command in some background worker.
Certainly the first case is much easier. We just perform an index lookup
in a prefetch worker and it loads the accessed index and heap pages into
shared buffers, so the main apply worker does not need to read anything
from disk. But it works well only for DELETE and HOT UPDATE operations.
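A toy model of this first variant, only to illustrate why a pure lookup
helps: the prefetch worker touches the same pages the apply worker will
need, so the apply worker later finds them cached. Everything here
(`ToyCache`, `toy_page_for_key`, the 1024-page table) is hypothetical
and not part of the patch; a real lookup goes through the replica
identity index and shared buffers.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define TOY_NPAGES 1024			/* hypothetical table size in pages */

/* Toy stand-in for shared buffers: true means the page is cached. */
typedef struct ToyCache
{
	bool		cached[TOY_NPAGES];
	size_t		disk_reads;		/* number of simulated disk reads */
} ToyCache;

/* Hypothetical key-to-page mapping; a real lookup walks the index. */
size_t
toy_page_for_key(int pk)
{
	assert(pk >= 0);
	return (size_t) pk % TOY_NPAGES;
}

/* Read the page holding pk, counting a disk read on a cache miss. */
void
toy_read_page(ToyCache *cache, int pk)
{
	size_t		page = toy_page_for_key(pk);

	if (!cache->cached[page])
	{
		cache->cached[page] = true;
		cache->disk_reads++;
	}
}

/* Prefetch worker: performs only the lookups and modifies no data. */
void
toy_prefetch(ToyCache *cache, const int *keys, size_t nkeys)
{
	for (size_t i = 0; i < nkeys; i++)
		toy_read_page(cache, keys[i]);
}

/* Apply worker: returns how many disk reads it had to do itself. */
size_t
toy_apply(ToyCache *cache, const int *keys, size_t nkeys)
{
	size_t		before = cache->disk_reads;

	for (size_t i = 0; i < nkeys; i++)
		toy_read_page(cache, keys[i]);
	return cache->disk_reads - before;
}
```

Calling `toy_apply` on a zero-initialized cache pays one read per
distinct page, while calling `toy_prefetch` first leaves no reads for
the apply step. The model ignores eviction and timing, so it says
nothing about how far ahead the prefetch worker has to run.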
In the second case we execute the LR command normally in a background
worker and then abort the transaction. Certainly in this case we are
doing the same work twice. But the assumption is the same: parallel
prefetch workers should load the affected pages, speeding up the work of
the main apply worker.
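In the attached patch (`lr_do_prefetch`), INSERT, UPDATE and DELETE
messages are handed to the prefetch workers round-robin, RELATION and
TYPE messages are broadcast to all of them (presumably so that every
worker can decode later changes), and other messages are ignored. A
simplified standalone sketch of that distribution rule follows; the
`Toy*` names are illustrative, and the per-worker counters stand in for
the shared-memory queues the patch actually uses.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_MAX_WORKERS 8		/* hypothetical upper bound */

/* Message kinds; illustrative names, not the PostgreSQL enum. */
typedef enum ToyMsgKind
{
	TOY_MSG_INSERT,
	TOY_MSG_UPDATE,
	TOY_MSG_DELETE,
	TOY_MSG_RELATION,
	TOY_MSG_TYPE,
	TOY_MSG_OTHER				/* BEGIN, COMMIT, ... */
} ToyMsgKind;

typedef struct ToyDispatcher
{
	size_t		nworkers;		/* number of prefetch workers */
	size_t		next;			/* round-robin cursor */
	size_t		received[TOY_MAX_WORKERS];	/* messages per worker */
} ToyDispatcher;

/* Route one message according to its kind. */
void
toy_dispatch(ToyDispatcher *d, ToyMsgKind kind)
{
	size_t		target;

	assert(d->nworkers > 0 && d->nworkers <= TOY_MAX_WORKERS);

	switch (kind)
	{
		case TOY_MSG_INSERT:
		case TOY_MSG_UPDATE:
		case TOY_MSG_DELETE:
			/* data changes: exactly one worker, chosen round-robin */
			target = d->next++ % d->nworkers;
			d->received[target]++;
			break;

		case TOY_MSG_RELATION:
		case TOY_MSG_TYPE:
			/* metadata: every worker gets a copy */
			for (size_t i = 0; i < d->nworkers; i++)
				d->received[i]++;
			break;

		default:
			/* other messages are not forwarded */
			break;
	}
}
```

Round-robin keeps the workers evenly loaded but does not keep changes
of one transaction on one worker, which is acceptable here only because
prefetch workers never commit anything.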
I have implemented a PoC (see the attached patch) and got first results
on the efficiency of such prefetching.
*** First scenario (update-only).
Publisher:
```
create table t(pk integer primary key, counter integer, filler text
default repeat('x', 1000)) with (fillfactor=10);
insert into t values (generate_series(1,100000), 0);
create publication pub1 for table t;
```
Subscriber:
```
create table t(pk integer primary key, counter integer, filler text
default repeat('x', 1000)) with (fillfactor=10);
create subscription sub1 connection 'port=54321 dbname=postgres'
publication pub1;
```
Then I wait until replication is synced, stop the subscriber and run
random point updates in 10 sessions at the publisher:
```
pgbench -T 100 -c 10 -M prepared -n -f update.sql -p 54321 -d postgres
```
where update.sql is:
```
\set pk random(1, 100000)
update t set counter=counter+1 where pk=:pk;
```
Then I start the subscriber and measure how much time it needs to
catch up.
Results:
no prefetch: 2:00 min
prefetch (replica identity only): 0:55 min
prefetch (all): 1:10 min
This is definitely the best case for replica-identity-index-only
prefetch (update-only and no other indexes).
How to interpret these results?
Without prefetch, applying the updates at the subscriber takes longer
(2:00) than the 100 seconds spent performing them at the publisher.
It means that under a heavy workload the subscriber has no chance to
catch up.
With prefetching of the replica identity index, apply time is even
smaller than the time needed to perform the updates at the publisher.
Performing the whole operation and aborting the transaction certainly
adds more overhead, but the improvement is still quite significant.
Please also note that these results were obtained on a system with a
large amount of RAM (64GB) and a fast SSD.
With a data set that does not fit in RAM and much slower disks, the
difference is expected to be more significant.
I have tried to simulate this by adding an artificial 0.1 msec delay to
`pg_preadv`, and got the following results:
no prefetch: 7:40 min
prefetch (replica identity only): 3:10 min
prefetch (all): 3:09 min
In this case apply takes much more time than the 100 seconds during
which the updates are performed at the publisher. Prefetch improves
speed about two times, but it doesn't allow the subscriber to catch up.
*** Second scenario: inserts with secondary random key.
Publisher:
```
create table t(pk serial primary key, sk integer, counter integer default 0);
insert into t (sk) select random()*10000000 from generate_series(1,10000000);
create index on t(sk);
create publication pub1 for table t;
```
Subscriber:
```
create table t(pk integer primary key, sk integer, counter integer);
create index on t(sk);
create subscription sub1 connection 'port=54321 dbname=postgres'
publication pub1;
```
Workload:
```
pgbench -T 100 -c 10 -M prepared -n -f insert.sql -p 54321 -d postgres
```
where insert.sql is:
```
INSERT INTO t (sk) VALUES (random()*10000000);
```
Results (with 0.1 msec delay) are the following:
no prefetch: 10:10 min
prefetch (identity): 8:25 min
prefetch (full): 5:50 min
Here, as expected, prefetching only the primary key doesn't provide a
big improvement. But replaying the insert command in a prefetch worker
speeds up apply almost twice.
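For reference, the speedups implied by the timings reported above can
be computed directly: 2:00 to 0:55 is about 2.2x, 7:40 to 3:10 is about
2.4x, and 10:10 to 5:50 is about 1.7x. A small sketch of that
arithmetic, where the helper names are made up for this illustration and
the 100-second figure is the `pgbench -T 100` run time:

```c
#include <assert.h>
#include <stdbool.h>

/* Convert a "min:sec" timing from the result lists into seconds. */
int
to_seconds(int minutes, int seconds)
{
	assert(minutes >= 0 && seconds >= 0 && seconds < 60);
	return minutes * 60 + seconds;
}

/* Speedup of a prefetch run relative to the run without prefetch. */
double
speedup(int baseline_sec, int prefetch_sec)
{
	assert(prefetch_sec > 0);
	return (double) baseline_sec / (double) prefetch_sec;
}

/* True if apply finishes within the time the publisher workload ran. */
bool
keeps_up(int apply_sec, int workload_sec)
{
	return apply_sec <= workload_sec;
}
```

With these helpers, `keeps_up(to_seconds(0, 55), 100)` holds for the
update-only run without the artificial delay, while
`keeps_up(to_seconds(3, 10), 100)` does not for the delayed run, which
matches the observation that prefetch helps there but is not enough for
the subscriber to catch up.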
Please note that this approach requires minimal changes in Postgres,
because all the infrastructure of parallel apply workers is already
present and we can reuse the same apply code (with minimal changes) for
performing prefetch. I only had to introduce extra tuple lock types
(no-lock and try-lock) to minimize overhead and lock conflicts between
the prefetch and main apply workers. Still, this cannot completely
prevent lock conflicts and deadlocks in prefetch workers; it looks like
more work is needed here. I also set `wal_level=minimal` in prefetch
workers to avoid WAL-logging overhead.
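The way the two extra lock types are used can be summarized in a
standalone sketch. It mirrors the logic in the attached patch (the
`lockmode` choice in `FindReplTupleInLocalRel` and the handling of a
concurrent writer in `RelationFindReplTupleByIndex`), but the `Toy*`
names are illustrative only; the patch itself adds `LockTupleNoLock`
and `LockTupleTryExclusive` to `LockTupleMode`.

```c
#include <assert.h>
#include <stdbool.h>

/* Illustrative lock modes; the last two correspond to the new ones. */
typedef enum ToyLockMode
{
	TOY_LOCK_EXCLUSIVE,			/* normal apply: wait for the lock */
	TOY_LOCK_NONE,				/* prefetch, identity only: never lock */
	TOY_LOCK_TRY_EXCLUSIVE		/* prefetch, full replay: give up on conflict */
} ToyLockMode;

/* Reaction to a tuple that another transaction is modifying. */
typedef enum ToyAction
{
	TOY_PROCEED,				/* use the tuple as found */
	TOY_WAIT_AND_RETRY,			/* wait for the other xact, then rescan */
	TOY_GIVE_UP					/* treat the tuple as not found */
} ToyAction;

/* Pick the lock mode for the replica identity lookup. */
ToyLockMode
toy_choose_lock_mode(bool is_prefetching, bool replica_identity_only)
{
	if (!is_prefetching)
		return TOY_LOCK_EXCLUSIVE;
	return replica_identity_only ? TOY_LOCK_NONE : TOY_LOCK_TRY_EXCLUSIVE;
}

/* Decide what to do when the found tuple has a concurrent writer. */
ToyAction
toy_on_concurrent_writer(ToyLockMode mode)
{
	switch (mode)
	{
		case TOY_LOCK_TRY_EXCLUSIVE:
			return TOY_GIVE_UP;
		case TOY_LOCK_NONE:
			return TOY_PROCEED;
		case TOY_LOCK_EXCLUSIVE:
			return TOY_WAIT_AND_RETRY;
	}
	assert(false);				/* not reached for valid modes */
	return TOY_GIVE_UP;
}
```

The point of both new modes is that a prefetch worker must never make
the main apply worker wait: it either skips locking entirely or backs
off as soon as it sees a conflict.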
The number of prefetch workers is specified by the
`max_parallel_prefetch_workers_per_subscription` GUC. Setting it to zero
disables prefetching; note that the attached v1 patch uses 2, not 0, as
its initial value.
The prefetch mode is controlled by the `prefetch_replica_identity_only`
GUC. By default it is true, which makes prefetch efficient for HOT
updates, deletes, or inserts into a table with just one index (the
primary key).
Attached please find the patch and the two shell scripts used to produce
these test results.
It may also be more convenient to inspect this patch as a PR:
https://github.com/knizhnik/postgres/pull/3
I wonder whether such an LR prefetching approach is considered useful?
Or is it better to investigate other ways to improve LR apply speed
(parallel apply)?
Attachments:
v1-0001-logical-replication-prefetch.patch (text/plain; charset=UTF-8)
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 53ddd25c42d..c9c4223b22e 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -131,7 +131,7 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
* invoking table_tuple_lock.
*/
static bool
-should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
+should_refetch_tuple(TM_Result res, TM_FailureData *tmfd, LockTupleMode lockmode)
{
bool refetch = false;
@@ -141,22 +141,28 @@ should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
break;
case TM_Updated:
/* XXX: Improve handling here */
- if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
- else
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("concurrent update, retrying")));
- refetch = true;
+ if (lockmode != LockTupleTryExclusive)
+ {
+ if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
+ else
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("concurrent update, retrying")));
+ refetch = true;
+ }
break;
case TM_Deleted:
- /* XXX: Improve handling here */
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("concurrent delete, retrying")));
- refetch = true;
+ if (lockmode != LockTupleTryExclusive)
+ {
+ /* XXX: Improve handling here */
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("concurrent delete, retrying")));
+ refetch = true;
+ }
break;
case TM_Invisible:
elog(ERROR, "attempted to lock invisible tuple");
@@ -236,8 +242,16 @@ retry:
*/
if (TransactionIdIsValid(xwait))
{
- XactLockTableWait(xwait, NULL, NULL, XLTW_None);
- goto retry;
+ if (lockmode == LockTupleTryExclusive)
+ {
+ found = false;
+ break;
+ }
+ else if (lockmode != LockTupleNoLock)
+ {
+ XactLockTableWait(xwait, NULL, NULL, XLTW_None);
+ goto retry;
+ }
}
/* Found our tuple and it's not locked */
@@ -246,7 +260,7 @@ retry:
}
/* Found tuple, try to lock it in the lockmode. */
- if (found)
+ if (found && lockmode != LockTupleNoLock)
{
TM_FailureData tmfd;
TM_Result res;
@@ -256,14 +270,14 @@ retry:
res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
outslot,
GetCurrentCommandId(false),
- lockmode,
+ lockmode == LockTupleTryExclusive ? LockTupleExclusive : lockmode,
LockWaitBlock,
0 /* don't follow updates */ ,
&tmfd);
PopActiveSnapshot();
- if (should_refetch_tuple(res, &tmfd))
+ if (should_refetch_tuple(res, &tmfd, lockmode))
goto retry;
}
@@ -395,16 +409,23 @@ retry:
*/
if (TransactionIdIsValid(xwait))
{
- XactLockTableWait(xwait, NULL, NULL, XLTW_None);
- goto retry;
+ if (lockmode == LockTupleTryExclusive)
+ {
+ found = false;
+ break;
+ }
+ else if (lockmode != LockTupleNoLock)
+ {
+ XactLockTableWait(xwait, NULL, NULL, XLTW_None);
+ goto retry;
+ }
}
-
/* Found our tuple and it's not locked */
break;
}
/* Found tuple, try to lock it in the lockmode. */
- if (found)
+ if (found && lockmode != LockTupleNoLock)
{
TM_FailureData tmfd;
TM_Result res;
@@ -414,14 +435,14 @@ retry:
res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
outslot,
GetCurrentCommandId(false),
- lockmode,
+ lockmode == LockTupleTryExclusive ? LockTupleExclusive : lockmode,
LockWaitBlock,
0 /* don't follow updates */ ,
&tmfd);
PopActiveSnapshot();
- if (should_refetch_tuple(res, &tmfd))
+ if (should_refetch_tuple(res, &tmfd, lockmode))
goto retry;
}
@@ -508,7 +529,7 @@ retry:
PopActiveSnapshot();
- if (should_refetch_tuple(res, &tmfd))
+ if (should_refetch_tuple(res, &tmfd, LockTupleShare))
goto retry;
return true;
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index d25085d3515..d2c426ecab7 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -400,7 +400,7 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
* Try to get a parallel apply worker from the pool. If none is available then
* start a new one.
*/
-static ParallelApplyWorkerInfo *
+ParallelApplyWorkerInfo *
pa_launch_parallel_worker(void)
{
MemoryContext oldcontext;
@@ -729,6 +729,43 @@ ProcessParallelApplyInterrupts(void)
}
}
+
+static void
+pa_apply_dispatch(StringInfo s)
+{
+ if (MyParallelShared->do_prefetch)
+ {
+ PG_TRY();
+ {
+ apply_dispatch(s);
+ }
+ PG_CATCH();
+ {
+ HOLD_INTERRUPTS();
+
+ elog(DEBUG1, "Failed to prefetch LR operation");
+
+ /* TODO: should we somehow dump the error or just silently ignore it? */
+ /* EmitErrorReport(); */
+ FlushErrorState();
+
+ RESUME_INTERRUPTS();
+
+ lr_prefetch_errors += 1;
+ }
+ PG_END_TRY();
+ if (!prefetch_replica_identity_only)
+ {
+ /* We need to abort transaction to undo insert */
+ AbortCurrentTransaction();
+ }
+ }
+ else
+ {
+ apply_dispatch(s);
+ }
+}
+
/* Parallel apply worker main loop. */
static void
LogicalParallelApplyLoop(shm_mq_handle *mqh)
@@ -794,7 +831,7 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
*/
s.cursor += SIZE_STATS_MESSAGE;
- apply_dispatch(&s);
+ pa_apply_dispatch(&s);
}
else if (shmq_res == SHM_MQ_WOULD_BLOCK)
{
@@ -943,20 +980,27 @@ ParallelApplyWorkerMain(Datum main_arg)
InitializingApplyWorker = false;
- /* Setup replication origin tracking. */
- StartTransactionCommand();
- ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
+ if (!MyParallelShared->do_prefetch)
+ {
+ /* Setup replication origin tracking. */
+ StartTransactionCommand();
+ ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
originname, sizeof(originname));
- originid = replorigin_by_name(originname, false);
-
- /*
- * The parallel apply worker doesn't need to monopolize this replication
- * origin which was already acquired by its leader process.
- */
- replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
- replorigin_session_origin = originid;
- CommitTransactionCommand();
+ originid = replorigin_by_name(originname, false);
+ /*
+ * The parallel apply worker doesn't need to monopolize this replication
+ * origin which was already acquired by its leader process.
+ */
+ replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
+ replorigin_session_origin = originid;
+ CommitTransactionCommand();
+ }
+ else
+ {
+ /* Do not write WAL for prefetch */
+ wal_level = WAL_LEVEL_MINIMAL;
+ }
/*
* Setup callback for syscache so that we know when something changes in
* the subscription relation state.
@@ -1149,8 +1193,11 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
shm_mq_result result;
TimestampTz startTime = 0;
- Assert(!IsTransactionState());
- Assert(!winfo->serialize_changes);
+ if (!winfo->shared->do_prefetch)
+ {
+ Assert(!IsTransactionState());
+ Assert(!winfo->serialize_changes);
+ }
/*
* We don't try to send data to parallel worker for 'immediate' mode. This
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 4aed0dfcebb..ff2eaad5462 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -50,6 +50,7 @@
int max_logical_replication_workers = 4;
int max_sync_workers_per_subscription = 2;
int max_parallel_apply_workers_per_subscription = 2;
+int max_parallel_prefetch_workers_per_subscription = 2;
LogicalRepWorker *MyLogicalRepWorker = NULL;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fd11805a44c..8ff0076dad3 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -311,6 +311,18 @@ static uint32 parallel_stream_nchanges = 0;
/* Are we initializing an apply worker? */
bool InitializingApplyWorker = false;
+#define INIT_PREFETCH_BUF_SIZE (128*1024)
+static ParallelApplyWorkerInfo* prefetch_worker[MAX_LR_PREFETCH_WORKERS];
+static int prefetch_worker_rr = 0;
+static int n_prefetch_workers;
+
+bool prefetch_replica_identity_only = true;
+
+size_t lr_prefetch_hits;
+size_t lr_prefetch_misses;
+size_t lr_prefetch_errors;
+size_t lr_prefetch_inserts;
+
/*
* We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
* the subscription if the remote transaction's finish LSN matches the subskiplsn.
@@ -329,6 +341,11 @@ bool InitializingApplyWorker = false;
static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
#define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
+/*
+ * If operation is performed by parallel prefetch worker
+ */
+#define is_prefetching() (am_parallel_apply_worker() && MyParallelShared->do_prefetch)
+
/* BufFile handle of the current streaming file */
static BufFile *stream_fd = NULL;
@@ -556,6 +573,11 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
TransApplyAction apply_action;
StringInfoData original_msg;
+ if (is_prefetching())
+ {
+ return false;
+ }
+
apply_action = get_transaction_apply_action(stream_xid, &winfo);
/* not in streaming mode */
@@ -2380,6 +2402,27 @@ TargetPrivilegesCheck(Relation rel, AclMode mode)
RelationGetRelationName(rel))));
}
+#define SAFE_APPLY(call) \
+ if (is_prefetching()) \
+ { \
+ PG_TRY(); \
+ { \
+ call; \
+ } \
+ PG_CATCH(); \
+ { \
+ HOLD_INTERRUPTS(); \
+ elog(DEBUG1, "Failed to prefetch LR operation");\
+ FlushErrorState(); \
+ RESUME_INTERRUPTS(); \
+ lr_prefetch_errors += 1; \
+ } \
+ PG_END_TRY(); \
+ } else { \
+ call; \
+ }
+
+
/*
* Handle INSERT message.
*/
@@ -2453,7 +2496,7 @@ apply_handle_insert(StringInfo s)
ResultRelInfo *relinfo = edata->targetRelInfo;
ExecOpenIndices(relinfo, false);
- apply_handle_insert_internal(edata, relinfo, remoteslot);
+ SAFE_APPLY(apply_handle_insert_internal(edata, relinfo, remoteslot));
ExecCloseIndices(relinfo);
}
@@ -2487,13 +2530,34 @@ apply_handle_insert_internal(ApplyExecutionData *edata,
!relinfo->ri_RelationDesc->rd_rel->relhasindex ||
RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
- /* Caller will not have done this bit. */
- Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
- InitConflictIndexes(relinfo);
+ if (is_prefetching() && prefetch_replica_identity_only)
+ {
+ TupleTableSlot *localslot = NULL;
+ LogicalRepRelMapEntry *relmapentry = edata->targetRel;
+ Relation localrel = relinfo->ri_RelationDesc;
+ EPQState epqstate;
- /* Do the insert. */
- TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
- ExecSimpleRelationInsert(relinfo, estate, remoteslot);
+ EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
+
+ (void)FindReplTupleInLocalRel(edata, localrel,
+ &relmapentry->remoterel,
+ relmapentry->localindexoid,
+ remoteslot, &localslot);
+ }
+ else
+ {
+ /* Caller will not have done this bit. */
+ Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
+ InitConflictIndexes(relinfo);
+
+ /* Do the insert. */
+ TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
+ ExecSimpleRelationInsert(relinfo, estate, remoteslot);
+ }
+ if (is_prefetching())
+ {
+ lr_prefetch_inserts += 1;
+ }
}
/*
@@ -2637,8 +2701,8 @@ apply_handle_update(StringInfo s)
apply_handle_tuple_routing(edata,
remoteslot, &newtup, CMD_UPDATE);
else
- apply_handle_update_internal(edata, edata->targetRelInfo,
- remoteslot, &newtup, rel->localindexoid);
+ SAFE_APPLY(apply_handle_update_internal(edata, edata->targetRelInfo,
+ remoteslot, &newtup, rel->localindexoid));
finish_edata(edata);
@@ -2682,6 +2746,16 @@ apply_handle_update_internal(ApplyExecutionData *edata,
localindexoid,
remoteslot, &localslot);
+ if (is_prefetching())
+ {
+ if (found)
+ lr_prefetch_hits += 1;
+ else
+ lr_prefetch_misses += 1;
+ if (prefetch_replica_identity_only)
+ goto Cleanup;
+ }
+
/*
* Tuple found.
*
@@ -2739,7 +2813,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
remoteslot, newslot, list_make1(&conflicttuple));
}
- /* Cleanup. */
+ Cleanup:
ExecCloseIndices(relinfo);
EvalPlanQualEnd(&epqstate);
}
@@ -2820,8 +2894,8 @@ apply_handle_delete(StringInfo s)
ResultRelInfo *relinfo = edata->targetRelInfo;
ExecOpenIndices(relinfo, false);
- apply_handle_delete_internal(edata, relinfo,
- remoteslot, rel->localindexoid);
+ SAFE_APPLY(apply_handle_delete_internal(edata, relinfo,
+ remoteslot, rel->localindexoid));
ExecCloseIndices(relinfo);
}
@@ -2867,6 +2941,15 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
remoteslot, &localslot);
+ if (is_prefetching())
+ {
+ if (found)
+ lr_prefetch_hits += 1;
+ else
+ lr_prefetch_misses += 1;
+ goto Cleanup;
+ }
+
/* If found delete it. */
if (found)
{
@@ -2900,7 +2983,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
remoteslot, NULL, list_make1(&conflicttuple));
}
- /* Cleanup. */
+ Cleanup:
EvalPlanQualEnd(&epqstate);
}
@@ -2921,6 +3004,8 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
EState *estate = edata->estate;
bool found;
+ LockTupleMode lockmode = is_prefetching() ? prefetch_replica_identity_only ? LockTupleNoLock : LockTupleTryExclusive : LockTupleExclusive;
+
/*
* Regardless of the top-level operation, we're performing a read here, so
* check for SELECT privileges.
@@ -2946,11 +3031,11 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
#endif
found = RelationFindReplTupleByIndex(localrel, localidxoid,
- LockTupleExclusive,
+ lockmode,
remoteslot, *localslot);
}
else
- found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
+ found = RelationFindReplTupleSeq(localrel, lockmode,
remoteslot, *localslot);
return found;
@@ -3041,14 +3126,14 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
switch (operation)
{
case CMD_INSERT:
- apply_handle_insert_internal(edata, partrelinfo,
- remoteslot_part);
+ SAFE_APPLY(apply_handle_insert_internal(edata, partrelinfo,
+ remoteslot_part));
break;
case CMD_DELETE:
- apply_handle_delete_internal(edata, partrelinfo,
- remoteslot_part,
- part_entry->localindexoid);
+ SAFE_APPLY(apply_handle_delete_internal(edata, partrelinfo,
+ remoteslot_part,
+ part_entry->localindexoid));
break;
case CMD_UPDATE:
@@ -3076,6 +3161,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
{
TupleTableSlot *newslot = localslot;
+ if (is_prefetching())
+ return;
+
/* Store the new tuple for conflict reporting */
slot_store_data(newslot, part_entry, newtup);
@@ -3101,6 +3189,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
{
TupleTableSlot *newslot;
+ if (is_prefetching())
+ return;
+
/* Store the new tuple for conflict reporting */
newslot = table_slot_create(partrel, &estate->es_tupleTable);
slot_store_data(newslot, part_entry, newtup);
@@ -3217,8 +3308,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
slot_getallattrs(remoteslot);
}
MemoryContextSwitchTo(oldctx);
- apply_handle_insert_internal(edata, partrelinfo_new,
- remoteslot_part);
+ SAFE_APPLY(apply_handle_insert_internal(edata, partrelinfo_new,
+ remoteslot_part));
}
EvalPlanQualEnd(&epqstate);
@@ -3552,7 +3643,6 @@ store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
MemoryContextSwitchTo(ApplyMessageContext);
}
-
/* Update statistics of the worker. */
static void
UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
@@ -3567,6 +3657,42 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
}
}
+#define MSG_CODE_OFFSET (1 + 8*3)
+
+static void
+lr_do_prefetch(char* buf, int len)
+{
+ ParallelApplyWorkerInfo* winfo;
+
+ if (buf[0] != 'w')
+ return;
+
+ switch (buf[MSG_CODE_OFFSET])
+ {
+ case LOGICAL_REP_MSG_INSERT:
+ case LOGICAL_REP_MSG_UPDATE:
+ case LOGICAL_REP_MSG_DELETE:
+ /* Round robin prefetch worker */
+ winfo = prefetch_worker[prefetch_worker_rr++ % n_prefetch_workers];
+ pa_send_data(winfo, len, buf);
+ break;
+
+ case LOGICAL_REP_MSG_TYPE:
+ case LOGICAL_REP_MSG_RELATION:
+ /* broadcast to all prefetch workers */
+ for (int i = 0; i < n_prefetch_workers; i++)
+ {
+ winfo = prefetch_worker[i];
+ pa_send_data(winfo, len, buf);
+ }
+ break;
+
+ default:
+ /* Ignore other messages */
+ break;
+ }
+}
+
/*
* Apply main loop.
*/
@@ -3577,6 +3703,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
bool ping_sent = false;
TimeLineID tli;
ErrorContextCallback errcallback;
+ char* prefetch_buf = NULL;
+ size_t prefetch_buf_pos = 0;
+ size_t prefetch_buf_used = 0;
+ size_t prefetch_buf_size = INIT_PREFETCH_BUF_SIZE;
/*
* Init the ApplyMessageContext which we clean up after each replication
@@ -3594,6 +3724,25 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
"LogicalStreamingContext",
ALLOCSET_DEFAULT_SIZES);
+ if (max_parallel_prefetch_workers_per_subscription != 0)
+ {
+ int i;
+ for (i = 0; i < max_parallel_prefetch_workers_per_subscription; i++)
+ {
+ prefetch_worker[i] = pa_launch_parallel_worker();
+ if (!prefetch_worker[i])
+ {
+ elog(LOG, "Launched only %d prefetch workers out of %d",
+ i, max_parallel_prefetch_workers_per_subscription);
+ break;
+ }
+ prefetch_worker[i]->in_use = true;
+ prefetch_worker[i]->shared->do_prefetch = true;
+ }
+ n_prefetch_workers = i;
+ prefetch_buf = palloc(prefetch_buf_size);
+ }
+
/* mark as idle, before starting to loop */
pgstat_report_activity(STATE_IDLE, NULL);
@@ -3611,9 +3760,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
{
pgsocket fd = PGINVALID_SOCKET;
int rc;
- int len;
+ int32 len;
char *buf = NULL;
bool endofstream = false;
+ bool no_more_data = false;
long wait_time;
CHECK_FOR_INTERRUPTS();
@@ -3622,87 +3772,127 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
- if (len != 0)
+ /* Loop to process all available data (without blocking). */
+ for (;;)
{
- /* Loop to process all available data (without blocking). */
- for (;;)
- {
- CHECK_FOR_INTERRUPTS();
+ CHECK_FOR_INTERRUPTS();
- if (len == 0)
+ if (len > 0 && n_prefetch_workers != 0 && prefetch_buf_pos == prefetch_buf_used)
+ {
+ prefetch_buf_used = 0;
+ do
{
- break;
- }
- else if (len < 0)
+ if (prefetch_buf_used + len + 4 > prefetch_buf_size)
+ {
+ prefetch_buf_size *= 2;
+ elog(DEBUG1, "Increase prefetch buffer size to %zu", prefetch_buf_size);
+ prefetch_buf = repalloc(prefetch_buf, prefetch_buf_size);
+ }
+ memcpy(&prefetch_buf[prefetch_buf_used], &len, 4);
+ memcpy(&prefetch_buf[prefetch_buf_used+4], buf, len);
+ prefetch_buf_used += 4 + len;
+ if (prefetch_buf_used >= INIT_PREFETCH_BUF_SIZE)
+ break;
+ len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
+ } while (len > 0);
+
+ no_more_data = len <= 0;
+
+ for (prefetch_buf_pos = 0; prefetch_buf_pos < prefetch_buf_used; prefetch_buf_pos += 4 + len)
{
- ereport(LOG,
- (errmsg("data stream from publisher has ended")));
- endofstream = true;
- break;
+ memcpy(&len, &prefetch_buf[prefetch_buf_pos], 4);
+ lr_do_prefetch(&prefetch_buf[prefetch_buf_pos+4], len);
}
- else
- {
- int c;
- StringInfoData s;
+ memcpy(&len, prefetch_buf, 4);
+ buf = &prefetch_buf[4];
+ prefetch_buf_pos = len + 4;
+ }
- if (ConfigReloadPending)
- {
- ConfigReloadPending = false;
- ProcessConfigFile(PGC_SIGHUP);
- }
+ if (len == 0)
+ {
+ break;
+ }
+ else if (len < 0)
+ {
+ ereport(LOG,
+ (errmsg("data stream from publisher has ended")));
+ endofstream = true;
+ break;
+ }
+ else
+ {
+ int c;
+ StringInfoData s;
- /* Reset timeout. */
- last_recv_timestamp = GetCurrentTimestamp();
- ping_sent = false;
+ if (ConfigReloadPending)
+ {
+ ConfigReloadPending = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
- /* Ensure we are reading the data into our memory context. */
- MemoryContextSwitchTo(ApplyMessageContext);
+ /* Reset timeout. */
+ last_recv_timestamp = GetCurrentTimestamp();
+ ping_sent = false;
- initReadOnlyStringInfo(&s, buf, len);
+ /* Ensure we are reading the data into our memory context. */
+ MemoryContextSwitchTo(ApplyMessageContext);
- c = pq_getmsgbyte(&s);
+ initReadOnlyStringInfo(&s, buf, len);
- if (c == 'w')
- {
- XLogRecPtr start_lsn;
- XLogRecPtr end_lsn;
- TimestampTz send_time;
+ c = pq_getmsgbyte(&s);
- start_lsn = pq_getmsgint64(&s);
- end_lsn = pq_getmsgint64(&s);
- send_time = pq_getmsgint64(&s);
+ if (c == 'w')
+ {
+ XLogRecPtr start_lsn;
+ XLogRecPtr end_lsn;
+ TimestampTz send_time;
- if (last_received < start_lsn)
- last_received = start_lsn;
+ start_lsn = pq_getmsgint64(&s);
+ end_lsn = pq_getmsgint64(&s);
+ send_time = pq_getmsgint64(&s);
- if (last_received < end_lsn)
- last_received = end_lsn;
+ if (last_received < start_lsn)
+ last_received = start_lsn;
- UpdateWorkerStats(last_received, send_time, false);
+ if (last_received < end_lsn)
+ last_received = end_lsn;
- apply_dispatch(&s);
- }
- else if (c == 'k')
- {
- XLogRecPtr end_lsn;
- TimestampTz timestamp;
- bool reply_requested;
+ UpdateWorkerStats(last_received, send_time, false);
- end_lsn = pq_getmsgint64(&s);
- timestamp = pq_getmsgint64(&s);
- reply_requested = pq_getmsgbyte(&s);
+ apply_dispatch(&s);
+ }
+ else if (c == 'k')
+ {
+ XLogRecPtr end_lsn;
+ TimestampTz timestamp;
+ bool reply_requested;
- if (last_received < end_lsn)
- last_received = end_lsn;
+ end_lsn = pq_getmsgint64(&s);
+ timestamp = pq_getmsgint64(&s);
+ reply_requested = pq_getmsgbyte(&s);
- send_feedback(last_received, reply_requested, false);
- UpdateWorkerStats(last_received, timestamp, true);
- }
- /* other message types are purposefully ignored */
+ if (last_received < end_lsn)
+ last_received = end_lsn;
- MemoryContextReset(ApplyMessageContext);
+ send_feedback(last_received, reply_requested, false);
+ UpdateWorkerStats(last_received, timestamp, true);
}
+ /* other message types are purposefully ignored */
+ MemoryContextReset(ApplyMessageContext);
+ }
+ if (prefetch_buf_pos < prefetch_buf_used)
+ {
+ memcpy(&len, &prefetch_buf[prefetch_buf_pos], 4);
+ buf = &prefetch_buf[prefetch_buf_pos + 4];
+ prefetch_buf_pos += 4 + len;
+ }
+ else if (prefetch_buf_used != 0 && no_more_data)
+ {
+ break;
+ }
+ else
+ {
len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
}
}
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 511dc32d519..3b254898663 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -76,6 +76,7 @@
#include "replication/slot.h"
#include "replication/slotsync.h"
#include "replication/syncrep.h"
+#include "replication/worker_internal.h"
#include "storage/aio.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
@@ -2143,6 +2144,18 @@ struct config_bool ConfigureNamesBool[] =
NULL, NULL, NULL
},
+ {
+ {"prefetch_replica_identity_only",
+ PGC_SIGHUP,
+ REPLICATION_SUBSCRIBERS,
+ gettext_noop("Whether LR prefetch work should prefetch only replica identity index or all other indexes too."),
+ NULL,
+ },
+ &prefetch_replica_identity_only,
+ true,
+ NULL, NULL, NULL
+ },
+
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
@@ -3376,6 +3389,18 @@ struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
+ {
+ {"max_parallel_prefetch_workers_per_subscription",
+ PGC_SIGHUP,
+ REPLICATION_SUBSCRIBERS,
+ gettext_noop("Maximum number of parallel prefetch workers per subscription."),
+ NULL,
+ },
+ &max_parallel_prefetch_workers_per_subscription,
+ 2, 0, MAX_LR_PREFETCH_WORKERS,
+ NULL, NULL, NULL
+ },
+
{
{"max_active_replication_origins",
PGC_POSTMASTER,
diff --git a/src/include/nodes/lockoptions.h b/src/include/nodes/lockoptions.h
index 0b534e30603..88f5d2e4cc5 100644
--- a/src/include/nodes/lockoptions.h
+++ b/src/include/nodes/lockoptions.h
@@ -56,6 +56,10 @@ typedef enum LockTupleMode
LockTupleNoKeyExclusive,
/* SELECT FOR UPDATE, UPDATEs that modify key columns, and DELETE */
LockTupleExclusive,
+ /* Do not lock tuple */
+ LockTupleNoLock,
+ /* Try exclusive lock, silently give up in case of conflict */
+ LockTupleTryExclusive,
} LockTupleMode;
#endif /* LOCKOPTIONS_H */
diff --git a/src/include/port/pg_iovec.h b/src/include/port/pg_iovec.h
index 90be3af449d..8fefeb8c245 100644
--- a/src/include/port/pg_iovec.h
+++ b/src/include/port/pg_iovec.h
@@ -53,6 +53,7 @@ struct iovec
static inline ssize_t
pg_preadv(int fd, const struct iovec *iov, int iovcnt, off_t offset)
{
+ pg_usleep(100);
#if HAVE_DECL_PREADV
/*
* Avoid a small amount of argument copying overhead in the kernel if
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index 82b202f3305..19d1a8d466b 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -15,6 +15,8 @@
extern PGDLLIMPORT int max_logical_replication_workers;
extern PGDLLIMPORT int max_sync_workers_per_subscription;
extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription;
+extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription;
+extern PGDLLIMPORT int max_parallel_prefetch_workers_per_subscription;
extern void ApplyLauncherRegister(void);
extern void ApplyLauncherMain(Datum main_arg);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 30b2775952c..c6745e77efc 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -180,6 +180,11 @@ typedef struct ParallelApplyWorkerShared
*/
PartialFileSetState fileset_state;
FileSet fileset;
+
+ /*
+ * Prefetch worker
+ */
+ bool do_prefetch;
} ParallelApplyWorkerShared;
/*
@@ -237,6 +242,14 @@ extern PGDLLIMPORT bool in_remote_transaction;
extern PGDLLIMPORT bool InitializingApplyWorker;
+#define MAX_LR_PREFETCH_WORKERS 128
+extern PGDLLIMPORT size_t lr_prefetch_hits;
+extern PGDLLIMPORT size_t lr_prefetch_misses;
+extern PGDLLIMPORT size_t lr_prefetch_errors;
+extern PGDLLIMPORT size_t lr_prefetch_inserts;
+
+extern bool prefetch_replica_identity_only;
+
extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
bool only_running);
@@ -326,10 +339,13 @@ extern void pa_decr_and_wait_stream_block(void);
extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
XLogRecPtr remote_lsn);
-#define isParallelApplyWorker(worker) ((worker)->in_use && \
+extern void pa_prefetch_handle_modification(StringInfo s, LogicalRepMsgType action);
+
+#define isParallelApplyWorker(worker) ((worker)->in_use && \
(worker)->type == WORKERTYPE_PARALLEL_APPLY)
#define isTablesyncWorker(worker) ((worker)->in_use && \
(worker)->type == WORKERTYPE_TABLESYNC)
+extern ParallelApplyWorkerInfo* pa_launch_parallel_worker(void);
static inline bool
am_tablesync_worker(void)
On Tue, Jul 8, 2025 at 12:06 PM Konstantin Knizhnik <knizhnik@garret.ru> wrote:
There is a well-known Postgres problem: a logical replication subscriber
cannot catch up with the publisher simply because LR changes are applied by
a single worker, while at the publisher changes are made by
multiple concurrent backends. The problem is not specific to logical
replication: the physical replication stream is also handled by a single
walreceiver. But for physical replication Postgres now implements
prefetch: looking at the blocks referenced by WAL records, it is quite easy to
predict which pages will be required for redo and prefetch them. With logical
replication the situation is much more complicated.
My first idea was to implement parallel apply of transactions. But to do
that we need to track dependencies between transactions. Right now
Postgres can apply transactions in parallel, but only if they are
streamed (which is done only for large transactions), and it serializes them
by commits. It is possible to force parallel apply of short
transactions using `debug_logical_replication_streaming`, but then
performance is ~2x slower than with sequential apply by a
single worker.
What is the reason for such a large slowdown? Is it because the amount
of network transfer has increased without giving any significant
advantage because of the serialization of commits?
By removing serialization by commits, it is possible to
speed up apply 3x and make the subscriber apply changes faster than the
publisher can produce them, even with multiple clients. But this is possible
only if transactions are independent, and that can be enforced only by
tracking dependencies, which seems to be very non-trivial and invasive.
I have not completely given up on the dependency-tracking approach,
but decided first to try a simpler solution: prefetching.
Sounds reasonable, but in the long run, we should track transaction
dependencies and allow parallel apply of all the transactions.
It is
already used for physical replication. Certainly in the case of physical
replication it is much simpler, because each WAL record contains the list of
accessed blocks.
In the case of logical replication, prefetching can be done either by
prefetching access to the replica identity index (usually the primary key),
or by executing the replication command in some background worker.
Certainly the first case is much easier.
It seems there is only one case described, so what exactly are you
referring to first and second?
We just perform an index lookup in the
prefetch worker, and it loads the accessed index and heap pages into shared
buffers, so the main apply worker does not need to read anything from disk.
But it works well only for DELETE and HOT UPDATE operations.
In the second case we execute the LR command normally in a background
worker and then abort the transaction. Certainly in this case we are doing
the same work twice. But the assumption is the same: parallel prefetch
workers should load the affected pages, speeding up the work of the main apply
worker.
I have implemented a PoC (see attached patch) and got first results
on the efficiency of such prefetching.
*** First scenario (update-only).
Publisher:
```
create table t(pk integer primary key, counter integer, filler text
default repeat('x', 1000)) with (fillfactor=10);
insert into t values (generate_series(1,100000), 0);
create publication pub1 for table t;
```
Subscriber:
```
create table t(pk integer primary key, counter integer, filler text
default repeat('x', 1000)) with (fillfactor=10);
create subscription sub1 connection 'port=54321 dbname=postgres'
publication pub1;
```
Then I wait until replication is synced, stop the subscriber and do random
point updates in 10 sessions at the publisher:
```
pgbench -T 100 -c 10 -M prepared -n -f update.sql -p 54321 -d postgres
```
where update.sql is:
```
\set pk random(1, 100000)
update t set counter=counter+1 where pk=:pk;
```
Then I start the subscriber and measure how much time it needs to
catch up.
Results:
no prefetch: 2:00 min
prefetch (replica identity only): 0:55 min
prefetch (all): 1:10 min
This is definitely the best case for replica-identity-index-only
prefetch (update-only and no other indexes).
How to interpret these results?
Without prefetch, applying updates takes about two times longer at the
subscriber than performing these updates at the publisher.
It means that under heavy workload the subscriber has no chance to catch up.
With prefetching of the replica identity index, apply time is even smaller than
the time needed to perform the updates at the publisher.
Performing the whole operation and aborting the transaction certainly adds more
overhead. But the improvement is still quite significant.
Please also note that these results were obtained on a system with
a large amount of RAM (64GB) and a fast SSD.
With a data set not fitting in RAM and much slower disks, the difference
is expected to be more significant.
But what about worst cases where these additional prefetches could
lead to evicting some pages from shared_buffers which are required by
the workload on the subscriber? I think you should try such workloads as
well.
I have tried to simulate it by adding a 0.1 msec delay to pg_preadv.
When I add an artificial 0.1 msec `preadv` delay, I get the following results:
no prefetch: 7:40 min
prefetch (replica identity only): 3:10 min
prefetch (all): 3:09 min
In this case apply takes much more time than the 100 seconds during which
updates are performed at the publisher. Prefetch can improve speed about two
times,
but it doesn't allow the subscriber to catch up.
...
Please note that this approach requires minimal changes in Postgres,
because all the infrastructure of parallel apply workers is already present
and we can reuse the same apply code (with minimal changes) for
performing prefetch. I only had to introduce extra tuple lock types
(no-lock and try-lock) to minimize overhead and lock conflicts between
the prefetch and main apply workers. Still, it cannot completely prevent
lock conflicts and deadlocks in prefetch workers. It looks like more work
is needed here.
I understand that it is just a POC, so you haven't figured out all the
details, but it would be good to know the reason of these deadlocks.
I wonder whether such an LR prefetching approach is considered useful.
Or is it better to investigate other ways to improve LR apply speed
(parallel apply)?
I think it could be a good intermediate step till we are able to find
a solution for tracking the dependencies. Do you think this work will
be useful once we have parallel apply, and if so how?
--
With Regards,
Amit Kapila.
On 08/07/2025 2:51 pm, Amit Kapila wrote:
On Tue, Jul 8, 2025 at 12:06 PM Konstantin Knizhnik <knizhnik@garret.ru> wrote:
It is possible to enforce parallel apply of short
transactions using `debug_logical_replication_streaming` but then
performance is ~2x times slower than in case of sequential apply by
single worker.
What is the reason of such a large slow down? Is it because the amount
of network transfer has increased without giving any significant
advantage because of the serialization of commits?
No, I do not think that network traffic is increased.
If I remove the locks (just by commenting out the bodies of the `pa_lock_stream` and
`pa_unlock_stream` functions and the call of `pa_wait_for_xact_finish`), I
get a 3x speed improvement (with 4 parallel apply workers) compared with
normal mode
(when transactions are applied by the main logical replication worker). So
the main reason is lock overhead/contention and de facto serialization
of transactions (in `top` I see that only one worker is active most of the
time).
Even with a simulated 0.1 msec read delay, the results of the update test are
the following:
normal mode: 7:40
forced parallel mode: 8:30
forced parallel mode (no locks): 1:45
By removing serialization by commits, it is possible to
speedup apply 3x times and make subscriber apply changes faster then
producer can produce them even with multiple clients. But it is possible
only if transactions are independent and it can be enforced only by
tracking dependencies which seems to be very non-trivial and invasive.
I still do not completely give up with tracking dependencies approach,
but decided first to try more simple solution - prefetching.
Sounds reasonable, but in the long run, we should track transaction
dependencies and allow parallel apply of all the transactions.
I agree.
I see two different approaches:
1. Build a dependency graph: track dependencies between xids when a
transaction is executed at the publisher and then include this graph in
the commit record.
2. Calculate a hash of the replica identity key and check that the data sets of
transactions do not intersect (this certainly will not work if there are
triggers).
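A minimal sketch of approach 2, under stated assumptions: each transaction keeps the hashes of the replica identity keys it touches, and two transactions are treated as independent only if those hash sets do not intersect. The names `TxnKeySet`, `key_hash`, `txn_add_key`, `txn_sets_intersect` and the limit `MAX_TXN_KEYS` are hypothetical and not part of the patch; FNV-1a stands in for whatever hash function would really be used. Hash collisions can only produce false conflicts, never missed ones.
```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical limit on tracked keys per transaction (illustration only). */
#define MAX_TXN_KEYS 64

/* Working set of one transaction: hashes of the replica identity keys it touches. */
typedef struct TxnKeySet
{
    uint64_t hashes[MAX_TXN_KEYS];
    size_t   nkeys;
} TxnKeySet;

/* 64-bit FNV-1a over a table identifier followed by the raw key bytes. */
uint64_t
key_hash(uint32_t table_id, const void *key, size_t len)
{
    const unsigned char *p = key;
    uint64_t h = 0xcbf29ce484222325ULL;     /* FNV offset basis */

    for (size_t i = 0; i < sizeof(table_id); i++)
    {
        h ^= (table_id >> (8 * i)) & 0xFF;
        h *= 0x100000001b3ULL;              /* FNV prime */
    }
    for (size_t i = 0; i < len; i++)
    {
        h ^= p[i];
        h *= 0x100000001b3ULL;
    }
    return h;
}

/*
 * Record one key. Returns false when the set is full; a caller would then
 * have to treat the transaction as conflicting with everything.
 */
bool
txn_add_key(TxnKeySet *set, uint32_t table_id, const void *key, size_t len)
{
    if (set->nkeys >= MAX_TXN_KEYS)
        return false;
    set->hashes[set->nkeys++] = key_hash(table_id, key, len);
    return true;
}

/* True if the two working sets share at least one hash (possible dependency). */
bool
txn_sets_intersect(const TxnKeySet *a, const TxnKeySet *b)
{
    for (size_t i = 0; i < a->nkeys; i++)
        for (size_t j = 0; j < b->nkeys; j++)
            if (a->hashes[i] == b->hashes[j])
                return true;
    return false;
}
```
The quadratic comparison is only reasonable for short OLTP transactions; a real implementation would more likely use a hash table or a Bloom filter. The sketch also sees only replicated row changes, so it cannot account for the side effects of triggers.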
It is
already used for physical replication. Certainly in case of physical
replication it is much simpler, because each WAL record contains list of
accessed blocks.
In case of logical replication prefetching can be done either by
prefetching access to replica identity index (usually primary key),
either by executing replication command by some background worker
Certainly first case is much more easy.
It seems there is only one case described, so what exactly are you
referring to first and second?
First: perform a lookup in the replica identity index (primary key). It will
prefetch the index pages and the referenced heap page.
Second: execute the LR operation (insert/update) in a prefetch worker and
then roll back the transaction.
But what about worst cases where these additional prefetches could
lead to evicting some pages from shared_buffers which are required by
the workload on the subscriber? I think you should try such workloads as
well.
It is a common problem of all prefetch algorithms: if the cache
where prefetch results are stored (shared buffers, OS file cache, ...) is
not large enough to keep a prefetched page until it is used,
then prefetch will not improve performance and may
even cause some degradation.
So it is a really challenging task to choose the optimal time for a prefetch
operation: too early, and its results will be evicted before they are
requested; too late, and the executor has to wait for prefetch completion or
load the page itself. Certainly there is some kind of self-tuning: a worker
performing prefetch has to wait for IO completion, while the executor, which
picks up pages from the cache, processes requests faster and so should catch up
with the prefetch workers. Then it has to perform IO itself and starts to fall
behind the prefetch workers.
I understand that it is just a POC, so you haven't figured out all the
details, but it would be good to know the reason of these deadlocks.
Will investigate it.
I wonder if such LR prefetching approach is considered to be useful?
Or it is better to investigate other ways to improve LR apply speed
(parallel apply)?
I think it could be a good intermediate step till we are able to find
a solution for tracking the dependencies. Do you think this work will
be useful once we have parallel apply, and if so how?
I think that if we have parallel apply, prefetch is not needed.
At least that is what I see now in Neon (open source serverless Postgres
which separates compute and storage).
We have implemented prefetch for seqscan and indexscan because of the
relatively large access latency of the page server. It can
significantly improve performance: 4 times or even more.
But the same effect can be achieved by forcing a parallel plan with a larger
number of parallel workers. Unfortunately the effects of these two
optimizations do not multiply, so parallel plan + prefetch shows
almost the same speed as either of these optimizations alone.
On Wed, Jul 9, 2025 at 12:08 AM Konstantin Knizhnik <knizhnik@garret.ru> wrote:
On 08/07/2025 2:51 pm, Amit Kapila wrote:
On Tue, Jul 8, 2025 at 12:06 PM Konstantin Knizhnik <knizhnik@garret.ru> wrote:
Sounds reasonable, but in the long run, we should track transaction
dependencies and allow parallel apply of all the transactions.
I agree.
I see two different approaches:
1. Build a dependency graph: track dependencies between xids when a transaction is executed at the publisher and then include this graph in the commit record.
2. Calculate a hash of the replica identity key and check that the data sets of transactions do not intersect (this certainly will not work if there are triggers).
I think it is better to compute transaction dependencies on the
subscriber side because there could be many transactions that could be
filtered because the containing tables or rows are not published.
Do you think this work will
be useful once we have parallel apply, and if so how?
I think that if we have parallel apply, prefetch is not needed.
At least that is what I see now in Neon (open source serverless Postgres which separates compute and storage).
We have implemented prefetch for seqscan and indexscan because of the relatively large access latency of the page server. It can significantly improve performance: 4 times or even more.
But the same effect can be achieved by forcing a parallel plan with a larger number of parallel workers. Unfortunately the effects of these two optimizations do not multiply, so parallel plan + prefetch shows almost the same speed as either of these optimizations alone.
I think our case is a bit different, and prefetch could even be used
when we are able to track dependencies and achieve true parallelism.
We can consider using prefetch to speed up dependent transactions that
can't be parallelized.
--
With Regards,
Amit Kapila.
On 11/07/2025 11:52 am, Amit Kapila wrote:
On Wed, Jul 9, 2025 at 12:08 AM Konstantin Knizhnik <knizhnik@garret.ru> wrote:
On 08/07/2025 2:51 pm, Amit Kapila wrote:
On Tue, Jul 8, 2025 at 12:06 PM Konstantin Knizhnik <knizhnik@garret.ru> wrote:
I see two different approaches:
1. Build a dependency graph: track dependencies between xids when a transaction is executed at the publisher and then include this graph in the commit record.
2. Calculate a hash of the replica identity key and check that the data sets of transactions do not intersect (this certainly will not work if there are triggers).
I think it is better to compute transaction dependencies on the
subscriber side because there could be many transactions that could be
filtered because the containing tables or rows are not published.
That is certainly true. Also, tracking dependencies at the subscriber doesn't
require changing the protocol and makes it possible to do parallel apply
for old publishers.
But on the other hand it seems to be too late. It would be nice if,
just after receiving the first statement of a transaction, the apply process
could decide which parallel apply worker it should be sent to.
At the subscriber side it seems easier to calculate a hash of the replica
identity key and, based on this hash (or more precisely the set of hashes
representing the transaction's working set), decide whether the transaction
overlaps with some prior transaction or not and schedule it accordingly.
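The scheduling decision described above could look roughly like the following sketch. It assumes, unlike the concern raised about deciding after the first statement, that the whole set of key hashes of the incoming transaction is already known. `WorkerState`, `choose_worker`, `hashes_overlap`, `NUM_WORKERS` and the two `SCHED_*` codes are hypothetical names used only for illustration.
```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define NUM_WORKERS     4       /* hypothetical pool size */
#define SCHED_ANY_IDLE  (-1)    /* no conflict: any idle worker may take it */
#define SCHED_MUST_WAIT (-2)    /* conflicts with several workers: wait for them */

/* Key hashes of the transaction a worker is currently applying. */
typedef struct WorkerState
{
    bool            busy;
    const uint64_t *hashes;
    size_t          nhashes;
} WorkerState;

/* True if the two hash arrays have at least one element in common. */
bool
hashes_overlap(const uint64_t *a, size_t na, const uint64_t *b, size_t nb)
{
    for (size_t i = 0; i < na; i++)
        for (size_t j = 0; j < nb; j++)
            if (a[i] == b[j])
                return true;
    return false;
}

/*
 * Decide where a new transaction goes:
 *   - index of the single busy worker it conflicts with (queue behind it),
 *   - SCHED_MUST_WAIT if it conflicts with more than one busy worker,
 *   - SCHED_ANY_IDLE if it is independent of all in-flight transactions.
 */
int
choose_worker(const WorkerState workers[NUM_WORKERS],
              const uint64_t *txn_hashes, size_t n)
{
    int target = SCHED_ANY_IDLE;

    for (int i = 0; i < NUM_WORKERS; i++)
    {
        if (!workers[i].busy)
            continue;
        if (!hashes_overlap(workers[i].hashes, workers[i].nhashes,
                            txn_hashes, n))
            continue;
        if (target != SCHED_ANY_IDLE)
            return SCHED_MUST_WAIT;
        target = i;
    }
    return target;
}
```
Sending a dependent transaction to the worker it conflicts with keeps the two in commit order without extra locking; this is one possible policy, not something the patch implements.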
I understand that it is just a POC, so you haven't figured out all the
details, but it would be good to know the reason of these deadlocks.
Will investigate it.
I found the reason: the conflict happens between the main apply worker and a
prefetch worker which was able to catch up with the main worker, so they both
try to apply the same statement.
I fixed the problem by adding an extra parameter to
ExecSimpleRelationUpdate/Insert and handling prefetch as a kind of
speculative operation.
With this change the results for the insert test are the following:
no prefetch: 10 min
prefetch (identity): 8 min
prefetch (full): 3 min
I think our case is a bit different, and prefetch could even be used
when we are able to track dependencies and achieve true parallelism.
We can consider using prefetch to speed up dependent transactions that
can't be parallelized.
Makes sense.
On 08/07/2025 2:51 pm, Amit Kapila wrote:
On Tue, Jul 8, 2025 at 12:06 PM Konstantin Knizhnik <knizhnik@garret.ru> wrote:
It is possible to enforce parallel apply of short
transactions using `debug_logical_replication_streaming` but then
performance is ~2x times slower than in case of sequential apply by
single worker.
What is the reason of such a large slow down? Is it because the amount
of network transfer has increased without giving any significant
advantage because of the serialization of commits?
It is not directly related to the subject, but I do not understand this code:
```
/*
 * Stop the worker if there are enough workers in the pool.
 *
 * XXX Additionally, we also stop the worker if the leader apply worker
 * serialize part of the transaction data due to a send timeout. This is
 * because the message could be partially written to the queue and there
 * is no way to clean the queue other than resending the message until it
 * succeeds. Instead of trying to send the data which anyway would have
 * been serialized and then letting the parallel apply worker deal with
 * the spurious message, we stop the worker.
 */
if (winfo->serialize_changes ||
    list_length(ParallelApplyWorkerPool) >
    (max_parallel_apply_workers_per_subscription / 2))
{
    logicalrep_pa_worker_stop(winfo);
    pa_free_worker_info(winfo);
    return;
}
```
It stops the worker if the number of workers in the pool is more than half of
`max_parallel_apply_workers_per_subscription`.
What I see is that `pa_launch_parallel_worker` spawns a new worker and
after completion of the transaction it is immediately terminated.
This leads to an awful slowdown of the apply process.
If I just disable this check, so that all
`max_parallel_apply_workers_per_subscription` workers are actually used for
applying transactions, then the time of parallel apply with 4 workers is 6
minutes compared with 10 minutes for applying all transactions by the main
worker. It is still not such a large improvement, but at least it is an
improvement and not a degradation.
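To make the effect of the quoted check concrete, here is a small model of it. It is only a sketch: `should_stop_worker` and `workers_retained` are hypothetical helpers that mirror the quoted condition, and the model assumes that a finishing worker is still counted in the pool when the check runs and is removed from the pool when it is stopped.
```c
#include <stdbool.h>

/* Mirrors the quoted condition: stop if changes were serialized or the pool is "full enough". */
bool
should_stop_worker(bool serialize_changes, int pool_size, int max_workers)
{
    return serialize_changes || pool_size > (max_workers / 2);
}

/*
 * Start with `launched` workers in the pool and let each finish its
 * transaction once (no serialized changes). Returns how many workers
 * remain in the pool afterwards.
 */
int
workers_retained(int launched, int max_workers)
{
    int pool_size = launched;

    for (int i = 0; i < launched; i++)
    {
        if (should_stop_worker(false, pool_size, max_workers))
            pool_size--;        /* worker is stopped and leaves the pool */
    }
    return pool_size;
}
```
Under these assumptions, with `max_parallel_apply_workers_per_subscription = 4` and four launched workers, `workers_retained(4, 4)` evaluates to 2: every worker beyond half of the limit is stopped as soon as it finishes and has to be launched again for the next transaction.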
On Fri, Jul 11, 2025 at 7:49 PM Konstantin Knizhnik <knizhnik@garret.ru> wrote:
It stops worker if number of workers in pool is more than half of
`max_parallel_apply_workers_per_subscription`.
What I see is that `pa_launch_parallel_worker` spawns new workers and
after completion of transaction it is immediately terminated.
Actually this leads to awful slowdown of apply process.
I didn't understand your scenario. pa_launch_parallel_worker() should
spawn a new worker only if all the workers in the pool are busy, and
then it will free the worker if the pool already has enough workers.
So, do you mean to say that the workers in the pool are always busy in
your workload, which leads to the spawn/exit of new workers? Can you please
explain your scenario in some more detail?
--
With Regards,
Amit Kapila.
On Tue, Jul 8, 2025 at 12:06 PM Konstantin Knizhnik <knizhnik@garret.ru> wrote:
There is a well-known Postgres problem: a logical replication subscriber
cannot catch up with the publisher simply because LR changes are applied by
a single worker, while at the publisher changes are made by
multiple concurrent backends.
BTW, do you know how users deal with this lag? For example, one can
imagine creating multiple pub-sub pairs for different sets of tables
so that the workload on the subscriber could also be shared by
multiple apply workers. I can also think of splitting the workload
among multiple pub-sub pairs by using row filters.
--
With Regards,
Amit Kapila.
On 13/07/2025 9:28 am, Amit Kapila wrote:
I didn't understand your scenario. pa_launch_parallel_worker() should
spawn a new worker only if all the workers in the pool are busy, and
then it will free the worker if the pool already has enough workers.
So, do you mean to say that the workers in the pool are always busy in
your workload, which leads to the spawn/exit of new workers? Can you please
explain your scenario in some more detail?
The current LR apply logic does not work well for applying small OLTP
transactions.
First of all, by default the reorder buffer at the publisher buffers them and
so prevents parallel apply at the subscriber.
The publisher switches to streaming mode only if a transaction is too large or
`debug_logical_replication_streaming=immediate`.
But even if we force the publisher to stream short transactions, the subscriber
will try to launch a new parallel apply worker for each transaction (if
all existing workers are busy).
If there are 100 active backends at the publisher, then the subscriber will try
to launch 100 parallel apply workers.
Most likely this fails because of the limit on the maximal number of workers. In
this case the leader will serialize such transactions.
So if there are 100 streamed transactions and 10 parallel apply workers,
then 10 transactions are started in parallel and 90 will be serialized
to disk.
This does not seem efficient for short transactions. It is better to
wait for some time until some of the workers become vacant.
But the worst thing happens when a parallel apply worker completes its
transaction. If the number of parallel apply workers in the pool exceeds
`max_parallel_apply_workers_per_subscription / 2`,
then this parallel apply worker is terminated. So instead of having
`max_parallel_apply_workers_per_subscription` workers applying
transactions at the maximal possible speed and a leader
which distributes transactions between them and stops receiving new data
from the publisher if there is no vacant worker, we have a leader
serializing and writing transactions to disk
(and then inevitably reading them back from disk) and constantly
starting and terminating parallel apply worker processes. It leads to
awful performance.
Certainly the originally intended use case was different: parallel apply is
performed only for large transactions. The number of such transactions is
not so big, and
so there should be enough parallel apply workers in the pool to process
them. And if there are not enough workers, it is not a problem to spawn
a new one and terminate
it after completion of the transaction (because the transaction is long, the
overhead of spawning a process is not so large compared with applying a
large transaction).
But if we want to efficiently replicate an OLTP workload, then we
definitely need some other approach.
Prefetch is actually more compatible with the current implementation because
prefetch operations don't need to be grouped by transaction and can be
executed by any prefetch worker.
On 13/07/2025 1:28 pm, Amit Kapila wrote:
On Tue, Jul 8, 2025 at 12:06 PM Konstantin Knizhnik <knizhnik@garret.ru> wrote:
There is a well-known Postgres problem: a logical replication subscriber
cannot catch up with the publisher, simply because LR changes are applied by
a single worker while changes at the publisher are made by
multiple concurrent backends.

BTW, do you know how users deal with this lag? For example, one can
imagine creating multiple pub-sub pairs for different sets of tables
so that the workload on the subscriber could also be shared by
multiple apply workers. I can also think of splitting the workload
among multiple pub-sub pairs by using row filters.

Yes, I have seen users start several subscriptions/publications to
receive and apply changes in parallel.
But it cannot be considered a universal solution:
1. There are not always multiple tables (or partitions of one table)
that can be split between multiple publications.
2. It violates transactional behavior (consistency): if a transaction
updates several tables included in different publications, then because
these changes are applied independently, we can observe a state at the
replica where one table is updated and another is not. The same is true for
row filters.
3. Each walsender has to scan WAL, so with N subscriptions we
have to read and decode WAL N times.
On Tuesday, July 8, 2025 2:36 PM Konstantin Knizhnik <knizhnik@garret.ru> wrote:
There is a well-known Postgres problem: a logical replication subscriber cannot
catch up with the publisher, simply because LR changes are applied by a single
worker while changes at the publisher are made by multiple concurrent backends.
The problem is not specific to logical replication: the physical replication
stream is also handled by a single walreceiver. But for physical replication
Postgres now implements prefetch: by looking at the blocks referenced in a WAL
record it is quite easy to predict which pages will be required for redo and
prefetch them. With logical replication the situation is much more complicated.

My first idea was to implement parallel apply of transactions. But to do that we
need to track dependencies between transactions. Right now Postgres can
apply transactions in parallel, but only if they are streamed (which is done
only for large transactions), and it serializes them by commits. It is possible to
enforce parallel apply of short transactions using
`debug_logical_replication_streaming`, but then performance is ~2x
slower than sequential apply by a single worker. By removing
serialization by commits, it is possible to speed up apply 3x and make the
subscriber apply changes faster than the publisher can produce them, even with
multiple clients. But this is possible only if transactions are independent,
which can be enforced only by tracking dependencies, and that seems to be very
non-trivial and invasive.

I have not completely given up on the dependency-tracking approach, but
decided first to try a simpler solution: prefetching. It is already used for
physical replication. Certainly in the case of physical replication it is much
simpler, because each WAL record contains the list of accessed blocks.

In the case of logical replication, prefetching can be done either by prefetching
access to the replica identity index (usually the primary key), or by executing
the replication command in some background worker. Certainly the first case is
much easier. We just perform an index lookup in a prefetch worker and it loads
the accessed index and heap pages into shared buffers, so the main apply worker
does not need to read anything from disk.
But it works well only for DELETE and HOT UPDATE operations.

In the second case we execute the LR command normally in a background
worker and then abort the transaction. Certainly in this case we are doing the same
work twice. But the assumption is the same: parallel prefetch workers should load
the affected pages, speeding up the work of the main apply worker.

I have implemented a PoC (see attached patch) and got first results on the
efficiency of such prefetching.

*** First scenario (update-only).

Publisher:
```
create table t(pk integer primary key, counter integer, filler text default repeat('x', 1000)) with (fillfactor=10);
insert into t values (generate_series(1,100000), 0);
create publication pub1 for table t;
```

Subscriber:
```
create table t(pk integer primary key, counter integer, filler text default repeat('x', 1000)) with (fillfactor=10);
create subscription sub1 connection 'port=54321 dbname=postgres' publication pub1;
```

Then I wait until replication is synced, stop the subscriber and do random dot
updates in 10 sessions at the publisher:
```
pgbench -T 100 -c 10 -M prepared -n -f update.sql -p 54321 -d postgres
```

where update.sql is:
```
\set pk random(1, 100000)
update t set counter=counter+1 where pk=:pk;
```

Then I start the subscriber and measure how much time it needs to catch
up.

Results:

no prefetch: 2:00 min
prefetch (replica identity only): 0:55 min
prefetch (all): 1:10 min

This is definitely the best case for replica-identity-index-only prefetch
(update-only and no other indexes).
How to interpret these results?

Without prefetch, applying updates takes about two times longer at the subscriber
than performing these updates at the publisher.
It means that under a heavy workload the subscriber has no chance to catch up.

With prefetching of the replica identity index, apply time is even smaller than
the time needed to perform the updates at the publisher.
Performing the whole operation and aborting the transaction certainly adds more
overhead. But the improvement is still quite significant.

Please also notice that these results were obtained on a system with a large
amount of RAM (64GB) and a fast SSD.
With a data set that does not fit in RAM and much slower disks, the difference is
expected to be more significant.
I have tried to simulate this by adding a 0.1msec delay to pg_preadv.
With the artificial 0.1msec `preadv` delay, I got the following results:

no prefetch: 7:40 min
prefetch (replica identity only): 3:10 min
prefetch (all): 3:09 min

In this case apply takes much more time than the 100 seconds during which
updates are performed at the publisher. Prefetch can improve speed about two
times, but it doesn't allow the subscriber to catch up.

*** Second scenario: inserts with secondary random key.

Publisher:
```
create table t(pk serial primary key, sk integer, counter integer default 0);
insert into t (sk) select random()*10000000 from generate_series(1,10000000);
create index on t(sk);
create publication pub1 for table t;
```

Subscriber:
```
create table t(pk integer primary key, sk integer, counter integer);
create index on t(sk);
create subscription sub1 connection 'port=54321 dbname=postgres' publication pub1;
```

Workload:
```
pgbench -T 100 -c 10 -M prepared -n -f insert.sql -p 54321 -d postgres
```

where insert.sql is:
```
INSERT INTO t (sk) VALUES (random()*10000000);
```

Results (with 0.1msec delay) are the following:

no prefetch: 10:10 min
prefetch (identity): 8:25 min
prefetch (full): 5:50 min

Here, as expected, prefetching only the primary key doesn't provide a big
improvement. But replaying the insert command in a prefetch worker allows us to
speed up apply almost twice.

Please notice that this approach requires minimal changes in Postgres,
because all the infrastructure of parallel apply workers is already present and we
can reuse the same apply code (with minimal changes) for performing prefetch.
I only had to introduce extra tuple lock types (no-lock and try-lock) to minimize
overhead and lock conflicts between prefetch and main apply workers. Still, it
cannot completely prevent lock conflicts and deadlocks in prefetch workers.
It looks like more work is needed here. Also I set `wal_level=minimal` in
prefetch workers to avoid WAL-logging overhead.

The number of prefetch workers is specified by the
`max_parallel_prefetch_workers_per_subscription` GUC. If it is zero
(the default) then no prefetching is performed.
Prefetch mode is controlled by the `prefetch_replica_identity_only` GUC. By
default it is true, which makes prefetch efficient for HOT updates, deletes or
inserts into a table with just one index (the primary key).

Attached please find the patch and two shell scripts used to produce these test
results.
Also it may be more convenient to inspect this patch as a PR:
https://github.com/knizhnik/postgres/pull/3

I wonder if such an LR prefetching approach is considered useful?
Or is it better to investigate other ways to improve LR apply speed (parallel
apply)?

Thank you for the proposal! I find it to be a very interesting feature.
I tested the patch you shared in your original email and encountered potential
deadlocks when testing a pgbench TPC-B-like workload. Could you please provide an
updated patch version so that I can conduct further performance experiments?
Additionally, I was also exploring ways to improve performance and have tried an
alternative version of prefetch for experimentation. The alternative design is
that we assign each non-streaming transaction to a parallel apply worker, while
strictly maintaining the order of commits. During parallel apply, if the
transactions that need to be committed before the current transaction are not
yet finished, the worker performs prefetch operations. Specifically, for
updates and deletes, the worker finds and caches the target local tuple to be
updated/deleted. Once all preceding transactions are committed, the parallel
apply worker uses these cached tuples to execute the actual updates or deletes.
What do you think about this alternative? I think the alternative might offer
more stability in scenarios where shared buffer eviction occurs frequently,
and it avoids leaving dead tuples in the buffer. However, it also presents some
drawbacks, such as the need to add wait events to maintain commit order,
compared to the approach discussed in this thread.
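The commit-ordering part of this alternative can be illustrated with a minimal sketch. It assumes C11 atomics in place of PostgreSQL's `pg_atomic_uint64`, and the function names are hypothetical; they only loosely follow the `pa_transaction_committable()`, `pa_transaction_committed()` and `pa_advance_committable_seq_num()` helpers in the attached POC patch. The actual apply and prefetch work is left out.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Sequence number of the next transaction that is allowed to commit. */
static _Atomic uint64_t committable_seq_num = 0;

/* True if all transactions before "seq" have committed. */
static bool
xact_committable(uint64_t seq)
{
    return atomic_load(&committable_seq_num) == seq;
}

/* True if the transaction with sequence number "seq" has committed. */
static bool
xact_committed(uint64_t seq)
{
    return seq < atomic_load(&committable_seq_num);
}

/*
 * One non-blocking attempt to commit in order. A false result means
 * earlier transactions are still running, so the caller can spend the
 * time on prefetching (finding and caching target tuples) and retry.
 */
static bool
try_commit_in_order(uint64_t seq)
{
    assert(!xact_committed(seq));

    if (!xact_committable(seq))
        return false;

    /* Apply the changes using the cached tuples and commit here. */

    atomic_fetch_add(&committable_seq_num, 1);
    return true;
}
```

A worker holding sequence number 1 gets `false` from `try_commit_in_order(1)` until the worker holding sequence number 0 has committed, which is the window in which it would prefetch.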
(Note that, due to time constraints, I have not implemented prefetch for
inserts, and the code is not in reviewable shape and lacks comments and
documentation; I am just sharing the POC patch for reference.)
Best Regards,
Hou zj
Attachments:
vPOC-0001-Apply-non-streaming-transactions-in-parallel-ap.patch (application/octet-stream)
From 1073207ed7838061a648b9b157f9422a08e010aa Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Wed, 11 Jun 2025 11:42:26 +0800
Subject: [PATCH vPOC] Apply non-streaming transactions in parallel apply
worker
---
src/backend/executor/execReplication.c | 39 +-
.../replication/logical/applyparallelworker.c | 427 +++++++++-
src/backend/replication/logical/proto.c | 35 +
src/backend/replication/logical/relation.c | 36 +-
src/backend/replication/logical/worker.c | 747 ++++++++++++++++--
src/include/executor/executor.h | 9 +-
src/include/replication/logicalproto.h | 3 +
src/include/replication/logicalrelation.h | 3 +
src/include/replication/worker_internal.h | 28 +-
src/test/subscription/t/010_truncate.pl | 2 +-
src/test/subscription/t/026_stats.pl | 1 +
src/test/subscription/t/027_nosuperuser.pl | 1 +
12 files changed, 1203 insertions(+), 128 deletions(-)
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 53ddd25c42d..fdf73ba264f 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -139,6 +139,12 @@ should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
{
case TM_Ok:
break;
+ case TM_SelfModified:
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("tuple to be updated was already modified in the current transaction")));
+ refetch = true;
+ break;
case TM_Updated:
/* XXX: Improve handling here */
if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
@@ -179,7 +185,8 @@ bool
RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
LockTupleMode lockmode,
TupleTableSlot *searchslot,
- TupleTableSlot *outslot)
+ TupleTableSlot *outslot,
+ bool locktuple)
{
ScanKeyData skey[INDEX_MAX_KEYS];
int skey_attoff;
@@ -246,7 +253,7 @@ retry:
}
/* Found tuple, try to lock it in the lockmode. */
- if (found)
+ if (found && locktuple)
{
TM_FailureData tmfd;
TM_Result res;
@@ -353,7 +360,8 @@ tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
*/
bool
RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
- TupleTableSlot *searchslot, TupleTableSlot *outslot)
+ TupleTableSlot *searchslot, TupleTableSlot *outslot,
+ bool locktuple)
{
TupleTableSlot *scanslot;
TableScanDesc scan;
@@ -404,7 +412,7 @@ retry:
}
/* Found tuple, try to lock it in the lockmode. */
- if (found)
+ if (found && locktuple)
{
TM_FailureData tmfd;
TM_Result res;
@@ -431,6 +439,29 @@ retry:
return found;
}
+bool
+RelationLockTuple(Relation rel, LockTupleMode lockmode,
+ TupleTableSlot *searchslot, TupleTableSlot *outslot,
+ CommandId cid)
+{
+ TM_FailureData tmfd;
+ TM_Result res;
+
+ PushActiveSnapshot(GetLatestSnapshot());
+
+ res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
+ outslot,
+ cid,
+ lockmode,
+ LockWaitBlock,
+ 0 /* don't follow updates */ ,
+ &tmfd);
+
+ PopActiveSnapshot();
+
+ return !should_refetch_tuple(res, &tmfd);
+}
+
/*
* Build additional index information necessary for conflict detection.
*/
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index d25085d3515..0dd001e3623 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -209,6 +209,8 @@
#define PARALLEL_APPLY_LOCK_STREAM 0
#define PARALLEL_APPLY_LOCK_XACT 1
+#define PARALLEL_APPLY_INIT_RELATION 'r'
+
/*
* Hash table entry to map xid to the parallel apply worker state.
*/
@@ -216,8 +218,14 @@ typedef struct ParallelApplyWorkerEntry
{
TransactionId xid; /* Hash key -- must be first */
ParallelApplyWorkerInfo *winfo;
+ XLogRecPtr local_end;
} ParallelApplyWorkerEntry;
+typedef struct ParallelApplyCommitSeq
+{
+ pg_atomic_uint64 committable_seq_num;
+} ParallelApplyCommitSeq;
+
/*
* A hash table used to cache the state of streaming transactions being applied
* by the parallel apply workers.
@@ -254,9 +262,18 @@ static ParallelApplyWorkerInfo *stream_apply_worker = NULL;
/* A list to maintain subtransactions, if any. */
static List *subxactlist = NIL;
+static dsm_segment *commit_seq_seg = NULL;
+static ParallelApplyCommitSeq *pa_commit_seq = NULL;
+
+static TransactionId last_parallelized_xid = InvalidTransactionId;
+static uint64 last_parallelized_commit_seq = 0;
+
+static dsm_handle pa_get_dsm_for_commit_seq(void);
static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo);
+static bool pa_transaction_committed(uint64 commit_seq_num);
static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared);
static PartialFileSetState pa_get_fileset_state(void);
+static void pa_init_relmap_cache(StringInfo s);
/*
* Returns true if it is OK to start a parallel apply worker, false otherwise.
@@ -334,6 +351,12 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
shm_mq *mq;
Size queue_size = DSM_QUEUE_SIZE;
Size error_queue_size = DSM_ERROR_QUEUE_SIZE;
+ dsm_handle commit_seq_handle;
+
+ commit_seq_handle = pa_get_dsm_for_commit_seq();
+
+ if (commit_seq_handle == DSM_HANDLE_INVALID)
+ return false;
/*
* Estimate how much shared memory we need.
@@ -364,11 +387,12 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
/* Set up the header region. */
shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared));
SpinLockInit(&shared->mutex);
-
+ shared->xid = InvalidTransactionId;
shared->xact_state = PARALLEL_TRANS_UNKNOWN;
pg_atomic_init_u32(&(shared->pending_stream_count), 0);
shared->last_commit_end = InvalidXLogRecPtr;
shared->fileset_state = FS_EMPTY;
+ shared->commit_seq_handle = commit_seq_handle;
shm_toc_insert(toc, PARALLEL_APPLY_KEY_SHARED, shared);
@@ -396,6 +420,24 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
return true;
}
+static dsm_handle
+pa_get_dsm_for_commit_seq(void)
+{
+ if (commit_seq_seg)
+ return dsm_segment_handle(commit_seq_seg);
+
+ /* Create the shared memory segment and establish a table of contents. */
+ commit_seq_seg = dsm_create(sizeof(ParallelApplyCommitSeq), 0);
+ if (!commit_seq_seg)
+ return DSM_HANDLE_INVALID;
+
+ pa_commit_seq = (ParallelApplyCommitSeq *) dsm_segment_address(commit_seq_seg);
+
+ pg_atomic_init_u64(&(pa_commit_seq->committable_seq_num), 0);
+
+ return dsm_segment_handle(commit_seq_seg);
+}
+
/*
* Try to get a parallel apply worker from the pool. If none is available then
* start a new one.
@@ -413,10 +455,29 @@ pa_launch_parallel_worker(void)
{
winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
- if (!winfo->in_use)
+ if (!winfo->stream_txn &&
+ pa_transaction_committed(winfo->shared->commit_seq_num))
+ {
+ /*
+ * Save the local commit lsn of the last transaction that was
+ * applied by this worker. We need to collect this info to
+ * determine the flush position to reply to the publisher (See
+ * get_flush_position()).
+ */
+ (void) pa_get_last_commit_end(winfo->shared->xid, false, NULL);
+ return winfo;
+ }
+
+ if (winfo->stream_txn && !winfo->in_use)
return winfo;
}
+ pa_get_dsm_for_commit_seq();
+
+ if (list_length(ParallelApplyWorkerPool) ==
+ max_parallel_apply_workers_per_subscription)
+ return NULL;
+
/*
* Start a new parallel apply worker.
*
@@ -445,10 +506,23 @@ pa_launch_parallel_worker(void)
if (launched)
{
+ StringInfoData out;
+
+ elog(DEBUG1, "started new pa worker");
ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo);
+
+ initStringInfo(&out);
+ appendStringInfoChar(&out, PARALLEL_APPLY_INIT_RELATION);
+ logicalrep_write_all_rels(&out);
+
+ if (out.len > 1)
+ pa_send_data(winfo, out.len, out.data);
+
+ pfree(out.data);
}
else
{
+ elog(DEBUG1, "failed to start pa worker");
pa_free_worker_info(winfo);
winfo = NULL;
}
@@ -467,7 +541,7 @@ pa_launch_parallel_worker(void)
* streaming changes.
*/
void
-pa_allocate_worker(TransactionId xid)
+pa_allocate_worker(TransactionId xid, bool stream_txn)
{
bool found;
ParallelApplyWorkerInfo *winfo = NULL;
@@ -504,11 +578,21 @@ pa_allocate_worker(TransactionId xid)
SpinLockAcquire(&winfo->shared->mutex);
winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN;
winfo->shared->xid = xid;
+ winfo->shared->commit_seq_num = last_parallelized_commit_seq;
+ winfo->shared->wait_for_xid = last_parallelized_xid;
SpinLockRelease(&winfo->shared->mutex);
winfo->in_use = true;
winfo->serialize_changes = false;
+ winfo->stream_txn = stream_txn;
entry->winfo = winfo;
+ entry->local_end = InvalidXLogRecPtr;
+
+ if (!stream_txn)
+ {
+ last_parallelized_xid = xid;
+ last_parallelized_commit_seq++;
+ }
}
/*
@@ -542,6 +626,57 @@ pa_find_worker(TransactionId xid)
return NULL;
}
+XLogRecPtr
+pa_get_last_commit_end(TransactionId xid, bool delete_entry, bool *skipped_write)
+{
+ bool found;
+ ParallelApplyWorkerEntry *entry;
+ ParallelApplyWorkerInfo *winfo;
+
+ Assert(TransactionIdIsValid(xid));
+
+ if (skipped_write)
+ *skipped_write = false;
+
+ /* Find an entry for the requested transaction. */
+ entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
+
+ if (!found)
+ return InvalidXLogRecPtr;
+
+ winfo = entry->winfo;
+
+ if (winfo == NULL)
+ {
+ if (delete_entry &&
+ !hash_search(ParallelApplyTxnHash, &xid, HASH_REMOVE, NULL))
+ elog(ERROR, "hash table corrupted");
+
+ if (skipped_write)
+ *skipped_write = XLogRecPtrIsInvalid(entry->local_end);
+
+ return entry->local_end;
+ }
+
+ if (!pa_transaction_committed(winfo->shared->commit_seq_num))
+ return InvalidXLogRecPtr;
+
+ entry->local_end = winfo->shared->last_commit_end;
+ entry->winfo = NULL;
+
+ if (skipped_write)
+ *skipped_write = XLogRecPtrIsInvalid(entry->local_end);
+
+ elog(DEBUG1, "store local commit %X/%X end to txn entry: %u",
+ LSN_FORMAT_ARGS(entry->local_end), xid);
+
+ if (delete_entry &&
+ !hash_search(ParallelApplyTxnHash, &xid, HASH_REMOVE, NULL))
+ elog(ERROR, "hash table corrupted");
+
+ return entry->local_end;
+}
+
/*
* Makes the worker available for reuse.
*
@@ -557,7 +692,8 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo)
{
Assert(!am_parallel_apply_worker());
Assert(winfo->in_use);
- Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);
+ Assert(!winfo->stream_txn ||
+ pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);
if (!hash_search(ParallelApplyTxnHash, &winfo->shared->xid, HASH_REMOVE, NULL))
elog(ERROR, "hash table corrupted");
@@ -573,9 +709,7 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo)
* been serialized and then letting the parallel apply worker deal with
* the spurious message, we stop the worker.
*/
- if (winfo->serialize_changes ||
- list_length(ParallelApplyWorkerPool) >
- (max_parallel_apply_workers_per_subscription / 2))
+ if (winfo->serialize_changes)
{
logicalrep_pa_worker_stop(winfo);
pa_free_worker_info(winfo);
@@ -705,6 +839,106 @@ pa_process_spooled_messages_if_required(void)
return true;
}
+void
+pa_advance_committable_seq_num(void)
+{
+ Assert(am_parallel_apply_worker());
+ pg_atomic_add_fetch_u64(&pa_commit_seq->committable_seq_num, 1);
+}
+
+static bool
+pa_transaction_committable(uint64 commit_seq_num)
+{
+ uint64 shared_seq_num;
+
+ shared_seq_num = pg_atomic_read_u64(&pa_commit_seq->committable_seq_num);
+
+ Assert(shared_seq_num <= commit_seq_num);
+
+ return commit_seq_num == shared_seq_num;
+}
+
+static bool
+pa_transaction_committed(uint64 commit_seq_num)
+{
+ return commit_seq_num <
+ pg_atomic_read_u64(&pa_commit_seq->committable_seq_num);
+}
+
+static void
+pa_wait_for_transaction(TransactionId wait_for_xid, uint64 commit_seq_num)
+{
+ int count = 0;
+
+ if (!TransactionIdIsValid(wait_for_xid) || commit_seq_num == 0)
+ return;
+
+ elog(DEBUG1, "plan to wait for commit seq: %lld, remote_xid %u to finish",
+ (long long int) commit_seq_num, wait_for_xid);
+
+ for (;;)
+ {
+ if (pa_transaction_committable(commit_seq_num))
+ break;
+
+ if (count > 100000)
+ {
+ pa_lock_transaction(wait_for_xid, AccessShareLock);
+ pa_unlock_transaction(wait_for_xid, AccessShareLock);
+ }
+ else
+ count++;
+
+ /* An interrupt may have occurred while we were waiting. */
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ elog(DEBUG1, "finished wait for commit seq: %lld, remote_xid %u to finish",
+ (long long int) commit_seq_num, wait_for_xid);
+}
+
+void
+pa_wait_until_committable(void)
+{
+ TransactionId wait_for_xid = InvalidTransactionId;
+ uint64 commit_seq_num = 0;
+
+ if (am_parallel_apply_worker())
+ {
+ SpinLockAcquire(&MyParallelShared->mutex);
+ wait_for_xid = MyParallelShared->wait_for_xid;
+ commit_seq_num = MyParallelShared->commit_seq_num;
+ SpinLockRelease(&MyParallelShared->mutex);
+ }
+ else
+ {
+ wait_for_xid = last_parallelized_xid;
+ commit_seq_num = last_parallelized_commit_seq;
+ }
+
+ pa_wait_for_transaction(wait_for_xid, commit_seq_num);
+}
+
+bool
+pa_cur_transaction_committable(void)
+{
+ uint64 commit_seq_num = 0;
+
+ if (am_parallel_apply_worker())
+ {
+ SpinLockAcquire(&MyParallelShared->mutex);
+ commit_seq_num = MyParallelShared->commit_seq_num;
+ SpinLockRelease(&MyParallelShared->mutex);
+ }
+ else
+ {
+ commit_seq_num = last_parallelized_commit_seq;
+ }
+
+ return pa_transaction_committable(commit_seq_num);
+}
+
+
/*
* Interrupt handler for main loop of parallel apply worker.
*/
@@ -745,6 +979,10 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
"ApplyMessageContext",
ALLOCSET_DEFAULT_SIZES);
+ ApplyBufferContext = AllocSetContextCreate(ApplyContext,
+ "ApplyBufferContext",
+ ALLOCSET_DEFAULT_SIZES);
+
/*
* Push apply error context callback. Fields will be filled while applying
* a change.
@@ -775,26 +1013,35 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
initReadOnlyStringInfo(&s, data, len);
- /*
- * The first byte of messages sent from leader apply worker to
- * parallel apply workers can only be 'w'.
- */
c = pq_getmsgbyte(&s);
- if (c != 'w')
- elog(ERROR, "unexpected message \"%c\"", c);
-
- /*
- * Ignore statistics fields that have been updated by the leader
- * apply worker.
- *
- * XXX We can avoid sending the statistics fields from the leader
- * apply worker but for that, it needs to rebuild the entire
- * message by removing these fields which could be more work than
- * simply ignoring these fields in the parallel apply worker.
- */
- s.cursor += SIZE_STATS_MESSAGE;
+ if (c == 'w')
+ {
+ /*
+ * Ignore statistics fields that have been updated by the
+ * leader apply worker.
+ *
+ * XXX We can avoid sending the statistics fields from the
+ * leader apply worker but for that, it needs to rebuild the
+ * entire message by removing these fields which could be more
+ * work than simply ignoring these fields in the parallel apply
+ * worker.
+ */
+ s.cursor += SIZE_STATS_MESSAGE;
- apply_dispatch(&s);
+ apply_dispatch(&s);
+ }
+ else if (c == PARALLEL_APPLY_INIT_RELATION)
+ {
+ pa_init_relmap_cache(&s);
+ }
+ else
+ {
+ /*
+ * The first byte of messages sent from leader apply worker to
+ * parallel apply workers can only be 'w' or 'r'.
+ */
+ elog(ERROR, "unexpected message \"%c\"", c);
+ }
}
else if (shmq_res == SHM_MQ_WOULD_BLOCK)
{
@@ -811,6 +1058,9 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
if (rc & WL_LATCH_SET)
ResetLatch(MyLatch);
+
+ if (!IsTransactionState())
+ pgstat_report_stat(true);
}
}
else
@@ -848,6 +1098,7 @@ pa_shutdown(int code, Datum arg)
INVALID_PROC_NUMBER);
dsm_detach((dsm_segment *) DatumGetPointer(arg));
+ dsm_detach(commit_seq_seg);
}
/*
@@ -913,6 +1164,18 @@ ParallelApplyWorkerMain(Datum main_arg)
*/
logicalrep_worker_attach(worker_slot);
+ /*
+ * Attach to the dynamic shared memory segment for the parallel apply
+ * commit sequence.
+ */
+ commit_seq_seg = dsm_attach(MyParallelShared->commit_seq_handle);
+ if (!commit_seq_seg)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not map dynamic shared memory segment")));
+
+ pa_commit_seq = dsm_segment_address(commit_seq_seg);
+
/*
* Register the shutdown callback after we are attached to the worker
* slot. This is to ensure that MyLogicalRepWorker remains valid when this
@@ -1149,7 +1412,6 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
shm_mq_result result;
TimestampTz startTime = 0;
- Assert(!IsTransactionState());
Assert(!winfo->serialize_changes);
/*
@@ -1201,6 +1463,51 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
}
}
+void
+pa_distribute_schema_changes_to_workers(LogicalRepRelation *rel)
+{
+ List *workers_stopped = NIL;
+ StringInfoData out;
+
+ if (!ParallelApplyWorkerPool)
+ return;
+
+ initStringInfo(&out);
+ appendStringInfoChar(&out, PARALLEL_APPLY_INIT_RELATION);
+ logicalrep_write_remote_rel(&out, rel);
+
+ foreach_ptr(ParallelApplyWorkerInfo, winfo, ParallelApplyWorkerPool)
+ {
+ if (winfo == stream_apply_worker)
+ continue;
+
+ if (winfo->serialize_changes)
+ continue;
+
+ elog(DEBUG1, "distributing schema changes to pa workers");
+
+ if (pa_send_data(winfo, out.len, out.data))
+ continue;
+
+ elog(DEBUG1, "failed to distribute, will stop that worker instead");
+
+ /* cannot distribute to this worker, stop this worker */
+ pa_wait_for_transaction(winfo->shared->xid,
+ winfo->shared->commit_seq_num + 1);
+
+ pa_get_last_commit_end(winfo->shared->xid, false, NULL);
+
+ logicalrep_pa_worker_stop(winfo);
+
+ workers_stopped = lappend(workers_stopped, winfo);
+ }
+
+ pfree(out.data);
+
+ foreach_ptr(ParallelApplyWorkerInfo, winfo, workers_stopped)
+ pa_free_worker_info(winfo);
+}
+
/*
* Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means
* that the current data and any subsequent data for this transaction will be
@@ -1282,9 +1589,9 @@ pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
pa_wait_for_xact_state(winfo, PARALLEL_TRANS_STARTED);
/*
- * Wait for the transaction lock to be released. This is required to
- * detect deadlock among leader and parallel apply workers. Refer to the
- * comments atop this file.
+ * Wait for the transaction lock to be released. This is required to detect
+ * deadlock among leader and parallel apply. Refer to the comments atop
+ * this file.
*/
pa_lock_transaction(winfo->shared->xid, AccessShareLock);
pa_unlock_transaction(winfo->shared->xid, AccessShareLock);
@@ -1298,6 +1605,7 @@ pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("lost connection to the logical replication parallel apply worker")));
+
}
/*
@@ -1361,6 +1669,9 @@ pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
void
pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
{
+ if (!TransactionIdIsValid(top_xid))
+ return;
+
if (current_xid != top_xid &&
!list_member_xid(subxactlist, current_xid))
{
@@ -1617,23 +1928,63 @@ pa_decr_and_wait_stream_block(void)
void
pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
{
- Assert(am_leader_apply_worker());
+ XLogRecPtr local_lsn = InvalidXLogRecPtr;
+ TransactionId pa_remote_xid = winfo->shared->xid;
- /*
- * Unlock the shared object lock so that parallel apply worker can
- * continue to receive and apply changes.
- */
- pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
+ Assert(am_leader_apply_worker());
/*
+ * Unlock the shared object lock taken for transactions so that parallel
+ * apply worker can continue to receive and apply changes.
+ *
* Wait for that worker to finish. This is necessary to maintain commit
* order which avoids failures due to transaction dependencies and
* deadlocks.
*/
- pa_wait_for_xact_finish(winfo);
+ if (winfo->serialize_changes || winfo->stream_txn)
+ {
+ pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
+ pa_wait_for_xact_finish(winfo);
+
+ local_lsn = winfo->shared->last_commit_end;
+ pa_remote_xid = InvalidTransactionId;
+
+ pa_free_worker(winfo);
+ }
if (!XLogRecPtrIsInvalid(remote_lsn))
- store_flush_position(remote_lsn, winfo->shared->last_commit_end);
+ store_flush_position(remote_lsn, local_lsn, pa_remote_xid);
+
+ pa_set_stream_apply_worker(NULL);
+}
+
+static void
+pa_init_relmap_cache(StringInfo s)
+{
+ for (;;)
+ {
+ LogicalRepRelation *rel = logicalrep_read_rel(s);
+
+ logicalrep_relmap_update(rel);
+
+ elog(DEBUG1, "pa worker init relmap for %s", rel->relname);
+
+ if (s->cursor == s->len)
+ break;
+ }
+}
+
+void
+pa_update_commit_seq(ParallelApplyWorkerInfo *winfo)
+{
+ Assert(am_leader_apply_worker());
+ Assert(winfo->stream_txn);
+
+ SpinLockAcquire(&winfo->shared->mutex);
+ winfo->shared->commit_seq_num = last_parallelized_commit_seq;
+ winfo->shared->wait_for_xid = last_parallelized_xid;
+ SpinLockRelease(&winfo->shared->mutex);
- pa_free_worker(winfo);
+ elog(DEBUG1, "updated wait event for pa worker, seq: %lld, wait xid: %u",
+ (long long int) last_parallelized_commit_seq, last_parallelized_xid);
}
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 1a352b542dc..c072c44795f 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -691,6 +691,41 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel,
logicalrep_write_attrs(out, rel, columns, include_gencols_type);
}
+void
+logicalrep_write_remote_rel(StringInfo out, LogicalRepRelation *rel)
+{
+ pq_sendint32(out, rel->remoteid);
+
+ /* Write relation name */
+ pq_sendstring(out, rel->nspname);
+ pq_sendstring(out, rel->relname);
+
+ /* Write the replica identity. */
+ pq_sendbyte(out, rel->replident);
+
+ /* Write attribute description */
+ pq_sendint16(out, rel->natts);
+
+ for (int i = 0; i < rel->natts; i++)
+ {
+ uint8 flags = 0;
+
+ if (bms_is_member(i, rel->attkeys))
+ flags |= LOGICALREP_IS_REPLICA_IDENTITY;
+
+ pq_sendbyte(out, flags);
+
+ /* attribute name */
+ pq_sendstring(out, rel->attnames[i]);
+
+ /* attribute type id */
+ pq_sendint32(out, rel->atttyps[i]);
+
+ /* ignore attribute mode for now */
+ pq_sendint32(out, 0);
+ }
+}
+
/*
* Read the relation info from stream and return as LogicalRepRelation.
*/
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index f59046ad620..35392cf9aac 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -133,6 +133,9 @@ logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry)
{
LogicalRepRelation *remoterel;
+ if (entry->refcount)
+ elog(ERROR, "cannot free the entry when it is still being used");
+
remoterel = &entry->remoterel;
pfree(remoterel->nspname);
@@ -366,6 +369,13 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
remoterel = &entry->remoterel;
/* Ensure we don't leak a relcache refcount. */
+ if (entry->refcount > 0)
+ {
+ Assert(entry->localrel);
+ entry->refcount++;
+ return entry;
+ }
+
if (entry->localrel)
elog(ERROR, "remote relation ID %u is already open", remoteid);
@@ -494,6 +504,8 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
entry->localreloid,
&entry->statelsn);
+ entry->refcount++;
+
return entry;
}
@@ -503,8 +515,13 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
void
logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
{
- table_close(rel->localrel, lockmode);
- rel->localrel = NULL;
+ rel->refcount--;
+
+ if (!rel->refcount)
+ {
+ table_close(rel->localrel, lockmode);
+ rel->localrel = NULL;
+ }
}
/*
@@ -946,3 +963,18 @@ FindLogicalRepLocalIndex(Relation localrel, LogicalRepRelation *remoterel,
return InvalidOid;
}
+
+void
+logicalrep_write_all_rels(StringInfo out)
+{
+ LogicalRepRelMapEntry *entry;
+ HASH_SEQ_STATUS status;
+
+ if (LogicalRepRelMap == NULL)
+ return;
+
+ hash_seq_init(&status, LogicalRepRelMap);
+
+ while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
+ logicalrep_write_remote_rel(out, &entry->remoterel);
+}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index c5fb627aa56..9130f893787 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -193,6 +193,7 @@ typedef struct FlushPosition
dlist_node node;
XLogRecPtr local_end;
XLogRecPtr remote_end;
+ TransactionId pa_remote_xid;
} FlushPosition;
static dlist_head lsn_mapping = DLIST_STATIC_INIT(lsn_mapping);
@@ -209,6 +210,19 @@ typedef struct ApplyExecutionData
PartitionTupleRouting *proute; /* partition routing info */
} ApplyExecutionData;
+typedef struct ApplyBufferChange
+{
+ LogicalRepMsgType action;
+ ApplyExecutionData *edata;
+ TupleTableSlot *old_slot;
+ TupleTableSlot *new_slot;
+ TupleTableSlot *local_slot;
+ LogicalRepTupleData *tuple;
+} ApplyBufferChange;
+
+static bool buffering_changes = false;
+static List *buffered_changes = NIL;
+
/* Struct for saving and restoring apply errcontext information */
typedef struct ApplyErrorCallbackArg
{
@@ -283,6 +297,7 @@ ErrorContextCallback *apply_error_context_stack = NULL;
MemoryContext ApplyMessageContext = NULL;
MemoryContext ApplyContext = NULL;
+MemoryContext ApplyBufferContext = NULL;
/* per stream context for streaming transactions */
static MemoryContext LogicalStreamingContext = NULL;
@@ -296,6 +311,7 @@ static List *on_commit_wakeup_workers_subids = NIL;
bool in_remote_transaction = false;
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+static TransactionId remote_xid = InvalidTransactionId;
/* fields valid only when processing streamed transaction */
static bool in_streamed_transaction = false;
@@ -364,11 +380,7 @@ static inline void cleanup_subxact_info(void);
/*
* Serialize and deserialize changes for a toplevel transaction.
*/
-static void stream_open_file(Oid subid, TransactionId xid,
- bool first_segment);
static void stream_write_change(char action, StringInfo s);
-static void stream_open_and_write_change(TransactionId xid, char action, StringInfo s);
-static void stream_close_file(void);
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
@@ -380,11 +392,13 @@ static void apply_handle_update_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
TupleTableSlot *remoteslot,
LogicalRepTupleData *newtup,
- Oid localindexoid);
+ Oid localindexoid,
+ TupleTableSlot *bufferedlocalslot);
static void apply_handle_delete_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
TupleTableSlot *remoteslot,
- Oid localindexoid);
+ Oid localindexoid,
+ TupleTableSlot *bufferedlocalslot);
static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
LogicalRepRelation *remoterel,
Oid localidxoid,
@@ -394,6 +408,14 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata,
TupleTableSlot *remoteslot,
LogicalRepTupleData *newtup,
CmdType operation);
+static void apply_buffered_changes(void);
+static LogicalRepTupleData *apply_buffer_copy_tuple(LogicalRepTupleData *tupleData);
+static void apply_buffer_add_change(LogicalRepMsgType action,
+ ApplyExecutionData *edata,
+ TupleTableSlot *old_slot,
+ TupleTableSlot *new_slot,
+ LogicalRepTupleData *tuple);
+static void apply_buffer_store_local_slot(TupleTableSlot *local_slot);
/* Functions for skipping changes */
static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
@@ -508,7 +530,8 @@ begin_replication_step(void)
maybe_reread_subscription();
}
- PushActiveSnapshot(GetTransactionSnapshot());
+ if (!buffering_changes || !ActiveSnapshotSet())
+ PushActiveSnapshot(GetTransactionSnapshot());
MemoryContextSwitchTo(ApplyMessageContext);
}
@@ -523,6 +546,10 @@ begin_replication_step(void)
static void
end_replication_step(void)
{
+ if (buffering_changes)
+ return;
+
+ Assert(ActiveSnapshotSet());
PopActiveSnapshot();
CommandCounterIncrement();
@@ -556,14 +583,16 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
TransApplyAction apply_action;
StringInfoData original_msg;
- apply_action = get_transaction_apply_action(stream_xid, &winfo);
+ Assert(!in_streamed_transaction || TransactionIdIsValid(stream_xid));
+
+ apply_action = get_transaction_apply_action(in_streamed_transaction
+ ? stream_xid : remote_xid,
+ &winfo);
/* not in streaming mode */
if (apply_action == TRANS_LEADER_APPLY)
return false;
- Assert(TransactionIdIsValid(stream_xid));
-
/*
* The parallel apply worker needs the xid in this message to decide
* whether to define a savepoint, so save the original message that has
@@ -574,9 +603,12 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
/*
* We should have received XID of the subxact as the first part of the
- * message, so extract it.
+ * message in streaming transactions, so extract it.
*/
- current_xid = pq_getmsgint(s, 4);
+ if (in_streamed_transaction)
+ current_xid = pq_getmsgint(s, 4);
+ else
+ current_xid = remote_xid;
if (!TransactionIdIsValid(current_xid))
ereport(ERROR,
@@ -685,10 +717,17 @@ create_edata_for_relation(LogicalRepRelMapEntry *rel)
estate->es_opened_result_relations =
lappend(estate->es_opened_result_relations, resultRelInfo);
- estate->es_output_cid = GetCurrentCommandId(true);
+ if (!buffering_changes)
+ {
+ estate->es_output_cid = GetCurrentCommandId(true);
- /* Prepare to catch AFTER triggers. */
- AfterTriggerBeginQuery();
+ /* Prepare to catch AFTER triggers. */
+ AfterTriggerBeginQuery();
+ }
+ else
+ {
+ estate->es_output_cid = GetCurrentCommandId(false);
+ }
/* other fields of edata remain NULL for now */
@@ -985,17 +1024,50 @@ static void
apply_handle_begin(StringInfo s)
{
LogicalRepBeginData begin_data;
+ ParallelApplyWorkerInfo *winfo;
+ TransApplyAction apply_action;
/* There must not be an active streaming transaction. */
Assert(!TransactionIdIsValid(stream_xid));
logicalrep_read_begin(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
+
+ remote_xid = begin_data.xid;
+
+ set_apply_error_context_xact(remote_xid, begin_data.final_lsn);
remote_final_lsn = begin_data.final_lsn;
maybe_start_skipping_changes(begin_data.final_lsn);
+ pa_allocate_worker(remote_xid, false);
+
+ apply_action = get_transaction_apply_action(remote_xid, &winfo);
+
+ elog(DEBUG1, "new remote_xid %u", remote_xid);
+ switch (apply_action)
+ {
+ case TRANS_LEADER_APPLY:
+ buffering_changes = !pa_cur_transaction_committable();
+ break;
+
+ case TRANS_LEADER_SEND_TO_PARALLEL:
+ Assert(winfo);
+ pa_send_data(winfo, s->len, s->data);
+ pa_set_stream_apply_worker(winfo);
+ break;
+
+ case TRANS_PARALLEL_APPLY:
+ /* Hold the lock until the end of the transaction. */
+ pa_lock_transaction(MyParallelShared->xid, AccessExclusiveLock);
+ buffering_changes = !pa_cur_transaction_committable();
+ break;
+
+ default:
+ elog(ERROR, "unexpected apply action: %d", (int) apply_action);
+ break;
+ }
+
in_remote_transaction = true;
pgstat_report_activity(STATE_RUNNING, NULL);
@@ -1010,6 +1082,11 @@ static void
apply_handle_commit(StringInfo s)
{
LogicalRepCommitData commit_data;
+ ParallelApplyWorkerInfo *winfo;
+ TransApplyAction apply_action;
+
+ /* Save the message before it is consumed. */
+ StringInfoData original_msg = *s;
logicalrep_read_commit(s, &commit_data);
@@ -1020,7 +1097,79 @@ apply_handle_commit(StringInfo s)
LSN_FORMAT_ARGS(commit_data.commit_lsn),
LSN_FORMAT_ARGS(remote_final_lsn))));
- apply_handle_commit_internal(&commit_data);
+ apply_action = get_transaction_apply_action(remote_xid, &winfo);
+
+ switch (apply_action)
+ {
+ case TRANS_LEADER_APPLY:
+ if (buffering_changes)
+ {
+ pa_wait_until_committable();
+ apply_buffered_changes();
+ }
+
+ apply_handle_commit_internal(&commit_data);
+ break;
+
+ case TRANS_LEADER_SEND_TO_PARALLEL:
+ Assert(winfo);
+
+ if (pa_send_data(winfo, s->len, s->data))
+ {
+ /* Finish processing the transaction. */
+ pa_xact_finish(winfo, commit_data.end_lsn);
+ break;
+ }
+
+ /*
+ * Switch to serialize mode when we are not able to send the
+ * change to parallel apply worker.
+ */
+ pa_switch_to_partial_serialize(winfo, true);
+
+ /* fall through */
+ case TRANS_LEADER_PARTIAL_SERIALIZE:
+ Assert(winfo);
+
+ stream_open_and_write_change(remote_xid, LOGICAL_REP_MSG_COMMIT,
+ &original_msg);
+
+ pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
+
+ /* Finish processing the transaction. */
+ pa_xact_finish(winfo, commit_data.end_lsn);
+ break;
+
+ case TRANS_PARALLEL_APPLY:
+
+ /*
+ * If the parallel apply worker is applying spooled messages then
+ * close the file before committing.
+ */
+ if (stream_fd)
+ stream_close_file();
+
+ pa_wait_until_committable();
+ apply_buffered_changes();
+ apply_handle_commit_internal(&commit_data);
+
+ MyParallelShared->last_commit_end = XactLastCommitEnd;
+
+ pa_advance_committable_seq_num();
+ pa_unlock_transaction(remote_xid, AccessExclusiveLock);
+ break;
+
+ default:
+ elog(ERROR, "unexpected apply action: %d", (int) apply_action);
+ break;
+ }
+
+ elog(DEBUG1, "reset remote_xid %u", remote_xid);
+
+ remote_xid = InvalidTransactionId;
+ in_remote_transaction = false;
+
+ buffering_changes = false;
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables(commit_data.end_lsn);
@@ -1140,7 +1289,8 @@ apply_handle_prepare(StringInfo s)
* XactLastCommitEnd, and adding it for this purpose doesn't seems worth
* it.
*/
- store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
+ store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr,
+ InvalidTransactionId);
in_remote_transaction = false;
@@ -1185,6 +1335,8 @@ apply_handle_commit_prepared(StringInfo s)
/* There is no transaction when COMMIT PREPARED is called */
begin_replication_step();
+ /* TODO: wait for xid to finish */
+
/*
* Update origin state so we can restart streaming from correct position
* in case of crash.
@@ -1197,7 +1349,8 @@ apply_handle_commit_prepared(StringInfo s)
CommitTransactionCommand();
pgstat_report_stat(false);
- store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
+ store_flush_position(prepare_data.end_lsn, XactLastCommitEnd,
+ InvalidTransactionId);
in_remote_transaction = false;
/* Process any tables that are being synchronized in parallel. */
@@ -1263,7 +1416,8 @@ apply_handle_rollback_prepared(StringInfo s)
* transaction because we always flush the WAL record for it. See
* apply_handle_prepare.
*/
- store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr);
+ store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr,
+ InvalidTransactionId);
in_remote_transaction = false;
/* Process any tables that are being synchronized in parallel. */
@@ -1322,7 +1476,8 @@ apply_handle_stream_prepare(StringInfo s)
* It is okay not to set the local_end LSN for the prepare because
* we always flush the prepare record. See apply_handle_prepare.
*/
- store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr);
+ store_flush_position(prepare_data.end_lsn, InvalidXLogRecPtr,
+ InvalidTransactionId);
in_remote_transaction = false;
@@ -1501,6 +1656,8 @@ apply_handle_stream_start(StringInfo s)
/* notify handle methods we're processing a remote transaction */
in_streamed_transaction = true;
+ buffering_changes = false;
+
/* extract XID of the top-level transaction */
stream_xid = logicalrep_read_stream_start(s, &first_segment);
@@ -1513,7 +1670,7 @@ apply_handle_stream_start(StringInfo s)
/* Try to allocate a worker for the streaming transaction. */
if (first_segment)
- pa_allocate_worker(stream_xid);
+ pa_allocate_worker(stream_xid, true);
apply_action = get_transaction_apply_action(stream_xid, &winfo);
@@ -1535,6 +1692,8 @@ apply_handle_stream_start(StringInfo s)
case TRANS_LEADER_SEND_TO_PARALLEL:
Assert(winfo);
+ pa_update_commit_seq(winfo);
+
/*
* Once we start serializing the changes, the parallel apply
* worker will wait for the leader to release the stream lock
@@ -1571,6 +1730,12 @@ apply_handle_stream_start(StringInfo s)
case TRANS_LEADER_PARTIAL_SERIALIZE:
Assert(winfo);
+ /*
+ * TODO: the pa worker could start waiting too soon when processing
+ * an old stream start.
+ */
+ pa_update_commit_seq(winfo);
+
/*
* Open the spool file unless it was already opened when switching
* to serialize mode. The transaction started in
@@ -1599,6 +1764,8 @@ apply_handle_stream_start(StringInfo s)
logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
}
+ pa_wait_until_committable();
+
parallel_stream_nchanges = 0;
break;
@@ -2069,6 +2236,8 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
end_replication_step();
+ Assert(!buffering_changes);
+
/*
* Read the entries one by one and pass them through the same logic as in
* apply_dispatch.
@@ -2294,7 +2463,8 @@ apply_handle_commit_internal(LogicalRepCommitData *commit_data)
pgstat_report_stat(false);
- store_flush_position(commit_data->end_lsn, XactLastCommitEnd);
+ store_flush_position(commit_data->end_lsn, XactLastCommitEnd,
+ InvalidTransactionId);
}
else
{
@@ -2318,15 +2488,31 @@ static void
apply_handle_relation(StringInfo s)
{
LogicalRepRelation *rel;
+ ParallelApplyWorkerInfo *winfo;
+ TransApplyAction apply_action;
if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
return;
rel = logicalrep_read_rel(s);
+
+ apply_action = get_transaction_apply_action(in_streamed_transaction
+ ? stream_xid : remote_xid,
+ &winfo);
+
+ if (apply_action == TRANS_LEADER_APPLY)
+ {
+ pa_wait_until_committable();
+ apply_buffered_changes();
+ }
+
logicalrep_relmap_update(rel);
/* Also reset all entries in the partition map that refer to remoterel. */
logicalrep_partmap_reset_relmap(rel);
+
+ if (am_leader_apply_worker())
+ pa_distribute_schema_changes_to_workers(rel);
}
/*
@@ -2394,6 +2580,7 @@ apply_handle_insert(StringInfo s)
ApplyExecutionData *edata;
EState *estate;
TupleTableSlot *remoteslot;
+ MemoryContext applyctx;
MemoryContext oldctx;
bool run_as_owner;
@@ -2405,6 +2592,9 @@ apply_handle_insert(StringInfo s)
handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
return;
+ if (buffering_changes && pa_cur_transaction_committable())
+ apply_buffered_changes();
+
begin_replication_step();
relid = logicalrep_read_insert(s, &newtup);
@@ -2431,6 +2621,9 @@ apply_handle_insert(StringInfo s)
/* Set relation for error callback */
apply_error_callback_arg.rel = rel;
+ if (buffering_changes)
+ applyctx = MemoryContextSwitchTo(ApplyBufferContext);
+
/* Initialize the executor state. */
edata = create_edata_for_relation(rel);
estate = edata->estate;
@@ -2444,20 +2637,84 @@ apply_handle_insert(StringInfo s)
slot_fill_defaults(rel, estate, remoteslot);
MemoryContextSwitchTo(oldctx);
+ if (buffering_changes)
+ {
+ apply_buffer_add_change(LOGICAL_REP_MSG_INSERT, edata, NULL,
+ remoteslot, NULL);
+ MemoryContextSwitchTo(applyctx);
+ }
+ else
+ {
+ /* For a partitioned table, insert the tuple into a partition. */
+ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ apply_handle_tuple_routing(edata,
+ remoteslot, NULL, CMD_INSERT);
+ else
+ {
+ ResultRelInfo *relinfo = edata->targetRelInfo;
+
+ ExecOpenIndices(relinfo, false);
+ apply_handle_insert_internal(edata, relinfo, remoteslot);
+ ExecCloseIndices(relinfo);
+ }
+
+ finish_edata(edata);
+
+ logicalrep_rel_close(rel, NoLock);
+ }
+
+ /* Reset relation for error callback */
+ apply_error_callback_arg.rel = NULL;
+
+ if (!run_as_owner)
+ RestoreUserContext(&ucxt);
+
+ end_replication_step();
+}
+
+static void
+apply_handle_buffered_insert(ApplyBufferChange *change)
+{
+ LogicalRepRelMapEntry *rel = change->edata->targetRel;
+ UserContext ucxt;
+ bool run_as_owner;
+
+ Assert(!buffering_changes);
+
+ begin_replication_step();
+
+ /* Set relation for error callback */
+ apply_error_callback_arg.rel = rel;
+
+ /*
+ * Make sure that any user-supplied code runs as the table owner, unless
+ * the user has opted out of that behavior.
+ */
+ run_as_owner = MySubscription->runasowner;
+ if (!run_as_owner)
+ SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
+
+ change->edata->estate->es_output_cid = GetCurrentCommandId(true);
+
+ /* Prepare to catch AFTER triggers. */
+ AfterTriggerBeginQuery();
+
/* For a partitioned table, insert the tuple into a partition. */
if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
- apply_handle_tuple_routing(edata,
- remoteslot, NULL, CMD_INSERT);
+ apply_handle_tuple_routing(change->edata,
+ change->new_slot, NULL, CMD_INSERT);
else
{
- ResultRelInfo *relinfo = edata->targetRelInfo;
+ ResultRelInfo *relinfo = change->edata->targetRelInfo;
ExecOpenIndices(relinfo, false);
- apply_handle_insert_internal(edata, relinfo, remoteslot);
+ apply_handle_insert_internal(change->edata, relinfo, change->new_slot);
ExecCloseIndices(relinfo);
}
- finish_edata(edata);
+ finish_edata(change->edata);
+
+ logicalrep_rel_close(rel, NoLock);
/* Reset relation for error callback */
apply_error_callback_arg.rel = NULL;
@@ -2465,8 +2722,6 @@ apply_handle_insert(StringInfo s)
if (!run_as_owner)
RestoreUserContext(&ucxt);
- logicalrep_rel_close(rel, NoLock);
-
end_replication_step();
}
@@ -2554,6 +2809,7 @@ apply_handle_update(StringInfo s)
bool has_oldtup;
TupleTableSlot *remoteslot;
RTEPermissionInfo *target_perminfo;
+ MemoryContext applyctx;
MemoryContext oldctx;
bool run_as_owner;
@@ -2565,6 +2821,9 @@ apply_handle_update(StringInfo s)
handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
return;
+ if (buffering_changes && pa_cur_transaction_committable())
+ apply_buffered_changes();
+
begin_replication_step();
relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
@@ -2595,6 +2854,9 @@ apply_handle_update(StringInfo s)
if (!run_as_owner)
SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
+ if (buffering_changes)
+ applyctx = MemoryContextSwitchTo(ApplyBufferContext);
+
/* Initialize the executor state. */
edata = create_edata_for_relation(rel);
estate = edata->estate;
@@ -2632,15 +2894,34 @@ apply_handle_update(StringInfo s)
has_oldtup ? &oldtup : &newtup);
MemoryContextSwitchTo(oldctx);
- /* For a partitioned table, apply update to correct partition. */
- if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
- apply_handle_tuple_routing(edata,
- remoteslot, &newtup, CMD_UPDATE);
+ if (buffering_changes)
+ {
+ apply_buffer_add_change(LOGICAL_REP_MSG_UPDATE, edata,
+ has_oldtup ? remoteslot : NULL,
+ has_oldtup ? NULL : remoteslot,
+ apply_buffer_copy_tuple(&newtup));
+ MemoryContextSwitchTo(applyctx);
+
+ if (rel->localrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+ apply_handle_update_internal(edata, edata->targetRelInfo,
+ remoteslot, &newtup,
+ rel->localindexoid, NULL);
+ }
else
- apply_handle_update_internal(edata, edata->targetRelInfo,
- remoteslot, &newtup, rel->localindexoid);
+ {
+ /* For a partitioned table, apply update to correct partition. */
+ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ apply_handle_tuple_routing(edata,
+ remoteslot, &newtup, CMD_UPDATE);
+ else
+ apply_handle_update_internal(edata, edata->targetRelInfo,
+ remoteslot, &newtup, rel->localindexoid,
+ NULL);
+
+ finish_edata(edata);
- finish_edata(edata);
+ logicalrep_rel_close(rel, NoLock);
+ }
/* Reset relation for error callback */
apply_error_callback_arg.rel = NULL;
@@ -2648,8 +2929,68 @@ apply_handle_update(StringInfo s)
if (!run_as_owner)
RestoreUserContext(&ucxt);
+ end_replication_step();
+}
+
+static void
+apply_handle_buffered_update(ApplyBufferChange *change)
+{
+ LogicalRepRelMapEntry *rel = change->edata->targetRel;
+ UserContext ucxt;
+ bool run_as_owner;
+ CommandId original_cid;
+
+ Assert(!buffering_changes);
+
+ begin_replication_step();
+
+ /* Set relation for error callback */
+ apply_error_callback_arg.rel = rel;
+
+ /*
+ * Make sure that any user-supplied code runs as the table owner, unless
+ * the user has opted out of that behavior.
+ */
+ run_as_owner = MySubscription->runasowner;
+ if (!run_as_owner)
+ SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
+
+ original_cid = change->edata->estate->es_output_cid;
+ change->edata->estate->es_output_cid = GetCurrentCommandId(true);
+
+ /* Prepare to catch AFTER triggers. */
+ AfterTriggerBeginQuery();
+
+ /* For a partitioned table, apply update to correct partition. */
+ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ apply_handle_tuple_routing(change->edata,
+ change->new_slot ? change->new_slot : change->old_slot,
+ change->tuple, CMD_UPDATE);
+ else
+ {
+ bool locked = false;
+
+ if (change->local_slot)
+ locked = RelationLockTuple(rel->localrel, LockTupleExclusive,
+ change->new_slot ? change->new_slot : change->old_slot,
+ change->local_slot, original_cid);
+
+ apply_handle_update_internal(change->edata, change->edata->targetRelInfo,
+ change->new_slot ? change->new_slot : change->old_slot,
+ change->tuple, rel->localindexoid,
+ locked ? change->local_slot : NULL);
+ }
+
+ finish_edata(change->edata);
+
logicalrep_rel_close(rel, NoLock);
+ /* Reset relation for error callback */
+ apply_error_callback_arg.rel = NULL;
+
+ if (!run_as_owner)
+ RestoreUserContext(&ucxt);
+
end_replication_step();
}
@@ -2663,25 +3004,35 @@ apply_handle_update_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
TupleTableSlot *remoteslot,
LogicalRepTupleData *newtup,
- Oid localindexoid)
+ Oid localindexoid,
+ TupleTableSlot *bufferedlocalslot)
{
EState *estate = edata->estate;
LogicalRepRelMapEntry *relmapentry = edata->targetRel;
Relation localrel = relinfo->ri_RelationDesc;
EPQState epqstate;
- TupleTableSlot *localslot = NULL;
+ TupleTableSlot *localslot = bufferedlocalslot;
ConflictTupleInfo conflicttuple = {0};
- bool found;
+ bool found = (bufferedlocalslot != NULL);
MemoryContext oldctx;
+ if (!found)
+ {
+ found = FindReplTupleInLocalRel(edata, localrel,
+ &relmapentry->remoterel,
+ localindexoid,
+ remoteslot, &localslot);
+ }
+
+ if (buffering_changes)
+ {
+ apply_buffer_store_local_slot(found ? localslot : NULL);
+ return;
+ }
+
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
ExecOpenIndices(relinfo, false);
- found = FindReplTupleInLocalRel(edata, localrel,
- &relmapentry->remoterel,
- localindexoid,
- remoteslot, &localslot);
-
/*
* Tuple found.
*
@@ -2759,6 +3110,7 @@ apply_handle_delete(StringInfo s)
ApplyExecutionData *edata;
EState *estate;
TupleTableSlot *remoteslot;
+ MemoryContext applyctx;
MemoryContext oldctx;
bool run_as_owner;
@@ -2770,6 +3122,9 @@ apply_handle_delete(StringInfo s)
handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
return;
+ if (buffering_changes && pa_cur_transaction_committable())
+ apply_buffered_changes();
+
begin_replication_step();
relid = logicalrep_read_delete(s, &oldtup);
@@ -2799,6 +3154,9 @@ apply_handle_delete(StringInfo s)
if (!run_as_owner)
SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
+ if (buffering_changes)
+ applyctx = MemoryContextSwitchTo(ApplyBufferContext);
+
/* Initialize the executor state. */
edata = create_edata_for_relation(rel);
estate = edata->estate;
@@ -2811,21 +3169,99 @@ apply_handle_delete(StringInfo s)
slot_store_data(remoteslot, rel, &oldtup);
MemoryContextSwitchTo(oldctx);
+ if (buffering_changes)
+ {
+ apply_buffer_add_change(LOGICAL_REP_MSG_DELETE, edata, remoteslot,
+ NULL, NULL);
+ MemoryContextSwitchTo(applyctx);
+
+ if (rel->localrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+ apply_handle_delete_internal(edata, edata->targetRelInfo,
+ remoteslot, rel->localindexoid, NULL);
+ }
+ else
+ {
+ /* For a partitioned table, apply delete to correct partition. */
+ if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ apply_handle_tuple_routing(edata,
+ remoteslot, NULL, CMD_DELETE);
+ else
+ {
+ ResultRelInfo *relinfo = edata->targetRelInfo;
+
+ ExecOpenIndices(relinfo, false);
+ apply_handle_delete_internal(edata, relinfo,
+ remoteslot, rel->localindexoid, NULL);
+ ExecCloseIndices(relinfo);
+ }
+
+ finish_edata(edata);
+
+ logicalrep_rel_close(rel, NoLock);
+ }
+
+ /* Reset relation for error callback */
+ apply_error_callback_arg.rel = NULL;
+
+ if (!run_as_owner)
+ RestoreUserContext(&ucxt);
+
+ end_replication_step();
+}
+
+static void
+apply_handle_buffered_delete(ApplyBufferChange *change)
+{
+ LogicalRepRelMapEntry *rel = change->edata->targetRel;
+ UserContext ucxt;
+ bool run_as_owner;
+ CommandId original_cid;
+
+ Assert(!buffering_changes);
+
+ begin_replication_step();
+
+ /* Set relation for error callback */
+ apply_error_callback_arg.rel = rel;
+
+ /*
+ * Make sure that any user-supplied code runs as the table owner, unless
+ * the user has opted out of that behavior.
+ */
+ run_as_owner = MySubscription->runasowner;
+ if (!run_as_owner)
+ SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
+
+ original_cid = change->edata->estate->es_output_cid;
+ change->edata->estate->es_output_cid = GetCurrentCommandId(true);
+
+ /* Prepare to catch AFTER triggers. */
+ AfterTriggerBeginQuery();
+
/* For a partitioned table, apply delete to correct partition. */
if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
- apply_handle_tuple_routing(edata,
- remoteslot, NULL, CMD_DELETE);
+ apply_handle_tuple_routing(change->edata,
+ change->old_slot, NULL, CMD_DELETE);
else
{
- ResultRelInfo *relinfo = edata->targetRelInfo;
+ ResultRelInfo *relinfo = change->edata->targetRelInfo;
+ bool locked = false;
+
+ if (change->local_slot)
+ locked = RelationLockTuple(rel->localrel, LockTupleExclusive,
+ change->old_slot, change->local_slot,
+ original_cid);
ExecOpenIndices(relinfo, false);
- apply_handle_delete_internal(edata, relinfo,
- remoteslot, rel->localindexoid);
+ apply_handle_delete_internal(change->edata, relinfo,
+ change->old_slot, rel->localindexoid,
+ locked ? change->local_slot : NULL);
ExecCloseIndices(relinfo);
}
- finish_edata(edata);
+ finish_edata(change->edata);
+
+ logicalrep_rel_close(rel, NoLock);
/* Reset relation for error callback */
apply_error_callback_arg.rel = NULL;
@@ -2833,8 +3269,6 @@ apply_handle_delete(StringInfo s)
if (!run_as_owner)
RestoreUserContext(&ucxt);
- logicalrep_rel_close(rel, NoLock);
-
end_replication_step();
}
@@ -2847,25 +3281,33 @@ static void
apply_handle_delete_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
TupleTableSlot *remoteslot,
- Oid localindexoid)
+ Oid localindexoid,
+ TupleTableSlot *bufferedlocalslot)
{
EState *estate = edata->estate;
Relation localrel = relinfo->ri_RelationDesc;
LogicalRepRelation *remoterel = &edata->targetRel->remoterel;
EPQState epqstate;
- TupleTableSlot *localslot;
+ TupleTableSlot *localslot = bufferedlocalslot;
ConflictTupleInfo conflicttuple = {0};
- bool found;
+ bool found = (bufferedlocalslot != NULL);
- EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
+ if (!found)
+ found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
+ remoteslot, &localslot);
+
+ if (buffering_changes)
+ {
+ apply_buffer_store_local_slot(found ? localslot : NULL);
+ return;
+ }
/* Caller should have opened indexes already. */
Assert(relinfo->ri_IndexRelationDescs != NULL ||
!localrel->rd_rel->relhasindex ||
RelationGetIndexList(localrel) == NIL);
- found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
- remoteslot, &localslot);
+ EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
/* If found delete it. */
if (found)
@@ -2919,6 +3361,7 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
TupleTableSlot **localslot)
{
EState *estate = edata->estate;
+ MemoryContext oldctx;
bool found;
/*
@@ -2927,8 +3370,14 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
*/
TargetPrivilegesCheck(localrel, ACL_SELECT);
+ if (buffering_changes)
+ oldctx = MemoryContextSwitchTo(ApplyBufferContext);
+
*localslot = table_slot_create(localrel, &estate->es_tupleTable);
+ if (buffering_changes)
+ MemoryContextSwitchTo(oldctx);
+
Assert(OidIsValid(localidxoid) ||
(remoterel->replident == REPLICA_IDENTITY_FULL));
@@ -2947,11 +3396,13 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
found = RelationFindReplTupleByIndex(localrel, localidxoid,
LockTupleExclusive,
- remoteslot, *localslot);
+ remoteslot, *localslot,
+ !buffering_changes);
}
else
found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
- remoteslot, *localslot);
+ remoteslot, *localslot,
+ !buffering_changes);
return found;
}
@@ -3048,7 +3499,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
case CMD_DELETE:
apply_handle_delete_internal(edata, partrelinfo,
remoteslot_part,
- part_entry->localindexoid);
+ part_entry->localindexoid,
+ NULL);
break;
case CMD_UPDATE:
@@ -3250,6 +3702,8 @@ apply_handle_truncate(StringInfo s)
ListCell *lc;
LOCKMODE lockmode = AccessExclusiveLock;
+ elog(DEBUG1, "truncate");
+
/*
* Quick return if we are skipping data modification changes or handling
* streamed transactions.
@@ -3258,6 +3712,9 @@ apply_handle_truncate(StringInfo s)
handle_streamed_transaction(LOGICAL_REP_MSG_TRUNCATE, s))
return;
+ pa_wait_until_committable();
+ apply_buffered_changes();
+
begin_replication_step();
remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
@@ -3471,6 +3928,115 @@ apply_dispatch(StringInfo s)
apply_error_callback_arg.command = saved_command;
}
+static void
+apply_buffered_changes(void)
+{
+ buffering_changes = false;
+ elog(DEBUG1, "started applying %d buffered changes", list_length(buffered_changes));
+
+ if (!buffered_changes)
+ return;
+
+ foreach_ptr(ApplyBufferChange, change, buffered_changes)
+ {
+ LogicalRepMsgType saved_command;
+
+ /*
+ * Set the current command being applied. Since this function can be
+ * called recursively when applying spooled changes, save the current
+ * command.
+ */
+ saved_command = apply_error_callback_arg.command;
+ apply_error_callback_arg.command = change->action;
+
+ switch (change->action)
+ {
+ case LOGICAL_REP_MSG_INSERT:
+ apply_handle_buffered_insert(change);
+ break;
+
+ case LOGICAL_REP_MSG_UPDATE:
+ apply_handle_buffered_update(change);
+ break;
+
+ case LOGICAL_REP_MSG_DELETE:
+ apply_handle_buffered_delete(change);
+ break;
+
+ default:
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("invalid logical replication message type \"??? (%d)\"", change->action)));
+ }
+
+ /* Reset the current command */
+ apply_error_callback_arg.command = saved_command;
+ }
+
+ Assert(ActiveSnapshotSet());
+ PopActiveSnapshot();
+ Assert(!ActiveSnapshotSet());
+ MemoryContextReset(ApplyBufferContext);
+ buffered_changes = NIL;
+}
+
+static LogicalRepTupleData *
+apply_buffer_copy_tuple(LogicalRepTupleData *tupleData)
+{
+ LogicalRepTupleData *res;
+ int ncols = tupleData->ncols;
+
+ Assert(CurrentMemoryContext == ApplyBufferContext);
+
+ res = palloc0_object(LogicalRepTupleData);
+
+ /* Allocate space for per-column values; zero out unused StringInfoDatas */
+ res->colvalues = (StringInfoData *) palloc0(ncols * sizeof(StringInfoData));
+ res->colstatus = (char *) palloc(ncols * sizeof(char));
+ res->ncols = ncols;
+
+ for (int i = 0; i < ncols; i++)
+ {
+ initStringInfo(&res->colvalues[i]);
+ appendBinaryStringInfo(&res->colvalues[i],
+ tupleData->colvalues[i].data,
+ tupleData->colvalues[i].len);
+ res->colstatus[i] = tupleData->colstatus[i];
+ }
+
+ return res;
+}
+
+static void
+apply_buffer_add_change(LogicalRepMsgType action, ApplyExecutionData *edata,
+ TupleTableSlot *old_slot, TupleTableSlot *new_slot,
+ LogicalRepTupleData *tuple)
+{
+ ApplyBufferChange *change;
+
+ Assert(CurrentMemoryContext == ApplyBufferContext);
+
+ change = palloc0_object(ApplyBufferChange);
+ change->action = action;
+ change->edata = edata;
+ change->new_slot = new_slot;
+ change->old_slot = old_slot;
+ change->tuple = tuple;
+
+ buffered_changes = lappend(buffered_changes, change);
+}
+
+static void
+apply_buffer_store_local_slot(TupleTableSlot *local_slot)
+{
+ ApplyBufferChange *change;
+
+ Assert(buffered_changes != NIL);
+
+ change = (ApplyBufferChange *) llast(buffered_changes);
+ change->local_slot = local_slot;
+}
+
/*
* Figure out which write/flush positions to report to the walsender process.
*
@@ -3499,6 +4065,36 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
FlushPosition *pos =
dlist_container(FlushPosition, node, iter.cur);
+ /*
+ * Get the parallel apply worker that holds the local commit lsn of the
+ * transaction.
+ */
+ if (TransactionIdIsValid(pos->pa_remote_xid) &&
+ XLogRecPtrIsInvalid(pos->local_end))
+ {
+ bool skipped_write;
+
+ pos->local_end = pa_get_last_commit_end(pos->pa_remote_xid, true,
+ &skipped_write);
+
+ elog(DEBUG1,
+ "got commit end from parallel apply worker, "
+ "txn: %u, remote_end %X/%X, local_end %X/%X",
+ pos->pa_remote_xid, LSN_FORMAT_ARGS(pos->remote_end),
+ LSN_FORMAT_ARGS(pos->local_end));
+
+ /* Return if the worker has not finished applying */
+ if (!skipped_write && XLogRecPtrIsInvalid(pos->local_end))
+ {
+ *have_pending_txes = true;
+ return;
+ }
+ }
+
+ /*
+ * Worker has finished applying or the transaction was applied in the
+ * leader apply worker
+ */
*write = pos->remote_end;
if (pos->local_end <= local_flush)
@@ -3507,19 +4103,6 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
dlist_delete(iter.cur);
pfree(pos);
}
- else
- {
- /*
- * Don't want to uselessly iterate over the rest of the list which
- * could potentially be long. Instead get the last element and
- * grab the write position from there.
- */
- pos = dlist_tail_element(FlushPosition, node,
- &lsn_mapping);
- *write = pos->remote_end;
- *have_pending_txes = true;
- return;
- }
}
*have_pending_txes = !dlist_is_empty(&lsn_mapping);
@@ -3529,7 +4112,8 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush,
* Store current remote/local lsn pair in the tracking list.
*/
void
-store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
+store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn,
+ TransactionId remote_xid)
{
FlushPosition *flushpos;
@@ -3547,6 +4131,7 @@ store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
flushpos->local_end = local_lsn;
flushpos->remote_end = remote_lsn;
+ flushpos->pa_remote_xid = remote_xid;
dlist_push_tail(&lsn_mapping, &flushpos->node);
MemoryContextSwitchTo(ApplyMessageContext);
@@ -3594,6 +4179,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
"LogicalStreamingContext",
ALLOCSET_DEFAULT_SIZES);
+ ApplyBufferContext = AllocSetContextCreate(ApplyContext,
+ "ApplyBufferContext",
+ ALLOCSET_DEFAULT_SIZES);
+
/* mark as idle, before starting to loop */
pgstat_report_activity(STATE_IDLE, NULL);
@@ -4324,7 +4913,7 @@ stream_cleanup_files(Oid subid, TransactionId xid)
* changes for this transaction, create the buffile, otherwise open the
* previously created file.
*/
-static void
+void
stream_open_file(Oid subid, TransactionId xid, bool first_segment)
{
char path[MAXPGPATH];
@@ -4369,7 +4958,7 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
* stream_close_file
* Close the currently open file with streamed changes.
*/
-static void
+void
stream_close_file(void)
{
Assert(stream_fd != NULL);
@@ -4417,7 +5006,7 @@ stream_write_change(char action, StringInfo s)
* target file if not already before writing the message and close the file at
* the end.
*/
-static void
+void
stream_open_and_write_change(TransactionId xid, char action, StringInfo s)
{
Assert(!in_streamed_transaction);
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 104b059544d..ed9ba8fdb9c 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -756,9 +756,14 @@ extern void check_exclusion_constraint(Relation heap, Relation index,
extern bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
LockTupleMode lockmode,
TupleTableSlot *searchslot,
- TupleTableSlot *outslot);
+ TupleTableSlot *outslot,
+ bool locktuple);
extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
- TupleTableSlot *searchslot, TupleTableSlot *outslot);
+ TupleTableSlot *searchslot, TupleTableSlot *outslot,
+ bool locktuple);
+extern bool RelationLockTuple(Relation rel, LockTupleMode lockmode,
+ TupleTableSlot *searchslot,
+ TupleTableSlot *outslot, CommandId cid);
extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
EState *estate, TupleTableSlot *slot);
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index b261c60d3fa..ce880f179dc 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -64,6 +64,7 @@ typedef enum LogicalRepMsgType
LOGICAL_REP_MSG_DELETE = 'D',
LOGICAL_REP_MSG_TRUNCATE = 'T',
LOGICAL_REP_MSG_RELATION = 'R',
+ LOGICAL_REP_MSG_INTERNAL_RELATION = 'r',
LOGICAL_REP_MSG_TYPE = 'Y',
LOGICAL_REP_MSG_MESSAGE = 'M',
LOGICAL_REP_MSG_BEGIN_PREPARE = 'b',
@@ -251,6 +252,8 @@ extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecP
extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
Relation rel, Bitmapset *columns,
PublishGencolsType include_gencols_type);
+extern void logicalrep_write_remote_rel(StringInfo out,
+ LogicalRepRelation *rel);
extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
extern void logicalrep_write_typ(StringInfo out, TransactionId xid,
Oid typoid);
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 7a561a8e8d8..d35bd00e93c 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -37,6 +37,8 @@ typedef struct LogicalRepRelMapEntry
/* Sync state. */
char state;
XLogRecPtr statelsn;
+
+ int refcount;
} LogicalRepRelMapEntry;
extern void logicalrep_relmap_update(LogicalRepRelation *remoterel);
@@ -50,5 +52,6 @@ extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
LOCKMODE lockmode);
extern bool IsIndexUsableForReplicaIdentityFull(Relation idxrel, AttrMap *attrmap);
extern Oid GetRelationIdentityOrPK(Relation rel);
+extern void logicalrep_write_all_rels(StringInfo out);
#endif /* LOGICALRELATION_H */
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 30b2775952c..e0c20da7299 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -180,6 +180,11 @@ typedef struct ParallelApplyWorkerShared
*/
PartialFileSetState fileset_state;
FileSet fileset;
+
+ dsm_handle commit_seq_handle;
+
+ uint64 commit_seq_num;
+ TransactionId wait_for_xid;
} ParallelApplyWorkerShared;
/*
@@ -214,7 +219,11 @@ typedef struct ParallelApplyWorkerInfo
*/
bool in_use;
+ bool stream_txn;
+
ParallelApplyWorkerShared *shared;
+
+ bool collected_local_end;
} ParallelApplyWorkerInfo;
/* Main memory context for apply worker. Permanent during worker lifetime. */
@@ -222,6 +231,8 @@ extern PGDLLIMPORT MemoryContext ApplyContext;
extern PGDLLIMPORT MemoryContext ApplyMessageContext;
+extern PGDLLIMPORT MemoryContext ApplyBufferContext;
+
extern PGDLLIMPORT ErrorContextCallback *apply_error_context_stack;
extern PGDLLIMPORT ParallelApplyWorkerShared *MyParallelShared;
@@ -275,6 +286,10 @@ extern void apply_dispatch(StringInfo s);
extern void maybe_reread_subscription(void);
extern void stream_cleanup_files(Oid subid, TransactionId xid);
+extern void stream_open_file(Oid subid, TransactionId xid, bool first_segment);
+extern void stream_close_file(void);
+extern void stream_open_and_write_change(TransactionId xid, char action,
+ StringInfo s);
extern void set_stream_options(WalRcvStreamOptions *options,
char *slotname,
@@ -288,19 +303,27 @@ extern void SetupApplyOrSyncWorker(int worker_slot);
extern void DisableSubscriptionAndExit(void);
-extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn);
+extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn,
+ TransactionId remote_xid);
/* Function for apply error callback */
extern void apply_error_callback(void *arg);
extern void set_apply_error_context_origin(char *originname);
/* Parallel apply worker setup and interactions */
-extern void pa_allocate_worker(TransactionId xid);
+extern void pa_allocate_worker(TransactionId xid, bool stream_txn);
extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid);
+extern XLogRecPtr pa_get_last_commit_end(TransactionId xid, bool delete_entry,
+ bool *skipped_write);
extern void pa_detach_all_error_mq(void);
+extern void pa_advance_committable_seq_num(void);
+extern void pa_wait_until_committable(void);
+extern bool pa_cur_transaction_committable(void);
+
extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes,
const void *data);
+extern void pa_distribute_schema_changes_to_workers(LogicalRepRelation *rel);
extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
bool stream_locked);
@@ -325,6 +348,7 @@ extern void pa_decr_and_wait_stream_block(void);
extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
XLogRecPtr remote_lsn);
+extern void pa_update_commit_seq(ParallelApplyWorkerInfo *winfo);
#define isParallelApplyWorker(worker) ((worker)->in_use && \
(worker)->type == WORKERTYPE_PARALLEL_APPLY)
diff --git a/src/test/subscription/t/010_truncate.pl b/src/test/subscription/t/010_truncate.pl
index 3d16c2a800d..c2fba0b9a9c 100644
--- a/src/test/subscription/t/010_truncate.pl
+++ b/src/test/subscription/t/010_truncate.pl
@@ -17,7 +17,7 @@ $node_publisher->start;
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init;
$node_subscriber->append_conf('postgresql.conf',
- qq(max_logical_replication_workers = 6));
+ qq(max_logical_replication_workers = 7));
$node_subscriber->start;
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl
index 00a1c2fcd48..6842476c8b0 100644
--- a/src/test/subscription/t/026_stats.pl
+++ b/src/test/subscription/t/026_stats.pl
@@ -16,6 +16,7 @@ $node_publisher->start;
# Create subscriber node.
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf', "max_logical_replication_workers = 10");
$node_subscriber->start;
diff --git a/src/test/subscription/t/027_nosuperuser.pl b/src/test/subscription/t/027_nosuperuser.pl
index 36af1c16e7f..aec039d565b 100644
--- a/src/test/subscription/t/027_nosuperuser.pl
+++ b/src/test/subscription/t/027_nosuperuser.pl
@@ -87,6 +87,7 @@ $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_publisher->init(allows_streaming => 'logical');
$node_subscriber->init;
$node_publisher->start;
+$node_subscriber->append_conf('postgresql.conf', "max_logical_replication_workers = 10");
$node_subscriber->start;
$publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
my %remainder_a = (
--
2.49.0.windows.1
On 14/07/2025 4:20 am, Zhijie Hou (Fujitsu) wrote:
Thank you for the proposal! I find it to be a very interesting feature.
I tested the patch you shared in your original email and encountered potential
deadlocks when testing a pgbench TPC-B-like workload. Could you please provide an
updated patch version so that I can conduct further performance experiments?
Sorry, it was fixed in my repo: https://github.com/knizhnik/postgres/pull/3
Updated patch is attached.
Additionally, I was also exploring ways to improve performance and have tried an
alternative version of prefetch for experimentation. The alternative design is
that we assign each non-streaming transaction to a parallel apply worker, while
strictly maintaining the order of commits. During parallel apply, if the
transactions that need to be committed before the current transaction are not
yet finished, the worker performs pre-fetch operations. Specifically, for
updates and deletes, the worker finds and caches the target local tuple to be
updated/deleted. Once all preceding transactions are committed, the parallel
apply worker uses these cached tuples to execute the actual updates or deletes.
What do you think about this alternative? I think the alternative might offer
more stability in scenarios where shared buffer elimination occurs frequently
and avoids leaving dead tuples in the buffer. However, it also presents some
drawbacks, such as the need to add wait events to maintain commit order,
compared to the approach discussed in this thread.
As far as I understand, your PoC does the same as approach 1 in my
proposal (prefetch of the replica identity), but it is done not by
dedicated parallel prefetch workers but by normal parallel apply workers
while they wait for the previous transaction to commit. I consider it
more complex, but it may be more efficient than my approach.
The obvious drawback of both your approach and mine is that they
prefetch only pages of the primary index (replica identity). If there
are other indexes whose keys are changed by an update, then the pages of
those indexes will be read from disk when the update is applied. The
same is true for inserts (in this case the new tuple always has to be
added to all indexes). This is why I have also implemented another
approach: apply the operation in a prefetch worker and then roll back
the transaction.
Also, I do not quite understand how you handle invalidations. Assume
that we have two transactions, T1 and T2:
T1: ... W1 Commit
T2: ... W1
So T1 writes tuple 1 and then commits. Then T2 updates tuple 1.
If I understand your approach correctly, the parallel apply worker for
T2 will try to prefetch tuple 1 before T1 is committed.
But in this case it will get the old version of the tuple. That is not a
problem if the parallel apply worker repeats the lookup instead of just
using the cached tuple.
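To make the concern concrete, here is a toy sketch in plain C. It is not
code from either patch: `ToyTuple`, `ToyCachedTuple` and all `toy_*`
functions are hypothetical names, and a simple version counter stands in
for real tuple visibility. It only illustrates the order of events: T2's
worker caches the row, T1 commits an update, and T2 must repeat the
lookup before using what it cached.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy stand-in for a table row; not a PostgreSQL structure. */
typedef struct ToyTuple
{
	int			key;
	int			value;
	unsigned	version;	/* bumped by every committed update */
} ToyTuple;

/* What the worker for T2 remembers from its prefetch phase. */
typedef struct ToyCachedTuple
{
	int			key;
	int			value;
	unsigned	version;
} ToyCachedTuple;

/* Look a row up by key; returns NULL when the key is absent. */
static ToyTuple *
toy_lookup(ToyTuple *table, size_t ntuples, int key)
{
	for (size_t i = 0; i < ntuples; i++)
	{
		if (table[i].key == key)
			return &table[i];
	}
	return NULL;
}

/* Prefetch phase: cache the row as it looks before T1 commits. */
static bool
toy_prefetch(ToyTuple *table, size_t ntuples, int key, ToyCachedTuple *cache)
{
	ToyTuple   *t = toy_lookup(table, ntuples, key);

	if (t == NULL)
		return false;
	cache->key = t->key;
	cache->value = t->value;
	cache->version = t->version;
	return true;
}

/* T1 commits an update of the row. */
static void
toy_commit_update(ToyTuple *t, int new_value)
{
	t->value = new_value;
	t->version++;
}

/*
 * Apply phase for T2: repeat the lookup and refresh the cache if the row
 * changed since it was prefetched. Sets *was_stale accordingly and returns
 * the current row, or NULL if it no longer exists.
 */
static ToyTuple *
toy_recheck(ToyTuple *table, size_t ntuples, ToyCachedTuple *cache,
			bool *was_stale)
{
	ToyTuple   *t = toy_lookup(table, ntuples, cache->key);

	*was_stale = (t == NULL || t->version != cache->version);
	if (t != NULL && *was_stale)
	{
		cache->value = t->value;
		cache->version = t->version;
	}
	return t;
}
```

Calling `toy_prefetch`, then `toy_commit_update`, then `toy_recheck`
reports the cached copy as stale and refreshes it. Skipping the recheck
would leave T2 working with the value from before T1's commit.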
One more point. Since you assign each non-streaming transaction to a
parallel apply worker, the number of transactions applied concurrently
is limited by the number of background workers. Usually it is not so
large (~10). So if there were 100 parallel transactions at the
publisher, then at the subscriber you will still be able to execute
concurrently no more than a few of them. In this sense my approach with
separate prefetch workers is more flexible: each prefetch worker can
prefetch as many operations as it can.
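The difference can be illustrated with a toy sketch in plain C. The
attached patch declares `prefetch_worker_rr` and `n_prefetch_workers`,
which suggests round-robin distribution, but the dispatch code itself is
not shown here, so everything below (`ToyDispatcher`, `toy_dispatch`,
`TOY_MAX_WORKERS`) is a hypothetical model rather than the patch's
implementation. The point is only that operations from any number of
publisher transactions can be spread over a small fixed pool.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_MAX_WORKERS 8		/* arbitrary limit for this sketch */

/* Hypothetical dispatcher state; not a structure from the patch. */
typedef struct ToyDispatcher
{
	int			nworkers;	/* size of the prefetch worker pool */
	int			next;		/* round-robin cursor */
	size_t		queued[TOY_MAX_WORKERS];	/* operations sent to each worker */
} ToyDispatcher;

/* Start with an empty queue count for every worker. */
static void
toy_dispatcher_init(ToyDispatcher *d, int nworkers)
{
	assert(nworkers > 0 && nworkers <= TOY_MAX_WORKERS);
	d->nworkers = nworkers;
	d->next = 0;
	for (int i = 0; i < TOY_MAX_WORKERS; i++)
		d->queued[i] = 0;
}

/*
 * Hand one operation to the next worker in turn, regardless of which
 * transaction it belongs to. Returns the index of the chosen worker.
 */
static int
toy_dispatch(ToyDispatcher *d)
{
	int			worker = d->next;

	d->queued[worker]++;
	d->next = (d->next + 1) % d->nworkers;
	return worker;
}
```

With two workers and 100 dispatched operations, each worker ends up with
50 of them, so the pool size bounds the parallelism of the prefetch but
not the number of transactions whose operations get prefetched.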
Attachments:
v2-0001-logical-replication-prefetch.patch (text/plain; charset=UTF-8)
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 53ddd25c42d..ff7c4ed684d 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -131,7 +131,7 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
* invoking table_tuple_lock.
*/
static bool
-should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
+should_refetch_tuple(TM_Result res, TM_FailureData *tmfd, LockTupleMode lockmode)
{
bool refetch = false;
@@ -141,22 +141,28 @@ should_refetch_tuple(TM_Result res, TM_FailureData *tmfd)
break;
case TM_Updated:
/* XXX: Improve handling here */
- if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
- else
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("concurrent update, retrying")));
- refetch = true;
+ if (lockmode != LockTupleTryExclusive)
+ {
+ if (ItemPointerIndicatesMovedPartitions(&tmfd->ctid))
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
+ else
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("concurrent update, retrying")));
+ refetch = true;
+ }
break;
case TM_Deleted:
- /* XXX: Improve handling here */
- ereport(LOG,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("concurrent delete, retrying")));
- refetch = true;
+ if (lockmode != LockTupleTryExclusive)
+ {
+ /* XXX: Improve handling here */
+ ereport(LOG,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("concurrent delete, retrying")));
+ refetch = true;
+ }
break;
case TM_Invisible:
elog(ERROR, "attempted to lock invisible tuple");
@@ -236,8 +242,16 @@ retry:
*/
if (TransactionIdIsValid(xwait))
{
- XactLockTableWait(xwait, NULL, NULL, XLTW_None);
- goto retry;
+ if (lockmode == LockTupleTryExclusive)
+ {
+ found = false;
+ break;
+ }
+ else if (lockmode != LockTupleNoLock)
+ {
+ XactLockTableWait(xwait, NULL, NULL, XLTW_None);
+ goto retry;
+ }
}
/* Found our tuple and it's not locked */
@@ -246,7 +260,7 @@ retry:
}
/* Found tuple, try to lock it in the lockmode. */
- if (found)
+ if (found && lockmode != LockTupleNoLock)
{
TM_FailureData tmfd;
TM_Result res;
@@ -256,14 +270,14 @@ retry:
res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
outslot,
GetCurrentCommandId(false),
- lockmode,
+ lockmode == LockTupleTryExclusive ? LockTupleExclusive : lockmode,
LockWaitBlock,
0 /* don't follow updates */ ,
&tmfd);
PopActiveSnapshot();
- if (should_refetch_tuple(res, &tmfd))
+ if (should_refetch_tuple(res, &tmfd, lockmode))
goto retry;
}
@@ -395,16 +409,23 @@ retry:
*/
if (TransactionIdIsValid(xwait))
{
- XactLockTableWait(xwait, NULL, NULL, XLTW_None);
- goto retry;
+ if (lockmode == LockTupleTryExclusive)
+ {
+ found = false;
+ break;
+ }
+ else if (lockmode != LockTupleNoLock)
+ {
+ XactLockTableWait(xwait, NULL, NULL, XLTW_None);
+ goto retry;
+ }
}
-
/* Found our tuple and it's not locked */
break;
}
/* Found tuple, try to lock it in the lockmode. */
- if (found)
+ if (found && lockmode != LockTupleNoLock)
{
TM_FailureData tmfd;
TM_Result res;
@@ -414,14 +435,14 @@ retry:
res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
outslot,
GetCurrentCommandId(false),
- lockmode,
+ lockmode == LockTupleTryExclusive ? LockTupleExclusive : lockmode,
LockWaitBlock,
0 /* don't follow updates */ ,
&tmfd);
PopActiveSnapshot();
- if (should_refetch_tuple(res, &tmfd))
+ if (should_refetch_tuple(res, &tmfd, lockmode))
goto retry;
}
@@ -508,7 +529,7 @@ retry:
PopActiveSnapshot();
- if (should_refetch_tuple(res, &tmfd))
+ if (should_refetch_tuple(res, &tmfd, LockTupleShare))
goto retry;
return true;
@@ -560,7 +581,7 @@ CheckAndReportConflict(ResultRelInfo *resultRelInfo, EState *estate,
*/
void
ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
- EState *estate, TupleTableSlot *slot)
+ EState *estate, TupleTableSlot *slot, bool prefetch)
{
bool skip_tuple = false;
Relation rel = resultRelInfo->ri_RelationDesc;
@@ -604,7 +625,7 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
if (resultRelInfo->ri_NumIndices > 0)
recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
slot, estate, false,
- conflictindexes ? true : false,
+ conflictindexes || prefetch ? true : false,
&conflict,
conflictindexes, false);
@@ -623,7 +644,7 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
* be a frequent thing so we preferred to save the performance
* overhead of extra scan before each insertion.
*/
- if (conflict)
+ if (conflict && !prefetch)
CheckAndReportConflict(resultRelInfo, estate, CT_INSERT_EXISTS,
recheckIndexes, NULL, slot);
@@ -650,7 +671,7 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
void
ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
EState *estate, EPQState *epqstate,
- TupleTableSlot *searchslot, TupleTableSlot *slot)
+ TupleTableSlot *searchslot, TupleTableSlot *slot, bool prefetch)
{
bool skip_tuple = false;
Relation rel = resultRelInfo->ri_RelationDesc;
@@ -701,7 +722,7 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
if (resultRelInfo->ri_NumIndices > 0 && (update_indexes != TU_None))
recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
slot, estate, true,
- conflictindexes ? true : false,
+ conflictindexes || prefetch ? true : false,
&conflict, conflictindexes,
(update_indexes == TU_Summarizing));
@@ -710,7 +731,7 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
* ExecSimpleRelationInsert to understand why this check is done at
* this point.
*/
- if (conflict)
+ if (conflict && !prefetch)
CheckAndReportConflict(resultRelInfo, estate, CT_UPDATE_EXISTS,
recheckIndexes, searchslot, slot);
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index d25085d3515..d2c426ecab7 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -400,7 +400,7 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
* Try to get a parallel apply worker from the pool. If none is available then
* start a new one.
*/
-static ParallelApplyWorkerInfo *
+ParallelApplyWorkerInfo *
pa_launch_parallel_worker(void)
{
MemoryContext oldcontext;
@@ -729,6 +729,43 @@ ProcessParallelApplyInterrupts(void)
}
}
+
+static void
+pa_apply_dispatch(StringInfo s)
+{
+ if (MyParallelShared->do_prefetch)
+ {
+ PG_TRY();
+ {
+ apply_dispatch(s);
+ }
+ PG_CATCH();
+ {
+ HOLD_INTERRUPTS();
+
+ elog(DEBUG1, "Failed to prefetch LR operation");
+
+ /* TODO: should we somehow dump the error or just silently ignore it? */
+ /* EmitErrorReport(); */
+ FlushErrorState();
+
+ RESUME_INTERRUPTS();
+
+ lr_prefetch_errors += 1;
+ }
+ PG_END_TRY();
+ if (!prefetch_replica_identity_only)
+ {
+ /* We need to abort transaction to undo insert */
+ AbortCurrentTransaction();
+ }
+ }
+ else
+ {
+ apply_dispatch(s);
+ }
+}
+
/* Parallel apply worker main loop. */
static void
LogicalParallelApplyLoop(shm_mq_handle *mqh)
@@ -794,7 +831,7 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
*/
s.cursor += SIZE_STATS_MESSAGE;
- apply_dispatch(&s);
+ pa_apply_dispatch(&s);
}
else if (shmq_res == SHM_MQ_WOULD_BLOCK)
{
@@ -943,20 +980,27 @@ ParallelApplyWorkerMain(Datum main_arg)
InitializingApplyWorker = false;
- /* Setup replication origin tracking. */
- StartTransactionCommand();
- ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
+ if (!MyParallelShared->do_prefetch)
+ {
+ /* Setup replication origin tracking. */
+ StartTransactionCommand();
+ ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
originname, sizeof(originname));
- originid = replorigin_by_name(originname, false);
-
- /*
- * The parallel apply worker doesn't need to monopolize this replication
- * origin which was already acquired by its leader process.
- */
- replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
- replorigin_session_origin = originid;
- CommitTransactionCommand();
+ originid = replorigin_by_name(originname, false);
+ /*
+ * The parallel apply worker doesn't need to monopolize this replication
+ * origin which was already acquired by its leader process.
+ */
+ replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
+ replorigin_session_origin = originid;
+ CommitTransactionCommand();
+ }
+ else
+ {
+ /* Do not write WAL for prefetch */
+ wal_level = WAL_LEVEL_MINIMAL;
+ }
/*
* Setup callback for syscache so that we know when something changes in
* the subscription relation state.
@@ -1149,8 +1193,11 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
shm_mq_result result;
TimestampTz startTime = 0;
- Assert(!IsTransactionState());
- Assert(!winfo->serialize_changes);
+ if (!winfo->shared->do_prefetch)
+ {
+ Assert(!IsTransactionState());
+ Assert(!winfo->serialize_changes);
+ }
/*
* We don't try to send data to parallel worker for 'immediate' mode. This
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 4aed0dfcebb..ff2eaad5462 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -50,6 +50,7 @@
int max_logical_replication_workers = 4;
int max_sync_workers_per_subscription = 2;
int max_parallel_apply_workers_per_subscription = 2;
+int max_parallel_prefetch_workers_per_subscription = 2;
LogicalRepWorker *MyLogicalRepWorker = NULL;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fd11805a44c..8f1bec5c529 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -311,6 +311,18 @@ static uint32 parallel_stream_nchanges = 0;
/* Are we initializing an apply worker? */
bool InitializingApplyWorker = false;
+#define INIT_PREFETCH_BUF_SIZE (128*1024)
+static ParallelApplyWorkerInfo* prefetch_worker[MAX_LR_PREFETCH_WORKERS];
+static int prefetch_worker_rr = 0;
+static int n_prefetch_workers;
+
+bool prefetch_replica_identity_only = true;
+
+size_t lr_prefetch_hits;
+size_t lr_prefetch_misses;
+size_t lr_prefetch_errors;
+size_t lr_prefetch_inserts;
+
/*
* We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
* the subscription if the remote transaction's finish LSN matches the subskiplsn.
@@ -329,6 +341,11 @@ bool InitializingApplyWorker = false;
static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr;
#define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
+/*
+ * If operation is performed by parallel prefetch worker
+ */
+#define is_prefetching() (am_parallel_apply_worker() && MyParallelShared->do_prefetch)
+
/* BufFile handle of the current streaming file */
static BufFile *stream_fd = NULL;
@@ -556,6 +573,11 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
TransApplyAction apply_action;
StringInfoData original_msg;
+ if (is_prefetching())
+ {
+ return false;
+ }
+
apply_action = get_transaction_apply_action(stream_xid, &winfo);
/* not in streaming mode */
@@ -2380,6 +2402,27 @@ TargetPrivilegesCheck(Relation rel, AclMode mode)
RelationGetRelationName(rel))));
}
+#define SAFE_APPLY(call) \
+ if (is_prefetching()) \
+ { \
+ PG_TRY(); \
+ { \
+ call; \
+ } \
+ PG_CATCH(); \
+ { \
+ HOLD_INTERRUPTS(); \
+ elog(DEBUG1, "Failed to prefetch LR operation");\
+ FlushErrorState(); \
+ RESUME_INTERRUPTS(); \
+ lr_prefetch_errors += 1; \
+ } \
+ PG_END_TRY(); \
+ } else { \
+ call; \
+ }
+
+
/*
* Handle INSERT message.
*/
@@ -2453,7 +2496,7 @@ apply_handle_insert(StringInfo s)
ResultRelInfo *relinfo = edata->targetRelInfo;
ExecOpenIndices(relinfo, false);
- apply_handle_insert_internal(edata, relinfo, remoteslot);
+ SAFE_APPLY(apply_handle_insert_internal(edata, relinfo, remoteslot));
ExecCloseIndices(relinfo);
}
@@ -2487,13 +2530,34 @@ apply_handle_insert_internal(ApplyExecutionData *edata,
!relinfo->ri_RelationDesc->rd_rel->relhasindex ||
RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
- /* Caller will not have done this bit. */
- Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
- InitConflictIndexes(relinfo);
+ if (is_prefetching() && prefetch_replica_identity_only)
+ {
+ TupleTableSlot *localslot = NULL;
+ LogicalRepRelMapEntry *relmapentry = edata->targetRel;
+ Relation localrel = relinfo->ri_RelationDesc;
+ EPQState epqstate;
- /* Do the insert. */
- TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
- ExecSimpleRelationInsert(relinfo, estate, remoteslot);
+ EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
+
+ (void)FindReplTupleInLocalRel(edata, localrel,
+ &relmapentry->remoterel,
+ relmapentry->localindexoid,
+ remoteslot, &localslot);
+ }
+ else
+ {
+ /* Caller will not have done this bit. */
+ Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
+ InitConflictIndexes(relinfo);
+
+ /* Do the insert. */
+ TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
+ ExecSimpleRelationInsert(relinfo, estate, remoteslot, is_prefetching());
+ }
+ if (is_prefetching())
+ {
+ lr_prefetch_inserts += 1;
+ }
}
/*
@@ -2637,8 +2701,8 @@ apply_handle_update(StringInfo s)
apply_handle_tuple_routing(edata,
remoteslot, &newtup, CMD_UPDATE);
else
- apply_handle_update_internal(edata, edata->targetRelInfo,
- remoteslot, &newtup, rel->localindexoid);
+ SAFE_APPLY(apply_handle_update_internal(edata, edata->targetRelInfo,
+ remoteslot, &newtup, rel->localindexoid));
finish_edata(edata);
@@ -2682,6 +2746,16 @@ apply_handle_update_internal(ApplyExecutionData *edata,
localindexoid,
remoteslot, &localslot);
+ if (is_prefetching())
+ {
+ if (found)
+ lr_prefetch_hits += 1;
+ else
+ lr_prefetch_misses += 1;
+ if (prefetch_replica_identity_only)
+ goto Cleanup;
+ }
+
/*
* Tuple found.
*
@@ -2722,7 +2796,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
/* Do the actual update. */
TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_UPDATE);
ExecSimpleRelationUpdate(relinfo, estate, &epqstate, localslot,
- remoteslot);
+ remoteslot, is_prefetching());
}
else
{
@@ -2739,7 +2813,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
remoteslot, newslot, list_make1(&conflicttuple));
}
- /* Cleanup. */
+ Cleanup:
ExecCloseIndices(relinfo);
EvalPlanQualEnd(&epqstate);
}
@@ -2820,8 +2894,8 @@ apply_handle_delete(StringInfo s)
ResultRelInfo *relinfo = edata->targetRelInfo;
ExecOpenIndices(relinfo, false);
- apply_handle_delete_internal(edata, relinfo,
- remoteslot, rel->localindexoid);
+ SAFE_APPLY(apply_handle_delete_internal(edata, relinfo,
+ remoteslot, rel->localindexoid));
ExecCloseIndices(relinfo);
}
@@ -2867,6 +2941,15 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
remoteslot, &localslot);
+ if (is_prefetching())
+ {
+ if (found)
+ lr_prefetch_hits += 1;
+ else
+ lr_prefetch_misses += 1;
+ goto Cleanup;
+ }
+
/* If found delete it. */
if (found)
{
@@ -2900,7 +2983,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
remoteslot, NULL, list_make1(&conflicttuple));
}
- /* Cleanup. */
+ Cleanup:
EvalPlanQualEnd(&epqstate);
}
@@ -2921,6 +3004,8 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
EState *estate = edata->estate;
bool found;
+ LockTupleMode lockmode = is_prefetching() ? prefetch_replica_identity_only ? LockTupleNoLock : LockTupleTryExclusive : LockTupleExclusive;
+
/*
* Regardless of the top-level operation, we're performing a read here, so
* check for SELECT privileges.
@@ -2946,11 +3031,11 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
#endif
found = RelationFindReplTupleByIndex(localrel, localidxoid,
- LockTupleExclusive,
+ lockmode,
remoteslot, *localslot);
}
else
- found = RelationFindReplTupleSeq(localrel, LockTupleExclusive,
+ found = RelationFindReplTupleSeq(localrel, lockmode,
remoteslot, *localslot);
return found;
@@ -3041,14 +3126,14 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
switch (operation)
{
case CMD_INSERT:
- apply_handle_insert_internal(edata, partrelinfo,
- remoteslot_part);
+ SAFE_APPLY(apply_handle_insert_internal(edata, partrelinfo,
+ remoteslot_part));
break;
case CMD_DELETE:
- apply_handle_delete_internal(edata, partrelinfo,
- remoteslot_part,
- part_entry->localindexoid);
+ SAFE_APPLY(apply_handle_delete_internal(edata, partrelinfo,
+ remoteslot_part,
+ part_entry->localindexoid));
break;
case CMD_UPDATE:
@@ -3076,6 +3161,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
{
TupleTableSlot *newslot = localslot;
+ if (is_prefetching())
+ return;
+
/* Store the new tuple for conflict reporting */
slot_store_data(newslot, part_entry, newtup);
@@ -3101,6 +3189,9 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
{
TupleTableSlot *newslot;
+ if (is_prefetching())
+ return;
+
/* Store the new tuple for conflict reporting */
newslot = table_slot_create(partrel, &estate->es_tupleTable);
slot_store_data(newslot, part_entry, newtup);
@@ -3144,7 +3235,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
TargetPrivilegesCheck(partrelinfo->ri_RelationDesc,
ACL_UPDATE);
ExecSimpleRelationUpdate(partrelinfo, estate, &epqstate,
- localslot, remoteslot_part);
+ localslot, remoteslot_part, is_prefetching());
}
else
{
@@ -3217,8 +3308,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
slot_getallattrs(remoteslot);
}
MemoryContextSwitchTo(oldctx);
- apply_handle_insert_internal(edata, partrelinfo_new,
- remoteslot_part);
+ SAFE_APPLY(apply_handle_insert_internal(edata, partrelinfo_new,
+ remoteslot_part));
}
EvalPlanQualEnd(&epqstate);
@@ -3552,7 +3643,6 @@ store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn)
MemoryContextSwitchTo(ApplyMessageContext);
}
-
/* Update statistics of the worker. */
static void
UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
@@ -3567,6 +3657,42 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
}
}
+#define MSG_CODE_OFFSET (1 + 8*3)
+
+static void
+lr_do_prefetch(char* buf, int len)
+{
+ ParallelApplyWorkerInfo* winfo;
+
+ if (buf[0] != 'w')
+ return;
+
+ switch (buf[MSG_CODE_OFFSET])
+ {
+ case LOGICAL_REP_MSG_INSERT:
+ case LOGICAL_REP_MSG_UPDATE:
+ case LOGICAL_REP_MSG_DELETE:
+ /* Round robin prefetch worker */
+ winfo = prefetch_worker[prefetch_worker_rr++ % n_prefetch_workers];
+ pa_send_data(winfo, len, buf);
+ break;
+
+ case LOGICAL_REP_MSG_TYPE:
+ case LOGICAL_REP_MSG_RELATION:
+ /* broadcast to all prefetch workers */
+ for (int i = 0; i < n_prefetch_workers; i++)
+ {
+ winfo = prefetch_worker[i];
+ pa_send_data(winfo, len, buf);
+ }
+ break;
+
+ default:
+ /* Ignore other messages */
+ break;
+ }
+}
+
/*
* Apply main loop.
*/
@@ -3577,6 +3703,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
bool ping_sent = false;
TimeLineID tli;
ErrorContextCallback errcallback;
+ char* prefetch_buf = NULL;
+ size_t prefetch_buf_pos = 0;
+ size_t prefetch_buf_used = 0;
+ size_t prefetch_buf_size = INIT_PREFETCH_BUF_SIZE;
/*
* Init the ApplyMessageContext which we clean up after each replication
@@ -3594,6 +3724,25 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
"LogicalStreamingContext",
ALLOCSET_DEFAULT_SIZES);
+ if (max_parallel_prefetch_workers_per_subscription != 0)
+ {
+ int i;
+ for (i = 0; i < max_parallel_prefetch_workers_per_subscription; i++)
+ {
+ prefetch_worker[i] = pa_launch_parallel_worker();
+ if (!prefetch_worker[i])
+ {
+ elog(LOG, "Launched only %d of %d prefetch workers",
+ i, max_parallel_prefetch_workers_per_subscription);
+ break;
+ }
+ prefetch_worker[i]->in_use = true;
+ prefetch_worker[i]->shared->do_prefetch = true;
+ }
+ n_prefetch_workers = i;
+ prefetch_buf = palloc(prefetch_buf_size);
+ }
+
/* mark as idle, before starting to loop */
pgstat_report_activity(STATE_IDLE, NULL);
@@ -3611,9 +3760,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
{
pgsocket fd = PGINVALID_SOCKET;
int rc;
- int len;
+ int32 len;
char *buf = NULL;
bool endofstream = false;
+ bool no_more_data = false;
long wait_time;
CHECK_FOR_INTERRUPTS();
@@ -3622,87 +3772,127 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
- if (len != 0)
+ /* Loop to process all available data (without blocking). */
+ for (;;)
{
- /* Loop to process all available data (without blocking). */
- for (;;)
- {
- CHECK_FOR_INTERRUPTS();
+ CHECK_FOR_INTERRUPTS();
- if (len == 0)
+ if (len > 0 && n_prefetch_workers != 0 && prefetch_buf_pos == prefetch_buf_used)
+ {
+ prefetch_buf_used = 0;
+ do
{
- break;
- }
- else if (len < 0)
+ if (prefetch_buf_used + len + 4 > prefetch_buf_size)
+ {
+ prefetch_buf_size *= 2;
+ elog(DEBUG1, "Increase prefetch buffer size to %zu", prefetch_buf_size);
+ prefetch_buf = repalloc(prefetch_buf, prefetch_buf_size);
+ }
+ memcpy(&prefetch_buf[prefetch_buf_used], &len, 4);
+ memcpy(&prefetch_buf[prefetch_buf_used+4], buf, len);
+ prefetch_buf_used += 4 + len;
+ if (prefetch_buf_used >= INIT_PREFETCH_BUF_SIZE)
+ break;
+ len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
+ } while (len > 0);
+
+ no_more_data = len <= 0;
+
+ for (prefetch_buf_pos = 0; prefetch_buf_pos < prefetch_buf_used; prefetch_buf_pos += 4 + len)
{
- ereport(LOG,
- (errmsg("data stream from publisher has ended")));
- endofstream = true;
- break;
+ memcpy(&len, &prefetch_buf[prefetch_buf_pos], 4);
+ lr_do_prefetch(&prefetch_buf[prefetch_buf_pos+4], len);
}
- else
- {
- int c;
- StringInfoData s;
+ memcpy(&len, prefetch_buf, 4);
+ buf = &prefetch_buf[4];
+ prefetch_buf_pos = len + 4;
+ }
- if (ConfigReloadPending)
- {
- ConfigReloadPending = false;
- ProcessConfigFile(PGC_SIGHUP);
- }
+ if (len == 0)
+ {
+ break;
+ }
+ else if (len < 0)
+ {
+ ereport(LOG,
+ (errmsg("data stream from publisher has ended")));
+ endofstream = true;
+ break;
+ }
+ else
+ {
+ int c;
+ StringInfoData s;
- /* Reset timeout. */
- last_recv_timestamp = GetCurrentTimestamp();
- ping_sent = false;
+ if (ConfigReloadPending)
+ {
+ ConfigReloadPending = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
- /* Ensure we are reading the data into our memory context. */
- MemoryContextSwitchTo(ApplyMessageContext);
+ /* Reset timeout. */
+ last_recv_timestamp = GetCurrentTimestamp();
+ ping_sent = false;
- initReadOnlyStringInfo(&s, buf, len);
+ /* Ensure we are reading the data into our memory context. */
+ MemoryContextSwitchTo(ApplyMessageContext);
- c = pq_getmsgbyte(&s);
+ initReadOnlyStringInfo(&s, buf, len);
- if (c == 'w')
- {
- XLogRecPtr start_lsn;
- XLogRecPtr end_lsn;
- TimestampTz send_time;
+ c = pq_getmsgbyte(&s);
- start_lsn = pq_getmsgint64(&s);
- end_lsn = pq_getmsgint64(&s);
- send_time = pq_getmsgint64(&s);
+ if (c == 'w')
+ {
+ XLogRecPtr start_lsn;
+ XLogRecPtr end_lsn;
+ TimestampTz send_time;
- if (last_received < start_lsn)
- last_received = start_lsn;
+ start_lsn = pq_getmsgint64(&s);
+ end_lsn = pq_getmsgint64(&s);
+ send_time = pq_getmsgint64(&s);
- if (last_received < end_lsn)
- last_received = end_lsn;
+ if (last_received < start_lsn)
+ last_received = start_lsn;
- UpdateWorkerStats(last_received, send_time, false);
+ if (last_received < end_lsn)
+ last_received = end_lsn;
- apply_dispatch(&s);
- }
- else if (c == 'k')
- {
- XLogRecPtr end_lsn;
- TimestampTz timestamp;
- bool reply_requested;
+ UpdateWorkerStats(last_received, send_time, false);
- end_lsn = pq_getmsgint64(&s);
- timestamp = pq_getmsgint64(&s);
- reply_requested = pq_getmsgbyte(&s);
+ apply_dispatch(&s);
+ }
+ else if (c == 'k')
+ {
+ XLogRecPtr end_lsn;
+ TimestampTz timestamp;
+ bool reply_requested;
- if (last_received < end_lsn)
- last_received = end_lsn;
+ end_lsn = pq_getmsgint64(&s);
+ timestamp = pq_getmsgint64(&s);
+ reply_requested = pq_getmsgbyte(&s);
- send_feedback(last_received, reply_requested, false);
- UpdateWorkerStats(last_received, timestamp, true);
- }
- /* other message types are purposefully ignored */
+ if (last_received < end_lsn)
+ last_received = end_lsn;
- MemoryContextReset(ApplyMessageContext);
+ send_feedback(last_received, reply_requested, false);
+ UpdateWorkerStats(last_received, timestamp, true);
}
+ /* other message types are purposefully ignored */
+ MemoryContextReset(ApplyMessageContext);
+ }
+ if (prefetch_buf_pos < prefetch_buf_used)
+ {
+ memcpy(&len, &prefetch_buf[prefetch_buf_pos], 4);
+ buf = &prefetch_buf[prefetch_buf_pos + 4];
+ prefetch_buf_pos += 4 + len;
+ }
+ else if (prefetch_buf_used != 0 && no_more_data)
+ {
+ break;
+ }
+ else
+ {
len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
}
}
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 511dc32d519..3b254898663 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -76,6 +76,7 @@
#include "replication/slot.h"
#include "replication/slotsync.h"
#include "replication/syncrep.h"
+#include "replication/worker_internal.h"
#include "storage/aio.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
@@ -2143,6 +2144,18 @@ struct config_bool ConfigureNamesBool[] =
NULL, NULL, NULL
},
+ {
+ {"prefetch_replica_identity_only",
+ PGC_SIGHUP,
+ REPLICATION_SUBSCRIBERS,
+ gettext_noop("Whether LR prefetch workers should prefetch only the replica identity index or all other indexes too."),
+ NULL,
+ },
+ &prefetch_replica_identity_only,
+ true,
+ NULL, NULL, NULL
+ },
+
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
@@ -3376,6 +3389,18 @@ struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
+ {
+ {"max_parallel_prefetch_workers_per_subscription",
+ PGC_SIGHUP,
+ REPLICATION_SUBSCRIBERS,
+ gettext_noop("Maximum number of parallel prefetch workers per subscription."),
+ NULL,
+ },
+ &max_parallel_prefetch_workers_per_subscription,
+ 2, 0, MAX_LR_PREFETCH_WORKERS,
+ NULL, NULL, NULL
+ },
+
{
{"max_active_replication_origins",
PGC_POSTMASTER,
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 104b059544d..fbe705bfe3f 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -761,10 +761,10 @@ extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
TupleTableSlot *searchslot, TupleTableSlot *outslot);
extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
- EState *estate, TupleTableSlot *slot);
+ EState *estate, TupleTableSlot *slot, bool prefetch);
extern void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
EState *estate, EPQState *epqstate,
- TupleTableSlot *searchslot, TupleTableSlot *slot);
+ TupleTableSlot *searchslot, TupleTableSlot *slot, bool prefetch);
extern void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
EState *estate, EPQState *epqstate,
TupleTableSlot *searchslot);
diff --git a/src/include/nodes/lockoptions.h b/src/include/nodes/lockoptions.h
index 0b534e30603..88f5d2e4cc5 100644
--- a/src/include/nodes/lockoptions.h
+++ b/src/include/nodes/lockoptions.h
@@ -56,6 +56,10 @@ typedef enum LockTupleMode
LockTupleNoKeyExclusive,
/* SELECT FOR UPDATE, UPDATEs that modify key columns, and DELETE */
LockTupleExclusive,
+ /* Do not lock tuple */
+ LockTupleNoLock,
+ /* Try exclusive lock, silently give up in case of conflict */
+ LockTupleTryExclusive,
} LockTupleMode;
#endif /* LOCKOPTIONS_H */
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index 82b202f3305..19d1a8d466b 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -15,6 +15,8 @@
extern PGDLLIMPORT int max_logical_replication_workers;
extern PGDLLIMPORT int max_sync_workers_per_subscription;
extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription;
+extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription;
+extern PGDLLIMPORT int max_parallel_prefetch_workers_per_subscription;
extern void ApplyLauncherRegister(void);
extern void ApplyLauncherMain(Datum main_arg);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 30b2775952c..c6745e77efc 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -180,6 +180,11 @@ typedef struct ParallelApplyWorkerShared
*/
PartialFileSetState fileset_state;
FileSet fileset;
+
+ /*
+ * Prefetch worker
+ */
+ bool do_prefetch;
} ParallelApplyWorkerShared;
/*
@@ -237,6 +242,14 @@ extern PGDLLIMPORT bool in_remote_transaction;
extern PGDLLIMPORT bool InitializingApplyWorker;
+#define MAX_LR_PREFETCH_WORKERS 128
+extern PGDLLIMPORT size_t lr_prefetch_hits;
+extern PGDLLIMPORT size_t lr_prefetch_misses;
+extern PGDLLIMPORT size_t lr_prefetch_errors;
+extern PGDLLIMPORT size_t lr_prefetch_inserts;
+
+extern bool prefetch_replica_identity_only;
+
extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
bool only_running);
@@ -326,10 +339,13 @@ extern void pa_decr_and_wait_stream_block(void);
extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
XLogRecPtr remote_lsn);
-#define isParallelApplyWorker(worker) ((worker)->in_use && \
+extern void pa_prefetch_handle_modification(StringInfo s, LogicalRepMsgType action);
+
+#define isParallelApplyWorker(worker) ((worker)->in_use && \
(worker)->type == WORKERTYPE_PARALLEL_APPLY)
#define isTablesyncWorker(worker) ((worker)->in_use && \
(worker)->type == WORKERTYPE_TABLESYNC)
+extern ParallelApplyWorkerInfo* pa_launch_parallel_worker(void);
static inline bool
am_tablesync_worker(void)
On Sun, Jul 13, 2025 at 6:06 PM Konstantin Knizhnik <knizhnik@garret.ru> wrote:
On 13/07/2025 1:28 pm, Amit Kapila wrote:
On Tue, Jul 8, 2025 at 12:06 PM Konstantin Knizhnik <knizhnik@garret.ru> wrote:
There is a well-known Postgres problem: a logical replication subscriber
cannot catch up with the publisher because LR changes are applied by a
single worker, while at the publisher changes are made by
multiple concurrent backends.
BTW, do you know how users deal with this lag? For example, one can
imagine creating multiple pub-sub pairs for different sets of tables
so that the workload on the subscriber could also be shared by
multiple apply workers. I can also think of splitting the workload
among multiple pub-sub pairs by using row filters.
Yes, I saw that users start several subscriptions/publications to
receive and apply changes in parallel.
But it cannot be considered a universal solution:
1. There are not always multiple tables (or partitions of one table)
that can be split between multiple publications.
2. It violates transactional behavior (consistency): if transactions
update several tables included in different publications, then when
these changes are applied independently, we can observe on the replica
a state where one table is updated and another is not. The same is true for row filters.
3. Each walsender will have to scan WAL, so with N subscriptions we
have to read and decode WAL N times.
I agree that it is not a solution which can be applied in all cases,
nor do I want to say that we shouldn't pursue the idea of
prefetch or parallel apply to improve the speed of apply. It was just
to know/discuss how users try to work around lag for cases where the
lag is large.
--
With Regards,
Amit Kapila.
On Sun, Jul 13, 2025 at 5:59 PM Konstantin Knizhnik <knizhnik@garret.ru> wrote:
Certainly the originally intended use case was different: parallel apply is
performed only for large transactions. The number of such transactions is
not so big, so there should be enough parallel apply workers in the pool to
process them. And if there are not enough workers, it is not a problem to spawn
a new one and terminate
it after completion of the transaction (because the transaction is long,
the overhead of spawning a process is not so large compared with redo of
a large transaction).
Right.
But if we want to efficiently replicate an OLTP workload, then we
definitely need some other approach.
Agreed, for simplicity, for now we can have a GUC to decide the size
of the pool. There is a note in the code for this as well, see: "XXX
This worker pool threshold is arbitrary and we can provide a GUC
variable for this in the future if required." I think we can think of
some dynamic strategy where we remove from the pool if the workers are
not in use for some threshold period of time or something on those
lines. But at this stage it is better to use something simple and try
to come up with a good way to perform pre-fetch or parallelization of
short transactions.
--
With Regards,
Amit Kapila.
On Monday, July 14, 2025 2:36 PM Konstantin Knizhnik <knizhnik@garret.ru> wrote:
On 14/07/2025 4:20 am, Zhijie Hou (Fujitsu) wrote:
Additionally, I was also exploring ways to improve performance and have tried an
alternative version of prefetch for experimentation. The alternative design is
that we assign each non-streaming transaction to a parallel apply worker, while
strictly maintaining the order of commits. During parallel apply, if the
transactions that need to be committed before the current transaction are not
yet finished, the worker performs pre-fetch operations. Specifically, for
updates and deletes, the worker finds and caches the target local tuple to be
updated/deleted. Once all preceding transactions are committed, the parallel
apply worker uses these cached tuples to execute the actual updates or deletes.
What do you think about this alternative? I think the alternative might offer
more stability in scenarios where shared buffer eviction occurs frequently
and avoids leaving dead tuples in the buffer. However, it also presents some
drawbacks, such as the need to add wait events to maintain commit order,
compared to the approach discussed in this thread.
So as far as I understand, your PoC is doing the same as approach 1 in my
proposal (prefetch of replica identity), but it is done not by parallel prefetch
workers but by normal parallel apply workers when they have to wait until the previous
transaction is committed. I consider it to be more complex, but it may be more
efficient than my approach.
The obvious drawback of both your approach and mine is that they prefetch only
pages of the primary index (replica identity). If there are other indexes
whose keys are changed by an update, then pages of those indexes will be read from
disk when you apply the update. The same is also true for insert (in this case
you always have to include the new tuple in all indexes). This is why I have also
implemented another approach: apply the operation in a prefetch worker and then
roll back the transaction.
Thank you for your reply! I agree that indexes other than RI do not benefit from
the pre-fetch. Regarding the apply operation and rollback approach, I have some
concerns about the possible side effects, particularly the accumulation of dead
tuples in the shared buffer. This is because all changes are performed by the
pre-fetch worker at once before being aborted. I haven't delved deeply into this
yet, but do you think this could potentially introduce additional overhead?
Also I do not quite understand how you handle invalidations?
During the pre-fetch phase of my patch, the execution of table_tuple_lock() is
postponed until all preceding transactions have been finalized. If the cached
tuple was modified by other transactions, table_tuple_lock() will return
TM_Updated, signifying that the cached tuple is no longer valid. In these cases,
the parallel apply worker will re-fetch the tuple.
Assume that we have two transactions, T1 and T2:
T1: ... W1 Commit
T2: ... W1
So T1 writes tuple 1 and then commits the transaction. Then T2 updates tuple 1.
If I correctly understand your approach, the parallel apply worker for T2 will try
to prefetch tuple 1 before T1 is committed.
But in this case it will get the old version of the tuple. It is not a problem if
the parallel apply worker repeats the lookup and does not just use the cached tuple.
Yes, it is done like that.
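The revalidation step discussed above can be illustrated with a toy model. This is only a sketch under stated assumptions: `ToyTuple`, `CachedRef`, `toy_prefetch` and `toy_apply_update` are hypothetical names, a plain version counter stands in for whatever `table_tuple_lock()` detects when it returns TM_Updated, and none of this is PostgreSQL code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy stand-in for a heap tuple: a value plus a version bumped on each update. */
typedef struct ToyTuple
{
    int      value;
    unsigned version;
} ToyTuple;

/* What a worker remembers from its prefetch-time lookup (hypothetical). */
typedef struct CachedRef
{
    size_t   slot;          /* position of the tuple in the toy table */
    unsigned seen_version;  /* version observed while prefetching */
} CachedRef;

/* Prefetch phase: look the tuple up and remember which version was seen. */
static CachedRef
toy_prefetch(const ToyTuple *table, size_t slot)
{
    CachedRef ref = {slot, table[slot].version};

    return ref;
}

/*
 * Apply phase, meant to run only after all preceding transactions committed.
 * Returns true if the cached version was still current, false if the tuple
 * had changed in the meantime and the lookup had to be repeated.
 */
static bool
toy_apply_update(ToyTuple *table, size_t ntuples, CachedRef *ref, int new_value)
{
    bool cache_valid;

    assert(ref->slot < ntuples);

    cache_valid = (table[ref->slot].version == ref->seen_version);
    if (!cache_valid)
        *ref = toy_prefetch(table, ref->slot);  /* re-fetch current version */

    table[ref->slot].value = new_value;
    table[ref->slot].version++;
    return cache_valid;
}
```

In the T1/T2 example, the worker for T2 would call `toy_prefetch` before T1 commits; once T1 has applied its write, `toy_apply_update` for T2 sees a different version, returns false, and works on the re-fetched tuple instead of the stale cached one.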
One more point. Since you assign each non-streaming transaction to a
parallel apply worker, the number of such transactions is limited by the
number of background
workers. Usually it is not so large (~10). So if there were 100 parallel
transactions at the publisher, then at the subscriber you will still be able to execute
concurrently no more than a few of them. In this sense my approach with
separate prefetch workers is more flexible: each prefetch worker can prefetch
as many operations as it can.
Yes, that's true. I have been analyzing some performance issues in logical
replication, specifically under scenarios where both the publisher and
subscriber are subjected to high workloads. In these situations, the shared
buffer is frequently updated, prompting me to consider the alternative approach I
mentioned. I plan to perform additional tests and analysis on these approaches,
thanks!
Best Regards,
Hou zj
On Mon, Jul 14, 2025 at 3:13 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
On Sun, Jul 13, 2025 at 6:06 PM Konstantin Knizhnik <knizhnik@garret.ru> wrote:
On 13/07/2025 1:28 pm, Amit Kapila wrote:
On Tue, Jul 8, 2025 at 12:06 PM Konstantin Knizhnik <knizhnik@garret.ru> wrote:
There is a well-known Postgres problem: a logical replication subscriber
cannot catch up with the publisher because LR changes are applied by a
single worker, while at the publisher changes are made by
multiple concurrent backends.
BTW, do you know how users deal with this lag? For example, one can
imagine creating multiple pub-sub pairs for different sets of tables
so that the workload on the subscriber could also be shared by
multiple apply workers. I can also think of splitting the workload
among multiple pub-sub pairs by using row filters.
Yes, I saw that users start several subscriptions/publications to
receive and apply changes in parallel.
But it cannot be considered a universal solution:
1. There are not always multiple tables (or partitions of one table)
that can be split between multiple publications.
2. It violates transactional behavior (consistency): if transactions
update several tables included in different publications, then when
these changes are applied independently, we can observe on the replica
a state where one table is updated and another is not. The same is true for row filters.
3. Each walsender will have to scan WAL, so with N subscriptions we
have to read and decode WAL N times.
I agree that it is not a solution which can be applied in all cases,
nor do I want to say that we shouldn't pursue the idea of
prefetch or parallel apply to improve the speed of apply. It was just
to know/discuss how users try to work around lag for cases where the
lag is large.
If you are interested, I would like to know your opinion on a somewhat
related topic, which has triggered my interest in your patch. We are
working on an update_delete conflict detection patch. The exact
problem was explained in the initial email [1]. The basic idea to
resolve the problem is that on the subscriber, we maintain a slot that
will help in retaining dead tuples for a certain period of time till
the concurrent transactions have been applied to the subscriber. You
can read the commit message of the first patch in email [2]. Now, the
problem we are facing is that because of replication LAG in a scenario
similar to what we are discussing here, such that when there are many
clients on the publisher and a single apply worker on the subscriber,
the slot takes more time to get advanced. This will lead to retention
of dead tuples, which further slows down apply worker especially for
update workloads. Apart from apply, the other transactions running on
the system (say pgbench kind of workload on the subscriber) also
became slower because of the retention of dead tuples.
Now, for the workloads where the LAG is not there, like when one
splits the workload with options mentioned above (split workload among
pub-sub in some way) or the workload doesn't consist of a large number
of clients operating on the publisher and subscriber at the same time,
etc. we don't observe any major slowdown on the subscriber.
We would like to solicit your opinion as you seem to have some
experience with LR users, whether one can use this feature in cases
where required by enabling it at the subscription level. They will
have the facility to disable it if they face any performance
regression or additional bloat. Now, after having that feature, we can
work on additional features such as prefetch or parallel apply that
will reduce the chances of LAG, making the feature more broadly used.
Does that sound reasonable to you? Feel free to ignore giving your
opinion if you are not interested in that work.
[1]: /messages/by-id/OS0PR01MB5716BE80DAEB0EE2A6A5D1F5949D2@OS0PR01MB5716.jpnprd01.prod.outlook.com
[2]: /messages/by-id/OS0PR01MB5716ECC539008C85E7AB65C5944FA@OS0PR01MB5716.jpnprd01.prod.outlook.com
--
With Regards,
Amit Kapila.
On 15/07/2025 2:31 PM, Amit Kapila wrote:
If you are interested, I would like to know your opinion on a somewhat
related topic, which has triggered my interest in your patch. We are
working on an update_delete conflict detection patch. The exact
problem was explained in the initial email [1]. The basic idea to
resolve the problem is that on the subscriber, we maintain a slot that
will help in retaining dead tuples for a certain period of time till
the concurrent transactions have been applied to the subscriber. You
can read the commit message of the first patch in email [2]. Now, the
problem we are facing is that because of replication LAG in a scenario
similar to what we are discussing here, such that when there are many
clients on the publisher and a single apply worker on the subscriber,
the slot takes more time to get advanced. This will lead to retention
of dead tuples, which further slows down apply worker especially for
update workloads. Apart from apply, the other transactions running on
the system (say pgbench kind of workload on the subscriber) also
became slower because of the retention of dead tuples.
Now, for the workloads where the LAG is not there, like when one
splits the workload with options mentioned above (split workload among
pub-sub in some way) or the workload doesn't consist of a large number
of clients operating on the publisher and subscriber at the same time,
etc. we don't observe any major slowdown on the subscriber.
We would like to solicit your opinion as you seem to have some
experience with LR users, whether one can use this feature in cases
where required by enabling it at the subscription level. They will
have the facility to disable it if they face any performance
regression or additional bloat. Now, after having that feature, we can
work on additional features such as prefetch or parallel apply that
will reduce the chances of LAG, making the feature more broadly used.
Does that sound reasonable to you? Feel free to ignore giving your
opinion if you are not interested in that work.
[1] - /messages/by-id/OS0PR01MB5716BE80DAEB0EE2A6A5D1F5949D2@OS0PR01MB5716.jpnprd01.prod.outlook.com
[2] - /messages/by-id/OS0PR01MB5716ECC539008C85E7AB65C5944FA@OS0PR01MB5716.jpnprd01.prod.outlook.com
I am very sorry for the delay in answering; it was a very busy week.
I hope that I understand the problem and the proposed approach to solving it
(it actually seems to be quite straightforward and similar to
`hot_standby_feedback`). And it definitely suffers from the same problem:
database bloat because of lagging slots.
But it is really hard to propose some other solution (other than a backward
scan of WAL, which seems to be completely unacceptable).
Concerning user experience... First of all, a disclaimer: I am primarily
a programmer and not a DBA. Yes, I have investigated many support cases,
but still it is hard to expect that I have the full picture. There is
not even consensus concerning `hot_standby_feedback`! It is still
disabled by default in Postgres and in Neon. It makes sense for vanilla
Postgres, where replicas are primarily used for HA and only secondarily
for load balancing of read-only queries.
But in Neon HA is provided in a different way, and the only reason for
creating RO replicas is load balancing (mostly for OLAP queries).
Execution of heavy OLAP queries without `hot_standby_feedback` is a
kind of "Russian roulette", because the probability of a conflict with
recovery is very high. But still we are using the Postgres default.
But the situation with feedback slots may be different: as far as I
understand, they are mostly needed for bidirectional replication and
automatic conflict resolution.
So this is assumed to be part of some distributed system (like BDR), rather
than a feature used directly by Postgres users.
I am still a little bit depressed by the complexity of LR and all related
aspects. But it is unlikely that something more elegant
and simpler can be invented :)
Here is a completely rewritten version of the prefetch patch.
Now prefetch workers do not try to apply the LR operation and then
roll back the transaction.
They just perform index lookups and so prefetch the index and the referenced
heap pages.
So no hacks are needed to prevent lock conflicts and WAL logging.
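For reference, the way the apply leader hands incoming messages to prefetch workers in `lr_do_prefetch` (from the patch posted earlier in this thread) can be modeled roughly as below. This is a simplified sketch, not code from the patch: `toy_route` and the `TOY_MSG_*` names are hypothetical, real delivery goes through `pa_send_data`, and the comment on why metadata is broadcast is my reading of the code rather than something it states.

```c
#include <assert.h>

/* Message type bytes, matching LogicalRepMsgType in logicalproto.h. */
enum
{
    TOY_MSG_INSERT = 'I',
    TOY_MSG_UPDATE = 'U',
    TOY_MSG_DELETE = 'D',
    TOY_MSG_RELATION = 'R',
    TOY_MSG_TYPE = 'Y'
};

/*
 * Decide which prefetch workers should receive a message.
 * Fills targets[] (capacity n_workers) with worker indexes and returns
 * how many were chosen. rr_counter holds the round-robin state.
 */
static int
toy_route(char msg_type, unsigned *rr_counter, int n_workers, int *targets)
{
    assert(n_workers > 0);

    switch (msg_type)
    {
        case TOY_MSG_INSERT:
        case TOY_MSG_UPDATE:
        case TOY_MSG_DELETE:
            /* Data changes go to exactly one worker, chosen round-robin. */
            targets[0] = (int) ((*rr_counter)++ % (unsigned) n_workers);
            return 1;

        case TOY_MSG_RELATION:
        case TOY_MSG_TYPE:
            /*
             * Metadata is broadcast to every worker (presumably so that
             * each of them can interpret the changes it receives later).
             */
            for (int i = 0; i < n_workers; i++)
                targets[i] = i;
            return n_workers;

        default:
            /* All other messages are not forwarded to prefetch workers. */
            return 0;
    }
}
```

With three workers, successive UPDATE/INSERT/DELETE messages land on workers 0, 1, 2, 0, and so on, while a RELATION message reaches all three.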
Performance results are the following (the test scenario was explained in the
previous message, as were the shell scripts used):
update:
prefetch (2): 5:20
prefetch (4): 3:20
prefetch (8): 2:05
no prefetch: 8:30
insert:
pk (4) prefetch: 9:55
pk+sk(4) prefetch: 5:20
pk+sk(8) prefetch: 3:08
no prefetch: 9:20
The number in parentheses specifies the number of prefetch workers.
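To make the timings above easier to compare, here is a small sketch that turns them into speedup ratios relative to the "no prefetch" run. The helper names `toy_to_units` and `toy_speedup` are hypothetical; the only assumption about the timings is that they are written in base 60 (a:bb), so the ratios do not depend on which unit they are in. For update the ratios come out as 510/320 ≈ 1.59 (2 workers), 510/200 = 2.55 (4 workers) and 510/125 = 4.08 (8 workers). For insert they are 560/595 ≈ 0.94 (pk only, 4 workers, i.e. slightly slower than no prefetch), 560/320 = 1.75 (pk+sk, 4 workers) and 560/188 ≈ 2.98 (pk+sk, 8 workers).

```c
#include <assert.h>

/* Convert a base-60 "major:minor" timing (e.g. 8:30) into minor units. */
static int
toy_to_units(int major, int minor)
{
    assert(major >= 0 && minor >= 0 && minor < 60);
    return major * 60 + minor;
}

/* Speedup of a run relative to the baseline; a value above 1.0 means faster. */
static double
toy_speedup(int baseline_units, int run_units)
{
    assert(run_units > 0);
    return (double) baseline_units / (double) run_units;
}
```

For example, `toy_speedup(toy_to_units(8, 30), toy_to_units(2, 5))` gives the ratio for update with 8 prefetch workers.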
For example, to spawn 8 prefetch workers I used the following settings in
postgresql.conf.replica:
prefetch_replica_identity_only=off
max_worker_processes=16
max_logical_replication_workers=16
max_parallel_apply_workers_per_subscription=8
max_parallel_prefetch_workers_per_subscription=8
port=54322
I also ran a continuous test with a long (3 hours) update workload on the
publisher with logical replication to the subscriber.
And with 8 prefetch workers the replica is able to keep up with the primary,
where 10 backends are performing updates!
After the end of these updates the replica was in the same state as the primary,
while without prefetch it had processed only 1/2 of the
generated WAL and took another 5:30 hours to catch up.
Attachments:
v3-0001-logical-replication-prefetch.patch
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 53ddd25c42..3c50f17227 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -275,6 +275,47 @@ retry:
return found;
}
+/*
+ * Search the relation 'rel' for tuple using the index.
+ * Returns true if tuple is found.
+ */
+bool
+RelationPrefetchIndex(Relation rel, Oid idxoid, TupleTableSlot *searchslot,
+ TupleTableSlot *outslot)
+{
+ ScanKeyData skey[INDEX_MAX_KEYS];
+ int skey_attoff;
+ IndexScanDesc scan;
+ SnapshotData snap;
+ Relation idxrel;
+ bool found;
+
+ /* Do not do prefetch when there is no index */
+ if (!OidIsValid(idxoid))
+ return false;
+
+ /* Open the index. */
+ idxrel = index_open(idxoid, AccessShareLock);
+
+ InitDirtySnapshot(snap);
+
+ /* Build scan key. */
+ skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
+
+ /* Start an index scan. */
+ scan = index_beginscan(rel, idxrel, &snap, NULL, skey_attoff, 0);
+ index_rescan(scan, skey, skey_attoff, NULL, 0);
+
+ /* Try to find the tuple */
+ found = index_getnext_slot(scan, ForwardScanDirection, outslot);
+
+ /* Cleanup */
+ index_endscan(scan);
+ index_close(idxrel, AccessShareLock);
+
+ return found;
+}
+
/*
* Compare the tuples in the slots by checking if they have equal values.
*/
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index d25085d351..a207a4acdc 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -396,6 +396,57 @@ pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
return true;
}
+/*
+ * Try to get a parallel prefetch worker.
+ */
+ParallelApplyWorkerInfo *
+pa_launch_prefetch_worker(void)
+{
+ MemoryContext oldcontext;
+ bool launched;
+ ParallelApplyWorkerInfo *winfo;
+
+ /*
+ * Start a new parallel prefetch worker.
+ *
+ * The worker info can be used for the lifetime of the worker process, so
+ * create it in a permanent context.
+ */
+ oldcontext = MemoryContextSwitchTo(ApplyContext);
+
+ winfo = (ParallelApplyWorkerInfo *) palloc0(sizeof(ParallelApplyWorkerInfo));
+
+ /* Setup shared memory. */
+ if (!pa_setup_dsm(winfo))
+ {
+ MemoryContextSwitchTo(oldcontext);
+ pfree(winfo);
+ return NULL;
+ }
+
+ launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_PREFETCH,
+ MyLogicalRepWorker->dbid,
+ MySubscription->oid,
+ MySubscription->name,
+ MyLogicalRepWorker->userid,
+ InvalidOid,
+ dsm_segment_handle(winfo->dsm_seg));
+
+ if (launched)
+ {
+ winfo->do_prefetch = true;
+ }
+ else
+ {
+ pa_free_worker_info(winfo);
+ winfo = NULL;
+ }
+
+ MemoryContextSwitchTo(oldcontext);
+
+ return winfo;
+}
+
/*
* Try to get a parallel apply worker from the pool. If none is available then
* start a new one.
@@ -943,20 +994,22 @@ ParallelApplyWorkerMain(Datum main_arg)
InitializingApplyWorker = false;
- /* Setup replication origin tracking. */
- StartTransactionCommand();
- ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
+ if (am_parallel_apply_worker())
+ {
+ /* Setup replication origin tracking. */
+ StartTransactionCommand();
+ ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
originname, sizeof(originname));
- originid = replorigin_by_name(originname, false);
-
- /*
- * The parallel apply worker doesn't need to monopolize this replication
- * origin which was already acquired by its leader process.
- */
- replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
- replorigin_session_origin = originid;
- CommitTransactionCommand();
+ originid = replorigin_by_name(originname, false);
+ /*
+ * The parallel apply worker doesn't need to monopolize this replication
+ * origin which was already acquired by its leader process.
+ */
+ replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
+ replorigin_session_origin = originid;
+ CommitTransactionCommand();
+ }
/*
* Setup callback for syscache so that we know when something changes in
* the subscription relation state.
@@ -1149,8 +1202,11 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
shm_mq_result result;
TimestampTz startTime = 0;
- Assert(!IsTransactionState());
- Assert(!winfo->serialize_changes);
+ if (!winfo->do_prefetch)
+ {
+ Assert(!IsTransactionState());
+ Assert(!winfo->serialize_changes);
+ }
/*
* We don't try to send data to parallel worker for 'immediate' mode. This
@@ -1519,6 +1575,9 @@ pa_get_fileset_state(void)
{
PartialFileSetState fileset_state;
+ if (am_parallel_prefetch_worker())
+ return FS_EMPTY;
+
Assert(am_parallel_apply_worker());
SpinLockAcquire(&MyParallelShared->mutex);
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 4aed0dfceb..ed6c057cec 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -50,6 +50,7 @@
int max_logical_replication_workers = 4;
int max_sync_workers_per_subscription = 2;
int max_parallel_apply_workers_per_subscription = 2;
+int max_parallel_prefetch_workers_per_subscription = 2;
LogicalRepWorker *MyLogicalRepWorker = NULL;
@@ -257,7 +258,7 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
/* Skip parallel apply workers. */
- if (isParallelApplyWorker(w))
+ if (isParallelApplyWorker(w) || isParallelPrefetchWorker(w))
continue;
if (w->in_use && w->subid == subid && w->relid == relid &&
@@ -322,6 +323,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
TimestampTz now;
bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
+ bool is_parallel_prefetch_worker = (wtype == WORKERTYPE_PARALLEL_PREFETCH);
/*----------
* Sanity checks:
@@ -331,7 +333,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
*/
Assert(wtype != WORKERTYPE_UNKNOWN);
Assert(is_tablesync_worker == OidIsValid(relid));
- Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
+ Assert((is_parallel_apply_worker|is_parallel_prefetch_worker) == (subworker_dsm != DSM_HANDLE_INVALID));
ereport(DEBUG1,
(errmsg_internal("starting logical replication worker for subscription \"%s\"",
@@ -452,8 +454,8 @@ retry:
worker->relstate = SUBREL_STATE_UNKNOWN;
worker->relstate_lsn = InvalidXLogRecPtr;
worker->stream_fileset = NULL;
- worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
- worker->parallel_apply = is_parallel_apply_worker;
+ worker->leader_pid = (is_parallel_apply_worker|is_parallel_prefetch_worker) ? MyProcPid : InvalidPid;
+ worker->parallel_apply = is_parallel_apply_worker|is_parallel_prefetch_worker;
worker->last_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->last_send_time);
TIMESTAMP_NOBEGIN(worker->last_recv_time);
@@ -492,6 +494,16 @@ retry:
memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
break;
+ case WORKERTYPE_PARALLEL_PREFETCH:
+ snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
+ snprintf(bgw.bgw_name, BGW_MAXLEN,
+ "logical replication parallel prefetch worker for subscription %u",
+ subid);
+ snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
+
+ memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
+ break;
+
case WORKERTYPE_TABLESYNC:
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
snprintf(bgw.bgw_name, BGW_MAXLEN,
@@ -626,7 +638,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
if (worker)
{
- Assert(!isParallelApplyWorker(worker));
+ Assert(!isParallelApplyWorker(worker) && !isParallelPrefetchWorker(worker));
logicalrep_worker_stop_internal(worker, SIGTERM);
}
@@ -774,7 +786,7 @@ logicalrep_worker_detach(void)
{
LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
- if (isParallelApplyWorker(w))
+ if (isParallelApplyWorker(w) || isParallelPrefetchWorker(w))
logicalrep_worker_stop_internal(w, SIGTERM);
}
@@ -1369,6 +1381,9 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
case WORKERTYPE_PARALLEL_APPLY:
values[9] = CStringGetTextDatum("parallel apply");
break;
+ case WORKERTYPE_PARALLEL_PREFETCH:
+ values[9] = CStringGetTextDatum("parallel prefetch");
+ break;
case WORKERTYPE_TABLESYNC:
values[9] = CStringGetTextDatum("table synchronization");
break;
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index c90f23ee5b..f965317529 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -681,6 +681,9 @@ process_syncing_tables(XLogRecPtr current_lsn)
*/
break;
+ case WORKERTYPE_PARALLEL_PREFETCH:
+ break;
+
case WORKERTYPE_TABLESYNC:
process_syncing_tables_for_sync(current_lsn);
break;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fd11805a44..db1f8bcebd 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -311,6 +311,18 @@ static uint32 parallel_stream_nchanges = 0;
/* Are we initializing an apply worker? */
bool InitializingApplyWorker = false;
+#define INIT_PREFETCH_BUF_SIZE (128*1024)
+static ParallelApplyWorkerInfo* prefetch_worker[MAX_LR_PREFETCH_WORKERS];
+static int prefetch_worker_rr = 0;
+static int n_prefetch_workers;
+
+bool prefetch_replica_identity_only = false;
+
+size_t lr_prefetch_hits;
+size_t lr_prefetch_misses;
+size_t lr_prefetch_errors;
+size_t lr_prefetch_inserts;
+
/*
* We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
* the subscription if the remote transaction's finish LSN matches the subskiplsn.
@@ -482,6 +494,9 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
(rel->state == SUBREL_STATE_SYNCDONE &&
rel->statelsn <= remote_final_lsn));
+ case WORKERTYPE_PARALLEL_PREFETCH:
+ return true;
+
case WORKERTYPE_UNKNOWN:
/* Should never happen. */
elog(ERROR, "Unknown worker type");
@@ -556,6 +571,11 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
TransApplyAction apply_action;
StringInfoData original_msg;
+ if (am_parallel_prefetch_worker())
+ {
+ return false;
+ }
+
apply_action = get_transaction_apply_action(stream_xid, &winfo);
/* not in streaming mode */
@@ -2487,13 +2507,36 @@ apply_handle_insert_internal(ApplyExecutionData *edata,
!relinfo->ri_RelationDesc->rd_rel->relhasindex ||
RelationGetIndexList(relinfo->ri_RelationDesc) == NIL);
- /* Caller will not have done this bit. */
- Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
- InitConflictIndexes(relinfo);
+ if (am_parallel_prefetch_worker())
+ {
+ Relation localrel = relinfo->ri_RelationDesc;
+ TupleTableSlot *localslot = table_slot_create(localrel, &estate->es_tupleTable);
+ LogicalRepRelMapEntry *relmapentry = edata->targetRel;
+
+ if (prefetch_replica_identity_only)
+ {
+ (void)RelationPrefetchIndex(localrel, relmapentry->localindexoid, remoteslot, localslot);
+ }
+ else
+ {
+ for (int i = 0; i < relinfo->ri_NumIndices; i++)
+ {
+ Oid sec_index_oid = RelationGetRelid(relinfo->ri_IndexRelationDescs[i]);
+ (void)RelationPrefetchIndex(localrel, sec_index_oid, remoteslot, localslot);
+ }
+ }
+ lr_prefetch_inserts += 1;
+ }
+ else
+ {
+ /* Caller will not have done this bit. */
+ Assert(relinfo->ri_onConflictArbiterIndexes == NIL);
+ InitConflictIndexes(relinfo);
- /* Do the insert. */
- TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
- ExecSimpleRelationInsert(relinfo, estate, remoteslot);
+ /* Do the insert. */
+ TargetPrivilegesCheck(relinfo->ri_RelationDesc, ACL_INSERT);
+ ExecSimpleRelationInsert(relinfo, estate, remoteslot);
+ }
}
/*
@@ -2677,6 +2720,32 @@ apply_handle_update_internal(ApplyExecutionData *edata,
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1, NIL);
ExecOpenIndices(relinfo, false);
+ if (am_parallel_prefetch_worker())
+ {
+ /*
+ * While it may be reasonable to prefetch indexes for both old and new tuples,
+ * we do it only for one of them (old if it exists, new otherwise), assuming
+ * that the probability that the index key has changed is quite small.
+ */
+ localslot = table_slot_create(localrel, &estate->es_tupleTable);
+ found = RelationPrefetchIndex(localrel, localindexoid, remoteslot, localslot);
+ if (found)
+ lr_prefetch_hits += 1;
+ else
+ lr_prefetch_misses += 1;
+ if (!prefetch_replica_identity_only)
+ {
+ for (int i = 0; i < relinfo->ri_NumIndices; i++)
+ {
+ Oid sec_index_oid = RelationGetRelid(relinfo->ri_IndexRelationDescs[i]);
+ if (sec_index_oid != localindexoid)
+ {
+ (void)RelationPrefetchIndex(localrel, sec_index_oid, remoteslot, localslot);
+ }
+ }
+ }
+ goto Cleanup;
+ }
found = FindReplTupleInLocalRel(edata, localrel,
&relmapentry->remoterel,
localindexoid,
@@ -2739,7 +2808,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
remoteslot, newslot, list_make1(&conflicttuple));
}
- /* Cleanup. */
+ Cleanup:
ExecCloseIndices(relinfo);
EvalPlanQualEnd(&epqstate);
}
@@ -2864,6 +2933,17 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
!localrel->rd_rel->relhasindex ||
RelationGetIndexList(localrel) == NIL);
+ if (am_parallel_prefetch_worker())
+ {
+ localslot = table_slot_create(localrel, &estate->es_tupleTable);
+ found = RelationPrefetchIndex(localrel, localindexoid, remoteslot, localslot);
+ if (found)
+ lr_prefetch_hits += 1;
+ else
+ lr_prefetch_misses += 1;
+ /* No need to prefetch other indexes because they are not touched during delete */
+ goto Cleanup;
+ }
found = FindReplTupleInLocalRel(edata, localrel, remoterel, localindexoid,
remoteslot, &localslot);
@@ -2900,7 +2980,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
remoteslot, NULL, list_make1(&conflicttuple));
}
- /* Cleanup. */
+ Cleanup:
EvalPlanQualEnd(&epqstate);
}
@@ -3567,6 +3647,42 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
}
}
+#define MSG_CODE_OFFSET (1 + 8*3)
+
+static void
+lr_do_prefetch(char* buf, int len)
+{
+ ParallelApplyWorkerInfo* winfo;
+
+ if (buf[0] != 'w')
+ return;
+
+ switch (buf[MSG_CODE_OFFSET])
+ {
+ case LOGICAL_REP_MSG_INSERT:
+ case LOGICAL_REP_MSG_UPDATE:
+ case LOGICAL_REP_MSG_DELETE:
+ /* Round robin prefetch worker */
+ winfo = prefetch_worker[prefetch_worker_rr++ % n_prefetch_workers];
+ pa_send_data(winfo, len, buf);
+ break;
+
+ case LOGICAL_REP_MSG_TYPE:
+ case LOGICAL_REP_MSG_RELATION:
+ /* broadcast to all prefetch workers */
+ for (int i = 0; i < n_prefetch_workers; i++)
+ {
+ winfo = prefetch_worker[i];
+ pa_send_data(winfo, len, buf);
+ }
+ break;
+
+ default:
+ /* Ignore other messages */
+ break;
+ }
+}
+
/*
* Apply main loop.
*/
@@ -3577,6 +3693,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
bool ping_sent = false;
TimeLineID tli;
ErrorContextCallback errcallback;
+ char* prefetch_buf = NULL;
+ size_t prefetch_buf_pos = 0;
+ size_t prefetch_buf_used = 0;
+ size_t prefetch_buf_size = INIT_PREFETCH_BUF_SIZE;
/*
* Init the ApplyMessageContext which we clean up after each replication
@@ -3594,6 +3714,23 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
"LogicalStreamingContext",
ALLOCSET_DEFAULT_SIZES);
+ if (max_parallel_prefetch_workers_per_subscription != 0)
+ {
+ int i;
+ for (i = 0; i < max_parallel_prefetch_workers_per_subscription; i++)
+ {
+ prefetch_worker[i] = pa_launch_prefetch_worker();
+ if (!prefetch_worker[i])
+ {
+ elog(LOG, "Launched only %d of %d prefetch workers",
+ i, max_parallel_prefetch_workers_per_subscription);
+ break;
+ }
+ }
+ n_prefetch_workers = i;
+ prefetch_buf = palloc(prefetch_buf_size);
+ }
+
/* mark as idle, before starting to loop */
pgstat_report_activity(STATE_IDLE, NULL);
@@ -3611,9 +3748,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
{
pgsocket fd = PGINVALID_SOCKET;
int rc;
- int len;
- char *buf = NULL;
+ int32 len;
+ char *buf = NULL;
bool endofstream = false;
+ bool no_more_data = false;
long wait_time;
CHECK_FOR_INTERRUPTS();
@@ -3622,87 +3760,127 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
- if (len != 0)
+ /* Loop to process all available data (without blocking). */
+ for (;;)
{
- /* Loop to process all available data (without blocking). */
- for (;;)
- {
- CHECK_FOR_INTERRUPTS();
+ CHECK_FOR_INTERRUPTS();
- if (len == 0)
+ if (len > 0 && n_prefetch_workers != 0 && prefetch_buf_pos == prefetch_buf_used)
+ {
+ prefetch_buf_used = 0;
+ do
{
- break;
- }
- else if (len < 0)
+ if (prefetch_buf_used + len + 4 > prefetch_buf_size)
+ {
+ prefetch_buf_size *= 2;
+ elog(DEBUG1, "Increase prefetch buffer size to %zu", prefetch_buf_size);
+ prefetch_buf = repalloc(prefetch_buf, prefetch_buf_size);
+ }
+ memcpy(&prefetch_buf[prefetch_buf_used], &len, 4);
+ memcpy(&prefetch_buf[prefetch_buf_used+4], buf, len);
+ prefetch_buf_used += 4 + len;
+ if (prefetch_buf_used >= INIT_PREFETCH_BUF_SIZE)
+ break;
+ len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
+ } while (len > 0);
+
+ no_more_data = len <= 0;
+
+ for (prefetch_buf_pos = 0; prefetch_buf_pos < prefetch_buf_used; prefetch_buf_pos += 4 + len)
{
- ereport(LOG,
- (errmsg("data stream from publisher has ended")));
- endofstream = true;
- break;
+ memcpy(&len, &prefetch_buf[prefetch_buf_pos], 4);
+ lr_do_prefetch(&prefetch_buf[prefetch_buf_pos+4], len);
}
- else
- {
- int c;
- StringInfoData s;
+ memcpy(&len, prefetch_buf, 4);
+ buf = &prefetch_buf[4];
+ prefetch_buf_pos = len + 4;
+ }
- if (ConfigReloadPending)
- {
- ConfigReloadPending = false;
- ProcessConfigFile(PGC_SIGHUP);
- }
+ if (len == 0)
+ {
+ break;
+ }
+ else if (len < 0)
+ {
+ ereport(LOG,
+ (errmsg("data stream from publisher has ended")));
+ endofstream = true;
+ break;
+ }
+ else
+ {
+ int c;
+ StringInfoData s;
- /* Reset timeout. */
- last_recv_timestamp = GetCurrentTimestamp();
- ping_sent = false;
+ if (ConfigReloadPending)
+ {
+ ConfigReloadPending = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
- /* Ensure we are reading the data into our memory context. */
- MemoryContextSwitchTo(ApplyMessageContext);
+ /* Reset timeout. */
+ last_recv_timestamp = GetCurrentTimestamp();
+ ping_sent = false;
- initReadOnlyStringInfo(&s, buf, len);
+ /* Ensure we are reading the data into our memory context. */
+ MemoryContextSwitchTo(ApplyMessageContext);
- c = pq_getmsgbyte(&s);
+ initReadOnlyStringInfo(&s, buf, len);
- if (c == 'w')
- {
- XLogRecPtr start_lsn;
- XLogRecPtr end_lsn;
- TimestampTz send_time;
+ c = pq_getmsgbyte(&s);
- start_lsn = pq_getmsgint64(&s);
- end_lsn = pq_getmsgint64(&s);
- send_time = pq_getmsgint64(&s);
+ if (c == 'w')
+ {
+ XLogRecPtr start_lsn;
+ XLogRecPtr end_lsn;
+ TimestampTz send_time;
- if (last_received < start_lsn)
- last_received = start_lsn;
+ start_lsn = pq_getmsgint64(&s);
+ end_lsn = pq_getmsgint64(&s);
+ send_time = pq_getmsgint64(&s);
- if (last_received < end_lsn)
- last_received = end_lsn;
+ if (last_received < start_lsn)
+ last_received = start_lsn;
- UpdateWorkerStats(last_received, send_time, false);
+ if (last_received < end_lsn)
+ last_received = end_lsn;
- apply_dispatch(&s);
- }
- else if (c == 'k')
- {
- XLogRecPtr end_lsn;
- TimestampTz timestamp;
- bool reply_requested;
+ UpdateWorkerStats(last_received, send_time, false);
- end_lsn = pq_getmsgint64(&s);
- timestamp = pq_getmsgint64(&s);
- reply_requested = pq_getmsgbyte(&s);
+ apply_dispatch(&s);
+ }
+ else if (c == 'k')
+ {
+ XLogRecPtr end_lsn;
+ TimestampTz timestamp;
+ bool reply_requested;
- if (last_received < end_lsn)
- last_received = end_lsn;
+ end_lsn = pq_getmsgint64(&s);
+ timestamp = pq_getmsgint64(&s);
+ reply_requested = pq_getmsgbyte(&s);
- send_feedback(last_received, reply_requested, false);
- UpdateWorkerStats(last_received, timestamp, true);
- }
- /* other message types are purposefully ignored */
+ if (last_received < end_lsn)
+ last_received = end_lsn;
- MemoryContextReset(ApplyMessageContext);
+ send_feedback(last_received, reply_requested, false);
+ UpdateWorkerStats(last_received, timestamp, true);
}
+ /* other message types are purposefully ignored */
+ MemoryContextReset(ApplyMessageContext);
+ }
+ if (prefetch_buf_pos < prefetch_buf_used)
+ {
+ memcpy(&len, &prefetch_buf[prefetch_buf_pos], 4);
+ buf = &prefetch_buf[prefetch_buf_pos + 4];
+ prefetch_buf_pos += 4 + len;
+ }
+ else if (prefetch_buf_used != 0 && no_more_data)
+ {
+ break;
+ }
+ else
+ {
len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
}
}
@@ -3926,6 +4104,10 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
static void
apply_worker_exit(void)
{
+ /* Don't restart prefetch workers */
+ if (am_parallel_prefetch_worker())
+ return;
+
if (am_parallel_apply_worker())
{
/*
@@ -4729,6 +4911,10 @@ InitializeLogRepWorker(void)
(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
MySubscription->name,
get_rel_name(MyLogicalRepWorker->relid))));
+ else if (am_parallel_prefetch_worker())
+ ereport(LOG,
+ (errmsg("logical replication prefetch worker for subscription \"%s\" has started",
+ MySubscription->name)));
else
ereport(LOG,
(errmsg("logical replication apply worker for subscription \"%s\" has started",
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 511dc32d51..b3812d7e0e 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -76,6 +76,7 @@
#include "replication/slot.h"
#include "replication/slotsync.h"
#include "replication/syncrep.h"
+#include "replication/worker_internal.h"
#include "storage/aio.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
@@ -2143,6 +2144,18 @@ struct config_bool ConfigureNamesBool[] =
NULL, NULL, NULL
},
+ {
+ {"prefetch_replica_identity_only",
+ PGC_SIGHUP,
+ REPLICATION_SUBSCRIBERS,
+ gettext_noop("Whether LR prefetch workers should prefetch only the replica identity index or all other indexes too."),
+ NULL,
+ },
+ &prefetch_replica_identity_only,
+ false,
+ NULL, NULL, NULL
+ },
+
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL
@@ -3376,6 +3389,18 @@ struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
+ {
+ {"max_parallel_prefetch_workers_per_subscription",
+ PGC_SIGHUP,
+ REPLICATION_SUBSCRIBERS,
+ gettext_noop("Maximum number of parallel prefetch workers per subscription."),
+ NULL,
+ },
+ &max_parallel_prefetch_workers_per_subscription,
+ 2, 0, MAX_LR_PREFETCH_WORKERS,
+ NULL, NULL, NULL
+ },
+
{
{"max_active_replication_origins",
PGC_POSTMASTER,
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 104b059544..3403128977 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -759,6 +759,8 @@ extern bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
TupleTableSlot *outslot);
extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
TupleTableSlot *searchslot, TupleTableSlot *outslot);
+extern bool RelationPrefetchIndex(Relation rel, Oid idxoid, TupleTableSlot *searchslot,
+ TupleTableSlot *outslot);
extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
EState *estate, TupleTableSlot *slot);
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index 82b202f330..19d1a8d466 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -15,6 +15,8 @@
extern PGDLLIMPORT int max_logical_replication_workers;
extern PGDLLIMPORT int max_sync_workers_per_subscription;
extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription;
+extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription;
+extern PGDLLIMPORT int max_parallel_prefetch_workers_per_subscription;
extern void ApplyLauncherRegister(void);
extern void ApplyLauncherMain(Datum main_arg);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 30b2775952..7f5b4fa51b 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -32,6 +32,7 @@ typedef enum LogicalRepWorkerType
WORKERTYPE_TABLESYNC,
WORKERTYPE_APPLY,
WORKERTYPE_PARALLEL_APPLY,
+ WORKERTYPE_PARALLEL_PREFETCH,
} LogicalRepWorkerType;
typedef struct LogicalRepWorker
@@ -214,6 +215,12 @@ typedef struct ParallelApplyWorkerInfo
*/
bool in_use;
+
+ /*
+ * Performing prefetch of pages accessed by LR operations
+ */
+ bool do_prefetch;
+
ParallelApplyWorkerShared *shared;
} ParallelApplyWorkerInfo;
@@ -237,6 +244,14 @@ extern PGDLLIMPORT bool in_remote_transaction;
extern PGDLLIMPORT bool InitializingApplyWorker;
+#define MAX_LR_PREFETCH_WORKERS 128
+extern PGDLLIMPORT size_t lr_prefetch_hits;
+extern PGDLLIMPORT size_t lr_prefetch_misses;
+extern PGDLLIMPORT size_t lr_prefetch_errors;
+extern PGDLLIMPORT size_t lr_prefetch_inserts;
+
+extern bool prefetch_replica_identity_only;
+
extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
bool only_running);
@@ -326,10 +341,13 @@ extern void pa_decr_and_wait_stream_block(void);
extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
XLogRecPtr remote_lsn);
-#define isParallelApplyWorker(worker) ((worker)->in_use && \
+#define isParallelApplyWorker(worker) ((worker)->in_use && \
(worker)->type == WORKERTYPE_PARALLEL_APPLY)
+#define isParallelPrefetchWorker(worker) ((worker)->in_use && \
+ (worker)->type == WORKERTYPE_PARALLEL_PREFETCH)
#define isTablesyncWorker(worker) ((worker)->in_use && \
(worker)->type == WORKERTYPE_TABLESYNC)
+extern ParallelApplyWorkerInfo* pa_launch_prefetch_worker(void);
static inline bool
am_tablesync_worker(void)
@@ -351,4 +369,11 @@ am_parallel_apply_worker(void)
return isParallelApplyWorker(MyLogicalRepWorker);
}
+static inline bool
+am_parallel_prefetch_worker(void)
+{
+ Assert(MyLogicalRepWorker->in_use);
+ return isParallelPrefetchWorker(MyLogicalRepWorker);
+}
+
#endif /* WORKER_INTERNAL_H */
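
The apply loop in the patch above batches incoming messages into
`prefetch_buf`, storing each one as a 4-byte length followed by the payload,
so that the same bytes can first be handed to the prefetch workers and then
replayed by the apply worker. A minimal, self-contained C sketch of that
framing is shown here. `FrameBuf`, `framebuf_append` and `framebuf_next` are
hypothetical names, and plain `realloc` stands in for `repalloc`. One
deliberate difference, stated as an assumption about what is wanted: the
sketch grows the buffer in a loop until the message fits, whereas the patch
doubles the size once per message.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef struct
{
    char   *data;   /* length-prefixed messages, back to back */
    size_t  used;   /* bytes currently filled */
    size_t  size;   /* bytes allocated */
} FrameBuf;

/* Append one message as [int32 length][payload]; returns 0 on success. */
static int
framebuf_append(FrameBuf *fb, const char *msg, int32_t len)
{
    size_t need;

    assert(len > 0);
    need = fb->used + sizeof(len) + (size_t) len;

    if (need > fb->size)
    {
        size_t new_size = fb->size ? fb->size : 64;
        char  *p;

        /* Keep doubling until the whole message fits. */
        while (new_size < need)
            new_size *= 2;

        p = realloc(fb->data, new_size);
        if (p == NULL)
            return -1;
        fb->data = p;
        fb->size = new_size;
    }

    memcpy(fb->data + fb->used, &len, sizeof(len));
    memcpy(fb->data + fb->used + sizeof(len), msg, (size_t) len);
    fb->used += sizeof(len) + (size_t) len;
    return 0;
}

/*
 * Return the payload of the message at *pos and advance *pos past it,
 * or return NULL when the buffer is exhausted.
 */
static const char *
framebuf_next(const FrameBuf *fb, size_t *pos, int32_t *len)
{
    const char *payload;

    if (*pos >= fb->used)
        return NULL;

    memcpy(len, fb->data + *pos, sizeof(*len));
    payload = fb->data + *pos + sizeof(*len);
    *pos += sizeof(*len) + (size_t) *len;
    return payload;
}
```

The buffer can be walked twice with two independent cursors: once to feed
the prefetch workers, and once, a little later, to apply the changes.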