Fix up pg_dump's treatment of large object ownership and ACLs.

We now emit a separate archive entry for each BLOB, and use pg_dump's
standard methods for dealing with its ownership, ACL if any, and comment
if any.  This means that switches like --no-owner and --no-privileges do
what they're supposed to.  Preliminary testing says that performance is
still reasonable even with many blobs, though we'll have to see how that
shakes out in the field.

KaiGai Kohei, revised by me
Tom Lane 2010-02-18 01:29:10 +00:00
parent 2b44d74dd4
commit c0d5be5d6a
8 changed files with 284 additions and 248 deletions

src/bin/pg_dump/dumputils.c

@ -8,7 +8,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/bin/pg_dump/dumputils.c,v 1.53 2010/01/02 16:57:59 momjian Exp $
* $PostgreSQL: pgsql/src/bin/pg_dump/dumputils.c,v 1.54 2010/02/18 01:29:10 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -486,7 +486,8 @@ parsePGArray(const char *atext, char ***itemarray, int *nitems)
* name: the object name, in the form to use in the commands (already quoted)
* subname: the sub-object name, if any (already quoted); NULL if none
* type: the object type (as seen in GRANT command: must be one of
* TABLE, SEQUENCE, FUNCTION, LANGUAGE, SCHEMA, DATABASE, or TABLESPACE)
* TABLE, SEQUENCE, FUNCTION, LANGUAGE, SCHEMA, DATABASE, TABLESPACE,
* FOREIGN DATA WRAPPER, SERVER, or LARGE OBJECT)
* acls: the ACL string fetched from the database
* owner: username of object owner (will be passed through fmtId); can be
* NULL or empty string to indicate "no owner known"
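
For illustration (not part of this commit), here is a minimal sketch of a
call using the newly documented LARGE OBJECT type, modeled on the old
dumpBlobComments code removed later in this diff; the wrapper name
sketch_blob_acl is hypothetical:

    /* Hypothetical wrapper: lo_acl and lo_owner as read from
     * pg_largeobject_metadata.  A large object's "name" is its numeric
     * OID, so it is deliberately not passed through fmtId(). */
    static void
    sketch_blob_acl(Archive *AH, Oid blobOid,
                    const char *lo_acl, const char *lo_owner)
    {
        char        lo_name[32];
        PQExpBuffer cmd = createPQExpBuffer();

        snprintf(lo_name, sizeof(lo_name), "%u", blobOid);
        if (!buildACLCommands(lo_name, NULL, "LARGE OBJECT",
                              lo_acl, lo_owner, "",
                              AH->remoteVersion, cmd))
            exit_nicely();
        archputs(cmd->data, AH);
        destroyPQExpBuffer(cmd);
    }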

src/bin/pg_dump/pg_backup_archiver.c

@ -15,7 +15,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.178 2010/01/19 18:39:19 tgl Exp $
* $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.179 2010/02/18 01:29:10 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -98,6 +98,7 @@ static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
static teReqs _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls);
static bool _tocEntryIsACL(TocEntry *te);
static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
static TocEntry *getTocEntryByDumpId(ArchiveHandle *AH, DumpId id);
@ -329,9 +330,9 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt)
AH->currentTE = te;
reqs = _tocEntryRequired(te, ropt, false /* needn't drop ACLs */ );
if (((reqs & REQ_SCHEMA) != 0) && te->dropStmt)
/* We want anything that's selected and has a dropStmt */
if (((reqs & (REQ_SCHEMA|REQ_DATA)) != 0) && te->dropStmt)
{
/* We want the schema */
ahlog(AH, 1, "dropping %s %s\n", te->desc, te->tag);
/* Select owner and schema as necessary */
_becomeOwner(AH, te);
@ -381,7 +382,8 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt)
/* Work out what, if anything, we want from this entry */
reqs = _tocEntryRequired(te, ropt, true);
if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */
/* Both schema and data objects might now have ownership/ACLs */
if ((reqs & (REQ_SCHEMA|REQ_DATA)) != 0)
{
ahlog(AH, 1, "setting owner and privileges for %s %s\n",
te->desc, te->tag);
@ -905,6 +907,7 @@ EndRestoreBlobs(ArchiveHandle *AH)
void
StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
{
bool old_blob_style = (AH->version < K_VERS_1_12);
Oid loOid;
AH->blobCount++;
@ -914,24 +917,32 @@ StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
ahlog(AH, 2, "restoring large object with OID %u\n", oid);
if (drop)
/* With an old archive we must do drop and create logic here */
if (old_blob_style && drop)
DropBlobIfExists(AH, oid);
if (AH->connection)
{
loOid = lo_create(AH->connection, oid);
if (loOid == 0 || loOid != oid)
die_horribly(AH, modulename, "could not create large object %u\n",
oid);
if (old_blob_style)
{
loOid = lo_create(AH->connection, oid);
if (loOid == 0 || loOid != oid)
die_horribly(AH, modulename, "could not create large object %u\n",
oid);
}
AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
if (AH->loFd == -1)
die_horribly(AH, modulename, "could not open large object\n");
die_horribly(AH, modulename, "could not open large object %u\n",
oid);
}
else
{
ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
oid, INV_WRITE);
if (old_blob_style)
ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
oid, INV_WRITE);
else
ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
oid, INV_WRITE);
}
AH->writingBlob = 1;
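
A standalone sketch (not from the commit) of the script-mode SQL each
branch above emits, for a sample OID; 0x20000 is INV_WRITE's value from
libpq-fs.h:

    #include <stdio.h>

    int
    main(void)
    {
        unsigned int oid = 16384;           /* sample OID */
        int          inv_write = 0x20000;   /* INV_WRITE, libpq-fs.h */

        /* pre-1.12 archive: the data entry must create the blob itself */
        printf("SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
               oid, inv_write);
        /* 1.12 archive: the separate BLOB TOC entry already ran lo_create */
        printf("SELECT pg_catalog.lo_open('%u', %d);\n", oid, inv_write);
        return 0;
    }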
@ -1829,6 +1840,9 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
AH->vmin = K_VERS_MINOR;
AH->vrev = K_VERS_REV;
/* Make a convenient integer <maj><min><rev>00 */
AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;
/* initialize for backwards compatible string processing */
AH->public.encoding = 0; /* PG_SQL_ASCII */
AH->public.std_strings = false;
@ -2068,12 +2082,13 @@ ReadToc(ArchiveHandle *AH)
else
{
/*
* rules for pre-8.4 archives wherein pg_dump hasn't classified
* the entries into sections
* Rules for pre-8.4 archives wherein pg_dump hasn't classified
* the entries into sections. This list need not cover entry
* types added later than 8.4.
*/
if (strcmp(te->desc, "COMMENT") == 0 ||
strcmp(te->desc, "ACL") == 0 ||
strcmp(te->desc, "DEFAULT ACL") == 0)
strcmp(te->desc, "ACL LANGUAGE") == 0)
te->section = SECTION_NONE;
else if (strcmp(te->desc, "TABLE DATA") == 0 ||
strcmp(te->desc, "BLOBS") == 0 ||
@ -2228,10 +2243,10 @@ _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls)
return 0;
/* If it's an ACL, maybe ignore it */
if ((!include_acls || ropt->aclsSkip) &&
(strcmp(te->desc, "ACL") == 0 || strcmp(te->desc, "DEFAULT ACL") == 0))
if ((!include_acls || ropt->aclsSkip) && _tocEntryIsACL(te))
return 0;
/* Ignore DATABASE entry unless we should create it */
if (!ropt->create && strcmp(te->desc, "DATABASE") == 0)
return 0;
@ -2286,9 +2301,18 @@ _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls)
if (!te->hadDumper)
{
/*
* Special Case: If 'SEQUENCE SET' then it is considered a data entry
* Special Case: If 'SEQUENCE SET' or anything to do with BLOBs,
* then it is considered a data entry. We don't need to check for
* the BLOBS entry or old-style BLOB COMMENTS, because they will
* have hadDumper = true ... but we do need to check new-style
* BLOB comments.
*/
if (strcmp(te->desc, "SEQUENCE SET") == 0)
if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
strcmp(te->desc, "BLOB") == 0 ||
(strcmp(te->desc, "ACL") == 0 &&
strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
(strcmp(te->desc, "COMMENT") == 0 &&
strncmp(te->tag, "LARGE OBJECT ", 13) == 0))
res = res & REQ_DATA;
else
res = res & ~REQ_DATA;
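
A standalone sketch (not from the commit) of the tag test above; note
that "LARGE OBJECT " is exactly 13 characters, trailing space included:

    #include <stdio.h>
    #include <string.h>

    /* Mirrors the desc/tag classification; sample values are made up. */
    static int
    is_blob_metadata(const char *desc, const char *tag)
    {
        return strcmp(desc, "BLOB") == 0 ||
               ((strcmp(desc, "ACL") == 0 || strcmp(desc, "COMMENT") == 0) &&
                strncmp(tag, "LARGE OBJECT ", 13) == 0);
    }

    int
    main(void)
    {
        printf("%d\n", is_blob_metadata("COMMENT", "LARGE OBJECT 16384")); /* 1 */
        printf("%d\n", is_blob_metadata("COMMENT", "TABLE foo"));          /* 0 */
        return 0;
    }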
@ -2320,6 +2344,20 @@ _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls)
return res;
}
/*
* Identify TOC entries that are ACLs.
*/
static bool
_tocEntryIsACL(TocEntry *te)
{
/* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
if (strcmp(te->desc, "ACL") == 0 ||
strcmp(te->desc, "ACL LANGUAGE") == 0 ||
strcmp(te->desc, "DEFAULT ACL") == 0)
return true;
return false;
}
/*
* Issue SET commands for parameters that we want to have set the same way
* at all times during execution of a restore script.
@ -2685,6 +2723,13 @@ _getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH)
return;
}
/* BLOBs just have a name, but it's numeric so must not use fmtId */
if (strcmp(type, "BLOB") == 0)
{
appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
return;
}
/*
* These object types require additional decoration. Fortunately, the
* information needed is exactly what's in the DROP command.
@ -2723,14 +2768,12 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isDat
/* ACLs are dumped only during acl pass */
if (acl_pass)
{
if (!(strcmp(te->desc, "ACL") == 0 ||
strcmp(te->desc, "DEFAULT ACL") == 0))
if (!_tocEntryIsACL(te))
return;
}
else
{
if (strcmp(te->desc, "ACL") == 0 ||
strcmp(te->desc, "DEFAULT ACL") == 0)
if (_tocEntryIsACL(te))
return;
}
@ -2824,6 +2867,7 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isDat
strlen(te->owner) > 0 && strlen(te->dropStmt) > 0)
{
if (strcmp(te->desc, "AGGREGATE") == 0 ||
strcmp(te->desc, "BLOB") == 0 ||
strcmp(te->desc, "CONVERSION") == 0 ||
strcmp(te->desc, "DATABASE") == 0 ||
strcmp(te->desc, "DOMAIN") == 0 ||
@ -2873,7 +2917,7 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isDat
* If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
* commands, so we can no longer assume we know the current auth setting.
*/
if (strncmp(te->desc, "ACL", 3) == 0)
if (acl_pass)
{
if (AH->currUser)
free(AH->currUser);

src/bin/pg_dump/pg_backup_archiver.h

@ -17,7 +17,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.h,v 1.83 2009/12/14 00:39:11 itagaki Exp $
* $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.h,v 1.84 2010/02/18 01:29:10 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -61,16 +61,16 @@ typedef struct _z_stream
typedef z_stream *z_streamp;
#endif
/* Current archive version number (the format we can output) */
#define K_VERS_MAJOR 1
#define K_VERS_MINOR 11
#define K_VERS_MINOR 12
#define K_VERS_REV 0
/* Data block types */
#define BLK_DATA 1
#define BLK_BLOB 2
#define BLK_BLOBS 3
/* Some important version numbers (checked in code) */
/* Historical version numbers (checked in code) */
#define K_VERS_1_0 (( (1 * 256 + 0) * 256 + 0) * 256 + 0)
#define K_VERS_1_2 (( (1 * 256 + 2) * 256 + 0) * 256 + 0) /* Allow No ZLIB */
#define K_VERS_1_3 (( (1 * 256 + 3) * 256 + 0) * 256 + 0) /* BLOBs */
@ -87,8 +87,11 @@ typedef z_stream *z_streamp;
#define K_VERS_1_10 (( (1 * 256 + 10) * 256 + 0) * 256 + 0) /* add tablespace */
#define K_VERS_1_11 (( (1 * 256 + 11) * 256 + 0) * 256 + 0) /* add toc section
* indicator */
#define K_VERS_1_12 (( (1 * 256 + 12) * 256 + 0) * 256 + 0) /* add separate BLOB
* entries */
#define K_VERS_MAX (( (1 * 256 + 11) * 256 + 255) * 256 + 0)
/* Newest format we can read */
#define K_VERS_MAX (( (1 * 256 + 12) * 256 + 255) * 256 + 0)
/* Flags to indicate disposition of offsets stored in files */

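As a quick cross-check (not part of the commit) of the packing scheme
behind the K_VERS_* macros above, ((maj*256 + min)*256 + rev)*256 + 0
yields integers that compare in version order:

    #include <stdio.h>

    #define MAKE_VERS(maj, min, rev) \
        ((((maj) * 256 + (min)) * 256 + (rev)) * 256 + 0)

    int
    main(void)
    {
        printf("1.11.0 -> %d\n", MAKE_VERS(1, 11, 0));  /* 17498112 */
        printf("1.12.0 -> %d\n", MAKE_VERS(1, 12, 0));  /* 17563648 */
        /* archives older than 1.12 take the old-style blob paths */
        printf("old-style? %d\n", MAKE_VERS(1, 11, 0) < MAKE_VERS(1, 12, 0));
        return 0;
    }
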
src/bin/pg_dump/pg_backup_db.c

@ -5,7 +5,7 @@
* Implements the basic DB functions used by the archiver.
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_db.c,v 1.87 2010/02/17 04:19:40 tgl Exp $
* $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_db.c,v 1.88 2010/02/18 01:29:10 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -703,16 +703,26 @@ CommitTransaction(ArchiveHandle *AH)
void
DropBlobIfExists(ArchiveHandle *AH, Oid oid)
{
/* Call lo_unlink only if exists to avoid not-found error. */
if (PQserverVersion(AH->connection) >= 90000)
/*
* If we are not restoring to a direct database connection, we have to
* guess about how to detect whether the blob exists. Assume new-style.
*/
if (AH->connection == NULL ||
PQserverVersion(AH->connection) >= 90000)
{
ahprintf(AH, "SELECT pg_catalog.lo_unlink(oid) "
"FROM pg_catalog.pg_largeobject_metadata "
"WHERE oid = %u;\n", oid);
ahprintf(AH,
"SELECT pg_catalog.lo_unlink(oid) "
"FROM pg_catalog.pg_largeobject_metadata "
"WHERE oid = '%u';\n",
oid);
}
else
{
ahprintf(AH, "SELECT CASE WHEN EXISTS(SELECT 1 FROM pg_catalog.pg_largeobject WHERE loid = '%u') THEN pg_catalog.lo_unlink('%u') END;\n",
/* Restoring to pre-9.0 server, so do it the old way */
ahprintf(AH,
"SELECT CASE WHEN EXISTS("
"SELECT 1 FROM pg_catalog.pg_largeobject WHERE loid = '%u'"
") THEN pg_catalog.lo_unlink('%u') END;\n",
oid, oid);
}
}
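
For reference, a sketch (not commit output) printing the two statements
DropBlobIfExists now emits for a sample OID, new-style first, then the
pre-9.0 fallback:

    #include <stdio.h>

    int
    main(void)
    {
        unsigned int oid = 16384;   /* sample OID */

        /* new-style (9.0+, or guessing in script mode) */
        printf("SELECT pg_catalog.lo_unlink(oid) "
               "FROM pg_catalog.pg_largeobject_metadata "
               "WHERE oid = '%u';\n", oid);
        /* old-style fallback for pre-9.0 servers */
        printf("SELECT CASE WHEN EXISTS("
               "SELECT 1 FROM pg_catalog.pg_largeobject WHERE loid = '%u'"
               ") THEN pg_catalog.lo_unlink('%u') END;\n", oid, oid);
        return 0;
    }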

src/bin/pg_dump/pg_backup_null.c

@ -17,7 +17,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_null.c,v 1.23 2009/12/14 00:39:11 itagaki Exp $
* $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_null.c,v 1.24 2010/02/18 01:29:10 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -147,14 +147,21 @@ _StartBlobs(ArchiveHandle *AH, TocEntry *te)
static void
_StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
{
bool old_blob_style = (AH->version < K_VERS_1_12);
if (oid == 0)
die_horribly(AH, NULL, "invalid OID for large object\n");
if (AH->ropt->dropSchema)
/* With an old archive we must do drop and create logic here */
if (old_blob_style && AH->ropt->dropSchema)
DropBlobIfExists(AH, oid);
ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
oid, INV_WRITE);
if (old_blob_style)
ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
oid, INV_WRITE);
else
ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
oid, INV_WRITE);
AH->WriteDataPtr = _WriteBlobData;
}

src/bin/pg_dump/pg_dump.c

@ -12,7 +12,7 @@
* by PostgreSQL
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump.c,v 1.571 2010/02/17 04:19:40 tgl Exp $
* $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump.c,v 1.572 2010/02/18 01:29:10 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -190,9 +190,9 @@ static void selectSourceSchema(const char *schemaName);
static char *getFormattedTypeName(Oid oid, OidOptions opts);
static char *myFormatType(const char *typname, int32 typmod);
static const char *fmtQualifiedId(const char *schema, const char *id);
static bool hasBlobs(Archive *AH);
static void getBlobs(Archive *AH);
static void dumpBlob(Archive *AH, BlobInfo *binfo);
static int dumpBlobs(Archive *AH, void *arg);
static int dumpBlobComments(Archive *AH, void *arg);
static void dumpDatabase(Archive *AH);
static void dumpEncoding(Archive *AH);
static void dumpStdStrings(Archive *AH);
@ -701,25 +701,8 @@ main(int argc, char **argv)
getTableDataFKConstraints();
}
if (outputBlobs && hasBlobs(g_fout))
{
/* Add placeholders to allow correct sorting of blobs */
DumpableObject *blobobj;
DumpableObject *blobcobj;
blobobj = (DumpableObject *) malloc(sizeof(DumpableObject));
blobobj->objType = DO_BLOBS;
blobobj->catId = nilCatalogId;
AssignDumpId(blobobj);
blobobj->name = strdup("BLOBS");
blobcobj = (DumpableObject *) malloc(sizeof(DumpableObject));
blobcobj->objType = DO_BLOB_COMMENTS;
blobcobj->catId = nilCatalogId;
AssignDumpId(blobcobj);
blobcobj->name = strdup("BLOB COMMENTS");
addObjectDependency(blobcobj, blobobj->dumpId);
}
if (outputBlobs)
getBlobs(g_fout);
/*
* Collect dependency data to assist in ordering the objects.
@ -1808,7 +1791,7 @@ dumpDatabase(Archive *AH)
appendPQExpBuffer(loFrozenQry, "SELECT relfrozenxid\n"
"FROM pg_catalog.pg_class\n"
"WHERE oid = %d;\n",
"WHERE oid = %u;\n",
LargeObjectRelationId);
lo_res = PQexec(g_conn, loFrozenQry->data);
@ -1825,7 +1808,7 @@ dumpDatabase(Archive *AH)
appendPQExpBuffer(loOutQry, "\n-- For binary upgrade, set pg_largeobject relfrozenxid.\n");
appendPQExpBuffer(loOutQry, "UPDATE pg_catalog.pg_class\n"
"SET relfrozenxid = '%u'\n"
"WHERE oid = %d;\n",
"WHERE oid = %u;\n",
atoi(PQgetvalue(lo_res, 0, i_relfrozenxid)),
LargeObjectRelationId);
ArchiveEntry(AH, nilCatalogId, createDumpId(),
@ -1938,40 +1921,135 @@ dumpStdStrings(Archive *AH)
/*
* hasBlobs:
* Test whether database contains any large objects
* getBlobs:
* Collect schema-level data about large objects
*/
static bool
hasBlobs(Archive *AH)
static void
getBlobs(Archive *AH)
{
bool result;
const char *blobQry;
PGresult *res;
PQExpBuffer blobQry = createPQExpBuffer();
BlobInfo *binfo;
DumpableObject *bdata;
PGresult *res;
int ntups;
int i;
/* Verbose message */
if (g_verbose)
write_msg(NULL, "reading large objects\n");
/* Make sure we are in proper schema */
selectSourceSchema("pg_catalog");
/* Check for BLOB OIDs */
/* Fetch BLOB OIDs, and owner/ACL data if >= 9.0 */
if (AH->remoteVersion >= 90000)
blobQry = "SELECT oid FROM pg_largeobject_metadata LIMIT 1";
appendPQExpBuffer(blobQry,
"SELECT oid, (%s lomowner) AS rolname, lomacl"
" FROM pg_largeobject_metadata",
username_subquery);
else if (AH->remoteVersion >= 70100)
blobQry = "SELECT loid FROM pg_largeobject LIMIT 1";
appendPQExpBuffer(blobQry,
"SELECT DISTINCT loid, NULL::oid, NULL::oid"
" FROM pg_largeobject");
else
blobQry = "SELECT oid FROM pg_class WHERE relkind = 'l' LIMIT 1";
appendPQExpBuffer(blobQry,
"SELECT oid, NULL::oid, NULL::oid"
" FROM pg_class WHERE relkind = 'l'");
res = PQexec(g_conn, blobQry);
check_sql_result(res, g_conn, blobQry, PGRES_TUPLES_OK);
res = PQexec(g_conn, blobQry->data);
check_sql_result(res, g_conn, blobQry->data, PGRES_TUPLES_OK);
result = PQntuples(res) > 0;
ntups = PQntuples(res);
if (ntups > 0)
{
/*
* Each large object has its own BLOB archive entry.
*/
binfo = (BlobInfo *) malloc(ntups * sizeof(BlobInfo));
for (i = 0; i < ntups; i++)
{
binfo[i].dobj.objType = DO_BLOB;
binfo[i].dobj.catId.tableoid = LargeObjectRelationId;
binfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, 0));
AssignDumpId(&binfo[i].dobj);
binfo[i].dobj.name = strdup(PQgetvalue(res, i, 0));
if (!PQgetisnull(res, i, 1))
binfo[i].rolname = strdup(PQgetvalue(res, i, 1));
else
binfo[i].rolname = "";
if (!PQgetisnull(res, i, 2))
binfo[i].blobacl = strdup(PQgetvalue(res, i, 2));
else
binfo[i].blobacl = NULL;
}
/*
* If we have any large objects, a "BLOBS" archive entry is needed.
* This is just a placeholder for sorting; it carries no data now.
*/
bdata = (DumpableObject *) malloc(sizeof(DumpableObject));
bdata->objType = DO_BLOB_DATA;
bdata->catId = nilCatalogId;
AssignDumpId(bdata);
bdata->name = strdup("BLOBS");
}
PQclear(res);
destroyPQExpBuffer(blobQry);
}
return result;
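
A sketch (an assumption, not from this commit) of how the >= 9.0 branch's
query expands; the stand-in for username_subquery is assumed to be its
usual 8.1-and-later value:

    #include <stdio.h>

    int
    main(void)
    {
        /* assumed value of pg_dump's username_subquery */
        const char *username_subquery =
            "SELECT rolname FROM pg_catalog.pg_roles WHERE oid =";

        printf("SELECT oid, (%s lomowner) AS rolname, lomacl"
               " FROM pg_largeobject_metadata\n",
               username_subquery);
        return 0;
    }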
/*
* dumpBlob
*
* dump the definition (metadata) of the given large object
*/
static void
dumpBlob(Archive *AH, BlobInfo *binfo)
{
PQExpBuffer cquery = createPQExpBuffer();
PQExpBuffer dquery = createPQExpBuffer();
appendPQExpBuffer(cquery,
"SELECT pg_catalog.lo_create('%s');\n",
binfo->dobj.name);
appendPQExpBuffer(dquery,
"SELECT pg_catalog.lo_unlink('%s');\n",
binfo->dobj.name);
ArchiveEntry(AH, binfo->dobj.catId, binfo->dobj.dumpId,
binfo->dobj.name,
NULL, NULL,
binfo->rolname, false,
"BLOB", SECTION_PRE_DATA,
cquery->data, dquery->data, NULL,
binfo->dobj.dependencies, binfo->dobj.nDeps,
NULL, NULL);
/* set up tag for comment and/or ACL */
resetPQExpBuffer(cquery);
appendPQExpBuffer(cquery, "LARGE OBJECT %s", binfo->dobj.name);
/* Dump comment if any */
dumpComment(AH, cquery->data,
NULL, binfo->rolname,
binfo->dobj.catId, 0, binfo->dobj.dumpId);
/* Dump ACL if any */
if (binfo->blobacl)
dumpACL(AH, binfo->dobj.catId, binfo->dobj.dumpId, "LARGE OBJECT",
binfo->dobj.name, NULL, cquery->data,
NULL, binfo->rolname, binfo->blobacl);
destroyPQExpBuffer(cquery);
destroyPQExpBuffer(dquery);
}
/*
* dumpBlobs:
* dump all blobs
* dump the data contents of all large objects
*/
static int
dumpBlobs(Archive *AH, void *arg)
@ -1980,6 +2058,7 @@ dumpBlobs(Archive *AH, void *arg)
const char *blobFetchQry;
PGresult *res;
char buf[LOBBUFSIZE];
int ntups;
int i;
int cnt;
@ -1989,7 +2068,10 @@ dumpBlobs(Archive *AH, void *arg)
/* Make sure we are in proper schema */
selectSourceSchema("pg_catalog");
/* Cursor to get all BLOB OIDs */
/*
* Currently, we re-fetch all BLOB OIDs using a cursor. Consider
* scanning the already-in-memory dumpable objects instead...
*/
if (AH->remoteVersion >= 90000)
blobQry = "DECLARE bloboid CURSOR FOR SELECT oid FROM pg_largeobject_metadata";
else if (AH->remoteVersion >= 70100)
@ -2012,7 +2094,8 @@ dumpBlobs(Archive *AH, void *arg)
check_sql_result(res, g_conn, blobFetchQry, PGRES_TUPLES_OK);
/* Process the tuples, if any */
for (i = 0; i < PQntuples(res); i++)
ntups = PQntuples(res);
for (i = 0; i < ntups; i++)
{
Oid blobOid;
int loFd;
@ -2022,8 +2105,8 @@ dumpBlobs(Archive *AH, void *arg)
loFd = lo_open(g_conn, blobOid, INV_READ);
if (loFd == -1)
{
write_msg(NULL, "dumpBlobs(): could not open large object: %s",
PQerrorMessage(g_conn));
write_msg(NULL, "dumpBlobs(): could not open large object %u: %s",
blobOid, PQerrorMessage(g_conn));
exit_nicely();
}
@ -2035,8 +2118,8 @@ dumpBlobs(Archive *AH, void *arg)
cnt = lo_read(g_conn, loFd, buf, LOBBUFSIZE);
if (cnt < 0)
{
write_msg(NULL, "dumpBlobs(): error reading large object: %s",
PQerrorMessage(g_conn));
write_msg(NULL, "dumpBlobs(): error reading large object %u: %s",
blobOid, PQerrorMessage(g_conn));
exit_nicely();
}
@ -2047,141 +2130,13 @@ dumpBlobs(Archive *AH, void *arg)
EndBlob(AH, blobOid);
}
} while (PQntuples(res) > 0);
} while (ntups > 0);
PQclear(res);
return 1;
}
/*
* dumpBlobComments
* dump all blob properties.
* It has "BLOB COMMENTS" tag due to the historical reason, but note
* that it is the routine to dump all the properties of blobs.
*
* Since we don't provide any way to be selective about dumping blobs,
* there's no need to be selective about their comments either. We put
* all the comments into one big TOC entry.
*/
static int
dumpBlobComments(Archive *AH, void *arg)
{
const char *blobQry;
const char *blobFetchQry;
PQExpBuffer cmdQry = createPQExpBuffer();
PGresult *res;
int i;
if (g_verbose)
write_msg(NULL, "saving large object properties\n");
/* Make sure we are in proper schema */
selectSourceSchema("pg_catalog");
/* Cursor to get all BLOB comments */
if (AH->remoteVersion >= 90000)
blobQry = "DECLARE blobcmt CURSOR FOR SELECT oid, "
"obj_description(oid, 'pg_largeobject'), "
"pg_get_userbyid(lomowner), lomacl "
"FROM pg_largeobject_metadata";
else if (AH->remoteVersion >= 70300)
blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, "
"obj_description(loid, 'pg_largeobject'), NULL, NULL "
"FROM (SELECT DISTINCT loid FROM "
"pg_description d JOIN pg_largeobject l ON (objoid = loid) "
"WHERE classoid = 'pg_largeobject'::regclass) ss";
else if (AH->remoteVersion >= 70200)
blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, "
"obj_description(loid, 'pg_largeobject'), NULL, NULL "
"FROM (SELECT DISTINCT loid FROM pg_largeobject) ss";
else if (AH->remoteVersion >= 70100)
blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, "
"obj_description(loid), NULL, NULL "
"FROM (SELECT DISTINCT loid FROM pg_largeobject) ss";
else
blobQry = "DECLARE blobcmt CURSOR FOR SELECT oid, "
" ( "
" SELECT description "
" FROM pg_description pd "
" WHERE pd.objoid=pc.oid "
" ), NULL, NULL "
"FROM pg_class pc WHERE relkind = 'l'";
res = PQexec(g_conn, blobQry);
check_sql_result(res, g_conn, blobQry, PGRES_COMMAND_OK);
/* Command to fetch from cursor */
blobFetchQry = "FETCH 100 IN blobcmt";
do
{
PQclear(res);
/* Do a fetch */
res = PQexec(g_conn, blobFetchQry);
check_sql_result(res, g_conn, blobFetchQry, PGRES_TUPLES_OK);
/* Process the tuples, if any */
for (i = 0; i < PQntuples(res); i++)
{
Oid blobOid = atooid(PQgetvalue(res, i, 0));
char *lo_comment = PQgetvalue(res, i, 1);
char *lo_owner = PQgetvalue(res, i, 2);
char *lo_acl = PQgetvalue(res, i, 3);
char lo_name[32];
resetPQExpBuffer(cmdQry);
/* comment on the blob */
if (!PQgetisnull(res, i, 1))
{
appendPQExpBuffer(cmdQry,
"COMMENT ON LARGE OBJECT %u IS ", blobOid);
appendStringLiteralAH(cmdQry, lo_comment, AH);
appendPQExpBuffer(cmdQry, ";\n");
}
/* dump blob ownership, if necessary */
if (!PQgetisnull(res, i, 2))
{
appendPQExpBuffer(cmdQry,
"ALTER LARGE OBJECT %u OWNER TO %s;\n",
blobOid, lo_owner);
}
/* dump blob privileges, if necessary */
if (!PQgetisnull(res, i, 3) &&
!dataOnly && !aclsSkip)
{
snprintf(lo_name, sizeof(lo_name), "%u", blobOid);
if (!buildACLCommands(lo_name, NULL, "LARGE OBJECT",
lo_acl, lo_owner, "",
AH->remoteVersion, cmdQry))
{
write_msg(NULL, "could not parse ACL (%s) for "
"large object %u", lo_acl, blobOid);
exit_nicely();
}
}
if (cmdQry->len > 0)
{
appendPQExpBuffer(cmdQry, "\n");
archputs(cmdQry->data, AH);
}
}
} while (PQntuples(res) > 0);
PQclear(res);
archputs("\n", AH);
destroyPQExpBuffer(cmdQry);
return 1;
}
static void
binary_upgrade_set_type_oids_by_type_oid(PQExpBuffer upgrade_buffer,
Oid pg_type_oid)
@ -6140,9 +6095,17 @@ dumpComment(Archive *fout, const char *target,
CommentItem *comments;
int ncomments;
/* Comments are SCHEMA not data */
if (dataOnly)
return;
/* Comments are schema not data ... except blob comments are data */
if (strncmp(target, "LARGE OBJECT ", 13) != 0)
{
if (dataOnly)
return;
}
else
{
if (schemaOnly)
return;
}
/* Search for comments associated with catalogId, using table */
ncomments = findComments(fout, catalogId.tableoid, catalogId.oid,
@ -6530,7 +6493,10 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj)
case DO_DEFAULT_ACL:
dumpDefaultACL(fout, (DefaultACLInfo *) dobj);
break;
case DO_BLOBS:
case DO_BLOB:
dumpBlob(fout, (BlobInfo *) dobj);
break;
case DO_BLOB_DATA:
ArchiveEntry(fout, dobj->catId, dobj->dumpId,
dobj->name, NULL, NULL, "",
false, "BLOBS", SECTION_DATA,
@ -6538,14 +6504,6 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj)
dobj->dependencies, dobj->nDeps,
dumpBlobs, NULL);
break;
case DO_BLOB_COMMENTS:
ArchiveEntry(fout, dobj->catId, dobj->dumpId,
dobj->name, NULL, NULL, "",
false, "BLOB COMMENTS", SECTION_DATA,
"", "", NULL,
dobj->dependencies, dobj->nDeps,
dumpBlobComments, NULL);
break;
}
}
@ -10382,14 +10340,16 @@ dumpDefaultACL(Archive *fout, DefaultACLInfo *daclinfo)
*
* 'objCatId' is the catalog ID of the underlying object.
* 'objDumpId' is the dump ID of the underlying object.
* 'type' must be TABLE, FUNCTION, LANGUAGE, SCHEMA, DATABASE, or TABLESPACE.
* 'type' must be one of
* TABLE, SEQUENCE, FUNCTION, LANGUAGE, SCHEMA, DATABASE, TABLESPACE,
* FOREIGN DATA WRAPPER, SERVER, or LARGE OBJECT.
* 'name' is the formatted name of the object. Must be quoted etc. already.
* 'subname' is the formatted name of the sub-object, if any. Must be quoted.
* 'tag' is the tag for the archive entry (typ. unquoted name of object).
* 'nspname' is the namespace the object is in (NULL if none).
* 'owner' is the owner, NULL if there is no owner (for languages).
* 'acls' is the string read out of the fooacl system catalog field;
* it will be parsed here.
* it will be parsed here.
*----------
*/
static void
@ -10401,7 +10361,11 @@ dumpACL(Archive *fout, CatalogId objCatId, DumpId objDumpId,
PQExpBuffer sql;
/* Do nothing if ACL dump is not enabled */
if (dataOnly || aclsSkip)
if (aclsSkip)
return;
/* --data-only skips ACLs *except* BLOB ACLs */
if (dataOnly && strcmp(type, "LARGE OBJECT") != 0)
return;
sql = createPQExpBuffer();

src/bin/pg_dump/pg_dump.h

@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump.h,v 1.162 2010/01/28 23:21:12 petere Exp $
* $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump.h,v 1.163 2010/02/18 01:29:10 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -115,8 +115,8 @@ typedef enum
DO_FDW,
DO_FOREIGN_SERVER,
DO_DEFAULT_ACL,
DO_BLOBS,
DO_BLOB_COMMENTS
DO_BLOB,
DO_BLOB_DATA
} DumpableObjectType;
typedef struct _dumpableObject
@ -443,6 +443,13 @@ typedef struct _defaultACLInfo
char *defaclacl;
} DefaultACLInfo;
typedef struct _blobInfo
{
DumpableObject dobj;
char *rolname;
char *blobacl;
} BlobInfo;
/* global decls */
extern bool force_quotes; /* double-quotes for identifiers flag */
extern bool g_verbose; /* verbose flag */

src/bin/pg_dump/pg_dump_sort.c

@ -9,7 +9,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump_sort.c,v 1.28 2010/02/15 19:59:47 petere Exp $
* $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump_sort.c,v 1.29 2010/02/18 01:29:10 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -46,7 +46,7 @@ static const int oldObjectTypePriority[] =
16, /* DO_FK_CONSTRAINT */
2, /* DO_PROCLANG */
2, /* DO_CAST */
9, /* DO_TABLE_DATA */
10, /* DO_TABLE_DATA */
7, /* DO_DUMMY_TYPE */
3, /* DO_TSPARSER */
4, /* DO_TSDICT */
@ -55,8 +55,8 @@ static const int oldObjectTypePriority[] =
3, /* DO_FDW */
4, /* DO_FOREIGN_SERVER */
17, /* DO_DEFAULT_ACL */
10, /* DO_BLOBS */
11 /* DO_BLOB_COMMENTS */
9, /* DO_BLOB */
11 /* DO_BLOB_DATA */
};
/*
@ -83,7 +83,7 @@ static const int newObjectTypePriority[] =
26, /* DO_FK_CONSTRAINT */
2, /* DO_PROCLANG */
8, /* DO_CAST */
19, /* DO_TABLE_DATA */
20, /* DO_TABLE_DATA */
17, /* DO_DUMMY_TYPE */
10, /* DO_TSPARSER */
12, /* DO_TSDICT */
@ -92,8 +92,8 @@ static const int newObjectTypePriority[] =
14, /* DO_FDW */
15, /* DO_FOREIGN_SERVER */
27, /* DO_DEFAULT_ACL */
20, /* DO_BLOBS */
21 /* DO_BLOB_COMMENTS */
19, /* DO_BLOB */
21 /* DO_BLOB_DATA */
};
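
A standalone sketch (priority values copied from the table above) of the
ordering this change produces: per-blob metadata entries, which are
SECTION_PRE_DATA, now sort ahead of table data, while the BLOB DATA
placeholder stays after it:

    #include <stdio.h>

    int
    main(void)
    {
        struct { int pri; const char *desc; } objs[] = {
            {19, "BLOB (metadata, pre-data)"},
            {20, "TABLE DATA"},
            {21, "BLOB DATA"},
        };

        for (int i = 0; i < 3; i++)
            printf("%2d  %s\n", objs[i].pri, objs[i].desc);
        return 0;
    }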
@ -1157,14 +1157,14 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize)
"DEFAULT ACL %s (ID %d OID %u)",
obj->name, obj->dumpId, obj->catId.oid);
return;
case DO_BLOBS:
case DO_BLOB:
snprintf(buf, bufsize,
"BLOBS (ID %d)",
obj->dumpId);
"BLOB (ID %d OID %u)",
obj->dumpId, obj->catId.oid);
return;
case DO_BLOB_COMMENTS:
case DO_BLOB_DATA:
snprintf(buf, bufsize,
"BLOB COMMENTS (ID %d)",
"BLOB DATA (ID %d)",
obj->dumpId);
return;
}