rust_data_processing/ingestion/
csv.rs1use std::path::Path;
4
5use crate::error::{IngestionError, IngestionResult};
6use crate::types::{DataSet, DataType, Schema, Value};
7
8use polars::prelude::*;
9
10use super::polars_bridge::{dataframe_to_dataset, polars_error_to_ingestion};
11
12pub fn ingest_csv_from_path(path: impl AsRef<Path>, schema: &Schema) -> IngestionResult<DataSet> {
20 let path = path.as_ref();
21
22 let df = LazyCsvReader::new(path.to_string_lossy().as_ref().into())
25 .with_has_header(true)
26 .finish()
27 .map_err(|e| polars_error_to_ingestion("failed to read csv with polars", e))?
28 .collect()
29 .map_err(|e| polars_error_to_ingestion("failed to collect csv with polars", e))?;
30
31 dataframe_to_dataset(&df, schema, "column", 2)
32}
33
34pub fn ingest_csv_from_reader<R: std::io::Read>(
36 rdr: &mut csv::Reader<R>,
37 schema: &Schema,
38) -> IngestionResult<DataSet> {
39 let headers = rdr.headers()?.clone();
40
41 let mut col_idxs = Vec::with_capacity(schema.fields.len());
43 for field in &schema.fields {
44 match headers.iter().position(|h| h == field.name) {
45 Some(idx) => col_idxs.push(idx),
46 None => {
47 return Err(IngestionError::SchemaMismatch {
48 message: format!(
49 "missing required column '{field}'. headers={:?}",
50 headers.iter().collect::<Vec<_>>(),
51 field = field.name
52 ),
53 });
54 }
55 }
56 }
57
58 let mut rows: Vec<Vec<Value>> = Vec::new();
59 for (row_idx0, result) in rdr.records().enumerate() {
60 let user_row = row_idx0 + 2;
62 let record = result?;
63
64 let mut row: Vec<Value> = Vec::with_capacity(schema.fields.len());
65 for (field, &csv_idx) in schema.fields.iter().zip(col_idxs.iter()) {
66 let raw = record.get(csv_idx).unwrap_or("");
67 row.push(parse_typed_value(
68 user_row,
69 &field.name,
70 &field.data_type,
71 raw,
72 )?);
73 }
74 rows.push(row);
75 }
76
77 Ok(DataSet::new(schema.clone(), rows))
78}
79
80fn parse_typed_value(
81 row: usize,
82 column: &str,
83 data_type: &DataType,
84 raw: &str,
85) -> IngestionResult<Value> {
86 let trimmed = raw.trim();
87 if trimmed.is_empty() {
88 return Ok(Value::Null);
89 }
90
91 match data_type {
92 DataType::Utf8 => Ok(Value::Utf8(trimmed.to_owned())),
93 DataType::Int64 => {
94 trimmed
95 .parse::<i64>()
96 .map(Value::Int64)
97 .map_err(|e| IngestionError::ParseError {
98 row,
99 column: column.to_owned(),
100 raw: raw.to_owned(),
101 message: e.to_string(),
102 })
103 }
104 DataType::Float64 => {
105 trimmed
106 .parse::<f64>()
107 .map(Value::Float64)
108 .map_err(|e| IngestionError::ParseError {
109 row,
110 column: column.to_owned(),
111 raw: raw.to_owned(),
112 message: e.to_string(),
113 })
114 }
115 DataType::Bool => {
116 parse_bool(trimmed)
117 .map(Value::Bool)
118 .map_err(|message| IngestionError::ParseError {
119 row,
120 column: column.to_owned(),
121 raw: raw.to_owned(),
122 message,
123 })
124 }
125 }
126}
127
/// Parses a boolean token, ignoring ASCII case.
///
/// Truthy tokens: `true`, `t`, `1`, `yes`, `y`. Falsy tokens: `false`, `f`, `0`, `no`, `n`.
/// The input is expected to be already trimmed; surrounding whitespace is not stripped here.
///
/// # Errors
///
/// Returns a human-readable message when `s` matches none of the accepted tokens.
fn parse_bool(s: &str) -> Result<bool, String> {
    const TRUTHY: [&str; 5] = ["true", "t", "1", "yes", "y"];
    const FALSY: [&str; 5] = ["false", "f", "0", "no", "n"];

    // `eq_ignore_ascii_case` compares in place. This avoids the `String` that
    // `to_ascii_lowercase` would allocate for every Bool cell in the row-parsing loop.
    if TRUTHY.iter().any(|token| s.eq_ignore_ascii_case(token)) {
        Ok(true)
    } else if FALSY.iter().any(|token| s.eq_ignore_ascii_case(token)) {
        Ok(false)
    } else {
        Err("expected bool (true/false/1/0/yes/no)".to_string())
    }
}