//! Execution engine module (`rust_data_processing/execution/mod.rs`).

mod observer;
mod semaphore;
11
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15use rayon::ThreadPool;
16use rayon::ThreadPoolBuilder;
17use rayon::prelude::*;
18
19use crate::processing::{ReduceOp, reduce};
20use crate::types::{DataSet, Value};
21
22pub use observer::{
23 ExecutionEvent, ExecutionMetrics, ExecutionMetricsSnapshot, ExecutionObserver,
24 StdErrExecutionObserver,
25};
26
27use semaphore::Semaphore;
28
/// Tuning knobs for an [`ExecutionEngine`] run.
#[derive(Debug, Clone)]
pub struct ExecutionOptions {
    /// Worker threads for the dedicated rayon pool. `None` means use
    /// `std::thread::available_parallelism()` (falling back to 1).
    /// Must be > 0 when `Some` — `ExecutionEngine::new` asserts this.
    pub num_threads: Option<usize>,
    /// Rows per parallel work unit. Must be > 0 (asserted in `new`).
    pub chunk_size: usize,
    /// Upper bound on chunks processed concurrently; enforced with a
    /// semaphore during runs. Must be > 0 (asserted in `new`).
    pub max_in_flight_chunks: usize,
}
45
46impl Default for ExecutionOptions {
47 fn default() -> Self {
48 let n = std::thread::available_parallelism()
49 .map(|n| n.get())
50 .unwrap_or(1);
51 Self {
52 num_threads: Some(n),
53 chunk_size: 4_096,
54 max_in_flight_chunks: n.max(1),
55 }
56 }
57}
58
/// Runs dataset operations on a dedicated rayon thread pool, optionally
/// reporting [`ExecutionEvent`]s to an observer and accumulating shared
/// [`ExecutionMetrics`] across runs.
pub struct ExecutionEngine {
    // Dedicated pool sized in `new`; all parallel work is `install`ed here.
    pool: ThreadPool,
    // Chunking / throttling configuration captured at construction.
    opts: ExecutionOptions,
    // Receives lifecycle events when set via `with_observer`.
    observer: Option<Arc<dyn ExecutionObserver>>,
    // Shared handle also exposed through the `metrics()` accessor.
    metrics: Arc<ExecutionMetrics>,
}
66
67impl ExecutionEngine {
68 pub fn new(opts: ExecutionOptions) -> Self {
74 assert!(opts.chunk_size > 0, "chunk_size must be > 0");
75 assert!(
76 opts.max_in_flight_chunks > 0,
77 "max_in_flight_chunks must be > 0"
78 );
79 if let Some(n) = opts.num_threads {
80 assert!(n > 0, "num_threads must be > 0 when set");
81 }
82
83 let n_threads = opts
84 .num_threads
85 .unwrap_or_else(|| {
86 std::thread::available_parallelism()
87 .map(|n| n.get())
88 .unwrap_or(1)
89 })
90 .max(1);
91
92 let pool = ThreadPoolBuilder::new()
93 .num_threads(n_threads)
94 .build()
95 .expect("failed to build rayon thread pool");
96
97 Self {
98 pool,
99 opts: opts.clone(),
100 observer: None,
101 metrics: Arc::new(ExecutionMetrics::new()),
102 }
103 }
104
105 pub fn with_observer(mut self, observer: Arc<dyn ExecutionObserver>) -> Self {
107 self.observer = Some(observer);
108 self
109 }
110
111 pub fn metrics(&self) -> Arc<ExecutionMetrics> {
113 Arc::clone(&self.metrics)
114 }
115
116 pub fn filter_parallel<F>(&self, dataset: &DataSet, predicate: F) -> DataSet
118 where
119 F: Fn(&[Value]) -> bool + Send + Sync,
120 {
121 self.pool
122 .install(|| self.filter_parallel_impl(dataset, &predicate))
123 }
124
125 fn filter_parallel_impl(
126 &self,
127 dataset: &DataSet,
128 predicate: &(dyn Fn(&[Value]) -> bool + Send + Sync),
129 ) -> DataSet {
130 let start = Instant::now();
131 self.metrics.begin_run();
132 self.emit(ExecutionEvent::RunStarted);
133
134 let sem = Semaphore::new(self.opts.max_in_flight_chunks);
135 let chunk_ranges = chunk_ranges(dataset.row_count(), self.opts.chunk_size);
136
137 let per_chunk: Vec<Vec<Vec<Value>>> = chunk_ranges
138 .into_par_iter()
139 .map(|range| {
140 let waited = sem.acquire();
141 if waited > Duration::ZERO {
142 self.metrics.on_throttle_wait(waited);
143 self.emit(ExecutionEvent::ThrottleWaited { duration: waited });
144 }
145
146 self.metrics.on_chunk_start();
147 self.emit(ExecutionEvent::ChunkStarted {
148 start_row: range.start,
149 row_count: range.end - range.start,
150 });
151
152 let mut out = Vec::new();
153 for row in &dataset.rows[range] {
154 self.metrics.on_row_processed();
155 if predicate(row.as_slice()) {
156 out.push(row.clone());
157 }
158 }
159
160 self.emit(ExecutionEvent::ChunkFinished {
161 output_rows: out.len(),
162 });
163 self.metrics.on_chunk_end();
164 sem.release();
165 out
166 })
167 .collect();
168
169 let rows = per_chunk.into_iter().flatten().collect::<Vec<_>>();
170 let out = DataSet::new(dataset.schema.clone(), rows);
171
172 self.metrics.end_run(start.elapsed());
173 self.emit(ExecutionEvent::RunFinished {
174 elapsed: start.elapsed(),
175 metrics: self.metrics.snapshot(),
176 });
177
178 out
179 }
180
181 pub fn map_parallel<F>(&self, dataset: &DataSet, mapper: F) -> DataSet
187 where
188 F: Fn(&[Value]) -> Vec<Value> + Send + Sync,
189 {
190 self.pool
191 .install(|| self.map_parallel_impl(dataset, &mapper))
192 }
193
194 fn map_parallel_impl(
195 &self,
196 dataset: &DataSet,
197 mapper: &(dyn Fn(&[Value]) -> Vec<Value> + Send + Sync),
198 ) -> DataSet {
199 let start = Instant::now();
200 self.metrics.begin_run();
201 self.emit(ExecutionEvent::RunStarted);
202
203 let expected_len = dataset.schema.fields.len();
204 let sem = Semaphore::new(self.opts.max_in_flight_chunks);
205 let chunk_ranges = chunk_ranges(dataset.row_count(), self.opts.chunk_size);
206
207 let per_chunk: Vec<Vec<Vec<Value>>> = chunk_ranges
208 .into_par_iter()
209 .map(|range| {
210 let waited = sem.acquire();
211 if waited > Duration::ZERO {
212 self.metrics.on_throttle_wait(waited);
213 self.emit(ExecutionEvent::ThrottleWaited { duration: waited });
214 }
215
216 self.metrics.on_chunk_start();
217 self.emit(ExecutionEvent::ChunkStarted {
218 start_row: range.start,
219 row_count: range.end - range.start,
220 });
221
222 let mut out = Vec::with_capacity(range.end - range.start);
223 for row in &dataset.rows[range] {
224 self.metrics.on_row_processed();
225 let mapped = mapper(row.as_slice());
226 assert!(
227 mapped.len() == expected_len,
228 "mapped row length {} does not match schema length {}",
229 mapped.len(),
230 expected_len
231 );
232 out.push(mapped);
233 }
234
235 self.emit(ExecutionEvent::ChunkFinished {
236 output_rows: out.len(),
237 });
238 self.metrics.on_chunk_end();
239 sem.release();
240 out
241 })
242 .collect();
243
244 let rows = per_chunk.into_iter().flatten().collect::<Vec<_>>();
245 let out = DataSet::new(dataset.schema.clone(), rows);
246
247 self.metrics.end_run(start.elapsed());
248 self.emit(ExecutionEvent::RunFinished {
249 elapsed: start.elapsed(),
250 metrics: self.metrics.snapshot(),
251 });
252
253 out
254 }
255
256 pub fn reduce(&self, dataset: &DataSet, column: &str, op: ReduceOp) -> Option<Value> {
260 let start = Instant::now();
261 self.metrics.begin_run();
262 self.emit(ExecutionEvent::RunStarted);
263 self.emit(ExecutionEvent::ReduceStarted {
264 column: column.to_string(),
265 op,
266 });
267
268 let out = reduce(dataset, column, op);
269
270 self.emit(ExecutionEvent::ReduceFinished {
271 result: out.clone(),
272 });
273 self.metrics.end_run(start.elapsed());
274 self.emit(ExecutionEvent::RunFinished {
275 elapsed: start.elapsed(),
276 metrics: self.metrics.snapshot(),
277 });
278 out
279 }
280
281 fn emit(&self, event: ExecutionEvent) {
282 if let Some(obs) = &self.observer {
283 obs.on_event(&event);
284 }
285 }
286}
287
/// Partitions `0..row_count` into consecutive ranges of at most
/// `chunk_size` rows; the final range may be shorter. Returns an empty
/// vector for an empty input.
///
/// # Panics
/// Panics if `chunk_size` is 0. (The old `while` loop never terminated in
/// that case; failing fast surfaces the misuse instead of hanging. The
/// iterator form also avoids the potential overflow in
/// `row_count + chunk_size - 1`.)
fn chunk_ranges(row_count: usize, chunk_size: usize) -> Vec<std::ops::Range<usize>> {
    assert!(chunk_size > 0, "chunk_size must be > 0");
    (0..row_count)
        .step_by(chunk_size)
        .map(|start| start..(start + chunk_size).min(row_count))
        .collect()
}
301
#[cfg(test)]
mod tests {
    use super::{ExecutionEngine, ExecutionOptions};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;

