rust_data_processing/ingestion/
json.rs

1//! JSON ingestion implementation.
2//!
3//! Supported inputs:
4//! - A JSON array of objects: `[{"a":1}, {"a":2}]`
5//! - Newline-delimited JSON (NDJSON): `{"a":1}\n{"a":2}\n`
6//!
7//! Nested fields are supported using dot paths in schema field names (e.g. `user.name`).
8
9use std::fs::File;
10use std::path::Path;
11
12use crate::error::{IngestionError, IngestionResult};
13use crate::types::{DataSet, DataType, Schema, Value};
14
15use polars::prelude::*;
16
17use super::polars_bridge::{dataframe_to_dataset, polars_error_to_ingestion};
18
19/// Ingest JSON into an in-memory `DataSet`.
20pub fn ingest_json_from_path(path: impl AsRef<Path>, schema: &Schema) -> IngestionResult<DataSet> {
21    let path = path.as_ref();
22    let ext = path.extension().and_then(|s| s.to_str()).unwrap_or("");
23
24    let json_format = if ext.eq_ignore_ascii_case("ndjson") {
25        JsonFormat::JsonLines
26    } else {
27        JsonFormat::Json
28    };
29
30    let file = File::open(path)?;
31    let df = JsonReader::new(file)
32        .with_json_format(json_format)
33        .finish()
34        .map_err(|e| json_polars_error("failed to read json with polars", e))?;
35
36    // Ensure required top-level fields exist before we build any lazy projections.
37    // (Dotted fields like `user.name` are handled via derived columns below.)
38    for field in &schema.fields {
39        if !field.name.contains('.') && df.column(&field.name).is_err() {
40            return Err(IngestionError::SchemaMismatch {
41                message: format!("missing required field '{}'", field.name),
42            });
43        }
44    }
45
46    let mut lf = df.lazy();
47
48    // Add derived columns for dotted schema field paths (e.g. `user.name`).
49    let mut derived: Vec<Expr> = Vec::new();
50    for field in &schema.fields {
51        if field.name.contains('.') {
52            derived.push(expr_for_dot_path(&field.name));
53        }
54    }
55    if !derived.is_empty() {
56        lf = lf.with_columns(derived);
57    }
58
59    // Select only the schema columns in schema order, then convert to DataSet.
60    let projection: Vec<Expr> = schema.fields.iter().map(|f| col(&f.name)).collect();
61    let projected = lf
62        .select(projection)
63        .collect()
64        .map_err(|e| json_polars_error("failed to project json fields", e))?;
65
66    dataframe_to_dataset(&projected, schema, "field", 1)
67}
68
69fn json_polars_error(action: &str, err: PolarsError) -> IngestionError {
70    match err {
71        PolarsError::ColumnNotFound(name) => IngestionError::SchemaMismatch {
72            message: format!("missing required field '{name}'"),
73        },
74        other => polars_error_to_ingestion(action, other),
75    }
76}
77
78fn expr_for_dot_path(path: &str) -> Expr {
79    let mut iter = path.split('.');
80    let root = iter.next().unwrap_or(path);
81    let mut expr = col(root);
82    for seg in iter {
83        expr = expr.struct_().field_by_name(seg);
84    }
85    expr.alias(path)
86}
87
88/// Ingest JSON from an in-memory string into a [`DataSet`].
89pub fn ingest_json_from_str(input: &str, schema: &Schema) -> IngestionResult<DataSet> {
90    let trimmed = input.trim();
91    if trimmed.is_empty() {
92        return Err(IngestionError::SchemaMismatch {
93            message: "json input is empty".to_string(),
94        });
95    }
96
97    // First try parsing as a single JSON value (array or object).
98    if let Ok(v) = serde_json::from_str::<serde_json::Value>(trimmed) {
99        match v {
100            serde_json::Value::Array(items) => ingest_json_values(&items, schema),
101            serde_json::Value::Object(_) => ingest_json_values(&vec![v], schema),
102            _ => Err(IngestionError::SchemaMismatch {
103                message: "json must be an object, an array of objects, or NDJSON".to_string(),
104            }),
105        }
106    } else {
107        // Fall back to NDJSON.
108        let mut values = Vec::new();
109        for (i, line) in trimmed.lines().enumerate() {
110            let line = line.trim();
111            if line.is_empty() {
112                continue;
113            }
114            let v = serde_json::from_str::<serde_json::Value>(line).map_err(|e| {
115                IngestionError::SchemaMismatch {
116                    message: format!("invalid ndjson at line {}: {}", i + 1, e),
117                }
118            })?;
119            values.push(v);
120        }
121        ingest_json_values(&values, schema)
122    }
123}
124
125fn ingest_json_values(values: &[serde_json::Value], schema: &Schema) -> IngestionResult<DataSet> {
126    let mut rows: Vec<Vec<Value>> = Vec::with_capacity(values.len());
127
128    for (idx0, v) in values.iter().enumerate() {
129        let row_num = idx0 + 1;
130        let obj = v
131            .as_object()
132            .ok_or_else(|| IngestionError::SchemaMismatch {
133                message: format!("row {row_num} is not a json object"),
134            })?;
135
136        let mut row: Vec<Value> = Vec::with_capacity(schema.fields.len());
137        for field in &schema.fields {
138            let jv = get_by_dot_path(obj, &field.name).ok_or_else(|| {
139                IngestionError::SchemaMismatch {
140                    message: format!("row {row_num} missing required field '{}'", field.name),
141                }
142            })?;
143            row.push(convert_json_value(
144                row_num,
145                &field.name,
146                &field.data_type,
147                jv,
148            )?);
149        }
150        rows.push(row);
151    }
152
153    Ok(DataSet::new(schema.clone(), rows))
154}
155
156fn get_by_dot_path<'a>(
157    root: &'a serde_json::Map<String, serde_json::Value>,
158    path: &str,
159) -> Option<&'a serde_json::Value> {
160    let mut current: &serde_json::Value = root.get(path.split('.').next().unwrap_or(path))?;
161
162    // If there are no dots, short-circuit.
163    if !path.contains('.') {
164        return Some(current);
165    }
166
167    for segment in path.split('.').skip(1) {
168        match current {
169            serde_json::Value::Object(map) => current = map.get(segment)?,
170            _ => return None,
171        }
172    }
173    Some(current)
174}
175
176fn convert_json_value(
177    row: usize,
178    column: &str,
179    data_type: &DataType,
180    v: &serde_json::Value,
181) -> IngestionResult<Value> {
182    if v.is_null() {
183        return Ok(Value::Null);
184    }
185
186    match data_type {
187        DataType::Utf8 => v
188            .as_str()
189            .map(|s| Value::Utf8(s.to_string()))
190            .ok_or_else(|| IngestionError::ParseError {
191                row,
192                column: column.to_string(),
193                raw: v.to_string(),
194                message: "expected string".to_string(),
195            }),
196        DataType::Bool => v
197            .as_bool()
198            .map(Value::Bool)
199            .ok_or_else(|| IngestionError::ParseError {
200                row,
201                column: column.to_string(),
202                raw: v.to_string(),
203                message: "expected bool".to_string(),
204            }),
205        DataType::Int64 => {
206            if let Some(n) = v.as_i64() {
207                Ok(Value::Int64(n))
208            } else if let Some(n) = v.as_u64() {
209                i64::try_from(n)
210                    .map(Value::Int64)
211                    .map_err(|_| IngestionError::ParseError {
212                        row,
213                        column: column.to_string(),
214                        raw: v.to_string(),
215                        message: "u64 out of range for i64".to_string(),
216                    })
217            } else {
218                Err(IngestionError::ParseError {
219                    row,
220                    column: column.to_string(),
221                    raw: v.to_string(),
222                    message: "expected integer number".to_string(),
223                })
224            }
225        }
226        DataType::Float64 => {
227            v.as_f64()
228                .map(Value::Float64)
229                .ok_or_else(|| IngestionError::ParseError {
230                    row,
231                    column: column.to_string(),
232                    raw: v.to_string(),
233                    message: "expected number".to_string(),
234                })
235        }
236    }
237}