Merge pull request #6844 from bfredl/channel

channels: support buffered output and bytes sockets/stdio
This commit is contained in:
Björn Linse 2017-11-26 10:18:01 +01:00 committed by GitHub
commit 207b7ca4bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
49 changed files with 2174 additions and 1244 deletions

View File

@ -1,20 +0,0 @@
" Common functionality for providers
let s:stderr = {}
function! provider#stderr_collector(chan_id, data, event)
let stderr = get(s:stderr, a:chan_id, [''])
let stderr[-1] .= a:data[0]
call extend(stderr, a:data[1:])
let s:stderr[a:chan_id] = stderr
endfunction
function! provider#clear_stderr(chan_id)
if has_key(s:stderr, a:chan_id)
call remove(s:stderr, a:chan_id)
endif
endfunction
function! provider#get_stderr(chan_id)
return get(s:stderr, a:chan_id, [])
endfunction

View File

@ -7,7 +7,7 @@ let s:clipboard = {}
" When caching is enabled, store the jobid of the xclip/xsel process keeping
" ownership of the selection, so we know how long the cache is valid.
let s:selection = { 'owner': 0, 'data': [], 'on_stderr': function('provider#stderr_collector') }
let s:selection = { 'owner': 0, 'data': [], 'stderr_buffered': v:true }
function! s:selection.on_exit(jobid, data, event) abort
" At this point this nvim instance might already have launched
@ -16,12 +16,10 @@ function! s:selection.on_exit(jobid, data, event) abort
let self.owner = 0
endif
if a:data != 0
let stderr = provider#get_stderr(a:jobid)
echohl WarningMsg
echomsg 'clipboard: error invoking '.get(self.argv, 0, '?').': '.join(stderr)
echomsg 'clipboard: error invoking '.get(self.argv, 0, '?').': '.join(self.stderr)
echohl None
endif
call provider#clear_stderr(a:jobid)
endfunction
let s:selections = { '*': s:selection, '+': copy(s:selection) }

View File

@ -3,7 +3,7 @@ if exists('g:loaded_node_provider')
endif
let g:loaded_node_provider = 1
let s:job_opts = {'rpc': v:true, 'on_stderr': function('provider#stderr_collector')}
let s:job_opts = {'rpc': v:true, 'stderr_buffered': v:true}
function! provider#node#Detect() abort
return has('win32') ? exepath('neovim-node-host.cmd') : exepath('neovim-node-host')
@ -32,19 +32,18 @@ function! provider#node#Require(host) abort
endif
try
let channel_id = jobstart(args, s:job_opts)
let job = copy(s:job_opts)
let channel_id = jobstart(args, job)
if rpcrequest(channel_id, 'poll') ==# 'ok'
return channel_id
endif
catch
echomsg v:throwpoint
echomsg v:exception
for row in provider#get_stderr(channel_id)
for row in job.stderr
echomsg row
endfor
endtry
finally
call provider#clear_stderr(channel_id)
endtry
throw remote#host#LoadErrorForHost(a:host.orig_name, '$NVIM_NODE_LOG_FILE')
endfunction

View File

@ -5,7 +5,7 @@ endif
let s:loaded_pythonx_provider = 1
let s:job_opts = {'rpc': v:true, 'on_stderr': function('provider#stderr_collector')}
let s:job_opts = {'rpc': v:true, 'stderr_buffered': v:true}
function! provider#pythonx#Require(host) abort
let ver = (a:host.orig_name ==# 'python') ? 2 : 3
@ -21,18 +21,17 @@ function! provider#pythonx#Require(host) abort
endfor
try
let channel_id = jobstart(args, s:job_opts)
let job = copy(s:job_opts)
let channel_id = jobstart(args, job)
if rpcrequest(channel_id, 'poll') ==# 'ok'
return channel_id
endif
catch
echomsg v:throwpoint
echomsg v:exception
for row in provider#get_stderr(channel_id)
for row in job.stderr
echomsg row
endfor
finally
call provider#clear_stderr(channel_id)
endtry
throw remote#host#LoadErrorForHost(a:host.orig_name,
\ '$NVIM_PYTHON_LOG_FILE')

168
runtime/doc/channel.txt Normal file
View File

@ -0,0 +1,168 @@
*channel.txt* Nvim
NVIM REFERENCE MANUAL by Thiago de Arruda
Nvim's facilities for async io *channel*
Type <M-]> to see the table of contents.
==============================================================================
1. Introduction *channel-intro*
Channels are nvim's way of communicating with external processes.
There are several ways to open a channel:
1. Through stdin/stdout when `nvim` is started with `--headless`, and a startup
script or --cmd command opens the stdio channel using |stdioopen()|.
2. Through stdin, stdout and stderr of a process spawned by |jobstart()|.
3. Through the PTY master end of a PTY opened with
`jobstart(..., {'pty': v:true})` or |termopen()|.
4. By connecting to a TCP/IP socket or named pipe with |sockconnect()|.
5. By another process connecting to a socket listened to by nvim. This only
supports RPC channels, see |rpc-connecting|.
Channels support multiple modes or protocols. In the most basic
mode of operation, raw bytes are read and written to the channel.
The |rpc| protocol, based on the msgpack-rpc standard, enables nvim and the
process at the other end to send remote calls and events to each other.
Additionally, the builtin |terminal-emulator|, is implemented on top of PTY
channels.
==============================================================================
2. Reading and writing raw bytes *channel-bytes*
By default, channels opened by vimscript functions will operate with raw
bytes. Additionally, for a job channel using rpc, bytes can still be
read over its stderr. Similarily, only bytes can be written to nvim's own stderr.
*channel-callback* *buffered*
*on_stdout* *on_stderr* *on_stdin* *on_data*
A callback function `on_{stream}` will be invoked with data read from the
channel. By default, the callback will be invoked immediately when data is
available, to facilitate interactive communication. The same callback will
then be invoked with empty data, to indicate that the stream reached EOF.
Alternatively the `{stream}_buffered` option can be set to invoke the callback
only when the underlying stream reaches EOF, and will then be passed in
complete output. This is helpful when only the complete output is useful, and
not partial data. Futhermore if `{stream}_buffered` is set but not a callback,
the data is saved in the options dict, with the stream name as key.
- The arguments passed to the callback function are:
0: The channel id
1: the raw data read from the channel, formatted as a |readfile()|-style
list. If EOF occured, a single empty string `['']` will be passed in.
Note that the items in this list do not directly correspond to actual
lines in the output. See |channel-lines|
2: Stream name as a string, like `"stdout"`. This is to allow multiple
on_{event} handlers to be implemented by the same function. The available
events depend on how the channel was opened and in what mode/protocol.
*channel-lines*
Note:
stream event handlers may receive partial (incomplete) lines. For a given
invocation of on_stdout etc, `a:data` is not guaranteed to end
with a newline.
- `abcdefg` may arrive as `['abc']`, `['defg']`.
- `abc\nefg` may arrive as `['abc', '']`, `['efg']` or `['abc']`,
`['','efg']`, or even `['ab']`, `['c','efg']`.
If you only are interested in complete output when the process exits,
use buffered mode. Otherwise, an easy way to deal with this:
initialize a list as `['']`, then append to it as follows: >
let s:chunks = ['']
func! s:on_event(job_id, data, event) dict
let s:chunks[-1] .= a:data[0]
call extend(s:chunks, a:data[1:])
endf
<
Additionally, if the callbacks are Dictionary functions, |self| can be used to
refer to the options dictionary containing the callbacks. |Partial|s can also be
used as callbacks.
Data can be sent to the channel using the |chansend()| function. Here is a
simple example, echoing some data through a cat-process:
>
function! s:OnEvent(id, data, event) dict
let str = join(a:data, "\n")
echomsg str
endfunction
let id = jobstart(['cat'], {'on_stdout': function('s:OnEvent') } )
call chansend(id, "hello!")
<
Here is a example of setting a buffer to the result of grep, but only after
all data has been processed:
>
function! s:OnEvent(id, data, event) dict
call nvim_buf_set_lines(2, 0, -1, v:true, a:data)
endfunction
let id = jobstart(['grep', '^[0-9]'], { 'on_stdout': function('s:OnEvent'),
\ 'stdout_buffered':v:true } )
call chansend(id, "stuff\n10 PRINT \"NVIM\"\nxx")
" no output is received, buffer is empty
call chansend(id, "xx\n20 GOTO 10\nzz\n")
call chanclose(id, 'stdin')
" now buffer has result
<
For additional examples with jobs, see |job-control|.
*channel-pty*
A special case is PTY channels opened by `jobstart(..., {'pty': v:true})` .
No preprocessing of ANSI escape sequences is done, these will be sent raw to
the callback. However, change of PTY size can be signaled to the slave using
|jobresize()|. See also |terminal-emulator|.
==============================================================================
3. Communicating using msgpack-rpc *channel-rpc*
When channels are opened with the `rpc` option set to true, the channel can be
used for remote method calls in both directions, see |msgpack-rpc|. Note that
rpc channels are implicitly trusted and the process at the other end can
invoke any |api| function!
==============================================================================
4. Using the stdio channel *channel-stdio*
When invoked normally, nvim will use stdin and stdout to interact with the
user over the terminal interface (TUI). However when invoked with
`--headless`, the TUI is not started and stdin and stdout can be used as a
channel. To open the stdio channel |stdioopen()| must be called during
|startup|, as there later will be no way of invoking a command. As a
convenience, the stdio channel will always have channel id 1.
Here is an example:
>
func! OnEvent(id, data, event)
if a:data == [""]
quit
end
call chansend(a:id, map(a:data, {i,v -> toupper(v)}))
endfunc
call stdioopen({'on_stdin': 'OnEvent'})
<
Put this in `uppercase.vim` and invoke nvim with
>
nvim --headless --cmd "source uppercase.vim"
<
*--embed*
An common use case is another program embedding nvim and communicating with it
over rpc. Therefore, the option `--embed` exists as a shorthand for
`nvim --headless --cmd "call stdioopen({'rpc': v:true})"`
Nvim's stderr is implicitly open as a write-only bytes channel. It will
always have channel id 2, however to be explicit |v:stderr| can be used.
==============================================================================
vim:tw=78:ts=8:noet:ft=help:norl:

View File

@ -37,6 +37,8 @@ Functions ~
*file_readable()* Obsolete name for |filereadable()|.
*highlight_exists()* Obsolete name for |hlexists()|.
*highlightID()* Obsolete name for |hlID()|.
*jobclose()* Obsolete name for |chanclose()|
*jobsend()* Obsolete name for |chansend()|
*last_buffer_nr()* Obsolete name for bufnr("$").
Modifiers ~

View File

@ -1818,6 +1818,13 @@ v:shell_error Result of the last shell command. When non-zero, the last
*v:statusmsg* *statusmsg-variable*
v:statusmsg Last given status message. It's allowed to set this variable.
*v:stderr* *stderr-variable*
v:stderr Channel id for stderr. Unlike stdin and stdout (see
|stdioopen()|), stderr is always open for writing. This channel
ID is always 2, but this variable can be used to be explicit.
Example: >
:call chansend(v:stderr, "something bad happened\n")
<
*v:swapname* *swapname-variable*
v:swapname Only valid when executing |SwapExists| autocommands: Name of
the swap file found. Read-only.
@ -1989,6 +1996,8 @@ call({func}, {arglist} [, {dict}])
any call {func} with arguments {arglist}
ceil({expr}) Float round {expr} up
changenr() Number current change number
chanclose({id}[, {stream}]) Number Closes a channel or one of its streams
chansend({id}, {data}) Number Writes {data} to channel
char2nr({expr}[, {utf8}]) Number ASCII/UTF8 value of first char in {expr}
cindent({lnum}) Number C indent for line {lnum}
clearmatches() none clear all matches
@ -2137,13 +2146,11 @@ isdirectory({directory}) Number |TRUE| if {directory} is a directory
islocked({expr}) Number |TRUE| if {expr} is locked
id({expr}) String identifier of the container
items({dict}) List key-value pairs in {dict}
jobclose({job}[, {stream}]) Number Closes a job stream(s)
jobpid({job}) Number Returns pid of a job.
jobresize({job}, {width}, {height})
Number Resize {job}'s pseudo terminal window
jobsend({job}, {data}) Number Writes {data} to {job}'s stdin
jobpid({id}) Number Returns pid of a job.
jobresize({id}, {width}, {height})
Number Resize pseudo terminal window of a job
jobstart({cmd}[, {opts}]) Number Spawns {cmd} as a job
jobstop({job}) Number Stops a job
jobstop({id}) Number Stops a job
jobwait({ids}[, {timeout}]) Number Wait for a set of jobs
join({list} [, {sep}]) String join {list} items into one String
json_decode({expr}) any Convert {expr} from JSON
@ -2226,7 +2233,6 @@ rpcnotify({channel}, {event}[, {args}...])
Sends an |RPC| notification to {channel}
rpcrequest({channel}, {method}[, {args}...])
Sends an |RPC| request to {channel}
rpcstop({channel}) Closes an |RPC| {channel}
screenattr({row}, {col}) Number attribute at screen position
screenchar({row}, {col}) Number character at screen position
screencol() Number current cursor column
@ -2268,6 +2274,8 @@ shiftwidth() Number effective value of 'shiftwidth'
simplify({filename}) String simplify filename as much as possible
sin({expr}) Float sine of {expr}
sinh({expr}) Float hyperbolic sine of {expr}
sockconnect({mode}, {address} [, {opts}])
Number Connects to socket
sort({list} [, {func} [, {dict}]])
List sort {list}, using {func} to compare
soundfold({word}) String sound-fold {word}
@ -2277,6 +2285,7 @@ spellsuggest({word} [, {max} [, {capital}]])
split({expr} [, {pat} [, {keepempty}]])
List make |List| from {pat} separated {expr}
sqrt({expr}) Float square root of {expr}
stdioopen({dict}) Number open stdio in a headless instance.
str2float({expr}) Float convert String to Float
str2nr({expr} [, {base}]) Number convert String to Number
strchars({expr} [, {skipcc}]) Number character length of the String {expr}
@ -2761,6 +2770,35 @@ changenr() *changenr()*
redo it is the number of the redone change. After undo it is
one less than the number of the undone change.
chanclose({id}[, {stream}]) {Nvim} *chanclose()*
Close a channel or a specific stream associated with it.
For a job, {stream} can be one of "stdin", "stdout",
"stderr" or "rpc" (closes stdin/stdout for a job started
with `"rpc":v:true`) If {stream} is omitted, all streams
are closed. If the channel is a pty, this will then close the
pty master, sending SIGHUP to the job process.
For a socket, there is only one stream, and {stream} should be
ommited.
chansend({id}, {data}) {Nvim} *chansend()*
Send data to channel {id}. For a job, it writes it to the
stdin of the process. For the stdio channel |channel-stdio|,
it writes to Nvim's stdout. Returns the number of bytes
written if the write succeeded, 0 otherwise.
See |channel-bytes| for more information.
{data} may be a string, string convertible, or a list. If
{data} is a list, the items will be joined by newlines; any
newlines in an item will be sent as NUL. To send a final
newline, include a final empty string. Example: >
:call chansend(id, ["abc", "123\n456", ""])
< will send "abc<NL>123<NUL>456<NL>".
chansend() writes raw data, not RPC messages. If the channel
was created with `"rpc":v:true` then the channel expects RPC
messages, use |rpcnotify()| and |rpcrequest()| instead.
char2nr({expr} [, {utf8}]) *char2nr()*
Return number value of the first char in {expr}. Examples: >
char2nr(" ") returns 32
@ -4931,12 +4969,6 @@ items({dict}) *items()*
entry and the value of this entry. The |List| is in arbitrary
order.
jobclose({job}[, {stream}]) *jobclose()*
Close {stream} of |job-id| {job}, where {stream} is one of:
"stdin", "stdout", "stderr", "rpc" (RPC channel of a job
started with `"rpc":v:true`). If {stream} is omitted, all
streams are closed. If the job is a pty job, this will close
the pty master, sending SIGHUP to the job process.
jobpid({job}) *jobpid()*
Return the PID (process id) of |job-id| {job}.
@ -4946,22 +4978,6 @@ jobresize({job}, {width}, {height}) *jobresize()*
columns and {height} rows.
Fails if the job was not started with `"pty":v:true`.
jobsend({job}, {data}) *jobsend()*
Writes to stdin of the process associated with |job-id| {job}.
Returns 1 if the write succeeded, 0 otherwise.
See |job-control|.
{data} may be a string, string convertible, or a list. If
{data} is a list, the items will be joined by newlines; any
newlines in an item will be sent as NUL. To send a final
newline, include a final empty string. Example: >
:call jobsend(j, ["abc", "123\n456", ""])
< will send "abc<NL>123<NUL>456<NL>".
jobsend() writes raw data, not RPC messages. If the job was
created with `"rpc":v:true` then the channel expects RPC
messages, use |rpcnotify()| and |rpcrequest()| instead.
jobstart({cmd}[, {opts}]) *jobstart()*
Spawns {cmd} as a job.
If {cmd} is a List it runs directly (no 'shell').
@ -4971,6 +4987,11 @@ jobstart({cmd}[, {opts}]) *jobstart()*
Returns |job-id| on success, 0 on invalid arguments (or job
table is full), -1 if {cmd}[0] or 'shell' is not executable.
For communication over the job's stdio, it is represented as a
|channel|, and a channel ID is returned on success. Use
|chansend()| (or |rpcnotify()| and |rpcrequest()| if "rpc" option
was used) to send data to stdin and |chanclose()| to close stdio
streams without stopping the job explicitly.
See |job-control| and |rpc|.
@ -4987,7 +5008,9 @@ jobstart({cmd}[, {opts}]) *jobstart()*
*jobstart-options*
{opts} is a dictionary with these keys:
|on_stdout|: stdout event handler (function name or |Funcref|)
stdout_buffered : read stdout in |buffered| mode.
|on_stderr|: stderr event handler (function name or |Funcref|)
stderr_buffered : read stderr in |buffered| mode.
|on_exit| : exit event handler (function name or |Funcref|)
cwd : Working directory of the job; defaults to
|current-directory|.
@ -5009,9 +5032,14 @@ jobstart({cmd}[, {opts}]) *jobstart()*
{opts} is passed as |self| dictionary to the callback; the
caller may set other keys to pass application-specific data.
Returns:
- The channel ID on success
- 0 on invalid arguments
- -1 if {cmd}[0] is not executable.
See |job-control|, |channels|, and |msgpack-rpc| for more information.
jobstop({job}) *jobstop()*
Stop |job-id| {job} by sending SIGTERM to the job process. If
jobstop({id}) *jobstop()*
Stop |job-id| {id} by sending SIGTERM to the job process. If
the process does not terminate after a timeout then SIGKILL
will be sent. When the job terminates its |on_exit| handler
(if any) will be invoked.
@ -6328,13 +6356,11 @@ rpcstart({prog}[, {argv}]) {Nvim} *rpcstart()*
:let id = jobstart(['prog', 'arg1', 'arg2'], {'rpc': v:true})
rpcstop({channel}) {Nvim} *rpcstop()*
Closes an |RPC| {channel}. If the channel is a job
started with |jobstart()| the job is killed.
It is better to use |jobstop()| in this case, or use
|jobclose|(id, "rpc") to only close the channel without
killing the job.
Closes the socket connection if the channel was opened by
connecting to |v:servername|.
Deprecated. This function was used to stop a job with |rpc|
channel, and additionally closed rpc sockets. Instead use
|jobstop()| to stop any job, and |chanclose|(id, "rpc") to close
rpc communication without stopping the job. Use |chanclose|(id)
to close any socket.
screenattr({row}, {col}) *screenattr()*
Like |screenchar()|, but return the attribute. This is a rather
@ -7034,15 +7060,20 @@ sockconnect({mode}, {address}, {opts}) *sockconnect()*
{address} should be the path of a named pipe. If {mode} is
"tcp" then {address} should be of the form "host:port" where
the host should be an ip adderess or host name, and port the
port number. Currently only rpc sockets are supported, so
{opts} must be passed with "rpc" set to |TRUE|.
port number.
Returns a |channel| ID. Close the socket with |chanclose()|.
Use |chansend()| to send data over a bytes socket, and
|rpcrequest()| and |rpcnotify()| to communicate with a RPC
socket.
{opts} is a dictionary with these keys:
rpc : If set, |msgpack-rpc| will be used to communicate
over the socket.
|on_data| : callback invoked when data was read from socket
data_buffered : read data from socket in |buffered| mode.
rpc : If set, |msgpack-rpc| will be used to communicate
over the socket.
Returns:
- The channel ID on success, which is used by
|rpcnotify()| and |rpcrequest()| and |rpcstop()|.
- The channel ID on success (greater than zero)
- 0 on invalid arguments or connection failure.
sort({list} [, {func} [, {dict}]]) *sort()* *E702*
@ -7194,6 +7225,27 @@ sqrt({expr}) *sqrt()*
"nan" may be different, it depends on system libraries.
stdioopen({opts}) *stdioopen()*
In a nvim launched with the |--headless| option, this opens
stdin and stdout as a |channel|. This function can only be
invoked once per instance. See |channel-stdio| for more
information and examples. Note that stderr is not handled by
this function, see |v:stderr|.
Returns a |channel| ID. Close the stdio descriptors with |chanclose()|.
Use |chansend()| to send data to stdout, and
|rpcrequest()| and |rpcnotify()| to communicate over RPC.
{opts} is a dictionary with these keys:
|on_stdin| : callback invoked when stdin is written to.
stdin_buffered : read stdin in |buffered| mode.
rpc : If set, |msgpack-rpc| will be used to communicate
over stdio
Returns:
- The channel ID on success (this is always 1)
- 0 on invalid arguments
str2float({expr}) *str2float()*
Convert String {expr} to a Float. This mostly works the same
as when using a floating point number in an expression, see

