rust_data_processing/ingestion/
observability.rs1use std::fmt;
2use std::fs::OpenOptions;
3use std::io::Write;
4use std::path::{Path, PathBuf};
5use std::sync::{Arc, Mutex};
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use crate::error::IngestionError;
9
10use super::unified::IngestionFormat;
11
12#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
14pub enum IngestionSeverity {
15 Info,
17 Warning,
19 Error,
21 Critical,
23}
24
25#[derive(Debug, Clone)]
27pub struct IngestionContext {
28 pub path: PathBuf,
30 pub format: IngestionFormat,
32}
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq)]
36pub struct IngestionStats {
37 pub rows: usize,
39}
40
41pub trait IngestionObserver: Send + Sync {
45 fn on_success(&self, _ctx: &IngestionContext, _stats: IngestionStats) {}
47
48 fn on_failure(
50 &self,
51 _ctx: &IngestionContext,
52 _severity: IngestionSeverity,
53 _error: &IngestionError,
54 ) {
55 }
56
57 fn on_alert(
61 &self,
62 ctx: &IngestionContext,
63 severity: IngestionSeverity,
64 error: &IngestionError,
65 ) {
66 self.on_failure(ctx, severity, error)
67 }
68}
69
70#[derive(Default)]
72pub struct CompositeObserver {
73 observers: Vec<Arc<dyn IngestionObserver>>,
74}
75
76impl CompositeObserver {
77 pub fn new(observers: Vec<Arc<dyn IngestionObserver>>) -> Self {
79 Self { observers }
80 }
81}
82
83impl fmt::Debug for CompositeObserver {
84 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85 f.debug_struct("CompositeObserver")
86 .field("observers_len", &self.observers.len())
87 .finish()
88 }
89}
90
91impl IngestionObserver for CompositeObserver {
92 fn on_success(&self, ctx: &IngestionContext, stats: IngestionStats) {
93 for o in &self.observers {
94 o.on_success(ctx, stats);
95 }
96 }
97
98 fn on_failure(
99 &self,
100 ctx: &IngestionContext,
101 severity: IngestionSeverity,
102 error: &IngestionError,
103 ) {
104 for o in &self.observers {
105 o.on_failure(ctx, severity, error);
106 }
107 }
108
109 fn on_alert(
110 &self,
111 ctx: &IngestionContext,
112 severity: IngestionSeverity,
113 error: &IngestionError,
114 ) {
115 for o in &self.observers {
116 o.on_alert(ctx, severity, error);
117 }
118 }
119}
120
121#[derive(Debug, Default)]
123pub struct StdErrObserver;
124
125impl IngestionObserver for StdErrObserver {
126 fn on_success(&self, ctx: &IngestionContext, stats: IngestionStats) {
127 eprintln!(
128 "[ingest][ok] format={:?} path={} rows={}",
129 ctx.format,
130 ctx.path.display(),
131 stats.rows
132 );
133 }
134
135 fn on_failure(
136 &self,
137 ctx: &IngestionContext,
138 severity: IngestionSeverity,
139 error: &IngestionError,
140 ) {
141 eprintln!(
142 "[ingest][{:?}] format={:?} path={} err={}",
143 severity,
144 ctx.format,
145 ctx.path.display(),
146 error
147 );
148 }
149
150 fn on_alert(
151 &self,
152 ctx: &IngestionContext,
153 severity: IngestionSeverity,
154 error: &IngestionError,
155 ) {
156 eprintln!(
157 "[ALERT][ingest][{:?}] format={:?} path={} err={}",
158 severity,
159 ctx.format,
160 ctx.path.display(),
161 error
162 );
163 }
164}
165
166#[derive(Debug)]
168pub struct FileObserver {
169 path: PathBuf,
170 lock: Mutex<()>,
171}
172
173impl FileObserver {
174 pub fn new(path: impl AsRef<Path>) -> Self {
178 Self {
179 path: path.as_ref().to_path_buf(),
180 lock: Mutex::new(()),
181 }
182 }
183
184 fn append_line(&self, line: &str) {
185 let _guard = self.lock.lock().ok();
186 if let Ok(mut f) = OpenOptions::new()
187 .create(true)
188 .append(true)
189 .open(&self.path)
190 {
191 let _ = writeln!(f, "{line}");
192 }
193 }
194}
195
196impl IngestionObserver for FileObserver {
197 fn on_success(&self, ctx: &IngestionContext, stats: IngestionStats) {
198 self.append_line(&format!(
199 "{} ok format={:?} path={} rows={}",
200 unix_ts(),
201 ctx.format,
202 ctx.path.display(),
203 stats.rows
204 ));
205 }
206
207 fn on_failure(
208 &self,
209 ctx: &IngestionContext,
210 severity: IngestionSeverity,
211 error: &IngestionError,
212 ) {
213 self.append_line(&format!(
214 "{} fail severity={:?} format={:?} path={} err={}",
215 unix_ts(),
216 severity,
217 ctx.format,
218 ctx.path.display(),
219 error
220 ));
221 }
222
223 fn on_alert(
224 &self,
225 ctx: &IngestionContext,
226 severity: IngestionSeverity,
227 error: &IngestionError,
228 ) {
229 self.append_line(&format!(
230 "{} ALERT severity={:?} format={:?} path={} err={}",
231 unix_ts(),
232 severity,
233 ctx.format,
234 ctx.path.display(),
235 error
236 ));
237 }
238}
239
240fn unix_ts() -> u64 {
241 SystemTime::now()
242 .duration_since(UNIX_EPOCH)
243 .unwrap_or_default()
244 .as_secs()
245}