// mz_storage/source/mysql/replication.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Renders the replication side of the [`MySqlSourceConnection`] ingestion dataflow.
//!
//! # Progress tracking using Partitioned Timestamps
//!
//! This dataflow uses a Partitioned Timestamp implementation to represent the GTID Set that
//! comprises the full set of committed transactions from the MySQL Server. The frontier
//! representing progress for this dataflow represents the full range of possible UUIDs +
//! Transaction IDs of future GTIDs that could be added to the GTID Set.
//!
//! See the [`mz_storage_types::sources::mysql::GtidPartition`] type for more information.
//!
//! To maintain a complete frontier of the full UUID GTID range, we use a
//! [`partitions::GtidReplicationPartitions`] struct to store the GTID Set as a set of partitions.
//! This allows us to easily advance the frontier each time we see a new GTID on the replication
//! stream.
//!
//! # Resumption
//!
//! When the dataflow is resumed, the MySQL replication stream is started from the GTID frontier
//! of the minimum frontier across all source outputs. This is compared against the GTID set that
//! may still be obtained from the MySQL server, using the @@GTID_PURGED value in MySQL to
//! determine GTIDs that are no longer available in the binlog and to put the source in an error
//! state if we cannot resume from the GTID frontier.
//!
//! # Rewinds
//!
//! The replication stream may be resumed from a point before the snapshot for a specific output
//! occurs. To avoid double-counting updates that were present in the snapshot, we store a map
//! of pending rewinds that we've received from the snapshot operator, and when we see updates
//! for an output that were present in the snapshot, we negate the snapshot update
//! (at the minimum timestamp) and send it again at the correct GTID.

