PATCH: pgbench - remove thread fork-emulation

Started by Fabien COELHO over 10 years ago · 2 messages
#1Fabien COELHO
coelho@cri.ensmp.fr
1 attachment(s)

This patch removes the pgbench thread fork-emulation code and simplifies
things where possible, especially around pthread_create and pthread_join.
The stats collection for the report is done directly instead of using an
intermediate structure.

As a result, if no thread implementation is available, pgbench is
restricted to running with only the main thread (i.e. "pgbench -j 1 ...").

== Rationale ==

Pgbench currently provides a thread emulation through process forks. This
feature was developed way back when it may have been common that some
platforms were not supporting threads. This is now very rare (can you name
one such platform?).

However, the thread fork-emulation feature has drawbacks. Namely,
processes are not threads and their memory is not shared (obviously), which
hinders a simple implementation of some features, and results either in
these features not being provided under fork-emulation, or in their
behaving differently under fork-emulation:

Latency collection (-r) is not supported with fork emulation.

Progress (-P) is reported differently with fork emulation.

For a new feature under discussion, which consists of allowing one log
instead of per-thread logs, supporting fork-emulation requires a (heavy)
post-processing external sort phase, whereas with actual threads all
threads can share and append to the same log file with limited overhead,
which is significantly simpler.

== Note ==

This is a small regression (for platforms without thread support, -j J
with J > 1 is no longer supported after the patch), so maybe this should
be included for PostgreSQL 10.0 only? I do not think this is required,
but this is only my opinion.

--
Fabien.

Attachments:

pgbench-threads-1.patch (text/x-diff; name=pgbench-threads-1.patch) — Download
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 06a4dfd..989f151 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -70,20 +70,8 @@ static int	pthread_join(pthread_t th, void **thread_return);
 /* Use platform-dependent pthread capability */
 #include <pthread.h>
 #else
-/* Use emulation with fork. Rename pthread identifiers to avoid conflicts */
-#define PTHREAD_FORK_EMULATION
-#include <sys/wait.h>
-
-#define pthread_t				pg_pthread_t
-#define pthread_attr_t			pg_pthread_attr_t
-#define pthread_create			pg_pthread_create
-#define pthread_join			pg_pthread_join
-
-typedef struct fork_pthread *pthread_t;
-typedef int pthread_attr_t;
-
-static int	pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
-static int	pthread_join(pthread_t th, void **thread_return);
+/* No threads implementation, use none (-j 1) */
+#define pthread_t void *
 #endif
 
 
@@ -210,8 +198,6 @@ typedef struct
 	PGconn	   *con;			/* connection handle to DB */
 	int			id;				/* client No. */
 	int			state;			/* state No. */
-	int			cnt;			/* xacts count */
-	int			ecnt;			/* error count */
 	int			listen;			/* 0 indicates that an async query has been
 								 * sent */
 	int			sleeping;		/* 1 indicates that the client is napping */
@@ -221,15 +207,19 @@ typedef struct
 	int64		txn_scheduled;	/* scheduled start time of transaction (usec) */
 	instr_time	txn_begin;		/* used for measuring schedule lag times */
 	instr_time	stmt_begin;		/* used for measuring statement latencies */
-	int64		txn_latencies;	/* cumulated latencies */
-	int64		txn_sqlats;		/* cumulated square latencies */
 	bool		is_throttled;	/* whether transaction throttling is done */
 	int			use_file;		/* index in sql_files for this client */
 	bool		prepared[MAX_FILES];
+
+	/* per client collected stats */
+	int			cnt;			/* xacts count */
+	int			ecnt;			/* error count */
+	int64		txn_latencies;	/* cumulated latencies */
+	int64		txn_sqlats;		/* cumulated square latencies */
 } CState;
 
 /*
- * Thread state and result
+ * Thread state
  */
 typedef struct
 {
@@ -242,6 +232,9 @@ typedef struct
 	int		   *exec_count;		/* number of cmd executions (per Command) */
 	unsigned short random_state[3];		/* separate randomness for each thread */
 	int64		throttle_trigger;		/* previous/next throttling (us) */
+
+	/* per thread collected stats */
+	instr_time	conn_time;
 	int64		throttle_lag;	/* total transaction lag behind throttling */
 	int64		throttle_lag_max;		/* max transaction lag */
 	int64		throttle_latency_skipped; /* lagging transactions skipped */
@@ -250,18 +243,6 @@ typedef struct
 
 #define INVALID_THREAD		((pthread_t) 0)
 
-typedef struct
-{
-	instr_time	conn_time;
-	int64		xacts;
-	int64		latencies;
-	int64		sqlats;
-	int64		throttle_lag;
-	int64		throttle_lag_max;
-	int64		throttle_latency_skipped;
-	int64		latency_late;
-} TResult;
-
 /*
  * queries read from files
  */
@@ -2895,6 +2876,13 @@ main(int argc, char **argv)
 					fprintf(stderr, "invalid number of threads: %d\n", nthreads);
 					exit(1);
 				}
