deno/cli/js/streams/readable-stream.ts

392 lines
12 KiB
TypeScript

// Forked from https://github.com/stardazed/sd-streams/tree/8928cf04b035fd02fb1340b7eb541c76be37e546
// Copyright (c) 2018-Present by Arthur Langereis - @zenmumbler MIT
/**
* streams/readable-stream - ReadableStream class implementation
* Part of Stardazed
* (c) 2018-Present by Arthur Langereis - @zenmumbler
* https://github.com/stardazed/sd-streams
*/
/* eslint prefer-const: "off" */
// TODO: remove this; suppressed because of
// 284:7 error 'branch1' is never reassigned. Use 'const' instead prefer-const
import * as rs from "./readable-internals.ts";
import * as shared from "./shared-internals.ts";
import {
QueuingStrategy,
QueuingStrategySizeCallback,
UnderlyingSource,
UnderlyingByteSource
} from "../dom_types.ts";
import {
ReadableStreamDefaultController,
setUpReadableStreamDefaultControllerFromUnderlyingSource
} from "./readable-stream-default-controller.ts";
import { ReadableStreamDefaultReader } from "./readable-stream-default-reader.ts";
import {
ReadableByteStreamController,
setUpReadableByteStreamControllerFromUnderlyingSource
} from "./readable-byte-stream-controller.ts";
import { SDReadableStreamBYOBReader } from "./readable-stream-byob-reader.ts";
/**
 * ReadableStream implementation.
 *
 * Depending on the `type` field of the underlying source, the stream is wired
 * to either a default controller (`type` undefined) or a byte stream
 * controller (`type` stringifies to "bytes").
 */
