rust_data_processing/execution/
mod.rs

1//! Execution engine for running processing pipelines with configurable parallelism.
2//!
3//! This module sits "above" [`crate::processing`] and provides:
4//!
5//! - Parallel (chunked) execution for filter/map
6//! - Resource limits / throttling (e.g., in-flight chunks)
7//! - Real-time metrics + observer hooks for monitoring
8
9mod observer;
10mod semaphore;
11
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15use rayon::ThreadPool;
16use rayon::ThreadPoolBuilder;
17use rayon::prelude::*;
18
19use crate::processing::{ReduceOp, reduce};
20use crate::types::{DataSet, Value};
21
22pub use observer::{
23    ExecutionEvent, ExecutionMetrics, ExecutionMetricsSnapshot, ExecutionObserver,
24    StdErrExecutionObserver,
25};
26
27use semaphore::Semaphore;
28
/// Configuration for the [`ExecutionEngine`].
#[derive(Debug, Clone)]
pub struct ExecutionOptions {
    /// Worker-thread count for the engine's dedicated pool.
    ///
    /// `None` means "use the platform's available parallelism".
    pub num_threads: Option<usize>,
    /// How many rows go into each chunk.
    ///
    /// Chunking lets the engine bound working-set size and is the unit of
    /// throttling.
    pub chunk_size: usize,
    /// Maximum number of chunks allowed to execute at the same time.
    ///
    /// An additional throttle layered on top of `num_threads`.
    pub max_in_flight_chunks: usize,
}

