use std::collections::BTreeMap;
use std::convert::Infallible;
use std::fmt;
use std::io;
use std::rc::Rc;

use differential_dataflow::AsCollection;
use differential_dataflow::containers::TimelyStack;
use itertools::Itertools;
use mz_mysql_util::quote_identifier;
use mz_ore::cast::CastFrom;
use mz_repr::{Diff, GlobalId};
use mz_storage_types::errors::{DataflowError, SourceError};
use mz_storage_types::sources::SourceExport;
use mz_timely_util::containers::stack::AccountedStackBuilder;
use serde::{Deserialize, Serialize};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::operators::core::Partition;
use timely::dataflow::operators::{CapabilitySet, Concat, Map, ToStream};
use timely::dataflow::{Scope, Stream};
use timely::progress::Antichain;
use uuid::Uuid;

use mz_mysql_util::{
    MySqlError, MySqlTableDesc, ensure_full_row_binlog_format, ensure_gtid_consistency,
    ensure_replication_commit_order,
};
use mz_ore::error::ErrorExt;
use mz_storage_types::errors::SourceErrorDetails;
use mz_storage_types::sources::mysql::{GtidPartition, GtidState, gtid_set_frontier};
use mz_storage_types::sources::{MySqlSourceConnection, SourceExportDetails, SourceTimestamp};
use mz_timely_util::builder_async::{AsyncOutputHandle, PressOnDropButton};
use mz_timely_util::order::Extrema;

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::source::types::{Probe, SourceRender, StackedCollection};
use crate::source::{RawSourceCreationConfig, SourceMessage};

mod replication;
mod schemas;
mod snapshot;
mod statistics;

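// Renders a MySQL source as three cooperating operators: a snapshot operator
// that reads the initial contents of each table, a replication operator that
// follows the binlog from there onward, and a statistics operator that
// reports source statistics and probes the upstream frontier.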
impl SourceRender for MySqlSourceConnection {
    type Time = GtidPartition;

    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::MySql;

    fn render<G: Scope<Timestamp = GtidPartition>>(
        self,
        scope: &mut G,
        config: &RawSourceCreationConfig,
        resume_uppers: impl futures::Stream<Item = Antichain<GtidPartition>> + 'static,
        _start_signal: impl std::future::Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
        Stream<G, Infallible>,
        Stream<G, HealthStatusMessage>,
        Option<Stream<G, Probe<GtidPartition>>>,
        Vec<PressOnDropButton>,
    ) {
        let mut source_outputs = Vec::new();
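        // Determine which exports this source must produce data for, along
        // with the information the snapshot and replication operators need to
        // render each of them.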
        for (idx, (id, export)) in config.source_exports.iter().enumerate() {
            let SourceExport {
                details,
                storage_metadata: _,
                data_config: _,
            } = export;
            let details = match details {
                SourceExportDetails::MySql(details) => details,
                SourceExportDetails::None => continue,
                _ => panic!("unexpected source export details: {:?}", details),
            };

            let desc = details.table.clone();
            let initial_gtid_set = details.initial_gtid_set.to_string();
            let resume_upper = Antichain::from_iter(
                config
                    .source_resume_uppers
                    .get(id)
                    .expect("missing resume upper")
                    .iter()
                    .map(GtidPartition::decode_row),
            );
            let name = MySqlTableName::new(&desc.schema_name, &desc.name);
            source_outputs.push(SourceOutputInfo {
                output_index: idx,
                table_name: name.clone(),
                desc,
                text_columns: details.text_columns.clone(),
                exclude_columns: details.exclude_columns.clone(),
                initial_gtid_set: gtid_set_frontier(&initial_gtid_set).expect("invalid gtid set"),
                resume_upper,
                export_id: id.clone(),
            });
        }

        let metrics = config.metrics.get_mysql_source_metrics(config.id);

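        // Render the snapshot operator, which reads the initial contents of
        // each table and emits rewind requests describing where each snapshot
        // ended.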
        let (snapshot_updates, rewinds, snapshot_err, snapshot_token) = snapshot::render(
            scope.clone(),
            config.clone(),
            self.clone(),
            source_outputs.clone(),
            metrics.snapshot_metrics.clone(),
        );

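        // Render the replication operator, which tails the binlog and uses
        // the rewind requests to make its output consistent with the
        // snapshot.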
        let (repl_updates, uppers, repl_err, repl_token) = replication::render(
            scope.clone(),
            config.clone(),
            self.clone(),
            source_outputs,
            &rewinds,
            metrics,
        );

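        // Render the statistics operator, which reports source statistics and
        // probes the upstream frontier.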
        let (stats_err, probe_stream, stats_token) =
            statistics::render(scope.clone(), config.clone(), self, resume_uppers);

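        // Merge the snapshot and replication updates and route each update to
        // the collection of the export it belongs to, keyed by output index.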
        let updates = snapshot_updates.concat(&repl_updates);
        let partition_count = u64::cast_from(config.source_exports.len());
        let data_streams: Vec<_> = updates
            .inner
            .partition::<CapacityContainerBuilder<_>, _, _>(
                partition_count,
                |((output, data), time, diff): &(
                    (usize, Result<SourceMessage, DataflowError>),
                    _,
                    Diff,
                )| {
                    let output = u64::cast_from(*output);
                    (output, (data.clone(), time.clone(), diff.clone()))
                },
            );
        let mut data_collections = BTreeMap::new();
        for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
            data_collections.insert(*id, data_stream.as_collection());
        }

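        // Report every export, as well as the source itself (`None`), as
        // running; any error reported below supersedes this status.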
        let export_ids = config.source_exports.keys().copied();
        let health_init = export_ids
            .map(Some)
            .chain(std::iter::once(None))
            .map(|id| HealthStatusMessage {
                id,
                namespace: Self::STATUS_NAMESPACE,
                update: HealthStatusUpdate::Running,
            })
            .collect::<Vec<_>>()
            .to_stream(scope);

        let health_errs = snapshot_err
            .concat(&repl_err)
            .concat(&stats_err)
            .map(move |err| {
                let err_string = err.display_with_causes().to_string();
                let update = HealthStatusUpdate::halting(err_string.clone(), None);

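                // Attribute SSH tunnel failures to the SSH namespace so they
                // are distinguishable from MySQL errors.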
                let namespace = match err {
                    ReplicationError::Transient(err)
                        if matches!(&*err, TransientError::MySqlError(MySqlError::Ssh(_))) =>
                    {
                        StatusNamespace::Ssh
                    }
                    _ => Self::STATUS_NAMESPACE,
                };

                HealthStatusMessage {
                    id: None,
                    namespace: namespace.clone(),
                    update,
                }
            });
        let health = health_init.concat(&health_errs);

        (
            data_collections,
            uppers,
            health,
            Some(probe_stream),
            vec![snapshot_token, repl_token, stats_token],
        )
    }
}

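/// Information about a single source export (one MySQL table) that the
/// snapshot and replication operators need in order to render it.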
#[derive(Clone, Debug)]
struct SourceOutputInfo {
    output_index: usize,
    table_name: MySqlTableName,
    desc: MySqlTableDesc,
    text_columns: Vec<String>,
    exclude_columns: Vec<String>,
    initial_gtid_set: Antichain<GtidPartition>,
    resume_upper: Antichain<GtidPartition>,
    export_id: GlobalId,
}

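/// An error during replication, either transient (retryable) or definite
/// (permanent and recorded in the affected data collections).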
#[derive(Clone, Debug, thiserror::Error)]
pub enum ReplicationError {
    #[error(transparent)]
    Transient(#[from] Rc<TransientError>),
    #[error(transparent)]
    Definite(#[from] Rc<DefiniteError>),
}

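/// A transient error: one that may resolve on retry and that is never
/// recorded in a data collection.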
#[derive(Debug, thiserror::Error)]
pub enum TransientError {
    #[error("couldn't decode binlog row")]
    BinlogRowDecodeError(#[from] mysql_async::binlog::row::BinlogRowToRowError),
    #[error("stream ended prematurely")]
    ReplicationEOF,
    #[error(transparent)]
    IoError(#[from] io::Error),
    #[error("sql client error")]
    SQLClient(#[from] mysql_async::Error),
    #[error("ident decode error")]
    IdentError(#[from] mz_sql_parser::ast::IdentError),
    #[error(transparent)]
    MySqlError(#[from] MySqlError),
    #[error(transparent)]
    Generic(#[from] anyhow::Error),
}

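/// A definite error: one that is permanent and is recorded in the data
/// collections of the affected outputs (see [`return_definite_error`]).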
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
pub enum DefiniteError {
    #[error("unable to decode: {0}")]
    ValueDecodeError(String),
    #[error("table was truncated: {0}")]
    TableTruncated(String),
    #[error("table was dropped: {0}")]
    TableDropped(String),
    #[error("incompatible schema change: {0}")]
    IncompatibleSchema(String),
    #[error("received a gtid set from the server that violates our requirements: {0}")]
    UnsupportedGtidState(String),
    #[error("received out of order gtids for source {0} at transaction-id {1}")]
    BinlogGtidMonotonicityViolation(String, GtidState),
    #[error("mysql server does not have the binlog available at the requested gtid set")]
    BinlogNotAvailable,
    #[error("mysql server binlog frontier at {0} is beyond required frontier {1}")]
    BinlogMissingResumePoint(String, String),
    #[error("mysql server configuration: {0}")]
    ServerConfigurationError(String),
}

impl From<DefiniteError> for DataflowError {
    fn from(err: DefiniteError) -> Self {
        let m = err.to_string().into();
        DataflowError::SourceError(Box::new(SourceError {
            error: match &err {
                DefiniteError::ValueDecodeError(_) => SourceErrorDetails::Other(m),
                DefiniteError::TableTruncated(_) => SourceErrorDetails::Other(m),
                DefiniteError::TableDropped(_) => SourceErrorDetails::Other(m),
                DefiniteError::IncompatibleSchema(_) => SourceErrorDetails::Other(m),
                DefiniteError::UnsupportedGtidState(_) => SourceErrorDetails::Other(m),
                DefiniteError::BinlogGtidMonotonicityViolation(_, _) => {
                    SourceErrorDetails::Other(m)
                }
                DefiniteError::BinlogNotAvailable => SourceErrorDetails::Initialization(m),
                DefiniteError::BinlogMissingResumePoint(_, _) => {
                    SourceErrorDetails::Initialization(m)
                }
                DefiniteError::ServerConfigurationError(_) => SourceErrorDetails::Initialization(m),
            },
        }))
    }
}

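/// The name of a MySQL table, represented as (schema name, table name).
/// Note that in MySQL a schema and a database are the same thing.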
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Hash)]
pub(crate) struct MySqlTableName(pub(crate) String, pub(crate) String);

impl MySqlTableName {
    pub(crate) fn new(schema_name: &str, table_name: &str) -> Self {
        Self(schema_name.to_string(), table_name.to_string())
    }
}

impl fmt::Display for MySqlTableName {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "{}.{}",
            quote_identifier(&self.0),
            quote_identifier(&self.1)
        )
    }
}

impl From<&MySqlTableDesc> for MySqlTableName {
    fn from(desc: &MySqlTableDesc) -> Self {
        Self::new(&desc.schema_name, &desc.name)
    }
}

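/// A request, produced by the snapshot operator and consumed by the
/// replication operator, to reconcile the replication stream for one output
/// with a snapshot taken at `snapshot_upper`.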
#[derive(Debug, Clone, Deserialize, Serialize)]
pub(crate) struct RewindRequest {
    pub(crate) output_index: usize,
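    /// The frontier at which the snapshot of this output's table was taken.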
    pub(crate) snapshot_upper: Antichain<GtidPartition>,
}

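/// An async output handle producing `(data, time, diff)` updates into
/// accounted, columnar [`TimelyStack`] containers.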
type StackedAsyncOutputHandle<T, D> = AsyncOutputHandle<
    T,
    AccountedStackBuilder<CapacityContainerBuilder<TimelyStack<(D, T, Diff)>>>,
>;

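/// Emits the given definite error to the data collection of each listed
/// output, timestamped at the maximum representable GTID, and reports it on
/// the dedicated definite-error stream.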
async fn return_definite_error(
    err: DefiniteError,
    outputs: &[usize],
    data_handle: &StackedAsyncOutputHandle<
        GtidPartition,
        (usize, Result<SourceMessage, DataflowError>),
    >,
    data_cap_set: &CapabilitySet<GtidPartition>,
    definite_error_handle: &AsyncOutputHandle<
        GtidPartition,
        CapacityContainerBuilder<Vec<ReplicationError>>,
    >,
    definite_error_cap_set: &CapabilitySet<GtidPartition>,
) {
    for output_index in outputs {
        let update = (
            (*output_index, Err(err.clone().into())),
            GtidPartition::new_range(Uuid::minimum(), Uuid::maximum(), GtidState::MAX),
            Diff::ONE,
        );
        data_handle.give_fueled(&data_cap_set[0], update).await;
    }
    definite_error_handle.give(
        &definite_error_cap_set[0],
        ReplicationError::Definite(Rc::new(err)),
    );
}
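/// Ensures the upstream MySQL server is configured for consistent
/// replication: GTID consistency, full row binlog images, and preserved
/// replication commit order.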
async fn validate_mysql_repl_settings(conn: &mut mysql_async::Conn) -> Result<(), MySqlError> {
    ensure_gtid_consistency(conn).await?;
    ensure_full_row_binlog_format(conn).await?;
    ensure_replication_commit_order(conn).await?;

    Ok(())
}