mz_storage/source/mysql/replication.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Renders the replication side of the [`MySqlSourceConnection`] ingestion dataflow.
//!
//! # Progress tracking using Partitioned Timestamps
//!
//! This dataflow uses a Partitioned Timestamp implementation to represent the GTID Set that
//! comprises the full set of committed transactions from the MySQL Server. The frontier
//! representing progress for this dataflow represents the full range of possible UUIDs +
//! Transaction IDs of future GTIDs that could be added to the GTID Set.
//!
//! See the [`mz_storage_types::sources::mysql::GtidPartition`] type for more information.
//!
//! To maintain a complete frontier of the full UUID GTID range, we use a
//! [`partitions::GtidReplicationPartitions`] struct to store the GTID Set as a set of partitions.
//! This allows us to easily advance the frontier each time we see a new GTID on the replication
//! stream.
//!
//! # Resumption
//!
//! When the dataflow is resumed, the MySQL replication stream is started from the GTID frontier
//! of the minimum frontier across all source outputs. This is compared against the GTID set that
//! may still be obtained from the MySQL server, using the @@GTID_PURGED value in MySQL to
//! determine GTIDs that are no longer available in the binlog and to put the source in an error
//! state if we cannot resume from the GTID frontier.
//!
//! # Rewinds
//!
//! The replication stream may be resumed from a point before the snapshot for a specific output
//! occurs. To avoid double-counting updates that were present in the snapshot, we store a map
//! of pending rewinds that we've received from the snapshot operator, and when we see updates
//! for an output that were present in the snapshot, we negate the snapshot update
//! (at the minimum timestamp) and send it again at the correct GTID.

use std::collections::BTreeMap;
use std::num::NonZeroU64;
use std::pin::pin;
use std::sync::Arc;

use differential_dataflow::AsCollection;
use futures::StreamExt;
use itertools::Itertools;
use mysql_async::prelude::Queryable;
use mysql_async::{BinlogStream, BinlogStreamRequest, GnoInterval, Sid};
use mz_ore::future::InTask;
use mz_ssh_util::tunnel_manager::ManagedSshTunnelHandle;
use mz_timely_util::containers::stack::AccountedStackBuilder;
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Exchange;
use timely::dataflow::operators::Concat;
use timely::dataflow::operators::core::Map;
use timely::dataflow::{Scope, StreamVec};
use timely::progress::{Antichain, Timestamp};
use tracing::trace;
use uuid::Uuid;

use mz_mysql_util::{
    ER_SOURCE_FATAL_ERROR_READING_BINLOG_CODE, MySqlConn, MySqlError, query_sys_var,
};
use mz_ore::cast::CastFrom;
use mz_repr::GlobalId;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::MySqlSourceConnection;
use mz_storage_types::sources::mysql::{GtidPartition, GtidState, gtid_set_frontier};
use mz_timely_util::builder_async::{
    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};

use crate::metrics::source::mysql::MySqlSourceMetrics;
use crate::source::RawSourceCreationConfig;
use crate::source::types::{SignaledFuture, SourceMessage, StackedCollection};

use super::{
    DefiniteError, ReplicationError, RewindRequest, SourceOutputInfo, TransientError,
    return_definite_error, validate_mysql_repl_settings,
};

