// rust_data_processing/cdc/mod.rs

//! CDC (change data capture) interface boundary (Phase 1 spike).
//!
//! This module intentionally **does not** depend on any specific CDC implementation crate.
//! It only defines the public boundary types that a Phase 2 CDC integration would accept and
//! produce.
//!
//! ## Example
//!
//! ```rust
//! use rust_data_processing::cdc::{CdcEvent, CdcOp, RowImage, SourceMeta, TableRef};
//! use rust_data_processing::types::Value;
//!
//! let ev = CdcEvent {
//!     meta: SourceMeta { source: Some("db".to_string()), checkpoint: None },
//!     table: TableRef::with_schema("public", "users"),
//!     op: CdcOp::Insert,
//!     before: None,
//!     after: Some(RowImage::new(vec![
//!         ("id".to_string(), Value::Int64(1)),
//!         ("name".to_string(), Value::Utf8("Ada".to_string())),
//!     ])),
//! };
//!
//! assert_eq!(ev.op, CdcOp::Insert);
//! ```
25
26use crate::types::Value;
27
/// The operation represented by a CDC event.
///
/// `Hash` is derived (alongside `Eq`) so operations can be used as map/set keys,
/// e.g. for per-operation counters.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CdcOp {
    /// Row insert (`INSERT`).
    Insert,
    /// Row update (`UPDATE`).
    Update,
    /// Row delete (`DELETE`).
    Delete,
    /// Table truncate (`TRUNCATE`).
    Truncate,
}
36
/// Identifies a table in a source database.
///
/// `Hash` is derived (alongside `Eq`) so events can be grouped or routed per table with a
/// `HashMap`/`HashSet`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TableRef {
    /// Optional schema qualifier (e.g. `"public"`); `None` when no schema was given.
    pub schema: Option<String>,
    /// Table name, without any schema qualifier.
    pub name: String,
}
43
44impl TableRef {
45    pub fn new(name: impl Into<String>) -> Self {
46        Self {
47            schema: None,
48            name: name.into(),
49        }
50    }
51
52    pub fn with_schema(schema: impl Into<String>, name: impl Into<String>) -> Self {
53        Self {
54            schema: Some(schema.into()),
55            name: name.into(),
56        }
57    }
58}
59
60/// A single row image.
61///
62/// We keep values as an ordered list (not a map) to preserve deterministic ordering and to allow
63/// duplicate column names to be rejected by ingestion-time validation if desired.
64#[derive(Debug, Clone, PartialEq)]
65pub struct RowImage {
66    pub values: Vec<(String, Value)>,
67}
68
69impl RowImage {
70    pub fn new(values: Vec<(String, Value)>) -> Self {
71        Self { values }
72    }
73}
74
/// A cursor/checkpoint used to resume CDC consumption.
///
/// This is intentionally opaque; different CDC implementations use different notions
/// (LSN, GTID, etc.). The wrapped token is carried as-is; this module never parses it.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CdcCheckpoint(pub String);

impl CdcCheckpoint {
    /// Wraps an implementation-specific checkpoint token.
    pub fn new(token: impl Into<String>) -> Self {
        Self(token.into())
    }

    /// Borrows the raw checkpoint token.
    pub fn as_str(&self) -> &str {
        &self.0
    }
}
80
81/// Minimal metadata common across CDC sources.
82#[derive(Debug, Clone, PartialEq, Eq)]
83pub struct SourceMeta {
84    /// A human-friendly source identifier (e.g. connection name).
85    pub source: Option<String>,
86    /// Best-effort checkpoint token associated with this event.
87    pub checkpoint: Option<CdcCheckpoint>,
88}
89
90/// A single change event.
91#[derive(Debug, Clone, PartialEq)]
92pub struct CdcEvent {
93    pub meta: SourceMeta,
94    pub table: TableRef,
95    pub op: CdcOp,
96    /// Before image (typically present for UPDATE/DELETE).
97    pub before: Option<RowImage>,
98    /// After image (typically present for INSERT/UPDATE).
99    pub after: Option<RowImage>,
100}
101
102/// Batch-oriented CDC source boundary.
103///
104/// Decision (Phase 1): batch-first boundary avoids forcing async/runtime choice into the crate.
105pub trait CdcSource {
106    type Error: std::error::Error + Send + Sync + 'static;
107
108    /// Fetch the next batch of CDC events.
109    ///
110    /// - `Ok(None)` means "clean end" (source exhausted / stopped).
111    /// - `Ok(Some(batch))` yields a non-empty batch.
112    fn next_batch(&mut self) -> Result<Option<Vec<CdcEvent>>, Self::Error>;
113}