mz_storage/source/sql_server/replication.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Code to render the ingestion dataflow of a [`SqlServerSourceConnection`].

use std::collections::{BTreeMap, BTreeSet};
use std::convert::Infallible;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Instant;

use differential_dataflow::AsCollection;
use differential_dataflow::containers::TimelyStack;
use futures::StreamExt;
use itertools::Itertools;
use mz_ore::cast::CastFrom;
use mz_ore::future::InTask;
use mz_repr::{Diff, GlobalId, Row, RowArena};
use mz_sql_server_util::SqlServerCdcMetrics;
use mz_sql_server_util::cdc::{CdcEvent, Lsn, Operation as CdcOperation};
use mz_sql_server_util::desc::SqlServerRowDecoder;
use mz_sql_server_util::inspect::{
    ensure_database_cdc_enabled, ensure_sql_server_agent_running, get_latest_restore_history_id,
};
use mz_storage_types::dyncfgs::SQL_SERVER_SOURCE_VALIDATE_RESTORE_HISTORY;
use mz_storage_types::errors::{DataflowError, DecodeError, DecodeErrorKind};
use mz_storage_types::sources::SqlServerSourceConnection;
use mz_storage_types::sources::sql_server::{
    CDC_POLL_INTERVAL, MAX_LSN_WAIT, SNAPSHOT_PROGRESS_REPORT_INTERVAL,
};
use mz_timely_util::builder_async::{
    AsyncOutputHandle, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::containers::stack::AccountedStackBuilder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::operators::{CapabilitySet, Concat, Map};
use timely::dataflow::{Scope, Stream as TimelyStream};
use timely::progress::{Antichain, Timestamp};

use crate::metrics::source::sql_server::SqlServerSourceMetrics;
use crate::source::RawSourceCreationConfig;
use crate::source::sql_server::{
    DefiniteError, ReplicationError, SourceOutputInfo, TransientError,
};
use crate::source::types::{SignaledFuture, SourceMessage, StackedCollection};

/// Used as a partition ID to determine the worker that is responsible for
/// reading data from SQL Server.
///
/// TODO(sql_server2): It's possible we could have different workers
/// replicate different tables, if we're using SQL Server's CDC features.
static REPL_READER: &str = "reader";

pub(crate) fn render<G: Scope<Timestamp = Lsn>>(
    scope: G,
    config: RawSourceCreationConfig,
    outputs: BTreeMap<GlobalId, SourceOutputInfo>,
    source: SqlServerSourceConnection,
    metrics: SqlServerSourceMetrics,
) -> (
    StackedCollection<G, (u64, Result<SourceMessage, DataflowError>)>,
    TimelyStream<G, Infallible>,
    TimelyStream<G, ReplicationError>,
    PressOnDropButton,
) {
    let op_name = format!("SqlServerReplicationReader({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(op_name, scope);

    let (data_output, data_stream) = builder.new_output::<AccountedStackBuilder<_>>();
    let (_upper_output, upper_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    // Captures DefiniteErrors that affect the entire source, including all outputs
    let (definite_error_handle, definite_errors) =
        builder.new_output::<CapacityContainerBuilder<_>>();

    let (button, transient_errors) = builder.build_fallible(move |caps| {
        let busy_signal = Arc::clone(&config.busy_signal);
        Box::pin(SignaledFuture::new(busy_signal, async move {
            let [
                data_cap_set,
                upper_cap_set,
                definite_error_cap_set,
            ]: &mut [_; 3] = caps.try_into().unwrap();

            let connection_config = source
                .connection
                .resolve_config(
                    &config.config.connection_context.secrets_reader,
                    &config.config,
                    InTask::Yes,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(connection_config).await?;

            let worker_id = config.worker_id;

            // The decoder is specific to the export, and each export pulls data from a specific capture instance.
            let mut decoder_map: BTreeMap<_, _> = BTreeMap::new();
            // Maps the 'capture instance' to the output indexes of only those outputs that this worker will snapshot.
            let mut capture_instance_to_snapshot: BTreeMap<Arc<str>, Vec<_>> = BTreeMap::new();
            // Maps the 'capture instance' to the output indexes of all outputs of this worker.
            let mut capture_instances: BTreeMap<Arc<str>, Vec<_>> = BTreeMap::new();
            // Export statistics for a given capture instance.
            let mut export_statistics: BTreeMap<_, Vec<_>> = BTreeMap::new();

            for (export_id, output) in outputs.iter() {
                if decoder_map.insert(output.partition_index, Arc::clone(&output.decoder)).is_some() {
                    panic!("Multiple decoders for output index {}", output.partition_index);
                }
                capture_instances
                    .entry(Arc::clone(&output.capture_instance))
                    .or_default()
                    .push(output.partition_index);

                if *output.resume_upper == [Lsn::minimum()] {
                    capture_instance_to_snapshot
                        .entry(Arc::clone(&output.capture_instance))
                        .or_default()
                        .push((output.partition_index, output.initial_lsn));
                }
                export_statistics
                    .entry(Arc::clone(&output.capture_instance))
                    .or_default()
                    .push(
                        config
                            .statistics
                            .get(export_id)
                            .expect("statistics have been initialized")
                            .clone(),
                    );
            }

            // Eagerly emit an event if we have tables to snapshot.
            // A worker *must* emit a count even if it is not responsible for snapshotting a table,
            // because statistics summarization will return null if any worker hasn't set a value.
            // This will also reset snapshot stats for any exports that are not snapshotting.
            metrics.snapshot_table_count.set(u64::cast_from(capture_instance_to_snapshot.len()));
            if !capture_instance_to_snapshot.is_empty() {
                for stats in config.statistics.values() {
                    stats.set_snapshot_records_known(0);
                    stats.set_snapshot_records_staged(0);
                }
            }
            // Statistics must be emitted above, before workers that aren't responsible return early.
            // TODO(sql_server2): Run ingestions across multiple workers.
            if !config.responsible_for(REPL_READER) {
                return Ok::<_, TransientError>(());
            }

            let snapshot_instances = capture_instance_to_snapshot
                .keys()
                .map(|i| i.as_ref());

            // TODO (maz): we can avoid this query by using SourceOutputInfo
            let snapshot_tables = mz_sql_server_util::inspect::get_tables_for_capture_instance(&mut client, snapshot_instances).await?;

            // Validate that the restore_history_id hasn't changed.
            let current_restore_history_id = get_latest_restore_history_id(&mut client).await?;
            if current_restore_history_id != source.extras.restore_history_id {
                if SQL_SERVER_SOURCE_VALIDATE_RESTORE_HISTORY.get(config.config.config_set()) {
                    let definite_error = DefiniteError::RestoreHistoryChanged(
                        source.extras.restore_history_id.clone(),
                        current_restore_history_id.clone(),
                    );
                    tracing::warn!(?definite_error, "Restore detected, exiting");

                    return_definite_error(
                        definite_error,
                        capture_instances.values().flat_map(|indexes| indexes.iter().copied()),
                        data_output,
                        data_cap_set,
                        definite_error_handle,
                        definite_error_cap_set,
                    )
                    .await;
                    return Ok(());
                } else {
                    tracing::warn!(
                        "Restore history mismatch ignored: expected={expected:?} actual={actual:?}",
                        expected = source.extras.restore_history_id,
                        actual = current_restore_history_id,
                    );
                }
            }

            // For AOAG, it's possible that the dataflow restarted and is now connected to a
            // different SQL Server, which may not have CDC enabled correctly.
            ensure_database_cdc_enabled(&mut client).await?;
            ensure_sql_server_agent_running(&mut client).await?;

            // We first calculate the total number of rows we need to fetch across all tables. Since
            // this happens outside the snapshot transaction, the totals might be off, so we won't
            // assert that we get exactly this many rows later.
            for table in &snapshot_tables {
                let qualified_table_name = format!("{schema_name}.{table_name}",
                    schema_name = &table.schema_name,
                    table_name = &table.name);
                let size_calc_start = Instant::now();
                let table_total = mz_sql_server_util::inspect::snapshot_size(&mut client, &table.schema_name, &table.name).await?;
                metrics.set_snapshot_table_size_latency(
                    &qualified_table_name,
                    size_calc_start.elapsed().as_secs_f64(),
                );
                for export_stat in export_statistics.get(&table.capture_instance.name).unwrap() {
                    export_stat.set_snapshot_records_known(u64::cast_from(table_total));
                    export_stat.set_snapshot_records_staged(0);
                }
            }
            let cdc_metrics = PrometheusSqlServerCdcMetrics { inner: &metrics };
            let mut cdc_handle = client
                .cdc(capture_instances.keys().cloned(), cdc_metrics)
                .max_lsn_wait(MAX_LSN_WAIT.get(config.config.config_set()));

            // Snapshot any instance that requires it.
            // Each table snapshot will have its own LSN captured at the moment of snapshotting.
            let snapshot_lsns: BTreeMap<Arc<str>, Lsn> = {
                // Before starting a transaction where the LSN will not advance, ensure
                // the upstream DB is ready for CDC.
                cdc_handle.wait_for_ready().await?;

                // Intentionally logging this at info for debugging. This section won't get entered
                // often, but if there are problems here, it will be much easier to troubleshoot
                // knowing where a stall or hang might be happening.
                tracing::info!(%config.worker_id, "timely-{worker_id} upstream is ready");

                let report_interval =
                    SNAPSHOT_PROGRESS_REPORT_INTERVAL.handle(config.config.config_set());
                let mut last_report = Instant::now();
                let mut snapshot_lsns = BTreeMap::new();
                let arena = RowArena::default();

                for table in snapshot_tables {
                    // TODO(sql_server3): filter columns to only select columns required for Source.
                    let (snapshot_lsn, snapshot) = cdc_handle
                        .snapshot(&table, config.worker_id, config.id)
                        .await?;

                    tracing::info!(%config.id, %table.name, %table.schema_name, %snapshot_lsn, "timely-{worker_id} snapshot start");

                    let mut snapshot = std::pin::pin!(snapshot);

                    snapshot_lsns.insert(Arc::clone(&table.capture_instance.name), snapshot_lsn);

                    let partition_indexes = capture_instance_to_snapshot.get(&table.capture_instance.name)
                        .unwrap_or_else(|| {
                            panic!("no snapshot outputs in known capture instances [{}] for capture instance: '{}'", capture_instance_to_snapshot.keys().join(","), table.capture_instance.name);
                        });

                    let mut snapshot_staged = 0;
                    while let Some(result) = snapshot.next().await {
                        let sql_server_row = result.map_err(TransientError::from)?;

                        if last_report.elapsed() > report_interval.get() {
                            last_report = Instant::now();
                            for export_stat in export_statistics.get(&table.capture_instance.name).unwrap() {
                                export_stat.set_snapshot_records_staged(snapshot_staged);
                            }
                        }

                        for (partition_idx, _) in partition_indexes {
                            // Decode the SQL Server row into an MZ one.
                            let mut mz_row = Row::default();

                            let decoder = decoder_map.get(partition_idx).expect("decoder for output");
                            // Try to decode a row, returning a SourceError if it fails.
                            let message = decode(decoder, &sql_server_row, &mut mz_row, &arena, None);
                            data_output
                                .give_fueled(
                                    &data_cap_set[0],
                                    ((*partition_idx, message), Lsn::minimum(), Diff::ONE),
                                )
                                .await;
                        }
                        snapshot_staged += 1;
                    }

                    tracing::info!(%config.id, %table.name, %table.schema_name, %snapshot_lsn, "timely-{worker_id} snapshot complete");
                    metrics.snapshot_table_count.dec();
                    // Final update for snapshot_staged; use the staged count as the total, since the earlier known count was only an estimate.
                    for export_stat in export_statistics.get(&table.capture_instance.name).unwrap() {
                        export_stat.set_snapshot_records_staged(snapshot_staged);
                        export_stat.set_snapshot_records_known(snapshot_staged);
                    }
                }

                snapshot_lsns
            };

            // Rewinds need to keep track of 2 timestamps to ensure that
            // all replicas emit the same set of updates for any given timestamp.
            // These are the initial_lsn and snapshot_lsn, where initial_lsn must be
            // less than or equal to snapshot_lsn.
            //
            // - events at an LSN less than or equal to initial_lsn are ignored
            // - events at an LSN greater than initial_lsn and less than or equal to
            //   snapshot_lsn are retracted at Lsn::minimum(), and emitted at the commit_lsn
            // - events at an LSN greater than snapshot_lsn are emitted at the commit_lsn
            //
            // where the commit_lsn is the upstream LSN that the event was committed at
            //
            // If initial_lsn == snapshot_lsn, all CDC events at LSNs up to and including the
            // snapshot_lsn are ignored, and no rewinds are issued.
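            //
            // Worked example (hypothetical, simplified integer LSNs): say an output has
            // initial_lsn = 5 and snapshot_lsn = 8. An event committed at LSN 5 (or below) is
            // ignored, since it is already reflected in the snapshot emitted at Lsn::minimum().
            // An event committed at LSN 7 is emitted at LSN 7 and also retracted at
            // Lsn::minimum(), because the snapshot taken at LSN 8 already includes its effect.
            // An event committed at LSN 9 is emitted at LSN 9 only. The net effect at every
            // timestamp is therefore the same for every replica, regardless of where its
            // snapshot was taken.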
            let mut rewinds: BTreeMap<_, _> = capture_instance_to_snapshot
                .iter()
                .flat_map(|(capture_instance, export_ids)| {
                    let snapshot_lsn = snapshot_lsns.get(capture_instance).expect("snapshot lsn must be collected for capture instance");
                    export_ids
                        .iter()
                        .map(|(idx, initial_lsn)| (*idx, (*initial_lsn, *snapshot_lsn)))
                }).collect();

            // For now, we assert that initial_lsn captured during purification is less
            // than or equal to snapshot_lsn. If that were not true, it would mean that
            // we observed a SQL Server DB that appeared to go back in time.
            // TODO (maz): not ideal to do this after snapshot, move this into
            // CdcStream::snapshot after https://github.com/MaterializeInc/materialize/pull/32979 is merged.
            for (initial_lsn, snapshot_lsn) in rewinds.values() {
                assert!(
                    initial_lsn <= snapshot_lsn,
                    "initial_lsn={initial_lsn} snapshot_lsn={snapshot_lsn}"
                );
            }

            tracing::debug!("rewinds to process: {rewinds:?}");

            capture_instance_to_snapshot.clear();

            // Resumption point is the minimum LSN that has been observed per capture instance.
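            //
            // For example (hypothetical, simplified integer LSNs): an output that still has
            // resume_upper = [Lsn::minimum()] was snapshotted above and resumes at
            // initial_lsn + 1, while an output whose resume_upper is already [12] resumes at
            // 12; if both share a capture instance, replication for that instance restarts at
            // the smaller of the two.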
            let mut resume_lsns = BTreeMap::new();
            for src_info in outputs.values() {
                let resume_lsn = match src_info.resume_upper.as_option() {
                    Some(lsn) if *lsn != Lsn::minimum() => *lsn,
                    // initial_lsn is the max lsn observed, but the resume lsn
                    // is the next lsn that should be read. After a snapshot, initial_lsn
                    // has been read, so replication will start at the next available lsn.
                    Some(_) => src_info.initial_lsn.increment(),
                    None => panic!("resume_upper has at least one value"),
                };
                resume_lsns
                    .entry(Arc::clone(&src_info.capture_instance))
                    .and_modify(|existing| *existing = std::cmp::min(*existing, resume_lsn))
                    .or_insert(resume_lsn);
            }

            tracing::info!(%config.id, ?resume_lsns, "timely-{} replication starting", config.worker_id);
            for instance in capture_instances.keys() {
                let resume_lsn = resume_lsns
                    .get(instance)
                    .expect("resume_lsn exists for capture instance");
                cdc_handle = cdc_handle.start_lsn(instance, *resume_lsn);
            }

            // Off to the races! Replicate data from SQL Server.
            let cdc_stream = cdc_handle
                .poll_interval(CDC_POLL_INTERVAL.get(config.config.config_set()))
                .into_stream();
            let mut cdc_stream = std::pin::pin!(cdc_stream);

            let mut errored_instances = BTreeSet::new();

            // TODO(sql_server2): We should emit `ProgressStatisticsUpdate::SteadyState` messages
            // here, when we receive progress events. What stops us from doing this now is our
            // 10-byte LSN doesn't fit into the 8-byte integer that the progress event uses.
            let mut log_rewinds_complete = true;

            // deferred_updates temporarily stores rows for UPDATE operations to support large
            // object (LOB) data types (e.g. varchar(max), nvarchar(max)). The value of a
            // LOB column will be NULL in the old row (operation = 3) if the value of the
            // field did not change; the field data will be available in the new row
            // (operation = 4).
            // The CDC stream implementation emits a [`CdcEvent::Data`] event, which contains a
            // batch of operations. There is no guarantee that both old and new rows will
            // exist in a single batch, so deferred updates must be tracked across multiple data
            // events.
            //
            // In the current implementation, schema change events won't be emitted between old
            // and new rows.
            //
            // See <https://learn.microsoft.com/en-us/sql/relational-databases/system-tables/cdc-capture-instance-ct-transact-sql?view=sql-server-ver17#large-object-data-types>
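            //
            // Illustrative sequence (hypothetical seqval/LSN values): a batch may contain only
            // UpdateOld(seqval = 3, ...) for some commit LSN; that row is stashed under
            // (lsn, 3) in deferred_updates. When the matching UpdateNew(seqval = 3, ...)
            // arrives, possibly in a later batch for the same LSN, the pair is decoded
            // together and emitted as a retraction of the old row plus an insertion of the
            // new row at that commit LSN.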
            let mut deferred_updates = BTreeMap::new();

            while let Some(event) = cdc_stream.next().await {
                let event = event.map_err(TransientError::from)?;
                tracing::trace!(?config.id, ?event, "got replication event");

                tracing::trace!("deferred_updates = {deferred_updates:?}");
                match event {
                    // We've received all of the changes up to this LSN, so
                    // downgrade our capability.
                    CdcEvent::Progress { next_lsn } => {
                        tracing::debug!(?config.id, ?next_lsn, "got a closed lsn");
                        // We cannot downgrade the capability until rewinds have been processed,
                        // because we must be able to produce data at the minimum offset.
                        rewinds.retain(|_, (_, snapshot_lsn)| next_lsn <= *snapshot_lsn);
                        if rewinds.is_empty() {
                            if log_rewinds_complete {
                                tracing::debug!("rewinds complete");
                                log_rewinds_complete = false;
                            }
                            data_cap_set.downgrade(Antichain::from_elem(next_lsn));
                        } else {
                            tracing::debug!("rewinds remaining: {:?}", rewinds);
                        }

                        // Events are emitted in LSN order for a given capture instance; if any
                        // deferred updates remain at an LSN earlier than the progress LSN, it is a bug.
                        if let Some(((deferred_lsn, _seqval), _row)) = deferred_updates.first_key_value()
                            && *deferred_lsn < next_lsn
                        {
                            panic!(
                                "deferred update lsn {deferred_lsn} < progress lsn {next_lsn}: {:?}",
                                deferred_updates.keys()
                            );
                        }

                        upper_cap_set.downgrade(Antichain::from_elem(next_lsn));
                    }
                    // We've got new data! Let's process it.
                    CdcEvent::Data {
                        capture_instance,
                        lsn,
                        changes,
                    } => {
                        if errored_instances.contains(&capture_instance) {
                            // Outputs for this capture instance are in an errored state, so
                            // their changes are not emitted.
                            metrics.ignored.inc_by(u64::cast_from(changes.len()));
                            continue;
                        }

                        let Some(partition_indexes) = capture_instances.get(&capture_instance) else {
                            let definite_error = DefiniteError::ProgrammingError(format!(
                                "capture instance didn't exist: '{capture_instance}'"
                            ));
                            return_definite_error(
                                definite_error,
                                capture_instances.values().flat_map(|indexes| indexes.iter().copied()),
                                data_output,
                                data_cap_set,
                                definite_error_handle,
                                definite_error_cap_set,
                            )
                            .await;
                            return Ok(());
                        };

                        handle_data_event(
                            changes,
                            partition_indexes,
                            &decoder_map,
                            lsn,
                            &rewinds,
                            &data_output,
                            data_cap_set,
                            &metrics,
                            &mut deferred_updates,
                        ).await?
                    },
                    CdcEvent::SchemaUpdate { capture_instance, table, ddl_event } => {
                        if !errored_instances.contains(&capture_instance)
                            && !ddl_event.is_compatible()
                        {
                            let Some(partition_indexes) = capture_instances.get(&capture_instance) else {
                                let definite_error = DefiniteError::ProgrammingError(format!(
                                    "capture instance didn't exist: '{capture_instance}'"
                                ));
                                return_definite_error(
                                    definite_error,
                                    capture_instances.values().flat_map(|indexes| indexes.iter().copied()),
                                    data_output,
                                    data_cap_set,
                                    definite_error_handle,
                                    definite_error_cap_set,
                                )
                                .await;
                                return Ok(());
                            };
                            let error = DefiniteError::IncompatibleSchemaChange(
                                capture_instance.to_string(),
                                table.to_string(),
                            );
                            for partition_idx in partition_indexes {
                                data_output
                                    .give_fueled(
                                        &data_cap_set[0],
                                        ((*partition_idx, Err(error.clone().into())), ddl_event.lsn, Diff::ONE),
                                    )
                                    .await;
                            }
                            errored_instances.insert(capture_instance);
                        }
                    }
                };
            }
            Err(TransientError::ReplicationEOF)
        }))
    });

    let error_stream = definite_errors.concat(&transient_errors.map(ReplicationError::Transient));

    (
        data_stream.as_collection(),
        upper_stream,
        error_stream,
        button.press_on_drop(),
    )
}

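/// Decodes a batch of CDC operations for a single capture instance and emits the resulting
/// updates to `data_output` for every output listed in `partition_indexes`, along with any
/// compensating updates at `Lsn::minimum()` required for rewinds. `UpdateOld`/`UpdateNew`
/// halves that have not yet been paired are stashed in `deferred_updates`, keyed by
/// `(commit_lsn, seqval)`.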
async fn handle_data_event(
    changes: Vec<CdcOperation>,
    partition_indexes: &[u64],
    decoder_map: &BTreeMap<u64, Arc<SqlServerRowDecoder>>,
    commit_lsn: Lsn,
    rewinds: &BTreeMap<u64, (Lsn, Lsn)>,
    data_output: &StackedAsyncOutputHandle<Lsn, (u64, Result<SourceMessage, DataflowError>)>,
    data_cap_set: &CapabilitySet<Lsn>,
    metrics: &SqlServerSourceMetrics,
    deferred_updates: &mut BTreeMap<(Lsn, Lsn), CdcOperation>,
) -> Result<(), TransientError> {
    let mut mz_row = Row::default();
    let arena = RowArena::default();

    for change in changes {
        // deferred_update is only valid for a single iteration of the loop. It is set once both
        // old and new update rows are seen. It will be decoded and emitted to the appropriate outputs.
        // Its life now fulfilled, it will return to whence it came.
        let mut deferred_update: Option<_> = None;
        let (sql_server_row, diff): (_, _) = match change {
            CdcOperation::Insert(sql_server_row) => {
                metrics.inserts.inc();
                (sql_server_row, Diff::ONE)
            }
            CdcOperation::Delete(sql_server_row) => {
                metrics.deletes.inc();
                (sql_server_row, Diff::MINUS_ONE)
            }

            // Updates are not ordered by seqval, so either the old or the new row could be
            // observed first. The first update row is stashed; when the second arrives, both
            // are processed.
            CdcOperation::UpdateNew(seqval, sql_server_row) => {
                // Arbitrarily choosing to update metrics on the new row.
                metrics.updates.inc();
                deferred_update = deferred_updates.remove(&(commit_lsn, seqval));
                if deferred_update.is_none() {
                    tracing::trace!("capture deferred UpdateNew ({commit_lsn}, {seqval})");
                    deferred_updates.insert(
                        (commit_lsn, seqval),
                        CdcOperation::UpdateNew(seqval, sql_server_row),
                    );
                    continue;
                }
                // This placeholder diff is overridden below once the old/new update rows are ordered.
                (sql_server_row, Diff::ZERO)
            }
            CdcOperation::UpdateOld(seqval, sql_server_row) => {
                deferred_update = deferred_updates.remove(&(commit_lsn, seqval));
                if deferred_update.is_none() {
                    tracing::trace!("capture deferred UpdateOld ({commit_lsn}, {seqval})");
                    deferred_updates.insert(
                        (commit_lsn, seqval),
                        CdcOperation::UpdateOld(seqval, sql_server_row),
                    );
                    continue;
                }
                // This placeholder diff is overridden below once the old/new update rows are ordered.
                (sql_server_row, Diff::ZERO)
            }
        };

        // Try to decode the input row for each output.
        for partition_idx in partition_indexes {
            let decoder = decoder_map.get(partition_idx).unwrap();

            let rewind = rewinds.get(partition_idx);
            // We must continue here to avoid decoding and emitting. We don't have to compare with
            // snapshot_lsn as we are guaranteed that initial_lsn <= snapshot_lsn.
            if rewind.is_some_and(|(initial_lsn, _)| commit_lsn <= *initial_lsn) {
                continue;
            }

            let (message, diff) = if let Some(ref deferred_update) = deferred_update {
                let (old_row, new_row) = match deferred_update {
                    CdcOperation::UpdateOld(_seqval, row) => (row, &sql_server_row),
                    CdcOperation::UpdateNew(_seqval, row) => (&sql_server_row, row),
                    CdcOperation::Insert(_) | CdcOperation::Delete(_) => unreachable!(),
                };

                let update_old = decode(decoder, old_row, &mut mz_row, &arena, Some(new_row));
                if rewind.is_some_and(|(_, snapshot_lsn)| commit_lsn <= *snapshot_lsn) {
                    data_output
                        .give_fueled(
                            &data_cap_set[0],
                            (
                                (*partition_idx, update_old.clone()),
                                Lsn::minimum(),
                                Diff::ONE,
                            ),
                        )
                        .await;
                }
                data_output
                    .give_fueled(
                        &data_cap_set[0],
                        ((*partition_idx, update_old), commit_lsn, Diff::MINUS_ONE),
                    )
                    .await;

                (
                    decode(decoder, new_row, &mut mz_row, &arena, None),
                    Diff::ONE,
                )
            } else {
                (
                    decode(decoder, &sql_server_row, &mut mz_row, &arena, None),
                    diff,
                )
            };
            assert_ne!(Diff::ZERO, diff);
            if rewind.is_some_and(|(_, snapshot_lsn)| commit_lsn <= *snapshot_lsn) {
                data_output
                    .give_fueled(
                        &data_cap_set[0],
                        ((*partition_idx, message.clone()), Lsn::minimum(), -diff),
                    )
                    .await;
            }
            data_output
                .give_fueled(
                    &data_cap_set[0],
                    ((*partition_idx, message), commit_lsn, diff),
                )
                .await;
        }
    }
    Ok(())
}

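/// Convenience alias for the async output handle through which `(data, timestamp, diff)`
/// updates are emitted.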
type StackedAsyncOutputHandle<T, D> = AsyncOutputHandle<
    T,
    AccountedStackBuilder<CapacityContainerBuilder<TimelyStack<(D, T, Diff)>>>,
>;

/// Helper method to decode a row from a [`tiberius::Row`] (or two of them in the case of an
/// update) to a [`Row`]. This centralizes the decoding and the mapping to a result.
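/// `new_row` is `Some` only when `row` is the old image of an update; see the large object
/// handling notes on `deferred_updates` in the replication loop.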
fn decode(
    decoder: &SqlServerRowDecoder,
    row: &tiberius::Row,
    mz_row: &mut Row,
    arena: &RowArena,
    new_row: Option<&tiberius::Row>,
) -> Result<SourceMessage, DataflowError> {
    match decoder.decode(row, mz_row, arena, new_row) {
        Ok(()) => Ok(SourceMessage {
            key: Row::default(),
            value: mz_row.clone(),
            metadata: Row::default(),
        }),
        Err(e) => {
            let kind = DecodeErrorKind::Text(e.to_string().into());
            // TODO(sql_server2): Get the raw bytes from `tiberius`.
            let raw = format!("{row:?}");
            Err(DataflowError::DecodeError(Box::new(DecodeError {
                kind,
                raw: raw.as_bytes().to_vec(),
            })))
        }
    }
}

/// Helper method to return a "definite" error upstream.
async fn return_definite_error(
    err: DefiniteError,
    outputs: impl Iterator<Item = u64>,
    data_handle: StackedAsyncOutputHandle<Lsn, (u64, Result<SourceMessage, DataflowError>)>,
    data_capset: &CapabilitySet<Lsn>,
    errs_handle: AsyncOutputHandle<Lsn, CapacityContainerBuilder<Vec<ReplicationError>>>,
    errs_capset: &CapabilitySet<Lsn>,
) {
    for output_idx in outputs {
        let update = (
            (output_idx, Err(err.clone().into())),
            // Select an LSN that should not conflict with a previously observed LSN. Ideally
            // we could identify the LSN that resulted in the definite error so that all replicas
            // would emit the same updates for the same times.
            Lsn {
                vlf_id: u32::MAX,
                block_id: u32::MAX,
                record_id: u16::MAX,
            },
            Diff::ONE,
        );
        data_handle.give_fueled(&data_capset[0], update).await;
    }
    errs_handle.give(
        &errs_capset[0],
        ReplicationError::DefiniteError(Rc::new(err)),
    );
}

/// Provides an implementation of [`SqlServerCdcMetrics`] that updates [`SqlServerSourceMetrics`].
struct PrometheusSqlServerCdcMetrics<'a> {
    inner: &'a SqlServerSourceMetrics,
}

impl<'a> SqlServerCdcMetrics for PrometheusSqlServerCdcMetrics<'a> {
    fn snapshot_table_lock_start(&self, table_name: &str) {
        self.inner.update_snapshot_table_lock_count(table_name, 1);
    }

    fn snapshot_table_lock_end(&self, table_name: &str) {
        self.inner.update_snapshot_table_lock_count(table_name, -1);
    }
}