View File

@ -20,6 +20,8 @@ When a job starts it is assigned a number, unique for the life of the current
Nvim session. Functions like |jobstart()| return job ids. Functions like
|jobsend()|, |jobstop()|, |rpcnotify()|, and |rpcrequest()| take job ids.
The job's stdio streams are represented as a |channel|. It is possible to send
and recieve raw bytes, or use |msgpack-rpc|.
==============================================================================
Usage *job-control-usage*
@ -40,9 +42,9 @@ Example: >
call append(line('$'), str)
endfunction
let s:callbacks = {
\ 'on_stdout': function('s:JobHandler'),
\ 'on_stderr': function('s:JobHandler'),
\ 'on_exit': function('s:JobHandler')
\ 'on_stdout': function('s:OnEvent'),
\ 'on_stderr': function('s:OnEvent'),
\ 'on_exit': function('s:OnEvent')
\ }
let job1 = jobstart(['bash'], extend({'shell': 'shell 1'}, s:callbacks))
let job2 = jobstart(['bash', '-c', 'for i in {1..10}; do echo hello $i!; sleep 1; done'], extend({'shell': 'shell 2'}, s:callbacks))
@ -59,26 +61,14 @@ Description of what happens:
- `JobHandler()` callback is passed to |jobstart()| to handle various job
events. It displays stdout/stderr data received from the shells.
*on_stdout*
Arguments passed to on_stdout callback:
0: |job-id|
1: List of lines read from the stream. If the last item is not "" (empty
string), then it is an incomplete line that might be continued at the
next on_stdout invocation. See Note 2 below.
2: Event type: "stdout"
*on_stderr*
Arguments passed to on_stderr callback:
0: |job-id|
1: List of lines read from the stream. If the last item is not "" (empty
string), then it is an incomplete line that might be continued at the
next on_stderr invocation. See Note 2 below.
2: Event type: "stderr"
For |on_stdout| and |on_stderr| see |channel-callback|.
*on_exit*
Arguments passed to on_exit callback:
0: |job-id|
1: Exit-code of the process.
2: Event type: "exit"
Note: Buffered stdout/stderr data which has not been flushed by the sender
will not trigger the on_stdout/on_stderr callback (but if the process
ends, the on_exit callback will be invoked).
@ -137,13 +127,19 @@ The above example could be written in this "object-oriented" style: >
let instance = Shell.new('bomb',
\ 'for i in $(seq 9 -1 1); do echo $i 1>&$((i % 2 + 1)); sleep 1; done')
<
To send data to the job's stdin, use |jobsend()|: >
:call jobsend(job1, "ls\n")
:call jobsend(job1, "invalid-command\n")
:call jobsend(job1, "exit\n")
To send data to the job's stdin, use |chansend()|: >
:call chansend(job1, "ls\n")
:call chansend(job1, "invalid-command\n")
:call chansend(job1, "exit\n")
<
A job may be killed with |jobstop()|: >
:call jobstop(job1)
<
A job may be killed at any time with the |jobstop()| function:
>
:call jobstop(job1)
<
Individual streams can be closed without killing the job, see |chanclose()|.
==============================================================================
vim:tw=78:ts=8:noet:ft=help:norl:

View File

@ -1,4 +1,3 @@
*msgpack_rpc.txt* Nvim
NVIM REFERENCE MANUAL by Thiago de Arruda
@ -61,24 +60,24 @@ To get a formatted dump of the API using python (requires the `pyyaml` and
==============================================================================
3. Connecting *rpc-connecting*
There are several ways to open a msgpack-rpc channel to an Nvim instance:
See |channel-intro|, for various ways to open a channel. Most of the channel
opening functions take an `rpc` key in the options dictionary, to enable rpc.
1. Through stdin/stdout when `nvim` is started with `--embed`. This is how
applications can embed Nvim.
Additionally, rpc channels can be opened by other processes connecting to
TCP/IP sockets or named pipes listened to by nvim.
2. Through stdin/stdout of some other process spawned by |jobstart()|.
Set the "rpc" key to |v:true| in the options dict to use the job's stdin
and stdout as a single msgpack channel that is processed directly by
Nvim. Then it is not possible to process raw data to or from the
process's stdin and stdout. stderr can still be used, though.
An rpc socket is automatically created with each instance. The socket
location is stored in |v:servername|. By default this is a named pipe
with an automatically generated address. See |XXX|.
3. Through the socket automatically created with each instance. The socket
location is stored in |v:servername|.
4. Through a TCP/IP socket. To make Nvim listen on a TCP/IP socket, set the
|$NVIM_LISTEN_ADDRESS| environment variable before starting Nvim: >
To make Nvim listen on a TCP/IP socket instead, set the
|$NVIM_LISTEN_ADDRESS| environment variable before starting Nvim: >
NVIM_LISTEN_ADDRESS=127.0.0.1:6666 nvim
<
<Also, more sockets and named pipes can be listened on using |serverstart()|.
Note that localhost TCP sockets are generally less secure than named pipes,
and can lead to vunerabilities like remote code execution.
Connecting to the socket is the easiest way a programmer can test the API,
which can be done through any msgpack-rpc client library or full-featured
|api-client|. Here's a Ruby script that prints 'hello world!' in the current

View File

@ -117,8 +117,8 @@ variables:
- *b:term_title* The settable title of the terminal, typically displayed in
the window title or tab title of a graphical terminal emulator. Programs
running in the terminal can set this title via an escape sequence.
- *b:terminal_job_id* The nvim job ID of the job running in the terminal. See
|job-control| for more information.
- |'channel'| The nvim channel ID for the underlying PTY.
|chansend()| can be used to send input to the terminal.
- *b:terminal_job_pid* The PID of the top-level process running in the
terminal.

View File

@ -1213,6 +1213,12 @@ A jump table for the options with a short description can be found at |Q_op|.
< |Nvi| also has this option, but it only uses the first character.
See |cmdwin|.
*'channel'*
'channel' number (default: 0)
local to buffer
|Channel| connected to the buffer. Currently only used by
|terminal-emulator|. Is 0 if no terminal is open. Cannot be changed.
*'charconvert'* *'ccv'* *E202* *E214* *E513*
'charconvert' 'ccv' string (default "")
global

View File

@ -351,6 +351,8 @@ argument.
*--headless*
--headless Do not start the built-in UI.
See |channel-stdio| for how to use stdio for other purposes
instead.
See also |silent-mode|, which does start a (limited) UI.
==============================================================================

View File

@ -252,7 +252,7 @@ static void remote_ui_flush(UI *ui)
{
UIData *data = ui->data;
if (data->buffer.size > 0) {
channel_send_event(data->channel_id, "redraw", data->buffer);
rpc_send_event(data->channel_id, "redraw", data->buffer);
data->buffer = (Array)ARRAY_DICT_INIT;
}
}

View File

@ -721,7 +721,7 @@ void nvim_subscribe(uint64_t channel_id, String event)
char e[METHOD_MAXLEN + 1];
memcpy(e, event.data, length);
e[length] = NUL;
channel_subscribe(channel_id, e);
rpc_subscribe(channel_id, e);
}
/// Unsubscribes to event broadcasts
@ -737,7 +737,7 @@ void nvim_unsubscribe(uint64_t channel_id, String event)
char e[METHOD_MAXLEN + 1];
memcpy(e, event.data, length);
e[length] = NUL;
channel_unsubscribe(channel_id, e);
rpc_unsubscribe(channel_id, e);
}
Integer nvim_get_color_by_name(String name)

View File

