rust_data_processing.examples

User guide: runnable Python examples for rust_data_processing.

Source: repository docs/python/README.md (included below for the HTML docs site), plus Phase 2 snippets from docs/python/PHASE2_EXAMPLES.md, and SFT / fine-tuning data prep from docs/python/SFT_PYTHON_EXAMPLES.md.


Python quick start and examples

Phase 2 scope: Phase 1 baseline plus export, privacy, Arrow, incremental ETL → Python; JVM planned

The same Markdown is included in the HTML docs (pdoc) as rust_data_processing.examples — see python-wrapper/rust_data_processing/examples.py.

This page collects Python snippets for the rust-data-processing package (PyO3 extension). The repository README leads with a short Python quick start; the canonical API reference is python-wrapper/API.md. Rust snippets live in docs/rust/README.md.

Install (from PyPI after release, or from a checkout with maturin develop — see docs/RELEASE_CHECKLIST.md):

pip install rust-data-processing

Notebooks: see notebooks/README.md in the repository for Jupyter examples.

import rust_data_processing as rdp

Delta Lake / Apache Iceberg (read path)

The Rust crate’s default build does not embed a Delta/Iceberg table engine (see Planning/ADR_P2_E2_LAKE_TABLE_READ.md). In Python, use deltalake or pyiceberg to scan a table, write Parquet, then call rdp.ingest_from_path on that file — or stay in Python/Polars for the heavy scan. Details: docs/LAKE_TABLE_READ.md.
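As a minimal sketch of that handoff (assuming the Python deltalake and pyarrow packages are installed; the function name and paths are illustrative, not part of the library):

```python
def delta_to_rdp_dataset(table_uri: str, parquet_out: str):
    """Hypothetical handoff: scan a Delta table in Python, land Parquet, ingest with rdp."""
    from deltalake import DeltaTable  # pip install deltalake
    import pyarrow.parquet as pq
    import rust_data_processing as rdp

    arrow_table = DeltaTable(table_uri).to_pyarrow_table()  # heavy scan stays in Python
    pq.write_table(arrow_table, parquet_out)                # materialize a Parquet slice
    ds, schema = rdp.ingest_with_inferred_schema(parquet_out)
    return ds, schema
```

The same shape works for pyiceberg: load the table, convert to Arrow, write Parquet, then ingest the Parquet file.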

Phase 2 examples (export, privacy, median, validation, …)

Copy-paste snippets live in PHASE2_EXAMPLES.md (same folder as this README).

What this page covers

Use this as a tour of the Python API (the same page is rendered on GitHub Pages). For every function signature and option, see python-wrapper/API.md.

Topic → where below:

  • File ETL (CSV, JSON, Parquet, Excel) → Quick start; JSON / Parquet / Excel; Ordered paths & directory scans
  • Databases (Python driver vs built-in ConnectorX) → Database sources: two ways; ConnectorX
  • SQL & lazy DataFrame → SQL; DataFrame pipelines; Cookbook
  • Declarative transforms → TransformSpec
  • ML / QA (profile, validate, outliers) → ML-oriented flow; Profiling; Validation; Outliers
  • Phase 2 (JSONL export, privacy, truncation, UTF-8 transforms, median, Delta handoff) → Phase 2 examples; Delta / Iceberg
  • SFT / fine-tuning prep (Alpaca JSONL, HF optional, chat column) → SFT Python examples; SFT_DATA_FORMATS.md
  • Row-wise & parallel processing → Processing pipelines; Execution engine
  • Ingestion observability → Observability
  • CDC boundary types → CDC

Quick start (library usage)

Ingest a file with an explicit schema (format can be set or inferred from the extension):

import rust_data_processing as rdp

schema = [
    {"name": "id", "data_type": "int64"},
    {"name": "name", "data_type": "utf8"},
]
ds = rdp.ingest_from_path(
    "tests/fixtures/people.csv",
    schema,
    {"format": "csv"},
)
print("rows", ds.row_count())

Infer schema from the file, then ingest (two passes — same idea as Rust’s ingest_with_inferred_schema):

ds, schema = rdp.ingest_with_inferred_schema("tests/fixtures/people.csv")
print(schema[0]["name"], ds.row_count())

JSON, Parquet, and Excel

Format is usually inferred from the extension; you can set "format" explicitly ("csv", "json", "parquet", "excel"). JSON with nested objects may need dotted column names in the schema (e.g. user.name for tests/fixtures/people.json).

import rust_data_processing as rdp

# JSON (nested columns — matches tests/fixtures/people.json)
schema_json = [
    {"name": "id", "data_type": "int64"},
    {"name": "user.name", "data_type": "utf8"},
    {"name": "score", "data_type": "float64"},
    {"name": "active", "data_type": "bool"},
]
ds_json = rdp.ingest_from_path(
    "tests/fixtures/people.json",
    schema_json,
    {"format": "json"},
)

# Parquet — use a real path to your `.parquet` file
schema_flat = [
    {"name": "id", "data_type": "int64"},
    {"name": "name", "data_type": "utf8"},
    {"name": "score", "data_type": "float64"},
    {"name": "active", "data_type": "bool"},
]
ds_parquet = rdp.ingest_from_path("path/to/file.parquet", schema_flat, {"format": "parquet"})

# Excel — optional sheet selection in options (see python-wrapper/README.md)
ds_excel = rdp.ingest_from_path("path/to/file.xlsx", schema_flat, {"format": "excel"})

Ordered paths and directory scans (incremental batches)

Use paths_from_directory_scan for a recursive listing with an optional glob on paths relative to the root; results are sorted so ordering is repeatable. Combine with ingest_from_ordered_paths: files are read in list order, rows are concatenated, then the same watermark options as single-file ingest apply once to the combined rows (not per file). The function returns (DataSet, metadata) where metadata includes paths, last_path, and max_watermark_value (when watermark_column / watermark_exclusive_above are set) for checkpointing the next run.

paths = rdp.paths_from_directory_scan("data/incoming", "**/*.csv")
ds, meta = rdp.ingest_from_ordered_paths(
    paths,
    schema,
    {"watermark_column": "ts", "watermark_exclusive_above": 100},
)
print(meta["last_path"], meta["max_watermark_value"])
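The returned metadata can be persisted between runs so the next batch resumes above the last watermark. A plain-Python sketch (the file name and JSON layout are choices of this example, not part of the library):

```python
import json
import tempfile
from pathlib import Path

def save_checkpoint(path: Path, meta: dict) -> None:
    # Persist only the keys the next run needs.
    state = {
        "last_path": meta.get("last_path"),
        "max_watermark_value": meta.get("max_watermark_value"),
    }
    path.write_text(json.dumps(state))

def load_watermark(path: Path, default: int = 0) -> int:
    if not path.exists():
        return default
    value = json.loads(path.read_text()).get("max_watermark_value")
    return default if value is None else value

# Next run: feed the stored value back in as `watermark_exclusive_above`.
ckpt = Path(tempfile.mkdtemp()) / "ingest_checkpoint.json"
save_checkpoint(ckpt, {"last_path": "data/incoming/b.csv", "max_watermark_value": 120})
assert load_watermark(ckpt) == 120
```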

DataFrame-centric pipelines (Polars-backed) (Phase 1)

Use DataFrame.from_dataset for a lazy plan; chain methods and collect() to a DataSet:

import rust_data_processing as rdp

schema = [
    {"name": "id", "data_type": "int64"},
    {"name": "active", "data_type": "bool"},
    {"name": "score", "data_type": "float64"},
]
rows = [
    [1, True, 10.0],
    [2, True, 20.0],
    [3, False, 30.0],
]
ds = rdp.DataSet(schema, rows)

out = (
    rdp.DataFrame.from_dataset(ds)
    .filter_eq("active", True)
    .multiply_f64("score", 2.0)
    .collect()
)
assert out.row_count() == 2

SQL queries (Polars-backed) (Phase 1)

Single-table SQL — the DataSet is registered as table df; sql_query_dataset returns a materialized DataSet:

import rust_data_processing as rdp

schema = [
    {"name": "id", "data_type": "int64"},
    {"name": "active", "data_type": "bool"},
    {"name": "score", "data_type": "float64"},
]
rows = [
    [1, True, 10.0],
    [2, True, 20.0],
    [3, False, 30.0],
]
ds = rdp.DataSet(schema, rows)

out = rdp.sql_query_dataset(
    ds,
    "SELECT id, score FROM df WHERE active = TRUE ORDER BY id DESC LIMIT 10",
)

Multi-table JOINs via SqlContext (execute returns a lazy DataFrame; call collect() for a DataSet):

import rust_data_processing as rdp

people = rdp.DataSet(
    [
        {"name": "id", "data_type": "int64"},
        {"name": "name", "data_type": "utf8"},
    ],
    [[1, "Ada"], [2, "Grace"]],
)
scores = rdp.DataSet(
    [
        {"name": "id", "data_type": "int64"},
        {"name": "score", "data_type": "float64"},
    ],
    [[1, 9.0], [3, 7.0]],
)

ctx = rdp.SqlContext()
ctx.register("people", rdp.DataFrame.from_dataset(people))
ctx.register("scores", rdp.DataFrame.from_dataset(scores))

out = ctx.execute(
    "SELECT p.id, p.name, s.score FROM people p JOIN scores s ON p.id = s.id"
).collect()

Database sources: two ways

  1. Your own Python driver (no db feature): Use psycopg2, SQLAlchemy, etc., execute SQL, then build a DataSet(schema, rows) from the result rows (schema/row shape in python-wrapper/API.md § Conventions; DataSet examples appear throughout this page). This works with the default PyPI wheel; you do not need ConnectorX or maturin … --features db.

  2. Built-in SQL → DataSet (requires db): ingest_from_db / ingest_from_db_infer use ConnectorX inside the native extension. The module must be built with the db Cargo feature (see python-wrapper/README_DEV.md).

Concrete pattern (any Python DB-API driver — no db feature): map query results to DataSet(schema, rows) yourself.

import rust_data_processing as rdp

# After cursor.execute(...): build schema from your column names + logical types,
# then rows = list(cursor.fetchall()) (or batch). Example shape:
schema = [
    {"name": "id", "data_type": "int64"},
    {"name": "name", "data_type": "utf8"},
]
rows = [[1, "ada"], [2, "grace"]]
ds = rdp.DataSet(schema, rows)

Direct DB ingestion (ConnectorX) (feature-gated)

The native module must be built with the db Cargo feature (see python-wrapper/README_DEV.md). Then:

# Infer column types from the query result
ds = rdp.ingest_from_db_infer(
    "postgresql://user:pass@localhost:5432/db?cxprotocol=binary",
    "SELECT id, score, active FROM my_table",
)
print("rows", ds.row_count())

# Or provide an explicit schema (same strings as file ingest)
schema = [
    {"name": "id", "data_type": "int64"},
    {"name": "score", "data_type": "float64"},
    {"name": "active", "data_type": "bool"},
]
ds = rdp.ingest_from_db(
    "postgresql://user:pass@localhost:5432/db?cxprotocol=binary",
    "SELECT id, score, active FROM my_table",
    schema,
)

End-user transformation spec (TransformSpec) (Phase 1)

transform_apply accepts a dict (or JSON string) in the same serde shape as Rust’s TransformSpec:

import rust_data_processing as rdp

schema_in = [
    {"name": "id", "data_type": "int64"},
    {"name": "score", "data_type": "int64"},
    {"name": "weather", "data_type": "utf8"},
]
rows = [[1, 10, "drizzle"], [2, None, "rain"]]
ds = rdp.DataSet(schema_in, rows)

spec = {
    "output_schema": {
        "fields": [
            {"name": "id", "data_type": "Int64"},
            {"name": "score_f", "data_type": "Float64"},
            {"name": "wx", "data_type": "Utf8"},
        ]
    },
    "steps": [
        {"Rename": {"pairs": [["weather", "wx"]]}},
        {"Rename": {"pairs": [["score", "score_f"]]}},
        {
            "Cast": {
                "column": "score_f",
                "to": "Float64",
                "mode": "lossy",
            }
        },
        {"FillNull": {"column": "score_f", "value": {"Float64": 0.0}}},
        {"Select": {"columns": ["id", "score_f", "wx"]}},
    ],
}
out = rdp.transform_apply(ds, spec)
assert out.column_names() == ["id", "score_f", "wx"]

ML / training-dataset prep (Phase 1)

For tabular ML, teams usually combine data-quality and data-understanding steps before training or batch inference: profile columns (nulls, ranges, quantiles), validate business rules (not-null, regex, ranges), and optionally flag outliers for review. This library keeps those steps on a single DataSet; see Profiling, Validation, and Outlier detection below. Raw JSON or Markdown report strings are available as profile_dataset_json / profile_dataset_markdown (and the validate_* / detect_outliers_* variants) if you want files or PR comments instead of dicts.

Profiling (Phase 1)

import rust_data_processing as rdp

schema = [{"name": "score", "data_type": "float64"}]
rows = [[1.0], [None], [3.0]]
ds = rdp.DataSet(schema, rows)

rep = rdp.profile_dataset(
    ds,
    {"head_rows": 2, "quantiles": [0.5]},
)
assert rep["row_count"] == 3
assert rep["columns"][0]["null_count"] == 1

Markdown for humans (e.g. paste into a doc or ticket):

md = rdp.profile_dataset_markdown(ds, {"head_rows": 100, "quantiles": [0.5, 0.95]})

Validation (Phase 1)

import rust_data_processing as rdp

schema = [{"name": "email", "data_type": "utf8"}]
rows = [
    ["ada@example.com"],
    [None],
    ["not-an-email"],
]
ds = rdp.DataSet(schema, rows)

rep = rdp.validate_dataset(
    ds,
    {
        "checks": [
            {"kind": "not_null", "column": "email", "severity": "error"},
            {
                "kind": "regex_match",
                "column": "email",
                "pattern": r"^[^@]+@[^@]+\.[^@]+$",
                "severity": "warn",
                "strict": True,
            },
        ],
    },
)
assert rep["summary"]["total_checks"] >= 2

Outlier detection (Phase 1)

import rust_data_processing as rdp

schema = [{"name": "x", "data_type": "float64"}]
rows = [[1.0], [1.0], [1.0], [1000.0]]
ds = rdp.DataSet(schema, rows)

rep = rdp.detect_outliers(
    ds,
    "x",
    {"kind": "iqr", "k": 1.5},
    {"sampling": "full", "max_examples": 3},
)
assert rep["outlier_count"] >= 1

CDC interface boundary (Phase 1 spike)

The rust_data_processing.cdc submodule exposes plain Python types aligned with the Rust cdc module (no connector ships yet):

from rust_data_processing.cdc import CdcEvent, CdcOp, RowImage, SourceMeta, TableRef

ev = CdcEvent(
    meta=SourceMeta(source="db", checkpoint=None),
    table=TableRef.with_schema("public", "users"),
    op=CdcOp.INSERT,
    before=None,
    after=RowImage.new([("id", 1), ("name", "Ada")]),
)
assert ev.op == CdcOp.INSERT

Cookbook (Phase 1)

Stable transformation wrappers (Polars-backed)

Rename + cast + fill nulls:

import rust_data_processing as rdp

schema = [
    {"name": "id", "data_type": "int64"},
    {"name": "score", "data_type": "int64"},
]
rows = [[1, 10], [2, None]]
ds = rdp.DataSet(schema, rows)

out = (
    rdp.DataFrame.from_dataset(ds)
    .rename([("score", "score_i")])
    .cast("score_i", "float64")
    .fill_null("score_i", 0.0)
    .collect()
)

Group-by aggregates:

schema = [
    {"name": "grp", "data_type": "utf8"},
    {"name": "score", "data_type": "float64"},
]
rows = [
    ["A", 1.0],
    ["A", 2.0],
    ["B", None],
]
ds = rdp.DataSet(schema, rows)

out = (
    rdp.DataFrame.from_dataset(ds)
    .group_by(
        ["grp"],
        [
            {"type": "sum", "column": "score", "alias": "sum_score"},
            {"type": "count_rows", "alias": "cnt"},
        ],
    )
    .collect()
)

Per-key mean, sample std dev, and count-distinct:

schema = [
    {"name": "grp", "data_type": "utf8"},
    {"name": "score", "data_type": "float64"},
    {"name": "label", "data_type": "utf8"},
]
rows = [
    ["A", 10.0, "x"],
    ["A", 20.0, "y"],
    ["B", None, "z"],
]
ds = rdp.DataSet(schema, rows)

_ = (
    rdp.DataFrame.from_dataset(ds)
    .group_by(
        ["grp"],
        [
            {"type": "mean", "column": "score", "alias": "mu_score"},
            {
                "type": "std_dev",
                "column": "score",
                "alias": "sd_score",
                "kind": "sample",
            },
            {
                "type": "count_distinct_non_null",
                "column": "label",
                "alias": "n_labels",
            },
        ],
    )
    .collect()
)

Semantics (nulls, all-null groups, SUM vs MEAN, casting): see docs/REDUCE_AGG_SEMANTICS.md. More detail: python-wrapper/API.md § Processing pipelines.

Join two DataFrames:

people_schema = [
    {"name": "id", "data_type": "int64"},
    {"name": "name", "data_type": "utf8"},
]
people_rows = [[1, "Ada"], [2, "Grace"]]
people = rdp.DataSet(people_schema, people_rows)

scores_schema = [
    {"name": "id", "data_type": "int64"},
    {"name": "score", "data_type": "float64"},
]
scores_rows = [[1, 9.0], [3, 7.0]]
scores = rdp.DataSet(scores_schema, scores_rows)

out = (
    rdp.DataFrame.from_dataset(people)
    .join(rdp.DataFrame.from_dataset(scores), ["id"], ["id"], "inner")
    .collect()
)

Processing pipelines (Epic 1 / Story 1.2)

In-memory helpers mirror rust_data_processing::processing:

  • processing_filter(ds, predicate): the predicate receives one row as a list
  • processing_map(ds, mapper)
  • processing_reduce(ds, column, op) — op names: count, sum, min, max, mean, variance_population, variance_sample, stddev_population, stddev_sample, sum_squares, l2_norm, count_distinct_non_null, …
  • processing_feature_wise_mean_std, processing_arg_max_row, processing_arg_min_row, processing_top_k_by_frequency

Polars-backed equivalents: DataFrame.reduce, DataFrame.feature_wise_mean_std. Semantics: docs/REDUCE_AGG_SEMANTICS.md.

Example:

import rust_data_processing as rdp

schema = [
    {"name": "id", "data_type": "int64"},
    {"name": "active", "data_type": "bool"},
    {"name": "score", "data_type": "float64"},
]
rows = [
    [1, True, 10.0],
    [2, False, 20.0],
    [3, True, None],
]
ds = rdp.DataSet(schema, rows)

filtered = rdp.processing_filter(ds, lambda row: row[1] is True)
mapped = rdp.processing_map(
    filtered,
    lambda row: [
        row[0],
        row[1],
        row[2] * 1.1 if row[2] is not None else None,
    ],
)
s = rdp.processing_reduce(mapped, "score", "sum")
assert s == 11.0

Mean, variance, norms, distinct counts

schema = [
    {"name": "x", "data_type": "float64"},
    {"name": "cat", "data_type": "utf8"},
]
rows = [[2.0, "a"], [4.0, "b"]]
ds = rdp.DataSet(schema, rows)

mean = rdp.processing_reduce(ds, "x", "mean")
std_s = rdp.processing_reduce(ds, "x", "stddev_sample")
l2 = rdp.processing_reduce(ds, "x", "l2_norm")
d = rdp.processing_reduce(ds, "cat", "count_distinct_non_null")
assert mean is not None and d == 2

Polars-backed DataFrame.reduce (same op names)

mem = rdp.processing_reduce(ds, "x", "mean")
pol = rdp.DataFrame.from_dataset(ds).reduce("x", "mean")
assert pol is not None and mem == pol

Feature-wise mean and std (memory vs Polars)

schema = [
    {"name": "a", "data_type": "int64"},
    {"name": "b", "data_type": "float64"},
]
rows = [[1, 10.0], [3, 20.0]]
ds = rdp.DataSet(schema, rows)

cols = ["a", "b"]
mem = rdp.processing_feature_wise_mean_std(ds, cols, "sample")
pol = rdp.DataFrame.from_dataset(ds).feature_wise_mean_std(cols, "sample")
assert mem[0]["column"] == pol[0]["column"]

Arg max / arg min row and top-k label frequencies

schema = [
    {"name": "id", "data_type": "int64"},
    {"name": "region", "data_type": "utf8"},
]
rows = [[1, "west"], [2, "east"], [3, "west"]]
ds = rdp.DataSet(schema, rows)

assert rdp.processing_arg_max_row(ds, "id") is not None
assert len(rdp.processing_top_k_by_frequency(ds, "region", 2)) >= 1

Execution engine (parallel pipelines) (Story 1.3)

import rust_data_processing as rdp

schema = [
    {"name": "id", "data_type": "int64"},
    {"name": "active", "data_type": "bool"},
    {"name": "score", "data_type": "float64"},
]
rows = [
    [1, True, 10.0],
    [2, False, 20.0],
    [3, True, None],
]
ds = rdp.DataSet(schema, rows)

engine = rdp.ExecutionEngine(
    {"num_threads": 4, "chunk_size": 1024, "max_in_flight_chunks": 4}
)
active_idx = 1
filtered = engine.filter_parallel(ds, lambda row: row[active_idx] is True)
mapped = engine.map_parallel(filtered, lambda row: list(row))
s = engine.reduce(mapped, "score", "sum")
metrics = engine.metrics_snapshot()
print("rows_processed", metrics["rows_processed"], "elapsed", metrics.get("elapsed_seconds"))

More examples: counts, missing columns, all-null numeric

schema = [{"name": "score", "data_type": "float64"}]
rows = [[1.0], [None]]
ds = rdp.DataSet(schema, rows)

assert rdp.processing_reduce(ds, "score", "count") == 2
assert rdp.processing_reduce(ds, "score", "sum") == 1.0
assert rdp.processing_reduce(ds, "missing", "sum") is None

all_null = rdp.DataSet(
    [{"name": "x", "data_type": "float64"}],
    [[None], [None]],
)
assert rdp.processing_reduce(all_null, "x", "mean") is None

Benchmarks (Story 1.2.5)

Criterion benchmarks are run on the Rust crate (cargo bench, scripts/run_benchmarks.ps1). The Python package calls the same native code paths; for throughput numbers, use the Rust workflow or see the repository README benchmark snapshot.

Observability (failure/alert hooks)

Pass observer / alert_at_or_above in the ingestion options dict (see python-wrapper/API.md § Ingestion observability):

import rust_data_processing as rdp

schema = [{"name": "id", "data_type": "int64"}]

def on_alert(ctx, severity, message):
    print(severity, message)

try:
    rdp.ingest_from_path(
        "does_not_exist.csv",
        schema,
        {
            "format": "csv",
            "alert_at_or_above": "critical",
            "observer": {"on_alert": on_alert},
        },
    )
except ValueError:
    pass



Phase 2 — Python examples

Copy-paste snippets for export / JSONL, privacy summaries, report truncation, UTF-8 transforms, string-length validation, median reductions, and Delta/Iceberg handoff. Requires pip install rust-data-processing (or maturin develop from python-wrapper/).


1. JSON Lines export and train/test indices

import rust_data_processing as rdp

schema = [{"name": "id", "data_type": "int64"}, {"name": "label", "data_type": "utf8"}]
rows = [[1, "ok"], [2, "ok"], [3, "holdout"]]
ds = rdp.DataSet(schema, rows)

text = rdp.export_dataset_jsonl(ds, ["id", "label"])
print(text)  # one JSON object per line

records = rdp.export_jsonl_records(ds, ["id", "label"])  # list[dict]

train_idx, test_idx = rdp.export_train_test_row_indices(ds.row_count(), test_fraction=0.34)
train_rows = [rows[i] for i in train_idx]
test_rows = [rows[i] for i in test_idx]

2. UTF-8 length filter (export helper)

import rust_data_processing as rdp

ds = rdp.DataSet(
    [{"name": "msg", "data_type": "utf8"}],
    [["hi"], ["this is too long for a tweet style cap"]],
)
short_only = rdp.export_filter_rows_max_utf8_chars(ds, "msg", max_chars=10)

3. Privacy diff reports (after transform_apply)

import rust_data_processing as rdp

schema = [{"name": "email", "data_type": "utf8"}]
before = rdp.DataSet(schema, [["user@company.com"]])
spec = {
    "output_schema": {"fields": [{"name": "email", "data_type": "Utf8"}]},
    "steps": [
        {
            "Utf8RedactMiddle": {
                "column": "email",
                "keep_left": 2,
                "keep_right": 0,
                "redaction": "***",
            }
        }
    ],
}
after = rdp.transform_apply(before, spec)

rows = rdp.privacy_summarize_utf8_changes(before, after, ["email"])  # list of dicts
md = rdp.privacy_summarize_utf8_changes(before, after, ["email"], as_markdown=True)

4. Truncate large JSON / text for logs or LLM context

import rust_data_processing as rdp

blob = '{"profile": ' + ("x" * 5000) + "}"
snippet = rdp.reports_truncate_utf8_bytes(blob, max_bytes=256)
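For comparison, the byte-budget idea in plain Python (a sketch: slice the UTF-8 bytes, then drop any trailing partial code point; the library's exact marker/suffix behavior may differ):

```python
def truncate_utf8_bytes(text: str, max_bytes: int) -> str:
    # Cut at the byte budget, then discard any incomplete trailing sequence.
    raw = text.encode("utf-8")[:max_bytes]
    return raw.decode("utf-8", errors="ignore")

snippet = truncate_utf8_bytes('{"profile": "' + "é" * 5000 + '"}', 256)
assert len(snippet.encode("utf-8")) <= 256
```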

5. UTF-8 masking transforms (TransformSpec)

Supported step variants: Utf8Truncate, Utf8Sha256Hex, Utf8RedactMiddle (the JSON keys match the Rust enum variant names). Use transform_apply(dataset, spec_dict) so you do not hand-serialize JSON.

import rust_data_processing as rdp

ds = rdp.DataSet([{"name": "s", "data_type": "utf8"}], [["secret-value"]])
spec = {
    "output_schema": {"fields": [{"name": "s", "data_type": "Utf8"}]},
    "steps": [{"Utf8Sha256Hex": {"column": "s"}}],
}
out = rdp.transform_apply(ds, spec)

6. Validation: UTF-8 length (Unicode scalars)

import rust_data_processing as rdp

ds = rdp.DataSet([{"name": "code", "data_type": "utf8"}], [["AB"], ["ABCDE"]])
rep = rdp.validate_dataset(
    ds,
    {
        "checks": [
            {
                "kind": "utf8_len_chars_between",
                "column": "code",
                "min_chars": 3,
                "max_chars": 10,
                "severity": "warn",
            }
        ]
    },
)

7. Median: processing_reduce and DataFrame

import rust_data_processing as rdp

ds = rdp.DataSet([{"name": "x", "data_type": "int64"}], [[10], [20], [30], [40]])
assert rdp.processing_reduce(ds, "x", "median") == 25.0

ds2 = rdp.DataSet(
    [{"name": "g", "data_type": "utf8"}, {"name": "v", "data_type": "int64"}],
    [["a", 1], ["a", 100]],
)
lf = rdp.DataFrame.from_dataset(ds2)
m = lf.group_by(["g"], [{"type": "median", "column": "v", "alias": "m"}]).collect()

8. Delta Lake / Iceberg (handoff — not in-process in default wheel)

Use Python deltalake or PyIceberg to read a table, write Parquet (or build rows), then:

import rust_data_processing as rdp

# After you materialize a Parquet path from deltalake / Spark:
schema = rdp.infer_schema_from_path("exported_slice.parquet")
ds = rdp.ingest_from_path("exported_slice.parquet", schema)

Details: LAKE_TABLE_READ.md, ADR_P2_E2_LAKE_TABLE_READ.md.


9. End-to-end: ingest → validate → JSONL (tabular QA)

import rust_data_processing as rdp

# Point at a real CSV path on disk.
path = "tests/fixtures/people.csv"  # from repo root; use absolute path in notebooks
schema = rdp.infer_schema_from_path(path)
ds = rdp.ingest_from_path(path, schema)
_ = rdp.profile_dataset(ds, {"sampling": "full"})
rep = rdp.validate_dataset(ds, {"checks": [{"kind": "not_null", "column": schema[0]["name"], "severity": "warn"}]})
cols = [f["name"] for f in schema]
jl = rdp.export_dataset_jsonl(ds, cols)

10. Incremental ETL helpers (watermark, ordered paths, Hive-style discovery)

These shipped in Phase 2 / P2-E1 and are exposed on the same Python module.

import rust_data_processing as rdp

# Watermark: after ingest, keep rows strictly above a floor (set both keys together).
opts = {"watermark_column": "ts", "watermark_exclusive_above": 100}
# ds = rdp.ingest_from_path("events.csv", schema, opts)

# Append-only batch: concatenate files then apply watermark once.
# ds, meta = rdp.ingest_from_ordered_paths(["/data/a.csv", "/data/b.csv"], schema, opts)

# Hive-style layout: discover partitioned files under a root.
# files = rdp.discover_hive_partitioned_files("s3://bucket/prefix", file_pattern="*.parquet")
# paths = rdp.paths_from_directory_scan("/data/events", relative_pattern="*.csv")

See API.md § Ingestion and Incremental / watermark.




SFT / fine-tuning data — runnable Python examples (rust_data_processing)

These examples use rust_data_processing (rdp) only for the tabular steps: ingest, profile, validate, optional TransformSpec, JSONL export, and deterministic train/test row indices. They do not run training or ship a tokenizer.

Prerequisite: pip install rust-data-processing (or maturin develop from python-wrapper/).

Trainer warning: chat templates and tokenizers belong in TRL / HF / Llama-Factory — see SFT_DATA_FORMATS.md.


1. Alpaca-style NDJSON (committed sample in this repo)

The file examples/sft/sample_alpaca.ndjson has four rows with instruction, input, and output (UTF-8). Run from the repository root, or build an absolute path to that file:

from pathlib import Path

import rust_data_processing as rdp

path = Path("examples/sft/sample_alpaca.ndjson").resolve()

schema = [
    {"name": "instruction", "data_type": "utf8"},
    {"name": "input", "data_type": "utf8"},
    {"name": "output", "data_type": "utf8"},
]
# `.ndjson` is detected as newline-delimited JSON (no need to force `format`).
ds = rdp.ingest_from_path(str(path), schema)

assert ds.row_count() == 4
prof = rdp.profile_dataset(ds, {"sampling": "full"})
print("profile row_count", prof["row_count"])

# Example QA: flag rows with very short outputs (policy is yours)
rep = rdp.validate_dataset(
    ds,
    {
        "checks": [
            {
                "kind": "utf8_len_chars_between",
                "column": "output",
                "min_chars": 2,
                "max_chars": 500,
                "severity": "warn",
            }
        ]
    },
)
print("failed_checks", rep["summary"]["failed_checks"])

# Stable JSONL for a trainer that expects instruction/input/output columns
cols = ["instruction", "input", "output"]
text = rdp.export_dataset_jsonl(ds, cols)
print(text.splitlines()[0][:80], "...")

# Deterministic tail holdout (same semantics as Rust `train_test_row_indices`)
train_idx, test_idx = rdp.export_train_test_row_indices(ds.row_count(), test_fraction=0.25)
rows = ds.to_rows()
train_ds = rdp.DataSet(schema, [rows[i] for i in train_idx])
test_ds = rdp.DataSet(schema, [rows[i] for i in test_idx])
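The tail-holdout arithmetic can be mirrored in plain Python (a sketch; the floor rounding here is an assumption of this example — confirm against export_train_test_row_indices for your row counts):

```python
def tail_holdout_indices(n_rows: int, test_fraction: float):
    # Assumed rounding: floor(n * fraction) rows from the tail become the test split.
    n_test = int(n_rows * test_fraction)
    split = n_rows - n_test
    return list(range(split)), list(range(split, n_rows))

train_idx, test_idx = tail_holdout_indices(4, 0.25)
assert train_idx == [0, 1, 2] and test_idx == [3]
```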

2. “Messages” / chat column as a single JSON string

Some pipelines store one UTF-8 column whose cell is JSON: [{"role":"user","content":"..."}, ...]. Keep it as Utf8; validate length; export as one field per JSONL line.

import json

import rust_data_processing as rdp

messages = [
    {"role": "user", "content": "Summarize: Rust is a systems language."},
    {"role": "assistant", "content": "Rust is a systems programming language focused on safety."},
]
schema = [{"name": "messages", "data_type": "utf8"}]
ds = rdp.DataSet(schema, [[json.dumps(messages)]])
jl = rdp.export_dataset_jsonl(ds, ["messages"])
print(jl.strip())

Your trainer may expect a different key (conversations, etc.) — rename in a TransformSpec or when building rows.
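When you control row construction, the rename can happen before export (plain-Python sketch; "conversations" is just an example key, not something the library requires):

```python
import json

records = [{"messages": json.dumps([{"role": "user", "content": "hi"}])}]

# Rename the key before building the DataSet / exporting JSONL.
renamed = [{"conversations": r["messages"]} for r in records]
assert "conversations" in renamed[0] and "messages" not in renamed[0]
```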


3. Optional: slice a well-known public dataset with Hugging Face datasets

This requires pip install datasets and network access the first time HF downloads and caches the split. It is not a dev dependency of this repo.

Alpaca (tatsu-lab; check the current license / terms on the Hugging Face dataset card before production use):

# pip install datasets
from datasets import load_dataset

import rust_data_processing as rdp

hf = load_dataset("tatsu-lab/alpaca", split="train[:8]")  # tiny slice for a demo
schema = [
    {"name": "instruction", "data_type": "utf8"},
    {"name": "input", "data_type": "utf8"},
    {"name": "output", "data_type": "utf8"},
]
rows = [
    [row["instruction"], row.get("input") or "", row["output"]]
    for row in hf
]
ds = rdp.DataSet(schema, rows)
rep = rdp.validate_dataset(
    ds,
    {"checks": [{"kind": "not_null", "column": "instruction", "severity": "error"}]},
)
print(rdp.export_dataset_jsonl(ds, ["instruction", "output"]))

Other common entrypoints (same pattern: load_dataset → list of rows → DataSet):

  • Databricks Dolly — see §4 below (instruction / context / response on the dataset card).
  • OpenAssistant — larger; use a tiny split="train[:32]" for experiments; map whatever columns you need into a fixed DataSet schema before export.

Always read the dataset card for license, attribution, and allowed use.


4. Databricks Dolly 15k (tiny slice → same QA + JSONL pattern)

The databricks/databricks-dolly-15k split exposes (among others) instruction, context, and response. Map them into the same three-column shape as Alpaca-style tooling (instruction, input, output) by using context as input:

# pip install datasets
from datasets import load_dataset

import rust_data_processing as rdp

hf = load_dataset("databricks/databricks-dolly-15k", split="train[:16]")
schema = [
    {"name": "instruction", "data_type": "utf8"},
    {"name": "input", "data_type": "utf8"},
    {"name": "output", "data_type": "utf8"},
]
rows = [
    [r["instruction"], (r.get("context") or "").strip(), r["response"]]
    for r in hf
]
ds = rdp.DataSet(schema, rows)
print("rows", ds.row_count(), "failed_checks", rdp.validate_dataset(ds, {"checks": [{"kind": "not_null", "column": "output", "severity": "error"}]})["summary"]["failed_checks"])
print(rdp.export_dataset_jsonl(ds, ["instruction", "input", "output"])[:200], "...")

