1 /*-------------------------------------------------------------------------
4 * Verify libpq pipeline execution functionality
6 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * src/test/modules/libpq_pipeline/libpq_pipeline.c
13 *-------------------------------------------------------------------------
16 #include "postgres_fe.h"
18 #include <sys/select.h>
21 #include "catalog/pg_type_d.h"
22 #include "common/fe_memutils.h"
24 #include "pg_getopt.h"
25 #include "portability/instr_time.h"
28 static void exit_nicely(PGconn
*conn
);
29 static void pg_attribute_noreturn() pg_fatal_impl(int line
, const char *fmt
,...)
30 pg_attribute_printf(2, 3);
31 static bool process_result(PGconn
*conn
, PGresult
*res
, int results
,
34 const char *const progname
= "libpq_pipeline";
36 /* Options and defaults */
37 char *tracefile
= NULL
; /* path to PQtrace() file */
41 #define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
46 static const char *const drop_table_sql
=
47 "DROP TABLE IF EXISTS pq_pipeline_demo";
48 static const char *const create_table_sql
=
49 "CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
51 static const char *const insert_sql
=
52 "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
53 static const char *const insert_sql2
=
54 "INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
56 /* max char length of an int32/64, plus sign and null terminator */
61 exit_nicely(PGconn
*conn
)
68 * The following few functions are wrapped in macros to make the reported line
69 * number in an error match the line number of the invocation.
73 * Print an error to stderr and terminate the program.
75 #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
/*
 * pg_fatal_impl: report a fatal error on stderr.
 *
 * The output is "\n<progname>:<line>: " followed by the printf-style message
 * built from fmt and its variable arguments, followed by a newline.  "line"
 * is the source line to report; the pg_fatal() macro passes __LINE__ so the
 * message points at the call site.
 *
 * fmt must not itself end in a newline, because one is appended here; the
 * Assert below enforces that.
 *
 * NOTE(review): the va_list setup and the statement that ends the process
 * are not visible in this excerpt -- confirm against the complete file.
 */
77 pg_attribute_noreturn()
78 pg_fatal_impl(int line
, const char *fmt
,...)
/* prefix: program name and the caller's line number */
84 fprintf(stderr
, "\n%s:%d: ", progname
, line
);
/* the caller's formatted message */
86 vfprintf(stderr
, fmt
, args
);
/* callers must not supply their own trailing newline */
88 Assert(fmt
[strlen(fmt
) - 1] != '\n');
89 fprintf(stderr
, "\n");
94 * Check that the query on the given connection got canceled.
96 #define confirm_query_canceled(conn) confirm_query_canceled_impl(__LINE__, conn)
98 confirm_query_canceled_impl(int line
, PGconn
*conn
)
100 PGresult
*res
= NULL
;
102 res
= PQgetResult(conn
);
104 pg_fatal_impl(line
, "PQgetResult returned null: %s",
105 PQerrorMessage(conn
));
106 if (PQresultStatus(res
) != PGRES_FATAL_ERROR
)
107 pg_fatal_impl(line
, "query did not fail when it was expected");
108 if (strcmp(PQresultErrorField(res
, PG_DIAG_SQLSTATE
), "57014") != 0)
109 pg_fatal_impl(line
, "query failed with a different error than cancellation: %s",
110 PQerrorMessage(conn
));
113 while (PQisBusy(conn
))
114 PQconsumeInput(conn
);
117 #define send_cancellable_query(conn, monitorConn) \
118 send_cancellable_query_impl(__LINE__, conn, monitorConn)
/*
 * send_cancellable_query_impl: start a long-running "SELECT pg_sleep($1)" on
 * conn and use monitorConn to observe it in pg_stat_activity.
 *
 *   line        - caller's source line, used when reporting a send failure
 *   conn        - connection that runs the sleeping query
 *   monitorConn - second connection used to query pg_stat_activity
 *
 * The sleep duration is taken from the PG_TEST_TIMEOUT_DEFAULT environment
 * variable and is passed as a text parameter declared as INT4OID.
 *
 * NOTE(review): the fallback assigned when the variable is unset, the
 * polling loop header, the test applied to "value", and the sleep call are
 * not visible in this excerpt -- confirm against the complete file.
 */
120 send_cancellable_query_impl(int line
, PGconn
*conn
, PGconn
*monitorConn
)
122 const char *env_wait
;
123 const Oid paramTypes
[1] = {INT4OID
};
/* backend PID of conn, used below to find its row in pg_stat_activity */
124 int procpid
= PQbackendPID(conn
);
126 env_wait
= getenv("PG_TEST_TIMEOUT_DEFAULT");
127 if (env_wait
== NULL
)
/* asynchronous send: this does not wait for the query to finish */
130 if (PQsendQueryParams(conn
, "SELECT pg_sleep($1)", 1, paramTypes
,
131 &env_wait
, NULL
, NULL
, 0) != 1)
132 pg_fatal_impl(line
, "failed to send query: %s", PQerrorMessage(conn
));
135 * Wait until the query is actually running. Otherwise sending a
136 * cancellation request might not cancel the query due to race conditions.
142 const char *paramValues
[1];
/* pass the PID as a text parameter; 16 is the size handed to snprintf */
145 snprintf(pidval
, 16, "%d", procpid
);
146 paramValues
[0] = pidval
;
/*
 * The final argument (1) requests results in binary format, so the value
 * fetched below is the binary representation of count(*), not a text digit.
 * NOTE(review): confirm that the (not visible) test on "value" accounts
 * for that.
 */