+#if !defined(ENABLE_THREAD_SAFETY)
+				if (nthreads != 1)
+				{
+					fprintf(stderr, "no threads available, use only \"-j 1\"\n");
+					exit(1);
+				}
+#endif /* !ENABLE_THREAD_SAFETY */
 				break;
 			case 'C':
 				benchmarking_option_set = true;
@@ -3161,22 +3149,6 @@ main(int argc, char **argv)
 	}
 
 	/*
-	 * is_latencies only works with multiple threads in thread-based
-	 * implementations, not fork-based ones, because it supposes that the
-	 * parent can see changes made to the per-thread execution stats by child
-	 * threads.  It seems useful enough to accept despite this limitation, but
-	 * perhaps we should FIXME someday (by passing the stats data back up
-	 * through the parent-to-child pipes).
-	 */
-#ifndef ENABLE_THREAD_SAFETY
-	if (is_latencies && nthreads > 1)
-	{
-		fprintf(stderr, "-r does not work with -j larger than 1 on this platform.\n");
-		exit(1);
-	}
-#endif
-
-	/*
 	 * save main process id in the global variable because process id will be
 	 * changed after fork.
 	 */
@@ -3372,6 +3344,7 @@ main(int argc, char **argv)
 		setalarm(duration);
 
 	/* start threads */
+#if defined(ENABLE_THREAD_SAFETY)
 	for (i = 0; i < nthreads; i++)
 	{
 		TState	   *thread = &threads[i];
@@ -3394,32 +3367,43 @@ main(int argc, char **argv)
 			thread->thread = INVALID_THREAD;
 		}
 	}
+#else
+	INSTR_TIME_SET_CURRENT(threads[0].start_time);
+	threads[0].thread = INVALID_THREAD;
+#endif /* ENABLE_THREAD_SAFETY */
 
 	/* wait for threads and accumulate results */
 	INSTR_TIME_SET_ZERO(conn_total_time);
 	for (i = 0; i < nthreads; i++)
 	{
-		void	   *ret = NULL;
+		TState	   *thread = &threads[i];
+		int			j;
 
+#if defined(ENABLE_THREAD_SAFETY)
 		if (threads[i].thread == INVALID_THREAD)
-			ret = threadRun(&threads[i]);
+			/* actually run this thread directly in the main thread */
+			(void) threadRun(thread);
 		else
-			pthread_join(threads[i].thread, &ret);
+			/* wait for other threads; should we check that 0 is returned? */
+			pthread_join(thread->thread, NULL);
+#else
+		(void) threadRun(thread);
+#endif /* ENABLE_THREAD_SAFETY */
 
-		if (ret != NULL)
-		{
-			TResult    *r = (TResult *) ret;
+		/* thread-level stats: sum the counters, keep the largest lag */
+		throttle_lag += thread->throttle_lag;
+		throttle_latency_skipped += thread->throttle_latency_skipped;
+		latency_late += thread->latency_late;
+		if (thread->throttle_lag_max > throttle_lag_max)
+			throttle_lag_max = thread->throttle_lag_max;
+		INSTR_TIME_ADD(conn_total_time, thread->conn_time);
 
-			total_xacts += r->xacts;
-			total_latencies += r->latencies;
-			total_sqlats += r->sqlats;
-			throttle_lag += r->throttle_lag;
-			throttle_latency_skipped += r->throttle_latency_skipped;
-			latency_late += r->latency_late;
-			if (r->throttle_lag_max > throttle_lag_max)
-				throttle_lag_max = r->throttle_lag_max;
-			INSTR_TIME_ADD(conn_total_time, r->conn_time);
-			free(ret);
+		/* client-level stats, summed over this thread's clients (index j) */
+		for (j = 0; j < thread->nstate; j++)
+		{
+			total_xacts += thread->state[j].cnt;
+			total_latencies += thread->state[j].txn_latencies;
+			total_sqlats += thread->state[j].txn_sqlats;
 		}
 	}
 	disconnect_all(state, nclients);
@@ -3449,7 +3433,6 @@ threadRun(void *arg)
 {
 	TState	   *thread = (TState *) arg;
 	CState	   *state = thread->state;
-	TResult    *result;
 	FILE	   *logfile = NULL; /* per-thread log file */
 	instr_time	start,
 				end;
@@ -3480,9 +3463,7 @@ threadRun(void *arg)
 	thread->throttle_lag = 0;
 	thread->throttle_lag_max = 0;
 
-	result = pg_malloc(sizeof(TResult));
-
-	INSTR_TIME_SET_ZERO(result->conn_time);
+	INSTR_TIME_SET_ZERO(thread->conn_time);
 
 	/* open log file if requested */
 	if (use_log)
@@ -3513,8 +3494,8 @@ threadRun(void *arg)
 	}
 
 	/* time after thread and connections set up */
-	INSTR_TIME_SET_CURRENT(result->conn_time);
-	INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time);
+	INSTR_TIME_SET_CURRENT(thread->conn_time);
+	INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
 
 	agg_vals_init(&aggs, thread->start_time);
 
