// rust_data_processing/ingestion/observability.rs

use std::fmt;
use std::fs::OpenOptions;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, PoisonError};
use std::time::{SystemTime, UNIX_EPOCH};

use crate::error::IngestionError;

use super::unified::IngestionFormat;
11
/// How serious an ingestion event is.
///
/// Passed to observer callbacks and compared against alerting thresholds.
/// The derived ordering follows declaration order, so
/// `Info < Warning < Error < Critical`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum IngestionSeverity {
    /// Purely informational; nothing went wrong.
    Info,
    /// Something noteworthy happened, but the operation was not fatal.
    Warning,
    /// The operation failed.
    Error,
    /// A critical failure, typically I/O or other infrastructure trouble.
    Critical,
}
24
25/// Context about an ingestion attempt.
26#[derive(Debug, Clone)]
27pub struct IngestionContext {
28    /// The input path used for ingestion.
29    pub path: PathBuf,
30    /// Format used for ingestion.
31    pub format: IngestionFormat,
32}
33
/// The small set of figures reported when an ingestion succeeds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct IngestionStats {
    /// How many rows were ingested.
    pub rows: usize,
}
40
41/// Observer interface for ingestion outcomes.
42///
43/// Implementors can record metrics, logs, or trigger alerts.
44pub trait IngestionObserver: Send + Sync {
45    /// Called when ingestion succeeds.
46    fn on_success(&self, _ctx: &IngestionContext, _stats: IngestionStats) {}
47
48    /// Called when ingestion fails.
49    fn on_failure(
50        &self,
51        _ctx: &IngestionContext,
52        _severity: IngestionSeverity,
53        _error: &IngestionError,
54    ) {
55    }
56
57    /// Called when an ingestion failure meets an alert threshold.
58    ///
59    /// Default behavior forwards to [`Self::on_failure`].
60    fn on_alert(
61        &self,
62        ctx: &IngestionContext,
63        severity: IngestionSeverity,
64        error: &IngestionError,
65    ) {
66        self.on_failure(ctx, severity, error)
67    }
68}
69
70/// An observer that fans out callbacks to a list of observers.
71#[derive(Default)]
72pub struct CompositeObserver {
73    observers: Vec<Arc<dyn IngestionObserver>>,
74}
75
76impl CompositeObserver {
77    /// Create a new composite observer from a list of observers.
78    pub fn new(observers: Vec<Arc<dyn IngestionObserver>>) -> Self {
79        Self { observers }
80    }
81}
82
83impl fmt::Debug for CompositeObserver {
84    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85        f.debug_struct("CompositeObserver")
86            .field("observers_len", &self.observers.len())
87            .finish()
88    }
89}
90
91impl IngestionObserver for CompositeObserver {
92    fn on_success(&self, ctx: &IngestionContext, stats: IngestionStats) {
93        for o in &self.observers {
94            o.on_success(ctx, stats);
95        }
96    }
97
98    fn on_failure(
99        &self,
100        ctx: &IngestionContext,
101        severity: IngestionSeverity,
102        error: &IngestionError,
103    ) {
104        for o in &self.observers {
105            o.on_failure(ctx, severity, error);
106        }
107    }
108
109    fn on_alert(
110        &self,
111        ctx: &IngestionContext,
112        severity: IngestionSeverity,
113        error: &IngestionError,
114    ) {
115        for o in &self.observers {
116            o.on_alert(ctx, severity, error);
117        }
118    }
119}
120
/// Observer that reports ingestion events on standard error.
#[derive(Debug, Default)]
pub struct StdErrObserver;
124
125impl IngestionObserver for StdErrObserver {
126    fn on_success(&self, ctx: &IngestionContext, stats: IngestionStats) {
127        eprintln!(
128            "[ingest][ok] format={:?} path={} rows={}",
129            ctx.format,
130            ctx.path.display(),
131            stats.rows
132        );
133    }
134
135    fn on_failure(
136        &self,
137        ctx: &IngestionContext,
138        severity: IngestionSeverity,
139        error: &IngestionError,
140    ) {
141        eprintln!(
142            "[ingest][{:?}] format={:?} path={} err={}",
143            severity,
144            ctx.format,
145            ctx.path.display(),
146            error
147        );
148    }
149
150    fn on_alert(
151        &self,
152        ctx: &IngestionContext,
153        severity: IngestionSeverity,
154        error: &IngestionError,
155    ) {
156        eprintln!(
157            "[ALERT][ingest][{:?}] format={:?} path={} err={}",
158            severity,
159            ctx.format,
160            ctx.path.display(),
161            error
162        );
163    }
164}
165
/// Appends ingestion events to a local log file.
///
/// The file is opened in append mode (and created if missing) for every
/// event; nothing is kept open between events.
#[derive(Debug)]
pub struct FileObserver {
    /// Destination log file.
    path: PathBuf,
    /// Serializes appends issued through this observer instance.
    lock: Mutex<()>,
}

impl FileObserver {
    /// Create a file observer that appends events to `path`.
    ///
    /// The file is not touched here; it is opened on each event. Writes are
    /// best-effort: failures to open or write the log file are ignored.
    pub fn new(path: impl AsRef<Path>) -> Self {
        Self {
            path: path.as_ref().to_path_buf(),
            lock: Mutex::new(()),
        }
    }

    /// Appends `line` plus a trailing newline to the log file.
    ///
    /// Open and write errors are swallowed so logging can never fail the
    /// ingestion that triggered it.
    fn append_line(&self, line: &str) {
        // The mutex protects no data, so a poisoned lock is still safe to
        // reuse. Recover the guard rather than dropping to unsynchronized
        // writes, which is what `.ok()` would silently do.
        let _guard = self.lock.lock().unwrap_or_else(PoisonError::into_inner);

        if let Ok(mut file) = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.path)
        {
            // Hand the whole record to the OS in one write. `writeln!` on an
            // unbuffered `File` emits the text and the newline separately,
            // which lets appends from other handles land in between.
            let mut record = String::with_capacity(line.len() + 1);
            record.push_str(line);
            record.push('\n');
            let _ = file.write_all(record.as_bytes());
        }
    }
}
195
196impl IngestionObserver for FileObserver {
197    fn on_success(&self, ctx: &IngestionContext, stats: IngestionStats) {
198        self.append_line(&format!(
199            "{} ok format={:?} path={} rows={}",
200            unix_ts(),
201            ctx.format,
202            ctx.path.display(),
203            stats.rows
204        ));
205    }
206
207    fn on_failure(
208        &self,
209        ctx: &IngestionContext,
210        severity: IngestionSeverity,
211        error: &IngestionError,
212    ) {
213        self.append_line(&format!(
214            "{} fail severity={:?} format={:?} path={} err={}",
215            unix_ts(),
216            severity,
217            ctx.format,
218            ctx.path.display(),
219            error
220        ));
221    }
222
223    fn on_alert(
224        &self,
225        ctx: &IngestionContext,
226        severity: IngestionSeverity,
227        error: &IngestionError,
228    ) {
229        self.append_line(&format!(
230            "{} ALERT severity={:?} format={:?} path={} err={}",
231            unix_ts(),
232            severity,
233            ctx.format,
234            ctx.path.display(),
235            error
236        ));
237    }
238}
239
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn unix_ts() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        // Clock set before 1970: same result as a zero-length duration.
        Err(_) => 0,
    }
}