1use itertools::Itertools;
11use sqlparser::ast::{ObjectName, ObjectNamePart, ObjectType, Statement};
12use sqlparser::dialect::MySqlDialect;
13use sqlparser::parser::Parser;
14use std::collections::BTreeMap;
15
16use maplit::btreemap;
17use mysql_async::binlog::events::OptionalMetaExtractor;
18use mysql_common::binlog::events::{QueryEvent, RowsEventData};
19use mz_mysql_util::{MySqlError, pack_mysql_row};
20use mz_ore::iter::IteratorExt;
21use mz_repr::{Diff, Row};
22use mz_storage_types::errors::DataflowError;
23use mz_storage_types::sources::mysql::GtidPartition;
24use timely::progress::Timestamp;
25use tracing::trace;
26
27use crate::source::mysql::SourceOutputInfo;
28use crate::source::types::{FuelSize, SourceMessage};
29
30use super::super::schemas::verify_schemas;
31use super::super::{DefiniteError, MySqlTableName, TransientError};
32use super::context::ReplContext;
33
/// SQL dialect handed to `sqlparser` when parsing query text taken from binlog `QueryEvent`s
/// (see `drop_table_identifiers`).
const DIALECT: MySqlDialect = MySqlDialect {};
35
36fn table_ident(name: &str, current_schema: &str) -> Result<MySqlTableName, TransientError> {
39 let stripped = name.replace('`', "");
40 let mut name_iter = stripped.split('.');
41 mysql_table_name(name_iter.next(), name_iter.next(), current_schema, name)
42}
43
44fn mysql_table_name(
45 first_component: Option<&str>,
46 second_component: Option<&str>,
47 current_schema: &str,
48 name: &str,
49) -> Result<MySqlTableName, TransientError> {
50 match (first_component, second_component) {
51 (Some(t_name), None) => Ok(MySqlTableName::new(current_schema, t_name)),
52 (Some(schema_name), Some(t_name)) => Ok(MySqlTableName::new(schema_name, t_name)),
53 _ => Err(TransientError::Generic(anyhow::anyhow!(
54 "Invalid table name from QueryEvent: {}",
55 name
56 ))),
57 }
58}
59
60fn table_ident_from_object_name(
64 name: &ObjectName,
65 current_schema: &str,
66) -> Result<MySqlTableName, TransientError> {
67 let processed_name_parts: Vec<String> = name.0.iter().map(|part| match part {
68 ObjectNamePart::Identifier(ident) => Ok(ident.value.clone()),
69 ObjectNamePart::Function(_) => Err(TransientError::Generic(anyhow::anyhow!(
71 "Invalid table name from QueryEvent, function identifiers not supported in mysql: {}", name
72 ))),
73 }).collect::<Result<_, _>>()?;
74 if processed_name_parts.len() != 1 && processed_name_parts.len() != 2 {
75 return Err(TransientError::Generic(anyhow::anyhow!(
76 "Invalid table name from QueryEvent: {}",
77 name
78 )));
79 }
80
81 let mut name_parts_iter = processed_name_parts.iter().map(|x| x.as_str());
82 mysql_table_name(
83 name_parts_iter.next(),
84 name_parts_iter.next(),
85 current_schema,
86 &name.to_string(),
87 )
88}
89
90pub(super) async fn handle_query_event(
102 event: QueryEvent<'_>,
103 ctx: &mut ReplContext<'_>,
104 new_gtid: &GtidPartition,
105) -> Result<bool, TransientError> {
106 let (id, worker_id) = (ctx.config.id, ctx.config.worker_id);
107
108 let query = event.query();
109 let current_schema = event.schema();
110 let mut is_complete_event = false;
111
112 let mut query_iter = query.split_ascii_whitespace();
116 let first = query_iter.next();
117 let second = query_iter.next();
118 match (
119 first.map(str::to_ascii_lowercase).as_deref(),
120 second.map(str::to_ascii_lowercase).as_deref(),
121 ) {
122 (Some("alter") | Some("rename"), Some("table")) => {
124 let table = table_ident(
125 query_iter.next().ok_or_else(|| {
126 TransientError::Generic(anyhow::anyhow!("Invalid DDL query: {}", query))
127 })?,
128 ¤t_schema,
129 )?;
130 is_complete_event = true;
131 if ctx.table_info.contains_key(&table) {
132 trace!(%id, "timely-{worker_id} DDL change detected \
133 for {table:?}");
134 let info = &ctx.table_info[&table];
135 let mut conn = ctx
136 .connection_config
137 .connect(
138 &format!("timely-{worker_id} MySQL "),
139 &ctx.config.config.connection_context.ssh_tunnel_manager,
140 )
141 .await?;
142 for (err_output, err) in
143 verify_schemas(&mut *conn, btreemap! { &table => info }).await?
144 {
145 tracing::warn!(%id, "timely-{worker_id} DDL change \
146 verification error for {table:?}[{}]: {err:?}",
147 err_output.output_index);
148 let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
149 let update = (
150 (err_output.output_index, Err(err.into())),
151 new_gtid.clone(),
152 Diff::ONE,
153 );
154 let size = update.fuel_size();
155 ctx.data_output.give_fueled(>id_cap, update, size).await;
156 ctx.errored_outputs.insert(err_output.output_index);
157 }
158 }
159 }
160 (Some("drop"), Some("table")) => {
162 let dropped_tables = drop_table_identifiers(¤t_schema, &query)?;
163
164 let sources_to_drop: BTreeMap<&MySqlTableName, Vec<&SourceOutputInfo>> = dropped_tables
167 .iter()
168 .filter_map(|table_name| {
169 ctx.table_info
170 .get_key_value(table_name)
171 .map(|(name, info)| {
172 let kept = info
173 .iter()
174 .filter(|output| {
175 !ctx.errored_outputs.contains(&output.output_index)
176 && output.initial_gtid_set.less_equal(new_gtid)
178 })
179 .collect();
180 (name, kept)
181 })
182 })
183 .collect();
184 is_complete_event = true;
185 for (table_name, outputs) in sources_to_drop {
186 tracing::info!(%id, "timely-{worker_id} DDL change dropped outputs: {outputs:?}");
187 for output in outputs {
188 let err = DefiniteError::TableDropped(table_name.to_string());
189 let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
190 let update = (
191 (output.output_index, Err(err.into())),
192 new_gtid.clone(),
193 Diff::ONE,
194 );
195 let size = std::mem::size_of_val(&update);
196 ctx.data_output.give_fueled(>id_cap, update, size).await;
197 ctx.errored_outputs.insert(output.output_index);
198 }
199 }
200 }
201 (Some("truncate"), Some(_)) => {
203 let second = second.expect("known to be Some");
205 let table = if second.eq_ignore_ascii_case("table") {
206 table_ident(
207 query_iter.next().ok_or_else(|| {
208 TransientError::Generic(anyhow::anyhow!("Invalid DDL query: {}", query))
209 })?,
210 ¤t_schema,
211 )?
212 } else {
213 table_ident(second, ¤t_schema)?
214 };
215 is_complete_event = true;
216 if ctx.table_info.contains_key(&table) {
217 trace!(%id, "timely-{worker_id} TRUNCATE detected \
218 for {table:?}");
219 if let Some(info) = ctx.table_info.get(&table) {
220 let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
221 for output in info {
222 let update = (
223 (
224 output.output_index,
225 Err(DataflowError::from(DefiniteError::TableTruncated(
226 table.to_string(),
227 ))),
228 ),
229 new_gtid.clone(),
230 Diff::ONE,
231 );
232 let size = update.fuel_size();
233 ctx.data_output.give_fueled(>id_cap, update, size).await;
234 ctx.errored_outputs.insert(output.output_index);
235 }
236 }
237 }
238 }
239 (Some("commit"), None) => {
242 is_complete_event = true;
243 }
244 (Some("create"), Some("table")) => {
247 let mut peek_stream = query_iter.peekable();
251 let mut ctas = false;
252 while let Some(token) = peek_stream.next() {
253 if token.eq_ignore_ascii_case("start")
254 && peek_stream
255 .peek()
256 .is_some_and(|t| t.eq_ignore_ascii_case("transaction"))
257 {
258 ctas = true;
259 break;
260 }
261 }
262 is_complete_event = !ctas;
263 }
264 _ => {}
265 }
266
267 Ok(is_complete_event)
268}
269
270fn drop_table_identifiers(
272 current_schema: &str,
273 query: &str,
274) -> Result<Vec<MySqlTableName>, TransientError> {
275 let invalid =
276 |msg| TransientError::Generic(anyhow::anyhow!("Invalid DDL query, {msg}, got: {query}"));
277
278 let parse_result = Parser::parse_sql(&DIALECT, query)
279 .map_err(|e| TransientError::Generic(anyhow::anyhow!(e)))?;
280 let stmt = parse_result
281 .iter()
282 .exactly_one()
283 .map_err(|_| invalid("expected only a single statement from the binlog"))?;
284
285 let Statement::Drop {
286 object_type,
287 temporary,
288 names,
289 ..
290 } = stmt
291 else {
292 return Err(invalid("expected DROP statement"));
293 };
294
295 if *object_type != ObjectType::Table || *temporary {
296 return Err(invalid("expected DROP TABLE statement"));
297 }
298 let table_identifiers: Vec<MySqlTableName> = names
299 .iter()
300 .map(|name| table_ident_from_object_name(name, current_schema))
301 .collect::<Result<Vec<_>, _>>()?;
302 Ok(table_identifiers)
303}
304
305pub(super) async fn handle_rows_event(
311 event: RowsEventData<'_>,
312 ctx: &mut ReplContext<'_>,
313 new_gtid: &GtidPartition,
314 event_buffer: &mut Vec<(
315 (usize, Result<SourceMessage, DataflowError>),
316 GtidPartition,
317 mz_repr::Diff,
318 )>,
319) -> Result<(), TransientError> {
320 let (id, worker_id) = (ctx.config.id, ctx.config.worker_id);
321
322 let binlog_table_id = event.table_id();
324 let table_map_event = ctx
325 .stream
326 .get_ref()
327 .get_tme(binlog_table_id)
328 .ok_or_else(|| TransientError::Generic(anyhow::anyhow!("Table map event not found")))?;
329 let table = MySqlTableName::new(
330 &*table_map_event.database_name(),
331 &*table_map_event.table_name(),
332 );
333
334 let outputs = ctx.table_info.get(&table).map(|outputs| {
335 outputs
336 .into_iter()
337 .filter(|output| !ctx.errored_outputs.contains(&output.output_index))
338 .collect::<Vec<_>>()
339 });
340 let outputs = match outputs {
341 Some(outputs) => outputs,
342 None => {
343 return Ok(());
345 }
346 };
347
348 trace!(%id, "timely-{worker_id} handling RowsEvent for {table:?}");
349
350 let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
352
353 let optional_metadata = OptionalMetaExtractor::new(table_map_event.iter_optional_meta())?;
356 let has_full_metadata = optional_metadata.iter_column_name().next().is_some();
357
358 let mut final_row = Row::default();
362 let mut rows_iter = event.rows(table_map_event);
363 let mut rewind_count = 0;
364 let mut additions = 0;
365 let mut retractions = 0;
366 while let Some(Ok((before_row, after_row))) = rows_iter.next() {
367 match (&before_row, &after_row) {
369 (None, None) => {}
370 (Some(_), Some(_)) => {
371 ctx.metrics.updates.inc();
372 }
373 (None, Some(_)) => {
374 ctx.metrics.inserts.inc();
375 }
376 (Some(_), None) => {
377 ctx.metrics.deletes.inc();
378 }
379 }
380
381 let updates = [
382 before_row.map(|r| (r, Diff::MINUS_ONE)),
383 after_row.map(|r| (r, Diff::ONE)),
384 ];
385 let gtid_str = format!("{new_gtid:?}");
386 for (binlog_row, diff) in updates.into_iter().flatten() {
387 let row = mysql_async::Row::try_from(binlog_row)?;
388 for (output, row_val) in outputs.iter().repeat_clone(row) {
389 let event = if !has_full_metadata && output.binlog_full_metadata {
390 ctx.errored_outputs.insert(output.output_index);
391 Err(DataflowError::from(DefiniteError::ValueDecodeError(
392 format!(
393 "Table {0} was created with binlog_row_metadata=FULL but binlog_row_metadata has since been set to a different value, meaning we cannot reliably decode the columns",
394 output.table_name
395 ),
396 )))
397 } else {
398 match pack_mysql_row(
399 &mut final_row,
400 row_val,
401 &output.desc,
402 Some(>id_str),
403 output.binlog_full_metadata,
404 ) {
405 Ok(row) => Ok(SourceMessage {
406 key: Row::default(),
407 value: row,
408 metadata: Row::default(),
409 }),
410 Err(err @ MySqlError::ValueDecodeError { .. }) => Err(DataflowError::from(
412 DefiniteError::ValueDecodeError(err.to_string()),
413 )),
414 Err(err) => Err(err)?,
415 }
416 };
417
418 let data = (output.output_index, event);
419
420 if let Some((_rewind_data_cap, rewind_req)) = ctx.rewinds.get(&output.output_index)
422 {
423 if !rewind_req.snapshot_upper.less_equal(new_gtid) {
424 rewind_count += 1;
425 event_buffer.push((data.clone(), GtidPartition::minimum(), -diff));
426 }
427 }
428 if diff.is_positive() {
429 additions += 1;
430 } else {
431 retractions += 1;
432 }
433 let update = (data, new_gtid.clone(), diff);
434 let size = update.fuel_size();
435 ctx.data_output.give_fueled(>id_cap, update, size).await;
436 }
437 }
438 }
439
440 if !event_buffer.is_empty() {
447 for d in event_buffer.drain(..) {
448 let (rewind_data_cap, _) = ctx.rewinds.get(&d.0.0).unwrap();
449 let size = d.fuel_size();
450 ctx.data_output.give_fueled(rewind_data_cap, d, size).await;
451 }
452 }
453
454 trace!(
455 %id,
456 "timely-{worker_id} sent updates for {new_gtid:?} \
457 with {} updates ({} additions, {} retractions) and {} \
458 rewinds",
459 additions + retractions,
460 additions,
461 retractions,
462 rewind_count,
463 );
464
465 Ok(())
466}
467
#[cfg(test)]
mod tests {
    use super::*;

