rust_data_processing/profiling/
mod.rs

1//! Profiling (Phase 1).
2//!
3//! A small, engine-delegated profiler that computes common column metrics using Polars under the hood,
4//! while keeping the public API in crate-owned types.
5//!
6//! ## Example
7//!
8//! ```rust
9//! use rust_data_processing::profiling::{profile_dataset, ProfileOptions, SamplingMode};
10//! use rust_data_processing::types::{DataSet, DataType, Field, Schema, Value};
11//!
12//! # fn main() -> Result<(), rust_data_processing::IngestionError> {
13//! let ds = DataSet::new(
14//!     Schema::new(vec![Field::new("score", DataType::Float64)]),
15//!     vec![vec![Value::Float64(1.0)], vec![Value::Null], vec![Value::Float64(3.0)]],
16//! );
17//!
18//! let rep = profile_dataset(
19//!     &ds,
20//!     &ProfileOptions {
21//!         sampling: SamplingMode::Head(2),
22//!         quantiles: vec![0.5],
23//!     },
24//! )?;
25//!
26//! assert_eq!(rep.row_count, 2);
27//! assert_eq!(rep.columns[0].null_count, 1);
28//! # Ok(())
29//! # }
30//! ```
31
32use crate::error::{IngestionError, IngestionResult};
33use crate::pipeline::DataFrame;
34use crate::types::{DataSet, DataType};
35
36use polars::prelude::*;
37
/// How profiling should sample rows before computing metrics.
///
/// Defaults to [`SamplingMode::Full`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SamplingMode {
    /// Profile the full dataset.
    #[default]
    Full,
    /// Profile only the first N rows (all rows when the data has fewer than N).
    Head(usize),
}
52
53/// Options for profiling.
54#[derive(Debug, Clone, PartialEq)]
55pub struct ProfileOptions {
56    pub sampling: SamplingMode,
57    /// Quantiles to compute for numeric columns (values in [0.0, 1.0]).
58    pub quantiles: Vec<f64>,
59}
60
61impl Default for ProfileOptions {
62    fn default() -> Self {
63        Self {
64            sampling: SamplingMode::Full,
65            quantiles: vec![0.5, 0.95],
66        }
67    }
68}
69
/// Summary statistics reported for a numeric column.
#[derive(Clone, Debug, PartialEq)]
pub struct NumericProfile {
    /// Smallest value; `None` when the engine yields null (e.g. no non-null values).
    pub min: Option<f64>,
    /// Largest value; `None` when the engine yields null (e.g. no non-null values).
    pub max: Option<f64>,
    /// Arithmetic mean; `None` when the engine yields null (e.g. no non-null values).
    pub mean: Option<f64>,
    /// `(requested quantile, value)` pairs, one per requested quantile, in request order.
    pub quantiles: Vec<(f64, Option<f64>)>,
}
77
78#[derive(Debug, Clone, PartialEq)]
79pub struct ColumnProfile {
80    pub name: String,
81    pub data_type: DataType,
82    pub null_count: usize,
83    pub distinct_count: usize,
84    pub numeric: Option<NumericProfile>,
85}
86
87#[derive(Debug, Clone, PartialEq)]
88pub struct ProfileReport {
89    pub sampling: SamplingMode,
90    /// Row count of the profiled (possibly sampled) data.
91    pub row_count: usize,
92    pub columns: Vec<ColumnProfile>,
93}
94
95/// Render a profile report to a stable JSON string.
96pub fn render_profile_report_json(report: &ProfileReport) -> IngestionResult<String> {
97    let sampling = match report.sampling {
98        SamplingMode::Full => "full",
99        SamplingMode::Head(_) => "head",
100    };
101
102    let cols: Vec<serde_json::Value> = report
103        .columns
104        .iter()
105        .map(|c| {
106            let dtype = match c.data_type {
107                DataType::Int64 => "int64",
108                DataType::Float64 => "float64",
109                DataType::Bool => "bool",
110                DataType::Utf8 => "utf8",
111            };
112            let numeric = c.numeric.as_ref().map(|n| {
113                serde_json::json!({
114                    "min": n.min,
115                    "max": n.max,
116                    "mean": n.mean,
117                    "quantiles": n.quantiles.iter().map(|(q, v)| serde_json::json!({"q": q, "value": v})).collect::<Vec<_>>(),
118                })
119            });
120
121            serde_json::json!({
122                "name": c.name,
123                "data_type": dtype,
124                "null_count": c.null_count,
125                "distinct_count": c.distinct_count,
126                "numeric": numeric,
127            })
128        })
129        .collect();
130
131    serde_json::to_string_pretty(&serde_json::json!({
132        "sampling": sampling,
133        "row_count": report.row_count,
134        "columns": cols,
135    }))
136    .map_err(|e| IngestionError::SchemaMismatch {
137        message: format!("failed to serialize profile report json: {e}"),
138    })
139}
140
141/// Render a profile report to a human-readable Markdown string.
142pub fn render_profile_report_markdown(report: &ProfileReport) -> String {
143    let sampling = match report.sampling {
144        SamplingMode::Full => "Full",
145        SamplingMode::Head(n) => {
146            return format!(
147                "## Profile report\n\n- Sampling: **Head({n})**\n- Rows profiled: **{}**\n\n{}",
148                report.row_count,
149                render_columns_markdown(&report.columns)
150            );
151        }
152    };
153
154    format!(
155        "## Profile report\n\n- Sampling: **{sampling}**\n- Rows profiled: **{}**\n\n{}",
156        report.row_count,
157        render_columns_markdown(&report.columns)
158    )
159}
160
161fn render_columns_markdown(cols: &[ColumnProfile]) -> String {
162    let mut out = String::new();
163    out.push_str("### Columns\n\n");
164    out.push_str("| column | type | nulls | distinct (non-null) | min | max | mean |\n");
165    out.push_str("|---|---:|---:|---:|---:|---:|---:|\n");
166    for c in cols {
167        let dtype = match c.data_type {
168            DataType::Int64 => "Int64",
169            DataType::Float64 => "Float64",
170            DataType::Bool => "Bool",
171            DataType::Utf8 => "Utf8",
172        };
173        let (min, max, mean) = match &c.numeric {
174            Some(n) => (
175                n.min
176                    .map(|v| format!("{v:.4}"))
177                    .unwrap_or_else(|| "—".to_string()),
178                n.max
179                    .map(|v| format!("{v:.4}"))
180                    .unwrap_or_else(|| "—".to_string()),
181                n.mean
182                    .map(|v| format!("{v:.4}"))
183                    .unwrap_or_else(|| "—".to_string()),
184            ),
185            None => ("—".to_string(), "—".to_string(), "—".to_string()),
186        };
187        out.push_str(&format!(
188            "| `{}` | {} | {} | {} | {} | {} | {} |\n",
189            c.name, dtype, c.null_count, c.distinct_count, min, max, mean
190        ));
191    }
192    out
193}
194
195/// Profile an in-memory dataset.
196pub fn profile_dataset(ds: &DataSet, options: &ProfileOptions) -> IngestionResult<ProfileReport> {
197    let df = DataFrame::from_dataset(ds)?;
198    profile_frame(&df, options)
199}
200
201/// Profile a pipeline frame (computed lazily).
202pub fn profile_frame(df: &DataFrame, options: &ProfileOptions) -> IngestionResult<ProfileReport> {
203    let mut lf = df.lazy_clone();
204
205    lf = match options.sampling {
206        SamplingMode::Full => lf,
207        SamplingMode::Head(n) => lf.limit(n as IdxSize),
208    };
209
210    let schema = lf.clone().collect_schema().map_err(|e| {
211        crate::ingestion::polars_bridge::polars_error_to_ingestion("failed to collect schema", e)
212    })?;
213
214    let cols: Vec<(String, DataType, bool)> = schema
215        .iter_fields()
216        .map(|f| {
217            let (dt, is_numeric) = polars_dtype_to_profile_dtype(f.dtype());
218            (f.name().to_string(), dt, is_numeric)
219        })
220        .collect();
221
222    if cols.is_empty() {
223        return Ok(ProfileReport {
224            sampling: options.sampling,
225            row_count: 0,
226            columns: Vec::new(),
227        });
228    }
229
230    // Build a single-row aggregation over the (optionally sampled) LazyFrame.
231    let mut exprs: Vec<Expr> = Vec::new();
232    exprs.push(len().alias("__rows"));
233
234    for (name, _dt, is_numeric) in &cols {
235        exprs.push(col(name).null_count().alias(&format!("{name}__nulls")));
236        // Distinct count excluding nulls (common profiling expectation).
237        exprs.push(
238            col(name)
239                .drop_nulls()
240                .n_unique()
241                .alias(&format!("{name}__distinct")),
242        );
243        if *is_numeric {
244            exprs.push(col(name).min().alias(&format!("{name}__min")));
245            exprs.push(col(name).max().alias(&format!("{name}__max")));
246            exprs.push(col(name).mean().alias(&format!("{name}__mean")));
247            for q in &options.quantiles {
248                if !(0.0..=1.0).contains(q) {
249                    return Err(IngestionError::SchemaMismatch {
250                        message: format!("invalid quantile {q}; expected value in [0.0, 1.0]"),
251                    });
252                }
253                let pct = (q * 100.0).round() as i64;
254                exprs.push(
255                    col(name)
256                        .quantile(lit(*q), QuantileMethod::Nearest)
257                        .alias(&format!("{name}__p{pct}")),
258                );
259            }
260        }
261    }
262
263    let agg = lf.select(exprs).collect().map_err(|e| {
264        crate::ingestion::polars_bridge::polars_error_to_ingestion("failed to compute profile", e)
265    })?;
266
267    let row_count_col = agg.column("__rows").map_err(|e| {
268        crate::ingestion::polars_bridge::polars_error_to_ingestion(
269            "profiling missing __rows column",
270            e,
271        )
272    })?;
273    let row_count = any_to_usize(row_count_col.as_materialized_series(), 0)?.unwrap_or(0);
274
275    let mut out_cols: Vec<ColumnProfile> = Vec::with_capacity(cols.len());
276    for (name, dt, is_numeric) in cols {
277        let nulls_col = agg.column(&format!("{name}__nulls")).map_err(|e| {
278            crate::ingestion::polars_bridge::polars_error_to_ingestion(
279                &format!("profiling missing null_count for '{name}'"),
280                e,
281            )
282        })?;
283        let null_count = any_to_usize(nulls_col.as_materialized_series(), 0)?.unwrap_or(0);
284
285        let distinct_col = agg.column(&format!("{name}__distinct")).map_err(|e| {
286            crate::ingestion::polars_bridge::polars_error_to_ingestion(
287                &format!("profiling missing distinct_count for '{name}'"),
288                e,
289            )
290        })?;
291        let distinct_count = any_to_usize(distinct_col.as_materialized_series(), 0)?.unwrap_or(0);
292
293        let numeric = if is_numeric {
294            let min = any_to_f64(
295                agg.column(&format!("{name}__min"))
296                    .map_err(|e| {
297                        crate::ingestion::polars_bridge::polars_error_to_ingestion(
298                            "profiling missing min",
299                            e,
300                        )
301                    })?
302                    .as_materialized_series(),
303                0,
304            )?;
305            let max = any_to_f64(
306                agg.column(&format!("{name}__max"))
307                    .map_err(|e| {
308                        crate::ingestion::polars_bridge::polars_error_to_ingestion(
309                            "profiling missing max",
310                            e,
311                        )
312                    })?
313                    .as_materialized_series(),
314                0,
315            )?;
316            let mean = any_to_f64(
317                agg.column(&format!("{name}__mean"))
318                    .map_err(|e| {
319                        crate::ingestion::polars_bridge::polars_error_to_ingestion(
320                            "profiling missing mean",
321                            e,
322                        )
323                    })?
324                    .as_materialized_series(),
325                0,
326            )?;
327            let mut qs = Vec::with_capacity(options.quantiles.len());
328            for q in &options.quantiles {
329                let pct = (q * 100.0).round() as i64;
330                let v = any_to_f64(
331                    agg.column(&format!("{name}__p{pct}"))
332                        .map_err(|e| {
333                            crate::ingestion::polars_bridge::polars_error_to_ingestion(
334                                "profiling missing quantile",
335                                e,
336                            )
337                        })?
338                        .as_materialized_series(),
339                    0,
340                )?;
341                qs.push((*q, v));
342            }
343            Some(NumericProfile {
344                min,
345                max,
346                mean,
347                quantiles: qs,
348            })
349        } else {
350            None
351        };
352
353        out_cols.push(ColumnProfile {
354            name,
355            data_type: dt,
356            null_count,
357            distinct_count,
358            numeric,
359        });
360    }
361
362    Ok(ProfileReport {
363        sampling: options.sampling,
364        row_count,
365        columns: out_cols,
366    })
367}
368
369fn polars_dtype_to_profile_dtype(dt: &polars::datatypes::DataType) -> (DataType, bool) {
370    use polars::datatypes::DataType as P;
371    match dt {
372        P::Boolean => (DataType::Bool, false),
373        P::String => (DataType::Utf8, false),
374        P::Int8 | P::Int16 | P::Int32 | P::Int64 | P::UInt8 | P::UInt16 | P::UInt32 | P::UInt64 => {
375            (DataType::Int64, true)
376        }
377        P::Float32 | P::Float64 => (DataType::Float64, true),
378        _ => (DataType::Utf8, false),
379    }
380}
381
382fn any_to_usize(s: &Series, idx: usize) -> IngestionResult<Option<usize>> {
383    let av = s.get(idx).map_err(|e| IngestionError::Engine {
384        message: "failed to read profile value".to_string(),
385        source: Box::new(e),
386    })?;
387    Ok(match av {
388        AnyValue::Null => None,
389        AnyValue::Int64(v) => Some(v.max(0) as usize),
390        AnyValue::UInt64(v) => Some(v as usize),
391        AnyValue::Int32(v) => Some((v as i64).max(0) as usize),
392        AnyValue::UInt32(v) => Some(v as usize),
393        AnyValue::Int16(v) => Some((v as i64).max(0) as usize),
394        AnyValue::UInt16(v) => Some(v as usize),
395        AnyValue::Int8(v) => Some((v as i64).max(0) as usize),
396        AnyValue::UInt8(v) => Some(v as usize),
397        other => {
398            return Err(IngestionError::SchemaMismatch {
399                message: format!("expected integer-like profile value, got {other}"),
400            });
401        }
402    })
403}
404
405fn any_to_f64(s: &Series, idx: usize) -> IngestionResult<Option<f64>> {
406    let av = s.get(idx).map_err(|e| IngestionError::Engine {
407        message: "failed to read profile value".to_string(),
408        source: Box::new(e),
409    })?;
410    Ok(match av {
411        AnyValue::Null => None,
412        AnyValue::Float64(v) => Some(v),
413        AnyValue::Float32(v) => Some(v as f64),
414        AnyValue::Int64(v) => Some(v as f64),
415        AnyValue::UInt64(v) => Some(v as f64),
416        AnyValue::Int32(v) => Some(v as f64),
417        AnyValue::UInt32(v) => Some(v as f64),
418        other => {
419            return Err(IngestionError::SchemaMismatch {
420                message: format!("expected numeric profile value, got {other}"),
421            });
422        }
423    })
424}
425
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Value;
    use crate::types::{Field, Schema};

    /// Three rows: an Int64 id, a Float64 score with one null, and a Utf8 name with a repeat.
    fn tiny() -> DataSet {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int64),
            Field::new("score", DataType::Float64),
            Field::new("name", DataType::Utf8),
        ]);
        DataSet::new(
            schema,
            vec![
                vec![
                    Value::Int64(1),
                    Value::Float64(10.0),
                    Value::Utf8("A".to_string()),
                ],
                vec![Value::Int64(2), Value::Null, Value::Utf8("A".to_string())],
                vec![
                    Value::Int64(3),
                    Value::Float64(30.0),
                    Value::Utf8("B".to_string()),
                ],
            ],
        )
    }

    #[test]
    fn profiling_counts_rows_nulls_and_distinct() {
        let ds = tiny();
        let rep = profile_dataset(&ds, &ProfileOptions::default()).unwrap();
        assert_eq!(rep.row_count, 3);

        let score = rep.columns.iter().find(|c| c.name == "score").unwrap();
        assert_eq!(score.null_count, 1);
        assert_eq!(score.distinct_count, 2);
        assert!(score.numeric.is_some());

        let name = rep.columns.iter().find(|c| c.name == "name").unwrap();
        assert_eq!(name.null_count, 0);
        assert_eq!(name.distinct_count, 2);
        assert!(name.numeric.is_none());
    }

    #[test]
    fn profiling_computes_numeric_metrics() {
        let rep = profile_dataset(&tiny(), &ProfileOptions::default()).unwrap();

        let score = rep.columns.iter().find(|c| c.name == "score").unwrap();
        let stats = score.numeric.as_ref().unwrap();
        assert_eq!(stats.min, Some(10.0));
        assert_eq!(stats.max, Some(30.0));
        assert_eq!(stats.mean, Some(20.0));
        let requested: Vec<f64> = stats.quantiles.iter().map(|(q, _)| *q).collect();
        assert_eq!(requested, vec![0.5, 0.95]);
        assert!(stats.quantiles.iter().all(|(_, v)| v.is_some()));

        let id = rep.columns.iter().find(|c| c.name == "id").unwrap();
        assert_eq!(id.data_type, DataType::Int64);
        let stats = id.numeric.as_ref().unwrap();
        assert_eq!(stats.min, Some(1.0));
        assert_eq!(stats.max, Some(3.0));
        assert_eq!(stats.mean, Some(2.0));
    }

    #[test]
    fn profiling_rejects_out_of_range_quantiles() {
        for bad in [1.5, -0.1, f64::NAN] {
            let err = profile_dataset(
                &tiny(),
                &ProfileOptions {
                    sampling: SamplingMode::Full,
                    quantiles: vec![bad],
                },
            )
            .unwrap_err();
            assert!(matches!(err, IngestionError::SchemaMismatch { .. }));
        }
    }

    #[test]
    fn profiling_supports_head_sampling() {
        let ds = tiny();
        let rep = profile_dataset(
            &ds,
            &ProfileOptions {
                sampling: SamplingMode::Head(2),
                quantiles: vec![0.5],
            },
        )
        .unwrap();
        assert_eq!(rep.row_count, 2);
    }

    #[test]
    fn profile_report_renders_json_and_markdown() {
        let ds = tiny();
        let rep = profile_dataset(&ds, &ProfileOptions::default()).unwrap();
        let json = render_profile_report_json(&rep).unwrap();
        assert!(json.contains("\"row_count\""));
        assert!(json.contains("\"columns\""));

        let md = render_profile_report_markdown(&rep);
        assert!(md.contains("## Profile report"));
        assert!(md.contains("### Columns"));
    }

    #[test]
    fn head_sampling_is_reported_in_renderings() {
        let rep = profile_dataset(
            &tiny(),
            &ProfileOptions {
                sampling: SamplingMode::Head(2),
                quantiles: vec![],
            },
        )
        .unwrap();

        let md = render_profile_report_markdown(&rep);
        assert!(md.contains("- Sampling: **Head(2)**"));
        assert!(md.contains("- Rows profiled: **2**"));

        let json = render_profile_report_json(&rep).unwrap();
        assert!(json.contains("\"head\""));
    }
}