Fix handling of pending inserts in nodeModifyTable.c.

Commit b663a4136, which allowed FDWs to INSERT rows in bulk, added to
nodeModifyTable.c code to flush pending inserts to the foreign-table
result relation(s) before completing processing of the ModifyTable node,
but the code failed to take into account the case where the INSERT query
has modifying CTEs, leading to incorrect results.

Also, that commit failed to flush pending inserts before firing BEFORE
ROW triggers, so the pending rows were not visible to such triggers.

In that commit we scanned through EState's
es_tuple_routing_result_relations or es_opened_result_relations list to
find the foreign-table result relations to which pending inserts are
flushed, but that would be inefficient in some cases.  So to fix, 1) add
a List member to EState to record the insert-pending result relations,
and 2) modify nodeModifyTable.c so that it adds the foreign-table result
relation to the list in ExecInsert() if appropriate, and flushes pending
inserts properly using the list where needed.

While here, fix a copy-and-pasteo in a comment in ExecBatchInsert(),
which was added by that commit.

Back-patch to v14 where that commit appeared.

Discussion: https://postgr.es/m/CAPmGK16qutyCmyJJzgQOhfBq%3DNoGDqTB6O0QBZTihrbqre%2BoxA%40mail.gmail.com
This commit is contained in:
Etsuro Fujita 2022-11-25 17:45:00 +09:00
parent 9e492d6b69
commit ffbb7e65a8
7 changed files with 302 additions and 21 deletions

View File

@ -10448,7 +10448,130 @@ SELECT * FROM batch_table ORDER BY x;
50 | test50 | test50
(50 rows)
-- Clean up
DROP TABLE batch_table;
DROP TABLE batch_table_p0;
DROP TABLE batch_table_p1;
ALTER SERVER loopback OPTIONS (DROP batch_size);
-- Test that pending inserts are handled properly when needed
CREATE TABLE batch_table (a text, b int);
CREATE FOREIGN TABLE ftable (a text, b int)
SERVER loopback
OPTIONS (table_name 'batch_table', batch_size '2');
CREATE TABLE ltable (a text, b int);
CREATE FUNCTION ftable_rowcount_trigf() RETURNS trigger LANGUAGE plpgsql AS
$$
begin
raise notice '%: there are % rows in ftable',
TG_NAME, (SELECT count(*) FROM ftable);
if TG_OP = 'DELETE' then
return OLD;
else
return NEW;
end if;
end;
$$;
CREATE TRIGGER ftable_rowcount_trigger
BEFORE INSERT OR UPDATE OR DELETE ON ltable
FOR EACH ROW EXECUTE PROCEDURE ftable_rowcount_trigf();
WITH t AS (
INSERT INTO ltable VALUES ('AAA', 42), ('BBB', 42) RETURNING *
)
INSERT INTO ftable SELECT * FROM t;
NOTICE: ftable_rowcount_trigger: there are 0 rows in ftable
NOTICE: ftable_rowcount_trigger: there are 1 rows in ftable
SELECT * FROM ltable;
a | b
-----+----
AAA | 42
BBB | 42
(2 rows)
SELECT * FROM ftable;
a | b
-----+----
AAA | 42
BBB | 42
(2 rows)
DELETE FROM ftable;
WITH t AS (
UPDATE ltable SET b = b + 100 RETURNING *
)
INSERT INTO ftable SELECT * FROM t;
NOTICE: ftable_rowcount_trigger: there are 0 rows in ftable
NOTICE: ftable_rowcount_trigger: there are 1 rows in ftable
SELECT * FROM ltable;
a | b
-----+-----
AAA | 142
BBB | 142
(2 rows)
SELECT * FROM ftable;
a | b
-----+-----
AAA | 142
BBB | 142
(2 rows)
DELETE FROM ftable;
WITH t AS (
DELETE FROM ltable RETURNING *
)
INSERT INTO ftable SELECT * FROM t;
NOTICE: ftable_rowcount_trigger: there are 0 rows in ftable
NOTICE: ftable_rowcount_trigger: there are 1 rows in ftable
SELECT * FROM ltable;
a | b
---+---
(0 rows)
SELECT * FROM ftable;
a | b
-----+-----
AAA | 142
BBB | 142
(2 rows)
DELETE FROM ftable;
-- Clean up
DROP FOREIGN TABLE ftable;
DROP TABLE batch_table;
DROP TRIGGER ftable_rowcount_trigger ON ltable;
DROP TABLE ltable;
CREATE TABLE parent (a text, b int) PARTITION BY LIST (a);
CREATE TABLE batch_table (a text, b int);
CREATE FOREIGN TABLE ftable
PARTITION OF parent
FOR VALUES IN ('AAA')
SERVER loopback
OPTIONS (table_name 'batch_table', batch_size '2');
CREATE TABLE ltable
PARTITION OF parent
FOR VALUES IN ('BBB');
CREATE TRIGGER ftable_rowcount_trigger
BEFORE INSERT ON ltable
FOR EACH ROW EXECUTE PROCEDURE ftable_rowcount_trigf();
INSERT INTO parent VALUES ('AAA', 42), ('BBB', 42), ('AAA', 42), ('BBB', 42);
NOTICE: ftable_rowcount_trigger: there are 1 rows in ftable
NOTICE: ftable_rowcount_trigger: there are 2 rows in ftable
SELECT tableoid::regclass, * FROM parent;
tableoid | a | b
----------+-----+----
ftable | AAA | 42
ftable | AAA | 42
ltable | BBB | 42
ltable | BBB | 42
(4 rows)
-- Clean up
DROP FOREIGN TABLE ftable;
DROP TABLE batch_table;
DROP TRIGGER ftable_rowcount_trigger ON ltable;
DROP TABLE ltable;
DROP TABLE parent;
DROP FUNCTION ftable_rowcount_trigf;
-- ===================================================================
-- test asynchronous execution
-- ===================================================================

