rust_data_processing/ingestion/unified.rs
1//! Unified ingestion entrypoint.
2//!
3//! Most callers should use [`ingest_from_path`], which ingests a file into an in-memory
4//! [`crate::types::DataSet`] using a provided [`crate::types::Schema`].
5//!
6//! - If [`IngestionOptions::format`] is `None`, the ingestion format is inferred from the file
7//! extension.
8//! - If an [`super::observability::IngestionObserver`] is provided, success/failure/alerts are
9//! reported to it.
10
11use std::error::Error as StdError;
12use std::fmt;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15
16use crate::error::{IngestionError, IngestionResult};
17use crate::types::{DataSet, Schema};
18
19use super::observability::{
20 IngestionContext, IngestionObserver, IngestionSeverity, IngestionStats,
21};
22use super::polars_bridge::{infer_schema_from_dataframe_lossy, polars_error_to_ingestion};
23use super::{csv, excel, json, parquet};
24use polars::prelude::*;
25
/// File formats the unified ingestion entry point can dispatch to.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum IngestionFormat {
    /// Comma-separated values text files.
    Csv,
    /// JSON input: either an array of objects or newline-delimited JSON (NDJSON).
    Json,
    /// Apache Parquet files.
    Parquet,
    /// Spreadsheet/workbook formats (feature-gated behind `excel`).
    Excel,
}
38
39impl IngestionFormat {
40 /// Parse an ingestion format from a file extension (case-insensitive).
41 pub fn from_extension(ext: &str) -> Option<Self> {
42 match ext.to_ascii_lowercase().as_str() {
43 "csv" => Some(Self::Csv),
44 "json" | "ndjson" => Some(Self::Json),
45 "parquet" | "pq" => Some(Self::Parquet),
46 "xlsx" | "xls" | "xlsm" | "xlsb" | "ods" => Some(Self::Excel),
47 _ => None,
48 }
49 }
50}
51
/// How to choose sheet(s) when ingesting an Excel workbook.
///
/// The [`Default`] selection is [`ExcelSheetSelection::First`]; it is derived via the
/// `#[default]` variant attribute instead of a hand-written `impl Default`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum ExcelSheetSelection {
    /// Ingest the first sheet (default).
    #[default]
    First,
    /// Ingest a single named sheet.
    Sheet(String),
    /// Ingest all sheets and concatenate rows.
    AllSheets,
    /// Ingest only the listed sheets (in order) and concatenate rows.
    Sheets(Vec<String>),
}
70
71/// Options controlling unified ingestion behavior.
72///
73/// Use [`Default`] for common cases.
74#[derive(Clone)]
75pub struct IngestionOptions {
76 /// If `None`, auto-detect format from file extension.
77 pub format: Option<IngestionFormat>,
78 /// Excel-specific options.
79 pub excel_sheet_selection: ExcelSheetSelection,
80 /// Optional observer for logging/alerts.
81 pub observer: Option<Arc<dyn IngestionObserver>>,
82 /// Severity threshold at which `on_alert` is invoked.
83 pub alert_at_or_above: IngestionSeverity,
84}
85
86impl fmt::Debug for IngestionOptions {
87 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88 f.debug_struct("IngestionOptions")
89 .field("format", &self.format)
90 .field("excel_sheet_selection", &self.excel_sheet_selection)
91 .field("observer_set", &self.observer.is_some())
92 .field("alert_at_or_above", &self.alert_at_or_above)
93 .finish()
94 }
95}
96
97impl Default for IngestionOptions {
98 fn default() -> Self {
99 Self {
100 format: None,
101 excel_sheet_selection: ExcelSheetSelection::default(),
102 observer: None,
103 alert_at_or_above: IngestionSeverity::Critical,
104 }
105 }
106}
107
108/// Unified ingestion entry point for path-based sources.
109///
110/// - If `options.format` is `None`, format is inferred from the file extension.
111/// - Use `options.excel_sheet_selection` for Excel multi-tab behavior.
112///
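/// When an observer is configured, this function reports:
///
/// - `on_success` on success, with row count stats
/// - `on_failure` when the format-specific ingestion fails, with a computed severity
/// - `on_alert` on such a failure when the computed severity is >= `options.alert_at_or_above`
///
/// A failure to infer the format from the file extension is returned before the observer is
/// consulted, so it is not reported to the observer.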
118///
119/// # Examples
120///
121/// ## CSV (auto-detect by extension)
122///
123/// ```no_run
124/// use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
125/// use rust_data_processing::types::{DataType, Field, Schema};
126///
127/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
128/// let schema = Schema::new(vec![
129/// Field::new("id", DataType::Int64),
130/// Field::new("name", DataType::Utf8),
131/// ]);
132///
133/// // Uses `.csv` to select CSV ingestion.
134/// let ds = ingest_from_path("people.csv", &schema, &IngestionOptions::default())?;
135/// println!("rows={}", ds.row_count());
136/// # Ok(())
137/// # }
138/// ```
139///
140/// ## JSON (auto-detect by extension, with nested field paths)
141///
142/// ```no_run
143/// use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
144/// use rust_data_processing::types::{DataType, Field, Schema};
145///
146/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
147/// // JSON supports nested field access via dot paths.
148/// let schema = Schema::new(vec![
149/// Field::new("id", DataType::Int64),
150/// Field::new("user.name", DataType::Utf8),
151/// ]);
152///
153/// let ds = ingest_from_path("events.json", &schema, &IngestionOptions::default())?;
154/// println!("rows={}", ds.row_count());
155/// # Ok(())
156/// # }
157/// ```
158///
159/// ## Parquet (auto-detect by extension)
160///
161/// ```no_run
162/// use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
163/// use rust_data_processing::types::{DataType, Field, Schema};
164///
165/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
166/// let schema = Schema::new(vec![
167/// Field::new("id", DataType::Int64),
168/// Field::new("active", DataType::Bool),
169/// ]);
170///
171/// let ds = ingest_from_path("data.parquet", &schema, &IngestionOptions::default())?;
172/// println!("rows={}", ds.row_count());
173/// # Ok(())
174/// # }
175/// ```
176///
177/// ## Force a format explicitly (override extension inference)
178///
179/// ```no_run
180/// use rust_data_processing::ingestion::{ingest_from_path, IngestionFormat, IngestionOptions};
181/// use rust_data_processing::types::{DataType, Field, Schema};
182///
183/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
184/// let schema = Schema::new(vec![Field::new("id", DataType::Int64)]);
185///
186/// let opts = IngestionOptions {
187/// format: Some(IngestionFormat::Csv),
188/// ..Default::default()
189/// };
190///
191/// // Useful when a file has no extension or you want to override inference.
192/// let ds = ingest_from_path("input_without_extension", &schema, &opts)?;
193/// println!("rows={}", ds.row_count());
194/// # Ok(())
195/// # }
196/// ```
197///
198/// ## Observability (stderr logging + alert threshold)
199///
200/// ```no_run
201/// use std::sync::Arc;
202///
203/// use rust_data_processing::ingestion::{
204/// ingest_from_path, IngestionOptions, IngestionSeverity, StdErrObserver,
205/// };
206/// use rust_data_processing::types::{DataType, Field, Schema};
207///
208/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
209/// let schema = Schema::new(vec![Field::new("id", DataType::Int64)]);
210///
211/// let opts = IngestionOptions {
212/// observer: Some(Arc::new(StdErrObserver::default())),
213/// alert_at_or_above: IngestionSeverity::Critical,
214/// ..Default::default()
215/// };
216///
217/// // Missing files are treated as Critical and will trigger `on_alert` at this threshold.
218/// let _err = ingest_from_path("does_not_exist.csv", &schema, &opts).unwrap_err();
219/// # Ok(())
220/// # }
221/// ```
222///
223/// ## Excel
224///
225/// Example. Marked `no_run` so it is **compiled** by doctests
226/// (no "not tested" banner), but not executed (it expects a real `workbook.xlsx` file).
227///
228/// ```no_run
229/// use rust_data_processing::ingestion::{
230/// ingest_from_path, ExcelSheetSelection, IngestionFormat, IngestionOptions,
231/// };
232/// use rust_data_processing::types::{DataType, Field, Schema};
233///
234/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
235/// let schema = Schema::new(vec![
236/// Field::new("id", DataType::Int64),
237/// Field::new("name", DataType::Utf8),
238/// ]);
239///
240/// let opts = IngestionOptions {
241/// format: Some(IngestionFormat::Excel),
242/// excel_sheet_selection: ExcelSheetSelection::Sheet("Sheet1".to_string()),
243/// ..Default::default()
244/// };
245///
246/// let ds = ingest_from_path("workbook.xlsx", &schema, &opts)?;
247/// println!("rows={}", ds.row_count());
248/// # Ok(())
249/// # }
250/// ```
251pub fn ingest_from_path(
252 path: impl AsRef<Path>,
253 schema: &Schema,
254 options: &IngestionOptions,
255) -> IngestionResult<DataSet> {
256 let path = path.as_ref();
257 let fmt = match options.format {
258 Some(f) => f,
259 None => infer_format_from_path(path)?,
260 };
261
262 let ctx = IngestionContext {
263 path: path.to_path_buf(),
264 format: fmt,
265 };
266
267 let result = match fmt {
268 IngestionFormat::Csv => csv::ingest_csv_from_path(path, schema),
269 IngestionFormat::Json => json::ingest_json_from_path(path, schema),
270 IngestionFormat::Parquet => parquet::ingest_parquet_from_path(path, schema),
271 IngestionFormat::Excel => {
272 ingest_excel_dispatch(path, schema, &options.excel_sheet_selection)
273 }
274 };
275
276 if let Some(obs) = options.observer.as_ref() {
277 match &result {
278 Ok(ds) => obs.on_success(
279 &ctx,
280 IngestionStats {
281 rows: ds.row_count(),
282 },
283 ),
284 Err(e) => {
285 let sev = severity_for_error(e);
286 obs.on_failure(&ctx, sev, e);
287 if sev >= options.alert_at_or_above {
288 obs.on_alert(&ctx, sev, e);
289 }
290 }
291 }
292 }
293
294 result
295}
296
297/// Infer a [`Schema`] for an input file.
298///
299/// This is intended for quick exploration and benchmarking when callers don't have a schema yet.
300/// It uses a **best-effort** mapping into `DataType::{Int64, Float64, Bool, Utf8}`.
301///
302/// Notes:
303/// - For JSON, nested fields are inferred only at the **top level** (no dot-path expansion).
304/// - For Excel, inference uses a scan-based heuristic.
305pub fn infer_schema_from_path(
306 path: impl AsRef<Path>,
307 options: &IngestionOptions,
308) -> IngestionResult<Schema> {
309 let path = path.as_ref();
310 let fmt = match options.format {
311 Some(f) => f,
312 None => infer_format_from_path(path)?,
313 };
314
315 match fmt {
316 IngestionFormat::Csv => {
317 let df = LazyCsvReader::new(path.to_string_lossy().as_ref().into())
318 .with_has_header(true)
319 .finish()
320 .map_err(|e| polars_error_to_ingestion("failed to read csv with polars", e))?
321 .collect()
322 .map_err(|e| polars_error_to_ingestion("failed to collect csv with polars", e))?;
323 infer_schema_from_dataframe_lossy(&df)
324 }
325 IngestionFormat::Json => {
326 use std::fs::File;
327
328 let ext = path.extension().and_then(|s| s.to_str()).unwrap_or("");
329 let json_format = if ext.eq_ignore_ascii_case("ndjson") {
330 JsonFormat::JsonLines
331 } else {
332 JsonFormat::Json
333 };
334
335 let file = File::open(path)?;
336 let df = JsonReader::new(file)
337 .with_json_format(json_format)
338 .finish()
339 .map_err(|e| polars_error_to_ingestion("failed to read json with polars", e))?;
340 infer_schema_from_dataframe_lossy(&df)
341 }
342 IngestionFormat::Parquet => {
343 let df = LazyFrame::scan_parquet(
344 path.to_string_lossy().as_ref().into(),
345 ScanArgsParquet::default(),
346 )
347 .map_err(|e| polars_error_to_ingestion("failed to read parquet with polars", e))?
348 .collect()
349 .map_err(|e| polars_error_to_ingestion("failed to collect parquet with polars", e))?;
350 infer_schema_from_dataframe_lossy(&df)
351 }
352 IngestionFormat::Excel => infer_excel_schema_dispatch(path, &options.excel_sheet_selection),
353 }
354}
355
356/// Convenience wrapper: infer schema and then ingest.
357pub fn ingest_from_path_infer(
358 path: impl AsRef<Path>,
359 options: &IngestionOptions,
360) -> IngestionResult<DataSet> {
361 let schema = infer_schema_from_path(path.as_ref(), options)?;
362 ingest_from_path(path, &schema, options)
363}
364
365fn severity_for_error(e: &IngestionError) -> IngestionSeverity {
366 match e {
367 IngestionError::Io(_) => IngestionSeverity::Critical,
368 IngestionError::Parquet(err) => {
369 // Best-effort: parquet errors often wrap IO, but not always in a structured way.
370 // If we can detect IO in the source chain, treat it as Critical.
371 if error_chain_contains_io(err) {
372 IngestionSeverity::Critical
373 } else {
374 IngestionSeverity::Error
375 }
376 }
377 IngestionError::Csv(err) => match err.kind() {
378 ::csv::ErrorKind::Io(_) => IngestionSeverity::Critical,
379 _ => IngestionSeverity::Error,
380 },
381 #[cfg(feature = "excel")]
382 IngestionError::Excel(_) => IngestionSeverity::Error,
383 IngestionError::Engine { source, .. } => {
384 if error_chain_contains_io(source.as_ref()) {
385 IngestionSeverity::Critical
386 } else {
387 IngestionSeverity::Error
388 }
389 }
390 IngestionError::SchemaMismatch { .. } => IngestionSeverity::Error,
391 IngestionError::ParseError { .. } => IngestionSeverity::Error,
392 }
393}
394
/// Returns `true` when `e` itself, or any error reachable through repeated
/// [`source`](StdError::source) calls, is a [`std::io::Error`].
fn error_chain_contains_io(e: &(dyn StdError + 'static)) -> bool {
    // Walk the chain lazily: start at `e`, then follow `source()` until it yields `None`.
    std::iter::successors(Some(e), |link| (*link).source())
        .any(|link| link.is::<std::io::Error>())
}
405
406fn infer_format_from_path(path: &Path) -> IngestionResult<IngestionFormat> {
407 let ext = path.extension().and_then(|s| s.to_str()).ok_or_else(|| {
408 IngestionError::SchemaMismatch {
409 message: format!(
410 "cannot infer format: path has no extension ({})",
411 path.display()
412 ),
413 }
414 })?;
415
416 IngestionFormat::from_extension(ext).ok_or_else(|| IngestionError::SchemaMismatch {
417 message: format!(
418 "cannot infer format from extension '{ext}' for path ({})",
419 path.display()
420 ),
421 })
422}
423
424fn ingest_excel_dispatch(
425 path: &Path,
426 schema: &Schema,
427 sel: &ExcelSheetSelection,
428) -> IngestionResult<DataSet> {
429 match sel {
430 ExcelSheetSelection::First => excel::ingest_excel_from_path(path, None, schema),
431 ExcelSheetSelection::Sheet(name) => {
432 excel::ingest_excel_from_path(path, Some(name.as_str()), schema)
433 }
434 ExcelSheetSelection::AllSheets => {
435 excel::ingest_excel_workbook_from_path(path, None, schema)
436 }
437 ExcelSheetSelection::Sheets(names) => {
438 let refs: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
439 excel::ingest_excel_workbook_from_path(path, Some(refs.as_slice()), schema)
440 }
441 }
442}
443
444fn infer_excel_schema_dispatch(path: &Path, sel: &ExcelSheetSelection) -> IngestionResult<Schema> {
445 match sel {
446 ExcelSheetSelection::First => excel::infer_excel_schema_from_path(path, None),
447 ExcelSheetSelection::Sheet(name) => {
448 excel::infer_excel_schema_from_path(path, Some(name.as_str()))
449 }
450 ExcelSheetSelection::AllSheets => excel::infer_excel_schema_workbook_from_path(path, None),
451 ExcelSheetSelection::Sheets(names) => {
452 let refs: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
453 excel::infer_excel_schema_workbook_from_path(path, Some(refs.as_slice()))
454 }
455 }
456}
457
458/// Convenience helper for callers that want an owned request object.
459///
460/// This can be useful if you want to enqueue ingestion work in a job system.
461#[derive(Clone)]
462pub struct IngestionRequest {
463 /// Path to the input file.
464 pub path: PathBuf,
465 /// Schema to validate/parse values into.
466 pub schema: Schema,
467 /// Options controlling ingestion.
468 pub options: IngestionOptions,
469}
470
471impl fmt::Debug for IngestionRequest {
472 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
473 f.debug_struct("IngestionRequest")
474 .field("path", &self.path)
475 .field("schema_fields", &self.schema.fields.len())
476 .field("options", &self.options)
477 .finish()
478 }
479}
480
481impl IngestionRequest {
482 /// Execute the request by calling [`ingest_from_path`].
483 pub fn run(&self) -> IngestionResult<DataSet> {
484 ingest_from_path(&self.path, &self.schema, &self.options)
485 }
486}