    use crate::execution::{ExecutionEvent, ExecutionObserver};
    use crate::types::{DataSet, DataType, Field, Schema, Value};

    /// Builds a one-column (`id: Int64`) dataset with rows `0..n`.
    fn dataset_of_n(n: usize) -> DataSet {
        let schema = Schema::new(vec![Field::new("id", DataType::Int64)]);
        let mut rows = Vec::with_capacity(n);
        for i in 0..n as i64 {
            rows.push(vec![Value::Int64(i)]);
        }
        DataSet::new(schema, rows)
    }

    #[test]
    fn map_parallel_runs_with_concurrency() {
        // 400 one-row chunks on 4 threads with 4 chunks allowed in flight.
        let ds = dataset_of_n(400);
        let engine = ExecutionEngine::new(ExecutionOptions {
            num_threads: Some(4),
            chunk_size: 1,
            max_in_flight_chunks: 4,
        });

        // Count of mapper calls currently executing, and its high-water mark.
        let active = Arc::new(AtomicUsize::new(0));
        let max_active = Arc::new(AtomicUsize::new(0));

        let active2 = Arc::clone(&active);
        let max_active2 = Arc::clone(&max_active);

        let out = engine.map_parallel(&ds, move |row| {
            let now = active2.fetch_add(1, Ordering::SeqCst) + 1;
            // CAS loop: raise the recorded maximum to `now` if it is higher;
            // retry on contention, stop once `now` is no longer an improvement.
            loop {
                let cur = max_active2.load(Ordering::SeqCst);
                if now <= cur {
                    break;
                }
                if max_active2
                    .compare_exchange(cur, now, Ordering::SeqCst, Ordering::SeqCst)
                    .is_ok()
                {
                    break;
                }
            }

            // Sleep so concurrently scheduled mapper calls overlap in time.
            std::thread::sleep(Duration::from_millis(2));
            let _ = active2.fetch_sub(1, Ordering::SeqCst);

            let v = match row[0] {
                Value::Int64(x) => x + 1,
                _ => 0,
            };
            vec![Value::Int64(v)]
        });

        assert_eq!(out.row_count(), ds.row_count());
        // With 4 threads and 400 slow chunks, at least two mapper calls
        // should have been observed running at the same time.
        assert!(max_active.load(Ordering::SeqCst) > 1);
    }

