Fun with JS streams

Published August 28th, 2025

Portrait of Chris Tarquini.

by Chris Tarquini, LaunchDarkly Staff Strategic Solutions Architect

Modern browsers and Node.js (v17+, released in 2021) support the WHATWG Streams standard, unlocking a powerful way to handle data. Streams enable structured, incremental processing of data, similar to Unix pipelines but built directly into the web platform and runtime. Streams come with a few key benefits:

  • Pipelines – Compose data transformations by chaining streams, just like command-line tools.

  • Backpressure-aware Queues – Automatically throttle producers and consumers to avoid overloading memory or CPU.

  • Async Iteration – Treat streams like async iterables, making integration with modern for await…of syntax seamless and intuitive.

Streams are usually thought of in the context of I/O. Here’s how an SSE (server-sent events) client might be implemented with streams:

const resp = await fetch("https://stream.example.com/sse")
const sseEventStream = resp.body.pipeThrough(new SSEParserStream())
// instead of sseEventStream.getReader().read(), we can use async iteration
for await (const {event, data} of sseEventStream) {
  // do something with each event
}
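The `SSEParserStream` above is left as an exercise by the snippet, but here’s a minimal sketch of what it might look like as a `TransformStream`: it decodes incoming bytes, buffers partial lines, and emits an `{event, data}` object for each blank-line-terminated SSE message. (This is a simplified parser; a production one would also handle `id:`, `retry:`, comments, and `\r\n` line endings.)

```typescript
class SSEParserStream extends TransformStream<Uint8Array, { event: string; data: string }> {
  constructor() {
    const decoder = new TextDecoder()
    let buffer = ''
    super({
      transform(chunk, controller) {
        buffer += decoder.decode(chunk, { stream: true })
        // SSE messages are separated by a blank line
        let boundary: number
        while ((boundary = buffer.indexOf('\n\n')) !== -1) {
          const rawMessage = buffer.slice(0, boundary)
          buffer = buffer.slice(boundary + 2)
          let event = 'message'
          const data: string[] = []
          for (const line of rawMessage.split('\n')) {
            if (line.startsWith('event:')) event = line.slice('event:'.length).trim()
            else if (line.startsWith('data:')) data.push(line.slice('data:'.length).trimStart())
          }
          if (data.length) controller.enqueue({ event, data: data.join('\n') })
        }
      },
    })
  }
}
```

Because `TransformStream` participates in backpressure automatically, this parser won’t read from the network faster than your `for await` loop consumes events.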

Another use case is turning event emitters into streams. I like this pattern because:

  • You can enforce FIFO ordering, ensuring each event is handled to completion before the next one begins.

  • The async/await syntax reads more cleanly (to me, at least).

  • It’s simple to compose multiple streams to add features like batching, deduplication, and logging in a reusable, non-intrusive way.

  • Queuing and buffering strategies are easy to implement without touching your business logic.

Here’s a simple example of creating a flag update stream using the LaunchDarkly Node.js server SDK:

export class FlagUpdateStream extends ReadableStream<string> {
  constructor(ldClient: LDClient) {
    super({
      start: (controller: ReadableStreamDefaultController<string>) => {
        function onUpdate({key}: {key: string}) {
          controller.enqueue(key)
        }
        ldClient.on('failed', (error) => {
          controller.error(error)
        })
        ldClient.on('update', onUpdate)
      }
    })
  }
}

Usage:

const flagUpdateStream = new FlagUpdateStream(ldClient)
for await (const flagKey of flagUpdateStream) {
  // do stuff
}

This may seem somewhat trivial, and on the surface it is. But let’s see what a full pipeline can look like:

// stream of flag updates
const flagUpdateStream = new FlagUpdateStream(ldClient)
// forwards only chunks that match the given filter function
const releaseFlagFilter = new FilterStream<string>(v => v.startsWith('release-'))
// input = flag keys, output = results of evaluating that flag with the given context
const evaluatorStream = new FlagEvaluatorStream(ldClient, context)
// input = single items, output = arrays of items
const batchStream = new BatchStream({capacity: 10, flushIntervalSecs: 5})
// writes to a given SNS topic, sends arrays as batch
const snsStream = new SNSStream(snsClient, "arn:bla:bla:bla")
// writes JSON-ified input to stdout
const consoleStream = new ConsoleStream()

// build the pipeline
const evaluatorPipeline = flagUpdateStream
  .pipeThrough(releaseFlagFilter)
  .pipeThrough(evaluatorStream)

// tee lets us duplicate a stream; let's use it to log to the console and to SNS
const [log, output] = evaluatorPipeline.tee()
const logPipeline = log.pipeTo(consoleStream)
// SNS messages will be batched
const snsPipeline = output.pipeThrough(batchStream).pipeTo(snsStream)
// wait for the pipelines to end
await Promise.all([logPipeline, snsPipeline])

Streams give us Lego-like building blocks for composing different solutions. Here we can easily swap out our batching, logging, and destination strategies.
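To make the pipeline above concrete, here are minimal sketches of two of those building blocks. `FilterStream` and `BatchStream` are hypothetical names from the pipeline, not library APIs; both are thin wrappers over `TransformStream`. (The timed-flush path in `BatchStream` is a simplified sketch; a production version would also guard against enqueueing after the stream errors.)

```typescript
// Forwards only chunks that satisfy the given predicate.
class FilterStream<T> extends TransformStream<T, T> {
  constructor(predicate: (value: T) => boolean) {
    super({
      transform(chunk, controller) {
        if (predicate(chunk)) controller.enqueue(chunk)
      },
    })
  }
}

// Groups single items into arrays, emitting when the batch is full,
// when the flush interval elapses, or when the input stream closes.
class BatchStream<T> extends TransformStream<T, T[]> {
  constructor({capacity, flushIntervalSecs}: {capacity: number; flushIntervalSecs?: number}) {
    let batch: T[] = []
    let timer: ReturnType<typeof setTimeout> | undefined
    const clearTimer = () => {
      if (timer) {
        clearTimeout(timer)
        timer = undefined
      }
    }
    super({
      transform(chunk, controller) {
        batch.push(chunk)
        if (batch.length >= capacity) {
          controller.enqueue(batch)
          batch = []
          clearTimer()
        } else if (flushIntervalSecs && !timer) {
          timer = setTimeout(() => {
            timer = undefined
            if (batch.length) {
              controller.enqueue(batch)
              batch = []
            }
          }, flushIntervalSecs * 1000)
        }
      },
      flush(controller) {
        // input closed: emit whatever is left
        clearTimer()
        if (batch.length) controller.enqueue(batch)
      },
    })
  }
}
```

Each stage knows nothing about the others, which is exactly what makes them easy to rearrange or replace.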

This general pattern was used to create a proof of concept for a customer: it subscribes to flag updates, evaluates flags for all of the customer’s site contexts, and then publishes the results to Amazon SNS. Workers for their various applications consume these queues and write the flags to their respective databases.

Now, a developer could leverage LaunchDarkly’s attribute based targeting and have the results be materialized in the database their application is already using for flags. There are obviously some caveats here, however, this system allows them to start using LaunchDarkly with no code changes and allows them to access flags from stored procedures critical to their business.

Streams make this implementation highly adaptable, letting us swap out different parts of the pipeline with ease. If you’re familiar with RxJS, these patterns may seem very familiar to you!

Anyway, that’s all I have on streams. I like ‘em, especially when combined with async iteration. Also, pro tip: you can turn any generator into a stream like this:

// async isn't necessary here; both sync and async generators work
async function* series(start, stop) {
  for (let i = start; i < stop; i++) yield i
}
const stream = ReadableStream.from(series(0, 10))
(ReadableStream.from(iterable) is an experimental API, so your mileage may vary.)
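If your runtime doesn’t have ReadableStream.from yet, a small pull-based wrapper gives you the same behavior. This is a sketch (streamFromIterable is my own name, not a standard API); it works for both sync and async iterables, and because it only calls next() from pull, it respects backpressure: the generator is only advanced when the consumer asks for more.

```typescript
function streamFromIterable<T>(iterable: AsyncIterable<T> | Iterable<T>): ReadableStream<T> {
  // prefer the async iterator if the source has one, else fall back to sync
  const iterator =
    (iterable as AsyncIterable<T>)[Symbol.asyncIterator]?.() ??
    (iterable as Iterable<T>)[Symbol.iterator]()
  return new ReadableStream<T>({
    async pull(controller) {
      const {value, done} = await iterator.next()
      if (done) controller.close()
      else controller.enqueue(value)
    },
    cancel(reason) {
      // let the generator run its finally blocks if the consumer bails early
      iterator.return?.(reason)
    },
  })
}
```

Usage mirrors the ReadableStream.from example above: `streamFromIterable(series(0, 10))`.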