// mz_storage/source/mysql/replication/events.rs

use maplit::btreemap;
use mysql_async::binlog::events::OptionalMetaExtractor;
use mysql_common::binlog::events::{QueryEvent, RowsEventData};
use mz_mysql_util::{MySqlError, pack_mysql_row};
use mz_ore::iter::IteratorExt;
use mz_repr::{Diff, Row};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::mysql::GtidPartition;
use timely::progress::Timestamp;
use tracing::trace;

use crate::source::types::{FuelSize, SourceMessage};

use super::super::schemas::verify_schemas;
use super::super::{DefiniteError, MySqlTableName, TransientError};
use super::context::ReplContext;
27fn table_ident(name: &str, current_schema: &str) -> Result<MySqlTableName, TransientError> {
30 let stripped = name.replace('`', "");
31 let mut name_iter = stripped.split('.');
32 match (name_iter.next(), name_iter.next()) {
33 (Some(t_name), None) => Ok(MySqlTableName::new(current_schema, t_name)),
34 (Some(schema_name), Some(t_name)) => Ok(MySqlTableName::new(schema_name, t_name)),
35 _ => Err(TransientError::Generic(anyhow::anyhow!(
36 "Invalid table name from QueryEvent: {}",
37 name
38 ))),
39 }
40}
41
42pub(super) async fn handle_query_event(
54 event: QueryEvent<'_>,
55 ctx: &mut ReplContext<'_>,
56 new_gtid: &GtidPartition,
57) -> Result<bool, TransientError> {
58 let (id, worker_id) = (ctx.config.id, ctx.config.worker_id);
59
60 let query = event.query();
61 let current_schema = event.schema();
62 let mut is_complete_event = false;
63
64 let mut query_iter = query.split_ascii_whitespace();
68 let first = query_iter.next();
69 let second = query_iter.next();
70 match (
71 first.map(str::to_ascii_lowercase).as_deref(),
72 second.map(str::to_ascii_lowercase).as_deref(),
73 ) {
74 (Some("alter") | Some("rename"), Some("table")) => {
76 let table = table_ident(
77 query_iter.next().ok_or_else(|| {
78 TransientError::Generic(anyhow::anyhow!("Invalid DDL query: {}", query))
79 })?,
80 ¤t_schema,
81 )?;
82 is_complete_event = true;
83 if ctx.table_info.contains_key(&table) {
84 trace!(%id, "timely-{worker_id} DDL change detected \
85 for {table:?}");
86 let info = &ctx.table_info[&table];
87 let mut conn = ctx
88 .connection_config
89 .connect(
90 &format!("timely-{worker_id} MySQL "),
91 &ctx.config.config.connection_context.ssh_tunnel_manager,
92 )
93 .await?;
94 for (err_output, err) in
95 verify_schemas(&mut *conn, btreemap! { &table => info }).await?
96 {
97 tracing::warn!(%id, "timely-{worker_id} DDL change \
98 verification error for {table:?}[{}]: {err:?}",
99 err_output.output_index);
100 let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
101 let update = (
102 (err_output.output_index, Err(err.into())),
103 new_gtid.clone(),
104 Diff::ONE,
105 );
106 let size = update.fuel_size();
107 ctx.data_output.give_fueled(>id_cap, update, size).await;
108 ctx.errored_outputs.insert(err_output.output_index);
109 }
110 }
111 }
112 (Some("drop"), Some("table")) => {
115 let mut conn = ctx
116 .connection_config
117 .connect(
118 &format!("timely-{worker_id} MySQL "),
119 &ctx.config.config.connection_context.ssh_tunnel_manager,
120 )
121 .await?;
122 let expected = ctx
123 .table_info
124 .iter()
125 .map(|(t, info)| {
126 (
127 t,
128 info.iter()
129 .filter(|output| !ctx.errored_outputs.contains(&output.output_index)),
130 )
131 })
132 .collect();
133 let schema_errors = verify_schemas(&mut *conn, expected).await?;
134 is_complete_event = true;
135 for (dropped_output, err) in schema_errors {
136 tracing::info!(%id, "timely-{worker_id} DDL change \
137 dropped output: {dropped_output:?}: {err:?}");
138 let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
139 let update = (
140 (dropped_output.output_index, Err(err.into())),
141 new_gtid.clone(),
142 Diff::ONE,
143 );
144 let size = std::mem::size_of_val(&update);
145 ctx.data_output.give_fueled(>id_cap, update, size).await;
146 ctx.errored_outputs.insert(dropped_output.output_index);
147 }
148 }
149 (Some("truncate"), Some(_)) => {
151 let second = second.expect("known to be Some");
153 let table = if second.eq_ignore_ascii_case("table") {
154 table_ident(
155 query_iter.next().ok_or_else(|| {
156 TransientError::Generic(anyhow::anyhow!("Invalid DDL query: {}", query))
157 })?,
158 ¤t_schema,
159 )?
160 } else {
161 table_ident(second, ¤t_schema)?
162 };
163 is_complete_event = true;
164 if ctx.table_info.contains_key(&table) {
165 trace!(%id, "timely-{worker_id} TRUNCATE detected \
166 for {table:?}");
167 if let Some(info) = ctx.table_info.get(&table) {
168 let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
169 for output in info {
170 let update = (
171 (
172 output.output_index,
173 Err(DataflowError::from(DefiniteError::TableTruncated(
174 table.to_string(),
175 ))),
176 ),
177 new_gtid.clone(),
178 Diff::ONE,
179 );
180 let size = update.fuel_size();
181 ctx.data_output.give_fueled(>id_cap, update, size).await;
182 ctx.errored_outputs.insert(output.output_index);
183 }
184 }
185 }
186 }
187 (Some("commit"), None) => {
190 is_complete_event = true;
191 }
192 (Some("create"), Some("table")) => {
195 let mut peek_stream = query_iter.peekable();
199 let mut ctas = false;
200 while let Some(token) = peek_stream.next() {
201 if token.eq_ignore_ascii_case("start")
202 && peek_stream
203 .peek()
204 .is_some_and(|t| t.eq_ignore_ascii_case("transaction"))
205 {
206 ctas = true;
207 break;
208 }
209 }
210 is_complete_event = !ctas;
211 }
212 _ => {}
213 }
214
215 Ok(is_complete_event)
216}
217
218pub(super) async fn handle_rows_event(
224 event: RowsEventData<'_>,
225 ctx: &mut ReplContext<'_>,
226 new_gtid: &GtidPartition,
227 event_buffer: &mut Vec<(
228 (usize, Result<SourceMessage, DataflowError>),
229 GtidPartition,
230 mz_repr::Diff,
231 )>,
232) -> Result<(), TransientError> {
233 let (id, worker_id) = (ctx.config.id, ctx.config.worker_id);
234
235 let binlog_table_id = event.table_id();
237 let table_map_event = ctx
238 .stream
239 .get_ref()
240 .get_tme(binlog_table_id)
241 .ok_or_else(|| TransientError::Generic(anyhow::anyhow!("Table map event not found")))?;
242 let table = MySqlTableName::new(
243 &*table_map_event.database_name(),
244 &*table_map_event.table_name(),
245 );
246
247 let outputs = ctx.table_info.get(&table).map(|outputs| {
248 outputs
249 .into_iter()
250 .filter(|output| !ctx.errored_outputs.contains(&output.output_index))
251 .collect::<Vec<_>>()
252 });
253 let outputs = match outputs {
254 Some(outputs) => outputs,
255 None => {
256 return Ok(());
258 }
259 };
260
261 trace!(%id, "timely-{worker_id} handling RowsEvent for {table:?}");
262
263 let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
265
266 let optional_metadata = OptionalMetaExtractor::new(table_map_event.iter_optional_meta())?;
269 let has_full_metadata = optional_metadata.iter_column_name().next().is_some();
270
271 let mut final_row = Row::default();
275 let mut rows_iter = event.rows(table_map_event);
276 let mut rewind_count = 0;
277 let mut additions = 0;
278 let mut retractions = 0;
279 while let Some(Ok((before_row, after_row))) = rows_iter.next() {
280 match (&before_row, &after_row) {
282 (None, None) => {}
283 (Some(_), Some(_)) => {
284 ctx.metrics.updates.inc();
285 }
286 (None, Some(_)) => {
287 ctx.metrics.inserts.inc();
288 }
289 (Some(_), None) => {
290 ctx.metrics.deletes.inc();
291 }
292 }
293
294 let updates = [
295 before_row.map(|r| (r, Diff::MINUS_ONE)),
296 after_row.map(|r| (r, Diff::ONE)),
297 ];
298 let gtid_str = format!("{new_gtid:?}");
299 for (binlog_row, diff) in updates.into_iter().flatten() {
300 let row = mysql_async::Row::try_from(binlog_row)?;
301 for (output, row_val) in outputs.iter().repeat_clone(row) {
302 let event = if !has_full_metadata && output.binlog_full_metadata {
303 ctx.errored_outputs.insert(output.output_index);
304 Err(DataflowError::from(DefiniteError::ValueDecodeError(
305 format!(
306 "Table {0} was created with binlog_row_metadata=FULL but binlog_row_metadata has since been set to a different value, meaning we cannot reliably decode the columns",
307 output.table_name
308 ),
309 )))
310 } else {
311 match pack_mysql_row(
312 &mut final_row,
313 row_val,
314 &output.desc,
315 Some(>id_str),
316 output.binlog_full_metadata,
317 ) {
318 Ok(row) => Ok(SourceMessage {
319 key: Row::default(),
320 value: row,
321 metadata: Row::default(),
322 }),
323 Err(err @ MySqlError::ValueDecodeError { .. }) => Err(DataflowError::from(
325 DefiniteError::ValueDecodeError(err.to_string()),
326 )),
327 Err(err) => Err(err)?,
328 }
329 };
330
331 let data = (output.output_index, event);
332
333 if let Some((_rewind_data_cap, rewind_req)) = ctx.rewinds.get(&output.output_index)
335 {
336 if !rewind_req.snapshot_upper.less_equal(new_gtid) {
337 rewind_count += 1;
338 event_buffer.push((data.clone(), GtidPartition::minimum(), -diff));
339 }
340 }
341 if diff.is_positive() {
342 additions += 1;
343 } else {
344 retractions += 1;
345 }
346 let update = (data, new_gtid.clone(), diff);
347 let size = update.fuel_size();
348 ctx.data_output.give_fueled(>id_cap, update, size).await;
349 }
350 }
351 }
352
353 if !event_buffer.is_empty() {
360 for d in event_buffer.drain(..) {
361 let (rewind_data_cap, _) = ctx.rewinds.get(&d.0.0).unwrap();
362 let size = d.fuel_size();
363 ctx.data_output.give_fueled(rewind_data_cap, d, size).await;
364 }
365 }
366
367 trace!(
368 %id,
369 "timely-{worker_id} sent updates for {new_gtid:?} \
370 with {} updates ({} additions, {} retractions) and {} \
371 rewinds",
372 additions + retractions,
373 additions,
374 retractions,
375 rewind_count,
376 );
377
378 Ok(())
379}