mz_storage/source/sql_server/progress.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A "non-critical" operator that tracks the progress of a [`SqlServerSource`].
//!
//! The operator does the following:
//!
//! * At the cadence defined by [`OFFSET_KNOWN_INTERVAL`], probes the source for
//!   the max [`Lsn`] and emits a [`ProgressStatisticsUpdate`] to notify
//!   listeners of a new "known LSN".
//! * Listens to a provided [`futures::Stream`] of resume uppers, which
//!   represents the durably committed upper for _all_ of the
//!   subsources/exports associated with this source. As the source makes
//!   progress this operator does two things:
//!     1. If [`CDC_CLEANUP_CHANGE_TABLE`] is enabled, deletes entries from
//!        the upstream change table that we've already ingested.
//!     2. Emits a [`ProgressStatisticsUpdate`] to notify listeners of a new
//!        "committed LSN".
//!
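//! A sketch of how the operator is wired up (illustrative only; the real call
//! site lives in the SQL Server source rendering code):
//!
//! ```ignore
//! let (stats, errors, probes, button) = progress::render(
//!     scope.clone(),
//!     config,
//!     connection,
//!     outputs,
//!     committed_uppers,
//!     extras,
//! );
//! // `stats` and `probes` feed downstream consumers of progress information,
//! // `errors` surfaces transient failures, and dropping `button` shuts the
//! // operator down.
//! ```
//!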
//! [`SqlServerSource`]: mz_storage_types::sources::SqlServerSource

use std::collections::{BTreeMap, BTreeSet};

use futures::StreamExt;
use mz_ore::future::InTask;
use mz_repr::GlobalId;
use mz_sql_server_util::cdc::Lsn;
use mz_sql_server_util::inspect::get_latest_restore_history_id;
use mz_storage_types::connections::SqlServerConnectionDetails;
use mz_storage_types::sources::SqlServerSourceExtras;
use mz_storage_types::sources::sql_server::{
    CDC_CLEANUP_CHANGE_TABLE, CDC_CLEANUP_CHANGE_TABLE_MAX_DELETES, OFFSET_KNOWN_INTERVAL,
};
use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};
use timely::dataflow::operators::Map;
use timely::dataflow::{Scope, Stream as TimelyStream};
use timely::progress::Antichain;

use crate::source::sql_server::{ReplicationError, SourceOutputInfo, TransientError};
use crate::source::types::{Probe, ProgressStatisticsUpdate};
use crate::source::{RawSourceCreationConfig, probe};

/// Used as a partition ID to determine the worker that is responsible for
/// handling progress.
static PROGRESS_WORKER: &str = "progress";
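// NOTE: `responsible_for` is assumed to hash its argument so that exactly one
// timely worker is elected for a given key; passing this constant from every
// worker therefore selects a single worker to run the probing loop below.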

pub(crate) fn render<G: Scope<Timestamp = Lsn>>(
    scope: G,
    config: RawSourceCreationConfig,
    connection: SqlServerConnectionDetails,
    outputs: BTreeMap<GlobalId, SourceOutputInfo>,
    committed_uppers: impl futures::Stream<Item = Antichain<Lsn>> + 'static,
    extras: SqlServerSourceExtras,
) -> (
    TimelyStream<G, ProgressStatisticsUpdate>,
    TimelyStream<G, ReplicationError>,
    TimelyStream<G, Probe<Lsn>>,
    PressOnDropButton,
) {
    let op_name = format!("SqlServerProgress({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(op_name, scope);

    let (stats_output, stats_stream) = builder.new_output();
    let (probe_output, probe_stream) = builder.new_output();

    let (button, transient_errors) = builder.build_fallible::<TransientError, _>(move |caps| {
        Box::pin(async move {
            let [stats_cap, probe_cap]: &mut [_; 2] = caps.try_into().unwrap();

            // Small helper closure.
            let emit_stats = |cap, known: u64, committed: u64| {
                let update = ProgressStatisticsUpdate::SteadyState {
                    offset_known: known,
                    offset_committed: committed,
                };
                tracing::debug!(?config.id, %known, %committed, "steadystate progress");
                stats_output.give(cap, update);
            };

            let emit_probe = |cap, probe: Probe<Lsn>| {
                probe_output.give(cap, probe);
            };

            // Only a single worker is responsible for processing progress.
            if !config.responsible_for(PROGRESS_WORKER) {
                // Emit 0 to mark this worker as having started up correctly,
                // then bow out; only the responsible worker proceeds.
                emit_stats(&stats_cap[0], 0, 0);
                return Ok(());
            }

            let conn_config = connection
                .resolve_config(
                    &config.config.connection_context.secrets_reader,
                    &config.config,
                    InTask::Yes,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(conn_config).await?;

            // Terminate the progress probes if a restore has happened. The
            // replication operator will emit a definite error at the max LSN,
            // but we also have to stop emitting probes here so that the error
            // propagates to downstream consumers; otherwise it would wait in
            // reclock forever, since the server LSN will always be less than
            // the LSN of the definite error.
            let current_restore_history_id = get_latest_restore_history_id(&mut client).await?;
            if current_restore_history_id != extras.restore_history_id {
                tracing::error!("Restore happened, exiting");
                return Ok(());
            }

            let probe_interval = OFFSET_KNOWN_INTERVAL.handle(config.config.config_set());
            let mut probe_ticker = probe::Ticker::new(|| probe_interval.get(), config.now_fn);
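            // The interval is read through the dyncfg handle on each tick, so
            // changes to `OFFSET_KNOWN_INTERVAL` take effect at runtime (assuming
            // `probe::Ticker` re-invokes the closure for every tick).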

            // Offset that is measured from the upstream SQL Server instance.
            let mut prev_offset_known: Option<Lsn> = None;
            // Offset that we have observed from the `resume_uppers` stream.
            let mut prev_offset_committed: Option<Lsn> = None;

            // This stream of "resume uppers" tracks all of the LSNs that we have
            // durably committed for all subsources/exports, and thus tells us when
            // the upstream change tables can be cleaned up.
            let mut committed_uppers = std::pin::pin!(committed_uppers);
            let cleanup_change_table = CDC_CLEANUP_CHANGE_TABLE.handle(config.config.config_set());
            let cleanup_max_deletes =
                CDC_CLEANUP_CHANGE_TABLE_MAX_DELETES.handle(config.config.config_set());
            let capture_instances: BTreeSet<_> = outputs
                .into_values()
                .map(|info| info.capture_instance)
                .collect();

            loop {
                tokio::select! {
                    probe_ts = probe_ticker.tick() => {
                        let max_lsn: Lsn = mz_sql_server_util::inspect::get_max_lsn(&mut client).await?;
                        // We have to return `max_lsn + 1` in the probe so that
                        // downstream consumers of the probe view the actual max LSN
                        // as fully committed, with all data at that LSN no longer
                        // subject to change. If we don't increment the LSN before
                        // emitting the probe, data will not be queryable in the
                        // tables produced by the source.
                        let known_lsn = max_lsn.increment();
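                        // For example: if the server reports max LSN `X`, the
                        // frontier we emit is `X + 1`. By frontier semantics,
                        // everything strictly less than `X + 1` (including `X`
                        // itself) is then complete and queryable.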

                        // The DB should never go backwards, but it's good to know if it does.
                        let prev_known_lsn = match prev_offset_known {
                            None => {
                                prev_offset_known = Some(known_lsn);
                                known_lsn
                            }
                            Some(prev) => prev,
                        };
                        if known_lsn < prev_known_lsn {
                            mz_ore::soft_panic_or_log!(
                                "upstream SQL Server went backwards in time, current LSN: {known_lsn}, last known: {prev_known_lsn}",
                            );
                            continue;
                        }

                        // Update any listeners with our most recently known LSN.
                        if let Some(prev_commit_lsn) = prev_offset_committed {
                            let known_lsn_abrv = known_lsn.abbreviate();
                            let commit_lsn_abrv = prev_commit_lsn.abbreviate();
                            emit_stats(&stats_cap[0], known_lsn_abrv, commit_lsn_abrv);
                        }

                        let probe = Probe { probe_ts, upstream_frontier: Antichain::from_elem(known_lsn) };
                        emit_probe(&probe_cap[0], probe);

                        prev_offset_known = Some(known_lsn);
                    },
                    Some(committed_upper) = committed_uppers.next() => {
                        let Some(committed_upper) = committed_upper.as_option() else {
                            // It's possible that the source has been dropped, in which
                            // case we can observe an empty upper. This operator should
                            // continue to loop until the drop dataflow propagates.
                            continue;
                        };

                        // If enabled, tell the upstream SQL Server instance to
                        // clean up the underlying change table.
                        if cleanup_change_table.get() {
                            for instance in &capture_instances {
                                // TODO(sql_server3): The number of rows that got cleaned
                                // up should be present in informational notices sent back
                                // from the upstream, but the tiberius crate does not
                                // expose these.
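                                // (Assumption: the helper wraps SQL Server's
                                // `sys.sp_cdc_cleanup_change_table` procedure,
                                // which takes a capture instance, a low-water-mark
                                // LSN, and a delete threshold, matching the
                                // arguments below.)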
                                let cleanup_result = mz_sql_server_util::inspect::cleanup_change_table(
                                    &mut client,
                                    instance,
                                    committed_upper,
                                    cleanup_max_deletes.get(),
                                ).await;
                                // TODO(sql_server2): Track this in a more user observable way.
                                if let Err(err) = cleanup_result {
                                    tracing::warn!(?err, %instance, "cleanup of change table failed!");
                                }
                            }
                        }

                        // Update any listeners with our most recently committed LSN.
                        if let Some(prev_known_lsn) = prev_offset_known {
                            let known_lsn_abrv = prev_known_lsn.abbreviate();
                            let commit_lsn_abrv = committed_upper.abbreviate();
                            emit_stats(&stats_cap[0], known_lsn_abrv, commit_lsn_abrv);
                        }
                        prev_offset_committed = Some(*committed_upper);
                    }
                };
            }
        })
    });

    let error_stream = transient_errors.map(ReplicationError::Transient);

    (
        stats_stream,
        error_stream,
        probe_stream,
        button.press_on_drop(),
    )
}