Abort pgbench if script end is reached with an open pipeline
[pgsql.git] / src / bin / pgbench / pgbench.c
blob7b53f9c24da30ba99657cb6eb3fb2796f74fbaa7
1 /*
2 * pgbench.c
4 * A simple benchmark program for PostgreSQL
5 * Originally written by Tatsuo Ishii and enhanced by many contributors.
7 * src/bin/pgbench/pgbench.c
8 * Copyright (c) 2000-2024, PostgreSQL Global Development Group
9 * ALL RIGHTS RESERVED;
11 * Permission to use, copy, modify, and distribute this software and its
12 * documentation for any purpose, without fee, and without a written agreement
13 * is hereby granted, provided that the above copyright notice and this
14 * paragraph and the following two paragraphs appear in all copies.
16 * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
17 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
18 * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
19 * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
20 * POSSIBILITY OF SUCH DAMAGE.
22 * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
23 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
24 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
25 * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
26 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
30 #if defined(WIN32) && FD_SETSIZE < 1024
31 #error FD_SETSIZE needs to have been increased
32 #endif
34 #include "postgres_fe.h"
36 #include <ctype.h>
37 #include <float.h>
38 #include <limits.h>
39 #include <math.h>
40 #include <signal.h>
41 #include <time.h>
42 #include <sys/time.h>
43 #include <sys/resource.h> /* for getrlimit */
45 /* For testing, PGBENCH_USE_SELECT can be defined to force use of that code */
46 #if defined(HAVE_PPOLL) && !defined(PGBENCH_USE_SELECT)
47 #define POLL_USING_PPOLL
48 #ifdef HAVE_POLL_H
49 #include <poll.h>
50 #endif
51 #else /* no ppoll(), so use select() */
52 #define POLL_USING_SELECT
53 #include <sys/select.h>
54 #endif
56 #include "common/int.h"
57 #include "common/logging.h"
58 #include "common/pg_prng.h"
59 #include "common/string.h"
60 #include "common/username.h"
61 #include "fe_utils/cancel.h"
62 #include "fe_utils/conditional.h"
63 #include "fe_utils/option_utils.h"
64 #include "fe_utils/string_utils.h"
65 #include "getopt_long.h"
66 #include "libpq-fe.h"
67 #include "pgbench.h"
68 #include "port/pg_bitutils.h"
69 #include "portability/instr_time.h"
/*
 * Fallback definition of pi.  Only X/Open (XSI) obliges <math.h> to supply
 * M_PI; core POSIX does not, so provide it ourselves when it is missing.
 */
#if !defined(M_PI)
#define M_PI 3.14159265358979323846
#endif
/*
 * SQLSTATE error codes, spelled out locally as string literals.
 */
#define ERRCODE_T_R_SERIALIZATION_FAILURE	"40001" /* serialization_failure */
#define ERRCODE_T_R_DEADLOCK_DETECTED		"40P01" /* deadlock_detected */
#define ERRCODE_UNDEFINED_TABLE				"42P01" /* undefined_table */
/*
 * Hashing constants.
 *
 * FNV_PRIME and FNV_OFFSET_BASIS are the 64-bit FNV prime and offset basis.
 * The MM2_* values are the 64-bit MurmurHash2 multiplier and shift amount;
 * MM2_MUL_TIMES_8 is MM2_MUL * 8 reduced modulo 2^64.
 */
#define FNV_PRIME			UINT64CONST(0x100000001b3)
#define FNV_OFFSET_BASIS	UINT64CONST(0xcbf29ce484222325)

#define MM2_MUL				UINT64CONST(0xc6a4a7935bd1e995)
#define MM2_MUL_TIMES_8		UINT64CONST(0x35253c9ade8f4ca8)
#define MM2_ROT				47
/*
 * Multi-platform socket set implementations.
 *
 * Only one of the two representations below is compiled in, matching
 * whichever of POLL_USING_PPOLL / POLL_USING_SELECT is defined.
 */
#if defined(POLL_USING_PPOLL)
#define SOCKET_WAIT_METHOD "ppoll"

/* ppoll() flavor: a flexible array of pollfd entries */
typedef struct socket_set
{
	int			maxfds;			/* allocated length of the pollfds[] array */
	int			curfds;			/* how many pollfds[] entries are in use */
	struct pollfd pollfds[FLEXIBLE_ARRAY_MEMBER];
} socket_set;
#endif							/* POLL_USING_PPOLL */

#if defined(POLL_USING_SELECT)
#define SOCKET_WAIT_METHOD "select"

/* select() flavor: an fd_set plus the highest descriptor placed in it */
typedef struct socket_set
{
	int			maxfd;			/* largest FD currently set in fds */
	fd_set		fds;
} socket_set;
#endif							/* POLL_USING_SELECT */
117 * Multi-platform thread implementations
120 #ifdef WIN32
121 /* Use Windows threads */
122 #include <windows.h>
123 #define GETERRNO() (_dosmaperr(GetLastError()), errno)
124 #define THREAD_T HANDLE
125 #define THREAD_FUNC_RETURN_TYPE unsigned
126 #define THREAD_FUNC_RETURN return 0
127 #define THREAD_FUNC_CC __stdcall
128 #define THREAD_CREATE(handle, function, arg) \
129 ((*(handle) = (HANDLE) _beginthreadex(NULL, 0, (function), (arg), 0, NULL)) == 0 ? errno : 0)
130 #define THREAD_JOIN(handle) \
131 (WaitForSingleObject(handle, INFINITE) != WAIT_OBJECT_0 ? \
132 GETERRNO() : CloseHandle(handle) ? 0 : GETERRNO())
133 #define THREAD_BARRIER_T SYNCHRONIZATION_BARRIER
134 #define THREAD_BARRIER_INIT(barrier, n) \
135 (InitializeSynchronizationBarrier((barrier), (n), 0) ? 0 : GETERRNO())
136 #define THREAD_BARRIER_WAIT(barrier) \
137 EnterSynchronizationBarrier((barrier), \
138 SYNCHRONIZATION_BARRIER_FLAGS_BLOCK_ONLY)
139 #define THREAD_BARRIER_DESTROY(barrier)
140 #else
141 /* Use POSIX threads */
142 #include "port/pg_pthread.h"
143 #define THREAD_T pthread_t
144 #define THREAD_FUNC_RETURN_TYPE void *
145 #define THREAD_FUNC_RETURN return NULL
146 #define THREAD_FUNC_CC
147 #define THREAD_CREATE(handle, function, arg) \
148 pthread_create((handle), NULL, (function), (arg))
149 #define THREAD_JOIN(handle) \
150 pthread_join((handle), NULL)
151 #define THREAD_BARRIER_T pthread_barrier_t
152 #define THREAD_BARRIER_INIT(barrier, n) \
153 pthread_barrier_init((barrier), NULL, (n))
154 #define THREAD_BARRIER_WAIT(barrier) pthread_barrier_wait((barrier))
155 #define THREAD_BARRIER_DESTROY(barrier) pthread_barrier_destroy((barrier))
156 #endif
/*
 * ---- Configurable parameters ----
 */

/* -I: the default initialization steps, and every step that exists */
#define DEFAULT_INIT_STEPS "dtgvp"
#define ALL_INIT_STEPS "dtgGvpf"

#define LOG_STEP_SECONDS 5		/* seconds between log messages */
#define DEFAULT_NXACTS 10		/* default for nxacts (-t) */

/* bounds on distribution parameters */
#define MIN_GAUSSIAN_PARAM 2.0	/* minimum parameter for gauss */
#define MIN_ZIPFIAN_PARAM 1.001 /* minimum parameter for zipfian */
#define MAX_ZIPFIAN_PARAM 1000.0	/* maximum parameter for zipfian */
173 int nxacts = 0; /* number of transactions per client */
174 int duration = 0; /* duration in seconds */
175 int64 end_time = 0; /* when to stop in micro seconds, under -T */
178 * scaling factor. for example, scale = 10 will make 1000000 tuples in
179 * pgbench_accounts table.
181 int scale = 1;
184 * fillfactor. for example, fillfactor = 90 will use only 90 percent
185 * space during inserts and leave 10 percent free.
187 int fillfactor = 100;
190 * use unlogged tables?
192 bool unlogged_tables = false;
195 * log sampling rate (1.0 = log everything, 0.0 = option not given)
197 double sample_rate = 0.0;
200 * When threads are throttled to a given rate limit, this is the target delay
201 * to reach that rate in usec. 0 is the default and means no throttling.
203 double throttle_delay = 0;
206 * Transactions which take longer than this limit (in usec) are counted as
207 * late, and reported as such, although they are completed anyway. When
208 * throttling is enabled, execution time slots that are more than this late
209 * are skipped altogether, and counted separately.
211 int64 latency_limit = 0;
214 * tablespace selection
216 char *tablespace = NULL;
217 char *index_tablespace = NULL;
/*
 * Number of partitions of "pgbench_accounts".  The default, 0, means the
 * table is not partitioned.
 */
static int	partitions = 0;

/* How "pgbench_accounts" is partitioned. */
typedef enum
{
	PART_NONE,					/* no partitioning */
	PART_RANGE,					/* range partitioning */
	PART_HASH,					/* hash partitioning */
} partition_method_t;

static partition_method_t partition_method = PART_NONE;

/* Printable name of each partition_method_t value, indexed by the enum. */
static const char *const PARTITION_METHOD[] = {
	[PART_NONE] = "none",
	[PART_RANGE] = "range",
	[PART_HASH] = "hash",
};
236 /* random seed used to initialize base_random_sequence */
237 int64 random_seed = -1;
/*
 * ---- End of configurable parameters ----
 */

/*
 * Rows per unit of scale in the standard tables.  Keep these as plain
 * integer literals: the builtin scripts stringify them with CppAsString2().
 */
#define nbranches	1			/* Makes little sense to change this.  Change
								 * -s instead */
#define ntellers	10
#define naccounts	100000

/*
 * The scale factor at/beyond which account IDs (naccounts * scale) no longer
 * fit in a signed 32-bit integer.  The exact boundary is 21474 (21474 *
 * 100000 still fits, 21475 * 100000 does not), but the round figure 20000 is
 * used because it is easier to document and remember and is close to the
 * real threshold.
 */
#define SCALE_32BIT_THRESHOLD 20000
/* Logging */
bool		use_log;			/* log transaction latencies to a file */
bool		use_quiet;			/* quiet logging onto stderr */
int			agg_interval;		/* log aggregates instead of individual
								 * transactions */

/* Reporting */
bool		per_script_stats = false;	/* collect stats per script? */
int			progress = 0;		/* thread progress report every this many
								 * seconds */
bool		progress_timestamp = false; /* progress report with Unix time */
bool		report_per_command = false; /* report per-command latencies,
										 * retries after errors and failures
										 * (errors without retrying) */

/* Shape of the run */
int			nclients = 1;		/* number of clients */
int			nthreads = 1;		/* number of threads */
bool		is_connect;			/* establish connection for each transaction */
int			main_pid;			/* main process id used in log filename */
273 * There are different types of restrictions for deciding that the current
274 * transaction with a serialization/deadlock error can no longer be retried and
275 * should be reported as failed:
276 * - max_tries (--max-tries) can be used to limit the number of tries;
277 * - latency_limit (-L) can be used to limit the total time of tries;
278 * - duration (-T) can be used to limit the total benchmark time.
280 * They can be combined together, and you need to use at least one of them to
281 * retry the transactions with serialization/deadlock errors. If none of them is
282 * used, the default value of max_tries is 1 and such transactions will not be
283 * retried.
287 * We cannot retry a transaction after the serialization/deadlock error if its
288 * number of tries reaches this maximum; if its value is zero, it is not used.
290 uint32 max_tries = 1;
292 bool failures_detailed = false; /* whether to group failures in
293 * reports or logs by basic types */
/* Connection settings; each starts out NULL (unset). */
const char *pghost = NULL;
const char *pgport = NULL;
const char *username = NULL;
const char *dbName = NULL;

char	   *logfile_prefix = NULL;	/* log file name prefix; NULL until set */
const char *progname;			/* program name, printed by usage() */

#define WSEP '@'				/* separates a script name from its weight */

volatile sig_atomic_t timer_exceeded = false;	/* flag from signal handler */
307 * We don't want to allocate variables one by one; for efficiency, add a
308 * constant margin each time it overflows.
310 #define VARIABLES_ALLOC_MARGIN 8
313 * Variable definitions.
315 * If a variable only has a string value, "svalue" is that value, and value is
316 * "not set". If the value is known, "value" contains the value (in any
317 * variant).
319 * In this case "svalue" contains the string equivalent of the value, if we've
320 * had occasion to compute that, or NULL if we haven't.
322 typedef struct
324 char *name; /* variable's name */
325 char *svalue; /* its value in string form, if known */
326 PgBenchValue value; /* actual variable's value */
327 } Variable;
330 * Data structure for client variables.
332 typedef struct
334 Variable *vars; /* array of variable definitions */
335 int nvars; /* number of variables */
338 * The maximum number of variables that we can currently store in 'vars'
339 * without having to reallocate more space. We must always have max_vars
340 * >= nvars.
342 int max_vars;
344 bool vars_sorted; /* are variables sorted by name? */
345 } Variables;
/* Size limits */
#define MAX_SCRIPTS			128 /* max number of SQL scripts allowed */
#define SHELL_COMMAND_SIZE	256 /* maximum size allowed for shell command */
351 * Simple data structure to keep stats about something.
353 * XXX probably the first value should be kept and used as an offset for
354 * better numerical stability...
356 typedef struct SimpleStats
358 int64 count; /* how many values were encountered */
359 double min; /* the minimum seen */
360 double max; /* the maximum seen */
361 double sum; /* sum of values */
362 double sum2; /* sum of squared values */
363 } SimpleStats;
366 * The instr_time type is expensive when dealing with time arithmetic. Define
367 * a type to hold microseconds instead. Type int64 is good enough for about
368 * 584500 years.
370 typedef int64 pg_time_usec_t;
373 * Data structure to hold various statistics: per-thread and per-script stats
374 * are maintained and merged together.
376 typedef struct StatsData
378 pg_time_usec_t start_time; /* interval start time, for aggregates */
380 /*----------
381 * Transactions are counted depending on their execution and outcome.
382 * First a transaction may have started or not: skipped transactions occur
383 * under --rate and --latency-limit when the client is too late to execute
384 * them. Secondly, a started transaction may ultimately succeed or fail,
385 * possibly after some retries when --max-tries is not one. Thus
387 * the number of all transactions =
388 * 'skipped' (it was too late to execute them) +
389 * 'cnt' (the number of successful transactions) +
390 * 'failed' (the number of failed transactions).
392 * A successful transaction can have several unsuccessful tries before a
393 * successful run. Thus
395 * 'cnt' (the number of successful transactions) =
396 * successfully retried transactions (they got a serialization or a
397 * deadlock error(s), but were
398 * successfully retried from the very
399 * beginning) +
400 * directly successful transactions (they were successfully completed on
401 * the first try).
403 * A failed transaction is defined as unsuccessfully retried transactions.
404 * It can be one of two types:
406 * failed (the number of failed transactions) =
407 * 'serialization_failures' (they got a serialization error and were not
408 * successfully retried) +
409 * 'deadlock_failures' (they got a deadlock error and were not
410 * successfully retried).
412 * If the transaction was retried after a serialization or a deadlock
413 * error this does not guarantee that this retry was successful. Thus
415 * 'retries' (number of retries) =
416 * number of retries in all retried transactions =
417 * number of retries in (successfully retried transactions +
418 * failed transactions);
420 * 'retried' (number of all retried transactions) =
421 * successfully retried transactions +
422 * failed transactions.
423 *----------
425 int64 cnt; /* number of successful transactions, not
426 * including 'skipped' */
427 int64 skipped; /* number of transactions skipped under --rate
428 * and --latency-limit */
429 int64 retries; /* number of retries after a serialization or
430 * a deadlock error in all the transactions */
431 int64 retried; /* number of all transactions that were
432 * retried after a serialization or a deadlock
433 * error (perhaps the last try was
434 * unsuccessful) */
435 int64 serialization_failures; /* number of transactions that were
436 * not successfully retried after a
437 * serialization error */
438 int64 deadlock_failures; /* number of transactions that were not
439 * successfully retried after a deadlock
440 * error */
441 SimpleStats latency;
442 SimpleStats lag;
443 } StatsData;
446 * For displaying Unix epoch timestamps, as some time functions may have
447 * another reference.
449 pg_time_usec_t epoch_shift;
/*
 * Error status of errors raised while a script executes.
 */
typedef enum EStatus
{
	ESTATUS_NO_ERROR = 0,
	ESTATUS_META_COMMAND_ERROR,

	/* errors from SQL commands */
	ESTATUS_SERIALIZATION_ERROR,
	ESTATUS_DEADLOCK_ERROR,
	ESTATUS_OTHER_SQL_ERROR,
} EStatus;

/*
 * Transaction status once a command has ended.
 */
typedef enum TStatus
{
	TSTATUS_IDLE = 0,
	TSTATUS_IN_BLOCK,
	TSTATUS_CONN_ERROR,
	TSTATUS_OTHER_ERROR,
} TStatus;
476 /* Various random sequences are initialized from this one. */
477 static pg_prng_state base_random_sequence;
479 /* Synchronization barrier for start and connection */
480 static THREAD_BARRIER_T barrier;
/*
 * States of the per-connection state machine.
 */
typedef enum
{
	/*
	 * A client starts by choosing a script.  After that it is either
	 * throttled (CSTATE_PREPARE_THROTTLE, under --rate), starts right away
	 * (CSTATE_START_TX), or does not start at all because the timer has been
	 * exceeded (CSTATE_FINISHED).
	 */
	CSTATE_CHOOSE_SCRIPT,

	/*
	 * CSTATE_START_TX does the start-of-transaction work: in --connect mode
	 * it opens a new connection for the transaction, it records the
	 * transaction start time, and it proceeds to the first command.
	 *
	 * Note: once started, a script either errors out or runs to its end,
	 * where it may be interrupted; it is never interrupted while running.
	 * So pgbench --time means that transactions may *start* within that
	 * time, and each finishes when its work is complete.
	 */
	CSTATE_START_TX,

	/*
	 * CSTATE_PREPARE_THROTTLE computes when the next transaction should
	 * begin and advances to CSTATE_THROTTLE.  CSTATE_THROTTLE sleeps until
	 * that moment, then advances to CSTATE_START_TX, or to CSTATE_FINISHED
	 * if the next transaction would start after the end of the run.
	 */
	CSTATE_PREPARE_THROTTLE,
	CSTATE_THROTTLE,

	/*
	 * Each command of the script is processed by looping through these:
	 *
	 * CSTATE_START_COMMAND begins executing a command.  A SQL command is
	 * sent to the server, followed by CSTATE_WAIT_RESULT unless in pipeline
	 * mode.  A \sleep meta-command sets the timer and enters CSTATE_SLEEP to
	 * wait for it to expire.  Other meta-commands run immediately.  If the
	 * command about to start lies beyond the end of the script, advance to
	 * CSTATE_END_TX.
	 *
	 * CSTATE_WAIT_RESULT waits until the server returns a result set for
	 * the current command.
	 *
	 * CSTATE_SLEEP waits until the \sleep is over.
	 *
	 * CSTATE_END_COMMAND records the end-of-command timestamp, increments
	 * the command counter, and loops back to CSTATE_START_COMMAND.
	 *
	 * CSTATE_SKIP_COMMAND serves conditional branches that are not
	 * executed.  It quickly skips commands that need no evaluation, possibly
	 * several at once, until there is something to do or the script ends.
	 */
	CSTATE_START_COMMAND,
	CSTATE_WAIT_RESULT,
	CSTATE_SLEEP,
	CSTATE_END_COMMAND,
	CSTATE_SKIP_COMMAND,

	/*
	 * States for failed commands.
	 *
	 * When a SQL or meta command fails, CSTATE_ERROR cleans up: (1) it
	 * clears the conditional stack; (2) if a transaction block is still
	 * open (possibly failed), it sends the rollback command to the server
	 * and waits for its result in CSTATE_WAIT_ROLLBACK_RESULT.  If the
	 * rollback goes wrong, the next state is CSTATE_ABORTED.
	 *
	 * If everything is fine, the client is ready for further transactions:
	 * a serialization or deadlock error whose transaction may be re-executed
	 * from the very beginning leads to CSTATE_RETRY; anything else leads to
	 * CSTATE_FAILURE.
	 *
	 * CSTATE_RETRY reports the error, restores the transaction parameters
	 * used by the previous tries, and processes the transaction's first
	 * command in CSTATE_START_COMMAND.
	 *
	 * CSTATE_FAILURE reports the failure, restores the transaction
	 * parameters to what they were before its first run (except for the
	 * random state), and goes to CSTATE_END_TX to complete the transaction.
	 */
	CSTATE_ERROR,
	CSTATE_WAIT_ROLLBACK_RESULT,
	CSTATE_RETRY,
	CSTATE_FAILURE,

	/*
	 * CSTATE_END_TX does the end-of-transaction work: it computes the
	 * latency and logs the transaction, and in --connect mode closes the
	 * current connection.  It then either starts over at
	 * CSTATE_CHOOSE_SCRIPT or enters CSTATE_FINISHED when no work is left.
	 */
	CSTATE_END_TX,

	/*
	 * Terminal states.  CSTATE_ABORTED means the script was aborted because
	 * a command failed; CSTATE_FINISHED means success.
	 */
	CSTATE_ABORTED,
	CSTATE_FINISHED,
} ConnectionStateEnum;
594 * Connection state.
596 typedef struct
598 PGconn *con; /* connection handle to DB */
599 int id; /* client No. */
600 ConnectionStateEnum state; /* state machine's current state. */
601 ConditionalStack cstack; /* enclosing conditionals state */
604 * Separate randomness for each client. This is used for random functions
605 * PGBENCH_RANDOM_* during the execution of the script.
607 pg_prng_state cs_func_rs;
609 int use_file; /* index in sql_script for this client */
610 int command; /* command number in script */
612 /* client variables */
613 Variables variables;
615 /* various times about current transaction in microseconds */
616 pg_time_usec_t txn_scheduled; /* scheduled start time of transaction */
617 pg_time_usec_t sleep_until; /* scheduled start time of next cmd */
618 pg_time_usec_t txn_begin; /* used for measuring schedule lag times */
619 pg_time_usec_t stmt_begin; /* used for measuring statement latencies */
621 /* whether client prepared each command of each script */
622 bool **prepared;
625 * For processing failures and repeating transactions with serialization
626 * or deadlock errors:
628 EStatus estatus; /* the error status of the current transaction
629 * execution; this is ESTATUS_NO_ERROR if
630 * there were no errors */
631 pg_prng_state random_state; /* random state */
632 uint32 tries; /* how many times have we already tried the
633 * current transaction? */
635 /* per client collected stats */
636 int64 cnt; /* client transaction count, for -t; skipped
637 * and failed transactions are also counted
638 * here */
639 } CState;
642 * Thread state
644 typedef struct
646 int tid; /* thread id */
647 THREAD_T thread; /* thread handle */
648 CState *state; /* array of CState */
649 int nstate; /* length of state[] */
652 * Separate randomness for each thread. Each thread option uses its own
653 * random state to make all of them independent of each other and
654 * therefore deterministic at the thread level.
656 pg_prng_state ts_choose_rs; /* random state for selecting a script */
657 pg_prng_state ts_throttle_rs; /* random state for transaction throttling */
658 pg_prng_state ts_sample_rs; /* random state for log sampling */
660 int64 throttle_trigger; /* previous/next throttling (us) */
661 FILE *logfile; /* where to log, or NULL */
663 /* per thread collected stats in microseconds */
664 pg_time_usec_t create_time; /* thread creation time */
665 pg_time_usec_t started_time; /* thread is running */
666 pg_time_usec_t bench_start; /* thread is benchmarking */
667 pg_time_usec_t conn_duration; /* cumulated connection and disconnection
668 * delays */
670 StatsData stats;
671 int64 latency_late; /* count executed but late transactions */
672 } TState;
/*
 * Kinds of commands read from script files.
 */
#define SQL_COMMAND		1
#define META_COMMAND	2

/*
 * Maximum number of arguments of a backslash command, or of SQL variables in
 * a SQL command, counting the command or SQL statement itself.
 */
#define MAX_ARGS		256

typedef enum MetaCommand
{
	META_NONE,					/* not a known meta-command */
	META_SET,					/* \set */
	META_SETSHELL,				/* \setshell */
	META_SHELL,					/* \shell */
	META_SLEEP,					/* \sleep */
	META_GSET,					/* \gset */
	META_ASET,					/* \aset */
	META_IF,					/* \if */
	META_ELIF,					/* \elif */
	META_ELSE,					/* \else */
	META_ENDIF,					/* \endif */
	META_STARTPIPELINE,			/* \startpipeline */
	META_ENDPIPELINE,			/* \endpipeline */
} MetaCommand;

typedef enum QueryMode
{
	QUERY_SIMPLE,				/* simple query */
	QUERY_EXTENDED,				/* extended query */
	QUERY_PREPARED,				/* extended query with prepared statements */
	NUM_QUERYMODE				/* number of modes, not a mode itself */
} QueryMode;

static QueryMode querymode = QUERY_SIMPLE;

