deno/cli/js/web/streams/readable-internals.ts

1351 lines
44 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
/* eslint-disable @typescript-eslint/no-explicit-any */
// TODO reenable this lint here
import * as shared from "./shared-internals.ts";
import * as q from "./queue-mixin.ts";
import {
QueuingStrategy,
QueuingStrategySizeCallback,
UnderlyingSource,
UnderlyingByteSource,
} from "../dom_types.ts";
// ReadableStreamDefaultController
export const controlledReadableStream_ = Symbol("controlledReadableStream_");
export const pullAlgorithm_ = Symbol("pullAlgorithm_");
export const cancelAlgorithm_ = Symbol("cancelAlgorithm_");
export const strategySizeAlgorithm_ = Symbol("strategySizeAlgorithm_");
export const strategyHWM_ = Symbol("strategyHWM_");
export const started_ = Symbol("started_");
export const closeRequested_ = Symbol("closeRequested_");
export const pullAgain_ = Symbol("pullAgain_");
export const pulling_ = Symbol("pulling_");
export const cancelSteps_ = Symbol("cancelSteps_");
export const pullSteps_ = Symbol("pullSteps_");
// ReadableByteStreamController
export const autoAllocateChunkSize_ = Symbol("autoAllocateChunkSize_");
export const byobRequest_ = Symbol("byobRequest_");
export const controlledReadableByteStream_ = Symbol(
"controlledReadableByteStream_"
);
export const pendingPullIntos_ = Symbol("pendingPullIntos_");
// ReadableStreamDefaultReader
export const closedPromise_ = Symbol("closedPromise_");
export const ownerReadableStream_ = Symbol("ownerReadableStream_");
export const readRequests_ = Symbol("readRequests_");
export const readIntoRequests_ = Symbol("readIntoRequests_");
// ReadableStreamBYOBRequest
export const associatedReadableByteStreamController_ = Symbol(
"associatedReadableByteStreamController_"
);
export const view_ = Symbol("view_");
// ReadableStreamBYOBReader
// ReadableStream
export const reader_ = Symbol("reader_");
export const readableStreamController_ = Symbol("readableStreamController_");
export type StartFunction<OutputType> = (
controller: SDReadableStreamControllerBase<OutputType>
) => void | PromiseLike<void>;
export type StartAlgorithm = () => Promise<void> | void;
export type PullFunction<OutputType> = (
controller: SDReadableStreamControllerBase<OutputType>
) => void | PromiseLike<void>;
export type PullAlgorithm<OutputType> = (
controller: SDReadableStreamControllerBase<OutputType>
) => PromiseLike<void>;
export type CancelAlgorithm = (reason?: shared.ErrorResult) => Promise<void>;
// ----
export interface SDReadableStreamControllerBase<OutputType> {
readonly desiredSize: number | null;
close(): void;
error(e?: shared.ErrorResult): void;
[cancelSteps_](reason: shared.ErrorResult): Promise<void>;
[pullSteps_](forAuthorCode: boolean): Promise<IteratorResult<OutputType>>;
}
export interface SDReadableStreamBYOBRequest {
readonly view: ArrayBufferView;
respond(bytesWritten: number): void;
respondWithNewView(view: ArrayBufferView): void;
[associatedReadableByteStreamController_]:
| SDReadableByteStreamController
| undefined;
[view_]: ArrayBufferView | undefined;
}
interface ArrayBufferViewCtor {
new (
buffer: ArrayBufferLike,
byteOffset?: number,
byteLength?: number
): ArrayBufferView;
}
export interface PullIntoDescriptor {
readerType: "default" | "byob";
ctor: ArrayBufferViewCtor;
buffer: ArrayBufferLike;
byteOffset: number;
byteLength: number;
bytesFilled: number;
elementSize: number;
}
export interface SDReadableByteStreamController
extends SDReadableStreamControllerBase<ArrayBufferView>,
q.ByteQueueContainer {
readonly byobRequest: SDReadableStreamBYOBRequest | undefined;
enqueue(chunk: ArrayBufferView): void;
[autoAllocateChunkSize_]: number | undefined; // A positive integer, when the automatic buffer allocation feature is enabled. In that case, this value specifies the size of buffer to allocate. It is undefined otherwise.
[byobRequest_]: SDReadableStreamBYOBRequest | undefined; // A ReadableStreamBYOBRequest instance representing the current BYOB pull request
[cancelAlgorithm_]: CancelAlgorithm; // A promise-returning algorithm, taking one argument (the cancel reason), which communicates a requested cancelation to the underlying source
[closeRequested_]: boolean; // A boolean flag indicating whether the stream has been closed by its underlying byte source, but still has chunks in its internal queue that have not yet been read
[controlledReadableByteStream_]: SDReadableStream<ArrayBufferView>; // The ReadableStream instance controlled
[pullAgain_]: boolean; // A boolean flag set to true if the streams mechanisms requested a call to the underlying byte sources pull() method to pull more data, but the pull could not yet be done since a previous call is still executing
[pullAlgorithm_]: PullAlgorithm<ArrayBufferView>; // A promise-returning algorithm that pulls data from the underlying source
[pulling_]: boolean; // A boolean flag set to true while the underlying byte sources pull() method is executing and has not yet fulfilled, used to prevent reentrant calls
[pendingPullIntos_]: PullIntoDescriptor[]; // A List of descriptors representing pending BYOB pull requests
[started_]: boolean; // A boolean flag indicating whether the underlying source has finished starting
[strategyHWM_]: number; // A number supplied to the constructor as part of the streams queuing strategy, indicating the point at which the stream will apply backpressure to its underlying byte source
}
export interface SDReadableStreamDefaultController<OutputType>
extends SDReadableStreamControllerBase<OutputType>,
q.QueueContainer<OutputType> {
enqueue(chunk?: OutputType): void;
[controlledReadableStream_]: SDReadableStream<OutputType>;
[pullAlgorithm_]: PullAlgorithm<OutputType>;
[cancelAlgorithm_]: CancelAlgorithm;
[strategySizeAlgorithm_]: QueuingStrategySizeCallback<OutputType>;
[strategyHWM_]: number;
[started_]: boolean;
[closeRequested_]: boolean;
[pullAgain_]: boolean;
[pulling_]: boolean;
}
// ----
export interface SDReadableStreamReader<OutputType> {
readonly closed: Promise<void>;
cancel(reason: shared.ErrorResult): Promise<void>;
releaseLock(): void;
[ownerReadableStream_]: SDReadableStream<OutputType> | undefined;
[closedPromise_]: shared.ControlledPromise<void>;
}
export interface ReadRequest<V> extends shared.ControlledPromise<V> {
forAuthorCode: boolean;
}
export declare class SDReadableStreamDefaultReader<OutputType>
implements SDReadableStreamReader<OutputType> {
constructor(stream: SDReadableStream<OutputType>);
readonly closed: Promise<void>;
cancel(reason: shared.ErrorResult): Promise<void>;
releaseLock(): void;
read(): Promise<IteratorResult<OutputType | undefined>>;
[ownerReadableStream_]: SDReadableStream<OutputType> | undefined;
[closedPromise_]: shared.ControlledPromise<void>;
[readRequests_]: Array<ReadRequest<IteratorResult<OutputType>>>;
}
export declare class SDReadableStreamBYOBReader
implements SDReadableStreamReader<ArrayBufferView> {
constructor(stream: SDReadableStream<ArrayBufferView>);
readonly closed: Promise<void>;
cancel(reason: shared.ErrorResult): Promise<void>;
releaseLock(): void;
read(view: ArrayBufferView): Promise<IteratorResult<ArrayBufferView>>;
[ownerReadableStream_]: SDReadableStream<ArrayBufferView> | undefined;
[closedPromise_]: shared.ControlledPromise<void>;
[readIntoRequests_]: Array<ReadRequest<IteratorResult<ArrayBufferView>>>;
}
/* TODO reenable this when we add WritableStreams and Transforms
export interface GenericTransformStream<InputType, OutputType> {
readable: SDReadableStream<OutputType>;
writable: ws.WritableStream<InputType>;
}
*/
export type ReadableStreamState = "readable" | "closed" | "errored";
export declare class SDReadableStream<OutputType> {
constructor(
underlyingSource: UnderlyingByteSource,
strategy?: { highWaterMark?: number; size?: undefined }
);
constructor(
underlyingSource?: UnderlyingSource<OutputType>,
strategy?: QueuingStrategy<OutputType>
);
readonly locked: boolean;
cancel(reason?: shared.ErrorResult): Promise<void>;
getReader(): SDReadableStreamReader<OutputType>;
getReader(options: { mode: "byob" }): SDReadableStreamBYOBReader;
tee(): Array<SDReadableStream<OutputType>>;
/* TODO reenable these methods when we bring in writableStreams and transport types
pipeThrough<ResultType>(
transform: GenericTransformStream<OutputType, ResultType>,
options?: PipeOptions
): SDReadableStream<ResultType>;
pipeTo(
dest: ws.WritableStream<OutputType>,
options?: PipeOptions
): Promise<void>;
*/
[shared.state_]: ReadableStreamState;
[shared.storedError_]: shared.ErrorResult;
[reader_]: SDReadableStreamReader<OutputType> | undefined;
[readableStreamController_]: SDReadableStreamControllerBase<OutputType>;
}
// ---- Stream
export function initializeReadableStream<OutputType>(
stream: SDReadableStream<OutputType>
): void {
stream[shared.state_] = "readable";
stream[reader_] = undefined;
stream[shared.storedError_] = undefined;
stream[readableStreamController_] = undefined!; // mark slot as used for brand check
}
export function isReadableStream(
value: unknown
): value is SDReadableStream<any> {
if (typeof value !== "object" || value === null) {
return false;
}
return readableStreamController_ in value;
}
export function isReadableStreamLocked<OutputType>(
stream: SDReadableStream<OutputType>
): boolean {
return stream[reader_] !== undefined;
}
export function readableStreamGetNumReadIntoRequests<OutputType>(
stream: SDReadableStream<OutputType>
): number {
// TODO remove the "as unknown" cast
// This is in to workaround a compiler error
// error TS2352: Conversion of type 'SDReadableStreamReader<OutputType>' to type 'SDReadableStreamBYOBReader' may be a mistake because neither type sufficiently overlaps with the other. If this was intentional, convert the expression to 'unknown' first.
// Type 'SDReadableStreamReader<OutputType>' is missing the following properties from type 'SDReadableStreamBYOBReader': read, [readIntoRequests_]
const reader = (stream[reader_] as unknown) as SDReadableStreamBYOBReader;
if (reader === undefined) {
return 0;
}
return reader[readIntoRequests_].length;
}
export function readableStreamGetNumReadRequests<OutputType>(
stream: SDReadableStream<OutputType>
): number {
const reader = stream[reader_] as SDReadableStreamDefaultReader<OutputType>;
if (reader === undefined) {
return 0;
}
return reader[readRequests_].length;
}
export function readableStreamCreateReadResult<T>(
value: T,
done: boolean,
forAuthorCode: boolean
): IteratorResult<T> {
const prototype = forAuthorCode ? Object.prototype : null;
const result = Object.create(prototype);
result.value = value;
result.done = done;
return result;
}
export function readableStreamAddReadIntoRequest(
stream: SDReadableStream<ArrayBufferView>,
forAuthorCode: boolean
): Promise<IteratorResult<ArrayBufferView, any>> {
// Assert: ! IsReadableStreamBYOBReader(stream.[[reader]]) is true.
// Assert: stream.[[state]] is "readable" or "closed".
const reader = stream[reader_] as SDReadableStreamBYOBReader;
const conProm = shared.createControlledPromise<
IteratorResult<ArrayBufferView>
>() as ReadRequest<IteratorResult<ArrayBufferView>>;
conProm.forAuthorCode = forAuthorCode;
reader[readIntoRequests_].push(conProm);
return conProm.promise;
}
export function readableStreamAddReadRequest<OutputType>(
stream: SDReadableStream<OutputType>,
forAuthorCode: boolean
): Promise<IteratorResult<OutputType, any>> {
// Assert: ! IsReadableStreamDefaultReader(stream.[[reader]]) is true.
// Assert: stream.[[state]] is "readable".
const reader = stream[reader_] as SDReadableStreamDefaultReader<OutputType>;
const conProm = shared.createControlledPromise<
IteratorResult<OutputType>
>() as ReadRequest<IteratorResult<OutputType>>;
conProm.forAuthorCode = forAuthorCode;
reader[readRequests_].push(conProm);
return conProm.promise;
}
export function readableStreamHasBYOBReader<OutputType>(
stream: SDReadableStream<OutputType>
): boolean {
const reader = stream[reader_];
return isReadableStreamBYOBReader(reader);
}
export function readableStreamHasDefaultReader<OutputType>(
stream: SDReadableStream<OutputType>
): boolean {
const reader = stream[reader_];
return isReadableStreamDefaultReader(reader);
}
export function readableStreamCancel<OutputType>(
stream: SDReadableStream<OutputType>,
reason: shared.ErrorResult
): Promise<undefined> {
if (stream[shared.state_] === "closed") {
return Promise.resolve(undefined);
}
if (stream[shared.state_] === "errored") {
return Promise.reject(stream[shared.storedError_]);
}
readableStreamClose(stream);
const sourceCancelPromise = stream[readableStreamController_][cancelSteps_](
reason
);
return sourceCancelPromise.then((_) => undefined);
}
export function readableStreamClose<OutputType>(
stream: SDReadableStream<OutputType>
): void {
// Assert: stream.[[state]] is "readable".
stream[shared.state_] = "closed";
const reader = stream[reader_];
if (reader === undefined) {
return;
}
if (isReadableStreamDefaultReader(reader)) {
for (const readRequest of reader[readRequests_]) {
readRequest.resolve(
readableStreamCreateReadResult(
undefined,
true,
readRequest.forAuthorCode
)
);
}
reader[readRequests_] = [];
}
reader[closedPromise_].resolve();
reader[closedPromise_].promise.catch(() => {});
}
export function readableStreamError<OutputType>(
stream: SDReadableStream<OutputType>,
error: shared.ErrorResult
): void {
if (stream[shared.state_] !== "readable") {
throw new RangeError("Stream is in an invalid state");
}
stream[shared.state_] = "errored";
stream[shared.storedError_] = error;
const reader = stream[reader_];
if (reader === undefined) {
return;
}
if (isReadableStreamDefaultReader(reader)) {
for (const readRequest of reader[readRequests_]) {
readRequest.reject(error);
}
reader[readRequests_] = [];
} else {
// Assert: IsReadableStreamBYOBReader(reader).
// TODO remove the "as unknown" cast
const readIntoRequests = ((reader as unknown) as SDReadableStreamBYOBReader)[
readIntoRequests_
];
for (const readIntoRequest of readIntoRequests) {
readIntoRequest.reject(error);
}
// TODO remove the "as unknown" cast
((reader as unknown) as SDReadableStreamBYOBReader)[readIntoRequests_] = [];
}
reader[closedPromise_].reject(error);
}
// ---- Readers
export function isReadableStreamDefaultReader(
reader: unknown
): reader is SDReadableStreamDefaultReader<any> {
if (typeof reader !== "object" || reader === null) {
return false;
}
return readRequests_ in reader;
}
export function isReadableStreamBYOBReader(
reader: unknown
): reader is SDReadableStreamBYOBReader {
if (typeof reader !== "object" || reader === null) {
return false;
}
return readIntoRequests_ in reader;
}
export function readableStreamReaderGenericInitialize<OutputType>(
reader: SDReadableStreamReader<OutputType>,
stream: SDReadableStream<OutputType>
): void {
reader[ownerReadableStream_] = stream;
stream[reader_] = reader;
const streamState = stream[shared.state_];
reader[closedPromise_] = shared.createControlledPromise<void>();
if (streamState === "readable") {
// leave as is
} else if (streamState === "closed") {
reader[closedPromise_].resolve(undefined);
} else {
reader[closedPromise_].reject(stream[shared.storedError_]);
reader[closedPromise_].promise.catch(() => {});
}
}
export function readableStreamReaderGenericRelease<OutputType>(
reader: SDReadableStreamReader<OutputType>
): void {
// Assert: reader.[[ownerReadableStream]] is not undefined.
// Assert: reader.[[ownerReadableStream]].[[reader]] is reader.
const stream = reader[ownerReadableStream_];
if (stream === undefined) {
throw new TypeError("Reader is in an inconsistent state");
}
if (stream[shared.state_] === "readable") {
// code moved out
} else {
reader[closedPromise_] = shared.createControlledPromise<void>();
}
reader[closedPromise_].reject(new TypeError());
reader[closedPromise_].promise.catch(() => {});
stream[reader_] = undefined;
reader[ownerReadableStream_] = undefined;
}
export function readableStreamBYOBReaderRead(
reader: SDReadableStreamBYOBReader,
view: ArrayBufferView,
forAuthorCode = false
): Promise<IteratorResult<ArrayBufferView, any>> {
const stream = reader[ownerReadableStream_]!;
// Assert: stream is not undefined.
if (stream[shared.state_] === "errored") {
return Promise.reject(stream[shared.storedError_]);
}
return readableByteStreamControllerPullInto(
stream[readableStreamController_] as SDReadableByteStreamController,
view,
forAuthorCode
);
}
export function readableStreamDefaultReaderRead<OutputType>(
reader: SDReadableStreamDefaultReader<OutputType>,
forAuthorCode = false
): Promise<IteratorResult<OutputType | undefined>> {
const stream = reader[ownerReadableStream_]!;
// Assert: stream is not undefined.
if (stream[shared.state_] === "closed") {
return Promise.resolve(
readableStreamCreateReadResult(undefined, true, forAuthorCode)
);
}
if (stream[shared.state_] === "errored") {
return Promise.reject(stream[shared.storedError_]);
}
// Assert: stream.[[state]] is "readable".
return stream[readableStreamController_][pullSteps_](forAuthorCode);
}
export function readableStreamFulfillReadIntoRequest<OutputType>(
stream: SDReadableStream<OutputType>,
chunk: ArrayBufferView,
done: boolean
): void {
// TODO remove the "as unknown" cast
const reader = (stream[reader_] as unknown) as SDReadableStreamBYOBReader;
const readIntoRequest = reader[readIntoRequests_].shift()!; // <-- length check done in caller
readIntoRequest.resolve(
readableStreamCreateReadResult(chunk, done, readIntoRequest.forAuthorCode)
);
}
export function readableStreamFulfillReadRequest<OutputType>(
stream: SDReadableStream<OutputType>,
chunk: OutputType,
done: boolean
): void {
const reader = stream[reader_] as SDReadableStreamDefaultReader<OutputType>;
const readRequest = reader[readRequests_].shift()!; // <-- length check done in caller
readRequest.resolve(
readableStreamCreateReadResult(chunk, done, readRequest.forAuthorCode)
);
}
// ---- DefaultController
export function setUpReadableStreamDefaultController<OutputType>(
stream: SDReadableStream<OutputType>,
controller: SDReadableStreamDefaultController<OutputType>,
startAlgorithm: StartAlgorithm,
pullAlgorithm: PullAlgorithm<OutputType>,
cancelAlgorithm: CancelAlgorithm,
highWaterMark: number,
sizeAlgorithm: QueuingStrategySizeCallback<OutputType>
): void {
// Assert: stream.[[readableStreamController]] is undefined.
controller[controlledReadableStream_] = stream;
q.resetQueue(controller);
controller[started_] = false;
controller[closeRequested_] = false;
controller[pullAgain_] = false;
controller[pulling_] = false;
controller[strategySizeAlgorithm_] = sizeAlgorithm;
controller[strategyHWM_] = highWaterMark;
controller[pullAlgorithm_] = pullAlgorithm;
controller[cancelAlgorithm_] = cancelAlgorithm;
stream[readableStreamController_] = controller;
const startResult = startAlgorithm();
Promise.resolve(startResult).then(
(_) => {
controller[started_] = true;
// Assert: controller.[[pulling]] is false.
// Assert: controller.[[pullAgain]] is false.
readableStreamDefaultControllerCallPullIfNeeded(controller);
},
(error) => {
readableStreamDefaultControllerError(controller, error);
}
);
}
export function isReadableStreamDefaultController(
value: unknown
): value is SDReadableStreamDefaultController<any> {
if (typeof value !== "object" || value === null) {
return false;
}
return controlledReadableStream_ in value;
}
export function readableStreamDefaultControllerHasBackpressure<OutputType>(
controller: SDReadableStreamDefaultController<OutputType>
): boolean {
return !readableStreamDefaultControllerShouldCallPull(controller);
}
export function readableStreamDefaultControllerCanCloseOrEnqueue<OutputType>(
controller: SDReadableStreamDefaultController<OutputType>
): boolean {
const state = controller[controlledReadableStream_][shared.state_];
return controller[closeRequested_] === false && state === "readable";
}
export function readableStreamDefaultControllerGetDesiredSize<OutputType>(
controller: SDReadableStreamDefaultController<OutputType>
): number | null {
const state = controller[controlledReadableStream_][shared.state_];
if (state === "errored") {
return null;
}
if (state === "closed") {
return 0;
}
return controller[strategyHWM_] - controller[q.queueTotalSize_];
}
export function readableStreamDefaultControllerClose<OutputType>(
controller: SDReadableStreamDefaultController<OutputType>
): void {
// Assert: !ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is true.
controller[closeRequested_] = true;
const stream = controller[controlledReadableStream_];
if (controller[q.queue_].length === 0) {
readableStreamDefaultControllerClearAlgorithms(controller);
readableStreamClose(stream);
}
}
export function readableStreamDefaultControllerEnqueue<OutputType>(
controller: SDReadableStreamDefaultController<OutputType>,
chunk: OutputType
): void {
const stream = controller[controlledReadableStream_];
// Assert: !ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) is true.
if (
isReadableStreamLocked(stream) &&
readableStreamGetNumReadRequests(stream) > 0
) {
readableStreamFulfillReadRequest(stream, chunk, false);
} else {
// Let result be the result of performing controller.[[strategySizeAlgorithm]], passing in chunk,
// and interpreting the result as an ECMAScript completion value.
// impl note: assuming that in JS land this just means try/catch with rethrow
let chunkSize: number;
try {
chunkSize = controller[strategySizeAlgorithm_](chunk);
} catch (error) {
readableStreamDefaultControllerError(controller, error);
throw error;
}
try {
q.enqueueValueWithSize(controller, chunk, chunkSize);
} catch (error) {
readableStreamDefaultControllerError(controller, error);
throw error;
}
}
readableStreamDefaultControllerCallPullIfNeeded(controller);
}
export function readableStreamDefaultControllerError<OutputType>(
controller: SDReadableStreamDefaultController<OutputType>,
error: shared.ErrorResult
): void {
const stream = controller[controlledReadableStream_];
if (stream[shared.state_] !== "readable") {
return;
}
q.resetQueue(controller);
readableStreamDefaultControllerClearAlgorithms(controller);
readableStreamError(stream, error);
}
export function readableStreamDefaultControllerCallPullIfNeeded<OutputType>(
controller: SDReadableStreamDefaultController<OutputType>
): void {
if (!readableStreamDefaultControllerShouldCallPull(controller)) {
return;
}
if (controller[pulling_]) {
controller[pullAgain_] = true;
return;
}
if (controller[pullAgain_]) {
throw new RangeError("Stream controller is in an invalid state.");
}
controller[pulling_] = true;
controller[pullAlgorithm_](controller).then(
(_) => {
controller[pulling_] = false;
if (controller[pullAgain_]) {
controller[pullAgain_] = false;
readableStreamDefaultControllerCallPullIfNeeded(controller);
}
},
(error) => {
readableStreamDefaultControllerError(controller, error);
}
);
}
export function readableStreamDefaultControllerShouldCallPull<OutputType>(
controller: SDReadableStreamDefaultController<OutputType>
): boolean {
const stream = controller[controlledReadableStream_];
if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller)) {
return false;
}
if (controller[started_] === false) {
return false;
}
if (
isReadableStreamLocked(stream) &&
readableStreamGetNumReadRequests(stream) > 0
) {
return true;
}
const desiredSize = readableStreamDefaultControllerGetDesiredSize(controller);
if (desiredSize === null) {
throw new RangeError("Stream is in an invalid state.");
}
return desiredSize > 0;
}
export function readableStreamDefaultControllerClearAlgorithms<OutputType>(
controller: SDReadableStreamDefaultController<OutputType>
): void {
controller[pullAlgorithm_] = undefined!;
controller[cancelAlgorithm_] = undefined!;
controller[strategySizeAlgorithm_] = undefined!;
}
// ---- BYOBController
export function setUpReadableByteStreamController(
stream: SDReadableStream<ArrayBufferView>,
controller: SDReadableByteStreamController,
startAlgorithm: StartAlgorithm,
pullAlgorithm: PullAlgorithm<ArrayBufferView>,
cancelAlgorithm: CancelAlgorithm,
highWaterMark: number,
autoAllocateChunkSize: number | undefined
): void {
// Assert: stream.[[readableStreamController]] is undefined.
if (stream[readableStreamController_] !== undefined) {
throw new TypeError("Cannot reuse streams");
}
if (autoAllocateChunkSize !== undefined) {
if (
!shared.isInteger(autoAllocateChunkSize) ||
autoAllocateChunkSize <= 0
) {
throw new RangeError(
"autoAllocateChunkSize must be a positive, finite integer"
);
}
}
// Set controller.[[controlledReadableByteStream]] to stream.
controller[controlledReadableByteStream_] = stream;
// Set controller.[[pullAgain]] and controller.[[pulling]] to false.
controller[pullAgain_] = false;
controller[pulling_] = false;
readableByteStreamControllerClearPendingPullIntos(controller);
q.resetQueue(controller);
controller[closeRequested_] = false;
controller[started_] = false;
controller[strategyHWM_] = shared.validateAndNormalizeHighWaterMark(
highWaterMark
);
controller[pullAlgorithm_] = pullAlgorithm;
controller[cancelAlgorithm_] = cancelAlgorithm;
controller[autoAllocateChunkSize_] = autoAllocateChunkSize;
controller[pendingPullIntos_] = [];
stream[readableStreamController_] = controller;
// Let startResult be the result of performing startAlgorithm.
const startResult = startAlgorithm();
Promise.resolve(startResult).then(
(_) => {
controller[started_] = true;
// Assert: controller.[[pulling]] is false.
// Assert: controller.[[pullAgain]] is false.
readableByteStreamControllerCallPullIfNeeded(controller);
},
(error) => {
readableByteStreamControllerError(controller, error);
}
);
}
export function isReadableStreamBYOBRequest(
value: unknown
): value is SDReadableStreamBYOBRequest {
if (typeof value !== "object" || value === null) {
return false;
}
return associatedReadableByteStreamController_ in value;
}
export function isReadableByteStreamController(
value: unknown
): value is SDReadableByteStreamController {
if (typeof value !== "object" || value === null) {
return false;
}
return controlledReadableByteStream_ in value;
}
export function readableByteStreamControllerCallPullIfNeeded(
controller: SDReadableByteStreamController
): void {
if (!readableByteStreamControllerShouldCallPull(controller)) {
return;
}
if (controller[pulling_]) {
controller[pullAgain_] = true;
return;
}
// Assert: controller.[[pullAgain]] is false.
controller[pulling_] = true;
controller[pullAlgorithm_](controller).then(
(_) => {
controller[pulling_] = false;
if (controller[pullAgain_]) {
controller[pullAgain_] = false;
readableByteStreamControllerCallPullIfNeeded(controller);
}
},
(error) => {
readableByteStreamControllerError(controller, error);
}
);
}
export function readableByteStreamControllerClearAlgorithms(
controller: SDReadableByteStreamController
): void {
controller[pullAlgorithm_] = undefined!;
controller[cancelAlgorithm_] = undefined!;
}
export function readableByteStreamControllerClearPendingPullIntos(
controller: SDReadableByteStreamController
): void {
readableByteStreamControllerInvalidateBYOBRequest(controller);
controller[pendingPullIntos_] = [];
}
export function readableByteStreamControllerClose(
controller: SDReadableByteStreamController
): void {
const stream = controller[controlledReadableByteStream_];
// Assert: controller.[[closeRequested]] is false.
// Assert: stream.[[state]] is "readable".
if (controller[q.queueTotalSize_] > 0) {
controller[closeRequested_] = true;
return;
}
if (controller[pendingPullIntos_].length > 0) {
const firstPendingPullInto = controller[pendingPullIntos_][0];
if (firstPendingPullInto.bytesFilled > 0) {
const error = new TypeError();
readableByteStreamControllerError(controller, error);
throw error;
}
}
readableByteStreamControllerClearAlgorithms(controller);
readableStreamClose(stream);
}
export function readableByteStreamControllerCommitPullIntoDescriptor(
stream: SDReadableStream<ArrayBufferView>,
pullIntoDescriptor: PullIntoDescriptor
): void {
// Assert: stream.[[state]] is not "errored".
let done = false;
if (stream[shared.state_] === "closed") {
// Assert: pullIntoDescriptor.[[bytesFilled]] is 0.
done = true;
}
const filledView = readableByteStreamControllerConvertPullIntoDescriptor(
pullIntoDescriptor
);
if (pullIntoDescriptor.readerType === "default") {
readableStreamFulfillReadRequest(stream, filledView, done);
} else {
// Assert: pullIntoDescriptor.[[readerType]] is "byob".
readableStreamFulfillReadIntoRequest(stream, filledView, done);
}
}
export function readableByteStreamControllerConvertPullIntoDescriptor(
pullIntoDescriptor: PullIntoDescriptor
): ArrayBufferView {
const { bytesFilled, elementSize } = pullIntoDescriptor;
// Assert: bytesFilled <= pullIntoDescriptor.byteLength
// Assert: bytesFilled mod elementSize is 0
return new pullIntoDescriptor.ctor(
pullIntoDescriptor.buffer,
pullIntoDescriptor.byteOffset,
bytesFilled / elementSize
);
}
export function readableByteStreamControllerEnqueue(
controller: SDReadableByteStreamController,
chunk: ArrayBufferView
): void {
const stream = controller[controlledReadableByteStream_];
// Assert: controller.[[closeRequested]] is false.
// Assert: stream.[[state]] is "readable".
const { buffer, byteOffset, byteLength } = chunk;
const transferredBuffer = shared.transferArrayBuffer(buffer);
if (readableStreamHasDefaultReader(stream)) {
if (readableStreamGetNumReadRequests(stream) === 0) {
readableByteStreamControllerEnqueueChunkToQueue(
controller,
transferredBuffer,
byteOffset,
byteLength
);
} else {
// Assert: controller.[[queue]] is empty.
const transferredView = new Uint8Array(
transferredBuffer,
byteOffset,
byteLength
);
readableStreamFulfillReadRequest(stream, transferredView, false);
}
} else if (readableStreamHasBYOBReader(stream)) {
readableByteStreamControllerEnqueueChunkToQueue(
controller,
transferredBuffer,
byteOffset,
byteLength
);
readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
controller
);
} else {
// Assert: !IsReadableStreamLocked(stream) is false.
readableByteStreamControllerEnqueueChunkToQueue(
controller,
transferredBuffer,
byteOffset,
byteLength
);
}
readableByteStreamControllerCallPullIfNeeded(controller);
}
export function readableByteStreamControllerEnqueueChunkToQueue(
controller: SDReadableByteStreamController,
buffer: ArrayBufferLike,
byteOffset: number,
byteLength: number
): void {
controller[q.queue_].push({ buffer, byteOffset, byteLength });
controller[q.queueTotalSize_] += byteLength;
}
export function readableByteStreamControllerError(
controller: SDReadableByteStreamController,
error: shared.ErrorResult
): void {
const stream = controller[controlledReadableByteStream_];
if (stream[shared.state_] !== "readable") {
return;
}
readableByteStreamControllerClearPendingPullIntos(controller);
q.resetQueue(controller);
readableByteStreamControllerClearAlgorithms(controller);
readableStreamError(stream, error);
}
export function readableByteStreamControllerFillHeadPullIntoDescriptor(
controller: SDReadableByteStreamController,
size: number,
pullIntoDescriptor: PullIntoDescriptor
): void {
// Assert: either controller.[[pendingPullIntos]] is empty, or the first element of controller.[[pendingPullIntos]] is pullIntoDescriptor.
readableByteStreamControllerInvalidateBYOBRequest(controller);
pullIntoDescriptor.bytesFilled += size;
}
export function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
controller: SDReadableByteStreamController,
pullIntoDescriptor: PullIntoDescriptor
): boolean {
const elementSize = pullIntoDescriptor.elementSize;
const currentAlignedBytes =
pullIntoDescriptor.bytesFilled -
(pullIntoDescriptor.bytesFilled % elementSize);
const maxBytesToCopy = Math.min(
controller[q.queueTotalSize_],
pullIntoDescriptor.byteLength - pullIntoDescriptor.bytesFilled
);
const maxBytesFilled = pullIntoDescriptor.bytesFilled + maxBytesToCopy;
const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize);
let totalBytesToCopyRemaining = maxBytesToCopy;
let ready = false;
if (maxAlignedBytes > currentAlignedBytes) {
totalBytesToCopyRemaining =
maxAlignedBytes - pullIntoDescriptor.bytesFilled;
ready = true;
}
const queue = controller[q.queue_];
while (totalBytesToCopyRemaining > 0) {
const headOfQueue = queue.front()!;
const bytesToCopy = Math.min(
totalBytesToCopyRemaining,
headOfQueue.byteLength
);
const destStart =
pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;
shared.copyDataBlockBytes(
pullIntoDescriptor.buffer,
destStart,
headOfQueue.buffer,
headOfQueue.byteOffset,
bytesToCopy
);
if (headOfQueue.byteLength === bytesToCopy) {
queue.shift();
} else {
headOfQueue.byteOffset += bytesToCopy;
headOfQueue.byteLength -= bytesToCopy;
}
controller[q.queueTotalSize_] -= bytesToCopy;
readableByteStreamControllerFillHeadPullIntoDescriptor(
controller,
bytesToCopy,
pullIntoDescriptor
);
totalBytesToCopyRemaining -= bytesToCopy;
}
if (!ready) {
// Assert: controller[queueTotalSize_] === 0
// Assert: pullIntoDescriptor.bytesFilled > 0
// Assert: pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize
}
return ready;
}
export function readableByteStreamControllerGetDesiredSize(
controller: SDReadableByteStreamController
): number | null {
const stream = controller[controlledReadableByteStream_];
const state = stream[shared.state_];
if (state === "errored") {
return null;
}
if (state === "closed") {
return 0;
}
return controller[strategyHWM_] - controller[q.queueTotalSize_];
}
export function readableByteStreamControllerHandleQueueDrain(
controller: SDReadableByteStreamController
): void {
// Assert: controller.[[controlledReadableByteStream]].[[state]] is "readable".
if (controller[q.queueTotalSize_] === 0 && controller[closeRequested_]) {
readableByteStreamControllerClearAlgorithms(controller);
readableStreamClose(controller[controlledReadableByteStream_]);
} else {
readableByteStreamControllerCallPullIfNeeded(controller);
}
}
export function readableByteStreamControllerInvalidateBYOBRequest(
controller: SDReadableByteStreamController
): void {
const byobRequest = controller[byobRequest_];
if (byobRequest === undefined) {
return;
}
byobRequest[associatedReadableByteStreamController_] = undefined;
byobRequest[view_] = undefined;
controller[byobRequest_] = undefined;
}
export function readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
controller: SDReadableByteStreamController
): void {
// Assert: controller.[[closeRequested]] is false.
const pendingPullIntos = controller[pendingPullIntos_];
while (pendingPullIntos.length > 0) {
if (controller[q.queueTotalSize_] === 0) {
return;
}
const pullIntoDescriptor = pendingPullIntos[0];
if (
readableByteStreamControllerFillPullIntoDescriptorFromQueue(
controller,
pullIntoDescriptor
)
) {
readableByteStreamControllerShiftPendingPullInto(controller);
readableByteStreamControllerCommitPullIntoDescriptor(
controller[controlledReadableByteStream_],
pullIntoDescriptor
);
}
}
}
export function readableByteStreamControllerPullInto(
controller: SDReadableByteStreamController,
view: ArrayBufferView,
forAuthorCode: boolean
): Promise<IteratorResult<ArrayBufferView, any>> {
const stream = controller[controlledReadableByteStream_];
const elementSize = (view as Uint8Array).BYTES_PER_ELEMENT || 1; // DataView exposes this in Webkit as 1, is not present in FF or Blink
const ctor = view.constructor as Uint8ArrayConstructor; // the typecast here is just for TS typing, it does not influence buffer creation
const byteOffset = view.byteOffset;
const byteLength = view.byteLength;
const buffer = shared.transferArrayBuffer(view.buffer);
const pullIntoDescriptor: PullIntoDescriptor = {
buffer,
byteOffset,
byteLength,
bytesFilled: 0,
elementSize,
ctor,
readerType: "byob",
};
if (controller[pendingPullIntos_].length > 0) {
controller[pendingPullIntos_].push(pullIntoDescriptor);
return readableStreamAddReadIntoRequest(stream, forAuthorCode);
}
if (stream[shared.state_] === "closed") {
const emptyView = new ctor(
pullIntoDescriptor.buffer,
pullIntoDescriptor.byteOffset,
0
);
return Promise.resolve(
readableStreamCreateReadResult(emptyView, true, forAuthorCode)
);
}
if (controller[q.queueTotalSize_] > 0) {
if (
readableByteStreamControllerFillPullIntoDescriptorFromQueue(
controller,
pullIntoDescriptor
)
) {
const filledView = readableByteStreamControllerConvertPullIntoDescriptor(
pullIntoDescriptor
);
readableByteStreamControllerHandleQueueDrain(controller);
return Promise.resolve(
readableStreamCreateReadResult(filledView, false, forAuthorCode)
);
}
if (controller[closeRequested_]) {
const error = new TypeError();
readableByteStreamControllerError(controller, error);
return Promise.reject(error);
}
}
controller[pendingPullIntos_].push(pullIntoDescriptor);
const promise = readableStreamAddReadIntoRequest(stream, forAuthorCode);
readableByteStreamControllerCallPullIfNeeded(controller);
return promise;
}
export function readableByteStreamControllerRespond(
controller: SDReadableByteStreamController,
bytesWritten: number
): void {
bytesWritten = Number(bytesWritten);
if (!shared.isFiniteNonNegativeNumber(bytesWritten)) {
throw new RangeError("bytesWritten must be a finite, non-negative number");
}
// Assert: controller.[[pendingPullIntos]] is not empty.
readableByteStreamControllerRespondInternal(controller, bytesWritten);
}
export function readableByteStreamControllerRespondInClosedState(
controller: SDReadableByteStreamController,
firstDescriptor: PullIntoDescriptor
): void {
firstDescriptor.buffer = shared.transferArrayBuffer(firstDescriptor.buffer);
// Assert: firstDescriptor.[[bytesFilled]] is 0.
const stream = controller[controlledReadableByteStream_];
if (readableStreamHasBYOBReader(stream)) {
while (readableStreamGetNumReadIntoRequests(stream) > 0) {
const pullIntoDescriptor = readableByteStreamControllerShiftPendingPullInto(
controller
)!;
readableByteStreamControllerCommitPullIntoDescriptor(
stream,
pullIntoDescriptor
);
}
}
}
export function readableByteStreamControllerRespondInReadableState(
controller: SDReadableByteStreamController,
bytesWritten: number,
pullIntoDescriptor: PullIntoDescriptor
): void {
if (
pullIntoDescriptor.bytesFilled + bytesWritten >
pullIntoDescriptor.byteLength
) {
throw new RangeError();
}
readableByteStreamControllerFillHeadPullIntoDescriptor(
controller,
bytesWritten,
pullIntoDescriptor
);
if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize) {
return;
}
readableByteStreamControllerShiftPendingPullInto(controller);
const remainderSize =
pullIntoDescriptor.bytesFilled % pullIntoDescriptor.elementSize;
if (remainderSize > 0) {
const end = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled;
const remainder = shared.cloneArrayBuffer(
pullIntoDescriptor.buffer,
end - remainderSize,
remainderSize,
ArrayBuffer
);
readableByteStreamControllerEnqueueChunkToQueue(
controller,
remainder,
0,
remainder.byteLength
);
}
pullIntoDescriptor.buffer = shared.transferArrayBuffer(
pullIntoDescriptor.buffer
);
pullIntoDescriptor.bytesFilled =
pullIntoDescriptor.bytesFilled - remainderSize;
readableByteStreamControllerCommitPullIntoDescriptor(
controller[controlledReadableByteStream_],
pullIntoDescriptor
);
readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller);
}
export function readableByteStreamControllerRespondInternal(
controller: SDReadableByteStreamController,
bytesWritten: number
): void {
const firstDescriptor = controller[pendingPullIntos_][0];
const stream = controller[controlledReadableByteStream_];
if (stream[shared.state_] === "closed") {
if (bytesWritten !== 0) {
throw new TypeError();
}
readableByteStreamControllerRespondInClosedState(
controller,
firstDescriptor
);
} else {
// Assert: stream.[[state]] is "readable".
readableByteStreamControllerRespondInReadableState(
controller,
bytesWritten,
firstDescriptor
);
}
readableByteStreamControllerCallPullIfNeeded(controller);
}
export function readableByteStreamControllerRespondWithNewView(
controller: SDReadableByteStreamController,
view: ArrayBufferView
): void {
// Assert: controller.[[pendingPullIntos]] is not empty.
const firstDescriptor = controller[pendingPullIntos_][0];
if (
firstDescriptor.byteOffset + firstDescriptor.bytesFilled !==
view.byteOffset
) {
throw new RangeError();
}
if (firstDescriptor.byteLength !== view.byteLength) {
throw new RangeError();
}
firstDescriptor.buffer = view.buffer;
readableByteStreamControllerRespondInternal(controller, view.byteLength);
}
export function readableByteStreamControllerShiftPendingPullInto(
controller: SDReadableByteStreamController
): PullIntoDescriptor | undefined {
const descriptor = controller[pendingPullIntos_].shift();
readableByteStreamControllerInvalidateBYOBRequest(controller);
return descriptor;
}
export function readableByteStreamControllerShouldCallPull(
controller: SDReadableByteStreamController
): boolean {
// Let stream be controller.[[controlledReadableByteStream]].
const stream = controller[controlledReadableByteStream_];
if (stream[shared.state_] !== "readable") {
return false;
}
if (controller[closeRequested_]) {
return false;
}
if (!controller[started_]) {
return false;
}
if (
readableStreamHasDefaultReader(stream) &&
readableStreamGetNumReadRequests(stream) > 0
) {
return true;
}
if (
readableStreamHasBYOBReader(stream) &&
readableStreamGetNumReadIntoRequests(stream) > 0
) {
return true;
}
const desiredSize = readableByteStreamControllerGetDesiredSize(controller);
// Assert: desiredSize is not null.
return desiredSize! > 0;
}
export function setUpReadableStreamBYOBRequest(
request: SDReadableStreamBYOBRequest,
controller: SDReadableByteStreamController,
view: ArrayBufferView
): void {
if (!isReadableByteStreamController(controller)) {
throw new TypeError();
}
if (!ArrayBuffer.isView(view)) {
throw new TypeError();
}
// Assert: !IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is false.
request[associatedReadableByteStreamController_] = controller;
request[view_] = view;
}