Evnty
Home
GitHub
Home
GitHub
  • Evnty

Evnty

Coverage StatusGithub Build StatusNPM versionDownloadsSnyk

Async-first, reactive event handling library for complex event flows with three powerful primitives: Event (multi-listener broadcast), Signal (promise-like coordination), and Sequence (async queue). Built for both browser and Node.js with full TypeScript support.



Table of Contents

  • Core Concepts
  • Motivation
  • Features
  • Platform Support
  • Installing
  • API
    • ConsumerHandle
      • ConsumerHandle.cursor: number
      • ConsumerHandle[Symbol.dispose](): void
    • Broadcast
      • Broadcast.sink: Fn
      • Broadcast.handleEvent(event: T): void
      • Broadcast.size: number
      • Broadcast.emit(value: T): boolean
      • Broadcast.receive(): Promise
      • Broadcast.then(onfulfilled?: Fn | null, onrejected?: Fn | null): Promise
      • Broadcast.catch(onrejected?: Fn | null): Promise
      • Broadcast.finally(onfinally?: Action | null): Promise
      • Broadcast.join(): ConsumerHandle
      • Broadcast.getCursor(handle: ConsumerHandle): number
      • Broadcast.leave(handle: ConsumerHandle): void
      • Broadcast.consume(handle: ConsumerHandle): T
      • Broadcast.tryConsume(handle: ConsumerHandle): IteratorResult
      • Broadcast.readable(handle: ConsumerHandle): boolean
      • Broadcast[Symbol.asyncIterator](): AsyncIterator
      • Broadcast.dispose(): void
      • Broadcast[Symbol.dispose](): void
    • DispatchResult
      • DispatchResult.then(onfulfilled?: Fn | null, onrejected?: Fn | null): PromiseLike
      • DispatchResult.all(): T[] | Promise
      • DispatchResult.settled(): PromiseSettledResult[] | Promise
    • Event
      • Event.sink: Fn
      • Event.handleEvent(event: T): void
      • Event.size: number
      • Event.emit(value: T): DispatchResult
      • Event.lacks(listener: Listener): boolean
      • Event.has(listener: Listener): boolean
      • Event.off(listener: Listener): this
      • Event.on(listener: Listener): Unsubscribe
      • Event.once(listener: Listener): Unsubscribe
      • Event.clear(): this
      • Event.receive(): Promise
      • Event.then(onfulfilled?: Fn | null, onrejected?: Fn | null): Promise
      • Event.catch(onrejected?: Fn | null): Promise
      • Event.finally(onfinally?: Action | null): Promise
      • Event.settle(): Promise
      • Event[Symbol.asyncIterator](): AsyncIterator
      • Event.dispose(): void
      • Event[Symbol.dispose](): void
    • merge(...events: Events): Event
    • createInterval(interval: number): Event
    • createEvent(): Event
    • AsyncIteratorObject
      • AsyncIteratorObject.from(iterable: Iterable): AsyncIteratorObject
      • AsyncIteratorObject.merge(...iterables: AsyncIterable[]): AsyncIteratorObject
      • AsyncIteratorObject.pipe(generatorFactory, signal?: AbortSignal): AsyncIteratorObject
      • AsyncIteratorObject.awaited(): AsyncIteratorObject
      • AsyncIteratorObject.map(callbackfn): AsyncIteratorObject
      • AsyncIteratorObject.filter(predicate): AsyncIteratorObject
      • AsyncIteratorObject.filter(predicate): AsyncIteratorObject
      • AsyncIteratorObject.filter(predicate): AsyncIteratorObject
      • AsyncIteratorObject.filterMap(callbackfn): AsyncIteratorObject
      • AsyncIteratorObject.inspect(callbackfn): AsyncIteratorObject
      • AsyncIteratorObject.enumerate(start: number = 0): AsyncIteratorObject
      • AsyncIteratorObject.take(limit: number): AsyncIteratorObject
      • AsyncIteratorObject.takeWhile(predicate): AsyncIteratorObject
      • AsyncIteratorObject.takeWhile(predicate): AsyncIteratorObject
      • AsyncIteratorObject.takeWhile(predicate): AsyncIteratorObject
      • AsyncIteratorObject.drop(count: number): AsyncIteratorObject
      • AsyncIteratorObject.dropWhile(predicate): AsyncIteratorObject
      • AsyncIteratorObject.flatMap(callback): AsyncIteratorObject
      • AsyncIteratorObject.reduce(callbackfn): AsyncIteratorObject
      • AsyncIteratorObject.reduce(callbackfn, initialValue: R): AsyncIteratorObject
      • AsyncIteratorObject.reduce(callbackfn, ...args: unknown[]): AsyncIteratorObject
      • AsyncIteratorObject.expand(callbackfn): AsyncIteratorObject
      • AsyncIteratorObject[Symbol.asyncIterator](): AsyncIterator
    • Sequence
      • Sequence.merge(target: Sequence, ...sequences: Sequence[]): void
      • Sequence.size: number
      • Sequence.reserve(capacity: number): Promise
      • Sequence.emit(value: T): boolean
      • Sequence.receive(): Promise
      • Sequence.dispose(): void
    • Signal
      • Signal.merge(target: Signal, ...signals: Signal[]): void
      • Signal.emit(value: T): boolean
      • Signal.receive(): Promise
      • Signal.dispose(): void
    • abortableIterable(iterable: AsyncIterable, signal: AbortSignal): AsyncIterable
    • iterate(startOrCount?: number, countWhenTwoArgs?: number, step: number = 1): Iterable
    • toAsyncIterable(iterable: Iterable): AsyncIterable
  • Examples
  • License

Core Concepts

Evnty provides three complementary async primitives, each designed for specific patterns:

🔊 Event - Multi-Listener Broadcasting

Events allow multiple listeners to react to values. All registered listeners are called for each emission.

const clickEvent = createEvent<{ x: number, y: number }>();

// Multiple listeners can subscribe
clickEvent.on(({ x, y }) => console.log(`Click at ${x},${y}`));
clickEvent.on(({ x, y }) => updateUI(x, y));

// All listeners receive the value
clickEvent.emit({ x: 100, y: 200 });

Use Event when:

  • Multiple components need to react to the same occurrence
  • You need pub/sub or observer pattern
  • Listeners should persist across multiple emissions

📡 Signal - Promise-Based Coordination

Signals are for coordinating async operations. When a value is sent, ALL waiting consumers receive it (broadcast).

const signal = new Signal<string>();

// Multiple consumers can wait
const promise1 = signal.receive();
const promise2 = signal.receive();

// Send value - all waiting consumers receive it
signal.emit('data');
const [result1, result2] = await Promise.all([promise1, promise2]);
// result1 === 'data' && result2 === 'data'

Use Signal when:

  • You need one-time notifications
  • Multiple async operations need the same trigger
  • Implementing async coordination patterns

📦 Sequence - Async Queue (FIFO)

Sequences are FIFO queues for single-consumer scenarios. Values are consumed in order, with backpressure support.

const taskQueue = new Sequence<Task>();

// Producer adds tasks
taskQueue.emit(task1);
taskQueue.emit(task2);
taskQueue.emit(task3);

// Single consumer processes in order
for await (const task of taskQueue) {
  await processTask(task); // task1, then task2, then task3
}

Use Sequence when:

  • You need ordered processing (FIFO)
  • Only one consumer should handle each value
  • You want backpressure control with reserve()

Key Differences

EventSignalSequence
ConsumersMultiple persistent listenersMultiple one-time receiversSingle consumer
DeliveryAll listeners calledAll waiting get same valueEach value consumed once
PatternPub/SubBroadcast coordinationQueue/Stream
PersistenceListeners stay registeredResolves once per receive()Values queued until consumed

Motivation

Traditional event handling in JavaScript/TypeScript has limitations:

  • String-based event names lack type safety
  • No built-in async coordination primitives
  • Missing functional transformations for event streams
  • Complex patterns require extensive boilerplate

Evnty solves these problems by providing:

  • Type-safe events with full TypeScript inference
  • Three specialized primitives for different async patterns
  • Async iterator transformations via AsyncIteratorObject (map, filter, reduce, expand, etc.)
  • Composable abstractions that work together seamlessly

Features

  • Async-First Design: Built from the ground up for asynchronous event handling with full Promise support
  • Iterator Transformations: AsyncIteratorObject provides map, filter, reduce, take, drop, flatMap, and expand operators
  • Type-Safe: Full TypeScript support with strong typing and inference throughout the event pipeline
  • Async Iteration: Events, Signals, and Sequences can be consumed as async iterables using for-await-of loops
  • Event Composition: Merge multiple event streams into unified events
  • Zero Dependencies: Lightweight with no external dependencies for optimal bundle size
  • Universal: Works seamlessly in both browser and Node.js environments, including service workers

Platform Support

NodeJSChromeFirefoxSafariOperaEdge
Latest ✔Latest ✔Latest ✔Latest ✔Latest ✔Latest ✔

Installing

Using pnpm:

pnpm add evnty

Using yarn:

yarn add evnty

Using npm:

npm install evnty

Examples

Event - Multi-Listener Pattern

import { createEvent } from 'evnty';

// Create a typed event
const userEvent = createEvent<{ id: number, name: string }>();

