rust-data-processing is a small library for ingesting common file formats into an in-memory
types::DataSet, using a user-provided types::Schema.
The primary entrypoint is ingestion::ingest_from_path, which can auto-detect the ingestion
format from the file extension (or you can force a format via ingestion::IngestionOptions).
§What you can ingest (Epic 1 / Story 1.1)
File formats (auto-detected by extension):
- CSV: .csv
- JSON: .json (array-of-objects) and .ndjson (newline-delimited objects)
- Parquet: .parquet, .pq
- Excel/workbooks: .xlsx, .xls, .xlsm, .xlsb, .ods
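Extension-based auto-detection amounts to a match on the lowercased file extension. A minimal sketch of the idea — the `Format` enum and `detect` function here are illustrative names, not the crate's actual internals:

```rust
use std::path::Path;

/// Illustrative format enum; the crate's real detection lives in `ingestion`.
#[derive(Debug, PartialEq)]
enum Format {
    Csv,
    Json,
    Ndjson,
    Parquet,
    Excel,
}

/// Map a path's extension (case-insensitively) to a format, mirroring the
/// extension table above. Unknown or missing extensions yield `None`.
fn detect(path: &str) -> Option<Format> {
    let ext = Path::new(path).extension()?.to_str()?.to_ascii_lowercase();
    match ext.as_str() {
        "csv" => Some(Format::Csv),
        "json" => Some(Format::Json),
        "ndjson" => Some(Format::Ndjson),
        "parquet" | "pq" => Some(Format::Parquet),
        "xlsx" | "xls" | "xlsm" | "xlsb" | "ods" => Some(Format::Excel),
        _ => None,
    }
}
```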
Schema + value types:
Ingestion produces a types::DataSet whose cells are typed types::Values matching a
user-provided types::Schema. Supported logical types are the variants of types::DataType;
the examples below use Int64, Float64, Bool, and Utf8.
Across formats, empty cells, empty strings, and explicit JSON null all map to types::Value::Null.
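That null-mapping rule amounts to a small normalization step during cell conversion, sketched here with a toy `Cell` enum standing in for types::Value (illustrative only):

```rust
#[derive(Debug, PartialEq)]
enum Cell {
    Null,
    Text(String),
}

/// Empty cells (`None`) and empty strings both normalize to `Null`,
/// mirroring how ingestion maps them to types::Value::Null.
fn normalize(raw: Option<&str>) -> Cell {
    match raw {
        None | Some("") => Cell::Null,
        Some(s) => Cell::Text(s.to_string()),
    }
}
```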
§Quick examples: ingest data
use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
use rust_data_processing::types::{DataType, Field, Schema};
let schema = Schema::new(vec![
Field::new("id", DataType::Int64),
Field::new("name", DataType::Utf8),
]);
// Auto-detects by extension (.csv/.json/.parquet/.xlsx/...).
let ds = ingest_from_path("data.csv", &schema, &IngestionOptions::default())?;
println!("rows={}", ds.row_count());
JSON supports nested field paths using dot notation in the schema (e.g. user.name):
use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
use rust_data_processing::types::{DataType, Field, Schema};
let schema = Schema::new(vec![
Field::new("id", DataType::Int64),
Field::new("user.name", DataType::Utf8),
]);
let ds = ingest_from_path("events.ndjson", &schema, &IngestionOptions::default())?;
println!("rows={}", ds.row_count());
§Modules
- ingestion: unified ingestion entrypoints and format-specific implementations
- types: schema + in-memory dataset types
- processing: in-memory dataset transformations (filter/map/reduce, feature-wise stats, arg max/min, top‑k frequency)
- execution: execution engine for parallel pipelines + throttling + metrics
- sql: SQL support (Polars-backed; enabled by default)
- transform: serde-friendly transformation spec compiled to pipeline wrappers
- profiling: Polars-backed profiling metrics + sampling modes
- validation: validation DSL + built-in checks + reporting
- outliers: outlier detection primitives + explainable outputs
- cdc: CDC boundary types (Phase 1 spike)
- error: error types used across ingestion
§Processing example (1.2 pipeline)
use rust_data_processing::processing::{filter, map, reduce, ReduceOp};
use rust_data_processing::types::{DataSet, DataType, Field, Schema, Value};
let schema = Schema::new(vec![
Field::new("id", DataType::Int64),
Field::new("active", DataType::Bool),
Field::new("score", DataType::Float64),
]);
let ds = DataSet::new(
schema,
vec![
vec![Value::Int64(1), Value::Bool(true), Value::Float64(10.0)],
vec![Value::Int64(2), Value::Bool(false), Value::Float64(20.0)],
vec![Value::Int64(3), Value::Bool(true), Value::Null],
],
);
let active_idx = ds.schema.index_of("active").unwrap();
let filtered = filter(&ds, |row| matches!(row.get(active_idx), Some(Value::Bool(true))));
let mapped = map(&filtered, |row| {
let mut out = row.to_vec();
// score *= 2.0
if let Some(Value::Float64(v)) = out.get(2) {
out[2] = Value::Float64(v * 2.0);
}
out
});
let sum = reduce(&mapped, "score", ReduceOp::Sum).unwrap();
assert_eq!(sum, Value::Float64(20.0));
§Execution engine example (1.3 parallel pipeline)
use rust_data_processing::execution::{ExecutionEngine, ExecutionOptions};
use rust_data_processing::processing::ReduceOp;
use rust_data_processing::types::{DataSet, DataType, Field, Schema, Value};
let schema = Schema::new(vec![
Field::new("id", DataType::Int64),
Field::new("active", DataType::Bool),
Field::new("score", DataType::Float64),
]);
let ds = DataSet::new(
schema,
vec![
vec![Value::Int64(1), Value::Bool(true), Value::Float64(10.0)],
vec![Value::Int64(2), Value::Bool(false), Value::Float64(20.0)],
vec![Value::Int64(3), Value::Bool(true), Value::Null],
],
);
let engine = ExecutionEngine::new(ExecutionOptions {
num_threads: Some(4),
chunk_size: 1_024,
max_in_flight_chunks: 4,
});
let active_idx = ds.schema.index_of("active").unwrap();
let filtered = engine.filter_parallel(&ds, |row| matches!(row.get(active_idx), Some(Value::Bool(true))));
let mapped = engine.map_parallel(&filtered, |row| row.to_vec());
let sum = engine.reduce(&mapped, "score", ReduceOp::Sum).unwrap();
// Only rows 1 and 3 survive the filter; row 3's score is Null and Sum ignores nulls.
assert_eq!(sum, Value::Float64(10.0));
let snapshot = engine.metrics().snapshot();
println!("rows_processed={}", snapshot.rows_processed);
§Quick examples: Phase 1 modules
§TransformSpec (declarative ETL)
use rust_data_processing::pipeline::CastMode;
use rust_data_processing::transform::{TransformSpec, TransformStep};
use rust_data_processing::types::{DataSet, DataType, Field, Schema, Value};
let ds = DataSet::new(
Schema::new(vec![
Field::new("id", DataType::Int64),
Field::new("score", DataType::Int64),
]),
vec![vec![Value::Int64(1), Value::Int64(10)], vec![Value::Int64(2), Value::Null]],
);
let out_schema = Schema::new(vec![
Field::new("id", DataType::Int64),
Field::new("score_f", DataType::Float64),
]);
let spec = TransformSpec::new(out_schema.clone())
.with_step(TransformStep::Rename { pairs: vec![("score".to_string(), "score_f".to_string())] })
.with_step(TransformStep::Cast { column: "score_f".to_string(), to: DataType::Float64, mode: CastMode::Lossy })
.with_step(TransformStep::FillNull { column: "score_f".to_string(), value: Value::Float64(0.0) });
let out = spec.apply(&ds).unwrap();
assert_eq!(out.schema, out_schema);
§Profiling (metrics + deterministic sampling)
use rust_data_processing::profiling::{profile_dataset, ProfileOptions, SamplingMode};
use rust_data_processing::types::{DataSet, DataType, Field, Schema, Value};
let ds = DataSet::new(
Schema::new(vec![Field::new("x", DataType::Float64)]),
vec![vec![Value::Float64(1.0)], vec![Value::Null], vec![Value::Float64(3.0)]],
);
let rep = profile_dataset(
&ds,
&ProfileOptions { sampling: SamplingMode::Head(2), quantiles: vec![0.5] },
)
.unwrap();
assert_eq!(rep.row_count, 2);
§Validation (DSL + reporting)
use rust_data_processing::types::{DataSet, DataType, Field, Schema, Value};
use rust_data_processing::validation::{validate_dataset, Check, Severity, ValidationSpec};
let ds = DataSet::new(
Schema::new(vec![Field::new("email", DataType::Utf8)]),
vec![vec![Value::Utf8("ada@example.com".to_string())], vec![Value::Null]],
);
let spec = ValidationSpec::new(vec![
Check::NotNull { column: "email".to_string(), severity: Severity::Error },
]);
let rep = validate_dataset(&ds, &spec).unwrap();
assert_eq!(rep.summary.total_checks, 1);
§Outliers (IQR / z-score / MAD)
use rust_data_processing::outliers::{detect_outliers_dataset, OutlierMethod, OutlierOptions};
use rust_data_processing::profiling::SamplingMode;
use rust_data_processing::types::{DataSet, DataType, Field, Schema, Value};
let ds = DataSet::new(
Schema::new(vec![Field::new("x", DataType::Float64)]),
vec![
vec![Value::Float64(1.0)],
vec![Value::Float64(1.0)],
vec![Value::Float64(1.0)],
vec![Value::Float64(1000.0)],
],
);
let rep = detect_outliers_dataset(
&ds,
"x",
OutlierMethod::Iqr { k: 1.5 },
&OutlierOptions { sampling: SamplingMode::Full, max_examples: 3 },
)
.unwrap();
assert!(rep.outlier_count >= 1);
§SQL (Polars-backed)
use rust_data_processing::pipeline::DataFrame;
use rust_data_processing::sql;
use rust_data_processing::types::{DataSet, DataType, Field, Schema, Value};
let ds = DataSet::new(
Schema::new(vec![Field::new("id", DataType::Int64), Field::new("active", DataType::Bool)]),
vec![vec![Value::Int64(1), Value::Bool(true)]],
);
let out = sql::query(&DataFrame::from_dataset(&ds)?, "SELECT id FROM df WHERE active = TRUE")?
.collect()?;
assert_eq!(out.row_count(), 1);
For more end-to-end examples, see the repository README.md and API.md (processing / aggregates).
Aggregate semantics: docs/REDUCE_AGG_SEMANTICS.md.
§Reduce operations
- processing::ReduceOp::Count: counts rows (including nulls)
- processing::ReduceOp::Sum, processing::ReduceOp::Min, processing::ReduceOp::Max: operate on numeric columns and ignore nulls. If all values are null, these return Some(Value::Null).
- processing::ReduceOp::Mean, processing::ReduceOp::Variance, processing::ReduceOp::StdDev: use a numerically stable one-pass (Welford) accumulation; mean / sum-of-squares / L2 norm are returned as types::Value::Float64. Sample variance / std dev require at least two values.
- processing::ReduceOp::CountDistinctNonNull: distinct non-null values (also for UTF-8 and bool).
- pipeline::DataFrame::reduce provides the Polars-backed equivalent for whole-frame scalars.
- processing::feature_wise_mean_std: one scan, mean + std for several numeric columns; pipeline::DataFrame::feature_wise_mean_std for Polars.
- processing::arg_max_row, processing::arg_min_row, processing::top_k_by_frequency: row extrema and label top‑k.
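The Welford accumulation behind Mean / Variance / StdDev can be sketched in a few lines over a plain f64 slice. This is illustrative only — the crate's reduce ops operate on typed columns and skip nulls first:

```rust
/// One-pass (Welford) mean and sample variance. `StdDev` would be
/// the square root of the variance.
fn welford(xs: &[f64]) -> (f64, Option<f64>) {
    let (mut count, mut mean, mut m2) = (0u64, 0.0f64, 0.0f64);
    for &x in xs {
        count += 1;
        let delta = x - mean;
        mean += delta / count as f64;
        // Accumulate sum of squared deviations without a second pass.
        m2 += delta * (x - mean);
    }
    // Sample variance needs at least two values, matching ReduceOp::Variance.
    let var = if count >= 2 { Some(m2 / (count - 1) as f64) } else { None };
    (mean, var)
}
```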
Re-exports§
pub use error::IngestionError;
pub use error::IngestionResult;
Modules§
- cdc
- CDC interface boundary (Phase 1 spike).
- error
- Error types used across ingestion.
- execution
- Execution engine for running processing pipelines with configurable parallelism.
- ingestion
- Ingestion entrypoints and implementations.
- outliers
- Outlier detection (Phase 1).
- pipeline
- DataFrame-centric pipeline/transforms backed by a Polars lazy plan.
- processing
- In-memory data transformations.
- profiling
- Profiling (Phase 1).
- sql
- SQL support (Polars-backed).
- transform
- Transformation specifications and helpers.
- types
- Core data model types for ingestion.
- validation
- Validation (Phase 1).