// File:
---
title: Introduction
description: A collection of commonly used stream patterns for the Web Streams API and Node.js Streams.
slug: /
---
## What is datastream
Datastream is a collection of commonly used **stream patterns** for the **Web Streams API** and **Node.js Streams**.
If you're iterating over an array more than once, it's time to use streams.
## A quick example
Code is better than 10,000 words, so let's jump into an example.
Let's assume you want to read a CSV file, validate the data, and compress it:
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { csvParseStream } from '@datastream/csv'
import { validateStream } from '@datastream/validate'
import { gzipCompressStream } from '@datastream/compress'
const streams = [
createReadableStream(csvData),
csvParseStream({ header: true }),
validateStream({ schema }),
gzipCompressStream()
]
await pipeline(streams)
```
## Stream types
- **Readable**: The start of a pipeline that injects data into a stream.
- **PassThrough**: Does not modify the data, but listens and prepares a result that can be retrieved.
- **Transform**: Modifies data as it passes through.
- **Writable**: The end of a pipeline that stores data from the stream.
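In terms of the underlying Web Streams API, the four roles can be sketched with plain platform primitives (no datastream APIs involved):

```javascript
// Readable: injects data into the pipeline
const readable = new ReadableStream({
  start(controller) {
    ['a', 'bb', 'ccc'].forEach((chunk) => controller.enqueue(chunk))
    controller.close()
  },
})

// PassThrough: observes chunks without modifying them
let totalLength = 0
const passThrough = new TransformStream({
  transform(chunk, controller) {
    totalLength += chunk.length
    controller.enqueue(chunk)
  },
})

// Transform: modifies chunks as they pass through
const upperCase = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase())
  },
})

// Writable: consumes data at the end
const output = []
const writable = new WritableStream({
  write(chunk) {
    output.push(chunk)
  },
})

await readable.pipeThrough(passThrough).pipeThrough(upperCase).pipeTo(writable)
console.log(output, totalLength) // ['A', 'BB', 'CCC'] 6
```

Datastream's factory functions produce streams that fill exactly these roles, so you rarely need to construct them by hand.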
## Setup
```bash
npm install @datastream/core @datastream/{module}
```
## Why streams?
Streams allow you to process data incrementally, without loading everything into memory at once. This is essential for:
- **Large files**: Process gigabytes of data with constant memory usage
- **Real-time data**: Handle data as it arrives, not after it's all collected
- **Composability**: Chain simple operations into complex data pipelines
- **Performance**: Start processing before all data is available
Datastream provides ready-made stream patterns so you can focus on your data flow instead of stream plumbing.
## Next steps
Ready to dive in? Head to the [Quick Start](/docs/quick-start) guide.
// File: core-concepts/
---
title: Core Concepts
description: Learn about stream types, pipeline orchestration, and naming conventions in datastream.
---
## Stream types
Datastream uses four stream types. Each package export follows a naming convention that tells you the type:
| Type | Naming pattern | Purpose |
|------|---------------|---------|
| **Readable** | `*ReadableStream`, `*ReadStream` | Injects data into a pipeline — files, databases, APIs |
| **PassThrough** | `*CountStream`, `*LengthStream`, `*DetectStream` | Observes data without modifying it, collects metrics via `.result()` |
| **Transform** | `*ParseStream`, `*FormatStream`, `*CompressStream` | Modifies data as it passes through |
| **Writable** | `*WriteStream`, `*PutItemStream` | Consumes data at the end — files, databases, APIs |
## `pipeline()` vs `pipejoin()`
### `pipeline(streams[], streamOptions)`
Connects all streams, waits for completion, and returns combined results from all PassThrough streams:
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { objectCountStream } from '@datastream/object'
import { stringLengthStream } from '@datastream/string'
const count = objectCountStream()
const length = stringLengthStream()
const result = await pipeline([
createReadableStream(['hello', 'world']),
count,
length,
])
console.log(result)
// { count: 2, length: 10 }
```
If the last stream is Readable or Transform (no Writable at the end), `pipeline` automatically appends a no-op Writable so the pipeline completes.
### `pipejoin(streams[])`
Connects streams and returns the resulting stream — use this when you want to consume the output manually:
```javascript
import { pipejoin, streamToArray, createReadableStream, createTransformStream } from '@datastream/core'
const streams = [
createReadableStream([1, 2, 3]),
createTransformStream((n, enqueue) => enqueue(n * 2)),
]
const river = pipejoin(streams)
const output = await streamToArray(river)
// [2, 4, 6]
```
## The `.result()` pattern
PassThrough streams collect metrics without modifying data. After the pipeline completes, retrieve results:
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { csvParseStream } from '@datastream/csv'
import { objectCountStream } from '@datastream/object'
import { digestStream } from '@datastream/digest'
const count = objectCountStream()
const digest = await digestStream({ algorithm: 'SHA2-256' })
const result = await pipeline([
createReadableStream(data),
digest,
csvParseStream(),
count,
])
console.log(result)
// { digest: 'SHA2-256:abc123...', count: 1000 }
```
Each PassThrough stream returns `{ key, value }` from its `.result()` method. `pipeline()` combines them into a single object. You can customize the key with the `resultKey` option:
```javascript
const count = objectCountStream({ resultKey: 'rowCount' })
// result: { rowCount: 1000 }
```
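The combining step amounts to folding each PassThrough's `{ key, value }` into one object. A plain sketch of that convention (illustrative, not the library's actual code):

```javascript
// Fold each PassThrough's { key, value } result into a single object
const mergeResults = (streams) =>
  streams
    .filter((stream) => typeof stream.result === 'function')
    .reduce((acc, stream) => {
      const { key, value } = stream.result()
      return { ...acc, [key]: value }
    }, {})

// Stand-ins for PassThrough streams that have finished processing
const count = { result: () => ({ key: 'count', value: 1000 }) }
const digest = { result: () => ({ key: 'digest', value: 'SHA2-256:abc' }) }
console.log(mergeResults([count, digest])) // { count: 1000, digest: 'SHA2-256:abc' }
```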
## Stream options
All stream factory functions accept a `streamOptions` parameter:
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `highWaterMark` | `number` | — | Backpressure threshold — how many chunks to buffer before pausing |
| `chunkSize` | `number` | `16384` (16KB) | Size hint for chunking strategies |
| `signal` | `AbortSignal` | — | Signal to abort the pipeline |
```javascript
const controller = new AbortController()
await pipeline([
createReadableStream(data),
csvParseStream(),
], { signal: controller.signal })
// Abort from elsewhere:
controller.abort()
```
## Error handling
### AbortSignal
Use `AbortController` to cancel pipelines:
```javascript
const controller = new AbortController()
setTimeout(() => controller.abort(), 5000) // 5s timeout
try {
await pipeline(streams, { signal: controller.signal })
} catch (e) {
if (e.name === 'AbortError') {
console.log('Pipeline was cancelled')
}
}
```
### Validation errors
Streams like `validateStream` and CSV cleaning streams collect errors in `.result()` rather than throwing, so the pipeline continues processing:
```javascript
const result = await pipeline([
createReadableStream(data),
csvParseStream(),
validateStream({ schema }),
])
console.log(result.validate)
// { '#/required/name': { id: '#/required/name', keys: ['name'], message: '...', idx: [3, 7] } }
```
Invalid rows are dropped by default. Set `onErrorEnqueue: true` to keep them in the stream.
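For example (an options fragment, assuming `onErrorEnqueue` is passed alongside the other `validateStream` options):

```javascript
// Default: invalid rows are dropped; errors are still collected via .result()
validateStream({ schema })

// Keep invalid rows flowing downstream as well
validateStream({ schema, onErrorEnqueue: true })
```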
## Lazy options
Many CSV streams accept functions instead of values for options. This lets you wire up detection results that aren't available until runtime:
```javascript
const detect = csvDetectDelimitersStream()
csvParseStream({
delimiterChar: () => detect.result().value.delimiterChar,
})
```
The function is called when the stream first needs the value, not at creation time.
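The mechanics are plain deferred evaluation. A sketch of how a lazy option can be resolved (illustrative, not the library's internals):

```javascript
// A lazy option may be a plain value or a zero-argument function
const resolveOption = (option) => (typeof option === 'function' ? option() : option)

let detected = null
const options = { delimiterChar: () => detected.delimiterChar }

// The function is only called after detection has produced a value
detected = { delimiterChar: ';' }
console.log(resolveOption(options.delimiterChar)) // ';'
console.log(resolveOption(',')) // ','
```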
// File: examples/api-pagination-dynamodb/
---
title: API pagination to DynamoDB
description: Fetch paginated API data and write to DynamoDB with datastream.
---
Fetch paginated API data and write to DynamoDB:
```javascript
import { pipeline, createTransformStream } from '@datastream/core'
import { fetchReadableStream } from '@datastream/fetch'
import { objectCountStream } from '@datastream/object'
import { awsDynamoDBPutItemStream } from '@datastream/aws'
const count = objectCountStream()
const result = await pipeline([
fetchReadableStream({
url: 'https://api.example.com/users',
dataPath: 'data',
nextPath: 'pagination.next_url',
}),
createTransformStream((item, enqueue) => {
enqueue({
PK: { S: `USER#${item.id}` },
SK: { S: `PROFILE` },
name: { S: item.name },
email: { S: item.email },
})
}),
count,
awsDynamoDBPutItemStream({ TableName: 'Users' }),
])
console.log(result)
// { count: 450 }
```
// File: examples/browser-file-processing/
---
title: Browser file processing
description: Process files in the browser using the File System Access API and datastream.
---
Use the File System Access API to process files in the browser:
```javascript
import { pipeline } from '@datastream/core'
import { fileReadStream, fileWriteStream } from '@datastream/file'
import {
csvDetectDelimitersStream,
csvDetectHeaderStream,
csvParseStream,
csvFormatStream,
} from '@datastream/csv'
import { objectFromEntriesStream, objectCountStream } from '@datastream/object'
const types = [{ accept: { 'text/csv': ['.csv'] } }]
const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
})
const count = objectCountStream()
const result = await pipeline([
await fileReadStream({ types }),
detectDelimiters,
detectHeader,
csvParseStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
}),
objectFromEntriesStream({
keys: () => detectHeader.result().value.header,
}),
count,
csvFormatStream({ header: true }),
await fileWriteStream({ path: 'output.csv', types }),
])
console.log(result)
// { count: 500 }
```
// File: examples/checksum-compress/
---
title: Checksum and compress
description: Calculate a digest while compressing and writing a file with datastream.
---
Calculate a digest while compressing and writing a file:
```javascript
import { pipeline } from '@datastream/core'
import { fileReadStream, fileWriteStream } from '@datastream/file'
import { digestStream } from '@datastream/digest'
import { gzipCompressStream } from '@datastream/compress'
const digest = await digestStream({ algorithm: 'SHA2-256' })
const result = await pipeline([
fileReadStream({ path: './data.csv' }),
digest,
gzipCompressStream(),
fileWriteStream({ path: './data.csv.gz' }),
])
console.log(result)
// { digest: 'SHA2-256:e3b0c44298fc1c14...' }
```
// File: examples/csv-etl/
---
title: CSV ETL — validate and compress
description: Read, detect, parse, validate, re-format, compress, and write CSV files with datastream.
---
Read a CSV file, detect format, parse, validate, re-format, compress, and write:
```javascript
import { pipeline } from '@datastream/core'
import { fileReadStream, fileWriteStream } from '@datastream/file'
import {
csvDetectDelimitersStream,
csvDetectHeaderStream,
csvParseStream,
csvRemoveEmptyRowsStream,
csvRemoveMalformedRowsStream,
csvCoerceValuesStream,
csvFormatStream,
} from '@datastream/csv'
import { objectFromEntriesStream } from '@datastream/object'
import { validateStream } from '@datastream/validate'
import { gzipCompressStream } from '@datastream/compress'
const schema = {
type: 'object',
required: ['id', 'name', 'email'],
properties: {
id: { type: 'number' },
name: { type: 'string', minLength: 1 },
email: { type: 'string', format: 'email' },
},
additionalProperties: false,
}
const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
})
const result = await pipeline([
fileReadStream({ path: './input.csv' }),
detectDelimiters,
detectHeader,
csvParseStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
}),
csvRemoveEmptyRowsStream(),
csvRemoveMalformedRowsStream({
headers: () => detectHeader.result().value.header,
}),
objectFromEntriesStream({
keys: () => detectHeader.result().value.header,
}),
csvCoerceValuesStream(),
validateStream({ schema }),
csvFormatStream({ header: true }),
gzipCompressStream(),
fileWriteStream({ path: './output.csv.gz' }),
])
console.log(result)
// { csvDetectDelimiters: {...}, csvDetectHeader: {...}, csvErrors: {}, csvRemoveEmptyRows: {...}, csvRemoveMalformedRows: {...}, validate: {} }
```
// File: examples/parquet/
---
title: Parquet — read and write
description: Read and write Apache Parquet files using datastream with hyparquet and parquet-wasm.
---
### Read a Parquet file into CSV
Read a Parquet file from S3, extract specific columns, and write as CSV:
```javascript
import { pipeline, streamToBuffer } from '@datastream/core'
import { awsS3GetObjectStream } from '@datastream/aws'
import { objectReadableStream } from '@datastream/object'
import { csvFormatStream } from '@datastream/csv'
import { fileWriteStream } from '@datastream/file'
import { parquetRead } from 'hyparquet'
const buffer = await streamToBuffer(
await awsS3GetObjectStream({ Bucket: 'data-lake', Key: 'users.parquet' }),
)
const rows = []
await parquetRead({
file: {
byteLength: buffer.byteLength,
slice: (start, end) => buffer.slice(start, end),
},
columns: ['id', 'name', 'email'],
onComplete: (data) => rows.push(...data),
})
await pipeline([
objectReadableStream(rows),
csvFormatStream({ header: true }),
fileWriteStream({ path: './users.csv' }),
])
```
### Write CSV to Parquet
Read a CSV file, parse into objects, and write as Parquet to S3:
```javascript
import { pipeline, pipejoin, streamToArray } from '@datastream/core'
import { fileReadStream } from '@datastream/file'
import {
csvDetectDelimitersStream,
csvDetectHeaderStream,
csvParseStream,
csvCoerceValuesStream,
} from '@datastream/csv'
import { objectFromEntriesStream } from '@datastream/object'
import { createReadableStreamFromArrayBuffer } from '@datastream/core'
import { awsS3PutObjectStream } from '@datastream/aws'
import { tableFromJSON, tableToIPC } from 'apache-arrow'
const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
})
const rows = await streamToArray(
pipejoin([
fileReadStream({ path: './users.csv' }),
detectDelimiters,
detectHeader,
csvParseStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
}),
objectFromEntriesStream({
keys: () => detectHeader.result().value.header,
}),
csvCoerceValuesStream(),
]),
)
const table = tableFromJSON(rows)
const { Table, writeParquet } = await import('parquet-wasm')
const parquetBuffer = writeParquet(Table.fromIPCStream(tableToIPC(table, 'stream')))
await pipeline([
createReadableStreamFromArrayBuffer(parquetBuffer),
awsS3PutObjectStream({ Bucket: 'data-lake', Key: 'users.parquet' }),
])
```
// File: examples/s3-transform/
---
title: S3 to S3 — transform and re-upload
description: Read from S3, transform CSV data, and write back with datastream.
---
Read from S3, parse CSV, validate, re-format, and write back:
```javascript
import { pipeline } from '@datastream/core'
import { awsS3GetObjectStream, awsS3PutObjectStream } from '@datastream/aws'
import {
csvDetectDelimitersStream,
csvDetectHeaderStream,
csvParseStream,
csvRemoveMalformedRowsStream,
csvFormatStream,
} from '@datastream/csv'
import { objectFromEntriesStream } from '@datastream/object'
import { validateStream } from '@datastream/validate'
import { gzipCompressStream, gzipDecompressStream } from '@datastream/compress'
// `schema` is a JSON Schema for row validation (see the CSV ETL example)
const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
})
await pipeline([
await awsS3GetObjectStream({ Bucket: 'my-bucket', Key: 'input.csv.gz' }),
gzipDecompressStream(),
detectDelimiters,
detectHeader,
csvParseStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
}),
csvRemoveMalformedRowsStream({
headers: () => detectHeader.result().value.header,
}),
objectFromEntriesStream({
keys: () => detectHeader.result().value.header,
}),
validateStream({ schema }),
csvFormatStream({ header: true }),
gzipCompressStream(),
awsS3PutObjectStream({ Bucket: 'my-bucket', Key: 'output.csv.gz' }),
])
```
// File: packages/aws/
---
title: aws
description: AWS service streams for CloudWatch Logs, DynamoDB, Kinesis, Lambda, S3, SNS, and SQS.
---
AWS service streams for CloudWatch Logs, DynamoDB, Kinesis, Lambda, S3, SNS, and SQS. Node.js only.
## Install
```bash
npm install @datastream/aws
```
Requires the corresponding AWS SDK v3 client packages:
```bash
npm install @aws-sdk/client-cloudwatch-logs
npm install @aws-sdk/client-dynamodb
npm install @aws-sdk/client-kinesis
npm install @aws-sdk/client-lambda
npm install @aws-sdk/client-s3 @aws-sdk/lib-storage
npm install @aws-sdk/client-sns
npm install @aws-sdk/client-sqs
```
## CloudWatch Logs
### `awsCloudWatchLogsSetClient`
Mutates module-level state — not safe for concurrent multi-tenant use.
```javascript
import { CloudWatchLogsClient } from '@aws-sdk/client-cloudwatch-logs'
import { awsCloudWatchLogsSetClient } from '@datastream/aws'
awsCloudWatchLogsSetClient(new CloudWatchLogsClient({ region: 'us-east-1' }))
```
### `awsCloudWatchLogsGetLogEventsStream` Readable async
Gets log events from a CloudWatch Logs log stream. Auto-paginates until no new events are returned.
#### Options
Accepts `GetLogEventsCommand` parameters plus:
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `logGroupName` | `string` | — | Log group name |
| `logStreamName` | `string` | — | Log stream name |
| `pollingActive` | `boolean` | `false` | Keep polling for new events |
| `pollingDelay` | `number` | `1000` | Delay (ms) between polls when no new events |
#### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { awsCloudWatchLogsGetLogEventsStream } from '@datastream/aws'
await pipeline([
createReadableStream(await awsCloudWatchLogsGetLogEventsStream({
logGroupName: '/aws/lambda/my-function',
logStreamName: 'stream-id',
})),
])
```
### `awsCloudWatchLogsFilterLogEventsStream` Readable async
Filters log events across log streams in a log group. Auto-paginates through all matching results.
#### Options
Accepts `FilterLogEventsCommand` parameters:
| Option | Type | Description |
|--------|------|-------------|
| `logGroupName` | `string` | Log group name |
| `filterPattern` | `string` | CloudWatch Logs filter pattern |
| `startTime` | `number` | Start of time range (epoch ms) |
| `endTime` | `number` | End of time range (epoch ms) |
#### Example
```javascript
import { awsCloudWatchLogsFilterLogEventsStream } from '@datastream/aws'
const events = await awsCloudWatchLogsFilterLogEventsStream({
logGroupName: '/aws/lambda/my-function',
filterPattern: 'ERROR',
})
```
## S3
### `awsS3SetClient`
Set a custom S3 client. By default, FIPS endpoints are enabled for US and CA regions. Mutates module-level state — not safe for concurrent multi-tenant use.
```javascript
import { S3Client } from '@aws-sdk/client-s3'
import { awsS3SetClient } from '@datastream/aws'
awsS3SetClient(new S3Client({ region: 'eu-west-1' }))
```
### `awsS3GetObjectStream` Readable async
Downloads an object from S3 as a stream.
#### Options
Accepts all `GetObjectCommand` parameters plus:
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `client` | `S3Client` | default client | Custom S3 client for this call |
| `Bucket` | `string` | — | S3 bucket name |
| `Key` | `string` | — | S3 object key |
#### Example
```javascript
import { pipeline } from '@datastream/core'
import { awsS3GetObjectStream } from '@datastream/aws'
import { csvParseStream } from '@datastream/csv'
await pipeline([
await awsS3GetObjectStream({ Bucket: 'my-bucket', Key: 'data.csv' }),
csvParseStream(),
])
```
### `awsS3PutObjectStream` PassThrough
Uploads data to S3 using multipart upload.
#### Options
Accepts all S3 PutObject parameters plus:
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `client` | `S3Client` | default client | Custom S3 client |
| `Bucket` | `string` | — | S3 bucket name |
| `Key` | `string` | — | S3 object key |
| `onProgress` | `function` | — | Upload progress callback |
| `tags` | `object` | — | S3 object tags |
#### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { awsS3PutObjectStream } from '@datastream/aws'
import { gzipCompressStream } from '@datastream/compress'
await pipeline([
createReadableStream(data),
gzipCompressStream(),
awsS3PutObjectStream({ Bucket: 'my-bucket', Key: 'output.csv.gz' }),
])
```
### `awsS3ChecksumStream` PassThrough
Computes a multi-part S3 checksum while data passes through. Designed for pre-signed URL uploads in the browser.
#### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `ChecksumAlgorithm` | `string` | `"SHA256"` | `"SHA1"` or `"SHA256"` |
| `partSize` | `number` | `17179870` | Part size in bytes |
| `resultKey` | `string` | `"s3"` | Key in pipeline result |
#### Result
```javascript
{ checksum: 'base64hash-3', checksums: ['part1hash', 'part2hash', 'part3hash'], partSize: 17179870 }
```
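The `-3` suffix follows S3's multipart convention: the final checksum is the hash of the concatenated binary part digests, suffixed with the part count. A plain `node:crypto` sketch of that convention (not the stream's internals):

```javascript
import { createHash } from 'node:crypto'

// Hash each part, then hash the concatenated binary digests
const parts = ['part one', 'part two', 'part three']
const partDigests = parts.map((part) => createHash('sha256').update(part).digest())
const combined = createHash('sha256').update(Buffer.concat(partDigests)).digest('base64')
const checksum = `${combined}-${parts.length}`
console.log(checksum) // ends in '-3'
```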
## DynamoDB
### `awsDynamoDBSetClient`
Mutates module-level state — not safe for concurrent multi-tenant use.
```javascript
import { DynamoDBClient } from '@aws-sdk/client-dynamodb'
import { awsDynamoDBSetClient } from '@datastream/aws'
awsDynamoDBSetClient(new DynamoDBClient({ region: 'us-east-1' }))
```
### `awsDynamoDBQueryStream` Readable async
Queries a DynamoDB table and auto-paginates through all results.
#### Options
Accepts all `QueryCommand` parameters:
| Option | Type | Description |
|--------|------|-------------|
| `TableName` | `string` | DynamoDB table name |
| `KeyConditionExpression` | `string` | Query key condition |
| `ExpressionAttributeValues` | `object` | Expression values |
#### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { awsDynamoDBQueryStream } from '@datastream/aws'
await pipeline([
createReadableStream(await awsDynamoDBQueryStream({
TableName: 'Users',
KeyConditionExpression: 'PK = :pk',
ExpressionAttributeValues: { ':pk': { S: 'USER#123' } },
})),
])
```
### `awsDynamoDBScanStream` Readable async
Scans an entire DynamoDB table with automatic pagination.
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { awsDynamoDBScanStream } from '@datastream/aws'
await pipeline([
createReadableStream(await awsDynamoDBScanStream({ TableName: 'Users' })),
])
```
### `awsDynamoDBExecuteStatementStream` Readable async
Executes a PartiQL statement against DynamoDB with automatic pagination.
#### Options
Accepts `ExecuteStatementCommand` parameters:
| Option | Type | Description |
|--------|------|-------------|
| `Statement` | `string` | PartiQL statement |
| `Parameters` | `object[]` | Statement parameters |
#### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { awsDynamoDBExecuteStatementStream } from '@datastream/aws'
await pipeline([
createReadableStream(await awsDynamoDBExecuteStatementStream({
Statement: 'SELECT * FROM "Users" WHERE PK = ?',
Parameters: [{ S: 'USER#123' }],
})),
])
```
### `awsDynamoDBGetItemStream` Readable async
Batch gets items by keys. Automatically retries unprocessed keys with exponential backoff.
#### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `TableName` | `string` | — | DynamoDB table name |
| `Keys` | `object[]` | — | Array of key objects |
| `retryMaxCount` | `number` | `10` | Maximum retry attempts |
### `awsDynamoDBPutItemStream` Writable
Writes items to DynamoDB using `BatchWriteItem`. Automatically batches 25 items per request and retries unprocessed items with exponential backoff.
#### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `TableName` | `string` | — | DynamoDB table name |
| `retryMaxCount` | `number` | `10` | Maximum retry attempts |
#### Example
```javascript
import { pipeline, createReadableStream, createTransformStream } from '@datastream/core'
import { awsDynamoDBPutItemStream } from '@datastream/aws'
await pipeline([
createReadableStream(items),
createTransformStream((item, enqueue) => {
enqueue({
PK: { S: `USER#${item.id}` },
SK: { S: 'PROFILE' },
name: { S: item.name },
})
}),
awsDynamoDBPutItemStream({ TableName: 'Users' }),
])
```
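The 25-item batching can be pictured as plain array chunking (illustrative only; the stream buffers and flushes batches internally as chunks arrive):

```javascript
// Split items into BatchWriteItem-sized groups of 25
const batchOf = (items, size = 25) => {
  const batches = []
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size))
  }
  return batches
}

const batches = batchOf(Array.from({ length: 60 }, (_, i) => i))
console.log(batches.map((batch) => batch.length)) // [25, 25, 10]
```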
### `awsDynamoDBDeleteItemStream` Writable
Deletes items from DynamoDB using `BatchWriteItem`. Batches 25 items per request.
#### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `TableName` | `string` | — | DynamoDB table name |
| `retryMaxCount` | `number` | `10` | Maximum retry attempts |
#### Example
```javascript
import { awsDynamoDBDeleteItemStream } from '@datastream/aws'
awsDynamoDBDeleteItemStream({ TableName: 'Users' })
// Input chunks: { PK: { S: 'USER#1' }, SK: { S: 'PROFILE' } }
```
## Kinesis
### `awsKinesisSetClient`
Mutates module-level state — not safe for concurrent multi-tenant use.
```javascript
import { KinesisClient } from '@aws-sdk/client-kinesis'
import { awsKinesisSetClient } from '@datastream/aws'
awsKinesisSetClient(new KinesisClient({ region: 'us-east-1' }))
```
### `awsKinesisGetRecordsStream` Readable async
Gets records from a Kinesis shard. Polls until no more records are returned.
#### Options
Accepts `GetRecordsCommand` parameters plus:
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `ShardIterator` | `string` | — | Shard iterator |
| `pollingActive` | `boolean` | `false` | Keep polling for new records |
| `pollingDelay` | `number` | `1000` | Delay (ms) between polls when no records |
#### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { awsKinesisGetRecordsStream } from '@datastream/aws'
await pipeline([
createReadableStream(await awsKinesisGetRecordsStream({
ShardIterator: 'AAA...',
})),
])
```
### `awsKinesisPutRecordsStream` Writable
Writes records to a Kinesis stream. Batches 500 records per `PutRecordsCommand`.
#### Options
| Option | Type | Description |
|--------|------|-------------|
| `StreamName` | `string` | Kinesis stream name |
#### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { awsKinesisPutRecordsStream } from '@datastream/aws'
await pipeline([
createReadableStream(records),
awsKinesisPutRecordsStream({ StreamName: 'my-stream' }),
])
```
## Lambda
### `awsLambdaSetClient`
Mutates module-level state — not safe for concurrent multi-tenant use.
```javascript
import { LambdaClient } from '@aws-sdk/client-lambda'
import { awsLambdaSetClient } from '@datastream/aws'
awsLambdaSetClient(new LambdaClient({ region: 'us-east-1' }))
```
### `awsLambdaReadableStream` Readable
Invokes a Lambda function with response streaming (`InvokeWithResponseStream`).
Also exported as `awsLambdaResponseStream`.
#### Options
Accepts `InvokeWithResponseStreamCommand` parameters. Pass an array to invoke multiple functions sequentially.
| Option | Type | Description |
|--------|------|-------------|
| `FunctionName` | `string` | Lambda function name or ARN |
| `Payload` | `string` | JSON payload |
#### Example
```javascript
import { pipeline } from '@datastream/core'
import { awsLambdaReadableStream } from '@datastream/aws'
import { csvParseStream } from '@datastream/csv'
await pipeline([
awsLambdaReadableStream({
FunctionName: 'data-processor',
Payload: JSON.stringify({ key: 'input.csv' }),
}),
csvParseStream(),
])
```
## SNS
### `awsSNSSetClient`
Mutates module-level state — not safe for concurrent multi-tenant use.
```javascript
import { SNSClient } from '@aws-sdk/client-sns'
import { awsSNSSetClient } from '@datastream/aws'
awsSNSSetClient(new SNSClient({ region: 'us-east-1' }))
```
### `awsSNSPublishMessageStream` Writable
Publishes messages to an SNS topic. Batches 10 messages per `PublishBatchCommand`.
#### Options
| Option | Type | Description |
|--------|------|-------------|
| `TopicArn` | `string` | SNS topic ARN |
#### Example
```javascript
import { pipeline, createReadableStream, createTransformStream } from '@datastream/core'
import { awsSNSPublishMessageStream } from '@datastream/aws'
await pipeline([
createReadableStream(events),
createTransformStream((event, enqueue) => {
enqueue({
Id: event.id,
Message: JSON.stringify(event),
})
}),
awsSNSPublishMessageStream({ TopicArn: 'arn:aws:sns:us-east-1:123:my-topic' }),
])
```
## SQS
### `awsSQSSetClient`
Mutates module-level state — not safe for concurrent multi-tenant use.
```javascript
import { SQSClient } from '@aws-sdk/client-sqs'
import { awsSQSSetClient } from '@datastream/aws'
awsSQSSetClient(new SQSClient({ region: 'us-east-1' }))
```
### `awsSQSReceiveMessageStream` Readable async
Polls an SQS queue and yields messages until the queue is empty.
#### Options
Accepts `ReceiveMessageCommand` parameters plus:
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `QueueUrl` | `string` | — | SQS queue URL |
| `MaxNumberOfMessages` | `number` | — | Max messages per poll (1-10) |
| `pollingActive` | `boolean` | `false` | Keep polling even when queue is empty |
| `pollingDelay` | `number` | `1000` | Delay (ms) between polls when queue is empty |
#### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { awsSQSReceiveMessageStream, awsSQSDeleteMessageStream } from '@datastream/aws'
await pipeline([
createReadableStream(await awsSQSReceiveMessageStream({
QueueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue',
})),
awsSQSDeleteMessageStream({
QueueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue',
}),
])
```
### `awsSQSSendMessageStream` Writable
Sends messages to an SQS queue. Batches 10 messages per `SendMessageBatchCommand`.
#### Options
| Option | Type | Description |
|--------|------|-------------|
| `QueueUrl` | `string` | SQS queue URL |
### `awsSQSDeleteMessageStream` Writable
Deletes messages from an SQS queue. Batches 10 messages per `DeleteMessageBatchCommand`.
#### Options
| Option | Type | Description |
|--------|------|-------------|
| `QueueUrl` | `string` | SQS queue URL |
// File: packages/base64/
---
title: base64
description: Base64 encoding and decoding streams.
---
Base64 encoding and decoding streams.
## Install
```bash
npm install @datastream/base64
```
## `base64EncodeStream` Transform
Encodes data to base64. Handles chunk boundaries correctly by buffering partial 3-byte groups.
> **Note:** Input must be Latin1-encoded strings (code points 0-255). Use `charsetEncodeStream` to convert other encodings before base64 encoding.
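Why boundary buffering matters: base64 maps 3 input bytes to 4 output characters, so a chunk whose length is not a multiple of 3 must carry its tail into the next chunk. A plain sketch of that buffering (not the stream's implementation):

```javascript
// Encode chunk-by-chunk, carrying any partial 3-byte group forward
let carry = ''
const encodeChunk = (chunk, isLast = false) => {
  const input = carry + chunk
  const usable = isLast ? input.length : input.length - (input.length % 3)
  carry = input.slice(usable)
  return btoa(input.slice(0, usable))
}

const out = encodeChunk('Hello, ') + encodeChunk('World!', true)
console.log(out) // 'SGVsbG8sIFdvcmxkIQ=='
```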
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { base64EncodeStream } from '@datastream/base64'
await pipeline([
createReadableStream('Hello, World!'),
base64EncodeStream(),
])
```
## `base64DecodeStream` Transform
Decodes base64 data back to its original form. Handles chunk boundaries by buffering partial 4-character groups.
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { base64DecodeStream } from '@datastream/base64'
await pipeline([
createReadableStream('SGVsbG8sIFdvcmxkIQ=='),
base64DecodeStream(),
])
```
// File: packages/charset/
---
title: charset
description: Character set detection, decoding, and encoding streams.
---
Character set detection, decoding, and encoding streams.
## Install
```bash
npm install @datastream/charset
```
## `charsetDetectStream` PassThrough
Detects the character encoding of the data passing through by analyzing byte patterns.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `resultKey` | `string` | `"charset"` | Key in pipeline result |
### Result
Returns the most likely charset with confidence score:
```javascript
{ charset: 'UTF-8', confidence: 80 }
```
### Supported charsets
UTF-8, UTF-16BE, UTF-16LE, UTF-32BE, UTF-32LE, Shift_JIS, ISO-2022-JP, ISO-2022-CN, ISO-2022-KR, GB18030, EUC-JP, EUC-KR, Big5, ISO-8859-1, ISO-8859-2, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, windows-1250, windows-1251, windows-1252, windows-1254, windows-1256, KOI8-R
### Example
```javascript
import { pipeline } from '@datastream/core'
import { fileReadStream } from '@datastream/file'
import { charsetDetectStream } from '@datastream/charset'
const detect = charsetDetectStream()
const result = await pipeline([
  fileReadStream({ path: './data.csv' }),
  detect,
])
console.log(result.charset)
// { charset: 'UTF-8', confidence: 80 }
```
## `charsetDecodeStream` Transform
Decodes binary data to text using the specified character encoding. Uses `TextDecoderStream` internally.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `charset` | `string` | — | Character encoding name (e.g. `"UTF-8"`, `"ISO-8859-1"`) |
### Example
```javascript
import { charsetDecodeStream } from '@datastream/charset'
charsetDecodeStream({ charset: 'ISO-8859-1' })
```
## `charsetEncodeStream` Transform
Encodes text to binary using the specified character encoding. Uses `TextEncoderStream` internally.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `charset` | `string` | — | Character encoding name |
### Example
```javascript
import { charsetEncodeStream } from '@datastream/charset'
charsetEncodeStream({ charset: 'UTF-8' })
```
// File: packages/compress/
---
title: compress
description: Compression and decompression streams for gzip, deflate, brotli, and zstd.
---
Compression and decompression streams for gzip, deflate, brotli, and zstd.
## Install
```bash
npm install @datastream/compress
```
## gzip
### `gzipCompressStream` Transform
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `quality` | `number` | `-1` | Compression level (-1 to 9). -1 = default, 0 = none, 9 = best |
| `maxOutputSize` | `number` | — | Maximum compressed output in bytes (web variant) |
### `gzipDecompressStream` Transform
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `maxOutputSize` | `number` | — | Maximum decompressed output in bytes. Destroys the stream with an error when exceeded |
### Example
```javascript
import { pipeline } from '@datastream/core'
import { fileReadStream, fileWriteStream } from '@datastream/file'
import { gzipCompressStream, gzipDecompressStream } from '@datastream/compress'
// Compress
await pipeline([
  fileReadStream({ path: './data.csv' }),
  gzipCompressStream({ quality: 9 }),
  fileWriteStream({ path: './data.csv.gz' }),
])

// Decompress
await pipeline([
  fileReadStream({ path: './data.csv.gz' }),
  gzipDecompressStream(),
  fileWriteStream({ path: './data.csv' }),
])
```
## deflate
### `deflateCompressStream` Transform
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `quality` | `number` | `-1` | Compression level (-1 to 9) |
| `maxOutputSize` | `number` | — | Maximum compressed output in bytes (web variant) |
### `deflateDecompressStream` Transform
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `maxOutputSize` | `number` | — | Maximum decompressed output in bytes. Destroys the stream with an error when exceeded |
## brotli
### `brotliCompressStream` Transform
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `quality` | `number` | `11` | Compression level (0 to 11) |
### `brotliDecompressStream` Transform
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `maxOutputSize` | `number` | — | Maximum decompressed output in bytes. Destroys the stream with an error when exceeded |
## zstd Node.js only
Requires Node.js with zstd support.
### `zstdCompressStream` Transform
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `quality` | `number` | `3` | Compression level |
### `zstdDecompressStream` Transform
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `maxOutputSize` | `number` | — | Maximum decompressed output in bytes. Destroys the stream with an error when exceeded |
## Output size protection
### Decompression bombs
A malicious compressed payload known as a "decompression bomb" can be as small as a few kilobytes but expand to gigabytes when decompressed, exhausting memory and crashing the process. Setting `maxOutputSize` ensures decompression is aborted before memory is exhausted. Always set this when decompressing untrusted input.
```javascript
import { gzipDecompressStream } from '@datastream/compress'
// Limit decompressed output to 100MB
gzipDecompressStream({ maxOutputSize: 100 * 1024 * 1024 })
```
### Compression output limits
Compression streams also support `maxOutputSize` (web variant) to cap compressed output size. This can be useful to enforce storage limits.
```javascript
import { gzipCompressStream } from '@datastream/compress'
// Limit compressed output to 50MB
gzipCompressStream({ maxOutputSize: 50 * 1024 * 1024 })
```
## Platform support
| Algorithm | Node.js | Browser |
|-----------|---------|---------|
| gzip | `node:zlib` | `CompressionStream` |
| deflate | `node:zlib` | `CompressionStream` |
| brotli | `node:zlib` | `CompressionStream` |
| zstd | `node:zlib` | Not supported |
// File: packages/core/
---
title: core
description: Pipeline orchestration, stream factories, and utility functions for datastream.
---
Foundation package providing pipeline orchestration, stream factories, and utility functions.
## Install
```bash
npm install @datastream/core
```
## Pipeline
### `pipeline(streams, streamOptions)` async
Connects all streams, waits for completion, and returns combined `.result()` values from all PassThrough streams. Automatically appends a no-op Writable if the last stream is Readable.
#### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `highWaterMark` | `number` | — | Backpressure threshold |
| `chunkSize` | `number` | — | Size hint for chunking |
| `signal` | `AbortSignal` | — | Abort the pipeline |
#### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { objectCountStream } from '@datastream/object'
const count = objectCountStream()
const result = await pipeline([
  createReadableStream([{ a: 1 }, { a: 2 }, { a: 3 }]),
  count,
])
console.log(result)
// { count: 3 }
```
### `pipejoin(streams)` returns stream
Connects streams and returns the resulting stream. Use this when you want to consume output manually with `streamToArray`, `streamToString`, or `for await`.
#### Example
```javascript
import { pipejoin, streamToArray, createReadableStream, createTransformStream } from '@datastream/core'
const river = pipejoin([
  createReadableStream([1, 2, 3]),
  createTransformStream((n, enqueue) => enqueue(n * 2)),
])
const output = await streamToArray(river)
// [2, 4, 6]
```
### `result(streams)` async
Iterates over streams and combines all `.result()` return values into a single object. Called automatically by `pipeline()`.
## Consumers
### `streamToArray(stream)` async
Collects all chunks from a stream into an array.
```javascript
import { pipejoin, streamToArray, createReadableStream } from '@datastream/core'
const river = pipejoin([createReadableStream(['a', 'b', 'c'])])
const output = await streamToArray(river)
// ['a', 'b', 'c']
```
### `streamToString(stream)` async
Concatenates all chunks into a single string.
```javascript
const river = pipejoin([createReadableStream(['a', 'b', 'c'])])
const output = await streamToString(river)
// 'abc'
```
### `streamToObject(stream)` async
Merges all chunks into a single object using `Object.assign`.
```javascript
const river = pipejoin([createReadableStream([{ a: 1 }, { b: 2 }])])
const output = await streamToObject(river)
// { a: 1, b: 2 }
```
### `streamToBuffer(stream)` async Node.js only
Collects all chunks into a `Buffer`.
## Stream Factories
### `createReadableStream(input, streamOptions)` Readable
Creates a Readable stream from various input types.
#### Input types
| Type | Behavior |
|------|----------|
| `string` | Chunked at `chunkSize` (default 16KB) |
| `Array` | Each element emitted as a chunk |
| `AsyncIterable` / `Iterable` | Each yielded value emitted as a chunk |
| `ArrayBuffer` | Chunked at `chunkSize` (Node.js only) |
#### Example
```javascript
import { createReadableStream } from '@datastream/core'

// From string — auto-chunked
const fromString = createReadableStream('hello world')

// From array — one chunk per element
const fromArray = createReadableStream([{ a: 1 }, { a: 2 }])

// From async generator
async function* generate() {
  yield 'chunk1'
  yield 'chunk2'
}
const fromGenerator = createReadableStream(generate())
```
### `createReadableStreamFromString(input, streamOptions)` Readable
Creates a Readable stream from a string, chunking it at `chunkSize` (default 16KB). Useful when you need explicit control over string chunking separate from `createReadableStream`.
```javascript
import { createReadableStreamFromString } from '@datastream/core'
const stream = createReadableStreamFromString(largeString, { chunkSize: 4096 })
```
### `createReadableStreamFromArrayBuffer(input, streamOptions)` Readable
Creates a Readable stream from an `ArrayBuffer` or `Uint8Array`, chunking it at `chunkSize` (default 16KB).
```javascript
import { createReadableStreamFromArrayBuffer } from '@datastream/core'
const stream = createReadableStreamFromArrayBuffer(buffer, { chunkSize: 8192 })
```
### `createPassThroughStream(fn, flush?, streamOptions)` Transform (PassThrough)
Creates a stream that observes each chunk without modifying it. The chunk is automatically passed through.
#### Parameters
| Parameter | Type | Description |
|-----------|------|-------------|
| `fn` | `(chunk) => void` | Called for each chunk, return value ignored |
| `flush` | `() => void` | Optional, called when stream ends |
| `streamOptions` | `object` | Stream configuration (supports `signal` for abort) |
#### Example
```javascript
import { createPassThroughStream } from '@datastream/core'
let total = 0
const counter = createPassThroughStream((chunk) => {
  total += chunk.length
})
counter.result = () => ({ key: 'total', value: total })
```
### `createTransformStream(fn, flush?, streamOptions)` Transform
Creates a stream that modifies chunks. Use `enqueue` to emit output — you can emit zero, one, or many chunks per input.
#### Parameters
| Parameter | Type | Description |
|-----------|------|-------------|
| `fn` | `(chunk, enqueue) => void` | Transform each chunk, call `enqueue(output)` to emit |
| `flush` | `(enqueue) => void` | Optional, emit final chunks when stream ends |
| `streamOptions` | `object` | Stream configuration (supports `signal` for abort) |
#### Example
```javascript
import { createTransformStream } from '@datastream/core'
// Filter: emit only matching chunks
const filter = createTransformStream((chunk, enqueue) => {
  if (chunk.age > 18) enqueue(chunk)
})

// Expand: emit multiple chunks per input
const expand = createTransformStream((chunk, enqueue) => {
  for (const item of chunk.items) {
    enqueue(item)
  }
})
```
### `createWritableStream(fn, close?, streamOptions)` Writable
Creates a stream that consumes chunks at the end of a pipeline.
#### Parameters
| Parameter | Type | Description |
|-----------|------|-------------|
| `fn` | `(chunk) => void` | Called for each chunk |
| `close` | `() => void` | Optional, called when stream ends |
| `streamOptions` | `object` | Stream configuration (supports `signal` for abort) |
#### Example
```javascript
import { createWritableStream } from '@datastream/core'
const rows = []
const collector = createWritableStream((chunk) => {
  rows.push(chunk)
})
```
## Utilities
### `isReadable(stream)`
Returns `true` if the stream is Readable.
### `isWritable(stream)`
Returns `true` if the stream is Writable.
### `makeOptions(options)`
Normalizes stream options for interoperability between Readable, Transform, and Writable streams.
| Option | Type | Description |
|--------|------|-------------|
| `highWaterMark` | `number` | Backpressure threshold |
| `chunkSize` | `number` | Chunking size hint |
| `signal` | `AbortSignal` | Abort signal |
### `timeout(ms, options)` async
Returns a promise that resolves after `ms` milliseconds. Supports `AbortSignal` cancellation.
```javascript
import { timeout } from '@datastream/core'
await timeout(1000) // wait 1 second
const controller = new AbortController()
await timeout(5000, { signal: controller.signal }) // cancellable
```
### `backpressureGauge(streams)` Node.js only
Measures pause/resume timing across streams. Useful for identifying bottlenecks.
```javascript
import { backpressureGauge } from '@datastream/core'
const metrics = backpressureGauge({ parse: parseStream, validate: validateStream })
// After pipeline completes:
// metrics.parse.total = { timestamp, duration }
// metrics.parse.timeline = [{ timestamp, duration }, ...]
```
// File: packages/csv/
---
title: csv
description: Parse, format, detect, clean, and coerce CSV data streams.
---
Parse, format, detect, clean, and coerce CSV data.
## Install
```bash
npm install @datastream/csv
```
## `csvDetectDelimitersStream` PassThrough
Auto-detects the delimiter, newline, quote, and escape characters from the first chunk of data.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `chunkSize` | `number` | `1024` (1KB) | Minimum bytes to buffer before detecting |
| `resultKey` | `string` | `"csvDetectDelimiters"` | Key in pipeline result |
### Result
```javascript
{ delimiterChar: ',', newlineChar: '\r\n', quoteChar: '"', escapeChar: '"' }
```
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { csvDetectDelimitersStream } from '@datastream/csv'
const detect = csvDetectDelimitersStream()
const result = await pipeline([
  createReadableStream('name\tage\r\nAlice\t30'),
  detect,
])
console.log(result.csvDetectDelimiters)
// { delimiterChar: '\t', newlineChar: '\r\n', quoteChar: '"', escapeChar: '"' }
```
## `csvDetectHeaderStream` Transform
Detects and strips the header row. Outputs data rows only (without the header).
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `chunkSize` | `number` | `1024` (1KB) | Minimum bytes to buffer before detecting |
| `delimiterChar` | `string \| () => string` | `","` | Delimiter character or lazy function |
| `newlineChar` | `string \| () => string` | `"\r\n"` | Newline character or lazy function |
| `quoteChar` | `string \| () => string` | `'"'` | Quote character or lazy function |
| `escapeChar` | `string \| () => string` | quoteChar | Escape character or lazy function |
| `parser` | `function` | `csvQuotedParser` | Custom parser function |
| `resultKey` | `string` | `"csvDetectHeader"` | Key in pipeline result |
### Result
```javascript
{ header: ['name', 'age', 'city'] }
```
### Example
```javascript
import { csvDetectDelimitersStream, csvDetectHeaderStream } from '@datastream/csv'
const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
  delimiterChar: () => detectDelimiters.result().value.delimiterChar,
  newlineChar: () => detectDelimiters.result().value.newlineChar,
  quoteChar: () => detectDelimiters.result().value.quoteChar,
  escapeChar: () => detectDelimiters.result().value.escapeChar,
})
```
## `csvParseStream` Transform
Parses CSV text into arrays of field values (string arrays). Each output chunk is one row as `string[]`.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `chunkSize` | `number` | `2097152` (2MB) | Input buffer size before first parse |
| `fieldMaxSize` | `number` | `16777216` (16MB) | Maximum size of a single field in bytes |
| `delimiterChar` | `string \| () => string` | `","` | Delimiter character or lazy function |
| `newlineChar` | `string \| () => string` | `"\r\n"` | Newline character or lazy function |
| `quoteChar` | `string \| () => string` | `'"'` | Quote character or lazy function |
| `escapeChar` | `string \| () => string` | quoteChar | Escape character or lazy function |
| `parser` | `function` | `csvQuotedParser` | Custom parser function |
| `resultKey` | `string` | `"csvErrors"` | Key in pipeline result |
#### Field size protection
A crafted CSV with an unterminated quoted field causes the parser to buffer the entire remaining input into a single field, consuming unbounded memory. An attacker can exploit this to exhaust process memory with a relatively small file. Setting `fieldMaxSize` caps per-field memory and aborts parsing with an error when exceeded. Always set this when parsing untrusted CSV input, and lower it from the default if your data has known field size bounds.
```javascript
// Parse with a 1MB field limit for untrusted input
csvParseStream({ fieldMaxSize: 1 * 1024 * 1024 })
```
### Result
Parse errors collected during processing:
```javascript
{
  UnterminatedQuote: { id: 'UnterminatedQuote', message: 'Unterminated quoted field', idx: [5] }
}
```
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { csvParseStream } from '@datastream/csv'
const result = await pipeline([
  createReadableStream('a,b,c\r\n1,2,3\r\n4,5,6'),
  csvParseStream(),
])
// Chunks emitted: ['a','b','c'], ['1','2','3'], ['4','5','6']
```
## `csvFormatStream` Transform
Formats objects or arrays back to CSV text.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `header` | `boolean \| string[]` | — | `true` to auto-detect from object keys, or provide array of column names |
| `delimiterChar` | `string` | `","` | Delimiter character |
| `newlineChar` | `string` | `"\r\n"` | Newline character |
| `quoteChar` | `string` | `'"'` | Quote character |
| `escapeChar` | `string` | quoteChar | Escape character |
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { csvFormatStream } from '@datastream/csv'
await pipeline([
  createReadableStream([
    { name: 'Alice', age: 30 },
    { name: 'Bob', age: 25 },
  ]),
  csvFormatStream({ header: true }),
])
// Output: "name","age"\r\n"Alice","30"\r\n"Bob","25"\r\n
```
## `csvRemoveEmptyRowsStream` Transform
Removes rows where all fields are empty strings.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `onErrorEnqueue` | `boolean` | `false` | If `true`, empty rows are kept in stream (but still tracked) |
| `resultKey` | `string` | `"csvRemoveEmptyRows"` | Key in pipeline result |
### Result
```javascript
{ EmptyRow: { id: 'EmptyRow', message: 'Row is empty', idx: [3, 7] } }
```
## `csvRemoveMalformedRowsStream` Transform
Removes rows with an incorrect number of fields compared to the first row or the provided header.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `headers` | `string[] \| () => string[]` | — | Expected header array, or lazy function. If not provided, uses first row's field count |
| `onErrorEnqueue` | `boolean` | `false` | If `true`, malformed rows are kept in stream |
| `resultKey` | `string` | `"csvRemoveMalformedRows"` | Key in pipeline result |
### Result
```javascript
{ MalformedRow: { id: 'MalformedRow', message: 'Row has incorrect number of fields', idx: [2] } }
```
## `csvCoerceValuesStream` Transform
Converts string field values to typed JavaScript values. Works on objects (use after `objectFromEntriesStream`).
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `columns` | `object` | — | Map of column names to types. Without this, auto-coercion is used |
| `resultKey` | `string` | `"csvCoerceValues"` | Key in pipeline result |
### Auto-coercion rules
| Input | Output |
|-------|--------|
| `""` | `null` |
| `"true"` / `"false"` | `true` / `false` |
| Numeric strings | `Number` |
| ISO 8601 date strings | `Date` |
| JSON strings (`{...}`, `[...]`) | Parsed object/array |
### Column types
When specifying `columns`, valid types are: `"number"`, `"boolean"`, `"null"`, `"date"`, `"json"`.
```javascript
csvCoerceValuesStream({
  columns: { age: 'number', active: 'boolean', birthday: 'date' }
})
```
## `csvQuotedParser` / `csvUnquotedParser`
Standalone parser functions for use outside of streams. `csvUnquotedParser` is faster but does not handle quoted fields.
```javascript
import { csvQuotedParser } from '@datastream/csv'
const { rows, tail, numCols, idx, errors } = csvQuotedParser(
  'a,b,c\r\n1,2,3\r\n',
  { delimiterChar: ',', newlineChar: '\r\n', quoteChar: '"' },
  true
)
// rows: [['a','b','c'], ['1','2','3']]
```
## Full pipeline example
Detect, parse, clean, coerce, and validate:
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import {
  csvDetectDelimitersStream,
  csvDetectHeaderStream,
  csvParseStream,
  csvRemoveEmptyRowsStream,
  csvRemoveMalformedRowsStream,
  csvCoerceValuesStream,
} from '@datastream/csv'
import { objectFromEntriesStream } from '@datastream/object'
import { validateStream } from '@datastream/validate'
const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
  delimiterChar: () => detectDelimiters.result().value.delimiterChar,
  newlineChar: () => detectDelimiters.result().value.newlineChar,
  quoteChar: () => detectDelimiters.result().value.quoteChar,
  escapeChar: () => detectDelimiters.result().value.escapeChar,
})

const result = await pipeline([
  createReadableStream(csvData),
  detectDelimiters,
  detectHeader,
  csvParseStream({
    delimiterChar: () => detectDelimiters.result().value.delimiterChar,
    newlineChar: () => detectDelimiters.result().value.newlineChar,
    quoteChar: () => detectDelimiters.result().value.quoteChar,
    escapeChar: () => detectDelimiters.result().value.escapeChar,
  }),
  csvRemoveEmptyRowsStream(),
  csvRemoveMalformedRowsStream({
    headers: () => detectHeader.result().value.header,
  }),
  objectFromEntriesStream({
    keys: () => detectHeader.result().value.header,
  }),
  csvCoerceValuesStream(),
  validateStream({ schema }),
])
```
// File: packages/digest/
---
title: digest
description: Compute cryptographic hash digests while streaming data.
---
Compute cryptographic hash digests while streaming data.
## Install
```bash
npm install @datastream/digest
```
## `digestStream` PassThrough async (browser)
Computes a hash digest of all data passing through. The stream is async in the browser (returns a Promise) because it uses `hash-wasm`.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `algorithm` | `string` | — | Hash algorithm (see table below) |
| `resultKey` | `string` | `"digest"` | Key in pipeline result |
### Supported algorithms
| Algorithm | Node.js | Browser |
|-----------|---------|---------|
| `SHA2-256` | `node:crypto` | `hash-wasm` |
| `SHA2-384` | `node:crypto` | `hash-wasm` |
| `SHA2-512` | `node:crypto` | `hash-wasm` |
| `SHA3-256` | — | `hash-wasm` |
| `SHA3-384` | — | `hash-wasm` |
| `SHA3-512` | — | `hash-wasm` |
### Result
```javascript
'SHA2-256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
```
### Example
```javascript
import { pipeline } from '@datastream/core'
import { fileReadStream } from '@datastream/file'
import { digestStream } from '@datastream/digest'
// Node.js — synchronous
const digest = digestStream({ algorithm: 'SHA2-256' })
const result = await pipeline([
  fileReadStream({ path: './data.csv' }),
  digest,
])
console.log(result)
// { digest: 'SHA2-256:e3b0c4429...' }
```
```javascript
// Browser — async, must await
const digest = await digestStream({ algorithm: 'SHA2-256' })
```
// File: packages/encrypt/
---
title: encrypt
description: Symmetric encryption and decryption streams for AES-GCM, AES-CTR, and ChaCha20-Poly1305.
---
Symmetric encryption and decryption streams. Defaults to AES-256-GCM (authenticated encryption).
## Install
```bash
npm install @datastream/encrypt
```
For ChaCha20-Poly1305 in browser environments:
```bash
npm install libsodium-wrappers
```
## `encryptStream` Transform async (browser)
Encrypts data passing through. On Node.js, wraps `node:crypto` for true streaming. In the browser, uses Web Crypto API (AES-GCM buffers; AES-CTR streams).
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `algorithm` | `string` | `"AES-256-GCM"` | Encryption algorithm |
| `key` | `Uint8Array\|Buffer` | — | 32-byte encryption key |
| `iv` | `Uint8Array\|Buffer` | auto-generated | Initialization vector |
| `aad` | `Uint8Array\|Buffer` | — | Additional Authenticated Data (GCM/ChaCha only) |
| `maxInputSize` | `number` | `67108864` | Max input bytes for web AES-GCM (64MB) |
### Supported algorithms
| Algorithm | Auth | Node.js | Browser | IV size |
|-----------|------|---------|---------|---------|
| `AES-256-GCM` | authTag | `node:crypto` (streaming) | `crypto.subtle` (buffered) | 12 bytes |
| `AES-256-CTR` | none | `node:crypto` (streaming) | `crypto.subtle` (streaming) | 16 bytes |
| `CHACHA20-POLY1305` | authTag | `node:crypto` (streaming) | `libsodium-wrappers` (buffered) | 12 bytes |
### Result
```javascript
{
  algorithm: 'AES-256-GCM',
  iv: Uint8Array(12),
  authTag: Uint8Array(16) // only for GCM and ChaCha20
}
```
### Example
```javascript
import { createReadableStream, pipeline } from '@datastream/core'
import { encryptStream, generateEncryptionKey } from '@datastream/encrypt'
const key = generateEncryptionKey()
// Node.js — synchronous
const enc = encryptStream({ key })
const result = await pipeline([
  createReadableStream(data),
  enc,
])
const { iv, authTag } = result.encrypt
```
```javascript
// Browser — async, must await
const enc = await encryptStream({ key })
```
## `decryptStream` Transform async (browser)
Decrypts data encrypted by `encryptStream`.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `algorithm` | `string` | `"AES-256-GCM"` | Must match encryption algorithm |
| `key` | `Uint8Array\|Buffer` | — | Same key used for encryption |
| `iv` | `Uint8Array\|Buffer` | — | IV from `encryptStream` result |
| `authTag` | `Uint8Array\|Buffer` | — | Auth tag from result (GCM/ChaCha) |
| `aad` | `Uint8Array\|Buffer` | — | Must match encryption AAD |
| `maxOutputSize` | `number` | — | Max decrypted output bytes |
### Example
```javascript
import { pipejoin, createReadableStream, streamToString } from '@datastream/core'
import { decryptStream } from '@datastream/encrypt'

const dec = decryptStream({ key, iv, authTag })
const plaintext = await streamToString(
  pipejoin([createReadableStream(ciphertext), dec])
)
```
## `generateEncryptionKey`
Generate a cryptographically random encryption key.
```javascript
import { generateEncryptionKey } from '@datastream/encrypt'
const key = generateEncryptionKey() // 32 bytes (AES-256)
const key128 = generateEncryptionKey({ bits: 128 }) // 16 bytes (AES-128)
```
## Patterns
### Encrypt with plaintext and ciphertext digests
```javascript
import { pipeline } from '@datastream/core'
import { fileReadStream, fileWriteStream } from '@datastream/file'
import { encryptStream } from '@datastream/encrypt'
import { digestStream } from '@datastream/digest'
const result = await pipeline([
  fileReadStream({ path: './data.csv' }),
  digestStream({ algorithm: 'SHA2-256', resultKey: 'plaintextDigest' }),
  encryptStream({ key }),
  digestStream({ algorithm: 'SHA2-256', resultKey: 'ciphertextDigest' }),
  fileWriteStream({ path: './data.csv.enc' }),
])
// result.plaintextDigest — verify correct decryption
// result.ciphertextDigest — verify file integrity on disk
// result.encrypt — { algorithm, iv, authTag }
```
### Large file streaming with AES-CTR
AES-256-CTR supports true streaming on both Node.js and browser. Use it for large files where AES-GCM's buffering would cause memory issues. Pair with `digestStream` for integrity verification.
```javascript
import { encryptStream } from '@datastream/encrypt'
const enc = await encryptStream({ key, algorithm: 'AES-256-CTR' })
```
### With Additional Authenticated Data (AAD)
Bind encryption to metadata so ciphertext can't be reused in a different context.
```javascript
const aad = new TextEncoder().encode(JSON.stringify({ userId: '123' }))
const enc = encryptStream({ key, aad })
// Decryption must provide the same aad
const dec = decryptStream({ key, iv, authTag, aad })
```
// File: packages/fetch/
---
title: fetch
description: HTTP client streams with automatic pagination, rate limiting, and retry.
---
HTTP client streams with automatic pagination, rate limiting, and 429 retry.
## Install
```bash
npm install @datastream/fetch
```
## `fetchSetDefaults`
Set global defaults for all fetch streams. Mutates module-level state — not safe for concurrent multi-tenant use.
### Defaults
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `method` | `string` | `"GET"` | HTTP method |
| `headers` | `object` | `{ Accept: 'application/json', 'Accept-Encoding': 'br, gzip, deflate' }` | Request headers |
| `rateLimit` | `number` | `0.01` | Minimum seconds between requests (0.01 = 100/sec) |
| `dataPath` | `string` | — | Dot-path to data array in JSON response body |
| `nextPath` | `string` | — | Dot-path to next page URL in JSON response body |
| `qs` | `object` | `{}` | Default query string parameters |
| `offsetParam` | `string` | — | Query parameter name for offset pagination |
| `offsetAmount` | `number` | — | Increment per page for offset pagination |
### Example
```javascript
import { fetchSetDefaults } from '@datastream/fetch'
fetchSetDefaults({
  headers: { Authorization: 'Bearer token123' },
  rateLimit: 0.1, // 10 requests/sec
})
```
## `fetchReadableStream` Readable
Fetches data from one or more URLs and emits chunks. Automatically detects JSON responses and handles pagination.
Also exported as `fetchResponseStream`.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `url` | `string` | — | Request URL |
| `method` | `string` | `"GET"` | HTTP method |
| `headers` | `object` | — | Request headers (merged with defaults) |
| `rateLimit` | `number` | `0.01` | Seconds between requests |
| `dataPath` | `string` | — | Dot-path to data in JSON body |
| `nextPath` | `string` | — | Dot-path to next URL in JSON body |
| `qs` | `object` | `{}` | Query string parameters |
| `offsetParam` | `string` | — | Query param for offset-based pagination |
| `offsetAmount` | `number` | — | Offset increment per page |
Pass an array of option objects to fetch from multiple URLs in sequence.
### Pagination strategies
**Link header** — automatically followed when present:
```
Link: <https://api.example.com/users?page=2>; rel="next"
```
**Body path** — use `nextPath` to extract the next URL from the JSON response:
```javascript
fetchReadableStream({
  url: 'https://api.example.com/users',
  dataPath: 'data',
  nextPath: 'pagination.next_url',
})
```
**Offset** — use `offsetParam` and `offsetAmount` for numeric pagination:
```javascript
fetchReadableStream({
  url: 'https://api.example.com/users',
  dataPath: 'results',
  offsetParam: 'offset',
  offsetAmount: 100,
})
```
### 429 retry
When receiving a `429 Too Many Requests` response, the request is automatically retried with exponential backoff. If a `Retry-After` header is present, its value (in seconds) is used as the delay. Otherwise, the delay follows `min(1000 × 2^(attempt-1), 30000)` ms. Retries are capped at `retryMaxCount` (default: 10).
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `retryMaxCount` | `number` | `10` | Maximum retry attempts on 429 |
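The retry schedule described above can be modeled as a small function (a sketch of the documented formula, not the library's internal code):

```javascript
// Delay before retry `attempt` (1-based): honor Retry-After when present,
// otherwise exponential backoff capped at 30 seconds
const retryDelayMs = (attempt, retryAfterSeconds) =>
  retryAfterSeconds != null
    ? retryAfterSeconds * 1000
    : Math.min(1000 * 2 ** (attempt - 1), 30000)

retryDelayMs(1)     // 1000  (first retry)
retryDelayMs(5)     // 16000
retryDelayMs(10)    // 30000 (capped)
retryDelayMs(2, 7)  // 7000  (Retry-After header wins)
```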
### Example
```javascript
import { pipeline } from '@datastream/core'
import { fetchReadableStream } from '@datastream/fetch'
import { objectCountStream } from '@datastream/object'
const count = objectCountStream()
const result = await pipeline([
fetchReadableStream({
url: 'https://api.example.com/users',
dataPath: 'data',
nextPath: 'meta.next',
headers: { Authorization: 'Bearer token' },
}),
count,
])
console.log(result)
// { count: 450 }
```
### Multiple URLs
```javascript
fetchReadableStream([
{ url: 'https://api.example.com/users?status=active', dataPath: 'data' },
{ url: 'https://api.example.com/users?status=inactive', dataPath: 'data' },
])
```
## `fetchWritableStream` Writable, async
Streams data as the body of an HTTP request. Uses `duplex: "half"` for browser compatibility.
Also exported as `fetchRequestStream`.
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { fetchWritableStream } from '@datastream/fetch'
import { csvFormatStream } from '@datastream/csv'
await pipeline([
createReadableStream(data),
csvFormatStream({ header: true }),
await fetchWritableStream({
url: 'https://api.example.com/upload',
method: 'PUT',
headers: { 'Content-Type': 'text/csv' },
}),
])
```
// File: packages/file/
---
title: file
description: File read and write streams for Node.js and the browser.
---
File read and write streams for Node.js and the browser.
## Install
```bash
npm install @datastream/file
```
## `fileReadStream` Readable
Reads a file as a stream.
- **Node.js**: Uses `fs.createReadStream`
- **Browser**: Uses `window.showOpenFilePicker` (File System Access API)
### Options
| Option | Type | Description |
|--------|------|-------------|
| `path` | `string` | File path (Node.js) |
| `basePath` | `string` | When provided, enforces that `path` resolves within `basePath`. Prevents path traversal and rejects symbolic links |
| `types` | `object[]` | File type filter for the file picker (see below) |
### Example — Node.js
```javascript
import { pipeline } from '@datastream/core'
import { fileReadStream } from '@datastream/file'
await pipeline([
fileReadStream({ path: './data.csv' }),
])
```
### Example — Browser
```javascript
import { fileReadStream } from '@datastream/file'
const stream = await fileReadStream({
types: [{ accept: { 'text/csv': ['.csv'] } }],
})
```
## `fileWriteStream` Writable
Writes a stream to a file.
- **Node.js**: Uses `fs.createWriteStream`
- **Browser**: Uses `window.showSaveFilePicker` (File System Access API)
### Options
| Option | Type | Description |
|--------|------|-------------|
| `path` | `string` | File path (Node.js), suggested file name (Browser) |
| `basePath` | `string` | When provided, enforces that `path` resolves within `basePath`. Prevents path traversal and rejects symbolic links |
| `types` | `object[]` | File type filter |
### Example — Node.js
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { fileWriteStream } from '@datastream/file'
await pipeline([
createReadableStream('hello world'),
fileWriteStream({ path: './output.txt' }),
])
```
### Example — Browser
```javascript
import { fileWriteStream } from '@datastream/file'
const writable = await fileWriteStream({
path: 'output.csv',
types: [{ accept: { 'text/csv': ['.csv'] } }],
})
```
## File type filtering
The `types` option validates file extensions (Node.js) and configures the file picker dialog (Browser):
```javascript
const types = [
{
accept: {
'text/csv': ['.csv'],
'application/json': ['.json'],
},
},
]
```
On Node.js, if `types` is provided and the file extension doesn't match, an `"Invalid extension"` error is thrown.
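The Node.js extension check can be thought of as follows (a hypothetical model of the behavior described above, not the package's actual code):

```javascript
// Hypothetical model: a path is allowed when its extension appears in any
// `accept` list of the `types` filter
const isAllowedExtension = (filePath, types) => {
  const ext = filePath.slice(filePath.lastIndexOf('.')).toLowerCase()
  return types.some((t) => Object.values(t.accept).flat().includes(ext))
}

const types = [{ accept: { 'text/csv': ['.csv'], 'application/json': ['.json'] } }]
isAllowedExtension('report.csv', types) // true
isAllowedExtension('report.txt', types) // false
```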
## Security
When accepting file paths from user input, always use an absolute `path` or set `basePath` to prevent path traversal attacks (e.g., `../../etc/passwd`). Relative paths without a `basePath` constraint can resolve outside the intended directory.
`basePath` is opt-in. When provided, paths are resolved and checked with `path.resolve().startsWith(basePath)`, and symbolic links are rejected. When omitted, no path restriction is applied.
```javascript
// Restrict reads to a specific directory
fileReadStream({ path: userInput, basePath: '/data/uploads', types })
// Convenience helper for cwd-scoped reads
const safeFileRead = (path, types) =>
fileReadStream({ path, basePath: process.cwd(), types })
```
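The containment check itself can be sketched like this (a hypothetical model of the `path.resolve().startsWith(basePath)` rule, with the symlink rejection omitted):

```javascript
import path from 'node:path'

// Hypothetical model: resolve the requested path against the base directory
// and require the result to stay inside it
const isWithinBase = (requestedPath, basePath) => {
  const resolved = path.resolve(basePath, requestedPath)
  return resolved.startsWith(path.resolve(basePath) + path.sep)
}

isWithinBase('report.csv', '/data/uploads')       // true
isWithinBase('../../etc/passwd', '/data/uploads') // false
```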
// File: packages/indexeddb/
---
title: indexeddb
description: IndexedDB read and write streams for the browser.
---
IndexedDB read and write streams for the browser.
## Install
```bash
npm install @datastream/indexeddb
```
## `indexedDBConnect`
Opens (or creates) an IndexedDB database. Re-exported from the [idb](https://www.npmjs.com/package/idb) library.
```javascript
import { indexedDBConnect } from '@datastream/indexeddb'
const db = await indexedDBConnect('my-database', 1, {
upgrade(db) {
db.createObjectStore('records', { keyPath: 'id' })
},
})
```
## `indexedDBReadStream` Readable async
Reads records from an IndexedDB object store as a stream.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `db` | `IDBDatabase` | — | Database connection from `indexedDBConnect` |
| `store` | `string` | — | Object store name |
| `index` | `string` | — | Optional index name |
| `key` | `IDBKeyRange` | — | Optional key range filter |
### Example
```javascript
import { pipeline } from '@datastream/core'
import { indexedDBConnect, indexedDBReadStream } from '@datastream/indexeddb'
import { objectCountStream } from '@datastream/object'
const db = await indexedDBConnect('my-database', 1)
const count = objectCountStream()
const result = await pipeline([
await indexedDBReadStream({ db, store: 'records' }),
count,
])
console.log(result)
// { count: 100 }
```
## `indexedDBWriteStream` Writable async
Writes records to an IndexedDB object store.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `db` | `IDBDatabase` | — | Database connection from `indexedDBConnect` |
| `store` | `string` | — | Object store name |
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { indexedDBConnect, indexedDBWriteStream } from '@datastream/indexeddb'
const db = await indexedDBConnect('my-database', 1)
await pipeline([
createReadableStream([
{ id: 1, name: 'Alice' },
{ id: 2, name: 'Bob' },
]),
await indexedDBWriteStream({ db, store: 'records' }),
])
```
// File: packages/ipfs/
---
title: ipfs
description: IPFS get and add streams.
---
IPFS get and add streams.
## Install
```bash
npm install @datastream/ipfs
```
## `ipfsGetStream` Readable async
Retrieves content from IPFS by CID.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `node` | `IPFS` | — | IPFS node instance |
| `cid` | `string` | — | Content identifier |
### Example
```javascript
import { pipeline } from '@datastream/core'
import { ipfsGetStream } from '@datastream/ipfs'
await pipeline([
await ipfsGetStream({ node: ipfsNode, cid: 'QmHash...' }),
])
```
## `ipfsAddStream` PassThrough async
Adds content to IPFS and returns the CID in `.result()`.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `node` | `IPFS` | — | IPFS node instance |
| `resultKey` | `string` | `"cid"` | Key in pipeline result |
### Result
The CID of the stored content.
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { ipfsAddStream } from '@datastream/ipfs'
const result = await pipeline([
createReadableStream('Hello IPFS!'),
await ipfsAddStream({ node: ipfsNode }),
])
console.log(result)
// { cid: 'QmHash...' }
```
// File: packages/json/
---
title: json
description: JSON and NDJSON (JSON Lines) parsing and formatting streams.
---
JSON and NDJSON (Newline-Delimited JSON / JSON Lines) parsing and formatting streams.
## Install
```bash
npm install @datastream/json
```
## `ndjsonParseStream` Transform
Parses NDJSON (one JSON value per line) into individual JavaScript values. Splits on `\n`, parses each line with `JSON.parse`, and skips empty lines.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `maxBufferSize` | `number` | `16777216` (16MB) | Maximum buffer size for incomplete lines |
| `resultKey` | `string` | `"jsonErrors"` | Key in pipeline result |
### Result
Parse errors collected during processing:
```javascript
{
ParseError: { id: 'ParseError', message: 'Invalid JSON', idx: [3, 7] }
}
```
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { ndjsonParseStream } from '@datastream/json'
const parse = ndjsonParseStream()
const result = await pipeline([
createReadableStream('{"name":"Alice"}\n{"name":"Bob"}\n'),
parse,
])
console.log(result.jsonErrors)
// {}
```
## `ndjsonFormatStream` Transform
Formats objects as NDJSON text (one `JSON.stringify` per line). Batches output for throughput.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `space` | `number \| string` | — | Passed to `JSON.stringify` for pretty-printing each line |
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { ndjsonFormatStream } from '@datastream/json'
await pipeline([
createReadableStream([
{ name: 'Alice', age: 30 },
{ name: 'Bob', age: 25 },
]),
ndjsonFormatStream(),
])
// Output: {"name":"Alice","age":30}\n{"name":"Bob","age":25}\n
```
## `jsonParseStream` Transform
Parses a streaming JSON array (`[{...},{...}]`) into individual elements. Uses a state machine to track nesting depth, strings, and escapes across chunk boundaries — no external dependencies.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `maxBufferSize` | `number` | `16777216` (16MB) | Maximum buffer size for incomplete elements |
| `maxValueSize` | `number` | `16777216` (16MB) | Maximum size of a single JSON element |
| `resultKey` | `string` | `"jsonErrors"` | Key in pipeline result |
#### Buffer size protection
A JSON array with deeply nested or very large objects can force the parser to buffer significant amounts of data. `maxBufferSize` caps the total buffered memory and `maxValueSize` caps per-element memory; lower both from their defaults when parsing untrusted input.
```javascript
// Parse with a 1MB buffer limit for untrusted input
jsonParseStream({ maxBufferSize: 1 * 1024 * 1024 })
```
### Result
Parse errors collected during processing:
```javascript
{
ParseError: { id: 'ParseError', message: 'Invalid JSON', idx: [2] }
}
```
### Example
```javascript
import { pipejoin, createReadableStream, streamToArray } from '@datastream/core'
import { jsonParseStream } from '@datastream/json'
const streams = [
createReadableStream('[{"name":"Alice"},{"name":"Bob"}]'),
jsonParseStream(),
]
const output = await streamToArray(pipejoin(streams))
// [{ name: 'Alice' }, { name: 'Bob' }]
```
## `jsonFormatStream` Transform
Formats objects as a JSON array string (`[{...},{...}]`). Emits `[` with the first element, `,\n` between elements, and `]` on flush. Outputs `[]` if no elements.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `space` | `number \| string` | — | Passed to `JSON.stringify` for pretty-printing |
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { jsonFormatStream } from '@datastream/json'
await pipeline([
createReadableStream([
{ name: 'Alice', age: 30 },
{ name: 'Bob', age: 25 },
]),
jsonFormatStream({ space: 2 }),
])
// Output: [{\n "name": "Alice",\n "age": 30\n},\n{\n "name": "Bob",\n "age": 25\n}\n]
```
## Full pipeline example
Fetch NDJSON API, validate, and write as JSON array:
```javascript
import { pipeline } from '@datastream/core'
import { fetchResponseStream } from '@datastream/fetch'
import { ndjsonParseStream } from '@datastream/json'
import { validateStream } from '@datastream/validate'
import { objectCountStream } from '@datastream/object'
import { jsonFormatStream } from '@datastream/json'
import { fileWriteStream } from '@datastream/file'
const count = objectCountStream()
const parse = ndjsonParseStream()
const result = await pipeline([
fetchResponseStream({ url: 'https://api.example.com/data.ndjson' }),
parse,
validateStream({ schema }),
count,
jsonFormatStream(),
fileWriteStream({ path: './output.json' }),
])
console.log(result.count) // number of objects processed
console.log(result.jsonErrors) // any parse errors
```
// File: packages/object/
---
title: object
description: Transform, reshape, filter, and aggregate object streams.
---
Transform, reshape, filter, and aggregate object streams.
## Install
```bash
npm install @datastream/object
```
## `objectReadableStream` Readable
Creates a Readable stream from an array of objects.
```javascript
import { objectReadableStream } from '@datastream/object'
const stream = objectReadableStream([{ a: 1 }, { a: 2 }, { a: 3 }])
```
## `objectCountStream` PassThrough
Counts the number of chunks that pass through.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `resultKey` | `string` | `"count"` | Key in pipeline result |
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { objectCountStream } from '@datastream/object'
const count = objectCountStream()
const result = await pipeline([
createReadableStream([{ a: 1 }, { a: 2 }, { a: 3 }]),
count,
])
console.log(result)
// { count: 3 }
```
## `objectFromEntriesStream` Transform
Converts arrays to objects using column names. Commonly used with `csvParseStream` output.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `keys` | `string[] \| () => string[]` | — | Column names, or lazy function |
### Example
```javascript
import { objectFromEntriesStream } from '@datastream/object'
// Converts ['Alice', '30', 'Toronto'] → { name: 'Alice', age: '30', city: 'Toronto' }
objectFromEntriesStream({ keys: ['name', 'age', 'city'] })
// With lazy keys from csvDetectHeaderStream
objectFromEntriesStream({
keys: () => detectHeader.result().value.header,
})
```
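Conceptually, each row array is zipped with the column names (a hypothetical model of the conversion, not the package's code):

```javascript
// Hypothetical model of the row-to-object conversion
const rowToObject = (row, keys) =>
  Object.fromEntries(keys.map((key, i) => [key, row[i]]))

rowToObject(['Alice', '30', 'Toronto'], ['name', 'age', 'city'])
// { name: 'Alice', age: '30', city: 'Toronto' }
```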
## `objectPickStream` Transform
Keeps only the specified keys on each object.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `keys` | `string[]` | — | Keys to keep |
### Example
```javascript
import { objectPickStream } from '@datastream/object'
// { name: 'Alice', age: 30, city: 'Toronto' } → { name: 'Alice', age: 30 }
objectPickStream({ keys: ['name', 'age'] })
```
## `objectOmitStream` Transform
Removes the specified keys from each object.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `keys` | `string[]` | — | Keys to remove |
### Example
```javascript
import { objectOmitStream } from '@datastream/object'
// { name: 'Alice', age: 30, city: 'Toronto' } → { name: 'Alice' }
objectOmitStream({ keys: ['age', 'city'] })
```
## `objectKeyMapStream` Transform
Renames keys on each object. Unmapped keys are kept as-is.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `keys` | `object` | — | Map of `{ oldKey: 'newKey' }` |
### Example
```javascript
import { objectKeyMapStream } from '@datastream/object'
// { firstName: 'Alice', lastName: 'Smith' } → { first_name: 'Alice', last_name: 'Smith' }
objectKeyMapStream({ keys: { firstName: 'first_name', lastName: 'last_name' } })
```
## `objectValueMapStream` Transform
Maps values for a specific key using a lookup table.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `key` | `string` | — | Key whose value to map |
| `values` | `object` | — | Map of `{ oldValue: newValue }` |
### Example
```javascript
import { objectValueMapStream } from '@datastream/object'
// { status: 'A' } → { status: 'Active' }
objectValueMapStream({ key: 'status', values: { A: 'Active', I: 'Inactive' } })
```
## `objectKeyJoinStream` Transform
Joins multiple keys into new combined keys.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `keys` | `object` | — | Map of `{ newKey: ['key1', 'key2'] }` |
| `separator` | `string` | — | Separator for joined values |
| `isNestedObject` | `boolean` | `false` | Use `structuredClone` instead of shallow spread for cloning |
### Example
```javascript
import { objectKeyJoinStream } from '@datastream/object'
// { first: 'Alice', last: 'Smith', age: 30 } → { name: 'Alice Smith', age: 30 }
objectKeyJoinStream({ keys: { name: ['first', 'last'] }, separator: ' ' })
```
## `objectKeyValueStream` Transform
Creates a key-value pair object from two fields.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `key` | `string` | — | Field to use as the key |
| `value` | `string` | — | Field to use as the value |
### Example
```javascript
import { objectKeyValueStream } from '@datastream/object'
// { code: 'CA', country: 'Canada' } → { CA: 'Canada' }
objectKeyValueStream({ key: 'code', value: 'country' })
```
## `objectKeyValuesStream` Transform
Creates a key-values pair object — the key comes from a field, the value is a subset of the object.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `key` | `string` | — | Field to use as the key |
| `values` | `string[]` | — | Fields to include in value object. If omitted, entire object is used |
### Example
```javascript
import { objectKeyValuesStream } from '@datastream/object'
// { id: 'u1', name: 'Alice', age: 30 } → { u1: { name: 'Alice', age: 30 } }
objectKeyValuesStream({ key: 'id', values: ['name', 'age'] })
```
## `objectBatchStream` Transform
Groups consecutive objects with the same key values into arrays. Use with `objectPivotLongToWideStream`.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `keys` | `string[]` | — | Keys to group by |
### Example
```javascript
import { objectBatchStream } from '@datastream/object'
// Input: [{id:1,k:'a'}, {id:1,k:'b'}, {id:2,k:'c'}]
// Output: [{id:1,k:'a'}, {id:1,k:'b'}], [{id:2,k:'c'}]
objectBatchStream({ keys: ['id'] })
```
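The grouping rule can be modeled over a plain array (a hypothetical sketch of the behavior, not the stream implementation):

```javascript
// Hypothetical model of consecutive grouping: start a new batch whenever
// any grouping key changes from the previous object
const batchByKeys = (objects, keys) => {
  const batches = []
  for (const obj of objects) {
    const current = batches[batches.length - 1]
    if (current && keys.every((k) => current[0][k] === obj[k])) current.push(obj)
    else batches.push([obj])
  }
  return batches
}

batchByKeys([{ id: 1, k: 'a' }, { id: 1, k: 'b' }, { id: 2, k: 'c' }], ['id'])
// [[{ id: 1, k: 'a' }, { id: 1, k: 'b' }], [{ id: 2, k: 'c' }]]
```

Note that only *consecutive* matches are grouped, so sort the input by the grouping keys first if equal keys may be interleaved.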
## `objectPivotLongToWideStream` Transform
Pivots batched arrays from long to wide format. Must be used after `objectBatchStream`.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `keys` | `string[]` | — | Pivot key columns |
| `valueParam` | `string` | — | Column containing the values |
| `delimiter` | `string` | `" "` | Separator for combined key names |
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { objectBatchStream, objectPivotLongToWideStream } from '@datastream/object'
// Input: [{id:1, metric:'temp', value:20}, {id:1, metric:'humidity', value:60}]
// Output: {id:1, temp:20, humidity:60}
await pipeline([
createReadableStream(data),
objectBatchStream({ keys: ['id'] }),
objectPivotLongToWideStream({ keys: ['metric'], valueParam: 'value' }),
])
```
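The pivot of a single batch can be sketched as a plain function (a hypothetical model of the behavior described above, not the package's code):

```javascript
// Hypothetical model: pivot-key values become column names,
// `valueParam` supplies the cell values; shared fields are kept
const pivotBatch = (batch, keys, valueParam, delimiter = ' ') => {
  const { [valueParam]: _drop, ...wide } = batch[0]
  keys.forEach((k) => delete wide[k])
  for (const row of batch) {
    wide[keys.map((k) => row[k]).join(delimiter)] = row[valueParam]
  }
  return wide
}

pivotBatch(
  [{ id: 1, metric: 'temp', value: 20 }, { id: 1, metric: 'humidity', value: 60 }],
  ['metric'], 'value'
)
// { id: 1, temp: 20, humidity: 60 }
```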
## `objectPivotWideToLongStream` Transform
Pivots wide format to long format. Emits multiple chunks per input object.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `keys` | `string[]` | — | Columns to unpivot |
| `keyParam` | `string` | `"keyParam"` | Name for the new key column |
| `valueParam` | `string` | `"valueParam"` | Name for the new value column |
| `isNestedObject` | `boolean` | `false` | Use `structuredClone` instead of shallow spread for cloning |
### Example
```javascript
import { objectPivotWideToLongStream } from '@datastream/object'
// Input: { id: 1, temp: 20, humidity: 60 }
// Output: { id: 1, metric: 'temp', value: 20 }, { id: 1, metric: 'humidity', value: 60 }
objectPivotWideToLongStream({
keys: ['temp', 'humidity'],
keyParam: 'metric',
valueParam: 'value',
})
```
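The unpivot can be modeled as one output object per listed column (a hypothetical sketch, not the stream implementation):

```javascript
// Hypothetical model: for each unpivoted column, emit the shared fields
// plus a key/value pair naming the column and carrying its value
const unpivot = (obj, keys, keyParam, valueParam) => {
  const rest = { ...obj }
  keys.forEach((k) => delete rest[k])
  return keys.map((k) => ({ ...rest, [keyParam]: k, [valueParam]: obj[k] }))
}

unpivot({ id: 1, temp: 20, humidity: 60 }, ['temp', 'humidity'], 'metric', 'value')
// [{ id: 1, metric: 'temp', value: 20 }, { id: 1, metric: 'humidity', value: 60 }]
```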
## `objectSkipConsecutiveDuplicatesStream` Transform
Skips consecutive duplicate objects. Uses shallow equality by default (compares top-level values with `===`).
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `isNestedObject` | `boolean` | `false` | Use `JSON.stringify` for deep comparison instead of shallow equality |
### Example
```javascript
import { objectSkipConsecutiveDuplicatesStream } from '@datastream/object'
// Shallow (default, fast) — compares top-level values
// Input: [{a:1}, {a:1}, {a:2}, {a:1}]
// Output: [{a:1}, {a:2}, {a:1}]
objectSkipConsecutiveDuplicatesStream()
// Deep — compares nested objects via JSON.stringify
// Input: [{a:{b:1}}, {a:{b:1}}, {a:{b:2}}]
// Output: [{a:{b:1}}, {a:{b:2}}]
objectSkipConsecutiveDuplicatesStream({ isNestedObject: true })
```
// File: packages/string/
---
title: string
description: String manipulation streams for splitting, replacing, counting, and measuring text.
---
String manipulation streams — split, replace, count, and measure text data.
## Install
```bash
npm install @datastream/string
```
## `stringReadableStream` Readable
Creates a Readable stream from a string input.
```javascript
import { stringReadableStream } from '@datastream/string'
const stream = stringReadableStream('hello world')
```
## `stringLengthStream` PassThrough
Counts the total character length of all chunks.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `resultKey` | `string` | `"length"` | Key in pipeline result |
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { stringLengthStream } from '@datastream/string'
const length = stringLengthStream()
const result = await pipeline([
createReadableStream('hello world'),
length,
])
console.log(result)
// { length: 11 }
```
## `stringCountStream` PassThrough
Counts occurrences of a substring across all chunks.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `substr` | `string` | — | Substring to count |
| `resultKey` | `string` | `"count"` | Key in pipeline result |
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { stringCountStream } from '@datastream/string'
const count = stringCountStream({ substr: '\n' })
const result = await pipeline([
createReadableStream('line1\nline2\nline3'),
count,
])
console.log(result)
// { count: 2 }
```
## `stringSplitStream` Transform
Splits streaming text by a separator, emitting one chunk per segment. Handles splits that cross chunk boundaries.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `separator` | `string` | — | String to split on |
### Example
```javascript
import { pipeline, createReadableStream, streamToArray, pipejoin } from '@datastream/core'
import { stringSplitStream } from '@datastream/string'
const stream = pipejoin([
createReadableStream('alice,bob,charlie'),
stringSplitStream({ separator: ',' }),
])
const output = await streamToArray(stream)
// ['alice', 'bob', 'charlie']
```
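The boundary handling can be modeled over an array of chunks (a hypothetical sketch of the technique, not the package's code): the trailing segment of each chunk may be incomplete, so it is held back until more data arrives.

```javascript
// Hypothetical model of boundary-safe splitting across chunks
const splitAcrossChunks = (chunks, separator) => {
  const segments = []
  let buffer = ''
  for (const chunk of chunks) {
    const parts = (buffer + chunk).split(separator)
    buffer = parts.pop() // may be completed by the next chunk
    segments.push(...parts)
  }
  segments.push(buffer) // flush the remainder
  return segments
}

splitAcrossChunks(['alice,b', 'ob,charlie'], ',')
// ['alice', 'bob', 'charlie']
```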
## `stringReplaceStream` Transform
Replaces pattern matches in streaming text. Handles replacements that span chunk boundaries.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `pattern` | `string \| RegExp` | — | Pattern to search for |
| `replacement` | `string` | — | Replacement string |
### Example
```javascript
import { stringReplaceStream } from '@datastream/string'
stringReplaceStream({ pattern: /\t/g, replacement: ',' })
```
## `stringMinimumFirstChunkSize` Transform
Buffers data until the first chunk meets a minimum size, then passes all subsequent chunks through unchanged.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `chunkSize` | `number` | `1024` (1KB) | Minimum first chunk size in characters |
## `stringMinimumChunkSize` Transform
Buffers every chunk to meet a minimum size before emitting. Unlike `stringMinimumFirstChunkSize` which only buffers the first chunk then passes through, this continues buffering all subsequent chunks that are smaller than `chunkSize`.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `chunkSize` | `number` | `1024` (1KB) | Minimum chunk size in characters |
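The buffering behavior of `stringMinimumChunkSize` can be modeled over an array of chunks (a sketch of the described behavior, not the library's code):

```javascript
// Hypothetical model: coalesce input chunks until each emitted chunk is at
// least `chunkSize` characters; the final chunk may be smaller
const coalesce = (chunks, chunkSize) => {
  const out = []
  let buffer = ''
  for (const chunk of chunks) {
    buffer += chunk
    if (buffer.length >= chunkSize) {
      out.push(buffer)
      buffer = ''
    }
  }
  if (buffer) out.push(buffer) // flush whatever remains
  return out
}

coalesce(['ab', 'cd', 'e'], 3) // ['abcd', 'e']
```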
## `stringSkipConsecutiveDuplicates` Transform
Skips consecutive duplicate string chunks.
```javascript
import { stringSkipConsecutiveDuplicates } from '@datastream/string'
// Input chunks: 'a', 'a', 'b', 'a' → Output: 'a', 'b', 'a'
```
// File: packages/validate/
---
title: validate
description: JSON Schema validation for object streams using Ajv.
---
JSON Schema validation for object streams using Ajv.
## Install
```bash
npm install @datastream/validate
```
## `validateStream` Transform
Validates each object chunk against a JSON Schema. Invalid rows are dropped by default; errors are collected in `.result()`.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `schema` | `object \| function` | — | JSON Schema object, or pre-compiled validation function |
| `idxStart` | `number` | `0` | Starting row index for error tracking |
| `onErrorEnqueue` | `boolean` | `false` | If `true`, invalid rows are kept in the stream |
| `allowCoerceTypes` | `boolean` | `true` | If `false`, emits original data without Ajv coercion |
| `resultKey` | `string` | `"validate"` | Key in pipeline result |
### Result
Errors grouped by schema path:
```javascript
{
'#/required': {
id: '#/required',
keys: ['email'],
message: "must have required property 'email'",
idx: [3, 7, 15]
},
'#/properties/age/type': {
id: '#/properties/age/type',
keys: ['age'],
message: 'must be number',
idx: [5]
}
}
```
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { validateStream } from '@datastream/validate'
const schema = {
type: 'object',
required: ['name', 'age'],
properties: {
name: { type: 'string', minLength: 1 },
age: { type: 'number', minimum: 0 },
email: { type: 'string', format: 'email' },
},
additionalProperties: false,
}
const result = await pipeline([
createReadableStream([
{ name: 'Alice', age: 30, email: 'alice@example.com' },
{ name: '', age: -1 },
{ name: 'Bob', age: 25 },
]),
validateStream({ schema }),
])
console.log(result.validate)
// Errors for row 1 (name minLength, age minimum)
```
### Keep invalid rows
```javascript
validateStream({ schema, onErrorEnqueue: true })
```
### Pre-compiled schema
For better cold-start performance, pre-compile your schema during the build step:
```javascript
import { transpileSchema, validateStream } from '@datastream/validate'
const validate = transpileSchema(schema)
validateStream({ schema: validate })
```
## `transpileSchema`
Pre-compiles a JSON Schema into a validation function using Ajv.
### Ajv defaults
| Option | Default | Description |
|--------|---------|-------------|
| `strict` | `true` | Strict mode |
| `coerceTypes` | `true` | Coerce types to match schema |
| `allErrors` | `true` | Report all errors, not just the first |
| `useDefaults` | `"empty"` | Apply defaults for missing/empty values |
| `messages` | `true` | Include error messages |
```javascript
import { transpileSchema } from '@datastream/validate'
const validate = transpileSchema(schema, { coerceTypes: false })
```
// File: quick-start/
---
title: Quick Start
description: Install datastream and build your first stream pipeline in minutes.
---
## Install
```bash
npm install @datastream/core
```
Add packages as needed:
```bash
npm install @datastream/csv @datastream/validate @datastream/compress @datastream/file
```
## Your first pipeline
Read a CSV string, parse it, and collect the results:
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { csvDetectDelimitersStream, csvDetectHeaderStream, csvParseStream } from '@datastream/csv'
import { objectFromEntriesStream, objectCountStream } from '@datastream/object'
const csvData = 'name,age,city\r\nAlice,30,Toronto\r\nBob,25,Vancouver\r\nCharlie,35,Montreal'
const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
})
const count = objectCountStream()
const result = await pipeline([
createReadableStream(csvData),
detectDelimiters,
detectHeader,
csvParseStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
}),
objectFromEntriesStream({
keys: () => detectHeader.result().value.header,
}),
count,
])
console.log(result)
// { csvDetectDelimiters: { delimiterChar: ',', ... }, csvDetectHeader: { header: ['name','age','city'] }, count: 3 }
```
## Add validation
Extend the pipeline with schema validation using JSON Schema:
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { csvDetectDelimitersStream, csvDetectHeaderStream, csvParseStream } from '@datastream/csv'
import { objectFromEntriesStream } from '@datastream/object'
import { validateStream } from '@datastream/validate'
const schema = {
type: 'object',
required: ['name', 'age', 'city'],
properties: {
name: { type: 'string' },
age: { type: 'number' },
city: { type: 'string' },
},
additionalProperties: false,
}
const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
})
const result = await pipeline([
createReadableStream(csvData),
detectDelimiters,
detectHeader,
csvParseStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
}),
objectFromEntriesStream({
keys: () => detectHeader.result().value.header,
}),
validateStream({ schema }),
])
console.log(result)
// { csvDetectDelimiters: {...}, csvDetectHeader: {...}, csvErrors: {}, validate: {} }
```
## Add file I/O
Read from and write to files (Node.js):
```javascript
import { pipeline } from '@datastream/core'
import { fileReadStream, fileWriteStream } from '@datastream/file'
import { csvDetectDelimitersStream, csvDetectHeaderStream, csvParseStream, csvFormatStream } from '@datastream/csv'
import { objectFromEntriesStream } from '@datastream/object'
import { validateStream } from '@datastream/validate'
import { gzipCompressStream } from '@datastream/compress'
// `schema` is the JSON Schema defined in the previous example
const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
})
const result = await pipeline([
fileReadStream({ path: './input.csv' }),
detectDelimiters,
detectHeader,
csvParseStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
}),
objectFromEntriesStream({
keys: () => detectHeader.result().value.header,
}),
validateStream({ schema }),
csvFormatStream({ header: true }),
gzipCompressStream(),
fileWriteStream({ path: './output.csv.gz' }),
])
```
## Next steps
- Learn about [Core Concepts](/docs/core-concepts) — stream types, pipeline patterns, and error handling
- Browse [Recipes](/docs/recipes) for complete real-world examples
- Explore the [core](/docs/packages/core) package API reference