    /// Schema used as the session's current schema throughout these tests.
    const CURRENT_SCHEMA: &str = "shop";

    /// Builds the expected identifier for `schema`.`name`.
    fn table(schema: &str, name: &str) -> MySqlTableName {
        MySqlTableName::new(schema, name)
    }

    /// Mirrors the dispatch in `handle_query_event`: only a query whose first two
    /// whitespace-separated tokens are `DROP` and `TABLE` (in any ASCII case) is handed to
    /// `drop_table_identifiers`; any other query yields an empty list.
    fn parse_drop(
        query: &str,
        current_schema: &str,
    ) -> Result<Vec<MySqlTableName>, TransientError> {
        let mut tokens = query.split_ascii_whitespace();
        let starts_with_drop_table = tokens
            .next()
            .is_some_and(|token| token.eq_ignore_ascii_case("drop"))
            && tokens
                .next()
                .is_some_and(|token| token.eq_ignore_ascii_case("table"));
        if !starts_with_drop_table {
            return Ok(Vec::new());
        }
        drop_table_identifiers(current_schema, query)
    }

    /// Tables dropped by `query` relative to [`CURRENT_SCHEMA`]; panics if parsing fails.
    fn dropped(query: &str) -> Vec<MySqlTableName> {
        parse_drop(query, CURRENT_SCHEMA).unwrap()
    }

