Evnty
Async-first, reactive event handling library for complex event flows with three powerful primitives: Event (multi-listener broadcast), Signal (promise-like coordination), and Sequence (async queue). Built for both browser and Node.js with full TypeScript support.
Table of Contents
- Core Concepts
- Motivation
- Features
- Platform Support
- Installing
- API
ConsumerHandleBroadcastBroadcast.disposed: booleanBroadcast.sink: FnBroadcast.handleEvent(event: T): voidBroadcast.size: numberBroadcast.emit(value: T): booleanBroadcast.receive(): PromiseBroadcast.then(onfulfilled?: Fn | null, onrejected?: Fn | null): PromiseBroadcast.catch(onrejected?: Fn | null): PromiseBroadcast.finally(onfinally?: Action | null): PromiseBroadcast.join(): ConsumerHandleBroadcast.getCursor(handle: ConsumerHandle): numberBroadcast.leave(handle: ConsumerHandle): voidBroadcast.consume(handle: ConsumerHandle): TBroadcast.readable(handle: ConsumerHandle): booleanBroadcast[Symbol.asyncIterator](): AsyncIteratorBroadcast.dispose(): voidBroadcast[Symbol.dispose](): void
DispatchResultEventEvent.disposed: booleanEvent.sink: FnEvent.handleEvent(event: T): voidEvent.size: numberEvent.emit(value: T): DispatchResultEvent.lacks(listener: Listener): booleanEvent.has(listener: Listener): booleanEvent.off(listener: Listener): thisEvent.on(listener: Listener): UnsubscribeEvent.once(listener: Listener): UnsubscribeEvent.clear(): thisEvent.receive(): PromiseEvent.then(onfulfilled?: Fn | null, onrejected?: Fn | null): PromiseEvent.catch(onrejected?: Fn | null): PromiseEvent.finally(onfinally?: Action | null): PromiseEvent.settle(): PromiseEvent[Symbol.asyncIterator](): AsyncIteratorEvent.dispose(): voidEvent[Symbol.dispose](): void
merge(...events: Events): EventcreateInterval(interval: number): EventcreateEvent(): EventAsyncIteratorObjectAsyncIteratorObject.from(iterable: Iterable): AsyncIteratorObjectAsyncIteratorObject.merge(...iterables: AsyncIterable[]): AsyncIteratorObjectAsyncIteratorObject.pipe(generatorFactory, signal?: AbortSignal): AsyncIteratorObjectAsyncIteratorObject.awaited(): AsyncIteratorObjectAsyncIteratorObject.map(callbackfn): AsyncIteratorObjectAsyncIteratorObject.filter(predicate): AsyncIteratorObjectAsyncIteratorObject.filter(predicate): AsyncIteratorObjectAsyncIteratorObject.filter(predicate): AsyncIteratorObjectAsyncIteratorObject.take(limit: number): AsyncIteratorObjectAsyncIteratorObject.drop(count: number): AsyncIteratorObjectAsyncIteratorObject.flatMap(callback): AsyncIteratorObjectAsyncIteratorObject.reduce(callbackfn): AsyncIteratorObjectAsyncIteratorObject.reduce(callbackfn, initialValue: R): AsyncIteratorObjectAsyncIteratorObject.reduce(callbackfn, ...args: unknown[]): AsyncIteratorObjectAsyncIteratorObject.expand(callbackfn): AsyncIteratorObjectAsyncIteratorObject[Symbol.asyncIterator]()
SequenceSignalabortableIterable(iterable: AsyncIterable, signal: AbortSignal): AsyncIterableiterate(startOrCount?: number, countWhenTwoArgs?: number, step: number = 1): IterabletoAsyncIterable(iterable: Iterable): AsyncIterable
- Examples
- License
Core Concepts
Evnty provides three complementary async primitives, each designed for specific patterns:
🔊 Event - Multi-Listener Broadcasting
Events allow multiple listeners to react to values. All registered listeners are called for each emission.
const clickEvent = createEvent<{ x: number, y: number }>();
// Multiple listeners can subscribe
clickEvent.on(({ x, y }) => console.log(`Click at ${x},${y}`));
clickEvent.on(({ x, y }) => updateUI(x, y));
// All listeners receive the value
clickEvent({ x: 100, y: 200 });
Use Event when:
- Multiple components need to react to the same occurrence
- You need pub/sub or observer pattern
- Listeners should persist across multiple emissions
📡 Signal - Promise-Based Coordination
Signals are for coordinating async operations. When a value is sent, ALL waiting consumers receive it (broadcast).
const signal = new Signal<string>();
// Multiple consumers can wait
const promise1 = signal.next();
const promise2 = signal.next();
// Send value - all waiting consumers receive it
signal('data');
const [result1, result2] = await Promise.all([promise1, promise2]);
// result1 === 'data' && result2 === 'data'
Use Signal when:
- You need one-time notifications
- Multiple async operations need the same trigger
- Implementing async coordination patterns
📦 Sequence - Async Queue (FIFO)
Sequences are FIFO queues for single-consumer scenarios. Values are consumed in order, with backpressure support.
const taskQueue = new Sequence<Task>();
// Producer adds tasks
taskQueue(task1);
taskQueue(task2);
taskQueue(task3);
// Single consumer processes in order
for await (const task of taskQueue) {
await processTask(task); // task1, then task2, then task3
}
Use Sequence when:
- You need ordered processing (FIFO)
- Only one consumer should handle each value
- You want backpressure control with
reserve()
Key Differences
| Event | Signal | Sequence | |
|---|---|---|---|
| Consumers | Multiple persistent listeners | Multiple one-time receivers | Single consumer |
| Delivery | All listeners called | All waiting get same value | Each value consumed once |
| Pattern | Pub/Sub | Broadcast coordination | Queue/Stream |
| Persistence | Listeners stay registered | Resolves once per next() | Values queued until consumed |
Motivation
Traditional event handling in JavaScript/TypeScript has limitations:
- String-based event names lack type safety
- No built-in async coordination primitives
- Missing functional transformations for event streams
- Complex patterns require extensive boilerplate
Evnty solves these problems by providing:
- Type-safe events with full TypeScript inference
- Three specialized primitives for different async patterns
- Async iterator transformations via
AsyncIteratorObject(map, filter, reduce, expand, etc.) - Composable abstractions that work together seamlessly
Features
- Async-First Design: Built from the ground up for asynchronous event handling with full Promise support
- Iterator Transformations:
AsyncIteratorObjectprovides map, filter, reduce, take, drop, flatMap, and expand operators - Type-Safe: Full TypeScript support with strong typing and inference throughout the event pipeline
- Async Iteration: Events, Signals, and Sequences can be consumed as async iterables using for-await-of loops
- Event Composition: Merge multiple event streams into unified events
- Zero Dependencies: Lightweight with no external dependencies for optimal bundle size
- Universal: Works seamlessly in both browser and Node.js environments, including service workers
Platform Support
![]() | ![]() | ![]() | ![]() | ![]() | ![]() |
|---|---|---|---|---|---|
| Latest ✔ | Latest ✔ | Latest ✔ | Latest ✔ | Latest ✔ | Latest ✔ |
Installing
Using pnpm:
pnpm add evnty
Using yarn:
yarn add evnty
Using npm:
npm install evnty
Examples
Event - Multi-Listener Pattern
import { createEvent } from 'evnty';
// Create a typed event
const userEvent = createEvent<{ id: number, name: string }>();
// Multiple listeners
userEvent.on(user => console.log('Logger:', user));
userEvent.on(user => updateUI(user));
userEvent.on(user => saveToCache(user));
// Emit - all listeners are called
userEvent({ id: 1, name: 'Alice' });
// One-time listener
userEvent.once(user => console.log('First user only:', user));
// Async iteration
for await (const user of userEvent) {
console.log('User event:', user);
}
Signal - Async Coordination
import { Signal } from 'evnty';
// Coordinate multiple async operations
const dataSignal = new Signal<Buffer>();
// Multiple operations wait for the same data
async function processA() {
const data = await dataSignal.next();
// Process data in way A
}
async function processB() {
const data = await dataSignal.next();
// Process data in way B
}
// Start both processors
Promise.all([processA(), processB()]);
// Both receive the same data when it arrives
dataSignal(Buffer.from('shared data'));
Sequence - Task Queue
import { Sequence } from 'evnty';
// Create a task queue
const taskQueue = new Sequence<() => Promise<void>>();
// Single consumer processes tasks in order
(async () => {
for await (const task of taskQueue) {
await task();
console.log('Task completed');
}
})();
// Multiple producers add tasks
taskQueue(async () => fetchData());
taskQueue(async () => processData());
taskQueue(async () => saveResults());
// Backpressure control
await taskQueue.reserve(10); // Wait until queue has ≤10 items
taskQueue(async () => nonUrgentTask());
Combining Primitives
// Event + Signal for request/response pattern
const requestEvent = createEvent<Request>();
const responseSignal = new Signal<Response>();
requestEvent.on(async (req) => {
const response = await handleRequest(req);
responseSignal(response);
});
// Event + Sequence for buffered processing
const dataEvent = createEvent<Data>();
const processQueue = new Sequence<Data>();
dataEvent.on(data => processQueue(data));
// Process with controlled concurrency
for await (const data of processQueue) {
await processWithRateLimit(data);
}
API
ConsumerHandle
A handle representing a consumer's position in a Broadcast. Returned by Broadcast.join() and used to consume values. Implements Disposable for automatic cleanup via using keyword.
const broadcast = new Broadcast<number>();
using handle = broadcast.join();
broadcast.emit(42);
const value = broadcast.consume(handle); // 42
ConsumerHandle.cursor: number
The current position of this consumer in the buffer.
ConsumerHandle[Symbol.dispose](): void
Leaves the broadcast, releasing this consumer's position.
Broadcast
A multi-consumer FIFO queue where each consumer maintains its own read position. Values are buffered and each consumer can read them independently at their own pace. The buffer automatically compacts when all consumers have read past a position.
Key characteristics:
- Multiple consumers - each gets their own cursor position
- Buffered delivery - values are stored until all consumers read them
- Late joiners only see values emitted after joining
- Automatic cleanup via FinalizationRegistry when handles are garbage collected
Differs from:
Event: Broadcast buffers values, Event does not
Sequence: Broadcast supports multiple consumers, Sequence is single-consumer
Signal: Broadcast buffers values, Signal only notifies current waiters
@template T - The type of values in the broadcast
const broadcast = new Broadcast<number>();
const handle1 = broadcast.join();
const handle2 = broadcast.join();
broadcast.emit(1);
broadcast.emit(2);
broadcast.consume(handle1); // 1
broadcast.consume(handle2); // 1
broadcast.consume(handle1); // 2
Broadcast.disposed: boolean
Checks if the broadcast has been disposed.
Broadcast.sink: Fn
Returns a bound emit function for use as a callback.
Broadcast.handleEvent(event: T): void
DOM EventListener interface compatibility.
Broadcast.size: number
The number of active consumers.
Broadcast.emit(value: T): boolean
Emits a value to all consumers. The value is buffered for consumption.
- @param value - The value to emit.
- @returns
{boolean}trueif there were waiters for the signal,falseotherwise.
Broadcast.receive(): Promise
Waits for the next emitted value without joining as a consumer. Does not buffer - only receives values emitted after calling.
- @returns
{Promise<T>}A promise that resolves with the next emitted value.
Broadcast.then(onfulfilled?: Fn | null, onrejected?: Fn | null): Promise
Broadcast.catch(onrejected?: Fn | null): Promise
Broadcast.finally(onfinally?: Action | null): Promise
Broadcast.join(): ConsumerHandle
Joins the broadcast as a consumer. Returns a handle used to consume values. The consumer starts at the current buffer position and will only see values emitted after joining.
- @returns
{ConsumerHandle<T>}A handle for consuming values.
const handle = broadcast.join();
// Use handle with consume(), readable(), leave()
Broadcast.getCursor(handle: ConsumerHandle): number
Gets the current cursor position for a consumer handle.
- @param handle - The consumer handle.
- @returns
{number}The cursor position. - @throws
{Error}If the handle is invalid (already left or never joined).
Broadcast.leave(handle: ConsumerHandle): void
Removes a consumer from the broadcast. The handle becomes invalid after this call. Idempotent - calling multiple times has no effect.
- @param handle - The consumer handle to remove.
Broadcast.consume(handle: ConsumerHandle): T
Consumes and returns the next value for a consumer. Advances the consumer's cursor position.
- @param handle - The consumer handle.
- @returns
{T}The next value in the buffer for this consumer. - @throws
{Error}If the handle is invalid.
if (broadcast.readable(handle)) {
const value = broadcast.consume(handle);
}
Broadcast.readable(handle: ConsumerHandle): boolean
Checks if there are values available for a consumer to read.
- @param handle - The consumer handle.
- @returns
{boolean}trueif there are unread values,falseotherwise.
Broadcast[Symbol.asyncIterator](): AsyncIterator
Broadcast.dispose(): void
Broadcast[Symbol.dispose](): void
DispatchResult
Wraps an array of values or promises (typically listener results) and provides batch resolution.
- @template T
DispatchResult.then(onfulfilled?: Fn | null, onrejected?: Fn | null): PromiseLike
DispatchResult.all(): Promise
Resolves all listener results, rejecting if any promise rejects or any ResultError exists.
DispatchResult.settled(): Promise
Waits for all listener results to settle, regardless of fulfillment or rejection.
Event
A class representing a multi-listener event emitter with async support. Events allow multiple listeners to react to emitted values, with each listener potentially returning a result. All listeners are called for each emission.
Key characteristics:
- Multiple listeners - all are called for each emission
- Listeners can return values collected in EventResult
- Supports async listeners and async iteration
- Provides lifecycle hooks for listener management
- Memory efficient using RingBuffer for storage
Differs from:
Signal: Events have multiple persistent listeners vs Signal's one-time resolution per consumer
Sequence: Events broadcast to all listeners vs Sequence's single consumer queue
@template T - The type of value emitted to listeners (event payload)
@template R - The return type of listener functions
Event.disposed: boolean
Checks if the event has been disposed.
Event.sink: Fn
Returns a bound emit function for use as a callback. Useful for passing to other APIs that expect a function.
const event = new Event<string>();
someApi.onMessage(event.sink);
Event.handleEvent(event: T): void
DOM EventListener interface compatibility. Allows the event to be used directly with addEventListener.
Event.size: number
The number of listeners for the event.
- @readonly
- @type
{number}
Event.emit(value: T): DispatchResult
Emits a value to all registered listeners. Each listener is called with the value and their return values are collected.
- @param value - The value to emit to all listeners.
- @returns
{DispatchResult<void | R>}A result object containing all listener return values.
const event = new Event<string, number>();
event.on(str => str.length);
const result = event.emit('hello');
await result.all(); // [5]
Event.lacks(listener: Listener): boolean
Checks if the given listener is NOT registered for this event.
- @param listener - The listener function to check against the registered listeners.
- @returns
{boolean}trueif the listener is not already registered; otherwise,false.
// Check if a listener is not already added
if (event.lacks(myListener)) {
event.on(myListener);
}
Event.has(listener: Listener): boolean
Checks if the given listener is registered for this event.
- @param listener - The listener function to check.
- @returns
{boolean}trueif the listener is currently registered; otherwise,false.
// Verify if a listener is registered
if (event.has(myListener)) {
console.log('Listener is already registered');
}
Event.off(listener: Listener): this
Removes a specific listener from this event.
- @param listener - The listener to remove.
- @returns
{this}The event instance, allowing for method chaining.
// Remove a listener
event.off(myListener);
Event.on(listener: Listener): Unsubscribe
Registers a listener that gets triggered whenever the event is emitted. This is the primary method for adding event handlers that will react to the event being triggered.
- @param listener - The function to call when the event occurs.
- @returns
{Unsubscribe}An object that can be used to unsubscribe the listener, ensuring easy cleanup.
// Add a listener to an event
const unsubscribe = event.on((data) => {
console.log('Event data:', data);
});
Event.once(listener: Listener): Unsubscribe
Adds a listener that will be called only once the next time the event is emitted. This method is useful for one-time notifications or single-trigger scenarios.
- @param listener - The listener to trigger once.
- @returns
{Unsubscribe}An object that can be used to remove the listener if the event has not yet occurred.
// Register a one-time listener
const onceUnsubscribe = event.once((data) => {
console.log('Received data once:', data);
});
Event.clear(): this
Removes all listeners from the event, effectively resetting it. This is useful when you need to cleanly dispose of all event handlers to prevent memory leaks or unwanted triggers after certain conditions.
- @returns
{this}The instance of the event, allowing for method chaining.
const myEvent = new Event();
myEvent.on(data => console.log(data));
myEvent.clear(); // Clears all listeners
Event.receive(): Promise
Waits for the next event emission and returns the emitted value. This method allows the event to be used as a promise that resolves with the next emitted value.
- @returns
{Promise<T>}A promise that resolves with the next emitted event value.
Event.then(onfulfilled?: Fn | null, onrejected?: Fn | null): Promise
Event.catch(onrejected?: Fn | null): Promise
Event.finally(onfinally?: Action | null): Promise
Event.settle(): Promise
Waits for the event to settle, returning a PromiseSettledResult. Resolves even when the next listener rejects.
@returns
{Promise<PromiseSettledResult<T>>}A promise that resolves with the settled result.@example
const result = await event.settle();
if (result.status === 'fulfilled') {
console.log('Event fulfilled with value:', result.value);
} else {
console.error('Event rejected with reason:', result.reason);
}
Event[Symbol.asyncIterator](): AsyncIterator
Event.dispose(): void
Event[Symbol.dispose](): void
merge(...events: Events): Event
Merges multiple events into a single event. This function takes any number of Event instances and returns a new Event that triggers whenever any of the input events trigger. The parameters and results of the merged event are derived from the input events, providing a flexible way to handle multiple sources of events in a unified manner.
- @template Events - An array of
Eventinstances. - @param events - A rest parameter that takes multiple events to be merged.
- @returns
{Event<AllEventsParameters<Events>, AllEventsResults<Events>>}Returns a newEventinstance that triggers with the parameters and results of any of the merged input events.
// Merging mouse and keyboard events into a single event
const mouseEvent = createEvent<MouseEvent>();
const keyboardEvent = createEvent<KeyboardEvent>();
const inputEvent = merge(mouseEvent, keyboardEvent);
inputEvent.on(event => console.log('Input event:', event));
createInterval(interval: number): Event
Creates a periodic event that triggers at a specified interval. The event will automatically emit an incrementing counter value each time it triggers, starting from zero. This function is useful for creating time-based triggers within an application, such as updating UI elements, polling, or any other timed operation.
- @template R - The return type of the event handler function, defaulting to
void. - @param interval - The interval in milliseconds at which the event should trigger.
- @returns
{Event<number, R>}AnEventinstance that triggers at the specified interval, emitting an incrementing counter value.
// Creating an interval event that logs a message every second
const tickEvent = createInterval(1000);
tickEvent.on(tickNumber => console.log('Tick:', tickNumber));
createEvent(): Event
Creates a new Event instance for multi-listener event handling. This is the primary way to create events in the library.
- @template T - The type of value emitted to listeners (event payload)
- @template R - The return type of listener functions (collected in EventResult)
- @returns
{Event<T, R>}A new Event instance ready for listener registration
// Create an event that accepts a string payload
const messageEvent = createEvent<string>();
messageEvent.on(msg => console.log('Received:', msg));
messageEvent('Hello'); // All listeners receive 'Hello'
// Create an event where listeners return values
const validateEvent = createEvent<string, boolean>();
validateEvent.on(str => str.length > 0);
validateEvent.on(str => str.length < 100);
const results = await validateEvent('test'); // EventResult with [true, true]
AsyncIteratorObject
A wrapper class providing functional operations on async iterables. Enables lazy evaluation and chainable transformations on async data streams.
Key characteristics:
Lazy evaluation - operations are not executed until iteration begins
Chainable - all transformation methods return new AsyncIteratorObject instances
Supports both sync and async transformation functions
Memory efficient - processes values one at a time
@template T The type of values yielded by the iterator
@template TReturn The return type of the iterator
@template TNext The type of value that can be passed to next()
// Create from an async generator
async function* numbers() {
yield 1; yield 2; yield 3;
}
const iterator = new AsyncIteratorObject(numbers())
.map(x => x 2)
.filter(x => x > 2);
for await (const value of iterator) {
console.log(value); // 4, 6
}
AsyncIteratorObject.from(iterable: Iterable): AsyncIteratorObject
A wrapper class providing functional operations on async iterables. Enables lazy evaluation and chainable transformations on async data streams.
Key characteristics:
Lazy evaluation - operations are not executed until iteration begins
Chainable - all transformation methods return new AsyncIteratorObject instances
Supports both sync and async transformation functions
Memory efficient - processes values one at a time
@template T The type of values yielded by the iterator
@template TReturn The return type of the iterator
@template TNext The type of value that can be passed to next()
// Create from an async generator
async function* numbers() {
yield 1; yield 2; yield 3;
}
const iterator = new AsyncIteratorObject(numbers())
.map(x => x 2)
.filter(x => x > 2);
for await (const value of iterator) {
console.log(value); // 4, 6
}
Creates an AsyncIteratorObject from a synchronous iterable. Converts the sync iterable to async for uniform handling.
- @param iterable A synchronous iterable to convert
- @returns A new AsyncIteratorObject wrapping the converted iterable
const syncArray = [1, 2, 3, 4, 5];
const asyncIterator = AsyncIteratorObject.from(syncArray);
for await (const value of asyncIterator) {
console.log(value); // 1, 2, 3, 4, 5
}
AsyncIteratorObject.merge(...iterables: AsyncIterable[]): AsyncIteratorObject
Merges multiple async iterables into a single stream. Values from all sources are interleaved as they become available. The merged iterator completes when all source iterators complete.
- @param iterables The async iterables to merge
- @returns A new AsyncIteratorObject yielding values from all sources
async function* source1() { yield 1; yield 3; }
async function* source2() { yield 2; yield 4; }
const merged = AsyncIteratorObject.merge(source1(), source2());
for await (const value of merged) {
console.log(value); // Order depends on timing: 1, 2, 3, 4 or similar
}
AsyncIteratorObject.pipe(generatorFactory, signal?: AbortSignal): AsyncIteratorObject
Low-level transformation method using generator functions. Allows custom async transformations by providing a generator factory. Used internally by other transformation methods.
- @param generatorFactory A function that returns a generator function for transforming values
- @param signal Optional AbortSignal to cancel the operation
- @returns A new AsyncIteratorObject with transformed values
AsyncIteratorObject.awaited(): AsyncIteratorObject
Resolves promise-like values from the source iterator. Useful for normalizing values before applying type-guard predicates.
- @returns A new AsyncIteratorObject yielding awaited values
AsyncIteratorObject.map(callbackfn): AsyncIteratorObject
Transforms each value using a mapping function. The callback can be synchronous or return a promise.
- @param callbackfn Function to transform each value
- @returns A new AsyncIteratorObject yielding transformed values
const numbers = AsyncIteratorObject.from([1, 2, 3]);
const doubled = numbers.map(x => x 2);
for await (const value of doubled) {
console.log(value); // 2, 4, 6
}
AsyncIteratorObject.filter(predicate): AsyncIteratorObject
AsyncIteratorObject.filter(predicate): AsyncIteratorObject
AsyncIteratorObject.filter(predicate): AsyncIteratorObject
Filters values based on a predicate function. Only values for which the predicate returns truthy are yielded. Supports type guard predicates for type narrowing.
- @param predicate Function to test each value
- @returns A new AsyncIteratorObject yielding only values that pass the test
const numbers = AsyncIteratorObject.from([1, 2, 3, 4, 5]);
const evens = numbers.filter(x => x % 2 === 0);
for await (const value of evens) {
console.log(value); // 2, 4
}
AsyncIteratorObject.take(limit: number): AsyncIteratorObject
Creates an iterator whose values are the values from this iterator, stopping once the provided limit is reached.
- @param limit The maximum number of values to yield.
AsyncIteratorObject.drop(count: number): AsyncIteratorObject
Creates an iterator whose values are the values from this iterator after skipping the provided count.
- @param count The number of values to drop.
AsyncIteratorObject.flatMap(callback): AsyncIteratorObject
Creates an iterator whose values are the result of applying the callback to the values from this iterator and then flattening the resulting iterators or iterables.
- @param callback A function that accepts up to two arguments to be used to transform values from the underlying iterator into new iterators or iterables to be flattened into the result.
AsyncIteratorObject.reduce(callbackfn): AsyncIteratorObject
AsyncIteratorObject.reduce(callbackfn, initialValue: R): AsyncIteratorObject
AsyncIteratorObject.reduce(callbackfn, ...args: unknown[]): AsyncIteratorObject
Creates an iterator of accumulated values by applying a reducer function. Unlike Array.reduce, this returns an iterator that yields each intermediate accumulated value, not just the final result. This allows observing the accumulation process.
- @param callbackfn Reducer function to accumulate values
- @param initialValue Optional initial value for the accumulation
- @returns A new AsyncIteratorObject yielding accumulated values at each step
const numbers = AsyncIteratorObject.from([1, 2, 3, 4]);
const sums = numbers.reduce((sum, x) => sum + x, 0);
for await (const value of sums) {
console.log(value); // 1, 3, 6, 10 (running totals)
}
AsyncIteratorObject.expand(callbackfn): AsyncIteratorObject
Transforms each value into multiple values using an expander function. Each input value is expanded into zero or more output values. Similar to flatMap but for expanding to multiple values rather than flattening iterables.
- @param callbackfn Function that returns an iterable of values for each input
- @returns A new AsyncIteratorObject yielding all expanded values
const numbers = AsyncIteratorObject.from([1, 2, 3]);
const expanded = numbers.expand(x => [x, x 10]);
for await (const value of expanded) {
console.log(value); // 1, 10, 2, 20, 3, 30
}
AsyncIteratorObject[Symbol.asyncIterator]()
Sequence
A sequence is a FIFO (First-In-First-Out) queue for async consumption. Designed for single consumer with multiple producers pattern. Values are queued and consumed in order, with backpressure support. Respects an optional AbortSignal: enqueue returns false when aborted; waits reject.
Key characteristics:
Single consumer - values are consumed once, in order
Multiple producers can push values concurrently
FIFO ordering - first value in is first value out
Backpressure control via reserve() method
Async iteration support for continuous consumption
@template T The type of values in the sequence.
// Create a sequence for processing tasks
const tasks = new Sequence<string>();
// Producer: Add tasks to the queue
tasks('task1');
tasks('task2');
tasks('task3');
// Consumer: Process tasks in order
const task1 = await tasks.receive(); // 'task1'
const task2 = await tasks.receive(); // 'task2'
const task3 = await tasks.receive(); // 'task3'
Sequence.merge(target: Sequence, ...sequences: Sequence[]): void
Merges multiple source sequences into a target sequence. Values from all sources are forwarded to the target sequence. Each source is consumed independently and concurrently.
- @param target The sequence that will receive values from all sources
- @param sequences The source sequences to merge from
// Create target and source sequences
const target = new Sequence<number>();
const source1 = new Sequence<number>();
const source2 = new Sequence<number>();
// Merge sources into target
Sequence.merge(target, source1, source2);
// Values from both sources appear in target
source1(1);
source2(2);
source1(3);
// Consumer gets values as they arrive
await target.receive(); // Could be 1, 2, or 3 depending on timing
Sequence.size: number
Returns the number of values currently queued.
- @returns The current queue size
Sequence.reserve(capacity: number): Promise
Waits until the queue size drops to or below the specified capacity. Useful for implementing backpressure - producers can wait before adding more items.
- @param capacity The maximum queue size to wait for
- @returns A promise that resolves when the queue size is at or below capacity
// Producer with backpressure control
const sequence = new Sequence<string>();
// Wait if queue has more than 10 items
await sequence.reserve(10);
sequence('new item'); // Safe to add, queue has space
Sequence.emit(value: T): boolean
Sequence.receive(): Promise
Consumes and returns the next value from the queue. If the queue is empty, waits for a value to be added. Values are consumed in FIFO order.
- @returns A promise that resolves with the next value
const sequence = new Sequence<number>();
// Consumer waits for values
const valuePromise = sequence.receive();
// Producer adds value
sequence(42);
// Consumer receives it
const value = await valuePromise; // 42
Sequence.dispose(): void
Disposes of the sequence, signaling any waiting consumers. Called automatically when used with using declaration.
Signal
A signal is a broadcast async primitive for coordinating between producers and consumers. When a value is sent, ALL waiting consumers receive the same value (broadcast pattern). Signals can be reused - each call to next() creates a new promise for the next value.
Key characteristics:
Multiple consumers can wait simultaneously
All waiting consumers receive the same value when sent
Reusable - can send multiple values over time
Supports async iteration for continuous value streaming
@template T The type of value that this signal carries.
// Create a signal for string values
const signal = new Signal<string>();
// Multiple consumers wait for the same value
const promise1 = signal.receive();
const promise2 = signal.receive();
// Send a value - both consumers receive it
signal('Hello World');
const [value1, value2] = await Promise.all([promise1, promise2]);
console.log(value1 === value2); // true - both got 'Hello World'
Signal.merge(target: Signal, ...signals: Signal[]): void
Merges multiple source signals into a target signal. Values from any source signal are forwarded to the target signal. The merge continues until the target signal is aborted.
Note: When the target is aborted, iteration stops after the next value from each source. For immediate cleanup, abort source signals directly.
- @param target The signal that will receive values from all sources
- @param signals The source signals to merge from
// Create a target signal and source signals
const target = new Signal<string>();
const source1 = new Signal<string>();
const source2 = new Signal<string>();
// Merge sources into target
Signal.merge(target, source1, source2);
// Values from any source appear in target
source1('Hello');
const value = await target; // 'Hello'
Signal.emit(value: T): boolean
Signal.receive(): Promise
Waits for the next value to be sent to this signal. If the signal has been aborted, this method will reject with the abort reason.
- @returns A promise that resolves with the next value sent to the signal.
const signal = new Signal<string>();
// Wait for a value
const valuePromise = signal.receive();
// Send a value from elsewhere
signal('Hello');
const value = await valuePromise; // 'Hello'
Signal.dispose(): void
abortableIterable(iterable: AsyncIterable, signal: AbortSignal): AsyncIterable
Wraps an async iterable with abort signal support. Each iteration creates a fresh iterator with scoped abort handling. Listener is added at iteration start and removed on completion/abort/return/throw.
@template T - The yielded value type
@template TReturn - The return value type
@template TNext - The type passed to next()
@param iterable - The source async iterable to wrap
@param signal - AbortSignal to cancel iteration
@returns An async iterable with abort support
@example
const controller = new AbortController();
const source = async function*() { yield 1; yield 2; yield 3; };
for await (const value of abortableIterable(source(), controller.signal)) {
console.log(value);
if (value === 2) controller.abort();
}
iterate(startOrCount?: number, countWhenTwoArgs?: number, step: number = 1): Iterable
Creates an iterable sequence of numbers with flexible parameters. Can generate infinite sequences, finite sequences, or sequences with custom start and step values.
@param args Variable arguments to configure the sequence:
- No args: Infinite sequence starting at 0 with step 1
- 1 arg (count): Sequence from 0 to count-1
- 2 args (start, count): Sequence starting at 'start' for 'count' iterations
- 3 args (start, count, step): Custom start, count, and step value
@returns An iterable that generates numbers according to the parameters
@example
// Infinite sequence: 0, 1, 2, 3, ...
for (const n of iterate()) { }
// Count only: 0, 1, 2, 3, 4
for (const n of iterate(5)) { }
// Start and count: 10, 11, 12, 13, 14
for (const n of iterate(10, 5)) { }
// Start, count, and step: 0, 2, 4, 6, 8
for (const n of iterate(0, 5, 2)) { }
toAsyncIterable(iterable: Iterable): AsyncIterable
Converts a synchronous iterable to an asynchronous iterable. Wraps the sync iterator methods to return promises, enabling uniform async handling.
@template T The type of values yielded by the iterator
@template TReturn The return type of the iterator
@template TNext The type of value that can be passed to next()
@param iterable A synchronous iterable to convert
@returns An async iterable that yields the same values as the input
@example
const syncArray = [1, 2, 3, 4, 5];
const asyncIterable = toAsyncIterable(syncArray);
for await (const value of asyncIterable) {
console.log(value); // 1, 2, 3, 4, 5
}
License
License The MIT License Copyright (c) 2025 Ivan Zakharchanka