    /// Observer that tracks how many chunks are in flight simultaneously,
    /// based on ChunkStarted/ChunkFinished events.
    struct ConcurrencyObserver {
        // Chunks currently between their started and finished events.
        active_chunks: AtomicUsize,
        // High-water mark of `active_chunks`.
        max_active_chunks: AtomicUsize,
    }

    impl ConcurrencyObserver {
        fn new() -> Self {
            Self {
                active_chunks: AtomicUsize::new(0),
                max_active_chunks: AtomicUsize::new(0),
            }
        }
        // Highest number of simultaneously active chunks seen so far.
        fn max(&self) -> usize {
            self.max_active_chunks.load(Ordering::SeqCst)
        }
        // CAS loop raising `max_active_chunks` to `now` when `now` is higher.
        fn bump_max(&self, now: usize) {
            loop {
                let cur = self.max_active_chunks.load(Ordering::SeqCst);
                if now <= cur {
                    break;
                }
                if self
                    .max_active_chunks
                    .compare_exchange(cur, now, Ordering::SeqCst, Ordering::SeqCst)
                    .is_ok()
                {
                    break;
                }
            }
        }
    }

    impl ExecutionObserver for ConcurrencyObserver {
        fn on_event(&self, event: &ExecutionEvent) {
            match event {
                ExecutionEvent::ChunkStarted { .. } => {
                    let now = self.active_chunks.fetch_add(1, Ordering::SeqCst) + 1;
                    self.bump_max(now);
                }
                ExecutionEvent::ChunkFinished { .. } => {
                    let _ = self.active_chunks.fetch_sub(1, Ordering::SeqCst);
                }
                _ => {}
            }
        }
    }

