rust_data_processing/ingestion/
parquet.rs1use std::path::Path;
4
5use crate::error::{IngestionError, IngestionResult};
6use crate::types::{DataSet, DataType, Schema};
7
8use polars::prelude::*;
9
10use super::polars_bridge::{dataframe_to_dataset, polars_error_to_ingestion};
11
12pub fn ingest_parquet_from_path(
18 path: impl AsRef<Path>,
19 schema: &Schema,
20) -> IngestionResult<DataSet> {
21 let path = path.as_ref();
22
23 let df = LazyFrame::scan_parquet(
24 path.to_string_lossy().as_ref().into(),
25 ScanArgsParquet::default(),
26 )
27 .map_err(|e| polars_error_to_ingestion("failed to read parquet with polars", e))?
28 .collect()
29 .map_err(|e| polars_error_to_ingestion("failed to collect parquet with polars", e))?;
30
31 validate_parquet_column_types(&df, schema)?;
35
36 dataframe_to_dataset(&df, schema, "column", 1)
37}
38
39fn validate_parquet_column_types(df: &DataFrame, schema: &Schema) -> IngestionResult<()> {
40 for field in &schema.fields {
41 let s = df
42 .column(&field.name)
43 .map_err(|_| IngestionError::SchemaMismatch {
44 message: format!("missing required column '{}'", field.name),
45 })?
46 .as_materialized_series();
47
48 if !dtype_compatible_with_schema(&field.data_type, s.dtype()) {
49 return Err(IngestionError::ParseError {
50 row: 1,
51 column: field.name.clone(),
52 raw: s.dtype().to_string(),
53 message: "parquet column type mismatch".to_string(),
54 });
55 }
56 }
57 Ok(())
58}
59
60fn dtype_compatible_with_schema(
61 schema_dtype: &DataType,
62 polars_dtype: &polars::datatypes::DataType,
63) -> bool {
64 use polars::datatypes::DataType as P;
65
66 match schema_dtype {
67 DataType::Utf8 => matches!(polars_dtype, P::String),
68 DataType::Bool => matches!(polars_dtype, P::Boolean),
69 DataType::Int64 => matches!(
70 polars_dtype,
71 P::Int8 | P::Int16 | P::Int32 | P::Int64 | P::UInt8 | P::UInt16 | P::UInt32 | P::UInt64
72 ),
73 DataType::Float64 => matches!(
74 polars_dtype,
75 P::Float32
76 | P::Float64
77 | P::Int8
78 | P::Int16
79 | P::Int32
80 | P::Int64
81 | P::UInt8
82 | P::UInt16
83 | P::UInt32
84 | P::UInt64
85 ),
86 }
87}