Re-implement psql's FETCH_COUNT feature atop libpq's chunked mode.

Formerly this was done with a cursor, which is problematic since
not all result-set-returning query types can be put into a cursor.
The new implementation is better integrated into other psql
features, too.

Daniel Vérité, reviewed by Laurenz Albe and myself (and whacked
around a bit by me, so any remaining bugs are my fault)

Discussion: https://postgr.es/m/CAKZiRmxsVTkO928CM+-ADvsMyePmU3L9DQCa9NwqjvLPcEe5QA@mail.gmail.com
This commit is contained in:
Tom Lane 2024-04-06 20:45:05 -04:00
parent 4643a2b265
commit 90f5178211
4 changed files with 171 additions and 365 deletions

View File

@ -31,7 +31,6 @@
#include "settings.h"
static bool DescribeQuery(const char *query, double *elapsed_msec);
static bool ExecQueryUsingCursor(const char *query, double *elapsed_msec);
static int ExecQueryAndProcessResults(const char *query,
double *elapsed_msec,
bool *svpt_gone_p,
@ -40,7 +39,6 @@ static int ExecQueryAndProcessResults(const char *query,
const printQueryOpt *opt,
FILE *printQueryFout);
static bool command_no_begin(const char *query);
static bool is_select_command(const char *query);
/*
@ -83,6 +81,46 @@ openQueryOutputFile(const char *fname, FILE **fout, bool *is_pipe)
return true;
}
/*
 * SetupGOutput
 *
 * Lazily open the output stream named by \g (a file or a |program pipe),
 * storing the stream and pipe-ness into the caller's state variables.
 * A no-op when \g has no target or the stream is already open.
 * Returns true on success, false if the open failed.
 */
static bool
SetupGOutput(FILE **gfile_fout, bool *is_pipe)
{
	/* Nothing to do unless \g has a target and it isn't open yet */
	if (pset.gfname == NULL || *gfile_fout != NULL)
		return true;

	if (!openQueryOutputFile(pset.gfname, gfile_fout, is_pipe))
		return false;

	/* When writing to a pipe, don't let a dying reader SIGPIPE-kill us */
	if (*is_pipe)
		disable_sigpipe_trap();
	return true;
}
/*
 * CloseGOutput
 *
 * Counterpart of SetupGOutput: close the \g output stream if it was
 * opened, undoing any SIGPIPE trap adjustment for the pipe case.
 */
static void
CloseGOutput(FILE *gfile_fout, bool is_pipe)
{
	if (gfile_fout == NULL)
		return;

	if (is_pipe)
	{
		/* capture the child command's exit status for SHELL_* variables */
		SetShellResultVariables(pclose(gfile_fout));
		restore_sigpipe_trap();
	}
	else
		fclose(gfile_fout);
}
/*
* setQFout
* -- handler for -o command line option and \o command
@ -373,6 +411,7 @@ AcceptResult(const PGresult *result, bool show_error)
{
case PGRES_COMMAND_OK:
case PGRES_TUPLES_OK:
case PGRES_TUPLES_CHUNK:
case PGRES_EMPTY_QUERY:
case PGRES_COPY_IN:
case PGRES_COPY_OUT:
@ -1135,16 +1174,10 @@ SendQuery(const char *query)
/* Describe query's result columns, without executing it */
OK = DescribeQuery(query, &elapsed_msec);
}
else if (pset.fetch_count <= 0 || pset.gexec_flag ||
pset.crosstab_flag || !is_select_command(query))
{
/* Default fetch-it-all-and-print mode */
OK = (ExecQueryAndProcessResults(query, &elapsed_msec, &svpt_gone, false, 0, NULL, NULL) > 0);
}
else
{
/* Fetch-in-segments mode */
OK = ExecQueryUsingCursor(query, &elapsed_msec);
/* Default fetch-and-print mode */
OK = (ExecQueryAndProcessResults(query, &elapsed_msec, &svpt_gone, false, 0, NULL, NULL) > 0);
}
if (!OK && pset.echo == PSQL_ECHO_ERRORS)
@ -1454,6 +1487,21 @@ ExecQueryAndProcessResults(const char *query,
return -1;
}
/*
* Fetch the result in chunks if FETCH_COUNT is set. But we don't enable
* chunking if SHOW_ALL_RESULTS is false, since that requires us to
* accumulate all rows before we can tell what should be displayed, which
* would counter the idea of FETCH_COUNT. Chunking is also disabled when
* \crosstab, \gexec, \gset or \watch is used.
*/
if (pset.fetch_count > 0 && pset.show_all_results &&
!pset.crosstab_flag && !pset.gexec_flag &&
!pset.gset_prefix && !is_watch)
{
if (!PQsetChunkedRowsMode(pset.db, pset.fetch_count))
pg_log_warning("fetching results in chunked mode failed");
}
/*
* If SIGINT is sent while the query is processing, the interrupt will be
* consumed. The user's intention, though, is to cancel the entire watch
@ -1475,6 +1523,7 @@ ExecQueryAndProcessResults(const char *query,
while (result != NULL)
{
ExecStatusType result_status;
bool is_chunked_result = false;
PGresult *next_result;
bool last;
@ -1572,20 +1621,9 @@ ExecQueryAndProcessResults(const char *query,
}
else if (pset.gfname)
{
/* send to \g file, which we may have opened already */
if (gfile_fout == NULL)
{
if (openQueryOutputFile(pset.gfname,
&gfile_fout, &gfile_is_pipe))
{
if (gfile_is_pipe)
disable_sigpipe_trap();
copy_stream = gfile_fout;
}
else
success = false;
}
else
/* COPY followed by \g filename or \g |program */
success &= SetupGOutput(&gfile_fout, &gfile_is_pipe);
if (gfile_fout)
copy_stream = gfile_fout;
}
else
@ -1603,6 +1641,101 @@ ExecQueryAndProcessResults(const char *query,
success &= HandleCopyResult(&result, copy_stream);
}
/* If we have a chunked result, collect and print all chunks */
if (result_status == PGRES_TUPLES_CHUNK)
{
FILE *tuples_fout = printQueryFout ? printQueryFout : stdout;
printQueryOpt my_popt = pset.popt;
int64 total_tuples = 0;
bool is_pager = false;
int flush_error = 0;
/* initialize print options for partial table output */
my_popt.topt.start_table = true;
my_popt.topt.stop_table = false;
my_popt.topt.prior_records = 0;
/* open \g file if needed */
success &= SetupGOutput(&gfile_fout, &gfile_is_pipe);
if (gfile_fout)
tuples_fout = gfile_fout;
/* force use of pager for any chunked resultset going to stdout */
if (success && tuples_fout == stdout)
{
tuples_fout = PageOutput(INT_MAX, &(my_popt.topt));
is_pager = true;
}
do
{
/*
* display the current chunk of results, unless the output
* stream stopped working or we got cancelled
*/
if (success && !flush_error && !cancel_pressed)
{
printQuery(result, &my_popt, tuples_fout, is_pager, pset.logfile);
flush_error = fflush(tuples_fout);
}
/* after the first result set, disallow header decoration */
my_popt.topt.start_table = false;
/* count tuples before dropping the result */
my_popt.topt.prior_records += PQntuples(result);
total_tuples += PQntuples(result);
ClearOrSaveResult(result);
/* get the next result, loop if it's PGRES_TUPLES_CHUNK */
result = PQgetResult(pset.db);
} while (PQresultStatus(result) == PGRES_TUPLES_CHUNK);
/* We expect an empty PGRES_TUPLES_OK, else there's a problem */
if (PQresultStatus(result) == PGRES_TUPLES_OK)
{
char buf[32];
Assert(PQntuples(result) == 0);
/* Display the footer using the empty result */
if (success && !flush_error && !cancel_pressed)
{
my_popt.topt.stop_table = true;
printQuery(result, &my_popt, tuples_fout, is_pager, pset.logfile);
fflush(tuples_fout);
}
if (is_pager)
ClosePager(tuples_fout);
/*
* We must do a fake SetResultVariables(), since we don't have
* a PGresult corresponding to the whole query.
*/
SetVariable(pset.vars, "ERROR", "false");
SetVariable(pset.vars, "SQLSTATE", "00000");
snprintf(buf, sizeof(buf), INT64_FORMAT, total_tuples);
SetVariable(pset.vars, "ROW_COUNT", buf);
/* Prevent SetResultVariables call below */
is_chunked_result = true;
/* Clear the empty result so it isn't printed below */
ClearOrSaveResult(result);
result = NULL;
}
else
{
/* Probably an error report, so close the pager and print it */
if (is_pager)
ClosePager(tuples_fout);
success &= AcceptResult(result, true);
/* SetResultVariables and ClearOrSaveResult happen below */
}
}
/*
* Check PQgetResult() again. In the typical case of a single-command
* string, it will return NULL. Otherwise, we'll have other results
@ -1640,31 +1773,18 @@ ExecQueryAndProcessResults(const char *query,
* tuple output, but it's still used for status output.
*/
FILE *tuples_fout = printQueryFout;
bool do_print = true;
if (PQresultStatus(result) == PGRES_TUPLES_OK &&
pset.gfname)
{
if (gfile_fout == NULL)
{
if (openQueryOutputFile(pset.gfname,
&gfile_fout, &gfile_is_pipe))
{
if (gfile_is_pipe)
disable_sigpipe_trap();
}
else
success = do_print = false;
}
if (PQresultStatus(result) == PGRES_TUPLES_OK)
success &= SetupGOutput(&gfile_fout, &gfile_is_pipe);
if (gfile_fout)
tuples_fout = gfile_fout;
}
if (do_print)
if (success)
success &= PrintQueryResult(result, last, opt,
tuples_fout, printQueryFout);
}
/* set variables from last result */
if (!is_watch && last)
/* set variables from last result, unless dealt with elsewhere */
if (last && !is_watch && !is_chunked_result)
SetResultVariables(result, success);
ClearOrSaveResult(result);
@ -1678,16 +1798,7 @@ ExecQueryAndProcessResults(const char *query,
}
/* close \g file if we opened it */
if (gfile_fout)
{
if (gfile_is_pipe)
{
SetShellResultVariables(pclose(gfile_fout));
restore_sigpipe_trap();
}
else
fclose(gfile_fout);
}
CloseGOutput(gfile_fout, gfile_is_pipe);
/* may need this to recover from conn loss during COPY */
if (!CheckConnection())
@ -1700,274 +1811,6 @@ ExecQueryAndProcessResults(const char *query,
}
/*
 * ExecQueryUsingCursor: run a SELECT-like query using a cursor
 *
 * This feature allows result sets larger than RAM to be dealt with.
 *
 * Returns true if the query executed successfully, false otherwise.
 *
 * If pset.timing is on, total query time (exclusive of result-printing) is
 * stored into *elapsed_msec.
 *
 * Flow: optionally BEGIN a transaction (a cursor only lives inside one),
 * DECLARE a NO SCROLL cursor for the query, then FETCH FORWARD
 * fetch_count rows at a time and print each batch, and finally CLOSE the
 * cursor and COMMIT/ROLLBACK any transaction we started.
 */
