rust_data_processing/ingestion/
unified.rs

1//! Unified ingestion entrypoint.
2//!
3//! Most callers should use [`ingest_from_path`], which ingests a file into an in-memory
4//! [`crate::types::DataSet`] using a provided [`crate::types::Schema`].
5//!
6//! - If [`IngestionOptions::format`] is `None`, the ingestion format is inferred from the file
7//!   extension.
8//! - If an [`super::observability::IngestionObserver`] is provided, success/failure/alerts are
9//!   reported to it.
10
11use std::error::Error as StdError;
12use std::fmt;
13use std::path::{Path, PathBuf};
14use std::sync::Arc;
15
16use crate::error::{IngestionError, IngestionResult};
17use crate::types::{DataSet, Schema};
18
19use super::observability::{
20    IngestionContext, IngestionObserver, IngestionSeverity, IngestionStats,
21};
22use super::polars_bridge::{infer_schema_from_dataframe_lossy, polars_error_to_ingestion};
23use super::{csv, excel, json, parquet};
24use polars::prelude::*;
25
/// File formats understood by the unified ingestion functions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IngestionFormat {
    /// Comma-separated values.
    Csv,
    /// JSON array-of-objects or NDJSON.
    Json,
    /// Apache Parquet.
    Parquet,
    /// Spreadsheet/workbook formats (feature-gated behind `excel`).
    Excel,
}