@ -603,6 +603,7 @@ struct file_buffer {
char_u *b_p_bt; ///< 'buftype'
int b_has_qf_entry; ///< quickfix exists for buffer
int b_p_bl; ///< 'buflisted'
long b_p_channel; ///< 'channel'
int b_p_cin; ///< 'cindent'
char_u *b_p_cino; ///< 'cinoptions'
char_u *b_p_cink; ///< 'cinkeys'

752
src/nvim/channel.c Normal file
View File

@ -0,0 +1,752 @@
// This is an open source non-commercial project. Dear PVS-Studio, please check
// it. PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com
#include "nvim/api/ui.h"
#include "nvim/channel.h"
#include "nvim/eval.h"
#include "nvim/event/socket.h"
#include "nvim/msgpack_rpc/channel.h"
#include "nvim/msgpack_rpc/server.h"
#include "nvim/os/shell.h"
#include "nvim/path.h"
#include "nvim/ascii.h"
static bool did_stdio = false;
PMap(uint64_t) *channels = NULL;
/// next free id for a job or rpc channel
/// 1 is reserved for stdio channel
/// 2 is reserved for stderr channel
static uint64_t next_chan_id = CHAN_STDERR+1;
typedef struct {
Channel *chan;
Callback *callback;
const char *type;
list_T *received;
int status;
} ChannelEvent;
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "channel.c.generated.h"
#endif
/// Teardown the module
void channel_teardown(void)
{
if (!channels) {
return;
}
Channel *channel;
map_foreach_value(channels, channel, {
channel_close(channel->id, kChannelPartAll, NULL);
});
}
/// Closes a channel
///
/// @param id The channel id
/// @return true if successful, false otherwise
bool channel_close(uint64_t id, ChannelPart part, const char **error)
{
Channel *chan;
Process *proc;
const char *dummy;
if (!error) {
error = &dummy;
}
if (!(chan = find_channel(id))) {
if (id < next_chan_id) {
// allow double close, even though we can't say what parts was valid.
return true;
}
*error = (const char *)e_invchan;
return false;
}
bool close_main = false;
if (part == kChannelPartRpc || part == kChannelPartAll) {
close_main = true;
if (chan->is_rpc) {
rpc_close(chan);
} else if (part == kChannelPartRpc) {
*error = (const char *)e_invstream;
return false;
}
} else if ((part == kChannelPartStdin || part == kChannelPartStdout)
&& chan->is_rpc) {
*error = (const char *)e_invstreamrpc;
return false;
}
switch (chan->streamtype) {
case kChannelStreamSocket:
if (!close_main) {
*error = (const char *)e_invstream;
return false;
}
stream_may_close(&chan->stream.socket);
break;
case kChannelStreamProc:
proc = (Process *)&chan->stream.proc;
if (part == kChannelPartStdin || close_main) {
stream_may_close(&proc->in);
}
if (part == kChannelPartStdout || close_main) {
stream_may_close(&proc->out);
}
if (part == kChannelPartStderr || part == kChannelPartAll) {
stream_may_close(&proc->err);
}
if (proc->type == kProcessTypePty && part == kChannelPartAll) {
pty_process_close_master(&chan->stream.pty);
}
break;
case kChannelStreamStdio:
if (part == kChannelPartStdin || close_main) {
stream_may_close(&chan->stream.stdio.in);
}
if (part == kChannelPartStdout || close_main) {
stream_may_close(&chan->stream.stdio.out);
}
if (part == kChannelPartStderr) {
*error = (const char *)e_invstream;
return false;
}
break;
case kChannelStreamStderr:
if (part != kChannelPartAll && part != kChannelPartStderr) {
*error = (const char *)e_invstream;
return false;
}
if (!chan->stream.err.closed) {
chan->stream.err.closed = true;
// Don't close on exit, in case late error messages
if (!exiting) {
fclose(stderr);
}
channel_decref(chan);
}
break;
case kChannelStreamInternal:
if (!close_main) {
*error = (const char *)e_invstream;
return false;
}
break;
}
return true;
}
/// Initializes the module
void channel_init(void)
{
channels = pmap_new(uint64_t)();
channel_alloc(kChannelStreamStderr);
rpc_init();
remote_ui_init();
}
/// Allocates a channel.
///
/// Channel is allocated with refcount 1, which should be decreased
/// when the underlying stream closes.
static Channel *channel_alloc(ChannelStreamType type)
{
Channel *chan = xcalloc(1, sizeof(*chan));
if (type == kChannelStreamStdio) {
chan->id = CHAN_STDIO;
} else if (type == kChannelStreamStderr) {
chan->id = CHAN_STDERR;
} else {
chan->id = next_chan_id++;
}
chan->events = multiqueue_new_child(main_loop.events);
chan->refcount = 1;
chan->streamtype = type;
pmap_put(uint64_t)(channels, chan->id, chan);
return chan;
}
/// Not implemented, only logging for now
void channel_create_event(Channel *chan, char *ext_source)
{
#if MIN_LOG_LEVEL <= INFO_LOG_LEVEL
char *stream_desc, *mode_desc, *source;
switch (chan->streamtype) {
case kChannelStreamProc:
if (chan->stream.proc.type == kProcessTypePty) {
stream_desc = "pty job";
} else {
stream_desc = "job";
}
break;
case kChannelStreamStdio:
stream_desc = "stdio";
break;
case kChannelStreamSocket:
stream_desc = "socket";
break;
case kChannelStreamInternal:
stream_desc = "socket (internal)";
break;
default:
stream_desc = "?";
}
if (chan->is_rpc) {
mode_desc = ", rpc";
} else if (chan->term) {
mode_desc = ", terminal";
} else {
mode_desc = "";
}
if (ext_source) {
// TODO(bfredl): in a future improved traceback solution,
// external events should be included.
source = ext_source;
} else {
eval_format_source_name_line((char *)IObuff, sizeof(IObuff));
source = (char *)IObuff;
}
ILOG("new channel %" PRIu64 " (%s%s): %s", chan->id, stream_desc,
mode_desc, source);
#else
(void)chan;
(void)ext_source;
#endif
}
void channel_incref(Channel *channel)
{
channel->refcount++;
}
void channel_decref(Channel *channel)
{
if (!(--channel->refcount)) {
multiqueue_put(main_loop.fast_events, free_channel_event, 1, channel);
}
}
void callback_reader_free(CallbackReader *reader)
{
callback_free(&reader->cb);
if (reader->buffered) {
ga_clear(&reader->buffer);
}
}
void callback_reader_start(CallbackReader *reader)
{
if (reader->buffered) {
ga_init(&reader->buffer, sizeof(char *), 32);
ga_grow(&reader->buffer, 32);
}
}
static void free_channel_event(void **argv)
{
Channel *channel = argv[0];
if (channel->is_rpc) {
rpc_free(channel);
}
callback_reader_free(&channel->on_stdout);
callback_reader_free(&channel->on_stderr);
callback_free(&channel->on_exit);
pmap_del(uint64_t)(channels, channel->id);
multiqueue_free(channel->events);
xfree(channel);
}
static void channel_destroy_early(Channel *chan)
{
if ((chan->id != --next_chan_id)) {
abort();
}
if ((--chan->refcount != 0)) {
abort();
}
free_channel_event((void **)&chan);
}
static void close_cb(Stream *stream, void *data)
{
channel_decref(data);
}
Channel *channel_job_start(char **argv, CallbackReader on_stdout,
CallbackReader on_stderr, Callback on_exit,
bool pty, bool rpc, bool detach, const char *cwd,
uint16_t pty_width, uint16_t pty_height,
char *term_name, varnumber_T *status_out)
{
Channel *chan = channel_alloc(kChannelStreamProc);
chan->on_stdout = on_stdout;
chan->on_stderr = on_stderr;
chan->on_exit = on_exit;
chan->is_rpc = rpc;
if (pty) {
if (detach) {
EMSG2(_(e_invarg2), "terminal/pty job cannot be detached");
shell_free_argv(argv);
xfree(term_name);
channel_destroy_early(chan);
*status_out = 0;
return NULL;
}
chan->stream.pty = pty_process_init(&main_loop, chan);
if (pty_width > 0) {
chan->stream.pty.width = pty_width;
}
if (pty_height > 0) {
chan->stream.pty.height = pty_height;
}
if (term_name) {
chan->stream.pty.term_name = term_name;
}
} else {
chan->stream.uv = libuv_process_init(&main_loop, chan);
}
Process *proc = (Process *)&chan->stream.proc;
proc->argv = argv;
proc->cb = channel_process_exit_cb;
proc->events = chan->events;
proc->detach = detach;
proc->cwd = cwd;
char *cmd = xstrdup(proc->argv[0]);
bool has_out, has_err;
if (proc->type == kProcessTypePty) {
has_out = true;
has_err = false;
} else {
has_out = chan->is_rpc || callback_reader_set(chan->on_stdout);
has_err = callback_reader_set(chan->on_stderr);
}
int status = process_spawn(proc, true, has_out, has_err);
if (status) {
EMSG3(_(e_jobspawn), os_strerror(status), cmd);
xfree(cmd);
if (proc->type == kProcessTypePty) {
xfree(chan->stream.pty.term_name);
}
channel_destroy_early(chan);
*status_out = proc->status;
return NULL;
}
xfree(cmd);
wstream_init(&proc->in, 0);
if (has_out) {
rstream_init(&proc->out, 0);
}
if (chan->is_rpc) {
// the rpc takes over the in and out streams
rpc_start(chan);
} else {
if (has_out) {
callback_reader_start(&chan->on_stdout);
rstream_start(&proc->out, on_job_stdout, chan);
}
}
if (has_err) {
callback_reader_start(&chan->on_stderr);
rstream_init(&proc->err, 0);
rstream_start(&proc->err, on_job_stderr, chan);
}
*status_out = (varnumber_T)chan->id;
return chan;
}
uint64_t channel_connect(bool tcp, const char *address,
bool rpc, CallbackReader on_output,
int timeout, const char **error)
{
if (!tcp && rpc) {
char *path = fix_fname(address);
if (server_owns_pipe_address(path)) {
// avoid deadlock
xfree(path);
return channel_create_internal_rpc();
}
xfree(path);
}
Channel *channel = channel_alloc(kChannelStreamSocket);
if (!socket_connect(&main_loop, &channel->stream.socket,
tcp, address, timeout, error)) {
channel_destroy_early(channel);
return 0;
}
channel->stream.socket.internal_close_cb = close_cb;
channel->stream.socket.internal_data = channel;
wstream_init(&channel->stream.socket, 0);
rstream_init(&channel->stream.socket, 0);
if (rpc) {
rpc_start(channel);
} else {
channel->on_stdout = on_output;
callback_reader_start(&channel->on_stdout);
rstream_start(&channel->stream.socket, on_socket_output, channel);
}
channel_create_event(channel, NULL);
return channel->id;
}
/// Creates an RPC channel from a tcp/pipe socket connection
///
/// @param watcher The SocketWatcher ready to accept the connection
void channel_from_connection(SocketWatcher *watcher)
{
Channel *channel = channel_alloc(kChannelStreamSocket);
socket_watcher_accept(watcher, &channel->stream.socket);
channel->stream.socket.internal_close_cb = close_cb;
channel->stream.socket.internal_data = channel;
wstream_init(&channel->stream.socket, 0);
rstream_init(&channel->stream.socket, 0);
rpc_start(channel);
channel_create_event(channel, watcher->addr);
}
/// Creates a loopback channel. This is used to avoid deadlock
/// when an instance connects to its own named pipe.
static uint64_t channel_create_internal_rpc(void)
{
Channel *channel = channel_alloc(kChannelStreamInternal);
rpc_start(channel);
return channel->id;
}
/// Creates an API channel from stdin/stdout. This is used when embedding
/// Neovim
uint64_t channel_from_stdio(bool rpc, CallbackReader on_output,
const char **error)
FUNC_ATTR_NONNULL_ALL
{
if (!headless_mode) {
*error = _("can only be opened in headless mode");
return 0;
}
if (did_stdio) {
*error = _("channel was already open");
return 0;
}
did_stdio = true;
Channel *channel = channel_alloc(kChannelStreamStdio);
rstream_init_fd(&main_loop, &channel->stream.stdio.in, 0, 0);
wstream_init_fd(&main_loop, &channel->stream.stdio.out, 1, 0);
if (rpc) {
rpc_start(channel);
} else {
channel->on_stdout = on_output;
callback_reader_start(&channel->on_stdout);
rstream_start(&channel->stream.stdio.in, on_stdio_input, channel);
}
return channel->id;
}
/// @param data will be consumed
size_t channel_send(uint64_t id, char *data, size_t len, const char **error)
{
Channel *chan = find_channel(id);
if (!chan) {
EMSG(_(e_invchan));
goto err;
}
if (chan->streamtype == kChannelStreamStderr) {
if (chan->stream.err.closed) {
*error = _("Can't send data to closed stream");
goto err;
}
// unbuffered write
size_t written = fwrite(data, len, 1, stderr);
xfree(data);
return len * written;
}
Stream *in = channel_instream(chan);
if (in->closed) {
*error = _("Can't send data to closed stream");
goto err;
}
if (chan->is_rpc) {
*error = _("Can't send raw data to rpc channel");
goto err;
}
WBuffer *buf = wstream_new_buffer(data, len, 1, xfree);
return wstream_write(in, buf) ? len : 0;
err:
xfree(data);
return 0;
}
/// NB: mutates buf in place!
static list_T *buffer_to_tv_list(char *buf, size_t count)
{
list_T *ret = tv_list_alloc();
char *ptr = buf;
size_t remaining = count;
size_t off = 0;
while (off < remaining) {
// append the line
if (ptr[off] == NL) {
tv_list_append_string(ret, ptr, (ssize_t)off);
size_t skip = off + 1;
ptr += skip;
remaining -= skip;
off = 0;
continue;
}
if (ptr[off] == NUL) {
// Translate NUL to NL
ptr[off] = NL;
}
off++;
}
tv_list_append_string(ret, ptr, (ssize_t)off);
return ret;
}
// vimscript job callbacks must be executed on Nvim main loop
static inline void process_channel_event(Channel *chan, Callback *callback,
const char *type, char *buf,
size_t count, int status)
{
assert(callback);
ChannelEvent *event_data = xmalloc(sizeof(*event_data));
event_data->received = NULL;
if (buf) {
event_data->received = buffer_to_tv_list(buf, count);
} else {
event_data->status = status;
}
channel_incref(chan); // Hold on ref to callback
event_data->chan = chan;
event_data->callback = callback;
event_data->type = type;
multiqueue_put(chan->events, on_channel_event, 1, event_data);
}
void on_job_stdout(Stream *stream, RBuffer *buf, size_t count,
void *data, bool eof)
{
Channel *chan = data;
on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "stdout");
}
void on_job_stderr(Stream *stream, RBuffer *buf, size_t count,
void *data, bool eof)
{
Channel *chan = data;
on_channel_output(stream, chan, buf, count, eof, &chan->on_stderr, "stderr");
}
static void on_socket_output(Stream *stream, RBuffer *buf, size_t count,
void *data, bool eof)
{
Channel *chan = data;
on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "data");
}
static void on_stdio_input(Stream *stream, RBuffer *buf, size_t count,
void *data, bool eof)
{
Channel *chan = data;
on_channel_output(stream, chan, buf, count, eof, &chan->on_stdout, "stdin");
}
static void on_channel_output(Stream *stream, Channel *chan, RBuffer *buf,
size_t count, bool eof, CallbackReader *reader,
const char *type)
{
// stub variable, to keep reading consistent with the order of events, only
// consider the count parameter.
size_t r;
char *ptr = rbuffer_read_ptr(buf, &r);
if (eof) {
if (reader->buffered) {
if (reader->cb.type != kCallbackNone) {
process_channel_event(chan, &reader->cb, type, reader->buffer.ga_data,
(size_t)reader->buffer.ga_len, 0);
ga_clear(&reader->buffer);
} else if (reader->self) {
list_T *data = buffer_to_tv_list(reader->buffer.ga_data,
(size_t)reader->buffer.ga_len);
tv_dict_add_list(reader->self, type, strlen(type), data);
} else {
abort();
}
} else if (reader->cb.type != kCallbackNone) {
process_channel_event(chan, &reader->cb, type, ptr, 0, 0);
}
return;
}
// The order here matters, the terminal must receive the data first because
// process_channel_event will modify the read buffer(convert NULs into NLs)
if (chan->term) {
terminal_receive(chan->term, ptr, count);
}
rbuffer_consumed(buf, count);
if (reader->buffered) {
ga_concat_len(&reader->buffer, ptr, count);
} else if (callback_reader_set(*reader)) {
process_channel_event(chan, &reader->cb, type, ptr, count, 0);
}
}
static void channel_process_exit_cb(Process *proc, int status, void *data)
{
Channel *chan = data;
if (chan->term) {
char msg[sizeof("\r\n[Process exited ]") + NUMBUFLEN];
snprintf(msg, sizeof msg, "\r\n[Process exited %d]", proc->status);
terminal_close(chan->term, msg);
}
// if status is -1 the process did not really exit,
// we just closed the handle onto a detached process
if (status >= 0) {
process_channel_event(chan, &chan->on_exit, "exit", NULL, 0, status);
}
channel_decref(chan);
}
static void on_channel_event(void **args)
{
ChannelEvent *ev = (ChannelEvent *)args[0];
typval_T argv[4];
argv[0].v_type = VAR_NUMBER;
argv[0].v_lock = VAR_UNLOCKED;
argv[0].vval.v_number = (varnumber_T)ev->chan->id;
if (ev->received) {
argv[1].v_type = VAR_LIST;
argv[1].v_lock = VAR_UNLOCKED;
argv[1].vval.v_list = ev->received;
argv[1].vval.v_list->lv_refcount++;
} else {
argv[1].v_type = VAR_NUMBER;
argv[1].v_lock = VAR_UNLOCKED;
argv[1].vval.v_number = ev->status;
}
argv[2].v_type = VAR_STRING;
argv[2].v_lock = VAR_UNLOCKED;
argv[2].vval.v_string = (uint8_t *)ev->type;
typval_T rettv = TV_INITIAL_VALUE;
callback_call(ev->callback, 3, argv, &rettv);
tv_clear(&rettv);
channel_decref(ev->chan);
xfree(ev);
}
/// Open terminal for channel
///
/// Channel `chan` is assumed to be an open pty channel,
/// and curbuf is assumed to be a new, unmodified buffer.
void channel_terminal_open(Channel *chan)
{
TerminalOptions topts;
topts.data = chan;
topts.width = chan->stream.pty.width;
topts.height = chan->stream.pty.height;
topts.write_cb = term_write;
topts.resize_cb = term_resize;
topts.close_cb = term_close;
curbuf->b_p_channel = (long)chan->id; // 'channel' option
Terminal *term = terminal_open(topts);
chan->term = term;
channel_incref(chan);
}
static void term_write(char *buf, size_t size, void *data)
{
Channel *chan = data;
if (chan->stream.proc.in.closed) {
// If the backing stream was closed abruptly, there may be write events
// ahead of the terminal close event. Just ignore the writes.
ILOG("write failed: stream is closed");
return;
}
WBuffer *wbuf = wstream_new_buffer(xmemdup(buf, size), size, 1, xfree);
wstream_write(&chan->stream.proc.in, wbuf);
}
static void term_resize(uint16_t width, uint16_t height, void *data)
{
Channel *chan = data;
pty_process_resize(&chan->stream.pty, width, height);
}
static inline void term_delayed_free(void **argv)
{
Channel *chan = argv[0];
if (chan->stream.proc.in.pending_reqs || chan->stream.proc.out.pending_reqs) {
multiqueue_put(chan->events, term_delayed_free, 1, chan);
return;
}
terminal_destroy(chan->term);
chan->term = NULL;
channel_decref(chan);
}
static void term_close(void *data)
{
Channel *chan = data;
process_stop(&chan->stream.proc);
multiqueue_put(chan->events, term_delayed_free, 1, data);
}

