1use crate::error::{IngestionError, IngestionResult};
34use crate::pipeline::DataFrame;
35use crate::types::{DataSet, Value};
36
37use polars::prelude::*;
38
/// How serious a failed [`Check`] is.
///
/// Variants are declared from least to most severe. The derived `Ord` follows
/// declaration order, which is what lets the validator keep the maximum severity
/// across failed checks. `Hash` is derived so severities can key hash maps/sets.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Severity {
    /// Least severe level.
    Info,
    /// Middle level.
    Warn,
    /// Most severe level.
    Error,
}
46
47#[derive(Debug, Clone, PartialEq)]
49pub enum Check {
50 NotNull {
51 column: String,
52 severity: Severity,
53 },
54 RangeF64 {
55 column: String,
56 min: f64,
57 max: f64,
58 severity: Severity,
59 },
60 RegexMatch {
61 column: String,
62 pattern: String,
63 severity: Severity,
64 strict: bool,
66 },
67 InSet {
68 column: String,
69 values: Vec<Value>,
70 severity: Severity,
71 },
72 Unique {
73 column: String,
74 severity: Severity,
75 },
76}
77
78#[derive(Debug, Clone, PartialEq)]
80pub struct ValidationSpec {
81 pub checks: Vec<Check>,
82 pub max_examples: usize,
84}
85
86impl ValidationSpec {
87 pub fn new(checks: Vec<Check>) -> Self {
88 Self {
89 checks,
90 max_examples: 5,
91 }
92 }
93}
94
95#[derive(Debug, Clone, PartialEq)]
96pub struct ValidationSummary {
97 pub total_checks: usize,
98 pub failed_checks: usize,
99 pub max_severity: Option<Severity>,
100}
101
102#[derive(Debug, Clone, PartialEq)]
103pub struct CheckResult {
104 pub check: Check,
105 pub failed_count: usize,
106 pub examples: Vec<Value>,
107 pub message: String,
108}
109
110#[derive(Debug, Clone, PartialEq)]
111pub struct ValidationReport {
112 pub results: Vec<CheckResult>,
113 pub summary: ValidationSummary,
114}
115
116pub fn validate_dataset(ds: &DataSet, spec: &ValidationSpec) -> IngestionResult<ValidationReport> {
117 let df = DataFrame::from_dataset(ds)?;
118 validate_frame(&df, spec)
119}
120
121pub fn validate_frame(df: &DataFrame, spec: &ValidationSpec) -> IngestionResult<ValidationReport> {
122 if spec.checks.is_empty() {
123 return Ok(ValidationReport {
124 results: Vec::new(),
125 summary: ValidationSummary {
126 total_checks: 0,
127 failed_checks: 0,
128 max_severity: None,
129 },
130 });
131 }
132
133 let lf = df.lazy_clone();
135 let mut exprs: Vec<Expr> = Vec::with_capacity(spec.checks.len());
136
137 for (i, chk) in spec.checks.iter().enumerate() {
138 exprs.push(fail_count_expr(chk).alias(&fail_count_col_name(i)));
139 }
140
141 let agg = lf.select(exprs).collect().map_err(|e| {
142 crate::ingestion::polars_bridge::polars_error_to_ingestion(
143 "failed to compute validation counts",
144 e,
145 )
146 })?;
147
148 let mut results: Vec<CheckResult> = Vec::with_capacity(spec.checks.len());
149 let mut failed_checks = 0usize;
150 let mut max_sev: Option<Severity> = None;
151
152 for (i, chk) in spec.checks.iter().cloned().enumerate() {
153 let col = agg.column(&fail_count_col_name(i)).map_err(|e| {
154 crate::ingestion::polars_bridge::polars_error_to_ingestion(
155 "validation missing agg column",
156 e,
157 )
158 })?;
159 let failed_count = series_to_usize(col.as_materialized_series())?.unwrap_or(0);
160
161 if failed_count > 0 {
162 failed_checks += 1;
163 let sev = severity_of(&chk);
164 max_sev = Some(max_sev.map(|s| s.max(sev)).unwrap_or(sev));
165 }
166
167 let examples = if failed_count > 0 && spec.max_examples > 0 {
168 collect_examples(df, &chk, spec.max_examples).unwrap_or_default()
169 } else {
170 Vec::new()
171 };
172
173 results.push(CheckResult {
174 message: default_message(&chk, failed_count),
175 check: chk,
176 failed_count,
177 examples,
178 });
179 }
180
181 Ok(ValidationReport {
182 summary: ValidationSummary {
183 total_checks: spec.checks.len(),
184 failed_checks,
185 max_severity: max_sev,
186 },
187 results,
188 })
189}
190
191pub fn render_validation_report_json(rep: &ValidationReport) -> IngestionResult<String> {
192 let results: Vec<serde_json::Value> = rep
193 .results
194 .iter()
195 .map(|r| {
196 serde_json::json!({
197 "check": format!("{:?}", r.check),
198 "failed_count": r.failed_count,
199 "examples": r.examples.iter().map(value_to_json).collect::<Vec<_>>(),
200 "message": r.message,
201 })
202 })
203 .collect();
204
205 serde_json::to_string_pretty(&serde_json::json!({
206 "summary": {
207 "total_checks": rep.summary.total_checks,
208 "failed_checks": rep.summary.failed_checks,
209 "max_severity": rep.summary.max_severity.map(|s| format!("{s:?}")),
210 },
211 "results": results,
212 }))
213 .map_err(|e| IngestionError::SchemaMismatch {
214 message: format!("failed to serialize validation report json: {e}"),
215 })
216}
217
218pub fn render_validation_report_markdown(rep: &ValidationReport) -> String {
219 let mut out = String::new();
220 out.push_str("## Validation report\n\n");
221 out.push_str(&format!(
222 "- Total checks: **{}**\n- Failed checks: **{}**\n\n",
223 rep.summary.total_checks, rep.summary.failed_checks
224 ));
225
226 out.push_str("### Results\n\n");
227 for r in &rep.results {
228 let status = if r.failed_count == 0 { "PASS" } else { "FAIL" };
229 out.push_str(&format!("- **{status}**: `{}`\n", format!("{:?}", r.check)));
230 out.push_str(&format!(" - Failed: **{}**\n", r.failed_count));
231 out.push_str(&format!(" - Message: {}\n", r.message));
232 if !r.examples.is_empty() {
233 out.push_str(" - Examples:\n");
234 for ex in &r.examples {
235 out.push_str(&format!(" - `{}`\n", format!("{ex:?}")));
236 }
237 }
238 }
239 out
240}
241
/// Name of the temporary aggregate column that holds the failure count of check `i`.
fn fail_count_col_name(i: usize) -> String {
    let mut name = String::from("__fail_");
    name.push_str(&i.to_string());
    name
}
245
246fn severity_of(chk: &Check) -> Severity {
247 match chk {
248 Check::NotNull { severity, .. }
249 | Check::RangeF64 { severity, .. }
250 | Check::RegexMatch { severity, .. }
251 | Check::InSet { severity, .. }
252 | Check::Unique { severity, .. } => *severity,
253 }
254}
255
256fn default_message(chk: &Check, failed: usize) -> String {
257 match chk {
258 Check::NotNull { column, .. } => format!("column '{column}' has {failed} null(s)"),
259 Check::RangeF64 {
260 column, min, max, ..
261 } => {
262 format!("column '{column}' has {failed} value(s) outside [{min}, {max}]")
263 }
264 Check::RegexMatch {
265 column, pattern, ..
266 } => {
267 format!("column '{column}' has {failed} value(s) not matching /{pattern}/")
268 }
269 Check::InSet { column, .. } => {
270 format!("column '{column}' has {failed} value(s) not in set")
271 }
272 Check::Unique { column, .. } => {
273 format!("column '{column}' has {failed} duplicate(s) among non-null values")
274 }
275 }
276}
277
278fn fail_count_expr(chk: &Check) -> Expr {
279 match chk {
280 Check::NotNull { column, .. } => col(column).is_null().sum(),
281 Check::RangeF64 {
282 column, min, max, ..
283 } => (col(column).lt(lit(*min)).or(col(column).gt(lit(*max)))).sum(),
284 Check::RegexMatch {
285 column,
286 pattern,
287 strict,
288 ..
289 } => col(column)
290 .cast(DataType::String)
291 .str()
292 .contains(lit(pattern.clone()), *strict)
293 .not()
294 .sum(),
295 Check::InSet { column, values, .. } => {
296 let set_expr = lit(values_to_series(values));
297 col(column).is_in(set_expr, false).not().sum()
298 }
299 Check::Unique { column, .. } => {
300 let non_null = col(column).is_not_null().sum();
302 let unique = col(column).drop_nulls().n_unique();
303 (non_null - unique).alias("__dup")
304 }
305 }
306}
307
308fn values_to_series(values: &[Value]) -> Series {
309 if values.is_empty() {
311 return Series::new("set".into(), Vec::<i64>::new());
312 }
313 match &values[0] {
314 Value::Int64(_) => {
315 let mut v: Vec<i64> = Vec::with_capacity(values.len());
316 for x in values {
317 if let Value::Int64(i) = x {
318 v.push(*i);
319 }
320 }
321 Series::new("set".into(), v)
322 }
323 Value::Bool(_) => {
324 let mut v: Vec<bool> = Vec::with_capacity(values.len());
325 for x in values {
326 if let Value::Bool(b) = x {
327 v.push(*b);
328 }
329 }
330 Series::new("set".into(), v)
331 }
332 Value::Utf8(_) => {
333 let mut v: Vec<String> = Vec::with_capacity(values.len());
334 for x in values {
335 if let Value::Utf8(s) = x {
336 v.push(s.clone());
337 }
338 }
339 Series::new("set".into(), v)
340 }
341 Value::Float64(_) | Value::Null => Series::new("set".into(), Vec::<String>::new()),
342 }
343}
344
345fn series_to_usize(s: &Series) -> IngestionResult<Option<usize>> {
346 let av = s.get(0).map_err(|e| IngestionError::Engine {
347 message: "failed to read validation value".to_string(),
348 source: Box::new(e),
349 })?;
350 Ok(match av {
351 AnyValue::Null => None,
352 AnyValue::Int64(v) => Some(v.max(0) as usize),
353 AnyValue::UInt64(v) => Some(v as usize),
354 AnyValue::Int32(v) => Some((v as i64).max(0) as usize),
355 AnyValue::UInt32(v) => Some(v as usize),
356 other => {
357 return Err(IngestionError::SchemaMismatch {
358 message: format!("expected integer-like validation value, got {other}"),
359 });
360 }
361 })
362}
363
364fn collect_examples(
365 df: &DataFrame,
366 chk: &Check,
367 max_examples: usize,
368) -> IngestionResult<Vec<Value>> {
369 let mut lf = df.lazy_clone();
370 let (col_name, predicate) = match chk {
371 Check::NotNull { column, .. } => (column.as_str(), col(column).is_null()),
372 Check::RangeF64 {
373 column, min, max, ..
374 } => (
375 column.as_str(),
376 col(column).lt(lit(*min)).or(col(column).gt(lit(*max))),
377 ),
378 Check::RegexMatch {
379 column,
380 pattern,
381 strict,
382 ..
383 } => (
384 column.as_str(),
385 col(column)
386 .cast(DataType::String)
387 .str()
388 .contains(lit(pattern.clone()), *strict)
389 .not(),
390 ),
391 Check::InSet { column, values, .. } => (
392 column.as_str(),
393 col(column)
394 .is_in(lit(values_to_series(values)), false)
395 .not(),
396 ),
397 Check::Unique { .. } => return Ok(Vec::new()), };
399
400 lf = lf
401 .filter(predicate)
402 .select([col(col_name)])
403 .limit(max_examples as IdxSize);
404 let out = lf.collect().map_err(|e| {
405 crate::ingestion::polars_bridge::polars_error_to_ingestion(
406 "failed to collect validation examples",
407 e,
408 )
409 })?;
410
411 let s = out
412 .column(col_name)
413 .map_err(|e| {
414 crate::ingestion::polars_bridge::polars_error_to_ingestion(
415 "missing validation example column",
416 e,
417 )
418 })?
419 .as_materialized_series()
420 .clone();
421
422 let mut ex = Vec::new();
423 for i in 0..usize::min(max_examples, s.len()) {
424 let v = s.get(i).map_err(|e| IngestionError::Engine {
425 message: "failed to read validation example".to_string(),
426 source: Box::new(e),
427 })?;
428 ex.push(any_to_value(v));
429 }
430 Ok(ex)
431}
432
433fn any_to_value(v: AnyValue) -> Value {
434 match v {
435 AnyValue::Null => Value::Null,
436 AnyValue::Boolean(b) => Value::Bool(b),
437 AnyValue::Int64(i) => Value::Int64(i),
438 AnyValue::Float64(x) => Value::Float64(x),
439 AnyValue::String(s) => Value::Utf8(s.to_string()),
440 AnyValue::StringOwned(s) => Value::Utf8(s.to_string()),
441 other => Value::Utf8(other.to_string()),
442 }
443}
444
445fn value_to_json(v: &Value) -> serde_json::Value {
446 match v {
447 Value::Null => serde_json::Value::Null,
448 Value::Int64(i) => serde_json::json!(i),
449 Value::Float64(x) => serde_json::json!(x),
450 Value::Bool(b) => serde_json::json!(b),
451 Value::Utf8(s) => serde_json::json!(s),
452 }
453}
454
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{DataType, Field, Schema};

    /// Three rows that trip each check below exactly once: a null `name`,
    /// an out-of-range `score` (200.0), and a duplicated `id` (2).
    fn sample() -> DataSet {
        DataSet::new(
            Schema::new(vec![
                Field::new("id", DataType::Int64),
                Field::new("name", DataType::Utf8),
                Field::new("score", DataType::Float64),
            ]),
            vec![
                vec![
                    Value::Int64(1),
                    Value::Utf8("Ada".to_string()),
                    Value::Float64(10.0),
                ],
                vec![Value::Int64(2), Value::Null, Value::Float64(200.0)],
                vec![
                    Value::Int64(2),
                    Value::Utf8("Bob".to_string()),
                    Value::Float64(5.0),
                ],
            ],
        )
    }

    #[test]
    fn validation_counts_failures_and_renders_reports() {
        let ds = sample();
        let spec = ValidationSpec::new(vec![
            Check::NotNull {
                column: "name".to_string(),
                severity: Severity::Error,
            },
            Check::RangeF64 {
                column: "score".to_string(),
                min: 0.0,
                max: 100.0,
                severity: Severity::Warn,
            },
            Check::Unique {
                column: "id".to_string(),
                severity: Severity::Error,
            },
        ]);

        let rep = validate_dataset(&ds, &spec).unwrap();
        assert_eq!(rep.summary.total_checks, 3);
        assert_eq!(rep.summary.failed_checks, 3);
        assert_eq!(rep.summary.max_severity, Some(Severity::Error));

        let counts: Vec<usize> = rep.results.iter().map(|r| r.failed_count).collect();
        assert_eq!(counts, vec![1, 1, 1]);

        // Examples are the offending cells; `Unique` never records examples.
        assert_eq!(rep.results[0].examples, vec![Value::Null]);
        assert_eq!(rep.results[1].examples, vec![Value::Float64(200.0)]);
        assert!(rep.results[2].examples.is_empty());

        let json = render_validation_report_json(&rep).unwrap();
        assert!(json.contains("\"results\""));
        assert!(json.contains("\"max_severity\": \"Error\""));

        let md = render_validation_report_markdown(&rep);
        assert!(md.contains("## Validation report"));
        assert!(md.contains("**FAIL**"));
    }
}