// Multiple listeners
userEvent.on(user => console.log('Logger:', user));
userEvent.on(user => updateUI(user));
userEvent.on(user => saveToCache(user));

// Emit - all listeners are called
userEvent.emit({ id: 1, name: 'Alice' });

// One-time listener
userEvent.once(user => console.log('First user only:', user));

// Async iteration
for await (const user of userEvent) {
  console.log('User event:', user);
}

Signal - Async Coordination

import { Signal } from 'evnty';

// Coordinate multiple async operations
const dataSignal = new Signal<Buffer>();

// Multiple operations wait for the same data
async function processA() {
  const data = await dataSignal.receive();
  // Process data in way A
}

async function processB() {
  const data = await dataSignal.receive();
  // Process data in way B
}

// Start both processors
Promise.all([processA(), processB()]);

// Both receive the same data when it arrives
dataSignal.emit(Buffer.from('shared data'));

Sequence - Task Queue

import { Sequence } from 'evnty';

// Create a task queue
const taskQueue = new Sequence<() => Promise<void>>();

// Single consumer processes tasks in order
(async () => {
  for await (const task of taskQueue) {
    await task();
    console.log('Task completed');
  }
})();

// Multiple producers add tasks
taskQueue.emit(async () => fetchData());
taskQueue.emit(async () => processData());
taskQueue.emit(async () => saveResults());

// Backpressure control
await taskQueue.reserve(10); // Wait until queue has ≤10 items
taskQueue.emit(async () => nonUrgentTask());

Combining Primitives

// Event + Signal for request/response pattern
const requestEvent = createEvent<Request>();
const responseSignal = new Signal<Response>();

requestEvent.on(async (req) => {
  const response = await handleRequest(req);
  responseSignal.emit(response);
});

// Event + Sequence for buffered processing
const dataEvent = createEvent<Data>();
const processQueue = new Sequence<Data>();

dataEvent.on(data => processQueue.emit(data));

// Process with controlled concurrency
for await (const data of processQueue) {
  await processWithRateLimit(data);
}

API

ConsumerHandle

A handle representing a consumer's position in a Broadcast. Returned by Broadcast.join() and used to consume values. Implements Disposable for automatic cleanup via using keyword.

  • @example
const broadcast = new Broadcast<number>();
using handle = broadcast.join();
broadcast.emit(42);
const value = broadcast.consume(handle); // 42

ConsumerHandle.cursor: number

The current position of this consumer in the buffer.

ConsumerHandle[Symbol.dispose](): void

Leaves the broadcast, releasing this consumer's position.

Broadcast

A multi-consumer FIFO queue where each consumer maintains its own read position. Values are buffered and each consumer can read them independently at their own pace. The buffer automatically compacts when all consumers have read past a position.

Key characteristics:

  • Multiple consumers - each gets their own cursor position
  • Buffered delivery - values are stored until all consumers read them
  • Late joiners only see values emitted after joining
  • Automatic cleanup via FinalizationRegistry when handles are garbage collected

Differs from:

  • Event: Broadcast buffers values, Event does not

  • Sequence: Broadcast supports multiple consumers, Sequence is single-consumer

  • Signal: Broadcast buffers values, Signal only notifies current waiters

  • @template T - The type of values in the broadcast

  • @example

const broadcast = new Broadcast<number>();

const handle1 = broadcast.join();
const handle2 = broadcast.join();

broadcast.emit(1);
broadcast.emit(2);

broadcast.consume(handle1); // 1
broadcast.consume(handle2); // 1
broadcast.consume(handle1); // 2

Broadcast.sink: Fn

Returns a bound emit function for use as a callback.

Broadcast.handleEvent(event: T): void

DOM EventListener interface compatibility.

Broadcast.size: number

The number of active consumers.

Broadcast.emit(value: T): boolean

Emits a value to all consumers. The value is buffered for consumption.

  • @param value - The value to emit.
  • @returns true if the value was emitted.

Broadcast.receive(): Promise

Waits for the next emitted value without joining as a consumer. Does not buffer - only receives values emitted after calling.

  • @returns A promise that resolves with the next emitted value.

Broadcast.then(onfulfilled?: Fn | null, onrejected?: Fn | null): Promise

Broadcast.catch(onrejected?: Fn | null): Promise

Broadcast.finally(onfinally?: Action | null): Promise

Broadcast.join(): ConsumerHandle

Joins the broadcast as a consumer. Returns a handle used to consume values. The consumer starts at the current buffer position and will only see values emitted after joining.

  • @example
const handle = broadcast.join();
// Use handle with consume(), readable(), leave()

Broadcast.getCursor(handle: ConsumerHandle): number

Gets the current cursor position for a consumer handle.

  • @param handle - The consumer handle.
  • @returns The cursor position.
  • @throws If the handle is invalid (already left or never joined).