/* Name of each QueryMode, matching the -M/--protocol choices. */
static const char *const QUERYMODE[] = {
	[QUERY_SIMPLE] = "simple",
	[QUERY_EXTENDED] = "extended",
	[QUERY_PREPARED] = "prepared",
};
715 * struct Command represents one command in a script.
717 * lines The raw, possibly multi-line command text. Variable substitution
718 * not applied.
719 * first_line A short, single-line extract of 'lines', for error reporting.
720 * type SQL_COMMAND or META_COMMAND
721 * meta The type of meta-command, with META_NONE/GSET/ASET if command
722 * is SQL.
723 * argc Number of arguments of the command, 0 if not yet processed.
724 * argv Command arguments, the first of which is the command or SQL
725 * string itself. For SQL commands, after post-processing
726 * argv[0] is the same as 'lines' with variables substituted.
727 * prepname The name that this command is prepared under, in prepare mode
728 * varprefix SQL commands terminated with \gset or \aset have this set
729 * to a non NULL value. If nonempty, it's used to prefix the
730 * variable name that receives the value.
731 * aset do gset on all possible queries of a combined query (\;).
732 * expr Parsed expression, if needed.
733 * stats Time spent in this command.
734 * retries Number of retries after a serialization or deadlock error in the
735 * current command.
736 * failures Number of errors in the current command that were not retried.
738 typedef struct Command
740 PQExpBufferData lines;
741 char *first_line;
742 int type;
743 MetaCommand meta;
744 int argc;
745 char *argv[MAX_ARGS];
746 char *prepname;
747 char *varprefix;
748 PgBenchExpr *expr;
749 SimpleStats stats;
750 int64 retries;
751 int64 failures;
752 } Command;
754 typedef struct ParsedScript
756 const char *desc; /* script descriptor (eg, file name) */
757 int weight; /* selection weight */
758 Command **commands; /* NULL-terminated array of Commands */
759 StatsData stats; /* total time spent in script */
760 } ParsedScript;
762 static ParsedScript sql_script[MAX_SCRIPTS]; /* SQL script files */
763 static int num_scripts; /* number of scripts in sql_script[] */
764 static int64 total_weight = 0;
766 static bool verbose_errors = false; /* print verbose messages of all errors */
768 static bool exit_on_abort = false; /* exit when any client is aborted */
770 /* Builtin test scripts */
771 typedef struct BuiltinScript
773 const char *name; /* very short name for -b ... */
774 const char *desc; /* short description */
775 const char *script; /* actual pgbench script */
776 } BuiltinScript;
778 static const BuiltinScript builtin_script[] =
781 "tpcb-like",
782 "<builtin: TPC-B (sort of)>",
783 "\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
784 "\\set bid random(1, " CppAsString2(nbranches) " * :scale)\n"
785 "\\set tid random(1, " CppAsString2(ntellers) " * :scale)\n"
786 "\\set delta random(-5000, 5000)\n"
787 "BEGIN;\n"
788 "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
789 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
790 "UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
791 "UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
792 "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
793 "END;\n"
796 "simple-update",
797 "<builtin: simple update>",
798 "\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
799 "\\set bid random(1, " CppAsString2(nbranches) " * :scale)\n"
800 "\\set tid random(1, " CppAsString2(ntellers) " * :scale)\n"
801 "\\set delta random(-5000, 5000)\n"
802 "BEGIN;\n"
803 "UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
804 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
805 "INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
806 "END;\n"
809 "select-only",
810 "<builtin: select only>",
811 "\\set aid random(1, " CppAsString2(naccounts) " * :scale)\n"
812 "SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
817 /* Function prototypes */
818 static void setNullValue(PgBenchValue *pv);
819 static void setBoolValue(PgBenchValue *pv, bool bval);
820 static void setIntValue(PgBenchValue *pv, int64 ival);
821 static void setDoubleValue(PgBenchValue *pv, double dval);
822 static bool evaluateExpr(CState *st, PgBenchExpr *expr,
823 PgBenchValue *retval);
824 static ConnectionStateEnum executeMetaCommand(CState *st, pg_time_usec_t *now);
825 static void doLog(TState *thread, CState *st,
826 StatsData *agg, bool skipped, double latency, double lag);
827 static void processXactStats(TState *thread, CState *st, pg_time_usec_t *now,
828 bool skipped, StatsData *agg);
829 static void addScript(const ParsedScript *script);
830 static THREAD_FUNC_RETURN_TYPE THREAD_FUNC_CC threadRun(void *arg);
831 static void finishCon(CState *st);
832 static void setalarm(int seconds);
833 static socket_set *alloc_socket_set(int count);
834 static void free_socket_set(socket_set *sa);
835 static void clear_socket_set(socket_set *sa);
836 static void add_socket_to_set(socket_set *sa, int fd, int idx);
837 static int wait_on_socket_set(socket_set *sa, int64 usecs);
838 static bool socket_has_input(socket_set *sa, int fd, int idx);
840 /* callback used to build rows for COPY during data loading */
841 typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr);
843 /* callback functions for our flex lexer */
844 static const PsqlScanCallbacks pgbench_callbacks = {
845 NULL, /* don't need get_variable functionality */
848 static inline pg_time_usec_t
849 pg_time_now(void)
851 instr_time now;
853 INSTR_TIME_SET_CURRENT(now);
855 return (pg_time_usec_t) INSTR_TIME_GET_MICROSEC(now);
858 static inline void
859 pg_time_now_lazy(pg_time_usec_t *now)
861 if ((*now) == 0)
862 (*now) = pg_time_now();
/* Convert a microsecond count (pg_time_usec_t) to seconds, as a double. */
#define PG_TIME_GET_DOUBLE(t) ((t) * 0.000001)
867 static void
868 usage(void)
870 printf("%s is a benchmarking tool for PostgreSQL.\n\n"
871 "Usage:\n"
872 " %s [OPTION]... [DBNAME]\n"
873 "\nInitialization options:\n"
874 " -i, --initialize invokes initialization mode\n"
875 " -I, --init-steps=[" ALL_INIT_STEPS "]+ (default \"" DEFAULT_INIT_STEPS "\")\n"
876 " run selected initialization steps, in the specified order\n"
877 " d: drop any existing pgbench tables\n"
878 " t: create the tables used by the standard pgbench scenario\n"
879 " g: generate data, client-side\n"
880 " G: generate data, server-side\n"
881 " v: invoke VACUUM on the standard tables\n"
882 " p: create primary key indexes on the standard tables\n"
883 " f: create foreign keys between the standard tables\n"
884 " -F, --fillfactor=NUM set fill factor\n"
885 " -n, --no-vacuum do not run VACUUM during initialization\n"
886 " -q, --quiet quiet logging (one message each 5 seconds)\n"
887 " -s, --scale=NUM scaling factor\n"
888 " --foreign-keys create foreign key constraints between tables\n"
889 " --index-tablespace=TABLESPACE\n"
890 " create indexes in the specified tablespace\n"
891 " --partition-method=(range|hash)\n"
892 " partition pgbench_accounts with this method (default: range)\n"
893 " --partitions=NUM partition pgbench_accounts into NUM parts (default: 0)\n"
894 " --tablespace=TABLESPACE create tables in the specified tablespace\n"
895 " --unlogged-tables create tables as unlogged tables\n"
896 "\nOptions to select what to run:\n"
897 " -b, --builtin=NAME[@W] add builtin script NAME weighted at W (default: 1)\n"
898 " (use \"-b list\" to list available scripts)\n"
899 " -f, --file=FILENAME[@W] add script FILENAME weighted at W (default: 1)\n"
900 " -N, --skip-some-updates skip updates of pgbench_tellers and pgbench_branches\n"
901 " (same as \"-b simple-update\")\n"
902 " -S, --select-only perform SELECT-only transactions\n"
903 " (same as \"-b select-only\")\n"
904 "\nBenchmarking options:\n"
905 " -c, --client=NUM number of concurrent database clients (default: 1)\n"
906 " -C, --connect establish new connection for each transaction\n"
907 " -D, --define=VARNAME=VALUE\n"
908 " define variable for use by custom script\n"
909 " -j, --jobs=NUM number of threads (default: 1)\n"
910 " -l, --log write transaction times to log file\n"
911 " -L, --latency-limit=NUM count transactions lasting more than NUM ms as late\n"
912 " -M, --protocol=simple|extended|prepared\n"
913 " protocol for submitting queries (default: simple)\n"
914 " -n, --no-vacuum do not run VACUUM before tests\n"
915 " -P, --progress=NUM show thread progress report every NUM seconds\n"
916 " -r, --report-per-command report latencies, failures, and retries per command\n"
917 " -R, --rate=NUM target rate in transactions per second\n"
918 " -s, --scale=NUM report this scale factor in output\n"
919 " -t, --transactions=NUM number of transactions each client runs (default: 10)\n"
920 " -T, --time=NUM duration of benchmark test in seconds\n"
921 " -v, --vacuum-all vacuum all four standard tables before tests\n"
922 " --aggregate-interval=NUM aggregate data over NUM seconds\n"
923 " --exit-on-abort exit when any client is aborted\n"
924 " --failures-detailed report the failures grouped by basic types\n"
925 " --log-prefix=PREFIX prefix for transaction time log file\n"
926 " (default: \"pgbench_log\")\n"
927 " --max-tries=NUM max number of tries to run transaction (default: 1)\n"
928 " --progress-timestamp use Unix epoch timestamps for progress\n"
929 " --random-seed=SEED set random seed (\"time\", \"rand\", integer)\n"
930 " --sampling-rate=NUM fraction of transactions to log (e.g., 0.01 for 1%%)\n"
931 " --show-script=NAME show builtin script code, then exit\n"
932 " --verbose-errors print messages of all errors\n"
933 "\nCommon options:\n"
934 " -d, --debug print debugging output\n"
935 " -h, --host=HOSTNAME database server host or socket directory\n"
936 " -p, --port=PORT database server port number\n"
937 " -U, --username=USERNAME connect as specified database user\n"
938 " -V, --version output version information, then exit\n"
939 " -?, --help show this help, then exit\n"
940 "\n"
941 "Report bugs to <%s>.\n"
942 "%s home page: <%s>\n",
943 progname, progname, PACKAGE_BUGREPORT, PACKAGE_NAME, PACKAGE_URL);
/*
 * Return whether str matches "^\s*[-+]?[0-9]+$".
 *
 * Leading whitespace is skipped the same way strtoint64() skips it.  At
 * least one digit is required, so an empty string, or one made only of
 * whitespace and/or a sign, is not reported as an integer.
 */
static bool
is_an_int(const char *str)
{
	const char *ptr = str;

	/* skip leading spaces; cast is consistent with strtoint64 */
	while (*ptr && isspace((unsigned char) *ptr))
		ptr++;

	/* skip sign */
	if (*ptr == '+' || *ptr == '-')
		ptr++;

	/*
	 * Require at least one digit.  This must also reject the end of the
	 * string: otherwise "", "   " or "-" would be taken as integers.
	 */
	if (!isdigit((unsigned char) *ptr))
		return false;

	/* eat all digits */
	while (isdigit((unsigned char) *ptr))
		ptr++;

	/* must have reached end of string */
	return *ptr == '\0';
}
974 * strtoint64 -- convert a string to 64-bit integer
976 * This function is a slightly modified version of pg_strtoint64() from
977 * src/backend/utils/adt/numutils.c.
979 * The function returns whether the conversion worked, and if so
980 * "*result" is set to the result.
982 * If not errorOK, an error message is also printed out on errors.
984 bool
985 strtoint64(const char *str, bool errorOK, int64 *result)
987 const char *ptr = str;
988 int64 tmp = 0;
989 bool neg = false;
992 * Do our own scan, rather than relying on sscanf which might be broken
993 * for long long.
995 * As INT64_MIN can't be stored as a positive 64 bit integer, accumulate
996 * value as a negative number.
999 /* skip leading spaces */
1000 while (*ptr && isspace((unsigned char) *ptr))
1001 ptr++;
1003 /* handle sign */
1004 if (*ptr == '-')
1006 ptr++;
1007 neg = true;
1009 else if (*ptr == '+')
1010 ptr++;
1012 /* require at least one digit */
1013 if (unlikely(!isdigit((unsigned char) *ptr)))
1014 goto invalid_syntax;
1016 /* process digits */
1017 while (*ptr && isdigit((unsigned char) *ptr))
1019 int8 digit = (*ptr++ - '0');
1021 if (unlikely(pg_mul_s64_overflow(tmp, 10, &tmp)) ||
1022 unlikely(pg_sub_s64_overflow(tmp, digit, &tmp)))
1023 goto out_of_range;
1026 /* allow trailing whitespace, but not other trailing chars */
1027 while (*ptr != '\0' && isspace((unsigned char) *ptr))
1028 ptr++;
1030 if (unlikely(*ptr != '\0'))
1031 goto invalid_syntax;
1033 if (!neg)
1035 if (unlikely(tmp == PG_INT64_MIN))
1036 goto out_of_range;
1037 tmp = -tmp;
1040 *result = tmp;
1041 return true;
1043 out_of_range:
1044 if (!errorOK)
1045 pg_log_error("value \"%s\" is out of range for type bigint", str);
1046 return false;
1048 invalid_syntax:
1049 if (!errorOK)
1050 pg_log_error("invalid input syntax for type bigint: \"%s\"", str);
1051 return false;
/*
 * Parse str as a double.
 *
 * Returns true on success.  Returns false in two cases:
 *  - strtod() set errno, e.g. ERANGE on overflow or underflow;
 *  - the conversion did not consume the whole string.
 * Unless errorOK is set, an error naming the input is logged on failure.
 * *dv receives strtod()'s result even when false is returned.
 */
bool
strtodouble(const char *str, bool errorOK, double *dv)
{
	char	   *endptr;
	bool		range_error;
	bool		fully_parsed;

	errno = 0;
	*dv = strtod(str, &endptr);
	range_error = (errno != 0);

	if (unlikely(range_error))
	{
		if (!errorOK)
			pg_log_error("value \"%s\" is out of range for type double", str);
		return false;
	}

	fully_parsed = (endptr != str && *endptr == '\0');
	if (fully_parsed)
		return true;

	if (!errorOK)
		pg_log_error("invalid input syntax for type double: \"%s\"", str);
	return false;
}
1080 * Initialize a prng state struct.
1082 * We derive the seed from base_random_sequence, which must be set up already.
1084 static void
1085 initRandomState(pg_prng_state *state)
1087 pg_prng_seed(state, pg_prng_uint64(&base_random_sequence));
1092 * random number generator: uniform distribution from min to max inclusive.
1094 * Although the limits are expressed as int64, you can't generate the full
1095 * int64 range in one call, because the difference of the limits mustn't
1096 * overflow int64. This is not checked.
1098 static int64
1099 getrand(pg_prng_state *state, int64 min, int64 max)
1101 return min + (int64) pg_prng_uint64_range(state, 0, max - min);
1105 * random number generator: exponential distribution from min to max inclusive.
1106 * the parameter is so that the density of probability for the last cut-off max
1107 * value is exp(-parameter).
1109 static int64
1110 getExponentialRand(pg_prng_state *state, int64 min, int64 max,
1111 double parameter)
1113 double cut,
1114 uniform,
1115 rand;
1117 /* abort if wrong parameter, but must really be checked beforehand */
1118 Assert(parameter > 0.0);
1119 cut = exp(-parameter);
1120 /* pg_prng_double value in [0, 1), uniform in (0, 1] */
1121 uniform = 1.0 - pg_prng_double(state);
1124 * inner expression in (cut, 1] (if parameter > 0), rand in [0, 1)
1126 Assert((1.0 - cut) != 0.0);
1127 rand = -log(cut + (1.0 - cut) * uniform) / parameter;
1128 /* return int64 random number within between min and max */
1129 return min + (int64) ((max - min + 1) * rand);
1132 /* random number generator: gaussian distribution from min to max inclusive */
1133 static int64
1134 getGaussianRand(pg_prng_state *state, int64 min, int64 max,
1135 double parameter)
1137 double stdev;
1138 double rand;
1140 /* abort if parameter is too low, but must really be checked beforehand */
1141 Assert(parameter >= MIN_GAUSSIAN_PARAM);
1144 * Get normally-distributed random number in the range -parameter <= stdev
1145 * < parameter.
1147 * This loop is executed until the number is in the expected range.
1149 * As the minimum parameter is 2.0, the probability of looping is low:
1150 * sqrt(-2 ln(r)) <= 2 => r >= e^{-2} ~ 0.135, then when taking the
1151 * average sinus multiplier as 2/pi, we have a 8.6% looping probability in
1152 * the worst case. For a parameter value of 5.0, the looping probability
1153 * is about e^{-5} * 2 / pi ~ 0.43%.
1157 stdev = pg_prng_double_normal(state);
1159 while (stdev < -parameter || stdev >= parameter);
1161 /* stdev is in [-parameter, parameter), normalization to [0,1) */
1162 rand = (stdev + parameter) / (parameter * 2.0);
1164 /* return int64 random number within between min and max */
1165 return min + (int64) ((max - min + 1) * rand);
1169 * random number generator: generate a value, such that the series of values
1170 * will approximate a Poisson distribution centered on the given value.
1172 * Individual results are rounded to integers, though the center value need
1173 * not be one.
1175 static int64
1176 getPoissonRand(pg_prng_state *state, double center)
1179 * Use inverse transform sampling to generate a value > 0, such that the
1180 * expected (i.e. average) value is the given argument.
1182 double uniform;
1184 /* pg_prng_double value in [0, 1), uniform in (0, 1] */
1185 uniform = 1.0 - pg_prng_double(state);
1187 return (int64) (-log(uniform) * center + 0.5);
1191 * Computing zipfian using rejection method, based on
1192 * "Non-Uniform Random Variate Generation",
1193 * Luc Devroye, p. 550-551, Springer 1986.
1195 * This works for s > 1.0, but may perform badly for s very close to 1.0.
1197 static int64
1198 computeIterativeZipfian(pg_prng_state *state, int64 n, double s)
1200 double b = pow(2.0, s - 1.0);
1201 double x,
1206 /* Ensure n is sane */
1207 if (n <= 1)
1208 return 1;
1210 while (true)
1212 /* random variates */
1213 u = pg_prng_double(state);
1214 v = pg_prng_double(state);
1216 x = floor(pow(u, -1.0 / (s - 1.0)));
1218 t = pow(1.0 + 1.0 / x, s - 1.0);
1219 /* reject if too large or out of bound */
1220 if (v * x * (t - 1.0) / (b - 1.0) <= t / b && x <= n)
1221 break;
1223 return (int64) x;
1226 /* random number generator: zipfian distribution from min to max inclusive */
1227 static int64
1228 getZipfianRand(pg_prng_state *state, int64 min, int64 max, double s)
1230 int64 n = max - min + 1;
1232 /* abort if parameter is invalid */
1233 Assert(MIN_ZIPFIAN_PARAM <= s && s <= MAX_ZIPFIAN_PARAM);
1235 return min - 1 + computeIterativeZipfian(state, n, s);
1239 * FNV-1a hash function
1241 static int64
1242 getHashFnv1a(int64 val, uint64 seed)
1244 int64 result;
1245 int i;
1247 result = FNV_OFFSET_BASIS ^ seed;
1248 for (i = 0; i < 8; ++i)
1250 int32 octet = val & 0xff;
1252 val = val >> 8;
1253 result = result ^ octet;
1254 result = result * FNV_PRIME;
1257 return result;
1261 * Murmur2 hash function
1263 * Based on original work of Austin Appleby
1264 * https://github.com/aappleby/smhasher/blob/master/src/MurmurHash2.cpp
1266 static int64
1267 getHashMurmur2(int64 val, uint64 seed)
1269 uint64 result = seed ^ MM2_MUL_TIMES_8; /* sizeof(int64) */
1270 uint64 k = (uint64) val;
1272 k *= MM2_MUL;
1273 k ^= k >> MM2_ROT;
1274 k *= MM2_MUL;
1276 result ^= k;
1277 result *= MM2_MUL;
1279 result ^= result >> MM2_ROT;
1280 result *= MM2_MUL;
1281 result ^= result >> MM2_ROT;
1283 return (int64) result;
1287 * Pseudorandom permutation function
1289 * For small sizes, this generates each of the (size!) possible permutations
1290 * of integers in the range [0, size) with roughly equal probability. Once
1291 * the size is larger than 20, the number of possible permutations exceeds the
1292 * number of distinct states of the internal pseudorandom number generator,
1293 * and so not all possible permutations can be generated, but the permutations
1294 * chosen should continue to give the appearance of being random.
1296 * THIS FUNCTION IS NOT CRYPTOGRAPHICALLY SECURE.
1297 * DO NOT USE FOR SUCH PURPOSE.
1299 static int64
1300 permute(const int64 val, const int64 isize, const int64 seed)
1302 /* using a high-end PRNG is probably overkill */
1303 pg_prng_state state;
1304 uint64 size;
1305 uint64 v;
1306 int masklen;
1307 uint64 mask;
1308 int i;
1310 if (isize < 2)
1311 return 0; /* nothing to permute */
1313 /* Initialize prng state using the seed */
1314 pg_prng_seed(&state, (uint64) seed);
1316 /* Computations are performed on unsigned values */
1317 size = (uint64) isize;
1318 v = (uint64) val % size;
1320 /* Mask to work modulo largest power of 2 less than or equal to size */
1321 masklen = pg_leftmost_one_pos64(size);
1322 mask = (((uint64) 1) << masklen) - 1;
1325 * Permute the input value by applying several rounds of pseudorandom
1326 * bijective transformations. The intention here is to distribute each
1327 * input uniformly randomly across the range, and separate adjacent inputs
1328 * approximately uniformly randomly from each other, leading to a fairly
1329 * random overall choice of permutation.
1331 * To separate adjacent inputs, we multiply by a random number modulo
1332 * (mask + 1), which is a power of 2. For this to be a bijection, the
1333 * multiplier must be odd. Since this is known to lead to less randomness
1334 * in the lower bits, we also apply a rotation that shifts the topmost bit
1335 * into the least significant bit. In the special cases where size <= 3,
1336 * mask = 1 and each of these operations is actually a no-op, so we also
1337 * XOR the value with a different random number to inject additional
1338 * randomness. Since the size is generally not a power of 2, we apply
1339 * this bijection on overlapping upper and lower halves of the input.
1341 * To distribute the inputs uniformly across the range, we then also apply
1342 * a random offset modulo the full range.
1344 * Taken together, these operations resemble a modified linear
1345 * congruential generator, as is commonly used in pseudorandom number
1346 * generators. The number of rounds is fairly arbitrary, but six has been
1347 * found empirically to give a fairly good tradeoff between performance
1348 * and uniform randomness. For small sizes it selects each of the (size!)
1349 * possible permutations with roughly equal probability. For larger
1350 * sizes, not all permutations can be generated, but the intended random
1351 * spread is still produced.
1353 for (i = 0; i < 6; i++)
1355 uint64 m,
1359 /* Random multiply (by an odd number), XOR and rotate of lower half */
1360 m = (pg_prng_uint64(&state) & mask) | 1;
1361 r = pg_prng_uint64(&state) & mask;
1362 if (v <= mask)
1364 v = ((v * m) ^ r) & mask;
1365 v = ((v << 1) & mask) | (v >> (masklen - 1));
1368 /* Random multiply (by an odd number), XOR and rotate of upper half */
1369 m = (pg_prng_uint64(&state) & mask) | 1;
1370 r = pg_prng_uint64(&state) & mask;
1371 t = size - 1 - v;
1372 if (t <= mask)
1374 t = ((t * m) ^ r) & mask;
1375 t = ((t << 1) & mask) | (t >> (masklen - 1));
1376 v = size - 1 - t;
1379 /* Random offset */
1380 r = pg_prng_uint64_range(&state, 0, size - 1);
1381 v = (v + r) % size;
1384 return (int64) v;
1388 * Initialize the given SimpleStats struct to all zeroes
1390 static void
1391 initSimpleStats(SimpleStats *ss)
1393 memset(ss, 0, sizeof(SimpleStats));
1397 * Accumulate one value into a SimpleStats struct.
1399 static void
1400 addToSimpleStats(SimpleStats *ss, double val)
1402 if (ss->count == 0 || val < ss->min)
1403 ss->min = val;
1404 if (ss->count == 0 || val > ss->max)
1405 ss->max = val;
1406 ss->count++;
1407 ss->sum += val;
1408 ss->sum2 += val * val;
1412 * Merge two SimpleStats objects
1414 static void
1415 mergeSimpleStats(SimpleStats *acc, SimpleStats *ss)
1417 if (acc->count == 0 || ss->min < acc->min)
1418 acc->min = ss->min;
1419 if (acc->count == 0 || ss->max > acc->max)
1420 acc->max = ss->max;
1421 acc->count += ss->count;
1422 acc->sum += ss->sum;
1423 acc->sum2 += ss->sum2;
1427 * Initialize a StatsData struct to mostly zeroes, with its start time set to
1428 * the given value.
1430 static void
1431 initStats(StatsData *sd, pg_time_usec_t start)
1433 sd->start_time = start;
1434 sd->cnt = 0;
1435 sd->skipped = 0;
1436 sd->retries = 0;
1437 sd->retried = 0;
1438 sd->serialization_failures = 0;
1439 sd->deadlock_failures = 0;
1440 initSimpleStats(&sd->latency);
1441 initSimpleStats(&sd->lag);
1445 * Accumulate one additional item into the given stats object.
1447 static void
1448 accumStats(StatsData *stats, bool skipped, double lat, double lag,
1449 EStatus estatus, int64 tries)
1451 /* Record the skipped transaction */
1452 if (skipped)
1454 /* no latency to record on skipped transactions */
1455 stats->skipped++;
1456 return;
1460 * Record the number of retries regardless of whether the transaction was
1461 * successful or failed.
1463 if (tries > 1)
1465 stats->retries += (tries - 1);
1466 stats->retried++;
1469 switch (estatus)
1471 /* Record the successful transaction */
1472 case ESTATUS_NO_ERROR:
1473 stats->cnt++;
1475 addToSimpleStats(&stats->latency, lat);
1477 /* and possibly the same for schedule lag */
1478 if (throttle_delay)
1479 addToSimpleStats(&stats->lag, lag);
1480 break;
1482 /* Record the failed transaction */
1483 case ESTATUS_SERIALIZATION_ERROR:
1484 stats->serialization_failures++;
1485 break;
1486 case ESTATUS_DEADLOCK_ERROR:
1487 stats->deadlock_failures++;
1488 break;
1489 default:
1490 /* internal error which should never occur */
1491 pg_fatal("unexpected error status: %d", estatus);
1495 /* call PQexec() and exit() on failure */
1496 static void
1497 executeStatement(PGconn *con, const char *sql)
1499 PGresult *res;
1501 res = PQexec(con, sql);
1502 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1504 pg_log_error("query failed: %s", PQerrorMessage(con));
1505 pg_log_error_detail("Query was: %s", sql);
1506 exit(1);
1508 PQclear(res);
1511 /* call PQexec() and complain, but without exiting, on failure */
1512 static void
1513 tryExecuteStatement(PGconn *con, const char *sql)
1515 PGresult *res;
1517 res = PQexec(con, sql);
1518 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1520 pg_log_error("%s", PQerrorMessage(con));
1521 pg_log_error_detail("(ignoring this error and continuing anyway)");
1523 PQclear(res);
1526 /* set up a connection to the backend */
1527 static PGconn *
1528 doConnect(void)
1530 PGconn *conn;
1531 bool new_pass;
1532 static char *password = NULL;
1535 * Start the connection. Loop until we have a password if requested by
1536 * backend.
1540 #define PARAMS_ARRAY_SIZE 7
1542 const char *keywords[PARAMS_ARRAY_SIZE];
1543 const char *values[PARAMS_ARRAY_SIZE];
1545 keywords[0] = "host";
1546 values[0] = pghost;
1547 keywords[1] = "port";
1548 values[1] = pgport;
1549 keywords[2] = "user";
1550 values[2] = username;
1551 keywords[3] = "password";
1552 values[3] = password;
1553 keywords[4] = "dbname";
1554 values[4] = dbName;
1555 keywords[5] = "fallback_application_name";
1556 values[5] = progname;
1557 keywords[6] = NULL;
1558 values[6] = NULL;
1560 new_pass = false;
1562 conn = PQconnectdbParams(keywords, values, true);
1564 if (!conn)
1566 pg_log_error("connection to database \"%s\" failed", dbName);
1567 return NULL;
1570 if (PQstatus(conn) == CONNECTION_BAD &&
1571 PQconnectionNeedsPassword(conn) &&
1572 !password)
1574 PQfinish(conn);
1575 password = simple_prompt("Password: ", false);
1576 new_pass = true;
1578 } while (new_pass);
1580 /* check to see that the backend connection was successfully made */
1581 if (PQstatus(conn) == CONNECTION_BAD)
1583 pg_log_error("%s", PQerrorMessage(conn));
1584 PQfinish(conn);
1585 return NULL;
1588 return conn;
1591 /* qsort comparator for Variable array */
1592 static int
1593 compareVariableNames(const void *v1, const void *v2)
1595 return strcmp(((const Variable *) v1)->name,
1596 ((const Variable *) v2)->name);
1599 /* Locate a variable by name; returns NULL if unknown */
1600 static Variable *
1601 lookupVariable(Variables *variables, char *name)
1603 Variable key;
1605 /* On some versions of Solaris, bsearch of zero items dumps core */
1606 if (variables->nvars <= 0)
1607 return NULL;
1609 /* Sort if we have to */
1610 if (!variables->vars_sorted)
1612 qsort(variables->vars, variables->nvars, sizeof(Variable),
1613 compareVariableNames);
1614 variables->vars_sorted = true;
1617 /* Now we can search */
1618 key.name = name;
1619 return (Variable *) bsearch(&key,
1620 variables->vars,
1621 variables->nvars,
1622 sizeof(Variable),
1623 compareVariableNames);
1626 /* Get the value of a variable, in string form; returns NULL if unknown */
1627 static char *
1628 getVariable(Variables *variables, char *name)
1630 Variable *var;
1631 char stringform[64];
1633 var = lookupVariable(variables, name);
1634 if (var == NULL)
1635 return NULL; /* not found */
1637 if (var->svalue)
1638 return var->svalue; /* we have it in string form */
1640 /* We need to produce a string equivalent of the value */
1641 Assert(var->value.type != PGBT_NO_VALUE);
1642 if (var->value.type == PGBT_NULL)
1643 snprintf(stringform, sizeof(stringform), "NULL");
1644 else if (var->value.type == PGBT_BOOLEAN)
1645 snprintf(stringform, sizeof(stringform),
1646 "%s", var->value.u.bval ? "true" : "false");
1647 else if (var->value.type == PGBT_INT)
1648 snprintf(stringform, sizeof(stringform),
1649 INT64_FORMAT, var->value.u.ival);
1650 else if (var->value.type == PGBT_DOUBLE)
1651 snprintf(stringform, sizeof(stringform),
1652 "%.*g", DBL_DIG, var->value.u.dval);
1653 else /* internal error, unexpected type */
1654 Assert(0);
1655 var->svalue = pg_strdup(stringform);
1656 return var->svalue;
1659 /* Try to convert variable to a value; return false on failure */
1660 static bool
1661 makeVariableValue(Variable *var)
1663 size_t slen;
1665 if (var->value.type != PGBT_NO_VALUE)
1666 return true; /* no work */
1668 slen = strlen(var->svalue);
1670 if (slen == 0)
1671 /* what should it do on ""? */
1672 return false;
1674 if (pg_strcasecmp(var->svalue, "null") == 0)
1676 setNullValue(&var->value);
1680 * accept prefixes such as y, ye, n, no... but not for "o". 0/1 are
1681 * recognized later as an int, which is converted to bool if needed.
1683 else if (pg_strncasecmp(var->svalue, "true", slen) == 0 ||
1684 pg_strncasecmp(var->svalue, "yes", slen) == 0 ||
1685 pg_strcasecmp(var->svalue, "on") == 0)
1687 setBoolValue(&var->value, true);
1689 else if (pg_strncasecmp(var->svalue, "false", slen) == 0 ||
1690 pg_strncasecmp(var->svalue, "no", slen) == 0 ||
1691 pg_strcasecmp(var->svalue, "off") == 0 ||
1692 pg_strcasecmp(var->svalue, "of") == 0)
1694 setBoolValue(&var->value, false);
1696 else if (is_an_int(var->svalue))
1698 /* if it looks like an int, it must be an int without overflow */
1699 int64 iv;
1701 if (!strtoint64(var->svalue, false, &iv))
1702 return false;
1704 setIntValue(&var->value, iv);
1706 else /* type should be double */
1708 double dv;
1710 if (!strtodouble(var->svalue, true, &dv))
1712 pg_log_error("malformed variable \"%s\" value: \"%s\"",
1713 var->name, var->svalue);
1714 return false;
1716 setDoubleValue(&var->value, dv);
1718 return true;
/*
 * Check whether name is acceptable as a variable name.
 *
 * Allowed characters are any non-ASCII byte, ASCII letters, digits and
 * underscore.  A name must be non-empty and must not start with a digit.
 *
 * Keep this in sync with the definitions of variable name characters in
 * "src/fe_utils/psqlscan.l", "src/bin/psql/psqlscanslash.l" and
 * "src/bin/pgbench/exprscan.l".  Also see parseVariable().
 *
 * Note: this static function is copied from "src/bin/psql/variables.c"
 * but changed to disallow variable names starting with a digit.
 */
