1use std::collections::{BTreeMap, BTreeSet};
13use std::rc::Rc;
14use std::sync::Arc;
15use std::time::Instant;
16
17use differential_dataflow::AsCollection;
18use futures::StreamExt;
19use itertools::Itertools;
20use mz_ore::cast::CastFrom;
21use mz_ore::collections::HashMap;
22use mz_ore::future::InTask;
23use mz_repr::{Diff, GlobalId, Row, RowArena};
24use mz_sql_server_util::SqlServerCdcMetrics;
25use mz_sql_server_util::cdc::{CdcEvent, Lsn, Operation as CdcOperation};
26use mz_sql_server_util::desc::SqlServerRowDecoder;
27use mz_sql_server_util::inspect::{
28 ensure_database_cdc_enabled, ensure_sql_server_agent_running, get_latest_restore_history_id,
29};
30use mz_storage_types::dyncfgs::SQL_SERVER_SOURCE_VALIDATE_RESTORE_HISTORY;
31use mz_storage_types::errors::{DataflowError, DecodeError, DecodeErrorKind};
32use mz_storage_types::sources::SqlServerSourceConnection;
33use mz_storage_types::sources::sql_server::{MAX_LSN_WAIT, SNAPSHOT_PROGRESS_REPORT_INTERVAL};
34use mz_timely_util::builder_async::{
35 AsyncOutputHandle, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
36};
37use mz_timely_util::containers::stack::FueledBuilder;
38use timely::container::CapacityContainerBuilder;
39use timely::dataflow::operators::vec::Map;
40use timely::dataflow::operators::{CapabilitySet, Concat};
41use timely::dataflow::{Scope, StreamVec};
42use timely::progress::{Antichain, Timestamp};
43
44use crate::metrics::source::sql_server::SqlServerSourceMetrics;
45use crate::source::RawSourceCreationConfig;
46use crate::source::sql_server::{
47 DefiniteError, ReplicationError, SourceOutputInfo, TransientError,
48};
49use crate::source::types::{FuelSize, SignaledFuture, SourceMessage, StackedCollection};
50
/// Token passed to `config.responsible_for` to pick the single worker that
/// runs the replication reader; all other workers exit early.
static REPL_READER: &str = "reader";
57
/// Renders the timely operator that replicates a SQL Server source.
///
/// The operator connects to the upstream SQL Server instance, snapshots any
/// tables whose `resume_upper` is still at `Lsn::minimum()`, and then tails
/// the CDC stream for all capture instances, decoding each change and
/// emitting it (keyed by partition index) into a differential collection.
///
/// Returns:
/// * the collection of decoded source messages (or per-row dataflow errors),
/// * a stream of replication errors (definite and transient),
/// * a button that shuts the operator down when dropped.
pub(crate) fn render<'scope>(
    scope: Scope<'scope, Lsn>,
    config: RawSourceCreationConfig,
    outputs: BTreeMap<GlobalId, SourceOutputInfo>,
    source: SqlServerSourceConnection,
    metrics: SqlServerSourceMetrics,
) -> (
    StackedCollection<'scope, Lsn, (u64, Result<SourceMessage, DataflowError>)>,
    StreamVec<'scope, Lsn, ReplicationError>,
    PressOnDropButton,
) {
    let op_name = format!("SqlServerReplicationReader({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(op_name, scope);

    // Primary output carrying decoded rows; fueled so large batches yield.
    let (data_output, data_stream) = builder.new_output::<FueledBuilder<_>>();

    // Secondary output dedicated to definite (unrecoverable) errors.
    let (definite_error_handle, definite_errors) =
        builder.new_output::<CapacityContainerBuilder<_>>();

    let (button, transient_errors) = builder.build_fallible(move |caps| {
        let busy_signal = Arc::clone(&config.busy_signal);
        Box::pin(SignaledFuture::new(busy_signal, async move {
            let [
                data_cap_set,
                definite_error_cap_set,
            ]: &mut [_; 2] = caps.try_into().unwrap();

            // Resolve secrets and open a client connection to the upstream DB.
            let connection_config = source
                .connection
                .resolve_config(
                    &config.config.connection_context.secrets_reader,
                    &config.config,
                    InTask::Yes,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(connection_config).await?;

            let worker_id = config.worker_id;

            // Row decoders keyed by partition index.
            let mut decoder_map: BTreeMap<_, _> = BTreeMap::new();
            // Capture instances that still need an initial snapshot, with each
            // output's (partition index, initial LSN).
            let mut capture_instance_to_snapshot: BTreeMap<Arc<str>, Vec<_>> = BTreeMap::new();
            // All capture instances and the partition indexes they feed.
            let mut capture_instances: BTreeMap<Arc<str>, Vec<_>> = BTreeMap::new();
            // Statistics handles grouped by capture instance.
            let mut export_statistics: BTreeMap<_, Vec<_>> = BTreeMap::new();
            // Column names included in each partition's output; used below to
            // decide whether an upstream DDL change is compatible.
            let mut included_columns: HashMap<u64, Vec<Arc<str>>> = HashMap::new();

            for (export_id, output) in outputs.iter() {
                let key = output.partition_index;
                if decoder_map.insert(key, Arc::clone(&output.decoder)).is_some() {
                    panic!("Multiple decoders for output index {}", output.partition_index);
                }
                let included_cols = output.decoder.included_column_names();
                included_columns.insert(output.partition_index, included_cols);

                capture_instances
                    .entry(Arc::clone(&output.capture_instance))
                    .or_default()
                    .push(output.partition_index);

                // An output whose resume upper is still the minimum LSN has
                // never committed data and therefore must be snapshot.
                if *output.resume_upper == [Lsn::minimum()] {
                    capture_instance_to_snapshot
                        .entry(Arc::clone(&output.capture_instance))
                        .or_default()
                        .push((output.partition_index, output.initial_lsn));
                }
                export_statistics.entry(Arc::clone(&output.capture_instance))
                    .or_default()
                    .push(
                        config
                            .statistics
                            .get(export_id)
                            .expect("statistics have been intialized")
                            .clone(),
                    );
            }

            metrics.snapshot_table_count.set(u64::cast_from(capture_instance_to_snapshot.len()));
            // Reset snapshot progress counters before snapshotting begins.
            if !capture_instance_to_snapshot.is_empty() {
                for stats in config.statistics.values() {
                    stats.set_snapshot_records_known(0);
                    stats.set_snapshot_records_staged(0);
                }
            }
            // A single worker does all the reading; everyone else exits now.
            if !config.responsible_for(REPL_READER) {
                return Ok::<_, TransientError>(());
            }

            let snapshot_instances = capture_instance_to_snapshot
                .keys()
                .map(|i| i.as_ref());

            let snapshot_tables =
                mz_sql_server_util::inspect::get_tables_for_capture_instance(
                    &mut client,
                    snapshot_instances,
                )
                .await?;

            // Detect whether the upstream database was restored since this
            // source was created; depending on the dyncfg this is either a
            // definite error or only a warning.
            let current_restore_history_id = get_latest_restore_history_id(&mut client).await?;
            if current_restore_history_id != source.extras.restore_history_id {
                if SQL_SERVER_SOURCE_VALIDATE_RESTORE_HISTORY.get(config.config.config_set()) {
                    let definite_error = DefiniteError::RestoreHistoryChanged(
                        source.extras.restore_history_id.clone(),
                        current_restore_history_id.clone()
                    );
                    tracing::warn!(?definite_error, "Restore detected, exiting");

                    return_definite_error(
                        definite_error,
                        capture_instances.values().flat_map(|indexes| indexes.iter().copied()),
                        data_output,
                        data_cap_set,
                        definite_error_handle,
                        definite_error_cap_set,
                    ).await;
                    return Ok(());
                } else {
                    tracing::warn!(
                        "Restore history mismatch ignored: expected={expected:?} actual={actual:?}",
                        expected=source.extras.restore_history_id,
                        actual=current_restore_history_id
                    );
                }
            }

            ensure_database_cdc_enabled(&mut client).await?;
            ensure_sql_server_agent_running(&mut client).await?;

            // Report the upstream size of every table we are about to
            // snapshot, so progress can be tracked against a known total.
            for table in &snapshot_tables {
                let qualified_table_name = format!("{schema_name}.{table_name}",
                    schema_name = &table.schema_name,
                    table_name = &table.name);
                let size_calc_start = Instant::now();
                let table_total =
                    mz_sql_server_util::inspect::snapshot_size(
                        &mut client,
                        &table.schema_name,
                        &table.name,
                    )
                    .await?;
                metrics.set_snapshot_table_size_latency(
                    &qualified_table_name,
                    size_calc_start.elapsed().as_secs_f64()
                );
                for export_stat in export_statistics.get(&table.capture_instance.name).unwrap() {
                    export_stat.set_snapshot_records_known(u64::cast_from(table_total));
                    export_stat.set_snapshot_records_staged(0);
                }
            }
            let cdc_metrics = PrometheusSqlServerCdcMetrics{inner: &metrics};
            let mut cdc_handle = client
                .cdc(capture_instances.keys().cloned(), cdc_metrics)
                .max_lsn_wait(MAX_LSN_WAIT.get(config.config.config_set()));

            // Snapshot each table, remembering the LSN at which each snapshot
            // was taken so later CDC events can be rewound against it.
            let snapshot_lsns: BTreeMap<Arc<str>, Lsn> = {
                cdc_handle.wait_for_ready().await?;

                tracing::info!(%config.worker_id, "timely-{worker_id} upstream is ready");

                let report_interval =
                    SNAPSHOT_PROGRESS_REPORT_INTERVAL.handle(config.config.config_set());
                let mut last_report = Instant::now();
                let mut snapshot_lsns = BTreeMap::new();

                for table in snapshot_tables {
                    let (snapshot_lsn, snapshot) = cdc_handle
                        .snapshot(&table, config.worker_id, config.id)
                        .await?;

                    tracing::info!(
                        %config.id,
                        %table.name,
                        %table.schema_name,
                        %snapshot_lsn,
                        "timely-{worker_id} snapshot start",
                    );

                    let mut snapshot = std::pin::pin!(snapshot);

                    snapshot_lsns.insert(
                        Arc::clone(&table.capture_instance.name),
                        snapshot_lsn,
                    );

                    let ci_name = &table.capture_instance.name;
                    let partition_indexes = capture_instance_to_snapshot
                        .get(ci_name)
                        .unwrap_or_else(|| {
                            panic!(
                                "no snapshot outputs in known capture \
                                instances [{}] for capture instance: \
                                '{}'",
                                capture_instance_to_snapshot
                                    .keys()
                                    .join(","),
                                ci_name,
                            );
                        });

                    let mut snapshot_staged = 0;
                    while let Some(result) = snapshot.next().await {
                        let sql_server_row =
                            result.map_err(TransientError::from)?;

                        // Periodically report snapshot progress.
                        if last_report.elapsed() > report_interval.get() {
                            last_report = Instant::now();
                            let stats =
                                export_statistics.get(ci_name).unwrap();
                            for export_stat in stats {
                                export_stat.set_snapshot_records_staged(
                                    snapshot_staged,
                                );
                            }
                        }

                        // Decode the row once per interested partition and
                        // emit it at the minimum LSN (the snapshot time).
                        for (partition_idx, _) in partition_indexes {
                            let mut mz_row = Row::default();
                            let arena = RowArena::default();

                            let decoder = decoder_map
                                .get(partition_idx)
                                .expect("decoder for output");
                            let message = decode(
                                decoder,
                                &sql_server_row,
                                &mut mz_row,
                                &arena,
                                None,
                            );
                            let update =
                                ((*partition_idx, message), Lsn::minimum(), Diff::ONE);
                            let size = update.fuel_size();
                            data_output
                                .give_fueled(&data_cap_set[0], update, size)
                                .await;
                        }
                        snapshot_staged += 1;
                    }

                    tracing::info!(
                        %config.id,
                        %table.name,
                        %table.schema_name,
                        %snapshot_lsn,
                        "timely-{worker_id} snapshot complete",
                    );
                    metrics.snapshot_table_count.dec();
                    // Final reconciliation: the count we actually staged is
                    // the authoritative snapshot size.
                    let stats = export_statistics.get(ci_name).unwrap();
                    for export_stat in stats {
                        export_stat.set_snapshot_records_staged(snapshot_staged);
                        export_stat.set_snapshot_records_known(snapshot_staged);
                    }
                }

                snapshot_lsns
            };

            // For every snapshotted partition record (initial_lsn,
            // snapshot_lsn); CDC events inside that window must be "rewound"
            // (also emitted, inverted, at the minimum LSN) so they combine
            // correctly with the snapshot data.
            let mut rewinds: BTreeMap<_, _> = capture_instance_to_snapshot
                .iter()
                .flat_map(|(capture_instance, export_ids)| {
                    let snapshot_lsn = snapshot_lsns
                        .get(capture_instance)
                        .expect("snapshot lsn must be collected for capture instance");
                    export_ids
                        .iter()
                        .map(|(idx, initial_lsn)| (*idx, (*initial_lsn, *snapshot_lsn)))
                }).collect();

            for (initial_lsn, snapshot_lsn) in rewinds.values() {
                assert!(
                    initial_lsn <= snapshot_lsn,
                    "initial_lsn={initial_lsn} snapshot_lsn={snapshot_lsn}"
                );
            }

            tracing::debug!("rewinds to process: {rewinds:?}");

            capture_instance_to_snapshot.clear();

            // Compute, per capture instance, the minimum LSN across all of
            // its outputs from which replication must resume.
            let mut resume_lsns = BTreeMap::new();
            for src_info in outputs.values() {
                let resume_lsn = match src_info.resume_upper.as_option() {
                    Some(lsn) if *lsn != Lsn::minimum() => *lsn,
                    // Never committed any data: resume just after the LSN the
                    // snapshot was based on.
                    Some(_) => src_info.initial_lsn.increment(),
                    None => panic!("resume_upper has at least one value"),
                };
                resume_lsns.entry(Arc::clone(&src_info.capture_instance))
                    .and_modify(|existing| *existing = std::cmp::min(*existing, resume_lsn))
                    .or_insert(resume_lsn);
            }

            tracing::info!(%config.id, ?resume_lsns, "timely-{} replication starting", config.worker_id);
            for instance in capture_instances.keys() {
                let resume_lsn = resume_lsns
                    .get(instance)
                    .expect("resume_lsn exists for capture instance");
                cdc_handle = cdc_handle.start_lsn(instance, *resume_lsn);
            }

            let cdc_stream = cdc_handle
                .poll_interval(config.timestamp_interval)
                .into_stream();
            let mut cdc_stream = std::pin::pin!(cdc_stream);

            // Partitions that hit an incompatible schema change; their
            // subsequent data is ignored.
            let mut errored_partitions = BTreeSet::new();

            // Ensures "rewinds complete" is only logged once.
            let mut log_rewinds_complete = true;

            // Half-received UPDATE operations (old/new image arrive as
            // separate CDC rows), keyed by (commit LSN, sequence value).
            let mut deferred_updates = BTreeMap::new();

            while let Some(event) = cdc_stream.next().await {
                let event = event.map_err(TransientError::from)?;
                tracing::trace!(?config.id, ?event, "got replication event");

                tracing::trace!("deferred_updates = {deferred_updates:?}");
                match event {
                    CdcEvent::Progress { next_lsn } => {
                        tracing::debug!(?config.id, ?next_lsn, "got a closed lsn");
                        // Drop rewinds whose snapshot window is fully closed.
                        rewinds.retain(|_, (_, snapshot_lsn)| next_lsn <= *snapshot_lsn);
                        // Only downgrade the capability once all rewinds have
                        // drained, since rewinds emit at Lsn::minimum().
                        if rewinds.is_empty() {
                            if log_rewinds_complete {
                                tracing::debug!("rewinds complete");
                                log_rewinds_complete = false;
                            }
                            data_cap_set.downgrade(Antichain::from_elem(next_lsn));
                        } else {
                            tracing::debug!("rewinds remaining: {:?}", rewinds);
                        }

                        // Invariant: no deferred half-update may remain from
                        // before a closed LSN — its partner row should have
                        // arrived within the same commit.
                        if let Some(((deferred_lsn, _seqval), _row)) =
                            deferred_updates.first_key_value()
                            && *deferred_lsn < next_lsn
                        {
                            panic!(
                                "deferred update lsn {deferred_lsn} \
                                < progress lsn {next_lsn}: {:?}",
                                deferred_updates.keys()
                            );
                        }

                    }
                    CdcEvent::Data {
                        capture_instance,
                        lsn,
                        changes,
                    } => {
                        // Data for an unknown capture instance is a
                        // programming error that poisons every output.
                        let Some(partition_indexes) =
                            capture_instances.get(&capture_instance)
                        else {
                            let definite_error =
                                DefiniteError::ProgrammingError(format!(
                                    "capture instance didn't exist: \
                                    '{capture_instance}'"
                                ));
                            return_definite_error(
                                definite_error,
                                capture_instances
                                    .values()
                                    .flat_map(|indexes| {
                                        indexes.iter().copied()
                                    }),
                                data_output,
                                data_cap_set,
                                definite_error_handle,
                                definite_error_cap_set,
                            )
                            .await;
                            return Ok(());
                        };

                        // Split off partitions that previously hit an
                        // incompatible schema change.
                        let (valid_partitions, err_partitions) =
                            partition_indexes
                                .iter()
                                .partition::<Vec<u64>, _>(
                                    |&partition_idx| {
                                        !errored_partitions
                                            .contains(partition_idx)
                                    },
                                );

                        if err_partitions.len() > 0 {
                            metrics.ignored.inc_by(u64::cast_from(changes.len()));
                        }

                        handle_data_event(
                            changes,
                            &valid_partitions,
                            &decoder_map,
                            lsn,
                            &rewinds,
                            &data_output,
                            data_cap_set,
                            &metrics,
                            &mut deferred_updates,
                        ).await?
                    },
                    CdcEvent::SchemaUpdate {
                        capture_instance,
                        table,
                        ddl_event,
                    } => {
                        let Some(partition_indexes) =
                            capture_instances.get(&capture_instance)
                        else {
                            let definite_error =
                                DefiniteError::ProgrammingError(format!(
                                    "capture instance didn't exist: \
                                    '{capture_instance}'"
                                ));
                            return_definite_error(
                                definite_error,
                                capture_instances
                                    .values()
                                    .flat_map(|indexes| {
                                        indexes.iter().copied()
                                    }),
                                data_output,
                                data_cap_set,
                                definite_error_handle,
                                definite_error_cap_set,
                            )
                            .await;
                            return Ok(());
                        };
                        let error =
                            DefiniteError::IncompatibleSchemaChange(
                                capture_instance.to_string(),
                                table.to_string(),
                            );
                        // A DDL change incompatible with the columns a
                        // partition outputs puts that partition into a
                        // definite error state (emitted at the DDL's LSN).
                        for partition_idx in partition_indexes {
                            let cols = included_columns
                                .get(partition_idx)
                                .unwrap_or_else(|| {
                                    panic!(
                                        "Partition index didn't \
                                        exist: '{partition_idx}'"
                                    )
                                });
                            if !errored_partitions
                                .contains(partition_idx)
                                && !ddl_event.is_compatible(cols)
                            {
                                let msg = Err(
                                    error.clone().into(),
                                );
                                let update = (
                                    (*partition_idx, msg),
                                    ddl_event.lsn,
                                    Diff::ONE,
                                );
                                let size = update.fuel_size();
                                data_output
                                    .give_fueled(&data_cap_set[0], update, size)
                                    .await;
                                errored_partitions.insert(*partition_idx);
                            }
                        }
                    }
                };
            }
            // The CDC stream should be unbounded; running out of events is a
            // transient error so the source gets restarted.
            Err(TransientError::ReplicationEOF)
        }))
    });

    let error_stream = definite_errors.concat(transient_errors.map(ReplicationError::Transient));

    (
        data_stream.as_collection(),
        error_stream,
        button.press_on_drop(),
    )
}
613
/// Decodes a batch of CDC `changes` committed at `commit_lsn` and emits them
/// to `data_output` for every partition in `partition_indexes`.
///
/// UPDATE operations arrive as two separate CDC rows (the old and the new
/// image, sharing a sequence value). The first half seen is parked in
/// `deferred_updates`; when its partner arrives the pair is processed
/// together so the old image can be decoded with the new image available
/// (`decode(..., Some(new_row))`).
///
/// `rewinds` maps partition index -> (initial_lsn, snapshot_lsn). Changes at
/// or before `initial_lsn` are skipped (the snapshot already covers them);
/// changes at or before `snapshot_lsn` are additionally emitted, inverted, at
/// `Lsn::minimum()` to offset their presence in the snapshot data.
async fn handle_data_event(
    changes: Vec<CdcOperation>,
    partition_indexes: &[u64],
    decoder_map: &BTreeMap<u64, Arc<SqlServerRowDecoder>>,
    commit_lsn: Lsn,
    rewinds: &BTreeMap<u64, (Lsn, Lsn)>,
    data_output: &StackedAsyncOutputHandle<Lsn, (u64, Result<SourceMessage, DataflowError>)>,
    data_cap_set: &CapabilitySet<Lsn>,
    metrics: &SqlServerSourceMetrics,
    deferred_updates: &mut BTreeMap<(Lsn, Lsn), CdcOperation>,
) -> Result<(), TransientError> {
    // Scratch row and arena reused across every decode in this batch.
    let mut mz_row = Row::default();
    let arena = RowArena::default();

    for change in changes {
        // Holds the previously-deferred half of an UPDATE, if this change
        // completes one.
        let mut deferred_update: Option<_> = None;
        let (sql_server_row, diff): (_, _) = match change {
            CdcOperation::Insert(sql_server_row) => {
                metrics.inserts.inc();
                (sql_server_row, Diff::ONE)
            }
            CdcOperation::Delete(sql_server_row) => {
                metrics.deletes.inc();
                (sql_server_row, Diff::MINUS_ONE)
            }

            CdcOperation::UpdateNew(seqval, sql_server_row) => {
                metrics.updates.inc();
                // Pair with a previously-seen UpdateOld, or park this half
                // until its partner arrives.
                deferred_update = deferred_updates.remove(&(commit_lsn, seqval));
                if deferred_update.is_none() {
                    tracing::trace!("capture deferred UpdateNew ({commit_lsn}, {seqval})");
                    deferred_updates.insert(
                        (commit_lsn, seqval),
                        CdcOperation::UpdateNew(seqval, sql_server_row),
                    );
                    continue;
                }
                // Placeholder diff; the real diffs for an UPDATE pair are
                // produced below from the old/new images.
                (sql_server_row, Diff::ZERO)
            }
            CdcOperation::UpdateOld(seqval, sql_server_row) => {
                deferred_update = deferred_updates.remove(&(commit_lsn, seqval));
                if deferred_update.is_none() {
                    tracing::trace!("capture deferred UpdateOld ({commit_lsn}, {seqval})");
                    deferred_updates.insert(
                        (commit_lsn, seqval),
                        CdcOperation::UpdateOld(seqval, sql_server_row),
                    );
                    continue;
                }
                (sql_server_row, Diff::ZERO)
            }
        };

        for partition_idx in partition_indexes {
            let decoder = decoder_map.get(partition_idx).unwrap();

            // Changes at or before this partition's initial LSN are already
            // reflected in its snapshot — skip them entirely.
            let rewind = rewinds.get(partition_idx);
            if rewind.is_some_and(|(initial_lsn, _)| commit_lsn <= *initial_lsn) {
                continue;
            }

            let (message, diff) = if let Some(ref deferred_update) = deferred_update {
                // Reunite the two halves of the UPDATE: one is the current
                // change, the other came out of `deferred_updates`.
                let (old_row, new_row) = match deferred_update {
                    CdcOperation::UpdateOld(_seqval, row) => (row, &sql_server_row),
                    CdcOperation::UpdateNew(_seqval, row) => (&sql_server_row, row),
                    CdcOperation::Insert(_) | CdcOperation::Delete(_) => unreachable!(),
                };

                // Decode the old image, with the new image available to the
                // decoder.
                let update_old = decode(decoder, old_row, &mut mz_row, &arena, Some(new_row));
                // Rewind: re-insert the old image at the minimum time so the
                // retraction below nets out for data the snapshot already
                // covers.
                if rewind.is_some_and(|(_, snapshot_lsn)| commit_lsn <= *snapshot_lsn) {
                    let update = (
                        (*partition_idx, update_old.clone()),
                        Lsn::minimum(),
                        Diff::ONE,
                    );
                    let size = update.fuel_size();
                    data_output
                        .give_fueled(&data_cap_set[0], update, size)
                        .await;
                }
                // Retract the old image at the commit time...
                let update = ((*partition_idx, update_old), commit_lsn, Diff::MINUS_ONE);
                let size = update.fuel_size();
                data_output
                    .give_fueled(&data_cap_set[0], update, size)
                    .await;

                // ...and insert the new image (emitted below).
                (
                    decode(decoder, new_row, &mut mz_row, &arena, None),
                    Diff::ONE,
                )
            } else {
                (
                    decode(decoder, &sql_server_row, &mut mz_row, &arena, None),
                    diff,
                )
            };
            // The Diff::ZERO placeholder must never survive to this point.
            assert_ne!(Diff::ZERO, diff);
            // Changes inside the snapshot window are also emitted inverted at
            // the minimum time to offset their presence in the snapshot.
            if rewind.is_some_and(|(_, snapshot_lsn)| commit_lsn <= *snapshot_lsn) {
                let update = ((*partition_idx, message.clone()), Lsn::minimum(), -diff);
                let size = update.fuel_size();
                data_output
                    .give_fueled(&data_cap_set[0], update, size)
                    .await;
            }
            let update = ((*partition_idx, message), commit_lsn, diff);
            let size = update.fuel_size();
            data_output
                .give_fueled(&data_cap_set[0], update, size)
                .await;
        }
    }
    Ok(())
}
738
/// Alias for the fueled async output handle that carries `(data, time, diff)`
/// update triples.
type StackedAsyncOutputHandle<T, D> =
    AsyncOutputHandle<T, FueledBuilder<CapacityContainerBuilder<Vec<(D, T, Diff)>>>>;
741
742fn decode(
745 decoder: &SqlServerRowDecoder,
746 row: &tiberius::Row,
747 mz_row: &mut Row,
748 arena: &RowArena,
749 new_row: Option<&tiberius::Row>,
750) -> Result<SourceMessage, DataflowError> {
751 match decoder.decode(row, mz_row, arena, new_row) {
752 Ok(()) => Ok(SourceMessage {
753 key: Row::default(),
754 value: mz_row.clone(),
755 metadata: Row::default(),
756 }),
757 Err(e) => {
758 let kind = DecodeErrorKind::Text(e.to_string().into());
759 let raw = format!("{row:?}");
761 Err(DataflowError::DecodeError(Box::new(DecodeError {
762 kind,
763 raw: raw.as_bytes().to_vec(),
764 })))
765 }
766 }
767}
768
/// Broadcasts a definite (unrecoverable) error to every output partition in
/// `outputs` and to the dedicated error stream.
async fn return_definite_error(
    err: DefiniteError,
    outputs: impl Iterator<Item = u64>,
    data_handle: StackedAsyncOutputHandle<Lsn, (u64, Result<SourceMessage, DataflowError>)>,
    data_capset: &CapabilitySet<Lsn>,
    errs_handle: AsyncOutputHandle<Lsn, CapacityContainerBuilder<Vec<ReplicationError>>>,
    errs_capset: &CapabilitySet<Lsn>,
) {
    for output_idx in outputs {
        let update = (
            (output_idx, Err(err.clone().into())),
            // Emit at an Lsn with every component at its maximum value,
            // i.e. effectively at the end of time.
            Lsn {
                vlf_id: u32::MAX,
                block_id: u32::MAX,
                record_id: u16::MAX,
            },
            Diff::ONE,
        );
        let size = update.fuel_size();
        data_handle.give_fueled(&data_capset[0], update, size).await;
    }
    // Also surface the error on the error output.
    errs_handle.give(
        &errs_capset[0],
        ReplicationError::DefiniteError(Rc::new(err)),
    );
}
799
/// Adapter that forwards metric callbacks from the CDC library onto this
/// source's Prometheus-backed [`SqlServerSourceMetrics`].
struct PrometheusSqlServerCdcMetrics<'a> {
    inner: &'a SqlServerSourceMetrics,
}
804
805impl<'a> SqlServerCdcMetrics for PrometheusSqlServerCdcMetrics<'a> {
806 fn snapshot_table_lock_start(&self, table_name: &str) {
807 self.inner.update_snapshot_table_lock_count(table_name, 1);
808 }
809
810 fn snapshot_table_lock_end(&self, table_name: &str) {
811 self.inner.update_snapshot_table_lock_count(table_name, -1);
812 }
813}