# aws

AWS service streams for CloudWatch Logs, DynamoDB, Kinesis, Lambda, S3, SNS, and SQS. Node.js only.
## Install

```sh
npm install @datastream/aws
```

Requires the corresponding AWS SDK v3 client packages for the services you use:

```sh
npm install @aws-sdk/client-cloudwatch-logs
npm install @aws-sdk/client-dynamodb
npm install @aws-sdk/client-kinesis
npm install @aws-sdk/client-lambda
npm install @aws-sdk/client-s3 @aws-sdk/lib-storage
npm install @aws-sdk/client-sns
npm install @aws-sdk/client-sqs
```

## CloudWatch Logs
### awsCloudWatchLogsSetClient

Set a custom CloudWatch Logs client. Mutates module-level state, so it is not safe for concurrent multi-tenant use.

```javascript
import { CloudWatchLogsClient } from '@aws-sdk/client-cloudwatch-logs'
import { awsCloudWatchLogsSetClient } from '@datastream/aws'

awsCloudWatchLogsSetClient(new CloudWatchLogsClient({ region: 'us-east-1' }))
```

### awsCloudWatchLogsGetLogEventsStream (Readable, async)

Gets log events from a CloudWatch Logs log stream. Auto-paginates until no new events are returned.
#### Options

Accepts `GetLogEventsCommand` parameters plus:

| Option | Type | Default | Description |
|---|---|---|---|
| `logGroupName` | string | — | Log group name |
| `logStreamName` | string | — | Log stream name |
| `pollingActive` | boolean | `false` | Keep polling for new events |
| `pollingDelay` | number | `1000` | Delay (ms) between polls when no new events |
#### Example

```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { awsCloudWatchLogsGetLogEventsStream } from '@datastream/aws'

await pipeline([
  createReadableStream(await awsCloudWatchLogsGetLogEventsStream({
    logGroupName: '/aws/lambda/my-function',
    logStreamName: 'stream-id',
  })),
])
```

### awsCloudWatchLogsFilterLogEventsStream (Readable, async)

Filters log events across log streams in a log group. Auto-paginates through all matching results.
#### Options

Accepts `FilterLogEventsCommand` parameters:

| Option | Type | Description |
|---|---|---|
| `logGroupName` | string | Log group name |
| `filterPattern` | string | CloudWatch Logs filter pattern |
| `startTime` | number | Start of time range (epoch ms) |
| `endTime` | number | End of time range (epoch ms) |
#### Example

```javascript
import { awsCloudWatchLogsFilterLogEventsStream } from '@datastream/aws'

const events = await awsCloudWatchLogsFilterLogEventsStream({
  logGroupName: '/aws/lambda/my-function',
  filterPattern: 'ERROR',
})
```

## S3

### awsS3SetClient

Set a custom S3 client. By default, FIPS endpoints are enabled for US and CA regions. Mutates module-level state, so it is not safe for concurrent multi-tenant use.
```javascript
import { S3Client } from '@aws-sdk/client-s3'
import { awsS3SetClient } from '@datastream/aws'

awsS3SetClient(new S3Client({ region: 'eu-west-1' }))
```

### awsS3GetObjectStream (Readable, async)

Downloads an object from S3 as a stream.
#### Options

Accepts all `GetObjectCommand` parameters plus:

| Option | Type | Default | Description |
|---|---|---|---|
| `client` | S3Client | default client | Custom S3 client for this call |
| `Bucket` | string | — | S3 bucket name |
| `Key` | string | — | S3 object key |
#### Example

```javascript
import { pipeline } from '@datastream/core'
import { awsS3GetObjectStream } from '@datastream/aws'
import { csvParseStream } from '@datastream/csv'

await pipeline([
  await awsS3GetObjectStream({ Bucket: 'my-bucket', Key: 'data.csv' }),
  csvParseStream(),
])
```

### awsS3PutObjectStream (PassThrough)

Uploads data to S3 using multipart upload.
#### Options

Accepts all S3 `PutObject` parameters plus:

| Option | Type | Default | Description |
|---|---|---|---|
| `client` | S3Client | default client | Custom S3 client |
| `Bucket` | string | — | S3 bucket name |
| `Key` | string | — | S3 object key |
| `onProgress` | function | — | Upload progress callback |
| `tags` | object | — | S3 object tags |
#### Example

```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { awsS3PutObjectStream } from '@datastream/aws'
import { gzipCompressStream } from '@datastream/compress'

await pipeline([
  createReadableStream(data),
  gzipCompressStream(),
  awsS3PutObjectStream({ Bucket: 'my-bucket', Key: 'output.csv.gz' }),
])
```

### awsS3ChecksumStream (PassThrough)

Computes a multi-part S3 checksum while data passes through. Designed for pre-signed URL uploads in the browser.
#### Options