static bool
valid_variable_name(const char *name)
{
	static const char lead_chars[] =
		"ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz" "_";
	static const char tail_chars[] =
		"ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz" "_0123456789";
	const unsigned char *cp = (const unsigned char *) name;

	/* the empty string is not a name */
	if (*cp == '\0')
		return false;

	/* first byte checked against lead_chars, all later ones against tail_chars */
	for (const char *allowed = lead_chars; *cp != '\0'; allowed = tail_chars, cp++)
	{
		if (IS_HIGHBIT_SET(*cp))
			continue;			/* non-ASCII bytes are always accepted */
		if (strchr(allowed, *cp) == NULL)
			return false;
	}

	return true;
}
1766 * Make sure there is enough space for 'needed' more variable in the variables
1767 * array.
1769 static void
1770 enlargeVariables(Variables *variables, int needed)
1772 /* total number of variables required now */
1773 needed += variables->nvars;
1775 if (variables->max_vars < needed)
1777 variables->max_vars = needed + VARIABLES_ALLOC_MARGIN;
1778 variables->vars = (Variable *)
1779 pg_realloc(variables->vars, variables->max_vars * sizeof(Variable));
1784 * Lookup a variable by name, creating it if need be.
1785 * Caller is expected to assign a value to the variable.
1786 * Returns NULL on failure (bad name).
1788 static Variable *
1789 lookupCreateVariable(Variables *variables, const char *context, char *name)
1791 Variable *var;
1793 var = lookupVariable(variables, name);
1794 if (var == NULL)
1797 * Check for the name only when declaring a new variable to avoid
1798 * overhead.
1800 if (!valid_variable_name(name))
1802 pg_log_error("%s: invalid variable name: \"%s\"", context, name);
1803 return NULL;
1806 /* Create variable at the end of the array */
1807 enlargeVariables(variables, 1);
1809 var = &(variables->vars[variables->nvars]);
1811 var->name = pg_strdup(name);
1812 var->svalue = NULL;
1813 /* caller is expected to initialize remaining fields */
1815 variables->nvars++;
1816 /* we don't re-sort the array till we have to */
1817 variables->vars_sorted = false;
1820 return var;
1823 /* Assign a string value to a variable, creating it if need be */
1824 /* Returns false on failure (bad name) */
1825 static bool
1826 putVariable(Variables *variables, const char *context, char *name,
1827 const char *value)
1829 Variable *var;
1830 char *val;
1832 var = lookupCreateVariable(variables, context, name);
1833 if (!var)
1834 return false;
1836 /* dup then free, in case value is pointing at this variable */
1837 val = pg_strdup(value);
1839 free(var->svalue);
1840 var->svalue = val;
1841 var->value.type = PGBT_NO_VALUE;
1843 return true;
1846 /* Assign a value to a variable, creating it if need be */
1847 /* Returns false on failure (bad name) */
1848 static bool
1849 putVariableValue(Variables *variables, const char *context, char *name,
1850 const PgBenchValue *value)
1852 Variable *var;
1854 var = lookupCreateVariable(variables, context, name);
1855 if (!var)
1856 return false;
1858 free(var->svalue);
1859 var->svalue = NULL;
1860 var->value = *value;
1862 return true;
1865 /* Assign an integer value to a variable, creating it if need be */
1866 /* Returns false on failure (bad name) */
1867 static bool
1868 putVariableInt(Variables *variables, const char *context, char *name,
1869 int64 value)
1871 PgBenchValue val;
1873 setIntValue(&val, value);
1874 return putVariableValue(variables, context, name, &val);
/*
 * Parse a possible variable reference (:varname).
 *
 * "sql" points at a colon.  If what follows looks like a valid variable
 * name, return a malloc'd copy of the name and set *eaten to the number of
 * characters consumed, including the colon.  Otherwise, return NULL.
 */
static char *
parseVariable(const char *sql, int *eaten)
{
	int			i = 1;			/* starting at 1 skips the colon */
	char	   *name;

	/*
	 * Keep this logic in sync with valid_variable_name().
	 *
	 * The explicit '\0' tests matter: strchr() counts the terminating NUL as
	 * part of the string it searches.  Without them, a colon at the very end
	 * of sql would be taken as the start of a name.  A name reaching the end
	 * of sql would also make the scan run past the terminator, producing an
	 * oversized *eaten.
	 */
	if (sql[i] != '\0' &&
		(IS_HIGHBIT_SET(sql[i]) ||
		 strchr("ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"
				"_", sql[i]) != NULL))
		i++;
	else
		return NULL;

	while (sql[i] != '\0' &&
		   (IS_HIGHBIT_SET(sql[i]) ||
			strchr("ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"
				   "_0123456789", sql[i]) != NULL))
		i++;

	name = pg_malloc(i);
	memcpy(name, &sql[1], i - 1);
	name[i - 1] = '\0';

	*eaten = i;
	return name;
}
/*
 * Replace the len characters at param, a position inside *sql, with value.
 *
 * *sql may be reallocated to make room; param is then rebased onto the new
 * buffer.  Returns a pointer just past the inserted text.
 */