impl Default for ExecutionOptions {
    /// Defaults: one thread per available CPU (falling back to 1),
    /// 4096-row chunks, and as many in-flight chunks as threads.
    fn default() -> Self {
        let threads = std::thread::available_parallelism().map_or(1, |p| p.get());
        ExecutionOptions {
            num_threads: Some(threads),
            chunk_size: 4_096,
            max_in_flight_chunks: threads.max(1),
        }
    }
}
58
59/// A configurable execution engine for in-memory [`DataSet`] pipelines.
60pub struct ExecutionEngine {
61    pool: ThreadPool,
62    opts: ExecutionOptions,
63    observer: Option<Arc<dyn ExecutionObserver>>,
64    metrics: Arc<ExecutionMetrics>,
65}
66
67impl ExecutionEngine {
68    /// Create a new engine with the given options.
69    ///
70    /// # Panics
71    ///
72    /// Panics if `chunk_size == 0`, `max_in_flight_chunks == 0`, or `num_threads == Some(0)`.
73    pub fn new(opts: ExecutionOptions) -> Self {
74        assert!(opts.chunk_size > 0, "chunk_size must be > 0");
75        assert!(
76            opts.max_in_flight_chunks > 0,
77            "max_in_flight_chunks must be > 0"
78        );
79        if let Some(n) = opts.num_threads {
80            assert!(n > 0, "num_threads must be > 0 when set");
81        }
82
83        let n_threads = opts
84            .num_threads
85            .unwrap_or_else(|| {
86                std::thread::available_parallelism()
87                    .map(|n| n.get())
88                    .unwrap_or(1)
89            })
90            .max(1);
91
92        let pool = ThreadPoolBuilder::new()
93            .num_threads(n_threads)
94            .build()
95            .expect("failed to build rayon thread pool");
96
97        Self {
98            pool,
99            opts: opts.clone(),
100            observer: None,
101            metrics: Arc::new(ExecutionMetrics::new()),
102        }
103    }
104
105    /// Attach an observer for execution events (metrics/logging).
106    pub fn with_observer(mut self, observer: Arc<dyn ExecutionObserver>) -> Self {
107        self.observer = Some(observer);
108        self
109    }
110
111    /// Get a handle to real-time execution metrics.
112    pub fn metrics(&self) -> Arc<ExecutionMetrics> {
113        Arc::clone(&self.metrics)
114    }
115
116    /// Execute a parallel filter over the dataset.
117    pub fn filter_parallel<F>(&self, dataset: &DataSet, predicate: F) -> DataSet
118    where
119        F: Fn(&[Value]) -> bool + Send + Sync,
120    {
121        self.pool
122            .install(|| self.filter_parallel_impl(dataset, &predicate))
123    }
124
125    fn filter_parallel_impl(
126        &self,
127        dataset: &DataSet,
128        predicate: &(dyn Fn(&[Value]) -> bool + Send + Sync),
129    ) -> DataSet {
130        let start = Instant::now();
131        self.metrics.begin_run();
132        self.emit(ExecutionEvent::RunStarted);
133
134        let sem = Semaphore::new(self.opts.max_in_flight_chunks);
135        let chunk_ranges = chunk_ranges(dataset.row_count(), self.opts.chunk_size);
136
137        let per_chunk: Vec<Vec<Vec<Value>>> = chunk_ranges
138            .into_par_iter()
139            .map(|range| {
140                let waited = sem.acquire();
141                if waited > Duration::ZERO {
142                    self.metrics.on_throttle_wait(waited);
143                    self.emit(ExecutionEvent::ThrottleWaited { duration: waited });
144                }
145
146                self.metrics.on_chunk_start();
147                self.emit(ExecutionEvent::ChunkStarted {
148                    start_row: range.start,
149                    row_count: range.end - range.start,
150                });
151
152                let mut out = Vec::new();
153                for row in &dataset.rows[range] {
154                    self.metrics.on_row_processed();
155                    if predicate(row.as_slice()) {
156                        out.push(row.clone());
157                    }
158                }
159
160                self.emit(ExecutionEvent::ChunkFinished {
161                    output_rows: out.len(),
162                });
163                self.metrics.on_chunk_end();
164                sem.release();
165                out
166            })
167            .collect();
168
169        let rows = per_chunk.into_iter().flatten().collect::<Vec<_>>();
170        let out = DataSet::new(dataset.schema.clone(), rows);
171
172        self.metrics.end_run(start.elapsed());
173        self.emit(ExecutionEvent::RunFinished {
174            elapsed: start.elapsed(),
175            metrics: self.metrics.snapshot(),
176        });
177
178        out
179    }
180
181    /// Execute a parallel map over the dataset.
182    ///
183    /// # Panics
184    ///
185    /// Panics if `mapper` returns rows with a different length than the schema field count.
186    pub fn map_parallel<F>(&self, dataset: &DataSet, mapper: F) -> DataSet
187    where
188        F: Fn(&[Value]) -> Vec<Value> + Send + Sync,
189    {
190        self.pool
191            .install(|| self.map_parallel_impl(dataset, &mapper))
192    }
193
194    fn map_parallel_impl(
195        &self,
196        dataset: &DataSet,
197        mapper: &(dyn Fn(&[Value]) -> Vec<Value> + Send + Sync),
198    ) -> DataSet {
199        let start = Instant::now();
200        self.metrics.begin_run();
201        self.emit(ExecutionEvent::RunStarted);
202
203        let expected_len = dataset.schema.fields.len();
204        let sem = Semaphore::new(self.opts.max_in_flight_chunks);
205        let chunk_ranges = chunk_ranges(dataset.row_count(), self.opts.chunk_size);
206
207        let per_chunk: Vec<Vec<Vec<Value>>> = chunk_ranges
208            .into_par_iter()
209            .map(|range| {
210                let waited = sem.acquire();
211                if waited > Duration::ZERO {
212                    self.metrics.on_throttle_wait(waited);
213                    self.emit(ExecutionEvent::ThrottleWaited { duration: waited });
214                }
215
216                self.metrics.on_chunk_start();
217                self.emit(ExecutionEvent::ChunkStarted {
218                    start_row: range.start,
219                    row_count: range.end - range.start,
220                });
221
222                let mut out = Vec::with_capacity(range.end - range.start);
223                for row in &dataset.rows[range] {
224                    self.metrics.on_row_processed();
225                    let mapped = mapper(row.as_slice());
226                    assert!(
227                        mapped.len() == expected_len,
228                        "mapped row length {} does not match schema length {}",
229                        mapped.len(),
230                        expected_len
231                    );
232                    out.push(mapped);
233                }
234
235                self.emit(ExecutionEvent::ChunkFinished {
236                    output_rows: out.len(),
237                });
238                self.metrics.on_chunk_end();
239                sem.release();
240                out
241            })
242            .collect();
243
244        let rows = per_chunk.into_iter().flatten().collect::<Vec<_>>();
245        let out = DataSet::new(dataset.schema.clone(), rows);
246
247        self.metrics.end_run(start.elapsed());
248        self.emit(ExecutionEvent::RunFinished {
249            elapsed: start.elapsed(),
250            metrics: self.metrics.snapshot(),
251        });
252
253        out
254    }
255
256    /// Reduce a column using the existing built-in reduce operation.
257    ///
258    /// This is currently sequential, but is tracked via the observer/metrics hooks.
259    pub fn reduce(&self, dataset: &DataSet, column: &str, op: ReduceOp) -> Option<Value> {
260        let start = Instant::now();
261        self.metrics.begin_run();
262        self.emit(ExecutionEvent::RunStarted);
263        self.emit(ExecutionEvent::ReduceStarted {
264            column: column.to_string(),
265            op,
266        });
267
268        let out = reduce(dataset, column, op);
269
270        self.emit(ExecutionEvent::ReduceFinished {
271            result: out.clone(),
272        });
273        self.metrics.end_run(start.elapsed());
274        self.emit(ExecutionEvent::RunFinished {
275            elapsed: start.elapsed(),
276            metrics: self.metrics.snapshot(),
277        });
278        out
279    }
280
281    fn emit(&self, event: ExecutionEvent) {
282        if let Some(obs) = &self.observer {
283            obs.on_event(&event);
284        }
285    }
286}
287
/// Split `row_count` rows into consecutive half-open ranges of at most
/// `chunk_size` rows each; the final range may be shorter.
///
/// Returns an empty vec when `row_count == 0`.
///
/// # Panics
///
/// Panics (division by zero) if `chunk_size == 0` while `row_count > 0`;
/// `ExecutionEngine::new` validates `chunk_size` before this is reached.
fn chunk_ranges(row_count: usize, chunk_size: usize) -> Vec<std::ops::Range<usize>> {
    if row_count == 0 {
        return Vec::new();
    }
    // Ceiling division without the `row_count + chunk_size - 1` form, which
    // overflows `usize` for very large inputs (panics in debug builds).
    let n_chunks = row_count / chunk_size + usize::from(row_count % chunk_size != 0);
    let mut out = Vec::with_capacity(n_chunks);
    let mut start = 0usize;
    while start < row_count {
        // `saturating_add` guards against `start + chunk_size` overflowing;
        // `min` then clamps the final range to the dataset size.
        let end = start.saturating_add(chunk_size).min(row_count);
        out.push(start..end);
        start = end;
    }
    out
}
301
#[cfg(test)]
mod tests {
    use super::{ExecutionEngine, ExecutionOptions};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;