export class SDReadableStream<OutputType>
  implements rs.SDReadableStream<OutputType> {
  [shared.state_]: rs.ReadableStreamState;
  [shared.storedError_]: shared.ErrorResult;
  [rs.reader_]: rs.SDReadableStreamReader<OutputType> | undefined;
  [rs.readableStreamController_]: rs.SDReadableStreamControllerBase<OutputType>;

  /** Byte source overload: the strategy must not carry a `size` function. */
  constructor(
    underlyingSource: UnderlyingByteSource,
    strategy?: { highWaterMark?: number; size?: undefined }
  );
  /** Default source overload. */
  constructor(
    underlyingSource?: UnderlyingSource<OutputType>,
    strategy?: QueuingStrategy<OutputType>
  );
  /**
   * @param underlyingSource Source object; its `type` selects the controller.
   * @param strategy Queuing strategy. `highWaterMark` defaults to 1 for
   *   default sources and to 0 for byte sources before being passed through
   *   `shared.validateAndNormalizeHighWaterMark`.
   * @throws RangeError when a byte source is combined with a strategy `size`,
   *   or when `type` is neither undefined nor "bytes".
   */
  constructor(
    underlyingSource: UnderlyingSource<OutputType> | UnderlyingByteSource = {},
    strategy:
      | QueuingStrategy<OutputType>
      | { highWaterMark?: number; size?: undefined } = {}
  ) {
    rs.initializeReadableStream(this);
    const sizeFunc = strategy.size;
    const stratHWM = strategy.highWaterMark;
    const sourceType = underlyingSource.type;

    if (sourceType === undefined) {
      // The size algorithm is built before the high water mark is validated.
      const sizeAlgorithm = shared.makeSizeAlgorithmFromSizeFunction(sizeFunc);
      const highWaterMark = shared.validateAndNormalizeHighWaterMark(
        stratHWM === undefined ? 1 : stratHWM
      );
      setUpReadableStreamDefaultControllerFromUnderlyingSource(
        this,
        underlyingSource as UnderlyingSource<OutputType>,
        highWaterMark,
        sizeAlgorithm
      );
    } else if (String(sourceType) === "bytes") {
      // `type` is compared after string conversion, so any value that
      // stringifies to "bytes" selects the byte controller.
      if (sizeFunc !== undefined) {
        throw new RangeError(
          "bytes streams cannot have a strategy with a `size` field"
        );
      }
      const highWaterMark = shared.validateAndNormalizeHighWaterMark(
        stratHWM === undefined ? 0 : stratHWM
      );
      setUpReadableByteStreamControllerFromUnderlyingSource(
        (this as unknown) as rs.SDReadableStream<ArrayBufferView>,
        underlyingSource as UnderlyingByteSource,
        highWaterMark
      );
    } else {
      throw new RangeError(
        "The underlying source's `type` field must be undefined or 'bytes'"
      );
    }
  }

  /** Whether the stream is locked, as reported by `rs.isReadableStreamLocked`. */
  get locked(): boolean {
    return rs.isReadableStreamLocked(this);
  }

  getReader(): rs.SDReadableStreamDefaultReader<OutputType>;
  // NOTE(review): `mode` is optional in this overload, so `getReader({})` is
  // typed as a BYOB reader although a default reader is returned at runtime.
  // Left as-is to keep the declared interface unchanged; confirm with callers.
  getReader(options: { mode?: "byob" }): rs.SDReadableStreamBYOBReader;
  /**
   * Creates a reader for this stream.
   *
   * @param options `mode` undefined yields a default reader; a `mode` that
   *   stringifies to "byob" yields a BYOB reader.
   * @throws TypeError when `this` is not a readable stream.
   * @throws RangeError when `mode` is anything else.
   */
  getReader(options?: {
    mode?: "byob";
  }):
    | rs.SDReadableStreamDefaultReader<OutputType>
    | rs.SDReadableStreamBYOBReader {
    if (!rs.isReadableStream(this)) {
      throw new TypeError("getReader can only be called on a ReadableStream");
    }
    // Read the mode without reassigning the caller-supplied parameter.
    const mode = options === undefined ? undefined : options.mode;
    if (mode === undefined) {
      return new ReadableStreamDefaultReader(this);
    }
    if (String(mode) === "byob") {
      return new SDReadableStreamBYOBReader(
        (this as unknown) as rs.SDReadableStream<ArrayBufferView>
      );
    }
    throw new RangeError("mode option must be undefined or `byob`");
  }

  /**
   * Cancels the stream via `rs.readableStreamCancel`.
   *
   * @param reason Value forwarded to `rs.readableStreamCancel`.
   * @returns A promise that rejects with a TypeError when `this` is not a
   *   readable stream or when the stream is locked; otherwise the promise
   *   returned by `rs.readableStreamCancel`.
   */
  cancel(reason: shared.ErrorResult): Promise<void> {
    if (!rs.isReadableStream(this)) {
      return Promise.reject(
        new TypeError("cancel can only be called on a ReadableStream")
      );
    }
    if (rs.isReadableStreamLocked(this)) {
      return Promise.reject(new TypeError("Cannot cancel a locked stream"));
    }
    return rs.readableStreamCancel(this, reason);
  }

  /** Tees this stream into two branches, without cloning chunks for the second. */
  tee(): Array<SDReadableStream<OutputType>> {
    return readableStreamTee(this, false);
  }

  /* TODO: re-enable these methods when we bring in writable streams and transform types
  pipeThrough<ResultType>(
    transform: rs.GenericTransformStream<OutputType, ResultType>,
    options: PipeOptions = {}
  ): rs.SDReadableStream<ResultType> {
    const { readable, writable } = transform;
    if (!rs.isReadableStream(this)) {
      throw new TypeError();
    }
    if (!ws.isWritableStream(writable)) {
      throw new TypeError("writable must be a WritableStream");
    }
    if (!rs.isReadableStream(readable)) {
      throw new TypeError("readable must be a ReadableStream");
    }
    if (options.signal !== undefined && !shared.isAbortSignal(options.signal)) {
      throw new TypeError("options.signal must be an AbortSignal instance");
    }
    if (rs.isReadableStreamLocked(this)) {
      throw new TypeError("Cannot pipeThrough on a locked stream");
    }
    if (ws.isWritableStreamLocked(writable)) {
      throw new TypeError("Cannot pipeThrough to a locked stream");
    }
    const pipeResult = pipeTo(this, writable, options);
    pipeResult.catch(() => {});
    return readable;
  }
  pipeTo(
    dest: ws.WritableStream<OutputType>,
    options: PipeOptions = {}
  ): Promise<void> {
    if (!rs.isReadableStream(this)) {
      return Promise.reject(new TypeError());
    }
    if (!ws.isWritableStream(dest)) {
      return Promise.reject(
        new TypeError("destination must be a WritableStream")
      );
    }
    if (options.signal !== undefined && !shared.isAbortSignal(options.signal)) {
      return Promise.reject(
        new TypeError("options.signal must be an AbortSignal instance")
      );
    }
    if (rs.isReadableStreamLocked(this)) {
      return Promise.reject(new TypeError("Cannot pipe from a locked stream"));
    }
    if (ws.isWritableStreamLocked(dest)) {
      return Promise.reject(new TypeError("Cannot pipe to a locked stream"));
    }
    return pipeTo(this, dest, options);
  }
  */
}
/**
 * Builds an SDReadableStream with a default controller directly from
 * algorithm callbacks. The stream object is created from the class prototype,
 * so the public constructor is not run.
 *
 * @param startAlgorithm Passed to `rs.setUpReadableStreamDefaultController`.
 * @param pullAlgorithm Passed to `rs.setUpReadableStreamDefaultController`.
 * @param cancelAlgorithm Passed to `rs.setUpReadableStreamDefaultController`.
 * @param highWaterMark Defaults to 1 when undefined.
 * @param sizeAlgorithm Defaults to a function returning 1 when undefined.
 * @returns The newly initialised stream.
 */