148 res
= PQexecParams(monitorConn
,
149 "SELECT count(*) FROM pg_stat_activity WHERE "
150 "pid = $1 AND state = 'active'",
151 1, NULL
, paramValues
, NULL
, NULL
, 1);
/* count(*) must yield exactly one row with one column */
153 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
154 pg_fatal("could not query pg_stat_activity: %s", PQerrorMessage(monitorConn
));
155 if (PQntuples(res
) != 1)
156 pg_fatal("unexpected number of rows received: %d", PQntuples(res
));
157 if (PQnfields(res
) != 1)
158 pg_fatal("unexpected number of columns received: %d", PQnfields(res
));
159 value
= PQgetvalue(res
, 0, 0);
167 /* wait 10ms before polling again */
173 * Create a new connection with the same conninfo as the given one.
176 copy_connection(PGconn
*conn
)
179 PQconninfoOption
*opts
= PQconninfo(conn
);
180 const char **keywords
;
185 for (PQconninfoOption
*opt
= opts
; opt
->keyword
!= NULL
; ++opt
)
188 keywords
= pg_malloc(sizeof(char *) * nopts
);
189 vals
= pg_malloc(sizeof(char *) * nopts
);
191 for (PQconninfoOption
*opt
= opts
; opt
->keyword
!= NULL
; ++opt
)
195 keywords
[i
] = opt
->keyword
;
200 keywords
[i
] = vals
[i
] = NULL
;
202 copyConn
= PQconnectdbParams(keywords
, vals
, false);
204 if (PQstatus(copyConn
) != CONNECTION_OK
)
205 pg_fatal("Connection to database failed: %s",
206 PQerrorMessage(copyConn
));
212 * Test query cancellation routines
215 test_cancel(PGconn
*conn
)
221 fprintf(stderr
, "test cancellations... ");
223 if (PQsetnonblocking(conn
, 1) != 0)
224 pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn
));
227 * Make a separate connection to the database to monitor the query on the
230 monitorConn
= copy_connection(conn
);
231 Assert(PQstatus(monitorConn
) == CONNECTION_OK
);
234 send_cancellable_query(conn
, monitorConn
);
235 cancel
= PQgetCancel(conn
);
236 if (!PQcancel(cancel
, errorbuf
, sizeof(errorbuf
)))
237 pg_fatal("failed to run PQcancel: %s", errorbuf
);
238 confirm_query_canceled(conn
);
240 /* PGcancel object can be reused for the next query */
241 send_cancellable_query(conn
, monitorConn
);
242 if (!PQcancel(cancel
, errorbuf
, sizeof(errorbuf
)))
243 pg_fatal("failed to run PQcancel: %s", errorbuf
);
244 confirm_query_canceled(conn
);
246 PQfreeCancel(cancel
);
248 /* test PQrequestCancel */
249 send_cancellable_query(conn
, monitorConn
);
250 if (!PQrequestCancel(conn
))
251 pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn
));
252 confirm_query_canceled(conn
);
254 fprintf(stderr
, "ok\n");
258 test_disallowed_in_pipeline(PGconn
*conn
)
260 PGresult
*res
= NULL
;
262 fprintf(stderr
, "test error cases... ");
264 if (PQisnonblocking(conn
))
265 pg_fatal("Expected blocking connection mode");
267 if (PQenterPipelineMode(conn
) != 1)
268 pg_fatal("Unable to enter pipeline mode");
270 if (PQpipelineStatus(conn
) == PQ_PIPELINE_OFF
)
271 pg_fatal("Pipeline mode not activated properly");
273 /* PQexec should fail in pipeline mode */
274 res
= PQexec(conn
, "SELECT 1");
275 if (PQresultStatus(res
) != PGRES_FATAL_ERROR
)
276 pg_fatal("PQexec should fail in pipeline mode but succeeded");
277 if (strcmp(PQerrorMessage(conn
),
278 "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
279 pg_fatal("did not get expected error message; got: \"%s\"",
280 PQerrorMessage(conn
));
282 /* PQsendQuery should fail in pipeline mode */
283 if (PQsendQuery(conn
, "SELECT 1") != 0)
284 pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
285 if (strcmp(PQerrorMessage(conn
),
286 "PQsendQuery not allowed in pipeline mode\n") != 0)
287 pg_fatal("did not get expected error message; got: \"%s\"",
288 PQerrorMessage(conn
));
290 /* Entering pipeline mode when already in pipeline mode is OK */
291 if (PQenterPipelineMode(conn
) != 1)
292 pg_fatal("re-entering pipeline mode should be a no-op but failed");
294 if (PQisBusy(conn
) != 0)
295 pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
297 /* ok, back to normal command mode */
298 if (PQexitPipelineMode(conn
) != 1)
299 pg_fatal("couldn't exit idle empty pipeline mode");
301 if (PQpipelineStatus(conn
) != PQ_PIPELINE_OFF
)
302 pg_fatal("Pipeline mode not terminated properly");
304 /* exiting pipeline mode when not in pipeline mode should be a no-op */
305 if (PQexitPipelineMode(conn
) != 1)
306 pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
308 /* can now PQexec again */
309 res
= PQexec(conn
, "SELECT 1");
310 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
311 pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
312 PQerrorMessage(conn
));
314 fprintf(stderr
, "ok\n");
318 test_multi_pipelines(PGconn
*conn
)
320 PGresult
*res
= NULL
;
321 const char *dummy_params
[1] = {"1"};
322 Oid dummy_param_oids
[1] = {INT4OID
};
324 fprintf(stderr
, "multi pipeline... ");
327 * Queue up a couple of small pipelines and process each without returning
328 * to command mode first.
330 if (PQenterPipelineMode(conn
) != 1)
331 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn
));
334 if (PQsendQueryParams(conn
, "SELECT $1", 1, dummy_param_oids
,
335 dummy_params
, NULL
, NULL
, 0) != 1)
336 pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn
));
338 if (PQpipelineSync(conn
) != 1)
339 pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn
));
341 /* second pipeline */
342 if (PQsendQueryParams(conn
, "SELECT $1", 1, dummy_param_oids
,
343 dummy_params
, NULL
, NULL
, 0) != 1)
344 pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn
));
346 /* Skip flushing once. */
347 if (PQsendPipelineSync(conn
) != 1)
348 pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn
));
351 if (PQsendQueryParams(conn
, "SELECT $1", 1, dummy_param_oids
,
352 dummy_params
, NULL
, NULL
, 0) != 1)
353 pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn
));
355 if (PQpipelineSync(conn
) != 1)
356 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
358 /* OK, start processing the results */
362 res
= PQgetResult(conn
);
364 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
365 PQerrorMessage(conn
));
367 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
368 pg_fatal("Unexpected result code %s from first pipeline item",
369 PQresStatus(PQresultStatus(res
)));
373 if (PQgetResult(conn
) != NULL
)
374 pg_fatal("PQgetResult returned something extra after first result");
376 if (PQexitPipelineMode(conn
) != 0)
377 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
379 res
= PQgetResult(conn
);
381 pg_fatal("PQgetResult returned null when sync result expected: %s",
382 PQerrorMessage(conn
));
384 if (PQresultStatus(res
) != PGRES_PIPELINE_SYNC
)
385 pg_fatal("Unexpected result code %s instead of sync result, error: %s",
386 PQresStatus(PQresultStatus(res
)), PQerrorMessage(conn
));
389 /* second pipeline */
391 res
= PQgetResult(conn
);
393 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
394 PQerrorMessage(conn
));
396 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
397 pg_fatal("Unexpected result code %s from second pipeline item",
398 PQresStatus(PQresultStatus(res
)));
402 if (PQgetResult(conn
) != NULL
)
403 pg_fatal("PQgetResult returned something extra after first result");
405 if (PQexitPipelineMode(conn
) != 0)
406 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
408 res
= PQgetResult(conn
);
410 pg_fatal("PQgetResult returned null when sync result expected: %s",
411 PQerrorMessage(conn
));
413 if (PQresultStatus(res
) != PGRES_PIPELINE_SYNC
)
414 pg_fatal("Unexpected result code %s instead of sync result, error: %s",
415 PQresStatus(PQresultStatus(res
)), PQerrorMessage(conn
));
420 res
= PQgetResult(conn
);
422 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
423 PQerrorMessage(conn
));
425 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
426 pg_fatal("Unexpected result code %s from third pipeline item",
427 PQresStatus(PQresultStatus(res
)));
429 res
= PQgetResult(conn
);
431 pg_fatal("Expected null result, got %s",
432 PQresStatus(PQresultStatus(res
)));
434 res
= PQgetResult(conn
);
436 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
437 PQerrorMessage(conn
));
439 if (PQresultStatus(res
) != PGRES_PIPELINE_SYNC
)
440 pg_fatal("Unexpected result code %s from second pipeline sync",
441 PQresStatus(PQresultStatus(res
)));
443 /* We're still in pipeline mode ... */
444 if (PQpipelineStatus(conn
) == PQ_PIPELINE_OFF
)
445 pg_fatal("Fell out of pipeline mode somehow");
447 /* until we end it, which we can safely do now */
448 if (PQexitPipelineMode(conn
) != 1)
449 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
450 PQerrorMessage(conn
));
452 if (PQpipelineStatus(conn
) != PQ_PIPELINE_OFF
)
453 pg_fatal("exiting pipeline mode didn't seem to work");
455 fprintf(stderr
, "ok\n");
459 * Test behavior when a pipeline dispatches a number of commands that are
460 * not flushed by a sync point.
/*
 * test_nosync: send numqueries simple SELECTs in pipeline mode without a
 * sync point, ask the server to flush with PQsendFlushRequest(), and then
 * read the results.  Each query is expected to yield one PGRES_TUPLES_OK
 * result followed by one NULL result.
 *
 * NOTE(review): the declarations (numqueries, results, tv, input_mask, res,
 * res2), the select() timeout values, the result counting and the loop
 * control statements are not visible in this excerpt -- confirm against the
 * complete file.
 */
