mz_storage/source/sql_server/
replication.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Code to render the ingestion dataflow of a [`SqlServerSourceConnection`].
11
12use std::collections::{BTreeMap, BTreeSet};
13use std::convert::Infallible;
14use std::rc::Rc;
15use std::sync::Arc;
16use std::time::Instant;
17
18use differential_dataflow::AsCollection;
19use differential_dataflow::containers::TimelyStack;
20use futures::StreamExt;
21use itertools::Itertools;
22use mz_ore::cast::CastFrom;
23use mz_ore::future::InTask;
24use mz_repr::{Diff, GlobalId, Row, RowArena};
25use mz_sql_server_util::SqlServerCdcMetrics;
26use mz_sql_server_util::cdc::{CdcEvent, Lsn, Operation as CdcOperation};
27use mz_sql_server_util::desc::SqlServerRowDecoder;
28use mz_sql_server_util::inspect::get_latest_restore_history_id;
29use mz_storage_types::errors::{DataflowError, DecodeError, DecodeErrorKind};
30use mz_storage_types::sources::SqlServerSourceConnection;
31use mz_storage_types::sources::sql_server::{
32    CDC_POLL_INTERVAL, MAX_LSN_WAIT, SNAPSHOT_PROGRESS_REPORT_INTERVAL,
33};
34use mz_timely_util::builder_async::{
35    AsyncOutputHandle, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
36};
37use mz_timely_util::containers::stack::AccountedStackBuilder;
38use timely::container::CapacityContainerBuilder;
39use timely::dataflow::operators::{CapabilitySet, Concat, Map};
40use timely::dataflow::{Scope, Stream as TimelyStream};
41use timely::progress::{Antichain, Timestamp};
42
43use crate::metrics::source::sql_server::SqlServerSourceMetrics;
44use crate::source::RawSourceCreationConfig;
45use crate::source::sql_server::{
46    DefiniteError, ReplicationError, SourceOutputInfo, TransientError,
47};
48use crate::source::types::{SignaledFuture, SourceMessage, StackedCollection};
49
/// Partition ID used to pick the single worker that reads data from SQL Server.
///
/// `render` passes this value to `config.responsible_for(..)`; every worker
/// for which that check fails returns early, right after publishing its
/// snapshot statistics.
///
/// TODO(sql_server2): It's possible we could have different workers
/// replicate different tables, if we're using SQL Server's CDC features.
static REPL_READER: &str = "reader";
56
57pub(crate) fn render<G: Scope<Timestamp = Lsn>>(
58    scope: G,
59    config: RawSourceCreationConfig,
60    outputs: BTreeMap<GlobalId, SourceOutputInfo>,
61    source: SqlServerSourceConnection,
62    metrics: SqlServerSourceMetrics,
63) -> (
64    StackedCollection<G, (u64, Result<SourceMessage, DataflowError>)>,
65    TimelyStream<G, Infallible>,
66    TimelyStream<G, ReplicationError>,
67    PressOnDropButton,
68) {
69    let op_name = format!("SqlServerReplicationReader({})", config.id);
70    let mut builder = AsyncOperatorBuilder::new(op_name, scope);
71
72    let (data_output, data_stream) = builder.new_output::<AccountedStackBuilder<_>>();
73    let (_upper_output, upper_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
74
75    // Captures DefiniteErrors that affect the entire source, including all outputs
76    let (definite_error_handle, definite_errors) =
77        builder.new_output::<CapacityContainerBuilder<_>>();
78
79    let (button, transient_errors) = builder.build_fallible(move |caps| {
80        let busy_signal = Arc::clone(&config.busy_signal);
81        Box::pin(SignaledFuture::new(busy_signal, async move {
82            let [
83                data_cap_set,
84                upper_cap_set,
85                definite_error_cap_set,
86            ]: &mut [_; 3] = caps.try_into().unwrap();
87
88            let connection_config = source
89                .connection
90                .resolve_config(
91                    &config.config.connection_context.secrets_reader,
92                    &config.config,
93                    InTask::Yes,
94                )
95                .await?;
96            let mut client = mz_sql_server_util::Client::connect(connection_config).await?;
97
98            let worker_id = config.worker_id;
99
100            // The decoder is specific to the export, and each export pulls data from a specific capture instance.
101            let mut decoder_map: BTreeMap<_, _> = BTreeMap::new();
102            // Maps the 'capture instance' to the output index for only those outputs that this worker will snapshot
103            let mut capture_instance_to_snapshot: BTreeMap<Arc<str>, Vec<_>> = BTreeMap::new();
104            // Maps the 'capture instance' to the output index for all outputs of this worker
105            let mut capture_instances: BTreeMap<Arc<str>, Vec<_>> = BTreeMap::new();
106            // Export statistics for a given capture instance
107            let mut export_statistics: BTreeMap<_, Vec<_>> = BTreeMap::new();
108
109            for (export_id, output) in outputs.iter() {
110                if decoder_map.insert(output.partition_index, Arc::clone(&output.decoder)).is_some() {
111                    panic!("Multiple decoders for output index {}", output.partition_index);
112                }
113                capture_instances
114                    .entry(Arc::clone(&output.capture_instance))
115                    .or_default()
116                    .push(output.partition_index);
117
118                if *output.resume_upper == [Lsn::minimum()] {
119                    capture_instance_to_snapshot
120                        .entry(Arc::clone(&output.capture_instance))
121                        .or_default()
122                        .push((output.partition_index, output.initial_lsn));
123                }
124                export_statistics.entry(Arc::clone(&output.capture_instance))
125                    .or_default()
126                    .push(
127                        config
128                            .statistics
129                            .get(export_id)
130                            .expect("statistics have been intialized")
131                            .clone(),
132                    );
133            }
134
135            // Eagerly emit an event if we have tables to snapshot.
136            // A worker *must* emit a count even if not responsible for snapshotting a table
137            // as statistic summarization will return null if any worker hasn't set a value.
138            // This will also reset snapshot stats for any exports not snapshotting.
139            metrics.snapshot_table_count.set(u64::cast_from(capture_instance_to_snapshot.len()));
140            if !capture_instance_to_snapshot.is_empty() {
141                for stats in config.statistics.values() {
142                    stats.set_snapshot_records_known(0);
143                    stats.set_snapshot_records_staged(0);
144                }
145            }
146            // We need to emit statistics before we exit
147            // TODO(sql_server2): Run ingestions across multiple workers.
148            if !config.responsible_for(REPL_READER) {
149                return Ok::<_, TransientError>(());
150            }
151
152            let snapshot_instances = capture_instance_to_snapshot
153                    .keys()
154                    .map(|i| i.as_ref());
155
156            // TODO (maz): we can avoid this query by using SourceOutputInfo
157            let snapshot_tables = mz_sql_server_util::inspect::get_tables_for_capture_instance(&mut client, snapshot_instances).await?;
158
159            // validate that the restore_history_id hasn't changed
160            let current_restore_history_id = get_latest_restore_history_id(&mut client).await?;
161            if current_restore_history_id != source.extras.restore_history_id {
162                let definite_error = DefiniteError::RestoreHistoryChanged(
163                    source.extras.restore_history_id.clone(),
164                    current_restore_history_id.clone()
165                );
166                tracing::warn!(?definite_error, "Restore detected, exiting");
167
168                return_definite_error(
169                        definite_error,
170                        capture_instances.values().flat_map(|indexes| indexes.iter().copied()),
171                        data_output,
172                        data_cap_set,
173                        definite_error_handle,
174                        definite_error_cap_set,
175                    ).await;
176                return Ok(());
177            }
178
179            // We first calculate all the total rows we need to fetch across all tables. Since this
180            // happens outside the snapshot transaction the totals might be off, so we won't assert
181            // that we get exactly this many rows later.
182            for table in &snapshot_tables {
183                let qualified_table_name = format!("{schema_name}.{table_name}",
184                    schema_name = &table.schema_name,
185                    table_name = &table.name);
186                let size_calc_start = Instant::now();
187                let table_total = mz_sql_server_util::inspect::snapshot_size(&mut client, &table.schema_name, &table.name).await?;
188                metrics.set_snapshot_table_size_latency(
189                    &qualified_table_name,
190                    size_calc_start.elapsed().as_secs_f64()
191                );
192                for export_stat in export_statistics.get(&table.capture_instance.name).unwrap() {
193                    export_stat.set_snapshot_records_known(u64::cast_from(table_total));
194                    export_stat.set_snapshot_records_staged(0);
195                }
196            }
197            let cdc_metrics = PrometheusSqlServerCdcMetrics{inner: &metrics};
198            let mut cdc_handle = client
199                .cdc(capture_instances.keys().cloned(), cdc_metrics)
200                .max_lsn_wait(MAX_LSN_WAIT.get(config.config.config_set()));
201
202            // Snapshot any instance that requires it.
203            // Each table snapshot will have its own LSN captured at the moment of snapshotting.
204            let snapshot_lsns: BTreeMap<Arc<str>, Lsn> = {
205                // Before starting a transaction where the LSN will not advance, ensure
206                // the upstream DB is ready for CDC.
207                cdc_handle.wait_for_ready().await?;
208
209                // Intentionally logging this at info for debugging. This section won't get entered
210                // often, but if there are problems here, it will be much easier to troubleshoot
211                // knowing where stall/hang might be happening.
212                tracing::info!(%config.worker_id, "timely-{worker_id} upstream is ready");
213
214                let report_interval =
215                    SNAPSHOT_PROGRESS_REPORT_INTERVAL.handle(config.config.config_set());
216                let mut last_report = Instant::now();
217                let mut snapshot_lsns = BTreeMap::new();
218                let arena = RowArena::default();
219
220                for table in snapshot_tables {
221                    // TODO(sql_server3): filter columns to only select columns required for Source.
222                    let (snapshot_lsn, snapshot) = cdc_handle
223                        .snapshot(&table, config.worker_id, config.id)
224                        .await?;
225
226                    tracing::info!(%config.id, %table.name, %table.schema_name, %snapshot_lsn, "timely-{worker_id} snapshot start");
227
228                    let mut snapshot = std::pin::pin!(snapshot);
229
230                    snapshot_lsns.insert(Arc::clone(&table.capture_instance.name), snapshot_lsn);
231
232                    let partition_indexes = capture_instance_to_snapshot.get(&table.capture_instance.name)
233                        .unwrap_or_else(|| {
234                            panic!("no snapshot outputs in known capture instances [{}] for capture instance: '{}'", capture_instance_to_snapshot.keys().join(","), table.capture_instance.name);
235                        });
236
237                    let mut snapshot_staged = 0;
238                    while let Some(result) = snapshot.next().await {
239                        let sql_server_row = result.map_err(TransientError::from)?;
240
241                        if last_report.elapsed() > report_interval.get() {
242                            last_report = Instant::now();
243                            for export_stat in export_statistics.get(&table.capture_instance.name).unwrap() {
244                                export_stat.set_snapshot_records_staged(snapshot_staged);
245                            }
246                        }
247
248                        for (partition_idx, _) in partition_indexes {
249                            // Decode the SQL Server row into an MZ one.
250                            let mut mz_row = Row::default();
251
252                            let decoder = decoder_map.get(partition_idx).expect("decoder for output");
253                            // Try to decode a row, returning a SourceError if it fails.
254                            let message = match decoder.decode(&sql_server_row, &mut mz_row, &arena) {
255                                Ok(()) => Ok(SourceMessage {
256                                    key: Row::default(),
257                                    value: mz_row,
258                                    metadata: Row::default(),
259                                }),
260                                Err(e) => {
261                                    let kind = DecodeErrorKind::Text(e.to_string().into());
262                                    // TODO(sql_server2): Get the raw bytes from `tiberius`.
263                                    let raw = format!("{sql_server_row:?}");
264                                    Err(DataflowError::DecodeError(Box::new(DecodeError {
265                                        kind,
266                                        raw: raw.as_bytes().to_vec(),
267                                    })))
268                                }
269                            };
270                            data_output
271                                .give_fueled(
272                                    &data_cap_set[0],
273                                    ((*partition_idx, message), Lsn::minimum(), Diff::ONE),
274                                )
275                                .await;
276                        }
277                        snapshot_staged += 1;
278                    }
279
280                    tracing::info!(%config.id, %table.name, %table.schema_name, %snapshot_lsn, "timely-{worker_id} snapshot complete");
281                    metrics.snapshot_table_count.dec();
282                    // final update for snapshot_staged, using the staged values as the total is an estimate
283                    for export_stat in export_statistics.get(&table.capture_instance.name).unwrap() {
284                        export_stat.set_snapshot_records_staged(snapshot_staged);
285                        export_stat.set_snapshot_records_known(snapshot_staged);
286                    }
287                }
288
289                snapshot_lsns
290            };
291
292            // Rewinds need to keep track of 2 timestamps to ensure that
293            // all replicas emit the same set of updates for any given timestamp.
294            // These are the initial_lsn and snapshot_lsn, where initial_lsn must be
295            // less than or equal to snapshot_lsn.
296            //
297            // - events at an LSN less than or equal to initial_lsn are ignored
298            // - events at an LSN greater than initial_lsn and less than or equal to
299            //   snapshot_lsn are retracted at Lsn::minimum(), and emitted at the commit_lsn
300            // - events at an LSN greater than snapshot_lsn are emitted at the commit_lsn
301            //
302            // where the commit_lsn is the upstream LSN that the event was committed at
303            //
304            // If initial_lsn == snapshot_lsn, all CDC events at LSNs up to and including the
305            // snapshot_lsn are ignored, and no rewinds are issued.
306            let mut rewinds: BTreeMap<_, _> = capture_instance_to_snapshot
307                .iter()
308                .flat_map(|(capture_instance, export_ids)|{
309                    let snapshot_lsn = snapshot_lsns.get(capture_instance).expect("snapshot lsn must be collected for capture instance");
310                    export_ids
311                        .iter()
312                        .map(|(idx, initial_lsn)| (*idx, (*initial_lsn, *snapshot_lsn)))
313                }).collect();
314
315            // For now, we assert that initial_lsn captured during purification is less
316            // than or equal to snapshot_lsn. If that was not true, it would mean that
317            // we observed a SQL server DB that appeared to go back in time.
318            // TODO (maz): not ideal to do this after snapshot, move this into
319            // CdcStream::snapshot after https://github.com/MaterializeInc/materialize/pull/32979 is merged.
320            for (initial_lsn, snapshot_lsn) in rewinds.values() {
321                assert!(
322                    initial_lsn <= snapshot_lsn,
323                    "initial_lsn={initial_lsn} snapshot_lsn={snapshot_lsn}"
324                );
325            }
326
327            tracing::debug!("rewinds to process: {rewinds:?}");
328
329            capture_instance_to_snapshot.clear();
330
331            // Resumption point is the minimum LSN that has been observed per capture instance.
332            let mut resume_lsns = BTreeMap::new();
333            for src_info in outputs.values() {
334                let resume_lsn = match src_info.resume_upper.as_option() {
335                    Some(lsn) if *lsn != Lsn::minimum() => *lsn,
336                    // initial_lsn is the max lsn observed, but the resume lsn
337                    // is the next lsn that should be read.  After a snapshot, initial_lsn
338                    // has been read, so replication will start at the next available lsn.
339                    Some(_) => src_info.initial_lsn.increment(),
340                    None => panic!("resume_upper has at least one value"),
341                };
342                resume_lsns.entry(Arc::clone(&src_info.capture_instance))
343                    .and_modify(|existing| *existing = std::cmp::min(*existing, resume_lsn))
344                    .or_insert(resume_lsn);
345            }
346
347            tracing::info!(%config.id, ?resume_lsns, "timely-{} replication starting", config.worker_id);
348            for instance in capture_instances.keys() {
349                let resume_lsn = resume_lsns
350                    .get(instance)
351                    .expect("resume_lsn exists for capture instance");
352                cdc_handle = cdc_handle.start_lsn(instance, *resume_lsn);
353            }
354
355            // Off to the races! Replicate data from SQL Server.
356            let cdc_stream = cdc_handle
357                .poll_interval(CDC_POLL_INTERVAL.get(config.config.config_set()))
358                .into_stream();
359            let mut cdc_stream = std::pin::pin!(cdc_stream);
360
361            let mut errored_instances = BTreeSet::new();
362
363            // TODO(sql_server2): We should emit `ProgressStatisticsUpdate::SteadyState` messages
364            // here, when we receive progress events. What stops us from doing this now is our
365            // 10-byte LSN doesn't fit into the 8-byte integer that the progress event uses.
366            let mut log_rewinds_complete = true;
367            while let Some(event) = cdc_stream.next().await {
368                let event = event.map_err(TransientError::from)?;
369                tracing::trace!(?config.id, ?event, "got replication event");
370
371                match event {
372                    // We've received all of the changes up-to this LSN, so
373                    // downgrade our capability.
374                    CdcEvent::Progress { next_lsn } => {
375                        tracing::debug!(?config.id, ?next_lsn, "got a closed lsn");
376                        // cannot downgrade capability until rewinds have been processed,
377                        // we must be able to produce data at the minimum offset.
378                        rewinds.retain(|_, (_, snapshot_lsn)| next_lsn <= *snapshot_lsn);
379                        if rewinds.is_empty() {
380                            if log_rewinds_complete {
381                                tracing::debug!("rewinds complete");
382                                log_rewinds_complete = false;
383                            }
384                            data_cap_set.downgrade(Antichain::from_elem(next_lsn));
385                        } else {
386                            tracing::debug!("rewinds remaining: {:?}", rewinds);
387                        }
388                        upper_cap_set.downgrade(Antichain::from_elem(next_lsn));
389                    }
390                    // We've got new data! Let's process it.
391                    CdcEvent::Data {
392                        capture_instance,
393                        lsn,
394                        changes,
395                    } => {
396                        if errored_instances.contains(&capture_instance) {
397                            // outputs for this captured instance are in an errored state, so they are not
398                            // emitted
399                            metrics.ignored.inc_by(u64::cast_from(changes.len()));
400                        }
401
402                        let Some(partition_indexes) = capture_instances.get(&capture_instance) else {
403                            let definite_error = DefiniteError::ProgrammingError(format!(
404                                "capture instance didn't exist: '{capture_instance}'"
405                            ));
406                            return_definite_error(
407                                definite_error,
408                                capture_instances.values().flat_map(|indexes| indexes.iter().copied()),
409                                data_output,
410                                data_cap_set,
411                                definite_error_handle,
412                                definite_error_cap_set,
413                            )
414                            .await;
415                            return Ok(());
416                        };
417
418                        handle_data_event(
419                            changes,
420                            partition_indexes,
421                            &decoder_map,
422                            lsn,
423                            &rewinds,
424                            &data_output,
425                            data_cap_set,
426                            &metrics
427                        ).await?
428                    },
429                    CdcEvent::SchemaUpdate { capture_instance, table, ddl_event } => {
430                        if !errored_instances.contains(&capture_instance)
431                            && !ddl_event.is_compatible() {
432                            let Some(partition_indexes) = capture_instances.get(&capture_instance) else {
433                                let definite_error = DefiniteError::ProgrammingError(format!(
434                                    "capture instance didn't exist: '{capture_instance}'"
435                                ));
436                                return_definite_error(
437                                    definite_error,
438                                    capture_instances.values().flat_map(|indexes| indexes.iter().copied()),
439                                    data_output,
440                                    data_cap_set,
441                                    definite_error_handle,
442                                    definite_error_cap_set,
443                                )
444                                .await;
445                                return Ok(());
446                            };
447                            let error = DefiniteError::IncompatibleSchemaChange(
448                                capture_instance.to_string(),
449                                table.to_string()
450                            );
451                            for partition_idx in partition_indexes {
452                                data_output
453                                    .give_fueled(
454                                        &data_cap_set[0],
455                                        ((*partition_idx, Err(error.clone().into())), ddl_event.lsn, Diff::ONE),
456                                    )
457                                    .await;
458                            }
459                            errored_instances.insert(capture_instance);
460                        }
461                    }
462                };
463            }
464            Err(TransientError::ReplicationEOF)
465        }))
466    });
467
468    let error_stream = definite_errors.concat(&transient_errors.map(ReplicationError::Transient));
469
470    (
471        data_stream.as_collection(),
472        upper_stream,
473        error_stream,
474        button.press_on_drop(),
475    )
476}
477
478async fn handle_data_event(
479    changes: Vec<CdcOperation>,
480    partition_indexes: &[u64],
481    decoder_map: &BTreeMap<u64, Arc<SqlServerRowDecoder>>,
482    commit_lsn: Lsn,
483    rewinds: &BTreeMap<u64, (Lsn, Lsn)>,
484    data_output: &StackedAsyncOutputHandle<Lsn, (u64, Result<SourceMessage, DataflowError>)>,
485    data_cap_set: &CapabilitySet<Lsn>,
486    metrics: &SqlServerSourceMetrics,
487) -> Result<(), TransientError> {
488    for change in changes {
489        let (sql_server_row, diff): (_, _) = match change {
490            CdcOperation::Insert(sql_server_row) => {
491                metrics.inserts.inc();
492                (sql_server_row, Diff::ONE)
493            }
494            CdcOperation::UpdateNew(sql_server_row) => {
495                metrics.updates.inc();
496                (sql_server_row, Diff::ONE)
497            }
498            CdcOperation::Delete(sql_server_row) => {
499                metrics.deletes.inc();
500                (sql_server_row, Diff::MINUS_ONE)
501            }
502            CdcOperation::UpdateOld(sql_server_row) => (sql_server_row, Diff::MINUS_ONE),
503        };
504
505        // Try to decode a row, returning a SourceError if it fails.
506        let mut mz_row = Row::default();
507        let arena = RowArena::default();
508
509        for partition_idx in partition_indexes {
510            let decoder = decoder_map.get(partition_idx).unwrap();
511
512            let rewind = rewinds.get(partition_idx);
513            // We must continue here to avoid decoding and emitting. We don't have to compare with
514            // snapshot_lsn as we are guaranteed that initial_lsn <= snapshot_lsn.
515            if rewind.is_some_and(|(initial_lsn, _)| commit_lsn <= *initial_lsn) {
516                continue;
517            }
518
519            // Try to decode a row, returning a SourceError if it fails.
520            let message = match decoder.decode(&sql_server_row, &mut mz_row, &arena) {
521                Ok(()) => Ok(SourceMessage {
522                    key: Row::default(),
523                    value: mz_row.clone(),
524                    metadata: Row::default(),
525                }),
526                Err(e) => {
527                    let kind = DecodeErrorKind::Text(e.to_string().into());
528                    // TODO(sql_server2): Get the raw bytes from `tiberius`.
529                    let raw = format!("{sql_server_row:?}");
530                    Err(DataflowError::DecodeError(Box::new(DecodeError {
531                        kind,
532                        raw: raw.as_bytes().to_vec(),
533                    })))
534                }
535            };
536
537            if rewind.is_some_and(|(_, snapshot_lsn)| commit_lsn <= *snapshot_lsn) {
538                data_output
539                    .give_fueled(
540                        &data_cap_set[0],
541                        ((*partition_idx, message.clone()), Lsn::minimum(), -diff),
542                    )
543                    .await;
544            }
545            data_output
546                .give_fueled(
547                    &data_cap_set[0],
548                    ((*partition_idx, message), commit_lsn, diff),
549                )
550                .await;
551        }
552    }
553    Ok(())
554}
555
/// Handle to an async operator output that carries `(D, T, Diff)` updates at
/// timestamps of type `T`.
///
/// Updates are collected into a [`TimelyStack`] by a [`CapacityContainerBuilder`]
/// that is wrapped in an [`AccountedStackBuilder`].
type StackedAsyncOutputHandle<T, D> = AsyncOutputHandle<
    T,
    AccountedStackBuilder<CapacityContainerBuilder<TimelyStack<(D, T, Diff)>>>,
>;
560
561/// Helper method to return a "definite" error upstream.
562async fn return_definite_error(
563    err: DefiniteError,
564    outputs: impl Iterator<Item = u64>,
565    data_handle: StackedAsyncOutputHandle<Lsn, (u64, Result<SourceMessage, DataflowError>)>,
566    data_capset: &CapabilitySet<Lsn>,
567    errs_handle: AsyncOutputHandle<Lsn, CapacityContainerBuilder<Vec<ReplicationError>>>,
568    errs_capset: &CapabilitySet<Lsn>,
569) {
570    for output_idx in outputs {
571        let update = (
572            (output_idx, Err(err.clone().into())),
573            // Select an LSN that should not conflict with a previously observed LSN.  Ideally
574            // we could identify the LSN that resulted in the definite error so that all replicas
575            // would emit the same updates for the same times.
576            Lsn {
577                vlf_id: u32::MAX,
578                block_id: u32::MAX,
579                record_id: u16::MAX,
580            },
581            Diff::ONE,
582        );
583        data_handle.give_fueled(&data_capset[0], update).await;
584    }
585    errs_handle.give(
586        &errs_capset[0],
587        ReplicationError::DefiniteError(Rc::new(err)),
588    );
589}
590
/// Provides an implementation of [`SqlServerCdcMetrics`] that will update [`SqlServerSourceMetrics`].
struct PrometheusSqlServerCdcMetrics<'a> {
    /// The source metrics that the CDC callbacks are forwarded to.
    inner: &'a SqlServerSourceMetrics,
}
595
596impl<'a> SqlServerCdcMetrics for PrometheusSqlServerCdcMetrics<'a> {
597    fn snapshot_table_lock_start(&self, table_name: &str) {
598        self.inner.update_snapshot_table_lock_count(table_name, 1);
599    }
600
601    fn snapshot_table_lock_end(&self, table_name: &str) {
602        self.inner.update_snapshot_table_lock_count(table_name, -1);
603    }
604}