// mz_storage/source/mysql/replication/events.rs
use maplit::btreemap;
11use mysql_common::binlog::events::{QueryEvent, RowsEventData};
12use mz_mysql_util::{MySqlError, pack_mysql_row};
13use mz_ore::iter::IteratorExt;
14use mz_repr::{Diff, Row};
15use mz_storage_types::errors::DataflowError;
16use mz_storage_types::sources::mysql::GtidPartition;
17use timely::progress::Timestamp;
18use tracing::trace;
19
20use crate::source::types::{FuelSize, SourceMessage};
21
22use super::super::schemas::verify_schemas;
23use super::super::{DefiniteError, MySqlTableName, TransientError};
24use super::context::ReplContext;
25
26fn table_ident(name: &str, current_schema: &str) -> Result<MySqlTableName, TransientError> {
29 let stripped = name.replace('`', "");
30 let mut name_iter = stripped.split('.');
31 match (name_iter.next(), name_iter.next()) {
32 (Some(t_name), None) => Ok(MySqlTableName::new(current_schema, t_name)),
33 (Some(schema_name), Some(t_name)) => Ok(MySqlTableName::new(schema_name, t_name)),
34 _ => Err(TransientError::Generic(anyhow::anyhow!(
35 "Invalid table name from QueryEvent: {}",
36 name
37 ))),
38 }
39}
40
41pub(super) async fn handle_query_event(
53 event: QueryEvent<'_>,
54 ctx: &mut ReplContext<'_>,
55 new_gtid: &GtidPartition,
56) -> Result<bool, TransientError> {
57 let (id, worker_id) = (ctx.config.id, ctx.config.worker_id);
58
59 let query = event.query();
60 let current_schema = event.schema();
61 let mut is_complete_event = false;
62
63 let mut query_iter = query.split_ascii_whitespace();
67 let first = query_iter.next();
68 let second = query_iter.next();
69 match (
70 first.map(str::to_ascii_lowercase).as_deref(),
71 second.map(str::to_ascii_lowercase).as_deref(),
72 ) {
73 (Some("alter") | Some("rename"), Some("table")) => {
75 let table = table_ident(
76 query_iter.next().ok_or_else(|| {
77 TransientError::Generic(anyhow::anyhow!("Invalid DDL query: {}", query))
78 })?,
79 ¤t_schema,
80 )?;
81 is_complete_event = true;
82 if ctx.table_info.contains_key(&table) {
83 trace!(%id, "timely-{worker_id} DDL change detected \
84 for {table:?}");
85 let info = &ctx.table_info[&table];
86 let mut conn = ctx
87 .connection_config
88 .connect(
89 &format!("timely-{worker_id} MySQL "),
90 &ctx.config.config.connection_context.ssh_tunnel_manager,
91 )
92 .await?;
93 for (err_output, err) in
94 verify_schemas(&mut *conn, btreemap! { &table => info }).await?
95 {
96 tracing::warn!(%id, "timely-{worker_id} DDL change \
97 verification error for {table:?}[{}]: {err:?}",
98 err_output.output_index);
99 let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
100 let update = (
101 (err_output.output_index, Err(err.into())),
102 new_gtid.clone(),
103 Diff::ONE,
104 );
105 let size = update.fuel_size();
106 ctx.data_output.give_fueled(>id_cap, update, size).await;
107 ctx.errored_outputs.insert(err_output.output_index);
108 }
109 }
110 }
111 (Some("drop"), Some("table")) => {
114 let mut conn = ctx
115 .connection_config
116 .connect(
117 &format!("timely-{worker_id} MySQL "),
118 &ctx.config.config.connection_context.ssh_tunnel_manager,
119 )
120 .await?;
121 let expected = ctx
122 .table_info
123 .iter()
124 .map(|(t, info)| {
125 (
126 t,
127 info.iter()
128 .filter(|output| !ctx.errored_outputs.contains(&output.output_index)),
129 )
130 })
131 .collect();
132 let schema_errors = verify_schemas(&mut *conn, expected).await?;
133 is_complete_event = true;
134 for (dropped_output, err) in schema_errors {
135 tracing::info!(%id, "timely-{worker_id} DDL change \
136 dropped output: {dropped_output:?}: {err:?}");
137 let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
138 let update = (
139 (dropped_output.output_index, Err(err.into())),
140 new_gtid.clone(),
141 Diff::ONE,
142 );
143 let size = std::mem::size_of_val(&update);
144 ctx.data_output.give_fueled(>id_cap, update, size).await;
145 ctx.errored_outputs.insert(dropped_output.output_index);
146 }
147 }
148 (Some("truncate"), Some(_)) => {
150 let second = second.expect("known to be Some");
152 let table = if second.eq_ignore_ascii_case("table") {
153 table_ident(
154 query_iter.next().ok_or_else(|| {
155 TransientError::Generic(anyhow::anyhow!("Invalid DDL query: {}", query))
156 })?,
157 ¤t_schema,
158 )?
159 } else {
160 table_ident(second, ¤t_schema)?
161 };
162 is_complete_event = true;
163 if ctx.table_info.contains_key(&table) {
164 trace!(%id, "timely-{worker_id} TRUNCATE detected \
165 for {table:?}");
166 if let Some(info) = ctx.table_info.get(&table) {
167 let gtid_cap = ctx.data_cap_set.delayed(new_gtid);
168 for output in info {
169 let update = (
170 (
171 output.output_index,
172 Err(DataflowError::from(DefiniteError::TableTruncated(
173 table.to_string(),
174 ))),
175 ),
176 new_gtid.clone(),
177 Diff::ONE,
178 );
179 let size = update.fuel_size();
180 ctx.data_output.give_fueled(>id_cap, update, size).await;
181 ctx.errored_outputs.insert(output.output_index);
182 }
183 }
184 }
185 }
186 (Some("commit"), None) => {
189 is_complete_event = true;
190 }
191 (Some("create"), Some("table")) => {
194 let mut peek_stream = query_iter.peekable();
198 let mut ctas = false;
199 while let Some(token) = peek_stream.next() {
200 if token.eq_ignore_ascii_case("start")
201 && peek_stream
202 .peek()
203 .is_some_and(|t| t.eq_ignore_ascii_case("transaction"))
204 {
205 ctas = true;
206 break;
207 }
208 }
209 is_complete_event = !ctas;
210 }
211 _ => {}
212 }
213
214 Ok(is_complete_event)
215}
216
/// Handles a [`RowsEventData`] (row insert/update/delete images) from the
/// replication stream.
///
/// Resolves the affected table via the event's table map event, packs each
/// row image into a `SourceMessage` for every non-errored output of that
/// table, and emits the updates at `new_gtid`. For outputs with an active
/// rewind request whose `snapshot_upper` does not cover `new_gtid`, a negated
/// copy of each update is buffered in `event_buffer` and emitted afterwards
/// at the rewind capability with the minimum GTID partition — presumably to
/// retract rows already captured by the snapshot (TODO confirm against the
/// rewind operator).
///
/// Returns `Ok(())` on success; transient errors (missing table map event,
/// row conversion failure, non-decode pack errors) are propagated.
pub(super) async fn handle_rows_event(
    event: RowsEventData<'_>,
    ctx: &ReplContext<'_>,
    new_gtid: &GtidPartition,
    event_buffer: &mut Vec<(
        (usize, Result<SourceMessage, DataflowError>),
        GtidPartition,
        mz_repr::Diff,
    )>,
) -> Result<(), TransientError> {
    let (id, worker_id) = (ctx.config.id, ctx.config.worker_id);

    // Resolve which table this event applies to via the stream's table map.
    let binlog_table_id = event.table_id();
    let table_map_event = ctx
        .stream
        .get_ref()
        .get_tme(binlog_table_id)
        .ok_or_else(|| TransientError::Generic(anyhow::anyhow!("Table map event not found")))?;
    let table = MySqlTableName::new(
        &*table_map_event.database_name(),
        &*table_map_event.table_name(),
    );

    // Collect the table's outputs that have not already errored; if the table
    // is not ingested at all (`None`), there is nothing to do.
    let outputs = ctx.table_info.get(&table).map(|outputs| {
        outputs
            .into_iter()
            .filter(|output| !ctx.errored_outputs.contains(&output.output_index))
            .collect::<Vec<_>>()
    });
    let outputs = match outputs {
        Some(outputs) => outputs,
        None => {
            return Ok(());
        }
    };

    trace!(%id, "timely-{worker_id} handling RowsEvent for {table:?}");

    // Capability for emitting updates at this event's GTID.
    let gtid_cap = ctx.data_cap_set.delayed(new_gtid);

    // Scratch row reused across `pack_mysql_row` calls to avoid reallocating
    // per row.
    let mut final_row = Row::default();
    let mut rows_iter = event.rows(table_map_event);
    let mut rewind_count = 0;
    let mut additions = 0;
    let mut retractions = 0;
    // NOTE(review): an `Err` from the rows iterator silently terminates this
    // loop instead of propagating a TransientError — confirm this is
    // intentional and not a swallowed decode failure.
    while let Some(Ok((before_row, after_row))) = rows_iter.next() {
        // (before, after) presence encodes the operation: update, insert, or
        // delete respectively; (None, None) is counted as nothing.
        match (&before_row, &after_row) {
            (None, None) => {}
            (Some(_), Some(_)) => {
                ctx.metrics.updates.inc();
            }
            (None, Some(_)) => {
                ctx.metrics.inserts.inc();
            }
            (Some(_), None) => {
                ctx.metrics.deletes.inc();
            }
        }

        // An update becomes a retraction of the before image plus an addition
        // of the after image; inserts/deletes contribute one of the two.
        let updates = [
            before_row.map(|r| (r, Diff::MINUS_ONE)),
            after_row.map(|r| (r, Diff::ONE)),
        ];
        for (binlog_row, diff) in updates.into_iter().flatten() {
            let row = mysql_async::Row::try_from(binlog_row)?;
            // `repeat_clone` hands each output its own copy of the row.
            for (output, row_val) in outputs.iter().repeat_clone(row) {
                let event = match pack_mysql_row(&mut final_row, row_val, &output.desc) {
                    Ok(row) => Ok(SourceMessage {
                        key: Row::default(),
                        value: row,
                        metadata: Row::default(),
                    }),
                    // Decode failures are definite errors delivered to the
                    // output; all other pack errors are transient and abort.
                    Err(err @ MySqlError::ValueDecodeError { .. }) => Err(DataflowError::from(
                        DefiniteError::ValueDecodeError(err.to_string()),
                    )),
                    Err(err) => Err(err)?,
                };

                let data = (output.output_index, event);

                // Buffer a negated update at the minimum GTID for outputs
                // whose snapshot did not yet cover this event's GTID.
                if let Some((_rewind_data_cap, rewind_req)) = ctx.rewinds.get(&output.output_index)
                {
                    if !rewind_req.snapshot_upper.less_equal(new_gtid) {
                        rewind_count += 1;
                        event_buffer.push((data.clone(), GtidPartition::minimum(), -diff));
                    }
                }
                if diff.is_positive() {
                    additions += 1;
                } else {
                    retractions += 1;
                }
                let update = (data, new_gtid.clone(), diff);
                let size = update.fuel_size();
                ctx.data_output.give_fueled(&gtid_cap, update, size).await;
            }
        }
    }

    // Emit the buffered rewind updates at each output's rewind capability.
    if !event_buffer.is_empty() {
        for d in event_buffer.drain(..) {
            // The buffer only ever holds entries for outputs present in
            // `ctx.rewinds` (checked above), so the unwrap cannot fail here.
            let (rewind_data_cap, _) = ctx.rewinds.get(&d.0.0).unwrap();
            let size = d.fuel_size();
            ctx.data_output.give_fueled(rewind_data_cap, d, size).await;
        }
    }

    trace!(
        %id,
        "timely-{worker_id} sent updates for {new_gtid:?} \
        with {} updates ({} additions, {} retractions) and {} \
        rewinds",
        additions + retractions,
        additions,
        retractions,
        rewind_count,
    );

    Ok(())
}