463 test_nosync(PGconn
*conn
)
467 int sock
= PQsocket(conn
);
469 fprintf(stderr
, "nosync... ");
472 pg_fatal("invalid socket");
474 if (PQenterPipelineMode(conn
) != 1)
475 pg_fatal("could not enter pipeline mode");
/* dispatch the queries one by one, reading server output in between */
476 for (int i
= 0; i
< numqueries
; i
++)
481 if (PQsendQueryParams(conn
, "SELECT repeat('xyzxz', 12)",
482 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
483 pg_fatal("error sending select: %s", PQerrorMessage(conn
));
487 * If the server has written anything to us, read (some of) it now.
489 FD_ZERO(&input_mask
);
490 FD_SET(sock
, &input_mask
);
/* wait for the socket to become readable, bounded by the timeout in tv */
493 if (select(sock
+ 1, &input_mask
, NULL
, NULL
, &tv
) < 0)
495 fprintf(stderr
, "select() failed: %s\n", strerror(errno
));
/* only read from the server when select() reported the socket readable */
498 if (FD_ISSET(sock
, &input_mask
) && PQconsumeInput(conn
) != 1)
499 pg_fatal("failed to read from server: %s", PQerrorMessage(conn
));
502 /* tell server to flush its output buffer */
503 if (PQsendFlushRequest(conn
) != 1)
504 pg_fatal("failed to send flush request");
507 /* Now read all results */
512 res
= PQgetResult(conn
);
514 /* NULL results are only expected after TUPLES_OK */
516 pg_fatal("got unexpected NULL result after %d results", results
);
518 /* We expect exactly one TUPLES_OK result for each query we sent */
519 if (PQresultStatus(res
) == PGRES_TUPLES_OK
)
523 /* and one NULL result should follow each */
524 res2
= PQgetResult(conn
);
526 pg_fatal("expected NULL, got %s",
527 PQresStatus(PQresultStatus(res2
)));
531 /* if we're done, we're done */
532 if (results
== numqueries
)
538 /* anything else is unexpected */
/*
 * NOTE(review): this format string ends in '\n', which the Assert in
 * pg_fatal_impl() rejects; the trailing newline looks unintended.
 */
539 pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res
)));
542 fprintf(stderr
, "ok\n");
546 * When an operation in a pipeline fails the rest of the pipeline is flushed. We
547 * still have to get results for each pipeline item, but the item will just be
548 * a PGRES_PIPELINE_ABORTED code.
550 * This intentionally doesn't use a transaction to wrap the pipeline. You should
551 * usually use an xact, but in this case we want to observe the effects of each
555 test_pipeline_abort(PGconn
*conn
)
557 PGresult
*res
= NULL
;
558 const char *dummy_params
[1] = {"1"};
559 Oid dummy_param_oids
[1] = {INT4OID
};
564 fprintf(stderr
, "aborted pipeline... ");
566 res
= PQexec(conn
, drop_table_sql
);
567 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
568 pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn
));
570 res
= PQexec(conn
, create_table_sql
);
571 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
572 pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn
));
575 * Queue up a couple of small pipelines and process each without returning
576 * to command mode first. Make sure the second operation in the first
579 if (PQenterPipelineMode(conn
) != 1)
580 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn
));
582 dummy_params
[0] = "1";
583 if (PQsendQueryParams(conn
, insert_sql
, 1, dummy_param_oids
,
584 dummy_params
, NULL
, NULL
, 0) != 1)
585 pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn
));
587 if (PQsendQueryParams(conn
, "SELECT no_such_function($1)",
588 1, dummy_param_oids
, dummy_params
,
590 pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn
));
592 dummy_params
[0] = "2";
593 if (PQsendQueryParams(conn
, insert_sql
, 1, dummy_param_oids
,
594 dummy_params
, NULL
, NULL
, 0) != 1)
595 pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn
));
597 if (PQpipelineSync(conn
) != 1)
598 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
600 dummy_params
[0] = "3";
601 if (PQsendQueryParams(conn
, insert_sql
, 1, dummy_param_oids
,
602 dummy_params
, NULL
, NULL
, 0) != 1)
603 pg_fatal("dispatching second-pipeline insert failed: %s",
604 PQerrorMessage(conn
));
606 if (PQpipelineSync(conn
) != 1)
607 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
610 * OK, start processing the pipeline results.
612 * We should get a command-ok for the first query, then a fatal error and
613 * a pipeline aborted message for the second insert, a pipeline-end, then
614 * a command-ok and a pipeline-ok for the second pipeline operation.
616 res
= PQgetResult(conn
);
618 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn
));
619 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
620 pg_fatal("Unexpected result status %s: %s",
621 PQresStatus(PQresultStatus(res
)),
622 PQresultErrorMessage(res
));
625 /* NULL result to signal end-of-results for this command */
626 if ((res
= PQgetResult(conn
)) != NULL
)
627 pg_fatal("Expected null result, got %s",
628 PQresStatus(PQresultStatus(res
)));
630 /* Second query caused error, so we expect an error next */
631 res
= PQgetResult(conn
);
633 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn
));
634 if (PQresultStatus(res
) != PGRES_FATAL_ERROR
)
635 pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
636 PQresStatus(PQresultStatus(res
)));
639 /* NULL result to signal end-of-results for this command */
640 if ((res
= PQgetResult(conn
)) != NULL
)
641 pg_fatal("Expected null result, got %s",
642 PQresStatus(PQresultStatus(res
)));
645 * pipeline should now be aborted.
647 * Note that we could still queue more queries at this point if we wanted;
648 * they'd get added to a new third pipeline since we've already sent a
649 * second. The aborted flag relates only to the pipeline being received.
651 if (PQpipelineStatus(conn
) != PQ_PIPELINE_ABORTED
)
652 pg_fatal("pipeline should be flagged as aborted but isn't");
654 /* third query in pipeline, the second insert */
655 res
= PQgetResult(conn
);
657 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn
));
658 if (PQresultStatus(res
) != PGRES_PIPELINE_ABORTED
)
659 pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
660 PQresStatus(PQresultStatus(res
)));
663 /* NULL result to signal end-of-results for this command */
664 if ((res
= PQgetResult(conn
)) != NULL
)
665 pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res
)));
667 if (PQpipelineStatus(conn
) != PQ_PIPELINE_ABORTED
)
668 pg_fatal("pipeline should be flagged as aborted but isn't");
670 /* Ensure we're still in pipeline */
671 if (PQpipelineStatus(conn
) == PQ_PIPELINE_OFF
)
672 pg_fatal("Fell out of pipeline mode somehow");
675 * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
677 * (This is so clients know to start processing results normally again and
678 * can tell the difference between skipped commands and the sync.)
680 res
= PQgetResult(conn
);
682 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn
));
683 if (PQresultStatus(res
) != PGRES_PIPELINE_SYNC
)
684 pg_fatal("Unexpected result code from first pipeline sync\n"
685 "Expected PGRES_PIPELINE_SYNC, got %s",
686 PQresStatus(PQresultStatus(res
)));
689 if (PQpipelineStatus(conn
) == PQ_PIPELINE_ABORTED
)
690 pg_fatal("sync should've cleared the aborted flag but didn't");
692 /* We're still in pipeline mode... */
693 if (PQpipelineStatus(conn
) == PQ_PIPELINE_OFF
)
694 pg_fatal("Fell out of pipeline mode somehow");
696 /* the insert from the second pipeline */
697 res
= PQgetResult(conn
);
699 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn
));
700 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
701 pg_fatal("Unexpected result code %s from first item in second pipeline",
702 PQresStatus(PQresultStatus(res
)));
705 /* Read the NULL result at the end of the command */
706 if ((res
= PQgetResult(conn
)) != NULL
)
707 pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res
)));
709 /* the second pipeline sync */
710 if ((res
= PQgetResult(conn
)) == NULL
)
711 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn
));
712 if (PQresultStatus(res
) != PGRES_PIPELINE_SYNC
)
713 pg_fatal("Unexpected result code %s from second pipeline sync",
714 PQresStatus(PQresultStatus(res
)));
717 if ((res
= PQgetResult(conn
)) != NULL
)
718 pg_fatal("Expected null result, got %s: %s",
719 PQresStatus(PQresultStatus(res
)),
720 PQerrorMessage(conn
));
722 /* Try to send two queries in one command */
723 if (PQsendQueryParams(conn
, "SELECT 1; SELECT 2", 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
724 pg_fatal("failed to send query: %s", PQerrorMessage(conn
));
725 if (PQpipelineSync(conn
) != 1)
726 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
728 while ((res
= PQgetResult(conn
)) != NULL
)
730 switch (PQresultStatus(res
))
732 case PGRES_FATAL_ERROR
:
733 if (strcmp(PQresultErrorField(res
, PG_DIAG_SQLSTATE
), "42601") != 0)
734 pg_fatal("expected error about multiple commands, got %s",
735 PQerrorMessage(conn
));
736 printf("got expected %s", PQerrorMessage(conn
));
740 pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res
)));
745 pg_fatal("did not get cannot-insert-multiple-commands error");
746 res
= PQgetResult(conn
);
748 pg_fatal("got NULL result");
749 if (PQresultStatus(res
) != PGRES_PIPELINE_SYNC
)
750 pg_fatal("Unexpected result code %s from pipeline sync",
751 PQresStatus(PQresultStatus(res
)));
752 fprintf(stderr
, "ok\n");
754 /* Test single-row mode with an error partways */
755 if (PQsendQueryParams(conn
, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
756 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
757 pg_fatal("failed to send query: %s", PQerrorMessage(conn
));
758 if (PQpipelineSync(conn
) != 1)
759 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
760 PQsetSingleRowMode(conn
);
763 while ((res
= PQgetResult(conn
)) != NULL
)
765 switch (PQresultStatus(res
))
767 case PGRES_SINGLE_TUPLE
:
768 printf("got row: %s\n", PQgetvalue(res
, 0, 0));
771 case PGRES_FATAL_ERROR
:
772 if (strcmp(PQresultErrorField(res
, PG_DIAG_SQLSTATE
), "22012") != 0)
773 pg_fatal("expected division-by-zero, got: %s (%s)",
774 PQerrorMessage(conn
),
775 PQresultErrorField(res
, PG_DIAG_SQLSTATE
));
776 printf("got expected division-by-zero\n");
780 pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res
)));
785 pg_fatal("did not get division-by-zero error");
787 pg_fatal("did not get three rows");
788 /* the third pipeline sync */
789 if ((res
= PQgetResult(conn
)) == NULL
)
790 pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn
));
791 if (PQresultStatus(res
) != PGRES_PIPELINE_SYNC
)
792 pg_fatal("Unexpected result code %s from third pipeline sync",
793 PQresStatus(PQresultStatus(res
)));
796 /* We're still in pipeline mode... */
797 if (PQpipelineStatus(conn
) == PQ_PIPELINE_OFF
)
798 pg_fatal("Fell out of pipeline mode somehow");
800 /* until we end it, which we can safely do now */
801 if (PQexitPipelineMode(conn
) != 1)
802 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
803 PQerrorMessage(conn
));
805 if (PQpipelineStatus(conn
) != PQ_PIPELINE_OFF
)
806 pg_fatal("exiting pipeline mode didn't seem to work");
809 * Since we fired the pipelines off without a surrounding xact, the results
812 * - Implicit xact started by server around 1st pipeline
813 * - First insert applied
814 * - Second statement aborted xact
815 * - Third insert skipped
816 * - Sync rolled back first implicit xact
817 * - Implicit xact created by server around 2nd pipeline
818 * - insert applied from 2nd pipeline
819 * - Sync commits 2nd xact
821 * So we should only have the value 3 that we inserted.
823 res
= PQexec(conn
, "SELECT itemno FROM pq_pipeline_demo");
825 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
826 pg_fatal("Expected tuples, got %s: %s",
827 PQresStatus(PQresultStatus(res
)), PQerrorMessage(conn
));
828 if (PQntuples(res
) != 1)
829 pg_fatal("expected 1 result, got %d", PQntuples(res
));
830 for (i
= 0; i
< PQntuples(res
); i
++)
832 const char *val
= PQgetvalue(res
, i
, 0);
834 if (strcmp(val
, "3") != 0)
835 pg_fatal("expected only insert with value 3, got %s", val
);
840 fprintf(stderr
, "ok\n");
843 /* State machine enum for test_pipelined_insert */
844 enum PipelineInsertStep
/*
 * test_pipelined_insert: insert n_rows rows through a prepared statement
 * while in pipeline mode, reading results as they arrive.
 *
 *   conn   - connection to use
 *   n_rows - number of rows to send; also the number of insert results to
 *            wait for
 *
 * Two step trackers, send_step and recv_step, both start at BI_BEGIN_TX:
 * send_step records what is dispatched next and recv_step records which
 * result is expected next.  The setup statements and the PREPARE of
 * "my_insert" are sent first; the connection is then put in nonblocking
 * mode and a select() loop alternates between consuming results and sending
 * more work until recv_step reaches BI_DONE.
 *
 * NOTE(review): most switch labels, step transitions, counter updates and
 * several declarations are not visible in this excerpt -- confirm details
 * against the complete file.
 */
