diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c index 9fce75565f..addd95faec 100644 --- a/src/backend/access/rmgrdesc/xactdesc.c +++ b/src/backend/access/rmgrdesc/xactdesc.c @@ -396,6 +396,13 @@ xact_desc(StringInfo buf, XLogReaderState *record) appendStringInfo(buf, "xtop %u: ", xlrec->xtop); xact_desc_assignment(buf, xlrec); } + else if (info == XLOG_XACT_INVALIDATIONS) + { + xl_xact_invals *xlrec = (xl_xact_invals *) rec; + + standby_desc_invalidations(buf, xlrec->nmsgs, xlrec->msgs, InvalidOid, + InvalidOid, false); + } } const char * @@ -423,6 +430,9 @@ xact_identify(uint8 info) case XLOG_XACT_ASSIGNMENT: id = "ASSIGNMENT"; break; + case XLOG_XACT_INVALIDATIONS: + id = "INVALIDATION"; + break; } return id; diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index bd4c3cf325..d4f7c29847 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1224,6 +1224,16 @@ RecordTransactionCommit(void) bool RelcacheInitFileInval = false; bool wrote_xlog; + /* + * Log pending invalidations for logical decoding of in-progress + * transactions. Normally for DDLs, we log this at each command end, + * however, for certain cases where we directly update the system table + * without a transaction block, the invalidations are not logged till this + * time. + */ + if (XLogLogicalInfoActive()) + LogLogicalInvalidations(); + /* Get data needed for commit record */ nrels = smgrGetPendingDeletes(true, &rels); nchildren = xactGetCommittedChildren(&children); @@ -6022,6 +6032,13 @@ xact_redo(XLogReaderState *record) ProcArrayApplyXidAssignment(xlrec->xtop, xlrec->nsubxacts, xlrec->xsub); } + else if (info == XLOG_XACT_INVALIDATIONS) + { + /* + * XXX we do ignore this for now, what matters are invalidations + * written into the commit record. + */ + } else elog(PANIC, "xact_redo: unknown op code %u", info); } diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 0c0c371739..f3a1c31a29 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -278,10 +278,39 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * We assign subxact to the toplevel xact while processing each - * record if required. So, we don't need to do anything here. - * See LogicalDecodingProcessRecord. + * record if required. So, we don't need to do anything here. See + * LogicalDecodingProcessRecord. */ break; + case XLOG_XACT_INVALIDATIONS: + { + TransactionId xid; + xl_xact_invals *invals; + + xid = XLogRecGetXid(r); + invals = (xl_xact_invals *) XLogRecGetData(r); + + /* + * Execute the invalidations for xid-less transactions, + * otherwise, accumulate them so that they can be processed at + * the commit time. + */ + if (TransactionIdIsValid(xid)) + { + if (!ctx->fast_forward) + ReorderBufferAddInvalidations(reorder, xid, + buf->origptr, + invals->nmsgs, + invals->msgs); + ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, + buf->origptr); + } + else if ((!ctx->fast_forward)) + ReorderBufferImmediateInvalidation(ctx->reorder, + invals->nmsgs, + invals->msgs); + } + break; case XLOG_XACT_PREPARE: /* @@ -334,15 +363,11 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) case XLOG_STANDBY_LOCK: break; case XLOG_INVALIDATIONS: - { - xl_invalidations *invalidations = - (xl_invalidations *) XLogRecGetData(r); - if (!ctx->fast_forward) - ReorderBufferImmediateInvalidation(ctx->reorder, - invalidations->nmsgs, - invalidations->msgs); - } + /* + * We are processing the invalidations at the command level via + * XLOG_XACT_INVALIDATIONS. So we don't need to do anything here. + */ break; default: elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info); @@ -573,19 +598,6 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, commit_time = parsed->origin_timestamp; } - /* - * Process invalidation messages, even if we're not interested in the - * transaction's contents, since the various caches need to always be - * consistent. - */ - if (parsed->nmsgs > 0) - { - if (!ctx->fast_forward) - ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr, - parsed->nmsgs, parsed->msgs); - ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr); - } - SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid, parsed->nsubxacts, parsed->subxacts); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 449327a147..ce6e62152f 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -856,6 +856,9 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, subtxn->toplevel_xid = xid; Assert(subtxn->nsubtxns == 0); + /* set the reference to top-level transaction */ + subtxn->toptxn = txn; + /* add to subtransaction list */ dlist_push_tail(&txn->subtxns, &subtxn->node); txn->nsubtxns++; @@ -2201,7 +2204,11 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, /* * Setup the invalidation of the toplevel transaction. * - * This needs to be done before ReorderBufferCommit is called! + * This needs to be called for each XLOG_XACT_INVALIDATIONS message and + * accumulates all the invalidation messages in the toplevel transaction. + * This is required because in some cases where we skip processing the + * transaction (see ReorderBufferForget), we need to execute all the + * invalidations together. */ void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, @@ -2212,17 +2219,35 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); - if (txn->ninvalidations != 0) - elog(ERROR, "only ever add one set of invalidations"); + /* + * We collect all the invalidations under the top transaction so that we + * can execute them all together. + */ + if (txn->toptxn) + txn = txn->toptxn; Assert(nmsgs > 0); - txn->ninvalidations = nmsgs; - txn->invalidations = (SharedInvalidationMessage *) - MemoryContextAlloc(rb->context, - sizeof(SharedInvalidationMessage) * nmsgs); - memcpy(txn->invalidations, msgs, - sizeof(SharedInvalidationMessage) * nmsgs); + /* Accumulate invalidations. */ + if (txn->ninvalidations == 0) + { + txn->ninvalidations = nmsgs; + txn->invalidations = (SharedInvalidationMessage *) + MemoryContextAlloc(rb->context, + sizeof(SharedInvalidationMessage) * nmsgs); + memcpy(txn->invalidations, msgs, + sizeof(SharedInvalidationMessage) * nmsgs); + } + else + { + txn->invalidations = (SharedInvalidationMessage *) + repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) * + (txn->ninvalidations + nmsgs)); + + memcpy(txn->invalidations + txn->ninvalidations, msgs, + nmsgs * sizeof(SharedInvalidationMessage)); + txn->ninvalidations += nmsgs; + } } /* @@ -2250,6 +2275,15 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; + + /* + * Mark top-level transaction as having catalog changes too if one of its + * children has so that the ReorderBufferBuildTupleCidHash can + * conveniently check just top-level transaction and decide whether to + * build the hash table or not. + */ + if (txn->toptxn != NULL) + txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; } /* diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c index 591dd33be6..628d6f5d0c 100644 --- a/src/backend/utils/cache/inval.c +++ b/src/backend/utils/cache/inval.c @@ -85,6 +85,9 @@ * worth trying to avoid sending such inval traffic in the future, if those * problems can be overcome cheaply. * + * When wal_level=logical, write invalidations into WAL at each command end to + * support the decoding of the in-progress transactions. See + * CommandEndInvalidationMessages. * * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -1094,6 +1097,11 @@ CommandEndInvalidationMessages(void) ProcessInvalidationMessages(&transInvalInfo->CurrentCmdInvalidMsgs, LocalExecuteInvalidationMessage); + + /* WAL Log per-command invalidation messages for wal_level=logical */ + if (XLogLogicalInfoActive()) + LogLogicalInvalidations(); + AppendInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs, &transInvalInfo->CurrentCmdInvalidMsgs); } @@ -1501,3 +1509,49 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue) i = ccitem->link - 1; } } + +/* + * LogLogicalInvalidations + * + * Emit WAL for invalidations. This is currently only used for logging + * invalidations at the command end or at commit time if any invalidations + * are pending. + */ +void +LogLogicalInvalidations() +{ + xl_xact_invals xlrec; + SharedInvalidationMessage *invalMessages; + int nmsgs = 0; + + /* Quick exit if we haven't done anything with invalidation messages. */ + if (transInvalInfo == NULL) + return; + + ProcessInvalidationMessagesMulti(&transInvalInfo->CurrentCmdInvalidMsgs, + MakeSharedInvalidMessagesArray); + + Assert(!(numSharedInvalidMessagesArray > 0 && + SharedInvalidMessagesArray == NULL)); + + invalMessages = SharedInvalidMessagesArray; + nmsgs = numSharedInvalidMessagesArray; + SharedInvalidMessagesArray = NULL; + numSharedInvalidMessagesArray = 0; + + if (nmsgs > 0) + { + /* prepare record */ + memset(&xlrec, 0, MinSizeOfXactInvals); + xlrec.nmsgs = nmsgs; + + /* perform insertion */ + XLogBeginInsert(); + XLogRegisterData((char *) (&xlrec), MinSizeOfXactInvals); + XLogRegisterData((char *) invalMessages, + nmsgs * sizeof(SharedInvalidationMessage)); + XLogInsert(RM_XACT_ID, XLOG_XACT_INVALIDATIONS); + + pfree(invalMessages); + } +} diff --git a/src/include/access/xact.h b/src/include/access/xact.h index aef8555367..53480116a4 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -146,7 +146,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, #define XLOG_XACT_COMMIT_PREPARED 0x30 #define XLOG_XACT_ABORT_PREPARED 0x40 #define XLOG_XACT_ASSIGNMENT 0x50 -/* free opcode 0x60 */ +#define XLOG_XACT_INVALIDATIONS 0x60 /* free opcode 0x70 */ /* mask for filtering opcodes out of xl_info */ diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index b9490a3afe..9b2da56379 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -31,7 +31,7 @@ /* * Each page of XLOG file has a header like this: */ -#define XLOG_PAGE_MAGIC 0xD107 /* can be used as WAL version indicator */ +#define XLOG_PAGE_MAGIC 0xD108 /* can be used as WAL version indicator */ typedef struct XLogPageHeaderData { diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 019bd382de..1055e99e2e 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -220,6 +220,9 @@ typedef struct ReorderBufferTXN */ XLogRecPtr end_lsn; + /* Toplevel transaction for this subxact (NULL for top-level). */ + struct ReorderBufferTXN *toptxn; + /* * LSN of the last lsn at which snapshot information reside, so we can * restart decoding from there and fully recover this transaction from diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h index bc5081cf72..463888c389 100644 --- a/src/include/utils/inval.h +++ b/src/include/utils/inval.h @@ -61,4 +61,6 @@ extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func, extern void CallSyscacheCallbacks(int cacheid, uint32 hashvalue); extern void InvalidateSystemCaches(void); + +extern void LogLogicalInvalidations(void); #endif /* INVAL_H */