use std::collections::BTreeMap;
use std::future::Future;
use std::rc::Rc;
use std::sync::Arc;

use differential_dataflow::AsCollection;
use itertools::Itertools;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_repr::{Diff, GlobalId};
use mz_sql_server_util::SqlServerError;
use mz_sql_server_util::cdc::Lsn;
use mz_sql_server_util::desc::{SqlServerRowDecoder, SqlServerTableDesc};
use mz_storage_types::errors::{DataflowError, SourceError, SourceErrorDetails};
use mz_storage_types::sources::{
    SourceExport, SourceExportDetails, SourceTimestamp, SqlServerSourceConnection,
};
use mz_timely_util::builder_async::PressOnDropButton;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::operators::Concat;
use timely::dataflow::operators::core::Partition;
use timely::dataflow::operators::vec::{Map, ToStream};
use timely::dataflow::{Scope, StreamVec};
use timely::progress::Antichain;

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::source::RawSourceCreationConfig;
use crate::source::types::{Probe, SourceMessage, SourceRender, StackedCollection};

mod progress;
mod replication;
43
44#[derive(Debug, Clone)]
45struct SourceOutputInfo {
46 capture_instance: Arc<str>,
48 #[allow(dead_code)]
50 upstream_desc: Arc<SqlServerTableDesc>,
51 decoder: Arc<SqlServerRowDecoder>,
53 resume_upper: Antichain<Lsn>,
55 partition_index: u64,
57 initial_lsn: Lsn,
59}
60
61#[derive(Debug, Clone, thiserror::Error)]
62pub enum ReplicationError {
63 #[error(transparent)]
64 Transient(#[from] Rc<TransientError>),
65 #[error(transparent)]
66 DefiniteError(#[from] Rc<DefiniteError>),
67}
68
69#[derive(Debug, thiserror::Error)]
70pub enum TransientError {
71 #[error("stream ended prematurely")]
72 ReplicationEOF,
73 #[error(transparent)]
74 SqlServer(#[from] SqlServerError),
75 #[error(transparent)]
76 Generic(#[from] anyhow::Error),
77}
78
79#[derive(Debug, Clone, thiserror::Error)]
80pub enum DefiniteError {
81 #[error("unable to decode: {0}")]
82 ValueDecodeError(String),
83 #[error("failed to decode row: {0}")]
84 Decoding(String),
85 #[error("programming error: {0}")]
86 ProgrammingError(String),
87 #[error("Restore history id changed from {0:?} to {1:?}")]
88 RestoreHistoryChanged(Option<i32>, Option<i32>),
89 #[error("Incompatible schema change for table {0} capture instance {1}")]
90 IncompatibleSchemaChange(String, String),
91}
92
93impl From<DefiniteError> for DataflowError {
94 fn from(val: DefiniteError) -> Self {
95 let msg = val.to_string().into();
96 DataflowError::SourceError(Box::new(SourceError {
97 error: SourceErrorDetails::Other(msg),
98 }))
99 }
100}
101
102impl SourceRender for SqlServerSourceConnection {
103 type Time = Lsn;
104
105 const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::SqlServer;
106
107 fn render<G: Scope<Timestamp = Self::Time>>(
108 self,
109 scope: &mut G,
110 config: &RawSourceCreationConfig,
111 resume_uppers: impl futures::Stream<Item = Antichain<Self::Time>> + 'static,
112 _start_signal: impl Future<Output = ()> + 'static,
113 ) -> (
114 BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
116 StreamVec<G, HealthStatusMessage>,
117 StreamVec<G, Probe<Self::Time>>,
118 Vec<PressOnDropButton>,
119 ) {
120 let mut source_outputs = BTreeMap::new();
122 for (idx, (id, export)) in config.source_exports.iter().enumerate() {
123 let SourceExport {
124 details,
125 storage_metadata,
126 data_config: _,
127 } = export;
128
129 let details = match details {
130 SourceExportDetails::SqlServer(details) => details,
131 SourceExportDetails::None => continue,
133 other => unreachable!("unexpected source export details: {other:?}"),
134 };
135
136 let decoder = details
137 .table
138 .decoder(&storage_metadata.relation_desc)
139 .expect("TODO handle errors");
140 let upstream_desc = Arc::new(details.table.clone());
141 let resume_upper = config
142 .source_resume_uppers
143 .get(id)
144 .expect("missing resume upper")
145 .iter()
146 .map(Lsn::decode_row);
147
148 let output_info = SourceOutputInfo {
149 capture_instance: Arc::clone(&details.capture_instance),
150 upstream_desc,
151 decoder: Arc::new(decoder),
152 resume_upper: Antichain::from_iter(resume_upper),
153 partition_index: u64::cast_from(idx),
154 initial_lsn: details.initial_lsn,
155 };
156 source_outputs.insert(*id, output_info);
157 }
158
159 let metrics = config
160 .metrics
161 .get_sql_server_source_metrics(config.id, config.worker_id);
162
163 let (repl_updates, repl_errs, repl_token) = replication::render(
164 scope.clone(),
165 config.clone(),
166 source_outputs.clone(),
167 self.clone(),
168 metrics,
169 );
170
171 let (progress_errs, progress_probes, progress_token) = progress::render(
172 scope.clone(),
173 config.clone(),
174 self.connection.clone(),
175 source_outputs.clone(),
176 resume_uppers,
177 self.extras.clone(),
178 );
179
180 let partition_count = u64::cast_from(config.source_exports.len());
181 let data_streams: Vec<_> = repl_updates
182 .inner
183 .partition::<CapacityContainerBuilder<_>, _, _>(
184 partition_count,
185 move |((partition_idx, data), time, diff): &(
186 (u64, Result<SourceMessage, DataflowError>),
187 Lsn,
188 Diff,
189 )| { (*partition_idx, (data.clone(), time.clone(), diff.clone())) },
190 );
191 let mut data_collections = BTreeMap::new();
192 for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
193 data_collections.insert(*id, data_stream.as_collection());
194 }
195
196 let export_ids = config.source_exports.keys().copied();
197 let health_init = export_ids
198 .map(Some)
199 .chain(std::iter::once(None))
200 .map(|id| HealthStatusMessage {
201 id,
202 namespace: Self::STATUS_NAMESPACE,
203 update: HealthStatusUpdate::Running,
204 })
205 .collect::<Vec<_>>()
206 .to_stream(scope);
207
208 let health_errs = repl_errs.concat(progress_errs).map(move |err| {
209 let err_string = err.display_with_causes().to_string();
211 let update = HealthStatusUpdate::halting(err_string, None);
212 let namespace = Self::STATUS_NAMESPACE;
215
216 HealthStatusMessage {
217 id: None,
218 namespace: namespace.clone(),
219 update,
220 }
221 });
222 let health = health_init.concat(health_errs);
223
224 (
225 data_collections,
226 health,
227 progress_probes,
228 vec![repl_token, progress_token],
229 )
230 }
231}