static char *
replaceVariable(char **sql, char *param, int len, char *value)
{
	int			vlen = strlen(value);

	if (vlen > len)
	{
		/* growing: enlarge the buffer and recompute param within it */
		size_t		pos = param - *sql;
		size_t		newsize = strlen(*sql) - len + vlen + 1;

		*sql = pg_realloc(*sql, newsize);
		param = *sql + pos;
	}

	/* shift the tail, including its terminator, unless lengths match */
	if (vlen != len)
		memmove(param + vlen, param + len, strlen(param + len) + 1);
	memcpy(param, value, vlen);

	return param + vlen;
}
1932 static char *
1933 assignVariables(Variables *variables, char *sql)
1935 char *p,
1936 *name,
1937 *val;
1939 p = sql;
1940 while ((p = strchr(p, ':')) != NULL)
1942 int eaten;
1944 name = parseVariable(p, &eaten);
1945 if (name == NULL)
1947 while (*p == ':')
1949 p++;
1951 continue;
1954 val = getVariable(variables, name);
1955 free(name);
1956 if (val == NULL)
1958 p++;
1959 continue;
1962 p = replaceVariable(&sql, p, eaten, val);
1965 return sql;
1968 static void
1969 getQueryParams(Variables *variables, const Command *command,
1970 const char **params)
1972 int i;
1974 for (i = 0; i < command->argc - 1; i++)
1975 params[i] = getVariable(variables, command->argv[i + 1]);
1978 static char *
1979 valueTypeName(PgBenchValue *pval)
1981 if (pval->type == PGBT_NO_VALUE)
1982 return "none";
1983 else if (pval->type == PGBT_NULL)
1984 return "null";
1985 else if (pval->type == PGBT_INT)
1986 return "int";
1987 else if (pval->type == PGBT_DOUBLE)
1988 return "double";
1989 else if (pval->type == PGBT_BOOLEAN)
1990 return "boolean";
1991 else
1993 /* internal error, should never get there */
1994 Assert(false);
1995 return NULL;
1999 /* get a value as a boolean, or tell if there is a problem */
2000 static bool
2001 coerceToBool(PgBenchValue *pval, bool *bval)
2003 if (pval->type == PGBT_BOOLEAN)
2005 *bval = pval->u.bval;
2006 return true;
2008 else /* NULL, INT or DOUBLE */
2010 pg_log_error("cannot coerce %s to boolean", valueTypeName(pval));
2011 *bval = false; /* suppress uninitialized-variable warnings */
2012 return false;
2017 * Return true or false from an expression for conditional purposes.
2018 * Non zero numerical values are true, zero and NULL are false.
2020 static bool
2021 valueTruth(PgBenchValue *pval)
2023 switch (pval->type)
2025 case PGBT_NULL:
2026 return false;
2027 case PGBT_BOOLEAN:
2028 return pval->u.bval;
2029 case PGBT_INT:
2030 return pval->u.ival != 0;
2031 case PGBT_DOUBLE:
2032 return pval->u.dval != 0.0;
2033 default:
2034 /* internal error, unexpected type */
2035 Assert(0);
2036 return false;
2040 /* get a value as an int, tell if there is a problem */
2041 static bool
2042 coerceToInt(PgBenchValue *pval, int64 *ival)
2044 if (pval->type == PGBT_INT)
2046 *ival = pval->u.ival;
2047 return true;
2049 else if (pval->type == PGBT_DOUBLE)
2051 double dval = rint(pval->u.dval);
2053 if (isnan(dval) || !FLOAT8_FITS_IN_INT64(dval))
2055 pg_log_error("double to int overflow for %f", dval);
2056 return false;
2058 *ival = (int64) dval;
2059 return true;
2061 else /* BOOLEAN or NULL */
2063 pg_log_error("cannot coerce %s to int", valueTypeName(pval));
2064 return false;
2068 /* get a value as a double, or tell if there is a problem */
2069 static bool
2070 coerceToDouble(PgBenchValue *pval, double *dval)
2072 if (pval->type == PGBT_DOUBLE)
2074 *dval = pval->u.dval;
2075 return true;
2077 else if (pval->type == PGBT_INT)
2079 *dval = (double) pval->u.ival;
2080 return true;
2082 else /* BOOLEAN or NULL */
2084 pg_log_error("cannot coerce %s to double", valueTypeName(pval));
2085 return false;
2089 /* assign a null value */
2090 static void
2091 setNullValue(PgBenchValue *pv)
2093 pv->type = PGBT_NULL;
2094 pv->u.ival = 0;
2097 /* assign a boolean value */
2098 static void
2099 setBoolValue(PgBenchValue *pv, bool bval)
2101 pv->type = PGBT_BOOLEAN;
2102 pv->u.bval = bval;
2105 /* assign an integer value */
2106 static void
2107 setIntValue(PgBenchValue *pv, int64 ival)
2109 pv->type = PGBT_INT;
2110 pv->u.ival = ival;
2113 /* assign a double value */
2114 static void
2115 setDoubleValue(PgBenchValue *pv, double dval)
2117 pv->type = PGBT_DOUBLE;
2118 pv->u.dval = dval;
2121 static bool
2122 isLazyFunc(PgBenchFunction func)
2124 return func == PGBENCH_AND || func == PGBENCH_OR || func == PGBENCH_CASE;
2127 /* lazy evaluation of some functions */
2128 static bool
2129 evalLazyFunc(CState *st,
2130 PgBenchFunction func, PgBenchExprLink *args, PgBenchValue *retval)
2132 PgBenchValue a1,
2134 bool ba1,
2135 ba2;
2137 Assert(isLazyFunc(func) && args != NULL && args->next != NULL);
2139 /* args points to first condition */
2140 if (!evaluateExpr(st, args->expr, &a1))
2141 return false;
2143 /* second condition for AND/OR and corresponding branch for CASE */
2144 args = args->next;
2146 switch (func)
2148 case PGBENCH_AND:
2149 if (a1.type == PGBT_NULL)
2151 setNullValue(retval);
2152 return true;
2155 if (!coerceToBool(&a1, &ba1))
2156 return false;
2158 if (!ba1)
2160 setBoolValue(retval, false);
2161 return true;
2164 if (!evaluateExpr(st, args->expr, &a2))
2165 return false;
2167 if (a2.type == PGBT_NULL)
2169 setNullValue(retval);
2170 return true;
2172 else if (!coerceToBool(&a2, &ba2))
2173 return false;
2174 else
2176 setBoolValue(retval, ba2);
2177 return true;
2180 return true;
2182 case PGBENCH_OR:
2184 if (a1.type == PGBT_NULL)
2186 setNullValue(retval);
2187 return true;
2190 if (!coerceToBool(&a1, &ba1))
2191 return false;
2193 if (ba1)
2195 setBoolValue(retval, true);
2196 return true;
2199 if (!evaluateExpr(st, args->expr, &a2))
2200 return false;
2202 if (a2.type == PGBT_NULL)
2204 setNullValue(retval);
2205 return true;
2207 else if (!coerceToBool(&a2, &ba2))
2208 return false;
2209 else
2211 setBoolValue(retval, ba2);
2212 return true;
2215 case PGBENCH_CASE:
2216 /* when true, execute branch */
2217 if (valueTruth(&a1))
2218 return evaluateExpr(st, args->expr, retval);
2220 /* now args contains next condition or final else expression */
2221 args = args->next;
2223 /* final else case? */
2224 if (args->next == NULL)
2225 return evaluateExpr(st, args->expr, retval);
2227 /* no, another when, proceed */
2228 return evalLazyFunc(st, PGBENCH_CASE, args, retval);
2230 default:
2231 /* internal error, cannot get here */
2232 Assert(0);
2233 break;
2235 return false;
2238 /* maximum number of function arguments */
2239 #define MAX_FARGS 16
2242 * Recursive evaluation of standard functions,
2243 * which do not require lazy evaluation.
2245 static bool
2246 evalStandardFunc(CState *st,
2247 PgBenchFunction func, PgBenchExprLink *args,
2248 PgBenchValue *retval)
2250 /* evaluate all function arguments */
2251 int nargs = 0;
2252 PgBenchExprLink *l = args;
2253 bool has_null = false;
2256 * This value is double braced to workaround GCC bug 53119, which seems to
2257 * exist at least on gcc (Debian 4.7.2-5) 4.7.2, 32-bit.
2259 PgBenchValue vargs[MAX_FARGS] = {{0}};
2261 for (nargs = 0; nargs < MAX_FARGS && l != NULL; nargs++, l = l->next)
2263 if (!evaluateExpr(st, l->expr, &vargs[nargs]))
2264 return false;
2265 has_null |= vargs[nargs].type == PGBT_NULL;
2268 if (l != NULL)
2270 pg_log_error("too many function arguments, maximum is %d", MAX_FARGS);
2271 return false;
2274 /* NULL arguments */
2275 if (has_null && func != PGBENCH_IS && func != PGBENCH_DEBUG)
2277 setNullValue(retval);
2278 return true;
2281 /* then evaluate function */
2282 switch (func)
2284 /* overloaded operators */
2285 case PGBENCH_ADD:
2286 case PGBENCH_SUB:
2287 case PGBENCH_MUL:
2288 case PGBENCH_DIV:
2289 case PGBENCH_MOD:
2290 case PGBENCH_EQ:
2291 case PGBENCH_NE:
2292 case PGBENCH_LE:
2293 case PGBENCH_LT:
2295 PgBenchValue *lval = &vargs[0],
2296 *rval = &vargs[1];
2298 Assert(nargs == 2);
2300 /* overloaded type management, double if some double */
2301 if ((lval->type == PGBT_DOUBLE ||
2302 rval->type == PGBT_DOUBLE) && func != PGBENCH_MOD)
2304 double ld,
2307 if (!coerceToDouble(lval, &ld) ||
2308 !coerceToDouble(rval, &rd))
2309 return false;
2311 switch (func)
2313 case PGBENCH_ADD:
2314 setDoubleValue(retval, ld + rd);
2315 return true;
2317 case PGBENCH_SUB:
2318 setDoubleValue(retval, ld - rd);
2319 return true;
2321 case PGBENCH_MUL:
2322 setDoubleValue(retval, ld * rd);
2323 return true;
2325 case PGBENCH_DIV:
2326 setDoubleValue(retval, ld / rd);
2327 return true;
2329 case PGBENCH_EQ:
2330 setBoolValue(retval, ld == rd);
2331 return true;
2333 case PGBENCH_NE:
2334 setBoolValue(retval, ld != rd);
2335 return true;
2337 case PGBENCH_LE:
2338 setBoolValue(retval, ld <= rd);
2339 return true;
2341 case PGBENCH_LT:
2342 setBoolValue(retval, ld < rd);
2343 return true;
2345 default:
2346 /* cannot get here */
2347 Assert(0);
2350 else /* we have integer operands, or % */
2352 int64 li,
2354 res;
2356 if (!coerceToInt(lval, &li) ||
2357 !coerceToInt(rval, &ri))
2358 return false;
2360 switch (func)
2362 case PGBENCH_ADD:
2363 if (pg_add_s64_overflow(li, ri, &res))
2365 pg_log_error("bigint add out of range");
2366 return false;
2368 setIntValue(retval, res);
2369 return true;
2371 case PGBENCH_SUB:
2372 if (pg_sub_s64_overflow(li, ri, &res))
2374 pg_log_error("bigint sub out of range");
2375 return false;
2377 setIntValue(retval, res);
2378 return true;
2380 case PGBENCH_MUL:
2381 if (pg_mul_s64_overflow(li, ri, &res))
2383 pg_log_error("bigint mul out of range");
2384 return false;
2386 setIntValue(retval, res);
2387 return true;
2389 case PGBENCH_EQ:
2390 setBoolValue(retval, li == ri);
2391 return true;
2393 case PGBENCH_NE:
2394 setBoolValue(retval, li != ri);
2395 return true;
2397 case PGBENCH_LE:
2398 setBoolValue(retval, li <= ri);
2399 return true;
2401 case PGBENCH_LT:
2402 setBoolValue(retval, li < ri);
2403 return true;
2405 case PGBENCH_DIV:
2406 case PGBENCH_MOD:
2407 if (ri == 0)
2409 pg_log_error("division by zero");
2410 return false;
2412 /* special handling of -1 divisor */
2413 if (ri == -1)
2415 if (func == PGBENCH_DIV)
2417 /* overflow check (needed for INT64_MIN) */
2418 if (li == PG_INT64_MIN)
2420 pg_log_error("bigint div out of range");
2421 return false;
2423 else
2424 setIntValue(retval, -li);
2426 else
2427 setIntValue(retval, 0);
2428 return true;
2430 /* else divisor is not -1 */
2431 if (func == PGBENCH_DIV)
2432 setIntValue(retval, li / ri);
2433 else /* func == PGBENCH_MOD */
2434 setIntValue(retval, li % ri);
2436 return true;
2438 default:
2439 /* cannot get here */
2440 Assert(0);
2444 Assert(0);
2445 return false; /* NOTREACHED */
2448 /* integer bitwise operators */
2449 case PGBENCH_BITAND:
2450 case PGBENCH_BITOR:
2451 case PGBENCH_BITXOR:
2452 case PGBENCH_LSHIFT:
2453 case PGBENCH_RSHIFT:
2455 int64 li,
2458 if (!coerceToInt(&vargs[0], &li) || !coerceToInt(&vargs[1], &ri))
2459 return false;
2461 if (func == PGBENCH_BITAND)
2462 setIntValue(retval, li & ri);
2463 else if (func == PGBENCH_BITOR)
2464 setIntValue(retval, li | ri);
2465 else if (func == PGBENCH_BITXOR)
2466 setIntValue(retval, li ^ ri);
2467 else if (func == PGBENCH_LSHIFT)
2468 setIntValue(retval, li << ri);
2469 else if (func == PGBENCH_RSHIFT)
2470 setIntValue(retval, li >> ri);
2471 else /* cannot get here */
2472 Assert(0);
2474 return true;
2477 /* logical operators */
2478 case PGBENCH_NOT:
2480 bool b;
2482 if (!coerceToBool(&vargs[0], &b))
2483 return false;
2485 setBoolValue(retval, !b);
2486 return true;
2489 /* no arguments */
2490 case PGBENCH_PI:
2491 setDoubleValue(retval, M_PI);
2492 return true;
2494 /* 1 overloaded argument */
2495 case PGBENCH_ABS:
2497 PgBenchValue *varg = &vargs[0];
2499 Assert(nargs == 1);
2501 if (varg->type == PGBT_INT)
2503 int64 i = varg->u.ival;
2505 setIntValue(retval, i < 0 ? -i : i);
2507 else
2509 double d = varg->u.dval;
2511 Assert(varg->type == PGBT_DOUBLE);
2512 setDoubleValue(retval, d < 0.0 ? -d : d);
2515 return true;
2518 case PGBENCH_DEBUG:
2520 PgBenchValue *varg = &vargs[0];
2522 Assert(nargs == 1);
2524 fprintf(stderr, "debug(script=%d,command=%d): ",
2525 st->use_file, st->command + 1);
2527 if (varg->type == PGBT_NULL)
2528 fprintf(stderr, "null\n");
2529 else if (varg->type == PGBT_BOOLEAN)
2530 fprintf(stderr, "boolean %s\n", varg->u.bval ? "true" : "false");
2531 else if (varg->type == PGBT_INT)
2532 fprintf(stderr, "int " INT64_FORMAT "\n", varg->u.ival);
2533 else if (varg->type == PGBT_DOUBLE)
2534 fprintf(stderr, "double %.*g\n", DBL_DIG, varg->u.dval);
2535 else /* internal error, unexpected type */
2536 Assert(0);
2538 *retval = *varg;
2540 return true;
2543 /* 1 double argument */
2544 case PGBENCH_DOUBLE:
2545 case PGBENCH_SQRT:
2546 case PGBENCH_LN:
2547 case PGBENCH_EXP:
2549 double dval;
2551 Assert(nargs == 1);
2553 if (!coerceToDouble(&vargs[0], &dval))
2554 return false;
2556 if (func == PGBENCH_SQRT)
2557 dval = sqrt(dval);
2558 else if (func == PGBENCH_LN)
2559 dval = log(dval);
2560 else if (func == PGBENCH_EXP)
2561 dval = exp(dval);
2562 /* else is cast: do nothing */
2564 setDoubleValue(retval, dval);
2565 return true;
2568 /* 1 int argument */
2569 case PGBENCH_INT:
2571 int64 ival;
2573 Assert(nargs == 1);
2575 if (!coerceToInt(&vargs[0], &ival))
2576 return false;
2578 setIntValue(retval, ival);
2579 return true;
2582 /* variable number of arguments */
2583 case PGBENCH_LEAST:
2584 case PGBENCH_GREATEST:
2586 bool havedouble;
2587 int i;
2589 Assert(nargs >= 1);
2591 /* need double result if any input is double */
2592 havedouble = false;
2593 for (i = 0; i < nargs; i++)
2595 if (vargs[i].type == PGBT_DOUBLE)
2597 havedouble = true;
2598 break;
2601 if (havedouble)
2603 double extremum;
2605 if (!coerceToDouble(&vargs[0], &extremum))
2606 return false;
2607 for (i = 1; i < nargs; i++)
2609 double dval;
2611 if (!coerceToDouble(&vargs[i], &dval))
2612 return false;
2613 if (func == PGBENCH_LEAST)
2614 extremum = Min(extremum, dval);
2615 else
2616 extremum = Max(extremum, dval);
2618 setDoubleValue(retval, extremum);
2620 else
2622 int64 extremum;
2624 if (!coerceToInt(&vargs[0], &extremum))
2625 return false;
2626 for (i = 1; i < nargs; i++)
2628 int64 ival;
2630 if (!coerceToInt(&vargs[i], &ival))
2631 return false;
2632 if (func == PGBENCH_LEAST)
2633 extremum = Min(extremum, ival);
2634 else
2635 extremum = Max(extremum, ival);
2637 setIntValue(retval, extremum);
2639 return true;
2642 /* random functions */
2643 case PGBENCH_RANDOM:
2644 case PGBENCH_RANDOM_EXPONENTIAL:
2645 case PGBENCH_RANDOM_GAUSSIAN:
2646 case PGBENCH_RANDOM_ZIPFIAN:
2648 int64 imin,
2649 imax,
2650 delta;
2652 Assert(nargs >= 2);
2654 if (!coerceToInt(&vargs[0], &imin) ||
2655 !coerceToInt(&vargs[1], &imax))
2656 return false;
2658 /* check random range */
2659 if (unlikely(imin > imax))
2661 pg_log_error("empty range given to random");
2662 return false;
2664 else if (unlikely(pg_sub_s64_overflow(imax, imin, &delta) ||
2665 pg_add_s64_overflow(delta, 1, &delta)))
2667 /* prevent int overflows in random functions */
2668 pg_log_error("random range is too large");
2669 return false;
2672 if (func == PGBENCH_RANDOM)
2674 Assert(nargs == 2);
2675 setIntValue(retval, getrand(&st->cs_func_rs, imin, imax));
2677 else /* gaussian & exponential */
2679 double param;
2681 Assert(nargs == 3);
2683 if (!coerceToDouble(&vargs[2], &param))
2684 return false;
2686 if (func == PGBENCH_RANDOM_GAUSSIAN)
2688 if (param < MIN_GAUSSIAN_PARAM)
2690 pg_log_error("gaussian parameter must be at least %f (not %f)",
2691 MIN_GAUSSIAN_PARAM, param);
2692 return false;
2695 setIntValue(retval,
2696 getGaussianRand(&st->cs_func_rs,
2697 imin, imax, param));
2699 else if (func == PGBENCH_RANDOM_ZIPFIAN)
2701 if (param < MIN_ZIPFIAN_PARAM || param > MAX_ZIPFIAN_PARAM)
2703 pg_log_error("zipfian parameter must be in range [%.3f, %.0f] (not %f)",
2704 MIN_ZIPFIAN_PARAM, MAX_ZIPFIAN_PARAM, param);
2705 return false;
2708 setIntValue(retval,
2709 getZipfianRand(&st->cs_func_rs, imin, imax, param));
2711 else /* exponential */
2713 if (param <= 0.0)
2715 pg_log_error("exponential parameter must be greater than zero (not %f)",
2716 param);
2717 return false;
2720 setIntValue(retval,
2721 getExponentialRand(&st->cs_func_rs,
2722 imin, imax, param));
2726 return true;
2729 case PGBENCH_POW:
2731 PgBenchValue *lval = &vargs[0];
2732 PgBenchValue *rval = &vargs[1];
2733 double ld,
2736 Assert(nargs == 2);
2738 if (!coerceToDouble(lval, &ld) ||
2739 !coerceToDouble(rval, &rd))
2740 return false;
2742 setDoubleValue(retval, pow(ld, rd));
2744 return true;
2747 case PGBENCH_IS:
2749 Assert(nargs == 2);
2752 * note: this simple implementation is more permissive than
2753 * SQL
2755 setBoolValue(retval,
2756 vargs[0].type == vargs[1].type &&
2757 vargs[0].u.bval == vargs[1].u.bval);
2758 return true;
2761 /* hashing */
2762 case PGBENCH_HASH_FNV1A:
2763 case PGBENCH_HASH_MURMUR2:
2765 int64 val,
2766 seed;
2768 Assert(nargs == 2);
2770 if (!coerceToInt(&vargs[0], &val) ||
2771 !coerceToInt(&vargs[1], &seed))
2772 return false;
2774 if (func == PGBENCH_HASH_MURMUR2)
2775 setIntValue(retval, getHashMurmur2(val, seed));
2776 else if (func == PGBENCH_HASH_FNV1A)
2777 setIntValue(retval, getHashFnv1a(val, seed));
2778 else
2779 /* cannot get here */
2780 Assert(0);
2782 return true;
2785 case PGBENCH_PERMUTE:
2787 int64 val,
2788 size,
2789 seed;
2791 Assert(nargs == 3);
2793 if (!coerceToInt(&vargs[0], &val) ||
2794 !coerceToInt(&vargs[1], &size) ||
2795 !coerceToInt(&vargs[2], &seed))
2796 return false;
2798 if (size <= 0)
2800 pg_log_error("permute size parameter must be greater than zero");
2801 return false;
2804 setIntValue(retval, permute(val, size, seed));
2805 return true;
2808 default:
2809 /* cannot get here */
2810 Assert(0);
2811 /* dead code to avoid a compiler warning */
2812 return false;
2816 /* evaluate some function */
2817 static bool
2818 evalFunc(CState *st,
2819 PgBenchFunction func, PgBenchExprLink *args, PgBenchValue *retval)
2821 if (isLazyFunc(func))
2822 return evalLazyFunc(st, func, args, retval);
2823 else
2824 return evalStandardFunc(st, func, args, retval);
2828 * Recursive evaluation of an expression in a pgbench script
2829 * using the current state of variables.
2830 * Returns whether the evaluation was ok,
2831 * the value itself is returned through the retval pointer.
2833 static bool
2834 evaluateExpr(CState *st, PgBenchExpr *expr, PgBenchValue *retval)
2836 switch (expr->etype)
2838 case ENODE_CONSTANT:
2840 *retval = expr->u.constant;
2841 return true;
2844 case ENODE_VARIABLE:
2846 Variable *var;
2848 if ((var = lookupVariable(&st->variables, expr->u.variable.varname)) == NULL)
2850 pg_log_error("undefined variable \"%s\"", expr->u.variable.varname);
2851 return false;
2854 if (!makeVariableValue(var))
2855 return false;
2857 *retval = var->value;
2858 return true;
2861 case ENODE_FUNCTION:
2862 return evalFunc(st,
2863 expr->u.function.function,
2864 expr->u.function.args,
2865 retval);
2867 default:
2868 /* internal error which should never occur */
2869 pg_fatal("unexpected enode type in evaluation: %d", expr->etype);
2874 * Convert command name to meta-command enum identifier
2876 static MetaCommand
2877 getMetaCommand(const char *cmd)
2879 MetaCommand mc;
2881 if (cmd == NULL)
2882 mc = META_NONE;
2883 else if (pg_strcasecmp(cmd, "set") == 0)
2884 mc = META_SET;
2885 else if (pg_strcasecmp(cmd, "setshell") == 0)
2886 mc = META_SETSHELL;
2887 else if (pg_strcasecmp(cmd, "shell") == 0)
2888 mc = META_SHELL;
2889 else if (pg_strcasecmp(cmd, "sleep") == 0)
2890 mc = META_SLEEP;
2891 else if (pg_strcasecmp(cmd, "if") == 0)
2892 mc = META_IF;
2893 else if (pg_strcasecmp(cmd, "elif") == 0)
2894 mc = META_ELIF;
2895 else if (pg_strcasecmp(cmd, "else") == 0)
2896 mc = META_ELSE;
2897 else if (pg_strcasecmp(cmd, "endif") == 0)
2898 mc = META_ENDIF;
2899 else if (pg_strcasecmp(cmd, "gset") == 0)
2900 mc = META_GSET;
2901 else if (pg_strcasecmp(cmd, "aset") == 0)
2902 mc = META_ASET;
2903 else if (pg_strcasecmp(cmd, "startpipeline") == 0)
2904 mc = META_STARTPIPELINE;
2905 else if (pg_strcasecmp(cmd, "endpipeline") == 0)
2906 mc = META_ENDPIPELINE;
2907 else
2908 mc = META_NONE;
2909 return mc;
2913 * Run a shell command. The result is assigned to the variable if not NULL.
2914 * Return true if succeeded, or false on error.
2916 static bool
2917 runShellCommand(Variables *variables, char *variable, char **argv, int argc)
2919 char command[SHELL_COMMAND_SIZE];
2920 int i,
2921 len = 0;
2922 FILE *fp;
2923 char res[64];
2924 char *endptr;
2925 int retval;
2927 /*----------
2928 * Join arguments with whitespace separators. Arguments starting with
2929 * exactly one colon are treated as variables:
2930 * name - append a string "name"
2931 * :var - append a variable named 'var'
2932 * ::name - append a string ":name"
2933 *----------
2935 for (i = 0; i < argc; i++)
2937 char *arg;
2938 int arglen;
2940 if (argv[i][0] != ':')
2942 arg = argv[i]; /* a string literal */
2944 else if (argv[i][1] == ':')
2946 arg = argv[i] + 1; /* a string literal starting with colons */
2948 else if ((arg = getVariable(variables, argv[i] + 1)) == NULL)
2950 pg_log_error("%s: undefined variable \"%s\"", argv[0], argv[i]);
2951 return false;
2954 arglen = strlen(arg);
2955 if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1)
2957 pg_log_error("%s: shell command is too long", argv[0]);
2958 return false;
2961 if (i > 0)
2962 command[len++] = ' ';
2963 memcpy(command + len, arg, arglen);
2964 len += arglen;
2967 command[len] = '\0';
2969 fflush(NULL); /* needed before either system() or popen() */
2971 /* Fast path for non-assignment case */
2972 if (variable == NULL)
2974 if (system(command))
2976 if (!timer_exceeded)
2977 pg_log_error("%s: could not launch shell command", argv[0]);
2978 return false;
2980 return true;
2983 /* Execute the command with pipe and read the standard output. */
2984 if ((fp = popen(command, "r")) == NULL)
2986 pg_log_error("%s: could not launch shell command", argv[0]);
2987 return false;
2989 if (fgets(res, sizeof(res), fp) == NULL)
2991 if (!timer_exceeded)
2992 pg_log_error("%s: could not read result of shell command", argv[0]);
2993 (void) pclose(fp);
2994 return false;
2996 if (pclose(fp) < 0)
2998 pg_log_error("%s: could not run shell command: %m", argv[0]);
2999 return false;
3002 /* Check whether the result is an integer and assign it to the variable */
3003 retval = (int) strtol(res, &endptr, 10);
3004 while (*endptr != '\0' && isspace((unsigned char) *endptr))
3005 endptr++;
3006 if (*res == '\0' || *endptr != '\0')
3008 pg_log_error("%s: shell command must return an integer (not \"%s\")", argv[0], res);
3009 return false;
3011 if (!putVariableInt(variables, "setshell", variable, retval))
3012 return false;
3014 pg_log_debug("%s: shell parameter name: \"%s\", value: \"%s\"", argv[0], argv[1], res);
3016 return true;
3020 * Report the abortion of the client when processing SQL commands.
3022 static void
3023 commandFailed(CState *st, const char *cmd, const char *message)
3025 pg_log_error("client %d aborted in command %d (%s) of script %d; %s",
3026 st->id, st->command, cmd, st->use_file, message);
3030 * Report the error in the command while the script is executing.
3032 static void
3033 commandError(CState *st, const char *message)
3035 Assert(sql_script[st->use_file].commands[st->command]->type == SQL_COMMAND);
3036 pg_log_info("client %d got an error in command %d (SQL) of script %d; %s",
3037 st->id, st->command, st->use_file, message);
3040 /* return a script number with a weighted choice. */
3041 static int
3042 chooseScript(TState *thread)
3044 int i = 0;
3045 int64 w;
3047 if (num_scripts == 1)
3048 return 0;
3050 w = getrand(&thread->ts_choose_rs, 0, total_weight - 1);
3053 w -= sql_script[i++].weight;
3054 } while (w >= 0);
3056 return i - 1;
3060 * Allocate space for CState->prepared: we need one boolean for each command
3061 * of each script.
3063 static void
3064 allocCStatePrepared(CState *st)
3066 Assert(st->prepared == NULL);
3068 st->prepared = pg_malloc(sizeof(bool *) * num_scripts);
3069 for (int i = 0; i < num_scripts; i++)
3071 ParsedScript *script = &sql_script[i];
3072 int numcmds;
3074 for (numcmds = 0; script->commands[numcmds] != NULL; numcmds++)
3076 st->prepared[i] = pg_malloc0(sizeof(bool) * numcmds);
3081 * Prepare the SQL command from st->use_file at command_num.
3083 static void
3084 prepareCommand(CState *st, int command_num)
3086 Command *command = sql_script[st->use_file].commands[command_num];
3088 /* No prepare for non-SQL commands */
3089 if (command->type != SQL_COMMAND)
3090 return;
3092 if (!st->prepared)
3093 allocCStatePrepared(st);
3095 if (!st->prepared[st->use_file][command_num])
3097 PGresult *res;
3099 pg_log_debug("client %d preparing %s", st->id, command->prepname);
3100 res = PQprepare(st->con, command->prepname,
3101 command->argv[0], command->argc - 1, NULL);
3102 if (PQresultStatus(res) != PGRES_COMMAND_OK)
3103 pg_log_error("%s", PQerrorMessage(st->con));
3104 PQclear(res);
3105 st->prepared[st->use_file][command_num] = true;
3110 * Prepare all the commands in the script that come after the \startpipeline
3111 * that's at position st->command, and the first \endpipeline we find.
3113 * This sets the ->prepared flag for each relevant command as well as the
3114 * \startpipeline itself, but doesn't move the st->command counter.
3116 static void
3117 prepareCommandsInPipeline(CState *st)
3119 int j;
3120 Command **commands = sql_script[st->use_file].commands;
3122 Assert(commands[st->command]->type == META_COMMAND &&
3123 commands[st->command]->meta == META_STARTPIPELINE);
3125 if (!st->prepared)
3126 allocCStatePrepared(st);
3129 * We set the 'prepared' flag on the \startpipeline itself to flag that we
3130 * don't need to do this next time without calling prepareCommand(), even
3131 * though we don't actually prepare this command.
3133 if (st->prepared[st->use_file][st->command])
3134 return;
3136 for (j = st->command + 1; commands[j] != NULL; j++)
3138 if (commands[j]->type == META_COMMAND &&
3139 commands[j]->meta == META_ENDPIPELINE)
3140 break;
3142 prepareCommand(st, j);
3145 st->prepared[st->use_file][st->command] = true;
3148 /* Send a SQL command, using the chosen querymode */
3149 static bool
3150 sendCommand(CState *st, Command *command)
3152 int r;
3154 if (querymode == QUERY_SIMPLE)
3156 char *sql;
3158 sql = pg_strdup(command->argv[0]);
3159 sql = assignVariables(&st->variables, sql);
3161 pg_log_debug("client %d sending %s", st->id, sql);
3162 r = PQsendQuery(st->con, sql);
3163 free(sql);
3165 else if (querymode == QUERY_EXTENDED)
3167 const char *sql = command->argv[0];
3168 const char *params[MAX_ARGS];
3170 getQueryParams(&st->variables, command, params);
3172 pg_log_debug("client %d sending %s", st->id, sql);
3173 r = PQsendQueryParams(st->con, sql, command->argc - 1,
3174 NULL, params, NULL, NULL, 0);
3176 else if (querymode == QUERY_PREPARED)
3178 const char *params[MAX_ARGS];
3180 prepareCommand(st, st->command);
3181 getQueryParams(&st->variables, command, params);
3183 pg_log_debug("client %d sending %s", st->id, command->prepname);
3184 r = PQsendQueryPrepared(st->con, command->prepname, command->argc - 1,
3185 params, NULL, NULL, 0);
3187 else /* unknown sql mode */
3188 r = 0;
3190 if (r == 0)
3192 pg_log_debug("client %d could not send %s", st->id, command->argv[0]);
3193 return false;
3195 else
3196 return true;
3200 * Get the error status from the error code.
3202 static EStatus
3203 getSQLErrorStatus(const char *sqlState)
3205 if (sqlState != NULL)
3207 if (strcmp(sqlState, ERRCODE_T_R_SERIALIZATION_FAILURE) == 0)
3208 return ESTATUS_SERIALIZATION_ERROR;
3209 else if (strcmp(sqlState, ERRCODE_T_R_DEADLOCK_DETECTED) == 0)
3210 return ESTATUS_DEADLOCK_ERROR;
3213 return ESTATUS_OTHER_SQL_ERROR;
3217 * Returns true if this type of error can be retried.
3219 static bool
3220 canRetryError(EStatus estatus)
3222 return (estatus == ESTATUS_SERIALIZATION_ERROR ||
3223 estatus == ESTATUS_DEADLOCK_ERROR);
3227 * Process query response from the backend.
3229 * If varprefix is not NULL, it's the variable name prefix where to store
3230 * the results of the *last* command (META_GSET) or *all* commands
3231 * (META_ASET).
3233 * Returns true if everything is A-OK, false if any error occurs.
3235 static bool
3236 readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
3238 PGresult *res;
3239 PGresult *next_res;
3240 int qrynum = 0;
3243 * varprefix should be set only with \gset or \aset, and \endpipeline and
3244 * SQL commands do not need it.
3246 Assert((meta == META_NONE && varprefix == NULL) ||
3247 ((meta == META_ENDPIPELINE) && varprefix == NULL) ||
3248 ((meta == META_GSET || meta == META_ASET) && varprefix != NULL));
3250 res = PQgetResult(st->con);
3252 while (res != NULL)
3254 bool is_last;
3256 /* peek at the next result to know whether the current is last */
3257 next_res = PQgetResult(st->con);
3258 is_last = (next_res == NULL);
3260 switch (PQresultStatus(res))
3262 case PGRES_COMMAND_OK: /* non-SELECT commands */
3263 case PGRES_EMPTY_QUERY: /* may be used for testing no-op overhead */
3264 if (is_last && meta == META_GSET)
3266 pg_log_error("client %d script %d command %d query %d: expected one row, got %d",
3267 st->id, st->use_file, st->command, qrynum, 0);
3268 st->estatus = ESTATUS_META_COMMAND_ERROR;
3269 goto error;
3271 break;
3273 case PGRES_TUPLES_OK:
3274 if ((is_last && meta == META_GSET) || meta == META_ASET)
3276 int ntuples = PQntuples(res);
3278 if (meta == META_GSET && ntuples != 1)
3280 /* under \gset, report the error */
3281 pg_log_error("client %d script %d command %d query %d: expected one row, got %d",
3282 st->id, st->use_file, st->command, qrynum, PQntuples(res));
3283 st->estatus = ESTATUS_META_COMMAND_ERROR;
3284 goto error;
3286 else if (meta == META_ASET && ntuples <= 0)
3288 /* coldly skip empty result under \aset */
3289 break;
3292 /* store results into variables */
3293 for (int fld = 0; fld < PQnfields(res); fld++)
3295 char *varname = PQfname(res, fld);
3297 /* allocate varname only if necessary, freed below */
3298 if (*varprefix != '\0')
3299 varname = psprintf("%s%s", varprefix, varname);
3301 /* store last row result as a string */
3302 if (!putVariable(&st->variables, meta == META_ASET ? "aset" : "gset", varname,
3303 PQgetvalue(res, ntuples - 1, fld)))
3305 /* internal error */
3306 pg_log_error("client %d script %d command %d query %d: error storing into variable %s",
3307 st->id, st->use_file, st->command, qrynum, varname);
3308 st->estatus = ESTATUS_META_COMMAND_ERROR;
3309 goto error;
3312 if (*varprefix != '\0')
3313 pg_free(varname);
3316 /* otherwise the result is simply thrown away by PQclear below */
3317 break;
3319 case PGRES_PIPELINE_SYNC:
3320 pg_log_debug("client %d pipeline ending", st->id);
3321 if (PQexitPipelineMode(st->con) != 1)
3322 pg_log_error("client %d failed to exit pipeline mode: %s", st->id,
3323 PQerrorMessage(st->con));
3324 break;
3326 case PGRES_NONFATAL_ERROR:
3327 case PGRES_FATAL_ERROR:
3328 st->estatus = getSQLErrorStatus(PQresultErrorField(res,
3329 PG_DIAG_SQLSTATE));
3330 if (canRetryError(st->estatus))
3332 if (verbose_errors)
3333 commandError(st, PQerrorMessage(st->con));
3334 goto error;
3336 /* fall through */
3338 default:
3339 /* anything else is unexpected */
3340 pg_log_error("client %d script %d aborted in command %d query %d: %s",
3341 st->id, st->use_file, st->command, qrynum,
3342 PQerrorMessage(st->con));
3343 goto error;
3346 PQclear(res);
3347 qrynum++;
3348 res = next_res;
3351 if (qrynum == 0)
3353 pg_log_error("client %d command %d: no results", st->id, st->command);
3354 return false;
3357 return true;
3359 error:
3360 PQclear(res);
3361 PQclear(next_res);
3364 res = PQgetResult(st->con);
3365 PQclear(res);
3366 } while (res);
3368 return false;
3372 * Parse the argument to a \sleep command, and return the requested amount
3373 * of delay, in microseconds. Returns true on success, false on error.
3375 static bool
3376 evaluateSleep(Variables *variables, int argc, char **argv, int *usecs)
3378 char *var;
3379 int usec;
3381 if (*argv[1] == ':')
3383 if ((var = getVariable(variables, argv[1] + 1)) == NULL)
3385 pg_log_error("%s: undefined variable \"%s\"", argv[0], argv[1] + 1);
3386 return false;
3389 usec = atoi(var);
3391 /* Raise an error if the value of a variable is not a number */
3392 if (usec == 0 && !isdigit((unsigned char) *var))
3394 pg_log_error("%s: invalid sleep time \"%s\" for variable \"%s\"",
3395 argv[0], var, argv[1] + 1);
3396 return false;
3399 else
3400 usec = atoi(argv[1]);
3402 if (argc > 2)
3404 if (pg_strcasecmp(argv[2], "ms") == 0)
3405 usec *= 1000;
3406 else if (pg_strcasecmp(argv[2], "s") == 0)
3407 usec *= 1000000;
3409 else
3410 usec *= 1000000;
3412 *usecs = usec;
3413 return true;
3418 * Returns true if the error can be retried.
3420 static bool
3421 doRetry(CState *st, pg_time_usec_t *now)
3423 Assert(st->estatus != ESTATUS_NO_ERROR);
3425 /* We can only retry serialization or deadlock errors. */
3426 if (!canRetryError(st->estatus))
3427 return false;
3430 * We must have at least one option to limit the retrying of transactions
3431 * that got an error.
3433 Assert(max_tries || latency_limit || duration > 0);
3436 * We cannot retry the error if we have reached the maximum number of
3437 * tries.
3439 if (max_tries && st->tries >= max_tries)
3440 return false;
3443 * We cannot retry the error if we spent too much time on this
3444 * transaction.
3446 if (latency_limit)
3448 pg_time_now_lazy(now);
3449 if (*now - st->txn_scheduled > latency_limit)
3450 return false;
3454 * We cannot retry the error if the benchmark duration is over.
3456 if (timer_exceeded)
3457 return false;
3459 /* OK */
3460 return true;
3464 * Read results and discard it until a sync point.
3466 static int
3467 discardUntilSync(CState *st)
3469 /* send a sync */
3470 if (!PQpipelineSync(st->con))
3472 pg_log_error("client %d aborted: failed to send a pipeline sync",
3473 st->id);
3474 return 0;
3477 /* receive PGRES_PIPELINE_SYNC and null following it */
3478 for (;;)
3480 PGresult *res = PQgetResult(st->con);
3482 if (PQresultStatus(res) == PGRES_PIPELINE_SYNC)
3484 PQclear(res);
3485 res = PQgetResult(st->con);
3486 Assert(res == NULL);
3487 break;
3489 PQclear(res);
3492 /* exit pipeline */
3493 if (PQexitPipelineMode(st->con) != 1)
3495 pg_log_error("client %d aborted: failed to exit pipeline mode for rolling back the failed transaction",
3496 st->id);
3497 return 0;
3499 return 1;
3503 * Get the transaction status at the end of a command especially for
3504 * checking if we are in a (failed) transaction block.
3506 static TStatus
3507 getTransactionStatus(PGconn *con)
3509 PGTransactionStatusType tx_status;
3511 tx_status = PQtransactionStatus(con);
3512 switch (tx_status)
3514 case PQTRANS_IDLE:
3515 return TSTATUS_IDLE;
3516 case PQTRANS_INTRANS:
3517 case PQTRANS_INERROR:
3518 return TSTATUS_IN_BLOCK;
3519 case PQTRANS_UNKNOWN:
3520 /* PQTRANS_UNKNOWN is expected given a broken connection */
3521 if (PQstatus(con) == CONNECTION_BAD)
3522 return TSTATUS_CONN_ERROR;
3523 /* fall through */
3524 case PQTRANS_ACTIVE:
3525 default:
3528 * We cannot find out whether we are in a transaction block or
3529 * not. Internal error which should never occur.
3531 pg_log_error("unexpected transaction status %d", tx_status);
3532 return TSTATUS_OTHER_ERROR;
3535 /* not reached */
3536 Assert(false);
3537 return TSTATUS_OTHER_ERROR;
3541 * Print verbose messages of an error
3543 static void
3544 printVerboseErrorMessages(CState *st, pg_time_usec_t *now, bool is_retry)
3546 static PQExpBuffer buf = NULL;
3548 if (buf == NULL)
3549 buf = createPQExpBuffer();
3550 else
3551 resetPQExpBuffer(buf);
3553 printfPQExpBuffer(buf, "client %d ", st->id);
3554 appendPQExpBufferStr(buf, (is_retry ?
3555 "repeats the transaction after the error" :
3556 "ends the failed transaction"));
3557 appendPQExpBuffer(buf, " (try %u", st->tries);
3559 /* Print max_tries if it is not unlimited. */
3560 if (max_tries)
3561 appendPQExpBuffer(buf, "/%u", max_tries);
3564 * If the latency limit is used, print a percentage of the current
3565 * transaction latency from the latency limit.
3567 if (latency_limit)
3569 pg_time_now_lazy(now);
3570 appendPQExpBuffer(buf, ", %.3f%% of the maximum time of tries was used",
3571 (100.0 * (*now - st->txn_scheduled) / latency_limit));
3573 appendPQExpBufferStr(buf, ")\n");
3575 pg_log_info("%s", buf->data);
3579 * Advance the state machine of a connection.
3581 static void
3582 advanceConnectionState(TState *thread, CState *st, StatsData *agg)
3586 * gettimeofday() isn't free, so we get the current timestamp lazily the
3587 * first time it's needed, and reuse the same value throughout this
3588 * function after that. This also ensures that e.g. the calculated
3589 * latency reported in the log file and in the totals are the same. Zero
3590 * means "not set yet". Reset "now" when we execute shell commands or
3591 * expressions, which might take a non-negligible amount of time, though.
3593 pg_time_usec_t now = 0;
3596 * Loop in the state machine, until we have to wait for a result from the
3597 * server or have to sleep for throttling or \sleep.
3599 * Note: In the switch-statement below, 'break' will loop back here,
3600 * meaning "continue in the state machine". Return is used to return to
3601 * the caller, giving the thread the opportunity to advance another
3602 * client.
3604 for (;;)
3606 Command *command;
3608 switch (st->state)
3610 /* Select transaction (script) to run. */
3611 case CSTATE_CHOOSE_SCRIPT:
3612 st->use_file = chooseScript(thread);
3613 Assert(conditional_stack_empty(st->cstack));
3615 /* reset transaction variables to default values */
3616 st->estatus = ESTATUS_NO_ERROR;
3617 st->tries = 1;
3619 pg_log_debug("client %d executing script \"%s\"",
3620 st->id, sql_script[st->use_file].desc);
3623 * If time is over, we're done; otherwise, get ready to start
3624 * a new transaction, or to get throttled if that's requested.
3626 st->state = timer_exceeded ? CSTATE_FINISHED :
3627 throttle_delay > 0 ? CSTATE_PREPARE_THROTTLE : CSTATE_START_TX;
3628 break;
3630 /* Start new transaction (script) */
3631 case CSTATE_START_TX:
3632 pg_time_now_lazy(&now);
3634 /* establish connection if needed, i.e. under --connect */
3635 if (st->con == NULL)
3637 pg_time_usec_t start = now;
3639 if ((st->con = doConnect()) == NULL)
3642 * as the bench is already running, we do not abort
3643 * the process
3645 pg_log_error("client %d aborted while establishing connection", st->id);
3646 st->state = CSTATE_ABORTED;
3647 break;
3650 /* reset now after connection */
3651 now = pg_time_now();
3653 thread->conn_duration += now - start;
3655 /* Reset session-local state */
3656 pg_free(st->prepared);
3657 st->prepared = NULL;
3661 * It is the first try to run this transaction. Remember the
3662 * random state: maybe it will get an error and we will need
3663 * to run it again.
3665 st->random_state = st->cs_func_rs;
3667 /* record transaction start time */
3668 st->txn_begin = now;
3671 * When not throttling, this is also the transaction's
3672 * scheduled start time.
3674 if (!throttle_delay)
3675 st->txn_scheduled = now;
3677 /* Begin with the first command */
3678 st->state = CSTATE_START_COMMAND;
3679 st->command = 0;
3680 break;
3683 * Handle throttling once per transaction by sleeping.
3685 case CSTATE_PREPARE_THROTTLE:
3688 * Generate a delay such that the series of delays will
3689 * approximate a Poisson distribution centered on the
3690 * throttle_delay time.
3692 * If transactions are too slow or a given wait is shorter
3693 * than a transaction, the next transaction will start right
3694 * away.
3696 Assert(throttle_delay > 0);
3698 thread->throttle_trigger +=
3699 getPoissonRand(&thread->ts_throttle_rs, throttle_delay);
3700 st->txn_scheduled = thread->throttle_trigger;
3703 * If --latency-limit is used, and this slot is already late
3704 * so that the transaction will miss the latency limit even if
3705 * it completed immediately, skip this time slot and loop to
3706 * reschedule.
3708 if (latency_limit)
3710 pg_time_now_lazy(&now);
3712 if (thread->throttle_trigger < now - latency_limit)
3714 processXactStats(thread, st, &now, true, agg);
3717 * Finish client if -T or -t was exceeded.
3719 * Stop counting skipped transactions under -T as soon
3720 * as the timer is exceeded. Because otherwise it can
3721 * take a very long time to count all of them
3722 * especially when quite a lot of them happen with
3723 * unrealistically high rate setting in -R, which
3724 * would prevent pgbench from ending immediately.
3725 * Because of this behavior, note that there is no
3726 * guarantee that all skipped transactions are counted
3727 * under -T though there is under -t. This is OK in
3728 * practice because it's very unlikely to happen with
3729 * realistic setting.
3731 if (timer_exceeded || (nxacts > 0 && st->cnt >= nxacts))
3732 st->state = CSTATE_FINISHED;
3734 /* Go back to top of loop with CSTATE_PREPARE_THROTTLE */
3735 break;
3740 * stop client if next transaction is beyond pgbench end of
3741 * execution; otherwise, throttle it.
3743 st->state = end_time > 0 && st->txn_scheduled > end_time ?
3744 CSTATE_FINISHED : CSTATE_THROTTLE;
3745 break;
3748 * Wait until it's time to start next transaction.
3750 case CSTATE_THROTTLE:
3751 pg_time_now_lazy(&now);
3753 if (now < st->txn_scheduled)
3754 return; /* still sleeping, nothing to do here */
3756 /* done sleeping, but don't start transaction if we're done */
3757 st->state = timer_exceeded ? CSTATE_FINISHED : CSTATE_START_TX;
3758 break;
3761 * Send a command to server (or execute a meta-command)
3763 case CSTATE_START_COMMAND:
3764 command = sql_script[st->use_file].commands[st->command];
3767 * Transition to script end processing if done, but close up
3768 * shop if a pipeline is open at this point.
3770 if (command == NULL)
3772 if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF)
3773 st->state = CSTATE_END_TX;
3774 else
3776 pg_log_error("client %d aborted: end of script reached with pipeline open",
3777 st->id);
3778 st->state = CSTATE_ABORTED;
3781 break;
3784 /* record begin time of next command, and initiate it */
3785 if (report_per_command)
3787 pg_time_now_lazy(&now);
3788 st->stmt_begin = now;
3791 /* Execute the command */
3792 if (command->type == SQL_COMMAND)
3794 /* disallow \aset and \gset in pipeline mode */
3795 if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF)
3797 if (command->meta == META_GSET)
3799 commandFailed(st, "gset", "\\gset is not allowed in pipeline mode");
3800 st->state = CSTATE_ABORTED;
3801 break;
3803 else if (command->meta == META_ASET)
3805 commandFailed(st, "aset", "\\aset is not allowed in pipeline mode");
3806 st->state = CSTATE_ABORTED;
3807 break;
3811 if (!sendCommand(st, command))
3813 commandFailed(st, "SQL", "SQL command send failed");
3814 st->state = CSTATE_ABORTED;
3816 else
3818 /* Wait for results, unless in pipeline mode */
3819 if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF)
3820 st->state = CSTATE_WAIT_RESULT;
3821 else
3822 st->state = CSTATE_END_COMMAND;
3825 else if (command->type == META_COMMAND)
3827 /*-----
3828 * Possible state changes when executing meta commands:
3829 * - on errors CSTATE_ABORTED
3830 * - on sleep CSTATE_SLEEP
3831 * - else CSTATE_END_COMMAND
3833 st->state = executeMetaCommand(st, &now);
3834 if (st->state == CSTATE_ABORTED)
3835 st->estatus = ESTATUS_META_COMMAND_ERROR;
3839 * We're now waiting for an SQL command to complete, or
3840 * finished processing a metacommand, or need to sleep, or
3841 * something bad happened.
3843 Assert(st->state == CSTATE_WAIT_RESULT ||
3844 st->state == CSTATE_END_COMMAND ||
3845 st->state == CSTATE_SLEEP ||
3846 st->state == CSTATE_ABORTED);
3847 break;
3850 * non executed conditional branch
3852 case CSTATE_SKIP_COMMAND:
3853 Assert(!conditional_active(st->cstack));
3854 /* quickly skip commands until something to do... */
3855 while (true)
3857 command = sql_script[st->use_file].commands[st->command];
3859 /* cannot reach end of script in that state */
3860 Assert(command != NULL);
3863 * if this is conditional related, update conditional
3864 * state
3866 if (command->type == META_COMMAND &&
3867 (command->meta == META_IF ||
3868 command->meta == META_ELIF ||
3869 command->meta == META_ELSE ||
3870 command->meta == META_ENDIF))
3872 switch (conditional_stack_peek(st->cstack))
3874 case IFSTATE_FALSE:
3875 if (command->meta == META_IF ||
3876 command->meta == META_ELIF)
3878 /* we must evaluate the condition */
3879 st->state = CSTATE_START_COMMAND;
3881 else if (command->meta == META_ELSE)
3883 /* we must execute next command */
3884 conditional_stack_poke(st->cstack,
3885 IFSTATE_ELSE_TRUE);
3886 st->state = CSTATE_START_COMMAND;
3887 st->command++;
3889 else if (command->meta == META_ENDIF)
3891 Assert(!conditional_stack_empty(st->cstack));
3892 conditional_stack_pop(st->cstack);
3893 if (conditional_active(st->cstack))
3894 st->state = CSTATE_START_COMMAND;
3897 * else state remains in
3898 * CSTATE_SKIP_COMMAND
3900 st->command++;
3902 break;
3904 case IFSTATE_IGNORED:
3905 case IFSTATE_ELSE_FALSE:
3906 if (command->meta == META_IF)
3907 conditional_stack_push(st->cstack,
3908 IFSTATE_IGNORED);
3909 else if (command->meta == META_ENDIF)
3911 Assert(!conditional_stack_empty(st->cstack));
3912 conditional_stack_pop(st->cstack);
3913 if (conditional_active(st->cstack))
3914 st->state = CSTATE_START_COMMAND;
3916 /* could detect "else" & "elif" after "else" */
3917 st->command++;
3918 break;
3920 case IFSTATE_NONE:
3921 case IFSTATE_TRUE:
3922 case IFSTATE_ELSE_TRUE:
3923 default:
3926 * inconsistent if inactive, unreachable dead
3927 * code
3929 Assert(false);
3932 else
3934 /* skip and consider next */
3935 st->command++;
3938 if (st->state != CSTATE_SKIP_COMMAND)
3939 /* out of quick skip command loop */
3940 break;
3942 break;
3945 * Wait for the current SQL command to complete
3947 case CSTATE_WAIT_RESULT:
3948 pg_log_debug("client %d receiving", st->id);
3951 * Only check for new network data if we processed all data
3952 * fetched prior. Otherwise we end up doing a syscall for each
3953 * individual pipelined query, which has a measurable
3954 * performance impact.
3956 if (PQisBusy(st->con) && !PQconsumeInput(st->con))
3958 /* there's something wrong */
3959 commandFailed(st, "SQL", "perhaps the backend died while processing");
3960 st->state = CSTATE_ABORTED;
3961 break;
3963 if (PQisBusy(st->con))
3964 return; /* don't have the whole result yet */
3966 /* store or discard the query results */
3967 if (readCommandResponse(st,
3968 sql_script[st->use_file].commands[st->command]->meta,
3969 sql_script[st->use_file].commands[st->command]->varprefix))
3972 * outside of pipeline mode: stop reading results.
3973 * pipeline mode: continue reading results until an
3974 * end-of-pipeline response.
3976 if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
3977 st->state = CSTATE_END_COMMAND;
3979 else if (canRetryError(st->estatus))
3980 st->state = CSTATE_ERROR;
3981 else
3982 st->state = CSTATE_ABORTED;
3983 break;
3986 * Wait until sleep is done. This state is entered after a
3987 * \sleep metacommand. The behavior is similar to
3988 * CSTATE_THROTTLE, but proceeds to CSTATE_START_COMMAND
3989 * instead of CSTATE_START_TX.
3991 case CSTATE_SLEEP:
3992 pg_time_now_lazy(&now);
3993 if (now < st->sleep_until)
3994 return; /* still sleeping, nothing to do here */
3995 /* Else done sleeping. */
3996 st->state = CSTATE_END_COMMAND;
3997 break;
4000 * End of command: record stats and proceed to next command.
4002 case CSTATE_END_COMMAND:
4005 * command completed: accumulate per-command execution times
4006 * in thread-local data structure, if per-command latencies
4007 * are requested.
4009 if (report_per_command)
4011 pg_time_now_lazy(&now);
4013 command = sql_script[st->use_file].commands[st->command];
4014 /* XXX could use a mutex here, but we choose not to */
4015 addToSimpleStats(&command->stats,
4016 PG_TIME_GET_DOUBLE(now - st->stmt_begin));
4019 /* Go ahead with next command, to be executed or skipped */
4020 st->command++;
4021 st->state = conditional_active(st->cstack) ?
4022 CSTATE_START_COMMAND : CSTATE_SKIP_COMMAND;
4023 break;
4026 * Clean up after an error.
4028 case CSTATE_ERROR:
4030 TStatus tstatus;
4032 Assert(st->estatus != ESTATUS_NO_ERROR);
4034 /* Clear the conditional stack */
4035 conditional_stack_reset(st->cstack);
4037 /* Read and discard until a sync point in pipeline mode */
4038 if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF)
4040 if (!discardUntilSync(st))
4042 st->state = CSTATE_ABORTED;
4043 break;
4048 * Check if we have a (failed) transaction block or not,
4049 * and roll it back if any.
4051 tstatus = getTransactionStatus(st->con);
4052 if (tstatus == TSTATUS_IN_BLOCK)
4054 /* Try to rollback a (failed) transaction block. */
4055 if (!PQsendQuery(st->con, "ROLLBACK"))
4057 pg_log_error("client %d aborted: failed to send sql command for rolling back the failed transaction",
4058 st->id);
4059 st->state = CSTATE_ABORTED;
4061 else
4062 st->state = CSTATE_WAIT_ROLLBACK_RESULT;
4064 else if (tstatus == TSTATUS_IDLE)
4067 * If time is over, we're done; otherwise, check if we
4068 * can retry the error.
4070 st->state = timer_exceeded ? CSTATE_FINISHED :
4071 doRetry(st, &now) ? CSTATE_RETRY : CSTATE_FAILURE;
4073 else
4075 if (tstatus == TSTATUS_CONN_ERROR)
4076 pg_log_error("perhaps the backend died while processing");
4078 pg_log_error("client %d aborted while receiving the transaction status", st->id);
4079 st->state = CSTATE_ABORTED;
4081 break;
4085 * Wait for the rollback command to complete
4087 case CSTATE_WAIT_ROLLBACK_RESULT:
4089 PGresult *res;
4091 pg_log_debug("client %d receiving", st->id);
4092 if (!PQconsumeInput(st->con))
4094 pg_log_error("client %d aborted while rolling back the transaction after an error; perhaps the backend died while processing",
4095 st->id);
4096 st->state = CSTATE_ABORTED;
4097 break;
4099 if (PQisBusy(st->con))
4100 return; /* don't have the whole result yet */
4103 * Read and discard the query result;
4105 res = PQgetResult(st->con);
4106 switch (PQresultStatus(res))
4108 case PGRES_COMMAND_OK:
4109 /* OK */
4110 PQclear(res);
4111 /* null must be returned */
4112 res = PQgetResult(st->con);
4113 Assert(res == NULL);
4116 * If time is over, we're done; otherwise, check
4117 * if we can retry the error.
4119 st->state = timer_exceeded ? CSTATE_FINISHED :
4120 doRetry(st, &now) ? CSTATE_RETRY : CSTATE_FAILURE;
4121 break;
4122 default:
4123 pg_log_error("client %d aborted while rolling back the transaction after an error; %s",
4124 st->id, PQerrorMessage(st->con));
4125 PQclear(res);
4126 st->state = CSTATE_ABORTED;
4127 break;
4129 break;
4133 * Retry the transaction after an error.
4135 case CSTATE_RETRY:
4136 command = sql_script[st->use_file].commands[st->command];
4139 * Inform that the transaction will be retried after the
4140 * error.
4142 if (verbose_errors)
4143 printVerboseErrorMessages(st, &now, true);
4145 /* Count tries and retries */
4146 st->tries++;
4147 command->retries++;
4150 * Reset the random state as they were at the beginning of the
4151 * transaction.
4153 st->cs_func_rs = st->random_state;
4155 /* Process the first transaction command. */
4156 st->command = 0;
4157 st->estatus = ESTATUS_NO_ERROR;
4158 st->state = CSTATE_START_COMMAND;
4159 break;
4162 * Record a failed transaction.
4164 case CSTATE_FAILURE:
4165 command = sql_script[st->use_file].commands[st->command];
4167 /* Accumulate the failure. */
4168 command->failures++;
4171 * Inform that the failed transaction will not be retried.
4173 if (verbose_errors)
4174 printVerboseErrorMessages(st, &now, false);
4176 /* End the failed transaction. */
4177 st->state = CSTATE_END_TX;
4178 break;
4181 * End of transaction (end of script, really).
4183 case CSTATE_END_TX:
4185 TStatus tstatus;
4187 /* transaction finished: calculate latency and do log */
4188 processXactStats(thread, st, &now, false, agg);
4191 * missing \endif... cannot happen if CheckConditional was
4192 * okay
4194 Assert(conditional_stack_empty(st->cstack));
4197 * We must complete all the transaction blocks that were
4198 * started in this script.
4200 tstatus = getTransactionStatus(st->con);
4201 if (tstatus == TSTATUS_IN_BLOCK)
4203 pg_log_error("client %d aborted: end of script reached without completing the last transaction",
4204 st->id);
4205 st->state = CSTATE_ABORTED;
4206 break;
4208 else if (tstatus != TSTATUS_IDLE)
4210 if (tstatus == TSTATUS_CONN_ERROR)
4211 pg_log_error("perhaps the backend died while processing");
4213 pg_log_error("client %d aborted while receiving the transaction status", st->id);
4214 st->state = CSTATE_ABORTED;
4215 break;
4218 if (is_connect)
4220 pg_time_usec_t start = now;
4222 pg_time_now_lazy(&start);
4223 finishCon(st);
4224 now = pg_time_now();
4225 thread->conn_duration += now - start;
4228 if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
4230 /* script completed */
4231 st->state = CSTATE_FINISHED;
4232 break;
4235 /* next transaction (script) */
4236 st->state = CSTATE_CHOOSE_SCRIPT;
4239 * Ensure that we always return on this point, so as to
4240 * avoid an infinite loop if the script only contains meta
4241 * commands.
4243 return;
4247 * Final states. Close the connection if it's still open.
4249 case CSTATE_ABORTED:
4250 case CSTATE_FINISHED:
4253 * Don't measure the disconnection delays here even if in
4254 * CSTATE_FINISHED and -C/--connect option is specified.
4255 * Because in this case all the connections that this thread
4256 * established are closed at the end of transactions and the
4257 * disconnection delays should have already been measured at
4258 * that moment.
4260 * In CSTATE_ABORTED state, the measurement is no longer
4261 * necessary because we cannot report complete results anyways
4262 * in this case.
4264 finishCon(st);
4265 return;
4271 * Subroutine for advanceConnectionState -- initiate or execute the current
4272 * meta command, and return the next state to set.
4274 * *now is updated to the current time, unless the command is expected to
4275 * take no time to execute.
4277 static ConnectionStateEnum
4278 executeMetaCommand(CState *st, pg_time_usec_t *now)
4280 Command *command = sql_script[st->use_file].commands[st->command];
4281 int argc;
4282 char **argv;
4284 Assert(command != NULL && command->type == META_COMMAND);
4286 argc = command->argc;
4287 argv = command->argv;
4289 if (unlikely(__pg_log_level <= PG_LOG_DEBUG))
4291 PQExpBufferData buf;
4293 initPQExpBuffer(&buf);
4295 printfPQExpBuffer(&buf, "client %d executing \\%s", st->id, argv[0]);
4296 for (int i = 1; i < argc; i++)
4297 appendPQExpBuffer(&buf, " %s", argv[i]);
4299 pg_log_debug("%s", buf.data);
4301 termPQExpBuffer(&buf);
4304 if (command->meta == META_SLEEP)
4306 int usec;
4309 * A \sleep doesn't execute anything, we just get the delay from the
4310 * argument, and enter the CSTATE_SLEEP state. (The per-command
4311 * latency will be recorded in CSTATE_SLEEP state, not here, after the
4312 * delay has elapsed.)
4314 if (!evaluateSleep(&st->variables, argc, argv, &usec))
4316 commandFailed(st, "sleep", "execution of meta-command failed");
4317 return CSTATE_ABORTED;
4320 pg_time_now_lazy(now);
4321 st->sleep_until = (*now) + usec;
4322 return CSTATE_SLEEP;
4324 else if (command->meta == META_SET)
4326 PgBenchExpr *expr = command->expr;
4327 PgBenchValue result;
4329 if (!evaluateExpr(st, expr, &result))
4331 commandFailed(st, argv[0], "evaluation of meta-command failed");
4332 return CSTATE_ABORTED;
4335 if (!putVariableValue(&st->variables, argv[0], argv[1], &result))
4337 commandFailed(st, "set", "assignment of meta-command failed");
4338 return CSTATE_ABORTED;
4341 else if (command->meta == META_IF)
4343 /* backslash commands with an expression to evaluate */
4344 PgBenchExpr *expr = command->expr;
4345 PgBenchValue result;
4346 bool cond;
4348 if (!evaluateExpr(st, expr, &result))
4350 commandFailed(st, argv[0], "evaluation of meta-command failed");
4351 return CSTATE_ABORTED;
4354 cond = valueTruth(&result);
4355 conditional_stack_push(st->cstack, cond ? IFSTATE_TRUE : IFSTATE_FALSE);
4357 else if (command->meta == META_ELIF)
4359 /* backslash commands with an expression to evaluate */
4360 PgBenchExpr *expr = command->expr;
4361 PgBenchValue result;
4362 bool cond;
4364 if (conditional_stack_peek(st->cstack) == IFSTATE_TRUE)
4366 /* elif after executed block, skip eval and wait for endif. */
4367 conditional_stack_poke(st->cstack, IFSTATE_IGNORED);
4368 return CSTATE_END_COMMAND;
4371 if (!evaluateExpr(st, expr, &result))
4373 commandFailed(st, argv[0], "evaluation of meta-command failed");
4374 return CSTATE_ABORTED;
4377 cond = valueTruth(&result);
4378 Assert(conditional_stack_peek(st->cstack) == IFSTATE_FALSE);
4379 conditional_stack_poke(st->cstack, cond ? IFSTATE_TRUE : IFSTATE_FALSE);
4381 else if (command->meta == META_ELSE)
4383 switch (conditional_stack_peek(st->cstack))
4385 case IFSTATE_TRUE:
4386 conditional_stack_poke(st->cstack, IFSTATE_ELSE_FALSE);
4387 break;
4388 case IFSTATE_FALSE: /* inconsistent if active */
4389 case IFSTATE_IGNORED: /* inconsistent if active */
4390 case IFSTATE_NONE: /* else without if */
4391 case IFSTATE_ELSE_TRUE: /* else after else */
4392 case IFSTATE_ELSE_FALSE: /* else after else */
4393 default:
4394 /* dead code if conditional check is ok */
4395 Assert(false);
4398 else if (command->meta == META_ENDIF)
4400 Assert(!conditional_stack_empty(st->cstack));
4401 conditional_stack_pop(st->cstack);
4403 else if (command->meta == META_SETSHELL)
4405 if (!runShellCommand(&st->variables, argv[1], argv + 2, argc - 2))
4407 commandFailed(st, "setshell", "execution of meta-command failed");
4408 return CSTATE_ABORTED;
4411 else if (command->meta == META_SHELL)
4413 if (!runShellCommand(&st->variables, NULL, argv + 1, argc - 1))
4415 commandFailed(st, "shell", "execution of meta-command failed");
4416 return CSTATE_ABORTED;
4419 else if (command->meta == META_STARTPIPELINE)
4422 * In pipeline mode, we use a workflow based on libpq pipeline
4423 * functions.
4425 if (querymode == QUERY_SIMPLE)
4427 commandFailed(st, "startpipeline", "cannot use pipeline mode with the simple query protocol");
4428 return CSTATE_ABORTED;
4432 * If we're in prepared-query mode, we need to prepare all the
4433 * commands that are inside the pipeline before we actually start the
4434 * pipeline itself. This solves the problem that running BEGIN
4435 * ISOLATION LEVEL SERIALIZABLE in a pipeline would fail due to a
4436 * snapshot having been acquired by the prepare within the pipeline.
4438 if (querymode == QUERY_PREPARED)
4439 prepareCommandsInPipeline(st);
4441 if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF)
4443 commandFailed(st, "startpipeline", "already in pipeline mode");
4444 return CSTATE_ABORTED;
4446 if (PQenterPipelineMode(st->con) == 0)
4448 commandFailed(st, "startpipeline", "failed to enter pipeline mode");
4449 return CSTATE_ABORTED;
4452 else if (command->meta == META_ENDPIPELINE)
4454 if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
4456 commandFailed(st, "endpipeline", "not in pipeline mode");
4457 return CSTATE_ABORTED;
4459 if (!PQpipelineSync(st->con))
4461 commandFailed(st, "endpipeline", "failed to send a pipeline sync");
4462 return CSTATE_ABORTED;
4464 /* Now wait for the PGRES_PIPELINE_SYNC and exit pipeline mode there */
4465 /* collect pending results before getting out of pipeline mode */
4466 return CSTATE_WAIT_RESULT;
4470 * executing the expression or shell command might have taken a
4471 * non-negligible amount of time, so reset 'now'
4473 *now = 0;
4475 return CSTATE_END_COMMAND;
4479 * Return the number of failed transactions.
4481 static int64
4482 getFailures(const StatsData *stats)
4484 return (stats->serialization_failures +
4485 stats->deadlock_failures);
4489 * Return a string constant representing the result of a transaction
4490 * that is not successfully processed.
4492 static const char *
4493 getResultString(bool skipped, EStatus estatus)
4495 if (skipped)
4496 return "skipped";
4497 else if (failures_detailed)
4499 switch (estatus)
4501 case ESTATUS_SERIALIZATION_ERROR:
4502 return "serialization";
4503 case ESTATUS_DEADLOCK_ERROR:
4504 return "deadlock";
4505 default:
4506 /* internal error which should never occur */
4507 pg_fatal("unexpected error status: %d", estatus);
4510 else
4511 return "failed";
4515 * Print log entry after completing one transaction.
4517 * We print Unix-epoch timestamps in the log, so that entries can be
4518 * correlated against other logs.
4520 * XXX We could obtain the time from the caller and just shift it here, to
4521 * avoid the cost of an extra call to pg_time_now().
4523 static void
4524 doLog(TState *thread, CState *st,
4525 StatsData *agg, bool skipped, double latency, double lag)
4527 FILE *logfile = thread->logfile;
4528 pg_time_usec_t now = pg_time_now() + epoch_shift;
4530 Assert(use_log);
4533 * Skip the log entry if sampling is enabled and this row doesn't belong
4534 * to the random sample.
4536 if (sample_rate != 0.0 &&
4537 pg_prng_double(&thread->ts_sample_rs) > sample_rate)
4538 return;
4540 /* should we aggregate the results or not? */
4541 if (agg_interval > 0)
4543 pg_time_usec_t next;
4546 * Loop until we reach the interval of the current moment, and print
4547 * any empty intervals in between (this may happen with very low tps,
4548 * e.g. --rate=0.1).
4551 while ((next = agg->start_time + agg_interval * INT64CONST(1000000)) <= now)
4553 double lag_sum = 0.0;
4554 double lag_sum2 = 0.0;
4555 double lag_min = 0.0;
4556 double lag_max = 0.0;
4557 int64 skipped = 0;
4558 int64 serialization_failures = 0;
4559 int64 deadlock_failures = 0;
4560 int64 retried = 0;
4561 int64 retries = 0;
4563 /* print aggregated report to logfile */
4564 fprintf(logfile, INT64_FORMAT " " INT64_FORMAT " %.0f %.0f %.0f %.0f",
4565 agg->start_time / 1000000, /* seconds since Unix epoch */
4566 agg->cnt,
4567 agg->latency.sum,
4568 agg->latency.sum2,
4569 agg->latency.min,
4570 agg->latency.max);
4572 if (throttle_delay)
4574 lag_sum = agg->lag.sum;
4575 lag_sum2 = agg->lag.sum2;
4576 lag_min = agg->lag.min;
4577 lag_max = agg->lag.max;
4579 fprintf(logfile, " %.0f %.0f %.0f %.0f",
4580 lag_sum,
4581 lag_sum2,
4582 lag_min,
4583 lag_max);
4585 if (latency_limit)
4586 skipped = agg->skipped;
4587 fprintf(logfile, " " INT64_FORMAT, skipped);
4589 if (max_tries != 1)
4591 retried = agg->retried;
4592 retries = agg->retries;
4594 fprintf(logfile, " " INT64_FORMAT " " INT64_FORMAT, retried, retries);
4596 if (failures_detailed)
4598 serialization_failures = agg->serialization_failures;
4599 deadlock_failures = agg->deadlock_failures;
4601 fprintf(logfile, " " INT64_FORMAT " " INT64_FORMAT,
4602 serialization_failures,
4603 deadlock_failures);
4605 fputc('\n', logfile);
4607 /* reset data and move to next interval */
4608 initStats(agg, next);
4611 /* accumulate the current transaction */
4612 accumStats(agg, skipped, latency, lag, st->estatus, st->tries);
4614 else
4616 /* no, print raw transactions */
4617 if (!skipped && st->estatus == ESTATUS_NO_ERROR)
4618 fprintf(logfile, "%d " INT64_FORMAT " %.0f %d " INT64_FORMAT " "
4619 INT64_FORMAT,
4620 st->id, st->cnt, latency, st->use_file,
4621 now / 1000000, now % 1000000);
4622 else
4623 fprintf(logfile, "%d " INT64_FORMAT " %s %d " INT64_FORMAT " "
4624 INT64_FORMAT,
4625 st->id, st->cnt, getResultString(skipped, st->estatus),
4626 st->use_file, now / 1000000, now % 1000000);
4628 if (throttle_delay)
4629 fprintf(logfile, " %.0f", lag);
4630 if (max_tries != 1)
4631 fprintf(logfile, " %u", st->tries - 1);
4632 fputc('\n', logfile);
4637 * Accumulate and report statistics at end of a transaction.
4639 * (This is also called when a transaction is late and thus skipped.
4640 * Note that even skipped and failed transactions are counted in the CState
4641 * "cnt" field.)
4643 static void
4644 processXactStats(TState *thread, CState *st, pg_time_usec_t *now,
4645 bool skipped, StatsData *agg)
4647 double latency = 0.0,
4648 lag = 0.0;
4649 bool detailed = progress || throttle_delay || latency_limit ||
4650 use_log || per_script_stats;
4652 if (detailed && !skipped && st->estatus == ESTATUS_NO_ERROR)
4654 pg_time_now_lazy(now);
4656 /* compute latency & lag */
4657 latency = (*now) - st->txn_scheduled;
4658 lag = st->txn_begin - st->txn_scheduled;
4661 /* keep detailed thread stats */
4662 accumStats(&thread->stats, skipped, latency, lag, st->estatus, st->tries);
4664 /* count transactions over the latency limit, if needed */
4665 if (latency_limit && latency > latency_limit)
4666 thread->latency_late++;
4668 /* client stat is just counting */
4669 st->cnt++;
4671 if (use_log)
4672 doLog(thread, st, agg, skipped, latency, lag);
4674 /* XXX could use a mutex here, but we choose not to */
4675 if (per_script_stats)
4676 accumStats(&sql_script[st->use_file].stats, skipped, latency, lag,
4677 st->estatus, st->tries);
4681 /* discard connections */
4682 static void
4683 disconnect_all(CState *state, int length)
4685 int i;
4687 for (i = 0; i < length; i++)
4688 finishCon(&state[i]);
4692 * Remove old pgbench tables, if any exist
4694 static void
4695 initDropTables(PGconn *con)
4697 fprintf(stderr, "dropping old tables...\n");
4700 * We drop all the tables in one command, so that whether there are
4701 * foreign key dependencies or not doesn't matter.
4703 executeStatement(con, "drop table if exists "
4704 "pgbench_accounts, "
4705 "pgbench_branches, "
4706 "pgbench_history, "
4707 "pgbench_tellers");
4711 * Create "pgbench_accounts" partitions if needed.
4713 * This is the larger table of pgbench default tpc-b like schema
4714 * with a known size, so we choose to partition it.
4716 static void
4717 createPartitions(PGconn *con)
4719 PQExpBufferData query;
4721 /* we must have to create some partitions */
4722 Assert(partitions > 0);
4724 fprintf(stderr, "creating %d partitions...\n", partitions);
4726 initPQExpBuffer(&query);
4728 for (int p = 1; p <= partitions; p++)
4730 if (partition_method == PART_RANGE)
4732 int64 part_size = (naccounts * (int64) scale + partitions - 1) / partitions;
4734 printfPQExpBuffer(&query,
4735 "create%s table pgbench_accounts_%d\n"
4736 " partition of pgbench_accounts\n"
4737 " for values from (",
4738 unlogged_tables ? " unlogged" : "", p);
4741 * For RANGE, we use open-ended partitions at the beginning and
4742 * end to allow any valid value for the primary key. Although the
4743 * actual minimum and maximum values can be derived from the
4744 * scale, it is more generic and the performance is better.
4746 if (p == 1)
4747 appendPQExpBufferStr(&query, "minvalue");
4748 else
4749 appendPQExpBuffer(&query, INT64_FORMAT, (p - 1) * part_size + 1);
4751 appendPQExpBufferStr(&query, ") to (");
4753 if (p < partitions)
4754 appendPQExpBuffer(&query, INT64_FORMAT, p * part_size + 1);
4755 else
4756 appendPQExpBufferStr(&query, "maxvalue");
4758 appendPQExpBufferChar(&query, ')');
4760 else if (partition_method == PART_HASH)
4761 printfPQExpBuffer(&query,
4762 "create%s table pgbench_accounts_%d\n"
4763 " partition of pgbench_accounts\n"
4764 " for values with (modulus %d, remainder %d)",
4765 unlogged_tables ? " unlogged" : "", p,
4766 partitions, p - 1);
4767 else /* cannot get there */
4768 Assert(0);
4771 * Per ddlinfo in initCreateTables, fillfactor is needed on table
4772 * pgbench_accounts.
4774 appendPQExpBuffer(&query, " with (fillfactor=%d)", fillfactor);
4776 executeStatement(con, query.data);
4779 termPQExpBuffer(&query);
4783 * Create pgbench's standard tables
4785 static void
4786 initCreateTables(PGconn *con)
4789 * Note: TPC-B requires at least 100 bytes per row, and the "filler"
4790 * fields in these table declarations were intended to comply with that.
4791 * The pgbench_accounts table complies with that because the "filler"
4792 * column is set to blank-padded empty string. But for all other tables
4793 * the columns default to NULL and so don't actually take any space. We
4794 * could fix that by giving them non-null default values. However, that
4795 * would completely break comparability of pgbench results with prior
4796 * versions. Since pgbench has never pretended to be fully TPC-B compliant
4797 * anyway, we stick with the historical behavior.
4799 struct ddlinfo
4801 const char *table; /* table name */
4802 const char *smcols; /* column decls if accountIDs are 32 bits */
4803 const char *bigcols; /* column decls if accountIDs are 64 bits */
4804 int declare_fillfactor;
4806 static const struct ddlinfo DDLs[] = {
4808 "pgbench_history",
4809 "tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)",
4810 "tid int,bid int,aid bigint,delta int,mtime timestamp,filler char(22)",
4814 "pgbench_tellers",
4815 "tid int not null,bid int,tbalance int,filler char(84)",
4816 "tid int not null,bid int,tbalance int,filler char(84)",
4820 "pgbench_accounts",
4821 "aid int not null,bid int,abalance int,filler char(84)",
4822 "aid bigint not null,bid int,abalance int,filler char(84)",
4826 "pgbench_branches",
4827 "bid int not null,bbalance int,filler char(88)",
4828 "bid int not null,bbalance int,filler char(88)",
4832 int i;
4833 PQExpBufferData query;
4835 fprintf(stderr, "creating tables...\n");
4837 initPQExpBuffer(&query);
4839 for (i = 0; i < lengthof(DDLs); i++)
4841 const struct ddlinfo *ddl = &DDLs[i];
4843 /* Construct new create table statement. */
4844 printfPQExpBuffer(&query, "create%s table %s(%s)",
4845 unlogged_tables ? " unlogged" : "",
4846 ddl->table,
4847 (scale >= SCALE_32BIT_THRESHOLD) ? ddl->bigcols : ddl->smcols);
4849 /* Partition pgbench_accounts table */
4850 if (partition_method != PART_NONE && strcmp(ddl->table, "pgbench_accounts") == 0)
4851 appendPQExpBuffer(&query,
4852 " partition by %s (aid)", PARTITION_METHOD[partition_method]);
4853 else if (ddl->declare_fillfactor)
4855 /* fillfactor is only expected on actual tables */
4856 appendPQExpBuffer(&query, " with (fillfactor=%d)", fillfactor);
4859 if (tablespace != NULL)
4861 char *escape_tablespace;
4863 escape_tablespace = PQescapeIdentifier(con, tablespace, strlen(tablespace));
4864 appendPQExpBuffer(&query, " tablespace %s", escape_tablespace);
4865 PQfreemem(escape_tablespace);
4868 executeStatement(con, query.data);
4871 termPQExpBuffer(&query);
4873 if (partition_method != PART_NONE)
4874 createPartitions(con);
4878 * Truncate away any old data, in one command in case there are foreign keys
4880 static void
4881 initTruncateTables(PGconn *con)
4883 executeStatement(con, "truncate table "
4884 "pgbench_accounts, "
4885 "pgbench_branches, "
4886 "pgbench_history, "
4887 "pgbench_tellers");
4890 static void
4891 initBranch(PQExpBufferData *sql, int64 curr)
4893 /* "filler" column uses NULL */
4894 printfPQExpBuffer(sql,
4895 INT64_FORMAT "\t0\t\\N\n",
4896 curr + 1);
4899 static void
4900 initTeller(PQExpBufferData *sql, int64 curr)
4902 /* "filler" column uses NULL */
4903 printfPQExpBuffer(sql,
4904 INT64_FORMAT "\t" INT64_FORMAT "\t0\t\\N\n",
4905 curr + 1, curr / ntellers + 1);
4908 static void
4909 initAccount(PQExpBufferData *sql, int64 curr)
4911 /* "filler" column defaults to blank padded empty string */
4912 printfPQExpBuffer(sql,
4913 INT64_FORMAT "\t" INT64_FORMAT "\t0\t\n",
4914 curr + 1, curr / naccounts + 1);
4917 static void
4918 initPopulateTable(PGconn *con, const char *table, int64 base,
4919 initRowMethod init_row)
4921 int n;
4922 int64 k;
4923 int chars = 0;
4924 PGresult *res;
4925 PQExpBufferData sql;
4926 char copy_statement[256];
4927 const char *copy_statement_fmt = "copy %s from stdin";
4928 int64 total = base * scale;
4930 /* used to track elapsed time and estimate of the remaining time */
4931 pg_time_usec_t start;
4932 int log_interval = 1;
4934 /* Stay on the same line if reporting to a terminal */
4935 char eol = isatty(fileno(stderr)) ? '\r' : '\n';
4937 initPQExpBuffer(&sql);
4940 * Use COPY with FREEZE on v14 and later for all the tables except
4941 * pgbench_accounts when it is partitioned.
4943 if (PQserverVersion(con) >= 140000)
4945 if (strcmp(table, "pgbench_accounts") != 0 ||
4946 partitions == 0)
4947 copy_statement_fmt = "copy %s from stdin with (freeze on)";
4950 n = pg_snprintf(copy_statement, sizeof(copy_statement), copy_statement_fmt, table);
4951 if (n >= sizeof(copy_statement))
4952 pg_fatal("invalid buffer size: must be at least %d characters long", n);
4953 else if (n == -1)
4954 pg_fatal("invalid format string");
4956 res = PQexec(con, copy_statement);
4958 if (PQresultStatus(res) != PGRES_COPY_IN)
4959 pg_fatal("unexpected copy in result: %s", PQerrorMessage(con));
4960 PQclear(res);
4962 start = pg_time_now();
4964 for (k = 0; k < total; k++)
4966 int64 j = k + 1;
4968 init_row(&sql, k);
4969 if (PQputline(con, sql.data))
4970 pg_fatal("PQputline failed");
4972 if (CancelRequested)
4973 break;
4976 * If we want to stick with the original logging, print a message each
4977 * 100k inserted rows.
4979 if ((!use_quiet) && (j % 100000 == 0))
4981 double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
4982 double remaining_sec = ((double) total - j) * elapsed_sec / j;
4984 chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)%c",
4985 j, total,
4986 (int) ((j * 100) / total),
4987 table, elapsed_sec, remaining_sec, eol);
4989 /* let's not call the timing for each row, but only each 100 rows */
4990 else if (use_quiet && (j % 100 == 0))
4992 double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
4993 double remaining_sec = ((double) total - j) * elapsed_sec / j;
4995 /* have we reached the next interval (or end)? */
4996 if ((j == total) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
4998 chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)%c",
4999 j, total,
5000 (int) ((j * 100) / total),
5001 table, elapsed_sec, remaining_sec, eol);
5003 /* skip to the next interval */
5004 log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
5009 if (chars != 0 && eol != '\n')
5010 fprintf(stderr, "%*c\r", chars - 1, ' '); /* Clear the current line */
5012 if (PQputline(con, "\\.\n"))
5013 pg_fatal("very last PQputline failed");
5014 if (PQendcopy(con))
5015 pg_fatal("PQendcopy failed");
5017 termPQExpBuffer(&sql);
5021 * Fill the standard tables with some data generated and sent from the client.
5023 * The filler column is NULL in pgbench_branches and pgbench_tellers, and is
5024 * a blank-padded string in pgbench_accounts.
5026 static void
5027 initGenerateDataClientSide(PGconn *con)
5029 fprintf(stderr, "generating data (client-side)...\n");
5032 * we do all of this in one transaction to enable the backend's
5033 * data-loading optimizations
5035 executeStatement(con, "begin");
5037 /* truncate away any old data */
5038 initTruncateTables(con);
5041 * fill branches, tellers, accounts in that order in case foreign keys
5042 * already exist
5044 initPopulateTable(con, "pgbench_branches", nbranches, initBranch);
5045 initPopulateTable(con, "pgbench_tellers", ntellers, initTeller);
5046 initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
5048 executeStatement(con, "commit");
5052 * Fill the standard tables with some data generated on the server
5054 * As already the case with the client-side data generation, the filler
5055 * column defaults to NULL in pgbench_branches and pgbench_tellers,
5056 * and is a blank-padded string in pgbench_accounts.
5058 static void
5059 initGenerateDataServerSide(PGconn *con)
5061 PQExpBufferData sql;
5063 fprintf(stderr, "generating data (server-side)...\n");
5066 * we do all of this in one transaction to enable the backend's
5067 * data-loading optimizations
5069 executeStatement(con, "begin");
5071 /* truncate away any old data */
5072 initTruncateTables(con);
5074 initPQExpBuffer(&sql);
5076 printfPQExpBuffer(&sql,
5077 "insert into pgbench_branches(bid,bbalance) "
5078 "select bid, 0 "
5079 "from generate_series(1, %d) as bid", nbranches * scale);
5080 executeStatement(con, sql.data);
5082 printfPQExpBuffer(&sql,
5083 "insert into pgbench_tellers(tid,bid,tbalance) "
5084 "select tid, (tid - 1) / %d + 1, 0 "
5085 "from generate_series(1, %d) as tid", ntellers, ntellers * scale);
5086 executeStatement(con, sql.data);
5088 printfPQExpBuffer(&sql,
5089 "insert into pgbench_accounts(aid,bid,abalance,filler) "
5090 "select aid, (aid - 1) / %d + 1, 0, '' "
5091 "from generate_series(1, " INT64_FORMAT ") as aid",
5092 naccounts, (int64) naccounts * scale);
5093 executeStatement(con, sql.data);
5095 termPQExpBuffer(&sql);
5097 executeStatement(con, "commit");
5101 * Invoke vacuum on the standard tables
5103 static void
5104 initVacuum(PGconn *con)
5106 fprintf(stderr, "vacuuming...\n");
5107 executeStatement(con, "vacuum analyze pgbench_branches");
5108 executeStatement(con, "vacuum analyze pgbench_tellers");
5109 executeStatement(con, "vacuum analyze pgbench_accounts");
5110 executeStatement(con, "vacuum analyze pgbench_history");
5114 * Create primary keys on the standard tables
5116 static void
5117 initCreatePKeys(PGconn *con)
5119 static const char *const DDLINDEXes[] = {
5120 "alter table pgbench_branches add primary key (bid)",
5121 "alter table pgbench_tellers add primary key (tid)",
5122 "alter table pgbench_accounts add primary key (aid)"
5124 int i;
5125 PQExpBufferData query;
5127 fprintf(stderr, "creating primary keys...\n");
5128 initPQExpBuffer(&query);
5130 for (i = 0; i < lengthof(DDLINDEXes); i++)
5132 resetPQExpBuffer(&query);
5133 appendPQExpBufferStr(&query, DDLINDEXes[i]);
5135 if (index_tablespace != NULL)
5137 char *escape_tablespace;
5139 escape_tablespace = PQescapeIdentifier(con, index_tablespace,
5140 strlen(index_tablespace));
5141 appendPQExpBuffer(&query, " using index tablespace %s", escape_tablespace);
5142 PQfreemem(escape_tablespace);
5145 executeStatement(con, query.data);
5148 termPQExpBuffer(&query);
5152 * Create foreign key constraints between the standard tables
5154 static void
5155 initCreateFKeys(PGconn *con)
5157 static const char *const DDLKEYs[] = {
5158 "alter table pgbench_tellers add constraint pgbench_tellers_bid_fkey foreign key (bid) references pgbench_branches",
5159 "alter table pgbench_accounts add constraint pgbench_accounts_bid_fkey foreign key (bid) references pgbench_branches",
5160 "alter table pgbench_history add constraint pgbench_history_bid_fkey foreign key (bid) references pgbench_branches",
5161 "alter table pgbench_history add constraint pgbench_history_tid_fkey foreign key (tid) references pgbench_tellers",
5162 "alter table pgbench_history add constraint pgbench_history_aid_fkey foreign key (aid) references pgbench_accounts"
5164 int i;
5166 fprintf(stderr, "creating foreign keys...\n");
5167 for (i = 0; i < lengthof(DDLKEYs); i++)
5169 executeStatement(con, DDLKEYs[i]);
5174 * Validate an initialization-steps string
5176 * (We could just leave it to runInitSteps() to fail if there are wrong
5177 * characters, but since initialization can take awhile, it seems friendlier
5178 * to check during option parsing.)
5180 static void
5181 checkInitSteps(const char *initialize_steps)
5183 if (initialize_steps[0] == '\0')
5184 pg_fatal("no initialization steps specified");
5186 for (const char *step = initialize_steps; *step != '\0'; step++)
5188 if (strchr(ALL_INIT_STEPS " ", *step) == NULL)
5190 pg_log_error("unrecognized initialization step \"%c\"", *step);
5191 pg_log_error_detail("Allowed step characters are: \"" ALL_INIT_STEPS "\".");
5192 exit(1);
5198 * Invoke each initialization step in the given string
5200 static void
5201 runInitSteps(const char *initialize_steps)
5203 PQExpBufferData stats;
5204 PGconn *con;
5205 const char *step;
5206 double run_time = 0.0;
5207 bool first = true;
5209 initPQExpBuffer(&stats);
5211 if ((con = doConnect()) == NULL)
5212 pg_fatal("could not create connection for initialization");
5214 setup_cancel_handler(NULL);
5215 SetCancelConn(con);
5217 for (step = initialize_steps; *step != '\0'; step++)
5219 char *op = NULL;
5220 pg_time_usec_t start = pg_time_now();
5222 switch (*step)
5224 case 'd':
5225 op = "drop tables";
5226 initDropTables(con);
5227 break;
5228 case 't':
5229 op = "create tables";
5230 initCreateTables(con);
5231 break;
5232 case 'g':
5233 op = "client-side generate";
5234 initGenerateDataClientSide(con);
5235 break;
5236 case 'G':
5237 op = "server-side generate";
5238 initGenerateDataServerSide(con);
5239 break;
5240 case 'v':
5241 op = "vacuum";
5242 initVacuum(con);
5243 break;
5244 case 'p':
5245 op = "primary keys";
5246 initCreatePKeys(con);
5247 break;
5248 case 'f':
5249 op = "foreign keys";
5250 initCreateFKeys(con);
5251 break;
5252 case ' ':
5253 break; /* ignore */
5254 default:
5255 pg_log_error("unrecognized initialization step \"%c\"", *step);
5256 PQfinish(con);
5257 exit(1);
5260 if (op != NULL)
5262 double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
5264 if (!first)
5265 appendPQExpBufferStr(&stats, ", ");
5266 else
5267 first = false;
5269 appendPQExpBuffer(&stats, "%s %.2f s", op, elapsed_sec);
5271 run_time += elapsed_sec;
5275 fprintf(stderr, "done in %.2f s (%s).\n", run_time, stats.data);
5276 ResetCancelConn();
5277 PQfinish(con);
5278 termPQExpBuffer(&stats);
5282 * Extract pgbench table information into global variables scale,
5283 * partition_method and partitions.
5285 static void
5286 GetTableInfo(PGconn *con, bool scale_given)
5288 PGresult *res;
5291 * get the scaling factor that should be same as count(*) from
5292 * pgbench_branches if this is not a custom query
5294 res = PQexec(con, "select count(*) from pgbench_branches");
5295 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5297 char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
5299 pg_log_error("could not count number of branches: %s", PQerrorMessage(con));
5301 if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) == 0)
5302 pg_log_error_hint("Perhaps you need to do initialization (\"pgbench -i\") in database \"%s\".",
5303 PQdb(con));
5305 exit(1);
5307 scale = atoi(PQgetvalue(res, 0, 0));
5308 if (scale < 0)
5309 pg_fatal("invalid count(*) from pgbench_branches: \"%s\"",
5310 PQgetvalue(res, 0, 0));
5311 PQclear(res);
5313 /* warn if we override user-given -s switch */
5314 if (scale_given)
5315 pg_log_warning("scale option ignored, using count from pgbench_branches table (%d)",
5316 scale);
5319 * Get the partition information for the first "pgbench_accounts" table
5320 * found in search_path.
5322 * The result is empty if no "pgbench_accounts" is found.
5324 * Otherwise, it always returns one row even if the table is not
5325 * partitioned (in which case the partition strategy is NULL).
5327 * The number of partitions can be 0 even for partitioned tables, if no
5328 * partition is attached.
5330 * We assume no partitioning on any failure, so as to avoid failing on an
5331 * old version without "pg_partitioned_table".
5333 res = PQexec(con,
5334 "select o.n, p.partstrat, pg_catalog.count(i.inhparent) "
5335 "from pg_catalog.pg_class as c "
5336 "join pg_catalog.pg_namespace as n on (n.oid = c.relnamespace) "
5337 "cross join lateral (select pg_catalog.array_position(pg_catalog.current_schemas(true), n.nspname)) as o(n) "
5338 "left join pg_catalog.pg_partitioned_table as p on (p.partrelid = c.oid) "
5339 "left join pg_catalog.pg_inherits as i on (c.oid = i.inhparent) "
5340 "where c.relname = 'pgbench_accounts' and o.n is not null "
5341 "group by 1, 2 "
5342 "order by 1 asc "
5343 "limit 1");
5345 if (PQresultStatus(res) != PGRES_TUPLES_OK)
5347 /* probably an older version, coldly assume no partitioning */
5348 partition_method = PART_NONE;
5349 partitions = 0;
5351 else if (PQntuples(res) == 0)
5354 * This case is unlikely as pgbench already found "pgbench_branches"
5355 * above to compute the scale.
5357 pg_log_error("no pgbench_accounts table found in search_path");
5358 pg_log_error_hint("Perhaps you need to do initialization (\"pgbench -i\") in database \"%s\".", PQdb(con));
5359 exit(1);
5361 else /* PQntuples(res) == 1 */
5363 /* normal case, extract partition information */
5364 if (PQgetisnull(res, 0, 1))
5365 partition_method = PART_NONE;
5366 else
5368 char *ps = PQgetvalue(res, 0, 1);
5370 /* column must be there */
5371 Assert(ps != NULL);
5373 if (strcmp(ps, "r") == 0)
5374 partition_method = PART_RANGE;
5375 else if (strcmp(ps, "h") == 0)
5376 partition_method = PART_HASH;
5377 else
5379 /* possibly a newer version with new partition method */
5380 pg_fatal("unexpected partition method: \"%s\"", ps);
5384 partitions = atoi(PQgetvalue(res, 0, 2));
5387 PQclear(res);
5391 * Replace :param with $n throughout the command's SQL text, which
5392 * is a modifiable string in cmd->lines.
5394 static bool
5395 parseQuery(Command *cmd)
5397 char *sql,
5400 cmd->argc = 1;
5402 p = sql = pg_strdup(cmd->lines.data);
5403 while ((p = strchr(p, ':')) != NULL)
5405 char var[13];
5406 char *name;
5407 int eaten;
5409 name = parseVariable(p, &eaten);
5410 if (name == NULL)
5412 while (*p == ':')
5414 p++;
5416 continue;
5420 * cmd->argv[0] is the SQL statement itself, so the max number of
5421 * arguments is one less than MAX_ARGS
5423 if (cmd->argc >= MAX_ARGS)
5425 pg_log_error("statement has too many arguments (maximum is %d): %s",
5426 MAX_ARGS - 1, cmd->lines.data);
5427 pg_free(name);
5428 return false;
5431 sprintf(var, "$%d", cmd->argc);
5432 p = replaceVariable(&sql, p, eaten, var);
5434 cmd->argv[cmd->argc] = name;
5435 cmd->argc++;
5438 Assert(cmd->argv[0] == NULL);
5439 cmd->argv[0] = sql;
5440 return true;
5444 * syntax error while parsing a script (in practice, while parsing a
5445 * backslash command, because we don't detect syntax errors in SQL)
5447 * source: source of script (filename or builtin-script ID)
5448 * lineno: line number within script (count from 1)
5449 * line: whole line of backslash command, if available
5450 * command: backslash command name, if available
5451 * msg: the actual error message
5452 * more: optional extra message
5453 * column: zero-based column number, or -1 if unknown
5455 void
5456 syntax_error(const char *source, int lineno,
5457 const char *line, const char *command,
5458 const char *msg, const char *more, int column)
5460 PQExpBufferData buf;
5462 initPQExpBuffer(&buf);
5464 printfPQExpBuffer(&buf, "%s:%d: %s", source, lineno, msg);
5465 if (more != NULL)
5466 appendPQExpBuffer(&buf, " (%s)", more);
5467 if (column >= 0 && line == NULL)
5468 appendPQExpBuffer(&buf, " at column %d", column + 1);
5469 if (command != NULL)
5470 appendPQExpBuffer(&buf, " in command \"%s\"", command);
5472 pg_log_error("%s", buf.data);
5474 termPQExpBuffer(&buf);
5476 if (line != NULL)
5478 fprintf(stderr, "%s\n", line);
5479 if (column >= 0)
5480 fprintf(stderr, "%*c error found here\n", column + 1, '^');
5483 exit(1);
/*
 * Return a pointer to the first character of sql_command that is neither
 * whitespace nor part of a "--" comment (a "--" comment runs to the next
 * newline).  Returns NULL if the string holds nothing else, including when
 * it ends inside an unterminated "--" comment.
 */
static char *
skip_sql_comments(char *sql_command)
{
	char	   *cursor = sql_command;

	while (*cursor != '\0')
	{
		if (isspace((unsigned char) *cursor))
		{
			cursor++;
			continue;
		}

		/* first significant character found */
		if (cursor[0] != '-' || cursor[1] != '-')
			return cursor;

		/* skip a "--" comment through its terminating newline */
		cursor = strchr(cursor, '\n');
		if (cursor == NULL)
			return NULL;
		cursor++;
	}

	return NULL;
}
5520 * Parse a SQL command; return a Command struct, or NULL if it's a comment
5522 * On entry, psqlscan.l has collected the command into "buf", so we don't
5523 * really need to do much here except check for comments and set up a Command
5524 * struct.
5526 static Command *
5527 create_sql_command(PQExpBuffer buf, const char *source)
5529 Command *my_command;
5530 char *p = skip_sql_comments(buf->data);
5532 if (p == NULL)
5533 return NULL;
5535 /* Allocate and initialize Command structure */
5536 my_command = (Command *) pg_malloc(sizeof(Command));
5537 initPQExpBuffer(&my_command->lines);
5538 appendPQExpBufferStr(&my_command->lines, p);
5539 my_command->first_line = NULL; /* this is set later */
5540 my_command->type = SQL_COMMAND;
5541 my_command->meta = META_NONE;
5542 my_command->argc = 0;
5543 my_command->retries = 0;
5544 my_command->failures = 0;
5545 memset(my_command->argv, 0, sizeof(my_command->argv));
5546 my_command->varprefix = NULL; /* allocated later, if needed */
5547 my_command->expr = NULL;
5548 initSimpleStats(&my_command->stats);
5549 my_command->prepname = NULL; /* set later, if needed */
5551 return my_command;
5554 /* Free a Command structure and associated data */
5555 static void
5556 free_command(Command *command)
5558 termPQExpBuffer(&command->lines);
5559 pg_free(command->first_line);
5560 for (int i = 0; i < command->argc; i++)
5561 pg_free(command->argv[i]);
5562 pg_free(command->varprefix);
5565 * It should also free expr recursively, but this is currently not needed
5566 * as only gset commands (which do not have an expression) are freed.
5568 pg_free(command);
5572 * Once an SQL command is fully parsed, possibly by accumulating several
5573 * parts, complete other fields of the Command structure.
5575 static void
5576 postprocess_sql_command(Command *my_command)
5578 char buffer[128];
5579 static int prepnum = 0;
5581 Assert(my_command->type == SQL_COMMAND);
5583 /* Save the first line for error display. */
5584 strlcpy(buffer, my_command->lines.data, sizeof(buffer));
5585 buffer[strcspn(buffer, "\n\r")] = '\0';
5586 my_command->first_line = pg_strdup(buffer);
5588 /* Parse query and generate prepared statement name, if necessary */
5589 switch (querymode)
5591 case QUERY_SIMPLE:
5592 my_command->argv[0] = my_command->lines.data;
5593 my_command->argc++;
5594 break;
5595 case QUERY_PREPARED:
5596 my_command->prepname = psprintf("P_%d", prepnum++);
5597 /* fall through */
5598 case QUERY_EXTENDED:
5599 if (!parseQuery(my_command))
5600 exit(1);
5601 break;
5602 default:
5603 exit(1);
5608 * Parse a backslash command; return a Command struct, or NULL if comment
5610 * At call, we have scanned only the initial backslash.
5612 static Command *
5613 process_backslash_command(PsqlScanState sstate, const char *source)
5615 Command *my_command;
5616 PQExpBufferData word_buf;
5617 int word_offset;
5618 int offsets[MAX_ARGS]; /* offsets of argument words */
5619 int start_offset;
5620 int lineno;
5621 int j;
5623 initPQExpBuffer(&word_buf);
5625 /* Remember location of the backslash */
5626 start_offset = expr_scanner_offset(sstate) - 1;
5627 lineno = expr_scanner_get_lineno(sstate, start_offset);
5629 /* Collect first word of command */
5630 if (!expr_lex_one_word(sstate, &word_buf, &word_offset))
5632 termPQExpBuffer(&word_buf);
5633 return NULL;
5636 /* Allocate and initialize Command structure */
5637 my_command = (Command *) pg_malloc0(sizeof(Command));
5638 my_command->type = META_COMMAND;
5639 my_command->argc = 0;
5640 initSimpleStats(&my_command->stats);
5642 /* Save first word (command name) */
5643 j = 0;
5644 offsets[j] = word_offset;
5645 my_command->argv[j++] = pg_strdup(word_buf.data);
5646 my_command->argc++;
5648 /* ... and convert it to enum form */
5649 my_command->meta = getMetaCommand(my_command->argv[0]);
5651 if (my_command->meta == META_SET ||
5652 my_command->meta == META_IF ||
5653 my_command->meta == META_ELIF)
5655 yyscan_t yyscanner;
5657 /* For \set, collect var name */
5658 if (my_command->meta == META_SET)
5660 if (!expr_lex_one_word(sstate, &word_buf, &word_offset))
5661 syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
5662 "missing argument", NULL, -1);
5664 offsets[j] = word_offset;
5665 my_command->argv[j++] = pg_strdup(word_buf.data);
5666 my_command->argc++;
5669 /* then for all parse the expression */
5670 yyscanner = expr_scanner_init(sstate, source, lineno, start_offset,
5671 my_command->argv[0]);
5673 if (expr_yyparse(yyscanner) != 0)
5675 /* dead code: exit done from syntax_error called by yyerror */
5676 exit(1);
5679 my_command->expr = expr_parse_result;
5681 /* Save line, trimming any trailing newline */
5682 my_command->first_line =
5683 expr_scanner_get_substring(sstate,
5684 start_offset,
5685 expr_scanner_offset(sstate),
5686 true);
5688 expr_scanner_finish(yyscanner);
5690 termPQExpBuffer(&word_buf);
5692 return my_command;
5695 /* For all other commands, collect remaining words. */
5696 while (expr_lex_one_word(sstate, &word_buf, &word_offset))
5699 * my_command->argv[0] is the command itself, so the max number of
5700 * arguments is one less than MAX_ARGS
5702 if (j >= MAX_ARGS)
5703 syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
5704 "too many arguments", NULL, -1);
5706 offsets[j] = word_offset;
5707 my_command->argv[j++] = pg_strdup(word_buf.data);
5708 my_command->argc++;
5711 /* Save line, trimming any trailing newline */
5712 my_command->first_line =
5713 expr_scanner_get_substring(sstate,
5714 start_offset,
5715 expr_scanner_offset(sstate),
5716 true);
5718 if (my_command->meta == META_SLEEP)
5720 if (my_command->argc < 2)
5721 syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
5722 "missing argument", NULL, -1);
5724 if (my_command->argc > 3)
5725 syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
5726 "too many arguments", NULL,
5727 offsets[3] - start_offset);
5730 * Split argument into number and unit to allow "sleep 1ms" etc. We
5731 * don't have to terminate the number argument with null because it
5732 * will be parsed with atoi, which ignores trailing non-digit
5733 * characters.
5735 if (my_command->argv[1][0] != ':')
5737 char *c = my_command->argv[1];
5738 bool have_digit = false;
5740 /* Skip sign */
5741 if (*c == '+' || *c == '-')
5742 c++;
5744 /* Require at least one digit */
5745 if (*c && isdigit((unsigned char) *c))
5746 have_digit = true;
5748 /* Eat all digits */
5749 while (*c && isdigit((unsigned char) *c))
5750 c++;
5752 if (*c)
5754 if (my_command->argc == 2 && have_digit)
5756 my_command->argv[2] = c;
5757 offsets[2] = offsets[1] + (c - my_command->argv[1]);
5758 my_command->argc = 3;
5760 else
5763 * Raise an error if argument starts with non-digit
5764 * character (after sign).
5766 syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
5767 "invalid sleep time, must be an integer",
5768 my_command->argv[1], offsets[1] - start_offset);
5773 if (my_command->argc == 3)
5775 if (pg_strcasecmp(my_command->argv[2], "us") != 0 &&
5776 pg_strcasecmp(my_command->argv[2], "ms") != 0 &&
5777 pg_strcasecmp(my_command->argv[2], "s") != 0)
5778 syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
5779 "unrecognized time unit, must be us, ms or s",
5780 my_command->argv[2], offsets[2] - start_offset);
5783 else if (my_command->meta == META_SETSHELL)
5785 if (my_command->argc < 3)
5786 syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
5787 "missing argument", NULL, -1);
5789 else if (my_command->meta == META_SHELL)
5791 if (my_command->argc < 2)
5792 syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
5793 "missing command", NULL, -1);
5795 else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF ||
5796 my_command->meta == META_STARTPIPELINE ||
5797 my_command->meta == META_ENDPIPELINE)
5799 if (my_command->argc != 1)
5800 syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
5801 "unexpected argument", NULL, -1);
5803 else if (my_command->meta == META_GSET || my_command->meta == META_ASET)
5805 if (my_command->argc > 2)
5806 syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
5807 "too many arguments", NULL, -1);
5809 else
5811 /* my_command->meta == META_NONE */
5812 syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
5813 "invalid command", NULL, -1);
5816 termPQExpBuffer(&word_buf);
5818 return my_command;
/*
 * Report a malformed \if / \elif / \else / \endif structure in script
 * "desc" at command number cmdn (1-based), and exit.
 */
