1use std::collections::BTreeMap;
54use std::fmt;
55use std::io;
56use std::rc::Rc;
57
58use differential_dataflow::AsCollection;
59use differential_dataflow::containers::TimelyStack;
60use itertools::Itertools;
61use mz_mysql_util::quote_identifier;
62use mz_ore::cast::CastFrom;
63use mz_repr::Diff;
64use mz_repr::GlobalId;
65use mz_storage_types::errors::{DataflowError, SourceError};
66use mz_storage_types::sources::SourceExport;
67use mz_timely_util::containers::stack::AccountedStackBuilder;
68use serde::{Deserialize, Serialize};
69use timely::container::CapacityContainerBuilder;
70use timely::dataflow::operators::core::Partition;
71use timely::dataflow::operators::vec::{Map, ToStream};
72use timely::dataflow::operators::{CapabilitySet, Concat};
73use timely::dataflow::{Scope, StreamVec};
74use timely::progress::Antichain;
75use uuid::Uuid;
76
77use mz_mysql_util::{
78 MySqlError, MySqlTableDesc, ensure_full_row_binlog_format, ensure_gtid_consistency,
79 ensure_replication_commit_order,
80};
81use mz_ore::error::ErrorExt;
82use mz_storage_types::errors::SourceErrorDetails;
83use mz_storage_types::sources::mysql::{GtidPartition, GtidState, gtid_set_frontier};
84use mz_storage_types::sources::{MySqlSourceConnection, SourceExportDetails, SourceTimestamp};
85use mz_timely_util::builder_async::{AsyncOutputHandle, PressOnDropButton};
86use mz_timely_util::order::Extrema;
87
88use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
89use crate::source::types::Probe;
90use crate::source::types::{SourceRender, StackedCollection};
91use crate::source::{RawSourceCreationConfig, SourceMessage};
92
93mod replication;
94mod schemas;
95mod snapshot;
96mod statistics;
97
impl SourceRender for MySqlSourceConnection {
    type Time = GtidPartition;

    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::MySql;

