diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 505445f2dc..a7bbcf3499 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -99,6 +99,9 @@ ReplicationSlot *MyReplicationSlot = NULL; int max_replication_slots = 0; /* the maximum number of replication * slots */ +static ReplicationSlot *SearchNamedReplicationSlot(const char *name); +static int ReplicationSlotAcquireInternal(ReplicationSlot *slot, + const char *name, SlotAcquireBehavior behavior); static void ReplicationSlotDropAcquired(void); static void ReplicationSlotDropPtr(ReplicationSlot *slot); @@ -322,77 +325,117 @@ ReplicationSlotCreate(const char *name, bool db_specific, } /* - * Find a previously created slot and mark it as used by this backend. + * Search for the named replication slot. * - * The return value is only useful if behavior is SAB_Inquire, in which - * it's zero if we successfully acquired the slot, or the PID of the - * owning process otherwise. If behavior is SAB_Error, then trying to - * acquire an owned slot is an error. If SAB_Block, we sleep until the - * slot is released by the owning process. + * Return the replication slot if found, otherwise NULL. + * + * The caller must hold ReplicationSlotControlLock in shared mode. */ -int -ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior) +static ReplicationSlot * +SearchNamedReplicationSlot(const char *name) { - ReplicationSlot *slot; - int active_pid; int i; + ReplicationSlot *slot = NULL; -retry: - Assert(MyReplicationSlot == NULL); + Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, + LW_SHARED)); - /* - * Search for the named slot and mark it active if we find it. If the - * slot is already active, we exit the loop with active_pid set to the PID - * of the backend that owns it. 
- */ - active_pid = 0; - slot = NULL; - LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (i = 0; i < max_replication_slots; i++) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0) { - /* - * This is the slot we want; check if it's active under some other - * process. In single user mode, we don't need this check. - */ - if (IsUnderPostmaster) - { - /* - * Get ready to sleep on it in case it is active. (We may end - * up not sleeping, but we don't want to do this while holding - * the spinlock.) - */ - ConditionVariablePrepareToSleep(&s->active_cv); - - SpinLockAcquire(&s->mutex); - - active_pid = s->active_pid; - if (active_pid == 0) - active_pid = s->active_pid = MyProcPid; - - SpinLockRelease(&s->mutex); - } - else - active_pid = MyProcPid; slot = s; - break; } } - LWLockRelease(ReplicationSlotControlLock); - /* If we did not find the slot, error out. */ - if (slot == NULL) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_OBJECT), - errmsg("replication slot \"%s\" does not exist", name))); + return slot; +} + +/* + * Find a previously created slot and mark it as used by this process. + * + * The return value is only useful if behavior is SAB_Inquire, in which + * it's zero if we successfully acquired the slot, -1 if the slot no longer + * exists, or the PID of the owning process otherwise. If behavior is + * SAB_Error, then trying to acquire an owned slot is an error. + * If SAB_Block, we sleep until the slot is released by the owning process. + */ +int +ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior) +{ + return ReplicationSlotAcquireInternal(NULL, name, behavior); +} + +/* + * Mark the specified slot as used by this process. + * + * Only one of slot and name can be specified. + * If slot == NULL, search for the slot with the given name. + * + * See comments about the return value in ReplicationSlotAcquire(). 
+ */ +static int +ReplicationSlotAcquireInternal(ReplicationSlot *slot, const char *name, + SlotAcquireBehavior behavior) +{ + ReplicationSlot *s; + int active_pid; + + AssertArg((slot == NULL) ^ (name == NULL)); + +retry: + Assert(MyReplicationSlot == NULL); + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); /* - * If we found the slot but it's already active in another backend, we - * either error out or retry after a short wait, as caller specified. + * Search for the slot with the specified name if the slot to acquire is + * not given. If the slot is not found, we either return -1 or error out. + */ + s = slot ? slot : SearchNamedReplicationSlot(name); + if (s == NULL || !s->in_use) + { + LWLockRelease(ReplicationSlotControlLock); + + if (behavior == SAB_Inquire) + return -1; + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("replication slot \"%s\" does not exist", + name ? name : NameStr(slot->data.name)))); + } + + /* + * This is the slot we want; check if it's active under some other + * process. In single user mode, we don't need this check. + */ + if (IsUnderPostmaster) + { + /* + * Get ready to sleep on the slot in case it is active if SAB_Block. + * (We may end up not sleeping, but we don't want to do this while + * holding the spinlock.) + */ + if (behavior == SAB_Block) + ConditionVariablePrepareToSleep(&s->active_cv); + + SpinLockAcquire(&s->mutex); + if (s->active_pid == 0) + s->active_pid = MyProcPid; + active_pid = s->active_pid; + SpinLockRelease(&s->mutex); + } + else + active_pid = MyProcPid; + LWLockRelease(ReplicationSlotControlLock); + + /* + * If we found the slot but it's already active in another process, we + * either error out, return the PID of the owning process, or retry + * after a short wait, as caller specified. 
*/ if (active_pid != MyProcPid) { @@ -400,24 +443,24 @@ retry: ereport(ERROR, (errcode(ERRCODE_OBJECT_IN_USE), errmsg("replication slot \"%s\" is active for PID %d", - name, active_pid))); + NameStr(s->data.name), active_pid))); else if (behavior == SAB_Inquire) return active_pid; /* Wait here until we get signaled, and then restart */ - ConditionVariableSleep(&slot->active_cv, + ConditionVariableSleep(&s->active_cv, WAIT_EVENT_REPLICATION_SLOT_DROP); ConditionVariableCancelSleep(); goto retry; } - else - ConditionVariableCancelSleep(); /* no sleep needed after all */ + else if (behavior == SAB_Block) + ConditionVariableCancelSleep(); /* no sleep needed after all */ /* Let everybody know we've modified this slot */ - ConditionVariableBroadcast(&slot->active_cv); + ConditionVariableBroadcast(&s->active_cv); /* We made this slot active, so it's ours now. */ - MyReplicationSlot = slot; + MyReplicationSlot = s; /* success */ return 0; @@ -1100,43 +1143,82 @@ restart: ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; XLogRecPtr restart_lsn = InvalidXLogRecPtr; NameData slotname; + int wspid; + int last_signaled_pid = 0; if (!s->in_use) continue; SpinLockAcquire(&s->mutex); - if (s->data.restart_lsn == InvalidXLogRecPtr || - s->data.restart_lsn >= oldestLSN) - { - SpinLockRelease(&s->mutex); - continue; - } - slotname = s->data.name; restart_lsn = s->data.restart_lsn; - SpinLockRelease(&s->mutex); + + if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN) + continue; LWLockRelease(ReplicationSlotControlLock); + /* Get ready to sleep on the slot in case it is active */ + ConditionVariablePrepareToSleep(&s->active_cv); + for (;;) { - int wspid = ReplicationSlotAcquire(NameStr(slotname), - SAB_Inquire); + /* + * Try to mark this slot as used by this process. + * + * Note that ReplicationSlotAcquireInternal(SAB_Inquire) + * should not cancel the prepared condition variable + * if this slot is active in other process. 
In that case + * we have to wait on that CV later, until the process owning + * the slot is terminated. + */ + wspid = ReplicationSlotAcquireInternal(s, NULL, SAB_Inquire); - /* no walsender? success! */ - if (wspid == 0) + /* + * Exit the loop if we successfully acquired the slot or + * the slot was dropped while waiting for the owning process + * to be terminated. For example, the latter case is likely to + * happen when the slot is temporary because it's automatically + * dropped by the termination of the owning process. + */ + if (wspid <= 0) break; - ereport(LOG, - (errmsg("terminating walsender %d because replication slot \"%s\" is too far behind", - wspid, NameStr(slotname)))); - (void) kill(wspid, SIGTERM); + /* + * Signal the process that owns the slot to terminate. + * + * There is a race condition where another process may own + * the slot after the process using it was terminated and before + * this process owns it. To handle this case, we signal again + * if the PID of the owning process differs from the last one. + * + * XXX This logic assumes that the same PID is not reused + * very quickly. + */ + if (last_signaled_pid != wspid) + { + ereport(LOG, + (errmsg("terminating process %d because replication slot \"%s\" is too far behind", + wspid, NameStr(slotname)))); + (void) kill(wspid, SIGTERM); + last_signaled_pid = wspid; + } ConditionVariableTimedSleep(&s->active_cv, 10, WAIT_EVENT_REPLICATION_SLOT_DROP); } ConditionVariableCancelSleep(); + /* + * Do nothing here and start from scratch if the slot has + * already been dropped. + */ + if (wspid == -1) + { + CHECK_FOR_INTERRUPTS(); + goto restart; + } + ereport(LOG, (errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size", NameStr(slotname),