@@ -3526,7 +3507,7 @@ threadRun(void *arg)
 		int			prev_ecnt = st->ecnt;
 
 		st->use_file = getrand(thread, 0, num_files - 1);
-		if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
+		if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
 			remains--;			/* I've aborted */
 
 		if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
@@ -3641,7 +3622,7 @@ threadRun(void *arg)
 			if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
 							|| commands[st->state]->type == META_COMMAND))
 			{
-				if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
+				if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
 					remains--;	/* I've aborted */
 			}
 
@@ -3654,68 +3635,6 @@ threadRun(void *arg)
 			}
 		}
 
-#ifdef PTHREAD_FORK_EMULATION
-		/* each process reports its own progression */
-		if (progress)
-		{
-			instr_time	now_time;
-			int64		now;
-
-			INSTR_TIME_SET_CURRENT(now_time);
-			now = INSTR_TIME_GET_MICROSEC(now_time);
-			if (now >= next_report)
-			{
-				/* generate and show report */
-				int64		count = 0,
-							lats = 0,
-							sqlats = 0,
-							skipped = 0;
-				int64		lags = thread->throttle_lag;
-				int64		run = now - last_report;
-				double		tps,
-							total_run,
-							latency,
-							sqlat,
-							stdev,
-							lag;
-
-				for (i = 0; i < nstate; i++)
-				{
-					count += state[i].cnt;
-					lats += state[i].txn_latencies;
-					sqlats += state[i].txn_sqlats;
-				}
-
-				total_run = (now - thread_start) / 1000000.0;
-				tps = 1000000.0 * (count - last_count) / run;
-				latency = 0.001 * (lats - last_lats) / (count - last_count);
-				sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
-				stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
-				lag = 0.001 * (lags - last_lags) / (count - last_count);
-				skipped = thread->throttle_latency_skipped - last_skipped;
-
-				fprintf(stderr,
-						"progress %d: %.1f s, %.1f tps, "
-						"lat %.3f ms stddev %.3f",
-						thread->tid, total_run, tps, latency, stdev);
-				if (throttle_delay)
-				{
-					fprintf(stderr, ", lag %.3f ms", lag);
-					if (latency_limit)
-						fprintf(stderr, ", skipped " INT64_FORMAT, skipped);
-				}
-				fprintf(stderr, "\n");
-
-				last_count = count;
-				last_lats = lats;
-				last_sqlats = sqlats;
-				last_lags = lags;
-				last_report = now;
-				last_skipped = thread->throttle_latency_skipped;
-				next_report += (int64) progress *1000000;
-			}
-		}
-#else
 		/* progress report by thread 0 for all threads */
 		if (progress && thread->tid == 0)
 		{
@@ -3779,31 +3698,16 @@ threadRun(void *arg)
 				next_report += (int64) progress *1000000;
 			}
 		}
-#endif   /* PTHREAD_FORK_EMULATION */
 	}
 
 done:
 	INSTR_TIME_SET_CURRENT(start);
 	disconnect_all(state, nstate);