static void
ConditionError(const char *desc, int cmdn, const char *msg)
{
	const char *script_name = desc;

	pg_fatal("condition error in script \"%s\" command %d: %s",
			 script_name, cmdn, msg);
}
5829 * Partial evaluation of conditionals before recording and running the script.
5831 static void
5832 CheckConditional(const ParsedScript *ps)
5834 /* statically check conditional structure */
5835 ConditionalStack cs = conditional_stack_create();
5836 int i;
5838 for (i = 0; ps->commands[i] != NULL; i++)
5840 Command *cmd = ps->commands[i];
5842 if (cmd->type == META_COMMAND)
5844 switch (cmd->meta)
5846 case META_IF:
5847 conditional_stack_push(cs, IFSTATE_FALSE);
5848 break;
5849 case META_ELIF:
5850 if (conditional_stack_empty(cs))
5851 ConditionError(ps->desc, i + 1, "\\elif without matching \\if");
5852 if (conditional_stack_peek(cs) == IFSTATE_ELSE_FALSE)
5853 ConditionError(ps->desc, i + 1, "\\elif after \\else");
5854 break;
5855 case META_ELSE:
5856 if (conditional_stack_empty(cs))
5857 ConditionError(ps->desc, i + 1, "\\else without matching \\if");
5858 if (conditional_stack_peek(cs) == IFSTATE_ELSE_FALSE)
5859 ConditionError(ps->desc, i + 1, "\\else after \\else");
5860 conditional_stack_poke(cs, IFSTATE_ELSE_FALSE);
5861 break;
5862 case META_ENDIF:
5863 if (!conditional_stack_pop(cs))
5864 ConditionError(ps->desc, i + 1, "\\endif without matching \\if");
5865 break;
5866 default:
5867 /* ignore anything else... */
5868 break;
5872 if (!conditional_stack_empty(cs))
5873 ConditionError(ps->desc, i + 1, "\\if without matching \\endif");
5874 conditional_stack_destroy(cs);
5878 * Parse a script (either the contents of a file, or a built-in script)
5879 * and add it to the list of scripts.
5881 static void
5882 ParseScript(const char *script, const char *desc, int weight)
5884 ParsedScript ps;
5885 PsqlScanState sstate;
5886 PQExpBufferData line_buf;
5887 int alloc_num;
5888 int index;
5889 int lineno;
5890 int start_offset;
5892 #define COMMANDS_ALLOC_NUM 128
5893 alloc_num = COMMANDS_ALLOC_NUM;
5895 /* Initialize all fields of ps */
5896 ps.desc = desc;
5897 ps.weight = weight;
5898 ps.commands = (Command **) pg_malloc(sizeof(Command *) * alloc_num);
5899 initStats(&ps.stats, 0);
5901 /* Prepare to parse script */
5902 sstate = psql_scan_create(&pgbench_callbacks);
5905 * Ideally, we'd scan scripts using the encoding and stdstrings settings
5906 * we get from a DB connection. However, without major rearrangement of
5907 * pgbench's argument parsing, we can't have a DB connection at the time
5908 * we parse scripts. Using SQL_ASCII (encoding 0) should work well enough
5909 * with any backend-safe encoding, though conceivably we could be fooled
5910 * if a script file uses a client-only encoding. We also assume that
5911 * stdstrings should be true, which is a bit riskier.
5913 psql_scan_setup(sstate, script, strlen(script), 0, true);
5914 start_offset = expr_scanner_offset(sstate) - 1;
5916 initPQExpBuffer(&line_buf);
5918 index = 0;
5920 for (;;)
5922 PsqlScanResult sr;
5923 promptStatus_t prompt;
5924 Command *command = NULL;
5926 resetPQExpBuffer(&line_buf);
5927 lineno = expr_scanner_get_lineno(sstate, start_offset);
5929 sr = psql_scan(sstate, &line_buf, &prompt);
5931 /* If we collected a new SQL command, process that */
5932 command = create_sql_command(&line_buf, desc);
5934 /* store new command */
5935 if (command)
5936 ps.commands[index++] = command;
5938 /* If we reached a backslash, process that */
5939 if (sr == PSCAN_BACKSLASH)
5941 command = process_backslash_command(sstate, desc);
5943 if (command)
5946 * If this is gset or aset, merge into the preceding command.
5947 * (We don't use a command slot in this case).
5949 if (command->meta == META_GSET || command->meta == META_ASET)
5951 Command *cmd;
5953 if (index == 0)
5954 syntax_error(desc, lineno, NULL, NULL,
5955 "\\gset must follow an SQL command",
5956 NULL, -1);
5958 cmd = ps.commands[index - 1];
5960 if (cmd->type != SQL_COMMAND ||
5961 cmd->varprefix != NULL)
5962 syntax_error(desc, lineno, NULL, NULL,
5963 "\\gset must follow an SQL command",
5964 cmd->first_line, -1);
5966 /* get variable prefix */
5967 if (command->argc <= 1 || command->argv[1][0] == '\0')
5968 cmd->varprefix = pg_strdup("");
5969 else
5970 cmd->varprefix = pg_strdup(command->argv[1]);
5972 /* update the sql command meta */
5973 cmd->meta = command->meta;
5975 /* cleanup unused command */
5976 free_command(command);
5978 continue;
5981 /* Attach any other backslash command as a new command */
5982 ps.commands[index++] = command;
5987 * Since we used a command slot, allocate more if needed. Note we
5988 * always allocate one more in order to accommodate the NULL
5989 * terminator below.
5991 if (index >= alloc_num)
5993 alloc_num += COMMANDS_ALLOC_NUM;
5994 ps.commands = (Command **)
5995 pg_realloc(ps.commands, sizeof(Command *) * alloc_num);
5998 /* Done if we reached EOF */
5999 if (sr == PSCAN_INCOMPLETE || sr == PSCAN_EOL)
6000 break;
6003 ps.commands[index] = NULL;
6005 addScript(&ps);
6007 termPQExpBuffer(&line_buf);
6008 psql_scan_finish(sstate);
6009 psql_scan_destroy(sstate);
/*
 * Read the entire contents of file fd, and return it in a malloc'd,
 * NUL-terminated buffer.
 *
 * The buffer will typically be larger than necessary, but we don't care
 * in this program, because we'll free it as soon as we've parsed the script.
 * Reading stops at the first short fread(), so callers must check
 * ferror(fd) to tell a read error from EOF.
 */
static char *
read_file_contents(FILE *fd)
{
	size_t		capacity = BUFSIZ;
	size_t		length = 0;
	size_t		got;
	char	   *contents = (char *) pg_malloc(capacity);

	/* keep reading BUFSIZ-sized chunks while each one comes back full */
	while ((got = fread(contents + length, 1, BUFSIZ, fd)) == BUFSIZ)
	{
		length += got;
		capacity += BUFSIZ;
		contents = (char *) pg_realloc(contents, capacity);
	}
	length += got;

	/* the final read was short, so length < capacity */
	contents[length] = '\0';

	return contents;
}
/*
 * process_file
 *		Read the script stored in "filename" and add it to the list of
 *		scripts with the given weight.  The name "-" means standard input.
 *
 * NB: "filename" is handed on to ParseScript(), so it must point to storage
 * that won't disappear.
 */
static void
process_file(const char *filename, int weight)
{
	FILE	   *fd;
	char	   *buf;

	if (strcmp(filename, "-") != 0)
	{
		fd = fopen(filename, "r");
		if (fd == NULL)
			pg_fatal("could not open file \"%s\": %m", filename);
	}
	else
		fd = stdin;

	/* Slurp the whole file; read errors surface through ferror() below. */
	buf = read_file_contents(fd);
	if (ferror(fd))
		pg_fatal("could not read file \"%s\": %m", filename);

	/* Only close streams we opened ourselves, never stdin. */
	if (fd != stdin)
		fclose(fd);

	ParseScript(buf, filename, weight);
	free(buf);
}
6076 /* Parse the given builtin script and add it to the list. */
6077 static void
6078 process_builtin(const BuiltinScript *bi, int weight)
6080 ParseScript(bi->script, bi->desc, weight);
6083 /* show available builtin scripts */
6084 static void
6085 listAvailableScripts(void)
6087 int i;
6089 fprintf(stderr, "Available builtin scripts:\n");
6090 for (i = 0; i < lengthof(builtin_script); i++)
6091 fprintf(stderr, " %13s: %s\n", builtin_script[i].name, builtin_script[i].desc);
6092 fprintf(stderr, "\n");
6095 /* return builtin script "name" if unambiguous, fails if not found */
6096 static const BuiltinScript *
6097 findBuiltin(const char *name)
6099 int i,
6100 found = 0,
6101 len = strlen(name);
6102 const BuiltinScript *result = NULL;
6104 for (i = 0; i < lengthof(builtin_script); i++)
6106 if (strncmp(builtin_script[i].name, name, len) == 0)
6108 result = &builtin_script[i];
6109 found++;
6113 /* ok, unambiguous result */
6114 if (found == 1)
6115 return result;
6117 /* error cases */
6118 if (found == 0)
6119 pg_log_error("no builtin script found for name \"%s\"", name);
6120 else /* found > 1 */
6121 pg_log_error("ambiguous builtin name: %d builtin scripts found for prefix \"%s\"", found, name);
6123 listAvailableScripts();
6124 exit(1);
6128 * Determine the weight specification from a script option (-b, -f), if any,
6129 * and return it as an integer (1 is returned if there's no weight). The
6130 * script name is returned in *script as a malloc'd string.
6132 static int
6133 parseScriptWeight(const char *option, char **script)
6135 char *sep;
6136 int weight;
6138 if ((sep = strrchr(option, WSEP)))
6140 int namelen = sep - option;
6141 long wtmp;
6142 char *badp;
6144 /* generate the script name */
6145 *script = pg_malloc(namelen + 1);
6146 strncpy(*script, option, namelen);
6147 (*script)[namelen] = '\0';
6149 /* process digits of the weight spec */
6150 errno = 0;
6151 wtmp = strtol(sep + 1, &badp, 10);
6152 if (errno != 0 || badp == sep + 1 || *badp != '\0')
6153 pg_fatal("invalid weight specification: %s", sep);
6154 if (wtmp > INT_MAX || wtmp < 0)
6155 pg_fatal("weight specification out of range (0 .. %d): %lld",
6156 INT_MAX, (long long) wtmp);
6157 weight = wtmp;
6159 else
6161 *script = pg_strdup(option);
6162 weight = 1;
6165 return weight;
6168 /* append a script to the list of scripts to process */
6169 static void
6170 addScript(const ParsedScript *script)
6172 if (script->commands == NULL || script->commands[0] == NULL)
6173 pg_fatal("empty command list for script \"%s\"", script->desc);
6175 if (num_scripts >= MAX_SCRIPTS)
6176 pg_fatal("at most %d SQL scripts are allowed", MAX_SCRIPTS);
6178 CheckConditional(script);
6180 sql_script[num_scripts] = *script;
6181 num_scripts++;
6185 * Print progress report.
6187 * On entry, *last and *last_report contain the statistics and time of last
6188 * progress report. On exit, they are updated with the new stats.
6190 static void
6191 printProgressReport(TState *threads, int64 test_start, pg_time_usec_t now,
6192 StatsData *last, int64 *last_report)
6194 /* generate and show report */
6195 pg_time_usec_t run = now - *last_report;
6196 int64 cnt,
6197 failures,
6198 retried;
6199 double tps,
6200 total_run,
6201 latency,
6202 sqlat,
6203 lag,
6204 stdev;
6205 char tbuf[315];
6206 StatsData cur;
6209 * Add up the statistics of all threads.
6211 * XXX: No locking. There is no guarantee that we get an atomic snapshot
6212 * of the transaction count and latencies, so these figures can well be
6213 * off by a small amount. The progress report's purpose is to give a
6214 * quick overview of how the test is going, so that shouldn't matter too
6215 * much. (If a read from a 64-bit integer is not atomic, you might get a
6216 * "torn" read and completely bogus latencies though!)
6218 initStats(&cur, 0);
6219 for (int i = 0; i < nthreads; i++)
6221 mergeSimpleStats(&cur.latency, &threads[i].stats.latency);
6222 mergeSimpleStats(&cur.lag, &threads[i].stats.lag);
6223 cur.cnt += threads[i].stats.cnt;
6224 cur.skipped += threads[i].stats.skipped;
6225 cur.retries += threads[i].stats.retries;
6226 cur.retried += threads[i].stats.retried;
6227 cur.serialization_failures +=
6228 threads[i].stats.serialization_failures;
6229 cur.deadlock_failures += threads[i].stats.deadlock_failures;
6232 /* we count only actually executed transactions */
6233 cnt = cur.cnt - last->cnt;
6234 total_run = (now - test_start) / 1000000.0;
6235 tps = 1000000.0 * cnt / run;
6236 if (cnt > 0)
6238 latency = 0.001 * (cur.latency.sum - last->latency.sum) / cnt;
6239 sqlat = 1.0 * (cur.latency.sum2 - last->latency.sum2) / cnt;
6240 stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
6241 lag = 0.001 * (cur.lag.sum - last->lag.sum) / cnt;
6243 else
6245 latency = sqlat = stdev = lag = 0;
6247 failures = getFailures(&cur) - getFailures(last);
6248 retried = cur.retried - last->retried;
6250 if (progress_timestamp)
6252 snprintf(tbuf, sizeof(tbuf), "%.3f s",
6253 PG_TIME_GET_DOUBLE(now + epoch_shift));
6255 else
6257 /* round seconds are expected, but the thread may be late */
6258 snprintf(tbuf, sizeof(tbuf), "%.1f s", total_run);
6261 fprintf(stderr,
6262 "progress: %s, %.1f tps, lat %.3f ms stddev %.3f, " INT64_FORMAT " failed",
6263 tbuf, tps, latency, stdev, failures);
6265 if (throttle_delay)
6267 fprintf(stderr, ", lag %.3f ms", lag);
6268 if (latency_limit)
6269 fprintf(stderr, ", " INT64_FORMAT " skipped",
6270 cur.skipped - last->skipped);
6273 /* it can be non-zero only if max_tries is not equal to one */
6274 if (max_tries != 1)
6275 fprintf(stderr,
6276 ", " INT64_FORMAT " retried, " INT64_FORMAT " retries",
6277 retried, cur.retries - last->retries);
6278 fprintf(stderr, "\n");
6280 *last = cur;
6281 *last_report = now;
6284 static void
6285 printSimpleStats(const char *prefix, SimpleStats *ss)
6287 if (ss->count > 0)
6289 double latency = ss->sum / ss->count;
6290 double stddev = sqrt(ss->sum2 / ss->count - latency * latency);
6292 printf("%s average = %.3f ms\n", prefix, 0.001 * latency);
6293 printf("%s stddev = %.3f ms\n", prefix, 0.001 * stddev);
6297 /* print version banner */
6298 static void
6299 printVersion(PGconn *con)
6301 int server_ver = PQserverVersion(con);
6302 int client_ver = PG_VERSION_NUM;
6304 if (server_ver != client_ver)
6306 const char *server_version;
6307 char sverbuf[32];
6309 /* Try to get full text form, might include "devel" etc */
6310 server_version = PQparameterStatus(con, "server_version");
6311 /* Otherwise fall back on server_ver */
6312 if (!server_version)
6314 formatPGVersionNumber(server_ver, true,
6315 sverbuf, sizeof(sverbuf));
6316 server_version = sverbuf;
6319 printf(_("%s (%s, server %s)\n"),
6320 "pgbench", PG_VERSION, server_version);
6322 /* For version match, only print pgbench version */
6323 else
6324 printf("%s (%s)\n", "pgbench", PG_VERSION);
6325 fflush(stdout);
6328 /* print out results */
6329 static void
6330 printResults(StatsData *total,
6331 pg_time_usec_t total_duration, /* benchmarking time */
6332 pg_time_usec_t conn_total_duration, /* is_connect */
6333 pg_time_usec_t conn_elapsed_duration, /* !is_connect */
6334 int64 latency_late)
6336 /* tps is about actually executed transactions during benchmarking */
6337 int64 failures = getFailures(total);
6338 int64 total_cnt = total->cnt + total->skipped + failures;
6339 double bench_duration = PG_TIME_GET_DOUBLE(total_duration);
6340 double tps = total->cnt / bench_duration;
6342 /* Report test parameters. */
6343 printf("transaction type: %s\n",
6344 num_scripts == 1 ? sql_script[0].desc : "multiple scripts");
6345 printf("scaling factor: %d\n", scale);
6346 /* only print partitioning information if some partitioning was detected */
6347 if (partition_method != PART_NONE)
6348 printf("partition method: %s\npartitions: %d\n",
6349 PARTITION_METHOD[partition_method], partitions);
6350 printf("query mode: %s\n", QUERYMODE[querymode]);
6351 printf("number of clients: %d\n", nclients);
6352 printf("number of threads: %d\n", nthreads);
6354 if (max_tries)
6355 printf("maximum number of tries: %u\n", max_tries);
6357 if (duration <= 0)
6359 printf("number of transactions per client: %d\n", nxacts);
6360 printf("number of transactions actually processed: " INT64_FORMAT "/%d\n",
6361 total->cnt, nxacts * nclients);
6363 else
6365 printf("duration: %d s\n", duration);
6366 printf("number of transactions actually processed: " INT64_FORMAT "\n",
6367 total->cnt);
6370 printf("number of failed transactions: " INT64_FORMAT " (%.3f%%)\n",
6371 failures, 100.0 * failures / total_cnt);
6373 if (failures_detailed)
6375 printf("number of serialization failures: " INT64_FORMAT " (%.3f%%)\n",
6376 total->serialization_failures,
6377 100.0 * total->serialization_failures / total_cnt);
6378 printf("number of deadlock failures: " INT64_FORMAT " (%.3f%%)\n",
6379 total->deadlock_failures,
6380 100.0 * total->deadlock_failures / total_cnt);
6383 /* it can be non-zero only if max_tries is not equal to one */
6384 if (max_tries != 1)
6386 printf("number of transactions retried: " INT64_FORMAT " (%.3f%%)\n",
6387 total->retried, 100.0 * total->retried / total_cnt);
6388 printf("total number of retries: " INT64_FORMAT "\n", total->retries);
6391 /* Remaining stats are nonsensical if we failed to execute any xacts */
6392 if (total->cnt + total->skipped <= 0)
6393 return;
6395 if (throttle_delay && latency_limit)
6396 printf("number of transactions skipped: " INT64_FORMAT " (%.3f%%)\n",
6397 total->skipped, 100.0 * total->skipped / total_cnt);
6399 if (latency_limit)
6400 printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT "/" INT64_FORMAT " (%.3f%%)\n",
6401 latency_limit / 1000.0, latency_late, total->cnt,
6402 (total->cnt > 0) ? 100.0 * latency_late / total->cnt : 0.0);
6404 if (throttle_delay || progress || latency_limit)
6405 printSimpleStats("latency", &total->latency);
6406 else
6408 /* no measurement, show average latency computed from run time */
6409 printf("latency average = %.3f ms%s\n",
6410 0.001 * total_duration * nclients / total_cnt,
6411 failures > 0 ? " (including failures)" : "");
6414 if (throttle_delay)
6417 * Report average transaction lag under rate limit throttling. This
6418 * is the delay between scheduled and actual start times for the
6419 * transaction. The measured lag may be caused by thread/client load,
6420 * the database load, or the Poisson throttling process.
6422 printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
6423 0.001 * total->lag.sum / total->cnt, 0.001 * total->lag.max);
6427 * Under -C/--connect, each transaction incurs a significant connection
6428 * cost, it would not make much sense to ignore it in tps, and it would
6429 * not be tps anyway.
6431 * Otherwise connections are made just once at the beginning of the run
6432 * and should not impact performance but for very short run, so they are
6433 * (right)fully ignored in tps.
6435 if (is_connect)
6437 printf("average connection time = %.3f ms\n", 0.001 * conn_total_duration / (total->cnt + failures));
6438 printf("tps = %f (including reconnection times)\n", tps);
6440 else
6442 printf("initial connection time = %.3f ms\n", 0.001 * conn_elapsed_duration);
6443 printf("tps = %f (without initial connection time)\n", tps);
6446 /* Report per-script/command statistics */
6447 if (per_script_stats || report_per_command)
6449 int i;
6451 for (i = 0; i < num_scripts; i++)
6453 if (per_script_stats)
6455 StatsData *sstats = &sql_script[i].stats;
6456 int64 script_failures = getFailures(sstats);
6457 int64 script_total_cnt =
6458 sstats->cnt + sstats->skipped + script_failures;
6460 printf("SQL script %d: %s\n"
6461 " - weight: %d (targets %.1f%% of total)\n"
6462 " - " INT64_FORMAT " transactions (%.1f%% of total, tps = %f)\n",
6463 i + 1, sql_script[i].desc,
6464 sql_script[i].weight,
6465 100.0 * sql_script[i].weight / total_weight,
6466 sstats->cnt,
6467 100.0 * sstats->cnt / total->cnt,
6468 sstats->cnt / bench_duration);
6470 printf(" - number of failed transactions: " INT64_FORMAT " (%.3f%%)\n",
6471 script_failures,
6472 100.0 * script_failures / script_total_cnt);
6474 if (failures_detailed)
6476 printf(" - number of serialization failures: " INT64_FORMAT " (%.3f%%)\n",
6477 sstats->serialization_failures,
6478 (100.0 * sstats->serialization_failures /
6479 script_total_cnt));
6480 printf(" - number of deadlock failures: " INT64_FORMAT " (%.3f%%)\n",
6481 sstats->deadlock_failures,
6482 (100.0 * sstats->deadlock_failures /
6483 script_total_cnt));
6486 /* it can be non-zero only if max_tries is not equal to one */
6487 if (max_tries != 1)
6489 printf(" - number of transactions retried: " INT64_FORMAT " (%.3f%%)\n",
6490 sstats->retried,
6491 100.0 * sstats->retried / script_total_cnt);
6492 printf(" - total number of retries: " INT64_FORMAT "\n",
6493 sstats->retries);
6496 if (throttle_delay && latency_limit && script_total_cnt > 0)
6497 printf(" - number of transactions skipped: " INT64_FORMAT " (%.3f%%)\n",
6498 sstats->skipped,
6499 100.0 * sstats->skipped / script_total_cnt);
6501 printSimpleStats(" - latency", &sstats->latency);
6505 * Report per-command statistics: latencies, retries after errors,
6506 * failures (errors without retrying).
6508 if (report_per_command)
6510 Command **commands;
6512 printf("%sstatement latencies in milliseconds%s:\n",
6513 per_script_stats ? " - " : "",
6514 (max_tries == 1 ?
6515 " and failures" :
6516 ", failures and retries"));
6518 for (commands = sql_script[i].commands;
6519 *commands != NULL;
6520 commands++)
6522 SimpleStats *cstats = &(*commands)->stats;
6524 if (max_tries == 1)
6525 printf(" %11.3f %10" INT64_MODIFIER "d %s\n",
6526 (cstats->count > 0) ?
6527 1000.0 * cstats->sum / cstats->count : 0.0,
6528 (*commands)->failures,
6529 (*commands)->first_line);
6530 else
6531 printf(" %11.3f %10" INT64_MODIFIER "d %10" INT64_MODIFIER "d %s\n",
6532 (cstats->count > 0) ?
6533 1000.0 * cstats->sum / cstats->count : 0.0,
6534 (*commands)->failures,
6535 (*commands)->retries,
6536 (*commands)->first_line);
6544 * Set up a random seed according to seed parameter (NULL means default),
6545 * and initialize base_random_sequence for use in initializing other sequences.
6547 static bool
6548 set_random_seed(const char *seed)
6550 uint64 iseed;
6552 if (seed == NULL || strcmp(seed, "time") == 0)
6554 /* rely on current time */
6555 iseed = pg_time_now();
6557 else if (strcmp(seed, "rand") == 0)
6559 /* use some "strong" random source */
6560 if (!pg_strong_random(&iseed, sizeof(iseed)))
6562 pg_log_error("could not generate random seed");
6563 return false;
6566 else
6568 /* parse unsigned-int seed value */
6569 unsigned long ulseed;
6570 char garbage;
6572 /* Don't try to use UINT64_FORMAT here; it might not work for sscanf */
6573 if (sscanf(seed, "%lu%c", &ulseed, &garbage) != 1)
6575 pg_log_error("unrecognized random seed option \"%s\"", seed);
6576 pg_log_error_detail("Expecting an unsigned integer, \"time\" or \"rand\".");
6577 return false;
6579 iseed = (uint64) ulseed;
6582 if (seed != NULL)
6583 pg_log_info("setting random seed to %llu", (unsigned long long) iseed);
6585 random_seed = iseed;
6587 /* Initialize base_random_sequence using seed */
6588 pg_prng_seed(&base_random_sequence, (uint64) iseed);
6590 return true;
6594 main(int argc, char **argv)
6596 static struct option long_options[] = {
6597 /* systematic long/short named options */
6598 {"builtin", required_argument, NULL, 'b'},
6599 {"client", required_argument, NULL, 'c'},
6600 {"connect", no_argument, NULL, 'C'},
6601 {"debug", no_argument, NULL, 'd'},
6602 {"define", required_argument, NULL, 'D'},
6603 {"file", required_argument, NULL, 'f'},
6604 {"fillfactor", required_argument, NULL, 'F'},
6605 {"host", required_argument, NULL, 'h'},
6606 {"initialize", no_argument, NULL, 'i'},
6607 {"init-steps", required_argument, NULL, 'I'},
6608 {"jobs", required_argument, NULL, 'j'},
6609 {"log", no_argument, NULL, 'l'},
6610 {"latency-limit", required_argument, NULL, 'L'},
6611 {"no-vacuum", no_argument, NULL, 'n'},
6612 {"port", required_argument, NULL, 'p'},
6613 {"progress", required_argument, NULL, 'P'},
6614 {"protocol", required_argument, NULL, 'M'},
6615 {"quiet", no_argument, NULL, 'q'},
6616 {"report-per-command", no_argument, NULL, 'r'},
6617 {"rate", required_argument, NULL, 'R'},
6618 {"scale", required_argument, NULL, 's'},
6619 {"select-only", no_argument, NULL, 'S'},
6620 {"skip-some-updates", no_argument, NULL, 'N'},
6621 {"time", required_argument, NULL, 'T'},
6622 {"transactions", required_argument, NULL, 't'},
6623 {"username", required_argument, NULL, 'U'},
6624 {"vacuum-all", no_argument, NULL, 'v'},
6625 /* long-named only options */
6626 {"unlogged-tables", no_argument, NULL, 1},
6627 {"tablespace", required_argument, NULL, 2},
6628 {"index-tablespace", required_argument, NULL, 3},
6629 {"sampling-rate", required_argument, NULL, 4},
6630 {"aggregate-interval", required_argument, NULL, 5},
6631 {"progress-timestamp", no_argument, NULL, 6},
6632 {"log-prefix", required_argument, NULL, 7},
6633 {"foreign-keys", no_argument, NULL, 8},
6634 {"random-seed", required_argument, NULL, 9},
6635 {"show-script", required_argument, NULL, 10},
6636 {"partitions", required_argument, NULL, 11},
6637 {"partition-method", required_argument, NULL, 12},
6638 {"failures-detailed", no_argument, NULL, 13},
6639 {"max-tries", required_argument, NULL, 14},
6640 {"verbose-errors", no_argument, NULL, 15},
6641 {"exit-on-abort", no_argument, NULL, 16},
6642 {NULL, 0, NULL, 0}
6645 int c;
6646 bool is_init_mode = false; /* initialize mode? */
6647 char *initialize_steps = NULL;
6648 bool foreign_keys = false;
6649 bool is_no_vacuum = false;
6650 bool do_vacuum_accounts = false; /* vacuum accounts table? */
6651 int optindex;
6652 bool scale_given = false;
6654 bool benchmarking_option_set = false;
6655 bool initialization_option_set = false;
6656 bool internal_script_used = false;
6658 CState *state; /* status of clients */
6659 TState *threads; /* array of thread */
6661 pg_time_usec_t
6662 start_time, /* start up time */
6663 bench_start = 0, /* first recorded benchmarking time */
6664 conn_total_duration; /* cumulated connection time in
6665 * threads */
6666 int64 latency_late = 0;
6667 StatsData stats;
6668 int weight;
6670 int i;
6671 int nclients_dealt;
6673 #ifdef HAVE_GETRLIMIT
6674 struct rlimit rlim;
6675 #endif
6677 PGconn *con;
6678 char *env;
6680 int exit_code = 0;
6681 struct timeval tv;
6684 * Record difference between Unix time and instr_time time. We'll use
6685 * this for logging and aggregation.
6687 gettimeofday(&tv, NULL);
6688 epoch_shift = tv.tv_sec * INT64CONST(1000000) + tv.tv_usec - pg_time_now();
6690 pg_logging_init(argv[0]);
6691 progname = get_progname(argv[0]);
6693 if (argc > 1)
6695 if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
6697 usage();
6698 exit(0);
6700 if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
6702 puts("pgbench (PostgreSQL) " PG_VERSION);
6703 exit(0);
6707 state = (CState *) pg_malloc0(sizeof(CState));
6709 /* set random seed early, because it may be used while parsing scripts. */
6710 if (!set_random_seed(getenv("PGBENCH_RANDOM_SEED")))
6711 pg_fatal("error while setting random seed from PGBENCH_RANDOM_SEED environment variable");
6713 while ((c = getopt_long(argc, argv, "b:c:CdD:f:F:h:iI:j:lL:M:nNp:P:qrR:s:St:T:U:v", long_options, &optindex)) != -1)
6715 char *script;
6717 switch (c)
6719 case 'b':
6720 if (strcmp(optarg, "list") == 0)
6722 listAvailableScripts();
6723 exit(0);
6725 weight = parseScriptWeight(optarg, &script);
6726 process_builtin(findBuiltin(script), weight);
6727 benchmarking_option_set = true;
6728 internal_script_used = true;
6729 break;
6730 case 'c':
6731 benchmarking_option_set = true;
6732 if (!option_parse_int(optarg, "-c/--clients", 1, INT_MAX,
6733 &nclients))
6735 exit(1);
6737 #ifdef HAVE_GETRLIMIT
6738 if (getrlimit(RLIMIT_NOFILE, &rlim) == -1)
6739 pg_fatal("getrlimit failed: %m");
6740 if (rlim.rlim_cur < nclients + 3)
6742 pg_log_error("need at least %d open files, but system limit is %ld",
6743 nclients + 3, (long) rlim.rlim_cur);
6744 pg_log_error_hint("Reduce number of clients, or use limit/ulimit to increase the system limit.");
6745 exit(1);
6747 #endif /* HAVE_GETRLIMIT */
6748 break;
6749 case 'C':
6750 benchmarking_option_set = true;
6751 is_connect = true;
6752 break;
6753 case 'd':
6754 pg_logging_increase_verbosity();
6755 break;
6756 case 'D':
6758 char *p;
6760 benchmarking_option_set = true;
6762 if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0')
6763 pg_fatal("invalid variable definition: \"%s\"", optarg);
6765 *p++ = '\0';
6766 if (!putVariable(&state[0].variables, "option", optarg, p))
6767 exit(1);
6769 break;
6770 case 'f':
6771 weight = parseScriptWeight(optarg, &script);
6772 process_file(script, weight);
6773 benchmarking_option_set = true;
6774 break;
6775 case 'F':
6776 initialization_option_set = true;
6777 if (!option_parse_int(optarg, "-F/--fillfactor", 10, 100,
6778 &fillfactor))
6779 exit(1);
6780 break;
6781 case 'h':
6782 pghost = pg_strdup(optarg);
6783 break;
6784 case 'i':
6785 is_init_mode = true;
6786 break;
6787 case 'I':
6788 pg_free(initialize_steps);
6789 initialize_steps = pg_strdup(optarg);
6790 checkInitSteps(initialize_steps);
6791 initialization_option_set = true;
6792 break;
6793 case 'j': /* jobs */
6794 benchmarking_option_set = true;
6795 if (!option_parse_int(optarg, "-j/--jobs", 1, INT_MAX,
6796 &nthreads))
6798 exit(1);
6800 break;
6801 case 'l':
6802 benchmarking_option_set = true;
6803 use_log = true;
6804 break;
6805 case 'L':
6807 double limit_ms = atof(optarg);
6809 if (limit_ms <= 0.0)
6810 pg_fatal("invalid latency limit: \"%s\"", optarg);
6811 benchmarking_option_set = true;
6812 latency_limit = (int64) (limit_ms * 1000);
6814 break;
6815 case 'M':
6816 benchmarking_option_set = true;
6817 for (querymode = 0; querymode < NUM_QUERYMODE; querymode++)
6818 if (strcmp(optarg, QUERYMODE[querymode]) == 0)
6819 break;
6820 if (querymode >= NUM_QUERYMODE)
6821 pg_fatal("invalid query mode (-M): \"%s\"", optarg);
6822 break;
6823 case 'n':
6824 is_no_vacuum = true;
6825 break;
6826 case 'N':
6827 process_builtin(findBuiltin("simple-update"), 1);
6828 benchmarking_option_set = true;
6829 internal_script_used = true;
6830 break;
6831 case 'p':
6832 pgport = pg_strdup(optarg);
6833 break;
6834 case 'P':
6835 benchmarking_option_set = true;
6836 if (!option_parse_int(optarg, "-P/--progress", 1, INT_MAX,
6837 &progress))
6838 exit(1);
6839 break;
6840 case 'q':
6841 initialization_option_set = true;
6842 use_quiet = true;
6843 break;
6844 case 'r':
6845 benchmarking_option_set = true;
6846 report_per_command = true;
6847 break;
6848 case 'R':
6850 /* get a double from the beginning of option value */
6851 double throttle_value = atof(optarg);
6853 benchmarking_option_set = true;
6855 if (throttle_value <= 0.0)
6856 pg_fatal("invalid rate limit: \"%s\"", optarg);
6857 /* Invert rate limit into per-transaction delay in usec */
6858 throttle_delay = 1000000.0 / throttle_value;
6860 break;
6861 case 's':
6862 scale_given = true;
6863 if (!option_parse_int(optarg, "-s/--scale", 1, INT_MAX,
6864 &scale))
6865 exit(1);
6866 break;
6867 case 'S':
6868 process_builtin(findBuiltin("select-only"), 1);
6869 benchmarking_option_set = true;
6870 internal_script_used = true;
6871 break;
6872 case 't':
6873 benchmarking_option_set = true;
6874 if (!option_parse_int(optarg, "-t/--transactions", 1, INT_MAX,
6875 &nxacts))
6876 exit(1);
6877 break;
6878 case 'T':
6879 benchmarking_option_set = true;
6880 if (!option_parse_int(optarg, "-T/--time", 1, INT_MAX,
6881 &duration))
6882 exit(1);
6883 break;
6884 case 'U':
6885 username = pg_strdup(optarg);
6886 break;
6887 case 'v':
6888 benchmarking_option_set = true;
6889 do_vacuum_accounts = true;
6890 break;
6891 case 1: /* unlogged-tables */
6892 initialization_option_set = true;
6893 unlogged_tables = true;
6894 break;
6895 case 2: /* tablespace */
6896 initialization_option_set = true;
6897 tablespace = pg_strdup(optarg);
6898 break;
6899 case 3: /* index-tablespace */
6900 initialization_option_set = true;
6901 index_tablespace = pg_strdup(optarg);
6902 break;
6903 case 4: /* sampling-rate */
6904 benchmarking_option_set = true;
6905 sample_rate = atof(optarg);
6906 if (sample_rate <= 0.0 || sample_rate > 1.0)
6907 pg_fatal("invalid sampling rate: \"%s\"", optarg);
6908 break;
6909 case 5: /* aggregate-interval */
6910 benchmarking_option_set = true;
6911 if (!option_parse_int(optarg, "--aggregate-interval", 1, INT_MAX,
6912 &agg_interval))
6913 exit(1);
6914 break;
6915 case 6: /* progress-timestamp */
6916 progress_timestamp = true;
6917 benchmarking_option_set = true;
6918 break;
6919 case 7: /* log-prefix */
6920 benchmarking_option_set = true;
6921 logfile_prefix = pg_strdup(optarg);
6922 break;
6923 case 8: /* foreign-keys */
6924 initialization_option_set = true;
6925 foreign_keys = true;
6926 break;
6927 case 9: /* random-seed */
6928 benchmarking_option_set = true;
6929 if (!set_random_seed(optarg))
6930 pg_fatal("error while setting random seed from --random-seed option");
6931 break;
6932 case 10: /* list */
6934 const BuiltinScript *s = findBuiltin(optarg);
6936 fprintf(stderr, "-- %s: %s\n%s\n", s->name, s->desc, s->script);
6937 exit(0);
6939 break;
6940 case 11: /* partitions */
6941 initialization_option_set = true;
6942 if (!option_parse_int(optarg, "--partitions", 0, INT_MAX,
6943 &partitions))
6944 exit(1);
6945 break;
6946 case 12: /* partition-method */
6947 initialization_option_set = true;
6948 if (pg_strcasecmp(optarg, "range") == 0)
6949 partition_method = PART_RANGE;
6950 else if (pg_strcasecmp(optarg, "hash") == 0)
6951 partition_method = PART_HASH;
6952 else
6953 pg_fatal("invalid partition method, expecting \"range\" or \"hash\", got: \"%s\"",
6954 optarg);
6955 break;
6956 case 13: /* failures-detailed */
6957 benchmarking_option_set = true;
6958 failures_detailed = true;
6959 break;
6960 case 14: /* max-tries */
6962 int32 max_tries_arg = atoi(optarg);
6964 if (max_tries_arg < 0)
6965 pg_fatal("invalid number of maximum tries: \"%s\"", optarg);
6967 benchmarking_option_set = true;
6968 max_tries = (uint32) max_tries_arg;
6970 break;
6971 case 15: /* verbose-errors */
6972 benchmarking_option_set = true;
6973 verbose_errors = true;
6974 break;
6975 case 16: /* exit-on-abort */
6976 benchmarking_option_set = true;
6977 exit_on_abort = true;
6978 break;
6979 default:
6980 /* getopt_long already emitted a complaint */
6981 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
6982 exit(1);
6986 /* set default script if none */
6987 if (num_scripts == 0 && !is_init_mode)
6989 process_builtin(findBuiltin("tpcb-like"), 1);
6990 benchmarking_option_set = true;
6991 internal_script_used = true;
6994 /* complete SQL command initialization and compute total weight */
6995 for (i = 0; i < num_scripts; i++)
6997 Command **commands = sql_script[i].commands;
6999 for (int j = 0; commands[j] != NULL; j++)
7000 if (commands[j]->type == SQL_COMMAND)
7001 postprocess_sql_command(commands[j]);
7003 /* cannot overflow: weight is 32b, total_weight 64b */
7004 total_weight += sql_script[i].weight;
7007 if (total_weight == 0 && !is_init_mode)
7008 pg_fatal("total script weight must not be zero");
7010 /* show per script stats if several scripts are used */
7011 if (num_scripts > 1)
7012 per_script_stats = true;
7015 * Don't need more threads than there are clients. (This is not merely an
7016 * optimization; throttle_delay is calculated incorrectly below if some
7017 * threads have no clients assigned to them.)
7019 if (nthreads > nclients)
7020 nthreads = nclients;
7023 * Convert throttle_delay to a per-thread delay time. Note that this
7024 * might be a fractional number of usec, but that's OK, since it's just
7025 * the center of a Poisson distribution of delays.
7027 throttle_delay *= nthreads;
7029 if (argc > optind)
7030 dbName = argv[optind++];
7031 else
7033 if ((env = getenv("PGDATABASE")) != NULL && *env != '\0')
7034 dbName = env;
7035 else if ((env = getenv("PGUSER")) != NULL && *env != '\0')
7036 dbName = env;
7037 else
7038 dbName = get_user_name_or_exit(progname);
7041 if (optind < argc)
7043 pg_log_error("too many command-line arguments (first is \"%s\")",
7044 argv[optind]);
7045 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
7046 exit(1);
7049 if (is_init_mode)
7051 if (benchmarking_option_set)
7052 pg_fatal("some of the specified options cannot be used in initialization (-i) mode");
7054 if (partitions == 0 && partition_method != PART_NONE)
7055 pg_fatal("--partition-method requires greater than zero --partitions");
7057 /* set default method */
7058 if (partitions > 0 && partition_method == PART_NONE)
7059 partition_method = PART_RANGE;
7061 if (initialize_steps == NULL)
7062 initialize_steps = pg_strdup(DEFAULT_INIT_STEPS);
7064 if (is_no_vacuum)
7066 /* Remove any vacuum step in initialize_steps */
7067 char *p;
7069 while ((p = strchr(initialize_steps, 'v')) != NULL)
7070 *p = ' ';
7073 if (foreign_keys)
7075 /* Add 'f' to end of initialize_steps, if not already there */
7076 if (strchr(initialize_steps, 'f') == NULL)
7078 initialize_steps = (char *)
7079 pg_realloc(initialize_steps,
7080 strlen(initialize_steps) + 2);
7081 strcat(initialize_steps, "f");
7085 runInitSteps(initialize_steps);
7086 exit(0);
7088 else
7090 if (initialization_option_set)
7091 pg_fatal("some of the specified options cannot be used in benchmarking mode");
7094 if (nxacts > 0 && duration > 0)
7095 pg_fatal("specify either a number of transactions (-t) or a duration (-T), not both");
7097 /* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */
7098 if (nxacts <= 0 && duration <= 0)
7099 nxacts = DEFAULT_NXACTS;
7101 /* --sampling-rate may be used only with -l */
7102 if (sample_rate > 0.0 && !use_log)
7103 pg_fatal("log sampling (--sampling-rate) is allowed only when logging transactions (-l)");
7105 /* --sampling-rate may not be used with --aggregate-interval */
7106 if (sample_rate > 0.0 && agg_interval > 0)
7107 pg_fatal("log sampling (--sampling-rate) and aggregation (--aggregate-interval) cannot be used at the same time");
7109 if (agg_interval > 0 && !use_log)
7110 pg_fatal("log aggregation is allowed only when actually logging transactions");
7112 if (!use_log && logfile_prefix)
7113 pg_fatal("log file prefix (--log-prefix) is allowed only when logging transactions (-l)");
7115 if (duration > 0 && agg_interval > duration)
7116 pg_fatal("number of seconds for aggregation (%d) must not be higher than test duration (%d)", agg_interval, duration);
7118 if (duration > 0 && agg_interval > 0 && duration % agg_interval != 0)
7119 pg_fatal("duration (%d) must be a multiple of aggregation interval (%d)", duration, agg_interval);
7121 if (progress_timestamp && progress == 0)
7122 pg_fatal("--progress-timestamp is allowed only under --progress");
7124 if (!max_tries)
7126 if (!latency_limit && duration <= 0)
7127 pg_fatal("an unlimited number of transaction tries can only be used with --latency-limit or a duration (-T)");
7131 * save main process id in the global variable because process id will be
7132 * changed after fork.
7134 main_pid = (int) getpid();
7136 if (nclients > 1)
7138 state = (CState *) pg_realloc(state, sizeof(CState) * nclients);
7139 memset(state + 1, 0, sizeof(CState) * (nclients - 1));
7141 /* copy any -D switch values to all clients */
7142 for (i = 1; i < nclients; i++)
7144 int j;
7146 state[i].id = i;
7147 for (j = 0; j < state[0].variables.nvars; j++)
7149 Variable *var = &state[0].variables.vars[j];
7151 if (var->value.type != PGBT_NO_VALUE)
7153 if (!putVariableValue(&state[i].variables, "startup",
7154 var->name, &var->value))
7155 exit(1);
7157 else
7159 if (!putVariable(&state[i].variables, "startup",
7160 var->name, var->svalue))
7161 exit(1);
7167 /* other CState initializations */
7168 for (i = 0; i < nclients; i++)
7170 state[i].cstack = conditional_stack_create();
7171 initRandomState(&state[i].cs_func_rs);
7174 /* opening connection... */
7175 con = doConnect();
7176 if (con == NULL)
7177 pg_fatal("could not create connection for setup");
7179 /* report pgbench and server versions */
7180 printVersion(con);
7182 pg_log_debug("pghost: %s pgport: %s nclients: %d %s: %d dbName: %s",
7183 PQhost(con), PQport(con), nclients,
7184 duration <= 0 ? "nxacts" : "duration",
7185 duration <= 0 ? nxacts : duration, PQdb(con));
7187 if (internal_script_used)
7188 GetTableInfo(con, scale_given);
7191 * :scale variables normally get -s or database scale, but don't override
7192 * an explicit -D switch
7194 if (lookupVariable(&state[0].variables, "scale") == NULL)
7196 for (i = 0; i < nclients; i++)
7198 if (!putVariableInt(&state[i].variables, "startup", "scale", scale))
7199 exit(1);
7204 * Define a :client_id variable that is unique per connection. But don't
7205 * override an explicit -D switch.
7207 if (lookupVariable(&state[0].variables, "client_id") == NULL)
7209 for (i = 0; i < nclients; i++)
7210 if (!putVariableInt(&state[i].variables, "startup", "client_id", i))
7211 exit(1);
7214 /* set default seed for hash functions */
7215 if (lookupVariable(&state[0].variables, "default_seed") == NULL)
7217 uint64 seed = pg_prng_uint64(&base_random_sequence);
7219 for (i = 0; i < nclients; i++)
7220 if (!putVariableInt(&state[i].variables, "startup", "default_seed",
7221 (int64) seed))
7222 exit(1);
7225 /* set random seed unless overwritten */
7226 if (lookupVariable(&state[0].variables, "random_seed") == NULL)
7228 for (i = 0; i < nclients; i++)
7229 if (!putVariableInt(&state[i].variables, "startup", "random_seed",
7230 random_seed))
7231 exit(1);
7234 if (!is_no_vacuum)
7236 fprintf(stderr, "starting vacuum...");
7237 tryExecuteStatement(con, "vacuum pgbench_branches");
7238 tryExecuteStatement(con, "vacuum pgbench_tellers");
7239 tryExecuteStatement(con, "truncate pgbench_history");
7240 fprintf(stderr, "end.\n");
7242 if (do_vacuum_accounts)
7244 fprintf(stderr, "starting vacuum pgbench_accounts...");
7245 tryExecuteStatement(con, "vacuum analyze pgbench_accounts");
7246 fprintf(stderr, "end.\n");
7249 PQfinish(con);
7251 /* set up thread data structures */
7252 threads = (TState *) pg_malloc(sizeof(TState) * nthreads);
7253 nclients_dealt = 0;
7255 for (i = 0; i < nthreads; i++)
7257 TState *thread = &threads[i];
7259 thread->tid = i;
7260 thread->state = &state[nclients_dealt];
7261 thread->nstate =
7262 (nclients - nclients_dealt + nthreads - i - 1) / (nthreads - i);
7263 initRandomState(&thread->ts_choose_rs);
7264 initRandomState(&thread->ts_throttle_rs);
7265 initRandomState(&thread->ts_sample_rs);
7266 thread->logfile = NULL; /* filled in later */
7267 thread->latency_late = 0;
7268 initStats(&thread->stats, 0);
7270 nclients_dealt += thread->nstate;
7273 /* all clients must be assigned to a thread */
7274 Assert(nclients_dealt == nclients);
7276 /* get start up time for the whole computation */
7277 start_time = pg_time_now();
7279 /* set alarm if duration is specified. */
7280 if (duration > 0)
7281 setalarm(duration);
7283 errno = THREAD_BARRIER_INIT(&barrier, nthreads);
7284 if (errno != 0)
7285 pg_fatal("could not initialize barrier: %m");
7287 /* start all threads but thread 0 which is executed directly later */
7288 for (i = 1; i < nthreads; i++)
7290 TState *thread = &threads[i];
7292 thread->create_time = pg_time_now();
7293 errno = THREAD_CREATE(&thread->thread, threadRun, thread);
7295 if (errno != 0)
7296 pg_fatal("could not create thread: %m");
7299 /* compute when to stop */
7300 threads[0].create_time = pg_time_now();
7301 if (duration > 0)
7302 end_time = threads[0].create_time + (int64) 1000000 * duration;
7304 /* run thread 0 directly */
7305 (void) threadRun(&threads[0]);
7307 /* wait for other threads and accumulate results */
7308 initStats(&stats, 0);
7309 conn_total_duration = 0;
7311 for (i = 0; i < nthreads; i++)
7313 TState *thread = &threads[i];
7315 if (i > 0)
7316 THREAD_JOIN(thread->thread);
7318 for (int j = 0; j < thread->nstate; j++)
7319 if (thread->state[j].state != CSTATE_FINISHED)
7320 exit_code = 2;
7322 /* aggregate thread level stats */
7323 mergeSimpleStats(&stats.latency, &thread->stats.latency);
7324 mergeSimpleStats(&stats.lag, &thread->stats.lag);
7325 stats.cnt += thread->stats.cnt;
7326 stats.skipped += thread->stats.skipped;
7327 stats.retries += thread->stats.retries;
7328 stats.retried += thread->stats.retried;
7329 stats.serialization_failures += thread->stats.serialization_failures;
7330 stats.deadlock_failures += thread->stats.deadlock_failures;
7331 latency_late += thread->latency_late;
7332 conn_total_duration += thread->conn_duration;
7334 /* first recorded benchmarking start time */
7335 if (bench_start == 0 || thread->bench_start < bench_start)
7336 bench_start = thread->bench_start;
7340 * All connections should be already closed in threadRun(), so this
7341 * disconnect_all() will be a no-op, but clean up the connections just to
7342 * be sure. We don't need to measure the disconnection delays here.
7344 disconnect_all(state, nclients);
7347 * Beware that performance of short benchmarks with many threads and
7348 * possibly long transactions can be deceptive because threads do not
7349 * start and finish at the exact same time. The total duration computed
7350 * here encompasses all transactions so that tps shown is somehow slightly
7351 * underestimated.
7353 printResults(&stats, pg_time_now() - bench_start, conn_total_duration,
7354 bench_start - start_time, latency_late);
7356 THREAD_BARRIER_DESTROY(&barrier);
7358 if (exit_code != 0)
7359 pg_log_error("Run was aborted; the above results are incomplete.");
7361 return exit_code;
7364 static THREAD_FUNC_RETURN_TYPE THREAD_FUNC_CC
7365 threadRun(void *arg)
7367 TState *thread = (TState *) arg;
7368 CState *state = thread->state;
7369 pg_time_usec_t start;
7370 int nstate = thread->nstate;
7371 int remains = nstate; /* number of remaining clients */
7372 socket_set *sockets = alloc_socket_set(nstate);
7373 int64 thread_start,
7374 last_report,
7375 next_report;
7376 StatsData last,
7377 aggs;
7379 /* open log file if requested */
7380 if (use_log)
7382 char logpath[MAXPGPATH];
7383 char *prefix = logfile_prefix ? logfile_prefix : "pgbench_log";
7385 if (thread->tid == 0)
7386 snprintf(logpath, sizeof(logpath), "%s.%d", prefix, main_pid);
7387 else
7388 snprintf(logpath, sizeof(logpath), "%s.%d.%d", prefix, main_pid, thread->tid);
7390 thread->logfile = fopen(logpath, "w");
7392 if (thread->logfile == NULL)
7393 pg_fatal("could not open logfile \"%s\": %m", logpath);
7396 /* explicitly initialize the state machines */
7397 for (int i = 0; i < nstate; i++)
7398 state[i].state = CSTATE_CHOOSE_SCRIPT;
7400 /* READY */
7401 THREAD_BARRIER_WAIT(&barrier);
7403 thread_start = pg_time_now();
7404 thread->started_time = thread_start;
7405 thread->conn_duration = 0;
7406 last_report = thread_start;
7407 next_report = last_report + (int64) 1000000 * progress;
7409 /* STEADY */
7410 if (!is_connect)
7412 /* make connections to the database before starting */
7413 for (int i = 0; i < nstate; i++)
7415 if ((state[i].con = doConnect()) == NULL)
7417 /* coldly abort on initial connection failure */
7418 pg_fatal("could not create connection for client %d",
7419 state[i].id);
7424 /* GO */
7425 THREAD_BARRIER_WAIT(&barrier);
7427 start = pg_time_now();
7428 thread->bench_start = start;
7429 thread->throttle_trigger = start;
7432 * The log format currently has Unix epoch timestamps with whole numbers
7433 * of seconds. Round the first aggregate's start time down to the nearest
7434 * Unix epoch second (the very first aggregate might really have started a
7435 * fraction of a second later, but later aggregates are measured from the
7436 * whole number time that is actually logged).
7438 initStats(&aggs, (start + epoch_shift) / 1000000 * 1000000);
7439 last = aggs;
7441 /* loop till all clients have terminated */
7442 while (remains > 0)
7444 int nsocks; /* number of sockets to be waited for */
7445 pg_time_usec_t min_usec;
7446 pg_time_usec_t now = 0; /* set this only if needed */
7449 * identify which client sockets should be checked for input, and
7450 * compute the nearest time (if any) at which we need to wake up.
7452 clear_socket_set(sockets);
7453 nsocks = 0;
7454 min_usec = PG_INT64_MAX;
7455 for (int i = 0; i < nstate; i++)
7457 CState *st = &state[i];
7459 if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
7461 /* a nap from the script, or under throttling */
7462 pg_time_usec_t this_usec;
7464 /* get current time if needed */
7465 pg_time_now_lazy(&now);
7467 /* min_usec should be the minimum delay across all clients */
7468 this_usec = (st->state == CSTATE_SLEEP ?
7469 st->sleep_until : st->txn_scheduled) - now;
7470 if (min_usec > this_usec)
7471 min_usec = this_usec;
7473 else if (st->state == CSTATE_WAIT_RESULT ||
7474 st->state == CSTATE_WAIT_ROLLBACK_RESULT)
7477 * waiting for result from server - nothing to do unless the
7478 * socket is readable
7480 int sock = PQsocket(st->con);
7482 if (sock < 0)
7484 pg_log_error("invalid socket: %s", PQerrorMessage(st->con));
7485 goto done;
7488 add_socket_to_set(sockets, sock, nsocks++);
7490 else if (st->state != CSTATE_ABORTED &&
7491 st->state != CSTATE_FINISHED)
7494 * This client thread is ready to do something, so we don't
7495 * want to wait. No need to examine additional clients.
7497 min_usec = 0;
7498 break;
7502 /* also wake up to print the next progress report on time */
7503 if (progress && min_usec > 0 && thread->tid == 0)
7505 pg_time_now_lazy(&now);
7507 if (now >= next_report)
7508 min_usec = 0;
7509 else if ((next_report - now) < min_usec)
7510 min_usec = next_report - now;
7514 * If no clients are ready to execute actions, sleep until we receive
7515 * data on some client socket or the timeout (if any) elapses.
7517 if (min_usec > 0)
7519 int rc = 0;
7521 if (min_usec != PG_INT64_MAX)
7523 if (nsocks > 0)
7525 rc = wait_on_socket_set(sockets, min_usec);
7527 else /* nothing active, simple sleep */
7529 pg_usleep(min_usec);
7532 else /* no explicit delay, wait without timeout */
7534 rc = wait_on_socket_set(sockets, 0);
7537 if (rc < 0)
7539 if (errno == EINTR)
7541 /* On EINTR, go back to top of loop */
7542 continue;
7544 /* must be something wrong */
7545 pg_log_error("%s() failed: %m", SOCKET_WAIT_METHOD);
7546 goto done;
7549 else
7551 /* min_usec <= 0, i.e. something needs to be executed now */
7553 /* If we didn't wait, don't try to read any data */
7554 clear_socket_set(sockets);
7557 /* ok, advance the state machine of each connection */
7558 nsocks = 0;
7559 for (int i = 0; i < nstate; i++)
7561 CState *st = &state[i];
7563 if (st->state == CSTATE_WAIT_RESULT ||
7564 st->state == CSTATE_WAIT_ROLLBACK_RESULT)
7566 /* don't call advanceConnectionState unless data is available */
7567 int sock = PQsocket(st->con);
7569 if (sock < 0)
7571 pg_log_error("invalid socket: %s", PQerrorMessage(st->con));
7572 goto done;
7575 if (!socket_has_input(sockets, sock, nsocks++))
7576 continue;
7578 else if (st->state == CSTATE_FINISHED ||
7579 st->state == CSTATE_ABORTED)
7581 /* this client is done, no need to consider it anymore */
7582 continue;
7585 advanceConnectionState(thread, st, &aggs);
7588 * If --exit-on-abort is used, the program is going to exit when
7589 * any client is aborted.
7591 if (exit_on_abort && st->state == CSTATE_ABORTED)
7592 goto done;
7595 * If advanceConnectionState changed client to finished state,
7596 * that's one fewer client that remains.
7598 else if (st->state == CSTATE_FINISHED ||
7599 st->state == CSTATE_ABORTED)
7600 remains--;
7603 /* progress report is made by thread 0 for all threads */
7604 if (progress && thread->tid == 0)
7606 pg_time_usec_t now2 = pg_time_now();
7608 if (now2 >= next_report)
7611 * Horrible hack: this relies on the thread pointer we are
7612 * passed to be equivalent to threads[0], that is the first
7613 * entry of the threads array. That is why this MUST be done
7614 * by thread 0 and not any other.
7616 printProgressReport(thread, thread_start, now2,
7617 &last, &last_report);
7620 * Ensure that the next report is in the future, in case
7621 * pgbench/postgres got stuck somewhere.
7625 next_report += (int64) 1000000 * progress;
7626 } while (now2 >= next_report);
7631 done:
7632 if (exit_on_abort)
7635 * Abort if any client is not finished, meaning some error occurred.
7637 for (int i = 0; i < nstate; i++)
7639 if (state[i].state != CSTATE_FINISHED)
7641 pg_log_error("Run was aborted due to an error in thread %d",
7642 thread->tid);
7643 exit(2);
7648 disconnect_all(state, nstate);
7650 if (thread->logfile)
7652 if (agg_interval > 0)
7654 /* log aggregated but not yet reported transactions */
7655 doLog(thread, state, &aggs, false, 0, 0);
7657 fclose(thread->logfile);
7658 thread->logfile = NULL;
7660 free_socket_set(sockets);
7661 THREAD_FUNC_RETURN;
7664 static void
7665 finishCon(CState *st)
7667 if (st->con != NULL)
7669 PQfinish(st->con);
7670 st->con = NULL;
7675 * Support for duration option: set timer_exceeded after so many seconds.
7678 #ifndef WIN32
7680 static void
7681 handle_sig_alarm(SIGNAL_ARGS)
7683 timer_exceeded = true;
7686 static void
7687 setalarm(int seconds)
7689 pqsignal(SIGALRM, handle_sig_alarm);
7690 alarm(seconds);
7693 #else /* WIN32 */
7695 static VOID CALLBACK
7696 win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
7698 timer_exceeded = true;
7701 static void
7702 setalarm(int seconds)
7704 HANDLE queue;
7705 HANDLE timer;
7707 /* This function will be called at most once, so we can cheat a bit. */
7708 queue = CreateTimerQueue();
7709 if (seconds > ((DWORD) -1) / 1000 ||
7710 !CreateTimerQueueTimer(&timer, queue,
7711 win32_timer_callback, NULL, seconds * 1000, 0,
7712 WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE))
7713 pg_fatal("failed to set timer");
7716 #endif /* WIN32 */
/*
 * These functions provide an abstraction layer that hides the syscall
 * we use to wait for input on a set of sockets.
 *
 * Currently there are two implementations, based on ppoll(2) and select(2).
 * ppoll() is preferred where available due to its typically higher ceiling
 * on the number of usable sockets.  We do not use the more widely available
 * poll(2) because it only offers millisecond timeout resolution, which could
 * be problematic with high --rate settings.
 *
 * Function APIs:
 *
 * alloc_socket_set: allocate an empty socket set with room for up to
 *		"count" sockets.
 *
 * free_socket_set: deallocate a socket set.
 *
 * clear_socket_set: reset a socket set to empty.
 *
 * add_socket_to_set: add socket with indicated FD to slot "idx" in the
 *		socket set.  Slots must be filled in order, starting with 0.
 *
 * wait_on_socket_set: wait for input on any socket in set, or for timeout
 *		to expire.  timeout is measured in microseconds; any value <= 0 means
 *		wait forever.  Returns result code of underlying syscall (>= 0 if OK,
 *		else see errno).
 *
 * socket_has_input: after waiting, call this to see if given socket has
 *		input.  fd and idx parameters should match some previous call to
 *		add_socket_to_set.
 *
 * Note that wait_on_socket_set destructively modifies the state of the
 * socket set.  After checking for input, caller must apply clear_socket_set
 * and add_socket_to_set again before waiting again.
 */