    /// Whether `query` is rejected when parsed relative to [`CURRENT_SCHEMA`].
    fn rejected(query: &str) -> bool {
        parse_drop(query, CURRENT_SCHEMA).is_err()
    }

    #[mz_ore::test]
    fn table_ident_unqualified_uses_current_schema() {
        let ident = table_ident("orders", CURRENT_SCHEMA).unwrap();
        assert_eq!(ident, table("shop", "orders"));
    }

    #[mz_ore::test]
    fn table_ident_qualified_overrides_current_schema() {
        let ident = table_ident("inventory.orders", CURRENT_SCHEMA).unwrap();
        assert_eq!(ident, table("inventory", "orders"));
    }

    #[mz_ore::test]
    fn table_ident_strips_backtick_quoting() {
        let ident = table_ident("`inventory`.`orders`", CURRENT_SCHEMA).unwrap();
        assert_eq!(ident, table("inventory", "orders"));
    }

    #[mz_ore::test]
    fn drop_parses_single_unqualified_table() {
        assert_eq!(dropped("DROP TABLE orders"), vec![table("shop", "orders")]);
    }

    #[mz_ore::test]
    fn drop_parses_qualified_table() {
        assert_eq!(
            dropped("DROP TABLE inventory.orders"),
            vec![table("inventory", "orders")]
        );
    }

