redis-cli: Accept commands in subscribed mode (#11873)

The message "Reading messages... (press Ctrl-C to quit)" is replaced by
"Reading messages... (press Ctrl-C to quit or any key to type command)".

This allows users to subscribe to more channels, to try out UNSUBSCRIBE and to
combine pubsub with other features such as push messages from client tracking.

The "Reading messages" info message is displayed in the bottom of the output in a
distinct style and moves downward as more messages appear. When any key is pressed,
the info message is replaced by the prompt with for entering commands.
After entering a command and the reply is displayed, the "Reading messages" info
messages appears again. This is added to the repl loop in redis-cli and in the
corresponding place for non-interactive mode.

An indication "(subscribed mode)" is included in the prompt when entering commands
in subscribed mode.

Also:
* Fixes a problem that UNSUBSCRIBE hanged when used with RESP3 and push callback,
  without first entering subscribe mode. It hanged because UNSUBSCRIBE gets one or
  more push replies but no in-band reply.
* Exit subscribed mode after RESET.
This commit is contained in:
Viktor Söderqvist 2023-03-19 11:56:54 +01:00 committed by GitHub
parent c9466b24a6
commit bbf364a442
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 298 additions and 56 deletions

View File

@ -163,6 +163,7 @@ enum KEY_ACTION{
CTRL_F = 6, /* Ctrl-f */
CTRL_H = 8, /* Ctrl-h */
TAB = 9, /* Tab */
NL = 10, /* Enter typed before raw mode was enabled */
CTRL_K = 11, /* Ctrl+k */
CTRL_L = 12, /* Ctrl+l */
ENTER = 13, /* Enter */
@ -256,8 +257,8 @@ static int enableRawMode(int fd) {
* We want read to return every single byte, without timeout. */
raw.c_cc[VMIN] = 1; raw.c_cc[VTIME] = 0; /* 1 byte, no timer */
/* put terminal in raw mode after flushing */
if (tcsetattr(fd,TCSAFLUSH,&raw) < 0) goto fatal;
/* put terminal in raw mode */
if (tcsetattr(fd,TCSANOW,&raw) < 0) goto fatal;
rawmode = 1;
return 0;
@ -268,7 +269,7 @@ fatal:
static void disableRawMode(int fd) {
/* Don't even check the return value as it's too late. */
if (rawmode && tcsetattr(fd,TCSAFLUSH,&orig_termios) != -1)
if (rawmode && tcsetattr(fd,TCSANOW,&orig_termios) != -1)
rawmode = 0;
}
@ -840,6 +841,8 @@ static int linenoiseEdit(int stdin_fd, int stdout_fd, char *buf, size_t buflen,
}
switch(c) {
case NL: /* enter, typed before raw mode was enabled */
break;
case ENTER: /* enter */
history_len--;
free(history[history_len]);

View File

@ -45,6 +45,7 @@
#include <fcntl.h>
#include <limits.h>
#include <math.h>
#include <termios.h>
#include <hiredis.h>
#ifdef USE_OPENSSL
@ -172,6 +173,9 @@ int spectrum_palette_mono[] = {0,233,234,235,237,239,241,243,245,247,249,251,253
int *spectrum_palette;
int spectrum_palette_size;
static int orig_termios_saved = 0;
static struct termios orig_termios; /* To restore terminal at exit.*/
/* Dict Helpers */
static uint64_t dictSdsHash(const void *key);
static int dictSdsKeyCompare(dict *d, const void *key1,
@ -267,12 +271,14 @@ static struct config {
int eval_ldb_end; /* Lua debugging session ended. */
int enable_ldb_on_eval; /* Handle manual SCRIPT DEBUG + EVAL commands. */
int last_cmd_type;
redisReply *last_reply;
int verbose;
int set_errcode;
clusterManagerCommand cluster_manager_command;
int no_auth_warning;
int resp2;
int resp2; /* value of 1: specified explicitly with option -2 */
int resp3; /* value of 1: specified explicitly, value of 2: implicit like --json option */
int current_resp3; /* 1 if we have RESP3 right now in the current connection. */
int in_multi;
int pre_multi_dbnum;
} config;
@ -335,6 +341,9 @@ static void cliRefreshPrompt(void) {
if (config.in_multi)
prompt = sdscatlen(prompt,"(TX)",4);
if (config.pubsub_mode)
prompt = sdscatfmt(prompt,"(subscribed mode)");
/* Copy the prompt in the static buffer. */
prompt = sdscatlen(prompt,"> ",2);
snprintf(config.prompt,sizeof(config.prompt),"%s",prompt);
@ -1016,6 +1025,29 @@ static void freeHintsCallback(void *ptr) {
sdsfree(ptr);
}
/*------------------------------------------------------------------------------
* TTY manipulation
*--------------------------------------------------------------------------- */
/* Restore terminal if we've changed it. */
void cliRestoreTTY(void) {
if (orig_termios_saved)
tcsetattr(STDIN_FILENO, TCSANOW, &orig_termios);
}
/* Put the terminal in "press any key" mode */
static void cliPressAnyKeyTTY(void) {
if (!isatty(STDIN_FILENO)) return;
if (!orig_termios_saved) {
if (tcgetattr(STDIN_FILENO, &orig_termios) == -1) return;
atexit(cliRestoreTTY);
orig_termios_saved = 1;
}
struct termios mode = orig_termios;
mode.c_lflag &= ~(ECHO | ICANON); /* echoing off, canonical off */
tcsetattr(STDIN_FILENO, TCSANOW, &mode);
}
/*------------------------------------------------------------------------------
* Networking / parsing
*--------------------------------------------------------------------------- */
@ -1088,6 +1120,7 @@ static int cliSwitchProto(void) {
}
}
freeReplyObject(reply);
config.current_resp3 = 1;
return result;
}
@ -1147,6 +1180,9 @@ static int cliConnect(int flags) {
* errors. */
anetKeepAlive(NULL, context->fd, REDIS_CLI_KEEPALIVE_INTERVAL);
/* State of the current connection. */
config.current_resp3 = 0;
/* Do AUTH, select the right DB, switch to RESP3 if needed. */
if (cliAuth(context, config.conn_info.user, config.conn_info.auth) != REDIS_OK)
return REDIS_ERR;
@ -1309,6 +1345,8 @@ static sds cliFormatReplyTTY(redisReply *r, char *prefix) {
char numsep;
if (r->type == REDIS_REPLY_SET) numsep = '~';
else if (r->type == REDIS_REPLY_MAP) numsep = '#';
/* TODO: this would be a breaking change for scripts, do that in a major version. */
/* else if (r->type == REDIS_REPLY_PUSH) numsep = '>'; */
else numsep = ')';
snprintf(_prefixfmt,sizeof(_prefixfmt),"%%s%%%ud%c ",idxlen,numsep);
@ -1351,6 +1389,25 @@ static sds cliFormatReplyTTY(redisReply *r, char *prefix) {
return out;
}
/* Returns 1 if the reply is a pubsub pushed reply. */
int isPubsubPush(redisReply *r) {
if (r == NULL ||
r->type != (config.current_resp3 ? REDIS_REPLY_PUSH : REDIS_REPLY_ARRAY) ||
r->elements < 3 ||
r->element[0]->type != REDIS_REPLY_STRING)
{
return 0;
}
char *str = r->element[0]->str;
size_t len = r->element[0]->len;
/* Check if it is [p|s][un]subscribe or [p|s]message, but even simpler, we
* just check that it ends with "message" or "subscribe". */
return ((len >= strlen("message") &&
!strcmp(str + len - strlen("message"), "message")) ||
(len >= strlen("subscribe") &&
!strcmp(str + len - strlen("subscribe"), "subscribe")));
}
int isColorTerm(void) {
char *t = getenv("TERM");
return t != NULL && strstr(t,"xterm") != NULL;
@ -1656,6 +1713,11 @@ static int cliReadReply(int output_raw_strings) {
sds out = NULL;
int output = 1;
if (config.last_reply) {
freeReplyObject(config.last_reply);
config.last_reply = NULL;
}
if (redisGetReply(context,&_reply) != REDIS_OK) {
if (config.blocking_state_aborted) {
config.blocking_state_aborted = 0;
@ -1682,7 +1744,7 @@ static int cliReadReply(int output_raw_strings) {
return REDIS_ERR; /* avoid compiler warning */
}
reply = (redisReply*)_reply;
config.last_reply = reply = (redisReply*)_reply;
config.last_cmd_type = reply->type;
@ -1731,15 +1793,78 @@ static int cliReadReply(int output_raw_strings) {
fflush(stdout);
sdsfree(out);
}
freeReplyObject(reply);
return REDIS_OK;
}
/* Simultaneously wait for pubsub messages from redis and input on stdin. */
static void cliWaitForMessagesOrStdin() {
int show_info = config.output != OUTPUT_RAW && (isatty(STDOUT_FILENO) ||
getenv("FAKETTY"));
int use_color = show_info && isColorTerm();
cliPressAnyKeyTTY();
while (config.pubsub_mode) {
/* First check if there are any buffered replies. */
redisReply *reply;
do {
if (redisGetReplyFromReader(context, (void **)&reply) != REDIS_OK) {
cliPrintContextError();
exit(1);
}
if (reply) {
sds out = cliFormatReply(reply, config.output, 0);
fwrite(out,sdslen(out),1,stdout);
fflush(stdout);
sdsfree(out);
}
} while(reply);
/* Wait for input, either on the Redis socket or on stdin. */
struct timeval tv;
fd_set readfds;
FD_ZERO(&readfds);
FD_SET(context->fd, &readfds);
FD_SET(STDIN_FILENO, &readfds);
tv.tv_sec = 5;
tv.tv_usec = 0;
if (show_info) {
if (use_color) printf("\033[1;90m"); /* Bold, bright color. */
printf("Reading messages... (press Ctrl-C to quit or any key to type command)\r");
if (use_color) printf("\033[0m"); /* Reset color. */
fflush(stdout);
}
select(context->fd + 1, &readfds, NULL, NULL, &tv);
if (show_info) {
printf("\033[K"); /* Erase current line */
fflush(stdout);
}
if (config.blocking_state_aborted) {
/* Ctrl-C pressed */
config.blocking_state_aborted = 0;
config.pubsub_mode = 0;
if (cliConnect(CC_FORCE) != REDIS_OK) {
cliPrintContextError();
exit(1);
}
break;
} else if (FD_ISSET(context->fd, &readfds)) {
/* Message from Redis */
if (cliReadReply(0) != REDIS_OK) {
cliPrintContextError();
exit(1);
}
fflush(stdout);
} else if (FD_ISSET(STDIN_FILENO, &readfds)) {
/* Any key pressed */
break;
}
}
cliRestoreTTY();
}
static int cliSendCommand(int argc, char **argv, long repeat) {
char *command = argv[0];
size_t *argvlen;
int j, output_raw;
int is_unsubscribe_command = 0; /* Is it an unsubscribe related command? */
if (context == NULL) return REDIS_ERR;
@ -1775,12 +1900,12 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
if (!strcasecmp(command,"shutdown")) config.shutdown = 1;
if (!strcasecmp(command,"monitor")) config.monitor_mode = 1;
if (!strcasecmp(command,"subscribe") ||
!strcasecmp(command,"psubscribe") ||
!strcasecmp(command,"ssubscribe")) config.pubsub_mode = 1;
if (!strcasecmp(command,"unsubscribe") ||
!strcasecmp(command,"punsubscribe") ||
!strcasecmp(command,"sunsubscribe")) is_unsubscribe_command = 1;
int is_subscribe = (!strcasecmp(command, "subscribe") ||
!strcasecmp(command, "psubscribe") ||
!strcasecmp(command, "ssubscribe"));
int is_unsubscribe = (!strcasecmp(command, "unsubscribe") ||
!strcasecmp(command, "punsubscribe") ||
!strcasecmp(command, "sunsubscribe"));
if (!strcasecmp(command,"sync") ||
!strcasecmp(command,"psync")) config.slave_mode = 1;
@ -1812,21 +1937,6 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
while(repeat < 0 || repeat-- > 0) {
redisAppendCommandArgv(context,argc,(const char**)argv,argvlen);
if (is_unsubscribe_command) {
/* In unsubscribe related commands, we need to read the specified
* number of replies according to the number of parameters. */
argc--; /* Skip the command */
do {
if (cliReadReply(output_raw) != REDIS_OK) {
cliPrintContextError();
exit(1);
}
fflush(stdout);
} while(--argc);
zfree(argvlen);
continue;
}
if (config.monitor_mode) {
do {
if (cliReadReply(output_raw) != REDIS_OK) {
@ -1843,27 +1953,15 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
return REDIS_OK;
}
if (config.pubsub_mode) {
if (config.output != OUTPUT_RAW)
printf("Reading messages... (press Ctrl-C to quit)\n");
int num_expected_pubsub_push = 0;
if (is_subscribe || is_unsubscribe) {
/* When a push callback is set, redisGetReply (hiredis) loops until
* an in-band message is received, but these commands are confirmed
* using push replies only. There is one push reply per channel if
* channels are specified, otherwise at least one. */
num_expected_pubsub_push = argc > 1 ? argc - 1 : 1;
/* Unset our default PUSH handler so this works in RESP2/RESP3 */
redisSetPushCallback(context, NULL);
while (config.pubsub_mode) {
if (cliReadReply(output_raw) != REDIS_OK) {
cliPrintContextError();
exit(1);
}
fflush(stdout); /* Make it grep friendly */
if (!config.pubsub_mode || config.last_cmd_type == REDIS_REPLY_ERROR) {
if (config.push_output) {
redisSetPushCallback(context, cliPushHandler);
}
config.pubsub_mode = 0;
}
}
continue;
}
if (config.slave_mode) {
@ -1874,10 +1972,35 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
return REDIS_ERR; /* Error = slaveMode lost connection to master */
}
if (cliReadReply(output_raw) != REDIS_OK) {
zfree(argvlen);
return REDIS_ERR;
} else {
/* Read response, possibly skipping pubsub/push messages. */
while (1) {
if (cliReadReply(output_raw) != REDIS_OK) {
zfree(argvlen);
return REDIS_ERR;
}
fflush(stdout);
if (config.pubsub_mode || num_expected_pubsub_push > 0) {
if (isPubsubPush(config.last_reply)) {
if (num_expected_pubsub_push > 0 &&
!strcasecmp(config.last_reply->element[0]->str, command))
{
/* This pushed message confirms the
* [p|s][un]subscribe command. */
if (is_subscribe && !config.pubsub_mode) {
config.pubsub_mode = 1;
cliRefreshPrompt();
}
if (--num_expected_pubsub_push > 0) {
continue; /* We need more of these. */
}
} else {
continue; /* Skip this pubsub message. */
}
} else if (config.last_reply->type == REDIS_REPLY_PUSH) {
continue; /* Skip other push message. */
}
}
/* Store database number when SELECT was successfully executed. */
if (!strcasecmp(command,"select") && argc == 2 &&
config.last_cmd_type != REDIS_REPLY_ERROR)
@ -1911,9 +2034,25 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
config.in_multi = 0;
config.dbnum = 0;
config.conn_info.input_dbnum = 0;
config.resp3 = 0;
config.current_resp3 = 0;
if (config.pubsub_mode && config.push_output) {
redisSetPushCallback(context, cliPushHandler);
}
config.pubsub_mode = 0;
cliRefreshPrompt();
} else if (!strcasecmp(command,"hello")) {
if (config.last_cmd_type == REDIS_REPLY_MAP) {
config.current_resp3 = 1;
} else if (config.last_cmd_type == REDIS_REPLY_ARRAY) {
config.current_resp3 = 0;
}
} else if ((is_subscribe || is_unsubscribe) && !config.pubsub_mode) {
/* We didn't enter pubsub mode. Restore push callback. */
if (config.push_output)
redisSetPushCallback(context, cliPushHandler);
}
break;
}
if (config.cluster_reissue_command){
/* If we need to reissue the command, break to prevent a
@ -2664,8 +2803,17 @@ static void repl(void) {
}
cliRefreshPrompt();
while((line = linenoise(context ? config.prompt : "not connected> ")) != NULL) {
if (line[0] != '\0') {
while(1) {
line = linenoise(context ? config.prompt : "not connected> ");
if (line == NULL) {
/* ^C, ^D or similar. */
if (config.pubsub_mode) {
config.pubsub_mode = 0;
if (cliConnect(CC_FORCE) == REDIS_OK)
continue;
}
break;
} else if (line[0] != '\0') {
long repeat = 1;
int skipargs = 0;
char *endptr = NULL;
@ -2759,6 +2907,11 @@ static void repl(void) {
/* Free the argument vector */
sdsfreesplitres(argv,argc);
}
if (config.pubsub_mode) {
cliWaitForMessagesOrStdin();
}
/* linenoise() returns malloc-ed lines like readline() */
linenoiseFree(line);
}
@ -2799,6 +2952,13 @@ static int noninteractive(int argc, char **argv) {
retval = issueCommand(argc, sds_args);
sdsfreesplitres(sds_args, argc);
while (config.pubsub_mode) {
if (cliReadReply(0) != REDIS_OK) {
cliPrintContextError();
exit(1);
}
fflush(stdout);
}
return retval == REDIS_OK ? 0 : 1;
}
@ -9011,6 +9171,7 @@ int main(int argc, char **argv) {
config.eval_ldb_sync = 0;
config.enable_ldb_on_eval = 0;
config.last_cmd_type = -1;
config.last_reply = NULL;
config.verbose = 0;
config.set_errcode = 0;
config.no_auth_warning = 0;

View File

@ -60,7 +60,7 @@ start_server {tags {"cli"}} {
# Helpers to run tests in interactive mode
proc format_output {output} {
set _ [string trimright [regsub -all "\r" $output ""] "\n"]
set _ [string trimright $output "\n"]
}
proc run_command {fd cmd} {
@ -76,6 +76,12 @@ start_server {tags {"cli"}} {
unset ::env(FAKETTY)
}
proc test_interactive_nontty_cli {name code} {
set fd [open_cli]
test "Interactive non-TTY CLI: $name" $code
close_cli $fd
}
# Helpers to run tests where stdout is not a tty
proc write_tmpfile {contents} {
set tmp [tmpfile "cli"]
@ -142,7 +148,8 @@ start_server {tags {"cli"}} {
test_interactive_cli "INFO response should be printed raw" {
set lines [split [run_command $fd info] "\n"]
foreach line $lines {
if {![regexp {^$|^#|^[^#:]+:} $line]} {
# Info lines end in \r\n, so they now end in \r.
if {![regexp {^\r$|^#|^[^#:]+:} $line]} {
fail "Malformed info line: $line"
}
}
@ -186,6 +193,77 @@ start_server {tags {"cli"}} {
assert_equal "bar" [r get key]
}
test_interactive_cli "Subscribed mode" {
set reading "Reading messages... (press Ctrl-C to quit or any key to type command)\r"
set erase "\033\[K"; # Erases the "Reading messages..." line.
# Subscribe to some channels.
set sub1 "1) \"subscribe\"\n2) \"ch1\"\n3) (integer) 1\n"
set sub2 "1) \"subscribe\"\n2) \"ch2\"\n3) (integer) 2\n"
set sub3 "1) \"subscribe\"\n2) \"ch3\"\n3) (integer) 3\n"
assert_equal $sub1$sub2$sub3$reading \
[run_command $fd "subscribe ch1 ch2 ch3"]
# Receive pubsub message.
r publish ch2 hello
set message "1) \"message\"\n2) \"ch2\"\n3) \"hello\"\n"
assert_equal $erase$message$reading [read_cli $fd]
# Unsubscribe some.
set unsub1 "1) \"unsubscribe\"\n2) \"ch1\"\n3) (integer) 2\n"
set unsub2 "1) \"unsubscribe\"\n2) \"ch2\"\n3) (integer) 1\n"
assert_equal $erase$unsub1$unsub2$reading \
[run_command $fd "unsubscribe ch1 ch2"]
# Command forbidden in subscribed mode (RESP2).
set err "(error) ERR Can't execute 'get': only (P|S)SUBSCRIBE / (P|S)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context\n"
assert_equal $erase$err$reading [run_command $fd "get k"]
# Command allowed in subscribed mode.
set pong "1) \"pong\"\n2) \"\"\n"
assert_equal $erase$pong$reading [run_command $fd "ping"]
# Reset exits subscribed mode.
assert_equal ${erase}RESET [run_command $fd "reset"]
assert_equal PONG [run_command $fd "ping"]
# Check TTY output of push messages in RESP3 has ")" prefix (to be changed to ">" in the future).
assert_match "1#*" [run_command $fd "hello 3"]
set sub1 "1) \"subscribe\"\n2) \"ch1\"\n3) (integer) 1\n"
assert_equal $sub1$reading \
[run_command $fd "subscribe ch1"]
}
test_interactive_nontty_cli "Subscribed mode" {
# Raw output and no "Reading messages..." info message.
# Use RESP3 in this test case.
assert_match {*proto 3*} [run_command $fd "hello 3"]
# Subscribe to some channels.
set sub1 "subscribe\nch1\n1"
set sub2 "subscribe\nch2\n2"
assert_equal $sub1\n$sub2 \
[run_command $fd "subscribe ch1 ch2"]
assert_equal OK [run_command $fd "client tracking on"]
assert_equal OK [run_command $fd "set k 42"]
assert_equal 42 [run_command $fd "get k"]
# Interleaving invalidate and pubsub messages.
r publish ch1 hello
r del k
r publish ch2 world
set message1 "message\nch1\nhello"
set invalidate "invalidate\nk"
set message2 "message\nch2\nworld"
assert_equal $message1\n$invalidate\n$message2\n [read_cli $fd]
# Unsubscribe all.
set unsub1 "unsubscribe\nch1\n1"
set unsub2 "unsubscribe\nch2\n0"
assert_equal $unsub1\n$unsub2 [run_command $fd "unsubscribe ch1 ch2"]
}
test_tty_cli "Status reply" {
assert_equal "OK" [run_cli set key bar]
assert_equal "bar" [r get key]