Streams

Living Standard — Last Updated

Participate:
GitHub whatwg/streams (new issue, open issues)
IRC: #whatwg on Freenode
Commits:
GitHub whatwg/streams/commits
Snapshot as of this commit
@streamsstandard
Translation (non-normative):
日本語

Abstract

This specification provides APIs for creating, composing, and consuming streams of data. These streams are designed to map efficiently to low-level I/O primitives, and allow easy composition with built-in backpressure and queuing. On top of streams, the web platform can build higher-level abstractions, such as filesystem or socket APIs, while at the same time users can use the supplied tools to build their own streams which integrate well with those of the web platform.

1. Introduction

This section is non-normative.

Large swathes of the web platform are built on streaming data: that is, data that is created, processed, and consumed in an incremental fashion, without ever reading all of it into memory. The Streams Standard provides a common set of APIs for creating and interfacing with such streaming data, embodied in readable streams, writable streams, and transform streams.

This standard provides the base stream primitives which other parts of the web platform can use to expose their streaming data. For example, [FETCH] could expose request bodies as a writable stream, or response bodies as a readable stream. More generally, the platform is full of streaming abstractions waiting to be expressed as streams: multimedia streams, file streams, interprocess communication, and more benefit from being able to process data incrementally instead of buffering it all into memory and processing it in one go. By providing the foundation for these streams to be exposed to developers, the Streams Standard enables a wide variety of such use cases.

The APIs described here provide a unifying abstraction for all such streams, encouraging an ecosystem to grow around these shared and composable interfaces. At the same time, they have been carefully designed to map efficiently to low-level I/O concerns, and to encapsulate the trickier issues (such as backpressure) that come along for the ride.

2. Model

A chunk is a single piece of data that is written to or read from a stream. It can be of any type; streams can even contain chunks of different types. A chunk will often not be the most atomic unit of data for a given stream; for example a byte stream might contain chunks consisting of 16 KiB Uint8Arrays, instead of single bytes.

2.1. Readable Streams

A readable stream represents a source of data, from which you can read. In other words, data comes out of a readable stream.

Although a readable stream can be created with arbitrary behavior, most readable streams wrap a lower-level I/O source, called the underlying source. There are two types of underlying source: push sources and pull sources.

Push sources push data at you, whether or not you are listening for it. They may also provide a mechanism for pausing and resuming the flow of data. An example push source is a TCP socket, where data is constantly being pushed from the OS level, at a rate that can be controlled by changing the TCP window size.

Pull sources require you to request data from them. The data may be available synchronously, e.g. if it is held by the operating system’s in-memory buffers, or asynchronously, e.g. if it has to be read from disk. An example pull source is a file handle, where you seek to specific locations and read specific amounts.

Readable streams are designed to wrap both types of sources behind a single, unified interface.

Chunks are enqueued into the stream by the stream’s underlying source. They can then be read one at a time via the stream’s public interface.

Code that reads from a readable stream using its public interface is known as a consumer.

Consumers also have the ability to cancel a readable stream. This indicates that the consumer has lost interest in the stream; canceling immediately closes the stream, throws away any queued chunks, and executes any cancellation mechanism of the underlying source.
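
For example (a non-normative sketch; the cancel() method is defined in §3.2.4.2 and returns a promise):
readableStream.cancel(new Error("No longer interested in this data"))
  .then(() => console.log("Canceled; the underlying source's cancellation mechanism has run"))
  .catch(e => console.error("Cancellation failed", e));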

Consumers can also tee a readable stream. This will lock the stream, making it no longer directly usable; however, it will create two new streams, called branches, which can be consumed independently.

For streams representing bytes, an extended version of the readable stream is provided to handle bytes efficiently, in particular by minimizing copies. The underlying source for such a readable stream is called an underlying byte source. A readable stream whose underlying source is an underlying byte source is sometimes called a readable byte stream.

2.2. Writable Streams

A writable stream represents a destination for data, into which you can write. In other words, data goes in to a writable stream.

Analogously to readable streams, most writable streams wrap a lower-level I/O sink, called the underlying sink. Writable streams work to abstract away some of the complexity of the underlying sink, by queuing subsequent writes and only delivering them to the underlying sink one by one.

Chunks are written to the stream via its public interface, and are passed one at a time to the stream’s underlying sink.

Code that writes into a writable stream using its public interface is known as a producer.

Producers also have the ability to abort a writable stream. This indicates that the producer believes something has gone wrong, and that future writes should be discontinued. It puts the stream in an errored state, even without a signal from the underlying sink.
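
For example (a non-normative sketch; the abort() method is defined on writable streams in §4 and returns a promise):
writableStream.abort(new Error("The produced data is no longer needed"))
  .then(() => console.log("Aborted; future writes will be discontinued"))
  .catch(e => console.error("Aborting failed", e));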

2.3. Transform Streams

A transform stream consists of a pair of streams: a writable stream, and a readable stream. In a manner specific to the transform stream in question, writes to the writable side result in new data being made available for reading from the readable side.

Some examples of transform streams include a decompression transform, or a transform that filters out uninteresting chunks, as seen in the pipe chain example in §3.2.4.4.

2.4. Pipe Chains and Backpressure

Streams are primarily used by piping them to each other. A readable stream can be piped directly to a writable stream, or it can be piped through one or more transform streams first.

A set of streams piped together in this way is referred to as a pipe chain. In a pipe chain, the original source is the underlying source of the first readable stream in the chain; the ultimate sink is the underlying sink of the final writable stream in the chain.

Once a pipe chain is constructed, it can be used to propagate signals regarding how fast chunks should flow through it. If any step in the chain cannot yet accept chunks, it propagates a signal backwards through the pipe chain, until eventually the original source is told to stop producing chunks so fast. This process of normalizing flow from the original source according to how fast the chain can process chunks is called backpressure.

When teeing a readable stream, the backpressure signals from its two branches will aggregate, such that if neither branch is read from, a backpressure signal will be sent to the underlying source of the original stream.

2.5. Internal Queues and Queuing Strategies

Both readable and writable streams maintain internal queues, which they use for similar purposes. In the case of a readable stream, the internal queue contains chunks that have been enqueued by the underlying source, but not yet read by the consumer. In the case of a writable stream, the internal queue contains chunks which have been written to the stream by the producer, but not yet processed and acknowledged by the underlying sink.

A queuing strategy is an object that determines how a stream should signal backpressure based on the state of its internal queue. The queuing strategy assigns a size to each chunk, and compares the total size of all chunks in the queue to a specified number, known as the high water mark. The resulting difference, high water mark minus total size, is used to determine the desired size to fill the stream’s queue.

For readable streams, an underlying source can use this desired size as a backpressure signal, slowing down chunk generation so as to try to keep the desired size above or at zero. For writable streams, a producer can behave similarly, avoiding writes that would cause the desired size to go negative.
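
For example, a readable stream wrapping a push source could consult the controller’s desiredSize to decide when to pause the source. The following is a non-normative sketch in which the source object and its pauseListening()/startListening() methods are hypothetical:
const readableStream = new ReadableStream({
  start(controller) {
    // Hypothetical push source: deliver each incoming chunk to the queue.
    source.ondata = chunk => {
      controller.enqueue(chunk);
      if (controller.desiredSize <= 0) {
        source.pauseListening();   // desired size exhausted: apply backpressure
      }
    };
  },
  pull(controller) {
    // Called when the queue has room again (desired size is positive).
    source.startListening();
  }
});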

A simple example of a queuing strategy would be one that assigns a size of one to each chunk, and has a high water mark of three. This would mean that up to three chunks could be enqueued in a readable stream, or three chunks written to a writable stream, before the streams are considered to be applying backpressure.
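
Such a strategy could be supplied as the second constructor argument; a non-normative sketch, using either a custom object or the built-in CountQueuingStrategy described in §6.2:
const strategy = { highWaterMark: 3, size(chunk) { return 1; } };
// equivalent: const strategy = new CountQueuingStrategy({ highWaterMark: 3 });

const readable = new ReadableStream({ /* underlying source */ }, strategy);
const writable = new WritableStream({ /* underlying sink */ }, strategy);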

2.6. Locking

A readable stream reader, or simply reader, is an object that allows direct reading of chunks from a readable stream. Without a reader, a consumer can only perform high-level operations on the readable stream: waiting for the stream to become closed or errored, canceling the stream, or piping the readable stream to a writable stream. Many of those high-level operations actually use a reader themselves.

A given readable stream has at most one reader at a time. We say in this case the stream is locked to the reader, and that the reader is active.

A reader also has the capability to release its read lock, which makes it no longer active. At this point another reader can be acquired at will. If the stream becomes closed or errored as a result of the behavior of its underlying source or via cancellation, its reader (if one exists) will automatically release its lock.

A readable byte stream has the ability to vend two types of readers: default readers and BYOB readers. BYOB ("bring your own buffer") readers allow reading into a developer-supplied buffer, thus minimizing copies.

3. Readable Streams

3.1. Using Readable Streams

The simplest way to consume a readable stream is to pipe it to a writable stream. This ensures that backpressure is respected, and any errors (either writing or reading) are propagated through the chain:
readableStream.pipeTo(writableStream)
  .then(() => console.log("All data successfully written!"))
  .catch(e => console.error("Something went wrong!", e));
If you simply want to be alerted of each new chunk from a readable stream, you can pipe it to a new writable stream that you custom-create for that purpose:
readableStream.pipeTo(new WritableStream({
  write(chunk) {
    console.log("Chunk received", chunk);
  },
  close() {
    console.log("All data successfully read!");
  },
  abort(e) {
    console.error("Something went wrong!", e);
  }
}));

By returning promises from your write implementation, you can signal backpressure to the readable stream.
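
For example, the following non-normative sketch waits for a hypothetical asynchronous processChunk() function before accepting the next chunk, so the readable side will slow down to match:
readableStream.pipeTo(new WritableStream({
  write(chunk) {
    // Returning a promise signals backpressure: the next chunk is not
    // delivered until this promise fulfills. processChunk is hypothetical.
    return processChunk(chunk);
  }
}));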

Although readable streams will usually be used by piping them to a writable stream, you can also read them directly by acquiring a reader and using its read() method to get successive chunks. For example, this code logs the next chunk in the stream, if available:
const reader = readableStream.getReader();

reader.read().then(
  ({ value, done }) => {
    if (done) {
      console.log("The stream was already closed!");
    } else {
      console.log(value);
    }
  },
  e => console.error("The stream became errored and cannot be read from!", e)
);

This more manual method of reading a stream is mainly useful for library authors building new high-level operations on streams, beyond the provided ones of piping and teeing.

The above example showed using the readable stream’s default reader. If the stream is a readable byte stream, you can also acquire a BYOB reader for it, which allows more precise control over buffer allocation in order to avoid copies. For example, this code reads the first 1024 bytes from the stream into a single memory buffer:
const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1024);
readInto(startingAB)
  .then(buffer => console.log("The first 1024 bytes:", buffer))
  .catch(e => console.error("Something went wrong!", e));

function readInto(buffer, offset = 0) {
  if (offset === buffer.byteLength) {
    return Promise.resolve(buffer);
  }

  const view = new Uint8Array(buffer, offset, buffer.byteLength - offset);
  return reader.read(view).then(({ value: newView, done }) => {
    // read(view) fulfills with { value, done }, where value is a new view
    // onto the (possibly transferred) buffer; continue reading into it.
    return readInto(newView.buffer, offset + newView.byteLength);
  });
}

An important thing to note here is that the final buffer value is different from the startingAB, but it (and all intermediate buffers) shares the same backing memory allocation. At each step, the buffer is transferred to a new ArrayBuffer object. The newView is a new Uint8Array, with that ArrayBuffer object as its buffer property, the offset that bytes were written to as its byteOffset property, and the number of bytes that were written as its byteLength property.

3.2. Class ReadableStream

The ReadableStream class is a concrete instance of the general readable stream concept. It is adaptable to any chunk type, and maintains an internal queue to keep track of data supplied by the underlying source but not yet read by any consumer.

3.2.1. Class Definition

This section is non-normative.

If one were to write the ReadableStream class in something close to the syntax of [ECMASCRIPT], it would look like

class ReadableStream {
  constructor(underlyingSource = {}, { size, highWaterMark = 1 } = {})

  get locked()

  cancel(reason)
  getReader()
  pipeThrough({ writable, readable }, options)
  pipeTo(dest, { preventClose, preventAbort, preventCancel } = {})
  tee()
}

3.2.2. Internal Slots

Instances of ReadableStream are created with the internal slots described in the following table:

Internal Slot Description (non-normative)
[[readableStreamController]] A ReadableStreamDefaultController or ReadableByteStreamController created with the ability to control the state and queue of this stream; also used for the IsReadableStream brand check
[[disturbed]] A boolean flag set to true when the stream has been read from or canceled
[[reader]] A ReadableStreamDefaultReader or ReadableStreamBYOBReader instance, if the stream is locked to a reader, or undefined if it is not
[[state]] A string containing the stream’s current state, used internally; one of "readable", "closed", or "errored".
[[storedError]] A value indicating how the stream failed, to be given as a failure reason or exception when trying to operate on an errored stream

3.2.3. new ReadableStream(underlyingSource = {}, { size, highWaterMark = 1 } = {})

The underlyingSource object passed to the constructor can implement any of the following methods to govern how the constructed stream instance behaves: start(controller), invoked immediately during construction; pull(controller), invoked whenever the stream’s internal queue has room for more chunks; and cancel(reason), invoked when the stream is canceled by a consumer.

Both start and pull are given the ability to manipulate the stream’s internal queue and state via the passed controller object. This is an example of the revealing constructor pattern.

If the underlyingSource object contains a property type set to "bytes", this readable stream is a readable byte stream, and can successfully vend BYOB readers. In that case, the passed controller object will be an instance of ReadableByteStreamController. Otherwise, it will be an instance of ReadableStreamDefaultController.

For readable byte streams, underlyingSource can also contain a property autoAllocateChunkSize, which can be set to a positive integer to enable the auto-allocation feature for this stream. In that case, when a consumer uses a default reader, the stream implementation will automatically allocate an ArrayBuffer of the given size, and call the underlying source code as if the consumer was using a BYOB reader. This can cut down on the amount of code needed when writing the underlying source implementation, as can be seen by comparing §8.3 A readable byte stream with an underlying push source and backpressure support without auto-allocation to §8.5 A readable byte stream with an underlying pull source with auto-allocation.

The constructor also accepts a second argument containing the queuing strategy object with two properties: a non-negative number highWaterMark, and a function size(chunk). The supplied strategy could be an instance of the built-in CountQueuingStrategy or ByteLengthQueuingStrategy classes, or it could be custom. If no strategy is supplied, the default behavior will be the same as a CountQueuingStrategy with a high water mark of 1.

  1. Set this@[[state]] to "readable".
  2. Set this@[[reader]] and this@[[storedError]] to undefined.
  3. Set this@[[disturbed]] to false.
  4. Set this@[[readableStreamController]] to undefined.
  5. Let type be GetV(underlyingSource, "type").
  6. ReturnIfAbrupt(type).
  7. Let typeString be ToString(type).
  8. If typeString is "bytes",
    1. If highWaterMark is undefined, let highWaterMark be 0.
    2. Set this@[[readableStreamController]] to Construct(ReadableByteStreamController, «this, underlyingSource, highWaterMark»).
  9. Otherwise, if type is undefined,
    1. If highWaterMark is undefined, let highWaterMark be 1.
    2. Set this@[[readableStreamController]] to Construct(ReadableStreamDefaultController, «this, underlyingSource, size, highWaterMark»).
  10. Otherwise, throw a RangeError exception.
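
As a non-normative illustration, the following sketch constructs a default (non-byte) readable stream wrapping a hypothetical pull source; openPullSource(), readNext(), and close() are assumptions, not part of this specification:
const stream = new ReadableStream({
  start(controller) {
    // The underlying source object is the `this` value for these methods.
    this.source = openPullSource();
  },
  pull(controller) {
    // Invoked whenever the internal queue has room for another chunk.
    return this.source.readNext().then(chunk => {
      if (chunk === null) {
        controller.close();      // the source is exhausted
      } else {
        controller.enqueue(chunk);
      }
    });
  },
  cancel(reason) {
    return this.source.close();
  }
});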

3.2.4. Properties of the ReadableStream Prototype

3.2.4.1. get locked
The locked getter returns whether or not the readable stream is locked to a reader.
  1. If IsReadableStream(this) is false, throw a TypeError exception.
  2. Return IsReadableStreamLocked(this).
3.2.4.2. cancel(reason)
The cancel method cancels the stream, signaling a loss of interest in the stream by a consumer. The supplied reason argument will be given to the underlying source, which may or may not use it.
  1. If IsReadableStream(this) is false, return a promise rejected with a TypeError exception.
  2. If IsReadableStreamLocked(this) is true, return a promise rejected with a TypeError exception.
  3. Return ReadableStreamCancel(this, reason).
3.2.4.3. getReader(options = {})
The getReader method creates a reader of the type specified by the options argument and locks the stream to the new reader. While the stream is locked, no other reader can be acquired until this one is released.

This functionality is especially useful for creating abstractions that desire the ability to consume a stream in its entirety. By getting a reader for the stream, you can ensure nobody else can interleave reads with yours or cancel the stream, which would interfere with your abstraction.

When options.mode is undefined, the method creates a default reader (an instance of ReadableStreamDefaultReader). The reader provides the ability to directly read individual chunks from the stream via the reader’s read() method.

When options.mode is "byob", the getReader method creates a BYOB reader (an instance of ReadableStreamBYOBReader). This feature only works on readable byte streams, i.e. streams which were constructed specifically with the ability to handle "bring your own buffer" reading. The reader provides the ability to directly read individual chunks from the stream via the reader’s read() method, into developer-supplied buffers, allowing more precise control over allocation.

  1. If IsReadableStream(this) is false, throw a TypeError exception.
  2. Let mode be GetV(options, "mode").
  3. If mode is "byob",
    1. If IsReadableByteStreamController(this@[[readableStreamController]]) is false, throw a TypeError exception.
    2. Return AcquireReadableStreamBYOBReader(this).
  4. If mode is undefined, return AcquireReadableStreamDefaultReader(this).
  5. Throw a RangeError exception.
An example of an abstraction that might benefit from using a reader is a function like the following, which is designed to read an entire readable stream into memory as an array of chunks.
function readAllChunks(readableStream) {
  const reader = readableStream.getReader();
  const chunks = [];

  return pump();

  function pump() {
    return reader.read().then(({ value, done }) => {
      if (done) {
        return chunks;
      }

      chunks.push(value);
      return pump();
    });
  }
}

Note how the first thing it does is obtain a reader, and from then on it uses the reader exclusively. This ensures that no other consumer can interfere with the stream, either by reading chunks or by canceling the stream.

3.2.4.4. pipeThrough({ writable, readable }, options)
The pipeThrough method provides a convenient, chainable way of piping this readable stream through a transform stream (or any other { writable, readable } pair). It simply pipes the stream into the writable side of the supplied pair, and returns the readable side for further use.

Piping a stream will generally lock it for the duration of the pipe, preventing any other consumer from acquiring a reader.

This method is intentionally generic; it does not require that its this value be a ReadableStream object. It also does not require that its writable argument be a WritableStream instance, or that its readable argument be a ReadableStream instance.

  1. Call-with-rethrow Invoke(this, "pipeTo", «writable, options»).
  2. Return readable.
A typical example of constructing a pipe chain using pipeThrough(transform, options) would look like
httpResponseBody
  .pipeThrough(decompressorTransform)
  .pipeThrough(ignoreNonImageFilesTransform)
  .pipeTo(mediaGallery);
3.2.4.5. pipeTo(dest, { preventClose, preventAbort, preventCancel } = {})
The pipeTo method pipes this readable stream to a given writable stream. The way in which the piping process behaves under various error conditions can be customized with a number of passed options. It returns a promise that fulfills when the piping process completes successfully, or rejects if any errors were encountered.

Piping a stream will lock it for the duration of the pipe, preventing any other consumer from acquiring a reader.

The pipeTo method is still in some flux. Its design depends on the design of writable streams, which are still undergoing spec churn.

For now, the reference implementation and tests provide a guide to what this method is generally intended to do: reference-implementation/lib/readable-stream.js, look for the pipeTo method. In addition to changing as the writable stream design changes, one major aspect of pipeTo not captured by the reference implementation is that it will operate via unobservable abstract operation calls, instead of using the JavaScript-exposed readable and writable stream APIs. This will better allow optimization and specialization. See #407 and #97 for more information.

3.2.4.6. tee()
The tee method tees this readable stream, returning a two-element array containing the two resulting branches as new ReadableStream instances.

Teeing a stream will lock it, preventing any other consumer from acquiring a reader. To cancel the stream, cancel both of the resulting branches; a composite cancellation reason will then be propagated to the stream’s underlying source.

Note that the chunks seen in each branch will be the same object. If the chunks are not immutable, this could allow interference between the two branches. (Let us know if you think we should add an option to tee that creates structured clones of the chunks for each branch.)
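
A non-normative sketch of this caveat, assuming the chunks are mutable objects:
const [branch1, branch2] = readableStream.tee();

branch1.getReader().read().then(({ value, done }) => {
  if (!done) {
    value.modified = true;   // the same object will be seen by branch2's consumer
  }
});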

  1. If IsReadableStream(this) is false, throw a TypeError exception.
  2. Let branches be ReadableStreamTee(this, false).
  3. ReturnIfAbrupt(branches).
  4. Return CreateArrayFromList(branches).
Teeing a stream is most useful when you wish to let two independent consumers read from the stream in parallel, perhaps even at different speeds. For example, given a writable stream cacheEntry representing an on-disk file, and another writable stream httpRequestBody representing an upload to a remote server, you could pipe the same readable stream to both destinations at once:
const [forLocal, forRemote] = readableStream.tee();

Promise.all([
  forLocal.pipeTo(cacheEntry),
  forRemote.pipeTo(httpRequestBody)
])
.then(() => console.log("Saved the stream to the cache and also uploaded it!"))
.catch(e => console.error("Either caching or uploading failed: ", e));

3.3. General Readable Stream Abstract Operations

The following abstract operations, unlike most in this specification, are meant to be generally useful by other specifications, instead of just being part of the implementation of this spec’s classes.

3.3.1. AcquireReadableStreamBYOBReader ( stream ) throws

This abstract operation is meant to be called from other specifications that may wish to acquire a BYOB reader for a given stream.

  1. Return Construct(ReadableStreamBYOBReader, «stream»).

3.3.2. AcquireReadableStreamDefaultReader ( stream ) throws

This abstract operation is meant to be called from other specifications that may wish to acquire a default reader for a given stream.

  1. Return Construct(ReadableStreamDefaultReader, «‍stream»).

3.3.3. IsReadableStream ( x ) nothrow

  1. If Type(x) is not Object, return false.
  2. If x does not have a [[readableStreamController]] internal slot, return false.
  3. Return true.

3.3.4. IsReadableStreamDisturbed ( stream ) nothrow

This abstract operation is meant to be called from other specifications that may wish to query whether or not a readable stream has ever been read from or canceled.

  1. Assert: IsReadableStream(stream) is true.
  2. Return stream@[[disturbed]].

3.3.5. IsReadableStreamLocked ( stream ) nothrow

This abstract operation is meant to be called from other specifications that may wish to query whether or not a readable stream is locked to a reader.

  1. Assert: IsReadableStream(stream) is true.
  2. If stream@[[reader]] is undefined, return false.
  3. Return true.

3.3.6. ReadableStreamTee ( stream, shouldClone ) throws

This abstract operation is meant to be called from other specifications that may wish to tee a given readable stream. Its second argument governs whether or not the data from the original stream will be structured cloned before becoming visible in the returned branches. [HTML]

  1. Assert: IsReadableStream(stream) is true.
  2. Assert: Type(shouldClone) is Boolean.
  3. Let reader be AcquireReadableStreamDefaultReader(stream).
  4. ReturnIfAbrupt(reader).
  5. Let teeState be Record{[[closedOrErrored]]: false, [[canceled1]]: false, [[canceled2]]: false, [[reason1]]: undefined, [[reason2]]: undefined, [[promise]]: a new promise}.
  6. Let pull be a new ReadableStreamTee pull function.
  7. Set pull@[[reader]] to reader, pull@[[teeState]] to teeState, and pull@[[shouldClone]] to shouldClone.
  8. Let cancel1 be a new ReadableStreamTee branch 1 cancel function.
  9. Set cancel1@[[stream]] to stream and cancel1@[[teeState]] to teeState.
  10. Let cancel2 be a new ReadableStreamTee branch 2 cancel function.
  11. Set cancel2@[[stream]] to stream and cancel2@[[teeState]] to teeState.
  12. Let underlyingSource1 be ObjectCreate(%ObjectPrototype%).
  13. Perform CreateDataProperty(underlyingSource1, "pull", pull).
  14. Perform CreateDataProperty(underlyingSource1, "cancel", cancel1).
  15. Let branch1Stream be Construct(ReadableStream, underlyingSource1).
  16. Let underlyingSource2 be ObjectCreate(%ObjectPrototype%).
  17. Perform CreateDataProperty(underlyingSource2, "pull", pull).
  18. Perform CreateDataProperty(underlyingSource2, "cancel", cancel2).
  19. Let branch2Stream be Construct(ReadableStream, underlyingSource2).
  20. Set pull@[[branch1]] to branch1Stream@[[readableStreamController]].
  21. Set pull@[[branch2]] to branch2Stream@[[readableStreamController]].
  22. Upon rejection of reader@[[closedPromise]] with reason r,
    1. If teeState.[[closedOrErrored]] is true, return undefined.
    2. Perform ReadableStreamDefaultControllerError(pull@[[branch1]], r).
    3. Perform ReadableStreamDefaultControllerError(pull@[[branch2]], r).
    4. Set teeState.[[closedOrErrored]] to true.
  23. Return «branch1, branch2».
The given algorithm creates two clones of each chunk, and discards the original, instead of creating one clone and giving the original to one branch and the clone to another. This is done to ensure symmetry between the chunks seen by each branch; for example, the clone of const r = /(?:)/; r.expando = "!"; is distinguishable from the original since the clone will not have the expando property.

However, in specific cases implementations may be able to do something more optimal, without observable consequences. For example if each chunk is created by the implementation, and cannot otherwise be modified by the developer, it may be possible to ensure the original and its clone are not distinguishable, in which case only one clone operation would be necessary. But, be careful!

A ReadableStreamTee pull function is an anonymous built-in function that pulls data from a given readable stream reader and enqueues it into two other streams ("branches" of the associated tee). Each ReadableStreamTee pull function has [[reader]], [[branch1]], [[branch2]], [[teeState]], and [[shouldClone]] internal slots. When a ReadableStreamTee pull function F is called, it performs the following steps:

  1. Let reader be F@[[reader]], branch1 be F@[[branch1]], branch2 be F@[[branch2]], teeState be F@[[teeState]], and shouldClone be F@[[shouldClone]].
  2. Return the result of transforming ReadableStreamDefaultReaderRead(reader) by a fulfillment handler which takes the argument result and performs the following steps:
    1. Assert: Type(result) is Object.
    2. Let value be Get(result, "value").
    3. ReturnIfAbrupt(value).
    4. Let done be Get(result, "done").
    5. ReturnIfAbrupt(done).
    6. Assert: Type(done) is Boolean.
    7. If done is true and teeState.[[closedOrErrored]] is false,
      1. If teeState.[[canceled1]] is false,
        1. Perform ReadableStreamDefaultControllerClose(branch1).
      2. If teeState.[[canceled2]] is false,
        1. Perform ReadableStreamDefaultControllerClose(branch2).
      3. Set teeState.[[closedOrErrored]] to true.
    8. If teeState.[[closedOrErrored]] is true, return undefined.
    9. If teeState.[[canceled1]] is false,
      1. Let value1 be value.
      2. If shouldClone is true, set value1 to StructuredClone(value).
      3. Call-with-rethrow ReadableStreamDefaultControllerEnqueue(branch1, value1).
    10. If teeState.[[canceled2]] is false,
      1. Let value2 be value.
      2. If shouldClone is true, set value2 to StructuredClone(value).
      3. Call-with-rethrow ReadableStreamDefaultControllerEnqueue(branch2, value2).

A ReadableStreamTee branch 1 cancel function is an anonymous built-in function that reacts to the cancellation of the first of the two branches of the associated tee. Each ReadableStreamTee branch 1 cancel function has [[stream]] and [[teeState]] internal slots. When a ReadableStreamTee branch 1 cancel function F is called with argument reason, it performs the following steps:

  1. Let stream be F@[[stream]] and teeState be F@[[teeState]].
  2. Set teeState.[[canceled1]] to true.
  3. Set teeState.[[reason1]] to reason.
  4. If teeState.[[canceled2]] is true,
    1. Let compositeReason be CreateArrayFromList(«teeState.[[reason1]], teeState.[[reason2]]»).
    2. Let cancelResult be ReadableStreamCancel(stream, compositeReason).
    3. ReturnIfAbrupt(cancelResult).
    4. Resolve teeState.[[promise]] with cancelResult.
  5. Return teeState.[[promise]].

A ReadableStreamTee branch 2 cancel function is an anonymous built-in function that reacts to the cancellation of the second of the two branches of the associated tee. Each ReadableStreamTee branch 2 cancel function has [[stream]] and [[teeState]] internal slots. When a ReadableStreamTee branch 2 cancel function F is called with argument reason, it performs the following steps:

  1. Let stream be F@[[stream]] and teeState be F@[[teeState]].
  2. Set teeState.[[canceled2]] to true.
  3. Set teeState.[[reason2]] to reason.
  4. If teeState.[[canceled1]] is true,
    1. Let compositeReason be CreateArrayFromList(«teeState.[[reason1]], teeState.[[reason2]]»).
    2. Let cancelResult be ReadableStreamCancel(stream, compositeReason).
    3. ReturnIfAbrupt(cancelResult).
    4. Resolve teeState.[[promise]] with cancelResult.
  5. Return teeState.[[promise]].
The algorithm given here is written such that three new function objects are created for each call to ReadableStreamTee. This is just a simplification, and is not actually necessary, since it is unobservable to developer code. For example, a self-hosted implementation could optimize by creating a class whose prototype contains methods for these functions, with the state stored as instance variables.

3.4. Readable Stream Abstract Operations Used by Controllers

In terms of specification factoring, the way that the ReadableStream class encapsulates the behavior of both simple readable streams and readable byte streams into a single class is by centralizing most of the potentially-varying logic inside the two controller classes, ReadableStreamDefaultController and ReadableByteStreamController. Those classes define most of the stateful internal slots and abstract operations for how a stream’s internal queue is managed and how it interfaces with its underlying source or underlying byte source.

The abstract operations in this section are interfaces that are used by the controller implementations to affect their associated ReadableStream object, translating those internal state changes into developer-facing results visible through the ReadableStream's public API.

3.4.1. ReadableStreamAddReadIntoRequest ( stream ) nothrow

  1. Assert: IsReadableStreamBYOBReader(stream@[[reader]]) is true.
  2. Let promise be a new promise.
  3. Let readIntoRequest be Record{[[promise]]: promise}.
  4. Append readIntoRequest as the last element of stream@[[reader]]@[[readIntoRequests]].
  5. Return promise.

3.4.2. ReadableStreamAddReadRequest ( stream ) nothrow

  1. Assert: IsReadableStreamDefaultReader(stream@[[reader]]) is true.
  2. Let promise be a new promise.
  3. Let readRequest be Record{[[promise]]: promise}.
  4. Append readRequest as the last element of stream@[[reader]]@[[readRequests]].
  5. Return promise.

3.4.3. ReadableStreamCancel ( stream, reason ) nothrow

  1. Assert: stream is not undefined.
  2. Set stream@[[disturbed]] to true.
  3. If stream@[[state]] is "closed", return a new promise resolved with undefined.
  4. If stream@[[state]] is "errored", return a new promise rejected with stream@[[storedError]].
  5. Perform ReadableStreamClose(stream).
  6. Let sourceCancelPromise be stream@[[readableStreamController]].[[Cancel]](reason).
  7. Return the result of transforming sourceCancelPromise by a fulfillment handler that returns undefined.

3.4.4. ReadableStreamClose ( stream ) nothrow

This abstract operation can be called by other specifications that wish to close a readable stream, in the same way a developer-created stream would be closed by its associated controller object. Specifications should not do this to streams they did not create, and must ensure they have obeyed the preconditions (listed here as asserts).

  1. Assert: stream@[[state]] is "readable".
  2. Set stream@[[state]] to "closed".
  3. Let reader be stream@[[reader]].
  4. If reader is undefined, return undefined.
  5. If IsReadableStreamDefaultReader(reader) is true,
    1. Repeat for each readRequest that is an element of reader@[[readRequests]],
      1. Resolve readRequest.[[promise]] with CreateIterResultObject(undefined, true).
    2. Set reader@[[readRequests]] to an empty List.
  6. Resolve reader@[[closedPromise]] with undefined.
  7. Return undefined.
The case where stream@[[state]] is "closed", but stream@[[closeRequested]] is false, will happen if the stream was closed without its controller’s close method ever being called: i.e., if the stream was closed by a call to cancel(reason). In this case we allow the controller’s close method to be called and silently do nothing, since the cancelation was outside the control of the underlying source.

3.4.5. ReadableStreamError ( stream, e ) nothrow

This abstract operation can be called by other specifications that wish to move a readable stream to an errored state, in the same way a developer would error a stream using its associated controller object. Specifications should not do this to streams they did not create, and must ensure they have obeyed the precondition (listed here as an assert).

  1. Assert: IsReadableStream(stream) is true.
  2. Assert: stream@[[state]] is "readable".
  3. Set stream@[[state]] to "errored".
  4. Set stream@[[storedError]] to e.
  5. Let reader be stream@[[reader]].
  6. If reader is undefined, return undefined.
  7. If IsReadableStreamDefaultReader(reader) is true,
    1. Repeat for each readRequest that is an element of reader@[[readRequests]],
      1. Reject readRequest.[[promise]] with e.
    2. Set reader@[[readRequests]] to a new empty List.
  8. Otherwise,
    1. Assert: IsReadableStreamBYOBReader(reader).
    2. Repeat for each readIntoRequest that is an element of reader@[[readIntoRequests]],
      1. Reject readIntoRequest.[[promise]] with e.
    3. Set reader@[[readIntoRequests]] to a new empty List.
  9. Reject reader@[[closedPromise]] with e.

3.4.6. ReadableStreamFulfillReadIntoRequest ( stream, chunk, done ) nothrow

  1. Let reader be stream@[[reader]].
  2. Let readIntoRequest be the first element of reader@[[readIntoRequests]].
  3. Remove readIntoRequest from reader@[[readIntoRequests]], shifting all other elements downward (so that the second becomes the first, and so on).
  4. Resolve readIntoRequest.[[promise]] with CreateIterResultObject(chunk, done).

3.4.7. ReadableStreamFulfillReadRequest ( stream, chunk, done ) nothrow

  1. Let reader be stream@[[reader]].
  2. Let readRequest be the first element of reader@[[readRequests]].
  3. Remove readRequest from reader@[[readRequests]], shifting all other elements downward (so that the second becomes the first, and so on).
  4. Resolve readRequest.[[promise]] with CreateIterResultObject(chunk, done).

3.4.8. ReadableStreamGetNumReadIntoRequests ( stream ) nothrow

  1. Return the number of elements in stream@[[reader]]@[[readIntoRequests]].

3.4.9. ReadableStreamGetNumReadRequests ( stream ) nothrow

  1. Return the number of elements in stream@[[reader]]@[[readRequests]].

3.4.10. ReadableStreamHasBYOBReader ( stream ) nothrow

  1. Let reader be stream@[[reader]].
  2. If reader is undefined, return false.
  3. If IsReadableStreamBYOBReader(reader) is false, return false.
  4. Return true.

3.4.11. ReadableStreamHasReader ( stream ) nothrow

  1. Let reader be stream@[[reader]].
  2. If reader is undefined, return false.
  3. If IsReadableStreamDefaultReader(reader) is false, return false.
  4. Return true.

3.5. Class ReadableStreamDefaultReader

The ReadableStreamDefaultReader class represents a default reader designed to be vended by a ReadableStream instance.

3.5.1. Class Definition

This section is non-normative.

If one were to write the ReadableStreamDefaultReader class in something close to the syntax of [ECMASCRIPT], it would look like

class ReadableStreamDefaultReader {
  constructor(stream)

  get closed()

  cancel(reason)
  read()
  releaseLock()
}

3.5.2. Internal Slots

Instances of ReadableStreamDefaultReader are created with the internal slots described in the following table:

Internal Slot Description (non-normative)
[[closedPromise]] A promise returned by the reader’s closed getter
[[ownerReadableStream]] A ReadableStream instance that owns this reader
[[readRequests]] A List of promises returned by calls to the reader’s read() method that have not yet been resolved, due to the consumer requesting chunks sooner than they are available; also used for the IsReadableStreamDefaultReader brand check

3.5.3. new ReadableStreamDefaultReader(stream)

The ReadableStreamDefaultReader constructor is generally not meant to be used directly; instead, a stream’s getReader() method should be used.
  1. If IsReadableStream(stream) is false, throw a TypeError exception.
  2. If IsReadableStreamLocked(stream) is true, throw a TypeError exception.
  3. Perform ReadableStreamReaderGenericInitialize(this, stream).
  4. Set this@[[readRequests]] to a new empty List.

3.5.4. Properties of the ReadableStreamDefaultReader Prototype

3.5.4.1. get closed
The closed getter returns a promise that will be fulfilled when the stream becomes closed or the reader’s lock is released, or rejected if the stream ever errors.
  1. If IsReadableStreamDefaultReader(this) is false, return a promise rejected with a TypeError exception.
  2. Return this@[[closedPromise]].
3.5.4.2. cancel(reason)
If the reader is active, the cancel method behaves the same as that for the associated stream.
  1. If IsReadableStreamDefaultReader(this) is false, return a promise rejected with a TypeError exception.
  2. If this@[[ownerReadableStream]] is undefined, return a promise rejected with a TypeError exception.
  3. Return ReadableStreamCancel(this@[[ownerReadableStream]], reason).
3.5.4.3. read()
The read method will return a promise that allows access to the next chunk from the stream’s internal queue, if available.

If reading a chunk causes the queue to become empty, more data will be pulled from the underlying source.

  1. If IsReadableStreamDefaultReader(this) is false, return a promise rejected with a TypeError exception.
  2. If this@[[ownerReadableStream]] is undefined, return a promise rejected with a TypeError exception.
  3. Return ReadableStreamDefaultReaderRead(this).
3.5.4.4. releaseLock()
The releaseLock method releases the reader’s lock on the corresponding stream. After the lock is released, the reader is no longer active. If the associated stream is errored when the lock is released, the reader will appear errored in the same way from now on; otherwise, the reader will appear closed.

A reader’s lock cannot be released while it still has a pending read request, i.e., if a promise returned by the reader’s read() method has not yet been settled. Attempting to do so will throw a TypeError and leave the reader locked to the stream.
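
A non-normative sketch of this restriction, assuming the queue is empty so the read request stays pending:
const reader = readableStream.getReader();
const pending = reader.read();             // a read request is now outstanding

try {
  reader.releaseLock();                    // throws a TypeError: read still pending
} catch (e) {
  // Wait for the outstanding read to settle, then release the lock.
  pending.then(() => reader.releaseLock());
}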

  1. If IsReadableStreamDefaultReader(this) is false, throw a TypeError exception.
  2. If this@[[ownerReadableStream]] is undefined, return undefined.
  3. If this@[[readRequests]] is not empty, throw a TypeError exception.
  4. Perform ReadableStreamReaderGenericRelease(this).

3.6. Class ReadableStreamBYOBReader

The ReadableStreamBYOBReader class represents a BYOB reader designed to be vended by a ReadableStream instance.

3.6.1. Class Definition

This section is non-normative.

If one were to write the ReadableStreamBYOBReader class in something close to the syntax of [ECMASCRIPT], it would look like

class ReadableStreamBYOBReader {
  constructor(stream)

  get closed()

  cancel(reason)
  read(view)
  releaseLock()
}

3.6.2. Internal Slots

Instances of ReadableStreamBYOBReader are created with the internal slots described in the following table:

Internal Slot Description (non-normative)
[[closedPromise]] A promise returned by the reader’s closed getter
[[ownerReadableStream]] A ReadableStream instance that owns this reader
[[readIntoRequests]] A List of promises returned by calls to the reader’s read(view) method that have not yet been resolved, due to the consumer requesting chunks sooner than they are available; also used for the IsReadableStreamBYOBReader brand check

3.6.3. new ReadableStreamBYOBReader(stream)

The ReadableStreamBYOBReader constructor is generally not meant to be used directly; instead, a stream’s getReader() method should be used.
  1. If IsReadableStream(stream) is false, throw a TypeError exception.
  2. If IsReadableStreamLocked(stream) is true, throw a TypeError exception.
  3. Perform ReadableStreamReaderGenericInitialize(this, stream).
  4. Set this@[[readIntoRequests]] to a new empty List.

3.6.4. Properties of the ReadableStreamBYOBReader Prototype

3.6.4.1. get closed
The closed getter returns a promise that will be fulfilled when the stream becomes closed or the reader’s lock is released, or rejected if the stream ever errors.
  1. If IsReadableStreamBYOBReader(this) is false, return a promise rejected with a TypeError exception.
  2. Return this@[[closedPromise]].
3.6.4.2. cancel(reason)
If the reader is active, the cancel method behaves the same as that for the associated stream. When done, it automatically releases the lock.
  1. If IsReadableStreamBYOBReader(this) is false, return a promise rejected with a TypeError exception.
  2. If this@[[ownerReadableStream]] is undefined, return a promise rejected with a TypeError exception.
  3. Return ReadableStreamCancel(this@[[ownerReadableStream]], reason).
3.6.4.3. read(view)
The read method will write read bytes into view and return a promise resolved with a possibly transferred buffer as described below.

If reading a chunk causes the queue to become empty, more data will be pulled from the underlying byte source.

  1. If IsReadableStreamBYOBReader(this) is false, return a promise rejected with a TypeError exception.
  2. If this@[[ownerReadableStream]] is undefined, return a promise rejected with a TypeError exception.
  3. If Type(view) is not Object, return a promise rejected with a TypeError exception.
  4. If view does not have a [[ViewedArrayBuffer]] internal slot, return a promise rejected with a TypeError exception.
  5. If view@[[ByteLength]] is 0, return a promise rejected with a TypeError exception.
  6. Return ReadableStreamBYOBReaderRead(this, view).
3.6.4.4. releaseLock()
The releaseLock method releases the reader’s lock on the corresponding stream. After the lock is released, the reader is no longer active. If the associated stream is errored when the lock is released, the reader will appear errored in the same way from now on; otherwise, the reader will appear closed.

A reader’s lock cannot be released while it still has a pending read request, i.e., if a promise returned by the reader’s read() method has not yet been settled. Attempting to do so will throw a TypeError and leave the reader locked to the stream.

  1. If IsReadableStreamBYOBReader(this) is false, throw a TypeError exception.
  2. If this@[[ownerReadableStream]] is undefined, return undefined.
  3. If this@[[readIntoRequests]] is not empty, throw a TypeError exception.
  4. Perform ReadableStreamReaderGenericRelease(this).
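For example (non-normative), a consumer can avoid the TypeError by releasing the lock only once no read request is pending; reader here is a ReadableStreamBYOBReader acquired as in the previous example:

reader.read(new Uint8Array(8)).then(() => {
  // Safe: the read request above has settled, so no reads are pending.
  reader.releaseLock();
  // The stream can now be locked by another reader or piped elsewhere.
});

// By contrast, calling reader.releaseLock() while that read() promise is
// still pending would throw a TypeError and leave the reader locked.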

3.7. Readable Stream Reader Abstract Operations

3.7.1. IsReadableStreamDefaultReader ( x ) nothrow

  1. If Type(x) is not Object, return false.
  2. If x does not have a [[readRequests]] internal slot, return false.
  3. Return true.

3.7.2. IsReadableStreamBYOBReader ( x ) nothrow

  1. If Type(x) is not Object, return false.
  2. If x does not have a [[readIntoRequests]] internal slot, return false.
  3. Return true.

3.7.3. ReadableStreamReaderGenericCancel ( reader, reason ) throws

  1. Return ReadableStreamCancel(reader@[[ownerReadableStream]], reason).

3.7.4. ReadableStreamReaderGenericInitialize ( reader, stream ) nothrow

  1. Set reader@[[ownerReadableStream]] to stream.
  2. Set stream@[[reader]] to reader.
  3. If stream@[[state]] is "readable",
    1. Set reader@[[closedPromise]] to a new promise.
  4. Otherwise,
    1. If stream@[[state]] is "closed",
      1. Set reader@[[closedPromise]] to a new promise resolved with undefined.
    2. Otherwise,
      1. Assert: stream@[[state]] is "errored".
      2. Set reader@[[closedPromise]] to a new promise rejected with stream@[[storedError]].

3.7.5. ReadableStreamReaderGenericRelease ( reader ) throws

  1. Assert: reader@[[ownerReadableStream]] is not undefined.
  2. Assert: reader@[[ownerReadableStream]]@[[reader]] is not undefined.
  3. If reader@[[ownerReadableStream]]@[[state]] is "readable", reject reader@[[closedPromise]] with a TypeError exception.
  4. Otherwise, set reader@[[closedPromise]] to a new promise rejected with a TypeError exception.
  5. Set reader@[[ownerReadableStream]]@[[reader]] to undefined.
  6. Set reader@[[ownerReadableStream]] to undefined.

3.7.6. ReadableStreamBYOBReaderRead ( reader, view ) nothrow

  1. Let stream be reader@[[ownerReadableStream]].
  2. Assert: stream is not undefined.
  3. Set stream@[[disturbed]] to true.
  4. If stream@[[state]] is "errored", return a promise rejected with stream@[[storedError]].
  5. Return ReadableByteStreamControllerPullInto(stream@[[readableStreamController]], view).

3.7.7. ReadableStreamDefaultReaderRead ( reader ) nothrow

  1. Let stream be reader@[[ownerReadableStream]].
  2. Assert: stream is not undefined.
  3. Set stream@[[disturbed]] to true.
  4. If stream@[[state]] is "closed", return a new promise resolved with CreateIterResultObject(undefined, true).
  5. If stream@[[state]] is "errored", return a new promise rejected with stream@[[storedError]].
  6. Assert: stream@[[state]] is "readable".
  7. Return stream@[[readableStreamController]].[[Pull]]().

3.8. Class ReadableStreamDefaultController

The ReadableStreamDefaultController class has methods that allow control of a ReadableStream's state and internal queue. When constructing a ReadableStream that is not a readable byte stream, the underlying source is given a corresponding ReadableStreamDefaultController instance to manipulate.
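For example (non-normative), a simple push-style underlying source could use its controller like this; the chunks are made up for illustration:

const readableStream = new ReadableStream({
  start(controller) {
    // Enqueue a couple of chunks, then signal that no more are coming.
    controller.enqueue("first chunk");
    controller.enqueue("second chunk");
    controller.close();
  }
});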

3.8.1. Class Definition

This section is non-normative.

If one were to write the ReadableStreamDefaultController class in something close to the syntax of [ECMASCRIPT], it would look like

class ReadableStreamDefaultController {
  constructor(stream, underlyingSource, size, highWaterMark)

  get desiredSize()

  close()
  enqueue(chunk)
  error(e)
}

3.8.2. Internal Slots

Instances of ReadableStreamDefaultController are created with the internal slots described in the following table:

Internal Slot Description (non-normative)
[[closeRequested]] A boolean flag indicating whether the stream has been closed by its underlying source, but still has chunks in its internal queue that have not yet been read
[[controlledReadableStream]] The ReadableStream instance controlled
[[pullAgain]] A boolean flag set to true if the stream’s mechanisms requested a call to the underlying source’s pull method to pull more data, but the pull could not yet be done since a previous call is still executing
[[pulling]] A boolean flag set to true while the underlying source’s pull method is executing and has not yet fulfilled, used to prevent reentrant calls
[[queue]] A List representing the stream’s internal queue of chunks
[[started]] A boolean flag indicating whether the underlying source has finished starting
[[strategyHWM]] A number supplied to the constructor as part of the stream’s queuing strategy, indicating the point at which the stream will apply backpressure to its underlying source.
[[strategySize]] A function supplied to the constructor as part of the stream’s queuing strategy, designed to calculate the size of enqueued chunks; can be undefined for the default behavior.
[[underlyingSource]] An object representation of the stream’s underlying source, including its queuing strategy; also used for the IsReadableStreamDefaultController brand check

3.8.3. new ReadableStreamDefaultController(stream, underlyingSource, size, highWaterMark)

The ReadableStreamDefaultController constructor cannot be used directly; it only works on a ReadableStream that is in the middle of being constructed.
  1. If IsReadableStream(stream) is false, throw a TypeError exception.
  2. If stream@[[readableStreamController]] is not undefined, throw a TypeError exception.
  3. Set this@[[controlledReadableStream]] to stream.
  4. Set this@[[underlyingSource]] to underlyingSource.
  5. Set this@[[queue]] to a new empty List.
  6. Set this@[[started]], this@[[closeRequested]], this@[[pullAgain]], and this@[[pulling]] to false.
  7. Let normalizedStrategy be ValidateAndNormalizeQueuingStrategy(size, highWaterMark).
  8. Set this@[[strategySize]] to normalizedStrategy.[[size]] and this@[[strategyHWM]] to normalizedStrategy.[[highWaterMark]].
  9. Let controller be this.
  10. Let startResult be InvokeOrNoop(underlyingSource, "start", «this»).
  11. ReturnIfAbrupt(startResult).
  12. Resolve startResult as a promise:
    1. Upon fulfillment,
      1. Set controller@[[started]] to true.
      2. Perform ReadableStreamDefaultControllerCallPullIfNeeded(controller).
    2. Upon rejection with reason r,
      1. If stream@[[state]] is "readable", perform ReadableStreamDefaultControllerError(controller, r).

3.8.4. Properties of the ReadableStreamDefaultController Prototype

3.8.4.1. get desiredSize
The desiredSize getter returns the desired size to fill the controlled stream’s internal queue. It can be negative, if the queue is over-full. An underlying source should use this information to determine when and how to apply backpressure.
  1. If IsReadableStreamDefaultController(this) is false, throw a TypeError exception.
  2. Return ReadableStreamDefaultControllerGetDesiredSize(this).
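For example (non-normative), a pull-based underlying source might consult desiredSize to decide how many chunks to enqueue at a time; produceChunk() is a hypothetical synchronous producer:

const readableStream = new ReadableStream({
  pull(controller) {
    // Keep the queue topped up, but stop once the high-water mark is reached.
    while (controller.desiredSize > 0) {
      controller.enqueue(produceChunk());
    }
  }
}, { highWaterMark: 4 });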
3.8.4.2. close()
The close method will close the controlled readable stream. Consumers will still be able to read any previously-enqueued chunks from the stream, but once those are read, the stream will become closed.
  1. If IsReadableStreamDefaultController(this) is false, throw a TypeError exception.
  2. If this@[[closeRequested]] is true, throw a TypeError exception.
  3. If this@[[controlledReadableStream]]@[[state]] is not "readable", throw a TypeError exception.
  4. Perform ReadableStreamDefaultControllerClose(this).
3.8.4.3. enqueue(chunk)
The enqueue method will enqueue a given chunk in the controlled readable stream.
  1. If IsReadableStreamDefaultController(this) is false, throw a TypeError exception.
  2. If this@[[closeRequested]] is true, throw a TypeError exception.
  3. If this@[[controlledReadableStream]]@[[state]] is not "readable", throw a TypeError exception.
  4. Return ReadableStreamDefaultControllerEnqueue(this, chunk).
3.8.4.4. error(e)
The error method will error the readable stream, making all future interactions with it fail with the given error e.
  1. If IsReadableStreamDefaultController(this) is false, throw a TypeError exception.
  2. Let stream be this@[[controlledReadableStream]].
  3. If stream@[[state]] is not "readable", throw a TypeError exception.
  4. Perform ReadableStreamDefaultControllerError(this, e).

3.8.5. Readable Stream Default Controller Internal Methods

The following are additional internal methods implemented by each ReadableStreamDefaultController instance. They are similar to the supporting abstract operations in the following section, but are in method form to allow polymorphic dispatch from the readable stream implementation to either these or their counterparts for BYOB controllers.

3.8.5.1. [[Cancel]](reason)
  1. Set this@[[queue]] to a new empty List.
  2. Return PromiseInvokeOrNoop(this@[[underlyingSource]], "cancel", «reason»).
3.8.5.2. [[Pull]]()
  1. Let stream be this@[[controlledReadableStream]].
  2. If this@[[queue]] is not empty,
    1. Let chunk be DequeueValue(this@[[queue]]).
    2. If this@[[closeRequested]] is true and this@[[queue]] is empty, perform ReadableStreamClose(stream).
    3. Otherwise, perform ReadableStreamDefaultControllerCallPullIfNeeded(this).
    4. Return a promise resolved with CreateIterResultObject(chunk, false).
  3. Let pendingPromise be ReadableStreamAddReadRequest(stream).
  4. Perform ReadableStreamDefaultControllerCallPullIfNeeded(this).
  5. Return pendingPromise.

3.9. Readable Stream Default Controller Abstract Operations

3.9.1. IsReadableStreamDefaultController ( x ) nothrow

  1. If Type(x) is not Object, return false.
  2. If x does not have an [[underlyingSource]] internal slot, return false.
  3. Return true.

3.9.2. ReadableStreamDefaultControllerCallPullIfNeeded ( controller ) nothrow

  1. Let shouldPull be ReadableStreamDefaultControllerShouldCallPull(controller).
  2. If shouldPull is false, return undefined.
  3. If controller@[[pulling]] is true,
    1. Set controller@[[pullAgain]] to true.
    2. Return undefined.
  4. Set controller@[[pulling]] to true.
  5. Let pullPromise be PromiseInvokeOrNoop(controller@[[underlyingSource]], "pull", «controller»).
  6. Upon fulfillment of pullPromise,
    1. Set controller@[[pulling]] to false.
    2. If controller@[[pullAgain]] is true,
      1. Set controller@[[pullAgain]] to false.
      2. Perform ReadableStreamDefaultControllerCallPullIfNeeded(controller).
  7. Upon rejection of pullPromise with reason e,
    1. If controller@[[controlledReadableStream]]@[[state]] is "readable", perform ReadableStreamDefaultControllerError(controller, e).
  8. Return undefined.

3.9.3. ReadableStreamDefaultControllerShouldCallPull ( controller ) nothrow

  1. Let stream be controller@[[controlledReadableStream]].
  2. If stream@[[state]] is "closed" or stream@[[state]] is "errored", return false.
  3. If controller@[[closeRequested]] is true, return false.
  4. If controller@[[started]] is false, return false.
  5. If IsReadableStreamLocked(stream) is true and ReadableStreamGetNumReadRequests(stream) > 0, return true.
  6. Let desiredSize be ReadableStreamDefaultControllerGetDesiredSize(controller).
  7. If desiredSize > 0, return true.
  8. Return false.

3.9.4. ReadableStreamDefaultControllerClose ( controller ) nothrow

  1. Let stream be controller@[[controlledReadableStream]].
  2. Assert: controller@[[closeRequested]] is false.
  3. Assert: stream@[[state]] is "readable".
  4. Set controller@[[closeRequested]] to true.
  5. If controller@[[queue]] is empty, perform ReadableStreamClose(stream).

3.9.5. ReadableStreamDefaultControllerEnqueue ( controller, chunk ) nothrow

This abstract operation can be called by other specifications that wish to enqueue chunks in a readable stream, in the same way a developer would enqueue chunks using the stream’s associated controller object. Specifications should not do this to streams they did not create, and must ensure they have obeyed the preconditions (listed here as asserts).

  1. Let stream be controller@[[controlledReadableStream]].
  2. Assert: controller@[[closeRequested]] is false.
  3. Assert: stream@[[state]] is "readable".
  4. If IsReadableStreamLocked(stream) is true and ReadableStreamGetNumReadRequests(stream) > 0, perform ReadableStreamFulfillReadRequest(stream, chunk, false).
  5. Otherwise,
    1. Let chunkSize be 1.
    2. If controller@[[strategySize]] is not undefined,
      1. Set chunkSize to Call(controller@[[strategySize]], undefined, «chunk»).
      2. If chunkSize is an abrupt completion,
        1. If stream@[[state]] is "readable", perform ReadableStreamDefaultControllerError(controller, ‍chunkSize.[[value]]).
        2. Return chunkSize.
      3. Set chunkSize to chunkSize.[[value]].
    3. Let enqueueResult be EnqueueValueWithSize(controller@[[queue]], chunk, chunkSize).
    4. If enqueueResult is an abrupt completion,
      1. If stream@[[state]] is "readable", perform ReadableStreamDefaultControllerError(controller, ‍enqueueResult.[[value]]).
      2. Return enqueueResult.
  6. Perform ReadableStreamDefaultControllerCallPullIfNeeded(controller).
  7. Return undefined.
The case where stream@[[state]] is "closed", but controller@[[closeRequested]] is false, will happen if the stream was closed without its controller’s close method ever being called: i.e., if the stream was closed by a call to cancel(reason). In this case we allow the controller’s enqueue method to be called and silently do nothing, since the cancelation was outside the control of the underlying source.

3.9.6. ReadableStreamDefaultControllerError ( controller, e ) nothrow

  1. Let stream be controller@[[controlledReadableStream]].
  2. Assert: stream@[[state]] is "readable".
  3. Set controller@[[queue]] to a new empty List.
  4. Perform ReadableStreamError(stream, e).

3.9.7. ReadableStreamDefaultControllerGetDesiredSize ( controller ) nothrow

This abstract operation can be called by other specifications that wish to determine the desired size to fill this stream’s internal queue, similar to how a developer would consult the desiredSize property of the stream’s associated controller object. Specifications should not use this on streams they did not create.

  1. Let queueSize be GetTotalQueueSize(controller@[[queue]]).
  2. Return controller@[[strategyHWM]] − queueSize.

3.10. Class ReadableByteStreamController

The ReadableByteStreamController class has methods that allow control of a ReadableStream's state and internal queue. When constructing a readable byte stream, the underlying byte source is given a corresponding ReadableByteStreamController instance to manipulate.

3.10.1. Class Definition

This section is non-normative.

If one were to write the ReadableByteStreamController class in something close to the syntax of [ECMASCRIPT], it would look like

class ReadableByteStreamController {
  constructor(stream, underlyingByteSource, highWaterMark)

  get byobRequest()
  get desiredSize()

  close()
  enqueue(chunk)
  error(e)
}

3.10.2. Internal Slots

Instances of ReadableByteStreamController are created with the internal slots described in the following table:

Internal Slot Description (non-normative)
[[autoAllocateChunkSize]] A non-negative integer when the automatic buffer allocation feature is enabled; in that case, this value specifies the size of the buffer to allocate. It is undefined otherwise
[[closeRequested]] A boolean flag indicating whether the stream has been closed by its underlying byte source, but still has chunks in its internal queue that have not yet been read
[[controlledReadableStream]] The ReadableStream instance controlled
[[pullAgain]] A boolean flag set to true if the stream’s mechanisms requested a call to the underlying byte source’s pull method to pull more data, but the pull could not yet be done since a previous call is still executing
[[pulling]] A boolean flag set to true while the underlying byte source’s pull method is executing and has not yet fulfilled, used to prevent reentrant calls
[[byobRequest]] A ReadableStreamBYOBRequest instance representing the current BYOB pull request.
[[pendingPullIntos]] A List of descriptors representing pending BYOB pull requests.
[[queue]] A List representing the stream’s internal queue of chunks
[[started]] A boolean flag indicating whether the underlying source has finished starting
[[strategyHWM]] A number supplied to the constructor as part of the stream’s queuing strategy, indicating the point at which the stream will apply backpressure to its underlying byte source.
[[strategySize]] A function supplied to the constructor as part of the stream’s queuing strategy, designed to calculate the size of enqueued chunks; can be undefined for the default behavior.
[[totalQueuedBytes]] The number of bytes stored in [[queue]]
[[underlyingByteSource]] An object representation of the stream’s underlying byte source, including its queuing strategy; also used for the IsReadableByteStreamController brand check

3.10.3. new ReadableByteStreamController(stream, underlyingByteSource, highWaterMark)

The ReadableByteStreamController constructor cannot be used directly; it only works on a ReadableStream that is in the middle of being constructed.
  1. If IsReadableStream(stream) is false, throw a TypeError exception.
  2. If stream@[[readableStreamController]] is not undefined, throw a TypeError exception.
  3. Set this@[[controlledReadableStream]] to stream.
  4. Set this@[[underlyingByteSource]] to underlyingByteSource.
  5. Set this@[[pullAgain]] and this@[[pulling]] to false.
  6. Perform ReadableByteStreamControllerClearPendingPullIntos(this).
  7. Set this@[[queue]] to a new empty List.
  8. Set this@[[totalQueuedBytes]] to 0.
  9. Set this@[[started]] and this@[[closeRequested]] to false.
  10. Set this@[[strategyHWM]] to ValidateAndNormalizeHighWaterMark(highWaterMark).
  11. Let autoAllocateChunkSize be GetV(underlyingByteSource, "autoAllocateChunkSize").
  12. If autoAllocateChunkSize is not undefined,
    1. Set autoAllocateChunkSize to ToInteger(autoAllocateChunkSize).
    2. ReturnIfAbrupt(autoAllocateChunkSize).
    3. If autoAllocateChunkSize ≤ 0, or if autoAllocateChunkSize is +∞ or -∞, throw a RangeError exception.
  13. Set this@[[autoAllocateChunkSize]] to autoAllocateChunkSize.
  14. Set this@[[pendingPullIntos]] to a new empty List.
  15. Let controller be this.
  16. Let startResult be InvokeOrNoop(underlyingByteSource, "start", «this»).
  17. ReturnIfAbrupt(startResult).
  18. Resolve startResult as a promise:
    1. Upon fulfillment,
      1. Set controller@[[started]] to true.
      2. Assert: controller@[[pulling]] is false.
      3. Assert: controller@[[pullAgain]] is false.
      4. Perform ReadableByteStreamControllerCallPullIfNeeded(controller).
    2. Upon rejection with reason r,
      1. If stream@[[state]] is "readable", perform ReadableByteStreamControllerError(controller, r).

3.10.4. Properties of the ReadableByteStreamController Prototype

3.10.4.1. get byobRequest
The byobRequest getter returns the current BYOB pull request.
  1. If IsReadableByteStreamController(this) is false, throw a TypeError exception.
  2. If this@[[byobRequest]] is undefined and this@[[pendingPullIntos]] is not empty,
    1. Let firstDescriptor be the first element of this@[[pendingPullIntos]].
    2. Let view be Construct(%Uint8Array%, «firstDescriptor.[[buffer]], firstDescriptor.[[byteOffset]] + firstDescriptor.[[bytesFilled]], firstDescriptor.[[byteLength]] - firstDescriptor.[[bytesFilled]]»).
    3. Set this@[[byobRequest]] to Construct(ReadableStreamBYOBRequest, «this, view»).
  3. Return this@[[byobRequest]].
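For example (non-normative), an underlying byte source can satisfy the current BYOB request by writing directly into the supplied view and then calling respond(); readBytesInto(view) and produceChunk() are hypothetical functions (the first synchronously fills the given view and returns the number of bytes written, the second returns a Uint8Array):

const underlyingByteSource = {
  pull(controller) {
    const request = controller.byobRequest;
    if (request !== undefined) {
      // Fill the consumer-supplied region directly, avoiding an extra copy.
      const bytesWritten = readBytesInto(request.view);
      request.respond(bytesWritten);
    } else {
      // No outstanding BYOB request: enqueue an ordinary chunk instead.
      controller.enqueue(produceChunk());
    }
  }
};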
3.10.4.2. get desiredSize
The desiredSize getter returns the desired size to fill the controlled stream’s internal queue. It can be negative, if the queue is over-full. An underlying source should use this information to determine when and how to apply backpressure.
  1. If IsReadableByteStreamController(this) is false, throw a TypeError exception.
  2. Return ReadableByteStreamControllerGetDesiredSize(this).
3.10.4.3. close()
The close method will close the controlled readable stream. Consumers will still be able to read any previously-enqueued chunks from the stream, but once those are read, the stream will become closed.
  1. If IsReadableByteStreamController(this) is false, throw a TypeError exception.
  2. If this@[[closeRequested]] is true, throw a TypeError exception.
  3. If this@[[controlledReadableStream]]@[[state]] is not "readable", throw a TypeError exception.
  4. Perform ReadableByteStreamControllerClose(this).
3.10.4.4. enqueue(chunk)
The enqueue method will enqueue a given chunk in the controlled readable stream.
  1. If IsReadableByteStreamController(this) is false, throw a TypeError exception.
  2. If this@[[closeRequested]] is true, throw a TypeError exception.
  3. If this@[[controlledReadableStream]]@[[state]] is not "readable", throw a TypeError exception.
  4. If Type(chunk) is not Object, throw a TypeError exception.
  5. If chunk does not have a [[ViewedArrayBuffer]] internal slot, throw a TypeError exception.
  6. Return ReadableByteStreamControllerEnqueue(this, chunk).
3.10.4.5. error(e)
The error method will error the readable stream, making all future interactions with it fail with the given error e.
  1. If IsReadableByteStreamController(this) is false, throw a TypeError exception.
  2. Let stream be this@[[controlledReadableStream]].
  3. If stream@[[state]] is not "readable", throw a TypeError exception.
  4. Perform ReadableByteStreamControllerError(this, e).

3.10.5. Readable Stream BYOB Controller Internal Methods

The following are additional internal methods implemented by each ReadableByteStreamController instance. They are similar to the supporting abstract operations in the following section, but are in method form to allow polymorphic dispatch from the readable stream implementation to either these or their counterparts for default controllers.

3.10.5.1. [[Cancel]](reason)
  1. If this@[[pendingPullIntos]] is not empty,
    1. Let firstDescriptor be the first element of this@[[pendingPullIntos]].
    2. Set firstDescriptor.[[bytesFilled]] to 0.
  2. Set this@[[queue]] to a new empty List.
  3. Set this@[[totalQueuedBytes]] to 0.
  4. Return PromiseInvokeOrNoop(this@[[underlyingByteSource]], "cancel", «reason»).
3.10.5.2. [[Pull]]()
  1. Let stream be this@[[controlledReadableStream]].
  2. If ReadableStreamGetNumReadRequests(stream) is 0,
    1. If this@[[totalQueuedBytes]] > 0,
      1. Let entry be the first element of this@[[queue]].
      2. Remove entry from this@[[queue]], shifting all other elements downward (so that the second becomes the first, and so on).
      3. Subtract entry.[[byteLength]] from this@[[totalQueuedBytes]].
      4. Perform ReadableByteStreamControllerHandleQueueDrain(this).
      5. Let view be Construct(%Uint8Array%, «entry.[[buffer]], entry.[[byteOffset]], entry.[[byteLength]]»).
      6. Return a promise resolved with CreateIterResultObject(view, false).
    2. Let autoAllocateChunkSize be this@[[autoAllocateChunkSize]].
    3. If autoAllocateChunkSize is not undefined,
      1. Let buffer be Construct(%ArrayBuffer%, «autoAllocateChunkSize»).
      2. Let pullIntoDescriptor be Record{[[buffer]]: buffer, [[byteOffset]]: 0, [[byteLength]]: autoAllocateChunkSize, [[bytesFilled]]: 0, [[elementSize]]: 1, [[ctor]]: %Uint8Array%, [[readerType]]: "default"}.
      3. Append pullIntoDescriptor as the last element of this@[[pendingPullIntos]].
  3. Otherwise,
    1. Assert: this@[[autoAllocateChunkSize]] is undefined.
  4. Let promise be ReadableStreamAddReadRequest(stream).
  5. Perform ReadableByteStreamControllerCallPullIfNeeded(this).
  6. Return promise.

3.11. Class ReadableStreamBYOBRequest

The ReadableStreamBYOBRequest class represents a pull-into request in a ReadableByteStreamController.

3.11.1. Class Definition

This section is non-normative.

If one were to write the ReadableStreamBYOBRequest class in something close to the syntax of [ECMASCRIPT], it would look like

class ReadableStreamBYOBRequest {
  constructor(controller, view)

  get view()

  respond(bytesWritten)
  respondWithNewView(view)
}

3.11.2. Internal Slots

Instances of ReadableStreamBYOBRequest are created with the internal slots described in the following table:

Internal Slot Description (non-normative)
[[associatedReadableByteStreamController]] The parent ReadableByteStreamController instance
[[view]] A typed array representing the destination region to which the controller may write generated data

3.11.3. new ReadableStreamBYOBRequest(controller, view)

  1. Set this@[[associatedReadableByteStreamController]] to controller.
  2. Set this@[[view]] to view.

3.11.4. Properties of the ReadableStreamBYOBRequest Prototype

3.11.4.1. get view
  1. If IsReadableStreamBYOBRequest(this) is false, throw a TypeError exception.
  2. Return this@[[view]].
3.11.4.2. respond(bytesWritten)
  1. If IsReadableStreamBYOBRequest(this) is false, throw a TypeError exception.
  2. If this@[[associatedReadableByteStreamController]] is undefined, throw a TypeError exception.
  3. Return ReadableByteStreamControllerRespond(this@[[associatedReadableByteStreamController]], bytesWritten).
3.11.4.3. respondWithNewView(view)
  1. If IsReadableStreamBYOBRequest(this) is false, throw a TypeError exception.
  2. If this@[[associatedReadableByteStreamController]] is undefined, throw a TypeError exception.
  3. If Type(view) is not Object, throw a TypeError exception.
  4. If view does not have a [[ViewedArrayBuffer]] internal slot, throw a TypeError exception.
  5. Return ReadableByteStreamControllerRespondWithNewView(this@[[associatedReadableByteStreamController]], view).

3.12. Readable Stream BYOB Controller Abstract Operations

3.12.1. IsReadableStreamBYOBRequest ( x ) nothrow

  1. If Type(x) is not Object, return false.
  2. If x does not have an [[associatedReadableByteStreamController]] internal slot, return false.
  3. Return true.

3.12.2. IsReadableByteStreamController ( x ) nothrow

  1. If Type(x) is not Object, return false.
  2. If x does not have an [[underlyingByteSource]] internal slot, return false.
  3. Return true.

3.12.3. ReadableByteStreamControllerCallPullIfNeeded ( controller ) nothrow

  1. Let shouldPull be ReadableByteStreamControllerShouldCallPull(controller).
  2. If shouldPull is false, return undefined.
  3. If controller@[[pulling]] is true,
    1. Set controller@[[pullAgain]] to true.
    2. Return undefined.
  4. Set controller@[[pullAgain]] to false.
  5. Set controller@[[pulling]] to true.
  6. Let pullPromise be PromiseInvokeOrNoop(controller@[[underlyingByteSource]], "pull", «‍controller»).
  7. Upon fulfillment of pullPromise,
    1. Set controller@[[pulling]] to false.
    2. If controller@[[pullAgain]] is true,
      1. Set controller@[[pullAgain]] to false.
      2. Perform ReadableByteStreamControllerCallPullIfNeeded(controller).
  8. Upon rejection of pullPromise with reason e,
    1. If controller@[[controlledReadableStream]]@[[state]] is "readable", perform ReadableByteStreamControllerError(controller, e).
  9. Return undefined.

3.12.4. ReadableByteStreamControllerClearPendingPullIntos ( controller ) nothrow

  1. If controller@[[byobRequest]] is not undefined,
    1. Perform ReadableStreamBYOBRequestInvalidate(controller@[[byobRequest]]).
    2. Set controller@[[byobRequest]] to undefined.
  2. Set controller@[[pendingPullIntos]] to a new empty List.

3.12.5. ReadableByteStreamControllerClose ( controller ) nothrow

  1. Let stream be controller@[[controlledReadableStream]].
  2. Assert: controller@[[closeRequested]] is false.
  3. Assert: stream@[[state]] is "readable".
  4. If controller@[[totalQueuedBytes]] > 0,
    1. Set controller@[[closeRequested]] to true.
    2. Return.
  5. Let firstPendingPullInto be the first element of controller@[[pendingPullIntos]].
  6. If ReadableStreamHasBYOBReader(stream) is true and controller@[[pendingPullIntos]] is not empty and firstPendingPullInto.[[bytesFilled]] > 0,
    1. Let e be a TypeError exception.
    2. Perform ReadableByteStreamControllerError(controller, e).
    3. Throw e.
  7. Perform ReadableStreamClose(stream).

3.12.6. ReadableByteStreamControllerCommitPullIntoDescriptor ( stream, pullIntoDescriptor ) nothrow

  1. Assert: stream@[[state]] is not "errored".
  2. Let done be false.
  3. If stream@[[state]] is "closed",
    1. Assert: pullIntoDescriptor.[[bytesFilled]] is not 0.
    2. Let done be true.
  4. Let filledView be ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor).
  5. If pullIntoDescriptor.[[readerType]] is "default",
    1. ReadableStreamFulfillReadRequest(stream, filledView, done).
  6. Otherwise,
    1. Assert: pullIntoDescriptor.[[readerType]] is "byob".
    2. ReadableStreamFulfillReadIntoRequest(stream, filledView, done).

3.12.7. ReadableByteStreamControllerConvertPullIntoDescriptor ( pullIntoDescriptor ) nothrow

  1. Let bytesFilled be pullIntoDescriptor.[[bytesFilled]].
  2. Let elementSize be pullIntoDescriptor.[[elementSize]].
  3. Assert: bytesFilled ≤ pullIntoDescriptor.[[byteLength]].
  4. Assert: bytesFilled mod elementSize is 0.
  5. Return Construct(pullIntoDescriptor.[[ctor]], «pullIntoDescriptor.[[buffer]], pullIntoDescriptor.[[byteOffset]], bytesFilled / elementSize»).

3.12.8. ReadableByteStreamControllerEnqueue ( controller, chunk ) throws

  1. Let stream be controller@[[controlledReadableStream]].
  2. Assert: controller@[[closeRequested]] is false.
  3. Assert: stream@[[state]] is "readable".
  4. Let buffer be chunk@[[ViewedArrayBuffer]].
  5. Let byteOffset be chunk@[[ByteOffset]].
  6. Let byteLength be chunk@[[ByteLength]].
  7. If ReadableStreamHasReader(stream) is true,
    1. If ReadableStreamGetNumReadRequests(stream) is 0,
      1. Transfer buffer and let transferredBuffer be the result.
      2. Perform ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength).
    2. Otherwise,
      1. Assert: controller@[[queue]] is empty.
      2. Transfer buffer and let transferredBuffer be the result.
      3. Let transferredView be Construct(%Uint8Array%, «transferredBuffer, byteOffset, byteLength»).
      4. Perform ReadableStreamFulfillReadRequest(stream, transferredView, false).
  8. Otherwise,
    1. If ReadableStreamHasBYOBReader(stream) is true,
      1. Transfer buffer and let transferredBuffer be the result.
      2. Perform ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength).
      3. Perform ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
    2. Otherwise,
      1. Assert: IsReadableStreamLocked(stream) is false.
      2. Transfer buffer and let transferredBuffer be the result.
      3. Perform ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength).

3.12.9. ReadableByteStreamControllerEnqueueChunkToQueue ( controller, buffer, byteOffset, byteLength ) nothrow

  1. Append Record{[[buffer]]: buffer, [[byteOffset]]: byteOffset, [[byteLength]]: byteLength} as the last element of controller@[[queue]].
  2. Add byteLength to controller@[[totalQueuedBytes]].

3.12.10. ReadableByteStreamControllerError ( controller, e ) nothrow

  1. Let stream be controller@[[controlledReadableStream]].
  2. Assert: stream@[[state]] is "readable".
  3. Perform ReadableByteStreamControllerClearPendingPullIntos(controller).
  4. Set controller@[[queue]] to a new empty List.
  5. Perform ReadableStreamError(stream, e).

3.12.11. ReadableByteStreamControllerFillHeadPullIntoDescriptor ( controller, size, pullIntoDescriptor ) nothrow

  1. Assert: controller@[[pendingPullIntos]] is empty or the first element of controller@[[pendingPullIntos]] is pullIntoDescriptor.
  2. If controller@[[byobRequest]] is not undefined,
    1. Perform ReadableStreamBYOBRequestInvalidate(controller@[[byobRequest]]).
    2. Set controller@[[byobRequest]] to undefined.
  3. Add size to pullIntoDescriptor.[[bytesFilled]].

3.12.12. ReadableByteStreamControllerFillPullIntoDescriptorFromQueue ( controller, pullIntoDescriptor ) nothrow

  1. Let elementSize be pullIntoDescriptor.[[elementSize]].
  2. Let currentAlignedBytes be pullIntoDescriptor.[[bytesFilled]] - pullIntoDescriptor.[[bytesFilled]] mod elementSize.
  3. Let maxBytesToCopy be the minimum of controller@[[totalQueuedBytes]] and pullIntoDescriptor.[[byteLength]] - pullIntoDescriptor.[[bytesFilled]].
  4. Let maxBytesFilled be pullIntoDescriptor.[[bytesFilled]] + maxBytesToCopy.
  5. Let maxAlignedBytes be maxBytesFilled - maxBytesFilled mod elementSize.
  6. Let totalBytesToCopyRemaining be maxBytesToCopy.
  7. Let ready be false.
  8. If maxAlignedBytes > currentAlignedBytes,
    1. Let totalBytesToCopyRemaining be maxAlignedBytes - pullIntoDescriptor.[[bytesFilled]].
    2. Let ready be true.
  9. Let queue be controller@[[queue]].
  10. Repeat the following steps while totalBytesToCopyRemaining > 0,
    1. Let headOfQueue be the first element of queue.
    2. Let bytesToCopy be the minimum of totalBytesToCopyRemaining and headOfQueue.[[byteLength]].
    3. Let destStart be pullIntoDescriptor.[[byteOffset]] + pullIntoDescriptor.[[bytesFilled]].
    4. Set the bytesToCopy bytes of pullIntoDescriptor.[[buffer]] at offset destStart to the bytesToCopy bytes of headOfQueue.[[buffer]] at offset headOfQueue.[[byteOffset]].
    5. If headOfQueue.[[byteLength]] is bytesToCopy,
      1. Remove the first element of queue, shifting all other elements downward (so that the second becomes the first, and so on).
    6. Otherwise,
      1. Add bytesToCopy to headOfQueue.[[byteOffset]].
      2. Subtract bytesToCopy from headOfQueue.[[byteLength]].
    7. Subtract bytesToCopy from controller@[[totalQueuedBytes]].
    8. Perform ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, bytesToCopy, pullIntoDescriptor).
    9. Subtract bytesToCopy from totalBytesToCopyRemaining.
  11. If ready is false,
    1. Assert: controller@[[totalQueuedBytes]] is 0.
    2. Assert: pullIntoDescriptor.[[bytesFilled]] > 0.
    3. Assert: pullIntoDescriptor.[[bytesFilled]] < pullIntoDescriptor.[[elementSize]].
  12. Return ready.

3.12.13. ReadableByteStreamControllerGetDesiredSize ( controller ) nothrow

  1. Return controller@[[strategyHWM]] - controller@[[totalQueuedBytes]].

3.12.14. ReadableByteStreamControllerHandleQueueDrain ( controller ) nothrow

  1. Assert: controller@[[controlledReadableStream]]@[[state]] is "readable".
  2. If controller@[[totalQueuedBytes]] is 0 and controller@[[closeRequested]] is true,
    1. Perform ReadableStreamClose(controller@[[controlledReadableStream]]).
  3. Otherwise,
    1. Perform ReadableByteStreamControllerCallPullIfNeeded(controller).

3.12.15. ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue ( controller ) nothrow

  1. Assert: controller@[[closeRequested]] is false.
  2. Repeat the following steps while controller@[[pendingPullIntos]] is not empty,
    1. If controller@[[totalQueuedBytes]] is 0, return.
    2. Let pullIntoDescriptor be the first element of controller@[[pendingPullIntos]].
    3. If ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) is true,
      1. Perform ReadableByteStreamControllerShiftPendingPullInto(controller).
      2. Perform ReadableByteStreamControllerCommitPullIntoDescriptor(controller@[[controlledReadableStream]], pullIntoDescriptor).

3.12.16. ReadableByteStreamControllerPullInto ( controller, view ) nothrow

  1. Let stream be controller@[[controlledReadableStream]].
  2. Let elementSize be 1.
  3. If view has a [[TypedArrayName]] internal slot (i.e., it is not a DataView), set elementSize to the element size specified in The TypedArray Constructors table for view@[[TypedArrayName]].
  4. Let ctor be %DataView%.
  5. If view has a [[TypedArrayName]] internal slot (i.e., it is not a DataView), set ctor to the constructor specified in The TypedArray Constructors table for view@[[TypedArrayName]].
  6. Let pullIntoDescriptor be Record{[[buffer]]: view.[[buffer]], [[byteOffset]]: view.[[byteOffset]], [[byteLength]]: view.[[byteLength]], [[bytesFilled]]: 0, [[elementSize]]: elementSize, [[ctor]]: ctor, [[readerType]]: "byob"}.
  7. If controller@[[pendingPullIntos]] is not empty,
    1. Transfer pullIntoDescriptor.[[buffer]] and set pullIntoDescriptor.[[buffer]] to the result.
    2. Append pullIntoDescriptor as the last element of controller@[[pendingPullIntos]].
    3. Return ReadableStreamAddReadIntoRequest(stream).
  8. If stream@[[state]] is "closed",
    1. Let emptyView be Construct(ctor, «view.[[buffer]], view.[[byteOffset]], 0»).
    2. Return a promise resolved with CreateIterResultObject(emptyView, true).
  9. If controller@[[totalQueuedBytes]] > 0,
    1. If ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) is true,
      1. Let filledView be ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor).
      2. Perform ReadableByteStreamControllerHandleQueueDrain(controller).
      3. Return a promise resolved with CreateIterResultObject(filledView, false).
    2. If controller@[[closeRequested]] is true,
      1. Let e be a TypeError exception.
      2. Perform ReadableByteStreamControllerError(controller, e).
      3. Return a promise rejected with e.
  10. Transfer pullIntoDescriptor.[[buffer]] and set pullIntoDescriptor.[[buffer]] to the result.
  11. Append pullIntoDescriptor as the last element of controller@[[pendingPullIntos]].
  12. Let promise be ReadableStreamAddReadIntoRequest(stream).
  13. Perform ReadableByteStreamControllerCallPullIfNeeded(controller).
  14. Return promise.

3.12.17. ReadableByteStreamControllerRespond ( controller, bytesWritten ) throws

  1. Let bytesWritten be ToNumber(bytesWritten).
  2. If IsFiniteNonNegativeNumber(bytesWritten) is false,
    1. Throw a RangeError exception.
  3. Assert: controller@[[pendingPullIntos]] is not empty.
  4. Return ReadableByteStreamControllerRespondInternal(controller, bytesWritten).

3.12.18. ReadableByteStreamControllerRespondInClosedState ( controller, firstDescriptor ) nothrow

  1. Transfer firstDescriptor.[[buffer]] and set firstDescriptor.[[buffer]] to the result.
  2. Assert: firstDescriptor.[[bytesFilled]] is 0.
  3. Let stream be controller@[[controlledReadableStream]].
  4. Repeat the following steps while ReadableStreamGetNumReadIntoRequests(stream) > 0,
    1. Let pullIntoDescriptor be ReadableByteStreamControllerShiftPendingPullInto(controller).
    2. Perform ReadableByteStreamControllerCommitPullIntoDescriptor(stream, pullIntoDescriptor).

3.12.19. ReadableByteStreamControllerRespondInReadableState ( controller, bytesWritten, pullIntoDescriptor ) nothrow

  1. If pullIntoDescriptor.[[bytesFilled]] + bytesWritten > pullIntoDescriptor.[[byteLength]], throw a RangeError exception.
  2. Perform ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, bytesWritten, pullIntoDescriptor).
  3. If pullIntoDescriptor.[[bytesFilled]] < pullIntoDescriptor.[[elementSize]], return.
  4. Perform ReadableByteStreamControllerShiftPendingPullInto(controller).
  5. Let remainderSize be pullIntoDescriptor.[[bytesFilled]] mod pullIntoDescriptor.[[elementSize]].
  6. If remainderSize > 0,
    1. Let end be pullIntoDescriptor.[[byteOffset]] + pullIntoDescriptor.[[bytesFilled]].
    2. Let remainder be a new ArrayBuffer containing the remainderSize bytes of pullIntoDescriptor.[[buffer]] at offset end - remainderSize.
    3. Perform ReadableByteStreamControllerEnqueueChunkToQueue(controller, remainder, 0, remainder@[[ByteLength]]).
  7. Transfer pullIntoDescriptor.[[buffer]] and set pullIntoDescriptor.[[buffer]] to the result.
  8. Set pullIntoDescriptor.[[bytesFilled]] to pullIntoDescriptor.[[bytesFilled]] - remainderSize.
  9. Perform ReadableByteStreamControllerCommitPullIntoDescriptor(controller@[[controlledReadableStream]], pullIntoDescriptor).
  10. Perform ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).

3.12.20. ReadableByteStreamControllerRespondInternal ( controller, bytesWritten ) throws

  1. Let firstDescriptor be the first element of controller@[[pendingPullIntos]].
  2. Let stream be controller@[[controlledReadableStream]].
  3. If stream@[[state]] is "closed",
    1. If bytesWritten is not 0,
      1. Throw a TypeError exception.
    2. Perform ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor).
  4. Otherwise,
    1. Assert: stream@[[state]] is "readable".
    2. Perform ReadableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor).

3.12.21. ReadableByteStreamControllerRespondWithNewView ( controller, view ) throws

  1. Assert: controller@[[pendingPullIntos]] is not empty.
  2. Let firstDescriptor be the first element of controller@[[pendingPullIntos]].
  3. If firstDescriptor.[[byteOffset]] + firstDescriptor.[[bytesFilled]] is not view@[[ByteOffset]],
    1. Throw a RangeError exception.
  4. If firstDescriptor.[[byteLength]] is not view@[[ByteLength]],
    1. Throw a RangeError exception.
  5. Set firstDescriptor.[[buffer]] to view@[[ViewedArrayBuffer]].
  6. Return ReadableByteStreamControllerRespondInternal(controller, view@[[ByteLength]]).

3.12.22. ReadableByteStreamControllerShiftPendingPullInto ( controller ) nothrow

  1. Let descriptor be the first element of controller@[[pendingPullIntos]].
  2. Remove descriptor from controller@[[pendingPullIntos]], shifting all other elements downward (so that the second becomes the first, and so on).
  3. If controller@[[byobRequest]] is not undefined,
    1. Perform ReadableStreamBYOBRequestInvalidate(controller@[[byobRequest]]).
    2. Set controller@[[byobRequest]] to undefined.
  4. Return descriptor.

3.12.23. ReadableByteStreamControllerShouldCallPull ( controller ) nothrow

  1. Let stream be controller@[[controlledReadableStream]].
  2. If stream@[[state]] is not "readable", return false.
  3. If controller@[[closeRequested]] is true, return false.
  4. If controller@[[started]] is false, return false.
  5. If ReadableStreamHasReader(stream) is true and ReadableStreamGetNumReadRequests(stream) > 0, return true.
  6. If ReadableStreamHasBYOBReader(stream) is true and ReadableStreamGetNumReadIntoRequests(stream) > 0, return true.
  7. Let desiredSize be ReadableByteStreamControllerGetDesiredSize(controller).
  8. If desiredSize > 0, return true.
  9. Return false.

3.12.24. ReadableStreamBYOBRequestInvalidate ( request ) nothrow

  1. Set request@[[associatedReadableByteStreamController]] to undefined.
  2. Set request@[[view]] to undefined.

4. Writable Streams

Although readable streams have recently evolved significantly in response to feedback from implementation work, writable streams have not yet caught up with all the discoveries made in that space. As such, while the following spec will be the basis for a final API, it is expected to change in several important ways before being ready to ship. Please follow along on the writable streams issues label for details.

4.1. Using Writable Streams

The usual way to write to a writable stream is to simply pipe a readable stream to it. This ensures that backpressure is respected, so that if the writable stream’s underlying sink is not able to accept data as fast as the readable stream can produce it, the readable stream is informed of this and has a chance to slow down its data production.
readableStream.pipeTo(writableStream)
  .then(() => console.log("All data successfully written!"))
  .catch(e => console.error("Something went wrong!", e));
You can also write directly to writable streams using their write() and close() methods. Since writable streams queue any incoming writes, and take care internally to forward them to the underlying sink in sequence, you can indiscriminately write to a writable stream without much ceremony:
function writeArrayToStream(array, writableStream) {
  array.forEach(chunk => writableStream.write(chunk));

  return writableStream.close();
}

writeArrayToStream([1, 2, 3, 4, 5], writableStream)
  .then(() => console.log("All done!"))
  .catch(e => console.error("Error with the stream: " + e));
In the previous example we only paid attention to the success or failure of the entire stream, by looking at the promise returned by its close() method. That promise (which can also be accessed using the closed getter) will reject if anything goes wrong with the stream—initializing it, writing to it, or closing it. And it will fulfill once the stream is successfully closed. Often this is all you care about.

However, if you care about the success of writing a specific chunk, you can use the promise returned by the stream’s write() method:

writableStream.write("i am a chunk of data")
  .then(() => console.log("chunk successfully written!"))
  .catch(e => console.error(e));

What "success" means is up to a given stream instance (or more precisely, its underlying sink) to decide. For example, for a file stream it could simply mean that the OS has accepted the write, and not necessarily that the chunk has been flushed to disk.

4.2. Class WritableStream

4.2.1. Class Definition

This section is non-normative.

If one were to write the WritableStream class in something close to the syntax of [ECMASCRIPT], it would look like

class WritableStream {
  constructor(underlyingSink = {}, { size, highWaterMark = 0 } = {})

  get closed()
  get ready()
  get state()

  abort(reason)
  close()
  write(chunk)
}

4.2.2. Internal Slots

Instances of WritableStream are created with the internal slots described in the following table:

Internal Slot Description (non-normative)
[[closedPromise]] A promise that becomes fulfilled when the stream becomes "closed"; returned by the closed getter
[[queue]] A List representing the stream’s internal queue of pending writes
[[started]] A boolean flag indicating whether the underlying sink has finished starting
[[startedPromise]] A promise storing the result of starting the underlying sink, used to delay actions until that is complete
[[state]] A string containing the stream’s current state; returned by the state getter
[[storedError]] A value indicating how the stream failed, to be given as a failure reason or exception when trying to operate on the stream while in the "errored" state
[[strategySize]] A function supplied to the constructor as part of the stream’s queuing strategy, designed to calculate the size of chunks written; can be undefined for the default behavior.
[[strategyHWM]] A number supplied to the constructor as part of the stream’s queuing strategy, indicating the point at which the stream will apply backpressure to any producers.
[[readyPromise]] A promise returned by the ready getter
[[underlyingSink]] An object representation of the stream’s underlying sink; also used for the IsWritableStream brand check
[[writing]] A boolean flag indicating whether the stream is currently writing to the underlying sink, used to prevent concurrent such writes

4.2.3. new WritableStream(underlyingSink = {}, { size, highWaterMark = 0 } = {})

The underlyingSink object passed to the constructor can implement any of the methods start(error), write(chunk), close(), and abort(reason) to govern how the constructed stream instance behaves; each is optional, and each is invoked by the constructor and abstract operations below.

The constructor also accepts a second argument containing the queuing strategy object with two properties: a non-negative number highWaterMark, and a function size(chunk). The supplied strategy could be an instance of the built-in CountQueuingStrategy or ByteLengthQueuingStrategy classes, or it could be custom. If no strategy is supplied, the default behavior will be the same as a CountQueuingStrategy with a high water mark of 0.
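For example (non-normative), a writable stream over a hypothetical asynchronous sink could be constructed as follows; writeToDisk(), flushAndRelease(), and discardPending() are made-up functions that return promises:

const writableStream = new WritableStream({
  start(error) {
    // Acquire resources here; call error(e) later to move the stream to "errored".
  },
  write(chunk) {
    // The returned promise signals when the sink has accepted the chunk,
    // which in turn drives the stream's backpressure.
    return writeToDisk(chunk);
  },
  close() {
    return flushAndRelease();
  },
  abort(reason) {
    return discardPending(reason);
  }
}, new CountQueuingStrategy({ highWaterMark: 1 }));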

Due to the way writable streams asynchronously close, it is possible for both close and abort to be called, in cases where the producer aborts the stream while it is in the "closing" state. Notably, since a stream always spends at least one turn in the "closing" state, code like ws.close(); ws.abort(...); will cause both to be called, even if the close method itself has no asynchronous behavior. A well-designed underlying sink object should be able to deal with this.
  1. Set this@[[underlyingSink]] to underlyingSink.
  2. Set this@[[closedPromise]] to a new promise.
  3. Set this@[[readyPromise]] to a new promise resolved with undefined.
  4. Set this@[[queue]] to a new empty List.
  5. Set this@[[state]] to "writable".
  6. Set this@[[started]] and this@[[writing]] to false.
  7. Let normalizedStrategy be ValidateAndNormalizeQueuingStrategy(size, highWaterMark).
  8. Set this@[[strategySize]] to normalizedStrategy.[[size]] and this@[[strategyHWM]] to normalizedStrategy.[[highWaterMark]].
  9. Perform SyncWritableStreamStateWithQueue(this).
  10. Let error be a new WritableStream error function.
  11. Set error@[[stream]] to this.
  12. Let startResult be InvokeOrNoop(underlyingSink, "start", «error»).
  13. ReturnIfAbrupt(startResult).
  14. Set this@[[startedPromise]] to the result of resolving startResult as a promise.
    1. Upon fulfillment,
      1. Set this@[[started]] to true.
      2. Set this@[[startedPromise]] to undefined.
    2. Upon rejection with reason r, perform ErrorWritableStream(this, r).

A WritableStream error function is an anonymous built-in function that is used to allow underlying sinks to error their associated writable stream. Each WritableStream error function has a [[stream]] internal slot. When a WritableStream error function F is called with argument e, it performs the following steps:

  1. Let stream be F@[[stream]].
  2. Perform ErrorWritableStream(stream, e).
  3. Return undefined.

4.2.4. Properties of the WritableStream Prototype

4.2.4.1. get closed
The closed getter returns a promise that will be fulfilled when the stream becomes closed, or rejected if it ever errors.
  1. If IsWritableStream(this) is false, return a promise rejected with a TypeError exception.
  2. Return this@[[closedPromise]].
4.2.4.2. get ready
The ready getter returns a promise that will be fulfilled when the stream transitions away from the "waiting" state to any other state. Once the stream transitions back to "waiting", the getter will return a new promise that stays pending until the next state transition.

In essence, this promise gives a signal as to when any backpressure has let up (or that the stream has been closed or errored).

  1. If IsWritableStream(this) is false, return a promise rejected with a TypeError exception.
  2. Return this@[[readyPromise]].
4.2.4.3. get state
The state getter returns the state of the stream, which will be one of the following:
"waiting"
The stream’s internal queue is full; that is, the stream is exerting backpressure. Use ready to be notified of when the pressure subsides.
"writable"
The stream’s internal queue is not full; call write() until backpressure is exerted.
"closing"
The stream’s close() method has been called, and a command to close is in the queue or being processed by the underlying sink; attempts to write will now fail.
"closed"
The underlying sink has been closed; writing is no longer possible.
"errored"
An error occurred interacting with the underlying sink or the stream has been aborted, so the stream is now dead.
  1. If IsWritableStream(this) is false, throw a TypeError exception.
  2. Return this@[[state]].
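For example (non-normative), a producer can combine state and ready to write only while the stream is not exerting backpressure; getNextChunk() and hasMoreChunks() are hypothetical:

function writeRespectingBackpressure(ws) {
  while (ws.state === "writable" && hasMoreChunks()) {
    ws.write(getNextChunk());
  }
  if (ws.state === "waiting") {
    // Backpressure: wait for the queue to drain, then continue writing.
    ws.ready.then(() => writeRespectingBackpressure(ws));
  } else if (!hasMoreChunks()) {
    ws.close();
  }
}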
4.2.4.4. abort(reason)
The abort method signals that the producer can no longer successfully write to the stream and it should be immediately moved to an "errored" state, with any queued-up writes discarded. This will also execute any abort mechanism of the underlying sink.
  1. If IsWritableStream(this) is false, return a promise rejected with a TypeError exception.
  2. If this@[[state]] is "closed", return a new promise resolved with undefined.
  3. If this@[[state]] is "errored", return a new promise rejected with this@[[storedError]].
  4. Perform ErrorWritableStream(this, reason).
  5. Let sinkAbortPromise be PromiseInvokeOrFallbackOrNoop(this@[[underlyingSink]], "abort", «reason», "close", «»).
  6. Return the result of transforming sinkAbortPromise by a fulfillment handler that returns undefined.
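For example (non-normative), a producer that decides to abandon its output can abort the stream; any queued-up writes are discarded and the underlying sink’s abort (or close) mechanism is run:

writableStream.abort(new Error("upload cancelled by the user"))
  .then(() => console.log("underlying sink has finished aborting"))
  .catch(e => console.error("aborting failed", e));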
4.2.4.5. close()
The close method signals that the producer is done writing chunks to the stream and wishes to move the stream to a "closed" state. This queues an action to close the stream, such that once any currently queued-up writes complete, the close mechanism of the underlying sink will execute, releasing any held resources. In the meantime, the stream will be in a "closing" state.
  1. If IsWritableStream(this) is false, return a promise rejected with a TypeError exception.
  2. If this@[[state]] is "closing" or "closed", return a promise rejected with a TypeError exception.
  3. If this@[[state]] is "errored", return a promise rejected with this@[[storedError]].
  4. If this@[[state]] is "waiting", resolve this@[[readyPromise]] with undefined.
  5. Set this@[[state]] to "closing".
  6. Perform EnqueueValueWithSize(this@[[queue]], "close", 0).
  7. Perform CallOrScheduleWritableStreamAdvanceQueue(this).
  8. Return this@[[closedPromise]].
4.2.4.6. write(chunk)
The write method adds a write to the stream’s internal queue, instructing the stream to write the given chunk of data to the underlying sink once all other pending writes have finished successfully. It returns a promise that will be fulfilled or rejected depending on the success or failure of writing the chunk to the underlying sink.

The impact of enqueuing this chunk will be immediately reflected in the stream’s state property; in particular, if the internal queue is now full according to the stream’s queuing strategy, the stream will exert backpressure by changing its state to "waiting".

  1. If IsWritableStream(this) is false, return a promise rejected with a TypeError exception.
  2. If this@[[state]] is "closing" or "closed", return a promise rejected with a TypeError exception.
  3. If this@[[state]] is "errored", return a promise rejected with this@[[storedError]].
  4. Assert: this@[[state]] is either "waiting" or "writable".
  5. Let chunkSize be 1.
  6. If this@[[strategySize]] is not undefined, then
    1. Set chunkSize to Call(this@[[strategySize]], undefined, «‍chunk»).
    2. If chunkSize is an abrupt completion,
      1. Perform ErrorWritableStream(this, chunkSize.[[value]]).
      2. Return a new promise rejected with chunkSize.[[value]].
    3. Set chunkSize to chunkSize.[[value]].
  7. Let promise be a new promise.
  8. Let writeRecord be Record{[[promise]]: promise, [[chunk]]: chunk}.
  9. Let enqueueResult be EnqueueValueWithSize(this@[[queue]], writeRecord, chunkSize).
  10. If enqueueResult is an abrupt completion,
    1. Perform ErrorWritableStream(this, enqueueResult.[[value]]).
    2. Return a new promise rejected with enqueueResult.[[value]].
  11. Perform SyncWritableStreamStateWithQueue(this).
  12. Perform CallOrScheduleWritableStreamAdvanceQueue(this).
  13. Return promise.
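
To illustrate (non-normative), here is a minimal sketch of how write() and the state property interact, assuming a hypothetical slow underlying sink and using the CountQueuingStrategy defined in §6.2:

const slowStream = new WritableStream({
  write(chunk) {
    // Hypothetical slow sink: each chunk takes one second to "write".
    return new Promise(resolve => setTimeout(resolve, 1000));
  }
}, new CountQueuingStrategy({ highWaterMark: 1 }));

slowStream.write("a")
  .then(() => console.log("first chunk written to the underlying sink"));
console.log(slowStream.state); // "writable": one queued chunk does not exceed the high-water mark

slowStream.write("b");
console.log(slowStream.state); // "waiting": the queue is now over its high-water mark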

4.3. Writable Stream Abstract Operations

4.3.1. CallOrScheduleWritableStreamAdvanceQueue ( stream ) nothrow

  1. If stream@[[started]] is false, then
    1. Upon fulfillment of stream@[[startedPromise]], perform WritableStreamAdvanceQueue(stream).
  2. Otherwise,
    1. Perform WritableStreamAdvanceQueue(stream).
  3. Return undefined.

4.3.2. CloseWritableStream ( stream ) nothrow

  1. Assert: stream@[[state]] is "closing".
  2. Let sinkClosePromise be PromiseInvokeOrNoop(stream@[[underlyingSink]], "close").
    1. Upon fulfillment,
      1. If stream@[[state]] is "errored", return.
      2. Assert: stream@[[state]] is "closing".
      3. Resolve stream@[[closedPromise]] with undefined.
      4. Set stream@[[state]] to "closed".
    2. Upon rejection with reason r, perform ErrorWritableStream(stream, r).
  3. Return undefined.

4.3.3. ErrorWritableStream ( stream, e ) nothrow

  1. If stream@[[state]] is "closed" or "errored", return undefined.
  2. Repeat while stream@[[queue]] is not empty:
    1. Let writeRecord be DequeueValue(stream@[[queue]]).
    2. If writeRecord is not "close", reject writeRecord.[[promise]] with e.
  3. Set stream@[[storedError]] to e.
  4. If stream@[[state]] is "waiting", resolve stream@[[readyPromise]] with undefined.
  5. Reject stream@[[closedPromise]] with e.
  6. Set stream@[[state]] to "errored".
  7. Return undefined.

4.3.4. IsWritableStream ( x ) nothrow

  1. If Type(x) is not Object, return false.
  2. If x does not have a [[underlyingSink]] internal slot, return false.
  3. Return true.

4.3.5. SyncWritableStreamStateWithQueue ( stream ) nothrow

  1. If stream@[[state]] is "closing", return undefined.
  2. Assert: stream@[[state]] is either "writable" or "waiting".
  3. Let queueSize be GetTotalQueueSize(stream@[[queue]]).
  4. Let shouldApplyBackpressure be true if queueSize > stream@[[strategyHWM]], and false otherwise.
  5. If shouldApplyBackpressure is true and stream@[[state]] is "writable", then
    1. Set stream@[[state]] to "waiting".
    2. Set stream@[[readyPromise]] to a new promise.
  6. If shouldApplyBackpressure is false and stream@[[state]] is "waiting", then
    1. Set stream@[[state]] to "writable".
    2. Resolve stream@[[readyPromise]] with undefined.
  7. Return undefined.

4.3.6. WritableStreamAdvanceQueue ( stream ) nothrow

  1. If stream@[[queue]] is empty, or stream@[[writing]] is true, return undefined.
  2. Let writeRecord be PeekQueueValue(stream@[[queue]]).
  3. If writeRecord is "close", then
    1. Assert: stream@[[state]] is "closing".
    2. Perform DequeueValue(stream@[[queue]]).
    3. Assert: stream@[[queue]] is now empty.
    4. Perform CloseWritableStream(stream).
    5. Return undefined.
  4. Set stream@[[writing]] to true.
  5. Let writeResult be PromiseInvokeOrNoop(stream@[[underlyingSink]], "write", «writeRecord.[[chunk]]»).
  6. Upon fulfillment of writeResult,
    1. If stream@[[state]] is "errored", return.
    2. Set stream@[[writing]] to false.
    3. Resolve writeRecord.[[promise]] with undefined.
    4. Perform DequeueValue(stream@[[queue]]).
    5. Perform SyncWritableStreamStateWithQueue(stream).
    6. Perform WritableStreamAdvanceQueue(stream).
  7. Upon rejection of writeResult with reason r, perform ErrorWritableStream(stream, r).
  8. Return undefined.

5. Transform Streams

Transform streams have been developed in the testable implementation, but not yet re-encoded in spec language. We are waiting to validate their design before doing so. In the meantime, see reference-implementation/lib/transform-stream.js.

6. Other Stream APIs and Operations

6.1. Class ByteLengthQueuingStrategy

A common queuing strategy when dealing with bytes is to wait until the accumulated byteLength of the incoming chunks reaches a specified high-water mark. As such, this is provided as a built-in queuing strategy that can be used when constructing streams.

When creating a readable stream or writable stream, you can supply a byte-length queuing strategy directly:
const stream = new ReadableStream(
  { ... },
  new ByteLengthQueuingStrategy({ highWaterMark: 16 * 1024 })
);

In this case, 16 KiB worth of chunks can be enqueued by the readable stream’s underlying source before the readable stream implementation starts sending backpressure signals to the underlying source.

const stream = new WritableStream(
  { ... },
  new ByteLengthQueuingStrategy({ highWaterMark: 32 * 1024 })
);

In this case, 32 KiB worth of chunks can be accumulated in the writable stream’s internal queue, waiting for previous writes to the underlying sink to finish, before the writable stream starts sending backpressure signals to any producers.

6.1.1. Class Definition

This section is non-normative.

If one were to write the ByteLengthQueuingStrategy class in something close to the syntax of [ECMASCRIPT], it would look like

class ByteLengthQueuingStrategy {
  constructor({ highWaterMark })
  size(chunk)
}

Each ByteLengthQueuingStrategy instance will additionally have an own data property highWaterMark set by its constructor.

6.1.2. new ByteLengthQueuingStrategy({ highWaterMark })

The constructor takes a nonnegative number for the high-water mark, and stores it on the object as a property.
  1. CreateDataProperty(this, "highWaterMark", highWaterMark).

6.1.3. Properties of the ByteLengthQueuingStrategy Prototype

6.1.3.1. size(chunk)
The size method returns the given chunk’s byteLength property. (If the chunk doesn’t have one, it will return undefined, causing the stream using this strategy to error.)

This method is intentionally generic; it does not require that its this value be a ByteLengthQueuingStrategy object.

  1. Return GetV(chunk, "byteLength").
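
For example (non-normative), because size is generic, it works on any object exposing a byteLength property, and can even be called with no particular this value:

const strategy = new ByteLengthQueuingStrategy({ highWaterMark: 16 * 1024 });

strategy.size(new Uint8Array(1024));  // 1024
strategy.size({ byteLength: 42 });    // 42 — any chunk with a byteLength works

// The method does not depend on its this value:
ByteLengthQueuingStrategy.prototype.size.call(undefined, new ArrayBuffer(8)); // 8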

6.2. Class CountQueuingStrategy

A common queuing strategy when dealing with streams of generic objects is to simply count the number of chunks that have been accumulated so far, waiting until this number reaches a specified high-water mark. As such, this strategy is also provided out of the box.

When creating a readable stream or writable stream, you can supply a count queuing strategy directly:
const stream = new ReadableStream(
  { ... },
  new CountQueuingStrategy({ highWaterMark: 10 })
);

In this case, 10 chunks (of any kind) can be enqueued by the readable stream’s underlying source before the readable stream implementation starts sending backpressure signals to the underlying source.

const stream = new WritableStream(
  { ... },
  new CountQueuingStrategy({ highWaterMark: 5 })
);

In this case, five chunks (of any kind) can be accumulated in the writable stream’s internal queue, waiting for previous writes to the underlying sink to finish, before the writable stream starts sending backpressure signals to any producers.

6.2.1. Class Definition

This section is non-normative.

If one were to write the CountQueuingStrategy class in something close to the syntax of [ECMASCRIPT], it would look like

class CountQueuingStrategy {
  constructor({ highWaterMark })
  size()
}

Each CountQueuingStrategy instance will additionally have an own data property highWaterMark set by its constructor.

6.2.2. new CountQueuingStrategy({ highWaterMark })

The constructor takes a nonnegative number for the high-water mark, and stores it on the object as a property.
  1. CreateDataProperty(this, "highWaterMark", highWaterMark).

6.2.3. Properties of the CountQueuingStrategy Prototype

6.2.3.1. size()
The size method returns one always, so that the total queue size is a count of the number of chunks in the queue.

This method is intentionally generic; it does not require that its this value be a CountQueuingStrategy object.

  1. Return 1.
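
As a non-normative illustration, passing a CountQueuingStrategy instance is roughly equivalent to supplying a hand-written strategy object (the class version simply places size() on the prototype):

const viaClass = new CountQueuingStrategy({ highWaterMark: 4 });

// Roughly equivalent hand-written strategy object:
const byHand = {
  highWaterMark: 4,
  size() {
    return 1;
  }
};

// Either can be supplied when constructing a stream, for example:
// new WritableStream(underlyingSink, viaClass);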

6.3. Queue-with-Sizes Operations

The streams in this specification use a "queue-with-sizes" data structure to store queued up values, along with their determined sizes. A queue-with-sizes is a List of Records with [[value]] and [[size]] fields (although in implementations it would of course be backed by a more efficient data structure).

A number of abstract operations are specified here to make working with queues-with-sizes more pleasant; they are used throughout the rest of this standard.

6.3.1. DequeueValue ( queue ) nothrow

  1. Assert: queue is not empty.
  2. Let pair be the first element of queue.
  3. Remove pair from queue, shifting all other elements downward (so that the second becomes the first, and so on).
  4. Return pair.[[value]].

6.3.2. EnqueueValueWithSize ( queue, value, size ) throws

  1. Let size be ToNumber(size).
  2. ReturnIfAbrupt(size).
  3. If size is NaN, +∞, or negative, throw a RangeError exception.
  4. Append Record{[[value]]: value, [[size]]: size} as the last element of queue.
  5. Return undefined.

6.3.3. GetTotalQueueSize ( queue ) nothrow

  1. Let totalSize be 0.
  2. Repeat for each Record{[[value]], [[size]]} pair that is an element of queue,
    1. Assert: pair.[[size]] is a finite, non-NaN number.
    2. Add pair.[[size]] to totalSize.
  3. Return totalSize.

6.3.4. PeekQueueValue ( queue ) nothrow

  1. Assert: queue is not empty.
  2. Let pair be the first element of queue.
  3. Return pair.[[value]].
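
A non-normative JavaScript sketch of these four operations, backed by an ordinary array of { value, size } records, might look like:

function enqueueValueWithSize(queue, value, size) {
  size = Number(size);
  if (Number.isNaN(size) || size === Infinity || size < 0) {
    throw new RangeError("size must be a finite, non-negative number");
  }
  queue.push({ value, size });
}

function dequeueValue(queue) {
  // Assumes the queue is not empty.
  return queue.shift().value;
}

function peekQueueValue(queue) {
  // Assumes the queue is not empty.
  return queue[0].value;
}

function getTotalQueueSize(queue) {
  return queue.reduce((total, pair) => total + pair.size, 0);
}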

6.4. Miscellaneous Operations

A few abstract operations are used in this specification for utility purposes. We define them here.

6.4.1. IsFiniteNonNegativeNumber ( v ) nothrow

  1. If v is NaN, return false.
  2. If v is +∞, return false.
  3. If v < 0, return false.
  4. Return true.

6.4.2. InvokeOrNoop ( O, P, args ) throws

InvokeOrNoop is a slight modification of the [ECMASCRIPT] Invoke abstract operation to return undefined when the method is not present.
  1. Assert: P is a valid property key.
  2. If args was not passed, let args be a new empty List.
  3. Let method be GetV(O, P).
  4. ReturnIfAbrupt(method).
  5. If method is undefined, return undefined.
  6. Return Call(method, O, args).

6.4.3. PromiseInvokeOrFallbackOrNoop ( O, P1, args1, P2, args2 ) nothrow

PromiseInvokeOrFallbackOrNoop is a specialized version of promise-calling that works on methods, calls a fallback method if the first method is not present, and returns a promise for undefined when neither method is present.
  1. Assert: P1 is a valid property key.
  2. Assert: P2 is a valid property key.
  3. Let method be GetV(O, P1).
  4. If method is an abrupt completion, return a new promise rejected with method.[[value]].
  5. Let method be method.[[value]].
  6. If method is undefined, return PromiseInvokeOrNoop(O, P2, args2).
  7. Let returnValue be Call(method, O, args1).
  8. If returnValue is an abrupt completion, return a new promise rejected with returnValue.[[value]].
  9. Otherwise, return a new promise resolved with returnValue.[[value]].

6.4.4. PromiseInvokeOrNoop ( O, P, args ) nothrow

PromiseInvokeOrNoop is a specialized version of promise-calling that both works on methods and returns a promise for undefined when the method is not present.
  1. Assert: P is a valid property key.
  2. If args was not passed, let args be a new empty List.
  3. Let method be GetV(O, P).
  4. If method is an abrupt completion, return a new promise rejected with method.[[value]].
  5. Let method be method.[[value]].
  6. If method is undefined, return a new promise resolved with undefined.
  7. Let returnValue be Call(method, O, args).
  8. If returnValue is an abrupt completion, return a new promise rejected with returnValue.[[value]].
  9. Otherwise, return a new promise resolved with returnValue.[[value]].
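
In JavaScript terms, a non-normative sketch of PromiseInvokeOrNoop would be:

function promiseInvokeOrNoop(O, P, args = []) {
  try {
    const method = O[P];
    if (method === undefined) {
      return Promise.resolve(undefined);
    }
    // Any synchronous exception becomes a rejection; any returned value
    // (including a promise) is adopted into the returned promise.
    return Promise.resolve(method.apply(O, args));
  } catch (e) {
    return Promise.reject(e);
  }
}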

6.4.5. ValidateAndNormalizeHighWaterMark ( highWaterMark ) throws

  1. Set highWaterMark to ToNumber(highWaterMark).
  2. ReturnIfAbrupt(highWaterMark).
  3. If highWaterMark is NaN, throw a TypeError exception.
  4. If highWaterMark < 0, throw a RangeError exception.
  5. Return highWaterMark.

6.4.6. ValidateAndNormalizeQueuingStrategy ( size, highWaterMark ) throws

  1. If size is not undefined and IsCallable(size) is false, throw a TypeError exception.
  2. Let highWaterMark be ValidateAndNormalizeHighWaterMark(highWaterMark).
  3. ReturnIfAbrupt(highWaterMark).
  4. Return Record{[[size]]: size, [[highWaterMark]]: highWaterMark}.
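
A non-normative sketch of these last two operations in JavaScript:

function validateAndNormalizeQueuingStrategy(size, highWaterMark) {
  if (size !== undefined && typeof size !== "function") {
    throw new TypeError("size must be undefined or callable");
  }

  highWaterMark = Number(highWaterMark);
  if (Number.isNaN(highWaterMark)) {
    throw new TypeError("highWaterMark must not be NaN");
  }
  if (highWaterMark < 0) {
    throw new RangeError("highWaterMark must be non-negative");
  }

  return { size, highWaterMark };
}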

7. Global Properties

The following constructors must be exposed on the global object as data properties of the same name: ReadableStream, WritableStream, ByteLengthQueuingStrategy, and CountQueuingStrategy.

The attributes of these properties must be { [[Writable]]: true, [[Enumerable]]: false, [[Configurable]]: true }.

The ReadableStreamDefaultReader and ReadableStreamBYOBReader classes are specifically not exposed, as while they do have a functioning constructor, instances should instead be created through the getReader() method of a ReadableStream instance. Similarly, the supporting classes ReadableStreamDefaultController, ReadableByteStreamController, and ReadableStreamBYOBRequest are not exposed, since they are not independently useful outside of the ReadableStream implementation.

8. Examples of Creating Streams

This section, and all its subsections, are non-normative.

The previous examples throughout the standard have focused on how to use streams. Here we show how to create a stream, using the ReadableStream or WritableStream constructors.

8.1. A readable stream with an underlying push source (no backpressure support)

The following function creates readable streams that wrap WebSocket instances [HTML], which are push sources that do not support backpressure signals. It illustrates how, when adapting a push source, usually most of the work happens in the start function.

function makeReadableWebSocketStream(url, protocols) {
  const ws = new WebSocket(url, protocols);
  ws.binaryType = "arraybuffer";

  return new ReadableStream({
    start(controller) {
      ws.onmessage = event => controller.enqueue(event.data);
      ws.onclose = () => controller.close();
      ws.onerror = () => controller.error(new Error("The WebSocket errored!"));
    },

    cancel() {
      ws.close();
    }
  });
}

We can then use this function to create readable streams for a web socket, and pipe that stream to an arbitrary writable stream:

const webSocketStream = makeReadableWebSocketStream("wss://example.com:443/", "protocol");

webSocketStream.pipeTo(writableStream)
  .then(() => console.log("All data successfully written!"))
  .catch(e => console.error("Something went wrong!", e));

8.2. A readable stream with an underlying push source and backpressure support

The following function returns readable streams that wrap "backpressure sockets," which are hypothetical objects that have the same API as web sockets, but also provide the ability to pause and resume the flow of data with their readStop and readStart methods. In doing so, this example shows how to apply backpressure to underlying sources that support it.

function makeReadableBackpressureSocketStream(host, port) {
  const socket = createBackpressureSocket(host, port);

  return new ReadableStream({
    start(controller) {
      socket.ondata = event => {
        controller.enqueue(event.data);

        if (controller.desiredSize <= 0) {
          // The internal queue is full, so propagate
          // the backpressure signal to the underlying source.
          socket.readStop();
        }
      };

      socket.onend = () => controller.close();
      socket.onerror = () => controller.error(new Error("The socket errored!"));
    },

    pull() {
      // This is called if the internal queue has been emptied, but the
      // stream’s consumer still wants more data. In that case, restart
      // the flow of data if we have previously paused it.
      socket.readStart();
    },

    cancel() {
      socket.close();
    }
  });
}

We can then use this function to create readable streams for such "backpressure sockets" in the same way we do for web sockets. This time, however, when we pipe to a destination that cannot accept data as fast as the socket is producing it, or if we leave the stream alone without reading from it for some time, a backpressure signal will be sent to the socket.
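
For example (non-normative), piping to a hypothetical slow writable stream will automatically pause and resume the socket as the internal queue fills and drains:

const socketStream = makeReadableBackpressureSocketStream("example.com", 1234);

// slowWritableStream is hypothetical: any writable stream that accepts data
// more slowly than the socket produces it.
socketStream.pipeTo(slowWritableStream)
  .then(() => console.log("All data successfully written!"))
  .catch(e => console.error("Something went wrong!", e));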

8.3. A readable byte stream with an underlying push source and backpressure support

The following function returns readable byte streams that allow efficient direct reading of TCP sockets, based on a hypothetical JavaScript translation of the POSIX socket API (the main innovation of which is translating select(2) into an EventTarget emitting a readable event). [DOM]

This setup allows zero-copy reading directly into developer-supplied buffers. Additionally, it ensures that when data is available from the socket but not yet requested by the developer, it is enqueued in the stream’s internal queue, to avoid overflowing the kernel-space queue. In this case, an additional copy will potentially be necessary when using a BYOB reader, to move the data from the stream’s internal queue to the developer-supplied buffer. If this occurs, backpressure will immediately be applied to the socket, by adjusting the TCP window size.

const DEFAULT_WINDOW_SIZE = 65536;

function makeReadableBackpressureByteSocketStream(host, port) {
  const socket = createHypotheticalSelect2Socket(host, port);

  return new ReadableStream({
    type: "bytes",

    start(controller) {
      socket.setTCPWindowSize(Math.max(0, controller.desiredSize));

      socket.onreadable = () => {
        // Since onreadable can fire even when there are no pending BYOB
        // requests, we need to handle both cases.
        if (controller.byobRequest) {
          const v = controller.byobRequest.view;
          const bytesRead = socket.readInto(v.buffer, v.byteOffset, v.byteLength);
          controller.byobRequest.respond(bytesRead);
        } else {
          const buffer = new ArrayBuffer(DEFAULT_WINDOW_SIZE);
          const bytesRead = socket.readInto(buffer, 0, DEFAULT_WINDOW_SIZE);
          controller.enqueue(new Uint8Array(buffer, 0, bytesRead));
        }

        // The internal queue size has changed, so propagate
        // the backpressure signal to the underlying source.
        socket.setTCPWindowSize(Math.max(0, controller.desiredSize));
      };

      socket.onend = () => controller.close();
      socket.onerror = () => controller.error(new Error("The socket errored!"));
    },

    pull(controller) {
      // This is called when the internal queue has been drained, and the
      // stream’s consumer can accept more data. Reflect the up-to-date
      // backpressure level by setting the TCP receive window of the socket
      // to desiredSize.
      socket.setTCPWindowSize(Math.max(0, controller.desiredSize));
    },

    cancel() {
      socket.close();
    }
  }, {
    highWaterMark: DEFAULT_WINDOW_SIZE
  });
}

ReadableStream instances returned from this function can now vend BYOB readers, with all of the aforementioned benefits and caveats.

8.4. A readable stream with an underlying pull source

The following function returns readable streams that wrap portions of the Node.js file system API (which themselves map fairly directly to C’s fopen, fread, and fclose trio). Files are a typical example of pull sources. Note how in contrast to the examples with push sources, most of the work here happens on-demand in the pull function, and not at startup time in the start function.

const fs = require("pr/fs"); // https://github.com/jden/pr
const CHUNK_SIZE = 1024;

function makeReadableFileStream(filename) {
  let fd;
  let position = 0;

  return new ReadableStream({
    start() {
      return fs.open(filename, "r").then(result => {
        fd = result;
      });
    },

    pull(controller) {
      const buffer = new ArrayBuffer(CHUNK_SIZE);

      return fs.read(fd, buffer, 0, CHUNK_SIZE, position).then(bytesRead => {
        if (bytesRead === 0) {
          return fs.close(fd).then(() => controller.close());
        } else {
          position += bytesRead;
          controller.enqueue(new Uint8Array(buffer, 0, bytesRead));
        }
      });
    },

    cancel() {
      return fs.close(fd);
    }
  });
}

We can then create and use readable streams for files just as we could before for sockets.
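
For instance (non-normative), a consumer could drain such a stream with a default reader, assuming the hypothetical file path shown:

const fileStream = makeReadableFileStream("/example/path/on/fs.txt");
const reader = fileStream.getReader();

// Pull chunks one at a time until the stream closes.
function pump() {
  return reader.read().then(({ value, done }) => {
    if (done) {
      console.log("End of file reached");
      return;
    }
    console.log(`Read ${value.byteLength} bytes`);
    return pump();
  });
}

pump().catch(e => console.error("Something went wrong!", e));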

8.5. A readable byte stream with an underlying pull source

The following function returns readable byte streams that allow efficient zero-copy reading of files, again using the Node.js file system API. Instead of using a predetermined chunk size of 1024, it attempts to fill the developer-supplied buffer, allowing full control.

const fs = require("pr/fs"); // https://github.com/jden/pr
const DEFAULT_CHUNK_SIZE = 1024;

function makeReadableByteFileStream(filename) {
  let fd;
  let position = 0;

  return new ReadableStream({
    type: "bytes",

    start() {
      return fs.open(filename, "r").then(result => {
        fd = result;
      });
    },

    pull(controller) {
      // Even when the consumer is using the default reader, the auto-allocation
      // feature allocates a buffer and passes it to us via byobRequest.
      const v = controller.byobRequest.view;

      return fs.read(fd, v.buffer, v.byteOffset, v.byteLength, position).then(bytesRead => {
        if (bytesRead === 0) {
          return fs.close(fd).then(() => controller.close());
        } else {
          position += bytesRead;
          controller.byobRequest.respond(bytesRead);
        }
      });
    },

    cancel() {
      return fs.close(fd);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE
  });
}

With this in hand, we can create and use BYOB readers for the returned ReadableStream. But we can also create default readers, using them in the same simple and generic manner as usual. The adaptation between the low-level byte tracking of the underlying byte source shown here, and the higher-level chunk-based consumption of a default reader, is all taken care of automatically by the streams implementation. The auto-allocation feature, via the autoAllocateChunkSize option, even allows us to write less code, compared to the manual branching in §8.3 A readable byte stream with an underlying push source and backpressure support.
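
As a non-normative sketch, assuming a BYOB reader is acquired via getReader({ mode: "byob" }), a consumer can supply its own buffer for the stream to fill:

const fileByteStream = makeReadableByteFileStream("/example/path/on/fs.txt");
const reader = fileByteStream.getReader({ mode: "byob" });

// Supply our own 1 KiB buffer; the underlying source reads directly into it.
reader.read(new Uint8Array(1024)).then(({ value, done }) => {
  if (done) {
    console.log("End of file reached");
  } else {
    console.log(`Read ${value.byteLength} bytes into the supplied buffer`);
  }
});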

8.6. A writable stream with no backpressure or success signals

The following function returns a writable stream that wraps a WebSocket [HTML]. Web sockets do not provide any way to tell when a given chunk of data has been successfully sent, so this writable stream has no ability to communicate backpressure signals or write success/failure to its producers. That is, it will always be in the "writable" state, and the promise returned by its write() method will always fulfill immediately.

function makeWritableWebSocketStream(url, protocols) {
  const ws = new WebSocket(url, protocols);

  return new WritableStream({
    start(error) {
      ws.onerror = error;
      return new Promise(resolve => ws.onopen = resolve);
    },

    write(chunk) {
      ws.send(chunk);
      // Return immediately, since the web socket gives us no way to tell
      // when the write completes.
    },

    close() {
      return new Promise((resolve, reject) => {
        ws.onclose = resolve;
        ws.close();
      });
    }
  });
}

We can then use this function to create writable streams for a web socket, and pipe an arbitrary readable stream to it:

const webSocketStream = makeWritableWebSocketStream("wss://example.com:443/", "protocol");

readableStream.pipeTo(webSocketStream)
  .then(() => console.log("All data successfully written!"))
  .catch(e => console.error("Something went wrong!", e));

8.7. A writable stream with backpressure and success signals

The following function returns writable streams that wrap portions of the io.js file system API (which themselves map fairly directly to C’s fopen, fwrite, and fclose trio). Since the API we are wrapping provides a way to tell when a given write succeeds, this stream will be able to communicate backpressure signals as well as whether an individual write succeeded or failed.

const fs = require("pr/fs"); // https://github.com/jden/pr

function makeWritableFileStream(filename) {
  let fd;

  return new WritableStream({
    start() {
      return fs.open(filename, "w").then(result => {
        fd = result;
      });
    },

    write(chunk) {
      return fs.write(fd, chunk, 0, chunk.length);
    },

    close() {
      return fs.close(fd);
    }
  });
}

We can then use this function to create a writable stream for a file, and write individual chunks of data to it:

const fileStream = makeWritableFileStream("/example/path/on/fs.txt");

fileStream.write("To stream, or not to stream\n");
fileStream.write("That is the question\n");

fileStream.close()
  .then(() => console.log("chunks written and stream closed successfully!"))
  .catch(e => console.error(e));

Note that if a particular call to fs.write takes a longer time, the returned promise will fulfill later. In the meantime, additional writes can be queued up, which are stored in the stream’s internal queue. The accumulation of chunks in this queue can move the stream into a "waiting" state, which is a signal to producers that they should back off and stop writing if possible.

The way in which the writable stream queues up writes is especially important in this case, since as stated in the documentation for fs.write, "it is unsafe to use fs.write multiple times on the same file without waiting for the [promise]." But we don’t have to worry about that when writing the makeWritableFileStream function, since the stream implementation guarantees that the underlying sink’s write method will not be called until any promises returned by previous calls have fulfilled!

8.8. A { readable, writable } stream pair wrapping the same underlying resource

The following function returns an object of the form { readable, writable }, with the readable property containing a readable stream and the writable property containing a writable stream, where both streams wrap the same underlying web socket resource. In essence, this combines §8.1 A readable stream with an underlying push source (no backpressure support) and §8.6 A writable stream with no backpressure or success signals.

While doing so, it illustrates how you can use JavaScript classes to create reusable underlying sink and underlying source abstractions.

function streamifyWebSocket(url, protocols) {
  const ws = new WebSocket(url, protocols);
  ws.binaryType = "arraybuffer";

  return {
    readable: new ReadableStream(new WebSocketSource(ws)),
    writable: new WritableStream(new WebSocketSink(ws))
  };
}

class WebSocketSource {
  constructor(ws) {
    this._ws = ws;
  }

  start(controller) {
    this._ws.onmessage = event => controller.enqueue(event.data);
    this._ws.onclose = () => controller.close();

    this._ws.addEventListener("error", () => {
      controller.error(new Error("The WebSocket errored!"));
    });
  }

  cancel() {
    this._ws.close();
  }
}

class WebSocketSink {
  constructor(ws) {
    this._ws = ws;
  }

  start(error) {
    this._ws.addEventListener("error", () => {
      error(new Error("The WebSocket errored!"));
    });

    return new Promise(resolve => this._ws.onopen = resolve);
  }

  write(chunk) {
    this._ws.send(chunk);
  }

  close() {
    return new Promise((resolve, reject) => {
      this._ws.onclose = resolve;
      this._ws.close();
    });
  }
}

We can then use the objects created by this function to communicate with a remote web socket, using the standard stream APIs:

const streamyWS = streamifyWebSocket("wss://example.com:443/", "protocol");

streamyWS.writable.write("Hello");
streamyWS.writable.write("web socket!");

streamyWS.readable.getReader().read().then(({ value, done }) => {
  console.log("The web socket says: ", value);
});

Note how in this setup canceling the readable side will implicitly close the writable side, and similarly, closing or aborting the writable side will implicitly close the readable side.

streamyWS.readable.cancel().then(() => {
  assert(streamyWS.writable.state === "closed");
});

Conventions

This specification uses algorithm conventions very similar to those of [ECMASCRIPT]. However, it deviates in the following ways, mostly for brevity. It is hoped (and vaguely planned) that eventually the conventions of ECMAScript itself will evolve in these ways.

Acknowledgments

The editor would like to thank Anne van Kesteren, Ben Kelly, Brian di Palma, Calvin Metcalf, Dominic Tarr, Ed Hager, Forbes Lindesay, 贺师俊 (hax), Jake Archibald, Jens Nockert, Mangala Sadhu Sangeet Singh Khalsa, Marcos Caceres, Michael Mior, Mihai Potra, Stephen Sugden, Tab Atkins, Thorsten Lorenz, Tim Caswell, Trevor Norris, tzik, Youenn Fablet, and Xabier Rodríguez for their contributions to this specification.

Special thanks to: Bert Belder for bringing up implementation concerns that led to crucial API changes; Forrest Norvell for his work on the initial reference implementation; Gorgi Kosev for his breakthrough idea of separating piping into two methods, thus resolving a major sticking point; Isaac Schlueter for his pioneering work on JavaScript streams in Node.js; Jake Verbaten for his early involvement and support; Janessa Det for the logo; Will Chan for his help ensuring that the API allows high-performance network streaming; and 平野裕 (Yutaka Hirano) for his help with the readable stream reader design.

This standard is written by Domenic Denicola (Google, [email protected]) with substantial help from 吉野剛史 (Takeshi Yoshino, Google, [email protected]).

Per CC0, to the extent possible under law, the editor has waived all copyright and related or neighboring rights to this work.

References

Normative References

[ECMASCRIPT]
ECMAScript Language Specification. URL: https://tc39.github.io/ecma262/
[HTML]
Ian Hickson. HTML Standard. Living Standard. URL: https://html.spec.whatwg.org/multipage/

Informative References

[DOM]
Anne van Kesteren. DOM Standard. Living Standard. URL: https://dom.spec.whatwg.org/
[FETCH]
Anne van Kesteren. Fetch Standard. Living Standard. URL: https://fetch.spec.whatwg.org/
[SERVICE-WORKERS]
Alex Russell; Jungkee Song; Jake Archibald. Service Workers. 25 June 2015. WD. URL: https://slightlyoff.github.io/ServiceWorker/spec/service_worker/