#ifdef POLL_USING_PPOLL

/*
 * ppoll() flavor.  The set is a counted array of struct pollfd, sized for
 * "count" entries at allocation time.
 */
static socket_set *
alloc_socket_set(int count)
{
	size_t		nbytes;
	socket_set *set;

	/* header fields plus room for "count" pollfd slots */
	nbytes = offsetof(socket_set, pollfds) + sizeof(struct pollfd) * count;
	set = (socket_set *) pg_malloc0(nbytes);
	set->maxfds = count;
	set->curfds = 0;
	return set;
}

/* Release a set obtained from alloc_socket_set(). */
static void
free_socket_set(socket_set *sa)
{
	pg_free(sa);
}

/* Forget every registered socket; the allocated capacity is kept. */
static void
clear_socket_set(socket_set *sa)
{
	sa->curfds = 0;
}

/*
 * Register fd in slot idx, watching it for readability.  Slots must be
 * filled densely from 0; assert-enabled builds check this.
 */
static void
add_socket_to_set(socket_set *sa, int fd, int idx)
{
	struct pollfd *slot;

	Assert(idx < sa->maxfds && idx == sa->curfds);
	slot = &sa->pollfds[idx];
	slot->fd = fd;
	slot->events = POLLIN;
	slot->revents = 0;
	sa->curfds++;
}

