rust_data_processing/ingestion/
csv.rs

1//! CSV ingestion implementation.
2
3use std::path::Path;
4
5use crate::error::{IngestionError, IngestionResult};
6use crate::types::{DataSet, DataType, Schema, Value};
7
8use polars::prelude::*;
9
10use super::polars_bridge::{dataframe_to_dataset, polars_error_to_ingestion};
11
12/// Ingest a CSV file into an in-memory [`DataSet`].
13///
14/// Rules:
15///
16/// - CSV must have headers.
17/// - Headers must contain all schema fields (order can differ).
18/// - Each value is parsed according to the schema field type.
19pub fn ingest_csv_from_path(path: impl AsRef<Path>, schema: &Schema) -> IngestionResult<DataSet> {
20    let path = path.as_ref();
21
22    // Phase 1 delegation: use Polars' CSV reader for robust parsing of CSV mechanics
23    // (quoting, escaping, delimiter handling, etc.), then convert into our `DataSet`.
24    let df = LazyCsvReader::new(path.to_string_lossy().as_ref().into())
25        .with_has_header(true)
26        .finish()
27        .map_err(|e| polars_error_to_ingestion("failed to read csv with polars", e))?
28        .collect()
29        .map_err(|e| polars_error_to_ingestion("failed to collect csv with polars", e))?;
30
31    dataframe_to_dataset(&df, schema, "column", 2)
32}
33
34/// Ingest CSV data from an existing CSV reader.
35pub fn ingest_csv_from_reader<R: std::io::Read>(
36    rdr: &mut csv::Reader<R>,
37    schema: &Schema,
38) -> IngestionResult<DataSet> {
39    let headers = rdr.headers()?.clone();
40
41    // Map schema fields -> CSV column indexes (allows re-ordered CSV columns).
42    let mut col_idxs = Vec::with_capacity(schema.fields.len());
43    for field in &schema.fields {
44        match headers.iter().position(|h| h == field.name) {
45            Some(idx) => col_idxs.push(idx),
46            None => {
47                return Err(IngestionError::SchemaMismatch {
48                    message: format!(
49                        "missing required column '{field}'. headers={:?}",
50                        headers.iter().collect::<Vec<_>>(),
51                        field = field.name
52                    ),
53                });
54            }
55        }
56    }
57
58    let mut rows: Vec<Vec<Value>> = Vec::new();
59    for (row_idx0, result) in rdr.records().enumerate() {
60        // Report 1-based row number for users; +1 again because header is row 1.
61        let user_row = row_idx0 + 2;
62        let record = result?;
63
64        let mut row: Vec<Value> = Vec::with_capacity(schema.fields.len());
65        for (field, &csv_idx) in schema.fields.iter().zip(col_idxs.iter()) {
66            let raw = record.get(csv_idx).unwrap_or("");
67            row.push(parse_typed_value(
68                user_row,
69                &field.name,
70                &field.data_type,
71                raw,
72            )?);
73        }
74        rows.push(row);
75    }
76
77    Ok(DataSet::new(schema.clone(), rows))
78}
79
80fn parse_typed_value(
81    row: usize,
82    column: &str,
83    data_type: &DataType,
84    raw: &str,
85) -> IngestionResult<Value> {
86    let trimmed = raw.trim();
87    if trimmed.is_empty() {
88        return Ok(Value::Null);
89    }
90
91    match data_type {
92        DataType::Utf8 => Ok(Value::Utf8(trimmed.to_owned())),
93        DataType::Int64 => {
94            trimmed
95                .parse::<i64>()
96                .map(Value::Int64)
97                .map_err(|e| IngestionError::ParseError {
98                    row,
99                    column: column.to_owned(),
100                    raw: raw.to_owned(),
101                    message: e.to_string(),
102                })
103        }
104        DataType::Float64 => {
105            trimmed
106                .parse::<f64>()
107                .map(Value::Float64)
108                .map_err(|e| IngestionError::ParseError {
109                    row,
110                    column: column.to_owned(),
111                    raw: raw.to_owned(),
112                    message: e.to_string(),
113                })
114        }
115        DataType::Bool => {
116            parse_bool(trimmed)
117                .map(Value::Bool)
118                .map_err(|message| IngestionError::ParseError {
119                    row,
120                    column: column.to_owned(),
121                    raw: raw.to_owned(),
122                    message,
123                })
124        }
125    }
126}
127
/// Parse a boolean cell, case-insensitively (ASCII only).
///
/// Accepts `true`/`t`/`1`/`yes`/`y` as `true` and `false`/`f`/`0`/`no`/`n` as `false`.
/// Any other input yields an error message describing the expected values.
fn parse_bool(s: &str) -> Result<bool, String> {
    const TRUTHY: &[&str] = &["true", "t", "1", "yes", "y"];
    const FALSY: &[&str] = &["false", "f", "0", "no", "n"];

    // Compare in place instead of building a lowercased copy of every cell.
    let is_one_of = |words: &[&str]| words.iter().any(|w| s.eq_ignore_ascii_case(w));

    if is_one_of(TRUTHY) {
        Ok(true)
    } else if is_one_of(FALSY) {
        Ok(false)
    } else {
        Err("expected bool (true/false/1/0/yes/no)".to_string())
    }
}