Invent --transaction-size option for pg_restore.

This patch allows pg_restore to wrap its commands into transaction
blocks, somewhat like --single-transaction, except that we commit
and start a new block after every N objects.  Using this mode
with a size limit of 1000 or so objects greatly reduces the number
of transactions consumed by the restore, while preventing any
one transaction from taking enough locks to overrun the receiving
server's shared lock table.

(A value of 1000 works well with the default lock table size of
around 6400 locks.  Higher --transaction-size values can be used
if one has increased the receiving server's lock table size.)

Excessive consumption of XIDs has been reported as a problem for
pg_upgrade in particular, but it could be bad for any restore; and the
change also reduces the number of fsyncs and amount of WAL generated,
so it should provide speed benefits too.

This patch does not try to make parallel workers batch the SQL
commands they issue.  The trouble with doing that is that other
workers may need to see the objects a worker creates right away.
Possibly this can be improved later.

In this patch I have hard-wired pg_upgrade to use a transaction size
of 1000 divided by the number of parallel restore jobs allowed
(without that, we'd still be at risk of overrunning the shared lock
table).  Perhaps there would be value in adding another pg_upgrade
option to allow user control of that, but I'm unsure that it's worth
the trouble; I think few users would use it, and any who did would see
not that much benefit compared to the default.

Patch by me, but the original idea to batch SQL commands during
restore is due to Robins Tharakan.

Discussion: https://postgr.es/m/a9f9376f1c3343a6bb319dce294e20ac@EX13D05UWC001.ant.amazon.com
This commit is contained in:
Tom Lane 2024-04-01 16:46:24 -04:00
parent a45c78e328
commit 959b38d770
7 changed files with 220 additions and 8 deletions

View File

@@ -786,6 +786,30 @@ PostgreSQL documentation
</listitem>
</varlistentry>
<varlistentry>
<term><option>--transaction-size=<replaceable class="parameter">N</replaceable></option></term>
<listitem>
<para>
Execute the restore as a series of transactions, each processing
up to <replaceable class="parameter">N</replaceable> database
objects. This option implies <option>--exit-on-error</option>.
</para>
<para>
<option>--transaction-size</option> offers an intermediate choice
between the default behavior (one transaction per SQL command)
and <option>-1</option>/<option>--single-transaction</option>
(one transaction for all restored objects).
While <option>--single-transaction</option> has the least
overhead, it may be impractical for large databases because the
transaction will take a lock on each restored object, possibly
exhausting the server's lock table space.
Using <option>--transaction-size</option> with a size of a few
thousand objects offers nearly the same performance benefits while
capping the amount of lock table space needed.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--use-set-session-authorization</option></term>
<listitem>

View File

@@ -149,7 +149,9 @@ typedef struct _restoreOptions
* compression */
int suppressDumpWarnings; /* Suppress output of WARNING entries
* to stderr */
bool single_txn;
bool single_txn; /* restore all TOCs in one transaction */
int txn_size; /* restore this many TOCs per txn, if > 0 */
bool *idWanted; /* array showing which dump IDs to emit */
int enable_row_security;

View File

@@ -502,7 +502,28 @@ RestoreArchive(Archive *AHX)
/* Otherwise, drop anything that's selected and has a dropStmt */
if (((te->reqs & (REQ_SCHEMA | REQ_DATA)) != 0) && te->dropStmt)
{
bool not_allowed_in_txn = false;
pg_log_info("dropping %s %s", te->desc, te->tag);
/*
* In --transaction-size mode, we have to temporarily exit our
* transaction block to drop objects that can't be dropped
* within a transaction.
*/
if (ropt->txn_size > 0)
{
if (strcmp(te->desc, "DATABASE") == 0 ||
strcmp(te->desc, "DATABASE PROPERTIES") == 0)
{
not_allowed_in_txn = true;
if (AH->connection)
CommitTransaction(AHX);
else
ahprintf(AH, "COMMIT;\n");
}
}
/* Select owner and schema as necessary */
_becomeOwner(AH, te);
_selectOutputSchema(AH, te->namespace);
@@ -628,6 +649,33 @@ RestoreArchive(Archive *AHX)
}
}
}
/*
* In --transaction-size mode, re-establish the transaction
* block if needed; otherwise, commit after every N drops.
*/
if (ropt->txn_size > 0)
{
if (not_allowed_in_txn)
{
if (AH->connection)
StartTransaction(AHX);
else
ahprintf(AH, "BEGIN;\n");
AH->txnCount = 0;
}
else if (++AH->txnCount >= ropt->txn_size)
{
if (AH->connection)
{
CommitTransaction(AHX);
StartTransaction(AHX);
}
else
ahprintf(AH, "COMMIT;\nBEGIN;\n");
AH->txnCount = 0;
}
}
}
}
@@ -724,7 +772,11 @@ RestoreArchive(Archive *AHX)
}
}
if (ropt->single_txn)
/*
* Close out any persistent transaction we may have. While these two
* cases are started in different places, we can end both cases here.
*/
if (ropt->single_txn || ropt->txn_size > 0)
{
if (AH->connection)
CommitTransaction(AHX);
@@ -785,6 +837,25 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
*/
if ((reqs & REQ_SCHEMA) != 0)
{
bool object_is_db = false;
/*
* In --transaction-size mode, must exit our transaction block to
* create a database or set its properties.
*/
if (strcmp(te->desc, "DATABASE") == 0 ||
strcmp(te->desc, "DATABASE PROPERTIES") == 0)
{
object_is_db = true;
if (ropt->txn_size > 0)
{
if (AH->connection)
CommitTransaction(&AH->public);
else
ahprintf(AH, "COMMIT;\n\n");
}
}
/* Show namespace in log message if available */
if (te->namespace)
pg_log_info("creating %s \"%s.%s\"",
@@ -835,10 +906,10 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
/*
* If we created a DB, connect to it. Also, if we changed DB
* properties, reconnect to ensure that relevant GUC settings are
* applied to our session.
* applied to our session. (That also restarts the transaction block
* in --transaction-size mode.)
*/
if (strcmp(te->desc, "DATABASE") == 0 ||
strcmp(te->desc, "DATABASE PROPERTIES") == 0)
if (object_is_db)
{
pg_log_info("connecting to new database \"%s\"", te->tag);
_reconnectToDB(AH, te->tag);
@@ -964,6 +1035,25 @@ restore_toc_entry(ArchiveHandle *AH, TocEntry *te, bool is_parallel)
}
}
/*
* If we emitted anything for this TOC entry, that counts as one action
* against the transaction-size limit. Commit if it's time to.
*/
if ((reqs & (REQ_SCHEMA | REQ_DATA)) != 0 && ropt->txn_size > 0)
{
if (++AH->txnCount >= ropt->txn_size)
{
if (AH->connection)
{
CommitTransaction(&AH->public);
StartTransaction(&AH->public);
}
else
ahprintf(AH, "COMMIT;\nBEGIN;\n\n");
AH->txnCount = 0;
}
}
if (AH->public.n_errors > 0 && status == WORKER_OK)
status = WORKER_IGNORED_ERRORS;
@@ -1310,7 +1400,12 @@ StartRestoreLOs(ArchiveHandle *AH)
{
RestoreOptions *ropt = AH->public.ropt;
if (!ropt->single_txn)
/*
* LOs must be restored within a transaction block, since we need the LO
* handle to stay open while we write it. Establish a transaction unless
* there's one being used globally.
*/
if (!(ropt->single_txn || ropt->txn_size > 0))
{
if (AH->connection)
StartTransaction(&AH->public);
@@ -1329,7 +1424,7 @@ EndRestoreLOs(ArchiveHandle *AH)
{
RestoreOptions *ropt = AH->public.ropt;
if (!ropt->single_txn)
if (!(ropt->single_txn || ropt->txn_size > 0))
{
if (AH->connection)
CommitTransaction(&AH->public);
@@ -3171,6 +3266,19 @@ _doSetFixedOutputState(ArchiveHandle *AH)
else
ahprintf(AH, "SET row_security = off;\n");
/*
* In --transaction-size mode, we should always be in a transaction when
* we begin to restore objects.
*/
if (ropt && ropt->txn_size > 0)
{
if (AH->connection)
StartTransaction(&AH->public);
else
ahprintf(AH, "\nBEGIN;\n");
AH->txnCount = 0;
}
ahprintf(AH, "\n");
}
@@ -4043,6 +4151,14 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
}
}
/*
* In --transaction-size mode, we must commit the open transaction before
* dropping the database connection. This also ensures that child workers
* can see the objects we've created so far.
*/
if (AH->public.ropt->txn_size > 0)
CommitTransaction(&AH->public);
/*
* Now close parent connection in prep for parallel steps. We do this
* mainly to ensure that we don't exceed the specified number of parallel
@@ -4782,6 +4898,10 @@ CloneArchive(ArchiveHandle *AH)
clone = (ArchiveHandle *) pg_malloc(sizeof(ArchiveHandle));
memcpy(clone, AH, sizeof(ArchiveHandle));
/* Likewise flat-copy the RestoreOptions, so we can alter them locally */
clone->public.ropt = (RestoreOptions *) pg_malloc(sizeof(RestoreOptions));
memcpy(clone->public.ropt, AH->public.ropt, sizeof(RestoreOptions));
/* Handle format-independent fields */
memset(&(clone->sqlparse), 0, sizeof(clone->sqlparse));
@@ -4803,6 +4923,13 @@ CloneArchive(ArchiveHandle *AH)
/* clones should not share lo_buf */
clone->lo_buf = NULL;
/*
* Clone connections disregard --transaction-size; they must commit after
* each command so that the results are immediately visible to other
* workers.
*/
clone->public.ropt->txn_size = 0;
/*
* Connect our new clone object to the database, using the same connection
* parameters used for the original connection.

View File

@@ -324,6 +324,9 @@ struct _archiveHandle
char *currTablespace; /* current tablespace, or NULL */
char *currTableAm; /* current table access method, or NULL */
/* in --transaction-size mode, this counts objects emitted in cur xact */
int txnCount;
void *lo_buf;
size_t lo_buf_used;
size_t lo_buf_size;

View File

@@ -554,6 +554,7 @@ IssueCommandPerBlob(ArchiveHandle *AH, TocEntry *te,
{
/* Make a writable copy of the command string */
char *buf = pg_strdup(te->defn);
RestoreOptions *ropt = AH->public.ropt;
char *st;
char *en;
@@ -562,6 +563,23 @@
{
*en++ = '\0';
ahprintf(AH, "%s%s%s;\n", cmdBegin, st, cmdEnd);
/* In --transaction-size mode, count each command as an action */
if (ropt && ropt->txn_size > 0)
{
if (++AH->txnCount >= ropt->txn_size)
{
if (AH->connection)
{
CommitTransaction(&AH->public);
StartTransaction(&AH->public);
}
else
ahprintf(AH, "COMMIT;\nBEGIN;\n\n");
AH->txnCount = 0;
}
}
st = en;
}
ahprintf(AH, "\n");

View File

@@ -120,6 +120,7 @@ main(int argc, char **argv)
{"role", required_argument, NULL, 2},
{"section", required_argument, NULL, 3},
{"strict-names", no_argument, &strict_names, 1},
{"transaction-size", required_argument, NULL, 5},
{"use-set-session-authorization", no_argument, &use_setsessauth, 1},
{"no-comments", no_argument, &no_comments, 1},
{"no-publications", no_argument, &no_publications, 1},
@@ -289,10 +290,18 @@ main(int argc, char **argv)
set_dump_section(optarg, &(opts->dumpSections));
break;
case 4:
case 4: /* filter */
read_restore_filters(optarg, opts);
break;
case 5: /* transaction-size */
if (!option_parse_int(optarg, "--transaction-size",
1, INT_MAX,
&opts->txn_size))
exit(1);
opts->exit_on_error = true;
break;
default:
/* getopt_long already emitted a complaint */
pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -337,6 +346,9 @@ main(int argc, char **argv)
if (opts->dataOnly && opts->dropSchema)
pg_fatal("options -c/--clean and -a/--data-only cannot be used together");
if (opts->single_txn && opts->txn_size > 0)
pg_fatal("options -1/--single-transaction and --transaction-size cannot be used together");
/*
* -C is not compatible with -1, because we can't create a database inside
* a transaction block.
@@ -484,6 +496,7 @@ usage(const char *progname)
printf(_(" --section=SECTION restore named section (pre-data, data, or post-data)\n"));
printf(_(" --strict-names require table and/or schema include patterns to\n"
" match at least one entity each\n"));
printf(_(" --transaction-size=N commit after every N objects\n"));
printf(_(" --use-set-session-authorization\n"
" use SET SESSION AUTHORIZATION commands instead of\n"
" ALTER OWNER commands to set ownership\n"));

View File

@@ -51,6 +51,13 @@
#include "fe_utils/string_utils.h"
#include "pg_upgrade.h"
/*
* Maximum number of pg_restore actions (TOC entries) to process within one
* transaction. At some point we might want to make this user-controllable,
* but for now a hard-wired setting will suffice.
*/
#define RESTORE_TRANSACTION_SIZE 1000
static void set_locale_and_encoding(void);
static void prepare_new_cluster(void);
static void prepare_new_globals(void);
@@ -562,10 +569,12 @@ create_new_objects(void)
true,
true,
"\"%s/pg_restore\" %s %s --exit-on-error --verbose "
"--transaction-size=%d "
"--dbname postgres \"%s/%s\"",
new_cluster.bindir,
cluster_conn_opts(&new_cluster),
create_opts,
RESTORE_TRANSACTION_SIZE,
log_opts.dumpdir,
sql_file_name);
@@ -578,6 +587,7 @@ create_new_objects(void)
log_file_name[MAXPGPATH];
DbInfo *old_db = &old_cluster.dbarr.dbs[dbnum];
const char *create_opts;
int txn_size;
/* Skip template1 in this pass */
if (strcmp(old_db->db_name, "template1") == 0)
@@ -597,13 +607,28 @@ create_new_objects(void)
else
create_opts = "--create";
/*
* In parallel mode, reduce the --transaction-size of each restore job
* so that the total number of locks that could be held across all the
* jobs stays in bounds.
*/
txn_size = RESTORE_TRANSACTION_SIZE;
if (user_opts.jobs > 1)
{
txn_size /= user_opts.jobs;
/* Keep some sanity if -j is huge */
txn_size = Max(txn_size, 10);
}
parallel_exec_prog(log_file_name,
NULL,
"\"%s/pg_restore\" %s %s --exit-on-error --verbose "
"--transaction-size=%d "
"--dbname template1 \"%s/%s\"",
new_cluster.bindir,
cluster_conn_opts(&new_cluster),
create_opts,
txn_size,
log_opts.dumpdir,
sql_file_name);
}