rust_data_processing/ingestion/
json.rs1use std::fs::File;
10use std::path::Path;
11
12use crate::error::{IngestionError, IngestionResult};
13use crate::types::{DataSet, DataType, Schema, Value};
14
15use polars::prelude::*;
16
17use super::polars_bridge::{dataframe_to_dataset, polars_error_to_ingestion};
18
19pub fn ingest_json_from_path(path: impl AsRef<Path>, schema: &Schema) -> IngestionResult<DataSet> {
21 let path = path.as_ref();
22 let ext = path.extension().and_then(|s| s.to_str()).unwrap_or("");
23
24 let json_format = if ext.eq_ignore_ascii_case("ndjson") {
25 JsonFormat::JsonLines
26 } else {
27 JsonFormat::Json
28 };
29
30 let file = File::open(path)?;
31 let df = JsonReader::new(file)
32 .with_json_format(json_format)
33 .finish()
34 .map_err(|e| json_polars_error("failed to read json with polars", e))?;
35
36 for field in &schema.fields {
39 if !field.name.contains('.') && df.column(&field.name).is_err() {
40 return Err(IngestionError::SchemaMismatch {
41 message: format!("missing required field '{}'", field.name),
42 });
43 }
44 }
45
46 let mut lf = df.lazy();
47
48 let mut derived: Vec<Expr> = Vec::new();
50 for field in &schema.fields {
51 if field.name.contains('.') {
52 derived.push(expr_for_dot_path(&field.name));
53 }
54 }
55 if !derived.is_empty() {
56 lf = lf.with_columns(derived);
57 }
58
59 let projection: Vec<Expr> = schema.fields.iter().map(|f| col(&f.name)).collect();
61 let projected = lf
62 .select(projection)
63 .collect()
64 .map_err(|e| json_polars_error("failed to project json fields", e))?;
65
66 dataframe_to_dataset(&projected, schema, "field", 1)
67}
68
69fn json_polars_error(action: &str, err: PolarsError) -> IngestionError {
70 match err {
71 PolarsError::ColumnNotFound(name) => IngestionError::SchemaMismatch {
72 message: format!("missing required field '{name}'"),
73 },
74 other => polars_error_to_ingestion(action, other),
75 }
76}
77
78fn expr_for_dot_path(path: &str) -> Expr {
79 let mut iter = path.split('.');
80 let root = iter.next().unwrap_or(path);
81 let mut expr = col(root);
82 for seg in iter {
83 expr = expr.struct_().field_by_name(seg);
84 }
85 expr.alias(path)
86}
87
88pub fn ingest_json_from_str(input: &str, schema: &Schema) -> IngestionResult<DataSet> {
90 let trimmed = input.trim();
91 if trimmed.is_empty() {
92 return Err(IngestionError::SchemaMismatch {
93 message: "json input is empty".to_string(),
94 });
95 }
96
97 if let Ok(v) = serde_json::from_str::<serde_json::Value>(trimmed) {
99 match v {
100 serde_json::Value::Array(items) => ingest_json_values(&items, schema),
101 serde_json::Value::Object(_) => ingest_json_values(&vec![v], schema),
102 _ => Err(IngestionError::SchemaMismatch {
103 message: "json must be an object, an array of objects, or NDJSON".to_string(),
104 }),
105 }
106 } else {
107 let mut values = Vec::new();
109 for (i, line) in trimmed.lines().enumerate() {
110 let line = line.trim();
111 if line.is_empty() {
112 continue;
113 }
114 let v = serde_json::from_str::<serde_json::Value>(line).map_err(|e| {
115 IngestionError::SchemaMismatch {
116 message: format!("invalid ndjson at line {}: {}", i + 1, e),
117 }
118 })?;
119 values.push(v);
120 }
121 ingest_json_values(&values, schema)
122 }
123}
124
125fn ingest_json_values(values: &[serde_json::Value], schema: &Schema) -> IngestionResult<DataSet> {
126 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(values.len());
127
128 for (idx0, v) in values.iter().enumerate() {
129 let row_num = idx0 + 1;
130 let obj = v
131 .as_object()
132 .ok_or_else(|| IngestionError::SchemaMismatch {
133 message: format!("row {row_num} is not a json object"),
134 })?;
135
136 let mut row: Vec<Value> = Vec::with_capacity(schema.fields.len());
137 for field in &schema.fields {
138 let jv = get_by_dot_path(obj, &field.name).ok_or_else(|| {
139 IngestionError::SchemaMismatch {
140 message: format!("row {row_num} missing required field '{}'", field.name),
141 }
142 })?;
143 row.push(convert_json_value(
144 row_num,
145 &field.name,
146 &field.data_type,
147 jv,
148 )?);
149 }
150 rows.push(row);
151 }
152
153 Ok(DataSet::new(schema.clone(), rows))
154}
155
156fn get_by_dot_path<'a>(
157 root: &'a serde_json::Map<String, serde_json::Value>,
158 path: &str,
159) -> Option<&'a serde_json::Value> {
160 let mut current: &serde_json::Value = root.get(path.split('.').next().unwrap_or(path))?;
161
162 if !path.contains('.') {
164 return Some(current);
165 }
166
167 for segment in path.split('.').skip(1) {
168 match current {
169 serde_json::Value::Object(map) => current = map.get(segment)?,
170 _ => return None,
171 }
172 }
173 Some(current)
174}
175
176fn convert_json_value(
177 row: usize,
178 column: &str,
179 data_type: &DataType,
180 v: &serde_json::Value,
181) -> IngestionResult<Value> {
182 if v.is_null() {
183 return Ok(Value::Null);
184 }
185
186 match data_type {
187 DataType::Utf8 => v
188 .as_str()
189 .map(|s| Value::Utf8(s.to_string()))
190 .ok_or_else(|| IngestionError::ParseError {
191 row,
192 column: column.to_string(),
193 raw: v.to_string(),
194 message: "expected string".to_string(),
195 }),
196 DataType::Bool => v
197 .as_bool()
198 .map(Value::Bool)
199 .ok_or_else(|| IngestionError::ParseError {
200 row,
201 column: column.to_string(),
202 raw: v.to_string(),
203 message: "expected bool".to_string(),
204 }),
205 DataType::Int64 => {
206 if let Some(n) = v.as_i64() {
207 Ok(Value::Int64(n))
208 } else if let Some(n) = v.as_u64() {
209 i64::try_from(n)
210 .map(Value::Int64)
211 .map_err(|_| IngestionError::ParseError {
212 row,
213 column: column.to_string(),
214 raw: v.to_string(),
215 message: "u64 out of range for i64".to_string(),
216 })
217 } else {
218 Err(IngestionError::ParseError {
219 row,
220 column: column.to_string(),
221 raw: v.to_string(),
222 message: "expected integer number".to_string(),
223 })
224 }
225 }
226 DataType::Float64 => {
227 v.as_f64()
228 .map(Value::Float64)
229 .ok_or_else(|| IngestionError::ParseError {
230 row,
231 column: column.to_string(),
232 raw: v.to_string(),
233 message: "expected number".to_string(),
234 })
235 }
236 }
237}