857 test_pipelined_insert(PGconn
*conn
, int n_rows
)
859 Oid insert_param_oids
[2] = {INT4OID
, INT8OID
};
860 const char *insert_params
[2];
861 char insert_param_0
[MAXINTLEN
];
862 char insert_param_1
[MAXINT8LEN
];
863 enum PipelineInsertStep send_step
= BI_BEGIN_TX
,
864 recv_step
= BI_BEGIN_TX
;
/* the parameter array points at the two text buffers filled in per row */
868 insert_params
[0] = insert_param_0
;
869 insert_params
[1] = insert_param_1
;
871 rows_to_send
= rows_to_receive
= n_rows
;
874 * Do a pipelined insert into a table created at the start of the pipeline
876 if (PQenterPipelineMode(conn
) != 1)
877 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn
));
/* send the setup statements, one per iteration, until PREPARE is next */
879 while (send_step
!= BI_PREPARE
)
886 sql
= "BEGIN TRANSACTION";
887 send_step
= BI_DROP_TABLE
;
891 sql
= drop_table_sql
;
892 send_step
= BI_CREATE_TABLE
;
895 case BI_CREATE_TABLE
:
896 sql
= create_table_sql
;
897 send_step
= BI_PREPARE
;
901 pg_fatal("invalid state");
902 sql
= NULL
; /* keep compiler quiet */
905 pg_debug("sending: %s\n", sql
);
906 if (PQsendQueryParams(conn
, sql
,
907 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
908 pg_fatal("dispatching %s failed: %s", sql
, PQerrorMessage(conn
));
/* prepare the two-parameter insert that is executed once per row */
911 Assert(send_step
== BI_PREPARE
);
912 pg_debug("sending: %s\n", insert_sql2
);
913 if (PQsendPrepare(conn
, "my_insert", insert_sql2
, 2, insert_param_oids
) != 1)
914 pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn
));
915 send_step
= BI_INSERT_ROWS
;
918 * Now we start inserting. We'll be sending enough data that we could fill
919 * our output buffer, so to avoid deadlocking we need to enter nonblocking
920 * mode and consume input while we send more output. As results of each
921 * query are processed we should pop them to allow processing of the next
922 * query. There's no need to finish the pipeline before processing
925 if (PQsetnonblocking(conn
, 1) != 0)
926 pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn
));
/* main loop: runs until the last expected result has been received */
928 while (recv_step
!= BI_DONE
)
934 sock
= PQsocket(conn
);
937 break; /* shouldn't happen */
939 FD_ZERO(&input_mask
);
940 FD_SET(sock
, &input_mask
);
941 FD_ZERO(&output_mask
);
942 FD_SET(sock
, &output_mask
);
/* block (NULL timeout) until the socket is readable or writable */
944 if (select(sock
+ 1, &input_mask
, &output_mask
, NULL
, NULL
) < 0)
946 fprintf(stderr
, "select() failed: %s\n", strerror(errno
));
951 * Process any results, so we keep the server's output buffer free
952 * flowing and it can continue to process input
954 if (FD_ISSET(sock
, &input_mask
))
956 PQconsumeInput(conn
);
958 /* Read until we'd block if we tried to read */
959 while (!PQisBusy(conn
) && recv_step
< BI_DONE
)
/* expected command tag / description for the step being received */
962 const char *cmdtag
= "";
963 const char *description
= "";
967 * Read next result. If no more results from this query,
968 * advance to the next query
970 res
= PQgetResult(conn
);
974 status
= PGRES_COMMAND_OK
;
982 cmdtag
= "DROP TABLE";
985 case BI_CREATE_TABLE
:
986 cmdtag
= "CREATE TABLE";
991 description
= "PREPARE";
997 if (rows_to_receive
== 0)
1006 description
= "SYNC";
1007 status
= PGRES_PIPELINE_SYNC
;
1012 pg_fatal("unreachable state");
/* the result must carry the status expected for this step ... */
1015 if (PQresultStatus(res
) != status
)
1016 pg_fatal("%s reported status %s, expected %s\n"
1017 "Error message: \"%s\"",
1018 description
, PQresStatus(PQresultStatus(res
)),
1019 PQresStatus(status
), PQerrorMessage(conn
));
/* ... and its command status must start with the expected tag */
1021 if (strncmp(PQcmdStatus(res
), cmdtag
, strlen(cmdtag
)) != 0)
1022 pg_fatal("%s expected command tag '%s', got '%s'",
1023 description
, cmdtag
, PQcmdStatus(res
));
1025 pg_debug("Got %s OK\n", cmdtag
[0] != '\0' ? cmdtag
: description
);
1031 /* Write more rows and/or the end pipeline message, if needed */
1032 if (FD_ISSET(sock
, &output_mask
))
1036 if (send_step
== BI_INSERT_ROWS
)
/* first parameter: the current rows_to_send value, as text */
1038 snprintf(insert_param_0
, MAXINTLEN
, "%d", rows_to_send
);
1039 /* use up some buffer space with a wide value */
1040 snprintf(insert_param_1
, MAXINT8LEN
, "%lld", 1LL << 62);
1042 if (PQsendQueryPrepared(conn
, "my_insert",
1043 2, insert_params
, NULL
, NULL
, 0) == 1)
1045 pg_debug("sent row %d\n", rows_to_send
);
1048 if (rows_to_send
== 0)
1054 * in nonblocking mode, so it's OK for an insert to fail
1057 fprintf(stderr
, "WARNING: failed to send insert #%d: %s\n",
1058 rows_to_send
, PQerrorMessage(conn
));
1061 else if (send_step
== BI_COMMIT_TX
)
1063 if (PQsendQueryParams(conn
, "COMMIT",
1064 0, NULL
, NULL
, NULL
, NULL
, 0) == 1)
1066 pg_debug("sent COMMIT\n");
1071 fprintf(stderr
, "WARNING: failed to send commit: %s\n",
1072 PQerrorMessage(conn
));
1075 else if (send_step
== BI_SYNC
)
1077 if (PQpipelineSync(conn
) == 1)
1079 fprintf(stdout
, "pipeline sync sent\n");
1084 fprintf(stderr
, "WARNING: pipeline sync failed: %s\n",
1085 PQerrorMessage(conn
));
1091 /* We've got the sync message and the pipeline should be done */
1092 if (PQexitPipelineMode(conn
) != 1)
1093 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1094 PQerrorMessage(conn
));
/* restore blocking mode, undoing the PQsetnonblocking(conn, 1) above */
1096 if (PQsetnonblocking(conn
, 0) != 0)
1097 pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn
));
1099 fprintf(stderr
, "ok\n");
1103 test_prepared(PGconn
*conn
)
1105 PGresult
*res
= NULL
;
1106 Oid param_oids
[1] = {INT4OID
};
1107 Oid expected_oids
[4];
1110 fprintf(stderr
, "prepared... ");
1112 if (PQenterPipelineMode(conn
) != 1)
1113 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn
));
1114 if (PQsendPrepare(conn
, "select_one", "SELECT $1, '42', $1::numeric, "
1116 1, param_oids
) != 1)
1117 pg_fatal("preparing query failed: %s", PQerrorMessage(conn
));
1118 expected_oids
[0] = INT4OID
;
1119 expected_oids
[1] = TEXTOID
;
1120 expected_oids
[2] = NUMERICOID
;
1121 expected_oids
[3] = INTERVALOID
;
1122 if (PQsendDescribePrepared(conn
, "select_one") != 1)
1123 pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn
));
1124 if (PQpipelineSync(conn
) != 1)
1125 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
1127 res
= PQgetResult(conn
);
1129 pg_fatal("PQgetResult returned null");
1130 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
1131 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res
)));
1133 res
= PQgetResult(conn
);
1135 pg_fatal("expected NULL result");
1137 res
= PQgetResult(conn
);
1139 pg_fatal("PQgetResult returned NULL");
1140 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
1141 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res
)));
1142 if (PQnfields(res
) != lengthof(expected_oids
))
1143 pg_fatal("expected %zu columns, got %d",
1144 lengthof(expected_oids
), PQnfields(res
));
1145 for (int i
= 0; i
< PQnfields(res
); i
++)
1147 typ
= PQftype(res
, i
);
1148 if (typ
!= expected_oids
[i
])
1149 pg_fatal("field %d: expected type %u, got %u",
1150 i
, expected_oids
[i
], typ
);
1153 res
= PQgetResult(conn
);
1155 pg_fatal("expected NULL result");
1157 res
= PQgetResult(conn
);
1158 if (PQresultStatus(res
) != PGRES_PIPELINE_SYNC
)
1159 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res
)));
1161 fprintf(stderr
, "closing statement..");
1162 if (PQsendClosePrepared(conn
, "select_one") != 1)
1163 pg_fatal("PQsendClosePrepared failed: %s", PQerrorMessage(conn
));
1164 if (PQpipelineSync(conn
) != 1)
1165 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
1167 res
= PQgetResult(conn
);
1169 pg_fatal("expected non-NULL result");
1170 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
1171 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res
)));
1173 res
= PQgetResult(conn
);
1175 pg_fatal("expected NULL result");
1176 res
= PQgetResult(conn
);
1177 if (PQresultStatus(res
) != PGRES_PIPELINE_SYNC
)
1178 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res
)));
1180 if (PQexitPipelineMode(conn
) != 1)
1181 pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn
));
1183 /* Now that it's closed we should get an error when describing */
1184 res
= PQdescribePrepared(conn
, "select_one");
1185 if (PQresultStatus(res
) != PGRES_FATAL_ERROR
)
1186 pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res
)));
1189 * Also test the blocking close, this should not fail since closing a
1190 * non-existent prepared statement is a no-op
1192 res
= PQclosePrepared(conn
, "select_one");
1193 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
1194 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res
)));
1196 fprintf(stderr
, "creating portal... ");
1197 PQexec(conn
, "BEGIN");
1198 PQexec(conn
, "DECLARE cursor_one CURSOR FOR SELECT 1");
1199 PQenterPipelineMode(conn
);
1200 if (PQsendDescribePortal(conn
, "cursor_one") != 1)
1201 pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn
));
1202 if (PQpipelineSync(conn
) != 1)
1203 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
1204 res
= PQgetResult(conn
);
1206 pg_fatal("PQgetResult returned null");
1207 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
1208 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res
)));
1210 typ
= PQftype(res
, 0);
1212 pg_fatal("portal: expected type %u, got %u",
1215 res
= PQgetResult(conn
);
1217 pg_fatal("expected NULL result");
1218 res
= PQgetResult(conn
);
1219 if (PQresultStatus(res
) != PGRES_PIPELINE_SYNC
)
1220 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res
)));
1222 fprintf(stderr
, "closing portal... ");
1223 if (PQsendClosePortal(conn
, "cursor_one") != 1)
1224 pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn
));
1225 if (PQpipelineSync(conn
) != 1)
1226 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
1228 res
= PQgetResult(conn
);
1230 pg_fatal("expected non-NULL result");
1231 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
1232 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res
)));
1234 res
= PQgetResult(conn
);
1236 pg_fatal("expected NULL result");
1237 res
= PQgetResult(conn
);
1238 if (PQresultStatus(res
) != PGRES_PIPELINE_SYNC
)
1239 pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res
)));
1241 if (PQexitPipelineMode(conn
) != 1)
1242 pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn
));
1244 /* Now that it's closed we should get an error when describing */
1245 res
= PQdescribePortal(conn
, "cursor_one");
1246 if (PQresultStatus(res
) != PGRES_FATAL_ERROR
)
1247 pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res
)));
1250 * Also test the blocking close, this should not fail since closing a
1251 * non-existent portal is a no-op
1253 res
= PQclosePortal(conn
, "cursor_one");
1254 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
1255 pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res
)));
1257 fprintf(stderr
, "ok\n");
/*
 * Notice processor callback: count each notice and echo it to stderr.
 *
 * "arg" must point to an int counter owned by the caller; it is incremented
 * before the notice text is printed, so the first notice is reported as 1.
 */
