1use std::fmt;
13use std::io;
14use std::num::NonZeroU64;
15use std::sync::LazyLock;
16
17use mz_proto::{IntoRustIfSome, RustType, TryFromProtoError};
18use mz_repr::CatalogItemId;
19use mz_repr::GlobalId;
20use mz_repr::{Datum, RelationDesc, Row, ScalarType};
21use mz_timely_util::order::Partitioned;
22use mz_timely_util::order::Step;
23use proptest::prelude::any;
24use proptest::strategy::Strategy;
25use proptest_derive::Arbitrary;
26use serde::{Deserialize, Serialize};
27use timely::order::{PartialOrder, TotalOrder};
28use timely::progress::Antichain;
29use timely::progress::timestamp::{PathSummary, Refines, Timestamp};
30use uuid::Uuid;
31
32use crate::AlterCompatible;
33use crate::connections::inline::{
34 ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
35 ReferencedConnection,
36};
37use crate::controller::AlterError;
38use crate::sources::{SourceConnection, SourceTimestamp};
39
40use super::SourceExportDetails;
41
42include!(concat!(
43 env!("OUT_DIR"),
44 "/mz_storage_types.sources.mysql.rs"
45));
46
/// Configuration of a MySQL source.
///
/// `C` selects how the MySQL connection is represented (see
/// [`ConnectionAccess`]); it defaults to [`InlinedConnection`].
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct MySqlSourceConnection<C: ConnectionAccess = InlinedConnection> {
    /// Catalog item ID of the connection this source uses.
    pub connection_id: CatalogItemId,
    /// The MySQL connection, in the representation selected by `C`.
    pub connection: C::MySql,
    /// Source-level details.
    pub details: MySqlSourceDetails,
}
53
54impl<R: ConnectionResolver> IntoInlineConnection<MySqlSourceConnection, R>
55 for MySqlSourceConnection<ReferencedConnection>
56{
57 fn into_inline_connection(self, r: R) -> MySqlSourceConnection {
58 let MySqlSourceConnection {
59 connection_id,
60 connection,
61 details,
62 } = self;
63
64 MySqlSourceConnection {
65 connection_id,
66 connection: r.resolve_connection(connection).unwrap_mysql(),
67 details,
68 }
69 }
70}
71
72pub static MYSQL_PROGRESS_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
73 RelationDesc::builder()
74 .with_column("source_id_lower", ScalarType::Uuid.nullable(false))
75 .with_column("source_id_upper", ScalarType::Uuid.nullable(false))
76 .with_column("transaction_id", ScalarType::UInt64.nullable(true))
77 .finish()
78});
79
80impl MySqlSourceConnection {
81 pub async fn fetch_write_frontier(
82 self,
83 storage_configuration: &crate::configuration::StorageConfiguration,
84 ) -> Result<timely::progress::Antichain<GtidPartition>, anyhow::Error> {
85 let config = self
86 .connection
87 .config(
88 &storage_configuration.connection_context.secrets_reader,
89 storage_configuration,
90 mz_ore::future::InTask::No,
91 )
92 .await?;
93
94 let mut conn = config
95 .connect(
96 "mysql fetch_write_frontier",
97 &storage_configuration.connection_context.ssh_tunnel_manager,
98 )
99 .await?;
100
101 let current_gtid_set =
102 mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
103
104 let current_upper = gtid_set_frontier(¤t_gtid_set)?;
105
106 Ok(current_upper)
107 }
108}
109
110impl<C: ConnectionAccess> SourceConnection for MySqlSourceConnection<C> {
111 fn name(&self) -> &'static str {
112 "mysql"
113 }
114
115 fn external_reference(&self) -> Option<&str> {
116 None
117 }
118
119 fn default_key_desc(&self) -> RelationDesc {
120 RelationDesc::empty()
121 }
122
123 fn default_value_desc(&self) -> RelationDesc {
124 RelationDesc::empty()
127 }
128
129 fn timestamp_desc(&self) -> RelationDesc {
130 MYSQL_PROGRESS_DESC.clone()
131 }
132
133 fn connection_id(&self) -> Option<CatalogItemId> {
134 Some(self.connection_id)
135 }
136
137 fn primary_export_details(&self) -> SourceExportDetails {
138 SourceExportDetails::None
139 }
140
141 fn supports_read_only(&self) -> bool {
142 false
143 }
144
145 fn prefers_single_replica(&self) -> bool {
146 true
147 }
148}
149
150impl<C: ConnectionAccess> AlterCompatible for MySqlSourceConnection<C> {
151 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
152 if self == other {
153 return Ok(());
154 }
155
156 let MySqlSourceConnection {
157 connection_id,
158 connection,
159 details,
160 } = self;
161
162 let compatibility_checks = [
163 (connection_id == &other.connection_id, "connection_id"),
164 (
165 connection.alter_compatible(id, &other.connection).is_ok(),
166 "connection",
167 ),
168 (
169 details.alter_compatible(id, &other.details).is_ok(),
170 "details",
171 ),
172 ];
173
174 for (compatible, field) in compatibility_checks {
175 if !compatible {
176 tracing::warn!(
177 "MySqlSourceConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
178 self,
179 other
180 );
181
182 return Err(AlterError { id });
183 }
184 }
185
186 Ok(())
187 }
188}
189
190impl RustType<ProtoMySqlSourceConnection> for MySqlSourceConnection {
191 fn into_proto(&self) -> ProtoMySqlSourceConnection {
192 ProtoMySqlSourceConnection {
193 connection: Some(self.connection.into_proto()),
194 connection_id: Some(self.connection_id.into_proto()),
195 details: Some(self.details.into_proto()),
196 }
197 }
198
199 fn from_proto(proto: ProtoMySqlSourceConnection) -> Result<Self, TryFromProtoError> {
200 Ok(MySqlSourceConnection {
201 connection: proto
202 .connection
203 .into_rust_if_some("ProtoMySqlSourceConnection::connection")?,
204 connection_id: proto
205 .connection_id
206 .into_rust_if_some("ProtoMySqlSourceConnection::connection_id")?,
207 details: proto
208 .details
209 .into_rust_if_some("ProtoMySqlSourceConnection::details")?,
210 })
211 }
212}
213
/// Source-level details of a MySQL source. Currently this type has no fields.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct MySqlSourceDetails {}
219
220impl RustType<ProtoMySqlSourceDetails> for MySqlSourceDetails {
221 fn into_proto(&self) -> ProtoMySqlSourceDetails {
222 ProtoMySqlSourceDetails {}
223 }
224
225 fn from_proto(_proto: ProtoMySqlSourceDetails) -> Result<Self, TryFromProtoError> {
226 Ok(MySqlSourceDetails {})
227 }
228}
229
230impl AlterCompatible for MySqlSourceDetails {
231 fn alter_compatible(
232 &self,
233 _id: GlobalId,
234 _other: &Self,
235 ) -> Result<(), crate::controller::AlterError> {
236 Ok(())
237 }
238}
239
240fn any_gtidset() -> impl Strategy<Value = String> {
241 any::<(u128, u64)>().prop_map(|(uuid, tx_id)| format!("{}:{}", Uuid::from_u128(uuid), tx_id))
242}
243
/// Per-export details of a table ingested by a MySQL source.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct MySqlSourceExportDetails {
    /// Description of the upstream MySQL table.
    pub table: mz_mysql_util::MySqlTableDesc,
    /// GTID set string recorded for this export. NOTE(review): presumably the
    /// upstream `gtid_executed` at export creation — confirm. Generated for
    /// property tests as a single `<uuid>:<transaction id>` entry.
    #[proptest(strategy = "any_gtidset()")]
    pub initial_gtid_set: String,
    /// Column names listed as text columns. NOTE(review): presumably decoded
    /// as text rather than their native type — confirm.
    pub text_columns: Vec<String>,
    /// Column names listed as excluded. NOTE(review): presumably dropped from
    /// ingestion — confirm.
    pub exclude_columns: Vec<String>,
}
257
258impl RustType<ProtoMySqlSourceExportDetails> for MySqlSourceExportDetails {
259 fn into_proto(&self) -> ProtoMySqlSourceExportDetails {
260 ProtoMySqlSourceExportDetails {
261 table: Some(self.table.into_proto()),
262 initial_gtid_set: self.initial_gtid_set.clone(),
263 text_columns: self.text_columns.clone(),
264 exclude_columns: self.exclude_columns.clone(),
265 }
266 }
267
268 fn from_proto(proto: ProtoMySqlSourceExportDetails) -> Result<Self, TryFromProtoError> {
269 Ok(MySqlSourceExportDetails {
270 table: proto
271 .table
272 .into_rust_if_some("ProtoMySqlSourceExportDetails::table")?,
273 initial_gtid_set: proto.initial_gtid_set,
274 text_columns: proto.text_columns,
275 exclude_columns: proto.exclude_columns,
276 })
277 }
278}
279
280impl AlterCompatible for MySqlSourceExportDetails {
281 fn alter_compatible(
282 &self,
283 _id: GlobalId,
284 _other: &Self,
285 ) -> Result<(), crate::controller::AlterError> {
286 let Self {
289 table: _,
290 initial_gtid_set: _,
291 text_columns: _,
292 exclude_columns: _,
293 } = self;
294 Ok(())
295 }
296}
297
/// Progress state of a single MySQL source UUID (or a range of UUIDs).
///
/// Variant order is significant: the derived `Ord` places `Absent` before
/// every `Active` state and orders `Active` states by transaction id.
#[derive(Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum GtidState {
    /// No executed transactions are known for this UUID (range).
    Absent,

    /// One past the highest executed transaction id known for this UUID.
    Active(NonZeroU64),
}
312
313impl fmt::Display for GtidState {
314 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
315 match self {
316 GtidState::Absent => write!(f, "Absent"),
317 GtidState::Active(id) => write!(f, "{}", id),
318 }
319 }
320}
321
322impl GtidState {
323 pub const MAX: GtidState = GtidState::Active(NonZeroU64::MAX);
324}
325
326impl Timestamp for GtidState {
327 type Summary = ();
329
330 fn minimum() -> Self {
331 GtidState::Absent
332 }
333}
334
335impl TotalOrder for GtidState {}
336
337impl PartialOrder for GtidState {
338 fn less_equal(&self, other: &Self) -> bool {
339 self <= other
340 }
341}
342
343impl PathSummary<GtidState> for () {
344 fn results_in(&self, src: &GtidState) -> Option<GtidState> {
345 Some(*src)
346 }
347
348 fn followed_by(&self, _other: &Self) -> Option<Self> {
349 Some(())
350 }
351}
352
353impl Refines<()> for GtidState {
354 fn to_inner(_other: ()) -> Self {
355 Self::minimum()
356 }
357
358 fn to_outer(self) -> () {}
359
360 fn summarize(_path: Self::Summary) -> <() as Timestamp>::Summary {}
361}
362
/// A MySQL frontier element: a [`GtidState`] assigned to an inclusive range
/// (or single value) of source UUIDs.
pub type GtidPartition = Partitioned<Uuid, GtidState>;
388
389impl SourceTimestamp for GtidPartition {
390 fn encode_row(&self) -> Row {
391 let ts = match self.timestamp() {
392 GtidState::Absent => Datum::Null,
393 GtidState::Active(id) => Datum::UInt64(id.get()),
394 };
395 Row::pack(&[
396 Datum::Uuid(self.interval().lower),
397 Datum::Uuid(self.interval().upper),
398 ts,
399 ])
400 }
401
402 fn decode_row(row: &Row) -> Self {
403 let mut datums = row.iter();
404 match (datums.next(), datums.next(), datums.next(), datums.next()) {
405 (Some(Datum::Uuid(lower)), Some(Datum::Uuid(upper)), Some(Datum::UInt64(ts)), None) => {
406 match ts {
407 0 => Partitioned::new_range(lower, upper, GtidState::Absent),
408 ts => Partitioned::new_range(
409 lower,
410 upper,
411 GtidState::Active(NonZeroU64::new(ts).unwrap()),
412 ),
413 }
414 }
415 (Some(Datum::Uuid(lower)), Some(Datum::Uuid(upper)), Some(Datum::Null), None) => {
416 Partitioned::new_range(lower, upper, GtidState::Absent)
417 }
418 _ => panic!("invalid row {row:?}"),
419 }
420 }
421}
422
423pub fn gtid_set_frontier(gtid_set_str: &str) -> Result<Antichain<GtidPartition>, io::Error> {
433 let mut partitions = Antichain::new();
434 let mut gap_lower = Some(Uuid::nil());
435 for mut gtid_str in gtid_set_str.split(',') {
436 if gtid_str.is_empty() {
437 continue;
438 };
439 gtid_str = gtid_str.trim();
440 let (uuid, intervals) = gtid_str.split_once(':').ok_or_else(|| {
441 std::io::Error::new(
442 std::io::ErrorKind::InvalidData,
443 format!("invalid gtid: {}", gtid_str),
444 )
445 })?;
446
447 let uuid = Uuid::parse_str(uuid).map_err(|e| {
448 io::Error::new(
449 io::ErrorKind::InvalidData,
450 format!("invalid uuid in gtid: {}: {}", uuid, e),
451 )
452 })?;
453
454 let mut intervals = intervals.split(':');
463 let end = match (intervals.next(), intervals.next()) {
464 (Some(interval_str), None) => {
465 let mut vals_iter = interval_str.split('-').map(str::parse::<u64>);
466 let start = vals_iter
467 .next()
468 .ok_or_else(|| {
469 io::Error::new(
470 io::ErrorKind::InvalidData,
471 format!("couldn't parse int: {}", interval_str),
472 )
473 })?
474 .map_err(|e| {
475 io::Error::new(
476 io::ErrorKind::InvalidData,
477 format!("couldn't parse int: {}: {}", interval_str, e),
478 )
479 })?;
480 match vals_iter.next() {
481 Some(Ok(end)) => end,
482 None => start,
483 _ => {
484 return Err(io::Error::new(
485 io::ErrorKind::InvalidData,
486 format!("invalid gtid interval: {}", interval_str),
487 ));
488 }
489 }
490 }
491 _ => {
492 return Err(io::Error::new(
493 io::ErrorKind::InvalidData,
494 format!("gtid with non-consecutive intervals found! {}", gtid_str),
495 ));
496 }
497 };
498 if let Some(gap_upper) = uuid.backward_checked(1) {
500 let gap_lower = gap_lower.expect("uuids are in alphabetical order");
501 if gap_upper >= gap_lower {
502 partitions.insert(GtidPartition::new_range(
503 gap_lower,
504 gap_upper,
505 GtidState::Absent,
506 ));
507 } else {
508 return Err(io::Error::new(
509 io::ErrorKind::InvalidData,
510 format!(
511 "gtid set not presented in alphabetical uuid order: {}",
512 gtid_set_str
513 ),
514 ));
515 }
516 }
517 gap_lower = uuid.forward_checked(1);
518 partitions.insert(GtidPartition::new_singleton(
520 uuid,
521 GtidState::Active(NonZeroU64::new(end + 1).unwrap()),
522 ));
523 }
524
525 if let Some(gap_lower) = gap_lower {
527 partitions.insert(GtidPartition::new_range(
528 gap_lower,
529 Uuid::max(),
530 GtidState::Absent,
531 ));
532 }
533
534 Ok(partitions)
535}
536
#[cfg(test)]
mod tests {

