pgbench: introduce a RandomState struct
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Fri, 16 Nov 2018 18:43:40 +0000 (15:43 -0300)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Fri, 16 Nov 2018 19:34:13 +0000 (16:34 -0300)
This becomes useful when used to retry a transaction after a
serialization error or deadlock abort.  (We don't yet have that feature,
but this is preparation for it.)

While at it, use separate random state for thread administratrivia such
as deciding which script to run, how long to delay for throttling, or
whether to log a message when sampling; this not only makes these tasks
independent of each other, but makes the actual thread run
deterministic.

Author: Marina Polyakova
Reviewed-by: Fabien Coelho
Discussion: https://postgr.es/m/72a0d590d6ba06f242d75c2e641820ec@postgrespro.ru

src/bin/pgbench/pgbench.c

index 81bc6d8a6e338616f2cf2f7da6693d017bef33fd..73d3de0677090bb1293ab9b20168ebe7b6bcbde3 100644 (file)
@@ -279,6 +279,14 @@ typedef struct StatsData
    SimpleStats lag;
 } StatsData;
 
+/*
+ * Struct to keep random state.
+ */
+typedef struct RandomState
+{
+   unsigned short xseed[3];
+} RandomState;
+
 /*
  * Connection state machine states.
  */
@@ -360,6 +368,12 @@ typedef struct
    ConnectionStateEnum state;  /* state machine's current state. */
    ConditionalStack cstack;    /* enclosing conditionals state */
 
+   /*
+    * Separate randomness for each client. This is used for random functions
+    * PGBENCH_RANDOM_* during the execution of the script.
+    */
+   RandomState cs_func_rs;
+
    int         use_file;       /* index in sql_script for this client */
    int         command;        /* command number in script */
 
@@ -419,7 +433,16 @@ typedef struct
    pthread_t   thread;         /* thread handle */
    CState     *state;          /* array of CState */
    int         nstate;         /* length of state[] */
-   unsigned short random_state[3]; /* separate randomness for each thread */
+
+   /*
+    * Separate randomness for each thread. Each thread option uses its own
+    * random state to make all of them independent of each other and therefore
+    * deterministic at the thread level.
+    */
+   RandomState ts_choose_rs;   /* random state for selecting a script */
+   RandomState ts_throttle_rs; /* random state for transaction throttling */
+   RandomState ts_sample_rs;   /* random state for log sampling */
+
    int64       throttle_trigger;   /* previous/next throttling (us) */
    FILE       *logfile;        /* where to log, or NULL */
    ZipfCache   zipf_cache;     /* for thread-safe  zipfian random number
@@ -769,9 +792,20 @@ strtodouble(const char *str, bool errorOK, double *dv)
    return true;
 }
 
+/*
+ * Initialize a random state struct.
+ */
+static void
+initRandomState(RandomState *random_state)
+{
+   random_state->xseed[0] = random();
+   random_state->xseed[1] = random();
+   random_state->xseed[2] = random();
+}
+
 /* random number generator: uniform distribution from min to max inclusive */
 static int64
-getrand(TState *thread, int64 min, int64 max)
+getrand(RandomState *random_state, int64 min, int64 max)
 {
    /*
     * Odd coding is so that min and max have approximately the same chance of
@@ -782,7 +816,7 @@ getrand(TState *thread, int64 min, int64 max)
     * protected by a mutex, and therefore a bottleneck on machines with many
     * CPUs.
     */
-   return min + (int64) ((max - min + 1) * pg_erand48(thread->random_state));
+   return min + (int64) ((max - min + 1) * pg_erand48(random_state->xseed));
 }
 
 /*
@@ -791,7 +825,8 @@ getrand(TState *thread, int64 min, int64 max)
  * value is exp(-parameter).
  */
 static int64
