mz_storage/source/sql_server/progress.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A "non-critical" operator that tracks the progress of a [`SqlServerSourceConnection`].
//!
//! The operator does the following:
//!
//! * At the cadence specified by [`OFFSET_KNOWN_INTERVAL`], probe the source for
//!   the max [`Lsn`], emit the upstream known offset, and update
//!   `SourceStatistics`.
//! * Listen to a provided [`futures::Stream`] of resume uppers, which represents
//!   the durably committed upper for _all_ of the subsources/exports associated
//!   with this source. As the source makes progress this operator does two
//!   things:
//!     1. If [`CDC_CLEANUP_CHANGE_TABLE`] is enabled, delete entries from the
//!        upstream change table that we've already ingested.
//!     2. Update `SourceStatistics` to notify listeners of a new
//!        "committed LSN".
//!
//! [`SqlServerSourceConnection`]: mz_storage_types::sources::SqlServerSourceConnection

use std::collections::{BTreeMap, BTreeSet};

use futures::StreamExt;
use mz_ore::future::InTask;
use mz_repr::GlobalId;
use mz_sql_server_util::cdc::Lsn;
use mz_sql_server_util::inspect::{get_latest_restore_history_id, get_max_lsn};
use mz_storage_types::connections::SqlServerConnectionDetails;
use mz_storage_types::dyncfgs::SQL_SERVER_SOURCE_VALIDATE_RESTORE_HISTORY;
use mz_storage_types::sources::SqlServerSourceExtras;
use mz_storage_types::sources::sql_server::{
    CDC_CLEANUP_CHANGE_TABLE, CDC_CLEANUP_CHANGE_TABLE_MAX_DELETES, OFFSET_KNOWN_INTERVAL,
};
use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::operators::Map;
use timely::dataflow::{Scope, Stream as TimelyStream};
use timely::progress::Antichain;

use crate::source::sql_server::{ReplicationError, SourceOutputInfo, TransientError};
use crate::source::types::Probe;
use crate::source::{RawSourceCreationConfig, probe};

/// Used as a partition ID to determine the worker that is responsible for
/// handling progress.
static PROGRESS_WORKER: &str = "progress";

pub(crate) fn render<G: Scope<Timestamp = Lsn>>(
    scope: G,
    config: RawSourceCreationConfig,
    connection: SqlServerConnectionDetails,
    outputs: BTreeMap<GlobalId, SourceOutputInfo>,
    committed_uppers: impl futures::Stream<Item = Antichain<Lsn>> + 'static,
    extras: SqlServerSourceExtras,
) -> (
    TimelyStream<G, ReplicationError>,
    TimelyStream<G, Probe<Lsn>>,
    PressOnDropButton,
) {
    let op_name = format!("SqlServerProgress({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(op_name, scope);

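    // Single output used to emit probes of the upstream frontier.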
    let (probe_output, probe_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let (button, transient_errors) = builder.build_fallible::<TransientError, _>(move |caps| {
        Box::pin(async move {
            let [probe_cap]: &mut [_; 1] = caps.try_into().unwrap();

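            // Helper to emit a probe downstream at the provided capability.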
            let emit_probe = |cap, probe: Probe<Lsn>| {
                probe_output.give(cap, probe);
            };

            // Only a single worker is responsible for processing progress; all
            // other workers report zeroed statistics and exit early.
            if !config.responsible_for(PROGRESS_WORKER) {
                // Emit 0 to mark this worker as having started up correctly.
                for stat in config.statistics.values() {
                    stat.set_offset_known(0);
                    stat.set_offset_committed(0);
                }
                return Ok(());
            }
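            // Resolve the connection configuration (including any secrets) and
            // connect to the upstream SQL Server instance.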
            let conn_config = connection
                .resolve_config(
                    &config.config.connection_context.secrets_reader,
                    &config.config,
                    InTask::Yes,
                )
                .await?;
            let mut client = mz_sql_server_util::Client::connect(conn_config).await?;

            // Terminate the progress probes if a restore has happened. The replication operator
            // will emit a definite error at the max LSN, but we also have to terminate the RLU
            // probes to ensure that the error propagates to downstream consumers; otherwise it
            // will wait in reclock, as the server LSN will always be less than the LSN of the
            // definite error.
            let current_restore_history_id = get_latest_restore_history_id(&mut client).await?;
            if current_restore_history_id != extras.restore_history_id
                && SQL_SERVER_SOURCE_VALIDATE_RESTORE_HISTORY.get(config.config.config_set())
            {
                tracing::warn!("Restore detected, exiting");
                return Ok(());
            }

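            // Ticker that drives periodic probing of the upstream max LSN; the
            // interval is read dynamically from `OFFSET_KNOWN_INTERVAL`.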
            let probe_interval = OFFSET_KNOWN_INTERVAL.handle(config.config.config_set());
            let mut probe_ticker = probe::Ticker::new(|| probe_interval.get(), config.now_fn);

            // Offset that is measured from the upstream SQL Server instance.
            // Tracked to detect an offset that moves backwards.
            let mut prev_offset_known: Option<Lsn> = None;

            // This stream of "resume uppers" tracks the `Lsn`s that we have durably committed
            // for all subsources/exports, and thus lets us notify the upstream that the change
            // tables can be cleaned up.
            let mut committed_uppers = std::pin::pin!(committed_uppers);
            let cleanup_change_table =
                CDC_CLEANUP_CHANGE_TABLE.handle(config.config.config_set());
            let cleanup_max_deletes =
                CDC_CLEANUP_CHANGE_TABLE_MAX_DELETES.handle(config.config.config_set());
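            // The capture instances backing this source's exports; these are the
            // change tables we may be asked to clean up.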
            let capture_instances: BTreeSet<_> = outputs
                .into_values()
                .map(|info| info.capture_instance)
                .collect();

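            // Main loop: either a probe tick fires and we report the upstream max
            // LSN, or a new committed upper arrives and we report it (and optionally
            // clean up the upstream change tables).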
            loop {
                tokio::select! {
                    probe_ts = probe_ticker.tick() => {
                        let max_lsn: Lsn = get_max_lsn(&mut client).await?;
                        // We have to return max_lsn + 1 in the probe so that the downstream consumers of
                        // the probe view the actual max lsn as fully committed and all data at that LSN
                        // as no longer subject to change. If we don't increment the LSN before emitting
                        // the probe then data will not be queryable in the tables produced by the Source.
                        let known_lsn = max_lsn.increment();
                        for stat in config.statistics.values() {
                            stat.set_offset_known(known_lsn.abbreviate());
                        }

                        // The DB should never go backwards, but it's good to know if it does.
                        let prev_known_lsn = match prev_offset_known {
                            None => {
                                prev_offset_known = Some(known_lsn);
                                known_lsn
                            },
                            Some(prev) => prev,
                        };
                        if known_lsn < prev_known_lsn {
                            mz_ore::soft_panic_or_log!(
                                "upstream SQL Server went backwards \
                                 in time, current LSN: {known_lsn}, \
                                 last known {prev_known_lsn}",
                            );
                            continue;
                        }
                        let probe = Probe {
                            probe_ts,
                            upstream_frontier: Antichain::from_elem(known_lsn),
                        };
                        emit_probe(&probe_cap[0], probe);
                        prev_offset_known = Some(known_lsn);
                    },
                    Some(committed_upper) = committed_uppers.next() => {
                        let Some(committed_upper) = committed_upper.as_option() else {
                            // It's possible that the source has been dropped, in which case this can
                            // observe an empty upper. This operator should continue to loop until
                            // the drop dataflow propagates.
                            continue;
                        };

                        // If enabled, tell the upstream SQL Server instance to
                        // clean up the underlying change table.
                        if cleanup_change_table.get() {
                            for instance in &capture_instances {
                                // TODO(sql_server3): The number of rows that got cleaned
                                // up should be present in informational notices sent back
                                // from the upstream, but the tiberius crate does not
                                // expose these.
                                let cleanup_result =
                                    mz_sql_server_util::inspect::cleanup_change_table(
                                        &mut client,
                                        instance,
                                        committed_upper,
                                        cleanup_max_deletes.get(),
                                    ).await;
                                // TODO(sql_server2): Track this in a more user-observable way.
                                if let Err(err) = cleanup_result {
                                    tracing::warn!(?err, %instance, "cleanup of change table failed!");
                                }
                            }
                        }
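                        // Record the committed offset so listeners can observe
                        // ingestion progress.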
                        for stat in config.statistics.values() {
                            stat.set_offset_committed(committed_upper.abbreviate());
                        }
                    }
                };
            }
        })
    });

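    // Surface transient errors to the caller as `ReplicationError`s.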
    let error_stream = transient_errors.map(ReplicationError::Transient);

    (error_stream, probe_stream, button.press_on_drop())
}