Add tests for libpq query cancellation APIs

This is in preparation of making changes and additions to these APIs.

Author: Jelte Fennema-Nio <postgres@jeltef.nl>
Discussion: https://postgr.es/m/CAGECzQRb21spiiykQ48rzz8w+Hcykz+mB2_hxR65D9Qk6nnw=w@mail.gmail.com
This commit is contained in:
Alvaro Herrera 2024-03-11 21:54:03 +01:00
parent 24c928ad9a
commit 319e9e53f3
No known key found for this signature in database
GPG Key ID: 1C20ACB9D5C564AE
1 changed files with 173 additions and 2 deletions

View File

@ -64,6 +64,11 @@ exit_nicely(PGconn *conn)
exit(1);
}
/*
* The following few functions are wrapped in macros to make the reported line
* number in an error match the line number of the invocation.
*/
/*
* Print an error to stderr and terminate the program.
*/
@ -74,7 +79,6 @@ pg_fatal_impl(int line, const char *fmt,...)
{
va_list args;
fflush(stdout);
fprintf(stderr, "\n%s:%d: ", progname, line);
@ -86,6 +90,170 @@ pg_fatal_impl(int line, const char *fmt,...)
exit(1);
}
/*
* Check that the query on the given connection got canceled.
*/
#define confirm_query_canceled(conn) confirm_query_canceled_impl(__LINE__, conn)
static void
confirm_query_canceled_impl(int line, PGconn *conn)
{
PGresult *res = NULL;
res = PQgetResult(conn);
if (res == NULL)
pg_fatal_impl(line, "PQgetResult returned null: %s",
PQerrorMessage(conn));
if (PQresultStatus(res) != PGRES_FATAL_ERROR)
pg_fatal_impl(line, "query did not fail when it was expected");
if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0)
pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
PQerrorMessage(conn));
PQclear(res);
while (PQisBusy(conn))
PQconsumeInput(conn);
}
#define send_cancellable_query(conn, monitorConn) \
send_cancellable_query_impl(__LINE__, conn, monitorConn)
static void
send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
{
const char *env_wait;
const Oid paramTypes[1] = {INT4OID};
int procpid = PQbackendPID(conn);
env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
if (env_wait == NULL)
env_wait = "180";
if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes,
&env_wait, NULL, NULL, 0) != 1)
pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
/*
* Wait until the query is actually running. Otherwise sending a
* cancellation request might not cancel the query due to race conditions.
*/
while (true)
{
char *value;
PGresult *res;
const char *paramValues[1];
char pidval[16];
snprintf(pidval, 16, "%d", procpid);
paramValues[0] = pidval;
res = PQexecParams(monitorConn,
"SELECT count(*) FROM pg_stat_activity WHERE "
"pid = $1 AND state = 'active'",
1, NULL, paramValues, NULL, NULL, 1);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pg_fatal("could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
if (PQntuples(res) != 1)
pg_fatal("unexpected number of rows received: %d", PQntuples(res));
if (PQnfields(res) != 1)
pg_fatal("unexpected number of columns received: %d", PQnfields(res));
value = PQgetvalue(res, 0, 0);
if (*value != '0')
{
PQclear(res);
break;
}
PQclear(res);
/* wait 10ms before polling again */
pg_usleep(10000);
}
}
/*
* Create a new connection with the same conninfo as the given one.
*/
static PGconn *
copy_connection(PGconn *conn)
{
PGconn *copyConn;
PQconninfoOption *opts = PQconninfo(conn);
const char **keywords;
const char **vals;
int nopts = 1;
int i = 0;
for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
nopts++;
keywords = pg_malloc(sizeof(char *) * nopts);
vals = pg_malloc(sizeof(char *) * nopts);
for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
{
if (opt->val)
{
keywords[i] = opt->keyword;
vals[i] = opt->val;
i++;
}
}
keywords[i] = vals[i] = NULL;
copyConn = PQconnectdbParams(keywords, vals, false);
if (PQstatus(copyConn) != CONNECTION_OK)
pg_fatal("Connection to database failed: %s",
PQerrorMessage(copyConn));
return copyConn;
}
/*
* Test query cancellation routines
*/
static void
test_cancel(PGconn *conn)
{
PGcancel *cancel;
PGconn *monitorConn;
char errorbuf[256];
fprintf(stderr, "test cancellations... ");
if (PQsetnonblocking(conn, 1) != 0)
pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
/*
* Make a separate connection to the database to monitor the query on the
* main connection.
*/
monitorConn = copy_connection(conn);
Assert(PQstatus(monitorConn) == CONNECTION_OK);
/* test PQcancel */
send_cancellable_query(conn, monitorConn);
cancel = PQgetCancel(conn);
if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
pg_fatal("failed to run PQcancel: %s", errorbuf);
confirm_query_canceled(conn);
/* PGcancel object can be reused for the next query */
send_cancellable_query(conn, monitorConn);
if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
pg_fatal("failed to run PQcancel: %s", errorbuf);
confirm_query_canceled(conn);
PQfreeCancel(cancel);
/* test PQrequestCancel */
send_cancellable_query(conn, monitorConn);
if (!PQrequestCancel(conn))
pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
confirm_query_canceled(conn);
fprintf(stderr, "ok\n");
}
static void
test_disallowed_in_pipeline(PGconn *conn)
{
@ -1789,6 +1957,7 @@ usage(const char *progname)
static void
print_test_list(void)
{
printf("cancel\n");
printf("disallowed_in_pipeline\n");
printf("multi_pipelines\n");
printf("nosync\n");
@ -1890,7 +2059,9 @@ main(int argc, char **argv)
PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
}
if (strcmp(testname, "disallowed_in_pipeline") == 0)
if (strcmp(testname, "cancel") == 0)
test_cancel(conn);
else if (strcmp(testname, "disallowed_in_pipeline") == 0)
test_disallowed_in_pipeline(conn);
else if (strcmp(testname, "multi_pipelines") == 0)
test_multi_pipelines(conn);