// rust_data_processing/types.rs

1//! Core data model types for ingestion.
2//!
3//! This crate ingests supported formats into an in-memory [`DataSet`], using a user-provided
4//! [`Schema`] (a list of typed [`Field`]s).
5
6use serde::{Deserialize, Serialize};
7
8/// Logical data type for a schema field.
9#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
10pub enum DataType {
11    /// 64-bit signed integer.
12    Int64,
13    /// 64-bit floating point number.
14    Float64,
15    /// Boolean.
16    Bool,
17    /// UTF-8 string.
18    Utf8,
19}
20
/// A single named, typed field in a [`Schema`].
///
/// Fields are looked up by `name` (see [`Schema::index_of`]); `data_type`
/// describes the logical type of values stored in that column.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Field {
    /// Field/column name.
    pub name: String,
    /// Field data type.
    pub data_type: DataType,
}
29
30impl Field {
31    /// Create a new field.
32    pub fn new(name: impl Into<String>, data_type: DataType) -> Self {
33        Self {
34            name: name.into(),
35            data_type,
36        }
37    }
38}
39
/// A list of fields describing the expected shape of incoming data.
///
/// Field order is significant: rows in a [`DataSet`] store their values in
/// the same order as `fields`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Schema {
    /// Ordered list of fields.
    pub fields: Vec<Field>,
}
46
47impl Schema {
48    /// Create a new schema from fields.
49    pub fn new(fields: Vec<Field>) -> Self {
50        Self { fields }
51    }
52
53    /// Iterate field names in order.
54    pub fn field_names(&self) -> impl Iterator<Item = &str> {
55        self.fields.iter().map(|f| f.name.as_str())
56    }
57
58    /// Returns the index of a field by name, if present.
59    pub fn index_of(&self, name: &str) -> Option<usize> {
60        self.fields.iter().position(|f| f.name == name)
61    }
62}
63
/// A single typed value in a [`DataSet`].
///
/// Note: only `PartialEq` is derived (not `Eq`) because the `Float64` variant
/// wraps `f64`, whose equality is not total (`NaN != NaN`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum Value {
    /// Missing/empty value.
    Null,
    /// 64-bit signed integer.
    Int64(i64),
    /// 64-bit float.
    Float64(f64),
    /// Boolean.
    Bool(bool),
    /// UTF-8 string.
    Utf8(String),
}
78
/// In-memory tabular dataset.
///
/// Rows are stored as `Vec<Vec<Value>>` in the same order as the [`Schema`] fields.
/// NOTE(review): nothing in this type enforces that every row has
/// `schema.fields.len()` values — construction via [`DataSet::new`] accepts any
/// rows; only [`DataSet::map_rows`] checks row length.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DataSet {
    /// Schema describing row shape.
    pub schema: Schema,
    /// Row-major value storage.
    pub rows: Vec<Vec<Value>>,
}
89
90impl DataSet {
91    /// Create a dataset from schema and rows.
92    pub fn new(schema: Schema, rows: Vec<Vec<Value>>) -> Self {
93        Self { schema, rows }
94    }
95
96    /// Number of rows in the dataset.
97    pub fn row_count(&self) -> usize {
98        self.rows.len()
99    }
100
101    /// Create a new dataset containing only rows that match `predicate`.
102    ///
103    /// The returned dataset preserves the original schema.
104    pub fn filter_rows<F>(&self, mut predicate: F) -> Self
105    where
106        F: FnMut(&[Value]) -> bool,
107    {
108        let rows = self
109            .rows
110            .iter()
111            .filter(|row| predicate(row.as_slice()))
112            .cloned()
113            .collect();
114        Self {
115            schema: self.schema.clone(),
116            rows,
117        }
118    }
119
120    /// Create a new dataset by applying `mapper` to every row.
121    ///
122    /// The returned dataset preserves the original schema.
123    ///
124    /// # Panics
125    ///
126    /// Panics if `mapper` returns a row with a different length than the schema field count.
127    pub fn map_rows<F>(&self, mut mapper: F) -> Self
128    where
129        F: FnMut(&[Value]) -> Vec<Value>,
130    {
131        let expected_len = self.schema.fields.len();
132        let rows = self
133            .rows
134            .iter()
135            .map(|row| {
136                let out = mapper(row.as_slice());
137                assert!(
138                    out.len() == expected_len,
139                    "mapped row length {} does not match schema length {}",
140                    out.len(),
141                    expected_len
142                );
143                out
144            })
145            .collect();
146
147        Self {
148            schema: self.schema.clone(),
149            rows,
150        }
151    }
152
153    /// Reduce (fold) all rows into an accumulator value.
154    ///
155    /// This is similar to `Iterator::fold`, but provides each row as `&[Value]`.
156    pub fn reduce_rows<A, F>(&self, init: A, mut reducer: F) -> A
157    where
158        F: FnMut(A, &[Value]) -> A,
159    {
160        self.rows
161            .iter()
162            .fold(init, |acc, row| reducer(acc, row.as_slice()))
163    }
164}