// rust_data_processing/lib.rs
1//! `rust-data-processing` is a small library for ingesting common file formats into an in-memory
2//! [`types::DataSet`], using a user-provided [`types::Schema`].
3//!
4//! The primary entrypoint is [`ingestion::ingest_from_path`], which can auto-detect the ingestion
5//! format from the file extension (or you can force a format via [`ingestion::IngestionOptions`]).
6//!
7//! ## What you can ingest (Epic 1 / Story 1.1)
8//!
9//! **File formats (auto-detected by extension):**
10//!
11//! - **CSV**: `.csv`
12//! - **JSON**: `.json` (array-of-objects) and `.ndjson` (newline-delimited objects)
13//! - **Parquet**: `.parquet`, `.pq`
14//! - **Excel/workbooks**: `.xlsx`, `.xls`, `.xlsm`, `.xlsb`, `.ods`
15//!
16//! **Schema + value types:**
17//!
18//! Ingestion produces a [`types::DataSet`] whose cells are typed [`types::Value`]s matching a
19//! user-provided [`types::Schema`]. Supported logical types are:
20//!
21//! - [`types::DataType::Int64`]
22//! - [`types::DataType::Float64`]
23//! - [`types::DataType::Bool`]
24//! - [`types::DataType::Utf8`]
25//!
26//! Across formats, empty cells / empty strings / explicit JSON `null` map to [`types::Value::Null`].
27//!
28//! ## Quick examples: ingest data
29//!
30//! ```no_run
31//! use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
32//! use rust_data_processing::types::{DataType, Field, Schema};
33//!
34//! # fn main() -> Result<(), rust_data_processing::IngestionError> {
35//! let schema = Schema::new(vec![
36//! Field::new("id", DataType::Int64),
37//! Field::new("name", DataType::Utf8),
38//! ]);
39//! // Auto-detects by extension (.csv/.json/.parquet/.xlsx/...).
40//! let ds = ingest_from_path("data.csv", &schema, &IngestionOptions::default())?;
41//! println!("rows={}", ds.row_count());
42//! # Ok(())
43//! # }
44//! ```
45//!
46//! JSON supports nested field paths using dot notation in the schema (e.g. `user.name`):
47//!
48//! ```no_run
49//! use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
50//! use rust_data_processing::types::{DataType, Field, Schema};
51//!
52//! # fn main() -> Result<(), rust_data_processing::IngestionError> {
53//! let schema = Schema::new(vec![
54//! Field::new("id", DataType::Int64),
55//! Field::new("user.name", DataType::Utf8),
56//! ]);
57//! let ds = ingest_from_path("events.ndjson", &schema, &IngestionOptions::default())?;
58//! println!("rows={}", ds.row_count());
59//! # Ok(())
60//! # }
61//! ```
62//!
63//! ## Modules
64//!
65//! - [`ingestion`]: unified ingestion entrypoints and format-specific implementations
66//! - [`types`]: schema + in-memory dataset types
66//! - [`pipeline`]: Polars-backed [`pipeline::DataFrame`] + cast primitives (e.g. [`pipeline::CastMode`])
67//! - [`processing`]: in-memory dataset transformations (filter/map/reduce, feature-wise stats, arg max/min, top‑k frequency)
68//! - [`execution`]: execution engine for parallel pipelines + throttling + metrics
69//! - `sql`: SQL support (Polars-backed; enabled by default)
70//! - [`transform`]: serde-friendly transformation spec compiled to pipeline wrappers
71//! - [`profiling`]: Polars-backed profiling metrics + sampling modes
72//! - [`validation`]: validation DSL + built-in checks + reporting
73//! - [`outliers`]: outlier detection primitives + explainable outputs
74//! - [`cdc`]: CDC boundary types (Phase 1 spike)
75//! - [`error`]: error types used across ingestion
76//!
77//! ## Processing example (1.2 pipeline)
78//!
79//! ```rust
80//! use rust_data_processing::processing::{filter, map, reduce, ReduceOp};
81//! use rust_data_processing::types::{DataSet, DataType, Field, Schema, Value};
82//!
83//! let schema = Schema::new(vec![
84//! Field::new("id", DataType::Int64),
85//! Field::new("active", DataType::Bool),
86//! Field::new("score", DataType::Float64),
87//! ]);
88//! let ds = DataSet::new(
89//! schema,
90//! vec![
91//! vec![Value::Int64(1), Value::Bool(true), Value::Float64(10.0)],
92//! vec![Value::Int64(2), Value::Bool(false), Value::Float64(20.0)],
93//! vec![Value::Int64(3), Value::Bool(true), Value::Null],
94//! ],
95//! );
96//!
97//! let active_idx = ds.schema.index_of("active").unwrap();
98//! let filtered = filter(&ds, |row| matches!(row.get(active_idx), Some(Value::Bool(true))));
99//! let mapped = map(&filtered, |row| {
100//! let mut out = row.to_vec();
101//! // score *= 2.0
102//! if let Some(Value::Float64(v)) = out.get(2) {
103//! out[2] = Value::Float64(v * 2.0);
104//! }
105//! out
106//! });
107//!
108//! let sum = reduce(&mapped, "score", ReduceOp::Sum).unwrap();
109//! assert_eq!(sum, Value::Float64(20.0));
110//! ```
111//!
112//! ## Execution engine example (1.3 parallel pipeline)
113//!
114//! ```no_run
115//! use rust_data_processing::execution::{ExecutionEngine, ExecutionOptions};
116//! use rust_data_processing::processing::ReduceOp;
117//! use rust_data_processing::types::{DataSet, DataType, Field, Schema, Value};
118//!
119//! # fn main() {
120//! let schema = Schema::new(vec![
121//! Field::new("id", DataType::Int64),
122//! Field::new("active", DataType::Bool),
123//! Field::new("score", DataType::Float64),
124//! ]);
125//! let ds = DataSet::new(
126//! schema,
127//! vec![
128//! vec![Value::Int64(1), Value::Bool(true), Value::Float64(10.0)],
129//! vec![Value::Int64(2), Value::Bool(false), Value::Float64(20.0)],
130//! vec![Value::Int64(3), Value::Bool(true), Value::Null],
131//! ],
132//! );
133//!
134//! let engine = ExecutionEngine::new(ExecutionOptions {
135//! num_threads: Some(4),
136//! chunk_size: 1_024,
137//! max_in_flight_chunks: 4,
138//! });
139//!
140//! let active_idx = ds.schema.index_of("active").unwrap();
141//! let filtered = engine.filter_parallel(&ds, |row| matches!(row.get(active_idx), Some(Value::Bool(true))));
142//! let mapped = engine.map_parallel(&filtered, |row| row.to_vec());
143//! let sum = engine.reduce(&mapped, "score", ReduceOp::Sum).unwrap();
144//! assert_eq!(sum, Value::Float64(10.0));
145//!
146//! let snapshot = engine.metrics().snapshot();
147//! println!("rows_processed={}", snapshot.rows_processed);
148//! # }
149//! ```
150//!
151//! ## Quick examples: Phase 1 modules
152//!
153//! ### TransformSpec (declarative ETL)
154//!
155//! ```rust
156//! use rust_data_processing::pipeline::CastMode;
157//! use rust_data_processing::transform::{TransformSpec, TransformStep};
158//! use rust_data_processing::types::{DataSet, DataType, Field, Schema, Value};
159//!
160//! let ds = DataSet::new(
161//! Schema::new(vec![
162//! Field::new("id", DataType::Int64),
163//! Field::new("score", DataType::Int64),
164//! ]),
165//! vec![vec![Value::Int64(1), Value::Int64(10)], vec![Value::Int64(2), Value::Null]],
166//! );
167//!
168//! let out_schema = Schema::new(vec![
169//! Field::new("id", DataType::Int64),
170//! Field::new("score_f", DataType::Float64),
171//! ]);
172//!
173//! let spec = TransformSpec::new(out_schema.clone())
174//! .with_step(TransformStep::Rename { pairs: vec![("score".to_string(), "score_f".to_string())] })
175//! .with_step(TransformStep::Cast { column: "score_f".to_string(), to: DataType::Float64, mode: CastMode::Lossy })
176//! .with_step(TransformStep::FillNull { column: "score_f".to_string(), value: Value::Float64(0.0) });
177//!
178//! let out = spec.apply(&ds).unwrap();
179//! assert_eq!(out.schema, out_schema);
180//! ```
181//!
182//! ### Profiling (metrics + deterministic sampling)
183//!
184//! ```rust
185//! use rust_data_processing::profiling::{profile_dataset, ProfileOptions, SamplingMode};
186//! use rust_data_processing::types::{DataSet, DataType, Field, Schema, Value};
187//!
188//! let ds = DataSet::new(
189//! Schema::new(vec![Field::new("x", DataType::Float64)]),
190//! vec![vec![Value::Float64(1.0)], vec![Value::Null], vec![Value::Float64(3.0)]],
191//! );
192//!
193//! let rep = profile_dataset(
194//! &ds,
195//! &ProfileOptions { sampling: SamplingMode::Head(2), quantiles: vec![0.5] },
196//! )
197//! .unwrap();
198//! assert_eq!(rep.row_count, 2);
199//! ```
200//!
201//! ### Validation (DSL + reporting)
202//!
203//! ```rust
204//! use rust_data_processing::types::{DataSet, DataType, Field, Schema, Value};
205//! use rust_data_processing::validation::{validate_dataset, Check, Severity, ValidationSpec};
206//!
207//! let ds = DataSet::new(
208//! Schema::new(vec![Field::new("email", DataType::Utf8)]),
209//! vec![vec![Value::Utf8("ada@example.com".to_string())], vec![Value::Null]],
210//! );
211//!
212//! let spec = ValidationSpec::new(vec![
213//! Check::NotNull { column: "email".to_string(), severity: Severity::Error },
214//! ]);
215//!
216//! let rep = validate_dataset(&ds, &spec).unwrap();
217//! assert_eq!(rep.summary.total_checks, 1);
218//! ```
219//!
220//! ### Outliers (IQR / z-score / MAD)
221//!
222//! ```rust
223//! use rust_data_processing::outliers::{detect_outliers_dataset, OutlierMethod, OutlierOptions};
224//! use rust_data_processing::profiling::SamplingMode;
225//! use rust_data_processing::types::{DataSet, DataType, Field, Schema, Value};
226//!
227//! let ds = DataSet::new(
228//! Schema::new(vec![Field::new("x", DataType::Float64)]),
229//! vec![
230//! vec![Value::Float64(1.0)],
231//! vec![Value::Float64(1.0)],
232//! vec![Value::Float64(1.0)],
233//! vec![Value::Float64(1000.0)],
234//! ],
235//! );
236//!
237//! let rep = detect_outliers_dataset(
238//! &ds,
239//! "x",
240//! OutlierMethod::Iqr { k: 1.5 },
241//! &OutlierOptions { sampling: SamplingMode::Full, max_examples: 3 },
242//! )
243//! .unwrap();
244//! assert!(rep.outlier_count >= 1);
245//! ```
246//!
247//! ### SQL (Polars-backed)
248//!
249//! ```no_run
250//! # #[cfg(feature = "sql")]
251//! # fn main() -> Result<(), rust_data_processing::IngestionError> {
252//! use rust_data_processing::pipeline::DataFrame;
253//! use rust_data_processing::sql;
254//! use rust_data_processing::types::{DataSet, DataType, Field, Schema, Value};
255//!
256//! let ds = DataSet::new(
257//! Schema::new(vec![Field::new("id", DataType::Int64), Field::new("active", DataType::Bool)]),
258//! vec![vec![Value::Int64(1), Value::Bool(true)]],
259//! );
260//! let out = sql::query(&DataFrame::from_dataset(&ds)?, "SELECT id FROM df WHERE active = TRUE")?
261//! .collect()?;
262//! assert_eq!(out.row_count(), 1);
263//! # Ok(())
264//! # }
265//! # #[cfg(not(feature = "sql"))]
266//! # fn main() {}
267//! ```
268//!
269//! For more end-to-end examples, see the repository `README.md` and `API.md` (processing / aggregates).
270//! Aggregate semantics: `docs/REDUCE_AGG_SEMANTICS.md`.
271//!
272//! ### Reduce operations
273//!
274//! - [`processing::ReduceOp::Count`]: counts rows (including nulls)
275//! - [`processing::ReduceOp::Sum`], [`processing::ReduceOp::Min`], [`processing::ReduceOp::Max`]:
276//! operate on numeric columns and ignore nulls. If all values are null, these return
277//! `Some(Value::Null)`.
278//! - [`processing::ReduceOp::Mean`], [`processing::ReduceOp::Variance`], [`processing::ReduceOp::StdDev`]:
279//! use a numerically stable one-pass (Welford) accumulation; mean / sum-of-squares / L2 norm are
280//! returned as [`types::Value::Float64`]. Sample variance / std dev require at least two values.
281//! - [`processing::ReduceOp::CountDistinctNonNull`]: distinct non-null values (also for UTF-8 and bool).
282//! - [`pipeline::DataFrame::reduce`] provides the Polars-backed equivalent for whole-frame scalars.
283//! - [`processing::feature_wise_mean_std`]: one scan, mean + std for several numeric columns; [`pipeline::DataFrame::feature_wise_mean_std`] for Polars.
284//! - [`processing::arg_max_row`], [`processing::arg_min_row`], [`processing::top_k_by_frequency`]: row extrema and label top‑k.
285
// CDC boundary types (Phase 1 spike; see crate docs above).
286pub mod cdc;
// Error types used across ingestion (`IngestionError`, `IngestionResult`).
287pub mod error;
// Execution engine for parallel pipelines + throttling + metrics.
288pub mod execution;
// Unified ingestion entrypoints; format auto-detected by file extension
// (CSV / JSON / Parquet / Excel).
289pub mod ingestion;
// Outlier detection primitives (IQR / z-score / MAD) + explainable outputs.
290pub mod outliers;
// Polars-backed `DataFrame` wrapper and cast primitives (e.g. `CastMode`),
// used by the `sql` and `transform` examples in the crate docs.
291pub mod pipeline;
// In-memory dataset transformations: filter/map/reduce, feature-wise stats,
// arg max/min, top-k frequency.
292pub mod processing;
// Polars-backed profiling metrics + sampling modes.
293pub mod profiling;
// SQL support over `pipeline::DataFrame` (Polars-backed). Feature-gated but
// enabled by default.
294#[cfg(feature = "sql")]
295pub mod sql;
// Serde-friendly transformation spec compiled to pipeline wrappers.
296pub mod transform;
// Core schema + in-memory dataset types (`Schema`, `Field`, `DataSet`, `Value`).
297pub mod types;
// Validation DSL + built-in checks + reporting.
298pub mod validation;
299
// Re-export the ingestion error alias at the crate root for ergonomic `use`.
300pub use error::{IngestionError, IngestionResult};