From 319e9e53f3797fe4a4753b54a98ab717bb2f2ca0 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera Date: Mon, 11 Mar 2024 21:54:03 +0100 Subject: [PATCH] Add tests for libpq query cancellation APIs This is in preparation of making changes and additions to these APIs. Author: Jelte Fennema-Nio Discussion: https://postgr.es/m/CAGECzQRb21spiiykQ48rzz8w+Hcykz+mB2_hxR65D9Qk6nnw=w@mail.gmail.com --- src/test/modules/libpq_pipeline/libpq_pipeline.c | 175 ++++++++++++++++++++++- 1 file changed, 173 insertions(+), 2 deletions(-) diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c index 5f43aa40de..97f8febf92 100644 --- a/src/test/modules/libpq_pipeline/libpq_pipeline.c +++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c @@ -65,6 +65,11 @@ exit_nicely(PGconn *conn) } /* + * The following few functions are wrapped in macros to make the reported line + * number in an error match the line number of the invocation. + */ + +/* * Print an error to stderr and terminate the program. */ #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__) @@ -74,7 +79,6 @@ pg_fatal_impl(int line, const char *fmt,...) { va_list args; - fflush(stdout); fprintf(stderr, "\n%s:%d: ", progname, line); @@ -86,6 +90,170 @@ pg_fatal_impl(int line, const char *fmt,...) exit(1); } +/* + * Check that the query on the given connection got canceled. + */ +#define confirm_query_canceled(conn) confirm_query_canceled_impl(__LINE__, conn) +static void +confirm_query_canceled_impl(int line, PGconn *conn) +{ + PGresult *res = NULL; + + res = PQgetResult(conn); + if (res == NULL) + pg_fatal_impl(line, "PQgetResult returned null: %s", + PQerrorMessage(conn)); + if (PQresultStatus(res) != PGRES_FATAL_ERROR) + pg_fatal_impl(line, "query did not fail when it was expected"); + if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0) + pg_fatal_impl(line, "query failed with a different error than cancellation: %s", + PQerrorMessage(conn)); + PQclear(res); + + while (PQisBusy(conn)) + PQconsumeInput(conn); +} + +#define send_cancellable_query(conn, monitorConn) \ + send_cancellable_query_impl(__LINE__, conn, monitorConn) +static void +send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn) +{ + const char *env_wait; + const Oid paramTypes[1] = {INT4OID}; + int procpid = PQbackendPID(conn); + + env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT"); + if (env_wait == NULL) + env_wait = "180"; + + if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes, + &env_wait, NULL, NULL, 0) != 1) + pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn)); + + /* + * Wait until the query is actually running. Otherwise sending a + * cancellation request might not cancel the query due to race conditions. + */ + while (true) + { + char *value; + PGresult *res; + const char *paramValues[1]; + char pidval[16]; + + snprintf(pidval, 16, "%d", procpid); + paramValues[0] = pidval; + + res = PQexecParams(monitorConn, + "SELECT count(*) FROM pg_stat_activity WHERE " + "pid = $1 AND state = 'active'", + 1, NULL, paramValues, NULL, NULL, 1); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pg_fatal("could not query pg_stat_activity: %s", PQerrorMessage(monitorConn)); + if (PQntuples(res) != 1) + pg_fatal("unexpected number of rows received: %d", PQntuples(res)); + if (PQnfields(res) != 1) + pg_fatal("unexpected number of columns received: %d", PQnfields(res)); + value = PQgetvalue(res, 0, 0); + if (*value != '0') + { + PQclear(res); + break; + } + PQclear(res); + + /* wait 10ms before polling again */ + pg_usleep(10000); + } +} + +/* + * Create a new connection with the same conninfo as the given one. + */ +static PGconn * +copy_connection(PGconn *conn) +{ + PGconn *copyConn; + PQconninfoOption *opts = PQconninfo(conn); + const char **keywords; + const char **vals; + int nopts = 1; + int i = 0; + + for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt) + nopts++; + + keywords = pg_malloc(sizeof(char *) * nopts); + vals = pg_malloc(sizeof(char *) * nopts); + + for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt) + { + if (opt->val) + { + keywords[i] = opt->keyword; + vals[i] = opt->val; + i++; + } + } + keywords[i] = vals[i] = NULL; + + copyConn = PQconnectdbParams(keywords, vals, false); + + if (PQstatus(copyConn) != CONNECTION_OK) + pg_fatal("Connection to database failed: %s", + PQerrorMessage(copyConn)); + + return copyConn; +} + +/* + * Test query cancellation routines + */ +static void +test_cancel(PGconn *conn) +{ + PGcancel *cancel; + PGconn *monitorConn; + char errorbuf[256]; + + fprintf(stderr, "test cancellations... "); + + if (PQsetnonblocking(conn, 1) != 0) + pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn)); + + /* + * Make a separate connection to the database to monitor the query on the + * main connection. + */ + monitorConn = copy_connection(conn); + Assert(PQstatus(monitorConn) == CONNECTION_OK); + + /* test PQcancel */ + send_cancellable_query(conn, monitorConn); + cancel = PQgetCancel(conn); + if (!PQcancel(cancel, errorbuf, sizeof(errorbuf))) + pg_fatal("failed to run PQcancel: %s", errorbuf); + confirm_query_canceled(conn); + + /* PGcancel object can be reused for the next query */ + send_cancellable_query(conn, monitorConn); + if (!PQcancel(cancel, errorbuf, sizeof(errorbuf))) + pg_fatal("failed to run PQcancel: %s", errorbuf); + confirm_query_canceled(conn); + + PQfreeCancel(cancel); + + /* test PQrequestCancel */ + send_cancellable_query(conn, monitorConn); + if (!PQrequestCancel(conn)) + pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn)); + confirm_query_canceled(conn); + + fprintf(stderr, "ok\n"); +} + static void test_disallowed_in_pipeline(PGconn *conn) { @@ -1789,6 +1957,7 @@ usage(const char *progname) static void print_test_list(void) { + printf("cancel\n"); printf("disallowed_in_pipeline\n"); printf("multi_pipelines\n"); printf("nosync\n"); @@ -1890,7 +2059,9 @@ main(int argc, char **argv) PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE); } - if (strcmp(testname, "disallowed_in_pipeline") == 0) + if (strcmp(testname, "cancel") == 0) + test_cancel(conn); + else if (strcmp(testname, "disallowed_in_pipeline") == 0) test_disallowed_in_pipeline(conn); else if (strcmp(testname, "multi_pipelines") == 0) test_multi_pipelines(conn); -- 2.11.4.GIT