Broadcast.leave(handle: ConsumerHandle): void

Removes a consumer from the broadcast. The handle becomes invalid after this call. Idempotent - calling multiple times has no effect.

  • @param handle - The consumer handle to remove.

Broadcast.consume(handle: ConsumerHandle): T

Consumes and returns the next value for a consumer. Advances the consumer's cursor position.

  • @param handle - The consumer handle.

  • @throws If no value is available or the handle is invalid.

  • @example

if (broadcast.readable(handle)) {
  const value = broadcast.consume(handle);
}

Broadcast.tryConsume(handle: ConsumerHandle): IteratorResult

Attempts to consume the next value for a consumer. Returns { done: true } when no value is currently available.

  • @param handle - The consumer handle.
  • @returns The next value, or { done: true } when nothing is available.
  • @throws If the handle is invalid.

Broadcast.readable(handle: ConsumerHandle): boolean

Checks if there are values available for a consumer to read.

  • @param handle - The consumer handle.
  • @returns true if there are unread values, false otherwise.

Broadcast[Symbol.asyncIterator](): AsyncIterator

Broadcast.dispose(): void

Broadcast[Symbol.dispose](): void

DispatchResult

Wraps an array of values or promises (typically listener results) and provides batch resolution.

  • @template T

DispatchResult.then(onfulfilled?: Fn | null, onrejected?: Fn | null): PromiseLike

DispatchResult.all(): T[] | Promise

Resolves all listener results, rejecting if any promise rejects or any ResultError exists.

DispatchResult.settled(): PromiseSettledResult[] | Promise

Waits for all listener results to settle, regardless of fulfillment or rejection.

Event

Multi-listener event emitter with async support. All registered listeners are called for each emission, and their return values are collected in a DispatchResult. Supports async iteration and an onDispose callback for cleanup.

Differs from:

  • Signal: Event has persistent listeners; Signal is promise-based (receive per round)

  • Sequence: Event broadcasts to all listeners; Sequence is a single-consumer queue

  • @template T - The type of value emitted to listeners (event payload)

  • @template R - The return type of listener functions

Event.sink: Fn

Returns a bound emit function for use as a callback. Useful for passing to other APIs that expect a function.

  • @example
const event = new Event<string>();
someApi.onMessage(event.sink);

Event.handleEvent(event: T): void

DOM EventListener interface compatibility. Allows the event to be used directly with addEventListener.

Event.size: number

The number of listeners for the event.

Event.emit(value: T): DispatchResult

Emits a value to all registered listeners. Each listener is called with the value and their return values are collected.

  • @param value - The value to emit to all listeners.

  • @returns A DispatchResult containing all listener return values.

  • @example

const event = new Event<string, number>();
event.on(str => str.length);
const result = event.emit('hello');
await result.all(); // [5]

Event.lacks(listener: Listener): boolean

Checks if the given listener is NOT registered for this event.

  • @param listener - The listener function to check against the registered listeners.

  • @returns true if the listener is not already registered; otherwise, false.

  • @example

// Check if a listener is not already added
if (event.lacks(myListener)) {
  event.on(myListener);
}

Event.has(listener: Listener): boolean

Checks if the given listener is registered for this event.

  • @param listener - The listener function to check.

  • @returns true if the listener is currently registered; otherwise, false.

  • @example

// Verify if a listener is registered
if (event.has(myListener)) {
  console.log('Listener is already registered');
}

Event.off(listener: Listener): this

Removes a specific listener from this event.

  • @param listener - The listener to remove.

  • @returns The event instance for chaining.

  • @example

// Remove a listener
event.off(myListener);

Event.on(listener: Listener): Unsubscribe

Registers a listener that is called on every emission.

  • @param listener - The function to call when the event occurs.

  • @returns A function that removes this listener when called.

  • @example

const unsubscribe = event.on((data) => {
  console.log('Event data:', data);
});

Event.once(listener: Listener): Unsubscribe