    use crate::execution::{ExecutionEvent, ExecutionObserver};
    use crate::types::{DataSet, DataType, Field, Schema, Value};

    /// Build a one-column (`id: Int64`) dataset with rows `0..n`.
    fn dataset_of_n(n: usize) -> DataSet {
        let schema = Schema::new(vec![Field::new("id", DataType::Int64)]);
        let mut rows = Vec::with_capacity(n);
        for i in 0..n as i64 {
            rows.push(vec![Value::Int64(i)]);
        }
        DataSet::new(schema, rows)
    }

    /// With 4 threads, 1-row chunks, and 4 in-flight slots, at least two
    /// rows should be observed executing simultaneously inside the mapper.
    #[test]
    fn map_parallel_runs_with_concurrency() {
        let ds = dataset_of_n(400);
        let engine = ExecutionEngine::new(ExecutionOptions {
            num_threads: Some(4),
            chunk_size: 1,
            max_in_flight_chunks: 4,
        });

        // `active` counts mappers currently inside the closure; `max_active`
        // records the highest concurrency level ever observed.
        let active = Arc::new(AtomicUsize::new(0));
        let max_active = Arc::new(AtomicUsize::new(0));

        let active2 = Arc::clone(&active);
        let max_active2 = Arc::clone(&max_active);

        let out = engine.map_parallel(&ds, move |row| {
            let now = active2.fetch_add(1, Ordering::SeqCst) + 1;
            // Lock-free max update: max = max(max, now). Retry the CAS until
            // we either observe a value >= `now` or win the exchange.
            loop {
                let cur = max_active2.load(Ordering::SeqCst);
                if now <= cur {
                    break;
                }
                if max_active2
                    .compare_exchange(cur, now, Ordering::SeqCst, Ordering::SeqCst)
                    .is_ok()
                {
                    break;
                }
            }

            // Keep each row busy long enough for other workers to overlap.
            std::thread::sleep(Duration::from_millis(2));
            let _ = active2.fetch_sub(1, Ordering::SeqCst);

            let v = match row[0] {
                Value::Int64(x) => x + 1,
                _ => 0,
            };
            vec![Value::Int64(v)]
        });

        assert_eq!(out.row_count(), ds.row_count());
        // More than one mapper ran at once, so execution was truly parallel.
        assert!(max_active.load(Ordering::SeqCst) > 1);
    }

