// mz_storage/source/mysql/replication/events.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
10use itertools::Itertools;
11use sqlparser::ast::{ObjectName, ObjectNamePart, ObjectType, Statement};
12use sqlparser::dialect::MySqlDialect;
13use sqlparser::parser::Parser;
14use std::collections::BTreeMap;
15
16use maplit::btreemap;
17use mysql_async::binlog::events::OptionalMetaExtractor;
18use mysql_common::binlog::events::{QueryEvent, RowsEventData};
19use mz_mysql_util::{MySqlError, pack_mysql_row};
20use mz_ore::iter::IteratorExt;
21use mz_repr::{Diff, Row};
22use mz_storage_types::errors::DataflowError;
23use mz_storage_types::sources::mysql::GtidPartition;
24use timely::progress::Timestamp;
25use tracing::trace;
26
27use crate::source::mysql::SourceOutputInfo;
28use crate::source::types::{FuelSize, SourceMessage};
29
30use super::super::schemas::verify_schemas;
31use super::super::{DefiniteError, MySqlTableName, TransientError};
32use super::context::ReplContext;
33
34const DIALECT: MySqlDialect = MySqlDialect {};
35
36/// Returns the MySqlTableName for the given table name referenced in a
37/// SQL statement, using the current schema if the table name is unqualified.
38fn table_ident(name: &str, current_schema: &str) -> Result<MySqlTableName, TransientError> {
39    let stripped = name.replace('`', "");
40    let mut name_iter = stripped.split('.');
41    mysql_table_name(name_iter.next(), name_iter.next(), current_schema, name)
42}
43
44fn mysql_table_name(
45    first_component: Option<&str>,
46    second_component: Option<&str>,
47    current_schema: &str,
48    name: &str,
49) -> Result<MySqlTableName, TransientError> {
50    match (first_component, second_component) {
51        (Some(t_name), None) => Ok(MySqlTableName::new(current_schema, t_name)),
52        (Some(schema_name), Some(t_name)) => Ok(MySqlTableName::new(schema_name, t_name)),
53        _ => Err(TransientError::Generic(anyhow::anyhow!(
54            "Invalid table name from QueryEvent: {}",
55            name
56        ))),
57    }
58}
59
60/// This function has the same intent as table_ident except handles the object name type parsed out of the sql by sqlparser rather than a raw string.
61/// The ObjectName type is more flexible than the constraints for an identifier in mysql. i.e. it has a function type, which appears to only be supported
62/// in snowflake. It also has a vector for identifier components, however a table identifier from the binlog should only have 1 or 2 components - never 0 or more than 2
63fn table_ident_from_object_name(
64    name: &ObjectName,
65    current_schema: &str,
66) -> Result<MySqlTableName, TransientError> {
67    let processed_name_parts: Vec<String> = name.0.iter().map(|part| match part {
68        ObjectNamePart::Identifier(ident) => Ok(ident.value.clone()),
69        // Functions for table name identifiers are a snowflake-specific concept, unexpected for mysql so we should fail hard.
70        ObjectNamePart::Function(_) => Err(TransientError::Generic(anyhow::anyhow!(
71            "Invalid table name from QueryEvent, function identifiers not supported in mysql: {}", name
72        ))),
73    }).collect::<Result<_, _>>()?;
74    if processed_name_parts.len() != 1 && processed_name_parts.len() != 2 {
75        return Err(TransientError::Generic(anyhow::anyhow!(
76            "Invalid table name from QueryEvent: {}",
77            name
78        )));
79    }
80
81    let mut name_parts_iter = processed_name_parts.iter().map(|x| x.as_str());
82    mysql_table_name(
83        name_parts_iter.next(),
84        name_parts_iter.next(),
85        current_schema,
86        &name.to_string(),
87    )
88}
89
90/// Handles QueryEvents from the MySQL replication stream. Since we only use
91/// row-based replication, we only expect to see QueryEvents for DDL changes.
92///
93/// From the MySQL docs: 'A Query_event is created for each query that modifies
94/// the database, unless the query is logged row-based.' This means that we can
95/// expect any DDL changes to be represented as QueryEvents, which we must parse
96/// to figure out if any of the tables we care about have been affected.
97///
98/// This function returns a bool to represent whether the event that was handled
99/// represents a 'complete' event that should cause the frontier to advance beyond
100/// the current GTID.
101pub(super) async fn handle_query_event(
102    event: QueryEvent<'_>,
103    ctx: &mut ReplContext<'_>,
104    new_gtid: &GtidPartition,
105) -> Result<bool, TransientError> {
106    let (id, worker_id) = (ctx.config.id, ctx.config.worker_id);
107
108    let query = event.query();
109    let current_schema = event.schema();
110    let mut is_complete_event = false;
111
112    // MySQL does not permit transactional DDL, so luckily we don't need to
113    // worry about tracking BEGIN/COMMIT query events. We only need to look
114    // for DDL changes that affect the schema of the tables we care about.
115    let mut query_iter = query.split_ascii_whitespace();
116    let first = query_iter.next();
117    let second = query_iter.next();
118    match (
119        first.map(str::to_ascii_lowercase).as_deref(),
120        second.map(str::to_ascii_lowercase).as_deref(),
121    ) {
122        // Detect `ALTER TABLE <tbl>`, `RENAME TABLE <tbl>` statements
123        (Some("alter") | Some("rename"), Some("table")) => {
124            let table = table_ident(
125                query_iter.next().ok_or_else(|| {
126                    TransientError::Generic(anyhow::anyhow!("Invalid DDL query: {}", query))
127                })?,
128                &current_schema,
129            )?;
130            is_complete_event = true;
131            if ctx.table_info.contains_key(&table) {
132                trace!(%id, "timely-{worker_id} DDL change detected \
133                       for {table:?}");
134                let info = &ctx.table_info[&table];
135                let mut conn = ctx
136                    .connection_config
137                    .connect(
138                        &format!("timely-{worker_id} MySQL "),
139                        &ctx.config.config.connection_context.ssh_tunnel_manager,
140                    )
141                    .await?;
142                for (err_output, err) in
143                    verify_schemas(&mut *conn, btreemap! { &table => info }).await?
144                {
145                    tracing::warn!(%id, "timely-{worker_id} DDL change \
146                           verification error for {table:?}[{}]: {err:?}",
147                           err_output.output_index);
148                    let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
149                    let update = (
150                        (err_output.output_index, Err(err.into())),
151                        new_gtid.clone(),
152                        Diff::ONE,
153                    );
154                    let size = update.fuel_size();
155                    ctx.data_output.give_fueled(&gtid_cap, update, size).await;
156                    ctx.errored_outputs.insert(err_output.output_index);
157                }
158            }
159        }
160        // Detect `DROP TABLE [IF EXISTS] <tbl>, <tbl>` statements.
161        (Some("drop"), Some("table")) => {
162            let dropped_tables = drop_table_identifiers(&current_schema, &query)?;
163
164            // Sources referencing the dropped table name that were created before the table was dropped. Before is determined
165            // by looking at the initial gtid set for the source and ensuring that's before the new gtid.
166            let sources_to_drop: BTreeMap<&MySqlTableName, Vec<&SourceOutputInfo>> = dropped_tables
167                .iter()
168                .filter_map(|table_name| {
169                    ctx.table_info
170                        .get_key_value(table_name)
171                        .map(|(name, info)| {
172                            let kept = info
173                                .iter()
174                                .filter(|output| {
175                                    !ctx.errored_outputs.contains(&output.output_index)
176                                    // Only drop sources that were created before the table was dropped.
177                                    && output.initial_gtid_set.less_equal(new_gtid)
178                                })
179                                .collect();
180                            (name, kept)
181                        })
182                })
183                .collect();
184            is_complete_event = true;
185            for (table_name, outputs) in sources_to_drop {
186                tracing::info!(%id, "timely-{worker_id} DDL change dropped outputs: {outputs:?}");
187                for output in outputs {
188                    let err = DefiniteError::TableDropped(table_name.to_string());
189                    let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
190                    let update = (
191                        (output.output_index, Err(err.into())),
192                        new_gtid.clone(),
193                        Diff::ONE,
194                    );
195                    let size = std::mem::size_of_val(&update);
196                    ctx.data_output.give_fueled(&gtid_cap, update, size).await;
197                    ctx.errored_outputs.insert(output.output_index);
198                }
199            }
200        }
201        // Detect `TRUNCATE [TABLE] <tbl>` statements
202        (Some("truncate"), Some(_)) => {
203            // We need the original un-lowercased version of 'second' since it might be a table ref
204            let second = second.expect("known to be Some");
205            let table = if second.eq_ignore_ascii_case("table") {
206                table_ident(
207                    query_iter.next().ok_or_else(|| {
208                        TransientError::Generic(anyhow::anyhow!("Invalid DDL query: {}", query))
209                    })?,
210                    &current_schema,
211                )?
212            } else {
213                table_ident(second, &current_schema)?
214            };
215            is_complete_event = true;
216            if ctx.table_info.contains_key(&table) {
217                trace!(%id, "timely-{worker_id} TRUNCATE detected \
218                       for {table:?}");
219                if let Some(info) = ctx.table_info.get(&table) {
220                    let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
221                    for output in info {
222                        let update = (
223                            (
224                                output.output_index,
225                                Err(DataflowError::from(DefiniteError::TableTruncated(
226                                    table.to_string(),
227                                ))),
228                            ),
229                            new_gtid.clone(),
230                            Diff::ONE,
231                        );
232                        let size = update.fuel_size();
233                        ctx.data_output.give_fueled(&gtid_cap, update, size).await;
234                        ctx.errored_outputs.insert(output.output_index);
235                    }
236                }
237            }
238        }
239        // Detect `COMMIT` statements which signify the end of a transaction on non-XA capable
240        // storage engines
241        (Some("commit"), None) => {
242            is_complete_event = true;
243        }
244        // Detect `CREATE TABLE <tbl>` statements which don't affect existing tables but do
245        // signify a complete event (e.g. for the purposes of advancing the GTID)
246        (Some("create"), Some("table")) => {
247            // CREATE TABLE ... SELECT will have subsequent `RowEvent`s, to account for this, the statement contains the clause "START TRANSACTION".
248            // https://dev.mysql.com/worklog/task/?id=13355
249
250            let mut peek_stream = query_iter.peekable();
251            let mut ctas = false;
252            while let Some(token) = peek_stream.next() {
253                if token.eq_ignore_ascii_case("start")
254                    && peek_stream
255                        .peek()
256                        .is_some_and(|t| t.eq_ignore_ascii_case("transaction"))
257                {
258                    ctas = true;
259                    break;
260                }
261            }
262            is_complete_event = !ctas;
263        }
264        _ => {}
265    }
266
267    Ok(is_complete_event)
268}
269
270/// Handles parsing table names from "DROP TABLE [IF EXISTS] table1, table2, table3".
271fn drop_table_identifiers(
272    current_schema: &str,
273    query: &str,
274) -> Result<Vec<MySqlTableName>, TransientError> {
275    let invalid =
276        |msg| TransientError::Generic(anyhow::anyhow!("Invalid DDL query, {msg}, got: {query}"));
277
278    let parse_result = Parser::parse_sql(&DIALECT, query)
279        .map_err(|e| TransientError::Generic(anyhow::anyhow!(e)))?;
280    let stmt = parse_result
281        .iter()
282        .exactly_one()
283        .map_err(|_| invalid("expected only a single statement from the binlog"))?;
284
285    let Statement::Drop {
286        object_type,
287        temporary,
288        names,
289        ..
290    } = stmt
291    else {
292        return Err(invalid("expected DROP statement"));
293    };
294
295    if *object_type != ObjectType::Table || *temporary {
296        return Err(invalid("expected DROP TABLE statement"));
297    }
298    let table_identifiers: Vec<MySqlTableName> = names
299        .iter()
300        .map(|name| table_ident_from_object_name(name, current_schema))
301        .collect::<Result<Vec<_>, _>>()?;
302    Ok(table_identifiers)
303}
304
305/// Handles RowsEvents from the MySQL replication stream. These events contain
306/// insert/update/delete events for a single transaction or committed statement.
307///
308/// We use these events to update the dataflow with the new rows, and return a new
309/// frontier with which to advance the dataflow's progress.
310pub(super) async fn handle_rows_event(
311    event: RowsEventData<'_>,
312    ctx: &mut ReplContext<'_>,
313    new_gtid: &GtidPartition,
314    event_buffer: &mut Vec<(
315        (usize, Result<SourceMessage, DataflowError>),
316        GtidPartition,
317        mz_repr::Diff,
318    )>,
319) -> Result<(), TransientError> {
320    let (id, worker_id) = (ctx.config.id, ctx.config.worker_id);
321
322    // Find the relevant table
323    let binlog_table_id = event.table_id();
324    let table_map_event = ctx
325        .stream
326        .get_ref()
327        .get_tme(binlog_table_id)
328        .ok_or_else(|| TransientError::Generic(anyhow::anyhow!("Table map event not found")))?;
329    let table = MySqlTableName::new(
330        &*table_map_event.database_name(),
331        &*table_map_event.table_name(),
332    );
333
334    let outputs = ctx.table_info.get(&table).map(|outputs| {
335        outputs
336            .into_iter()
337            .filter(|output| !ctx.errored_outputs.contains(&output.output_index))
338            .collect::<Vec<_>>()
339    });
340    let outputs = match outputs {
341        Some(outputs) => outputs,
342        None => {
343            // We don't know about this table, or there are no un-errored outputs for it.
344            return Ok(());
345        }
346    };
347
348    trace!(%id, "timely-{worker_id} handling RowsEvent for {table:?}");
349
350    // Capability for this event.
351    let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
352
353    // We can check here if the binlog has full row metadata by looking at the column name optional
354    // metadata, which is only present if full metadata is enabled.
355    let optional_metadata = OptionalMetaExtractor::new(table_map_event.iter_optional_meta())?;
356    let has_full_metadata = optional_metadata.iter_column_name().next().is_some();
357
358    // Iterate over the rows in this RowsEvent. Each row is a pair of 'before_row', 'after_row',
359    // to accomodate for updates and deletes (which include a before_row),
360    // and updates and inserts (which inclued an after row).
361    let mut final_row = Row::default();
362    let mut rows_iter = event.rows(table_map_event);
363    let mut rewind_count = 0;
364    let mut additions = 0;
365    let mut retractions = 0;
366    while let Some(Ok((before_row, after_row))) = rows_iter.next() {
367        // Update metrics for updates/inserts/deletes
368        match (&before_row, &after_row) {
369            (None, None) => {}
370            (Some(_), Some(_)) => {
371                ctx.metrics.updates.inc();
372            }
373            (None, Some(_)) => {
374                ctx.metrics.inserts.inc();
375            }
376            (Some(_), None) => {
377                ctx.metrics.deletes.inc();
378            }
379        }
380
381        let updates = [
382            before_row.map(|r| (r, Diff::MINUS_ONE)),
383            after_row.map(|r| (r, Diff::ONE)),
384        ];
385        let gtid_str = format!("{new_gtid:?}");
386        for (binlog_row, diff) in updates.into_iter().flatten() {
387            let row = mysql_async::Row::try_from(binlog_row)?;
388            for (output, row_val) in outputs.iter().repeat_clone(row) {
389                let event = if !has_full_metadata && output.binlog_full_metadata {
390                    ctx.errored_outputs.insert(output.output_index);
391                    Err(DataflowError::from(DefiniteError::ValueDecodeError(
392                        format!(
393                            "Table {0} was created with binlog_row_metadata=FULL but binlog_row_metadata has since been set to a different value, meaning we cannot reliably decode the columns",
394                            output.table_name
395                        ),
396                    )))
397                } else {
398                    match pack_mysql_row(
399                        &mut final_row,
400                        row_val,
401                        &output.desc,
402                        Some(&gtid_str),
403                        output.binlog_full_metadata,
404                    ) {
405                        Ok(row) => Ok(SourceMessage {
406                            key: Row::default(),
407                            value: row,
408                            metadata: Row::default(),
409                        }),
410                        // Produce a DefiniteError in the stream for any rows that fail to decode
411                        Err(err @ MySqlError::ValueDecodeError { .. }) => Err(DataflowError::from(
412                            DefiniteError::ValueDecodeError(err.to_string()),
413                        )),
414                        Err(err) => Err(err)?,
415                    }
416                };
417
418                let data = (output.output_index, event);
419
420                // Rewind this update if it was already present in the snapshot
421                if let Some((_rewind_data_cap, rewind_req)) = ctx.rewinds.get(&output.output_index)
422                {
423                    if !rewind_req.snapshot_upper.less_equal(new_gtid) {
424                        rewind_count += 1;
425                        event_buffer.push((data.clone(), GtidPartition::minimum(), -diff));
426                    }
427                }
428                if diff.is_positive() {
429                    additions += 1;
430                } else {
431                    retractions += 1;
432                }
433                let update = (data, new_gtid.clone(), diff);
434                let size = update.fuel_size();
435                ctx.data_output.give_fueled(&gtid_cap, update, size).await;
436            }
437        }
438    }
439
440    // We want to emit data in individual pieces to allow timely to break large chunks of data into
441    // containers. Naively interleaving new data and rewinds in the loop above defeats a timely
442    // optimization that caches push buffers if the `.give()` time has not changed.
443    //
444    // Instead, we buffer rewind events into a reusable buffer, and emit all at once here at the end.
445
446    if !event_buffer.is_empty() {
447        for d in event_buffer.drain(..) {
448            let (rewind_data_cap, _) = ctx.rewinds.get(&d.0.0).unwrap();
449            let size = d.fuel_size();
450            ctx.data_output.give_fueled(rewind_data_cap, d, size).await;
451        }
452    }
453
454    trace!(
455        %id,
456        "timely-{worker_id} sent updates for {new_gtid:?} \
457            with {} updates ({} additions, {} retractions) and {} \
458            rewinds",
459        additions + retractions,
460        additions,
461        retractions,
462        rewind_count,
463    );
464
465    Ok(())
466}
467
#[cfg(test)]
mod tests {
    use super::*;

    fn table(schema: &str, name: &str) -> MySqlTableName {
        MySqlTableName::new(schema, name)
    }

    /// Mirrors the `DROP TABLE` dispatch in `handle_query_event`. Only queries
    /// whose first two whitespace-separated tokens are `DROP TABLE`
    /// (case-insensitive) reach `drop_table_identifiers`. Anything else yields
    /// an empty list.
    fn parse_drop(
        query: &str,
        current_schema: &str,
    ) -> Result<Vec<MySqlTableName>, TransientError> {
        let mut tokens = query.split_ascii_whitespace();
        let first = tokens.next();
        let second = tokens.next();
        match (
            first.map(str::to_ascii_lowercase).as_deref(),
            second.map(str::to_ascii_lowercase).as_deref(),
        ) {
            (Some("drop"), Some("table")) => drop_table_identifiers(current_schema, query),
            _ => Ok(Vec::new()),
        }
    }

    #[mz_ore::test]
    fn table_ident_unqualified_uses_current_schema() {
        assert_eq!(
            table_ident("orders", "shop").unwrap(),
            table("shop", "orders")
        );
    }

    #[mz_ore::test]
    fn table_ident_qualified_overrides_current_schema() {
        assert_eq!(
            table_ident("inventory.orders", "shop").unwrap(),
            table("inventory", "orders"),
        );
    }

    #[mz_ore::test]
    fn table_ident_strips_backtick_quoting() {
        assert_eq!(
            table_ident("`inventory`.`orders`", "shop").unwrap(),
            table("inventory", "orders"),
        );
    }

    #[mz_ore::test]
    fn drop_parses_single_unqualified_table() {
        assert_eq!(
            parse_drop("DROP TABLE orders", "shop").unwrap(),
            vec![table("shop", "orders")],
        );
    }

    #[mz_ore::test]
    fn drop_parses_qualified_table() {
        assert_eq!(
            parse_drop("DROP TABLE inventory.orders", "shop").unwrap(),
            vec![table("inventory", "orders")],
        );
    }

    #[mz_ore::test]
    fn drop_parses_backtick_quoted_table() {
        assert_eq!(
            parse_drop("DROP TABLE `inventory`.`orders`", "shop").unwrap(),
            vec![table("inventory", "orders")],
        );
    }

    #[mz_ore::test]
    fn drop_parses_if_exists_clause() {
        assert_eq!(
            parse_drop("DROP TABLE IF EXISTS orders", "shop").unwrap(),
            vec![table("shop", "orders")],
        );
        assert_eq!(
            parse_drop("DROP TABLE if exists orders", "shop").unwrap(),
            vec![table("shop", "orders")],
        );
    }

