For AI agents: a documentation index is available at the root level at /llms.txt and /llms-full.txt. Append /llms.txt to any URL for a page-level index, or .md for the markdown version of any page.
Sign inTry it free
DocsGuidesSDKsIntegrationsAPI docsTutorialsFlagship blog
DocsGuidesSDKsIntegrationsAPI docsTutorialsFlagship blog
  • Flagship blog
    • 52 Blog Posts, Claude, 3 Prompts, Under an Hour
    • Shipping from Oakland: An Observability Hackathon Recap
    • Day 12 | New Year, New Observability
    • Day 11 | What engineering teams really want from Observability
    • Day 10 | Why observability and feature flags go together like milk and cookies
    • Day 9 | The Three Ghosts Haunting Your AI This Holiday Season
    • Day 8 | Observable Multi-Modal Agentic Systems
    • Day 7 | SLOs that actually drive decisions
    • Day 6 | Stop cardinality from stealing your cloud budget
    • Day 5 | Using a Popular Tidying Method to Consolidate Your Observability Stack
    • Day 4 | Tracing the impact of feature flags in your Node.js app
    • Day 3 | Zero-Config Observability with OpenTelemetry
    • Day 2 | Why AI agents need three layers of observability
    • Day 1 | Observability Under the Tree: What Changed in 2025
    • 5 takeaways from my first PyCon JP conference
    • Dungeons & Downtimes: XP gained from our adventure
    • Reverse Proxy for custom domains
    • Adventures in dogfooding: Guarded Releases
    • A quick tool for npm package scanning
    • My DEF CON 33 experience
    • Make every launch a big deal
    • Fun with JS streams
    • Moonshots XXII: Hack to the Future recap
    • A tale of three rate limiters
    • My good friend Claude
    • My approach to React app architecture in 2025
    • Data isolation with ClickHouse row policies
    • Ingest and Visualization for OpenTelemetry Metrics
    • Alert Evaluations: Incremental Merges in ClickHouse
    • Optimizing ClickHouse: The Tactics That Worked for Us
    • Migrating from OpenSearch to ClickHouse
    • Revamping Privacy Mode: A Better Way to Obfuscate Sensitive Data
    • An open-source session replay benchmark
    • LLM-based Grouping of Errors
    • Building GitHub Enhanced Stacktraces
    • Vercel Edge Runtime Support
    • Finding Interesting Sessions with Markov Chains
    • Building Logging Integrations at LaunchDarkly
    • The Network Request Details Panel
    • Using Github as a Headless CMS
    • Your Source Maps Should Be Public
    • Supporting Outside Contributions at LaunchDarkly
    • Managing our design tokens at LaunchDarkly
    • Our Commitment to OpenTelemetry
    • The 5 Best Logging Libraries for Ruby
    • InfluxDB: Visualizing Millions of Customers' Metrics using a Time Series Database
    • 8 Tips to Help You Maximize Chrome DevTools
    • The Debugging Process and Techniques for Web Applications (Part 2/2)
    • 5 Best Node.js Logging Libraries
    • What are rage clicks and how to detect them
    • 5 Best Practices for Maintaining a Clean ReactJS App
    • Is Kafka the Key? The Evolution of LaunchDarkly's Ingest
    • What Is Full Stack Monitoring and How Does It Work?
    • The beauty of contact-first API design
    • What is Frontend Monitoring and What Tools Help You Do It?
    • 5 strategies to monitor the health of your web application
    • Configuring OpenSearch for a Write-Heavy Workload
    • Maximizing Our Machines: Worker Pools At LaunchDarkly
Sign inTry it free
LogoLogo
Flagship blog

Fun with JS streams

Was this page helpful?
Previous

Moonshots XXII: Hack to the Future recap

Next
Built with

Published August 28th, 2025

Portrait of Chris Tarquini.

by Chris Tarquini, LaunchDarkly Staff Strategic Solutions Architect

In 2021, modern browsers and Node.js (v17+) adopted the WHATWG Streams standard, unlocking a powerful new way to handle data. Streams enable structured, incremental processing of data. Similar to Unix pipelines but built directly into the web platform and runtime. Streams come with a few key benefits:

  • Pipelines – Compose data transformations by chaining streams, just like command-line tools.

  • Backpressure-aware Queues – Automatically throttle producers and consumers to avoid overloading memory or CPU.

  • Async Iteration – Treat streams like async iterables, making integration with modern for await…of syntax seamless and intuitive.

