// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use maplit::btreemap;
use mysql_common::binlog::events::{QueryEvent, RowsEventData};
use mz_mysql_util::{MySqlError, pack_mysql_row};
use mz_ore::iter::IteratorExt;
use mz_repr::{Diff, Row};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::mysql::GtidPartition;
use timely::progress::Timestamp;
use tracing::trace;

use crate::source::types::SourceMessage;

use super::super::schemas::verify_schemas;
use super::super::{DefiniteError, MySqlTableName, TransientError};
use super::context::ReplContext;

/// Returns the MySqlTableName for the given table name referenced in a
/// SQL statement, using the current schema if the table name is unqualified.
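///
/// Illustrative examples (backticks are stripped before parsing):
///
/// ```text
/// table_ident("`db`.`t1`", "other") => db.t1
/// table_ident("t1", "other")        => other.t1
/// ```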
fn table_ident(name: &str, current_schema: &str) -> Result<MySqlTableName, TransientError> {
    let stripped = name.replace('`', "");
    let mut name_iter = stripped.split('.');
    match (name_iter.next(), name_iter.next()) {
        (Some(t_name), None) => Ok(MySqlTableName::new(current_schema, t_name)),
        (Some(schema_name), Some(t_name)) => Ok(MySqlTableName::new(schema_name, t_name)),
        _ => Err(TransientError::Generic(anyhow::anyhow!(
            "Invalid table name from QueryEvent: {}",
            name
        ))),
    }
}

/// Handles QueryEvents from the MySQL replication stream. Since we only use
/// row-based replication, we only expect to see QueryEvents for DDL changes.
///
/// From the MySQL docs: 'A Query_event is created for each query that modifies
/// the database, unless the query is logged row-based.' This means that we can
/// expect any DDL changes to be represented as QueryEvents, which we must parse
/// to figure out whether any of the tables we care about have been affected.
///
/// Returns a bool indicating whether the handled event is a 'complete' event
/// that should cause the frontier to advance beyond the current GTID.
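///
/// Statements handled here, illustratively (not an exhaustive grammar):
///
/// ```text
/// ALTER TABLE t1 ADD COLUMN c2 text;  -- re-verify t1's schema
/// RENAME TABLE t1 TO t2;              -- re-verify t1's schema
/// DROP TABLE IF EXISTS t1, t2;        -- re-verify all tracked tables
/// TRUNCATE TABLE t1;                  -- emit definite errors for t1's outputs
/// CREATE TABLE t3 (a int);            -- complete event only
/// COMMIT;                             -- complete event only
/// ```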
pub(super) async fn handle_query_event(
    event: QueryEvent<'_>,
    ctx: &mut ReplContext<'_>,
    new_gtid: &GtidPartition,
) -> Result<bool, TransientError> {
    let (id, worker_id) = (ctx.config.id, ctx.config.worker_id);

    let query = event.query();
    let current_schema = event.schema();
    let mut is_complete_event = false;

    // MySQL does not permit transactional DDL, so luckily we don't need to
    // worry about tracking BEGIN/COMMIT query events. We only need to look
    // for DDL changes that affect the schema of the tables we care about.
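    // For example (illustrative), "ALTER TABLE `db`.`t1` ADD COLUMN c2 text"
    // tokenizes to first = Some("alter") and second = Some("table"), with the
    // table reference available as the next token.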
    let mut query_iter = query.split_ascii_whitespace();
    let first = query_iter.next();
    let second = query_iter.next();
    match (
        first.map(str::to_ascii_lowercase).as_deref(),
        second.map(str::to_ascii_lowercase).as_deref(),
    ) {
        // Detect `ALTER TABLE <tbl>`, `RENAME TABLE <tbl>` statements
        (Some("alter") | Some("rename"), Some("table")) => {
            let table = table_ident(
                query_iter.next().ok_or_else(|| {
                    TransientError::Generic(anyhow::anyhow!("Invalid DDL query: {}", query))
                })?,
                &current_schema,
            )?;
            is_complete_event = true;
            if ctx.table_info.contains_key(&table) {
                trace!(%id, "timely-{worker_id} DDL change detected \
                       for {table:?}");
                let info = &ctx.table_info[&table];
                let mut conn = ctx
                    .connection_config
                    .connect(
                        &format!("timely-{worker_id} MySQL "),
                        &ctx.config.config.connection_context.ssh_tunnel_manager,
                    )
                    .await?;
                for (err_output, err) in
                    verify_schemas(&mut *conn, btreemap! { &table => info }).await?
                {
                    trace!(%id, "timely-{worker_id} DDL change \
                           verification error for {table:?}[{}]: {err:?}",
                           err_output.output_index);
                    let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
                    ctx.data_output
                        .give_fueled(
                            &gtid_cap,
                            (
                                (err_output.output_index, Err(err.into())),
                                new_gtid.clone(),
                                Diff::ONE,
                            ),
                        )
                        .await;
                    ctx.errored_outputs.insert(err_output.output_index);
                }
            }
        }
        // Detect `DROP TABLE [IF EXISTS] <tbl>, <tbl>` statements. Since
        // this can drop multiple tables, we just check all tables we care about
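        // (e.g. `DROP TABLE t1, t2` drops both), rather than parsing the
        // statement's table list ourselves.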
        (Some("drop"), Some("table")) => {
            let mut conn = ctx
                .connection_config
                .connect(
                    &format!("timely-{worker_id} MySQL "),
                    &ctx.config.config.connection_context.ssh_tunnel_manager,
                )
                .await?;
            let expected = ctx
                .table_info
                .iter()
                .map(|(t, info)| {
                    (
                        t,
                        info.iter()
                            .filter(|output| !ctx.errored_outputs.contains(&output.output_index)),
                    )
                })
                .collect();
            let schema_errors = verify_schemas(&mut *conn, expected).await?;
            is_complete_event = true;
            for (dropped_output, err) in schema_errors {
                trace!(%id, "timely-{worker_id} DDL change \
                       dropped output: {dropped_output:?}: {err:?}");
                let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
                ctx.data_output
                    .give_fueled(
                        &gtid_cap,
                        (
                            (dropped_output.output_index, Err(err.into())),
                            new_gtid.clone(),
                            Diff::ONE,
                        ),
                    )
                    .await;
                ctx.errored_outputs.insert(dropped_output.output_index);
            }
        }
        // Detect `TRUNCATE [TABLE] <tbl>` statements
        (Some("truncate"), Some(_)) => {
            // We need the original un-lowercased version of 'second' since it might be a table ref
            let second = second.expect("known to be Some");
            let table = if second.eq_ignore_ascii_case("table") {
                table_ident(
                    query_iter.next().ok_or_else(|| {
                        TransientError::Generic(anyhow::anyhow!("Invalid DDL query: {}", query))
                    })?,
                    &current_schema,
                )?
            } else {
                table_ident(second, &current_schema)?
            };
            is_complete_event = true;
            if ctx.table_info.contains_key(&table) {
                trace!(%id, "timely-{worker_id} TRUNCATE detected \
                       for {table:?}");
                if let Some(info) = ctx.table_info.get(&table) {
                    let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
                    for output in info {
                        ctx.data_output
                            .give_fueled(
                                &gtid_cap,
                                (
                                    (
                                        output.output_index,
                                        Err(DataflowError::from(DefiniteError::TableTruncated(
                                            table.to_string(),
                                        ))),
                                    ),
                                    new_gtid.clone(),
                                    Diff::ONE,
                                ),
                            )
                            .await;
                        ctx.errored_outputs.insert(output.output_index);
                    }
                }
            }
        }
        // Detect `COMMIT` statements which signify the end of a transaction on non-XA capable
        // storage engines
        (Some("commit"), None) => {
            is_complete_event = true;
        }
        // Detect `CREATE TABLE <tbl>` statements which don't affect existing tables but do
        // signify a complete event (e.g. for the purposes of advancing the GTID)
        (Some("create"), Some("table")) => {
            is_complete_event = true;
        }
        _ => {}
    }

    Ok(is_complete_event)
}

/// Handles RowsEvents from the MySQL replication stream. These events contain
/// insert/update/delete events for a single transaction or committed statement.
///
/// We use these events to update the dataflow with the new rows, and return a new
/// frontier with which to advance the dataflow's progress.
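///
/// Each replicated row arrives as a `(before_row, after_row)` pair which maps
/// to diffs as follows (an illustrative sketch):
///
/// ```text
/// insert: (None,      Some(new)) -> (new, +1)
/// delete: (Some(old), None)      -> (old, -1)
/// update: (Some(old), Some(new)) -> (old, -1), (new, +1)
/// ```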
pub(super) async fn handle_rows_event(
    event: RowsEventData<'_>,
    ctx: &ReplContext<'_>,
    new_gtid: &GtidPartition,
    event_buffer: &mut Vec<(
        (usize, Result<SourceMessage, DataflowError>),
        GtidPartition,
        mz_repr::Diff,
    )>,
) -> Result<(), TransientError> {
    let (id, worker_id) = (ctx.config.id, ctx.config.worker_id);

    // Find the relevant table
    let binlog_table_id = event.table_id();
    let table_map_event = ctx
        .stream
        .get_ref()
        .get_tme(binlog_table_id)
        .ok_or_else(|| TransientError::Generic(anyhow::anyhow!("Table map event not found")))?;
    let table = MySqlTableName::new(
        &*table_map_event.database_name(),
        &*table_map_event.table_name(),
    );

    let outputs = ctx.table_info.get(&table).map(|outputs| {
        outputs
            .into_iter()
            .filter(|output| !ctx.errored_outputs.contains(&output.output_index))
            .collect::<Vec<_>>()
    });
    let outputs = match outputs {
        Some(outputs) => outputs,
        None => {
            // We don't know about this table, or there are no un-errored outputs for it.
            return Ok(());
        }
    };

    trace!(%id, "timely-{worker_id} handling RowsEvent for {table:?}");

    // Capability for this event.
    let gtid_cap = ctx.data_cap_set.delayed(new_gtid);

    // Iterate over the rows in this RowsEvent. Each row is a pair of
    // 'before_row' and 'after_row', to accommodate updates and deletes
    // (which include a before_row) and updates and inserts (which include
    // an after_row).
    let mut final_row = Row::default();
    let mut rows_iter = event.rows(table_map_event);
    let mut rewind_count = 0;
    let mut additions = 0;
    let mut retractions = 0;
    while let Some(Ok((before_row, after_row))) = rows_iter.next() {
        // Update metrics for updates/inserts/deletes
        match (&before_row, &after_row) {
            (None, None) => {}
            (Some(_), Some(_)) => {
                ctx.metrics.updates.inc();
            }
            (None, Some(_)) => {
                ctx.metrics.inserts.inc();
            }
            (Some(_), None) => {
                ctx.metrics.deletes.inc();
            }
        }

        let updates = [
            before_row.map(|r| (r, Diff::MINUS_ONE)),
            after_row.map(|r| (r, Diff::ONE)),
        ];
        for (binlog_row, diff) in updates.into_iter().flatten() {
            let row = mysql_async::Row::try_from(binlog_row)?;
            for (output, row_val) in outputs.iter().repeat_clone(row) {
                let event = match pack_mysql_row(&mut final_row, row_val, &output.desc) {
                    Ok(row) => Ok(SourceMessage {
                        key: Row::default(),
                        value: row,
                        metadata: Row::default(),
                    }),
                    // Produce a DefiniteError in the stream for any rows that fail to decode
                    Err(err @ MySqlError::ValueDecodeError { .. }) => Err(DataflowError::from(
                        DefiniteError::ValueDecodeError(err.to_string()),
                    )),
                    Err(err) => Err(err)?,
                };

                let data = (output.output_index, event);

                // Rewind this update if it was already present in the snapshot:
                // the snapshot was emitted at the minimum timestamp, so we also
                // emit this update's negation at the minimum timestamp to avoid
                // double-counting it.
                if let Some((_rewind_data_cap, rewind_req)) = ctx.rewinds.get(&output.output_index)
                {
                    if !rewind_req.snapshot_upper.less_equal(new_gtid) {
                        rewind_count += 1;
                        event_buffer.push((data.clone(), GtidPartition::minimum(), -diff));
                    }
                }
                if diff.is_positive() {
                    additions += 1;
                } else {
                    retractions += 1;
                }
                ctx.data_output
                    .give_fueled(&gtid_cap, (data, new_gtid.clone(), diff))
                    .await;
            }
        }
    }

    // We want to emit data in individual pieces to allow timely to break large chunks of data into
    // containers. Naively interleaving new data and rewinds in the loop above defeats a timely
    // optimization that caches push buffers if the `.give()` time has not changed.
    //
    // Instead, we buffer rewind events into a reusable buffer, and emit all at once here at the end.
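    //
    // All rewinds are emitted at the minimum timestamp, so flushing them
    // together below keeps the `.give()` time stable across pushes.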

    if !event_buffer.is_empty() {
        for d in event_buffer.drain(..) {
            let (rewind_data_cap, _) = ctx.rewinds.get(&d.0.0).unwrap();
            ctx.data_output.give_fueled(rewind_data_cap, d).await;
        }
    }

    trace!(
        %id,
        "timely-{worker_id} sent updates for {new_gtid:?} \
         with {} updates ({} additions, {} retractions) and {} \
         rewinds",
        additions + retractions,
        additions,
        retractions,
        rewind_count,
    );

    Ok(())
}