Merge pull request #4723 from bfredl/rpcstderr

allow stderr handler for rpc jobs and use it to display python/ruby startup error
This commit is contained in:
Björn Linse 2016-08-20 12:58:37 +02:00 committed by GitHub
commit 71b3e20d0f
21 changed files with 323 additions and 196 deletions

View File

@ -5,11 +5,24 @@ endif
let s:loaded_pythonx_provider = 1
let s:stderr = {}
let s:job_opts = {'rpc': v:true}
" TODO(bfredl): this logic is common and should be builtin
function! s:job_opts.on_stderr(chan_id, data, event)
let stderr = get(s:stderr, a:chan_id, [''])
let last = remove(stderr, -1)
let a:data[0] = last.a:data[0]
call extend(stderr, a:data)
let s:stderr[a:chan_id] = stderr
endfunction
function! provider#pythonx#Require(host) abort
let ver = (a:host.orig_name ==# 'python') ? 2 : 3
" Python host arguments
let args = ['-c', 'import sys; sys.path.remove(""); import neovim; neovim.start_host()']
let prog = (ver == '2' ? provider#python#Prog() : provider#python3#Prog())
let args = [prog, '-c', 'import sys; sys.path.remove(""); import neovim; neovim.start_host()']
" Collect registered Python plugins into args
let python_plugins = remote#host#PluginsForHost(a:host.name)
@ -18,14 +31,16 @@ function! provider#pythonx#Require(host) abort
endfor
try
let channel_id = rpcstart((ver ==# '2' ?
\ provider#python#Prog() : provider#python3#Prog()), args)
let channel_id = jobstart(args, s:job_opts)
if rpcrequest(channel_id, 'poll') ==# 'ok'
return channel_id
endif
catch
echomsg v:throwpoint
echomsg v:exception
for row in get(s:stderr, channel_id, [])
echomsg row
endfor
endtry
throw remote#host#LoadErrorForHost(a:host.orig_name,
\ '$NVIM_PYTHON_LOG_FILE')

View File

@ -4,6 +4,17 @@ if exists('g:loaded_ruby_provider')
endif
let g:loaded_ruby_provider = 1
let s:stderr = {}
let s:job_opts = {'rpc': v:true}
function! s:job_opts.on_stderr(chan_id, data, event)
let stderr = get(s:stderr, a:chan_id, [''])
let last = remove(stderr, -1)
let a:data[0] = last.a:data[0]
call extend(stderr, a:data)
let s:stderr[a:chan_id] = stderr
endfunction
function! provider#ruby#Detect() abort
return exepath('neovim-ruby-host')
endfunction
@ -13,7 +24,7 @@ function! provider#ruby#Prog()
endfunction
function! provider#ruby#Require(host) abort
let args = []
let args = [provider#ruby#Prog()]
let ruby_plugins = remote#host#PluginsForHost(a:host.name)
for plugin in ruby_plugins
@ -21,13 +32,16 @@ function! provider#ruby#Require(host) abort
endfor
try
let channel_id = rpcstart(provider#ruby#Prog(), args)
let channel_id = jobstart(args, s:job_opts)
if rpcrequest(channel_id, 'poll') ==# 'ok'
return channel_id
endif
catch
echomsg v:throwpoint
echomsg v:exception
for row in get(s:stderr, channel_id, [])
echomsg row
endfor
endtry
throw remote#host#LoadErrorForHost(a:host.orig_name, '$NVIM_RUBY_LOG_FILE')
endfunction

View File

@ -261,9 +261,8 @@ function! remote#host#LoadErrorForHost(host, log) abort
\ 'You can try to see what happened '.
\ 'by starting Neovim with the environment variable '.
\ a:log . ' set to a file and opening the generated '.
\ 'log file. Also, the host stderr will be available '.
\ 'in Neovim log, so it may contain useful information. '.
\ 'See also ~/.nvimlog.'
\ 'log file. Also, the host stderr is available '.
\ 'in messages.'
endfunction

View File

@ -2043,7 +2043,6 @@ rpcnotify({channel}, {event}[, {args}...])
Sends an |RPC| notification to {channel}
rpcrequest({channel}, {method}[, {args}...])
Sends an |RPC| request to {channel}
rpcstart({prog}[, {argv}]) Spawns {prog} and opens an |RPC| channel
rpcstop({channel}) Closes an |RPC| {channel}
screenattr({row}, {col}) Number attribute at screen position
screenchar({row}, {col}) Number character at screen position
@ -4395,8 +4394,10 @@ items({dict}) *items()*
order.
jobclose({job}[, {stream}]) {Nvim} *jobclose()*
Close {job}'s {stream}, which can be one "stdin", "stdout" or
"stderr". If {stream} is omitted, all streams are closed.
Close {job}'s {stream}, which can be one of "stdin", "stdout",
"stderr" or "rpc" (closes the rpc channel for a job started
with the "rpc" option.) If {stream} is omitted, all streams
are closed.
jobpid({job}) {Nvim} *jobpid()*
Return the pid (process id) of {job}.
@ -4418,6 +4419,10 @@ jobsend({job}, {data}) {Nvim} *jobsend()*
:call jobsend(j, ["abc", "123\n456", ""])
< will send "abc<NL>123<NUL>456<NL>".
If the job was started with the rpc option this function
cannot be used, instead use |rpcnotify()| and |rpcrequest()|
to communicate with the job.
jobstart({cmd}[, {opts}]) {Nvim} *jobstart()*
Spawns {cmd} as a job. If {cmd} is a |List| it is run
directly. If {cmd} is a |String| it is processed like this: >
@ -4433,9 +4438,14 @@ jobstart({cmd}[, {opts}]) {Nvim} *jobstart()*
on_exit : exit event handler (function name or |Funcref|)
cwd : Working directory of the job; defaults to
|current-directory|.
rpc : If set, |msgpack-rpc| will be used to communicate
with the job over stdin and stdout. "on_stdout" is
then ignored, but "on_stderr" can still be used.
pty : If set, the job will be connected to a new pseudo
terminal, and the job streams are connected to
the master file descriptor.
terminal, and the job streams are connected to
the master file descriptor. "on_stderr" is ignored
as all output will be received on stdout.
width : (pty only) Width of the terminal screen
height : (pty only) Height of the terminal screen
TERM : (pty only) $TERM environment variable
@ -4447,10 +4457,12 @@ jobstart({cmd}[, {opts}]) {Nvim} *jobstart()*
{opts} is passed as |self| to the callback; the caller may
pass arbitrary data by setting other keys.
Returns:
- job ID on success, used by |jobsend()| and |jobstop()|
- The job ID on success, which is used by |jobsend()| (or
|rpcnotify()| and |rpcrequest()| if "rpc" option was used)
and |jobstop()|
- 0 on invalid arguments or if the job table is full
- -1 if {cmd}[0] is not executable.
See |job-control| for more information.
See |job-control| and |msgpack-rpc| for more information.
jobstop({job}) {Nvim} *jobstop()*
Stop a job created with |jobstart()| by sending a `SIGTERM`
@ -5649,19 +5661,20 @@ rpcrequest({channel}, {method}[, {args}...]) {Nvim} *rpcrequest()*
:let result = rpcrequest(rpc_chan, "func", 1, 2, 3)
rpcstart({prog}[, {argv}]) {Nvim} *rpcstart()*
Spawns {prog} as a job (optionally passing the list {argv}),
and opens an |RPC| channel with the spawned process's
stdin/stdout. Returns:
- channel id on success, which is used by |rpcrequest()|,
|rpcnotify()| and |rpcstop()|
- 0 on failure
Example: >
:let rpc_chan = rpcstart('prog', ['arg1', 'arg2'])
Deprecated. Replace >
:let id = rpcstart('prog', ['arg1', 'arg2'])
< with >
:let id = jobstart(['prog', 'arg1', 'arg2'],
{'rpc': v:true})
rpcstop({channel}) {Nvim} *rpcstop()*
Closes an |RPC| {channel}, possibly created via
|rpcstart()|. Also closes channels created by connections to
|v:servername|.
Closes an |RPC| {channel}. If the channel is a job
started with |jobstart()| the job is killed.
It is better to use |jobstop()| in this case, or use
|jobclose|(id, "rpc") to only close the channel without
killing the job.
Closes the socket connection if the channel was opened by
connecting to |v:servername|.
screenattr(row, col) *screenattr()*
Like screenchar(), but return the attribute. This is a rather

View File

@ -11,7 +11,6 @@ RPC API for Nvim *RPC* *rpc* *msgpack-rpc*
3. Connecting |rpc-connecting|
4. Clients |rpc-api-client|
5. Types |rpc-types|
6. Vimscript functions |rpc-vim-functions|
==============================================================================
1. Introduction *rpc-intro*
@ -66,12 +65,16 @@ To get a formatted dump of the API using python (requires the `pyyaml` and
==============================================================================
3. Connecting *rpc-connecting*
There are several ways to open a msgpack-rpc stream to an Nvim server:
There are several ways to open a msgpack-rpc channel to an Nvim instance:
1. Through stdin/stdout when `nvim` is started with `--embed`. This is how
applications can embed Nvim.
2. Through stdin/stdout of some other process spawned by |rpcstart()|.
2. Through stdin/stdout of some other process spawned by |jobstart()|.
Set the "rpc" key to |v:true| in the options dict to use the job's stdin
and stdout as a single msgpack channel that is processed directly by
Nvim. Then it is not possible to process raw data to or from the
process's stdin and stdout. stderr can still be used, though.
3. Through the socket automatically created with each instance. The socket
location is stored in |v:servername|.
@ -110,11 +113,12 @@ functions can be called interactively:
>>> nvim = attach('socket', path='[address]')
>>> nvim.command('echo "hello world!"')
<
You can also embed an Nvim instance via |rpcstart()|
You can also embed an Nvim instance via |jobstart()|, and communicate using
|rpcrequest()| and |rpcnotify()|:
>
let vim = rpcstart('nvim', ['--embed'])
let vim = jobstart(['nvim', '--embed'], {'rpc': v:true})
echo rpcrequest(vim, 'vim_eval', '"Hello " . "world!"')
call rpcstop(vim)
call jobstop(vim)
<
==============================================================================
4. Implementing API clients *rpc-api-client* *api-client*
@ -233,23 +237,5 @@ Even for statically compiled clients it is good practice to avoid hardcoding
the type codes, because a client may be built against one Nvim version but
connect to another with different type codes.
==============================================================================
6. Vimscript functions *rpc-vim-functions*
RPC functions are available in Vimscript:
1. |rpcstart()|: Similarly to |jobstart()|, this will spawn a co-process
with its standard handles connected to Nvim. The difference is that it's
not possible to process raw data to or from the process's stdin, stdout,
or stderr. This is because the job's stdin and stdout are used as
a single msgpack channel that is processed directly by Nvim.
2. |rpcstop()|: Same as |jobstop()|, but operates on handles returned by
|rpcstart()|.
3. |rpcrequest()|: Sends a msgpack-rpc request to the process.
4. |rpcnotify()|: Sends a msgpack-rpc notification to the process.
|rpcrequest()| and |rpcnotify()| can also be used with channels connected to
a nvim server. |v:servername|
==============================================================================
vim:tw=78:ts=8:noet:ft=help:norl:

View File

@ -408,6 +408,7 @@ typedef struct {
Terminal *term;
bool stopped;
bool exited;
bool rpc;
int refcount;
ufunc_T *on_stdout, *on_stderr, *on_exit;
dict_T *self;
@ -448,8 +449,7 @@ typedef struct {
#define FNE_INCL_BR 1 /* find_name_end(): include [] in name */
#define FNE_CHECK_START 2 /* find_name_end(): check name starts with
valid character */
static uint64_t current_job_id = 1;
static PMap(uint64_t) *jobs = NULL;
static PMap(uint64_t) *jobs = NULL;
static uint64_t last_timer_id = 0;
static PMap(uint64_t) *timers = NULL;
@ -11724,16 +11724,35 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv)
if (argvars[1].v_type == VAR_STRING) {
char *stream = (char *)argvars[1].vval.v_string;
if (!strcmp(stream, "stdin")) {
process_close_in(proc);
if (data->rpc) {
EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')"));
} else {
process_close_in(proc);
}
} else if (!strcmp(stream, "stdout")) {
process_close_out(proc);
if (data->rpc) {
EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')"));
} else {
process_close_out(proc);
}
} else if (!strcmp(stream, "stderr")) {
process_close_err(proc);
} else if (!strcmp(stream, "rpc")) {
if (data->rpc) {
channel_close(data->id);
} else {
EMSG(_("Invalid job stream: Not an rpc job"));
}
} else {
EMSG2(_("Invalid job stream \"%s\""), stream);
}
} else {
process_close_streams(proc);
if (data->rpc) {
channel_close(data->id);
process_close_err(proc);
} else {
process_close_streams(proc);
}
}
}
@ -11790,6 +11809,11 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv)
return;
}
if (data->rpc) {
EMSG(_("Can't send raw data to rpc channel"));
return;
}
ssize_t input_len;
char *input = (char *) save_tv_as_string(&argvars[1], &input_len, false);
if (!input) {
@ -11911,12 +11935,23 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv)
return;
}
dict_T *job_opts = NULL;
bool detach = false, rpc = false, pty = false;
ufunc_T *on_stdout = NULL, *on_stderr = NULL, *on_exit = NULL;
char *cwd = NULL;
if (argvars[1].v_type == VAR_DICT) {
job_opts = argvars[1].vval.v_dict;
detach = get_dict_number(job_opts, (uint8_t *)"detach") != 0;
rpc = get_dict_number(job_opts, (uint8_t *)"rpc") != 0;
pty = get_dict_number(job_opts, (uint8_t *)"pty") != 0;
if (pty && rpc) {
EMSG2(_(e_invarg2), "job cannot have both 'pty' and 'rpc' options set");
shell_free_argv(argv);
return;
}
char *new_cwd = (char *)get_dict_string(job_opts, (char_u *)"cwd", false);
if (new_cwd && strlen(new_cwd) > 0) {
cwd = new_cwd;
@ -11934,10 +11969,8 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv)
}
}
bool pty = job_opts && get_dict_number(job_opts, (uint8_t *)"pty") != 0;
bool detach = job_opts && get_dict_number(job_opts, (uint8_t *)"detach") != 0;
TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit,
job_opts, pty, detach, cwd);
job_opts, pty, rpc, detach, cwd);
Process *proc = (Process *)&data->proc;
if (pty) {
@ -11955,7 +11988,7 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv)
}
}
if (!on_stdout) {
if (!rpc && !on_stdout) {
proc->out = NULL;
}
if (!on_stderr) {
@ -14105,7 +14138,7 @@ end:
api_free_object(result);
}
// "rpcstart()" function
// "rpcstart()" function (DEPRECATED)
static void f_rpcstart(typval_T *argvars, typval_T *rettv)
{
rettv->v_type = VAR_NUMBER;
@ -14158,32 +14191,27 @@ static void f_rpcstart(typval_T *argvars, typval_T *rettv)
// The last item of argv must be NULL
argv[i] = NULL;
uint64_t channel_id = channel_from_process(argv);
if (!channel_id) {
EMSG(_(e_api_spawn_failed));
}
rettv->vval.v_number = (varnumber_T)channel_id;
TerminalJobData *data = common_job_init(argv, NULL, NULL, NULL,
NULL, false, true, false, NULL);
common_job_start(data, rettv);
}
// "rpcstop()" function
static void f_rpcstop(typval_T *argvars, typval_T *rettv)
{
rettv->v_type = VAR_NUMBER;
rettv->vval.v_number = 0;
if (check_restricted() || check_secure()) {
return;
}
if (argvars[0].v_type != VAR_NUMBER) {
// Wrong argument types
EMSG(_(e_invarg));
return;
}
rettv->vval.v_number = channel_close(argvars[0].vval.v_number);
// if called with a job, stop it, else closes the channel
if (pmap_get(uint64_t)(jobs, argvars[0].vval.v_number)) {
f_jobstop(argvars, rettv);
} else {
rettv->vval.v_number = channel_close(argvars[0].vval.v_number);
}
}
/*
@ -16677,7 +16705,7 @@ static void f_termopen(typval_T *argvars, typval_T *rettv)
}
TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit,
job_opts, true, false, cwd);
job_opts, true, false, false, cwd);
data->proc.pty.width = curwin->w_width;
data->proc.pty.height = curwin->w_height;
data->proc.pty.term_name = xstrdup("xterm-256color");
@ -22101,6 +22129,7 @@ static inline TerminalJobData *common_job_init(char **argv,
ufunc_T *on_exit,
dict_T *self,
bool pty,
bool rpc,
bool detach,
char *cwd)
{
@ -22111,6 +22140,7 @@ static inline TerminalJobData *common_job_init(char **argv,
data->on_exit = on_exit;
data->self = self;
data->events = queue_new_child(main_loop.events);
data->rpc = rpc;
if (pty) {
data->proc.pty = pty_process_init(&main_loop, data);
} else {
@ -22130,7 +22160,9 @@ static inline TerminalJobData *common_job_init(char **argv,
return data;
}
/// Return true/false on success/failure.
/// common code for getting job callbacks for jobstart, termopen and rpcstart
///
/// @return true/false on success/failure.
static inline bool common_job_callbacks(dict_T *vopts, ufunc_T **on_stdout,
ufunc_T **on_stderr, ufunc_T **on_exit)
{
@ -22174,15 +22206,22 @@ static inline bool common_job_start(TerminalJobData *data, typval_T *rettv)
}
xfree(cmd);
data->id = current_job_id++;
wstream_init(proc->in, 0);
if (proc->out) {
rstream_init(proc->out, 0);
rstream_start(proc->out, on_job_stdout);
data->id = next_chan_id++;
if (data->rpc) {
// the rpc channel takes over the in and out streams
channel_from_process(proc, data->id);
} else {
wstream_init(proc->in, 0);
if (proc->out) {
rstream_init(proc->out, 0);
rstream_start(proc->out, on_job_stdout, data);
}
}
if (proc->err) {
rstream_init(proc->err, 0);
rstream_start(proc->err, on_job_stderr);
rstream_start(proc->err, on_job_stderr, data);
}
pmap_put(uint64_t)(jobs, data->id, data);
rettv->vval.v_number = data->id;
@ -22302,12 +22341,18 @@ static void on_process_exit(Process *proc, int status, void *d)
snprintf(msg, sizeof msg, "\r\n[Process exited %d]", proc->status);
terminal_close(data->term, msg);
}
if (data->rpc) {
channel_process_exit(data->id, status);
}
if (data->status_ptr) {
*data->status_ptr = status;
}
process_job_event(data, data->on_exit, "exit", NULL, 0, status);
pmap_del(uint64_t)(jobs, data->id);
term_job_data_decref(data);
}
static void term_write(char *buf, size_t size, void *d)
@ -22355,7 +22400,7 @@ static void term_job_data_decref(TerminalJobData *data)
static void on_job_event(JobEvent *ev)
{
if (!ev->callback) {
goto end;
return;
}
typval_T argv[3];
@ -22391,13 +22436,6 @@ static void on_job_event(JobEvent *ev)
call_user_func(ev->callback, argc, argv, &rettv, curwin->w_cursor.lnum,
curwin->w_cursor.lnum, ev->data->self);
clear_tv(&rettv);
end:
if (!ev->received) {
// exit event, safe to free job data now
pmap_del(uint64_t)(jobs, ev->data->id);
term_job_data_decref(ev->data);
}
}
static TerminalJobData *find_job(uint64_t id)

View File

@ -25,7 +25,7 @@
#define CLOSE_PROC_STREAM(proc, stream) \
do { \
if (proc->stream && !proc->stream->closed) { \
stream_close(proc->stream, NULL); \
stream_close(proc->stream, NULL, NULL); \
} \
} while (0)
@ -78,10 +78,8 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
return false;
}
void *data = proc->data;
if (proc->in) {
stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe, data);
stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe);
proc->in->events = proc->events;
proc->in->internal_data = proc;
proc->in->internal_close_cb = on_process_stream_close;
@ -89,7 +87,7 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
}
if (proc->out) {
stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe, data);
stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe);
proc->out->events = proc->events;
proc->out->internal_data = proc;
proc->out->internal_close_cb = on_process_stream_close;
@ -97,7 +95,7 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
}
if (proc->err) {
stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe, data);
stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe);
proc->err->events = proc->events;
proc->err->internal_data = proc;
proc->err->internal_close_cb = on_process_stream_close;
@ -373,7 +371,7 @@ static void flush_stream(Process *proc, Stream *stream)
if (stream->read_cb) {
// Stream callback could miss EOF handling if a child keeps the stream
// open.
stream->read_cb(stream, stream->buffer, 0, stream->data, true);
stream->read_cb(stream, stream->buffer, 0, stream->cb_data, true);
}
break;
}

View File

@ -17,21 +17,19 @@
# include "event/rstream.c.generated.h"
#endif
void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize,
void *data)
void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize)
FUNC_ATTR_NONNULL_ARG(1)
FUNC_ATTR_NONNULL_ARG(2)
{
stream_init(loop, stream, fd, NULL, data);
stream_init(loop, stream, fd, NULL);
rstream_init(stream, bufsize);
}
void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize,
void *data)
void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize)
FUNC_ATTR_NONNULL_ARG(1)
FUNC_ATTR_NONNULL_ARG(2)
{
stream_init(NULL, stream, -1, uvstream, data);
stream_init(NULL, stream, -1, uvstream);
rstream_init(stream, bufsize);
}
@ -48,10 +46,11 @@ void rstream_init(Stream *stream, size_t bufsize)
/// Starts watching for events from a `Stream` instance.
///
/// @param stream The `Stream` instance
void rstream_start(Stream *stream, stream_read_cb cb)
void rstream_start(Stream *stream, stream_read_cb cb, void *data)
FUNC_ATTR_NONNULL_ARG(1)
{
stream->read_cb = cb;
stream->cb_data = data;
if (stream->uvstream) {
uv_read_start(stream->uvstream, alloc_cb, read_cb);
} else {
@ -81,7 +80,7 @@ static void on_rbuffer_nonfull(RBuffer *buf, void *data)
{
Stream *stream = data;
assert(stream->read_cb);
rstream_start(stream, stream->read_cb);
rstream_start(stream, stream->read_cb, stream->cb_data);
}
// Callbacks used by libuv
@ -179,7 +178,7 @@ static void read_event(void **argv)
if (stream->read_cb) {
size_t count = (uintptr_t)argv[1];
bool eof = (uintptr_t)argv[2];
stream->read_cb(stream, stream->buffer, count, stream->data, eof);
stream->read_cb(stream, stream->buffer, count, stream->cb_data, eof);
}
stream->pending_reqs--;
if (stream->closed && !stream->pending_reqs) {

View File

@ -113,7 +113,7 @@ int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb)
return 0;
}
int socket_watcher_accept(SocketWatcher *watcher, Stream *stream, void *data)
int socket_watcher_accept(SocketWatcher *watcher, Stream *stream)
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
{
uv_stream_t *client;
@ -133,7 +133,7 @@ int socket_watcher_accept(SocketWatcher *watcher, Stream *stream, void *data)
return result;
}
stream_init(NULL, stream, -1, client, data);
stream_init(NULL, stream, -1, client);
return 0;
}

View File

@ -30,8 +30,7 @@ int stream_set_blocking(int fd, bool blocking)
return retval;
}
void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream,
void *data)
void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream)
FUNC_ATTR_NONNULL_ARG(2)
{
stream->uvstream = uvstream;
@ -58,7 +57,6 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream,
stream->uvstream->data = stream;
}
stream->data = data;
stream->internal_data = NULL;
stream->fpos = 0;
stream->curmem = 0;
@ -74,12 +72,13 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream,
stream->num_bytes = 0;
}
void stream_close(Stream *stream, stream_close_cb on_stream_close)
void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
FUNC_ATTR_NONNULL_ARG(1)
{
assert(!stream->closed);
stream->closed = true;
stream->close_cb = on_stream_close;
stream->close_cb_data = data;
if (!stream->pending_reqs) {
stream_close_handle(stream);
@ -103,7 +102,7 @@ static void close_cb(uv_handle_t *handle)
rbuffer_free(stream->buffer);
}
if (stream->close_cb) {
stream->close_cb(stream, stream->data);
stream->close_cb(stream, stream->close_cb_data);
}
if (stream->internal_close_cb) {
stream->internal_close_cb(stream, stream->internal_data);

View File

@ -44,13 +44,14 @@ struct stream {
uv_file fd;
stream_read_cb read_cb;
stream_write_cb write_cb;
void *cb_data;
stream_close_cb close_cb, internal_close_cb;
void *close_cb_data, *internal_data;
size_t fpos;
size_t curmem;
size_t maxmem;
size_t pending_reqs;
size_t num_bytes;
void *data, *internal_data;
bool closed;
Queue *events;
};

View File

@ -22,19 +22,17 @@ typedef struct {
# include "event/wstream.c.generated.h"
#endif
void wstream_init_fd(Loop *loop, Stream *stream, int fd, size_t maxmem,
void *data)
void wstream_init_fd(Loop *loop, Stream *stream, int fd, size_t maxmem)
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
{
stream_init(loop, stream, fd, NULL, data);
stream_init(loop, stream, fd, NULL);
wstream_init(stream, maxmem);
}
void wstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t maxmem,
void *data)
void wstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t maxmem)
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
{
stream_init(NULL, stream, -1, uvstream, data);
stream_init(NULL, stream, -1, uvstream);
wstream_init(stream, maxmem);
}
@ -54,10 +52,11 @@ void wstream_init(Stream *stream, size_t maxmem)
///
/// @param stream The `Stream` instance
/// @param cb The callback
void wstream_set_write_cb(Stream *stream, stream_write_cb cb)
FUNC_ATTR_NONNULL_ALL
void wstream_set_write_cb(Stream *stream, stream_write_cb cb, void *data)
FUNC_ATTR_NONNULL_ARG(1, 2)
{
stream->write_cb = cb;
stream->cb_data = data;
}
/// Queues data for writing to the backing file descriptor of a `Stream`
@ -138,7 +137,7 @@ static void write_cb(uv_write_t *req, int status)
wstream_release_wbuffer(data->buffer);
if (data->stream->write_cb) {
data->stream->write_cb(data->stream, data->stream->data, status);
data->stream->write_cb(data->stream, data->stream->cb_data, status);
}
data->stream->pending_reqs--;

View File

@ -1244,6 +1244,9 @@ EXTERN char *ignoredp;
// If a msgpack-rpc channel should be started over stdin/stdout
EXTERN bool embedded_mode INIT(= false);
/// next free id for a job or rpc channel
EXTERN uint64_t next_chan_id INIT(= 1);
/// Used to track the status of external functions.
/// Currently only used for iconv().
typedef enum {

View File

@ -19,6 +19,7 @@
#include "nvim/main.h"
#include "nvim/ascii.h"
#include "nvim/memory.h"
#include "nvim/eval.h"
#include "nvim/os_unix.h"
#include "nvim/message.h"
#include "nvim/map.h"
@ -55,12 +56,7 @@ typedef struct {
msgpack_unpacker *unpacker;
union {
Stream stream;
struct {
LibuvProcess uvproc;
Stream in;
Stream out;
Stream err;
} process;
Process *proc;
struct {
Stream in;
Stream out;
@ -79,7 +75,6 @@ typedef struct {
uint64_t request_id;
} RequestEvent;
static uint64_t next_id = 1;
static PMap(uint64_t) *channels = NULL;
static PMap(cstr_t) *event_strings = NULL;
static msgpack_sbuffer out_buffer;
@ -112,33 +107,20 @@ void channel_teardown(void)
}
/// Creates an API channel by starting a process and connecting to its
/// stdin/stdout. stderr is forwarded to the editor error stream.
/// stdin/stdout. stderr is handled by the job infrastructure.
///
/// @param argv The argument vector for the process. [consumed]
/// @return The channel id (> 0), on success.
/// 0, on error.
uint64_t channel_from_process(char **argv)
uint64_t channel_from_process(Process *proc, uint64_t id)
{
Channel *channel = register_channel(kChannelTypeProc);
channel->data.process.uvproc = libuv_process_init(&main_loop, channel);
Process *proc = &channel->data.process.uvproc.process;
proc->argv = argv;
proc->in = &channel->data.process.in;
proc->out = &channel->data.process.out;
proc->err = &channel->data.process.err;
proc->cb = process_exit;
if (!process_spawn(proc)) {
loop_poll_events(&main_loop, 0);
decref(channel);
return 0;
}
Channel *channel = register_channel(kChannelTypeProc, id, proc->events);
incref(channel); // process channels are only closed by the exit_cb
channel->data.proc = proc;
wstream_init(proc->in, 0);
rstream_init(proc->out, 0);
rstream_start(proc->out, parse_msgpack);
rstream_init(proc->err, 0);
rstream_start(proc->err, forward_stderr);
rstream_start(proc->out, parse_msgpack, channel);
return channel->id;
}
@ -148,14 +130,14 @@ uint64_t channel_from_process(char **argv)
/// @param watcher The SocketWatcher ready to accept the connection
void channel_from_connection(SocketWatcher *watcher)
{
Channel *channel = register_channel(kChannelTypeSocket);
socket_watcher_accept(watcher, &channel->data.stream, channel);
Channel *channel = register_channel(kChannelTypeSocket, 0, NULL);
socket_watcher_accept(watcher, &channel->data.stream);
incref(channel); // close channel only after the stream is closed
channel->data.stream.internal_close_cb = close_cb;
channel->data.stream.internal_data = channel;
wstream_init(&channel->data.stream, 0);
rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
rstream_start(&channel->data.stream, parse_msgpack);
rstream_start(&channel->data.stream, parse_msgpack, channel);
}
/// Sends event/arguments to channel
@ -314,30 +296,21 @@ bool channel_close(uint64_t id)
/// Neovim
void channel_from_stdio(void)
{
Channel *channel = register_channel(kChannelTypeStdio);
Channel *channel = register_channel(kChannelTypeStdio, 0, NULL);
incref(channel); // stdio channels are only closed on exit
// read stream
rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE,
channel);
rstream_start(&channel->data.std.in, parse_msgpack);
rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE);
rstream_start(&channel->data.std.in, parse_msgpack, channel);
// write stream
wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0, NULL);
wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0);
}
static void forward_stderr(Stream *stream, RBuffer *rbuf, size_t count,
void *data, bool eof)
void channel_process_exit(uint64_t id, int status)
{
while (rbuffer_size(rbuf)) {
char buf[256];
size_t read = rbuffer_read(rbuf, buf, sizeof(buf) - 1);
buf[read] = NUL;
ELOG("Channel %" PRIu64 " stderr: %s", ((Channel *)data)->id, buf);
}
}
Channel *channel = pmap_get(uint64_t)(channels, id);
static void process_exit(Process *proc, int status, void *data)
{
decref(data);
channel->closed = true;
decref(channel);
}
static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
@ -512,7 +485,7 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
success = wstream_write(&channel->data.stream, buffer);
break;
case kChannelTypeProc:
success = wstream_write(&channel->data.process.in, buffer);
success = wstream_write(channel->data.proc->in, buffer);
break;
case kChannelTypeStdio:
success = wstream_write(&channel->data.std.out, buffer);
@ -637,16 +610,17 @@ static void close_channel(Channel *channel)
switch (channel->type) {
case kChannelTypeSocket:
stream_close(&channel->data.stream, NULL);
stream_close(&channel->data.stream, NULL, NULL);
break;
case kChannelTypeProc:
if (!channel->data.process.uvproc.process.closed) {
process_stop(&channel->data.process.uvproc.process);
}
// Only close the rpc channel part,
// there could be an error message on the stderr stream
process_close_in(channel->data.proc);
process_close_out(channel->data.proc);
break;
case kChannelTypeStdio:
stream_close(&channel->data.std.in, NULL);
stream_close(&channel->data.std.out, NULL);
stream_close(&channel->data.std.in, NULL, NULL);
stream_close(&channel->data.std.out, NULL, NULL);
queue_put(main_loop.fast_events, exit_event, 1, channel);
return;
default:
@ -680,7 +654,9 @@ static void free_channel(Channel *channel)
pmap_free(cstr_t)(channel->subscribed_events);
kv_destroy(channel->call_stack);
kv_destroy(channel->delayed_notifications);
queue_free(channel->events);
if (channel->type != kChannelTypeProc) {
queue_free(channel->events);
}
xfree(channel);
}
@ -689,15 +665,15 @@ static void close_cb(Stream *stream, void *data)
decref(data);
}
static Channel *register_channel(ChannelType type)
static Channel *register_channel(ChannelType type, uint64_t id, Queue *events)
{
Channel *rv = xmalloc(sizeof(Channel));
rv->events = queue_new_child(main_loop.events);
rv->events = events ? events : queue_new_child(main_loop.events);
rv->type = type;
rv->refcount = 1;
rv->closed = false;
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
rv->id = next_id++;
rv->id = id > 0 ? id : next_chan_id++;
rv->pending_requests = 0;
rv->subscribed_events = pmap_new(cstr_t)();
rv->next_request_id = 1;

View File

@ -6,6 +6,7 @@
#include "nvim/api/private/defs.h"
#include "nvim/event/socket.h"
#include "nvim/event/process.h"
#include "nvim/vim.h"
#define METHOD_MAXLEN 512

View File

@ -60,8 +60,8 @@ void input_start(int fd)
}
global_fd = fd;
rstream_init_fd(&main_loop, &read_stream, fd, READ_BUFFER_SIZE, NULL);
rstream_start(&read_stream, read_cb);
rstream_init_fd(&main_loop, &read_stream, fd, READ_BUFFER_SIZE);
rstream_start(&read_stream, read_cb, NULL);
}
void input_stop(void)
@ -71,7 +71,7 @@ void input_stop(void)
}
rstream_stop(&read_stream);
stream_close(&read_stream, NULL);
stream_close(&read_stream, NULL, NULL);
}
static void cursorhold_event(void **argv)

View File

@ -236,10 +236,10 @@ static int do_os_system(char **argv,
}
proc->out->events = NULL;
rstream_init(proc->out, 0);
rstream_start(proc->out, data_cb);
rstream_start(proc->out, data_cb, &buf);
proc->err->events = NULL;
rstream_init(proc->err, 0);
rstream_start(proc->err, data_cb);
rstream_start(proc->err, data_cb, &buf);
// write the input, if any
if (input) {
@ -251,7 +251,7 @@ static int do_os_system(char **argv,
return -1;
}
// close the input stream after everything is written
wstream_set_write_cb(&in, shell_write_cb);
wstream_set_write_cb(&in, shell_write_cb, NULL);
}
// invoke busy_start here so event_poll_until wont change the busy state for
@ -546,5 +546,5 @@ static size_t write_output(char *output, size_t remaining, bool to_buffer,
static void shell_write_cb(Stream *stream, void *data, int status)
{
stream_close(stream, NULL);
stream_close(stream, NULL, NULL);
}

View File

@ -38,7 +38,7 @@ void term_input_init(TermInput *input, Loop *loop)
int curflags = termkey_get_canonflags(input->tk);
termkey_set_canonflags(input->tk, curflags | TERMKEY_CANON_DELBS);
// setup input handle
rstream_init_fd(loop, &input->read_stream, input->in_fd, 0xfff, input);
rstream_init_fd(loop, &input->read_stream, input->in_fd, 0xfff);
// initialize a timer handle for handling ESC with libtermkey
time_watcher_init(loop, &input->timer_handle, input);
}
@ -49,13 +49,13 @@ void term_input_destroy(TermInput *input)
uv_mutex_destroy(&input->key_buffer_mutex);
uv_cond_destroy(&input->key_buffer_cond);
time_watcher_close(&input->timer_handle, NULL);
stream_close(&input->read_stream, NULL);
stream_close(&input->read_stream, NULL, NULL);
termkey_destroy(input->tk);
}
void term_input_start(TermInput *input)
{
rstream_start(&input->read_stream, read_cb);
rstream_start(&input->read_stream, read_cb, input);
}
void term_input_stop(TermInput *input)
@ -340,7 +340,7 @@ static void read_cb(Stream *stream, RBuffer *buf, size_t c, void *data,
//
// ls *.md | xargs nvim
input->in_fd = 2;
stream_close(&input->read_stream, NULL);
stream_close(&input->read_stream, NULL, NULL);
queue_put(input->loop->fast_events, restart_reading, 1, input);
} else {
loop_schedule(&main_loop, event_create(1, input_done_event, 0));
@ -391,6 +391,6 @@ static void read_cb(Stream *stream, RBuffer *buf, size_t c, void *data,
static void restart_reading(void **argv)
{
TermInput *input = argv[0];
rstream_init_fd(input->loop, &input->read_stream, input->in_fd, 0xfff, input);
rstream_start(&input->read_stream, read_cb);
rstream_init_fd(input->loop, &input->read_stream, input->in_fd, 0xfff);
rstream_start(&input->read_stream, read_cb, input);
}

View File

@ -0,0 +1,38 @@
local deps_prefix = './.deps/usr'
if os.getenv('DEPS_PREFIX') then
deps_prefix = os.getenv('DEPS_PREFIX')
end
package.path = deps_prefix .. '/share/lua/5.1/?.lua;' ..
deps_prefix .. '/share/lua/5.1/?/init.lua;' ..
package.path
package.cpath = deps_prefix .. '/lib/lua/5.1/?.so;' ..
package.cpath
local mpack = require('mpack')
local StdioStream = require('nvim.stdio_stream')
local Session = require('nvim.session')
local stdio_stream = StdioStream.open()
local session = Session.new(stdio_stream)
local function on_request(method, args)
if method == 'poll' then
return 'ok'
elseif method == 'write_stderr' then
io.stderr:write(args[1])
return "done!"
elseif method == "exit" then
session:stop()
return mpack.NIL
end
end
local function on_notification(event, args)
if event == 'ping' and #args == 0 then
session:notify("vim_eval", "rpcnotify(g:channel, 'pong')")
end
end
session:run(on_request, on_notification)

View File

@ -4,7 +4,9 @@
local helpers = require('test.functional.helpers')(after_each)
local clear, nvim, eval = helpers.clear, helpers.nvim, helpers.eval
local eq, neq, run, stop = helpers.eq, helpers.neq, helpers.run, helpers.stop
local nvim_prog = helpers.nvim_prog
local nvim_prog, command, funcs = helpers.nvim_prog, helpers.command, helpers.funcs
local source, next_message = helpers.source, helpers.next_message
local meths = helpers.meths
describe('server -> client', function()
@ -144,11 +146,11 @@ describe('server -> client', function()
end
before_each(function()
nvim('command', "let vim = rpcstart('"..nvim_prog.."', ['-u', 'NONE', '-i', 'NONE', '--cmd', 'set noswapfile', '--embed'])")
command("let vim = rpcstart('"..nvim_prog.."', ['-u', 'NONE', '-i', 'NONE', '--cmd', 'set noswapfile', '--embed'])")
neq(0, eval('vim'))
end)
after_each(function() nvim('command', 'call rpcstop(vim)') end)
after_each(function() command('call rpcstop(vim)') end)
it('can send/recieve notifications and make requests', function()
nvim('command', "call rpcnotify(vim, 'vim_set_current_line', 'SOME TEXT')")
@ -181,4 +183,42 @@ describe('server -> client', function()
eq(true, string.match(err, ': (.*)') == 'Failed to evaluate expression')
end)
end)
describe('when using jobstart', function()
local jobid
before_each(function()
local channel = nvim('get_api_info')[1]
nvim('set_var', 'channel', channel)
source([[
function! s:OnEvent(id, data, event)
call rpcnotify(g:channel, a:event, 0, a:data)
endfunction
let g:job_opts = {
\ 'on_stderr': function('s:OnEvent'),
\ 'on_exit': function('s:OnEvent'),
\ 'user': 0,
\ 'rpc': v:true
\ }
]])
local lua_prog = arg[-1]
meths.set_var("args", {lua_prog, 'test/functional/api/rpc_fixture.lua'})
jobid = eval("jobstart(g:args, g:job_opts)")
neq(0, 'jobid')
end)
after_each(function()
funcs.jobstop(jobid)
end)
it('rpc and text stderr can be combined', function()
eq("ok",funcs.rpcrequest(jobid, "poll"))
funcs.rpcnotify(jobid, "ping")
eq({'notification', 'pong', {}}, next_message())
eq("done!",funcs.rpcrequest(jobid, "write_stderr", "fluff\n"))
eq({'notification', 'stderr', {0, {'fluff', ''}}}, next_message())
funcs.rpcrequest(jobid, "exit")
eq({'notification', 'exit', {0, 0}}, next_message())
end)
end)
end)

View File

@ -5,6 +5,7 @@ local clear, eq, eval, execute, feed, insert, neq, next_msg, nvim,
helpers.insert, helpers.neq, helpers.next_message, helpers.nvim,
helpers.nvim_dir, helpers.ok, helpers.source,
helpers.write_file, helpers.mkdir, helpers.rmdir
local command = helpers.command
local Screen = require('test.functional.ui.screen')
@ -429,6 +430,13 @@ describe('jobs', function()
eq({'notification', 'j', {0, {jobid, 'exit'}}}, next_msg())
end)
it('cannot have both rpc and pty options', function()
command("let g:job_opts.pty = v:true")
command("let g:job_opts.rpc = v:true")
local _, err = pcall(command, "let j = jobstart(['cat', '-'], g:job_opts)")
ok(string.find(err, "E475: Invalid argument: job cannot have both 'pty' and 'rpc' options set") ~= nil)
end)
describe('running tty-test program', function()
local function next_chunk()
local rv