impl IngestionFormat {
    /// Look up the format for a file extension, ignoring ASCII case.
    ///
    /// `ext` is the bare extension without a leading dot (`"csv"`, not `".csv"`).
    /// Returns `None` when the extension is not one of the recognized spellings.
    pub fn from_extension(ext: &str) -> Option<Self> {
        // Recognized extensions, all lowercase; compared case-insensitively below.
        const KNOWN_EXTENSIONS: &[(&str, IngestionFormat)] = &[
            ("csv", IngestionFormat::Csv),
            ("json", IngestionFormat::Json),
            ("ndjson", IngestionFormat::Json),
            ("parquet", IngestionFormat::Parquet),
            ("pq", IngestionFormat::Parquet),
            ("xlsx", IngestionFormat::Excel),
            ("xls", IngestionFormat::Excel),
            ("xlsm", IngestionFormat::Excel),
            ("xlsb", IngestionFormat::Excel),
            ("ods", IngestionFormat::Excel),
        ];

        KNOWN_EXTENSIONS
            .iter()
            .find(|(known, _)| ext.eq_ignore_ascii_case(known))
            .map(|&(_, format)| format)
    }
}
51
/// How to choose sheet(s) when ingesting an Excel workbook.
///
/// The [`Default`] selection is [`ExcelSheetSelection::First`].
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub enum ExcelSheetSelection {
    /// Ingest the first sheet (default).
    #[default]
    First,
    /// Ingest a single named sheet.
    Sheet(String),
    /// Ingest all sheets and concatenate rows.
    AllSheets,
    /// Ingest only the listed sheets (in order) and concatenate rows.
    Sheets(Vec<String>),
}
70
71/// Options controlling unified ingestion behavior.
72///
73/// Use [`Default`] for common cases.
74#[derive(Clone)]
75pub struct IngestionOptions {
76    /// If `None`, auto-detect format from file extension.
77    pub format: Option<IngestionFormat>,
78    /// Excel-specific options.
79    pub excel_sheet_selection: ExcelSheetSelection,
80    /// Optional observer for logging/alerts.
81    pub observer: Option<Arc<dyn IngestionObserver>>,
82    /// Severity threshold at which `on_alert` is invoked.
83    pub alert_at_or_above: IngestionSeverity,
84}
85
86impl fmt::Debug for IngestionOptions {
87    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88        f.debug_struct("IngestionOptions")
89            .field("format", &self.format)
90            .field("excel_sheet_selection", &self.excel_sheet_selection)
91            .field("observer_set", &self.observer.is_some())
92            .field("alert_at_or_above", &self.alert_at_or_above)
93            .finish()
94    }
95}
96
97impl Default for IngestionOptions {
98    fn default() -> Self {
99        Self {
100            format: None,
101            excel_sheet_selection: ExcelSheetSelection::default(),
102            observer: None,
103            alert_at_or_above: IngestionSeverity::Critical,
104        }
105    }
106}
107
108/// Unified ingestion entry point for path-based sources.
109///
110/// - If `options.format` is `None`, format is inferred from the file extension.
111/// - Use `options.excel_sheet_selection` for Excel multi-tab behavior.
112///
113/// When an observer is configured, this function reports:
114///
115/// - `on_success` on success, with row count stats
116/// - `on_failure` on failure, with a computed severity
117/// - `on_alert` on failure when the computed severity is >= `options.alert_at_or_above`
118///
119/// # Examples
120///
121/// ## CSV (auto-detect by extension)
122///
123/// ```no_run
124/// use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
125/// use rust_data_processing::types::{DataType, Field, Schema};
126///
127/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
128/// let schema = Schema::new(vec![
129///     Field::new("id", DataType::Int64),
130///     Field::new("name", DataType::Utf8),
131/// ]);
132///
133/// // Uses `.csv` to select CSV ingestion.
134/// let ds = ingest_from_path("people.csv", &schema, &IngestionOptions::default())?;
135/// println!("rows={}", ds.row_count());
136/// # Ok(())
137/// # }
138/// ```
139///
140/// ## JSON (auto-detect by extension, with nested field paths)
141///
142/// ```no_run
143/// use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
144/// use rust_data_processing::types::{DataType, Field, Schema};
145///
146/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
147/// // JSON supports nested field access via dot paths.
148/// let schema = Schema::new(vec![
149///     Field::new("id", DataType::Int64),
150///     Field::new("user.name", DataType::Utf8),
151/// ]);
152///
153/// let ds = ingest_from_path("events.json", &schema, &IngestionOptions::default())?;
154/// println!("rows={}", ds.row_count());
155/// # Ok(())
156/// # }
157/// ```
158///
159/// ## Parquet (auto-detect by extension)
160///
161/// ```no_run
162/// use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
163/// use rust_data_processing::types::{DataType, Field, Schema};
164///
165/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
166/// let schema = Schema::new(vec![
167///     Field::new("id", DataType::Int64),
168///     Field::new("active", DataType::Bool),
169/// ]);
170///
171/// let ds = ingest_from_path("data.parquet", &schema, &IngestionOptions::default())?;
172/// println!("rows={}", ds.row_count());
173/// # Ok(())
174/// # }
175/// ```
176///
177/// ## Force a format explicitly (override extension inference)
178///
179/// ```no_run
180/// use rust_data_processing::ingestion::{ingest_from_path, IngestionFormat, IngestionOptions};
181/// use rust_data_processing::types::{DataType, Field, Schema};
182///
183/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
184/// let schema = Schema::new(vec![Field::new("id", DataType::Int64)]);
185///
186/// let opts = IngestionOptions {
187///     format: Some(IngestionFormat::Csv),
188///     ..Default::default()
189/// };
190///
191/// // Useful when a file has no extension or you want to override inference.
192/// let ds = ingest_from_path("input_without_extension", &schema, &opts)?;
193/// println!("rows={}", ds.row_count());
194/// # Ok(())
195/// # }
196/// ```
197///
198/// ## Observability (stderr logging + alert threshold)
199///
200/// ```no_run
201/// use std::sync::Arc;
202///
203/// use rust_data_processing::ingestion::{
204///     ingest_from_path, IngestionOptions, IngestionSeverity, StdErrObserver,
205/// };
206/// use rust_data_processing::types::{DataType, Field, Schema};
207///
208/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
209/// let schema = Schema::new(vec![Field::new("id", DataType::Int64)]);
210///
211/// let opts = IngestionOptions {
212///     observer: Some(Arc::new(StdErrObserver::default())),
213///     alert_at_or_above: IngestionSeverity::Critical,
214///     ..Default::default()
215/// };
216///
217/// // Missing files are treated as Critical and will trigger `on_alert` at this threshold.
218/// let _err = ingest_from_path("does_not_exist.csv", &schema, &opts).unwrap_err();
219/// # Ok(())
220/// # }
221/// ```
222///
223/// ## Excel
224///
/// This example is marked `no_run` so that doctests still **compile** it (avoiding the
/// "not tested" banner) without executing it, since it expects a real `workbook.xlsx` file.
227///
228/// ```no_run
229/// use rust_data_processing::ingestion::{
230///     ingest_from_path, ExcelSheetSelection, IngestionFormat, IngestionOptions,
231/// };
232/// use rust_data_processing::types::{DataType, Field, Schema};
233///
234/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
235/// let schema = Schema::new(vec![
236///     Field::new("id", DataType::Int64),
237///     Field::new("name", DataType::Utf8),
238/// ]);
239///
240/// let opts = IngestionOptions {
241///     format: Some(IngestionFormat::Excel),
242///     excel_sheet_selection: ExcelSheetSelection::Sheet("Sheet1".to_string()),
243///     ..Default::default()
244/// };
245///
246/// let ds = ingest_from_path("workbook.xlsx", &schema, &opts)?;
247/// println!("rows={}", ds.row_count());
248/// # Ok(())
249/// # }
250/// ```
251pub fn ingest_from_path(
252    path: impl AsRef<Path>,
253    schema: &Schema,
254    options: &IngestionOptions,
255) -> IngestionResult<DataSet> {
256    let path = path.as_ref();
257    let fmt = match options.format {
258        Some(f) => f,
259        None => infer_format_from_path(path)?,
260    };
261
262    let ctx = IngestionContext {
263        path: path.to_path_buf(),
264        format: fmt,
265    };
266
267    let result = match fmt {
268        IngestionFormat::Csv => csv::ingest_csv_from_path(path, schema),
269        IngestionFormat::Json => json::ingest_json_from_path(path, schema),
270        IngestionFormat::Parquet => parquet::ingest_parquet_from_path(path, schema),
271        IngestionFormat::Excel => {
272            ingest_excel_dispatch(path, schema, &options.excel_sheet_selection)
273        }
274    };
275
276    if let Some(obs) = options.observer.as_ref() {
277        match &result {
278            Ok(ds) => obs.on_success(
279                &ctx,
280                IngestionStats {
281                    rows: ds.row_count(),
282                },
283            ),
284            Err(e) => {
285                let sev = severity_for_error(e);
286                obs.on_failure(&ctx, sev, e);
287                if sev >= options.alert_at_or_above {
288                    obs.on_alert(&ctx, sev, e);
289                }
290            }
291        }
292    }
293
294    result
295}
296
297/// Infer a [`Schema`] for an input file.
298///
299/// This is intended for quick exploration and benchmarking when callers don't have a schema yet.
300/// It uses a **best-effort** mapping into `DataType::{Int64, Float64, Bool, Utf8}`.
301///
302/// Notes:
303/// - For JSON, nested fields are inferred only at the **top level** (no dot-path expansion).
304/// - For Excel, inference uses a scan-based heuristic.
305pub fn infer_schema_from_path(
306    path: impl AsRef<Path>,
307    options: &IngestionOptions,
308) -> IngestionResult<Schema> {
309    let path = path.as_ref();
310    let fmt = match options.format {
311        Some(f) => f,
312        None => infer_format_from_path(path)?,
313    };
314
315    match fmt {
316        IngestionFormat::Csv => {
317            let df = LazyCsvReader::new(path.to_string_lossy().as_ref().into())
318                .with_has_header(true)
319                .finish()
320                .map_err(|e| polars_error_to_ingestion("failed to read csv with polars", e))?
321                .collect()
322                .map_err(|e| polars_error_to_ingestion("failed to collect csv with polars", e))?;
323            infer_schema_from_dataframe_lossy(&df)
324        }
325        IngestionFormat::Json => {
326            use std::fs::File;
327
328            let ext = path.extension().and_then(|s| s.to_str()).unwrap_or("");
329            let json_format = if ext.eq_ignore_ascii_case("ndjson") {
330                JsonFormat::JsonLines
331            } else {
332                JsonFormat::Json
333            };
334
335            let file = File::open(path)?;
336            let df = JsonReader::new(file)
337                .with_json_format(json_format)
338                .finish()
339                .map_err(|e| polars_error_to_ingestion("failed to read json with polars", e))?;
340            infer_schema_from_dataframe_lossy(&df)
341        }
342        IngestionFormat::Parquet => {
343            let df = LazyFrame::scan_parquet(
344                path.to_string_lossy().as_ref().into(),
345                ScanArgsParquet::default(),
346            )
347            .map_err(|e| polars_error_to_ingestion("failed to read parquet with polars", e))?
348            .collect()
349            .map_err(|e| polars_error_to_ingestion("failed to collect parquet with polars", e))?;
350            infer_schema_from_dataframe_lossy(&df)
351        }
352        IngestionFormat::Excel => infer_excel_schema_dispatch(path, &options.excel_sheet_selection),
353    }
354}
355
356/// Convenience wrapper: infer schema and then ingest.
357pub fn ingest_from_path_infer(
358    path: impl AsRef<Path>,
359    options: &IngestionOptions,
360) -> IngestionResult<DataSet> {
361    let schema = infer_schema_from_path(path.as_ref(), options)?;
362    ingest_from_path(path, &schema, options)
363}
364
365fn severity_for_error(e: &IngestionError) -> IngestionSeverity {
366    match e {
367        IngestionError::Io(_) => IngestionSeverity::Critical,
368        IngestionError::Parquet(err) => {
369            // Best-effort: parquet errors often wrap IO, but not always in a structured way.
370            // If we can detect IO in the source chain, treat it as Critical.
371            if error_chain_contains_io(err) {
372                IngestionSeverity::Critical
373            } else {
374                IngestionSeverity::Error
375            }
376        }
377        IngestionError::Csv(err) => match err.kind() {
378            ::csv::ErrorKind::Io(_) => IngestionSeverity::Critical,
379            _ => IngestionSeverity::Error,
380        },
381        #[cfg(feature = "excel")]
382        IngestionError::Excel(_) => IngestionSeverity::Error,
383        IngestionError::Engine { source, .. } => {
384            if error_chain_contains_io(source.as_ref()) {
385                IngestionSeverity::Critical
386            } else {
387                IngestionSeverity::Error
388            }
389        }
390        IngestionError::SchemaMismatch { .. } => IngestionSeverity::Error,
391        IngestionError::ParseError { .. } => IngestionSeverity::Error,
392    }
393}
394
/// Report whether `e` itself, or any error reachable through its `source()` chain, is a
/// [`std::io::Error`].
fn error_chain_contains_io(e: &(dyn StdError + 'static)) -> bool {
    // Walk `e`, `e.source()`, `e.source().source()`, ... until the chain ends.
    std::iter::successors(Some(e), |link| (*link).source())
        .any(|link| link.is::<std::io::Error>())
}
405
406fn infer_format_from_path(path: &Path) -> IngestionResult<IngestionFormat> {
407    let ext = path.extension().and_then(|s| s.to_str()).ok_or_else(|| {
408        IngestionError::SchemaMismatch {
409            message: format!(
410                "cannot infer format: path has no extension ({})",
411                path.display()
412            ),
413        }
414    })?;
415
416    IngestionFormat::from_extension(ext).ok_or_else(|| IngestionError::SchemaMismatch {
417        message: format!(
418            "cannot infer format from extension '{ext}' for path ({})",
419            path.display()
420        ),
421    })
422}
423
424fn ingest_excel_dispatch(
425    path: &Path,
426    schema: &Schema,
427    sel: &ExcelSheetSelection,
428) -> IngestionResult<DataSet> {
429    match sel {
430        ExcelSheetSelection::First => excel::ingest_excel_from_path(path, None, schema),
431        ExcelSheetSelection::Sheet(name) => {
432            excel::ingest_excel_from_path(path, Some(name.as_str()), schema)
433        }
434        ExcelSheetSelection::AllSheets => {
435            excel::ingest_excel_workbook_from_path(path, None, schema)
436        }
437        ExcelSheetSelection::Sheets(names) => {
438            let refs: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
439            excel::ingest_excel_workbook_from_path(path, Some(refs.as_slice()), schema)
440        }
441    }
442}
443
444fn infer_excel_schema_dispatch(path: &Path, sel: &ExcelSheetSelection) -> IngestionResult<Schema> {
445    match sel {
446        ExcelSheetSelection::First => excel::infer_excel_schema_from_path(path, None),
447        ExcelSheetSelection::Sheet(name) => {
448            excel::infer_excel_schema_from_path(path, Some(name.as_str()))
449        }
450        ExcelSheetSelection::AllSheets => excel::infer_excel_schema_workbook_from_path(path, None),
451        ExcelSheetSelection::Sheets(names) => {
452            let refs: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
453            excel::infer_excel_schema_workbook_from_path(path, Some(refs.as_slice()))
454        }
455    }
456}
457
458/// Convenience helper for callers that want an owned request object.
459///
460/// This can be useful if you want to enqueue ingestion work in a job system.
461#[derive(Clone)]
462pub struct IngestionRequest {
463    /// Path to the input file.
464    pub path: PathBuf,
465    /// Schema to validate/parse values into.
466    pub schema: Schema,
467    /// Options controlling ingestion.
468    pub options: IngestionOptions,
469}
470
471impl fmt::Debug for IngestionRequest {
472    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
473        f.debug_struct("IngestionRequest")
474            .field("path", &self.path)
475            .field("schema_fields", &self.schema.fields.len())
476            .field("options", &self.options)
477            .finish()
478    }
479}
480
481impl IngestionRequest {
482    /// Execute the request by calling [`ingest_from_path`].
483    pub fn run(&self) -> IngestionResult<DataSet> {
484        ingest_from_path(&self.path, &self.schema, &self.options)
485    }
486}