rust_data_processing.examples
User guide: runnable Python examples for rust_data_processing.
Source: repository docs/python/README.md (included below for the HTML docs site), plus
Phase 2 snippets from docs/python/PHASE2_EXAMPLES.md, and SFT / fine-tuning data prep from
docs/python/SFT_PYTHON_EXAMPLES.md.
Python quick start and examples
The same Markdown is included in the HTML docs (pdoc) as rust_data_processing.examples — see python-wrapper/rust_data_processing/examples.py.
This page collects Python snippets for the rust-data-processing package (PyO3 extension). The repository README leads with a short Python quick start; the canonical API reference is python-wrapper/API.md. Rust snippets live in docs/rust/README.md.
Install (from PyPI after release, or from a checkout with maturin develop — see docs/RELEASE_CHECKLIST.md):
pip install rust-data-processing
Notebooks: see notebooks/README.md in the repository for Jupyter examples.
import rust_data_processing as rdp
Delta Lake / Apache Iceberg (read path)
The Rust crate’s default build does not embed a Delta/Iceberg table engine (see Planning/ADR_P2_E2_LAKE_TABLE_READ.md). In Python, use deltalake or pyiceberg to scan a table, write Parquet, then call rdp.ingest_from_path on that file — or stay in Python/Polars for the heavy scan. Details: docs/LAKE_TABLE_READ.md.
Phase 2 examples (export, privacy, median, validation, …)
Copy-paste snippets live in PHASE2_EXAMPLES.md (same folder as this README).
What this page covers
Use this as a tour of the Python API (the same page is rendered on GitHub Pages). For every function signature and option, see python-wrapper/API.md.
| Topic | Where below |
|---|---|
| File ETL (CSV, JSON, Parquet, Excel) | Quick start, JSON / Parquet / Excel, Ordered paths & directory scans |
| Databases (Python driver vs built-in ConnectorX) | Database sources: two ways, ConnectorX |
| SQL & lazy DataFrame | SQL, DataFrame pipelines, Cookbook |
| Declarative transforms | TransformSpec |
| ML / QA (profile, validate, outliers) | ML-oriented flow, Profiling, Validation, Outliers |
| Phase 2 (JSONL export, privacy, truncation, UTF-8 transforms, median, Delta handoff) | Phase 2 examples, Delta / Iceberg |
| SFT / fine-tuning prep (Alpaca JSONL, HF optional, chat column) | SFT Python examples, SFT_DATA_FORMATS.md |
| Row-wise & parallel processing | Processing pipelines, Execution engine |
| Ingestion observability | Observability |
| CDC boundary types | CDC |
Quick start (library usage)
Ingest a file with an explicit schema (format can be set or inferred from the extension):
import rust_data_processing as rdp
schema = [
{"name": "id", "data_type": "int64"},
{"name": "name", "data_type": "utf8"},
]
ds = rdp.ingest_from_path(
"tests/fixtures/people.csv",
schema,
{"format": "csv"},
)
print("rows", ds.row_count())
Infer schema from the file, then ingest (two passes — same idea as Rust’s ingest_with_inferred_schema):
ds, schema = rdp.ingest_with_inferred_schema("tests/fixtures/people.csv")
print(schema[0]["name"], ds.row_count())
JSON, Parquet, and Excel
Format is usually inferred from the extension; you can set "format" explicitly ("csv", "json", "parquet", "excel"). JSON with nested objects may need dotted column names in the schema (e.g. user.name for tests/fixtures/people.json).
import rust_data_processing as rdp
# JSON (nested columns — matches tests/fixtures/people.json)
schema_json = [
{"name": "id", "data_type": "int64"},
{"name": "user.name", "data_type": "utf8"},
{"name": "score", "data_type": "float64"},
{"name": "active", "data_type": "bool"},
]
ds_json = rdp.ingest_from_path(
"tests/fixtures/people.json",
schema_json,
{"format": "json"},
)
# Parquet — use a real path to your `.parquet` file
schema_flat = [
{"name": "id", "data_type": "int64"},
{"name": "name", "data_type": "utf8"},
{"name": "score", "data_type": "float64"},
{"name": "active", "data_type": "bool"},
]
ds_parquet = rdp.ingest_from_path("path/to/file.parquet", schema_flat, {"format": "parquet"})
# Excel — optional sheet selection in options (see python-wrapper/README.md)
ds_excel = rdp.ingest_from_path("path/to/file.xlsx", schema_flat, {"format": "excel"})
Ordered paths and directory scans (incremental batches)
Use paths_from_directory_scan for a recursive listing with an optional glob on paths relative to the root; results are sorted so ordering is repeatable. Combine with ingest_from_ordered_paths: files are read in list order, rows are concatenated, then the same watermark options as single-file ingest apply once to the combined rows (not per file). The function returns (DataSet, metadata) where metadata includes paths, last_path, and max_watermark_value (when watermark_column / watermark_exclusive_above are set) for checkpointing the next run.
paths = rdp.paths_from_directory_scan("data/incoming", "**/*.csv")
ds, meta = rdp.ingest_from_ordered_paths(
paths,
schema,
{"watermark_column": "ts", "watermark_exclusive_above": 100},
)
print(meta["last_path"], meta["max_watermark_value"])
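The combined-rows watermark semantics described above can be sketched in pure Python. The helper below is a hypothetical illustration of the idea (concatenate in list order, apply the watermark once, report the max for checkpointing) — the real work happens inside `rdp.ingest_from_ordered_paths`.

```python
# Illustrative sketch (pure Python) of the ordered-paths watermark semantics:
# rows from all files are concatenated in list order, then a single watermark
# filter keeps rows strictly above the floor. Helper name and row shape are
# hypothetical.
def combine_with_watermark(batches, ts_index, exclusive_above):
    combined = [row for batch in batches for row in batch]  # list order preserved
    kept = [
        row for row in combined
        if row[ts_index] is not None and row[ts_index] > exclusive_above
    ]
    max_wm = max((row[ts_index] for row in kept), default=None)
    return kept, max_wm

# Two "files" worth of (id, ts) rows
file_a = [[1, 90], [2, 101]]
file_b = [[3, 150], [4, 100]]
rows, max_wm = combine_with_watermark([file_a, file_b], ts_index=1, exclusive_above=100)
assert [r[0] for r in rows] == [2, 3]  # ts 101 and 150 survive; 90 and 100 do not
assert max_wm == 150                   # checkpoint value for the next run
```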
DataFrame-centric pipelines (Polars-backed) (Phase 1)
Use DataFrame.from_dataset for a lazy plan; chain methods and collect() to a DataSet:
import rust_data_processing as rdp
schema = [
{"name": "id", "data_type": "int64"},
{"name": "active", "data_type": "bool"},
{"name": "score", "data_type": "float64"},
]
rows = [
[1, True, 10.0],
[2, True, 20.0],
[3, False, 30.0],
]
ds = rdp.DataSet(schema, rows)
out = (
rdp.DataFrame.from_dataset(ds)
.filter_eq("active", True)
.multiply_f64("score", 2.0)
.collect()
)
assert out.row_count() == 2
SQL queries (Polars-backed) (Phase 1)
Single-table SQL — the DataSet is registered as table df; sql_query_dataset returns a materialized DataSet:
import rust_data_processing as rdp
schema = [
{"name": "id", "data_type": "int64"},
{"name": "active", "data_type": "bool"},
{"name": "score", "data_type": "float64"},
]
rows = [
[1, True, 10.0],
[2, True, 20.0],
[3, False, 30.0],
]
ds = rdp.DataSet(schema, rows)
out = rdp.sql_query_dataset(
ds,
"SELECT id, score FROM df WHERE active = TRUE ORDER BY id DESC LIMIT 10",
)
Multi-table JOINs via SqlContext (execute returns a lazy DataFrame; call collect() for a DataSet):
import rust_data_processing as rdp
people = rdp.DataSet(
[
{"name": "id", "data_type": "int64"},
{"name": "name", "data_type": "utf8"},
],
[[1, "Ada"], [2, "Grace"]],
)
scores = rdp.DataSet(
[
{"name": "id", "data_type": "int64"},
{"name": "score", "data_type": "float64"},
],
[[1, 9.0], [3, 7.0]],
)
ctx = rdp.SqlContext()
ctx.register("people", rdp.DataFrame.from_dataset(people))
ctx.register("scores", rdp.DataFrame.from_dataset(scores))
out = ctx.execute(
"SELECT p.id, p.name, s.score FROM people p JOIN scores s ON p.id = s.id"
).collect()
Database sources: two ways
- Your own Python driver (no `db` feature): use psycopg2, SQLAlchemy, etc., execute SQL, then build a `DataSet(schema, rows)` from the result rows (schema/row shape in python-wrapper/API.md § Conventions; `DataSet` examples appear throughout this page). This works with the default PyPI wheel; you do not need ConnectorX or `maturin … --features db`.
- Built-in SQL → `DataSet` (requires `db`): `ingest_from_db` / `ingest_from_db_infer` use ConnectorX inside the native extension. The module must be built with the `db` Cargo feature (see python-wrapper/README_DEV.md).
Concrete pattern (any Python DB-API driver — no db feature): map query results to DataSet(schema, rows) yourself.
import rust_data_processing as rdp
# After cursor.execute(...): build schema from your column names + logical types,
# then rows = list(cursor.fetchall()) (or batch). Example shape:
schema = [
{"name": "id", "data_type": "int64"},
{"name": "name", "data_type": "utf8"},
]
rows = [[1, "ada"], [2, "grace"]]
ds = rdp.DataSet(schema, rows)
Direct DB ingestion (ConnectorX) (feature-gated)
The native module must be built with the db Cargo feature (see python-wrapper/README_DEV.md). Then:
# Infer column types from the query result
ds = rdp.ingest_from_db_infer(
"postgresql://user:pass@localhost:5432/db?cxprotocol=binary",
"SELECT id, score, active FROM my_table",
)
print("rows", ds.row_count())
# Or provide an explicit schema (same strings as file ingest)
schema = [
{"name": "id", "data_type": "int64"},
{"name": "score", "data_type": "float64"},
{"name": "active", "data_type": "bool"},
]
ds = rdp.ingest_from_db(
"postgresql://user:pass@localhost:5432/db?cxprotocol=binary",
"SELECT id, score, active FROM my_table",
schema,
)
End-user transformation spec (TransformSpec) (Phase 1)
transform_apply accepts a dict (or JSON string) in the same serde shape as Rust’s TransformSpec:
import rust_data_processing as rdp
schema_in = [
{"name": "id", "data_type": "int64"},
{"name": "score", "data_type": "int64"},
{"name": "weather", "data_type": "utf8"},
]
rows = [[1, 10, "drizzle"], [2, None, "rain"]]
ds = rdp.DataSet(schema_in, rows)
spec = {
"output_schema": {
"fields": [
{"name": "id", "data_type": "Int64"},
{"name": "score_f", "data_type": "Float64"},
{"name": "wx", "data_type": "Utf8"},
]
},
"steps": [
{"Rename": {"pairs": [["weather", "wx"]]}},
{"Rename": {"pairs": [["score", "score_f"]]}},
{
"Cast": {
"column": "score_f",
"to": "Float64",
"mode": "lossy",
}
},
{"FillNull": {"column": "score_f", "value": {"Float64": 0.0}}},
{"Select": {"columns": ["id", "score_f", "wx"]}},
],
}
out = rdp.transform_apply(ds, spec)
assert out.column_names() == ["id", "score_f", "wx"]
ML / training-dataset prep (Phase 1)
For tabular ML, teams usually combine data quality and understanding before training or batch inference: profile columns (nulls, ranges, quantiles), validate business rules (not-null, regex, ranges), and optionally flag outliers for review. This library keeps those steps on a single DataSet; see Profiling, Validation, and Outlier detection below. Raw JSON or Markdown report strings are available as profile_dataset_json / profile_dataset_markdown (and the validate_* / detect_outliers_* variants) if you want files or PR comments instead of dicts.
Profiling (Phase 1)
import rust_data_processing as rdp
schema = [{"name": "score", "data_type": "float64"}]
rows = [[1.0], [None], [3.0]]
ds = rdp.DataSet(schema, rows)
rep = rdp.profile_dataset(
ds,
{"head_rows": 2, "quantiles": [0.5]},
)
assert rep["row_count"] == 2
assert rep["columns"][0]["null_count"] == 1
Markdown for humans (e.g. paste into a doc or ticket):
md = rdp.profile_dataset_markdown(ds, {"head_rows": 100, "quantiles": [0.5, 0.95]})
Validation (Phase 1)
import rust_data_processing as rdp
schema = [{"name": "email", "data_type": "utf8"}]
rows = [
["ada@example.com"],
[None],
["not-an-email"],
]
ds = rdp.DataSet(schema, rows)
rep = rdp.validate_dataset(
ds,
{
"checks": [
{"kind": "not_null", "column": "email", "severity": "error"},
{
"kind": "regex_match",
"column": "email",
"pattern": r"^[^@]+@[^@]+\.[^@]+$",
"severity": "warn",
"strict": True,
},
],
},
)
assert rep["summary"]["total_checks"] >= 2
Outlier detection (Phase 1)
import rust_data_processing as rdp
schema = [{"name": "x", "data_type": "float64"}]
rows = [[1.0], [1.0], [1.0], [1000.0]]
ds = rdp.DataSet(schema, rows)
rep = rdp.detect_outliers(
ds,
"x",
{"kind": "iqr", "k": 1.5},
{"sampling": "full", "max_examples": 3},
)
assert rep["outlier_count"] >= 1
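The `{"kind": "iqr", "k": 1.5}` rule above is the classic IQR fence test. A pure-Python sketch of the idea follows; the quartile interpolation here is Python's `statistics.quantiles` default, and the native implementation may interpolate differently, so treat this as the concept rather than the exact numbers.

```python
import statistics

# IQR fence rule: flag values outside [Q1 - k*IQR, Q3 + k*IQR].
def iqr_outliers(values, k=1.5):
    q1, _, q3 = statistics.quantiles(values, n=4)
    iqr = q3 - q1
    lo, hi = q1 - k * iqr, q3 + k * iqr
    return [v for v in values if v < lo or v > hi]

data = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 1000.0]
assert iqr_outliers(data) == [1000.0]  # Q1=3, Q3=9 -> fences at -6 and 18
```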
CDC interface boundary (Phase 1 spike)
The rust_data_processing.cdc submodule exposes plain Python types aligned with the Rust cdc module (no connector ships yet):
from rust_data_processing.cdc import CdcEvent, CdcOp, RowImage, SourceMeta, TableRef
ev = CdcEvent(
meta=SourceMeta(source="db", checkpoint=None),
table=TableRef.with_schema("public", "users"),
op=CdcOp.INSERT,
before=None,
after=RowImage.new([("id", 1), ("name", "Ada")]),
)
assert ev.op == CdcOp.INSERT
Cookbook (Phase 1)
Stable transformation wrappers (Polars-backed)
Rename + cast + fill nulls:
import rust_data_processing as rdp
schema = [
{"name": "id", "data_type": "int64"},
{"name": "score", "data_type": "int64"},
]
rows = [[1, 10], [2, None]]
ds = rdp.DataSet(schema, rows)
out = (
rdp.DataFrame.from_dataset(ds)
.rename([("score", "score_i")])
.cast("score_i", "float64")
.fill_null("score_i", 0.0)
.collect()
)
Group-by aggregates:
schema = [
{"name": "grp", "data_type": "utf8"},
{"name": "score", "data_type": "float64"},
]
rows = [
["A", 1.0],
["A", 2.0],
["B", None],
]
ds = rdp.DataSet(schema, rows)
out = (
rdp.DataFrame.from_dataset(ds)
.group_by(
["grp"],
[
{"type": "sum", "column": "score", "alias": "sum_score"},
{"type": "count_rows", "alias": "cnt"},
],
)
.collect()
)
Per-key mean, sample std dev, and count-distinct:
schema = [
{"name": "grp", "data_type": "utf8"},
{"name": "score", "data_type": "float64"},
{"name": "label", "data_type": "utf8"},
]
rows = [
["A", 10.0, "x"],
["A", 20.0, "y"],
["B", None, "z"],
]
ds = rdp.DataSet(schema, rows)
_ = (
rdp.DataFrame.from_dataset(ds)
.group_by(
["grp"],
[
{"type": "mean", "column": "score", "alias": "mu_score"},
{
"type": "std_dev",
"column": "score",
"alias": "sd_score",
"kind": "sample",
},
{
"type": "count_distinct_non_null",
"column": "label",
"alias": "n_labels",
},
],
)
.collect()
)
Semantics (nulls, all-null groups, SUM vs MEAN, casting): see docs/REDUCE_AGG_SEMANTICS.md. More detail: python-wrapper/API.md § Processing pipelines.
Join two DataFrames:
people_schema = [
{"name": "id", "data_type": "int64"},
{"name": "name", "data_type": "utf8"},
]
people_rows = [[1, "Ada"], [2, "Grace"]]
people = rdp.DataSet(people_schema, people_rows)
scores_schema = [
{"name": "id", "data_type": "int64"},
{"name": "score", "data_type": "float64"},
]
scores_rows = [[1, 9.0], [3, 7.0]]
scores = rdp.DataSet(scores_schema, scores_rows)
out = (
rdp.DataFrame.from_dataset(people)
.join(rdp.DataFrame.from_dataset(scores), ["id"], ["id"], "inner")
.collect()
)
Processing pipelines (Epic 1 / Story 1.2)
In-memory helpers mirror rust_data_processing::processing:
- `processing_filter(ds, predicate)` — `predicate` receives one row as a `list`
- `processing_map(ds, mapper)`
- `processing_reduce(ds, column, op)` — op names: `count`, `sum`, `min`, `max`, `mean`, `variance_population`, `variance_sample`, `stddev_population`, `stddev_sample`, `sum_squares`, `l2_norm`, `count_distinct_non_null`, …
- `processing_feature_wise_mean_std`, `processing_arg_max_row`, `processing_arg_min_row`, `processing_top_k_by_frequency`
Polars-backed equivalents: DataFrame.reduce, DataFrame.feature_wise_mean_std. Semantics: docs/REDUCE_AGG_SEMANTICS.md.
Example:
import rust_data_processing as rdp
schema = [
{"name": "id", "data_type": "int64"},
{"name": "active", "data_type": "bool"},
{"name": "score", "data_type": "float64"},
]
rows = [
[1, True, 10.0],
[2, False, 20.0],
[3, True, None],
]
ds = rdp.DataSet(schema, rows)
filtered = rdp.processing_filter(ds, lambda row: row[1] is True)
mapped = rdp.processing_map(
filtered,
lambda row: [
row[0],
row[1],
row[2] * 1.1 if row[2] is not None else None,
],
)
s = rdp.processing_reduce(mapped, "score", "sum")
assert s == 11.0
Mean, variance, norms, distinct counts
schema = [
{"name": "x", "data_type": "float64"},
{"name": "cat", "data_type": "utf8"},
]
rows = [[2.0, "a"], [4.0, "b"]]
ds = rdp.DataSet(schema, rows)
mean = rdp.processing_reduce(ds, "x", "mean")
std_s = rdp.processing_reduce(ds, "x", "stddev_sample")
l2 = rdp.processing_reduce(ds, "x", "l2_norm")
d = rdp.processing_reduce(ds, "cat", "count_distinct_non_null")
assert mean is not None and d == 2
Polars-backed DataFrame.reduce (same op names)
mem = rdp.processing_reduce(ds, "x", "mean")
pol = rdp.DataFrame.from_dataset(ds).reduce("x", "mean")
assert pol is not None and mem == pol
Feature-wise mean and std (memory vs Polars)
schema = [
{"name": "a", "data_type": "int64"},
{"name": "b", "data_type": "float64"},
]
rows = [[1, 10.0], [3, 20.0]]
ds = rdp.DataSet(schema, rows)
cols = ["a", "b"]
mem = rdp.processing_feature_wise_mean_std(ds, cols, "sample")
pol = rdp.DataFrame.from_dataset(ds).feature_wise_mean_std(cols, "sample")
assert mem[0]["column"] == pol[0]["column"]
Arg max / arg min row and top‑k label frequencies
schema = [
{"name": "id", "data_type": "int64"},
{"name": "region", "data_type": "utf8"},
]
rows = [[1, "west"], [2, "east"], [3, "west"]]
ds = rdp.DataSet(schema, rows)
assert rdp.processing_arg_max_row(ds, "id") is not None
assert len(rdp.processing_top_k_by_frequency(ds, "region", 2)) >= 1
Execution engine (parallel pipelines) (Story 1.3)
import rust_data_processing as rdp
schema = [
{"name": "id", "data_type": "int64"},
{"name": "active", "data_type": "bool"},
{"name": "score", "data_type": "float64"},
]
rows = [
[1, True, 10.0],
[2, False, 20.0],
[3, True, None],
]
ds = rdp.DataSet(schema, rows)
engine = rdp.ExecutionEngine(
{"num_threads": 4, "chunk_size": 1024, "max_in_flight_chunks": 4}
)
active_idx = 1
filtered = engine.filter_parallel(ds, lambda row: row[active_idx] is True)
mapped = engine.map_parallel(filtered, lambda row: list(row))
s = engine.reduce(mapped, "score", "sum")
metrics = engine.metrics_snapshot()
print("rows_processed", metrics["rows_processed"], "elapsed", metrics.get("elapsed_seconds"))
More examples: counts, missing columns, all-null numeric
schema = [{"name": "score", "data_type": "float64"}]
rows = [[1.0], [None]]
ds = rdp.DataSet(schema, rows)
assert rdp.processing_reduce(ds, "score", "count") == 2
assert rdp.processing_reduce(ds, "score", "sum") == 1.0
assert rdp.processing_reduce(ds, "missing", "sum") is None
all_null = rdp.DataSet(
[{"name": "x", "data_type": "float64"}],
[[None], [None]],
)
assert rdp.processing_reduce(all_null, "x", "mean") is None
Benchmarks (Story 1.2.5)
Criterion benchmarks are run on the Rust crate (cargo bench, scripts/run_benchmarks.ps1). The Python package calls the same native code paths; for throughput numbers, use the Rust workflow or see the repository README benchmark snapshot.
Observability (failure/alert hooks)
Pass observer / alert_at_or_above in the ingestion options dict (see python-wrapper/API.md § Ingestion observability):
import rust_data_processing as rdp
schema = [{"name": "id", "data_type": "int64"}]
def on_alert(ctx, severity, message):
print(severity, message)
try:
rdp.ingest_from_path(
"does_not_exist.csv",
schema,
{
"format": "csv",
"alert_at_or_above": "critical",
"observer": {"on_alert": on_alert},
},
)
except ValueError:
pass
See also
- python-wrapper/README.md — install and dev workflow
- python-wrapper/API.md — full Python API
- docs/rust/README.md — Rust mirror of this page
- docs/REDUCE_AGG_SEMANTICS.md — aggregate semantics
Phase 2 — Python examples
Copy-paste snippets for export / JSONL, privacy summaries, reports truncation, UTF-8 transforms, string-length validation, median reductions, and Delta/Iceberg handoff. Requires pip install rust-data-processing (or maturin develop from python-wrapper/).
1. JSON Lines export and train/test indices
import rust_data_processing as rdp
schema = [{"name": "id", "data_type": "int64"}, {"name": "label", "data_type": "utf8"}]
rows = [[1, "ok"], [2, "ok"], [3, "holdout"]]
ds = rdp.DataSet(schema, rows)
text = rdp.export_dataset_jsonl(ds, ["id", "label"])
print(text) # one JSON object per line
records = rdp.export_jsonl_records(ds, ["id", "label"]) # list[dict]
train_idx, test_idx = rdp.export_train_test_row_indices(ds.row_count(), test_fraction=0.34)
train_rows = [rows[i] for i in train_idx]
test_rows = [rows[i] for i in test_idx]
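The split above is deterministic because the test rows come from the tail, with no RNG involved. A pure-Python sketch of that idea follows; the rounding rule here (`round(n * fraction)`) is an assumption for illustration — defer to `export_train_test_row_indices` for the exact count.

```python
# Deterministic tail holdout: the last rows become the test set, so repeated
# runs always produce the same split. The rounding rule is an assumption.
def tail_split(n_rows, test_fraction):
    n_test = round(n_rows * test_fraction)
    train = list(range(n_rows - n_test))
    test = list(range(n_rows - n_test, n_rows))
    return train, test

train_idx, test_idx = tail_split(3, 0.34)
assert train_idx == [0, 1]
assert test_idx == [2]
```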
2. UTF-8 length filter (export helper)
import rust_data_processing as rdp
ds = rdp.DataSet(
[{"name": "msg", "data_type": "utf8"}],
[["hi"], ["this is too long for a tweet style cap"]],
)
short_only = rdp.export_filter_rows_max_utf8_chars(ds, "msg", max_chars=10)
3. Privacy diff reports (after transform_apply)
import rust_data_processing as rdp
schema = [{"name": "email", "data_type": "utf8"}]
before = rdp.DataSet(schema, [["user@company.com"]])
spec = {
"output_schema": {"fields": [{"name": "email", "data_type": "Utf8"}]},
"steps": [
{
"Utf8RedactMiddle": {
"column": "email",
"keep_left": 2,
"keep_right": 0,
"redaction": "***",
}
}
],
}
after = rdp.transform_apply(before, spec)
rows = rdp.privacy_summarize_utf8_changes(before, after, ["email"]) # list of dicts
md = rdp.privacy_summarize_utf8_changes(before, after, ["email"], as_markdown=True)
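One plausible reading of the `keep_left` / `keep_right` / `redaction` parameters above can be sketched in pure Python. This illustrates the parameter shape only — it is not the library's exact edge-case handling (e.g. for strings shorter than `keep_left + keep_right`).

```python
# Middle redaction: keep a prefix and suffix, replace the middle with a
# fixed marker. Hypothetical helper mirroring the Utf8RedactMiddle step.
def redact_middle(s, keep_left, keep_right, redaction):
    left = s[:keep_left]
    right = s[len(s) - keep_right:] if keep_right else ""
    return left + redaction + right

assert redact_middle("user@company.com", 2, 0, "***") == "us***"
assert redact_middle("user@company.com", 2, 4, "***") == "us***.com"
```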
4. Truncate large JSON / text for logs or LLM context
import rust_data_processing as rdp
blob = '{"profile": ' + ("x" * 5000) + "}"
snippet = rdp.reports_truncate_utf8_bytes(blob, max_bytes=256)
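A byte budget on UTF-8 text must not split a multi-byte character. One way to sketch that in pure Python: slice the encoded bytes, then decode with `errors="ignore"` so any trailing partial sequence is dropped. Whether the library appends an ellipsis or other marker is up to it — this only illustrates the boundary-safe truncation idea.

```python
# Truncate to at most max_bytes of UTF-8 without splitting a character.
def truncate_utf8_bytes(text, max_bytes):
    return text.encode("utf-8")[:max_bytes].decode("utf-8", errors="ignore")

s = "héllo"                               # 'é' is 2 bytes in UTF-8
assert truncate_utf8_bytes(s, 2) == "h"   # cutting mid-'é' drops it cleanly
assert truncate_utf8_bytes(s, 3) == "hé"
assert len(truncate_utf8_bytes(s, 3).encode("utf-8")) == 3
```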
5. UTF-8 masking transforms (TransformSpec)
Supported step variants: Utf8Truncate, Utf8Sha256Hex, Utf8RedactMiddle (these are the Rust enum variant names as they appear in the JSON spec). Use transform_apply(dataset, spec_dict) so you do not have to hand-serialize JSON.
import rust_data_processing as rdp
ds = rdp.DataSet([{"name": "s", "data_type": "utf8"}], [["secret-value"]])
spec = {
"output_schema": {"fields": [{"name": "s", "data_type": "Utf8"}]},
"steps": [{"Utf8Sha256Hex": {"column": "s"}}],
}
out = rdp.transform_apply(ds, spec)
6. Validation: UTF-8 length (Unicode scalars)
import rust_data_processing as rdp
ds = rdp.DataSet([{"name": "code", "data_type": "utf8"}], [["AB"], ["ABCDE"]])
rep = rdp.validate_dataset(
ds,
{
"checks": [
{
"kind": "utf8_len_chars_between",
"column": "code",
"min_chars": 3,
"max_chars": 10,
"severity": "warn",
}
]
},
)
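In Python, `len()` on a `str` already counts Unicode scalar values, which is the unit the `utf8_len_chars_between` check is named after (not bytes). A hypothetical standalone check with the same `min` / `max` shape:

```python
# Character-count range check; len() counts Unicode scalars, not bytes.
def len_chars_between(s, min_chars, max_chars):
    return min_chars <= len(s) <= max_chars

assert len_chars_between("ABCDE", 3, 10)
assert not len_chars_between("AB", 3, 10)
assert len("née") == 3 and len("née".encode("utf-8")) == 4  # chars vs bytes differ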
7. Median: processing_reduce and DataFrame
import rust_data_processing as rdp
ds = rdp.DataSet([{"name": "x", "data_type": "int64"}], [[10], [20], [30], [40]])
assert rdp.processing_reduce(ds, "x", "median") == 25.0
ds2 = rdp.DataSet(
[{"name": "g", "data_type": "utf8"}, {"name": "v", "data_type": "int64"}],
[["a", 1], ["a", 100]],
)
lf = rdp.DataFrame.from_dataset(ds2)
m = lf.group_by(["g"], [{"type": "median", "column": "v", "alias": "m"}]).collect()
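The median semantics asserted above (even count → mean of the two middle values) match Python's `statistics.median`, which makes a handy pure-Python cross-check when debugging:

```python
import statistics

# Even-length input: median is the mean of the two middle values.
assert statistics.median([10, 20, 30, 40]) == 25.0
assert statistics.median([1, 100]) == 50.5
```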
8. Delta Lake / Iceberg (handoff — not in-process in default wheel)
Use Python deltalake or PyIceberg to read a table, write Parquet (or build rows), then:
import rust_data_processing as rdp
# After you materialize a Parquet path from deltalake / Spark:
schema = rdp.infer_schema_from_path("exported_slice.parquet")
ds = rdp.ingest_from_path("exported_slice.parquet", schema)
Details: LAKE_TABLE_READ.md, ADR_P2_E2_LAKE_TABLE_READ.md.
9. End-to-end: ingest → validate → JSONL (tabular QA)
import rust_data_processing as rdp
# Point at a real CSV path on disk.
path = "tests/fixtures/people.csv" # from repo root; use absolute path in notebooks
schema = rdp.infer_schema_from_path(path)
ds = rdp.ingest_from_path(path, schema)
_ = rdp.profile_dataset(ds, {"sampling": "full"})
rep = rdp.validate_dataset(ds, {"checks": [{"kind": "not_null", "column": schema[0]["name"], "severity": "warn"}]})
cols = [f["name"] for f in schema]
jl = rdp.export_dataset_jsonl(ds, cols)
10. Incremental ETL helpers (watermark, ordered paths, Hive-style discovery)
These shipped in Phase 2 / P2-E1 and are exposed on the same Python module.
import rust_data_processing as rdp
# Watermark: after ingest, keep rows strictly above a floor (set both keys together).
opts = {"watermark_column": "ts", "watermark_exclusive_above": 100}
# ds = rdp.ingest_from_path("events.csv", schema, opts)
# Append-only batch: concatenate files then apply watermark once.
# ds, meta = rdp.ingest_from_ordered_paths(["/data/a.csv", "/data/b.csv"], schema, opts)
# Hive-style layout: discover partitioned files under a root.
# files = rdp.discover_hive_partitioned_files("s3://bucket/prefix", file_pattern="*.parquet")
# paths = rdp.paths_from_directory_scan("/data/events", relative_pattern="*.csv")
See API.md § Ingestion and Incremental / watermark.
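The Hive-style convention above encodes partition values as `key=value` path segments. The parser below is a hypothetical illustration of that convention, not the `discover_hive_partitioned_files` implementation:

```python
# Extract partition key/value pairs from key=value path segments.
def hive_partitions(path):
    parts = {}
    for segment in path.split("/"):
        if "=" in segment:
            key, _, value = segment.partition("=")
            parts[key] = value
    return parts

p = "data/events/year=2024/month=05/part-0001.parquet"
assert hive_partitions(p) == {"year": "2024", "month": "05"}
```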
See also
- SFT_PYTHON_EXAMPLES.md — Alpaca-style NDJSON + optional Hugging Face `datasets` + JSONL export for trainers.
- API.md — full function table.
- README.md — longer tour including Phase 1 APIs.
- notebooks/README.md — Jupyter entrypoints.
SFT / fine-tuning data — runnable Python examples (rust_data_processing)
These examples use rust_data_processing (rdp) only for the tabular steps: ingest, profile, validate, optional TransformSpec, JSONL export, and deterministic train/test row indices. They do not run training or ship a tokenizer.
Prerequisite: pip install rust-data-processing (or maturin develop from python-wrapper/).
Trainer warning: chat templates and tokenizers belong in TRL / HF / Llama-Factory — see SFT_DATA_FORMATS.md.
1. Alpaca-style NDJSON (committed sample in this repo)
The file examples/sft/sample_alpaca.ndjson has four rows with instruction, input, and output (UTF-8). Run from the repository root, or build an absolute path to that file:
from pathlib import Path
import rust_data_processing as rdp
path = Path("examples/sft/sample_alpaca.ndjson").resolve()
schema = [
{"name": "instruction", "data_type": "utf8"},
{"name": "input", "data_type": "utf8"},
{"name": "output", "data_type": "utf8"},
]
# `.ndjson` is detected as newline-delimited JSON (no need to force `format`).
ds = rdp.ingest_from_path(str(path), schema)
assert ds.row_count() == 4
prof = rdp.profile_dataset(ds, {"sampling": "full"})
print("profile row_count", prof["row_count"])
# Example QA: flag rows with very short outputs (policy is yours)
rep = rdp.validate_dataset(
ds,
{
"checks": [
{
"kind": "utf8_len_chars_between",
"column": "output",
"min_chars": 2,
"max_chars": 500,
"severity": "warn",
}
]
},
)
print("failed_checks", rep["summary"]["failed_checks"])
# Stable JSONL for a trainer that expects instruction/input/output columns
cols = ["instruction", "input", "output"]
text = rdp.export_dataset_jsonl(ds, cols)
print(text.splitlines()[0][:80], "...")
# Deterministic tail holdout (same semantics as Rust `train_test_row_indices`)
train_idx, test_idx = rdp.export_train_test_row_indices(ds.row_count(), test_fraction=0.25)
rows = ds.to_rows()
train_ds = rdp.DataSet(schema, [rows[i] for i in train_idx])
test_ds = rdp.DataSet(schema, [rows[i] for i in test_idx])
2. “Messages” / chat column as a single JSON string
Some pipelines store one UTF-8 column whose cell is JSON: [{"role":"user","content":"..."}, ...]. Keep it as Utf8; validate length; export as one field per JSONL line.
import json
import rust_data_processing as rdp
messages = [
{"role": "user", "content": "Summarize: Rust is a systems language."},
{"role": "assistant", "content": "Rust is a systems programming language focused on safety."},
]
schema = [{"name": "messages", "data_type": "utf8"}]
ds = rdp.DataSet(schema, [[json.dumps(messages)]])
jl = rdp.export_dataset_jsonl(ds, ["messages"])
print(jl.strip())
Your trainer may expect a different key (conversations, etc.) — rename in a TransformSpec or when building rows.
3. Optional: slice a well-known public dataset with Hugging Face datasets
This requires pip install datasets and network access the first time HF downloads and caches the split. It is not a dev dependency of this repo.
Alpaca (tatsu-lab; check the current license / terms on the Hugging Face dataset card before production use):
# pip install datasets
from datasets import load_dataset
import rust_data_processing as rdp
hf = load_dataset("tatsu-lab/alpaca", split="train[:8]") # tiny slice for a demo
schema = [
{"name": "instruction", "data_type": "utf8"},
{"name": "input", "data_type": "utf8"},
{"name": "output", "data_type": "utf8"},
]
rows = [
[row["instruction"], row.get("input") or "", row["output"]]
for row in hf
]
ds = rdp.DataSet(schema, rows)
rep = rdp.validate_dataset(
ds,
{"checks": [{"kind": "not_null", "column": "instruction", "severity": "error"}]},
)
print(rdp.export_dataset_jsonl(ds, ["instruction", "output"]))
Other common entrypoints (same pattern: load_dataset → list of rows → DataSet):
- Databricks Dolly — see §4 below (`instruction` / `context` / `response` on the dataset card).
- OpenAssistant — larger; use a tiny `split="train[:32]"` for experiments; map whatever columns you need into a fixed `DataSet` schema before export.
Always read the dataset card for license, attribution, and allowed use.
4. Databricks Dolly 15k (tiny slice → same QA + JSONL pattern)
The databricks/databricks-dolly-15k split exposes (among others) instruction, context, and response. Map them into the same three-column shape as Alpaca-style tooling (instruction, input, output) by using context as input:
# pip install datasets
from datasets import load_dataset
import rust_data_processing as rdp
hf = load_dataset("databricks/databricks-dolly-15k", split="train[:16]")
schema = [
{"name": "instruction", "data_type": "utf8"},
{"name": "input", "data_type": "utf8"},
{"name": "output", "data_type": "utf8"},
]
rows = [
[r["instruction"], (r.get("context") or "").strip(), r["response"]]
for r in hf
]
ds = rdp.DataSet(schema, rows)
print("rows", ds.row_count(), "failed_checks", rdp.validate_dataset(ds, {"checks": [{"kind": "not_null", "column": "output", "severity": "error"}]})["summary"]["failed_checks"])
print(rdp.export_dataset_jsonl(ds, ["instruction", "input", "output"])[:200], "...")
See also
- SFT_DATA_FORMATS.md — column conventions and warnings.
- PHASE2_EXAMPLES.md — JSONL export, privacy transforms, Delta handoff.
- API.md — full Python API tables.