Merge pull request #4723 from bfredl/rpcstderr
allow stderr handler for rpc jobs and use it to display python/ruby startup error
This commit is contained in:
commit
71b3e20d0f
|
@ -5,11 +5,24 @@ endif
|
|||
|
||||
let s:loaded_pythonx_provider = 1
|
||||
|
||||
let s:stderr = {}
|
||||
let s:job_opts = {'rpc': v:true}
|
||||
|
||||
" TODO(bfredl): this logic is common and should be builtin
|
||||
function! s:job_opts.on_stderr(chan_id, data, event)
|
||||
let stderr = get(s:stderr, a:chan_id, [''])
|
||||
let last = remove(stderr, -1)
|
||||
let a:data[0] = last.a:data[0]
|
||||
call extend(stderr, a:data)
|
||||
let s:stderr[a:chan_id] = stderr
|
||||
endfunction
|
||||
|
||||
function! provider#pythonx#Require(host) abort
|
||||
let ver = (a:host.orig_name ==# 'python') ? 2 : 3
|
||||
|
||||
" Python host arguments
|
||||
let args = ['-c', 'import sys; sys.path.remove(""); import neovim; neovim.start_host()']
|
||||
let prog = (ver == '2' ? provider#python#Prog() : provider#python3#Prog())
|
||||
let args = [prog, '-c', 'import sys; sys.path.remove(""); import neovim; neovim.start_host()']
|
||||
|
||||
" Collect registered Python plugins into args
|
||||
let python_plugins = remote#host#PluginsForHost(a:host.name)
|
||||
|
@ -18,14 +31,16 @@ function! provider#pythonx#Require(host) abort
|
|||
endfor
|
||||
|
||||
try
|
||||
let channel_id = rpcstart((ver ==# '2' ?
|
||||
\ provider#python#Prog() : provider#python3#Prog()), args)
|
||||
let channel_id = jobstart(args, s:job_opts)
|
||||
if rpcrequest(channel_id, 'poll') ==# 'ok'
|
||||
return channel_id
|
||||
endif
|
||||
catch
|
||||
echomsg v:throwpoint
|
||||
echomsg v:exception
|
||||
for row in get(s:stderr, channel_id, [])
|
||||
echomsg row
|
||||
endfor
|
||||
endtry
|
||||
throw remote#host#LoadErrorForHost(a:host.orig_name,
|
||||
\ '$NVIM_PYTHON_LOG_FILE')
|
||||
|
|
|
@ -4,6 +4,17 @@ if exists('g:loaded_ruby_provider')
|
|||
endif
|
||||
let g:loaded_ruby_provider = 1
|
||||
|
||||
let s:stderr = {}
|
||||
let s:job_opts = {'rpc': v:true}
|
||||
|
||||
function! s:job_opts.on_stderr(chan_id, data, event)
|
||||
let stderr = get(s:stderr, a:chan_id, [''])
|
||||
let last = remove(stderr, -1)
|
||||
let a:data[0] = last.a:data[0]
|
||||
call extend(stderr, a:data)
|
||||
let s:stderr[a:chan_id] = stderr
|
||||
endfunction
|
||||
|
||||
function! provider#ruby#Detect() abort
|
||||
return exepath('neovim-ruby-host')
|
||||
endfunction
|
||||
|
@ -13,7 +24,7 @@ function! provider#ruby#Prog()
|
|||
endfunction
|
||||
|
||||
function! provider#ruby#Require(host) abort
|
||||
let args = []
|
||||
let args = [provider#ruby#Prog()]
|
||||
let ruby_plugins = remote#host#PluginsForHost(a:host.name)
|
||||
|
||||
for plugin in ruby_plugins
|
||||
|
@ -21,13 +32,16 @@ function! provider#ruby#Require(host) abort
|
|||
endfor
|
||||
|
||||
try
|
||||
let channel_id = rpcstart(provider#ruby#Prog(), args)
|
||||
let channel_id = jobstart(args, s:job_opts)
|
||||
if rpcrequest(channel_id, 'poll') ==# 'ok'
|
||||
return channel_id
|
||||
endif
|
||||
catch
|
||||
echomsg v:throwpoint
|
||||
echomsg v:exception
|
||||
for row in get(s:stderr, channel_id, [])
|
||||
echomsg row
|
||||
endfor
|
||||
endtry
|
||||
throw remote#host#LoadErrorForHost(a:host.orig_name, '$NVIM_RUBY_LOG_FILE')
|
||||
endfunction
|
||||
|
|
|
@ -261,9 +261,8 @@ function! remote#host#LoadErrorForHost(host, log) abort
|
|||
\ 'You can try to see what happened '.
|
||||
\ 'by starting Neovim with the environment variable '.
|
||||
\ a:log . ' set to a file and opening the generated '.
|
||||
\ 'log file. Also, the host stderr will be available '.
|
||||
\ 'in Neovim log, so it may contain useful information. '.
|
||||
\ 'See also ~/.nvimlog.'
|
||||
\ 'log file. Also, the host stderr is available '.
|
||||
\ 'in messages.'
|
||||
endfunction
|
||||
|
||||
|
||||
|
|
|
@ -2043,7 +2043,6 @@ rpcnotify({channel}, {event}[, {args}...])
|
|||
Sends an |RPC| notification to {channel}
|
||||
rpcrequest({channel}, {method}[, {args}...])
|
||||
Sends an |RPC| request to {channel}
|
||||
rpcstart({prog}[, {argv}]) Spawns {prog} and opens an |RPC| channel
|
||||
rpcstop({channel}) Closes an |RPC| {channel}
|
||||
screenattr({row}, {col}) Number attribute at screen position
|
||||
screenchar({row}, {col}) Number character at screen position
|
||||
|
@ -4395,8 +4394,10 @@ items({dict}) *items()*
|
|||
order.
|
||||
|
||||
jobclose({job}[, {stream}]) {Nvim} *jobclose()*
|
||||
Close {job}'s {stream}, which can be one "stdin", "stdout" or
|
||||
"stderr". If {stream} is omitted, all streams are closed.
|
||||
Close {job}'s {stream}, which can be one of "stdin", "stdout",
|
||||
"stderr" or "rpc" (closes the rpc channel for a job started
|
||||
with the "rpc" option.) If {stream} is omitted, all streams
|
||||
are closed.
|
||||
|
||||
jobpid({job}) {Nvim} *jobpid()*
|
||||
Return the pid (process id) of {job}.
|
||||
|
@ -4418,6 +4419,10 @@ jobsend({job}, {data}) {Nvim} *jobsend()*
|
|||
:call jobsend(j, ["abc", "123\n456", ""])
|
||||
< will send "abc<NL>123<NUL>456<NL>".
|
||||
|
||||
If the job was started with the rpc option this function
|
||||
cannot be used, instead use |rpcnotify()| and |rpcrequest()|
|
||||
to communicate with the job.
|
||||
|
||||
jobstart({cmd}[, {opts}]) {Nvim} *jobstart()*
|
||||
Spawns {cmd} as a job. If {cmd} is a |List| it is run
|
||||
directly. If {cmd} is a |String| it is processed like this: >
|
||||
|
@ -4433,9 +4438,14 @@ jobstart({cmd}[, {opts}]) {Nvim} *jobstart()*
|
|||
on_exit : exit event handler (function name or |Funcref|)
|
||||
cwd : Working directory of the job; defaults to
|
||||
|current-directory|.
|
||||
rpc : If set, |msgpack-rpc| will be used to communicate
|
||||
with the job over stdin and stdout. "on_stdout" is
|
||||
then ignored, but "on_stderr" can still be used.
|
||||
pty : If set, the job will be connected to a new pseudo
|
||||
terminal, and the job streams are connected to
|
||||
the master file descriptor.
|
||||
terminal, and the job streams are connected to
|
||||
the master file descriptor. "on_stderr" is ignored
|
||||
as all output will be received on stdout.
|
||||
|
||||
width : (pty only) Width of the terminal screen
|
||||
height : (pty only) Height of the terminal screen
|
||||
TERM : (pty only) $TERM environment variable
|
||||
|
@ -4447,10 +4457,12 @@ jobstart({cmd}[, {opts}]) {Nvim} *jobstart()*
|
|||
{opts} is passed as |self| to the callback; the caller may
|
||||
pass arbitrary data by setting other keys.
|
||||
Returns:
|
||||
- job ID on success, used by |jobsend()| and |jobstop()|
|
||||
- The job ID on success, which is used by |jobsend()| (or
|
||||
|rpcnotify()| and |rpcrequest()| if "rpc" option was used)
|
||||
and |jobstop()|
|
||||
- 0 on invalid arguments or if the job table is full
|
||||
- -1 if {cmd}[0] is not executable.
|
||||
See |job-control| for more information.
|
||||
See |job-control| and |msgpack-rpc| for more information.
|
||||
|
||||
jobstop({job}) {Nvim} *jobstop()*
|
||||
Stop a job created with |jobstart()| by sending a `SIGTERM`
|
||||
|
@ -5649,19 +5661,20 @@ rpcrequest({channel}, {method}[, {args}...]) {Nvim} *rpcrequest()*
|
|||
:let result = rpcrequest(rpc_chan, "func", 1, 2, 3)
|
||||
|
||||
rpcstart({prog}[, {argv}]) {Nvim} *rpcstart()*
|
||||
Spawns {prog} as a job (optionally passing the list {argv}),
|
||||
and opens an |RPC| channel with the spawned process's
|
||||
stdin/stdout. Returns:
|
||||
- channel id on success, which is used by |rpcrequest()|,
|
||||
|rpcnotify()| and |rpcstop()|
|
||||
- 0 on failure
|
||||
Example: >
|
||||
:let rpc_chan = rpcstart('prog', ['arg1', 'arg2'])
|
||||
Deprecated. Replace >
|
||||
:let id = rpcstart('prog', ['arg1', 'arg2'])
|
||||
< with >
|
||||
:let id = jobstart(['prog', 'arg1', 'arg2'],
|
||||
{'rpc': v:true})
|
||||
|
||||
rpcstop({channel}) {Nvim} *rpcstop()*
|
||||
Closes an |RPC| {channel}, possibly created via
|
||||
|rpcstart()|. Also closes channels created by connections to
|
||||
|v:servername|.
|
||||
Closes an |RPC| {channel}. If the channel is a job
|
||||
started with |jobstart()| the job is killed.
|
||||
It is better to use |jobstop()| in this case, or use
|
||||
|jobclose|(id, "rpc") to only close the channel without
|
||||
killing the job.
|
||||
Closes the socket connection if the channel was opened by
|
||||
connecting to |v:servername|.
|
||||
|
||||
screenattr(row, col) *screenattr()*
|
||||
Like screenchar(), but return the attribute. This is a rather
|
||||
|
|
|
@ -11,7 +11,6 @@ RPC API for Nvim *RPC* *rpc* *msgpack-rpc*
|
|||
3. Connecting |rpc-connecting|
|
||||
4. Clients |rpc-api-client|
|
||||
5. Types |rpc-types|
|
||||
6. Vimscript functions |rpc-vim-functions|
|
||||
|
||||
==============================================================================
|
||||
1. Introduction *rpc-intro*
|
||||
|
@ -66,12 +65,16 @@ To get a formatted dump of the API using python (requires the `pyyaml` and
|
|||
==============================================================================
|
||||
3. Connecting *rpc-connecting*
|
||||
|
||||
There are several ways to open a msgpack-rpc stream to an Nvim server:
|
||||
There are several ways to open a msgpack-rpc channel to an Nvim instance:
|
||||
|
||||
1. Through stdin/stdout when `nvim` is started with `--embed`. This is how
|
||||
applications can embed Nvim.
|
||||
|
||||
2. Through stdin/stdout of some other process spawned by |rpcstart()|.
|
||||
2. Through stdin/stdout of some other process spawned by |jobstart()|.
|
||||
Set the "rpc" key to |v:true| in the options dict to use the job's stdin
|
||||
and stdout as a single msgpack channel that is processed directly by
|
||||
Nvim. Then it is not possible to process raw data to or from the
|
||||
process's stdin and stdout. stderr can still be used, though.
|
||||
|
||||
3. Through the socket automatically created with each instance. The socket
|
||||
location is stored in |v:servername|.
|
||||
|
@ -110,11 +113,12 @@ functions can be called interactively:
|
|||
>>> nvim = attach('socket', path='[address]')
|
||||
>>> nvim.command('echo "hello world!"')
|
||||
<
|
||||
You can also embed an Nvim instance via |rpcstart()|
|
||||
You can also embed an Nvim instance via |jobstart()|, and communicate using
|
||||
|rpcrequest()| and |rpcnotify()|:
|
||||
>
|
||||
let vim = rpcstart('nvim', ['--embed'])
|
||||
let vim = jobstart(['nvim', '--embed'], {'rpc': v:true})
|
||||
echo rpcrequest(vim, 'vim_eval', '"Hello " . "world!"')
|
||||
call rpcstop(vim)
|
||||
call jobstop(vim)
|
||||
<
|
||||
==============================================================================
|
||||
4. Implementing API clients *rpc-api-client* *api-client*
|
||||
|
@ -233,23 +237,5 @@ Even for statically compiled clients it is good practice to avoid hardcoding
|
|||
the type codes, because a client may be built against one Nvim version but
|
||||
connect to another with different type codes.
|
||||
|
||||
==============================================================================
|
||||
6. Vimscript functions *rpc-vim-functions*
|
||||
|
||||
RPC functions are available in Vimscript:
|
||||
|
||||
1. |rpcstart()|: Similarly to |jobstart()|, this will spawn a co-process
|
||||
with its standard handles connected to Nvim. The difference is that it's
|
||||
not possible to process raw data to or from the process's stdin, stdout,
|
||||
or stderr. This is because the job's stdin and stdout are used as
|
||||
a single msgpack channel that is processed directly by Nvim.
|
||||
2. |rpcstop()|: Same as |jobstop()|, but operates on handles returned by
|
||||
|rpcstart()|.
|
||||
3. |rpcrequest()|: Sends a msgpack-rpc request to the process.
|
||||
4. |rpcnotify()|: Sends a msgpack-rpc notification to the process.
|
||||
|
||||
|rpcrequest()| and |rpcnotify()| can also be used with channels connected to
|
||||
a nvim server. |v:servername|
|
||||
|
||||
==============================================================================
|
||||
vim:tw=78:ts=8:noet:ft=help:norl:
|
||||
|
|
118
src/nvim/eval.c
118
src/nvim/eval.c
|
@ -408,6 +408,7 @@ typedef struct {
|
|||
Terminal *term;
|
||||
bool stopped;
|
||||
bool exited;
|
||||
bool rpc;
|
||||
int refcount;
|
||||
ufunc_T *on_stdout, *on_stderr, *on_exit;
|
||||
dict_T *self;
|
||||
|
@ -448,8 +449,7 @@ typedef struct {
|
|||
#define FNE_INCL_BR 1 /* find_name_end(): include [] in name */
|
||||
#define FNE_CHECK_START 2 /* find_name_end(): check name starts with
|
||||
valid character */
|
||||
static uint64_t current_job_id = 1;
|
||||
static PMap(uint64_t) *jobs = NULL;
|
||||
static PMap(uint64_t) *jobs = NULL;
|
||||
|
||||
static uint64_t last_timer_id = 0;
|
||||
static PMap(uint64_t) *timers = NULL;
|
||||
|
@ -11724,16 +11724,35 @@ static void f_jobclose(typval_T *argvars, typval_T *rettv)
|
|||
if (argvars[1].v_type == VAR_STRING) {
|
||||
char *stream = (char *)argvars[1].vval.v_string;
|
||||
if (!strcmp(stream, "stdin")) {
|
||||
process_close_in(proc);
|
||||
if (data->rpc) {
|
||||
EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')"));
|
||||
} else {
|
||||
process_close_in(proc);
|
||||
}
|
||||
} else if (!strcmp(stream, "stdout")) {
|
||||
process_close_out(proc);
|
||||
if (data->rpc) {
|
||||
EMSG(_("Invalid stream on rpc job, use jobclose(id, 'rpc')"));
|
||||
} else {
|
||||
process_close_out(proc);
|
||||
}
|
||||
} else if (!strcmp(stream, "stderr")) {
|
||||
process_close_err(proc);
|
||||
} else if (!strcmp(stream, "rpc")) {
|
||||
if (data->rpc) {
|
||||
channel_close(data->id);
|
||||
} else {
|
||||
EMSG(_("Invalid job stream: Not an rpc job"));
|
||||
}
|
||||
} else {
|
||||
EMSG2(_("Invalid job stream \"%s\""), stream);
|
||||
}
|
||||
} else {
|
||||
process_close_streams(proc);
|
||||
if (data->rpc) {
|
||||
channel_close(data->id);
|
||||
process_close_err(proc);
|
||||
} else {
|
||||
process_close_streams(proc);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -11790,6 +11809,11 @@ static void f_jobsend(typval_T *argvars, typval_T *rettv)
|
|||
return;
|
||||
}
|
||||
|
||||
if (data->rpc) {
|
||||
EMSG(_("Can't send raw data to rpc channel"));
|
||||
return;
|
||||
}
|
||||
|
||||
ssize_t input_len;
|
||||
char *input = (char *) save_tv_as_string(&argvars[1], &input_len, false);
|
||||
if (!input) {
|
||||
|
@ -11911,12 +11935,23 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv)
|
|||
return;
|
||||
}
|
||||
|
||||
|
||||
dict_T *job_opts = NULL;
|
||||
bool detach = false, rpc = false, pty = false;
|
||||
ufunc_T *on_stdout = NULL, *on_stderr = NULL, *on_exit = NULL;
|
||||
char *cwd = NULL;
|
||||
if (argvars[1].v_type == VAR_DICT) {
|
||||
job_opts = argvars[1].vval.v_dict;
|
||||
|
||||
detach = get_dict_number(job_opts, (uint8_t *)"detach") != 0;
|
||||
rpc = get_dict_number(job_opts, (uint8_t *)"rpc") != 0;
|
||||
pty = get_dict_number(job_opts, (uint8_t *)"pty") != 0;
|
||||
if (pty && rpc) {
|
||||
EMSG2(_(e_invarg2), "job cannot have both 'pty' and 'rpc' options set");
|
||||
shell_free_argv(argv);
|
||||
return;
|
||||
}
|
||||
|
||||
char *new_cwd = (char *)get_dict_string(job_opts, (char_u *)"cwd", false);
|
||||
if (new_cwd && strlen(new_cwd) > 0) {
|
||||
cwd = new_cwd;
|
||||
|
@ -11934,10 +11969,8 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv)
|
|||
}
|
||||
}
|
||||
|
||||
bool pty = job_opts && get_dict_number(job_opts, (uint8_t *)"pty") != 0;
|
||||
bool detach = job_opts && get_dict_number(job_opts, (uint8_t *)"detach") != 0;
|
||||
TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit,
|
||||
job_opts, pty, detach, cwd);
|
||||
job_opts, pty, rpc, detach, cwd);
|
||||
Process *proc = (Process *)&data->proc;
|
||||
|
||||
if (pty) {
|
||||
|
@ -11955,7 +11988,7 @@ static void f_jobstart(typval_T *argvars, typval_T *rettv)
|
|||
}
|
||||
}
|
||||
|
||||
if (!on_stdout) {
|
||||
if (!rpc && !on_stdout) {
|
||||
proc->out = NULL;
|
||||
}
|
||||
if (!on_stderr) {
|
||||
|
@ -14105,7 +14138,7 @@ end:
|
|||
api_free_object(result);
|
||||
}
|
||||
|
||||
// "rpcstart()" function
|
||||
// "rpcstart()" function (DEPRECATED)
|
||||
static void f_rpcstart(typval_T *argvars, typval_T *rettv)
|
||||
{
|
||||
rettv->v_type = VAR_NUMBER;
|
||||
|
@ -14158,32 +14191,27 @@ static void f_rpcstart(typval_T *argvars, typval_T *rettv)
|
|||
|
||||
// The last item of argv must be NULL
|
||||
argv[i] = NULL;
|
||||
uint64_t channel_id = channel_from_process(argv);
|
||||
|
||||
if (!channel_id) {
|
||||
EMSG(_(e_api_spawn_failed));
|
||||
}
|
||||
|
||||
rettv->vval.v_number = (varnumber_T)channel_id;
|
||||
TerminalJobData *data = common_job_init(argv, NULL, NULL, NULL,
|
||||
NULL, false, true, false, NULL);
|
||||
common_job_start(data, rettv);
|
||||
}
|
||||
|
||||
// "rpcstop()" function
|
||||
static void f_rpcstop(typval_T *argvars, typval_T *rettv)
|
||||
{
|
||||
rettv->v_type = VAR_NUMBER;
|
||||
rettv->vval.v_number = 0;
|
||||
|
||||
if (check_restricted() || check_secure()) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (argvars[0].v_type != VAR_NUMBER) {
|
||||
// Wrong argument types
|
||||
EMSG(_(e_invarg));
|
||||
return;
|
||||
}
|
||||
|
||||
rettv->vval.v_number = channel_close(argvars[0].vval.v_number);
|
||||
// if called with a job, stop it, else closes the channel
|
||||
if (pmap_get(uint64_t)(jobs, argvars[0].vval.v_number)) {
|
||||
f_jobstop(argvars, rettv);
|
||||
} else {
|
||||
rettv->vval.v_number = channel_close(argvars[0].vval.v_number);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -16677,7 +16705,7 @@ static void f_termopen(typval_T *argvars, typval_T *rettv)
|
|||
}
|
||||
|
||||
TerminalJobData *data = common_job_init(argv, on_stdout, on_stderr, on_exit,
|
||||
job_opts, true, false, cwd);
|
||||
job_opts, true, false, false, cwd);
|
||||
data->proc.pty.width = curwin->w_width;
|
||||
data->proc.pty.height = curwin->w_height;
|
||||
data->proc.pty.term_name = xstrdup("xterm-256color");
|
||||
|
@ -22101,6 +22129,7 @@ static inline TerminalJobData *common_job_init(char **argv,
|
|||
ufunc_T *on_exit,
|
||||
dict_T *self,
|
||||
bool pty,
|
||||
bool rpc,
|
||||
bool detach,
|
||||
char *cwd)
|
||||
{
|
||||
|
@ -22111,6 +22140,7 @@ static inline TerminalJobData *common_job_init(char **argv,
|
|||
data->on_exit = on_exit;
|
||||
data->self = self;
|
||||
data->events = queue_new_child(main_loop.events);
|
||||
data->rpc = rpc;
|
||||
if (pty) {
|
||||
data->proc.pty = pty_process_init(&main_loop, data);
|
||||
} else {
|
||||
|
@ -22130,7 +22160,9 @@ static inline TerminalJobData *common_job_init(char **argv,
|
|||
return data;
|
||||
}
|
||||
|
||||
/// Return true/false on success/failure.
|
||||
/// common code for getting job callbacks for jobstart, termopen and rpcstart
|
||||
///
|
||||
/// @return true/false on success/failure.
|
||||
static inline bool common_job_callbacks(dict_T *vopts, ufunc_T **on_stdout,
|
||||
ufunc_T **on_stderr, ufunc_T **on_exit)
|
||||
{
|
||||
|
@ -22174,15 +22206,22 @@ static inline bool common_job_start(TerminalJobData *data, typval_T *rettv)
|
|||
}
|
||||
xfree(cmd);
|
||||
|
||||
data->id = current_job_id++;
|
||||
wstream_init(proc->in, 0);
|
||||
if (proc->out) {
|
||||
rstream_init(proc->out, 0);
|
||||
rstream_start(proc->out, on_job_stdout);
|
||||
data->id = next_chan_id++;
|
||||
|
||||
if (data->rpc) {
|
||||
// the rpc channel takes over the in and out streams
|
||||
channel_from_process(proc, data->id);
|
||||
} else {
|
||||
wstream_init(proc->in, 0);
|
||||
if (proc->out) {
|
||||
rstream_init(proc->out, 0);
|
||||
rstream_start(proc->out, on_job_stdout, data);
|
||||
}
|
||||
}
|
||||
|
||||
if (proc->err) {
|
||||
rstream_init(proc->err, 0);
|
||||
rstream_start(proc->err, on_job_stderr);
|
||||
rstream_start(proc->err, on_job_stderr, data);
|
||||
}
|
||||
pmap_put(uint64_t)(jobs, data->id, data);
|
||||
rettv->vval.v_number = data->id;
|
||||
|
@ -22302,12 +22341,18 @@ static void on_process_exit(Process *proc, int status, void *d)
|
|||
snprintf(msg, sizeof msg, "\r\n[Process exited %d]", proc->status);
|
||||
terminal_close(data->term, msg);
|
||||
}
|
||||
if (data->rpc) {
|
||||
channel_process_exit(data->id, status);
|
||||
}
|
||||
|
||||
if (data->status_ptr) {
|
||||
*data->status_ptr = status;
|
||||
}
|
||||
|
||||
process_job_event(data, data->on_exit, "exit", NULL, 0, status);
|
||||
|
||||
pmap_del(uint64_t)(jobs, data->id);
|
||||
term_job_data_decref(data);
|
||||
}
|
||||
|
||||
static void term_write(char *buf, size_t size, void *d)
|
||||
|
@ -22355,7 +22400,7 @@ static void term_job_data_decref(TerminalJobData *data)
|
|||
static void on_job_event(JobEvent *ev)
|
||||
{
|
||||
if (!ev->callback) {
|
||||
goto end;
|
||||
return;
|
||||
}
|
||||
|
||||
typval_T argv[3];
|
||||
|
@ -22391,13 +22436,6 @@ static void on_job_event(JobEvent *ev)
|
|||
call_user_func(ev->callback, argc, argv, &rettv, curwin->w_cursor.lnum,
|
||||
curwin->w_cursor.lnum, ev->data->self);
|
||||
clear_tv(&rettv);
|
||||
|
||||
end:
|
||||
if (!ev->received) {
|
||||
// exit event, safe to free job data now
|
||||
pmap_del(uint64_t)(jobs, ev->data->id);
|
||||
term_job_data_decref(ev->data);
|
||||
}
|
||||
}
|
||||
|
||||
static TerminalJobData *find_job(uint64_t id)
|
||||
|
|
|
@ -25,7 +25,7 @@
|
|||
#define CLOSE_PROC_STREAM(proc, stream) \
|
||||
do { \
|
||||
if (proc->stream && !proc->stream->closed) { \
|
||||
stream_close(proc->stream, NULL); \
|
||||
stream_close(proc->stream, NULL, NULL); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
|
@ -78,10 +78,8 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
|
|||
return false;
|
||||
}
|
||||
|
||||
void *data = proc->data;
|
||||
|
||||
if (proc->in) {
|
||||
stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe, data);
|
||||
stream_init(NULL, proc->in, -1, (uv_stream_t *)&proc->in->uv.pipe);
|
||||
proc->in->events = proc->events;
|
||||
proc->in->internal_data = proc;
|
||||
proc->in->internal_close_cb = on_process_stream_close;
|
||||
|
@ -89,7 +87,7 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
|
|||
}
|
||||
|
||||
if (proc->out) {
|
||||
stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe, data);
|
||||
stream_init(NULL, proc->out, -1, (uv_stream_t *)&proc->out->uv.pipe);
|
||||
proc->out->events = proc->events;
|
||||
proc->out->internal_data = proc;
|
||||
proc->out->internal_close_cb = on_process_stream_close;
|
||||
|
@ -97,7 +95,7 @@ bool process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
|
|||
}
|
||||
|
||||
if (proc->err) {
|
||||
stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe, data);
|
||||
stream_init(NULL, proc->err, -1, (uv_stream_t *)&proc->err->uv.pipe);
|
||||
proc->err->events = proc->events;
|
||||
proc->err->internal_data = proc;
|
||||
proc->err->internal_close_cb = on_process_stream_close;
|
||||
|
@ -373,7 +371,7 @@ static void flush_stream(Process *proc, Stream *stream)
|
|||
if (stream->read_cb) {
|
||||
// Stream callback could miss EOF handling if a child keeps the stream
|
||||
// open.
|
||||
stream->read_cb(stream, stream->buffer, 0, stream->data, true);
|
||||
stream->read_cb(stream, stream->buffer, 0, stream->cb_data, true);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -17,21 +17,19 @@
|
|||
# include "event/rstream.c.generated.h"
|
||||
#endif
|
||||
|
||||
void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize,
|
||||
void *data)
|
||||
void rstream_init_fd(Loop *loop, Stream *stream, int fd, size_t bufsize)
|
||||
FUNC_ATTR_NONNULL_ARG(1)
|
||||
FUNC_ATTR_NONNULL_ARG(2)
|
||||
{
|
||||
stream_init(loop, stream, fd, NULL, data);
|
||||
stream_init(loop, stream, fd, NULL);
|
||||
rstream_init(stream, bufsize);
|
||||
}
|
||||
|
||||
void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize,
|
||||
void *data)
|
||||
void rstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t bufsize)
|
||||
FUNC_ATTR_NONNULL_ARG(1)
|
||||
FUNC_ATTR_NONNULL_ARG(2)
|
||||
{
|
||||
stream_init(NULL, stream, -1, uvstream, data);
|
||||
stream_init(NULL, stream, -1, uvstream);
|
||||
rstream_init(stream, bufsize);
|
||||
}
|
||||
|
||||
|
@ -48,10 +46,11 @@ void rstream_init(Stream *stream, size_t bufsize)
|
|||
/// Starts watching for events from a `Stream` instance.
|
||||
///
|
||||
/// @param stream The `Stream` instance
|
||||
void rstream_start(Stream *stream, stream_read_cb cb)
|
||||
void rstream_start(Stream *stream, stream_read_cb cb, void *data)
|
||||
FUNC_ATTR_NONNULL_ARG(1)
|
||||
{
|
||||
stream->read_cb = cb;
|
||||
stream->cb_data = data;
|
||||
if (stream->uvstream) {
|
||||
uv_read_start(stream->uvstream, alloc_cb, read_cb);
|
||||
} else {
|
||||
|
@ -81,7 +80,7 @@ static void on_rbuffer_nonfull(RBuffer *buf, void *data)
|
|||
{
|
||||
Stream *stream = data;
|
||||
assert(stream->read_cb);
|
||||
rstream_start(stream, stream->read_cb);
|
||||
rstream_start(stream, stream->read_cb, stream->cb_data);
|
||||
}
|
||||
|
||||
// Callbacks used by libuv
|
||||
|
@ -179,7 +178,7 @@ static void read_event(void **argv)
|
|||
if (stream->read_cb) {
|
||||
size_t count = (uintptr_t)argv[1];
|
||||
bool eof = (uintptr_t)argv[2];
|
||||
stream->read_cb(stream, stream->buffer, count, stream->data, eof);
|
||||
stream->read_cb(stream, stream->buffer, count, stream->cb_data, eof);
|
||||
}
|
||||
stream->pending_reqs--;
|
||||
if (stream->closed && !stream->pending_reqs) {
|
||||
|
|
|
@ -113,7 +113,7 @@ int socket_watcher_start(SocketWatcher *watcher, int backlog, socket_cb cb)
|
|||
return 0;
|
||||
}
|
||||
|
||||
int socket_watcher_accept(SocketWatcher *watcher, Stream *stream, void *data)
|
||||
int socket_watcher_accept(SocketWatcher *watcher, Stream *stream)
|
||||
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
|
||||
{
|
||||
uv_stream_t *client;
|
||||
|
@ -133,7 +133,7 @@ int socket_watcher_accept(SocketWatcher *watcher, Stream *stream, void *data)
|
|||
return result;
|
||||
}
|
||||
|
||||
stream_init(NULL, stream, -1, client, data);
|
||||
stream_init(NULL, stream, -1, client);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -30,8 +30,7 @@ int stream_set_blocking(int fd, bool blocking)
|
|||
return retval;
|
||||
}
|
||||
|
||||
void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream,
|
||||
void *data)
|
||||
void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream)
|
||||
FUNC_ATTR_NONNULL_ARG(2)
|
||||
{
|
||||
stream->uvstream = uvstream;
|
||||
|
@ -58,7 +57,6 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream,
|
|||
stream->uvstream->data = stream;
|
||||
}
|
||||
|
||||
stream->data = data;
|
||||
stream->internal_data = NULL;
|
||||
stream->fpos = 0;
|
||||
stream->curmem = 0;
|
||||
|
@ -74,12 +72,13 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream,
|
|||
stream->num_bytes = 0;
|
||||
}
|
||||
|
||||
void stream_close(Stream *stream, stream_close_cb on_stream_close)
|
||||
void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
|
||||
FUNC_ATTR_NONNULL_ARG(1)
|
||||
{
|
||||
assert(!stream->closed);
|
||||
stream->closed = true;
|
||||
stream->close_cb = on_stream_close;
|
||||
stream->close_cb_data = data;
|
||||
|
||||
if (!stream->pending_reqs) {
|
||||
stream_close_handle(stream);
|
||||
|
@ -103,7 +102,7 @@ static void close_cb(uv_handle_t *handle)
|
|||
rbuffer_free(stream->buffer);
|
||||
}
|
||||
if (stream->close_cb) {
|
||||
stream->close_cb(stream, stream->data);
|
||||
stream->close_cb(stream, stream->close_cb_data);
|
||||
}
|
||||
if (stream->internal_close_cb) {
|
||||
stream->internal_close_cb(stream, stream->internal_data);
|
||||
|
|
|
@ -44,13 +44,14 @@ struct stream {
|
|||
uv_file fd;
|
||||
stream_read_cb read_cb;
|
||||
stream_write_cb write_cb;
|
||||
void *cb_data;
|
||||
stream_close_cb close_cb, internal_close_cb;
|
||||
void *close_cb_data, *internal_data;
|
||||
size_t fpos;
|
||||
size_t curmem;
|
||||
size_t maxmem;
|
||||
size_t pending_reqs;
|
||||
size_t num_bytes;
|
||||
void *data, *internal_data;
|
||||
bool closed;
|
||||
Queue *events;
|
||||
};
|
||||
|
|
|
@ -22,19 +22,17 @@ typedef struct {
|
|||
# include "event/wstream.c.generated.h"
|
||||
#endif
|
||||
|
||||
void wstream_init_fd(Loop *loop, Stream *stream, int fd, size_t maxmem,
|
||||
void *data)
|
||||
void wstream_init_fd(Loop *loop, Stream *stream, int fd, size_t maxmem)
|
||||
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
|
||||
{
|
||||
stream_init(loop, stream, fd, NULL, data);
|
||||
stream_init(loop, stream, fd, NULL);
|
||||
wstream_init(stream, maxmem);
|
||||
}
|
||||
|
||||
void wstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t maxmem,
|
||||
void *data)
|
||||
void wstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t maxmem)
|
||||
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
|
||||
{
|
||||
stream_init(NULL, stream, -1, uvstream, data);
|
||||
stream_init(NULL, stream, -1, uvstream);
|
||||
wstream_init(stream, maxmem);
|
||||
}
|
||||
|
||||
|
@ -54,10 +52,11 @@ void wstream_init(Stream *stream, size_t maxmem)
|
|||
///
|
||||
/// @param stream The `Stream` instance
|
||||
/// @param cb The callback
|
||||
void wstream_set_write_cb(Stream *stream, stream_write_cb cb)
|
||||
FUNC_ATTR_NONNULL_ALL
|
||||
void wstream_set_write_cb(Stream *stream, stream_write_cb cb, void *data)
|
||||
FUNC_ATTR_NONNULL_ARG(1, 2)
|
||||
{
|
||||
stream->write_cb = cb;
|
||||
stream->cb_data = data;
|
||||
}
|
||||
|
||||
/// Queues data for writing to the backing file descriptor of a `Stream`
|
||||
|
@ -138,7 +137,7 @@ static void write_cb(uv_write_t *req, int status)
|
|||
wstream_release_wbuffer(data->buffer);
|
||||
|
||||
if (data->stream->write_cb) {
|
||||
data->stream->write_cb(data->stream, data->stream->data, status);
|
||||
data->stream->write_cb(data->stream, data->stream->cb_data, status);
|
||||
}
|
||||
|
||||
data->stream->pending_reqs--;
|
||||
|
|
|
@ -1244,6 +1244,9 @@ EXTERN char *ignoredp;
|
|||
// If a msgpack-rpc channel should be started over stdin/stdout
|
||||
EXTERN bool embedded_mode INIT(= false);
|
||||
|
||||
/// next free id for a job or rpc channel
|
||||
EXTERN uint64_t next_chan_id INIT(= 1);
|
||||
|
||||
/// Used to track the status of external functions.
|
||||
/// Currently only used for iconv().
|
||||
typedef enum {
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#include "nvim/main.h"
|
||||
#include "nvim/ascii.h"
|
||||
#include "nvim/memory.h"
|
||||
#include "nvim/eval.h"
|
||||
#include "nvim/os_unix.h"
|
||||
#include "nvim/message.h"
|
||||
#include "nvim/map.h"
|
||||
|
@ -55,12 +56,7 @@ typedef struct {
|
|||
msgpack_unpacker *unpacker;
|
||||
union {
|
||||
Stream stream;
|
||||
struct {
|
||||
LibuvProcess uvproc;
|
||||
Stream in;
|
||||
Stream out;
|
||||
Stream err;
|
||||
} process;
|
||||
Process *proc;
|
||||
struct {
|
||||
Stream in;
|
||||
Stream out;
|
||||
|
@ -79,7 +75,6 @@ typedef struct {
|
|||
uint64_t request_id;
|
||||
} RequestEvent;
|
||||
|
||||
static uint64_t next_id = 1;
|
||||
static PMap(uint64_t) *channels = NULL;
|
||||
static PMap(cstr_t) *event_strings = NULL;
|
||||
static msgpack_sbuffer out_buffer;
|
||||
|
@ -112,33 +107,20 @@ void channel_teardown(void)
|
|||
}
|
||||
|
||||
/// Creates an API channel by starting a process and connecting to its
|
||||
/// stdin/stdout. stderr is forwarded to the editor error stream.
|
||||
/// stdin/stdout. stderr is handled by the job infrastructure.
|
||||
///
|
||||
/// @param argv The argument vector for the process. [consumed]
|
||||
/// @return The channel id (> 0), on success.
|
||||
/// 0, on error.
|
||||
uint64_t channel_from_process(char **argv)
|
||||
uint64_t channel_from_process(Process *proc, uint64_t id)
|
||||
{
|
||||
Channel *channel = register_channel(kChannelTypeProc);
|
||||
channel->data.process.uvproc = libuv_process_init(&main_loop, channel);
|
||||
Process *proc = &channel->data.process.uvproc.process;
|
||||
proc->argv = argv;
|
||||
proc->in = &channel->data.process.in;
|
||||
proc->out = &channel->data.process.out;
|
||||
proc->err = &channel->data.process.err;
|
||||
proc->cb = process_exit;
|
||||
if (!process_spawn(proc)) {
|
||||
loop_poll_events(&main_loop, 0);
|
||||
decref(channel);
|
||||
return 0;
|
||||
}
|
||||
|
||||
Channel *channel = register_channel(kChannelTypeProc, id, proc->events);
|
||||
incref(channel); // process channels are only closed by the exit_cb
|
||||
channel->data.proc = proc;
|
||||
|
||||
wstream_init(proc->in, 0);
|
||||
rstream_init(proc->out, 0);
|
||||
rstream_start(proc->out, parse_msgpack);
|
||||
rstream_init(proc->err, 0);
|
||||
rstream_start(proc->err, forward_stderr);
|
||||
rstream_start(proc->out, parse_msgpack, channel);
|
||||
|
||||
return channel->id;
|
||||
}
|
||||
|
@ -148,14 +130,14 @@ uint64_t channel_from_process(char **argv)
|
|||
/// @param watcher The SocketWatcher ready to accept the connection
|
||||
void channel_from_connection(SocketWatcher *watcher)
|
||||
{
|
||||
Channel *channel = register_channel(kChannelTypeSocket);
|
||||
socket_watcher_accept(watcher, &channel->data.stream, channel);
|
||||
Channel *channel = register_channel(kChannelTypeSocket, 0, NULL);
|
||||
socket_watcher_accept(watcher, &channel->data.stream);
|
||||
incref(channel); // close channel only after the stream is closed
|
||||
channel->data.stream.internal_close_cb = close_cb;
|
||||
channel->data.stream.internal_data = channel;
|
||||
wstream_init(&channel->data.stream, 0);
|
||||
rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
|
||||
rstream_start(&channel->data.stream, parse_msgpack);
|
||||
rstream_start(&channel->data.stream, parse_msgpack, channel);
|
||||
}
|
||||
|
||||
/// Sends event/arguments to channel
|
||||
|
@ -314,30 +296,21 @@ bool channel_close(uint64_t id)
|
|||
/// Neovim
|
||||
void channel_from_stdio(void)
|
||||
{
|
||||
Channel *channel = register_channel(kChannelTypeStdio);
|
||||
Channel *channel = register_channel(kChannelTypeStdio, 0, NULL);
|
||||
incref(channel); // stdio channels are only closed on exit
|
||||
// read stream
|
||||
rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE,
|
||||
channel);
|
||||
rstream_start(&channel->data.std.in, parse_msgpack);
|
||||
rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE);
|
||||
rstream_start(&channel->data.std.in, parse_msgpack, channel);
|
||||
// write stream
|
||||
wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0, NULL);
|
||||
wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0);
|
||||
}
|
||||
|
||||
static void forward_stderr(Stream *stream, RBuffer *rbuf, size_t count,
|
||||
void *data, bool eof)
|
||||
void channel_process_exit(uint64_t id, int status)
|
||||
{
|
||||
while (rbuffer_size(rbuf)) {
|
||||
char buf[256];
|
||||
size_t read = rbuffer_read(rbuf, buf, sizeof(buf) - 1);
|
||||
buf[read] = NUL;
|
||||
ELOG("Channel %" PRIu64 " stderr: %s", ((Channel *)data)->id, buf);
|
||||
}
|
||||
}
|
||||
Channel *channel = pmap_get(uint64_t)(channels, id);
|
||||
|
||||
static void process_exit(Process *proc, int status, void *data)
|
||||
{
|
||||
decref(data);
|
||||
channel->closed = true;
|
||||
decref(channel);
|
||||
}
|
||||
|
||||
static void parse_msgpack(Stream *stream, RBuffer *rbuf, size_t c, void *data,
|
||||
|
@ -512,7 +485,7 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
|
|||
success = wstream_write(&channel->data.stream, buffer);
|
||||
break;
|
||||
case kChannelTypeProc:
|
||||
success = wstream_write(&channel->data.process.in, buffer);
|
||||
success = wstream_write(channel->data.proc->in, buffer);
|
||||
break;
|
||||
case kChannelTypeStdio:
|
||||
success = wstream_write(&channel->data.std.out, buffer);
|
||||
|
@ -637,16 +610,17 @@ static void close_channel(Channel *channel)
|
|||
|
||||
switch (channel->type) {
|
||||
case kChannelTypeSocket:
|
||||
stream_close(&channel->data.stream, NULL);
|
||||
stream_close(&channel->data.stream, NULL, NULL);
|
||||
break;
|
||||
case kChannelTypeProc:
|
||||
if (!channel->data.process.uvproc.process.closed) {
|
||||
process_stop(&channel->data.process.uvproc.process);
|
||||
}
|
||||
// Only close the rpc channel part,
|
||||
// there could be an error message on the stderr stream
|
||||
process_close_in(channel->data.proc);
|
||||
process_close_out(channel->data.proc);
|
||||
break;
|
||||
case kChannelTypeStdio:
|
||||
stream_close(&channel->data.std.in, NULL);
|
||||
stream_close(&channel->data.std.out, NULL);
|
||||
stream_close(&channel->data.std.in, NULL, NULL);
|
||||
stream_close(&channel->data.std.out, NULL, NULL);
|
||||
queue_put(main_loop.fast_events, exit_event, 1, channel);
|
||||
return;
|
||||
default:
|
||||
|
@ -680,7 +654,9 @@ static void free_channel(Channel *channel)
|
|||
pmap_free(cstr_t)(channel->subscribed_events);
|
||||
kv_destroy(channel->call_stack);
|
||||
kv_destroy(channel->delayed_notifications);
|
||||
queue_free(channel->events);
|
||||
if (channel->type != kChannelTypeProc) {
|
||||
queue_free(channel->events);
|
||||
}
|
||||
xfree(channel);
|
||||
}
|
||||
|
||||
|
@ -689,15 +665,15 @@ static void close_cb(Stream *stream, void *data)
|
|||
decref(data);
|
||||
}
|
||||
|
||||
static Channel *register_channel(ChannelType type)
|
||||
static Channel *register_channel(ChannelType type, uint64_t id, Queue *events)
|
||||
{
|
||||
Channel *rv = xmalloc(sizeof(Channel));
|
||||
rv->events = queue_new_child(main_loop.events);
|
||||
rv->events = events ? events : queue_new_child(main_loop.events);
|
||||
rv->type = type;
|
||||
rv->refcount = 1;
|
||||
rv->closed = false;
|
||||
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
|
||||
rv->id = next_id++;
|
||||
rv->id = id > 0 ? id : next_chan_id++;
|
||||
rv->pending_requests = 0;
|
||||
rv->subscribed_events = pmap_new(cstr_t)();
|
||||
rv->next_request_id = 1;
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
|
||||
#include "nvim/api/private/defs.h"
|
||||
#include "nvim/event/socket.h"
|
||||
#include "nvim/event/process.h"
|
||||
#include "nvim/vim.h"
|
||||
|
||||
#define METHOD_MAXLEN 512
|
||||
|
|
|
@ -60,8 +60,8 @@ void input_start(int fd)
|
|||
}
|
||||
|
||||
global_fd = fd;
|
||||
rstream_init_fd(&main_loop, &read_stream, fd, READ_BUFFER_SIZE, NULL);
|
||||
rstream_start(&read_stream, read_cb);
|
||||
rstream_init_fd(&main_loop, &read_stream, fd, READ_BUFFER_SIZE);
|
||||
rstream_start(&read_stream, read_cb, NULL);
|
||||
}
|
||||
|
||||
void input_stop(void)
|
||||
|
@ -71,7 +71,7 @@ void input_stop(void)
|
|||
}
|
||||
|
||||
rstream_stop(&read_stream);
|
||||
stream_close(&read_stream, NULL);
|
||||
stream_close(&read_stream, NULL, NULL);
|
||||
}
|
||||
|
||||
static void cursorhold_event(void **argv)
|
||||
|
|
|
@ -236,10 +236,10 @@ static int do_os_system(char **argv,
|
|||
}
|
||||
proc->out->events = NULL;
|
||||
rstream_init(proc->out, 0);
|
||||
rstream_start(proc->out, data_cb);
|
||||
rstream_start(proc->out, data_cb, &buf);
|
||||
proc->err->events = NULL;
|
||||
rstream_init(proc->err, 0);
|
||||
rstream_start(proc->err, data_cb);
|
||||
rstream_start(proc->err, data_cb, &buf);
|
||||
|
||||
// write the input, if any
|
||||
if (input) {
|
||||
|
@ -251,7 +251,7 @@ static int do_os_system(char **argv,
|
|||
return -1;
|
||||
}
|
||||
// close the input stream after everything is written
|
||||
wstream_set_write_cb(&in, shell_write_cb);
|
||||
wstream_set_write_cb(&in, shell_write_cb, NULL);
|
||||
}
|
||||
|
||||
// invoke busy_start here so event_poll_until wont change the busy state for
|
||||
|
@ -546,5 +546,5 @@ static size_t write_output(char *output, size_t remaining, bool to_buffer,
|
|||
|
||||
static void shell_write_cb(Stream *stream, void *data, int status)
|
||||
{
|
||||
stream_close(stream, NULL);
|
||||
stream_close(stream, NULL, NULL);
|
||||
}
|
||||
|
|
|
@ -38,7 +38,7 @@ void term_input_init(TermInput *input, Loop *loop)
|
|||
int curflags = termkey_get_canonflags(input->tk);
|
||||
termkey_set_canonflags(input->tk, curflags | TERMKEY_CANON_DELBS);
|
||||
// setup input handle
|
||||
rstream_init_fd(loop, &input->read_stream, input->in_fd, 0xfff, input);
|
||||
rstream_init_fd(loop, &input->read_stream, input->in_fd, 0xfff);
|
||||
// initialize a timer handle for handling ESC with libtermkey
|
||||
time_watcher_init(loop, &input->timer_handle, input);
|
||||
}
|
||||
|
@ -49,13 +49,13 @@ void term_input_destroy(TermInput *input)
|
|||
uv_mutex_destroy(&input->key_buffer_mutex);
|
||||
uv_cond_destroy(&input->key_buffer_cond);
|
||||
time_watcher_close(&input->timer_handle, NULL);
|
||||
stream_close(&input->read_stream, NULL);
|
||||
stream_close(&input->read_stream, NULL, NULL);
|
||||
termkey_destroy(input->tk);
|
||||
}
|
||||
|
||||
void term_input_start(TermInput *input)
|
||||
{
|
||||
rstream_start(&input->read_stream, read_cb);
|
||||
rstream_start(&input->read_stream, read_cb, input);
|
||||
}
|
||||
|
||||
void term_input_stop(TermInput *input)
|
||||
|
@ -340,7 +340,7 @@ static void read_cb(Stream *stream, RBuffer *buf, size_t c, void *data,
|
|||
//
|
||||
// ls *.md | xargs nvim
|
||||
input->in_fd = 2;
|
||||
stream_close(&input->read_stream, NULL);
|
||||
stream_close(&input->read_stream, NULL, NULL);
|
||||
queue_put(input->loop->fast_events, restart_reading, 1, input);
|
||||
} else {
|
||||
loop_schedule(&main_loop, event_create(1, input_done_event, 0));
|
||||
|
@ -391,6 +391,6 @@ static void read_cb(Stream *stream, RBuffer *buf, size_t c, void *data,
|
|||
static void restart_reading(void **argv)
|
||||
{
|
||||
TermInput *input = argv[0];
|
||||
rstream_init_fd(input->loop, &input->read_stream, input->in_fd, 0xfff, input);
|
||||
rstream_start(&input->read_stream, read_cb);
|
||||
rstream_init_fd(input->loop, &input->read_stream, input->in_fd, 0xfff);
|
||||
rstream_start(&input->read_stream, read_cb, input);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
local deps_prefix = './.deps/usr'
|
||||
if os.getenv('DEPS_PREFIX') then
|
||||
deps_prefix = os.getenv('DEPS_PREFIX')
|
||||
end
|
||||
|
||||
package.path = deps_prefix .. '/share/lua/5.1/?.lua;' ..
|
||||
deps_prefix .. '/share/lua/5.1/?/init.lua;' ..
|
||||
package.path
|
||||
|
||||
package.cpath = deps_prefix .. '/lib/lua/5.1/?.so;' ..
|
||||
package.cpath
|
||||
|
||||
local mpack = require('mpack')
|
||||
local StdioStream = require('nvim.stdio_stream')
|
||||
local Session = require('nvim.session')
|
||||
|
||||
local stdio_stream = StdioStream.open()
|
||||
local session = Session.new(stdio_stream)
|
||||
|
||||
local function on_request(method, args)
|
||||
if method == 'poll' then
|
||||
return 'ok'
|
||||
elseif method == 'write_stderr' then
|
||||
io.stderr:write(args[1])
|
||||
return "done!"
|
||||
elseif method == "exit" then
|
||||
session:stop()
|
||||
return mpack.NIL
|
||||
end
|
||||
end
|
||||
|
||||
local function on_notification(event, args)
|
||||
if event == 'ping' and #args == 0 then
|
||||
session:notify("vim_eval", "rpcnotify(g:channel, 'pong')")
|
||||
end
|
||||
end
|
||||
|
||||
session:run(on_request, on_notification)
|
|
@ -4,7 +4,9 @@
|
|||
local helpers = require('test.functional.helpers')(after_each)
|
||||
local clear, nvim, eval = helpers.clear, helpers.nvim, helpers.eval
|
||||
local eq, neq, run, stop = helpers.eq, helpers.neq, helpers.run, helpers.stop
|
||||
local nvim_prog = helpers.nvim_prog
|
||||
local nvim_prog, command, funcs = helpers.nvim_prog, helpers.command, helpers.funcs
|
||||
local source, next_message = helpers.source, helpers.next_message
|
||||
local meths = helpers.meths
|
||||
|
||||
|
||||
describe('server -> client', function()
|
||||
|
@ -144,11 +146,11 @@ describe('server -> client', function()
|
|||
end
|
||||
|
||||
before_each(function()
|
||||
nvim('command', "let vim = rpcstart('"..nvim_prog.."', ['-u', 'NONE', '-i', 'NONE', '--cmd', 'set noswapfile', '--embed'])")
|
||||
command("let vim = rpcstart('"..nvim_prog.."', ['-u', 'NONE', '-i', 'NONE', '--cmd', 'set noswapfile', '--embed'])")
|
||||
neq(0, eval('vim'))
|
||||
end)
|
||||
|
||||
after_each(function() nvim('command', 'call rpcstop(vim)') end)
|
||||
after_each(function() command('call rpcstop(vim)') end)
|
||||
|
||||
it('can send/recieve notifications and make requests', function()
|
||||
nvim('command', "call rpcnotify(vim, 'vim_set_current_line', 'SOME TEXT')")
|
||||
|
@ -181,4 +183,42 @@ describe('server -> client', function()
|
|||
eq(true, string.match(err, ': (.*)') == 'Failed to evaluate expression')
|
||||
end)
|
||||
end)
|
||||
|
||||
describe('when using jobstart', function()
|
||||
local jobid
|
||||
before_each(function()
|
||||
local channel = nvim('get_api_info')[1]
|
||||
nvim('set_var', 'channel', channel)
|
||||
source([[
|
||||
function! s:OnEvent(id, data, event)
|
||||
call rpcnotify(g:channel, a:event, 0, a:data)
|
||||
endfunction
|
||||
let g:job_opts = {
|
||||
\ 'on_stderr': function('s:OnEvent'),
|
||||
\ 'on_exit': function('s:OnEvent'),
|
||||
\ 'user': 0,
|
||||
\ 'rpc': v:true
|
||||
\ }
|
||||
]])
|
||||
local lua_prog = arg[-1]
|
||||
meths.set_var("args", {lua_prog, 'test/functional/api/rpc_fixture.lua'})
|
||||
jobid = eval("jobstart(g:args, g:job_opts)")
|
||||
neq(0, 'jobid')
|
||||
end)
|
||||
|
||||
after_each(function()
|
||||
funcs.jobstop(jobid)
|
||||
end)
|
||||
|
||||
it('rpc and text stderr can be combined', function()
|
||||
eq("ok",funcs.rpcrequest(jobid, "poll"))
|
||||
funcs.rpcnotify(jobid, "ping")
|
||||
eq({'notification', 'pong', {}}, next_message())
|
||||
eq("done!",funcs.rpcrequest(jobid, "write_stderr", "fluff\n"))
|
||||
eq({'notification', 'stderr', {0, {'fluff', ''}}}, next_message())
|
||||
funcs.rpcrequest(jobid, "exit")
|
||||
eq({'notification', 'exit', {0, 0}}, next_message())
|
||||
end)
|
||||
end)
|
||||
|
||||
end)
|
||||
|
|
|
@ -5,6 +5,7 @@ local clear, eq, eval, execute, feed, insert, neq, next_msg, nvim,
|
|||
helpers.insert, helpers.neq, helpers.next_message, helpers.nvim,
|
||||
helpers.nvim_dir, helpers.ok, helpers.source,
|
||||
helpers.write_file, helpers.mkdir, helpers.rmdir
|
||||
local command = helpers.command
|
||||
local Screen = require('test.functional.ui.screen')
|
||||
|
||||
|
||||
|
@ -429,6 +430,13 @@ describe('jobs', function()
|
|||
eq({'notification', 'j', {0, {jobid, 'exit'}}}, next_msg())
|
||||
end)
|
||||
|
||||
it('cannot have both rpc and pty options', function()
|
||||
command("let g:job_opts.pty = v:true")
|
||||
command("let g:job_opts.rpc = v:true")
|
||||
local _, err = pcall(command, "let j = jobstart(['cat', '-'], g:job_opts)")
|
||||
ok(string.find(err, "E475: Invalid argument: job cannot have both 'pty' and 'rpc' options set") ~= nil)
|
||||
end)
|
||||
|
||||
describe('running tty-test program', function()
|
||||
local function next_chunk()
|
||||
local rv
|
||||
|
|
Loading…
Reference in New Issue