1use std::collections::BTreeMap;
54use std::fmt;
55use std::io;
56use std::rc::Rc;
57
58use differential_dataflow::AsCollection;
59use itertools::Itertools;
60use mz_mysql_util::quote_identifier;
61use mz_ore::cast::CastFrom;
62use mz_repr::Diff;
63use mz_repr::GlobalId;
64use mz_storage_types::errors::{DataflowError, SourceError};
65use mz_storage_types::sources::SourceExport;
66use mz_timely_util::columnation::ColumnationStack;
67use mz_timely_util::containers::stack::AccountedStackBuilder;
68use serde::{Deserialize, Serialize};
69use timely::container::CapacityContainerBuilder;
70use timely::dataflow::operators::core::Partition;
71use timely::dataflow::operators::vec::{Map, ToStream};
72use timely::dataflow::operators::{CapabilitySet, Concat};
73use timely::dataflow::{Scope, StreamVec};
74use timely::progress::Antichain;
75use uuid::Uuid;
76
77use mz_mysql_util::{
78 MySqlError, MySqlTableDesc, ensure_full_row_binlog_format, ensure_gtid_consistency,
79 ensure_replication_commit_order,
80};
81use mz_ore::error::ErrorExt;
82use mz_storage_types::errors::SourceErrorDetails;
83use mz_storage_types::sources::mysql::{GtidPartition, GtidState, gtid_set_frontier};
84use mz_storage_types::sources::{MySqlSourceConnection, SourceExportDetails, SourceTimestamp};
85use mz_timely_util::builder_async::{AsyncOutputHandle, PressOnDropButton};
86use mz_timely_util::order::Extrema;
87
88use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
89use crate::source::types::Probe;
90use crate::source::types::{SourceRender, StackedCollection};
91use crate::source::{RawSourceCreationConfig, SourceMessage};
92
93mod replication;
94mod schemas;
95mod snapshot;
96mod statistics;
97
// Renders a MySQL source as three cooperating dataflow operators: snapshot,
// replication (binlog tailing), and statistics/probing.
impl SourceRender for MySqlSourceConnection {
    type Time = GtidPartition;

    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::MySql;