Registers a listener that is called only once on the next emission, then auto-removed.

  • @param listener - The listener to trigger once.

  • @returns A function that removes this listener when called (if it hasn't fired yet).

  • @example

const cancel = event.once((data) => {
  console.log('Received data once:', data);
});

Event.clear(): this

Removes all listeners from the event. Does not dispose the event - new listeners can still be added after clearing.

  • @returns The event instance for chaining.

Event.receive(): Promise

Waits for the next emission and returns the emitted value.

  • @returns A promise that resolves with the next emitted value.

Event.then(onfulfilled?: Fn | null, onrejected?: Fn | null): Promise

Event.catch(onrejected?: Fn | null): Promise

Event.finally(onfinally?: Action | null): Promise

Event.settle(): Promise

Waits for the next emission via receive() and wraps the outcome in a PromiseSettledResult - always resolves, never rejects.

  • @returns A promise that resolves with the settled result.

  • @example

const result = await event.settle();
if (result.status === 'fulfilled') {
  console.log('Value:', result.value);
} else {
  console.error('Reason:', result.reason);
}

Event[Symbol.asyncIterator](): AsyncIterator

Event.dispose(): void

Event[Symbol.dispose](): void

merge(...events: Events): Event

Merges multiple events into a single event that triggers whenever any source triggers. Disposing the merged event unsubscribes from all sources.

  • @param events - The events to merge.

  • @returns A new Event that forwards emissions from all sources.

  • @example

const mouseEvent = createEvent<MouseEvent>();
const keyboardEvent = createEvent<KeyboardEvent>();
const inputEvent = merge(mouseEvent, keyboardEvent);
inputEvent.on(event => console.log('Input event:', event));

createInterval(interval: number): Event

Creates a periodic event that emits an incrementing counter (starting from 0) at a fixed interval. Disposing the event clears the interval.

  • @param interval - The interval in milliseconds.

  • @returns An Event that triggers at the specified interval.

  • @example

const tickEvent = createInterval(1000);
tickEvent.on(tickNumber => console.log('Tick:', tickNumber));

createEvent(): Event

Creates a new Event instance for multi-listener event handling.

  • @example
const messageEvent = createEvent<string>();
messageEvent.on(msg => console.log('Received:', msg));
messageEvent.emit('Hello'); // All listeners receive 'Hello'

// Listeners can return values, collected via DispatchResult
const validateEvent = createEvent<string, boolean>();
validateEvent.on(str => str.length > 0);
validateEvent.on(str => str.length < 100);
const results = await validateEvent.emit('test').all(); // [true, true]

AsyncIteratorObject

A wrapper class providing functional operations on async iterables. Enables lazy evaluation and chainable transformations on async data streams.

Key characteristics:

  • Lazy evaluation - operations are not executed until iteration begins

  • Chainable - all transformation methods return new AsyncIteratorObject instances

  • Supports both sync and async transformation functions

  • Memory efficient - processes values one at a time

  • Operation fusion - chains execute in optimized passes

  • @template T The type of values yielded by the iterator

  • @template TReturn The return type of the iterator

  • @template TNext The type of value that can be passed to next()

  • @example

// Create from an async generator
async function* numbers() {
  yield 1; yield 2; yield 3;
}

const iterator = new AsyncIteratorObject(numbers())
  .map(x => x 2)
  .filter(x => x > 2);

for await (const value of iterator) {
  console.log(value); // 4, 6
}

AsyncIteratorObject.from(iterable: Iterable): AsyncIteratorObject

A wrapper class providing functional operations on async iterables. Enables lazy evaluation and chainable transformations on async data streams.

Key characteristics:

  • Lazy evaluation - operations are not executed until iteration begins

  • Chainable - all transformation methods return new AsyncIteratorObject instances

  • Supports both sync and async transformation functions

  • Memory efficient - processes values one at a time

  • Operation fusion - chains execute in optimized passes

  • @template T The type of values yielded by the iterator

  • @template TReturn The return type of the iterator

  • @template TNext The type of value that can be passed to next()

  • @example

// Create from an async generator
async function* numbers() {
  yield 1; yield 2; yield 3;
}

const iterator = new AsyncIteratorObject(numbers())
  .map(x => x 2)
  .filter(x => x > 2);

for await (const value of iterator) {
  console.log(value); // 4, 6
}

Creates an AsyncIteratorObject from a synchronous iterable. Converts the sync iterable to async for uniform handling.

  • @param iterable A synchronous iterable to convert

  • @returns A new AsyncIteratorObject wrapping the converted iterable

  • @example

const syncArray = [1, 2, 3, 4, 5];
const asyncIterator = AsyncIteratorObject.from(syncArray);

for await (const value of asyncIterator) {
  console.log(value); // 1, 2, 3, 4, 5
}

AsyncIteratorObject.merge(...iterables: AsyncIterable[]): AsyncIteratorObject

Merges multiple async iterables into a single stream. Values from all sources are interleaved as they become available. The merged iterator completes when all source iterators complete.

  • @param iterables The async iterables to merge

  • @returns A new AsyncIteratorObject yielding values from all sources

  • @example

async function* source1() { yield 1; yield 3; }
async function* source2() { yield 2; yield 4; }

const merged = AsyncIteratorObject.merge(source1(), source2());

for await (const value of merged) {
  console.log(value); // Order depends on timing: 1, 2, 3, 4 or similar
}

AsyncIteratorObject.pipe(generatorFactory, signal?: AbortSignal): AsyncIteratorObject

Escape hatch for custom transformations not covered by the built-in operators. Materializes the fused operation chain, then applies a generator function to each value.

  • @param generatorFactory A function that returns a generator function for transforming values
  • @param signal Optional AbortSignal to cancel the operation
  • @returns A new AsyncIteratorObject with transformed values

AsyncIteratorObject.awaited(): AsyncIteratorObject

Resolves promise-like values from the source iterator. Useful for normalizing values before applying type-guard predicates.

  • @returns A new AsyncIteratorObject yielding awaited values

AsyncIteratorObject.map(callbackfn): AsyncIteratorObject

Transforms each value using a mapping function. The callback can be synchronous or return a promise.

  • @param callbackfn Function to transform each value

  • @returns A new AsyncIteratorObject yielding transformed values

  • @example

const numbers = AsyncIteratorObject.from([1, 2, 3]);
const doubled = numbers.map(x => x 2);

for await (const value of doubled) {
  console.log(value); // 2, 4, 6
}

AsyncIteratorObject.filter(predicate): AsyncIteratorObject

AsyncIteratorObject.filter(predicate): AsyncIteratorObject

AsyncIteratorObject.filter(predicate): AsyncIteratorObject

Filters values based on a predicate function. Only values for which the predicate returns truthy are yielded. Supports type guard predicates for type narrowing.

  • @param predicate Function to test each value

  • @returns A new AsyncIteratorObject yielding only values that pass the test

  • @example

const numbers = AsyncIteratorObject.from([1, 2, 3, 4, 5]);
const evens = numbers.filter(x => x % 2 === 0);

for await (const value of evens) {
  console.log(value); // 2, 4
}

AsyncIteratorObject.filterMap(callbackfn): AsyncIteratorObject

Combined filter and map operation. Returns undefined to skip a value. The callback result is awaited to check for undefined.

  • @param callbackfn Function that returns a transformed value or undefined to skip

  • @returns A new AsyncIteratorObject yielding non-undefined transformed values

  • @example

const numbers = AsyncIteratorObject.from([1, 2, 3, 4, 5]);
const doubledEvens = numbers.filterMap(x => x % 2 === 0 ? x 2 : undefined);

for await (const value of doubledEvens) {
  console.log(value); // 4, 8
}

AsyncIteratorObject.inspect(callbackfn): AsyncIteratorObject

Executes a side-effect function for each value without modifying the stream. Useful for debugging or logging. The callback is awaited for proper sequencing.

  • @param callbackfn Function to execute for each value

  • @returns A new AsyncIteratorObject yielding the same values

  • @example

const numbers = AsyncIteratorObject.from([1, 2, 3]);
const logged = numbers.inspect(x => console.log('value:', x)).map(x => x 2);

AsyncIteratorObject.enumerate(start: number = 0): AsyncIteratorObject

Wraps each value with its index as a tuple. Useful after filtering when original indices are lost.

  • @param start Starting index (default: 0)

  • @returns A new AsyncIteratorObject yielding [index, value] tuples

  • @example

const letters = AsyncIteratorObject.from(['a', 'b', 'c']);
const enumerated = letters.enumerate();

for await (const [i, v] of enumerated) {
  console.log(i, v); // 0 'a', 1 'b', 2 'c'
}

AsyncIteratorObject.take(limit: number): AsyncIteratorObject

Creates an iterator whose values are the values from this iterator, stopping once the provided limit is reached.

  • @param limit The maximum number of values to yield.

AsyncIteratorObject.takeWhile(predicate): AsyncIteratorObject

AsyncIteratorObject.takeWhile(predicate): AsyncIteratorObject

AsyncIteratorObject.takeWhile(predicate): AsyncIteratorObject

Takes values while the predicate returns truthy. Stops immediately when predicate returns falsy. Supports type guard predicates for type narrowing.

  • @param predicate Function to test each value

  • @returns A new AsyncIteratorObject yielding values until predicate fails

  • @example

const numbers = AsyncIteratorObject.from([1, 2, 3, 4, 5]);
const small = numbers.takeWhile(x => x < 4);

for await (const value of small) {
  console.log(value); // 1, 2, 3
}

AsyncIteratorObject.drop(count: number): AsyncIteratorObject

Creates an iterator whose values are the values from this iterator after skipping the provided count.

  • @param count The number of values to drop.

AsyncIteratorObject.dropWhile(predicate): AsyncIteratorObject

Skips values while the predicate returns truthy. Yields all remaining values once predicate returns falsy.

  • @param predicate Function to test each value

  • @returns A new AsyncIteratorObject skipping values until predicate fails

  • @example

const numbers = AsyncIteratorObject.from([1, 2, 3, 4, 5]);
const afterSmall = numbers.dropWhile(x => x < 3);

for await (const value of afterSmall) {
  console.log(value); // 3, 4, 5
}

AsyncIteratorObject.flatMap(callback): AsyncIteratorObject

Creates an iterator whose values are the result of applying the callback to the values from this iterator and then flattening the resulting iterators or iterables.

  • @param callback A function that accepts up to two arguments to be used to transform values from the underlying iterator into new iterators or iterables to be flattened into the result.

AsyncIteratorObject.reduce(callbackfn): AsyncIteratorObject

AsyncIteratorObject.reduce(callbackfn, initialValue: R): AsyncIteratorObject

AsyncIteratorObject.reduce(callbackfn, ...args: unknown[]): AsyncIteratorObject

Creates an iterator of accumulated values by applying a reducer function. Unlike Array.reduce, this returns an iterator that yields each intermediate accumulated value, not just the final result. This allows observing the accumulation process.

  • @param callbackfn Reducer function to accumulate values

  • @param initialValue Optional initial value for the accumulation

  • @returns A new AsyncIteratorObject yielding accumulated values at each step

  • @example

const numbers = AsyncIteratorObject.from([1, 2, 3, 4]);
const sums = numbers.reduce((sum, x) => sum + x, 0);

for await (const value of sums) {
  console.log(value); // 1, 3, 6, 10 (running totals)
}

AsyncIteratorObject.expand(callbackfn): AsyncIteratorObject

Transforms each value into multiple values using an expander function. Each input value is expanded into zero or more output values. Like flatMap but takes sync Iterables (or Promises of Iterables) instead of AsyncIterables.

  • @param callbackfn Function that returns an iterable of values for each input

  • @returns A new AsyncIteratorObject yielding all expanded values

  • @example

const numbers = AsyncIteratorObject.from([1, 2, 3]);
const expanded = numbers.expand(x => [x, x 10]);

for await (const value of expanded) {
  console.log(value); // 1, 10, 2, 20, 3, 30
}

AsyncIteratorObject[Symbol.asyncIterator](): AsyncIterator

Sequence

A sequence is a FIFO (First-In-First-Out) queue for async consumption. Designed for single consumer with multiple producers pattern. Values are queued and consumed in order, with backpressure support. Respects an optional AbortSignal: emit() returns false when aborted; waits reject.

Key characteristics:

  • Single consumer - values are consumed once, in order

  • Multiple producers can push values concurrently

  • FIFO ordering - first value in is first value out

  • Backpressure control via reserve() method

  • Async iteration support for continuous consumption

  • @template T The type of values in the sequence.

  • @example

// Create a sequence for processing tasks
const tasks = new Sequence<string>();

// Producer: Add tasks to the queue
tasks.emit('task1');
tasks.emit('task2');
tasks.emit('task3');

// Consumer: Process tasks in order
const task1 = await tasks.receive(); // 'task1'
const task2 = await tasks.receive(); // 'task2'
const task3 = await tasks.receive(); // 'task3'

Sequence.merge(target: Sequence, ...sequences: Sequence[]): void

Merges multiple source sequences into a target sequence. Values from all sources are forwarded to the target sequence. Each source is consumed independently and concurrently.

  • @param target The sequence that will receive values from all sources

  • @param sequences The source sequences to merge from

  • @example

// Create target and source sequences
const target = new Sequence<number>();
const source1 = new Sequence<number>();
const source2 = new Sequence<number>();

// Merge sources into target
Sequence.merge(target, source1, source2);

// Values from both sources appear in target
source1.emit(1);
source2.emit(2);
source1.emit(3);

// Consumer gets values as they arrive
await target.receive(); // Could be 1, 2, or 3 depending on timing

Sequence.size: number

Returns the number of values currently queued.

  • @returns The current queue size

Sequence.reserve(capacity: number): Promise

Waits until the queue size drops to or below the specified capacity. Useful for implementing backpressure - producers can wait before adding more items.

  • @param capacity The maximum queue size to wait for

  • @returns A promise that resolves when the queue size is at or below capacity

  • @example

// Producer with backpressure control
const sequence = new Sequence<string>();

// Wait if queue has more than 10 items
await sequence.reserve(10);
sequence.emit('new item'); // Safe to add, queue has space

Sequence.emit(value: T): boolean

Pushes a value onto the queue. Wakes any pending receive() waiter.

  • @param value - The value to enqueue.
  • @returns true if the sequence is still active.

Sequence.receive(): Promise

Consumes and returns the next value from the queue. If the queue is empty, waits for a value to be added. Values are consumed in FIFO order. If the sequence has been aborted or disposed, this method rejects with Error('Disposed').

  • @returns A promise that resolves with the next value

  • @example

const sequence = new Sequence<number>();

// Consumer waits for values
const valuePromise = sequence.receive();

// Producer adds value
sequence.emit(42);

// Consumer receives it
const value = await valuePromise; // 42

Sequence.dispose(): void

Disposes of the sequence, rejecting any pending receive() waiters. Called by [Symbol.dispose]() (inherited from Async) when using the using declaration.

Signal

Promise-based async coordination primitive. emit() resolves the pending receive() promise (shared across callers). Reusable - after each emission a new round of receive() calls can be made. Disposable via [Symbol.dispose]() or an optional AbortSignal.

  • @template T The type of value that this signal carries.

  • @example

const signal = new Signal<string>();

const promise = signal.receive();
signal.emit('hello');
await promise; // 'hello'

Signal.merge(target: Signal, ...signals: Signal[]): void

Merges multiple source signals into a target signal. Values from any source signal are emitted to the target signal. The merge continues until the target signal is disposed.

Note: When the target is disposed, iteration stops after the next value from each source. For immediate cleanup, dispose source signals directly.

  • @param target The signal that will receive values from all sources

  • @param signals The source signals to merge from

  • @example

// Create a target signal and source signals
const target = new Signal<string>();
const source1 = new Signal<string>();
const source2 = new Signal<string>();

// Merge sources into target
Signal.merge(target, source1, source2);

// Values from any source appear in target
const promise = target.receive();
source1.emit('Hello');
const value = await promise; // 'Hello'

Signal.emit(value: T): boolean

Sends a value to the waiting receiver, if any.

  • @param value - The value to send.
  • @returns true if the value was emitted.

Signal.receive(): Promise

Waits for the next value to be sent to this signal. If the signal has been aborted or disposed, this method rejects with Error('Disposed').

  • @returns A promise that resolves with the next value sent to the signal.

  • @example

const signal = new Signal<string>();

// Wait for a value
const valuePromise = signal.receive();

// Send a value from elsewhere
signal.emit('Hello');

const value = await valuePromise; // 'Hello'

Signal.dispose(): void

abortableIterable(iterable: AsyncIterable, signal: AbortSignal): AsyncIterable

Wraps an async iterable with abort signal support. Each iteration creates a fresh iterator with scoped abort handling. Listener is added at iteration start and removed on completion/abort/return.

  • @template T - The yielded value type

  • @template TReturn - The return value type

  • @template TNext - The type passed to next()

  • @param iterable - The source async iterable to wrap

  • @param signal - AbortSignal to cancel iteration

  • @returns An async iterable with abort support

  • @example

const controller = new AbortController();
const source = async function*() { yield 1; yield 2; yield 3; };

for await (const value of abortableIterable(source(), controller.signal)) {
  console.log(value);
  if (value === 2) controller.abort();
}

iterate(startOrCount?: number, countWhenTwoArgs?: number, step: number = 1): Iterable

Creates an iterable sequence of numbers with flexible parameters. Can generate infinite sequences, finite sequences, or sequences with custom start and step values.

  • @param args Variable arguments to configure the sequence:

    • No args: Infinite sequence starting at 0 with step 1
    • 1 arg (count): Sequence from 0 to count-1
    • 2 args (start, count): Sequence starting at 'start' for 'count' iterations
    • 3 args (start, count, step): Custom start, count, and step value
  • @returns An iterable that generates numbers according to the parameters

  • @example

// Infinite sequence: 0, 1, 2, 3, ...
for (const n of iterate()) { }

// Count only: 0, 1, 2, 3, 4
for (const n of iterate(5)) { }

// Start and count: 10, 11, 12, 13, 14
for (const n of iterate(10, 5)) { }

// Start, count, and step: 0, 2, 4, 6, 8
for (const n of iterate(0, 5, 2)) { }

toAsyncIterable(iterable: Iterable): AsyncIterable

Converts a synchronous iterable to an asynchronous iterable. Wraps the sync iterator methods to return promises, enabling uniform async handling.

  • @template T The type of values yielded by the iterator

  • @template TReturn The return type of the iterator

  • @template TNext The type of value that can be passed to next()

  • @param iterable A synchronous iterable to convert

  • @returns An async iterable that yields the same values as the input

  • @example

const syncArray = [1, 2, 3, 4, 5];
const asyncIterable = toAsyncIterable(syncArray);

for await (const value of asyncIterable) {
  console.log(value); // 1, 2, 3, 4, 5
}

License

License The MIT License Copyright (c) 2025 Ivan Zakharchanka

Last Updated: 3/5/26, 1:33 AM
Contributors: Ivan Zakharchanka, 3axap4eHko