static bool
ExecQueryUsingCursor(const char *query, double *elapsed_msec)
{
	bool		OK = true;
	PGresult   *result;
	PQExpBufferData buf;
	printQueryOpt my_popt = pset.popt;	/* local copy we can tweak per batch */
	bool		timing = pset.timing;
	FILE	   *fout;			/* where tuples go: \g target or queryFout */
	bool		is_pipe;		/* is fout a popen'd pipe? */
	bool		is_pager = false;	/* did we spawn a pager for fout? */
	bool		started_txn = false;	/* did we issue the BEGIN ourselves? */
	int64		total_tuples = 0;	/* running count for ROW_COUNT */
	int			ntuples;
	int			fetch_count;
	char		fetch_cmd[64];
	instr_time	before,
				after;
	int			flush_error;

	*elapsed_msec = 0;

	/* initialize print options for partial table output */
	my_popt.topt.start_table = true;
	my_popt.topt.stop_table = false;
	my_popt.topt.prior_records = 0;

	if (timing)
		INSTR_TIME_SET_CURRENT(before);
	else
		INSTR_TIME_SET_ZERO(before);

	/* if we're not in a transaction, start one (cursors require one) */
	if (PQtransactionStatus(pset.db) == PQTRANS_IDLE)
	{
		result = PQexec(pset.db, "BEGIN");
		OK = AcceptResult(result, true) &&
			(PQresultStatus(result) == PGRES_COMMAND_OK);
		ClearOrSaveResult(result);
		if (!OK)
			return false;
		started_txn = true;
	}

	/* Send DECLARE CURSOR */
	initPQExpBuffer(&buf);
	appendPQExpBuffer(&buf, "DECLARE _psql_cursor NO SCROLL CURSOR FOR\n%s",
					  query);
	result = PQexec(pset.db, buf.data);
	OK = AcceptResult(result, true) &&
		(PQresultStatus(result) == PGRES_COMMAND_OK);
	if (!OK)
		SetResultVariables(result, OK);
	ClearOrSaveResult(result);
	termPQExpBuffer(&buf);
	if (!OK)
		goto cleanup;

	if (timing)
	{
		INSTR_TIME_SET_CURRENT(after);
		INSTR_TIME_SUBTRACT(after, before);
		*elapsed_msec += INSTR_TIME_GET_MILLISEC(after);
	}

	/*
	 * In \gset mode, we force the fetch count to be 2, so that we will throw
	 * the appropriate error if the query returns more than one row.
	 */
	if (pset.gset_prefix)
		fetch_count = 2;
	else
		fetch_count = pset.fetch_count;

	snprintf(fetch_cmd, sizeof(fetch_cmd),
			 "FETCH FORWARD %d FROM _psql_cursor",
			 fetch_count);

	/* prepare to write output to \g argument, if any */
	if (pset.gfname)
	{
		if (!openQueryOutputFile(pset.gfname, &fout, &is_pipe))
		{
			OK = false;
			goto cleanup;
		}
		if (is_pipe)
			disable_sigpipe_trap();
	}
	else
	{
		fout = pset.queryFout;
		is_pipe = false;		/* doesn't matter */
	}

	/* clear any pre-existing error indication on the output stream */
	clearerr(fout);

	for (;;)
	{
		if (timing)
			INSTR_TIME_SET_CURRENT(before);

		/* get fetch_count tuples at a time */
		result = PQexec(pset.db, fetch_cmd);

		if (timing)
		{
			INSTR_TIME_SET_CURRENT(after);
			INSTR_TIME_SUBTRACT(after, before);
			*elapsed_msec += INSTR_TIME_GET_MILLISEC(after);
		}

		if (PQresultStatus(result) != PGRES_TUPLES_OK)
		{
			/* shut down pager before printing error message */
			if (is_pager)
			{
				ClosePager(fout);
				is_pager = false;
			}

			/* AcceptResult logs the error; it can't succeed here */
			OK = AcceptResult(result, true);
			Assert(!OK);
			SetResultVariables(result, OK);
			ClearOrSaveResult(result);
			break;
		}

		if (pset.gset_prefix)
		{
			/* StoreQueryTuple will complain if not exactly one row */
			OK = StoreQueryTuple(result);
			ClearOrSaveResult(result);
			break;
		}

		/*
		 * Note we do not deal with \gdesc, \gexec or \crosstabview modes here
		 */

		ntuples = PQntuples(result);
		total_tuples += ntuples;

		if (ntuples < fetch_count)
		{
			/* this is the last result set, so allow footer decoration */
			my_popt.topt.stop_table = true;
		}
		else if (fout == stdout && !is_pager)
		{
			/*
			 * If query requires multiple result sets, hack to ensure that
			 * only one pager instance is used for the whole mess
			 */
			fout = PageOutput(INT_MAX, &(my_popt.topt));
			is_pager = true;
		}

		printQuery(result, &my_popt, fout, is_pager, pset.logfile);

		ClearOrSaveResult(result);

		/* after the first result set, disallow header decoration */
		my_popt.topt.start_table = false;
		my_popt.topt.prior_records += ntuples;

		/*
		 * Make sure to flush the output stream, so intermediate results are
		 * visible to the client immediately. We check the results because if
		 * the pager dies/exits/etc, there's no sense throwing more data at
		 * it.
		 */
		flush_error = fflush(fout);

		/*
		 * Check if we are at the end, if a cancel was pressed, or if there
		 * were any errors either trying to flush out the results, or more
		 * generally on the output stream at all. If we hit any errors
		 * writing things to the stream, we presume $PAGER has disappeared and
		 * stop bothering to pull down more data.
		 */
		if (ntuples < fetch_count || cancel_pressed || flush_error ||
			ferror(fout))
			break;
	}

	if (pset.gfname)
	{
		/* close \g argument file/pipe */
		if (is_pipe)
		{
			SetShellResultVariables(pclose(fout));
			restore_sigpipe_trap();
		}
		else
			fclose(fout);
	}
	else if (is_pager)
	{
		/* close transient pager */
		ClosePager(fout);
	}

	if (OK)
	{
		/*
		 * We don't have a PGresult here, and even if we did it wouldn't have
		 * the right row count, so fake SetResultVariables(). In error cases,
		 * we already set the result variables above.
		 */
		char		buf[32];

		SetVariable(pset.vars, "ERROR", "false");
		SetVariable(pset.vars, "SQLSTATE", "00000");
		snprintf(buf, sizeof(buf), INT64_FORMAT, total_tuples);
		SetVariable(pset.vars, "ROW_COUNT", buf);
	}

cleanup:
	if (timing)
		INSTR_TIME_SET_CURRENT(before);

	/*
	 * We try to close the cursor on either success or failure, but on failure
	 * ignore the result (it's probably just a bleat about being in an aborted
	 * transaction)
	 */
	result = PQexec(pset.db, "CLOSE _psql_cursor");
	if (OK)
	{
		OK = AcceptResult(result, true) &&
			(PQresultStatus(result) == PGRES_COMMAND_OK);
		ClearOrSaveResult(result);
	}
	else
		PQclear(result);

	/* end the transaction only if we began it ourselves above */
	if (started_txn)
	{
		result = PQexec(pset.db, OK ? "COMMIT" : "ROLLBACK");
		OK &= AcceptResult(result, true) &&
			(PQresultStatus(result) == PGRES_COMMAND_OK);
		ClearOrSaveResult(result);
	}

	if (timing)
	{
		INSTR_TIME_SET_CURRENT(after);
		INSTR_TIME_SUBTRACT(after, before);
		*elapsed_msec += INSTR_TIME_GET_MILLISEC(after);
	}

	return OK;
}
/*
* Advance the given char pointer over white space and SQL comments.
*/
@ -2247,43 +2090,6 @@ command_no_begin(const char *query)
}
/*
 * Check whether the specified command is a SELECT (or VALUES).
 *
 * Skips leading whitespace, SQL comments, and left parentheses before
 * testing the first keyword, so e.g. "((select 1))" still qualifies.
 */