static void
notice_processor(void *arg, const char *message)
{
	int		   *counter = arg;

	*counter += 1;
	fprintf(stderr, "NOTICE %d: %s", *counter, message);
}
1270 /* Verify behavior in "idle" state */
1272 test_pipeline_idle(PGconn
*conn
)
1277 fprintf(stderr
, "\npipeline idle...\n");
1279 PQsetNoticeProcessor(conn
, notice_processor
, &n_notices
);
1281 /* Try to exit pipeline mode in pipeline-idle state */
1282 if (PQenterPipelineMode(conn
) != 1)
1283 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn
));
1284 if (PQsendQueryParams(conn
, "SELECT 1", 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
1285 pg_fatal("failed to send query: %s", PQerrorMessage(conn
));
1286 PQsendFlushRequest(conn
);
1287 res
= PQgetResult(conn
);
1289 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1290 PQerrorMessage(conn
));
1291 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
1292 pg_fatal("unexpected result code %s from first pipeline item",
1293 PQresStatus(PQresultStatus(res
)));
1295 res
= PQgetResult(conn
);
1297 pg_fatal("did not receive terminating NULL");
1298 if (PQsendQueryParams(conn
, "SELECT 2", 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
1299 pg_fatal("failed to send query: %s", PQerrorMessage(conn
));
1300 if (PQexitPipelineMode(conn
) == 1)
1301 pg_fatal("exiting pipeline succeeded when it shouldn't");
1302 if (strncmp(PQerrorMessage(conn
), "cannot exit pipeline mode",
1303 strlen("cannot exit pipeline mode")) != 0)
1304 pg_fatal("did not get expected error; got: %s",
1305 PQerrorMessage(conn
));
1306 PQsendFlushRequest(conn
);
1307 res
= PQgetResult(conn
);
1308 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
1309 pg_fatal("unexpected result code %s from second pipeline item",
1310 PQresStatus(PQresultStatus(res
)));
1312 res
= PQgetResult(conn
);
1314 pg_fatal("did not receive terminating NULL");
1315 if (PQexitPipelineMode(conn
) != 1)
1316 pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn
));
1319 pg_fatal("got %d notice(s)", n_notices
);
1320 fprintf(stderr
, "ok - 1\n");
1322 /* Have a WARNING in the middle of a resultset */
1323 if (PQenterPipelineMode(conn
) != 1)
1324 pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn
));
1325 if (PQsendQueryParams(conn
, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
1326 pg_fatal("failed to send query: %s", PQerrorMessage(conn
));
1327 PQsendFlushRequest(conn
);
1328 res
= PQgetResult(conn
);
1330 pg_fatal("unexpected NULL result received");
1331 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
1332 pg_fatal("unexpected result code %s", PQresStatus(PQresultStatus(res
)));
1333 if (PQexitPipelineMode(conn
) != 1)
1334 pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn
));
1335 fprintf(stderr
, "ok - 2\n");
1339 test_simple_pipeline(PGconn
*conn
)
1341 PGresult
*res
= NULL
;
1342 const char *dummy_params
[1] = {"1"};
1343 Oid dummy_param_oids
[1] = {INT4OID
};
1345 fprintf(stderr
, "simple pipeline... ");
1348 * Enter pipeline mode and dispatch a set of operations, which we'll then
1349 * process the results of as they come in.
1351 * For a simple case we should be able to do this without interim
1352 * processing of results since our output buffer will give us enough slush
1353 * to work with and we won't block on sending. So blocking mode is fine.
1355 if (PQisnonblocking(conn
))
1356 pg_fatal("Expected blocking connection mode");
1358 if (PQenterPipelineMode(conn
) != 1)
1359 pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn
));
1361 if (PQsendQueryParams(conn
, "SELECT $1",
1362 1, dummy_param_oids
, dummy_params
,
1363 NULL
, NULL
, 0) != 1)
1364 pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn
));
1366 if (PQexitPipelineMode(conn
) != 0)
1367 pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
1369 if (PQpipelineSync(conn
) != 1)
1370 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
1372 res
= PQgetResult(conn
);
1374 pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1375 PQerrorMessage(conn
));
1377 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
1378 pg_fatal("Unexpected result code %s from first pipeline item",
1379 PQresStatus(PQresultStatus(res
)));
1384 if (PQgetResult(conn
) != NULL
)
1385 pg_fatal("PQgetResult returned something extra after first query result.");
1388 * Even though we've processed the result there's still a sync to come and
1389 * we can't exit pipeline mode yet
1391 if (PQexitPipelineMode(conn
) != 0)
1392 pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
1394 res
= PQgetResult(conn
);
1396 pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
1397 PQerrorMessage(conn
));
1399 if (PQresultStatus(res
) != PGRES_PIPELINE_SYNC
)
1400 pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
1401 PQresStatus(PQresultStatus(res
)), PQerrorMessage(conn
));
1406 if (PQgetResult(conn
) != NULL
)
1407 pg_fatal("PQgetResult returned something extra after pipeline end: %s",
1408 PQresStatus(PQresultStatus(res
)));
1410 /* We're still in pipeline mode... */
1411 if (PQpipelineStatus(conn
) == PQ_PIPELINE_OFF
)
1412 pg_fatal("Fell out of pipeline mode somehow");
1414 /* ... until we end it, which we can safely do now */
1415 if (PQexitPipelineMode(conn
) != 1)
1416 pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1417 PQerrorMessage(conn
));
1419 if (PQpipelineStatus(conn
) != PQ_PIPELINE_OFF
)
1420 pg_fatal("Exiting pipeline mode didn't seem to work");
1422 fprintf(stderr
, "ok\n");
1426 test_singlerowmode(PGconn
*conn
)
1430 bool pipeline_ended
= false;
1432 if (PQenterPipelineMode(conn
) != 1)
1433 pg_fatal("failed to enter pipeline mode: %s",
1434 PQerrorMessage(conn
));
1436 /* One series of three commands, using single-row mode for the first two. */
1437 for (i
= 0; i
< 3; i
++)
1441 param
[0] = psprintf("%d", 44 + i
);
1443 if (PQsendQueryParams(conn
,
1444 "SELECT generate_series(42, $1)",
1447 (const char **) param
,
1451 pg_fatal("failed to send query: %s",
1452 PQerrorMessage(conn
));
1455 if (PQpipelineSync(conn
) != 1)
1456 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
1458 for (i
= 0; !pipeline_ended
; i
++)
1461 bool saw_ending_tuplesok
;
1462 bool isSingleTuple
= false;
1464 /* Set single row mode for only first 2 SELECT queries */
1467 if (PQsetSingleRowMode(conn
) != 1)
1468 pg_fatal("PQsetSingleRowMode() failed for i=%d", i
);
1471 /* Consume rows for this query */
1472 saw_ending_tuplesok
= false;
1473 while ((res
= PQgetResult(conn
)) != NULL
)
1475 ExecStatusType est
= PQresultStatus(res
);
1477 if (est
== PGRES_PIPELINE_SYNC
)
1479 fprintf(stderr
, "end of pipeline reached\n");
1480 pipeline_ended
= true;
1483 pg_fatal("Expected three results, got %d", i
);
1487 /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
1490 if (i
<= 1 && est
!= PGRES_SINGLE_TUPLE
)
1491 pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1492 i
, PQresStatus(est
));
1493 if (i
>= 2 && est
!= PGRES_TUPLES_OK
)
1494 pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1495 i
, PQresStatus(est
));
1499 fprintf(stderr
, "Result status %s for query %d", PQresStatus(est
), i
);
1502 case PGRES_TUPLES_OK
:
1503 fprintf(stderr
, ", tuples: %d\n", PQntuples(res
));
1504 saw_ending_tuplesok
= true;
1507 if (PQntuples(res
) == 0)
1508 fprintf(stderr
, "all tuples received in query %d\n", i
);
1510 pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1514 case PGRES_SINGLE_TUPLE
:
1515 isSingleTuple
= true;
1516 fprintf(stderr
, ", %d tuple: %s\n", PQntuples(res
), PQgetvalue(res
, 0, 0));
1520 pg_fatal("unexpected");
1524 if (!pipeline_ended
&& !saw_ending_tuplesok
)
1525 pg_fatal("didn't get expected terminating TUPLES_OK");
1529 * Now issue one command, get its results in with single-row mode, then
1530 * issue another command, and get its results in normal mode; make sure
1531 * the single-row mode flag is reset as expected.
1533 if (PQsendQueryParams(conn
, "SELECT generate_series(0, 0)",
1534 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
1535 pg_fatal("failed to send query: %s",
1536 PQerrorMessage(conn
));
1537 if (PQsendFlushRequest(conn
) != 1)
1538 pg_fatal("failed to send flush request");
1539 if (PQsetSingleRowMode(conn
) != 1)
1540 pg_fatal("PQsetSingleRowMode() failed");
1541 res
= PQgetResult(conn
);
1543 pg_fatal("unexpected NULL");
1544 if (PQresultStatus(res
) != PGRES_SINGLE_TUPLE
)
1545 pg_fatal("Expected PGRES_SINGLE_TUPLE, got %s",
1546 PQresStatus(PQresultStatus(res
)));
1547 res
= PQgetResult(conn
);
1549 pg_fatal("unexpected NULL");
1550 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
1551 pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1552 PQresStatus(PQresultStatus(res
)));
1553 if (PQgetResult(conn
) != NULL
)
1554 pg_fatal("expected NULL result");
1556 if (PQsendQueryParams(conn
, "SELECT 1",
1557 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
1558 pg_fatal("failed to send query: %s",
1559 PQerrorMessage(conn
));
1560 if (PQsendFlushRequest(conn
) != 1)
1561 pg_fatal("failed to send flush request");
1562 res
= PQgetResult(conn
);
1564 pg_fatal("unexpected NULL");
1565 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
1566 pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1567 PQresStatus(PQresultStatus(res
)));
1568 if (PQgetResult(conn
) != NULL
)
1569 pg_fatal("expected NULL result");
1571 if (PQexitPipelineMode(conn
) != 1)
1572 pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn
));
1574 fprintf(stderr
, "ok\n");
1578 * Simple test to verify that a pipeline is discarded as a whole when there's
1579 * an error, ignoring transaction commands.
1582 test_transaction(PGconn
*conn
)
1588 res
= PQexec(conn
, "DROP TABLE IF EXISTS pq_pipeline_tst;"
1589 "CREATE TABLE pq_pipeline_tst (id int)");
1590 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
1591 pg_fatal("failed to create test table: %s",
1592 PQerrorMessage(conn
));
1595 if (PQenterPipelineMode(conn
) != 1)
1596 pg_fatal("failed to enter pipeline mode: %s",
1597 PQerrorMessage(conn
));
1598 if (PQsendPrepare(conn
, "rollback", "ROLLBACK", 0, NULL
) != 1)
1599 pg_fatal("could not send prepare on pipeline: %s",
1600 PQerrorMessage(conn
));
1602 if (PQsendQueryParams(conn
,
1604 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
1605 pg_fatal("failed to send query: %s",
1606 PQerrorMessage(conn
));
1607 if (PQsendQueryParams(conn
,
1609 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
1610 pg_fatal("failed to send query: %s",
1611 PQerrorMessage(conn
));
1614 * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
1615 * get out of the pipeline-aborted state first.
1617 if (PQsendQueryPrepared(conn
, "rollback", 0, NULL
, NULL
, NULL
, 1) != 1)
1618 pg_fatal("failed to execute prepared: %s",
1619 PQerrorMessage(conn
));
1621 /* This insert fails because we're in pipeline-aborted state */
1622 if (PQsendQueryParams(conn
,
1623 "INSERT INTO pq_pipeline_tst VALUES (1)",
1624 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
1625 pg_fatal("failed to send query: %s",
1626 PQerrorMessage(conn
));
1627 if (PQpipelineSync(conn
) != 1)
1628 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
1632 * This insert fails even though the pipeline got a SYNC, because we're in
1633 * an aborted transaction
1635 if (PQsendQueryParams(conn
,
1636 "INSERT INTO pq_pipeline_tst VALUES (2)",
1637 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
1638 pg_fatal("failed to send query: %s",
1639 PQerrorMessage(conn
));
1640 if (PQpipelineSync(conn
) != 1)
1641 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
1645 * Send ROLLBACK using prepared stmt. This one works because we just did
1646 * PQpipelineSync above.
1648 if (PQsendQueryPrepared(conn
, "rollback", 0, NULL
, NULL
, NULL
, 1) != 1)
1649 pg_fatal("failed to execute prepared: %s",
1650 PQerrorMessage(conn
));
1653 * Now that we're out of a transaction and in pipeline-good mode, this
1656 if (PQsendQueryParams(conn
,
1657 "INSERT INTO pq_pipeline_tst VALUES (3)",
1658 0, NULL
, NULL
, NULL
, NULL
, 0) != 1)
1659 pg_fatal("failed to send query: %s",
1660 PQerrorMessage(conn
));
1661 /* Send two syncs now -- match up to SYNC messages below */
1662 if (PQpipelineSync(conn
) != 1)
1663 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
1665 if (PQpipelineSync(conn
) != 1)
1666 pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn
));
1669 expect_null
= false;
1670 for (int i
= 0;; i
++)
1672 ExecStatusType restype
;
1674 res
= PQgetResult(conn
);
1677 printf("%d: got NULL result\n", i
);
1679 pg_fatal("did not expect NULL here");
1680 expect_null
= false;
1683 restype
= PQresultStatus(res
);
1684 printf("%d: got status %s", i
, PQresStatus(restype
));
1686 pg_fatal("expected NULL");
1687 if (restype
== PGRES_FATAL_ERROR
)
1688 printf("; error: %s", PQerrorMessage(conn
));
1689 else if (restype
== PGRES_PIPELINE_ABORTED
)
1691 printf(": command didn't run because pipeline aborted\n");
1697 if (restype
== PGRES_PIPELINE_SYNC
)
1704 if (PQgetResult(conn
) != NULL
)
1705 pg_fatal("returned something extra after all the syncs: %s",
1706 PQresStatus(PQresultStatus(res
)));
1708 if (PQexitPipelineMode(conn
) != 1)
1709 pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn
));
1711 /* We expect to find one tuple containing the value "3" */
1712 res
= PQexec(conn
, "SELECT * FROM pq_pipeline_tst");
1713 if (PQresultStatus(res
) != PGRES_TUPLES_OK
)
1714 pg_fatal("failed to obtain result: %s", PQerrorMessage(conn
));
1715 if (PQntuples(res
) != 1)
1716 pg_fatal("did not get 1 tuple");
1717 if (strcmp(PQgetvalue(res
, 0, 0), "3") != 0)
1718 pg_fatal("did not get expected tuple");
1721 fprintf(stderr
, "ok\n");
1725 * In this test mode we send a stream of queries, with one in the middle
1726 * causing an error. Verify that we can still send some more after the
1727 * error and have libpq work properly.
1730 test_uniqviol(PGconn
*conn
)
1732 int sock
= PQsocket(conn
);
1734 Oid paramTypes
[2] = {INT8OID
, INT8OID
};
1735 const char *paramValues
[2];
1736 char paramValue0
[MAXINT8LEN
];
1737 char paramValue1
[MAXINT8LEN
];
1741 bool read_done
= false;
1742 bool write_done
= false;
1743 bool error_sent
= false;
1744 bool got_error
= false;
1750 fprintf(stderr
, "uniqviol ...");
1752 PQsetnonblocking(conn
, 1);
1754 paramValues
[0] = paramValue0
;
1755 paramValues
[1] = paramValue1
;
1756 sprintf(paramValue1
, "42");
1758 res
= PQexec(conn
, "drop table if exists ppln_uniqviol;"
1759 "create table ppln_uniqviol(id bigint primary key, idata bigint)");
1760 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
1761 pg_fatal("failed to create table: %s", PQerrorMessage(conn
));
1763 res
= PQexec(conn
, "begin");
1764 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
1765 pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn
));
1767 res
= PQprepare(conn
, "insertion",
1768 "insert into ppln_uniqviol values ($1, $2) returning id",
1770 if (res
== NULL
|| PQresultStatus(res
) != PGRES_COMMAND_OK
)
1771 pg_fatal("failed to prepare query: %s", PQerrorMessage(conn
));
1773 if (PQenterPipelineMode(conn
) != 1)
1774 pg_fatal("failed to enter pipeline mode");
1779 * Avoid deadlocks by reading everything the server has sent before
1780 * sending anything. (Special precaution is needed here to process
1781 * PQisBusy before testing the socket for read-readiness, because the
1782 * socket does not turn read-ready after "sending" queries in aborted
1785 while (PQisBusy(conn
) == 0)
1789 if (results
>= numsent
)
1796 res
= PQgetResult(conn
);
1797 new_error
= process_result(conn
, res
, results
, numsent
);
1798 if (new_error
&& got_error
)
1799 pg_fatal("got two errors");
1800 got_error
|= new_error
;
1801 if (results
++ >= numsent
- 1)
1813 FD_SET(sock
, &out_fds
);
1816 FD_SET(sock
, &in_fds
);
1818 if (select(sock
+ 1, &in_fds
, write_done
? NULL
: &out_fds
, NULL
, NULL
) == -1)
1822 pg_fatal("select() failed: %m");
1825 if (FD_ISSET(sock
, &in_fds
) && PQconsumeInput(conn
) == 0)
1826 pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn
));
1829 * If the socket is writable and we haven't finished sending queries,
1832 if (!write_done
&& FD_ISSET(sock
, &out_fds
))
1839 * provoke uniqueness violation exactly once after having
1840 * switched to read mode.
1842 if (switched
>= 1 && !error_sent
&& ctr
% socketful
>= socketful
/ 2)
1844 sprintf(paramValue0
, "%d", numsent
/ 2);
1845 fprintf(stderr
, "E");
1850 fprintf(stderr
, ".");
1851 sprintf(paramValue0
, "%d", ctr
++);
1854 if (PQsendQueryPrepared(conn
, "insertion", 2, paramValues
, NULL
, NULL
, 0) != 1)
1855 pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn
));
1858 /* Are we done writing? */
1859 if (socketful
!= 0 && numsent
% socketful
== 42 && error_sent
)
1861 if (PQsendFlushRequest(conn
) != 1)
1862 pg_fatal("failed to send flush request");
1864 fprintf(stderr
, "\ndone writing\n");
1869 /* is the outgoing socket full? */
1870 flush
= PQflush(conn
);
1872 pg_fatal("failed to flush: %s", PQerrorMessage(conn
));
1876 socketful
= numsent
;
1877 fprintf(stderr
, "\nswitch to reading\n");
1886 pg_fatal("did not get expected error");
1888 fprintf(stderr
, "ok\n");
1892 * Subroutine for test_uniqviol; given a PGresult, print it out and consume
1893 * the expected NULL that should follow it.
1895 * Returns true if we read a fatal error message, otherwise false.
1898 process_result(PGconn
*conn
, PGresult
*res
, int results
, int numsent
)
1901 bool got_error
= false;
1904 pg_fatal("got unexpected NULL");
1906 switch (PQresultStatus(res
))
1908 case PGRES_FATAL_ERROR
:
1910 fprintf(stderr
, "result %d/%d (error): %s\n", results
, numsent
, PQerrorMessage(conn
));
1913 res2
= PQgetResult(conn
);
1915 pg_fatal("expected NULL, got %s",
1916 PQresStatus(PQresultStatus(res2
)));
1919 case PGRES_TUPLES_OK
:
1920 fprintf(stderr
, "result %d/%d: %s\n", results
, numsent
, PQgetvalue(res
, 0, 0));
1923 res2
= PQgetResult(conn
);
1925 pg_fatal("expected NULL, got %s",
1926 PQresStatus(PQresultStatus(res2
)));
1929 case PGRES_PIPELINE_ABORTED
:
1930 fprintf(stderr
, "result %d/%d: pipeline aborted\n", results
, numsent
);
1931 res2
= PQgetResult(conn
);
1933 pg_fatal("expected NULL, got %s",
1934 PQresStatus(PQresultStatus(res2
)));
1938 pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res
)));
/*
 * Print the command-line synopsis and option summary to stderr.
 *
 * "progname" is the program name substituted into the header and usage lines.
 */