| Option | Type | Default | Description |
|---|---|---|---|
| `ChecksumAlgorithm` | string | `"SHA256"` | `"SHA1"` or `"SHA256"` |
| `partSize` | number | `17179870` | Part size in bytes |
| `resultKey` | string | `"s3"` | Key in the pipeline result |
#### Result

```javascript
{
  checksum: 'base64hash-3',
  checksums: ['part1hash', 'part2hash', 'part3hash'],
  partSize: 17179870
}
```

## DynamoDB
### awsDynamoDBSetClient

Set a custom DynamoDB client. Mutates module-level state, so it is not safe for concurrent multi-tenant use.

```javascript
import { DynamoDBClient } from '@aws-sdk/client-dynamodb'
import { awsDynamoDBSetClient } from '@datastream/aws'

awsDynamoDBSetClient(new DynamoDBClient({ region: 'us-east-1' }))
```

### awsDynamoDBQueryStream (Readable, async)

Queries a DynamoDB table and auto-paginates through all results.
#### Options

Accepts all `QueryCommand` parameters:

| Option | Type | Description |
|---|---|---|
| `TableName` | string | DynamoDB table name |
| `KeyConditionExpression` | string | Query key condition |
| `ExpressionAttributeValues` | object | Expression values |
#### Example

```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { awsDynamoDBQueryStream } from '@datastream/aws'

await pipeline([
  createReadableStream(await awsDynamoDBQueryStream({
    TableName: 'Users',
    KeyConditionExpression: 'PK = :pk',
    ExpressionAttributeValues: { ':pk': { S: 'USER#123' } },
  })),
])
```

### awsDynamoDBScanStream (Readable, async)
Scans an entire DynamoDB table with automatic pagination.

```javascript
import { awsDynamoDBScanStream } from '@datastream/aws'

const items = await awsDynamoDBScanStream({ TableName: 'Users' })
```

### awsDynamoDBExecuteStatementStream (Readable, async)

Executes a PartiQL statement against DynamoDB with automatic pagination.
#### Options

Accepts `ExecuteStatementCommand` parameters:

| Option | Type | Description |
|---|---|---|
| `Statement` | string | PartiQL statement |
| `Parameters` | object[] | Statement parameters |
#### Example

```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { awsDynamoDBExecuteStatementStream } from '@datastream/aws'

await pipeline([
  createReadableStream(await awsDynamoDBExecuteStatementStream({
    Statement: 'SELECT * FROM "Users" WHERE PK = ?',
    Parameters: [{ S: 'USER#123' }],
  })),
])
```

### awsDynamoDBGetItemStream (Readable, async)

Batch gets items by key. Automatically retries unprocessed keys with exponential backoff.
#### Options

| Option | Type | Default | Description |
|---|---|---|---|
| `TableName` | string | — | DynamoDB table name |
| `Keys` | object[] | — | Array of key objects |
| `retryMaxCount` | number | `10` | Maximum retry attempts |
### awsDynamoDBPutItemStream (Writable)

Writes items to DynamoDB using `BatchWriteItem`. Automatically batches 25 items per request and retries unprocessed items with exponential backoff.
#### Options

| Option | Type | Default | Description |
|---|---|---|---|
| `TableName` | string | — | DynamoDB table name |
| `retryMaxCount` | number | `10` | Maximum retry attempts |
#### Example

```javascript
import { pipeline, createReadableStream, createTransformStream } from '@datastream/core'
import { awsDynamoDBPutItemStream } from '@datastream/aws'

await pipeline([
  createReadableStream(items),
  createTransformStream((item, enqueue) => {
    enqueue({
      PK: { S: `USER#${item.id}` },
      SK: { S: 'PROFILE' },
      name: { S: item.name },
    })
  }),
  awsDynamoDBPutItemStream({ TableName: 'Users' }),
])
```

### awsDynamoDBDeleteItemStream (Writable)

Deletes items from DynamoDB using `BatchWriteItem`. Batches 25 items per request.
#### Options

| Option | Type | Default | Description |
|---|---|---|---|
| `TableName` | string | — | DynamoDB table name |
| `retryMaxCount` | number | `10` | Maximum retry attempts |
#### Example

```javascript
import { awsDynamoDBDeleteItemStream } from '@datastream/aws'

// Input chunks: { PK: { S: 'USER#1' }, SK: { S: 'PROFILE' } }
awsDynamoDBDeleteItemStream({ TableName: 'Users' })
```

## Kinesis

### awsKinesisSetClient

Set a custom Kinesis client. Mutates module-level state, so it is not safe for concurrent multi-tenant use.
```javascript
import { KinesisClient } from '@aws-sdk/client-kinesis'
import { awsKinesisSetClient } from '@datastream/aws'

awsKinesisSetClient(new KinesisClient({ region: 'us-east-1' }))
```

### awsKinesisGetRecordsStream (Readable, async)

Gets records from a Kinesis shard. Polls until no more records are returned.
#### Options

Accepts `GetRecordsCommand` parameters plus:

| Option | Type | Default | Description |
|---|---|---|---|
| `ShardIterator` | string | — | Shard iterator |
| `pollingActive` | boolean | `false` | Keep polling for new records |
| `pollingDelay` | number | `1000` | Delay (ms) between polls when no records |
#### Example

```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { awsKinesisGetRecordsStream } from '@datastream/aws'