Streams are usually thought of in the context of I/O. Here’s how an SSE client might be implemented with streams:

1const resp = await fetch("https://stream.example.com/sse")
2const sseEventStream = resp.body.pipeThrough(new SSEParserStream())
3// instead of sseEventStream.getReader().read(), we can use async iteration
4for await (const {event, data} of sseEventStream) {
5 // do something with each event
6}

Another use-case is to turn events into streams. I like this pattern because:

  • You can enforce FIFO ordering ensuring each event is handled to completion before handling the next event.

  • For async await, syntax looks nicer to me.

  • It’s simple to compose multiple streams to add features like batching, deduplication, and logging in a re-usable and non-intrusive way.

  • Easy to implement queuing and buffering strategies without messing with your business logic.

Here’s a simple example of creating a flag update stream using the LaunchDarkly Node.js server SDK:

1export class FlagUpdateStream extends ReadableStream<string> {
2 constructor(ldClient: LDClient) {
3 super({
4 start: (controller: ReadableStreamDefaultController<string>) => {
5 function onUpdate({key}: {key: string}) {
6 controller.enqueue(key)
7 }
8 ldClient.on('failed', (error) => {
9 controller.error(error)
10 })
11 ldClient.on('update', onUpdate);
12 }
13 });
14 }
15}

Usage:

1const flagUpdateStream = new FlagUpdateStream(ldClient)
2for await(const flagKey of updateStream) {
3 // do stuff
4}

This may seem somewhat trivial, and on the surface it is. But let’s take a look at how this can look with a full pipeline:

1// stream of flag updates
2const flagUpdateStream = new FlagUpdateStream(ldClient)
3// forwards only chunks that match the given filter function
4const releaseFlagFilter = new FilterStream<string>(v => v.startsWith('release-'))
5// input = flag keys, output = results of evaluating that flag with the given context
6const evaluatorStream = new FlagEvaluatorStream(ldClient, context)
7// input = single items, output = arrays of items
8const batchStream = new BatchStream({capacity: 10, flushIntervalSecs: 5})
9// writes to a given SNS topic, sends arrays as batch
10const snsStream = new SNSStream(snsClient, "arn:bla:bla:bla")
11// writes JSON-ified input to stdout
12const consoleStream = new ConsoleStream()
13
14// build the pipeline
15const evaluatorPipeline = flagUpdateStream
16 .pipeThrough(releaseFlagFilter)
17 .pipeThrough(evaluatorStream)
18
19// tee lets us duplicate a stream, let's use that to log to console and to sns
20const [log, output] = evaluatorPipeline.tee()
21const logPipeline = log.pipeTo(consoleStream)
22// SNS messages will be batched
23const snsPipeline = output.pipeThrough(batchStream).pipeTo(snsStream)
24// wait for the pipelines to end
25await Promise.all([logPipeline, snsPipeline])

Streams give us nice lego blocks that we can use to compose different solutions. Here we can easily switch out our batching, logging and destination strategies.

This general pattern was used to create a proof of concept that subscribes to updates and evaluates flags for all of their site contexts and then publishes the results to Amazon SNS. Workers for their various applications will consume these queues and write the flags to their respective databases.

Now, a developer could leverage LaunchDarkly’s attribute based targeting and have the results be materialized in the database their application is already using for flags. There are obviously some caveats here, however, this system allows them to start using LaunchDarkly with no code changes and allows them to access flags from stored procedures critical to their business.

Streams make this implementation highly adaptable, letting us swap out different parts of the pipeline with ease. If you’re familiar with RxJS, these patterns may seem very familiar to you!

Anyway that’s all I have on streams. I like ‘em. Especially when combined with async iteration. Also, pro tip: you can turn any generator into a stream like this:

1// async not necessary here but you can make it async or sync
2async function* series(start, stop) {
3 for(let i = start; i < stop; i++) yield i
4}
5const stream = ReadableStream.from(series(0,10))
(ReadableStream.from(iteratable) is an experimental API so your mileage may vary).