use std::collections::{BTreeMap, BTreeSet};
use std::convert::Infallible;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Instant;

use differential_dataflow::AsCollection;
use differential_dataflow::containers::TimelyStack;
use futures::StreamExt;
use itertools::Itertools;
use mz_ore::cast::CastFrom;
use mz_ore::future::InTask;
use mz_repr::{Diff, GlobalId, Row, RowArena};
use mz_sql_server_util::SqlServerCdcMetrics;
use mz_sql_server_util::cdc::{CdcEvent, Lsn, Operation as CdcOperation};
use mz_sql_server_util::desc::SqlServerRowDecoder;
use mz_sql_server_util::inspect::get_latest_restore_history_id;
use mz_storage_types::errors::{DataflowError, DecodeError, DecodeErrorKind};
use mz_storage_types::sources::SqlServerSourceConnection;
use mz_storage_types::sources::sql_server::{
    CDC_POLL_INTERVAL, MAX_LSN_WAIT, SNAPSHOT_PROGRESS_REPORT_INTERVAL,
};
use mz_timely_util::builder_async::{
    AsyncOutputHandle, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::containers::stack::AccountedStackBuilder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::operators::{CapabilitySet, Concat, Map};
use timely::dataflow::{Scope, Stream as TimelyStream};
use timely::progress::{Antichain, Timestamp};

use crate::metrics::source::sql_server::SqlServerSourceMetrics;
use crate::source::RawSourceCreationConfig;
use crate::source::sql_server::{
    DefiniteError, ReplicationError, SourceOutputInfo, TransientError,
};
use crate::source::types::{SignaledFuture, SourceMessage, StackedCollection};
static REPL_READER: &str = "reader";

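/// Renders the SQL Server replication dataflow for the given source.
///
/// One worker snapshots any tables whose outputs are starting from the minimum
/// LSN and then tails the upstream CDC stream. Returns the collection of data
/// updates, a stream communicating the upper frontier, a stream of replication
/// errors, and a button that shuts the operator down when dropped.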
pub(crate) fn render<G: Scope<Timestamp = Lsn>>(
    scope: G,
    config: RawSourceCreationConfig,
    outputs: BTreeMap<GlobalId, SourceOutputInfo>,
    source: SqlServerSourceConnection,
    metrics: SqlServerSourceMetrics,
) -> (
    StackedCollection<G, (u64, Result<SourceMessage, DataflowError>)>,
    TimelyStream<G, Infallible>,
    TimelyStream<G, ReplicationError>,
    PressOnDropButton,
) {
    let op_name = format!("SqlServerReplicationReader({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(op_name, scope);

    let (data_output, data_stream) = builder.new_output::<AccountedStackBuilder<_>>();
    let (_upper_output, upper_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let (definite_error_handle, definite_errors) =
        builder.new_output::<CapacityContainerBuilder<_>>();

    let (button, transient_errors) = builder.build_fallible(move |caps| {
        let busy_signal = Arc::clone(&config.busy_signal);
        Box::pin(SignaledFuture::new(busy_signal, async move {
            let [
                data_cap_set,
                upper_cap_set,
                definite_error_cap_set,
            ]: &mut [_; 3] = caps.try_into().unwrap();

            let connection_config = source
                .connection
                .resolve_config(
                    &config.config.connection_context.secrets_reader,
                    &config.config,
                    InTask::Yes,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(connection_config).await?;

            let worker_id = config.worker_id;

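            // Per-output bookkeeping: the decoder for each output, which outputs still
            // need a snapshot (with the LSN captured when they were added), all outputs
            // per capture instance, and the statistics handles used to report progress.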
            let mut decoder_map: BTreeMap<_, _> = BTreeMap::new();
            let mut capture_instance_to_snapshot: BTreeMap<Arc<str>, Vec<_>> = BTreeMap::new();
            let mut capture_instances: BTreeMap<Arc<str>, Vec<_>> = BTreeMap::new();
            let mut export_statistics: BTreeMap<_, Vec<_>> = BTreeMap::new();

            for (export_id, output) in outputs.iter() {
                if decoder_map
                    .insert(output.partition_index, Arc::clone(&output.decoder))
                    .is_some()
                {
                    panic!("Multiple decoders for output index {}", output.partition_index);
                }
                capture_instances
                    .entry(Arc::clone(&output.capture_instance))
                    .or_default()
                    .push(output.partition_index);

                if *output.resume_upper == [Lsn::minimum()] {
                    capture_instance_to_snapshot
                        .entry(Arc::clone(&output.capture_instance))
                        .or_default()
                        .push((output.partition_index, output.initial_lsn));
                }
                export_statistics
                    .entry(Arc::clone(&output.capture_instance))
                    .or_default()
                    .push(
                        config
                            .statistics
                            .get(export_id)
                            .expect("statistics have been initialized")
                            .clone(),
                    );
            }

            metrics.snapshot_table_count.set(u64::cast_from(capture_instance_to_snapshot.len()));
            if !capture_instance_to_snapshot.is_empty() {
                for stats in config.statistics.values() {
                    stats.set_snapshot_records_known(0);
                    stats.set_snapshot_records_staged(0);
                }
            }
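            // Only the worker responsible for the replication reader does any further
            // work; all other workers exit immediately.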
            if !config.responsible_for(REPL_READER) {
                return Ok::<_, TransientError>(());
            }

            let snapshot_instances = capture_instance_to_snapshot
                .keys()
                .map(|i| i.as_ref());

            let snapshot_tables = mz_sql_server_util::inspect::get_tables_for_capture_instance(
                &mut client,
                snapshot_instances,
            )
            .await?;

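            // A change in the restore history means the upstream database was restored
            // from a backup since this source was created, so the CDC data this source
            // has already read may no longer be valid and it must enter an errored state.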
            let current_restore_history_id = get_latest_restore_history_id(&mut client).await?;
            if current_restore_history_id != source.extras.restore_history_id {
                let definite_error = DefiniteError::RestoreHistoryChanged(
                    source.extras.restore_history_id.clone(),
                    current_restore_history_id.clone(),
                );
                tracing::warn!(?definite_error, "Restore detected, exiting");

                return_definite_error(
                    definite_error,
                    capture_instances
                        .values()
                        .flat_map(|indexes| indexes.iter().copied()),
                    data_output,
                    data_cap_set,
                    definite_error_handle,
                    definite_error_cap_set,
                )
                .await;
                return Ok(());
            }

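            // Report the expected size of each snapshot so statistics can track
            // progress, and record how long the size calculation itself took.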
            for table in &snapshot_tables {
                let qualified_table_name = format!(
                    "{schema_name}.{table_name}",
                    schema_name = &table.schema_name,
                    table_name = &table.name
                );
                let size_calc_start = Instant::now();
                let table_total = mz_sql_server_util::inspect::snapshot_size(
                    &mut client,
                    &table.schema_name,
                    &table.name,
                )
                .await?;
                metrics.set_snapshot_table_size_latency(
                    &qualified_table_name,
                    size_calc_start.elapsed().as_secs_f64(),
                );
                for export_stat in export_statistics.get(&table.capture_instance.name).unwrap() {
                    export_stat.set_snapshot_records_known(u64::cast_from(table_total));
                    export_stat.set_snapshot_records_staged(0);
                }
            }
            let cdc_metrics = PrometheusSqlServerCdcMetrics { inner: &metrics };
            let mut cdc_handle = client
                .cdc(capture_instances.keys().cloned(), cdc_metrics)
                .max_lsn_wait(MAX_LSN_WAIT.get(config.config.config_set()));

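            // Snapshot each table that needs one, recording the LSN at which the
            // snapshot was taken so the replication stream can be stitched to it.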
            let snapshot_lsns: BTreeMap<Arc<str>, Lsn> = {
                cdc_handle.wait_for_ready().await?;

                tracing::info!(%config.worker_id, "timely-{worker_id} upstream is ready");

                let report_interval =
                    SNAPSHOT_PROGRESS_REPORT_INTERVAL.handle(config.config.config_set());
                let mut last_report = Instant::now();
                let mut snapshot_lsns = BTreeMap::new();
                let arena = RowArena::default();

                for table in snapshot_tables {
                    let (snapshot_lsn, snapshot) = cdc_handle
                        .snapshot(&table, config.worker_id, config.id)
                        .await?;

                    tracing::info!(
                        %config.id,
                        %table.name,
                        %table.schema_name,
                        %snapshot_lsn,
                        "timely-{worker_id} snapshot start"
                    );

                    let mut snapshot = std::pin::pin!(snapshot);

                    snapshot_lsns.insert(Arc::clone(&table.capture_instance.name), snapshot_lsn);

                    let partition_indexes = capture_instance_to_snapshot
                        .get(&table.capture_instance.name)
                        .unwrap_or_else(|| {
                            panic!(
                                "no snapshot outputs in known capture instances [{}] for capture instance: '{}'",
                                capture_instance_to_snapshot.keys().join(","),
                                table.capture_instance.name
                            );
                        });

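                    // Stream the snapshot, decoding each upstream row once per output
                    // backed by this capture instance. Snapshot rows are emitted at the
                    // minimum LSN, periodically reporting staged counts.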
                    let mut snapshot_staged = 0;
                    while let Some(result) = snapshot.next().await {
                        let sql_server_row = result.map_err(TransientError::from)?;

                        if last_report.elapsed() > report_interval.get() {
                            last_report = Instant::now();
                            for export_stat in
                                export_statistics.get(&table.capture_instance.name).unwrap()
                            {
                                export_stat.set_snapshot_records_staged(snapshot_staged);
                            }
                        }

                        for (partition_idx, _) in partition_indexes {
                            let mut mz_row = Row::default();

                            let decoder =
                                decoder_map.get(partition_idx).expect("decoder for output");
                            let message =
                                match decoder.decode(&sql_server_row, &mut mz_row, &arena) {
                                    Ok(()) => Ok(SourceMessage {
                                        key: Row::default(),
                                        value: mz_row,
                                        metadata: Row::default(),
                                    }),
                                    Err(e) => {
                                        let kind = DecodeErrorKind::Text(e.to_string().into());
                                        let raw = format!("{sql_server_row:?}");
                                        Err(DataflowError::DecodeError(Box::new(DecodeError {
                                            kind,
                                            raw: raw.as_bytes().to_vec(),
                                        })))
                                    }
                                };
                            data_output
                                .give_fueled(
                                    &data_cap_set[0],
                                    ((*partition_idx, message), Lsn::minimum(), Diff::ONE),
                                )
                                .await;
                        }
                        snapshot_staged += 1;
                    }

                    tracing::info!(
                        %config.id,
                        %table.name,
                        %table.schema_name,
                        %snapshot_lsn,
                        "timely-{worker_id} snapshot complete"
                    );
                    metrics.snapshot_table_count.dec();
                    for export_stat in
                        export_statistics.get(&table.capture_instance.name).unwrap()
                    {
                        export_stat.set_snapshot_records_staged(snapshot_staged);
                        export_stat.set_snapshot_records_known(snapshot_staged);
                    }
                }

                snapshot_lsns
            };

            // The snapshot for each output was emitted at the minimum LSN but reflects
            // the upstream table as of its snapshot LSN, while replication replays
            // changes starting just after each output's initial LSN. Changes with an
            // LSN at or before the snapshot LSN are therefore already contained in the
            // snapshot and must additionally be retracted at the minimum LSN (a
            // "rewind") so the collection stays consistent at every timestamp.
            let mut rewinds: BTreeMap<_, _> = capture_instance_to_snapshot
                .iter()
                .flat_map(|(capture_instance, export_ids)| {
                    let snapshot_lsn = snapshot_lsns
                        .get(capture_instance)
                        .expect("snapshot lsn must be collected for capture instance");
                    export_ids
                        .iter()
                        .map(|(idx, initial_lsn)| (*idx, (*initial_lsn, *snapshot_lsn)))
                })
                .collect();

            for (initial_lsn, snapshot_lsn) in rewinds.values() {
                // initial_lsn is captured before the snapshot is taken, so it can never
                // exceed the snapshot LSN.
                assert!(
                    initial_lsn <= snapshot_lsn,
                    "initial_lsn={initial_lsn} snapshot_lsn={snapshot_lsn}"
                );
            }

            tracing::debug!("rewinds to process: {rewinds:?}");

            capture_instance_to_snapshot.clear();

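            // Determine the LSN at which replication should resume for each capture
            // instance: the smallest resume LSN across all of its outputs.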
            let mut resume_lsns = BTreeMap::new();
            for src_info in outputs.values() {
                let resume_lsn = match src_info.resume_upper.as_option() {
                    Some(lsn) if *lsn != Lsn::minimum() => *lsn,
                    // An output whose upper is still the minimum was snapshotted in this
                    // run; replication resumes just after its initial LSN.
                    Some(_) => src_info.initial_lsn.increment(),
                    None => panic!("resume_upper must contain at least one element"),
                };
                resume_lsns
                    .entry(Arc::clone(&src_info.capture_instance))
                    .and_modify(|existing| *existing = std::cmp::min(*existing, resume_lsn))
                    .or_insert(resume_lsn);
            }

            tracing::info!(
                %config.id,
                ?resume_lsns,
                "timely-{} replication starting",
                config.worker_id
            );
            for instance in capture_instances.keys() {
                let resume_lsn = resume_lsns
                    .get(instance)
                    .expect("resume_lsn exists for capture instance");
                cdc_handle = cdc_handle.start_lsn(instance, *resume_lsn);
            }

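            // Open the CDC change stream. It is polled on an interval and yields
            // progress, data, and schema-update events.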
            let cdc_stream = cdc_handle
                .poll_interval(CDC_POLL_INTERVAL.get(config.config.config_set()))
                .into_stream();
            let mut cdc_stream = std::pin::pin!(cdc_stream);

            let mut errored_instances = BTreeSet::new();

            let mut log_rewinds_complete = true;
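            // Main replication loop: forward data events to the outputs, downgrade
            // capabilities on progress events, and mark capture instances as errored
            // on incompatible schema changes.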
            while let Some(event) = cdc_stream.next().await {
                let event = event.map_err(TransientError::from)?;
                tracing::trace!(?config.id, ?event, "got replication event");

                match event {
                    CdcEvent::Progress { next_lsn } => {
                        tracing::debug!(?config.id, ?next_lsn, "got a closed lsn");
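                        // Rewinds are done once the closed LSN has passed their snapshot
                        // LSN. The data capability is held at the minimum until all
                        // rewinds are complete so retractions can still be emitted there.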
                        rewinds.retain(|_, (_, snapshot_lsn)| next_lsn <= *snapshot_lsn);
                        if rewinds.is_empty() {
                            if log_rewinds_complete {
                                tracing::debug!("rewinds complete");
                                log_rewinds_complete = false;
                            }
                            data_cap_set.downgrade(Antichain::from_elem(next_lsn));
                        } else {
                            tracing::debug!("rewinds remaining: {:?}", rewinds);
                        }
                        upper_cap_set.downgrade(Antichain::from_elem(next_lsn));
                    }
                    CdcEvent::Data {
                        capture_instance,
                        lsn,
                        changes,
                    } => {
                        if errored_instances.contains(&capture_instance) {
                            // Data for an errored capture instance is dropped.
                            metrics.ignored.inc_by(u64::cast_from(changes.len()));
                            continue;
                        }

                        let Some(partition_indexes) = capture_instances.get(&capture_instance)
                        else {
                            let definite_error = DefiniteError::ProgrammingError(format!(
                                "capture instance didn't exist: '{capture_instance}'"
                            ));
                            return_definite_error(
                                definite_error,
                                capture_instances
                                    .values()
                                    .flat_map(|indexes| indexes.iter().copied()),
                                data_output,
                                data_cap_set,
                                definite_error_handle,
                                definite_error_cap_set,
                            )
                            .await;
                            return Ok(());
                        };

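                        // Decode this batch of changes and emit it to every output backed
                        // by the capture instance, applying any pending rewinds.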
                        handle_data_event(
                            changes,
                            partition_indexes,
                            &decoder_map,
                            lsn,
                            &rewinds,
                            &data_output,
                            data_cap_set,
                            &metrics,
                        )
                        .await?
                    }
                    CdcEvent::SchemaUpdate { capture_instance, table, ddl_event } => {
                        if !errored_instances.contains(&capture_instance)
                            && !ddl_event.is_compatible()
                        {
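                            // An incompatible schema change poisons this capture instance:
                            // emit a definite error for each of its outputs at the DDL
                            // event's LSN and ignore it from then on.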
                            let Some(partition_indexes) = capture_instances.get(&capture_instance)
                            else {
                                let definite_error = DefiniteError::ProgrammingError(format!(
                                    "capture instance didn't exist: '{capture_instance}'"
                                ));
                                return_definite_error(
                                    definite_error,
                                    capture_instances
                                        .values()
                                        .flat_map(|indexes| indexes.iter().copied()),
                                    data_output,
                                    data_cap_set,
                                    definite_error_handle,
                                    definite_error_cap_set,
                                )
                                .await;
                                return Ok(());
                            };
                            let error = DefiniteError::IncompatibleSchemaChange(
                                capture_instance.to_string(),
                                table.to_string(),
                            );
                            for partition_idx in partition_indexes {
                                data_output
                                    .give_fueled(
                                        &data_cap_set[0],
                                        (
                                            (*partition_idx, Err(error.clone().into())),
                                            ddl_event.lsn,
                                            Diff::ONE,
                                        ),
                                    )
                                    .await;
                            }
                            errored_instances.insert(capture_instance);
                        }
                    }
                };
            }
            Err(TransientError::ReplicationEOF)
        }))
    });

    let error_stream =
        definite_errors.concat(&transient_errors.map(ReplicationError::Transient));

    (
        data_stream.as_collection(),
        upper_stream,
        error_stream,
        button.press_on_drop(),
    )
}

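/// Decodes a batch of CDC changes for a single capture instance and emits the
/// resulting updates to each of the given outputs, applying any pending rewinds.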
async fn handle_data_event(
    changes: Vec<CdcOperation>,
    partition_indexes: &[u64],
    decoder_map: &BTreeMap<u64, Arc<SqlServerRowDecoder>>,
    commit_lsn: Lsn,
    rewinds: &BTreeMap<u64, (Lsn, Lsn)>,
    data_output: &StackedAsyncOutputHandle<Lsn, (u64, Result<SourceMessage, DataflowError>)>,
    data_cap_set: &CapabilitySet<Lsn>,
    metrics: &SqlServerSourceMetrics,
) -> Result<(), TransientError> {
    for change in changes {
        let (sql_server_row, diff): (_, _) = match change {
            CdcOperation::Insert(sql_server_row) => {
                metrics.inserts.inc();
                (sql_server_row, Diff::ONE)
            }
            CdcOperation::UpdateNew(sql_server_row) => {
                metrics.updates.inc();
                (sql_server_row, Diff::ONE)
            }
            CdcOperation::Delete(sql_server_row) => {
                metrics.deletes.inc();
                (sql_server_row, Diff::MINUS_ONE)
            }
            CdcOperation::UpdateOld(sql_server_row) => (sql_server_row, Diff::MINUS_ONE),
        };

        let mut mz_row = Row::default();
        let arena = RowArena::default();

        for partition_idx in partition_indexes {
            let decoder = decoder_map.get(partition_idx).unwrap();

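            // Replication for this output resumes strictly after its initial LSN, so a
            // change at or before it (possibly requested by another output of the same
            // capture instance that resumed earlier) is skipped entirely.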
            let rewind = rewinds.get(partition_idx);
            if rewind.is_some_and(|(initial_lsn, _)| commit_lsn <= *initial_lsn) {
                continue;
            }

            let message = match decoder.decode(&sql_server_row, &mut mz_row, &arena) {
                Ok(()) => Ok(SourceMessage {
                    key: Row::default(),
                    value: mz_row.clone(),
                    metadata: Row::default(),
                }),
                Err(e) => {
                    let kind = DecodeErrorKind::Text(e.to_string().into());
                    let raw = format!("{sql_server_row:?}");
                    Err(DataflowError::DecodeError(Box::new(DecodeError {
                        kind,
                        raw: raw.as_bytes().to_vec(),
                    })))
                }
            };

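            // Changes at or before the snapshot LSN are already reflected in the
            // snapshot emitted at the minimum LSN, so emit a compensating retraction
            // there in addition to the regular update at the commit LSN.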
            if rewind.is_some_and(|(_, snapshot_lsn)| commit_lsn <= *snapshot_lsn) {
                data_output
                    .give_fueled(
                        &data_cap_set[0],
                        ((*partition_idx, message.clone()), Lsn::minimum(), -diff),
                    )
                    .await;
            }
            data_output
                .give_fueled(
                    &data_cap_set[0],
                    ((*partition_idx, message), commit_lsn, diff),
                )
                .await;
        }
    }
    Ok(())
}

type StackedAsyncOutputHandle<T, D> = AsyncOutputHandle<
    T,
    AccountedStackBuilder<CapacityContainerBuilder<TimelyStack<(D, T, Diff)>>>,
>;

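/// Emits `err` for every output at the maximum representable LSN and reports it
/// on the error output as a definite error.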
async fn return_definite_error(
    err: DefiniteError,
    outputs: impl Iterator<Item = u64>,
    data_handle: StackedAsyncOutputHandle<Lsn, (u64, Result<SourceMessage, DataflowError>)>,
    data_capset: &CapabilitySet<Lsn>,
    errs_handle: AsyncOutputHandle<Lsn, CapacityContainerBuilder<Vec<ReplicationError>>>,
    errs_capset: &CapabilitySet<Lsn>,
) {
    for output_idx in outputs {
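        // The error is attached to the largest LSN we can represent, presumably so it
        // sorts after any data update this source has already produced.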
        let update = (
            (output_idx, Err(err.clone().into())),
            Lsn {
                vlf_id: u32::MAX,
                block_id: u32::MAX,
                record_id: u16::MAX,
            },
            Diff::ONE,
        );
        data_handle.give_fueled(&data_capset[0], update).await;
    }
    errs_handle.give(
        &errs_capset[0],
        ReplicationError::DefiniteError(Rc::new(err)),
    );
}

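/// Adapter that reports callbacks from the SQL Server CDC client onto this
/// source's Prometheus metrics.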
struct PrometheusSqlServerCdcMetrics<'a> {
    inner: &'a SqlServerSourceMetrics,
}

impl<'a> SqlServerCdcMetrics for PrometheusSqlServerCdcMetrics<'a> {
    fn snapshot_table_lock_start(&self, table_name: &str) {
        self.inner.update_snapshot_table_lock_count(table_name, 1);
    }

    fn snapshot_table_lock_end(&self, table_name: &str) {
        self.inner.update_snapshot_table_lock_count(table_name, -1);
    }
}