Hopefully make libpq_pipeline's new cancel test more reliable

The newly introduced cancel test in libpq_pipeline was flaky. It's not
completely clear why, but one option is that the check for "active" was
actually seeing the active state for the previous query. This change
should address any such race condition by first waiting until the
connection is reported as idle.

Author: Jelte Fennema-Nio <me@jeltef.nl>
Discussion: https://postgr.es/m/CAGECzQRvmUK5-d68A+cm+fgmfht9Dv2uZ28-qq3QiaF6EAZqPQ@mail.gmail.com
This commit is contained in:
Alvaro Herrera 2024-03-13 19:53:49 +01:00
parent dbfc447165
commit 1ee910ce43
No known key found for this signature in database
GPG Key ID: 1C20ACB9D5C564AE
1 changed files with 55 additions and 35 deletions

View File

@ -114,6 +114,51 @@ confirm_query_canceled_impl(int line, PGconn *conn)
PQconsumeInput(conn);
}
/*
* Using monitorConn, query pg_stat_activity to see that the connection with
* the given PID is in the given state. We never stop until it does.
*/
static void
wait_for_connection_state(int line, PGconn *monitorConn, int procpid, char *state)
{
const Oid paramTypes[] = {INT4OID, TEXTOID};
const char *paramValues[2];
char *pidstr = psprintf("%d", procpid);
paramValues[0] = pidstr;
paramValues[1] = state;
while (true)
{
PGresult *res;
char *value;
res = PQexecParams(monitorConn,
"SELECT count(*) FROM pg_stat_activity WHERE "
"pid = $1 AND state = $2",
2, paramTypes, paramValues, NULL, NULL, 1);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pg_fatal_impl(line, "could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
if (PQntuples(res) != 1)
pg_fatal_impl(line, "unexpected number of rows received: %d", PQntuples(res));
if (PQnfields(res) != 1)
pg_fatal_impl(line, "unexpected number of columns received: %d", PQnfields(res));
value = PQgetvalue(res, 0, 0);
if (value[0] != '0')
{
PQclear(res);
break;
}
PQclear(res);
/* wait 10ms before polling again */
pg_usleep(10000);
}
pfree(pidstr);
}
#define send_cancellable_query(conn, monitorConn) \
send_cancellable_query_impl(__LINE__, conn, monitorConn)
static void
@ -121,7 +166,13 @@ send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
{
const char *env_wait;
const Oid paramTypes[1] = {INT4OID};
int procpid = PQbackendPID(conn);
/*
* Wait for the connection to be idle, so that our check for an active
* connection below is reliable, instead of possibly seeing an outdated
* state.
*/
wait_for_connection_state(line, monitorConn, PQbackendPID(conn), "idle");
env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
if (env_wait == NULL)
@ -132,41 +183,10 @@ send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
/*
* Wait until the query is actually running. Otherwise sending a
* cancellation request might not cancel the query due to race conditions.
* Wait for the query to start, because if the query is not running yet
* the cancel request that we send won't have any effect.
*/
while (true)
{
char *value;
PGresult *res;
const char *paramValues[1];
char pidval[16];
snprintf(pidval, 16, "%d", procpid);
paramValues[0] = pidval;
res = PQexecParams(monitorConn,
"SELECT count(*) FROM pg_stat_activity WHERE "
"pid = $1 AND state = 'active'",
1, NULL, paramValues, NULL, NULL, 1);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pg_fatal("could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
if (PQntuples(res) != 1)
pg_fatal("unexpected number of rows received: %d", PQntuples(res));
if (PQnfields(res) != 1)
pg_fatal("unexpected number of columns received: %d", PQnfields(res));
value = PQgetvalue(res, 0, 0);
if (*value != '0')
{
PQclear(res);
break;
}
PQclear(res);
/* wait 10ms before polling again */
pg_usleep(10000);
}
wait_for_connection_state(line, monitorConn, PQbackendPID(conn), "active");
}
/*