View File

@ -3372,8 +3372,94 @@ INSERT INTO batch_table SELECT i, 'test'||i, 'test'|| i FROM generate_series(1,
SELECT COUNT(*) FROM batch_table;
SELECT * FROM batch_table ORDER BY x;
-- Clean up
DROP TABLE batch_table;
DROP TABLE batch_table_p0;
DROP TABLE batch_table_p1;
ALTER SERVER loopback OPTIONS (DROP batch_size);
-- Test that pending inserts are handled properly when needed
CREATE TABLE batch_table (a text, b int);
CREATE FOREIGN TABLE ftable (a text, b int)
SERVER loopback
OPTIONS (table_name 'batch_table', batch_size '2');
CREATE TABLE ltable (a text, b int);
CREATE FUNCTION ftable_rowcount_trigf() RETURNS trigger LANGUAGE plpgsql AS
$$
begin
raise notice '%: there are % rows in ftable',
TG_NAME, (SELECT count(*) FROM ftable);
if TG_OP = 'DELETE' then
return OLD;
else
return NEW;
end if;
end;
$$;
CREATE TRIGGER ftable_rowcount_trigger
BEFORE INSERT OR UPDATE OR DELETE ON ltable
FOR EACH ROW EXECUTE PROCEDURE ftable_rowcount_trigf();
WITH t AS (
INSERT INTO ltable VALUES ('AAA', 42), ('BBB', 42) RETURNING *
)
INSERT INTO ftable SELECT * FROM t;
SELECT * FROM ltable;
SELECT * FROM ftable;
DELETE FROM ftable;
WITH t AS (
UPDATE ltable SET b = b + 100 RETURNING *
)
INSERT INTO ftable SELECT * FROM t;
SELECT * FROM ltable;
SELECT * FROM ftable;
DELETE FROM ftable;
WITH t AS (
DELETE FROM ltable RETURNING *
)
INSERT INTO ftable SELECT * FROM t;
SELECT * FROM ltable;
SELECT * FROM ftable;
DELETE FROM ftable;
-- Clean up
DROP FOREIGN TABLE ftable;
DROP TABLE batch_table;
DROP TRIGGER ftable_rowcount_trigger ON ltable;
DROP TABLE ltable;
CREATE TABLE parent (a text, b int) PARTITION BY LIST (a);
CREATE TABLE batch_table (a text, b int);
CREATE FOREIGN TABLE ftable
PARTITION OF parent
FOR VALUES IN ('AAA')
SERVER loopback
OPTIONS (table_name 'batch_table', batch_size '2');
CREATE TABLE ltable
PARTITION OF parent
FOR VALUES IN ('BBB');
CREATE TRIGGER ftable_rowcount_trigger
BEFORE INSERT ON ltable
FOR EACH ROW EXECUTE PROCEDURE ftable_rowcount_trigf();
INSERT INTO parent VALUES ('AAA', 42), ('BBB', 42), ('AAA', 42), ('BBB', 42);
SELECT tableoid::regclass, * FROM parent;
-- Clean up
DROP FOREIGN TABLE ftable;
DROP TABLE batch_table;
DROP TRIGGER ftable_rowcount_trigger ON ltable;
DROP TABLE ltable;
DROP TABLE parent;
DROP FUNCTION ftable_rowcount_trigf;
-- ===================================================================
-- test asynchronous execution
-- ===================================================================

View File

@ -1261,6 +1261,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
resultRelInfo->ri_ChildToRootMap = NULL;
resultRelInfo->ri_ChildToRootMapValid = false;
resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
resultRelInfo->ri_ModifyTableState = NULL;
}
/*

View File

@ -1034,6 +1034,13 @@ ExecInitRoutingInfo(ModifyTableState *mtstate,
Assert(partRelInfo->ri_BatchSize >= 1);
/*
* If doing batch insert, setup back-link so we can easily find the
* mtstate again.
*/
if (partRelInfo->ri_BatchSize > 1)
partRelInfo->ri_ModifyTableState = mtstate;
partRelInfo->ri_CopyMultiInsertBuffer = NULL;
/*

View File

@ -127,6 +127,7 @@ CreateExecutorState(void)
estate->es_result_relations = NULL;
estate->es_opened_result_relations = NIL;
estate->es_tuple_routing_result_relations = NIL;
estate->es_insert_pending_result_relations = NIL;
estate->es_trig_target_relations = NIL;
estate->es_param_list_info = NULL;

View File

@ -142,6 +142,7 @@ static void ExecBatchInsert(ModifyTableState *mtstate,
int numSlots,
EState *estate,
bool canSetTag);
static void ExecPendingInserts(EState *estate);
static void ExecCrossPartitionUpdateForeignKey(ModifyTableContext *context,
ResultRelInfo *sourcePartInfo,
ResultRelInfo *destPartInfo,
@ -761,6 +762,10 @@ ExecInsert(ModifyTableContext *context,
if (resultRelInfo->ri_TrigDesc &&
resultRelInfo->ri_TrigDesc->trig_insert_before_row)
{
/* Flush any pending inserts, so rows are visible to the triggers */
if (estate->es_insert_pending_result_relations != NIL)
ExecPendingInserts(estate);
if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
return NULL; /* "do nothing" */
}
@ -794,6 +799,8 @@ ExecInsert(ModifyTableContext *context,
*/
if (resultRelInfo->ri_BatchSize > 1)
{
bool flushed = false;
/*
* When we've reached the desired batch size, perform the
* insertion.
@ -806,6 +813,7 @@ ExecInsert(ModifyTableContext *context,
resultRelInfo->ri_NumSlots,
estate, canSetTag);
resultRelInfo->ri_NumSlots = 0;
flushed = true;
}
oldContext = MemoryContextSwitchTo(estate->es_query_cxt);
@ -848,6 +856,24 @@ ExecInsert(ModifyTableContext *context,
ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots],
planSlot);
/*
* If these are the first tuples stored in the buffers, add the
* target rel to the es_insert_pending_result_relations list,
* except in the case where flushing was done above, in which case
* the target rel would already have been added to the list, so no
* need to do this.
*/
if (resultRelInfo->ri_NumSlots == 0 && !flushed)
{
Assert(!list_member_ptr(estate->es_insert_pending_result_relations,
resultRelInfo));
estate->es_insert_pending_result_relations =
lappend(estate->es_insert_pending_result_relations,
resultRelInfo);
}
Assert(list_member_ptr(estate->es_insert_pending_result_relations,
resultRelInfo));
resultRelInfo->ri_NumSlots++;
MemoryContextSwitchTo(oldContext);
@ -1166,9 +1192,8 @@ ExecBatchInsert(ModifyTableState *mtstate,
slot = rslots[i];
/*
* AFTER ROW Triggers or RETURNING expressions might reference the
* tableoid column, so (re-)initialize tts_tableOid before evaluating
* them.
* AFTER ROW Triggers might reference the tableoid column, so
* (re-)initialize tts_tableOid before evaluating them.
*/
slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
@ -1188,6 +1213,32 @@ ExecBatchInsert(ModifyTableState *mtstate,
estate->es_processed += numInserted;
}
/*
 * ExecPendingInserts
 *		Flush every buffered batch insert recorded in the executor state.
 *
 * Walks estate->es_insert_pending_result_relations.  For each result
 * relation in that list, the buffered slots are passed to ExecBatchInsert()
 * together with the ModifyTableState reached through the relation's
 * ri_ModifyTableState back-link, after which the relation's slot counter is
 * reset to zero.  Once all entries have been processed the list is freed and
 * the EState member is set back to NIL.
 */
static void
ExecPendingInserts(EState *estate)
{
	ListCell   *cell;

	foreach(cell, estate->es_insert_pending_result_relations)
	{
		ResultRelInfo *pendingRel = (ResultRelInfo *) lfirst(cell);
		ModifyTableState *owner = pendingRel->ri_ModifyTableState;

		/* The back-link must have been set up for batch-capable rels. */
		Assert(owner);

		ExecBatchInsert(owner,
						pendingRel,
						pendingRel->ri_Slots,
						pendingRel->ri_PlanSlots,
						pendingRel->ri_NumSlots,
						estate,
						owner->canSetTag);

		/* Buffers are empty again for this relation. */
		pendingRel->ri_NumSlots = 0;
	}

	/* Nothing is pending any more; release the list and forget it. */
	list_free(estate->es_insert_pending_result_relations);
	estate->es_insert_pending_result_relations = NIL;
}
/*
* ExecDeletePrologue -- subroutine for ExecDelete
*
@ -1203,9 +1254,15 @@ ExecDeletePrologue(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
/* BEFORE ROW DELETE triggers */
if (resultRelInfo->ri_TrigDesc &&
resultRelInfo->ri_TrigDesc->trig_delete_before_row)
{
/* Flush any pending inserts, so rows are visible to the triggers */
if (context->estate->es_insert_pending_result_relations != NIL)
ExecPendingInserts(context->estate);
return ExecBRDeleteTriggers(context->estate, context->epqstate,
resultRelInfo, tupleid, oldtuple,
epqreturnslot);
}
return true;
}
@ -1780,9 +1837,15 @@ ExecUpdatePrologue(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
/* BEFORE ROW UPDATE triggers */
if (resultRelInfo->ri_TrigDesc &&
resultRelInfo->ri_TrigDesc->trig_update_before_row)
{
/* Flush any pending inserts, so rows are visible to the triggers */
if (context->estate->es_insert_pending_result_relations != NIL)
ExecPendingInserts(context->estate);
return ExecBRUpdateTriggers(context->estate, context->epqstate,
resultRelInfo, tupleid, oldtuple, slot,
&context->tmfd);
}
return true;
}
@ -3452,9 +3515,6 @@ ExecModifyTable(PlanState *pstate)
HeapTupleData oldtupdata;
HeapTuple oldtuple;
ItemPointer tupleid;
PartitionTupleRouting *proute = node->mt_partition_tuple_routing;
List *relinfos = NIL;
ListCell *lc;
CHECK_FOR_INTERRUPTS();
@ -3756,21 +3816,8 @@ ExecModifyTable(PlanState *pstate)
/*
* Insert remaining tuples for batch insert.
*/
if (proute)
relinfos = estate->es_tuple_routing_result_relations;
else
relinfos = estate->es_opened_result_relations;
foreach(lc, relinfos)
{
resultRelInfo = lfirst(lc);
if (resultRelInfo->ri_NumSlots > 0)
ExecBatchInsert(node, resultRelInfo,
resultRelInfo->ri_Slots,
resultRelInfo->ri_PlanSlots,
resultRelInfo->ri_NumSlots,
estate, node->canSetTag);
}
if (estate->es_insert_pending_result_relations != NIL)
ExecPendingInserts(estate);
/*
* We're done, but fire AFTER STATEMENT triggers before exiting.
@ -4295,6 +4342,13 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
}
else
resultRelInfo->ri_BatchSize = 1;
/*
* If doing batch insert, setup back-link so we can easily find the
* mtstate again.
*/
if (resultRelInfo->ri_BatchSize > 1)
resultRelInfo->ri_ModifyTableState = mtstate;
}
/*

View File

@ -571,6 +571,9 @@ typedef struct ResultRelInfo
* one of its ancestors; see ExecCrossPartitionUpdateForeignKey().
*/
List *ri_ancestorResultRels;
/* for use by nodeModifyTable.c when performing batch-inserts */
struct ModifyTableState *ri_ModifyTableState;
} ResultRelInfo;
/* ----------------
@ -692,6 +695,12 @@ typedef struct EState
int es_jit_flags;
struct JitContext *es_jit;
struct JitInstrumentation *es_jit_worker_instr;
/*
* The following list contains ResultRelInfos for foreign tables on which
* batch-inserts are to be executed.
*/
List *es_insert_pending_result_relations;
} EState;