1use std::collections::BTreeMap;
13use std::convert::Infallible;
14use std::future::Future;
15use std::rc::Rc;
16use std::sync::Arc;
17
18use differential_dataflow::AsCollection;
19use itertools::Itertools;
20use mz_ore::cast::CastFrom;
21use mz_ore::error::ErrorExt;
22use mz_repr::{Diff, GlobalId};
23use mz_sql_server_util::SqlServerError;
24use mz_sql_server_util::cdc::Lsn;
25use mz_sql_server_util::desc::{SqlServerRowDecoder, SqlServerTableDesc};
26use mz_storage_types::errors::{DataflowError, SourceError, SourceErrorDetails};
27use mz_storage_types::sources::{
28    SourceExport, SourceExportDetails, SourceTimestamp, SqlServerSourceConnection,
29};
30use mz_timely_util::builder_async::PressOnDropButton;
31use timely::container::CapacityContainerBuilder;
32use timely::dataflow::operators::core::Partition;
33use timely::dataflow::operators::{Concat, Map, ToStream};
34use timely::dataflow::{Scope, Stream as TimelyStream};
35use timely::progress::Antichain;
36
37use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
38use crate::source::RawSourceCreationConfig;
39use crate::source::types::{Probe, SourceMessage, SourceRender, StackedCollection};
40
41mod progress;
42mod replication;
43
/// Per-export state assembled by `render` and handed (cloned) to both the
/// replication and the progress operators.
#[derive(Debug, Clone)]
struct SourceOutputInfo {
    /// Capture instance for this output, copied from the export's details.
    capture_instance: Arc<str>,
    /// Description of the upstream table, cloned from the export's details.
    // Nothing in this file reads the field, hence the lint allowance.
    // NOTE(review): presumably kept for debugging or future use — confirm.
    #[allow(dead_code)]
    upstream_desc: Arc<SqlServerTableDesc>,
    /// Row decoder built from the upstream table description and the export's
    /// relation description.
    decoder: Arc<SqlServerRowDecoder>,
    /// Resume frontier of this output, decoded from the rows stored in
    /// `config.source_resume_uppers` for the export.
    resume_upper: Antichain<Lsn>,
    /// Position of the export within `config.source_exports`.
    // NOTE(review): presumably the index replication tags updates with so that
    // `render` can partition them per export — confirm in `replication`.
    partition_index: u64,
    /// Initial LSN copied from the export's details.
    initial_lsn: Lsn,
}
60
/// Cloneable error wrapper that carries either a [`TransientError`] or a
/// [`DefiniteError`] behind an `Rc`.
///
/// Both variants are `transparent`: their `Display` output and error source
/// are those of the wrapped error.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ReplicationError {
    /// A transient failure. `TransientError` does not derive `Clone`, so the
    /// `Rc` is what allows this enum to derive it.
    // NOTE(review): presumably these are retryable — confirm with the operators
    // that produce them.
    #[error(transparent)]
    Transient(#[from] Rc<TransientError>),
    /// A definite failure.
    // NOTE(review): presumably these are permanent for the affected output —
    // confirm with the operators that produce them.
    #[error(transparent)]
    DefiniteError(#[from] Rc<DefiniteError>),
}
68
/// Transient failure of the SQL Server source.
///
/// Convertible from [`SqlServerError`] and `anyhow::Error` via `From`, so both
/// can be propagated with `?`.
#[derive(Debug, thiserror::Error)]
pub enum TransientError {
    /// The stream ended prematurely.
    #[error("stream ended prematurely")]
    ReplicationEOF,
    /// An error reported by the SQL Server utility layer, shown as-is.
    #[error(transparent)]
    SqlServer(#[from] SqlServerError),
    /// Any other error, shown as-is.
    #[error(transparent)]
    Generic(#[from] anyhow::Error),
}
78
/// Definite failure of the SQL Server source.
///
/// Can be converted into a `DataflowError`, which carries this error's
/// `Display` string.
#[derive(Debug, Clone, thiserror::Error)]
pub enum DefiniteError {
    /// A value could not be decoded; the payload is the description.
    // NOTE(review): overlaps with `Decoding`; the distinction between the two
    // is not visible in this file.
    #[error("unable to decode: {0}")]
    ValueDecodeError(String),
    /// A row could not be decoded; the payload is the description.
    #[error("failed to decode row: {0}")]
    Decoding(String),
    /// An internal invariant was violated; the payload is the description.
    #[error("programming error: {0}")]
    ProgrammingError(String),
    /// The restore history id changed; the fields are the old and the new id.
    #[error("Restore history id changed from {0:?} to {1:?}")]
    RestoreHistoryChanged(Option<i32>, Option<i32>),
    /// An incompatible schema change was detected; the fields are the table
    /// and the capture instance, in that order.
    #[error("Incompatible schema change for table {0} capture instance {1}")]
    IncompatibleSchemaChange(String, String),
}
92
93impl From<DefiniteError> for DataflowError {
94    fn from(val: DefiniteError) -> Self {
95        let msg = val.to_string().into();
96        DataflowError::SourceError(Box::new(SourceError {
97            error: SourceErrorDetails::Other(msg),
98        }))
99    }
100}
101
102impl SourceRender for SqlServerSourceConnection {
103    type Time = Lsn;
104
105    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::SqlServer;
106
107    fn render<G: Scope<Timestamp = Self::Time>>(
108        self,
109        scope: &mut G,
110        config: &RawSourceCreationConfig,
111        resume_uppers: impl futures::Stream<Item = Antichain<Self::Time>> + 'static,
112        _start_signal: impl Future<Output = ()> + 'static,
113    ) -> (
114        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
116        TimelyStream<G, Infallible>,
117        TimelyStream<G, HealthStatusMessage>,
118        Option<TimelyStream<G, Probe<Self::Time>>>,
119        Vec<PressOnDropButton>,
120    ) {
121        let mut source_outputs = BTreeMap::new();
123        for (idx, (id, export)) in config.source_exports.iter().enumerate() {
124            let SourceExport {
125                details,
126                storage_metadata,
127                data_config: _,
128            } = export;
129
130            let details = match details {
131                SourceExportDetails::SqlServer(details) => details,
132                SourceExportDetails::None => continue,
134                other => unreachable!("unexpected source export details: {other:?}"),
135            };
136
137            let decoder = details
138                .table
139                .decoder(&storage_metadata.relation_desc)
140                .expect("TODO handle errors");
141            let upstream_desc = Arc::new(details.table.clone());
142            let resume_upper = config
143                .source_resume_uppers
144                .get(id)
145                .expect("missing resume upper")
146                .iter()
147                .map(Lsn::decode_row);
148
149            let output_info = SourceOutputInfo {
150                capture_instance: Arc::clone(&details.capture_instance),
151                upstream_desc,
152                decoder: Arc::new(decoder),
153                resume_upper: Antichain::from_iter(resume_upper),
154                partition_index: u64::cast_from(idx),
155                initial_lsn: details.initial_lsn,
156            };
157            source_outputs.insert(*id, output_info);
158        }
159
160        let (repl_updates, uppers, repl_errs, repl_token) = replication::render(
161            scope.clone(),
162            config.clone(),
163            source_outputs.clone(),
164            self.clone(),
165        );
166
167        let (progress_errs, progress_probes, progress_token) = progress::render(
168            scope.clone(),
169            config.clone(),
170            self.connection.clone(),
171            source_outputs.clone(),
172            resume_uppers,
173            self.extras.clone(),
174        );
175
176        let partition_count = u64::cast_from(config.source_exports.len());
177        let data_streams: Vec<_> = repl_updates
178            .inner
179            .partition::<CapacityContainerBuilder<_>, _, _>(
180                partition_count,
181                move |((partition_idx, data), time, diff): &(
182                    (u64, Result<SourceMessage, DataflowError>),
183                    Lsn,
184                    Diff,
185                )| { (*partition_idx, (data.clone(), time.clone(), diff.clone())) },
186            );
187        let mut data_collections = BTreeMap::new();
188        for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
189            data_collections.insert(*id, data_stream.as_collection());
190        }
191
192        let export_ids = config.source_exports.keys().copied();
193        let health_init = export_ids
194            .map(Some)
195            .chain(std::iter::once(None))
196            .map(|id| HealthStatusMessage {
197                id,
198                namespace: Self::STATUS_NAMESPACE,
199                update: HealthStatusUpdate::Running,
200            })
201            .collect::<Vec<_>>()
202            .to_stream(scope);
203
204        let health_errs = repl_errs.concat(&progress_errs).map(move |err| {
205            let err_string = err.display_with_causes().to_string();
207            let update = HealthStatusUpdate::halting(err_string, None);
208            let namespace = Self::STATUS_NAMESPACE;
211
212            HealthStatusMessage {
213                id: None,
214                namespace: namespace.clone(),
215                update,
216            }
217        });
218        let health = health_init.concat(&health_errs);
219
220        (
221            data_collections,
222            uppers,
223            health,
224            Some(progress_probes),
225            vec![repl_token, progress_token],
226        )
227    }
228}