    #[mz_ore::test]
    fn drop_rejects_if_without_exists() {
        assert!(parse_drop("DROP TABLE IF orders", "shop").is_err());
    }

    #[mz_ore::test]
    fn drop_rejects_missing_table_name() {
        assert!(parse_drop("DROP TABLE", "shop").is_err());
    }

    #[mz_ore::test]
    fn drop_parses_multiple_space_separated_tables() {
        assert_eq!(
            parse_drop("DROP TABLE orders, customers, items", "shop").unwrap(),
            vec![
                table("shop", "orders"),
                table("shop", "customers"),
                table("shop", "items"),
            ],
        );
    }

    #[mz_ore::test]
    fn drop_parses_comma_joined_tables_without_spaces() {
        assert_eq!(
            parse_drop("DROP TABLE `shop`.`orders`,`shop`.`customers`", "shop").unwrap(),
            vec![table("shop", "orders"), table("shop", "customers")],
        );
    }

    #[mz_ore::test]
    fn drop_does_not_treat_comment_shaped_text_in_identifier_as_comment() {
        assert_eq!(
            parse_drop("DROP TABLE `tbl /* not a comment */`", "shop").unwrap(),
            vec![table("shop", "tbl /* not a comment */")],
        );
    }

    #[mz_ore::test]
    fn drop_parses_table_with_restrict() {
        assert_eq!(
            parse_drop("DROP TABLE orders RESTRICT", "shop").unwrap(),
            vec![table("shop", "orders")],
        );
    }

