rust_data_processing/outliers/
mod.rs

1//! Outlier detection (Phase 1).
2//!
3//! Provides a few common numeric outlier detection primitives backed by Polars expressions.
4//!
5//! ## Example
6//!
7//! ```rust
8//! use rust_data_processing::outliers::{detect_outliers_dataset, OutlierMethod, OutlierOptions};
9//! use rust_data_processing::profiling::SamplingMode;
10//! use rust_data_processing::types::{DataSet, DataType, Field, Schema, Value};
11//!
12//! # fn main() -> Result<(), rust_data_processing::IngestionError> {
13//! let ds = DataSet::new(
14//!     Schema::new(vec![Field::new("x", DataType::Float64)]),
15//!     vec![
16//!         vec![Value::Float64(1.0)],
17//!         vec![Value::Float64(1.0)],
18//!         vec![Value::Float64(1.0)],
19//!         vec![Value::Float64(1.0)],
20//!         vec![Value::Float64(1000.0)],
21//!     ],
22//! );
23//!
24//! let rep = detect_outliers_dataset(
25//!     &ds,
26//!     "x",
27//!     OutlierMethod::Iqr { k: 1.5 },
28//!     &OutlierOptions { sampling: SamplingMode::Full, max_examples: 3 },
29//! )?;
30//!
31//! assert!(rep.outlier_count >= 1);
32//! # Ok(())
33//! # }
34//! ```
35
36use crate::error::{IngestionError, IngestionResult};
37use crate::pipeline::DataFrame;
38use crate::profiling::SamplingMode;
39use crate::types::DataSet;
40
41use polars::prelude::*;
42
/// Rule used to decide whether a value of the inspected column is an outlier.
///
/// Every variant carries the single tuning parameter of its rule. In the formulas below `x`
/// is one value of the column.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum OutlierMethod {
    /// Z-score rule: `x` is flagged when `|x - mean| / std > threshold`.
    ZScore { threshold: f64 },
    /// Tukey fences: `x` is flagged when `x < Q1 - k * IQR` or `x > Q3 + k * IQR`,
    /// with `IQR = Q3 - Q1`.
    Iqr { k: f64 },
    /// Median-absolute-deviation rule: `x` is flagged when
    /// `0.6745 * |x - median| / MAD > threshold`.
    Mad { threshold: f64 },
}
52
53#[derive(Debug, Clone, PartialEq)]
54pub struct OutlierOptions {
55    pub sampling: SamplingMode,
56    pub max_examples: usize,
57}
58
59impl Default for OutlierOptions {
60    fn default() -> Self {
61        Self {
62            sampling: SamplingMode::Full,
63            max_examples: 5,
64        }
65    }
66}
67
68#[derive(Debug, Clone, PartialEq)]
69pub struct OutlierStats {
70    pub method: OutlierMethod,
71    pub mean: Option<f64>,
72    pub std: Option<f64>,
73    pub median: Option<f64>,
74    pub mad: Option<f64>,
75    pub q1: Option<f64>,
76    pub q3: Option<f64>,
77    pub lower_fence: Option<f64>,
78    pub upper_fence: Option<f64>,
79}
80
81#[derive(Debug, Clone, PartialEq)]
82pub struct OutlierReport {
83    pub column: String,
84    pub sampling: SamplingMode,
85    pub row_count: usize,
86    pub outlier_count: usize,
87    pub stats: OutlierStats,
88    pub examples: Vec<f64>,
89}
90
91pub fn detect_outliers_dataset(
92    ds: &DataSet,
93    column: &str,
94    method: OutlierMethod,
95    options: &OutlierOptions,
96) -> IngestionResult<OutlierReport> {
97    let df = DataFrame::from_dataset(ds)?;
98    detect_outliers_frame(&df, column, method, options)
99}
100
101pub fn detect_outliers_frame(
102    df: &DataFrame,
103    column: &str,
104    method: OutlierMethod,
105    options: &OutlierOptions,
106) -> IngestionResult<OutlierReport> {
107    let mut lf = df.lazy_clone();
108    lf = match options.sampling {
109        SamplingMode::Full => lf,
110        SamplingMode::Head(n) => lf.limit(n as IdxSize),
111    };
112
113    // Compute stats in one collect.
114    let stats_df = match method {
115        OutlierMethod::ZScore { .. } => lf
116            .clone()
117            .select([
118                len().alias("__rows"),
119                col(column).mean().alias("__mean"),
120                col(column).std(1).alias("__std"),
121            ])
122            .collect(),
123        OutlierMethod::Iqr { .. } => lf
124            .clone()
125            .select([
126                len().alias("__rows"),
127                col(column)
128                    .quantile(lit(0.25), QuantileMethod::Nearest)
129                    .alias("__q1"),
130                col(column)
131                    .quantile(lit(0.75), QuantileMethod::Nearest)
132                    .alias("__q3"),
133            ])
134            .collect(),
135        OutlierMethod::Mad { .. } => lf
136            .clone()
137            .select([
138                len().alias("__rows"),
139                col(column).median().alias("__median"),
140                // MAD requires a second pass; we compute median first, then derive MAD below.
141            ])
142            .collect(),
143    }
144    .map_err(|e| {
145        crate::ingestion::polars_bridge::polars_error_to_ingestion(
146            "failed to compute outlier stats",
147            e,
148        )
149    })?;
150
151    let row_count = read_f64(&stats_df, "__rows")?.unwrap_or(0.0) as usize;
152
153    let (stats, predicate) = match method {
154        OutlierMethod::ZScore { threshold } => {
155            let mean = read_f64(&stats_df, "__mean")?;
156            let std = read_f64(&stats_df, "__std")?;
157            let pred = match (mean, std) {
158                (Some(m), Some(s)) if s > 0.0 => {
159                    ((col(column) - lit(m)) / lit(s)).abs().gt(lit(threshold))
160                }
161                _ => lit(false),
162            };
163            (
164                OutlierStats {
165                    method,
166                    mean,
167                    std,
168                    median: None,
169                    mad: None,
170                    q1: None,
171                    q3: None,
172                    lower_fence: None,
173                    upper_fence: None,
174                },
175                pred,
176            )
177        }
178        OutlierMethod::Iqr { k } => {
179            let q1 = read_f64(&stats_df, "__q1")?;
180            let q3 = read_f64(&stats_df, "__q3")?;
181            let (lower, upper, pred) = match (q1, q3) {
182                (Some(a), Some(b)) => {
183                    let iqr = b - a;
184                    let lo = a - k * iqr;
185                    let hi = b + k * iqr;
186                    (
187                        Some(lo),
188                        Some(hi),
189                        col(column).lt(lit(lo)).or(col(column).gt(lit(hi))),
190                    )
191                }
192                _ => (None, None, lit(false)),
193            };
194            (
195                OutlierStats {
196                    method,
197                    mean: None,
198                    std: None,
199                    median: None,
200                    mad: None,
201                    q1,
202                    q3,
203                    lower_fence: lower,
204                    upper_fence: upper,
205                },
206                pred,
207            )
208        }
209        OutlierMethod::Mad { threshold } => {
210            let median = read_f64(&stats_df, "__median")?;
211            // Compute MAD on the same sampled lf.
212            let mad = if let Some(m) = median {
213                let mad_df = lf
214                    .clone()
215                    .select([(col(column) - lit(m)).abs().median().alias("__mad")])
216                    .collect()
217                    .map_err(|e| {
218                        crate::ingestion::polars_bridge::polars_error_to_ingestion(
219                            "failed to compute MAD",
220                            e,
221                        )
222                    })?;
223                read_f64(&mad_df, "__mad")?
224            } else {
225                None
226            };
227            let pred = match (median, mad) {
228                (Some(m), Some(d)) if d > 0.0 => {
229                    (lit(0.6745) * (col(column) - lit(m)).abs() / lit(d)).gt(lit(threshold))
230                }
231                _ => lit(false),
232            };
233            (
234                OutlierStats {
235                    method,
236                    mean: None,
237                    std: None,
238                    median,
239                    mad,
240                    q1: None,
241                    q3: None,
242                    lower_fence: None,
243                    upper_fence: None,
244                },
245                pred,
246            )
247        }
248    };
249
250    // Count outliers and collect examples.
251    let outlier_count_df = lf
252        .clone()
253        .filter(predicate.clone())
254        .select([len().alias("__outliers")])
255        .collect()
256        .map_err(|e| {
257            crate::ingestion::polars_bridge::polars_error_to_ingestion(
258                "failed to count outliers",
259                e,
260            )
261        })?;
262    let outlier_count = read_f64(&outlier_count_df, "__outliers")?.unwrap_or(0.0) as usize;
263
264    let examples = if outlier_count > 0 && options.max_examples > 0 {
265        let ex_df = lf
266            .clone()
267            .filter(predicate)
268            .select([col(column)])
269            .limit(options.max_examples as IdxSize)
270            .collect()
271            .map_err(|e| {
272                crate::ingestion::polars_bridge::polars_error_to_ingestion(
273                    "failed to collect outlier examples",
274                    e,
275                )
276            })?;
277        let s = ex_df
278            .column(column)
279            .map_err(|e| {
280                crate::ingestion::polars_bridge::polars_error_to_ingestion(
281                    "missing outlier column",
282                    e,
283                )
284            })?
285            .as_materialized_series();
286        series_to_f64_vec(s, options.max_examples)?
287    } else {
288        Vec::new()
289    };
290
291    Ok(OutlierReport {
292        column: column.to_string(),
293        sampling: options.sampling,
294        row_count,
295        outlier_count,
296        stats,
297        examples,
298    })
299}
300
301pub fn render_outlier_report_json(rep: &OutlierReport) -> IngestionResult<String> {
302    serde_json::to_string_pretty(&serde_json::json!({
303        "column": rep.column,
304        "sampling": format!("{:?}", rep.sampling),
305        "row_count": rep.row_count,
306        "outlier_count": rep.outlier_count,
307        "stats": {
308            "method": format!("{:?}", rep.stats.method),
309            "mean": rep.stats.mean,
310            "std": rep.stats.std,
311            "median": rep.stats.median,
312            "mad": rep.stats.mad,
313            "q1": rep.stats.q1,
314            "q3": rep.stats.q3,
315            "lower_fence": rep.stats.lower_fence,
316            "upper_fence": rep.stats.upper_fence,
317        },
318        "examples": rep.examples,
319    }))
320    .map_err(|e| IngestionError::SchemaMismatch {
321        message: format!("failed to serialize outlier report json: {e}"),
322    })
323}
324
325pub fn render_outlier_report_markdown(rep: &OutlierReport) -> String {
326    let mut out = String::new();
327    out.push_str("## Outlier report\n\n");
328    out.push_str(&format!("- Column: `{}`\n", rep.column));
329    out.push_str(&format!("- Rows profiled: **{}**\n", rep.row_count));
330    out.push_str(&format!("- Outliers: **{}**\n\n", rep.outlier_count));
331    out.push_str("### Stats\n\n");
332    out.push_str(&format!(
333        "- Method: `{}`\n",
334        format!("{:?}", rep.stats.method)
335    ));
336    if let Some(v) = rep.stats.mean {
337        out.push_str(&format!("- mean: `{v:.6}`\n"));
338    }
339    if let Some(v) = rep.stats.std {
340        out.push_str(&format!("- std: `{v:.6}`\n"));
341    }
342    if let Some(v) = rep.stats.median {
343        out.push_str(&format!("- median: `{v:.6}`\n"));
344    }
345    if let Some(v) = rep.stats.mad {
346        out.push_str(&format!("- mad: `{v:.6}`\n"));
347    }
348    if let (Some(a), Some(b)) = (rep.stats.lower_fence, rep.stats.upper_fence) {
349        out.push_str(&format!("- fences: `[{a:.6}, {b:.6}]`\n"));
350    }
351    if !rep.examples.is_empty() {
352        out.push_str("\n### Examples\n\n");
353        for v in &rep.examples {
354            out.push_str(&format!("- `{v}`\n"));
355        }
356    }
357    out
358}
359
360fn read_f64(df: &polars::prelude::DataFrame, name: &str) -> IngestionResult<Option<f64>> {
361    let col = df
362        .column(name)
363        .map_err(|e| {
364            crate::ingestion::polars_bridge::polars_error_to_ingestion("missing stats column", e)
365        })?
366        .as_materialized_series();
367    let av = col.get(0).map_err(|e| IngestionError::Engine {
368        message: "failed to read outlier stat".to_string(),
369        source: Box::new(e),
370    })?;
371    Ok(match av {
372        AnyValue::Null => None,
373        AnyValue::Float64(v) => Some(v),
374        AnyValue::Float32(v) => Some(v as f64),
375        AnyValue::Int64(v) => Some(v as f64),
376        AnyValue::UInt64(v) => Some(v as f64),
377        AnyValue::Int32(v) => Some(v as f64),
378        AnyValue::UInt32(v) => Some(v as f64),
379        other => {
380            return Err(IngestionError::SchemaMismatch {
381                message: format!("expected numeric stat value, got {other}"),
382            });
383        }
384    })
385}
386
387fn series_to_f64_vec(s: &Series, max: usize) -> IngestionResult<Vec<f64>> {
388    let n = usize::min(max, s.len());
389    let mut out = Vec::with_capacity(n);
390    for i in 0..n {
391        let av = s.get(i).map_err(|e| IngestionError::Engine {
392            message: "failed to read outlier example".to_string(),
393            source: Box::new(e),
394        })?;
395        match av {
396            AnyValue::Null => {}
397            AnyValue::Float64(v) => out.push(v),
398            AnyValue::Float32(v) => out.push(v as f64),
399            AnyValue::Int64(v) => out.push(v as f64),
400            AnyValue::Int32(v) => out.push(v as f64),
401            other => {
402                return Err(IngestionError::SchemaMismatch {
403                    message: format!("expected numeric outlier example, got {other}"),
404                });
405            }
406        }
407    }
408    Ok(out)
409}
410
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Value;
    use crate::types::{DataType, Field, Schema};

    /// Single-column dataset with three small values and one extreme value.
    fn sample_dataset() -> DataSet {
        let schema = Schema::new(vec![Field::new("x", DataType::Float64)]);
        let rows = vec![
            vec![Value::Float64(1.0)],
            vec![Value::Float64(2.0)],
            vec![Value::Float64(3.0)],
            vec![Value::Float64(1000.0)],
        ];
        DataSet::new(schema, rows)
    }

    /// IQR detection flags at least one value, and both renderers produce their headers.
    #[test]
    fn outlier_iqr_finds_extreme_value_and_renders() {
        let options = OutlierOptions {
            sampling: SamplingMode::Full,
            max_examples: 3,
        };
        let dataset = sample_dataset();

        let report =
            detect_outliers_dataset(&dataset, "x", OutlierMethod::Iqr { k: 1.5 }, &options)
                .unwrap();
        assert!(report.outlier_count >= 1);

        let rendered_json = render_outlier_report_json(&report).unwrap();
        assert!(rendered_json.contains("\"outlier_count\""));

        let rendered_md = render_outlier_report_markdown(&report);
        assert!(rendered_md.contains("## Outlier report"));
    }
}
447}