use std::collections::{BTreeMap, BTreeSet};
use std::convert::Infallible;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Instant;

use differential_dataflow::AsCollection;
use differential_dataflow::containers::TimelyStack;
use futures::StreamExt;
use itertools::Itertools;
use mz_ore::cast::CastFrom;
use mz_ore::future::InTask;
use mz_repr::{Diff, GlobalId, Row, RowArena};
use mz_sql_server_util::SqlServerCdcMetrics;
use mz_sql_server_util::cdc::{CdcEvent, Lsn, Operation as CdcOperation};
use mz_sql_server_util::desc::SqlServerRowDecoder;
use mz_sql_server_util::inspect::{
    ensure_database_cdc_enabled, ensure_sql_server_agent_running, get_latest_restore_history_id,
};
use mz_storage_types::dyncfgs::SQL_SERVER_SOURCE_VALIDATE_RESTORE_HISTORY;
use mz_storage_types::errors::{DataflowError, DecodeError, DecodeErrorKind};
use mz_storage_types::sources::SqlServerSourceConnection;
use mz_storage_types::sources::sql_server::{
    CDC_POLL_INTERVAL, MAX_LSN_WAIT, SNAPSHOT_PROGRESS_REPORT_INTERVAL,
};
use mz_timely_util::builder_async::{
    AsyncOutputHandle, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::containers::stack::AccountedStackBuilder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::operators::{CapabilitySet, Concat, Map};
use timely::dataflow::{Scope, Stream as TimelyStream};
use timely::progress::{Antichain, Timestamp};

use crate::metrics::source::sql_server::SqlServerSourceMetrics;
use crate::source::RawSourceCreationConfig;
use crate::source::sql_server::{
    DefiniteError, ReplicationError, SourceOutputInfo, TransientError,
};
use crate::source::types::{SignaledFuture, SourceMessage, StackedCollection};

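/// Partition identifier used to elect the single worker responsible for
/// reading the upstream CDC stream.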
static REPL_READER: &str = "reader";
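
/// Renders the replication operator for a [`SqlServerSourceConnection`].
///
/// Returns a collection of decoded updates, a progress-only stream that
/// reports the upstream frontier, a stream of replication errors, and a
/// button that shuts down the operator when dropped.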
pub(crate) fn render<G: Scope<Timestamp = Lsn>>(
    scope: G,
    config: RawSourceCreationConfig,
    outputs: BTreeMap<GlobalId, SourceOutputInfo>,
    source: SqlServerSourceConnection,
    metrics: SqlServerSourceMetrics,
) -> (
    StackedCollection<G, (u64, Result<SourceMessage, DataflowError>)>,
    TimelyStream<G, Infallible>,
    TimelyStream<G, ReplicationError>,
    PressOnDropButton,
) {
    let op_name = format!("SqlServerReplicationReader({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(op_name, scope);

    let (data_output, data_stream) = builder.new_output::<AccountedStackBuilder<_>>();
    let (_upper_output, upper_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let (definite_error_handle, definite_errors) =
        builder.new_output::<CapacityContainerBuilder<_>>();

    let (button, transient_errors) = builder.build_fallible(move |caps| {
        let busy_signal = Arc::clone(&config.busy_signal);
        Box::pin(SignaledFuture::new(busy_signal, async move {
            let [data_cap_set, upper_cap_set, definite_error_cap_set]: &mut [_; 3] =
                caps.try_into().unwrap();

            let connection_config = source
                .connection
                .resolve_config(
                    &config.config.connection_context.secrets_reader,
                    &config.config,
                    InTask::Yes,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(connection_config).await?;

            let worker_id = config.worker_id;

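            // Maps an output's partition index to its row decoder.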
            let mut decoder_map: BTreeMap<_, _> = BTreeMap::new();
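            // Outputs that still need to be snapshot, grouped by capture
            // instance; each entry is (partition index, initial LSN).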
            let mut capture_instance_to_snapshot: BTreeMap<Arc<str>, Vec<_>> = BTreeMap::new();
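            // Partition indexes of all outputs fed by each capture instance.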
            let mut capture_instances: BTreeMap<Arc<str>, Vec<_>> = BTreeMap::new();
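            // Statistics handles for each export, grouped by capture instance.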
            let mut export_statistics: BTreeMap<_, Vec<_>> = BTreeMap::new();

            for (export_id, output) in outputs.iter() {
                if decoder_map
                    .insert(output.partition_index, Arc::clone(&output.decoder))
                    .is_some()
                {
                    panic!("Multiple decoders for output index {}", output.partition_index);
                }
                capture_instances
                    .entry(Arc::clone(&output.capture_instance))
                    .or_default()
                    .push(output.partition_index);

                if *output.resume_upper == [Lsn::minimum()] {
                    capture_instance_to_snapshot
                        .entry(Arc::clone(&output.capture_instance))
                        .or_default()
                        .push((output.partition_index, output.initial_lsn));
                }
                export_statistics
                    .entry(Arc::clone(&output.capture_instance))
                    .or_default()
                    .push(
                        config
                            .statistics
                            .get(export_id)
                            .expect("statistics have been initialized")
                            .clone(),
                    );
            }

            metrics
                .snapshot_table_count
                .set(u64::cast_from(capture_instance_to_snapshot.len()));
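
            // Reset snapshot statistics before any snapshots (re)start so we
            // don't carry over counts from a previous attempt.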
            if !capture_instance_to_snapshot.is_empty() {
                for stats in config.statistics.values() {
                    stats.set_snapshot_records_known(0);
                    stats.set_snapshot_records_staged(0);
                }
            }

            if !config.responsible_for(REPL_READER) {
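                // Only a single worker reads from SQL Server; all others
                // return immediately, dropping their capabilities.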
                return Ok::<_, TransientError>(());
            }

            let snapshot_instances = capture_instance_to_snapshot
                .keys()
                .map(|i| i.as_ref());
            let snapshot_tables = mz_sql_server_util::inspect::get_tables_for_capture_instance(
                &mut client,
                snapshot_instances,
            )
            .await?;

            let current_restore_history_id = get_latest_restore_history_id(&mut client).await?;
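            // If the upstream database was restored from a backup since this
            // source was created, previously committed LSNs may no longer be
            // valid, so we (optionally) surface a definite error.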
            if current_restore_history_id != source.extras.restore_history_id {
                if SQL_SERVER_SOURCE_VALIDATE_RESTORE_HISTORY.get(config.config.config_set()) {
                    let definite_error = DefiniteError::RestoreHistoryChanged(
                        source.extras.restore_history_id.clone(),
                        current_restore_history_id.clone(),
                    );
                    tracing::warn!(?definite_error, "Restore detected, exiting");

                    return_definite_error(
                        definite_error,
                        capture_instances
                            .values()
                            .flat_map(|indexes| indexes.iter().copied()),
                        data_output,
                        data_cap_set,
                        definite_error_handle,
                        definite_error_cap_set,
                    )
                    .await;
                    return Ok(());
                } else {
                    tracing::warn!(
                        "Restore history mismatch ignored: expected={expected:?} actual={actual:?}",
                        expected = source.extras.restore_history_id,
                        actual = current_restore_history_id,
                    );
                }
            }

            ensure_database_cdc_enabled(&mut client).await?;
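            // CDC capture jobs are run by the SQL Server Agent, so replication
            // cannot make progress unless it is running.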
            ensure_sql_server_agent_running(&mut client).await?;

            for table in &snapshot_tables {
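                // Compute the upstream size of each table we'll snapshot so
                // snapshot progress can be reported.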
                let qualified_table_name = format!(
                    "{schema_name}.{table_name}",
                    schema_name = &table.schema_name,
                    table_name = &table.name,
                );
                let size_calc_start = Instant::now();
                let table_total = mz_sql_server_util::inspect::snapshot_size(
                    &mut client,
                    &table.schema_name,
                    &table.name,
                )
                .await?;
                metrics.set_snapshot_table_size_latency(
                    &qualified_table_name,
                    size_calc_start.elapsed().as_secs_f64(),
                );
                for export_stat in export_statistics.get(&table.capture_instance.name).unwrap() {
                    export_stat.set_snapshot_records_known(u64::cast_from(table_total));
                    export_stat.set_snapshot_records_staged(0);
                }
            }

            let cdc_metrics = PrometheusSqlServerCdcMetrics { inner: &metrics };
            let mut cdc_handle = client
                .cdc(capture_instances.keys().cloned(), cdc_metrics)
                .max_lsn_wait(MAX_LSN_WAIT.get(config.config.config_set()));

            let snapshot_lsns: BTreeMap<Arc<str>, Lsn> = {
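                // A snapshot can't start until the upstream CDC instances are
                // ready, i.e. until a maximum LSN is available.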
                cdc_handle.wait_for_ready().await?;

                tracing::info!(%config.worker_id, "timely-{worker_id} upstream is ready");

                let report_interval =
                    SNAPSHOT_PROGRESS_REPORT_INTERVAL.handle(config.config.config_set());
                let mut last_report = Instant::now();
                let mut snapshot_lsns = BTreeMap::new();
                let arena = RowArena::default();

                for table in snapshot_tables {
                    let (snapshot_lsn, snapshot) = cdc_handle
                        .snapshot(&table, config.worker_id, config.id)
                        .await?;

                    tracing::info!(
                        %config.id, %table.name, %table.schema_name, %snapshot_lsn,
                        "timely-{worker_id} snapshot start"
                    );

                    let mut snapshot = std::pin::pin!(snapshot);

                    snapshot_lsns.insert(Arc::clone(&table.capture_instance.name), snapshot_lsn);

                    let partition_indexes = capture_instance_to_snapshot
                        .get(&table.capture_instance.name)
                        .unwrap_or_else(|| {
                            panic!(
                                "no snapshot outputs in known capture instances [{}] for capture instance: '{}'",
                                capture_instance_to_snapshot.keys().join(","),
                                table.capture_instance.name,
                            );
                        });

                    let mut snapshot_staged = 0;
                    while let Some(result) = snapshot.next().await {
                        let sql_server_row = result.map_err(TransientError::from)?;

                        // Periodically report snapshot progress.
                        if last_report.elapsed() > report_interval.get() {
                            last_report = Instant::now();
                            for export_stat in
                                export_statistics.get(&table.capture_instance.name).unwrap()
                            {
                                export_stat.set_snapshot_records_staged(snapshot_staged);
                            }
                        }

                        // Decode the row once for each output, since outputs
                        // have their own decoders. Snapshot data is always
                        // emitted at the minimum LSN.
                        for (partition_idx, _) in partition_indexes {
                            let mut mz_row = Row::default();
                            let decoder =
                                decoder_map.get(partition_idx).expect("decoder for output");
                            let message =
                                decode(decoder, &sql_server_row, &mut mz_row, &arena, None);
                            data_output
                                .give_fueled(
                                    &data_cap_set[0],
                                    ((*partition_idx, message), Lsn::minimum(), Diff::ONE),
                                )
                                .await;
                        }
                        snapshot_staged += 1;
                    }

                    tracing::info!(
                        %config.id, %table.name, %table.schema_name, %snapshot_lsn,
                        "timely-{worker_id} snapshot complete"
                    );
                    metrics.snapshot_table_count.dec();
                    // The snapshot is complete, so the staged count is now
                    // also the known record count.
                    for export_stat in
                        export_statistics.get(&table.capture_instance.name).unwrap()
                    {
                        export_stat.set_snapshot_records_staged(snapshot_staged);
                        export_stat.set_snapshot_records_known(snapshot_staged);
                    }
                }

                snapshot_lsns
            };

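            // Outputs that were just snapshot need their replication stream
            // "rewound". The snapshot is emitted at Lsn::minimum(), but it
            // already reflects changes committed in (initial_lsn, snapshot_lsn],
            // and those changes will also arrive via replication since
            // replication resumes just after initial_lsn. For each such change
            // we emit a compensating update at Lsn::minimum() so nothing is
            // double counted and the collection stays consistent at every LSN.
            //
            // Keyed by partition index; values are (initial_lsn, snapshot_lsn).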
            let mut rewinds: BTreeMap<_, _> = capture_instance_to_snapshot
                .iter()
                .flat_map(|(capture_instance, snapshot_outputs)| {
                    let snapshot_lsn = *snapshot_lsns
                        .get(capture_instance)
                        .expect("snapshot lsn must be collected for capture instance");
                    snapshot_outputs
                        .iter()
                        .map(move |(idx, initial_lsn)| (*idx, (*initial_lsn, snapshot_lsn)))
                })
                .collect();

            for (initial_lsn, snapshot_lsn) in rewinds.values() {
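                // initial_lsn is captured before the snapshot is taken, so it
                // can never be beyond the snapshot LSN.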
                assert!(
                    initial_lsn <= snapshot_lsn,
                    "initial_lsn={initial_lsn} snapshot_lsn={snapshot_lsn}"
                );
            }

            tracing::debug!("rewinds to process: {rewinds:?}");

            capture_instance_to_snapshot.clear();

            let mut resume_lsns = BTreeMap::new();
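            // Replication for a capture instance resumes from the minimum
            // resume LSN across all of its outputs.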
            for src_info in outputs.values() {
                let resume_lsn = match src_info.resume_upper.as_option() {
                    Some(lsn) if *lsn != Lsn::minimum() => *lsn,
                    // A minimum resume upper means the output was just
                    // snapshot, so replication restarts at the LSN just after
                    // initial_lsn; the rewinds above reconcile any changes the
                    // snapshot already reflects.
                    Some(_) => src_info.initial_lsn.increment(),
                    None => panic!("resume_upper must contain at least one element"),
                };
                resume_lsns
                    .entry(Arc::clone(&src_info.capture_instance))
                    .and_modify(|existing| *existing = std::cmp::min(*existing, resume_lsn))
                    .or_insert(resume_lsn);
            }

            tracing::info!(
                %config.id,
                ?resume_lsns,
                "timely-{} replication starting",
                config.worker_id
            );
            for instance in capture_instances.keys() {
                let resume_lsn = resume_lsns
                    .get(instance)
                    .expect("resume_lsn exists for capture instance");
                cdc_handle = cdc_handle.start_lsn(instance, *resume_lsn);
            }

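            // Begin replication, polling the upstream for changes.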
            let cdc_stream = cdc_handle
                .poll_interval(CDC_POLL_INTERVAL.get(config.config.config_set()))
                .into_stream();
            let mut cdc_stream = std::pin::pin!(cdc_stream);

            // Capture instances that have had a definite error and whose
            // changes we now ignore.
            let mut errored_instances = BTreeSet::new();

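            // Ensures we log "rewinds complete" only once.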
            let mut log_rewinds_complete = true;

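            // SQL Server emits the old and new halves of an update as separate
            // operations (UpdateOld / UpdateNew) that share a commit LSN and
            // sequence value but may arrive in different Data events. When one
            // half arrives without its partner we stash it here, keyed by
            // (commit_lsn, seqval), and emit the complete update once the
            // partner shows up. Progress events assert that nothing is left
            // stashed below the new frontier.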
            let mut deferred_updates = BTreeMap::new();

            while let Some(event) = cdc_stream.next().await {
                let event = event.map_err(TransientError::from)?;
                tracing::trace!(?config.id, ?event, "got replication event");
                tracing::trace!("deferred_updates = {deferred_updates:?}");

                match event {
                    CdcEvent::Progress { next_lsn } => {
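                        // Progress events close out all LSNs strictly less
                        // than next_lsn.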
                        tracing::debug!(?config.id, ?next_lsn, "got a closed lsn");
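                        // A rewind is finished once the frontier has passed
                        // its snapshot LSN.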
                        rewinds.retain(|_, (_, snapshot_lsn)| next_lsn <= *snapshot_lsn);
                        // While rewinds are outstanding, hold the data
                        // capability at Lsn::minimum() so compensating updates
                        // can still be emitted at the snapshot time.
                        if rewinds.is_empty() {
                            if log_rewinds_complete {
                                tracing::debug!("rewinds complete");
                                log_rewinds_complete = false;
                            }
                            data_cap_set.downgrade(Antichain::from_elem(next_lsn));
                        } else {
                            tracing::debug!("rewinds remaining: {:?}", rewinds);
                        }

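                        // Sanity check: every deferred update below the new
                        // frontier must have been matched and emitted by now.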
                        if let Some(((deferred_lsn, _seqval), _row)) =
                            deferred_updates.first_key_value()
                            && *deferred_lsn < next_lsn
                        {
                            panic!(
                                "deferred update lsn {deferred_lsn} < progress lsn {next_lsn}: {:?}",
                                deferred_updates.keys()
                            );
                        }

                        upper_cap_set.downgrade(Antichain::from_elem(next_lsn));
                    }
                    CdcEvent::Data {
                        capture_instance,
                        lsn,
                        changes,
                    } => {
                        if errored_instances.contains(&capture_instance) {
                            // This capture instance already surfaced a definite
                            // error; count its changes but don't process them.
                            metrics.ignored.inc_by(u64::cast_from(changes.len()));
                            continue;
                        }

                        let Some(partition_indexes) = capture_instances.get(&capture_instance)
                        else {
                            let definite_error = DefiniteError::ProgrammingError(format!(
                                "capture instance didn't exist: '{capture_instance}'"
                            ));
                            return_definite_error(
                                definite_error,
                                capture_instances
                                    .values()
                                    .flat_map(|indexes| indexes.iter().copied()),
                                data_output,
                                data_cap_set,
                                definite_error_handle,
                                definite_error_cap_set,
                            )
                            .await;
                            return Ok(());
                        };

                        handle_data_event(
                            changes,
                            partition_indexes,
                            &decoder_map,
                            lsn,
                            &rewinds,
                            &data_output,
                            data_cap_set,
                            &metrics,
                            &mut deferred_updates,
                        )
                        .await?
                    }
                    CdcEvent::SchemaUpdate {
                        capture_instance,
                        table,
                        ddl_event,
                    } => {
                        if !errored_instances.contains(&capture_instance)
                            && !ddl_event.is_compatible()
                        {
                            let Some(partition_indexes) =
                                capture_instances.get(&capture_instance)
                            else {
                                let definite_error = DefiniteError::ProgrammingError(format!(
                                    "capture instance didn't exist: '{capture_instance}'"
                                ));
                                return_definite_error(
                                    definite_error,
                                    capture_instances
                                        .values()
                                        .flat_map(|indexes| indexes.iter().copied()),
                                    data_output,
                                    data_cap_set,
                                    definite_error_handle,
                                    definite_error_cap_set,
                                )
                                .await;
                                return Ok(());
                            };
                            // An incompatible schema change is a definite error
                            // for every output of this capture instance.
                            let error = DefiniteError::IncompatibleSchemaChange(
                                capture_instance.to_string(),
                                table.to_string(),
                            );
                            for partition_idx in partition_indexes {
                                data_output
                                    .give_fueled(
                                        &data_cap_set[0],
                                        (
                                            (*partition_idx, Err(error.clone().into())),
                                            ddl_event.lsn,
                                            Diff::ONE,
                                        ),
                                    )
                                    .await;
                            }
                            errored_instances.insert(capture_instance);
                        }
                    }
                }
            }
            Err(TransientError::ReplicationEOF)
        }))
    });

    let error_stream = definite_errors.concat(&transient_errors.map(ReplicationError::Transient));

    (
        data_stream.as_collection(),
        upper_stream,
        error_stream,
        button.press_on_drop(),
    )
}

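/// Decodes a batch of CDC operations for a single capture instance and emits
/// the resulting updates to every output fed by it, applying snapshot rewinds
/// and pairing up deferred update halves.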
async fn handle_data_event(
    changes: Vec<CdcOperation>,
    partition_indexes: &[u64],
    decoder_map: &BTreeMap<u64, Arc<SqlServerRowDecoder>>,
    commit_lsn: Lsn,
    rewinds: &BTreeMap<u64, (Lsn, Lsn)>,
    data_output: &StackedAsyncOutputHandle<Lsn, (u64, Result<SourceMessage, DataflowError>)>,
    data_cap_set: &CapabilitySet<Lsn>,
    metrics: &SqlServerSourceMetrics,
    deferred_updates: &mut BTreeMap<(Lsn, Lsn), CdcOperation>,
) -> Result<(), TransientError> {
    let mut mz_row = Row::default();
    let arena = RowArena::default();

    for change in changes {
        // Holds the previously stashed partner half of an update, if this
        // change completes the pair.
        let mut deferred_update: Option<_> = None;
        let (sql_server_row, diff) = match change {
            CdcOperation::Insert(sql_server_row) => {
                metrics.inserts.inc();
                (sql_server_row, Diff::ONE)
            }
            CdcOperation::Delete(sql_server_row) => {
                metrics.deletes.inc();
                (sql_server_row, Diff::MINUS_ONE)
            }
            CdcOperation::UpdateNew(seqval, sql_server_row) => {
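                // The matching UpdateOld shares this (commit_lsn, seqval) key;
                // if it hasn't arrived yet, stash this half until it does.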
                metrics.updates.inc();
                deferred_update = deferred_updates.remove(&(commit_lsn, seqval));
                if deferred_update.is_none() {
                    tracing::trace!("capture deferred UpdateNew ({commit_lsn}, {seqval})");
                    deferred_updates.insert(
                        (commit_lsn, seqval),
                        CdcOperation::UpdateNew(seqval, sql_server_row),
                    );
                    continue;
                }
                // The diff is unused for updates; both halves are emitted below.
                (sql_server_row, Diff::ZERO)
            }
            CdcOperation::UpdateOld(seqval, sql_server_row) => {
                deferred_update = deferred_updates.remove(&(commit_lsn, seqval));
                if deferred_update.is_none() {
                    tracing::trace!("capture deferred UpdateOld ({commit_lsn}, {seqval})");
                    deferred_updates.insert(
                        (commit_lsn, seqval),
                        CdcOperation::UpdateOld(seqval, sql_server_row),
                    );
                    continue;
                }
                (sql_server_row, Diff::ZERO)
            }
        };

        for partition_idx in partition_indexes {
            let decoder = decoder_map.get(partition_idx).unwrap();

            let rewind = rewinds.get(partition_idx);
            if rewind.is_some_and(|(initial_lsn, _)| commit_lsn <= *initial_lsn) {
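                // Changes at or before initial_lsn are already reflected in
                // the snapshot; emitting them again would double count.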
                continue;
            }

            let (message, diff) = if let Some(ref deferred_update) = deferred_update {
                let (old_row, new_row) = match deferred_update {
                    CdcOperation::UpdateOld(_seqval, row) => (row, &sql_server_row),
                    CdcOperation::UpdateNew(_seqval, row) => (&sql_server_row, row),
                    CdcOperation::Insert(_) | CdcOperation::Delete(_) => unreachable!(),
                };

                // Retract the old version of the row...
                let update_old = decode(decoder, old_row, &mut mz_row, &arena, Some(new_row));
                if rewind.is_some_and(|(_, snapshot_lsn)| commit_lsn <= *snapshot_lsn) {
                    data_output
                        .give_fueled(
                            &data_cap_set[0],
                            ((*partition_idx, update_old.clone()), Lsn::minimum(), Diff::ONE),
                        )
                        .await;
                }
                data_output
                    .give_fueled(
                        &data_cap_set[0],
                        ((*partition_idx, update_old), commit_lsn, Diff::MINUS_ONE),
                    )
                    .await;

                // ...and insert the new version.
                (
                    decode(decoder, new_row, &mut mz_row, &arena, None),
                    Diff::ONE,
                )
            } else {
                (
                    decode(decoder, &sql_server_row, &mut mz_row, &arena, None),
                    diff,
                )
            };
            assert_ne!(Diff::ZERO, diff);

            // A change committed at or before the snapshot LSN is already in
            // the snapshot, so also emit a compensating update at
            // Lsn::minimum().
            if rewind.is_some_and(|(_, snapshot_lsn)| commit_lsn <= *snapshot_lsn) {
                data_output
                    .give_fueled(
                        &data_cap_set[0],
                        ((*partition_idx, message.clone()), Lsn::minimum(), -diff),
                    )
                    .await;
            }
            data_output
                .give_fueled(
                    &data_cap_set[0],
                    ((*partition_idx, message), commit_lsn, diff),
                )
                .await;
        }
    }
    Ok(())
}

type StackedAsyncOutputHandle<T, D> = AsyncOutputHandle<
    T,
    AccountedStackBuilder<CapacityContainerBuilder<TimelyStack<(D, T, Diff)>>>,
>;

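/// Decodes `row` into `mz_row`, returning a [`SourceMessage`] on success or a
/// [`DataflowError`] that preserves the raw row text on failure.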
fn decode(
    decoder: &SqlServerRowDecoder,
    row: &tiberius::Row,
    mz_row: &mut Row,
    arena: &RowArena,
    new_row: Option<&tiberius::Row>,
) -> Result<SourceMessage, DataflowError> {
    match decoder.decode(row, mz_row, arena, new_row) {
        Ok(()) => Ok(SourceMessage {
            key: Row::default(),
            value: mz_row.clone(),
            metadata: Row::default(),
        }),
        Err(e) => {
            let kind = DecodeErrorKind::Text(e.to_string().into());
            let raw = format!("{row:?}");
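            // We don't have access to the original bytes, so fall back to the
            // row's debug representation for the error payload.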
            Err(DataflowError::DecodeError(Box::new(DecodeError {
                kind,
                raw: raw.as_bytes().to_vec(),
            })))
        }
    }
}

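/// Emits `err` for every output and onto the error stream, ensuring all
/// downstream consumers of this source observe the definite error.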
async fn return_definite_error(
    err: DefiniteError,
    outputs: impl Iterator<Item = u64>,
    data_handle: StackedAsyncOutputHandle<Lsn, (u64, Result<SourceMessage, DataflowError>)>,
    data_capset: &CapabilitySet<Lsn>,
    errs_handle: AsyncOutputHandle<Lsn, CapacityContainerBuilder<Vec<ReplicationError>>>,
    errs_capset: &CapabilitySet<Lsn>,
) {
    for output_idx in outputs {
        let update = (
            (output_idx, Err(err.clone().into())),
            // Emit the error at the maximum possible LSN so it sorts after
            // any data the source could ever produce.
            Lsn {
                vlf_id: u32::MAX,
                block_id: u32::MAX,
                record_id: u16::MAX,
            },
            Diff::ONE,
        );
        data_handle.give_fueled(&data_capset[0], update).await;
    }
    errs_handle.give(
        &errs_capset[0],
        ReplicationError::DefiniteError(Rc::new(err)),
    );
}

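/// Adapter that reports CDC events from [`mz_sql_server_util`] to our
/// Prometheus metrics.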
struct PrometheusSqlServerCdcMetrics<'a> {
    inner: &'a SqlServerSourceMetrics,
}

impl<'a> SqlServerCdcMetrics for PrometheusSqlServerCdcMetrics<'a> {
    fn snapshot_table_lock_start(&self, table_name: &str) {
        self.inner.update_snapshot_table_lock_count(table_name, 1);
    }

    fn snapshot_table_lock_end(&self, table_name: &str) {
        self.inner.update_snapshot_table_lock_count(table_name, -1);
    }
}