static bool
is_select_command(const char *query)
{
	int			wordlen;

	/* Advance past whitespace/comments and any opening parentheses */
	while (*(query = skip_white_space(query)) == '(')
		query++;

	/* Measure the first word, so that "selectx" is not "select" */
	wordlen = 0;
	while (isalpha((unsigned char) query[wordlen]))
		wordlen += PQmblenBounded(&query[wordlen], pset.encoding);

	return (wordlen == 6 &&
			(pg_strncasecmp(query, "select", 6) == 0 ||
			 pg_strncasecmp(query, "values", 6) == 0));
}
/*
* Test if the current user is a database superuser.
*/

View File

@ -161,7 +161,7 @@ psql_like(
'\errverbose with no previous error');
# There are three main ways to run a query that might affect
# \errverbose: The normal way, using a cursor by setting FETCH_COUNT,
# \errverbose: The normal way, piecemeal retrieval using FETCH_COUNT,
# and using \gdesc. Test them all.
like(
@ -184,10 +184,10 @@ like(
"\\set FETCH_COUNT 1\nSELECT error;\n\\errverbose",
on_error_stop => 0))[2],
qr/\A^psql:<stdin>:2: ERROR: .*$
^LINE 2: SELECT error;$
^LINE 1: SELECT error;$
^ *^.*$
^psql:<stdin>:3: error: ERROR: [0-9A-Z]{5}: .*$
^LINE 2: SELECT error;$
^LINE 1: SELECT error;$
^ *^.*$
^LOCATION: .*$/m,
'\errverbose after FETCH_COUNT query with error');

