mz_storage/source/mysql/
snapshot.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Renders the table snapshot side of the [`MySqlSourceConnection`] dataflow.
//!
//! # Snapshot reading
//!
//! Depending on the `resume_upper` of each entry in `source_outputs`, this dataflow decides which
//! tables to snapshot and performs a simple `SELECT` of each table's columns in order to get a
//! snapshot. There are a few subtle points about this operation, described below.
//!
//! It is crucial for correctness that we always perform the snapshot of all tables at a specific
//! point in time. This must be true even in the presence of restarts or partially committed
//! snapshots. The consistent point that the snapshot must happen at is discovered and durably
//! recorded during planning of the source and is exposed to this ingestion dataflow via the
//! `initial_gtid_set` field in `MySqlSourceDetails`.
//!
//! Unfortunately MySQL does not provide an API to perform a transaction at a specific point in
//! time. Instead, MySQL allows us to perform a snapshot of a table and lets us know at which point
//! in time the snapshot was taken. Using this information we can take a snapshot at an arbitrary
//! point in time and then "rewind" it to the desired `initial_gtid_set`. These two phases are
//! described in the following sections.
//!
//! ## Producing a snapshot at a known point in time.
//!
//! Ideally we would like to start a transaction and ask MySQL to tell us the point in time this
//! transaction is running at. As far as we know there is no such API, so we achieve this using
//! table locks instead.
//!
//! The full set of tables that are meant to be snapshotted are partitioned among the workers. Each
//! worker initiates a connection to the server and acquires a table lock on all the tables that
//! have been assigned to it. By doing so we establish a moment in time where we know no writes are
//! happening to the tables we are interested in. After the locks are taken each worker reads the
//! current upper frontier (`snapshot_upper`) using the `@@gtid_executed` system variable. This
//! frontier establishes an upper bound on any possible write to the tables of interest until the
//! lock is released.
//!
//! Each worker now starts a transaction via a new connection with 'REPEATABLE READ' and
//! 'CONSISTENT SNAPSHOT' semantics. Due to linearizability we know that this transaction's view of
//! the database must be at some time `t_snapshot` such that `snapshot_upper <= t_snapshot`. We
//! don't actually know the exact value of `t_snapshot` and it might be strictly greater than
//! `snapshot_upper`. However, because this transaction will only be used to read the locked tables
//! and we know that `snapshot_upper` is an upper bound on all the writes that have happened to
//! them we can safely pretend that the transaction's `t_snapshot` is *equal* to `snapshot_upper`.
//! We have therefore succeeded in starting a transaction at a known point in time!
//!
//! At this point it is safe for each worker to unlock the tables, since the transaction has
//! established a point in time, and close the initial connection. Each worker can then read the
//! snapshot of the tables it is responsible for and publish it downstream.
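//!
//! The sequence above can be sketched as the SQL each worker sends, in order. This is only an
//! illustration: `snapshot_statements` and the connection labels are hypothetical names, and the
//! real operator starts the transaction through `mysql_async`'s `TxOpts`, so the exact statements
//! on the wire may differ.
//!
//! ```rust
//! /// Returns `(connection, statement)` pairs in the order a worker would issue them.
//! /// `quoted_tables` is assumed to hold table names that are already quoted.
//! fn snapshot_statements(quoted_tables: &[&str]) -> Vec<(&'static str, String)> {
//!     let lock_clauses = quoted_tables
//!         .iter()
//!         .map(|t| format!("{t} READ"))
//!         .collect::<Vec<_>>()
//!         .join(", ");
//!     vec![
//!         // 1. Block writes to the assigned tables.
//!         ("lock_conn", format!("LOCK TABLES {lock_clauses}")),
//!         // 2. Read `snapshot_upper` while the locks are held.
//!         ("lock_conn", "SELECT @@global.gtid_executed".to_string()),
//!         // 3. Open the snapshot transaction on a second connection.
//!         ("snapshot_conn", "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ".to_string()),
//!         ("snapshot_conn", "START TRANSACTION WITH CONSISTENT SNAPSHOT, READ ONLY".to_string()),
//!         // 4. The transaction pins the view, so the locks can be released.
//!         ("lock_conn", "UNLOCK TABLES".to_string()),
//!     ]
//! }
//! ```
//!
//! The ordering is the point of the sketch: the GTID read and the start of the transaction both
//! happen between `LOCK TABLES` and `UNLOCK TABLES`.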
//!
//! TODO: Other software products hold the table lock for the duration of the snapshot, and some do
//! not. We should figure out why and if we need to hold the lock longer. This may be because of a
//! difference in how REPEATABLE READ works in some MySQL-compatible systems (e.g. Aurora MySQL).
//!
//! ## Rewinding the snapshot to a specific point in time.
//!
//! Having obtained a snapshot of a table at some `snapshot_upper` we are now tasked with
//! transforming this snapshot into one at `initial_gtid_set`. In other words we have produced a
//! snapshot containing all updates that happened at `t: !(snapshot_upper <= t)` but what we
//! actually want is a snapshot containing all updates that happened at
//! `t: !(initial_gtid_set <= t)`.
//!
//! If we assume that `initial_gtid_set <= snapshot_upper`, which is a fair assumption since the
//! former is obtained before the latter, then we can observe that the snapshot we produced
//! contains all updates at `t: !(initial_gtid_set <= t)` (i.e. the snapshot we want) and some
//! additional unwanted updates at `t: initial_gtid_set <= t && !(snapshot_upper <= t)`. We happen
//! to know exactly what those additional unwanted updates are because they will be obtained by
//! reading the replication stream in the replication operator, so all we need to do to "rewind"
//! our `snapshot_upper` snapshot to `initial_gtid_set` is to ask the replication operator to
//! "undo" any updates that fall in the undesirable region.
//!
//! This is exactly what `RewindRequest` is about. It informs the replication operator that a
//! particular table has been snapshotted at `snapshot_upper` and would like all the updates
//! discovered during replication that happen at
//! `t: initial_gtid_set <= t && !(snapshot_upper <= t)` to be cancelled. In Differential Dataflow
//! this is as simple as flipping the sign of the diff field.
//!
//! The snapshot reader emits updates at the minimum timestamp (by convention) to allow the
//! updates to be potentially negated by the replication operator, which will emit negated
//! updates at the minimum timestamp (by convention) when it encounters rows from a table that
//! occur before the GTID frontier in the `RewindRequest` for that table.
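//!
//! The rewind can be illustrated with a toy model. The sketch below is not the operator's code:
//! it assumes totally ordered `u64` timestamps in place of GTID frontiers (which are only
//! partially ordered), and `rewind` is a hypothetical helper. In the real dataflow the negation
//! is performed by the replication operator rather than by a single function.
//!
//! ```rust
//! use std::collections::BTreeMap;
//!
//! /// Rewinds `snapshot` (taken at `snapshot_upper`) to `initial` by negating every
//! /// replicated update `(row, t, diff)` with `initial <= t < snapshot_upper`.
//! /// Returns the accumulated multiplicity of each row that remains.
//! fn rewind(
//!     snapshot: &[&str],
//!     replicated: &[(&str, u64, i64)],
//!     initial: u64,
//!     snapshot_upper: u64,
//! ) -> BTreeMap<String, i64> {
//!     let mut acc: BTreeMap<String, i64> = BTreeMap::new();
//!     // Every snapshot row is emitted with a diff of +1.
//!     for row in snapshot {
//!         *acc.entry(row.to_string()).or_insert(0) += 1;
//!     }
//!     // Updates in the unwanted region are emitted again with the sign flipped.
//!     for (row, t, diff) in replicated {
//!         if initial <= *t && *t < snapshot_upper {
//!             *acc.entry(row.to_string()).or_insert(0) -= diff;
//!         }
//!     }
//!     // Rows whose diffs cancel out are not part of the rewound snapshot.
//!     acc.retain(|_, diff| *diff != 0);
//!     acc
//! }
//! ```
//!
//! For example, if a table held only `a` before time 6, `b` was inserted at 6 and `a` was deleted
//! at 7, then a snapshot with `snapshot_upper = 8` sees only `b`. Rewinding to `initial = 6`
//! cancels `b` and restores `a`.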
use std::collections::{BTreeMap, BTreeSet};
use std::rc::Rc;
use std::sync::Arc;

use differential_dataflow::AsCollection;
use futures::TryStreamExt;
use itertools::Itertools;
use mysql_async::prelude::Queryable;
use mysql_async::{IsolationLevel, Row as MySqlRow, TxOpts};
use mz_mysql_util::{
    ER_NO_SUCH_TABLE, MySqlError, pack_mysql_row, query_sys_var, quote_identifier,
};
use mz_ore::cast::CastFrom;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
use mz_ore::metrics::MetricsFutureExt;
use mz_repr::{Diff, Row};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::MySqlSourceConnection;
use mz_storage_types::sources::mysql::{GtidPartition, gtid_set_frontier};
use mz_timely_util::antichain::AntichainExt;
use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};
use mz_timely_util::containers::stack::FueledBuilder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::operators::core::Map;
use timely::dataflow::operators::{CapabilitySet, Concat};
use timely::dataflow::{Scope, StreamVec};
use timely::progress::Timestamp;
use tracing::{error, trace};

use crate::metrics::source::mysql::MySqlSnapshotMetrics;
use crate::source::RawSourceCreationConfig;
use crate::source::types::{FuelSize, SignaledFuture, SourceMessage, StackedCollection};
use crate::statistics::SourceStatistics;

use super::schemas::verify_schemas;
use super::{
    DefiniteError, MySqlTableName, ReplicationError, RewindRequest, SourceOutputInfo,
    TransientError, return_definite_error, validate_mysql_repl_settings,
};

/// Renders the snapshot dataflow. See the module documentation for more information.
pub(crate) fn render<'scope>(
    scope: Scope<'scope, GtidPartition>,
    config: RawSourceCreationConfig,
    connection: MySqlSourceConnection,
    source_outputs: Vec<SourceOutputInfo>,
    metrics: MySqlSnapshotMetrics,
) -> (
    StackedCollection<'scope, GtidPartition, (usize, Result<SourceMessage, DataflowError>)>,
    StreamVec<'scope, GtidPartition, RewindRequest>,
    StreamVec<'scope, GtidPartition, ReplicationError>,
    PressOnDropButton,
) {
    let mut builder =
        AsyncOperatorBuilder::new(format!("MySqlSnapshotReader({})", config.id), scope.clone());

    let (raw_handle, raw_data) = builder.new_output::<FueledBuilder<_>>();
    let (rewinds_handle, rewinds) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
    // Captures DefiniteErrors that affect the entire source, including all outputs
    let (definite_error_handle, definite_errors) =
        builder.new_output::<CapacityContainerBuilder<Vec<_>>>();

    // A global view of all outputs that will be snapshotted by all workers.
    let mut all_outputs = vec![];
    // A map containing only the table infos that this worker should snapshot.
    let mut reader_snapshot_table_info = BTreeMap::new();
    // Maps MySQL table name to export `SourceStatistics`. Same info exists in reader_snapshot_table_info,
    // but this avoids having to iterate + map each time the statistics are needed.
    let mut export_statistics = BTreeMap::new();
    for output in source_outputs.into_iter() {
        // Determine which outputs need to be snapshotted and which already have been.
        if *output.resume_upper != [GtidPartition::minimum()] {
            // Already has been snapshotted.
            continue;
        }
        all_outputs.push(output.output_index);
        if config.responsible_for(&output.table_name) {
            let export_stats = config
                .statistics
                .get(&output.export_id)
                .expect("statistics have been initialized")
                .clone();
            export_statistics
                .entry(output.table_name.clone())
                .or_insert_with(Vec::new)
                .push(export_stats);

            reader_snapshot_table_info
                .entry(output.table_name.clone())
                .or_insert_with(Vec::new)
                .push(output);
        }
    }

    let (button, transient_errors): (_, StreamVec<'scope, GtidPartition, Rc<TransientError>>) =
        builder.build_fallible(move |caps| {
            let busy_signal = Arc::clone(&config.busy_signal);
            Box::pin(SignaledFuture::new(busy_signal, async move {
                let [data_cap_set, rewind_cap_set, definite_error_cap_set]: &mut [_; 3] =
                    caps.try_into().unwrap();

                let id = config.id;
                let worker_id = config.worker_id;

                if !all_outputs.is_empty() {
                    // A worker *must* emit a count even if not responsible for snapshotting a table
                    // as statistic summarization will return null if any worker hasn't set a value.
                    // This will also reset snapshot stats for any exports not snapshotting.
                    for statistics in config.statistics.values() {
                        statistics.set_snapshot_records_known(0);
                        statistics.set_snapshot_records_staged(0);
                    }
                }

                // If this worker has no tables to snapshot then there is nothing to do.
                if reader_snapshot_table_info.is_empty() {
                    trace!(%id, "timely-{worker_id} initializing table reader \
                                 with no tables to snapshot, exiting");
                    return Ok(());
                } else {
                    trace!(%id, "timely-{worker_id} initializing table reader \
                                 with {} tables to snapshot",
                           reader_snapshot_table_info.len());
                }

                let connection_config = connection
                    .connection
                    .config(
                        &config.config.connection_context.secrets_reader,
                        &config.config,
                        InTask::Yes,
                    )
                    .await?;
                let task_name = format!("timely-{worker_id} MySQL snapshotter");

                let lock_clauses = reader_snapshot_table_info
                    .keys()
                    .map(|t| format!("{} READ", t))
                    .collect::<Vec<String>>()
                    .join(", ");
                let mut lock_conn = connection_config
                    .connect(
                        &task_name,
                        &config.config.connection_context.ssh_tunnel_manager,
                    )
                    .await?;
                if let Some(timeout) = config
                    .config
                    .parameters
                    .mysql_source_timeouts
                    .snapshot_lock_wait_timeout
                {
                    // Interpolating a `Duration` integer; not parameterizable in MySQL `SET`.
                    #[allow(clippy::disallowed_methods)]
                    lock_conn
                        .query_drop(format!(
                            "SET @@session.lock_wait_timeout = {}",
                            timeout.as_secs()
                        ))
                        .await?;
                }

                trace!(%id, "timely-{worker_id} acquiring table locks: {lock_clauses}");
                // `lock_clauses` is built from `MySqlTableName::Display`, which
                // escapes both schema and table via `quote_identifier`.
                #[allow(clippy::disallowed_methods)]
                let lock_result = lock_conn
                    .query_drop(format!("LOCK TABLES {lock_clauses}"))
                    .await;
                match lock_result {
                    // Handle the case where a table we are snapshotting has been dropped or renamed.
                    Err(mysql_async::Error::Server(mysql_async::ServerError {
                        code,
                        message,
                        ..
                    })) if code == ER_NO_SUCH_TABLE => {
                        trace!(%id, "timely-{worker_id} received unknown table error from \
                                     lock query");
                        let err = DefiniteError::TableDropped(message);
                        return Ok(return_definite_error(
                            err,
                            &all_outputs,
                            &raw_handle,
                            data_cap_set,
                            &definite_error_handle,
                            definite_error_cap_set,
                        )
                        .await);
                    }
                    e => e?,
                };

                // Record the frontier of future GTIDs based on the executed GTID set at the start
                // of the snapshot
                let snapshot_gtid_set =
                    query_sys_var(&mut lock_conn, "global.gtid_executed").await?;
                let snapshot_gtid_frontier = match gtid_set_frontier(&snapshot_gtid_set) {
                    Ok(frontier) => frontier,
                    Err(err) => {
                        let err = DefiniteError::UnsupportedGtidState(err.to_string());
                        // If we received a GTID Set with non-consecutive intervals this breaks all
                        // our assumptions, so there is nothing else we can do.
                        return Ok(return_definite_error(
                            err,
                            &all_outputs,
                            &raw_handle,
                            data_cap_set,
                            &definite_error_handle,
                            definite_error_cap_set,
                        )
                        .await);
                    }
                };

                // TODO(roshan): Insert metric for how long it took to acquire the locks
                trace!(%id, "timely-{worker_id} acquired table locks at: {}",
                       snapshot_gtid_frontier.pretty());

                let mut conn = connection_config
                    .connect(
                        &task_name,
                        &config.config.connection_context.ssh_tunnel_manager,
                    )
                    .await?;

                // Verify the MySQL system settings are correct for consistent row-based replication using GTIDs
                match validate_mysql_repl_settings(&mut conn).await {
                    Err(err @ MySqlError::InvalidSystemSetting { .. }) => {
                        return Ok(return_definite_error(
                            DefiniteError::ServerConfigurationError(err.to_string()),
                            &all_outputs,
                            &raw_handle,
                            data_cap_set,
                            &definite_error_handle,
                            definite_error_cap_set,
                        )
                        .await);
                    }
                    Err(err) => Err(err)?,
                    Ok(()) => (),
                };

                trace!(%id, "timely-{worker_id} starting transaction with \
                             consistent snapshot at: {}", snapshot_gtid_frontier.pretty());

                // Start a transaction with REPEATABLE READ and 'CONSISTENT SNAPSHOT' semantics
                // so we can read a consistent snapshot of the table at the specific GTID we read.
                let mut tx_opts = TxOpts::default();
                tx_opts
                    .with_isolation_level(IsolationLevel::RepeatableRead)
                    .with_consistent_snapshot(true)
                    .with_readonly(true);
                let mut tx = conn.start_transaction(tx_opts).await?;
                // Set the session time zone to UTC so that we can read TIMESTAMP columns as UTC
                // From https://dev.mysql.com/doc/refman/8.0/en/datetime.html: "MySQL converts TIMESTAMP values
                // from the current time zone to UTC for storage, and back from UTC to the current time zone
                // for retrieval. (This does not occur for other types such as DATETIME.)"
                #[allow(clippy::disallowed_methods)] // static SQL string
                tx.query_drop("set @@session.time_zone = '+00:00'").await?;

                // Configure query execution time based on param. We want to be able to
                // override the server value here in case it's set too low,
                // respective to the size of the data we need to copy.
                if let Some(timeout) = config
                    .config
                    .parameters
                    .mysql_source_timeouts
                    .snapshot_max_execution_time
                {
                    // Interpolating an integer millis value; not parameterizable in MySQL `SET`.
                    #[allow(clippy::disallowed_methods)]
                    tx.query_drop(format!(
                        "SET @@session.max_execution_time = {}",
                        timeout.as_millis()
                    ))
                    .await?;
                }

                // We have started our transaction so we can unlock the tables.
                #[allow(clippy::disallowed_methods)] // static SQL string
                lock_conn.query_drop("UNLOCK TABLES").await?;
                lock_conn.disconnect().await?;

                trace!(%id, "timely-{worker_id} started transaction");

                // Verify the schemas of the tables we are snapshotting
                let errored_outputs =
                    verify_schemas(&mut tx, reader_snapshot_table_info.iter().collect()).await?;
                let mut removed_outputs = BTreeSet::new();
                for (output, err) in errored_outputs {
                    // Publish the error for this table and stop ingesting it
                    let update = (
                        (output.output_index, Err(err.clone().into())),
                        GtidPartition::minimum(),
                        Diff::ONE,
                    );
                    let size = update.fuel_size();
                    raw_handle.give_fueled(&data_cap_set[0], update, size).await;
                    tracing::warn!(%id, "timely-{worker_id} stopping snapshot of output {output:?} \
                                due to schema mismatch");
                    removed_outputs.insert(output.output_index);
                }
                for (_, outputs) in reader_snapshot_table_info.iter_mut() {
                    outputs.retain(|output| !removed_outputs.contains(&output.output_index));
                }
                reader_snapshot_table_info.retain(|_, outputs| !outputs.is_empty());

                let snapshot_total = fetch_snapshot_size(
                    &mut tx,
                    reader_snapshot_table_info
                        .iter()
                        .map(|(name, outputs)| {
                            (
                                name.clone(),
                                outputs.len(),
                                export_statistics.get(name).unwrap(),
                            )
                        })
                        .collect(),
                    metrics,
                )
                .await?;

                // This worker has nothing else to do
                if reader_snapshot_table_info.is_empty() {
                    return Ok(());
                }

                // Read the snapshot data from the tables
                let mut final_row = Row::default();

                let mut snapshot_staged_total = 0;
                for (table, outputs) in &reader_snapshot_table_info {
                    let mut snapshot_staged = 0;
                    let query = build_snapshot_query(outputs);
                    trace!(%id, "timely-{worker_id} reading snapshot query='{}'", query);
                    let mut results = tx.exec_stream(query, ()).await?;
                    while let Some(row) = results.try_next().await? {
                        let row: MySqlRow = row;
                        snapshot_staged += 1;
                        for (output, row_val) in outputs.iter().repeat_clone(row) {
                            // We don't need to verify that binlog_row_metadata matches the expected value when
                            // snapshotting, as the snapshot query always returns rows with full metadata. If the
                            // output is configured with binlog_full_metadata = false, then we will just ignore the
                            // metadata when decoding.
                            let event = match pack_mysql_row(
                                &mut final_row,
                                row_val,
                                &output.desc,
                                None,
                                output.binlog_full_metadata,
                            ) {
                                Ok(row) => Ok(SourceMessage {
                                    key: Row::default(),
                                    value: row,
                                    metadata: Row::default(),
                                }),
                                // Produce a DefiniteError in the stream for any rows that fail to decode
                                Err(err @ MySqlError::ValueDecodeError { .. }) => {
                                    Err(DataflowError::from(DefiniteError::ValueDecodeError(
                                        err.to_string(),
                                    )))
                                }
                                Err(err) => Err(err)?,
                            };
                            let update = (
                                (output.output_index, event),
                                GtidPartition::minimum(),
                                Diff::ONE,
                            );
                            let size = update.fuel_size();
                            raw_handle.give_fueled(&data_cap_set[0], update, size).await;
                        }
                        // This overcounting maintains existing behavior but will be removed once readers no longer rely on the value.
                        snapshot_staged_total += u64::cast_from(outputs.len());
                        if snapshot_staged_total % 1000 == 0 {
                            for statistics in export_statistics.get(table).unwrap() {
                                statistics.set_snapshot_records_staged(snapshot_staged);
                            }
                        }
                    }
                    for statistics in export_statistics.get(table).unwrap() {
                        statistics.set_snapshot_records_staged(snapshot_staged);
                    }
                    trace!(%id, "timely-{worker_id} snapshotted {} records from \
                                 table '{table}'", snapshot_staged * u64::cast_from(outputs.len()));
                }

                // We are done with the snapshot so now we will emit rewind requests. It is
                // important that this happens after the snapshot has finished because this is what
                // unblocks the replication operator and we want this to happen serially. It might
                // seem like a good idea to read the replication stream concurrently with the
                // snapshot but it actually leads to a lot of data being staged for the future,
                // which needlessly consumes memory in the cluster.
                for (table, outputs) in reader_snapshot_table_info {
                    for output in outputs {
                        // The space before the line continuation keeps `{table}` and `output`
                        // separated, since `\` also strips the next line's leading whitespace.
                        trace!(%id, "timely-{worker_id} producing rewind request for {table} \
                                     output {}", output.output_index);
                        let req = RewindRequest {
                            output_index: output.output_index,
                            snapshot_upper: snapshot_gtid_frontier.clone(),
                        };
                        rewinds_handle.give(&rewind_cap_set[0], req);
                    }
                }
                *rewind_cap_set = CapabilitySet::new();

                // TODO (maz): Should we remove this to match Postgres?
                if snapshot_staged_total < snapshot_total {
                    error!(%id, "timely-{worker_id} snapshot size {snapshot_total} is somehow \
                                 bigger than records staged {snapshot_staged_total}");
                }

                Ok(())
            }))
        });

    // TODO: Split row decoding into a separate operator that can be distributed across all workers

    let errors = definite_errors.concat(transient_errors.map(ReplicationError::from));

    (
        raw_data.as_collection(),
        rewinds,
        errors,
        button.press_on_drop(),
    )
}

/// Fetches the size of the snapshot on this worker and emits the appropriate metrics and
/// statistics for each table.
async fn fetch_snapshot_size<Q>(
    conn: &mut Q,
    tables: Vec<(MySqlTableName, usize, &Vec<SourceStatistics>)>,
    metrics: MySqlSnapshotMetrics,
) -> Result<u64, anyhow::Error>
where
    Q: Queryable,
{
    let mut total = 0;
    for (table, num_outputs, export_statistics) in tables {
        let stats = collect_table_statistics(conn, &table).await?;
        metrics.record_table_count_latency(table.1, table.0, stats.count_latency);
        for export_stat in export_statistics {
            export_stat.set_snapshot_records_known(stats.count);
            export_stat.set_snapshot_records_staged(0);
        }
        total += stats.count * u64::cast_from(num_outputs);
    }
    Ok(total)
}

/// Builds the SQL query to be used for creating the snapshot using the first entry in outputs.
///
/// Expect `outputs` to contain entries for a single table, and to have at least 1 entry.
/// Expect that each MySqlTableDesc entry contains all columns described in information_schema.columns.
#[must_use]
fn build_snapshot_query(outputs: &[SourceOutputInfo]) -> String {
    let info = outputs.first().expect("MySQL table info");
    for output in &outputs[1..] {
        // the columns may be decoded based on position, and different outputs may replicate
        // different columns, so we need to ensure that all columns are accounted for.
        assert!(
            info.desc.columns.len() == output.desc.columns.len(),
            "Mismatch in table descriptions for {}",
            info.table_name
        );
    }
    let columns = info
        .desc
        .columns
        .iter()
        .map(|col| quote_identifier(&col.name))
        .join(", ");
    format!("SELECT {} FROM {}", columns, info.table_name)
}

#[derive(Default)]
struct TableStatistics {
    count_latency: f64,
    count: u64,
}

async fn collect_table_statistics<Q>(
    conn: &mut Q,
    table: &MySqlTableName,
) -> Result<TableStatistics, anyhow::Error>
where
    Q: Queryable,
{
    let mut stats = TableStatistics::default();

    // `MySqlTableName::Display` escapes both identifier components via
    // `quote_identifier`, so this interpolation is safe; not parameterizable.
    #[allow(clippy::disallowed_methods)]
    let count_row: Option<u64> = conn
        .query_first(format!("SELECT COUNT(*) FROM {}", table))
        .wall_time()
        .set_at(&mut stats.count_latency)
        .await?;
    stats.count = count_row.ok_or_else(|| anyhow::anyhow!("failed to COUNT(*) {table}"))?;

    Ok(stats)
}

#[cfg(test)]
mod tests {
    use super::*;
    use mz_mysql_util::{MySqlColumnDesc, MySqlTableDesc};
    use timely::progress::Antichain;

    #[mz_ore::test]
    fn snapshot_query_duplicate_table() {
        let schema_name = "myschema".to_string();
        let table_name = "mytable".to_string();
        let table = MySqlTableName(schema_name.clone(), table_name.clone());
        let columns = ["c1", "c2", "c3"]
            .iter()
            .map(|col| MySqlColumnDesc {
                name: col.to_string(),
                column_type: None,
                meta: None,
            })
            .collect::<Vec<_>>();
        let desc = MySqlTableDesc {
            schema_name: schema_name.clone(),
            name: table_name.clone(),
            columns,
            keys: BTreeSet::default(),
        };
        let info = SourceOutputInfo {
            output_index: 1, // ignored
            table_name: table.clone(),
            desc,
            text_columns: vec![],
            exclude_columns: vec![],
            initial_gtid_set: Antichain::default(),
            resume_upper: Antichain::default(),
            export_id: mz_repr::GlobalId::User(1),
            binlog_full_metadata: false,
        };
        let query = build_snapshot_query(&[info.clone(), info]);
        assert_eq!(
            format!(
                "SELECT `c1`, `c2`, `c3` FROM `{}`.`{}`",
                &schema_name, &table_name
            ),
            query
        );
    }
}