Add support for streaming to built-in logical replication.

To add support for streaming of in-progress transactions to the
built-in logical replication, we need to do three things:

* Extend the logical replication protocol to identify in-progress
transactions, and to allow adding additional pieces of information (e.g.
the XID of subtransactions).

* Modify the output plugin (pgoutput) to implement the new stream
API callbacks by leveraging the extended replication protocol.

* Modify the replication apply worker to properly handle streamed
in-progress transactions by spilling the data to disk and then
replaying them on commit.

We must, however, explicitly disable streaming during replication
slot creation, even if the plugin supports it. We don't need to
replicate the changes accumulated during this phase, and moreover
we don't have a replication connection open, so there is nowhere
to send the data anyway.

Author: Tomas Vondra, Dilip Kumar and Amit Kapila
Reviewed-by: Amit Kapila, Kuntal Ghosh and Ajin Cherian
Tested-by: Neha Sharma, Mahendra Singh Thalor and Ajin Cherian
Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
Amit Kapila 2020-09-03 07:54:07 +05:30
parent 66f1630680
commit 464824323e
23 changed files with 1766 additions and 74 deletions


@ -1509,6 +1509,22 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<entry><literal>WALWrite</literal></entry>
<entry>Waiting for a write to a WAL file.</entry>
</row>
<row>
<entry><literal>LogicalChangesRead</literal></entry>
<entry>Waiting for a read from a logical changes file.</entry>
</row>
<row>
<entry><literal>LogicalChangesWrite</literal></entry>
<entry>Waiting for a write to a logical changes file.</entry>
</row>
<row>
<entry><literal>LogicalSubxactRead</literal></entry>
<entry>Waiting for a read from a logical subxact file.</entry>
</row>
<row>
<entry><literal>LogicalSubxactWrite</literal></entry>
<entry>Waiting for a write to a logical subxact file.</entry>
</row>
</tbody>
</tgroup>
</table>
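
The new events are reported through pg_stat_activity like the existing
I/O wait events; a hedged monitoring query using only existing columns:

SELECT pid, wait_event_type, wait_event
FROM pg_stat_activity
WHERE wait_event IN ('LogicalChangesRead', 'LogicalChangesWrite',
                     'LogicalSubxactRead', 'LogicalSubxactWrite');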


@ -165,8 +165,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
<xref linkend="sql-createsubscription"/>. See there for more
information. The parameters that can be altered
are <literal>slot_name</literal>,
<literal>synchronous_commit</literal>, and
<literal>binary</literal>.
<literal>synchronous_commit</literal>,
<literal>binary</literal>, and
<literal>streaming</literal>.
</para>
</listitem>
</varlistentry>


@ -228,6 +228,17 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><literal>streaming</literal> (<type>boolean</type>)</term>
<listitem>
<para>
Specifies whether streaming of in-progress transactions should
be enabled for this subscription. By default, all transactions
are fully decoded on the publisher, and only then sent to the
subscriber as a whole.
</para>
</listitem>
</varlistentry>
</variablelist></para>
</listitem>
</varlistentry>


@ -66,6 +66,7 @@ GetSubscription(Oid subid, bool missing_ok)
sub->owner = subform->subowner;
sub->enabled = subform->subenabled;
sub->binary = subform->subbinary;
sub->stream = subform->substream;
/* Get conninfo */
datum = SysCacheGetAttr(SUBSCRIPTIONOID,


@ -1128,7 +1128,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
-- All columns of pg_subscription except subconninfo are readable.
REVOKE ALL ON pg_subscription FROM public;
GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, subslotname, subpublications)
GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, substream, subslotname, subpublications)
ON pg_subscription TO public;
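
Since substream is now among the publicly readable columns, the
per-subscription setting can be inspected directly, for example:

SELECT subname, substream FROM pg_subscription;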


@ -63,7 +63,8 @@ parse_subscription_options(List *options,
bool *copy_data,
char **synchronous_commit,
bool *refresh,
bool *binary_given, bool *binary)
bool *binary_given, bool *binary,
bool *streaming_given, bool *streaming)
{
ListCell *lc;
bool connect_given = false;
@ -99,6 +100,11 @@ parse_subscription_options(List *options,
*binary_given = false;
*binary = false;
}
if (streaming)
{
*streaming_given = false;
*streaming = false;
}
/* Parse options */
foreach(lc, options)
@ -194,6 +200,16 @@ parse_subscription_options(List *options,
*binary_given = true;
*binary = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "streaming") == 0 && streaming)
{
if (*streaming_given)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
*streaming_given = true;
*streaming = defGetBoolean(defel);
}
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@ -337,6 +353,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
bool enabled_given;
bool enabled;
bool copy_data;
bool streaming;
bool streaming_given;
char *synchronous_commit;
char *conninfo;
char *slotname;
@ -360,7 +378,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
&copy_data,
&synchronous_commit,
NULL, /* no "refresh" */
&binary_given, &binary);
&binary_given, &binary,
&streaming_given, &streaming);
/*
* Since creating a replication slot is not transactional, rolling back
@ -427,6 +446,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
if (slotname)
@ -698,6 +718,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
char *synchronous_commit;
bool binary_given;
bool binary;
bool streaming_given;
bool streaming;
parse_subscription_options(stmt->options,
NULL, /* no "connect" */
@ -707,7 +729,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
NULL, /* no "copy_data" */
&synchronous_commit,
NULL, /* no "refresh" */
&binary_given, &binary);
&binary_given, &binary,
&streaming_given, &streaming);
if (slotname_given)
{
@ -739,6 +762,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
replaces[Anum_pg_subscription_subbinary - 1] = true;
}
if (streaming_given)
{
values[Anum_pg_subscription_substream - 1] =
BoolGetDatum(streaming);
replaces[Anum_pg_subscription_substream - 1] = true;
}
update_tuple = true;
break;
}
@ -756,7 +786,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
NULL, /* no "copy_data" */
NULL, /* no "synchronous_commit" */
NULL, /* no "refresh" */
NULL, NULL); /* no "binary" */
NULL, NULL, /* no "binary" */
NULL, NULL); /* no streaming */
Assert(enabled_given);
if (!sub->slotname && enabled)
@ -800,8 +831,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
&copy_data,
NULL, /* no "synchronous_commit" */
&refresh,
NULL, NULL); /* no "binary" */
NULL, NULL, /* no "binary" */
NULL, NULL); /* no "streaming" */
values[Anum_pg_subscription_subpublications - 1] =
publicationListToArray(stmt->publication);
replaces[Anum_pg_subscription_subpublications - 1] = true;
@ -843,7 +874,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
&copy_data,
NULL, /* no "synchronous_commit" */
NULL, /* no "refresh" */
NULL, NULL); /* no "binary" */
NULL, NULL, /* no "binary" */
NULL, NULL); /* no "streaming" */
AlterSubscription_refresh(sub, copy_data);


@ -4141,6 +4141,18 @@ pgstat_get_wait_io(WaitEventIO w)
case WAIT_EVENT_WAL_WRITE:
event_name = "WALWrite";
break;
case WAIT_EVENT_LOGICAL_CHANGES_READ:
event_name = "LogicalChangesRead";
break;
case WAIT_EVENT_LOGICAL_CHANGES_WRITE:
event_name = "LogicalChangesWrite";
break;
case WAIT_EVENT_LOGICAL_SUBXACT_READ:
event_name = "LogicalSubxactRead";
break;
case WAIT_EVENT_LOGICAL_SUBXACT_WRITE:
event_name = "LogicalSubxactWrite";
break;
/* no default case, so that compiler will warn */
}


@ -425,6 +425,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
appendStringInfo(&cmd, "proto_version '%u'",
options->proto.logical.proto_version);
if (options->proto.logical.streaming &&
PQserverVersion(conn->streamConn) >= 140000)
appendStringInfo(&cmd, ", streaming 'on'");
pubnames = options->proto.logical.publication_names;
pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
if (!pubnames_str)
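
With the option enabled, the walreceiver's START_REPLICATION command
comes out roughly as below (slot name, start LSN, and publication are
placeholders):

START_REPLICATION SLOT "mysub" LOGICAL 0/1A2B3C4
    (proto_version '2', streaming 'on', publication_names '"mypub"')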


@ -138,10 +138,15 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
* Write INSERT to the output stream.
*/
void
logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple, bool binary)
logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
HeapTuple newtuple, bool binary)
{
pq_sendbyte(out, 'I'); /* action INSERT */
/* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid))
pq_sendint32(out, xid);
/* use Oid as relation identifier */
pq_sendint32(out, RelationGetRelid(rel));
@ -177,8 +182,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
* Write UPDATE to the output stream.
*/
void
logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
HeapTuple newtuple, bool binary)
logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
HeapTuple oldtuple, HeapTuple newtuple, bool binary)
{
pq_sendbyte(out, 'U'); /* action UPDATE */
@ -186,6 +191,10 @@ logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
/* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid))
pq_sendint32(out, xid);
/* use Oid as relation identifier */
pq_sendint32(out, RelationGetRelid(rel));
@ -247,7 +256,8 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple,
* Write DELETE to the output stream.
*/
void
logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool binary)
logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
HeapTuple oldtuple, bool binary)
{
Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
@ -255,6 +265,10 @@ logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool b
pq_sendbyte(out, 'D'); /* action DELETE */
/* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid))
pq_sendint32(out, xid);
/* use Oid as relation identifier */
pq_sendint32(out, RelationGetRelid(rel));
@ -295,6 +309,7 @@ logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
*/
void
logicalrep_write_truncate(StringInfo out,
TransactionId xid,
int nrelids,
Oid relids[],
bool cascade, bool restart_seqs)
@ -304,6 +319,10 @@ logicalrep_write_truncate(StringInfo out,
pq_sendbyte(out, 'T'); /* action TRUNCATE */
/* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid))
pq_sendint32(out, xid);
pq_sendint32(out, nrelids);
/* encode and send truncate flags */
@ -346,12 +365,16 @@ logicalrep_read_truncate(StringInfo in,
* Write relation description to the output stream.
*/
void
logicalrep_write_rel(StringInfo out, Relation rel)
logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
{
char *relname;
pq_sendbyte(out, 'R'); /* sending RELATION */
/* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid))
pq_sendint32(out, xid);
/* use Oid as relation identifier */
pq_sendint32(out, RelationGetRelid(rel));
@ -396,7 +419,7 @@ logicalrep_read_rel(StringInfo in)
* This function will always write base type info.
*/
void
logicalrep_write_typ(StringInfo out, Oid typoid)
logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
{
Oid basetypoid = getBaseType(typoid);
HeapTuple tup;
@ -404,6 +427,10 @@ logicalrep_write_typ(StringInfo out, Oid typoid)
pq_sendbyte(out, 'Y'); /* sending TYPE */
/* transaction ID (if not valid, we're not streaming) */
if (TransactionIdIsValid(xid))
pq_sendint32(out, xid);
tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
if (!HeapTupleIsValid(tup))
elog(ERROR, "cache lookup failed for type %u", basetypoid);
@ -720,3 +747,126 @@ logicalrep_read_namespace(StringInfo in)
return nspname;
}
/*
* Write the information for the start stream message to the output stream.
*/
void
logicalrep_write_stream_start(StringInfo out,
TransactionId xid, bool first_segment)
{
pq_sendbyte(out, 'S'); /* action STREAM START */
Assert(TransactionIdIsValid(xid));
/* transaction ID (we're starting to stream, so must be valid) */
pq_sendint32(out, xid);
/* 1 if this is the first streaming segment for this xid */
pq_sendbyte(out, first_segment ? 1 : 0);
}
/*
* Read the information about the start stream message from the output stream.
*/
TransactionId
logicalrep_read_stream_start(StringInfo in, bool *first_segment)
{
TransactionId xid;
Assert(first_segment);
xid = pq_getmsgint(in, 4);
*first_segment = (pq_getmsgbyte(in) == 1);
return xid;
}
/*
* Write the stop stream message to the output stream.
*/
void
logicalrep_write_stream_stop(StringInfo out)
{
pq_sendbyte(out, 'E'); /* action STREAM END */
}
/*
* Write STREAM COMMIT to the output stream.
*/
void
logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
uint8 flags = 0;
pq_sendbyte(out, 'c'); /* action STREAM COMMIT */
Assert(TransactionIdIsValid(txn->xid));
/* transaction ID */
pq_sendint32(out, txn->xid);
/* send the flags field (unused for now) */
pq_sendbyte(out, flags);
/* send fields */
pq_sendint64(out, commit_lsn);
pq_sendint64(out, txn->end_lsn);
pq_sendint64(out, txn->commit_time);
}
/*
* Read STREAM COMMIT from the output stream.
*/
TransactionId
logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
{
TransactionId xid;
uint8 flags;
xid = pq_getmsgint(in, 4);
/* read flags (unused for now) */
flags = pq_getmsgbyte(in);
if (flags != 0)
elog(ERROR, "unrecognized flags %u in commit message", flags);
/* read fields */
commit_data->commit_lsn = pq_getmsgint64(in);
commit_data->end_lsn = pq_getmsgint64(in);
commit_data->committime = pq_getmsgint64(in);
return xid;
}
/*
* Write STREAM ABORT to the output stream. Note that xid and subxid will
* be the same for a top-level transaction abort.
*/
void
logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
TransactionId subxid)
{
pq_sendbyte(out, 'A'); /* action STREAM ABORT */
Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
/* transaction ID */
pq_sendint32(out, xid);
pq_sendint32(out, subxid);
}
/*
* Read STREAM ABORT from the output stream.
*/
void
logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
TransactionId *subxid)
{
Assert(xid && subxid);
*xid = pq_getmsgint(in, 4);
*subxid = pq_getmsgint(in, 4);
}

File diff suppressed because it is too large.


@ -47,17 +47,40 @@ static void pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferChange *change);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn);
static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
static bool publications_valid;
static bool in_streaming;
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
uint32 hashvalue);
static void send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx);
static void send_relation_and_attrs(Relation relation, TransactionId xid,
LogicalDecodingContext *ctx);
/*
* Entry in the map used to remember which relation schemas we sent.
*
* The schema_sent flag determines if the current schema record was already
* sent to the subscriber (in which case we don't need to send it again).
*
* The schema cache on the subscriber is, however, updated only at commit
* time, and with streamed transactions the commit order may be different
* from the order in which the transactions are sent. Also, the (sub)
* transactions might get aborted, so we need to send the schema for each
* (sub)transaction so that we don't lose the schema information on abort.
* To handle this, we maintain a list of xids (streamed_txns) for which we
* have already sent the schema.
*
* For partitions, 'pubactions' considers not only the table's own
* publications, but also those of all of its ancestors.
*/
@ -70,6 +93,8 @@ typedef struct RelationSyncEntry
* have been sent for this to be true.
*/
bool schema_sent;
List *streamed_txns; /* streamed toplevel transactions with this
* schema */
bool replicate_valid;
PublicationActions pubactions;
@ -95,10 +120,15 @@ typedef struct RelationSyncEntry
static HTAB *RelationSyncCache = NULL;
static void init_rel_sync_cache(MemoryContext decoding_context);
static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
uint32 hashvalue);
static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
TransactionId xid);
static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
TransactionId xid);
/*
* Specify output plugin callbacks
@ -115,16 +145,26 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->commit_cb = pgoutput_commit_txn;
cb->filter_by_origin_cb = pgoutput_origin_filter;
cb->shutdown_cb = pgoutput_shutdown;
/* transaction streaming */
cb->stream_start_cb = pgoutput_stream_start;
cb->stream_stop_cb = pgoutput_stream_stop;
cb->stream_abort_cb = pgoutput_stream_abort;
cb->stream_commit_cb = pgoutput_stream_commit;
cb->stream_change_cb = pgoutput_change;
cb->stream_truncate_cb = pgoutput_truncate;
}
static void
parse_output_parameters(List *options, uint32 *protocol_version,
List **publication_names, bool *binary)
List **publication_names, bool *binary,
bool *enable_streaming)
{
ListCell *lc;
bool protocol_version_given = false;
bool publication_names_given = false;
bool binary_option_given = false;
bool streaming_given = false;
*binary = false;
@ -182,6 +222,16 @@ parse_output_parameters(List *options, uint32 *protocol_version,
*binary = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "streaming") == 0)
{
if (streaming_given)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
streaming_given = true;
*enable_streaming = defGetBoolean(defel);
}
else
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
}
@ -194,6 +244,7 @@ static void
pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
bool is_init)
{
bool enable_streaming = false;
PGOutputData *data = palloc0(sizeof(PGOutputData));
/* Create our memory context for private allocations. */
@ -217,7 +268,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
parse_output_parameters(ctx->output_plugin_options,
&data->protocol_version,
&data->publication_names,
&data->binary);
&data->binary,
&enable_streaming);
/* Check if we support requested protocol */
if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM)
@ -237,6 +289,27 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("publication_names parameter missing")));
/*
* Decide whether to enable streaming. It is disabled by default, in
* which case we just update the flag in the decoding context. Otherwise
* we only allow it with a sufficient version of the protocol, and when
* the output plugin supports it.
*/
if (!enable_streaming)
ctx->streaming = false;
else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("requested proto_version=%d does not support streaming, need %d or higher",
data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
else if (!ctx->streaming)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("streaming requested, but not supported by output plugin")));
/* Also remember we're currently not streaming any transaction. */
in_streaming = false;
/* Init publication state. */
data->publications = NIL;
publications_valid = false;
@ -247,6 +320,11 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
/* Initialize relation schema cache. */
init_rel_sync_cache(CacheMemoryContext);
}
else
{
/* Disable the streaming during the slot initialization mode. */
ctx->streaming = false;
}
}
/*
@ -305,9 +383,47 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
*/
static void
maybe_send_schema(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, ReorderBufferChange *change,
Relation relation, RelationSyncEntry *relentry)
{
if (relentry->schema_sent)
bool schema_sent;
TransactionId xid = InvalidTransactionId;
TransactionId topxid = InvalidTransactionId;
/*
* Remember the XID of the (sub)transaction for the change. We don't care
* whether it's a top-level transaction or not (we have already sent that
* XID at the start of the current streaming block).
*
* If we're not in a streaming block, just use InvalidTransactionId and
* the write methods will not include it.
*/
if (in_streaming)
xid = change->txn->xid;
if (change->txn->toptxn)
topxid = change->txn->toptxn->xid;
else
topxid = xid;
/*
* Do we need to send the schema? We do track streamed transactions
* separately, because those may be applied later (and the regular
* transactions won't see their effects until then) and in an order that
* we don't know at this point.
*
* XXX There is scope for optimization here. Currently, we always send
* the schema the first time in a streaming transaction, but we could
* probably avoid that by checking the 'relentry->schema_sent' flag.
* However, before doing that we need to study its impact on the case
* where we have a mix of streaming and non-streaming transactions.
*/
if (in_streaming)
schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
else
schema_sent = relentry->schema_sent;
if (schema_sent)
return;
/* If needed, send the ancestor's schema first. */
@ -323,19 +439,24 @@ maybe_send_schema(LogicalDecodingContext *ctx,
relentry->map = convert_tuples_by_name(CreateTupleDescCopy(indesc),
CreateTupleDescCopy(outdesc));
MemoryContextSwitchTo(oldctx);
send_relation_and_attrs(ancestor, ctx);
send_relation_and_attrs(ancestor, xid, ctx);
RelationClose(ancestor);
}
send_relation_and_attrs(relation, ctx);
relentry->schema_sent = true;
send_relation_and_attrs(relation, xid, ctx);
if (in_streaming)
set_schema_sent_in_streamed_txn(relentry, topxid);
else
relentry->schema_sent = true;
}
/*
* Sends a relation
*/
static void
send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx)
send_relation_and_attrs(Relation relation, TransactionId xid,
LogicalDecodingContext *ctx)
{
TupleDesc desc = RelationGetDescr(relation);
int i;
@ -359,17 +480,19 @@ send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx)
continue;
OutputPluginPrepareWrite(ctx, false);
logicalrep_write_typ(ctx->out, att->atttypid);
logicalrep_write_typ(ctx->out, xid, att->atttypid);
OutputPluginWrite(ctx, false);
}
OutputPluginPrepareWrite(ctx, false);
logicalrep_write_rel(ctx->out, relation);
logicalrep_write_rel(ctx->out, xid, relation);
OutputPluginWrite(ctx, false);
}
/*
* Sends the decoded DML over wire.
*
* This is called both in streaming and non-streaming modes.
*/
static void
pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
@ -378,10 +501,20 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
MemoryContext old;
RelationSyncEntry *relentry;
TransactionId xid = InvalidTransactionId;
if (!is_publishable_relation(relation))
return;
/*
* Remember the xid for the change in streaming mode. We need to send the
* xid with each change in streaming mode so that the subscriber can
* associate the changes with the transaction and, on abort, discard the
* corresponding changes.
*/
if (in_streaming)
xid = change->txn->xid;
relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
/* First check the table filter */
@ -406,7 +539,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
maybe_send_schema(ctx, relation, relentry);
maybe_send_schema(ctx, txn, change, relation, relentry);
/* Send the data */
switch (change->action)
@ -426,7 +559,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_insert(ctx->out, relation, tuple,
logicalrep_write_insert(ctx->out, xid, relation, tuple,
data->binary);
OutputPluginWrite(ctx, true);
break;
@ -451,8 +584,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_update(ctx->out, relation, oldtuple, newtuple,
data->binary);
logicalrep_write_update(ctx->out, xid, relation, oldtuple,
newtuple, data->binary);
OutputPluginWrite(ctx, true);
break;
}
@ -472,7 +605,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_delete(ctx->out, relation, oldtuple,
logicalrep_write_delete(ctx->out, xid, relation, oldtuple,
data->binary);
OutputPluginWrite(ctx, true);
}
@ -498,6 +631,11 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
int i;
int nrelids;
Oid *relids;
TransactionId xid = InvalidTransactionId;
/* Remember the xid for the change in streaming mode. See pgoutput_change. */
if (in_streaming)
xid = change->txn->xid;
old = MemoryContextSwitchTo(data->context);
@ -526,13 +664,14 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
continue;
relids[nrelids++] = relid;
maybe_send_schema(ctx, relation, relentry);
maybe_send_schema(ctx, txn, change, relation, relentry);
}
if (nrelids > 0)
{
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_truncate(ctx->out,
xid,
nrelids,
relids,
change->data.truncate.cascade,
@ -605,6 +744,118 @@ publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
}
/*
* START STREAM callback
*/
static void
pgoutput_stream_start(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
/* we can't nest streaming of transactions */
Assert(!in_streaming);
/*
* If we already sent the first stream for this transaction then don't
* send the origin id in the subsequent streams.
*/
if (rbtxn_is_streamed(txn))
send_replication_origin = false;
OutputPluginPrepareWrite(ctx, !send_replication_origin);
logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
if (send_replication_origin)
{
char *origin;
/* Message boundary */
OutputPluginWrite(ctx, false);
OutputPluginPrepareWrite(ctx, true);
if (replorigin_by_oid(txn->origin_id, true, &origin))
logicalrep_write_origin(ctx->out, origin, InvalidXLogRecPtr);
}
OutputPluginWrite(ctx, true);
/* we're now streaming a chunk of a transaction */
in_streaming = true;
}
/*
* STOP STREAM callback
*/
static void
pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
/* we should be streaming a transaction */
Assert(in_streaming);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_stop(ctx->out);
OutputPluginWrite(ctx, true);
/* we've stopped streaming a transaction */
in_streaming = false;
}
/*
* Notify downstream to discard the streamed transaction (along with all
* its subtransactions, if it's a toplevel transaction).
*/
static void
pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr abort_lsn)
{
ReorderBufferTXN *toptxn;
/*
* The abort should happen outside a streaming block, even for streamed
* transactions. The transaction has to be marked as streamed, though.
*/
Assert(!in_streaming);
/* determine the toplevel transaction */
toptxn = (txn->toptxn) ? txn->toptxn : txn;
Assert(rbtxn_is_streamed(toptxn));
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
OutputPluginWrite(ctx, true);
cleanup_rel_sync_cache(toptxn->xid, false);
}
/*
* Notify downstream to apply the streamed transaction (along with all
* its subtransactions).
*/
static void
pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
/*
* The commit should happen outside a streaming block, even for streamed
* transactions. The transaction has to be marked as streamed, though.
*/
Assert(!in_streaming);
Assert(rbtxn_is_streamed(txn));
OutputPluginUpdateProgress(ctx);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
OutputPluginWrite(ctx, true);
cleanup_rel_sync_cache(txn->xid, true);
}
/*
* Initialize the relation schema sync cache for a decoding session.
*
@ -641,6 +892,39 @@ init_rel_sync_cache(MemoryContext cachectx)
(Datum) 0);
}
/*
* We expect a relatively small number of streamed transactions.
*/
static bool
get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
{
ListCell *lc;
foreach(lc, entry->streamed_txns)
{
if (xid == (uint32) lfirst_int(lc))
return true;
}
return false;
}
/*
* Add the xid to the relation sync entry to record that we have already
* sent the schema of the relation for this transaction.
*/
static void
set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
{
MemoryContext oldctx;
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
entry->streamed_txns = lappend_int(entry->streamed_txns, xid);
MemoryContextSwitchTo(oldctx);
}
/*
* Find or create entry in the relation schema cache.
*
@ -771,11 +1055,58 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
}
if (!found)
{
entry->schema_sent = false;
entry->streamed_txns = NULL;
}
return entry;
}
/*
* Cleanup list of streamed transactions and update the schema_sent flag.
*
* When a streamed transaction commits or aborts, we need to remove the
* toplevel XID from the schema cache. If the transaction aborted, the
* subscriber will simply throw away the schema records we streamed, so
* we don't need to do anything else.
*
* If the transaction is committed, the subscriber will update the relation
* cache, so tweak the schema_sent flag accordingly.
*/
static void
cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
{
HASH_SEQ_STATUS hash_seq;
RelationSyncEntry *entry;
ListCell *lc;
Assert(RelationSyncCache != NULL);
hash_seq_init(&hash_seq, RelationSyncCache);
while ((entry = hash_seq_search(&hash_seq)) != NULL)
{
/*
* We can set the schema_sent flag for an entry that has the committed
* xid in its list, as that ensures that the subscriber already has the
* corresponding schema and we don't need to send it again unless there
* is an invalidation for that relation.
*/
foreach(lc, entry->streamed_txns)
{
if (xid == (uint32) lfirst_int(lc))
{
if (is_commit)
entry->schema_sent = true;
entry->streamed_txns =
foreach_delete_current(entry->streamed_txns, lc);
break;
}
}
}
}
/*
* Relcache invalidation callback
*/
@ -811,7 +1142,11 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
* Reset schema sent status as the relation definition may have changed.
*/
if (entry != NULL)
{
entry->schema_sent = false;
list_free(entry->streamed_txns);
entry->streamed_txns = NULL;
}
}
/*
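
The new pgoutput 'streaming' option can also be exercised from SQL
through the logical decoding interface; a hedged sketch, assuming a
slot created with the pgoutput plugin (slot and publication names are
placeholders). The binary variant of the function is used because
pgoutput produces binary output:

SELECT lsn, xid
FROM pg_logical_slot_peek_binary_changes('myslot', NULL, NULL,
        'proto_version', '2',
        'publication_names', 'mypub',
        'streaming', 'on');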


@ -4202,6 +4202,7 @@ getSubscriptions(Archive *fout)
int i_oid;
int i_subname;
int i_rolname;
int i_substream;
int i_subconninfo;
int i_subslotname;
int i_subsynccommit;
@ -4241,10 +4242,17 @@ getSubscriptions(Archive *fout)
if (fout->remoteVersion >= 140000)
appendPQExpBuffer(query,
" s.subbinary\n");
" s.subbinary,\n");
else
appendPQExpBuffer(query,
" false AS subbinary\n");
" false AS subbinary,\n");
if (fout->remoteVersion >= 140000)
appendPQExpBuffer(query,
" s.substream\n");
else
appendPQExpBuffer(query,
" false AS substream\n");
appendPQExpBuffer(query,
"FROM pg_subscription s\n"
@ -4264,6 +4272,7 @@ getSubscriptions(Archive *fout)
i_subsynccommit = PQfnumber(res, "subsynccommit");
i_subpublications = PQfnumber(res, "subpublications");
i_subbinary = PQfnumber(res, "subbinary");
i_substream = PQfnumber(res, "substream");
subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
@ -4287,6 +4296,8 @@ getSubscriptions(Archive *fout)
pg_strdup(PQgetvalue(res, i, i_subpublications));
subinfo[i].subbinary =
pg_strdup(PQgetvalue(res, i, i_subbinary));
subinfo[i].substream =
pg_strdup(PQgetvalue(res, i, i_substream));
if (strlen(subinfo[i].rolname) == 0)
pg_log_warning("owner of subscription \"%s\" appears to be invalid",
@ -4358,6 +4369,9 @@ dumpSubscription(Archive *fout, SubscriptionInfo *subinfo)
if (strcmp(subinfo->subbinary, "t") == 0)
appendPQExpBuffer(query, ", binary = true");
if (strcmp(subinfo->substream, "f") != 0)
appendPQExpBuffer(query, ", streaming = on");
if (strcmp(subinfo->subsynccommit, "off") != 0)
appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));


@ -626,6 +626,7 @@ typedef struct _SubscriptionInfo
char *subconninfo;
char *subslotname;
char *subbinary;
char *substream;
char *subsynccommit;
char *subpublications;
} SubscriptionInfo;


@ -5979,7 +5979,7 @@ describeSubscriptions(const char *pattern, bool verbose)
PGresult *res;
printQueryOpt myopt = pset.popt;
static const bool translate_columns[] = {false, false, false, false,
false, false, false};
false, false, false, false};
if (pset.sversion < 100000)
{
@ -6005,11 +6005,13 @@ describeSubscriptions(const char *pattern, bool verbose)
if (verbose)
{
/* Binary mode is only supported in v14 and higher */
/* Binary mode and streaming are only supported in v14 and higher */
if (pset.sversion >= 140000)
appendPQExpBuffer(&buf,
", subbinary AS \"%s\"\n",
gettext_noop("Binary"));
", subbinary AS \"%s\"\n"
", substream AS \"%s\"\n",
gettext_noop("Binary"),
gettext_noop("Streaming"));
appendPQExpBuffer(&buf,
", subsynccommit AS \"%s\"\n"


@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 202009021
#define CATALOG_VERSION_NO 202009031
#endif


@ -51,6 +51,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
bool subbinary; /* True if the subscription wants the
* publisher to send data in binary */
bool substream; /* Stream in-progress transactions. */
#ifdef CATALOG_VARLEN /* variable-length fields start here */
/* Connection string to the publisher */
text subconninfo BKI_FORCE_NOT_NULL;
@ -78,6 +80,7 @@ typedef struct Subscription
bool enabled; /* Indicates if the subscription is enabled */
bool binary; /* Indicates if the subscription wants data in
* binary format */
bool stream; /* Allow streaming in-progress transactions. */
char *conninfo; /* Connection string to the publisher */
char *slotname; /* Name of the replication slot */
char *synccommit; /* Synchronous commit setting for worker */


@ -982,7 +982,11 @@ typedef enum
WAIT_EVENT_WAL_READ,
WAIT_EVENT_WAL_SYNC,
WAIT_EVENT_WAL_SYNC_METHOD_ASSIGN,
WAIT_EVENT_WAL_WRITE
WAIT_EVENT_WAL_WRITE,
WAIT_EVENT_LOGICAL_CHANGES_READ,
WAIT_EVENT_LOGICAL_CHANGES_WRITE,
WAIT_EVENT_LOGICAL_SUBXACT_READ,
WAIT_EVENT_LOGICAL_SUBXACT_WRITE
} WaitEventIO;
/* ----------


@ -23,9 +23,13 @@
* we can support. LOGICALREP_PROTO_MIN_VERSION_NUM is the oldest version we
* have backwards compatibility for. The client requests protocol version at
* connect time.
*
* LOGICALREP_PROTO_STREAM_VERSION_NUM is the minimum protocol version with
* support for streaming large transactions.
*/
#define LOGICALREP_PROTO_MIN_VERSION_NUM 1
#define LOGICALREP_PROTO_VERSION_NUM 1
#define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
#define LOGICALREP_PROTO_VERSION_NUM 2
/*
* This struct stores a tuple received via logical replication.
@ -98,25 +102,45 @@ extern void logicalrep_read_commit(StringInfo in,
extern void logicalrep_write_origin(StringInfo out, const char *origin,
XLogRecPtr origin_lsn);
extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
extern void logicalrep_write_insert(StringInfo out, Relation rel,
HeapTuple newtuple, bool binary);
extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
Relation rel, HeapTuple newtuple,
bool binary);
extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
extern void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
extern void logicalrep_write_update(StringInfo out, TransactionId xid,
Relation rel, HeapTuple oldtuple,
HeapTuple newtuple, bool binary);
extern LogicalRepRelId logicalrep_read_update(StringInfo in,
bool *has_oldtuple, LogicalRepTupleData *oldtup,
LogicalRepTupleData *newtup);
extern void logicalrep_write_delete(StringInfo out, Relation rel,
HeapTuple oldtuple, bool binary);
extern void logicalrep_write_delete(StringInfo out, TransactionId xid,
Relation rel, HeapTuple oldtuple,
bool binary);
extern LogicalRepRelId logicalrep_read_delete(StringInfo in,
LogicalRepTupleData *oldtup);
extern void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[],
extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
int nrelids, Oid relids[],
bool cascade, bool restart_seqs);
extern List *logicalrep_read_truncate(StringInfo in,
bool *cascade, bool *restart_seqs);
extern void logicalrep_write_rel(StringInfo out, Relation rel);
extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
Relation rel);
extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
extern void logicalrep_write_typ(StringInfo out, Oid typoid);
extern void logicalrep_write_typ(StringInfo out, TransactionId xid,
Oid typoid);
extern void logicalrep_read_typ(StringInfo out, LogicalRepTyp *ltyp);
extern void logicalrep_write_stream_start(StringInfo out, TransactionId xid,
bool first_segment);
extern TransactionId logicalrep_read_stream_start(StringInfo in,
bool *first_segment);
extern void logicalrep_write_stream_stop(StringInfo out);
extern TransactionId logicalrep_read_stream_stop(StringInfo in);
extern void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
extern TransactionId logicalrep_read_stream_commit(StringInfo out,
LogicalRepCommitData *commit_data);
extern void logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
TransactionId subxid);
extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
TransactionId *subxid);
#endif /* LOGICAL_PROTO_H */
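
Consequently, requesting streaming together with the old protocol
version is rejected by pgoutput's startup check; for instance, with the
same placeholder slot as above:

SELECT count(*)
FROM pg_logical_slot_peek_binary_changes('myslot', NULL, NULL,
        'proto_version', '1',
        'publication_names', 'mypub',
        'streaming', 'on');
-- ERROR:  requested proto_version=1 does not support streaming, need 2 or higher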


@ -178,6 +178,7 @@ typedef struct
uint32 proto_version; /* Logical protocol version */
List *publication_names; /* String list of publications */
bool binary; /* Ask publisher to use binary */
bool streaming; /* Streaming of large transactions */
} logical;
} proto;
} WalRcvStreamOptions;


@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo
-----------------+---------------------------+---------+-------------+--------+--------------------+-----------------------------
regress_testsub | regress_subscription_user | f | {testpub} | f | off | dbname=regress_doesnotexist
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@ -91,10 +91,10 @@ ERROR: subscription "regress_doesnotexist" does not exist
ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
ERROR: unrecognized subscription parameter: "create_slot"
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo
-----------------+---------------------------+---------+---------------------+--------+--------------------+------------------------------
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | dbname=regress_doesnotexist2
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
-----------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | off | dbname=regress_doesnotexist2
(1 row)
BEGIN;
@ -126,10 +126,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
ERROR: invalid value for parameter "synchronous_commit": "foobar"
HINT: Available values: local, remote_write, remote_apply, on, off.
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo
---------------------+---------------------------+---------+---------------------+--------+--------------------+------------------------------
regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | local | dbname=regress_doesnotexist2
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
---------------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------
regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | local | dbname=regress_doesnotexist2
(1 row)
-- rename back to keep the rest simple
@ -162,19 +162,42 @@ ERROR: binary requires a Boolean value
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo
-----------------+---------------------------+---------+-------------+--------+--------------------+-----------------------------
regress_testsub | regress_subscription_user | f | {testpub} | t | off | dbname=regress_doesnotexist
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
regress_testsub | regress_subscription_user | f | {testpub} | t | f | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo
-----------------+---------------------------+---------+-------------+--------+--------------------+-----------------------------
regress_testsub | regress_subscription_user | f | {testpub} | f | off | dbname=regress_doesnotexist
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist
(1 row)
DROP SUBSCRIPTION regress_testsub;
-- fail - streaming must be boolean
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = foo);
ERROR: streaming requires a Boolean value
-- now it works
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
regress_testsub | regress_subscription_user | f | {testpub} | f | t | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist
(1 row)
DROP SUBSCRIPTION regress_testsub;


@ -132,6 +132,21 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
DROP SUBSCRIPTION regress_testsub;
-- fail - streaming must be boolean
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = foo);
-- now it works
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
\dRs+
ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
DROP SUBSCRIPTION regress_testsub;
RESET SESSION AUTHORIZATION;
DROP ROLE regress_subscription_user;
DROP ROLE regress_subscription_user2;


@ -0,0 +1,98 @@
# Test streaming of simple large transaction
use strict;
use warnings;
use PostgresNode;
use TestLib;
use Test::More tests => 4;
# Create publisher node
my $node_publisher = get_new_node('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB');
$node_publisher->start;
# Create subscriber node
my $node_subscriber = get_new_node('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->start;
# Create some preexisting content on publisher
$node_publisher->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b varchar)");
$node_publisher->safe_psql('postgres',
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
# Setup structure on subscriber
$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)");
# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
my $appname = 'tap_sub';
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
);
$node_publisher->wait_for_catchup($appname);
# Also wait for initial table sync to finish
my $synced_query =
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
my $result =
$node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(2|2|2), 'check initial data was copied to subscriber');
# Insert, update and delete enough rows to exceed the 64kB limit.
$node_publisher->safe_psql('postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
COMMIT;
});
$node_publisher->wait_for_catchup($appname);
$result =
$node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(3334|3334|3334), 'check extra columns contain local defaults');
# Test the streaming in binary mode
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub SET (binary = on)"
);
# Insert, update and delete enough rows to exceed the 64kB limit.
$node_publisher->safe_psql('postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 10000) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
COMMIT;
});
$node_publisher->wait_for_catchup($appname);
$result =
$node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(6667|6667|6667), 'check extra columns contain local defaults');
# Change the local values of the extra columns on the subscriber,
# update publisher, and check that subscriber retains the expected
# values. This is to ensure that non-streaming transactions behave
# properly after a streaming transaction.
$node_subscriber->safe_psql('postgres', "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'");
$node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(a::text)");
$node_publisher->wait_for_catchup($appname);
$result =
$node_subscriber->safe_psql('postgres', "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab");
is($result, qq(6667|6667|6667), 'check extra columns contain locally changed data');
$node_subscriber->stop;
$node_publisher->stop;


@ -111,6 +111,7 @@ Append
AppendPath
AppendRelInfo
AppendState
ApplySubXactData
Archive
ArchiveEntryPtrType
ArchiveFormat
@ -2370,6 +2371,7 @@ StopList
StopWorkersData
StrategyNumber
StreamCtl
StreamXidHash
StringInfo
StringInfoData
StripnullState
@ -2380,6 +2382,7 @@ SubPlanState
SubTransactionId
SubXactCallback
SubXactCallbackItem
SubXactEvent
SubXactInfo
SubplanResultRelHashElem
SubqueryScan