1use std::collections::BTreeMap;
54use std::fmt;
55use std::io;
56use std::rc::Rc;
57
58use differential_dataflow::AsCollection;
59use itertools::Itertools;
60use mz_mysql_util::quote_identifier;
61use mz_ore::cast::CastFrom;
62use mz_repr::Diff;
63use mz_repr::GlobalId;
64use mz_storage_types::errors::{DataflowError, SourceError};
65use mz_storage_types::sources::SourceExport;
66use mz_timely_util::containers::stack::FueledBuilder;
67use serde::{Deserialize, Serialize};
68use timely::container::CapacityContainerBuilder;
69use timely::dataflow::operators::core::Partition;
70use timely::dataflow::operators::vec::{Map, ToStream};
71use timely::dataflow::operators::{CapabilitySet, Concat};
72use timely::dataflow::{Scope, StreamVec};
73use timely::progress::Antichain;
74use uuid::Uuid;
75
76use mz_mysql_util::{
77 MySqlError, MySqlTableDesc, ensure_full_row_binlog_format, ensure_gtid_consistency,
78 ensure_replication_commit_order,
79};
80use mz_ore::error::ErrorExt;
81use mz_storage_types::errors::SourceErrorDetails;
82use mz_storage_types::sources::mysql::{GtidPartition, GtidState, gtid_set_frontier};
83use mz_storage_types::sources::{MySqlSourceConnection, SourceExportDetails, SourceTimestamp};
84use mz_timely_util::builder_async::{AsyncOutputHandle, PressOnDropButton};
85use mz_timely_util::order::Extrema;
86
87use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
88use crate::source::types::Probe;
89use crate::source::types::{FuelSize, SourceRender, StackedCollection};
90use crate::source::{RawSourceCreationConfig, SourceMessage};
91
92mod replication;
93mod schemas;
94mod snapshot;
95mod statistics;
96
97impl SourceRender for MySqlSourceConnection {
98 type Time = GtidPartition;
99
100 const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::MySql;
101
102 fn render<'scope>(
105 self,
106 scope: Scope<'scope, GtidPartition>,
107 config: &RawSourceCreationConfig,
108 resume_uppers: impl futures::Stream<Item = Antichain<GtidPartition>> + 'static,
109 _start_signal: impl std::future::Future<Output = ()> + 'static,
110 ) -> (
111 BTreeMap<
112 GlobalId,
113 StackedCollection<'scope, GtidPartition, Result<SourceMessage, DataflowError>>,
114 >,
115 StreamVec<'scope, GtidPartition, HealthStatusMessage>,
116 StreamVec<'scope, GtidPartition, Probe<GtidPartition>>,
117 Vec<PressOnDropButton>,
118 ) {
119 let mut source_outputs = Vec::new();
121 for (idx, (id, export)) in config.source_exports.iter().enumerate() {
122 let SourceExport {
123 details,
124 storage_metadata: _,
125 data_config: _,
126 } = export;
127 let details = match details {
128 SourceExportDetails::MySql(details) => details,
129 SourceExportDetails::None => continue,
131 _ => panic!("unexpected source export details: {:?}", details),
132 };
133
134 let desc = details.table.clone();
135 let initial_gtid_set = details.initial_gtid_set.to_string();
136 let resume_upper = Antichain::from_iter(
137 config
138 .source_resume_uppers
139 .get(id)
140 .expect("missing resume upper")
141 .iter()
142 .map(GtidPartition::decode_row),
143 );
144 let name = MySqlTableName::new(&desc.schema_name, &desc.name);
145 source_outputs.push(SourceOutputInfo {
146 output_index: idx,
147 table_name: name.clone(),
148 desc,
149 text_columns: details.text_columns.clone(),
150 exclude_columns: details.exclude_columns.clone(),
151 initial_gtid_set: gtid_set_frontier(&initial_gtid_set).expect("invalid gtid set"),
152 resume_upper,
153 export_id: id.clone(),
154 });
155 }
156
157 let metrics = config.metrics.get_mysql_source_metrics(config.id);
158
159 let (snapshot_updates, rewinds, snapshot_err, snapshot_token) = snapshot::render(
160 scope.clone(),
161 config.clone(),
162 self.clone(),
163 source_outputs.clone(),
164 metrics.snapshot_metrics.clone(),
165 );
166
167 let (repl_updates, repl_err, repl_token) = replication::render(
168 scope.clone(),
169 config.clone(),
170 self.clone(),
171 source_outputs,
172 rewinds,
173 metrics,
174 );
175
176 let (stats_err, probe_stream, stats_token) = statistics::render(
177 scope.clone(),
178 config.clone(),
179 self,
180 resume_uppers,
181 snapshot_err.clone().concat(repl_err.clone()),
182 );
183
184 let updates = snapshot_updates.concat(repl_updates);
185 let partition_count = u64::cast_from(config.source_exports.len());
186 let data_streams: Vec<_> = updates
187 .inner
188 .partition::<CapacityContainerBuilder<_>, _, _>(
189 partition_count,
190 |((output, data), time, diff): (
191 (usize, Result<SourceMessage, DataflowError>),
192 _,
193 Diff,
194 )| {
195 let output = u64::cast_from(output);
196 (output, (data, time, diff))
197 },
198 );
199 let mut data_collections = BTreeMap::new();
200 for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
201 data_collections.insert(*id, data_stream.as_collection());
202 }
203
204 let export_ids = config.source_exports.keys().copied();
205 let health_init = export_ids
206 .map(Some)
207 .chain(std::iter::once(None))
208 .map(|id| HealthStatusMessage {
209 id,
210 namespace: Self::STATUS_NAMESPACE,
211 update: HealthStatusUpdate::Running,
212 })
213 .collect::<Vec<_>>()
214 .to_stream(scope);
215
216 let health_errs = snapshot_err
217 .concat(repl_err)
218 .concat(stats_err)
219 .map(move |err| {
220 let err_string = err.display_with_causes().to_string();
222 let update = HealthStatusUpdate::halting(err_string.clone(), None);
223
224 let namespace = match err {
225 ReplicationError::Transient(err)
226 if matches!(&*err, TransientError::MySqlError(MySqlError::Ssh(_))) =>
227 {
228 StatusNamespace::Ssh
229 }
230 _ => Self::STATUS_NAMESPACE,
231 };
232
233 HealthStatusMessage {
234 id: None,
235 namespace: namespace.clone(),
236 update,
237 }
238 });
239 let health = health_init.concat(health_errs);
240
241 (
242 data_collections,
243 health,
244 probe_stream,
245 vec![snapshot_token, repl_token, stats_token],
246 )
247 }
248}
249
/// Per-export information handed to the snapshot and replication operators.
#[derive(Clone, Debug)]
struct SourceOutputInfo {
    /// Position of this export within `config.source_exports`; used to route
    /// updates to the matching partitioned output stream.
    output_index: usize,
    /// Fully-qualified (schema, table) name of the upstream table.
    table_name: MySqlTableName,
    /// Description of the upstream table's columns and keys.
    desc: MySqlTableDesc,
    /// Columns to decode as text rather than their native type.
    text_columns: Vec<String>,
    /// Columns to omit from the output.
    exclude_columns: Vec<String>,
    /// GTID frontier captured when this export was created.
    initial_gtid_set: Antichain<GtidPartition>,
    /// Frontier from which ingestion should resume for this export.
    resume_upper: Antichain<GtidPartition>,
    /// The `GlobalId` of this export.
    export_id: GlobalId,
}
261
/// An error during replication, partitioned by whether it is expected to be
/// transient (retryable) or definite (deterministically recurring).
#[derive(Clone, Debug, thiserror::Error)]
pub enum ReplicationError {
    /// An error that may resolve on retry (e.g. connectivity problems).
    #[error(transparent)]
    Transient(#[from] Rc<TransientError>),
    /// An error that will recur deterministically and is reported in the
    /// output as a `DataflowError`.
    #[error(transparent)]
    Definite(#[from] Rc<DefiniteError>),
}
269
/// Errors that are potentially recoverable by retrying (e.g. connection or
/// protocol-level failures), as opposed to [`DefiniteError`]s.
#[derive(Debug, thiserror::Error)]
pub enum TransientError {
    #[error("couldn't decode binlog row")]
    BinlogRowDecodeError(#[from] mysql_async::binlog::row::BinlogRowToRowError),
    #[error("stream ended prematurely")]
    ReplicationEOF,
    #[error(transparent)]
    IoError(#[from] io::Error),
    #[error("sql client error")]
    SQLClient(#[from] mysql_async::Error),
    #[error("ident decode error")]
    IdentError(#[from] mz_sql_parser::ast::IdentError),
    #[error(transparent)]
    MySqlError(#[from] MySqlError),
    /// Catch-all for transient failures without a more specific variant.
    #[error(transparent)]
    Generic(#[from] anyhow::Error),
}
288
/// Errors that will deterministically recur if retried, and must therefore be
/// surfaced to the user (via [`DataflowError`]) rather than retried.
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
pub enum DefiniteError {
    #[error("unable to decode: {0}")]
    ValueDecodeError(String),
    #[error("table was truncated: {0}")]
    TableTruncated(String),
    #[error("table was dropped: {0}")]
    TableDropped(String),
    #[error("incompatible schema change: {0}")]
    IncompatibleSchema(String),
    #[error("received a gtid set from the server that violates our requirements: {0}")]
    UnsupportedGtidState(String),
    #[error("received out of order gtids for source {0} at transaction-id {1}")]
    BinlogGtidMonotonicityViolation(String, GtidState),
    #[error("mysql server does not have the binlog available at the requested gtid set")]
    BinlogNotAvailable,
    #[error("mysql server binlog frontier at {0} is beyond required frontier {1}")]
    BinlogMissingResumePoint(String, String),
    #[error("mysql server configuration: {0}")]
    ServerConfigurationError(String),
}
311
312impl From<DefiniteError> for DataflowError {
313 fn from(err: DefiniteError) -> Self {
314 let m = err.to_string().into();
315 DataflowError::SourceError(Box::new(SourceError {
316 error: match &err {
317 DefiniteError::ValueDecodeError(_) => SourceErrorDetails::Other(m),
318 DefiniteError::TableTruncated(_) => SourceErrorDetails::Other(m),
319 DefiniteError::TableDropped(_) => SourceErrorDetails::Other(m),
320 DefiniteError::IncompatibleSchema(_) => SourceErrorDetails::Other(m),
321 DefiniteError::UnsupportedGtidState(_) => SourceErrorDetails::Other(m),
322 DefiniteError::BinlogGtidMonotonicityViolation(_, _) => {
323 SourceErrorDetails::Other(m)
324 }
325 DefiniteError::BinlogNotAvailable => SourceErrorDetails::Initialization(m),
326 DefiniteError::BinlogMissingResumePoint(_, _) => {
327 SourceErrorDetails::Initialization(m)
328 }
329 DefiniteError::ServerConfigurationError(_) => SourceErrorDetails::Initialization(m),
330 },
331 }))
332 }
333}
334
335#[derive(
339 Debug,
340 Clone,
341 PartialEq,
342 Eq,
343 PartialOrd,
344 Ord,
345 Serialize,
346 Deserialize,
347 Hash
348)]
349pub(crate) struct MySqlTableName(pub(crate) String, pub(crate) String);
350
351impl MySqlTableName {
352 pub(crate) fn new(schema_name: &str, table_name: &str) -> Self {
353 Self(schema_name.to_string(), table_name.to_string())
354 }
355}
356
357impl fmt::Display for MySqlTableName {
358 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
359 write!(
360 f,
361 "{}.{}",
362 quote_identifier(&self.0),
363 quote_identifier(&self.1)
364 )
365 }
366}
367
368impl From<&MySqlTableDesc> for MySqlTableName {
369 fn from(desc: &MySqlTableDesc) -> Self {
370 Self::new(&desc.schema_name, &desc.name)
371 }
372}
373
/// A request to rewind an output to reconcile snapshot and replication data.
/// NOTE(review): original doc comments appear to have been stripped here;
/// confirm intent against the `snapshot`/`replication` modules.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub(crate) struct RewindRequest {
    /// The output index this rewind applies to.
    pub(crate) output_index: usize,
    /// The GTID frontier at which the snapshot was taken.
    pub(crate) snapshot_upper: Antichain<GtidPartition>,
}
382
/// Shorthand for an async output handle that emits `(data, time, diff)`
/// updates through a fueled, capacity-managed container builder.
type StackedAsyncOutputHandle<T, D> =
    AsyncOutputHandle<T, FueledBuilder<CapacityContainerBuilder<Vec<(D, T, Diff)>>>>;
385
386async fn return_definite_error(
387 err: DefiniteError,
388 outputs: &[usize],
389 data_handle: &StackedAsyncOutputHandle<
390 GtidPartition,
391 (usize, Result<SourceMessage, DataflowError>),
392 >,
393 data_cap_set: &CapabilitySet<GtidPartition>,
394 definite_error_handle: &AsyncOutputHandle<
395 GtidPartition,
396 CapacityContainerBuilder<Vec<ReplicationError>>,
397 >,
398 definite_error_cap_set: &CapabilitySet<GtidPartition>,
399) {
400 tracing::warn!("Returning definite error: {err}");
401 for output_index in outputs {
402 let update = (
403 (*output_index, Err(err.clone().into())),
404 GtidPartition::new_range(Uuid::minimum(), Uuid::maximum(), GtidState::MAX),
405 Diff::ONE,
406 );
407 let size = update.fuel_size();
408 data_handle
409 .give_fueled(&data_cap_set[0], update, size)
410 .await;
411 }
412 definite_error_handle.give(
413 &definite_error_cap_set[0],
414 ReplicationError::Definite(Rc::new(err)),
415 );
416 ()
417}
418
/// Verifies that the MySQL server is configured compatibly for replication:
/// GTID consistency, full-row binlog format, and replication commit order.
/// Returns the first configuration error encountered, if any.
async fn validate_mysql_repl_settings(conn: &mut mysql_async::Conn) -> Result<(), MySqlError> {
    ensure_gtid_consistency(conn).await?;
    ensure_full_row_binlog_format(conn).await?;
    ensure_replication_commit_order(conn).await?;

    Ok(())
}