Add tests for libpq query cancellation APIs
[pgsql.git] / src / test / modules / libpq_pipeline / libpq_pipeline.c
blob97f8febf929859fea0b9c0a4b32661a0e362f14f
1 /*-------------------------------------------------------------------------
3 * libpq_pipeline.c
4 * Verify libpq pipeline execution functionality
6 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
10 * IDENTIFICATION
11 * src/test/modules/libpq_pipeline/libpq_pipeline.c
13 *-------------------------------------------------------------------------
16 #include "postgres_fe.h"
18 #include <sys/select.h>
19 #include <sys/time.h>
21 #include "catalog/pg_type_d.h"
22 #include "common/fe_memutils.h"
23 #include "libpq-fe.h"
24 #include "pg_getopt.h"
25 #include "portability/instr_time.h"
28 static void exit_nicely(PGconn *conn);
29 static void pg_attribute_noreturn() pg_fatal_impl(int line, const char *fmt,...)
30 pg_attribute_printf(2, 3);
31 static bool process_result(PGconn *conn, PGresult *res, int results,
32 int numsent);
34 const char *const progname = "libpq_pipeline";
36 /* Options and defaults */
37 char *tracefile = NULL; /* path to PQtrace() file */
40 #ifdef DEBUG_OUTPUT
41 #define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
42 #else
43 #define pg_debug(...)
44 #endif
46 static const char *const drop_table_sql =
47 "DROP TABLE IF EXISTS pq_pipeline_demo";
48 static const char *const create_table_sql =
49 "CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
50 "int8filler int8);";
51 static const char *const insert_sql =
52 "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
53 static const char *const insert_sql2 =
54 "INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
/*
 * Buffer sizes for the decimal text form of an integer, counting the sign
 * and the terminating NUL:
 *   int32: "-2147483648"          = 11 chars + NUL = 12
 *   int64: "-9223372036854775808" = 20 chars + NUL = 21
 */