await pipeline([
  createReadableStream(await awsKinesisGetRecordsStream({
    ShardIterator: 'AAA...',
  })),
])
```

### awsKinesisPutRecordsStream (Writable)

Writes records to a Kinesis stream. Batches 500 records per `PutRecordsCommand`.
#### Options

| Option | Type | Description |
|---|---|---|
| `StreamName` | string | Kinesis stream name |
#### Example

```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { awsKinesisPutRecordsStream } from '@datastream/aws'

await pipeline([
  createReadableStream(records),
  awsKinesisPutRecordsStream({ StreamName: 'my-stream' }),
])
```

## Lambda

### awsLambdaSetClient

Set a custom Lambda client. Mutates module-level state, so it is not safe for concurrent multi-tenant use.
```javascript
import { LambdaClient } from '@aws-sdk/client-lambda'
import { awsLambdaSetClient } from '@datastream/aws'

awsLambdaSetClient(new LambdaClient({ region: 'us-east-1' }))
```

### awsLambdaReadableStream (Readable)

Invokes a Lambda function with response streaming (`InvokeWithResponseStream`). Also exported as `awsLambdaResponseStream`.
#### Options

Accepts `InvokeWithResponseStreamCommand` parameters. Pass an array of parameter objects to invoke multiple functions sequentially.

| Option | Type | Description |
|---|---|---|
| `FunctionName` | string | Lambda function name or ARN |
| `Payload` | string | JSON payload |
#### Example

```javascript
import { pipeline } from '@datastream/core'
import { awsLambdaReadableStream } from '@datastream/aws'
import { csvParseStream } from '@datastream/csv'

await pipeline([
  awsLambdaReadableStream({
    FunctionName: 'data-processor',
    Payload: JSON.stringify({ key: 'input.csv' }),
  }),
  csvParseStream(),
])
```

## SNS

### awsSNSSetClient

Set a custom SNS client. Mutates module-level state, so it is not safe for concurrent multi-tenant use.
```javascript
import { SNSClient } from '@aws-sdk/client-sns'
import { awsSNSSetClient } from '@datastream/aws'

awsSNSSetClient(new SNSClient({ region: 'us-east-1' }))
```

### awsSNSPublishMessageStream (Writable)

Publishes messages to an SNS topic. Batches 10 messages per `PublishBatchCommand`.
#### Options

| Option | Type | Description |
|---|---|---|
| `TopicArn` | string | SNS topic ARN |
#### Example

```javascript
import { pipeline, createReadableStream, createTransformStream } from '@datastream/core'
import { awsSNSPublishMessageStream } from '@datastream/aws'

await pipeline([
  createReadableStream(events),
  createTransformStream((event, enqueue) => {
    enqueue({
      Id: event.id,
      Message: JSON.stringify(event),
    })
  }),
  awsSNSPublishMessageStream({ TopicArn: 'arn:aws:sns:us-east-1:123:my-topic' }),
])
```

## SQS

### awsSQSSetClient

Set a custom SQS client. Mutates module-level state, so it is not safe for concurrent multi-tenant use.
```javascript
import { SQSClient } from '@aws-sdk/client-sqs'
import { awsSQSSetClient } from '@datastream/aws'

awsSQSSetClient(new SQSClient({ region: 'us-east-1' }))
```

### awsSQSReceiveMessageStream (Readable, async)

Polls an SQS queue and yields messages until the queue is empty.
#### Options

Accepts `ReceiveMessageCommand` parameters plus:

| Option | Type | Default | Description |
|---|---|---|---|
| `QueueUrl` | string | — | SQS queue URL |
| `MaxNumberOfMessages` | number | — | Max messages per poll (1-10) |
| `pollingActive` | boolean | `false` | Keep polling even when the queue is empty |
| `pollingDelay` | number | `1000` | Delay (ms) between polls when the queue is empty |
#### Example

```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { awsSQSReceiveMessageStream, awsSQSDeleteMessageStream } from '@datastream/aws'

await pipeline([
  createReadableStream(await awsSQSReceiveMessageStream({
    QueueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue',
  })),
  awsSQSDeleteMessageStream({
    QueueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue',
  }),
])
```

### awsSQSSendMessageStream (Writable)

Sends messages to an SQS queue. Batches 10 messages per `SendMessageBatchCommand`.
#### Options

| Option | Type | Description |
|---|---|---|
| `QueueUrl` | string | SQS queue URL |
### awsSQSDeleteMessageStream (Writable)

Deletes messages from an SQS queue. Batches 10 messages per `DeleteMessageBatchCommand`.
#### Options

| Option | Type | Description |
|---|---|---|
| `QueueUrl` | string | SQS queue URL |