Fix bogus completion tag usage in walsender

Since commit fd5942c18f (2012, 9.3-era), walsender has been sending
completion tags for certain replication commands twice -- and they're
not even consistent.  Apparently neither libpq nor JDBC have a problem
with it, but it's not kosher.  Fix by remove the EndCommand() call in
the common code path for them all, and inserting specific calls to
EndReplicationCommand() specifically in those places where it's needed.

EndReplicationCommand() is a new simple function to send the completion
tag for replication commands.  Do this instead of sending a generic
SELECT completion tag for them all, which was also pretty bogus (if
innocuous).  While at it, change StartReplication() to use
EndReplicationCommand() instead of pg_puttextmessage().

In commit 2f9661311b, I failed to realize that replication commands
are not close-enough kin of regular SQL commands, so the
DROP_REPLICATION_SLOT tag I added is undeserved and a type pun.  Take it
out.

Backpatch to 13, where the latter commit appeared.  The duplicate tag
has been sent since 9.3, but since nothing is broken, it doesn't seem
worth fixing.

Per complaints from Tom Lane.

Discussion: https://postgr.es/m/1347966.1600195735@sss.pgh.pa.us
This commit is contained in:
Alvaro Herrera 2020-09-16 13:04:38 -03:00
parent 44fc6e259b
commit 07082b08cc
No known key found for this signature in database
GPG Key ID: 1C20ACB9D5C564AE
4 changed files with 34 additions and 14 deletions

View File

@ -799,7 +799,7 @@ StartReplication(StartReplicationCmd *cmd)
}
/* Send CommandComplete message */
pq_puttextmessage('C', "START_STREAMING");
EndReplicationCommand("START_STREAMING");
}
/*
@ -1122,11 +1122,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
static void
DropReplicationSlot(DropReplicationSlotCmd *cmd)
{
QueryCompletion qc;
ReplicationSlotDrop(cmd->slotname, !cmd->wait);
SetQueryCompletion(&qc, CMDTAG_DROP_REPLICATION_SLOT, 0);
EndCommand(&qc, DestRemote, false);
}
/*
@ -1517,9 +1513,9 @@ exec_replication_command(const char *cmd_string)
{
int parse_rc;
Node *cmd_node;
const char *cmdtag;
MemoryContext cmd_context;
MemoryContext old_context;
QueryCompletion qc;
/*
* If WAL sender has been told that shutdown is getting close, switch its
@ -1619,40 +1615,53 @@ exec_replication_command(const char *cmd_string)
switch (cmd_node->type)
{
case T_IdentifySystemCmd:
cmdtag = "IDENTIFY_SYSTEM";
IdentifySystem();
EndReplicationCommand(cmdtag);
break;
case T_BaseBackupCmd:
PreventInTransactionBlock(true, "BASE_BACKUP");
cmdtag = "BASE_BACKUP";
PreventInTransactionBlock(true, cmdtag);
SendBaseBackup((BaseBackupCmd *) cmd_node);
EndReplicationCommand(cmdtag);
break;
case T_CreateReplicationSlotCmd:
cmdtag = "CREATE_REPLICATION_SLOT";
CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
EndReplicationCommand(cmdtag);
break;
case T_DropReplicationSlotCmd:
cmdtag = "DROP_REPLICATION_SLOT";
DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
EndReplicationCommand(cmdtag);
break;
case T_StartReplicationCmd:
{
StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
PreventInTransactionBlock(true, "START_REPLICATION");
cmdtag = "START_REPLICATION";
PreventInTransactionBlock(true, cmdtag);
if (cmd->kind == REPLICATION_KIND_PHYSICAL)
StartReplication(cmd);
else
StartLogicalReplication(cmd);
/* callees already sent their own completion message */
Assert(xlogreader != NULL);
break;
}
case T_TimeLineHistoryCmd:
PreventInTransactionBlock(true, "TIMELINE_HISTORY");
cmdtag = "TIMELINE_HISTORY";
PreventInTransactionBlock(true, cmdtag);
SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
EndReplicationCommand(cmdtag);
break;
case T_VariableShowStmt:
@ -1660,10 +1669,13 @@ exec_replication_command(const char *cmd_string)
DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
VariableShowStmt *n = (VariableShowStmt *) cmd_node;
cmdtag = "SHOW";
/* syscache access needs a transaction environment */
StartTransactionCommand();
GetPGVariable(n->name, dest);
CommitTransactionCommand();
EndReplicationCommand(cmdtag);
}
break;
@ -1676,10 +1688,6 @@ exec_replication_command(const char *cmd_string)
MemoryContextSwitchTo(old_context);
MemoryContextDelete(cmd_context);
/* Send CommandComplete message */
SetQueryCompletion(&qc, CMDTAG_SELECT, 0);
EndCommand(&qc, DestRemote, true);
/* Report to pgstat that this process is now idle */
pgstat_report_activity(STATE_IDLE, NULL);
debug_query_string = NULL;

View File

@ -211,6 +211,18 @@ EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_o
}
}
/* ----------------
* EndReplicationCommand - stripped down version of EndCommand
*
* For use by replication commands.
* ----------------
*/
void
EndReplicationCommand(const char *commandTag)
{
pq_putmessage('C', commandTag, strlen(commandTag) + 1);
}
/* ----------------
* NullCommand - tell dest that an empty query string was recognized
*

View File

@ -157,7 +157,6 @@ PG_CMDTAG(CMDTAG_DROP_OWNED, "DROP OWNED", true, false, false)
PG_CMDTAG(CMDTAG_DROP_POLICY, "DROP POLICY", true, false, false)
PG_CMDTAG(CMDTAG_DROP_PROCEDURE, "DROP PROCEDURE", true, false, false)
PG_CMDTAG(CMDTAG_DROP_PUBLICATION, "DROP PUBLICATION", true, false, false)
PG_CMDTAG(CMDTAG_DROP_REPLICATION_SLOT, "DROP REPLICATION SLOT", false, false, false)
PG_CMDTAG(CMDTAG_DROP_ROLE, "DROP ROLE", false, false, false)
PG_CMDTAG(CMDTAG_DROP_ROUTINE, "DROP ROUTINE", true, false, false)
PG_CMDTAG(CMDTAG_DROP_RULE, "DROP RULE", true, false, false)

View File

@ -139,6 +139,7 @@ extern void BeginCommand(CommandTag commandTag, CommandDest dest);
extern DestReceiver *CreateDestReceiver(CommandDest dest);
extern void EndCommand(const QueryCompletion *qc, CommandDest dest,
bool force_undecorated_output);
extern void EndReplicationCommand(const char *commandTag);
/* Additional functions that go with destination management, more or less. */