    /// Builds the dataflow for this MySQL source.
    ///
    /// Returns one message collection per source export, a health-status
    /// stream, a probe stream, and the drop-tokens that shut down the
    /// rendered operators.
    fn render<'scope>(
        self,
        scope: Scope<'scope, GtidPartition>,
        config: &RawSourceCreationConfig,
        resume_uppers: impl futures::Stream<Item = Antichain<GtidPartition>> + 'static,
        _start_signal: impl std::future::Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<
            GlobalId,
            StackedCollection<'scope, GtidPartition, Result<SourceMessage, DataflowError>>,
        >,
        StreamVec<'scope, GtidPartition, HealthStatusMessage>,
        StreamVec<'scope, GtidPartition, Probe<GtidPartition>>,
        Vec<PressOnDropButton>,
    ) {
        // Gather per-export rendering info (table description, column
        // overrides, GTID frontiers) for every MySQL export of this source.
        let mut source_outputs = Vec::new();
        for (idx, (id, export)) in config.source_exports.iter().enumerate() {
            let SourceExport {
                details,
                storage_metadata: _,
                data_config: _,
            } = export;
            let details = match details {
                SourceExportDetails::MySql(details) => details,
                // An export with no details renders nothing; skip it.
                SourceExportDetails::None => continue,
                _ => panic!("unexpected source export details: {:?}", details),
            };

            let desc = details.table.clone();
            let initial_gtid_set = details.initial_gtid_set.to_string();
            // The frontier this export should resume from, decoded from the
            // durably recorded resume uppers.
            let resume_upper = Antichain::from_iter(
                config
                    .source_resume_uppers
                    .get(id)
                    .expect("missing resume upper")
                    .iter()
                    .map(GtidPartition::decode_row),
            );
            let name = MySqlTableName::new(&desc.schema_name, &desc.name);
            source_outputs.push(SourceOutputInfo {
                output_index: idx,
                table_name: name.clone(),
                desc,
                text_columns: details.text_columns.clone(),
                exclude_columns: details.exclude_columns.clone(),
                initial_gtid_set: gtid_set_frontier(&initial_gtid_set).expect("invalid gtid set"),
                resume_upper,
                export_id: id.clone(),
            });
        }

        let metrics = config.metrics.get_mysql_source_metrics(config.id);

        // Initial table snapshots. Also produces rewind requests (see
        // [`RewindRequest`]) that the replication operator consumes.
        let (snapshot_updates, rewinds, snapshot_err, snapshot_token) = snapshot::render(
            scope.clone(),
            config.clone(),
            self.clone(),
            source_outputs.clone(),
            metrics.snapshot_metrics.clone(),
        );

        // Ongoing binlog replication, fed with the snapshot's rewind requests.
        let (repl_updates, repl_err, repl_token) = replication::render(
            scope.clone(),
            config.clone(),
            self.clone(),
            source_outputs,
            rewinds,
            metrics,
        );

        // Statistics/probing; also observes the combined snapshot and
        // replication error streams.
        let (stats_err, probe_stream, stats_token) = statistics::render(
            scope.clone(),
            config.clone(),
            self,
            resume_uppers,
            snapshot_err.clone().concat(repl_err.clone()),
        );

        // Demultiplex the single (output_index, message) stream into one
        // stream per source export, keyed by the output index.
        let updates = snapshot_updates.concat(repl_updates);
        let partition_count = u64::cast_from(config.source_exports.len());
        let data_streams: Vec<_> = updates
            .inner
            .partition::<CapacityContainerBuilder<_>, _, _>(
                partition_count,
                |((output, data), time, diff): &(
                    (usize, Result<SourceMessage, DataflowError>),
                    _,
                    Diff,
                )| {
                    let output = u64::cast_from(*output);
                    (output, (data.clone(), time.clone(), diff.clone()))
                },
            );
        let mut data_collections = BTreeMap::new();
        // `zip_eq` panics on a length mismatch, which would indicate a bug in
        // the partitioning above.
        for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
            data_collections.insert(*id, data_stream.as_collection());
        }

        // Emit an initial `Running` status for each export, plus one with
        // `id: None` for the source as a whole.
        let export_ids = config.source_exports.keys().copied();
        let health_init = export_ids
            .map(Some)
            .chain(std::iter::once(None))
            .map(|id| HealthStatusMessage {
                id,
                namespace: Self::STATUS_NAMESPACE,
                update: HealthStatusUpdate::Running,
            })
            .collect::<Vec<_>>()
            .to_stream(scope);

        // Turn every operator error into a halting status. SSH tunnel
        // failures are reported under the SSH namespace so they are
        // attributed to the tunnel rather than to MySQL itself.
        let health_errs = snapshot_err
            .concat(repl_err)
            .concat(stats_err)
            .map(move |err| {
                let err_string = err.display_with_causes().to_string();
                let update = HealthStatusUpdate::halting(err_string.clone(), None);

                let namespace = match err {
                    ReplicationError::Transient(err)
                        if matches!(&*err, TransientError::MySqlError(MySqlError::Ssh(_))) =>
                    {
                        StatusNamespace::Ssh
                    }
                    _ => Self::STATUS_NAMESPACE,
                };

                HealthStatusMessage {
                    id: None,
                    namespace: namespace.clone(),
                    update,
                }
            });
        let health = health_init.concat(health_errs);

        (
            data_collections,
            health,
            probe_stream,
            vec![snapshot_token, repl_token, stats_token],
        )
    }
}
250
/// Per-export information needed by the snapshot and replication operators.
#[derive(Clone, Debug)]
struct SourceOutputInfo {
    /// Index of this export among the source's outputs; used to route the
    /// partitioned data streams.
    output_index: usize,
    /// Fully-qualified MySQL table backing this export.
    table_name: MySqlTableName,
    /// Description of the MySQL table.
    desc: MySqlTableDesc,
    /// Copied from the export's `text_columns` details.
    text_columns: Vec<String>,
    /// Copied from the export's `exclude_columns` details.
    exclude_columns: Vec<String>,
    /// Frontier derived from the export's `initial_gtid_set` details via
    /// `gtid_set_frontier`.
    initial_gtid_set: Antichain<GtidPartition>,
    /// Resume frontier decoded from `config.source_resume_uppers`.
    resume_upper: Antichain<GtidPartition>,
    /// The `GlobalId` of this export.
    export_id: GlobalId,
}
262
/// An error raised while rendering this source, classified as either
/// transient or definite (see [`TransientError`] and [`DefiniteError`]).
/// Payloads are wrapped in `Rc` so the enum is cheaply cloneable.
#[derive(Clone, Debug, thiserror::Error)]
pub enum ReplicationError {
    #[error(transparent)]
    Transient(#[from] Rc<TransientError>),
    #[error(transparent)]
    Definite(#[from] Rc<DefiniteError>),
}
270
/// Errors classified as transient, surfaced via
/// [`ReplicationError::Transient`].
#[derive(Debug, thiserror::Error)]
pub enum TransientError {
    #[error("couldn't decode binlog row")]
    BinlogRowDecodeError(#[from] mysql_async::binlog::row::BinlogRowToRowError),
    #[error("stream ended prematurely")]
    ReplicationEOF,
    #[error(transparent)]
    IoError(#[from] io::Error),
    #[error("sql client error")]
    SQLClient(#[from] mysql_async::Error),
    #[error("ident decode error")]
    IdentError(#[from] mz_sql_parser::ast::IdentError),
    #[error(transparent)]
    MySqlError(#[from] MySqlError),
    #[error(transparent)]
    Generic(#[from] anyhow::Error),
}
289
/// Errors classified as definite, surfaced via
/// [`ReplicationError::Definite`]. These are convertible into
/// [`DataflowError`] (see the `From` impl in this module).
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
pub enum DefiniteError {
    #[error("unable to decode: {0}")]
    ValueDecodeError(String),
    #[error("table was truncated: {0}")]
    TableTruncated(String),
    #[error("table was dropped: {0}")]
    TableDropped(String),
    #[error("incompatible schema change: {0}")]
    IncompatibleSchema(String),
    #[error("received a gtid set from the server that violates our requirements: {0}")]
    UnsupportedGtidState(String),
    #[error("received out of order gtids for source {0} at transaction-id {1}")]
    BinlogGtidMonotonicityViolation(String, GtidState),
    #[error("mysql server does not have the binlog available at the requested gtid set")]
    BinlogNotAvailable,
    #[error("mysql server binlog frontier at {0} is beyond required frontier {1}")]
    BinlogMissingResumePoint(String, String),
    #[error("mysql server configuration: {0}")]
    ServerConfigurationError(String),
}
312
313impl From<DefiniteError> for DataflowError {
314 fn from(err: DefiniteError) -> Self {
315 let m = err.to_string().into();
316 DataflowError::SourceError(Box::new(SourceError {
317 error: match &err {
318 DefiniteError::ValueDecodeError(_) => SourceErrorDetails::Other(m),
319 DefiniteError::TableTruncated(_) => SourceErrorDetails::Other(m),
320 DefiniteError::TableDropped(_) => SourceErrorDetails::Other(m),
321 DefiniteError::IncompatibleSchema(_) => SourceErrorDetails::Other(m),
322 DefiniteError::UnsupportedGtidState(_) => SourceErrorDetails::Other(m),
323 DefiniteError::BinlogGtidMonotonicityViolation(_, _) => {
324 SourceErrorDetails::Other(m)
325 }
326 DefiniteError::BinlogNotAvailable => SourceErrorDetails::Initialization(m),
327 DefiniteError::BinlogMissingResumePoint(_, _) => {
328 SourceErrorDetails::Initialization(m)
329 }
330 DefiniteError::ServerConfigurationError(_) => SourceErrorDetails::Initialization(m),
331 },
332 }))
333 }
334}
335
/// A fully-qualified MySQL table name: `(schema_name, table_name)`.
///
/// Its `Display` impl renders both parts quoted, separated by a dot.
#[derive(
    Debug,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
    Hash
)]
pub(crate) struct MySqlTableName(pub(crate) String, pub(crate) String);
351
352impl MySqlTableName {
353 pub(crate) fn new(schema_name: &str, table_name: &str) -> Self {
354 Self(schema_name.to_string(), table_name.to_string())
355 }
356}
357
358impl fmt::Display for MySqlTableName {
359 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
360 write!(
361 f,
362 "{}.{}",
363 quote_identifier(&self.0),
364 quote_identifier(&self.1)
365 )
366 }
367}
368
369impl From<&MySqlTableDesc> for MySqlTableName {
370 fn from(desc: &MySqlTableDesc) -> Self {
371 Self::new(&desc.schema_name, &desc.name)
372 }
373}
374
/// A rewind request for a single output.
///
/// NOTE(review): appears to flow on the `rewinds` stream from the snapshot
/// operator to the replication operator — confirm in the `snapshot` and
/// `replication` modules.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub(crate) struct RewindRequest {
    /// The output this rewind applies to.
    pub(crate) output_index: usize,
    /// The snapshot frontier for this output (per the field name; exact
    /// semantics are defined where the request is honored — verify there).
    pub(crate) snapshot_upper: Antichain<GtidPartition>,
}
383
/// Shorthand for an async output handle that emits `(D, T, Diff)` triples
/// into a columnation-backed, capacity-accounted container.
type StackedAsyncOutputHandle<T, D> = AsyncOutputHandle<
    T,
    AccountedStackBuilder<CapacityContainerBuilder<ColumnationStack<(D, T, Diff)>>>,
>;
388
389async fn return_definite_error(
390 err: DefiniteError,
391 outputs: &[usize],
392 data_handle: &StackedAsyncOutputHandle<
393 GtidPartition,
394 (usize, Result<SourceMessage, DataflowError>),
395 >,
396 data_cap_set: &CapabilitySet<GtidPartition>,
397 definite_error_handle: &AsyncOutputHandle<
398 GtidPartition,
399 CapacityContainerBuilder<Vec<ReplicationError>>,
400 >,
401 definite_error_cap_set: &CapabilitySet<GtidPartition>,
402) {
403 tracing::warn!("Returning definite error: {err}");
404 for output_index in outputs {
405 let update = (
406 (*output_index, Err(err.clone().into())),
407 GtidPartition::new_range(Uuid::minimum(), Uuid::maximum(), GtidState::MAX),
408 Diff::ONE,
409 );
410 data_handle.give_fueled(&data_cap_set[0], update).await;
411 }
412 definite_error_handle.give(
413 &definite_error_cap_set[0],
414 ReplicationError::Definite(Rc::new(err)),
415 );
416 ()
417}
418
/// Checks that the MySQL server's replication-related settings satisfy this
/// source's requirements: GTID consistency, full-row binlog format, and
/// replication commit order.
///
/// Returns the first failing check's [`MySqlError`], if any.
async fn validate_mysql_repl_settings(conn: &mut mysql_async::Conn) -> Result<(), MySqlError> {
    ensure_gtid_consistency(conn).await?;
    ensure_full_row_binlog_format(conn).await?;
    ensure_replication_commit_order(conn).await?;

    Ok(())
}