mz_storage/source/sql_server/progress.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A "non-critical" operator that tracks the progress of a [`SqlServerSourceConnection`].
//!
//! The operator does the following:
//!
//! * At the cadence of [`OFFSET_KNOWN_INTERVAL`], probe the source for the max
//!   [`Lsn`], emit the upstream known offset, and update `SourceStatistics`.
//! * Listen to a provided [`futures::Stream`] of resume uppers, which represents
//!   the durably committed upper for _all_ of the subsources/exports associated
//!   with this source. As the source makes progress, this operator does two
//!   things:
//!   1. If [`CDC_CLEANUP_CHANGE_TABLE`] is enabled, delete entries from
//!      the upstream change table that we've already ingested.
//!   2. Update `SourceStatistics` to notify listeners of a new
//!      "committed LSN".
//!
//! [`SqlServerSourceConnection`]: mz_storage_types::sources::SqlServerSourceConnection

use std::collections::{BTreeMap, BTreeSet};

use futures::StreamExt;
use mz_ore::future::InTask;
use mz_repr::GlobalId;
use mz_sql_server_util::cdc::Lsn;
use mz_sql_server_util::inspect::get_latest_restore_history_id;
use mz_storage_types::connections::SqlServerConnectionDetails;
use mz_storage_types::sources::SqlServerSourceExtras;
use mz_storage_types::sources::sql_server::{
    CDC_CLEANUP_CHANGE_TABLE, CDC_CLEANUP_CHANGE_TABLE_MAX_DELETES, OFFSET_KNOWN_INTERVAL,
};
use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};
use timely::dataflow::operators::Map;
use timely::dataflow::{Scope, Stream as TimelyStream};
use timely::progress::Antichain;

use crate::source::sql_server::{ReplicationError, SourceOutputInfo, TransientError};
use crate::source::types::Probe;
use crate::source::{RawSourceCreationConfig, probe};

/// Used as a partition ID to determine the worker that is responsible for
/// handling progress.
static PROGRESS_WORKER: &str = "progress";

pub(crate) fn render<G: Scope<Timestamp = Lsn>>(
    scope: G,
    config: RawSourceCreationConfig,
    connection: SqlServerConnectionDetails,
    outputs: BTreeMap<GlobalId, SourceOutputInfo>,
    committed_uppers: impl futures::Stream<Item = Antichain<Lsn>> + 'static,
    extras: SqlServerSourceExtras,
) -> (
    TimelyStream<G, ReplicationError>,
    TimelyStream<G, Probe<Lsn>>,
    PressOnDropButton,
) {
    let op_name = format!("SqlServerProgress({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(op_name, scope);

    let (probe_output, probe_stream) = builder.new_output();

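    // Any error returned from the closure below is transient: it is surfaced on a
    // dedicated stream and mapped to `ReplicationError::Transient` at the end of
    // this function.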
    let (button, transient_errors) = builder.build_fallible::<TransientError, _>(move |caps| {
        Box::pin(async move {
            let [probe_cap]: &mut [_; 1] = caps.try_into().unwrap();

            let emit_probe = |cap, probe: Probe<Lsn>| {
                probe_output.give(cap, probe);
            };

            // Only a single worker is responsible for processing progress.
            if !config.responsible_for(PROGRESS_WORKER) {
                // Emit 0 to mark this worker as having started up correctly,
                // then exit; the responsible worker does the actual work.
                for stat in config.statistics.values() {
                    stat.set_offset_known(0);
                    stat.set_offset_committed(0);
                }
                return Ok(());
            }
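            // Resolve the connection details (reading any required secrets) and
            // connect to the upstream SQL Server instance.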
            let conn_config = connection
                .resolve_config(
                    &config.config.connection_context.secrets_reader,
                    &config.config,
                    InTask::Yes,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(conn_config).await?;

            // Terminate the progress probes if a restore has happened. The replication
            // operator will emit a definite error at the max LSN, but we also have to
            // terminate the RLU probes to ensure that the error propagates to downstream
            // consumers; otherwise it will wait in reclock, as the server LSN will always
            // be less than the LSN of the definite error.
            let current_restore_history_id = get_latest_restore_history_id(&mut client).await?;
            if current_restore_history_id != extras.restore_history_id {
                tracing::error!("Restore happened, exiting");
                return Ok(());
            }

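            // Handles to dynamic configuration: `.get()` reads the current value on
            // each use, so changes take effect without rebuilding the dataflow.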
            let probe_interval = OFFSET_KNOWN_INTERVAL.handle(config.config.config_set());
            let mut probe_ticker = probe::Ticker::new(|| probe_interval.get(), config.now_fn);

            // Offset that is measured from the upstream SQL Server instance. Tracked to
            // detect an offset that moves backwards.
            let mut prev_offset_known: Option<Lsn> = None;

            // This stream of "resume uppers" tracks all of the `Lsn`s that we have durably
            // committed for all subsources/exports, and thus lets us notify the upstream
            // that the change tables can be cleaned up.
            let mut committed_uppers = std::pin::pin!(committed_uppers);
            let cleanup_change_table = CDC_CLEANUP_CHANGE_TABLE.handle(config.config.config_set());
            let cleanup_max_deletes =
                CDC_CLEANUP_CHANGE_TABLE_MAX_DELETES.handle(config.config.config_set());
            let capture_instances: BTreeSet<_> = outputs
                .into_values()
                .map(|info| info.capture_instance)
                .collect();

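            // Main event loop: either probe the upstream for its max LSN, or react
            // to a newly committed upper, whichever becomes ready first.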
            loop {
                tokio::select! {
                    probe_ts = probe_ticker.tick() => {
                        let max_lsn: Lsn = mz_sql_server_util::inspect::get_max_lsn(&mut client).await?;
                        // We have to return max_lsn + 1 in the probe so that downstream
                        // consumers of the probe view the actual max LSN as fully committed,
                        // and all data at that LSN as no longer subject to change. If we
                        // don't increment the LSN before emitting the probe, then data will
                        // not be queryable in the tables produced by the source.
                        let known_lsn = max_lsn.increment();
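                        // Report the newly known offset to the statistics of each
                        // export; `abbreviate` condenses the LSN into a single number.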
                        for stat in config.statistics.values() {
                            stat.set_offset_known(known_lsn.abbreviate());
                        }

                        // The DB should never go backwards, but it's good to know if it does.
                        let prev_known_lsn = match prev_offset_known {
                            None => {
                                prev_offset_known = Some(known_lsn);
                                known_lsn
                            }
                            Some(prev) => prev,
                        };
                        if known_lsn < prev_known_lsn {
                            mz_ore::soft_panic_or_log!(
                                "upstream SQL Server went backwards in time, current LSN: {known_lsn}, last known {prev_known_lsn}",
                            );
                            continue;
                        }
                        let probe = Probe {
                            probe_ts,
                            upstream_frontier: Antichain::from_elem(known_lsn),
                        };
                        emit_probe(&probe_cap[0], probe);
                        prev_offset_known = Some(known_lsn);
                    },
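                    // A new upper has been durably committed for all subsources/exports
                    // of this source.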
                    Some(committed_upper) = committed_uppers.next() => {
                        let Some(committed_upper) = committed_upper.as_option() else {
                            // It's possible that the source has been dropped, in which case
                            // this can observe an empty upper. This operator should continue
                            // to loop until the drop dataflow propagates.
                            continue;
                        };

                        // If enabled, tell the upstream SQL Server instance to
                        // clean up the underlying change table.
                        if cleanup_change_table.get() {
                            for instance in &capture_instances {
                                // TODO(sql_server3): The number of rows that got cleaned
                                // up should be present in informational notices sent back
                                // from the upstream, but the tiberius crate does not
                                // expose these.
                                let cleanup_result = mz_sql_server_util::inspect::cleanup_change_table(
                                    &mut client,
                                    instance,
                                    committed_upper,
                                    cleanup_max_deletes.get(),
                                ).await;
                                // TODO(sql_server2): Track this in a more user observable way.
                                if let Err(err) = cleanup_result {
                                    tracing::warn!(?err, %instance, "cleanup of change table failed!");
                                }
                            }
                        }
                        for stat in config.statistics.values() {
                            stat.set_offset_committed(committed_upper.abbreviate());
                        }
                    }
                };
            }
        })
    });

    let error_stream = transient_errors.map(ReplicationError::Transient);

    (error_stream, probe_stream, button.press_on_drop())
}