// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Code to render the ingestion dataflow of a [`SqlServerSourceConnection`].
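//!
//! At a high level (a summary of the operator below, not separate normative
//! docs): any output whose `resume_upper` is still `Lsn::minimum()` is
//! snapshotted first, with the snapshot emitted at `Lsn::minimum()`; the
//! operator then tails the SQL Server CDC stream, retracting and re-emitting
//! ("rewinding") events that fall between an output's `initial_lsn` and its
//! `snapshot_lsn`, and downgrades its capability as the upstream LSN advances.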

use std::collections::{BTreeMap, BTreeSet};
use std::rc::Rc;
use std::sync::Arc;
use std::time::Instant;

use differential_dataflow::AsCollection;
use futures::StreamExt;
use itertools::Itertools;
use mz_ore::cast::CastFrom;
use mz_ore::collections::HashMap;
use mz_ore::future::InTask;
use mz_repr::{Diff, GlobalId, Row, RowArena};
use mz_sql_server_util::SqlServerCdcMetrics;
use mz_sql_server_util::cdc::{CdcEvent, Lsn, Operation as CdcOperation};
use mz_sql_server_util::desc::SqlServerRowDecoder;
use mz_sql_server_util::inspect::{
    ensure_database_cdc_enabled, ensure_sql_server_agent_running, get_latest_restore_history_id,
};
use mz_storage_types::dyncfgs::SQL_SERVER_SOURCE_VALIDATE_RESTORE_HISTORY;
use mz_storage_types::errors::{DataflowError, DecodeError, DecodeErrorKind};
use mz_storage_types::sources::SqlServerSourceConnection;
use mz_storage_types::sources::sql_server::{MAX_LSN_WAIT, SNAPSHOT_PROGRESS_REPORT_INTERVAL};
use mz_timely_util::builder_async::{
    AsyncOutputHandle, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::containers::stack::FueledBuilder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::operators::vec::Map;
use timely::dataflow::operators::{CapabilitySet, Concat};
use timely::dataflow::{Scope, StreamVec};
use timely::progress::{Antichain, Timestamp};

use crate::metrics::source::sql_server::SqlServerSourceMetrics;
use crate::source::RawSourceCreationConfig;
use crate::source::sql_server::{
    DefiniteError, ReplicationError, SourceOutputInfo, TransientError,
};
use crate::source::types::{FuelSize, SignaledFuture, SourceMessage, StackedCollection};

/// Used as a partition ID to determine the worker that is responsible for
/// reading data from SQL Server.
///
/// TODO(sql_server2): It's possible we could have different workers
/// replicate different tables, if we're using SQL Server's CDC features.
static REPL_READER: &str = "reader";

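/// Renders the SQL Server replication operator, returning the decoded data
/// collection, a stream of replication errors, and a token that shuts the
/// operator down when dropped.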
pub(crate) fn render<'scope>(
    scope: Scope<'scope, Lsn>,
    config: RawSourceCreationConfig,
    outputs: BTreeMap<GlobalId, SourceOutputInfo>,
    source: SqlServerSourceConnection,
    metrics: SqlServerSourceMetrics,
) -> (
    StackedCollection<'scope, Lsn, (u64, Result<SourceMessage, DataflowError>)>,
    StreamVec<'scope, Lsn, ReplicationError>,
    PressOnDropButton,
) {
    let op_name = format!("SqlServerReplicationReader({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(op_name, scope);

    let (data_output, data_stream) = builder.new_output::<FueledBuilder<_>>();

    // Captures DefiniteErrors that affect the entire source, including all outputs
    let (definite_error_handle, definite_errors) =
        builder.new_output::<CapacityContainerBuilder<_>>();

    let (button, transient_errors) = builder.build_fallible(move |caps| {
        let busy_signal = Arc::clone(&config.busy_signal);
        Box::pin(SignaledFuture::new(busy_signal, async move {
            let [
                data_cap_set,
                definite_error_cap_set,
            ]: &mut [_; 2] = caps.try_into().unwrap();

            let connection_config = source
                .connection
                .resolve_config(
                    &config.config.connection_context.secrets_reader,
                    &config.config,
                    InTask::Yes,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(connection_config).await?;

            let worker_id = config.worker_id;

            // The decoder is specific to the export, and each export pulls data from a specific capture instance.
            let mut decoder_map: BTreeMap<_, _> = BTreeMap::new();
            // Maps the 'capture instance' to the output index for only those outputs that this worker will snapshot
            let mut capture_instance_to_snapshot: BTreeMap<Arc<str>, Vec<_>> = BTreeMap::new();
            // Maps the 'capture instance' to the output index for all outputs of this worker
            let mut capture_instances: BTreeMap<Arc<str>, Vec<_>> = BTreeMap::new();
            // Export statistics for a given capture instance
            let mut export_statistics: BTreeMap<_, Vec<_>> = BTreeMap::new();
            // Maps the included columns for each output index so we can check
            // whether schema updates are valid on a per-output basis
            let mut included_columns: HashMap<u64, Vec<Arc<str>>> = HashMap::new();

            for (export_id, output) in outputs.iter() {
                let key = output.partition_index;
                if decoder_map.insert(key, Arc::clone(&output.decoder)).is_some() {
                    panic!("Multiple decoders for output index {}", output.partition_index);
                }
                // Collect the included columns from the decoder for schema
                // change validation. The decoder serves as an effective
                // source of truth for which columns are "included", as we
                // only care about the columns that are being decoded and
                // replicated.
                let included_cols = output.decoder.included_column_names();
                included_columns.insert(output.partition_index, included_cols);

                capture_instances
                    .entry(Arc::clone(&output.capture_instance))
                    .or_default()
                    .push(output.partition_index);

                if *output.resume_upper == [Lsn::minimum()] {
                    capture_instance_to_snapshot
                        .entry(Arc::clone(&output.capture_instance))
                        .or_default()
                        .push((output.partition_index, output.initial_lsn));
                }
                export_statistics.entry(Arc::clone(&output.capture_instance))
                    .or_default()
                    .push(
                        config
                            .statistics
                            .get(export_id)
                            .expect("statistics have been initialized")
                            .clone(),
                    );
            }

            // Eagerly emit an event if we have tables to snapshot.
            // A worker *must* emit a count even if it is not responsible for snapshotting
            // a table, because statistics summarization will return null if any worker
            // hasn't set a value. This also resets the snapshot stats for any exports
            // that are not snapshotting.
            metrics.snapshot_table_count.set(u64::cast_from(capture_instance_to_snapshot.len()));
            if !capture_instance_to_snapshot.is_empty() {
                for stats in config.statistics.values() {
                    stats.set_snapshot_records_known(0);
                    stats.set_snapshot_records_staged(0);
                }
            }
            // We need to emit statistics before we exit
            // TODO(sql_server2): Run ingestions across multiple workers.
            if !config.responsible_for(REPL_READER) {
                return Ok::<_, TransientError>(());
            }

            let snapshot_instances = capture_instance_to_snapshot
                .keys()
                .map(|i| i.as_ref());

            // TODO (maz): we can avoid this query by using SourceOutputInfo
            let snapshot_tables =
                mz_sql_server_util::inspect::get_tables_for_capture_instance(
                    &mut client,
                    snapshot_instances,
                )
                .await?;

            // validate that the restore_history_id hasn't changed
            let current_restore_history_id = get_latest_restore_history_id(&mut client).await?;
            if current_restore_history_id != source.extras.restore_history_id {
                if SQL_SERVER_SOURCE_VALIDATE_RESTORE_HISTORY.get(config.config.config_set()) {
                    let definite_error = DefiniteError::RestoreHistoryChanged(
                        source.extras.restore_history_id.clone(),
                        current_restore_history_id.clone(),
                    );
                    tracing::warn!(?definite_error, "Restore detected, exiting");

                    return_definite_error(
                        definite_error,
                        capture_instances.values().flat_map(|indexes| indexes.iter().copied()),
                        data_output,
                        data_cap_set,
                        definite_error_handle,
                        definite_error_cap_set,
                    ).await;
                    return Ok(());
                } else {
                    tracing::warn!(
                        "Restore history mismatch ignored: expected={expected:?} actual={actual:?}",
                        expected = source.extras.restore_history_id,
                        actual = current_restore_history_id,
                    );
                }
            }

            // For AOAG, it's possible that the dataflow restarted and is now connected to a
            // different SQL Server, which may not have CDC enabled correctly.
            ensure_database_cdc_enabled(&mut client).await?;
            ensure_sql_server_agent_running(&mut client).await?;

            // We first calculate the total number of rows we need to fetch across all
            // tables. Since this happens outside the snapshot transaction the totals
            // might be off, so we won't assert that we get exactly this many rows later.
            for table in &snapshot_tables {
                let qualified_table_name = format!("{schema_name}.{table_name}",
                    schema_name = &table.schema_name,
                    table_name = &table.name);
                let size_calc_start = Instant::now();
                let table_total =
                    mz_sql_server_util::inspect::snapshot_size(
                        &mut client,
                        &table.schema_name,
                        &table.name,
                    )
                    .await?;
                metrics.set_snapshot_table_size_latency(
                    &qualified_table_name,
                    size_calc_start.elapsed().as_secs_f64(),
                );
                for export_stat in export_statistics.get(&table.capture_instance.name).unwrap() {
                    export_stat.set_snapshot_records_known(u64::cast_from(table_total));
                    export_stat.set_snapshot_records_staged(0);
                }
            }
            let cdc_metrics = PrometheusSqlServerCdcMetrics { inner: &metrics };
            let mut cdc_handle = client
                .cdc(capture_instances.keys().cloned(), cdc_metrics)
                .max_lsn_wait(MAX_LSN_WAIT.get(config.config.config_set()));

            // Snapshot any instance that requires it.
            // Each table snapshot will have its own LSN captured at the moment of snapshotting.
            let snapshot_lsns: BTreeMap<Arc<str>, Lsn> = {
                // Before starting a transaction where the LSN will not advance, ensure
                // the upstream DB is ready for CDC.
                cdc_handle.wait_for_ready().await?;

                // Intentionally logged at info for debugging. This section won't be
                // entered often, but if there are problems here it will be much easier
                // to troubleshoot knowing where a stall or hang might be happening.
                tracing::info!(%config.worker_id, "timely-{worker_id} upstream is ready");

                let report_interval =
                    SNAPSHOT_PROGRESS_REPORT_INTERVAL.handle(config.config.config_set());
                let mut last_report = Instant::now();
                let mut snapshot_lsns = BTreeMap::new();

                for table in snapshot_tables {
                    // TODO(sql_server3): filter columns to only select columns required for Source.
                    let (snapshot_lsn, snapshot) = cdc_handle
                        .snapshot(&table, config.worker_id, config.id)
                        .await?;

                    tracing::info!(
                        %config.id,
                        %table.name,
                        %table.schema_name,
                        %snapshot_lsn,
                        "timely-{worker_id} snapshot start",
                    );

                    let mut snapshot = std::pin::pin!(snapshot);

                    snapshot_lsns.insert(
                        Arc::clone(&table.capture_instance.name),
                        snapshot_lsn,
                    );

                    let ci_name = &table.capture_instance.name;
                    let partition_indexes = capture_instance_to_snapshot
                        .get(ci_name)
                        .unwrap_or_else(|| {
                            panic!(
                                "no snapshot outputs in known capture \
                                 instances [{}] for capture instance: \
                                 '{}'",
                                capture_instance_to_snapshot
                                    .keys()
                                    .join(","),
                                ci_name,
                            );
                        });

                    let mut snapshot_staged = 0;
                    while let Some(result) = snapshot.next().await {
                        let sql_server_row =
                            result.map_err(TransientError::from)?;

                        if last_report.elapsed() > report_interval.get() {
                            last_report = Instant::now();
                            let stats =
                                export_statistics.get(ci_name).unwrap();
                            for export_stat in stats {
                                export_stat.set_snapshot_records_staged(
                                    snapshot_staged,
                                );
                            }
                        }

                        for (partition_idx, _) in partition_indexes {
                            // Decode the SQL Server row into an MZ one.
                            let mut mz_row = Row::default();
                            let arena = RowArena::default();

                            let decoder = decoder_map
                                .get(partition_idx)
                                .expect("decoder for output");
                            // Try to decode a row, returning a SourceError
                            // if it fails.
                            let message = decode(
                                decoder,
                                &sql_server_row,
                                &mut mz_row,
                                &arena,
                                None,
                            );
                            let update =
                                ((*partition_idx, message), Lsn::minimum(), Diff::ONE);
                            let size = update.fuel_size();
                            data_output
                                .give_fueled(&data_cap_set[0], update, size)
                                .await;
                        }
                        snapshot_staged += 1;
                    }

                    tracing::info!(
                        %config.id,
                        %table.name,
                        %table.schema_name,
                        %snapshot_lsn,
                        "timely-{worker_id} snapshot complete",
                    );
                    metrics.snapshot_table_count.dec();
                    // Final update for snapshot_staged: use the staged count as the
                    // total, since the earlier total was only an estimate.
                    let stats = export_statistics.get(ci_name).unwrap();
                    for export_stat in stats {
                        export_stat.set_snapshot_records_staged(snapshot_staged);
                        export_stat.set_snapshot_records_known(snapshot_staged);
                    }
                }

                snapshot_lsns
            };

            // Rewinds need to keep track of 2 timestamps to ensure that
            // all replicas emit the same set of updates for any given timestamp.
            // These are the initial_lsn and snapshot_lsn, where initial_lsn must be
            // less than or equal to snapshot_lsn.
            //
            // - events at an LSN less than or equal to initial_lsn are ignored
            // - events at an LSN greater than initial_lsn and less than or equal to
            //   snapshot_lsn are retracted at Lsn::minimum(), and emitted at the commit_lsn
            // - events at an LSN greater than snapshot_lsn are emitted at the commit_lsn
            //
            // where the commit_lsn is the upstream LSN that the event was committed at
            //
            // If initial_lsn == snapshot_lsn, all CDC events at LSNs up to and including the
            // snapshot_lsn are ignored, and no rewinds are issued.
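            //
            // A worked example (with hypothetical LSN values, treating LSNs as small
            // integers purely for readability): suppose initial_lsn = 10 and
            // snapshot_lsn = 15 for some output. Then:
            //
            // - an insert committed at LSN 8 is ignored, since the snapshot already
            //   reflects it,
            // - an insert committed at LSN 12 is retracted at Lsn::minimum() (the
            //   snapshot also reflects it) and re-emitted at LSN 12, so the row
            //   appears exactly once at timestamps >= 12,
            // - an insert committed at LSN 20 is emitted at LSN 20 only.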
            let mut rewinds: BTreeMap<_, _> = capture_instance_to_snapshot
                .iter()
                .flat_map(|(capture_instance, export_ids)| {
                    let snapshot_lsn = snapshot_lsns
                        .get(capture_instance)
                        .expect("snapshot lsn must be collected for capture instance");
                    export_ids
                        .iter()
                        .map(|(idx, initial_lsn)| (*idx, (*initial_lsn, *snapshot_lsn)))
                })
                .collect();

            // For now, we assert that the initial_lsn captured during purification is
            // less than or equal to snapshot_lsn. If that were not true, it would mean
            // that we observed a SQL Server DB that appeared to go back in time.
            // TODO (maz): not ideal to do this after snapshot, move this into
            // CdcStream::snapshot after https://github.com/MaterializeInc/materialize/pull/32979 is merged.
            for (initial_lsn, snapshot_lsn) in rewinds.values() {
                assert!(
                    initial_lsn <= snapshot_lsn,
                    "initial_lsn={initial_lsn} snapshot_lsn={snapshot_lsn}"
                );
            }

            tracing::debug!("rewinds to process: {rewinds:?}");

            capture_instance_to_snapshot.clear();

            // The resumption point for a capture instance is the minimum resume LSN
            // observed across all of its outputs.
            let mut resume_lsns = BTreeMap::new();
            for src_info in outputs.values() {
                let resume_lsn = match src_info.resume_upper.as_option() {
                    Some(lsn) if *lsn != Lsn::minimum() => *lsn,
                    // initial_lsn is the max lsn observed, but the resume lsn
                    // is the next lsn that should be read. After a snapshot, initial_lsn
                    // has been read, so replication will start at the next available lsn.
                    Some(_) => src_info.initial_lsn.increment(),
                    None => panic!("resume_upper has at least one value"),
                };
                resume_lsns.entry(Arc::clone(&src_info.capture_instance))
                    .and_modify(|existing| *existing = std::cmp::min(*existing, resume_lsn))
                    .or_insert(resume_lsn);
            }
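            // For example (hypothetical values, again treating LSNs as integers): if
            // one output of a capture instance has already resumed to LSN 20 while
            // another is still snapshotting with initial_lsn = 10, the loop above
            // restarts reading at min(20, increment(10)) = 11, so no output misses
            // an event.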

            tracing::info!(%config.id, ?resume_lsns, "timely-{} replication starting", config.worker_id);
            for instance in capture_instances.keys() {
                let resume_lsn = resume_lsns
                    .get(instance)
                    .expect("resume_lsn exists for capture instance");
                cdc_handle = cdc_handle.start_lsn(instance, *resume_lsn);
            }

            // Off to the races! Replicate data from SQL Server.
            let cdc_stream = cdc_handle
                .poll_interval(config.timestamp_interval)
                .into_stream();
            let mut cdc_stream = std::pin::pin!(cdc_stream);

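            // Outputs that have hit an incompatible schema change. They have already
            // been poisoned with a definite error (see the SchemaUpdate arm below), so
            // subsequent CDC data destined for them is counted as ignored and dropped.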
            let mut errored_partitions = BTreeSet::new();

            // TODO(sql_server2): We should emit `ProgressStatisticsUpdate::SteadyState` messages
            // here, when we receive progress events. What stops us from doing this now is our
            // 10-byte LSN doesn't fit into the 8-byte integer that the progress event uses.
            let mut log_rewinds_complete = true;

            // deferred_updates temporarily stores rows for UPDATE operations to support
            // Large Object Data (LOD) types (e.g. varchar(max), nvarchar(max)). The value
            // of a LOD column will be NULL in the old row (operation = 3) if the value of
            // the field did not change; the field data will be available in the new row
            // (operation = 4).
            // The CDC stream implementation emits a [`CdcEvent::Data`] event, which contains
            // a batch of operations. There is no guarantee that both old and new rows will
            // exist in a single batch, so deferred updates must be tracked across multiple
            // data events.
            //
            // In the current implementation schema change events won't be emitted between
            // old and new rows.
            //
            // See <https://learn.microsoft.com/en-us/sql/relational-databases/system-tables/cdc-capture-instance-ct-transact-sql?view=sql-server-ver17#large-object-data-types>
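            //
            // A sketch of the pairing logic (see handle_data_event below): the two
            // halves of an UPDATE share a (commit_lsn, seqval) key. Whichever half
            // arrives first is stashed in this map under that key; when its partner
            // arrives, the pair is removed and decoded together, letting the new row
            // backfill any unchanged LOD columns in the old row.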
            let mut deferred_updates = BTreeMap::new();

            while let Some(event) = cdc_stream.next().await {
                let event = event.map_err(TransientError::from)?;
                tracing::trace!(?config.id, ?event, "got replication event");

                tracing::trace!("deferred_updates = {deferred_updates:?}");
                match event {
                    // We've received all of the changes up to this LSN, so
                    // downgrade our capability.
                    CdcEvent::Progress { next_lsn } => {
                        tracing::debug!(?config.id, ?next_lsn, "got a closed lsn");
                        // We cannot downgrade the capability until all rewinds have been
                        // processed, since we must still be able to produce data at the
                        // minimum offset. A rewind can be dropped once the frontier passes
                        // its snapshot_lsn, as no further event can fall within its window.
                        rewinds.retain(|_, (_, snapshot_lsn)| next_lsn <= *snapshot_lsn);
                        if rewinds.is_empty() {
                            if log_rewinds_complete {
                                tracing::debug!("rewinds complete");
                                log_rewinds_complete = false;
                            }
                            data_cap_set.downgrade(Antichain::from_elem(next_lsn));
                        } else {
                            tracing::debug!("rewinds remaining: {:?}", rewinds);
                        }

                        // Events are emitted in LSN order for a given capture instance;
                        // if any deferred updates remain when the LSN progresses past
                        // them, it is a bug.
                        if let Some(((deferred_lsn, _seqval), _row)) =
                            deferred_updates.first_key_value()
                            && *deferred_lsn < next_lsn
                        {
                            panic!(
                                "deferred update lsn {deferred_lsn} \
                                 < progress lsn {next_lsn}: {:?}",
                                deferred_updates.keys()
                            );
                        }
                    }
                    // We've got new data! Let's process it.
                    CdcEvent::Data {
                        capture_instance,
                        lsn,
                        changes,
                    } => {
                        let Some(partition_indexes) =
                            capture_instances.get(&capture_instance)
                        else {
                            let definite_error =
                                DefiniteError::ProgrammingError(format!(
                                    "capture instance didn't exist: \
                                     '{capture_instance}'"
                                ));
                            return_definite_error(
                                definite_error,
                                capture_instances
                                    .values()
                                    .flat_map(|indexes| {
                                        indexes.iter().copied()
                                    }),
                                data_output,
                                data_cap_set,
                                definite_error_handle,
                                definite_error_cap_set,
                            )
                            .await;
                            return Ok(());
                        };

                        let (valid_partitions, err_partitions) =
                            partition_indexes
                                .iter()
                                .partition::<Vec<u64>, _>(
                                    |&partition_idx| {
                                        !errored_partitions
                                            .contains(partition_idx)
                                    },
                                );

                        if !err_partitions.is_empty() {
                            metrics.ignored.inc_by(u64::cast_from(changes.len()));
                        }

                        handle_data_event(
                            changes,
                            &valid_partitions,
                            &decoder_map,
                            lsn,
                            &rewinds,
                            &data_output,
                            data_cap_set,
                            &metrics,
                            &mut deferred_updates,
                        ).await?
                    },
                    CdcEvent::SchemaUpdate {
                        capture_instance,
                        table,
                        ddl_event,
                    } => {
                        let Some(partition_indexes) =
                            capture_instances.get(&capture_instance)
                        else {
                            let definite_error =
                                DefiniteError::ProgrammingError(format!(
                                    "capture instance didn't exist: \
                                     '{capture_instance}'"
                                ));
                            return_definite_error(
                                definite_error,
                                capture_instances
                                    .values()
                                    .flat_map(|indexes| {
                                        indexes.iter().copied()
                                    }),
                                data_output,
                                data_cap_set,
                                definite_error_handle,
                                definite_error_cap_set,
                            )
                            .await;
                            return Ok(());
                        };
                        let error =
                            DefiniteError::IncompatibleSchemaChange(
                                capture_instance.to_string(),
                                table.to_string(),
                            );
                        for partition_idx in partition_indexes {
                            let cols = included_columns
                                .get(partition_idx)
                                .unwrap_or_else(|| {
                                    panic!(
                                        "Partition index didn't \
                                         exist: '{partition_idx}'"
                                    )
                                });
                            if !errored_partitions
                                .contains(partition_idx)
                                && !ddl_event.is_compatible(cols)
                            {
                                let msg = Err(
                                    error.clone().into(),
                                );
                                let update = (
                                    (*partition_idx, msg),
                                    ddl_event.lsn,
                                    Diff::ONE,
                                );
                                let size = update.fuel_size();
                                data_output
                                    .give_fueled(&data_cap_set[0], update, size)
                                    .await;
                                errored_partitions.insert(*partition_idx);
                            }
                        }
                    }
                };
            }
            Err(TransientError::ReplicationEOF)
        }))
    });

    let error_stream = definite_errors.concat(transient_errors.map(ReplicationError::Transient));

    (
        data_stream.as_collection(),
        error_stream,
        button.press_on_drop(),
    )
}

async fn handle_data_event(
    changes: Vec<CdcOperation>,
    partition_indexes: &[u64],
    decoder_map: &BTreeMap<u64, Arc<SqlServerRowDecoder>>,
    commit_lsn: Lsn,
    rewinds: &BTreeMap<u64, (Lsn, Lsn)>,
    data_output: &StackedAsyncOutputHandle<Lsn, (u64, Result<SourceMessage, DataflowError>)>,
    data_cap_set: &CapabilitySet<Lsn>,
    metrics: &SqlServerSourceMetrics,
    deferred_updates: &mut BTreeMap<(Lsn, Lsn), CdcOperation>,
) -> Result<(), TransientError> {
    let mut mz_row = Row::default();
    let arena = RowArena::default();

    for change in changes {
        // deferred_update is only valid for a single iteration of the loop. It is set
        // once both old and new update rows have been seen; the pair is then decoded
        // and emitted to the appropriate outputs. Its life thus fulfilled, it returns
        // to whence it came.
        let mut deferred_update: Option<_> = None;
        let (sql_server_row, diff): (_, _) = match change {
            CdcOperation::Insert(sql_server_row) => {
                metrics.inserts.inc();
                (sql_server_row, Diff::ONE)
            }
            CdcOperation::Delete(sql_server_row) => {
                metrics.deletes.inc();
                (sql_server_row, Diff::MINUS_ONE)
            }

            // Updates are not ordered by seqval, so either the old or the new row could
            // be observed first. The first update row is stashed; when the second
            // arrives, both are processed.
            CdcOperation::UpdateNew(seqval, sql_server_row) => {
                // Arbitrarily choosing to update metrics on the new row.
                metrics.updates.inc();
                deferred_update = deferred_updates.remove(&(commit_lsn, seqval));
                if deferred_update.is_none() {
                    tracing::trace!("capture deferred UpdateNew ({commit_lsn}, {seqval})");
                    deferred_updates.insert(
                        (commit_lsn, seqval),
                        CdcOperation::UpdateNew(seqval, sql_server_row),
                    );
                    continue;
                }
                // This placeholder is overridden below once the update pair is ordered.
                (sql_server_row, Diff::ZERO)
            }
            CdcOperation::UpdateOld(seqval, sql_server_row) => {
                deferred_update = deferred_updates.remove(&(commit_lsn, seqval));
                if deferred_update.is_none() {
                    tracing::trace!("capture deferred UpdateOld ({commit_lsn}, {seqval})");
                    deferred_updates.insert(
                        (commit_lsn, seqval),
                        CdcOperation::UpdateOld(seqval, sql_server_row),
                    );
                    continue;
                }
                // This placeholder is overridden below once the update pair is ordered.
                (sql_server_row, Diff::ZERO)
            }
        };

        // Try to decode the input row for each output.
        for partition_idx in partition_indexes {
            let decoder = decoder_map.get(partition_idx).unwrap();

            let rewind = rewinds.get(partition_idx);
            // We must continue here to avoid decoding and emitting. We don't have to compare with
            // snapshot_lsn as we are guaranteed that initial_lsn <= snapshot_lsn.
            if rewind.is_some_and(|(initial_lsn, _)| commit_lsn <= *initial_lsn) {
                continue;
            }

            let (message, diff) = if let Some(ref deferred_update) = deferred_update {
                let (old_row, new_row) = match deferred_update {
                    CdcOperation::UpdateOld(_seqval, row) => (row, &sql_server_row),
                    CdcOperation::UpdateNew(_seqval, row) => (&sql_server_row, row),
                    CdcOperation::Insert(_) | CdcOperation::Delete(_) => unreachable!(),
                };

                let update_old = decode(decoder, old_row, &mut mz_row, &arena, Some(new_row));
                if rewind.is_some_and(|(_, snapshot_lsn)| commit_lsn <= *snapshot_lsn) {
                    let update = (
                        (*partition_idx, update_old.clone()),
                        Lsn::minimum(),
                        Diff::ONE,
                    );
                    let size = update.fuel_size();
                    data_output
                        .give_fueled(&data_cap_set[0], update, size)
                        .await;
                }
                let update = ((*partition_idx, update_old), commit_lsn, Diff::MINUS_ONE);
                let size = update.fuel_size();
                data_output
                    .give_fueled(&data_cap_set[0], update, size)
                    .await;

                (
                    decode(decoder, new_row, &mut mz_row, &arena, None),
                    Diff::ONE,
                )
            } else {
                (
                    decode(decoder, &sql_server_row, &mut mz_row, &arena, None),
                    diff,
                )
            };
            assert_ne!(Diff::ZERO, diff);
            if rewind.is_some_and(|(_, snapshot_lsn)| commit_lsn <= *snapshot_lsn) {
                let update = ((*partition_idx, message.clone()), Lsn::minimum(), -diff);
                let size = update.fuel_size();
                data_output
                    .give_fueled(&data_cap_set[0], update, size)
                    .await;
            }
            let update = ((*partition_idx, message), commit_lsn, diff);
            let size = update.fuel_size();
            data_output
                .give_fueled(&data_cap_set[0], update, size)
                .await;
        }
    }
    Ok(())
}

type StackedAsyncOutputHandle<T, D> =
    AsyncOutputHandle<T, FueledBuilder<CapacityContainerBuilder<Vec<(D, T, Diff)>>>>;

/// Helper method to decode a row from a [`tiberius::Row`] (or two of them, in the
/// case of an update) to a [`Row`]. This centralizes the decoding and the mapping
/// into a result.
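///
/// A sketch of how the call sites above use this helper (not a runnable doctest,
/// since a [`tiberius::Row`] can only be produced by a live connection):
///
/// ```ignore
/// // Plain insert/delete: decode a single row.
/// let message = decode(decoder, &sql_server_row, &mut mz_row, &arena, None);
/// // Update: decode the old row, letting the new row backfill unchanged LOD columns.
/// let update_old = decode(decoder, old_row, &mut mz_row, &arena, Some(new_row));
/// ```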
fn decode(
    decoder: &SqlServerRowDecoder,
    row: &tiberius::Row,
    mz_row: &mut Row,
    arena: &RowArena,
    new_row: Option<&tiberius::Row>,
) -> Result<SourceMessage, DataflowError> {
    match decoder.decode(row, mz_row, arena, new_row) {
        Ok(()) => Ok(SourceMessage {
            key: Row::default(),
            value: mz_row.clone(),
            metadata: Row::default(),
        }),
        Err(e) => {
            let kind = DecodeErrorKind::Text(e.to_string().into());
            // TODO(sql_server2): Get the raw bytes from `tiberius`.
            let raw = format!("{row:?}");
            Err(DataflowError::DecodeError(Box::new(DecodeError {
                kind,
                raw: raw.as_bytes().to_vec(),
            })))
        }
    }
}

/// Helper method to return a "definite" error upstream.
async fn return_definite_error(
    err: DefiniteError,
    outputs: impl Iterator<Item = u64>,
    data_handle: StackedAsyncOutputHandle<Lsn, (u64, Result<SourceMessage, DataflowError>)>,
    data_capset: &CapabilitySet<Lsn>,
    errs_handle: AsyncOutputHandle<Lsn, CapacityContainerBuilder<Vec<ReplicationError>>>,
    errs_capset: &CapabilitySet<Lsn>,
) {
    for output_idx in outputs {
        let update = (
            (output_idx, Err(err.clone().into())),
            // Select an LSN that should not conflict with a previously observed LSN.
            // Ideally we could identify the LSN that resulted in the definite error so
            // that all replicas would emit the same updates for the same times.
            Lsn {
                vlf_id: u32::MAX,
                block_id: u32::MAX,
                record_id: u16::MAX,
            },
            Diff::ONE,
        );
        let size = update.fuel_size();
        data_handle.give_fueled(&data_capset[0], update, size).await;
    }
    errs_handle.give(
        &errs_capset[0],
        ReplicationError::DefiniteError(Rc::new(err)),
    );
}

/// Provides an implementation of [`SqlServerCdcMetrics`] that updates [`SqlServerSourceMetrics`].
struct PrometheusSqlServerCdcMetrics<'a> {
    inner: &'a SqlServerSourceMetrics,
}

impl<'a> SqlServerCdcMetrics for PrometheusSqlServerCdcMetrics<'a> {
    fn snapshot_table_lock_start(&self, table_name: &str) {
        self.inner.update_snapshot_table_lock_count(table_name, 1);
    }

    fn snapshot_table_lock_end(&self, table_name: &str) {
        self.inner.update_snapshot_table_lock_count(table_name, -1);
    }
}