rpc: Don't delay notifications when request is pending (#6544)

With the old behavior, if a GUI made a blocking request that required user
interaction (like nvim_input()), it would not get any screen updates.

The client, not nvim, should decide how to handle notifications during a
pending request. If an rplugin wants to avoid async calls while a sync call is
busy, it likely also wants to avoid processing async calls while another async
call is being handled.

This may break the expectations of some existing rplugins. For compatibility,
remote/define.vim reimplements the old behavior. Clients can opt out by
specifying `sync=urgent`.
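
As a minimal sketch (the channel id, method names, and function names are
hypothetical; `remote#define#FunctionOnChannel()` is the helper changed
below), a method declared with `sync=urgent` is dispatched via rpcnotify()
immediately, while a plain async method goes through the compatibility queue:

    " Hypothetical: calling MyPluginRedraw() sends the 'Redraw' notification
    " with rpcnotify() right away, even while a rpcrequest() on channel 42
    " is still waiting for its response.
    call remote#define#FunctionOnChannel(42, 'Redraw', 'urgent', 'MyPluginRedraw', {})

    " Hypothetical: MyPluginLog() goes through remote#define#notify(), which
    " queues the notification until channel 42's request stack fully unwinds.
    call remote#define#FunctionOnChannel(42, 'Log', 0, 'MyPluginLog', {})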

- Legacy hosts should be updated to use `sync=urgent`. They could add a flag
  indicating which async methods are always safe to call and which must wait
  until the main loop returns (see the sketch after this list).
- New hosts can expose the full asyncness; they don't need to offer both
  behaviors.
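
A possible shape for that flag (a hypothetical sketch; `allow_nested` is not
an existing spec option, it only illustrates the first bullet above):

    " Hypothetical host-side helper: specs marked 'allow_nested' are
    " registered with sync='urgent' and bypass the compatibility queue;
    " everything else keeps the legacy delayed-notification behavior.
    function! s:SyncValueFor(spec) abort
      if get(a:spec, 'allow_nested', 0)
        return 'urgent'
      endif
      return get(a:spec, 'sync', 0)
    endfunction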

ref #6532
ref #1398 d83868fe90
Björn Linse 2017-10-29 03:06:53 +01:00 committed by Justin M. Keyes
parent 3a938fff09
commit 2a3bcd1ff8
4 changed files with 74 additions and 56 deletions


@@ -169,14 +169,40 @@ function! remote#define#FunctionOnChannel(channel, method, sync, name, opts)
exe function_def
endfunction
let s:busy = {}
let s:pending_notifications = {}
function! s:GetRpcFunction(sync)
if a:sync
return 'rpcrequest'
if a:sync ==# 'urgent'
return 'rpcnotify'
elseif a:sync
return 'remote#define#request'
endif
return 'rpcnotify'
return 'remote#define#notify'
endfunction
function! remote#define#notify(chan, ...)
if get(s:busy, a:chan, 0) > 0
let pending = get(s:pending_notifications, a:chan, [])
call add(pending, deepcopy(a:000))
let s:pending_notifications[a:chan] = pending
else
call call('rpcnotify', [a:chan] + a:000)
endif
endfunction
function! remote#define#request(chan, ...)
let s:busy[a:chan] = get(s:busy, a:chan, 0)+1
let val = call('rpcrequest', [a:chan]+a:000)
let s:busy[a:chan] -= 1
if s:busy[a:chan] == 0
for msg in get(s:pending_notifications, a:chan, [])
call call('rpcnotify', [a:chan] + msg)
endfor
let s:pending_notifications[a:chan] = []
endif
return val
endfunction
function! s:GetCommandPrefix(name, opts)
return 'command!'.s:StringifyOpts(a:opts, ['nargs', 'complete', 'range',


@@ -2531,6 +2531,7 @@ def CheckSpacing(filename, clean_lines, linenum, nesting_state, error):
r'(?<!\bkhash_t)'
r'(?<!\bkbtree_t)'
r'(?<!\bkbitr_t)'
r'(?<!\bPMap)'
r'\((?:const )?(?:struct )?[a-zA-Z_]\w*(?: *\*(?:const)?)*\)'
r' +'
r'-?(?:\*+|&)?(?:\w+|\+\+|--|\()', cast_line)


@@ -56,7 +56,6 @@ typedef struct {
typedef struct {
uint64_t id;
size_t refcount;
size_t pending_requests;
PMap(cstr_t) *subscribed_events;
bool closed;
ChannelType type;
@@ -71,7 +70,6 @@ typedef struct {
} data;
uint64_t next_request_id;
kvec_t(ChannelCallFrame *) call_stack;
kvec_t(WBuffer *) delayed_notifications;
MultiQueue *events;
} Channel;
@@ -205,14 +203,7 @@ bool channel_send_event(uint64_t id, const char *name, Array args)
}
if (channel) {
if (channel->pending_requests) {
// Pending request, queue the notification for later sending.
const String method = cstr_as_string((char *)name);
WBuffer *buffer = serialize_request(id, 0, method, args, &out_buffer, 1);
kv_push(channel->delayed_notifications, buffer);
} else {
send_event(channel, name, args);
}
send_event(channel, name, args);
} else {
broadcast_event(name, args);
}
@@ -248,10 +239,8 @@ Object channel_send_call(uint64_t id,
// Push the frame
ChannelCallFrame frame = { request_id, false, false, NIL };
kv_push(channel->call_stack, &frame);
channel->pending_requests++;
LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned);
(void)kv_pop(channel->call_stack);
channel->pending_requests--;
if (frame.errored) {
if (frame.result.type == kObjectTypeString) {
@@ -276,10 +265,6 @@ Object channel_send_call(uint64_t id,
api_free_object(frame.result);
}
if (!channel->pending_requests) {
send_delayed_notifications(channel);
}
decref(channel);
return frame.errored ? NIL : frame.result;
@@ -704,11 +689,7 @@ static void broadcast_event(const char *name, Array args)
for (size_t i = 0; i < kv_size(subscribed); i++) {
Channel *channel = kv_A(subscribed, i);
if (channel->pending_requests) {
kv_push(channel->delayed_notifications, buffer);
} else {
channel_write(channel, buffer);
}
channel_write(channel, buffer);
}
end:
@@ -786,7 +767,6 @@ static void free_channel(Channel *channel)
pmap_free(cstr_t)(channel->subscribed_events);
kv_destroy(channel->call_stack);
kv_destroy(channel->delayed_notifications);
if (channel->type != kChannelTypeProc) {
multiqueue_free(channel->events);
}
@@ -811,11 +791,9 @@ static Channel *register_channel(ChannelType type, uint64_t id,
rv->closed = false;
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
rv->id = id > 0 ? id : next_chan_id++;
rv->pending_requests = 0;
rv->subscribed_events = pmap_new(cstr_t)();
rv->next_request_id = 1;
kv_init(rv->call_stack);
kv_init(rv->delayed_notifications);
pmap_put(uint64_t)(channels, rv->id, rv);
ILOG("new channel %" PRIu64 " (%s): %s", rv->id,
@@ -912,16 +890,6 @@ static WBuffer *serialize_response(uint64_t channel_id,
return rv;
}
static void send_delayed_notifications(Channel* channel)
{
for (size_t i = 0; i < kv_size(channel->delayed_notifications); i++) {
WBuffer *buffer = kv_A(channel->delayed_notifications, i);
channel_write(channel, buffer);
}
kv_size(channel->delayed_notifications) = 0;
}
static void incref(Channel *channel)
{
channel->refcount++;


@@ -109,7 +109,28 @@ describe('server -> client', function()
end)
describe('requests and notifications interleaved', function()
-- This tests that the following scenario won't happen:
it('does not delay notifications during pending request', function()
local received = false
local function on_setup()
eq("retval", funcs.rpcrequest(cid, "doit"))
stop()
end
local function on_request(method)
if method == "doit" then
funcs.rpcnotify(cid, "headsup")
eq(true,received)
return "retval"
end
end
local function on_notification(method)
if method == "headsup" then
received = true
end
end
run(on_request, on_notification, on_setup)
end)
-- This tests the following scenario:
--
-- server->client [request ] (1)
-- client->server [request ] (2) triggered by (1)
@@ -124,36 +145,38 @@ describe('server -> client', function()
-- only deals with one server->client request at a time. (In other words,
-- the client cannot send a response to a request that is not at the top
-- of nvim's request stack).
--
-- But above scenario shoudn't happen by the way notifications are dealt in
-- Nvim: they are only sent after there are no pending server->client
-- request(the request stack fully unwinds). So (3) is only sent after the
-- client returns (6).
it('works', function()
local expected = 300
local notified = 0
pending('will close connection if not properly synchronized', function()
local function on_setup()
eq('notified!', eval('rpcrequest('..cid..', "notify")'))
end
local function on_request(method)
eq('notify', method)
eq(1, eval('rpcnotify('..cid..', "notification")'))
return 'notified!'
if method == "notify" then
eq(1, eval('rpcnotify('..cid..', "notification")'))
return 'notified!'
elseif method == "nested" then
-- do some busywork, so the first request will return
-- before this one
for _ = 1, 5 do
eq(2, eval("1+1"))
end
eq(1, eval('rpcnotify('..cid..', "nested_done")'))
return 'done!'
end
end
local function on_notification(method)
eq('notification', method)
if notified == expected then
stop()
return
if method == "notification" then
eq('done!', eval('rpcrequest('..cid..', "nested")'))
elseif method == "nested_done" then
-- this should never have been sent
ok(false)
end
notified = notified + 1
eq('notified!', eval('rpcrequest('..cid..', "notify")'))
end
run(on_request, on_notification, on_setup)
eq(expected, notified)
-- ignore disconnect failure, otherwise detected by after_each
clear()
end)
end)