SELECT FOR UPDATE is implemented...

This commit is contained in:
Vadim B. Mikheev 1999-01-25 12:01:19 +00:00
parent 443e24beb7
commit 247b3f9054
7 changed files with 715 additions and 581 deletions

View File

@ -26,7 +26,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/executor/execMain.c,v 1.62 1998/12/18 09:10:21 vadim Exp $
* $Header: /cvsroot/pgsql/src/backend/executor/execMain.c,v 1.63 1999/01/25 12:01:03 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@ -363,6 +363,32 @@ ExecCheckPerms(CmdType operation,
}
if (!ok)
elog(ERROR, "%s: %s", rname.data, aclcheck_error_strings[aclcheck_result]);
if (parseTree->rowMark != NULL)
{
foreach(lp, parseTree->rowMark)
{
RowMark *rm = lfirst(lp);
if (!(rm->info & ROW_ACL_FOR_UPDATE))
continue;
relid = ((RangeTblEntry *)nth(rm->rti - 1, rangeTable))->relid;
htup = SearchSysCacheTuple(RELOID,
ObjectIdGetDatum(relid),
0, 0, 0);
if (!HeapTupleIsValid(htup))
elog(ERROR, "ExecCheckPerms: bogus RT relid: %d",
relid);
StrNCpy(rname.data,
((Form_pg_class) GETSTRUCT(htup))->relname.data,
NAMEDATALEN);
ok = ((aclcheck_result = CHECK(ACL_WR)) == ACLCHECK_OK);
opstr = "write";
if (!ok)
elog(ERROR, "%s: %s", rname.data, aclcheck_error_strings[aclcheck_result]);
}
}
}
/* ===============================================================
@ -372,6 +398,11 @@ ExecCheckPerms(CmdType operation,
* ===============================================================
*/
typedef struct execRowMark
{
Relation relation;
char resname[32];
} execRowMark;
/* ----------------------------------------------------------------
* InitPlan
@ -398,6 +429,10 @@ InitPlan(CmdType operation, Query *parseTree, Plan *plan, EState *estate)
rangeTable = parseTree->rtable;
resultRelation = parseTree->resultRelation;
#ifndef NO_SECURITY
ExecCheckPerms(operation, resultRelation, rangeTable, parseTree);
#endif
/******************
* initialize the node's execution state
******************
@ -468,9 +503,32 @@ InitPlan(CmdType operation, Query *parseTree, Plan *plan, EState *estate)
estate->es_result_relation_info = NULL;
}
#ifndef NO_SECURITY
ExecCheckPerms(operation, resultRelation, rangeTable, parseTree);
#endif
/*
* Have to lock relations selected for update
*/
estate->es_rowMark = NULL;
if (parseTree->rowMark != NULL)
{
Relation relation;
Oid relid;
RowMark *rm;
List *l;
execRowMark *erm;
foreach(l, parseTree->rowMark)
{
rm = lfirst(l);
relid = ((RangeTblEntry *)nth(rm->rti - 1, rangeTable))->relid;
relation = heap_open(relid);
LockRelation(relation, RowShareLock);
if (!(rm->info & ROW_MARK_FOR_UPDATE))
continue;
erm = (execRowMark*) palloc(sizeof(execRowMark));
erm->relation = relation;
sprintf(erm->resname, "ctid%u", rm->rti);
estate->es_rowMark = lappend(estate->es_rowMark, erm);
}
}
/******************
* initialize the executor "tuple" table.
@ -777,6 +835,49 @@ ExecutePlan(EState *estate,
* ctid!! */
tupleid = &tuple_ctid;
}
else if (estate->es_rowMark != NULL)
{
List *l;
execRowMark *erm;
Buffer buffer;
HeapTupleData tuple;
int test;
foreach (l, estate->es_rowMark)
{
erm = lfirst(l);
if (!ExecGetJunkAttribute(junkfilter,
slot,
erm->resname,
&datum,
&isNull))
elog(ERROR, "ExecutePlan: NO (junk) `%s' was found!", erm->resname);
if (isNull)
elog(ERROR, "ExecutePlan: (junk) `%s' is NULL!", erm->resname);
tuple.t_self = *((ItemPointer) DatumGetPointer(datum));
test = heap_mark4update(erm->relation, &tuple, &buffer);
ReleaseBuffer(buffer);
switch (test)
{
case HeapTupleSelfUpdated:
case HeapTupleMayBeUpdated:
break;
case HeapTupleUpdated:
if (XactIsoLevel == XACT_SERIALIZABLE)
elog(ERROR, "Can't serialize access due to concurrent update");
else
elog(ERROR, "Isolation level %u is not supported", XactIsoLevel);
return(NULL);
default:
elog(ERROR, "Unknown status %u from heap_mark4update", test);
return(NULL);
}
}
}
/******************
* Finally create a new "clean" tuple with all junk attributes

View File

@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/optimizer/plan/planner.c,v 1.36 1999/01/18 00:09:47 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/optimizer/plan/planner.c,v 1.37 1999/01/25 12:01:04 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@ -20,6 +20,8 @@
#include "nodes/plannodes.h"
#include "nodes/parsenodes.h"
#include "nodes/relation.h"
#include "nodes/makefuncs.h"
#include "catalog/pg_type.h"
#include "parser/parse_expr.h"
#include "utils/elog.h"
@ -119,6 +121,8 @@ union_planner(Query *parse)
else if ((rt_index =
first_inherit_rt_entry(rangetable)) != -1)
{
if (parse->rowMark != NULL)
elog(ERROR, "SELECT FOR UPDATE is not supported for inherit queries");
result_plan = (Plan *) plan_inherit_queries(parse, rt_index);
/* XXX do we need to do this? bjm 12/19/97 */
tlist = preprocess_targetlist(tlist,
@ -148,17 +152,49 @@ union_planner(Query *parse)
* a new entry and attaches it to the list 'new_tlist' (consisting of the
* VAR node and the RESDOM node as usual with tlists :-) ) */
if (parse->hasAggs)
{
{
if (parse->havingQual != NULL)
{
new_tlist = check_having_qual_for_vars(parse->havingQual,new_tlist);
}
}
{
new_tlist = check_having_qual_for_vars(parse->havingQual,new_tlist);
}
}
new_tlist = preprocess_targetlist(new_tlist,
parse->commandType,
parse->resultRelation,
parse->rtable);
/* FOR UPDATE ... */
if (parse->rowMark != NULL)
{
List *l;
TargetEntry *ctid;
Resdom *resdom;
Var *var;
char *resname;
foreach (l, parse->rowMark)
{
if (!(((RowMark*)lfirst(l))->info & ROW_MARK_FOR_UPDATE))
continue;
resname = (char*) palloc(32);
sprintf(resname, "ctid%u", ((RowMark*)lfirst(l))->rti);
resdom = makeResdom(length(new_tlist) + 1,
TIDOID,
-1,
resname,
0,
0,
1);
var = makeVar(((RowMark*)lfirst(l))->rti, -1, TIDOID,
-1, 0, ((RowMark*)lfirst(l))->rti, -1);
ctid = makeTargetEntry(resdom, (Node *) var);
new_tlist = lappend(new_tlist, ctid);
}
}
/* Here starts the original (pre having) code */
tlist = preprocess_targetlist(tlist,
@ -290,7 +326,7 @@ union_planner(Query *parse)
pfree(vpm);
}
}
/*
* For now, before we hand back the plan, check to see if there is a
* user-specified sort that needs to be done. Eventually, this will

View File

@ -5,7 +5,7 @@
*
* Copyright (c) 1994, Regents of the University of California
*
* $Id: analyze.c,v 1.94 1999/01/21 22:48:07 momjian Exp $
* $Id: analyze.c,v 1.95 1999/01/25 12:01:05 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@ -45,7 +45,8 @@ static Query *transformUpdateStmt(ParseState *pstate, UpdateStmt *stmt);
static Query *transformCursorStmt(ParseState *pstate, SelectStmt *stmt);
static Query *transformCreateStmt(ParseState *pstate, CreateStmt *stmt);
static void transformForUpdate(Query *qry, List *forUpdate);
static void transformForUpdate(Query *qry, List *forUpdate);
void CheckSelectForUpdate(Query *qry);
List *extras_before = NIL;
List *extras_after = NIL;
@ -1134,6 +1135,19 @@ Node *A_Expr_to_Expr(Node *ptr, bool *intersect_present)
return result;
}
void
CheckSelectForUpdate(Query *qry)
{
if (qry->unionClause != NULL)
elog(ERROR, "SELECT FOR UPDATE is not allowed with UNION/INTERSECT/EXCEPT clause");
if (qry->uniqueFlag != NULL)
elog(ERROR, "SELECT FOR UPDATE is not allowed with DISTINCT clause");
if (qry->groupClause != NULL)
elog(ERROR, "SELECT FOR UPDATE is not allowed with GROUP BY clause");
if (qry->hasAggs)
elog(ERROR, "SELECT FOR UPDATE is not allowed with AGGREGATE");
}
static void
transformForUpdate(Query *qry, List *forUpdate)
{
@ -1142,6 +1156,8 @@ transformForUpdate(Query *qry, List *forUpdate)
List *l;
Index i;
CheckSelectForUpdate(qry);
if (lfirst(forUpdate) == NULL) /* all tables */
{
i = 1;

File diff suppressed because it is too large Load Diff

View File

@ -10,7 +10,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/parser/gram.y,v 2.48 1999/01/22 19:35:54 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/parser/gram.y,v 2.49 1999/01/25 12:01:13 vadim Exp $
*
* HISTORY
* AUTHOR DATE MAJOR EVENT
@ -2798,19 +2798,8 @@ SelectStmt: select_w_o_sort sort_clause for_update_clause
first_select->forUpdate = $3;
$$ = (Node *)first_select;
}
if (((SelectStmt *)$$)->forUpdate != NULL)
{
SelectStmt *n = (SelectStmt *)$$;
if (n->unionClause != NULL)
elog(ERROR, "SELECT FOR UPDATE is not allowed with UNION/INTERSECT/EXCEPT clause");
if (n->unique != NULL)
elog(ERROR, "SELECT FOR UPDATE is not allowed with DISTINCT clause");
if (n->groupClause != NULL)
elog(ERROR, "SELECT FOR UPDATE is not allowed with GROUP BY clause");
if (n->havingClause != NULL)
elog(ERROR, "SELECT FOR UPDATE is not allowed with HAVING clause");
}
if (((SelectStmt *)$$)->forUpdate != NULL && QueryIsRule)
elog(ERROR, "SELECT FOR UPDATE is not allowed in RULES");
}
;

