1use std::collections::BTreeMap;
54use std::convert::Infallible;
55use std::fmt;
56use std::io;
57use std::rc::Rc;
58
59use differential_dataflow::AsCollection;
60use differential_dataflow::containers::TimelyStack;
61use itertools::Itertools;
62use mz_mysql_util::quote_identifier;
63use mz_ore::cast::CastFrom;
64use mz_repr::Diff;
65use mz_repr::GlobalId;
66use mz_storage_types::errors::{DataflowError, SourceError};
67use mz_storage_types::sources::SourceExport;
68use mz_timely_util::containers::stack::AccountedStackBuilder;
69use serde::{Deserialize, Serialize};
70use timely::container::CapacityContainerBuilder;
71use timely::dataflow::channels::pushers::Tee;
72use timely::dataflow::operators::core::Partition;
73use timely::dataflow::operators::{CapabilitySet, Concat, Map, ToStream};
74use timely::dataflow::{Scope, Stream};
75use timely::progress::Antichain;
76use uuid::Uuid;
77
78use mz_mysql_util::{
79    MySqlError, MySqlTableDesc, ensure_full_row_binlog_format, ensure_gtid_consistency,
80    ensure_replication_commit_order,
81};
82use mz_ore::error::ErrorExt;
83use mz_storage_types::errors::SourceErrorDetails;
84use mz_storage_types::sources::mysql::{GtidPartition, GtidState, gtid_set_frontier};
85use mz_storage_types::sources::{MySqlSourceConnection, SourceExportDetails, SourceTimestamp};
86use mz_timely_util::builder_async::{AsyncOutputHandle, PressOnDropButton};
87use mz_timely_util::order::Extrema;
88
89use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
90use crate::source::types::Probe;
91use crate::source::types::{SourceRender, StackedCollection};
92use crate::source::{RawSourceCreationConfig, SourceMessage};
93
94mod replication;
95mod schemas;
96mod snapshot;
97mod statistics;
98
99impl SourceRender for MySqlSourceConnection {
100    type Time = GtidPartition;
101
102    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::MySql;
103
104    fn render<G: Scope<Timestamp = GtidPartition>>(
107        self,
108        scope: &mut G,
109        config: &RawSourceCreationConfig,
110        resume_uppers: impl futures::Stream<Item = Antichain<GtidPartition>> + 'static,
111        _start_signal: impl std::future::Future<Output = ()> + 'static,
112    ) -> (
113        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
114        Stream<G, Infallible>,
115        Stream<G, HealthStatusMessage>,
116        Option<Stream<G, Probe<GtidPartition>>>,
117        Vec<PressOnDropButton>,
118    ) {
119        let mut source_outputs = Vec::new();
121        for (idx, (id, export)) in config.source_exports.iter().enumerate() {
122            let SourceExport {
123                details,
124                storage_metadata: _,
125                data_config: _,
126            } = export;
127            let details = match details {
128                SourceExportDetails::MySql(details) => details,
129                SourceExportDetails::None => continue,
131                _ => panic!("unexpected source export details: {:?}", details),
132            };
133
134            let desc = details.table.clone();
135            let initial_gtid_set = details.initial_gtid_set.to_string();
136            let resume_upper = Antichain::from_iter(
137                config
138                    .source_resume_uppers
139                    .get(id)
140                    .expect("missing resume upper")
141                    .iter()
142                    .map(GtidPartition::decode_row),
143            );
144            let name = MySqlTableName::new(&desc.schema_name, &desc.name);
145            source_outputs.push(SourceOutputInfo {
146                output_index: idx,
147                table_name: name.clone(),
148                desc,
149                text_columns: details.text_columns.clone(),
150                exclude_columns: details.exclude_columns.clone(),
151                initial_gtid_set: gtid_set_frontier(&initial_gtid_set).expect("invalid gtid set"),
152                resume_upper,
153                export_id: id.clone(),
154            });
155        }
156
157        let metrics = config.metrics.get_mysql_source_metrics(config.id);
158
159        let (snapshot_updates, rewinds, snapshot_err, snapshot_token) = snapshot::render(
160            scope.clone(),
161            config.clone(),
162            self.clone(),
163            source_outputs.clone(),
164            metrics.snapshot_metrics.clone(),
165        );
166
167        let (repl_updates, uppers, repl_err, repl_token) = replication::render(
168            scope.clone(),
169            config.clone(),
170            self.clone(),
171            source_outputs,
172            &rewinds,
173            metrics,
174        );
175
176        let (stats_err, probe_stream, stats_token) =
177            statistics::render(scope.clone(), config.clone(), self, resume_uppers);
178
179        let updates = snapshot_updates.concat(&repl_updates);
180        let partition_count = u64::cast_from(config.source_exports.len());
181        let data_streams: Vec<_> = updates
182            .inner
183            .partition::<CapacityContainerBuilder<_>, _, _>(
184                partition_count,
185                |((output, data), time, diff): &(
186                    (usize, Result<SourceMessage, DataflowError>),
187                    _,
188                    Diff,
189                )| {
190                    let output = u64::cast_from(*output);
191                    (output, (data.clone(), time.clone(), diff.clone()))
192                },
193            );
194        let mut data_collections = BTreeMap::new();
195        for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
196            data_collections.insert(*id, data_stream.as_collection());
197        }
198
199        let export_ids = config.source_exports.keys().copied();
200        let health_init = export_ids
201            .map(Some)
202            .chain(std::iter::once(None))
203            .map(|id| HealthStatusMessage {
204                id,
205                namespace: Self::STATUS_NAMESPACE,
206                update: HealthStatusUpdate::Running,
207            })
208            .collect::<Vec<_>>()
209            .to_stream(scope);
210
211        let health_errs = snapshot_err
212            .concat(&repl_err)
213            .concat(&stats_err)
214            .map(move |err| {
215                let err_string = err.display_with_causes().to_string();
217                let update = HealthStatusUpdate::halting(err_string.clone(), None);
218
219                let namespace = match err {
220                    ReplicationError::Transient(err)
221                        if matches!(&*err, TransientError::MySqlError(MySqlError::Ssh(_))) =>
222                    {
223                        StatusNamespace::Ssh
224                    }
225                    _ => Self::STATUS_NAMESPACE,
226                };
227
228                HealthStatusMessage {
229                    id: None,
230                    namespace: namespace.clone(),
231                    update,
232                }
233            });
234        let health = health_init.concat(&health_errs);
235
236        (
237            data_collections,
238            uppers,
239            health,
240            Some(probe_stream),
241            vec![snapshot_token, repl_token, stats_token],
242        )
243    }
244}
245
246#[derive(Clone, Debug)]
247struct SourceOutputInfo {
248    output_index: usize,
249    table_name: MySqlTableName,
250    desc: MySqlTableDesc,
251    text_columns: Vec<String>,
252    exclude_columns: Vec<String>,
253    initial_gtid_set: Antichain<GtidPartition>,
254    resume_upper: Antichain<GtidPartition>,
255    export_id: GlobalId,
256}
257
258#[derive(Clone, Debug, thiserror::Error)]
259pub enum ReplicationError {
260    #[error(transparent)]
261    Transient(#[from] Rc<TransientError>),
262    #[error(transparent)]
263    Definite(#[from] Rc<DefiniteError>),
264}
265
266#[derive(Debug, thiserror::Error)]
268pub enum TransientError {
269    #[error("couldn't decode binlog row")]
270    BinlogRowDecodeError(#[from] mysql_async::binlog::row::BinlogRowToRowError),
271    #[error("stream ended prematurely")]
272    ReplicationEOF,
273    #[error(transparent)]
274    IoError(#[from] io::Error),
275    #[error("sql client error")]
276    SQLClient(#[from] mysql_async::Error),
277    #[error("ident decode error")]
278    IdentError(#[from] mz_sql_parser::ast::IdentError),
279    #[error(transparent)]
280    MySqlError(#[from] MySqlError),
281    #[error(transparent)]
282    Generic(#[from] anyhow::Error),
283}
284
285#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
287pub enum DefiniteError {
288    #[error("unable to decode: {0}")]
289    ValueDecodeError(String),
290    #[error("table was truncated: {0}")]
291    TableTruncated(String),
292    #[error("table was dropped: {0}")]
293    TableDropped(String),
294    #[error("incompatible schema change: {0}")]
295    IncompatibleSchema(String),
296    #[error("received a gtid set from the server that violates our requirements: {0}")]
297    UnsupportedGtidState(String),
298    #[error("received out of order gtids for source {0} at transaction-id {1}")]
299    BinlogGtidMonotonicityViolation(String, GtidState),
300    #[error("mysql server does not have the binlog available at the requested gtid set")]
301    BinlogNotAvailable,
302    #[error("mysql server binlog frontier at {0} is beyond required frontier {1}")]
303    BinlogMissingResumePoint(String, String),
304    #[error("mysql server configuration: {0}")]
305    ServerConfigurationError(String),
306}
307
308impl From<DefiniteError> for DataflowError {
309    fn from(err: DefiniteError) -> Self {
310        let m = err.to_string().into();
311        DataflowError::SourceError(Box::new(SourceError {
312            error: match &err {
313                DefiniteError::ValueDecodeError(_) => SourceErrorDetails::Other(m),
314                DefiniteError::TableTruncated(_) => SourceErrorDetails::Other(m),
315                DefiniteError::TableDropped(_) => SourceErrorDetails::Other(m),
316                DefiniteError::IncompatibleSchema(_) => SourceErrorDetails::Other(m),
317                DefiniteError::UnsupportedGtidState(_) => SourceErrorDetails::Other(m),
318                DefiniteError::BinlogGtidMonotonicityViolation(_, _) => {
319                    SourceErrorDetails::Other(m)
320                }
321                DefiniteError::BinlogNotAvailable => SourceErrorDetails::Initialization(m),
322                DefiniteError::BinlogMissingResumePoint(_, _) => {
323                    SourceErrorDetails::Initialization(m)
324                }
325                DefiniteError::ServerConfigurationError(_) => SourceErrorDetails::Initialization(m),
326            },
327        }))
328    }
329}
330
331#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Hash)]
335pub(crate) struct MySqlTableName(pub(crate) String, pub(crate) String);
336
337impl MySqlTableName {
338    pub(crate) fn new(schema_name: &str, table_name: &str) -> Self {
339        Self(schema_name.to_string(), table_name.to_string())
340    }
341}
342
343impl fmt::Display for MySqlTableName {
344    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
345        write!(
346            f,
347            "{}.{}",
348            quote_identifier(&self.0),
349            quote_identifier(&self.1)
350        )
351    }
352}
353
354impl From<&MySqlTableDesc> for MySqlTableName {
355    fn from(desc: &MySqlTableDesc) -> Self {
356        Self::new(&desc.schema_name, &desc.name)
357    }
358}
359
360#[derive(Debug, Clone, Deserialize, Serialize)]
361pub(crate) struct RewindRequest {
362    pub(crate) output_index: usize,
364    pub(crate) snapshot_upper: Antichain<GtidPartition>,
367}
368
369type StackedAsyncOutputHandle<T, D> = AsyncOutputHandle<
370    T,
371    AccountedStackBuilder<CapacityContainerBuilder<TimelyStack<(D, T, Diff)>>>,
372    Tee<T, TimelyStack<(D, T, Diff)>>,
373>;
374
375async fn return_definite_error(
376    err: DefiniteError,
377    outputs: &[usize],
378    data_handle: &StackedAsyncOutputHandle<
379        GtidPartition,
380        (usize, Result<SourceMessage, DataflowError>),
381    >,
382    data_cap_set: &CapabilitySet<GtidPartition>,
383    definite_error_handle: &AsyncOutputHandle<
384        GtidPartition,
385        CapacityContainerBuilder<Vec<ReplicationError>>,
386        Tee<GtidPartition, Vec<ReplicationError>>,
387    >,
388    definite_error_cap_set: &CapabilitySet<GtidPartition>,
389) {
390    for output_index in outputs {
391        let update = (
392            (*output_index, Err(err.clone().into())),
393            GtidPartition::new_range(Uuid::minimum(), Uuid::maximum(), GtidState::MAX),
394            Diff::ONE,
395        );
396        data_handle.give_fueled(&data_cap_set[0], update).await;
397    }
398    definite_error_handle.give(
399        &definite_error_cap_set[0],
400        ReplicationError::Definite(Rc::new(err)),
401    );
402    ()
403}
404
405async fn validate_mysql_repl_settings(conn: &mut mysql_async::Conn) -> Result<(), MySqlError> {
406    ensure_gtid_consistency(conn).await?;
407    ensure_full_row_binlog_format(conn).await?;
408    ensure_replication_commit_order(conn).await?;
409
410    Ok(())
411}