Add pg_rewind, for re-synchronizing a master server after failback.

Earlier versions of this tool were available (and still are) on github.

Thanks to Michael Paquier, Alvaro Herrera, Peter Eisentraut, Amit Kapila,
and Satoshi Nagayasu for review.
Heikki Linnakangas 2015-03-23 19:47:52 +02:00
parent 87cec51d3a
commit 61081e75c6
29 changed files with 4141 additions and 2 deletions

@@ -1272,7 +1272,9 @@ primary_slot_name = 'node_a_slot'
 and might stay down. To return to normal operation, a standby server
 must be recreated,
 either on the former primary system when it comes up, or on a third,
-possibly new, system. Once complete, the primary and standby can be
+possibly new, system. The <xref linkend="app-pgrewind"> utility can be
+used to speed up this process on large clusters.
+Once complete, the primary and standby can be
 considered to have switched roles. Some people choose to use a third
 server to provide backup for the new primary until the new standby
 server is recreated,

@@ -190,6 +190,7 @@ Complete list of usable sgml source files in this directory.
 <!ENTITY pgRecvlogical SYSTEM "pg_recvlogical.sgml">
 <!ENTITY pgResetxlog SYSTEM "pg_resetxlog.sgml">
 <!ENTITY pgRestore SYSTEM "pg_restore.sgml">
+<!ENTITY pgRewind SYSTEM "pg_rewind.sgml">
 <!ENTITY postgres SYSTEM "postgres-ref.sgml">
 <!ENTITY postmaster SYSTEM "postmaster.sgml">
 <!ENTITY psqlRef SYSTEM "psql-ref.sgml">

@@ -0,0 +1,237 @@
<!--
doc/src/sgml/ref/pg_rewind.sgml
PostgreSQL documentation
-->
<refentry id="app-pgrewind">
<indexterm zone="app-pgrewind">
<primary>pg_rewind</primary>
</indexterm>
<refmeta>
<refentrytitle><application>pg_rewind</application></refentrytitle>
<manvolnum>1</manvolnum>
<refmiscinfo>Application</refmiscinfo>
</refmeta>
<refnamediv>
<refname>pg_rewind</refname>
<refpurpose>synchronize a <productname>PostgreSQL</productname> data directory with another data directory that was forked from the first one</refpurpose>
</refnamediv>
<refsynopsisdiv>
<cmdsynopsis>
<command>pg_rewind</command>
<arg rep="repeat"><replaceable>option</replaceable></arg>
<group choice="plain">
<group choice="req">
<arg choice="plain"><option>-D </option></arg>
<arg choice="plain"><option>--target-pgdata</option></arg>
</group>
<replaceable> directory</replaceable>
<group choice="req">
<arg choice="plain"><option>--source-pgdata=<replaceable>directory</replaceable></option></arg>
<arg choice="plain"><option>--source-server=<replaceable>connstr</replaceable></option></arg>
</group>
</group>
</cmdsynopsis>
</refsynopsisdiv>
<refsect1>
<title>Description</title>
<para>
<application>pg_rewind</> is a tool for synchronizing a PostgreSQL cluster
with another copy of the same cluster, after the clusters' timelines have
diverged. A typical scenario is to bring an old master server back online
after failover, as a standby that follows the new master.
</para>
<para>
The result is equivalent to replacing the target data directory with the
source one. All files are copied, including configuration files. The
advantage of <application>pg_rewind</> over taking a new base backup, or
tools like <application>rsync</>, is that <application>pg_rewind</> does
not require reading through all unchanged files in the cluster. That makes
it a lot faster when the database is large and only a small portion of it
differs between the clusters.
</para>
<para>
<application>pg_rewind</> examines the timeline histories of the source
and target clusters to determine the point where they diverged, and
expects to find WAL in the target cluster's <filename>pg_xlog</> directory
reaching all the way back to the point of divergence. In the typical
failover scenario where the target cluster was shut down soon after the
divergence, that is not a problem, but if the target cluster had run for a
long time after the divergence, the old WAL files might not be present
anymore. In that case, they can be manually copied from the WAL archive to
the <filename>pg_xlog</> directory. Fetching missing files from a WAL
archive automatically is currently not supported.
</para>
<para>
When the target server is started up for the first time after running
<application>pg_rewind</>, it will go into recovery mode and replay all
WAL generated in the source server after the point of divergence.
If some of the WAL was no longer available in the source server when
<application>pg_rewind</> was run, and therefore could not be copied by
the <application>pg_rewind</> session, it needs to be made available when the
target server is started up. That can be done by creating a
<filename>recovery.conf</> file in the target data directory with a
suitable <varname>restore_command</>.
</para>
</refsect1>
<refsect1>
<title>Options</title>
<para>
<application>pg_rewind</application> accepts the following command-line
arguments:
<variablelist>
<varlistentry>
<term><option>-D</option></term>
<term><option>--target-pgdata</option></term>
<listitem>
<para>
This option specifies the target data directory that is synchronized
with the source. The target server must be shut down cleanly before
running <application>pg_rewind</application>.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--source-pgdata</option></term>
<listitem>
<para>
Specifies the path to the data directory of the source server, to
synchronize the target with. When <option>--source-pgdata</> is
used, the source server must be cleanly shut down.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--source-server</option></term>
<listitem>
<para>
Specifies a libpq connection string to connect to the source
<productname>PostgreSQL</> server to synchronize the target with.
The server must be up and running, and must not be in recovery mode.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-n</option></term>
<term><option>--dry-run</option></term>
<listitem>
<para>
Do everything except actually modifying the target directory.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-P</option></term>
<term><option>--progress</option></term>
<listitem>
<para>
Enables progress reporting. Turning this on will deliver an approximate
progress report while copying data over from the source cluster.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>--debug</option></term>
<listitem>
<para>
Print verbose debugging output that is mostly useful for developers
debugging <application>pg_rewind</>.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-V</option></term>
<term><option>--version</option></term>
<listitem><para>Display version information, then exit</para></listitem>
</varlistentry>
<varlistentry>
<term><option>-?</option></term>
<term><option>--help</option></term>
<listitem><para>Show help, then exit</para></listitem>
</varlistentry>
</variablelist>
</para>
</refsect1>
<refsect1>
<title>Environment</title>
<para>
When the <option>--source-server</> option is used,
<application>pg_rewind</application> also uses the environment variables
supported by <application>libpq</> (see <xref linkend="libpq-envars">).
</para>
</refsect1>
<refsect1>
<title>Notes</title>
<para>
<application>pg_rewind</> requires that the <varname>wal_log_hints</>
option is enabled in <filename>postgresql.conf</>, or that data checksums
were enabled when the cluster was initialized with <application>initdb</>.
<varname>full_page_writes</> must also be enabled.
</para>
<refsect2>
<title>How it works</title>
<para>
The basic idea is to copy everything from the new cluster to the old
cluster, except for the blocks that we know to be the same.
</para>
<procedure>
<step>
<para>
Scan the WAL log of the old cluster, starting from the last checkpoint
before the point where the new cluster's timeline history forked off
from the old cluster. For each WAL record, make a note of the data
blocks that were touched. This yields a list of all the data blocks
that were changed in the old cluster, after the new cluster forked off.
</para>
</step>
<step>
<para>
Copy all those changed blocks from the new cluster to the old cluster.
</para>
</step>
<step>
<para>
Copy all other files, such as clog and configuration files, from the
new cluster to the old cluster (everything except the relation files).
</para>
</step>
<step>
<para>
Apply the WAL from the new cluster, starting from the checkpoint
created at failover. (Strictly speaking, <application>pg_rewind</>
doesn't apply the WAL, it just creates a backup label file indicating
that when <productname>PostgreSQL</> is started, it will start replay
from that checkpoint and apply all the required WAL.)
</para>
</step>
</procedure>
</refsect2>
</refsect1>
</refentry>
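
The Description above calls for a recovery.conf with a suitable restore_command but gives no concrete example. A minimal sketch for the rewound target, with hypothetical host and archive paths (the parameter names match those used by the test driver later in this commit):

# recovery.conf in the rewound target's data directory (sketch; paths are hypothetical)
standby_mode = 'on'
primary_conninfo = 'host=new-master port=5432'
restore_command = 'cp /mnt/server/wal_archive/%f %p'
recovery_target_timeline = 'latest'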

@@ -260,6 +260,7 @@
 &pgControldata;
 &pgCtl;
 &pgResetxlog;
+&pgRewind;
 &postgres;
 &postmaster;

@@ -21,6 +21,7 @@ SUBDIRS = \
 pg_ctl \
 pg_dump \
 pg_resetxlog \
+pg_rewind \
 psql \
 scripts

src/bin/pg_rewind/.gitignore
@@ -0,0 +1,7 @@
# Files generated during build
/xlogreader.c
/pg_rewind
# Generated by test suite
/tmp_check/
/regress_log/

@@ -0,0 +1,52 @@
#-------------------------------------------------------------------------
#
# Makefile for src/bin/pg_rewind
#
# Portions Copyright (c) 2013-2015, PostgreSQL Global Development Group
#
# src/bin/pg_rewind/Makefile
#
#-------------------------------------------------------------------------
PGFILEDESC = "pg_rewind - repurpose an old master server as standby"
PGAPPICON = win32
subdir = src/bin/pg_rewind
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
PG_CPPFLAGS = -I$(libpq_srcdir)
PG_LIBS = $(libpq_pgport)
override CPPFLAGS := -I$(libpq_srcdir) -DFRONTEND $(CPPFLAGS)
OBJS = pg_rewind.o parsexlog.o xlogreader.o datapagemap.o timeline.o \
fetch.o file_ops.o copy_fetch.o libpq_fetch.o filemap.o logging.o \
$(WIN32RES)
EXTRA_CLEAN = $(RMGRDESCSOURCES) xlogreader.c
all: pg_rewind
pg_rewind: $(OBJS) | submake-libpq submake-libpgport
$(CC) $(CFLAGS) $^ $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
xlogreader.c: % : $(top_srcdir)/src/backend/access/transam/%
rm -f $@ && $(LN_S) $< .
install: all installdirs
$(INSTALL_PROGRAM) pg_rewind$(X) '$(DESTDIR)$(bindir)/pg_rewind$(X)'
installdirs:
$(MKDIR_P) '$(DESTDIR)$(bindir)'
uninstall:
rm -f '$(DESTDIR)$(bindir)/pg_rewind$(X)'
clean distclean maintainer-clean:
rm -f pg_rewind$(X) $(OBJS) xlogreader.c
rm -rf tmp_check
check: all
$(prove_check) :: local
$(prove_check) :: remote

@@ -0,0 +1,271 @@
package RewindTest;
# Test driver for pg_rewind. Each test consists of a cycle where a new cluster
# is first created with initdb, and a streaming replication standby is set up
# to follow the master. Then the master is shut down and the standby is
# promoted, and finally pg_rewind is used to rewind the old master, using the
# standby as the source.
#
# To run a test, the test script (in t/ subdirectory) calls the functions
# in this module. These functions should be called in this sequence:
#
# 1. init_rewind_test - sets up log file etc.
#
# 2. setup_cluster - creates a PostgreSQL cluster that runs as the master
#
# 3. create_standby - runs pg_basebackup to initialize a standby server, and
# sets it up to follow the master.
#
# 4. promote_standby - runs "pg_ctl promote" to promote the standby server.
# The old master keeps running.
#
# 5. run_pg_rewind - stops the old master (if it's still running) and runs
# pg_rewind to synchronize it with the now-promoted standby server.
#
# The test script can use the helper functions master_psql and standby_psql
# to run psql against the master and standby servers, respectively. The
# test script can also use the $connstr_master and $connstr_standby global
# variables, which contain libpq connection strings for connecting to the
# master and standby servers. The data directories are also available
# in paths $test_master_datadir and $test_standby_datadir
use TestLib;
use Test::More;
use File::Copy;
use File::Path qw(remove_tree);
use IPC::Run qw(run start);
use Exporter 'import';
our @EXPORT = qw(
$connstr_master
$connstr_standby
$test_master_datadir
$test_standby_datadir
append_to_file
master_psql
standby_psql
check_query
init_rewind_test
setup_cluster
create_standby
promote_standby
run_pg_rewind
);
# Adjust these paths for your environment
my $testroot = "./tmp_check";
$test_master_datadir="$testroot/data_master";
$test_standby_datadir="$testroot/data_standby";
mkdir $testroot;
# Log files are created here
mkdir "regress_log";
# Define non-conflicting ports for both nodes.
my $port_master=$ENV{PGPORT};
my $port_standby=$port_master + 1;
my $log_path;
my $tempdir_short;
$connstr_master="port=$port_master";
$connstr_standby="port=$port_standby";
$ENV{PGDATABASE} = "postgres";
sub master_psql
{
my $cmd = shift;
system_or_bail("psql -q --no-psqlrc -d $connstr_master -c \"$cmd\"");
}
sub standby_psql
{
my $cmd = shift;
system_or_bail("psql -q --no-psqlrc -d $connstr_standby -c \"$cmd\"");
}
# Run a query against the master, and check that the output matches what's
# expected
sub check_query
{
my ($query, $expected_stdout, $test_name) = @_;
my ($stdout, $stderr);
# we want just the output, no formatting
my $result = run ['psql', '-q', '-A', '-t', '--no-psqlrc',
'-d', $connstr_master,
'-c' , $query],
'>', \$stdout, '2>', \$stderr;
# We don't use ok() for the exit code and stderr, because we want this
# check to be just a single test.
if (!$result) {
fail ("$test_name: psql exit code");
} elsif ($stderr ne '') {
diag $stderr;
fail ("$test_name: psql no stderr");
} else {
is ($stdout, $expected_stdout, "$test_name: query result matches");
}
}
sub append_to_file
{
my($filename, $str) = @_;
open my $fh, ">>", $filename or die "could not open file $filename";
print $fh $str;
close $fh;
}
sub init_rewind_test
{
($testname, $test_mode) = @_;
$log_path="regress_log/pg_rewind_log_${testname}_${test_mode}";
remove_tree $log_path;
}
sub setup_cluster
{
$tempdir_short = tempdir_short;
# Initialize master, data checksums are mandatory
remove_tree($test_master_datadir);
standard_initdb($test_master_datadir);
# Custom parameters for master's postgresql.conf
append_to_file("$test_master_datadir/postgresql.conf", qq(
wal_level = hot_standby
max_wal_senders = 2
wal_keep_segments = 20
max_wal_size = 200MB
shared_buffers = 1MB
wal_log_hints = on
hot_standby = on
autovacuum = off
max_connections = 10
));
# Accept replication connections on master
append_to_file("$test_master_datadir/pg_hba.conf", qq(
local replication all trust
));
system_or_bail("pg_ctl -w -D $test_master_datadir -o \"-k $tempdir_short --listen-addresses='' -p $port_master\" start >>$log_path 2>&1");
#### Now run the test-specific parts to initialize the master before setting
# up standby
$ENV{PGHOST} = $tempdir_short;
}
sub create_standby
{
# Set up standby with necessary parameter
remove_tree $test_standby_datadir;
# Base backup is taken with xlog files included
system_or_bail("pg_basebackup -D $test_standby_datadir -p $port_master -x >>$log_path 2>&1");
append_to_file("$test_standby_datadir/recovery.conf", qq(
primary_conninfo='$connstr_master'
standby_mode=on
recovery_target_timeline='latest'
));
# Start standby
system_or_bail("pg_ctl -w -D $test_standby_datadir -o \"-k $tempdir_short --listen-addresses='' -p $port_standby\" start >>$log_path 2>&1");
# sleep a bit to make sure the standby has caught up.
sleep 1;
}
sub promote_standby
{
#### Now run the test-specific parts to run after the standby has been
# started up
# Now promote slave and insert some new data on master, this will put
# the master out-of-sync with the standby.
system_or_bail("pg_ctl -w -D $test_standby_datadir promote >>$log_path 2>&1");
sleep 1;
}
sub run_pg_rewind
{
# Stop the master and be ready to perform the rewind
system_or_bail("pg_ctl -w -D $test_master_datadir stop -m fast >>$log_path 2>&1");
# At this point, the rewind processing is ready to run.
# We now have a very simple scenario with a few diverged WAL records.
# The real testing now begins, with a bifurcation of the possible
# scenarios that pg_rewind supports.
# Keep a temporary postgresql.conf for master node or it would be
# overwritten during the rewind.
copy("$test_master_datadir/postgresql.conf", "$testroot/master-postgresql.conf.tmp");
# Now run pg_rewind
if ($test_mode eq "local")
{
# Do rewind using a local pgdata as source
# Stop the master and be ready to perform the rewind
system_or_bail("pg_ctl -w -D $test_standby_datadir stop -m fast >>$log_path 2>&1");
my $result =
run(['./pg_rewind',
"--debug",
"--source-pgdata=$test_standby_datadir",
"--target-pgdata=$test_master_datadir"],
'>>', $log_path, '2>&1');
ok ($result, 'pg_rewind local');
}
elsif ($test_mode eq "remote")
{
# Do rewind using a remote connection as source
my $result =
run(['./pg_rewind',
"--source-server=\"port=$port_standby dbname=postgres\"",
"--target-pgdata=$test_master_datadir"],
'>>', $log_path, '2>&1');
ok ($result, 'pg_rewind remote');
} else {
# Cannot come here normally
die("Incorrect test mode specified");
}
# Now move back postgresql.conf with old settings
move("$testroot/master-postgresql.conf.tmp", "$test_master_datadir/postgresql.conf");
# Plug-in rewound node to the now-promoted standby node
append_to_file("$test_master_datadir/recovery.conf", qq(
primary_conninfo='port=$port_standby'
standby_mode=on
recovery_target_timeline='latest'
));
# Restart the master to check that rewind went correctly
system_or_bail("pg_ctl -w -D $test_master_datadir -o \"-k $tempdir_short --listen-addresses='' -p $port_master\" start >>$log_path 2>&1");
#### Now run the test-specific parts to check the result
}
# Clean up after the test. Stop both servers, if they're still running.
END
{
my $save_rc = $?;
if ($test_master_datadir)
{
system "pg_ctl -D $test_master_datadir -s -m immediate stop 2> /dev/null";
}
if ($test_standby_datadir)
{
system "pg_ctl -D $test_standby_datadir -s -m immediate stop 2> /dev/null";
}
$? = $save_rc;
}

@@ -0,0 +1,261 @@
/*-------------------------------------------------------------------------
*
* copy_fetch.c
* Functions for using a data directory as the source.
*
* Portions Copyright (c) 2013-2015, PostgreSQL Global Development Group
*
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <dirent.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include "datapagemap.h"
#include "fetch.h"
#include "file_ops.h"
#include "filemap.h"
#include "logging.h"
#include "pg_rewind.h"
#include "catalog/catalog.h"
static void recurse_dir(const char *datadir, const char *path,
process_file_callback_t callback);
static void execute_pagemap(datapagemap_t *pagemap, const char *path);
/*
* Traverse through all files in a data directory, calling 'callback'
* for each file.
*/
void
traverse_datadir(const char *datadir, process_file_callback_t callback)
{
recurse_dir(datadir, NULL, callback);
}
/*
* recursive part of traverse_datadir
*/
static void
recurse_dir(const char *datadir, const char *parentpath,
process_file_callback_t callback)
{
DIR *xldir;
struct dirent *xlde;
char fullparentpath[MAXPGPATH];
if (parentpath)
snprintf(fullparentpath, MAXPGPATH, "%s/%s", datadir, parentpath);
else
snprintf(fullparentpath, MAXPGPATH, "%s", datadir);
xldir = opendir(fullparentpath);
if (xldir == NULL)
pg_fatal("could not open directory \"%s\": %s\n",
fullparentpath, strerror(errno));
while (errno = 0, (xlde = readdir(xldir)) != NULL)
{
struct stat fst;
char fullpath[MAXPGPATH];
char path[MAXPGPATH];
if (strcmp(xlde->d_name, ".") == 0 ||
strcmp(xlde->d_name, "..") == 0)
continue;
snprintf(fullpath, MAXPGPATH, "%s/%s", fullparentpath, xlde->d_name);
if (lstat(fullpath, &fst) < 0)
{
pg_log(PG_WARNING, "could not stat file \"%s\": %s",
fullpath, strerror(errno));
/*
 * This is ok, if the new master is running and the file was just
 * removed. If it was a data file, there should be a WAL record of
 * the removal. If it was something else, it couldn't have been
 * critical anyway.
 *
 * TODO: But complain if we're processing the target dir!
 */
continue;	/* skip the entry; 'fst' was never filled in */
}
if (parentpath)
snprintf(path, MAXPGPATH, "%s/%s", parentpath, xlde->d_name);
else
snprintf(path, MAXPGPATH, "%s", xlde->d_name);
if (S_ISREG(fst.st_mode))
callback(path, FILE_TYPE_REGULAR, fst.st_size, NULL);
else if (S_ISDIR(fst.st_mode))
{
callback(path, FILE_TYPE_DIRECTORY, 0, NULL);
/* recurse to handle subdirectories */
recurse_dir(datadir, path, callback);
}
#ifndef WIN32
else if (S_ISLNK(fst.st_mode))
#else
else if (pgwin32_is_junction(fullpath))
#endif
{
#if defined(HAVE_READLINK) || defined(WIN32)
char link_target[MAXPGPATH];
ssize_t len;
len = readlink(fullpath, link_target, sizeof(link_target) - 1);
if (len == -1)
pg_fatal("readlink() failed on \"%s\": %s\n",
fullpath, strerror(errno));
if (len == sizeof(link_target) - 1)
{
/* path was truncated */
pg_fatal("symbolic link \"%s\" target path too long\n",
fullpath);
}
callback(path, FILE_TYPE_SYMLINK, 0, link_target);
/*
* If it's a symlink within pg_tblspc, we need to recurse into it,
* to process all the tablespaces.
*/
if (parentpath && strcmp(parentpath, "pg_tblspc") == 0)
recurse_dir(datadir, path, callback);
#else
pg_fatal("\"%s\" is a symbolic link, but symbolic links are not supported on this platform\n",
fullpath);
#endif /* HAVE_READLINK */
}
}
if (errno)
pg_fatal("could not read directory \"%s\": %s\n",
fullparentpath, strerror(errno));
if (closedir(xldir))
pg_fatal("could not close archive location \"%s\": %s\n",
fullparentpath, strerror(errno));
}
/*
* Copy a file from source to target, between 'begin' and 'end' offsets.
*
* If 'trunc' is true, any existing file with the same name is truncated.
*/
static void
copy_file_range(const char *path, off_t begin, off_t end, bool trunc)
{
char buf[BLCKSZ];
char srcpath[MAXPGPATH];
int srcfd;
snprintf(srcpath, sizeof(srcpath), "%s/%s", datadir_source, path);
srcfd = open(srcpath, O_RDONLY | PG_BINARY, 0);
if (srcfd < 0)
pg_fatal("could not open source file \"%s\": %s\n",
srcpath, strerror(errno));
if (lseek(srcfd, begin, SEEK_SET) == -1)
pg_fatal("could not seek in source file: %s\n", strerror(errno));
open_target_file(path, trunc);
while (end - begin > 0)
{
int readlen;
int len;
if (end - begin > sizeof(buf))
len = sizeof(buf);
else
len = end - begin;
readlen = read(srcfd, buf, len);
if (readlen < 0)
pg_fatal("could not read file \"%s\": %s\n",
srcpath, strerror(errno));
else if (readlen == 0)
pg_fatal("unexpected EOF while reading file \"%s\"\n", srcpath);
write_target_range(buf, begin, readlen);
begin += readlen;
}
if (close(srcfd) != 0)
pg_fatal("error closing file \"%s\": %s\n", srcpath, strerror(errno));
}
/*
* Copy all relation data files from datadir_source to datadir_target, which
* are marked in the given data page map.
*/
void
copy_executeFileMap(filemap_t *map)
{
file_entry_t *entry;
int i;
for (i = 0; i < map->narray; i++)
{
entry = map->array[i];
execute_pagemap(&entry->pagemap, entry->path);
switch (entry->action)
{
case FILE_ACTION_NONE:
/* ok, do nothing.. */
break;
case FILE_ACTION_COPY:
copy_file_range(entry->path, 0, entry->newsize, true);
break;
case FILE_ACTION_TRUNCATE:
truncate_target_file(entry->path, entry->newsize);
break;
case FILE_ACTION_COPY_TAIL:
copy_file_range(entry->path, entry->oldsize, entry->newsize, false);
break;
case FILE_ACTION_CREATE:
create_target(entry);
break;
case FILE_ACTION_REMOVE:
remove_target(entry);
break;
}
}
close_target_file();
}
static void
execute_pagemap(datapagemap_t *pagemap, const char *path)
{
datapagemap_iterator_t *iter;
BlockNumber blkno;
off_t offset;
iter = datapagemap_iterate(pagemap);
while (datapagemap_next(iter, &blkno))
{
offset = blkno * BLCKSZ;
copy_file_range(path, offset, offset + BLCKSZ, false);
/* Ok, this block has now been copied from new data dir to old */
}
free(iter);
}

@@ -0,0 +1,126 @@
/*-------------------------------------------------------------------------
*
* datapagemap.c
* A data structure for keeping track of data pages that have changed.
*
* This is a fairly simple bitmap.
*
* Copyright (c) 2013-2015, PostgreSQL Global Development Group
*
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include "datapagemap.h"
struct datapagemap_iterator
{
datapagemap_t *map;
BlockNumber nextblkno;
};
/*****
* Public functions
*/
/*
* Add a block to the bitmap.
*/
void
datapagemap_add(datapagemap_t *map, BlockNumber blkno)
{
int offset;
int bitno;
offset = blkno / 8;
bitno = blkno % 8;
/* enlarge or create bitmap if needed */
if (map->bitmapsize <= offset)
{
int oldsize = map->bitmapsize;
int newsize;
/*
* The minimum to hold the new bit is offset + 1. But add some
* headroom, so that we don't need to repeatedly enlarge the bitmap in
* the common case that blocks are modified in order, from beginning
* of a relation to the end.
*/
newsize = offset + 1;
newsize += 10;
map->bitmap = pg_realloc(map->bitmap, newsize);
/* zero out the newly allocated region */
memset(&map->bitmap[oldsize], 0, newsize - oldsize);
map->bitmapsize = newsize;
}
/* Set the bit */
map->bitmap[offset] |= (1 << bitno);
}
/*
* Start iterating through all entries in the page map.
*
* After datapagemap_iterate, call datapagemap_next to return the entries,
* until it returns false. After you're done, use free() to destroy the
* iterator.
*/
datapagemap_iterator_t *
datapagemap_iterate(datapagemap_t *map)
{
datapagemap_iterator_t *iter;
iter = pg_malloc(sizeof(datapagemap_iterator_t));
iter->map = map;
iter->nextblkno = 0;
return iter;
}
bool
datapagemap_next(datapagemap_iterator_t *iter, BlockNumber *blkno)
{
datapagemap_t *map = iter->map;
for (;;)
{
BlockNumber blk = iter->nextblkno;
int nextoff = blk / 8;
int bitno = blk % 8;
if (nextoff >= map->bitmapsize)
break;
iter->nextblkno++;
if (map->bitmap[nextoff] & (1 << bitno))
{
*blkno = blk;
return true;
}
}
/* no more set bits in this bitmap. */
return false;
}
/*
* A debugging aid. Prints out the contents of the page map.
*/
void
datapagemap_print(datapagemap_t *map)
{
datapagemap_iterator_t *iter;
BlockNumber blocknum;
iter = datapagemap_iterate(map);
while (datapagemap_next(iter, &blocknum))
printf(" blk %u\n", blocknum);
free(iter);
}

@@ -0,0 +1,32 @@
/*-------------------------------------------------------------------------
*
* datapagemap.h
*
* Copyright (c) 2013-2015, PostgreSQL Global Development Group
*
*-------------------------------------------------------------------------
*/
#ifndef DATAPAGEMAP_H
#define DATAPAGEMAP_H
#include "storage/relfilenode.h"
#include "storage/block.h"
struct datapagemap
{
char *bitmap;
int bitmapsize;
};
typedef struct datapagemap datapagemap_t;
typedef struct datapagemap_iterator datapagemap_iterator_t;
extern datapagemap_t *datapagemap_create(void);
extern void datapagemap_destroy(datapagemap_t *map);
extern void datapagemap_add(datapagemap_t *map, BlockNumber blkno);
extern datapagemap_iterator_t *datapagemap_iterate(datapagemap_t *map);
extern bool datapagemap_next(datapagemap_iterator_t *iter, BlockNumber *blkno);
extern void datapagemap_print(datapagemap_t *map);
#endif /* DATAPAGEMAP_H */
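
Since datapagemap_add() grows the bitmap with pg_realloc(), a zero-initialized struct (exactly what filemap.c sets up with bitmap = NULL and bitmapsize = 0) is already a valid empty map. A minimal usage sketch against the API above, not part of the commit:

#include "postgres_fe.h"
#include "datapagemap.h"

static void
pagemap_demo(void)
{
	datapagemap_t map = {NULL, 0};	/* a zeroed struct is an empty map */
	datapagemap_iterator_t *iter;
	BlockNumber blkno;

	datapagemap_add(&map, 42);
	datapagemap_add(&map, 7);
	datapagemap_add(&map, 42);		/* setting the same bit twice is harmless */

	iter = datapagemap_iterate(&map);
	while (datapagemap_next(iter, &blkno))
		printf("changed block %u\n", blkno);	/* prints 7, then 42 */
	free(iter);						/* per the comments above, free() destroys it */
	pg_free(map.bitmap);
}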

src/bin/pg_rewind/fetch.c
@@ -0,0 +1,61 @@
/*-------------------------------------------------------------------------
*
* fetch.c
* Functions for fetching files from a local or remote data dir
*
* This file forms an abstraction of getting files from the "source".
* There are two implementations of this interface: one for copying files
* from a data directory via normal filesystem operations (copy_fetch.c),
* and another for fetching files from a remote server via a libpq
* connection (libpq_fetch.c)
*
*
* Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
*
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include "pg_rewind.h"
#include "fetch.h"
#include "file_ops.h"
#include "filemap.h"
void
fetchRemoteFileList(void)
{
if (datadir_source)
traverse_datadir(datadir_source, &process_remote_file);
else
libpqProcessFileList();
}
/*
* Fetch all relation data files that are marked in the given data page map.
*/
void
executeFileMap(void)
{
if (datadir_source)
copy_executeFileMap(filemap);
else
libpq_executeFileMap(filemap);
}
/*
* Fetch a single file into a malloc'd buffer. The file size is returned
* in *filesize. The returned buffer is always zero-terminated, which is
* handy for text files.
*/
char *
fetchFile(char *filename, size_t *filesize)
{
if (datadir_source)
return slurpFile(datadir_source, filename, filesize);
else
return libpqGetFile(filename, filesize);
}

src/bin/pg_rewind/fetch.h
@@ -0,0 +1,46 @@
/*-------------------------------------------------------------------------
*
* fetch.h
* Fetching data from a local or remote data directory.
*
* This file includes the prototypes for functions used to copy files from
* one data directory to another. The source to copy from can be a local
* directory (copy method), or a remote PostgreSQL server (libpq fetch
* method).
*
* Copyright (c) 2013-2015, PostgreSQL Global Development Group
*
*-------------------------------------------------------------------------
*/
#ifndef FETCH_H
#define FETCH_H
#include "c.h"
#include "access/xlogdefs.h"
#include "filemap.h"
/*
* Common interface. Calls the copy or libpq method depending on global
* config options.
*/
extern void fetchRemoteFileList(void);
extern char *fetchFile(char *filename, size_t *filesize);
extern void executeFileMap(void);
/* in libpq_fetch.c */
extern void libpqProcessFileList(void);
extern char *libpqGetFile(const char *filename, size_t *filesize);
extern void libpq_executeFileMap(filemap_t *map);
extern void libpqConnect(const char *connstr);
extern XLogRecPtr libpqGetCurrentXlogInsertLocation(void);
/* in copy_fetch.c */
extern void copy_executeFileMap(filemap_t *map);
typedef void (*process_file_callback_t) (const char *path, file_type_t type, size_t size, const char *link_target);
extern void traverse_datadir(const char *datadir, process_file_callback_t callback);
#endif /* FETCH_H */
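
A hypothetical sketch, not in the commit, of the callback side of this interface: a process_file_callback_t that merely tallies regular files, plugged into traverse_datadir() the same way filemap.c's process_remote_file() and process_local_file() are:

#include "postgres_fe.h"
#include "fetch.h"

static int		nfiles = 0;
static size_t	total_bytes = 0;

/* signature matches process_file_callback_t */
static void
count_file(const char *path, file_type_t type, size_t size,
		   const char *link_target)
{
	if (type == FILE_TYPE_REGULAR)
	{
		nfiles++;
		total_bytes += size;
	}
}

/* usage: traverse_datadir("/path/to/pgdata", &count_file); */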

@@ -0,0 +1,305 @@
/*-------------------------------------------------------------------------
*
* file_ops.c
* Helper functions for operating on files.
*
* Most of the functions in this file are helper functions for writing to
* the target data directory. The functions check the --dry-run flag, and
* do nothing if it's enabled. You should avoid accessing the target files
* directly but if you do, make sure you honor the --dry-run mode!
*
* Portions Copyright (c) 2013-2015, PostgreSQL Global Development Group
*
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include "file_ops.h"
#include "filemap.h"
#include "logging.h"
#include "pg_rewind.h"
/*
* Currently open destination file.
*/
static int dstfd = -1;
static char dstpath[MAXPGPATH] = "";
static void remove_target_file(const char *path);
static void create_target_dir(const char *path);
static void remove_target_dir(const char *path);
static void create_target_symlink(const char *path, const char *link);
static void remove_target_symlink(const char *path);
/*
* Open a target file for writing. If 'trunc' is true and the file already
* exists, it will be truncated.
*/
void
open_target_file(const char *path, bool trunc)
{
int mode;
if (dry_run)
return;
if (dstfd != -1 && !trunc &&
strcmp(path, &dstpath[strlen(datadir_target) + 1]) == 0)
return; /* already open */
close_target_file();
snprintf(dstpath, sizeof(dstpath), "%s/%s", datadir_target, path);
mode = O_WRONLY | O_CREAT | PG_BINARY;
if (trunc)
mode |= O_TRUNC;
dstfd = open(dstpath, mode, 0600);
if (dstfd < 0)
pg_fatal("could not open destination file \"%s\": %s\n",
dstpath, strerror(errno));
}
/*
* Close target file, if it's open.
*/
void
close_target_file(void)
{
if (dstfd == -1)
return;
if (close(dstfd) != 0)
pg_fatal("error closing destination file \"%s\": %s\n",
dstpath, strerror(errno));
dstfd = -1;
/* fsync? */
}
void
write_target_range(char *buf, off_t begin, size_t size)
{
int writeleft;
char *p;
/* update progress report */
fetch_done += size;
progress_report(false);
if (dry_run)
return;
if (lseek(dstfd, begin, SEEK_SET) == -1)
pg_fatal("could not seek in destination file \"%s\": %s\n",
dstpath, strerror(errno));
writeleft = size;
p = buf;
while (writeleft > 0)
{
int writelen;
writelen = write(dstfd, p, writeleft);
if (writelen < 0)
pg_fatal("could not write file \"%s\": %s\n",
dstpath, strerror(errno));
p += writelen;
writeleft -= writelen;
}
/* keep the file open, in case we need to copy more blocks in it */
}
void
remove_target(file_entry_t *entry)
{
Assert(entry->action == FILE_ACTION_REMOVE);
switch (entry->type)
{
case FILE_TYPE_DIRECTORY:
remove_target_dir(entry->path);
break;
case FILE_TYPE_REGULAR:
remove_target_file(entry->path);
break;
case FILE_TYPE_SYMLINK:
remove_target_symlink(entry->path);
break;
}
}
void
create_target(file_entry_t *entry)
{
Assert(entry->action == FILE_ACTION_CREATE);
switch (entry->type)
{
case FILE_TYPE_DIRECTORY:
create_target_dir(entry->path);
break;
case FILE_TYPE_SYMLINK:
create_target_symlink(entry->path, entry->link_target);
break;
case FILE_TYPE_REGULAR:
/* can't happen. Regular files are created with open_target_file. */
pg_fatal("invalid action (CREATE) for regular file\n");
break;
}
}
static void
remove_target_file(const char *path)
{
char dstpath[MAXPGPATH];
if (dry_run)
return;
snprintf(dstpath, sizeof(dstpath), "%s/%s", datadir_target, path);
if (unlink(dstpath) != 0)
pg_fatal("could not remove file \"%s\": %s\n",
dstpath, strerror(errno));
}
void
truncate_target_file(const char *path, off_t newsize)
{
char dstpath[MAXPGPATH];
int fd;
if (dry_run)
return;
snprintf(dstpath, sizeof(dstpath), "%s/%s", datadir_target, path);
fd = open(dstpath, O_WRONLY, 0);
if (fd < 0)
pg_fatal("could not open file \"%s\" for truncation: %s\n",
dstpath, strerror(errno));
if (ftruncate(fd, newsize) != 0)
pg_fatal("could not truncate file \"%s\" to %u bytes: %s\n",
dstpath, (unsigned int) newsize, strerror(errno));
close(fd);
}
static void
create_target_dir(const char *path)
{
char dstpath[MAXPGPATH];
if (dry_run)
return;
snprintf(dstpath, sizeof(dstpath), "%s/%s", datadir_target, path);
if (mkdir(dstpath, S_IRWXU) != 0)
pg_fatal("could not create directory \"%s\": %s\n",
dstpath, strerror(errno));
}
static void
remove_target_dir(const char *path)
{
char dstpath[MAXPGPATH];
if (dry_run)
return;
snprintf(dstpath, sizeof(dstpath), "%s/%s", datadir_target, path);
if (rmdir(dstpath) != 0)
pg_fatal("could not remove directory \"%s\": %s\n",
dstpath, strerror(errno));
}
static void
create_target_symlink(const char *path, const char *link)
{
char dstpath[MAXPGPATH];
if (dry_run)
return;
snprintf(dstpath, sizeof(dstpath), "%s/%s", datadir_target, path);
if (symlink(link, dstpath) != 0)
pg_fatal("could not create symbolic link at \"%s\": %s\n",
dstpath, strerror(errno));
}
static void
remove_target_symlink(const char *path)
{
char dstpath[MAXPGPATH];
if (dry_run)
return;
snprintf(dstpath, sizeof(dstpath), "%s/%s", datadir_target, path);
if (unlink(dstpath) != 0)
pg_fatal("could not remove symbolic link \"%s\": %s\n",
dstpath, strerror(errno));
}
/*
* Read a file into memory. The file to be read is <datadir>/<path>.
* The file contents are returned in a malloc'd buffer, and *filesize
* is set to the length of the file.
*
* The returned buffer is always zero-terminated; the size of the returned
* buffer is actually *filesize + 1. That's handy when reading a text file.
* This function can be used to read binary files as well; you can just
* ignore the zero-terminator in that case.
*
* This function is used to implement the fetchFile function in the "fetch"
* interface (see fetch.c), but is also called directly.
*/
char *
slurpFile(const char *datadir, const char *path, size_t *filesize)
{
int fd;
char *buffer;
struct stat statbuf;
char fullpath[MAXPGPATH];
int len;
snprintf(fullpath, sizeof(fullpath), "%s/%s", datadir, path);
if ((fd = open(fullpath, O_RDONLY | PG_BINARY, 0)) == -1)
pg_fatal("could not open file \"%s\" for reading: %s\n",
fullpath, strerror(errno));
if (fstat(fd, &statbuf) < 0)
pg_fatal("could not open file \"%s\" for reading: %s\n",
fullpath, strerror(errno));
len = statbuf.st_size;
buffer = pg_malloc(len + 1);
if (read(fd, buffer, len) != len)
pg_fatal("could not read file \"%s\": %s\n",
fullpath, strerror(errno));
close(fd);
/* Zero-terminate the buffer. */
buffer[len] = '\0';
if (filesize)
*filesize = len;
return buffer;
}
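
A short usage sketch, not part of the commit, relying on the zero-terminator described above; PG_VERSION is a convenient one-line text file present in every data directory:

	size_t		size;
	char	   *buf;

	buf = slurpFile(datadir_target, "PG_VERSION", &size);
	printf("target cluster version: %s", buf);	/* e.g. "9.5\n" */
	pg_free(buf);		/* the buffer comes from pg_malloc */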

@@ -0,0 +1,24 @@
/*-------------------------------------------------------------------------
*
* file_ops.h
* Helper functions for operating on files
*
* Copyright (c) 2013-2015, PostgreSQL Global Development Group
*
*-------------------------------------------------------------------------
*/
#ifndef FILE_OPS_H
#define FILE_OPS_H
#include "filemap.h"
extern void open_target_file(const char *path, bool trunc);
extern void write_target_range(char *buf, off_t begin, size_t size);
extern void close_target_file(void);
extern void truncate_target_file(const char *path, off_t newsize);
extern void create_target(file_entry_t *t);
extern void remove_target(file_entry_t *t);
extern char *slurpFile(const char *datadir, const char *path, size_t *filesize);
#endif /* FILE_OPS_H */

src/bin/pg_rewind/filemap.c
@@ -0,0 +1,667 @@
/*-------------------------------------------------------------------------
*
* filemap.c
* A data structure for keeping track of files that have changed.
*
* Copyright (c) 2013-2015, PostgreSQL Global Development Group
*
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include "datapagemap.h"
#include "filemap.h"
#include "logging.h"
#include "pg_rewind.h"
#include "common/string.h"
#include "catalog/pg_tablespace.h"
#include "storage/fd.h"
filemap_t *filemap = NULL;
static bool isRelDataFile(const char *path);
static char *datasegpath(RelFileNode rnode, ForkNumber forknum,
BlockNumber segno);
static int path_cmp(const void *a, const void *b);
static int final_filemap_cmp(const void *a, const void *b);
static void filemap_list_to_array(void);
/*
* Create a new file map.
*/
filemap_t *
filemap_create(void)
{
filemap_t *map;
map = pg_malloc(sizeof(filemap_t));
map->first = map->last = NULL;
map->nlist = 0;
map->array = NULL;
map->narray = 0;
Assert(filemap == NULL);
filemap = map;
return map;
}
/*
* Callback for processing remote file list.
*
* This is called once for every file in the source server. We decide what
* action needs to be taken for the file, depending on whether the file
* exists in the target and whether the size matches.
*/
void
process_remote_file(const char *path, file_type_t type, size_t newsize,
const char *link_target)
{
bool exists;
char localpath[MAXPGPATH];
struct stat statbuf;
filemap_t *map = filemap;
file_action_t action = FILE_ACTION_NONE;
size_t oldsize = 0;
file_entry_t *entry;
Assert(map->array == NULL);
/*
* Completely ignore some special files in source and destination.
*/
if (strcmp(path, "postmaster.pid") == 0 ||
strcmp(path, "postmaster.opts") == 0)
return;
/*
* Skip temporary files, .../pgsql_tmp/... and .../pgsql_tmp.* in source.
* This has the effect that all temporary files in the destination will be
* removed.
*/
if (strstr(path, "/" PG_TEMP_FILE_PREFIX) != NULL)
return;
if (strstr(path, "/" PG_TEMP_FILES_DIR "/") != NULL)
return;
/*
* sanity check: a filename that looks like a data file better be a
* regular file
*/
if (type != FILE_TYPE_REGULAR && isRelDataFile(path))
pg_fatal("data file in source \"%s\" is not a regular file\n", path);
snprintf(localpath, sizeof(localpath), "%s/%s", datadir_target, path);
/* Does the corresponding local file exist? */
if (lstat(localpath, &statbuf) < 0)
{
if (errno != ENOENT)
pg_fatal("could not stat file \"%s\": %s\n",
localpath, strerror(errno));
exists = false;
}
else
exists = true;
switch (type)
{
case FILE_TYPE_DIRECTORY:
if (exists && !S_ISDIR(statbuf.st_mode))
{
/* it's a directory in the source, but not in the target. Strange.. */
pg_fatal("\"%s\" is not a directory\n", localpath);
}
if (!exists)
action = FILE_ACTION_CREATE;
else
action = FILE_ACTION_NONE;
oldsize = 0;
break;
case FILE_TYPE_SYMLINK:
if (exists &&
#ifndef WIN32
!S_ISLNK(statbuf.st_mode)
#else
!pgwin32_is_junction(localpath)
#endif
)
{
/*
* It's a symbolic link in the source, but not in the target.
* Strange..
*/
pg_fatal("\"%s\" is not a symbolic link\n", localpath);
}
if (!exists)
action = FILE_ACTION_CREATE;
else
action = FILE_ACTION_NONE;
oldsize = 0;
break;
case FILE_TYPE_REGULAR:
if (exists && !S_ISREG(statbuf.st_mode))
pg_fatal("\"%s\" is not a regular file\n", localpath);
if (!exists || !isRelDataFile(path))
{
/*
* File exists in source, but not in target. Or it's a
* non-data file that we have no special processing for. Copy
* it in toto.
*
* An exception: PG_VERSIONs should be identical, but avoid
* overwriting it for paranoia.
*/
if (pg_str_endswith(path, "PG_VERSION"))
{
action = FILE_ACTION_NONE;
oldsize = statbuf.st_size;
}
else
{
action = FILE_ACTION_COPY;
oldsize = 0;
}
}
else
{
/*
* It's a data file that exists in both.
*
* If it's larger in target, we can truncate it. There will
* also be a WAL record of the truncation in the source
* system, so WAL replay would eventually truncate the target
* too, but we might as well do it now.
*
* If it's smaller in the target, it means that it has been
* truncated in the target, or enlarged in the source, or
* both. If it was truncated locally, we need to copy the
* missing tail from the remote system. If it was enlarged in
* the remote system, there will be WAL records in the remote
* system for the new blocks, so we wouldn't need to copy them
* here. But we don't know which scenario we're dealing with,
* and there's no harm in copying the missing blocks now, so
* do it now.
*
* If it's the same size, do nothing here. Any locally
* modified blocks will be copied based on parsing the local
* WAL, and any remotely modified blocks will be updated after
* rewinding, when the remote WAL is replayed.
*/
oldsize = statbuf.st_size;
if (oldsize < newsize)
action = FILE_ACTION_COPY_TAIL;
else if (oldsize > newsize)
action = FILE_ACTION_TRUNCATE;
else
action = FILE_ACTION_NONE;
}
break;
}
/* Create a new entry for this file */
entry = pg_malloc(sizeof(file_entry_t));
entry->path = pg_strdup(path);
entry->type = type;
entry->action = action;
entry->oldsize = oldsize;
entry->newsize = newsize;
entry->link_target = link_target ? pg_strdup(link_target) : NULL;
entry->next = NULL;
entry->pagemap.bitmap = NULL;
entry->pagemap.bitmapsize = 0;
entry->isrelfile = isRelDataFile(path);
if (map->last)
{
map->last->next = entry;
map->last = entry;
}
else
map->first = map->last = entry;
map->nlist++;
}
/*
* Callback for processing local file list.
*
* All remote files must be already processed before calling this. This only
* marks local files that didn't exist in the remote system for deletion.
*/
void
process_local_file(const char *path, file_type_t type, size_t oldsize,
const char *link_target)
{
bool exists;
char localpath[MAXPGPATH];
struct stat statbuf;
file_entry_t key;
file_entry_t *key_ptr;
filemap_t *map = filemap;
file_entry_t *entry;
snprintf(localpath, sizeof(localpath), "%s/%s", datadir_target, path);
if (lstat(localpath, &statbuf) < 0)
{
if (errno != ENOENT)
pg_fatal("could not stat file \"%s\": %s",
localpath, strerror(errno));
exists = false;
}
if (map->array == NULL)
{
/* on first call, initialize lookup array */
if (map->nlist == 0)
{
/* should not happen */
pg_fatal("remote file list is empty\n");
}
filemap_list_to_array();
qsort(map->array, map->narray, sizeof(file_entry_t *), path_cmp);
}
/*
* Completely ignore some special files
*/
if (strcmp(path, "postmaster.pid") == 0 ||
strcmp(path, "postmaster.opts") == 0)
return;
key.path = (char *) path;
key_ptr = &key;
exists = bsearch(&key_ptr, map->array, map->narray, sizeof(file_entry_t *),
path_cmp) != NULL;
/* Remove any file or folder that doesn't exist in the remote system. */
if (!exists)
{
entry = pg_malloc(sizeof(file_entry_t));
entry->path = pg_strdup(path);
entry->type = type;
entry->action = FILE_ACTION_REMOVE;
entry->oldsize = oldsize;
entry->newsize = 0;
entry->link_target = link_target ? pg_strdup(link_target) : NULL;
entry->next = NULL;
entry->pagemap.bitmap = NULL;
entry->pagemap.bitmapsize = 0;
entry->isrelfile = isRelDataFile(path);
if (map->last == NULL)
map->first = entry;
else
map->last->next = entry;
map->last = entry;
map->nlist++;
}
else
{
/*
* We already handled all files that exist in the remote system in
* process_remote_file().
*/
}
}
/*
* This callback gets called while we read the old WAL, for every block that
* has changed in the local system. It makes note of all the changed blocks
* in the pagemap of the file.
*/
void
process_block_change(ForkNumber forknum, RelFileNode rnode, BlockNumber blkno)
{
char *path;
file_entry_t key;
file_entry_t *key_ptr;
file_entry_t *entry;
BlockNumber blkno_inseg;
int segno;
filemap_t *map = filemap;
file_entry_t **e;
Assert(filemap->array);
segno = blkno / RELSEG_SIZE;
blkno_inseg = blkno % RELSEG_SIZE;
path = datasegpath(rnode, forknum, segno);
key.path = (char *) path;
key_ptr = &key;
e = bsearch(&key_ptr, map->array, map->narray, sizeof(file_entry_t *),
path_cmp);
if (e)
entry = *e;
else
entry = NULL;
free(path);
if (entry)
{
Assert(entry->isrelfile);
switch (entry->action)
{
case FILE_ACTION_NONE:
case FILE_ACTION_TRUNCATE:
/* skip if we're truncating away the modified block anyway */
if ((blkno_inseg + 1) * BLCKSZ <= entry->newsize)
datapagemap_add(&entry->pagemap, blkno_inseg);
break;
case FILE_ACTION_COPY_TAIL:
/*
* skip the modified block if it is part of the "tail" that
* we're copying anyway.
*/
if ((blkno_inseg + 1) * BLCKSZ <= entry->oldsize)
datapagemap_add(&entry->pagemap, blkno_inseg);
break;
case FILE_ACTION_COPY:
case FILE_ACTION_REMOVE:
break;
case FILE_ACTION_CREATE:
pg_fatal("unexpected page modification for directory or symbolic link \"%s\"\n", entry->path);
}
}
else
{
/*
* If we don't have any record of this file in the file map, it means
* that it's a relation that doesn't exist in the remote system, and
* it was subsequently removed in the local system, too. We can safely
* ignore it.
*/
}
}
/*
* Convert the linked list of entries in filemap->first/last to the array,
* filemap->array.
*/
static void
filemap_list_to_array(void)
{
int narray;
file_entry_t *entry,
*next;
filemap->array =
	pg_realloc(filemap->array,
			   (filemap->nlist + filemap->narray) * sizeof(file_entry_t *));
narray = filemap->narray;
for (entry = filemap->first; entry != NULL; entry = next)
{
filemap->array[narray++] = entry;
next = entry->next;
entry->next = NULL;
}
Assert(narray == filemap->nlist + filemap->narray);
filemap->narray = narray;
filemap->nlist = 0;
filemap->first = filemap->last = NULL;
}
void
filemap_finalize(void)
{
filemap_list_to_array();
qsort(filemap->array, filemap->narray, sizeof(file_entry_t *),
final_filemap_cmp);
}
static const char *
action_to_str(file_action_t action)
{
switch (action)
{
case FILE_ACTION_NONE:
return "NONE";
case FILE_ACTION_COPY:
return "COPY";
case FILE_ACTION_TRUNCATE:
return "TRUNCATE";
case FILE_ACTION_COPY_TAIL:
return "COPY_TAIL";
case FILE_ACTION_CREATE:
return "CREATE";
case FILE_ACTION_REMOVE:
return "REMOVE";
default:
return "unknown";
}
}
/*
* Calculate the totals needed for progress reports.
*/
void
calculate_totals(void)
{
file_entry_t *entry;
int i;
filemap_t *map = filemap;
map->total_size = 0;
map->fetch_size = 0;
for (i = 0; i < filemap->narray; i++)
{
entry = filemap->array[i];
if (entry->type != FILE_TYPE_REGULAR)
continue;
map->total_size += entry->newsize;
if (entry->action == FILE_ACTION_COPY)
{
map->fetch_size += entry->newsize;
continue;
}
if (entry->action == FILE_ACTION_COPY_TAIL)
map->fetch_size += (entry->newsize - entry->oldsize);
if (entry->pagemap.bitmapsize > 0)
{
datapagemap_iterator_t *iter;
BlockNumber blk;
iter = datapagemap_iterate(&entry->pagemap);
while (datapagemap_next(iter, &blk))
map->fetch_size += BLCKSZ;
pg_free(iter);
}
}
}
void
print_filemap(void)
{
file_entry_t *entry;
int i;
for (i = 0; i < filemap->narray; i++)
{
entry = filemap->array[i];
if (entry->action != FILE_ACTION_NONE ||
entry->pagemap.bitmapsize > 0)
{
printf("%s (%s)\n", entry->path, action_to_str(entry->action));
if (entry->pagemap.bitmapsize > 0)
datapagemap_print(&entry->pagemap);
}
}
fflush(stdout);
}
/*
* Does it look like a relation data file?
*
* For our purposes, only files belonging to the main fork are considered
* relation files. Other forks are always copied in toto, because we cannot
* reliably track changes to them, because WAL only contains block references
* for the main fork.
*/
static bool
isRelDataFile(const char *path)
{
char buf[20 + 1];
RelFileNode rnode;
unsigned int segNo;
int nmatch;
bool matched;
/*----
* Relation data files can be in one of the following directories:
*
* global/
* shared relations
*
* base/<db oid>/
* regular relations, default tablespace
*
* pg_tblspc/<tblspc oid>/PG_9.4_201403261/
* within a non-default tablespace (the name of the directory
* depends on version)
*
* And the relation data files themselves have a filename like:
*
* <oid>.<segment number>
*
*----
*/
rnode.spcNode = InvalidOid;
rnode.dbNode = InvalidOid;
rnode.relNode = InvalidOid;
segNo = 0;
matched = false;
nmatch = sscanf(path, "global/%u.%u", &rnode.relNode, &segNo);
if (nmatch == 1 || nmatch == 2)
{
rnode.spcNode = GLOBALTABLESPACE_OID;
rnode.dbNode = 0;
matched = true;
}
else
{
nmatch = sscanf(path, "base/%u/%u.%u",
&rnode.dbNode, &rnode.relNode, &segNo);
if (nmatch == 2 || nmatch == 3)
{
rnode.spcNode = DEFAULTTABLESPACE_OID;
matched = true;
}
else
{
nmatch = sscanf(path, "pg_tblspc/%u/PG_%20s/%u/%u.%u",
&rnode.spcNode, buf, &rnode.dbNode, &rnode.relNode,
&segNo);
if (nmatch == 4 || nmatch == 5)
matched = true;
}
}
/*
* The sscanf tests above can match files that have extra characters at
* the end, and the last check can also match a path belonging to a
* different version (different TABLESPACE_VERSION_DIRECTORY). To
* eliminate such cases, cross-check that GetRelationPath creates the
* exact same filename, when passed the RelFileNode information we
* extracted from the filename.
*/
if (matched)
{
char *check_path = datasegpath(rnode, MAIN_FORKNUM, segNo);
if (strcmp(check_path, path) != 0)
matched = false;
pfree(check_path);
}
return matched;
}
/*
* A helper function to create the path of a relation file and segment.
*
* The returned path is palloc'd
*/
static char *
datasegpath(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
{
char *path;
char *segpath;
path = relpathperm(rnode, forknum);
if (segno > 0)
{
segpath = psprintf("%s.%u", path, segno);
pfree(path);
return segpath;
}
else
return path;
}
static int
path_cmp(const void *a, const void *b)
{
file_entry_t *fa = *((file_entry_t **) a);
file_entry_t *fb = *((file_entry_t **) b);
return strcmp(fa->path, fb->path);
}
/*
* In the final stage, the filemap is sorted so that removals come last.
* From disk space usage point of view, it would be better to do removals
* first, but for now, safety first. If a whole directory is deleted, all
* files and subdirectories inside it need to be removed first. On creation,
* the parent directory needs to be created before the files and directories inside
* it. To achieve that, the file_action_t enum is ordered so that we can
* just sort on that first. Furthermore, sort REMOVE entries in reverse
* path order, so that "foo/bar" subdirectory is removed before "foo".
*/
static int
final_filemap_cmp(const void *a, const void *b)
{
file_entry_t *fa = *((file_entry_t **) a);
file_entry_t *fb = *((file_entry_t **) b);
if (fa->action > fb->action)
return 1;
if (fa->action < fb->action)
return -1;
if (fa->action == FILE_ACTION_REMOVE)
return -strcmp(fa->path, fb->path);
else
return strcmp(fa->path, fb->path);
}

src/bin/pg_rewind/filemap.h
@@ -0,0 +1,108 @@
/*-------------------------------------------------------------------------
*
* filemap.h
*
* Copyright (c) 2013-2015, PostgreSQL Global Development Group
*-------------------------------------------------------------------------
*/
#ifndef FILEMAP_H
#define FILEMAP_H
#include "storage/relfilenode.h"
#include "storage/block.h"
#include "datapagemap.h"
/*
* For every file found in the local or remote system, we have a file entry
* which says what we are going to do with the file. For relation files,
* there is also a page map, marking pages in the file that were changed
* locally.
*
* The enum values are sorted in the order we want actions to be processed.
*/
typedef enum
{
FILE_ACTION_CREATE, /* create local directory or symbolic link */
FILE_ACTION_COPY, /* copy whole file, overwriting if exists */
FILE_ACTION_COPY_TAIL, /* copy tail from 'oldsize' to 'newsize' */
FILE_ACTION_NONE, /* no action (we might still copy modified blocks
* based on the parsed WAL) */
FILE_ACTION_TRUNCATE, /* truncate local file to 'newsize' bytes */
FILE_ACTION_REMOVE, /* remove local file / directory / symlink */
} file_action_t;
typedef enum
{
FILE_TYPE_REGULAR,
FILE_TYPE_DIRECTORY,
FILE_TYPE_SYMLINK
} file_type_t;
struct file_entry_t
{
char *path;
file_type_t type;
file_action_t action;
/* for a regular file */
size_t oldsize;
size_t newsize;
bool isrelfile; /* is it a relation data file? */
datapagemap_t pagemap;
/* for a symlink */
char *link_target;
struct file_entry_t *next;
};
typedef struct file_entry_t file_entry_t;
struct filemap_t
{
/*
* New entries are accumulated to a linked list, in process_remote_file
* and process_local_file.
*/
file_entry_t *first;
file_entry_t *last;
int nlist;
/*
* After processing all the remote files, the entries in the linked list
* are moved to this array. After processing local files, too, all the
* local entries are added to the array by filemap_finalize, and sorted
* in the final order. After filemap_finalize, all the entries are in
* the array, and the linked list is empty.
*/
file_entry_t **array;
int narray;
/*
* Summary information. total_size is the total size of the source cluster,
* and fetch_size is the number of bytes that need to be copied.
*/
uint64 total_size;
uint64 fetch_size;
};
typedef struct filemap_t filemap_t;
extern filemap_t * filemap;
extern filemap_t *filemap_create(void);
extern void calculate_totals(void);
extern void print_filemap(void);
/* Functions for populating the filemap */
extern void process_remote_file(const char *path, file_type_t type, size_t newsize, const char *link_target);
extern void process_local_file(const char *path, file_type_t type, size_t newsize, const char *link_target);
extern void process_block_change(ForkNumber forknum, RelFileNode rnode, BlockNumber blkno);
extern void filemap_finalize(void);
#endif /* FILEMAP_H */
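
Read together with fetch.h, the expected calling sequence is roughly the following sketch (the actual driver, pg_rewind.c, is not part of this excerpt). The order matters: process_local_file() requires the remote entries to be in the map already, and filemap_finalize() sorts everything into execution order:

	filemap_create();

	/* 1. source side: one process_remote_file() call per file */
	fetchRemoteFileList();

	/* 2. target side: mark files missing from the source for removal */
	traverse_datadir(datadir_target, &process_local_file);

	/* 3. WAL parsing calls process_block_change() for each touched block */

	filemap_finalize();
	calculate_totals();
	print_filemap();		/* presumably only under --debug */

	/* 4. carry out the copy/truncate/remove decisions */
	executeFileMap();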

@@ -0,0 +1,464 @@
/*-------------------------------------------------------------------------
*
* libpq_fetch.c
* Functions for fetching files from a remote server.
*
* Copyright (c) 2013-2015, PostgreSQL Global Development Group
*
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <dirent.h>
#include <fcntl.h>
#include <unistd.h>
/* for ntohl/htonl */
#include <netinet/in.h>
#include <arpa/inet.h>
#include "pg_rewind.h"
#include "datapagemap.h"
#include "fetch.h"
#include "file_ops.h"
#include "filemap.h"
#include "logging.h"
#include "libpq-fe.h"
#include "catalog/catalog.h"
#include "catalog/pg_type.h"
static PGconn *conn = NULL;
/*
* Files are fetched max CHUNKSIZE bytes at a time.
*
* (This only applies to files that are copied in whole, or for truncated
* files where we copy the tail. Relation files, where we know the individual
* blocks that need to be fetched, are fetched in BLCKSZ chunks.)
*/
#define CHUNKSIZE 1000000
static void receiveFileChunks(const char *sql);
static void execute_pagemap(datapagemap_t *pagemap, const char *path);
static char *run_simple_query(const char *sql);
void
libpqConnect(const char *connstr)
{
char *str;
conn = PQconnectdb(connstr);
if (PQstatus(conn) == CONNECTION_BAD)
pg_fatal("could not connect to remote server: %s\n",
PQerrorMessage(conn));
pg_log(PG_PROGRESS, "connected to remote server\n");
/*
* Check that the server is not in hot standby mode. There is no
* fundamental reason that couldn't be made to work, but it doesn't
* currently because we use a temporary table. Better to check for it
* explicitly than error out, for a better error message.
*/
str = run_simple_query("SELECT pg_is_in_recovery()");
if (strcmp(str, "f") != 0)
pg_fatal("source server must not be in recovery mode\n");
pg_free(str);
/*
* Also check that full_page_writes is enabled. We can get torn pages if
* a page is modified while we read it with pg_read_binary_file(), and we
* rely on full page images to fix them.
*/
str = run_simple_query("SHOW full_page_writes");
if (strcmp(str, "on") != 0)
pg_fatal("full_page_writes must be enabled in the source server\n");
pg_free(str);
}
/*
* Runs a query that returns a single value.
*/
static char *
run_simple_query(const char *sql)
{
PGresult *res;
char *result;
res = PQexec(conn, sql);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pg_fatal("error running query (%s) in source server: %s\n",
sql, PQresultErrorMessage(res));
/* sanity check the result set */
if (PQnfields(res) != 1 || PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
pg_fatal("unexpected result set while running query\n");
result = pg_strdup(PQgetvalue(res, 0, 0));
PQclear(res);
return result;
}
/*
* Calls pg_current_xlog_insert_location() function
*/
XLogRecPtr
libpqGetCurrentXlogInsertLocation(void)
{
XLogRecPtr result;
uint32 hi;
uint32 lo;
char *val;
val = run_simple_query("SELECT pg_current_xlog_insert_location()");
if (sscanf(val, "%X/%X", &hi, &lo) != 2)
pg_fatal("unexpected result \"%s\" while fetching current XLOG insert location\n", val);
result = ((uint64) hi) << 32 | lo;
return result;
}
/*
* Get a list of all files in the data directory.
*/
void
libpqProcessFileList(void)
{
PGresult *res;
const char *sql;
int i;
/*
* Create a recursive directory listing of the whole data directory.
*
* The WITH RECURSIVE part does most of the work. The second part gets the
* targets of the symlinks in pg_tblspc directory.
*
* XXX: There is no backend function to get a symbolic link's target in
* general, so if the admin has put any custom symbolic links in the data
* directory, they won't be copied correctly.
*/
sql =
"WITH RECURSIVE files (path, filename, size, isdir) AS (\n"
" SELECT '' AS path, filename, size, isdir FROM\n"
" (SELECT pg_ls_dir('.') AS filename) AS fn,\n"
" pg_stat_file(fn.filename) AS this\n"
" UNION ALL\n"
" SELECT parent.path || parent.filename || '/' AS path,\n"
" fn, this.size, this.isdir\n"
" FROM files AS parent,\n"
" pg_ls_dir(parent.path || parent.filename) AS fn,\n"
" pg_stat_file(parent.path || parent.filename || '/' || fn) AS this\n"
" WHERE parent.isdir = 't'\n"
")\n"
"SELECT path || filename, size, isdir,\n"
" pg_tablespace_location(pg_tablespace.oid) AS link_target\n"
"FROM files\n"
"LEFT OUTER JOIN pg_tablespace ON files.path = 'pg_tblspc/'\n"
" AND oid::text = files.filename\n";
res = PQexec(conn, sql);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pg_fatal("unexpected result while fetching file list: %s\n",
PQresultErrorMessage(res));
/* sanity check the result set */
if (PQnfields(res) != 4)
pg_fatal("unexpected result set while fetching file list\n");
/* Read result to local variables */
for (i = 0; i < PQntuples(res); i++)
{
char *path = PQgetvalue(res, i, 0);
int filesize = atoi(PQgetvalue(res, i, 1));
bool isdir = (strcmp(PQgetvalue(res, i, 2), "t") == 0);
char *link_target = PQgetvalue(res, i, 3);
file_type_t type;
if (link_target[0])
type = FILE_TYPE_SYMLINK;
else if (isdir)
type = FILE_TYPE_DIRECTORY;
else
type = FILE_TYPE_REGULAR;
process_remote_file(path, type, filesize, link_target);
}
}
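/*
 * Illustrative rows from the query above (values invented); a tablespace
 * symlink under pg_tblspc reports its target in link_target:
 *
 *   path              | size | isdir | link_target
 *   global/pg_control | 8192 | f     |
 *   pg_tblspc/16384   | 4096 | t     | /srv/tblspc
 */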
/*----
* Runs a query, which returns pieces of files from the remote source data
* directory, and overwrites the corresponding parts of target files with
* the received parts. The result set is expected to be of format:
*
* path text -- path in the data directory, e.g. "base/1/123"
* begin int4 -- offset within the file
* chunk bytea -- file content
*----
*/
static void
receiveFileChunks(const char *sql)
{
PGresult *res;
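/*
 * Request binary results (the trailing '1') and switch to single-row mode
 * below, so that each chunk can be written to the target file as soon as
 * it arrives, instead of buffering the whole result set in memory.
 */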
if (PQsendQueryParams(conn, sql, 0, NULL, NULL, NULL, NULL, 1) != 1)
pg_fatal("could not send query: %s\n", PQerrorMessage(conn));
pg_log(PG_DEBUG, "getting file chunks\n");
if (PQsetSingleRowMode(conn) != 1)
pg_fatal("could not set libpq connection to single row mode\n");
while ((res = PQgetResult(conn)) != NULL)
{
char *filename;
int filenamelen;
int chunkoff;
int chunksize;
char *chunk;
switch (PQresultStatus(res))
{
case PGRES_SINGLE_TUPLE:
break;
case PGRES_TUPLES_OK:
continue; /* final zero-row result */
default:
pg_fatal("unexpected result while fetching remote files: %s\n",
PQresultErrorMessage(res));
}
/* sanity check the result set */
if (PQnfields(res) != 3 || PQntuples(res) != 1)
pg_fatal("unexpected result set size while fetching remote files\n");
if (PQftype(res, 0) != TEXTOID ||
PQftype(res, 1) != INT4OID ||
PQftype(res, 2) != BYTEAOID)
{
pg_fatal("unexpected data types in result set while fetching remote files: %u %u %u\n",
PQftype(res, 0), PQftype(res, 1), PQftype(res, 2));
}
if (PQfformat(res, 0) != 1 ||
PQfformat(res, 1) != 1 ||
PQfformat(res, 2) != 1)
{
pg_fatal("unexpected result format while fetching remote files\n");
}
if (PQgetisnull(res, 0, 0) ||
PQgetisnull(res, 0, 1) ||
PQgetisnull(res, 0, 2))
{
pg_fatal("unexpected NULL result while fetching remote files\n");
}
if (PQgetlength(res, 0, 1) != sizeof(int32))
pg_fatal("unexpected result length while fetching remote files\n");
/* Read result set to local variables */
memcpy(&chunkoff, PQgetvalue(res, 0, 1), sizeof(int32));
chunkoff = ntohl(chunkoff);
chunksize = PQgetlength(res, 0, 2);
filenamelen = PQgetlength(res, 0, 0);
filename = pg_malloc(filenamelen + 1);
memcpy(filename, PQgetvalue(res, 0, 0), filenamelen);
filename[filenamelen] = '\0';
chunk = PQgetvalue(res, 0, 2);
pg_log(PG_DEBUG, "received chunk for file \"%s\", off %d, len %d\n",
filename, chunkoff, chunksize);
open_target_file(filename, false);
write_target_range(chunk, chunkoff, chunksize);
}
}
/*
* Receive a single file as a malloc'd buffer.
*/
char *
libpqGetFile(const char *filename, size_t *filesize)
{
PGresult *res;
char *result;
int len;
const char *paramValues[1];
paramValues[0] = filename;
res = PQexecParams(conn, "SELECT pg_read_binary_file($1)",
1, NULL, paramValues, NULL, NULL, 1);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pg_fatal("unexpected result while fetching remote file \"%s\": %s\n",
filename, PQresultErrorMessage(res));
/* sanity check the result set */
if (PQntuples(res) != 1 || PQgetisnull(res, 0, 0))
pg_fatal("unexpected result set while fetching remote file \"%s\"\n",
filename);
/* Read result to local variables */
len = PQgetlength(res, 0, 0);
result = pg_malloc(len + 1);
memcpy(result, PQgetvalue(res, 0, 0), len);
result[len] = '\0';
pg_log(PG_DEBUG, "fetched file \"%s\", length %d\n", filename, len);
if (filesize)
*filesize = len;
return result;
}
/*
* Write a file range to a temporary table in the server.
*
* The range is sent to the server as a COPY formatted line, to be inserted
* into the 'fetchchunks' temporary table. The receiveFileChunks() function
* then uses that table to actually fetch the data.
*/
static void
fetch_file_range(const char *path, unsigned int begin, unsigned int end)
{
char linebuf[MAXPGPATH + 23];
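/* path (up to MAXPGPATH - 1) + 2 tabs + two 10-digit uints + newline + NUL */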
/* Split the range into CHUNKSIZE chunks */
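/*
 * For example, begin = 0 and end = 2500000 yields three lines, with
 * lengths 1000000, 1000000 and 500000.
 */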
while (end - begin > 0)
{
unsigned int len;
if (end - begin > CHUNKSIZE)
len = CHUNKSIZE;
else
len = end - begin;
snprintf(linebuf, sizeof(linebuf), "%s\t%u\t%u\n", path, begin, len);
if (PQputCopyData(conn, linebuf, strlen(linebuf)) != 1)
pg_fatal("error sending COPY data: %s\n",
PQerrorMessage(conn));
begin += len;
}
}
/*
* Fetch all changed blocks from remote source data directory.
*/
void
libpq_executeFileMap(filemap_t *map)
{
file_entry_t *entry;
const char *sql;
PGresult *res;
int i;
/*
* First create a temporary table, and load it with the blocks that we
* need to fetch.
*/
sql = "CREATE TEMPORARY TABLE fetchchunks(path text, begin int4, len int4);";
res = PQexec(conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pg_fatal("error creating temporary table: %s\n",
PQresultErrorMessage(res));
sql = "COPY fetchchunks FROM STDIN";
res = PQexec(conn, sql);
if (PQresultStatus(res) != PGRES_COPY_IN)
pg_fatal("unexpected result while sending file list: %s\n",
PQresultErrorMessage(res));
for (i = 0; i < map->narray; i++)
{
entry = map->array[i];
/* If this is a relation file, copy the modified blocks */
execute_pagemap(&entry->pagemap, entry->path);
switch (entry->action)
{
case FILE_ACTION_NONE:
/* nothing else to do */
break;
case FILE_ACTION_COPY:
/* Truncate the old file out of the way, if any */
open_target_file(entry->path, true);
fetch_file_range(entry->path, 0, entry->newsize);
break;
case FILE_ACTION_TRUNCATE:
truncate_target_file(entry->path, entry->newsize);
break;
case FILE_ACTION_COPY_TAIL:
fetch_file_range(entry->path, entry->oldsize, entry->newsize);
break;
case FILE_ACTION_REMOVE:
remove_target(entry);
break;
case FILE_ACTION_CREATE:
create_target(entry);
break;
}
}
if (PQputCopyEnd(conn, NULL) != 1)
pg_fatal("error sending end-of-COPY: %s\n",
PQerrorMessage(conn));
while ((res = PQgetResult(conn)) != NULL)
{
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pg_fatal("unexpected result while sending file list: %s\n",
PQresultErrorMessage(res));
}
/*
* We've now copied the list of file ranges that we need to fetch to the
* temporary table. Now, actually fetch all of those ranges.
*/
sql =
"SELECT path, begin, \n"
" pg_read_binary_file(path, begin, len) AS chunk\n"
"FROM fetchchunks\n";
receiveFileChunks(sql);
}
static void
execute_pagemap(datapagemap_t *pagemap, const char *path)
{
datapagemap_iterator_t *iter;
BlockNumber blkno;
off_t offset;
iter = datapagemap_iterate(pagemap);
while (datapagemap_next(iter, &blkno))
{
offset = blkno * BLCKSZ;
fetch_file_range(path, offset, offset + BLCKSZ);
}
free(iter);
}

src/bin/pg_rewind/logging.c Normal file

@@ -0,0 +1,140 @@
/*-------------------------------------------------------------------------
*
* logging.c
* logging functions
*
* Copyright (c) 2010-2015, PostgreSQL Global Development Group
*
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include <unistd.h>
#include <time.h>
#include "pg_rewind.h"
#include "logging.h"
#include "pgtime.h"
/* Progress counters */
uint64 fetch_size;
uint64 fetch_done;
static pg_time_t last_progress_report = 0;
#define QUERY_ALLOC 8192
#define MESSAGE_WIDTH 60
static
pg_attribute_printf(2, 0)
void
pg_log_v(eLogType type, const char *fmt, va_list ap)
{
char message[QUERY_ALLOC];
vsnprintf(message, sizeof(message), fmt, ap);
switch (type)
{
case PG_DEBUG:
if (debug)
printf("%s", _(message));
break;
case PG_PROGRESS:
if (showprogress)
printf("%s", _(message));
break;
case PG_WARNING:
printf("%s", _(message));
break;
case PG_FATAL:
printf("\n%s", _(message));
printf("%s", _("Failure, exiting\n"));
exit(1);
break;
default:
break;
}
fflush(stdout);
}
void
pg_log(eLogType type, const char *fmt,...)
{
va_list args;
va_start(args, fmt);
pg_log_v(type, fmt, args);
va_end(args);
}
void
pg_fatal(const char *fmt,...)
{
va_list args;
va_start(args, fmt);
pg_log_v(PG_FATAL, fmt, args);
va_end(args);
/* should not get here, pg_log_v() exited already */
exit(1);
}
/*
* Print a progress report based on the global variables.
*
* The progress report is written at most once per second, unless the
* force parameter is set to true.
*/
void
progress_report(bool force)
{
int percent;
char fetch_done_str[32];
char fetch_size_str[32];
pg_time_t now;
if (!showprogress)
return;
now = time(NULL);
if (now == last_progress_report && !force)
return; /* Max once per second */
last_progress_report = now;
percent = fetch_size ? (int) ((fetch_done) * 100 / fetch_size) : 0;
/*
* Avoid overflowing past 100% or the full size. This may make the total
* size number change as we approach the end of the copy (the estimate is
* not exact), but that's better than having the done column be bigger
* than the total.
*/
if (percent > 100)
percent = 100;
if (fetch_done > fetch_size)
fetch_size = fetch_done;
/*
* Separate step to keep platform-dependent format code out of
* translatable strings. And we only test for INT64_FORMAT availability
* in snprintf, not fprintf.
*/
snprintf(fetch_done_str, sizeof(fetch_done_str), INT64_FORMAT,
fetch_done / 1024);
snprintf(fetch_size_str, sizeof(fetch_size_str), INT64_FORMAT,
fetch_size / 1024);
pg_log(PG_PROGRESS, "%*s/%s kB (%d%%) copied\r",
(int) strlen(fetch_size_str), fetch_done_str, fetch_size_str,
percent);
}
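/*
 * Example of the line printed above (values invented); the trailing \r
 * makes successive reports overwrite each other on the terminal:
 *
 *   1234/5678 kB (21%) copied
 */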

src/bin/pg_rewind/logging.h Normal file

@@ -0,0 +1,37 @@
/*-------------------------------------------------------------------------
*
* logging.h
* prototypes for logging functions
*
*
* Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*-------------------------------------------------------------------------
*/
#ifndef PG_REWIND_LOGGING_H
#define PG_REWIND_LOGGING_H
/* progress counters */
extern uint64 fetch_size;
extern uint64 fetch_done;
/*
* Enumeration to denote pg_log modes
*/
typedef enum
{
PG_DEBUG,
PG_PROGRESS,
PG_WARNING,
PG_FATAL
} eLogType;
extern void pg_log(eLogType type, const char *fmt,...)
pg_attribute_printf(2, 3);
extern void pg_fatal(const char *fmt,...)
pg_attribute_printf(1, 2) pg_attribute_noreturn;
extern void progress_report(bool force);
#endif

src/bin/pg_rewind/nls.mk Normal file

@@ -0,0 +1,9 @@
# src/bin/pg_rewind/nls.mk
CATALOG_NAME = pg_rewind
AVAIL_LANGUAGES =
GETTEXT_FILES = copy_fetch.c datapagemap.c fetch.c filemap.c libpq_fetch.c logging.c parsexlog.c pg_rewind.c timeline.c ../../common/fe_memutils.c ../../../src/backend/access/transam/xlogreader.c
GETTEXT_TRIGGERS = pg_log pg_fatal report_invalid_record:2
GETTEXT_FLAGS = pg_log:2:c-format \
pg_fatal:1:c-format \
report_invalid_record:2:c-format

src/bin/pg_rewind/parsexlog.c Normal file

@@ -0,0 +1,374 @@
/*-------------------------------------------------------------------------
*
* parsexlog.c
* Functions for reading Write-Ahead-Log
*
* Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include <unistd.h>
#include "pg_rewind.h"
#include "filemap.h"
#include "logging.h"
#include "access/rmgr.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "catalog/pg_control.h"
#include "catalog/storage_xlog.h"
#include "commands/dbcommands_xlog.h"
/*
* RmgrNames is an array of resource manager names, to make error messages
* a bit nicer.
*/
#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup) \
name,
static const char *RmgrNames[RM_MAX_ID + 1] = {
#include "access/rmgrlist.h"
};
static void extractPageInfo(XLogReaderState *record);
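/*
 * State kept across SimpleXLogPageRead() calls: the currently open WAL
 * segment stays open between page reads, so that consecutive reads from
 * the same segment don't have to reopen the file.
 */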
static int xlogreadfd = -1;
static XLogSegNo xlogreadsegno = -1;
static char xlogfpath[MAXPGPATH];
typedef struct XLogPageReadPrivate
{
const char *datadir;
TimeLineID tli;
} XLogPageReadPrivate;
static int SimpleXLogPageRead(XLogReaderState *xlogreader,
XLogRecPtr targetPagePtr,
int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
TimeLineID *pageTLI);
/*
* Read WAL from the datadir/pg_xlog, starting from 'startpoint' on timeline
* 'tli', until 'endpoint'. Make note of the data blocks touched by the WAL
* records, and return them in a page map.
*/
void
extractPageMap(const char *datadir, XLogRecPtr startpoint, TimeLineID tli,
XLogRecPtr endpoint)
{
XLogRecord *record;
XLogReaderState *xlogreader;
char *errormsg;
XLogPageReadPrivate private;
private.datadir = datadir;
private.tli = tli;
xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, &private);
do
{
record = XLogReadRecord(xlogreader, startpoint, &errormsg);
if (record == NULL)
{
XLogRecPtr errptr;
errptr = startpoint ? startpoint : xlogreader->EndRecPtr;
if (errormsg)
pg_fatal("error reading WAL at %X/%X: %s\n",
(uint32) (errptr >> 32), (uint32) (errptr),
errormsg);
else
pg_fatal("error reading WAL at %X/%X\n",
(uint32) (errptr >> 32),
(uint32) (errptr));
}
extractPageInfo(xlogreader);
startpoint = InvalidXLogRecPtr; /* continue reading at next record */
} while (xlogreader->ReadRecPtr != endpoint);
XLogReaderFree(xlogreader);
if (xlogreadfd != -1)
{
close(xlogreadfd);
xlogreadfd = -1;
}
}
/*
* Reads one WAL record. Returns the end position of the record, without
* doing anything with the record itself.
*/
XLogRecPtr
readOneRecord(const char *datadir, XLogRecPtr ptr, TimeLineID tli)
{
XLogRecord *record;
XLogReaderState *xlogreader;
char *errormsg;
XLogPageReadPrivate private;
XLogRecPtr endptr;
private.datadir = datadir;
private.tli = tli;
xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, &private);
record = XLogReadRecord(xlogreader, ptr, &errormsg);
if (record == NULL)
{
if (errormsg)
pg_fatal("could not read WAL record at %X/%X: %s\n",
(uint32) (ptr >> 32), (uint32) (ptr), errormsg);
else
pg_fatal("could not read WAL record at %X/%X\n",
(uint32) (ptr >> 32), (uint32) (ptr));
}
endptr = xlogreader->EndRecPtr;
XLogReaderFree(xlogreader);
if (xlogreadfd != -1)
{
close(xlogreadfd);
xlogreadfd = -1;
}
return endptr;
}
/*
* Find the previous checkpoint preceding given WAL position.
*/
void
findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, TimeLineID tli,
XLogRecPtr *lastchkptrec, TimeLineID *lastchkpttli,
XLogRecPtr *lastchkptredo)
{
/* Walk backwards, starting from the given record */
XLogRecord *record;
XLogRecPtr searchptr;
XLogReaderState *xlogreader;
char *errormsg;
XLogPageReadPrivate private;
/*
* The given fork pointer points to the end of the last common record,
* which is not necessarily the beginning of the next record, if the
* previous record happens to end at a page boundary. Skip over the page
* header in that case to find the next record.
*/
if (forkptr % XLOG_BLCKSZ == 0)
forkptr += (forkptr % XLogSegSize == 0) ? SizeOfXLogLongPHD : SizeOfXLogShortPHD;
private.datadir = datadir;
private.tli = tli;
xlogreader = XLogReaderAllocate(&SimpleXLogPageRead, &private);
searchptr = forkptr;
for (;;)
{
uint8 info;
record = XLogReadRecord(xlogreader, searchptr, &errormsg);
if (record == NULL)
{
if (errormsg)
pg_fatal("could not find previous WAL record at %X/%X: %s\n",
(uint32) (searchptr >> 32), (uint32) (searchptr),
errormsg);
else
pg_fatal("could not find previous WAL record at %X/%X\n",
(uint32) (searchptr >> 32), (uint32) (searchptr));
}
/*
* Check if it is a checkpoint record. This checkpoint record needs to
* be the latest checkpoint before the WAL forked, not the checkpoint
* at which the master was stopped to be rewound.
*/
info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK;
if (searchptr < forkptr &&
XLogRecGetRmid(xlogreader) == RM_XLOG_ID &&
(info == XLOG_CHECKPOINT_SHUTDOWN ||
info == XLOG_CHECKPOINT_ONLINE))
{
CheckPoint checkPoint;
memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
*lastchkptrec = searchptr;
*lastchkpttli = checkPoint.ThisTimeLineID;
*lastchkptredo = checkPoint.redo;
break;
}
/* Walk backwards to previous record. */
searchptr = record->xl_prev;
}
XLogReaderFree(xlogreader);
if (xlogreadfd != -1)
{
close(xlogreadfd);
xlogreadfd = -1;
}
}
/* XLogreader callback function, to read a WAL page */
int
SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
int reqLen, XLogRecPtr targetRecPtr, char *readBuf,
TimeLineID *pageTLI)
{
XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
uint32 targetPageOff;
XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY;
XLByteToSeg(targetPagePtr, targetSegNo);
targetPageOff = targetPagePtr % XLogSegSize;
/*
* See if we need to switch to a new segment because the requested record
* is not in the currently open one.
*/
if (xlogreadfd >= 0 && !XLByteInSeg(targetPagePtr, xlogreadsegno))
{
close(xlogreadfd);
xlogreadfd = -1;
}
XLByteToSeg(targetPagePtr, xlogreadsegno);
if (xlogreadfd < 0)
{
char xlogfname[MAXFNAMELEN];
XLogFileName(xlogfname, private->tli, xlogreadsegno);
snprintf(xlogfpath, MAXPGPATH, "%s/" XLOGDIR "/%s", private->datadir, xlogfname);
xlogreadfd = open(xlogfpath, O_RDONLY | PG_BINARY, 0);
if (xlogreadfd < 0)
{
printf(_("could not open file \"%s\": %s\n"), xlogfpath,
strerror(errno));
return -1;
}
}
/*
* At this point, we have the right segment open.
*/
Assert(xlogreadfd != -1);
/* Read the requested page */
if (lseek(xlogreadfd, (off_t) targetPageOff, SEEK_SET) < 0)
{
printf(_("could not seek in file \"%s\": %s\n"), xlogfpath,
strerror(errno));
return -1;
}
if (read(xlogreadfd, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{
printf(_("could not read from file \"%s\": %s\n"), xlogfpath,
strerror(errno));
return -1;
}
Assert(targetSegNo == xlogreadsegno);
*pageTLI = private->tli;
return XLOG_BLCKSZ;
}
/*
* Extract information on which blocks the current record modifies.
*/
static void
extractPageInfo(XLogReaderState *record)
{
int block_id;
RmgrId rmid = XLogRecGetRmid(record);
uint8 info = XLogRecGetInfo(record);
uint8 rminfo = info & ~XLR_INFO_MASK;
/* Is this a special record type that I recognize? */
if (rmid == RM_DBASE_ID && rminfo == XLOG_DBASE_CREATE)
{
/*
* New databases can be safely ignored. A new database won't be present
* in the remote system, so it will be copied in toto. There's one
* corner-case, though: if a new, different, database is also created
* in the remote system, we'll see that the files already exist and
* not copy them. That's OK, though; WAL replay of creating the new
* database, from the remote WAL, will re-copy the new database,
* overwriting the database created in the local system.
*/
}
else if (rmid == RM_DBASE_ID && rminfo == XLOG_DBASE_DROP)
{
/*
* An existing database was dropped. We'll see that the files don't
* exist in the local system, and copy them in toto from the remote
* system. No need to do anything special here.
*/
}
else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_CREATE)
{
/*
* We can safely ignore these. The local file will be removed if it
* doesn't exist in the remote system. If a file with the same name is
* created in the remote system too, there will be WAL records for all
* the blocks in it.
*/
}
else if (rmid == RM_SMGR_ID && rminfo == XLOG_SMGR_TRUNCATE)
{
/*
* We can safely ignore these. If a file is truncated locally, we'll
* notice that when we compare the sizes, and will copy the missing
* tail from the remote system.
*
* TODO: It would be nice to do some sanity cross-checking here, though.
*/
}
else if (info & XLR_SPECIAL_REL_UPDATE)
{
/*
* This record type modifies a relation file in some special way, but
* we don't recognize the type. That's bad - we don't know how to
* track that change.
*/
pg_fatal("WAL record modifies a relation, but record type is not recognized\n"
"lsn: %X/%X, rmgr: %s, info: %02X\n",
(uint32) (record->ReadRecPtr >> 32), (uint32) (record->ReadRecPtr),
RmgrNames[rmid], info);
}
for (block_id = 0; block_id <= record->max_block_id; block_id++)
{
RelFileNode rnode;
ForkNumber forknum;
BlockNumber blkno;
if (!XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno))
continue;
/* We only care about the main fork; others are copied in toto */
if (forknum != MAIN_FORKNUM)
continue;
process_block_change(forknum, rnode, blkno);
}
}

src/bin/pg_rewind/pg_rewind.c Normal file

@@ -0,0 +1,550 @@
/*-------------------------------------------------------------------------
*
* pg_rewind.c
* Synchronizes an old master server to a new timeline
*
* Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
*
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include <sys/stat.h>
#include <fcntl.h>
#include <time.h>
#include <unistd.h>
#include "pg_rewind.h"
#include "fetch.h"
#include "file_ops.h"
#include "filemap.h"
#include "logging.h"
#include "access/timeline.h"
#include "access/xlog_internal.h"
#include "catalog/catversion.h"
#include "catalog/pg_control.h"
#include "getopt_long.h"
#include "storage/bufpage.h"
static void usage(const char *progname);
static void createBackupLabel(XLogRecPtr startpoint, TimeLineID starttli,
XLogRecPtr checkpointloc);
static void digestControlFile(ControlFileData *ControlFile, char *source,
size_t size);
static void updateControlFile(ControlFileData *ControlFile);
static void sanityChecks(void);
static void findCommonAncestorTimeline(XLogRecPtr *recptr, TimeLineID *tli);
static ControlFileData ControlFile_target;
static ControlFileData ControlFile_source;
const char *progname;
/* Configuration options */
char *datadir_target = NULL;
char *datadir_source = NULL;
char *connstr_source = NULL;
bool debug = false;
bool showprogress = false;
bool dry_run = false;
static void
usage(const char *progname)
{
printf(_("%s resynchronizes a cluster with another copy of the cluster.\n\n"), progname);
printf(_("Usage:\n %s [OPTION]...\n\n"), progname);
printf(_("Options:\n"));
printf(_(" -D, --target-pgdata=DIRECTORY\n"));
printf(_(" existing data directory to modify\n"));
printf(_(" --source-pgdata=DIRECTORY\n"));
printf(_(" source data directory to sync with\n"));
printf(_(" --source-server=CONNSTR\n"));
printf(_(" source server to sync with\n"));
printf(_(" -P, --progress write progress messages\n"));
printf(_(" -n, --dry-run stop before modifying anything\n"));
printf(_(" --debug write a lot of debug messages\n"));
printf(_(" -V, --version output version information, then exit\n"));
printf(_(" -?, --help show this help, then exit\n"));
printf(_("\n"));
printf(_("Report bugs to <pgsql-bugs@postgresql.org>.\n"));
}
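/*
 * Example invocation (paths and connection string invented):
 *
 *   pg_rewind --target-pgdata=/srv/pgsql/old_master \
 *       --source-server='host=new_master dbname=postgres' -P
 */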
int
main(int argc, char **argv)
{
static struct option long_options[] = {
{"help", no_argument, NULL, '?'},
{"target-pgdata", required_argument, NULL, 'D'},
{"source-pgdata", required_argument, NULL, 1},
{"source-server", required_argument, NULL, 2},
{"version", no_argument, NULL, 'V'},
{"dry-run", no_argument, NULL, 'n'},
{"progress", no_argument, NULL, 'P'},
{"debug", no_argument, NULL, 3},
{NULL, 0, NULL, 0}
};
int option_index;
int c;
XLogRecPtr divergerec;
TimeLineID lastcommontli;
XLogRecPtr chkptrec;
TimeLineID chkpttli;
XLogRecPtr chkptredo;
size_t size;
char *buffer;
bool rewind_needed;
XLogRecPtr endrec;
TimeLineID endtli;
ControlFileData ControlFile_new;
progname = get_progname(argv[0]);
/* Process command-line arguments */
if (argc > 1)
{
if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
{
usage(progname);
exit(0);
}
if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0)
{
puts("pg_rewind (PostgreSQL) " PG_VERSION);
exit(0);
}
}
while ((c = getopt_long(argc, argv, "D:nP", long_options, &option_index)) != -1)
{
switch (c)
{
case '?':
fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
exit(1);
case 'P':
showprogress = true;
break;
case 'n':
dry_run = true;
break;
case 3:
debug = true;
break;
case 'D': /* -D or --target-pgdata */
datadir_target = pg_strdup(optarg);
break;
case 1: /* --source-pgdata */
datadir_source = pg_strdup(optarg);
break;
case 2: /* --source-server */
connstr_source = pg_strdup(optarg);
break;
}
}
/* No source given? Show usage */
if (datadir_source == NULL && connstr_source == NULL)
{
pg_fatal("no source specified (--source-pgdata or --source-server)\n");
pg_fatal("Try \"%s --help\" for more information.\n", progname);
exit(1);
}
if (datadir_target == NULL)
{
pg_fatal("no target data directory specified (--target-pgdata)\n");
fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
exit(1);
}
if (argc != optind)
{
pg_fatal("%s: invalid arguments\n", progname);
fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
exit(1);
}
/* Connect to remote server */
if (connstr_source)
libpqConnect(connstr_source);
/*
* Ok, we have all the options and we're ready to start. Read in all the
* information we need from both clusters.
*/
buffer = slurpFile(datadir_target, "global/pg_control", &size);
digestControlFile(&ControlFile_target, buffer, size);
pg_free(buffer);
buffer = fetchFile("global/pg_control", &size);
digestControlFile(&ControlFile_source, buffer, size);
pg_free(buffer);
sanityChecks();
/*
* If both clusters are already on the same timeline, there's nothing to
* do.
*/
if (ControlFile_target.checkPointCopy.ThisTimeLineID == ControlFile_source.checkPointCopy.ThisTimeLineID)
pg_fatal("source and target cluster are on the same timeline\n");
findCommonAncestorTimeline(&divergerec, &lastcommontli);
printf(_("The servers diverged at WAL position %X/%X on timeline %u.\n"),
(uint32) (divergerec >> 32), (uint32) divergerec, lastcommontli);
/*
* Check for the possibility that the target is in fact a direct ancestor
* of the source. In that case, there is no divergent history in the
* target that needs rewinding.
*/
if (ControlFile_target.checkPoint >= divergerec)
{
rewind_needed = true;
}
else
{
XLogRecPtr chkptendrec;
/* Read the checkpoint record on the target to see where it ends. */
chkptendrec = readOneRecord(datadir_target,
ControlFile_target.checkPoint,
ControlFile_target.checkPointCopy.ThisTimeLineID);
/*
* If the histories diverged exactly at the end of the shutdown
* checkpoint record on the target, there are no WAL records in the
* target that don't belong in the source's history, and no rewind is
* needed.
*/
if (chkptendrec == divergerec)
rewind_needed = false;
else
rewind_needed = true;
}
if (!rewind_needed)
{
printf(_("No rewind required.\n"));
exit(0);
}
findLastCheckpoint(datadir_target, divergerec, lastcommontli,
&chkptrec, &chkpttli, &chkptredo);
printf(_("Rewinding from last common checkpoint at %X/%X on timeline %u\n"),
(uint32) (chkptrec >> 32), (uint32) chkptrec,
chkpttli);
/*
* Build the filemap, by comparing the remote and local data directories.
*/
(void) filemap_create();
pg_log(PG_PROGRESS, "reading source file list\n");
fetchRemoteFileList();
pg_log(PG_PROGRESS, "reading target file list\n");
traverse_datadir(datadir_target, &process_local_file);
/*
* Read the target WAL from last checkpoint before the point of fork, to
* extract all the pages that were modified on the target cluster after
* the fork. We can stop reading after reaching the final shutdown record.
* XXX: If we supported rewinding a server that was not shut down cleanly,
* we would need to replay until the end of WAL here.
*/
pg_log(PG_PROGRESS, "reading WAL in target\n");
extractPageMap(datadir_target, chkptrec, lastcommontli,
ControlFile_target.checkPoint);
filemap_finalize();
if (showprogress)
calculate_totals();
/* this is too verbose even for verbose mode */
if (debug)
print_filemap();
/*
* Ok, we're ready to start copying things over.
*/
if (showprogress)
{
pg_log(PG_PROGRESS, "Need to copy %lu MB (total source directory size is %lu MB)\n",
(unsigned long) (filemap->fetch_size / (1024 * 1024)),
(unsigned long) (filemap->total_size / (1024 * 1024)));
fetch_size = filemap->fetch_size;
fetch_done = 0;
}
/*
* This is the point of no return. Once we start copying things, we have
* modified the target directory and there is no turning back!
*/
executeFileMap();
progress_report(true);
pg_log(PG_PROGRESS, "\ncreating backup label and updating control file\n");
createBackupLabel(chkptredo, chkpttli, chkptrec);
/*
* Update control file of target. Make it ready to perform archive
* recovery when restarting.
*
* minRecoveryPoint is set to the current WAL insert location in the
* source server. Like in an online backup, it's important that we recover
* all the WAL that was generated while we copied the files over.
*/
memcpy(&ControlFile_new, &ControlFile_source, sizeof(ControlFileData));
if (connstr_source)
{
endrec = libpqGetCurrentXlogInsertLocation();
endtli = ControlFile_source.checkPointCopy.ThisTimeLineID;
}
else
{
endrec = ControlFile_source.checkPoint;
endtli = ControlFile_source.checkPointCopy.ThisTimeLineID;
}
ControlFile_new.minRecoveryPoint = endrec;
ControlFile_new.minRecoveryPointTLI = endtli;
ControlFile_new.state = DB_IN_ARCHIVE_RECOVERY;
updateControlFile(&ControlFile_new);
printf(_("Done!\n"));
return 0;
}
static void
sanityChecks(void)
{
/* TODO Check that there's no backup_label in either cluster */
/* Check system_id match */
if (ControlFile_target.system_identifier != ControlFile_source.system_identifier)
pg_fatal("source and target clusters are from different systems\n");
/* check version */
if (ControlFile_target.pg_control_version != PG_CONTROL_VERSION ||
ControlFile_source.pg_control_version != PG_CONTROL_VERSION ||
ControlFile_target.catalog_version_no != CATALOG_VERSION_NO ||
ControlFile_source.catalog_version_no != CATALOG_VERSION_NO)
{
pg_fatal("clusters are not compatible with this version of pg_rewind\n");
}
/*
* The target cluster needs to use either data checksums or hint bit
* WAL-logging, to prevent data corruption that could otherwise occur
* because hint bit updates are not WAL-logged.
*/
if (ControlFile_target.data_checksum_version != PG_DATA_CHECKSUM_VERSION &&
!ControlFile_target.wal_log_hints)
{
pg_fatal("target server need to use either data checksums or \"wal_log_hints = on\"\n");
}
/*
* Target cluster better not be running. This doesn't guard against
* someone starting the cluster concurrently. Also, this is probably more
* strict than necessary; it's OK if the master was not shut down cleanly,
* as long as it isn't running at the moment.
*/
if (ControlFile_target.state != DB_SHUTDOWNED)
pg_fatal("target server must be shut down cleanly\n");
/*
* When the source is a data directory, also require that the source
* server is shut down. There isn't any very strong reason for this
* limitation, but better safe than sorry.
*/
if (datadir_source && ControlFile_source.state != DB_SHUTDOWNED)
pg_fatal("source data directory must be shut down cleanly\n");
}
/*
* Determine the TLI of the last common timeline in the histories of the two
* clusters. *tli is set to the last common timeline, and *recptr is set to
* the position where the histories diverged (ie. the first WAL record that's
* not the same in both clusters).
*
* Control files of both clusters must be read into ControlFile_target/source
* before calling this.
*/
static void
findCommonAncestorTimeline(XLogRecPtr *recptr, TimeLineID *tli)
{
TimeLineID targettli;
TimeLineHistoryEntry *sourceHistory;
int nentries;
int i;
TimeLineID sourcetli;
targettli = ControlFile_target.checkPointCopy.ThisTimeLineID;
sourcetli = ControlFile_source.checkPointCopy.ThisTimeLineID;
/* Timeline 1 does not have a history file, so no need to check */
if (sourcetli == 1)
{
sourceHistory = (TimeLineHistoryEntry *) pg_malloc(sizeof(TimeLineHistoryEntry));
sourceHistory->tli = sourcetli;
sourceHistory->begin = sourceHistory->end = InvalidXLogRecPtr;
nentries = 1;
}
else
{
char path[MAXPGPATH];
char *histfile;
TLHistoryFilePath(path, sourcetli);
histfile = fetchFile(path, NULL);
sourceHistory = rewind_parseTimeLineHistory(histfile,
ControlFile_source.checkPointCopy.ThisTimeLineID,
&nentries);
pg_free(histfile);
}
/*
* Trace the history backwards, until we hit the target timeline.
*
* TODO: This assumes that there are no timeline switches on the target
* cluster after the fork.
*/
for (i = nentries - 1; i >= 0; i--)
{
TimeLineHistoryEntry *entry = &sourceHistory[i];
if (entry->tli == targettli)
{
/* found it */
*recptr = entry->end;
*tli = entry->tli;
free(sourceHistory);
return;
}
}
pg_fatal("could not find common ancestor of the source and target cluster's timelines\n");
}
/*
* Create a backup_label file that forces recovery to begin at the last common
* checkpoint.
*/
static void
createBackupLabel(XLogRecPtr startpoint, TimeLineID starttli, XLogRecPtr checkpointloc)
{
XLogSegNo startsegno;
time_t stamp_time;
char strfbuf[128];
char xlogfilename[MAXFNAMELEN];
struct tm *tmp;
char buf[1000];
int len;
XLByteToSeg(startpoint, startsegno);
XLogFileName(xlogfilename, starttli, startsegno);
/*
* Construct backup label file
*/
stamp_time = time(NULL);
tmp = localtime(&stamp_time);
strftime(strfbuf, sizeof(strfbuf), "%Y-%m-%d %H:%M:%S %Z", tmp);
len = snprintf(buf, sizeof(buf),
"START WAL LOCATION: %X/%X (file %s)\n"
"CHECKPOINT LOCATION: %X/%X\n"
"BACKUP METHOD: pg_rewind\n"
"BACKUP FROM: standby\n"
"START TIME: %s\n",
/* omit LABEL: line */
(uint32) (startpoint >> 32), (uint32) startpoint, xlogfilename,
(uint32) (checkpointloc >> 32), (uint32) checkpointloc,
strfbuf);
if (len >= sizeof(buf))
pg_fatal("backup label buffer too small\n"); /* shouldn't happen */
/* TODO: move old file out of the way, if any. */
open_target_file("backup_label", true); /* BACKUP_LABEL_FILE */
write_target_range(buf, 0, len);
}
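/*
 * For illustration, the label written above comes out like this
 * (locations and timestamp invented):
 *
 *   START WAL LOCATION: 0/2000060 (file 000000010000000000000002)
 *   CHECKPOINT LOCATION: 0/2000098
 *   BACKUP METHOD: pg_rewind
 *   BACKUP FROM: standby
 *   START TIME: 2015-03-23 19:47:52 EET
 */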
/*
* Check CRC of control file
*/
static void
checkControlFile(ControlFileData *ControlFile)
{
pg_crc32 crc;
/* Calculate CRC */
INIT_CRC32C(crc);
COMP_CRC32C(crc, (char *) ControlFile, offsetof(ControlFileData, crc));
FIN_CRC32C(crc);
/* And simply compare it */
if (!EQ_CRC32C(crc, ControlFile->crc))
pg_fatal("unexpected control file CRC\n");
}
/*
* Verify control file contents in the buffer src, and copy it to *ControlFile.
*/
static void
digestControlFile(ControlFileData *ControlFile, char *src, size_t size)
{
if (size != PG_CONTROL_SIZE)
pg_fatal("unexpected control file size %d, expected %d\n",
(int) size, PG_CONTROL_SIZE);
memcpy(ControlFile, src, sizeof(ControlFileData));
/* Additional checks on control file */
checkControlFile(ControlFile);
}
/*
* Update the target's control file.
*/
static void
updateControlFile(ControlFileData *ControlFile)
{
char buffer[PG_CONTROL_SIZE];
/* Recalculate CRC of control file */
INIT_CRC32C(ControlFile->crc);
COMP_CRC32C(ControlFile->crc,
(char *) ControlFile,
offsetof(ControlFileData, crc));
FIN_CRC32C(ControlFile->crc);
/*
* Write out PG_CONTROL_SIZE bytes into pg_control by zero-padding the
* excess over sizeof(ControlFileData) to avoid premature EOF related
* errors when reading it.
*/
memset(buffer, 0, PG_CONTROL_SIZE);
memcpy(buffer, ControlFile, sizeof(ControlFileData));
open_target_file("global/pg_control", false);
write_target_range(buffer, 0, PG_CONTROL_SIZE);
close_target_file();
}

src/bin/pg_rewind/pg_rewind.h Normal file

@@ -0,0 +1,44 @@
/*-------------------------------------------------------------------------
*
* pg_rewind.h
*
*
* Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*-------------------------------------------------------------------------
*/
#ifndef PG_REWIND_H
#define PG_REWIND_H
#include "c.h"
#include "datapagemap.h"
#include "access/timeline.h"
#include "storage/block.h"
#include "storage/relfilenode.h"
/* Configuration options */
extern char *datadir_target;
extern char *datadir_source;
extern char *connstr_source;
extern bool debug;
extern bool showprogress;
extern bool dry_run;
/* in parsexlog.c */
extern void extractPageMap(const char *datadir, XLogRecPtr startpoint,
TimeLineID tli, XLogRecPtr endpoint);
extern void findLastCheckpoint(const char *datadir, XLogRecPtr searchptr,
TimeLineID tli,
XLogRecPtr *lastchkptrec, TimeLineID *lastchkpttli,
XLogRecPtr *lastchkptredo);
extern XLogRecPtr readOneRecord(const char *datadir, XLogRecPtr ptr,
TimeLineID tli);
/* in timeline.c */
extern TimeLineHistoryEntry *rewind_parseTimeLineHistory(char *buffer,
TimeLineID targetTLI, int *nentries);
#endif /* PG_REWIND_H */

src/bin/pg_rewind/t/001_basic.pl Normal file

@@ -0,0 +1,80 @@
use strict;
use warnings;
use TestLib;
use Test::More tests => 4;
use RewindTest;
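# The first command-line argument selects which of pg_rewind's two source
# modes RewindTest exercises: copying from a local data directory
# (--source-pgdata) or fetching over a libpq connection (--source-server).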
my $testmode = shift;
RewindTest::init_rewind_test('basic', $testmode);
RewindTest::setup_cluster();
# Create a test table and insert a row in master.
master_psql("CREATE TABLE tbl1 (d text)");
master_psql("INSERT INTO tbl1 VALUES ('in master')");
# This test table will be used to test truncation, i.e. the table
# is extended in the old master after promotion
master_psql("CREATE TABLE trunc_tbl (d text)");
master_psql("INSERT INTO trunc_tbl VALUES ('in master')");
# This test table will be used to test the "copy-tail" case, i.e. the
# table is truncated in the old master after promotion
master_psql("CREATE TABLE tail_tbl (id integer, d text)");
master_psql("INSERT INTO tail_tbl VALUES (0, 'in master')");
master_psql("CHECKPOINT");
RewindTest::create_standby();
# Insert additional data on master that will be replicated to standby
master_psql("INSERT INTO tbl1 values ('in master, before promotion')");
master_psql("INSERT INTO trunc_tbl values ('in master, before promotion')");
master_psql("INSERT INTO tail_tbl SELECT g, 'in master, before promotion: ' || g FROM generate_series(1, 10000) g");
master_psql('CHECKPOINT');
RewindTest::promote_standby();
# Insert a row in the old master. This causes the master and standby
# to have "diverged"; it's no longer possible to just apply the
# standby's logs over the master directory - you need to rewind.
master_psql("INSERT INTO tbl1 VALUES ('in master, after promotion')");
# Also insert a new row in the standby, which won't be present in the
# old master.
standby_psql("INSERT INTO tbl1 VALUES ('in standby, after promotion')");
# Insert enough rows to trunc_tbl to extend the file. pg_rewind should
# truncate it back to the old size.
master_psql("INSERT INTO trunc_tbl SELECT 'in master, after promotion: ' || g FROM generate_series(1, 10000) g");
# Truncate tail_tbl. pg_rewind should copy back the truncated part
# (We cannot use an actual TRUNCATE command here, as that creates a
# whole new relfilenode)
master_psql("DELETE FROM tail_tbl WHERE id > 10");
master_psql("VACUUM tail_tbl");
RewindTest::run_pg_rewind();
check_query('SELECT * FROM tbl1',
qq(in master
in master, before promotion
in standby, after promotion
),
'table content');
check_query('SELECT * FROM trunc_tbl',
qq(in master
in master, before promotion
),
'truncation');
check_query('SELECT count(*) FROM tail_tbl',
qq(10001
),
'tail-copy');
exit(0);

src/bin/pg_rewind/t/002_databases.pl Normal file

@@ -0,0 +1,41 @@
use strict;
use warnings;
use TestLib;
use Test::More tests => 2;
use RewindTest;
my $testmode = shift;
RewindTest::init_rewind_test('databases', $testmode);
RewindTest::setup_cluster();
# Create a database in master.
master_psql('CREATE DATABASE inmaster');
RewindTest::create_standby();
# Create another database, the creation is replicated to the standby
master_psql('CREATE DATABASE beforepromotion');
RewindTest::promote_standby();
# Create databases in the old master and the new promoted standby.
master_psql('CREATE DATABASE master_afterpromotion');
standby_psql('CREATE DATABASE standby_afterpromotion');
# The clusters are now diverged.
RewindTest::run_pg_rewind();
# Check that the correct databases are present after pg_rewind.
check_query('SELECT datname FROM pg_database',
qq(template1
template0
postgres
inmaster
beforepromotion
standby_afterpromotion
),
'database names');
exit(0);

src/bin/pg_rewind/t/003_extrafiles.pl Normal file

@@ -0,0 +1,61 @@
# Test how pg_rewind reacts to extra files and directories in the data dirs.
use strict;
use warnings;
use TestLib;
use Test::More tests => 2;
use File::Find;
use RewindTest;
my $testmode = shift;
RewindTest::init_rewind_test('extrafiles', $testmode);
RewindTest::setup_cluster();
# Create a subdir and files that will be present in both
mkdir "$test_master_datadir/tst_both_dir";
append_to_file "$test_master_datadir/tst_both_dir/both_file1", "in both1";
append_to_file "$test_master_datadir/tst_both_dir/both_file2", "in both2";
mkdir "$test_master_datadir/tst_both_dir/both_subdir/";
append_to_file "$test_master_datadir/tst_both_dir/both_subdir/both_file3", "in both3";
RewindTest::create_standby();
# Create different subdirs and files in master and standby
mkdir "$test_standby_datadir/tst_standby_dir";
append_to_file "$test_standby_datadir/tst_standby_dir/standby_file1", "in standby1";
append_to_file "$test_standby_datadir/tst_standby_dir/standby_file2", "in standby2";
mkdir "$test_standby_datadir/tst_standby_dir/standby_subdir/";
append_to_file "$test_standby_datadir/tst_standby_dir/standby_subdir/standby_file3", "in standby3";
mkdir "$test_master_datadir/tst_master_dir";
append_to_file "$test_master_datadir/tst_master_dir/master_file1", "in master1";
append_to_file "$test_master_datadir/tst_master_dir/master_file2", "in master2";
mkdir "$test_master_datadir/tst_master_dir/master_subdir/";
append_to_file "$test_master_datadir/tst_master_dir/master_subdir/master_file3", "in master3";
RewindTest::promote_standby();
RewindTest::run_pg_rewind();
# List files in the data directory after rewind.
my @paths;
find(sub {push @paths, $File::Find::name if $File::Find::name =~ m/.*tst_.*/},
$test_master_datadir);
@paths = sort @paths;
is_deeply(\@paths,
["$test_master_datadir/tst_both_dir",
"$test_master_datadir/tst_both_dir/both_file1",
"$test_master_datadir/tst_both_dir/both_file2",
"$test_master_datadir/tst_both_dir/both_subdir",
"$test_master_datadir/tst_both_dir/both_subdir/both_file3",
"$test_master_datadir/tst_standby_dir",
"$test_master_datadir/tst_standby_dir/standby_file1",
"$test_master_datadir/tst_standby_dir/standby_file2",
"$test_master_datadir/tst_standby_dir/standby_subdir",
"$test_master_datadir/tst_standby_dir/standby_subdir/standby_file3"],
"file lists match");
exit(0);

src/bin/pg_rewind/timeline.c Normal file

@@ -0,0 +1,131 @@
/*-------------------------------------------------------------------------
*
* timeline.c
* timeline-related functions.
*
* Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
*
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include "pg_rewind.h"
#include "access/timeline.h"
#include "access/xlog_internal.h"
/*
* This is copy-pasted from the backend readTimeLineHistory, modified to
* return a malloc'd array and to work without backend functions.
*/
/*
* Try to read a timeline's history file.
*
* If successful, return the list of component TLIs (the given TLI followed by
* its ancestor TLIs). If we can't find the history file, assume that the
* timeline has no parents, and return a list of just the specified timeline
* ID.
*/
TimeLineHistoryEntry *
rewind_parseTimeLineHistory(char *buffer, TimeLineID targetTLI, int *nentries)
{
char *fline;
TimeLineHistoryEntry *entry;
TimeLineHistoryEntry *entries = NULL;
int nlines = 0;
TimeLineID lasttli = 0;
XLogRecPtr prevend;
char *bufptr;
bool lastline = false;
/*
* Parse the file...
*/
prevend = InvalidXLogRecPtr;
bufptr = buffer;
while (!lastline)
{
char *ptr;
TimeLineID tli;
uint32 switchpoint_hi;
uint32 switchpoint_lo;
int nfields;
fline = bufptr;
while (*bufptr && *bufptr != '\n')
bufptr++;
if (!(*bufptr))
lastline = true;
else
*bufptr++ = '\0';
/* skip leading whitespace and check for # comment */
for (ptr = fline; *ptr; ptr++)
{
if (!isspace((unsigned char) *ptr))
break;
}
if (*ptr == '\0' || *ptr == '#')
continue;
nfields = sscanf(fline, "%u\t%X/%X", &tli, &switchpoint_hi, &switchpoint_lo);
if (nfields < 1)
{
/* expect a numeric timeline ID as first field of line */
printf(_("syntax error in history file: %s\n"), fline);
printf(_("Expected a numeric timeline ID.\n"));
exit(1);
}
if (nfields != 3)
{
printf(_("syntax error in history file: %s\n"), fline);
printf(_("Expected an XLOG switchpoint location.\n"));
exit(1);
}
if (entries && tli <= lasttli)
{
printf(_("invalid data in history file: %s\n"), fline);
printf(_("Timeline IDs must be in increasing sequence.\n"));
exit(1);
}
lasttli = tli;
nlines++;
entries = pg_realloc(entries, nlines * sizeof(TimeLineHistoryEntry));
entry = &entries[nlines - 1];
entry->tli = tli;
entry->begin = prevend;
entry->end = ((uint64) (switchpoint_hi)) << 32 | (uint64) switchpoint_lo;
prevend = entry->end;
/* we ignore the remainder of each line */
}
if (entries && targetTLI <= lasttli)
{
printf(_("invalid data in history file\n"));
printf(_("Timeline IDs must be less than child timeline's ID.\n"));
exit(1);
}
/*
* Create one more entry for the "tip" of the timeline, which has no entry
* in the history file.
*/
nlines++;
if (entries)
entries = pg_realloc(entries, nlines * sizeof(TimeLineHistoryEntry));
else
entries = pg_malloc(1 * sizeof(TimeLineHistoryEntry));
entry = &entries[nlines - 1];
entry->tli = targetTLI;
entry->begin = prevend;
entry->end = InvalidXLogRecPtr;
*nentries = nlines;
return entries;
}
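
To make the parsing above concrete: given targetTLI = 3 and a history file
reading (switch points invented)

	1	0/9000060	no recovery target specified
	2	0/A000000	before 2015-03-23 19:00:00+02

the function returns three entries: timeline 1 with an invalid begin and
end = 0/9000060, timeline 2 covering 0/9000060 to 0/A000000, and the tip
entry for timeline 3 beginning at 0/A000000 with an invalid (open) end.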

src/tools/msvc/Mkvcbuild.pm

@@ -65,7 +65,8 @@ my $frontend_extraincludes = {
'initdb' => ['src\timezone'],
'psql' => [ 'src\bin\pg_dump', 'src\backend' ] };
my $frontend_extrasource = { 'psql' => ['src\bin\psql\psqlscan.l'] };
my @frontend_excludes = ('pgevent', 'pg_basebackup', 'pg_dump', 'scripts');
my @frontend_excludes =
('pgevent', 'pg_basebackup', 'pg_rewind', 'pg_dump', 'scripts');
sub mkvcbuild
{
@@ -422,6 +423,11 @@ sub mkvcbuild
$pgrecvlogical->AddFile('src\bin\pg_basebackup\pg_recvlogical.c');
$pgrecvlogical->AddLibrary('ws2_32.lib');
my $pgrewind = AddSimpleFrontend('pg_rewind', 1);
$pgrewind->{name} = 'pg_rewind';
$pgrewind->AddFile('src\backend\access\transam\xlogreader.c');
$pgrewind->AddLibrary('ws2_32.lib');
my $pgevent = $solution->AddProject('pgevent', 'dll', 'bin');
$pgevent->AddFiles('src\bin\pgevent', 'pgevent.c', 'pgmsgevent.rc');
$pgevent->AddResourceFile('src\bin\pgevent', 'Eventlog message formatter',