Parquet — read and write

Parquet to CSV

Read a Parquet file from S3, extract specific columns, and write the rows out as CSV. hyparquet reads from anything that exposes byteLength and slice, so the S3 object is buffered into memory first:

import { pipeline, streamToBuffer } from '@datastream/core'
import { awsS3GetObjectStream } from '@datastream/aws'
import { objectReadableStream } from '@datastream/object'
import { csvFormatStream } from '@datastream/csv'
import { fileWriteStream } from '@datastream/file'
import { parquetRead } from 'hyparquet'

// Buffer the whole object; hyparquet needs random access to the file footer
const buffer = await streamToBuffer(
  await awsS3GetObjectStream({ Bucket: 'data-lake', Key: 'users.parquet' }),
)

// hyparquet accepts a plain ArrayBuffer (it satisfies the byteLength/slice
// interface); a Node.js Buffer does not, so convert it first
const arrayBuffer = buffer.buffer.slice(
  buffer.byteOffset,
  buffer.byteOffset + buffer.byteLength,
)

const rows = []
await parquetRead({
  file: arrayBuffer,
  columns: ['id', 'name', 'email'],
  rowFormat: 'object', // rows as { id, name, email } objects, not arrays
  onComplete: (data) => rows.push(...data),
})

await pipeline([
  objectReadableStream(rows),
  csvFormatStream({ header: true }),
  fileWriteStream({ path: './users.csv' }),
])
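
If only the schema or row count is needed, hyparquet can decode just the file footer instead of materializing rows. A minimal sketch reusing arrayBuffer from above; parquetMetadata and parquetSchema are hyparquet exports, and num_rows comes from the Parquet footer metadata:

import { parquetMetadata, parquetSchema } from 'hyparquet'

const metadata = parquetMetadata(arrayBuffer)
console.log(metadata.num_rows) // total row count, straight from the footer
console.log(parquetSchema(metadata)) // nested schema tree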

CSV to Parquet

Read a CSV file, parse it into objects, and write the result to S3 as Parquet. The delimiter and header are only known after data has flowed through the detect streams, so the downstream options are passed as thunks that read each stream's result() lazily:

import {
  pipeline,
  pipejoin,
  streamToArray,
  createReadableStreamFromArrayBuffer,
} from '@datastream/core'
import { fileReadStream } from '@datastream/file'
import {
  csvDetectDelimitersStream,
  csvDetectHeaderStream,
  csvParseStream,
  csvCoerceValuesStream,
} from '@datastream/csv'
import { objectFromEntriesStream } from '@datastream/object'
import { awsS3PutObjectStream } from '@datastream/aws'
import { tableFromJSON, tableToIPC } from 'apache-arrow'

const detectDelimiters = csvDetectDelimitersStream()
// Lazy getters over the detection results, shared by the streams below
const detected = (key) => () => detectDelimiters.result().value[key]

const detectHeader = csvDetectHeaderStream({
  delimiterChar: detected('delimiterChar'),
  newlineChar: detected('newlineChar'),
  quoteChar: detected('quoteChar'),
  escapeChar: detected('escapeChar'),
})

const rows = await streamToArray(
  pipejoin([
    fileReadStream({ path: './users.csv' }),
    detectDelimiters,
    detectHeader,
    csvParseStream({
      delimiterChar: detected('delimiterChar'),
      newlineChar: detected('newlineChar'),
      quoteChar: detected('quoteChar'),
      escapeChar: detected('escapeChar'),
    }),
    objectFromEntriesStream({
      keys: () => detectHeader.result().value.header,
    }),
    csvCoerceValuesStream(),
  ]),
)

// apache-arrow builds the table; parquet-wasm encodes it. The two sides
// exchange data through the Arrow IPC stream format.
const table = tableFromJSON(rows)
const { Table, writeParquet } = await import('parquet-wasm')
const wasmTable = Table.fromIPC(tableToIPC(table, 'stream'))
const parquetBuffer = writeParquet(wasmTable) // returns a Uint8Array

await pipeline([
  createReadableStreamFromArrayBuffer(parquetBuffer),
  awsS3PutObjectStream({ Bucket: 'data-lake', Key: 'users.parquet' }),
])
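
writeParquet also accepts writer properties, which is where compression is configured. A minimal sketch reusing wasmTable from above, using parquet-wasm's WriterPropertiesBuilder and Compression enum; which codecs are available depends on the parquet-wasm build:

const { WriterPropertiesBuilder, Compression } = await import('parquet-wasm')

const writerProperties = new WriterPropertiesBuilder()
  .setCompression(Compression.ZSTD) // or SNAPPY, GZIP, ...
  .build()

const compressed = writeParquet(wasmTable, writerProperties)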