mz_storage/source/mysql/snapshot.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Renders the table snapshot side of the [`MySqlSourceConnection`] dataflow.
//!
//! # Snapshot reading
//!
//! Depending on the `resume_upper` of each entry in `source_outputs`, this dataflow decides which
//! tables to snapshot and performs a simple `SELECT <columns> FROM table` on each of them in order
//! to get a snapshot. There are a few subtle points about this operation, described below.
//!
//! It is crucial for correctness that we always perform the snapshot of all tables at a specific
//! point in time. This must be true even in the presence of restarts or partially committed
//! snapshots. The consistent point that the snapshot must happen at is discovered and durably
//! recorded during planning of the source and is exposed to this ingestion dataflow via the
//! `initial_gtid_set` field in `MySqlSourceDetails`.
//!
//! Unfortunately MySQL does not provide an API to perform a transaction at a specific point in
//! time. Instead, MySQL allows us to perform a snapshot of a table and lets us know at which
//! point in time the snapshot was taken. Using this information we can take a snapshot at an
//! arbitrary point in time and then "rewind" it to the desired `initial_gtid_set`. These two
//! phases are described in the following sections.
//!
//! ## Producing a snapshot at a known point in time.
//!
//! Ideally we would like to start a transaction and ask MySQL to tell us the point in time this
//! transaction is running at. As far as we know there is no such API, so we achieve this using
//! table locks instead.
//!
//! The full set of tables that are meant to be snapshotted is partitioned among the workers. Each
//! worker initiates a connection to the server and acquires a table lock on all the tables that
//! have been assigned to it. By doing so we establish a moment in time where we know no writes are
//! happening to the tables we are interested in. After the locks are taken each worker reads the
//! current upper frontier (`snapshot_upper`) using the `@@global.gtid_executed` system variable.
//! This frontier establishes an upper bound on any possible write to the tables of interest until
//! the lock is released.
//!
//! Each worker now starts a transaction via a new connection with 'REPEATABLE READ' and
//! 'CONSISTENT SNAPSHOT' semantics. Due to linearizability we know that this transaction's view of
//! the database must be at some time `t_snapshot` such that `snapshot_upper <= t_snapshot`. We
//! don't actually know the exact value of `t_snapshot` and it might be strictly greater than
//! `snapshot_upper`. However, because this transaction will only be used to read the locked tables
//! and we know that `snapshot_upper` is an upper bound on all the writes that have happened to
//! them, we can safely pretend that the transaction's `t_snapshot` is *equal* to `snapshot_upper`.
//! We have therefore succeeded in starting a transaction at a known point in time!
//!
//! At this point it is safe for each worker to unlock the tables, since the transaction has
//! established a point in time, and close the initial connection. Each worker can then read the
//! snapshot of the tables it is responsible for and publish it downstream.
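//!
//! As a rough sketch, the statements issued by each worker look like the following. The table
//! and column names are hypothetical, and the exact statement text (in particular for reading
//! the system variable and for starting the transaction) is an assumption about what the helper
//! functions and the MySQL client library send, not text taken from this module.
//!
//! ```text
//! -- connection 1 (lock connection)
//! LOCK TABLES `db`.`t1` READ, `db`.`t2` READ;
//! SELECT @@global.gtid_executed;              -- parsed into `snapshot_upper`
//!
//! -- connection 2 (snapshot connection)
//! SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
//! START TRANSACTION WITH CONSISTENT SNAPSHOT, READ ONLY;
//!
//! -- connection 1, once the transaction on connection 2 has started
//! UNLOCK TABLES;
//!
//! -- connection 2, reading the snapshot inside the transaction
//! SELECT `c1`, `c2` FROM `db`.`t1`;
//! ```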
//!
//! TODO: Some other software products hold the table lock for the duration of the snapshot, and
//! some do not. We should figure out why and whether we need to hold the lock longer. This may be
//! because of a difference in how REPEATABLE READ works in some MySQL-compatible systems (e.g.
//! Aurora MySQL).
//!
//! ## Rewinding the snapshot to a specific point in time.
//!
//! Having obtained a snapshot of a table at some `snapshot_upper` we are now tasked with
//! transforming this snapshot into one at `initial_gtid_set`. In other words we have produced a
//! snapshot containing all updates that happened at `t: !(snapshot_upper <= t)` but what we
//! actually want is a snapshot containing all updates that happened at
//! `t: !(initial_gtid_set <= t)`.
//!
//! If we assume that `initial_gtid_set <= snapshot_upper`, which is a fair assumption since the
//! former is obtained before the latter, then we can observe that the snapshot we produced
//! contains all updates at `t: !(initial_gtid_set <= t)` (i.e. the snapshot we want) and some
//! additional unwanted updates at `t: initial_gtid_set <= t && !(snapshot_upper <= t)`. We happen
//! to know exactly what those additional unwanted updates are because they will be obtained by
//! reading the replication stream in the replication operator, so all we need to do to "rewind"
//! our `snapshot_upper` snapshot to `initial_gtid_set` is to ask the replication operator to
//! "undo" any updates that fall in the undesirable region.
//!
//! This is exactly what `RewindRequest` is about. It informs the replication operator that a
//! particular table has been snapshotted at `snapshot_upper` and would like all the updates
//! discovered during replication that happen at
//! `t: initial_gtid_set <= t && !(snapshot_upper <= t)` to be cancelled. In Differential Dataflow
//! this is as simple as flipping the sign of the diff field.
//!
//! The snapshot reader emits updates at the minimum timestamp (by convention) to allow the
//! updates to be potentially negated by the replication operator, which will emit negated
//! updates at the minimum timestamp (by convention) when it encounters rows from a table that
//! occur before the GTID frontier in the `RewindRequest` for that table.
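//!
//! The rewind can be illustrated with a small worked example. This is a sketch rather than a
//! real trace: it assumes a single source UUID and writes points in time as plain transaction
//! numbers, which simplifies the partitioned GTID timestamps used by the dataflow. It also
//! assumes that the replication operator emits each rewound update twice, once negated at the
//! minimum timestamp and once unchanged at its own timestamp.
//!
//! ```text
//! initial_gtid_set = 11   (transactions 1..=10 belong to the desired snapshot)
//! snapshot_upper   = 15   (transactions 1..=14 are visible to the snapshot transaction)
//!
//! snapshot reader:  rows reflecting transactions 1..=14   diff +1 at the minimum timestamp
//! replication:      row inserted by transaction 12        diff -1 at the minimum timestamp
//!                   row inserted by transaction 12        diff +1 at timestamp 12
//! replication:      row inserted by transaction 15        diff +1 at timestamp 15
//! ```
//!
//! Transaction 12 satisfies `initial_gtid_set <= t && !(snapshot_upper <= t)`, so its effect is
//! cancelled out of the snapshot and reappears at its own timestamp. Transaction 15 satisfies
//! `snapshot_upper <= t`, so the snapshot never saw it and nothing needs to be undone.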

use std::collections::{BTreeMap, BTreeSet};
use std::rc::Rc;
use std::sync::Arc;

use differential_dataflow::AsCollection;
use futures::TryStreamExt;
use itertools::Itertools;
use mysql_async::prelude::Queryable;
use mysql_async::{IsolationLevel, Row as MySqlRow, TxOpts};
use mz_mysql_util::{
    ER_NO_SUCH_TABLE, MySqlError, pack_mysql_row, query_sys_var, quote_identifier,
};
use mz_ore::cast::CastFrom;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
use mz_ore::metrics::MetricsFutureExt;
use mz_repr::{Diff, Row};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::MySqlSourceConnection;
use mz_storage_types::sources::mysql::{GtidPartition, gtid_set_frontier};
use mz_timely_util::antichain::AntichainExt;
use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};
use mz_timely_util::containers::stack::AccountedStackBuilder;
use timely::dataflow::operators::core::Map;
use timely::dataflow::operators::{CapabilitySet, Concat};
use timely::dataflow::{Scope, Stream};
use timely::progress::Timestamp;
use tracing::{error, trace};

use crate::metrics::source::mysql::MySqlSnapshotMetrics;
use crate::source::RawSourceCreationConfig;
use crate::source::types::{
    ProgressStatisticsUpdate, SignaledFuture, SourceMessage, StackedCollection,
};

use super::schemas::verify_schemas;
use super::{
    DefiniteError, MySqlTableName, ReplicationError, RewindRequest, SourceOutputInfo,
    TransientError, return_definite_error, validate_mysql_repl_settings,
};

/// Renders the snapshot dataflow. See the module documentation for more information.
pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
    scope: G,
    config: RawSourceCreationConfig,
    connection: MySqlSourceConnection,
    source_outputs: Vec<SourceOutputInfo>,
    metrics: MySqlSnapshotMetrics,
) -> (
    StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
    Stream<G, RewindRequest>,
    Stream<G, ProgressStatisticsUpdate>,
    Stream<G, ReplicationError>,
    PressOnDropButton,
) {
    let mut builder =
        AsyncOperatorBuilder::new(format!("MySqlSnapshotReader({})", config.id), scope.clone());

    let (raw_handle, raw_data) = builder.new_output::<AccountedStackBuilder<_>>();
    let (rewinds_handle, rewinds) = builder.new_output();
    // Captures DefiniteErrors that affect the entire source, including all outputs
    let (definite_error_handle, definite_errors) = builder.new_output();

    let (stats_output, stats_stream) = builder.new_output();

    // A global view of all outputs that will be snapshotted by all workers.
    let mut all_outputs = vec![];
    // A map containing only the table infos that this worker should snapshot.
    let mut reader_snapshot_table_info = BTreeMap::new();

    for output in source_outputs.into_iter() {
        // Determine which outputs need to be snapshotted and which already have been.
        if *output.resume_upper != [GtidPartition::minimum()] {
            // Already has been snapshotted.
            continue;
        }
        all_outputs.push(output.output_index);
        if config.responsible_for(&output.table_name) {
            reader_snapshot_table_info
                .entry(output.table_name.clone())
                .or_insert_with(Vec::new)
                .push(output);
        }
    }

    let (button, transient_errors): (_, Stream<G, Rc<TransientError>>) =
        builder.build_fallible(move |caps| {
            let busy_signal = Arc::clone(&config.busy_signal);
            Box::pin(SignaledFuture::new(busy_signal, async move {
                let [
                    data_cap_set,
                    rewind_cap_set,
                    definite_error_cap_set,
                    stats_cap,
                ]: &mut [_; 4] = caps.try_into().unwrap();

                let id = config.id;
                let worker_id = config.worker_id;

                // If this worker has no tables to snapshot then there is nothing to do.
                if reader_snapshot_table_info.is_empty() {
                    trace!(%id, "timely-{worker_id} initializing table reader \
                                 with no tables to snapshot, exiting");
                    if !all_outputs.is_empty() {
                        // Emit 0, to mark this worker as having started up correctly,
                        // but having done no snapshotting. Otherwise leave
                        // this not filled in (no snapshotting is occurring in this instance of
                        // the dataflow).
                        stats_output.give(
                            &stats_cap[0],
                            ProgressStatisticsUpdate::Snapshot {
                                records_known: 0,
                                records_staged: 0,
                            },
                        );
                    }
                    return Ok(());
                } else {
                    trace!(%id, "timely-{worker_id} initializing table reader \
                                 with {} tables to snapshot",
                           reader_snapshot_table_info.len());
                }

                let connection_config = connection
                    .connection
                    .config(
                        &config.config.connection_context.secrets_reader,
                        &config.config,
                        InTask::Yes,
                    )
                    .await?;
                let task_name = format!("timely-{worker_id} MySQL snapshotter");

                let lock_clauses = reader_snapshot_table_info
                    .keys()
                    .map(|t| format!("{} READ", t))
                    .collect::<Vec<String>>()
                    .join(", ");
                let mut lock_conn = connection_config
                    .connect(
                        &task_name,
                        &config.config.connection_context.ssh_tunnel_manager,
                    )
                    .await?;
                if let Some(timeout) = config
                    .config
                    .parameters
                    .mysql_source_timeouts
                    .snapshot_lock_wait_timeout
                {
                    lock_conn
                        .query_drop(format!(
                            "SET @@session.lock_wait_timeout = {}",
                            timeout.as_secs()
                        ))
                        .await?;
                }

                trace!(%id, "timely-{worker_id} acquiring table locks: {lock_clauses}");
                match lock_conn
                    .query_drop(format!("LOCK TABLES {lock_clauses}"))
                    .await
                {
                    // Handle the case where a table we are snapshotting has been dropped or renamed.
                    Err(mysql_async::Error::Server(mysql_async::ServerError {
                        code,
                        message,
                        ..
                    })) if code == ER_NO_SUCH_TABLE => {
                        trace!(%id, "timely-{worker_id} received unknown table error from \
                                     lock query");
                        let err = DefiniteError::TableDropped(message);
                        return Ok(return_definite_error(
                            err,
                            &all_outputs,
                            &raw_handle,
                            data_cap_set,
                            &definite_error_handle,
                            definite_error_cap_set,
                        )
                        .await);
                    }
                    e => e?,
                };

                // Record the frontier of future GTIDs based on the executed GTID set at the start
                // of the snapshot
                let snapshot_gtid_set =
                    query_sys_var(&mut lock_conn, "global.gtid_executed").await?;
                let snapshot_gtid_frontier = match gtid_set_frontier(&snapshot_gtid_set) {
                    Ok(frontier) => frontier,
                    Err(err) => {
                        let err = DefiniteError::UnsupportedGtidState(err.to_string());
                        // If we received a GTID Set with non-consecutive intervals this breaks all
                        // our assumptions, so there is nothing else we can do.
                        return Ok(return_definite_error(
                            err,
                            &all_outputs,
                            &raw_handle,
                            data_cap_set,
                            &definite_error_handle,
                            definite_error_cap_set,
                        )
                        .await);
                    }
                };

                // TODO(roshan): Insert metric for how long it took to acquire the locks
                trace!(%id, "timely-{worker_id} acquired table locks at: {}",
                       snapshot_gtid_frontier.pretty());

                let mut conn = connection_config
                    .connect(
                        &task_name,
                        &config.config.connection_context.ssh_tunnel_manager,
                    )
                    .await?;

                // Verify the MySQL system settings are correct for consistent row-based replication using GTIDs
                match validate_mysql_repl_settings(&mut conn).await {
                    Err(err @ MySqlError::InvalidSystemSetting { .. }) => {
                        return Ok(return_definite_error(
                            DefiniteError::ServerConfigurationError(err.to_string()),
                            &all_outputs,
                            &raw_handle,
                            data_cap_set,
                            &definite_error_handle,
                            definite_error_cap_set,
                        )
                        .await);
                    }
                    Err(err) => Err(err)?,
                    Ok(()) => (),
                };

                trace!(%id, "timely-{worker_id} starting transaction with \
                             consistent snapshot at: {}", snapshot_gtid_frontier.pretty());

                // Start a transaction with REPEATABLE READ and 'CONSISTENT SNAPSHOT' semantics
                // so we can read a consistent snapshot of the table at the specific GTID we read.
                let mut tx_opts = TxOpts::default();
                tx_opts
                    .with_isolation_level(IsolationLevel::RepeatableRead)
                    .with_consistent_snapshot(true)
                    .with_readonly(true);
                let mut tx = conn.start_transaction(tx_opts).await?;
                // Set the session time zone to UTC so that we can read TIMESTAMP columns as UTC
                // From https://dev.mysql.com/doc/refman/8.0/en/datetime.html: "MySQL converts TIMESTAMP values
                // from the current time zone to UTC for storage, and back from UTC to the current time zone
                // for retrieval. (This does not occur for other types such as DATETIME.)"
                tx.query_drop("set @@session.time_zone = '+00:00'").await?;

                // Configure query execution time based on param. We want to be able to
                // override the server value here in case it's set too low,
                // respective to the size of the data we need to copy.
                if let Some(timeout) = config
                    .config
                    .parameters
                    .mysql_source_timeouts
                    .snapshot_max_execution_time
                {
                    tx.query_drop(format!(
                        "SET @@session.max_execution_time = {}",
                        timeout.as_millis()
                    ))
                    .await?;
                }

                // We have started our transaction so we can unlock the tables.
                lock_conn.query_drop("UNLOCK TABLES").await?;
                lock_conn.disconnect().await?;

                trace!(%id, "timely-{worker_id} started transaction");

                // Verify the schemas of the tables we are snapshotting
                let errored_outputs =
                    verify_schemas(&mut tx, reader_snapshot_table_info.iter().collect()).await?;
                let mut removed_outputs = BTreeSet::new();
                for (output, err) in errored_outputs {
                    // Publish the error for this table and stop ingesting it
                    raw_handle
                        .give_fueled(
                            &data_cap_set[0],
                            (
                                (output.output_index, Err(err.clone().into())),
                                GtidPartition::minimum(),
                                Diff::ONE,
                            ),
                        )
                        .await;
                    trace!(%id, "timely-{worker_id} stopping snapshot of output {output:?} \
                                 due to schema mismatch");
                    removed_outputs.insert(output.output_index);
                }
                for (_, outputs) in reader_snapshot_table_info.iter_mut() {
                    outputs.retain(|output| !removed_outputs.contains(&output.output_index));
                }
                reader_snapshot_table_info.retain(|_, outputs| !outputs.is_empty());

                let snapshot_total = fetch_snapshot_size(
                    &mut tx,
                    reader_snapshot_table_info
                        .iter()
                        .map(|(name, outputs)| ((*name).clone(), outputs.len()))
                        .collect(),
                    metrics,
                )
                .await?;

                stats_output.give(
                    &stats_cap[0],
                    ProgressStatisticsUpdate::Snapshot {
                        records_known: snapshot_total,
                        records_staged: 0,
                    },
                );

                // This worker has nothing else to do
                if reader_snapshot_table_info.is_empty() {
                    return Ok(());
                }

                // Read the snapshot data from the tables
                let mut final_row = Row::default();

                let mut snapshot_staged = 0;
                for (table, outputs) in &reader_snapshot_table_info {
                    let query = build_snapshot_query(outputs);
                    trace!(%id, "timely-{worker_id} reading snapshot query='{}'", query);
                    let mut results = tx.exec_stream(query, ()).await?;
                    let mut count = 0;
                    while let Some(row) = results.try_next().await? {
                        let row: MySqlRow = row;
                        for (output, row_val) in outputs.iter().repeat_clone(row) {
                            let event = match pack_mysql_row(&mut final_row, row_val, &output.desc)
                            {
                                Ok(row) => Ok(SourceMessage {
                                    key: Row::default(),
                                    value: row,
                                    metadata: Row::default(),
                                }),
                                // Produce a DefiniteError in the stream for any rows that fail to decode
                                Err(err @ MySqlError::ValueDecodeError { .. }) => {
                                    Err(DataflowError::from(DefiniteError::ValueDecodeError(
                                        err.to_string(),
                                    )))
                                }
                                Err(err) => Err(err)?,
                            };
                            raw_handle
                                .give_fueled(
                                    &data_cap_set[0],
                                    (
                                        (output.output_index, event),
                                        GtidPartition::minimum(),
                                        Diff::ONE,
                                    ),
                                )
                                .await;
                            count += 1;
                            snapshot_staged += 1;
                            // TODO(guswynn): does this 1000 need to be configurable?
                            if snapshot_staged % 1000 == 0 {
                                stats_output.give(
                                    &stats_cap[0],
                                    ProgressStatisticsUpdate::Snapshot {
                                        records_known: snapshot_total,
                                        records_staged: snapshot_staged,
                                    },
                                );
                            }
                        }
                    }
                    trace!(%id, "timely-{worker_id} snapshotted {count} records from \
                                 table '{table}'");
                }

                // We are done with the snapshot so now we will emit rewind requests. It is
                // important that this happens after the snapshot has finished because this is what
                // unblocks the replication operator and we want this to happen serially. It might
                // seem like a good idea to read the replication stream concurrently with the
                // snapshot but it actually leads to a lot of data being staged for the future,
                // which needlessly consumes memory in the cluster.
                for (table, outputs) in reader_snapshot_table_info {
                    for output in outputs {
                        // The trailing space before the line continuation keeps the table name
                        // and the word "output" separated in the rendered message.
                        trace!(%id, "timely-{worker_id} producing rewind request for {table} \
                                     output {}", output.output_index);
                        let req = RewindRequest {
                            output_index: output.output_index,
                            snapshot_upper: snapshot_gtid_frontier.clone(),
                        };
                        rewinds_handle.give(&rewind_cap_set[0], req);
                    }
                }
                *rewind_cap_set = CapabilitySet::new();

                if snapshot_staged < snapshot_total {
                    error!(%id, "timely-{worker_id} snapshot size {snapshot_total} is somehow \
                                 bigger than records staged {snapshot_staged}");
                    snapshot_staged = snapshot_total;
                }
                stats_output.give(
                    &stats_cap[0],
                    ProgressStatisticsUpdate::Snapshot {
                        records_known: snapshot_total,
                        records_staged: snapshot_staged,
                    },
                );
                Ok(())
            }))
        });

    // TODO: Split row decoding into a separate operator that can be distributed across all workers

    let errors = definite_errors.concat(&transient_errors.map(ReplicationError::from));

    (
        raw_data.as_collection(),
        rewinds,
        stats_stream,
        errors,
        button.press_on_drop(),
    )
}

/// Fetch the size of the snapshot on this worker.
async fn fetch_snapshot_size<Q>(
    conn: &mut Q,
    tables: Vec<(MySqlTableName, usize)>,
    metrics: MySqlSnapshotMetrics,
) -> Result<u64, anyhow::Error>
where
    Q: Queryable,
{
    let mut total = 0;
    for (table, num_outputs) in tables {
        let stats = collect_table_statistics(conn, &table).await?;
        metrics.record_table_count_latency(table.1, table.0, stats.count_latency);
        total += stats.count * u64::cast_from(num_outputs);
    }
    Ok(total)
}

/// Builds the SQL query to be used for creating the snapshot using the first entry in outputs.
///
/// Expect `outputs` to contain entries for a single table, and to have at least 1 entry.
/// Expect that each MySqlTableDesc entry contains all columns described in information_schema.columns.
#[must_use]
fn build_snapshot_query(outputs: &[SourceOutputInfo]) -> String {
    let info = outputs.first().expect("MySQL table info");
    for output in &outputs[1..] {
        assert!(
            info.desc.columns == output.desc.columns,
            "Mismatch in table descriptions for {}",
            info.table_name
        );
    }
    let columns = info
        .desc
        .columns
        .iter()
        .map(|col| quote_identifier(&col.name))
        .join(", ");
    format!("SELECT {} FROM {}", columns, info.table_name)
}

/// Statistics collected about a single table before it is snapshotted.
#[derive(Default)]
struct TableStatistics {
    /// Wall-clock time taken by the `COUNT(*)` query, as recorded by `wall_time()`.
    count_latency: f64,
    /// Number of rows reported by the `COUNT(*)` query.
    count: u64,
}

/// Runs `SELECT COUNT(*)` against `table` and records the row count and the query latency.
async fn collect_table_statistics<Q>(
    conn: &mut Q,
    table: &MySqlTableName,
) -> Result<TableStatistics, anyhow::Error>
where
    Q: Queryable,
{
    let mut stats = TableStatistics::default();

    let count_row: Option<u64> = conn
        .query_first(format!("SELECT COUNT(*) FROM {}", table))
        .wall_time()
        .set_at(&mut stats.count_latency)
        .await?;
    stats.count = count_row.ok_or_else(|| anyhow::anyhow!("failed to COUNT(*) {table}"))?;

    Ok(stats)
}

#[cfg(test)]
mod tests {
    use super::*;
    use mz_mysql_util::{MySqlColumnDesc, MySqlTableDesc};
    use timely::progress::Antichain;

    #[mz_ore::test]
    fn snapshot_query_duplicate_table() {
        let schema_name = "myschema".to_string();
        let table_name = "mytable".to_string();
        let table = MySqlTableName(schema_name.clone(), table_name.clone());
        let columns = ["c1", "c2", "c3"]
            .iter()
            .map(|col| MySqlColumnDesc {
                name: col.to_string(),
                column_type: None,
                meta: None,
            })
            .collect::<Vec<_>>();
        let desc = MySqlTableDesc {
            schema_name: schema_name.clone(),
            name: table_name.clone(),
            columns,
            keys: BTreeSet::default(),
        };
        let info = SourceOutputInfo {
            output_index: 1, // ignored
            table_name: table.clone(),
            desc,
            text_columns: vec![],
            exclude_columns: vec![],
            initial_gtid_set: Antichain::default(),
            resume_upper: Antichain::default(),
        };
        let query = build_snapshot_query(&[info.clone(), info]);
        assert_eq!(
            format!(
                "SELECT `c1`, `c2`, `c3` FROM `{}`.`{}`",
                &schema_name, &table_name
            ),
            query
        );
    }
621}