json

JSON and NDJSON (Newline-Delimited JSON / JSON Lines) parsing and formatting streams.

Install

npm install @datastream/json

ndjsonParseStream Transform

Parses NDJSON (one JSON value per line) into individual JavaScript values. Splits on \n, parses each line with JSON.parse, and skips empty lines.
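The splitting behaviour described above can be illustrated with a minimal, dependency-free sketch. It is not the library's implementation: `createNdjsonParser`, `write`, `flush`, and `errorIdx` are hypothetical names, and counting only non-empty lines when recording error indexes is an assumption.

```javascript
// Hypothetical helper for illustration; not part of @datastream/json.
function createNdjsonParser() {
  let buffer = ''      // trailing incomplete line carried between chunks
  let lineIndex = 0    // counts non-empty lines seen so far (assumption)
  const errorIdx = []  // indexes of lines that failed JSON.parse

  const parseLine = (line, out) => {
    if (line.trim() === '') return // skip empty lines
    try {
      out.push(JSON.parse(line))
    } catch {
      errorIdx.push(lineIndex)
    }
    lineIndex += 1
  }

  return {
    // Feed one text chunk; returns the values completed by this chunk.
    write(chunk) {
      const out = []
      const lines = (buffer + chunk).split('\n')
      buffer = lines.pop() // the last piece may be an incomplete line
      for (const line of lines) parseLine(line, out)
      return out
    },
    // Call once at end of input to parse whatever is left in the buffer.
    flush() {
      const out = []
      parseLine(buffer, out)
      buffer = ''
      return out
    },
    errorIdx,
  }
}

const parser = createNdjsonParser()
const values = [
  ...parser.write('{"name":"Al'),
  ...parser.write('ice"}\n\n{"name":"Bob"}\nnot json\n{"name":'),
  ...parser.write('"Carol"}'),
  ...parser.flush(),
]
console.log(values)          // three objects: Alice, Bob, Carol
console.log(parser.errorIdx) // [ 2 ]
```

Carrying the last, possibly incomplete, piece over to the next chunk is what lets a record be split anywhere across chunk boundaries.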

Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| maxBufferSize | number | 16777216 (16MB) | Maximum buffer size for incomplete lines |
| resultKey | string | "jsonErrors" | Key in pipeline result |

Result

Parse errors collected during processing:

{
  ParseError: { id: 'ParseError', message: 'Invalid JSON', idx: [3, 7] }
}
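
As a rough sketch of how this result might be consumed, the helper below turns each error entry into a readable line. The object shape is copied from the example above; `summarizeParseErrors` is a hypothetical name, and whether `idx` is zero-based is not stated here, so the output avoids implying either.

```javascript
// Hypothetical helper; the input shape mirrors the Result example above.
function summarizeParseErrors(jsonErrors) {
  return Object.values(jsonErrors).map(
    ({ id, message, idx }) => `${id}: ${message} at index ${idx.join(', ')}`
  )
}

const jsonErrors = {
  ParseError: { id: 'ParseError', message: 'Invalid JSON', idx: [3, 7] },
}

console.log(summarizeParseErrors(jsonErrors))
// [ 'ParseError: Invalid JSON at index 3, 7' ]
console.log(summarizeParseErrors({}))
// []
```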

Example

import { pipeline, createReadableStream } from '@datastream/core'
import { ndjsonParseStream } from '@datastream/json'

const parse = ndjsonParseStream()

const result = await pipeline([
  createReadableStream('{"name":"Alice"}\n{"name":"Bob"}\n'),
  parse,
])

console.log(result.jsonErrors)
// {}

ndjsonFormatStream Transform

Formats objects as NDJSON text (one JSON.stringify per line). Batches output for throughput.

Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| space | number \| string | | Passed to JSON.stringify for pretty-printing each line |
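
To make the effect of `space` concrete, here is a small sketch built on plain `JSON.stringify`. `formatNdjson` is a hypothetical stand-in, not the library's code. It shows that a non-empty `space` makes each record span several lines, which is worth keeping in mind if the output is later read by a line-based NDJSON parser.

```javascript
// Hypothetical stand-in for the formatting step; not part of @datastream/json.
function formatNdjson(values, space) {
  return values
    .map((value) => JSON.stringify(value, null, space) + '\n')
    .join('')
}

const rows = [
  { name: 'Alice', age: 30 },
  { name: 'Bob', age: 25 },
]

const compact = formatNdjson(rows)
const pretty = formatNdjson(rows, 2)

// Count newline characters in each output.
console.log(compact.split('\n').length - 1) // 2 (one per record)
console.log(pretty.split('\n').length - 1)  // 8 (four per record)
```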

Example

import { pipeline, createReadableStream } from '@datastream/core'
import { ndjsonFormatStream } from '@datastream/json'

await pipeline([
  createReadableStream([
    { name: 'Alice', age: 30 },
    { name: 'Bob', age: 25 },
  ]),
  ndjsonFormatStream(),
])

// Output: {"name":"Alice","age":30}\n{"name":"Bob","age":25}\n

jsonParseStream Transform

Parses a streaming JSON array ([{...},{...}]) into individual elements. Uses a state machine to track nesting depth, strings, and escapes across chunk boundaries — no external dependencies.
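
The state-machine idea can be sketched in a few lines of plain JavaScript. This is an illustrative approximation, not the library's parser: `createArrayElementSplitter` is a hypothetical name, the input is assumed to be a well-formed top-level array, and errors from `JSON.parse` are not collected.

```javascript
// Hypothetical sketch of a chunk-safe splitter for top-level array elements.
function createArrayElementSplitter() {
  let depth = 0        // nesting depth; 1 means directly inside the outer array
  let inString = false // true while inside a JSON string
  let escaped = false  // true when the previous string character was a backslash
  let current = ''     // text of the element being accumulated

  const emit = (out) => {
    const text = current.trim()
    if (text !== '') out.push(JSON.parse(text))
    current = ''
  }

  return {
    // Feed one text chunk; returns the elements completed by this chunk.
    write(chunk) {
      const out = []
      for (const ch of chunk) {
        if (inString) {
          current += ch
          if (escaped) escaped = false
          else if (ch === '\\') escaped = true
          else if (ch === '"') inString = false
          continue
        }
        if (ch === '"') {
          inString = true
          current += ch
        } else if (ch === '[' || ch === '{') {
          depth += 1
          if (depth > 1) current += ch // the outer '[' belongs to no element
        } else if (ch === ']' || ch === '}') {
          depth -= 1
          if (depth >= 1) current += ch
          else emit(out) // closing the outer array ends the last element
        } else if (ch === ',' && depth === 1) {
          emit(out) // a top-level comma ends the current element
        } else if (depth >= 1) {
          current += ch
        }
      }
      return out
    },
  }
}

// Chunks split inside a string and directly after an escape backslash.
const splitter = createArrayElementSplitter()
const elements = [
  ...splitter.write('[{"name":"Al'),
  ...splitter.write('ice","tags":["a,b","c]"]},{"note":"say \\'),
  ...splitter.write('"hi\\""}]'),
]
console.log(elements.length) // 2
```

Because `depth`, `inString`, and `escaped` live outside `write`, commas and brackets inside strings or nested values are never mistaken for element boundaries, even when a chunk ends mid-string.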

Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| maxBufferSize | number | 16777216 (16MB) | Maximum buffer size for incomplete elements |
| maxValueSize | number | 16777216 (16MB) | Maximum size of a single JSON element |
| resultKey | string | "jsonErrors" | Key in pipeline result |

Buffer size protection

A JSON array with deeply nested or very large objects can force the parser to buffer a large amount of data before an element is complete. maxBufferSize caps the total amount of buffered data and maxValueSize caps the size of any single element. Lower both from their defaults when parsing untrusted input.

// Parse with a 1MB buffer limit for untrusted input
jsonParseStream({ maxBufferSize: 1 * 1024 * 1024 })
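
To illustrate why a cap matters, the sketch below shows the general shape of a bounded buffer. It is only an assumption about how such a guard could work: `createBoundedBuffer` is a hypothetical name, the limit is measured in characters rather than bytes, and the library may report an exceeded limit differently (for example, without throwing).

```javascript
// Hypothetical guard for illustration; not the library's implementation.
function createBoundedBuffer(maxBufferSize) {
  let buffer = ''
  return {
    // Append text, refusing to grow past the configured limit.
    append(text) {
      if (buffer.length + text.length > maxBufferSize) {
        throw new RangeError(
          `Buffered data would exceed ${maxBufferSize} characters`
        )
      }
      buffer += text
      return buffer.length
    },
    // Drop buffered text once a complete element has been emitted.
    clear() {
      buffer = ''
    },
  }
}

const bounded = createBoundedBuffer(16)
const sizeAfterFirst = bounded.append('[{"name":"Al') // 12 characters buffered

let rejected = false
try {
  bounded.append('ice","bio":"..."') // would bring the total to 28
} catch (error) {
  rejected = error instanceof RangeError
}
console.log(sizeAfterFirst, rejected) // 12 true
```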

Result

Parse errors collected during processing:

{
  ParseError: { id: 'ParseError', message: 'Invalid JSON', idx: [2] }
}

Example

import { pipejoin, createReadableStream, streamToArray } from '@datastream/core'
import { jsonParseStream } from '@datastream/json'

const streams = [
  createReadableStream('[{"name":"Alice"},{"name":"Bob"}]'),
  jsonParseStream(),
]

const output = await streamToArray(pipejoin(streams))
// [{ name: 'Alice' }, { name: 'Bob' }]

jsonFormatStream Transform

Formats objects as a JSON array string ([{...},{...}]). Emits [ with the first element, ,\n between elements, and ] on flush. Outputs [] if no elements.
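
The framing rules above can be sketched with a generator. This is a simplified illustration rather than the library's code: `formatJsonArray` is a hypothetical name, it omits the `space` option, and the exact whitespace the library emits around the closing bracket may differ.

```javascript
// Hypothetical sketch of the array framing; not part of @datastream/json.
function* formatJsonArray(values) {
  let first = true
  for (const value of values) {
    const text = JSON.stringify(value)
    // '[' travels with the first element, ',\n' precedes every later one.
    yield first ? '[' + text : ',\n' + text
    first = false
  }
  // On flush: close the array, or emit '[]' if nothing was written.
  yield first ? '[]' : ']'
}

const framed = [...formatJsonArray([{ name: 'Alice' }, { name: 'Bob' }])].join('')
const framedEmpty = [...formatJsonArray([])].join('')

console.log(JSON.stringify(framed)) // "[{\"name\":\"Alice\"},\n{\"name\":\"Bob\"}]"
console.log(framedEmpty)            // []
```

Emitting the separator before each later element, instead of after each element, means the formatter never needs to know in advance which element is the last.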

Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| space | number \| string | | Passed to JSON.stringify for pretty-printing |

Example

import { pipeline, createReadableStream } from '@datastream/core'
import { jsonFormatStream } from '@datastream/json'

await pipeline([
  createReadableStream([
    { name: 'Alice', age: 30 },
    { name: 'Bob', age: 25 },
  ]),
  jsonFormatStream({ space: 2 }),
])

// Output: [{\n  "name": "Alice",\n  "age": 30\n},\n{\n  "name": "Bob",\n  "age": 25\n}\n]

Full pipeline example

Fetch NDJSON from an API, validate each object, and write the result as a JSON array:

import { pipeline } from '@datastream/core'
import { fetchResponseStream } from '@datastream/fetch'
import { ndjsonParseStream, jsonFormatStream } from '@datastream/json'
import { validateStream } from '@datastream/validate'
import { objectCountStream } from '@datastream/object'
import { fileWriteStream } from '@datastream/file'

// Placeholder JSON Schema (an assumption for this example); replace with your own
const schema = { type: 'object' }

const count = objectCountStream()
const parse = ndjsonParseStream()

const result = await pipeline([
  fetchResponseStream({ url: 'https://api.example.com/data.ndjson' }),
  parse,
  validateStream({ schema }),
  count,
  jsonFormatStream(),
  fileWriteStream({ path: './output.json' }),
])

console.log(result.count)        // number of objects processed
console.log(result.jsonErrors)   // any parse errors