#define MAXINTLEN 12
#define MAXINT8LEN 21
60 static void
61 exit_nicely(PGconn *conn)
63 PQfinish(conn);
64 exit(1);
68 * The following few functions are wrapped in macros to make the reported line
69 * number in an error match the line number of the invocation.
73 * Print an error to stderr and terminate the program.
75 #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
76 static void
77 pg_attribute_noreturn()
78 pg_fatal_impl(int line, const char *fmt,...)
80 va_list args;
82 fflush(stdout);
84 fprintf(stderr, "\n%s:%d: ", progname, line);
85 va_start(args, fmt);
86 vfprintf(stderr, fmt, args);
87 va_end(args);
88 Assert(fmt[strlen(fmt) - 1] != '\n');
89 fprintf(stderr, "\n");
90 exit(1);
94 * Check that the query on the given connection got canceled.
96 #define confirm_query_canceled(conn) confirm_query_canceled_impl(__LINE__, conn)
97 static void
98 confirm_query_canceled_impl(int line, PGconn *conn)
100 PGresult *res = NULL;
102 res = PQgetResult(conn);
103 if (res == NULL)
104 pg_fatal_impl(line, "PQgetResult returned null: %s",
105 PQerrorMessage(conn));
106 if (PQresultStatus(res) != PGRES_FATAL_ERROR)
107 pg_fatal_impl(line, "query did not fail when it was expected");
108 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0)
109 pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
110 PQerrorMessage(conn));
111 PQclear(res);
113 while (PQisBusy(conn))
114 PQconsumeInput(conn);
117 #define send_cancellable_query(conn, monitorConn) \
118 send_cancellable_query_impl(__LINE__, conn, monitorConn)
119 static void
120 send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
122 const char *env_wait;
123 const Oid paramTypes[1] = {INT4OID};
124 int procpid = PQbackendPID(conn);
126 env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
127 if (env_wait == NULL)
128 env_wait = "180";
130 if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes,
131 &env_wait, NULL, NULL, 0) != 1)
132 pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
135 * Wait until the query is actually running. Otherwise sending a
136 * cancellation request might not cancel the query due to race conditions.
138 while (true)
140 char *value;
141 PGresult *res;
142 const char *paramValues[1];
143 char pidval[16];
145 snprintf(pidval, 16, "%d", procpid);
146 paramValues[0] = pidval;
148 res = PQexecParams(monitorConn,
149 "SELECT count(*) FROM pg_stat_activity WHERE "
150 "pid = $1 AND state = 'active'",
151 1, NULL, paramValues, NULL, NULL, 1);
153 if (PQresultStatus(res) != PGRES_TUPLES_OK)
154 pg_fatal("could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
155 if (PQntuples(res) != 1)
156 pg_fatal("unexpected number of rows received: %d", PQntuples(res));
157 if (PQnfields(res) != 1)
158 pg_fatal("unexpected number of columns received: %d", PQnfields(res));
159 value = PQgetvalue(res, 0, 0);
160 if (*value != '0')
162 PQclear(res);
163 break;
165 PQclear(res);
167 /* wait 10ms before polling again */
168 pg_usleep(10000);
173 * Create a new connection with the same conninfo as the given one.
175 static PGconn *
176 copy_connection(PGconn *conn)
178 PGconn *copyConn;
179 PQconninfoOption *opts = PQconninfo(conn);
180 const char **keywords;
181 const char **vals;
182 int nopts = 1;
183 int i = 0;
185 for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
186 nopts++;
188 keywords = pg_malloc(sizeof(char *) * nopts);
189 vals = pg_malloc(sizeof(char *) * nopts);
191 for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
193 if (opt->val)
195 keywords[i] = opt->keyword;
196 vals[i] = opt->val;
197 i++;
200 keywords[i] = vals[i] = NULL;
202 copyConn = PQconnectdbParams(keywords, vals, false);
204 if (PQstatus(copyConn) != CONNECTION_OK)
205 pg_fatal("Connection to database failed: %s",
206 PQerrorMessage(copyConn));
208 return copyConn;
212 * Test query cancellation routines
214 static void
215 test_cancel(PGconn *conn)
217 PGcancel *cancel;
218 PGconn *monitorConn;
219 char errorbuf[256];
221 fprintf(stderr, "test cancellations... ");
223 if (PQsetnonblocking(conn, 1) != 0)
224 pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
227 * Make a separate connection to the database to monitor the query on the
228 * main connection.
230 monitorConn = copy_connection(conn);
231 Assert(PQstatus(monitorConn) == CONNECTION_OK);
233 /* test PQcancel */
234 send_cancellable_query(conn, monitorConn);
235 cancel = PQgetCancel(conn);
236 if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
237 pg_fatal("failed to run PQcancel: %s", errorbuf);
238 confirm_query_canceled(conn);
240 /* PGcancel object can be reused for the next query */
241 send_cancellable_query(conn, monitorConn);
242 if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
243 pg_fatal("failed to run PQcancel: %s", errorbuf);
244 confirm_query_canceled(conn);
246 PQfreeCancel(cancel);
248 /* test PQrequestCancel */
249 send_cancellable_query(conn, monitorConn);
250 if (!PQrequestCancel(conn))
251 pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
252 confirm_query_canceled(conn);
254 fprintf(stderr, "ok\n");
257 static void
258 test_disallowed_in_pipeline(PGconn *conn)
260 PGresult *res = NULL;
262 fprintf(stderr, "test error cases... ");
264 if (PQisnonblocking(conn))
265 pg_fatal("Expected blocking connection mode");
267 if (PQenterPipelineMode(conn) != 1)
268 pg_fatal("Unable to enter pipeline mode");
270 if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
271 pg_fatal("Pipeline mode not activated properly");
273 /* PQexec should fail in pipeline mode */
274 res = PQexec(conn, "SELECT 1");
275 if (PQresultStatus(res) != PGRES_FATAL_ERROR)
276 pg_fatal("PQexec should fail in pipeline mode but succeeded");
277 if (strcmp(PQerrorMessage(conn),
278 "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
279 pg_fatal("did not get expected error message; got: \"%s\"",
280 PQerrorMessage(conn));
282 /* PQsendQuery should fail in pipeline mode */
283 if (PQsendQuery(conn, "SELECT 1") != 0)
284 pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
285 if (strcmp(PQerrorMessage(conn),
286 "PQsendQuery not allowed in pipeline mode\n") != 0)
287 pg_fatal("did not get expected error message; got: \"%s\"",
288 PQerrorMessage(conn));
290 /* Entering pipeline mode when already in pipeline mode is OK */
291 if (PQenterPipelineMode(conn) != 1)
292 pg_fatal("re-entering pipeline mode should be a no-op but failed");
294 if (PQisBusy(conn) != 0)
295 pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
297 /* ok, back to normal command mode */
298 if (PQexitPipelineMode(conn) != 1)
299 pg_fatal("couldn't exit idle empty pipeline mode");
301 if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
302 pg_fatal("Pipeline mode not terminated properly");
304 /* exiting pipeline mode when not in pipeline mode should be a no-op */
305 if (PQexitPipelineMode(conn) != 1)
306 pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
308 /* can now PQexec again */
309 res = PQexec(conn, "SELECT 1");
310 if (PQresultStatus(res) != PGRES_TUPLES_OK)
311 pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
312 PQerrorMessage(conn));
314 fprintf(stderr, "ok\n");
317 static void
318 test_multi_pipelines(PGconn *conn)
320 PGresult *res = NULL;
321 const char *dummy_params[1] = {"1"};
322 Oid dummy_param_oids[1] = {INT4OID};
324 fprintf(stderr, "multi pipeline... ");
327 * Queue up a couple of small pipelines and process each without returning
328 * to command mode first.
330 if (PQenterPipelineMode(conn) != 1)
331 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
333 /* first pipeline */
334 if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
335 dummy_params, NULL, NULL, 0) != 1)
336 pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
338 if (PQpipelineSync(conn) != 1)
339 pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
341 /* second pipeline */
342 if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
343 dummy_params, NULL, NULL, 0) != 1)
344 pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
346 /* Skip flushing once. */
347 if (PQsendPipelineSync(conn) != 1)
348 pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
350 /* third pipeline */
351 if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
352 dummy_params, NULL, NULL, 0) != 1)
353 pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn));
355 if (PQpipelineSync(conn) != 1)
356 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
358 /* OK, start processing the results */
360 /* first pipeline */
362 res = PQgetResult(conn);
363 if (res == NULL)
364 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
365 PQerrorMessage(conn));
367 if (PQresultStatus(res) != PGRES_TUPLES_OK)
368 pg_fatal("Unexpected result code %s from first pipeline item",
369 PQresStatus(PQresultStatus(res)));
370 PQclear(res);
371 res = NULL;
373 if (PQgetResult(conn) != NULL)
374 pg_fatal("PQgetResult returned something extra after first result");
376 if (PQexitPipelineMode(conn) != 0)
377 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
379 res = PQgetResult(conn);
380 if (res == NULL)
381 pg_fatal("PQgetResult returned null when sync result expected: %s",
382 PQerrorMessage(conn));
384 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
385 pg_fatal("Unexpected result code %s instead of sync result, error: %s",
386 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
387 PQclear(res);
389 /* second pipeline */
391 res = PQgetResult(conn);
392 if (res == NULL)
393 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
394 PQerrorMessage(conn));
396 if (PQresultStatus(res) != PGRES_TUPLES_OK)
397 pg_fatal("Unexpected result code %s from second pipeline item",
398 PQresStatus(PQresultStatus(res)));
399 PQclear(res);
400 res = NULL;
402 if (PQgetResult(conn) != NULL)
403 pg_fatal("PQgetResult returned something extra after first result");
405 if (PQexitPipelineMode(conn) != 0)
406 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
408 res = PQgetResult(conn);
409 if (res == NULL)
410 pg_fatal("PQgetResult returned null when sync result expected: %s",
411 PQerrorMessage(conn));
413 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
414 pg_fatal("Unexpected result code %s instead of sync result, error: %s",
415 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
416 PQclear(res);
418 /* third pipeline */
420 res = PQgetResult(conn);
421 if (res == NULL)
422 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
423 PQerrorMessage(conn));
425 if (PQresultStatus(res) != PGRES_TUPLES_OK)
426 pg_fatal("Unexpected result code %s from third pipeline item",
427 PQresStatus(PQresultStatus(res)));
429 res = PQgetResult(conn);
430 if (res != NULL)
431 pg_fatal("Expected null result, got %s",
432 PQresStatus(PQresultStatus(res)));
434 res = PQgetResult(conn);
435 if (res == NULL)
436 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
437 PQerrorMessage(conn));
439 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
440 pg_fatal("Unexpected result code %s from second pipeline sync",
441 PQresStatus(PQresultStatus(res)));
443 /* We're still in pipeline mode ... */
444 if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
445 pg_fatal("Fell out of pipeline mode somehow");
447 /* until we end it, which we can safely do now */
448 if (PQexitPipelineMode(conn) != 1)
449 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
450 PQerrorMessage(conn));
452 if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
453 pg_fatal("exiting pipeline mode didn't seem to work");
455 fprintf(stderr, "ok\n");
459 * Test behavior when a pipeline dispatches a number of commands that are
460 * not flushed by a sync point.
462 static void
463 test_nosync(PGconn *conn)
465 int numqueries = 10;
466 int results = 0;
467 int sock = PQsocket(conn);
469 fprintf(stderr, "nosync... ");
471 if (sock < 0)
472 pg_fatal("invalid socket");
474 if (PQenterPipelineMode(conn) != 1)
475 pg_fatal("could not enter pipeline mode");
476 for (int i = 0; i < numqueries; i++)
478 fd_set input_mask;
479 struct timeval tv;
481 if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
482 0, NULL, NULL, NULL, NULL, 0) != 1)
483 pg_fatal("error sending select: %s", PQerrorMessage(conn));
484 PQflush(conn);
487 * If the server has written anything to us, read (some of) it now.
489 FD_ZERO(&input_mask);
490 FD_SET(sock, &input_mask);
491 tv.tv_sec = 0;
492 tv.tv_usec = 0;
493 if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
495 fprintf(stderr, "select() failed: %s\n", strerror(errno));
496 exit_nicely(conn);
498 if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
499 pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
502 /* tell server to flush its output buffer */
503 if (PQsendFlushRequest(conn) != 1)
504 pg_fatal("failed to send flush request");
505 PQflush(conn);
507 /* Now read all results */
508 for (;;)
510 PGresult *res;
512 res = PQgetResult(conn);
514 /* NULL results are only expected after TUPLES_OK */
515 if (res == NULL)
516 pg_fatal("got unexpected NULL result after %d results", results);
518 /* We expect exactly one TUPLES_OK result for each query we sent */
519 if (PQresultStatus(res) == PGRES_TUPLES_OK)
521 PGresult *res2;
523 /* and one NULL result should follow each */
524 res2 = PQgetResult(conn);
525 if (res2 != NULL)
526 pg_fatal("expected NULL, got %s",
527 PQresStatus(PQresultStatus(res2)));
528 PQclear(res);
529 results++;
531 /* if we're done, we're done */
532 if (results == numqueries)
533 break;
535 continue;
538 /* anything else is unexpected */
539 pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
542 fprintf(stderr, "ok\n");
546 * When an operation in a pipeline fails the rest of the pipeline is flushed. We
547 * still have to get results for each pipeline item, but the item will just be
548 * a PGRES_PIPELINE_ABORTED code.
550 * This intentionally doesn't use a transaction to wrap the pipeline. You should
551 * usually use an xact, but in this case we want to observe the effects of each
552 * statement.
554 static void
555 test_pipeline_abort(PGconn *conn)
557 PGresult *res = NULL;
558 const char *dummy_params[1] = {"1"};
559 Oid dummy_param_oids[1] = {INT4OID};
560 int i;
561 int gotrows;
562 bool goterror;
564 fprintf(stderr, "aborted pipeline... ");
566 res = PQexec(conn, drop_table_sql);
567 if (PQresultStatus(res) != PGRES_COMMAND_OK)
568 pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
570 res = PQexec(conn, create_table_sql);
571 if (PQresultStatus(res) != PGRES_COMMAND_OK)
572 pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
575 * Queue up a couple of small pipelines and process each without returning
576 * to command mode first. Make sure the second operation in the first
577 * pipeline ERRORs.
579 if (PQenterPipelineMode(conn) != 1)
580 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
582 dummy_params[0] = "1";
583 if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
584 dummy_params, NULL, NULL, 0) != 1)
585 pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
587 if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
588 1, dummy_param_oids, dummy_params,
589 NULL, NULL, 0) != 1)
590 pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
592 dummy_params[0] = "2";
593 if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
594 dummy_params, NULL, NULL, 0) != 1)
595 pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
597 if (PQpipelineSync(conn) != 1)
598 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
600 dummy_params[0] = "3";
601 if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
602 dummy_params, NULL, NULL, 0) != 1)
603 pg_fatal("dispatching second-pipeline insert failed: %s",
604 PQerrorMessage(conn));
606 if (PQpipelineSync(conn) != 1)
607 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
610 * OK, start processing the pipeline results.
612 * We should get a command-ok for the first query, then a fatal error and
613 * a pipeline aborted message for the second insert, a pipeline-end, then
614 * a command-ok and a pipeline-ok for the second pipeline operation.
616 res = PQgetResult(conn);
617 if (res == NULL)
618 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
619 if (PQresultStatus(res) != PGRES_COMMAND_OK)
620 pg_fatal("Unexpected result status %s: %s",
621 PQresStatus(PQresultStatus(res)),
622 PQresultErrorMessage(res));
623 PQclear(res);
625 /* NULL result to signal end-of-results for this command */
626 if ((res = PQgetResult(conn)) != NULL)
627 pg_fatal("Expected null result, got %s",
628 PQresStatus(PQresultStatus(res)));
630 /* Second query caused error, so we expect an error next */
631 res = PQgetResult(conn);
632 if (res == NULL)
633 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
634 if (PQresultStatus(res) != PGRES_FATAL_ERROR)
635 pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
636 PQresStatus(PQresultStatus(res)));
637 PQclear(res);
639 /* NULL result to signal end-of-results for this command */
640 if ((res = PQgetResult(conn)) != NULL)
641 pg_fatal("Expected null result, got %s",
642 PQresStatus(PQresultStatus(res)));
645 * pipeline should now be aborted.
647 * Note that we could still queue more queries at this point if we wanted;
648 * they'd get added to a new third pipeline since we've already sent a
649 * second. The aborted flag relates only to the pipeline being received.
651 if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
652 pg_fatal("pipeline should be flagged as aborted but isn't");
654 /* third query in pipeline, the second insert */
655 res = PQgetResult(conn);
656 if (res == NULL)
657 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
658 if (PQresultStatus(res) != PGRES_PIPELINE_ABORTED)
659 pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
660 PQresStatus(PQresultStatus(res)));
661 PQclear(res);
663 /* NULL result to signal end-of-results for this command */
664 if ((res = PQgetResult(conn)) != NULL)
665 pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
667 if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
668 pg_fatal("pipeline should be flagged as aborted but isn't");
670 /* Ensure we're still in pipeline */
671 if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
672 pg_fatal("Fell out of pipeline mode somehow");
675 * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
677 * (This is so clients know to start processing results normally again and
678 * can tell the difference between skipped commands and the sync.)
680 res = PQgetResult(conn);
681 if (res == NULL)
682 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
683 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
684 pg_fatal("Unexpected result code from first pipeline sync\n"
685 "Expected PGRES_PIPELINE_SYNC, got %s",
686 PQresStatus(PQresultStatus(res)));
687 PQclear(res);
689 if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED)
690 pg_fatal("sync should've cleared the aborted flag but didn't");
692 /* We're still in pipeline mode... */
693 if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
694 pg_fatal("Fell out of pipeline mode somehow");
696 /* the insert from the second pipeline */
697 res = PQgetResult(conn);
698 if (res == NULL)
699 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
700 if (PQresultStatus(res) != PGRES_COMMAND_OK)
701 pg_fatal("Unexpected result code %s from first item in second pipeline",
702 PQresStatus(PQresultStatus(res)));
703 PQclear(res);
705 /* Read the NULL result at the end of the command */
706 if ((res = PQgetResult(conn)) != NULL)
707 pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
709 /* the second pipeline sync */
710 if ((res = PQgetResult(conn)) == NULL)
711 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
712 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
713 pg_fatal("Unexpected result code %s from second pipeline sync",
714 PQresStatus(PQresultStatus(res)));
715 PQclear(res);
717 if ((res = PQgetResult(conn)) != NULL)
718 pg_fatal("Expected null result, got %s: %s",
719 PQresStatus(PQresultStatus(res)),
720 PQerrorMessage(conn));
722 /* Try to send two queries in one command */
723 if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
724 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
725 if (PQpipelineSync(conn) != 1)
726 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
727 goterror = false;
728 while ((res = PQgetResult(conn)) != NULL)
730 switch (PQresultStatus(res))
732 case PGRES_FATAL_ERROR:
733 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
734 pg_fatal("expected error about multiple commands, got %s",
735 PQerrorMessage(conn));
736 printf("got expected %s", PQerrorMessage(conn));
737 goterror = true;
738 break;
739 default:
740 pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
741 break;
744 if (!goterror)
745 pg_fatal("did not get cannot-insert-multiple-commands error");
746 res = PQgetResult(conn);
747 if (res == NULL)
748 pg_fatal("got NULL result");
749 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
750 pg_fatal("Unexpected result code %s from pipeline sync",
751 PQresStatus(PQresultStatus(res)));
752 fprintf(stderr, "ok\n");
754 /* Test single-row mode with an error partways */
755 if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
756 0, NULL, NULL, NULL, NULL, 0) != 1)
757 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
758 if (PQpipelineSync(conn) != 1)
759 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
760 PQsetSingleRowMode(conn);
761 goterror = false;
762 gotrows = 0;
763 while ((res = PQgetResult(conn)) != NULL)
765 switch (PQresultStatus(res))
767 case PGRES_SINGLE_TUPLE:
768 printf("got row: %s\n", PQgetvalue(res, 0, 0));
769 gotrows++;
770 break;
771 case PGRES_FATAL_ERROR:
772 if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
773 pg_fatal("expected division-by-zero, got: %s (%s)",
774 PQerrorMessage(conn),
775 PQresultErrorField(res, PG_DIAG_SQLSTATE));
776 printf("got expected division-by-zero\n");
777 goterror = true;
778 break;
779 default:
780 pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
782 PQclear(res);
784 if (!goterror)
785 pg_fatal("did not get division-by-zero error");
786 if (gotrows != 3)
787 pg_fatal("did not get three rows");
788 /* the third pipeline sync */
789 if ((res = PQgetResult(conn)) == NULL)
790 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
791 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
792 pg_fatal("Unexpected result code %s from third pipeline sync",
793 PQresStatus(PQresultStatus(res)));
794 PQclear(res);
796 /* We're still in pipeline mode... */
797 if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
798 pg_fatal("Fell out of pipeline mode somehow");
800 /* until we end it, which we can safely do now */
801 if (PQexitPipelineMode(conn) != 1)
802 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
803 PQerrorMessage(conn));
805 if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
806 pg_fatal("exiting pipeline mode didn't seem to work");
809 * Since we fired the pipelines off without a surrounding xact, the results
810 * should be:
812 * - Implicit xact started by server around 1st pipeline
813 * - First insert applied
814 * - Second statement aborted xact
815 * - Third insert skipped
816 * - Sync rolled back first implicit xact
817 * - Implicit xact created by server around 2nd pipeline
818 * - insert applied from 2nd pipeline
819 * - Sync commits 2nd xact
821 * So we should only have the value 3 that we inserted.
823 res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
825 if (PQresultStatus(res) != PGRES_TUPLES_OK)
826 pg_fatal("Expected tuples, got %s: %s",
827 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
828 if (PQntuples(res) != 1)
829 pg_fatal("expected 1 result, got %d", PQntuples(res));
830 for (i = 0; i < PQntuples(res); i++)
832 const char *val = PQgetvalue(res, i, 0);
834 if (strcmp(val, "3") != 0)
835 pg_fatal("expected only insert with value 3, got %s", val);
838 PQclear(res);
840 fprintf(stderr, "ok\n");
/*
 * Steps of the test_pipelined_insert state machine.
 *
 * The steps are listed in the order they are carried out; the code moves
 * from one step to the next by incrementing the value, so the order of the
 * members matters.
 */
enum PipelineInsertStep
{
	BI_BEGIN_TX = 0,			/* BEGIN TRANSACTION */
	BI_DROP_TABLE,				/* drop the demo table */
	BI_CREATE_TABLE,			/* create the demo table */
	BI_PREPARE,					/* prepare the insert statement */
	BI_INSERT_ROWS,				/* one insert per row */
	BI_COMMIT_TX,				/* COMMIT */
	BI_SYNC,					/* pipeline sync */
	BI_DONE						/* nothing left to do */
};
856 static void
857 test_pipelined_insert(PGconn *conn, int n_rows)
859 Oid insert_param_oids[2] = {INT4OID, INT8OID};
860 const char *insert_params[2];
861 char insert_param_0[MAXINTLEN];
862 char insert_param_1[MAXINT8LEN];
863 enum PipelineInsertStep send_step = BI_BEGIN_TX,
864 recv_step = BI_BEGIN_TX;
865 int rows_to_send,
866 rows_to_receive;
868 insert_params[0] = insert_param_0;
869 insert_params[1] = insert_param_1;
871 rows_to_send = rows_to_receive = n_rows;
874 * Do a pipelined insert into a table created at the start of the pipeline
876 if (PQenterPipelineMode(conn) != 1)
877 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
879 while (send_step != BI_PREPARE)
881 const char *sql;
883 switch (send_step)
885 case BI_BEGIN_TX:
886 sql = "BEGIN TRANSACTION";
887 send_step = BI_DROP_TABLE;
888 break;
890 case BI_DROP_TABLE:
891 sql = drop_table_sql;
892 send_step = BI_CREATE_TABLE;
893 break;
895 case BI_CREATE_TABLE:
896 sql = create_table_sql;
897 send_step = BI_PREPARE;
898 break;
900 default:
901 pg_fatal("invalid state");
902 sql = NULL; /* keep compiler quiet */
905 pg_debug("sending: %s\n", sql);
906 if (PQsendQueryParams(conn, sql,
907 0, NULL, NULL, NULL, NULL, 0) != 1)
908 pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
911 Assert(send_step == BI_PREPARE);
912 pg_debug("sending: %s\n", insert_sql2);
913 if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
914 pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
915 send_step = BI_INSERT_ROWS;
918 * Now we start inserting. We'll be sending enough data that we could fill
919 * our output buffer, so to avoid deadlocking we need to enter nonblocking
920 * mode and consume input while we send more output. As results of each
921 * query are processed we should pop them to allow processing of the next
922 * query. There's no need to finish the pipeline before processing
923 * results.
925 if (PQsetnonblocking(conn, 1) != 0)
926 pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
928 while (recv_step != BI_DONE)
930 int sock;
931 fd_set input_mask;
932 fd_set output_mask;
934 sock = PQsocket(conn);
936 if (sock < 0)
937 break; /* shouldn't happen */
939 FD_ZERO(&input_mask);
940 FD_SET(sock, &input_mask);
941 FD_ZERO(&output_mask);
942 FD_SET(sock, &output_mask);
944 if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
946 fprintf(stderr, "select() failed: %s\n", strerror(errno));
947 exit_nicely(conn);
951 * Process any results, so we keep the server's output buffer free
952 * flowing and it can continue to process input
954 if (FD_ISSET(sock, &input_mask))
956 PQconsumeInput(conn);
958 /* Read until we'd block if we tried to read */
959 while (!PQisBusy(conn) && recv_step < BI_DONE)
961 PGresult *res;
962 const char *cmdtag = "";
963 const char *description = "";
964 int status;
967 * Read next result. If no more results from this query,
968 * advance to the next query
970 res = PQgetResult(conn);
971 if (res == NULL)
972 continue;
974 status = PGRES_COMMAND_OK;
975 switch (recv_step)
977 case BI_BEGIN_TX:
978 cmdtag = "BEGIN";
979 recv_step++;
980 break;
981 case BI_DROP_TABLE:
982 cmdtag = "DROP TABLE";
983 recv_step++;
984 break;
985 case BI_CREATE_TABLE:
986 cmdtag = "CREATE TABLE";
987 recv_step++;
988 break;
989 case BI_PREPARE:
990 cmdtag = "";
991 description = "PREPARE";
992 recv_step++;
993 break;
994 case BI_INSERT_ROWS:
995 cmdtag = "INSERT";
996 rows_to_receive--;
997 if (rows_to_receive == 0)
998 recv_step++;
999 break;
1000 case BI_COMMIT_TX:
1001 cmdtag = "COMMIT";
1002 recv_step++;
1003 break;
1004 case BI_SYNC:
1005 cmdtag = "";
1006 description = "SYNC";
1007 status = PGRES_PIPELINE_SYNC;
1008 recv_step++;
1009 break;
1010 case BI_DONE:
1011 /* unreachable */
1012 pg_fatal("unreachable state");
1015 if (PQresultStatus(res) != status)
1016 pg_fatal("%s reported status %s, expected %s\n"
1017 "Error message: \"%s\"",
1018 description, PQresStatus(PQresultStatus(res)),
1019 PQresStatus(status), PQerrorMessage(conn));
1021 if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
1022 pg_fatal("%s expected command tag '%s', got '%s'",
1023 description, cmdtag, PQcmdStatus(res));
1025 pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
1027 PQclear(res);
1031 /* Write more rows and/or the end pipeline message, if needed */
1032 if (FD_ISSET(sock, &output_mask))
1034 PQflush(conn);
1036 if (send_step == BI_INSERT_ROWS)
1038 snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
1039 /* use up some buffer space with a wide value */
1040 snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
1042 if (PQsendQueryPrepared(conn, "my_insert",
1043 2, insert_params, NULL, NULL, 0) == 1)
1045 pg_debug("sent row %d\n", rows_to_send);
1047 rows_to_send--;
1048 if (rows_to_send == 0)
1049 send_step++;
1051 else
1054 * in nonblocking mode, so it's OK for an insert to fail
1055 * to send
1057 fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
1058 rows_to_send, PQerrorMessage(conn));
1061 else if (send_step == BI_COMMIT_TX)
1063 if (PQsendQueryParams(conn, "COMMIT",
1064 0, NULL, NULL, NULL, NULL, 0) == 1)
1066 pg_debug("sent COMMIT\n");
1067 send_step++;
1069 else
1071 fprintf(stderr, "WARNING: failed to send commit: %s\n",
1072 PQerrorMessage(conn));
1075 else if (send_step == BI_SYNC)
1077 if (PQpipelineSync(conn) == 1)
1079 fprintf(stdout, "pipeline sync sent\n");
1080 send_step++;
1082 else
1084 fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
1085 PQerrorMessage(conn));
1091 /* We've got the sync message and the pipeline should be done */
1092 if (PQexitPipelineMode(conn) != 1)
1093 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1094 PQerrorMessage(conn));
1096 if (PQsetnonblocking(conn, 0) != 0)
1097 pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
1099 fprintf(stderr, "ok\n");
1102 static void
1103 test_prepared(PGconn *conn)
1105 PGresult *res = NULL;
1106 Oid param_oids[1] = {INT4OID};
1107 Oid expected_oids[4];
1108 Oid typ;
1110 fprintf(stderr, "prepared... ");
1112 if (PQenterPipelineMode(conn) != 1)
1113 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1114 if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
1115 "interval '1 sec'",
1116 1, param_oids) != 1)
1117 pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
1118 expected_oids[0] = INT4OID;
1119 expected_oids[1] = TEXTOID;
1120 expected_oids[2] = NUMERICOID;
1121 expected_oids[3] = INTERVALOID;
1122 if (PQsendDescribePrepared(conn, "select_one") != 1)
1123 pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
1124 if (PQpipelineSync(conn) != 1)
1125 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1127 res = PQgetResult(conn);
1128 if (res == NULL)
1129 pg_fatal("PQgetResult returned null");
1130 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1131 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1132 PQclear(res);
1133 res = PQgetResult(conn);
1134 if (res != NULL)
1135 pg_fatal("expected NULL result");
1137 res = PQgetResult(conn);
1138 if (res == NULL)
1139 pg_fatal("PQgetResult returned NULL");
1140 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1141 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1142 if (PQnfields(res) != lengthof(expected_oids))
1143 pg_fatal("expected %zu columns, got %d",
1144 lengthof(expected_oids), PQnfields(res));
1145 for (int i = 0; i < PQnfields(res); i++)
1147 typ = PQftype(res, i);
1148 if (typ != expected_oids[i])
1149 pg_fatal("field %d: expected type %u, got %u",
1150 i, expected_oids[i], typ);
1152 PQclear(res);
1153 res = PQgetResult(conn);
1154 if (res != NULL)
1155 pg_fatal("expected NULL result");
1157 res = PQgetResult(conn);
1158 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1159 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1161 fprintf(stderr, "closing statement..");
1162 if (PQsendClosePrepared(conn, "select_one") != 1)
1163 pg_fatal("PQsendClosePrepared failed: %s", PQerrorMessage(conn));
1164 if (PQpipelineSync(conn) != 1)
1165 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1167 res = PQgetResult(conn);
1168 if (res == NULL)
1169 pg_fatal("expected non-NULL result");
1170 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1171 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1172 PQclear(res);
1173 res = PQgetResult(conn);
1174 if (res != NULL)
1175 pg_fatal("expected NULL result");
1176 res = PQgetResult(conn);
1177 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1178 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1180 if (PQexitPipelineMode(conn) != 1)
1181 pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1183 /* Now that it's closed we should get an error when describing */
1184 res = PQdescribePrepared(conn, "select_one");
1185 if (PQresultStatus(res) != PGRES_FATAL_ERROR)
1186 pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1189 * Also test the blocking close, this should not fail since closing a
1190 * non-existent prepared statement is a no-op
1192 res = PQclosePrepared(conn, "select_one");
1193 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1194 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1196 fprintf(stderr, "creating portal... ");
1197 PQexec(conn, "BEGIN");
1198 PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
1199 PQenterPipelineMode(conn);
1200 if (PQsendDescribePortal(conn, "cursor_one") != 1)
1201 pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
1202 if (PQpipelineSync(conn) != 1)
1203 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1204 res = PQgetResult(conn);
1205 if (res == NULL)
1206 pg_fatal("PQgetResult returned null");
1207 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1208 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1210 typ = PQftype(res, 0);
1211 if (typ != INT4OID)
1212 pg_fatal("portal: expected type %u, got %u",
1213 INT4OID, typ);
1214 PQclear(res);
1215 res = PQgetResult(conn);
1216 if (res != NULL)
1217 pg_fatal("expected NULL result");
1218 res = PQgetResult(conn);
1219 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1220 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1222 fprintf(stderr, "closing portal... ");
1223 if (PQsendClosePortal(conn, "cursor_one") != 1)
1224 pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn));
1225 if (PQpipelineSync(conn) != 1)
1226 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1228 res = PQgetResult(conn);
1229 if (res == NULL)
1230 pg_fatal("expected non-NULL result");
1231 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1232 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1233 PQclear(res);
1234 res = PQgetResult(conn);
1235 if (res != NULL)
1236 pg_fatal("expected NULL result");
1237 res = PQgetResult(conn);
1238 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1239 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1241 if (PQexitPipelineMode(conn) != 1)
1242 pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1244 /* Now that it's closed we should get an error when describing */
1245 res = PQdescribePortal(conn, "cursor_one");
1246 if (PQresultStatus(res) != PGRES_FATAL_ERROR)
1247 pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1250 * Also test the blocking close, this should not fail since closing a
1251 * non-existent portal is a no-op
1253 res = PQclosePortal(conn, "cursor_one");
1254 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1255 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1257 fprintf(stderr, "ok\n");
/*
 * Notice processor: print notices, and count how many we got.
 *
 * arg points to an int counter, which is incremented before the message is
 * printed to stderr together with the updated count.
 */
