mz_storage/source/mysql/replication.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Renders the replication side of the [`MySqlSourceConnection`] ingestion dataflow.
//!
//! # Progress tracking using Partitioned Timestamps
//!
//! This dataflow uses a Partitioned Timestamp implementation to represent the GTID Set that
//! comprises the full set of committed transactions from the MySQL Server. The frontier
//! representing progress for this dataflow represents the full range of possible UUIDs +
//! Transaction IDs of future GTIDs that could be added to the GTID Set.
//!
//! See the [`mz_storage_types::sources::mysql::GtidPartition`] type for more information.
//!
//! To maintain a complete frontier of the full UUID GTID range, we use a
//! [`partitions::GtidReplicationPartitions`] struct to store the GTID Set as a set of partitions.
//! This allows us to easily advance the frontier each time we see a new GTID on the replication
//! stream.
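//!
//! As a rough illustration (simplified notation, not the exact representation used by
//! `GtidPartition`): a GTID set such as `3E11FA47-71CA-11E1-9E33-C80AA9429562:1-21` is tracked
//! as a singleton partition for that source UUID whose timestamp is `Active(22)`, i.e. the next
//! transaction id we expect from it, while the remaining UUID ranges we have seen nothing from
//! sit at `Absent`:
//!
//! ```text
//! [00000000-....., 3E11FA47-.....)         -> Absent
//! [3E11FA47-71CA-11E1-9E33-C80AA9429562]   -> Active(22)
//! (3E11FA47-....., ffffffff-.....]         -> Absent
//! ```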
//!
//! # Resumption
//!
//! When the dataflow is resumed, the MySQL replication stream is started from the minimum GTID
//! frontier across all source outputs. This is compared against the GTID set that may still be
//! obtained from the MySQL server, using the @@GTID_PURGED value in MySQL to determine which
//! GTIDs are no longer available in the binlog; the source is put in an error state if we
//! cannot resume from that frontier.
//!
//! # Rewinds
//!
//! The replication stream may be resumed from a point before the snapshot for a specific output
//! occurs. To avoid double-counting updates that were already present in the snapshot, we store
//! a map of pending rewinds received from the snapshot operator, and when we see an update for
//! an output that was already reflected in its snapshot, we negate that update at the minimum
//! timestamp and send it again at its correct GTID.
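//!
//! Roughly (a sketch of the bookkeeping, not exact types): if a replication update `(row, +1)`
//! arrives at GTID `g` and `g` is not beyond the output's `snapshot_upper`, its effect is
//! already reflected in the snapshot, so we emit both a compensating retraction at the minimum
//! timestamp and the update itself at `g`:
//!
//! ```text
//! (row, GtidPartition::minimum(), -1)   // cancels the copy baked into the snapshot
//! (row, g,                        +1)   // re-issues the update at its true time
//! ```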

use std::collections::BTreeMap;
use std::convert::Infallible;
use std::num::NonZeroU64;
use std::pin::pin;
use std::sync::Arc;

use differential_dataflow::AsCollection;
use futures::StreamExt;
use itertools::Itertools;
use mysql_async::prelude::Queryable;
use mysql_async::{BinlogStream, BinlogStreamRequest, GnoInterval, Sid};
use mz_ore::future::InTask;
use mz_ssh_util::tunnel_manager::ManagedSshTunnelHandle;
use mz_timely_util::containers::stack::AccountedStackBuilder;
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Exchange;
use timely::dataflow::operators::Concat;
use timely::dataflow::operators::core::Map;
use timely::dataflow::{Scope, Stream};
use timely::progress::{Antichain, Timestamp};
use tracing::trace;
use uuid::Uuid;

use mz_mysql_util::{
    ER_SOURCE_FATAL_ERROR_READING_BINLOG_CODE, MySqlConn, MySqlError, query_sys_var,
};
use mz_ore::cast::CastFrom;
use mz_repr::GlobalId;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::MySqlSourceConnection;
use mz_storage_types::sources::mysql::{GtidPartition, GtidState, gtid_set_frontier};
use mz_timely_util::builder_async::{
    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};

use crate::metrics::source::mysql::MySqlSourceMetrics;
use crate::source::RawSourceCreationConfig;
use crate::source::types::{SignaledFuture, SourceMessage, StackedCollection};

use super::{
    DefiniteError, ReplicationError, RewindRequest, SourceOutputInfo, TransientError,
    return_definite_error, validate_mysql_repl_settings,
};

mod context;
mod events;
mod partitions;

/// Used as a partition id to determine if the worker is
/// responsible for reading from the MySQL replication stream
static REPL_READER: &str = "reader";

/// A constant arbitrary offset to add to the source-id to
/// produce a deterministic server-id for identifying Materialize
/// as a replica on the upstream MySQL server.
/// TODO(roshan): Add user-facing documentation for this
static REPLICATION_SERVER_ID_OFFSET: u32 = 524000;

/// Renders the replication dataflow. See the module documentation for more
/// information.
pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
    scope: G,
    config: RawSourceCreationConfig,
    connection: MySqlSourceConnection,
    source_outputs: Vec<SourceOutputInfo>,
    rewind_stream: &Stream<G, RewindRequest>,
    metrics: MySqlSourceMetrics,
) -> (
    StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
    Stream<G, Infallible>,
    Stream<G, ReplicationError>,
    PressOnDropButton,
) {
    let op_name = format!("MySqlReplicationReader({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(op_name, scope);

    let repl_reader_id = u64::cast_from(config.responsible_worker(REPL_READER));
    let (mut data_output, data_stream) = builder.new_output::<AccountedStackBuilder<_>>();
    let (_upper_output, upper_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    // Captures DefiniteErrors that affect the entire source, including all outputs
    let (definite_error_handle, definite_errors) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    let mut rewind_input = builder.new_input_for(
        rewind_stream,
        Exchange::new(move |_| repl_reader_id),
        &data_output,
    );

    let output_indexes = source_outputs
        .iter()
        .map(|output| output.output_index)
        .collect_vec();

    metrics.tables.set(u64::cast_from(source_outputs.len()));

    let (button, transient_errors) = builder.build_fallible(move |caps| {
        let busy_signal = Arc::clone(&config.busy_signal);
        Box::pin(SignaledFuture::new(busy_signal, async move {
            let (id, worker_id) = (config.id, config.worker_id);
            let [data_cap_set, upper_cap_set, definite_error_cap_set]: &mut [_; 3] =
                caps.try_into().unwrap();

            // Only run the replication reader on the worker responsible for it.
            if !config.responsible_for(REPL_READER) {
                return Ok(());
            }

            let connection_config = connection
                .connection
                .config(
                    &config.config.connection_context.secrets_reader,
                    &config.config,
                    InTask::Yes,
                )
                .await?;

            let mut conn = connection_config
                .connect(
                    &format!("timely-{worker_id} MySQL replication reader"),
                    &config.config.connection_context.ssh_tunnel_manager,
                )
                .await?;

            // Get the set of GTIDs that have been purged from the binlogs. The assumption is that
            // the frontier of this purged set represents the earliest point in the binlog that we
            // can still start replicating from.
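            // For example (illustrative): if @@gtid_purged is `u:1-5`, then transactions 1
            // through 5 from source `u` are no longer in the binlog, so the earliest frontier we
            // can resume that partition from is `Active(6)`.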
            let binlog_purged_set = query_sys_var(&mut conn, "global.gtid_purged").await?;
            let binlog_frontier = match gtid_set_frontier(&binlog_purged_set) {
                Ok(frontier) => frontier,
                Err(err) => {
                    let err = DefiniteError::UnsupportedGtidState(err.to_string());
                    return Ok(
                        // If GTID intervals in the binlog are not available in a monotonic consecutive
                        // order this breaks all of our assumptions and there is nothing else we can do.
                        // This can occur if the mysql server is restored to a previous point-in-time
                        // or if a user manually adds transactions to the @@gtid_purged system var.
                        return_definite_error(
                            err,
                            &output_indexes,
                            &data_output,
                            data_cap_set,
                            &definite_error_handle,
                            definite_error_cap_set,
                        )
                        .await,
                    );
                }
            };

            trace!(%id, "timely-{worker_id} replication binlog frontier: {binlog_frontier:?}");

            // Map of upstream-table-name -> Vec<SourceOutputInfo>, since multiple
            // outputs can refer to the same table
            let mut table_info = BTreeMap::new();
            let mut output_uppers = Vec::new();

            // Calculate the lowest frontier across all outputs, which represents the point from
            // which we should start replication.
            let min_frontier = Antichain::from_elem(GtidPartition::minimum());
            for output in source_outputs.into_iter() {
                // If an output is resuming at the minimum frontier then its snapshot
                // has not yet been committed.
                // We need to resume from a frontier before the output's snapshot frontier
                // to ensure we don't miss updates that happen after the snapshot was taken.
                //
                // This also ensures that tables created as part of the same CREATE SOURCE
                // statement are 'effectively' snapshotted at the same GTID Set, even if their
                // actual snapshot frontiers are different due to a restart.
                //
                // We've chosen the frontier beyond the GTID Set recorded
                // during purification as this resume point.
                if &output.resume_upper == &min_frontier {
                    output_uppers.push(output.initial_gtid_set.clone());
                } else {
                    output_uppers.push(output.resume_upper.clone());
                }

                table_info
                    .entry(output.table_name.clone())
                    .or_insert_with(Vec::new)
                    .push(output);
            }
            let resume_upper = match output_uppers.len() {
                0 => {
                    // If there are no outputs to replicate then we will just be updating the
                    // source progress collection. In this case we can just start from the head of
                    // the binlog to avoid wasting time on old events.
                    trace!(%id, "timely-{worker_id} replication reader found no outputs \
                        to replicate, using latest gtid_executed as resume_upper");
                    let executed_gtid_set =
                        query_sys_var(&mut conn, "global.gtid_executed").await?;

                    gtid_set_frontier(&executed_gtid_set)?
                }
                _ => Antichain::from_iter(output_uppers.into_iter().flatten()),
            };

            // Validate that we can actually resume from this upper.
            if !PartialOrder::less_equal(&binlog_frontier, &resume_upper) {
                let err = DefiniteError::BinlogMissingResumePoint(
                    format!("{:?}", binlog_frontier),
                    format!("{:?}", resume_upper),
                );
                return Ok(return_definite_error(
                    err,
                    &output_indexes,
                    &data_output,
                    data_cap_set,
                    &definite_error_handle,
                    definite_error_cap_set,
                )
                .await);
            };

            data_cap_set.downgrade(&*resume_upper);
            upper_cap_set.downgrade(&*resume_upper);
            trace!(%id, "timely-{worker_id} replication reader started at {:?}", resume_upper);

            let mut rewinds = BTreeMap::new();
            while let Some(event) = rewind_input.next().await {
                if let AsyncEvent::Data(caps, data) = event {
                    for req in data {
                        // Check that the replication stream will be resumed from the snapshot point or before.
                        if !PartialOrder::less_equal(&resume_upper, &req.snapshot_upper) {
                            let err = DefiniteError::BinlogMissingResumePoint(
                                format!("{:?}", resume_upper),
                                format!("{:?}", req.snapshot_upper),
                            );
                            return Ok(return_definite_error(
                                err,
                                &output_indexes,
                                &data_output,
                                data_cap_set,
                                &definite_error_handle,
                                definite_error_cap_set,
                            )
                            .await);
                        };
                        // If the snapshot point is the same as the resume point then we don't need to rewind
                        if resume_upper != req.snapshot_upper {
                            rewinds.insert(req.output_index.clone(), (caps.clone(), req));
                        }
                    }
                }
            }
            trace!(%id, "timely-{worker_id} pending rewinds {rewinds:?}");

            // We don't use _conn_tunnel_handle here, but need to keep it around to ensure that the
            // SSH tunnel is not dropped until the replication stream is dropped.
            let (binlog_stream, _conn_tunnel_handle) =
                match raw_stream(&config, conn, &resume_upper).await? {
                    Ok(stream) => stream,
                    // If obtaining the replication stream failed with a definite error there is
                    // nothing else to do. These errors are not retractable.
                    Err(err) => {
                        return Ok(return_definite_error(
                            err,
                            &output_indexes,
                            &data_output,
                            data_cap_set,
                            &definite_error_handle,
                            definite_error_cap_set,
                        )
                        .await);
                    }
                };
            let mut stream = pin!(binlog_stream.peekable());

            // Store all partitions from the resume_upper so we can create a frontier that comprises
            // timestamps for partitions representing the full range of UUIDs to advance our main
            // capabilities.
            let mut data_partitions =
                partitions::GtidReplicationPartitions::from(resume_upper.clone());
            let mut progress_partitions = partitions::GtidReplicationPartitions::from(resume_upper);

            let mut repl_context = context::ReplContext::new(
                &config,
                &connection_config,
                stream.as_mut(),
                &table_info,
                &metrics,
                &mut data_output,
                data_cap_set,
                upper_cap_set,
                rewinds,
            );

            let mut active_tx: Option<(Uuid, NonZeroU64)> = None;

            let mut row_event_buffer = Vec::new();

            while let Some(event) = repl_context.stream.next().await {
                use mysql_async::binlog::events::*;
                let event = event?;
                let event_data = event.read_data()?;
                metrics.total.inc();

                match event_data {
                    Some(EventData::XidEvent(_)) => {
                        // We've received a transaction commit event, which means that we've seen
                        // all events for the current GTID and we can advance the frontier beyond it.
                        let (source_id, tx_id) = active_tx.take().expect("unexpected xid event");

                        // Increment the transaction-id to the next GTID we should see from this source-id
                        let next_tx_id = tx_id.checked_add(1).unwrap();
                        let next_gtid =
                            GtidPartition::new_singleton(source_id, GtidState::Active(next_tx_id));

                        if let Err(err) = data_partitions.advance_frontier(next_gtid) {
                            return Ok(return_definite_error(
                                err,
                                &output_indexes,
                                &data_output,
                                data_cap_set,
                                &definite_error_handle,
                                definite_error_cap_set,
                            )
                            .await);
                        }
                        let new_upper = data_partitions.frontier();
                        repl_context.downgrade_data_cap_set("xid_event", new_upper);
                    }
                    // We receive a GtidEvent that tells us the GTID of the incoming RowsEvents (and other events)
                    Some(EventData::GtidEvent(event)) => {
                        let source_id = Uuid::from_bytes(event.sid());
                        let tx_id = NonZeroU64::new(event.gno()).unwrap();

                        // We are potentially about to ingest a big transaction that we don't want
                        // to store in memory. For this reason we are immediately downgrading our
                        // progress frontier to one that includes the upcoming transaction. This
                        // will cause a remap binding to be minted right away and so the data of
                        // the transaction will not accumulate in the reclock operator.
                        let next_tx_id = tx_id.checked_add(1).unwrap();
                        let next_gtid =
                            GtidPartition::new_singleton(source_id, GtidState::Active(next_tx_id));

                        if let Err(err) = progress_partitions.advance_frontier(next_gtid) {
                            return Ok(return_definite_error(
                                err,
                                &output_indexes,
                                &data_output,
                                data_cap_set,
                                &definite_error_handle,
                                definite_error_cap_set,
                            )
                            .await);
                        }
                        let new_upper = progress_partitions.frontier();
                        repl_context.downgrade_progress_cap_set("gtid_event", new_upper);

                        // Store the information of the active transaction for the subsequent events
                        active_tx = Some((source_id, tx_id));
                    }
                    Some(EventData::RowsEvent(data)) => {
                        let (source_id, tx_id) = active_tx
                            .clone()
                            .expect("gtid cap should be set by previous GtidEvent");
                        let cur_gtid =
                            GtidPartition::new_singleton(source_id, GtidState::Active(tx_id));

                        events::handle_rows_event(
                            data,
                            &repl_context,
                            &cur_gtid,
                            &mut row_event_buffer,
                        )
                        .await?;

                        // Advance the frontier up to the point right before this GTID, since we
                        // might still see other events that are part of this same GTID, such as
                        // row events for multiple tables or large row events split into multiple
                        // binlog events.
                        if let Err(err) = data_partitions.advance_frontier(cur_gtid) {
                            return Ok(return_definite_error(
                                err,
                                &output_indexes,
                                &data_output,
                                data_cap_set,
                                &definite_error_handle,
                                definite_error_cap_set,
                            )
                            .await);
                        }
                        let new_upper = data_partitions.frontier();
                        repl_context.downgrade_data_cap_set("rows_event", new_upper);
                    }
                    Some(EventData::QueryEvent(event)) => {
                        let (source_id, tx_id) = active_tx
                            .clone()
                            .expect("gtid cap should be set by previous GtidEvent");
                        let cur_gtid =
                            GtidPartition::new_singleton(source_id, GtidState::Active(tx_id));

                        let should_advance =
                            events::handle_query_event(event, &mut repl_context, &cur_gtid).await?;

                        if should_advance {
                            active_tx = None;
                            // Increment the transaction-id to the next GTID we should see from this source-id
                            let next_tx_id = tx_id.checked_add(1).unwrap();
                            let next_gtid = GtidPartition::new_singleton(
                                source_id,
                                GtidState::Active(next_tx_id),
                            );

                            if let Err(err) = data_partitions.advance_frontier(next_gtid) {
                                return Ok(return_definite_error(
                                    err,
                                    &output_indexes,
                                    &data_output,
                                    data_cap_set,
                                    &definite_error_handle,
                                    definite_error_cap_set,
                                )
                                .await);
                            }
                            let new_upper = data_partitions.frontier();
                            repl_context.downgrade_data_cap_set("query_event", new_upper);
                        }
                    }
                    _ => {
                        // TODO: Handle other event types
                        metrics.ignored.inc();
                    }
                }
            }
            // We never expect the replication stream to gracefully end
            Err(TransientError::ReplicationEOF)
        }))
    });

    // TODO: Split row decoding into a separate operator that can be distributed across all workers

    let errors = definite_errors.concat(&transient_errors.map(ReplicationError::from));

    (
        data_stream.as_collection(),
        upper_stream,
        errors,
        button.press_on_drop(),
    )
}

/// Produces the replication stream from the MySQL server. This will return all transactions
/// whose GTIDs are not already included in the GTID set implied by the `resume_upper` partitions.
async fn raw_stream(
    config: &RawSourceCreationConfig,
    mut conn: MySqlConn,
    resume_upper: &Antichain<GtidPartition>,
) -> Result<Result<(BinlogStream, Option<ManagedSshTunnelHandle>), DefiniteError>, TransientError> {
    // Verify the MySQL system settings are correct for consistent row-based replication using GTIDs
    match validate_mysql_repl_settings(&mut conn).await {
        Err(err @ MySqlError::InvalidSystemSetting { .. }) => {
            return Ok(Err(DefiniteError::ServerConfigurationError(
                err.to_string(),
            )));
        }
        Err(err) => Err(err)?,
        Ok(()) => (),
    };

    // To start the stream we need to provide a GTID set of the transactions that we've 'seen'
    // and the server will send us all transactions that have been committed after that point.
    // NOTE: The 'Gno' intervals in this transaction-set use a half-open [start, end)
    // interval, which is different from the closed [start, end] form returned by the
    // @gtid_executed system variable. So the intervals we construct in this GTID set
    // end with the value of the transaction-id that we want to start replication at,
    // which happens to be the same as the definition of a frontier value.
    // https://dev.mysql.com/doc/refman/8.0/en/replication-options-gtids.html#sysvar_gtid_executed
    // https://dev.mysql.com/doc/dev/mysql-server/latest/classGtid__set.html#ab46da5ceeae0198b90f209b0a8be2a24
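    // For example (illustrative): a resume_upper partition for UUID `u` at `Active(22)` becomes
    // `Sid(u)` with the interval `[1, 22)`, telling the server that transactions 1 through 21
    // from `u` have already been seen.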
    let seen_gtids = resume_upper
        .iter()
        .flat_map(|partition| match partition.timestamp() {
            GtidState::Absent => None,
            GtidState::Active(frontier_time) => {
                let part_uuid = partition
                    .interval()
                    .singleton()
                    .expect("Non-absent partitions will be singletons");
                // NOTE: Since we enforce replica_preserve_commit_order=ON we can start the interval at 1
                // since we know that all transactions with a lower transaction id were monotonic
                Some(
                    Sid::new(*part_uuid.as_bytes())
                        .with_interval(GnoInterval::new(1, frontier_time.get())),
                )
            }
        })
        .collect::<Vec<_>>();

    // Request that the stream provide us with a heartbeat message when no other messages have
    // been sent. This isn't strictly necessary, but is a lightweight additional health check
    // for the replication stream.
    conn.query_drop(format!(
        "SET @master_heartbeat_period = {};",
        mz_storage_types::dyncfgs::MYSQL_REPLICATION_HEARTBEAT_INTERVAL
            .get(config.config.config_set())
            .as_nanos()
    ))
    .await?;

    // Generate a deterministic server-id for identifying us as a replica on the upstream mysql server.
    // The value does not actually matter since it's irrelevant for GTID-based replication and won't
    // cause errors if it happens to be the same as another replica in the mysql cluster (based on testing),
    // but by deriving it deterministically from the source id we make it easier for users to identify
    // Materialize connections.
    let server_id = match config.id {
        GlobalId::System(id) => id,
        GlobalId::User(id) => id,
        GlobalId::Transient(id) => id,
        _ => unreachable!(),
    };
    let server_id = match u32::try_from(server_id) {
        Ok(id) if id + REPLICATION_SERVER_ID_OFFSET < u32::MAX => id + REPLICATION_SERVER_ID_OFFSET,
        _ => REPLICATION_SERVER_ID_OFFSET,
    };

    trace!(
        "requesting replication stream with seen_gtids: {seen_gtids:?} \
         and server_id: {server_id:?}"
    );

    // We need to transform the connection into a BinlogStream (which takes the `Conn` by value),
    // but to avoid dropping any active SSH tunnel used by the connection we need to preserve the
    // tunnel handle and return it
    let (inner_conn, conn_tunnel_handle) = conn.take();

    let repl_stream = match inner_conn
        .get_binlog_stream(
            BinlogStreamRequest::new(server_id)
                .with_gtid()
                .with_gtid_set(seen_gtids),
        )
        .await
    {
        Ok(stream) => stream,
        Err(mysql_async::Error::Server(ref server_err))
            if server_err.code == ER_SOURCE_FATAL_ERROR_READING_BINLOG_CODE =>
        {
            // The GTID set we requested is no longer available
            return Ok(Err(DefiniteError::BinlogNotAvailable));
        }
        // TODO: handle other error types. Some may require a re-snapshot and some may be transient
        Err(err) => return Err(err.into()),
    };

    Ok(Ok((repl_stream, conn_tunnel_handle)))
}