mz_storage/source/sql_server.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Code to render the ingestion dataflow of a [`SqlServerSource`].
11
12use std::collections::BTreeMap;
13use std::convert::Infallible;
14use std::future::Future;
15use std::rc::Rc;
16use std::sync::Arc;
17
18use differential_dataflow::AsCollection;
19use itertools::Itertools;
20use mz_ore::cast::CastFrom;
21use mz_ore::error::ErrorExt;
22use mz_repr::{Diff, GlobalId};
23use mz_sql_server_util::SqlServerError;
24use mz_sql_server_util::cdc::Lsn;
25use mz_sql_server_util::desc::{SqlServerRowDecoder, SqlServerTableDesc};
26use mz_storage_types::errors::{DataflowError, SourceError, SourceErrorDetails};
27use mz_storage_types::sources::{
28    SourceExport, SourceExportDetails, SourceTimestamp, SqlServerSource,
29};
30use mz_timely_util::builder_async::PressOnDropButton;
31use timely::container::CapacityContainerBuilder;
32use timely::dataflow::operators::core::Partition;
33use timely::dataflow::operators::{Concat, Map, ToStream};
34use timely::dataflow::{Scope, Stream as TimelyStream};
35use timely::progress::Antichain;
36
37use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
38use crate::source::RawSourceCreationConfig;
39use crate::source::types::{
40    Probe, ProgressStatisticsUpdate, SourceMessage, SourceRender, StackedCollection,
41};
42
43mod progress;
44mod replication;
45
46#[derive(Debug, Clone)]
47struct SourceOutputInfo {
48    /// Name of the capture instance in the upstream SQL Server DB.
49    capture_instance: Arc<str>,
50    /// Description of the upstream table.
51    #[allow(dead_code)]
52    upstream_desc: Arc<SqlServerTableDesc>,
53    /// Type that can decode (and map) SQL Server rows into Materialize rows.
54    decoder: Arc<SqlServerRowDecoder>,
55    /// Upper to resume replication from.
56    resume_upper: Antichain<Lsn>,
57    /// An index to split the timely stream.
58    partition_index: u64,
59}
60
61#[derive(Debug, Clone, thiserror::Error)]
62pub enum ReplicationError {
63    #[error(transparent)]
64    Transient(#[from] Rc<TransientError>),
65    #[error(transparent)]
66    DefiniteError(#[from] Rc<DefiniteError>),
67}
68
69#[derive(Debug, thiserror::Error)]
70pub enum TransientError {
71    #[error("stream ended prematurely")]
72    ReplicationEOF,
73    #[error(transparent)]
74    SqlServer(#[from] SqlServerError),
75    #[error("programming error: {0}")]
76    ProgrammingError(String),
77    #[error(transparent)]
78    Generic(#[from] anyhow::Error),
79}
80
81#[derive(Debug, Clone, thiserror::Error)]
82pub enum DefiniteError {
83    #[error("unable to decode: {0}")]
84    ValueDecodeError(String),
85    #[error("failed to decode row: {0}")]
86    Decoding(String),
87    #[error("programming error: {0}")]
88    ProgrammingError(String),
89}
90
91impl From<DefiniteError> for DataflowError {
92    fn from(val: DefiniteError) -> Self {
93        let msg = val.to_string().into();
94        DataflowError::SourceError(Box::new(SourceError {
95            error: SourceErrorDetails::Other(msg),
96        }))
97    }
98}
99
100impl SourceRender for SqlServerSource {
101    type Time = Lsn;
102
103    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::SqlServer;
104
105    fn render<G: Scope<Timestamp = Self::Time>>(
106        self,
107        scope: &mut G,
108        config: &RawSourceCreationConfig,
109        resume_uppers: impl futures::Stream<Item = Antichain<Self::Time>> + 'static,
110        _start_signal: impl Future<Output = ()> + 'static,
111    ) -> (
112        // Timely Collection for each Source Export defined in the provided `config`.
113        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
114        TimelyStream<G, Infallible>,
115        TimelyStream<G, HealthStatusMessage>,
116        TimelyStream<G, ProgressStatisticsUpdate>,
117        Option<TimelyStream<G, Probe<Self::Time>>>,
118        Vec<PressOnDropButton>,
119    ) {
120        // Collect the source outputs that we will be exporting.
121        let mut source_outputs = BTreeMap::new();
122        for (idx, (id, export)) in config.source_exports.iter().enumerate() {
123            let SourceExport {
124                details,
125                storage_metadata,
126                data_config: _,
127            } = export;
128
129            let details = match details {
130                SourceExportDetails::SqlServer(details) => details,
131                // This is an export that doesn't need any data output to it.
132                SourceExportDetails::None => continue,
133                other => unreachable!("unexpected source export details: {other:?}"),
134            };
135
136            let decoder = details
137                .table
138                .decoder(&storage_metadata.relation_desc)
139                .expect("TODO handle errors");
140            let upstream_desc = Arc::new(details.table.clone());
141            let resume_upper = config
142                .source_resume_uppers
143                .get(id)
144                .expect("missing resume upper")
145                .iter()
146                .map(Lsn::decode_row);
147
148            let output_info = SourceOutputInfo {
149                capture_instance: Arc::clone(&details.capture_instance),
150                upstream_desc,
151                decoder: Arc::new(decoder),
152                resume_upper: Antichain::from_iter(resume_upper),
153                partition_index: u64::cast_from(idx),
154            };
155            source_outputs.insert(*id, output_info);
156        }
157
158        let (repl_updates, uppers, repl_errs, snapshot_stats, repl_token) = replication::render(
159            scope.clone(),
160            config.clone(),
161            source_outputs.clone(),
162            self.clone(),
163        );
164
165        let (progress_stats, progress_errs, progress_token) = progress::render(
166            scope.clone(),
167            config.clone(),
168            self.connection.clone(),
169            source_outputs.clone(),
170            resume_uppers,
171        );
172
173        let partition_count = u64::cast_from(config.source_exports.len());
174        let data_streams: Vec<_> = repl_updates
175            .inner
176            .partition::<CapacityContainerBuilder<_>, _, _>(
177                partition_count,
178                move |((partition_idx, data), time, diff): &(
179                    (u64, Result<SourceMessage, DataflowError>),
180                    Lsn,
181                    Diff,
182                )| { (*partition_idx, (data.clone(), time.clone(), diff.clone())) },
183            );
184        let mut data_collections = BTreeMap::new();
185        for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
186            data_collections.insert(*id, data_stream.as_collection());
187        }
188
189        let health_init = std::iter::once(HealthStatusMessage {
190            id: None,
191            namespace: Self::STATUS_NAMESPACE,
192            update: HealthStatusUpdate::Running,
193        })
194        .to_stream(scope);
195        let health_errs = repl_errs.concat(&progress_errs).map(move |err| {
196            // This update will cause the dataflow to restart
197            let err_string = err.display_with_causes().to_string();
198            let update = HealthStatusUpdate::halting(err_string, None);
199            // TODO(sql_server2): If the error has anything to do with SSH
200            // connections we should use the SSH status namespace.
201            let namespace = Self::STATUS_NAMESPACE;
202
203            HealthStatusMessage {
204                id: None,
205                namespace: namespace.clone(),
206                update,
207            }
208        });
209        let health = health_init.concat(&health_errs);
210
211        let stats = snapshot_stats.concat(&progress_stats);
212
213        (
214            data_collections,
215            uppers,
216            health,
217            stats,
218            None,
219            vec![repl_token, progress_token],
220        )
221    }
222}