Some never executed code regarding the table sync worker
Hi all,
After launched the sync table worker it enters ApplyWorkerMain
function. And then the table sync worker calls
LogicalRepSyncTableStart to synchronize the target table. In
LogicalRepSyncTableStart, finish_sync_worker is always called and then
the table sync worker process always exits after synched. On the other
hand, some source code seems to suppose that the table sync worker
still continue to working even after LogicalRepSyncTableStart
For example in ApplyWorkerMain, LogicalRepSyncTableStart returns
replication slot name that was used for synchronization but this code
is never executed.
if (am_tablesync_worker())
{
char *syncslotname;
/* This is table synchroniation worker, call initial sync. */
syncslotname = LogicalRepSyncTableStart(&origin_startpos);
/* The slot name needs to be allocated in permanent memory context. */
oldctx = MemoryContextSwitchTo(ApplyCacheContext);
myslotname = pstrdup(syncslotname);
MemoryContextSwitchTo(oldctx);
pfree(syncslotname);
}
------
And, since the table sync worker doesn't call process_syncing_tables
so far process_syncing_tables_for_sync is never executed.
/*
* Process state possible change(s) of tables that are being synchronized.
*/
void
process_syncing_tables(XLogRecPtr current_lsn)
{
if (am_tablesync_worker())
process_syncing_tables_for_sync(current_lsn);
else
process_syncing_tables_for_apply(current_lsn);
}
These code will be used for future enhancement? I think that since
leaving unused code is not good we can get rid of these unnecessary
codes. Attached patch removes unnecessary codes related to the table
sync worker.
Please give me feedback.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
Attachments:
unused_code_of_table_sync_worker.patchapplication/octet-stream; name=unused_code_of_table_sync_worker.patchDownload
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index a067fe3..1809f0a 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -188,47 +188,6 @@ invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
}
/*
- * Handle table synchronization cooperation from the synchronization
- * worker.
- *
- * If the sync worker is in catch up mode and reached the predetermined
- * synchronization point in the WAL stream, mark the table as READY and
- * finish. If it caught up too far, set to SYNCDONE and finish. Things will
- * then proceed in the "sync in front" scenario.
- */
-static void
-process_syncing_tables_for_sync(XLogRecPtr current_lsn)
-{
- Assert(IsTransactionState());
-
- SpinLockAcquire(&MyLogicalRepWorker->relmutex);
-
- if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP &&
- current_lsn >= MyLogicalRepWorker->relstate_lsn)
- {
- TimeLineID tli;
-
- MyLogicalRepWorker->relstate =
- (current_lsn == MyLogicalRepWorker->relstate_lsn)
- ? SUBREL_STATE_READY
- : SUBREL_STATE_SYNCDONE;
- MyLogicalRepWorker->relstate_lsn = current_lsn;
-
- SpinLockRelease(&MyLogicalRepWorker->relmutex);
-
- SetSubscriptionRelState(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- MyLogicalRepWorker->relstate,
- MyLogicalRepWorker->relstate_lsn);
-
- walrcv_endstreaming(wrconn, &tli);
- finish_sync_worker();
- }
- else
- SpinLockRelease(&MyLogicalRepWorker->relmutex);
-}
-
-/*
* Handle table synchronization cooperation from the apply worker.
*
* Walk over all subscription tables that are individually tracked by the
@@ -252,8 +211,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
* If the synchronization position is reached, then the table can be marked as
* READY and is no longer tracked.
*/
-static void
-process_syncing_tables_for_apply(XLogRecPtr current_lsn)
+void
+process_syncing_tables(XLogRecPtr current_lsn)
{
static List *table_states = NIL;
ListCell *lc;
@@ -406,18 +365,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
}
/*
- * Process state possible change(s) of tables that are being synchronized.
- */
-void
-process_syncing_tables(XLogRecPtr current_lsn)
-{
- if (am_tablesync_worker())
- process_syncing_tables_for_sync(current_lsn);
- else
- process_syncing_tables_for_apply(current_lsn);
-}
-
-/*
* Create list of columns for COPY based on logical relation mapping.
*/
static List *
@@ -685,12 +632,12 @@ copy_table(Relation rel)
}
/*
- * Start syncing the table in the sync worker.
+ * Sync the table in the sync worker.
*
- * The returned slot name is palloced in current memory context.
+ * After synced the table, exit.
*/
-char *
-LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
+void
+LogicalRepSyncTable(XLogRecPtr *origin_startpos)
{
char *slotname;
char *err;
@@ -835,6 +782,4 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
elog(ERROR, "unknown relation state \"%c\"",
MyLogicalRepWorker->relstate);
}
-
- return slotname;
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index bbf3506..cf2426e 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1493,17 +1493,12 @@ ApplyWorkerMain(Datum main_arg)
if (am_tablesync_worker())
{
- char *syncslotname;
-
- /* This is table synchroniation worker, call initial sync. */
- syncslotname = LogicalRepSyncTableStart(&origin_startpos);
-
- /* The slot name needs to be allocated in permanent memory context. */
- oldctx = MemoryContextSwitchTo(ApplyCacheContext);
- myslotname = pstrdup(syncslotname);
- MemoryContextSwitchTo(oldctx);
-
- pfree(syncslotname);
+ /*
+ * The table sync worker exits here after synced table and gave the
+ * the control of the replication of the table to the main apply
+ * process.
+ */
+ LogicalRepSyncTable(&origin_startpos);
}
else
{
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 5bebca3..0c57435 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -70,8 +70,8 @@ extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
extern int logicalrep_sync_worker_count(Oid subid);
extern void logicalrep_worker_sigterm(SIGNAL_ARGS);
-extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
-void process_syncing_tables(XLogRecPtr current_lsn);
+extern void LogicalRepSyncTable(XLogRecPtr *origin_startpos);
+extern void process_syncing_tables(XLogRecPtr current_lsn);
void invalidate_syncing_table_states(Datum arg, int cacheid,
uint32 hashvalue);
Hi,
At Sat, 1 Apr 2017 02:35:00 +0900, Masahiko Sawada <sawada.mshk@gmail.com> wrote in <CAD21AoCSmEm-P=ND6xXW4fF46_f9QhF-Uwdyna__MEEq=jbHfw@mail.gmail.com>
Hi all,
After launched the sync table worker it enters ApplyWorkerMain
function. And then the table sync worker calls
LogicalRepSyncTableStart to synchronize the target table.
Sure,
In
LogicalRepSyncTableStart, finish_sync_worker is always called and then
the table sync worker process always exits after synched.
After entering the function with the relstates
SUBREL_STATE_INIT/DATASYNC if the relstate finally reaches
SUBREL_STATE_CATCHUP, finish_sync_worker is not called and
returns with the slotname. Then the slotname will be used by the
subsequently launched apply worker. Of course, it immediately
ends if failed to catch up, though.
Aren't you missing something? or I am?
On the other
hand, some source code seems to suppose that the table sync worker
still continue to working even after LogicalRepSyncTableStartFor example in ApplyWorkerMain, LogicalRepSyncTableStart returns
replication slot name that was used for synchronization but this code
is never executed.if (am_tablesync_worker())
{
char *syncslotname;/* This is table synchroniation worker, call initial sync. */
syncslotname = LogicalRepSyncTableStart(&origin_startpos);/* The slot name needs to be allocated in permanent memory context. */
oldctx = MemoryContextSwitchTo(ApplyCacheContext);
myslotname = pstrdup(syncslotname);
MemoryContextSwitchTo(oldctx);pfree(syncslotname);
}------
And, since the table sync worker doesn't call process_syncing_tables
so far process_syncing_tables_for_sync is never executed./*
* Process state possible change(s) of tables that are being synchronized.
*/
void
process_syncing_tables(XLogRecPtr current_lsn)
{
if (am_tablesync_worker())
process_syncing_tables_for_sync(current_lsn);
else
process_syncing_tables_for_apply(current_lsn);
}These code will be used for future enhancement? I think that since
leaving unused code is not good we can get rid of these unnecessary
codes. Attached patch removes unnecessary codes related to the table
sync worker.
Please give me feedback.
--
Kyotaro Horiguchi
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
On Tue, Apr 4, 2017 at 6:26 PM, Kyotaro HORIGUCHI
<horiguchi.kyotaro@lab.ntt.co.jp> wrote:
Hi,
At Sat, 1 Apr 2017 02:35:00 +0900, Masahiko Sawada <sawada.mshk@gmail.com> wrote in <CAD21AoCSmEm-P=ND6xXW4fF46_f9QhF-Uwdyna__MEEq=jbHfw@mail.gmail.com>
Hi all,
After launched the sync table worker it enters ApplyWorkerMain
function. And then the table sync worker calls
LogicalRepSyncTableStart to synchronize the target table.Sure,
In
LogicalRepSyncTableStart, finish_sync_worker is always called and then
the table sync worker process always exits after synched.After entering the function with the relstates
SUBREL_STATE_INIT/DATASYNC if the relstate finally reaches
SUBREL_STATE_CATCHUP, finish_sync_worker is not called and
returns with the slotname. Then the slotname will be used by the
subsequently launched apply worker. Of course, it immediately
ends if failed to catch up, though.Aren't you missing something? or I am?
On the other
hand, some source code seems to suppose that the table sync worker
still continue to working even after LogicalRepSyncTableStartFor example in ApplyWorkerMain, LogicalRepSyncTableStart returns
replication slot name that was used for synchronization but this code
is never executed.if (am_tablesync_worker())
{
char *syncslotname;/* This is table synchroniation worker, call initial sync. */
syncslotname = LogicalRepSyncTableStart(&origin_startpos);/* The slot name needs to be allocated in permanent memory context. */
oldctx = MemoryContextSwitchTo(ApplyCacheContext);
myslotname = pstrdup(syncslotname);
MemoryContextSwitchTo(oldctx);pfree(syncslotname);
}------
And, since the table sync worker doesn't call process_syncing_tables
so far process_syncing_tables_for_sync is never executed./*
* Process state possible change(s) of tables that are being synchronized.
*/
void
process_syncing_tables(XLogRecPtr current_lsn)
{
if (am_tablesync_worker())
process_syncing_tables_for_sync(current_lsn);
else
process_syncing_tables_for_apply(current_lsn);
}These code will be used for future enhancement? I think that since
leaving unused code is not good we can get rid of these unnecessary
codes. Attached patch removes unnecessary codes related to the table
sync worker.
Please give me feedback.
Oops, you're right. I had misunderstood that path. Thank you.
Removed this from open item.
Regards,
--
Masahiko Sawada
NIPPON TELEGRAPH AND TELEPHONE CORPORATION
NTT Open Source Software Center
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers