mz_storage/source/mysql/replication/
events.rs1use maplit::btreemap;
11use mysql_common::binlog::events::{QueryEvent, RowsEventData};
12use mz_mysql_util::{MySqlError, pack_mysql_row};
13use mz_ore::iter::IteratorExt;
14use mz_repr::{Diff, Row};
15use mz_storage_types::errors::DataflowError;
16use mz_storage_types::sources::mysql::GtidPartition;
17use timely::progress::Timestamp;
18use tracing::trace;
19
20use crate::source::types::SourceMessage;
21
22use super::super::schemas::verify_schemas;
23use super::super::{DefiniteError, MySqlTableName, TransientError};
24use super::context::ReplContext;
25
26fn table_ident(name: &str, current_schema: &str) -> Result<MySqlTableName, TransientError> {
29 let stripped = name.replace('`', "");
30 let mut name_iter = stripped.split('.');
31 match (name_iter.next(), name_iter.next()) {
32 (Some(t_name), None) => Ok(MySqlTableName::new(current_schema, t_name)),
33 (Some(schema_name), Some(t_name)) => Ok(MySqlTableName::new(schema_name, t_name)),
34 _ => Err(TransientError::Generic(anyhow::anyhow!(
35 "Invalid table name from QueryEvent: {}",
36 name
37 ))),
38 }
39}
40
41pub(super) async fn handle_query_event(
53 event: QueryEvent<'_>,
54 ctx: &mut ReplContext<'_>,
55 new_gtid: &GtidPartition,
56) -> Result<bool, TransientError> {
57 let (id, worker_id) = (ctx.config.id, ctx.config.worker_id);
58
59 let query = event.query();
60 let current_schema = event.schema();
61 let mut is_complete_event = false;
62
63 let mut query_iter = query.split_ascii_whitespace();
67 let first = query_iter.next();
68 let second = query_iter.next();
69 match (
70 first.map(str::to_ascii_lowercase).as_deref(),
71 second.map(str::to_ascii_lowercase).as_deref(),
72 ) {
73 (Some("alter") | Some("rename"), Some("table")) => {
75 let table = table_ident(
76 query_iter.next().ok_or_else(|| {
77 TransientError::Generic(anyhow::anyhow!("Invalid DDL query: {}", query))
78 })?,
79 ¤t_schema,
80 )?;
81 is_complete_event = true;
82 if ctx.table_info.contains_key(&table) {
83 trace!(%id, "timely-{worker_id} DDL change detected \
84 for {table:?}");
85 let info = &ctx.table_info[&table];
86 let mut conn = ctx
87 .connection_config
88 .connect(
89 &format!("timely-{worker_id} MySQL "),
90 &ctx.config.config.connection_context.ssh_tunnel_manager,
91 )
92 .await?;
93 for (err_output, err) in
94 verify_schemas(&mut *conn, btreemap! { &table => info }).await?
95 {
96 trace!(%id, "timely-{worker_id} DDL change \
97 verification error for {table:?}[{}]: {err:?}",
98 err_output.output_index);
99 let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
100 ctx.data_output
101 .give_fueled(
102 >id_cap,
103 (
104 (err_output.output_index, Err(err.into())),
105 new_gtid.clone(),
106 Diff::ONE,
107 ),
108 )
109 .await;
110 ctx.errored_outputs.insert(err_output.output_index);
111 }
112 }
113 }
114 (Some("drop"), Some("table")) => {
117 let mut conn = ctx
118 .connection_config
119 .connect(
120 &format!("timely-{worker_id} MySQL "),
121 &ctx.config.config.connection_context.ssh_tunnel_manager,
122 )
123 .await?;
124 let expected = ctx
125 .table_info
126 .iter()
127 .map(|(t, info)| {
128 (
129 t,
130 info.iter()
131 .filter(|output| !ctx.errored_outputs.contains(&output.output_index)),
132 )
133 })
134 .collect();
135 let schema_errors = verify_schemas(&mut *conn, expected).await?;
136 is_complete_event = true;
137 for (dropped_output, err) in schema_errors {
138 trace!(%id, "timely-{worker_id} DDL change \
139 dropped output: {dropped_output:?}: {err:?}");
140 let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
141 ctx.data_output
142 .give_fueled(
143 >id_cap,
144 (
145 (dropped_output.output_index, Err(err.into())),
146 new_gtid.clone(),
147 Diff::ONE,
148 ),
149 )
150 .await;
151 ctx.errored_outputs.insert(dropped_output.output_index);
152 }
153 }
154 (Some("truncate"), Some(_)) => {
156 let second = second.expect("known to be Some");
158 let table = if second.eq_ignore_ascii_case("table") {
159 table_ident(
160 query_iter.next().ok_or_else(|| {
161 TransientError::Generic(anyhow::anyhow!("Invalid DDL query: {}", query))
162 })?,
163 ¤t_schema,
164 )?
165 } else {
166 table_ident(second, ¤t_schema)?
167 };
168 is_complete_event = true;
169 if ctx.table_info.contains_key(&table) {
170 trace!(%id, "timely-{worker_id} TRUNCATE detected \
171 for {table:?}");
172 if let Some(info) = ctx.table_info.get(&table) {
173 let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
174 for output in info {
175 ctx.data_output
176 .give_fueled(
177 >id_cap,
178 (
179 (
180 output.output_index,
181 Err(DataflowError::from(DefiniteError::TableTruncated(
182 table.to_string(),
183 ))),
184 ),
185 new_gtid.clone(),
186 Diff::ONE,
187 ),
188 )
189 .await;
190 ctx.errored_outputs.insert(output.output_index);
191 }
192 }
193 }
194 }
195 (Some("commit"), None) => {
198 is_complete_event = true;
199 }
200 (Some("create"), Some("table")) => {
203 let mut peek_stream = query_iter.peekable();
207 let mut ctas = false;
208 while let Some(token) = peek_stream.next() {
209 if token.eq_ignore_ascii_case("start")
210 && peek_stream
211 .peek()
212 .is_some_and(|t| t.eq_ignore_ascii_case("transaction"))
213 {
214 ctas = true;
215 break;
216 }
217 }
218 is_complete_event = !ctas;
219 }
220 _ => {}
221 }
222
223 Ok(is_complete_event)
224}
225
226pub(super) async fn handle_rows_event(
232 event: RowsEventData<'_>,
233 ctx: &ReplContext<'_>,
234 new_gtid: &GtidPartition,
235 event_buffer: &mut Vec<(
236 (usize, Result<SourceMessage, DataflowError>),
237 GtidPartition,
238 mz_repr::Diff,
239 )>,
240) -> Result<(), TransientError> {
241 let (id, worker_id) = (ctx.config.id, ctx.config.worker_id);
242
243 let binlog_table_id = event.table_id();
245 let table_map_event = ctx
246 .stream
247 .get_ref()
248 .get_tme(binlog_table_id)
249 .ok_or_else(|| TransientError::Generic(anyhow::anyhow!("Table map event not found")))?;
250 let table = MySqlTableName::new(
251 &*table_map_event.database_name(),
252 &*table_map_event.table_name(),
253 );
254
255 let outputs = ctx.table_info.get(&table).map(|outputs| {
256 outputs
257 .into_iter()
258 .filter(|output| !ctx.errored_outputs.contains(&output.output_index))
259 .collect::<Vec<_>>()
260 });
261 let outputs = match outputs {
262 Some(outputs) => outputs,
263 None => {
264 return Ok(());
266 }
267 };
268
269 trace!(%id, "timely-{worker_id} handling RowsEvent for {table:?}");
270
271 let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
273
274 let mut final_row = Row::default();
278 let mut rows_iter = event.rows(table_map_event);
279 let mut rewind_count = 0;
280 let mut additions = 0;
281 let mut retractions = 0;
282 while let Some(Ok((before_row, after_row))) = rows_iter.next() {
283 match (&before_row, &after_row) {
285 (None, None) => {}
286 (Some(_), Some(_)) => {
287 ctx.metrics.updates.inc();
288 }
289 (None, Some(_)) => {
290 ctx.metrics.inserts.inc();
291 }
292 (Some(_), None) => {
293 ctx.metrics.deletes.inc();
294 }
295 }
296
297 let updates = [
298 before_row.map(|r| (r, Diff::MINUS_ONE)),
299 after_row.map(|r| (r, Diff::ONE)),
300 ];
301 for (binlog_row, diff) in updates.into_iter().flatten() {
302 let row = mysql_async::Row::try_from(binlog_row)?;
303 for (output, row_val) in outputs.iter().repeat_clone(row) {
304 let event = match pack_mysql_row(&mut final_row, row_val, &output.desc) {
305 Ok(row) => Ok(SourceMessage {
306 key: Row::default(),
307 value: row,
308 metadata: Row::default(),
309 }),
310 Err(err @ MySqlError::ValueDecodeError { .. }) => Err(DataflowError::from(
312 DefiniteError::ValueDecodeError(err.to_string()),
313 )),
314 Err(err) => Err(err)?,
315 };
316
317 let data = (output.output_index, event);
318
319 if let Some((_rewind_data_cap, rewind_req)) = ctx.rewinds.get(&output.output_index)
321 {
322 if !rewind_req.snapshot_upper.less_equal(new_gtid) {
323 rewind_count += 1;
324 event_buffer.push((data.clone(), GtidPartition::minimum(), -diff));
325 }
326 }
327 if diff.is_positive() {
328 additions += 1;
329 } else {
330 retractions += 1;
331 }
332 ctx.data_output
333 .give_fueled(>id_cap, (data, new_gtid.clone(), diff))
334 .await;
335 }
336 }
337 }
338
339 if !event_buffer.is_empty() {
346 for d in event_buffer.drain(..) {
347 let (rewind_data_cap, _) = ctx.rewinds.get(&d.0.0).unwrap();
348 ctx.data_output.give_fueled(rewind_data_cap, d).await;
349 }
350 }
351
352 trace!(
353 %id,
354 "timely-{worker_id} sent updates for {new_gtid:?} \
355 with {} updates ({} additions, {} retractions) and {} \
356 rewinds",
357 additions + retractions,
358 additions,
359 retractions,
360 rewind_count,
361 );
362
363 Ok(())
364}