    #[mz_ore::test]
    fn drop_parses_backtick_quoted_table() {
        assert_eq!(
            dropped("DROP TABLE `inventory`.`orders`"),
            vec![table("inventory", "orders")]
        );
    }

    #[mz_ore::test]
    fn drop_parses_if_exists_clause() {
        for query in ["DROP TABLE IF EXISTS orders", "DROP TABLE if exists orders"] {
            assert_eq!(dropped(query), vec![table("shop", "orders")]);
        }
    }

    #[mz_ore::test]
    fn drop_rejects_if_without_exists() {
        assert!(rejected("DROP TABLE IF orders"));
    }

    #[mz_ore::test]
    fn drop_rejects_missing_table_name() {
        assert!(rejected("DROP TABLE"));
    }

    #[mz_ore::test]
    fn drop_parses_multiple_space_separated_tables() {
        let expected = ["orders", "customers", "items"]
            .into_iter()
            .map(|name| table("shop", name))
            .collect::<Vec<_>>();
        assert_eq!(dropped("DROP TABLE orders, customers, items"), expected);
    }

    #[mz_ore::test]
    fn drop_parses_comma_joined_tables_without_spaces() {
        assert_eq!(
            dropped("DROP TABLE `shop`.`orders`,`shop`.`customers`"),
            vec![table("shop", "orders"), table("shop", "customers")]
        );
    }

    #[mz_ore::test]
    fn drop_does_not_treat_comment_shaped_text_in_identifier_as_comment() {
        assert_eq!(
            dropped("DROP TABLE `tbl /* not a comment */`"),
            vec![table("shop", "tbl /* not a comment */")]
        );
    }

    #[mz_ore::test]
    fn drop_parses_table_with_restrict() {
        assert_eq!(
            dropped("DROP TABLE orders RESTRICT"),
            vec![table("shop", "orders")]
        );
    }

    #[mz_ore::test]
    fn drop_parses_multiple_tables_with_cascade() {
        assert_eq!(
            dropped("DROP TABLE orders, customers CASCADE"),
            vec![table("shop", "orders"), table("shop", "customers")]
        );
    }

    #[mz_ore::test]
    fn drop_of_multiple_statements_errors() {
        assert!(rejected("DROP TABLE orders; DROP TABLE customers"));
    }

    #[mz_ore::test]
    fn drop_temporary_table_is_not_detected() {
        assert_eq!(
            dropped("DROP TEMPORARY TABLE orders"),
            Vec::<MySqlTableName>::new()
        );
    }

    #[mz_ore::test]
    fn drop_quoted_identifier_with_dot_is_a_single_table() {
        assert_eq!(
            dropped("DROP TABLE `weird.name`"),
            vec![table("shop", "weird.name")]
        );
    }

    #[mz_ore::test]
    fn drop_escaped_backtick_identifier_is_preserved() {
        assert_eq!(dropped("DROP TABLE `a``b`"), vec![table("shop", "a`b")]);
    }

    #[mz_ore::test]
    fn identifier_with_multiple_dots_errors() {
        assert!(rejected("DROP TABLE def.shop.customer"));
    }

    #[mz_ore::test]
    fn ansi_quotes_parsed_properly() {
        assert_eq!(
            dropped("DROP TABLE \"customer\""),
            vec![table("shop", "customer")]
        );
    }
}