    use mz_ore::assert_err;

    use super::*;
    use std::num::NonZeroU64;

    /// Parses a UUID fixture, panicking if the literal itself is malformed.
    fn parse_uuid(s: &str) -> Uuid {
        Uuid::parse_str(s).unwrap()
    }

    /// Builds `GtidState::Active(tx)`; `tx` must be non-zero.
    fn active(tx: u64) -> GtidState {
        GtidState::Active(NonZeroU64::new(tx).unwrap())
    }

    #[mz_ore::test]
    fn test_gtid_set_frontier_valid() {
        let input = "14c1b43a-eb64-11eb-8a9a-0242ac130002:1, 2174B383-5441-11E8-B90A-C80AA9429562:1-3, 3E11FA47-71CA-11E1-9E33-C80AA9429562:1-19";
        let frontier = gtid_set_frontier(input).unwrap();
        assert_eq!(frontier.len(), 7);

        // One singleton per listed UUID, plus `Absent` ranges before, between
        // and after them.
        let expected = Antichain::from_iter([
            GtidPartition::new_range(
                Uuid::nil(),
                parse_uuid("14c1b43a-eb64-11eb-8a9a-0242ac130001"),
                GtidState::Absent,
            ),
            GtidPartition::new_singleton(
                parse_uuid("14c1b43a-eb64-11eb-8a9a-0242ac130002"),
                active(2),
            ),
            GtidPartition::new_range(
                parse_uuid("14c1b43a-eb64-11eb-8a9a-0242ac130003"),
                parse_uuid("2174B383-5441-11E8-B90A-C80AA9429561"),
                GtidState::Absent,
            ),
            GtidPartition::new_singleton(
                parse_uuid("2174B383-5441-11E8-B90A-C80AA9429562"),
                active(4),
            ),
            GtidPartition::new_range(
                parse_uuid("2174B383-5441-11E8-B90A-C80AA9429563"),
                parse_uuid("3E11FA47-71CA-11E1-9E33-C80AA9429561"),
                GtidState::Absent,
            ),
            GtidPartition::new_singleton(
                parse_uuid("3E11FA47-71CA-11E1-9E33-C80AA9429562"),
                active(20),
            ),
            GtidPartition::new_range(
                parse_uuid("3E11FA47-71CA-11E1-9E33-C80AA9429563"),
                Uuid::max(),
                GtidState::Absent,
            ),
        ]);
        assert_eq!(frontier, expected);
    }