-getExponentialRand(TState *thread, int64 min, int64 max, double parameter)
+getExponentialRand(RandomState *random_state, int64 min, int64 max,
+                  double parameter)
 {
    double      cut,
                uniform,
@@ -801,7 +836,7 @@ getExponentialRand(TState *thread, int64 min, int64 max, double parameter)
    Assert(parameter > 0.0);
    cut = exp(-parameter);
    /* erand in [0, 1), uniform in (0, 1] */
-   uniform = 1.0 - pg_erand48(thread->random_state);
+   uniform = 1.0 - pg_erand48(random_state->xseed);
 
    /*
     * inner expression in (cut, 1] (if parameter > 0), rand in [0, 1)
@@ -814,7 +849,8 @@ getExponentialRand(TState *thread, int64 min, int64 max, double parameter)
 
 /* random number generator: gaussian distribution from min to max inclusive */
 static int64
-getGaussianRand(TState *thread, int64 min, int64 max, double parameter)
+getGaussianRand(RandomState *random_state, int64 min, int64 max,
+               double parameter)
 {
    double      stdev;
    double      rand;
@@ -842,8 +878,8 @@ getGaussianRand(TState *thread, int64 min, int64 max, double parameter)
         * are expected in (0, 1] (see
         * https://en.wikipedia.org/wiki/Box-Muller_transform)
         */
-       double      rand1 = 1.0 - pg_erand48(thread->random_state);
-       double      rand2 = 1.0 - pg_erand48(thread->random_state);
+       double      rand1 = 1.0 - pg_erand48(random_state->xseed);
+       double      rand2 = 1.0 - pg_erand48(random_state->xseed);
 
        /* Box-Muller basic form transform */
        double      var_sqrt = sqrt(-2.0 * log(rand1));
@@ -873,7 +909,7 @@ getGaussianRand(TState *thread, int64 min, int64 max, double parameter)
  * not be one.
  */
 static int64
-getPoissonRand(TState *thread, double center)
+getPoissonRand(RandomState *random_state, double center)
 {
    /*
     * Use inverse transform sampling to generate a value > 0, such that the
@@ -882,7 +918,7 @@ getPoissonRand(TState *thread, double center)
    double      uniform;
 
    /* erand in [0, 1), uniform in (0, 1] */
-   uniform = 1.0 - pg_erand48(thread->random_state);
+   uniform = 1.0 - pg_erand48(random_state->xseed);
 
    return (int64) (-log(uniform) * center + 0.5);
 }
@@ -960,7 +996,7 @@ zipfFindOrCreateCacheCell(ZipfCache *cache, int64 n, double s)
  * Luc Devroye, p. 550-551, Springer 1986.
  */
 static int64
-computeIterativeZipfian(TState *thread, int64 n, double s)
+computeIterativeZipfian(RandomState *random_state, int64 n, double s)
 {
    double      b = pow(2.0, s - 1.0);
    double      x,
@@ -971,8 +1007,8 @@ computeIterativeZipfian(TState *thread, int64 n, double s)
    while (true)
    {
        /* random variates */
-       u = pg_erand48(thread->random_state);
-       v = pg_erand48(thread->random_state);
+       u = pg_erand48(random_state->xseed);
+       v = pg_erand48(random_state->xseed);
 
        x = floor(pow(u, -1.0 / (s - 1.0)));
 
@@ -990,10 +1026,11 @@ computeIterativeZipfian(TState *thread, int64 n, double s)
  * Jim Gray et al, SIGMOD 1994
  */
 static int64
-computeHarmonicZipfian(TState *thread, int64 n, double s)
+computeHarmonicZipfian(ZipfCache *zipf_cache, RandomState *random_state,
+                      int64 n, double s)
 {
-   ZipfCell   *cell = zipfFindOrCreateCacheCell(&thread->zipf_cache, n, s);
-   double      uniform = pg_erand48(thread->random_state);
+   ZipfCell   *cell = zipfFindOrCreateCacheCell(zipf_cache, n, s);
+   double      uniform = pg_erand48(random_state->xseed);
    double      uz = uniform * cell->harmonicn;
 
    if (uz < 1.0)
@@ -1005,17 +1042,17 @@ computeHarmonicZipfian(TState *thread, int64 n, double s)
 
 /* random number generator: zipfian distribution from min to max inclusive */
 static int64
-getZipfianRand(TState *thread, int64 min, int64 max, double s)
+getZipfianRand(ZipfCache *zipf_cache, RandomState *random_state, int64 min,
+              int64 max, double s)
 {
    int64       n = max - min + 1;
 
    /* abort if parameter is invalid */
    Assert(s > 0.0 && s != 1.0 && s <= MAX_ZIPFIAN_PARAM);
 
-
    return min - 1 + ((s > 1)
-                     ? computeIterativeZipfian(thread, n, s)
-                     : computeHarmonicZipfian(thread, n, s));
+                     ? computeIterativeZipfian(random_state, n, s)
+                     : computeHarmonicZipfian(zipf_cache, random_state, n, s));
 }
 
 /*
@@ -2310,7 +2347,7 @@ evalStandardFunc(TState *thread, CState *st,
                if (func == PGBENCH_RANDOM)
                {
                    Assert(nargs == 2);
-                   setIntValue(retval, getrand(thread, imin, imax));
+                   setIntValue(retval, getrand(&st->cs_func_rs, imin, imax));
                }
                else            /* gaussian & exponential */
                {
@@ -2332,7 +2369,8 @@ evalStandardFunc(TState *thread, CState *st,
                        }
 
                        setIntValue(retval,
-                                   getGaussianRand(thread, imin, imax, param));
+                                   getGaussianRand(&st->cs_func_rs,
+                                                   imin, imax, param));
                    }
                    else if (func == PGBENCH_RANDOM_ZIPFIAN)
                    {
@@ -2344,7 +2382,9 @@ evalStandardFunc(TState *thread, CState *st,
                            return false;
                        }
                        setIntValue(retval,
-                                   getZipfianRand(thread, imin, imax, param));
+                                   getZipfianRand(&thread->zipf_cache,
+                                                  &st->cs_func_rs,
+                                                  imin, imax, param));
                    }
                    else        /* exponential */
                    {
@@ -2357,7 +2397,8 @@ evalStandardFunc(TState *thread, CState *st,
                        }
 
                        setIntValue(retval,
-                                   getExponentialRand(thread, imin, imax, param));
+                                   getExponentialRand(&st->cs_func_rs,
+                                                      imin, imax, param));
                    }
                }
 
@@ -2652,7 +2693,7 @@ chooseScript(TState *thread)
    if (num_scripts == 1)
        return 0;
 
-   w = getrand(thread, 0, total_weight - 1);
+   w = getrand(&thread->ts_choose_rs, 0, total_weight - 1);
    do
    {
        w -= sql_script[i++].weight;
@@ -2846,7 +2887,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
                 * away.
                 */
                Assert(throttle_delay > 0);
-               wait = getPoissonRand(thread, throttle_delay);
+               wait = getPoissonRand(&thread->ts_throttle_rs, throttle_delay);
 
                thread->throttle_trigger += wait;
                st->txn_scheduled = thread->throttle_trigger;
@@ -2880,7 +2921,8 @@ doCustom(TState *thread, CState *st, StatsData *agg)
                    {
                        processXactStats(thread, st, &now, true, agg);
                        /* next rendez-vous */
-                       wait = getPoissonRand(thread, throttle_delay);
+                       wait = getPoissonRand(&thread->ts_throttle_rs,
+                                             throttle_delay);
                        thread->throttle_trigger += wait;
                        st->txn_scheduled = thread->throttle_trigger;
                    }
@@ -3423,7 +3465,7 @@ doLog(TState *thread, CState *st,
     * to the random sample.
     */
    if (sample_rate != 0.0 &&
-       pg_erand48(thread->random_state) > sample_rate)
+       pg_erand48(thread->ts_sample_rs.xseed) > sample_rate)
        return;
 
    /* should we aggregate the results or not? */
@@ -4851,7 +4893,6 @@ set_random_seed(const char *seed)
    return true;
 }
 
-
 int
 main(int argc, char **argv)
 {
@@ -5465,6 +5506,7 @@ main(int argc, char **argv)
    for (i = 0; i < nclients; i++)
    {
        state[i].cstack = conditional_stack_create();
+       initRandomState(&state[i].cs_func_rs);
    }
 
    if (debug)
@@ -5598,9 +5640,9 @@ main(int argc, char **argv)
        thread->state = &state[nclients_dealt];
        thread->nstate =
            (nclients - nclients_dealt + nthreads - i - 1) / (nthreads - i);
-       thread->random_state[0] = random();
-       thread->random_state[1] = random();
-       thread->random_state[2] = random();
+       initRandomState(&thread->ts_choose_rs);
+       initRandomState(&thread->ts_throttle_rs);
+       initRandomState(&thread->ts_sample_rs);
        thread->logfile = NULL; /* filled in later */
        thread->latency_late = 0;
        thread->zipf_cache.nb_cells = 0;