    /// Renders the dataflow for a MySQL source: a snapshot operator, a
    /// replication operator, and a statistics operator, wired together into
    /// per-export data collections plus health and probe streams.
    ///
    /// Returns, in order: a map from export id to its data collection, the
    /// health status stream, the probe stream, and the drop tokens that keep
    /// the three operators alive.
    fn render<G: Scope<Timestamp = GtidPartition>>(
        self,
        scope: &mut G,
        config: &RawSourceCreationConfig,
        resume_uppers: impl futures::Stream<Item = Antichain<GtidPartition>> + 'static,
        _start_signal: impl std::future::Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
        StreamVec<G, HealthStatusMessage>,
        StreamVec<G, Probe<GtidPartition>>,
        Vec<PressOnDropButton>,
    ) {
        // Gather per-export metadata (table description, column options, and
        // the initial/resume GTID frontiers) for every MySQL export.
        let mut source_outputs = Vec::new();
        for (idx, (id, export)) in config.source_exports.iter().enumerate() {
            let SourceExport {
                details,
                storage_metadata: _,
                data_config: _,
            } = export;
            let details = match details {
                SourceExportDetails::MySql(details) => details,
                // Exports with no details produce no data and are skipped.
                SourceExportDetails::None => continue,
                _ => panic!("unexpected source export details: {:?}", details),
            };

            let desc = details.table.clone();
            let initial_gtid_set = details.initial_gtid_set.to_string();
            // The frontier this export should resume from, decoded from the
            // rows recorded in `source_resume_uppers`.
            let resume_upper = Antichain::from_iter(
                config
                    .source_resume_uppers
                    .get(id)
                    .expect("missing resume upper")
                    .iter()
                    .map(GtidPartition::decode_row),
            );
            let name = MySqlTableName::new(&desc.schema_name, &desc.name);
            source_outputs.push(SourceOutputInfo {
                output_index: idx,
                table_name: name.clone(),
                desc,
                text_columns: details.text_columns.clone(),
                exclude_columns: details.exclude_columns.clone(),
                initial_gtid_set: gtid_set_frontier(&initial_gtid_set).expect("invalid gtid set"),
                resume_upper,
                export_id: id.clone(),
            });
        }

        let metrics = config.metrics.get_mysql_source_metrics(config.id);

        // Snapshot operator: produces the initial table contents plus rewind
        // requests that are consumed by the replication operator below.
        let (snapshot_updates, rewinds, snapshot_err, snapshot_token) = snapshot::render(
            scope.clone(),
            config.clone(),
            self.clone(),
            source_outputs.clone(),
            metrics.snapshot_metrics.clone(),
        );

        // Replication operator: produces the ongoing stream of changes.
        let (repl_updates, repl_err, repl_token) = replication::render(
            scope.clone(),
            config.clone(),
            self.clone(),
            source_outputs,
            rewinds,
            metrics,
        );

        // Statistics operator: fed the resume uppers and the combined errors
        // of the other two operators; yields the probe stream.
        let (stats_err, probe_stream, stats_token) = statistics::render(
            scope.clone(),
            config.clone(),
            self,
            resume_uppers,
            snapshot_err.clone().concat(repl_err.clone()),
        );

        // Fan the combined update stream out into one stream per export,
        // partitioned by the `output_index` assigned above.
        let updates = snapshot_updates.concat(repl_updates);
        let partition_count = u64::cast_from(config.source_exports.len());
        let data_streams: Vec<_> = updates
            .inner
            .partition::<CapacityContainerBuilder<_>, _, _>(
                partition_count,
                |((output, data), time, diff): &(
                    (usize, Result<SourceMessage, DataflowError>),
                    _,
                    Diff,
                )| {
                    let output = u64::cast_from(*output);
                    (output, (data.clone(), time.clone(), diff.clone()))
                },
            );
        let mut data_collections = BTreeMap::new();
        // `zip_eq` is sound because the partitioned streams are in the same
        // order as `source_exports`, which assigned the output indices.
        for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
            data_collections.insert(*id, data_stream.as_collection());
        }

        // Initial health: a `Running` status for every export and one for the
        // source as a whole (`id: None`).
        let export_ids = config.source_exports.keys().copied();
        let health_init = export_ids
            .map(Some)
            .chain(std::iter::once(None))
            .map(|id| HealthStatusMessage {
                id,
                namespace: Self::STATUS_NAMESPACE,
                update: HealthStatusUpdate::Running,
            })
            .collect::<Vec<_>>()
            .to_stream(scope);

        // Any error from the three operators becomes a halting status update.
        // SSH tunnel failures are reported under the SSH namespace so they
        // are attributed to the tunnel rather than MySQL itself.
        let health_errs = snapshot_err
            .concat(repl_err)
            .concat(stats_err)
            .map(move |err| {
                let err_string = err.display_with_causes().to_string();
                let update = HealthStatusUpdate::halting(err_string.clone(), None);

                let namespace = match err {
                    ReplicationError::Transient(err)
                        if matches!(&*err, TransientError::MySqlError(MySqlError::Ssh(_))) =>
                    {
                        StatusNamespace::Ssh
                    }
                    _ => Self::STATUS_NAMESPACE,
                };

                HealthStatusMessage {
                    id: None,
                    namespace: namespace.clone(),
                    update,
                }
            });
        let health = health_init.concat(health_errs);

