rust_data_processing/execution/
observer.rs1use std::fmt;
2use std::sync::Mutex;
3use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
4use std::time::{Duration, Instant};
5
6use crate::processing::ReduceOp;
7use crate::types::Value;
8
9#[derive(Debug, Clone)]
11pub enum ExecutionEvent {
12 RunStarted,
13 ThrottleWaited {
14 duration: Duration,
15 },
16 ChunkStarted {
17 start_row: usize,
18 row_count: usize,
19 },
20 ChunkFinished {
21 output_rows: usize,
22 },
23 ReduceStarted {
24 column: String,
25 op: ReduceOp,
26 },
27 ReduceFinished {
28 result: Option<Value>,
29 },
30 RunFinished {
31 elapsed: Duration,
32 metrics: ExecutionMetricsSnapshot,
33 },
34}
35
36pub trait ExecutionObserver: Send + Sync {
38 fn on_event(&self, event: &ExecutionEvent);
39}
40
41#[derive(Default)]
43pub struct StdErrExecutionObserver;
44
45impl ExecutionObserver for StdErrExecutionObserver {
46 fn on_event(&self, event: &ExecutionEvent) {
47 eprintln!("{event:?}");
48 }
49}
50
51pub struct ExecutionMetrics {
55 run_id: AtomicU64,
56 started_at: Mutex<Option<Instant>>,
57 elapsed_ns: AtomicU64,
58
59 rows_processed: AtomicU64,
60 chunks_started: AtomicU64,
61 chunks_finished: AtomicU64,
62 throttle_wait_ns: AtomicU64,
63
64 active_chunks: AtomicUsize,
65 max_active_chunks: AtomicUsize,
66}
67
68impl ExecutionMetrics {
69 pub fn new() -> Self {
70 Self {
71 run_id: AtomicU64::new(0),
72 started_at: Mutex::new(None),
73 elapsed_ns: AtomicU64::new(0),
74 rows_processed: AtomicU64::new(0),
75 chunks_started: AtomicU64::new(0),
76 chunks_finished: AtomicU64::new(0),
77 throttle_wait_ns: AtomicU64::new(0),
78 active_chunks: AtomicUsize::new(0),
79 max_active_chunks: AtomicUsize::new(0),
80 }
81 }
82
83 pub fn begin_run(&self) {
84 let _ = self.run_id.fetch_add(1, Ordering::SeqCst) + 1;
85 *self.started_at.lock().expect("metrics mutex poisoned") = Some(Instant::now());
86
87 self.elapsed_ns.store(0, Ordering::SeqCst);
88 self.rows_processed.store(0, Ordering::SeqCst);
89 self.chunks_started.store(0, Ordering::SeqCst);
90 self.chunks_finished.store(0, Ordering::SeqCst);
91 self.throttle_wait_ns.store(0, Ordering::SeqCst);
92 self.active_chunks.store(0, Ordering::SeqCst);
93 self.max_active_chunks.store(0, Ordering::SeqCst);
94 }
95
96 pub fn end_run(&self, elapsed: Duration) {
97 self.elapsed_ns.store(
98 elapsed.as_nanos().min(u64::MAX as u128) as u64,
99 Ordering::SeqCst,
100 );
101 }
102
103 pub fn on_row_processed(&self) {
104 let _ = self.rows_processed.fetch_add(1, Ordering::SeqCst);
105 }
106
107 pub fn on_chunk_start(&self) {
108 let _ = self.chunks_started.fetch_add(1, Ordering::SeqCst);
109 let now = self.active_chunks.fetch_add(1, Ordering::SeqCst) + 1;
110 update_max_usize(&self.max_active_chunks, now);
111 }
112
113 pub fn on_chunk_end(&self) {
114 let _ = self.chunks_finished.fetch_add(1, Ordering::SeqCst);
115 let _ = self.active_chunks.fetch_sub(1, Ordering::SeqCst);
116 }
117
118 pub fn on_throttle_wait(&self, d: Duration) {
119 let add = d.as_nanos().min(u64::MAX as u128) as u64;
120 let _ = self.throttle_wait_ns.fetch_add(add, Ordering::SeqCst);
121 }
122
123 pub fn snapshot(&self) -> ExecutionMetricsSnapshot {
124 let run_id = self.run_id.load(Ordering::SeqCst);
125 let elapsed_ns = self.elapsed_ns.load(Ordering::SeqCst);
126 let elapsed = if elapsed_ns > 0 {
127 Some(Duration::from_nanos(elapsed_ns))
128 } else {
129 None
130 };
131
132 ExecutionMetricsSnapshot {
133 run_id,
134 elapsed,
135 rows_processed: self.rows_processed.load(Ordering::SeqCst),
136 chunks_started: self.chunks_started.load(Ordering::SeqCst),
137 chunks_finished: self.chunks_finished.load(Ordering::SeqCst),
138 throttle_wait: Duration::from_nanos(self.throttle_wait_ns.load(Ordering::SeqCst)),
139 max_active_chunks: self.max_active_chunks.load(Ordering::SeqCst),
140 }
141 }
142}
143
144impl Default for ExecutionMetrics {
145 fn default() -> Self {
146 Self::new()
147 }
148}
149
/// Raises `dst` to `now` when `now` is larger than the stored value;
/// otherwise leaves `dst` unchanged.
///
/// Implemented with the single atomic read-modify-write `fetch_max` rather
/// than a load / compare-exchange retry loop; the resulting value of `dst`
/// is the same.
fn update_max_usize(dst: &AtomicUsize, now: usize) {
    dst.fetch_max(now, Ordering::SeqCst);
}
164
/// Plain-value copy of the counters held by `ExecutionMetrics`, as produced
/// by its `snapshot` method.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecutionMetricsSnapshot {
    /// Run counter at the time of the snapshot.
    pub run_id: u64,
    /// Recorded run duration, or `None` when none has been recorded.
    pub elapsed: Option<Duration>,
    /// Rows counted so far.
    pub rows_processed: u64,
    /// Chunks started so far.
    pub chunks_started: u64,
    /// Chunks finished so far.
    pub chunks_finished: u64,
    /// Accumulated throttle wait.
    pub throttle_wait: Duration,
    /// Highest number of simultaneously active chunks observed.
    pub max_active_chunks: usize,
}

impl fmt::Display for ExecutionMetricsSnapshot {
    /// Renders the snapshot as one line of comma-separated `key=value` pairs;
    /// chunks are shown as `finished/started` and durations use `Debug` form.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let ExecutionMetricsSnapshot {
            run_id,
            elapsed,
            rows_processed,
            chunks_started,
            chunks_finished,
            throttle_wait,
            max_active_chunks,
        } = self;

        f.write_fmt(format_args!(
            "run_id={}, rows_processed={}, chunks={}/{}, max_active_chunks={}, throttle_wait={:?}, elapsed={:?}",
            run_id,
            rows_processed,
            chunks_finished,
            chunks_started,
            max_active_chunks,
            throttle_wait,
            elapsed
        ))
    }
}