    #[mz_ore::test]
    fn drop_parses_multiple_tables_with_cascade() {
        assert_eq!(
            parse_drop("DROP TABLE orders, customers CASCADE", "shop").unwrap(),
            vec![table("shop", "orders"), table("shop", "customers")],
        );
    }

    #[mz_ore::test]
    fn drop_of_multiple_statements_errors() {
        // Defensive only: a real `Query_event` never packs two top-level
        // statements together.
        assert!(parse_drop("DROP TABLE orders; DROP TABLE customers", "shop").is_err());
    }

    #[mz_ore::test]
    fn drop_temporary_table_is_not_detected() {
        assert_eq!(
            parse_drop("DROP TEMPORARY TABLE orders", "shop").unwrap(),
            Vec::<MySqlTableName>::new(),
        );
    }

    #[mz_ore::test]
    fn drop_table_identifiers_rejects_non_table_drops() {
        // `parse_drop` only forwards `DROP TABLE ...`, so the statement-kind guards
        // inside `drop_table_identifiers` are exercised by calling it directly.
        assert!(drop_table_identifiers("shop", "DROP TEMPORARY TABLE orders").is_err());
        assert!(drop_table_identifiers("shop", "DROP VIEW orders").is_err());
        assert!(drop_table_identifiers("shop", "TRUNCATE TABLE orders").is_err());
    }

    #[mz_ore::test]
    fn drop_quoted_identifier_with_dot_is_a_single_table() {
        assert_eq!(
            parse_drop("DROP TABLE `weird.name`", "shop").unwrap(),
            vec![table("shop", "weird.name")],
        );
    }

    #[mz_ore::test]
    fn drop_escaped_backtick_identifier_is_preserved() {
        assert_eq!(
            parse_drop("DROP TABLE `a``b`", "shop").unwrap(),
            vec![table("shop", "a`b")],
        );
    }

    #[mz_ore::test]
    fn identifier_with_multiple_dots_errors() {
        assert!(parse_drop("DROP TABLE def.shop.customer", "shop").is_err());
    }

    #[mz_ore::test]
    fn ansi_quotes_parsed_properly() {
        assert_eq!(
            parse_drop("DROP TABLE \"customer\"", "shop").unwrap(),
            vec![table("shop", "customer")],
        );
    }
}