From 19890a064ebf53dedcefed0d8339ed3d449b06e6 Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Wed, 3 Mar 2021 07:28:43 +0530 Subject: [PATCH] Add option to enable two_phase commits via pg_create_logical_replication_slot. Commit 0aa8a01d04 extends the output plugin API to allow decoding of prepared xacts and allowed the user to enable/disable the two-phase option via pg_logical_slot_get_changes(). This can lead to a problem such that the first time when it gets changes via pg_logical_slot_get_changes() without two_phase option enabled it will not get the prepared even though prepare is after consistent snapshot. Now next time during getting changes, if the two_phase option is enabled it can skip prepare because by that time start decoding point has been moved. So the user will only get commit prepared. Allow to enable/disable this option at the create slot time and default will be false. It will break the existing slots which is fine in a major release. Author: Ajin Cherian Reviewed-by: Amit Kapila and Vignesh C Discussion: https://postgr.es/m/d0f60d60-133d-bf8d-bd70-47784d8fabf3@enterprisedb.com --- contrib/test_decoding/expected/twophase.out | 34 +++++++++---------- .../expected/twophase_snapshot.out | 6 ++-- .../expected/twophase_stream.out | 10 +++--- .../specs/twophase_snapshot.spec | 4 +-- contrib/test_decoding/sql/twophase.sql | 34 +++++++++---------- contrib/test_decoding/sql/twophase_stream.sql | 10 +++--- contrib/test_decoding/test_decoding.c | 12 ------- doc/src/sgml/catalogs.sgml | 10 ++++++ doc/src/sgml/func.sgml | 10 +++--- doc/src/sgml/logicaldecoding.sgml | 19 ++++++----- src/backend/catalog/system_views.sql | 4 ++- src/backend/replication/logical/logical.c | 12 +++++++ src/backend/replication/slot.c | 10 +++++- src/backend/replication/slotfuncs.c | 14 +++++--- src/backend/replication/walsender.c | 6 ++-- src/include/catalog/catversion.h | 2 +- src/include/catalog/pg_proc.dat | 14 ++++---- src/include/nodes/replnodes.h | 1 + src/include/replication/slot.h | 7 +++- src/test/regress/expected/rules.out | 5 +-- 20 files changed, 131 insertions(+), 93 deletions(-) diff --git a/contrib/test_decoding/expected/twophase.out b/contrib/test_decoding/expected/twophase.out index 8a1d06d706..e5e0f96896 100644 --- a/contrib/test_decoding/expected/twophase.out +++ b/contrib/test_decoding/expected/twophase.out @@ -1,7 +1,7 @@ -- Test prepared transactions. When two-phase-commit is enabled, transactions are -- decoded at PREPARE time rather than at COMMIT PREPARED time. SET synchronous_commit = on; -SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true); ?column? ---------- init @@ -15,14 +15,14 @@ BEGIN; INSERT INTO test_prepared1 VALUES (1); INSERT INTO test_prepared1 VALUES (2); -- should show nothing because the xact has not been prepared yet. -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data ------ (0 rows) PREPARE TRANSACTION 'test_prepared#1'; -- should show both the above inserts and the PREPARE TRANSACTION. -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data ---------------------------------------------------- BEGIN @@ -32,7 +32,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two (4 rows) COMMIT PREPARED 'test_prepared#1'; -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data ----------------------------------- COMMIT PREPARED 'test_prepared#1' @@ -42,7 +42,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two BEGIN; INSERT INTO test_prepared1 VALUES (3); PREPARE TRANSACTION 'test_prepared#2'; -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data ---------------------------------------------------- BEGIN @@ -51,7 +51,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two (3 rows) ROLLBACK PREPARED 'test_prepared#2'; -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data ------------------------------------- ROLLBACK PREPARED 'test_prepared#2' @@ -74,7 +74,7 @@ WHERE locktype = 'relation' (2 rows) -- The insert should show the newly altered column but not the DDL. -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data ------------------------------------------------------------------------- BEGIN @@ -89,7 +89,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two -- the ALTER will stop us inserting into the other one. -- INSERT INTO test_prepared2 VALUES (5); -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data ---------------------------------------------------- BEGIN @@ -98,7 +98,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two (3 rows) COMMIT PREPARED 'test_prepared#3'; -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data ----------------------------------- COMMIT PREPARED 'test_prepared#3' @@ -107,7 +107,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two -- make sure stuff still works INSERT INTO test_prepared1 VALUES (6); INSERT INTO test_prepared2 VALUES (7); -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data -------------------------------------------------------------------- BEGIN @@ -138,7 +138,7 @@ WHERE locktype = 'relation' -- The above CLUSTER command shouldn't cause a timeout on 2pc decoding. SET statement_timeout = '180s'; -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data --------------------------------------------------------------------------- BEGIN @@ -150,7 +150,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two RESET statement_timeout; COMMIT PREPARED 'test_prepared_lock'; -- consume the commit -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data -------------------------------------- COMMIT PREPARED 'test_prepared_lock' @@ -166,7 +166,7 @@ INSERT INTO test_prepared_savepoint VALUES (2); ROLLBACK TO SAVEPOINT test_savepoint; PREPARE TRANSACTION 'test_prepared_savepoint'; -- should show only 1, not 2 -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data ------------------------------------------------------------ BEGIN @@ -176,7 +176,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two COMMIT PREPARED 'test_prepared_savepoint'; -- consume the commit -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data ------------------------------------------- COMMIT PREPARED 'test_prepared_savepoint' @@ -187,14 +187,14 @@ BEGIN; INSERT INTO test_prepared1 VALUES (20); PREPARE TRANSACTION 'test_prepared_nodecode'; -- should show nothing -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data ------ (0 rows) COMMIT PREPARED 'test_prepared_nodecode'; -- should be decoded now -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data --------------------------------------------------------------------- BEGIN @@ -207,7 +207,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two DROP TABLE test_prepared1; DROP TABLE test_prepared2; -- show results. There should be nothing to show -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); data ------ (0 rows) diff --git a/contrib/test_decoding/expected/twophase_snapshot.out b/contrib/test_decoding/expected/twophase_snapshot.out index 14d9387646..0e8e1f50fe 100644 --- a/contrib/test_decoding/expected/twophase_snapshot.out +++ b/contrib/test_decoding/expected/twophase_snapshot.out @@ -6,7 +6,7 @@ step s2txid: SELECT pg_current_xact_id() IS NULL; ?column? f -step s1init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +step s1init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding', false, true); step s3b: BEGIN; step s3txid: SELECT pg_current_xact_id() IS NULL; ?column? @@ -22,14 +22,14 @@ step s1init: <... completed> init step s1insert: INSERT INTO do_write DEFAULT VALUES; -step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1', 'two-phase-commit', '1'); +step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1'); data BEGIN table public.do_write: INSERT: id[integer]:2 COMMIT step s2cp: COMMIT PREPARED 'test1'; -step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1', 'two-phase-commit', '1'); +step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1'); data BEGIN diff --git a/contrib/test_decoding/expected/twophase_stream.out b/contrib/test_decoding/expected/twophase_stream.out index d54e640b40..b08bb0e573 100644 --- a/contrib/test_decoding/expected/twophase_stream.out +++ b/contrib/test_decoding/expected/twophase_stream.out @@ -1,6 +1,6 @@ -- Test streaming of two-phase commits SET synchronous_commit = on; -SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true); ?column? ---------- init @@ -28,7 +28,7 @@ ROLLBACK TO s1; INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i); PREPARE TRANSACTION 'test1'; -- should show the inserts after a ROLLBACK -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); data ---------------------------------------------------------- streaming message: transactional: 1 prefix: test, sz: 50 @@ -59,7 +59,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two- COMMIT PREPARED 'test1'; --should show the COMMIT PREPARED and the other changes in the transaction -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); data ------------------------- COMMIT PREPARED 'test1' @@ -81,7 +81,7 @@ ROLLBACK to s1; INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i); PREPARE TRANSACTION 'test1_nodecode'; -- should NOT show inserts after a ROLLBACK -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); data ---------------------------------------------------------- streaming message: transactional: 1 prefix: test, sz: 50 @@ -89,7 +89,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two- COMMIT PREPARED 'test1_nodecode'; -- should show the inserts but not show a COMMIT PREPARED but a COMMIT -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); data ------------------------------------------------------------- BEGIN diff --git a/contrib/test_decoding/specs/twophase_snapshot.spec b/contrib/test_decoding/specs/twophase_snapshot.spec index 3e700404e0..e8d9567fb9 100644 --- a/contrib/test_decoding/specs/twophase_snapshot.spec +++ b/contrib/test_decoding/specs/twophase_snapshot.spec @@ -15,8 +15,8 @@ teardown session "s1" setup { SET synchronous_commit=on; } -step "s1init" {SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');} -step "s1start" {SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1', 'two-phase-commit', '1');} +step "s1init" {SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding', false, true);} +step "s1start" {SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1');} step "s1insert" { INSERT INTO do_write DEFAULT VALUES; } session "s2" diff --git a/contrib/test_decoding/sql/twophase.sql b/contrib/test_decoding/sql/twophase.sql index dacedfe181..05f18e8494 100644 --- a/contrib/test_decoding/sql/twophase.sql +++ b/contrib/test_decoding/sql/twophase.sql @@ -1,7 +1,7 @@ -- Test prepared transactions. When two-phase-commit is enabled, transactions are -- decoded at PREPARE time rather than at COMMIT PREPARED time. SET synchronous_commit = on; -SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true); CREATE TABLE test_prepared1(id integer primary key); CREATE TABLE test_prepared2(id integer primary key); @@ -12,20 +12,20 @@ BEGIN; INSERT INTO test_prepared1 VALUES (1); INSERT INTO test_prepared1 VALUES (2); -- should show nothing because the xact has not been prepared yet. -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); PREPARE TRANSACTION 'test_prepared#1'; -- should show both the above inserts and the PREPARE TRANSACTION. -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); COMMIT PREPARED 'test_prepared#1'; -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); -- Test that rollback of a prepared xact is decoded. BEGIN; INSERT INTO test_prepared1 VALUES (3); PREPARE TRANSACTION 'test_prepared#2'; -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); ROLLBACK PREPARED 'test_prepared#2'; -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); -- Test prepare of a xact containing ddl. Leaving xact uncommitted for next test. BEGIN; @@ -38,7 +38,7 @@ FROM pg_locks WHERE locktype = 'relation' AND relation = 'test_prepared1'::regclass; -- The insert should show the newly altered column but not the DDL. -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); -- Test that we decode correctly while an uncommitted prepared xact -- with ddl exists. @@ -47,14 +47,14 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two -- the ALTER will stop us inserting into the other one. -- INSERT INTO test_prepared2 VALUES (5); -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); COMMIT PREPARED 'test_prepared#3'; -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); -- make sure stuff still works INSERT INTO test_prepared1 VALUES (6); INSERT INTO test_prepared2 VALUES (7); -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); -- Check 'CLUSTER' (as operation that hold exclusive lock) doesn't block -- logical decoding. @@ -70,11 +70,11 @@ WHERE locktype = 'relation' AND relation = 'test_prepared1'::regclass; -- The above CLUSTER command shouldn't cause a timeout on 2pc decoding. SET statement_timeout = '180s'; -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); RESET statement_timeout; COMMIT PREPARED 'test_prepared_lock'; -- consume the commit -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); -- Test savepoints and sub-xacts. Creating savepoints will create -- sub-xacts implicitly. @@ -86,26 +86,26 @@ INSERT INTO test_prepared_savepoint VALUES (2); ROLLBACK TO SAVEPOINT test_savepoint; PREPARE TRANSACTION 'test_prepared_savepoint'; -- should show only 1, not 2 -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); COMMIT PREPARED 'test_prepared_savepoint'; -- consume the commit -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); -- Test that a GID containing "_nodecode" gets decoded at commit prepared time. BEGIN; INSERT INTO test_prepared1 VALUES (20); PREPARE TRANSACTION 'test_prepared_nodecode'; -- should show nothing -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); COMMIT PREPARED 'test_prepared_nodecode'; -- should be decoded now -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); -- Test 8: -- cleanup and make sure results are also empty DROP TABLE test_prepared1; DROP TABLE test_prepared2; -- show results. There should be nothing to show -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); SELECT pg_drop_replication_slot('regression_slot'); diff --git a/contrib/test_decoding/sql/twophase_stream.sql b/contrib/test_decoding/sql/twophase_stream.sql index e9dd44fdb3..646076da20 100644 --- a/contrib/test_decoding/sql/twophase_stream.sql +++ b/contrib/test_decoding/sql/twophase_stream.sql @@ -1,7 +1,7 @@ -- Test streaming of two-phase commits SET synchronous_commit = on; -SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true); CREATE TABLE stream_test(data text); @@ -18,11 +18,11 @@ ROLLBACK TO s1; INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i); PREPARE TRANSACTION 'test1'; -- should show the inserts after a ROLLBACK -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); COMMIT PREPARED 'test1'; --should show the COMMIT PREPARED and the other changes in the transaction -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); -- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with -- filtered gid. gids with '_nodecode' will not be decoded at prepare time. @@ -35,11 +35,11 @@ ROLLBACK to s1; INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i); PREPARE TRANSACTION 'test1_nodecode'; -- should NOT show inserts after a ROLLBACK -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); COMMIT PREPARED 'test1_nodecode'; -- should show the inserts but not show a COMMIT PREPARED but a COMMIT -SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1'); DROP TABLE stream_test; SELECT pg_drop_replication_slot('regression_slot'); diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 929255eac7..ae5f397f35 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -164,7 +164,6 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, ListCell *option; TestDecodingData *data; bool enable_streaming = false; - bool enable_twophase = false; data = palloc0(sizeof(TestDecodingData)); data->context = AllocSetContextCreate(ctx->context, @@ -265,16 +264,6 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } - else if (strcmp(elem->defname, "two-phase-commit") == 0) - { - if (elem->arg == NULL) - continue; - else if (!parse_bool(strVal(elem->arg), &enable_twophase)) - ereport(ERROR, - (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("could not parse value \"%s\" for parameter \"%s\"", - strVal(elem->arg), elem->defname))); - } else { ereport(ERROR, @@ -286,7 +275,6 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, } ctx->streaming &= enable_streaming; - ctx->twophase &= enable_twophase; } /* cleanup this plugin's resources */ diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index db29905e91..b1de6d0674 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -11529,6 +11529,16 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx is -1. + + + + two_phase bool + + + True if the slot is enabled for decoding prepared transactions. Always + false for physical slots. + + diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 6c189bfed2..bf99f82149 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -25559,7 +25559,7 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup()); pg_create_logical_replication_slot - pg_create_logical_replication_slot ( slot_name name, plugin name , temporary boolean ) + pg_create_logical_replication_slot ( slot_name name, plugin name , temporary boolean, two_phase boolean ) record ( slot_name name, lsn pg_lsn ) @@ -25571,9 +25571,11 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup()); parameter, temporary, when set to true, specifies that the slot should not be permanently stored to disk and is only meant for use by the current session. Temporary slots are also - released upon any error. A call to this function has the same - effect as the replication protocol command - CREATE_REPLICATION_SLOT ... LOGICAL. + released upon any error. The optional fourth parameter, + two_phase, when set to true, specifies + that the decoding of prepared transactions is enabled for this + slot. A call to this function has the same effect as the replication + protocol command CREATE_REPLICATION_SLOT ... LOGICAL. diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index f1f13d81d5..80eb96d609 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -55,7 +55,7 @@ postgres=# -- Create a slot named 'regression_slot' using the output plugin 'test_decoding' -postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); +postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true); slot_name | lsn -----------------+----------- regression_slot | 0/16B1970 @@ -169,17 +169,18 @@ $ pg_recvlogical -d postgres --slot=test --drop-slot The following example shows SQL interface that can be used to decode prepared transactions. Before you use two-phase commit commands, you must set - max_prepared_transactions to at least 1. You must also set - the option 'two-phase-commit' to 1 while calling - pg_logical_slot_get_changes. Note that we will stream - the entire transaction after the commit if it is not already decoded. + max_prepared_transactions to at least 1. You must also have + set the two-phase parameter as 'true' while creating the slot using + pg_create_logical_replication_slot + Note that we will stream the entire transaction after the commit if it + is not already decoded. postgres=# BEGIN; postgres=*# INSERT INTO data(data) VALUES('5'); postgres=*# PREPARE TRANSACTION 'test_prepared1'; -postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1'); +postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL); lsn | xid | data -----------+-----+--------------------------------------------------------- 0/1689DC0 | 529 | BEGIN 529 @@ -188,7 +189,7 @@ postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NU (3 rows) postgres=# COMMIT PREPARED 'test_prepared1'; -postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1'); +postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL); lsn | xid | data -----------+-----+-------------------------------------------- 0/168A060 | 529 | COMMIT PREPARED 'test_prepared1', txid 529 @@ -198,7 +199,7 @@ postgres=#-- you can also rollback a prepared transaction postgres=# BEGIN; postgres=*# INSERT INTO data(data) VALUES('6'); postgres=*# PREPARE TRANSACTION 'test_prepared2'; -postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1'); +postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL); lsn | xid | data -----------+-----+--------------------------------------------------------- 0/168A180 | 530 | BEGIN 530 @@ -207,7 +208,7 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU (3 rows) postgres=# ROLLBACK PREPARED 'test_prepared2'; -postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1'); +postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL); lsn | xid | data -----------+-----+---------------------------------------------- 0/168A4B8 | 530 | ROLLBACK PREPARED 'test_prepared2', txid 530 diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index fa58afd9d7..fc94a73a54 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -894,7 +894,8 @@ CREATE VIEW pg_replication_slots AS L.restart_lsn, L.confirmed_flush_lsn, L.wal_status, - L.safe_wal_size + L.safe_wal_size, + L.two_phase FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); @@ -1318,6 +1319,7 @@ AS 'pg_create_physical_replication_slot'; CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot( IN slot_name name, IN plugin name, IN temporary boolean DEFAULT false, + IN twophase boolean DEFAULT false, OUT slot_name name, OUT lsn pg_lsn) RETURNS RECORD LANGUAGE INTERNAL diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 3f6d723d09..37b75deb72 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -431,6 +431,12 @@ CreateInitDecodingContext(const char *plugin, startup_cb_wrapper(ctx, &ctx->options, true); MemoryContextSwitchTo(old_context); + /* + * We allow decoding of prepared transactions iff the two_phase option is + * enabled at the time of slot creation. + */ + ctx->twophase &= MyReplicationSlot->data.two_phase; + ctx->reorder->output_rewrites = ctx->options.receive_rewrites; return ctx; @@ -531,6 +537,12 @@ CreateDecodingContext(XLogRecPtr start_lsn, startup_cb_wrapper(ctx, &ctx->options, false); MemoryContextSwitchTo(old_context); + /* + * We allow decoding of prepared transactions iff the two_phase option is + * enabled at the time of slot creation. + */ + ctx->twophase &= MyReplicationSlot->data.two_phase; + ctx->reorder->output_rewrites = ctx->options.receive_rewrites; ereport(LOG, diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index fb4af2ef52..75a087c2f9 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -216,10 +216,17 @@ ReplicationSlotValidateName(const char *name, int elevel) * name: Name of the slot * db_specific: logical decoding is db specific; if the slot is going to * be used for that pass true, otherwise false. + * two_phase: Allows decoding of prepared transactions. We allow this option + * to be enabled only at the slot creation time. If we allow this option + * to be changed during decoding then it is quite possible that we skip + * prepare first time because this option was not enabled. Now next time + * during getting changes, if the two_phase option is enabled it can skip + * prepare because by that time start decoding point has been moved. So the + * user will only get commit prepared. */ void ReplicationSlotCreate(const char *name, bool db_specific, - ReplicationSlotPersistency persistency) + ReplicationSlotPersistency persistency, bool two_phase) { ReplicationSlot *slot = NULL; int i; @@ -277,6 +284,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, namestrcpy(&slot->data.name, name); slot->data.database = db_specific ? MyDatabaseId : InvalidOid; slot->data.persistency = persistency; + slot->data.two_phase = two_phase; /* and then data only present in shared memory */ slot->just_dirtied = false; diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index d24bb5b0b5..9817b44113 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -50,7 +50,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve, /* acquire replication slot, this will check for conflicting names */ ReplicationSlotCreate(name, false, - temporary ? RS_TEMPORARY : RS_PERSISTENT); + temporary ? RS_TEMPORARY : RS_PERSISTENT, false); if (immediately_reserve) { @@ -124,7 +124,8 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) */ static void create_logical_replication_slot(char *name, char *plugin, - bool temporary, XLogRecPtr restart_lsn, + bool temporary, bool two_phase, + XLogRecPtr restart_lsn, bool find_startpoint) { LogicalDecodingContext *ctx = NULL; @@ -140,7 +141,7 @@ create_logical_replication_slot(char *name, char *plugin, * error as well. */ ReplicationSlotCreate(name, true, - temporary ? RS_TEMPORARY : RS_EPHEMERAL); + temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase); /* * Create logical decoding context to find start point or, if we don't @@ -177,6 +178,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) Name name = PG_GETARG_NAME(0); Name plugin = PG_GETARG_NAME(1); bool temporary = PG_GETARG_BOOL(2); + bool two_phase = PG_GETARG_BOOL(3); Datum result; TupleDesc tupdesc; HeapTuple tuple; @@ -193,6 +195,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) create_logical_replication_slot(NameStr(*name), NameStr(*plugin), temporary, + two_phase, InvalidXLogRecPtr, true); @@ -236,7 +239,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 13 +#define PG_GET_REPLICATION_SLOTS_COLS 14 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -432,6 +435,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) values[i++] = Int64GetDatum(failLSN - currlsn); } + values[i++] = BoolGetDatum(slot_contents.data.two_phase); + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(tupstore, tupdesc, values, nulls); @@ -796,6 +801,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) create_logical_replication_slot(NameStr(*dst_name), plugin, temporary, + false, src_restart_lsn, false); } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index eb3f18ed48..23baa4498a 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -938,7 +938,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) if (cmd->kind == REPLICATION_KIND_PHYSICAL) { ReplicationSlotCreate(cmd->slotname, false, - cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT); + cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT, + false); } else { @@ -952,7 +953,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) * they get dropped on error as well. */ ReplicationSlotCreate(cmd->slotname, true, - cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL); + cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL, + cmd->two_phase); } if (cmd->kind == REPLICATION_KIND_LOGICAL) diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 4cc94de224..b19975c5c8 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202102191 +#define CATALOG_VERSION_NO 202103031 #endif diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 1487710d59..3d3974f467 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -10496,16 +10496,16 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', - proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool', - proallargtypes => '{name,name,bool,name,pg_lsn}', - proargmodes => '{i,i,i,o,o}', - proargnames => '{slot_name,plugin,temporary,slot_name,lsn}', + proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool bool', + proallargtypes => '{name,name,bool,bool,name,pg_lsn}', + proargmodes => '{i,i,i,i,o,o}', + proargnames => '{slot_name,plugin,temporary,twophase,slot_name,lsn}', prosrc => 'pg_create_logical_replication_slot' }, { oid => '4222', descr => 'copy a logical replication slot, changing temporality and plugin', diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index faa3a251f2..ebc43a0293 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -56,6 +56,7 @@ typedef struct CreateReplicationSlotCmd ReplicationKind kind; char *plugin; bool temporary; + bool two_phase; List *options; } CreateReplicationSlotCmd; diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 5c3fde20c6..1ad5e6c50d 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -98,6 +98,11 @@ typedef struct ReplicationSlotPersistentData */ XLogRecPtr initial_consistent_point; + /* + * Allow decoding of prepared transactions? + */ + bool two_phase; + /* plugin name */ NameData plugin; } ReplicationSlotPersistentData; @@ -199,7 +204,7 @@ extern void ReplicationSlotsShmemInit(void); /* management of individual slots */ extern void ReplicationSlotCreate(const char *name, bool db_specific, - ReplicationSlotPersistency p); + ReplicationSlotPersistency p, bool two_phase); extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 10a1f34ebc..b1c9b7bdfe 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1477,8 +1477,9 @@ pg_replication_slots| SELECT l.slot_name, l.restart_lsn, l.confirmed_flush_lsn, l.wal_status, - l.safe_wal_size - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size) + l.safe_wal_size, + l.two_phase + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper,