// mz_storage/source/mysql/replication.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Renders the replication side of the [`MySqlSourceConnection`] ingestion dataflow.
//!
//! # Progress tracking using Partitioned Timestamps
//!
//! This dataflow uses a Partitioned Timestamp implementation to represent the GTID Set that
//! comprises the full set of committed transactions from the MySQL Server. The frontier
//! representing progress for this dataflow represents the full range of possible UUIDs +
//! Transaction IDs of future GTIDs that could be added to the GTID Set.
//!
//! See the [`mz_storage_types::sources::mysql::GtidPartition`] type for more information.
//!
//! To maintain a complete frontier of the full UUID GTID range, we use a
//! [`partitions::GtidReplicationPartitions`] struct to store the GTID Set as a set of partitions.
//! This allows us to easily advance the frontier each time we see a new GTID on the replication
//! stream.
//!
//! # Resumption
//!
//! When the dataflow is resumed, the MySQL replication stream is started from the GTID frontier
//! of the minimum frontier across all source outputs. This is compared against the GTID set that
//! may still be obtained from the MySQL server, using the @@GTID_PURGED value in MySQL to
//! determine GTIDs that are no longer available in the binlog and to put the source in an error
//! state if we cannot resume from the GTID frontier.
//!
//! # Rewinds
//!
//! The replication stream may be resumed from a point before the snapshot for a specific output
//! occurs. To avoid double-counting updates that were present in the snapshot, we store a map
//! of pending rewinds that we've received from the snapshot operator, and when we see updates
//! for an output that were present in the snapshot, we negate the snapshot update
//! (at the minimum timestamp) and send it again at the correct GTID.
42use std::collections::BTreeMap;
43use std::num::NonZeroU64;
44use std::pin::pin;
45use std::sync::Arc;
46
47use differential_dataflow::AsCollection;
48use futures::StreamExt;
49use itertools::Itertools;
50use mysql_async::prelude::Queryable;
51use mysql_async::{BinlogStream, BinlogStreamRequest, GnoInterval, Sid};
52use mz_ore::future::InTask;
53use mz_ssh_util::tunnel_manager::ManagedSshTunnelHandle;
54use mz_timely_util::containers::stack::AccountedStackBuilder;
55use timely::PartialOrder;
56use timely::container::CapacityContainerBuilder;
57use timely::dataflow::channels::pact::Exchange;
58use timely::dataflow::operators::Concat;
59use timely::dataflow::operators::core::Map;
60use timely::dataflow::{Scope, StreamVec};
61use timely::progress::{Antichain, Timestamp};
62use tracing::trace;
63use uuid::Uuid;
64
65use mz_mysql_util::{
66    ER_SOURCE_FATAL_ERROR_READING_BINLOG_CODE, MySqlConn, MySqlError, query_sys_var,
67};
68use mz_ore::cast::CastFrom;
69use mz_repr::GlobalId;
70use mz_storage_types::errors::DataflowError;
71use mz_storage_types::sources::MySqlSourceConnection;
72use mz_storage_types::sources::mysql::{GtidPartition, GtidState, gtid_set_frontier};
73use mz_timely_util::builder_async::{
74    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
75};
76
77use crate::metrics::source::mysql::MySqlSourceMetrics;
78use crate::source::RawSourceCreationConfig;
79use crate::source::types::{SignaledFuture, SourceMessage, StackedCollection};
80
81use super::{
82    DefiniteError, ReplicationError, RewindRequest, SourceOutputInfo, TransientError,
83    return_definite_error, validate_mysql_repl_settings,
84};
85
86mod context;
87mod events;
88mod partitions;
89
/// Used as a partition id to determine if the worker is
/// responsible for reading from the MySQL replication stream
static REPL_READER: &str = "reader";

