mz_sql_server_util/cdc.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Replicate a table from SQL Server using their Change-Data-Capture (CDC) primitives.
//!
//! This module provides a [`CdcStream`] type that provides the following API for
//! replicating a table:
//!
//! 1. [`CdcStream::snapshot`] returns an initial snapshot of a table and the [`Lsn`] at
//!    which the snapshot was taken.
//! 2. [`CdcStream::into_stream`] returns a [`futures::Stream`] of [`CdcEvent`]s,
//!    optionally from the [`Lsn`] returned in step 1.
//!
//! The snapshot process is responsible for identifying an [`Lsn`] that corresponds to
//! a point-in-time view of the data for the table(s) being copied. Similarly to
//! MySQL, Microsoft SQL Server, as far as we know, does not provide an API to
//! achieve this.
//!
//! SQL Server `SNAPSHOT` isolation guarantees that a reader will only see writes
//! committed before the transaction began. More specifically, this snapshot is
//! implemented using row versions that are visible based on the transaction
//! sequence number (`XSN`). The `XSN` is set at the first
//! read or write, not at `BEGIN TRANSACTION`; see [here](https://learn.microsoft.com/en-us/sql/relational-databases/sql-server-transaction-locking-and-row-versioning-guide?view=sql-server-ver17).
//! This provides us a suitable starting point for capturing the table data.
//! To force an `XSN` to be assigned, experiments have shown that a table must
//! be read. We choose a well-known table that we should already have access to,
//! [cdc.change_tables](https://learn.microsoft.com/en-us/sql/relational-databases/system-tables/cdc-change-tables-transact-sql?view=sql-server-ver17),
//! and read a single value from it.
//!
//! Due to the asynchronous nature of CDC, we can assume that the [`Lsn`]
//! returned from any CDC tables or CDC functions will always be stale
//! in relation to the source table that CDC is tracking. The system table
//! [sys.dm_tran_database_transactions](https://learn.microsoft.com/en-us/sql/relational-databases/system-dynamic-management-views/sys-dm-tran-database-transactions-transact-sql?view=sql-server-ver17)
//! will contain an [`Lsn`] for any transaction that performs a write operation.
//! Creating a savepoint using [SAVE TRANSACTION](https://learn.microsoft.com/en-us/sql/t-sql/language-elements/save-transaction-transact-sql?view=sql-server-ver17)
//! is sufficient to generate an [`Lsn`] in this case.
//!
//! To ensure that the point-in-time view is established atomically with the
//! collection of the [`Lsn`], we lock the tables to prevent writes from being
//! interleaved between the two commands (the read that establishes the `XSN` and
//! the creation of the savepoint).
//!
//! SQL Server supports table locks, but those are only released
//! once the outermost transaction completes. For this reason, this module
//! uses two connections for the snapshot process. The first connection is used
//! to initiate a transaction and lock the upstream tables under
//! [`TransactionIsolationLevel::ReadCommitted`] isolation. While the first
//! connection maintains the locks, the second connection starts a
//! transaction with [`TransactionIsolationLevel::Snapshot`] isolation and
//! creates a savepoint. Once the savepoint is created, SQL Server has assigned
//! an [`Lsn`], and the first connection rolls back its transaction.
//! The [`Lsn`] and snapshot are captured by the second connection within the
//! existing transaction.
//!
//! After completing the snapshot we use [`crate::inspect::get_changes_asc`], which returns
//! all changes between a `[lower, upper)` bound of [`Lsn`]s.
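//!
//! A sketch of the intended call pattern (construction of the [`CdcStream`] is
//! crate-internal and elided here; `table`, `worker_id`, `source_id`, and
//! `capture_instance` are assumed to be in scope, and the fence is `ignore`
//! because this needs a live SQL Server instance):
//!
//! ```ignore
//! let mut cdc = /* a `CdcStream` obtained from a `Client` */;
//! cdc.wait_for_ready().await?;
//! let (snapshot_lsn, snapshot) = cdc.snapshot(&table, worker_id, source_id).await?;
//! // ... ingest the `snapshot` stream ...
//! let events = cdc.start_lsn(&capture_instance, snapshot_lsn).into_stream();
//! // `events` yields `CdcEvent::Data`, `CdcEvent::Progress`, and `CdcEvent::SchemaUpdate`.
//! ```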

use std::collections::BTreeMap;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;

use derivative::Derivative;
use futures::{Stream, StreamExt};
use mz_repr::GlobalId;
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use tiberius::numeric::Numeric;

use crate::desc::{SqlServerQualifiedTableName, SqlServerTableRaw};
use crate::inspect::DDLEvent;
use crate::{Client, SqlServerCdcMetrics, SqlServerError, TransactionIsolationLevel};

/// A stream of changes from a table in SQL Server that has CDC enabled.
///
/// SQL Server does not have an API to push or notify consumers of changes, so we periodically
/// poll the upstream source.
///
/// See: <https://learn.microsoft.com/en-us/sql/relational-databases/system-tables/change-data-capture-tables-transact-sql?view=sql-server-ver16>
pub struct CdcStream<'a, M: SqlServerCdcMetrics> {
    /// Client we use for querying SQL Server.
    client: &'a mut Client,
    /// Upstream capture instances we'll list changes from.
    capture_instances: BTreeMap<Arc<str>, Option<Lsn>>,
    /// How often we poll the upstream for changes.
    poll_interval: Duration,
    /// How long we'll wait for SQL Server to return a max LSN before taking a snapshot.
    ///
    /// Note: When CDC is first enabled in an instance of SQL Server it can take a moment
    /// for it to "completely" start up. Before starting a `TRANSACTION` for our snapshot
    /// we'll wait this duration for SQL Server to report an [`Lsn`] and thus indicate CDC is
    /// ready to go.
    max_lsn_wait: Duration,
    /// Metrics.
    metrics: M,
}

impl<'a, M: SqlServerCdcMetrics> CdcStream<'a, M> {
    pub(crate) fn new(
        client: &'a mut Client,
        capture_instances: BTreeMap<Arc<str>, Option<Lsn>>,
        metrics: M,
    ) -> Self {
        CdcStream {
            client,
            capture_instances,
            poll_interval: Duration::from_secs(1),
            max_lsn_wait: Duration::from_secs(10),
            metrics,
        }
    }

    /// Set the [`Lsn`] that we should start streaming changes from.
    ///
    /// If the provided [`Lsn`] is not available, the stream will return an error
    /// when first polled.
    pub fn start_lsn(mut self, capture_instance: &str, lsn: Lsn) -> Self {
        let start_lsn = self
            .capture_instances
            .get_mut(capture_instance)
            .expect("capture instance does not exist");
        *start_lsn = Some(lsn);
        self
    }

    /// The cadence at which we'll poll the upstream SQL Server database for changes.
    ///
    /// Default is 1 second.
    pub fn poll_interval(mut self, interval: Duration) -> Self {
        self.poll_interval = interval;
        self
    }

    /// The max duration we'll wait for SQL Server to return an [`Lsn`] before taking a
    /// snapshot.
    ///
    /// When CDC is first enabled in SQL Server it can take a moment before it is fully
    /// set up and starts reporting LSNs.
    ///
    /// Default is 10 seconds.
    pub fn max_lsn_wait(mut self, wait: Duration) -> Self {
        self.max_lsn_wait = wait;
        self
    }

    /// Takes a snapshot of the upstream table that the specified `table` represents.
    pub async fn snapshot<'b>(
        &'b mut self,
        table: &SqlServerTableRaw,
        worker_id: usize,
        source_id: GlobalId,
    ) -> Result<
        (
            Lsn,
            impl Stream<Item = Result<tiberius::Row, SqlServerError>>,
        ),
        SqlServerError,
    > {
        static SAVEPOINT_NAME: &str = "_mz_snap_";

        // The client that will be used for fencing does not need any special isolation level
        // as it will just be locking the table(s).
        let mut fencing_client = self.client.new_connection().await?;
        let mut fence_txn = fencing_client.transaction().await?;
        let qualified_table_name = format!(
            "{schema_name}.{table_name}",
            schema_name = &table.schema_name,
            table_name = &table.name
        );
        self.metrics
            .snapshot_table_lock_start(&qualified_table_name);
        fence_txn
            .lock_table_shared(&table.schema_name, &table.name)
            .await?;
        tracing::info!(%source_id, %table.schema_name, %table.name, "timely-{worker_id} locked table");

        self.client
            .set_transaction_isolation(TransactionIsolationLevel::Snapshot)
            .await?;
        let mut txn = self.client.transaction().await?;
        // Creating a savepoint forces a write to the transaction log, which will
        // assign an LSN, but it does not force a transaction sequence number to be
        // assigned as far as I can tell. I have not observed any entries added to
        // `sys.dm_tran_active_snapshot_database_transactions` when creating a savepoint
        // or when reading system views to retrieve the LSN.
        //
        // We choose cdc.change_tables because it is a system table that will exist
        // when CDC is enabled, it has a well-known schema, and as a CDC client,
        // we should be able to read from it already.
        let res = txn
            .simple_query("SELECT TOP 1 object_id FROM cdc.change_tables")
            .await?;
        if res.len() != 1 {
            Err(SqlServerError::InvariantViolated(
                "No objects found in cdc.change_tables".into(),
            ))?
        }

        // Because the table is locked, any write operation has either
        // completed or is blocked. The LSN and XSN acquired now will represent a
        // consistent point-in-time view, such that any committed write will be
        // visible to this snapshot and the LSN of such a write will be less than
        // or equal to the LSN captured here. Creating the savepoint sets the LSN;
        // we can read it after rolling back the locks.
        txn.create_savepoint(SAVEPOINT_NAME).await?;
        tracing::info!(%source_id, %table.schema_name, %table.name, %SAVEPOINT_NAME, "timely-{worker_id} created savepoint");

        // Once the savepoint is created (which establishes the XSN and captures the LSN),
        // the table no longer needs to be locked. Any writes that happen to the upstream table
        // will have an LSN higher than our captured LSN, and will be read from CDC.
        fence_txn.rollback().await?;
        self.metrics.snapshot_table_lock_end(&qualified_table_name);
        let lsn = txn.get_lsn().await?;

        tracing::info!(%source_id, ?lsn, "timely-{worker_id} starting snapshot");
        let rows = async_stream::try_stream! {
            {
                let snapshot_stream = crate::inspect::snapshot(txn.client, table);
                tokio::pin!(snapshot_stream);

                while let Some(row) = snapshot_stream.next().await {
                    yield row?;
                }
            }

            txn.rollback().await?
        };

        Ok((lsn, rows))
    }

    /// Consume `self` returning a [`Stream`] of [`CdcEvent`]s.
    pub fn into_stream(
        mut self,
    ) -> impl Stream<Item = Result<CdcEvent, SqlServerError>> + use<'a, M> {
        async_stream::try_stream! {
            // Initialize all of our start LSNs.
            self.initialize_start_lsns().await?;

            // When starting the stream we'll emit one progress event if we've already observed
            // everything the DB currently has.
            if let Some(starting_lsn) = self.capture_instances.values().filter_map(|x| *x).min() {
                let db_curr_lsn = crate::inspect::get_max_lsn(self.client).await?;
                let next_lsn = db_curr_lsn.increment();
                if starting_lsn >= db_curr_lsn {
                    tracing::debug!(
                        %starting_lsn,
                        %db_curr_lsn,
                        %next_lsn,
                        "yielding initial progress",
                    );
                    yield CdcEvent::Progress { next_lsn };
                }
            }

            loop {
                // Measure the tick before we do any operation so the time it takes
                // to query SQL Server is included in the time that we wait.
                let next_tick = tokio::time::Instant::now()
                    .checked_add(self.poll_interval)
                    .expect("tick interval overflowed!");

                // We always check for changes based on the "global" minimum LSN of any
                // one capture instance.
                let maybe_curr_lsn = self.capture_instances.values().filter_map(|x| *x).min();
                let Some(curr_lsn) = maybe_curr_lsn else {
                    tracing::warn!("shutting down CDC stream because nothing to replicate");
                    break;
                };

                // Get the max LSN for the DB.
                let db_max_lsn = crate::inspect::get_max_lsn(self.client).await?;
                tracing::debug!(?db_max_lsn, ?curr_lsn, "got max LSN");

                // If the LSN of the DB has increased then get all of our changes.
                if db_max_lsn > curr_lsn {
                    for (instance, instance_lsn) in &self.capture_instances {
                        let Some(instance_lsn) = instance_lsn.as_ref() else {
                            tracing::error!(?instance, "found uninitialized LSN!");
                            continue;
                        };

                        // We've already replicated everything up to db_max_lsn, so
                        // there is nothing to do.
                        if db_max_lsn < *instance_lsn {
                            continue;
                        }

                        {
                            // Get a stream of all the changes for the current instance.
                            let changes = crate::inspect::get_changes_asc(
                                self.client,
                                &*instance,
                                *instance_lsn,
                                db_max_lsn,
                                RowFilterOption::AllUpdateOld,
                            )
                            // TODO(sql_server3): Make this chunk size configurable.
                            .ready_chunks(64);
                            let mut changes = std::pin::pin!(changes);

                            // Map and stream all the rows to our listener.
                            while let Some(chunk) = changes.next().await {
                                // Group events by LSN.
                                //
                                // TODO(sql_server3): Can we maybe re-use this BTreeMap or these Vec
                                // allocations? Something to be careful of is shrinking the allocations
                                // if/when they grow too large, e.g. from a large spike of changes.
                                // Alternatively we could also use a single Vec here since we know the
                                // changes are ordered by LSN.
                                let mut events: BTreeMap<Lsn, Vec<Operation>> = BTreeMap::default();
                                for change in chunk {
                                    let (lsn, operation) = change.and_then(Operation::try_parse)?;
                                    events.entry(lsn).or_default().push(operation);
                                }

                                // Emit the groups of events.
                                for (lsn, changes) in events {
                                    yield CdcEvent::Data {
                                        capture_instance: Arc::clone(instance),
                                        lsn,
                                        changes,
                                    };
                                }
                            }
                        }

                        let ddl_history = crate::inspect::get_ddl_history(
                            self.client, instance, instance_lsn, &db_max_lsn,
                        ).await?;
                        for (table, ddl_events) in ddl_history {
                            for ddl_event in ddl_events {
                                yield CdcEvent::SchemaUpdate {
                                    capture_instance: Arc::clone(instance),
                                    table: table.clone(),
                                    ddl_event,
                                }
                            }
                        }
                    }

                    // Increment our LSN (`get_changes` is inclusive).
                    //
                    // TODO(sql_server2): We should occasionally check to see how close the LSN we
                    // generate is to the LSN returned from incrementing via SQL Server itself.
                    let next_lsn = db_max_lsn.increment();
                    tracing::debug!(?curr_lsn, ?next_lsn, "incrementing LSN");

                    // Notify our listener that we've emitted all changes __less than__ this LSN.
                    //
                    // Note: This aligns well with timely's semantics of progress tracking.
                    yield CdcEvent::Progress { next_lsn };

                    // We just listed everything up to next_lsn.
                    for instance_lsn in self.capture_instances.values_mut() {
                        let instance_lsn = instance_lsn.as_mut().expect("should be initialized");
                        // Ensure LSNs don't go backwards.
                        *instance_lsn = std::cmp::max(*instance_lsn, next_lsn);
                    }
                }

                tokio::time::sleep_until(next_tick).await;
            }
        }
    }

    /// Determine the [`Lsn`] to start streaming changes from.
    async fn initialize_start_lsns(&mut self) -> Result<(), SqlServerError> {
        // First, initialize all start LSNs. If a capture instance didn't have
        // one specified then we'll start from the current max.
        let max_lsn = crate::inspect::get_max_lsn(self.client).await?;
        for (_instance, requested_lsn) in self.capture_instances.iter_mut() {
            if requested_lsn.is_none() {
                requested_lsn.replace(max_lsn);
            }
        }

        // For each instance, ensure its requested LSN is available.
        for (instance, requested_lsn) in self.capture_instances.iter() {
            let requested_lsn = requested_lsn
                .as_ref()
                .expect("initialized all values above");

            // Get the minimum Lsn available for this instance.
            let available_lsn = crate::inspect::get_min_lsn(self.client, &*instance).await?;

            // If we cannot start at our desired LSN, we must return an error.
            if *requested_lsn < available_lsn {
                return Err(CdcError::LsnNotAvailable {
                    capture_instance: Arc::clone(instance),
                    requested: *requested_lsn,
                    minimum: available_lsn,
                }
                .into());
            }
        }

        Ok(())
    }

    /// If CDC was recently enabled on an instance of SQL Server then it will report
    /// `NULL` for the minimum LSN of a capture instance and/or the maximum LSN of the
    /// entire database.
    ///
    /// This method runs a retry loop that waits for the upstream DB to report good
    /// values. It should be called before taking the initial [`CdcStream::snapshot`]
    /// to ensure the system is ready to proceed with CDC.
    pub async fn wait_for_ready(&mut self) -> Result<(), SqlServerError> {
        // Ensure all of the capture instances are reporting an LSN.
        for instance in self.capture_instances.keys() {
            crate::inspect::get_min_lsn_retry(self.client, instance, self.max_lsn_wait).await?;
        }

        // Ensure the database is reporting a max LSN.
        crate::inspect::get_max_lsn_retry(self.client, self.max_lsn_wait).await?;

        Ok(())
    }
}

/// A change event from a [`CdcStream`].
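///
/// A minimal sketch of consuming events from [`CdcStream::into_stream`] (`stream` is
/// assumed to be in scope; the fence is `ignore` because this requires a live stream):
///
/// ```ignore
/// while let Some(event) = stream.next().await {
///     match event? {
///         CdcEvent::Data { capture_instance, lsn, changes } => {
///             // Apply `changes`, which all occurred at `lsn`.
///         }
///         CdcEvent::Progress { next_lsn } => {
///             // All changes with an LSN less than `next_lsn` have been observed.
///         }
///         CdcEvent::SchemaUpdate { table, ddl_event, .. } => {
///             // React to upstream DDL against `table`.
///         }
///     }
/// }
/// ```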
#[derive(Derivative)]
#[derivative(Debug)]
pub enum CdcEvent {
    /// Changes have occurred upstream.
    Data {
        /// The capture instance these changes are for.
        capture_instance: Arc<str>,
        /// The LSN that this change occurred at.
        lsn: Lsn,
        /// The changes themselves.
        changes: Vec<Operation>,
    },
    /// We've made progress and observed all the changes less than `next_lsn`.
    Progress {
        /// We've received all of the data for [`Lsn`]s __less than__ this one.
        next_lsn: Lsn,
    },
    /// A DDL change has occurred for the upstream table.
    SchemaUpdate {
        /// The capture instance.
        capture_instance: Arc<str>,
        /// The upstream table that was updated.
        table: SqlServerQualifiedTableName,
        /// The DDL event.
        ddl_event: DDLEvent,
    },
}

#[derive(Debug, thiserror::Error)]
pub enum CdcError {
    #[error(
        "the requested LSN '{requested:?}' is less than the minimum '{minimum:?}' for '{capture_instance}'"
    )]
    LsnNotAvailable {
        capture_instance: Arc<str>,
        requested: Lsn,
        minimum: Lsn,
    },
    #[error("failed to get the required column '{column_name}': {error}")]
    RequiredColumn {
        column_name: &'static str,
        error: String,
    },
    #[error("failed to clean up values for '{capture_instance}' at {low_water_mark}")]
    CleanupFailed {
        capture_instance: String,
        low_water_mark: Lsn,
    },
}

/// This type is used to represent the progress of each SQL Server instance in
/// the ingestion dataflow.
///
/// A SQL Server LSN is a three-part "number" that provides a __total order__
/// for all transactions within a database. Internally we don't really care what
/// these parts mean, but they are:
///
/// 1. A Virtual Log File (VLF) sequence number, bytes [0, 4)
/// 2. Log block number, bytes [4, 8)
/// 3. Log record number, bytes [8, 10)
///
/// For more info on log sequence numbers in SQL Server see:
/// <https://learn.microsoft.com/en-us/sql/relational-databases/sql-server-transaction-log-architecture-and-management-guide?view=sql-server-ver16#Logical_Arch>
///
/// Note: The derived impls of [`PartialOrd`] and [`Ord`] rely on the field
/// ordering, so do not change it.
#[derive(
    Default,
    Copy,
    Clone,
    Debug,
    Eq,
    PartialEq,
    PartialOrd,
    Ord,
    Hash,
    Serialize,
    Deserialize,
    Arbitrary,
)]
pub struct Lsn {
    /// Virtual Log File sequence number.
    pub vlf_id: u32,
    /// Log block number.
    pub block_id: u32,
    /// Log record number.
    pub record_id: u16,
}

impl Lsn {
    const SIZE: usize = 10;

    /// Interpret the provided bytes as an [`Lsn`].
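    ///
    /// A worked example, matching the byte layout documented on [`Lsn`] (the fence is
    /// `ignore` only to avoid doctest path assumptions):
    ///
    /// ```ignore
    /// // 0x0000003D : 0x000019B8 : 0x0004, i.e. the big-endian parts 61, 6584, and 4.
    /// let bytes = [0x00, 0x00, 0x00, 0x3D, 0x00, 0x00, 0x19, 0xB8, 0x00, 0x04];
    /// let lsn = Lsn::try_from_bytes(&bytes).unwrap();
    /// assert_eq!((lsn.vlf_id, lsn.block_id, lsn.record_id), (61, 6584, 4));
    /// ```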
    pub fn try_from_bytes(bytes: &[u8]) -> Result<Self, String> {
        if bytes.len() != Self::SIZE {
            return Err(format!("incorrect length, expected 10, got {}", bytes.len()));
        }

        let vlf_id: [u8; 4] = bytes[0..4].try_into().expect("known good length");
        let block_id: [u8; 4] = bytes[4..8].try_into().expect("known good length");
        let record_id: [u8; 2] = bytes[8..].try_into().expect("known good length");

        Ok(Lsn {
            vlf_id: u32::from_be_bytes(vlf_id),
            block_id: u32::from_be_bytes(block_id),
            record_id: u16::from_be_bytes(record_id),
        })
    }

    /// Return the underlying byte representation of this [`Lsn`].
    pub fn as_bytes(&self) -> [u8; 10] {
        let mut raw: [u8; Self::SIZE] = [0; 10];

        raw[0..4].copy_from_slice(&self.vlf_id.to_be_bytes());
        raw[4..8].copy_from_slice(&self.block_id.to_be_bytes());
        raw[8..].copy_from_slice(&self.record_id.to_be_bytes());

        raw
    }

    /// Increment this [`Lsn`].
    ///
    /// The returned [`Lsn`] may not exist upstream yet, but it's guaranteed to
    /// sort greater than `self`.
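    ///
    /// For example, the carry from `record_id` flows into `block_id` (the fence is
    /// `ignore` only to avoid doctest path assumptions):
    ///
    /// ```ignore
    /// let lsn = Lsn { vlf_id: 0, block_id: 7, record_id: u16::MAX };
    /// assert_eq!(
    ///     lsn.increment(),
    ///     Lsn { vlf_id: 0, block_id: 8, record_id: 0 },
    /// );
    /// ```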
    pub fn increment(self) -> Lsn {
        let (record_id, carry) = self.record_id.overflowing_add(1);
        let (block_id, carry) = self.block_id.overflowing_add(carry.into());
        let (vlf_id, overflow) = self.vlf_id.overflowing_add(carry.into());
        assert!(!overflow, "overflowed Lsn, {self:?}");

        Lsn {
            vlf_id,
            block_id,
            record_id,
        }
    }

    /// Drops the `record_id` portion of this [`Lsn`] so we can fit an "abbreviation"
    /// of it into a [`u64`] without losing the total order.
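    ///
    /// For example (the fence is `ignore` only to avoid doctest path assumptions):
    ///
    /// ```ignore
    /// let lsn = Lsn { vlf_id: 2, block_id: 5, record_id: 9 };
    /// // The abbreviation packs `vlf_id` into the high 32 bits and `block_id`
    /// // into the low 32 bits; `record_id` is dropped.
    /// assert_eq!(lsn.abbreviate(), (2u64 << 32) + 5);
    /// ```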
    pub fn abbreviate(&self) -> u64 {
        let mut abbreviated: u64 = 0;

        #[allow(clippy::as_conversions)]
        {
            abbreviated += (self.vlf_id as u64) << 32;
            abbreviated += self.block_id as u64;
        }

        abbreviated
    }
}

impl fmt::Display for Lsn {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}:{}:{}", self.vlf_id, self.block_id, self.record_id)
    }
}

impl TryFrom<&[u8]> for Lsn {
    type Error = String;

    fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
        Lsn::try_from_bytes(value)
    }
}

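/// Decode an [`Lsn`] from its decimal (`Numeric(25,0)`) representation, where the
/// three parts are packed as `vlf_id * 10^15 + block_id * 10^5 + record_id`.
///
/// For example, the decimal LSN `45_0000008784_00001` decodes to `vlf_id: 45`,
/// `block_id: 8784`, and `record_id: 1`.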
impl TryFrom<Numeric> for Lsn {
    type Error = String;

    fn try_from(value: Numeric) -> Result<Self, Self::Error> {
        if value.dec_part() != 0 {
            return Err(format!(
                "LSN expects Numeric(25,0), but found decimal portion {}",
                value.dec_part()
            ));
        }
        let mut decimal_lsn = value.int_part();
        // An LSN is composed of 4 bytes : 4 bytes : 2 bytes, and Microsoft provides
        // the method to decode the decimal representation here:
        // https://github.com/microsoft/sql-server-samples/blob/master/samples/features/ssms-templates/Sql/Change%20Data%20Capture/Enumeration/Create%20Function%20fn_convertnumericlsntobinary.sql

        let vlf_id = u32::try_from(decimal_lsn / 10_i128.pow(15))
            .map_err(|e| format!("Failed to decode vlf_id for lsn {decimal_lsn}: {e:?}"))?;
        decimal_lsn -= i128::from(vlf_id) * 10_i128.pow(15);

        let block_id = u32::try_from(decimal_lsn / 10_i128.pow(5))
            .map_err(|e| format!("Failed to decode block_id for lsn {decimal_lsn}: {e:?}"))?;
        decimal_lsn -= i128::from(block_id) * 10_i128.pow(5);

        let record_id = u16::try_from(decimal_lsn)
            .map_err(|e| format!("Failed to decode record_id for lsn {decimal_lsn}: {e:?}"))?;

        Ok(Lsn {
            vlf_id,
            block_id,
            record_id,
        })
    }
}

impl columnation::Columnation for Lsn {
    type InnerRegion = columnation::CopyRegion<Lsn>;
}

impl timely::progress::Timestamp for Lsn {
    // No need to describe complex summaries.
    type Summary = ();

    fn minimum() -> Self {
        Lsn::default()
    }
}

impl timely::progress::PathSummary<Lsn> for () {
    fn results_in(&self, src: &Lsn) -> Option<Lsn> {
        Some(*src)
    }

    fn followed_by(&self, _other: &Self) -> Option<Self> {
        Some(())
    }
}

impl timely::progress::timestamp::Refines<()> for Lsn {
    fn to_inner(_other: ()) -> Self {
        use timely::progress::Timestamp;
        Self::minimum()
    }
    fn to_outer(self) -> () {}

    fn summarize(_path: <Self as timely::progress::Timestamp>::Summary) -> () {}
}

impl timely::order::PartialOrder for Lsn {
    fn less_equal(&self, other: &Self) -> bool {
        self <= other
    }

    fn less_than(&self, other: &Self) -> bool {
        self < other
    }
}

impl timely::order::TotalOrder for Lsn {}

/// Structured format of an [`Lsn`].
///
/// Note: The derived impls of [`PartialOrd`] and [`Ord`] rely on the field
/// ordering, so do not change it.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct StructuredLsn {
    vlf_id: u32,
    block_id: u32,
    record_id: u16,
}

/// When querying CDC functions like `cdc.fn_cdc_get_all_changes_<capture_instance>`, this governs
/// what content is returned.
///
/// Note: There exists another option, `All`, that excludes the _before_ value of an `UPDATE`. We
/// don't support this for SQL Server sources yet, so it's not included in this enum.
///
/// See: <https://learn.microsoft.com/en-us/sql/relational-databases/system-functions/cdc-fn-cdc-get-all-changes-capture-instance-transact-sql?view=sql-server-ver16#row_filter_option>
#[derive(Debug, Copy, Clone)]
pub enum RowFilterOption {
    /// Includes both the before and after values of a row when changed because of an `UPDATE`.
    AllUpdateOld,
}

impl RowFilterOption {
    /// Returns this option formatted in a way that can be used in a query.
    pub fn to_sql_string(&self) -> &'static str {
        match self {
            RowFilterOption::AllUpdateOld => "all update old",
        }
    }
}

impl fmt::Display for RowFilterOption {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.to_sql_string())
    }
}

/// Identifies what change was made to the SQL Server table tracked by CDC.
#[derive(Debug)]
pub enum Operation {
    /// Row was `INSERT`-ed.
    Insert(tiberius::Row),
    /// Row was `DELETE`-ed.
    Delete(tiberius::Row),
    /// Original value of the row when `UPDATE`-ed.
    UpdateOld(Lsn, tiberius::Row),
    /// New value of the row when `UPDATE`-ed.
    UpdateNew(Lsn, tiberius::Row),
}

impl Operation {
    /// Parse the provided [`tiberius::Row`] to determine what [`Operation`] occurred.
    ///
    /// See <https://learn.microsoft.com/en-us/sql/relational-databases/system-functions/cdc-fn-cdc-get-all-changes-capture-instance-transact-sql?view=sql-server-ver16#table-returned>.
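    ///
    /// The `__$operation` codes map as follows: `1` is a delete, `2` is an insert,
    /// `3` is the old value of an update, and `4` is the new value of an update.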
    fn try_parse(data: tiberius::Row) -> Result<(Lsn, Self), SqlServerError> {
        static START_LSN_COLUMN: &str = "__$start_lsn";
        static OPERATION_COLUMN: &str = "__$operation";
        static SEQVAL_COLUMN: &str = "__$seqval";

        let lsn: &[u8] = data
            .try_get(START_LSN_COLUMN)
            .map_err(|e| CdcError::RequiredColumn {
                column_name: START_LSN_COLUMN,
                error: e.to_string(),
            })?
            .ok_or_else(|| CdcError::RequiredColumn {
                column_name: START_LSN_COLUMN,
                error: "got null value".to_string(),
            })?;
        let operation: i32 = data
            .try_get(OPERATION_COLUMN)
            .map_err(|e| CdcError::RequiredColumn {
                column_name: OPERATION_COLUMN,
                error: e.to_string(),
            })?
            .ok_or_else(|| CdcError::RequiredColumn {
                column_name: OPERATION_COLUMN,
                error: "got null value".to_string(),
            })?;
        let seqval: &[u8] = data
            .try_get(SEQVAL_COLUMN)
            .map_err(|e| CdcError::RequiredColumn {
                column_name: SEQVAL_COLUMN,
                error: e.to_string(),
            })?
            .ok_or_else(|| CdcError::RequiredColumn {
                column_name: SEQVAL_COLUMN,
                error: "got null value".to_string(),
            })?;

        let lsn = Lsn::try_from(lsn).map_err(|msg| SqlServerError::InvalidData {
            column_name: START_LSN_COLUMN.to_string(),
            error: msg,
        })?;
        let seqval = Lsn::try_from(seqval).map_err(|msg| SqlServerError::InvalidData {
            column_name: SEQVAL_COLUMN.to_string(),
            error: msg,
        })?;

        let operation = match operation {
            1 => Operation::Delete(data),
            2 => Operation::Insert(data),
            3 => Operation::UpdateOld(seqval, data),
            4 => Operation::UpdateNew(seqval, data),
            other => {
                return Err(SqlServerError::InvalidData {
                    column_name: OPERATION_COLUMN.to_string(),
                    error: format!("unrecognized operation {other}"),
                });
            }
        };

        Ok((lsn, operation))
    }
}

#[cfg(test)]
mod tests {
    use super::Lsn;
    use proptest::prelude::*;
    use tiberius::numeric::Numeric;

    #[mz_ore::test]
    fn smoketest_lsn_ordering() {
        let a = hex::decode("0000003D000019B80004").unwrap();
        let a = Lsn::try_from(&a[..]).unwrap();

        let b = hex::decode("0000003D000019F00011").unwrap();
        let b = Lsn::try_from(&b[..]).unwrap();

        let c = hex::decode("0000003D00001A500003").unwrap();
        let c = Lsn::try_from(&c[..]).unwrap();

        assert!(a < b);
        assert!(b < c);
        assert!(a < c);

        assert_eq!(a, a);
        assert_eq!(b, b);
        assert_eq!(c, c);
    }

    #[mz_ore::test]
    fn smoketest_lsn_roundtrips() {
        #[track_caller]
        fn test_case(hex: &str) {
            let og = hex::decode(hex).unwrap();
            let lsn = Lsn::try_from(&og[..]).unwrap();
            let rnd = lsn.as_bytes();
            assert_eq!(og[..], rnd[..]);
        }

        test_case("0000003D000019B80004");
        test_case("0000003D000019F00011");
        test_case("0000003D00001A500003");
    }

    #[mz_ore::test]
    fn proptest_lsn_roundtrips() {
        #[track_caller]
        fn test_case(bytes: [u8; 10]) {
            let lsn = Lsn::try_from_bytes(&bytes[..]).unwrap();
            let rnd = lsn.as_bytes();
            assert_eq!(&bytes[..], &rnd[..]);
        }
        proptest!(|(random_bytes in any::<[u8; 10]>())| {
            test_case(random_bytes)
        })
    }

    #[mz_ore::test]
    fn proptest_lsn_increment() {
        #[track_caller]
        fn test_case(bytes: [u8; 10]) {
            let lsn = Lsn::try_from_bytes(&bytes[..]).unwrap();
            let new = lsn.increment();
            assert!(lsn < new);
        }
        proptest!(|(random_bytes in any::<[u8; 10]>())| {
            test_case(random_bytes)
        })
    }

    #[mz_ore::test]
    fn proptest_lsn_abbreviate_total_order() {
        #[track_caller]
        fn test_case(bytes: [u8; 10], num_increment: u8) {
            let lsn = Lsn::try_from_bytes(&bytes[..]).unwrap();
            let mut new = lsn;
            for _ in 0..num_increment {
                new = new.increment();
            }

            let a = lsn.abbreviate();
            let b = new.abbreviate();

            assert!(a <= b);
        }
        proptest!(|(random_bytes in any::<[u8; 10]>(), num_increment in any::<u8>())| {
            test_case(random_bytes, num_increment)
        })
    }

    #[mz_ore::test]
    fn test_numeric_lsn_ordering() {
        let a = Lsn::try_from(Numeric::new_with_scale(45_0000008784_00001_i128, 0)).unwrap();
        let b = Lsn::try_from(Numeric::new_with_scale(45_0000008784_00002_i128, 0)).unwrap();
        let c = Lsn::try_from(Numeric::new_with_scale(45_0000008785_00002_i128, 0)).unwrap();
        let d = Lsn::try_from(Numeric::new_with_scale(49_0000008784_00002_i128, 0)).unwrap();
        assert!(a < b);
        assert!(b < c);
        assert!(c < d);
        assert!(a < d);

        assert_eq!(a, a);
        assert_eq!(b, b);
        assert_eq!(c, c);
        assert_eq!(d, d);
    }

    #[mz_ore::test]
    fn test_numeric_lsn_invalid() {
        let with_decimal = Numeric::new_with_scale(1, 20);
        assert!(Lsn::try_from(with_decimal).is_err());

        for v in [
            4294967296_0000000000_00000_i128, // vlf_id is too large
            1_4294967296_00000_i128,          // block_id is too large
            1_0000000001_65536_i128,          // record_id is too large
            -49_0000008784_00002_i128,        // negative is invalid
        ] {
            let invalid_lsn = Numeric::new_with_scale(v, 0);
            assert!(Lsn::try_from(invalid_lsn).is_err());
        }
    }
}
909}