42use std::collections::BTreeMap;
43use std::num::NonZeroU64;
44use std::pin::pin;
45use std::sync::Arc;
46
47use differential_dataflow::AsCollection;
48use futures::StreamExt;
49use itertools::Itertools;
50use mysql_async::prelude::Queryable;
51use mysql_async::{BinlogStream, BinlogStreamRequest, GnoInterval, Sid};
52use mz_ore::future::InTask;
53use mz_ssh_util::tunnel_manager::ManagedSshTunnelHandle;
54use mz_timely_util::containers::stack::AccountedStackBuilder;
55use timely::PartialOrder;
56use timely::container::CapacityContainerBuilder;
57use timely::dataflow::channels::pact::Exchange;
58use timely::dataflow::operators::Concat;
59use timely::dataflow::operators::core::Map;
60use timely::dataflow::{Scope, StreamVec};
61use timely::progress::{Antichain, Timestamp};
62use tracing::trace;
63use uuid::Uuid;
64
65use mz_mysql_util::{
66    ER_SOURCE_FATAL_ERROR_READING_BINLOG_CODE, MySqlConn, MySqlError, query_sys_var,
67};
68use mz_ore::cast::CastFrom;
69use mz_repr::GlobalId;
70use mz_storage_types::errors::DataflowError;
71use mz_storage_types::sources::MySqlSourceConnection;
72use mz_storage_types::sources::mysql::{GtidPartition, GtidState, gtid_set_frontier};
73use mz_timely_util::builder_async::{
74    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
75};
76
77use crate::metrics::source::mysql::MySqlSourceMetrics;
78use crate::source::RawSourceCreationConfig;
79use crate::source::types::{SignaledFuture, SourceMessage, StackedCollection};
80
81use super::{
82    DefiniteError, ReplicationError, RewindRequest, SourceOutputInfo, TransientError,
83    return_definite_error, validate_mysql_repl_settings,
84};
85
86mod context;
87mod events;
88mod partitions;
89
90/// Used as a partition id to determine if the worker is
91/// responsible for reading from the MySQL replication stream
92static REPL_READER: &str = "reader";
93
94/// A constant arbitrary offset to add to the source-id to
95/// produce a deterministic server-id for identifying Materialize
96/// as a replica on the upstream MySQL server.
97/// TODO(roshan): Add user-facing documentation for this
98static REPLICATION_SERVER_ID_OFFSET: u32 = 524000;
99
100/// Renders the replication dataflow. See the module documentation for more
101/// information.
102pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
103    scope: G,
104    config: RawSourceCreationConfig,
105    connection: MySqlSourceConnection,
106    source_outputs: Vec<SourceOutputInfo>,
107    rewind_stream: StreamVec<G, RewindRequest>,
108    metrics: MySqlSourceMetrics,
109) -> (
110    StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
111    StreamVec<G, ReplicationError>,
112    PressOnDropButton,
113) {
114    let op_name = format!("MySqlReplicationReader({})", config.id);
115    let mut builder = AsyncOperatorBuilder::new(op_name, scope);
116
117    let repl_reader_id = u64::cast_from(config.responsible_worker(REPL_READER));
118    let (mut data_output, data_stream) = builder.new_output::<AccountedStackBuilder<_>>();
119    // Captures DefiniteErrors that affect the entire source, including all outputs
120    let (definite_error_handle, definite_errors) =
121        builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
122    let mut rewind_input = builder.new_input_for(
123        rewind_stream,
124        Exchange::new(move |_| repl_reader_id),
125        &data_output,
126    );
127
128    let output_indexes = source_outputs
129        .iter()
130        .map(|output| output.output_index)
131        .collect_vec();
132
133    metrics.tables.set(u64::cast_from(source_outputs.len()));
134
135    let (button, transient_errors) = builder.build_fallible(move |caps| {
136        let busy_signal = Arc::clone(&config.busy_signal);
137        Box::pin(SignaledFuture::new(busy_signal, async move {
138            let (id, worker_id) = (config.id, config.worker_id);
139            let [data_cap_set, definite_error_cap_set]: &mut [_; 2] = caps.try_into().unwrap();
140
141            // Only run the replication reader on the worker responsible for it.
142            if !config.responsible_for(REPL_READER) {
143                return Ok(());
144            }
145
146            let connection_config = connection
147                .connection
148                .config(
149                    &config.config.connection_context.secrets_reader,
150                    &config.config,
151                    InTask::Yes,
152                )
153                .await?;
154
155            let mut conn = connection_config
156                .connect(
157                    &format!("timely-{worker_id} MySQL replication reader"),
158                    &config.config.connection_context.ssh_tunnel_manager,
159                )
160                .await?;
161
162            // Get the set of GTIDs that have been purged from the binlogs. The assumption is that this
163            // represents the frontier of possible GTIDs that exist in the binlog, that we can start
164            // replicating from.
165            let binlog_purged_set = query_sys_var(&mut conn, "global.gtid_purged").await?;
166            let binlog_frontier = match gtid_set_frontier(&binlog_purged_set) {
167                Ok(frontier) => frontier,
168                Err(err) => {
169                    let err = DefiniteError::UnsupportedGtidState(err.to_string());
170                    return Ok(
171                        // If GTID intervals in the binlog are not available in a monotonic consecutive
172                        // order this breaks all of our assumptions and there is nothing else we can do.
173                        // This can occur if the mysql server is restored to a previous point-in-time
174                        // or if a user manually adds transactions to the @@gtid_purged system var.
175                        return_definite_error(
176                            err,
177                            &output_indexes,
178                            &data_output,
179                            data_cap_set,
180                            &definite_error_handle,
181                            definite_error_cap_set,
182                        )
183                        .await,
184                    );
185                }
186            };
187
188            trace!(%id, "timely-{worker_id} replication binlog frontier: {binlog_frontier:?}");
189
190            // upstream-table-name: Vec<SourceOutputInfo> since multiple
191            // outputs can refer to the same table
192            let mut table_info = BTreeMap::new();
193            let mut output_uppers = Vec::new();
194
195            // Calculate the lowest frontier across all outputs, which represents the point which
196            // we should start replication from.
197            let min_frontier = Antichain::from_elem(GtidPartition::minimum());
198            for output in source_outputs.into_iter() {
199                // If an output is resuming at the minimum frontier then its snapshot
200                // has not yet been committed.
201                // We need to resume from a frontier before the output's snapshot frontier
202                // to ensure we don't miss updates that happen after the snapshot was taken.
203                //
204                // This also ensures that tables created as part of the same CREATE SOURCE
205                // statement are 'effectively' snapshot at the same GTID Set, even if their
206                // actual snapshot frontiers are different due to a restart.
207                //
208                // We've chosen the frontier beyond the GTID Set recorded
209                // during purification as this resume point.
210                if &output.resume_upper == &min_frontier {
211                    output_uppers.push(output.initial_gtid_set.clone());
212                } else {
213                    output_uppers.push(output.resume_upper.clone());
214                }
215
216                table_info
217                    .entry(output.table_name.clone())
218                    .or_insert_with(Vec::new)
219                    .push(output);
220            }
221            let resume_upper = match output_uppers.len() {
222                0 => {
223                    // If there are no outputs to replicate then we will just be updating the
224                    // source progress collection. In this case we can just start from the head of
225                    // the binlog to avoid wasting time on old events.
226                    trace!(%id, "timely-{worker_id} replication reader found no outputs \
227                                 to replicate, using latest gtid_executed as resume_upper");
228                    let executed_gtid_set =
229                        query_sys_var(&mut conn, "global.gtid_executed").await?;
230
231                    gtid_set_frontier(&executed_gtid_set)?
232                }
233                _ => Antichain::from_iter(output_uppers.into_iter().flatten()),
234            };
235
236            // Validate that we can actually resume from this upper.
237            if !PartialOrder::less_equal(&binlog_frontier, &resume_upper) {
238                let err = DefiniteError::BinlogMissingResumePoint(
239                    format!("{:?}", binlog_frontier),
240                    format!("{:?}", resume_upper),
241                );
242                return Ok(return_definite_error(
243                    err,
244                    &output_indexes,
245                    &data_output,
246                    data_cap_set,
247                    &definite_error_handle,
248                    definite_error_cap_set,
249                )
250                .await);
251            };
252
253            data_cap_set.downgrade(&*resume_upper);
254            trace!(%id, "timely-{worker_id} replication reader started at {:?}", resume_upper);
255
256            let mut rewinds = BTreeMap::new();
257            while let Some(event) = rewind_input.next().await {
258                if let AsyncEvent::Data(caps, data) = event {
259                    for req in data {
260                        // Check that the replication stream will be resumed from the snapshot point or before.
261                        if !PartialOrder::less_equal(&resume_upper, &req.snapshot_upper) {
262                            let err = DefiniteError::BinlogMissingResumePoint(
263                                format!("{:?}", resume_upper),
264                                format!("{:?}", req.snapshot_upper),
265                            );
266                            return Ok(return_definite_error(
267                                err,
268                                &output_indexes,
269                                &data_output,
270                                data_cap_set,
271                                &definite_error_handle,
272                                definite_error_cap_set,
273                            )
274                            .await);
275                        };
276                        // If the snapshot point is the same as the resume point then we don't need to rewind
277                        if resume_upper != req.snapshot_upper {
278                            rewinds.insert(req.output_index.clone(), (caps.clone(), req));
279                        }
280                    }
281                }
282            }
283            trace!(%id, "timely-{worker_id} pending rewinds {rewinds:?}");
284
285            // We don't use _conn_tunnel_handle here, but need to keep it around to ensure that the
286            // SSH tunnel is not dropped until the replication stream is dropped.
287            let (binlog_stream, _conn_tunnel_handle) =
288                match raw_stream(&config, conn, &resume_upper).await? {
289                    Ok(stream) => stream,
290                    // If the replication stream cannot be obtained in a definite way there is
291                    // nothing else to do. These errors are not retractable.
292                    Err(err) => {
293                        return Ok(return_definite_error(
294                            err,
295                            &output_indexes,
296                            &data_output,
297                            data_cap_set,
298                            &definite_error_handle,
299                            definite_error_cap_set,
300                        )
301                        .await);
302                    }
303                };
304            let mut stream = pin!(binlog_stream.peekable());
305
306            // Store all partitions from the resume_upper so we can create a frontier that comprises
307            // timestamps for partitions representing the full range of UUIDs to advance our main
308            // capabilities.
309            let mut data_partitions =
310                partitions::GtidReplicationPartitions::from(resume_upper.clone());
311            let mut progress_partitions = partitions::GtidReplicationPartitions::from(resume_upper);
312
313            let mut repl_context = context::ReplContext::new(
314                &config,
315                &connection_config,
316                stream.as_mut(),
317                &table_info,
318                &metrics,
319                &mut data_output,
320                data_cap_set,
321                rewinds,
322            );
323
324            let mut active_tx: Option<(Uuid, NonZeroU64)> = None;
325
326            let mut row_event_buffer = Vec::new();
327
328            while let Some(event) = repl_context.stream.next().await {
329                use mysql_async::binlog::events::*;
330                let event = event?;
331                let event_data = event.read_data()?;
332                metrics.total.inc();
333
334                match event_data {
335                    Some(EventData::XidEvent(_)) => {
336                        // We've received a transaction commit event, which means that we've seen
337                        // all events for the current GTID and we can advance the frontier beyond.
338                        let (source_id, tx_id) = active_tx.take().expect("unexpected xid event");
339
340                        // Increment the transaction-id to the next GTID we should see from this source-id
341                        let next_tx_id = tx_id.checked_add(1).unwrap();
342                        let next_gtid =
343                            GtidPartition::new_singleton(source_id, GtidState::Active(next_tx_id));
344
345                        if let Err(err) = data_partitions.advance_frontier(next_gtid) {
346                            return Ok(return_definite_error(
347                                err,
348                                &output_indexes,
349                                &data_output,
350                                data_cap_set,
351                                &definite_error_handle,
352                                definite_error_cap_set,
353                            )
354                            .await);
355                        }
356                        let new_upper = data_partitions.frontier();
357                        repl_context.downgrade_data_cap_set("xid_event", new_upper);
358                    }
359                    // We receive a GtidEvent that tells us the GTID of the incoming RowsEvents (and other events)
360                    Some(EventData::GtidEvent(event)) => {
361                        let source_id = Uuid::from_bytes(event.sid());
362                        let tx_id = NonZeroU64::new(event.gno()).unwrap();
363
364                        // We are potentially about to ingest a big transaction that we don't want
365                        // to store in memory. For this reason we are immediately downgrading our
366                        // progress frontier to one that includes the upcoming transaction. This
367                        // will cause a remap binding to be minted right away and so the data of
368                        // the transaction will not accumulate in the reclock operator.
369                        let next_tx_id = tx_id.checked_add(1).unwrap();
370                        let next_gtid =
371                            GtidPartition::new_singleton(source_id, GtidState::Active(next_tx_id));
372
373                        if let Err(err) = progress_partitions.advance_frontier(next_gtid) {
374                            return Ok(return_definite_error(
375                                err,
376                                &output_indexes,
377                                &data_output,
378                                data_cap_set,
379                                &definite_error_handle,
380                                definite_error_cap_set,
381                            )
382                            .await);
383                        }
384                        // Store the information of the active transaction for the subsequent events
385                        active_tx = Some((source_id, tx_id));
386                    }
387                    Some(EventData::RowsEvent(data)) => {
388                        let (source_id, tx_id) = active_tx
389                            .clone()
390                            .expect("gtid cap should be set by previous GtidEvent");
391                        let cur_gtid =
392                            GtidPartition::new_singleton(source_id, GtidState::Active(tx_id));
393
394                        events::handle_rows_event(
395                            data,
396                            &repl_context,
397                            &cur_gtid,
398                            &mut row_event_buffer,
399                        )
400                        .await?;
401
402                        // Advance the frontier up to the point right before this GTID, since we
403                        // might still see other events that are part of this same GTID, such as
404                        // row events for multiple tables or large row events split into multiple.
405                        if let Err(err) = data_partitions.advance_frontier(cur_gtid) {
406                            return Ok(return_definite_error(
407                                err,
408                                &output_indexes,
409                                &data_output,
410                                data_cap_set,
411                                &definite_error_handle,
412                                definite_error_cap_set,
413                            )
414                            .await);
415                        }
416                        let new_upper = data_partitions.frontier();
417                        repl_context.downgrade_data_cap_set("rows_event", new_upper);
418                    }
419                    Some(EventData::QueryEvent(event)) => {
420                        let (source_id, tx_id) = active_tx
421                            .clone()
422                            .expect("gtid cap should be set by previous GtidEvent");
423                        let cur_gtid =
424                            GtidPartition::new_singleton(source_id, GtidState::Active(tx_id));
425
426                        let should_advance =
427                            events::handle_query_event(event, &mut repl_context, &cur_gtid).await?;
428
429                        if should_advance {
430                            active_tx = None;
431                            // Increment the transaction-id to the next GTID we should see from this source-id
432                            let next_tx_id = tx_id.checked_add(1).unwrap();
433                            let next_gtid = GtidPartition::new_singleton(
434                                source_id,
435                                GtidState::Active(next_tx_id),
436                            );
437
438                            if let Err(err) = data_partitions.advance_frontier(next_gtid) {
439                                return Ok(return_definite_error(
440                                    err,
441                                    &output_indexes,
442                                    &data_output,
443                                    data_cap_set,
444                                    &definite_error_handle,
445                                    definite_error_cap_set,
446                                )
447                                .await);
448                            }
449                            let new_upper = data_partitions.frontier();
450                            repl_context.downgrade_data_cap_set("query_event", new_upper);
451                        }
452                    }
453                    _ => {
454                        // TODO: Handle other event types
455                        metrics.ignored.inc();
456                    }
457                }
458            }
459            // We never expect the replication stream to gracefully end
460            Err(TransientError::ReplicationEOF)
461        }))
462    });
463
464    // TODO: Split row decoding into a separate operator that can be distributed across all workers
465
466    let errors = definite_errors.concat(transient_errors.map(ReplicationError::from));
467
468    (data_stream.as_collection(), errors, button.press_on_drop())
469}
470
471/// Produces the replication stream from the MySQL server. This will return all transactions
472/// whose GTIDs were not present in the GTID UUIDs referenced in the `resume_uppper` partitions.
473async fn raw_stream(
474    config: &RawSourceCreationConfig,
475    mut conn: MySqlConn,
476    resume_upper: &Antichain<GtidPartition>,
477) -> Result<Result<(BinlogStream, Option<ManagedSshTunnelHandle>), DefiniteError>, TransientError> {
478    // Verify the MySQL system settings are correct for consistent row-based replication using GTIDs
479    match validate_mysql_repl_settings(&mut conn).await {
480        Err(err @ MySqlError::InvalidSystemSetting { .. }) => {
481            return Ok(Err(DefiniteError::ServerConfigurationError(
482                err.to_string(),
483            )));
484        }
485        Err(err) => Err(err)?,
486        Ok(()) => (),
487    };
488
489    // To start the stream we need to provide a GTID set of the transactions that we've 'seen'
490    // and the server will send us all transactions that have been committed after that point.
491    // NOTE: The 'Gno' intervals in this transaction-set use an open set [start, end)
492    // interval, which is different than the closed-set [start, end] form returned by the
493    // @gtid_executed system variable. So the intervals we construct in this GTID set
494    // end with the value of the transaction-id that we want to start replication at,
495    // which happens to be the same as the definition of a frontier value.
496    // https://dev.mysql.com/doc/refman/8.0/en/replication-options-gtids.html#sysvar_gtid_executed
497    // https://dev.mysql.com/doc/dev/mysql-server/latest/classGtid__set.html#ab46da5ceeae0198b90f209b0a8be2a24
498    let seen_gtids = resume_upper
499        .iter()
500        .flat_map(|partition| match partition.timestamp() {
501            GtidState::Absent => None,
502            GtidState::Active(frontier_time) => {
503                let part_uuid = partition
504                    .interval()
505                    .singleton()
506                    .expect("Non-absent paritions will be singletons");
507                // NOTE: Since we enforce replica_preserve_commit_order=ON we can start the interval at 1
508                // since we know that all transactions with a lower transaction id were monotonic
509                Some(
510                    Sid::new(*part_uuid.as_bytes())
511                        .with_interval(GnoInterval::new(1, frontier_time.get())),
512                )
513            }
514        })
515        .collect::<Vec<_>>();
516
517    // Request that the stream provide us with a heartbeat message when no other messages have
518    // been sent. This isn't strictly necessary, but is a lightweight additional general
519    // health-check for the replication stream
520    conn.query_drop(format!(
521        "SET @master_heartbeat_period = {};",
522        mz_storage_types::dyncfgs::MYSQL_REPLICATION_HEARTBEAT_INTERVAL
523            .get(config.config.config_set())
524            .as_nanos()
525    ))
526    .await?;
527
528    // Generate a deterministic server-id for identifying us as a replica on the upstream mysql server.
529    // The value does not actually matter since it's irrelevant for GTID-based replication and won't
530    // cause errors if it happens to be the same as another replica in the mysql cluster (based on testing),
531    // but by setting it to a constant value we can make it easier for users to identify Materialize connections
532    let server_id = match config.id {
533        GlobalId::System(id) => id,
534        GlobalId::User(id) => id,
535        GlobalId::Transient(id) => id,
536        _ => unreachable!(),
537    };
538    let server_id = match u32::try_from(server_id) {
539        Ok(id) if id + REPLICATION_SERVER_ID_OFFSET < u32::MAX => id + REPLICATION_SERVER_ID_OFFSET,
540        _ => REPLICATION_SERVER_ID_OFFSET,
541    };
542
543    trace!(
544        "requesting replication stream with seen_gtids: {seen_gtids:?} \
545         and server_id: {server_id:?}"
546    );
547
548    // We need to transform the connection into a BinlogStream (which takes the `Conn` by value),
549    // but to avoid dropping any active SSH tunnel used by the connection we need to preserve the
550    // tunnel handle and return it
551    let (inner_conn, conn_tunnel_handle) = conn.take();
552
553    let repl_stream = match inner_conn
554        .get_binlog_stream(
555            BinlogStreamRequest::new(server_id)
556                .with_gtid()
557                .with_gtid_set(seen_gtids),
558        )
559        .await
560    {
561        Ok(stream) => stream,
562        Err(mysql_async::Error::Server(ref server_err))
563            if server_err.code == ER_SOURCE_FATAL_ERROR_READING_BINLOG_CODE =>
564        {
565            // The GTID set we requested is no longer available
566            return Ok(Err(DefiniteError::BinlogNotAvailable));
567        }
568        // TODO: handle other error types. Some may require a re-snapshot and some may be transient
569        Err(err) => return Err(err.into()),
570    };
571
572    Ok(Ok((repl_stream, conn_tunnel_handle)))
573}