From b1d6a8f86813772b9198367a34c8ff8bff7fef9e Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Wed, 10 Mar 2021 15:39:08 +1300 Subject: [PATCH] pgbench: Refactor thread portability support. Instead of maintaining an incomplete emulation of POSIX threads for Windows, let's use an extremely minimalist macro-based abstraction for now. A later patch will extend this, without the need to supply more complicated pthread emulation code. (There may be a need for a more serious portable thread abstraction in later projects, but this is not it.) Minor incidental problems fixed: it wasn't OK to use (pthread_t) 0 as a special value, it wasn't OK to compare thread_t values with ==, and we incorrectly assumed that pthread functions set errno. Discussion: https://postgr.es/m/20200227180100.zyvjwzcpiokfsqm2%40alap3.anarazel.de --- src/bin/pgbench/pgbench.c | 126 ++++++++++---------------------------- 1 file changed, 31 insertions(+), 95 deletions(-) diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index f1d98be2d2d..746b589e6d0 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -111,22 +111,36 @@ typedef struct socket_set #endif /* POLL_USING_SELECT */ /* - * Multi-platform pthread implementations + * Multi-platform thread implementations */ #ifdef WIN32 -/* Use native win32 threads on Windows */ -typedef struct win32_pthread *pthread_t; -typedef int pthread_attr_t; - -static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg); -static int pthread_join(pthread_t th, void **thread_return); +/* Use Windows threads */ +#include +#define GETERRNO() (_dosmaperr(GetLastError()), errno) +#define THREAD_T HANDLE +#define THREAD_FUNC_RETURN_TYPE unsigned +#define THREAD_FUNC_RETURN return 0 +#define THREAD_CREATE(handle, function, arg) \ + ((*(handle) = (HANDLE) _beginthreadex(NULL, 0, (function), (arg), 0, NULL)) == 0 ? errno : 0) +#define THREAD_JOIN(handle) \ + (WaitForSingleObject(handle, INFINITE) != WAIT_OBJECT_0 ? \ + GETERRNO() : CloseHandle(handle) ? 0 : GETERRNO()) #elif defined(ENABLE_THREAD_SAFETY) -/* Use platform-dependent pthread capability */ +/* Use POSIX threads */ #include +#define THREAD_T pthread_t +#define THREAD_FUNC_RETURN_TYPE void * +#define THREAD_FUNC_RETURN return NULL +#define THREAD_CREATE(handle, function, arg) \ + pthread_create((handle), NULL, (function), (arg)) +#define THREAD_JOIN(handle) \ + pthread_join((handle), NULL) #else /* No threads implementation, use none (-j 1) */ -#define pthread_t void * +#define THREAD_T void * +#define THREAD_FUNC_RETURN_TYPE void * +#define THREAD_FUNC_RETURN return NULL #endif @@ -436,7 +450,7 @@ typedef struct typedef struct { int tid; /* thread id */ - pthread_t thread; /* thread handle */ + THREAD_T thread; /* thread handle */ CState *state; /* array of CState */ int nstate; /* length of state[] */ @@ -459,8 +473,6 @@ typedef struct int64 latency_late; /* executed but late transactions */ } TState; -#define INVALID_THREAD ((pthread_t) 0) - /* * queries read from files */ @@ -604,7 +616,7 @@ static void doLog(TState *thread, CState *st, static void processXactStats(TState *thread, CState *st, instr_time *now, bool skipped, StatsData *agg); static void addScript(ParsedScript script); -static void *threadRun(void *arg); +static THREAD_FUNC_RETURN_TYPE threadRun(void *arg); static void finishCon(CState *st); static void setalarm(int seconds); static socket_set *alloc_socket_set(int count); @@ -6142,18 +6154,14 @@ main(int argc, char **argv) /* the first thread (i = 0) is executed by main thread */ if (i > 0) { - int err = pthread_create(&thread->thread, NULL, threadRun, thread); + errno = THREAD_CREATE(&thread->thread, threadRun, thread); - if (err != 0 || thread->thread == INVALID_THREAD) + if (errno != 0) { pg_log_fatal("could not create thread: %m"); exit(1); } } - else - { - thread->thread = INVALID_THREAD; - } } #else INSTR_TIME_SET_CURRENT(threads[0].start_time); @@ -6161,7 +6169,6 @@ main(int argc, char **argv) if (duration > 0) end_time = INSTR_TIME_GET_MICROSEC(threads[0].start_time) + (int64) 1000000 * duration; - threads[0].thread = INVALID_THREAD; #endif /* ENABLE_THREAD_SAFETY */ /* wait for threads and accumulate results */ @@ -6172,12 +6179,12 @@ main(int argc, char **argv) TState *thread = &threads[i]; #ifdef ENABLE_THREAD_SAFETY - if (threads[i].thread == INVALID_THREAD) + if (i == 0) /* actually run this thread directly in the main thread */ (void) threadRun(thread); else /* wait of other threads. should check that 0 is returned? */ - pthread_join(thread->thread, NULL); + THREAD_JOIN(thread->thread); #else (void) threadRun(thread); #endif /* ENABLE_THREAD_SAFETY */ @@ -6216,7 +6223,7 @@ main(int argc, char **argv) return exit_code; } -static void * +static THREAD_FUNC_RETURN_TYPE threadRun(void *arg) { TState *thread = (TState *) arg; @@ -6501,7 +6508,7 @@ done: thread->logfile = NULL; } free_socket_set(sockets); - return NULL; + THREAD_FUNC_RETURN; } static void @@ -6732,74 +6739,3 @@ socket_has_input(socket_set *sa, int fd, int idx) } #endif /* POLL_USING_SELECT */ - - -/* partial pthread implementation for Windows */ - -#ifdef WIN32 - -typedef struct win32_pthread -{ - HANDLE handle; - void *(*routine) (void *); - void *arg; - void *result; -} win32_pthread; - -static unsigned __stdcall -win32_pthread_run(void *arg) -{ - win32_pthread *th = (win32_pthread *) arg; - - th->result = th->routine(th->arg); - - return 0; -} - -static int -pthread_create(pthread_t *thread, - pthread_attr_t *attr, - void *(*start_routine) (void *), - void *arg) -{ - int save_errno; - win32_pthread *th; - - th = (win32_pthread *) pg_malloc(sizeof(win32_pthread)); - th->routine = start_routine; - th->arg = arg; - th->result = NULL; - - th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL); - if (th->handle == NULL) - { - save_errno = errno; - free(th); - return save_errno; - } - - *thread = th; - return 0; -} - -static int -pthread_join(pthread_t th, void **thread_return) -{ - if (th == NULL || th->handle == NULL) - return errno = EINVAL; - - if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0) - { - _dosmaperr(GetLastError()); - return errno; - } - - if (thread_return) - *thread_return = th->result; - - CloseHandle(th->handle); - free(th); - return 0; -} - -#endif /* WIN32 */ -- 2.39.5