Evnty
Async-first, reactive event handling library for complex event flows with three powerful primitives: Event (multi-listener broadcast), Signal (promise-like coordination), and Sequence (async queue). Built for both browser and Node.js with full TypeScript support.
Table of Contents
- Core Concepts
- Motivation
- Features
- Platform Support
- Installing
- API
ConsumerHandleBroadcastBroadcast.sink: FnBroadcast.handleEvent(event: T): voidBroadcast.size: numberBroadcast.emit(value: T): booleanBroadcast.receive(): PromiseBroadcast.then(onfulfilled?: Fn | null, onrejected?: Fn | null): PromiseBroadcast.catch(onrejected?: Fn | null): PromiseBroadcast.finally(onfinally?: Action | null): PromiseBroadcast.join(): ConsumerHandleBroadcast.getCursor(handle: ConsumerHandle): numberBroadcast.leave(handle: ConsumerHandle): voidBroadcast.consume(handle: ConsumerHandle): TBroadcast.tryConsume(handle: ConsumerHandle): IteratorResultBroadcast.readable(handle: ConsumerHandle): booleanBroadcast[Symbol.asyncIterator](): AsyncIteratorBroadcast.dispose(): voidBroadcast[Symbol.dispose](): void
DispatchResultEventEvent.sink: FnEvent.handleEvent(event: T): voidEvent.size: numberEvent.emit(value: T): DispatchResultEvent.lacks(listener: Listener): booleanEvent.has(listener: Listener): booleanEvent.off(listener: Listener): thisEvent.on(listener: Listener): UnsubscribeEvent.once(listener: Listener): UnsubscribeEvent.clear(): thisEvent.receive(): PromiseEvent.then(onfulfilled?: Fn | null, onrejected?: Fn | null): PromiseEvent.catch(onrejected?: Fn | null): PromiseEvent.finally(onfinally?: Action | null): PromiseEvent.settle(): PromiseEvent[Symbol.asyncIterator](): AsyncIteratorEvent.dispose(): voidEvent[Symbol.dispose](): void
merge(...events: Events): EventcreateInterval(interval: number): EventcreateEvent(): EventAsyncIteratorObjectAsyncIteratorObject.from(iterable: Iterable): AsyncIteratorObjectAsyncIteratorObject.merge(...iterables: AsyncIterable[]): AsyncIteratorObjectAsyncIteratorObject.pipe(generatorFactory, signal?: AbortSignal): AsyncIteratorObjectAsyncIteratorObject.awaited(): AsyncIteratorObjectAsyncIteratorObject.map(callbackfn): AsyncIteratorObjectAsyncIteratorObject.filter(predicate): AsyncIteratorObjectAsyncIteratorObject.filter(predicate): AsyncIteratorObjectAsyncIteratorObject.filter(predicate): AsyncIteratorObjectAsyncIteratorObject.filterMap(callbackfn): AsyncIteratorObjectAsyncIteratorObject.inspect(callbackfn): AsyncIteratorObjectAsyncIteratorObject.enumerate(start: number = 0): AsyncIteratorObjectAsyncIteratorObject.take(limit: number): AsyncIteratorObjectAsyncIteratorObject.takeWhile(predicate): AsyncIteratorObjectAsyncIteratorObject.takeWhile(predicate): AsyncIteratorObjectAsyncIteratorObject.takeWhile(predicate): AsyncIteratorObjectAsyncIteratorObject.drop(count: number): AsyncIteratorObjectAsyncIteratorObject.dropWhile(predicate): AsyncIteratorObjectAsyncIteratorObject.flatMap(callback): AsyncIteratorObjectAsyncIteratorObject.reduce(callbackfn): AsyncIteratorObjectAsyncIteratorObject.reduce(callbackfn, initialValue: R): AsyncIteratorObjectAsyncIteratorObject.reduce(callbackfn, ...args: unknown[]): AsyncIteratorObjectAsyncIteratorObject.expand(callbackfn): AsyncIteratorObjectAsyncIteratorObject[Symbol.asyncIterator](): AsyncIterator
SequenceSignalabortableIterable(iterable: AsyncIterable, signal: AbortSignal): AsyncIterableiterate(startOrCount?: number, countWhenTwoArgs?: number, step: number = 1): IterabletoAsyncIterable(iterable: Iterable): AsyncIterable
- Examples
- License
Core Concepts
Evnty provides three complementary async primitives, each designed for specific patterns:
🔊 Event - Multi-Listener Broadcasting
Events allow multiple listeners to react to values. All registered listeners are called for each emission.
const clickEvent = createEvent<{ x: number, y: number }>();
// Multiple listeners can subscribe
clickEvent.on(({ x, y }) => console.log(`Click at ${x},${y}`));
clickEvent.on(({ x, y }) => updateUI(x, y));
// All listeners receive the value
clickEvent.emit({ x: 100, y: 200 });
Use Event when:
- Multiple components need to react to the same occurrence
- You need pub/sub or observer pattern
- Listeners should persist across multiple emissions
📡 Signal - Promise-Based Coordination
Signals are for coordinating async operations. When a value is sent, ALL waiting consumers receive it (broadcast).
const signal = new Signal<string>();
// Multiple consumers can wait
const promise1 = signal.receive();
const promise2 = signal.receive();
// Send value - all waiting consumers receive it
signal.emit('data');
const [result1, result2] = await Promise.all([promise1, promise2]);
// result1 === 'data' && result2 === 'data'
Use Signal when:
- You need one-time notifications
- Multiple async operations need the same trigger
- Implementing async coordination patterns
📦 Sequence - Async Queue (FIFO)
Sequences are FIFO queues for single-consumer scenarios. Values are consumed in order, with backpressure support.
const taskQueue = new Sequence<Task>();
// Producer adds tasks
taskQueue.emit(task1);
taskQueue.emit(task2);
taskQueue.emit(task3);
// Single consumer processes in order
for await (const task of taskQueue) {
await processTask(task); // task1, then task2, then task3
}
Use Sequence when:
- You need ordered processing (FIFO)
- Only one consumer should handle each value
- You want backpressure control with
reserve()
Key Differences
| Event | Signal | Sequence | |
|---|---|---|---|
| Consumers | Multiple persistent listeners | Multiple one-time receivers | Single consumer |
| Delivery | All listeners called | All waiting get same value | Each value consumed once |
| Pattern | Pub/Sub | Broadcast coordination | Queue/Stream |
| Persistence | Listeners stay registered | Resolves once per receive() | Values queued until consumed |
Motivation
Traditional event handling in JavaScript/TypeScript has limitations:
- String-based event names lack type safety
- No built-in async coordination primitives
- Missing functional transformations for event streams
- Complex patterns require extensive boilerplate
Evnty solves these problems by providing:
- Type-safe events with full TypeScript inference
- Three specialized primitives for different async patterns
- Async iterator transformations via
AsyncIteratorObject(map, filter, reduce, expand, etc.) - Composable abstractions that work together seamlessly
Features
- Async-First Design: Built from the ground up for asynchronous event handling with full Promise support
- Iterator Transformations:
AsyncIteratorObjectprovides map, filter, reduce, take, drop, flatMap, and expand operators - Type-Safe: Full TypeScript support with strong typing and inference throughout the event pipeline
- Async Iteration: Events, Signals, and Sequences can be consumed as async iterables using for-await-of loops
- Event Composition: Merge multiple event streams into unified events
- Zero Dependencies: Lightweight with no external dependencies for optimal bundle size
- Universal: Works seamlessly in both browser and Node.js environments, including service workers
Platform Support
![]() | ![]() | ![]() | ![]() | ![]() | ![]() |
|---|---|---|---|---|---|
| Latest ✔ | Latest ✔ | Latest ✔ | Latest ✔ | Latest ✔ | Latest ✔ |
Installing
Using pnpm:
pnpm add evnty
Using yarn:
yarn add evnty
Using npm:
npm install evnty
Examples
Event - Multi-Listener Pattern
import { createEvent } from 'evnty';
// Create a typed event
const userEvent = createEvent<{ id: number, name: string }>();
// Multiple listeners
userEvent.on(user => console.log('Logger:', user));
userEvent.on(user => updateUI(user));
userEvent.on(user => saveToCache(user));
// Emit - all listeners are called
userEvent.emit({ id: 1, name: 'Alice' });
// One-time listener
userEvent.once(user => console.log('First user only:', user));
// Async iteration
for await (const user of userEvent) {
console.log('User event:', user);
}
Signal - Async Coordination
import { Signal } from 'evnty';
// Coordinate multiple async operations
const dataSignal = new Signal<Buffer>();
// Multiple operations wait for the same data
async function processA() {
const data = await dataSignal.receive();
// Process data in way A
}
async function processB() {
const data = await dataSignal.receive();
// Process data in way B
}
// Start both processors
Promise.all([processA(), processB()]);
// Both receive the same data when it arrives
dataSignal.emit(Buffer.from('shared data'));
Sequence - Task Queue
import { Sequence } from 'evnty';
// Create a task queue
const taskQueue = new Sequence<() => Promise<void>>();
// Single consumer processes tasks in order
(async () => {
for await (const task of taskQueue) {
await task();
console.log('Task completed');
}
})();
// Multiple producers add tasks
taskQueue.emit(async () => fetchData());
taskQueue.emit(async () => processData());
taskQueue.emit(async () => saveResults());
// Backpressure control
await taskQueue.reserve(10); // Wait until queue has ≤10 items
taskQueue.emit(async () => nonUrgentTask());
Combining Primitives
// Event + Signal for request/response pattern
const requestEvent = createEvent<Request>();
const responseSignal = new Signal<Response>();
requestEvent.on(async (req) => {
const response = await handleRequest(req);
responseSignal.emit(response);
});
// Event + Sequence for buffered processing
const dataEvent = createEvent<Data>();
const processQueue = new Sequence<Data>();
dataEvent.on(data => processQueue.emit(data));
// Process with controlled concurrency
for await (const data of processQueue) {
await processWithRateLimit(data);
}
API
ConsumerHandle
A handle representing a consumer's position in a Broadcast. Returned by Broadcast.join() and used to consume values. Implements Disposable for automatic cleanup via using keyword.
- @example
const broadcast = new Broadcast<number>();
using handle = broadcast.join();
broadcast.emit(42);
const value = broadcast.consume(handle); // 42
ConsumerHandle.cursor: number
The current position of this consumer in the buffer.
ConsumerHandle[Symbol.dispose](): void
Leaves the broadcast, releasing this consumer's position.
Broadcast
A multi-consumer FIFO queue where each consumer maintains its own read position. Values are buffered and each consumer can read them independently at their own pace. The buffer automatically compacts when all consumers have read past a position.
Key characteristics:
- Multiple consumers - each gets their own cursor position
- Buffered delivery - values are stored until all consumers read them
- Late joiners only see values emitted after joining
- Automatic cleanup via FinalizationRegistry when handles are garbage collected
Differs from:
Event: Broadcast buffers values, Event does not
Sequence: Broadcast supports multiple consumers, Sequence is single-consumer
Signal: Broadcast buffers values, Signal only notifies current waiters
@template T - The type of values in the broadcast
@example
const broadcast = new Broadcast<number>();
const handle1 = broadcast.join();
const handle2 = broadcast.join();
broadcast.emit(1);
broadcast.emit(2);
broadcast.consume(handle1); // 1
broadcast.consume(handle2); // 1
broadcast.consume(handle1); // 2
Broadcast.sink: Fn
Returns a bound emit function for use as a callback.
Broadcast.handleEvent(event: T): void
DOM EventListener interface compatibility.
Broadcast.size: number
The number of active consumers.
Broadcast.emit(value: T): boolean
Emits a value to all consumers. The value is buffered for consumption.
- @param value - The value to emit.
- @returns
trueif the value was emitted.
Broadcast.receive(): Promise
Waits for the next emitted value without joining as a consumer. Does not buffer - only receives values emitted after calling.
- @returns A promise that resolves with the next emitted value.
Broadcast.then(onfulfilled?: Fn | null, onrejected?: Fn | null): Promise
Broadcast.catch(onrejected?: Fn | null): Promise
Broadcast.finally(onfinally?: Action | null): Promise
Broadcast.join(): ConsumerHandle
Joins the broadcast as a consumer. Returns a handle used to consume values. The consumer starts at the current buffer position and will only see values emitted after joining.
- @example
const handle = broadcast.join();
// Use handle with consume(), readable(), leave()
Broadcast.getCursor(handle: ConsumerHandle): number
Gets the current cursor position for a consumer handle.
- @param handle - The consumer handle.
- @returns The cursor position.
- @throws If the handle is invalid (already left or never joined).
Broadcast.leave(handle: ConsumerHandle): void
Removes a consumer from the broadcast. The handle becomes invalid after this call. Idempotent - calling multiple times has no effect.
- @param handle - The consumer handle to remove.
Broadcast.consume(handle: ConsumerHandle): T
Consumes and returns the next value for a consumer. Advances the consumer's cursor position.
@param handle - The consumer handle.
@throws If no value is available or the handle is invalid.
@example
if (broadcast.readable(handle)) {
const value = broadcast.consume(handle);
}
Broadcast.tryConsume(handle: ConsumerHandle): IteratorResult
Attempts to consume the next value for a consumer. Returns { done: true } when no value is currently available.
- @param handle - The consumer handle.
- @returns The next value, or
{ done: true }when nothing is available. - @throws If the handle is invalid.
Broadcast.readable(handle: ConsumerHandle): boolean
Checks if there are values available for a consumer to read.
- @param handle - The consumer handle.
- @returns
trueif there are unread values,falseotherwise.
Broadcast[Symbol.asyncIterator](): AsyncIterator
Broadcast.dispose(): void
Broadcast[Symbol.dispose](): void
DispatchResult
Wraps an array of values or promises (typically listener results) and provides batch resolution.
- @template T
DispatchResult.then(onfulfilled?: Fn | null, onrejected?: Fn | null): PromiseLike
DispatchResult.all(): T[] | Promise
Resolves all listener results, rejecting if any promise rejects or any ResultError exists.
DispatchResult.settled(): PromiseSettledResult[] | Promise
Waits for all listener results to settle, regardless of fulfillment or rejection.
Event
Multi-listener event emitter with async support. All registered listeners are called for each emission, and their return values are collected in a DispatchResult. Supports async iteration and an onDispose callback for cleanup.
Differs from:
Signal: Event has persistent listeners; Signal is promise-based (receive per round)
Sequence: Event broadcasts to all listeners; Sequence is a single-consumer queue
@template T - The type of value emitted to listeners (event payload)
@template R - The return type of listener functions
Event.sink: Fn
Returns a bound emit function for use as a callback. Useful for passing to other APIs that expect a function.
- @example
const event = new Event<string>();
someApi.onMessage(event.sink);
Event.handleEvent(event: T): void
DOM EventListener interface compatibility. Allows the event to be used directly with addEventListener.
Event.size: number
The number of listeners for the event.
Event.emit(value: T): DispatchResult
Emits a value to all registered listeners. Each listener is called with the value and their return values are collected.
@param value - The value to emit to all listeners.
@returns A DispatchResult containing all listener return values.
@example
const event = new Event<string, number>();
event.on(str => str.length);
const result = event.emit('hello');
await result.all(); // [5]
Event.lacks(listener: Listener): boolean
Checks if the given listener is NOT registered for this event.
@param listener - The listener function to check against the registered listeners.
@returns
trueif the listener is not already registered; otherwise,false.@example
// Check if a listener is not already added
if (event.lacks(myListener)) {
event.on(myListener);
}
Event.has(listener: Listener): boolean
Checks if the given listener is registered for this event.
@param listener - The listener function to check.
@returns
trueif the listener is currently registered; otherwise,false.@example
// Verify if a listener is registered
if (event.has(myListener)) {
console.log('Listener is already registered');
}
Event.off(listener: Listener): this
Removes a specific listener from this event.
@param listener - The listener to remove.
@returns The event instance for chaining.
@example
// Remove a listener
event.off(myListener);
Event.on(listener: Listener): Unsubscribe
Registers a listener that is called on every emission.
@param listener - The function to call when the event occurs.
@returns A function that removes this listener when called.
@example
const unsubscribe = event.on((data) => {
console.log('Event data:', data);
});
Event.once(listener: Listener): Unsubscribe
Registers a listener that is called only once on the next emission, then auto-removed.
@param listener - The listener to trigger once.
@returns A function that removes this listener when called (if it hasn't fired yet).
@example
const cancel = event.once((data) => {
console.log('Received data once:', data);
});
Event.clear(): this
Removes all listeners from the event. Does not dispose the event - new listeners can still be added after clearing.
- @returns The event instance for chaining.
Event.receive(): Promise
Waits for the next emission and returns the emitted value.
- @returns A promise that resolves with the next emitted value.
Event.then(onfulfilled?: Fn | null, onrejected?: Fn | null): Promise
Event.catch(onrejected?: Fn | null): Promise
Event.finally(onfinally?: Action | null): Promise
Event.settle(): Promise
Waits for the next emission via receive() and wraps the outcome in a PromiseSettledResult - always resolves, never rejects.
@returns A promise that resolves with the settled result.
@example
const result = await event.settle();
if (result.status === 'fulfilled') {
console.log('Value:', result.value);
} else {
console.error('Reason:', result.reason);
}
Event[Symbol.asyncIterator](): AsyncIterator
Event.dispose(): void
Event[Symbol.dispose](): void
merge(...events: Events): Event
Merges multiple events into a single event that triggers whenever any source triggers. Disposing the merged event unsubscribes from all sources.
@param events - The events to merge.
@returns A new Event that forwards emissions from all sources.
@example
const mouseEvent = createEvent<MouseEvent>();
const keyboardEvent = createEvent<KeyboardEvent>();
const inputEvent = merge(mouseEvent, keyboardEvent);
inputEvent.on(event => console.log('Input event:', event));
createInterval(interval: number): Event
Creates a periodic event that emits an incrementing counter (starting from 0) at a fixed interval. Disposing the event clears the interval.
@param interval - The interval in milliseconds.
@returns An Event that triggers at the specified interval.
@example
const tickEvent = createInterval(1000);
tickEvent.on(tickNumber => console.log('Tick:', tickNumber));
createEvent(): Event
Creates a new Event instance for multi-listener event handling.
- @example
const messageEvent = createEvent<string>();
messageEvent.on(msg => console.log('Received:', msg));
messageEvent.emit('Hello'); // All listeners receive 'Hello'
// Listeners can return values, collected via DispatchResult
const validateEvent = createEvent<string, boolean>();
validateEvent.on(str => str.length > 0);
validateEvent.on(str => str.length < 100);
const results = await validateEvent.emit('test').all(); // [true, true]
AsyncIteratorObject
A wrapper class providing functional operations on async iterables. Enables lazy evaluation and chainable transformations on async data streams.
Key characteristics:
Lazy evaluation - operations are not executed until iteration begins
Chainable - all transformation methods return new AsyncIteratorObject instances
Supports both sync and async transformation functions
Memory efficient - processes values one at a time
Operation fusion - chains execute in optimized passes
@template T The type of values yielded by the iterator
@template TReturn The return type of the iterator
@template TNext The type of value that can be passed to next()
@example
// Create from an async generator
async function* numbers() {
yield 1; yield 2; yield 3;
}
const iterator = new AsyncIteratorObject(numbers())
.map(x => x 2)
.filter(x => x > 2);
for await (const value of iterator) {
console.log(value); // 4, 6
}
AsyncIteratorObject.from(iterable: Iterable): AsyncIteratorObject
A wrapper class providing functional operations on async iterables. Enables lazy evaluation and chainable transformations on async data streams.
Key characteristics:
Lazy evaluation - operations are not executed until iteration begins
Chainable - all transformation methods return new AsyncIteratorObject instances
Supports both sync and async transformation functions
Memory efficient - processes values one at a time
Operation fusion - chains execute in optimized passes
@template T The type of values yielded by the iterator
@template TReturn The return type of the iterator
@template TNext The type of value that can be passed to next()
@example
// Create from an async generator
async function* numbers() {
yield 1; yield 2; yield 3;
}
const iterator = new AsyncIteratorObject(numbers())
.map(x => x 2)
.filter(x => x > 2);
for await (const value of iterator) {
console.log(value); // 4, 6
}
Creates an AsyncIteratorObject from a synchronous iterable. Converts the sync iterable to async for uniform handling.
@param iterable A synchronous iterable to convert
@returns A new AsyncIteratorObject wrapping the converted iterable
@example
const syncArray = [1, 2, 3, 4, 5];
const asyncIterator = AsyncIteratorObject.from(syncArray);
for await (const value of asyncIterator) {
console.log(value); // 1, 2, 3, 4, 5
}
AsyncIteratorObject.merge(...iterables: AsyncIterable[]): AsyncIteratorObject
Merges multiple async iterables into a single stream. Values from all sources are interleaved as they become available. The merged iterator completes when all source iterators complete.
@param iterables The async iterables to merge
@returns A new AsyncIteratorObject yielding values from all sources
@example
async function* source1() { yield 1; yield 3; }
async function* source2() { yield 2; yield 4; }
const merged = AsyncIteratorObject.merge(source1(), source2());
for await (const value of merged) {
console.log(value); // Order depends on timing: 1, 2, 3, 4 or similar
}
AsyncIteratorObject.pipe(generatorFactory, signal?: AbortSignal): AsyncIteratorObject
Escape hatch for custom transformations not covered by the built-in operators. Materializes the fused operation chain, then applies a generator function to each value.
- @param generatorFactory A function that returns a generator function for transforming values
- @param signal Optional AbortSignal to cancel the operation
- @returns A new AsyncIteratorObject with transformed values
AsyncIteratorObject.awaited(): AsyncIteratorObject
Resolves promise-like values from the source iterator. Useful for normalizing values before applying type-guard predicates.
- @returns A new AsyncIteratorObject yielding awaited values
AsyncIteratorObject.map(callbackfn): AsyncIteratorObject
Transforms each value using a mapping function. The callback can be synchronous or return a promise.
@param callbackfn Function to transform each value
@returns A new AsyncIteratorObject yielding transformed values
@example
const numbers = AsyncIteratorObject.from([1, 2, 3]);
const doubled = numbers.map(x => x 2);
for await (const value of doubled) {
console.log(value); // 2, 4, 6
}
AsyncIteratorObject.filter(predicate): AsyncIteratorObject
AsyncIteratorObject.filter(predicate): AsyncIteratorObject
AsyncIteratorObject.filter(predicate): AsyncIteratorObject
Filters values based on a predicate function. Only values for which the predicate returns truthy are yielded. Supports type guard predicates for type narrowing.
@param predicate Function to test each value
@returns A new AsyncIteratorObject yielding only values that pass the test
@example
const numbers = AsyncIteratorObject.from([1, 2, 3, 4, 5]);
const evens = numbers.filter(x => x % 2 === 0);
for await (const value of evens) {
console.log(value); // 2, 4
}
AsyncIteratorObject.filterMap(callbackfn): AsyncIteratorObject
Combined filter and map operation. Returns undefined to skip a value. The callback result is awaited to check for undefined.
@param callbackfn Function that returns a transformed value or undefined to skip
@returns A new AsyncIteratorObject yielding non-undefined transformed values
@example
const numbers = AsyncIteratorObject.from([1, 2, 3, 4, 5]);
const doubledEvens = numbers.filterMap(x => x % 2 === 0 ? x 2 : undefined);
for await (const value of doubledEvens) {
console.log(value); // 4, 8
}
AsyncIteratorObject.inspect(callbackfn): AsyncIteratorObject
Executes a side-effect function for each value without modifying the stream. Useful for debugging or logging. The callback is awaited for proper sequencing.
@param callbackfn Function to execute for each value
@returns A new AsyncIteratorObject yielding the same values
@example
const numbers = AsyncIteratorObject.from([1, 2, 3]);
const logged = numbers.inspect(x => console.log('value:', x)).map(x => x 2);
AsyncIteratorObject.enumerate(start: number = 0): AsyncIteratorObject
Wraps each value with its index as a tuple. Useful after filtering when original indices are lost.
@param start Starting index (default: 0)
@returns A new AsyncIteratorObject yielding [index, value] tuples
@example
const letters = AsyncIteratorObject.from(['a', 'b', 'c']);
const enumerated = letters.enumerate();
for await (const [i, v] of enumerated) {
console.log(i, v); // 0 'a', 1 'b', 2 'c'
}
AsyncIteratorObject.take(limit: number): AsyncIteratorObject
Creates an iterator whose values are the values from this iterator, stopping once the provided limit is reached.
- @param limit The maximum number of values to yield.
AsyncIteratorObject.takeWhile(predicate): AsyncIteratorObject
AsyncIteratorObject.takeWhile(predicate): AsyncIteratorObject
AsyncIteratorObject.takeWhile(predicate): AsyncIteratorObject
Takes values while the predicate returns truthy. Stops immediately when predicate returns falsy. Supports type guard predicates for type narrowing.
@param predicate Function to test each value
@returns A new AsyncIteratorObject yielding values until predicate fails
@example
const numbers = AsyncIteratorObject.from([1, 2, 3, 4, 5]);
const small = numbers.takeWhile(x => x < 4);
for await (const value of small) {
console.log(value); // 1, 2, 3
}
AsyncIteratorObject.drop(count: number): AsyncIteratorObject
Creates an iterator whose values are the values from this iterator after skipping the provided count.
- @param count The number of values to drop.
AsyncIteratorObject.dropWhile(predicate): AsyncIteratorObject
Skips values while the predicate returns truthy. Yields all remaining values once predicate returns falsy.
@param predicate Function to test each value
@returns A new AsyncIteratorObject skipping values until predicate fails
@example
const numbers = AsyncIteratorObject.from([1, 2, 3, 4, 5]);
const afterSmall = numbers.dropWhile(x => x < 3);
for await (const value of afterSmall) {
console.log(value); // 3, 4, 5
}
AsyncIteratorObject.flatMap(callback): AsyncIteratorObject
Creates an iterator whose values are the result of applying the callback to the values from this iterator and then flattening the resulting iterators or iterables.
- @param callback A function that accepts up to two arguments to be used to transform values from the underlying iterator into new iterators or iterables to be flattened into the result.
AsyncIteratorObject.reduce(callbackfn): AsyncIteratorObject
AsyncIteratorObject.reduce(callbackfn, initialValue: R): AsyncIteratorObject
AsyncIteratorObject.reduce(callbackfn, ...args: unknown[]): AsyncIteratorObject
Creates an iterator of accumulated values by applying a reducer function. Unlike Array.reduce, this returns an iterator that yields each intermediate accumulated value, not just the final result. This allows observing the accumulation process.
@param callbackfn Reducer function to accumulate values
@param initialValue Optional initial value for the accumulation
@returns A new AsyncIteratorObject yielding accumulated values at each step
@example
const numbers = AsyncIteratorObject.from([1, 2, 3, 4]);
const sums = numbers.reduce((sum, x) => sum + x, 0);
for await (const value of sums) {
console.log(value); // 1, 3, 6, 10 (running totals)
}
AsyncIteratorObject.expand(callbackfn): AsyncIteratorObject
Transforms each value into multiple values using an expander function. Each input value is expanded into zero or more output values. Like flatMap but takes sync Iterables (or Promises of Iterables) instead of AsyncIterables.
@param callbackfn Function that returns an iterable of values for each input
@returns A new AsyncIteratorObject yielding all expanded values
@example
const numbers = AsyncIteratorObject.from([1, 2, 3]);
const expanded = numbers.expand(x => [x, x 10]);
for await (const value of expanded) {
console.log(value); // 1, 10, 2, 20, 3, 30
}
AsyncIteratorObject[Symbol.asyncIterator](): AsyncIterator
Sequence
A sequence is a FIFO (First-In-First-Out) queue for async consumption. Designed for single consumer with multiple producers pattern. Values are queued and consumed in order, with backpressure support. Respects an optional AbortSignal: emit() returns false when aborted; waits reject.
Key characteristics:
Single consumer - values are consumed once, in order
Multiple producers can push values concurrently
FIFO ordering - first value in is first value out
Backpressure control via reserve() method
Async iteration support for continuous consumption
@template T The type of values in the sequence.
@example
// Create a sequence for processing tasks
const tasks = new Sequence<string>();
// Producer: Add tasks to the queue
tasks.emit('task1');
tasks.emit('task2');
tasks.emit('task3');
// Consumer: Process tasks in order
const task1 = await tasks.receive(); // 'task1'
const task2 = await tasks.receive(); // 'task2'
const task3 = await tasks.receive(); // 'task3'
Sequence.merge(target: Sequence, ...sequences: Sequence[]): void
Merges multiple source sequences into a target sequence. Values from all sources are forwarded to the target sequence. Each source is consumed independently and concurrently.
@param target The sequence that will receive values from all sources
@param sequences The source sequences to merge from
@example
// Create target and source sequences
const target = new Sequence<number>();
const source1 = new Sequence<number>();
const source2 = new Sequence<number>();
// Merge sources into target
Sequence.merge(target, source1, source2);
// Values from both sources appear in target
source1.emit(1);
source2.emit(2);
source1.emit(3);
// Consumer gets values as they arrive
await target.receive(); // Could be 1, 2, or 3 depending on timing
Sequence.size: number
Returns the number of values currently queued.
- @returns The current queue size
Sequence.reserve(capacity: number): Promise
Waits until the queue size drops to or below the specified capacity. Useful for implementing backpressure - producers can wait before adding more items.
@param capacity The maximum queue size to wait for
@returns A promise that resolves when the queue size is at or below capacity
@example
// Producer with backpressure control
const sequence = new Sequence<string>();
// Wait if queue has more than 10 items
await sequence.reserve(10);
sequence.emit('new item'); // Safe to add, queue has space
Sequence.emit(value: T): boolean
Pushes a value onto the queue. Wakes any pending receive() waiter.
- @param value - The value to enqueue.
- @returns
trueif the sequence is still active.
Sequence.receive(): Promise
Consumes and returns the next value from the queue. If the queue is empty, waits for a value to be added. Values are consumed in FIFO order. If the sequence has been aborted or disposed, this method rejects with Error('Disposed').
@returns A promise that resolves with the next value
@example
const sequence = new Sequence<number>();
// Consumer waits for values
const valuePromise = sequence.receive();
// Producer adds value
sequence.emit(42);
// Consumer receives it
const value = await valuePromise; // 42
Sequence.dispose(): void
Disposes of the sequence, rejecting any pending receive() waiters. Called by [Symbol.dispose]() (inherited from Async) when using the using declaration.
Signal
Promise-based async coordination primitive. emit() resolves the pending receive() promise (shared across callers). Reusable - after each emission a new round of receive() calls can be made. Disposable via [Symbol.dispose]() or an optional AbortSignal.
@template T The type of value that this signal carries.
@example
const signal = new Signal<string>();
const promise = signal.receive();
signal.emit('hello');
await promise; // 'hello'
Signal.merge(target: Signal, ...signals: Signal[]): void
Merges multiple source signals into a target signal. Values from any source signal are emitted to the target signal. The merge continues until the target signal is disposed.
Note: When the target is disposed, iteration stops after the next value from each source. For immediate cleanup, dispose source signals directly.
@param target The signal that will receive values from all sources
@param signals The source signals to merge from
@example
// Create a target signal and source signals
const target = new Signal<string>();
const source1 = new Signal<string>();
const source2 = new Signal<string>();
// Merge sources into target
Signal.merge(target, source1, source2);
// Values from any source appear in target
const promise = target.receive();
source1.emit('Hello');
const value = await promise; // 'Hello'
Signal.emit(value: T): boolean
Sends a value to the waiting receiver, if any.
- @param value - The value to send.
- @returns
trueif the value was emitted.
Signal.receive(): Promise
Waits for the next value to be sent to this signal. If the signal has been aborted or disposed, this method rejects with Error('Disposed').
@returns A promise that resolves with the next value sent to the signal.
@example
const signal = new Signal<string>();
// Wait for a value
const valuePromise = signal.receive();
// Send a value from elsewhere
signal.emit('Hello');
const value = await valuePromise; // 'Hello'
Signal.dispose(): void
abortableIterable(iterable: AsyncIterable, signal: AbortSignal): AsyncIterable
Wraps an async iterable with abort signal support. Each iteration creates a fresh iterator with scoped abort handling. Listener is added at iteration start and removed on completion/abort/return.
@template T - The yielded value type
@template TReturn - The return value type
@template TNext - The type passed to next()
@param iterable - The source async iterable to wrap
@param signal - AbortSignal to cancel iteration
@returns An async iterable with abort support
@example
const controller = new AbortController();
const source = async function*() { yield 1; yield 2; yield 3; };
for await (const value of abortableIterable(source(), controller.signal)) {
console.log(value);
if (value === 2) controller.abort();
}
iterate(startOrCount?: number, countWhenTwoArgs?: number, step: number = 1): Iterable
Creates an iterable sequence of numbers with flexible parameters. Can generate infinite sequences, finite sequences, or sequences with custom start and step values.
@param args Variable arguments to configure the sequence:
- No args: Infinite sequence starting at 0 with step 1
- 1 arg (count): Sequence from 0 to count-1
- 2 args (start, count): Sequence starting at 'start' for 'count' iterations
- 3 args (start, count, step): Custom start, count, and step value
@returns An iterable that generates numbers according to the parameters
@example
// Infinite sequence: 0, 1, 2, 3, ...
for (const n of iterate()) { }
// Count only: 0, 1, 2, 3, 4
for (const n of iterate(5)) { }
// Start and count: 10, 11, 12, 13, 14
for (const n of iterate(10, 5)) { }
// Start, count, and step: 0, 2, 4, 6, 8
for (const n of iterate(0, 5, 2)) { }
toAsyncIterable(iterable: Iterable): AsyncIterable
Converts a synchronous iterable to an asynchronous iterable. Wraps the sync iterator methods to return promises, enabling uniform async handling.
@template T The type of values yielded by the iterator
@template TReturn The return type of the iterator
@template TNext The type of value that can be passed to next()
@param iterable A synchronous iterable to convert
@returns An async iterable that yields the same values as the input
@example
const syncArray = [1, 2, 3, 4, 5];
const asyncIterable = toAsyncIterable(syncArray);
for await (const value of asyncIterable) {
console.log(value); // 1, 2, 3, 4, 5
}
License
License The MIT License Copyright (c) 2025 Ivan Zakharchanka





