mz_storage/source/mysql/replication.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Renders the replication side of the [`MySqlSourceConnection`] ingestion dataflow.
//!
//! # Progress tracking using Partitioned Timestamps
//!
//! This dataflow uses a Partitioned Timestamp implementation to represent the GTID Set that
//! comprises the full set of committed transactions from the MySQL Server. The frontier
//! representing progress for this dataflow represents the full range of possible UUIDs +
//! Transaction IDs of future GTIDs that could be added to the GTID Set.
//!
//! See the [`mz_storage_types::sources::mysql::GtidPartition`] type for more information.
//!
//! To maintain a complete frontier of the full UUID GTID range, we use a
//! [`partitions::GtidReplicationPartitions`] struct to store the GTID Set as a set of partitions.
//! This allows us to easily advance the frontier each time we see a new GTID on the replication
//! stream.
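//!
//! As an illustrative example (not tied to any particular type layout): if the committed GTID
//! set so far is `3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5`, the frontier contains a singleton
//! partition for that source UUID at `GtidState::Active(6)` (the next transaction-id we expect),
//! plus range partitions at `GtidState::Absent` covering every UUID that has not yet appeared.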
//!
//! # Resumption
//!
//! When the dataflow is resumed, the MySQL replication stream is started from the minimum GTID
//! frontier across all source outputs. This is compared against the GTID set that may still be
//! obtained from the MySQL server, using the @@GTID_PURGED value in MySQL to determine which
//! GTIDs are no longer available in the binlog, and the source is put into an error state if we
//! cannot resume from that frontier.
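//!
//! For example (purely illustrative): if @@GTID_PURGED reports `<uuid>:1-10`, the earliest
//! resumable frontier for that UUID is transaction-id 11; attempting to resume at transaction-id
//! 5 would surface a definite error rather than silently skipping the purged range.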
//!
//! # Rewinds
//!
//! The replication stream may be resumed from a point before the snapshot for a specific output
//! occurs. To avoid double-counting updates that were present in the snapshot, we store a map
//! of pending rewinds that we've received from the snapshot operator, and when we see updates
//! for an output that were present in the snapshot, we negate the snapshot update
//! (at the minimum timestamp) and send it again at the correct GTID.
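//!
//! A rough sketch of the bookkeeping (the exact types differ): if the snapshot emitted
//! `(row, T_min, +1)` and the replication stream then replays the same insert committed at GTID
//! `g` at or before the snapshot frontier, we emit both `(row, T_min, -1)` and `(row, g, +1)`,
//! so after consolidation the row is counted exactly once, at its true commit time.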

use std::collections::BTreeMap;
use std::convert::Infallible;
use std::num::NonZeroU64;
use std::pin::pin;
use std::sync::Arc;

use differential_dataflow::AsCollection;
use futures::StreamExt;
use itertools::Itertools;
use mysql_async::prelude::Queryable;
use mysql_async::{BinlogStream, BinlogStreamRequest, GnoInterval, Sid};
use mz_ore::future::InTask;
use mz_ssh_util::tunnel_manager::ManagedSshTunnelHandle;
use mz_timely_util::containers::stack::AccountedStackBuilder;
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Exchange;
use timely::dataflow::operators::Concat;
use timely::dataflow::operators::core::Map;
use timely::dataflow::{Scope, Stream};
use timely::progress::{Antichain, Timestamp};
use tracing::trace;
use uuid::Uuid;

use mz_mysql_util::{
    ER_SOURCE_FATAL_ERROR_READING_BINLOG_CODE, MySqlConn, MySqlError, query_sys_var,
};
use mz_ore::cast::CastFrom;
use mz_repr::GlobalId;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::MySqlSourceConnection;
use mz_storage_types::sources::mysql::{GtidPartition, GtidState, gtid_set_frontier};
use mz_timely_util::builder_async::{
    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};

use crate::metrics::source::mysql::MySqlSourceMetrics;
use crate::source::RawSourceCreationConfig;
use crate::source::types::{SignaledFuture, SourceMessage, StackedCollection};

use super::{
    DefiniteError, ReplicationError, RewindRequest, SourceOutputInfo, TransientError,
    return_definite_error, validate_mysql_repl_settings,
};

mod context;
mod events;
mod partitions;

/// Used as a partition id to determine if the worker is
/// responsible for reading from the MySQL replication stream
static REPL_READER: &str = "reader";

/// A constant arbitrary offset to add to the source-id to
/// produce a deterministic server-id for identifying Materialize
/// as a replica on the upstream MySQL server.
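/// For example (illustrative), a source whose raw numeric id is 42 is announced to the upstream
/// server with server-id 524042; see the derivation in `raw_stream` below.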
/// TODO(roshan): Add user-facing documentation for this
static REPLICATION_SERVER_ID_OFFSET: u32 = 524000;

/// Renders the replication dataflow. See the module documentation for more
/// information.
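///
/// Returns, in order: the collection of replicated updates for each output, a progress-only
/// stream that carries no data (only frontier advancement), a stream of replication errors,
/// and a shutdown button for the operator.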
pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
    scope: G,
    config: RawSourceCreationConfig,
    connection: MySqlSourceConnection,
    source_outputs: Vec<SourceOutputInfo>,
    rewind_stream: &Stream<G, RewindRequest>,
    metrics: MySqlSourceMetrics,
) -> (
    StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
    Stream<G, Infallible>,
    Stream<G, ReplicationError>,
    PressOnDropButton,
) {
    let op_name = format!("MySqlReplicationReader({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(op_name, scope);

    let repl_reader_id = u64::cast_from(config.responsible_worker(REPL_READER));
    let (mut data_output, data_stream) = builder.new_output::<AccountedStackBuilder<_>>();
    let (_upper_output, upper_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    // Captures DefiniteErrors that affect the entire source, including all outputs
    let (definite_error_handle, definite_errors) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    let mut rewind_input = builder.new_input_for(
        rewind_stream,
        Exchange::new(move |_| repl_reader_id),
        &data_output,
    );

    let output_indexes = source_outputs
        .iter()
        .map(|output| output.output_index)
        .collect_vec();

    metrics.tables.set(u64::cast_from(source_outputs.len()));

    let (button, transient_errors) = builder.build_fallible(move |caps| {
        let busy_signal = Arc::clone(&config.busy_signal);
        Box::pin(SignaledFuture::new(busy_signal, async move {
            let (id, worker_id) = (config.id, config.worker_id);
            let [data_cap_set, upper_cap_set, definite_error_cap_set]: &mut [_; 3] =
                caps.try_into().unwrap();

            // Only run the replication reader on the worker responsible for it.
            if !config.responsible_for(REPL_READER) {
                return Ok(());
            }

            let connection_config = connection
                .connection
                .config(
                    &config.config.connection_context.secrets_reader,
                    &config.config,
                    InTask::Yes,
                )
                .await?;

            let mut conn = connection_config
                .connect(
                    &format!("timely-{worker_id} MySQL replication reader"),
                    &config.config.connection_context.ssh_tunnel_manager,
                )
                .await?;

            // Get the set of GTIDs that have been purged from the binlogs. The assumption is that
            // this represents the frontier of GTIDs still available in the binlog, i.e. the
            // earliest point that we can start replicating from.
            let binlog_purged_set = query_sys_var(&mut conn, "global.gtid_purged").await?;
            let binlog_frontier = match gtid_set_frontier(&binlog_purged_set) {
                Ok(frontier) => frontier,
                Err(err) => {
                    let err = DefiniteError::UnsupportedGtidState(err.to_string());
                    return Ok(
                        // If GTID intervals in the binlog are not available in a monotonic consecutive
                        // order this breaks all of our assumptions and there is nothing else we can do.
                        // This can occur if the mysql server is restored to a previous point-in-time
                        // or if a user manually adds transactions to the @@gtid_purged system var.
                        return_definite_error(
                            err,
                            &output_indexes,
                            &data_output,
                            data_cap_set,
                            &definite_error_handle,
                            definite_error_cap_set,
                        )
                        .await,
                    );
                }
            };

            trace!(%id, "timely-{worker_id} replication binlog frontier: {binlog_frontier:?}");

            // upstream-table-name: Vec<SourceOutputInfo> since multiple
            // outputs can refer to the same table
            let mut table_info = BTreeMap::new();
            let mut output_uppers = Vec::new();

            // Calculate the lowest frontier across all outputs, which represents the point from
            // which we should start replication.
            let min_frontier = Antichain::from_elem(GtidPartition::minimum());
            for output in source_outputs.into_iter() {
                // If an output is resuming at the minimum frontier then its snapshot
                // has not yet been committed.
                // We need to resume from a frontier before the output's snapshot frontier
                // to ensure we don't miss updates that happen after the snapshot was taken.
                //
                // This also ensures that tables created as part of the same CREATE SOURCE
                // statement are 'effectively' snapshot at the same GTID Set, even if their
                // actual snapshot frontiers are different due to a restart.
                //
                // We've chosen the frontier beyond the GTID Set recorded
                // during purification as this resume point.
                if &output.resume_upper == &min_frontier {
                    output_uppers.push(output.initial_gtid_set.clone());
                } else {
                    output_uppers.push(output.resume_upper.clone());
                }

                table_info
                    .entry(output.table_name.clone())
                    .or_insert_with(Vec::new)
                    .push(output);
            }
            let resume_upper = match output_uppers.len() {
                0 => {
                    // If there are no outputs to replicate then we will just be updating the
                    // source progress collection. In this case we can just start from the head of
                    // the binlog to avoid wasting time on old events.
                    trace!(%id, "timely-{worker_id} replication reader found no outputs \
                                 to replicate, using latest gtid_executed as resume_upper");
                    let executed_gtid_set =
                        query_sys_var(&mut conn, "global.gtid_executed").await?;

                    gtid_set_frontier(&executed_gtid_set)?
                }
                _ => Antichain::from_iter(output_uppers.into_iter().flatten()),
            };

            // Validate that we can actually resume from this upper.
            if !PartialOrder::less_equal(&binlog_frontier, &resume_upper) {
                let err = DefiniteError::BinlogMissingResumePoint(
                    format!("{:?}", binlog_frontier),
                    format!("{:?}", resume_upper),
                );
                return Ok(return_definite_error(
                    err,
                    &output_indexes,
                    &data_output,
                    data_cap_set,
                    &definite_error_handle,
                    definite_error_cap_set,
                )
                .await);
            };

            data_cap_set.downgrade(&*resume_upper);
            upper_cap_set.downgrade(&*resume_upper);
            trace!(%id, "timely-{worker_id} replication reader started at {:?}", resume_upper);

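            // Rewind requests received from the snapshot operator, keyed by output index. Each
            // entry keeps the capabilities and snapshot frontier needed to retract snapshot
            // updates at the minimum timestamp when the corresponding GTIDs are replayed below.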
            let mut rewinds = BTreeMap::new();
            while let Some(event) = rewind_input.next().await {
                if let AsyncEvent::Data(caps, data) = event {
                    for req in data {
                        // Check that the replication stream will be resumed from the snapshot point or before.
                        if !PartialOrder::less_equal(&resume_upper, &req.snapshot_upper) {
                            let err = DefiniteError::BinlogMissingResumePoint(
                                format!("{:?}", resume_upper),
                                format!("{:?}", req.snapshot_upper),
                            );
                            return Ok(return_definite_error(
                                err,
                                &output_indexes,
                                &data_output,
                                data_cap_set,
                                &definite_error_handle,
                                definite_error_cap_set,
                            )
                            .await);
                        };
                        // If the snapshot point is the same as the resume point then we don't need to rewind
                        if resume_upper != req.snapshot_upper {
                            rewinds.insert(req.output_index.clone(), (caps.clone(), req));
                        }
                    }
                }
            }
            trace!(%id, "timely-{worker_id} pending rewinds {rewinds:?}");

            // We don't use _conn_tunnel_handle here, but need to keep it around to ensure that the
            // SSH tunnel is not dropped until the replication stream is dropped.
            let (binlog_stream, _conn_tunnel_handle) =
                match raw_stream(&config, conn, &resume_upper).await? {
                    Ok(stream) => stream,
                    // If the replication stream cannot be obtained in a definite way there is
                    // nothing else to do. These errors are not retractable.
                    Err(err) => {
                        return Ok(return_definite_error(
                            err,
                            &output_indexes,
                            &data_output,
                            data_cap_set,
                            &definite_error_handle,
                            definite_error_cap_set,
                        )
                        .await);
                    }
                };
            let mut stream = pin!(binlog_stream.peekable());

            // Store all partitions from the resume_upper so that we can maintain a frontier whose
            // partitions cover the full range of UUIDs, which we use to advance our main
            // capabilities.
            let mut data_partitions =
                partitions::GtidReplicationPartitions::from(resume_upper.clone());
            let mut progress_partitions = partitions::GtidReplicationPartitions::from(resume_upper);

            let mut repl_context = context::ReplContext::new(
                &config,
                &connection_config,
                stream.as_mut(),
                &table_info,
                &metrics,
                &mut data_output,
                data_cap_set,
                upper_cap_set,
                rewinds,
            );

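            // The (source UUID, transaction-id) of the GTID currently being processed: set by a
            // GtidEvent, consumed by the row/query events that follow it, and cleared when the
            // transaction completes.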
            let mut active_tx: Option<(Uuid, NonZeroU64)> = None;

            let mut row_event_buffer = Vec::new();

            while let Some(event) = repl_context.stream.next().await {
                use mysql_async::binlog::events::*;
                let event = event?;
                let event_data = event.read_data()?;
                metrics.total.inc();

                match event_data {
                    Some(EventData::XidEvent(_)) => {
                        // We've received a transaction commit event, which means that we've seen
                        // all events for the current GTID and we can advance the frontier beyond.
                        let (source_id, tx_id) = active_tx.take().expect("unexpected xid event");

                        // Increment the transaction-id to the next GTID we should see from this source-id
                        let next_tx_id = tx_id.checked_add(1).unwrap();
                        let next_gtid =
                            GtidPartition::new_singleton(source_id, GtidState::Active(next_tx_id));

                        if let Err(err) = data_partitions.advance_frontier(next_gtid) {
                            return Ok(return_definite_error(
                                err,
                                &output_indexes,
                                &data_output,
                                data_cap_set,
                                &definite_error_handle,
                                definite_error_cap_set,
                            )
                            .await);
                        }
                        let new_upper = data_partitions.frontier();
                        repl_context.downgrade_data_cap_set("xid_event", new_upper);
                    }
                    // We receive a GtidEvent that tells us the GTID of the incoming RowsEvents (and other events)
                    Some(EventData::GtidEvent(event)) => {
                        let source_id = Uuid::from_bytes(event.sid());
                        let tx_id = NonZeroU64::new(event.gno()).unwrap();

                        // We are potentially about to ingest a big transaction that we don't want
                        // to store in memory. For this reason we are immediately downgrading our
                        // progress frontier to one that includes the upcoming transaction. This
                        // will cause a remap binding to be minted right away and so the data of
                        // the transaction will not accumulate in the reclock operator.
                        let next_tx_id = tx_id.checked_add(1).unwrap();
                        let next_gtid =
                            GtidPartition::new_singleton(source_id, GtidState::Active(next_tx_id));

                        if let Err(err) = progress_partitions.advance_frontier(next_gtid) {
                            return Ok(return_definite_error(
                                err,
                                &output_indexes,
                                &data_output,
                                data_cap_set,
                                &definite_error_handle,
                                definite_error_cap_set,
                            )
                            .await);
                        }
                        let new_upper = progress_partitions.frontier();
                        repl_context.downgrade_progress_cap_set("gtid_event", new_upper);

                        // Store the information of the active transaction for the subsequent events
                        active_tx = Some((source_id, tx_id));
                    }
                    Some(EventData::RowsEvent(data)) => {
                        let (source_id, tx_id) = active_tx
                            .clone()
                            .expect("gtid cap should be set by previous GtidEvent");
                        let cur_gtid =
                            GtidPartition::new_singleton(source_id, GtidState::Active(tx_id));

                        events::handle_rows_event(
                            data,
                            &repl_context,
                            &cur_gtid,
                            &mut row_event_buffer,
                        )
                        .await?;

                        // Advance the frontier up to the point right before this GTID, since we
                        // might still see other events that are part of this same GTID, such as
                        // row events for multiple tables or large row events split into multiple parts.
                        if let Err(err) = data_partitions.advance_frontier(cur_gtid) {
                            return Ok(return_definite_error(
                                err,
                                &output_indexes,
                                &data_output,
                                data_cap_set,
                                &definite_error_handle,
                                definite_error_cap_set,
                            )
                            .await);
                        }
                        let new_upper = data_partitions.frontier();
                        repl_context.downgrade_data_cap_set("rows_event", new_upper);
                    }
                    Some(EventData::QueryEvent(event)) => {
                        let (source_id, tx_id) = active_tx
                            .clone()
                            .expect("gtid cap should be set by previous GtidEvent");
                        let cur_gtid =
                            GtidPartition::new_singleton(source_id, GtidState::Active(tx_id));

                        let should_advance =
                            events::handle_query_event(event, &mut repl_context, &cur_gtid).await?;

                        if should_advance {
                            active_tx = None;
                            // Increment the transaction-id to the next GTID we should see from this source-id
                            let next_tx_id = tx_id.checked_add(1).unwrap();
                            let next_gtid = GtidPartition::new_singleton(
                                source_id,
                                GtidState::Active(next_tx_id),
                            );

                            if let Err(err) = data_partitions.advance_frontier(next_gtid) {
                                return Ok(return_definite_error(
                                    err,
                                    &output_indexes,
                                    &data_output,
                                    data_cap_set,
                                    &definite_error_handle,
                                    definite_error_cap_set,
                                )
                                .await);
                            }
                            let new_upper = data_partitions.frontier();
                            repl_context.downgrade_data_cap_set("query_event", new_upper);
                        }
                    }
                    _ => {
                        // TODO: Handle other event types
                        metrics.ignored.inc();
                    }
                }
            }
            // We never expect the replication stream to gracefully end
            Err(TransientError::ReplicationEOF)
        }))
    });

    // TODO: Split row decoding into a separate operator that can be distributed across all workers

    let errors = definite_errors.concat(&transient_errors.map(ReplicationError::from));

    (
        data_stream.as_collection(),
        upper_stream,
        errors,
        button.press_on_drop(),
    )
}

/// Produces the replication stream from the MySQL server. This will return all transactions
/// whose GTIDs are not already included in the GTID set described by the `resume_upper` partitions.
async fn raw_stream(
    config: &RawSourceCreationConfig,
    mut conn: MySqlConn,
    resume_upper: &Antichain<GtidPartition>,
) -> Result<Result<(BinlogStream, Option<ManagedSshTunnelHandle>), DefiniteError>, TransientError> {
    // Verify the MySQL system settings are correct for consistent row-based replication using GTIDs
    match validate_mysql_repl_settings(&mut conn).await {
        Err(err @ MySqlError::InvalidSystemSetting { .. }) => {
            return Ok(Err(DefiniteError::ServerConfigurationError(
                err.to_string(),
            )));
        }
        Err(err) => Err(err)?,
        Ok(()) => (),
    };

    // To start the stream we need to provide a GTID set of the transactions that we've 'seen'
    // and the server will send us all transactions that have been committed after that point.
    // NOTE: The 'Gno' intervals in this transaction-set use a half-open [start, end)
    // interval, which is different from the closed [start, end] form returned by the
    // @gtid_executed system variable. So the intervals we construct in this GTID set
    // end with the value of the transaction-id that we want to start replication at,
    // which happens to be the same as the definition of a frontier value.
    // https://dev.mysql.com/doc/refman/8.0/en/replication-options-gtids.html#sysvar_gtid_executed
    // https://dev.mysql.com/doc/dev/mysql-server/latest/classGtid__set.html#ab46da5ceeae0198b90f209b0a8be2a24
    let seen_gtids = resume_upper
        .iter()
        .flat_map(|partition| match partition.timestamp() {
            GtidState::Absent => None,
            GtidState::Active(frontier_time) => {
                let part_uuid = partition
                    .interval()
                    .singleton()
                    .expect("Non-absent partitions will be singletons");
                // NOTE: Since we enforce replica_preserve_commit_order=ON we can start the interval at 1
                // since we know that all transactions with a lower transaction id were monotonic
                Some(
                    Sid::new(*part_uuid.as_bytes())
                        .with_interval(GnoInterval::new(1, frontier_time.get())),
                )
            }
        })
        .collect::<Vec<_>>();
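    // Illustrative example: a resume frontier entry of `GtidState::Active(6)` for some source
    // UUID yields the interval [1, 6), i.e. we claim to have already seen transactions 1 through
    // 5 from that source, so the server streams transaction 6 and onwards.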

    // Request that the stream provide us with a heartbeat message when no other messages have
    // been sent. This isn't strictly necessary, but it is a lightweight, general health-check
    // for the replication stream.
    conn.query_drop(format!(
        "SET @master_heartbeat_period = {};",
        mz_storage_types::dyncfgs::MYSQL_REPLICATION_HEARTBEAT_INTERVAL
            .get(config.config.config_set())
            .as_nanos()
    ))
    .await?;

    // Generate a deterministic server-id for identifying us as a replica on the upstream mysql server.
    // The exact value does not matter since it's irrelevant for GTID-based replication and won't
    // cause errors if it happens to collide with another replica in the mysql cluster (based on testing),
    // but deriving it deterministically from the source-id makes it easier for users to identify
    // Materialize connections.
    let server_id = match config.id {
        GlobalId::System(id) => id,
        GlobalId::User(id) => id,
        GlobalId::Transient(id) => id,
        _ => unreachable!(),
    };
    let server_id = match u32::try_from(server_id) {
        Ok(id) if id + REPLICATION_SERVER_ID_OFFSET < u32::MAX => id + REPLICATION_SERVER_ID_OFFSET,
        _ => REPLICATION_SERVER_ID_OFFSET,
    };

    trace!(
        "requesting replication stream with seen_gtids: {seen_gtids:?} \
         and server_id: {server_id:?}"
    );

    // We need to transform the connection into a BinlogStream (which takes the `Conn` by value),
    // but to avoid dropping any active SSH tunnel used by the connection we need to preserve the
    // tunnel handle and return it
    let (inner_conn, conn_tunnel_handle) = conn.take();

    let repl_stream = match inner_conn
        .get_binlog_stream(
            BinlogStreamRequest::new(server_id)
                .with_gtid()
                .with_gtid_set(seen_gtids),
        )
        .await
    {
        Ok(stream) => stream,
        Err(mysql_async::Error::Server(ref server_err))
            if server_err.code == ER_SOURCE_FATAL_ERROR_READING_BINLOG_CODE =>
        {
            // The GTID set we requested is no longer available
            return Ok(Err(DefiniteError::BinlogNotAvailable));
        }
        // TODO: handle other error types. Some may require a re-snapshot and some may be transient
        Err(err) => return Err(err.into()),
    };

    Ok(Ok((repl_stream, conn_tunnel_handle)))
}