rust_data_processing
Python bindings for the rust-data-processing Rust crate.
The native extension is built with PyO3 and maturin. Prefer the APIs exported here rather than
importing rust_data_processing._rust_data_processing directly.
1"""Python bindings for the `rust-data-processing` Rust crate. 2 3The native extension is built with PyO3 and maturin. Prefer the APIs exported here rather than 4importing ``rust_data_processing._rust_data_processing`` directly. 5""" 6 7from __future__ import annotations 8 9import json 10from importlib.metadata import PackageNotFoundError, version 11from typing import Any, Mapping 12 13from . import cdc 14 15from ._rust_data_processing import ( 16 DataFrame, 17 DataSet, 18 ExecutionEngine, 19 SqlContext, 20 detect_outliers_json, 21 detect_outliers_markdown, 22 discover_hive_partitioned_files, 23 export_dataset_jsonl, 24 export_filter_rows_max_utf8_chars, 25 export_train_test_row_indices, 26 extension_version, 27 ingest_from_db, 28 ingest_from_db_infer, 29 ingest_from_ordered_paths, 30 ingest_from_path, 31 ingest_from_path_infer, 32 infer_schema_from_path, 33 parse_partition_segment, 34 paths_from_directory_scan, 35 paths_from_explicit_list, 36 paths_from_glob, 37 privacy_summarize_utf8_changes_json, 38 privacy_summarize_utf8_changes_markdown, 39 processing_arg_max_row, 40 processing_arg_min_row, 41 processing_feature_wise_mean_std, 42 processing_filter, 43 processing_map, 44 processing_reduce, 45 processing_top_k_by_frequency, 46 profile_dataset_json, 47 profile_dataset_markdown, 48 reports_truncate_utf8_bytes, 49 sql_query_dataset, 50 transform_apply_json, 51 validate_dataset_json, 52 validate_dataset_markdown, 53) 54 55try: 56 __version__ = version("rust-data-processing") 57except PackageNotFoundError: 58 __version__ = extension_version() 59 60 61def ingest_with_inferred_schema(path: str, options: dict[str, Any] | None = None): 62 """Infer schema once, then ingest (two passes over the file; same as the Rust helper).""" 63 schema = infer_schema_from_path(path, options) 64 return ingest_from_path(path, schema, options), schema 65 66 67def transform_apply(dataset: DataSet, spec: Mapping[str, Any] | str) -> DataSet: 68 """Apply a :class:`TransformSpec` given as JSON 
string or dict (serde shape).""" 69 if isinstance(spec, str): 70 payload = spec 71 else: 72 payload = json.dumps(spec) 73 return transform_apply_json(dataset, payload) 74 75 76def profile_dataset(dataset: DataSet, options: dict[str, Any] | None = None) -> dict[str, Any]: 77 """Return profiling report as a dict (parsed JSON).""" 78 return json.loads(profile_dataset_json(dataset, options)) 79 80 81def validate_dataset(dataset: DataSet, spec: Mapping[str, Any]) -> dict[str, Any]: 82 """Run validation checks; return report dict (parsed JSON).""" 83 return json.loads(validate_dataset_json(dataset, spec)) 84 85 86def detect_outliers( 87 dataset: DataSet, 88 column: str, 89 method: Mapping[str, Any], 90 options: dict[str, Any] | None = None, 91) -> dict[str, Any]: 92 """Outlier report as dict (parsed JSON).""" 93 return json.loads(detect_outliers_json(dataset, column, method, options)) 94 95 96def export_jsonl_records(dataset: DataSet, columns: list[str]) -> list[dict[str, Any]]: 97 """Parse :func:`export_dataset_jsonl` output into a list of row dicts.""" 98 text = export_dataset_jsonl(dataset, columns) 99 return [json.loads(line) for line in text.splitlines() if line.strip()] 100 101 102def privacy_summarize_utf8_changes( 103 before: DataSet, 104 after: DataSet, 105 columns: list[str], 106 *, 107 as_markdown: bool = False, 108) -> list[dict[str, Any]] | str: 109 """UTF-8 diff summary per column: parsed JSON list (default) or Markdown string.""" 110 if as_markdown: 111 return privacy_summarize_utf8_changes_markdown(before, after, columns) 112 return json.loads(privacy_summarize_utf8_changes_json(before, after, columns)) 113 114 115__all__ = [ 116 "DataFrame", 117 "DataSet", 118 "ExecutionEngine", 119 "SqlContext", 120 "__version__", 121 "cdc", 122 "detect_outliers", 123 "detect_outliers_json", 124 "detect_outliers_markdown", 125 "discover_hive_partitioned_files", 126 "export_dataset_jsonl", 127 "export_filter_rows_max_utf8_chars", 128 "export_jsonl_records", 129 
"export_train_test_row_indices", 130 "extension_version", 131 "ingest_from_db", 132 "ingest_from_db_infer", 133 "ingest_from_ordered_paths", 134 "ingest_from_path", 135 "ingest_from_path_infer", 136 "ingest_with_inferred_schema", 137 "infer_schema_from_path", 138 "parse_partition_segment", 139 "paths_from_directory_scan", 140 "paths_from_explicit_list", 141 "paths_from_glob", 142 "privacy_summarize_utf8_changes", 143 "privacy_summarize_utf8_changes_json", 144 "privacy_summarize_utf8_changes_markdown", 145 "processing_arg_max_row", 146 "processing_arg_min_row", 147 "processing_feature_wise_mean_std", 148 "processing_filter", 149 "processing_map", 150 "processing_reduce", 151 "processing_top_k_by_frequency", 152 "profile_dataset", 153 "profile_dataset_json", 154 "profile_dataset_markdown", 155 "reports_truncate_utf8_bytes", 156 "sql_query_dataset", 157 "transform_apply", 158 "transform_apply_json", 159 "validate_dataset", 160 "validate_dataset_json", 161 "validate_dataset_markdown", 162]
Polars-backed lazy pipeline; collect to a ``DataSet`` when ready.
In-memory tabular dataset (mirrors rust_data_processing::types::DataSet).
Configurable Rayon-backed engine: parallel filter/map (Python row callbacks acquire the GIL per row),
sequential reduce, and optional on_execution_event hook.
Multi-table SQL context (register several pipeline frames, then execute).
87def detect_outliers( 88 dataset: DataSet, 89 column: str, 90 method: Mapping[str, Any], 91 options: dict[str, Any] | None = None, 92) -> dict[str, Any]: 93 """Outlier report as dict (parsed JSON).""" 94 return json.loads(detect_outliers_json(dataset, column, method, options))
Outlier report as dict (parsed JSON).
Discover files under a Hive-style key=value directory tree (see Rust ingestion::partition rustdoc).
Returns a list of dicts: {"path": str, "segments": [{"key": str, "value": str}, ...]}.
Drop rows where column (Utf8) exceeds max_chars Unicode scalars; nulls kept.
97def export_jsonl_records(dataset: DataSet, columns: list[str]) -> list[dict[str, Any]]: 98 """Parse :func:`export_dataset_jsonl` output into a list of row dicts.""" 99 text = export_dataset_jsonl(dataset, columns) 100 return [json.loads(line) for line in text.splitlines() if line.strip()]
Parse export_dataset_jsonl() output into a list of row dicts.
Deterministic train/test row index split: (train_indices, test_indices) as two lists of int.
Ingest an ordered list of files, concatenate rows, apply watermark once; returns (dataset, metadata_dict).
62def ingest_with_inferred_schema(path: str, options: dict[str, Any] | None = None): 63 """Infer schema once, then ingest (two passes over the file; same as the Rust helper).""" 64 schema = infer_schema_from_path(path, options) 65 return ingest_from_path(path, schema, options), schema
Infer schema once, then ingest (two passes over the file; same as the Rust helper).
Parse a single path component as key=value, or return None if invalid.
List files under root (recursive), optional glob on path relative to root; sorted for stable ordering.
Validate paths exist as files; return them in order with duplicates removed (first wins).
Expand a filesystem glob to existing file paths (sorted).
103def privacy_summarize_utf8_changes( 104 before: DataSet, 105 after: DataSet, 106 columns: list[str], 107 *, 108 as_markdown: bool = False, 109) -> list[dict[str, Any]] | str: 110 """UTF-8 diff summary per column: parsed JSON list (default) or Markdown string.""" 111 if as_markdown: 112 return privacy_summarize_utf8_changes_markdown(before, after, columns) 113 return json.loads(privacy_summarize_utf8_changes_json(before, after, columns))
UTF-8 diff summary per column: parsed JSON list (default) or Markdown string.
77def profile_dataset(dataset: DataSet, options: dict[str, Any] | None = None) -> dict[str, Any]: 78 """Return profiling report as a dict (parsed JSON).""" 79 return json.loads(profile_dataset_json(dataset, options))
Return profiling report as a dict (parsed JSON).
68def transform_apply(dataset: DataSet, spec: Mapping[str, Any] | str) -> DataSet: 69 """Apply a :class:`TransformSpec` given as JSON string or dict (serde shape).""" 70 if isinstance(spec, str): 71 payload = spec 72 else: 73 payload = json.dumps(spec) 74 return transform_apply_json(dataset, payload)
Apply a TransformSpec given as JSON string or dict (serde shape).
82def validate_dataset(dataset: DataSet, spec: Mapping[str, Any]) -> dict[str, Any]: 83 """Run validation checks; return report dict (parsed JSON).""" 84 return json.loads(validate_dataset_json(dataset, spec))
Run validation checks; return report dict (parsed JSON).