1use crate::error::{IngestionError, IngestionResult};
33use crate::pipeline::DataFrame;
34use crate::types::{DataSet, DataType};
35
36use polars::prelude::*;
37
/// Selects which rows of the input are inspected when a profile is computed.
///
/// The default is [`SamplingMode::Full`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum SamplingMode {
    /// Profile every row of the input.
    #[default]
    Full,
    /// Profile only the first `n` rows of the input.
    Head(usize),
}
52
53#[derive(Debug, Clone, PartialEq)]
55pub struct ProfileOptions {
56 pub sampling: SamplingMode,
57 pub quantiles: Vec<f64>,
59}
60
61impl Default for ProfileOptions {
62 fn default() -> Self {
63 Self {
64 sampling: SamplingMode::Full,
65 quantiles: vec![0.5, 0.95],
66 }
67 }
68}
69
/// Summary statistics gathered for a numeric column.
///
/// Each statistic is `None` when the corresponding aggregate came back null.
#[derive(Clone, Debug, PartialEq)]
pub struct NumericProfile {
    /// Smallest value in the column.
    pub min: Option<f64>,
    /// Largest value in the column.
    pub max: Option<f64>,
    /// Arithmetic mean of the column.
    pub mean: Option<f64>,
    /// `(quantile, value)` pairs, in the order the quantiles were requested.
    pub quantiles: Vec<(f64, Option<f64>)>,
}
77
78#[derive(Debug, Clone, PartialEq)]
79pub struct ColumnProfile {
80 pub name: String,
81 pub data_type: DataType,
82 pub null_count: usize,
83 pub distinct_count: usize,
84 pub numeric: Option<NumericProfile>,
85}
86
87#[derive(Debug, Clone, PartialEq)]
88pub struct ProfileReport {
89 pub sampling: SamplingMode,
90 pub row_count: usize,
92 pub columns: Vec<ColumnProfile>,
93}
94
95pub fn render_profile_report_json(report: &ProfileReport) -> IngestionResult<String> {
97 let sampling = match report.sampling {
98 SamplingMode::Full => "full",
99 SamplingMode::Head(_) => "head",
100 };
101
102 let cols: Vec<serde_json::Value> = report
103 .columns
104 .iter()
105 .map(|c| {
106 let dtype = match c.data_type {
107 DataType::Int64 => "int64",
108 DataType::Float64 => "float64",
109 DataType::Bool => "bool",
110 DataType::Utf8 => "utf8",
111 };
112 let numeric = c.numeric.as_ref().map(|n| {
113 serde_json::json!({
114 "min": n.min,
115 "max": n.max,
116 "mean": n.mean,
117 "quantiles": n.quantiles.iter().map(|(q, v)| serde_json::json!({"q": q, "value": v})).collect::<Vec<_>>(),
118 })
119 });
120
121 serde_json::json!({
122 "name": c.name,
123 "data_type": dtype,
124 "null_count": c.null_count,
125 "distinct_count": c.distinct_count,
126 "numeric": numeric,
127 })
128 })
129 .collect();
130
131 serde_json::to_string_pretty(&serde_json::json!({
132 "sampling": sampling,
133 "row_count": report.row_count,
134 "columns": cols,
135 }))
136 .map_err(|e| IngestionError::SchemaMismatch {
137 message: format!("failed to serialize profile report json: {e}"),
138 })
139}
140
141pub fn render_profile_report_markdown(report: &ProfileReport) -> String {
143 let sampling = match report.sampling {
144 SamplingMode::Full => "Full",
145 SamplingMode::Head(n) => {
146 return format!(
147 "## Profile report\n\n- Sampling: **Head({n})**\n- Rows profiled: **{}**\n\n{}",
148 report.row_count,
149 render_columns_markdown(&report.columns)
150 );
151 }
152 };
153
154 format!(
155 "## Profile report\n\n- Sampling: **{sampling}**\n- Rows profiled: **{}**\n\n{}",
156 report.row_count,
157 render_columns_markdown(&report.columns)
158 )
159}
160
161fn render_columns_markdown(cols: &[ColumnProfile]) -> String {
162 let mut out = String::new();
163 out.push_str("### Columns\n\n");
164 out.push_str("| column | type | nulls | distinct (non-null) | min | max | mean |\n");
165 out.push_str("|---|---:|---:|---:|---:|---:|---:|\n");
166 for c in cols {
167 let dtype = match c.data_type {
168 DataType::Int64 => "Int64",
169 DataType::Float64 => "Float64",
170 DataType::Bool => "Bool",
171 DataType::Utf8 => "Utf8",
172 };
173 let (min, max, mean) = match &c.numeric {
174 Some(n) => (
175 n.min
176 .map(|v| format!("{v:.4}"))
177 .unwrap_or_else(|| "—".to_string()),
178 n.max
179 .map(|v| format!("{v:.4}"))
180 .unwrap_or_else(|| "—".to_string()),
181 n.mean
182 .map(|v| format!("{v:.4}"))
183 .unwrap_or_else(|| "—".to_string()),
184 ),
185 None => ("—".to_string(), "—".to_string(), "—".to_string()),
186 };
187 out.push_str(&format!(
188 "| `{}` | {} | {} | {} | {} | {} | {} |\n",
189 c.name, dtype, c.null_count, c.distinct_count, min, max, mean
190 ));
191 }
192 out
193}
194
195pub fn profile_dataset(ds: &DataSet, options: &ProfileOptions) -> IngestionResult<ProfileReport> {
197 let df = DataFrame::from_dataset(ds)?;
198 profile_frame(&df, options)
199}
200
201pub fn profile_frame(df: &DataFrame, options: &ProfileOptions) -> IngestionResult<ProfileReport> {
203 let mut lf = df.lazy_clone();
204
205 lf = match options.sampling {
206 SamplingMode::Full => lf,
207 SamplingMode::Head(n) => lf.limit(n as IdxSize),
208 };
209
210 let schema = lf.clone().collect_schema().map_err(|e| {
211 crate::ingestion::polars_bridge::polars_error_to_ingestion("failed to collect schema", e)
212 })?;
213
214 let cols: Vec<(String, DataType, bool)> = schema
215 .iter_fields()
216 .map(|f| {
217 let (dt, is_numeric) = polars_dtype_to_profile_dtype(f.dtype());
218 (f.name().to_string(), dt, is_numeric)
219 })
220 .collect();
221
222 if cols.is_empty() {
223 return Ok(ProfileReport {
224 sampling: options.sampling,
225 row_count: 0,
226 columns: Vec::new(),
227 });
228 }
229
230 let mut exprs: Vec<Expr> = Vec::new();
232 exprs.push(len().alias("__rows"));
233
234 for (name, _dt, is_numeric) in &cols {
235 exprs.push(col(name).null_count().alias(&format!("{name}__nulls")));
236 exprs.push(
238 col(name)
239 .drop_nulls()
240 .n_unique()
241 .alias(&format!("{name}__distinct")),
242 );
243 if *is_numeric {
244 exprs.push(col(name).min().alias(&format!("{name}__min")));
245 exprs.push(col(name).max().alias(&format!("{name}__max")));
246 exprs.push(col(name).mean().alias(&format!("{name}__mean")));
247 for q in &options.quantiles {
248 if !(0.0..=1.0).contains(q) {
249 return Err(IngestionError::SchemaMismatch {
250 message: format!("invalid quantile {q}; expected value in [0.0, 1.0]"),
251 });
252 }
253 let pct = (q * 100.0).round() as i64;
254 exprs.push(
255 col(name)
256 .quantile(lit(*q), QuantileMethod::Nearest)
257 .alias(&format!("{name}__p{pct}")),
258 );
259 }
260 }
261 }
262
263 let agg = lf.select(exprs).collect().map_err(|e| {
264 crate::ingestion::polars_bridge::polars_error_to_ingestion("failed to compute profile", e)
265 })?;
266
267 let row_count_col = agg.column("__rows").map_err(|e| {
268 crate::ingestion::polars_bridge::polars_error_to_ingestion(
269 "profiling missing __rows column",
270 e,
271 )
272 })?;
273 let row_count = any_to_usize(row_count_col.as_materialized_series(), 0)?.unwrap_or(0);
274
275 let mut out_cols: Vec<ColumnProfile> = Vec::with_capacity(cols.len());
276 for (name, dt, is_numeric) in cols {
277 let nulls_col = agg.column(&format!("{name}__nulls")).map_err(|e| {
278 crate::ingestion::polars_bridge::polars_error_to_ingestion(
279 &format!("profiling missing null_count for '{name}'"),
280 e,
281 )
282 })?;
283 let null_count = any_to_usize(nulls_col.as_materialized_series(), 0)?.unwrap_or(0);
284
285 let distinct_col = agg.column(&format!("{name}__distinct")).map_err(|e| {
286 crate::ingestion::polars_bridge::polars_error_to_ingestion(
287 &format!("profiling missing distinct_count for '{name}'"),
288 e,
289 )
290 })?;
291 let distinct_count = any_to_usize(distinct_col.as_materialized_series(), 0)?.unwrap_or(0);
292
293 let numeric = if is_numeric {
294 let min = any_to_f64(
295 agg.column(&format!("{name}__min"))
296 .map_err(|e| {
297 crate::ingestion::polars_bridge::polars_error_to_ingestion(
298 "profiling missing min",
299 e,
300 )
301 })?
302 .as_materialized_series(),
303 0,
304 )?;
305 let max = any_to_f64(
306 agg.column(&format!("{name}__max"))
307 .map_err(|e| {
308 crate::ingestion::polars_bridge::polars_error_to_ingestion(
309 "profiling missing max",
310 e,
311 )
312 })?
313 .as_materialized_series(),
314 0,
315 )?;
316 let mean = any_to_f64(
317 agg.column(&format!("{name}__mean"))
318 .map_err(|e| {
319 crate::ingestion::polars_bridge::polars_error_to_ingestion(
320 "profiling missing mean",
321 e,
322 )
323 })?
324 .as_materialized_series(),
325 0,
326 )?;
327 let mut qs = Vec::with_capacity(options.quantiles.len());
328 for q in &options.quantiles {
329 let pct = (q * 100.0).round() as i64;
330 let v = any_to_f64(
331 agg.column(&format!("{name}__p{pct}"))
332 .map_err(|e| {
333 crate::ingestion::polars_bridge::polars_error_to_ingestion(
334 "profiling missing quantile",
335 e,
336 )
337 })?
338 .as_materialized_series(),
339 0,
340 )?;
341 qs.push((*q, v));
342 }
343 Some(NumericProfile {
344 min,
345 max,
346 mean,
347 quantiles: qs,
348 })
349 } else {
350 None
351 };
352
353 out_cols.push(ColumnProfile {
354 name,
355 data_type: dt,
356 null_count,
357 distinct_count,
358 numeric,
359 });
360 }
361
362 Ok(ProfileReport {
363 sampling: options.sampling,
364 row_count,
365 columns: out_cols,
366 })
367}
368
369fn polars_dtype_to_profile_dtype(dt: &polars::datatypes::DataType) -> (DataType, bool) {
370 use polars::datatypes::DataType as P;
371 match dt {
372 P::Boolean => (DataType::Bool, false),
373 P::String => (DataType::Utf8, false),
374 P::Int8 | P::Int16 | P::Int32 | P::Int64 | P::UInt8 | P::UInt16 | P::UInt32 | P::UInt64 => {
375 (DataType::Int64, true)
376 }
377 P::Float32 | P::Float64 => (DataType::Float64, true),
378 _ => (DataType::Utf8, false),
379 }
380}
381
382fn any_to_usize(s: &Series, idx: usize) -> IngestionResult<Option<usize>> {
383 let av = s.get(idx).map_err(|e| IngestionError::Engine {
384 message: "failed to read profile value".to_string(),
385 source: Box::new(e),
386 })?;
387 Ok(match av {
388 AnyValue::Null => None,
389 AnyValue::Int64(v) => Some(v.max(0) as usize),
390 AnyValue::UInt64(v) => Some(v as usize),
391 AnyValue::Int32(v) => Some((v as i64).max(0) as usize),
392 AnyValue::UInt32(v) => Some(v as usize),
393 AnyValue::Int16(v) => Some((v as i64).max(0) as usize),
394 AnyValue::UInt16(v) => Some(v as usize),
395 AnyValue::Int8(v) => Some((v as i64).max(0) as usize),
396 AnyValue::UInt8(v) => Some(v as usize),
397 other => {
398 return Err(IngestionError::SchemaMismatch {
399 message: format!("expected integer-like profile value, got {other}"),
400 });
401 }
402 })
403}
404
405fn any_to_f64(s: &Series, idx: usize) -> IngestionResult<Option<f64>> {
406 let av = s.get(idx).map_err(|e| IngestionError::Engine {
407 message: "failed to read profile value".to_string(),
408 source: Box::new(e),
409 })?;
410 Ok(match av {
411 AnyValue::Null => None,
412 AnyValue::Float64(v) => Some(v),
413 AnyValue::Float32(v) => Some(v as f64),
414 AnyValue::Int64(v) => Some(v as f64),
415 AnyValue::UInt64(v) => Some(v as f64),
416 AnyValue::Int32(v) => Some(v as f64),
417 AnyValue::UInt32(v) => Some(v as f64),
418 other => {
419 return Err(IngestionError::SchemaMismatch {
420 message: format!("expected numeric profile value, got {other}"),
421 });
422 }
423 })
424}
425
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Value;
    use crate::types::{Field, Schema};

    /// Builds one `(id, score, name)` row; a missing score becomes a null.
    fn row(id: i64, score: Option<f64>, name: &str) -> Vec<Value> {
        vec![
            Value::Int64(id),
            score.map_or(Value::Null, Value::Float64),
            Value::Utf8(name.to_string()),
        ]
    }

    /// Three-row fixture with one null score and a repeated name.
    fn tiny() -> DataSet {
        let fields = vec![
            Field::new("id", DataType::Int64),
            Field::new("score", DataType::Float64),
            Field::new("name", DataType::Utf8),
        ];
        let rows = vec![
            row(1, Some(10.0), "A"),
            row(2, None, "A"),
            row(3, Some(30.0), "B"),
        ];
        DataSet::new(Schema::new(fields), rows)
    }

    /// Looks up a column profile by name, panicking if it is absent.
    fn column<'a>(report: &'a ProfileReport, name: &str) -> &'a ColumnProfile {
        report.columns.iter().find(|c| c.name == name).unwrap()
    }

    #[test]
    fn profiling_counts_rows_nulls_and_distinct() {
        let report = profile_dataset(&tiny(), &ProfileOptions::default()).unwrap();
        assert_eq!(report.row_count, 3);

        let score = column(&report, "score");
        assert_eq!(score.null_count, 1);
        assert_eq!(score.distinct_count, 2);
        assert!(score.numeric.is_some());

        let name = column(&report, "name");
        assert_eq!(name.null_count, 0);
        assert_eq!(name.distinct_count, 2);
        assert!(name.numeric.is_none());
    }

    #[test]
    fn profiling_supports_head_sampling() {
        let options = ProfileOptions {
            sampling: SamplingMode::Head(2),
            quantiles: vec![0.5],
        };
        let report = profile_dataset(&tiny(), &options).unwrap();
        assert_eq!(report.row_count, 2);
    }

    #[test]
    fn profile_report_renders_json_and_markdown() {
        let report = profile_dataset(&tiny(), &ProfileOptions::default()).unwrap();

        let json = render_profile_report_json(&report).unwrap();
        for key in ["\"row_count\"", "\"columns\""] {
            assert!(json.contains(key));
        }

        let markdown = render_profile_report_markdown(&report);
        for heading in ["## Profile report", "### Columns"] {
            assert!(markdown.contains(heading));
        }
    }
}
499}