Merge #7081 from justinmk/rpcstop

rpc: close channel if stream was closed
This commit is contained in:
Justin M. Keyes 2017-08-26 16:41:35 +02:00 committed by GitHub
commit 6e7a8c3fe2
10 changed files with 105 additions and 29 deletions

View File

@ -233,8 +233,7 @@ void process_stop(Process *proc) FUNC_ATTR_NONNULL_ALL
switch (proc->type) {
case kProcessTypeUv:
// Close the process's stdin. If the process doesn't close its own
// stdout/stderr, they will be closed when it exits(possibly due to being
// terminated after a timeout)
// stdout/stderr, they will be closed when it exits (voluntarily or not).
process_close_in(proc);
ILOG("Sending SIGTERM to pid %d", proc->pid);
uv_kill(proc->pid, SIGTERM);

View File

@ -118,7 +118,7 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
// to `alloc_cb` will return the same unused pointer(`rbuffer_produced`
// won't be called)
&& cnt != 0) {
DLOG("Closing Stream (%p): %s (%s)", stream,
DLOG("closing Stream: %p: %s (%s)", stream,
uv_err_name((int)cnt), os_strerror((int)cnt));
// Read error or EOF, either way stop the stream and invoke the callback
// with eof == true

View File

@ -7,6 +7,7 @@
#include <uv.h>
#include "nvim/log.h"
#include "nvim/rbuffer.h"
#include "nvim/macros.h"
#include "nvim/event/stream.h"
@ -81,6 +82,7 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
FUNC_ATTR_NONNULL_ARG(1)
{
assert(!stream->closed);
DLOG("closing Stream: %p", stream);
stream->closed = true;
stream->close_cb = on_stream_close;
stream->close_cb_data = data;

View File

@ -8,6 +8,7 @@
#include <uv.h>
#include "nvim/log.h"
#include "nvim/event/loop.h"
#include "nvim/event/wstream.h"
#include "nvim/vim.h"

View File

@ -250,7 +250,7 @@ static bool v_do_log_to_file(FILE *log_file, int log_level,
static const char *log_levels[] = {
[DEBUG_LOG_LEVEL] = "DEBUG",
[INFO_LOG_LEVEL] = "INFO ",
[WARNING_LOG_LEVEL] = "WARN ",
[WARN_LOG_LEVEL] = "WARN ",
[ERROR_LOG_LEVEL] = "ERROR",
};
assert(log_level >= DEBUG_LOG_LEVEL && log_level <= ERROR_LOG_LEVEL);

View File

@ -6,7 +6,7 @@
#define DEBUG_LOG_LEVEL 0
#define INFO_LOG_LEVEL 1
#define WARNING_LOG_LEVEL 2
#define WARN_LOG_LEVEL 2
#define ERROR_LOG_LEVEL 3
#define DLOG(...)
@ -43,12 +43,12 @@
__VA_ARGS__)
#endif
#if MIN_LOG_LEVEL <= WARNING_LOG_LEVEL
#if MIN_LOG_LEVEL <= WARN_LOG_LEVEL
# undef WLOG
# undef WLOGN
# define WLOG(...) do_log(WARNING_LOG_LEVEL, __func__, __LINE__, true, \
# define WLOG(...) do_log(WARN_LOG_LEVEL, __func__, __LINE__, true, \
__VA_ARGS__)
# define WLOGN(...) do_log(WARNING_LOG_LEVEL, __func__, __LINE__, false, \
# define WLOGN(...) do_log(WARN_LOG_LEVEL, __func__, __LINE__, false, \
__VA_ARGS__)
#endif

View File

@ -62,7 +62,7 @@ typedef struct {
ChannelType type;
msgpack_unpacker *unpacker;
union {
Stream stream;
Stream stream; // bidirectional (socket)
Process *proc;
struct {
Stream in;
@ -133,6 +133,9 @@ uint64_t channel_from_process(Process *proc, uint64_t id, char *source)
rstream_init(proc->out, 0);
rstream_start(proc->out, receive_msgpack, channel);
DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, proc->in,
proc->out);
return channel->id;
}
@ -150,6 +153,9 @@ void channel_from_connection(SocketWatcher *watcher)
wstream_init(&channel->data.stream, 0);
rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
rstream_start(&channel->data.stream, receive_msgpack, channel);
DLOG("ch %" PRIu64 " in/out-stream=%p", channel->id,
&channel->data.stream);
}
/// @param source description of source function, rplugin name, TCP addr, etc
@ -344,6 +350,9 @@ void channel_from_stdio(void)
rstream_start(&channel->data.std.in, receive_msgpack, channel);
// write stream
wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0);
DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id,
&channel->data.std.in, &channel->data.std.out);
}
/// Creates a loopback channel. This is used to avoid deadlock
@ -363,6 +372,7 @@ void channel_process_exit(uint64_t id, int status)
decref(channel);
}
// rstream.c:read_event() invokes this as stream->read_cb().
static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
void *data, bool eof)
{
@ -374,12 +384,24 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
char buf[256];
snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client",
channel->id);
call_set_error(channel, buf, WARNING_LOG_LEVEL);
call_set_error(channel, buf, WARN_LOG_LEVEL);
goto end;
}
if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed)
|| (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) {
char buf[256];
snprintf(buf, sizeof(buf),
"ch %" PRIu64 ": stream closed unexpectedly. "
"closing channel",
channel->id);
call_set_error(channel, buf, WARN_LOG_LEVEL);
goto end;
}
size_t count = rbuffer_size(rbuf);
DLOG("parsing %u bytes of msgpack data from Stream(%p)", count, stream);
DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p",
channel->id, count, stream);
// Feed the unpacker with data
msgpack_unpacker_reserve_buffer(channel->unpacker, count);
@ -435,8 +457,8 @@ static void parse_msgpack(Channel *channel)
// causes for this error(search for 'goto _failed')
//
// A not so uncommon cause for this might be deserializing objects with
// a high nesting level: msgpack will break when it's internal parse stack
// size exceeds MSGPACK_EMBED_STACK_SIZE(defined as 32 by default)
// a high nesting level: msgpack will break when its internal parse stack
// size exceeds MSGPACK_EMBED_STACK_SIZE (defined as 32 by default)
send_error(channel, 0, "Invalid msgpack payload. "
"This error can also happen when deserializing "
"an object with high level of nesting");
@ -534,6 +556,39 @@ static void on_request_event(void **argv)
api_clear_error(&error);
}
/// Returns the Stream that a Channel writes to.
static Stream *chan_wstream(Channel *chan)
{
switch (chan->type) {
case kChannelTypeSocket:
return &chan->data.stream;
case kChannelTypeProc:
return chan->data.proc->in;
case kChannelTypeStdio:
return &chan->data.std.out;
case kChannelTypeInternal:
return NULL;
}
abort();
}
/// Returns the Stream that a Channel reads from.
static Stream *chan_rstream(Channel *chan)
{
switch (chan->type) {
case kChannelTypeSocket:
return &chan->data.stream;
case kChannelTypeProc:
return chan->data.proc->out;
case kChannelTypeStdio:
return &chan->data.std.in;
case kChannelTypeInternal:
return NULL;
}
abort();
}
static bool channel_write(Channel *channel, WBuffer *buffer)
{
bool success = false;
@ -545,13 +600,9 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
switch (channel->type) {
case kChannelTypeSocket:
success = wstream_write(&channel->data.stream, buffer);
break;
case kChannelTypeProc:
success = wstream_write(channel->data.proc->in, buffer);
break;
case kChannelTypeStdio:
success = wstream_write(&channel->data.std.out, buffer);
success = wstream_write(chan_wstream(channel), buffer);
break;
case kChannelTypeInternal:
incref(channel);
@ -565,8 +616,8 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
char buf[256];
snprintf(buf,
sizeof(buf),
"Before returning from a RPC call, ch %" PRIu64 " was "
"closed due to a failed write",
"ch %" PRIu64 ": stream write failed. "
"RPC canceled; closing channel",
channel->id);
call_set_error(channel, buf, ERROR_LOG_LEVEL);
}
@ -817,6 +868,7 @@ static void call_set_error(Channel *channel, char *msg, int loglevel)
ChannelCallFrame *frame = kv_A(channel->call_stack, i);
frame->returned = true;
frame->errored = true;
api_free_object(frame->result);
frame->result = STRING_OBJ(cstr_to_string(msg));
}

