diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 558e94b845..2ab3f1efaa 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -10448,7 +10448,130 @@ SELECT * FROM batch_table ORDER BY x; 50 | test50 | test50 (50 rows) +-- Clean up +DROP TABLE batch_table; +DROP TABLE batch_table_p0; +DROP TABLE batch_table_p1; ALTER SERVER loopback OPTIONS (DROP batch_size); +-- Test that pending inserts are handled properly when needed +CREATE TABLE batch_table (a text, b int); +CREATE FOREIGN TABLE ftable (a text, b int) + SERVER loopback + OPTIONS (table_name 'batch_table', batch_size '2'); +CREATE TABLE ltable (a text, b int); +CREATE FUNCTION ftable_rowcount_trigf() RETURNS trigger LANGUAGE plpgsql AS +$$ +begin + raise notice '%: there are % rows in ftable', + TG_NAME, (SELECT count(*) FROM ftable); + if TG_OP = 'DELETE' then + return OLD; + else + return NEW; + end if; +end; +$$; +CREATE TRIGGER ftable_rowcount_trigger +BEFORE INSERT OR UPDATE OR DELETE ON ltable +FOR EACH ROW EXECUTE PROCEDURE ftable_rowcount_trigf(); +WITH t AS ( + INSERT INTO ltable VALUES ('AAA', 42), ('BBB', 42) RETURNING * +) +INSERT INTO ftable SELECT * FROM t; +NOTICE: ftable_rowcount_trigger: there are 0 rows in ftable +NOTICE: ftable_rowcount_trigger: there are 1 rows in ftable +SELECT * FROM ltable; + a | b +-----+---- + AAA | 42 + BBB | 42 +(2 rows) + +SELECT * FROM ftable; + a | b +-----+---- + AAA | 42 + BBB | 42 +(2 rows) + +DELETE FROM ftable; +WITH t AS ( + UPDATE ltable SET b = b + 100 RETURNING * +) +INSERT INTO ftable SELECT * FROM t; +NOTICE: ftable_rowcount_trigger: there are 0 rows in ftable +NOTICE: ftable_rowcount_trigger: there are 1 rows in ftable +SELECT * FROM ltable; + a | b +-----+----- + AAA | 142 + BBB | 142 +(2 rows) + +SELECT * FROM ftable; + a | b +-----+----- + AAA | 142 + BBB | 142 +(2 rows) + +DELETE FROM ftable; +WITH t AS ( + DELETE FROM ltable RETURNING * +) +INSERT INTO ftable SELECT * FROM t; +NOTICE: ftable_rowcount_trigger: there are 0 rows in ftable +NOTICE: ftable_rowcount_trigger: there are 1 rows in ftable +SELECT * FROM ltable; + a | b +---+--- +(0 rows) + +SELECT * FROM ftable; + a | b +-----+----- + AAA | 142 + BBB | 142 +(2 rows) + +DELETE FROM ftable; +-- Clean up +DROP FOREIGN TABLE ftable; +DROP TABLE batch_table; +DROP TRIGGER ftable_rowcount_trigger ON ltable; +DROP TABLE ltable; +CREATE TABLE parent (a text, b int) PARTITION BY LIST (a); +CREATE TABLE batch_table (a text, b int); +CREATE FOREIGN TABLE ftable + PARTITION OF parent + FOR VALUES IN ('AAA') + SERVER loopback + OPTIONS (table_name 'batch_table', batch_size '2'); +CREATE TABLE ltable + PARTITION OF parent + FOR VALUES IN ('BBB'); +CREATE TRIGGER ftable_rowcount_trigger +BEFORE INSERT ON ltable +FOR EACH ROW EXECUTE PROCEDURE ftable_rowcount_trigf(); +INSERT INTO parent VALUES ('AAA', 42), ('BBB', 42), ('AAA', 42), ('BBB', 42); +NOTICE: ftable_rowcount_trigger: there are 1 rows in ftable +NOTICE: ftable_rowcount_trigger: there are 2 rows in ftable +SELECT tableoid::regclass, * FROM parent; + tableoid | a | b +----------+-----+---- + ftable | AAA | 42 + ftable | AAA | 42 + ltable | BBB | 42 + ltable | BBB | 42 +(4 rows) + +-- Clean up +DROP FOREIGN TABLE ftable; +DROP TABLE batch_table; +DROP TRIGGER ftable_rowcount_trigger ON ltable; +DROP TABLE ltable; +DROP TABLE parent; +DROP FUNCTION ftable_rowcount_trigf; -- =================================================================== -- test asynchronous execution -- =================================================================== diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index b0dbb41fb5..51560429e0 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -3372,8 +3372,94 @@ INSERT INTO batch_table SELECT i, 'test'||i, 'test'|| i FROM generate_series(1, SELECT COUNT(*) FROM batch_table; SELECT * FROM batch_table ORDER BY x; +-- Clean up +DROP TABLE batch_table; +DROP TABLE batch_table_p0; +DROP TABLE batch_table_p1; + ALTER SERVER loopback OPTIONS (DROP batch_size); +-- Test that pending inserts are handled properly when needed +CREATE TABLE batch_table (a text, b int); +CREATE FOREIGN TABLE ftable (a text, b int) + SERVER loopback + OPTIONS (table_name 'batch_table', batch_size '2'); +CREATE TABLE ltable (a text, b int); +CREATE FUNCTION ftable_rowcount_trigf() RETURNS trigger LANGUAGE plpgsql AS +$$ +begin + raise notice '%: there are % rows in ftable', + TG_NAME, (SELECT count(*) FROM ftable); + if TG_OP = 'DELETE' then + return OLD; + else + return NEW; + end if; +end; +$$; +CREATE TRIGGER ftable_rowcount_trigger +BEFORE INSERT OR UPDATE OR DELETE ON ltable +FOR EACH ROW EXECUTE PROCEDURE ftable_rowcount_trigf(); + +WITH t AS ( + INSERT INTO ltable VALUES ('AAA', 42), ('BBB', 42) RETURNING * +) +INSERT INTO ftable SELECT * FROM t; + +SELECT * FROM ltable; +SELECT * FROM ftable; +DELETE FROM ftable; + +WITH t AS ( + UPDATE ltable SET b = b + 100 RETURNING * +) +INSERT INTO ftable SELECT * FROM t; + +SELECT * FROM ltable; +SELECT * FROM ftable; +DELETE FROM ftable; + +WITH t AS ( + DELETE FROM ltable RETURNING * +) +INSERT INTO ftable SELECT * FROM t; + +SELECT * FROM ltable; +SELECT * FROM ftable; +DELETE FROM ftable; + +-- Clean up +DROP FOREIGN TABLE ftable; +DROP TABLE batch_table; +DROP TRIGGER ftable_rowcount_trigger ON ltable; +DROP TABLE ltable; + +CREATE TABLE parent (a text, b int) PARTITION BY LIST (a); +CREATE TABLE batch_table (a text, b int); +CREATE FOREIGN TABLE ftable + PARTITION OF parent + FOR VALUES IN ('AAA') + SERVER loopback + OPTIONS (table_name 'batch_table', batch_size '2'); +CREATE TABLE ltable + PARTITION OF parent + FOR VALUES IN ('BBB'); +CREATE TRIGGER ftable_rowcount_trigger +BEFORE INSERT ON ltable +FOR EACH ROW EXECUTE PROCEDURE ftable_rowcount_trigf(); + +INSERT INTO parent VALUES ('AAA', 42), ('BBB', 42), ('AAA', 42), ('BBB', 42); + +SELECT tableoid::regclass, * FROM parent; + +-- Clean up +DROP FOREIGN TABLE ftable; +DROP TABLE batch_table; +DROP TRIGGER ftable_rowcount_trigger ON ltable; +DROP TABLE ltable; +DROP TABLE parent; +DROP FUNCTION ftable_rowcount_trigf; + -- =================================================================== -- test asynchronous execution -- =================================================================== diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index d78862e660..ef828e0496 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -1261,6 +1261,7 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo, resultRelInfo->ri_ChildToRootMap = NULL; resultRelInfo->ri_ChildToRootMapValid = false; resultRelInfo->ri_CopyMultiInsertBuffer = NULL; + resultRelInfo->ri_ModifyTableState = NULL; } /* diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c index 40e3c07693..262cabd940 100644 --- a/src/backend/executor/execPartition.c +++ b/src/backend/executor/execPartition.c @@ -1034,6 +1034,13 @@ ExecInitRoutingInfo(ModifyTableState *mtstate, Assert(partRelInfo->ri_BatchSize >= 1); + /* + * If doing batch insert, setup back-link so we can easily find the + * mtstate again. + */ + if (partRelInfo->ri_BatchSize > 1) + partRelInfo->ri_ModifyTableState = mtstate; + partRelInfo->ri_CopyMultiInsertBuffer = NULL; /* diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c index 9df1f81ea8..0e595ffa6e 100644 --- a/src/backend/executor/execUtils.c +++ b/src/backend/executor/execUtils.c @@ -127,6 +127,7 @@ CreateExecutorState(void) estate->es_result_relations = NULL; estate->es_opened_result_relations = NIL; estate->es_tuple_routing_result_relations = NIL; + estate->es_insert_pending_result_relations = NIL; estate->es_trig_target_relations = NIL; estate->es_param_list_info = NULL; diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index b7ea953b55..271ff2be8e 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -142,6 +142,7 @@ static void ExecBatchInsert(ModifyTableState *mtstate, int numSlots, EState *estate, bool canSetTag); +static void ExecPendingInserts(EState *estate); static void ExecCrossPartitionUpdateForeignKey(ModifyTableContext *context, ResultRelInfo *sourcePartInfo, ResultRelInfo *destPartInfo, @@ -761,6 +762,10 @@ ExecInsert(ModifyTableContext *context, if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_insert_before_row) { + /* Flush any pending inserts, so rows are visible to the triggers */ + if (estate->es_insert_pending_result_relations != NIL) + ExecPendingInserts(estate); + if (!ExecBRInsertTriggers(estate, resultRelInfo, slot)) return NULL; /* "do nothing" */ } @@ -794,6 +799,8 @@ ExecInsert(ModifyTableContext *context, */ if (resultRelInfo->ri_BatchSize > 1) { + bool flushed = false; + /* * When we've reached the desired batch size, perform the * insertion. @@ -806,6 +813,7 @@ ExecInsert(ModifyTableContext *context, resultRelInfo->ri_NumSlots, estate, canSetTag); resultRelInfo->ri_NumSlots = 0; + flushed = true; } oldContext = MemoryContextSwitchTo(estate->es_query_cxt); @@ -848,6 +856,24 @@ ExecInsert(ModifyTableContext *context, ExecCopySlot(resultRelInfo->ri_PlanSlots[resultRelInfo->ri_NumSlots], planSlot); + /* + * If these are the first tuples stored in the buffers, add the + * target rel to the es_insert_pending_result_relations list, + * except in the case where flushing was done above, in which case + * the target rel would already have been added to the list, so no + * need to do this. + */ + if (resultRelInfo->ri_NumSlots == 0 && !flushed) + { + Assert(!list_member_ptr(estate->es_insert_pending_result_relations, + resultRelInfo)); + estate->es_insert_pending_result_relations = + lappend(estate->es_insert_pending_result_relations, + resultRelInfo); + } + Assert(list_member_ptr(estate->es_insert_pending_result_relations, + resultRelInfo)); + resultRelInfo->ri_NumSlots++; MemoryContextSwitchTo(oldContext); @@ -1166,9 +1192,8 @@ ExecBatchInsert(ModifyTableState *mtstate, slot = rslots[i]; /* - * AFTER ROW Triggers or RETURNING expressions might reference the - * tableoid column, so (re-)initialize tts_tableOid before evaluating - * them. + * AFTER ROW Triggers might reference the tableoid column, so + * (re-)initialize tts_tableOid before evaluating them. */ slot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); @@ -1188,6 +1213,32 @@ ExecBatchInsert(ModifyTableState *mtstate, estate->es_processed += numInserted; } +/* + * ExecPendingInserts -- flushes all pending inserts to the foreign tables + */ +static void +ExecPendingInserts(EState *estate) +{ + ListCell *lc; + + foreach(lc, estate->es_insert_pending_result_relations) + { + ResultRelInfo *resultRelInfo = (ResultRelInfo *) lfirst(lc); + ModifyTableState *mtstate = resultRelInfo->ri_ModifyTableState; + + Assert(mtstate); + ExecBatchInsert(mtstate, resultRelInfo, + resultRelInfo->ri_Slots, + resultRelInfo->ri_PlanSlots, + resultRelInfo->ri_NumSlots, + estate, mtstate->canSetTag); + resultRelInfo->ri_NumSlots = 0; + } + + list_free(estate->es_insert_pending_result_relations); + estate->es_insert_pending_result_relations = NIL; +} + /* * ExecDeletePrologue -- subroutine for ExecDelete * @@ -1203,9 +1254,15 @@ ExecDeletePrologue(ModifyTableContext *context, ResultRelInfo *resultRelInfo, /* BEFORE ROW DELETE triggers */ if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_delete_before_row) + { + /* Flush any pending inserts, so rows are visible to the triggers */ + if (context->estate->es_insert_pending_result_relations != NIL) + ExecPendingInserts(context->estate); + return ExecBRDeleteTriggers(context->estate, context->epqstate, resultRelInfo, tupleid, oldtuple, epqreturnslot); + } return true; } @@ -1780,9 +1837,15 @@ ExecUpdatePrologue(ModifyTableContext *context, ResultRelInfo *resultRelInfo, /* BEFORE ROW UPDATE triggers */ if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_update_before_row) + { + /* Flush any pending inserts, so rows are visible to the triggers */ + if (context->estate->es_insert_pending_result_relations != NIL) + ExecPendingInserts(context->estate); + return ExecBRUpdateTriggers(context->estate, context->epqstate, resultRelInfo, tupleid, oldtuple, slot, &context->tmfd); + } return true; } @@ -3452,9 +3515,6 @@ ExecModifyTable(PlanState *pstate) HeapTupleData oldtupdata; HeapTuple oldtuple; ItemPointer tupleid; - PartitionTupleRouting *proute = node->mt_partition_tuple_routing; - List *relinfos = NIL; - ListCell *lc; CHECK_FOR_INTERRUPTS(); @@ -3756,21 +3816,8 @@ ExecModifyTable(PlanState *pstate) /* * Insert remaining tuples for batch insert. */ - if (proute) - relinfos = estate->es_tuple_routing_result_relations; - else - relinfos = estate->es_opened_result_relations; - - foreach(lc, relinfos) - { - resultRelInfo = lfirst(lc); - if (resultRelInfo->ri_NumSlots > 0) - ExecBatchInsert(node, resultRelInfo, - resultRelInfo->ri_Slots, - resultRelInfo->ri_PlanSlots, - resultRelInfo->ri_NumSlots, - estate, node->canSetTag); - } + if (estate->es_insert_pending_result_relations != NIL) + ExecPendingInserts(estate); /* * We're done, but fire AFTER STATEMENT triggers before exiting. @@ -4295,6 +4342,13 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) } else resultRelInfo->ri_BatchSize = 1; + + /* + * If doing batch insert, setup back-link so we can easily find the + * mtstate again. + */ + if (resultRelInfo->ri_BatchSize > 1) + resultRelInfo->ri_ModifyTableState = mtstate; } /* diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 0f10d4432b..18e572f171 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -571,6 +571,9 @@ typedef struct ResultRelInfo * one of its ancestors; see ExecCrossPartitionUpdateForeignKey(). */ List *ri_ancestorResultRels; + + /* for use by nodeModifyTable.c when performing batch-inserts */ + struct ModifyTableState *ri_ModifyTableState; } ResultRelInfo; /* ---------------- @@ -692,6 +695,12 @@ typedef struct EState int es_jit_flags; struct JitContext *es_jit; struct JitInstrumentation *es_jit_worker_instr; + + /* + * The following list contains ResultRelInfos for foreign tables on which + * batch-inserts are to be executed. + */ + List *es_insert_pending_result_relations; } EState;