1use crate::error::{IngestionError, IngestionResult};
37use crate::pipeline::DataFrame;
38use crate::profiling::SamplingMode;
39use crate::types::DataSet;
40
41use polars::prelude::*;
42
/// Rule used to decide whether a value in a numeric column is an outlier.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum OutlierMethod {
    /// Flags values whose absolute z-score, `|x - mean| / std`, is greater than `threshold`.
    ZScore { threshold: f64 },
    /// Flags values below `q1 - k * (q3 - q1)` or above `q3 + k * (q3 - q1)`.
    Iqr { k: f64 },
    /// Flags values whose modified z-score, `0.6745 * |x - median| / MAD`, is greater than
    /// `threshold`.
    Mad { threshold: f64 },
}
52
/// Tuning knobs for an outlier-detection run.
#[derive(Debug, Clone, PartialEq)]
pub struct OutlierOptions {
    /// Which rows are profiled: the whole frame, or only a leading slice of it.
    pub sampling: SamplingMode,
    /// Upper bound on the number of outlier values copied into the report; `0` disables
    /// example collection.
    pub max_examples: usize,
}
58
59impl Default for OutlierOptions {
60 fn default() -> Self {
61 Self {
62 sampling: SamplingMode::Full,
63 max_examples: 5,
64 }
65 }
66}
67
/// Summary statistics backing an outlier decision.
///
/// Only the fields relevant to `method` are populated; the rest stay `None`.
#[derive(Debug, Clone, PartialEq)]
pub struct OutlierStats {
    /// Detection rule these statistics were computed for.
    pub method: OutlierMethod,
    /// Column mean (z-score method).
    pub mean: Option<f64>,
    /// Column standard deviation (z-score method).
    pub std: Option<f64>,
    /// Column median (MAD method).
    pub median: Option<f64>,
    /// Median of absolute deviations from the median (MAD method).
    pub mad: Option<f64>,
    /// 0.25 quantile (IQR method).
    pub q1: Option<f64>,
    /// 0.75 quantile (IQR method).
    pub q3: Option<f64>,
    /// `q1 - k * (q3 - q1)`; values below it are outliers (IQR method).
    pub lower_fence: Option<f64>,
    /// `q3 + k * (q3 - q1)`; values above it are outliers (IQR method).
    pub upper_fence: Option<f64>,
}
80
/// Result of running outlier detection on one column.
#[derive(Debug, Clone, PartialEq)]
pub struct OutlierReport {
    /// Name of the analysed column.
    pub column: String,
    /// Sampling mode that was applied before profiling.
    pub sampling: SamplingMode,
    /// Number of rows in the (possibly sampled) frame that was profiled.
    pub row_count: usize,
    /// Number of rows matching the outlier predicate.
    pub outlier_count: usize,
    /// Statistics the predicate was derived from.
    pub stats: OutlierStats,
    /// Up to `max_examples` of the outlier values that were found.
    pub examples: Vec<f64>,
}
90
91pub fn detect_outliers_dataset(
92 ds: &DataSet,
93 column: &str,
94 method: OutlierMethod,
95 options: &OutlierOptions,
96) -> IngestionResult<OutlierReport> {
97 let df = DataFrame::from_dataset(ds)?;
98 detect_outliers_frame(&df, column, method, options)
99}
100
101pub fn detect_outliers_frame(
102 df: &DataFrame,
103 column: &str,
104 method: OutlierMethod,
105 options: &OutlierOptions,
106) -> IngestionResult<OutlierReport> {
107 let mut lf = df.lazy_clone();
108 lf = match options.sampling {
109 SamplingMode::Full => lf,
110 SamplingMode::Head(n) => lf.limit(n as IdxSize),
111 };
112
113 let stats_df = match method {
115 OutlierMethod::ZScore { .. } => lf
116 .clone()
117 .select([
118 len().alias("__rows"),
119 col(column).mean().alias("__mean"),
120 col(column).std(1).alias("__std"),
121 ])
122 .collect(),
123 OutlierMethod::Iqr { .. } => lf
124 .clone()
125 .select([
126 len().alias("__rows"),
127 col(column)
128 .quantile(lit(0.25), QuantileMethod::Nearest)
129 .alias("__q1"),
130 col(column)
131 .quantile(lit(0.75), QuantileMethod::Nearest)
132 .alias("__q3"),
133 ])
134 .collect(),
135 OutlierMethod::Mad { .. } => lf
136 .clone()
137 .select([
138 len().alias("__rows"),
139 col(column).median().alias("__median"),
140 ])
142 .collect(),
143 }
144 .map_err(|e| {
145 crate::ingestion::polars_bridge::polars_error_to_ingestion(
146 "failed to compute outlier stats",
147 e,
148 )
149 })?;
150
151 let row_count = read_f64(&stats_df, "__rows")?.unwrap_or(0.0) as usize;
152
153 let (stats, predicate) = match method {
154 OutlierMethod::ZScore { threshold } => {
155 let mean = read_f64(&stats_df, "__mean")?;
156 let std = read_f64(&stats_df, "__std")?;
157 let pred = match (mean, std) {
158 (Some(m), Some(s)) if s > 0.0 => {
159 ((col(column) - lit(m)) / lit(s)).abs().gt(lit(threshold))
160 }
161 _ => lit(false),
162 };
163 (
164 OutlierStats {
165 method,
166 mean,
167 std,
168 median: None,
169 mad: None,
170 q1: None,
171 q3: None,
172 lower_fence: None,
173 upper_fence: None,
174 },
175 pred,
176 )
177 }
178 OutlierMethod::Iqr { k } => {
179 let q1 = read_f64(&stats_df, "__q1")?;
180 let q3 = read_f64(&stats_df, "__q3")?;
181 let (lower, upper, pred) = match (q1, q3) {
182 (Some(a), Some(b)) => {
183 let iqr = b - a;
184 let lo = a - k * iqr;
185 let hi = b + k * iqr;
186 (
187 Some(lo),
188 Some(hi),
189 col(column).lt(lit(lo)).or(col(column).gt(lit(hi))),
190 )
191 }
192 _ => (None, None, lit(false)),
193 };
194 (
195 OutlierStats {
196 method,
197 mean: None,
198 std: None,
199 median: None,
200 mad: None,
201 q1,
202 q3,
203 lower_fence: lower,
204 upper_fence: upper,
205 },
206 pred,
207 )
208 }
209 OutlierMethod::Mad { threshold } => {
210 let median = read_f64(&stats_df, "__median")?;
211 let mad = if let Some(m) = median {
213 let mad_df = lf
214 .clone()
215 .select([(col(column) - lit(m)).abs().median().alias("__mad")])
216 .collect()
217 .map_err(|e| {
218 crate::ingestion::polars_bridge::polars_error_to_ingestion(
219 "failed to compute MAD",
220 e,
221 )
222 })?;
223 read_f64(&mad_df, "__mad")?
224 } else {
225 None
226 };
227 let pred = match (median, mad) {
228 (Some(m), Some(d)) if d > 0.0 => {
229 (lit(0.6745) * (col(column) - lit(m)).abs() / lit(d)).gt(lit(threshold))
230 }
231 _ => lit(false),
232 };
233 (
234 OutlierStats {
235 method,
236 mean: None,
237 std: None,
238 median,
239 mad,
240 q1: None,
241 q3: None,
242 lower_fence: None,
243 upper_fence: None,
244 },
245 pred,
246 )
247 }
248 };
249
250 let outlier_count_df = lf
252 .clone()
253 .filter(predicate.clone())
254 .select([len().alias("__outliers")])
255 .collect()
256 .map_err(|e| {
257 crate::ingestion::polars_bridge::polars_error_to_ingestion(
258 "failed to count outliers",
259 e,
260 )
261 })?;
262 let outlier_count = read_f64(&outlier_count_df, "__outliers")?.unwrap_or(0.0) as usize;
263
264 let examples = if outlier_count > 0 && options.max_examples > 0 {
265 let ex_df = lf
266 .clone()
267 .filter(predicate)
268 .select([col(column)])
269 .limit(options.max_examples as IdxSize)
270 .collect()
271 .map_err(|e| {
272 crate::ingestion::polars_bridge::polars_error_to_ingestion(
273 "failed to collect outlier examples",
274 e,
275 )
276 })?;
277 let s = ex_df
278 .column(column)
279 .map_err(|e| {
280 crate::ingestion::polars_bridge::polars_error_to_ingestion(
281 "missing outlier column",
282 e,
283 )
284 })?
285 .as_materialized_series();
286 series_to_f64_vec(s, options.max_examples)?
287 } else {
288 Vec::new()
289 };
290
291 Ok(OutlierReport {
292 column: column.to_string(),
293 sampling: options.sampling,
294 row_count,
295 outlier_count,
296 stats,
297 examples,
298 })
299}
300
301pub fn render_outlier_report_json(rep: &OutlierReport) -> IngestionResult<String> {
302 serde_json::to_string_pretty(&serde_json::json!({
303 "column": rep.column,
304 "sampling": format!("{:?}", rep.sampling),
305 "row_count": rep.row_count,
306 "outlier_count": rep.outlier_count,
307 "stats": {
308 "method": format!("{:?}", rep.stats.method),
309 "mean": rep.stats.mean,
310 "std": rep.stats.std,
311 "median": rep.stats.median,
312 "mad": rep.stats.mad,
313 "q1": rep.stats.q1,
314 "q3": rep.stats.q3,
315 "lower_fence": rep.stats.lower_fence,
316 "upper_fence": rep.stats.upper_fence,
317 },
318 "examples": rep.examples,
319 }))
320 .map_err(|e| IngestionError::SchemaMismatch {
321 message: format!("failed to serialize outlier report json: {e}"),
322 })
323}
324
325pub fn render_outlier_report_markdown(rep: &OutlierReport) -> String {
326 let mut out = String::new();
327 out.push_str("## Outlier report\n\n");
328 out.push_str(&format!("- Column: `{}`\n", rep.column));
329 out.push_str(&format!("- Rows profiled: **{}**\n", rep.row_count));
330 out.push_str(&format!("- Outliers: **{}**\n\n", rep.outlier_count));
331 out.push_str("### Stats\n\n");
332 out.push_str(&format!(
333 "- Method: `{}`\n",
334 format!("{:?}", rep.stats.method)
335 ));
336 if let Some(v) = rep.stats.mean {
337 out.push_str(&format!("- mean: `{v:.6}`\n"));
338 }
339 if let Some(v) = rep.stats.std {
340 out.push_str(&format!("- std: `{v:.6}`\n"));
341 }
342 if let Some(v) = rep.stats.median {
343 out.push_str(&format!("- median: `{v:.6}`\n"));
344 }
345 if let Some(v) = rep.stats.mad {
346 out.push_str(&format!("- mad: `{v:.6}`\n"));
347 }
348 if let (Some(a), Some(b)) = (rep.stats.lower_fence, rep.stats.upper_fence) {
349 out.push_str(&format!("- fences: `[{a:.6}, {b:.6}]`\n"));
350 }
351 if !rep.examples.is_empty() {
352 out.push_str("\n### Examples\n\n");
353 for v in &rep.examples {
354 out.push_str(&format!("- `{v}`\n"));
355 }
356 }
357 out
358}
359
360fn read_f64(df: &polars::prelude::DataFrame, name: &str) -> IngestionResult<Option<f64>> {
361 let col = df
362 .column(name)
363 .map_err(|e| {
364 crate::ingestion::polars_bridge::polars_error_to_ingestion("missing stats column", e)
365 })?
366 .as_materialized_series();
367 let av = col.get(0).map_err(|e| IngestionError::Engine {
368 message: "failed to read outlier stat".to_string(),
369 source: Box::new(e),
370 })?;
371 Ok(match av {
372 AnyValue::Null => None,
373 AnyValue::Float64(v) => Some(v),
374 AnyValue::Float32(v) => Some(v as f64),
375 AnyValue::Int64(v) => Some(v as f64),
376 AnyValue::UInt64(v) => Some(v as f64),
377 AnyValue::Int32(v) => Some(v as f64),
378 AnyValue::UInt32(v) => Some(v as f64),
379 other => {
380 return Err(IngestionError::SchemaMismatch {
381 message: format!("expected numeric stat value, got {other}"),
382 });
383 }
384 })
385}
386
387fn series_to_f64_vec(s: &Series, max: usize) -> IngestionResult<Vec<f64>> {
388 let n = usize::min(max, s.len());
389 let mut out = Vec::with_capacity(n);
390 for i in 0..n {
391 let av = s.get(i).map_err(|e| IngestionError::Engine {
392 message: "failed to read outlier example".to_string(),
393 source: Box::new(e),
394 })?;
395 match av {
396 AnyValue::Null => {}
397 AnyValue::Float64(v) => out.push(v),
398 AnyValue::Float32(v) => out.push(v as f64),
399 AnyValue::Int64(v) => out.push(v as f64),
400 AnyValue::Int32(v) => out.push(v as f64),
401 other => {
402 return Err(IngestionError::SchemaMismatch {
403 message: format!("expected numeric outlier example, got {other}"),
404 });
405 }
406 }
407 }
408 Ok(out)
409}
410
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{DataType, Field, Schema, Value};

    /// Single-column dataset with three small values and one extreme value.
    fn sample_dataset() -> DataSet {
        let schema = Schema::new(vec![Field::new("x", DataType::Float64)]);
        let rows = vec![
            vec![Value::Float64(1.0)],
            vec![Value::Float64(2.0)],
            vec![Value::Float64(3.0)],
            vec![Value::Float64(1000.0)],
        ];
        DataSet::new(schema, rows)
    }

    /// IQR detection must flag the extreme value, and both renderers must produce output.
    #[test]
    fn outlier_iqr_finds_extreme_value_and_renders() {
        let options = OutlierOptions {
            sampling: SamplingMode::Full,
            max_examples: 3,
        };
        let dataset = sample_dataset();
        let report =
            detect_outliers_dataset(&dataset, "x", OutlierMethod::Iqr { k: 1.5 }, &options)
                .unwrap();
        assert!(report.outlier_count >= 1);

        let json_text = render_outlier_report_json(&report).unwrap();
        assert!(json_text.contains("\"outlier_count\""));

        let markdown_text = render_outlier_report_markdown(&report);
        assert!(markdown_text.contains("## Outlier report"));
    }
}