        (
            data_collections,
            health,
            probe_stream,
            vec![snapshot_token, repl_token, stats_token],
        )
    }
}
247
/// Per-export information handed to the snapshot and replication operators
/// so they can produce data for a single exported MySQL table.
#[derive(Clone, Debug)]
struct SourceOutputInfo {
    // Index of this export within `config.source_exports`; used to partition
    // the combined update stream back into per-export collections.
    output_index: usize,
    // Fully-qualified (schema, table) name of the upstream table.
    table_name: MySqlTableName,
    // Description of the upstream table, taken from the export details.
    desc: MySqlTableDesc,
    // Columns to be decoded as text, from the export details.
    text_columns: Vec<String>,
    // Columns excluded from ingestion, from the export details.
    exclude_columns: Vec<String>,
    // Frontier derived from the export's initial GTID set.
    initial_gtid_set: Antichain<GtidPartition>,
    // Frontier at which this export should resume ingestion.
    resume_upper: Antichain<GtidPartition>,
    // The `GlobalId` identifying this export.
    export_id: GlobalId,
}
259
/// An error produced while rendering/running the source, split into transient
/// and definite halves. Both sides are wrapped in `Rc` so the error can be
/// cloned cheaply across operator outputs.
#[derive(Clone, Debug, thiserror::Error)]
pub enum ReplicationError {
    #[error(transparent)]
    Transient(#[from] Rc<TransientError>),
    #[error(transparent)]
    Definite(#[from] Rc<DefiniteError>),
}
267
/// Errors arising from the infrastructure around the source (network, SQL
/// client, decoding machinery) rather than from the source data itself.
/// NOTE(review): "transient" suggests these are retriable by restarting the
/// dataflow — confirm the restart semantics at the emit sites.
#[derive(Debug, thiserror::Error)]
pub enum TransientError {
    #[error("couldn't decode binlog row")]
    BinlogRowDecodeError(#[from] mysql_async::binlog::row::BinlogRowToRowError),
    #[error("stream ended prematurely")]
    ReplicationEOF,
    #[error(transparent)]
    IoError(#[from] io::Error),
    #[error("sql client error")]
    SQLClient(#[from] mysql_async::Error),
    #[error("ident decode error")]
    IdentError(#[from] mz_sql_parser::ast::IdentError),
    #[error(transparent)]
    MySqlError(#[from] MySqlError),
    #[error(transparent)]
    Generic(#[from] anyhow::Error),
}
286
/// Errors tied to the source data or upstream server state. These are
/// converted into [`DataflowError`]s (see the `From` impl in this module) and
/// emitted into the data streams themselves, so they become part of the
/// source's definite output.
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
pub enum DefiniteError {
    #[error("unable to decode: {0}")]
    ValueDecodeError(String),
    #[error("table was truncated: {0}")]
    TableTruncated(String),
    #[error("table was dropped: {0}")]
    TableDropped(String),
    #[error("incompatible schema change: {0}")]
    IncompatibleSchema(String),
    #[error("received a gtid set from the server that violates our requirements: {0}")]
    UnsupportedGtidState(String),
    #[error("received out of order gtids for source {0} at transaction-id {1}")]
    BinlogGtidMonotonicityViolation(String, GtidState),
    #[error("mysql server does not have the binlog available at the requested gtid set")]
    BinlogNotAvailable,
    #[error("mysql server binlog frontier at {0} is beyond required frontier {1}")]
    BinlogMissingResumePoint(String, String),
    #[error("mysql server configuration: {0}")]
    ServerConfigurationError(String),
}
309
310impl From<DefiniteError> for DataflowError {
311 fn from(err: DefiniteError) -> Self {
312 let m = err.to_string().into();
313 DataflowError::SourceError(Box::new(SourceError {
314 error: match &err {
315 DefiniteError::ValueDecodeError(_) => SourceErrorDetails::Other(m),
316 DefiniteError::TableTruncated(_) => SourceErrorDetails::Other(m),
317 DefiniteError::TableDropped(_) => SourceErrorDetails::Other(m),
318 DefiniteError::IncompatibleSchema(_) => SourceErrorDetails::Other(m),
319 DefiniteError::UnsupportedGtidState(_) => SourceErrorDetails::Other(m),
320 DefiniteError::BinlogGtidMonotonicityViolation(_, _) => {
321 SourceErrorDetails::Other(m)
322 }
323 DefiniteError::BinlogNotAvailable => SourceErrorDetails::Initialization(m),
324 DefiniteError::BinlogMissingResumePoint(_, _) => {
325 SourceErrorDetails::Initialization(m)
326 }
327 DefiniteError::ServerConfigurationError(_) => SourceErrorDetails::Initialization(m),
328 },
329 }))
330 }
331}
332
333#[derive(
337 Debug,
338 Clone,
339 PartialEq,
340 Eq,
341 PartialOrd,
342 Ord,
343 Serialize,
344 Deserialize,
345 Hash
346)]
347pub(crate) struct MySqlTableName(pub(crate) String, pub(crate) String);
348
349impl MySqlTableName {
350 pub(crate) fn new(schema_name: &str, table_name: &str) -> Self {
351 Self(schema_name.to_string(), table_name.to_string())
352 }
353}
354
355impl fmt::Display for MySqlTableName {
356 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
357 write!(
358 f,
359 "{}.{}",
360 quote_identifier(&self.0),
361 quote_identifier(&self.1)
362 )
363 }
364}
365
366impl From<&MySqlTableDesc> for MySqlTableName {
367 fn from(desc: &MySqlTableDesc) -> Self {
368 Self::new(&desc.schema_name, &desc.name)
369 }
370}
371
/// A request to rewind the updates for one output, passed from the snapshot
/// operator to the replication operator.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub(crate) struct RewindRequest {
    // The output index this rewind applies to.
    pub(crate) output_index: usize,
    // The GTID frontier at which the snapshot of this output was taken.
    // NOTE(review): semantics inferred from the field name — confirm against
    // the snapshot/replication operator implementations.
    pub(crate) snapshot_upper: Antichain<GtidPartition>,
}
380
/// Alias for an async output handle that emits accounted, stacked
/// `(D, T, Diff)` update triples timestamped by `T`.
type StackedAsyncOutputHandle<T, D> = AsyncOutputHandle<
    T,
    AccountedStackBuilder<CapacityContainerBuilder<TimelyStack<(D, T, Diff)>>>,
>;
385
386async fn return_definite_error(
387 err: DefiniteError,
388 outputs: &[usize],
389 data_handle: &StackedAsyncOutputHandle<
390 GtidPartition,
391 (usize, Result<SourceMessage, DataflowError>),
392 >,
393 data_cap_set: &CapabilitySet<GtidPartition>,
394 definite_error_handle: &AsyncOutputHandle<
395 GtidPartition,
396 CapacityContainerBuilder<Vec<ReplicationError>>,
397 >,
398 definite_error_cap_set: &CapabilitySet<GtidPartition>,
399) {
400 tracing::warn!("Returning definite error: {err}");
401 for output_index in outputs {
402 let update = (
403 (*output_index, Err(err.clone().into())),
404 GtidPartition::new_range(Uuid::minimum(), Uuid::maximum(), GtidState::MAX),
405 Diff::ONE,
406 );
407 data_handle.give_fueled(&data_cap_set[0], update).await;
408 }
409 definite_error_handle.give(
410 &definite_error_cap_set[0],
411 ReplicationError::Definite(Rc::new(err)),
412 );
413 ()
414}
415
/// Validates that the MySQL server is configured as required for replication:
/// GTID consistency, full-row binlog format, and replication commit order.
///
/// The checks run sequentially and fail fast, so only the first violated
/// requirement is reported as a `MySqlError`.
async fn validate_mysql_repl_settings(conn: &mut mysql_async::Conn) -> Result<(), MySqlError> {
    ensure_gtid_consistency(conn).await?;
    ensure_full_row_binlog_format(conn).await?;
    ensure_replication_commit_order(conn).await?;

    Ok(())
}