// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Renders the table snapshot side of the [`MySqlSourceConnection`] dataflow.
//!
//! # Snapshot reading
//!
//! Depending on the `resume_upper` of each output in `source_outputs`, this dataflow decides which
//! tables to snapshot and performs a simple `SELECT * FROM table` on them in order to get a
//! snapshot.
//! There are a few subtle points about this operation, described below.
//!
//! It is crucial for correctness that we always perform the snapshot of all tables at a specific
//! point in time. This must be true even in the presence of restarts or partially committed
//! snapshots. The consistent point that the snapshot must happen at is discovered and durably
//! recorded during planning of the source and is exposed to this ingestion dataflow via the
//! `initial_gtid_set` field in `MySqlSourceDetails`.
//!
//! Unfortunately MySQL does not provide an API to perform a transaction at a specific point in
//! time. Instead, MySQL allows us to perform a snapshot of a table and lets us know at which point
//! in time the snapshot was taken. Using this information we can take a snapshot at an arbitrary
//! point in time and then "rewind" it to the desired `initial_gtid_set`. These two phases are
//! described in the following sections.
//!
//! ## Producing a snapshot at a known point in time.
//!
//! Ideally we would like to start a transaction and ask MySQL to tell us the point in time this
//! transaction is running at. As far as we know there is no such API, so we achieve this using
//! table locks instead.
//!
//! The full set of tables that are meant to be snapshotted are partitioned among the workers. Each
//! worker initiates a connection to the server and acquires a table lock on all the tables that
//! have been assigned to it. By doing so we establish a moment in time where we know no writes are
//! happening to the tables we are interested in. After the locks are taken each worker reads the
//! current upper frontier (`snapshot_upper`) using the `@@gtid_executed` system variable. This
//! frontier establishes an upper bound on any possible write to the tables of interest until the
//! lock is released.
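//!
//! Schematically, each worker's locking connection runs something like the following (the table
//! names here are illustrative):
//!
//! ```text
//! LOCK TABLES `db`.`t1` READ, `db`.`t2` READ;
//! SELECT @@global.gtid_executed; -- becomes this worker's `snapshot_upper`
//! ```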
//!
//! Each worker now starts a transaction via a new connection with 'REPEATABLE READ' and
//! 'CONSISTENT SNAPSHOT' semantics. Due to linearizability we know that this transaction's view of
//! the database must be that of some time `t_snapshot` such that `snapshot_upper <= t_snapshot`.
//! We don't actually know the exact value of `t_snapshot` and it might be strictly greater than
//! `snapshot_upper`. However, because this transaction will only be used to read the locked tables,
//! and we know that `snapshot_upper` is an upper bound on all the writes that have happened to
//! them, we can safely pretend that the transaction's `t_snapshot` is *equal* to `snapshot_upper`.
//! We have therefore succeeded in starting a transaction at a known point in time!
//!
//! At this point it is safe for each worker to unlock the tables, since the transaction has
//! established a point in time, and close the initial connection. Each worker can then read the
//! snapshot of the tables it is responsible for and publish it downstream.
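//!
//! In SQL terms the snapshot connection does roughly the following, after which the locking
//! connection is free to release its locks:
//!
//! ```text
//! SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
//! START TRANSACTION WITH CONSISTENT SNAPSHOT, READ ONLY;
//! -- then, on the locking connection:
//! UNLOCK TABLES;
//! ```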
//!
//! TODO: Other software products hold the table lock for the duration of the snapshot, and some do
//! not. We should figure out why, and whether we need to hold the lock longer. This may be due to
//! a difference in how REPEATABLE READ works in some MySQL-compatible systems (e.g. Aurora MySQL).
//!
//! ## Rewinding the snapshot to a specific point in time.
//!
//! Having obtained a snapshot of a table at some `snapshot_upper` we are now tasked with
//! transforming this snapshot into one at `initial_gtid_set`. In other words we have produced a
//! snapshot containing all updates that happened at `t: !(snapshot_upper <= t)` but what we
//! actually want is a snapshot containing all updates that happened at `t: !(initial_gtid <= t)`.
//!
//! If we assume that `initial_gtid_set <= snapshot_upper`, which is a fair assumption since the
//! former is obtained before the latter, then we can observe that the snapshot we produced
//! contains all updates at `t: !(initial_gtid <= t)` (i.e. the snapshot we want) and some
//! additional unwanted updates at `t: initial_gtid <= t && !(snapshot_upper <= t)`. We happen to
//! know exactly what those additional unwanted updates are, because they will also be observed by
//! the replication operator when it reads the replication stream. All we need to do to "rewind"
//! our `snapshot_upper` snapshot to `initial_gtid` is therefore to ask the replication operator to
//! "undo" any updates that fall in the undesirable region.
//!
//! This is exactly what `RewindRequest` is about. It informs the replication operator that a
//! particular table has been snapshotted at `snapshot_upper` and that all the updates discovered
//! during replication that happen at `t: initial_gtid <= t && !(snapshot_upper <= t)` should be
//! cancelled. In Differential Dataflow this is as simple as flipping the sign of the diff field.
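//!
//! Schematically, for an insert that committed in the unwanted region, the snapshot and the
//! rewind cancel each other out at the minimum timestamp, and the update is replayed at its true
//! GTID time `t`:
//!
//! ```text
//! snapshot reader:      (row, minimum, +1)
//! replication operator: (row, minimum, -1)  -- rewind
//! replication operator: (row, t,       +1)  -- replayed during replication
//! ```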
//!
//! The snapshot reader emits updates at the minimum timestamp (by convention) so that they can be
//! negated by the replication operator, which emits the corresponding retractions at the minimum
//! timestamp when it encounters rows from a table that occur before the GTID frontier in the
//! `RewindRequest` for that table.

use std::collections::{BTreeMap, BTreeSet};
use std::rc::Rc;
use std::sync::Arc;

use differential_dataflow::AsCollection;
use futures::TryStreamExt;
use mysql_async::prelude::Queryable;
use mysql_async::{IsolationLevel, Row as MySqlRow, TxOpts};
use mz_mysql_util::{pack_mysql_row, query_sys_var, MySqlError, ER_NO_SUCH_TABLE};
use mz_ore::cast::CastFrom;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
use mz_ore::metrics::MetricsFutureExt;
use mz_repr::Row;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::mysql::{gtid_set_frontier, GtidPartition};
use mz_storage_types::sources::MySqlSourceConnection;
use mz_timely_util::antichain::AntichainExt;
use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};
use mz_timely_util::containers::stack::AccountedStackBuilder;
use timely::dataflow::operators::core::Map;
use timely::dataflow::operators::{CapabilitySet, Concat};
use timely::dataflow::{Scope, Stream};
use timely::progress::Timestamp;
use tracing::{error, trace};

use crate::metrics::source::mysql::MySqlSnapshotMetrics;
use crate::source::types::{
    ProgressStatisticsUpdate, SignaledFuture, SourceMessage, StackedCollection,
};
use crate::source::RawSourceCreationConfig;

use super::schemas::verify_schemas;
use super::{
    return_definite_error, validate_mysql_repl_settings, DefiniteError, MySqlTableName,
    ReplicationError, RewindRequest, SourceOutputInfo, TransientError,
};

/// Renders the snapshot dataflow. See the module documentation for more information.
pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
    scope: G,
    config: RawSourceCreationConfig,
    connection: MySqlSourceConnection,
    source_outputs: Vec<SourceOutputInfo>,
    metrics: MySqlSnapshotMetrics,
) -> (
    StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
    Stream<G, RewindRequest>,
    Stream<G, ProgressStatisticsUpdate>,
    Stream<G, ReplicationError>,
    PressOnDropButton,
) {
    let mut builder =
        AsyncOperatorBuilder::new(format!("MySqlSnapshotReader({})", config.id), scope.clone());

    let (raw_handle, raw_data) = builder.new_output::<AccountedStackBuilder<_>>();
    let (rewinds_handle, rewinds) = builder.new_output();
    // Captures DefiniteErrors that affect the entire source, including all outputs
    let (definite_error_handle, definite_errors) = builder.new_output();

    let (stats_output, stats_stream) = builder.new_output();

    // A global view of all outputs that will be snapshot by all workers.
    let mut all_outputs = vec![];
    // A map containing only the table infos that this worker should snapshot.
    let mut reader_snapshot_table_info = BTreeMap::new();

    for output in source_outputs.into_iter() {
        // Determine which outputs need to be snapshotted and which already have been.
        if *output.resume_upper != [GtidPartition::minimum()] {
            // Already has been snapshotted.
            continue;
        }
        all_outputs.push(output.output_index);
        if config.responsible_for(&output.table_name) {
            reader_snapshot_table_info
                .entry(output.table_name.clone())
                .or_insert_with(Vec::new)
                .push(output);
        }
    }

    let (button, transient_errors): (_, Stream<G, Rc<TransientError>>) =
        builder.build_fallible(move |caps| {
            let busy_signal = Arc::clone(&config.busy_signal);
            Box::pin(SignaledFuture::new(busy_signal, async move {
                let [data_cap_set, rewind_cap_set, definite_error_cap_set, stats_cap]: &mut [_; 4] =
                    caps.try_into().unwrap();

                let id = config.id;
                let worker_id = config.worker_id;

                // If this worker has no tables to snapshot then there is nothing to do.
                if reader_snapshot_table_info.is_empty() {
                    trace!(%id, "timely-{worker_id} initializing table reader \
                                 with no tables to snapshot, exiting");
                    if !all_outputs.is_empty() {
                        // Emit 0 to mark this worker as having started up correctly but having
                        // done no snapshotting. Otherwise leave the statistic unfilled (no
                        // snapshotting is occurring in this instance of the dataflow).
                        stats_output.give(
                            &stats_cap[0],
                            ProgressStatisticsUpdate::Snapshot {
                                records_known: 0,
                                records_staged: 0,
                            },
                        );
                    }
                    return Ok(());
                } else {
                    trace!(%id, "timely-{worker_id} initializing table reader \
                                 with {} tables to snapshot",
                           reader_snapshot_table_info.len());
                }

                let connection_config = connection
                    .connection
                    .config(
                        &config.config.connection_context.secrets_reader,
                        &config.config,
                        InTask::Yes,
                    )
                    .await?;
                let task_name = format!("timely-{worker_id} MySQL snapshotter");

                let lock_clauses = reader_snapshot_table_info
                    .keys()
                    .map(|t| format!("{} READ", t))
                    .collect::<Vec<String>>()
                    .join(", ");
                let mut lock_conn = connection_config
                    .connect(
                        &task_name,
                        &config.config.connection_context.ssh_tunnel_manager,
                    )
                    .await?;
                if let Some(timeout) = config
                    .config
                    .parameters
                    .mysql_source_timeouts
                    .snapshot_lock_wait_timeout
                {
                    lock_conn
                        .query_drop(format!(
                            "SET @@session.lock_wait_timeout = {}",
                            timeout.as_secs()
                        ))
                        .await?;
                }

                trace!(%id, "timely-{worker_id} acquiring table locks: {lock_clauses}");
                match lock_conn
                    .query_drop(format!("LOCK TABLES {lock_clauses}"))
                    .await
                {
                    // Handle the case where a table we are snapshotting has been dropped or renamed.
                    Err(mysql_async::Error::Server(mysql_async::ServerError {
                        code,
                        message,
                        ..
                    })) if code == ER_NO_SUCH_TABLE => {
                        trace!(%id, "timely-{worker_id} received unknown table error from \
                                     lock query");
                        let err = DefiniteError::TableDropped(message);
                        return Ok(return_definite_error(
                            err,
                            &all_outputs,
                            &raw_handle,
                            data_cap_set,
                            &definite_error_handle,
                            definite_error_cap_set,
                        )
                        .await);
                    }
                    e => e?,
                };

                // Record the frontier of future GTIDs based on the executed GTID set at the start
                // of the snapshot
                let snapshot_gtid_set =
                    query_sys_var(&mut lock_conn, "global.gtid_executed").await?;
                let snapshot_gtid_frontier = match gtid_set_frontier(&snapshot_gtid_set) {
                    Ok(frontier) => frontier,
                    Err(err) => {
                        let err = DefiniteError::UnsupportedGtidState(err.to_string());
                        // If we received a GTID Set with non-consecutive intervals this breaks all
                        // our assumptions, so there is nothing else we can do.
                        return Ok(return_definite_error(
                            err,
                            &all_outputs,
                            &raw_handle,
                            data_cap_set,
                            &definite_error_handle,
                            definite_error_cap_set,
                        )
                        .await);
                    }
                };

                // TODO(roshan): Insert metric for how long it took to acquire the locks
                trace!(%id, "timely-{worker_id} acquired table locks at: {}",
                       snapshot_gtid_frontier.pretty());

                let mut conn = connection_config
                    .connect(
                        &task_name,
                        &config.config.connection_context.ssh_tunnel_manager,
                    )
                    .await?;

                // Verify the MySQL system settings are correct for consistent row-based replication using GTIDs
                match validate_mysql_repl_settings(&mut conn).await {
                    Err(err @ MySqlError::InvalidSystemSetting { .. }) => {
                        return Ok(return_definite_error(
                            DefiniteError::ServerConfigurationError(err.to_string()),
                            &all_outputs,
                            &raw_handle,
                            data_cap_set,
                            &definite_error_handle,
                            definite_error_cap_set,
                        )
                        .await);
                    }
                    Err(err) => Err(err)?,
                    Ok(()) => (),
                };

                trace!(%id, "timely-{worker_id} starting transaction with \
                             consistent snapshot at: {}", snapshot_gtid_frontier.pretty());

                // Start a transaction with REPEATABLE READ and 'CONSISTENT SNAPSHOT' semantics
                // so we can read a consistent snapshot of the table at the specific GTID we read.
                let mut tx_opts = TxOpts::default();
                tx_opts
                    .with_isolation_level(IsolationLevel::RepeatableRead)
                    .with_consistent_snapshot(true)
                    .with_readonly(true);
                let mut tx = conn.start_transaction(tx_opts).await?;
                // Set the session time zone to UTC so that we can read TIMESTAMP columns as UTC
                // From https://dev.mysql.com/doc/refman/8.0/en/datetime.html: "MySQL converts TIMESTAMP values
                // from the current time zone to UTC for storage, and back from UTC to the current time zone
                // for retrieval. (This does not occur for other types such as DATETIME.)"
                tx.query_drop("set @@session.time_zone = '+00:00'").await?;

                // Configure the maximum query execution time based on the param. We want to be
                // able to override the server value here in case it's set too low relative to the
                // size of the data we need to copy.
                if let Some(timeout) = config
                    .config
                    .parameters
                    .mysql_source_timeouts
                    .snapshot_max_execution_time
                {
                    tx.query_drop(format!(
                        "SET @@session.max_execution_time = {}",
                        timeout.as_millis()
                    ))
                    .await?;
                }

                // We have started our transaction so we can unlock the tables.
                lock_conn.query_drop("UNLOCK TABLES").await?;
                lock_conn.disconnect().await?;

                trace!(%id, "timely-{worker_id} started transaction");

                // Verify the schemas of the tables we are snapshotting
                let errored_outputs =
                    verify_schemas(&mut tx, reader_snapshot_table_info.iter().collect()).await?;
                let mut removed_outputs = BTreeSet::new();
                for (output, err) in errored_outputs {
                    // Publish the error for this table and stop ingesting it
                    raw_handle
                        .give_fueled(
                            &data_cap_set[0],
                            (
                                (output.output_index, Err(err.clone().into())),
                                GtidPartition::minimum(),
                                1,
                            ),
                        )
                        .await;
                    trace!(%id, "timely-{worker_id} stopping snapshot of output {output:?} \
                                due to schema mismatch");
                    removed_outputs.insert(output.output_index);
                }
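                // Remove the errored outputs from this worker's set so that the snapshot reads
                // below only cover outputs whose upstream schema still matches.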
                for (_, outputs) in reader_snapshot_table_info.iter_mut() {
                    outputs.retain(|output| !removed_outputs.contains(&output.output_index));
                }
                reader_snapshot_table_info.retain(|_, outputs| !outputs.is_empty());

                let snapshot_total = fetch_snapshot_size(
                    &mut tx,
                    reader_snapshot_table_info
                        .iter()
                        .map(|(name, outputs)| ((*name).clone(), outputs.len()))
                        .collect(),
                    metrics,
                )
                .await?;

                stats_output.give(
                    &stats_cap[0],
                    ProgressStatisticsUpdate::Snapshot {
                        records_known: snapshot_total,
                        records_staged: 0,
                    },
                );

                // This worker has nothing else to do
                if reader_snapshot_table_info.is_empty() {
                    return Ok(());
                }

                // Read the snapshot data from the tables
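                // Buffer that `pack_mysql_row` packs each decoded row into, reused across rows to
                // avoid a fresh allocation per row.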
                let mut final_row = Row::default();

                let mut snapshot_staged = 0;
                for (table, outputs) in &reader_snapshot_table_info {
                    let query = format!("SELECT * FROM {}", table);
                    trace!(%id, "timely-{worker_id} reading snapshot from \
                                 table '{table}'");
                    let mut results = tx.exec_stream(query, ()).await?;
                    let mut count = 0;
                    while let Some(row) = results.try_next().await? {
                        let row: MySqlRow = row;
                        for (output, row_val) in outputs.iter().repeat_clone(row) {
                            let event = match pack_mysql_row(&mut final_row, row_val, &output.desc)
                            {
                                Ok(row) => Ok(SourceMessage {
                                    key: Row::default(),
                                    value: row,
                                    metadata: Row::default(),
                                }),
                                // Produce a DefiniteError in the stream for any rows that fail to decode
                                Err(err @ MySqlError::ValueDecodeError { .. }) => {
                                    Err(DataflowError::from(DefiniteError::ValueDecodeError(
                                        err.to_string(),
                                    )))
                                }
                                Err(err) => Err(err)?,
                            };
                            raw_handle
                                .give_fueled(
                                    &data_cap_set[0],
                                    ((output.output_index, event), GtidPartition::minimum(), 1),
                                )
                                .await;
                            count += 1;
                            snapshot_staged += 1;
                            // TODO(guswynn): does this 1000 need to be configurable?
                            if snapshot_staged % 1000 == 0 {
                                stats_output.give(
                                    &stats_cap[0],
                                    ProgressStatisticsUpdate::Snapshot {
                                        records_known: snapshot_total,
                                        records_staged: snapshot_staged,
                                    },
                                );
                            }
                        }
                    }
                    trace!(%id, "timely-{worker_id} snapshotted {count} records from \
                                 table '{table}'");
                }

                // We are done with the snapshot so now we will emit rewind requests. It is
                // important that this happens after the snapshot has finished because this is what
                // unblocks the replication operator and we want this to happen serially. It might
                // seem like a good idea to read the replication stream concurrently with the
                // snapshot but it actually leads to a lot of data being staged for the future,
                // which needlessly consumes memory in the cluster.
                for (table, outputs) in reader_snapshot_table_info {
                    for output in outputs {
                        trace!(%id, "timely-{worker_id} producing rewind request for {table} \
                                     output {}", output.output_index);
                        let req = RewindRequest {
                            output_index: output.output_index,
                            snapshot_upper: snapshot_gtid_frontier.clone(),
                        };
                        rewinds_handle.give(&rewind_cap_set[0], req);
                    }
                }
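                // Downgrade the rewind capability set to empty, signaling to downstream consumers
                // of the rewind stream that no further rewind requests will be produced.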
                *rewind_cap_set = CapabilitySet::new();

                if snapshot_staged < snapshot_total {
                    error!(%id, "timely-{worker_id} snapshot size {snapshot_total} is somehow \
                                 bigger than records staged {snapshot_staged}");
                    snapshot_staged = snapshot_total;
                }
                stats_output.give(
                    &stats_cap[0],
                    ProgressStatisticsUpdate::Snapshot {
                        records_known: snapshot_total,
                        records_staged: snapshot_staged,
                    },
                );
                Ok(())
            }))
        });

    // TODO: Split row decoding into a separate operator that can be distributed across all workers

    let errors = definite_errors.concat(&transient_errors.map(ReplicationError::from));

    (
        raw_data.as_collection(),
        rewinds,
        stats_stream,
        errors,
        button.press_on_drop(),
    )
}

/// Fetch the size of the snapshot on this worker.
async fn fetch_snapshot_size<Q>(
    conn: &mut Q,
    tables: Vec<(MySqlTableName, usize)>,
    metrics: MySqlSnapshotMetrics,
) -> Result<u64, anyhow::Error>
where
    Q: Queryable,
{
    let mut total = 0;
    for (table, num_outputs) in tables {
        let stats = collect_table_statistics(conn, &table).await?;
        metrics.record_table_count_latency(table.1, table.0, stats.count_latency);
        total += stats.count * u64::cast_from(num_outputs);
    }
    Ok(total)
}

#[derive(Default)]
struct TableStatistics {
    count_latency: f64,
    count: u64,
}

async fn collect_table_statistics<Q>(
    conn: &mut Q,
    table: &MySqlTableName,
) -> Result<TableStatistics, anyhow::Error>
where
    Q: Queryable,
{
    let mut stats = TableStatistics::default();

    let count_row: Option<u64> = conn
        .query_first(format!("SELECT COUNT(*) FROM {}", table))
        .wall_time()
        .set_at(&mut stats.count_latency)
        .await?;
    stats.count = count_row.ok_or_else(|| anyhow::anyhow!("failed to COUNT(*) {table}"))?;

    Ok(stats)
}