// mz_storage/source/mysql.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Code to render the ingestion dataflow of a [`MySqlSourceConnection`].
11//!
12//! This dataflow is split into Snapshot and Replication operators.
13//!
14//! # Snapshot
15//!
16//! The snapshot operator is responsible for taking a consistent snapshot of the tables involved
//! in the ingestion from the MySQL server. Each table that is being ingested is
//! assigned to a specific worker, which performs a `SELECT * FROM table` and emits
//! updates for all the rows in the given table.
20//!
21//! For all tables that are snapshotted the snapshot operator also emits a rewind request to
22//! the replication operator containing the GTID-set based frontier which will be used to
23//! ensure that the requested portion of the replication stream is subtracted from the snapshot.
24//!
25//! See the [snapshot] module for more information.
26//!
27//! # Replication
28//!
29//! The replication operator is responsible for ingesting the MySQL replication stream which must
30//! happen from a single worker.
31//!
32//! See the [replication] module for more information.
33//!
34//! # Error handling
35//!
36//! There are two kinds of errors that can happen during ingestion that are represented as two
37//! separate error types:
38//!
39//! [`DefiniteError`]s are errors that happen during processing of a specific collection record.
40//! These are the only errors that can ever end up in the error collection of a subsource.
41//!
42//! [`TransientError`]s are any errors that can happen for reasons that are unrelated to the data
43//! itself. This could be authentication failures, connection failures, etc. The only operators
44//! that can emit such errors are the `MySqlReplicationReader` and the `MySqlSnapshotReader`
45//! operators, which are the ones that talk to the external world. Both of these operators are
46//! built with the `AsyncOperatorBuilder::build_fallible` method which allows transient errors
47//! to be propagated upwards with the standard `?` operator without risking downgrading the
48//! capability and producing bogus frontiers.
49//!
50//! The error streams from both of those operators are published to the source status and also
51//! trigger a restart of the dataflow.
52
53use std::collections::BTreeMap;
54use std::fmt;
55use std::io;
56use std::rc::Rc;
57
58use differential_dataflow::AsCollection;
59use itertools::Itertools;
60use mz_mysql_util::quote_identifier;
61use mz_ore::cast::CastFrom;
62use mz_repr::Diff;
63use mz_repr::GlobalId;
64use mz_storage_types::errors::{DataflowError, SourceError};
65use mz_storage_types::sources::SourceExport;
66use mz_timely_util::containers::stack::FueledBuilder;
67use serde::{Deserialize, Serialize};
68use timely::container::CapacityContainerBuilder;
69use timely::dataflow::operators::core::Partition;
70use timely::dataflow::operators::vec::{Map, ToStream};
71use timely::dataflow::operators::{CapabilitySet, Concat};
72use timely::dataflow::{Scope, StreamVec};
73use timely::progress::Antichain;
74use uuid::Uuid;
75
76use mz_mysql_util::{
77    MySqlError, MySqlTableDesc, ensure_full_row_binlog_format, ensure_gtid_consistency,
78    ensure_replication_commit_order,
79};
80use mz_ore::error::ErrorExt;
81use mz_storage_types::errors::SourceErrorDetails;
82use mz_storage_types::sources::mysql::{GtidPartition, GtidState, gtid_set_frontier};
83use mz_storage_types::sources::{MySqlSourceConnection, SourceExportDetails, SourceTimestamp};
84use mz_timely_util::builder_async::{AsyncOutputHandle, PressOnDropButton};
85use mz_timely_util::order::Extrema;
86
87use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
88use crate::source::types::Probe;
89use crate::source::types::{FuelSize, SourceRender, StackedCollection};
90use crate::source::{RawSourceCreationConfig, SourceMessage};
91
92mod replication;
93mod schemas;
94mod snapshot;
95mod statistics;
96
97impl SourceRender for MySqlSourceConnection {
98    type Time = GtidPartition;
99
100    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::MySql;
101
102    /// Render the ingestion dataflow. This function only connects things together and contains no
103    /// actual processing logic.
104    fn render<'scope>(
105        self,
106        scope: Scope<'scope, GtidPartition>,
107        config: &RawSourceCreationConfig,
108        resume_uppers: impl futures::Stream<Item = Antichain<GtidPartition>> + 'static,
109        _start_signal: impl std::future::Future<Output = ()> + 'static,
110    ) -> (
111        BTreeMap<
112            GlobalId,
113            StackedCollection<'scope, GtidPartition, Result<SourceMessage, DataflowError>>,
114        >,
115        StreamVec<'scope, GtidPartition, HealthStatusMessage>,
116        StreamVec<'scope, GtidPartition, Probe<GtidPartition>>,
117        Vec<PressOnDropButton>,
118    ) {
119        // Collect the source outputs that we will be exporting.
120        let mut source_outputs = Vec::new();
121        for (idx, (id, export)) in config.source_exports.iter().enumerate() {
122            let SourceExport {
123                details,
124                storage_metadata: _,
125                data_config: _,
126            } = export;
127            let details = match details {
128                SourceExportDetails::MySql(details) => details,
129                // This is an export that doesn't need any data output to it.
130                SourceExportDetails::None => continue,
131                _ => panic!("unexpected source export details: {:?}", details),
132            };
133
134            let desc = details.table.clone();
135            let initial_gtid_set = details.initial_gtid_set.to_string();
136            let resume_upper = Antichain::from_iter(
137                config
138                    .source_resume_uppers
139                    .get(id)
140                    .expect("missing resume upper")
141                    .iter()
142                    .map(GtidPartition::decode_row),
143            );
144            let name = MySqlTableName::new(&desc.schema_name, &desc.name);
145            source_outputs.push(SourceOutputInfo {
146                output_index: idx,
147                table_name: name.clone(),
148                desc,
149                text_columns: details.text_columns.clone(),
150                exclude_columns: details.exclude_columns.clone(),
151                initial_gtid_set: gtid_set_frontier(&initial_gtid_set).expect("invalid gtid set"),
152                resume_upper,
153                export_id: id.clone(),
154                binlog_full_metadata: details.binlog_full_metadata,
155            });
156        }
157
158        let metrics = config.metrics.get_mysql_source_metrics(config.id);
159
160        let (snapshot_updates, rewinds, snapshot_err, snapshot_token) = snapshot::render(
161            scope.clone(),
162            config.clone(),
163            self.clone(),
164            source_outputs.clone(),
165            metrics.snapshot_metrics.clone(),
166        );
167
168        let (repl_updates, repl_err, repl_token) = replication::render(
169            scope.clone(),
170            config.clone(),
171            self.clone(),
172            source_outputs,
173            rewinds,
174            metrics,
175        );
176
177        let (stats_err, probe_stream, stats_token) = statistics::render(
178            scope.clone(),
179            config.clone(),
180            self,
181            resume_uppers,
182            snapshot_err.clone().concat(repl_err.clone()),
183        );
184
185        let updates = snapshot_updates.concat(repl_updates);
186        let partition_count = u64::cast_from(config.source_exports.len());
187        let data_streams: Vec<_> = updates
188            .inner
189            .partition::<CapacityContainerBuilder<_>, _, _>(
190                partition_count,
191                |((output, data), time, diff): (
192                    (usize, Result<SourceMessage, DataflowError>),
193                    _,
194                    Diff,
195                )| {
196                    let output = u64::cast_from(output);
197                    (output, (data, time, diff))
198                },
199            );
200        let mut data_collections = BTreeMap::new();
201        for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
202            data_collections.insert(*id, data_stream.as_collection());
203        }
204
205        let export_ids = config.source_exports.keys().copied();
206        let health_init = export_ids
207            .map(Some)
208            .chain(std::iter::once(None))
209            .map(|id| HealthStatusMessage {
210                id,
211                namespace: Self::STATUS_NAMESPACE,
212                update: HealthStatusUpdate::Running,
213            })
214            .collect::<Vec<_>>()
215            .to_stream(scope);
216
217        let health_errs = snapshot_err
218            .concat(repl_err)
219            .concat(stats_err)
220            .map(move |err| {
221                // This update will cause the dataflow to restart
222                let err_string = err.display_with_causes().to_string();
223                let update = HealthStatusUpdate::halting(err_string.clone(), None);
224
225                let namespace = match err {
226                    ReplicationError::Transient(err)
227                        if matches!(&*err, TransientError::MySqlError(MySqlError::Ssh(_))) =>
228                    {
229                        StatusNamespace::Ssh
230                    }
231                    _ => Self::STATUS_NAMESPACE,
232                };
233
234                HealthStatusMessage {
235                    id: None,
236                    namespace: namespace.clone(),
237                    update,
238                }
239            });
240        let health = health_init.concat(health_errs);
241
242        (
243            data_collections,
244            health,
245            probe_stream,
246            vec![snapshot_token, repl_token, stats_token],
247        )
248    }
249}
250
251#[derive(Clone, Debug)]
252struct SourceOutputInfo {
253    output_index: usize,
254    table_name: MySqlTableName,
255    desc: MySqlTableDesc,
256    text_columns: Vec<String>,
257    exclude_columns: Vec<String>,
258    initial_gtid_set: Antichain<GtidPartition>,
259    resume_upper: Antichain<GtidPartition>,
260    export_id: GlobalId,
261    binlog_full_metadata: bool,
262}
263
264#[derive(Clone, Debug, thiserror::Error)]
265pub enum ReplicationError {
266    #[error(transparent)]
267    Transient(#[from] Rc<TransientError>),
268    #[error(transparent)]
269    Definite(#[from] Rc<DefiniteError>),
270}
271
272/// A transient error that never ends up in the collection of a specific table.
273#[derive(Debug, thiserror::Error)]
274pub enum TransientError {
275    #[error("couldn't decode binlog row")]
276    BinlogRowDecodeError(#[from] mysql_async::binlog::row::BinlogRowToRowError),
277    #[error("stream ended prematurely")]
278    ReplicationEOF,
279    #[error(transparent)]
280    IoError(#[from] io::Error),
281    #[error("sql client error")]
282    SQLClient(#[from] mysql_async::Error),
283    #[error("ident decode error")]
284    IdentError(#[from] mz_sql_parser::ast::IdentError),
285    #[error(transparent)]
286    MySqlError(#[from] MySqlError),
287    #[error(transparent)]
288    Generic(#[from] anyhow::Error),
289}
290
291/// A definite error that always ends up in the collection of a specific table.
292#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
293pub enum DefiniteError {
294    #[error("unable to decode: {0}")]
295    ValueDecodeError(String),
296    #[error("table was truncated: {0}")]
297    TableTruncated(String),
298    #[error("table was dropped: {0}")]
299    TableDropped(String),
300    #[error("incompatible schema change: {0}")]
301    IncompatibleSchema(String),
302    #[error("received a gtid set from the server that violates our requirements: {0}")]
303    UnsupportedGtidState(String),
304    #[error("received out of order gtids for source {0} at transaction-id {1}")]
305    BinlogGtidMonotonicityViolation(String, GtidState),
306    #[error("mysql server does not have the binlog available at the requested gtid set")]
307    BinlogNotAvailable,
308    #[error("mysql server binlog frontier at {0} is beyond required frontier {1}")]
309    BinlogMissingResumePoint(String, String),
310    #[error("mysql server configuration: {0}")]
311    ServerConfigurationError(String),
312}
313
314impl From<DefiniteError> for DataflowError {
315    fn from(err: DefiniteError) -> Self {
316        let m = err.to_string().into();
317        DataflowError::SourceError(Box::new(SourceError {
318            error: match &err {
319                DefiniteError::ValueDecodeError(_) => SourceErrorDetails::Other(m),
320                DefiniteError::TableTruncated(_) => SourceErrorDetails::Other(m),
321                DefiniteError::TableDropped(_) => SourceErrorDetails::Other(m),
322                DefiniteError::IncompatibleSchema(_) => SourceErrorDetails::Other(m),
323                DefiniteError::UnsupportedGtidState(_) => SourceErrorDetails::Other(m),
324                DefiniteError::BinlogGtidMonotonicityViolation(_, _) => {
325                    SourceErrorDetails::Other(m)
326                }
327                DefiniteError::BinlogNotAvailable => SourceErrorDetails::Initialization(m),
328                DefiniteError::BinlogMissingResumePoint(_, _) => {
329                    SourceErrorDetails::Initialization(m)
330                }
331                DefiniteError::ServerConfigurationError(_) => SourceErrorDetails::Initialization(m),
332            },
333        }))
334    }
335}
336
337/// A reference to a MySQL table. (schema_name, table_name)
338/// NOTE: We do not use `mz_sql_parser::ast:UnresolvedItemName` because the serialization
339/// behavior is not what we need for mysql.
340#[derive(
341    Debug,
342    Clone,
343    PartialEq,
344    Eq,
345    PartialOrd,
346    Ord,
347    Serialize,
348    Deserialize,
349    Hash
350)]
351pub(crate) struct MySqlTableName(pub(crate) String, pub(crate) String);
352
353impl MySqlTableName {
354    pub(crate) fn new(schema_name: &str, table_name: &str) -> Self {
355        Self(schema_name.to_string(), table_name.to_string())
356    }
357}
358
359impl fmt::Display for MySqlTableName {
360    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
361        write!(
362            f,
363            "{}.{}",
364            quote_identifier(&self.0),
365            quote_identifier(&self.1)
366        )
367    }
368}
369
370impl From<&MySqlTableDesc> for MySqlTableName {
371    fn from(desc: &MySqlTableDesc) -> Self {
372        Self::new(&desc.schema_name, &desc.name)
373    }
374}
375
376#[derive(Debug, Clone, Deserialize, Serialize)]
377pub(crate) struct RewindRequest {
378    /// The output index that should be rewound.
379    pub(crate) output_index: usize,
380    /// The frontier of GTIDs that this snapshot represents; all GTIDs that are not beyond this
381    /// frontier have been committed by the snapshot operator at timestamp 0.
382    pub(crate) snapshot_upper: Antichain<GtidPartition>,
383}
384
385type StackedAsyncOutputHandle<T, D> =
386    AsyncOutputHandle<T, FueledBuilder<CapacityContainerBuilder<Vec<(D, T, Diff)>>>>;
387
388async fn return_definite_error(
389    err: DefiniteError,
390    outputs: &[usize],
391    data_handle: &StackedAsyncOutputHandle<
392        GtidPartition,
393        (usize, Result<SourceMessage, DataflowError>),
394    >,
395    data_cap_set: &CapabilitySet<GtidPartition>,
396    definite_error_handle: &AsyncOutputHandle<
397        GtidPartition,
398        CapacityContainerBuilder<Vec<ReplicationError>>,
399    >,
400    definite_error_cap_set: &CapabilitySet<GtidPartition>,
401) {
402    tracing::warn!("Returning definite error: {err}");
403    for output_index in outputs {
404        let update = (
405            (*output_index, Err(err.clone().into())),
406            GtidPartition::new_range(Uuid::minimum(), Uuid::maximum(), GtidState::MAX),
407            Diff::ONE,
408        );
409        let size = update.fuel_size();
410        data_handle
411            .give_fueled(&data_cap_set[0], update, size)
412            .await;
413    }
414    definite_error_handle.give(
415        &definite_error_cap_set[0],
416        ReplicationError::Definite(Rc::new(err)),
417    );
418    ()
419}
420
421async fn validate_mysql_repl_settings(conn: &mut mysql_async::Conn) -> Result<(), MySqlError> {
422    ensure_gtid_consistency(conn).await?;
423    ensure_full_row_binlog_format(conn).await?;
424    ensure_replication_commit_order(conn).await?;
425
426    Ok(())
427}