/*
 * Block until a registered socket is readable or "usecs" microseconds have
 * passed.  usecs <= 0 means no timeout.  Returns ppoll()'s result.
 */
static int
wait_on_socket_set(socket_set *sa, int64 usecs)
{
	struct timespec timeout;

	if (usecs <= 0)
		return ppoll(sa->pollfds, sa->curfds, NULL, NULL);

	timeout.tv_sec = usecs / 1000000;
	timeout.tv_nsec = (usecs % 1000000) * 1000;
	return ppoll(sa->pollfds, sa->curfds, &timeout, NULL);
}

/* Did the socket registered as (fd, idx) become readable in the last wait? */
static bool
socket_has_input(socket_set *sa, int fd, int idx)
{
	/*
	 * threadRun sometimes clears the set without waiting and then still asks
	 * about sockets it registered earlier, or would have registered had its
	 * setup loop not stopped early.  So an empty set simply answers false
	 * whatever the arguments.  Only a non-empty set is checked against
	 * earlier add_socket_to_set() calls.
	 */
	if (sa->curfds != 0)
	{
		Assert(idx < sa->curfds && sa->pollfds[idx].fd == fd);
		return (sa->pollfds[idx].revents & POLLIN) != 0;
	}
	return false;
}

#endif							/* POLL_USING_PPOLL */
#ifdef POLL_USING_SELECT