    #[test]
    fn max_in_flight_chunks_throttles_chunk_concurrency() {
        let ds = dataset_of_n(100);
        let observer = Arc::new(ConcurrencyObserver::new());
        let obs_trait: Arc<dyn ExecutionObserver> = observer.clone();
        // 4 threads but only 1 chunk allowed in flight: the engine's
        // semaphore must serialize chunk execution.
        let engine = ExecutionEngine::new(ExecutionOptions {
            num_threads: Some(4),
            chunk_size: 1,
            max_in_flight_chunks: 1,
        })
        .with_observer(obs_trait);

        let out = engine.map_parallel(&ds, |_row| {
            std::thread::sleep(Duration::from_millis(1));
            vec![Value::Int64(1)]
        });

        assert_eq!(out.row_count(), ds.row_count());
        // Throttled to 1: no two chunks may ever have overlapped.
        assert_eq!(observer.max(), 1);
    }

    #[test]
    fn metrics_are_available_after_run() {
        let ds = dataset_of_n(60);
        let engine = ExecutionEngine::new(ExecutionOptions {
            num_threads: Some(4),
            chunk_size: 1,
            max_in_flight_chunks: 1,
        });
        let metrics = engine.metrics();

        let out = engine.map_parallel(&ds, |_row| {
            std::thread::sleep(Duration::from_millis(2));
            vec![Value::Int64(1)]
        });

        assert_eq!(out.row_count(), ds.row_count());

        // chunk_size = 1, so the chunk counters equal the row count.
        let snap = metrics.snapshot();
        assert_eq!(snap.rows_processed, ds.row_count() as u64);
        assert_eq!(snap.chunks_started, ds.row_count() as u64);
        assert_eq!(snap.chunks_finished, ds.row_count() as u64);
        assert_eq!(snap.max_active_chunks, 1);
        // 4 threads contending for 1 slot: some thread must have waited.
        assert!(snap.throttle_wait > Duration::ZERO);
        assert!(snap.elapsed.is_some());
    }
}