static void
notice_processor(void *arg, const char *message)
{
	int		   *n_notices = (int *) arg;

	(*n_notices)++;
	fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
}
1270 /* Verify behavior in "idle" state */
1271 static void
1272 test_pipeline_idle(PGconn *conn)
1274 PGresult *res;
1275 int n_notices = 0;
1277 fprintf(stderr, "\npipeline idle...\n");
1279 PQsetNoticeProcessor(conn, notice_processor, &n_notices);
1281 /* Try to exit pipeline mode in pipeline-idle state */
1282 if (PQenterPipelineMode(conn) != 1)
1283 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1284 if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
1285 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1286 PQsendFlushRequest(conn);
1287 res = PQgetResult(conn);
1288 if (res == NULL)
1289 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1290 PQerrorMessage(conn));
1291 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1292 pg_fatal("unexpected result code %s from first pipeline item",
1293 PQresStatus(PQresultStatus(res)));
1294 PQclear(res);
1295 res = PQgetResult(conn);
1296 if (res != NULL)
1297 pg_fatal("did not receive terminating NULL");
1298 if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
1299 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1300 if (PQexitPipelineMode(conn) == 1)
1301 pg_fatal("exiting pipeline succeeded when it shouldn't");
1302 if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
1303 strlen("cannot exit pipeline mode")) != 0)
1304 pg_fatal("did not get expected error; got: %s",
1305 PQerrorMessage(conn));
1306 PQsendFlushRequest(conn);
1307 res = PQgetResult(conn);
1308 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1309 pg_fatal("unexpected result code %s from second pipeline item",
1310 PQresStatus(PQresultStatus(res)));
1311 PQclear(res);
1312 res = PQgetResult(conn);
1313 if (res != NULL)
1314 pg_fatal("did not receive terminating NULL");
1315 if (PQexitPipelineMode(conn) != 1)
1316 pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
1318 if (n_notices > 0)
1319 pg_fatal("got %d notice(s)", n_notices);
1320 fprintf(stderr, "ok - 1\n");
1322 /* Have a WARNING in the middle of a resultset */
1323 if (PQenterPipelineMode(conn) != 1)
1324 pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
1325 if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
1326 pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1327 PQsendFlushRequest(conn);
1328 res = PQgetResult(conn);
1329 if (res == NULL)
1330 pg_fatal("unexpected NULL result received");
1331 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1332 pg_fatal("unexpected result code %s", PQresStatus(PQresultStatus(res)));
1333 if (PQexitPipelineMode(conn) != 1)
1334 pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
1335 fprintf(stderr, "ok - 2\n");
1338 static void
1339 test_simple_pipeline(PGconn *conn)
1341 PGresult *res = NULL;
1342 const char *dummy_params[1] = {"1"};
1343 Oid dummy_param_oids[1] = {INT4OID};
1345 fprintf(stderr, "simple pipeline... ");
1348 * Enter pipeline mode and dispatch a set of operations, which we'll then
1349 * process the results of as they come in.
1351 * For a simple case we should be able to do this without interim
1352 * processing of results since our output buffer will give us enough slush
1353 * to work with and we won't block on sending. So blocking mode is fine.
1355 if (PQisnonblocking(conn))
1356 pg_fatal("Expected blocking connection mode");
1358 if (PQenterPipelineMode(conn) != 1)
1359 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1361 if (PQsendQueryParams(conn, "SELECT $1",
1362 1, dummy_param_oids, dummy_params,
1363 NULL, NULL, 0) != 1)
1364 pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
1366 if (PQexitPipelineMode(conn) != 0)
1367 pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
1369 if (PQpipelineSync(conn) != 1)
1370 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1372 res = PQgetResult(conn);
1373 if (res == NULL)
1374 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1375 PQerrorMessage(conn));
1377 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1378 pg_fatal("Unexpected result code %s from first pipeline item",
1379 PQresStatus(PQresultStatus(res)));
1381 PQclear(res);
1382 res = NULL;
1384 if (PQgetResult(conn) != NULL)
1385 pg_fatal("PQgetResult returned something extra after first query result.");
1388 * Even though we've processed the result there's still a sync to come and
1389 * we can't exit pipeline mode yet
1391 if (PQexitPipelineMode(conn) != 0)
1392 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
1394 res = PQgetResult(conn);
1395 if (res == NULL)
1396 pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
1397 PQerrorMessage(conn));
1399 if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1400 pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
1401 PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
1403 PQclear(res);
1404 res = NULL;
1406 if (PQgetResult(conn) != NULL)
1407 pg_fatal("PQgetResult returned something extra after pipeline end: %s",
1408 PQresStatus(PQresultStatus(res)));
1410 /* We're still in pipeline mode... */
1411 if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
1412 pg_fatal("Fell out of pipeline mode somehow");
1414 /* ... until we end it, which we can safely do now */
1415 if (PQexitPipelineMode(conn) != 1)
1416 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1417 PQerrorMessage(conn));
1419 if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
1420 pg_fatal("Exiting pipeline mode didn't seem to work");
1422 fprintf(stderr, "ok\n");
1425 static void
1426 test_singlerowmode(PGconn *conn)
1428 PGresult *res;
1429 int i;
1430 bool pipeline_ended = false;
1432 if (PQenterPipelineMode(conn) != 1)
1433 pg_fatal("failed to enter pipeline mode: %s",
1434 PQerrorMessage(conn));
1436 /* One series of three commands, using single-row mode for the first two. */
1437 for (i = 0; i < 3; i++)
1439 char *param[1];
1441 param[0] = psprintf("%d", 44 + i);
1443 if (PQsendQueryParams(conn,
1444 "SELECT generate_series(42, $1)",
1446 NULL,
1447 (const char **) param,
1448 NULL,
1449 NULL,
1450 0) != 1)
1451 pg_fatal("failed to send query: %s",
1452 PQerrorMessage(conn));
1453 pfree(param[0]);
1455 if (PQpipelineSync(conn) != 1)
1456 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1458 for (i = 0; !pipeline_ended; i++)
1460 bool first = true;
1461 bool saw_ending_tuplesok;
1462 bool isSingleTuple = false;
1464 /* Set single row mode for only first 2 SELECT queries */
1465 if (i < 2)
1467 if (PQsetSingleRowMode(conn) != 1)
1468 pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
1471 /* Consume rows for this query */
1472 saw_ending_tuplesok = false;
1473 while ((res = PQgetResult(conn)) != NULL)
1475 ExecStatusType est = PQresultStatus(res);
1477 if (est == PGRES_PIPELINE_SYNC)
1479 fprintf(stderr, "end of pipeline reached\n");
1480 pipeline_ended = true;
1481 PQclear(res);
1482 if (i != 3)
1483 pg_fatal("Expected three results, got %d", i);
1484 break;
1487 /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
1488 if (first)
1490 if (i <= 1 && est != PGRES_SINGLE_TUPLE)
1491 pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1492 i, PQresStatus(est));
1493 if (i >= 2 && est != PGRES_TUPLES_OK)
1494 pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1495 i, PQresStatus(est));
1496 first = false;
1499 fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
1500 switch (est)
1502 case PGRES_TUPLES_OK:
1503 fprintf(stderr, ", tuples: %d\n", PQntuples(res));
1504 saw_ending_tuplesok = true;
1505 if (isSingleTuple)
1507 if (PQntuples(res) == 0)
1508 fprintf(stderr, "all tuples received in query %d\n", i);
1509 else
1510 pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1512 break;
1514 case PGRES_SINGLE_TUPLE:
1515 isSingleTuple = true;
1516 fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
1517 break;
1519 default:
1520 pg_fatal("unexpected");
1522 PQclear(res);
1524 if (!pipeline_ended && !saw_ending_tuplesok)
1525 pg_fatal("didn't get expected terminating TUPLES_OK");
1529 * Now issue one command, get its results in with single-row mode, then
1530 * issue another command, and get its results in normal mode; make sure
1531 * the single-row mode flag is reset as expected.
1533 if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
1534 0, NULL, NULL, NULL, NULL, 0) != 1)
1535 pg_fatal("failed to send query: %s",
1536 PQerrorMessage(conn));
1537 if (PQsendFlushRequest(conn) != 1)
1538 pg_fatal("failed to send flush request");
1539 if (PQsetSingleRowMode(conn) != 1)
1540 pg_fatal("PQsetSingleRowMode() failed");
1541 res = PQgetResult(conn);
1542 if (res == NULL)
1543 pg_fatal("unexpected NULL");
1544 if (PQresultStatus(res) != PGRES_SINGLE_TUPLE)
1545 pg_fatal("Expected PGRES_SINGLE_TUPLE, got %s",
1546 PQresStatus(PQresultStatus(res)));
1547 res = PQgetResult(conn);
1548 if (res == NULL)
1549 pg_fatal("unexpected NULL");
1550 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1551 pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1552 PQresStatus(PQresultStatus(res)));
1553 if (PQgetResult(conn) != NULL)
1554 pg_fatal("expected NULL result");
1556 if (PQsendQueryParams(conn, "SELECT 1",
1557 0, NULL, NULL, NULL, NULL, 0) != 1)
1558 pg_fatal("failed to send query: %s",
1559 PQerrorMessage(conn));
1560 if (PQsendFlushRequest(conn) != 1)
1561 pg_fatal("failed to send flush request");
1562 res = PQgetResult(conn);
1563 if (res == NULL)
1564 pg_fatal("unexpected NULL");
1565 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1566 pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1567 PQresStatus(PQresultStatus(res)));
1568 if (PQgetResult(conn) != NULL)
1569 pg_fatal("expected NULL result");
1571 if (PQexitPipelineMode(conn) != 1)
1572 pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1574 fprintf(stderr, "ok\n");
1578 * Simple test to verify that a pipeline is discarded as a whole when there's
1579 * an error, ignoring transaction commands.
1581 static void
1582 test_transaction(PGconn *conn)
1584 PGresult *res;
1585 bool expect_null;
1586 int num_syncs = 0;
1588 res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
1589 "CREATE TABLE pq_pipeline_tst (id int)");
1590 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1591 pg_fatal("failed to create test table: %s",
1592 PQerrorMessage(conn));
1593 PQclear(res);
1595 if (PQenterPipelineMode(conn) != 1)
1596 pg_fatal("failed to enter pipeline mode: %s",
1597 PQerrorMessage(conn));
1598 if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
1599 pg_fatal("could not send prepare on pipeline: %s",
1600 PQerrorMessage(conn));
1602 if (PQsendQueryParams(conn,
1603 "BEGIN",
1604 0, NULL, NULL, NULL, NULL, 0) != 1)
1605 pg_fatal("failed to send query: %s",
1606 PQerrorMessage(conn));
1607 if (PQsendQueryParams(conn,
1608 "SELECT 0/0",
1609 0, NULL, NULL, NULL, NULL, 0) != 1)
1610 pg_fatal("failed to send query: %s",
1611 PQerrorMessage(conn));
1614 * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
1615 * get out of the pipeline-aborted state first.
1617 if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1618 pg_fatal("failed to execute prepared: %s",
1619 PQerrorMessage(conn));
1621 /* This insert fails because we're in pipeline-aborted state */
1622 if (PQsendQueryParams(conn,
1623 "INSERT INTO pq_pipeline_tst VALUES (1)",
1624 0, NULL, NULL, NULL, NULL, 0) != 1)
1625 pg_fatal("failed to send query: %s",
1626 PQerrorMessage(conn));
1627 if (PQpipelineSync(conn) != 1)
1628 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1629 num_syncs++;
1632 * This insert fails even though the pipeline got a SYNC, because we're in
1633 * an aborted transaction
1635 if (PQsendQueryParams(conn,
1636 "INSERT INTO pq_pipeline_tst VALUES (2)",
1637 0, NULL, NULL, NULL, NULL, 0) != 1)
1638 pg_fatal("failed to send query: %s",
1639 PQerrorMessage(conn));
1640 if (PQpipelineSync(conn) != 1)
1641 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1642 num_syncs++;
1645 * Send ROLLBACK using prepared stmt. This one works because we just did
1646 * PQpipelineSync above.
1648 if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1649 pg_fatal("failed to execute prepared: %s",
1650 PQerrorMessage(conn));
1653 * Now that we're out of a transaction and in pipeline-good mode, this
1654 * insert works
1656 if (PQsendQueryParams(conn,
1657 "INSERT INTO pq_pipeline_tst VALUES (3)",
1658 0, NULL, NULL, NULL, NULL, 0) != 1)
1659 pg_fatal("failed to send query: %s",
1660 PQerrorMessage(conn));
1661 /* Send two syncs now -- match up to SYNC messages below */
1662 if (PQpipelineSync(conn) != 1)
1663 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1664 num_syncs++;
1665 if (PQpipelineSync(conn) != 1)
1666 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1667 num_syncs++;
1669 expect_null = false;
1670 for (int i = 0;; i++)
1672 ExecStatusType restype;
1674 res = PQgetResult(conn);
1675 if (res == NULL)
1677 printf("%d: got NULL result\n", i);
1678 if (!expect_null)
1679 pg_fatal("did not expect NULL here");
1680 expect_null = false;
1681 continue;
1683 restype = PQresultStatus(res);
1684 printf("%d: got status %s", i, PQresStatus(restype));
1685 if (expect_null)
1686 pg_fatal("expected NULL");
1687 if (restype == PGRES_FATAL_ERROR)
1688 printf("; error: %s", PQerrorMessage(conn));
1689 else if (restype == PGRES_PIPELINE_ABORTED)
1691 printf(": command didn't run because pipeline aborted\n");
1693 else
1694 printf("\n");
1695 PQclear(res);
1697 if (restype == PGRES_PIPELINE_SYNC)
1698 num_syncs--;
1699 else
1700 expect_null = true;
1701 if (num_syncs <= 0)
1702 break;
1704 if (PQgetResult(conn) != NULL)
1705 pg_fatal("returned something extra after all the syncs: %s",
1706 PQresStatus(PQresultStatus(res)));
1708 if (PQexitPipelineMode(conn) != 1)
1709 pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1711 /* We expect to find one tuple containing the value "3" */
1712 res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
1713 if (PQresultStatus(res) != PGRES_TUPLES_OK)
1714 pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
1715 if (PQntuples(res) != 1)
1716 pg_fatal("did not get 1 tuple");
1717 if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
1718 pg_fatal("did not get expected tuple");
1719 PQclear(res);
1721 fprintf(stderr, "ok\n");
1725 * In this test mode we send a stream of queries, with one in the middle
1726 * causing an error. Verify that we can still send some more after the
1727 * error and have libpq work properly.
1729 static void
1730 test_uniqviol(PGconn *conn)
1732 int sock = PQsocket(conn);
1733 PGresult *res;
1734 Oid paramTypes[2] = {INT8OID, INT8OID};
1735 const char *paramValues[2];
1736 char paramValue0[MAXINT8LEN];
1737 char paramValue1[MAXINT8LEN];
1738 int ctr = 0;
1739 int numsent = 0;
1740 int results = 0;
1741 bool read_done = false;
1742 bool write_done = false;
1743 bool error_sent = false;
1744 bool got_error = false;
1745 int switched = 0;
1746 int socketful = 0;
1747 fd_set in_fds;
1748 fd_set out_fds;
1750 fprintf(stderr, "uniqviol ...");
1752 PQsetnonblocking(conn, 1);
1754 paramValues[0] = paramValue0;
1755 paramValues[1] = paramValue1;
1756 sprintf(paramValue1, "42");
1758 res = PQexec(conn, "drop table if exists ppln_uniqviol;"
1759 "create table ppln_uniqviol(id bigint primary key, idata bigint)");
1760 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1761 pg_fatal("failed to create table: %s", PQerrorMessage(conn));
1763 res = PQexec(conn, "begin");
1764 if (PQresultStatus(res) != PGRES_COMMAND_OK)
1765 pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
1767 res = PQprepare(conn, "insertion",
1768 "insert into ppln_uniqviol values ($1, $2) returning id",
1769 2, paramTypes);
1770 if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
1771 pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
1773 if (PQenterPipelineMode(conn) != 1)
1774 pg_fatal("failed to enter pipeline mode");
1776 while (!read_done)
1779 * Avoid deadlocks by reading everything the server has sent before
1780 * sending anything. (Special precaution is needed here to process
1781 * PQisBusy before testing the socket for read-readiness, because the
1782 * socket does not turn read-ready after "sending" queries in aborted
1783 * pipeline mode.)
1785 while (PQisBusy(conn) == 0)
1787 bool new_error;
1789 if (results >= numsent)
1791 if (write_done)
1792 read_done = true;
1793 break;
1796 res = PQgetResult(conn);
1797 new_error = process_result(conn, res, results, numsent);
1798 if (new_error && got_error)
1799 pg_fatal("got two errors");
1800 got_error |= new_error;
1801 if (results++ >= numsent - 1)
1803 if (write_done)
1804 read_done = true;
1805 break;
1809 if (read_done)
1810 break;
1812 FD_ZERO(&out_fds);
1813 FD_SET(sock, &out_fds);
1815 FD_ZERO(&in_fds);
1816 FD_SET(sock, &in_fds);
1818 if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
1820 if (errno == EINTR)
1821 continue;
1822 pg_fatal("select() failed: %m");
1825 if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
1826 pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
1829 * If the socket is writable and we haven't finished sending queries,
1830 * send some.
1832 if (!write_done && FD_ISSET(sock, &out_fds))
1834 for (;;)
1836 int flush;
1839 * provoke uniqueness violation exactly once after having
1840 * switched to read mode.
1842 if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
1844 sprintf(paramValue0, "%d", numsent / 2);
1845 fprintf(stderr, "E");
1846 error_sent = true;
1848 else
1850 fprintf(stderr, ".");
1851 sprintf(paramValue0, "%d", ctr++);
1854 if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
1855 pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
1856 numsent++;
1858 /* Are we done writing? */
1859 if (socketful != 0 && numsent % socketful == 42 && error_sent)
1861 if (PQsendFlushRequest(conn) != 1)
1862 pg_fatal("failed to send flush request");
1863 write_done = true;
1864 fprintf(stderr, "\ndone writing\n");
1865 PQflush(conn);
1866 break;
1869 /* is the outgoing socket full? */
1870 flush = PQflush(conn);
1871 if (flush == -1)
1872 pg_fatal("failed to flush: %s", PQerrorMessage(conn));
1873 if (flush == 1)
1875 if (socketful == 0)
1876 socketful = numsent;
1877 fprintf(stderr, "\nswitch to reading\n");
1878 switched++;
1879 break;
1885 if (!got_error)
1886 pg_fatal("did not get expected error");
1888 fprintf(stderr, "ok\n");
1892 * Subroutine for test_uniqviol; given a PGresult, print it out and consume
1893 * the expected NULL that should follow it.
1895 * Returns true if we read a fatal error message, otherwise false.
1897 static bool
1898 process_result(PGconn *conn, PGresult *res, int results, int numsent)
1900 PGresult *res2;
1901 bool got_error = false;
1903 if (res == NULL)
1904 pg_fatal("got unexpected NULL");
1906 switch (PQresultStatus(res))
1908 case PGRES_FATAL_ERROR:
1909 got_error = true;
1910 fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
1911 PQclear(res);
1913 res2 = PQgetResult(conn);
1914 if (res2 != NULL)
1915 pg_fatal("expected NULL, got %s",
1916 PQresStatus(PQresultStatus(res2)));
1917 break;
1919 case PGRES_TUPLES_OK:
1920 fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
1921 PQclear(res);
1923 res2 = PQgetResult(conn);
1924 if (res2 != NULL)
1925 pg_fatal("expected NULL, got %s",
1926 PQresStatus(PQresultStatus(res2)));
1927 break;
1929 case PGRES_PIPELINE_ABORTED:
1930 fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
1931 res2 = PQgetResult(conn);
1932 if (res2 != NULL)
1933 pg_fatal("expected NULL, got %s",
1934 PQresStatus(PQresultStatus(res2)));
1935 break;
1937 default:
1938 pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
1941 return got_error;
/*
 * Print a short help text to stderr.
 *
 * progname is the name to show in the usage lines; the parameter shadows the
 * file-level variable of the same name.
 */