View File

@ -6,7 +6,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/rewrite/rewriteHandler.c,v 1.30 1999/01/24 00:28:30 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/rewrite/rewriteHandler.c,v 1.31 1999/01/25 12:01:14 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@ -1787,6 +1787,7 @@ apply_RIR_view(Node **nodePtr, int rt_index, RangeTblEntry *rte, List *tlist, in
}
}
extern void CheckSelectForUpdate(Query *rule_action); /* in analyze.c */
static void
ApplyRetrieveRule(Query *parsetree,
@ -1847,6 +1848,7 @@ ApplyRetrieveRule(Query *parsetree,
Index rti = 1;
List *l2;
CheckSelectForUpdate(rule_action);
/*
* We believe that rt_index is VIEW - nothing should be
* marked for VIEW, but ACL check must be done.

View File

@ -6,7 +6,7 @@
*
* Copyright (c) 1994, Regents of the University of California
*
* $Id: execnodes.h,v 1.20 1998/12/14 05:19:16 scrappy Exp $
* $Id: execnodes.h,v 1.21 1999/01/25 12:01:19 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@ -208,6 +208,7 @@ typedef struct EState
int *es_refcount;
uint32 es_processed; /* # of tuples processed */
Oid es_lastoid; /* last oid processed (by INSERT) */
List *es_rowMark; /* not good place, but there is no other */
} EState;
/* ----------------