mz_storage/source/mysql/replication/events.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use maplit::btreemap;
use mysql_common::binlog::events::{QueryEvent, RowsEventData};
use mz_mysql_util::{MySqlError, pack_mysql_row};
use mz_ore::iter::IteratorExt;
use mz_repr::{Diff, Row};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::mysql::GtidPartition;
use timely::progress::Timestamp;
use tracing::trace;

use crate::source::types::SourceMessage;

use super::super::schemas::verify_schemas;
use super::super::{DefiniteError, MySqlTableName, TransientError};
use super::context::ReplContext;

/// Returns the MySqlTableName for the given table name referenced in a
/// SQL statement, using the current schema if the table name is unqualified.
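///
/// For example, with `current_schema = "db1"`, a qualified name like
/// `` `db2`.`t1` `` resolves to the table `db2.t1`, while an unqualified
/// `t1` resolves to `db1.t1`.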
fn table_ident(name: &str, current_schema: &str) -> Result<MySqlTableName, TransientError> {
    let stripped = name.replace('`', "");
    let mut name_iter = stripped.split('.');
    match (name_iter.next(), name_iter.next()) {
        (Some(t_name), None) => Ok(MySqlTableName::new(current_schema, t_name)),
        (Some(schema_name), Some(t_name)) => Ok(MySqlTableName::new(schema_name, t_name)),
        _ => Err(TransientError::Generic(anyhow::anyhow!(
            "Invalid table name from QueryEvent: {}",
            name
        ))),
    }
}

/// Handles QueryEvents from the MySQL replication stream. Since we only use
/// row-based replication, we only expect to see QueryEvents for DDL changes.
///
/// From the MySQL docs: 'A Query_event is created for each query that modifies
/// the database, unless the query is logged row-based.' This means that we can
/// expect any DDL changes to be represented as QueryEvents, which we must parse
/// to figure out if any of the tables we care about have been affected.
///
/// Returns a bool indicating whether the handled event is a 'complete' event
/// that should cause the frontier to advance beyond the current GTID.
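///
/// Statements recognized below are `ALTER TABLE`, `RENAME TABLE`, `DROP TABLE`,
/// `TRUNCATE`, `COMMIT`, and `CREATE TABLE` (including `CREATE TABLE ... SELECT`);
/// all other query events are ignored.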
pub(super) async fn handle_query_event(
    event: QueryEvent<'_>,
    ctx: &mut ReplContext<'_>,
    new_gtid: &GtidPartition,
) -> Result<bool, TransientError> {
    let (id, worker_id) = (ctx.config.id, ctx.config.worker_id);

    let query = event.query();
    let current_schema = event.schema();
    let mut is_complete_event = false;

    // MySQL does not permit transactional DDL, so luckily we don't need to
    // worry about tracking BEGIN/COMMIT query events. We only need to look
    // for DDL changes that affect the schema of the tables we care about.
    let mut query_iter = query.split_ascii_whitespace();
    let first = query_iter.next();
    let second = query_iter.next();
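    // e.g. for an illustrative statement "ALTER TABLE `t1` ADD COLUMN c2 int",
    // `first` is Some("ALTER") and `second` is Some("TABLE").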
    match (
        first.map(str::to_ascii_lowercase).as_deref(),
        second.map(str::to_ascii_lowercase).as_deref(),
    ) {
        // Detect `ALTER TABLE <tbl>`, `RENAME TABLE <tbl>` statements
        (Some("alter") | Some("rename"), Some("table")) => {
            let table = table_ident(
                query_iter.next().ok_or_else(|| {
                    TransientError::Generic(anyhow::anyhow!("Invalid DDL query: {}", query))
                })?,
                &current_schema,
            )?;
            is_complete_event = true;
            if ctx.table_info.contains_key(&table) {
                trace!(%id, "timely-{worker_id} DDL change detected \
                       for {table:?}");
                let info = &ctx.table_info[&table];
                let mut conn = ctx
                    .connection_config
                    .connect(
                        &format!("timely-{worker_id} MySQL "),
                        &ctx.config.config.connection_context.ssh_tunnel_manager,
                    )
                    .await?;
                for (err_output, err) in
                    verify_schemas(&mut *conn, btreemap! { &table => info }).await?
                {
                    trace!(%id, "timely-{worker_id} DDL change \
                           verification error for {table:?}[{}]: {err:?}",
                           err_output.output_index);
                    let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
                    ctx.data_output
                        .give_fueled(
                            &gtid_cap,
                            (
                                (err_output.output_index, Err(err.into())),
                                new_gtid.clone(),
                                Diff::ONE,
                            ),
                        )
                        .await;
                    ctx.errored_outputs.insert(err_output.output_index);
                }
            }
        }
        // Detect `DROP TABLE [IF EXISTS] <tbl>, <tbl>` statements. Since this
        // can drop multiple tables, we check all tables we care about.
        (Some("drop"), Some("table")) => {
            let mut conn = ctx
                .connection_config
                .connect(
                    &format!("timely-{worker_id} MySQL "),
                    &ctx.config.config.connection_context.ssh_tunnel_manager,
                )
                .await?;
            let expected = ctx
                .table_info
                .iter()
                .map(|(t, info)| {
                    (
                        t,
                        info.iter()
                            .filter(|output| !ctx.errored_outputs.contains(&output.output_index)),
                    )
                })
                .collect();
            let schema_errors = verify_schemas(&mut *conn, expected).await?;
            is_complete_event = true;
            for (dropped_output, err) in schema_errors {
                trace!(%id, "timely-{worker_id} DDL change \
                           dropped output: {dropped_output:?}: {err:?}");
                let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
                ctx.data_output
                    .give_fueled(
                        &gtid_cap,
                        (
                            (dropped_output.output_index, Err(err.into())),
                            new_gtid.clone(),
                            Diff::ONE,
                        ),
                    )
                    .await;
                ctx.errored_outputs.insert(dropped_output.output_index);
            }
        }
        // Detect `TRUNCATE [TABLE] <tbl>` statements
        (Some("truncate"), Some(_)) => {
            // We need the original un-lowercased version of 'second' since it might be a table ref
            let second = second.expect("known to be Some");
            let table = if second.eq_ignore_ascii_case("table") {
                table_ident(
                    query_iter.next().ok_or_else(|| {
                        TransientError::Generic(anyhow::anyhow!("Invalid DDL query: {}", query))
                    })?,
                    &current_schema,
                )?
            } else {
                table_ident(second, &current_schema)?
            };
            is_complete_event = true;
            if ctx.table_info.contains_key(&table) {
                trace!(%id, "timely-{worker_id} TRUNCATE detected \
                       for {table:?}");
                if let Some(info) = ctx.table_info.get(&table) {
                    let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
                    for output in info {
                        ctx.data_output
                            .give_fueled(
                                &gtid_cap,
                                (
                                    (
                                        output.output_index,
                                        Err(DataflowError::from(DefiniteError::TableTruncated(
                                            table.to_string(),
                                        ))),
                                    ),
                                    new_gtid.clone(),
                                    Diff::ONE,
                                ),
                            )
                            .await;
                        ctx.errored_outputs.insert(output.output_index);
                    }
                }
            }
        }
        // Detect `COMMIT` statements, which signify the end of a transaction on
        // non-XA-capable storage engines
        (Some("commit"), None) => {
            is_complete_event = true;
        }
        // Detect `CREATE TABLE <tbl>` statements, which don't affect existing tables but do
        // signify a complete event (e.g. for the purposes of advancing the GTID)
        (Some("create"), Some("table")) => {
            // A `CREATE TABLE ... SELECT` statement is followed by subsequent
            // RowsEvents; to account for this, the binlogged statement contains
            // the clause "START TRANSACTION".
            // https://dev.mysql.com/worklog/task/?id=13355

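            // Illustrative shape (per the worklog): "CREATE TABLE t1 (...)
            // START TRANSACTION", with the selected rows then arriving as
            // RowsEvents before the transaction ends.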
            let mut peek_stream = query_iter.peekable();
            let mut ctas = false;
            while let Some(token) = peek_stream.next() {
                if token.eq_ignore_ascii_case("start")
                    && peek_stream
                        .peek()
                        .is_some_and(|t| t.eq_ignore_ascii_case("transaction"))
                {
                    ctas = true;
                    break;
                }
            }
            is_complete_event = !ctas;
        }
        _ => {}
    }

    Ok(is_complete_event)
}

/// Handles RowsEvents from the MySQL replication stream. These events contain
/// insert/update/delete events for a single transaction or committed statement.
///
/// We use these events to emit updates to the dataflow at the event's GTID.
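///
/// Updates already reflected in the initial snapshot are additionally emitted
/// with negated diffs at the minimum timestamp, so that they cancel against
/// the snapshot output (see the rewind handling below).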
pub(super) async fn handle_rows_event(
    event: RowsEventData<'_>,
    ctx: &ReplContext<'_>,
    new_gtid: &GtidPartition,
    event_buffer: &mut Vec<(
        (usize, Result<SourceMessage, DataflowError>),
        GtidPartition,
        mz_repr::Diff,
    )>,
) -> Result<(), TransientError> {
    let (id, worker_id) = (ctx.config.id, ctx.config.worker_id);

    // Find the relevant table
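    // (each RowsEvent carries a table_id that refers back to a previously
    // received TABLE_MAP event describing the table's database and name)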
    let binlog_table_id = event.table_id();
    let table_map_event = ctx
        .stream
        .get_ref()
        .get_tme(binlog_table_id)
        .ok_or_else(|| TransientError::Generic(anyhow::anyhow!("Table map event not found")))?;
    let table = MySqlTableName::new(
        &*table_map_event.database_name(),
        &*table_map_event.table_name(),
    );

    let outputs = ctx.table_info.get(&table).map(|outputs| {
        outputs
            .into_iter()
            .filter(|output| !ctx.errored_outputs.contains(&output.output_index))
            .collect::<Vec<_>>()
    });
    let outputs = match outputs {
        Some(outputs) => outputs,
        None => {
            // We don't know about this table, or there are no un-errored outputs for it.
            return Ok(());
        }
    };

    trace!(%id, "timely-{worker_id} handling RowsEvent for {table:?}");

    // Capability for this event.
    let gtid_cap = ctx.data_cap_set.delayed(new_gtid);

    // Iterate over the rows in this RowsEvent. Each row is a pair of 'before_row' and
    // 'after_row', to accommodate updates and deletes (which include a before_row)
    // and updates and inserts (which include an after_row).
    let mut final_row = Row::default();
    let mut rows_iter = event.rows(table_map_event);
    let mut rewind_count = 0;
    let mut additions = 0;
    let mut retractions = 0;
    while let Some(Ok((before_row, after_row))) = rows_iter.next() {
        // Update metrics for updates/inserts/deletes
        match (&before_row, &after_row) {
            (None, None) => {}
            (Some(_), Some(_)) => {
                ctx.metrics.updates.inc();
            }
            (None, Some(_)) => {
                ctx.metrics.inserts.inc();
            }
            (Some(_), None) => {
                ctx.metrics.deletes.inc();
            }
        }

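        // A delete retracts the before_row (diff -1), an insert adds the
        // after_row (diff +1), and an update does both.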
        let updates = [
            before_row.map(|r| (r, Diff::MINUS_ONE)),
            after_row.map(|r| (r, Diff::ONE)),
        ];
        for (binlog_row, diff) in updates.into_iter().flatten() {
            let row = mysql_async::Row::try_from(binlog_row)?;
            for (output, row_val) in outputs.iter().repeat_clone(row) {
                let event = match pack_mysql_row(&mut final_row, row_val, &output.desc) {
                    Ok(row) => Ok(SourceMessage {
                        key: Row::default(),
                        value: row,
                        metadata: Row::default(),
                    }),
                    // Produce a DefiniteError in the stream for any rows that fail to decode
                    Err(err @ MySqlError::ValueDecodeError { .. }) => Err(DataflowError::from(
                        DefiniteError::ValueDecodeError(err.to_string()),
                    )),
                    Err(err) => Err(err)?,
                };

                let data = (output.output_index, event);

                // Rewind this update if it was already present in the snapshot
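                // (i.e. the snapshot was taken at a frontier beyond this GTID):
                // we emit the opposite diff at the minimum timestamp so the two
                // occurrences cancel out.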
                if let Some((_rewind_data_cap, rewind_req)) = ctx.rewinds.get(&output.output_index)
                {
                    if !rewind_req.snapshot_upper.less_equal(new_gtid) {
                        rewind_count += 1;
                        event_buffer.push((data.clone(), GtidPartition::minimum(), -diff));
                    }
                }
                if diff.is_positive() {
                    additions += 1;
                } else {
                    retractions += 1;
                }
                ctx.data_output
                    .give_fueled(&gtid_cap, (data, new_gtid.clone(), diff))
                    .await;
            }
        }
    }

    // We want to emit data in individual pieces to allow timely to break large chunks of data into
    // containers. Naively interleaving new data and rewinds in the loop above defeats a timely
    // optimization that caches push buffers if the `.give()` time has not changed.
    //
    // Instead, we buffer rewind events into a reusable buffer and emit them all at once here at the end.

    if !event_buffer.is_empty() {
        for d in event_buffer.drain(..) {
            let (rewind_data_cap, _) = ctx.rewinds.get(&d.0.0).unwrap();
            ctx.data_output.give_fueled(rewind_data_cap, d).await;
        }
    }

    trace!(
        %id,
        "timely-{worker_id} sent updates for {new_gtid:?} \
            with {} updates ({} additions, {} retractions) and {} \
            rewinds",
        additions + retractions,
        additions,
        retractions,
        rewind_count,
    );

    Ok(())
}