# csv

Parse, format, detect, clean, and coerce CSV data.

## Install

```bash
npm install @datastream/csv
```

## csvDetectDelimitersStream `PassThrough`

Auto-detects the delimiter, newline, quote, and escape characters from the first chunk of data.

### Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `chunkSize` | `number` | `1024` (1KB) | Minimum bytes to buffer before detecting |
| `resultKey` | `string` | `"csvDetectDelimiters"` | Key in pipeline result |

### Result

```js
{ delimiterChar: ',', newlineChar: '\r\n', quoteChar: '"', escapeChar: '"' }
```

### Example

```js
import { pipeline, createReadableStream } from '@datastream/core'
import { csvDetectDelimitersStream } from '@datastream/csv'

const detect = csvDetectDelimitersStream()

const result = await pipeline([
  createReadableStream('name\tage\r\nAlice\t30'),
  detect,
])

console.log(result.csvDetectDelimiters)
// { delimiterChar: '\t', newlineChar: '\r\n', quoteChar: '"', escapeChar: '"' }
```

## csvDetectHeaderStream `Transform`

Detects and strips the header row, emitting only the data rows downstream.

### Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `chunkSize` | `number` | `1024` (1KB) | Minimum bytes to buffer before detecting |
| `delimiterChar` | `string \| () => string` | `","` | Delimiter character or lazy function |
| `newlineChar` | `string \| () => string` | `"\r\n"` | Newline character or lazy function |
| `quoteChar` | `string \| () => string` | `'"'` | Quote character or lazy function |
| `escapeChar` | `string \| () => string` | `quoteChar` | Escape character or lazy function |
| `parser` | `function` | `csvQuotedParser` | Custom parser function |
| `resultKey` | `string` | `"csvDetectHeader"` | Key in pipeline result |

### Result

```js
{ header: ['name', 'age', 'city'] }
```

Example

import { csvDetectDelimitersStream, csvDetectHeaderStream } from '@datastream/csv'

const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
  delimiterChar: () => detectDelimiters.result().value.delimiterChar,
  newlineChar: () => detectDelimiters.result().value.newlineChar,
  quoteChar: () => detectDelimiters.result().value.quoteChar,
  escapeChar: () => detectDelimiters.result().value.escapeChar,
})
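
Continuing the snippet above, the configured streams can then run in a pipeline; a minimal sketch, assuming the result shape shown under Result:

```js
import { pipeline, createReadableStream } from '@datastream/core'

const result = await pipeline([
  createReadableStream('name,age\r\nAlice,30\r\nBob,25'),
  detectDelimiters,
  detectHeader,
])

console.log(result.csvDetectHeader)
// { header: ['name', 'age'] }
```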

## csvParseStream `Transform`

Parses CSV text into rows of string field values; each output chunk is one row as a `string[]`.

### Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `chunkSize` | `number` | `2097152` (2MB) | Input buffer size before first parse |
| `fieldMaxSize` | `number` | `16777216` (16MB) | Maximum size of a single field in bytes |
| `delimiterChar` | `string \| () => string` | `","` | Delimiter character or lazy function |
| `newlineChar` | `string \| () => string` | `"\r\n"` | Newline character or lazy function |
| `quoteChar` | `string \| () => string` | `'"'` | Quote character or lazy function |
| `escapeChar` | `string \| () => string` | `quoteChar` | Escape character or lazy function |
| `parser` | `function` | `csvQuotedParser` | Custom parser function |
| `resultKey` | `string` | `"csvErrors"` | Key in pipeline result |

### Field size protection

A crafted CSV with an unterminated quoted field forces the parser to buffer all remaining input into a single field, consuming unbounded memory; an attacker can exhaust process memory with a relatively small file. Setting `fieldMaxSize` caps per-field memory and aborts parsing with an error when the cap is exceeded. Always set it when parsing untrusted CSV input, and lower it from the default if your data has known field size bounds.

```js
// Parse with a 1MB field limit for untrusted input
csvParseStream({ fieldMaxSize: 1 * 1024 * 1024 })
```

### Result

Parse errors collected during processing:

```js
{
  UnterminatedQuote: { id: 'UnterminatedQuote', message: 'Unterminated quoted field', idx: [5] }
}
```

### Example

```js
import { pipeline, createReadableStream } from '@datastream/core'
import { csvParseStream } from '@datastream/csv'

const result = await pipeline([
  createReadableStream('a,b,c\r\n1,2,3\r\n4,5,6'),
  csvParseStream(),
])

// Chunks emitted: ['a','b','c'], ['1','2','3'], ['4','5','6']
```
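
After the pipeline resolves, any parse errors sit under the configured `resultKey` (default `csvErrors`). A sketch of checking them, assuming the error shape shown under Result above:

```js
// Report each collected parse error with the affected row indexes
for (const { id, message, idx } of Object.values(result.csvErrors ?? {})) {
  console.error(`${id}: ${message} (rows: ${idx.join(', ')})`)
}
```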

## csvFormatStream `Transform`

Formats objects or arrays back to CSV text.

### Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `header` | `boolean \| string[]` | `true` | `true` auto-detects column names from object keys; pass an array to set them explicitly |
| `delimiterChar` | `string` | `","` | Delimiter character |
| `newlineChar` | `string` | `"\r\n"` | Newline character |
| `quoteChar` | `string` | `'"'` | Quote character |
| `escapeChar` | `string` | `quoteChar` | Escape character |

### Example

```js
import { pipeline, createReadableStream } from '@datastream/core'
import { csvFormatStream } from '@datastream/csv'

await pipeline([
  createReadableStream([
    { name: 'Alice', age: 30 },
    { name: 'Bob', age: 25 },
  ]),
  csvFormatStream({ header: true }),
])

// Output: "name","age"\r\n"Alice","30"\r\n"Bob","25"\r\n
```
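
To control the column names (and presumably their order) rather than relying on auto-detection, pass the header explicitly; a sketch, assuming the array form described in the options table:

```js
// Explicit column names instead of auto-detection from object keys
csvFormatStream({ header: ['name', 'age'] })
```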

## csvRemoveEmptyRowsStream `Transform`

Removes rows where all fields are empty strings.

### Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `onErrorEnqueue` | `boolean` | `false` | If `true`, empty rows are kept in the stream (but still tracked) |
| `resultKey` | `string` | `"csvRemoveEmptyRows"` | Key in pipeline result |

### Result

```js
{ EmptyRow: { id: 'EmptyRow', message: 'Row is empty', idx: [3, 7] } }
```
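
A minimal usage sketch, assuming row chunks like those emitted by `csvParseStream`:

```js
import { pipeline, createReadableStream } from '@datastream/core'
import { csvParseStream, csvRemoveEmptyRowsStream } from '@datastream/csv'

const result = await pipeline([
  createReadableStream('a,b\r\n,\r\n1,2'),
  csvParseStream(),
  csvRemoveEmptyRowsStream(),
])

// Chunks emitted: ['a','b'], ['1','2']; the all-empty middle row is dropped
// and recorded under result.csvRemoveEmptyRows
```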

## csvRemoveMalformedRowsStream `Transform`

Removes rows whose field count does not match the first row or the provided header.

### Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `headers` | `string[] \| () => string[]` | | Expected header array, or lazy function; if not provided, the first row's field count is used |
| `onErrorEnqueue` | `boolean` | `false` | If `true`, malformed rows are kept in the stream |
| `resultKey` | `string` | `"csvRemoveMalformedRows"` | Key in pipeline result |

### Result

```js
{ MalformedRow: { id: 'MalformedRow', message: 'Row has incorrect number of fields', idx: [2] } }
```
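
A usage sketch, assuming the first-row field count applies when no `headers` option is given:

```js
import { pipeline, createReadableStream } from '@datastream/core'
import { csvParseStream, csvRemoveMalformedRowsStream } from '@datastream/csv'

const result = await pipeline([
  createReadableStream('a,b\r\n1,2,3\r\n4,5'),
  csvParseStream(),
  csvRemoveMalformedRowsStream(), // expects 2 fields per row, from the first row
])

// Chunks emitted: ['a','b'], ['4','5']; the 3-field row is dropped
// and recorded under result.csvRemoveMalformedRows
```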

## csvCoerceValuesStream `Transform`

Converts string field values to typed JavaScript values. Operates on objects, so place it after `objectFromEntriesStream`.

### Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `columns` | `object` | | Map of column names to types; without it, auto-coercion is used |
| `resultKey` | `string` | `"csvCoerceValues"` | Key in pipeline result |

### Auto-coercion rules

| Input | Output |
| --- | --- |
| `""` | `null` |
| `"true"` / `"false"` | `true` / `false` |
| Numeric strings | `Number` |
| ISO 8601 date strings | `Date` |
| JSON strings (`{...}`, `[...]`) | Parsed object/array |

### Column types

When specifying `columns`, valid types are: `"number"`, `"boolean"`, `"null"`, `"date"`, `"json"`.

```js
csvCoerceValuesStream({
  columns: { age: 'number', active: 'boolean', birthday: 'date' }
})
```
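
A sketch of the effect on a single row, assuming the coercion rules above:

```js
import { pipeline, createReadableStream } from '@datastream/core'
import { csvCoerceValuesStream } from '@datastream/csv'

const result = await pipeline([
  createReadableStream([{ age: '30', active: 'true', birthday: '1995-06-01' }]),
  csvCoerceValuesStream({
    columns: { age: 'number', active: 'boolean', birthday: 'date' },
  }),
])

// Chunk emitted: { age: 30, active: true, birthday: Date('1995-06-01') }
```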

## csvQuotedParser / csvUnquotedParser

Standalone parser functions for use outside of streams. `csvUnquotedParser` is faster but does not handle quoted fields.

```js
import { csvQuotedParser } from '@datastream/csv'

const { rows, tail, numCols, idx, errors } = csvQuotedParser(
  'a,b,c\r\n1,2,3\r\n',
  { delimiterChar: ',', newlineChar: '\r\n', quoteChar: '"' },
  true
)
// rows: [['a','b','c'], ['1','2','3']]
```
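
For input known to contain no quoted fields, the unquoted parser can be swapped in; a sketch, assuming it shares the call signature above:

```js
import { csvUnquotedParser } from '@datastream/csv'

const { rows } = csvUnquotedParser(
  '1,2,3\r\n4,5,6\r\n',
  { delimiterChar: ',', newlineChar: '\r\n' },
  true
)
// rows: [['1','2','3'], ['4','5','6']]
```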

## Full pipeline example

Detect, parse, clean, coerce, and validate:

```js
import { pipeline, createReadableStream } from '@datastream/core'
import {
  csvDetectDelimitersStream,
  csvDetectHeaderStream,
  csvParseStream,
  csvRemoveEmptyRowsStream,
  csvRemoveMalformedRowsStream,
  csvCoerceValuesStream,
} from '@datastream/csv'
import { objectFromEntriesStream } from '@datastream/object'
import { validateStream } from '@datastream/validate'

// Lazy options defer reading detection results until downstream
// streams actually need them, after detection has run
const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
  delimiterChar: () => detectDelimiters.result().value.delimiterChar,
  newlineChar: () => detectDelimiters.result().value.newlineChar,
  quoteChar: () => detectDelimiters.result().value.quoteChar,
  escapeChar: () => detectDelimiters.result().value.escapeChar,
})

// csvData: the raw CSV input; schema: the schema for validateStream
const result = await pipeline([
  createReadableStream(csvData),
  detectDelimiters,
  detectHeader,
  csvParseStream({
    delimiterChar: () => detectDelimiters.result().value.delimiterChar,
    newlineChar: () => detectDelimiters.result().value.newlineChar,
    quoteChar: () => detectDelimiters.result().value.quoteChar,
    escapeChar: () => detectDelimiters.result().value.escapeChar,
  }),
  csvRemoveEmptyRowsStream(),
  csvRemoveMalformedRowsStream({
    headers: () => detectHeader.result().value.header,
  }),
  objectFromEntriesStream({
    keys: () => detectHeader.result().value.header,
  }),
  csvCoerceValuesStream(),
  validateStream({ schema }),
])
```
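
Each stream's findings then land in the pipeline result under its `resultKey`; a sketch, assuming the default keys documented above:

```js
console.log(result.csvDetectDelimiters)    // detected characters
console.log(result.csvDetectHeader)        // { header: [...] }
console.log(result.csvErrors)              // parse errors, if any
console.log(result.csvRemoveEmptyRows)     // dropped empty rows
console.log(result.csvRemoveMalformedRows) // dropped malformed rows
```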