export function createReadableStream<OutputType>(
  startAlgorithm: rs.StartAlgorithm,
  pullAlgorithm: rs.PullAlgorithm<OutputType>,
  cancelAlgorithm: rs.CancelAlgorithm,
  highWaterMark?: number,
  sizeAlgorithm?: QueuingStrategySizeCallback<OutputType>
): SDReadableStream<OutputType> {
  // Resolve the optional arguments into locals instead of reassigning them.
  const resolvedHighWaterMark = highWaterMark !== undefined ? highWaterMark : 1;
  const resolvedSizeAlgorithm: QueuingStrategySizeCallback<OutputType> =
    sizeAlgorithm !== undefined ? sizeAlgorithm : (): number => 1;
  // Assert: ! IsNonNegativeNumber(highWaterMark) is true.

  const newStream: SDReadableStream<OutputType> = Object.create(
    SDReadableStream.prototype
  );
  rs.initializeReadableStream(newStream);

  const defaultController: ReadableStreamDefaultController<OutputType> = Object.create(
    ReadableStreamDefaultController.prototype
  );
  rs.setUpReadableStreamDefaultController(
    newStream,
    defaultController,
    startAlgorithm,
    pullAlgorithm,
    cancelAlgorithm,
    resolvedHighWaterMark,
    resolvedSizeAlgorithm
  );

  return newStream;
}
/**
 * Builds an SDReadableStream with a byte stream controller directly from
 * algorithm callbacks. The stream object is created from the class prototype,
 * so the public constructor is not run.
 *
 * @param startAlgorithm Passed to `rs.setUpReadableByteStreamController`.
 * @param pullAlgorithm Passed to `rs.setUpReadableByteStreamController`.
 * @param cancelAlgorithm Passed to `rs.setUpReadableByteStreamController`.
 * @param highWaterMark Defaults to 0 when undefined.
 * @param autoAllocateChunkSize Optional; when given it must satisfy
 *   `shared.isInteger` and be greater than zero.
 * @returns The newly initialised stream.
 * @throws RangeError when `autoAllocateChunkSize` is given but fails the check.
 */
export function createReadableByteStream<OutputType>(
  startAlgorithm: rs.StartAlgorithm,
  pullAlgorithm: rs.PullAlgorithm<OutputType>,
  cancelAlgorithm: rs.CancelAlgorithm,
  highWaterMark?: number,
  autoAllocateChunkSize?: number
): SDReadableStream<OutputType> {
  const resolvedHighWaterMark = highWaterMark !== undefined ? highWaterMark : 0;
  // Assert: ! IsNonNegativeNumber(highWaterMark) is true.

  // Only validate the chunk size when the caller actually supplied one.
  const chunkSizeRejected =
    autoAllocateChunkSize !== undefined &&
    (!shared.isInteger(autoAllocateChunkSize) || autoAllocateChunkSize <= 0);
  if (chunkSizeRejected) {
    throw new RangeError(
      "autoAllocateChunkSize must be a positive, finite integer"
    );
  }

  const newStream: SDReadableStream<OutputType> = Object.create(
    SDReadableStream.prototype
  );
  rs.initializeReadableStream(newStream);

  const byteController: ReadableByteStreamController = Object.create(
    ReadableByteStreamController.prototype
  );
  rs.setUpReadableByteStreamController(
    (newStream as unknown) as SDReadableStream<ArrayBufferView>,
    byteController,
    startAlgorithm,
    (pullAlgorithm as unknown) as rs.PullAlgorithm<ArrayBufferView>,
    cancelAlgorithm,
    resolvedHighWaterMark,
    autoAllocateChunkSize
  );

  return newStream;
}
/**
 * Splits `stream` into two branch streams fed from a single default reader.
 *
 * Each chunk read from `stream` is enqueued on every branch that has not been
 * canceled. The source is only canceled once both branches have been
 * canceled, with the pair `[reason1, reason2]` as the cancel reason.
 *
 * @param stream The stream to tee; a default reader is constructed on it.
 * @param cloneForBranch2 When true, chunks go through `shared.cloneValue`
 *   before being enqueued on the second branch.
 * @returns The two branches as a tuple.
 * @throws TypeError when `stream` fails `rs.isReadableStream`.
 */
