1use std::collections::BTreeMap;
13use std::convert::Infallible;
14use std::future::Future;
15use std::rc::Rc;
16use std::sync::Arc;
17
18use differential_dataflow::AsCollection;
19use itertools::Itertools;
20use mz_ore::cast::CastFrom;
21use mz_ore::error::ErrorExt;
22use mz_repr::{Diff, GlobalId};
23use mz_sql_server_util::SqlServerError;
24use mz_sql_server_util::cdc::Lsn;
25use mz_sql_server_util::desc::{SqlServerRowDecoder, SqlServerTableDesc};
26use mz_storage_types::errors::{DataflowError, SourceError, SourceErrorDetails};
27use mz_storage_types::sources::{
28 SourceExport, SourceExportDetails, SourceTimestamp, SqlServerSource,
29};
30use mz_timely_util::builder_async::PressOnDropButton;
31use timely::container::CapacityContainerBuilder;
32use timely::dataflow::operators::core::Partition;
33use timely::dataflow::operators::{Concat, Map, ToStream};
34use timely::dataflow::{Scope, Stream as TimelyStream};
35use timely::progress::Antichain;
36
37use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
38use crate::source::RawSourceCreationConfig;
39use crate::source::types::{
40 Probe, ProgressStatisticsUpdate, SourceMessage, SourceRender, StackedCollection,
41};
42
43mod progress;
44mod replication;
45
/// Per-export metadata describing one output of a SQL Server source, shared
/// with the `replication` and `progress` sub-dataflows.
#[derive(Debug, Clone)]
struct SourceOutputInfo {
    // Name of the upstream CDC capture instance this output reads from.
    capture_instance: Arc<str>,
    // Description of the upstream table; currently unused at this layer but
    // kept for debugging/future use (hence the dead_code allowance).
    #[allow(dead_code)]
    upstream_desc: Arc<SqlServerTableDesc>,
    // Decodes upstream rows into this export's relation description.
    decoder: Arc<SqlServerRowDecoder>,
    // Frontier at which replication should resume for this export, decoded
    // from the persisted resume uppers in the source config.
    resume_upper: Antichain<Lsn>,
    // Index of this export within the partitioned replication stream; derived
    // from the iteration order of `config.source_exports` (see `render`).
    partition_index: u64,
    // LSN captured in the export details at creation time — presumably the
    // snapshot point; semantics live in the details producer, verify there.
    initial_lsn: Lsn,
}
62
/// Errors that replication can produce, split by whether a retry could ever
/// succeed.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ReplicationError {
    /// An error that may resolve on retry; wrapped in `Rc` so the enum stays
    /// `Clone` even though [`TransientError`] is not.
    #[error(transparent)]
    Transient(#[from] Rc<TransientError>),
    /// An error that is permanent for this source definition; retrying with
    /// the same configuration cannot help.
    #[error(transparent)]
    DefiniteError(#[from] Rc<DefiniteError>),
}
70
/// Errors that are expected to be resolvable by retrying, e.g. re-establishing
/// the upstream connection.
#[derive(Debug, thiserror::Error)]
pub enum TransientError {
    /// The upstream CDC/replication stream terminated before we reached its
    /// expected end.
    #[error("stream ended prematurely")]
    ReplicationEOF,
    /// Any error surfaced by the SQL Server client utilities.
    #[error(transparent)]
    SqlServer(#[from] SqlServerError),
    /// Catch-all for transient errors without a dedicated variant.
    #[error(transparent)]
    Generic(#[from] anyhow::Error),
}
80
/// Errors that are permanent for the current source definition and will not
/// go away on retry.
#[derive(Debug, Clone, thiserror::Error)]
pub enum DefiniteError {
    /// A value from upstream could not be decoded.
    #[error("unable to decode: {0}")]
    ValueDecodeError(String),
    /// An entire row from upstream could not be decoded.
    #[error("failed to decode row: {0}")]
    Decoding(String),
    /// An internal invariant was violated; indicates a bug rather than bad
    /// upstream data.
    #[error("programming error: {0}")]
    ProgrammingError(String),
    /// The upstream database's restore history id changed, meaning the
    /// database was restored out from under us (old id, new id).
    #[error("Restore history id changed from {0:?} to {1:?}")]
    RestoreHistoryChanged(Option<i32>, Option<i32>),
}
92
93impl From<DefiniteError> for DataflowError {
94 fn from(val: DefiniteError) -> Self {
95 let msg = val.to_string().into();
96 DataflowError::SourceError(Box::new(SourceError {
97 error: SourceErrorDetails::Other(msg),
98 }))
99 }
100}
101
impl SourceRender for SqlServerSource {
    // SQL Server sources are timestamped by the upstream log sequence number.
    type Time = Lsn;

    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::SqlServer;

    /// Renders the dataflow for a SQL Server source.
    ///
    /// Builds two sub-dataflows — `replication::render` (the actual data plus
    /// upper frontiers) and `progress::render` (statistics and probes) — then
    /// partitions the replicated updates into one collection per source export
    /// and merges the health and statistics streams from both sides.
    fn render<G: Scope<Timestamp = Self::Time>>(
        self,
        scope: &mut G,
        config: &RawSourceCreationConfig,
        resume_uppers: impl futures::Stream<Item = Antichain<Self::Time>> + 'static,
        _start_signal: impl Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
        TimelyStream<G, Infallible>,
        TimelyStream<G, HealthStatusMessage>,
        TimelyStream<G, ProgressStatisticsUpdate>,
        Option<TimelyStream<G, Probe<Self::Time>>>,
        Vec<PressOnDropButton>,
    ) {
        // Collect the per-export metadata (decoder, resume frontier, partition
        // index, ...) that both sub-dataflows need. NOTE: `partition_index` is
        // the enumeration index over `config.source_exports`; the `zip_eq`
        // below relies on iterating the same map in the same order.
        let mut source_outputs = BTreeMap::new();
        for (idx, (id, export)) in config.source_exports.iter().enumerate() {
            let SourceExport {
                details,
                storage_metadata,
                data_config: _,
            } = export;

            // Only SQL Server exports belong here; `None` (e.g. the source's
            // own progress export) is skipped, anything else is a planner bug.
            let details = match details {
                SourceExportDetails::SqlServer(details) => details,
                SourceExportDetails::None => continue,
                other => unreachable!("unexpected source export details: {other:?}"),
            };

            // TODO: surface decoder-construction failure as a definite error
            // instead of panicking the dataflow worker.
            let decoder = details
                .table
                .decoder(&storage_metadata.relation_desc)
                .expect("TODO handle errors");
            let upstream_desc = Arc::new(details.table.clone());
            // Decode the persisted resume upper rows back into `Lsn`s.
            let resume_upper = config
                .source_resume_uppers
                .get(id)
                .expect("missing resume upper")
                .iter()
                .map(Lsn::decode_row);

            let output_info = SourceOutputInfo {
                capture_instance: Arc::clone(&details.capture_instance),
                upstream_desc,
                decoder: Arc::new(decoder),
                resume_upper: Antichain::from_iter(resume_upper),
                partition_index: u64::cast_from(idx),
                initial_lsn: details.initial_lsn,
            };
            source_outputs.insert(*id, output_info);
        }

        // Data plane: replicated updates keyed by partition index, plus the
        // upper-frontier stream, error stream, snapshot statistics, and a
        // token that keeps the operator alive while held.
        let (repl_updates, uppers, repl_errs, snapshot_stats, repl_token) = replication::render(
            scope.clone(),
            config.clone(),
            source_outputs.clone(),
            self.clone(),
        );

        // Control plane: progress statistics and upstream probes, driven by
        // the `resume_uppers` stream fed back from the storage controller.
        let (progress_stats, progress_errs, progress_probes, progress_token) = progress::render(
            scope.clone(),
            config.clone(),
            self.connection.clone(),
            source_outputs.clone(),
            resume_uppers,
            self.extras.clone(),
        );

        // Fan the single replication stream out into one stream per export,
        // routing each update by the partition index embedded in its data.
        let partition_count = u64::cast_from(config.source_exports.len());
        let data_streams: Vec<_> = repl_updates
            .inner
            .partition::<CapacityContainerBuilder<_>, _, _>(
                partition_count,
                move |((partition_idx, data), time, diff): &(
                    (u64, Result<SourceMessage, DataflowError>),
                    Lsn,
                    Diff,
                )| { (*partition_idx, (data.clone(), time.clone(), diff.clone())) },
            );
        // Re-associate each partitioned stream with its export id. `zip_eq`
        // panics on length mismatch, which would indicate the partition count
        // and export map fell out of sync.
        let mut data_collections = BTreeMap::new();
        for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
            data_collections.insert(*id, data_stream.as_collection());
        }

        // Health: start with a single `Running` status, then report any error
        // from either sub-dataflow as a halting status.
        let health_init = std::iter::once(HealthStatusMessage {
            id: None,
            namespace: Self::STATUS_NAMESPACE,
            update: HealthStatusUpdate::Running,
        })
        .to_stream(scope);
        let health_errs = repl_errs.concat(&progress_errs).map(move |err| {
            // Include the full cause chain in the reported status message.
            let err_string = err.display_with_causes().to_string();
            let update = HealthStatusUpdate::halting(err_string, None);
            let namespace = Self::STATUS_NAMESPACE;

            HealthStatusMessage {
                id: None,
                namespace: namespace.clone(),
                update,
            }
        });
        let health = health_init.concat(&health_errs);

        // Statistics from the snapshot phase and ongoing progress tracking.
        let stats = snapshot_stats.concat(&progress_stats);

        (
            data_collections,
            uppers,
            health,
            stats,
            Some(progress_probes),
            vec![repl_token, progress_token],
        )
    }
}