mz_storage/source/mysql/replication/events.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use maplit::btreemap;
use mysql_common::binlog::events::{QueryEvent, RowsEventData};
use mz_mysql_util::{MySqlError, pack_mysql_row};
use mz_ore::iter::IteratorExt;
use mz_repr::{Diff, Row};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::mysql::GtidPartition;
use timely::progress::Timestamp;
use tracing::trace;

use crate::source::types::SourceMessage;

use super::super::schemas::verify_schemas;
use super::super::{DefiniteError, MySqlTableName, TransientError};
use super::context::ReplContext;

/// Returns the MySqlTableName for the given table name referenced in a
/// SQL statement, using the current schema if the table name is unqualified.
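///
/// For example (illustrative): a qualified name like `db.t1` resolves to table
/// `t1` in schema `db` regardless of `current_schema`, while an unqualified
/// `t1` resolves to table `t1` in `current_schema`. Backtick quoting is
/// stripped before parsing.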
fn table_ident(name: &str, current_schema: &str) -> Result<MySqlTableName, TransientError> {
    let stripped = name.replace('`', "");
    let mut name_iter = stripped.split('.');
    match (name_iter.next(), name_iter.next()) {
        (Some(t_name), None) => Ok(MySqlTableName::new(current_schema, t_name)),
        (Some(schema_name), Some(t_name)) => Ok(MySqlTableName::new(schema_name, t_name)),
        _ => Err(TransientError::Generic(anyhow::anyhow!(
            "Invalid table name from QueryEvent: {}",
            name
        ))),
    }
}

/// Handles QueryEvents from the MySQL replication stream. Since we only use
/// row-based replication, we only expect to see QueryEvents for DDL changes.
///
/// From the MySQL docs: 'A Query_event is created for each query that modifies
/// the database, unless the query is logged row-based.' This means that we can
/// expect any DDL changes to be represented as QueryEvents, which we must parse
/// to figure out if any of the tables we care about have been affected.
///
/// Returns a bool indicating whether the handled event is a 'complete' event
/// that should cause the frontier to advance beyond the current GTID.
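///
/// Illustrative summary of the statements handled below:
/// * `ALTER TABLE <tbl>` / `RENAME TABLE <tbl>`: re-verify the schema of the
///   referenced table if we track it.
/// * `DROP TABLE ...`: re-verify the schemas of all tracked tables.
/// * `TRUNCATE [TABLE] <tbl>`: emit a definite `TableTruncated` error for the
///   table if we track it.
/// * `COMMIT` / `CREATE TABLE <tbl>`: no table-level action, but the event is
///   still considered 'complete'.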
pub(super) async fn handle_query_event(
    event: QueryEvent<'_>,
    ctx: &mut ReplContext<'_>,
    new_gtid: &GtidPartition,
) -> Result<bool, TransientError> {
    let (id, worker_id) = (ctx.config.id, ctx.config.worker_id);

    let query = event.query();
    let current_schema = event.schema();
    let mut is_complete_event = false;

    // MySQL does not permit transactional DDL, so luckily we don't need to
    // worry about tracking BEGIN/COMMIT query events. We only need to look
    // for DDL changes that affect the schema of the tables we care about.
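    // We dispatch on the first two whitespace-separated tokens of the statement,
    // lowercased; e.g. `ALTER TABLE t ADD COLUMN c INT` yields ("alter", "table").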
    let mut query_iter = query.split_ascii_whitespace();
    let first = query_iter.next();
    let second = query_iter.next();
    match (
        first.map(str::to_ascii_lowercase).as_deref(),
        second.map(str::to_ascii_lowercase).as_deref(),
    ) {
        // Detect `ALTER TABLE <tbl>`, `RENAME TABLE <tbl>` statements
        (Some("alter") | Some("rename"), Some("table")) => {
            let table = table_ident(
                query_iter.next().ok_or_else(|| {
                    TransientError::Generic(anyhow::anyhow!("Invalid DDL query: {}", query))
                })?,
                &current_schema,
            )?;
            is_complete_event = true;
            if ctx.table_info.contains_key(&table) {
                trace!(%id, "timely-{worker_id} DDL change detected \
                       for {table:?}");
                let info = &ctx.table_info[&table];
                let mut conn = ctx
                    .connection_config
                    .connect(
                        &format!("timely-{worker_id} MySQL "),
                        &ctx.config.config.connection_context.ssh_tunnel_manager,
                    )
                    .await?;
                for (err_output, err) in
                    verify_schemas(&mut *conn, btreemap! { &table => info }).await?
                {
                    trace!(%id, "timely-{worker_id} DDL change \
                           verification error for {table:?}[{}]: {err:?}",
                           err_output.output_index);
                    let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
                    ctx.data_output
                        .give_fueled(
                            &gtid_cap,
                            (
                                (err_output.output_index, Err(err.into())),
                                new_gtid.clone(),
                                Diff::ONE,
                            ),
                        )
                        .await;
                    ctx.errored_outputs.insert(err_output.output_index);
                }
            }
        }
        // Detect `DROP TABLE [IF EXISTS] <tbl>, <tbl>` statements. Since this
        // can drop multiple tables, we just re-verify all tables we care about.
        (Some("drop"), Some("table")) => {
            let mut conn = ctx
                .connection_config
                .connect(
                    &format!("timely-{worker_id} MySQL "),
                    &ctx.config.config.connection_context.ssh_tunnel_manager,
                )
                .await?;
            let expected = ctx
                .table_info
                .iter()
                .map(|(t, info)| {
                    (
                        t,
                        info.iter()
                            .filter(|output| !ctx.errored_outputs.contains(&output.output_index)),
                    )
                })
                .collect();
            let schema_errors = verify_schemas(&mut *conn, expected).await?;
            is_complete_event = true;
            for (dropped_output, err) in schema_errors {
                trace!(%id, "timely-{worker_id} DDL change \
                           dropped output: {dropped_output:?}: {err:?}");
                let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
                ctx.data_output
                    .give_fueled(
                        &gtid_cap,
                        (
                            (dropped_output.output_index, Err(err.into())),
                            new_gtid.clone(),
                            Diff::ONE,
                        ),
                    )
                    .await;
                ctx.errored_outputs.insert(dropped_output.output_index);
            }
        }
        // Detect `TRUNCATE [TABLE] <tbl>` statements
        (Some("truncate"), Some(_)) => {
            // We need the original un-lowercased version of 'second' since it might be a table ref
            let second = second.expect("known to be Some");
            let table = if second.eq_ignore_ascii_case("table") {
                table_ident(
                    query_iter.next().ok_or_else(|| {
                        TransientError::Generic(anyhow::anyhow!("Invalid DDL query: {}", query))
                    })?,
                    &current_schema,
                )?
            } else {
                table_ident(second, &current_schema)?
            };
            is_complete_event = true;
            if ctx.table_info.contains_key(&table) {
                trace!(%id, "timely-{worker_id} TRUNCATE detected \
                       for {table:?}");
                if let Some(info) = ctx.table_info.get(&table) {
                    let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
                    for output in info {
                        ctx.data_output
                            .give_fueled(
                                &gtid_cap,
                                (
                                    (
                                        output.output_index,
                                        Err(DataflowError::from(DefiniteError::TableTruncated(
                                            table.to_string(),
                                        ))),
                                    ),
                                    new_gtid.clone(),
                                    Diff::ONE,
                                ),
                            )
                            .await;
                        ctx.errored_outputs.insert(output.output_index);
                    }
                }
            }
        }
        // Detect `COMMIT` statements which signify the end of a transaction on non-XA capable
        // storage engines
        (Some("commit"), None) => {
            is_complete_event = true;
        }
        // Detect `CREATE TABLE <tbl>` statements which don't affect existing tables but do
        // signify a complete event (e.g. for the purposes of advancing the GTID)
        (Some("create"), Some("table")) => {
            is_complete_event = true;
        }
        _ => {}
    }

    Ok(is_complete_event)
}

/// Handles RowsEvents from the MySQL replication stream. These events contain
/// insert/update/delete events for a single transaction or committed statement.
///
/// We use these events to update the dataflow with the new rows, emitted at the
/// timestamp of the provided `new_gtid`.
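///
/// `event_buffer` is a reusable scratch buffer used to stage rewind updates so
/// they can be emitted together after the main loop (see the comment near the
/// end of this function).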
pub(super) async fn handle_rows_event(
    event: RowsEventData<'_>,
    ctx: &ReplContext<'_>,
    new_gtid: &GtidPartition,
    event_buffer: &mut Vec<(
        (usize, Result<SourceMessage, DataflowError>),
        GtidPartition,
        mz_repr::Diff,
    )>,
) -> Result<(), TransientError> {
    let (id, worker_id) = (ctx.config.id, ctx.config.worker_id);

    // Find the relevant table
    let binlog_table_id = event.table_id();
    let table_map_event = ctx
        .stream
        .get_ref()
        .get_tme(binlog_table_id)
        .ok_or_else(|| TransientError::Generic(anyhow::anyhow!("Table map event not found")))?;
    let table = MySqlTableName::new(
        &*table_map_event.database_name(),
        &*table_map_event.table_name(),
    );

    let outputs = ctx.table_info.get(&table).map(|outputs| {
        outputs
            .into_iter()
            .filter(|output| !ctx.errored_outputs.contains(&output.output_index))
            .collect::<Vec<_>>()
    });
    let outputs = match outputs {
        Some(outputs) => outputs,
        None => {
            // We don't know about this table, or there are no un-errored outputs for it.
            return Ok(());
        }
    };

    trace!(%id, "timely-{worker_id} handling RowsEvent for {table:?}");

    // Capability for this event.
    let gtid_cap = ctx.data_cap_set.delayed(new_gtid);

    // Iterate over the rows in this RowsEvent. Each row is a pair of 'before_row'
    // and 'after_row', to accommodate updates and deletes (which include a
    // before_row) and updates and inserts (which include an after_row).
    let mut final_row = Row::default();
    let mut rows_iter = event.rows(table_map_event);
    let mut rewind_count = 0;
    let mut additions = 0;
    let mut retractions = 0;
    while let Some(Ok((before_row, after_row))) = rows_iter.next() {
        // Update metrics for updates/inserts/deletes
        match (&before_row, &after_row) {
            (None, None) => {}
            (Some(_), Some(_)) => {
                ctx.metrics.updates.inc();
            }
            (None, Some(_)) => {
                ctx.metrics.inserts.inc();
            }
            (Some(_), None) => {
                ctx.metrics.deletes.inc();
            }
        }

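        // Encode the change as differential updates: retract the old row (if
        // any) with a -1 diff and add the new row (if any) with a +1 diff.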
        let updates = [
            before_row.map(|r| (r, Diff::MINUS_ONE)),
            after_row.map(|r| (r, Diff::ONE)),
        ];
        for (binlog_row, diff) in updates.into_iter().flatten() {
            let row = mysql_async::Row::try_from(binlog_row)?;
            for (output, row_val) in outputs.iter().repeat_clone(row) {
                let event = match pack_mysql_row(&mut final_row, row_val, &output.desc) {
                    Ok(row) => Ok(SourceMessage {
                        key: Row::default(),
                        value: row,
                        metadata: Row::default(),
                    }),
                    // Produce a DefiniteError in the stream for any rows that fail to decode
                    Err(err @ MySqlError::ValueDecodeError { .. }) => Err(DataflowError::from(
                        DefiniteError::ValueDecodeError(err.to_string()),
                    )),
                    Err(err) => Err(err)?,
                };

                let data = (output.output_index, event);

                // Rewind this update if it was already present in the snapshot
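                // (the negated diff is staged at the minimum timestamp, compensating
                // for the snapshot having already included this change).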
                if let Some((_rewind_data_cap, rewind_req)) = ctx.rewinds.get(&output.output_index)
                {
                    if !rewind_req.snapshot_upper.less_equal(new_gtid) {
                        rewind_count += 1;
                        event_buffer.push((data.clone(), GtidPartition::minimum(), -diff));
                    }
                }
                if diff.is_positive() {
                    additions += 1;
                } else {
                    retractions += 1;
                }
                ctx.data_output
                    .give_fueled(&gtid_cap, (data, new_gtid.clone(), diff))
                    .await;
            }
        }
    }

    // We want to emit data in individual pieces to allow timely to break large chunks of data into
    // containers. Naively interleaving new data and rewinds in the loop above defeats a timely
    // optimization that caches push buffers if the `.give()` time has not changed.
    //
    // Instead, we buffer rewind events into a reusable buffer and emit them all at once here at the end.

    if !event_buffer.is_empty() {
        for d in event_buffer.drain(..) {
            let (rewind_data_cap, _) = ctx.rewinds.get(&d.0.0).unwrap();
            ctx.data_output.give_fueled(rewind_data_cap, d).await;
        }
    }

    trace!(
        %id,
        "timely-{worker_id} sent updates for {new_gtid:?} \
            with {} updates ({} additions, {} retractions) and {} \
            rewinds",
        additions + retractions,
        additions,
        retractions,
        rewind_count,
    );

    Ok(())
}