ingest_from_path

Function ingest_from_path 

Source
pub fn ingest_from_path(
    path: impl AsRef<Path>,
    schema: &Schema,
    options: &IngestionOptions,
) -> IngestionResult<DataSet>
Expand description

Unified ingestion entry point for path-based sources.

  • If options.format is None, format is inferred from the file extension.
  • Use options.excel_sheet_selection for Excel multi-tab behavior.

When an observer is configured, this function reports:

  • on_success on success, with row count stats
  • on_failure on failure, with a computed severity
  • on_alert on failure when the computed severity is >= options.alert_at_or_above

§Examples

§CSV (auto-detect by extension)

use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
use rust_data_processing::types::{DataType, Field, Schema};

let schema = Schema::new(vec![
    Field::new("id", DataType::Int64),
    Field::new("name", DataType::Utf8),
]);

// Uses `.csv` to select CSV ingestion.
let ds = ingest_from_path("people.csv", &schema, &IngestionOptions::default())?;
println!("rows={}", ds.row_count());

§JSON (auto-detect by extension, with nested field paths)

use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
use rust_data_processing::types::{DataType, Field, Schema};

// JSON supports nested field access via dot paths.
let schema = Schema::new(vec![
    Field::new("id", DataType::Int64),
    Field::new("user.name", DataType::Utf8),
]);

let ds = ingest_from_path("events.json", &schema, &IngestionOptions::default())?;
println!("rows={}", ds.row_count());

§Parquet (auto-detect by extension)

use rust_data_processing::ingestion::{ingest_from_path, IngestionOptions};
use rust_data_processing::types::{DataType, Field, Schema};

let schema = Schema::new(vec![
    Field::new("id", DataType::Int64),
    Field::new("active", DataType::Bool),
]);

let ds = ingest_from_path("data.parquet", &schema, &IngestionOptions::default())?;
println!("rows={}", ds.row_count());

§Force a format explicitly (override extension inference)

use rust_data_processing::ingestion::{ingest_from_path, IngestionFormat, IngestionOptions};
use rust_data_processing::types::{DataType, Field, Schema};

let schema = Schema::new(vec![Field::new("id", DataType::Int64)]);

let opts = IngestionOptions {
    format: Some(IngestionFormat::Csv),
    ..Default::default()
};

// Useful when a file has no extension or you want to override inference.
let ds = ingest_from_path("input_without_extension", &schema, &opts)?;
println!("rows={}", ds.row_count());

§Observability (stderr logging + alert threshold)

use std::sync::Arc;

use rust_data_processing::ingestion::{
    ingest_from_path, IngestionOptions, IngestionSeverity, StdErrObserver,
};
use rust_data_processing::types::{DataType, Field, Schema};

let schema = Schema::new(vec![Field::new("id", DataType::Int64)]);

let opts = IngestionOptions {
    observer: Some(Arc::new(StdErrObserver::default())),
    alert_at_or_above: IngestionSeverity::Critical,
    ..Default::default()
};

// Missing files are treated as Critical and will trigger `on_alert` at this threshold.
let _err = ingest_from_path("does_not_exist.csv", &schema, &opts).unwrap_err();

§Excel

Example. Marked no_run so it is compiled by doctests (no “not tested” banner), but not executed (it expects a real workbook.xlsx file).

use rust_data_processing::ingestion::{
    ingest_from_path, ExcelSheetSelection, IngestionFormat, IngestionOptions,
};
use rust_data_processing::types::{DataType, Field, Schema};

let schema = Schema::new(vec![
    Field::new("id", DataType::Int64),
    Field::new("name", DataType::Utf8),
]);

let opts = IngestionOptions {
    format: Some(IngestionFormat::Excel),
    excel_sheet_selection: ExcelSheetSelection::Sheet("Sheet1".to_string()),
    ..Default::default()
};

let ds = ingest_from_path("workbook.xlsx", &schema, &opts)?;
println!("rows={}", ds.row_count());