rust_data_processing/cdc/mod.rs
1//! CDC interface boundary (Phase 1 spike).
2//!
3//! This module intentionally **does not** depend on a specific CDC implementation crate.
4//! It defines the public boundary types we would accept/produce if we add CDC in Phase 2.
5//!
6//! ## Example
7//!
8//! ```rust
9//! use rust_data_processing::cdc::{CdcEvent, CdcOp, RowImage, SourceMeta, TableRef};
10//! use rust_data_processing::types::Value;
11//!
12//! let ev = CdcEvent {
13//! meta: SourceMeta { source: Some("db".to_string()), checkpoint: None },
14//! table: TableRef::with_schema("public", "users"),
15//! op: CdcOp::Insert,
16//! before: None,
17//! after: Some(RowImage::new(vec![
18//! ("id".to_string(), Value::Int64(1)),
19//! ("name".to_string(), Value::Utf8("Ada".to_string())),
20//! ])),
21//! };
22//!
23//! assert_eq!(ev.op, CdcOp::Insert);
24//! ```
25
26use crate::types::Value;
27
/// The operation represented by a CDC event.
///
/// `Hash` is derived alongside `Eq` so an operation can be used as a `HashMap`/`HashSet` key
/// (for example, to count events per operation kind).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CdcOp {
    /// A row was inserted.
    Insert,
    /// A row was updated.
    Update,
    /// A row was deleted.
    Delete,
    /// The table was truncated.
    Truncate,
}
36
/// Identifies a table in a source database.
///
/// `schema` is `None` when the table is referenced by its bare name. `Hash` is derived
/// alongside `Eq` so a `TableRef` can be used as a `HashMap`/`HashSet` key (for example, to
/// group events per table).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TableRef {
    /// Optional schema qualifier.
    pub schema: Option<String>,
    /// Table name.
    pub name: String,
}

impl TableRef {
    /// Creates a reference to the table `name` with no schema qualifier.
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            schema: None,
            name: name.into(),
        }
    }

    /// Creates a reference to the table `name` qualified by `schema`.
    pub fn with_schema(schema: impl Into<String>, name: impl Into<String>) -> Self {
        Self {
            schema: Some(schema.into()),
            name: name.into(),
        }
    }
}
59
60/// A single row image.
61///
62/// We keep values as an ordered list (not a map) to preserve deterministic ordering and to allow
63/// duplicate column names to be rejected by ingestion-time validation if desired.
64#[derive(Debug, Clone, PartialEq)]
65pub struct RowImage {
66 pub values: Vec<(String, Value)>,
67}
68
69impl RowImage {
70 pub fn new(values: Vec<(String, Value)>) -> Self {
71 Self { values }
72 }
73}
74
/// A cursor/checkpoint used to resume CDC consumption.
///
/// This is intentionally opaque; different CDC implementations use different notions
/// (LSN, GTID, etc.), so no ordering is derived for it. `Hash` is derived alongside `Eq` so
/// checkpoints can be deduplicated or used as map keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CdcCheckpoint(pub String);
80
81/// Minimal metadata common across CDC sources.
82#[derive(Debug, Clone, PartialEq, Eq)]
83pub struct SourceMeta {
84 /// A human-friendly source identifier (e.g. connection name).
85 pub source: Option<String>,
86 /// Best-effort checkpoint token associated with this event.
87 pub checkpoint: Option<CdcCheckpoint>,
88}
89
90/// A single change event.
91#[derive(Debug, Clone, PartialEq)]
92pub struct CdcEvent {
93 pub meta: SourceMeta,
94 pub table: TableRef,
95 pub op: CdcOp,
96 /// Before image (typically present for UPDATE/DELETE).
97 pub before: Option<RowImage>,
98 /// After image (typically present for INSERT/UPDATE).
99 pub after: Option<RowImage>,
100}
101
102/// Batch-oriented CDC source boundary.
103///
104/// Decision (Phase 1): batch-first boundary avoids forcing async/runtime choice into the crate.
105pub trait CdcSource {
106 type Error: std::error::Error + Send + Sync + 'static;
107
108 /// Fetch the next batch of CDC events.
109 ///
110 /// - `Ok(None)` means "clean end" (source exhausted / stopped).
111 /// - `Ok(Some(batch))` yields a non-empty batch.
112 fn next_batch(&mut self) -> Result<Option<Vec<CdcEvent>>, Self::Error>;
113}