aws

AWS service streams for CloudWatch Logs, DynamoDB, Kinesis, Lambda, S3, SNS, and SQS. Node.js only.

Install

npm install @datastream/aws

Requires the corresponding AWS SDK v3 client packages:

npm install @aws-sdk/client-cloudwatch-logs
npm install @aws-sdk/client-dynamodb
npm install @aws-sdk/client-kinesis
npm install @aws-sdk/client-lambda
npm install @aws-sdk/client-s3 @aws-sdk/lib-storage
npm install @aws-sdk/client-sns
npm install @aws-sdk/client-sqs

CloudWatch Logs

awsCloudWatchLogsSetClient

Mutates module-level state — not safe for concurrent multi-tenant use.

import { CloudWatchLogsClient } from '@aws-sdk/client-cloudwatch-logs'
import { awsCloudWatchLogsSetClient } from '@datastream/aws'

awsCloudWatchLogsSetClient(new CloudWatchLogsClient({ region: 'us-east-1' }))

awsCloudWatchLogsGetLogEventsStream Readable async

Gets log events from a CloudWatch Logs log stream. Auto-paginates until no new events are returned.

Options

Accepts GetLogEventsCommand parameters plus:

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| logGroupName | string | | Log group name |
| logStreamName | string | | Log stream name |
| pollingActive | boolean | false | Keep polling for new events |
| pollingDelay | number | 1000 | Delay (ms) between polls when no new events |

Example

import { pipeline, createReadableStream } from '@datastream/core'
import { awsCloudWatchLogsGetLogEventsStream } from '@datastream/aws'

await pipeline([
  createReadableStream(await awsCloudWatchLogsGetLogEventsStream({
    logGroupName: '/aws/lambda/my-function',
    logStreamName: 'stream-id',
  })),
])

awsCloudWatchLogsFilterLogEventsStream Readable async

Filters log events across log streams in a log group. Auto-paginates through all matching results.

Options

Accepts FilterLogEventsCommand parameters:

| Option | Type | Description |
| --- | --- | --- |
| logGroupName | string | Log group name |
| filterPattern | string | CloudWatch Logs filter pattern |
| startTime | number | Start of time range (epoch ms) |
| endTime | number | End of time range (epoch ms) |

Example

import { awsCloudWatchLogsFilterLogEventsStream } from '@datastream/aws'

const events = await awsCloudWatchLogsFilterLogEventsStream({
  logGroupName: '/aws/lambda/my-function',
  filterPattern: 'ERROR',
})

S3

awsS3SetClient

Set a custom S3 client. By default, FIPS endpoints are enabled for US and CA regions. Mutates module-level state — not safe for concurrent multi-tenant use.

import { S3Client } from '@aws-sdk/client-s3'
import { awsS3SetClient } from '@datastream/aws'

awsS3SetClient(new S3Client({ region: 'eu-west-1' }))

awsS3GetObjectStream Readable async

Downloads an object from S3 as a stream.

Options

Accepts all GetObjectCommand parameters plus:

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| client | S3Client | default client | Custom S3 client for this call |
| Bucket | string | | S3 bucket name |
| Key | string | | S3 object key |

Example

import { pipeline } from '@datastream/core'
import { awsS3GetObjectStream } from '@datastream/aws'
import { csvParseStream } from '@datastream/csv'

await pipeline([
  await awsS3GetObjectStream({ Bucket: 'my-bucket', Key: 'data.csv' }),
  csvParseStream(),
])

awsS3PutObjectStream PassThrough

Uploads data to S3 using multipart upload.

Options

Accepts all S3 PutObject parameters plus:

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| client | S3Client | default client | Custom S3 client |
| Bucket | string | | S3 bucket name |
| Key | string | | S3 object key |
| onProgress | function | | Upload progress callback |
| tags | object | | S3 object tags |

Example

import { pipeline, createReadableStream } from '@datastream/core'
import { awsS3PutObjectStream } from '@datastream/aws'
import { gzipCompressStream } from '@datastream/compress'

await pipeline([
  createReadableStream(data),
  gzipCompressStream(),
  awsS3PutObjectStream({ Bucket: 'my-bucket', Key: 'output.csv.gz' }),
])

awsS3ChecksumStream PassThrough

Computes a multipart S3 checksum while data passes through. Designed for pre-signed URL uploads in the browser.

Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| ChecksumAlgorithm | string | "SHA256" | "SHA1" or "SHA256" |
| partSize | number | 17179870 | Part size in bytes |
| resultKey | string | "s3" | Key in pipeline result |

Result

{ checksum: 'base64hash-3', checksums: ['part1hash', 'part2hash', 'part3hash'], partSize: 17179870 }
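The Result values above can be reproduced with Node's built-in crypto module. The sketch below mirrors S3's composite (checksum-of-checksums) scheme: each `checksums` entry is the base64 digest of one part's bytes, and `checksum` is the digest of the concatenated raw part digests suffixed with the part count. It is an illustration of the format, not the library's implementation.

```javascript
import { createHash } from 'node:crypto'

// Three example parts (real parts would each be `partSize` bytes).
const parts = [Buffer.from('part one'), Buffer.from('part two'), Buffer.from('part three')]

// Per-part: base64 digest of each part's bytes (the `checksums` array).
const digests = parts.map((part) => createHash('sha256').update(part).digest())
const checksums = digests.map((digest) => digest.toString('base64'))

// Composite: digest of the concatenated raw part digests, suffixed with the
// part count (the `checksum` value, e.g. 'base64hash-3').
const checksum =
  createHash('sha256').update(Buffer.concat(digests)).digest('base64') + `-${parts.length}`

console.log({ checksum, checksums })
```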

DynamoDB

awsDynamoDBSetClient

Mutates module-level state — not safe for concurrent multi-tenant use.

import { DynamoDBClient } from '@aws-sdk/client-dynamodb'
import { awsDynamoDBSetClient } from '@datastream/aws'

awsDynamoDBSetClient(new DynamoDBClient({ region: 'us-east-1' }))

awsDynamoDBQueryStream Readable async

Queries a DynamoDB table and auto-paginates through all results.

Options

Accepts all QueryCommand parameters:

| Option | Type | Description |
| --- | --- | --- |
| TableName | string | DynamoDB table name |
| KeyConditionExpression | string | Query key condition |
| ExpressionAttributeValues | object | Expression values |

Example

import { pipeline, createReadableStream } from '@datastream/core'
import { awsDynamoDBQueryStream } from '@datastream/aws'

await pipeline([
  createReadableStream(await awsDynamoDBQueryStream({
    TableName: 'Users',
    KeyConditionExpression: 'PK = :pk',
    ExpressionAttributeValues: { ':pk': { S: 'USER#123' } },
  })),
])

awsDynamoDBScanStream Readable async

Scans an entire DynamoDB table with automatic pagination.

import { awsDynamoDBScanStream } from '@datastream/aws'

const items = await awsDynamoDBScanStream({ TableName: 'Users' })

awsDynamoDBExecuteStatementStream Readable async

Executes a PartiQL statement against DynamoDB with automatic pagination.

Options

Accepts ExecuteStatementCommand parameters:

| Option | Type | Description |
| --- | --- | --- |
| Statement | string | PartiQL statement |
| Parameters | object[] | Statement parameters |

Example

import { pipeline, createReadableStream } from '@datastream/core'
import { awsDynamoDBExecuteStatementStream } from '@datastream/aws'

await pipeline([
  createReadableStream(await awsDynamoDBExecuteStatementStream({
    Statement: 'SELECT * FROM "Users" WHERE PK = ?',
    Parameters: [{ S: 'USER#123' }],
  })),
])

awsDynamoDBGetItemStream Readable async

Gets items by key in batches using BatchGetItem. Automatically retries unprocessed keys with exponential backoff.

Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| TableName | string | | DynamoDB table name |
| Keys | object[] | | Array of key objects |
| retryMaxCount | number | 10 | Maximum retry attempts |

awsDynamoDBPutItemStream Writable

Writes items to DynamoDB using BatchWriteItem. Automatically batches 25 items per request and retries unprocessed items with exponential backoff.

Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| TableName | string | | DynamoDB table name |
| retryMaxCount | number | 10 | Maximum retry attempts |

Example

import { pipeline, createReadableStream, createTransformStream } from '@datastream/core'
import { awsDynamoDBPutItemStream } from '@datastream/aws'

await pipeline([
  createReadableStream(items),
  createTransformStream((item, enqueue) => {
    enqueue({
      PK: { S: `USER#${item.id}` },
      SK: { S: 'PROFILE' },
      name: { S: item.name },
    })
  }),
  awsDynamoDBPutItemStream({ TableName: 'Users' }),
])

awsDynamoDBDeleteItemStream Writable

Deletes items from DynamoDB using BatchWriteItem. Batches 25 items per request.

Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| TableName | string | | DynamoDB table name |
| retryMaxCount | number | 10 | Maximum retry attempts |

Example

import { awsDynamoDBDeleteItemStream } from '@datastream/aws'

awsDynamoDBDeleteItemStream({ TableName: 'Users' })
// Input chunks: { PK: { S: 'USER#1' }, SK: { S: 'PROFILE' } }

Kinesis

awsKinesisSetClient

Mutates module-level state — not safe for concurrent multi-tenant use.

import { KinesisClient } from '@aws-sdk/client-kinesis'
import { awsKinesisSetClient } from '@datastream/aws'

awsKinesisSetClient(new KinesisClient({ region: 'us-east-1' }))

awsKinesisGetRecordsStream Readable async

Gets records from a Kinesis shard. Polls until no more records are returned.

Options

Accepts GetRecordsCommand parameters plus:

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| ShardIterator | string | | Shard iterator |
| pollingActive | boolean | false | Keep polling for new records |
| pollingDelay | number | 1000 | Delay (ms) between polls when no records |

Example

import { pipeline, createReadableStream } from '@datastream/core'
import { awsKinesisGetRecordsStream } from '@datastream/aws'

await pipeline([
  createReadableStream(await awsKinesisGetRecordsStream({
    ShardIterator: 'AAA...',
  })),
])

awsKinesisPutRecordsStream Writable

Writes records to a Kinesis stream. Batches 500 records per PutRecordsCommand.

Options

| Option | Type | Description |
| --- | --- | --- |
| StreamName | string | Kinesis stream name |

Example

import { pipeline, createReadableStream } from '@datastream/core'
import { awsKinesisPutRecordsStream } from '@datastream/aws'

await pipeline([
  createReadableStream(records),
  awsKinesisPutRecordsStream({ StreamName: 'my-stream' }),
])

Lambda

awsLambdaSetClient

Mutates module-level state — not safe for concurrent multi-tenant use.

import { LambdaClient } from '@aws-sdk/client-lambda'
import { awsLambdaSetClient } from '@datastream/aws'

awsLambdaSetClient(new LambdaClient({ region: 'us-east-1' }))

awsLambdaReadableStream Readable

Invokes a Lambda function with response streaming (InvokeWithResponseStream).

Also exported as awsLambdaResponseStream.

Options

Accepts InvokeWithResponseStreamCommand parameters. Pass an array to invoke multiple functions sequentially.

| Option | Type | Description |
| --- | --- | --- |
| FunctionName | string | Lambda function name or ARN |
| Payload | string | JSON payload |

Example

import { pipeline } from '@datastream/core'
import { awsLambdaReadableStream } from '@datastream/aws'
import { csvParseStream } from '@datastream/csv'

await pipeline([
  awsLambdaReadableStream({
    FunctionName: 'data-processor',
    Payload: JSON.stringify({ key: 'input.csv' }),
  }),
  csvParseStream(),
])

SNS

awsSNSSetClient

Mutates module-level state — not safe for concurrent multi-tenant use.

import { SNSClient } from '@aws-sdk/client-sns'
import { awsSNSSetClient } from '@datastream/aws'

awsSNSSetClient(new SNSClient({ region: 'us-east-1' }))

awsSNSPublishMessageStream Writable

Publishes messages to an SNS topic. Batches 10 messages per PublishBatchCommand.

Options

| Option | Type | Description |
| --- | --- | --- |
| TopicArn | string | SNS topic ARN |

Example

import { pipeline, createReadableStream, createTransformStream } from '@datastream/core'
import { awsSNSPublishMessageStream } from '@datastream/aws'

await pipeline([
  createReadableStream(events),
  createTransformStream((event, enqueue) => {
    enqueue({
      Id: event.id,
      Message: JSON.stringify(event),
    })
  }),
  awsSNSPublishMessageStream({ TopicArn: 'arn:aws:sns:us-east-1:123:my-topic' }),
])

SQS

awsSQSSetClient

Mutates module-level state — not safe for concurrent multi-tenant use.

import { SQSClient } from '@aws-sdk/client-sqs'
import { awsSQSSetClient } from '@datastream/aws'

awsSQSSetClient(new SQSClient({ region: 'us-east-1' }))

awsSQSReceiveMessageStream Readable async

Polls an SQS queue and yields messages until the queue is empty.

Options

Accepts ReceiveMessageCommand parameters plus:

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| QueueUrl | string | | SQS queue URL |
| MaxNumberOfMessages | number | | Max messages per poll (1-10) |
| pollingActive | boolean | false | Keep polling even when queue is empty |
| pollingDelay | number | 1000 | Delay (ms) between polls when queue is empty |

Example

import { pipeline, createReadableStream } from '@datastream/core'
import { awsSQSReceiveMessageStream, awsSQSDeleteMessageStream } from '@datastream/aws'

await pipeline([
  createReadableStream(await awsSQSReceiveMessageStream({
    QueueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue',
  })),
  awsSQSDeleteMessageStream({
    QueueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue',
  }),
])

awsSQSSendMessageStream Writable

Sends messages to an SQS queue. Batches 10 messages per SendMessageBatchCommand.

Options

| Option | Type | Description |
| --- | --- | --- |
| QueueUrl | string | SQS queue URL |

awsSQSDeleteMessageStream Writable

Deletes messages from an SQS queue. Batches 10 messages per DeleteMessageBatchCommand.

Options

| Option | Type | Description |
| --- | --- | --- |
| QueueUrl | string | SQS queue URL |