View File

@ -4755,7 +4755,7 @@ number of rows: 0
last error message: syntax error at end of input
\echo 'last error code:' :LAST_ERROR_SQLSTATE
last error code: 42601
-- check row count for a cursor-fetched query
-- check row count for a query with chunked results
\set FETCH_COUNT 10
select unique2 from tenk1 order by unique2 limit 19;
unique2
@ -4787,7 +4787,7 @@ error: false
error code: 00000
\echo 'number of rows:' :ROW_COUNT
number of rows: 19
-- cursor-fetched query with an error after the first group
-- chunked results with an error after the first chunk
select 1/(15-unique2) from tenk1 order by unique2 limit 19;
?column?
----------

View File

@ -1161,14 +1161,14 @@ SELECT 4 AS \gdesc
\echo 'last error message:' :LAST_ERROR_MESSAGE
\echo 'last error code:' :LAST_ERROR_SQLSTATE
-- check row count for a cursor-fetched query
-- check row count for a query with chunked results
\set FETCH_COUNT 10
select unique2 from tenk1 order by unique2 limit 19;
\echo 'error:' :ERROR
\echo 'error code:' :SQLSTATE
\echo 'number of rows:' :ROW_COUNT
-- cursor-fetched query with an error after the first group
-- chunked results with an error after the first chunk
select 1/(15-unique2) from tenk1 order by unique2 limit 19;
\echo 'error:' :ERROR
\echo 'error code:' :SQLSTATE