rust_data_processing/validation/
mod.rs

1//! Validation (Phase 1).
2//!
3//! A small validation DSL that compiles checks to Polars expressions (via our pipeline) while keeping
4//! the public API in crate-owned types.
5//!
6//! ## Example
7//!
8//! ```rust
9//! use rust_data_processing::validation::{validate_dataset, Check, Severity, ValidationSpec};
10//! use rust_data_processing::types::{DataSet, DataType, Field, Schema, Value};
11//!
12//! # fn main() -> Result<(), rust_data_processing::IngestionError> {
13//! let ds = DataSet::new(
14//!     Schema::new(vec![
15//!         Field::new("id", DataType::Int64),
16//!         Field::new("name", DataType::Utf8),
17//!     ]),
18//!     vec![
19//!         vec![Value::Int64(1), Value::Utf8("Ada".to_string())],
20//!         vec![Value::Int64(2), Value::Null],
21//!     ],
22//! );
23//!
24//! let spec = ValidationSpec::new(vec![
25//!     Check::NotNull { column: "name".to_string(), severity: Severity::Error },
26//! ]);
27//! let rep = validate_dataset(&ds, &spec)?;
28//! assert_eq!(rep.summary.failed_checks, 1);
29//! # Ok(())
30//! # }
31//! ```
32
33use crate::error::{IngestionError, IngestionResult};
34use crate::pipeline::DataFrame;
35use crate::types::{DataSet, Value};
36
37use polars::prelude::*;
38
/// Severity for a validation check.
///
/// Variants are declared from least to most severe, so the derived `Ord` yields
/// `Info < Warn < Error`. The report summary keeps the maximum severity among failing checks.
#[derive(Debug, Clone, Copy)]
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub enum Severity {
    /// Lowest severity.
    Info,
    /// Middle severity.
    Warn,
    /// Highest severity.
    Error,
}
46
47/// A single validation check.
48#[derive(Debug, Clone, PartialEq)]
49pub enum Check {
50    NotNull {
51        column: String,
52        severity: Severity,
53    },
54    RangeF64 {
55        column: String,
56        min: f64,
57        max: f64,
58        severity: Severity,
59    },
60    RegexMatch {
61        column: String,
62        pattern: String,
63        severity: Severity,
64        /// If true, invalid regex patterns become errors; if false, invalid regex evaluates to false.
65        strict: bool,
66    },
67    InSet {
68        column: String,
69        values: Vec<Value>,
70        severity: Severity,
71    },
72    Unique {
73        column: String,
74        severity: Severity,
75    },
76}
77
78/// A collection of checks.
79#[derive(Debug, Clone, PartialEq)]
80pub struct ValidationSpec {
81    pub checks: Vec<Check>,
82    /// Maximum number of example values to include for failing checks.
83    pub max_examples: usize,
84}
85
86impl ValidationSpec {
87    pub fn new(checks: Vec<Check>) -> Self {
88        Self {
89            checks,
90            max_examples: 5,
91        }
92    }
93}
94
95#[derive(Debug, Clone, PartialEq)]
96pub struct ValidationSummary {
97    pub total_checks: usize,
98    pub failed_checks: usize,
99    pub max_severity: Option<Severity>,
100}
101
102#[derive(Debug, Clone, PartialEq)]
103pub struct CheckResult {
104    pub check: Check,
105    pub failed_count: usize,
106    pub examples: Vec<Value>,
107    pub message: String,
108}
109
110#[derive(Debug, Clone, PartialEq)]
111pub struct ValidationReport {
112    pub results: Vec<CheckResult>,
113    pub summary: ValidationSummary,
114}
115
116pub fn validate_dataset(ds: &DataSet, spec: &ValidationSpec) -> IngestionResult<ValidationReport> {
117    let df = DataFrame::from_dataset(ds)?;
118    validate_frame(&df, spec)
119}
120
121pub fn validate_frame(df: &DataFrame, spec: &ValidationSpec) -> IngestionResult<ValidationReport> {
122    if spec.checks.is_empty() {
123        return Ok(ValidationReport {
124            results: Vec::new(),
125            summary: ValidationSummary {
126                total_checks: 0,
127                failed_checks: 0,
128                max_severity: None,
129            },
130        });
131    }
132
133    // One-shot aggregation to compute failed counts.
134    let lf = df.lazy_clone();
135    let mut exprs: Vec<Expr> = Vec::with_capacity(spec.checks.len());
136
137    for (i, chk) in spec.checks.iter().enumerate() {
138        exprs.push(fail_count_expr(chk).alias(&fail_count_col_name(i)));
139    }
140
141    let agg = lf.select(exprs).collect().map_err(|e| {
142        crate::ingestion::polars_bridge::polars_error_to_ingestion(
143            "failed to compute validation counts",
144            e,
145        )
146    })?;
147
148    let mut results: Vec<CheckResult> = Vec::with_capacity(spec.checks.len());
149    let mut failed_checks = 0usize;
150    let mut max_sev: Option<Severity> = None;
151
152    for (i, chk) in spec.checks.iter().cloned().enumerate() {
153        let col = agg.column(&fail_count_col_name(i)).map_err(|e| {
154            crate::ingestion::polars_bridge::polars_error_to_ingestion(
155                "validation missing agg column",
156                e,
157            )
158        })?;
159        let failed_count = series_to_usize(col.as_materialized_series())?.unwrap_or(0);
160
161        if failed_count > 0 {
162            failed_checks += 1;
163            let sev = severity_of(&chk);
164            max_sev = Some(max_sev.map(|s| s.max(sev)).unwrap_or(sev));
165        }
166
167        let examples = if failed_count > 0 && spec.max_examples > 0 {
168            collect_examples(df, &chk, spec.max_examples).unwrap_or_default()
169        } else {
170            Vec::new()
171        };
172
173        results.push(CheckResult {
174            message: default_message(&chk, failed_count),
175            check: chk,
176            failed_count,
177            examples,
178        });
179    }
180
181    Ok(ValidationReport {
182        summary: ValidationSummary {
183            total_checks: spec.checks.len(),
184            failed_checks,
185            max_severity: max_sev,
186        },
187        results,
188    })
189}
190
191pub fn render_validation_report_json(rep: &ValidationReport) -> IngestionResult<String> {
192    let results: Vec<serde_json::Value> = rep
193        .results
194        .iter()
195        .map(|r| {
196            serde_json::json!({
197                "check": format!("{:?}", r.check),
198                "failed_count": r.failed_count,
199                "examples": r.examples.iter().map(value_to_json).collect::<Vec<_>>(),
200                "message": r.message,
201            })
202        })
203        .collect();
204
205    serde_json::to_string_pretty(&serde_json::json!({
206        "summary": {
207            "total_checks": rep.summary.total_checks,
208            "failed_checks": rep.summary.failed_checks,
209            "max_severity": rep.summary.max_severity.map(|s| format!("{s:?}")),
210        },
211        "results": results,
212    }))
213    .map_err(|e| IngestionError::SchemaMismatch {
214        message: format!("failed to serialize validation report json: {e}"),
215    })
216}
217
218pub fn render_validation_report_markdown(rep: &ValidationReport) -> String {
219    let mut out = String::new();
220    out.push_str("## Validation report\n\n");
221    out.push_str(&format!(
222        "- Total checks: **{}**\n- Failed checks: **{}**\n\n",
223        rep.summary.total_checks, rep.summary.failed_checks
224    ));
225
226    out.push_str("### Results\n\n");
227    for r in &rep.results {
228        let status = if r.failed_count == 0 { "PASS" } else { "FAIL" };
229        out.push_str(&format!("- **{status}**: `{}`\n", format!("{:?}", r.check)));
230        out.push_str(&format!("  - Failed: **{}**\n", r.failed_count));
231        out.push_str(&format!("  - Message: {}\n", r.message));
232        if !r.examples.is_empty() {
233            out.push_str("  - Examples:\n");
234            for ex in &r.examples {
235                out.push_str(&format!("    - `{}`\n", format!("{ex:?}")));
236            }
237        }
238    }
239    out
240}
241
/// Name of the temporary aggregate column that holds the failure count of check number `i`.
fn fail_count_col_name(i: usize) -> String {
    let mut name = String::from("__fail_");
    name.push_str(&i.to_string());
    name
}
245
246fn severity_of(chk: &Check) -> Severity {
247    match chk {
248        Check::NotNull { severity, .. }
249        | Check::RangeF64 { severity, .. }
250        | Check::RegexMatch { severity, .. }
251        | Check::InSet { severity, .. }
252        | Check::Unique { severity, .. } => *severity,
253    }
254}
255
256fn default_message(chk: &Check, failed: usize) -> String {
257    match chk {
258        Check::NotNull { column, .. } => format!("column '{column}' has {failed} null(s)"),
259        Check::RangeF64 {
260            column, min, max, ..
261        } => {
262            format!("column '{column}' has {failed} value(s) outside [{min}, {max}]")
263        }
264        Check::RegexMatch {
265            column, pattern, ..
266        } => {
267            format!("column '{column}' has {failed} value(s) not matching /{pattern}/")
268        }
269        Check::InSet { column, .. } => {
270            format!("column '{column}' has {failed} value(s) not in set")
271        }
272        Check::Unique { column, .. } => {
273            format!("column '{column}' has {failed} duplicate(s) among non-null values")
274        }
275    }
276}
277
278fn fail_count_expr(chk: &Check) -> Expr {
279    match chk {
280        Check::NotNull { column, .. } => col(column).is_null().sum(),
281        Check::RangeF64 {
282            column, min, max, ..
283        } => (col(column).lt(lit(*min)).or(col(column).gt(lit(*max)))).sum(),
284        Check::RegexMatch {
285            column,
286            pattern,
287            strict,
288            ..
289        } => col(column)
290            .cast(DataType::String)
291            .str()
292            .contains(lit(pattern.clone()), *strict)
293            .not()
294            .sum(),
295        Check::InSet { column, values, .. } => {
296            let set_expr = lit(values_to_series(values));
297            col(column).is_in(set_expr, false).not().sum()
298        }
299        Check::Unique { column, .. } => {
300            // duplicates among non-null: non_null_count - unique_count
301            let non_null = col(column).is_not_null().sum();
302            let unique = col(column).drop_nulls().n_unique();
303            (non_null - unique).alias("__dup")
304        }
305    }
306}
307
308fn values_to_series(values: &[Value]) -> Series {
309    // We deliberately keep this minimal: enforce all values are same primitive type.
310    if values.is_empty() {
311        return Series::new("set".into(), Vec::<i64>::new());
312    }
313    match &values[0] {
314        Value::Int64(_) => {
315            let mut v: Vec<i64> = Vec::with_capacity(values.len());
316            for x in values {
317                if let Value::Int64(i) = x {
318                    v.push(*i);
319                }
320            }
321            Series::new("set".into(), v)
322        }
323        Value::Bool(_) => {
324            let mut v: Vec<bool> = Vec::with_capacity(values.len());
325            for x in values {
326                if let Value::Bool(b) = x {
327                    v.push(*b);
328                }
329            }
330            Series::new("set".into(), v)
331        }
332        Value::Utf8(_) => {
333            let mut v: Vec<String> = Vec::with_capacity(values.len());
334            for x in values {
335                if let Value::Utf8(s) = x {
336                    v.push(s.clone());
337                }
338            }
339            Series::new("set".into(), v)
340        }
341        Value::Float64(_) | Value::Null => Series::new("set".into(), Vec::<String>::new()),
342    }
343}
344
345fn series_to_usize(s: &Series) -> IngestionResult<Option<usize>> {
346    let av = s.get(0).map_err(|e| IngestionError::Engine {
347        message: "failed to read validation value".to_string(),
348        source: Box::new(e),
349    })?;
350    Ok(match av {
351        AnyValue::Null => None,
352        AnyValue::Int64(v) => Some(v.max(0) as usize),
353        AnyValue::UInt64(v) => Some(v as usize),
354        AnyValue::Int32(v) => Some((v as i64).max(0) as usize),
355        AnyValue::UInt32(v) => Some(v as usize),
356        other => {
357            return Err(IngestionError::SchemaMismatch {
358                message: format!("expected integer-like validation value, got {other}"),
359            });
360        }
361    })
362}
363
364fn collect_examples(
365    df: &DataFrame,
366    chk: &Check,
367    max_examples: usize,
368) -> IngestionResult<Vec<Value>> {
369    let mut lf = df.lazy_clone();
370    let (col_name, predicate) = match chk {
371        Check::NotNull { column, .. } => (column.as_str(), col(column).is_null()),
372        Check::RangeF64 {
373            column, min, max, ..
374        } => (
375            column.as_str(),
376            col(column).lt(lit(*min)).or(col(column).gt(lit(*max))),
377        ),
378        Check::RegexMatch {
379            column,
380            pattern,
381            strict,
382            ..
383        } => (
384            column.as_str(),
385            col(column)
386                .cast(DataType::String)
387                .str()
388                .contains(lit(pattern.clone()), *strict)
389                .not(),
390        ),
391        Check::InSet { column, values, .. } => (
392            column.as_str(),
393            col(column)
394                .is_in(lit(values_to_series(values)), false)
395                .not(),
396        ),
397        Check::Unique { .. } => return Ok(Vec::new()), // examples for duplicates would require group-by; skip in Phase 1
398    };
399
400    lf = lf
401        .filter(predicate)
402        .select([col(col_name)])
403        .limit(max_examples as IdxSize);
404    let out = lf.collect().map_err(|e| {
405        crate::ingestion::polars_bridge::polars_error_to_ingestion(
406            "failed to collect validation examples",
407            e,
408        )
409    })?;
410
411    let s = out
412        .column(col_name)
413        .map_err(|e| {
414            crate::ingestion::polars_bridge::polars_error_to_ingestion(
415                "missing validation example column",
416                e,
417            )
418        })?
419        .as_materialized_series()
420        .clone();
421
422    let mut ex = Vec::new();
423    for i in 0..usize::min(max_examples, s.len()) {
424        let v = s.get(i).map_err(|e| IngestionError::Engine {
425            message: "failed to read validation example".to_string(),
426            source: Box::new(e),
427        })?;
428        ex.push(any_to_value(v));
429    }
430    Ok(ex)
431}
432
433fn any_to_value(v: AnyValue) -> Value {
434    match v {
435        AnyValue::Null => Value::Null,
436        AnyValue::Boolean(b) => Value::Bool(b),
437        AnyValue::Int64(i) => Value::Int64(i),
438        AnyValue::Float64(x) => Value::Float64(x),
439        AnyValue::String(s) => Value::Utf8(s.to_string()),
440        AnyValue::StringOwned(s) => Value::Utf8(s.to_string()),
441        other => Value::Utf8(other.to_string()),
442    }
443}
444
445fn value_to_json(v: &Value) -> serde_json::Value {
446    match v {
447        Value::Null => serde_json::Value::Null,
448        Value::Int64(i) => serde_json::json!(i),
449        Value::Float64(x) => serde_json::json!(x),
450        Value::Bool(b) => serde_json::json!(b),
451        Value::Utf8(s) => serde_json::json!(s),
452    }
453}
454
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{DataType, Field, Schema};

    /// Three rows: one null `name`, one `score` above 100, and a duplicated `id` (2).
    fn sample() -> DataSet {
        DataSet::new(
            Schema::new(vec![
                Field::new("id", DataType::Int64),
                Field::new("name", DataType::Utf8),
                Field::new("score", DataType::Float64),
            ]),
            vec![
                vec![
                    Value::Int64(1),
                    Value::Utf8("Ada".to_string()),
                    Value::Float64(10.0),
                ],
                vec![Value::Int64(2), Value::Null, Value::Float64(200.0)],
                vec![
                    Value::Int64(2),
                    Value::Utf8("Bob".to_string()),
                    Value::Float64(5.0),
                ],
            ],
        )
    }

    #[test]
    fn validation_counts_failures_and_renders_reports() {
        let ds = sample();
        let spec = ValidationSpec::new(vec![
            Check::NotNull {
                column: "name".to_string(),
                severity: Severity::Error,
            },
            Check::RangeF64 {
                column: "score".to_string(),
                min: 0.0,
                max: 100.0,
                severity: Severity::Warn,
            },
            Check::Unique {
                column: "id".to_string(),
                severity: Severity::Error,
            },
        ]);

        let rep = validate_dataset(&ds, &spec).unwrap();
        assert_eq!(rep.summary.total_checks, 3);
        // Each check fails exactly once: one null name, one score of 200.0, one duplicate id.
        assert_eq!(rep.summary.failed_checks, 3);
        assert_eq!(rep.summary.max_severity, Some(Severity::Error));
        let counts: Vec<usize> = rep.results.iter().map(|r| r.failed_count).collect();
        assert_eq!(counts, vec![1, 1, 1]);
        assert_eq!(rep.results[0].examples, vec![Value::Null]);
        assert_eq!(rep.results[1].examples, vec![Value::Float64(200.0)]);
        // Unique never collects examples in Phase 1.
        assert!(rep.results[2].examples.is_empty());

        let json = render_validation_report_json(&rep).unwrap();
        assert!(json.contains("\"results\""));
        assert!(json.contains("\"failed_checks\": 3"));

        let md = render_validation_report_markdown(&rep);
        assert!(md.contains("## Validation report"));
        assert!(md.contains("**FAIL**"));
    }

    #[test]
    fn empty_spec_yields_empty_report() {
        let rep = validate_dataset(&sample(), &ValidationSpec::new(Vec::new())).unwrap();
        assert!(rep.results.is_empty());
        assert_eq!(rep.summary.total_checks, 0);
        assert_eq!(rep.summary.failed_checks, 0);
        assert_eq!(rep.summary.max_severity, None);
    }

    #[test]
    fn passing_checks_report_no_failures_or_examples() {
        let spec = ValidationSpec::new(vec![
            Check::NotNull {
                column: "id".to_string(),
                severity: Severity::Warn,
            },
            Check::Unique {
                column: "name".to_string(),
                severity: Severity::Error,
            },
        ]);

        let rep = validate_dataset(&sample(), &spec).unwrap();
        assert_eq!(rep.summary.total_checks, 2);
        assert_eq!(rep.summary.failed_checks, 0);
        assert_eq!(rep.summary.max_severity, None);
        for r in &rep.results {
            assert_eq!(r.failed_count, 0);
            assert!(r.examples.is_empty());
        }
    }
}
511        assert!(md.contains("## Validation report"));
512    }
513}