export function readableStreamTee<OutputType>(
  stream: SDReadableStream<OutputType>,
  cloneForBranch2: boolean
): [SDReadableStream<OutputType>, SDReadableStream<OutputType>] {
  if (!rs.isReadableStream(stream)) {
    throw new TypeError();
  }
  const reader = new ReadableStreamDefaultReader(stream);

  // State shared by the algorithms of both branches.
  let closedOrErrored = false;
  let canceled1 = false;
  let canceled2 = false;
  let reason1: shared.ErrorResult;
  let reason2: shared.ErrorResult;
  let branch1: SDReadableStream<OutputType>;
  let branch2: SDReadableStream<OutputType>;
  let cancelResolve: (reason: shared.ErrorResult) => void;
  const cancelPromise = new Promise<void>(resolve => (cancelResolve = resolve));

  // Both branches are built by createReadableStream, hence the controller cast.
  const controllerOf = (
    branch: SDReadableStream<OutputType>
  ): ReadableStreamDefaultController<OutputType> =>
    branch[rs.readableStreamController_] as ReadableStreamDefaultController<
      OutputType
    >;

  // Cancels the source with both reasons and settles the shared cancel promise.
  const cancelSource = (): void => {
    const cancelResult = rs.readableStreamCancel(stream, [reason1, reason2]);
    cancelResolve(cancelResult);
  };

  const pullAlgorithm = (): Promise<void> =>
    rs.readableStreamDefaultReaderRead(reader).then(({ value, done }) => {
      if (closedOrErrored) {
        return;
      }
      if (done) {
        // Source exhausted: close every branch that is still live.
        if (!canceled1) {
          rs.readableStreamDefaultControllerClose(controllerOf(branch1!));
        }
        if (!canceled2) {
          rs.readableStreamDefaultControllerClose(controllerOf(branch2!));
        }
        closedOrErrored = true;
        return;
      }
      if (!canceled1) {
        rs.readableStreamDefaultControllerEnqueue(
          controllerOf(branch1!),
          value!
        );
      }
      if (!canceled2) {
        const chunkForBranch2 = cloneForBranch2
          ? shared.cloneValue(value)
          : value;
        rs.readableStreamDefaultControllerEnqueue(
          controllerOf(branch2!),
          chunkForBranch2!
        );
      }
    });

  const cancel1Algorithm = (reason: shared.ErrorResult): Promise<void> => {
    canceled1 = true;
    reason1 = reason;
    if (canceled2) {
      cancelSource();
    }
    return cancelPromise;
  };

  const cancel2Algorithm = (reason: shared.ErrorResult): Promise<void> => {
    canceled2 = true;
    reason2 = reason;
    if (canceled1) {
      cancelSource();
    }
    return cancelPromise;
  };

  const startAlgorithm = (): undefined => undefined;

  branch1 = createReadableStream(
    startAlgorithm,
    pullAlgorithm,
    cancel1Algorithm
  );
  branch2 = createReadableStream(
    startAlgorithm,
    pullAlgorithm,
    cancel2Algorithm
  );

  // Forward a rejection of the reader's closed promise to both branches once.
  reader[rs.closedPromise_].promise.catch(error => {
    if (closedOrErrored) {
      return;
    }
    rs.readableStreamDefaultControllerError(controllerOf(branch1!), error);
    rs.readableStreamDefaultControllerError(controllerOf(branch2!), error);
    closedOrErrored = true;
  });

  return [branch1, branch2];
}