1. Introduction
This section is non-normative.
Large swathes of the web platform are built on streaming data: that is, data that is created, processed, and consumed in an incremental fashion, without ever reading all of it into memory. The Streams Standard provides a common set of APIs for creating and interfacing with such streaming data, embodied in readable streams, writable streams, and transform streams.
This standard provides the base stream primitives which other parts of the web platform can use to expose their streaming data. For example, [FETCH] could expose request bodies as a writable stream, or response bodies as a readable stream. More generally, the platform is full of streaming abstractions waiting to be expressed as streams: multimedia streams, file streams, interprocess communication, and more benefit from being able to process data incrementally instead of buffering it all into memory and processing it in one go. By providing the foundation for these streams to be exposed to developers, the Streams Standard enables use cases like:
- Video effects: piping a readable video stream through a transform stream that applies effects in real time.
- Decompression: piping a file stream through a transform stream that selectively decompresses files from a .tgz archive, turning them into img elements as the user scrolls through an image gallery.
- Image decoding: piping an HTTP response stream through a transform stream that decodes bytes into bitmap data, and then through another transform that translates bitmaps into PNGs. If installed inside the fetch hook of a service worker [SERVICE-WORKERS], this would allow developers to transparently polyfill new image formats.
The APIs described here provide a unifying abstraction for all such streams, encouraging an ecosystem to grow around these shared and composable interfaces. At the same time, they have been carefully designed to map efficiently to low-level I/O concerns, and to encapsulate the trickier issues (such as backpressure) that come along for the ride.
2. Model
A chunk is a single piece of data that is written to or read from a stream. It can be of any type; streams
can even contain chunks of different types. A chunk will often not be the most atomic unit of data for a given stream;
for example a byte stream might contain chunks consisting of 16 KiB Uint8Arrays, instead of single
bytes.
2.1. Readable Streams
A readable stream represents a source of data, from which you can read. In other words, data comes out of a readable stream.
Although a readable stream can be created with arbitrary behavior, most readable streams wrap a lower-level I/O source, called the underlying source. There are two types of underlying source: push sources and pull sources.
Push sources push data at you, whether or not you are listening for it. They may also provide a mechanism for pausing and resuming the flow of data. An example push source is a TCP socket, where data is constantly being pushed from the OS level, at a rate that can be controlled by changing the TCP window size.
Pull sources require you to request data from them. The data may be available synchronously, e.g. if it is held by the operating system’s in-memory buffers, or asynchronously, e.g. if it has to be read from disk. An example pull source is a file handle, where you seek to specific locations and read specific amounts.
Readable streams are designed to wrap both types of sources behind a single, unified interface.
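As a rough illustration of how both kinds of underlying source end up behind the same interface, the sketch below wraps a push source and a pull source in readable streams. `FakePushSource` and `FakePullSource` are hypothetical helpers invented for this example (stand-ins for something like a socket or a file handle), and the code assumes a runtime that exposes `ReadableStream` as a global.

```javascript
// Hypothetical push source: delivers data through callbacks once started.
class FakePushSource {
  constructor(items) { this.items = items; }
  start() {
    for (const item of this.items) this.ondata(item);
    this.onend();
  }
}

// Hypothetical pull source: hands out one item per request.
class FakePullSource {
  constructor(items) { this.items = items.slice(); }
  next() { return Promise.resolve(this.items.shift()); } // undefined when exhausted
}

// A push source is adapted in start(), by registering listeners.
function fromPushSource(source) {
  return new ReadableStream({
    start(controller) {
      source.ondata = chunk => controller.enqueue(chunk);
      source.onend = () => controller.close();
      source.start();
    }
  });
}

// A pull source is adapted in pull(), which runs only when data is wanted.
function fromPullSource(source) {
  return new ReadableStream({
    async pull(controller) {
      const chunk = await source.next();
      if (chunk === undefined) controller.close();
      else controller.enqueue(chunk);
    }
  });
}

// The consumer code is identical for both kinds of stream.
async function readAll(stream) {
  const reader = stream.getReader();
  const chunks = [];
  for (;;) {
    const { value, done } = await reader.read();
    if (done) return chunks;
    chunks.push(value);
  }
}
```

Calling `readAll(fromPushSource(...))` and `readAll(fromPullSource(...))` with the same items should yield the same array of chunks; only the adapter differs.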
Chunks are enqueued into the stream by the stream’s underlying source. They can then be read one at a time via the stream’s public interface.
Code that reads from a readable stream using its public interface is known as a consumer.
Consumers also have the ability to cancel a readable stream. This indicates that the consumer has lost interest in the stream, and will immediately close the stream, throw away any queued chunks, and execute any cancellation mechanism of the underlying source.
Consumers can also tee a readable stream. This will lock the stream, making it no longer directly usable; however, it will create two new streams, called branches, which can be consumed independently.
For streams representing bytes, an extended version of the readable stream is provided to handle bytes efficiently, in particular by minimizing copies. The underlying source for such a readable stream is called an underlying byte source. A readable stream whose underlying source is an underlying byte source is sometimes called a readable byte stream.
2.2. Writable Streams
A writable stream represents a destination for data, into which you can write. In other words, data goes into a writable stream.
Analogously to readable streams, most writable streams wrap a lower-level I/O sink, called the underlying sink. Writable streams work to abstract away some of the complexity of the underlying sink, by queuing subsequent writes and only delivering them to the underlying sink one by one.
Chunks are written to the stream via its public interface, and are passed one at a time to the stream’s underlying sink.
Code that writes into a writable stream using its public interface is known as a producer.
Producers also have the ability to abort a writable stream. This indicates that the producer believes something has gone wrong, and that future writes should be discontinued. It puts the stream in an errored state, even without a signal from the underlying sink.
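The following sketch shows a producer writing to, and then aborting, a writable stream. It assumes the `WritableStream` constructor with an underlying sink object offering `write`, `close`, and `abort` methods, as available in current browsers and Node.js; the variable names and the "disk full" reason are illustrative only.

```javascript
async function produceAndClose() {
  const received = [];
  const stream = new WritableStream({
    // The underlying sink sees queued chunks one at a time, in order.
    write(chunk) {
      received.push(chunk);
      // Returning a promise delays delivery of the next chunk until it settles.
      return new Promise(resolve => setTimeout(resolve, 5));
    },
    close() { received.push("[closed]"); }
  });
  const writer = stream.getWriter();
  writer.write("a"); // queued
  writer.write("b"); // queued behind "a"
  await writer.close();
  return received;
}

async function produceAndAbort() {
  const log = [];
  const stream = new WritableStream({
    write(chunk) { log.push(chunk); },
    abort(reason) { log.push("aborted: " + reason); }
  });
  const writer = stream.getWriter();
  await writer.write("a");
  await writer.abort("disk full");
  // The stream is now errored, so further writes are rejected.
  const outcome = await writer.write("b").then(() => "written", () => "rejected");
  return { log, outcome };
}
```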
2.3. Transform Streams
A transform stream consists of a pair of streams: a writable stream, and a readable stream. In a manner specific to the transform stream in question, writes to the writable side result in new data being made available for reading from the readable side.
Some examples of transform streams include:
- A GZIP compressor, to which uncompressed bytes are written and from which compressed bytes are read;
- A video decoder, to which encoded bytes are written and from which uncompressed video frames are read;
- A text decoder, to which bytes are written and from which strings are read;
- A CSV-to-JSON converter, to which strings representing lines of a CSV file are written and from which corresponding JavaScript objects are read.
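A very small transform stream in the spirit of the examples above can be sketched as follows. This assumes the `TransformStream` constructor found in current browsers and Node.js, which this section does not itself define; the upper-casing transform is purely illustrative.

```javascript
// Strings written to the writable side come out upper-cased on the readable side.
const upperCaser = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(String(chunk).toUpperCase());
  }
});

async function runUpperCaser() {
  const writer = upperCaser.writable.getWriter();
  const reader = upperCaser.readable.getReader();

  // Write to one side...
  writer.write("hello");
  writer.write("streams");
  writer.close();

  // ...and read the transformed chunks from the other.
  const out = [];
  for (;;) {
    const { value, done } = await reader.read();
    if (done) break;
    out.push(value);
  }
  return out;
}
```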
2.4. Pipe Chains and Backpressure
Streams are primarily used by piping them to each other. A readable stream can be piped directly to a writable stream, or it can be piped through one or more transform streams first.
A set of streams piped together in this way is referred to as a pipe chain. In a pipe chain, the original source is the underlying source of the first readable stream in the chain; the ultimate sink is the underlying sink of the final writable stream in the chain.
Once a pipe chain is constructed, it can be used to propagate signals regarding how fast chunks should flow through it. If any step in the chain cannot yet accept chunks, it propagates a signal backwards through the pipe chain, until eventually the original source is told to stop producing chunks so fast. This process of normalizing flow from the original source according to how fast the chain can process chunks is called backpressure.
When teeing a readable stream, the backpressure signals from its two branches will aggregate, such that if neither branch is read from, a backpressure signal will be sent to the underlying source of the original stream.
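To make backpressure a little more tangible, the sketch below pipes a fast source into a deliberately slow sink and records how far production ever runs ahead of consumption. The counters and the 2 ms delay are assumptions made for the illustration; the exact lead observed depends on the implementation and the queuing strategies in use.

```javascript
const total = 20;
let produced = 0;
let consumed = 0;
let maxLead = 0; // largest observed value of produced - consumed

// The source could produce everything instantly, but pull() is only
// invoked while the chain is willing to accept more chunks.
const source = new ReadableStream({
  pull(controller) {
    if (produced === total) {
      controller.close();
      return;
    }
    controller.enqueue(produced++);
    maxLead = Math.max(maxLead, produced - consumed);
  }
});

// The sink acknowledges each chunk only after a short delay.
const slowSink = new WritableStream({
  write() {
    return new Promise(resolve => setTimeout(() => {
      consumed++;
      resolve();
    }, 2));
  }
});

const piped = source.pipeTo(slowSink)
  .then(() => ({ produced, consumed, maxLead }));
```

Once `piped` settles, all chunks have been produced and consumed, yet `maxLead` should remain far below `total`, since the source was asked for new chunks only as the sink made room.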
2.5. Internal Queues and Queuing Strategies
Both readable and writable streams maintain internal queues, which they use for similar purposes. In the case of a readable stream, the internal queue contains chunks that have been enqueued by the underlying source, but not yet read by the consumer. In the case of a writable stream, the internal queue contains chunks which have been written to the stream by the producer, but not yet processed and acknowledged by the underlying sink.
A queuing strategy is an object that determines how a stream should signal backpressure based on the state of its internal queue. The queuing strategy assigns a size to each chunk, and compares the total size of all chunks in the queue to a specified number, known as the high water mark. The resulting difference, high water mark minus total size, is used to determine the desired size to fill the stream’s queue.
For readable streams, an underlying source can use this desired size as a backpressure signal, slowing down chunk generation so as to try to keep the desired size above or at zero. For writable streams, a producer can behave similarly, avoiding writes that would cause the desired size to go negative.
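The arithmetic behind the desired size can be observed directly. The sketch below assumes the built-in `ByteLengthQueuingStrategy` class and the controller's `desiredSize` getter; the high water mark of 1024 and the chunk sizes are arbitrary values chosen for the example.

```javascript
const sizes = [];

new ReadableStream(
  {
    start(controller) {
      sizes.push(controller.desiredSize);       // 1024 - 0
      controller.enqueue(new Uint8Array(300));
      sizes.push(controller.desiredSize);       // 1024 - 300
      controller.enqueue(new Uint8Array(800));
      sizes.push(controller.desiredSize);       // 1024 - 1100
    }
  },
  // Each chunk's size is its byteLength; the queue is "full" at 1024 bytes.
  new ByteLengthQueuingStrategy({ highWaterMark: 1024 })
);

// sizes is now [1024, 724, -76]
```

A negative desired size, as in the last entry, means the queue is over its high water mark; a well-behaved underlying source would stop enqueuing until it rises again.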
2.6. Locking
A readable stream reader, or simply reader, is an object that allows direct reading of chunks from a readable stream. Without a reader, a consumer can only perform high-level operations on the readable stream: canceling the stream, or piping the readable stream to a writable stream.
Similarly, a writable stream writer, or simply writer, is an object that allows direct writing of chunks to a writable stream. Without a writer, a producer can only perform the high-level operations of aborting the stream or piping a readable stream to the writable stream.
(Under the covers, these high-level operations actually use a reader or writer themselves.)
A given readable or writable stream only has at most one reader or writer at a time. We say in this case the stream is locked, and that the reader or writer is active.
A reader or writer also has the capability to release its lock, which makes it no longer active, and allows further readers or writers to be acquired.
A readable byte stream has the ability to vend two types of readers: default readers and BYOB readers. BYOB ("bring your own buffer") readers allow reading into a developer-supplied buffer, thus minimizing copies.
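A minimal sketch of locking in practice, assuming the `getReader()`, `releaseLock()`, and `locked` members described later in this document:

```javascript
const stream = new ReadableStream();

const reader = stream.getReader();
const lockedWhileActive = stream.locked;   // the reader is active

let secondReaderError = null;
try {
  stream.getReader();                      // the stream is already locked
} catch (e) {
  secondReaderError = e;                   // acquiring a second reader fails
}

reader.releaseLock();                      // the reader is no longer active
const lockedAfterRelease = stream.locked;

const newReader = stream.getReader();      // a new reader can now be acquired
```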
3. Readable Streams
3.1. Using Readable Streams
readableStream.pipeTo(writableStream)
  .then(() => console.log("All data successfully written!"))
  .catch(e => console.error("Something went wrong!", e));
readableStream.pipeTo(new WritableStream({
  write(chunk) {
    console.log("Chunk received", chunk);
  },
  close() {
    console.log("All data successfully read!");
  },
  abort(e) {
    console.error("Something went wrong!", e);
  }
}));
By returning promises from your write implementation, you can signal backpressure to the readable
stream.
You can also read from the stream directly, by acquiring a reader and using its read() method to get successive chunks. For example, this code logs the next chunk in the stream, if available:
const reader = readableStream.getReader();
reader.read().then(
  ({ value, done }) => {
    if (done) {
      console.log("The stream was already closed!");
    } else {
      console.log(value);
    }
  },
  e => console.error("The stream became errored and cannot be read from!", e)
);
This more manual method of reading a stream is mainly useful for library authors building new high-level operations on streams, beyond the provided ones of piping and teeing.
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1024);
readInto(startingAB)
  .then(buffer => console.log("The first 1024 bytes:", buffer))
  .catch(e => console.error("Something went wrong!", e));
function readInto(buffer, offset = 0) {
  if (offset === buffer.byteLength) {
    return Promise.resolve(buffer);
  }
  const view = new Uint8Array(buffer, offset, buffer.byteLength - offset);
  // read(view) fulfills with a { value, done } result; value is the filled view.
  return reader.read(view).then(({ value: newView, done }) => {
    if (done) {
      // The stream closed before the buffer was filled; stop recursing.
      return newView.buffer;
    }
    return readInto(newView.buffer, offset + newView.byteLength);
  });
}
An important thing to note here is that the final buffer value is different from the startingAB, but it (and all intermediate buffers) shares the same backing memory allocation. At each
step, the buffer is transferred to a new ArrayBuffer object. The newView is a new Uint8Array, with that ArrayBuffer object as its buffer property,
the offset that bytes were written to as its byteOffset property, and the number of bytes that were
written as its byteLength property.
3.2. Class ReadableStream
The ReadableStream class is a concrete instance of the general readable stream concept. It is
adaptable to any chunk type, and maintains an internal queue to keep track of data supplied by the underlying
source but not yet read by any consumer.
3.2.1. Class Definition
This section is non-normative.
If one were to write the ReadableStream class in something close to the syntax of [ECMASCRIPT], it would look
like
class ReadableStream {
  constructor(underlyingSource = {}, { size, highWaterMark } = {})
  get locked()
  cancel(reason)
  getReader({ mode } = {})
  pipeThrough({ writable, readable }, options)
  pipeTo(dest, { preventClose, preventAbort, preventCancel } = {})
  tee()
}
3.2.2. Internal Slots
Instances of ReadableStream are created with the internal slots described in the following table:
| Internal Slot | Description (non-normative) |
|---|---|
| [[disturbed]] | A boolean flag set to true when the stream has been read from or canceled |
| [[readableStreamController]] | A ReadableStreamDefaultController or ReadableByteStreamController created with the ability to control the state and queue of this stream; also used for the IsReadableStream brand check |
| [[reader]] | A ReadableStreamDefaultReader or ReadableStreamBYOBReader instance, if the stream is locked to a reader, or undefined if it is not |
| [[state]] | A string containing the stream’s current state, used internally; one of "readable", "closed", or "errored" |
| [[storedError]] | A value indicating how the stream failed, to be given as a failure reason or exception when trying to operate on an errored stream |
3.2.3. new ReadableStream(underlyingSource = {}, { size, highWaterMark } = {})
The underlyingSource object passed to the constructor can implement any of the following methods to govern how the constructed stream instance behaves:
- start(controller) is called immediately, and is typically used to adapt a push source by setting up relevant event listeners, or to acquire access to a pull source. If this process is asynchronous, it can return a promise to signal success or failure.
- pull(controller) is called when the stream’s internal queue of chunks is not full, and will be called repeatedly until the queue reaches its high water mark. If pull returns a promise, then pull will not be called again until that promise fulfills; if the promise rejects, the stream will become errored.
- cancel(reason) is called when the consumer signals that they are no longer interested in the stream. It should perform any actions necessary to release access to the underlying source. If this process is asynchronous, it can return a promise to signal success or failure.
Both start and pull are given the ability to manipulate the stream’s internal queue and
state via the passed controller object. This is an example of the revealing constructor pattern.
If the underlyingSource object contains a property type set to "bytes", this readable stream is a readable byte stream, and can successfully vend BYOB readers. In that case,
the passed controller object will be an instance of ReadableByteStreamController. Otherwise, it will
be an instance of ReadableStreamDefaultController.
For readable byte streams, underlyingSource can also contain a property autoAllocateChunkSize, which can be set to a positive integer to enable the auto-allocation feature for
this stream. In that case, when a consumer uses a default reader, the stream implementation will
automatically allocate an ArrayBuffer of the given size, and call the underlying source code
as if the consumer was using a BYOB reader. This can cut down on the amount of code needed when writing
the underlying source implementation, as can be seen by comparing §8.3 A readable byte stream with an underlying push source (no backpressure support) without auto-allocation
to §8.5 A readable byte stream with an underlying pull source with auto-allocation.
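As a sketch of what auto-allocation looks like from the underlying byte source's point of view, the example below serves a default reader through the controller's `byobRequest`. The chunk size of 16 and the three bytes written are arbitrary, and the code assumes a runtime that implements readable byte streams (current browsers and recent Node.js versions).

```javascript
const byteStream = new ReadableStream({
  type: "bytes",
  autoAllocateChunkSize: 16,
  pull(controller) {
    // With auto-allocation, a BYOB request is present even though the
    // consumer uses a default reader; its view wraps the allocated buffer.
    const view = controller.byobRequest.view;
    view[0] = 1;
    view[1] = 2;
    view[2] = 3;
    controller.byobRequest.respond(3); // report how many bytes were written
    controller.close();
  }
});

async function readFirstChunk() {
  const reader = byteStream.getReader(); // a default reader, not a BYOB reader
  const { value } = await reader.read();
  return Array.from(value);
}
```

The underlying source only needs the "fill this view" code path; it does not need a separate path that allocates and enqueues its own buffers for default readers.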
The constructor also accepts a second argument containing the queuing strategy object with
two properties: a non-negative number highWaterMark, and a function size(chunk). The
supplied strategy could be an instance of the built-in CountQueuingStrategy or ByteLengthQueuingStrategy classes, or it could be custom. If no strategy is supplied, the default
behavior will be the same as a CountQueuingStrategy with a high water mark of 1.
- Set this.[[state]] to "readable".
- Set this.[[reader]] and this.[[storedError]] to undefined.
- Set this.[[disturbed]] to false.
- Set this.[[readableStreamController]] to undefined.
- Let type be ? GetV(underlyingSource, "type").
- Let typeString be ? ToString(type).
- If typeString is "bytes",
  - If highWaterMark is undefined, let highWaterMark be 0.
  - Set this.[[readableStreamController]] to ? Construct(ReadableByteStreamController, « this, underlyingSource, highWaterMark »).
- Otherwise, if type is undefined,
  - If highWaterMark is undefined, let highWaterMark be 1.
  - Set this.[[readableStreamController]] to ? Construct(ReadableStreamDefaultController, « this, underlyingSource, size, highWaterMark »).
- Otherwise, throw a RangeError exception.
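The roles of start, pull, and cancel can be seen in a small sketch. The counting source and the `events` log are hypothetical, introduced only to make the call order visible; no queuing strategy is passed, so the default described above applies.

```javascript
const events = [];
let next = 0;

const countingStream = new ReadableStream({
  start(controller) {
    events.push("start");                 // called immediately on construction
  },
  pull(controller) {
    events.push("pull");                  // called while the queue is not full
    controller.enqueue(next++);
  },
  cancel(reason) {
    events.push("cancel: " + reason);     // called when the consumer cancels
  }
});

async function consumeTwoThenCancel() {
  const reader = countingStream.getReader();
  const first = await reader.read();
  const second = await reader.read();
  await reader.cancel("done early");
  const afterCancel = await reader.read(); // the stream is closed by now
  return { first, second, afterCancel, events };
}
```

The exact number of "pull" entries depends on how eagerly the implementation refills the queue, but "start" is always the first event and the cancellation reason reaches the underlying source unchanged.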
3.2.4. Properties of the ReadableStream Prototype
3.2.4.1. get locked
The locked getter returns whether or not the readable stream is locked to a reader.
- If ! IsReadableStream(this) is false, throw a TypeError exception.
- Return ! IsReadableStreamLocked(this).
3.2.4.2. cancel(reason)
The cancel method cancels the stream, signaling a loss of interest in the stream by a consumer. The supplied reason argument will be given to the underlying source, which may or may not use it.
- If ! IsReadableStream(this) is false, return a promise rejected with a TypeError exception.
- If ! IsReadableStreamLocked(this) is true, return a promise rejected with a TypeError exception.
- Return ! ReadableStreamCancel(this, reason).
3.2.4.3. getReader({ mode } = {})
The getReader method creates a reader of the type specified by the mode option and locks the stream to the new reader. While the stream is locked, no other reader can be acquired until this one is released.
This functionality is especially useful for creating abstractions that desire the ability to consume a stream in its entirety. By getting a reader for the stream, you can ensure nobody else can interleave reads with yours or cancel the stream, which would interfere with your abstraction.
When mode is undefined, the getReader method creates a default reader (an instance of ReadableStreamDefaultReader). The reader provides the ability to directly read individual chunks from the stream via the reader’s read() method.
When mode is "byob", the getReader method creates a BYOB reader (an instance of ReadableStreamBYOBReader). This feature only works on readable byte streams, i.e. streams which were constructed specifically with the ability to handle "bring your own buffer" reading. The reader provides the ability to directly read individual chunks from the stream via the reader’s read() method, into developer-supplied buffers, allowing more precise control over allocation.
- If ! IsReadableStream(this) is false, throw a TypeError exception.
- If mode is "byob", return ? AcquireReadableStreamBYOBReader(this).
- If mode is undefined, return ? AcquireReadableStreamDefaultReader(this).
- Throw a RangeError exception.
function readAllChunks(readableStream) {
  const reader = readableStream.getReader();
  const chunks = [];
  return pump();
  function pump() {
    return reader.read().then(({ value, done }) => {
      if (done) {
        return chunks;
      }
      chunks.push(value);
      return pump();
    });
  }
}
Note how the first thing it does is obtain a reader, and from then on it uses the reader exclusively. This ensures that no other consumer can interfere with the stream, either by reading chunks or by canceling the stream.
3.2.4.4. pipeThrough({ writable, readable }, options)
The pipeThrough method provides a convenient, chainable way of piping this readable stream through a transform stream (or any other { writable, readable } pair). It simply pipes the stream into the writable side of the supplied pair, and returns the readable side for further use.
Piping a stream will generally lock it for the duration of the pipe, preventing any other consumer from acquiring a reader.
This method is intentionally generic; it does not require that its this value be a ReadableStream object. It also does not require that its writable argument be a WritableStream instance, or that its readable argument be a ReadableStream instance.
- Perform ? Invoke(this, "pipeTo", « writable, options »).
- Return readable.
A typical pipe chain built with pipeThrough(transform, options) would look like
httpResponseBody
  .pipeThrough(decompressorTransform)
  .pipeThrough(ignoreNonImageFilesTransform)
  .pipeTo(mediaGallery);
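A runnable variant of such a chain, using toy transforms in place of the decompressor and filter above, might look like the following sketch. The `mapTransform` helper is hypothetical, and the code assumes the `TransformStream` constructor of current browsers and Node.js, which this section does not define.

```javascript
// Hypothetical helper: builds a transform stream that applies fn to each chunk.
function mapTransform(fn) {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(fn(chunk));
    }
  });
}

const numbers = new ReadableStream({
  start(controller) {
    [1, 2, 3].forEach(n => controller.enqueue(n));
    controller.close();
  }
});

const results = [];
const finished = numbers
  .pipeThrough(mapTransform(n => n * 2))   // returns the readable side of the pair
  .pipeThrough(mapTransform(n => n + 1))   // so calls can be chained
  .pipeTo(new WritableStream({
    write(chunk) { results.push(chunk); }
  }))
  .then(() => results);                    // fulfills with [3, 5, 7]
```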
3.2.4.5. pipeTo(dest, { preventClose, preventAbort, preventCancel } = {})
The pipeTo method pipes this readable stream to a given writable stream. The way in which the piping process behaves under various error conditions can be customized with a number of passed options. It returns a promise that fulfills when the piping process completes successfully, or rejects if any errors were encountered.
Piping a stream will lock it for the duration of the pipe, preventing any other consumer from acquiring a reader.
Errors and closures of the source and destination streams propagate as follows:
- An error in the source readable stream will abort the destination writable stream, unless preventAbort is truthy. The returned promise will be rejected with the source’s error, or with any error that occurs during aborting the destination.
- An error in the destination writable stream will cancel the source readable stream, unless preventCancel is truthy. The returned promise will be rejected with the destination’s error, or with any error that occurs during canceling the source.
- When the source readable stream closes, the destination writable stream will be closed, unless preventClose is truthy. The returned promise will be fulfilled once this process completes, unless an error is encountered while closing the destination, in which case it will be rejected with that error.
- If the destination writable stream starts out closed or closing, the source readable stream will be canceled, unless preventCancel is truthy. The returned promise will be rejected with an error indicating piping to a closed stream failed, or with any error that occurs during canceling the source.
- If ! IsReadableStream(this) is false, return a promise rejected with a TypeError exception.
- If ! IsWritableStream(dest) is false, return a promise rejected with a TypeError exception.
- Set preventClose to ! ToBoolean(preventClose), set preventAbort to ! ToBoolean(preventAbort), and set preventCancel to ! ToBoolean(preventCancel).
- If ! IsReadableStreamLocked(this) is true, return a promise rejected with a TypeError exception.
- If ! IsWritableStreamLocked(dest) is true, return a promise rejected with a TypeError exception.
- If ! IsReadableByteStreamController(this.[[readableStreamController]]) is true, let reader be either ! AcquireReadableStreamBYOBReader(this) or ! AcquireReadableStreamDefaultReader(this), at the user agent’s discretion.
- Otherwise, let reader be ! AcquireReadableStreamDefaultReader(this).
- Let writer be ! AcquireWritableStreamDefaultWriter(dest).
- Let shuttingDown be false.
- Let promise be a new promise.
- In parallel, using reader and writer, read all chunks from this and write them to dest. Due to the locking provided by the reader and writer, the exact manner in which this happens is not observable to author code, and so there is flexibility in how this is done. The following constraints apply regardless of the exact algorithm used:
  - Public API must not be used: while reading or writing, or performing any of the operations below, the JavaScript-modifiable reader, writer, and stream APIs (i.e. methods on the appropriate prototypes) must not be used. Instead, the streams must be manipulated directly.
  - Backpressure must be enforced:
    - While WritableStreamDefaultWriterGetDesiredSize(writer) is ≤ 0 or is null, the user agent must not read from reader.
    - If reader is a BYOB reader, WritableStreamDefaultWriterGetDesiredSize(writer) should be used to determine the size of the chunks read from reader.
    - Otherwise, WritableStreamDefaultWriterGetDesiredSize(writer) may be used to determine the flow rate heuristically, e.g. by delaying reads while it is judged to be "low" compared to the size of chunks that have been typically read.
  - Shutdown must stop all activity: if shuttingDown becomes true, the user agent must not initiate further reads from reader or writes to writer. (Ongoing reads and writes may finish.) In particular, the user agent must check the below conditions on this.[[state]] and dest.[[state]] before performing any reads or writes, since they might lead to immediate shutdown.
  - Errors must be propagated forward: if this.[[state]] is or becomes "errored", then
    - If preventAbort is false, shutdown with an action of ! WritableStreamAbort(dest, this.[[storedError]]) and with this.[[storedError]].
    - Otherwise, shutdown with this.[[storedError]].
  - Errors must be propagated backward: if dest.[[state]] is or becomes "errored", then
    - If preventCancel is false, shutdown with an action of ! ReadableStreamCancel(this, dest.[[storedError]]) and with dest.[[storedError]].
    - Otherwise, shutdown with dest.[[storedError]].
  - Closing must be propagated forward: if this.[[state]] is or becomes "closed", then
    - If preventClose is false, shutdown with an action of ! WritableStreamDefaultWriterCloseWithErrorPropagation(writer).
    - Otherwise, shutdown.
  - Closing must be propagated backward: if dest.[[state]] is "closing" or "closed", then
    - Let destClosed be a new TypeError.
    - If preventCancel is false, shutdown with an action of ! ReadableStreamCancel(this, destClosed) and with destClosed.
    - Otherwise, shutdown with destClosed.
  - Shutdown with an action: if any of the above requirements ask to shutdown with an action action, optionally with an error originalError, then:
    - If shuttingDown is true, abort these substeps.
    - Set shuttingDown to true.
    - Wait until any ongoing write finishes (i.e. the corresponding promises settle).
    - Let p be the result of performing action.
    - Upon fulfillment of p, finalize, passing along originalError if it was given.
    - Upon rejection of p with reason newError, finalize with newError.
  - Shutdown: if any of the above requirements or steps ask to shutdown, optionally with an error error, then:
    - If shuttingDown is true, abort these substeps.
    - Set shuttingDown to true.
    - Wait until any ongoing write finishes (i.e. the corresponding promises settle).
    - Finalize, passing along error if it was given.
  - Finalize: both forms of shutdown will eventually ask to finalize, optionally with an error error, which means to perform the following steps:
    - Perform ! WritableStreamDefaultWriterRelease(writer).
    - Perform ! ReadableStreamReaderGenericRelease(reader).
    - If error was given, reject promise with error.
    - Otherwise, resolve promise with undefined.
- Return promise.
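Two of the propagation rules for pipeTo can be illustrated with a short sketch: preventClose keeping the destination open so that a second source can be appended, and a source error aborting the destination. The `streamOf` helper and the log strings are hypothetical and exist only for this example.

```javascript
// Hypothetical helper: a readable stream that yields the given chunks, then closes.
function streamOf(...chunks) {
  return new ReadableStream({
    start(controller) {
      chunks.forEach(c => controller.enqueue(c));
      controller.close();
    }
  });
}

async function concatenate() {
  const written = [];
  const dest = new WritableStream({
    write(chunk) { written.push(chunk); },
    close() { written.push("[closed]"); }
  });
  // With preventClose, dest stays open (and is unlocked) after the source closes...
  await streamOf("a", "b").pipeTo(dest, { preventClose: true });
  // ...so a second source can be piped into it; this time dest is closed.
  await streamOf("c").pipeTo(dest);
  return written;
}

async function forwardError() {
  const log = [];
  const failing = new ReadableStream({
    start(controller) { controller.error(new Error("source failed")); }
  });
  const dest = new WritableStream({
    abort(reason) { log.push("aborted: " + reason.message); }
  });
  // The source's error aborts dest and rejects the returned promise.
  await failing.pipeTo(dest).catch(e => log.push("rejected: " + e.message));
  return log;
}
```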
3.2.4.6. tee()
The tee method tees this readable stream, returning a two-element array containing the two resulting branches as new ReadableStream instances.
Teeing a stream will lock it, preventing any other consumer from acquiring a reader. To cancel the stream, cancel both of the resulting branches; a composite cancellation reason will then be propagated to the stream’s underlying source.
Note that the chunks seen in each branch will be the same object. If the chunks are not immutable, this could
allow interference between the two branches. (Let us know if you think we should add an option to tee that creates structured
clones of the chunks for each branch.)
- If ! IsReadableStream(this) is false, throw a TypeError exception.
- Let branches be ? ReadableStreamTee(this, false).
- Return ! CreateArrayFromList(branches).
For example, given a writable stream cacheEntry representing an on-disk file, and another writable stream httpRequestBody representing an upload to a remote server, you could pipe the same readable stream to both destinations at once:
const [forLocal, forRemote] = readableStream.tee();
Promise.all([
  forLocal.pipeTo(cacheEntry),
  forRemote.pipeTo(httpRequestBody)
])
  .then(() => console.log("Saved the stream to the cache and also uploaded it!"))
  .catch(e => console.error("Either caching or uploading failed: ", e));
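The note about both branches seeing the same chunk object can be checked with a small sketch. The chunk shape (`id`, `tags`) is invented for the example.

```javascript
const original = new ReadableStream({
  start(controller) {
    controller.enqueue({ id: 1, tags: [] });
    controller.close();
  }
});

const [branchA, branchB] = original.tee();

async function compareBranches() {
  const { value: fromA } = await branchA.getReader().read();
  fromA.tags.push("seen by A");            // mutate the chunk via branch A
  const { value: fromB } = await branchB.getReader().read();
  return {
    sameObject: fromA === fromB,           // both branches got the same object
    tagsSeenByB: fromB.tags                // so branch B observes A's mutation
  };
}
```

If the two consumers must not affect each other, one option is to copy each chunk in the consumer before modifying it.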
3.3. General Readable Stream Abstract Operations
The following abstract operations, unlike most in this specification, are meant to be generally useful by other specifications, instead of just being part of the implementation of this spec’s classes.
3.3.1. AcquireReadableStreamBYOBReader ( stream ) throws
This abstract operation is meant to be called from other specifications that may wish to acquire a BYOB reader for a given stream.
- Return ? Construct(ReadableStreamBYOBReader, « stream »).
3.3.2. AcquireReadableStreamDefaultReader ( stream ) throws
This abstract operation is meant to be called from other specifications that may wish to acquire a default reader for a given stream.
- Return ? Construct(ReadableStreamDefaultReader, « stream »).
3.3.3. IsReadableStream ( x ) nothrow
- If Type(x) is not Object, return false.
- If x does not have a [[readableStreamController]] internal slot, return false.
- Return true.
3.3.4. IsReadableStreamDisturbed ( stream ) nothrow
This abstract operation is meant to be called from other specifications that may wish to query whether or not a readable stream has ever been read from or canceled.
- Assert: ! IsReadableStream(stream) is true.
- Return stream.[[disturbed]].
3.3.5. IsReadableStreamLocked ( stream ) nothrow
This abstract operation is meant to be called from other specifications that may wish to query whether or not a readable stream is locked to a reader.
- Assert: ! IsReadableStream(stream) is true.
- If stream.[[reader]] is undefined, return false.
- Return true.
3.3.6. ReadableStreamTee ( stream, cloneForBranch2 ) throws
This abstract operation is meant to be called from other specifications that may wish to tee a given readable stream.
The second argument, cloneForBranch2, governs whether or not the data from the original stream will be structured cloned before appearing in the second of the returned branches. This is useful for scenarios where both branches are to be consumed in such a way that they might otherwise interfere with each other, such as by transferring their chunks. However, it does introduce a noticeable asymmetry between the two branches. [HTML]
- Assert: ! IsReadableStream(stream) is true.
- Assert: Type(cloneForBranch2) is Boolean.
- Let reader be ? AcquireReadableStreamDefaultReader(stream).
- Let teeState be Record {[[closedOrErrored]]: false, [[canceled1]]: false, [[canceled2]]: false, [[reason1]]: undefined, [[reason2]]: undefined, [[promise]]: a new promise}.
- Let pull be a new ReadableStreamTee pull function.
- Set pull.[[reader]] to reader, pull.[[teeState]] to teeState, and pull.[[cloneForBranch2]] to cloneForBranch2.
- Let cancel1 be a new ReadableStreamTee branch 1 cancel function.
- Set cancel1.[[stream]] to stream and cancel1.[[teeState]] to teeState.
- Let cancel2 be a new ReadableStreamTee branch 2 cancel function.
- Set cancel2.[[stream]] to stream and cancel2.[[teeState]] to teeState.
- Let underlyingSource1 be ! ObjectCreate(%ObjectPrototype%).
- Perform ! CreateDataProperty(underlyingSource1, "pull", pull).
- Perform ! CreateDataProperty(underlyingSource1, "cancel", cancel1).
- Let branch1Stream be ! Construct(ReadableStream, underlyingSource1).
- Let underlyingSource2 be ! ObjectCreate(%ObjectPrototype%).
- Perform ! CreateDataProperty(underlyingSource2, "pull", pull).
- Perform ! CreateDataProperty(underlyingSource2, "cancel", cancel2).
- Let branch2Stream be ! Construct(ReadableStream, underlyingSource2).
- Set pull.[[branch1]] to branch1Stream.[[readableStreamController]].
- Set pull.[[branch2]] to branch2Stream.[[readableStreamController]].
- Upon rejection of reader.[[closedPromise]] with reason r,
  - If teeState.[[closedOrErrored]] is false, then:
    - Perform ! ReadableStreamDefaultControllerError(pull.[[branch1]], r).
    - Perform ! ReadableStreamDefaultControllerError(pull.[[branch2]], r).
    - Set teeState.[[closedOrErrored]] to true.
- Return « branch1Stream, branch2Stream ».
A ReadableStreamTee pull function is an anonymous built-in function that pulls data from a given readable
stream reader and enqueues it into two other streams ("branches" of the associated tee). Each
ReadableStreamTee pull function has [[reader]], [[branch1]], [[branch2]], [[teeState]], and [[cloneForBranch2]] internal slots. When a ReadableStreamTee pull function F is called, it performs the following steps:
- Let reader be F.[[reader]], branch1 be F.[[branch1]], branch2 be F.[[branch2]], teeState be F.[[teeState]], and cloneForBranch2 be F.[[cloneForBranch2]].
- Return the result of transforming ! ReadableStreamDefaultReaderRead(reader) by a fulfillment handler which takes the argument result and performs the following steps:
  - Assert: Type(result) is Object.
  - Let value be ? Get(result, "value").
  - Let done be ? Get(result, "done").
  - Assert: Type(done) is Boolean.
  - If done is true and teeState.[[closedOrErrored]] is false,
    - If teeState.[[canceled1]] is false,
      - Perform ! ReadableStreamDefaultControllerClose(branch1).
    - If teeState.[[canceled2]] is false,
      - Perform ! ReadableStreamDefaultControllerClose(branch2).
    - Set teeState.[[closedOrErrored]] to true.
  - If teeState.[[closedOrErrored]] is true, return.
  - Let value1 and value2 be value.
  - If teeState.[[canceled2]] is false and cloneForBranch2 is true, set value2 to ? StructuredClone(value2).
  - If teeState.[[canceled1]] is false, perform ? ReadableStreamDefaultControllerEnqueue(branch1, value1).
  - If teeState.[[canceled2]] is false, perform ? ReadableStreamDefaultControllerEnqueue(branch2, value2).
A ReadableStreamTee branch 1 cancel function is an anonymous built-in function that reacts to the
cancellation of the first of the two branches of the associated tee. Each
ReadableStreamTee branch 1 cancel function has [[stream]] and [[teeState]] internal slots. When a ReadableStreamTee branch 1 cancel function F is called with argument reason, it performs the following steps:
- Let stream be F.[[stream]] and teeState be F.[[teeState]].
- Set teeState.[[canceled1]] to true.
- Set teeState.[[reason1]] to reason.
- If teeState.[[canceled2]] is true,
  - Let compositeReason be ! CreateArrayFromList(« teeState.[[reason1]], teeState.[[reason2]] »).
  - Let cancelResult be ! ReadableStreamCancel(stream, compositeReason).
  - Resolve teeState.[[promise]] with cancelResult.
- Return teeState.[[promise]].
A ReadableStreamTee branch 2 cancel function is an anonymous built-in function that reacts to the
cancellation of the second of the two branches of the associated tee. Each
ReadableStreamTee branch 2 cancel function has [[stream]] and [[teeState]] internal slots. When a ReadableStreamTee branch 2 cancel function F is called with argument reason, it performs the following steps:
- Let stream be F.[[stream]] and teeState be F.[[teeState]].
- Set teeState.[[canceled2]] to true.
- Set teeState.[[reason2]] to reason.
- If teeState.[[canceled1]] is true,
  - Let compositeReason be ! CreateArrayFromList(« teeState.[[reason1]], teeState.[[reason2]] »).
  - Let cancelResult be ! ReadableStreamCancel(stream, compositeReason).
  - Resolve teeState.[[promise]] with cancelResult.
- Return teeState.[[promise]].
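This example is non-normative. As a rough sketch of how the tee machinery above surfaces to developers, the following assumes an environment with a global ReadableStream (a current browser or a recent Node.js release) and goes through the public tee() method. The function name teeDemo, the chunk values, and the cancel reasons are made up for illustration.

```javascript
// Sketch only: observes the tee pull and cancel functions indirectly,
// through the public tee() method. All names and values are illustrative.
async function teeDemo() {
  let sourceCancelReason;
  const source = new ReadableStream({
    start(controller) {
      controller.enqueue('a');
      controller.enqueue('b');
    },
    cancel(reason) {
      // Reached only once BOTH branches have been canceled.
      sourceCancelReason = reason;
    }
  });

  const [branch1, branch2] = source.tee();
  const reader1 = branch1.getReader();
  const reader2 = branch2.getReader();

  // Every chunk read from the original stream is enqueued into both branches.
  const first1 = await reader1.read();
  const first2 = await reader2.read();

  // Canceling one branch only records its reason in the shared tee state...
  const cancel1 = reader1.cancel('reason one');
  const canceledAfterOne = sourceCancelReason !== undefined;

  // ...canceling the other branch cancels the original stream with an
  // array holding both reasons.
  const cancel2 = reader2.cancel('reason two');
  await Promise.all([cancel1, cancel2]);

  return { first1, first2, canceledAfterOne, sourceCancelReason };
}
```

Both cancel promises are the same shared promise from the tee state, which is why neither settles until the second branch is canceled.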
3.4. Readable Stream Abstract Operations Used by Controllers
In terms of specification factoring, the way that the ReadableStream class encapsulates the behavior of
both simple readable streams and readable byte streams into a single class is by centralizing most of the
potentially-varying logic inside the two controller classes, ReadableStreamDefaultController and ReadableByteStreamController. Those classes define most of the stateful internal slots and abstract
operations for how a stream’s internal queue is managed and how it interfaces with its underlying source or underlying byte source.
The abstract operations in this section are interfaces that are used by the controller implementations to affect their
associated ReadableStream object, translating those internal state changes into developer-facing results
visible through the ReadableStream's public API.
3.4.1. ReadableStreamAddReadIntoRequest ( stream ) nothrow
- Assert: ! IsReadableStreamBYOBReader(stream.[[reader]]) is true.
- Assert: stream.[[state]] is "readable" or "closed".
- Let promise be a new promise.
- Let readIntoRequest be Record {[[promise]]: promise}.
- Append readIntoRequest as the last element of stream.[[reader]].[[readIntoRequests]].
- Return promise.
3.4.2. ReadableStreamAddReadRequest ( stream ) nothrow
- Assert: ! IsReadableStreamDefaultReader(stream.[[reader]]) is true.
- Assert: stream.[[state]] is "readable".
- Let promise be a new promise.
- Let readRequest be Record {[[promise]]: promise}.
- Append readRequest as the last element of stream.[[reader]].[[readRequests]].
- Return promise.
3.4.3. ReadableStreamCancel ( stream, reason ) nothrow
- Set stream.[[disturbed]] to true.
- If stream.[[state]] is "closed", return a promise resolved with undefined.
- If stream.[[state]] is "errored", return a promise rejected with stream.[[storedError]].
- Perform ! ReadableStreamClose(stream).
- Let sourceCancelPromise be ! stream.[[readableStreamController]].[[Cancel]](reason).
- Return the result of transforming sourceCancelPromise by a fulfillment handler that returns undefined.
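This example is non-normative. A minimal sketch of the observable effect of the steps above, assuming a global ReadableStream is available: the stream is closed, the reason is forwarded to the underlying source, and whatever the source's cancel method returns is replaced by undefined. The name cancelDemo and the string values are illustrative only.

```javascript
// Sketch only: names and values are made up for illustration.
async function cancelDemo() {
  let seenReason;
  const stream = new ReadableStream({
    cancel(reason) {
      seenReason = reason;
      return 'ignored'; // the fulfillment value is not exposed to the caller
    }
  });

  // The returned promise fulfills with undefined, not with 'ignored'.
  const result = await stream.cancel('no longer needed');

  // Canceling closed the stream, so a later read reports done.
  const readAfter = await stream.getReader().read();

  return { seenReason, result, readAfter };
}
```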
3.4.4. ReadableStreamClose ( stream ) nothrow
- Assert: stream.[[state]] is "readable".
- Set stream.[[state]] to "closed".
- Let reader be stream.[[reader]].
- If reader is undefined, return.
- If ! IsReadableStreamDefaultReader(reader) is true,
  - Repeat for each readRequest that is an element of reader.[[readRequests]],
    - Resolve readRequest.[[promise]] with ! CreateIterResultObject(undefined, true).
  - Set reader.[[readRequests]] to an empty List.
- Resolve reader.[[closedPromise]] with undefined.
The case where stream.[[state]] is "closed", but stream.[[closeRequested]] is false, will happen if the stream was closed without its controller’s close method ever being called: i.e., if the stream was closed by a call to cancel(reason). In this case we allow the
controller’s close method to be called and silently do nothing, since the cancelation was outside the
control of the underlying source.
3.4.5. ReadableStreamError ( stream, e ) nothrow
- Assert: ! IsReadableStream(stream) is true.
- Assert: stream.[[state]] is "readable".
- Set stream.[[state]] to "errored".
- Set stream.[[storedError]] to e.
- Let reader be stream.[[reader]].
- If reader is undefined, return.
- If ! IsReadableStreamDefaultReader(reader) is true,
  - Repeat for each readRequest that is an element of reader.[[readRequests]],
    - Reject readRequest.[[promise]] with e.
  - Set reader.[[readRequests]] to a new empty List.
- Otherwise,
  - Assert: ! IsReadableStreamBYOBReader(reader).
  - Repeat for each readIntoRequest that is an element of reader.[[readIntoRequests]],
    - Reject readIntoRequest.[[promise]] with e.
  - Set reader.[[readIntoRequests]] to a new empty List.
- Reject reader.[[closedPromise]] with e.
- Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
3.4.6. ReadableStreamFulfillReadIntoRequest ( stream, chunk, done ) nothrow
- Let reader be stream.[[reader]].
- Let readIntoRequest be the first element of reader.[[readIntoRequests]].
- Remove readIntoRequest from reader.[[readIntoRequests]], shifting all other elements downward (so that the second becomes the first, and so on).
- Resolve readIntoRequest.[[promise]] with ! CreateIterResultObject(chunk, done).
3.4.7. ReadableStreamFulfillReadRequest ( stream, chunk, done ) nothrow
- Let reader be stream.[[reader]].
- Let readRequest be the first element of reader.[[readRequests]].
- Remove readRequest from reader.[[readRequests]], shifting all other elements downward (so that the second becomes the first, and so on).
- Resolve readRequest.[[promise]] with ! CreateIterResultObject(chunk, done).
3.4.8. ReadableStreamGetNumReadIntoRequests ( stream ) nothrow
- Return the number of elements in stream.[[reader]].[[readIntoRequests]].
3.4.9. ReadableStreamGetNumReadRequests ( stream ) nothrow
- Return the number of elements in stream.[[reader]].[[readRequests]].
3.4.10. ReadableStreamHasBYOBReader ( stream ) nothrow
- Let reader be stream.[[reader]].
- If reader is undefined, return false.
- If ! IsReadableStreamBYOBReader(reader) is false, return false.
- Return true.
3.4.11. ReadableStreamHasDefaultReader ( stream ) nothrow
- Let reader be stream.[[reader]].
- If reader is undefined, return false.
- If ! IsReadableStreamDefaultReader(reader) is false, return false.
- Return true.
3.5. Class ReadableStreamDefaultReader
The ReadableStreamDefaultReader class represents a default reader designed to be vended by a ReadableStream instance.
3.5.1. Class Definition
This section is non-normative.
If one were to write the ReadableStreamDefaultReader class in something close to the syntax of [ECMASCRIPT], it
would look like
class ReadableStreamDefaultReader {
constructor(stream)
get closed()
cancel(reason)
read()
releaseLock()
}
3.5.2. Internal Slots
Instances of ReadableStreamDefaultReader are created with the internal slots described in the following table:
| Internal Slot | Description (non-normative) |
|---|---|
| [[closedPromise]] | A promise returned by the reader’s closed getter |
| [[ownerReadableStream]] | A ReadableStream instance that owns this reader |
| [[readRequests]] | A List of promises returned by calls to the reader’s read() method that have not yet been resolved, due to the consumer requesting chunks sooner than they are available; also used for the IsReadableStreamDefaultReader brand check |
3.5.3. new ReadableStreamDefaultReader(stream)
The ReadableStreamDefaultReader constructor is generally not meant to be used directly; instead, a
stream’s getReader() method should be used.
- If ! IsReadableStream(stream) is false, throw a TypeError exception.
- If ! IsReadableStreamLocked(stream) is true, throw a TypeError exception.
- Perform ! ReadableStreamReaderGenericInitialize(this, stream).
- Set this.[[readRequests]] to a new empty List.
3.5.4. Properties of the ReadableStreamDefaultReader Prototype
3.5.4.1. get closed
The closed getter returns a promise that will be fulfilled when the stream becomes closed or the
reader’s lock is released, or rejected if the stream ever errors.
- If ! IsReadableStreamDefaultReader(this) is false, return a promise rejected with a TypeError exception.
- Return this.[[closedPromise]].
3.5.4.2. cancel(reason)
The cancel method behaves the same as that for the
associated stream.
- If ! IsReadableStreamDefaultReader(this) is false, return a promise rejected with a TypeError exception.
- If this.[[ownerReadableStream]] is undefined, return a promise rejected with a TypeError exception.
- Return ! ReadableStreamReaderGenericCancel(this, reason).
3.5.4.3. read()
The read method will return a promise that allows access to the next chunk from the stream’s
internal queue, if available.
- If the chunk does become available, the promise will be fulfilled with an object of the form { value: theChunk, done: false }.
- If the stream becomes closed, the promise will be fulfilled with an object of the form { value: undefined, done: true }.
- If the stream becomes errored, the promise will be rejected with the relevant error.
If reading a chunk causes the queue to become empty, more data will be pulled from the underlying source.
- If ! IsReadableStreamDefaultReader(this) is false, return a promise rejected with a TypeError exception.
- If this.[[ownerReadableStream]] is undefined, return a promise rejected with a TypeError exception.
- Return ! ReadableStreamDefaultReaderRead(this).
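This example is non-normative. The three outcomes of read() listed above lead to a common consumption loop, sketched below under the assumption that a global ReadableStream is available. The helper name readAll and the sample stream letters are hypothetical and not part of this standard.

```javascript
// Sketch only: readAll and letters are illustrative names.
async function readAll(stream) {
  const reader = stream.getReader();
  const chunks = [];
  while (true) {
    // Rejects (and so throws here) if the stream becomes errored.
    const { value, done } = await reader.read();
    if (done) break;      // { value: undefined, done: true }
    chunks.push(value);   // { value: theChunk, done: false }
  }
  reader.releaseLock();   // no read is pending, so releasing is allowed
  return chunks;
}

const letters = new ReadableStream({
  start(controller) {
    controller.enqueue('x');
    controller.enqueue('y');
    controller.close();
  }
});
```

Calling readAll(letters) would collect the two enqueued chunks in order before the closed stream reports done.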
3.5.4.4. releaseLock()
The releaseLock method releases the reader’s lock on the corresponding
stream. After the lock is released, the reader is no longer active. If the associated
stream is errored when the lock is released, the reader will appear errored in the same way from now on; otherwise,
the reader will appear closed.
A reader’s lock cannot be released while it still has a pending read request, i.e., if a promise returned by the
reader’s read() method has not yet been settled. Attempting to do so will throw
a TypeError and leave the reader locked to the stream.
- If ! IsReadableStreamDefaultReader(this) is false, throw a TypeError exception.
- If this.[[ownerReadableStream]] is undefined, return.
- If this.[[readRequests]] is not empty, throw a TypeError exception.
- Perform ! ReadableStreamReaderGenericRelease(this).
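This example is non-normative. A small sketch of releasing a lock when no read is pending, assuming a global ReadableStream; the function name releaseLockDemo is illustrative. It only exercises behavior stated above: the released reader is no longer active, and the stream can be locked again.

```javascript
// Sketch only: releaseLockDemo is an illustrative name.
async function releaseLockDemo() {
  const stream = new ReadableStream();   // never enqueues, closes, or errors
  const reader = stream.getReader();
  const lockedBefore = stream.locked;

  reader.releaseLock();                  // no pending reads, so this succeeds
  const lockedAfter = stream.locked;

  // The released reader has no owner stream any more, so read() rejects.
  const readError = await reader.read().catch((e) => e);

  // A fresh reader can now be acquired for the same stream.
  const secondReader = stream.getReader();
  const relocked = stream.locked && secondReader !== reader;

  return { lockedBefore, lockedAfter, readError, relocked };
}
```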
3.6. Class ReadableStreamBYOBReader
The ReadableStreamBYOBReader class represents a BYOB reader designed to be vended by a ReadableStream instance.
3.6.1. Class Definition
This section is non-normative.
If one were to write the ReadableStreamBYOBReader class in something close to the syntax of [ECMASCRIPT], it
would look like
class ReadableStreamBYOBReader {
constructor(stream)
get closed()
cancel(reason)
read(view)
releaseLock()
}
3.6.2. Internal Slots
Instances of ReadableStreamBYOBReader are created with the internal slots described in the following table:
| Internal Slot | Description (non-normative) |
|---|---|
| [[closedPromise]] | A promise returned by the reader’s closed getter |
| [[ownerReadableStream]] | A ReadableStream instance that owns this reader |
| [[readIntoRequests]] | A List of promises returned by calls to the reader’s read(view) method that have not yet been resolved, due to the consumer requesting chunks sooner than they are available; also used for the IsReadableStreamBYOBReader brand check |
3.6.3. new ReadableStreamBYOBReader(stream)
The ReadableStreamBYOBReader constructor is generally not meant to be used directly; instead, a stream’s getReader() method should be used.
- If ! IsReadableStream(stream) is false, throw a TypeError exception.
- If ! IsReadableByteStreamController(stream.[[readableStreamController]]) is false, throw a TypeError exception.
- If ! IsReadableStreamLocked(stream) is true, throw a TypeError exception.
- Perform ! ReadableStreamReaderGenericInitialize(this, stream).
- Set this.[[readIntoRequests]] to a new empty List.
3.6.4. Properties of the ReadableStreamBYOBReader Prototype
3.6.4.1. get closed
The closed getter returns a promise that will be fulfilled when the stream becomes closed or the
reader’s lock is released, or rejected if the stream ever errors.
- If ! IsReadableStreamBYOBReader(this) is false, return a promise rejected with a TypeError exception.
- Return this.[[closedPromise]].
3.6.4.2. cancel(reason)
The cancel method behaves the same as that for the
associated stream.
- If ! IsReadableStreamBYOBReader(this) is false, return a promise rejected with a TypeError exception.
- If this.[[ownerReadableStream]] is undefined, return a promise rejected with a TypeError exception.
- Return ! ReadableStreamReaderGenericCancel(this, reason).
3.6.4.3. read(view)
The read method will write read bytes into view and return a promise resolved with a
possibly transferred buffer as described below.
- If the chunk does become available, the promise will be fulfilled with an object of the form { value: theChunk, done: false }.
- If the stream becomes closed, the promise will be fulfilled with an object of the form { value: undefined, done: true }.
- If the stream becomes errored, the promise will be rejected with the relevant error.
If reading a chunk causes the queue to become empty, more data will be pulled from the underlying byte source.
- If ! IsReadableStreamBYOBReader(this) is false, return a promise rejected with a TypeError exception.
- If this.[[ownerReadableStream]] is undefined, return a promise rejected with a TypeError exception.
- If Type(view) is not Object, return a promise rejected with a TypeError exception.
- If view does not have a [[ViewedArrayBuffer]] internal slot, return a promise rejected with a TypeError exception.
- If view.[[ByteLength]] is 0, return a promise rejected with a TypeError exception.
- Return ! ReadableStreamBYOBReaderRead(this, view).
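This example is non-normative. The sketch below shows a BYOB read against a readable byte stream, assuming an environment whose ReadableStream supports type: 'bytes' and getReader({ mode: 'byob' }). The function name byobDemo, the byte values, and the view sizes are illustrative.

```javascript
// Sketch only: byobDemo and the sample bytes are illustrative.
async function byobDemo() {
  const stream = new ReadableStream({
    type: 'bytes',
    start(controller) {
      controller.enqueue(new Uint8Array([1, 2, 3]));
      controller.close();
    }
  });
  const reader = stream.getReader({ mode: 'byob' });

  // A zero-length view is rejected before any bytes are read.
  let emptyViewRejected = false;
  await reader.read(new Uint8Array(0)).catch(() => { emptyViewRejected = true; });

  // The caller supplies the buffer; the result is a view over the
  // (possibly transferred) buffer covering only the bytes written.
  const first = await reader.read(new Uint8Array(8));

  // The queue is drained and close was requested, so the next read is done.
  const second = await reader.read(new Uint8Array(8));

  return { emptyViewRejected, bytes: Array.from(first.value), firstDone: first.done, secondDone: second.done };
}
```

Because the buffer can be transferred, callers would normally keep using the view returned in value rather than the view they passed in.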
3.6.4.4. releaseLock()
The releaseLock method releases the reader’s lock on the corresponding
stream. After the lock is released, the reader is no longer active. If the associated
stream is errored when the lock is released, the reader will appear errored in the same way from now on; otherwise,
the reader will appear closed.
A reader’s lock cannot be released while it still has a pending read request, i.e., if a promise returned by the
reader’s read() method has not yet been settled. Attempting to do so will throw
a TypeError and leave the reader locked to the stream.
- If ! IsReadableStreamBYOBReader(this) is false, throw a TypeError exception.
- If this.[[ownerReadableStream]] is undefined, return.
- If this.[[readIntoRequests]] is not empty, throw a TypeError exception.
- Perform ! ReadableStreamReaderGenericRelease(this).
3.7. Readable Stream Reader Abstract Operations
3.7.1. IsReadableStreamDefaultReader ( x ) nothrow
- If Type(x) is not Object, return false.
- If x does not have a [[readRequests]] internal slot, return false.
- Return true.
3.7.2. IsReadableStreamBYOBReader ( x ) nothrow
- If Type(x) is not Object, return false.
- If x does not have a [[readIntoRequests]] internal slot, return false.
- Return true.
3.7.3. ReadableStreamReaderGenericCancel ( reader, reason ) nothrow
- Let stream be reader.[[ownerReadableStream]].
- Assert: stream is not undefined.
- Return ! ReadableStreamCancel(stream, reason).
3.7.4. ReadableStreamReaderGenericInitialize ( reader, stream ) nothrow
- Set reader.[[ownerReadableStream]] to stream.
- Set stream.[[reader]] to reader.
- If stream.[[state]] is "readable",
  - Set reader.[[closedPromise]] to a new promise.
- Otherwise, if stream.[[state]] is "closed",
  - Set reader.[[closedPromise]] to a promise resolved with undefined.
- Otherwise,
  - Assert: stream.[[state]] is "errored".
  - Set reader.[[closedPromise]] to a promise rejected with stream.[[storedError]].
  - Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
3.7.5. ReadableStreamReaderGenericRelease ( reader ) nothrow
- Assert: reader.[[ownerReadableStream]] is not undefined.
- Assert: reader.[[ownerReadableStream]].[[reader]] is reader.
- If reader.[[ownerReadableStream]].[[state]] is "readable", reject reader.[[closedPromise]] with a TypeError exception.
- Otherwise, set reader.[[closedPromise]] to a promise rejected with a TypeError exception.
- Set reader.[[closedPromise]].[[PromiseIsHandled]] to true.
- Set reader.[[ownerReadableStream]].[[reader]] to undefined.
- Set reader.[[ownerReadableStream]] to undefined.
3.7.6. ReadableStreamBYOBReaderRead ( reader, view ) nothrow
- Let stream be reader.[[ownerReadableStream]].
- Assert: stream is not undefined.
- Set stream.[[disturbed]] to true.
- If stream.[[state]] is "errored", return a promise rejected with stream.[[storedError]].
- Return ! ReadableByteStreamControllerPullInto(stream.[[readableStreamController]], view).
3.7.7. ReadableStreamDefaultReaderRead ( reader ) nothrow
- Let stream be reader.[[ownerReadableStream]].
- Assert: stream is not undefined.
- Set stream.[[disturbed]] to true.
- If stream.[[state]] is "closed", return a promise resolved with ! CreateIterResultObject(undefined, true).
- If stream.[[state]] is "errored", return a promise rejected with stream.[[storedError]].
- Assert: stream.[[state]] is "readable".
- Return ! stream.[[readableStreamController]].[[Pull]]().
3.8. Class ReadableStreamDefaultController
The ReadableStreamDefaultController class has methods that allow control of a ReadableStream's state and internal queue. When constructing a ReadableStream that is not a readable byte stream, the underlying source is given a corresponding ReadableStreamDefaultController instance to manipulate.
3.8.1. Class Definition
This section is non-normative.
If one were to write the ReadableStreamDefaultController class in something close to the syntax of [ECMASCRIPT],
it would look like
class ReadableStreamDefaultController {
constructor(stream, underlyingSource, size, highWaterMark)
get desiredSize()
close()
enqueue(chunk)
error(e)
}
3.8.2. Internal Slots
Instances of ReadableStreamDefaultController are created with the internal slots described in the following table:
| Internal Slot | Description (non-normative) |
|---|---|
| [[closeRequested]] | A boolean flag indicating whether the stream has been closed by its underlying source, but still has chunks in its internal queue that have not yet been read |
| [[controlledReadableStream]] | The ReadableStream instance controlled |
| [[pullAgain]] | A boolean flag set to true if the stream’s mechanisms requested a call to the underlying source’s pull method to pull more data, but the pull could not yet be done since a previous call is still executing |
| [[pulling]] | A boolean flag set to true while the underlying source’s pull method is executing and has not yet fulfilled, used to prevent reentrant calls |
| [[queue]] | A List representing the stream’s internal queue of chunks |
| [[started]] | A boolean flag indicating whether the underlying source has finished starting |
| [[strategyHWM]] | A number supplied to the constructor as part of the stream’s queuing strategy, indicating the point at which the stream will apply backpressure to its underlying source |
| [[strategySize]] | A function supplied to the constructor as part of the stream’s queuing strategy, designed to calculate the size of enqueued chunks; can be undefined for the default behavior |
| [[underlyingSource]] | An object representation of the stream’s underlying source; also used for the IsReadableStreamDefaultController brand check |
3.8.3. new ReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark)
The ReadableStreamDefaultController constructor cannot be used directly; it only works on a ReadableStream that is in the middle of being constructed.
- If ! IsReadableStream(stream) is false, throw a TypeError exception.
- If stream.[[readableStreamController]] is not undefined, throw a TypeError exception.
- Set this.[[controlledReadableStream]] to stream.
- Set this.[[underlyingSource]] to underlyingSource.
- Set this.[[queue]] to a new empty List.
- Set this.[[started]], this.[[closeRequested]], this.[[pullAgain]], and this.[[pulling]] to false.
- Let normalizedStrategy be ? ValidateAndNormalizeQueuingStrategy(size, highWaterMark).
- Set this.[[strategySize]] to normalizedStrategy.[[size]] and this.[[strategyHWM]] to normalizedStrategy.[[highWaterMark]].
- Let controller be this.
- Let startResult be ? InvokeOrNoop(underlyingSource, "start", « this »).
- Let startPromise be a promise resolved with startResult:
  - Upon fulfillment of startPromise,
    - Set controller.[[started]] to true.
    - Assert: controller.[[pulling]] is false.
    - Assert: controller.[[pullAgain]] is false.
    - Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
  - Upon rejection of startPromise with reason r,
    - Perform ! ReadableStreamDefaultControllerErrorIfNeeded(controller, r).
3.8.4. Properties of the ReadableStreamDefaultController Prototype
3.8.4.1. get desiredSize
The desiredSize getter returns the desired size
to fill the controlled stream’s internal queue. It can be negative, if the queue is over-full. An underlying
source should use this information to determine when and how to apply backpressure.
- If ! IsReadableStreamDefaultController(this) is false, throw a TypeError exception.
- Return ! ReadableStreamDefaultControllerGetDesiredSize(this).
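This example is non-normative. As a sketch of how desiredSize moves as the queue fills, the following assumes a global ReadableStream and a high water mark of 2 with the default chunk size of 1; the variable names observed and counted are illustrative.

```javascript
// Sketch only: records desiredSize as chunks accumulate in the queue.
// No reader is attached, so every chunk stays in the internal queue.
const observed = [];
const counted = new ReadableStream({
  start(controller) {
    observed.push(controller.desiredSize); // empty queue: 2 - 0
    controller.enqueue('a');
    observed.push(controller.desiredSize); // 2 - 1
    controller.enqueue('b');
    observed.push(controller.desiredSize); // 2 - 2, queue is full
    controller.enqueue('c');
    observed.push(controller.desiredSize); // 2 - 3, queue is over-full
  }
}, { highWaterMark: 2 });
```

An underlying source that respects backpressure would stop producing once the value reaches zero or below, rather than continuing to enqueue as this sketch does.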
3.8.4.2. close()
The close method will close the controlled readable stream. Consumers will still be able to read
any previously-enqueued chunks from the stream, but once those are read, the stream will become closed.
- If ! IsReadableStreamDefaultController(this) is false, throw a TypeError exception.
- If this.[[closeRequested]] is true, throw a TypeError exception.
- If this.[[controlledReadableStream]].[[state]] is not "readable", throw a TypeError exception.
- Perform ! ReadableStreamDefaultControllerClose(this).
3.8.4.3. enqueue(chunk)
The enqueue method will enqueue a given chunk in the controlled readable stream.
- If ! IsReadableStreamDefaultController(this) is false, throw a TypeError exception.
- If this.[[closeRequested]] is true, throw a TypeError exception.
- If this.[[controlledReadableStream]].[[state]] is not "readable", throw a TypeError exception.
- Return ? ReadableStreamDefaultControllerEnqueue(this, chunk).
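This example is non-normative. The interaction between close() and enqueue() can be sketched as follows, assuming a global ReadableStream; the names closing, enqueueError, and drainClosing are illustrative. Chunks enqueued before close() remain readable, while enqueuing after close() throws.

```javascript
// Sketch only: names and chunk values are illustrative.
let enqueueError;
const closing = new ReadableStream({
  start(controller) {
    controller.enqueue('last');
    controller.close();               // close requested; 'last' is still queued
    try {
      controller.enqueue('too late'); // [[closeRequested]] is already true
    } catch (e) {
      enqueueError = e;
    }
  }
});

async function drainClosing() {
  const reader = closing.getReader();
  const first = await reader.read();  // the chunk enqueued before close()
  const second = await reader.read(); // queue drained, stream is now closed
  return { first, second };
}
```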
3.8.4.4. error(e)
The error method will error the readable stream, making all future interactions with it fail with the
given error e.
- If ! IsReadableStreamDefaultController(this) is false, throw a TypeError exception.
- Let stream be this.[[controlledReadableStream]].
- If stream.[[state]] is not "readable", throw a TypeError exception.
- Perform ! ReadableStreamDefaultControllerError(this, e).
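This example is non-normative. A minimal sketch, assuming a global ReadableStream, of what a consumer observes after error(e) is called: both read() and the closed promise reject with the same error object, and chunks that were still queued are not delivered. The name errorDemo and the values used are illustrative.

```javascript
// Sketch only: errorDemo and its values are illustrative.
async function errorDemo() {
  const boom = new Error('boom');
  const stream = new ReadableStream({
    start(controller) {
      controller.enqueue('dropped'); // discarded once the stream errors
      controller.error(boom);
    }
  });

  const reader = stream.getReader();
  const readResult = await reader.read().catch((e) => e);
  const closedResult = await reader.closed.catch((e) => e);

  return { boom, readResult, closedResult };
}
```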
3.8.5. Readable Stream Default Controller Internal Methods
The following are additional internal methods implemented by each ReadableStreamDefaultController instance.
They are similar to the supporting abstract operations in the following section, but are in method form to allow
polymorphic dispatch from the readable stream implementation to either these or their counterparts for BYOB controllers.
3.8.5.1. [[Cancel]](reason)
- Set this.[[queue]] to a new empty List.
- Return ! PromiseInvokeOrNoop(this.[[underlyingSource]], "cancel", « reason »).
3.8.5.2. [[Pull]]()
- Let stream be this.[[controlledReadableStream]].
- If this.[[queue]] is not empty,
  - Let chunk be ! DequeueValue(this.[[queue]]).
  - If this.[[closeRequested]] is true and this.[[queue]] is empty, perform ! ReadableStreamClose(stream).
  - Otherwise, perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this).
  - Return a promise resolved with ! CreateIterResultObject(chunk, false).
- Let pendingPromise be ! ReadableStreamAddReadRequest(stream).
- Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(this).
- Return pendingPromise.
3.9. Readable Stream Default Controller Abstract Operations
3.9.1. IsReadableStreamDefaultController ( x ) nothrow
- If Type(x) is not Object, return false.
- If x does not have an [[underlyingSource]] internal slot, return false.
- Return true.
3.9.2. ReadableStreamDefaultControllerCallPullIfNeeded ( controller ) nothrow
- Let shouldPull be ! ReadableStreamDefaultControllerShouldCallPull(controller).
- If shouldPull is false, return.
- If controller.[[pulling]] is true,
  - Set controller.[[pullAgain]] to true.
  - Return.
- Assert: controller.[[pullAgain]] is false.
- Set controller.[[pulling]] to true.
- Let pullPromise be ! PromiseInvokeOrNoop(controller.[[underlyingSource]], "pull", « controller »).
- Upon fulfillment of pullPromise,
  - Set controller.[[pulling]] to false.
  - If controller.[[pullAgain]] is true,
    - Set controller.[[pullAgain]] to false.
    - Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
- Upon rejection of pullPromise with reason e,
  - Perform ! ReadableStreamDefaultControllerErrorIfNeeded(controller, e).
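This example is non-normative. The [[pulling]] and [[pullAgain]] flags are not directly observable, but their effect can be sketched by counting calls to an underlying source's pull method, assuming a global ReadableStream and setTimeout. The name pullGuardDemo and the chunk text are illustrative.

```javascript
// Sketch only: shows that pull is not re-entered while a previous
// call's promise is still pending. All names are illustrative.
async function pullGuardDemo() {
  let pullCalls = 0;
  let finishFirstPull;
  const stream = new ReadableStream({
    pull(controller) {
      pullCalls++;
      if (pullCalls === 1) {
        // Keep the first pull "executing" until the demo says otherwise.
        return new Promise((resolve) => { finishFirstPull = resolve; });
      }
      controller.enqueue(`chunk from pull #${pullCalls}`);
    }
  });

  // Give the start promise a chance to fulfill so the first pull is issued.
  await new Promise((resolve) => setTimeout(resolve, 0));

  const reader = stream.getReader();
  const pendingRead = reader.read();   // wants data while a pull is in flight
  const callsWhileBusy = pullCalls;    // pull was not called a second time

  finishFirstPull();                   // first pull fulfills, pull runs again
  const { value } = await pendingRead;

  return { callsWhileBusy, value };
}
```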
3.9.3. ReadableStreamDefaultControllerShouldCallPull ( controller ) nothrow
- Let stream be controller.[[controlledReadableStream]].
- If stream.[[state]] is "closed" or stream.[[state]] is "errored", return false.
- If controller.[[closeRequested]] is true, return false.
- If controller.[[started]] is false, return false.
- If ! IsReadableStreamLocked(stream) is true and ! ReadableStreamGetNumReadRequests(stream) > 0, return true.
- Let desiredSize be ReadableStreamDefaultControllerGetDesiredSize(controller).
- If desiredSize > 0, return true.
- Return false.
3.9.4. ReadableStreamDefaultControllerClose ( controller ) nothrow
This abstract operation can be called by other specifications that wish to close a readable stream, in the same way a developer-created stream would be closed by its associated controller object. Specifications should not do this to streams they did not create, and must ensure they have obeyed the preconditions (listed here as asserts).
- Let stream be controller.[[controlledReadableStream]].
- Assert: controller.[[closeRequested]] is false.
- Assert: stream.[[state]] is "readable".
- Set controller.[[closeRequested]] to true.
- If controller.[[queue]] is empty, perform ! ReadableStreamClose(stream).
3.9.5. ReadableStreamDefaultControllerEnqueue ( controller, chunk ) throws
This abstract operation can be called by other specifications that wish to enqueue chunks in a readable stream, in the same way a developer would enqueue chunks using the stream’s associated controller object. Specifications should not do this to streams they did not create, and must ensure they have obeyed the preconditions (listed here as asserts).
- Let stream be controller.[[controlledReadableStream]].
- Assert: controller.[[closeRequested]] is false.
- Assert: stream.[[state]] is "readable".
- If ! IsReadableStreamLocked(stream) is true and ! ReadableStreamGetNumReadRequests(stream) > 0, perform ! ReadableStreamFulfillReadRequest(stream, chunk, false).
- Otherwise,
  - Let chunkSize be 1.
  - If controller.[[strategySize]] is not undefined,
    - Set chunkSize to Call(controller.[[strategySize]], undefined, « chunk »).
    - If chunkSize is an abrupt completion,
      - Perform ! ReadableStreamDefaultControllerErrorIfNeeded(controller, chunkSize.[[Value]]).
      - Return chunkSize.
    - Let chunkSize be chunkSize.[[Value]].
  - Let enqueueResult be ! EnqueueValueWithSize(controller.[[queue]], chunk, chunkSize).
  - If enqueueResult is an abrupt completion,
    - Perform ! ReadableStreamDefaultControllerErrorIfNeeded(controller, enqueueResult.[[Value]]).
    - Return enqueueResult.
- Perform ! ReadableStreamDefaultControllerCallPullIfNeeded(controller).
The case where stream.[[state]] is "closed", but stream.[[closeRequested]]
is false, will happen if the stream was closed without its controller’s close method ever being called: i.e., if the stream was closed by a call to cancel(reason). In this case we allow the
controller’s enqueue method to be called and silently do nothing, since the cancelation was outside the
control of the underlying source.
3.9.6. ReadableStreamDefaultControllerError ( controller, e ) nothrow
This abstract operation can be called by other specifications that wish to move a readable stream to an errored state, in the same way a developer would error a stream using its associated controller object. Specifications should not do this to streams they did not create, and must ensure they have obeyed the precondition (listed here as an assert).
- Let stream be controller.[[controlledReadableStream]].
- Assert: stream.[[state]] is "readable".
- Set controller.[[queue]] to a new empty List.
- Perform ! ReadableStreamError(stream, e).
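This example is non-normative. Besides an explicit error(e) call, this operation is also reached (through ReadableStreamDefaultControllerErrorIfNeeded) when a queuing strategy's size function throws during enqueue. The sketch below assumes a global ReadableStream; the name sizeFailureDemo and the error message are illustrative.

```javascript
// Sketch only: a size function that always throws, to show that the
// failure both propagates out of enqueue() and errors the stream.
async function sizeFailureDemo() {
  const sizeError = new Error('cannot size this chunk');
  let thrownByEnqueue;
  const stream = new ReadableStream({
    start(controller) {
      try {
        // No reader is waiting, so the chunk has to be sized and queued.
        controller.enqueue('anything');
      } catch (e) {
        thrownByEnqueue = e;
      }
    }
  }, {
    highWaterMark: 1,
    size() { throw sizeError; }
  });

  // The stream is now errored, so reads reject with the same error.
  const readFailure = await stream.getReader().read().catch((e) => e);
  return { sizeError, thrownByEnqueue, readFailure };
}
```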
3.9.7. ReadableStreamDefaultControllerErrorIfNeeded ( controller, e ) nothrow
- If controller.[[controlledReadableStream]].[[state]] is "readable", perform ! ReadableStreamDefaultControllerError(controller, e).
3.9.8. ReadableStreamDefaultControllerGetDesiredSize ( controller ) nothrow
This abstract operation can be called by other specifications that wish to determine the desired size to fill this stream’s internal queue, similar to how a developer would consult
the desiredSize property of the stream’s associated controller object.
Specifications should not use this on streams they did not create.
- Let queueSize be ! GetTotalQueueSize(controller.[[queue]]).
- Return controller.[[strategyHWM]] − queueSize.
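This example is non-normative. The subtraction above can be sketched with a custom size function, assuming a global ReadableStream, string chunks weighed by their length, and a high water mark of 10. The names sizes and weighted are illustrative.

```javascript
// Sketch only. Assumption: chunks are strings and size(chunk) is
// their length; no reader is attached, so chunks stay in the queue.
const sizes = [];
const weighted = new ReadableStream({
  start(controller) {
    sizes.push(controller.desiredSize); // 10 - 0
    controller.enqueue('abcd');
    sizes.push(controller.desiredSize); // 10 - 4
    controller.enqueue('efghijkl');
    sizes.push(controller.desiredSize); // 10 - 12
  }
}, {
  highWaterMark: 10,
  size(chunk) { return chunk.length; }
});
```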
3.10. Class ReadableByteStreamController
The ReadableByteStreamController class has methods that allow control of a ReadableStream's state and internal queue. When constructing a ReadableStream, the underlying byte source is given a
corresponding ReadableByteStreamController instance to manipulate.
3.10.1. Class Definition
This section is non-normative.
If one were to write the ReadableByteStreamController class in something close to the syntax of [ECMASCRIPT], it
would look like
class ReadableByteStreamController {
constructor(stream, underlyingByteSource, highWaterMark)
get byobRequest()
get desiredSize()
close()
enqueue(chunk)
error(e)
}
3.10.2. Internal Slots
Instances of ReadableByteStreamController are created with the internal slots described in the following table:
| Internal Slot | Description (non-normative) |
|---|---|
| [[autoAllocateChunkSize]] | A positive integer, when the automatic buffer allocation feature is enabled. In that case, this value specifies the size of buffer to allocate. It is undefined otherwise. |
| [[closeRequested]] | A boolean flag indicating whether the stream has been closed by its underlying byte source, but still has chunks in its internal queue that have not yet been read |
| [[controlledReadableStream]] | The ReadableStream instance controlled |
| [[pullAgain]] | A boolean flag set to true if the stream’s mechanisms requested a call to the underlying byte source’s pull method to pull more data, but the pull could not yet be done since a previous call is still executing |
| [[pulling]] | A boolean flag set to true while the underlying byte source’s pull method is executing and has not yet fulfilled, used to prevent reentrant calls |
| [[byobRequest]] | A ReadableStreamBYOBRequest instance representing the current BYOB pull request |
| [[pendingPullIntos]] | A List of descriptors representing pending BYOB pull requests |
| [[queue]] | A List representing the stream’s internal queue of chunks |
| [[started]] | A boolean flag indicating whether the underlying source has finished starting |
| [[strategyHWM]] | A number supplied to the constructor as part of the stream’s queuing strategy, indicating the point at which the stream will apply backpressure to its underlying byte source |
| [[totalQueuedBytes]] | The number of bytes stored in [[queue]] |
| [[underlyingByteSource]] | An object representation of the stream’s underlying byte source; also used for the IsReadableByteStreamController brand check |
3.10.3. new ReadableByteStreamController(stream, underlyingByteSource, highWaterMark)
The ReadableByteStreamController constructor cannot be used directly; it only works on a ReadableStream that is in the middle of being constructed.
- If ! IsReadableStream(stream) is false, throw a TypeError exception.
- If stream.[[readableStreamController]] is not undefined, throw a TypeError exception.
- Set this.[[controlledReadableStream]] to stream.
- Set this.[[underlyingByteSource]] to underlyingByteSource.
- Set this.[[pullAgain]] and this.[[pulling]] to false.
- Perform ! ReadableByteStreamControllerClearPendingPullIntos(this).
- Set this.[[queue]] to a new empty List.
- Set this.[[totalQueuedBytes]] to 0.
- Set this.[[started]] and this.[[closeRequested]] to false.
- Set this.[[strategyHWM]] to ? ValidateAndNormalizeHighWaterMark(highWaterMark).
- Let autoAllocateChunkSize be ? GetV(underlyingByteSource, "autoAllocateChunkSize").
- If autoAllocateChunkSize is not undefined,
  - If ! IsInteger(autoAllocateChunkSize) is false, or if autoAllocateChunkSize ≤ 0, throw a RangeError exception.
- Set this.[[autoAllocateChunkSize]] to autoAllocateChunkSize.
- Set this.[[pendingPullIntos]] to a new empty List.
- Let controller be this.
- Let startResult be ? InvokeOrNoop(underlyingByteSource, "start", « this »).
- Let startPromise be a promise resolved with startResult:
  - Upon fulfillment of startPromise,
    - Set controller.[[started]] to true.
    - Assert: controller.[[pulling]] is false.
    - Assert: controller.[[pullAgain]] is false.
    - Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
  - Upon rejection of startPromise with reason r,
    - If stream.[[state]] is "readable", perform ! ReadableByteStreamControllerError(controller, r).
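This example is non-normative. The autoAllocateChunkSize validation performed by the constructor steps above could be sketched in JavaScript as below. The function name validateAutoAllocateChunkSize is hypothetical, and Number.isInteger is assumed here to be a close enough stand-in for the IsInteger abstract operation.

```javascript
// Hypothetical helper mirroring the autoAllocateChunkSize check in the constructor.
function validateAutoAllocateChunkSize(value) {
  // An absent value leaves automatic buffer allocation disabled.
  if (value === undefined) {
    return undefined;
  }
  // Anything other than a positive integer is rejected with a RangeError.
  if (!Number.isInteger(value) || value <= 0) {
    throw new RangeError("autoAllocateChunkSize must be a positive integer");
  }
  return value;
}

console.log(validateAutoAllocateChunkSize(1024));      // 1024
console.log(validateAutoAllocateChunkSize(undefined)); // undefined
```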
3.10.4. Properties of the ReadableByteStreamController Prototype
3.10.4.1. get byobRequest
The byobRequest getter returns the current BYOB pull request.
- If IsReadableByteStreamController(this) is false, throw a TypeError exception.
- If this.[[byobRequest]] is undefined and this.[[pendingPullIntos]] is not empty,
  - Let firstDescriptor be the first element of this.[[pendingPullIntos]].
  - Let view be ! Construct(%Uint8Array%, « firstDescriptor.[[buffer]], firstDescriptor.[[byteOffset]] + firstDescriptor.[[bytesFilled]], firstDescriptor.[[byteLength]] − firstDescriptor.[[bytesFilled]] »).
  - Set this.[[byobRequest]] to ! Construct(ReadableStreamBYOBRequest, « this, view »).
- Return this.[[byobRequest]].
3.10.4.2. get desiredSize
The desiredSize getter returns the desired size to fill the controlled stream’s internal queue. It can be negative, if the queue is over-full. An underlying source should use this information to determine when and how to apply backpressure.
- If ! IsReadableByteStreamController(this) is false, throw a TypeError exception.
- Return ! ReadableByteStreamControllerGetDesiredSize(this).
3.10.4.3. close()
The close method will close the controlled readable stream. Consumers will still be able to read any previously-enqueued chunks from the stream, but once those are read, the stream will become closed.
- If ! IsReadableByteStreamController(this) is false, throw a TypeError exception.
- If this.[[closeRequested]] is true, throw a TypeError exception.
- If this.[[controlledReadableStream]].[[state]] is not "readable", throw a TypeError exception.
- Perform ? ReadableByteStreamControllerClose(this).
3.10.4.4. enqueue(chunk)
The enqueue method will enqueue a given chunk in the controlled readable stream.
- If ! IsReadableByteStreamController(this) is false, throw a TypeError exception.
- If this.[[closeRequested]] is true, throw a TypeError exception.
- If this.[[controlledReadableStream]].[[state]] is not "readable", throw a TypeError exception.
- If Type(chunk) is not Object, throw a TypeError exception.
- If chunk does not have a [[ViewedArrayBuffer]] internal slot, throw a TypeError exception.
- Return ! ReadableByteStreamControllerEnqueue(this, chunk).
3.10.4.5. error(e)
The error method will error the readable stream, making all future interactions with it fail with the given error e.
- If ! IsReadableByteStreamController(this) is false, throw a TypeError exception.
- Let stream be this.[[controlledReadableStream]].
- If stream.[[state]] is not "readable", throw a TypeError exception.
- Perform ! ReadableByteStreamControllerError(this, e).
3.10.5. Readable Stream BYOB Controller Internal Methods
The following are additional internal methods implemented by each ReadableByteStreamController instance. They are
similar to the supporting abstract operations in the following section, but are in method form to allow polymorphic
dispatch from the readable stream implementation to either these or their counterparts for default controllers.
3.10.5.1. [[Cancel]](reason)
- If this.[[pendingPullIntos]] is not empty,
  - Let firstDescriptor be the first element of this.[[pendingPullIntos]].
  - Set firstDescriptor.[[bytesFilled]] to 0.
- Set this.[[queue]] to a new empty List.
- Set this.[[totalQueuedBytes]] to 0.
- Return ! PromiseInvokeOrNoop(this.[[underlyingByteSource]], "cancel", « reason »).
3.10.5.2. [[Pull]]()
- Let stream be this.[[controlledReadableStream]].
- If ! ReadableStreamGetNumReadRequests(stream) is 0,
  - If this.[[totalQueuedBytes]] > 0,
    - Let entry be the first element of this.[[queue]].
    - Remove entry from this.[[queue]], shifting all other elements downward (so that the second becomes the first, and so on).
    - Set this.[[totalQueuedBytes]] to this.[[totalQueuedBytes]] − entry.[[byteLength]].
    - Perform ! ReadableByteStreamControllerHandleQueueDrain(this).
    - Let view be ! Construct(%Uint8Array%, « entry.[[buffer]], entry.[[byteOffset]], entry.[[byteLength]] »).
    - Return a promise resolved with ! CreateIterResultObject(view, false).
  - Let autoAllocateChunkSize be this.[[autoAllocateChunkSize]].
  - If autoAllocateChunkSize is not undefined,
    - Let buffer be Construct(%ArrayBuffer%, « autoAllocateChunkSize »).
    - If buffer is an abrupt completion, return a promise rejected with buffer.[[Value]].
    - Let pullIntoDescriptor be Record {[[buffer]]: buffer.[[Value]], [[byteOffset]]: 0, [[byteLength]]: autoAllocateChunkSize, [[bytesFilled]]: 0, [[elementSize]]: 1, [[ctor]]: %Uint8Array%, [[readerType]]: "default"}.
    - Append pullIntoDescriptor as the last element of this.[[pendingPullIntos]].
- Otherwise,
  - Assert: this.[[autoAllocateChunkSize]] is undefined.
- Let promise be ! ReadableStreamAddReadRequest(stream).
- Perform ! ReadableByteStreamControllerCallPullIfNeeded(this).
- Return promise.
3.11. Class ReadableStreamBYOBRequest
The ReadableStreamBYOBRequest class represents a pull into request in a ReadableByteStreamController.
3.11.1. Class Definition
This section is non-normative.
If one were to write the ReadableStreamBYOBRequest class in something close to the syntax of [ECMASCRIPT], it
would look like
class ReadableStreamBYOBRequest {
  constructor(controller, view)

  get view()

  respond(bytesWritten)
  respondWithNewView(view)
}
3.11.2. Internal Slots
Instances of ReadableStreamBYOBRequest are created with the internal slots described in the following table:
| Internal Slot | Description (non-normative) |
|---|---|
| [[associatedReadableByteStreamController]] | The parent ReadableByteStreamController instance |
| [[view]] | A typed array representing the destination region to which the controller may write generated data |
3.11.3. new ReadableStreamBYOBRequest(controller, view)
- Set this.[[associatedReadableByteStreamController]] to controller.
- Set this.[[view]] to view.
3.11.4. Properties of the ReadableStreamBYOBRequest Prototype
3.11.4.1. get view
- If ! IsReadableStreamBYOBRequest(this) is false, throw a TypeError exception.
- Return this.[[view]].
3.11.4.2. respond(bytesWritten)
- If ! IsReadableStreamBYOBRequest(this) is false, throw a TypeError exception.
- If this.[[associatedReadableByteStreamController]] is undefined, throw a TypeError exception.
- Return ? ReadableByteStreamControllerRespond(this.[[associatedReadableByteStreamController]], bytesWritten).
3.11.4.3. respondWithNewView(view)
- If ! IsReadableStreamBYOBRequest(this) is false, throw a TypeError exception.
- If this.[[associatedReadableByteStreamController]] is undefined, throw a TypeError exception.
- If Type(view) is not Object, throw a TypeError exception.
- If view does not have a [[ViewedArrayBuffer]] internal slot, throw a TypeError exception.
- Return ? ReadableByteStreamControllerRespondWithNewView(this.[[associatedReadableByteStreamController]], view).
3.12. Readable Stream BYOB Controller Abstract Operations
3.12.1. IsReadableStreamBYOBRequest ( x ) nothrow
- If Type(x) is not Object, return false.
- If x does not have an [[associatedReadableByteStreamController]] internal slot, return false.
- Return true.
3.12.2. IsReadableByteStreamController ( x ) nothrow
- If Type(x) is not Object, return false.
- If x does not have an [[underlyingByteSource]] internal slot, return false.
- Return true.
3.12.3. ReadableByteStreamControllerCallPullIfNeeded ( controller ) nothrow
- Let shouldPull be ! ReadableByteStreamControllerShouldCallPull(controller).
- If shouldPull is false, return.
- If controller.[[pulling]] is true,
  - Set controller.[[pullAgain]] to true.
  - Return.
- Assert: controller.[[pullAgain]] is false.
- Set controller.[[pulling]] to true.
- Let pullPromise be ! PromiseInvokeOrNoop(controller.[[underlyingByteSource]], "pull", « controller »).
- Upon fulfillment of pullPromise,
  - Set controller.[[pulling]] to false.
  - If controller.[[pullAgain]] is true,
    - Set controller.[[pullAgain]] to false.
    - Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
- Upon rejection of pullPromise with reason e,
  - If controller.[[controlledReadableStream]].[[state]] is "readable", perform ! ReadableByteStreamControllerError(controller, e).
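This example is non-normative. The reentrancy guard built from [[pulling]] and [[pullAgain]] can be illustrated with a small JavaScript model. Everything here is an assumption made for illustration: the controller is a plain object, and promiseInvoke, shouldPull, pull, and onError are hypothetical names standing in for PromiseInvokeOrNoop, ReadableByteStreamControllerShouldCallPull, the underlying byte source's pull method, and ReadableByteStreamControllerError.

```javascript
// Call fn synchronously and convert its result or thrown error into a promise.
function promiseInvoke(fn, arg) {
  try {
    return Promise.resolve(fn(arg));
  } catch (e) {
    return Promise.reject(e);
  }
}

function callPullIfNeeded(controller) {
  if (!controller.shouldPull(controller)) {
    return;
  }
  // A pull is already in flight: remember that another one was requested.
  if (controller.pulling) {
    controller.pullAgain = true;
    return;
  }
  controller.pulling = true;
  promiseInvoke(controller.pull, controller).then(
    () => {
      controller.pulling = false;
      if (controller.pullAgain) {
        controller.pullAgain = false;
        callPullIfNeeded(controller);
      }
    },
    e => controller.onError(e)
  );
}

// Demo: the pull never settles, so the second request is only recorded.
let pullCalls = 0;
const demoController = {
  pulling: false,
  pullAgain: false,
  shouldPull: () => true,
  pull: () => { pullCalls++; return new Promise(() => {}); },
  onError: e => console.error(e)
};
callPullIfNeeded(demoController);
callPullIfNeeded(demoController);
console.log(pullCalls, demoController.pulling, demoController.pullAgain); // 1 true true
```

The point of the two flags is that the underlying byte source never sees overlapping pull calls, yet no request for more data is lost.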
3.12.4. ReadableByteStreamControllerClearPendingPullIntos ( controller ) nothrow
- Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
- Set controller.[[pendingPullIntos]] to a new empty List.
3.12.5. ReadableByteStreamControllerClose ( controller ) throws
- Let stream be controller.[[controlledReadableStream]].
- Assert: controller.[[closeRequested]] is false.
- Assert: stream.[[state]] is "readable".
- If controller.[[totalQueuedBytes]] > 0,
  - Set controller.[[closeRequested]] to true.
  - Return.
- If controller.[[pendingPullIntos]] is not empty,
  - Let firstPendingPullInto be the first element of controller.[[pendingPullIntos]].
  - If firstPendingPullInto.[[bytesFilled]] > 0,
    - Let e be a new TypeError exception.
    - Perform ! ReadableByteStreamControllerError(controller, e).
    - Throw e.
- Perform ! ReadableStreamClose(stream).
3.12.6. ReadableByteStreamControllerCommitPullIntoDescriptor ( stream, pullIntoDescriptor ) nothrow
- Assert: stream.[[state]] is not "errored".
- Let done be false.
- If stream.[[state]] is "closed",
  - Assert: pullIntoDescriptor.[[bytesFilled]] is 0.
  - Set done to true.
- Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor).
- If pullIntoDescriptor.[[readerType]] is "default",
  - Perform ! ReadableStreamFulfillReadRequest(stream, filledView, done).
- Otherwise,
  - Assert: pullIntoDescriptor.[[readerType]] is "byob".
  - Perform ! ReadableStreamFulfillReadIntoRequest(stream, filledView, done).
3.12.7. ReadableByteStreamControllerConvertPullIntoDescriptor ( pullIntoDescriptor ) nothrow
- Let bytesFilled be pullIntoDescriptor.[[bytesFilled]].
- Let elementSize be pullIntoDescriptor.[[elementSize]].
- Assert: bytesFilled ≤ pullIntoDescriptor.[[byteLength]].
- Assert: bytesFilled mod elementSize is 0.
- Return ! Construct(pullIntoDescriptor.[[ctor]], « pullIntoDescriptor.[[buffer]], pullIntoDescriptor.[[byteOffset]], bytesFilled ÷ elementSize »).
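This example is non-normative. In JavaScript terms, the conversion above amounts to constructing a view of the descriptor's recorded type over the filled part of its buffer. The sketch below assumes a hypothetical plain-object descriptor whose property names (buffer, byteOffset, byteLength, bytesFilled, elementSize, ctor) mirror the record fields; it is not how an implementation is required to store them.

```javascript
// Hypothetical descriptor-to-view conversion using ordinary typed array constructors.
function convertPullIntoDescriptor(descriptor) {
  const { bytesFilled, elementSize } = descriptor;
  if (bytesFilled > descriptor.byteLength) {
    throw new Error("bytesFilled exceeds byteLength");
  }
  if (bytesFilled % elementSize !== 0) {
    throw new Error("bytesFilled is not a multiple of elementSize");
  }
  // The third argument is a length in elements, hence the division.
  return new descriptor.ctor(descriptor.buffer, descriptor.byteOffset, bytesFilled / elementSize);
}

const exampleDescriptor = {
  buffer: new ArrayBuffer(16),
  byteOffset: 4,
  byteLength: 12,
  bytesFilled: 8,
  elementSize: 4,
  ctor: Uint32Array
};
const filledView = convertPullIntoDescriptor(exampleDescriptor);
console.log(filledView.length, filledView.byteOffset, filledView.byteLength); // 2 4 8
```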
3.12.8. ReadableByteStreamControllerEnqueue ( controller, chunk ) nothrow
- Let stream be controller.[[controlledReadableStream]].
- Assert: controller.[[closeRequested]] is false.
- Assert: stream.[[state]] is "readable".
- Let buffer be chunk.[[ViewedArrayBuffer]].
- Let byteOffset be chunk.[[ByteOffset]].
- Let byteLength be chunk.[[ByteLength]].
- Let transferredBuffer be ! Transfer(buffer, the current Realm Record).
- If ! ReadableStreamHasDefaultReader(stream) is true,
  - If ! ReadableStreamGetNumReadRequests(stream) is 0,
    - Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength).
  - Otherwise,
    - Assert: controller.[[queue]] is empty.
    - Let transferredView be ! Construct(%Uint8Array%, « transferredBuffer, byteOffset, byteLength »).
    - Perform ! ReadableStreamFulfillReadRequest(stream, transferredView, false).
- Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true,
  - Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength).
  - Perform ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
- Otherwise,
  - Assert: ! IsReadableStreamLocked(stream) is false.
  - Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength).
3.12.9. ReadableByteStreamControllerEnqueueChunkToQueue ( controller, buffer, byteOffset, byteLength ) nothrow
- Append Record {[[buffer]]: buffer, [[byteOffset]]: byteOffset, [[byteLength]]: byteLength} as the last element of controller.[[queue]].
- Add byteLength to controller.[[totalQueuedBytes]].
3.12.10. ReadableByteStreamControllerError ( controller, e ) nothrow
- Let stream be controller.[[controlledReadableStream]].
- Assert: stream.[[state]] is "readable".
- Perform ! ReadableByteStreamControllerClearPendingPullIntos(controller).
- Set controller.[[queue]] to a new empty List.
- Perform ! ReadableStreamError(stream, e).
3.12.11. ReadableByteStreamControllerFillHeadPullIntoDescriptor ( controller, size, pullIntoDescriptor ) nothrow
- Assert: either controller.[[pendingPullIntos]] is empty, or the first element of controller.[[pendingPullIntos]] is pullIntoDescriptor.
- Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
- Set pullIntoDescriptor.[[bytesFilled]] to pullIntoDescriptor.[[bytesFilled]] + size.
3.12.12. ReadableByteStreamControllerFillPullIntoDescriptorFromQueue ( controller, pullIntoDescriptor ) nothrow
- Let elementSize be pullIntoDescriptor.[[elementSize]].
- Let currentAlignedBytes be pullIntoDescriptor.[[bytesFilled]] − (pullIntoDescriptor.[[bytesFilled]] mod elementSize).
- Let maxBytesToCopy be min(controller.[[totalQueuedBytes]], pullIntoDescriptor.[[byteLength]] − pullIntoDescriptor.[[bytesFilled]]).
- Let maxBytesFilled be pullIntoDescriptor.[[bytesFilled]] + maxBytesToCopy.
- Let maxAlignedBytes be maxBytesFilled − (maxBytesFilled mod elementSize).
- Let totalBytesToCopyRemaining be maxBytesToCopy.
- Let ready be false.
- If maxAlignedBytes > currentAlignedBytes,
  - Set totalBytesToCopyRemaining to maxAlignedBytes − pullIntoDescriptor.[[bytesFilled]].
  - Set ready to true.
- Let queue be controller.[[queue]].
- Repeat the following steps while totalBytesToCopyRemaining > 0,
  - Let headOfQueue be the first element of queue.
  - Let bytesToCopy be min(totalBytesToCopyRemaining, headOfQueue.[[byteLength]]).
  - Let destStart be pullIntoDescriptor.[[byteOffset]] + pullIntoDescriptor.[[bytesFilled]].
  - Perform ! CopyDataBlockBytes(pullIntoDescriptor.[[buffer]].[[ArrayBufferData]], destStart, headOfQueue.[[buffer]].[[ArrayBufferData]], headOfQueue.[[byteOffset]], bytesToCopy).
  - If headOfQueue.[[byteLength]] is bytesToCopy,
    - Remove the first element of queue, shifting all other elements downward (so that the second becomes the first, and so on).
  - Otherwise,
    - Set headOfQueue.[[byteOffset]] to headOfQueue.[[byteOffset]] + bytesToCopy.
    - Set headOfQueue.[[byteLength]] to headOfQueue.[[byteLength]] − bytesToCopy.
  - Set controller.[[totalQueuedBytes]] to controller.[[totalQueuedBytes]] − bytesToCopy.
  - Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, bytesToCopy, pullIntoDescriptor).
  - Set totalBytesToCopyRemaining to totalBytesToCopyRemaining − bytesToCopy.
- If ready is false,
  - Assert: controller.[[totalQueuedBytes]] is 0.
  - Assert: pullIntoDescriptor.[[bytesFilled]] > 0.
  - Assert: pullIntoDescriptor.[[bytesFilled]] < pullIntoDescriptor.[[elementSize]].
- Return ready.
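This example is non-normative. The alignment arithmetic and copy loop above can be sketched in JavaScript as follows, under the assumption that the controller and descriptor are plain objects with property names mirroring the internal slots and record fields. Copying with Uint8Array.prototype.set stands in for CopyDataBlockBytes, and the BYOB request invalidation done by ReadableByteStreamControllerFillHeadPullIntoDescriptor is omitted for brevity.

```javascript
// Hypothetical model: copy as many whole elements as possible from the queue.
function fillPullIntoDescriptorFromQueue(controller, descriptor) {
  const elementSize = descriptor.elementSize;
  const currentAlignedBytes = descriptor.bytesFilled - (descriptor.bytesFilled % elementSize);
  const maxBytesToCopy = Math.min(controller.totalQueuedBytes,
                                  descriptor.byteLength - descriptor.bytesFilled);
  const maxBytesFilled = descriptor.bytesFilled + maxBytesToCopy;
  const maxAlignedBytes = maxBytesFilled - (maxBytesFilled % elementSize);

  let remaining = maxBytesToCopy;
  let ready = false;
  if (maxAlignedBytes > currentAlignedBytes) {
    // At least one more whole element fits: stop at the aligned boundary.
    remaining = maxAlignedBytes - descriptor.bytesFilled;
    ready = true;
  }

  const queue = controller.queue;
  while (remaining > 0) {
    const head = queue[0];
    const bytesToCopy = Math.min(remaining, head.byteLength);
    const destStart = descriptor.byteOffset + descriptor.bytesFilled;
    // Destination is the pull-into buffer; source is the head of the queue.
    new Uint8Array(descriptor.buffer).set(
      new Uint8Array(head.buffer, head.byteOffset, bytesToCopy), destStart);
    if (head.byteLength === bytesToCopy) {
      queue.shift();
    } else {
      head.byteOffset += bytesToCopy;
      head.byteLength -= bytesToCopy;
    }
    controller.totalQueuedBytes -= bytesToCopy;
    descriptor.bytesFilled += bytesToCopy;
    remaining -= bytesToCopy;
  }
  return ready;
}

// Six queued bytes, a destination of 4-byte elements: only four bytes are taken.
const queuedBytes = new Uint8Array([1, 2, 3, 4, 5, 6]);
const byteController = {
  queue: [{ buffer: queuedBytes.buffer, byteOffset: 0, byteLength: 6 }],
  totalQueuedBytes: 6
};
const target = { buffer: new ArrayBuffer(8), byteOffset: 0, byteLength: 8, bytesFilled: 0, elementSize: 4 };
const isReady = fillPullIntoDescriptorFromQueue(byteController, target);
console.log(isReady, target.bytesFilled, byteController.totalQueuedBytes); // true 4 2
```

The two leftover bytes stay at the head of the queue, with its byteOffset advanced, until enough data arrives to complete another element.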
3.12.13. ReadableByteStreamControllerGetDesiredSize ( controller ) nothrow
- Return controller.[[strategyHWM]] − controller.[[totalQueuedBytes]].
3.12.14. ReadableByteStreamControllerHandleQueueDrain ( controller ) nothrow
- Assert: controller.[[controlledReadableStream]].[[state]] is "readable".
- If controller.[[totalQueuedBytes]] is 0 and controller.[[closeRequested]] is true,
  - Perform ! ReadableStreamClose(controller.[[controlledReadableStream]]).
- Otherwise,
  - Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
3.12.15. ReadableByteStreamControllerInvalidateBYOBRequest ( controller ) nothrow
- If controller.[[byobRequest]] is undefined, return.
- Set controller.[[byobRequest]].[[associatedReadableByteStreamController]] to undefined.
- Set controller.[[byobRequest]].[[view]] to undefined.
- Set controller.[[byobRequest]] to undefined.
3.12.16. ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue ( controller ) nothrow
- Assert: controller.[[closeRequested]] is false.
- Repeat the following steps while controller.[[pendingPullIntos]] is not empty,
  - If controller.[[totalQueuedBytes]] is 0, return.
  - Let pullIntoDescriptor be the first element of controller.[[pendingPullIntos]].
  - If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) is true,
    - Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
    - Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[controlledReadableStream]], pullIntoDescriptor).
3.12.17. ReadableByteStreamControllerPullInto ( controller, view ) nothrow
- Let stream be controller.[[controlledReadableStream]].
- Let elementSize be 1.
- Let ctor be %DataView%.
- If view has a [[TypedArrayName]] internal slot (i.e., it is not a DataView),
  - Set elementSize to the element size specified in the typed array constructors table for view.[[TypedArrayName]].
  - Set ctor to the constructor specified in the typed array constructors table for view.[[TypedArrayName]].
- Let pullIntoDescriptor be Record {[[buffer]]: view.[[ViewedArrayBuffer]], [[byteOffset]]: view.[[ByteOffset]], [[byteLength]]: view.[[ByteLength]], [[bytesFilled]]: 0, [[elementSize]]: elementSize, [[ctor]]: ctor, [[readerType]]: "byob"}.
- If controller.[[pendingPullIntos]] is not empty,
  - Set pullIntoDescriptor.[[buffer]] to ! Transfer(pullIntoDescriptor.[[buffer]], the current Realm Record).
  - Append pullIntoDescriptor as the last element of controller.[[pendingPullIntos]].
  - Return ! ReadableStreamAddReadIntoRequest(stream).
- If stream.[[state]] is "closed",
  - Let emptyView be ! Construct(ctor, « pullIntoDescriptor.[[buffer]], pullIntoDescriptor.[[byteOffset]], 0 »).
  - Return a promise resolved with ! CreateIterResultObject(emptyView, true).
- If controller.[[totalQueuedBytes]] > 0,
  - If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) is true,
    - Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor).
    - Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
    - Return a promise resolved with ! CreateIterResultObject(filledView, false).
  - If controller.[[closeRequested]] is true,
    - Let e be a new TypeError exception.
    - Perform ! ReadableByteStreamControllerError(controller, e).
    - Return a promise rejected with e.
- Set pullIntoDescriptor.[[buffer]] to ! Transfer(pullIntoDescriptor.[[buffer]], the current Realm Record).
- Append pullIntoDescriptor as the last element of controller.[[pendingPullIntos]].
- Let promise be ! ReadableStreamAddReadIntoRequest(stream).
- Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
- Return promise.
3.12.18. ReadableByteStreamControllerRespond ( controller, bytesWritten ) throws
- Let bytesWritten be ? ToNumber(bytesWritten).
- If ! IsFiniteNonNegativeNumber(bytesWritten) is false,
  - Throw a RangeError exception.
- Assert: controller.[[pendingPullIntos]] is not empty.
- Perform ? ReadableByteStreamControllerRespondInternal(controller, bytesWritten).
3.12.19. ReadableByteStreamControllerRespondInClosedState ( controller, firstDescriptor ) nothrow
- Set firstDescriptor.[[buffer]] to ! Transfer(firstDescriptor.[[buffer]], the current Realm Record).
- Assert: firstDescriptor.[[bytesFilled]] is 0.
- Let stream be controller.[[controlledReadableStream]].
- Repeat the following steps while ! ReadableStreamGetNumReadIntoRequests(stream) > 0,
  - Let pullIntoDescriptor be ! ReadableByteStreamControllerShiftPendingPullInto(controller).
  - Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, pullIntoDescriptor).
3.12.20. ReadableByteStreamControllerRespondInReadableState ( controller, bytesWritten, pullIntoDescriptor ) throws
- If pullIntoDescriptor.[[bytesFilled]] + bytesWritten > pullIntoDescriptor.[[byteLength]], throw a RangeError exception.
- Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, bytesWritten, pullIntoDescriptor).
- If pullIntoDescriptor.[[bytesFilled]] < pullIntoDescriptor.[[elementSize]], return.
- Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
- Let remainderSize be pullIntoDescriptor.[[bytesFilled]] mod pullIntoDescriptor.[[elementSize]].
- If remainderSize > 0,
  - Let end be pullIntoDescriptor.[[byteOffset]] + pullIntoDescriptor.[[bytesFilled]].
  - Let remainder be ? CloneArrayBuffer(pullIntoDescriptor.[[buffer]], end − remainderSize, remainderSize, %ArrayBuffer%).
  - Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(controller, remainder, 0, remainder.[[ByteLength]]).
- Set pullIntoDescriptor.[[buffer]] to ! Transfer(pullIntoDescriptor.[[buffer]], the current Realm Record).
- Set pullIntoDescriptor.[[bytesFilled]] to pullIntoDescriptor.[[bytesFilled]] − remainderSize.
- Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[controlledReadableStream]], pullIntoDescriptor).
- Perform ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
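This example is non-normative. The remainder handling above exists because a response may leave a partially written element at the end of the filled region. The following JavaScript sketch shows only that step, assuming a hypothetical plain-object descriptor; splitRemainder is an illustrative name, and ArrayBuffer.prototype.slice is used as a stand-in for CloneArrayBuffer.

```javascript
// Hypothetical helper: detach trailing bytes that do not form a whole element.
function splitRemainder(descriptor) {
  const remainderSize = descriptor.bytesFilled % descriptor.elementSize;
  let remainder = null;
  if (remainderSize > 0) {
    const end = descriptor.byteOffset + descriptor.bytesFilled;
    // Copy the trailing bytes so they can be put back on the queue.
    remainder = descriptor.buffer.slice(end - remainderSize, end);
  }
  // Only whole elements are reported to the consumer.
  descriptor.bytesFilled -= remainderSize;
  return remainder;
}

const written = new Uint8Array([10, 11, 12, 13, 14, 15, 0, 0]);
const respondDescriptor = {
  buffer: written.buffer,
  byteOffset: 0,
  byteLength: 8,
  bytesFilled: 6, // six bytes written into a view of 4-byte elements
  elementSize: 4
};
const leftover = splitRemainder(respondDescriptor);
console.log(respondDescriptor.bytesFilled);              // 4
console.log(Array.from(new Uint8Array(leftover)).join(",")); // 14,15
```

In the algorithm, the returned bytes would be enqueued again so that a later read can pick them up.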
3.12.21. ReadableByteStreamControllerRespondInternal ( controller, bytesWritten ) throws
- Let firstDescriptor be the first element of controller.[[pendingPullIntos]].
- Let stream be controller.[[controlledReadableStream]].
- If stream.[[state]] is "closed",
  - If bytesWritten is not 0, throw a TypeError exception.
  - Perform ! ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor).
- Otherwise,
  - Assert: stream.[[state]] is "readable".
  - Perform ? ReadableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor).
3.12.22. ReadableByteStreamControllerRespondWithNewView ( controller, view ) throws
- Assert: controller.[[pendingPullIntos]] is not empty.
- Let firstDescriptor be the first element of controller.[[pendingPullIntos]].
- If firstDescriptor.[[byteOffset]] + firstDescriptor.[[bytesFilled]] is not view.[[ByteOffset]], throw a RangeError exception.
- If firstDescriptor.[[byteLength]] is not view.[[ByteLength]], throw a RangeError exception.
- Set firstDescriptor.[[buffer]] to view.[[ViewedArrayBuffer]].
- Perform ? ReadableByteStreamControllerRespondInternal(controller, view.[[ByteLength]]).
3.12.23. ReadableByteStreamControllerShiftPendingPullInto ( controller ) nothrow
- Let descriptor be the first element of controller.[[pendingPullIntos]].
- Remove descriptor from controller.[[pendingPullIntos]], shifting all other elements downward (so that the second becomes the first, and so on).
- Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
- Return descriptor.
3.12.24. ReadableByteStreamControllerShouldCallPull ( controller ) nothrow
- Let stream be controller.[[controlledReadableStream]].
- If stream.[[state]] is not "readable", return false.
- If controller.[[closeRequested]] is true, return false.
- If controller.[[started]] is false, return false.
- If ! ReadableStreamHasDefaultReader(stream) is true and ! ReadableStreamGetNumReadRequests(stream) > 0, return true.
- If ! ReadableStreamHasBYOBReader(stream) is true and ! ReadableStreamGetNumReadIntoRequests(stream) > 0, return true.
- If ! ReadableByteStreamControllerGetDesiredSize(controller) > 0, return true.
- Return false.
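This example is non-normative. The decision above can be read as a single predicate over the controller's state. The sketch below assumes a hypothetical flattened model in which the stream state, reader type, and request counts are plain properties (streamState, readerType, numReadRequests, numReadIntoRequests); none of these names are defined by this standard.

```javascript
// Hypothetical predicate mirroring the order of the checks above.
function shouldCallPull(c) {
  if (c.streamState !== "readable") return false;
  if (c.closeRequested) return false;
  if (!c.started) return false;
  // A waiting consumer always justifies a pull, regardless of queue size.
  if (c.readerType === "default" && c.numReadRequests > 0) return true;
  if (c.readerType === "byob" && c.numReadIntoRequests > 0) return true;
  // Otherwise pull only while the queue is below the high water mark.
  return c.strategyHWM - c.totalQueuedBytes > 0;
}

const idleController = {
  streamState: "readable",
  closeRequested: false,
  started: true,
  readerType: "none",
  numReadRequests: 0,
  numReadIntoRequests: 0,
  strategyHWM: 16,
  totalQueuedBytes: 16
};
console.log(shouldCallPull(idleController)); // false
console.log(shouldCallPull({ ...idleController, readerType: "byob", numReadIntoRequests: 1 })); // true
```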
4. Writable Streams
4.1. Using Writable Streams
readableStream.pipeTo(writableStream)
  .then(() => console.log("All data successfully written!"))
  .catch(e => console.error("Something went wrong!", e));
You can also write directly to a writable stream by acquiring a writer and using its write() and close() methods. Since writable streams
queue any incoming writes, and take care internally to forward them to the underlying sink in sequence, you can
indiscriminately write to a writable stream without much ceremony:
function writeArrayToStream(array, writableStream) {
  const writer = writableStream.getWriter();
  array.forEach(chunk => writer.write(chunk));
  return writer.close();
}

writeArrayToStream([1, 2, 3, 4, 5], writableStream)
  .then(() => console.log("All done!"))
  .catch(e => console.error("Error with the stream: " + e));
The example above only pays attention to the success or failure of the entire stream, by looking at the promise returned by the
writer’s close() method. That promise (which can also be accessed using
the closed getter) will reject if anything goes wrong with the stream: initializing it,
writing to it, or closing it. And it will fulfill once the stream is successfully closed. Often this is all you care
about.
However, if you care about the success of writing a specific chunk, you can use the promise returned by the
writer’s write() method:
writer.write("i am a chunk of data")
  .then(() => console.log("chunk successfully written!"))
  .catch(e => console.error(e));
What "success" means is up to a given stream instance (or more precisely, its underlying sink) to decide. For example, for a file stream it could simply mean that the OS has accepted the write, and not necessarily that the chunk has been flushed to disk.
The desiredSize and ready properties of writable
stream writers allow producers to more precisely respond to flow control signals from the stream, to keep
memory usage below the stream’s specified high water mark. The following example writes an infinite sequence of
random bytes to a stream, using desiredSize to determine how many bytes to generate at
a given time, and using ready to wait for the backpressure to subside.
async function writeRandomBytesForever(writableStream) {
  const writer = writableStream.getWriter();

  while (true) {
    await writer.ready;

    const bytes = new Uint8Array(writer.desiredSize);
    window.crypto.getRandomValues(bytes);
    await writer.write(bytes);
  }
}

writeRandomBytesForever(myWritableStream).catch(e => console.error("Something broke", e));
4.2. Class WritableStream
4.2.1. Class Definition
This section is non-normative.
If one were to write the WritableStream class in something close to the syntax of [ECMASCRIPT], it would look
like
class WritableStream {
  constructor(underlyingSink = {}, { size, highWaterMark = 1 } = {})

  get locked()

  abort(reason)
  getWriter()
}
4.2.2. Internal Slots
Instances of WritableStream are created with the internal slots described in the following table:
| Internal Slot | Description (non-normative) |
|---|---|
| [[pendingAbortRequest]] | The promise for a pending abort operation |
| [[pendingCloseRequest]] | The promise returned from the writer close() method |
| [[pendingWriteRequest]] | The promise for the current pending write operation |
| [[state]] | A string containing the stream’s current state, used internally; one of "writable", "closing", "closed", or "errored" |
| [[storedError]] | A value indicating how the stream failed, to be given as a failure reason or exception when trying to operate on the stream while in the "errored" state |
| [[writableStreamController]] | A WritableStreamDefaultController created with the ability to control the state and queue of this stream; also used for the IsWritableStream brand check |
| [[writer]] | A WritableStreamDefaultWriter instance, if the stream is locked to a writer, or undefined if it is not |
| [[writeRequests]] | A List representing the stream’s internal queue of pending write requests |
4.2.3. new WritableStream(underlyingSink = {}, { size, highWaterMark = 1 } = {})
The underlyingSink object passed to the constructor can implement any of the following methods to govern
how the constructed stream instance behaves:
- start(controller) is called immediately, and should perform any actions necessary to acquire access to the underlying sink. If this process is asynchronous, it can return a promise to signal success or failure.
- write(chunk, controller) is called when a new chunk of data is ready to be written to the underlying sink. It can return a promise to signal success or failure of the write operation. The stream implementation guarantees that this method will be called only after previous writes have succeeded, and never after close or abort is called.
- close(controller) is called after the producer signals that they are done writing chunks to the stream, and all queued-up writes successfully complete. It should perform any actions necessary to finalize writes to the underlying sink, and release access to it. If this process is asynchronous, it can return a promise to signal success or failure. The stream implementation guarantees that this method will be called only after all queued-up writes have succeeded.
- abort(reason) is called when the producer signals they wish to abruptly close the stream and put it in an errored state. It should clean up any held resources, much like close, but perhaps with some custom handling. Unlike close, abort will be called even if writes are queued up; those chunks will be thrown away. If this process is asynchronous, it can return a promise to signal success or failure.
The controller object passed to start, write and close is an
instance of WritableStreamDefaultController, and has the ability to error the stream.
The constructor also accepts a second argument containing the queuing strategy object with
two properties: a non-negative number highWaterMark, and a function size(chunk). The
supplied strategy could be an instance of the built-in CountQueuingStrategy or ByteLengthQueuingStrategy classes, or it could be custom. If no strategy is supplied, the default
behavior will be the same as a CountQueuingStrategy with a high water mark of 1.
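As a minimal sketch of the two constructor arguments described above, the following builds a writable stream whose underlying sink only records what happens to it. The events array and the behavior of each sink method are illustrative assumptions, not something this standard defines.

```javascript
// Hypothetical in-memory sink: it records lifecycle calls instead of
// touching a real resource.
const events = [];

const ws = new WritableStream(
  {
    start(controller) {
      events.push('start');
    },
    write(chunk, controller) {
      events.push(`write:${chunk}`);
    },
    close(controller) {
      events.push('close');
    },
    abort(reason) {
      events.push(`abort:${reason}`);
    }
  },
  // Built-in strategy: every chunk counts as 1, backpressure at 4 chunks.
  new CountQueuingStrategy({ highWaterMark: 4 })
);

// start() has already run by the time the constructor returns.
console.log(events);
```

Because start is invoked during construction, a sink that needs asynchronous setup would return a promise from it rather than relying on later calls.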
It is possible for both close and abort to be called, in cases where the producer aborts the stream while it is in the "closing" state. Notably, since a stream always spends at least one turn in the "closing" state, code like ws.close(); ws.abort(...); will cause both to be called, even if the close method itself has no asynchronous behavior. A well-designed underlying sink object should be able to deal with this.
- Set this.[[state]] to "writable".
- Set this.[[storedError]], this.[[writer]], this.[[writableStreamController]], this.[[pendingAbortRequest]], this.[[pendingCloseRequest]], and this.[[pendingWriteRequest]] to undefined.
- Set this.[[writeRequests]] to a new empty List.
- Let type be ? GetV(underlyingSink, "type").
- If type is not undefined, throw a RangeError exception. This is to allow us to add new potential types in the future, without backward-compatibility concerns.
- Set this.[[writableStreamController]] to ? Construct(WritableStreamDefaultController, « this, underlyingSink, size, highWaterMark »).
4.2.4. Properties of the WritableStream Prototype
4.2.4.1. get locked
The locked getter returns whether or not the writable stream is locked to a writer.
- If ! IsWritableStream(this) is false, throw a TypeError exception.
- Return ! IsWritableStreamLocked(this).
4.2.4.2. abort(reason)
The abort method aborts the stream, signaling that the producer can
no longer successfully write to the stream and it should be immediately moved to an errored state, with any queued-up
writes discarded. This will also execute any abort mechanism of the underlying sink.
- If ! IsWritableStream(this) is false, return a promise rejected with a TypeError exception.
- If ! IsWritableStreamLocked(this) is true, return a promise rejected with a TypeError exception.
- Return ! WritableStreamAbort(this, reason).
4.2.4.3. getWriter()
The getWriter method creates a writer (an instance of WritableStreamDefaultWriter) and locks the stream to the new writer. While the
stream is locked, no other writer can be acquired until this one is released.
This functionality is especially useful for creating abstractions that desire the ability to write to a stream without interruption or interleaving. By getting a writer for the stream, you can ensure nobody else can write at the same time, which would cause the resulting written data to be unpredictable and probably useless.
- If ! IsWritableStream(this) is false, throw a TypeError exception.
- Return ? AcquireWritableStreamDefaultWriter(this).
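The locking behavior can be observed with a short sketch. It assumes an environment that exposes WritableStream as a global; the variable names are illustrative.

```javascript
const ws = new WritableStream();

const writer = ws.getWriter();
const lockedWhileHeld = ws.locked;

// A second writer cannot be acquired while the first one holds the lock.
let secondAttemptError = null;
try {
  ws.getWriter();
} catch (e) {
  secondAttemptError = e;
}

// Releasing the lock makes the stream available again.
writer.releaseLock();
const lockedAfterRelease = ws.locked;

console.log(lockedWhileHeld, secondAttemptError instanceof TypeError, lockedAfterRelease);
```

The second getWriter() call fails because constructing a WritableStreamDefaultWriter on an already locked stream throws a TypeError.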
4.3. General Writable Stream Abstract Operations
The following abstract operations, unlike most in this specification, are meant to be generally useful to other specifications, instead of just being part of the implementation of this spec’s classes.
4.3.1. AcquireWritableStreamDefaultWriter ( stream ) throws
- Return ? Construct(WritableStreamDefaultWriter, « stream »).
4.3.2. IsWritableStream ( x ) nothrow
- If Type(x) is not Object, return false.
- If x does not have a [[writableStreamController]] internal slot, return false.
- Return true.
4.3.3. IsWritableStreamLocked ( stream ) nothrow
This abstract operation is meant to be called from other specifications that may wish to query whether or not a writable stream is locked to a writer.
- Assert: ! IsWritableStream(stream) is true.
- If stream.[[writer]] is undefined, return false.
- Return true.
4.3.4. WritableStreamAbort ( stream, reason ) nothrow
- Let state be stream.[[state]].
- If state is "closed", return a promise resolved with undefined.
- If state is "errored", return a promise rejected with stream.[[storedError]].
- Assert: state is "writable" or "closing".
- Let error be a new TypeError indicating that the stream has been aborted.
- Perform ! WritableStreamError(stream, error).
- Let controller be stream.[[writableStreamController]].
- Assert: controller is not undefined.
- If controller.[[writing]] is true or controller.[[inClose]] is true,
  - Set stream.[[pendingAbortRequest]] to a new promise.
  - If controller.[[writing]] is true, return the result of transforming stream.[[pendingAbortRequest]] by a fulfillment handler that returns ! WritableStreamDefaultControllerAbort(controller, reason).
  - Otherwise, return stream.[[pendingAbortRequest]].
- Return ! WritableStreamDefaultControllerAbort(controller, reason).
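From a developer's point of view, the simplest path through the steps above (no write or close in progress) hands the reason to the underlying sink's abort method and fulfills the returned promise with undefined. A minimal sketch, where the demoAbort helper and the recording sink are hypothetical:

```javascript
async function demoAbort() {
  const seen = [];
  const ws = new WritableStream({
    // Hypothetical cleanup hook: record the reason instead of freeing a resource.
    abort(reason) {
      seen.push(reason);
    }
  });

  // The stream is not locked, so abort() can be called on it directly.
  const result = await ws.abort('user cancelled');
  return { seen, result };
}

demoAbort().then(({ seen, result }) => console.log(seen, result));
```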
4.4. Writable Stream Abstract Operations Used by Controllers
To allow future flexibility to add different writable stream behaviors (similar to the distinction between simple
readable streams and readable byte streams), much of the internal state of a writable stream is
encapsulated by the WritableStreamDefaultController class. At this point in time the division of work between the
stream and its controller may seem somewhat arbitrary, but centralizing much of the logic in the controller is a useful
structure for the future.
The abstract operations in this section are interfaces that are used by the controller implementation to affect its
associated WritableStream object, translating the controller’s internal state changes into developer-facing results
visible through the WritableStream's public API.
4.4.1. WritableStreamAddWriteRequest ( stream ) nothrow
- Assert: ! IsWritableStreamLocked(stream) is true.
- Assert: stream.[[state]] is "writable".
- Let promise be a new promise.
- Append promise as the last element of stream.[[writeRequests]].
- Return promise.
4.4.2. WritableStreamError ( stream, e ) nothrow
- Let oldState be stream.[[state]].
- Assert: oldState is "writable" or "closing".
- Set stream.[[state]] to "errored".
- Set stream.[[storedError]] to e.
- Let controller be stream.[[writableStreamController]].
- If controller is undefined, or both controller.[[writing]] and controller.[[inClose]] are false, perform ! WritableStreamRejectPromisesInReactionToError(stream).
- Let writer be stream.[[writer]].
- If writer is not undefined,
  - If oldState is "writable" and ! WritableStreamDefaultControllerGetBackpressure(stream.[[writableStreamController]]) is true, reject writer.[[readyPromise]] with e.
  - Otherwise, set writer.[[readyPromise]] to a promise rejected with e.
  - Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
4.4.3. WritableStreamFinishClose ( stream ) nothrow
- Assert: stream.[[state]] is "closing" or "errored".
- Let writer be stream.[[writer]].
- If stream.[[state]] is "closing",
  - If writer is not undefined, resolve writer.[[closedPromise]] with undefined.
  - Set stream.[[state]] to "closed".
- Otherwise if writer is not undefined,
  - Assert: stream.[[state]] is "errored".
  - Reject writer.[[closedPromise]] with stream.[[storedError]].
  - Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
- If stream.[[pendingAbortRequest]] is not undefined,
  - Resolve stream.[[pendingAbortRequest]] with undefined.
  - Set stream.[[pendingAbortRequest]] to undefined.
4.4.4. WritableStreamRejectPromisesInReactionToError ( stream ) nothrow
- Assert: stream.[[state]] is "errored".
- Assert: stream.[[pendingWriteRequest]] is undefined.
- Let storedError be stream.[[storedError]].
- Repeat for each writeRequest that is an element of stream.[[writeRequests]],
  - Reject writeRequest with storedError.
- Set stream.[[writeRequests]] to an empty List.
- If stream.[[pendingCloseRequest]] is not undefined,
  - Assert: stream.[[writableStreamController]].[[inClose]] is false.
  - Reject stream.[[pendingCloseRequest]] with storedError.
  - Set stream.[[pendingCloseRequest]] to undefined.
- Let writer be stream.[[writer]].
- If writer is not undefined,
  - Reject writer.[[closedPromise]] with storedError.
  - Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
4.4.5. WritableStreamUpdateBackpressure ( stream, backpressure ) nothrow
- Assert: stream.[[state]] is "writable".
- Let writer be stream.[[writer]].
- If writer is undefined, return.
- If backpressure is true,
  - Set writer.[[readyPromise]] to a new promise.
- Otherwise,
  - Assert: backpressure is false.
  - Resolve writer.[[readyPromise]] with undefined.
4.5. Class WritableStreamDefaultWriter
The WritableStreamDefaultWriter class represents a writable stream writer designed to be vended by a WritableStream instance.
4.5.1. Class Definition
This section is non-normative.
If one were to write the WritableStreamDefaultWriter class in something close to the syntax of [ECMASCRIPT], it
would look like
class WritableStreamDefaultWriter {
  constructor(stream)
  get closed()
  get desiredSize()
  get ready()
  abort(reason)
  close()
  releaseLock()
  write(chunk)
}
4.5.2. Internal Slots
Instances of WritableStreamDefaultWriter are created with the internal slots described in the following table:
| Internal Slot | Description (non-normative) |
|---|---|
| [[closedPromise]] | A promise returned by the writer’s closed getter |
| [[ownerWritableStream]] | A WritableStream instance that owns this writer |
| [[readyPromise]] | A promise returned by the writer’s ready getter |
4.5.3. new WritableStreamDefaultWriter(stream)
The WritableStreamDefaultWriter constructor is generally not meant to be used directly; instead, a
stream’s getWriter() method should be used.
- If ! IsWritableStream(stream) is false, throw a TypeError exception.
- If ! IsWritableStreamLocked(stream) is true, throw a TypeError exception.
- Set this.[[ownerWritableStream]] to stream.
- Set stream.[[writer]] to this.
- Let state be stream.[[state]].
- If state is "writable" or "closing",
  - Set this.[[closedPromise]] to a new promise.
- Otherwise if state is "closed",
  - Set this.[[closedPromise]] to a promise resolved with undefined.
- Otherwise,
  - Assert: state is "errored".
  - Set this.[[closedPromise]] to a promise rejected with stream.[[storedError]].
  - Set this.[[closedPromise]].[[PromiseIsHandled]] to true.
- If state is "writable" and ! WritableStreamDefaultControllerGetBackpressure(stream.[[writableStreamController]]) is true,
  - Set this.[[readyPromise]] to a new promise.
- Otherwise,
  - Set this.[[readyPromise]] to a promise resolved with undefined.
4.5.4. Properties of the WritableStreamDefaultWriter Prototype
4.5.4.1. get closed
The closed getter returns a promise that will be fulfilled when the stream becomes closed, or rejected if
the stream ever errors or the writer’s lock is released before the stream finishes
closing.
- If ! IsWritableStreamDefaultWriter(this) is false, return a promise rejected with a TypeError exception.
- Return this.[[closedPromise]].
4.5.4.2. get desiredSize
The desiredSize getter returns the desired size to
fill the stream’s internal queue. It can be negative, if the queue is over-full. A producer should use this
information to determine the right amount of data to write.
It will be null if the stream is errored, and 0 if the stream is closed. The getter will throw a TypeError exception if invoked after the writer’s lock has been released.
- If ! IsWritableStreamDefaultWriter(this) is false, throw a TypeError exception.
- If this.[[ownerWritableStream]] is undefined, throw a TypeError exception.
- Return ! WritableStreamDefaultWriterGetDesiredSize(this).
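To illustrate how the desired size moves as chunks are queued, the sketch below writes three chunks to a stream whose high water mark is 2 and samples desiredSize after each write. The no-op sink is an assumption made for the example; all samples are taken synchronously, before the sink has processed anything.

```javascript
const ws = new WritableStream(
  {
    write(chunk) {
      // Hypothetical sink that accepts every chunk without doing any work.
    }
  },
  new CountQueuingStrategy({ highWaterMark: 2 })
);
const writer = ws.getWriter();

// Sample the desired size before any write and after each queued chunk.
const sizes = [writer.desiredSize];
for (const chunk of ['a', 'b', 'c']) {
  writer.write(chunk);
  sizes.push(writer.desiredSize);
}

console.log(sizes); // [ 2, 1, 0, -1 ]
```

The final value is negative because the third chunk was queued even though the queue was already full; a producer that respects backpressure would stop writing once the value reaches zero.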
4.5.4.3. get ready
The ready getter returns a promise that will be fulfilled when the desired size to fill the stream’s internal queue transitions from nonpositive to positive,
signaling that it is no longer applying backpressure. Once the desired size to fill the stream’s internal queue dips back to zero or below, the getter will return a new
promise that stays pending until the next transition.
If the stream becomes errored, or the writer’s lock is released, the returned promise will become rejected.
- If ! IsWritableStreamDefaultWriter(this) is false, return a promise rejected with a TypeError exception.
- Return this.[[readyPromise]].
4.5.4.4. abort(reason)
If the writer is active, the abort method behaves the same as that for the
associated stream. (Otherwise, it returns a rejected promise.)
- If ! IsWritableStreamDefaultWriter(this) is false, return a promise rejected with a TypeError exception.
- If this.[[ownerWritableStream]] is undefined, return a promise rejected with a TypeError exception.
- Return ! WritableStreamDefaultWriterAbort(this, reason).
4.5.4.5. close()
The close method will close the associated writable stream. The underlying sink will finish
processing any previously-written chunks, before invoking its close behavior. During this time any further
attempts to write will fail (without erroring the stream).
The method returns a promise that is fulfilled with undefined if the stream closes successfully, or rejected if an error is encountered during this process. (Compare the promise returned by closed.)
- If ! IsWritableStreamDefaultWriter(this) is false, return a promise rejected with a TypeError exception.
- Let stream be this.[[ownerWritableStream]].
- If stream is undefined, return a promise rejected with a TypeError exception.
- If stream.[[state]] is "closing", return a promise rejected with a TypeError exception.
- Return ! WritableStreamDefaultWriterClose(this).
4.5.4.6. releaseLock()
The releaseLock method releases the writer’s lock on the corresponding
stream. After the lock is released, the writer is no longer active. If the associated
stream is errored when the lock is released, the writer will appear errored in the same way from now on; otherwise,
the writer will appear closed.
Note that the lock can still be released even if some ongoing writes have not yet finished (i.e. even if the promises
returned from previous calls to write() have not yet settled). It’s not required to
hold the lock on the writer for the duration of the write; the lock instead simply prevents other producers from writing in an interleaved manner.
- If ! IsWritableStreamDefaultWriter(this) is false, throw a TypeError exception.
- Let stream be this.[[ownerWritableStream]].
- If stream is undefined, return.
- Assert: stream.[[writer]] is not undefined.
- Perform ! WritableStreamDefaultWriterRelease(this).
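A released writer is detached from its stream, which the following sketch makes visible. It relies only on behavior stated in this section: desiredSize throws once the writer has no owner stream, and a repeated releaseLock() returns early. The variable names are illustrative.

```javascript
const ws = new WritableStream();
const writer = ws.getWriter();
writer.releaseLock();

// The released writer no longer has an owner stream, so desiredSize throws.
let desiredSizeError = null;
try {
  writer.desiredSize;
} catch (e) {
  desiredSizeError = e;
}

// Calling releaseLock() again is a no-op rather than an error.
writer.releaseLock();

// The stream itself is unaffected and can hand out a fresh writer.
const freshWriter = ws.getWriter();
console.log(desiredSizeError instanceof TypeError, freshWriter.desiredSize);
```

The fresh writer reports a desired size of 1 here because no strategy was supplied, so the default high water mark of 1 applies.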
4.5.4.7. write(chunk)
The write method writes the given chunk to the writable stream, by waiting until any previous
writes have finished successfully, and then sending the chunk to the underlying sink. It will return a
promise that fulfills with undefined upon a successful write, or rejects if the write fails or the stream becomes errored before the write is processed.
Note that what "success" means is up to the underlying sink; it may indicate simply that the chunk has been accepted, and not necessarily that it is safely saved to its ultimate destination.
- If ! IsWritableStreamDefaultWriter(this) is false, return a promise rejected with a TypeError exception.
- Let stream be this.[[ownerWritableStream]].
- If stream is undefined, return a promise rejected with a TypeError exception.
- If stream.[[state]] is "closing", return a promise rejected with a TypeError exception.
- Return ! WritableStreamDefaultWriterWrite(this, chunk).
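The writer's ready, write(), and close() members are typically used together. The sketch below is one possible producer loop, not a prescribed pattern: the produce helper and the timer-based slow sink are hypothetical, and the default high water mark of 1 is assumed.

```javascript
async function produce(chunks) {
  const received = [];
  const ws = new WritableStream({
    // Hypothetical slow sink: each write takes one timer tick to finish.
    write(chunk) {
      return new Promise(resolve => setTimeout(() => {
        received.push(chunk);
        resolve();
      }, 0));
    },
    close() {
      received.push('[closed]');
    }
  });

  const writer = ws.getWriter();
  const pending = [];
  for (const chunk of chunks) {
    await writer.ready;                 // wait until backpressure is relieved
    pending.push(writer.write(chunk));  // queue the chunk for the sink
  }
  await writer.close();                 // runs after all queued writes succeed
  await Promise.all(pending);
  return received;
}

produce(['a', 'b', 'c']).then(received => console.log(received));
```

Waiting on ready before each write keeps the producer from queuing more than the stream asked for; the sink still sees the chunks strictly in order, followed by its close call.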
4.6. Writable Stream Writer Abstract Operations
4.6.1. IsWritableStreamDefaultWriter ( x ) nothrow
- If Type(x) is not Object, return false.
- If x does not have an [[ownerWritableStream]] internal slot, return false.
- Return true.
4.6.2. WritableStreamDefaultWriterAbort ( writer, reason ) nothrow
- Let stream be writer.[[ownerWritableStream]].
- Assert: stream is not undefined.
- Return ! WritableStreamAbort(stream, reason).
4.6.3. WritableStreamDefaultWriterClose ( writer ) nothrow
- Let stream be writer.[[ownerWritableStream]].
- Assert: stream is not undefined.
- Let state be stream.[[state]].
- If state is "closed" or "errored", return a promise rejected with a TypeError exception.
- Assert: state is "writable".
- Set stream.[[pendingCloseRequest]] to a new promise.
- If ! WritableStreamDefaultControllerGetBackpressure(stream.[[writableStreamController]]) is true, resolve writer.[[readyPromise]] with undefined.
- Set stream.[[state]] to "closing".
- Perform ! WritableStreamDefaultControllerClose(stream.[[writableStreamController]]).
- Return stream.[[pendingCloseRequest]].
4.6.4. WritableStreamDefaultWriterCloseWithErrorPropagation ( writer ) nothrow
This abstract operation helps implement the error propagation semantics of pipeTo().
- Let stream be writer.[[ownerWritableStream]].
- Assert: stream is not undefined.
- Let state be stream.[[state]].
- If state is "closing" or "closed", return a promise resolved with undefined.
- If state is "errored", return a promise rejected with stream.[[storedError]].
- Assert: state is "writable".
- Return ! WritableStreamDefaultWriterClose(writer).
4.6.5. WritableStreamDefaultWriterGetDesiredSize ( writer ) nothrow
- Let stream be writer.[[ownerWritableStream]].
- Let state be stream.[[state]].
- If state is "errored", return null.
- If state is "closed", return 0.
- Return ! WritableStreamDefaultControllerGetDesiredSize(stream.[[writableStreamController]]).
4.6.6. WritableStreamDefaultWriterRelease ( writer ) nothrow
- Let stream be writer.[[ownerWritableStream]].
- Assert: stream is not undefined.
- Assert: stream.[[writer]] is writer.
- Let releasedError be a new TypeError.
- Let state be stream.[[state]].
- If state is "writable" or "closing", or stream.[[pendingAbortRequest]] is not undefined, reject writer.[[closedPromise]] with releasedError.
- Otherwise, set writer.[[closedPromise]] to a promise rejected with releasedError.
- Set writer.[[closedPromise]].[[PromiseIsHandled]] to true.
- If state is "writable" and ! WritableStreamDefaultControllerGetBackpressure(stream.[[writableStreamController]]) is true, reject writer.[[readyPromise]] with releasedError.
- Otherwise, set writer.[[readyPromise]] to a promise rejected with releasedError.
- Set writer.[[readyPromise]].[[PromiseIsHandled]] to true.
- Set stream.[[writer]] to undefined.
- Set writer.[[ownerWritableStream]] to undefined.
4.6.7. WritableStreamDefaultWriterWrite ( writer, chunk ) nothrow
- Let stream be writer.[[ownerWritableStream]].
- Assert: stream is not undefined.
- Let state be stream.[[state]].
- If state is "closed" or "errored", return a promise rejected with a TypeError exception.
- Assert: state is "writable".
- Let promise be ! WritableStreamAddWriteRequest(stream).
- Perform ! WritableStreamDefaultControllerWrite(stream.[[writableStreamController]], chunk).
- Return promise.
4.7. Class WritableStreamDefaultController
The WritableStreamDefaultController class has methods that allow control of a WritableStream's state. When
constructing a WritableStream, the underlying sink is given a corresponding WritableStreamDefaultController instance to manipulate.
4.7.1. Class Definition
This section is non-normative.
If one were to write the WritableStreamDefaultController class in something close to the syntax of [ECMASCRIPT],
it would look like
class WritableStreamDefaultController {
  constructor(stream, underlyingSink, size, highWaterMark)
  error(e)
}
4.7.2. Internal Slots
Instances of WritableStreamDefaultController are created with the internal slots described in the following table:
| Internal Slot | Description (non-normative) |
|---|---|
| [[controlledWritableStream]] | The WritableStream instance controlled |
| [[inClose]] | A boolean flag set to true while the underlying sink’s close method is executing and has not yet fulfilled, used to prevent the abort() method from interrupting close |
| [[queue]] | A List representing the stream’s internal queue of chunks |
| [[started]] | A boolean flag indicating whether the underlying sink has finished starting |
| [[strategyHWM]] | A number supplied to the constructor as part of the stream’s queuing strategy, indicating the point at which the stream will apply backpressure to its underlying sink |
| [[strategySize]] | A function supplied to the constructor as part of the stream’s queuing strategy, designed to calculate the size of enqueued chunks; can be undefined, in which case every chunk counts as size 1 |
| [[underlyingSink]] | An object representation of the stream’s underlying sink; also used for the IsWritableStreamDefaultController brand check |
| [[writing]] | A boolean flag set to true while the underlying sink’s write method is executing and has not yet fulfilled, used to prevent reentrant calls |
4.7.3. new WritableStreamDefaultController(stream, underlyingSink, size, highWaterMark)
The WritableStreamDefaultController constructor cannot be used directly; it only works on a WritableStream that is in the middle of being constructed.
- If ! IsWritableStream(stream) is false, throw a TypeError exception.
- If stream.[[writableStreamController]] is not undefined, throw a TypeError exception.
- Set this.[[controlledWritableStream]] to stream.
- Set this.[[underlyingSink]] to underlyingSink.
- Set this.[[queue]] to a new empty List.
- Set this.[[started]], this.[[writing]], and this.[[inClose]] to false.
- Let normalizedStrategy be ? ValidateAndNormalizeQueuingStrategy(size, highWaterMark).
- Set this.[[strategySize]] to normalizedStrategy.[[size]] and this.[[strategyHWM]] to normalizedStrategy.[[highWaterMark]].
- Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(this).
- If backpressure is true, perform ! WritableStreamUpdateBackpressure(stream, backpressure).
- Let controller be this.
- Let startResult be ? InvokeOrNoop(underlyingSink, "start", « this »).
- Let startPromise be a promise resolved with startResult:
  - Upon fulfillment of startPromise,
    - Set controller.[[started]] to true.
    - Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
  - Upon rejection of startPromise with reason r,
    - Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, r).
4.7.4. Properties of the WritableStreamDefaultController Prototype
4.7.4.1. error(e)
The error method will error the writable stream, making all future interactions with it fail with the
given error e.
This method is rarely used, since usually it suffices to return a rejected promise from one of the underlying sink’s methods. However, it can be useful for suddenly shutting down a stream in response to an event outside the normal lifecycle of interactions with the underlying sink.
- If ! IsWritableStreamDefaultController(this) is false, throw a TypeError exception.
- Let state be this.[[controlledWritableStream]].[[state]].
- If state is "closed" or "errored", throw a TypeError exception.
- Perform ! WritableStreamDefaultControllerError(this, e).
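The out-of-band case mentioned above might look like the following sketch, in which the sink keeps a reference to its controller and errors the stream from outside any sink method. The sinkController variable and the 'connection lost' failure are hypothetical.

```javascript
let sinkController;
const ws = new WritableStream({
  start(controller) {
    // Keep the controller so an out-of-band event can reach the stream.
    sinkController = controller;
  }
});
const writer = ws.getWriter();
// Observe the failure so it is not reported as an unhandled rejection.
writer.closed.catch(e => console.log('closed rejected:', e.message));

// Hypothetical external failure, e.g. a dropped connection.
sinkController.error(new Error('connection lost'));

// Once the stream has been errored, the writer reports a null desired size.
console.log(writer.desiredSize);
```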
4.8. Writable Stream Default Controller Abstract Operations
4.8.1. IsWritableStreamDefaultController ( x ) nothrow
- If Type(x) is not Object, return false.
- If x does not have an [[underlyingSink]] internal slot, return false.
- Return true.
4.8.2. WritableStreamDefaultControllerAbort ( controller, reason ) nothrow
- Set controller.[[queue]] to a new empty List.
- Let sinkAbortPromise be ! PromiseInvokeOrNoop(controller.[[underlyingSink]], "abort", « reason »).
- Return the result of transforming sinkAbortPromise by a fulfillment handler that returns undefined.
4.8.3. WritableStreamDefaultControllerClose ( controller ) nothrow
- Perform ! EnqueueValueWithSize(controller.[[queue]], "close", 0).
- Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
4.8.4. WritableStreamDefaultControllerGetDesiredSize ( controller ) nothrow
- Let queueSize be ! GetTotalQueueSize(controller.[[queue]]).
- Return controller.[[strategyHWM]] − queueSize.
4.8.5. WritableStreamDefaultControllerWrite ( controller, chunk ) nothrow
- Let stream be controller.[[controlledWritableStream]].
- Assert: stream.[[state]] is "writable".
- Let chunkSize be 1.
- If controller.[[strategySize]] is not undefined,
  - Set chunkSize to Call(controller.[[strategySize]], undefined, « chunk »).
  - If chunkSize is an abrupt completion,
    - Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, chunkSize.[[Value]]).
    - Return.
- Let writeRecord be Record {[[chunk]]: chunk}.
- Let lastBackpressure be ! WritableStreamDefaultControllerGetBackpressure(controller).
- Let enqueueResult be EnqueueValueWithSize(controller.[[queue]], writeRecord, chunkSize).
- If enqueueResult is an abrupt completion,
  - Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, enqueueResult.[[Value]]).
  - Return.
- If stream.[[state]] is "writable",
  - Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller).
  - If lastBackpressure is not backpressure, perform ! WritableStreamUpdateBackpressure(stream, backpressure).
- Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
4.8.6. WritableStreamDefaultControllerAdvanceQueueIfNeeded ( controller ) nothrow
- If controller.[[controlledWritableStream]].[[state]] is "closed" or "errored", return.
- If controller.[[started]] is false, return.
- If controller.[[writing]] is true, return.
- If controller.[[queue]] is empty, return.
- Let writeRecord be ! PeekQueueValue(controller.[[queue]]).
- If writeRecord is "close", perform WritableStreamDefaultControllerProcessClose(controller).
- Otherwise, perform WritableStreamDefaultControllerProcessWrite(controller, writeRecord.[[chunk]]).
4.8.7. WritableStreamDefaultControllerErrorIfNeeded ( controller, e ) nothrow
- If controller.[[controlledWritableStream]].[[state]] is "writable" or "closing", perform ! WritableStreamDefaultControllerError(controller, e).
4.8.8. WritableStreamDefaultControllerProcessClose ( controller ) nothrow
- Let stream be controller.[[controlledWritableStream]].
- Assert: stream.[[state]] is "closing".
- Perform ! DequeueValue(controller.[[queue]]).
- Assert: controller.[[queue]] is empty.
- Set controller.[[inClose]] to true.
- Let sinkClosePromise be ! PromiseInvokeOrNoop(controller.[[underlyingSink]], "close", « controller »).
- Upon fulfillment of sinkClosePromise,
  - Assert: controller.[[inClose]] is true.
  - Set controller.[[inClose]] to false.
  - Assert: stream.[[state]] is "closing" or "errored".
  - Assert: stream.[[pendingCloseRequest]] is not undefined.
  - Resolve stream.[[pendingCloseRequest]] with undefined.
  - Set stream.[[pendingCloseRequest]] to undefined.
  - Perform ! WritableStreamFinishClose(stream).
- Upon rejection of sinkClosePromise with reason r,
  - Assert: controller.[[inClose]] is true.
  - Set controller.[[inClose]] to false.
  - Assert: stream.[[pendingCloseRequest]] is not undefined.
  - Reject stream.[[pendingCloseRequest]] with r.
  - Set stream.[[pendingCloseRequest]] to undefined.
  - If stream.[[pendingAbortRequest]] is not undefined,
    - Reject stream.[[pendingAbortRequest]] with r.
    - Set stream.[[pendingAbortRequest]] to undefined.
  - Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, r).
4.8.9. WritableStreamDefaultControllerProcessWrite ( controller, chunk ) nothrow
- Set controller.[[writing]] to true.
- Let stream be controller.[[controlledWritableStream]].
- Assert: stream.[[pendingWriteRequest]] is undefined.
- Assert: stream.[[writeRequests]] is not empty.
- Let writeRequest be the first element of stream.[[writeRequests]].
- Remove writeRequest from stream.[[writeRequests]], shifting all other elements downward (so that the second becomes the first, and so on).
- Set stream.[[pendingWriteRequest]] to writeRequest.
- Let sinkWritePromise be ! PromiseInvokeOrNoop(controller.[[underlyingSink]], "write", « chunk, controller »).
- Upon fulfillment of sinkWritePromise,
  - Assert: controller.[[writing]] is true.
  - Set controller.[[writing]] to false.
  - Assert: stream.[[pendingWriteRequest]] is not undefined.
  - Resolve stream.[[pendingWriteRequest]] with undefined.
  - Set stream.[[pendingWriteRequest]] to undefined.
  - Let state be stream.[[state]].
  - If state is "errored",
    - Perform ! WritableStreamRejectPromisesInReactionToError(stream).
    - If stream.[[pendingAbortRequest]] is not undefined,
      - Resolve stream.[[pendingAbortRequest]] with undefined.
      - Set stream.[[pendingAbortRequest]] to undefined.
    - Return.
  - Let lastBackpressure be ! WritableStreamDefaultControllerGetBackpressure(controller).
  - Perform ! DequeueValue(controller.[[queue]]).
  - If state is not "closing",
    - Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller).
    - If lastBackpressure is not backpressure, perform ! WritableStreamUpdateBackpressure(controller.[[controlledWritableStream]], backpressure).
  - Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller).
- Upon rejection of sinkWritePromise with r,
  - Assert: controller.[[writing]] is true.
  - Set controller.[[writing]] to false.
  - Assert: stream.[[pendingWriteRequest]] is not undefined.
  - Reject stream.[[pendingWriteRequest]] with r.
  - Set stream.[[pendingWriteRequest]] to undefined.
  - If stream.[[state]] is "errored",
    - Set stream.[[storedError]] to r.
    - Perform ! WritableStreamRejectPromisesInReactionToError(stream).
  - If stream.[[pendingAbortRequest]] is not undefined,
    - Reject stream.[[pendingAbortRequest]] with r.
    - Set stream.[[pendingAbortRequest]] to undefined.
  - Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, r).
4.8.10. WritableStreamDefaultControllerGetBackpressure ( controller ) nothrow
- Let desiredSize be ! WritableStreamDefaultControllerGetDesiredSize(controller).
- Return desiredSize ≤ 0.
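The relationship between the high water mark, the queue, and the backpressure flag can be modeled in a few lines. This is an illustrative model only: the function names are hypothetical, and the queue is represented as a plain array of { value, size } records rather than the List used by this standard.

```javascript
// Illustrative model only: `queue` is an array of { value, size } records
// and `strategyHWM` is the high water mark from the queuing strategy.
function getDesiredSize(strategyHWM, queue) {
  const queueSize = queue.reduce((total, record) => total + record.size, 0);
  return strategyHWM - queueSize;
}

// Backpressure applies whenever the desired size is zero or negative.
function getBackpressure(strategyHWM, queue) {
  return getDesiredSize(strategyHWM, queue) <= 0;
}

const queue = [{ value: 'a', size: 1 }, { value: 'b', size: 1 }];
console.log(getDesiredSize(3, queue), getBackpressure(3, queue)); // 1 false
console.log(getDesiredSize(2, queue), getBackpressure(2, queue)); // 0 true
```

Note that a desired size of exactly zero already counts as backpressure, which is why a stream with the default high water mark of 1 signals backpressure as soon as a single chunk is queued.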
4.8.11. WritableStreamDefaultControllerError ( controller, e ) nothrow
- Let stream be controller.[[controlledWritableStream]].
- Assert: stream.[[state]] is "writable" or "closing".
- Perform ! WritableStreamError(stream, e).
- Set controller.[[queue]] to a new empty List.
5. Transform Streams
Transform streams have been developed in the testable implementation, but not yet re-encoded in spec language. We are waiting to validate their design before doing so. In the meantime, see reference-implementation/lib/transform-stream.js.
6. Other Stream APIs and Operations
6.1. Class ByteLengthQueuingStrategy
A common queuing strategy when dealing with bytes is to wait until the accumulated byteLength properties of the incoming chunks reach a specified high-water mark. As such, this is provided as a built-in queuing strategy that can be used when constructing streams.
const stream = new ReadableStream(
{ ... },
new ByteLengthQueuingStrategy({ highWaterMark: 16 * 1024 })
);
In this case, 16 KiB worth of chunks can be enqueued by the readable stream’s underlying source before the readable stream implementation starts sending backpressure signals to the underlying source.
const stream = new WritableStream(
{ ... },
new ByteLengthQueuingStrategy({ highWaterMark: 32 * 1024 })
);
In this case, 32 KiB worth of chunks can be accumulated in the writable stream’s internal queue, waiting for previous writes to the underlying sink to finish, before the writable stream starts sending backpressure signals to any producers.
6.1.1. Class Definition
This section is non-normative.
If one were to write the ByteLengthQueuingStrategy class in something close to the syntax of [ECMASCRIPT], it
would look like
class ByteLengthQueuingStrategy {
  constructor({ highWaterMark })
  size(chunk)
}
Each ByteLengthQueuingStrategy instance will additionally have an own data property highWaterMark set by its constructor.
6.1.2. new ByteLengthQueuingStrategy({ highWaterMark })
- Perform ! CreateDataProperty(this, "highWaterMark", highWaterMark).
6.1.3. Properties of the ByteLengthQueuingStrategy Prototype
6.1.3.1. size(chunk)
The size method returns the given chunk’s byteLength property. (If the chunk doesn’t have
one, it will return undefined.)
This method is intentionally generic; it does not require that its this value be a ByteLengthQueuingStrategy object.
- Return ? GetV(chunk, "byteLength").
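A short sketch of how the size method behaves for different chunks follows. The chunk values are arbitrary examples; any object with a byteLength property is measured the same way.

```javascript
const strategy = new ByteLengthQueuingStrategy({ highWaterMark: 1024 });

const fromTypedArray = strategy.size(new Uint8Array(16));
const fromPlainObject = strategy.size({ byteLength: 5 });
const fromMissing = strategy.size({});

// The method is generic, so it also works when detached from the strategy.
const detachedSize = strategy.size;
const fromDetached = detachedSize(new Uint8Array(4));

console.log(strategy.highWaterMark, fromTypedArray, fromPlainObject, fromMissing, fromDetached);
```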
6.2. Class CountQueuingStrategy
A common queuing strategy when dealing with streams of generic objects is to simply count the number of chunks that have been accumulated so far, waiting until this number reaches a specified high-water mark. As such, this strategy is also provided out of the box.
const stream = new ReadableStream(
{ ... },
new CountQueuingStrategy({ highWaterMark: 10 })
);
In this case, 10 chunks (of any kind) can be enqueued by the readable stream’s underlying source before the readable stream implementation starts sending backpressure signals to the underlying source.
const stream = new WritableStream(
{ ... },
new CountQueuingStrategy({ highWaterMark: 5 })
);
In this case, five chunks (of any kind) can be accumulated in the writable stream’s internal queue, waiting for previous writes to the underlying sink to finish, before the writable stream starts sending backpressure signals to any producers.
6.2.1. Class Definition
This section is non-normative.
If one were to write the CountQueuingStrategy class in something close to the syntax of [ECMASCRIPT], it would
look like
class CountQueuingStrategy {
constructor({ highWaterMark })
size()
}
Each CountQueuingStrategy instance will additionally have an own data property highWaterMark set by its constructor.
6.2.2. new CountQueuingStrategy({ highWaterMark })
- Perform ! CreateDataProperty(this, "highWaterMark", highWaterMark).
6.2.3. Properties of the CountQueuingStrategy Prototype
6.2.3.1. size()
The size method always returns one, so that the total queue size is a count of the number of chunks in
the queue.
This method is intentionally generic; it does not require that its this value be a CountQueuingStrategy object.
- Return 1.
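As a non-normative illustration of why a constant size of one turns the total queue size into a chunk count, consider the sketch below. CountQueuingStrategySketch is a hypothetical stand-in; plain assignment is used in place of CreateDataProperty, which behaves the same on a freshly created object.

```javascript
// Hypothetical stand-in for the class specified above; not the real implementation.
class CountQueuingStrategySketch {
  constructor({ highWaterMark }) {
    this.highWaterMark = highWaterMark;
  }

  size() {
    return 1;
  }
}

const strategy = new CountQueuingStrategySketch({ highWaterMark: 10 });

// Chunks of any kind each contribute exactly one to the total.
const chunks = ["text", { any: "object" }, new Uint8Array(4096), null];
const totalSize = chunks.reduce((sum, chunk) => sum + strategy.size(chunk), 0); // 4
const remaining = strategy.highWaterMark - totalSize; // 6

// Generic: the method can be detached from its strategy object.
const { size } = strategy;
const detachedResult = size(); // 1
```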
6.3. Queue-with-Sizes Operations
The streams in this specification use a "queue-with-sizes" data structure to store queued up values, along with their
determined sizes. A queue-with-sizes is a List of Records with [[value]] and [[size]] fields.
A number of abstract operations are specified here to make working with queues-with-sizes more pleasant, and are used throughout the rest of this standard.
6.3.1. DequeueValue ( queue ) nothrow
- Assert: queue is not empty.
- Let pair be the first element of queue.
- Remove pair from queue, shifting all other elements downward (so that the second becomes the first, and so on).
- Return pair.[[value]].
6.3.2. EnqueueValueWithSize ( queue, value, size ) throws
- Let size be ? ToNumber(size).
- If ! IsFiniteNonNegativeNumber(size) is false, throw a RangeError exception.
- Append Record {[[value]]: value, [[size]]: size} as the last element of queue.
6.3.3. GetTotalQueueSize ( queue ) nothrow
- Let totalSize be 0.
- Repeat for each Record {[[value]], [[size]]} pair that is an element of queue,
  - Assert: pair.[[size]] is a finite, non-NaN number.
  - Set totalSize to totalSize + pair.[[size]].
- Return totalSize.
6.3.4. PeekQueueValue ( queue ) nothrow
- Assert: queue is not empty.
- Let pair be the first element of queue.
- Return pair.[[value]].
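The four queue-with-sizes operations can be sketched over an ordinary JavaScript array. This is a non-normative approximation: the function names are camel-cased versions of the abstract operations, Number() stands in for ToNumber, and spec assertions are modeled as thrown errors. The helper isFiniteNonNegativeNumber mirrors the IsFiniteNonNegativeNumber operation used by EnqueueValueWithSize.

```javascript
function isFiniteNonNegativeNumber(v) {
  if (Number.isNaN(v)) return false;
  if (v === Infinity) return false;
  if (v < 0) return false;
  return true;
}

function enqueueValueWithSize(queue, value, size) {
  size = Number(size); // approximates ToNumber(size)
  if (!isFiniteNonNegativeNumber(size)) {
    throw new RangeError("Size must be a finite, non-NaN, non-negative number.");
  }
  queue.push({ value, size });
}

function dequeueValue(queue) {
  if (queue.length === 0) throw new Error("Assertion failed: queue is not empty");
  const pair = queue.shift(); // removes the first element, shifting the rest down
  return pair.value;
}

function getTotalQueueSize(queue) {
  let totalSize = 0;
  for (const pair of queue) {
    totalSize += pair.size;
  }
  return totalSize;
}

function peekQueueValue(queue) {
  if (queue.length === 0) throw new Error("Assertion failed: queue is not empty");
  return queue[0].value;
}

const queue = [];
enqueueValueWithSize(queue, "a", 1);
enqueueValueWithSize(queue, "b", "2.5"); // the size is converted to a number
const totalBefore = getTotalQueueSize(queue); // 3.5
const peeked = peekQueueValue(queue);         // "a", and the queue is unchanged
const dequeued = dequeueValue(queue);         // "a"
const totalAfter = getTotalQueueSize(queue);  // 2.5

let rejectedInvalidSize = false;
try {
  enqueueValueWithSize(queue, "c", undefined); // undefined converts to NaN
} catch (e) {
  rejectedInvalidSize = e instanceof RangeError;
}
```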
6.4. Miscellaneous Operations
A few abstract operations are used in this specification for utility purposes. We define them here.
6.4.1. InvokeOrNoop ( O, P, args ) throws
- Assert: O is not undefined.
- Assert: ! IsPropertyKey(P) is true.
- Assert: args is a List.
- Let method be ? GetV(O, P).
- If method is undefined, return undefined.
- Return ? Call(method, O, args).
6.4.2. IsFiniteNonNegativeNumber ( v ) nothrow
- If v is NaN, return false.
- If v is +∞, return false.
- If v < 0, return false.
- Return true.
6.4.3. PromiseInvokeOrNoop ( O, P, args ) nothrow
- Assert: O is not undefined.
- Assert: ! IsPropertyKey(P) is true.
- Assert: args is a List.
- Let returnValue be InvokeOrNoop(O, P, args).
- If returnValue is an abrupt completion, return a promise rejected with returnValue.[[Value]].
- Otherwise, return a promise resolved with returnValue.[[Value]].
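InvokeOrNoop and PromiseInvokeOrNoop can be approximated in JavaScript as follows. This is a non-normative sketch: the function names and the sample sink object are illustrative, property access stands in for GetV, and Reflect.apply stands in for Call. The essential difference is that the promise-returning variant never throws synchronously; an exception becomes a rejected promise.

```javascript
function invokeOrNoop(O, P, args) {
  if (O === undefined) throw new Error("Assertion failed: O is not undefined");
  const method = O[P]; // approximates GetV(O, P)
  if (method === undefined) return undefined;
  return Reflect.apply(method, O, args); // approximates Call(method, O, args)
}

function promiseInvokeOrNoop(O, P, args) {
  if (O === undefined) throw new Error("Assertion failed: O is not undefined");
  try {
    return Promise.resolve(invokeOrNoop(O, P, args));
  } catch (e) {
    return Promise.reject(e);
  }
}

// Illustrative object with one working method, one throwing method,
// and no abort method at all.
const sink = {
  prefix: ">",
  write(chunk) {
    return this.prefix + chunk;
  },
  close() {
    throw new Error("boom");
  }
};

const called = invokeOrNoop(sink, "write", ["x"]);  // ">x"
const missing = invokeOrNoop(sink, "abort", []);    // undefined

const outcomes = Promise.all([
  promiseInvokeOrNoop(sink, "write", ["y"]),
  promiseInvokeOrNoop(sink, "abort", []),
  promiseInvokeOrNoop(sink, "close", []).catch(e => "rejected: " + e.message)
]);
```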
6.4.4. ValidateAndNormalizeHighWaterMark ( highWaterMark ) throws
- Set highWaterMark to ? ToNumber(highWaterMark).
- If highWaterMark is NaN or highWaterMark < 0, throw a RangeError exception.
  Note: +∞ is explicitly allowed as a valid high water mark. It causes backpressure to never be applied.
- Return highWaterMark.
6.4.5. ValidateAndNormalizeQueuingStrategy ( size, highWaterMark ) throws
- If size is not undefined and ! IsCallable(size) is false, throw a TypeError exception.
- Let highWaterMark be ? ValidateAndNormalizeHighWaterMark(highWaterMark).
- Return Record {[[size]]: size, [[highWaterMark]]: highWaterMark}.
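The two validation operations can be sketched in JavaScript as shown below. This is a non-normative approximation with illustrative function names; Number() stands in for ToNumber and a typeof check stands in for IsCallable.

```javascript
function validateAndNormalizeHighWaterMark(highWaterMark) {
  highWaterMark = Number(highWaterMark); // approximates ToNumber(highWaterMark)
  if (Number.isNaN(highWaterMark) || highWaterMark < 0) {
    throw new RangeError("highWaterMark must be a non-negative, non-NaN number.");
  }
  return highWaterMark;
}

function validateAndNormalizeQueuingStrategy(size, highWaterMark) {
  if (size !== undefined && typeof size !== "function") {
    throw new TypeError("size property of a queuing strategy must be a function.");
  }
  highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark);
  return { size, highWaterMark };
}

// Collects the name of the error thrown by fn, or null if it does not throw.
function errorNameOf(fn) {
  try {
    fn();
    return null;
  } catch (e) {
    return e.name;
  }
}

const normalized = validateAndNormalizeQueuingStrategy(chunk => chunk.length, "16");
const unbounded = validateAndNormalizeHighWaterMark(Infinity); // allowed: no backpressure

const nanError = errorNameOf(() => validateAndNormalizeHighWaterMark(NaN));
const negativeError = errorNameOf(() => validateAndNormalizeHighWaterMark(-1));
const sizeError = errorNameOf(() => validateAndNormalizeQueuingStrategy("not callable", 1));
```

Note that an undefined high-water mark converts to NaN and is rejected, so callers are expected to supply a default before validating.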
7. Global Properties
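The following constructors must be exposed on the global object as data properties of the same name:
- ByteLengthQueuingStrategy
- CountQueuingStrategy
- ReadableStream
- WritableStream
The attributes of these properties must be { [[Writable]]: true, [[Enumerable]]: false, [[Configurable]]: true }.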
The ReadableStreamDefaultReader, ReadableStreamBYOBReader, ReadableStreamDefaultController, ReadableByteStreamController, WritableStreamDefaultWriter, and WritableStreamDefaultController classes are
specifically not exposed, as they are not independently useful.
8. Examples of Creating Streams
This section, and all its subsections, are non-normative.
The previous examples throughout the standard have focused on how to use streams. Here we show how to create a stream,
using the ReadableStream or WritableStream constructors.
8.1. A readable stream with an underlying push source (no backpressure support)
The following function creates readable streams that wrap WebSocket instances [HTML], which are push sources that do not support backpressure signals. It illustrates how, when adapting a push source, usually most of the work
happens in the start function.
function makeReadableWebSocketStream(url, protocols) {
const ws = new WebSocket(url, protocols);
ws.binaryType = "arraybuffer";
return new ReadableStream({
start(controller) {
ws.onmessage = event => controller.enqueue(event.data);
ws.onclose = () => controller.close();
ws.onerror = () => controller.error(new Error("The WebSocket errored!"));
},
cancel() {
ws.close();
}
});
}
We can then use this function to create readable streams for a web socket, and pipe that stream to an arbitrary writable stream:
const webSocketStream = makeReadableWebSocketStream("wss://example.com:443/", "protocol");
webSocketStream.pipeTo(writableStream)
.then(() => console.log("All data successfully written!"))
.catch(e => console.error("Something went wrong!", e));
8.2. A readable stream with an underlying push source and backpressure support
The following function returns readable streams that wrap "backpressure sockets," which are hypothetical objects
that have the same API as web sockets, but also provide the ability to pause and resume the flow of data with their readStop and readStart methods. In doing so, this example shows how to apply backpressure to underlying sources that support it.
function makeReadableBackpressureSocketStream(host, port) {
const socket = createBackpressureSocket(host, port);
return new ReadableStream({
start(controller) {
socket.ondata = event => {
controller.enqueue(event.data);
if (controller.desiredSize <= 0) {
// The internal queue is full, so propagate
// the backpressure signal to the underlying source.
socket.readStop();
}
};
socket.onend = () => controller.close();
socket.onerror = () => controller.error(new Error("The socket errored!"));
},
pull() {
// This is called if the internal queue has been emptied, but the
// stream’s consumer still wants more data. In that case, restart
// the flow of data if we have previously paused it.
socket.readStart();
},
cancel() {
socket.close();
}
});
}
We can then use this function to create readable streams for such "backpressure sockets" in the same way we do for web sockets. This time, however, when we pipe to a destination that cannot accept data as fast as the socket is producing it, or if we leave the stream alone without reading from it for some time, a backpressure signal will be sent to the socket.
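To watch the pause and resume signals without a real network resource, one can substitute an in-memory mock for the hypothetical socket. The sketch below is non-normative: createMockBackpressureSocket, its paused flag, and its emit helper are invented for illustration, the stream wiring is a condensed copy of the function above, and a CountQueuingStrategy with a high-water mark of 2 is chosen arbitrarily. It assumes a runtime that exposes ReadableStream and CountQueuingStrategy as globals.

```javascript
// Invented stand-in for a "backpressure socket"; emit() simulates incoming data.
function createMockBackpressureSocket() {
  return {
    paused: false,
    ondata: null,
    readStop() { this.paused = true; },
    readStart() { this.paused = false; },
    close() {},
    emit(data) {
      if (this.ondata) this.ondata({ data });
    }
  };
}

function makeReadableStreamFor(socket) {
  return new ReadableStream(
    {
      start(controller) {
        socket.ondata = event => {
          controller.enqueue(event.data);
          if (controller.desiredSize <= 0) {
            socket.readStop(); // queue is full: pause the source
          }
        };
      },
      pull() {
        socket.readStart(); // consumer wants more: resume the source
      },
      cancel() {
        socket.close();
      }
    },
    new CountQueuingStrategy({ highWaterMark: 2 })
  );
}

const socket = createMockBackpressureSocket();
const stream = makeReadableStreamFor(socket);

socket.emit("first");
const pausedAfterOne = socket.paused; // false: one slot is still free
socket.emit("second");
const pausedAfterTwo = socket.paused; // true: the high-water mark was reached

// Reading a chunk frees a slot, so the stream calls pull() and the flow resumes.
const pausedAfterRead = stream.getReader().read()
  .then(() => new Promise(resolve => setTimeout(resolve, 0)))
  .then(() => socket.paused); // fulfills with false
```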
8.3. A readable byte stream with an underlying push source (no backpressure support)
The following function returns readable byte streams that wrap a hypothetical UDP socket API, including a
promise-returning select2() method that is meant to be evocative of the POSIX select(2) system call.
Since the UDP protocol does not have any built-in backpressure support, the backpressure signal given by desiredSize is ignored, and the stream ensures that when data is available from the
socket but not yet requested by the developer, it is enqueued in the stream’s internal queue, to avoid overflow
of the kernel-space queue and a consequent loss of data.
This has some interesting consequences for how consumers interact with the stream. If the consumer does not read data as fast as the socket produces it, the chunks will remain in the stream’s internal queue indefinitely. In this case, using a BYOB reader will cause an extra copy, to move the data from the stream’s internal queue to the developer-supplied buffer. However, if the consumer consumes the data quickly enough, a BYOB reader will allow zero-copy reading directly into developer-supplied buffers.
(You can imagine a more complex version of this example which uses desiredSize to
inform an out-of-band backpressure signaling mechanism, for example by sending a message down the socket to adjust the
rate of data being sent. That is left as an exercise for the reader.)
const DEFAULT_CHUNK_SIZE = 65536;
function makeUDPSocketStream(host, port) {
const socket = createUDPSocket(host, port);
return new ReadableStream({
type: "bytes",
start(controller) {
readRepeatedly().catch(e => controller.error(e));
function readRepeatedly() {
return socket.select2().then(() => {
// Since the socket can become readable even when there are
// no pending BYOB requests, we need to handle both cases.
let bytesRead;
if (controller.byobRequest) {
const v = controller.byobRequest.view;
bytesRead = socket.readInto(v.buffer, v.byteOffset, v.byteLength);
controller.byobRequest.respond(bytesRead);
} else {
const buffer = new ArrayBuffer(DEFAULT_CHUNK_SIZE);
bytesRead = socket.readInto(buffer, 0, DEFAULT_CHUNK_SIZE);
controller.enqueue(new Uint8Array(buffer, 0, bytesRead));
}
if (bytesRead === 0) {
controller.close();
return;
}
return readRepeatedly();
});
}
},
cancel() {
socket.close();
}
});
}
ReadableStream instances returned from this function can now vend BYOB readers, with all of the
aforementioned benefits and caveats.
8.4. A readable stream with an underlying pull source
The following function returns readable streams that wrap portions of the Node.js file system API (which themselves map fairly directly to C’s fopen, fread, and fclose trio). Files are a typical example of pull
sources. Note how in contrast to the examples with push sources, most of the work here happens on-demand in the pull function, and not at startup time in the start function.
const fs = require("pr/fs"); // https://github.com/jden/pr
const CHUNK_SIZE = 1024;
function makeReadableFileStream(filename) {
let fd;
let position = 0;
return new ReadableStream({
start() {
return fs.open(filename, "r").then(result => {
fd = result;
});
},
pull(controller) {
const buffer = new ArrayBuffer(CHUNK_SIZE);
return fs.read(fd, buffer, 0, CHUNK_SIZE, position).then(bytesRead => {
if (bytesRead === 0) {
return fs.close(fd).then(() => controller.close());
} else {
position += bytesRead;
controller.enqueue(new Uint8Array(buffer, 0, bytesRead));
}
});
},
cancel() {
return fs.close(fd);
}
});
}
We can then create and use readable streams for files just as we could before for sockets.
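As a rough illustration of consuming a pull source chunk by chunk, the sketch below replaces the file with an in-memory Uint8Array so that it runs without file system access. The names makeReadableArrayStream and readAllChunkLengths are invented for this example, and a runtime exposing ReadableStream as a global is assumed. As in the file example, all of the work happens in pull, which the stream invokes only when the consumer wants more data.

```javascript
const CHUNK_SIZE = 1024;

// Invented in-memory analogue of makeReadableFileStream.
function makeReadableArrayStream(bytes) {
  let position = 0;

  return new ReadableStream({
    pull(controller) {
      if (position >= bytes.byteLength) {
        controller.close(); // nothing left: signal the end of the stream
        return;
      }
      const chunk = bytes.slice(position, position + CHUNK_SIZE);
      position += chunk.byteLength;
      controller.enqueue(chunk);
    }
  });
}

// Reads until the stream reports done, recording the size of every chunk.
async function readAllChunkLengths(stream) {
  const reader = stream.getReader();
  const lengths = [];
  while (true) {
    const { value, done } = await reader.read();
    if (done) {
      return lengths;
    }
    lengths.push(value.byteLength);
  }
}

const chunkLengths = readAllChunkLengths(makeReadableArrayStream(new Uint8Array(2500)));
chunkLengths.then(lengths => console.log(lengths)); // [1024, 1024, 452]
```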
8.5. A readable byte stream with an underlying pull source
The following function returns readable byte streams that allow efficient zero-copy reading of files, again using the Node.js file system API. Instead of using a predetermined chunk size of 1024, it attempts to fill the developer-supplied buffer, allowing full control.
const fs = require("pr/fs"); // https://github.com/jden/pr
const DEFAULT_CHUNK_SIZE = 1024;
function makeReadableByteFileStream(filename) {
let fd;
let position = 0;
return new ReadableStream({
type: "bytes",
start() {
return fs.open(filename, "r").then(result => {
fd = result;
});
},
pull(controller) {
// Even when the consumer is using the default reader, the auto-allocation
// feature allocates a buffer and passes it to us via byobRequest.
const v = controller.byobRequest.view;
return fs.read(fd, v.buffer, v.byteOffset, v.byteLength, position).then(bytesRead => {
if (bytesRead === 0) {
return fs.close(fd).then(() => controller.close());
} else {
position += bytesRead;
controller.byobRequest.respond(bytesRead);
}
});
},
cancel() {
return fs.close(fd);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE
});
}
With this in hand, we can create and use BYOB readers for the returned ReadableStream. But we can
also create default readers, using them in the same simple and generic manner as usual. The adaptation between
the low-level byte tracking of the underlying byte source shown here, and the higher-level chunk-based
consumption of a default reader, is all taken care of automatically by the streams implementation. The
auto-allocation feature, via the autoAllocateChunkSize option, even allows us to write less code, compared
to the manual branching in §8.3 A readable byte stream with an underlying push source (no backpressure support).
8.6. A writable stream with no backpressure or success signals
The following function returns a writable stream that wraps a WebSocket [HTML]. Web sockets do not provide
any way to tell when a given chunk of data has been successfully sent (without awkward polling of bufferedAmount, which we leave as an exercise to the reader). As such, this writable stream has no ability
to communicate accurate backpressure signals or write success/failure to its producers. That is, the
promises returned by its writer’s write() method and ready getter will always fulfill immediately.
function makeWritableWebSocketStream(url, protocols) {
const ws = new WebSocket(url, protocols);
return new WritableStream({
start(controller) {
ws.onerror = () => controller.error(new Error("The WebSocket errored!"));
return new Promise(resolve => ws.onopen = resolve);
},
write(chunk) {
ws.send(chunk);
// Return immediately, since the web socket gives us no easy way to tell
// when the write completes.
},
close() {
return new Promise((resolve, reject) => {
ws.onclose = resolve;
ws.close(1000);
});
},
abort(reason) {
return new Promise((resolve, reject) => {
ws.onclose = resolve;
ws.close(4000, reason && reason.message);
});
}
});
}
We can then use this function to create writable streams for a web socket, and pipe an arbitrary readable stream to it:
const webSocketStream = makeWritableWebSocketStream("wss://example.com:443/", "protocol");
readableStream.pipeTo(webSocketStream)
.then(() => console.log("All data successfully written!"))
.catch(e => console.error("Something went wrong!", e));
8.7. A writable stream with backpressure and success signals
The following function returns writable streams that wrap portions of the Node.js file system API (which themselves map fairly directly to C’s fopen, fwrite, and fclose trio). Since the API we are wrapping provides a way to
tell when a given write succeeds, this stream will be able to communicate backpressure signals as well as whether
an individual write succeeded or failed.
const fs = require("pr/fs"); // https://github.com/jden/pr
function makeWritableFileStream(filename) {
let fd;
return new WritableStream({
start() {
return fs.open(filename, "w").then(result => {
fd = result;
});
},
write(chunk) {
return fs.write(fd, chunk, 0, chunk.length);
},
close() {
return fs.close(fd);
},
abort() {
return fs.close(fd);
}
});
}
We can then use this function to create a writable stream for a file, and write individual chunks of data to it:
const fileStream = makeWritableFileStream("/example/path/on/fs.txt");
const writer = fileStream.getWriter();
writer.write("To stream, or not to stream\n");
writer.write("That is the question\n");
writer.close()
.then(() => console.log("chunks written and stream closed successfully!"))
.catch(e => console.error(e));
Note that if a particular call to fs.write takes a longer time, the returned promise will fulfill later.
In the meantime, additional writes can be queued up, which are stored in the stream’s internal queue. The accumulation
of chunks in this queue can change the stream to return a pending promise from the ready getter, which is a signal to producers that they should back off and stop writing if possible.
The way in which the writable stream queues up writes is especially important in this case, since as stated in the documentation for fs.write, "it is unsafe to use fs.write multiple times on the same file without waiting
for the [promise]." But we don’t have to worry about that when writing the makeWritableFileStream function, since the stream implementation guarantees that the underlying sink’s write method will
not be called until any promises returned by previous calls have fulfilled!
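The sequencing guarantee can be observed with a sink that merely records when each write starts and ends. This is a non-normative sketch: the timer-based sink is invented to stand in for a slow fs.write, and a runtime exposing WritableStream as a global is assumed.

```javascript
const events = [];
let inFlight = 0;
let maxInFlight = 0;

const stream = new WritableStream({
  write(chunk) {
    inFlight++;
    maxInFlight = Math.max(maxInFlight, inFlight);
    events.push("start " + chunk);
    // Simulate a slow asynchronous write.
    return new Promise(resolve => setTimeout(resolve, 5)).then(() => {
      events.push("end " + chunk);
      inFlight--;
    });
  }
});

const writer = stream.getWriter();

// All three writes are issued immediately, without waiting on each other.
writer.write("a");
writer.write("b");
writer.write("c");

const finished = writer.close().then(() => ({ events, maxInFlight }));
finished.then(result => console.log(result.maxInFlight, result.events));
```

Even though the producer never waits, the recorded events alternate strictly between start and end, and at most one write is ever in flight.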
8.8. A { readable, writable } stream pair wrapping the same underlying resource
The following function returns an object of the form { readable, writable }, with the readable property containing a readable stream and the writable property containing a
writable stream, where both streams wrap the same underlying web socket resource. In essence, this combines §8.1 A readable stream with an underlying push source (no backpressure support) and §8.6 A writable stream with no backpressure or success signals.
While doing so, it illustrates how you can use JavaScript classes to create reusable underlying sink and underlying source abstractions.
function streamifyWebSocket(url, protocols) {
const ws = new WebSocket(url, protocols);
ws.binaryType = "arraybuffer";
return {
readable: new ReadableStream(new WebSocketSource(ws)),
writable: new WritableStream(new WebSocketSink(ws))
};
}
class WebSocketSource {
constructor(ws) {
this._ws = ws;
}
start(controller) {
this._ws.onmessage = event => controller.enqueue(event.data);
this._ws.onclose = () => controller.close();
this._ws.addEventListener("error", () => {
controller.error(new Error("The WebSocket errored!"));
});
}
cancel() {
this._ws.close();
}
}
class WebSocketSink {
constructor(ws) {
this._ws = ws;
}
start(controller) {
this._ws.addEventListener("error", () => {
controller.error(new Error("The WebSocket errored!"));
});
return new Promise(resolve => this._ws.onopen = resolve);
}
write(chunk) {
this._ws.send(chunk);
}
close() {
return new Promise((resolve, reject) => {
this._ws.onclose = resolve;
this._ws.close();
});
}
abort(reason) {
return new Promise((resolve, reject) => {
this._ws.onclose = resolve;
this._ws.close(4000, reason && reason.message);
});
}
}
We can then use the objects created by this function to communicate with a remote web socket, using the standard stream APIs:
const streamyWS = streamifyWebSocket("wss://example.com:443/", "protocol");
const writer = streamyWS.writable.getWriter();
const reader = streamyWS.readable.getReader();
writer.write("Hello");
writer.write("web socket!");
reader.read().then(({ value, done }) => {
console.log("The web socket says: ", value);
});
Note how in this setup canceling the readable side will implicitly close the writable side,
and similarly, closing or aborting the writable side will implicitly close the readable side.
Conventions
This specification uses algorithm conventions very similar to those of [ECMASCRIPT]. However, it deviates in the following ways, mostly for brevity. It is hoped (and vaguely planned) that eventually the conventions of ECMAScript itself will evolve in these ways.
- We use destructuring notation in function and method declarations, and assume that the destructuring assignment procedure was performed before the algorithm starts.
- We similarly use the default argument notation = {} in a couple of cases.
- We use "this" instead of "this value".
- We use the shorthand phrases from the [PROMISES-GUIDE] to operate on promises at a higher level than the ECMAScript spec does.
Acknowledgments
The editor would like to thank Adam Rice, Anne van Kesteren, Ben Kelly, Brian di Palma, Calvin Metcalf, Dominic Tarr, Ed Hager, Forbes Lindesay, 贺师俊 (hax), isonmad, Jake Archibald, Jens Nockert, Mangala Sadhu Sangeet Singh Khalsa, Marcos Caceres, Marvin Hagemeister, Michael Mior, Mihai Potra, Simon Menke, Stephen Sugden, Tab Atkins, Tanguy Krotoff, Thorsten Lorenz, Till Schneidereit, Tim Caswell, Trevor Norris, tzik, Youenn Fablet, and Xabier Rodríguez for their contributions to this specification.
Special thanks to: Bert Belder for bringing up implementation concerns that led to crucial API changes; Forrest Norvell for his work on the initial reference implementation; Gorgi Kosev for his breakthrough idea of separating piping into two methods, thus resolving a major sticking point; Isaac Schlueter for his pioneering work on JavaScript streams in Node.js; Jake Verbaten for his early involvement and support; Janessa Det for the logo; Will Chan for his help ensuring that the API allows high-performance network streaming; and 平野裕 (Yutaka Hirano) for his help with the readable stream reader design.
This standard is written by Domenic Denicola (Google) and 吉野剛史 (Takeshi Yoshino, Google).
Per CC0, to the extent possible under law, the editor has waived all copyright and related or neighboring rights to this work.