static void
usage(const char *progname)
{
	FILE	   *out = stderr;

	fprintf(out, "%s tests libpq's pipeline mode.\n\n", progname);
	fputs("Usage:\n", out);
	fprintf(out, " %s [OPTION] tests\n", progname);
	fprintf(out, " %s [OPTION] TESTNAME [CONNINFO]\n", progname);
	fputs("\nOptions:\n", out);
	fputs(" -t TRACEFILE generate a libpq trace to TRACEFILE\n", out);
	fputs(" -r NUMROWS use NUMROWS as the test size\n", out);
}
/*
 * Print the name of every available test to stdout, one per line.
 */
static void
print_test_list(void)
{
	static const char *const test_names[] = {
		"cancel",
		"disallowed_in_pipeline",
		"multi_pipelines",
		"nosync",
		"pipeline_abort",
		"pipeline_idle",
		"pipelined_insert",
		"prepared",
		"simple_pipeline",
		"singlerow",
		"transaction",
		"uniqviol",
	};

	for (size_t idx = 0; idx < sizeof(test_names) / sizeof(test_names[0]); idx++)
		printf("%s\n", test_names[idx]);
}
1975 main(int argc
, char **argv
)
1977 const char *conninfo
= "";
1981 int numrows
= 10000;
1985 while ((c
= getopt(argc
, argv
, "r:t:")) != -1)
1989 case 'r': /* numrows */
1991 numrows
= strtol(optarg
, NULL
, 10);
1992 if (errno
!= 0 || numrows
<= 0)
1994 fprintf(stderr
, "couldn't parse \"%s\" as a positive integer\n",
1999 case 't': /* trace file */
2000 tracefile
= pg_strdup(optarg
);
2007 testname
= pg_strdup(argv
[optind
]);
2016 if (strcmp(testname
, "tests") == 0)
2024 conninfo
= pg_strdup(argv
[optind
]);
2028 /* Make a connection to the database */
2029 conn
= PQconnectdb(conninfo
);
2030 if (PQstatus(conn
) != CONNECTION_OK
)
2032 fprintf(stderr
, "Connection to database failed: %s\n",
2033 PQerrorMessage(conn
));
2037 res
= PQexec(conn
, "SET lc_messages TO \"C\"");
2038 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
2039 pg_fatal("failed to set lc_messages: %s", PQerrorMessage(conn
));
2040 res
= PQexec(conn
, "SET debug_parallel_query = off");
2041 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
2042 pg_fatal("failed to set debug_parallel_query: %s", PQerrorMessage(conn
));
2044 /* Set the trace file, if requested */
2045 if (tracefile
!= NULL
)
2047 if (strcmp(tracefile
, "-") == 0)
2050 trace
= fopen(tracefile
, "w");
2052 pg_fatal("could not open file \"%s\": %m", tracefile
);
2054 /* Make it line-buffered */
2055 setvbuf(trace
, NULL
, PG_IOLBF
, 0);
2057 PQtrace(conn
, trace
);
2058 PQsetTraceFlags(conn
,
2059 PQTRACE_SUPPRESS_TIMESTAMPS
| PQTRACE_REGRESS_MODE
);
2062 if (strcmp(testname
, "cancel") == 0)
2064 else if (strcmp(testname
, "disallowed_in_pipeline") == 0)
2065 test_disallowed_in_pipeline(conn
);
2066 else if (strcmp(testname
, "multi_pipelines") == 0)
2067 test_multi_pipelines(conn
);
2068 else if (strcmp(testname
, "nosync") == 0)
2070 else if (strcmp(testname
, "pipeline_abort") == 0)
2071 test_pipeline_abort(conn
);
2072 else if (strcmp(testname
, "pipeline_idle") == 0)
2073 test_pipeline_idle(conn
);
2074 else if (strcmp(testname
, "pipelined_insert") == 0)
2075 test_pipelined_insert(conn
, numrows
);
2076 else if (strcmp(testname
, "prepared") == 0)
2077 test_prepared(conn
);
2078 else if (strcmp(testname
, "simple_pipeline") == 0)
2079 test_simple_pipeline(conn
);
2080 else if (strcmp(testname
, "singlerow") == 0)
2081 test_singlerowmode(conn
);
2082 else if (strcmp(testname
, "transaction") == 0)
2083 test_transaction(conn
);
2084 else if (strcmp(testname
, "uniqviol") == 0)
2085 test_uniqviol(conn
);
2088 fprintf(stderr
, "\"%s\" is not a recognized test name\n", testname
);
2092 /* close the connection to the database and cleanup */