/*
 * select() flavor.  The set is one fd_set plus the highest descriptor in it.
 * "count" is unused here: capacity is bounded by FD_SETSIZE, which
 * add_socket_to_set checks.
 */
static socket_set *
alloc_socket_set(int count)
{
	socket_set *set = (socket_set *) pg_malloc0(sizeof(socket_set));

	return set;
}

/* Release a set obtained from alloc_socket_set(). */
static void
free_socket_set(socket_set *sa)
{
	pg_free(sa);
}

/* Empty the fd_set and reset the highest-descriptor mark. */
static void
clear_socket_set(socket_set *sa)
{
	FD_ZERO(&sa->fds);
	sa->maxfd = -1;
}

/*
 * Add fd to the set, exiting if select() cannot handle it on this platform.
 * idx is not needed by this implementation.
 */
static void
add_socket_to_set(socket_set *sa, int fd, int idx)
{
	/* See connect_slot() for background on this code. */
#ifdef WIN32
	/* here the limit is on how many sockets the set holds (fd_count) */
	if (sa->fds.fd_count + 1 >= FD_SETSIZE)
	{
		pg_log_error("too many concurrent database clients for this platform: %d",
					 sa->fds.fd_count + 1);
		exit(1);
	}
#else
	/* here the limit is on the descriptor's numeric value */
	if (fd < 0 || fd >= FD_SETSIZE)
	{
		pg_log_error("socket file descriptor out of range for select(): %d",
					 fd);
		pg_log_error_hint("Try fewer concurrent database clients.");
		exit(1);
	}
#endif
	FD_SET(fd, &sa->fds);
	if (sa->maxfd < fd)
		sa->maxfd = fd;
}

/*
 * Block until a registered socket is readable or "usecs" microseconds have
 * passed.  usecs <= 0 means no timeout.  Returns select()'s result.
 */
static int
wait_on_socket_set(socket_set *sa, int64 usecs)
{
	struct timeval timeout;

	if (usecs <= 0)
		return select(sa->maxfd + 1, &sa->fds, NULL, NULL, NULL);

	timeout.tv_sec = usecs / 1000000;
	timeout.tv_usec = usecs % 1000000;
	return select(sa->maxfd + 1, &sa->fds, NULL, NULL, &timeout);
}

/* Membership is looked up by descriptor, so idx is ignored. */
static bool
socket_has_input(socket_set *sa, int fd, int idx)
{
	return FD_ISSET(fd, &sa->fds) ? true : false;
}

#endif							/* POLL_USING_SELECT */