-	result->xacts = 0;
-	result->latencies = 0;
-	result->sqlats = 0;
-	for (i = 0; i < nstate; i++)
-	{
-		result->xacts += state[i].cnt;
-		result->latencies += state[i].txn_latencies;
-		result->sqlats += state[i].txn_sqlats;
-	}
-	result->throttle_lag = thread->throttle_lag;
-	result->throttle_lag_max = thread->throttle_lag_max;
-	result->throttle_latency_skipped = thread->throttle_latency_skipped;
-	result->latency_late = thread->latency_late;
-
 	INSTR_TIME_SET_CURRENT(end);
-	INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
+	INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
 	if (logfile)
 		fclose(logfile);
-	return result;
+	return NULL;
 }
 
 /*
@@ -3825,90 +3729,6 @@ setalarm(int seconds)
 	alarm(seconds);
 }
 
-#ifndef ENABLE_THREAD_SAFETY
-
-/*
- * implements pthread using fork.
- */
-
-typedef struct fork_pthread
-{
-	pid_t		pid;
-	int			pipes[2];
-}	fork_pthread;
-
-static int
-pthread_create(pthread_t *thread,
-			   pthread_attr_t *attr,
-			   void *(*start_routine) (void *),
-			   void *arg)
-{
-	fork_pthread *th;
-	void	   *ret;
-	int			rc;
-
-	th = (fork_pthread *) pg_malloc(sizeof(fork_pthread));
-	if (pipe(th->pipes) < 0)
-	{
-		free(th);
-		return errno;
-	}
-
-	th->pid = fork();
-	if (th->pid == -1)			/* error */
-	{
-		free(th);
-		return errno;
-	}
-	if (th->pid != 0)			/* in parent process */
-	{
-		close(th->pipes[1]);
-		*thread = th;
-		return 0;
-	}
-
-	/* in child process */
-	close(th->pipes[0]);
-
-	/* set alarm again because the child does not inherit timers */
-	if (duration > 0)
-		setalarm(duration);
-
-	ret = start_routine(arg);
-	rc = write(th->pipes[1], ret, sizeof(TResult));
-	(void) rc;
-	close(th->pipes[1]);
-	free(th);
-	exit(0);
-}
-
-static int
-pthread_join(pthread_t th, void **thread_return)
-{
-	int			status;
-
-	while (waitpid(th->pid, &status, 0) != th->pid)
-	{
-		if (errno != EINTR)
-			return errno;
-	}
-
-	if (thread_return != NULL)
-	{
-		/* assume result is TResult */
-		*thread_return = pg_malloc(sizeof(TResult));
-		if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult))
-		{
-			free(*thread_return);
-			*thread_return = NULL;
-		}
-	}
-	close(th->pipes[0]);
-
-	free(th);
-	return 0;
-}
-#endif
 #else							/* WIN32 */
 
 static VOID CALLBACK
#2Heikki Linnakangas
hlinnaka@iki.fi
In reply to: Fabien COELHO (#1)
Re: PATCH: pgbench - remove thread fork-emulation

On 04/28/2015 02:18 AM, Fabien COELHO wrote:

This patch removes the pgbench thread fork-emulation code and simplifies
things where possible, especially around pthread_create and pthread_join.
The stats collection for the report is done directly instead of using an
intermediate structure.

As a result, if no thread implementation is available, pgbench is
restricted to work with only the main thread (ie "pgbench -j 1 ...").

== Rational ==

Pgbench currently provides a thread emulation through process forks. This
feature was developed way back when it may have been common that some
platforms were not supporting threads. This is now very rare (can you name
one such platform?).

However, the thread fork-emulation feature has drawbacks: Namely,
processes are not threads, the memory is not shared (sure), so it hinders
simple implementation for some features, or results in not providing these
features with fork-emulation, or having a different behavior under
fork-emulation:

Latency collection (-r) is not supported with fork emulation.

Progress (-P) is reported differently with fork emulation.

For a new feature under discussion, which consist in allowing one log
instead of per-thread logs, supporting fork-emulation requires a (heavy)
post-processing external sort phase whereas with actual threads all
threads can share and append to the same log file with limited overhead,
which is significantly simpler.

I agree with all that; it's time to let the fork-emulation mode go.
Committed.

- Heikki

--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers