// mz_storage/source/mysql/replication.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Renders the replication side of the [`MySqlSourceConnection`] ingestion dataflow.
11//!
12//! # Progress tracking using Partitioned Timestamps
13//!
14//! This dataflow uses a Partitioned Timestamp implementation to represent the GTID Set that
15//! comprises the full set of committed transactions from the MySQL Server. The frontier
16//! representing progress for this dataflow represents the full range of possible UUIDs +
17//! Transaction IDs of future GTIDs that could be added to the GTID Set.
18//!
19//! See the [`mz_storage_types::sources::mysql::GtidPartition`] type for more information.
20//!
21//! To maintain a complete frontier of the full UUID GTID range, we use a
22//! [`partitions::GtidReplicationPartitions`] struct to store the GTID Set as a set of partitions.
23//! This allows us to easily advance the frontier each time we see a new GTID on the replication
24//! stream.
25//!
26//! # Resumption
27//!
28//! When the dataflow is resumed, the MySQL replication stream is started from the GTID frontier
29//! of the minimum frontier across all source outputs. This is compared against the GTID set that
30//! may still be obtained from the MySQL server, using the @@GTID_PURGED value in MySQL to
31//! determine GTIDs that are no longer available in the binlog and to put the source in an error
32//! state if we cannot resume from the GTID frontier.
33//!
34//! # Rewinds
35//!
36//! The replication stream may be resumed from a point before the snapshot for a specific output
37//! occurs. To avoid double-counting updates that were present in the snapshot, we store a map
38//! of pending rewinds that we've received from the snapshot operator, and when we see updates
39//! for an output that were present in the snapshot, we negate the snapshot update
40//! (at the minimum timestamp) and send it again at the correct GTID.
41
42use std::collections::BTreeMap;
43use std::num::NonZeroU64;
44use std::pin::pin;
45use std::sync::Arc;
46
47use differential_dataflow::AsCollection;
48use futures::StreamExt;
49use itertools::Itertools;
50use mysql_async::prelude::Queryable;
51use mysql_async::{BinlogStream, BinlogStreamRequest, GnoInterval, Sid};
52use mz_ore::future::InTask;
53use mz_ssh_util::tunnel_manager::ManagedSshTunnelHandle;
54use mz_timely_util::containers::stack::AccountedStackBuilder;
55use timely::PartialOrder;
56use timely::container::CapacityContainerBuilder;
57use timely::dataflow::channels::pact::Exchange;
58use timely::dataflow::operators::Concat;
59use timely::dataflow::operators::core::Map;
60use timely::dataflow::{Scope, StreamVec};
61use timely::progress::{Antichain, Timestamp};
62use tracing::trace;
63use uuid::Uuid;
64
65use mz_mysql_util::{
66 ER_SOURCE_FATAL_ERROR_READING_BINLOG_CODE, MySqlConn, MySqlError, query_sys_var,
67};
68use mz_ore::cast::CastFrom;
69use mz_repr::GlobalId;
70use mz_storage_types::errors::DataflowError;
71use mz_storage_types::sources::MySqlSourceConnection;
72use mz_storage_types::sources::mysql::{GtidPartition, GtidState, gtid_set_frontier};
73use mz_timely_util::builder_async::{
74 Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
75};
76
77use crate::metrics::source::mysql::MySqlSourceMetrics;
78use crate::source::RawSourceCreationConfig;
79use crate::source::types::{SignaledFuture, SourceMessage, StackedCollection};
80
81use super::{
82 DefiniteError, ReplicationError, RewindRequest, SourceOutputInfo, TransientError,
83 return_definite_error, validate_mysql_repl_settings,
84};
85
86mod context;
87mod events;
88mod partitions;
89
/// Used as a partition id with `config.responsible_worker` /
/// `config.responsible_for` to elect the single worker that is
/// responsible for reading from the MySQL replication stream.
static REPL_READER: &str = "reader";