mod context;
mod events;
mod partitions;
89
/// Used as a partition id to determine if the worker is
/// responsible for reading from the MySQL replication stream.
static REPL_READER: &str = "reader";
93
/// A constant arbitrary offset to add to the source-id to
/// produce a deterministic server-id for identifying Materialize
/// as a replica on the upstream MySQL server.
/// TODO(roshan): Add user-facing documentation for this
static REPLICATION_SERVER_ID_OFFSET: u32 = 524000;
99
100/// Renders the replication dataflow. See the module documentation for more
101/// information.
102pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
103 scope: G,
104 config: RawSourceCreationConfig,
105 connection: MySqlSourceConnection,
106 source_outputs: Vec<SourceOutputInfo>,
107 rewind_stream: StreamVec<G, RewindRequest>,
108 metrics: MySqlSourceMetrics,
109) -> (
110 StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
111 StreamVec<G, ReplicationError>,
112 PressOnDropButton,
113) {
114 let op_name = format!("MySqlReplicationReader({})", config.id);
115 let mut builder = AsyncOperatorBuilder::new(op_name, scope);
116
117 let repl_reader_id = u64::cast_from(config.responsible_worker(REPL_READER));
118 let (mut data_output, data_stream) = builder.new_output::<AccountedStackBuilder<_>>();
119 // Captures DefiniteErrors that affect the entire source, including all outputs
120 let (definite_error_handle, definite_errors) =
121 builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
122 let mut rewind_input = builder.new_input_for(
123 rewind_stream,
124 Exchange::new(move |_| repl_reader_id),
125 &data_output,
126 );
127
128 let output_indexes = source_outputs
129 .iter()
130 .map(|output| output.output_index)
131 .collect_vec();
132
133 metrics.tables.set(u64::cast_from(source_outputs.len()));
134
135 let (button, transient_errors) = builder.build_fallible(move |caps| {
136 let busy_signal = Arc::clone(&config.busy_signal);
137 Box::pin(SignaledFuture::new(busy_signal, async move {
138 let (id, worker_id) = (config.id, config.worker_id);
139 let [data_cap_set, definite_error_cap_set]: &mut [_; 2] = caps.try_into().unwrap();
140
141 // Only run the replication reader on the worker responsible for it.
142 if !config.responsible_for(REPL_READER) {
143 return Ok(());
144 }
145
146 let connection_config = connection
147 .connection
148 .config(
149 &config.config.connection_context.secrets_reader,
150 &config.config,
151 InTask::Yes,
152 )
153 .await?;
154
155 let mut conn = connection_config
156 .connect(
157 &format!("timely-{worker_id} MySQL replication reader"),
158 &config.config.connection_context.ssh_tunnel_manager,
159 )
160 .await?;
161
162 // Get the set of GTIDs that have been purged from the binlogs. The assumption is that this
163 // represents the frontier of possible GTIDs that exist in the binlog, that we can start
164 // replicating from.
165 let binlog_purged_set = query_sys_var(&mut conn, "global.gtid_purged").await?;
166 let binlog_frontier = match gtid_set_frontier(&binlog_purged_set) {
167 Ok(frontier) => frontier,
168 Err(err) => {
169 let err = DefiniteError::UnsupportedGtidState(err.to_string());
170 tracing::warn!(%id, "Unable to determine GTID frontier from @@gtid_purged: {err}");
171 return Ok(
172 // If GTID intervals in the binlog are not available in a monotonic consecutive
173 // order this breaks all of our assumptions and there is nothing else we can do.
174 // This can occur if the mysql server is restored to a previous point-in-time
175 // or if a user manually adds transactions to the @@gtid_purged system var.
176 return_definite_error(
177 err,
178 &output_indexes,
179 &data_output,
180 data_cap_set,
181 &definite_error_handle,
182 definite_error_cap_set,
183 )
184 .await,
185 );
186 }
187 };
188
189 trace!(%id, "timely-{worker_id} replication binlog frontier: {binlog_frontier:?}");
190
191 // upstream-table-name: Vec<SourceOutputInfo> since multiple
192 // outputs can refer to the same table
193 let mut table_info = BTreeMap::new();
194 let mut output_uppers = Vec::new();
195
196 // Calculate the lowest frontier across all outputs, which represents the point which
197 // we should start replication from.
198 let min_frontier = Antichain::from_elem(GtidPartition::minimum());
199 for output in source_outputs.into_iter() {
200 // If an output is resuming at the minimum frontier then its snapshot
201 // has not yet been committed.
202 // We need to resume from a frontier before the output's snapshot frontier
203 // to ensure we don't miss updates that happen after the snapshot was taken.
204 //
205 // This also ensures that tables created as part of the same CREATE SOURCE
206 // statement are 'effectively' snapshot at the same GTID Set, even if their
207 // actual snapshot frontiers are different due to a restart.
208 //
209 // We've chosen the frontier beyond the GTID Set recorded
210 // during purification as this resume point.
211 if &output.resume_upper == &min_frontier {
212 output_uppers.push(output.initial_gtid_set.clone());
213 } else {
214 output_uppers.push(output.resume_upper.clone());
215 }
216
217 table_info
218 .entry(output.table_name.clone())
219 .or_insert_with(Vec::new)
220 .push(output);
221 }
222 let resume_upper = match output_uppers.len() {
223 0 => {
224 // If there are no outputs to replicate then we will just be updating the
225 // source progress collection. In this case we can just start from the head of
226 // the binlog to avoid wasting time on old events.
227 trace!(%id, "timely-{worker_id} replication reader found no outputs \
228 to replicate, using latest gtid_executed as resume_upper");
229 let executed_gtid_set =
230 query_sys_var(&mut conn, "global.gtid_executed").await?;
231
232 gtid_set_frontier(&executed_gtid_set)?
233 }
234 _ => Antichain::from_iter(output_uppers.into_iter().flatten()),
235 };
236
237 // Validate that we can actually resume from this upper.
238 if !PartialOrder::less_equal(&binlog_frontier, &resume_upper) {
239 let err = DefiniteError::BinlogMissingResumePoint(
240 format!("{:?}", binlog_frontier),
241 format!("{:?}", resume_upper),
242 );
243 return Ok(return_definite_error(
244 err,
245 &output_indexes,
246 &data_output,
247 data_cap_set,
248 &definite_error_handle,
249 definite_error_cap_set,
250 )
251 .await);
252 };
253
254 data_cap_set.downgrade(&*resume_upper);
255 trace!(%id, "timely-{worker_id} replication reader started at {:?}", resume_upper);
256
257 let mut rewinds = BTreeMap::new();
258 while let Some(event) = rewind_input.next().await {
259 if let AsyncEvent::Data(caps, data) = event {
260 for req in data {
261 // Check that the replication stream will be resumed from the snapshot point or before.
262 if !PartialOrder::less_equal(&resume_upper, &req.snapshot_upper) {
263 let err = DefiniteError::BinlogMissingResumePoint(
264 format!("{:?}", resume_upper),
265 format!("{:?}", req.snapshot_upper),
266 );
267 return Ok(return_definite_error(
268 err,
269 &output_indexes,
270 &data_output,
271 data_cap_set,
272 &definite_error_handle,
273 definite_error_cap_set,
274 )
275 .await);
276 };
277 // If the snapshot point is the same as the resume point then we don't need to rewind
278 if resume_upper != req.snapshot_upper {
279 rewinds.insert(req.output_index.clone(), (caps.clone(), req));
280 }
281 }
282 }
283 }
284 trace!(%id, "timely-{worker_id} pending rewinds {rewinds:?}");
285
286 // We don't use _conn_tunnel_handle here, but need to keep it around to ensure that the
287 // SSH tunnel is not dropped until the replication stream is dropped.
288 let (binlog_stream, _conn_tunnel_handle) =
289 match raw_stream(&config, conn, &resume_upper).await? {
290 Ok(stream) => stream,
291 // If the replication stream cannot be obtained in a definite way there is
292 // nothing else to do. These errors are not retractable.
293 Err(err) => {
294 return Ok(return_definite_error(
295 err,
296 &output_indexes,
297 &data_output,
298 data_cap_set,
299 &definite_error_handle,
300 definite_error_cap_set,
301 )
302 .await);
303 }
304 };
305 let mut stream = pin!(binlog_stream.peekable());
306
307 // Store all partitions from the resume_upper so we can create a frontier that comprises
308 // timestamps for partitions representing the full range of UUIDs to advance our main
309 // capabilities.
310 let mut data_partitions =
311 partitions::GtidReplicationPartitions::from(resume_upper.clone());
312 let mut progress_partitions = partitions::GtidReplicationPartitions::from(resume_upper);
313
314 let mut repl_context = context::ReplContext::new(
315 &config,
316 &connection_config,
317 stream.as_mut(),
318 &table_info,
319 &metrics,
320 &mut data_output,
321 data_cap_set,
322 rewinds,
323 );
324
325 let mut active_tx: Option<(Uuid, NonZeroU64)> = None;
326
327 let mut row_event_buffer = Vec::new();
328
329 while let Some(event) = repl_context.stream.next().await {
330 use mysql_async::binlog::events::*;
331 let event = event?;
332 let event_data = event.read_data()?;
333 metrics.total.inc();
334
335 match event_data {
336 Some(EventData::XidEvent(_)) => {
337 // We've received a transaction commit event, which means that we've seen
338 // all events for the current GTID and we can advance the frontier beyond.
339 let (source_id, tx_id) = active_tx.take().expect("unexpected xid event");
340
341 // Increment the transaction-id to the next GTID we should see from this source-id
342 let next_tx_id = tx_id.checked_add(1).unwrap();
343 let next_gtid =
344 GtidPartition::new_singleton(source_id, GtidState::Active(next_tx_id));
345
346 if let Err(err) = data_partitions.advance_frontier(next_gtid) {
347 return Ok(return_definite_error(
348 err,
349 &output_indexes,
350 &data_output,
351 data_cap_set,
352 &definite_error_handle,
353 definite_error_cap_set,
354 )
355 .await);
356 }
357 let new_upper = data_partitions.frontier();
358 repl_context.downgrade_data_cap_set("xid_event", new_upper);
359 }
360 // We receive a GtidEvent that tells us the GTID of the incoming RowsEvents (and other events)
361 Some(EventData::GtidEvent(event)) => {
362 let source_id = Uuid::from_bytes(event.sid());
363 let tx_id = NonZeroU64::new(event.gno()).unwrap();
364
365 // We are potentially about to ingest a big transaction that we don't want
366 // to store in memory. For this reason we are immediately downgrading our
367 // progress frontier to one that includes the upcoming transaction. This
368 // will cause a remap binding to be minted right away and so the data of
369 // the transaction will not accumulate in the reclock operator.
370 let next_tx_id = tx_id.checked_add(1).unwrap();
371 let next_gtid =
372 GtidPartition::new_singleton(source_id, GtidState::Active(next_tx_id));
373
374 if let Err(err) = progress_partitions.advance_frontier(next_gtid) {
375 return Ok(return_definite_error(
376 err,
377 &output_indexes,
378 &data_output,
379 data_cap_set,
380 &definite_error_handle,
381 definite_error_cap_set,
382 )
383 .await);
384 }
385 // Store the information of the active transaction for the subsequent events
386 active_tx = Some((source_id, tx_id));
387 }
388 Some(EventData::RowsEvent(data)) => {
389 let (source_id, tx_id) = active_tx
390 .clone()
391 .expect("gtid cap should be set by previous GtidEvent");
392 let cur_gtid =
393 GtidPartition::new_singleton(source_id, GtidState::Active(tx_id));
394
395 events::handle_rows_event(
396 data,
397 &repl_context,
398 &cur_gtid,
399 &mut row_event_buffer,
400 )
401 .await?;
402
403 // Advance the frontier up to the point right before this GTID, since we
404 // might still see other events that are part of this same GTID, such as
405 // row events for multiple tables or large row events split into multiple.
406 if let Err(err) = data_partitions.advance_frontier(cur_gtid) {
407 return Ok(return_definite_error(
408 err,
409 &output_indexes,
410 &data_output,
411 data_cap_set,
412 &definite_error_handle,
413 definite_error_cap_set,
414 )
415 .await);
416 }
417 let new_upper = data_partitions.frontier();
418 repl_context.downgrade_data_cap_set("rows_event", new_upper);
419 }
420 Some(EventData::QueryEvent(event)) => {
421 let (source_id, tx_id) = active_tx
422 .clone()
423 .expect("gtid cap should be set by previous GtidEvent");
424 let cur_gtid =
425 GtidPartition::new_singleton(source_id, GtidState::Active(tx_id));
426
427 let should_advance =
428 events::handle_query_event(event, &mut repl_context, &cur_gtid).await?;
429
430 if should_advance {
431 active_tx = None;
432 // Increment the transaction-id to the next GTID we should see from this source-id
433 let next_tx_id = tx_id.checked_add(1).unwrap();
434 let next_gtid = GtidPartition::new_singleton(
435 source_id,
436 GtidState::Active(next_tx_id),
437 );
438
439 if let Err(err) = data_partitions.advance_frontier(next_gtid) {
440 return Ok(return_definite_error(
441 err,
442 &output_indexes,
443 &data_output,
444 data_cap_set,
445 &definite_error_handle,
446 definite_error_cap_set,
447 )
448 .await);
449 }
450 let new_upper = data_partitions.frontier();
451 repl_context.downgrade_data_cap_set("query_event", new_upper);
452 }
453 }
454 _ => {
455 // TODO: Handle other event types
456 metrics.ignored.inc();
457 }
458 }
459 }
460 // We never expect the replication stream to gracefully end
461 Err(TransientError::ReplicationEOF)
462 }))
463 });
464
465 // TODO: Split row decoding into a separate operator that can be distributed across all workers
466
467 let errors = definite_errors.concat(transient_errors.map(ReplicationError::from));
468
469 (data_stream.as_collection(), errors, button.press_on_drop())
470}
471
472/// Produces the replication stream from the MySQL server. This will return all transactions
473/// whose GTIDs were not present in the GTID UUIDs referenced in the `resume_uppper` partitions.
474async fn raw_stream(
475 config: &RawSourceCreationConfig,
476 mut conn: MySqlConn,
477 resume_upper: &Antichain<GtidPartition>,
478) -> Result<Result<(BinlogStream, Option<ManagedSshTunnelHandle>), DefiniteError>, TransientError> {
479 // Verify the MySQL system settings are correct for consistent row-based replication using GTIDs
480 match validate_mysql_repl_settings(&mut conn).await {
481 Err(err @ MySqlError::InvalidSystemSetting { .. }) => {
482 return Ok(Err(DefiniteError::ServerConfigurationError(
483 err.to_string(),
484 )));
485 }
486 Err(err) => Err(err)?,
487 Ok(()) => (),
488 };
489
490 // To start the stream we need to provide a GTID set of the transactions that we've 'seen'
491 // and the server will send us all transactions that have been committed after that point.
492 // NOTE: The 'Gno' intervals in this transaction-set use an open set [start, end)
493 // interval, which is different than the closed-set [start, end] form returned by the
494 // @gtid_executed system variable. So the intervals we construct in this GTID set
495 // end with the value of the transaction-id that we want to start replication at,
496 // which happens to be the same as the definition of a frontier value.
497 // https://dev.mysql.com/doc/refman/8.0/en/replication-options-gtids.html#sysvar_gtid_executed
498 // https://dev.mysql.com/doc/dev/mysql-server/latest/classGtid__set.html#ab46da5ceeae0198b90f209b0a8be2a24
499 let seen_gtids = resume_upper
500 .iter()
501 .flat_map(|partition| match partition.timestamp() {
502 GtidState::Absent => None,
503 GtidState::Active(frontier_time) => {
504 let part_uuid = partition
505 .interval()
506 .singleton()
507 .expect("Non-absent paritions will be singletons");
508 // NOTE: Since we enforce replica_preserve_commit_order=ON we can start the interval at 1
509 // since we know that all transactions with a lower transaction id were monotonic
510 Some(
511 Sid::new(*part_uuid.as_bytes())
512 .with_interval(GnoInterval::new(1, frontier_time.get())),
513 )
514 }
515 })
516 .collect::<Vec<_>>();
517
518 // Request that the stream provide us with a heartbeat message when no other messages have
519 // been sent. This isn't strictly necessary, but is a lightweight additional general
520 // health-check for the replication stream
521 conn.query_drop(format!(
522 "SET @master_heartbeat_period = {};",
523 mz_storage_types::dyncfgs::MYSQL_REPLICATION_HEARTBEAT_INTERVAL
524 .get(config.config.config_set())
525 .as_nanos()
526 ))
527 .await?;
528
529 // Generate a deterministic server-id for identifying us as a replica on the upstream mysql server.
530 // The value does not actually matter since it's irrelevant for GTID-based replication and won't
531 // cause errors if it happens to be the same as another replica in the mysql cluster (based on testing),
532 // but by setting it to a constant value we can make it easier for users to identify Materialize connections
533 let server_id = match config.id {
534 GlobalId::System(id) => id,
535 GlobalId::User(id) => id,
536 GlobalId::Transient(id) => id,
537 _ => unreachable!(),
538 };
539 let server_id = match u32::try_from(server_id) {
540 Ok(id) if id + REPLICATION_SERVER_ID_OFFSET < u32::MAX => id + REPLICATION_SERVER_ID_OFFSET,
541 _ => REPLICATION_SERVER_ID_OFFSET,
542 };
543
544 trace!(
545 "requesting replication stream with seen_gtids: {seen_gtids:?} \
546 and server_id: {server_id:?}"
547 );
548
549 // We need to transform the connection into a BinlogStream (which takes the `Conn` by value),
550 // but to avoid dropping any active SSH tunnel used by the connection we need to preserve the
551 // tunnel handle and return it
552 let (inner_conn, conn_tunnel_handle) = conn.take();
553
554 let repl_stream = match inner_conn
555 .get_binlog_stream(
556 BinlogStreamRequest::new(server_id)
557 .with_gtid()
558 .with_gtid_set(seen_gtids),
559 )
560 .await
561 {
562 Ok(stream) => stream,
563 Err(mysql_async::Error::Server(ref server_err))
564 if server_err.code == ER_SOURCE_FATAL_ERROR_READING_BINLOG_CODE =>
565 {
566 // The GTID set we requested is no longer available
567 return Ok(Err(DefiniteError::BinlogNotAvailable));
568 }
569 // TODO: handle other error types. Some may require a re-snapshot and some may be transient
570 Err(err) => return Err(err.into()),
571 };
572
573 Ok(Ok((repl_stream, conn_tunnel_handle)))
574}