fix(ext/node): panic on 'worker_threads.receiveMessageOnPort' (#23386)

Closes https://github.com/denoland/deno/issues/23362

Previously we were panicking if there was a pending read on a
port and `receiveMessageOnPort` was called. This is now fixed
by cancelling the pending read, trying to read a message and
resuming reading in a loop.
This commit is contained in:
Bartek Iwańczuk 2024-04-16 00:06:39 +01:00 committed by GitHub
parent 46c709e52f
commit 0b8d7d1d4b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 42 additions and 1 deletions

View File

@ -17,6 +17,7 @@ import {
MessagePort,
MessagePortIdSymbol,
MessagePortPrototype,
MessagePortReceiveMessageOnPortSymbol,
nodeWorkerThreadCloseCb,
refMessagePort,
serializeJsMessageData,
@ -441,6 +442,7 @@ export function receiveMessageOnPort(port: MessagePort): object | undefined {
err["code"] = "ERR_INVALID_ARG_TYPE";
throw err;
}
port[MessagePortReceiveMessageOnPortSymbol] = true;
const data = op_message_port_recv_message_sync(port[MessagePortIdSymbol]);
if (data === null) return undefined;
return { message: deserializeJsMessageData(data)[0] };

View File

@ -88,6 +88,9 @@ const MessageChannelPrototype = MessageChannel.prototype;
const _id = Symbol("id");
const MessagePortIdSymbol = _id;
const MessagePortReceiveMessageOnPortSymbol = Symbol(
"MessagePortReceiveMessageOnPort",
);
const _enabled = Symbol("enabled");
const _refed = Symbol("refed");
const nodeWorkerThreadCloseCb = Symbol("nodeWorkerThreadCloseCb");
@ -128,6 +131,10 @@ class MessagePort extends EventTarget {
constructor() {
super();
ObjectDefineProperty(this, MessagePortReceiveMessageOnPortSymbol, {
value: false,
enumerable: false,
});
ObjectDefineProperty(this, nodeWorkerThreadCloseCb, {
value: null,
enumerable: false,
@ -189,7 +196,15 @@ class MessagePort extends EventTarget {
this[_id],
);
} catch (err) {
if (ObjectPrototypeIsPrototypeOf(InterruptedPrototype, err)) break;
if (ObjectPrototypeIsPrototypeOf(InterruptedPrototype, err)) {
// If we were interrupted, check if the interruption is coming
// from `receiveMessageOnPort` API from Node compat, if so, continue.
if (this[MessagePortReceiveMessageOnPortSymbol]) {
this[MessagePortReceiveMessageOnPortSymbol] = false;
continue;
}
break;
}
nodeWorkerThreadMaybeInvokeCloseCb(this);
throw err;
}
@ -444,6 +459,7 @@ export {
MessagePort,
MessagePortIdSymbol,
MessagePortPrototype,
MessagePortReceiveMessageOnPortSymbol,
nodeWorkerThreadCloseCb,
serializeJsMessageData,
structuredClone,

View File

@ -235,6 +235,7 @@ pub fn op_message_port_recv_message_sync(
#[smi] rid: ResourceId,
) -> Result<Option<JsMessageData>, AnyError> {
let resource = state.resource_table.get::<MessagePortResource>(rid)?;
resource.cancel.cancel();
let mut rx = resource.port.rx.borrow_mut();
match rx.try_recv() {

View File

@ -414,3 +414,25 @@ Deno.test({
mainPort.close();
},
});
// Regression test for https://github.com/denoland/deno/issues/23362
Deno.test("[node/worker_threads] receiveMessageOnPort works if there's pending read", function () {
const { port1, port2 } = new workerThreads.MessageChannel();
const message1 = { hello: "world" };
const message2 = { foo: "bar" };
assertEquals(workerThreads.receiveMessageOnPort(port2), undefined);
port2.start();
port1.postMessage(message1);
port1.postMessage(message2);
assertEquals(workerThreads.receiveMessageOnPort(port2), {
message: message1,
});
assertEquals(workerThreads.receiveMessageOnPort(port2), {
message: message2,
});
port1.close();
port2.close();
});