/// A constant arbitrary offset to add to the source-id to
/// produce a deterministic server-id for identifying Materialize
/// as a replica on the upstream MySQL server.
/// TODO(roshan): Add user-facing documentation for this
static REPLICATION_SERVER_ID_OFFSET: u32 = 524000;
99
100/// Renders the replication dataflow. See the module documentation for more
101/// information.
/// Renders the replication dataflow. See the module documentation for more
/// information.
///
/// Returns:
/// * the replicated data as a collection of `(output_index, message-or-error)`
///   records timestamped by `GtidPartition`,
/// * a stream of `ReplicationError`s (definite errors emitted by the operator
///   concatenated with transient errors from the fallible operator builder), and
/// * a `PressOnDropButton` that tears the operator down when dropped.
pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
    scope: G,
    config: RawSourceCreationConfig,
    connection: MySqlSourceConnection,
    source_outputs: Vec<SourceOutputInfo>,
    rewind_stream: StreamVec<G, RewindRequest>,
    metrics: MySqlSourceMetrics,
) -> (
    StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
    StreamVec<G, ReplicationError>,
    PressOnDropButton,
) {
    let op_name = format!("MySqlReplicationReader({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(op_name, scope);

    // Worker index that owns the REPL_READER partition; rewind requests are
    // exchanged to this worker so the single reader sees all of them.
    let repl_reader_id = u64::cast_from(config.responsible_worker(REPL_READER));
    let (mut data_output, data_stream) = builder.new_output::<AccountedStackBuilder<_>>();
    // Captures DefiniteErrors that affect the entire source, including all outputs
    let (definite_error_handle, definite_errors) =
        builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
    let mut rewind_input = builder.new_input_for(
        rewind_stream,
        Exchange::new(move |_| repl_reader_id),
        &data_output,
    );

    // Indexes of all outputs, used when broadcasting a definite error to every output.
    let output_indexes = source_outputs
        .iter()
        .map(|output| output.output_index)
        .collect_vec();

    metrics.tables.set(u64::cast_from(source_outputs.len()));

    let (button, transient_errors) = builder.build_fallible(move |caps| {
        let busy_signal = Arc::clone(&config.busy_signal);
        Box::pin(SignaledFuture::new(busy_signal, async move {
            let (id, worker_id) = (config.id, config.worker_id);
            let [data_cap_set, definite_error_cap_set]: &mut [_; 2] = caps.try_into().unwrap();

            // Only run the replication reader on the worker responsible for it.
            if !config.responsible_for(REPL_READER) {
                return Ok(());
            }

            let connection_config = connection
                .connection
                .config(
                    &config.config.connection_context.secrets_reader,
                    &config.config,
                    InTask::Yes,
                )
                .await?;

            let mut conn = connection_config
                .connect(
                    &format!("timely-{worker_id} MySQL replication reader"),
                    &config.config.connection_context.ssh_tunnel_manager,
                )
                .await?;

            // Get the set of GTIDs that have been purged from the binlogs. The assumption is that this
            // represents the frontier of possible GTIDs that exist in the binlog, that we can start
            // replicating from.
            let binlog_purged_set = query_sys_var(&mut conn, "global.gtid_purged").await?;
            let binlog_frontier = match gtid_set_frontier(&binlog_purged_set) {
                Ok(frontier) => frontier,
                Err(err) => {
                    let err = DefiniteError::UnsupportedGtidState(err.to_string());
                    return Ok(
                        // If GTID intervals in the binlog are not available in a monotonic consecutive
                        // order this breaks all of our assumptions and there is nothing else we can do.
                        // This can occur if the mysql server is restored to a previous point-in-time
                        // or if a user manually adds transactions to the @@gtid_purged system var.
                        return_definite_error(
                            err,
                            &output_indexes,
                            &data_output,
                            data_cap_set,
                            &definite_error_handle,
                            definite_error_cap_set,
                        )
                        .await,
                    );
                }
            };

            trace!(%id, "timely-{worker_id} replication binlog frontier: {binlog_frontier:?}");

            // upstream-table-name: Vec<SourceOutputInfo> since multiple
            // outputs can refer to the same table
            let mut table_info = BTreeMap::new();
            let mut output_uppers = Vec::new();

            // Calculate the lowest frontier across all outputs, which represents the point which
            // we should start replication from.
            let min_frontier = Antichain::from_elem(GtidPartition::minimum());
            for output in source_outputs.into_iter() {
                // If an output is resuming at the minimum frontier then its snapshot
                // has not yet been committed.
                // We need to resume from a frontier before the output's snapshot frontier
                // to ensure we don't miss updates that happen after the snapshot was taken.
                //
                // This also ensures that tables created as part of the same CREATE SOURCE
                // statement are 'effectively' snapshot at the same GTID Set, even if their
                // actual snapshot frontiers are different due to a restart.
                //
                // We've chosen the frontier beyond the GTID Set recorded
                // during purification as this resume point.
                if &output.resume_upper == &min_frontier {
                    output_uppers.push(output.initial_gtid_set.clone());
                } else {
                    output_uppers.push(output.resume_upper.clone());
                }

                table_info
                    .entry(output.table_name.clone())
                    .or_insert_with(Vec::new)
                    .push(output);
            }
            let resume_upper = match output_uppers.len() {
                0 => {
                    // If there are no outputs to replicate then we will just be updating the
                    // source progress collection. In this case we can just start from the head of
                    // the binlog to avoid wasting time on old events.
                    trace!(%id, "timely-{worker_id} replication reader found no outputs \
                                 to replicate, using latest gtid_executed as resume_upper");
                    let executed_gtid_set =
                        query_sys_var(&mut conn, "global.gtid_executed").await?;

                    gtid_set_frontier(&executed_gtid_set)?
                }
                // The combined resume point is the lower envelope of all per-output uppers.
                _ => Antichain::from_iter(output_uppers.into_iter().flatten()),
            };

            // Validate that we can actually resume from this upper: everything at or beyond
            // the purged-GTID frontier must still be present in the binlog.
            if !PartialOrder::less_equal(&binlog_frontier, &resume_upper) {
                let err = DefiniteError::BinlogMissingResumePoint(
                    format!("{:?}", binlog_frontier),
                    format!("{:?}", resume_upper),
                );
                return Ok(return_definite_error(
                    err,
                    &output_indexes,
                    &data_output,
                    data_cap_set,
                    &definite_error_handle,
                    definite_error_cap_set,
                )
                .await);
            };

            data_cap_set.downgrade(&*resume_upper);
            trace!(%id, "timely-{worker_id} replication reader started at {:?}", resume_upper);

            // Drain all rewind requests from the snapshot operator before starting the
            // replication stream. Keyed by output index; the stored capabilities allow
            // emitting retractions at the snapshot timestamp later.
            let mut rewinds = BTreeMap::new();
            while let Some(event) = rewind_input.next().await {
                if let AsyncEvent::Data(caps, data) = event {
                    for req in data {
                        // Check that the replication stream will be resumed from the snapshot point or before.
                        if !PartialOrder::less_equal(&resume_upper, &req.snapshot_upper) {
                            let err = DefiniteError::BinlogMissingResumePoint(
                                format!("{:?}", resume_upper),
                                format!("{:?}", req.snapshot_upper),
                            );
                            return Ok(return_definite_error(
                                err,
                                &output_indexes,
                                &data_output,
                                data_cap_set,
                                &definite_error_handle,
                                definite_error_cap_set,
                            )
                            .await);
                        };
                        // If the snapshot point is the same as the resume point then we don't need to rewind
                        if resume_upper != req.snapshot_upper {
                            rewinds.insert(req.output_index.clone(), (caps.clone(), req));
                        }
                    }
                }
            }
            trace!(%id, "timely-{worker_id} pending rewinds {rewinds:?}");

            // We don't use _conn_tunnel_handle here, but need to keep it around to ensure that the
            // SSH tunnel is not dropped until the replication stream is dropped.
            let (binlog_stream, _conn_tunnel_handle) =
                match raw_stream(&config, conn, &resume_upper).await? {
                    Ok(stream) => stream,
                    // If the replication stream cannot be obtained in a definite way there is
                    // nothing else to do. These errors are not retractable.
                    Err(err) => {
                        return Ok(return_definite_error(
                            err,
                            &output_indexes,
                            &data_output,
                            data_cap_set,
                            &definite_error_handle,
                            definite_error_cap_set,
                        )
                        .await);
                    }
                };
            let mut stream = pin!(binlog_stream.peekable());

            // Store all partitions from the resume_upper so we can create a frontier that comprises
            // timestamps for partitions representing the full range of UUIDs to advance our main
            // capabilities.
            let mut data_partitions =
                partitions::GtidReplicationPartitions::from(resume_upper.clone());
            let mut progress_partitions = partitions::GtidReplicationPartitions::from(resume_upper);

            let mut repl_context = context::ReplContext::new(
                &config,
                &connection_config,
                stream.as_mut(),
                &table_info,
                &metrics,
                &mut data_output,
                data_cap_set,
                rewinds,
            );

            // The (source-uuid, transaction-id) of the GTID whose events are currently being
            // received; set by GtidEvent and cleared on transaction end (XidEvent, or a
            // QueryEvent that requests advancement).
            let mut active_tx: Option<(Uuid, NonZeroU64)> = None;

            // Scratch buffer reused across RowsEvents to avoid per-event allocation.
            let mut row_event_buffer = Vec::new();

            while let Some(event) = repl_context.stream.next().await {
                use mysql_async::binlog::events::*;
                let event = event?;
                let event_data = event.read_data()?;
                metrics.total.inc();

                match event_data {
                    Some(EventData::XidEvent(_)) => {
                        // We've received a transaction commit event, which means that we've seen
                        // all events for the current GTID and we can advance the frontier beyond.
                        let (source_id, tx_id) = active_tx.take().expect("unexpected xid event");

                        // Increment the transaction-id to the next GTID we should see from this source-id
                        let next_tx_id = tx_id.checked_add(1).unwrap();
                        let next_gtid =
                            GtidPartition::new_singleton(source_id, GtidState::Active(next_tx_id));

                        if let Err(err) = data_partitions.advance_frontier(next_gtid) {
                            return Ok(return_definite_error(
                                err,
                                &output_indexes,
                                &data_output,
                                data_cap_set,
                                &definite_error_handle,
                                definite_error_cap_set,
                            )
                            .await);
                        }
                        let new_upper = data_partitions.frontier();
                        repl_context.downgrade_data_cap_set("xid_event", new_upper);
                    }
                    // We receive a GtidEvent that tells us the GTID of the incoming RowsEvents (and other events)
                    Some(EventData::GtidEvent(event)) => {
                        let source_id = Uuid::from_bytes(event.sid());
                        // GTID transaction-ids ('gno') start at 1, so this is expected non-zero.
                        let tx_id = NonZeroU64::new(event.gno()).unwrap();

                        // We are potentially about to ingest a big transaction that we don't want
                        // to store in memory. For this reason we are immediately downgrading our
                        // progress frontier to one that includes the upcoming transaction. This
                        // will cause a remap binding to be minted right away and so the data of
                        // the transaction will not accumulate in the reclock operator.
                        let next_tx_id = tx_id.checked_add(1).unwrap();
                        let next_gtid =
                            GtidPartition::new_singleton(source_id, GtidState::Active(next_tx_id));

                        if let Err(err) = progress_partitions.advance_frontier(next_gtid) {
                            return Ok(return_definite_error(
                                err,
                                &output_indexes,
                                &data_output,
                                data_cap_set,
                                &definite_error_handle,
                                definite_error_cap_set,
                            )
                            .await);
                        }
                        // Store the information of the active transaction for the subsequent events
                        active_tx = Some((source_id, tx_id));
                    }
                    Some(EventData::RowsEvent(data)) => {
                        let (source_id, tx_id) = active_tx
                            .clone()
                            .expect("gtid cap should be set by previous GtidEvent");
                        let cur_gtid =
                            GtidPartition::new_singleton(source_id, GtidState::Active(tx_id));

                        events::handle_rows_event(
                            data,
                            &repl_context,
                            &cur_gtid,
                            &mut row_event_buffer,
                        )
                        .await?;

                        // Advance the frontier up to the point right before this GTID, since we
                        // might still see other events that are part of this same GTID, such as
                        // row events for multiple tables or large row events split into multiple.
                        if let Err(err) = data_partitions.advance_frontier(cur_gtid) {
                            return Ok(return_definite_error(
                                err,
                                &output_indexes,
                                &data_output,
                                data_cap_set,
                                &definite_error_handle,
                                definite_error_cap_set,
                            )
                            .await);
                        }
                        let new_upper = data_partitions.frontier();
                        repl_context.downgrade_data_cap_set("rows_event", new_upper);
                    }
                    Some(EventData::QueryEvent(event)) => {
                        let (source_id, tx_id) = active_tx
                            .clone()
                            .expect("gtid cap should be set by previous GtidEvent");
                        let cur_gtid =
                            GtidPartition::new_singleton(source_id, GtidState::Active(tx_id));

                        // handle_query_event reports whether this query ended the transaction
                        // (in which case the frontier may advance past this GTID).
                        let should_advance =
                            events::handle_query_event(event, &mut repl_context, &cur_gtid).await?;

                        if should_advance {
                            active_tx = None;
                            // Increment the transaction-id to the next GTID we should see from this source-id
                            let next_tx_id = tx_id.checked_add(1).unwrap();
                            let next_gtid = GtidPartition::new_singleton(
                                source_id,
                                GtidState::Active(next_tx_id),
                            );

                            if let Err(err) = data_partitions.advance_frontier(next_gtid) {
                                return Ok(return_definite_error(
                                    err,
                                    &output_indexes,
                                    &data_output,
                                    data_cap_set,
                                    &definite_error_handle,
                                    definite_error_cap_set,
                                )
                                .await);
                            }
                            let new_upper = data_partitions.frontier();
                            repl_context.downgrade_data_cap_set("query_event", new_upper);
                        }
                    }
                    _ => {
                        // TODO: Handle other event types
                        metrics.ignored.inc();
                    }
                }
            }
            // We never expect the replication stream to gracefully end
            Err(TransientError::ReplicationEOF)
        }))
    });

    // TODO: Split row decoding into a separate operator that can be distributed across all workers

    let errors = definite_errors.concat(transient_errors.map(ReplicationError::from));

    (data_stream.as_collection(), errors, button.press_on_drop())
}
470
471/// Produces the replication stream from the MySQL server. This will return all transactions
472/// whose GTIDs were not present in the GTID UUIDs referenced in the `resume_uppper` partitions.
473async fn raw_stream(
474 config: &RawSourceCreationConfig,
475 mut conn: MySqlConn,
476 resume_upper: &Antichain<GtidPartition>,
477) -> Result<Result<(BinlogStream, Option<ManagedSshTunnelHandle>), DefiniteError>, TransientError> {
478 // Verify the MySQL system settings are correct for consistent row-based replication using GTIDs
479 match validate_mysql_repl_settings(&mut conn).await {
480 Err(err @ MySqlError::InvalidSystemSetting { .. }) => {
481 return Ok(Err(DefiniteError::ServerConfigurationError(
482 err.to_string(),
483 )));
484 }
485 Err(err) => Err(err)?,
486 Ok(()) => (),
487 };
488
489 // To start the stream we need to provide a GTID set of the transactions that we've 'seen'
490 // and the server will send us all transactions that have been committed after that point.
491 // NOTE: The 'Gno' intervals in this transaction-set use an open set [start, end)
492 // interval, which is different than the closed-set [start, end] form returned by the
493 // @gtid_executed system variable. So the intervals we construct in this GTID set
494 // end with the value of the transaction-id that we want to start replication at,
495 // which happens to be the same as the definition of a frontier value.
496 // https://dev.mysql.com/doc/refman/8.0/en/replication-options-gtids.html#sysvar_gtid_executed
497 // https://dev.mysql.com/doc/dev/mysql-server/latest/classGtid__set.html#ab46da5ceeae0198b90f209b0a8be2a24
498 let seen_gtids = resume_upper
499 .iter()
500 .flat_map(|partition| match partition.timestamp() {
501 GtidState::Absent => None,
502 GtidState::Active(frontier_time) => {
503 let part_uuid = partition
504 .interval()
505 .singleton()
506 .expect("Non-absent paritions will be singletons");
507 // NOTE: Since we enforce replica_preserve_commit_order=ON we can start the interval at 1
508 // since we know that all transactions with a lower transaction id were monotonic
509 Some(
510 Sid::new(*part_uuid.as_bytes())
511 .with_interval(GnoInterval::new(1, frontier_time.get())),
512 )
513 }
514 })
515 .collect::<Vec<_>>();
516
517 // Request that the stream provide us with a heartbeat message when no other messages have
518 // been sent. This isn't strictly necessary, but is a lightweight additional general
519 // health-check for the replication stream
520 conn.query_drop(format!(
521 "SET @master_heartbeat_period = {};",
522 mz_storage_types::dyncfgs::MYSQL_REPLICATION_HEARTBEAT_INTERVAL
523 .get(config.config.config_set())
524 .as_nanos()
525 ))
526 .await?;
527
528 // Generate a deterministic server-id for identifying us as a replica on the upstream mysql server.
529 // The value does not actually matter since it's irrelevant for GTID-based replication and won't
530 // cause errors if it happens to be the same as another replica in the mysql cluster (based on testing),
531 // but by setting it to a constant value we can make it easier for users to identify Materialize connections
532 let server_id = match config.id {
533 GlobalId::System(id) => id,
534 GlobalId::User(id) => id,
535 GlobalId::Transient(id) => id,
536 _ => unreachable!(),
537 };
538 let server_id = match u32::try_from(server_id) {
539 Ok(id) if id + REPLICATION_SERVER_ID_OFFSET < u32::MAX => id + REPLICATION_SERVER_ID_OFFSET,
540 _ => REPLICATION_SERVER_ID_OFFSET,
541 };
542
543 trace!(
544 "requesting replication stream with seen_gtids: {seen_gtids:?} \
545 and server_id: {server_id:?}"
546 );
547
548 // We need to transform the connection into a BinlogStream (which takes the `Conn` by value),
549 // but to avoid dropping any active SSH tunnel used by the connection we need to preserve the
550 // tunnel handle and return it
551 let (inner_conn, conn_tunnel_handle) = conn.take();
552
553 let repl_stream = match inner_conn
554 .get_binlog_stream(
555 BinlogStreamRequest::new(server_id)
556 .with_gtid()
557 .with_gtid_set(seen_gtids),
558 )
559 .await
560 {
561 Ok(stream) => stream,
562 Err(mysql_async::Error::Server(ref server_err))
563 if server_err.code == ER_SOURCE_FATAL_ERROR_READING_BINLOG_CODE =>
564 {
565 // The GTID set we requested is no longer available
566 return Ok(Err(DefiniteError::BinlogNotAvailable));
567 }
568 // TODO: handle other error types. Some may require a re-snapshot and some may be transient
569 Err(err) => return Err(err.into()),
570 };
571
572 Ok(Ok((repl_stream, conn_tunnel_handle)))
573}