mz_storage/source/mysql.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Code to render the ingestion dataflow of a [`MySqlSourceConnection`].
//!
//! This dataflow is split into Snapshot and Replication operators.
//!
//! # Snapshot
//!
//! The snapshot operator is responsible for taking a consistent snapshot of the tables involved
//! in the ingestion from the MySQL server. Each table being ingested is assigned to a specific
//! worker, which performs a `SELECT * FROM table` and emits updates for all the rows in the
//! given table.
//!
//! For all tables that are snapshotted, the snapshot operator also emits a rewind request to
//! the replication operator containing the GTID-set based frontier, which will be used to
//! ensure that the requested portion of the replication stream is subtracted from the snapshot.
//!
//! See the [snapshot] module for more information.
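//!
//! As a purely hypothetical sketch (the actual assignment logic lives in the
//! [snapshot] module), distributing tables across workers could look like:
//!
//! ```ignore
//! use std::hash::{Hash, Hasher};
//!
//! // Deterministically pick a worker for a table by hashing its name.
//! fn worker_for_table(table: &MySqlTableName, worker_count: usize) -> usize {
//!     let mut hasher = std::collections::hash_map::DefaultHasher::new();
//!     table.hash(&mut hasher);
//!     (hasher.finish() as usize) % worker_count
//! }
//! ```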
//!
//! # Replication
//!
//! The replication operator is responsible for ingesting the MySQL replication stream, which
//! must happen from a single worker.
//!
//! See the [replication] module for more information.
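//!
//! A minimal sketch of that single-worker gating (assuming a `worker_id` field
//! on the config; the real logic lives in the [replication] module):
//!
//! ```ignore
//! // Hypothetical: only one worker reads the replication stream; all other
//! // workers produce no output for this operator.
//! if config.worker_id != 0 {
//!     return;
//! }
//! ```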
//!
//! # Error handling
//!
//! There are two kinds of errors that can happen during ingestion, represented as two separate
//! error types:
//!
//! [`DefiniteError`]s are errors that happen during processing of a specific collection record.
//! These are the only errors that can ever end up in the error collection of a subsource.
//!
//! [`TransientError`]s are any errors that can happen for reasons that are unrelated to the data
//! itself. This could be authentication failures, connection failures, etc. The only operators
//! that can emit such errors are the `MySqlReplicationReader` and the `MySqlSnapshotReader`
//! operators, which are the ones that talk to the external world. Both of these operators are
//! built with the `AsyncOperatorBuilder::build_fallible` method, which allows transient errors
//! to be propagated upwards with the standard `?` operator without risking downgrading the
//! capability and producing bogus frontiers.
//!
//! The error streams from both of those operators are published to the source status and also
//! trigger a restart of the dataflow.
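//!
//! For illustration, a definite error is attributed to a collection by
//! converting it into a [`DataflowError`] via the `From` impl defined later
//! in this module:
//!
//! ```ignore
//! // `TableDropped` carries the name of the dropped table.
//! let err: DataflowError = DefiniteError::TableDropped("db.t".to_string()).into();
//! ```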

use std::collections::BTreeMap;
use std::convert::Infallible;
use std::fmt;
use std::io;
use std::rc::Rc;

use differential_dataflow::AsCollection;
use differential_dataflow::containers::TimelyStack;
use itertools::Itertools;
use mz_mysql_util::quote_identifier;
use mz_ore::cast::CastFrom;
use mz_repr::Diff;
use mz_repr::GlobalId;
use mz_storage_types::errors::{DataflowError, SourceError};
use mz_storage_types::sources::SourceExport;
use mz_timely_util::containers::stack::AccountedStackBuilder;
use serde::{Deserialize, Serialize};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pushers::Tee;
use timely::dataflow::operators::core::Partition;
use timely::dataflow::operators::{CapabilitySet, Concat, Map, ToStream};
use timely::dataflow::{Scope, Stream};
use timely::progress::Antichain;
use uuid::Uuid;

use mz_mysql_util::{
    MySqlError, MySqlTableDesc, ensure_full_row_binlog_format, ensure_gtid_consistency,
    ensure_replication_commit_order,
};
use mz_ore::error::ErrorExt;
use mz_storage_types::errors::SourceErrorDetails;
use mz_storage_types::sources::mysql::{GtidPartition, GtidState, gtid_set_frontier};
use mz_storage_types::sources::{MySqlSourceConnection, SourceExportDetails, SourceTimestamp};
use mz_timely_util::builder_async::{AsyncOutputHandle, PressOnDropButton};
use mz_timely_util::order::Extrema;

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::source::types::Probe;
use crate::source::types::{ProgressStatisticsUpdate, SourceRender, StackedCollection};
use crate::source::{RawSourceCreationConfig, SourceMessage};

mod replication;
mod schemas;
mod snapshot;
mod statistics;

impl SourceRender for MySqlSourceConnection {
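    // Progress for this source is tracked with frontiers of GTID partitions
    // rather than wall-clock timestamps.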
    type Time = GtidPartition;

    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::MySql;

    /// Render the ingestion dataflow. This function only connects things together and contains no
    /// actual processing logic.
    fn render<G: Scope<Timestamp = GtidPartition>>(
        self,
        scope: &mut G,
        config: &RawSourceCreationConfig,
        resume_uppers: impl futures::Stream<Item = Antichain<GtidPartition>> + 'static,
        _start_signal: impl std::future::Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
        Stream<G, Infallible>,
        Stream<G, HealthStatusMessage>,
        Stream<G, ProgressStatisticsUpdate>,
        Option<Stream<G, Probe<GtidPartition>>>,
        Vec<PressOnDropButton>,
    ) {
        // Collect the source outputs that we will be exporting.
        let mut source_outputs = Vec::new();
        for (idx, (id, export)) in config.source_exports.iter().enumerate() {
            let SourceExport {
                details,
                storage_metadata: _,
                data_config: _,
            } = export;
            let details = match details {
                SourceExportDetails::MySql(details) => details,
                // This is an export that doesn't need any data output to it.
                SourceExportDetails::None => continue,
                _ => panic!("unexpected source export details: {:?}", details),
            };

            let desc = details.table.clone();
            let initial_gtid_set = details.initial_gtid_set.to_string();
            let resume_upper = Antichain::from_iter(
                config
                    .source_resume_uppers
                    .get(id)
                    .expect("missing resume upper")
                    .iter()
                    .map(GtidPartition::decode_row),
            );
            let name = MySqlTableName::new(&desc.schema_name, &desc.name);
            source_outputs.push(SourceOutputInfo {
                output_index: idx,
                table_name: name.clone(),
                desc,
                text_columns: details.text_columns.clone(),
                exclude_columns: details.exclude_columns.clone(),
                initial_gtid_set: gtid_set_frontier(&initial_gtid_set).expect("invalid gtid set"),
                resume_upper,
            });
        }

        let metrics = config.metrics.get_mysql_source_metrics(config.id);

        let (snapshot_updates, rewinds, snapshot_stats, snapshot_err, snapshot_token) =
            snapshot::render(
                scope.clone(),
                config.clone(),
                self.clone(),
                source_outputs.clone(),
                metrics.snapshot_metrics.clone(),
            );

        let (repl_updates, uppers, repl_err, repl_token) = replication::render(
            scope.clone(),
            config.clone(),
            self.clone(),
            source_outputs,
            &rewinds,
            metrics,
        );

        let (stats_stream, stats_err, probe_stream, stats_token) =
            statistics::render(scope.clone(), config.clone(), self, resume_uppers);

        let stats_stream = stats_stream.concat(&snapshot_stats);

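        // Demultiplex the merged snapshot and replication stream into one
        // stream per source export, keyed by each update's output index.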
        let updates = snapshot_updates.concat(&repl_updates);
        let partition_count = u64::cast_from(config.source_exports.len());
        let data_streams: Vec<_> = updates
            .inner
            .partition::<CapacityContainerBuilder<_>, _, _>(
                partition_count,
                |((output, data), time, diff): &(
                    (usize, Result<SourceMessage, DataflowError>),
                    _,
                    Diff,
                )| {
                    let output = u64::cast_from(*output);
                    (output, (data.clone(), time.clone(), diff.clone()))
                },
            );
        let mut data_collections = BTreeMap::new();
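        // `zip_eq` panics if the number of partitioned streams does not match
        // the number of source exports, which would indicate a bug above.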
        for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
            data_collections.insert(*id, data_stream.as_collection());
        }

        let health_init = std::iter::once(HealthStatusMessage {
            id: None,
            namespace: Self::STATUS_NAMESPACE,
            update: HealthStatusUpdate::Running,
        })
        .to_stream(scope);

        let health_errs = snapshot_err
            .concat(&repl_err)
            .concat(&stats_err)
            .map(move |err| {
                // This update will cause the dataflow to restart.
                let err_string = err.display_with_causes().to_string();
                let update = HealthStatusUpdate::halting(err_string.clone(), None);

                let namespace = match err {
                    ReplicationError::Transient(err)
                        if matches!(&*err, TransientError::MySqlError(MySqlError::Ssh(_))) =>
                    {
                        StatusNamespace::Ssh
                    }
                    _ => Self::STATUS_NAMESPACE,
                };

                HealthStatusMessage {
                    id: None,
                    namespace: namespace.clone(),
                    update,
                }
            });
        let health = health_init.concat(&health_errs);

        (
            data_collections,
            uppers,
            health,
            stats_stream,
            Some(probe_stream),
            vec![snapshot_token, repl_token, stats_token],
        )
    }
}

#[derive(Clone, Debug)]
struct SourceOutputInfo {
    output_index: usize,
    table_name: MySqlTableName,
    desc: MySqlTableDesc,
    text_columns: Vec<String>,
    exclude_columns: Vec<String>,
    initial_gtid_set: Antichain<GtidPartition>,
    resume_upper: Antichain<GtidPartition>,
}

#[derive(Clone, Debug, thiserror::Error)]
pub enum ReplicationError {
    #[error(transparent)]
    Transient(#[from] Rc<TransientError>),
    #[error(transparent)]
    Definite(#[from] Rc<DefiniteError>),
}

/// A transient error that never ends up in the collection of a specific table.
#[derive(Debug, thiserror::Error)]
pub enum TransientError {
    #[error("couldn't decode binlog row")]
    BinlogRowDecodeError(#[from] mysql_async::binlog::row::BinlogRowToRowError),
    #[error("stream ended prematurely")]
    ReplicationEOF,
    #[error(transparent)]
    IoError(#[from] io::Error),
    #[error("sql client error")]
    SQLClient(#[from] mysql_async::Error),
    #[error("ident decode error")]
    IdentError(#[from] mz_sql_parser::ast::IdentError),
    #[error(transparent)]
    MySqlError(#[from] MySqlError),
    #[error(transparent)]
    Generic(#[from] anyhow::Error),
}

/// A definite error that always ends up in the collection of a specific table.
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
pub enum DefiniteError {
    #[error("unable to decode: {0}")]
    ValueDecodeError(String),
    #[error("table was truncated: {0}")]
    TableTruncated(String),
    #[error("table was dropped: {0}")]
    TableDropped(String),
    #[error("incompatible schema change: {0}")]
    IncompatibleSchema(String),
    #[error("received a gtid set from the server that violates our requirements: {0}")]
    UnsupportedGtidState(String),
    #[error("received out of order gtids for source {0} at transaction-id {1}")]
    BinlogGtidMonotonicityViolation(String, GtidState),
    #[error("mysql server does not have the binlog available at the requested gtid set")]
    BinlogNotAvailable,
    #[error("mysql server binlog frontier at {0} is beyond required frontier {1}")]
    BinlogMissingResumePoint(String, String),
    #[error("mysql server configuration: {0}")]
    ServerConfigurationError(String),
}

impl From<DefiniteError> for DataflowError {
    fn from(err: DefiniteError) -> Self {
        let m = err.to_string().into();
        DataflowError::SourceError(Box::new(SourceError {
            error: match &err {
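                // Errors tied to individual records map to `Other`, while
                // errors that prevent the source from starting up at all map
                // to `Initialization`.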
                DefiniteError::ValueDecodeError(_) => SourceErrorDetails::Other(m),
                DefiniteError::TableTruncated(_) => SourceErrorDetails::Other(m),
                DefiniteError::TableDropped(_) => SourceErrorDetails::Other(m),
                DefiniteError::IncompatibleSchema(_) => SourceErrorDetails::Other(m),
                DefiniteError::UnsupportedGtidState(_) => SourceErrorDetails::Other(m),
                DefiniteError::BinlogGtidMonotonicityViolation(_, _) => {
                    SourceErrorDetails::Other(m)
                }
                DefiniteError::BinlogNotAvailable => SourceErrorDetails::Initialization(m),
                DefiniteError::BinlogMissingResumePoint(_, _) => {
                    SourceErrorDetails::Initialization(m)
                }
                DefiniteError::ServerConfigurationError(_) => SourceErrorDetails::Initialization(m),
            },
        }))
    }
}

/// A reference to a MySQL table. (schema_name, table_name)
/// NOTE: We do not use `mz_sql_parser::ast::UnresolvedItemName` because the serialization
/// behavior is not what we need for mysql.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Hash)]
pub(crate) struct MySqlTableName(pub(crate) String, pub(crate) String);

impl MySqlTableName {
    pub(crate) fn new(schema_name: &str, table_name: &str) -> Self {
        Self(schema_name.to_string(), table_name.to_string())
    }
}

impl fmt::Display for MySqlTableName {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "{}.{}",
            quote_identifier(&self.0),
            quote_identifier(&self.1)
        )
    }
}

impl From<&MySqlTableDesc> for MySqlTableName {
    fn from(desc: &MySqlTableDesc) -> Self {
        Self::new(&desc.schema_name, &desc.name)
    }
}

#[derive(Debug, Clone, Deserialize, Serialize)]
pub(crate) struct RewindRequest {
    /// The output index that should be rewound.
    pub(crate) output_index: usize,
    /// The frontier of GTIDs that this snapshot represents; all GTIDs that are not beyond this
    /// frontier have been committed by the snapshot operator at timestamp 0.
    pub(crate) snapshot_upper: Antichain<GtidPartition>,
}

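/// An async output handle that emits `(data, time, diff)` updates into
/// columnar [`TimelyStack`] containers with memory-accounted batching.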
type StackedAsyncOutputHandle<T, D> = AsyncOutputHandle<
    T,
    AccountedStackBuilder<CapacityContainerBuilder<TimelyStack<(D, T, Diff)>>>,
    Tee<T, TimelyStack<(D, T, Diff)>>,
>;

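/// Emits the given definite error to the data output of every export in
/// `outputs` at the maximum representable GTID timestamp, and publishes it on
/// the definite error stream, which restarts the dataflow.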
async fn return_definite_error(
    err: DefiniteError,
    outputs: &[usize],
    data_handle: &StackedAsyncOutputHandle<
        GtidPartition,
        (usize, Result<SourceMessage, DataflowError>),
    >,
    data_cap_set: &CapabilitySet<GtidPartition>,
    definite_error_handle: &AsyncOutputHandle<
        GtidPartition,
        CapacityContainerBuilder<Vec<ReplicationError>>,
        Tee<GtidPartition, Vec<ReplicationError>>,
    >,
    definite_error_cap_set: &CapabilitySet<GtidPartition>,
) {
    for output_index in outputs {
        let update = (
            (*output_index, Err(err.clone().into())),
            GtidPartition::new_range(Uuid::minimum(), Uuid::maximum(), GtidState::MAX),
            Diff::ONE,
        );
        data_handle.give_fueled(&data_cap_set[0], update).await;
    }
    definite_error_handle.give(
        &definite_error_cap_set[0],
        ReplicationError::Definite(Rc::new(err)),
    );
}

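/// Validates that the upstream MySQL server is configured as required for
/// replication: GTID consistency, full row binlog format, and preserved
/// replication commit order.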
async fn validate_mysql_repl_settings(conn: &mut mysql_async::Conn) -> Result<(), MySqlError> {
    ensure_gtid_consistency(conn).await?;
    ensure_full_row_binlog_format(conn).await?;
    ensure_replication_commit_order(conn).await?;

    Ok(())
}