// rust_data_processing/execution/observer.rs

use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Mutex, PoisonError};
use std::time::{Duration, Instant};

use crate::processing::ReduceOp;
use crate::types::Value;

9/// Execution events emitted by the engine.
10#[derive(Debug, Clone)]
11pub enum ExecutionEvent {
12    RunStarted,
13    ThrottleWaited {
14        duration: Duration,
15    },
16    ChunkStarted {
17        start_row: usize,
18        row_count: usize,
19    },
20    ChunkFinished {
21        output_rows: usize,
22    },
23    ReduceStarted {
24        column: String,
25        op: ReduceOp,
26    },
27    ReduceFinished {
28        result: Option<Value>,
29    },
30    RunFinished {
31        elapsed: Duration,
32        metrics: ExecutionMetricsSnapshot,
33    },
34}
35
36/// Observer hook for execution events.
37pub trait ExecutionObserver: Send + Sync {
38    fn on_event(&self, event: &ExecutionEvent);
39}
40
41/// A simple stderr logger for execution events.
42#[derive(Default)]
43pub struct StdErrExecutionObserver;
44
45impl ExecutionObserver for StdErrExecutionObserver {
46    fn on_event(&self, event: &ExecutionEvent) {
47        eprintln!("{event:?}");
48    }
49}
50
51/// Real-time metrics for an execution run.
52///
53/// The engine updates these counters during execution; callers can snapshot them at any time.
54pub struct ExecutionMetrics {
55    run_id: AtomicU64,
56    started_at: Mutex<Option<Instant>>,
57    elapsed_ns: AtomicU64,
58
59    rows_processed: AtomicU64,
60    chunks_started: AtomicU64,
61    chunks_finished: AtomicU64,
62    throttle_wait_ns: AtomicU64,
63
64    active_chunks: AtomicUsize,
65    max_active_chunks: AtomicUsize,
66}
67
68impl ExecutionMetrics {
69    pub fn new() -> Self {
70        Self {
71            run_id: AtomicU64::new(0),
72            started_at: Mutex::new(None),
73            elapsed_ns: AtomicU64::new(0),
74            rows_processed: AtomicU64::new(0),
75            chunks_started: AtomicU64::new(0),
76            chunks_finished: AtomicU64::new(0),
77            throttle_wait_ns: AtomicU64::new(0),
78            active_chunks: AtomicUsize::new(0),
79            max_active_chunks: AtomicUsize::new(0),
80        }
81    }
82
83    pub fn begin_run(&self) {
84        let _ = self.run_id.fetch_add(1, Ordering::SeqCst) + 1;
85        *self.started_at.lock().expect("metrics mutex poisoned") = Some(Instant::now());
86
87        self.elapsed_ns.store(0, Ordering::SeqCst);
88        self.rows_processed.store(0, Ordering::SeqCst);
89        self.chunks_started.store(0, Ordering::SeqCst);
90        self.chunks_finished.store(0, Ordering::SeqCst);
91        self.throttle_wait_ns.store(0, Ordering::SeqCst);
92        self.active_chunks.store(0, Ordering::SeqCst);
93        self.max_active_chunks.store(0, Ordering::SeqCst);
94    }
95
96    pub fn end_run(&self, elapsed: Duration) {
97        self.elapsed_ns.store(
98            elapsed.as_nanos().min(u64::MAX as u128) as u64,
99            Ordering::SeqCst,
100        );
101    }
102
103    pub fn on_row_processed(&self) {
104        let _ = self.rows_processed.fetch_add(1, Ordering::SeqCst);
105    }
106
107    pub fn on_chunk_start(&self) {
108        let _ = self.chunks_started.fetch_add(1, Ordering::SeqCst);
109        let now = self.active_chunks.fetch_add(1, Ordering::SeqCst) + 1;
110        update_max_usize(&self.max_active_chunks, now);
111    }
112
113    pub fn on_chunk_end(&self) {
114        let _ = self.chunks_finished.fetch_add(1, Ordering::SeqCst);
115        let _ = self.active_chunks.fetch_sub(1, Ordering::SeqCst);
116    }
117
118    pub fn on_throttle_wait(&self, d: Duration) {
119        let add = d.as_nanos().min(u64::MAX as u128) as u64;
120        let _ = self.throttle_wait_ns.fetch_add(add, Ordering::SeqCst);
121    }
122
123    pub fn snapshot(&self) -> ExecutionMetricsSnapshot {
124        let run_id = self.run_id.load(Ordering::SeqCst);
125        let elapsed_ns = self.elapsed_ns.load(Ordering::SeqCst);
126        let elapsed = if elapsed_ns > 0 {
127            Some(Duration::from_nanos(elapsed_ns))
128        } else {
129            None
130        };
131
132        ExecutionMetricsSnapshot {
133            run_id,
134            elapsed,
135            rows_processed: self.rows_processed.load(Ordering::SeqCst),
136            chunks_started: self.chunks_started.load(Ordering::SeqCst),
137            chunks_finished: self.chunks_finished.load(Ordering::SeqCst),
138            throttle_wait: Duration::from_nanos(self.throttle_wait_ns.load(Ordering::SeqCst)),
139            max_active_chunks: self.max_active_chunks.load(Ordering::SeqCst),
140        }
141    }
142}
143
144impl Default for ExecutionMetrics {
145    fn default() -> Self {
146        Self::new()
147    }
148}
149
/// Raises `dst` to `now` if `now` is larger; it never lowers the stored value.
///
/// Uses [`AtomicUsize::fetch_max`], a single atomic read-modify-write, in place of a
/// hand-written load / compare-exchange retry loop.
fn update_max_usize(dst: &AtomicUsize, now: usize) {
    dst.fetch_max(now, Ordering::SeqCst);
}

/// Immutable snapshot of [`ExecutionMetrics`].
///
/// Produced by [`ExecutionMetrics::snapshot`]. The [`Default`] value is the all-zero snapshot
/// with no `elapsed`, which is what a freshly constructed `ExecutionMetrics` reports.
#[derive(Debug, Default, Clone, PartialEq, Eq, Hash)]
pub struct ExecutionMetricsSnapshot {
    /// Number of runs begun so far; identifies the run these values belong to.
    pub run_id: u64,
    /// Total run duration; `None` while the run is still in progress.
    pub elapsed: Option<Duration>,
    /// Rows counted via [`ExecutionMetrics::on_row_processed`].
    pub rows_processed: u64,
    /// Chunks reported via [`ExecutionMetrics::on_chunk_start`].
    pub chunks_started: u64,
    /// Chunks reported via [`ExecutionMetrics::on_chunk_end`].
    pub chunks_finished: u64,
    /// Total time reported via [`ExecutionMetrics::on_throttle_wait`].
    pub throttle_wait: Duration,
    /// Largest number of chunks that were active at the same time during the run.
    pub max_active_chunks: usize,
}

/// Formats the snapshot as a single `key=value` line.
///
/// `chunks` is shown as `finished/started`. `throttle_wait` and `elapsed` use their `Debug`
/// forms (e.g. `5ms`, `Some(2s)`, `None`).
impl fmt::Display for ExecutionMetricsSnapshot {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "run_id={}, rows_processed={}, chunks={}/{}, max_active_chunks={}, throttle_wait={:?}, elapsed={:?}",
            self.run_id,
            self.rows_processed,
            self.chunks_finished,
            self.chunks_started,
            self.max_active_chunks,
            self.throttle_wait,
            self.elapsed
        )
    }
}