    /// Observer that tracks chunk-level concurrency from Chunk* events.
    struct ConcurrencyObserver {
        // Chunks currently between ChunkStarted and ChunkFinished.
        active_chunks: AtomicUsize,
        // Highest value `active_chunks` ever reached.
        max_active_chunks: AtomicUsize,
    }

    impl ConcurrencyObserver {
        fn new() -> Self {
            Self {
                active_chunks: AtomicUsize::new(0),
                max_active_chunks: AtomicUsize::new(0),
            }
        }
        /// Peak number of concurrently active chunks seen so far.
        fn max(&self) -> usize {
            self.max_active_chunks.load(Ordering::SeqCst)
        }
        /// Lock-free `max_active_chunks = max(max_active_chunks, now)`.
        fn bump_max(&self, now: usize) {
            loop {
                let cur = self.max_active_chunks.load(Ordering::SeqCst);
                if now <= cur {
                    break;
                }
                if self
                    .max_active_chunks
                    .compare_exchange(cur, now, Ordering::SeqCst, Ordering::SeqCst)
                    .is_ok()
                {
                    break;
                }
            }
        }
    }

    impl ExecutionObserver for ConcurrencyObserver {
        fn on_event(&self, event: &ExecutionEvent) {
            match event {
                ExecutionEvent::ChunkStarted { .. } => {
                    let now = self.active_chunks.fetch_add(1, Ordering::SeqCst) + 1;
                    self.bump_max(now);
                }
                ExecutionEvent::ChunkFinished { .. } => {
                    let _ = self.active_chunks.fetch_sub(1, Ordering::SeqCst);
                }
                _ => {}
            }
        }
    }

    /// With 4 threads but only 1 in-flight slot, chunk concurrency as seen
    /// by the observer must never exceed 1.
    #[test]
    fn max_in_flight_chunks_throttles_chunk_concurrency() {
        let ds = dataset_of_n(100);
        let observer = Arc::new(ConcurrencyObserver::new());
        let obs_trait: Arc<dyn ExecutionObserver> = observer.clone();
        let engine = ExecutionEngine::new(ExecutionOptions {
            num_threads: Some(4),
            chunk_size: 1,
            max_in_flight_chunks: 1,
        })
        .with_observer(obs_trait);

        let out = engine.map_parallel(&ds, |_row| {
            // Make each chunk/row take long enough to overlap if not throttled.
            std::thread::sleep(Duration::from_millis(1));
            vec![Value::Int64(1)]
        });

        assert_eq!(out.row_count(), ds.row_count());
        assert_eq!(observer.max(), 1);
    }

    /// A run with 1-row chunks and a single in-flight slot should populate
    /// every metric: per-row/per-chunk counters, throttle wait, and elapsed.
    #[test]
    fn metrics_are_available_after_run() {
        let ds = dataset_of_n(60);
        let engine = ExecutionEngine::new(ExecutionOptions {
            num_threads: Some(4),
            chunk_size: 1,
            max_in_flight_chunks: 1,
        });
        let metrics = engine.metrics();

        let out = engine.map_parallel(&ds, |_row| {
            std::thread::sleep(Duration::from_millis(2));
            vec![Value::Int64(1)]
        });

        assert_eq!(out.row_count(), ds.row_count());

        let snap = metrics.snapshot();
        assert_eq!(snap.rows_processed, ds.row_count() as u64);
        // chunk_size == 1 means one chunk per row.
        assert_eq!(snap.chunks_started, ds.row_count() as u64);
        assert_eq!(snap.chunks_finished, ds.row_count() as u64);
        assert_eq!(snap.max_active_chunks, 1);
        // 4 threads contending for 1 slot guarantees some throttle wait.
        assert!(snap.throttle_wait > Duration::ZERO);
        assert!(snap.elapsed.is_some());
    }
}