static void
usage(const char *progname)
{
	fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname);
	fprintf(stderr, "Usage:\n");
	fprintf(stderr, "  %s [OPTION] tests\n", progname);
	fprintf(stderr, "  %s [OPTION] TESTNAME [CONNINFO]\n", progname);
	fprintf(stderr, "\nOptions:\n");
	fprintf(stderr, "  -t TRACEFILE       generate a libpq trace to TRACEFILE\n");
	fprintf(stderr, "  -r NUMROWS         use NUMROWS as the test size\n");
}
/*
 * Print the name of every available test to stdout, one per line.
 *
 * Keep this list in sync with the test-name dispatch in main().
 */
static void
print_test_list(void)
{
	printf("cancel\n");
	printf("disallowed_in_pipeline\n");
	printf("multi_pipelines\n");
	printf("nosync\n");
	printf("pipeline_abort\n");
	printf("pipeline_idle\n");
	printf("pipelined_insert\n");
	printf("prepared\n");
	printf("simple_pipeline\n");
	printf("singlerow\n");
	printf("transaction\n");
	printf("uniqviol\n");
}
1975 main(int argc, char **argv)
1977 const char *conninfo = "";
1978 PGconn *conn;
1979 FILE *trace;
1980 char *testname;
1981 int numrows = 10000;
1982 PGresult *res;
1983 int c;
1985 while ((c = getopt(argc, argv, "r:t:")) != -1)
1987 switch (c)
1989 case 'r': /* numrows */
1990 errno = 0;
1991 numrows = strtol(optarg, NULL, 10);
1992 if (errno != 0 || numrows <= 0)
1994 fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
1995 optarg);
1996 exit(1);
1998 break;
1999 case 't': /* trace file */
2000 tracefile = pg_strdup(optarg);
2001 break;
2005 if (optind < argc)
2007 testname = pg_strdup(argv[optind]);
2008 optind++;
2010 else
2012 usage(argv[0]);
2013 exit(1);
2016 if (strcmp(testname, "tests") == 0)
2018 print_test_list();
2019 exit(0);
2022 if (optind < argc)
2024 conninfo = pg_strdup(argv[optind]);
2025 optind++;
2028 /* Make a connection to the database */
2029 conn = PQconnectdb(conninfo);
2030 if (PQstatus(conn) != CONNECTION_OK)
2032 fprintf(stderr, "Connection to database failed: %s\n",
2033 PQerrorMessage(conn));
2034 exit_nicely(conn);
2037 res = PQexec(conn, "SET lc_messages TO \"C\"");
2038 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2039 pg_fatal("failed to set lc_messages: %s", PQerrorMessage(conn));
2040 res = PQexec(conn, "SET debug_parallel_query = off");
2041 if (PQresultStatus(res) != PGRES_COMMAND_OK)
2042 pg_fatal("failed to set debug_parallel_query: %s", PQerrorMessage(conn));
2044 /* Set the trace file, if requested */
2045 if (tracefile != NULL)
2047 if (strcmp(tracefile, "-") == 0)
2048 trace = stdout;
2049 else
2050 trace = fopen(tracefile, "w");
2051 if (trace == NULL)
2052 pg_fatal("could not open file \"%s\": %m", tracefile);
2054 /* Make it line-buffered */
2055 setvbuf(trace, NULL, PG_IOLBF, 0);
2057 PQtrace(conn, trace);
2058 PQsetTraceFlags(conn,
2059 PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
2062 if (strcmp(testname, "cancel") == 0)
2063 test_cancel(conn);
2064 else if (strcmp(testname, "disallowed_in_pipeline") == 0)
2065 test_disallowed_in_pipeline(conn);
2066 else if (strcmp(testname, "multi_pipelines") == 0)
2067 test_multi_pipelines(conn);
2068 else if (strcmp(testname, "nosync") == 0)
2069 test_nosync(conn);
2070 else if (strcmp(testname, "pipeline_abort") == 0)
2071 test_pipeline_abort(conn);
2072 else if (strcmp(testname, "pipeline_idle") == 0)
2073 test_pipeline_idle(conn);
2074 else if (strcmp(testname, "pipelined_insert") == 0)
2075 test_pipelined_insert(conn, numrows);
2076 else if (strcmp(testname, "prepared") == 0)
2077 test_prepared(conn);
2078 else if (strcmp(testname, "simple_pipeline") == 0)
2079 test_simple_pipeline(conn);
2080 else if (strcmp(testname, "singlerow") == 0)
2081 test_singlerowmode(conn);
2082 else if (strcmp(testname, "transaction") == 0)
2083 test_transaction(conn);
2084 else if (strcmp(testname, "uniqviol") == 0)
2085 test_uniqviol(conn);
2086 else
2088 fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
2089 exit(1);
2092 /* close the connection to the database and cleanup */
2093 PQfinish(conn);
2094 return 0;