dblink/isolationtester/fe_utils: Use new cancel API
[pgsql.git] / src / test / isolation / isolationtester.c
blob0b342b5c2bbba131aeae163d81208809e74de451
/*
 * src/test/isolation/isolationtester.c
 *
 * isolationtester.c
 *      Runs an isolation test specified by a spec file.
 */
8 #include "postgres_fe.h"
10 #include <sys/select.h>
11 #include <sys/time.h>
13 #include "datatype/timestamp.h"
14 #include "isolationtester.h"
15 #include "libpq-fe.h"
16 #include "pg_getopt.h"
17 #include "pqexpbuffer.h"
/*
 * Name of the prepared statement, created on the control connection, that
 * is used to ask the server whether a session is blocked.
 */
#define PREP_WAITING "isolationtester_waiting"
22 * conns[0] is the global setup, teardown, and watchdog connection. Additional
23 * connections represent spec-defined sessions.
25 typedef struct IsoConnInfo
27 /* The libpq connection object for this connection. */
28 PGconn *conn;
29 /* The backend PID, in numeric and string formats. */
30 int backend_pid;
31 const char *backend_pid_str;
32 /* Name of the associated session. */
33 const char *sessionname;
34 /* Active step on this connection, or NULL if idle. */
35 PermutationStep *active_step;
36 /* Number of NOTICE messages received from connection. */
37 int total_notices;
38 } IsoConnInfo;
40 static IsoConnInfo *conns = NULL;
41 static int nconns = 0;
43 /* Flag indicating some new NOTICE has arrived */
44 static bool any_new_notice = false;
46 /* Maximum time to wait before giving up on a step (in usec) */
47 static int64 max_step_wait = 360 * USECS_PER_SEC;
50 static void check_testspec(TestSpec *testspec);
51 static void run_testspec(TestSpec *testspec);
52 static void run_all_permutations(TestSpec *testspec);
53 static void run_all_permutations_recurse(TestSpec *testspec, int *piles,
54 int nsteps, PermutationStep **steps);
55 static void run_named_permutations(TestSpec *testspec);
56 static void run_permutation(TestSpec *testspec, int nsteps,
57 PermutationStep **steps);
59 /* Flag bits for try_complete_step(s) */
60 #define STEP_NONBLOCK 0x1 /* return as soon as cmd waits for a lock */
61 #define STEP_RETRY 0x2 /* this is a retry of a previously-waiting cmd */
63 static int try_complete_steps(TestSpec *testspec, PermutationStep **waiting,
64 int nwaiting, int flags);
65 static bool try_complete_step(TestSpec *testspec, PermutationStep *pstep,
66 int flags);
68 static int step_qsort_cmp(const void *a, const void *b);
69 static int step_bsearch_cmp(const void *a, const void *b);
71 static bool step_has_blocker(PermutationStep *pstep);
72 static void printResultSet(PGresult *res);
73 static void isotesterNoticeProcessor(void *arg, const char *message);
74 static void blackholeNoticeProcessor(void *arg, const char *message);
76 static void
77 disconnect_atexit(void)
79 int i;
81 for (i = 0; i < nconns; i++)
82 if (conns[i].conn)
83 PQfinish(conns[i].conn);
86 int
87 main(int argc, char **argv)
89 const char *conninfo;
90 const char *env_wait;
91 TestSpec *testspec;
92 PGresult *res;
93 PQExpBufferData wait_query;
94 int opt;
95 int i;
97 while ((opt = getopt(argc, argv, "V")) != -1)
99 switch (opt)
101 case 'V':
102 puts("isolationtester (PostgreSQL) " PG_VERSION);
103 exit(0);
104 default:
105 fprintf(stderr, "Usage: isolationtester [CONNINFO]\n");
106 return EXIT_FAILURE;
111 * Make stdout unbuffered to match stderr; and ensure stderr is unbuffered
112 * too, which it should already be everywhere except sometimes in Windows.
114 setbuf(stdout, NULL);
115 setbuf(stderr, NULL);
118 * If the user supplies a non-option parameter on the command line, use it
119 * as the conninfo string; otherwise default to setting dbname=postgres
120 * and using environment variables or defaults for all other connection
121 * parameters.
123 if (argc > optind)
124 conninfo = argv[optind];
125 else
126 conninfo = "dbname = postgres";
129 * If PG_TEST_TIMEOUT_DEFAULT is set, adopt its value (given in seconds)
130 * as half the max time to wait for any one step to complete.
132 env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
133 if (env_wait != NULL)
134 max_step_wait = 2 * ((int64) atoi(env_wait)) * USECS_PER_SEC;
136 /* Read the test spec from stdin */
137 spec_yyparse();
138 testspec = &parseresult;
140 /* Perform post-parse checking, and fill in linking fields */
141 check_testspec(testspec);
143 printf("Parsed test spec with %d sessions\n", testspec->nsessions);
146 * Establish connections to the database, one for each session and an
147 * extra for lock wait detection and global work.
149 nconns = 1 + testspec->nsessions;
150 conns = (IsoConnInfo *) pg_malloc0(nconns * sizeof(IsoConnInfo));
151 atexit(disconnect_atexit);
153 for (i = 0; i < nconns; i++)
155 const char *sessionname;
157 if (i == 0)
158 sessionname = "control connection";
159 else
160 sessionname = testspec->sessions[i - 1]->name;
162 conns[i].sessionname = sessionname;
164 conns[i].conn = PQconnectdb(conninfo);
165 if (PQstatus(conns[i].conn) != CONNECTION_OK)
167 fprintf(stderr, "Connection %d failed: %s",
168 i, PQerrorMessage(conns[i].conn));
169 exit(1);
173 * Set up notice processors for the user-defined connections, so that
174 * messages can get printed prefixed with the session names. The
175 * control connection gets a "blackhole" processor instead (hides all
176 * messages).
178 if (i != 0)
179 PQsetNoticeProcessor(conns[i].conn,
180 isotesterNoticeProcessor,
181 (void *) &conns[i]);
182 else
183 PQsetNoticeProcessor(conns[i].conn,
184 blackholeNoticeProcessor,
185 NULL);
188 * Similarly, append the session name to application_name to make it
189 * easier to map spec file sessions to log output and
190 * pg_stat_activity. The reason to append instead of just setting the
191 * name is that we don't know the name of the test currently running.
193 res = PQexecParams(conns[i].conn,
194 "SELECT set_config('application_name',\n"
195 " current_setting('application_name') || '/' || $1,\n"
196 " false)",
197 1, NULL,
198 &sessionname,
199 NULL, NULL, 0);
200 if (PQresultStatus(res) != PGRES_TUPLES_OK)
202 fprintf(stderr, "setting of application name failed: %s",
203 PQerrorMessage(conns[i].conn));
204 exit(1);
207 /* Save each connection's backend PID for subsequent use. */
208 conns[i].backend_pid = PQbackendPID(conns[i].conn);
209 conns[i].backend_pid_str = psprintf("%d", conns[i].backend_pid);
213 * Build the query we'll use to detect lock contention among sessions in
214 * the test specification. Most of the time, we could get away with
215 * simply checking whether a session is waiting for *any* lock: we don't
216 * exactly expect concurrent use of test tables. However, autovacuum will
217 * occasionally take AccessExclusiveLock to truncate a table, and we must
218 * ignore that transient wait.
220 initPQExpBuffer(&wait_query);
221 appendPQExpBufferStr(&wait_query,
222 "SELECT pg_catalog.pg_isolation_test_session_is_blocked($1, '{");
223 /* The spec syntax requires at least one session; assume that here. */
224 appendPQExpBufferStr(&wait_query, conns[1].backend_pid_str);
225 for (i = 2; i < nconns; i++)
226 appendPQExpBuffer(&wait_query, ",%s", conns[i].backend_pid_str);
227 appendPQExpBufferStr(&wait_query, "}')");
229 res = PQprepare(conns[0].conn, PREP_WAITING, wait_query.data, 0, NULL);
230 if (PQresultStatus(res) != PGRES_COMMAND_OK)
232 fprintf(stderr, "prepare of lock wait query failed: %s",
233 PQerrorMessage(conns[0].conn));
234 exit(1);
236 PQclear(res);
237 termPQExpBuffer(&wait_query);
240 * Run the permutations specified in the spec, or all if none were
241 * explicitly specified.
243 run_testspec(testspec);
245 return 0;
249 * Validity-check the test spec and fill in cross-links between nodes.
251 static void
252 check_testspec(TestSpec *testspec)
254 int nallsteps;
255 Step **allsteps;
256 int i,
260 /* Create a sorted lookup table of all steps. */
261 nallsteps = 0;
262 for (i = 0; i < testspec->nsessions; i++)
263 nallsteps += testspec->sessions[i]->nsteps;
265 allsteps = pg_malloc(nallsteps * sizeof(Step *));
267 k = 0;
268 for (i = 0; i < testspec->nsessions; i++)
270 for (j = 0; j < testspec->sessions[i]->nsteps; j++)
271 allsteps[k++] = testspec->sessions[i]->steps[j];
274 qsort(allsteps, nallsteps, sizeof(Step *), step_qsort_cmp);
276 /* Verify that all step names are unique. */
277 for (i = 1; i < nallsteps; i++)
279 if (strcmp(allsteps[i - 1]->name,
280 allsteps[i]->name) == 0)
282 fprintf(stderr, "duplicate step name: %s\n",
283 allsteps[i]->name);
284 exit(1);
288 /* Set the session index fields in steps. */
289 for (i = 0; i < testspec->nsessions; i++)
291 Session *session = testspec->sessions[i];
293 for (j = 0; j < session->nsteps; j++)
294 session->steps[j]->session = i;
298 * If we have manually-specified permutations, link PermutationSteps to
299 * Steps, and fill in blocker links.
301 for (i = 0; i < testspec->npermutations; i++)
303 Permutation *p = testspec->permutations[i];
305 for (j = 0; j < p->nsteps; j++)
307 PermutationStep *pstep = p->steps[j];
308 Step **this = (Step **) bsearch(pstep->name,
309 allsteps,
310 nallsteps,
311 sizeof(Step *),
312 step_bsearch_cmp);
314 if (this == NULL)
316 fprintf(stderr, "undefined step \"%s\" specified in permutation\n",
317 pstep->name);
318 exit(1);
320 pstep->step = *this;
322 /* Mark the step used, for check below */
323 pstep->step->used = true;
327 * Identify any blocker steps. We search only the current
328 * permutation, since steps not used there couldn't be concurrent.
329 * Note that it's OK to reference later permutation steps, so this
330 * can't be combined with the previous loop.
332 for (j = 0; j < p->nsteps; j++)
334 PermutationStep *pstep = p->steps[j];
336 for (k = 0; k < pstep->nblockers; k++)
338 PermutationStepBlocker *blocker = pstep->blockers[k];
339 int n;
341 if (blocker->blocktype == PSB_ONCE)
342 continue; /* nothing to link to */
344 blocker->step = NULL;
345 for (n = 0; n < p->nsteps; n++)
347 PermutationStep *otherp = p->steps[n];
349 if (strcmp(otherp->name, blocker->stepname) == 0)
351 blocker->step = otherp->step;
352 break;
355 if (blocker->step == NULL)
357 fprintf(stderr, "undefined blocking step \"%s\" referenced in permutation step \"%s\"\n",
358 blocker->stepname, pstep->name);
359 exit(1);
361 /* can't block on completion of step of own session */
362 if (blocker->step->session == pstep->step->session)
364 fprintf(stderr, "permutation step \"%s\" cannot block on its own session\n",
365 pstep->name);
366 exit(1);
373 * If we have manually-specified permutations, verify that all steps have
374 * been used, warning about anything defined but not used. We can skip
375 * this when using automatically-generated permutations.
377 if (testspec->permutations)
379 for (i = 0; i < nallsteps; i++)
381 if (!allsteps[i]->used)
382 fprintf(stderr, "unused step name: %s\n", allsteps[i]->name);
386 free(allsteps);
390 * Run the permutations specified in the spec, or all if none were
391 * explicitly specified.
393 static void
394 run_testspec(TestSpec *testspec)
396 if (testspec->permutations)
397 run_named_permutations(testspec);
398 else
399 run_all_permutations(testspec);
403 * Run all permutations of the steps and sessions.
405 static void
406 run_all_permutations(TestSpec *testspec)
408 int nsteps;
409 int i;
410 PermutationStep *steps;
411 PermutationStep **stepptrs;
412 int *piles;
414 /* Count the total number of steps in all sessions */
415 nsteps = 0;
416 for (i = 0; i < testspec->nsessions; i++)
417 nsteps += testspec->sessions[i]->nsteps;
419 /* Create PermutationStep workspace array */
420 steps = (PermutationStep *) pg_malloc0(sizeof(PermutationStep) * nsteps);
421 stepptrs = (PermutationStep **) pg_malloc(sizeof(PermutationStep *) * nsteps);
422 for (i = 0; i < nsteps; i++)
423 stepptrs[i] = steps + i;
426 * To generate the permutations, we conceptually put the steps of each
427 * session on a pile. To generate a permutation, we pick steps from the
428 * piles until all piles are empty. By picking steps from piles in
429 * different order, we get different permutations.
431 * A pile is actually just an integer which tells how many steps we've
432 * already picked from this pile.
434 piles = pg_malloc(sizeof(int) * testspec->nsessions);
435 for (i = 0; i < testspec->nsessions; i++)
436 piles[i] = 0;
438 run_all_permutations_recurse(testspec, piles, 0, stepptrs);
440 free(steps);
441 free(stepptrs);
442 free(piles);
445 static void
446 run_all_permutations_recurse(TestSpec *testspec, int *piles,
447 int nsteps, PermutationStep **steps)
449 int i;
450 bool found = false;
452 for (i = 0; i < testspec->nsessions; i++)
454 /* If there's any more steps in this pile, pick it and recurse */
455 if (piles[i] < testspec->sessions[i]->nsteps)
457 Step *newstep = testspec->sessions[i]->steps[piles[i]];
460 * These automatically-generated PermutationSteps never have
461 * blocker conditions. So we need only fill these fields, relying
462 * on run_all_permutations() to have zeroed the rest:
464 steps[nsteps]->name = newstep->name;
465 steps[nsteps]->step = newstep;
467 piles[i]++;
469 run_all_permutations_recurse(testspec, piles, nsteps + 1, steps);
471 piles[i]--;
473 found = true;
477 /* If all the piles were empty, this permutation is completed. Run it */
478 if (!found)
479 run_permutation(testspec, nsteps, steps);
483 * Run permutations given in the test spec
485 static void
486 run_named_permutations(TestSpec *testspec)
488 int i;
490 for (i = 0; i < testspec->npermutations; i++)
492 Permutation *p = testspec->permutations[i];
494 run_permutation(testspec, p->nsteps, p->steps);
498 static int
499 step_qsort_cmp(const void *a, const void *b)
501 Step *stepa = *((Step **) a);
502 Step *stepb = *((Step **) b);
504 return strcmp(stepa->name, stepb->name);
507 static int
508 step_bsearch_cmp(const void *a, const void *b)
510 char *stepname = (char *) a;
511 Step *step = *((Step **) b);
513 return strcmp(stepname, step->name);
517 * Run one permutation
519 static void
520 run_permutation(TestSpec *testspec, int nsteps, PermutationStep **steps)
522 PGresult *res;
523 int i;
524 int nwaiting = 0;
525 PermutationStep **waiting;
527 waiting = pg_malloc(sizeof(PermutationStep *) * testspec->nsessions);
529 printf("\nstarting permutation:");
530 for (i = 0; i < nsteps; i++)
531 printf(" %s", steps[i]->name);
532 printf("\n");
534 /* Perform setup */
535 for (i = 0; i < testspec->nsetupsqls; i++)
537 res = PQexec(conns[0].conn, testspec->setupsqls[i]);
538 if (PQresultStatus(res) == PGRES_TUPLES_OK)
540 printResultSet(res);
542 else if (PQresultStatus(res) != PGRES_COMMAND_OK)
544 fprintf(stderr, "setup failed: %s", PQerrorMessage(conns[0].conn));
545 exit(1);
547 PQclear(res);
550 /* Perform per-session setup */
551 for (i = 0; i < testspec->nsessions; i++)
553 if (testspec->sessions[i]->setupsql)
555 res = PQexec(conns[i + 1].conn, testspec->sessions[i]->setupsql);
556 if (PQresultStatus(res) == PGRES_TUPLES_OK)
558 printResultSet(res);
560 else if (PQresultStatus(res) != PGRES_COMMAND_OK)
562 fprintf(stderr, "setup of session %s failed: %s",
563 conns[i + 1].sessionname,
564 PQerrorMessage(conns[i + 1].conn));
565 exit(1);
567 PQclear(res);
571 /* Perform steps */
572 for (i = 0; i < nsteps; i++)
574 PermutationStep *pstep = steps[i];
575 Step *step = pstep->step;
576 IsoConnInfo *iconn = &conns[1 + step->session];
577 PGconn *conn = iconn->conn;
578 bool mustwait;
579 int j;
582 * Check whether the session that needs to perform the next step is
583 * still blocked on an earlier step. If so, wait for it to finish.
585 if (iconn->active_step != NULL)
587 struct timeval start_time;
589 gettimeofday(&start_time, NULL);
591 while (iconn->active_step != NULL)
593 PermutationStep *oldstep = iconn->active_step;
596 * Wait for oldstep. But even though we don't use
597 * STEP_NONBLOCK, it might not complete because of blocker
598 * conditions.
600 if (!try_complete_step(testspec, oldstep, STEP_RETRY))
602 /* Done, so remove oldstep from the waiting[] array. */
603 int w;
605 for (w = 0; w < nwaiting; w++)
607 if (oldstep == waiting[w])
608 break;
610 if (w >= nwaiting)
611 abort(); /* can't happen */
612 if (w + 1 < nwaiting)
613 memmove(&waiting[w], &waiting[w + 1],
614 (nwaiting - (w + 1)) * sizeof(PermutationStep *));
615 nwaiting--;
619 * Check for other steps that have finished. We should do
620 * this if oldstep completed, as it might have unblocked
621 * something. On the other hand, if oldstep hasn't completed,
622 * we must poll all the active steps in hopes of unblocking
623 * oldstep. So either way, poll them.
625 nwaiting = try_complete_steps(testspec, waiting, nwaiting,
626 STEP_NONBLOCK | STEP_RETRY);
629 * If the target session is still busy, apply a timeout to
630 * keep from hanging indefinitely, which could happen with
631 * incorrect blocker annotations. Use the same 2 *
632 * max_step_wait limit as try_complete_step does for deciding
633 * to die. (We don't bother with trying to cancel anything,
634 * since it's unclear what to cancel in this case.)
636 if (iconn->active_step != NULL)
638 struct timeval current_time;
639 int64 td;
641 gettimeofday(&current_time, NULL);
642 td = (int64) current_time.tv_sec - (int64) start_time.tv_sec;
643 td *= USECS_PER_SEC;
644 td += (int64) current_time.tv_usec - (int64) start_time.tv_usec;
645 if (td > 2 * max_step_wait)
647 fprintf(stderr, "step %s timed out after %d seconds\n",
648 iconn->active_step->name,
649 (int) (td / USECS_PER_SEC));
650 fprintf(stderr, "active steps are:");
651 for (j = 1; j < nconns; j++)
653 IsoConnInfo *oconn = &conns[j];
655 if (oconn->active_step != NULL)
656 fprintf(stderr, " %s",
657 oconn->active_step->name);
659 fprintf(stderr, "\n");
660 exit(1);
666 /* Send the query for this step. */
667 if (!PQsendQuery(conn, step->sql))
669 fprintf(stdout, "failed to send query for step %s: %s\n",
670 step->name, PQerrorMessage(conn));
671 exit(1);
674 /* Remember we launched a step. */
675 iconn->active_step = pstep;
677 /* Remember target number of NOTICEs for any blocker conditions. */
678 for (j = 0; j < pstep->nblockers; j++)
680 PermutationStepBlocker *blocker = pstep->blockers[j];
682 if (blocker->blocktype == PSB_NUM_NOTICES)
683 blocker->target_notices = blocker->num_notices +
684 conns[blocker->step->session + 1].total_notices;
687 /* Try to complete this step without blocking. */
688 mustwait = try_complete_step(testspec, pstep, STEP_NONBLOCK);
690 /* Check for completion of any steps that were previously waiting. */
691 nwaiting = try_complete_steps(testspec, waiting, nwaiting,
692 STEP_NONBLOCK | STEP_RETRY);
694 /* If this step is waiting, add it to the array of waiters. */
695 if (mustwait)
696 waiting[nwaiting++] = pstep;
699 /* Wait for any remaining queries. */
700 nwaiting = try_complete_steps(testspec, waiting, nwaiting, STEP_RETRY);
701 if (nwaiting != 0)
703 fprintf(stderr, "failed to complete permutation due to mutually-blocking steps\n");
704 exit(1);
707 /* Perform per-session teardown */
708 for (i = 0; i < testspec->nsessions; i++)
710 if (testspec->sessions[i]->teardownsql)
712 res = PQexec(conns[i + 1].conn, testspec->sessions[i]->teardownsql);
713 if (PQresultStatus(res) == PGRES_TUPLES_OK)
715 printResultSet(res);
717 else if (PQresultStatus(res) != PGRES_COMMAND_OK)
719 fprintf(stderr, "teardown of session %s failed: %s",
720 conns[i + 1].sessionname,
721 PQerrorMessage(conns[i + 1].conn));
722 /* don't exit on teardown failure */
724 PQclear(res);
728 /* Perform teardown */
729 if (testspec->teardownsql)
731 res = PQexec(conns[0].conn, testspec->teardownsql);
732 if (PQresultStatus(res) == PGRES_TUPLES_OK)
734 printResultSet(res);
736 else if (PQresultStatus(res) != PGRES_COMMAND_OK)
738 fprintf(stderr, "teardown failed: %s",
739 PQerrorMessage(conns[0].conn));
740 /* don't exit on teardown failure */
742 PQclear(res);
745 free(waiting);
749 * Check for completion of any waiting step(s).
750 * Remove completed ones from the waiting[] array,
751 * and return the new value of nwaiting.
752 * See try_complete_step for the meaning of the flags.
754 static int
755 try_complete_steps(TestSpec *testspec, PermutationStep **waiting,
756 int nwaiting, int flags)
758 int old_nwaiting;
759 bool have_blocker;
763 int w = 0;
765 /* Reset latch; we only care about notices received within loop. */
766 any_new_notice = false;
768 /* Likewise, these variables reset for each retry. */
769 old_nwaiting = nwaiting;
770 have_blocker = false;
772 /* Scan the array, try to complete steps. */
773 while (w < nwaiting)
775 if (try_complete_step(testspec, waiting[w], flags))
777 /* Still blocked, leave it alone. */
778 if (waiting[w]->nblockers > 0)
779 have_blocker = true;
780 w++;
782 else
784 /* Done, remove it from array. */
785 if (w + 1 < nwaiting)
786 memmove(&waiting[w], &waiting[w + 1],
787 (nwaiting - (w + 1)) * sizeof(PermutationStep *));
788 nwaiting--;
793 * If any of the still-waiting steps have blocker conditions attached,
794 * it's possible that one of the steps we examined afterwards has
795 * released them (either by completing, or by sending a NOTICE). If
796 * any step completions or NOTICEs happened, repeat the loop until
797 * none occurs. Without this provision, completion timing could vary
798 * depending on the order in which the steps appear in the array.
800 } while (have_blocker && (nwaiting < old_nwaiting || any_new_notice));
801 return nwaiting;
805 * Our caller already sent the query associated with this step. Wait for it
806 * to either complete, or hit a blocking condition.
808 * When calling this function on behalf of a given step for a second or later
809 * time, pass the STEP_RETRY flag. Do not pass it on the first call.
811 * Returns true if the step was *not* completed, false if it was completed.
812 * Reasons for non-completion are (a) the STEP_NONBLOCK flag was specified
813 * and the query is waiting to acquire a lock, or (b) the step has an
814 * unsatisfied blocker condition. When STEP_NONBLOCK is given, we assume
815 * that any lock wait will persist until we have executed additional steps.
817 static bool
818 try_complete_step(TestSpec *testspec, PermutationStep *pstep, int flags)
820 Step *step = pstep->step;
821 IsoConnInfo *iconn = &conns[1 + step->session];
822 PGconn *conn = iconn->conn;
823 fd_set read_set;
824 struct timeval start_time;
825 struct timeval timeout;
826 int sock = PQsocket(conn);
827 int ret;
828 PGresult *res;
829 PGnotify *notify;
830 bool canceled = false;
833 * If the step is annotated with (*), then on the first call, force it to
834 * wait. This is useful for ensuring consistent output when the step
835 * might or might not complete so fast that we don't observe it waiting.
837 if (!(flags & STEP_RETRY))
839 int i;
841 for (i = 0; i < pstep->nblockers; i++)
843 PermutationStepBlocker *blocker = pstep->blockers[i];
845 if (blocker->blocktype == PSB_ONCE)
847 printf("step %s: %s <waiting ...>\n",
848 step->name, step->sql);
849 return true;
854 if (sock < 0)
856 fprintf(stderr, "invalid socket: %s", PQerrorMessage(conn));
857 exit(1);
860 gettimeofday(&start_time, NULL);
861 FD_ZERO(&read_set);
863 while (PQisBusy(conn))
865 FD_SET(sock, &read_set);
866 timeout.tv_sec = 0;
867 timeout.tv_usec = 10000; /* Check for lock waits every 10ms. */
869 ret = select(sock + 1, &read_set, NULL, NULL, &timeout);
870 if (ret < 0) /* error in select() */
872 if (errno == EINTR)
873 continue;
874 fprintf(stderr, "select failed: %m\n");
875 exit(1);
877 else if (ret == 0) /* select() timeout: check for lock wait */
879 struct timeval current_time;
880 int64 td;
882 /* If it's OK for the step to block, check whether it has. */
883 if (flags & STEP_NONBLOCK)
885 bool waiting;
887 res = PQexecPrepared(conns[0].conn, PREP_WAITING, 1,
888 &conns[step->session + 1].backend_pid_str,
889 NULL, NULL, 0);
890 if (PQresultStatus(res) != PGRES_TUPLES_OK ||
891 PQntuples(res) != 1)
893 fprintf(stderr, "lock wait query failed: %s",
894 PQerrorMessage(conns[0].conn));
895 exit(1);
897 waiting = ((PQgetvalue(res, 0, 0))[0] == 't');
898 PQclear(res);
900 if (waiting) /* waiting to acquire a lock */
903 * Since it takes time to perform the lock-check query,
904 * some data --- notably, NOTICE messages --- might have
905 * arrived since we looked. We must call PQconsumeInput
906 * and then PQisBusy to collect and process any such
907 * messages. In the (unlikely) case that PQisBusy then
908 * returns false, we might as well go examine the
909 * available result.
911 if (!PQconsumeInput(conn))
913 fprintf(stderr, "PQconsumeInput failed: %s\n",
914 PQerrorMessage(conn));
915 exit(1);
917 if (!PQisBusy(conn))
918 break;
921 * conn is still busy, so conclude that the step really is
922 * waiting.
924 if (!(flags & STEP_RETRY))
925 printf("step %s: %s <waiting ...>\n",
926 step->name, step->sql);
927 return true;
929 /* else, not waiting */
932 /* Figure out how long we've been waiting for this step. */
933 gettimeofday(&current_time, NULL);
934 td = (int64) current_time.tv_sec - (int64) start_time.tv_sec;
935 td *= USECS_PER_SEC;
936 td += (int64) current_time.tv_usec - (int64) start_time.tv_usec;
939 * After max_step_wait microseconds, try to cancel the query.
941 * If the user tries to test an invalid permutation, we don't want
942 * to hang forever, especially when this is running in the
943 * buildfarm. This will presumably lead to this permutation
944 * failing, but remaining permutations and tests should still be
945 * OK.
947 if (td > max_step_wait && !canceled)
949 PGcancelConn *cancel_conn = PQcancelCreate(conn);
951 if (PQcancelBlocking(cancel_conn))
954 * print to stdout not stderr, as this should appear in
955 * the test case's results
957 printf("isolationtester: canceling step %s after %d seconds\n",
958 step->name, (int) (td / USECS_PER_SEC));
959 canceled = true;
961 else
962 fprintf(stderr, "PQcancel failed: %s\n", PQcancelErrorMessage(cancel_conn));
963 PQcancelFinish(cancel_conn);
967 * After twice max_step_wait, just give up and die.
969 * Since cleanup steps won't be run in this case, this may cause
970 * later tests to fail. That stinks, but it's better than waiting
971 * forever for the server to respond to the cancel.
973 if (td > 2 * max_step_wait)
975 fprintf(stderr, "step %s timed out after %d seconds\n",
976 step->name, (int) (td / USECS_PER_SEC));
977 exit(1);
980 else if (!PQconsumeInput(conn)) /* select(): data available */
982 fprintf(stderr, "PQconsumeInput failed: %s\n",
983 PQerrorMessage(conn));
984 exit(1);
989 * The step is done, but we won't report it as complete so long as there
990 * are blockers.
992 if (step_has_blocker(pstep))
994 if (!(flags & STEP_RETRY))
995 printf("step %s: %s <waiting ...>\n",
996 step->name, step->sql);
997 return true;
1000 /* Otherwise, go ahead and complete it. */
1001 if (flags & STEP_RETRY)
1002 printf("step %s: <... completed>\n", step->name);
1003 else
1004 printf("step %s: %s\n", step->name, step->sql);
1006 while ((res = PQgetResult(conn)))
1008 switch (PQresultStatus(res))
1010 case PGRES_COMMAND_OK:
1011 case PGRES_EMPTY_QUERY:
1012 break;
1013 case PGRES_TUPLES_OK:
1014 printResultSet(res);
1015 break;
1016 case PGRES_FATAL_ERROR:
1019 * Detail may contain XID values, so we want to just show
1020 * primary. Beware however that libpq-generated error results
1021 * may not contain subfields, only an old-style message.
1024 const char *sev = PQresultErrorField(res,
1025 PG_DIAG_SEVERITY);
1026 const char *msg = PQresultErrorField(res,
1027 PG_DIAG_MESSAGE_PRIMARY);
1029 if (sev && msg)
1030 printf("%s: %s\n", sev, msg);
1031 else
1032 printf("%s\n", PQresultErrorMessage(res));
1034 break;
1035 default:
1036 printf("unexpected result status: %s\n",
1037 PQresStatus(PQresultStatus(res)));
1039 PQclear(res);
1042 /* Report any available NOTIFY messages, too */
1043 PQconsumeInput(conn);
1044 while ((notify = PQnotifies(conn)) != NULL)
1046 /* Try to identify which session it came from */
1047 const char *sendername = NULL;
1048 char pidstring[32];
1049 int i;
1051 for (i = 0; i < testspec->nsessions; i++)
1053 if (notify->be_pid == conns[i + 1].backend_pid)
1055 sendername = conns[i + 1].sessionname;
1056 break;
1059 if (sendername == NULL)
1061 /* Doesn't seem to be any test session, so show the hard way */
1062 snprintf(pidstring, sizeof(pidstring), "PID %d", notify->be_pid);
1063 sendername = pidstring;
1065 printf("%s: NOTIFY \"%s\" with payload \"%s\" from %s\n",
1066 testspec->sessions[step->session]->name,
1067 notify->relname, notify->extra, sendername);
1068 PQfreemem(notify);
1069 PQconsumeInput(conn);
1072 /* Connection is now idle. */
1073 iconn->active_step = NULL;
1075 return false;
1078 /* Detect whether a step has any unsatisfied blocker conditions */
1079 static bool
1080 step_has_blocker(PermutationStep *pstep)
1082 int i;
1084 for (i = 0; i < pstep->nblockers; i++)
1086 PermutationStepBlocker *blocker = pstep->blockers[i];
1087 IsoConnInfo *iconn;
1089 switch (blocker->blocktype)
1091 case PSB_ONCE:
1092 /* Ignore; try_complete_step handles this specially */
1093 break;
1094 case PSB_OTHER_STEP:
1095 /* Block if referenced step is active */
1096 iconn = &conns[1 + blocker->step->session];
1097 if (iconn->active_step &&
1098 iconn->active_step->step == blocker->step)
1099 return true;
1100 break;
1101 case PSB_NUM_NOTICES:
1102 /* Block if not enough notices received yet */
1103 iconn = &conns[1 + blocker->step->session];
1104 if (iconn->total_notices < blocker->target_notices)
1105 return true;
1106 break;
1109 return false;
1112 static void
1113 printResultSet(PGresult *res)
1115 PQprintOpt popt;
1117 memset(&popt, 0, sizeof(popt));
1118 popt.header = true;
1119 popt.align = true;
1120 popt.fieldSep = "|";
1121 PQprint(stdout, res, &popt);
1124 /* notice processor for regular user sessions */
1125 static void
1126 isotesterNoticeProcessor(void *arg, const char *message)
1128 IsoConnInfo *myconn = (IsoConnInfo *) arg;
1130 /* Prefix the backend's message with the session name. */
1131 printf("%s: %s", myconn->sessionname, message);
1132 /* Record notices, since we may need this to decide to unblock a step. */
1133 myconn->total_notices++;
1134 any_new_notice = true;
/*
 * Notice processor that hides the message: both arguments are ignored.
 */
static void
blackholeNoticeProcessor(void *arg, const char *message)
{
    /* do nothing */
    (void) arg;
    (void) message;
}