134
src/nvim/channel.h Normal file
View File

@ -0,0 +1,134 @@
#ifndef NVIM_CHANNEL_H
#define NVIM_CHANNEL_H
#include "nvim/main.h"
#include "nvim/event/socket.h"
#include "nvim/event/process.h"
#include "nvim/os/pty_process.h"
#include "nvim/event/libuv_process.h"
#include "nvim/eval/typval.h"
#include "nvim/msgpack_rpc/channel_defs.h"
#define CHAN_STDIO 1
#define CHAN_STDERR 2
typedef enum {
kChannelStreamProc,
kChannelStreamSocket,
kChannelStreamStdio,
kChannelStreamStderr,
kChannelStreamInternal
} ChannelStreamType;
typedef enum {
kChannelPartStdin,
kChannelPartStdout,
kChannelPartStderr,
kChannelPartRpc,
kChannelPartAll
} ChannelPart;
typedef struct {
Stream in;
Stream out;
} StdioPair;
typedef struct {
bool closed;
} StderrState;
typedef struct {
Callback cb;
dict_T *self;
garray_T buffer;
bool buffered;
} CallbackReader;
#define CALLBACK_READER_INIT ((CallbackReader){ .cb = CALLBACK_NONE, \
.self = NULL, \
.buffer = GA_EMPTY_INIT_VALUE, \
.buffered = false })
static inline bool callback_reader_set(CallbackReader reader)
{
return reader.cb.type != kCallbackNone || reader.self;
}
struct Channel {
uint64_t id;
size_t refcount;
MultiQueue *events;
ChannelStreamType streamtype;
union {
Process proc;
LibuvProcess uv;
PtyProcess pty;
Stream socket;
StdioPair stdio;
StderrState err;
} stream;
bool is_rpc;
RpcState rpc;
Terminal *term;
CallbackReader on_stdout;
CallbackReader on_stderr;
Callback on_exit;
};
EXTERN PMap(uint64_t) *channels;
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "channel.h.generated.h"
#endif
/// @returns Channel with the id or NULL if not found
static inline Channel *find_channel(uint64_t id)
{
return pmap_get(uint64_t)(channels, id);
}
static inline Stream *channel_instream(Channel *chan)
FUNC_ATTR_NONNULL_ALL
{
switch (chan->streamtype) {
case kChannelStreamProc:
return &chan->stream.proc.in;
case kChannelStreamSocket:
return &chan->stream.socket;
case kChannelStreamStdio:
return &chan->stream.stdio.out;
case kChannelStreamInternal:
case kChannelStreamStderr:
abort();
}
abort();
}
static inline Stream *channel_outstream(Channel *chan)
FUNC_ATTR_NONNULL_ALL
{
switch (chan->streamtype) {
case kChannelStreamProc:
return &chan->stream.proc.out;
case kChannelStreamSocket:
return &chan->stream.socket;
case kChannelStreamStdio:
return &chan->stream.stdio.in;
case kChannelStreamInternal:
case kChannelStreamStderr:
abort();
}
abort();
}
#endif // NVIM_CHANNEL_H

File diff suppressed because it is too large Load Diff

View File

