// File: --- title: Introduction description: A collection of commonly used stream patterns for the Web Streams API and Node.js Streams. slug: / --- ## What is datastream Datastream is a collection of commonly used **stream patterns** for the **Web Streams API** and **Node.js Streams**. If you're iterating over an array more than once, it's time to use streams. ## A quick example Code is better than 10,000 words, so let's jump into an example. Let's assume you want to read a CSV file, validate the data, and compress it: ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { csvParseStream } from '@datastream/csv' import { validateStream } from '@datastream/validate' import { gzipCompressStream } from '@datastream/compress' const streams = [ createReadableStream(csvData), csvParseStream({ header: true }), validateStream(schema), gzipCompressStream() ] await pipeline(streams) ``` ## Stream types - **Readable**: The start of a pipeline that injects data into a stream. - **PassThrough**: Does not modify the data, but listens and prepares a result that can be retrieved. - **Transform**: Modifies data as it passes through. - **Writable**: The end of a pipeline that stores data from the stream. ## Setup ```bash npm install @datastream/core @datastream/{module} ``` ## Why streams? Streams allow you to process data incrementally, without loading everything into memory at once. This is essential for: - **Large files**: Process gigabytes of data with constant memory usage - **Real-time data**: Handle data as it arrives, not after it's all collected - **Composability**: Chain simple operations into complex data pipelines - **Performance**: Start processing before all data is available Datastream provides ready-made stream patterns so you can focus on your data flow instead of stream plumbing. ## Next steps Ready to dive in? Head to the [Quick Start](/docs/quick-start) guide. 
// File: core-concepts/ --- title: Core Concepts description: Learn about stream types, pipeline orchestration, and naming conventions in datastream. --- ## Stream types Datastream uses four stream types. Each package export follows a naming convention that tells you the type: | Type | Naming pattern | Purpose | |------|---------------|---------| | **Readable** | `*ReadableStream`, `*ReadStream` | Injects data into a pipeline — files, databases, APIs | | **PassThrough** | `*CountStream`, `*LengthStream`, `*DetectStream` | Observes data without modifying it, collects metrics via `.result()` | | **Transform** | `*ParseStream`, `*FormatStream`, `*CompressStream` | Modifies data as it passes through | | **Writable** | `*WriteStream`, `*PutItemStream` | Consumes data at the end — files, databases, APIs | ## `pipeline()` vs `pipejoin()` ### `pipeline(streams[], streamOptions)` Connects all streams, waits for completion, and returns combined results from all PassThrough streams: ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { objectCountStream } from '@datastream/object' import { stringLengthStream } from '@datastream/string' const count = objectCountStream() const length = stringLengthStream() const result = await pipeline([ createReadableStream(['hello', 'world']), count, length, ]) console.log(result) // { count: 2, length: 10 } ``` If the last stream is Readable or Transform (no Writable at the end), `pipeline` automatically appends a no-op Writable so the pipeline completes. 
### `pipejoin(streams[])` Connects streams and returns the resulting stream — use this when you want to consume the output manually: ```javascript import { pipejoin, streamToArray, createReadableStream, createTransformStream } from '@datastream/core' const streams = [ createReadableStream([1, 2, 3]), createTransformStream((n, enqueue) => enqueue(n * 2)), ] const river = pipejoin(streams) const output = await streamToArray(river) // [2, 4, 6] ``` ## The `.result()` pattern PassThrough streams collect metrics without modifying data. After the pipeline completes, retrieve results: ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { csvParseStream } from '@datastream/csv' import { objectCountStream } from '@datastream/object' import { digestStream } from '@datastream/digest' const count = objectCountStream() const digest = await digestStream({ algorithm: 'SHA2-256' }) const result = await pipeline([ createReadableStream(data), digest, csvParseStream(), count, ]) console.log(result) // { digest: 'SHA2-256:abc123...', count: 1000 } ``` Each PassThrough stream returns `{ key, value }` from its `.result()` method. `pipeline()` combines them into a single object. 
You can customize the key with the `resultKey` option: ```javascript const count = objectCountStream({ resultKey: 'rowCount' }) // result: { rowCount: 1000 } ``` ## Stream options All stream factory functions accept a `streamOptions` parameter: | Option | Type | Default | Description | |--------|------|---------|-------------| | `highWaterMark` | `number` | — | Backpressure threshold — how many chunks to buffer before pausing | | `chunkSize` | `number` | `16384` (16KB) | Size hint for chunking strategies | | `signal` | `AbortSignal` | — | Signal to abort the pipeline | ```javascript const controller = new AbortController() await pipeline([ createReadableStream(data), csvParseStream(), ], { signal: controller.signal }) // Abort from elsewhere: controller.abort() ``` ## Error handling ### AbortSignal Use `AbortController` to cancel pipelines: ```javascript const controller = new AbortController() setTimeout(() => controller.abort(), 5000) // 5s timeout try { await pipeline(streams, { signal: controller.signal }) } catch (e) { if (e.name === 'AbortError') { console.log('Pipeline was cancelled') } } ``` ### Validation errors Streams like `validateStream` and CSV cleaning streams collect errors in `.result()` rather than throwing, so the pipeline continues processing: ```javascript const result = await pipeline([ createReadableStream(data), csvParseStream(), validateStream({ schema }), ]) console.log(result.validate) // { '#/required/name': { id: '#/required/name', keys: ['name'], message: '...', idx: [3, 7] } } ``` Invalid rows are dropped by default. Set `onErrorEnqueue: true` to keep them in the stream. ## Lazy options Many CSV streams accept functions instead of values for options. 
This lets you wire up detection results that aren't available until runtime: ```javascript const detect = csvDetectDelimitersStream() csvParseStream({ delimiterChar: () => detect.result().value.delimiterChar, }) ``` The function is called when the stream first needs the value, not at creation time. // File: examples/api-pagination-dynamodb/ --- title: API pagination to DynamoDB description: Fetch paginated API data and write to DynamoDB with datastream. --- Fetch paginated API data and write to DynamoDB: ```javascript import { pipeline } from '@datastream/core' import { fetchReadableStream } from '@datastream/fetch' import { objectCountStream } from '@datastream/object' import { awsDynamoDBPutItemStream } from '@datastream/aws' import { createTransformStream } from '@datastream/core' const count = objectCountStream() const result = await pipeline([ fetchReadableStream({ url: 'https://api.example.com/users', dataPath: 'data', nextPath: 'pagination.next_url', }), createTransformStream((item, enqueue) => { enqueue({ PK: { S: `USER#${item.id}` }, SK: { S: `PROFILE` }, name: { S: item.name }, email: { S: item.email }, }) }), count, awsDynamoDBPutItemStream({ TableName: 'Users' }), ]) console.log(result) // { count: 450 } ``` // File: examples/browser-file-processing/ --- title: Browser file processing description: Process files in the browser using the File System Access API and datastream. 
--- Use the File System Access API to process files in the browser: ```javascript import { pipeline } from '@datastream/core' import { fileReadStream, fileWriteStream } from '@datastream/file' import { csvDetectDelimitersStream, csvDetectHeaderStream, csvParseStream, csvFormatStream, } from '@datastream/csv' import { objectFromEntriesStream, objectCountStream } from '@datastream/object' const types = [{ accept: { 'text/csv': ['.csv'] } }] const detectDelimiters = csvDetectDelimitersStream() const detectHeader = csvDetectHeaderStream({ delimiterChar: () => detectDelimiters.result().value.delimiterChar, newlineChar: () => detectDelimiters.result().value.newlineChar, quoteChar: () => detectDelimiters.result().value.quoteChar, escapeChar: () => detectDelimiters.result().value.escapeChar, }) const count = objectCountStream() const result = await pipeline([ await fileReadStream({ types }), detectDelimiters, detectHeader, csvParseStream({ delimiterChar: () => detectDelimiters.result().value.delimiterChar, newlineChar: () => detectDelimiters.result().value.newlineChar, quoteChar: () => detectDelimiters.result().value.quoteChar, escapeChar: () => detectDelimiters.result().value.escapeChar, }), objectFromEntriesStream({ keys: () => detectHeader.result().value.header, }), count, csvFormatStream({ header: true }), await fileWriteStream({ path: 'output.csv', types }), ]) console.log(result) // { count: 500 } ``` // File: examples/checksum-compress/ --- title: Checksum and compress description: Calculate a digest while compressing and writing a file with datastream. 
--- Calculate a digest while compressing and writing a file: ```javascript import { pipeline } from '@datastream/core' import { fileReadStream, fileWriteStream } from '@datastream/file' import { digestStream } from '@datastream/digest' import { gzipCompressStream } from '@datastream/compress' const digest = await digestStream({ algorithm: 'SHA2-256' }) const result = await pipeline([ fileReadStream({ path: './data.csv' }), digest, gzipCompressStream(), fileWriteStream({ path: './data.csv.gz' }), ]) console.log(result) // { digest: 'SHA2-256:e3b0c44298fc1c14...' } ``` // File: examples/csv-etl/ --- title: CSV ETL — validate and compress description: Read, detect, parse, validate, re-format, compress, and write CSV files with datastream. --- Read a CSV file, detect format, parse, validate, re-format, compress, and write: ```javascript import { pipeline } from '@datastream/core' import { fileReadStream, fileWriteStream } from '@datastream/file' import { csvDetectDelimitersStream, csvDetectHeaderStream, csvParseStream, csvRemoveEmptyRowsStream, csvRemoveMalformedRowsStream, csvCoerceValuesStream, csvFormatStream, } from '@datastream/csv' import { objectFromEntriesStream } from '@datastream/object' import { validateStream } from '@datastream/validate' import { gzipCompressStream } from '@datastream/compress' const schema = { type: 'object', required: ['id', 'name', 'email'], properties: { id: { type: 'number' }, name: { type: 'string', minLength: 1 }, email: { type: 'string', format: 'email' }, }, additionalProperties: false, } const detectDelimiters = csvDetectDelimitersStream() const detectHeader = csvDetectHeaderStream({ delimiterChar: () => detectDelimiters.result().value.delimiterChar, newlineChar: () => detectDelimiters.result().value.newlineChar, quoteChar: () => detectDelimiters.result().value.quoteChar, escapeChar: () => detectDelimiters.result().value.escapeChar, }) const result = await pipeline([ fileReadStream({ path: './input.csv' }), detectDelimiters, 
detectHeader,
  csvParseStream({
    delimiterChar: () => detectDelimiters.result().value.delimiterChar,
    newlineChar: () => detectDelimiters.result().value.newlineChar,
    quoteChar: () => detectDelimiters.result().value.quoteChar,
    escapeChar: () => detectDelimiters.result().value.escapeChar,
  }),
  csvRemoveEmptyRowsStream(),
  csvRemoveMalformedRowsStream({
    headers: () => detectHeader.result().value.header,
  }),
  objectFromEntriesStream({
    keys: () => detectHeader.result().value.header,
  }),
  csvCoerceValuesStream(),
  validateStream({ schema }),
  csvFormatStream({ header: true }),
  gzipCompressStream(),
  fileWriteStream({ path: './output.csv.gz' }),
])

console.log(result)
// { csvDetectDelimiters: {...}, csvDetectHeader: {...}, csvErrors: {}, csvRemoveEmptyRows: {...}, csvRemoveMalformedRows: {...}, validate: {} }
```

// File: examples/parquet/
---
title: Parquet — read and write
description: Read and write Apache Parquet files using datastream with hyparquet and parquet-wasm.
---

### Read Parquet file into CSV

Read a Parquet file from S3, extract specific columns, and write as CSV:

```javascript
import { pipeline, streamToBuffer } from '@datastream/core'
import { awsS3GetObjectStream } from '@datastream/aws'
import { objectReadableStream } from '@datastream/object'
import { csvFormatStream } from '@datastream/csv'
import { fileWriteStream } from '@datastream/file'
import { parquetRead } from 'hyparquet'

const buffer = await streamToBuffer(
  await awsS3GetObjectStream({ Bucket: 'data-lake', Key: 'users.parquet' }),
)

const rows = []
await parquetRead({
  // hyparquet expects an AsyncBuffer-like object: { byteLength, slice }
  file: {
    byteLength: buffer.byteLength,
    slice: (start, end) => buffer.slice(start, end),
  },
  columns: ['id', 'name', 'email'],
  onComplete: (data) => rows.push(...data),
})

await pipeline([
  objectReadableStream(rows),
  csvFormatStream({ header: true }),
  fileWriteStream({ path: './users.csv' }),
])
```

### Write CSV to Parquet

Read a CSV file, parse into objects, and write as Parquet to S3:

```javascript
import { pipeline, pipejoin, streamToArray, createReadableStreamFromArrayBuffer } from '@datastream/core'
import { fileReadStream } from '@datastream/file'
import {
  csvDetectDelimitersStream,
  csvDetectHeaderStream,
  csvParseStream,
  csvCoerceValuesStream,
} from '@datastream/csv'
import { objectFromEntriesStream } from '@datastream/object'
import { awsS3PutObjectStream } from '@datastream/aws'
import { tableFromJSON, tableToIPC } from 'apache-arrow'

const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
  delimiterChar: () => detectDelimiters.result().value.delimiterChar,
  newlineChar: () => detectDelimiters.result().value.newlineChar,
  quoteChar: () => detectDelimiters.result().value.quoteChar,
  escapeChar: () => detectDelimiters.result().value.escapeChar,
})

// pipejoin returns the joined stream so the parsed rows can be collected;
// pipeline would resolve with the PassThrough results instead of the data.
const rows = await streamToArray(
  pipejoin([
    fileReadStream({ path: './users.csv' }),
    detectDelimiters,
    detectHeader,
    csvParseStream({
      delimiterChar: () => detectDelimiters.result().value.delimiterChar,
      newlineChar: () => detectDelimiters.result().value.newlineChar,
      quoteChar: () => detectDelimiters.result().value.quoteChar,
      escapeChar: () => detectDelimiters.result().value.escapeChar,
    }),
    objectFromEntriesStream({
      keys: () => detectHeader.result().value.header,
    }),
    csvCoerceValuesStream(),
  ]),
)

const table = tableFromJSON(rows)
const { Table, writeParquet } = await import('parquet-wasm')
// parquet-wasm consumes an Arrow IPC stream, not an arrow-js Table directly
const wasmTable = Table.fromIPCStream(tableToIPC(table, 'stream'))
const parquetBuffer = writeParquet(wasmTable)

await pipeline([
  createReadableStreamFromArrayBuffer(parquetBuffer),
  awsS3PutObjectStream({ Bucket: 'data-lake', Key: 'users.parquet' }),
])
```

// File: examples/s3-transform/
---
title: S3 to S3 — transform and re-upload
description: Read from S3, transform CSV data, and write back with datastream.
--- Read from S3, parse CSV, validate, re-format, and write back: ```javascript import { pipeline } from '@datastream/core' import { awsS3GetObjectStream, awsS3PutObjectStream } from '@datastream/aws' import { csvDetectDelimitersStream, csvDetectHeaderStream, csvParseStream, csvRemoveMalformedRowsStream, csvFormatStream, } from '@datastream/csv' import { objectFromEntriesStream } from '@datastream/object' import { validateStream } from '@datastream/validate' import { gzipCompressStream, gzipDecompressStream } from '@datastream/compress' const detectDelimiters = csvDetectDelimitersStream() const detectHeader = csvDetectHeaderStream({ delimiterChar: () => detectDelimiters.result().value.delimiterChar, newlineChar: () => detectDelimiters.result().value.newlineChar, quoteChar: () => detectDelimiters.result().value.quoteChar, escapeChar: () => detectDelimiters.result().value.escapeChar, }) const result = await pipeline([ await awsS3GetObjectStream({ Bucket: 'my-bucket', Key: 'input.csv.gz' }), gzipDecompressStream(), detectDelimiters, detectHeader, csvParseStream({ delimiterChar: () => detectDelimiters.result().value.delimiterChar, newlineChar: () => detectDelimiters.result().value.newlineChar, quoteChar: () => detectDelimiters.result().value.quoteChar, escapeChar: () => detectDelimiters.result().value.escapeChar, }), csvRemoveMalformedRowsStream({ headers: () => detectHeader.result().value.header, }), objectFromEntriesStream({ keys: () => detectHeader.result().value.header, }), validateStream({ schema }), csvFormatStream({ header: true }), gzipCompressStream(), awsS3PutObjectStream({ Bucket: 'my-bucket', Key: 'output.csv.gz' }), ]) ``` // File: packages/aws/ --- title: aws description: AWS service streams for CloudWatch Logs, DynamoDB, Kinesis, Lambda, S3, SNS, and SQS. --- AWS service streams for CloudWatch Logs, DynamoDB, Kinesis, Lambda, S3, SNS, and SQS. Node.js only. 
## Install ```bash npm install @datastream/aws ``` Requires the corresponding AWS SDK v3 client packages: ```bash npm install @aws-sdk/client-cloudwatch-logs npm install @aws-sdk/client-dynamodb npm install @aws-sdk/client-kinesis npm install @aws-sdk/client-lambda npm install @aws-sdk/client-s3 @aws-sdk/lib-storage npm install @aws-sdk/client-sns npm install @aws-sdk/client-sqs ``` ## CloudWatch Logs ### `awsCloudWatchLogsSetClient` Mutates module-level state — not safe for concurrent multi-tenant use. ```javascript import { CloudWatchLogsClient } from '@aws-sdk/client-cloudwatch-logs' import { awsCloudWatchLogsSetClient } from '@datastream/aws' awsCloudWatchLogsSetClient(new CloudWatchLogsClient({ region: 'us-east-1' })) ``` ### `awsCloudWatchLogsGetLogEventsStream` Readable async Gets log events from a CloudWatch Logs log stream. Auto-paginates until no new events are returned. #### Options Accepts `GetLogEventsCommand` parameters plus: | Option | Type | Default | Description | |--------|------|---------|-------------| | `logGroupName` | `string` | — | Log group name | | `logStreamName` | `string` | — | Log stream name | | `pollingActive` | `boolean` | `false` | Keep polling for new events | | `pollingDelay` | `number` | `1000` | Delay (ms) between polls when no new events | #### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { awsCloudWatchLogsGetLogEventsStream } from '@datastream/aws' await pipeline([ createReadableStream(await awsCloudWatchLogsGetLogEventsStream({ logGroupName: '/aws/lambda/my-function', logStreamName: 'stream-id', })), ]) ``` ### `awsCloudWatchLogsFilterLogEventsStream` Readable async Filters log events across log streams in a log group. Auto-paginates through all matching results. 
#### Options Accepts `FilterLogEventsCommand` parameters: | Option | Type | Description | |--------|------|-------------| | `logGroupName` | `string` | Log group name | | `filterPattern` | `string` | CloudWatch Logs filter pattern | | `startTime` | `number` | Start of time range (epoch ms) | | `endTime` | `number` | End of time range (epoch ms) | #### Example ```javascript import { awsCloudWatchLogsFilterLogEventsStream } from '@datastream/aws' const events = await awsCloudWatchLogsFilterLogEventsStream({ logGroupName: '/aws/lambda/my-function', filterPattern: 'ERROR', }) ``` ## S3 ### `awsS3SetClient` Set a custom S3 client. By default, FIPS endpoints are enabled for US and CA regions. Mutates module-level state — not safe for concurrent multi-tenant use. ```javascript import { S3Client } from '@aws-sdk/client-s3' import { awsS3SetClient } from '@datastream/aws' awsS3SetClient(new S3Client({ region: 'eu-west-1' })) ``` ### `awsS3GetObjectStream` Readable async Downloads an object from S3 as a stream. #### Options Accepts all `GetObjectCommand` parameters plus: | Option | Type | Default | Description | |--------|------|---------|-------------| | `client` | `S3Client` | default client | Custom S3 client for this call | | `Bucket` | `string` | — | S3 bucket name | | `Key` | `string` | — | S3 object key | #### Example ```javascript import { pipeline } from '@datastream/core' import { awsS3GetObjectStream } from '@datastream/aws' import { csvParseStream } from '@datastream/csv' await pipeline([ await awsS3GetObjectStream({ Bucket: 'my-bucket', Key: 'data.csv' }), csvParseStream(), ]) ``` ### `awsS3PutObjectStream` PassThrough Uploads data to S3 using multipart upload. 
#### Options Accepts all S3 PutObject parameters plus: | Option | Type | Default | Description | |--------|------|---------|-------------| | `client` | `S3Client` | default client | Custom S3 client | | `Bucket` | `string` | — | S3 bucket name | | `Key` | `string` | — | S3 object key | | `onProgress` | `function` | — | Upload progress callback | | `tags` | `object` | — | S3 object tags | #### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { awsS3PutObjectStream } from '@datastream/aws' import { gzipCompressStream } from '@datastream/compress' await pipeline([ createReadableStream(data), gzipCompressStream(), awsS3PutObjectStream({ Bucket: 'my-bucket', Key: 'output.csv.gz' }), ]) ``` ### `awsS3ChecksumStream` PassThrough Computes a multi-part S3 checksum while data passes through. Designed for pre-signed URL uploads in the browser. #### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `ChecksumAlgorithm` | `string` | `"SHA256"` | `"SHA1"` or `"SHA256"` | | `partSize` | `number` | `17179870` | Part size in bytes | | `resultKey` | `string` | `"s3"` | Key in pipeline result | #### Result ```javascript { checksum: 'base64hash-3', checksums: ['part1hash', 'part2hash', 'part3hash'], partSize: 17179870 } ``` ## DynamoDB ### `awsDynamoDBSetClient` Mutates module-level state — not safe for concurrent multi-tenant use. ```javascript import { DynamoDBClient } from '@aws-sdk/client-dynamodb' import { awsDynamoDBSetClient } from '@datastream/aws' awsDynamoDBSetClient(new DynamoDBClient({ region: 'us-east-1' })) ``` ### `awsDynamoDBQueryStream` Readable async Queries a DynamoDB table and auto-paginates through all results. 
#### Options Accepts all `QueryCommand` parameters: | Option | Type | Description | |--------|------|-------------| | `TableName` | `string` | DynamoDB table name | | `KeyConditionExpression` | `string` | Query key condition | | `ExpressionAttributeValues` | `object` | Expression values | #### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { awsDynamoDBQueryStream } from '@datastream/aws' await pipeline([ createReadableStream(await awsDynamoDBQueryStream({ TableName: 'Users', KeyConditionExpression: 'PK = :pk', ExpressionAttributeValues: { ':pk': { S: 'USER#123' } }, })), ]) ``` ### `awsDynamoDBScanStream` Readable async Scans an entire DynamoDB table with automatic pagination. ```javascript import { awsDynamoDBScanStream } from '@datastream/aws' const items = await awsDynamoDBScanStream({ TableName: 'Users' }) ``` ### `awsDynamoDBExecuteStatementStream` Readable async Executes a PartiQL statement against DynamoDB with automatic pagination. #### Options Accepts `ExecuteStatementCommand` parameters: | Option | Type | Description | |--------|------|-------------| | `Statement` | `string` | PartiQL statement | | `Parameters` | `object[]` | Statement parameters | #### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { awsDynamoDBExecuteStatementStream } from '@datastream/aws' await pipeline([ createReadableStream(await awsDynamoDBExecuteStatementStream({ Statement: 'SELECT * FROM "Users" WHERE PK = ?', Parameters: [{ S: 'USER#123' }], })), ]) ``` ### `awsDynamoDBGetItemStream` Readable async Batch gets items by keys. Automatically retries unprocessed keys with exponential backoff. 
#### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `TableName` | `string` | — | DynamoDB table name | | `Keys` | `object[]` | — | Array of key objects | | `retryMaxCount` | `number` | `10` | Maximum retry attempts | ### `awsDynamoDBPutItemStream` Writable Writes items to DynamoDB using `BatchWriteItem`. Automatically batches 25 items per request and retries unprocessed items with exponential backoff. #### Options | Option | Type | Description | |--------|------|-------------| | `TableName` | `string` | DynamoDB table name | | `retryMaxCount` | `number` | Maximum retry attempts (default 10) | #### Example ```javascript import { pipeline, createReadableStream, createTransformStream } from '@datastream/core' import { awsDynamoDBPutItemStream } from '@datastream/aws' await pipeline([ createReadableStream(items), createTransformStream((item, enqueue) => { enqueue({ PK: { S: `USER#${item.id}` }, SK: { S: 'PROFILE' }, name: { S: item.name }, }) }), awsDynamoDBPutItemStream({ TableName: 'Users' }), ]) ``` ### `awsDynamoDBDeleteItemStream` Writable Deletes items from DynamoDB using `BatchWriteItem`. Batches 25 items per request. #### Options | Option | Type | Description | |--------|------|-------------| | `TableName` | `string` | DynamoDB table name | | `retryMaxCount` | `number` | Maximum retry attempts (default 10) | #### Example ```javascript import { awsDynamoDBDeleteItemStream } from '@datastream/aws' awsDynamoDBDeleteItemStream({ TableName: 'Users' }) // Input chunks: { PK: { S: 'USER#1' }, SK: { S: 'PROFILE' } } ``` ## Kinesis ### `awsKinesisSetClient` Mutates module-level state — not safe for concurrent multi-tenant use. ```javascript import { KinesisClient } from '@aws-sdk/client-kinesis' import { awsKinesisSetClient } from '@datastream/aws' awsKinesisSetClient(new KinesisClient({ region: 'us-east-1' })) ``` ### `awsKinesisGetRecordsStream` Readable async Gets records from a Kinesis shard. 
Polls until no more records are returned. #### Options Accepts `GetRecordsCommand` parameters plus: | Option | Type | Default | Description | |--------|------|---------|-------------| | `ShardIterator` | `string` | — | Shard iterator | | `pollingActive` | `boolean` | `false` | Keep polling for new records | | `pollingDelay` | `number` | `1000` | Delay (ms) between polls when no records | #### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { awsKinesisGetRecordsStream } from '@datastream/aws' await pipeline([ createReadableStream(await awsKinesisGetRecordsStream({ ShardIterator: 'AAA...', })), ]) ``` ### `awsKinesisPutRecordsStream` Writable Writes records to a Kinesis stream. Batches 500 records per `PutRecordsCommand`. #### Options | Option | Type | Description | |--------|------|-------------| | `StreamName` | `string` | Kinesis stream name | #### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { awsKinesisPutRecordsStream } from '@datastream/aws' await pipeline([ createReadableStream(records), awsKinesisPutRecordsStream({ StreamName: 'my-stream' }), ]) ``` ## Lambda ### `awsLambdaSetClient` Mutates module-level state — not safe for concurrent multi-tenant use. ```javascript import { LambdaClient } from '@aws-sdk/client-lambda' import { awsLambdaSetClient } from '@datastream/aws' awsLambdaSetClient(new LambdaClient({ region: 'us-east-1' })) ``` ### `awsLambdaReadableStream` Readable Invokes a Lambda function with response streaming (`InvokeWithResponseStream`). Also exported as `awsLambdaResponseStream`. #### Options Accepts `InvokeWithResponseStreamCommand` parameters. Pass an array to invoke multiple functions sequentially. 
| Option | Type | Description | |--------|------|-------------| | `FunctionName` | `string` | Lambda function name or ARN | | `Payload` | `string` | JSON payload | #### Example ```javascript import { pipeline } from '@datastream/core' import { awsLambdaReadableStream } from '@datastream/aws' import { csvParseStream } from '@datastream/csv' await pipeline([ awsLambdaReadableStream({ FunctionName: 'data-processor', Payload: JSON.stringify({ key: 'input.csv' }), }), csvParseStream(), ]) ``` ## SNS ### `awsSNSSetClient` Mutates module-level state — not safe for concurrent multi-tenant use. ```javascript import { SNSClient } from '@aws-sdk/client-sns' import { awsSNSSetClient } from '@datastream/aws' awsSNSSetClient(new SNSClient({ region: 'us-east-1' })) ``` ### `awsSNSPublishMessageStream` Writable Publishes messages to an SNS topic. Batches 10 messages per `PublishBatchCommand`. #### Options | Option | Type | Description | |--------|------|-------------| | `TopicArn` | `string` | SNS topic ARN | #### Example ```javascript import { pipeline, createReadableStream, createTransformStream } from '@datastream/core' import { awsSNSPublishMessageStream } from '@datastream/aws' await pipeline([ createReadableStream(events), createTransformStream((event, enqueue) => { enqueue({ Id: event.id, Message: JSON.stringify(event), }) }), awsSNSPublishMessageStream({ TopicArn: 'arn:aws:sns:us-east-1:123:my-topic' }), ]) ``` ## SQS ### `awsSQSSetClient` Mutates module-level state — not safe for concurrent multi-tenant use. ```javascript import { SQSClient } from '@aws-sdk/client-sqs' import { awsSQSSetClient } from '@datastream/aws' awsSQSSetClient(new SQSClient({ region: 'us-east-1' })) ``` ### `awsSQSReceiveMessageStream` Readable async Polls an SQS queue and yields messages until the queue is empty. 
#### Options Accepts `ReceiveMessageCommand` parameters plus: | Option | Type | Default | Description | |--------|------|---------|-------------| | `QueueUrl` | `string` | — | SQS queue URL | | `MaxNumberOfMessages` | `number` | — | Max messages per poll (1-10) | | `pollingActive` | `boolean` | `false` | Keep polling even when queue is empty | | `pollingDelay` | `number` | `1000` | Delay (ms) between polls when queue is empty | #### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { awsSQSReceiveMessageStream, awsSQSDeleteMessageStream } from '@datastream/aws' await pipeline([ createReadableStream(await awsSQSReceiveMessageStream({ QueueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue', })), awsSQSDeleteMessageStream({ QueueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue', }), ]) ``` ### `awsSQSSendMessageStream` Writable Sends messages to an SQS queue. Batches 10 messages per `SendMessageBatchCommand`. #### Options | Option | Type | Description | |--------|------|-------------| | `QueueUrl` | `string` | SQS queue URL | ### `awsSQSDeleteMessageStream` Writable Deletes messages from an SQS queue. Batches 10 messages per `DeleteMessageBatchCommand`. #### Options | Option | Type | Description | |--------|------|-------------| | `QueueUrl` | `string` | SQS queue URL | // File: packages/base64/ --- title: base64 description: Base64 encoding and decoding streams. --- Base64 encoding and decoding streams. ## Install ```bash npm install @datastream/base64 ``` ## `base64EncodeStream` Transform Encodes data to base64. Handles chunk boundaries correctly by buffering partial 3-byte groups. > **Note:** Input must be Latin1-encoded strings (code points 0-255). Use `charsetEncodeStream` to convert other encodings before base64 encoding. 
### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { base64EncodeStream } from '@datastream/base64' await pipeline([ createReadableStream('Hello, World!'), base64EncodeStream(), ]) ``` ## `base64DecodeStream` Transform Decodes base64 data back to its original form. Handles chunk boundaries by buffering partial 4-character groups. ### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { base64DecodeStream } from '@datastream/base64' await pipeline([ createReadableStream('SGVsbG8sIFdvcmxkIQ=='), base64DecodeStream(), ]) ``` // File: packages/charset/ --- title: charset description: Character set detection, decoding, and encoding streams. --- Character set detection, decoding, and encoding streams. ## Install ```bash npm install @datastream/charset ``` ## `charsetDetectStream` PassThrough Detects the character encoding of the data passing through by analyzing byte patterns. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `resultKey` | `string` | `"charset"` | Key in pipeline result | ### Result Returns the most likely charset with confidence score: ```javascript { charset: 'UTF-8', confidence: 80 } ``` ### Supported charsets UTF-8, UTF-16BE, UTF-16LE, UTF-32BE, UTF-32LE, Shift_JIS, ISO-2022-JP, ISO-2022-CN, ISO-2022-KR, GB18030, EUC-JP, EUC-KR, Big5, ISO-8859-1, ISO-8859-2, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, windows-1250, windows-1251, windows-1252, windows-1254, windows-1256, KOI8-R ### Example ```javascript import { pipeline } from '@datastream/core' import { fileReadStream } from '@datastream/file' import { charsetDetectStream, charsetDecodeStream } from '@datastream/charset' const detect = charsetDetectStream() const result = await pipeline([ fileReadStream({ path: './data.csv' }), detect, ]) console.log(result.charset) // { charset: 'UTF-8', confidence: 80 } ``` ## `charsetDecodeStream` Transform 
Decodes binary data to text using the specified character encoding. Uses `TextDecoderStream` internally. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `charset` | `string` | — | Character encoding name (e.g. `"UTF-8"`, `"ISO-8859-1"`) | ### Example ```javascript import { charsetDecodeStream } from '@datastream/charset' charsetDecodeStream({ charset: 'ISO-8859-1' }) ``` ## `charsetEncodeStream` Transform Encodes text to binary using the specified character encoding. Uses `TextEncoderStream` internally. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `charset` | `string` | — | Character encoding name | ### Example ```javascript import { charsetEncodeStream } from '@datastream/charset' charsetEncodeStream({ charset: 'UTF-8' }) ``` // File: packages/compress/ --- title: compress description: Compression and decompression streams for gzip, deflate, brotli, and zstd. --- Compression and decompression streams for gzip, deflate, brotli, and zstd. ## Install ```bash npm install @datastream/compress ``` ## gzip ### `gzipCompressStream` Transform | Option | Type | Default | Description | |--------|------|---------|-------------| | `quality` | `number` | `-1` | Compression level (-1 to 9). -1 = default, 0 = none, 9 = best | | `maxOutputSize` | `number` | — | Maximum compressed output in bytes (web variant) | ### `gzipDecompressStream` Transform | Option | Type | Default | Description | |--------|------|---------|-------------| | `maxOutputSize` | `number` | — | Maximum decompressed output in bytes. 
Destroys the stream with an error when exceeded | ### Example ```javascript import { pipeline } from '@datastream/core' import { fileReadStream, fileWriteStream } from '@datastream/file' import { gzipCompressStream, gzipDecompressStream } from '@datastream/compress' // Compress await pipeline([ fileReadStream({ path: './data.csv' }), gzipCompressStream({ quality: 9 }), fileWriteStream({ path: './data.csv.gz' }), ]) // Decompress await pipeline([ fileReadStream({ path: './data.csv.gz' }), gzipDecompressStream(), fileWriteStream({ path: './data.csv' }), ]) ``` ## deflate ### `deflateCompressStream` Transform | Option | Type | Default | Description | |--------|------|---------|-------------| | `quality` | `number` | `-1` | Compression level (-1 to 9) | | `maxOutputSize` | `number` | — | Maximum compressed output in bytes (web variant) | ### `deflateDecompressStream` Transform | Option | Type | Default | Description | |--------|------|---------|-------------| | `maxOutputSize` | `number` | — | Maximum decompressed output in bytes. Destroys the stream with an error when exceeded | ## brotli ### `brotliCompressStream` Transform | Option | Type | Default | Description | |--------|------|---------|-------------| | `quality` | `number` | `11` | Compression level (0 to 11) | ### `brotliDecompressStream` Transform | Option | Type | Default | Description | |--------|------|---------|-------------| | `maxOutputSize` | `number` | — | Maximum decompressed output in bytes. Destroys the stream with an error when exceeded | ## zstd Node.js only Requires Node.js with zstd support. ### `zstdCompressStream` Transform | Option | Type | Default | Description | |--------|------|---------|-------------| | `quality` | `number` | `3` | Compression level | ### `zstdDecompressStream` Transform | Option | Type | Default | Description | |--------|------|---------|-------------| | `maxOutputSize` | `number` | — | Maximum decompressed output in bytes. 
Destroys the stream with an error when exceeded |

## Output size protection

### Decompression bombs

A malicious compressed payload known as a "decompression bomb" can be as small as a few kilobytes but expand to gigabytes when decompressed, exhausting memory and crashing the process. Setting `maxOutputSize` ensures decompression is aborted before memory is exhausted. Always set this when decompressing untrusted input.

```javascript
import { gzipDecompressStream } from '@datastream/compress'

// Limit decompressed output to 100MB
gzipDecompressStream({ maxOutputSize: 100 * 1024 * 1024 })
```

### Compression output limits

Compression streams also support `maxOutputSize` (web variant) to cap compressed output size. This can be useful to enforce storage limits.

```javascript
import { gzipCompressStream } from '@datastream/compress'

// Limit compressed output to 50MB
gzipCompressStream({ maxOutputSize: 50 * 1024 * 1024 })
```

## Platform support

| Algorithm | Node.js | Browser |
|-----------|---------|---------|
| gzip | `node:zlib` | `CompressionStream` |
| deflate | `node:zlib` | `CompressionStream` |
| brotli | `node:zlib` | `CompressionStream` |
| zstd | `node:zlib` | Not supported |

// File: packages/core/
---
title: core
description: Pipeline orchestration, stream factories, and utility functions for datastream.
---

Foundation package providing pipeline orchestration, stream factories, and utility functions.

## Install

```bash
npm install @datastream/core
```

## Pipeline

### `pipeline(streams, streamOptions)` async

Connects all streams, waits for completion, and returns combined `.result()` values from all PassThrough streams. Automatically appends a no-op Writable if the last stream is Readable or Transform.
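The appended no-op sink can be pictured with plain Web Streams. This is an illustrative sketch of the pattern, not the library's actual code:

```javascript
// Sketch of a no-op sink like the one `pipeline` appends when a pipeline
// has no terminal Writable (illustrative only, not the library's code).
const noopWritableStream = () => new WritableStream({ write () {} })

// A pipeline ending in a Readable has nowhere to drain; piping into the
// no-op sink pulls every chunk through so the pipeline can complete.
const source = new ReadableStream({
  start (controller) {
    for (const chunk of ['a', 'b', 'c']) controller.enqueue(chunk)
    controller.close()
  }
})

const done = source.pipeTo(noopWritableStream()).then(() => 'complete')
```

Without such a sink, PassThrough streams upstream would never see data, because Web Streams only flow when something consumes them.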
#### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `highWaterMark` | `number` | — | Backpressure threshold | | `chunkSize` | `number` | — | Size hint for chunking | | `signal` | `AbortSignal` | — | Abort the pipeline | #### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { objectCountStream } from '@datastream/object' const count = objectCountStream() const result = await pipeline([ createReadableStream([{ a: 1 }, { a: 2 }, { a: 3 }]), count, ]) console.log(result) // { count: 3 } ``` ### `pipejoin(streams)` returns stream Connects streams and returns the resulting stream. Use this when you want to consume output manually with `streamToArray`, `streamToString`, or `for await`. #### Example ```javascript import { pipejoin, streamToArray, createReadableStream, createTransformStream } from '@datastream/core' const river = pipejoin([ createReadableStream([1, 2, 3]), createTransformStream((n, enqueue) => enqueue(n * 2)), ]) const output = await streamToArray(river) // [2, 4, 6] ``` ### `result(streams)` async Iterates over streams and combines all `.result()` return values into a single object. Called automatically by `pipeline()`. ## Consumers ### `streamToArray(stream)` async Collects all chunks from a stream into an array. ```javascript import { pipejoin, streamToArray, createReadableStream } from '@datastream/core' const river = pipejoin([createReadableStream(['a', 'b', 'c'])]) const output = await streamToArray(river) // ['a', 'b', 'c'] ``` ### `streamToString(stream)` async Concatenates all chunks into a single string. ```javascript const output = await streamToString(river) // 'abc' ``` ### `streamToObject(stream)` async Merges all chunks into a single object using `Object.assign`. 
```javascript
const river = pipejoin([createReadableStream([{ a: 1 }, { b: 2 }])])
const output = await streamToObject(river) // { a: 1, b: 2 }
```

### `streamToBuffer(stream)` async Node.js only

Collects all chunks into a `Buffer`.

## Stream Factories

### `createReadableStream(input, streamOptions)` Readable

Creates a Readable stream from various input types.

#### Input types

| Type | Behavior |
|------|----------|
| `string` | Chunked at `chunkSize` (default 16KB) |
| `Array` | Each element emitted as a chunk |
| `AsyncIterable` / `Iterable` | Each yielded value emitted as a chunk |
| `ArrayBuffer` | Chunked at `chunkSize` (Node.js only) |

#### Example

```javascript
import { createReadableStream } from '@datastream/core'

// From string — auto-chunked
const stringStream = createReadableStream('hello world')

// From array — one chunk per element
const arrayStream = createReadableStream([{ a: 1 }, { a: 2 }])

// From async generator
async function* generate() {
  yield 'chunk1'
  yield 'chunk2'
}
const generatorStream = createReadableStream(generate())
```

### `createReadableStreamFromString(input, streamOptions)` Readable

Creates a Readable stream from a string, chunking it at `chunkSize` (default 16KB). Useful when you need explicit control over string chunking separate from `createReadableStream`.

```javascript
import { createReadableStreamFromString } from '@datastream/core'

const stream = createReadableStreamFromString(largeString, { chunkSize: 4096 })
```

### `createReadableStreamFromArrayBuffer(input, streamOptions)` Readable

Creates a Readable stream from an `ArrayBuffer` or `Uint8Array`, chunking it at `chunkSize` (default 16KB).

```javascript
import { createReadableStreamFromArrayBuffer } from '@datastream/core'

const stream = createReadableStreamFromArrayBuffer(buffer, { chunkSize: 8192 })
```

### `createPassThroughStream(fn, flush?, streamOptions)` Transform (PassThrough)

Creates a stream that observes each chunk without modifying it. The chunk is automatically passed through.
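The pass-through behavior amounts to observing each chunk and re-enqueueing it unchanged. The sketch below shows the pattern on a plain `TransformStream`; it is illustrative, not the library's implementation:

```javascript
// A PassThrough observer built on a plain TransformStream: the observer
// function sees every chunk, and each chunk is forwarded unchanged.
// (Illustrative sketch, not the library's implementation.)
const passThrough = (fn) =>
  new TransformStream({
    transform (chunk, controller) {
      fn(chunk)                 // observe
      controller.enqueue(chunk) // forward unchanged
    }
  })

let total = 0
const counter = passThrough((chunk) => { total += chunk.length })

const source = new ReadableStream({
  start (controller) {
    controller.enqueue('hello')
    controller.enqueue('world')
    controller.close()
  }
})

const received = []
const done = source
  .pipeThrough(counter)
  .pipeTo(new WritableStream({ write (chunk) { received.push(chunk) } }))
```

Because the observer's return value is ignored, it cannot accidentally alter the data flowing downstream.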
#### Parameters | Parameter | Type | Description | |-----------|------|-------------| | `fn` | `(chunk) => void` | Called for each chunk, return value ignored | | `flush` | `() => void` | Optional, called when stream ends | | `streamOptions` | `object` | Stream configuration (supports `signal` for abort) | #### Example ```javascript import { createPassThroughStream } from '@datastream/core' let total = 0 const counter = createPassThroughStream((chunk) => { total += chunk.length }) counter.result = () => ({ key: 'total', value: total }) ``` ### `createTransformStream(fn, flush?, streamOptions)` Transform Creates a stream that modifies chunks. Use `enqueue` to emit output — you can emit zero, one, or many chunks per input. #### Parameters | Parameter | Type | Description | |-----------|------|-------------| | `fn` | `(chunk, enqueue) => void` | Transform each chunk, call `enqueue(output)` to emit | | `flush` | `(enqueue) => void` | Optional, emit final chunks when stream ends | | `streamOptions` | `object` | Stream configuration (supports `signal` for abort) | #### Example ```javascript import { createTransformStream } from '@datastream/core' // Filter: emit only matching chunks const filter = createTransformStream((chunk, enqueue) => { if (chunk.age > 18) enqueue(chunk) }) // Expand: emit multiple chunks per input const expand = createTransformStream((chunk, enqueue) => { for (const item of chunk.items) { enqueue(item) } }) ``` ### `createWritableStream(fn, close?, streamOptions)` Writable Creates a stream that consumes chunks at the end of a pipeline. 
#### Parameters | Parameter | Type | Description | |-----------|------|-------------| | `fn` | `(chunk) => void` | Called for each chunk | | `close` | `() => void` | Optional, called when stream ends | | `streamOptions` | `object` | Stream configuration (supports `signal` for abort) | #### Example ```javascript import { createWritableStream } from '@datastream/core' const rows = [] const collector = createWritableStream((chunk) => { rows.push(chunk) }) ``` ## Utilities ### `isReadable(stream)` Returns `true` if the stream is Readable. ### `isWritable(stream)` Returns `true` if the stream is Writable. ### `makeOptions(options)` Normalizes stream options for interoperability between Readable, Transform, and Writable streams. | Option | Type | Description | |--------|------|-------------| | `highWaterMark` | `number` | Backpressure threshold | | `chunkSize` | `number` | Chunking size hint | | `signal` | `AbortSignal` | Abort signal | ### `timeout(ms, options)` async Returns a promise that resolves after `ms` milliseconds. Supports `AbortSignal` cancellation. ```javascript import { timeout } from '@datastream/core' await timeout(1000) // wait 1 second const controller = new AbortController() await timeout(5000, { signal: controller.signal }) // cancellable ``` ### `backpressureGauge(streams)` Node.js only Measures pause/resume timing across streams. Useful for identifying bottlenecks. ```javascript import { backpressureGauge } from '@datastream/core' const metrics = backpressureGauge({ parse: parseStream, validate: validateStream }) // After pipeline completes: // metrics.parse.total = { timestamp, duration } // metrics.parse.timeline = [{ timestamp, duration }, ...] ``` // File: packages/csv/ --- title: csv description: Parse, format, detect, clean, and coerce CSV data streams. --- Parse, format, detect, clean, and coerce CSV data. 
## Install ```bash npm install @datastream/csv ``` ## `csvDetectDelimitersStream` PassThrough Auto-detects the delimiter, newline, quote, and escape characters from the first chunk of data. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `chunkSize` | `number` | `1024` (1KB) | Minimum bytes to buffer before detecting | | `resultKey` | `string` | `"csvDetectDelimiters"` | Key in pipeline result | ### Result ```javascript { delimiterChar: ',', newlineChar: '\r\n', quoteChar: '"', escapeChar: '"' } ``` ### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { csvDetectDelimitersStream } from '@datastream/csv' const detect = csvDetectDelimitersStream() const result = await pipeline([ createReadableStream('name\tage\r\nAlice\t30'), detect, ]) console.log(result.csvDetectDelimiters) // { delimiterChar: '\t', newlineChar: '\r\n', quoteChar: '"', escapeChar: '"' } ``` ## `csvDetectHeaderStream` Transform Detects and strips the header row. Outputs data rows only (without the header). 
### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `chunkSize` | `number` | `1024` (1KB) | Minimum bytes to buffer before detecting | | `delimiterChar` | `string \| () => string` | `","` | Delimiter character or lazy function | | `newlineChar` | `string \| () => string` | `"\r\n"` | Newline character or lazy function | | `quoteChar` | `string \| () => string` | `'"'` | Quote character or lazy function | | `escapeChar` | `string \| () => string` | quoteChar | Escape character or lazy function | | `parser` | `function` | `csvQuotedParser` | Custom parser function | | `resultKey` | `string` | `"csvDetectHeader"` | Key in pipeline result | ### Result ```javascript { header: ['name', 'age', 'city'] } ``` ### Example ```javascript import { csvDetectDelimitersStream, csvDetectHeaderStream } from '@datastream/csv' const detectDelimiters = csvDetectDelimitersStream() const detectHeader = csvDetectHeaderStream({ delimiterChar: () => detectDelimiters.result().value.delimiterChar, newlineChar: () => detectDelimiters.result().value.newlineChar, quoteChar: () => detectDelimiters.result().value.quoteChar, escapeChar: () => detectDelimiters.result().value.escapeChar, }) ``` ## `csvParseStream` Transform Parses CSV text into arrays of field values (string arrays). Each output chunk is one row as `string[]`. 
### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `chunkSize` | `number` | `2097152` (2MB) | Input buffer size before first parse | | `fieldMaxSize` | `number` | `16777216` (16MB) | Maximum size of a single field in bytes | | `delimiterChar` | `string \| () => string` | `","` | Delimiter character or lazy function | | `newlineChar` | `string \| () => string` | `"\r\n"` | Newline character or lazy function | | `quoteChar` | `string \| () => string` | `'"'` | Quote character or lazy function | | `escapeChar` | `string \| () => string` | quoteChar | Escape character or lazy function | | `parser` | `function` | `csvQuotedParser` | Custom parser function | | `resultKey` | `string` | `"csvErrors"` | Key in pipeline result | #### Field size protection A crafted CSV with an unterminated quoted field causes the parser to buffer the entire remaining input into a single field, consuming unbounded memory. An attacker can exploit this to exhaust process memory with a relatively small file. Setting `fieldMaxSize` caps per-field memory and aborts parsing with an error when exceeded. Always set this when parsing untrusted CSV input, and lower it from the default if your data has known field size bounds. ```javascript // Parse with a 1MB field limit for untrusted input csvParseStream({ fieldMaxSize: 1 * 1024 * 1024 }) ``` ### Result Parse errors collected during processing: ```javascript { UnterminatedQuote: { id: 'UnterminatedQuote', message: 'Unterminated quoted field', idx: [5] } } ``` ### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { csvParseStream } from '@datastream/csv' const result = await pipeline([ createReadableStream('a,b,c\r\n1,2,3\r\n4,5,6'), csvParseStream(), ]) // Chunks emitted: ['a','b','c'], ['1','2','3'], ['4','5','6'] ``` ## `csvFormatStream` Transform Formats objects or arrays back to CSV text. 
### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `header` | `boolean \| string[]` | — | `true` to auto-detect from object keys, or provide array of column names | | `delimiterChar` | `string` | `","` | Delimiter character | | `newlineChar` | `string` | `"\r\n"` | Newline character | | `quoteChar` | `string` | `'"'` | Quote character | | `escapeChar` | `string` | quoteChar | Escape character | ### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { csvFormatStream } from '@datastream/csv' await pipeline([ createReadableStream([ { name: 'Alice', age: 30 }, { name: 'Bob', age: 25 }, ]), csvFormatStream({ header: true }), ]) // Output: "name","age"\r\n"Alice","30"\r\n"Bob","25"\r\n ``` ## `csvRemoveEmptyRowsStream` Transform Removes rows where all fields are empty strings. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `onErrorEnqueue` | `boolean` | `false` | If `true`, empty rows are kept in stream (but still tracked) | | `resultKey` | `string` | `"csvRemoveEmptyRows"` | Key in pipeline result | ### Result ```javascript { EmptyRow: { id: 'EmptyRow', message: 'Row is empty', idx: [3, 7] } } ``` ## `csvRemoveMalformedRowsStream` Transform Removes rows with an incorrect number of fields compared to the first row or the provided header. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `headers` | `string[] \| () => string[]` | — | Expected header array, or lazy function. 
If not provided, uses first row's field count | | `onErrorEnqueue` | `boolean` | `false` | If `true`, malformed rows are kept in stream | | `resultKey` | `string` | `"csvRemoveMalformedRows"` | Key in pipeline result | ### Result ```javascript { MalformedRow: { id: 'MalformedRow', message: 'Row has incorrect number of fields', idx: [2] } } ``` ## `csvCoerceValuesStream` Transform Converts string field values to typed JavaScript values. Works on objects (use after `objectFromEntriesStream`). ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `columns` | `object` | — | Map of column names to types. Without this, auto-coercion is used | | `resultKey` | `string` | `"csvCoerceValues"` | Key in pipeline result | ### Auto-coercion rules | Input | Output | |-------|--------| | `""` | `null` | | `"true"` / `"false"` | `true` / `false` | | Numeric strings | `Number` | | ISO 8601 date strings | `Date` | | JSON strings (`{...}`, `[...]`) | Parsed object/array | ### Column types When specifying `columns`, valid types are: `"number"`, `"boolean"`, `"null"`, `"date"`, `"json"`. ```javascript csvCoerceValuesStream({ columns: { age: 'number', active: 'boolean', birthday: 'date' } }) ``` ## `csvQuotedParser` / `csvUnquotedParser` Standalone parser functions for use outside of streams. `csvUnquotedParser` is faster but does not handle quoted fields. 
```javascript import { csvQuotedParser } from '@datastream/csv' const { rows, tail, numCols, idx, errors } = csvQuotedParser( 'a,b,c\r\n1,2,3\r\n', { delimiterChar: ',', newlineChar: '\r\n', quoteChar: '"' }, true ) // rows: [['a','b','c'], ['1','2','3']] ``` ## Full pipeline example Detect, parse, clean, coerce, and validate: ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { csvDetectDelimitersStream, csvDetectHeaderStream, csvParseStream, csvRemoveEmptyRowsStream, csvRemoveMalformedRowsStream, csvCoerceValuesStream, } from '@datastream/csv' import { objectFromEntriesStream } from '@datastream/object' import { validateStream } from '@datastream/validate' const detectDelimiters = csvDetectDelimitersStream() const detectHeader = csvDetectHeaderStream({ delimiterChar: () => detectDelimiters.result().value.delimiterChar, newlineChar: () => detectDelimiters.result().value.newlineChar, quoteChar: () => detectDelimiters.result().value.quoteChar, escapeChar: () => detectDelimiters.result().value.escapeChar, }) const result = await pipeline([ createReadableStream(csvData), detectDelimiters, detectHeader, csvParseStream({ delimiterChar: () => detectDelimiters.result().value.delimiterChar, newlineChar: () => detectDelimiters.result().value.newlineChar, quoteChar: () => detectDelimiters.result().value.quoteChar, escapeChar: () => detectDelimiters.result().value.escapeChar, }), csvRemoveEmptyRowsStream(), csvRemoveMalformedRowsStream({ headers: () => detectHeader.result().value.header, }), objectFromEntriesStream({ keys: () => detectHeader.result().value.header, }), csvCoerceValuesStream(), validateStream({ schema }), ]) ``` // File: packages/digest/ --- title: digest description: Compute cryptographic hash digests while streaming data. --- Compute cryptographic hash digests while streaming data. 
## Install ```bash npm install @datastream/digest ``` ## `digestStream` PassThrough async (browser) Computes a hash digest of all data passing through. The stream is async in the browser (returns a Promise) because it uses `hash-wasm`. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `algorithm` | `string` | — | Hash algorithm (see table below) | | `resultKey` | `string` | `"digest"` | Key in pipeline result | ### Supported algorithms | Algorithm | Node.js | Browser | |-----------|---------|---------| | `SHA2-256` | `node:crypto` | `hash-wasm` | | `SHA2-384` | `node:crypto` | `hash-wasm` | | `SHA2-512` | `node:crypto` | `hash-wasm` | | `SHA3-256` | — | `hash-wasm` | | `SHA3-384` | — | `hash-wasm` | | `SHA3-512` | — | `hash-wasm` | ### Result ```javascript 'SHA2-256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855' ``` ### Example ```javascript import { pipeline } from '@datastream/core' import { fileReadStream } from '@datastream/file' import { digestStream } from '@datastream/digest' // Node.js — synchronous const digest = digestStream({ algorithm: 'SHA2-256' }) const result = await pipeline([ fileReadStream({ path: './data.csv' }), digest, ]) console.log(result) // { digest: 'SHA2-256:e3b0c4429...' } ``` ```javascript // Browser — async, must await const digest = await digestStream({ algorithm: 'SHA2-256' }) ``` // File: packages/encrypt/ --- title: encrypt description: Symmetric encryption and decryption streams for AES-GCM, AES-CTR, and ChaCha20-Poly1305. --- Symmetric encryption and decryption streams. Defaults to AES-256-GCM (authenticated encryption). ## Install ```bash npm install @datastream/encrypt ``` For ChaCha20-Poly1305 in browser environments: ```bash npm install libsodium-wrappers ``` ## `encryptStream` Transform async (browser) Encrypts data passing through. On Node.js, wraps `node:crypto` for true streaming. In the browser, uses Web Crypto API (AES-GCM buffers; AES-CTR streams). 
### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `algorithm` | `string` | `"AES-256-GCM"` | Encryption algorithm | | `key` | `Uint8Array\|Buffer` | — | 32-byte encryption key | | `iv` | `Uint8Array\|Buffer` | auto-generated | Initialization vector | | `aad` | `Uint8Array\|Buffer` | — | Additional Authenticated Data (GCM/ChaCha only) | | `maxInputSize` | `number` | `67108864` | Max input bytes for web AES-GCM (64MB) | ### Supported algorithms | Algorithm | Auth | Node.js | Browser | IV size | |-----------|------|---------|---------|---------| | `AES-256-GCM` | authTag | `node:crypto` (streaming) | `crypto.subtle` (buffered) | 12 bytes | | `AES-256-CTR` | none | `node:crypto` (streaming) | `crypto.subtle` (streaming) | 16 bytes | | `CHACHA20-POLY1305` | authTag | `node:crypto` (streaming) | `libsodium-wrappers` (buffered) | 12 bytes | ### Result ```javascript { algorithm: 'AES-256-GCM', iv: Uint8Array(12), authTag: Uint8Array(16) // only for GCM and ChaCha20 } ``` ### Example ```javascript import { createReadableStream, pipeline } from '@datastream/core' import { encryptStream, generateEncryptionKey } from '@datastream/encrypt' const key = generateEncryptionKey() // Node.js — synchronous const enc = encryptStream({ key }) const result = await pipeline([ createReadableStream(data), enc, ]) const { iv, authTag } = result.encrypt ``` ```javascript // Browser — async, must await const enc = await encryptStream({ key }) ``` ## `decryptStream` Transform async (browser) Decrypts data encrypted by `encryptStream`. 
### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `algorithm` | `string` | `"AES-256-GCM"` | Must match encryption algorithm |
| `key` | `Uint8Array\|Buffer` | — | Same key used for encryption |
| `iv` | `Uint8Array\|Buffer` | — | IV from `encryptStream` result |
| `authTag` | `Uint8Array\|Buffer` | — | Auth tag from result (GCM/ChaCha) |
| `aad` | `Uint8Array\|Buffer` | — | Must match encryption AAD |
| `maxOutputSize` | `number` | — | Max decrypted output bytes |

### Example

```javascript
import { createReadableStream, pipeline, streamToString } from '@datastream/core'
import { decryptStream } from '@datastream/encrypt'

const dec = decryptStream({ key, iv, authTag })
const plaintext = await streamToString(
  createReadableStream(ciphertext).pipe(dec)
)
```

## `generateEncryptionKey`

Generates a cryptographically random encryption key.

```javascript
import { generateEncryptionKey } from '@datastream/encrypt'

const key = generateEncryptionKey() // 32 bytes (AES-256)
const key128 = generateEncryptionKey({ bits: 128 }) // 16 bytes (AES-128)
```

## Patterns

### Encrypt with plaintext and ciphertext digests

```javascript
import { pipeline } from '@datastream/core'
import { fileReadStream, fileWriteStream } from '@datastream/file'
import { encryptStream } from '@datastream/encrypt'
import { digestStream } from '@datastream/digest'

const result = await pipeline([
  fileReadStream({ path: './data.csv' }),
  digestStream({ algorithm: 'SHA2-256', resultKey: 'plaintextDigest' }),
  encryptStream({ key }),
  digestStream({ algorithm: 'SHA2-256', resultKey: 'ciphertextDigest' }),
  fileWriteStream({ path: './data.csv.enc' }),
])

// result.plaintextDigest — verify correct decryption
// result.ciphertextDigest — verify file integrity on disk
// result.encrypt — { algorithm, iv, authTag }
```

### Large file streaming with AES-CTR

AES-256-CTR supports true streaming on both Node.js and browser.
Use it for large files where AES-GCM's buffering would cause memory issues. Pair with `digestStream` for integrity verification. ```javascript import { encryptStream } from '@datastream/encrypt' const enc = await encryptStream({ key, algorithm: 'AES-256-CTR' }) ``` ### With Additional Authenticated Data (AAD) Bind encryption to metadata so ciphertext can't be reused in a different context. ```javascript const aad = new TextEncoder().encode(JSON.stringify({ userId: '123' })) const enc = encryptStream({ key, aad }) // Decryption must provide the same aad const dec = decryptStream({ key, iv, authTag, aad }) ``` // File: packages/fetch/ --- title: fetch description: HTTP client streams with automatic pagination, rate limiting, and retry. --- HTTP client streams with automatic pagination, rate limiting, and 429 retry. ## Install ```bash npm install @datastream/fetch ``` ## `fetchSetDefaults` Set global defaults for all fetch streams. Mutates module-level state — not safe for concurrent multi-tenant use. 
### Defaults | Option | Type | Default | Description | |--------|------|---------|-------------| | `method` | `string` | `"GET"` | HTTP method | | `headers` | `object` | `{ Accept: 'application/json', 'Accept-Encoding': 'br, gzip, deflate' }` | Request headers | | `rateLimit` | `number` | `0.01` | Minimum seconds between requests (0.01 = 100/sec) | | `dataPath` | `string` | — | Dot-path to data array in JSON response body | | `nextPath` | `string` | — | Dot-path to next page URL in JSON response body | | `qs` | `object` | `{}` | Default query string parameters | | `offsetParam` | `string` | — | Query parameter name for offset pagination | | `offsetAmount` | `number` | — | Increment per page for offset pagination | ### Example ```javascript import { fetchSetDefaults } from '@datastream/fetch' fetchSetDefaults({ headers: { Authorization: 'Bearer token123' }, rateLimit: 0.1, // 10 requests/sec }) ``` ## `fetchReadableStream` Readable Fetches data from one or more URLs and emits chunks. Automatically detects JSON responses and handles pagination. Also exported as `fetchResponseStream`. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `url` | `string` | — | Request URL | | `method` | `string` | `"GET"` | HTTP method | | `headers` | `object` | — | Request headers (merged with defaults) | | `rateLimit` | `number` | `0.01` | Seconds between requests | | `dataPath` | `string` | — | Dot-path to data in JSON body | | `nextPath` | `string` | — | Dot-path to next URL in JSON body | | `qs` | `object` | `{}` | Query string parameters | | `offsetParam` | `string` | — | Query param for offset-based pagination | | `offsetAmount` | `number` | — | Offset increment per page | Pass an array of option objects to fetch from multiple URLs in sequence. 
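Both `dataPath` and `nextPath` are dot-separated paths into the parsed JSON response body. Conceptually they resolve like the sketch below (the library's internal lookup may differ):

```javascript
// Conceptual dot-path lookup for `dataPath` / `nextPath`
// (sketch only; the library's internals may differ).
const getPath = (obj, path) =>
  path.split('.').reduce((node, key) => node?.[key], obj)

const body = {
  data: [{ id: 1 }, { id: 2 }],
  pagination: { next_url: 'https://api.example.com/users?page=2' }
}

const rows = getPath(body, 'data')                // chunks to emit
const next = getPath(body, 'pagination.next_url') // URL of the next page
```

A path that does not exist in the body resolves to `undefined` rather than throwing, so a response without a next-page field simply yields no further URL.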
### Pagination strategies

**Link header** — automatically followed when present:

```
Link: <https://api.example.com/users?page=2>; rel="next"
```

**Body path** — use `nextPath` to extract the next URL from the JSON response:

```javascript
fetchReadableStream({
  url: 'https://api.example.com/users',
  dataPath: 'data',
  nextPath: 'pagination.next_url',
})
```

**Offset** — use `offsetParam` and `offsetAmount` for numeric pagination:

```javascript
fetchReadableStream({
  url: 'https://api.example.com/users',
  dataPath: 'results',
  offsetParam: 'offset',
  offsetAmount: 100,
})
```

### 429 retry

When receiving a `429 Too Many Requests` response, the request is automatically retried with exponential backoff. If a `Retry-After` header is present, its value (in seconds) is used as the delay. Otherwise, the delay follows `min(1000 × 2^(attempt-1), 30000)` ms. Retries are capped at `retryMaxCount` (default: 10).

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `retryMaxCount` | `number` | `10` | Maximum retry attempts on 429 |

### Example

```javascript
import { pipeline } from '@datastream/core'
import { fetchReadableStream } from '@datastream/fetch'
import { objectCountStream } from '@datastream/object'

const count = objectCountStream()

const result = await pipeline([
  fetchReadableStream({
    url: 'https://api.example.com/users',
    dataPath: 'data',
    nextPath: 'meta.next',
    headers: { Authorization: 'Bearer token' },
  }),
  count,
])

console.log(result) // { count: 450 }
```

### Multiple URLs

```javascript
fetchReadableStream([
  { url: 'https://api.example.com/users?status=active', dataPath: 'data' },
  { url: 'https://api.example.com/users?status=inactive', dataPath: 'data' },
])
```

## `fetchWritableStream` Writable, async

Streams data as the body of an HTTP request. Uses `duplex: "half"` for browser compatibility. Also exported as `fetchRequestStream`.
### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { fetchWritableStream } from '@datastream/fetch' import { csvFormatStream } from '@datastream/csv' await pipeline([ createReadableStream(data), csvFormatStream({ header: true }), await fetchWritableStream({ url: 'https://api.example.com/upload', method: 'PUT', headers: { 'Content-Type': 'text/csv' }, }), ]) ``` // File: packages/file/ --- title: file description: File read and write streams for Node.js and the browser. --- File read and write streams for Node.js and the browser. ## Install ```bash npm install @datastream/file ``` ## `fileReadStream` Readable Reads a file as a stream. - **Node.js**: Uses `fs.createReadStream` - **Browser**: Uses `window.showOpenFilePicker` (File System Access API) ### Options | Option | Type | Description | |--------|------|-------------| | `path` | `string` | File path (Node.js) | | `basePath` | `string` | When provided, enforces that `path` resolves within `basePath`. Prevents path traversal and rejects symbolic links | | `types` | `object[]` | File type filter for the file picker (see below) | ### Example — Node.js ```javascript import { pipeline } from '@datastream/core' import { fileReadStream } from '@datastream/file' await pipeline([ fileReadStream({ path: './data.csv' }), ]) ``` ### Example — Browser ```javascript import { fileReadStream } from '@datastream/file' const stream = await fileReadStream({ types: [{ accept: { 'text/csv': ['.csv'] } }], }) ``` ## `fileWriteStream` Writable Writes a stream to a file. - **Node.js**: Uses `fs.createWriteStream` - **Browser**: Uses `window.showSaveFilePicker` (File System Access API) ### Options | Option | Type | Description | |--------|------|-------------| | `path` | `string` | File path (Node.js), suggested file name (Browser) | | `basePath` | `string` | When provided, enforces that `path` resolves within `basePath`. 
Prevents path traversal and rejects symbolic links | | `types` | `object[]` | File type filter | ### Example — Node.js ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { fileWriteStream } from '@datastream/file' await pipeline([ createReadableStream('hello world'), fileWriteStream({ path: './output.txt' }), ]) ``` ### Example — Browser ```javascript import { fileWriteStream } from '@datastream/file' const writable = await fileWriteStream({ path: 'output.csv', types: [{ accept: { 'text/csv': ['.csv'] } }], }) ``` ## File type filtering The `types` option validates file extensions (Node.js) and configures the file picker dialog (Browser): ```javascript const types = [ { accept: { 'text/csv': ['.csv'], 'application/json': ['.json'], }, }, ] ``` On Node.js, if `types` is provided and the file extension doesn't match, an `"Invalid extension"` error is thrown. ## Security When accepting file paths from user input, always use an absolute `path` or set `basePath` to prevent path traversal attacks (e.g., `../../etc/passwd`). Relative paths without a `basePath` constraint can resolve outside the intended directory. `basePath` is opt-in. When provided, paths are resolved and checked with `path.resolve().startsWith(basePath)`, and symbolic links are rejected. When omitted, no path restriction is applied. ```javascript // Restrict reads to a specific directory fileReadStream({ path: userInput, basePath: '/data/uploads', types }) // Convenience helper for cwd-scoped reads const safeFileRead = (path, types) => fileReadStream({ path, basePath: process.cwd(), types }) ``` // File: packages/indexeddb/ --- title: indexeddb description: IndexedDB read and write streams for the browser. --- IndexedDB read and write streams for the browser. ## Install ```bash npm install @datastream/indexeddb ``` ## `indexedDBConnect` Opens (or creates) an IndexedDB database. Re-exported from the [idb](https://www.npmjs.com/package/idb) library. 
```javascript import { indexedDBConnect } from '@datastream/indexeddb' const db = await indexedDBConnect('my-database', 1, { upgrade(db) { db.createObjectStore('records', { keyPath: 'id' }) }, }) ``` ## `indexedDBReadStream` Readable async Reads records from an IndexedDB object store as a stream. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `db` | `IDBDatabase` | — | Database connection from `indexedDBConnect` | | `store` | `string` | — | Object store name | | `index` | `string` | — | Optional index name | | `key` | `IDBKeyRange` | — | Optional key range filter | ### Example ```javascript import { pipeline } from '@datastream/core' import { indexedDBConnect, indexedDBReadStream } from '@datastream/indexeddb' import { objectCountStream } from '@datastream/object' const db = await indexedDBConnect('my-database', 1) const count = objectCountStream() const result = await pipeline([ await indexedDBReadStream({ db, store: 'records' }), count, ]) console.log(result) // { count: 100 } ``` ## `indexedDBWriteStream` Writable async Writes records to an IndexedDB object store. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `db` | `IDBDatabase` | — | Database connection from `indexedDBConnect` | | `store` | `string` | — | Object store name | ### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { indexedDBConnect, indexedDBWriteStream } from '@datastream/indexeddb' const db = await indexedDBConnect('my-database', 1) await pipeline([ createReadableStream([ { id: 1, name: 'Alice' }, { id: 2, name: 'Bob' }, ]), await indexedDBWriteStream({ db, store: 'records' }), ]) ``` // File: packages/ipfs/ --- title: ipfs description: IPFS get and add streams. --- IPFS get and add streams. ## Install ```bash npm install @datastream/ipfs ``` ## `ipfsGetStream` Readable async Retrieves content from IPFS by CID. 
### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `node` | `IPFS` | — | IPFS node instance | | `cid` | `string` | — | Content identifier | ### Example ```javascript import { pipeline } from '@datastream/core' import { ipfsGetStream } from '@datastream/ipfs' await pipeline([ await ipfsGetStream({ node: ipfsNode, cid: 'QmHash...' }), ]) ``` ## `ipfsAddStream` PassThrough async Adds content to IPFS and returns the CID in `.result()`. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `node` | `IPFS` | — | IPFS node instance | | `resultKey` | `string` | `"cid"` | Key in pipeline result | ### Result The CID of the stored content. ### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { ipfsAddStream } from '@datastream/ipfs' const result = await pipeline([ createReadableStream('Hello IPFS!'), await ipfsAddStream({ node: ipfsNode }), ]) console.log(result) // { cid: 'QmHash...' } ``` // File: packages/json/ --- title: json description: JSON and NDJSON (JSON Lines) parsing and formatting streams. --- JSON and NDJSON (Newline-Delimited JSON / JSON Lines) parsing and formatting streams. ## Install ```bash npm install @datastream/json ``` ## `ndjsonParseStream` Transform Parses NDJSON (one JSON value per line) into individual JavaScript values. Splits on `\n`, parses each line with `JSON.parse`, and skips empty lines. 
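Stripped of the streaming machinery, the per-chunk logic amounts to the three documented steps. A minimal sketch (an illustration, not the package source; the real stream also carries an incomplete trailing line over to the next chunk):

```javascript
// Sketch of NDJSON parsing: split on '\n', skip empty lines,
// JSON.parse the rest. The actual stream additionally buffers a
// partial trailing line until the next chunk completes it.
function parseNdjson (text) {
  return text
    .split('\n')
    .filter((line) => line.trim() !== '')
    .map((line) => JSON.parse(line))
}

parseNdjson('{"name":"Alice"}\n{"name":"Bob"}\n')
// → [{ name: 'Alice' }, { name: 'Bob' }]
```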
### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `maxBufferSize` | `number` | `16777216` (16MB) | Maximum buffer size for incomplete lines | | `resultKey` | `string` | `"jsonErrors"` | Key in pipeline result | ### Result Parse errors collected during processing: ```javascript { ParseError: { id: 'ParseError', message: 'Invalid JSON', idx: [3, 7] } } ``` ### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { ndjsonParseStream } from '@datastream/json' const parse = ndjsonParseStream() const result = await pipeline([ createReadableStream('{"name":"Alice"}\n{"name":"Bob"}\n'), parse, ]) console.log(result.jsonErrors) // {} ``` ## `ndjsonFormatStream` Transform Formats objects as NDJSON text (one `JSON.stringify` per line). Batches output for throughput. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `space` | `number \| string` | — | Passed to `JSON.stringify` for pretty-printing each line | ### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { ndjsonFormatStream } from '@datastream/json' await pipeline([ createReadableStream([ { name: 'Alice', age: 30 }, { name: 'Bob', age: 25 }, ]), ndjsonFormatStream(), ]) // Output: {"name":"Alice","age":30}\n{"name":"Bob","age":25}\n ``` ## `jsonParseStream` Transform Parses a streaming JSON array (`[{...},{...}]`) into individual elements. Uses a state machine to track nesting depth, strings, and escapes across chunk boundaries — no external dependencies. 
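The depth-tracking idea can be sketched in a few lines. This is an illustration of the technique, not the package source, and it assumes object or array elements as in the documented `[{...},{...}]` input (the real stream also handles chunk boundaries and size limits):

```javascript
// Sketch: scan a JSON array string, tracking brace/bracket depth
// plus in-string and escape state, and slice out each top-level
// element. Primitive top-level elements are not handled here.
function splitJsonArray (text) {
  const elements = []
  let depth = 0
  let start = -1
  let inString = false
  let escaped = false
  for (let i = 0; i < text.length; i++) {
    const c = text[i]
    if (inString) {
      if (escaped) escaped = false
      else if (c === '\\') escaped = true
      else if (c === '"') inString = false
      continue
    }
    if (c === '"') { inString = true; continue }
    if (c === '{' || c === '[') {
      if (depth === 1 && start === -1) start = i // element begins
      depth++
    } else if (c === '}' || c === ']') {
      depth--
      if (depth === 1) { // element closed at array level
        elements.push(text.slice(start, i + 1))
        start = -1
      }
    }
  }
  return elements.map((e) => JSON.parse(e))
}

splitJsonArray('[{"name":"Alice"},{"name":"Bob"}]')
// → [{ name: 'Alice' }, { name: 'Bob' }]
```

Note how brackets inside strings (`{"c":"]"}`) are ignored because of the `inString` state, which is exactly what a naive depth counter would get wrong.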
### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `maxBufferSize` | `number` | `16777216` (16MB) | Maximum buffer size for incomplete elements |
| `maxValueSize` | `number` | `16777216` (16MB) | Maximum size of a single JSON element |
| `resultKey` | `string` | `"jsonErrors"` | Key in pipeline result |

#### Buffer size protection

A JSON array with deeply nested or very large objects can cause the parser to buffer significant amounts of data. Setting `maxBufferSize` caps total memory and `maxValueSize` caps per-element memory. Lower these from the defaults when parsing untrusted input.

```javascript
// Parse with a 1MB buffer limit for untrusted input
jsonParseStream({ maxBufferSize: 1 * 1024 * 1024 })
```

### Result

Parse errors collected during processing:

```javascript
{ ParseError: { id: 'ParseError', message: 'Invalid JSON', idx: [2] } }
```

### Example

```javascript
import { createReadableStream, streamToArray, pipejoin } from '@datastream/core'
import { jsonParseStream } from '@datastream/json'

const streams = [
  createReadableStream('[{"name":"Alice"},{"name":"Bob"}]'),
  jsonParseStream(),
]

const output = await streamToArray(pipejoin(streams))
// [{ name: 'Alice' }, { name: 'Bob' }]
```

## `jsonFormatStream` Transform

Formats objects as a JSON array string (`[{...},{...}]`). Emits `[` with the first element, `,\n` between elements, and `]` on flush. Outputs `[]` if no elements.
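The emit rules just described can be sketched as a plain function over a finished array (an illustration of the documented behavior, not the package source, which emits incrementally as chunks arrive):

```javascript
// Sketch of the documented emit rules: '[' with the first element,
// ',\n' between elements, ']' on flush, and '[]' for empty input.
function formatJsonArray (objects) {
  let out = ''
  let first = true
  for (const obj of objects) {
    out += (first ? '[' : ',\n') + JSON.stringify(obj)
    first = false
  }
  return out + (first ? '[]' : ']')
}

formatJsonArray([{ a: 1 }, { a: 2 }]) // → '[{"a":1},\n{"a":2}]'
formatJsonArray([]) // → '[]'
```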
### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `space` | `number \| string` | — | Passed to `JSON.stringify` for pretty-printing |

### Example

```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { jsonFormatStream } from '@datastream/json'

await pipeline([
  createReadableStream([
    { name: 'Alice', age: 30 },
    { name: 'Bob', age: 25 },
  ]),
  jsonFormatStream({ space: 2 }),
])
// Output: [{\n "name": "Alice",\n "age": 30\n},\n{\n "name": "Bob",\n "age": 25\n}\n]
```

## Full pipeline example

Fetch NDJSON from an API, validate it, and write it out as a JSON array:

```javascript
import { pipeline } from '@datastream/core'
import { fetchResponseStream } from '@datastream/fetch'
import { ndjsonParseStream, jsonFormatStream } from '@datastream/json'
import { validateStream } from '@datastream/validate'
import { objectCountStream } from '@datastream/object'
import { fileWriteStream } from '@datastream/file'

const count = objectCountStream()
const parse = ndjsonParseStream()

const result = await pipeline([
  fetchResponseStream({ url: 'https://api.example.com/data.ndjson' }),
  parse,
  validateStream({ schema }),
  count,
  jsonFormatStream(),
  fileWriteStream({ path: './output.json' }),
])

console.log(result.count) // number of objects processed
console.log(result.jsonErrors) // any parse errors
```

// File: packages/object/ --- title: object description: Transform, reshape, filter, and aggregate object streams. ---

Transform, reshape, filter, and aggregate object streams.

## Install

```bash
npm install @datastream/object
```

## `objectReadableStream` Readable

Creates a Readable stream from an array of objects.

```javascript
import { objectReadableStream } from '@datastream/object'

const stream = objectReadableStream([{ a: 1 }, { a: 2 }, { a: 3 }])
```

## `objectCountStream` PassThrough

Counts the number of chunks that pass through.
### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `resultKey` | `string` | `"count"` | Key in pipeline result | ### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { objectCountStream } from '@datastream/object' const count = objectCountStream() const result = await pipeline([ createReadableStream([{ a: 1 }, { a: 2 }, { a: 3 }]), count, ]) console.log(result) // { count: 3 } ``` ## `objectFromEntriesStream` Transform Converts arrays to objects using column names. Commonly used with `csvParseStream` output. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `keys` | `string[] \| () => string[]` | — | Column names, or lazy function | ### Example ```javascript import { objectFromEntriesStream } from '@datastream/object' // Converts ['Alice', '30', 'Toronto'] → { name: 'Alice', age: '30', city: 'Toronto' } objectFromEntriesStream({ keys: ['name', 'age', 'city'] }) // With lazy keys from csvDetectHeaderStream objectFromEntriesStream({ keys: () => detectHeader.result().value.header, }) ``` ## `objectPickStream` Transform Keeps only the specified keys on each object. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `keys` | `string[]` | — | Keys to keep | ### Example ```javascript import { objectPickStream } from '@datastream/object' // { name: 'Alice', age: 30, city: 'Toronto' } → { name: 'Alice', age: 30 } objectPickStream({ keys: ['name', 'age'] }) ``` ## `objectOmitStream` Transform Removes the specified keys from each object. 
### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `keys` | `string[]` | — | Keys to remove | ### Example ```javascript import { objectOmitStream } from '@datastream/object' // { name: 'Alice', age: 30, city: 'Toronto' } → { name: 'Alice' } objectOmitStream({ keys: ['age', 'city'] }) ``` ## `objectKeyMapStream` Transform Renames keys on each object. Unmapped keys are kept as-is. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `keys` | `object` | — | Map of `{ oldKey: 'newKey' }` | ### Example ```javascript import { objectKeyMapStream } from '@datastream/object' // { firstName: 'Alice', lastName: 'Smith' } → { first_name: 'Alice', last_name: 'Smith' } objectKeyMapStream({ keys: { firstName: 'first_name', lastName: 'last_name' } }) ``` ## `objectValueMapStream` Transform Maps values for a specific key using a lookup table. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `key` | `string` | — | Key whose value to map | | `values` | `object` | — | Map of `{ oldValue: newValue }` | ### Example ```javascript import { objectValueMapStream } from '@datastream/object' // { status: 'A' } → { status: 'Active' } objectValueMapStream({ key: 'status', values: { A: 'Active', I: 'Inactive' } }) ``` ## `objectKeyJoinStream` Transform Joins multiple keys into new combined keys. 
### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `keys` | `object` | — | Map of `{ newKey: ['key1', 'key2'] }` | | `separator` | `string` | — | Separator for joined values | | `isNestedObject` | `boolean` | `false` | Use `structuredClone` instead of shallow spread for cloning | ### Example ```javascript import { objectKeyJoinStream } from '@datastream/object' // { first: 'Alice', last: 'Smith', age: 30 } → { name: 'Alice Smith', age: 30 } objectKeyJoinStream({ keys: { name: ['first', 'last'] }, separator: ' ' }) ``` ## `objectKeyValueStream` Transform Creates a key-value pair object from two fields. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `key` | `string` | — | Field to use as the key | | `value` | `string` | — | Field to use as the value | ### Example ```javascript import { objectKeyValueStream } from '@datastream/object' // { code: 'CA', country: 'Canada' } → { CA: 'Canada' } objectKeyValueStream({ key: 'code', value: 'country' }) ``` ## `objectKeyValuesStream` Transform Creates a key-values pair object — the key comes from a field, the value is a subset of the object. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `key` | `string` | — | Field to use as the key | | `values` | `string[]` | — | Fields to include in value object. If omitted, entire object is used | ### Example ```javascript import { objectKeyValuesStream } from '@datastream/object' // { id: 'u1', name: 'Alice', age: 30 } → { u1: { name: 'Alice', age: 30 } } objectKeyValuesStream({ key: 'id', values: ['name', 'age'] }) ``` ## `objectBatchStream` Transform Groups consecutive objects with the same key values into arrays. Use with `objectPivotLongToWideStream`. 
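The grouping rule can be sketched as a plain function over a finished array (an illustration, not the package source): a new batch starts whenever any of the watched keys changes between neighboring objects.

```javascript
// Sketch of consecutive grouping: compare a signature built from
// the watched keys against the previous object's signature, and
// start a new batch whenever it changes.
function batchConsecutive (objects, keys) {
  const batches = []
  let current = null
  let signature = null
  for (const obj of objects) {
    const sig = JSON.stringify(keys.map((k) => obj[k]))
    if (sig !== signature) {
      current = []
      batches.push(current)
      signature = sig
    }
    current.push(obj)
  }
  return batches
}

batchConsecutive(
  [{ id: 1, k: 'a' }, { id: 1, k: 'b' }, { id: 2, k: 'c' }],
  ['id']
)
// → [[{id:1,k:'a'}, {id:1,k:'b'}], [{id:2,k:'c'}]]
```

Because only neighbors are compared, input that is not sorted by the batch keys will produce multiple batches for the same key values.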
### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `keys` | `string[]` | — | Keys to group by |

### Example

```javascript
import { objectBatchStream } from '@datastream/object'

// Input: [{id:1,k:'a'}, {id:1,k:'b'}, {id:2,k:'c'}]
// Output: [{id:1,k:'a'}, {id:1,k:'b'}], [{id:2,k:'c'}]
objectBatchStream({ keys: ['id'] })
```

## `objectPivotLongToWideStream` Transform

Pivots batched arrays from long to wide format. Must be used after `objectBatchStream`.

### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `keys` | `string[]` | — | Pivot key columns |
| `valueParam` | `string` | — | Column containing the values |
| `delimiter` | `string` | `" "` | Separator for combined key names |

### Example

```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { objectBatchStream, objectPivotLongToWideStream } from '@datastream/object'

// Input: [{id:1, metric:'temp', value:20}, {id:1, metric:'humidity', value:60}]
// Output: {id:1, temp:20, humidity:60}
await pipeline([
  createReadableStream(data),
  objectBatchStream({ keys: ['id'] }),
  objectPivotLongToWideStream({ keys: ['metric'], valueParam: 'value' }),
])
```

## `objectPivotWideToLongStream` Transform

Pivots wide format to long format. Emits multiple chunks per input object.
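The unpivot can be sketched per input object (an illustration, not the package source): each listed column becomes its own row, and the remaining columns are carried along via shallow spread.

```javascript
// Sketch of wide-to-long: strip the unpivoted columns from a copy
// of the row, then emit one row per column, attaching the column
// name and value under the configured names.
function pivotWideToLong (row, { keys, keyParam, valueParam }) {
  const rest = { ...row }
  for (const k of keys) delete rest[k]
  return keys.map((k) => ({ ...rest, [keyParam]: k, [valueParam]: row[k] }))
}

pivotWideToLong(
  { id: 1, temp: 20, humidity: 60 },
  { keys: ['temp', 'humidity'], keyParam: 'metric', valueParam: 'value' }
)
// → [{ id: 1, metric: 'temp', value: 20 }, { id: 1, metric: 'humidity', value: 60 }]
```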
### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `keys` | `string[]` | — | Columns to unpivot |
| `keyParam` | `string` | `"keyParam"` | Name for the new key column |
| `valueParam` | `string` | `"valueParam"` | Name for the new value column |
| `isNestedObject` | `boolean` | `false` | Use `structuredClone` instead of shallow spread for cloning |

### Example

```javascript
import { objectPivotWideToLongStream } from '@datastream/object'

// Input: { id: 1, temp: 20, humidity: 60 }
// Output: { id: 1, metric: 'temp', value: 20 }, { id: 1, metric: 'humidity', value: 60 }
objectPivotWideToLongStream({
  keys: ['temp', 'humidity'],
  keyParam: 'metric',
  valueParam: 'value',
})
```

## `objectSkipConsecutiveDuplicatesStream` Transform

Skips consecutive duplicate objects. Uses shallow equality by default (compares top-level values with `===`).

### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `isNestedObject` | `boolean` | `false` | Use `JSON.stringify` for deep comparison instead of shallow equality |

### Example

```javascript
import { objectSkipConsecutiveDuplicatesStream } from '@datastream/object'

// Shallow (default, fast) — compares top-level values
// Input: [{a:1}, {a:1}, {a:2}, {a:1}]
// Output: [{a:1}, {a:2}, {a:1}]
objectSkipConsecutiveDuplicatesStream()

// Deep — compares nested objects via JSON.stringify
// Input: [{a:{b:1}}, {a:{b:1}}, {a:{b:2}}]
// Output: [{a:{b:1}}, {a:{b:2}}]
objectSkipConsecutiveDuplicatesStream({ isNestedObject: true })
```

// File: packages/string/ --- title: string description: String manipulation streams for splitting, replacing, counting, and measuring text. ---

String manipulation streams — split, replace, count, and measure text data.

## Install

```bash
npm install @datastream/string
```

## `stringReadableStream` Readable

Creates a Readable stream from a string input.
```javascript import { stringReadableStream } from '@datastream/string' const stream = stringReadableStream('hello world') ``` ## `stringLengthStream` PassThrough Counts the total character length of all chunks. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `resultKey` | `string` | `"length"` | Key in pipeline result | ### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { stringLengthStream } from '@datastream/string' const length = stringLengthStream() const result = await pipeline([ createReadableStream('hello world'), length, ]) console.log(result) // { length: 11 } ``` ## `stringCountStream` PassThrough Counts occurrences of a substring across all chunks. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `substr` | `string` | — | Substring to count | | `resultKey` | `string` | `"count"` | Key in pipeline result | ### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { stringCountStream } from '@datastream/string' const count = stringCountStream({ substr: '\n' }) const result = await pipeline([ createReadableStream('line1\nline2\nline3'), count, ]) console.log(result) // { count: 2 } ``` ## `stringSplitStream` Transform Splits streaming text by a separator, emitting one chunk per segment. Handles splits that cross chunk boundaries. 
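The carry-buffer technique behind boundary-safe splitting can be sketched like this (an illustration, not the package source): each chunk is appended to a carry buffer, the complete segments are emitted, and the last, possibly incomplete segment is held back for the next chunk.

```javascript
// Sketch of boundary-safe splitting: write() emits only complete
// segments and carries the trailing partial segment forward;
// flush() emits whatever remains at end of input.
function makeSplitter (separator) {
  let carry = ''
  return {
    write (chunk) {
      const parts = (carry + chunk).split(separator)
      carry = parts.pop() // may be an incomplete segment
      return parts
    },
    flush () {
      return carry
    },
  }
}

const splitter = makeSplitter(',')
splitter.write('alice,bo') // → ['alice']
splitter.write('b,charlie') // → ['bob']
splitter.flush() // → 'charlie'
```

The separator `,` arriving split across chunks as `'bo'` + `'b,'` is reassembled correctly, which a per-chunk `split` would get wrong.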
### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `separator` | `string` | — | String to split on |

### Example

```javascript
import { createReadableStream, streamToArray, pipejoin } from '@datastream/core'
import { stringSplitStream } from '@datastream/string'

const stream = pipejoin([
  createReadableStream('alice,bob,charlie'),
  stringSplitStream({ separator: ',' }),
])

const output = await streamToArray(stream)
// ['alice', 'bob', 'charlie']
```

## `stringReplaceStream` Transform

Replaces pattern matches in streaming text. Handles replacements that span chunk boundaries.

### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `pattern` | `string \| RegExp` | — | Pattern to search for |
| `replacement` | `string` | — | Replacement string |

### Example

```javascript
import { stringReplaceStream } from '@datastream/string'

stringReplaceStream({ pattern: /\t/g, replacement: ',' })
```

## `stringMinimumFirstChunkSize` Transform

Buffers data until the first chunk meets a minimum size, then passes all subsequent chunks through unchanged.

### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `chunkSize` | `number` | `1024` (1KB) | Minimum first chunk size in characters |

## `stringMinimumChunkSize` Transform

Buffers every chunk to meet a minimum size before emitting. Unlike `stringMinimumFirstChunkSize`, which only buffers the first chunk then passes through, this continues buffering all subsequent chunks that are smaller than `chunkSize`.

### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `chunkSize` | `number` | `1024` (1KB) | Minimum chunk size in characters |

## `stringSkipConsecutiveDuplicates` Transform

Skips consecutive duplicate string chunks.
```javascript import { stringSkipConsecutiveDuplicates } from '@datastream/string' // Input chunks: 'a', 'a', 'b', 'a' → Output: 'a', 'b', 'a' ``` // File: packages/validate/ --- title: validate description: JSON Schema validation for object streams using Ajv. --- JSON Schema validation for object streams using Ajv. ## Install ```bash npm install @datastream/validate ``` ## `validateStream` Transform Validates each object chunk against a JSON Schema. Invalid rows are dropped by default; errors are collected in `.result()`. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `schema` | `object \| function` | — | JSON Schema object, or pre-compiled validation function | | `idxStart` | `number` | `0` | Starting row index for error tracking | | `onErrorEnqueue` | `boolean` | `false` | If `true`, invalid rows are kept in the stream | | `allowCoerceTypes` | `boolean` | `true` | If `false`, emits original data without Ajv coercion | | `resultKey` | `string` | `"validate"` | Key in pipeline result | ### Result Errors grouped by schema path: ```javascript { '#/required': { id: '#/required', keys: ['email'], message: "must have required property 'email'", idx: [3, 7, 15] }, '#/properties/age/type': { id: '#/properties/age/type', keys: ['age'], message: 'must be number', idx: [5] } } ``` ### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { validateStream } from '@datastream/validate' const schema = { type: 'object', required: ['name', 'age'], properties: { name: { type: 'string', minLength: 1 }, age: { type: 'number', minimum: 0 }, email: { type: 'string', format: 'email' }, }, additionalProperties: false, } const result = await pipeline([ createReadableStream([ { name: 'Alice', age: 30, email: 'alice@example.com' }, { name: '', age: -1 }, { name: 'Bob', age: 25 }, ]), validateStream({ schema }), ]) console.log(result.validate) // Errors for row 1 (name minLength, age minimum) ``` 
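Since the `idx` arrays record which rows failed, the result object can be used to gate downstream work after the pipeline completes. A small sketch, assuming the result shape shown above (`countInvalidRows` is a hypothetical helper, not part of the package):

```javascript
// Sketch: count distinct invalid row indexes across all error
// groups in a validateStream result (assumes the documented shape,
// where each error carries an `idx` array of failing row indexes).
function countInvalidRows (validateResult) {
  const rows = new Set()
  for (const error of Object.values(validateResult)) {
    for (const idx of error.idx) rows.add(idx) // de-duplicate rows failing multiple checks
  }
  return rows.size
}

countInvalidRows({
  '#/required': { id: '#/required', keys: ['email'], message: "must have required property 'email'", idx: [3, 7] },
  '#/properties/age/type': { id: '#/properties/age/type', keys: ['age'], message: 'must be number', idx: [3] },
})
// → 2 (rows 3 and 7)
```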
### Keep invalid rows ```javascript validateStream({ schema, onErrorEnqueue: true }) ``` ### Pre-compiled schema For better cold-start performance, pre-compile your schema during the build step: ```javascript import { transpileSchema, validateStream } from '@datastream/validate' const validate = transpileSchema(schema) validateStream({ schema: validate }) ``` ## `transpileSchema` Pre-compiles a JSON Schema into a validation function using Ajv. ### Ajv defaults | Option | Default | Description | |--------|---------|-------------| | `strict` | `true` | Strict mode | | `coerceTypes` | `true` | Coerce types to match schema | | `allErrors` | `true` | Report all errors, not just the first | | `useDefaults` | `"empty"` | Apply defaults for missing/empty values | | `messages` | `true` | Include error messages | ```javascript import { transpileSchema } from '@datastream/validate' const validate = transpileSchema(schema, { coerceTypes: false }) ``` // File: quick-start/ --- title: Quick Start description: Install datastream and build your first stream pipeline in minutes. 
---

## Install

```bash
npm install @datastream/core
```

Add packages as needed:

```bash
npm install @datastream/csv @datastream/validate @datastream/compress @datastream/file
```

## Your first pipeline

Read a CSV string, parse it, and collect the results:

```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { csvDetectDelimitersStream, csvDetectHeaderStream, csvParseStream } from '@datastream/csv'
import { objectFromEntriesStream, objectCountStream } from '@datastream/object'

const csvData = 'name,age,city\r\nAlice,30,Toronto\r\nBob,25,Vancouver\r\nCharlie,35,Montreal'

const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
  delimiterChar: () => detectDelimiters.result().value.delimiterChar,
  newlineChar: () => detectDelimiters.result().value.newlineChar,
  quoteChar: () => detectDelimiters.result().value.quoteChar,
  escapeChar: () => detectDelimiters.result().value.escapeChar,
})
const count = objectCountStream()

const result = await pipeline([
  createReadableStream(csvData),
  detectDelimiters,
  detectHeader,
  csvParseStream({
    delimiterChar: () => detectDelimiters.result().value.delimiterChar,
    newlineChar: () => detectDelimiters.result().value.newlineChar,
    quoteChar: () => detectDelimiters.result().value.quoteChar,
    escapeChar: () => detectDelimiters.result().value.escapeChar,
  }),
  objectFromEntriesStream({
    keys: () => detectHeader.result().value.header,
  }),
  count,
])

console.log(result)
// { csvDetectDelimiters: { delimiterChar: ',', ... }, csvDetectHeader: { header: ['name','age','city'] }, count: 3 }
```

## Add validation

Extend the pipeline with schema validation using JSON Schema:

```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { csvDetectDelimitersStream, csvDetectHeaderStream, csvParseStream } from '@datastream/csv'
import { objectFromEntriesStream } from '@datastream/object'
import { validateStream } from '@datastream/validate'

const schema = {
  type: 'object',
  required: ['name', 'age', 'city'],
  properties: {
    name: { type: 'string' },
    age: { type: 'number' },
    city: { type: 'string' },
  },
  additionalProperties: false,
}

const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
  delimiterChar: () => detectDelimiters.result().value.delimiterChar,
  newlineChar: () => detectDelimiters.result().value.newlineChar,
  quoteChar: () => detectDelimiters.result().value.quoteChar,
  escapeChar: () => detectDelimiters.result().value.escapeChar,
})

const result = await pipeline([
  createReadableStream(csvData),
  detectDelimiters,
  detectHeader,
  csvParseStream({
    delimiterChar: () => detectDelimiters.result().value.delimiterChar,
    newlineChar: () => detectDelimiters.result().value.newlineChar,
    quoteChar: () => detectDelimiters.result().value.quoteChar,
    escapeChar: () => detectDelimiters.result().value.escapeChar,
  }),
  objectFromEntriesStream({
    keys: () => detectHeader.result().value.header,
  }),
  validateStream({ schema }),
])

console.log(result)
// { csvDetectDelimiters: {...}, csvDetectHeader: {...}, csvErrors: {}, validate: {} }
```

## Add file I/O

Read from and write to files (Node.js):

```javascript
import { pipeline } from '@datastream/core'
import { fileReadStream, fileWriteStream } from '@datastream/file'
import { csvDetectDelimitersStream, csvDetectHeaderStream, csvParseStream, csvFormatStream } from '@datastream/csv'
import { objectFromEntriesStream } from '@datastream/object'
import { validateStream } from '@datastream/validate'
import { gzipCompressStream } from '@datastream/compress' const detectDelimiters = csvDetectDelimitersStream() const detectHeader = csvDetectHeaderStream({ delimiterChar: () => detectDelimiters.result().value.delimiterChar, newlineChar: () => detectDelimiters.result().value.newlineChar, quoteChar: () => detectDelimiters.result().value.quoteChar, escapeChar: () => detectDelimiters.result().value.escapeChar, }) const result = await pipeline([ fileReadStream({ path: './input.csv' }), detectDelimiters, detectHeader, csvParseStream({ delimiterChar: () => detectDelimiters.result().value.delimiterChar, newlineChar: () => detectDelimiters.result().value.newlineChar, quoteChar: () => detectDelimiters.result().value.quoteChar, escapeChar: () => detectDelimiters.result().value.escapeChar, }), objectFromEntriesStream({ keys: () => detectHeader.result().value.header, }), validateStream({ schema }), csvFormatStream({ header: true }), gzipCompressStream(), fileWriteStream({ path: './output.csv.gz' }), ]) ``` ## Next steps - Learn about [Core Concepts](/docs/core-concepts) — stream types, pipeline patterns, and error handling - Browse [Recipes](/docs/recipes) for complete real-world examples - Explore the [core](/docs/packages/core) package API reference