use std::collections::BTreeMap;
use std::convert::Infallible;
use std::fmt;
use std::io;
use std::rc::Rc;

use differential_dataflow::AsCollection;
use differential_dataflow::containers::TimelyStack;
use itertools::Itertools;
use mz_mysql_util::quote_identifier;
use mz_ore::cast::CastFrom;
use mz_repr::Diff;
use mz_repr::GlobalId;
use mz_storage_types::errors::{DataflowError, SourceError};
use mz_storage_types::sources::SourceExport;
use mz_timely_util::containers::stack::AccountedStackBuilder;
use serde::{Deserialize, Serialize};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pushers::Tee;
use timely::dataflow::operators::core::Partition;
use timely::dataflow::operators::{CapabilitySet, Concat, Map, ToStream};
use timely::dataflow::{Scope, Stream};
use timely::progress::Antichain;
use uuid::Uuid;

use mz_mysql_util::{
    MySqlError, MySqlTableDesc, ensure_full_row_binlog_format, ensure_gtid_consistency,
    ensure_replication_commit_order,
};
use mz_ore::error::ErrorExt;
use mz_storage_types::errors::SourceErrorDetails;
use mz_storage_types::sources::mysql::{GtidPartition, GtidState, gtid_set_frontier};
use mz_storage_types::sources::{MySqlSourceConnection, SourceExportDetails, SourceTimestamp};
use mz_timely_util::builder_async::{AsyncOutputHandle, PressOnDropButton};
use mz_timely_util::order::Extrema;

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::source::types::Probe;
use crate::source::types::{ProgressStatisticsUpdate, SourceRender, StackedCollection};
use crate::source::{RawSourceCreationConfig, SourceMessage};

mod replication;
mod schemas;
mod snapshot;
mod statistics;

impl SourceRender for MySqlSourceConnection {
    type Time = GtidPartition;

    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::MySql;

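    /// Renders the MySQL ingestion dataflow: connects the snapshot, replication, and
    /// statistics operators together and splits their output by source export.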
    fn render<G: Scope<Timestamp = GtidPartition>>(
        self,
        scope: &mut G,
        config: &RawSourceCreationConfig,
        resume_uppers: impl futures::Stream<Item = Antichain<GtidPartition>> + 'static,
        _start_signal: impl std::future::Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
        Stream<G, Infallible>,
        Stream<G, HealthStatusMessage>,
        Stream<G, ProgressStatisticsUpdate>,
        Option<Stream<G, Probe<GtidPartition>>>,
        Vec<PressOnDropButton>,
    ) {
        let mut source_outputs = Vec::new();
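        // Collect the table description and resume state for each source export we
        // are asked to render.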
        for (idx, (id, export)) in config.source_exports.iter().enumerate() {
            let SourceExport {
                details,
                storage_metadata: _,
                data_config: _,
            } = export;
            let details = match details {
                SourceExportDetails::MySql(details) => details,
                SourceExportDetails::None => continue,
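                // Any other kind of details is unexpected for a MySQL source.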
                _ => panic!("unexpected source export details: {:?}", details),
            };

            let desc = details.table.clone();
            let initial_gtid_set = details.initial_gtid_set.to_string();
            let resume_upper = Antichain::from_iter(
                config
                    .source_resume_uppers
                    .get(id)
                    .expect("missing resume upper")
                    .iter()
                    .map(GtidPartition::decode_row),
            );
            let name = MySqlTableName::new(&desc.schema_name, &desc.name);
            source_outputs.push(SourceOutputInfo {
                output_index: idx,
                table_name: name.clone(),
                desc,
                text_columns: details.text_columns.clone(),
                exclude_columns: details.exclude_columns.clone(),
                initial_gtid_set: gtid_set_frontier(&initial_gtid_set).expect("invalid gtid set"),
                resume_upper,
            });
        }

        let metrics = config.metrics.get_mysql_source_metrics(config.id);

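        // Render the snapshot operator, which performs the initial snapshot of each
        // exported table.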
        let (snapshot_updates, rewinds, snapshot_stats, snapshot_err, snapshot_token) =
            snapshot::render(
                scope.clone(),
                config.clone(),
                self.clone(),
                source_outputs.clone(),
                metrics.snapshot_metrics.clone(),
            );

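        // Render the replication operator, which tails the binlog and consumes the
        // rewind requests produced by the snapshot operator.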
        let (repl_updates, uppers, repl_err, repl_token) = replication::render(
            scope.clone(),
            config.clone(),
            self.clone(),
            source_outputs,
            &rewinds,
            metrics,
        );

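        // Render the statistics operator, which reports progress statistics and emits
        // probes of the upstream frontier.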
        let (stats_stream, stats_err, probe_stream, stats_token) =
            statistics::render(scope.clone(), config.clone(), self, resume_uppers);

        let stats_stream = stats_stream.concat(&snapshot_stats);

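        // Partition the combined snapshot and replication updates into one stream per
        // source export, keyed by the export's output index.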
        let updates = snapshot_updates.concat(&repl_updates);
        let partition_count = u64::cast_from(config.source_exports.len());
        let data_streams: Vec<_> = updates
            .inner
            .partition::<CapacityContainerBuilder<_>, _, _>(
                partition_count,
                |((output, data), time, diff): &(
                    (usize, Result<SourceMessage, DataflowError>),
                    _,
                    Diff,
                )| {
                    let output = u64::cast_from(*output);
                    (output, (data.clone(), time.clone(), diff.clone()))
                },
            );
        let mut data_collections = BTreeMap::new();
        for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
            data_collections.insert(*id, data_stream.as_collection());
        }

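        // Assemble the health stream: start in the `Running` state and transition to a
        // halting state on any error from the snapshot, replication, or statistics
        // operators.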
        let health_init = std::iter::once(HealthStatusMessage {
            id: None,
            namespace: Self::STATUS_NAMESPACE,
            update: HealthStatusUpdate::Running,
        })
        .to_stream(scope);

        let health_errs = snapshot_err
            .concat(&repl_err)
            .concat(&stats_err)
            .map(move |err| {
                let err_string = err.display_with_causes().to_string();
                let update = HealthStatusUpdate::halting(err_string.clone(), None);

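                // Report SSH tunnel failures under the SSH status namespace so they
                // are distinguishable from MySQL errors.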
                let namespace = match err {
                    ReplicationError::Transient(err)
                        if matches!(&*err, TransientError::MySqlError(MySqlError::Ssh(_))) =>
                    {
                        StatusNamespace::Ssh
                    }
                    _ => Self::STATUS_NAMESPACE,
                };

                HealthStatusMessage {
                    id: None,
                    namespace: namespace.clone(),
                    update,
                }
            });
        let health = health_init.concat(&health_errs);

        (
            data_collections,
            uppers,
            health,
            stats_stream,
            Some(probe_stream),
            vec![snapshot_token, repl_token, stats_token],
        )
    }
}

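/// Information about a single source export (one exported MySQL table) needed by the
/// snapshot and replication operators.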
#[derive(Clone, Debug)]
struct SourceOutputInfo {
    output_index: usize,
    table_name: MySqlTableName,
    desc: MySqlTableDesc,
    text_columns: Vec<String>,
    exclude_columns: Vec<String>,
    initial_gtid_set: Antichain<GtidPartition>,
    resume_upper: Antichain<GtidPartition>,
}

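/// An error that occurred in the ingestion dataflow, classified by whether it is
/// transient (possibly resolved by retrying) or definite (permanent for the affected
/// collections).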
#[derive(Clone, Debug, thiserror::Error)]
pub enum ReplicationError {
    #[error(transparent)]
    Transient(#[from] Rc<TransientError>),
    #[error(transparent)]
    Definite(#[from] Rc<DefiniteError>),
}

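/// A transient error that never ends up in the data collection of a specific table.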
#[derive(Debug, thiserror::Error)]
pub enum TransientError {
    #[error("couldn't decode binlog row")]
    BinlogRowDecodeError(#[from] mysql_async::binlog::row::BinlogRowToRowError),
    #[error("stream ended prematurely")]
    ReplicationEOF,
    #[error(transparent)]
    IoError(#[from] io::Error),
    #[error("sql client error")]
    SQLClient(#[from] mysql_async::Error),
    #[error("ident decode error")]
    IdentError(#[from] mz_sql_parser::ast::IdentError),
    #[error(transparent)]
    MySqlError(#[from] MySqlError),
    #[error(transparent)]
    Generic(#[from] anyhow::Error),
}

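/// A definite error that ends up in the data collection of a specific table.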
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
pub enum DefiniteError {
    #[error("unable to decode: {0}")]
    ValueDecodeError(String),
    #[error("table was truncated: {0}")]
    TableTruncated(String),
    #[error("table was dropped: {0}")]
    TableDropped(String),
    #[error("incompatible schema change: {0}")]
    IncompatibleSchema(String),
    #[error("received a gtid set from the server that violates our requirements: {0}")]
    UnsupportedGtidState(String),
    #[error("received out of order gtids for source {0} at transaction-id {1}")]
    BinlogGtidMonotonicityViolation(String, GtidState),
    #[error("mysql server does not have the binlog available at the requested gtid set")]
    BinlogNotAvailable,
    #[error("mysql server binlog frontier at {0} is beyond required frontier {1}")]
    BinlogMissingResumePoint(String, String),
    #[error("mysql server configuration: {0}")]
    ServerConfigurationError(String),
}

impl From<DefiniteError> for DataflowError {
    fn from(err: DefiniteError) -> Self {
        let m = err.to_string().into();
        DataflowError::SourceError(Box::new(SourceError {
            error: match &err {
                DefiniteError::ValueDecodeError(_) => SourceErrorDetails::Other(m),
                DefiniteError::TableTruncated(_) => SourceErrorDetails::Other(m),
                DefiniteError::TableDropped(_) => SourceErrorDetails::Other(m),
                DefiniteError::IncompatibleSchema(_) => SourceErrorDetails::Other(m),
                DefiniteError::UnsupportedGtidState(_) => SourceErrorDetails::Other(m),
                DefiniteError::BinlogGtidMonotonicityViolation(_, _) => {
                    SourceErrorDetails::Other(m)
                }
                DefiniteError::BinlogNotAvailable => SourceErrorDetails::Initialization(m),
                DefiniteError::BinlogMissingResumePoint(_, _) => {
                    SourceErrorDetails::Initialization(m)
                }
                DefiniteError::ServerConfigurationError(_) => SourceErrorDetails::Initialization(m),
            },
        }))
    }
}

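/// A fully qualified MySQL table name: `(schema name, table name)`. Its `Display`
/// implementation renders both parts with MySQL identifier quoting, suitable for
/// interpolation into queries.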
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Hash)]
pub(crate) struct MySqlTableName(pub(crate) String, pub(crate) String);

impl MySqlTableName {
    pub(crate) fn new(schema_name: &str, table_name: &str) -> Self {
        Self(schema_name.to_string(), table_name.to_string())
    }
}

impl fmt::Display for MySqlTableName {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "{}.{}",
            quote_identifier(&self.0),
            quote_identifier(&self.1)
        )
    }
}

impl From<&MySqlTableDesc> for MySqlTableName {
    fn from(desc: &MySqlTableDesc) -> Self {
        Self::new(&desc.schema_name, &desc.name)
    }
}

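/// A request to rewind the updates of a given output to the initial snapshot
/// timestamp, given the frontier at which its snapshot was taken.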
#[derive(Debug, Clone, Deserialize, Serialize)]
pub(crate) struct RewindRequest {
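    /// The output index that should be rewound.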
    pub(crate) output_index: usize,
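    /// The upper frontier of the snapshot taken for this output.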
    pub(crate) snapshot_upper: Antichain<GtidPartition>,
}

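/// A convenient alias for the async output handles used by the operators below, which
/// emit stacked `(data, time, diff)` updates.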
type StackedAsyncOutputHandle<T, D> = AsyncOutputHandle<
    T,
    AccountedStackBuilder<CapacityContainerBuilder<TimelyStack<(D, T, Diff)>>>,
    Tee<T, TimelyStack<(D, T, Diff)>>,
>;

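/// Emits the given definite error to the data output of every listed output index at
/// the maximum GTID timestamp, and to the dedicated error output, so that downstream
/// consumers observe the failure.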
async fn return_definite_error(
    err: DefiniteError,
    outputs: &[usize],
    data_handle: &StackedAsyncOutputHandle<
        GtidPartition,
        (usize, Result<SourceMessage, DataflowError>),
    >,
    data_cap_set: &CapabilitySet<GtidPartition>,
    definite_error_handle: &AsyncOutputHandle<
        GtidPartition,
        CapacityContainerBuilder<Vec<ReplicationError>>,
        Tee<GtidPartition, Vec<ReplicationError>>,
    >,
    definite_error_cap_set: &CapabilitySet<GtidPartition>,
) {
    for output_index in outputs {
        let update = (
            (*output_index, Err(err.clone().into())),
            GtidPartition::new_range(Uuid::minimum(), Uuid::maximum(), GtidState::MAX),
            Diff::ONE,
        );
        data_handle.give_fueled(&data_cap_set[0], update).await;
    }
    definite_error_handle.give(
        &definite_error_cap_set[0],
        ReplicationError::Definite(Rc::new(err)),
    );
401}
402
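/// Validates that the upstream MySQL server is configured to allow consistent
/// replication: GTID consistency, full row binlog format, and preserved replication
/// commit order.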
async fn validate_mysql_repl_settings(conn: &mut mysql_async::Conn) -> Result<(), MySqlError> {
    ensure_gtid_consistency(conn).await?;
    ensure_full_row_binlog_format(conn).await?;
    ensure_replication_commit_order(conn).await?;

    Ok(())
}