From 431ba7bebf139b6edf5544ce18f39a1a4dcb8110 Mon Sep 17 00:00:00 2001
From: Robert Haas
Date: Thu, 5 Dec 2019 15:14:09 -0500
Subject: [PATCH] pg_basebackup: Refactor code for reading COPY and tar data.

Add a new function ReceiveCopyData that does just that, taking a
callback as an argument to specify what should be done with each chunk
as it is received.  This allows a single copy of the logic to be shared
between ReceiveTarFile and ReceiveAndUnpackTarFile, and eliminates a
few #ifdef conditions based on HAVE_LIBZ.

While this is slightly more code, it's arguably clearer, and there is a
pending patch that introduces additional calls to ReceiveCopyData.

This commit is not intended to result in any functional change.

Discussion: http://postgr.es/m/CA+TgmoYZDTHbSpwZtW=JDgAhwVAYvmdSrRUjOd+AYdfNNXVBDg@mail.gmail.com
---
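Editor's note (not part of the commit): the diff routes every COPY stream through the
new ReceiveCopyData(), which hands each chunk to a WriteDataCallback.  As a rough,
hypothetical illustration of that callback contract, here is a minimal sketch that only
tallies the bytes received; CountState and CountCopyChunk are invented names for this
note and do not appear in the patch.  It assumes the frontend environment of
pg_basebackup.c (postgres_fe.h types such as uint64).

/* Hypothetical example only -- not part of this patch. */
typedef struct CountState
{
	uint64		total_bytes;
} CountState;

static void
CountCopyChunk(size_t nbytes, char *buf, void *callback_data)
{
	CountState *state = callback_data;

	(void) buf;					/* chunk contents are ignored in this sketch */
	state->total_bytes += nbytes;
}

/*
 * Usage, assuming an open connection "conn" that is about to return COPY data:
 *
 *		CountState	cs = {0};
 *
 *		ReceiveCopyData(conn, CountCopyChunk, &cs);
 */
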
 src/bin/pg_basebackup/pg_basebackup.c | 1131 +++++++++++++------------
 1 file changed, 570 insertions(+), 561 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index a9d162a7da..16886fbe71 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -57,6 +57,40 @@ typedef struct TablespaceList
 	TablespaceListCell *tail;
 } TablespaceList;
 
+typedef struct WriteTarState
+{
+	int			tablespacenum;
+	char		filename[MAXPGPATH];
+	FILE	   *tarfile;
+	char		tarhdr[512];
+	bool		basetablespace;
+	bool		in_tarhdr;
+	bool		skip_file;
+	bool		is_recovery_guc_supported;
+	bool		is_postgresql_auto_conf;
+	bool		found_postgresql_auto_conf;
+	int			file_padding_len;
+	size_t		tarhdrsz;
+	pgoff_t		filesz;
+#ifdef HAVE_LIBZ
+	gzFile		ztarfile;
+#endif
+} WriteTarState;
+
+typedef struct UnpackTarState
+{
+	int			tablespacenum;
+	char		current_path[MAXPGPATH];
+	char		filename[MAXPGPATH];
+	const char *mapped_tblspc_path;
+	pgoff_t		current_len_left;
+	int			current_padding;
+	FILE	   *file;
+} UnpackTarState;
+
+typedef void (*WriteDataCallback) (size_t nbytes, char *buf,
+								   void *callback_data);
+
 /*
  * pg_xlog has been renamed to pg_wal in version 10. This version number
  * should be compared with PQserverVersion().
@@ -142,7 +176,10 @@ static void verify_dir_is_empty_or_create(char *dirname, bool *created, bool *fo
 static void progress_report(int tablespacenum, const char *filename, bool force);
 
 static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
+static void ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data);
 static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
+static void ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf,
+										 void *callback_data);
 static void BaseBackup(void);
 
 static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline,
@@ -873,43 +910,79 @@ parse_max_rate(char *src)
 	return (int32) result;
 }
 
+/*
+ * Read a stream of COPY data and invoke the provided callback for each
+ * chunk.
+ */
+static void
+ReceiveCopyData(PGconn *conn, WriteDataCallback callback,
+				void *callback_data)
+{
+	PGresult   *res;
+
+	/* Get the COPY data stream. */
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COPY_OUT)
+	{
+		pg_log_error("could not get COPY data stream: %s",
+					 PQerrorMessage(conn));
+		exit(1);
+	}
+	PQclear(res);
+
+	/* Loop over chunks until done. */
+	while (1)
+	{
+		int			r;
+		char	   *copybuf;
+
+		r = PQgetCopyData(conn, &copybuf, 0);
+		if (r == -1)
+		{
+			/* End of chunk. */
+			break;
+		}
+		else if (r == -2)
+		{
+			pg_log_error("could not read COPY data: %s",
+						 PQerrorMessage(conn));
+			exit(1);
+		}
+
+		(*callback) (r, copybuf, callback_data);
+
+		PQfreemem(copybuf);
+	}
+}
+
 /*
  * Write a piece of tar data
  */
 static void
-writeTarData(
-#ifdef HAVE_LIBZ
-			 gzFile ztarfile,
-#endif
-			 FILE *tarfile, char *buf, int r, char *current_file)
+writeTarData(WriteTarState *state, char *buf, int r)
 {
 #ifdef HAVE_LIBZ
-	if (ztarfile != NULL)
+	if (state->ztarfile != NULL)
 	{
-		if (gzwrite(ztarfile, buf, r) != r)
+		if (gzwrite(state->ztarfile, buf, r) != r)
 		{
 			pg_log_error("could not write to compressed file \"%s\": %s",
-						 current_file, get_gz_error(ztarfile));
+						 state->filename, get_gz_error(state->ztarfile));
 			exit(1);
 		}
 	}
 	else
 #endif
 	{
-		if (fwrite(buf, r, 1, tarfile) != 1)
+		if (fwrite(buf, r, 1, state->tarfile) != 1)
 		{
-			pg_log_error("could not write to file \"%s\": %m", current_file);
+			pg_log_error("could not write to file \"%s\": %m",
+						 state->filename);
 			exit(1);
 		}
 	}
 }
 
-#ifdef HAVE_LIBZ
-#define WRITE_TAR_DATA(buf, sz) writeTarData(ztarfile, tarfile, buf, sz, filename)
-#else
-#define WRITE_TAR_DATA(buf, sz) writeTarData(tarfile, buf, sz, filename)
-#endif
-
 /*
  * Receive a tar format file from the connection to the server, and write
  * the data from this file directly into a tar file. If compression is
@@ -923,29 +996,19 @@ writeTarData(
 static void
 ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
 {
-	char		filename[MAXPGPATH];
-	char	   *copybuf = NULL;
-	FILE	   *tarfile = NULL;
-	char		tarhdr[512];
-	bool		basetablespace = PQgetisnull(res, rownum, 0);
-	bool		in_tarhdr = true;
-	bool		skip_file = false;
-	bool		is_recovery_guc_supported = true;
-	bool		is_postgresql_auto_conf = false;
-	bool		found_postgresql_auto_conf = false;
-	int			file_padding_len = 0;
-	size_t		tarhdrsz = 0;
-	pgoff_t		filesz = 0;
+	char		zerobuf[1024];
+	WriteTarState state;
 
-#ifdef HAVE_LIBZ
-	gzFile		ztarfile = NULL;
-#endif
+	memset(&state, 0, sizeof(state));
+	state.tablespacenum = rownum;
+	state.basetablespace = PQgetisnull(res, rownum, 0);
+	state.in_tarhdr = true;
 
 	/* recovery.conf is integrated into postgresql.conf in 12 and newer */
-	if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_RECOVERY_GUC)
-		is_recovery_guc_supported = false;
+	if (PQserverVersion(conn) >= MINIMUM_VERSION_FOR_RECOVERY_GUC)
+		state.is_recovery_guc_supported = true;
 
-	if (basetablespace)
+	if (state.basetablespace)
 	{
 		/*
 		 * Base tablespaces
@@ -959,40 +1022,42 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
 #ifdef HAVE_LIBZ
 			if (compresslevel != 0)
 			{
-				ztarfile = gzdopen(dup(fileno(stdout)), "wb");
-				if (gzsetparams(ztarfile, compresslevel,
+				state.ztarfile = gzdopen(dup(fileno(stdout)), "wb");
+				if (gzsetparams(state.ztarfile, compresslevel,
 								Z_DEFAULT_STRATEGY) != Z_OK)
 				{
 					pg_log_error("could not set compression level %d: %s",
-								 compresslevel, get_gz_error(ztarfile));
+								 compresslevel, get_gz_error(state.ztarfile));
 					exit(1);
 				}
 			}
 			else
 #endif
-				tarfile = stdout;
-			strcpy(filename, "-");
+				state.tarfile = stdout;
+			strcpy(state.filename, "-");
 		}
 		else
 		{
 #ifdef HAVE_LIBZ
 			if (compresslevel != 0)
 			{
-				snprintf(filename, sizeof(filename), "%s/base.tar.gz", basedir);
-				ztarfile = gzopen(filename, "wb");
-				if (gzsetparams(ztarfile, compresslevel,
+				snprintf(state.filename, sizeof(state.filename),
+						 "%s/base.tar.gz", basedir);
+				state.ztarfile = gzopen(state.filename, "wb");
+				if (gzsetparams(state.ztarfile, compresslevel,
 								Z_DEFAULT_STRATEGY) != Z_OK)
 				{
 					pg_log_error("could not set compression level %d: %s",
-								 compresslevel, get_gz_error(ztarfile));
+								 compresslevel, get_gz_error(state.ztarfile));
 					exit(1);
 				}
 			}
 			else
 #endif
 			{
-				snprintf(filename, sizeof(filename), "%s/base.tar", basedir);
-				tarfile = fopen(filename, "wb");
+				snprintf(state.filename, sizeof(state.filename),
+						 "%s/base.tar", basedir);
+				state.tarfile = fopen(state.filename, "wb");
 			}
 		}
 	}
@@ -1004,34 +1069,35 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
 #ifdef HAVE_LIBZ
 		if (compresslevel != 0)
 		{
-			snprintf(filename, sizeof(filename), "%s/%s.tar.gz", basedir,
-					 PQgetvalue(res, rownum, 0));
-			ztarfile = gzopen(filename, "wb");
-			if (gzsetparams(ztarfile, compresslevel,
+			snprintf(state.filename, sizeof(state.filename),
+					 "%s/%s.tar.gz",
+					 basedir, PQgetvalue(res, rownum, 0));
+			state.ztarfile = gzopen(state.filename, "wb");
+			if (gzsetparams(state.ztarfile, compresslevel,
 							Z_DEFAULT_STRATEGY) != Z_OK)
 			{
 				pg_log_error("could not set compression level %d: %s",
-							 compresslevel, get_gz_error(ztarfile));
+							 compresslevel, get_gz_error(state.ztarfile));
 				exit(1);
 			}
 		}
 		else
 #endif
 		{
-			snprintf(filename, sizeof(filename), "%s/%s.tar", basedir,
-					 PQgetvalue(res, rownum, 0));
-			tarfile = fopen(filename, "wb");
+			snprintf(state.filename, sizeof(state.filename), "%s/%s.tar",
+					 basedir, PQgetvalue(res, rownum, 0));
+			state.tarfile = fopen(state.filename, "wb");
 		}
 	}
 
 #ifdef HAVE_LIBZ
 	if (compresslevel != 0)
 	{
-		if (!ztarfile)
+		if (!state.ztarfile)
 		{
 			/* Compression is in use */
 			pg_log_error("could not create compressed file \"%s\": %s",
-						 filename, get_gz_error(ztarfile));
+						 state.filename, get_gz_error(state.ztarfile));
 			exit(1);
 		}
 	}
@@ -1039,309 +1105,99 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
 #endif
 	{
 		/* Either no zlib support, or zlib support but compresslevel = 0 */
-		if (!tarfile)
+		if (!state.tarfile)
 		{
-			pg_log_error("could not create file \"%s\": %m", filename);
+			pg_log_error("could not create file \"%s\": %m", state.filename);
 			exit(1);
 		}
 	}
 
+	ReceiveCopyData(conn, ReceiveTarCopyChunk, &state);
+
 	/*
-	 * Get the COPY data stream
+	 * End of copy data. If requested, and this is the base tablespace, write
+	 * configuration file into the tarfile. When done, close the file (but not
+	 * stdout).
+	 *
+	 * Also, write two completely empty blocks at the end of the tar file, as
+	 * required by some tar programs.
 	 */
-	res = PQgetResult(conn);
-	if (PQresultStatus(res) != PGRES_COPY_OUT)
+
+	MemSet(zerobuf, 0, sizeof(zerobuf));
+
+	if (state.basetablespace && writerecoveryconf)
 	{
-		pg_log_error("could not get COPY data stream: %s",
-					 PQerrorMessage(conn));
-		exit(1);
+		char		header[512];
+
+		/*
+		 * If postgresql.auto.conf has not been found in the streamed data,
+		 * add recovery configuration to postgresql.auto.conf if recovery
+		 * parameters are GUCs. If the instance connected to is older than
+		 * 12, create recovery.conf with this data otherwise.
+		 */
+		if (!state.found_postgresql_auto_conf || !state.is_recovery_guc_supported)
+		{
+			int			padding;
+
+			tarCreateHeader(header,
+							state.is_recovery_guc_supported ? "postgresql.auto.conf" : "recovery.conf",
+							NULL,
+							recoveryconfcontents->len,
+							pg_file_create_mode, 04000, 02000,
+							time(NULL));
+
+			padding = ((recoveryconfcontents->len + 511) & ~511) - recoveryconfcontents->len;
+
+			writeTarData(&state, header, sizeof(header));
+			writeTarData(&state, recoveryconfcontents->data,
+						 recoveryconfcontents->len);
+			if (padding)
+				writeTarData(&state, zerobuf, padding);
+		}
+
+		/*
+		 * standby.signal is supported only if recovery parameters are GUCs.
+		 */
+		if (state.is_recovery_guc_supported)
+		{
+			tarCreateHeader(header, "standby.signal", NULL,
+							0,	/* zero-length file */
+							pg_file_create_mode, 04000, 02000,
+							time(NULL));
+
+			writeTarData(&state, header, sizeof(header));
+			writeTarData(&state, zerobuf, 511);
+		}
 	}
 
-	while (1)
-	{
-		int			r;
-
-		if (copybuf != NULL)
-		{
-			PQfreemem(copybuf);
-			copybuf = NULL;
-		}
-
-		r = PQgetCopyData(conn, &copybuf, 0);
-		if (r == -1)
-		{
-			/*
-			 * End of chunk. If requested, and this is the base tablespace,
-			 * write configuration file into the tarfile. When done, close the
-			 * file (but not stdout).
-			 *
-			 * Also, write two completely empty blocks at the end of the tar
-			 * file, as required by some tar programs.
-			 */
-			char		zerobuf[1024];
-
-			MemSet(zerobuf, 0, sizeof(zerobuf));
-
-			if (basetablespace && writerecoveryconf)
-			{
-				char		header[512];
-
-				/*
-				 * If postgresql.auto.conf has not been found in the streamed
-				 * data, add recovery configuration to postgresql.auto.conf if
-				 * recovery parameters are GUCs. If the instance connected to
-				 * is older than 12, create recovery.conf with this data
-				 * otherwise.
-				 */
-				if (!found_postgresql_auto_conf || !is_recovery_guc_supported)
-				{
-					int			padding;
-
-					tarCreateHeader(header,
-									is_recovery_guc_supported ? "postgresql.auto.conf" : "recovery.conf",
-									NULL,
-									recoveryconfcontents->len,
-									pg_file_create_mode, 04000, 02000,
-									time(NULL));
-
-					padding = ((recoveryconfcontents->len + 511) & ~511) - recoveryconfcontents->len;
-
-					WRITE_TAR_DATA(header, sizeof(header));
-					WRITE_TAR_DATA(recoveryconfcontents->data,
-								   recoveryconfcontents->len);
-					if (padding)
-						WRITE_TAR_DATA(zerobuf, padding);
-				}
-
-				/*
-				 * standby.signal is supported only if recovery parameters are
-				 * GUCs.
-				 */
-				if (is_recovery_guc_supported)
-				{
-					tarCreateHeader(header, "standby.signal", NULL,
-									0,	/* zero-length file */
-									pg_file_create_mode, 04000, 02000,
-									time(NULL));
-
-					WRITE_TAR_DATA(header, sizeof(header));
-					WRITE_TAR_DATA(zerobuf, 511);
-				}
-			}
-
-			/* 2 * 512 bytes empty data at end of file */
-			WRITE_TAR_DATA(zerobuf, sizeof(zerobuf));
+	/* 2 * 512 bytes empty data at end of file */
+	writeTarData(&state, zerobuf, sizeof(zerobuf));
 
 #ifdef HAVE_LIBZ
-			if (ztarfile != NULL)
-			{
-				if (gzclose(ztarfile) != 0)
-				{
-					pg_log_error("could not close compressed file \"%s\": %s",
-								 filename, get_gz_error(ztarfile));
-					exit(1);
-				}
-			}
-			else
-#endif
-			{
-				if (strcmp(basedir, "-") != 0)
-				{
-					if (fclose(tarfile) != 0)
-					{
-						pg_log_error("could not close file \"%s\": %m",
-									 filename);
-						exit(1);
-					}
-				}
-			}
-
-			break;
-		}
-		else if (r == -2)
+	if (state.ztarfile != NULL)
+	{
+		if (gzclose(state.ztarfile) != 0)
 		{
-			pg_log_error("could not read COPY data: %s",
-						 PQerrorMessage(conn));
+			pg_log_error("could not close compressed file \"%s\": %s",
+						 state.filename, get_gz_error(state.ztarfile));
 			exit(1);
 		}
-
-		if (!writerecoveryconf || !basetablespace)
+	}
+	else
+#endif
+	{
+		if (strcmp(basedir, "-") != 0)
 		{
-			/*
-			 * When not writing config file, or when not working on the base
-			 * tablespace, we never have to look for an existing configuration
-			 * file in the stream.
-			 */
-			WRITE_TAR_DATA(copybuf, r);
-		}
-		else
-		{
-			/*
-			 * Look for a config file in the existing tar stream. If it's
-			 * there, we must skip it so we can later overwrite it with our
-			 * own version of the file.
-			 *
-			 * To do this, we have to process the individual files inside the
-			 * TAR stream. The stream consists of a header and zero or more
-			 * chunks, all 512 bytes long. The stream from the server is
-			 * broken up into smaller pieces, so we have to track the size of
-			 * the files to find the next header structure.
-			 */
-			int			rr = r;
-			int			pos = 0;
-
-			while (rr > 0)
+			if (fclose(state.tarfile) != 0)
 			{
-				if (in_tarhdr)
-				{
-					/*
-					 * We're currently reading a header structure inside the
-					 * TAR stream, i.e. the file metadata.
-					 */
-					if (tarhdrsz < 512)
-					{
-						/*
-						 * Copy the header structure into tarhdr in case the
-						 * header is not aligned to 512 bytes or it's not
-						 * returned in whole by the last PQgetCopyData call.
-						 */
-						int			hdrleft;
-						int			bytes2copy;
-
-						hdrleft = 512 - tarhdrsz;
-						bytes2copy = (rr > hdrleft ? hdrleft : rr);
-
-						memcpy(&tarhdr[tarhdrsz], copybuf + pos, bytes2copy);
-
-						rr -= bytes2copy;
-						pos += bytes2copy;
-						tarhdrsz += bytes2copy;
-					}
-					else
-					{
-						/*
-						 * We have the complete header structure in tarhdr,
-						 * look at the file metadata: we may want append
-						 * recovery info into postgresql.auto.conf and skip
-						 * standby.signal file if recovery parameters are
-						 * integrated as GUCs, and recovery.conf otherwise. In
-						 * both cases we must calculate tar padding.
-						 */
-						if (is_recovery_guc_supported)
-						{
-							skip_file = (strcmp(&tarhdr[0], "standby.signal") == 0);
-							is_postgresql_auto_conf = (strcmp(&tarhdr[0], "postgresql.auto.conf") == 0);
-						}
-						else
-							skip_file = (strcmp(&tarhdr[0], "recovery.conf") == 0);
-
-						filesz = read_tar_number(&tarhdr[124], 12);
-						file_padding_len = ((filesz + 511) & ~511) - filesz;
-
-						if (is_recovery_guc_supported &&
-							is_postgresql_auto_conf &&
-							writerecoveryconf)
-						{
-							/* replace tar header */
-							char		header[512];
-
-							tarCreateHeader(header, "postgresql.auto.conf", NULL,
-											filesz + recoveryconfcontents->len,
-											pg_file_create_mode, 04000, 02000,
-											time(NULL));
-
-							WRITE_TAR_DATA(header, sizeof(header));
-						}
-						else
-						{
-							/* copy stream with padding */
-							filesz += file_padding_len;
-
-							if (!skip_file)
-							{
-								/*
-								 * If we're not skipping the file, write the
-								 * tar header unmodified.
-								 */
-								WRITE_TAR_DATA(tarhdr, 512);
-							}
-						}
-
-						/* Next part is the file, not the header */
-						in_tarhdr = false;
-					}
-				}
-				else
-				{
-					/*
-					 * We're processing a file's contents.
-					 */
-					if (filesz > 0)
-					{
-						/*
-						 * We still have data to read (and possibly write).
-						 */
-						int			bytes2write;
-
-						bytes2write = (filesz > rr ? rr : filesz);
-
-						if (!skip_file)
-							WRITE_TAR_DATA(copybuf + pos, bytes2write);
-
-						rr -= bytes2write;
-						pos += bytes2write;
-						filesz -= bytes2write;
-					}
-					else if (is_recovery_guc_supported &&
-							 is_postgresql_auto_conf &&
-							 writerecoveryconf)
-					{
-						/* append recovery config to postgresql.auto.conf */
-						int			padding;
-						int			tailsize;
-
-						tailsize = (512 - file_padding_len) + recoveryconfcontents->len;
-						padding = ((tailsize + 511) & ~511) - tailsize;
-
-						WRITE_TAR_DATA(recoveryconfcontents->data, recoveryconfcontents->len);
-
-						if (padding)
-						{
-							char		zerobuf[512];
-
-							MemSet(zerobuf, 0, sizeof(zerobuf));
-							WRITE_TAR_DATA(zerobuf, padding);
-						}
-
-						/* skip original file padding */
-						is_postgresql_auto_conf = false;
-						skip_file = true;
-						filesz += file_padding_len;
-
-						found_postgresql_auto_conf = true;
-					}
-					else
-					{
-						/*
-						 * No more data in the current file, the next piece of
-						 * data (if any) will be a new file header structure.
-						 */
-						in_tarhdr = true;
-						skip_file = false;
-						is_postgresql_auto_conf = false;
-						tarhdrsz = 0;
-						filesz = 0;
-					}
-				}
+				pg_log_error("could not close file \"%s\": %m",
+							 state.filename);
+				exit(1);
 			}
 		}
-		totaldone += r;
-		progress_report(rownum, filename, false);
-	}							/* while (1) */
-
-	progress_report(rownum, filename, true);
+	}
 
-	if (copybuf != NULL)
-		PQfreemem(copybuf);
+	progress_report(rownum, state.filename, true);
 
 	/*
 	 * Do not sync the resulting tar file yet, all files are synced once at
@@ -1349,6 +1205,194 @@
 	 */
 }
 
+/*
+ * Receive one chunk of tar-format data from the server.
+ */
+static void
+ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data)
+{
+	WriteTarState *state = callback_data;
+
+	if (!writerecoveryconf || !state->basetablespace)
+	{
+		/*
+		 * When not writing config file, or when not working on the base
+		 * tablespace, we never have to look for an existing configuration
+		 * file in the stream.
+		 */
+		writeTarData(state, copybuf, r);
+	}
+	else
+	{
+		/*
+		 * Look for a config file in the existing tar stream. If it's there,
+		 * we must skip it so we can later overwrite it with our own version
+		 * of the file.
+		 *
+		 * To do this, we have to process the individual files inside the TAR
+		 * stream. The stream consists of a header and zero or more chunks,
+		 * all 512 bytes long. The stream from the server is broken up into
+		 * smaller pieces, so we have to track the size of the files to find
+		 * the next header structure.
+		 */
+		int			rr = r;
+		int			pos = 0;
+
+		while (rr > 0)
+		{
+			if (state->in_tarhdr)
+			{
+				/*
+				 * We're currently reading a header structure inside the TAR
+				 * stream, i.e. the file metadata.
+				 */
+				if (state->tarhdrsz < 512)
+				{
+					/*
+					 * Copy the header structure into tarhdr in case the
+					 * header is not aligned to 512 bytes or it's not returned
+					 * in whole by the last PQgetCopyData call.
+					 */
+					int			hdrleft;
+					int			bytes2copy;
+
+					hdrleft = 512 - state->tarhdrsz;
+					bytes2copy = (rr > hdrleft ? hdrleft : rr);
+
+					memcpy(&state->tarhdr[state->tarhdrsz], copybuf + pos,
+						   bytes2copy);
+
+					rr -= bytes2copy;
+					pos += bytes2copy;
+					state->tarhdrsz += bytes2copy;
+				}
+				else
+				{
+					/*
+					 * We have the complete header structure in tarhdr, look
+					 * at the file metadata: we may want append recovery info
+					 * into postgresql.auto.conf and skip standby.signal file
+					 * if recovery parameters are integrated as GUCs, and
+					 * recovery.conf otherwise. In both cases we must
+					 * calculate tar padding.
+					 */
+					if (state->is_recovery_guc_supported)
+					{
+						state->skip_file =
+							(strcmp(&state->tarhdr[0], "standby.signal") == 0);
+						state->is_postgresql_auto_conf =
+							(strcmp(&state->tarhdr[0], "postgresql.auto.conf") == 0);
+					}
+					else
+						state->skip_file =
+							(strcmp(&state->tarhdr[0], "recovery.conf") == 0);
+
+					state->filesz = read_tar_number(&state->tarhdr[124], 12);
+					state->file_padding_len =
+						((state->filesz + 511) & ~511) - state->filesz;
+
+					if (state->is_recovery_guc_supported &&
+						state->is_postgresql_auto_conf &&
+						writerecoveryconf)
+					{
+						/* replace tar header */
+						char		header[512];
+
+						tarCreateHeader(header, "postgresql.auto.conf", NULL,
+										state->filesz + recoveryconfcontents->len,
+										pg_file_create_mode, 04000, 02000,
+										time(NULL));
+
+						writeTarData(state, header, sizeof(header));
+					}
+					else
+					{
+						/* copy stream with padding */
+						state->filesz += state->file_padding_len;
+
+						if (!state->skip_file)
+						{
+							/*
+							 * If we're not skipping the file, write the tar
+							 * header unmodified.
+							 */
+							writeTarData(state, state->tarhdr, 512);
+						}
+					}
+
+					/* Next part is the file, not the header */
+					state->in_tarhdr = false;
+				}
+			}
+			else
+			{
+				/*
+				 * We're processing a file's contents.
+				 */
+				if (state->filesz > 0)
+				{
+					/*
+					 * We still have data to read (and possibly write).
+					 */
+					int			bytes2write;
+
+					bytes2write = (state->filesz > rr ? rr : state->filesz);
+
+					if (!state->skip_file)
+						writeTarData(state, copybuf + pos, bytes2write);
+
+					rr -= bytes2write;
+					pos += bytes2write;
+					state->filesz -= bytes2write;
+				}
+				else if (state->is_recovery_guc_supported &&
+						 state->is_postgresql_auto_conf &&
+						 writerecoveryconf)
+				{
+					/* append recovery config to postgresql.auto.conf */
+					int			padding;
+					int			tailsize;
+
+					tailsize = (512 - state->file_padding_len) + recoveryconfcontents->len;
+					padding = ((tailsize + 511) & ~511) - tailsize;
+
+					writeTarData(state, recoveryconfcontents->data,
+								 recoveryconfcontents->len);
+
+					if (padding)
+					{
+						char		zerobuf[512];
+
+						MemSet(zerobuf, 0, sizeof(zerobuf));
+						writeTarData(state, zerobuf, padding);
+					}
+
+					/* skip original file padding */
+					state->is_postgresql_auto_conf = false;
+					state->skip_file = true;
+					state->filesz += state->file_padding_len;
+
+					state->found_postgresql_auto_conf = true;
+				}
+				else
+				{
+					/*
+					 * No more data in the current file, the next piece of
+					 * data (if any) will be a new file header structure.
+					 */
+					state->in_tarhdr = true;
+					state->skip_file = false;
+					state->is_postgresql_auto_conf = false;
+					state->tarhdrsz = 0;
+					state->filesz = 0;
+				}
+			}
+		}
+	}
+	totaldone += r;
+	progress_report(state->tablespacenum, state->filename, false);
+}
+
 /*
  * Retrieve tablespace path, either relocated or original depending on whether
  * -T was passed or not.
@@ -1384,244 +1428,34 @@
 static void
 ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 {
-	char		current_path[MAXPGPATH];
-	char		filename[MAXPGPATH];
-	const char *mapped_tblspc_path;
-	pgoff_t		current_len_left = 0;
-	int			current_padding = 0;
+	UnpackTarState state;
 	bool		basetablespace;
-	char	   *copybuf = NULL;
-	FILE	   *file = NULL;
+
+	memset(&state, 0, sizeof(state));
+	state.tablespacenum = rownum;
 
 	basetablespace = PQgetisnull(res, rownum, 0);
 	if (basetablespace)
-		strlcpy(current_path, basedir, sizeof(current_path));
+		strlcpy(state.current_path, basedir, sizeof(state.current_path));
 	else
-		strlcpy(current_path,
+		strlcpy(state.current_path,
 				get_tablespace_mapping(PQgetvalue(res, rownum, 1)),
-				sizeof(current_path));
+				sizeof(state.current_path));
 
-	/*
-	 * Get the COPY data
-	 */
-	res = PQgetResult(conn);
-	if (PQresultStatus(res) != PGRES_COPY_OUT)
-	{
-		pg_log_error("could not get COPY data stream: %s",
-					 PQerrorMessage(conn));
-		exit(1);
-	}
+	ReceiveCopyData(conn, ReceiveTarAndUnpackCopyChunk, &state);
 
-	while (1)
-	{
-		int			r;
-
-		if (copybuf != NULL)
-		{
-			PQfreemem(copybuf);
-			copybuf = NULL;
-		}
+	if (state.file)
+		fclose(state.file);
 
-		r = PQgetCopyData(conn, &copybuf, 0);
+	progress_report(rownum, state.filename, true);
 
-		if (r == -1)
-		{
-			/*
-			 * End of chunk
-			 */
-			if (file)
-				fclose(file);
-
-			break;
-		}
-		else if (r == -2)
-		{
-			pg_log_error("could not read COPY data: %s",
-						 PQerrorMessage(conn));
-			exit(1);
-		}
-
-		if (file == NULL)
-		{
-#ifndef WIN32
-			int			filemode;
-#endif
-
-			/*
-			 * No current file, so this must be the header for a new file
-			 */
-			if (r != 512)
-			{
-				pg_log_error("invalid tar block header size: %d", r);
-				exit(1);
-			}
-			totaldone += 512;
-
-			current_len_left = read_tar_number(&copybuf[124], 12);
-
-#ifndef WIN32
-			/* Set permissions on the file */
-			filemode = read_tar_number(&copybuf[100], 8);
-#endif
-
-			/*
-			 * All files are padded up to 512 bytes
-			 */
-			current_padding =
-				((current_len_left + 511) & ~511) - current_len_left;
-
-			/*
-			 * First part of header is zero terminated filename
-			 */
-			snprintf(filename, sizeof(filename), "%s/%s", current_path,
-					 copybuf);
-			if (filename[strlen(filename) - 1] == '/')
-			{
-				/*
-				 * Ends in a slash means directory or symlink to directory
-				 */
-				if (copybuf[156] == '5')
-				{
-					/*
-					 * Directory
-					 */
-					filename[strlen(filename) - 1] = '\0';	/* Remove trailing slash */
-					if (mkdir(filename, pg_dir_create_mode) != 0)
-					{
-						/*
-						 * When streaming WAL, pg_wal (or pg_xlog for pre-9.6
-						 * clusters) will have been created by the wal
-						 * receiver process. Also, when the WAL directory
-						 * location was specified, pg_wal (or pg_xlog) has
-						 * already been created as a symbolic link before
-						 * starting the actual backup. So just ignore creation
-						 * failures on related directories.
-						 */
-						if (!((pg_str_endswith(filename, "/pg_wal") ||
-							   pg_str_endswith(filename, "/pg_xlog") ||
-							   pg_str_endswith(filename, "/archive_status")) &&
-							  errno == EEXIST))
-						{
-							pg_log_error("could not create directory \"%s\": %m",
-										 filename);
-							exit(1);
-						}
-					}
-#ifndef WIN32
-					if (chmod(filename, (mode_t) filemode))
-						pg_log_error("could not set permissions on directory \"%s\": %m",
-									 filename);
-#endif
-				}
-				else if (copybuf[156] == '2')
-				{
-					/*
-					 * Symbolic link
-					 *
-					 * It's most likely a link in pg_tblspc directory, to the
-					 * location of a tablespace. Apply any tablespace mapping
-					 * given on the command line (--tablespace-mapping). (We
-					 * blindly apply the mapping without checking that the
-					 * link really is inside pg_tblspc. We don't expect there
-					 * to be other symlinks in a data directory, but if there
-					 * are, you can call it an undocumented feature that you
-					 * can map them too.)
-					 */
-					filename[strlen(filename) - 1] = '\0';	/* Remove trailing slash */
-
-					mapped_tblspc_path = get_tablespace_mapping(&copybuf[157]);
-					if (symlink(mapped_tblspc_path, filename) != 0)
-					{
-						pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m",
-									 filename, mapped_tblspc_path);
-						exit(1);
-					}
-				}
-				else
-				{
-					pg_log_error("unrecognized link indicator \"%c\"",
-								 copybuf[156]);
-					exit(1);
-				}
-				continue;		/* directory or link handled */
-			}
-
-			/*
-			 * regular file
-			 */
-			file = fopen(filename, "wb");
-			if (!file)
-			{
-				pg_log_error("could not create file \"%s\": %m", filename);
-				exit(1);
-			}
-
-#ifndef WIN32
-			if (chmod(filename, (mode_t) filemode))
-				pg_log_error("could not set permissions on file \"%s\": %m",
-							 filename);
-#endif
-
-			if (current_len_left == 0)
-			{
-				/*
-				 * Done with this file, next one will be a new tar header
-				 */
-				fclose(file);
-				file = NULL;
-				continue;
-			}
-		}						/* new file */
-		else
-		{
-			/*
-			 * Continuing blocks in existing file
-			 */
-			if (current_len_left == 0 && r == current_padding)
-			{
-				/*
-				 * Received the padding block for this file, ignore it and
-				 * close the file, then move on to the next tar header.
-				 */
-				fclose(file);
-				file = NULL;
-				totaldone += r;
-				continue;
-			}
-
-			if (fwrite(copybuf, r, 1, file) != 1)
-			{
-				pg_log_error("could not write to file \"%s\": %m", filename);
-				exit(1);
-			}
-			totaldone += r;
-			progress_report(rownum, filename, false);
-
-			current_len_left -= r;
-			if (current_len_left == 0 && current_padding == 0)
-			{
-				/*
-				 * Received the last block, and there is no padding to be
-				 * expected. Close the file and move on to the next tar
-				 * header.
-				 */
-				fclose(file);
-				file = NULL;
-				continue;
-			}
-		}						/* continuing data in existing file */
-	}							/* loop over all data blocks */
-	progress_report(rownum, filename, true);
-
-	if (file != NULL)
+	if (state.file != NULL)
 	{
 		pg_log_error("COPY stream ended before last file was finished");
 		exit(1);
 	}
 
-	if (copybuf != NULL)
-		PQfreemem(copybuf);
-
 	if (basetablespace && writerecoveryconf)
 		WriteRecoveryConfig(conn, basedir, recoveryconfcontents);
 
@@ -1631,6 +1465,181 @@
 	 */
 }
 
+static void
+ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf, void *callback_data)
+{
+	UnpackTarState *state = callback_data;
+
+	if (state->file == NULL)
+	{
+#ifndef WIN32
+		int			filemode;
+#endif
+
+		/*
+		 * No current file, so this must be the header for a new file
+		 */
+		if (r != 512)
+		{
+			pg_log_error("invalid tar block header size: %zu", r);
+			exit(1);
+		}
+		totaldone += 512;
+
+		state->current_len_left = read_tar_number(&copybuf[124], 12);
+
+#ifndef WIN32
+		/* Set permissions on the file */
+		filemode = read_tar_number(&copybuf[100], 8);
+#endif
+
+		/*
+		 * All files are padded up to 512 bytes
+		 */
+		state->current_padding =
+			((state->current_len_left + 511) & ~511) - state->current_len_left;
+
+		/*
+		 * First part of header is zero terminated filename
+		 */
+		snprintf(state->filename, sizeof(state->filename),
+				 "%s/%s", state->current_path, copybuf);
+		if (state->filename[strlen(state->filename) - 1] == '/')
+		{
+			/*
+			 * Ends in a slash means directory or symlink to directory
+			 */
+			if (copybuf[156] == '5')
+			{
+				/*
+				 * Directory. Remove trailing slash first.
+				 */
+				state->filename[strlen(state->filename) - 1] = '\0';
+				if (mkdir(state->filename, pg_dir_create_mode) != 0)
+				{
+					/*
+					 * When streaming WAL, pg_wal (or pg_xlog for pre-9.6
+					 * clusters) will have been created by the wal receiver
+					 * process. Also, when the WAL directory location was
+					 * specified, pg_wal (or pg_xlog) has already been created
+					 * as a symbolic link before starting the actual backup.
+					 * So just ignore creation failures on related
+					 * directories.
+					 */
+					if (!((pg_str_endswith(state->filename, "/pg_wal") ||
+						   pg_str_endswith(state->filename, "/pg_xlog") ||
+						   pg_str_endswith(state->filename, "/archive_status")) &&
+						  errno == EEXIST))
+					{
+						pg_log_error("could not create directory \"%s\": %m",
+									 state->filename);
+						exit(1);
+					}
+				}
+#ifndef WIN32
+				if (chmod(state->filename, (mode_t) filemode))
+					pg_log_error("could not set permissions on directory \"%s\": %m",
+								 state->filename);
+#endif
+			}
+			else if (copybuf[156] == '2')
+			{
+				/*
+				 * Symbolic link
+				 *
+				 * It's most likely a link in pg_tblspc directory, to the
+				 * location of a tablespace. Apply any tablespace mapping
+				 * given on the command line (--tablespace-mapping). (We
+				 * blindly apply the mapping without checking that the link
+				 * really is inside pg_tblspc. We don't expect there to be
+				 * other symlinks in a data directory, but if there are, you
+				 * can call it an undocumented feature that you can map them
+				 * too.)
+				 */
+				state->filename[strlen(state->filename) - 1] = '\0';	/* Remove trailing slash */
+
+				state->mapped_tblspc_path =
+					get_tablespace_mapping(&copybuf[157]);
+				if (symlink(state->mapped_tblspc_path, state->filename) != 0)
+				{
+					pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m",
+								 state->filename, state->mapped_tblspc_path);
+					exit(1);
+				}
+			}
+			else
+			{
+				pg_log_error("unrecognized link indicator \"%c\"",
+							 copybuf[156]);
+				exit(1);
+			}
+			return;				/* directory or link handled */
+		}
+
+		/*
+		 * regular file
+		 */
+		state->file = fopen(state->filename, "wb");
+		if (!state->file)
+		{
+			pg_log_error("could not create file \"%s\": %m", state->filename);
+			exit(1);
+		}
+
+#ifndef WIN32
+		if (chmod(state->filename, (mode_t) filemode))
+			pg_log_error("could not set permissions on file \"%s\": %m",
+						 state->filename);
+#endif
+
+		if (state->current_len_left == 0)
+		{
+			/*
+			 * Done with this file, next one will be a new tar header
+			 */
+			fclose(state->file);
+			state->file = NULL;
+			return;
+		}
+	}							/* new file */
+	else
+	{
+		/*
+		 * Continuing blocks in existing file
+		 */
+		if (state->current_len_left == 0 && r == state->current_padding)
+		{
+			/*
+			 * Received the padding block for this file, ignore it and close
+			 * the file, then move on to the next tar header.
+			 */
+			fclose(state->file);
+			state->file = NULL;
+			totaldone += r;
+			return;
+		}
+
+		if (fwrite(copybuf, r, 1, state->file) != 1)
+		{
+			pg_log_error("could not write to file \"%s\": %m", state->filename);
+			exit(1);
+		}
+		totaldone += r;
+		progress_report(state->tablespacenum, state->filename, false);
+
+		state->current_len_left -= r;
+		if (state->current_len_left == 0 && state->current_padding == 0)
+		{
+			/*
+			 * Received the last block, and there is no padding to be
+			 * expected. Close the file and move on to the next tar header.
+			 */
+			fclose(state->file);
+			state->file = NULL;
+			return;
+		}
+	}							/* continuing data in existing file */
+}
+
 static void
 BaseBackup(void)