// mz_storage/source/mysql/replication/events.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use maplit::btreemap;
use mysql_common::binlog::events::{QueryEvent, RowsEventData};
use mz_mysql_util::{MySqlError, pack_mysql_row};
use mz_ore::iter::IteratorExt;
use mz_repr::{Diff, Row};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::mysql::GtidPartition;
use timely::progress::Timestamp;
use tracing::trace;

use crate::source::types::{FuelSize, SourceMessage};

use super::super::schemas::verify_schemas;
use super::super::{DefiniteError, MySqlTableName, TransientError};
use super::context::ReplContext;
26/// Returns the MySqlTableName for the given table name referenced in a
27/// SQL statement, using the current schema if the table name is unqualified.
28fn table_ident(name: &str, current_schema: &str) -> Result<MySqlTableName, TransientError> {
29    let stripped = name.replace('`', "");
30    let mut name_iter = stripped.split('.');
31    match (name_iter.next(), name_iter.next()) {
32        (Some(t_name), None) => Ok(MySqlTableName::new(current_schema, t_name)),
33        (Some(schema_name), Some(t_name)) => Ok(MySqlTableName::new(schema_name, t_name)),
34        _ => Err(TransientError::Generic(anyhow::anyhow!(
35            "Invalid table name from QueryEvent: {}",
36            name
37        ))),
38    }
39}
40
41/// Handles QueryEvents from the MySQL replication stream. Since we only use
42/// row-based replication, we only expect to see QueryEvents for DDL changes.
43///
44/// From the MySQL docs: 'A Query_event is created for each query that modifies
45/// the database, unless the query is logged row-based.' This means that we can
46/// expect any DDL changes to be represented as QueryEvents, which we must parse
47/// to figure out if any of the tables we care about have been affected.
48///
49/// This function returns a bool to represent whether the event that was handled
50/// represents a 'complete' event that should cause the frontier to advance beyond
51/// the current GTID.
52pub(super) async fn handle_query_event(
53    event: QueryEvent<'_>,
54    ctx: &mut ReplContext<'_>,
55    new_gtid: &GtidPartition,
56) -> Result<bool, TransientError> {
57    let (id, worker_id) = (ctx.config.id, ctx.config.worker_id);
58
59    let query = event.query();
60    let current_schema = event.schema();
61    let mut is_complete_event = false;
62
63    // MySQL does not permit transactional DDL, so luckily we don't need to
64    // worry about tracking BEGIN/COMMIT query events. We only need to look
65    // for DDL changes that affect the schema of the tables we care about.
66    let mut query_iter = query.split_ascii_whitespace();
67    let first = query_iter.next();
68    let second = query_iter.next();
69    match (
70        first.map(str::to_ascii_lowercase).as_deref(),
71        second.map(str::to_ascii_lowercase).as_deref(),
72    ) {
73        // Detect `ALTER TABLE <tbl>`, `RENAME TABLE <tbl>` statements
74        (Some("alter") | Some("rename"), Some("table")) => {
75            let table = table_ident(
76                query_iter.next().ok_or_else(|| {
77                    TransientError::Generic(anyhow::anyhow!("Invalid DDL query: {}", query))
78                })?,
79                &current_schema,
80            )?;
81            is_complete_event = true;
82            if ctx.table_info.contains_key(&table) {
83                trace!(%id, "timely-{worker_id} DDL change detected \
84                       for {table:?}");
85                let info = &ctx.table_info[&table];
86                let mut conn = ctx
87                    .connection_config
88                    .connect(
89                        &format!("timely-{worker_id} MySQL "),
90                        &ctx.config.config.connection_context.ssh_tunnel_manager,
91                    )
92                    .await?;
93                for (err_output, err) in
94                    verify_schemas(&mut *conn, btreemap! { &table => info }).await?
95                {
96                    tracing::warn!(%id, "timely-{worker_id} DDL change \
97                           verification error for {table:?}[{}]: {err:?}",
98                           err_output.output_index);
99                    let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
100                    let update = (
101                        (err_output.output_index, Err(err.into())),
102                        new_gtid.clone(),
103                        Diff::ONE,
104                    );
105                    let size = update.fuel_size();
106                    ctx.data_output.give_fueled(&gtid_cap, update, size).await;
107                    ctx.errored_outputs.insert(err_output.output_index);
108                }
109            }
110        }
111        // Detect `DROP TABLE [IF EXISTS] <tbl>, <tbl>` statements. Since
112        // this can drop multiple tables we just check all tables we care about
113        (Some("drop"), Some("table")) => {
114            let mut conn = ctx
115                .connection_config
116                .connect(
117                    &format!("timely-{worker_id} MySQL "),
118                    &ctx.config.config.connection_context.ssh_tunnel_manager,
119                )
120                .await?;
121            let expected = ctx
122                .table_info
123                .iter()
124                .map(|(t, info)| {
125                    (
126                        t,
127                        info.iter()
128                            .filter(|output| !ctx.errored_outputs.contains(&output.output_index)),
129                    )
130                })
131                .collect();
132            let schema_errors = verify_schemas(&mut *conn, expected).await?;
133            is_complete_event = true;
134            for (dropped_output, err) in schema_errors {
135                tracing::info!(%id, "timely-{worker_id} DDL change \
136                           dropped output: {dropped_output:?}: {err:?}");
137                let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
138                let update = (
139                    (dropped_output.output_index, Err(err.into())),
140                    new_gtid.clone(),
141                    Diff::ONE,
142                );
143                let size = std::mem::size_of_val(&update);
144                ctx.data_output.give_fueled(&gtid_cap, update, size).await;
145                ctx.errored_outputs.insert(dropped_output.output_index);
146            }
147        }
148        // Detect `TRUNCATE [TABLE] <tbl>` statements
149        (Some("truncate"), Some(_)) => {
150            // We need the original un-lowercased version of 'second' since it might be a table ref
151            let second = second.expect("known to be Some");
152            let table = if second.eq_ignore_ascii_case("table") {
153                table_ident(
154                    query_iter.next().ok_or_else(|| {
155                        TransientError::Generic(anyhow::anyhow!("Invalid DDL query: {}", query))
156                    })?,
157                    &current_schema,
158                )?
159            } else {
160                table_ident(second, &current_schema)?
161            };
162            is_complete_event = true;
163            if ctx.table_info.contains_key(&table) {
164                trace!(%id, "timely-{worker_id} TRUNCATE detected \
165                       for {table:?}");
166                if let Some(info) = ctx.table_info.get(&table) {
167                    let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
168                    for output in info {
169                        let update = (
170                            (
171                                output.output_index,
172                                Err(DataflowError::from(DefiniteError::TableTruncated(
173                                    table.to_string(),
174                                ))),
175                            ),
176                            new_gtid.clone(),
177                            Diff::ONE,
178                        );
179                        let size = update.fuel_size();
180                        ctx.data_output.give_fueled(&gtid_cap, update, size).await;
181                        ctx.errored_outputs.insert(output.output_index);
182                    }
183                }
184            }
185        }
186        // Detect `COMMIT` statements which signify the end of a transaction on non-XA capable
187        // storage engines
188        (Some("commit"), None) => {
189            is_complete_event = true;
190        }
191        // Detect `CREATE TABLE <tbl>` statements which don't affect existing tables but do
192        // signify a complete event (e.g. for the purposes of advancing the GTID)
193        (Some("create"), Some("table")) => {
194            // CREATE TABLE ... SELECT will have subsequent `RowEvent`s, to account for this, the statement contains the clause "START TRANSACTION".
195            // https://dev.mysql.com/worklog/task/?id=13355
196
197            let mut peek_stream = query_iter.peekable();
198            let mut ctas = false;
199            while let Some(token) = peek_stream.next() {
200                if token.eq_ignore_ascii_case("start")
201                    && peek_stream
202                        .peek()
203                        .is_some_and(|t| t.eq_ignore_ascii_case("transaction"))
204                {
205                    ctas = true;
206                    break;
207                }
208            }
209            is_complete_event = !ctas;
210        }
211        _ => {}
212    }
213
214    Ok(is_complete_event)
215}
216
/// Handles RowsEvents from the MySQL replication stream. These events contain
/// insert/update/delete events for a single transaction or committed statement.
///
/// We use these events to update the dataflow with the new rows, and return a new
/// frontier with which to advance the dataflow's progress.
///
/// `event_buffer` is a caller-owned, reusable buffer used to stage rewind
/// retractions; it is drained before this function returns (see the note
/// above the drain loop for why rewinds are emitted separately).
pub(super) async fn handle_rows_event(
    event: RowsEventData<'_>,
    ctx: &ReplContext<'_>,
    new_gtid: &GtidPartition,
    event_buffer: &mut Vec<(
        (usize, Result<SourceMessage, DataflowError>),
        GtidPartition,
        mz_repr::Diff,
    )>,
) -> Result<(), TransientError> {
    let (id, worker_id) = (ctx.config.id, ctx.config.worker_id);

    // Find the relevant table via the table-map event referenced by this
    // RowsEvent's binlog table id.
    let binlog_table_id = event.table_id();
    let table_map_event = ctx
        .stream
        .get_ref()
        .get_tme(binlog_table_id)
        .ok_or_else(|| TransientError::Generic(anyhow::anyhow!("Table map event not found")))?;
    let table = MySqlTableName::new(
        &*table_map_event.database_name(),
        &*table_map_event.table_name(),
    );

    // Collect the outputs for this table that have not already errored.
    let outputs = ctx.table_info.get(&table).map(|outputs| {
        outputs
            .into_iter()
            .filter(|output| !ctx.errored_outputs.contains(&output.output_index))
            .collect::<Vec<_>>()
    });
    let outputs = match outputs {
        Some(outputs) => outputs,
        None => {
            // We don't know about this table, or there are no un-errored outputs for it.
            return Ok(());
        }
    };

    trace!(%id, "timely-{worker_id} handling RowsEvent for {table:?}");

    // Capability for this event.
    let gtid_cap = ctx.data_cap_set.delayed(new_gtid);

    // Iterate over the rows in this RowsEvent. Each row is a pair of 'before_row', 'after_row',
    // to accommodate for updates and deletes (which include a before_row),
    // and updates and inserts (which include an after_row).
    let mut final_row = Row::default();
    let mut rows_iter = event.rows(table_map_event);
    let mut rewind_count = 0;
    let mut additions = 0;
    let mut retractions = 0;
    // NOTE(review): `while let Some(Ok(..))` stops iterating silently on the
    // first row that fails to decode from the binlog, without surfacing an
    // error — confirm this is intentional rather than a swallowed failure.
    while let Some(Ok((before_row, after_row))) = rows_iter.next() {
        // Update metrics for updates/inserts/deletes
        match (&before_row, &after_row) {
            (None, None) => {}
            (Some(_), Some(_)) => {
                ctx.metrics.updates.inc();
            }
            (None, Some(_)) => {
                ctx.metrics.inserts.inc();
            }
            (Some(_), None) => {
                ctx.metrics.deletes.inc();
            }
        }

        // A before_row becomes a retraction (-1) and an after_row an
        // addition (+1), so an update produces both.
        let updates = [
            before_row.map(|r| (r, Diff::MINUS_ONE)),
            after_row.map(|r| (r, Diff::ONE)),
        ];
        for (binlog_row, diff) in updates.into_iter().flatten() {
            let row = mysql_async::Row::try_from(binlog_row)?;
            // Each output for this table receives its own clone of the row,
            // packed according to that output's description.
            for (output, row_val) in outputs.iter().repeat_clone(row) {
                let event = match pack_mysql_row(&mut final_row, row_val, &output.desc) {
                    Ok(row) => Ok(SourceMessage {
                        key: Row::default(),
                        value: row,
                        metadata: Row::default(),
                    }),
                    // Produce a DefiniteError in the stream for any rows that fail to decode
                    Err(err @ MySqlError::ValueDecodeError { .. }) => Err(DataflowError::from(
                        DefiniteError::ValueDecodeError(err.to_string()),
                    )),
                    Err(err) => Err(err)?,
                };

                let data = (output.output_index, event);

                // Rewind this update if it was already present in the snapshot:
                // buffer a negating update at the minimum timestamp so the
                // snapshot and replication stream don't double-count it.
                if let Some((_rewind_data_cap, rewind_req)) = ctx.rewinds.get(&output.output_index)
                {
                    if !rewind_req.snapshot_upper.less_equal(new_gtid) {
                        rewind_count += 1;
                        event_buffer.push((data.clone(), GtidPartition::minimum(), -diff));
                    }
                }
                if diff.is_positive() {
                    additions += 1;
                } else {
                    retractions += 1;
                }
                let update = (data, new_gtid.clone(), diff);
                let size = update.fuel_size();
                ctx.data_output.give_fueled(&gtid_cap, update, size).await;
            }
        }
    }

    // We want to emit data in individual pieces to allow timely to break large chunks of data into
    // containers. Naively interleaving new data and rewinds in the loop above defeats a timely
    // optimization that caches push buffers if the `.give()` time has not changed.
    //
    // Instead, we buffer rewind events into a reusable buffer, and emit all at once here at the end.

    if !event_buffer.is_empty() {
        for d in event_buffer.drain(..) {
            // Safe to unwrap: only outputs present in ctx.rewinds were pushed
            // into event_buffer above.
            let (rewind_data_cap, _) = ctx.rewinds.get(&d.0.0).unwrap();
            let size = d.fuel_size();
            ctx.data_output.give_fueled(rewind_data_cap, d, size).await;
        }
    }

    trace!(
        %id,
        "timely-{worker_id} sent updates for {new_gtid:?} \
            with {} updates ({} additions, {} retractions) and {} \
            rewinds",
        additions + retractions,
        additions,
        retractions,
        rewind_count,
    );

    Ok(())
}