/// A constant arbitrary offset to add to the source-id to
/// produce a deterministic server-id for identifying Materialize
/// as a replica on the upstream MySQL server.
/// TODO(roshan): Add user-facing documentation for this
static REPLICATION_SERVER_ID_OFFSET: u32 = 524000;
99
100/// Renders the replication dataflow. See the module documentation for more
101/// information.
102pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
103    scope: G,
104    config: RawSourceCreationConfig,
105    connection: MySqlSourceConnection,
106    source_outputs: Vec<SourceOutputInfo>,
107    rewind_stream: StreamVec<G, RewindRequest>,
108    metrics: MySqlSourceMetrics,
109) -> (
110    StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
111    StreamVec<G, ReplicationError>,
112    PressOnDropButton,
113) {
114    let op_name = format!("MySqlReplicationReader({})", config.id);
115    let mut builder = AsyncOperatorBuilder::new(op_name, scope);
116
117    let repl_reader_id = u64::cast_from(config.responsible_worker(REPL_READER));
118    let (mut data_output, data_stream) = builder.new_output::<AccountedStackBuilder<_>>();
119    // Captures DefiniteErrors that affect the entire source, including all outputs
120    let (definite_error_handle, definite_errors) =
121        builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
122    let mut rewind_input = builder.new_input_for(
123        rewind_stream,
124        Exchange::new(move |_| repl_reader_id),
125        &data_output,
126    );
127
128    let output_indexes = source_outputs
129        .iter()
130        .map(|output| output.output_index)
131        .collect_vec();
132
133    metrics.tables.set(u64::cast_from(source_outputs.len()));
134
135    let (button, transient_errors) = builder.build_fallible(move |caps| {
136        let busy_signal = Arc::clone(&config.busy_signal);
137        Box::pin(SignaledFuture::new(busy_signal, async move {
138            let (id, worker_id) = (config.id, config.worker_id);
139            let [data_cap_set, definite_error_cap_set]: &mut [_; 2] = caps.try_into().unwrap();
140
141            // Only run the replication reader on the worker responsible for it.
142            if !config.responsible_for(REPL_READER) {
143                return Ok(());
144            }
145
146            let connection_config = connection
147                .connection
148                .config(
149                    &config.config.connection_context.secrets_reader,
150                    &config.config,
151                    InTask::Yes,
152                )
153                .await?;
154
155            let mut conn = connection_config
156                .connect(
157                    &format!("timely-{worker_id} MySQL replication reader"),
158                    &config.config.connection_context.ssh_tunnel_manager,
159                )
160                .await?;
161
162            // Get the set of GTIDs that have been purged from the binlogs. The assumption is that this
163            // represents the frontier of possible GTIDs that exist in the binlog, that we can start
164            // replicating from.
165            let binlog_purged_set = query_sys_var(&mut conn, "global.gtid_purged").await?;
166            let binlog_frontier = match gtid_set_frontier(&binlog_purged_set) {
167                Ok(frontier) => frontier,
168                Err(err) => {
169                    let err = DefiniteError::UnsupportedGtidState(err.to_string());
170                    tracing::warn!(%id, "Unable to determine GTID frontier from @@gtid_purged: {err}");
171                    return Ok(
172                        // If GTID intervals in the binlog are not available in a monotonic consecutive
173                        // order this breaks all of our assumptions and there is nothing else we can do.
174                        // This can occur if the mysql server is restored to a previous point-in-time
175                        // or if a user manually adds transactions to the @@gtid_purged system var.
176                        return_definite_error(
177                            err,
178                            &output_indexes,
179                            &data_output,
180                            data_cap_set,
181                            &definite_error_handle,
182                            definite_error_cap_set,
183                        )
184                        .await,
185                    );
186                }
187            };
188
189            trace!(%id, "timely-{worker_id} replication binlog frontier: {binlog_frontier:?}");
190
191            // upstream-table-name: Vec<SourceOutputInfo> since multiple
192            // outputs can refer to the same table
193            let mut table_info = BTreeMap::new();
194            let mut output_uppers = Vec::new();
195
196            // Calculate the lowest frontier across all outputs, which represents the point which
197            // we should start replication from.
198            let min_frontier = Antichain::from_elem(GtidPartition::minimum());
199            for output in source_outputs.into_iter() {
200                // If an output is resuming at the minimum frontier then its snapshot
201                // has not yet been committed.
202                // We need to resume from a frontier before the output's snapshot frontier
203                // to ensure we don't miss updates that happen after the snapshot was taken.
204                //
205                // This also ensures that tables created as part of the same CREATE SOURCE
206                // statement are 'effectively' snapshot at the same GTID Set, even if their
207                // actual snapshot frontiers are different due to a restart.
208                //
209                // We've chosen the frontier beyond the GTID Set recorded
210                // during purification as this resume point.
211                if &output.resume_upper == &min_frontier {
212                    output_uppers.push(output.initial_gtid_set.clone());
213                } else {
214                    output_uppers.push(output.resume_upper.clone());
215                }
216
217                table_info
218                    .entry(output.table_name.clone())
219                    .or_insert_with(Vec::new)
220                    .push(output);
221            }
222            let resume_upper = match output_uppers.len() {
223                0 => {
224                    // If there are no outputs to replicate then we will just be updating the
225                    // source progress collection. In this case we can just start from the head of
226                    // the binlog to avoid wasting time on old events.
227                    trace!(%id, "timely-{worker_id} replication reader found no outputs \
228                                 to replicate, using latest gtid_executed as resume_upper");
229                    let executed_gtid_set =
230                        query_sys_var(&mut conn, "global.gtid_executed").await?;
231
232                    gtid_set_frontier(&executed_gtid_set)?
233                }
234                _ => Antichain::from_iter(output_uppers.into_iter().flatten()),
235            };
236
237            // Validate that we can actually resume from this upper.
238            if !PartialOrder::less_equal(&binlog_frontier, &resume_upper) {
239                let err = DefiniteError::BinlogMissingResumePoint(
240                    format!("{:?}", binlog_frontier),
241                    format!("{:?}", resume_upper),
242                );
243                return Ok(return_definite_error(
244                    err,
245                    &output_indexes,
246                    &data_output,
247                    data_cap_set,
248                    &definite_error_handle,
249                    definite_error_cap_set,
250                )
251                .await);
252            };
253
254            data_cap_set.downgrade(&*resume_upper);
255            trace!(%id, "timely-{worker_id} replication reader started at {:?}", resume_upper);
256
257            let mut rewinds = BTreeMap::new();
258            while let Some(event) = rewind_input.next().await {
259                if let AsyncEvent::Data(caps, data) = event {
260                    for req in data {
261                        // Check that the replication stream will be resumed from the snapshot point or before.
262                        if !PartialOrder::less_equal(&resume_upper, &req.snapshot_upper) {
263                            let err = DefiniteError::BinlogMissingResumePoint(
264                                format!("{:?}", resume_upper),
265                                format!("{:?}", req.snapshot_upper),
266                            );
267                            return Ok(return_definite_error(
268                                err,
269                                &output_indexes,
270                                &data_output,
271                                data_cap_set,
272                                &definite_error_handle,
273                                definite_error_cap_set,
274                            )
275                            .await);
276                        };
277                        // If the snapshot point is the same as the resume point then we don't need to rewind
278                        if resume_upper != req.snapshot_upper {
279                            rewinds.insert(req.output_index.clone(), (caps.clone(), req));
280                        }
281                    }
282                }
283            }
284            trace!(%id, "timely-{worker_id} pending rewinds {rewinds:?}");
285
286            // We don't use _conn_tunnel_handle here, but need to keep it around to ensure that the
287            // SSH tunnel is not dropped until the replication stream is dropped.
288            let (binlog_stream, _conn_tunnel_handle) =
289                match raw_stream(&config, conn, &resume_upper).await? {
290                    Ok(stream) => stream,
291                    // If the replication stream cannot be obtained in a definite way there is
292                    // nothing else to do. These errors are not retractable.
293                    Err(err) => {
294                        return Ok(return_definite_error(
295                            err,
296                            &output_indexes,
297                            &data_output,
298                            data_cap_set,
299                            &definite_error_handle,
300                            definite_error_cap_set,
301                        )
302                        .await);
303                    }
304                };
305            let mut stream = pin!(binlog_stream.peekable());
306
307            // Store all partitions from the resume_upper so we can create a frontier that comprises
308            // timestamps for partitions representing the full range of UUIDs to advance our main
309            // capabilities.
310            let mut data_partitions =
311                partitions::GtidReplicationPartitions::from(resume_upper.clone());
312            let mut progress_partitions = partitions::GtidReplicationPartitions::from(resume_upper);
313
314            let mut repl_context = context::ReplContext::new(
315                &config,
316                &connection_config,
317                stream.as_mut(),
318                &table_info,
319                &metrics,
320                &mut data_output,
321                data_cap_set,
322                rewinds,
323            );
324
325            let mut active_tx: Option<(Uuid, NonZeroU64)> = None;
326
327            let mut row_event_buffer = Vec::new();
328
329            while let Some(event) = repl_context.stream.next().await {
330                use mysql_async::binlog::events::*;
331                let event = event?;
332                let event_data = event.read_data()?;
333                metrics.total.inc();
334
335                match event_data {
336                    Some(EventData::XidEvent(_)) => {
337                        // We've received a transaction commit event, which means that we've seen
338                        // all events for the current GTID and we can advance the frontier beyond.
339                        let (source_id, tx_id) = active_tx.take().expect("unexpected xid event");
340
341                        // Increment the transaction-id to the next GTID we should see from this source-id
342                        let next_tx_id = tx_id.checked_add(1).unwrap();
343                        let next_gtid =
344                            GtidPartition::new_singleton(source_id, GtidState::Active(next_tx_id));
345
346                        if let Err(err) = data_partitions.advance_frontier(next_gtid) {
347                            return Ok(return_definite_error(
348                                err,
349                                &output_indexes,
350                                &data_output,
351                                data_cap_set,
352                                &definite_error_handle,
353                                definite_error_cap_set,
354                            )
355                            .await);
356                        }
357                        let new_upper = data_partitions.frontier();
358                        repl_context.downgrade_data_cap_set("xid_event", new_upper);
359                    }
360                    // We receive a GtidEvent that tells us the GTID of the incoming RowsEvents (and other events)
361                    Some(EventData::GtidEvent(event)) => {
362                        let source_id = Uuid::from_bytes(event.sid());
363                        let tx_id = NonZeroU64::new(event.gno()).unwrap();
364
365                        // We are potentially about to ingest a big transaction that we don't want
366                        // to store in memory. For this reason we are immediately downgrading our
367                        // progress frontier to one that includes the upcoming transaction. This
368                        // will cause a remap binding to be minted right away and so the data of
369                        // the transaction will not accumulate in the reclock operator.
370                        let next_tx_id = tx_id.checked_add(1).unwrap();
371                        let next_gtid =
372                            GtidPartition::new_singleton(source_id, GtidState::Active(next_tx_id));
373
374                        if let Err(err) = progress_partitions.advance_frontier(next_gtid) {
375                            return Ok(return_definite_error(
376                                err,
377                                &output_indexes,
378                                &data_output,
379                                data_cap_set,
380                                &definite_error_handle,
381                                definite_error_cap_set,
382                            )
383                            .await);
384                        }
385                        // Store the information of the active transaction for the subsequent events
386                        active_tx = Some((source_id, tx_id));
387                    }
388                    Some(EventData::RowsEvent(data)) => {
389                        let (source_id, tx_id) = active_tx
390                            .clone()
391                            .expect("gtid cap should be set by previous GtidEvent");
392                        let cur_gtid =
393                            GtidPartition::new_singleton(source_id, GtidState::Active(tx_id));
394
395                        events::handle_rows_event(
396                            data,
397                            &repl_context,
398                            &cur_gtid,
399                            &mut row_event_buffer,
400                        )
401                        .await?;
402
403                        // Advance the frontier up to the point right before this GTID, since we
404                        // might still see other events that are part of this same GTID, such as
405                        // row events for multiple tables or large row events split into multiple.
406                        if let Err(err) = data_partitions.advance_frontier(cur_gtid) {
407                            return Ok(return_definite_error(
408                                err,
409                                &output_indexes,
410                                &data_output,
411                                data_cap_set,
412                                &definite_error_handle,
413                                definite_error_cap_set,
414                            )
415                            .await);
416                        }
417                        let new_upper = data_partitions.frontier();
418                        repl_context.downgrade_data_cap_set("rows_event", new_upper);
419                    }
420                    Some(EventData::QueryEvent(event)) => {
421                        let (source_id, tx_id) = active_tx
422                            .clone()
423                            .expect("gtid cap should be set by previous GtidEvent");
424                        let cur_gtid =
425                            GtidPartition::new_singleton(source_id, GtidState::Active(tx_id));
426
427                        let should_advance =
428                            events::handle_query_event(event, &mut repl_context, &cur_gtid).await?;
429
430                        if should_advance {
431                            active_tx = None;
432                            // Increment the transaction-id to the next GTID we should see from this source-id
433                            let next_tx_id = tx_id.checked_add(1).unwrap();
434                            let next_gtid = GtidPartition::new_singleton(
435                                source_id,
436                                GtidState::Active(next_tx_id),
437                            );
438
439                            if let Err(err) = data_partitions.advance_frontier(next_gtid) {
440                                return Ok(return_definite_error(
441                                    err,
442                                    &output_indexes,
443                                    &data_output,
444                                    data_cap_set,
445                                    &definite_error_handle,
446                                    definite_error_cap_set,
447                                )
448                                .await);
449                            }
450                            let new_upper = data_partitions.frontier();
451                            repl_context.downgrade_data_cap_set("query_event", new_upper);
452                        }
453                    }
454                    _ => {
455                        // TODO: Handle other event types
456                        metrics.ignored.inc();
457                    }
458                }
459            }
460            // We never expect the replication stream to gracefully end
461            Err(TransientError::ReplicationEOF)
462        }))
463    });
464
465    // TODO: Split row decoding into a separate operator that can be distributed across all workers
466
467    let errors = definite_errors.concat(transient_errors.map(ReplicationError::from));
468
469    (data_stream.as_collection(), errors, button.press_on_drop())
470}
471
472/// Produces the replication stream from the MySQL server. This will return all transactions
473/// whose GTIDs were not present in the GTID UUIDs referenced in the `resume_uppper` partitions.
474async fn raw_stream(
475    config: &RawSourceCreationConfig,
476    mut conn: MySqlConn,
477    resume_upper: &Antichain<GtidPartition>,
478) -> Result<Result<(BinlogStream, Option<ManagedSshTunnelHandle>), DefiniteError>, TransientError> {
479    // Verify the MySQL system settings are correct for consistent row-based replication using GTIDs
480    match validate_mysql_repl_settings(&mut conn).await {
481        Err(err @ MySqlError::InvalidSystemSetting { .. }) => {
482            return Ok(Err(DefiniteError::ServerConfigurationError(
483                err.to_string(),
484            )));
485        }
486        Err(err) => Err(err)?,
487        Ok(()) => (),
488    };
489
490    // To start the stream we need to provide a GTID set of the transactions that we've 'seen'
491    // and the server will send us all transactions that have been committed after that point.
492    // NOTE: The 'Gno' intervals in this transaction-set use an open set [start, end)
493    // interval, which is different than the closed-set [start, end] form returned by the
494    // @gtid_executed system variable. So the intervals we construct in this GTID set
495    // end with the value of the transaction-id that we want to start replication at,
496    // which happens to be the same as the definition of a frontier value.
497    // https://dev.mysql.com/doc/refman/8.0/en/replication-options-gtids.html#sysvar_gtid_executed
498    // https://dev.mysql.com/doc/dev/mysql-server/latest/classGtid__set.html#ab46da5ceeae0198b90f209b0a8be2a24
499    let seen_gtids = resume_upper
500        .iter()
501        .flat_map(|partition| match partition.timestamp() {
502            GtidState::Absent => None,
503            GtidState::Active(frontier_time) => {
504                let part_uuid = partition
505                    .interval()
506                    .singleton()
507                    .expect("Non-absent paritions will be singletons");
508                // NOTE: Since we enforce replica_preserve_commit_order=ON we can start the interval at 1
509                // since we know that all transactions with a lower transaction id were monotonic
510                Some(
511                    Sid::new(*part_uuid.as_bytes())
512                        .with_interval(GnoInterval::new(1, frontier_time.get())),
513                )
514            }
515        })
516        .collect::<Vec<_>>();
517
518    // Request that the stream provide us with a heartbeat message when no other messages have
519    // been sent. This isn't strictly necessary, but is a lightweight additional general
520    // health-check for the replication stream
521    conn.query_drop(format!(
522        "SET @master_heartbeat_period = {};",
523        mz_storage_types::dyncfgs::MYSQL_REPLICATION_HEARTBEAT_INTERVAL
524            .get(config.config.config_set())
525            .as_nanos()
526    ))
527    .await?;
528
529    // Generate a deterministic server-id for identifying us as a replica on the upstream mysql server.
530    // The value does not actually matter since it's irrelevant for GTID-based replication and won't
531    // cause errors if it happens to be the same as another replica in the mysql cluster (based on testing),
532    // but by setting it to a constant value we can make it easier for users to identify Materialize connections
533    let server_id = match config.id {
534        GlobalId::System(id) => id,
535        GlobalId::User(id) => id,
536        GlobalId::Transient(id) => id,
537        _ => unreachable!(),
538    };
539    let server_id = match u32::try_from(server_id) {
540        Ok(id) if id + REPLICATION_SERVER_ID_OFFSET < u32::MAX => id + REPLICATION_SERVER_ID_OFFSET,
541        _ => REPLICATION_SERVER_ID_OFFSET,
542    };
543
544    trace!(
545        "requesting replication stream with seen_gtids: {seen_gtids:?} \
546         and server_id: {server_id:?}"
547    );
548
549    // We need to transform the connection into a BinlogStream (which takes the `Conn` by value),
550    // but to avoid dropping any active SSH tunnel used by the connection we need to preserve the
551    // tunnel handle and return it
552    let (inner_conn, conn_tunnel_handle) = conn.take();
553
554    let repl_stream = match inner_conn
555        .get_binlog_stream(
556            BinlogStreamRequest::new(server_id)
557                .with_gtid()
558                .with_gtid_set(seen_gtids),
559        )
560        .await
561    {
562        Ok(stream) => stream,
563        Err(mysql_async::Error::Server(ref server_err))
564            if server_err.code == ER_SOURCE_FATAL_ERROR_READING_BINLOG_CODE =>
565        {
566            // The GTID set we requested is no longer available
567            return Ok(Err(DefiniteError::BinlogNotAvailable));
568        }
569        // TODO: handle other error types. Some may require a re-snapshot and some may be transient
570        Err(err) => return Err(err.into()),
571    };
572
573    Ok(Ok((repl_stream, conn_tunnel_handle)))
574}