View File

@ -88,7 +88,12 @@ bool msgpack_rpc_to_object(const msgpack_object *const obj, Object *const arg)
{
bool ret = true;
kvec_t(MPToAPIObjectStackItem) stack = KV_INITIAL_VALUE;
kv_push(stack, ((MPToAPIObjectStackItem) { obj, arg, false, 0 }));
kv_push(stack, ((MPToAPIObjectStackItem) {
.mobj = obj,
.aobj = arg,
.container = false,
.idx = 0,
}));
while (ret && kv_size(stack)) {
MPToAPIObjectStackItem cur = kv_last(stack);
if (!cur.container) {
@ -361,7 +366,7 @@ typedef struct {
size_t idx;
} APIToMPObjectStackItem;
/// Convert type used by Neovim API to msgpack
/// Convert type used by Nvim API to msgpack type.
///
/// @param[in] result Object to convert.
/// @param[out] res Structure that defines where conversion results are saved.

View File

@ -13,6 +13,7 @@
#include <stdbool.h>
#include <stdlib.h>
#include "nvim/log.h"
#include "nvim/vim.h"
#include "nvim/ascii.h"
#include "nvim/normal.h"

View File

@ -20,6 +20,22 @@ describe('server -> client', function()
cid = nvim('get_api_info')[1]
end)
it('handles unexpected closed stream while preparing RPC response', function()
source([[
let g:_nvim_args = [v:progpath, '--embed', '-n', '-u', 'NONE', '-i', 'NONE', ]
let ch1 = jobstart(g:_nvim_args, {'rpc': v:true})
let child1_ch = rpcrequest(ch1, "nvim_get_api_info")[0]
call rpcnotify(ch1, 'nvim_eval', 'rpcrequest('.child1_ch.', "nvim_get_api_info")')
let ch2 = jobstart(g:_nvim_args, {'rpc': v:true})
let child2_ch = rpcrequest(ch2, "nvim_get_api_info")[0]
call rpcnotify(ch2, 'nvim_eval', 'rpcrequest('.child2_ch.', "nvim_get_api_info")')
call jobstop(ch1)
]])
eq(2, eval("1+1")) -- Still alive?
end)
describe('simple call', function()
it('works', function()
local function on_setup()
@ -141,7 +157,7 @@ describe('server -> client', function()
end)
end)
describe('when the client is a recursive vim instance', function()
describe('recursive (child) nvim client', function()
if os.getenv("TRAVIS") and helpers.os_name() == "osx" then
-- XXX: Hangs Travis macOS since e9061117a5b8f195c3f26a5cb94e18ddd7752d86.
pending("[Hangs on Travis macOS. #5002]", function() end)
@ -155,7 +171,7 @@ describe('server -> client', function()
after_each(function() command('call rpcstop(vim)') end)
it('can send/recieve notifications and make requests', function()
it('can send/receive notifications and make requests', function()
nvim('command', "call rpcnotify(vim, 'vim_set_current_line', 'SOME TEXT')")
-- Wait for the notification to complete.
@ -188,7 +204,7 @@ describe('server -> client', function()
end)
end)
describe('when using jobstart', function()
describe('jobstart()', function()
local jobid
before_each(function()
local channel = nvim('get_api_info')[1]
@ -227,7 +243,7 @@ describe('server -> client', function()
end)
end)
describe('when connecting to another nvim instance', function()
describe('connecting to another (peer) nvim', function()
local function connect_test(server, mode, address)
local serverpid = funcs.getpid()
local client = spawn(nvim_argv)
@ -256,7 +272,7 @@ describe('server -> client', function()
client:close()
end
it('over a named pipe', function()
it('via named pipe', function()
local server = spawn(nvim_argv)
set_session(server)
local address = funcs.serverlist()[1]
@ -265,7 +281,7 @@ describe('server -> client', function()
connect_test(server, 'pipe', address)
end)
it('to an ip adress', function()
it('via ip address', function()
local server = spawn(nvim_argv)
set_session(server)
local address = funcs.serverstart("127.0.0.1:")
@ -273,7 +289,7 @@ describe('server -> client', function()
connect_test(server, 'tcp', address)
end)
it('to a hostname', function()
it('via hostname', function()
local server = spawn(nvim_argv)
set_session(server)
local address = funcs.serverstart("localhost:")