Revert "refactor: per-worker resource table (#3306)"

This patch does not work with the recent bundler changes (#3325).
Unfortunately I didn't merge master before landing this patch. It has
something to do with console.log not working inside the compiler worker.

This reverts commit fd62379eaf.
This commit is contained in:
Ryan Dahl 2019-11-13 23:14:48 -05:00 committed by Ry Dahl
parent fd62379eaf
commit fdf0ede2ac
14 changed files with 336 additions and 358 deletions

View File

@ -43,6 +43,7 @@ pub mod permissions;
mod progress;
mod repl;
pub mod resolve_addr;
pub mod resources;
mod shell;
mod signal;
pub mod source_maps;
@ -56,7 +57,6 @@ pub mod worker;
use crate::deno_error::js_check;
use crate::deno_error::print_err_and_exit;
use crate::global_state::ThreadSafeGlobalState;
use crate::ops::io::get_stdio;
use crate::progress::Progress;
use crate::state::ThreadSafeState;
use crate::worker::Worker;
@ -128,15 +128,6 @@ fn create_worker_and_state(
.map_err(deno_error::print_err_and_exit)
.unwrap();
let state_ = state.clone();
{
let mut resource_table = state_.lock_resource_table();
let (stdin, stdout, stderr) = get_stdio();
resource_table.add("stdin", Box::new(stdin));
resource_table.add("stdout", Box::new(stdout));
resource_table.add("stderr", Box::new(stderr));
}
let worker = Worker::new(
"main".to_string(),
startup_data::deno_isolate_init(),

View File

@ -15,6 +15,7 @@ use deno::PinnedBuf;
use futures::Future;
pub type MinimalOp = dyn Future<Item = i32, Error = ErrBox> + Send;
pub type Dispatcher = fn(i32, Option<PinnedBuf>) -> Box<MinimalOp>;
#[derive(Copy, Clone, Debug, PartialEq)]
// This corresponds to RecordMinimal on the TS side.
@ -111,10 +112,9 @@ fn test_parse_min_record() {
assert_eq!(parse_min_record(&buf), None);
}
pub fn minimal_op<D>(d: D) -> impl Fn(&[u8], Option<PinnedBuf>) -> CoreOp
where
D: Fn(i32, Option<PinnedBuf>) -> Box<MinimalOp>,
{
pub fn minimal_op(
d: Dispatcher,
) -> impl Fn(&[u8], Option<PinnedBuf>) -> CoreOp {
move |control: &[u8], zero_copy: Option<PinnedBuf>| {
let mut record = match parse_min_record(control) {
Some(r) => r,

View File

@ -1,9 +1,8 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::io::StreamResource;
use crate::http_body::HttpBody;
use crate::http_util::get_client;
use crate::ops::json_op;
use crate::resources;
use crate::state::ThreadSafeState;
use deno::*;
use http::header::HeaderName;
@ -55,7 +54,6 @@ pub fn op_fetch(
request = request.header(name, v);
}
debug!("Before fetch {}", url);
let state_ = state.clone();
let future = request.send().map_err(ErrBox::from).and_then(move |res| {
let status = res.status();
let mut res_headers = Vec::new();
@ -63,9 +61,8 @@ pub fn op_fetch(
res_headers.push((key.to_string(), val.to_str().unwrap().to_owned()));
}
let body = HttpBody::from(res.into_body());
let mut table = state_.lock_resource_table();
let rid = table.add("httpBody", Box::new(StreamResource::HttpBody(body)));
let body = res.into_body();
let rid = resources::add_reqwest_body(body);
let json_res = json!({
"bodyRid": rid,

View File

@ -1,11 +1,12 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::io::StreamResource;
use crate::deno_error::bad_resource;
use crate::deno_error::DenoError;
use crate::deno_error::ErrorKind;
use crate::fs as deno_fs;
use crate::ops::json_op;
use crate::resources;
use crate::resources::CliResource;
use crate::state::ThreadSafeState;
use deno::*;
use futures::Future;
@ -37,7 +38,7 @@ fn op_open(
let args: OpenArgs = serde_json::from_value(args)?;
let (filename, filename_) = deno_fs::resolve_from_cwd(&args.filename)?;
let mode = args.mode.as_ref();
let state_ = state.clone();
let mut open_options = tokio::fs::OpenOptions::new();
match mode {
@ -90,8 +91,7 @@ fn op_open(
let is_sync = args.promise_id.is_none();
let op = open_options.open(filename).map_err(ErrBox::from).and_then(
move |fs_file| {
let mut table = state_.lock_resource_table();
let rid = table.add("fsFile", Box::new(StreamResource::FsFile(fs_file)));
let rid = resources::add_fs_file(fs_file);
futures::future::ok(json!(rid))
},
);
@ -110,21 +110,21 @@ struct CloseArgs {
}
fn op_close(
state: &ThreadSafeState,
_state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: CloseArgs = serde_json::from_value(args)?;
let mut table = state.lock_resource_table();
let mut table = resources::lock_resource_table();
table.close(args.rid as u32).ok_or_else(bad_resource)?;
Ok(JsonOp::Sync(json!({})))
}
#[derive(Debug)]
pub struct SeekFuture {
seek_from: SeekFrom,
rid: ResourceId,
state: ThreadSafeState,
}
impl Future for SeekFuture {
@ -132,13 +132,13 @@ impl Future for SeekFuture {
type Error = ErrBox;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut table = self.state.lock_resource_table();
let mut table = resources::lock_resource_table();
let resource = table
.get_mut::<StreamResource>(self.rid)
.get_mut::<CliResource>(self.rid)
.ok_or_else(bad_resource)?;
let tokio_file = match resource {
StreamResource::FsFile(ref mut file) => file,
CliResource::FsFile(ref mut file) => file,
_ => return Err(bad_resource()),
};
@ -156,7 +156,7 @@ struct SeekArgs {
}
fn op_seek(
state: &ThreadSafeState,
_state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
@ -177,11 +177,7 @@ fn op_seek(
}
};
let fut = SeekFuture {
state: state.clone(),
seek_from,
rid,
};
let fut = SeekFuture { seek_from, rid };
let op = fut.and_then(move |_| futures::future::ok(json!({})));
if args.promise_id.is_none() {

View File

@ -1,101 +1,19 @@
use super::dispatch_minimal::MinimalOp;
use crate::deno_error;
use crate::deno_error::bad_resource;
use crate::http_body::HttpBody;
use crate::ops::minimal_op;
use crate::resources;
use crate::resources::CliResource;
use crate::resources::DenoAsyncRead;
use crate::resources::DenoAsyncWrite;
use crate::state::ThreadSafeState;
use deno::ErrBox;
use deno::Resource;
use deno::*;
use futures;
use futures::Future;
use futures::Poll;
use std;
use tokio;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use tokio_process;
use tokio_rustls::client::TlsStream as ClientTlsStream;
use tokio_rustls::server::TlsStream as ServerTlsStream;
#[cfg(not(windows))]
use std::os::unix::io::FromRawFd;
#[cfg(windows)]
use std::os::windows::io::FromRawHandle;
#[cfg(windows)]
extern crate winapi;
pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
i.register_op(
"read",
s.core_op(minimal_op(s.stateful_minimal_op(op_read))),
);
i.register_op(
"write",
s.core_op(minimal_op(s.stateful_minimal_op(op_write))),
);
}
pub fn get_stdio() -> (StreamResource, StreamResource, StreamResource) {
let stdin = StreamResource::Stdin(tokio::io::stdin());
let stdout = StreamResource::Stdout({
#[cfg(not(windows))]
let stdout = unsafe { std::fs::File::from_raw_fd(1) };
#[cfg(windows)]
let stdout = unsafe {
std::fs::File::from_raw_handle(winapi::um::processenv::GetStdHandle(
winapi::um::winbase::STD_OUTPUT_HANDLE,
))
};
tokio::fs::File::from_std(stdout)
});
let stderr = StreamResource::Stderr(tokio::io::stderr());
(stdin, stdout, stderr)
}
pub enum StreamResource {
Stdin(tokio::io::Stdin),
Stdout(tokio::fs::File),
Stderr(tokio::io::Stderr),
FsFile(tokio::fs::File),
TcpStream(tokio::net::TcpStream),
ServerTlsStream(Box<ServerTlsStream<TcpStream>>),
ClientTlsStream(Box<ClientTlsStream<TcpStream>>),
HttpBody(HttpBody),
ChildStdin(tokio_process::ChildStdin),
ChildStdout(tokio_process::ChildStdout),
ChildStderr(tokio_process::ChildStderr),
}
impl Resource for StreamResource {}
/// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait
/// but uses an `ErrBox` error instead of `std::io:Error`
pub trait DenoAsyncRead {
fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox>;
}
impl DenoAsyncRead for StreamResource {
fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox> {
let r = match self {
StreamResource::FsFile(ref mut f) => f.poll_read(buf),
StreamResource::Stdin(ref mut f) => f.poll_read(buf),
StreamResource::TcpStream(ref mut f) => f.poll_read(buf),
StreamResource::ClientTlsStream(ref mut f) => f.poll_read(buf),
StreamResource::ServerTlsStream(ref mut f) => f.poll_read(buf),
StreamResource::HttpBody(ref mut f) => f.poll_read(buf),
StreamResource::ChildStdout(ref mut f) => f.poll_read(buf),
StreamResource::ChildStderr(ref mut f) => f.poll_read(buf),
_ => {
return Err(bad_resource());
}
};
r.map_err(ErrBox::from)
}
i.register_op("read", s.core_op(minimal_op(op_read)));
i.register_op("write", s.core_op(minimal_op(op_write)));
}
#[derive(Debug, PartialEq)]
@ -109,15 +27,14 @@ enum IoState {
///
/// The returned future will resolve to both the I/O stream and the buffer
/// as well as the number of bytes read once the read operation is completed.
pub fn read<T>(state: &ThreadSafeState, rid: ResourceId, buf: T) -> Read<T>
pub fn read<T>(rid: ResourceId, buf: T) -> Read<T>
where
T: AsMut<[u8]>,
{
Read {
rid,
buf,
io_state: IoState::Pending,
state: state.clone(),
state: IoState::Pending,
}
}
@ -125,11 +42,11 @@ where
/// a buffer.
///
/// Created by the [`read`] function.
#[derive(Debug)]
pub struct Read<T> {
rid: ResourceId,
buf: T,
io_state: IoState,
state: ThreadSafeState,
state: IoState,
}
impl<T> Future for Read<T>
@ -140,25 +57,21 @@ where
type Error = ErrBox;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.io_state == IoState::Done {
if self.state == IoState::Done {
panic!("poll a Read after it's done");
}
let mut table = self.state.lock_resource_table();
let mut table = resources::lock_resource_table();
let resource = table
.get_mut::<StreamResource>(self.rid)
.get_mut::<CliResource>(self.rid)
.ok_or_else(bad_resource)?;
let nread = try_ready!(resource.poll_read(&mut self.buf.as_mut()[..]));
self.io_state = IoState::Done;
self.state = IoState::Done;
Ok(nread.into())
}
}
pub fn op_read(
state: &ThreadSafeState,
rid: i32,
zero_copy: Option<PinnedBuf>,
) -> Box<MinimalOp> {
pub fn op_read(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> {
debug!("read rid={}", rid);
let zero_copy = match zero_copy {
None => {
@ -167,50 +80,19 @@ pub fn op_read(
Some(buf) => buf,
};
let fut = read(state, rid as u32, zero_copy)
let fut = read(rid as u32, zero_copy)
.map_err(ErrBox::from)
.and_then(move |nread| Ok(nread as i32));
Box::new(fut)
}
/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait
/// but uses an `ErrBox` error instead of `std::io:Error`
pub trait DenoAsyncWrite {
fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox>;
fn shutdown(&mut self) -> Poll<(), ErrBox>;
}
impl DenoAsyncWrite for StreamResource {
fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox> {
let r = match self {
StreamResource::FsFile(ref mut f) => f.poll_write(buf),
StreamResource::Stdout(ref mut f) => f.poll_write(buf),
StreamResource::Stderr(ref mut f) => f.poll_write(buf),
StreamResource::TcpStream(ref mut f) => f.poll_write(buf),
StreamResource::ClientTlsStream(ref mut f) => f.poll_write(buf),
StreamResource::ServerTlsStream(ref mut f) => f.poll_write(buf),
StreamResource::ChildStdin(ref mut f) => f.poll_write(buf),
_ => {
return Err(bad_resource());
}
};
r.map_err(ErrBox::from)
}
fn shutdown(&mut self) -> futures::Poll<(), ErrBox> {
unimplemented!()
}
}
/// A future used to write some data to a stream.
#[derive(Debug)]
pub struct Write<T> {
rid: ResourceId,
buf: T,
io_state: IoState,
state: ThreadSafeState,
state: IoState,
}
/// Creates a future that will write some of the buffer `buf` to
@ -218,15 +100,14 @@ pub struct Write<T> {
///
/// Any error which happens during writing will cause both the stream and the
/// buffer to get destroyed.
pub fn write<T>(state: &ThreadSafeState, rid: ResourceId, buf: T) -> Write<T>
pub fn write<T>(rid: ResourceId, buf: T) -> Write<T>
where
T: AsRef<[u8]>,
{
Write {
rid,
buf,
io_state: IoState::Pending,
state: state.clone(),
state: IoState::Pending,
}
}
@ -240,25 +121,21 @@ where
type Error = ErrBox;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.io_state == IoState::Done {
if self.state == IoState::Done {
panic!("poll a Read after it's done");
}
let mut table = self.state.lock_resource_table();
let mut table = resources::lock_resource_table();
let resource = table
.get_mut::<StreamResource>(self.rid)
.get_mut::<CliResource>(self.rid)
.ok_or_else(bad_resource)?;
let nwritten = try_ready!(resource.poll_write(self.buf.as_ref()));
self.io_state = IoState::Done;
self.state = IoState::Done;
Ok(nwritten.into())
}
}
pub fn op_write(
state: &ThreadSafeState,
rid: i32,
zero_copy: Option<PinnedBuf>,
) -> Box<MinimalOp> {
pub fn op_write(rid: i32, zero_copy: Option<PinnedBuf>) -> Box<MinimalOp> {
debug!("write rid={}", rid);
let zero_copy = match zero_copy {
None => {
@ -267,7 +144,7 @@ pub fn op_write(
Some(buf) => buf,
};
let fut = write(state, rid as u32, zero_copy)
let fut = write(rid as u32, zero_copy)
.map_err(ErrBox::from)
.and_then(move |nwritten| Ok(nwritten as i32));

View File

@ -5,7 +5,6 @@ mod dispatch_minimal;
pub use dispatch_json::json_op;
pub use dispatch_json::JsonOp;
pub use dispatch_minimal::minimal_op;
pub use dispatch_minimal::MinimalOp;
pub mod compiler;
pub mod errors;

View File

@ -1,11 +1,12 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::io::StreamResource;
use crate::deno_error::bad_resource;
use crate::ops::json_op;
use crate::resolve_addr::resolve_addr;
use crate::resources;
use crate::resources::CliResource;
use crate::resources::Resource;
use crate::state::ThreadSafeState;
use deno::Resource;
use deno::*;
use futures::Async;
use futures::Future;
@ -33,19 +34,18 @@ enum AcceptState {
}
/// Simply accepts a connection.
pub fn accept(state: &ThreadSafeState, rid: ResourceId) -> Accept {
pub fn accept(rid: ResourceId) -> Accept {
Accept {
accept_state: AcceptState::Eager,
state: AcceptState::Eager,
rid,
state: state.clone(),
}
}
/// A future representing state of accepting a TCP connection.
#[derive(Debug)]
pub struct Accept {
accept_state: AcceptState,
state: AcceptState,
rid: ResourceId,
state: ThreadSafeState,
}
impl Future for Accept {
@ -53,11 +53,11 @@ impl Future for Accept {
type Error = ErrBox;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.accept_state == AcceptState::Done {
if self.state == AcceptState::Done {
panic!("poll Accept after it's done");
}
let mut table = self.state.lock_resource_table();
let mut table = resources::lock_resource_table();
let listener_resource = table
.get_mut::<TcpListenerResource>(self.rid)
.ok_or_else(|| {
@ -70,22 +70,22 @@ impl Future for Accept {
let listener = &mut listener_resource.listener;
if self.accept_state == AcceptState::Eager {
if self.state == AcceptState::Eager {
// Similar to try_ready!, but also track/untrack accept task
// in TcpListener resource.
// In this way, when the listener is closed, the task can be
// notified to error out (instead of stuck forever).
match listener.poll_accept().map_err(ErrBox::from) {
Ok(Async::Ready((stream, addr))) => {
self.accept_state = AcceptState::Done;
self.state = AcceptState::Done;
return Ok((stream, addr).into());
}
Ok(Async::NotReady) => {
self.accept_state = AcceptState::Pending;
self.state = AcceptState::Pending;
return Ok(Async::NotReady);
}
Err(e) => {
self.accept_state = AcceptState::Done;
self.state = AcceptState::Done;
return Err(e);
}
}
@ -94,7 +94,7 @@ impl Future for Accept {
match listener.poll_accept().map_err(ErrBox::from) {
Ok(Async::Ready((stream, addr))) => {
listener_resource.untrack_task();
self.accept_state = AcceptState::Done;
self.state = AcceptState::Done;
Ok((stream, addr).into())
}
Ok(Async::NotReady) => {
@ -103,7 +103,7 @@ impl Future for Accept {
}
Err(e) => {
listener_resource.untrack_task();
self.accept_state = AcceptState::Done;
self.state = AcceptState::Done;
Err(e)
}
}
@ -116,25 +116,23 @@ struct AcceptArgs {
}
fn op_accept(
state: &ThreadSafeState,
_state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: AcceptArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
let state_ = state.clone();
let table = state.lock_resource_table();
let table = resources::lock_resource_table();
table
.get::<TcpListenerResource>(rid)
.ok_or_else(bad_resource)?;
let op = accept(state, rid)
let op = accept(rid)
.and_then(move |(tcp_stream, _socket_addr)| {
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut table = state_.lock_resource_table();
let rid =
table.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
let rid = resources::add_tcp_stream(tcp_stream);
Ok((rid, local_addr, remote_addr))
})
.map_err(ErrBox::from)
@ -163,7 +161,7 @@ fn op_dial(
) -> Result<JsonOp, ErrBox> {
let args: DialArgs = serde_json::from_value(args)?;
assert_eq!(args.transport, "tcp"); // TODO Support others.
let state_ = state.clone();
state.check_net(&args.hostname, args.port)?;
let op = resolve_addr(&args.hostname, args.port).and_then(move |addr| {
@ -172,9 +170,7 @@ fn op_dial(
.and_then(move |tcp_stream| {
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut table = state_.lock_resource_table();
let rid = table
.add("tcpStream", Box::new(StreamResource::TcpStream(tcp_stream)));
let rid = resources::add_tcp_stream(tcp_stream);
Ok((rid, local_addr, remote_addr))
})
.map_err(ErrBox::from)
@ -197,7 +193,7 @@ struct ShutdownArgs {
}
fn op_shutdown(
state: &ThreadSafeState,
_state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
@ -212,12 +208,10 @@ fn op_shutdown(
_ => unimplemented!(),
};
let mut table = state.lock_resource_table();
let resource = table
.get_mut::<StreamResource>(rid)
.ok_or_else(bad_resource)?;
let mut table = resources::lock_resource_table();
let resource = table.get_mut::<CliResource>(rid).ok_or_else(bad_resource)?;
match resource {
StreamResource::TcpStream(ref mut stream) => {
CliResource::TcpStream(ref mut stream) => {
TcpStream::shutdown(stream, shutdown_mode).map_err(ErrBox::from)?;
}
_ => return Err(bad_resource()),
@ -305,7 +299,7 @@ fn op_listen(
task: None,
local_addr,
};
let mut table = state.lock_resource_table();
let mut table = resources::lock_resource_table();
let rid = table.add("tcpListener", Box::new(listener_resource));
Ok(JsonOp::Sync(json!({

View File

@ -1,8 +1,9 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::io::StreamResource;
use crate::deno_error::bad_resource;
use crate::ops::json_op;
use crate::resources;
use crate::resources::CloneFileFuture;
use crate::signal::kill;
use crate::state::ThreadSafeState;
use deno::*;
@ -27,41 +28,6 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
i.register_op("kill", s.core_op(json_op(s.stateful_op(op_kill))));
}
struct CloneFileFuture {
rid: ResourceId,
state: ThreadSafeState,
}
impl Future for CloneFileFuture {
type Item = tokio::fs::File;
type Error = ErrBox;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut table = self.state.lock_resource_table();
let repr = table
.get_mut::<StreamResource>(self.rid)
.ok_or_else(bad_resource)?;
match repr {
StreamResource::FsFile(ref mut file) => {
file.poll_try_clone().map_err(ErrBox::from)
}
_ => Err(bad_resource()),
}
}
}
fn clone_file(
rid: u32,
state: &ThreadSafeState,
) -> Result<std::fs::File, ErrBox> {
(CloneFileFuture {
rid,
state: state.clone(),
})
.wait()
.map(|f| f.into_std())
}
fn subprocess_stdio_map(s: &str) -> std::process::Stdio {
match s {
"inherit" => std::process::Stdio::inherit(),
@ -99,7 +65,6 @@ fn op_run(
let run_args: RunArgs = serde_json::from_value(args)?;
state.check_run()?;
let state_ = state.clone();
let args = run_args.args;
let env = run_args.env;
@ -118,7 +83,7 @@ fn op_run(
// TODO: make this work with other resources, eg. sockets
let stdin_rid = run_args.stdin_rid;
if stdin_rid > 0 {
let file = clone_file(stdin_rid, &state_)?;
let file = (CloneFileFuture { rid: stdin_rid }).wait()?.into_std();
c.stdin(file);
} else {
c.stdin(subprocess_stdio_map(run_args.stdin.as_ref()));
@ -126,7 +91,7 @@ fn op_run(
let stdout_rid = run_args.stdout_rid;
if stdout_rid > 0 {
let file = clone_file(stdout_rid, &state_)?;
let file = (CloneFileFuture { rid: stdout_rid }).wait()?.into_std();
c.stdout(file);
} else {
c.stdout(subprocess_stdio_map(run_args.stdout.as_ref()));
@ -134,7 +99,7 @@ fn op_run(
let stderr_rid = run_args.stderr_rid;
if stderr_rid > 0 {
let file = clone_file(stderr_rid, &state_)?;
let file = (CloneFileFuture { rid: stderr_rid }).wait()?.into_std();
c.stderr(file);
} else {
c.stderr(subprocess_stdio_map(run_args.stderr.as_ref()));
@ -144,42 +109,29 @@ fn op_run(
let mut child = c.spawn_async().map_err(ErrBox::from)?;
let pid = child.id();
let mut table = state_.lock_resource_table();
let stdin_rid = match child.stdin().take() {
Some(child_stdin) => {
let rid = table.add(
"childStdin",
Box::new(StreamResource::ChildStdin(child_stdin)),
);
Some(rid)
}
None => None,
let stdin_rid = if child.stdin().is_some() {
let rid = resources::add_child_stdin(child.stdin().take().unwrap());
Some(rid)
} else {
None
};
let stdout_rid = match child.stdout().take() {
Some(child_stdout) => {
let rid = table.add(
"childStdout",
Box::new(StreamResource::ChildStdout(child_stdout)),
);
Some(rid)
}
None => None,
let stdout_rid = if child.stdout().is_some() {
let rid = resources::add_child_stdout(child.stdout().take().unwrap());
Some(rid)
} else {
None
};
let stderr_rid = match child.stderr().take() {
Some(child_stderr) => {
let rid = table.add(
"childStderr",
Box::new(StreamResource::ChildStderr(child_stderr)),
);
Some(rid)
}
None => None,
let stderr_rid = if child.stderr().is_some() {
let rid = resources::add_child_stderr(child.stderr().take().unwrap());
Some(rid)
} else {
None
};
let child_resource = ChildResource { child };
let mut table = resources::lock_resource_table();
let child_rid = table.add("child", Box::new(child_resource));
Ok(JsonOp::Sync(json!({
@ -193,7 +145,6 @@ fn op_run(
pub struct ChildStatus {
rid: ResourceId,
state: ThreadSafeState,
}
impl Future for ChildStatus {
@ -201,7 +152,7 @@ impl Future for ChildStatus {
type Error = ErrBox;
fn poll(&mut self) -> Poll<ExitStatus, ErrBox> {
let mut table = self.state.lock_resource_table();
let mut table = resources::lock_resource_table();
let child_resource = table
.get_mut::<ChildResource>(self.rid)
.ok_or_else(bad_resource)?;
@ -226,10 +177,7 @@ fn op_run_status(
state.check_run()?;
let future = ChildStatus {
rid,
state: state.clone(),
};
let future = ChildStatus { rid };
let future = future.and_then(move |run_status| {
let code = run_status.code();

View File

@ -4,8 +4,9 @@ use crate::deno_error::bad_resource;
use crate::ops::json_op;
use crate::repl;
use crate::repl::Repl;
use crate::resources;
use crate::resources::Resource;
use crate::state::ThreadSafeState;
use deno::Resource;
use deno::*;
use std::sync::Arc;
use std::sync::Mutex;
@ -43,7 +44,7 @@ fn op_repl_start(
repl::history_path(&state.global_state.dir, &args.history_file);
let repl = repl::Repl::new(history_path);
let resource = ReplResource(Arc::new(Mutex::new(repl)));
let mut table = state.lock_resource_table();
let mut table = resources::lock_resource_table();
let rid = table.add("repl", Box::new(resource));
Ok(JsonOp::Sync(json!(rid)))
}
@ -55,7 +56,7 @@ struct ReplReadlineArgs {
}
fn op_repl_readline(
state: &ThreadSafeState,
_state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
@ -63,10 +64,9 @@ fn op_repl_readline(
let rid = args.rid as u32;
let prompt = args.prompt;
debug!("op_repl_readline {} {}", rid, prompt);
let state = state.clone();
blocking_json(false, move || {
let table = state.lock_resource_table();
let table = resources::lock_resource_table();
let resource = table.get::<ReplResource>(rid).ok_or_else(bad_resource)?;
let repl = resource.0.clone();
let line = repl.lock().unwrap().readline(&prompt)?;

View File

@ -1,6 +1,7 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{JsonOp, Value};
use crate::ops::json_op;
use crate::resources::lock_resource_table;
use crate::state::ThreadSafeState;
use deno::*;
@ -9,11 +10,11 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
}
fn op_resources(
state: &ThreadSafeState,
_state: &ThreadSafeState,
_args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let resource_table = state.lock_resource_table();
let resource_table = lock_resource_table();
let serialized_resources = resource_table.entries();
Ok(JsonOp::Sync(json!(serialized_resources)))
}

View File

@ -1,13 +1,13 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use super::io::StreamResource;
use crate::deno_error::bad_resource;
use crate::deno_error::DenoError;
use crate::deno_error::ErrorKind;
use crate::ops::json_op;
use crate::resolve_addr::resolve_addr;
use crate::resources;
use crate::resources::Resource;
use crate::state::ThreadSafeState;
use deno::Resource;
use deno::*;
use futures::Async;
use futures::Future;
@ -60,7 +60,7 @@ pub fn op_dial_tls(
) -> Result<JsonOp, ErrBox> {
let args: DialTLSArgs = serde_json::from_value(args)?;
let cert_file = args.cert_file;
let state_ = state.clone();
state.check_net(&args.hostname, args.port)?;
if let Some(path) = cert_file.clone() {
state.check_read(&path)?;
@ -99,11 +99,7 @@ pub fn op_dial_tls(
.connect(dnsname, tcp_stream)
.map_err(ErrBox::from)
.and_then(move |tls_stream| {
let mut table = state_.lock_resource_table();
let rid = table.add(
"clientTlsStream",
Box::new(StreamResource::ClientTlsStream(Box::new(tls_stream))),
);
let rid = resources::add_tls_stream(tls_stream);
futures::future::ok(json!({
"rid": rid,
"localAddr": local_addr.to_string(),
@ -269,7 +265,7 @@ fn op_listen_tls(
task: None,
local_addr,
};
let mut table = state.lock_resource_table();
let mut table = resources::lock_resource_table();
let rid = table.add("tlsListener", Box::new(tls_listener_resource));
Ok(JsonOp::Sync(json!({
@ -286,19 +282,18 @@ enum AcceptTlsState {
}
/// Simply accepts a TLS connection.
pub fn accept_tls(state: &ThreadSafeState, rid: ResourceId) -> AcceptTls {
pub fn accept_tls(rid: ResourceId) -> AcceptTls {
AcceptTls {
accept_state: AcceptTlsState::Eager,
state: AcceptTlsState::Eager,
rid,
state: state.clone(),
}
}
/// A future representing state of accepting a TLS connection.
#[derive(Debug)]
pub struct AcceptTls {
accept_state: AcceptTlsState,
state: AcceptTlsState,
rid: ResourceId,
state: ThreadSafeState,
}
impl Future for AcceptTls {
@ -306,11 +301,11 @@ impl Future for AcceptTls {
type Error = ErrBox;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.accept_state == AcceptTlsState::Done {
if self.state == AcceptTlsState::Done {
panic!("poll AcceptTls after it's done");
}
let mut table = self.state.lock_resource_table();
let mut table = resources::lock_resource_table();
let listener_resource = table
.get_mut::<TlsListenerResource>(self.rid)
.ok_or_else(|| {
@ -323,22 +318,22 @@ impl Future for AcceptTls {
let listener = &mut listener_resource.listener;
if self.accept_state == AcceptTlsState::Eager {
if self.state == AcceptTlsState::Eager {
// Similar to try_ready!, but also track/untrack accept task
// in TcpListener resource.
// In this way, when the listener is closed, the task can be
// notified to error out (instead of stuck forever).
match listener.poll_accept().map_err(ErrBox::from) {
Ok(Async::Ready((stream, addr))) => {
self.accept_state = AcceptTlsState::Done;
self.state = AcceptTlsState::Done;
return Ok((stream, addr).into());
}
Ok(Async::NotReady) => {
self.accept_state = AcceptTlsState::Pending;
self.state = AcceptTlsState::Pending;
return Ok(Async::NotReady);
}
Err(e) => {
self.accept_state = AcceptTlsState::Done;
self.state = AcceptTlsState::Done;
return Err(e);
}
}
@ -347,7 +342,7 @@ impl Future for AcceptTls {
match listener.poll_accept().map_err(ErrBox::from) {
Ok(Async::Ready((stream, addr))) => {
listener_resource.untrack_task();
self.accept_state = AcceptTlsState::Done;
self.state = AcceptTlsState::Done;
Ok((stream, addr).into())
}
Ok(Async::NotReady) => {
@ -356,7 +351,7 @@ impl Future for AcceptTls {
}
Err(e) => {
listener_resource.untrack_task();
self.accept_state = AcceptTlsState::Done;
self.state = AcceptTlsState::Done;
Err(e)
}
}
@ -369,22 +364,21 @@ struct AcceptTlsArgs {
}
fn op_accept_tls(
state: &ThreadSafeState,
_state: &ThreadSafeState,
args: Value,
_zero_copy: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: AcceptTlsArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
let state1 = state.clone();
let state2 = state.clone();
let op = accept_tls(state, rid)
let op = accept_tls(rid)
.and_then(move |(tcp_stream, _socket_addr)| {
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
Ok((tcp_stream, local_addr, remote_addr))
})
.and_then(move |(tcp_stream, local_addr, remote_addr)| {
let table = state1.lock_resource_table();
let table = resources::lock_resource_table();
let resource = table
.get::<TlsListenerResource>(rid)
.ok_or_else(bad_resource)
@ -395,11 +389,7 @@ fn op_accept_tls(
.accept(tcp_stream)
.map_err(ErrBox::from)
.and_then(move |tls_stream| {
let mut table = state2.lock_resource_table();
let rid = table.add(
"serverTlsStream",
Box::new(StreamResource::ServerTlsStream(Box::new(tls_stream))),
);
let rid = resources::add_server_tls_stream(tls_stream);
Ok((rid, local_addr, remote_addr))
})
})

209
cli/resources.rs Normal file
View File

@ -0,0 +1,209 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
// Think of Resources as File Descriptors. They are integers that are allocated
// by the privileged side of Deno to refer to various resources. The simplest
// example are standard file system files and stdio - but there will be other
// resources added in the future that might not correspond to operating system
// level File Descriptors. To avoid confusion we call them "resources" not "file
// descriptors". This module implements a global resource table. Ops (AKA
// handlers) look up resources by their integer id here.
use crate::deno_error::bad_resource;
use crate::http_body::HttpBody;
use deno::ErrBox;
pub use deno::Resource;
pub use deno::ResourceId;
use deno::ResourceTable;
use futures;
use futures::Future;
use futures::Poll;
use reqwest::r#async::Decoder as ReqwestDecoder;
use std;
use std::sync::Mutex;
use std::sync::MutexGuard;
use tokio;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use tokio_process;
use tokio_rustls::client::TlsStream as ClientTlsStream;
use tokio_rustls::server::TlsStream as ServerTlsStream;
#[cfg(not(windows))]
use std::os::unix::io::FromRawFd;
#[cfg(windows)]
use std::os::windows::io::FromRawHandle;
#[cfg(windows)]
extern crate winapi;
lazy_static! {
static ref RESOURCE_TABLE: Mutex<ResourceTable> = Mutex::new({
let mut table = ResourceTable::default();
// TODO Load these lazily during lookup?
table.add("stdin", Box::new(CliResource::Stdin(tokio::io::stdin())));
table.add("stdout", Box::new(CliResource::Stdout({
#[cfg(not(windows))]
let stdout = unsafe { std::fs::File::from_raw_fd(1) };
#[cfg(windows)]
let stdout = unsafe {
std::fs::File::from_raw_handle(winapi::um::processenv::GetStdHandle(
winapi::um::winbase::STD_OUTPUT_HANDLE))
};
tokio::fs::File::from_std(stdout)
})));
table.add("stderr", Box::new(CliResource::Stderr(tokio::io::stderr())));
table
});
}
// TODO: rename to `StreamResource`
pub enum CliResource {
Stdin(tokio::io::Stdin),
Stdout(tokio::fs::File),
Stderr(tokio::io::Stderr),
FsFile(tokio::fs::File),
TcpStream(tokio::net::TcpStream),
ServerTlsStream(Box<ServerTlsStream<TcpStream>>),
ClientTlsStream(Box<ClientTlsStream<TcpStream>>),
HttpBody(HttpBody),
ChildStdin(tokio_process::ChildStdin),
ChildStdout(tokio_process::ChildStdout),
ChildStderr(tokio_process::ChildStderr),
}
impl Resource for CliResource {}
pub fn lock_resource_table<'a>() -> MutexGuard<'a, ResourceTable> {
RESOURCE_TABLE.lock().unwrap()
}
/// `DenoAsyncRead` is the same as the `tokio_io::AsyncRead` trait
/// but uses an `ErrBox` error instead of `std::io:Error`
pub trait DenoAsyncRead {
fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox>;
}
impl DenoAsyncRead for CliResource {
fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, ErrBox> {
let r = match self {
CliResource::FsFile(ref mut f) => f.poll_read(buf),
CliResource::Stdin(ref mut f) => f.poll_read(buf),
CliResource::TcpStream(ref mut f) => f.poll_read(buf),
CliResource::ClientTlsStream(ref mut f) => f.poll_read(buf),
CliResource::ServerTlsStream(ref mut f) => f.poll_read(buf),
CliResource::HttpBody(ref mut f) => f.poll_read(buf),
CliResource::ChildStdout(ref mut f) => f.poll_read(buf),
CliResource::ChildStderr(ref mut f) => f.poll_read(buf),
_ => {
return Err(bad_resource());
}
};
r.map_err(ErrBox::from)
}
}
/// `DenoAsyncWrite` is the same as the `tokio_io::AsyncWrite` trait
/// but uses an `ErrBox` error instead of `std::io:Error`
pub trait DenoAsyncWrite {
fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox>;
fn shutdown(&mut self) -> Poll<(), ErrBox>;
}
impl DenoAsyncWrite for CliResource {
fn poll_write(&mut self, buf: &[u8]) -> Poll<usize, ErrBox> {
let r = match self {
CliResource::FsFile(ref mut f) => f.poll_write(buf),
CliResource::Stdout(ref mut f) => f.poll_write(buf),
CliResource::Stderr(ref mut f) => f.poll_write(buf),
CliResource::TcpStream(ref mut f) => f.poll_write(buf),
CliResource::ClientTlsStream(ref mut f) => f.poll_write(buf),
CliResource::ServerTlsStream(ref mut f) => f.poll_write(buf),
CliResource::ChildStdin(ref mut f) => f.poll_write(buf),
_ => {
return Err(bad_resource());
}
};
r.map_err(ErrBox::from)
}
fn shutdown(&mut self) -> futures::Poll<(), ErrBox> {
unimplemented!()
}
}
pub fn add_fs_file(fs_file: tokio::fs::File) -> ResourceId {
let mut table = lock_resource_table();
table.add("fsFile", Box::new(CliResource::FsFile(fs_file)))
}
pub fn add_tcp_stream(stream: tokio::net::TcpStream) -> ResourceId {
let mut table = lock_resource_table();
table.add("tcpStream", Box::new(CliResource::TcpStream(stream)))
}
pub fn add_tls_stream(stream: ClientTlsStream<TcpStream>) -> ResourceId {
let mut table = lock_resource_table();
table.add(
"clientTlsStream",
Box::new(CliResource::ClientTlsStream(Box::new(stream))),
)
}
pub fn add_server_tls_stream(stream: ServerTlsStream<TcpStream>) -> ResourceId {
let mut table = lock_resource_table();
table.add(
"serverTlsStream",
Box::new(CliResource::ServerTlsStream(Box::new(stream))),
)
}
pub fn add_reqwest_body(body: ReqwestDecoder) -> ResourceId {
let body = HttpBody::from(body);
let mut table = lock_resource_table();
table.add("httpBody", Box::new(CliResource::HttpBody(body)))
}
pub fn add_child_stdin(stdin: tokio_process::ChildStdin) -> ResourceId {
let mut table = lock_resource_table();
table.add("childStdin", Box::new(CliResource::ChildStdin(stdin)))
}
pub fn add_child_stdout(stdout: tokio_process::ChildStdout) -> ResourceId {
let mut table = lock_resource_table();
table.add("childStdout", Box::new(CliResource::ChildStdout(stdout)))
}
pub fn add_child_stderr(stderr: tokio_process::ChildStderr) -> ResourceId {
let mut table = lock_resource_table();
table.add("childStderr", Box::new(CliResource::ChildStderr(stderr)))
}
pub struct CloneFileFuture {
pub rid: ResourceId,
}
impl Future for CloneFileFuture {
type Item = tokio::fs::File;
type Error = ErrBox;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut table = lock_resource_table();
let repr = table
.get_mut::<CliResource>(self.rid)
.ok_or_else(bad_resource)?;
match repr {
CliResource::FsFile(ref mut file) => {
file.poll_try_clone().map_err(ErrBox::from)
}
_ => Err(bad_resource()),
}
}
}

View File

@ -5,7 +5,6 @@ use crate::global_timer::GlobalTimer;
use crate::import_map::ImportMap;
use crate::metrics::Metrics;
use crate::ops::JsonOp;
use crate::ops::MinimalOp;
use crate::permissions::DenoPermissions;
use crate::worker::Worker;
use crate::worker::WorkerChannels;
@ -16,7 +15,6 @@ use deno::Loader;
use deno::ModuleSpecifier;
use deno::Op;
use deno::PinnedBuf;
use deno::ResourceTable;
use futures::Future;
use rand::rngs::StdRng;
use rand::SeedableRng;
@ -29,7 +27,6 @@ use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::MutexGuard;
use std::time::Instant;
use tokio::sync::mpsc;
@ -55,7 +52,6 @@ pub struct State {
pub start_time: Instant,
pub seeded_rng: Option<Mutex<StdRng>>,
pub include_deno_namespace: bool,
pub resource_table: Mutex<ResourceTable>,
}
impl Clone for ThreadSafeState {
@ -72,10 +68,6 @@ impl Deref for ThreadSafeState {
}
impl ThreadSafeState {
pub fn lock_resource_table(&self) -> MutexGuard<ResourceTable> {
self.resource_table.lock().unwrap()
}
/// Wrap core `OpDispatcher` to collect metrics.
pub fn core_op<D>(
&self,
@ -111,21 +103,6 @@ impl ThreadSafeState {
}
}
/// This is a special function that provides `state` argument to dispatcher.
pub fn stateful_minimal_op<D>(
&self,
dispatcher: D,
) -> impl Fn(i32, Option<PinnedBuf>) -> Box<MinimalOp>
where
D: Fn(&ThreadSafeState, i32, Option<PinnedBuf>) -> Box<MinimalOp>,
{
let state = self.clone();
move |rid: i32, zero_copy: Option<PinnedBuf>| -> Box<MinimalOp> {
dispatcher(&state, rid, zero_copy)
}
}
/// This is a special function that provides `state` argument to dispatcher.
///
/// NOTE: This only works with JSON dispatcher.
@ -243,7 +220,6 @@ impl ThreadSafeState {
start_time: Instant::now(),
seeded_rng,
include_deno_namespace,
resource_table: Mutex::new(ResourceTable::default()),
};
Ok(ThreadSafeState(Arc::new(state)))

View File

@ -65,7 +65,7 @@ impl ResourceTable {
}
// close(2) is done by dropping the value. Therefore we just need to remove
// the resource from the resource table.
// the resource from the RESOURCE_TABLE.
pub fn close(&mut self, rid: ResourceId) -> Option<()> {
self.map.remove(&rid).map(|(_name, _resource)| ())
}