rust_data_processing

Python bindings for the rust-data-processing Rust crate.

The native extension is built with PyO3 and maturin. Prefer the APIs exported here rather than importing rust_data_processing._rust_data_processing directly.
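
For example:

    # Preferred: names re-exported at the package root.
    from rust_data_processing import DataSet, profile_dataset

    # Discouraged: reaching into the private extension module.
    # from rust_data_processing._rust_data_processing import profile_dataset_json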

  1"""Python bindings for the `rust-data-processing` Rust crate.
  2
  3The native extension is built with PyO3 and maturin. Prefer the APIs exported here rather than
  4importing ``rust_data_processing._rust_data_processing`` directly.
  5"""
  6
  7from __future__ import annotations
  8
  9import json
 10from importlib.metadata import PackageNotFoundError, version
 11from typing import Any, Mapping
 12
 13from . import cdc
 14
 15from ._rust_data_processing import (
 16    DataFrame,
 17    DataSet,
 18    ExecutionEngine,
 19    SqlContext,
 20    detect_outliers_json,
 21    detect_outliers_markdown,
 22    discover_hive_partitioned_files,
 23    export_dataset_jsonl,
 24    export_filter_rows_max_utf8_chars,
 25    export_train_test_row_indices,
 26    extension_version,
 27    ingest_from_db,
 28    ingest_from_db_infer,
 29    ingest_from_ordered_paths,
 30    ingest_from_path,
 31    ingest_from_path_infer,
 32    infer_schema_from_path,
 33    parse_partition_segment,
 34    paths_from_directory_scan,
 35    paths_from_explicit_list,
 36    paths_from_glob,
 37    privacy_summarize_utf8_changes_json,
 38    privacy_summarize_utf8_changes_markdown,
 39    processing_arg_max_row,
 40    processing_arg_min_row,
 41    processing_feature_wise_mean_std,
 42    processing_filter,
 43    processing_map,
 44    processing_reduce,
 45    processing_top_k_by_frequency,
 46    profile_dataset_json,
 47    profile_dataset_markdown,
 48    reports_truncate_utf8_bytes,
 49    sql_query_dataset,
 50    transform_apply_json,
 51    validate_dataset_json,
 52    validate_dataset_markdown,
 53)
 54
 55try:
 56    __version__ = version("rust-data-processing")
 57except PackageNotFoundError:
 58    __version__ = extension_version()
 59
 60
 61def ingest_with_inferred_schema(path: str, options: dict[str, Any] | None = None):
 62    """Infer schema once, then ingest (two passes over the file; same as the Rust helper)."""
 63    schema = infer_schema_from_path(path, options)
 64    return ingest_from_path(path, schema, options), schema
 65
 66
 67def transform_apply(dataset: DataSet, spec: Mapping[str, Any] | str) -> DataSet:
 68    """Apply a :class:`TransformSpec` given as JSON string or dict (serde shape)."""
 69    if isinstance(spec, str):
 70        payload = spec
 71    else:
 72        payload = json.dumps(spec)
 73    return transform_apply_json(dataset, payload)
 74
 75
 76def profile_dataset(dataset: DataSet, options: dict[str, Any] | None = None) -> dict[str, Any]:
 77    """Return profiling report as a dict (parsed JSON)."""
 78    return json.loads(profile_dataset_json(dataset, options))
 79
 80
 81def validate_dataset(dataset: DataSet, spec: Mapping[str, Any]) -> dict[str, Any]:
 82    """Run validation checks; return report dict (parsed JSON)."""
 83    return json.loads(validate_dataset_json(dataset, spec))
 84
 85
 86def detect_outliers(
 87    dataset: DataSet,
 88    column: str,
 89    method: Mapping[str, Any],
 90    options: dict[str, Any] | None = None,
 91) -> dict[str, Any]:
 92    """Outlier report as dict (parsed JSON)."""
 93    return json.loads(detect_outliers_json(dataset, column, method, options))
 94
 95
 96def export_jsonl_records(dataset: DataSet, columns: list[str]) -> list[dict[str, Any]]:
 97    """Parse :func:`export_dataset_jsonl` output into a list of row dicts."""
 98    text = export_dataset_jsonl(dataset, columns)
 99    return [json.loads(line) for line in text.splitlines() if line.strip()]
100
101
102def privacy_summarize_utf8_changes(
103    before: DataSet,
104    after: DataSet,
105    columns: list[str],
106    *,
107    as_markdown: bool = False,
108) -> list[dict[str, Any]] | str:
109    """UTF-8 diff summary per column: parsed JSON list (default) or Markdown string."""
110    if as_markdown:
111        return privacy_summarize_utf8_changes_markdown(before, after, columns)
112    return json.loads(privacy_summarize_utf8_changes_json(before, after, columns))
113
114
115__all__ = [
116    "DataFrame",
117    "DataSet",
118    "ExecutionEngine",
119    "SqlContext",
120    "__version__",
121    "cdc",
122    "detect_outliers",
123    "detect_outliers_json",
124    "detect_outliers_markdown",
125    "discover_hive_partitioned_files",
126    "export_dataset_jsonl",
127    "export_filter_rows_max_utf8_chars",
128    "export_jsonl_records",
129    "export_train_test_row_indices",
130    "extension_version",
131    "ingest_from_db",
132    "ingest_from_db_infer",
133    "ingest_from_ordered_paths",
134    "ingest_from_path",
135    "ingest_from_path_infer",
136    "ingest_with_inferred_schema",
137    "infer_schema_from_path",
138    "parse_partition_segment",
139    "paths_from_directory_scan",
140    "paths_from_explicit_list",
141    "paths_from_glob",
142    "privacy_summarize_utf8_changes",
143    "privacy_summarize_utf8_changes_json",
144    "privacy_summarize_utf8_changes_markdown",
145    "processing_arg_max_row",
146    "processing_arg_min_row",
147    "processing_feature_wise_mean_std",
148    "processing_filter",
149    "processing_map",
150    "processing_reduce",
151    "processing_top_k_by_frequency",
152    "profile_dataset",
153    "profile_dataset_json",
154    "profile_dataset_markdown",
155    "reports_truncate_utf8_bytes",
156    "sql_query_dataset",
157    "transform_apply",
158    "transform_apply_json",
159    "validate_dataset",
160    "validate_dataset_json",
161    "validate_dataset_markdown",
162]
class DataFrame:

Polars-backed lazy pipeline; collect to DataSet when ready (usage sketch after the method list).

def from_dataset(ds):
def filter_eq(self, /, column, value):
def filter_not_null(self, /, column):
def filter_mod_eq_int64(self, /, column, modulus, equals):
def select(self, /, columns):
def rename(self, /, pairs):
def drop(self, /, columns):
def cast(self, /, column, to):
def cast_with_mode(self, /, column, to, mode):
def fill_null(self, /, column, value):
def with_literal(self, /, name, value):
def multiply_f64(self, /, column, factor):
def add_f64(self, /, column, delta):
def with_mul_f64(self, /, name, source, factor):
def with_add_f64(self, /, name, source, delta):
def group_by(self, /, keys, aggs):
def join(self, /, other, left_on, right_on, how):
def collect(self, /):
def collect_with_schema(self, /, schema):
def reduce(self, /, column, op):
def sum(self, /, column):
def feature_wise_mean_std(self, /, columns, std_kind=None):
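
A minimal pipeline sketch, assuming each builder method returns a new DataFrame (typical for lazy pipelines, not confirmed by this page) and a dataset ds with a Float64 "price" column:

    df = DataFrame.from_dataset(ds)
    df = df.filter_not_null("price")                  # drop rows with null price
    df = df.with_mul_f64("price_eur", "price", 0.92)  # hypothetical FX factor
    df = df.select(["category", "price_eur"])
    result = df.collect()                             # materialize the lazy pipeline into a DataSet
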
class DataSet:

In-memory tabular dataset (mirrors rust_data_processing::types::DataSet).

def row_count(self, /):
def column_names(self, /):
def schema(self, /):
def to_rows(self, /):
class ExecutionEngine:

Configurable Rayon-backed engine: parallel filter/map (Python row callbacks acquire the GIL per row), sequential reduce, and an optional on_execution_event hook; see the sketch after the method list.

def filter_parallel(self, /, ds, predicate):
def map_parallel(self, /, ds, mapper):
def reduce(self, /, ds, column, op):
def metrics_snapshot(self, /):
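
A sketch, assuming default construction, that row callbacks receive a dict keyed by column name, and that "sum" is a valid reduce op name (all assumptions, not confirmed by this page):

    engine = ExecutionEngine()
    kept = engine.filter_parallel(ds, lambda row: row["amount"] is not None)
    scaled = engine.map_parallel(kept, lambda row: {**row, "amount": row["amount"] * 2.0})
    total = engine.reduce(scaled, "amount", "sum")
    print(engine.metrics_snapshot())
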
class SqlContext:

Multi-table SQL context (register several pipeline frames, then execute). An example follows the method list.

def register(self, /, name, df):
def execute(self, /, sql):
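
A sketch, assuming default construction and two already-collected datasets orders_ds and users_ds:

    ctx = SqlContext()
    ctx.register("orders", DataFrame.from_dataset(orders_ds))
    ctx.register("users", DataFrame.from_dataset(users_ds))
    result = ctx.execute(
        "SELECT u.name, SUM(o.total) AS spend "
        "FROM orders o JOIN users u ON o.user_id = u.id "
        "GROUP BY u.name"
    )
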
__version__ = '0.2.2'
def detect_outliers(dataset: DataSet, column: str, method: Mapping[str, Any], options: dict[str, Any] | None = None) -> dict[str, Any]:

Outlier report as dict (parsed JSON).
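
For example, given a DataSet ds (the serde shape of method, i.e. the "z_score" variant and "threshold" field, is an assumption; check the Rust outliers rustdoc for the real names):

    report = detect_outliers(ds, "latency_ms", {"z_score": {"threshold": 3.0}})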

def detect_outliers_json(ds, column, method, options=None):
def detect_outliers_markdown(ds, column, method, options=None):
def discover_hive_partitioned_files(root, file_pattern=None):

Discover files under a Hive-style key=value directory tree (see Rust ingestion::partition rustdoc).

Returns a list of dicts: {"path": str, "segments": [{"key": str, "value": str}, ...]}.
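
For example, over a hypothetical layout /data/events/year=2024/month=05/part-0.parquet:

    for entry in discover_hive_partitioned_files("/data/events", "*.parquet"):
        keys = {seg["key"]: seg["value"] for seg in entry["segments"]}
        print(entry["path"], keys)  # e.g. {"year": "2024", "month": "05"}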

def export_dataset_jsonl(ds, columns):
def export_filter_rows_max_utf8_chars(ds, column, max_chars):

Drop rows where column (Utf8) exceeds max_chars Unicode scalars; nulls kept.
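
For example (hypothetical column and limit):

    trimmed = export_filter_rows_max_utf8_chars(ds, "comment", 280)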

def export_jsonl_records(dataset: DataSet, columns: list[str]) -> list[dict[str, Any]]:

Parse export_dataset_jsonl() output into a list of row dicts.
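
For example (output shape illustrative):

    rows = export_jsonl_records(ds, ["id", "name"])
    # e.g. [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]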

def export_train_test_row_indices(row_count, test_fraction):

Deterministic train/test row index split: (train_indices, test_indices) as two lists of int.
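
For example, a deterministic 80/20 split:

    train_idx, test_idx = export_train_test_row_indices(ds.row_count(), 0.2)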

def extension_version():
def ingest_from_db(conn, query, schema, options=None):
def ingest_from_db_infer(conn, query, options=None):
def ingest_from_ordered_paths(paths, schema, options=None):

Ingest an ordered list of files, concatenate rows, apply watermark once; returns (dataset, metadata_dict).
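
A sketch combining the path helpers below (assumes at least one matching file; options left as None):

    paths = paths_from_glob("/data/events/*.csv")
    schema = infer_schema_from_path(paths[0])
    dataset, metadata = ingest_from_ordered_paths(paths, schema)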

def ingest_from_path(path, schema, options=None):
def ingest_from_path_infer(path, options=None):
def ingest_with_inferred_schema(path: str, options: dict[str, Any] | None = None):

Infer schema once, then ingest (two passes over the file; same as the Rust helper).
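
For example (hypothetical path; note the two passes over the file):

    result, schema = ingest_with_inferred_schema("./events.csv")
    # `result` is whatever ingest_from_path returns; if that mirrors the
    # (dataset, metadata) shape of ingest_from_ordered_paths, unpack accordingly.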

def infer_schema_from_path(path, options=None):
def parse_partition_segment(component):

Parse a single path component as key=value, or return None if invalid.

def paths_from_directory_scan(root, relative_pattern=None):

List files under root (recursive), optional glob on path relative to root; sorted for stable ordering.

def paths_from_explicit_list(paths):

Validate paths exist as files; return them in order with duplicates removed (first wins).

def paths_from_glob(pattern):

Expand a filesystem glob to existing file paths (sorted).
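
The three path-source strategies side by side (hypothetical paths):

    a = paths_from_glob("/data/*.csv")                            # sorted glob expansion
    b = paths_from_directory_scan("/data", "**/*.csv")            # recursive scan, sorted
    c = paths_from_explicit_list(["/data/a.csv", "/data/b.csv"])  # validated, deduplicated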

def privacy_summarize_utf8_changes(before: DataSet, after: DataSet, columns: list[str], *, as_markdown: bool = False) -> list[dict[str, Any]] | str:

UTF-8 diff summary per column: parsed JSON list (default) or Markdown string.
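
For example, given two DataSets before_ds and after_ds (say, around a redaction transform):

    summary = privacy_summarize_utf8_changes(before_ds, after_ds, ["email", "phone"])
    table = privacy_summarize_utf8_changes(before_ds, after_ds, ["email", "phone"], as_markdown=True)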

def privacy_summarize_utf8_changes_json(before, after, columns):
def privacy_summarize_utf8_changes_markdown(before, after, columns):
def processing_arg_max_row(ds, column):
def processing_arg_min_row(ds, column):
def processing_feature_wise_mean_std(ds, columns, std_kind=None):
def processing_filter(ds, predicate):
def processing_map(ds, mapper):
def processing_reduce(ds, column, op):
def processing_top_k_by_frequency(ds, column, k):
def profile_dataset(dataset: DataSet, options: dict[str, Any] | None = None) -> dict[str, Any]:

Return profiling report as a dict (parsed JSON).
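
For example:

    report = profile_dataset(ds)       # parsed JSON dict
    md = profile_dataset_markdown(ds)  # Markdown rendering of the report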

def profile_dataset_json(ds, options=None):
def profile_dataset_markdown(ds, options=None):
def reports_truncate_utf8_bytes(text, max_bytes):
def sql_query_dataset(ds, sql):
def transform_apply(dataset: DataSet, spec: Mapping[str, Any] | str) -> DataSet:

Apply a TransformSpec given as JSON string or dict (serde shape).
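
For example (the spec keys below are assumptions about TransformSpec's serde shape; check the Rust transform rustdoc for the real field names):

    import json

    spec = {"steps": [{"rename": {"from": "old_name", "to": "new_name"}}]}  # hypothetical shape
    ds2 = transform_apply(ds, spec)
    ds3 = transform_apply(ds, json.dumps(spec))  # equivalent: pass JSON text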

def transform_apply_json(ds, spec_json):
def validate_dataset(dataset: DataSet, spec: Mapping[str, Any]) -> dict[str, Any]:

Run validation checks; return report dict (parsed JSON).
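
For example (the check names are hypothetical serde variants; see the Rust validation rustdoc):

    report = validate_dataset(ds, {"checks": [{"not_null": {"column": "id"}}]})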

def validate_dataset_json(ds, spec):
def validate_dataset_markdown(ds, spec):