    #[mz_ore::test]
    fn test_gtid_set_frontier_non_alphabetical_uuids() {
        let input =
            "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-19, 2174B383-5441-11E8-B90A-C80AA9429562:1-3";
        assert_err!(gtid_set_frontier(input));
    }

    #[mz_ore::test]
    fn test_gtid_set_frontier_non_consecutive() {
        let input = "2174B383-5441-11E8-B90A-C80AA9429562:1-3:5-8, 3E11FA47-71CA-11E1-9E33-C80AA9429562:1-19";
        assert_err!(gtid_set_frontier(input));
    }

    #[mz_ore::test]
    fn test_gtid_set_frontier_invalid_uuid() {
        let input =
            "14c1b43a-eb64-11eb-8a9a-0242ac130002:1-5,24DA167-0C0C-11E8-8442-00059A3C7B00:1";
        assert_err!(gtid_set_frontier(input));
    }

    #[mz_ore::test]
    fn test_gtid_set_frontier_invalid_interval() {
        let input =
            "14c1b43a-eb64-11eb-8a9a-0242ac130002:1-5,14c1b43a-eb64-11eb-8a9a-0242ac130003:1-3:4";
        assert_err!(gtid_set_frontier(input));
    }

    #[mz_ore::test]
    fn test_gtid_set_frontier_empty_string() {
        let frontier = gtid_set_frontier("").unwrap();
        assert_eq!(frontier.len(), 1);
        let whole_space = GtidPartition::new_range(Uuid::nil(), Uuid::max(), GtidState::Absent);
        assert_eq!(frontier, Antichain::from_elem(whole_space));
    }
}