rust_data_processing/ingestion/
parquet.rs

1//! Parquet ingestion implementation.
2
3use std::path::Path;
4
5use crate::error::{IngestionError, IngestionResult};
6use crate::types::{DataSet, DataType, Schema};
7
8use polars::prelude::*;
9
10use super::polars_bridge::{dataframe_to_dataset, polars_error_to_ingestion};
11
12/// Ingest a Parquet file into an in-memory `DataSet`.
13///
14/// Notes:
15/// - Validates that all schema fields exist as columns
16/// - Delegates Parquet decoding to Polars, then converts into `DataSet`
17pub fn ingest_parquet_from_path(
18    path: impl AsRef<Path>,
19    schema: &Schema,
20) -> IngestionResult<DataSet> {
21    let path = path.as_ref();
22
23    let df = LazyFrame::scan_parquet(
24        path.to_string_lossy().as_ref().into(),
25        ScanArgsParquet::default(),
26    )
27    .map_err(|e| polars_error_to_ingestion("failed to read parquet with polars", e))?
28    .collect()
29    .map_err(|e| polars_error_to_ingestion("failed to collect parquet with polars", e))?;
30
31    // Parquet: keep "type mismatch" strictness. If the physical/logical Parquet column type is
32    // incompatible with the requested schema type (e.g. UTF8 string column for an Int64 field),
33    // we surface this as a ParseError (tests rely on this behavior).
34    validate_parquet_column_types(&df, schema)?;
35
36    dataframe_to_dataset(&df, schema, "column", 1)
37}
38
39fn validate_parquet_column_types(df: &DataFrame, schema: &Schema) -> IngestionResult<()> {
40    for field in &schema.fields {
41        let s = df
42            .column(&field.name)
43            .map_err(|_| IngestionError::SchemaMismatch {
44                message: format!("missing required column '{}'", field.name),
45            })?
46            .as_materialized_series();
47
48        if !dtype_compatible_with_schema(&field.data_type, s.dtype()) {
49            return Err(IngestionError::ParseError {
50                row: 1,
51                column: field.name.clone(),
52                raw: s.dtype().to_string(),
53                message: "parquet column type mismatch".to_string(),
54            });
55        }
56    }
57    Ok(())
58}
59
60fn dtype_compatible_with_schema(
61    schema_dtype: &DataType,
62    polars_dtype: &polars::datatypes::DataType,
63) -> bool {
64    use polars::datatypes::DataType as P;
65
66    match schema_dtype {
67        DataType::Utf8 => matches!(polars_dtype, P::String),
68        DataType::Bool => matches!(polars_dtype, P::Boolean),
69        DataType::Int64 => matches!(
70            polars_dtype,
71            P::Int8 | P::Int16 | P::Int32 | P::Int64 | P::UInt8 | P::UInt16 | P::UInt32 | P::UInt64
72        ),
73        DataType::Float64 => matches!(
74            polars_dtype,
75            P::Float32
76                | P::Float64
77                | P::Int8
78                | P::Int16
79                | P::Int32
80                | P::Int64
81                | P::UInt8
82                | P::UInt16
83                | P::UInt32
84                | P::UInt64
85        ),
86    }
87}