rust_data_processing/sql/mod.rs
1//! SQL support (Polars-backed).
2//!
3//! This module is implemented as a thin wrapper around Polars SQL: it compiles SQL into a Polars
4//! logical plan (a `LazyFrame`) and returns a [`crate::pipeline::DataFrame`].
5//!
6//! Design goals:
7//! - Keep public signatures in crate types (no Polars types in signatures)
8//! - Preserve underlying engine errors via `IngestionError::Engine { source, .. }`
9
10use crate::error::{IngestionError, IngestionResult};
11use crate::pipeline::DataFrame;
12
13use polars_sql::SQLContext;
14
/// Table name under which [`query`] registers its single input [`DataFrame`].
///
/// SQL passed to [`query`] must refer to the input by this name, e.g.
/// `SELECT ... FROM df`.
pub const DEFAULT_TABLE: &str = "df";
17
18/// Execute a SQL query against a single [`DataFrame`].
19///
20/// The input is registered as the table [`DEFAULT_TABLE`] (i.e. `df`), so callers should write
21/// queries like: `SELECT ... FROM df WHERE ...`.
22///
23/// # Example
24///
25/// ```no_run
26/// use rust_data_processing::pipeline::DataFrame;
27/// use rust_data_processing::sql;
28/// use rust_data_processing::types::{DataSet, DataType, Field, Schema, Value};
29///
30/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
31/// let ds = DataSet::new(
32/// Schema::new(vec![
33/// Field::new("id", DataType::Int64),
34/// Field::new("active", DataType::Bool),
35/// ]),
36/// vec![
37/// vec![Value::Int64(1), Value::Bool(true)],
38/// vec![Value::Int64(2), Value::Bool(false)],
39/// ],
40/// );
41///
42/// let out = sql::query(
43/// &DataFrame::from_dataset(&ds)?,
44/// "SELECT id FROM df WHERE active = TRUE ORDER BY id",
45/// )?
46/// .collect()?;
47///
48/// assert_eq!(out.row_count(), 1);
49/// # Ok(())
50/// # }
51/// ```
52pub fn query(df: &DataFrame, sql: &str) -> IngestionResult<DataFrame> {
53 let mut ctx = Context::new();
54 ctx.register(DEFAULT_TABLE, df)?;
55 ctx.execute(sql)
56}
57
/// A SQL execution context that can register multiple tables and execute queries.
///
/// Wraps a Polars `SQLContext` without exposing it in any public signature. Tables are
/// added with [`Context::register`] and queried with [`Context::execute`].
///
/// This is the preferred entrypoint for JOINs across multiple [`DataFrame`]s; for a
/// single table, [`query`] is a shorter alternative.
///
/// # Example (JOIN)
///
/// ```no_run
/// use rust_data_processing::pipeline::DataFrame;
/// use rust_data_processing::sql;
/// use rust_data_processing::types::{DataSet, DataType, Field, Schema, Value};
///
/// # fn main() -> Result<(), rust_data_processing::IngestionError> {
/// let people = DataSet::new(
///     Schema::new(vec![
///         Field::new("id", DataType::Int64),
///         Field::new("name", DataType::Utf8),
///     ]),
///     vec![
///         vec![Value::Int64(1), Value::Utf8("Ada".to_string())],
///         vec![Value::Int64(2), Value::Utf8("Grace".to_string())],
///     ],
/// );
/// let scores = DataSet::new(
///     Schema::new(vec![
///         Field::new("id", DataType::Int64),
///         Field::new("score", DataType::Float64),
///     ]),
///     vec![vec![Value::Int64(1), Value::Float64(98.5)]],
/// );
///
/// let mut ctx = sql::Context::new();
/// ctx.register("people", &DataFrame::from_dataset(&people)?)?;
/// ctx.register("scores", &DataFrame::from_dataset(&scores)?)?;
///
/// let out = ctx
///     .execute("SELECT p.id, p.name, s.score FROM people p JOIN scores s ON p.id = s.id")?
///     .collect()?;
///
/// assert_eq!(out.row_count(), 1);
/// # Ok(())
/// # }
/// ```
pub struct Context {
    // Underlying Polars SQL context; kept private so no Polars type leaks into the API.
    inner: SQLContext,
}
103
104impl Context {
105 /// Create an empty SQL context.
106 pub fn new() -> Self {
107 Self {
108 inner: SQLContext::new(),
109 }
110 }
111
112 /// Register a [`DataFrame`] as a SQL table.
113 pub fn register(&mut self, name: &str, df: &DataFrame) -> IngestionResult<()> {
114 if name.trim().is_empty() {
115 return Err(IngestionError::SchemaMismatch {
116 message: "sql table name must be non-empty".to_string(),
117 });
118 }
119 self.inner.register(name, df.lazy_clone());
120 Ok(())
121 }
122
123 /// Execute a SQL query and return a lazy [`DataFrame`].
124 pub fn execute(&mut self, sql: &str) -> IngestionResult<DataFrame> {
125 let lf = self
126 .inner
127 .execute(sql)
128 .map_err(|e| IngestionError::Engine {
129 message: "failed to execute sql query".to_string(),
130 source: Box::new(e),
131 })?;
132 Ok(DataFrame::from_lazyframe(lf))
133 }
134}
135
136impl Default for Context {
137 fn default() -> Self {
138 Self::new()
139 }
140}