@ -7,6 +7,9 @@
#include "nvim/eval/typval.h"
#include "nvim/profile.h"
#include "nvim/garray.h"
#include "nvim/event/rstream.h"
#include "nvim/event/wstream.h"
#include "nvim/channel.h"
#define COPYID_INC 2
#define COPYID_MASK (~0x1)
@ -53,6 +56,7 @@ typedef enum {
VV_DYING,
VV_EXCEPTION,
VV_THROWPOINT,
VV_STDERR,
VV_REG,
VV_CMDBANG,
VV_INSERTMODE,

View File

@ -55,6 +55,8 @@ return {
call={args={2, 3}},
ceil={args=1, func="float_op_wrapper", data="&ceil"},
changenr={},
chanclose={args={1, 2}},
chansend={args=2},
char2nr={args={1, 2}},
cindent={args=1},
clearmatches={},
@ -173,10 +175,10 @@ return {
islocked={args=1},
id={args=1},
items={args=1},
jobclose={args={1, 2}},
jobclose={args={1, 2}, func="f_chanclose"},
jobpid={args=1},
jobresize={args=3},
jobsend={args=2},
jobsend={args=2, func="f_chansend"},
jobstart={args={1, 2}},
jobstop={args=1},
jobwait={args={1, 2}},
@ -273,6 +275,7 @@ return {
sockconnect={args={2,3}},
sort={args={1, 3}},
soundfold={args=1},
stdioopen={args=1},
spellbadword={args={0, 1}},
spellsuggest={args={1, 3}},
split={args={1, 3}},

View File

@ -374,7 +374,7 @@ void tv_list_append_dict(list_T *const list, dict_T *const dict)
/// case string is considered to be usual zero-terminated
/// string or NULL “empty” string.
void tv_list_append_string(list_T *const l, const char *const str,
const ptrdiff_t len)
const ssize_t len)
FUNC_ATTR_NONNULL_ARG(1)
{
if (str == NULL) {
@ -824,7 +824,7 @@ void tv_dict_watcher_add(dict_T *const dict, const char *const key_pattern,
/// @param[in] cb2 Second callback to check.
///
/// @return True if they are equal, false otherwise.
bool tv_callback_equal(const Callback *const cb1, const Callback *const cb2)
bool tv_callback_equal(const Callback *cb1, const Callback *cb2)
FUNC_ATTR_NONNULL_ALL FUNC_ATTR_WARN_UNUSED_RESULT
{
if (cb1->type != cb2->type) {
@ -843,10 +843,31 @@ bool tv_callback_equal(const Callback *const cb1, const Callback *const cb2)
return true;
}
}
assert(false);
abort();
return false;
}
/// Unref/free callback
void callback_free(Callback *callback)
FUNC_ATTR_NONNULL_ALL
{
switch (callback->type) {
case kCallbackFuncref: {
func_unref(callback->data.funcref);
xfree(callback->data.funcref);
break;
}
case kCallbackPartial: {
partial_unref(callback->data.partial);
break;
}
case kCallbackNone: {
break;
}
}
callback->type = kCallbackNone;
}
/// Remove watcher from a dictionary
///
/// @param dict Dictionary to remove watcher from.

View File

@ -46,22 +46,22 @@ int libuv_process_spawn(LibuvProcess *uvproc)
uvproc->uvstdio[2].flags = UV_IGNORE;
uvproc->uv.data = proc;
if (proc->in) {
if (!proc->in.closed) {
uvproc->uvstdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
uvproc->uvstdio[0].data.stream = STRUCT_CAST(uv_stream_t,
&proc->in->uv.pipe);
&proc->in.uv.pipe);
}
if (proc->out) {
if (!proc->out.closed) {
uvproc->uvstdio[1].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
uvproc->uvstdio[1].data.stream = STRUCT_CAST(uv_stream_t,
&proc->out->uv.pipe);
&proc->out.uv.pipe);
}
if (proc->err) {
if (!proc->err.closed) {
uvproc->uvstdio[2].flags = UV_CREATE_PIPE | UV_WRITABLE_PIPE;
uvproc->uvstdio[2].data.stream = STRUCT_CAST(uv_stream_t,
&proc->err->uv.pipe);
&proc->err.uv.pipe);
}
int status;

View File

@ -25,28 +25,28 @@
// For pty processes SIGTERM is sent first (in case SIGHUP was not enough).
#define KILL_TIMEOUT_MS 2000
#define CLOSE_PROC_STREAM(proc, stream) \
do { \
if (proc->stream && !proc->stream->closed) { \
stream_close(proc->stream, NULL, NULL); \
} \
} while (0)
static bool process_is_tearing_down = false;
/// @returns zero on success, or negative error code
int process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
int process_spawn(Process *proc, bool in, bool out, bool err)
FUNC_ATTR_NONNULL_ALL
{
if (proc->in) {
uv_pipe_init(&proc->loop->uv, &proc->in->uv.pipe, 0);
if (in) {
uv_pipe_init(&proc->loop->uv, &proc->in.uv.pipe, 0);
} else {
proc->in.closed = true;
}
if (proc->out) {
uv_pipe_init(&proc->loop->uv, &proc->out->uv.pipe, 0);
if (out) {
uv_pipe_init(&proc->loop->uv, &proc->out.uv.pipe, 0);
} else {
proc->out.closed = true;
}
if (proc->err) {
uv_pipe_init(&proc->loop->uv, &proc->err->uv.pipe, 0);
if (err) {
uv_pipe_init(&proc->loop->uv, &proc->err.uv.pipe, 0);
} else {
proc->err.closed = true;
}
int status;
@ -62,14 +62,14 @@ int process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
}
if (status) {
if (proc->in) {
uv_close((uv_handle_t *)&proc->in->uv.pipe, NULL);
if (in) {
uv_close((uv_handle_t *)&proc->in.uv.pipe, NULL);
}
if (proc->out) {
uv_close((uv_handle_t *)&proc->out->uv.pipe, NULL);
if (out) {
uv_close((uv_handle_t *)&proc->out.uv.pipe, NULL);
}
if (proc->err) {
uv_close((uv_handle_t *)&proc->err->uv.pipe, NULL);
if (err) {
uv_close((uv_handle_t *)&proc->err.uv.pipe, NULL);
}
if (proc->type == kProcessTypeUv) {
@ -82,30 +82,27 @@ int process_spawn(Process *proc) FUNC_ATTR_NONNULL_ALL
return status;
}
if (proc->in) {
stream_init(NULL, proc->in, -1,
STRUCT_CAST(uv_stream_t, &proc->in->uv.pipe));
proc->in->events = proc->events;
proc->in->internal_data = proc;
proc->in->internal_close_cb = on_process_stream_close;
if (in) {
stream_init(NULL, &proc->in, -1,
STRUCT_CAST(uv_stream_t, &proc->in.uv.pipe));
proc->in.internal_data = proc;
proc->in.internal_close_cb = on_process_stream_close;
proc->refcount++;
}
if (proc->out) {
stream_init(NULL, proc->out, -1,
STRUCT_CAST(uv_stream_t, &proc->out->uv.pipe));
proc->out->events = proc->events;
proc->out->internal_data = proc;
proc->out->internal_close_cb = on_process_stream_close;
if (out) {
stream_init(NULL, &proc->out, -1,
STRUCT_CAST(uv_stream_t, &proc->out.uv.pipe));
proc->out.internal_data = proc;
proc->out.internal_close_cb = on_process_stream_close;
proc->refcount++;
}
if (proc->err) {
stream_init(NULL, proc->err, -1,
STRUCT_CAST(uv_stream_t, &proc->err->uv.pipe));
proc->err->events = proc->events;
proc->err->internal_data = proc;
proc->err->internal_close_cb = on_process_stream_close;
if (err) {
stream_init(NULL, &proc->err, -1,
STRUCT_CAST(uv_stream_t, &proc->err.uv.pipe));
proc->err.internal_data = proc;
proc->err.internal_close_cb = on_process_stream_close;
proc->refcount++;
}
@ -136,27 +133,11 @@ void process_teardown(Loop *loop) FUNC_ATTR_NONNULL_ALL
pty_process_teardown(loop);
}
// Wrappers around `stream_close` that protect against double-closing.
void process_close_streams(Process *proc) FUNC_ATTR_NONNULL_ALL
{
process_close_in(proc);
process_close_out(proc);
process_close_err(proc);
}
void process_close_in(Process *proc) FUNC_ATTR_NONNULL_ALL
{
CLOSE_PROC_STREAM(proc, in);
}
void process_close_out(Process *proc) FUNC_ATTR_NONNULL_ALL
{
CLOSE_PROC_STREAM(proc, out);
}
void process_close_err(Process *proc) FUNC_ATTR_NONNULL_ALL
{
CLOSE_PROC_STREAM(proc, err);
stream_may_close(&proc->in);
stream_may_close(&proc->out);
stream_may_close(&proc->err);
}
/// Synchronously wait for a process to finish
@ -164,16 +145,15 @@ void process_close_err(Process *proc) FUNC_ATTR_NONNULL_ALL
/// @param process Process instance
/// @param ms Time in milliseconds to wait for the process.
/// 0 for no wait. -1 to wait until the process quits.
/// @return Exit code of the process.
/// @return Exit code of the process. proc->status will have the same value.
/// -1 if the timeout expired while the process is still running.
/// -2 if the user interruped the wait.
int process_wait(Process *proc, int ms, MultiQueue *events)
FUNC_ATTR_NONNULL_ARG(1)
{
int status = -1; // default
bool interrupted = false;
if (!proc->refcount) {
status = proc->status;
int status = proc->status;
LOOP_PROCESS_EVENTS(proc->loop, proc->events, 0);
return status;
}
@ -209,7 +189,9 @@ int process_wait(Process *proc, int ms, MultiQueue *events)
if (proc->refcount == 1) {
// Job exited, collect status and manually invoke close_cb to free the job
// resources
status = interrupted ? -2 : proc->status;
if (interrupted) {
proc->status = -2;
}
decref(proc);
if (events) {
// the decref call created an exit event, process it now
@ -219,7 +201,7 @@ int process_wait(Process *proc, int ms, MultiQueue *events)
proc->refcount--;
}
return status;
return proc->status;
}
/// Ask a process to terminate and eventually kill if it doesn't respond
@ -233,8 +215,9 @@ void process_stop(Process *proc) FUNC_ATTR_NONNULL_ALL
switch (proc->type) {
case kProcessTypeUv:
// Close the process's stdin. If the process doesn't close its own
// stdout/stderr, they will be closed when it exits (voluntarily or not).
process_close_in(proc);
// stdout/stderr, they will be closed when it exits(possibly due to being
// terminated after a timeout)
stream_may_close(&proc->in);
ILOG("Sending SIGTERM to pid %d", proc->pid);
uv_kill(proc->pid, SIGTERM);
break;
@ -375,15 +358,15 @@ static void flush_stream(Process *proc, Stream *stream)
// Poll for data and process the generated events.
loop_poll_events(proc->loop, 0);
if (proc->events) {
multiqueue_process_events(proc->events);
if (stream->events) {
multiqueue_process_events(stream->events);
}
// Stream can be closed if it is empty.
if (num_bytes == stream->num_bytes) {
if (stream->read_cb) {
if (stream->read_cb && !stream->did_eof) {
// Stream callback could miss EOF handling if a child keeps the stream
// open.
// open. But only send EOF if we haven't already.
stream->read_cb(stream, stream->buffer, 0, stream->cb_data, true);
}
break;
@ -395,8 +378,8 @@ static void process_close_handles(void **argv)
{
Process *proc = argv[0];
flush_stream(proc, proc->out);
flush_stream(proc, proc->err);
flush_stream(proc, &proc->out);
flush_stream(proc, &proc->err);
process_close_streams(proc);
process_close(proc);

View File

@ -23,13 +23,14 @@ struct process {
uint64_t stopped_time;
const char *cwd;
char **argv;
Stream *in, *out, *err;
Stream in, out, err;
process_exit_cb cb;
internal_process_cb internal_exit_cb, internal_close_cb;
bool closed, detach;
MultiQueue *events;
};
static inline Process process_init(Loop *loop, ProcessType type, void *data)
{
return (Process) {
@ -38,14 +39,14 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data)
.loop = loop,
.events = NULL,
.pid = 0,
.status = 0,
.status = -1,
.refcount = 0,
.stopped_time = 0,
.cwd = NULL,
.argv = NULL,
.in = NULL,
.out = NULL,
.err = NULL,
.in = { .closed = false },
.out = { .closed = false },
.err = { .closed = false },
.cb = NULL,
.closed = false,
.internal_close_cb = NULL,
@ -54,6 +55,11 @@ static inline Process process_init(Loop *loop, ProcessType type, void *data)
};
}
static inline bool process_is_stopped(Process *proc)
{
return proc->stopped_time != 0;
}
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "event/process.h.generated.h"
#endif

View File

@ -105,20 +105,20 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
{
Stream *stream = uvstream->data;
if (cnt > 0) {
stream->num_bytes += (size_t)cnt;
}
if (cnt <= 0) {
if (cnt != UV_ENOBUFS
// cnt == 0 means libuv asked for a buffer and decided it wasn't needed:
// http://docs.libuv.org/en/latest/stream.html#c.uv_read_start.
//
// We don't need to do anything with the RBuffer because the next call
// to `alloc_cb` will return the same unused pointer(`rbuffer_produced`
// won't be called)
&& cnt != 0) {
DLOG("closing Stream: %p: %s (%s)", stream,
// cnt == 0 means libuv asked for a buffer and decided it wasn't needed:
// http://docs.libuv.org/en/latest/stream.html#c.uv_read_start.
//
// We don't need to do anything with the RBuffer because the next call
// to `alloc_cb` will return the same unused pointer(`rbuffer_produced`
// won't be called)
if (cnt == UV_ENOBUFS || cnt == 0) {
return;
} else if (cnt == UV_EOF && uvstream->type == UV_TTY) {
// The TTY driver might signal TTY without closing the stream
invoke_read_cb(stream, 0, true);
} else {
DLOG("Closing Stream (%p): %s (%s)", stream,
uv_err_name((int)cnt), os_strerror((int)cnt));
// Read error or EOF, either way stop the stream and invoke the callback
// with eof == true
@ -130,6 +130,7 @@ static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
// at this point we're sure that cnt is positive, no error occurred
size_t nread = (size_t)cnt;
stream->num_bytes += nread;
// Data was already written, so all we need is to update 'wpos' to reflect
// the space actually used in the buffer.
rbuffer_produced(stream->buffer, nread);
@ -187,6 +188,7 @@ static void read_event(void **argv)
if (stream->read_cb) {
size_t count = (uintptr_t)argv[1];
bool eof = (uintptr_t)argv[2];
stream->did_eof = eof;
stream->read_cb(stream, stream->buffer, count, stream->cb_data, eof);
}
stream->pending_reqs--;

View File

@ -92,6 +92,13 @@ void stream_close(Stream *stream, stream_close_cb on_stream_close, void *data)
}
}
void stream_may_close(Stream *stream)
{
if (!stream->closed) {
stream_close(stream, NULL, NULL);
}
}
void stream_close_handle(Stream *stream)
FUNC_ATTR_NONNULL_ALL
{

View File

@ -14,10 +14,7 @@ typedef struct stream Stream;
///
/// @param stream The Stream instance
/// @param rbuffer The associated RBuffer instance
/// @param count Number of bytes to read. This must be respected if keeping
/// the order of events is a requirement. This is because events
/// may be queued and only processed later when more data is copied
/// into to the buffer, so one read may starve another.
/// @param count Number of bytes that was read.
/// @param data User-defined data
/// @param eof If the stream reached EOF.
typedef void (*stream_read_cb)(Stream *stream, RBuffer *buf, size_t count,
@ -33,6 +30,8 @@ typedef void (*stream_write_cb)(Stream *stream, void *data, int status);
typedef void (*stream_close_cb)(Stream *stream, void *data);
struct stream {
bool closed;
bool did_eof;
union {
uv_pipe_t pipe;
uv_tcp_t tcp;
@ -52,7 +51,6 @@ struct stream {
size_t maxmem;
size_t pending_reqs;
size_t num_bytes;
bool closed;
MultiQueue *events;
};

View File

@ -1074,11 +1074,17 @@ EXTERN char_u e_invexpr2[] INIT(= N_("E15: Invalid expression: %s"));
EXTERN char_u e_invrange[] INIT(= N_("E16: Invalid range"));
EXTERN char_u e_invcmd[] INIT(= N_("E476: Invalid command"));
EXTERN char_u e_isadir2[] INIT(= N_("E17: \"%s\" is a directory"));
EXTERN char_u e_invjob[] INIT(= N_("E900: Invalid job id"));
EXTERN char_u e_invchan[] INIT(= N_("E900: Invalid channel id"));
EXTERN char_u e_invchanjob[] INIT(= N_("E900: Invalid channel id: not a job"));
EXTERN char_u e_jobtblfull[] INIT(= N_("E901: Job table is full"));
EXTERN char_u e_jobspawn[] INIT(= N_(
"E903: Process failed to start: %s: \"%s\""));
EXTERN char_u e_jobnotpty[] INIT(= N_("E904: Job is not connected to a pty"));
"E903: Process failed to start: %s: \"%s\""));
EXTERN char_u e_channotpty[] INIT(= N_("E904: channel is not a pty"));
EXTERN char_u e_stdiochan2[] INIT(= N_(
"E905: Couldn't open stdio channel: %s"));
EXTERN char_u e_invstream[] INIT(= N_("E906: invalid stream for channel"));
EXTERN char_u e_invstreamrpc[] INIT(= N_(
"E906: invalid stream for rpc channel, use 'rpc'"));
EXTERN char_u e_libcall[] INIT(= N_("E364: Library call failed for \"%s()\""));
EXTERN char_u e_mkdir[] INIT(= N_("E739: Cannot create directory %s: %s"));
EXTERN char_u e_markinval[] INIT(= N_("E19: Mark has invalid line number"));
@ -1189,9 +1195,9 @@ EXTERN char *ignoredp;
// If a msgpack-rpc channel should be started over stdin/stdout
EXTERN bool embedded_mode INIT(= false);
/// next free id for a job or rpc channel
EXTERN uint64_t next_chan_id INIT(= 1);
// Dont try to start an user interface
// or read/write to stdio (unless embedding)
EXTERN bool headless_mode INIT(= false);
/// Used to track the status of external functions.
/// Currently only used for iconv().

View File

@ -778,7 +778,6 @@ err_closing:
if (execl("/bin/sh", "sh", "-c", cmd, (char *)NULL) == -1)
PERROR(_("cs_create_connection exec failed"));
stream_set_blocking(input_global_fd(), true); // normalize stream (#2598)
exit(127);
/* NOTREACHED */
default: /* parent. */

View File

@ -73,6 +73,9 @@
#include "nvim/api/private/helpers.h"
#include "nvim/api/private/handle.h"
#include "nvim/api/private/dispatch.h"
#ifndef WIN32
# include "nvim/os/pty_process_unix.h"
#endif
/* Maximum number of commands from + or -c arguments. */
#define MAX_ARG_CMDS 10
@ -103,7 +106,6 @@ typedef struct {
bool input_isatty; // stdin is a terminal
bool output_isatty; // stdout is a terminal
bool err_isatty; // stderr is a terminal
bool headless; // Do not start the builtin UI.
int no_swap_file; // "-n" argument used
int use_debug_break_level;
int window_count; /* number of windows to use */
@ -298,8 +300,8 @@ int main(int argc, char **argv)
assert(p_ch >= 0 && Rows >= p_ch && Rows - p_ch <= INT_MAX);
cmdline_row = (int)(Rows - p_ch);
msg_row = cmdline_row;
screenalloc(false); /* allocate screen buffers */
set_init_2(params.headless);
screenalloc(false); // allocate screen buffers
set_init_2(headless_mode);
TIME_MSG("inits 2");
msg_scroll = TRUE;
@ -311,8 +313,9 @@ int main(int argc, char **argv)
/* Set the break level after the terminal is initialized. */
debug_break_level = params.use_debug_break_level;
bool reading_input = !params.headless && (params.input_isatty
|| params.output_isatty || params.err_isatty);
bool reading_input = !headless_mode
&& (params.input_isatty || params.output_isatty
|| params.err_isatty);
if (reading_input) {
// One of the startup commands (arguments, sourced scripts or plugins) may
@ -448,7 +451,7 @@ int main(int argc, char **argv)
wait_return(TRUE);
}
if (!params.headless) {
if (!headless_mode) {
// Stop reading from input stream, the UI layer will take over now.
input_stop();
ui_builtin_start();
@ -809,11 +812,14 @@ static void command_line_scan(mparm_T *parmp)
}
mch_exit(0);
} else if (STRICMP(argv[0] + argv_idx, "headless") == 0) {
parmp->headless = true;
headless_mode = true;
} else if (STRICMP(argv[0] + argv_idx, "embed") == 0) {
embedded_mode = true;
parmp->headless = true;
channel_from_stdio();
headless_mode = true;
const char *err;
if (!channel_from_stdio(true, CALLBACK_READER_INIT, &err)) {
abort();
}
} else if (STRNICMP(argv[0] + argv_idx, "literal", 7) == 0) {
#if !defined(UNIX)
parmp->literal = TRUE;
@ -1216,7 +1222,6 @@ static void init_params(mparm_T *paramp, int argc, char **argv)
memset(paramp, 0, sizeof(*paramp));
paramp->argc = argc;
paramp->argv = argv;
paramp->headless = false;
paramp->want_full_screen = true;
paramp->use_debug_break_level = -1;
paramp->window_count = -1;
@ -1245,6 +1250,14 @@ static void check_and_set_isatty(mparm_T *paramp)
stdout_isatty
= paramp->output_isatty = os_isatty(fileno(stdout));
paramp->err_isatty = os_isatty(fileno(stderr));
int tty_fd = paramp->input_isatty
? OS_STDIN_FILENO
: (paramp->output_isatty
? OS_STDOUT_FILENO
: (paramp->err_isatty ? OS_STDERR_FILENO : -1));
#ifndef WIN32
pty_process_save_termios(tty_fd);
#endif
TIME_MSG("window checked");
}
@ -1387,7 +1400,7 @@ static void handle_tag(char_u *tagname)
// When starting in Ex mode and commands come from a file, set Silent mode.
static void check_tty(mparm_T *parmp)
{
if (parmp->headless) {
if (headless_mode) {
return;
}

View File

@ -2622,7 +2622,10 @@ void preserve_exit(void)
// Prevent repeated calls into this method.
if (really_exiting) {
stream_set_blocking(input_global_fd(), true); //normalize stream (#2598)
if (input_global_fd() >= 0) {
// normalize stream (#2598)
stream_set_blocking(input_global_fd(), true);
}
exit(2);
}

View File

@ -11,8 +11,8 @@
#include "nvim/api/private/helpers.h"
#include "nvim/api/vim.h"
#include "nvim/api/ui.h"
#include "nvim/channel.h"
#include "nvim/msgpack_rpc/channel.h"
#include "nvim/msgpack_rpc/server.h"
#include "nvim/event/loop.h"
#include "nvim/event/libuv_process.h"
#include "nvim/event/rstream.h"
@ -29,58 +29,14 @@
#include "nvim/map.h"
#include "nvim/log.h"
#include "nvim/misc1.h"
#include "nvim/path.h"
#include "nvim/lib/kvec.h"
#include "nvim/os/input.h"
#define CHANNEL_BUFFER_SIZE 0xffff
#if MIN_LOG_LEVEL > DEBUG_LOG_LEVEL
#define log_client_msg(...)
#define log_server_msg(...)
#endif
typedef enum {
kChannelTypeSocket,
kChannelTypeProc,
kChannelTypeStdio,
kChannelTypeInternal
} ChannelType;
typedef struct {
uint64_t request_id;
bool returned, errored;
Object result;
} ChannelCallFrame;
typedef struct {
uint64_t id;
size_t refcount;
PMap(cstr_t) *subscribed_events;
bool closed;
ChannelType type;
msgpack_unpacker *unpacker;
union {
Stream stream; // bidirectional (socket)
Process *proc;
struct {
Stream in;
Stream out;
} std;
} data;
uint64_t next_request_id;
kvec_t(ChannelCallFrame *) call_stack;
MultiQueue *events;
} Channel;
typedef struct {
Channel *channel;
MsgpackRpcRequestHandler handler;
Array args;
uint64_t request_id;
} RequestEvent;
static PMap(uint64_t) *channels = NULL;
static PMap(cstr_t) *event_strings = NULL;
static msgpack_sbuffer out_buffer;
@ -88,102 +44,44 @@ static msgpack_sbuffer out_buffer;
# include "msgpack_rpc/channel.c.generated.h"
#endif
/// Initializes the module
void channel_init(void)
void rpc_init(void)
{
ch_before_blocking_events = multiqueue_new_child(main_loop.events);
channels = pmap_new(uint64_t)();
event_strings = pmap_new(cstr_t)();
msgpack_sbuffer_init(&out_buffer);
remote_ui_init();
}
/// Teardown the module
void channel_teardown(void)
void rpc_start(Channel *channel)
{
if (!channels) {
return;
channel_incref(channel);
channel->is_rpc = true;
RpcState *rpc = &channel->rpc;
rpc->closed = false;
rpc->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
rpc->subscribed_events = pmap_new(cstr_t)();
rpc->next_request_id = 1;
kv_init(rpc->call_stack);
if (channel->streamtype != kChannelStreamInternal) {
Stream *out = channel_outstream(channel);
#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL
Stream *in = channel_instream(channel);
DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, in, out);
#endif
rstream_start(out, receive_msgpack, channel);
}
Channel *channel;
map_foreach_value(channels, channel, {
close_channel(channel);
});
}
/// Creates an API channel by starting a process and connecting to its
/// stdin/stdout. stderr is handled by the job infrastructure.
///
/// @param proc process object
/// @param id (optional) channel id
/// @param source description of source function, rplugin name, TCP addr, etc
///
/// @return Channel id (> 0), on success. 0, on error.
uint64_t channel_from_process(Process *proc, uint64_t id, char *source)
static Channel *find_rpc_channel(uint64_t id)
{
Channel *channel = register_channel(kChannelTypeProc, id, proc->events,
source);
incref(channel); // process channels are only closed by the exit_cb
channel->data.proc = proc;
wstream_init(proc->in, 0);
rstream_init(proc->out, 0);
rstream_start(proc->out, receive_msgpack, channel);
DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, proc->in,
proc->out);
return channel->id;
}
/// Creates an API channel from a tcp/pipe socket connection
///
/// @param watcher The SocketWatcher ready to accept the connection
void channel_from_connection(SocketWatcher *watcher)
{
Channel *channel = register_channel(kChannelTypeSocket, 0, NULL,
watcher->addr);
socket_watcher_accept(watcher, &channel->data.stream);
incref(channel); // close channel only after the stream is closed
channel->data.stream.internal_close_cb = close_cb;
channel->data.stream.internal_data = channel;
wstream_init(&channel->data.stream, 0);
rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
rstream_start(&channel->data.stream, receive_msgpack, channel);
DLOG("ch %" PRIu64 " in/out-stream=%p", channel->id,
&channel->data.stream);
}
/// @param source description of source function, rplugin name, TCP addr, etc
uint64_t channel_connect(bool tcp, const char *address, int timeout,
char *source, const char **error)
{
if (!tcp) {
char *path = fix_fname(address);
if (server_owns_pipe_address(path)) {
// avoid deadlock
xfree(path);
return channel_create_internal();
}
xfree(path);
Channel *chan = find_channel(id);
if (!chan || !chan->is_rpc || chan->rpc.closed) {
return NULL;
}
Channel *channel = register_channel(kChannelTypeSocket, 0, NULL, source);
if (!socket_connect(&main_loop, &channel->data.stream,
tcp, address, timeout, error)) {
decref(channel);
return 0;
}
incref(channel); // close channel only after the stream is closed
channel->data.stream.internal_close_cb = close_cb;
channel->data.stream.internal_data = channel;
wstream_init(&channel->data.stream, 0);
rstream_init(&channel->data.stream, CHANNEL_BUFFER_SIZE);
rstream_start(&channel->data.stream, receive_msgpack, channel);
return channel->id;
return chan;
}
/// Publishes an event to a channel.
@ -192,12 +90,11 @@ uint64_t channel_connect(bool tcp, const char *address, int timeout,
/// @param name Event name (application-defined)
/// @param args Array of event arguments
/// @return True if the event was sent successfully, false otherwise.
bool channel_send_event(uint64_t id, const char *name, Array args)
bool rpc_send_event(uint64_t id, const char *name, Array args)
{
Channel *channel = NULL;
if (id && (!(channel = pmap_get(uint64_t)(channels, id))
|| channel->closed)) {
if (id && (!(channel = find_rpc_channel(id)))) {
api_free_array(args);
return false;
}
@ -218,29 +115,30 @@ bool channel_send_event(uint64_t id, const char *name, Array args)
/// @param args Array with method arguments
/// @param[out] error True if the return value is an error
/// @return Whatever the remote method returned
Object channel_send_call(uint64_t id,
const char *method_name,
Array args,
Error *err)
Object rpc_send_call(uint64_t id,
const char *method_name,
Array args,
Error *err)
{
Channel *channel = NULL;
if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
if (!(channel = find_rpc_channel(id))) {
api_set_error(err, kErrorTypeException, "Invalid channel: %" PRIu64, id);
api_free_array(args);
return NIL;
}
incref(channel);
uint64_t request_id = channel->next_request_id++;
channel_incref(channel);
RpcState *rpc = &channel->rpc;
uint64_t request_id = rpc->next_request_id++;
// Send the msgpack-rpc request
send_request(channel, request_id, method_name, args);
// Push the frame
ChannelCallFrame frame = { request_id, false, false, NIL };
kv_push(channel->call_stack, &frame);
kv_push(rpc->call_stack, &frame);
LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned);
(void)kv_pop(channel->call_stack);
(void)kv_pop(rpc->call_stack);
if (frame.errored) {
if (frame.result.type == kObjectTypeString) {
@ -265,7 +163,7 @@ Object channel_send_call(uint64_t id,
api_free_object(frame.result);
}
decref(channel);
channel_decref(channel);
return frame.errored ? NIL : frame.result;
}
@ -274,11 +172,11 @@ Object channel_send_call(uint64_t id,
///
/// @param id The channel id
/// @param event The event type string
void channel_subscribe(uint64_t id, char *event)
void rpc_subscribe(uint64_t id, char *event)
{
Channel *channel;
if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
if (!(channel = find_rpc_channel(id))) {
abort();
}
@ -289,81 +187,32 @@ void channel_subscribe(uint64_t id, char *event)
pmap_put(cstr_t)(event_strings, event_string, event_string);
}
pmap_put(cstr_t)(channel->subscribed_events, event_string, event_string);
pmap_put(cstr_t)(channel->rpc.subscribed_events, event_string, event_string);
}
/// Unsubscribes to event broadcasts
///
/// @param id The channel id
/// @param event The event type string
void channel_unsubscribe(uint64_t id, char *event)
void rpc_unsubscribe(uint64_t id, char *event)
{
Channel *channel;
if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
if (!(channel = find_rpc_channel(id))) {
abort();
}
unsubscribe(channel, event);
}
/// Closes a channel
///
/// @param id The channel id
/// @return true if successful, false otherwise
bool channel_close(uint64_t id)
{
Channel *channel;
if (!(channel = pmap_get(uint64_t)(channels, id)) || channel->closed) {
return false;
}
close_channel(channel);
return true;
}
/// Creates an API channel from stdin/stdout. Used to embed Nvim.
void channel_from_stdio(void)
{
Channel *channel = register_channel(kChannelTypeStdio, 0, NULL, NULL);
incref(channel); // stdio channels are only closed on exit
// read stream
rstream_init_fd(&main_loop, &channel->data.std.in, 0, CHANNEL_BUFFER_SIZE);
rstream_start(&channel->data.std.in, receive_msgpack, channel);
// write stream
wstream_init_fd(&main_loop, &channel->data.std.out, 1, 0);
DLOG("ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id,
&channel->data.std.in, &channel->data.std.out);
}
/// Creates a loopback channel. This is used to avoid deadlock
/// when an instance connects to its own named pipe.
uint64_t channel_create_internal(void)
{
Channel *channel = register_channel(kChannelTypeInternal, 0, NULL, NULL);
incref(channel); // internal channel lives until process exit
return channel->id;
}
void channel_process_exit(uint64_t id, int status)
{
Channel *channel = pmap_get(uint64_t)(channels, id);
channel->closed = true;
decref(channel);
}
// rstream.c:read_event() invokes this as stream->read_cb().
static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
void *data, bool eof)
{
Channel *channel = data;
incref(channel);
channel_incref(channel);
if (eof) {
close_channel(channel);
channel_close(channel->id, kChannelPartRpc, NULL);
char buf[256];
snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the client",
channel->id);
@ -371,30 +220,19 @@ static void receive_msgpack(Stream *stream, RBuffer *rbuf, size_t c,
goto end;
}
if ((chan_wstream(channel) != NULL && chan_wstream(channel)->closed)
|| (chan_rstream(channel) != NULL && chan_rstream(channel)->closed)) {
char buf[256];
snprintf(buf, sizeof(buf),
"ch %" PRIu64 ": stream closed unexpectedly. "
"closing channel",
channel->id);
call_set_error(channel, buf, WARN_LOG_LEVEL);
goto end;
}
size_t count = rbuffer_size(rbuf);
DLOG("ch %" PRIu64 ": parsing %u bytes from msgpack Stream: %p",
DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p",
channel->id, count, stream);
// Feed the unpacker with data
msgpack_unpacker_reserve_buffer(channel->unpacker, count);
rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->unpacker), count);
msgpack_unpacker_buffer_consumed(channel->unpacker, count);
msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, count);
rbuffer_read(rbuf, msgpack_unpacker_buffer(channel->rpc.unpacker), count);
msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, count);
parse_msgpack(channel);
end:
decref(channel);
channel_decref(channel);
}
static void parse_msgpack(Channel *channel)
@ -404,8 +242,8 @@ static void parse_msgpack(Channel *channel)
msgpack_unpack_return result;
// Deserialize everything we can.
while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) ==
MSGPACK_UNPACK_SUCCESS) {
while ((result = msgpack_unpacker_next(channel->rpc.unpacker, &unpacked)) ==
MSGPACK_UNPACK_SUCCESS) {
bool is_response = is_rpc_response(&unpacked.data);
log_client_msg(channel->id, !is_response, unpacked.data);
@ -431,7 +269,7 @@ static void parse_msgpack(Channel *channel)
if (result == MSGPACK_UNPACK_NOMEM_ERROR) {
mch_errmsg(e_outofmem);
mch_errmsg("\n");
decref(channel);
channel_decref(channel);
preserve_exit();
}
@ -496,7 +334,7 @@ static void handle_request(Channel *channel, msgpack_object *request)
evdata->handler = handler;
evdata->args = args;
evdata->request_id = request_id;
incref(channel);
channel_incref(channel);
if (handler.async) {
bool is_get_mode = handler.fn == handle_nvim_get_mode;
@ -534,66 +372,30 @@ static void on_request_event(void **argv)
api_free_object(result);
}
api_free_array(args);
decref(channel);
channel_decref(channel);
xfree(e);
api_clear_error(&error);
}
/// Returns the Stream that a Channel writes to.
static Stream *chan_wstream(Channel *chan)
{
switch (chan->type) {
case kChannelTypeSocket:
return &chan->data.stream;
case kChannelTypeProc:
return chan->data.proc->in;
case kChannelTypeStdio:
return &chan->data.std.out;
case kChannelTypeInternal:
return NULL;
}
abort();
}
/// Returns the Stream that a Channel reads from.
static Stream *chan_rstream(Channel *chan)
{
switch (chan->type) {
case kChannelTypeSocket:
return &chan->data.stream;
case kChannelTypeProc:
return chan->data.proc->out;
case kChannelTypeStdio:
return &chan->data.std.in;
case kChannelTypeInternal:
return NULL;
}
abort();
}
static bool channel_write(Channel *channel, WBuffer *buffer)
{
bool success = false;
bool success;
if (channel->closed) {
if (channel->rpc.closed) {
wstream_release_wbuffer(buffer);
return false;
}
switch (channel->type) {
case kChannelTypeSocket:
case kChannelTypeProc:
case kChannelTypeStdio:
success = wstream_write(chan_wstream(channel), buffer);
break;
case kChannelTypeInternal:
incref(channel);
CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer);
success = true;
break;
if (channel->streamtype == kChannelStreamInternal) {
channel_incref(channel);
CREATE_EVENT(channel->events, internal_read_event, 2, channel, buffer);
success = true;
} else {
Stream *in = channel_instream(channel);
success = wstream_write(in, buffer);
}
if (!success) {
// If the write failed for any reason, close the channel
char buf[256];
@ -613,14 +415,14 @@ static void internal_read_event(void **argv)
Channel *channel = argv[0];
WBuffer *buffer = argv[1];
msgpack_unpacker_reserve_buffer(channel->unpacker, buffer->size);
memcpy(msgpack_unpacker_buffer(channel->unpacker),
msgpack_unpacker_reserve_buffer(channel->rpc.unpacker, buffer->size);
memcpy(msgpack_unpacker_buffer(channel->rpc.unpacker),
buffer->data, buffer->size);
msgpack_unpacker_buffer_consumed(channel->unpacker, buffer->size);
msgpack_unpacker_buffer_consumed(channel->rpc.unpacker, buffer->size);
parse_msgpack(channel);
decref(channel);
channel_decref(channel);
wstream_release_wbuffer(buffer);
}
@ -669,7 +471,8 @@ static void broadcast_event(const char *name, Array args)
Channel *channel;
map_foreach_value(channels, channel, {
if (pmap_has(cstr_t)(channel->subscribed_events, name)) {
if (channel->is_rpc
&& pmap_has(cstr_t)(channel->rpc.subscribed_events, name)) {
kv_push(subscribed, channel);
}
});
@ -699,10 +502,11 @@ end:
static void unsubscribe(Channel *channel, char *event)
{
char *event_string = pmap_get(cstr_t)(event_strings, event);
pmap_del(cstr_t)(channel->subscribed_events, event_string);
pmap_del(cstr_t)(channel->rpc.subscribed_events, event_string);
map_foreach_value(channels, channel, {
if (pmap_has(cstr_t)(channel->subscribed_events, event_string)) {
if (channel->is_rpc
&& pmap_has(cstr_t)(channel->rpc.subscribed_events, event_string)) {
return;
}
});
@ -712,98 +516,43 @@ static void unsubscribe(Channel *channel, char *event)
xfree(event_string);
}
/// Close the channel streams/process and free the channel resources.
static void close_channel(Channel *channel)
/// Mark rpc state as closed, and release its reference to the channel.
/// Don't call this directly, call channel_close(id, kChannelPartRpc, &error)
void rpc_close(Channel *channel)
{
if (channel->closed) {
if (channel->rpc.closed) {
return;
}
channel->closed = true;
channel->rpc.closed = true;
channel_decref(channel);
switch (channel->type) {
case kChannelTypeSocket:
stream_close(&channel->data.stream, NULL, NULL);
break;
case kChannelTypeProc:
// Only close the rpc channel part,
// there could be an error message on the stderr stream
process_close_in(channel->data.proc);
process_close_out(channel->data.proc);
break;
case kChannelTypeStdio:
stream_close(&channel->data.std.in, NULL, NULL);
stream_close(&channel->data.std.out, NULL, NULL);
multiqueue_put(main_loop.fast_events, exit_event, 1, channel);
return;
case kChannelTypeInternal:
// nothing to free.
break;
if (channel->streamtype == kChannelStreamStdio) {
multiqueue_put(main_loop.fast_events, exit_event, 0);
}
decref(channel);
}
static void exit_event(void **argv)
{
decref(argv[0]);
if (!exiting) {
mch_exit(0);
}
}
static void free_channel(Channel *channel)
void rpc_free(Channel *channel)
{
remote_ui_disconnect(channel->id);
pmap_del(uint64_t)(channels, channel->id);
msgpack_unpacker_free(channel->unpacker);
msgpack_unpacker_free(channel->rpc.unpacker);
// Unsubscribe from all events
char *event_string;
map_foreach_value(channel->subscribed_events, event_string, {
map_foreach_value(channel->rpc.subscribed_events, event_string, {
unsubscribe(channel, event_string);
});
pmap_free(cstr_t)(channel->subscribed_events);
kv_destroy(channel->call_stack);
if (channel->type != kChannelTypeProc) {
multiqueue_free(channel->events);
}
xfree(channel);
}
static void close_cb(Stream *stream, void *data)
{
decref(data);
}
/// @param source description of source function, rplugin name, TCP addr, etc
static Channel *register_channel(ChannelType type, uint64_t id,
MultiQueue *events, char *source)
{
// Jobs and channels share the same id namespace.
assert(id == 0 || !pmap_get(uint64_t)(channels, id));
Channel *rv = xmalloc(sizeof(Channel));
rv->events = events ? events : multiqueue_new_child(main_loop.events);
rv->type = type;
rv->refcount = 1;
rv->closed = false;
rv->unpacker = msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
rv->id = id > 0 ? id : next_chan_id++;
rv->subscribed_events = pmap_new(cstr_t)();
rv->next_request_id = 1;
kv_init(rv->call_stack);
pmap_put(uint64_t)(channels, rv->id, rv);
ILOG("new channel %" PRIu64 " (%s): %s", rv->id,
(type == kChannelTypeProc ? "proc"
: (type == kChannelTypeSocket ? "socket"
: (type == kChannelTypeStdio ? "stdio"
: (type == kChannelTypeInternal ? "internal" : "?")))),
(source ? source : "?"));
return rv;
pmap_free(cstr_t)(channel->rpc.subscribed_events);
kv_destroy(channel->rpc.call_stack);
}
static bool is_rpc_response(msgpack_object *obj)
@ -818,15 +567,18 @@ static bool is_rpc_response(msgpack_object *obj)
static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel)
{
uint64_t response_id = obj->via.array.ptr[1].via.u64;
if (kv_size(channel->rpc.call_stack) == 0) {
return false;
}
// Must be equal to the frame at the stack's bottom
return kv_size(channel->call_stack) && response_id
== kv_A(channel->call_stack, kv_size(channel->call_stack) - 1)->request_id;
ChannelCallFrame *frame = kv_last(channel->rpc.call_stack);
return response_id == frame->request_id;
}
static void complete_call(msgpack_object *obj, Channel *channel)
{
ChannelCallFrame *frame = kv_A(channel->call_stack,
kv_size(channel->call_stack) - 1);
ChannelCallFrame *frame = kv_last(channel->rpc.call_stack);
frame->returned = true;
frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL;
@ -840,15 +592,15 @@ static void complete_call(msgpack_object *obj, Channel *channel)
static void call_set_error(Channel *channel, char *msg, int loglevel)
{
LOG(loglevel, "RPC: %s", msg);
for (size_t i = 0; i < kv_size(channel->call_stack); i++) {
ChannelCallFrame *frame = kv_A(channel->call_stack, i);
for (size_t i = 0; i < kv_size(channel->rpc.call_stack); i++) {
ChannelCallFrame *frame = kv_A(channel->rpc.call_stack, i);
frame->returned = true;
frame->errored = true;
api_free_object(frame->result);
frame->result = STRING_OBJ(cstr_to_string(msg));
}
close_channel(channel);
channel_close(channel->id, kChannelPartRpc, NULL);
}
static WBuffer *serialize_request(uint64_t channel_id,
@ -890,18 +642,6 @@ static WBuffer *serialize_response(uint64_t channel_id,
return rv;
}
static void incref(Channel *channel)
{
channel->refcount++;
}
static void decref(Channel *channel)
{
if (!(--channel->refcount)) {
free_channel(channel);
}
}
#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL
#define REQ "[request] "
#define RES "[response] "

View File

@ -8,6 +8,7 @@
#include "nvim/event/socket.h"
#include "nvim/event/process.h"
#include "nvim/vim.h"
#include "nvim/channel.h"
#define METHOD_MAXLEN 512
@ -16,6 +17,7 @@
/// of os_inchar(), so they are processed "just-in-time".
MultiQueue *ch_before_blocking_events;
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "msgpack_rpc/channel.h.generated.h"
#endif

View File

@ -0,0 +1,36 @@
#ifndef NVIM_MSGPACK_RPC_CHANNEL_DEFS_H
#define NVIM_MSGPACK_RPC_CHANNEL_DEFS_H
#include <stdbool.h>
#include <uv.h>
#include <msgpack.h>
#include "nvim/api/private/defs.h"
#include "nvim/event/socket.h"
#include "nvim/event/process.h"
#include "nvim/vim.h"
typedef struct Channel Channel;
typedef struct {
uint64_t request_id;
bool returned, errored;
Object result;
} ChannelCallFrame;
typedef struct {
Channel *channel;
MsgpackRpcRequestHandler handler;
Array args;
uint64_t request_id;
} RequestEvent;
typedef struct {
PMap(cstr_t) *subscribed_events;
bool closed;
msgpack_unpacker *unpacker;
uint64_t next_request_id;
kvec_t(ChannelCallFrame *) call_stack;
} RpcState;
#endif // NVIM_MSGPACK_RPC_CHANNEL_DEFS_H

View File

@ -115,6 +115,7 @@ static int p_bomb;
static char_u *p_bh;
static char_u *p_bt;
static int p_bl;
static long p_channel;
static int p_ci;
static int p_cin;
static char_u *p_cink;
@ -4193,6 +4194,9 @@ static char *set_num_option(int opt_idx, char_u *varp, long value,
curbuf->b_p_imsearch = B_IMODE_NONE;
}
p_imsearch = curbuf->b_p_imsearch;
} else if (pp == &p_channel || pp == &curbuf->b_p_channel) {
errmsg = e_invarg;
*pp = old_value;
}
/* if 'titlelen' has changed, redraw the title */
else if (pp == &p_titlelen) {
@ -5472,6 +5476,7 @@ static char_u *get_varp(vimoption_T *p)
case PV_BH: return (char_u *)&(curbuf->b_p_bh);
case PV_BT: return (char_u *)&(curbuf->b_p_bt);
case PV_BL: return (char_u *)&(curbuf->b_p_bl);
case PV_CHANNEL:return (char_u *)&(curbuf->b_p_channel);
case PV_CI: return (char_u *)&(curbuf->b_p_ci);
case PV_CIN: return (char_u *)&(curbuf->b_p_cin);
case PV_CINK: return (char_u *)&(curbuf->b_p_cink);
@ -5773,6 +5778,7 @@ void buf_copy_options(buf_T *buf, int flags)
buf->b_p_nf = vim_strsave(p_nf);
buf->b_p_mps = vim_strsave(p_mps);
buf->b_p_si = p_si;
buf->b_p_channel = 0;
buf->b_p_ci = p_ci;
buf->b_p_cin = p_cin;
buf->b_p_cink = vim_strsave(p_cink);

View File

@ -695,6 +695,7 @@ enum {
, BV_BIN
, BV_BL
, BV_BOMB
, BV_CHANNEL
, BV_CI
, BV_CIN
, BV_CINK

View File

@ -294,6 +294,14 @@ return {
varname='p_cedit',
defaults={if_true={vi="", vim=macros('CTRL_F_STR')}}
},
{
full_name='channel',
type='number', scope={'buffer'},
no_mkrc=true,
nodefault=true,
varname='p_channel',
defaults={if_true={vi=0}}
},
{
full_name='charconvert', abbreviation='ccv',
type='string', scope={'global'},

View File

@ -37,7 +37,7 @@ typedef enum {
static Stream read_stream = {.closed = true};
static RBuffer *input_buffer = NULL;
static bool input_eof = false;
static int global_fd = 0;
static int global_fd = -1;
static int events_enabled = 0;
static bool blocking = false;

View File

@ -36,23 +36,36 @@
# include "os/pty_process_unix.c.generated.h"
#endif
/// termios saved at startup (for TUI) or initialized by pty_process_spawn().
static struct termios termios_default;
/// Saves the termios properties associated with `tty_fd`.
///
/// @param tty_fd TTY file descriptor, or -1 if not in a terminal.
void pty_process_save_termios(int tty_fd)
{
if (tty_fd == -1 || tcgetattr(tty_fd, &termios_default) != 0) {
return;
}
}
/// @returns zero on success, or negative error code
int pty_process_spawn(PtyProcess *ptyproc)
FUNC_ATTR_NONNULL_ALL
{
static struct termios termios;
if (!termios.c_cflag) {
init_termios(&termios);
if (!termios_default.c_cflag) {
// TODO(jkeyes): We could pass NULL to forkpty() instead ...
init_termios(&termios_default);
}
int status = 0; // zero or negative error code (libuv convention)
Process *proc = (Process *)ptyproc;
assert(!proc->err);
assert(proc->err.closed);
uv_signal_start(&proc->loop->children_watcher, chld_handler, SIGCHLD);
ptyproc->winsize = (struct winsize){ ptyproc->height, ptyproc->width, 0, 0 };
uv_disable_stdio_inheritance();
int master;
int pid = forkpty(&master, NULL, &termios, &ptyproc->winsize);
int pid = forkpty(&master, NULL, &termios_default, &ptyproc->winsize);
if (pid < 0) {
status = -errno;
@ -83,12 +96,12 @@ int pty_process_spawn(PtyProcess *ptyproc)
goto error;
}
if (proc->in
&& (status = set_duplicating_descriptor(master, &proc->in->uv.pipe))) {
if (!proc->in.closed
&& (status = set_duplicating_descriptor(master, &proc->in.uv.pipe))) {
goto error;
}
if (proc->out
&& (status = set_duplicating_descriptor(master, &proc->out->uv.pipe))) {
if (!proc->out.closed
&& (status = set_duplicating_descriptor(master, &proc->out.uv.pipe))) {
goto error;
}

View File

@ -44,7 +44,7 @@ int pty_process_spawn(PtyProcess *ptyproc)
wchar_t *cwd = NULL;
const char *emsg = NULL;
assert(!proc->err);
assert(proc->err.closed);
cfg = winpty_config_new(WINPTY_FLAG_ALLOW_CURPROC_DESKTOP_CREATION, &err);
if (cfg == NULL) {
@ -71,20 +71,20 @@ int pty_process_spawn(PtyProcess *ptyproc)
goto cleanup;
}
if (proc->in != NULL) {
if (!proc->in.closed) {
in_req = xmalloc(sizeof(uv_connect_t));
uv_pipe_connect(
in_req,
&proc->in->uv.pipe,
&proc->in.uv.pipe,
in_name,
pty_process_connect_cb);
}
if (proc->out != NULL) {
if (!proc->out.closed) {
out_req = xmalloc(sizeof(uv_connect_t));
uv_pipe_connect(
out_req,
&proc->out->uv.pipe,
&proc->out.uv.pipe,
out_name,
pty_process_connect_cb);
}
@ -228,7 +228,7 @@ static void wait_eof_timer_cb(uv_timer_t *wait_eof_timer)
PtyProcess *ptyproc = wait_eof_timer->data;
Process *proc = (Process *)ptyproc;
if (!proc->out || !uv_is_readable(proc->out->uvstream)) {
if (proc->out.closed || !uv_is_readable(proc->out.uvstream)) {
uv_timer_stop(&ptyproc->wait_eof_timer);
pty_process_finish2(ptyproc);
}

View File

@ -207,16 +207,12 @@ static int do_os_system(char **argv,
char prog[MAXPATHL];
xstrlcpy(prog, argv[0], MAXPATHL);
Stream in, out, err;
LibuvProcess uvproc = libuv_process_init(&main_loop, &buf);
Process *proc = &uvproc.process;
MultiQueue *events = multiqueue_new_child(main_loop.events);
proc->events = events;
proc->argv = argv;
proc->in = input != NULL ? &in : NULL;
proc->out = &out;
proc->err = &err;
int status = process_spawn(proc);
int status = process_spawn(proc, input != NULL, true, true);
if (status) {
loop_poll_events(&main_loop, 0);
// Failed, probably 'shell' is not executable.
@ -231,32 +227,29 @@ static int do_os_system(char **argv,
return -1;
}
// We want to deal with stream events as fast a possible while queueing
// process events, so reset everything to NULL. It prevents closing the
// Note: unlike process events, stream events are not queued, as we want to
// deal with stream events as fast a possible. It prevents closing the
// streams while there's still data in the OS buffer (due to the process
// exiting before all data is read).
if (input != NULL) {
proc->in->events = NULL;
wstream_init(proc->in, 0);
wstream_init(&proc->in, 0);
}
proc->out->events = NULL;
rstream_init(proc->out, 0);
rstream_start(proc->out, data_cb, &buf);
proc->err->events = NULL;
rstream_init(proc->err, 0);
rstream_start(proc->err, data_cb, &buf);
rstream_init(&proc->out, 0);
rstream_start(&proc->out, data_cb, &buf);
rstream_init(&proc->err, 0);
rstream_start(&proc->err, data_cb, &buf);
// write the input, if any
if (input) {
WBuffer *input_buffer = wstream_new_buffer((char *) input, len, 1, NULL);
if (!wstream_write(&in, input_buffer)) {
if (!wstream_write(&proc->in, input_buffer)) {
// couldn't write, stop the process and tell the user about it
process_stop(proc);
return -1;
}
// close the input stream after everything is written
wstream_set_write_cb(&in, shell_write_cb, NULL);
wstream_set_write_cb(&proc->in, shell_write_cb, NULL);
}
// Invoke busy_start here so LOOP_PROCESS_EVENTS_UNTIL will not change the
@ -684,10 +677,6 @@ static void shell_write_cb(Stream *stream, void *data, int status)
msg_schedule_emsgf(_("E5677: Error writing input to shell-command: %s"),
uv_err_name(status));
}
if (stream->closed) { // Process may have exited before this write.
WLOG("stream was already closed");
return;
}
stream_close(stream, NULL, NULL);
}

View File

@ -144,7 +144,9 @@ void mch_exit(int r) FUNC_ATTR_NORETURN
if (!event_teardown() && r == 0) {
r = 1; // Exit with error if main_loop did not teardown gracefully.
}
stream_set_blocking(input_global_fd(), true); // normalize stream (#2598)
if (input_global_fd() >= 0) {
stream_set_blocking(input_global_fd(), true); // normalize stream (#2598)
}
#ifdef EXITFREE
free_all_mem();

View File

@ -121,7 +121,7 @@ char *rbuffer_read_ptr(RBuffer *buf, size_t *read_count) FUNC_ATTR_NONNULL_ALL
{
if (!buf->size) {
*read_count = 0;
return NULL;
return buf->read_ptr;
}
if (buf->read_ptr < buf->write_ptr) {

View File

@ -1094,11 +1094,12 @@ static void refresh_terminal(Terminal *term)
// Calls refresh_terminal() on all invalidated_terminals.
static void refresh_timer_cb(TimeWatcher *watcher, void *data)
{
refresh_pending = false;
if (exiting // Cannot redraw (requires event loop) during teardown/exit.
// WM_LIST (^D) is not redrawn, unlike the normal wildmenu. So we must
// skip redraws to keep it visible.
|| wild_menu_showing == WM_LIST) {
goto end;
return;
}
Terminal *term;
void *stub; (void)(stub);
@ -1113,8 +1114,6 @@ static void refresh_timer_cb(TimeWatcher *watcher, void *data)
if (any_visible) {
redraw(true);
}
end:
refresh_pending = false;
}
static void refresh_size(Terminal *term, buf_T *buf)

View File

@ -262,6 +262,7 @@ describe('server -> client', function()
eq("done!",funcs.rpcrequest(jobid, "write_stderr", "fluff\n"))
eq({'notification', 'stderr', {0, {'fluff', ''}}}, next_message())
funcs.rpcrequest(jobid, "exit")
eq({'notification', 'stderr', {0, {''}}}, next_message())
eq({'notification', 'exit', {0, 0}}, next_message())
end)
end)

View File

@ -0,0 +1,266 @@
local helpers = require('test.functional.helpers')(after_each)
local clear, eq, eval, next_msg, ok, source = helpers.clear, helpers.eq,
helpers.eval, helpers.next_message, helpers.ok, helpers.source
local command, funcs, meths = helpers.command, helpers.funcs, helpers.meths
local sleep = helpers.sleep
local spawn, nvim_argv = helpers.spawn, helpers.nvim_argv
local set_session = helpers.set_session
local nvim_prog = helpers.nvim_prog
local retry = helpers.retry
local expect_twostreams = helpers.expect_twostreams
describe('channels', function()
local init = [[
function! Normalize(data) abort
" Windows: remove ^M
return type([]) == type(a:data)
\ ? map(a:data, 'substitute(v:val, "\r", "", "g")')
\ : a:data
endfunction
function! OnEvent(id, data, event) dict
call rpcnotify(1, a:event, a:id, a:data)
endfunction
]]
before_each(function()
clear()
source(init)
end)
it('can connect to socket', function()
local server = spawn(nvim_argv)
set_session(server)
local address = funcs.serverlist()[1]
local client = spawn(nvim_argv)
set_session(client, true)
source(init)
meths.set_var('address', address)
command("let g:id = sockconnect('pipe', address, {'on_data':'OnEvent'})")
local id = eval("g:id")
ok(id > 0)
command("call chansend(g:id, msgpackdump([[2,'nvim_set_var',['code',23]]]))")
set_session(server, true)
retry(nil, 1000, function()
eq(23, meths.get_var('code'))
end)
set_session(client, true)
command("call chansend(g:id, msgpackdump([[0,0,'nvim_eval',['2+3']]]))")
local res = eval("msgpackdump([[1,0,v:null,5]])")
eq({"\148\001\n\192\005"}, res)
eq({'notification', 'data', {id, res}}, next_msg())
command("call chansend(g:id, msgpackdump([[2,'nvim_command',['quit']]]))")
eq({'notification', 'data', {id, {''}}}, next_msg())
end)
it('can use stdio channel', function()
source([[
let g:job_opts = {
\ 'on_stdout': function('OnEvent'),
\ 'on_stderr': function('OnEvent'),
\ 'on_exit': function('OnEvent'),
\ }
]])
meths.set_var("nvim_prog", nvim_prog)
meths.set_var("code", [[
function! OnEvent(id, data, event) dict
let text = string([a:id, a:data, a:event])
call chansend(g:x, text)
if a:data == ['']
call chansend(v:stderr, "*dies*")
quit
endif
endfunction
let g:x = stdioopen({'on_stdin':'OnEvent'})
call chansend(x, "hello")
]])
command("let g:id = jobstart([ g:nvim_prog, '-u', 'NONE', '-i', 'NONE', '--cmd', 'set noswapfile', '--headless', '--cmd', g:code], g:job_opts)")
local id = eval("g:id")
ok(id > 0)
eq({ "notification", "stdout", {id, { "hello" } } }, next_msg())
command("call chansend(id, 'howdy')")
eq({"notification", "stdout", {id, {"[1, ['howdy'], 'stdin']"}}}, next_msg())
command("call chanclose(id, 'stdin')")
expect_twostreams({{"notification", "stdout", {id, {"[1, [''], 'stdin']"}}},
{'notification', 'stdout', {id, {''}}}},
{{"notification", "stderr", {id, {"*dies*"}}},
{'notification', 'stderr', {id, {''}}}})
eq({"notification", "exit", {3,0}}, next_msg())
end)
local function expect_twoline(id, stream, line1, line2, nobr)
local msg = next_msg()
local joined = nobr and {line1..line2} or {line1, line2}
if not pcall(eq, {"notification", stream, {id, joined}}, msg) then
local sep = (not nobr) and "" or nil
eq({"notification", stream, {id, {line1, sep}}}, msg)
eq({"notification", stream, {id, {line2}}}, next_msg())
end
end
it('can use stdio channel with pty', function()
if helpers.pending_win32(pending) then return end
source([[
let g:job_opts = {
\ 'on_stdout': function('OnEvent'),
\ 'on_exit': function('OnEvent'),
\ 'pty': v:true,
\ }
]])
meths.set_var("nvim_prog", nvim_prog)
meths.set_var("code", [[
function! OnEvent(id, data, event) dict
let text = string([a:id, a:data, a:event])
call chansend(g:x, text)
endfunction
let g:x = stdioopen({'on_stdin':'OnEvent'})
]])
command("let g:id = jobstart([ g:nvim_prog, '-u', 'NONE', '-i', 'NONE', '--cmd', 'set noswapfile', '--headless', '--cmd', g:code], g:job_opts)")
local id = eval("g:id")
ok(id > 0)
command("call chansend(id, 'TEXT\n')")
expect_twoline(id, "stdout", "TEXT\r", "[1, ['TEXT', ''], 'stdin']")
command("call chansend(id, 'neovan')")
eq({"notification", "stdout", {id, {"neovan"}}}, next_msg())
command("call chansend(id, '\127\127im\n')")
expect_twoline(id, "stdout", "\b \b\b \bim\r", "[1, ['neovim', ''], 'stdin']")
command("call chansend(id, 'incomplet\004')")
local is_freebsd = eval("system('uname') =~? 'FreeBSD'") == 1
local bsdlike = is_freebsd or (helpers.os_name() == "osx")
print("bsdlike:", bsdlike)
local extra = bsdlike and "^D\008\008" or ""
expect_twoline(id, "stdout",
"incomplet"..extra, "[1, ['incomplet'], 'stdin']", true)
command("call chansend(id, '\004')")
if bsdlike then
expect_twoline(id, "stdout", extra, "[1, [''], 'stdin']", true)
else
eq({"notification", "stdout", {id, {"[1, [''], 'stdin']"}}}, next_msg())
end
-- channel is still open
command("call chansend(id, 'hi again!\n')")
eq({"notification", "stdout", {id, {"hi again!\r", ""}}}, next_msg())
end)
it('stdio channel can use rpc and stderr simultaneously', function()
if helpers.pending_win32(pending) then return end
source([[
let g:job_opts = {
\ 'on_stderr': function('OnEvent'),
\ 'on_exit': function('OnEvent'),
\ 'rpc': v:true,
\ }
]])
meths.set_var("nvim_prog", nvim_prog)
meths.set_var("code", [[
let id = stdioopen({'rpc':v:true})
call rpcnotify(id,"nvim_call_function", "rpcnotify", [1, "message", "hi there!", id])
call chansend(v:stderr, "trouble!")
]])
command("let id = jobstart([ g:nvim_prog, '-u', 'NONE', '-i', 'NONE', '--cmd', 'set noswapfile', '--headless', '--cmd', g:code], g:job_opts)")
eq({"notification", "message", {"hi there!", 1}}, next_msg())
eq({"notification", "stderr", {3, {"trouble!"}}}, next_msg())
eq(30, eval("rpcrequest(id, 'nvim_eval', '[chansend(v:stderr, \"math??\"), 5*6][1]')"))
eq({"notification", "stderr", {3, {"math??"}}}, next_msg())
local _, err = pcall(command,"call rpcrequest(id, 'nvim_command', 'call chanclose(v:stderr, \"stdin\")')")
ok(string.find(err,"E906: invalid stream for channel") ~= nil)
eq(1, eval("rpcrequest(id, 'nvim_eval', 'chanclose(v:stderr, \"stderr\")')"))
eq({"notification", "stderr", {3, {""}}}, next_msg())
command("call rpcnotify(id, 'nvim_command', 'quit')")
eq({"notification", "exit", {3, 0}}, next_msg())
end)
it('can use buffered output mode', function()
if helpers.pending_win32(pending) then return end
source([[
let g:job_opts = {
\ 'on_stdout': function('OnEvent'),
\ 'on_exit': function('OnEvent'),
\ 'stdout_buffered': v:true,
\ }
]])
command("let id = jobstart(['grep', '^[0-9]'], g:job_opts)")
local id = eval("g:id")
command([[call chansend(id, "stuff\n10 PRINT \"NVIM\"\nxx")]])
sleep(10)
command([[call chansend(id, "xx\n20 GOTO 10\nzz\n")]])
command("call chanclose(id, 'stdin')")
eq({"notification", "stdout", {id, {'10 PRINT "NVIM"',
'20 GOTO 10', ''}}}, next_msg())
eq({"notification", "exit", {id, 0}}, next_msg())
command("let id = jobstart(['grep', '^[0-9]'], g:job_opts)")
id = eval("g:id")
command([[call chansend(id, "is no number\nnot at all")]])
command("call chanclose(id, 'stdin')")
-- works correctly with no output
eq({"notification", "stdout", {id, {''}}}, next_msg())
eq({"notification", "exit", {id, 1}}, next_msg())
end)
it('can use buffered output mode with no stream callback', function()
if helpers.pending_win32(pending) then return end
source([[
function! OnEvent(id, data, event) dict
call rpcnotify(1, a:event, a:id, a:data, self.stdout)
endfunction
let g:job_opts = {
\ 'on_exit': function('OnEvent'),
\ 'stdout_buffered': v:true,
\ }
]])
command("let id = jobstart(['grep', '^[0-9]'], g:job_opts)")
local id = eval("g:id")
command([[call chansend(id, "stuff\n10 PRINT \"NVIM\"\nxx")]])
sleep(10)
command([[call chansend(id, "xx\n20 GOTO 10\nzz\n")]])
command("call chanclose(id, 'stdin')")
eq({"notification", "exit", {id, 0, {'10 PRINT "NVIM"',
'20 GOTO 10', ''}}}, next_msg())
-- reset dictionary
source([[
let g:job_opts = {
\ 'on_exit': function('OnEvent'),
\ 'stdout_buffered': v:true,
\ }
]])
command("let id = jobstart(['grep', '^[0-9]'], g:job_opts)")
id = eval("g:id")
command([[call chansend(id, "is no number\nnot at all")]])
command("call chanclose(id, 'stdin')")
-- works correctly with no output
eq({"notification", "exit", {id, 1, {''}}}, next_msg())
end)
end)

View File

@ -10,6 +10,7 @@ local wait = helpers.wait
local iswin = helpers.iswin
local get_pathsep = helpers.get_pathsep
local nvim_set = helpers.nvim_set
local expect_twostreams = helpers.expect_twostreams
local Screen = require('test.functional.ui.screen')
describe('jobs', function()
@ -29,15 +30,14 @@ describe('jobs', function()
\ ? map(a:data, 'substitute(v:val, "\r", "", "g")')
\ : a:data
endfunction
function! s:OnEvent(id, data, event) dict
function! OnEvent(id, data, event) dict
let userdata = get(self, 'user')
let data = Normalize(a:data)
call rpcnotify(g:channel, a:event, userdata, data)
endfunction
let g:job_opts = {
\ 'on_stdout': function('s:OnEvent'),
\ 'on_stderr': function('s:OnEvent'),
\ 'on_exit': function('s:OnEvent'),
\ 'on_stdout': function('OnEvent'),
\ 'on_exit': function('OnEvent'),
\ 'user': 0
\ }
]])
@ -51,6 +51,7 @@ describe('jobs', function()
nvim('command', "let j = jobstart('echo $VAR', g:job_opts)")
end
eq({'notification', 'stdout', {0, {'abc', ''}}}, next_msg())
eq({'notification', 'stdout', {0, {''}}}, next_msg())
eq({'notification', 'exit', {0, 0}}, next_msg())
end)
@ -63,6 +64,7 @@ describe('jobs', function()
end
eq({'notification', 'stdout',
{0, {(iswin() and [[C:\]] or '/'), ''}}}, next_msg())
eq({'notification', 'stdout', {0, {''}}}, next_msg())
eq({'notification', 'exit', {0, 0}}, next_msg())
end)
@ -76,6 +78,7 @@ describe('jobs', function()
nvim('command', "let j = jobstart('pwd', g:job_opts)")
end
eq({'notification', 'stdout', {0, {dir, ''}}}, next_msg())
eq({'notification', 'stdout', {0, {''}}}, next_msg())
eq({'notification', 'exit', {0, 0}}, next_msg())
rmdir(dir)
end)
@ -118,8 +121,12 @@ describe('jobs', function()
it('invokes callbacks when the job writes and exits', function()
-- TODO: hangs on Windows
if helpers.pending_win32(pending) then return end
nvim('command', "let g:job_opts.on_stderr = function('OnEvent')")
nvim('command', "call jobstart('echo', g:job_opts)")
eq({'notification', 'stdout', {0, {'', ''}}}, next_msg())
expect_twostreams({{'notification', 'stdout', {0, {'', ''}}},
{'notification', 'stdout', {0, {''}}}},
{{'notification', 'stderr', {0, {''}}}})
eq({'notification', 'exit', {0, 0}}, next_msg())
end)
@ -134,6 +141,7 @@ describe('jobs', function()
nvim('command', 'call jobsend(j, [123, "xyz", ""])')
eq({'notification', 'stdout', {0, {'123', 'xyz', ''}}}, next_msg())
nvim('command', "call jobstop(j)")
eq({'notification', 'stdout', {0, {''}}}, next_msg())
eq({'notification', 'exit', {0, 0}}, next_msg())
end)
@ -145,6 +153,7 @@ describe('jobs', function()
nvim('command', "let j = jobstart(['cat', '"..filename.."'], g:job_opts)")
eq({'notification', 'stdout', {0, {'abc\ndef', ''}}}, next_msg())
eq({'notification', 'stdout', {0, {''}}}, next_msg())
eq({'notification', 'exit', {0, 0}}, next_msg())
os.remove(filename)
@ -168,6 +177,7 @@ describe('jobs', function()
nvim('command', 'call jobsend(j, "abc\\nxyz")')
eq({'notification', 'stdout', {0, {'abc', 'xyz'}}}, next_msg())
nvim('command', "call jobstop(j)")
eq({'notification', 'stdout', {0, {''}}}, next_msg())
eq({'notification', 'exit', {0, 0}}, next_msg())
end)
@ -186,6 +196,7 @@ describe('jobs', function()
eq({'notification', 'stdout', {0, {'\n123\n', 'abc\nxyz\n', ''}}},
next_msg())
nvim('command', "call jobstop(j)")
eq({'notification', 'stdout', {0, {''}}}, next_msg())
eq({'notification', 'exit', {0, 0}}, next_msg())
end)
@ -196,6 +207,7 @@ describe('jobs', function()
eq({'notification', 'stdout', {0, {'some data', 'without\nfinal nl'}}},
next_msg())
nvim('command', "call jobstop(j)")
eq({'notification', 'stdout', {0, {''}}}, next_msg())
eq({'notification', 'exit', {0, 0}}, next_msg())
end)
@ -203,6 +215,7 @@ describe('jobs', function()
if helpers.pending_win32(pending) then return end -- TODO: Need `cat`.
nvim('command', "let j = jobstart(['cat', '-'], g:job_opts)")
nvim('command', 'call jobclose(j, "stdin")')
eq({'notification', 'stdout', {0, {''}}}, next_msg())
eq({'notification', 'exit', {0, 0}}, next_msg())
end)
@ -239,6 +252,7 @@ describe('jobs', function()
local pid = eval('jobpid(j)')
eq(0,os.execute('ps -p '..pid..' > /dev/null'))
nvim('command', 'call jobstop(j)')
eq({'notification', 'stdout', {0, {''}}}, next_msg())
eq({'notification', 'exit', {0, 0}}, next_msg())
neq(0,os.execute('ps -p '..pid..' > /dev/null'))
end)
@ -270,6 +284,7 @@ describe('jobs', function()
nvim('command', [[call jobstart('echo "foo"', g:job_opts)]])
local data = {n = 5, s = 'str', l = {1}}
eq({'notification', 'stdout', {data, {'foo', ''}}}, next_msg())
eq({'notification', 'stdout', {data, {''}}}, next_msg())
eq({'notification', 'exit', {data, 0}}, next_msg())
end)
@ -283,7 +298,6 @@ describe('jobs', function()
it('can omit data callbacks', function()
nvim('command', 'unlet g:job_opts.on_stdout')
nvim('command', 'unlet g:job_opts.on_stderr')
nvim('command', 'let g:job_opts.user = 5')
nvim('command', [[call jobstart('echo "foo"', g:job_opts)]])
eq({'notification', 'exit', {5, 0}}, next_msg())
@ -294,11 +308,13 @@ describe('jobs', function()
nvim('command', 'let g:job_opts.user = 5')
nvim('command', [[call jobstart('echo "foo"', g:job_opts)]])
eq({'notification', 'stdout', {5, {'foo', ''}}}, next_msg())
eq({'notification', 'stdout', {5, {''}}}, next_msg())
end)
it('will pass return code with the exit event', function()
nvim('command', 'let g:job_opts.user = 5')
nvim('command', "call jobstart('exit 55', g:job_opts)")
eq({'notification', 'stdout', {5, {''}}}, next_msg())
eq({'notification', 'exit', {5, 55}}, next_msg())
end)
@ -341,6 +357,14 @@ describe('jobs', function()
end)
it('requires funcrefs for script-local (s:) functions', function()
local screen = Screen.new(60, 5)
screen:attach()
screen:set_default_attr_ids({
[1] = {bold = true, foreground = Screen.colors.Blue1},
[2] = {foreground = Screen.colors.Grey100, background = Screen.colors.Red},
[3] = {bold = true, foreground = Screen.colors.SeaGreen4}
})
-- Pass job callback names _without_ `function(...)`.
source([[
function! s:OnEvent(id, data, event) dict
@ -350,14 +374,10 @@ describe('jobs', function()
\ 'on_stdout': 's:OnEvent',
\ 'on_stderr': 's:OnEvent',
\ 'on_exit': 's:OnEvent',
\ 'user': 2349
\ })
]])
-- The behavior is asynchronous, retry until a time limit.
helpers.retry(nil, 10000, function()
eq("E120:", string.match(eval("v:errmsg"), "E%d*:"))
end)
screen:expect("{2:E120: Using <SID> not in a script context: s:OnEvent}",nil,nil,nil,true)
end)
it('does not repeat output with slow output handlers', function()
@ -376,7 +396,7 @@ describe('jobs', function()
call jobwait([jobstart(cmd, d)])
call rpcnotify(g:channel, 'data', d.data)
]])
eq({'notification', 'data', {{{'1', ''}, {'2', ''}, {'3', ''}, {'4', ''}, {'5', ''}}}}, next_msg())
eq({'notification', 'data', {{{'1', ''}, {'2', ''}, {'3', ''}, {'4', ''}, {'5', ''}, {''}}}}, next_msg())
end)
it('jobstart() works with partial functions', function()
@ -497,7 +517,8 @@ describe('jobs', function()
elseif self.state == 2
let self.state = 3
call jobsend(a:id, "line3\n")
else
elseif self.state == 3
let self.state = 4
call rpcnotify(g:channel, 'w', printf('job %d closed', self.counter))
call jobclose(a:id, 'stdin')
endif
@ -552,6 +573,7 @@ describe('jobs', function()
-- FIXME need to wait until jobsend succeeds before calling jobstop
pending('will only emit the "exit" event after "stdout" and "stderr"', function()
nvim('command', "let g:job_opts.on_stderr = function('s:OnEvent')")
nvim('command', "let j = jobstart(['cat', '-'], g:job_opts)")
local jobid = nvim('eval', 'j')
nvim('eval', 'jobsend(j, "abcdef")')
@ -638,7 +660,7 @@ describe('jobs', function()
-- there won't be any more messages, and the test would hang.
helpers.sleep(100)
local err = exc_exec('call jobpid(j)')
eq('Vim(call):E900: Invalid job id', err)
eq('Vim(call):E900: Invalid channel id', err)
-- cleanup
eq(other_pid, eval('jobpid(' .. other_jobid .. ')'))

View File

@ -100,6 +100,22 @@ local function next_message()
return session:next_message()
end
local function expect_twostreams(msgs1, msgs2)
local pos1, pos2 = 1, 1
while pos1 <= #msgs1 or pos2 <= #msgs2 do
local msg = next_message()
if pos1 <= #msgs1 and pcall(eq, msgs1[pos1], msg) then
pos1 = pos1 + 1
elseif pos2 <= #msgs2 then
eq(msgs2[pos2], msg)
pos2 = pos2 + 1
else
-- already failed, but show the right error message
eq(msgs1[pos1], msg)
end
end
end
local function call_and_stop_on_error(...)
local status, result = copcall(...) -- luacheck: ignore
if not status then
@ -618,6 +634,33 @@ local function alter_slashes(obj)
end
end
local function hexdump(str)
local len = string.len( str )
local dump = ""
local hex = ""
local asc = ""
for i = 1, len do
if 1 == i % 8 then
dump = dump .. hex .. asc .. "\n"
hex = string.format( "%04x: ", i - 1 )
asc = ""
end
local ord = string.byte( str, i )
hex = hex .. string.format( "%02x ", ord )
if ord >= 32 and ord <= 126 then
asc = asc .. string.char( ord )
else
asc = asc .. "."
end
end
return dump .. hex
.. string.rep( " ", 8 - len % 8 ) .. asc
end
local module = {
prepend_argv = prepend_argv,
clear = clear,
@ -636,6 +679,7 @@ local module = {
command = nvim_command,
request = request,
next_message = next_message,
expect_twostreams = expect_twostreams,
run = run,
stop = stop,
eq = eq,
@ -687,6 +731,7 @@ local module = {
get_pathsep = get_pathsep,
missing_provider = missing_provider,
alter_slashes = alter_slashes,
hexdump = hexdump,
}
return function(after_each)

View File

@ -225,21 +225,8 @@ local function check_cores(app, force)
local esigns = ('='):rep(len / 2)
out:write(('\n%s Core file %s %s\n'):format(esigns, core, esigns))
out:flush()
local pipe = io.popen(
db_cmd:gsub('%$_NVIM_TEST_APP', app):gsub('%$_NVIM_TEST_CORE', core)
.. ' 2>&1', 'r')
if pipe then
local bt = pipe:read('*a')
if bt then
out:write(bt)
out:write('\n')
else
out:write('Failed to read from the pipe\n')
end
else
out:write('Failed to create pipe\n')
end
out:flush()
os.execute(db_cmd:gsub('%$_NVIM_TEST_APP', app):gsub('%$_NVIM_TEST_CORE', core) .. ' 2>&1')
out:write('\n')
found_cores = found_cores + 1
os.remove(core)
end