Use incremental parsing of backup manifests.

This changes the three callers to json_parse_manifest() to use
json_parse_manifest_incremental_chunk() if appropriate. In the case of
the backend caller, since we don't know the size of the manifest in
advance we always call the incremental parser.

Author: Andrew Dunstan
Reviewed-By: Jacob Champion

Discussion: https://postgr.es/m/7b0a51d6-0d9d-7366-3a1a-f74397a02f55@dunslane.net
This commit is contained in:
Andrew Dunstan 2024-03-11 02:31:51 -04:00
parent ea7b4e9a2a
commit 222e11a10a
3 changed files with 178 additions and 62 deletions

View File

@ -33,6 +33,14 @@
#define BLOCKS_PER_READ 512
/*
* We expect to find the last lines of the manifest, including the checksum,
* in the last MIN_CHUNK bytes of the manifest. We trigger an incremental
* parse step if we are about to overflow MAX_CHUNK bytes.
*/
#define MIN_CHUNK 1024
#define MAX_CHUNK (128 * 1024)
/*
* Details extracted from the WAL ranges present in the supplied backup manifest.
*/
@ -112,6 +120,11 @@ struct IncrementalBackupInfo
* turns out to be a problem in practice, we'll need to be more clever.
*/
BlockRefTable *brtab;
/*
* State object for incremental JSON parsing
*/
JsonManifestParseIncrementalState *inc_state;
};
static void manifest_process_version(JsonManifestParseContext *context,
@ -142,6 +155,7 @@ CreateIncrementalBackupInfo(MemoryContext mcxt)
{
IncrementalBackupInfo *ib;
MemoryContext oldcontext;
JsonManifestParseContext *context;
oldcontext = MemoryContextSwitchTo(mcxt);
@ -157,6 +171,17 @@ CreateIncrementalBackupInfo(MemoryContext mcxt)
*/
ib->manifest_files = backup_file_create(mcxt, 10000, NULL);
context = palloc0(sizeof(JsonManifestParseContext));
/* Parse the manifest. */
context->private_data = ib;
context->version_cb = manifest_process_version;
context->system_identifier_cb = manifest_process_system_identifier;
context->per_file_cb = manifest_process_file;
context->per_wal_range_cb = manifest_process_wal_range;
context->error_cb = manifest_report_error;
ib->inc_state = json_parse_manifest_incremental_init(context);
MemoryContextSwitchTo(oldcontext);
return ib;
@ -176,13 +201,20 @@ AppendIncrementalManifestData(IncrementalBackupInfo *ib, const char *data,
/* Switch to our memory context. */
oldcontext = MemoryContextSwitchTo(ib->mcxt);
/*
* XXX. Our json parser is at present incapable of parsing json blobs
* incrementally, so we have to accumulate the entire backup manifest
* before we can do anything with it. This should really be fixed, since
* some users might have very large numbers of files in the data
* directory.
*/
if (ib->buf.len > MIN_CHUNK && ib->buf.len + len > MAX_CHUNK)
{
/*
* Time for an incremental parse. We'll do all but the last MIN_CHUNK
* bytes, so that we have enough left for the final piece.
*/
json_parse_manifest_incremental_chunk(
ib->inc_state, ib->buf.data, ib->buf.len - MIN_CHUNK, false);
/* now remove what we just parsed */
memmove(ib->buf.data, ib->buf.data + (ib->buf.len - MIN_CHUNK),
MIN_CHUNK + 1);
ib->buf.len = MIN_CHUNK;
}
appendBinaryStringInfo(&ib->buf, data, len);
/* Switch back to previous memory context. */
@ -196,20 +228,14 @@ AppendIncrementalManifestData(IncrementalBackupInfo *ib, const char *data,
void
FinalizeIncrementalManifest(IncrementalBackupInfo *ib)
{
JsonManifestParseContext context;
MemoryContext oldcontext;
/* Switch to our memory context. */
oldcontext = MemoryContextSwitchTo(ib->mcxt);
/* Parse the manifest. */
context.private_data = ib;
context.version_cb = manifest_process_version;
context.system_identifier_cb = manifest_process_system_identifier;
context.per_file_cb = manifest_process_file;
context.per_wal_range_cb = manifest_process_wal_range;
context.error_cb = manifest_report_error;
json_parse_manifest(&context, ib->buf.data, ib->buf.len);
/* Parse the last chunk of the manifest */
json_parse_manifest_incremental_chunk(
ib->inc_state, ib->buf.data, ib->buf.len, true);
/* Done with the buffer, so release memory. */
pfree(ib->buf.data);

View File

@ -34,6 +34,12 @@
*/
#define ESTIMATED_BYTES_PER_MANIFEST_LINE 100
/*
* Size of the JSON chunk to be read in at a time.
*/
#define READ_CHUNK_SIZE (128 * 1024)
/*
* Define a hash table which we can use to store information about the files
* mentioned in the backup manifest.
@ -109,6 +115,7 @@ load_backup_manifest(char *backup_directory)
int rc;
JsonManifestParseContext context;
manifest_data *result;
int chunk_size = READ_CHUNK_SIZE;
/* Open the manifest file. */
snprintf(pathname, MAXPGPATH, "%s/backup_manifest", backup_directory);
@ -133,27 +140,6 @@ load_backup_manifest(char *backup_directory)
/* Create the hash table. */
ht = manifest_files_create(initial_size, NULL);
/*
* Slurp in the whole file.
*
* This is not ideal, but there's currently no way to get pg_parse_json()
* to perform incremental parsing.
*/
buffer = pg_malloc(statbuf.st_size);
rc = read(fd, buffer, statbuf.st_size);
if (rc != statbuf.st_size)
{
if (rc < 0)
pg_fatal("could not read file \"%s\": %m", pathname);
else
pg_fatal("could not read file \"%s\": read %d of %lld",
pathname, rc, (long long int) statbuf.st_size);
}
/* Close the manifest file. */
close(fd);
/* Parse the manifest. */
result = pg_malloc0(sizeof(manifest_data));
result->files = ht;
context.private_data = result;
@ -162,7 +148,69 @@ load_backup_manifest(char *backup_directory)
context.per_file_cb = combinebackup_per_file_cb;
context.per_wal_range_cb = combinebackup_per_wal_range_cb;
context.error_cb = report_manifest_error;
json_parse_manifest(&context, buffer, statbuf.st_size);
/*
* Parse the file, in chunks if necessary.
*/
if (statbuf.st_size <= chunk_size)
{
buffer = pg_malloc(statbuf.st_size);
rc = read(fd, buffer, statbuf.st_size);
if (rc != statbuf.st_size)
{
if (rc < 0)
pg_fatal("could not read file \"%s\": %m", pathname);
else
pg_fatal("could not read file \"%s\": read %d of %lld",
pathname, rc, (long long int) statbuf.st_size);
}
/* Close the manifest file. */
close(fd);
/* Parse the manifest. */
json_parse_manifest(&context, buffer, statbuf.st_size);
}
else
{
int bytes_left = statbuf.st_size;
JsonManifestParseIncrementalState *inc_state;
inc_state = json_parse_manifest_incremental_init(&context);
buffer = pg_malloc(chunk_size + 1);
while (bytes_left > 0)
{
int bytes_to_read = chunk_size;
/*
* Make sure that the last chunk is sufficiently large (i.e. at
* least half the chunk size), so that it will fully contain the
* piece at the end with the checksum.
*/
if (bytes_left < chunk_size)
bytes_to_read = bytes_left;
else if (bytes_left < 2 * chunk_size)
bytes_to_read = bytes_left / 2;
rc = read(fd, buffer, bytes_to_read);
if (rc != bytes_to_read)
{
if (rc < 0)
pg_fatal("could not read file \"%s\": %m", pathname);
else
pg_fatal("could not read file \"%s\": read %lld of %lld",
pathname,
(long long int) (statbuf.st_size + rc - bytes_left),
(long long int) statbuf.st_size);
}
bytes_left -= rc;
json_parse_manifest_incremental_chunk(
inc_state, buffer, rc, bytes_left == 0);
}
close(fd);
}
/* All done. */
pfree(buffer);

View File

@ -43,7 +43,7 @@
/*
* How many bytes should we try to read from a file at once?
*/
#define READ_CHUNK_SIZE 4096
#define READ_CHUNK_SIZE (128 * 1024)
/*
* Each file described by the manifest file is parsed to produce an object
@ -399,6 +399,8 @@ parse_manifest_file(char *manifest_path)
JsonManifestParseContext context;
manifest_data *result;
int chunk_size = READ_CHUNK_SIZE;
/* Open the manifest file. */
if ((fd = open(manifest_path, O_RDONLY | PG_BINARY, 0)) < 0)
report_fatal_error("could not open file \"%s\": %m", manifest_path);
@ -414,28 +416,6 @@ parse_manifest_file(char *manifest_path)
/* Create the hash table. */
ht = manifest_files_create(initial_size, NULL);
/*
* Slurp in the whole file.
*
* This is not ideal, but there's currently no easy way to get
* pg_parse_json() to perform incremental parsing.
*/
buffer = pg_malloc(statbuf.st_size);
rc = read(fd, buffer, statbuf.st_size);
if (rc != statbuf.st_size)
{
if (rc < 0)
report_fatal_error("could not read file \"%s\": %m",
manifest_path);
else
report_fatal_error("could not read file \"%s\": read %d of %lld",
manifest_path, rc, (long long int) statbuf.st_size);
}
/* Close the manifest file. */
close(fd);
/* Parse the manifest. */
result = pg_malloc0(sizeof(manifest_data));
result->files = ht;
context.private_data = result;
@ -444,7 +424,69 @@ parse_manifest_file(char *manifest_path)
context.per_file_cb = verifybackup_per_file_cb;
context.per_wal_range_cb = verifybackup_per_wal_range_cb;
context.error_cb = report_manifest_error;
json_parse_manifest(&context, buffer, statbuf.st_size);
/*
* Parse the file, in chunks if necessary.
*/
if (statbuf.st_size <= chunk_size)
{
buffer = pg_malloc(statbuf.st_size);
rc = read(fd, buffer, statbuf.st_size);
if (rc != statbuf.st_size)
{
if (rc < 0)
pg_fatal("could not read file \"%s\": %m", manifest_path);
else
pg_fatal("could not read file \"%s\": read %d of %lld",
manifest_path, rc, (long long int) statbuf.st_size);
}
/* Close the manifest file. */
close(fd);
/* Parse the manifest. */
json_parse_manifest(&context, buffer, statbuf.st_size);
}
else
{
int bytes_left = statbuf.st_size;
JsonManifestParseIncrementalState *inc_state;
inc_state = json_parse_manifest_incremental_init(&context);
buffer = pg_malloc(chunk_size + 1);
while (bytes_left > 0)
{
int bytes_to_read = chunk_size;
/*
* Make sure that the last chunk is sufficiently large (i.e. at
* least half the chunk size), so that it will fully contain the
* piece at the end with the checksum.
*/
if (bytes_left < chunk_size)
bytes_to_read = bytes_left;
else if (bytes_left < 2 * chunk_size)
bytes_to_read = bytes_left / 2;
rc = read(fd, buffer, bytes_to_read);
if (rc != bytes_to_read)
{
if (rc < 0)
pg_fatal("could not read file \"%s\": %m", manifest_path);
else
pg_fatal("could not read file \"%s\": read %lld of %lld",
manifest_path,
(long long int) (statbuf.st_size + rc - bytes_left),
(long long int) statbuf.st_size);
}
bytes_left -= rc;
json_parse_manifest_incremental_chunk(
inc_state, buffer, rc, bytes_left == 0);
}
close(fd);
}
/* Done with the buffer. */
pfree(buffer);