// mz_storage/source/mysql/replication/events.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
use maplit::btreemap;
use mysql_async::binlog::events::OptionalMetaExtractor;
use mysql_common::binlog::events::{QueryEvent, RowsEventData};
use mz_mysql_util::{MySqlError, pack_mysql_row};
use mz_ore::iter::IteratorExt;
use mz_repr::{Diff, Row};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::mysql::GtidPartition;
use timely::progress::Timestamp;
use tracing::trace;

use crate::source::types::{FuelSize, SourceMessage};

use super::super::schemas::verify_schemas;
use super::super::{DefiniteError, MySqlTableName, TransientError};
use super::context::ReplContext;
27/// Returns the MySqlTableName for the given table name referenced in a
28/// SQL statement, using the current schema if the table name is unqualified.
29fn table_ident(name: &str, current_schema: &str) -> Result<MySqlTableName, TransientError> {
30    let stripped = name.replace('`', "");
31    let mut name_iter = stripped.split('.');
32    match (name_iter.next(), name_iter.next()) {
33        (Some(t_name), None) => Ok(MySqlTableName::new(current_schema, t_name)),
34        (Some(schema_name), Some(t_name)) => Ok(MySqlTableName::new(schema_name, t_name)),
35        _ => Err(TransientError::Generic(anyhow::anyhow!(
36            "Invalid table name from QueryEvent: {}",
37            name
38        ))),
39    }
40}
41
42/// Handles QueryEvents from the MySQL replication stream. Since we only use
43/// row-based replication, we only expect to see QueryEvents for DDL changes.
44///
45/// From the MySQL docs: 'A Query_event is created for each query that modifies
46/// the database, unless the query is logged row-based.' This means that we can
47/// expect any DDL changes to be represented as QueryEvents, which we must parse
48/// to figure out if any of the tables we care about have been affected.
49///
50/// This function returns a bool to represent whether the event that was handled
51/// represents a 'complete' event that should cause the frontier to advance beyond
52/// the current GTID.
53pub(super) async fn handle_query_event(
54    event: QueryEvent<'_>,
55    ctx: &mut ReplContext<'_>,
56    new_gtid: &GtidPartition,
57) -> Result<bool, TransientError> {
58    let (id, worker_id) = (ctx.config.id, ctx.config.worker_id);
59
60    let query = event.query();
61    let current_schema = event.schema();
62    let mut is_complete_event = false;
63
64    // MySQL does not permit transactional DDL, so luckily we don't need to
65    // worry about tracking BEGIN/COMMIT query events. We only need to look
66    // for DDL changes that affect the schema of the tables we care about.
67    let mut query_iter = query.split_ascii_whitespace();
68    let first = query_iter.next();
69    let second = query_iter.next();
70    match (
71        first.map(str::to_ascii_lowercase).as_deref(),
72        second.map(str::to_ascii_lowercase).as_deref(),
73    ) {
74        // Detect `ALTER TABLE <tbl>`, `RENAME TABLE <tbl>` statements
75        (Some("alter") | Some("rename"), Some("table")) => {
76            let table = table_ident(
77                query_iter.next().ok_or_else(|| {
78                    TransientError::Generic(anyhow::anyhow!("Invalid DDL query: {}", query))
79                })?,
80                &current_schema,
81            )?;
82            is_complete_event = true;
83            if ctx.table_info.contains_key(&table) {
84                trace!(%id, "timely-{worker_id} DDL change detected \
85                       for {table:?}");
86                let info = &ctx.table_info[&table];
87                let mut conn = ctx
88                    .connection_config
89                    .connect(
90                        &format!("timely-{worker_id} MySQL "),
91                        &ctx.config.config.connection_context.ssh_tunnel_manager,
92                    )
93                    .await?;
94                for (err_output, err) in
95                    verify_schemas(&mut *conn, btreemap! { &table => info }).await?
96                {
97                    tracing::warn!(%id, "timely-{worker_id} DDL change \
98                           verification error for {table:?}[{}]: {err:?}",
99                           err_output.output_index);
100                    let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
101                    let update = (
102                        (err_output.output_index, Err(err.into())),
103                        new_gtid.clone(),
104                        Diff::ONE,
105                    );
106                    let size = update.fuel_size();
107                    ctx.data_output.give_fueled(&gtid_cap, update, size).await;
108                    ctx.errored_outputs.insert(err_output.output_index);
109                }
110            }
111        }
112        // Detect `DROP TABLE [IF EXISTS] <tbl>, <tbl>` statements. Since
113        // this can drop multiple tables we just check all tables we care about
114        (Some("drop"), Some("table")) => {
115            let mut conn = ctx
116                .connection_config
117                .connect(
118                    &format!("timely-{worker_id} MySQL "),
119                    &ctx.config.config.connection_context.ssh_tunnel_manager,
120                )
121                .await?;
122            let expected = ctx
123                .table_info
124                .iter()
125                .map(|(t, info)| {
126                    (
127                        t,
128                        info.iter()
129                            .filter(|output| !ctx.errored_outputs.contains(&output.output_index)),
130                    )
131                })
132                .collect();
133            let schema_errors = verify_schemas(&mut *conn, expected).await?;
134            is_complete_event = true;
135            for (dropped_output, err) in schema_errors {
136                tracing::info!(%id, "timely-{worker_id} DDL change \
137                           dropped output: {dropped_output:?}: {err:?}");
138                let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
139                let update = (
140                    (dropped_output.output_index, Err(err.into())),
141                    new_gtid.clone(),
142                    Diff::ONE,
143                );
144                let size = std::mem::size_of_val(&update);
145                ctx.data_output.give_fueled(&gtid_cap, update, size).await;
146                ctx.errored_outputs.insert(dropped_output.output_index);
147            }
148        }
149        // Detect `TRUNCATE [TABLE] <tbl>` statements
150        (Some("truncate"), Some(_)) => {
151            // We need the original un-lowercased version of 'second' since it might be a table ref
152            let second = second.expect("known to be Some");
153            let table = if second.eq_ignore_ascii_case("table") {
154                table_ident(
155                    query_iter.next().ok_or_else(|| {
156                        TransientError::Generic(anyhow::anyhow!("Invalid DDL query: {}", query))
157                    })?,
158                    &current_schema,
159                )?
160            } else {
161                table_ident(second, &current_schema)?
162            };
163            is_complete_event = true;
164            if ctx.table_info.contains_key(&table) {
165                trace!(%id, "timely-{worker_id} TRUNCATE detected \
166                       for {table:?}");
167                if let Some(info) = ctx.table_info.get(&table) {
168                    let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
169                    for output in info {
170                        let update = (
171                            (
172                                output.output_index,
173                                Err(DataflowError::from(DefiniteError::TableTruncated(
174                                    table.to_string(),
175                                ))),
176                            ),
177                            new_gtid.clone(),
178                            Diff::ONE,
179                        );
180                        let size = update.fuel_size();
181                        ctx.data_output.give_fueled(&gtid_cap, update, size).await;
182                        ctx.errored_outputs.insert(output.output_index);
183                    }
184                }
185            }
186        }
187        // Detect `COMMIT` statements which signify the end of a transaction on non-XA capable
188        // storage engines
189        (Some("commit"), None) => {
190            is_complete_event = true;
191        }
192        // Detect `CREATE TABLE <tbl>` statements which don't affect existing tables but do
193        // signify a complete event (e.g. for the purposes of advancing the GTID)
194        (Some("create"), Some("table")) => {
195            // CREATE TABLE ... SELECT will have subsequent `RowEvent`s, to account for this, the statement contains the clause "START TRANSACTION".
196            // https://dev.mysql.com/worklog/task/?id=13355
197
198            let mut peek_stream = query_iter.peekable();
199            let mut ctas = false;
200            while let Some(token) = peek_stream.next() {
201                if token.eq_ignore_ascii_case("start")
202                    && peek_stream
203                        .peek()
204                        .is_some_and(|t| t.eq_ignore_ascii_case("transaction"))
205                {
206                    ctas = true;
207                    break;
208                }
209            }
210            is_complete_event = !ctas;
211        }
212        _ => {}
213    }
214
215    Ok(is_complete_event)
216}
217
218/// Handles RowsEvents from the MySQL replication stream. These events contain
219/// insert/update/delete events for a single transaction or committed statement.
220///
221/// We use these events to update the dataflow with the new rows, and return a new
222/// frontier with which to advance the dataflow's progress.
223pub(super) async fn handle_rows_event(
224    event: RowsEventData<'_>,
225    ctx: &mut ReplContext<'_>,
226    new_gtid: &GtidPartition,
227    event_buffer: &mut Vec<(
228        (usize, Result<SourceMessage, DataflowError>),
229        GtidPartition,
230        mz_repr::Diff,
231    )>,
232) -> Result<(), TransientError> {
233    let (id, worker_id) = (ctx.config.id, ctx.config.worker_id);
234
235    // Find the relevant table
236    let binlog_table_id = event.table_id();
237    let table_map_event = ctx
238        .stream
239        .get_ref()
240        .get_tme(binlog_table_id)
241        .ok_or_else(|| TransientError::Generic(anyhow::anyhow!("Table map event not found")))?;
242    let table = MySqlTableName::new(
243        &*table_map_event.database_name(),
244        &*table_map_event.table_name(),
245    );
246
247    let outputs = ctx.table_info.get(&table).map(|outputs| {
248        outputs
249            .into_iter()
250            .filter(|output| !ctx.errored_outputs.contains(&output.output_index))
251            .collect::<Vec<_>>()
252    });
253    let outputs = match outputs {
254        Some(outputs) => outputs,
255        None => {
256            // We don't know about this table, or there are no un-errored outputs for it.
257            return Ok(());
258        }
259    };
260
261    trace!(%id, "timely-{worker_id} handling RowsEvent for {table:?}");
262
263    // Capability for this event.
264    let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
265
266    // We can check here if the binlog has full row metadata by looking at the column name optional
267    // metadata, which is only present if full metadata is enabled.
268    let optional_metadata = OptionalMetaExtractor::new(table_map_event.iter_optional_meta())?;
269    let has_full_metadata = optional_metadata.iter_column_name().next().is_some();
270
271    // Iterate over the rows in this RowsEvent. Each row is a pair of 'before_row', 'after_row',
272    // to accomodate for updates and deletes (which include a before_row),
273    // and updates and inserts (which inclued an after row).
274    let mut final_row = Row::default();
275    let mut rows_iter = event.rows(table_map_event);
276    let mut rewind_count = 0;
277    let mut additions = 0;
278    let mut retractions = 0;
279    while let Some(Ok((before_row, after_row))) = rows_iter.next() {
280        // Update metrics for updates/inserts/deletes
281        match (&before_row, &after_row) {
282            (None, None) => {}
283            (Some(_), Some(_)) => {
284                ctx.metrics.updates.inc();
285            }
286            (None, Some(_)) => {
287                ctx.metrics.inserts.inc();
288            }
289            (Some(_), None) => {
290                ctx.metrics.deletes.inc();
291            }
292        }
293
294        let updates = [
295            before_row.map(|r| (r, Diff::MINUS_ONE)),
296            after_row.map(|r| (r, Diff::ONE)),
297        ];
298        let gtid_str = format!("{new_gtid:?}");
299        for (binlog_row, diff) in updates.into_iter().flatten() {
300            let row = mysql_async::Row::try_from(binlog_row)?;
301            for (output, row_val) in outputs.iter().repeat_clone(row) {
302                let event = if !has_full_metadata && output.binlog_full_metadata {
303                    ctx.errored_outputs.insert(output.output_index);
304                    Err(DataflowError::from(DefiniteError::ValueDecodeError(
305                        format!(
306                            "Table {0} was created with binlog_row_metadata=FULL but binlog_row_metadata has since been set to a different value, meaning we cannot reliably decode the columns",
307                            output.table_name
308                        ),
309                    )))
310                } else {
311                    match pack_mysql_row(
312                        &mut final_row,
313                        row_val,
314                        &output.desc,
315                        Some(&gtid_str),
316                        output.binlog_full_metadata,
317                    ) {
318                        Ok(row) => Ok(SourceMessage {
319                            key: Row::default(),
320                            value: row,
321                            metadata: Row::default(),
322                        }),
323                        // Produce a DefiniteError in the stream for any rows that fail to decode
324                        Err(err @ MySqlError::ValueDecodeError { .. }) => Err(DataflowError::from(
325                            DefiniteError::ValueDecodeError(err.to_string()),
326                        )),
327                        Err(err) => Err(err)?,
328                    }
329                };
330
331                let data = (output.output_index, event);
332
333                // Rewind this update if it was already present in the snapshot
334                if let Some((_rewind_data_cap, rewind_req)) = ctx.rewinds.get(&output.output_index)
335                {
336                    if !rewind_req.snapshot_upper.less_equal(new_gtid) {
337                        rewind_count += 1;
338                        event_buffer.push((data.clone(), GtidPartition::minimum(), -diff));
339                    }
340                }
341                if diff.is_positive() {
342                    additions += 1;
343                } else {
344                    retractions += 1;
345                }
346                let update = (data, new_gtid.clone(), diff);
347                let size = update.fuel_size();
348                ctx.data_output.give_fueled(&gtid_cap, update, size).await;
349            }
350        }
351    }
352
353    // We want to emit data in individual pieces to allow timely to break large chunks of data into
354    // containers. Naively interleaving new data and rewinds in the loop above defeats a timely
355    // optimization that caches push buffers if the `.give()` time has not changed.
356    //
357    // Instead, we buffer rewind events into a reusable buffer, and emit all at once here at the end.
358
359    if !event_buffer.is_empty() {
360        for d in event_buffer.drain(..) {
361            let (rewind_data_cap, _) = ctx.rewinds.get(&d.0.0).unwrap();
362            let size = d.fuel_size();
363            ctx.data_output.give_fueled(rewind_data_cap, d, size).await;
364        }
365    }
366
367    trace!(
368        %id,
369        "timely-{worker_id} sent updates for {new_gtid:?} \
370            with {} updates ({} additions, {} retractions) and {} \
371            rewinds",
372        additions + retractions,
373        additions,
374        retractions,
375        rewind_count,
376    );
377
378    Ok(())
379}