mz_storage_types/sources/
mysql.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types related to mysql sources
11
12use std::fmt;
13use std::io;
14use std::num::NonZeroU64;
15use std::sync::LazyLock;
16
17use mz_proto::{IntoRustIfSome, RustType, TryFromProtoError};
18use mz_repr::CatalogItemId;
19use mz_repr::GlobalId;
20use mz_repr::{Datum, RelationDesc, Row, ScalarType};
21use mz_timely_util::order::Partitioned;
22use mz_timely_util::order::Step;
23use proptest::prelude::any;
24use proptest::strategy::Strategy;
25use proptest_derive::Arbitrary;
26use serde::{Deserialize, Serialize};
27use timely::order::{PartialOrder, TotalOrder};
28use timely::progress::Antichain;
29use timely::progress::timestamp::{PathSummary, Refines, Timestamp};
30use uuid::Uuid;
31
32use crate::AlterCompatible;
33use crate::connections::inline::{
34    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
35    ReferencedConnection,
36};
37use crate::controller::AlterError;
38use crate::sources::{SourceConnection, SourceTimestamp};
39
40use super::SourceExportDetails;
41
42include!(concat!(
43    env!("OUT_DIR"),
44    "/mz_storage_types.sources.mysql.rs"
45));
46
/// The description of a MySQL source: which connection it uses plus any
/// MySQL-specific source details.
///
/// The type parameter `C` selects how the connection itself is stored (see
/// [`ConnectionAccess`]); it defaults to [`InlinedConnection`].
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct MySqlSourceConnection<C: ConnectionAccess = InlinedConnection> {
    /// The catalog item ID reported through `SourceConnection::connection_id`.
    pub connection_id: CatalogItemId,
    /// The MySQL connection, in the representation dictated by `C`.
    pub connection: C::MySql,
    /// MySQL-specific details of the source.
    pub details: MySqlSourceDetails,
}
53
54impl<R: ConnectionResolver> IntoInlineConnection<MySqlSourceConnection, R>
55    for MySqlSourceConnection<ReferencedConnection>
56{
57    fn into_inline_connection(self, r: R) -> MySqlSourceConnection {
58        let MySqlSourceConnection {
59            connection_id,
60            connection,
61            details,
62        } = self;
63
64        MySqlSourceConnection {
65            connection_id,
66            connection: r.resolve_connection(connection).unwrap_mysql(),
67            details,
68        }
69    }
70}
71
/// The relation description returned by `timestamp_desc` for MySQL sources.
///
/// The three columns line up with what `GtidPartition::encode_row` packs: the
/// lower and upper bound of a range of source UUIDs (both non-nullable), and a
/// nullable transaction id, which is `NULL` when the state is
/// `GtidState::Absent`.
pub static MYSQL_PROGRESS_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
    RelationDesc::builder()
        .with_column("source_id_lower", ScalarType::Uuid.nullable(false))
        .with_column("source_id_upper", ScalarType::Uuid.nullable(false))
        .with_column("transaction_id", ScalarType::UInt64.nullable(true))
        .finish()
});
79
80impl MySqlSourceConnection {
81    pub async fn fetch_write_frontier(
82        self,
83        storage_configuration: &crate::configuration::StorageConfiguration,
84    ) -> Result<timely::progress::Antichain<GtidPartition>, anyhow::Error> {
85        let config = self
86            .connection
87            .config(
88                &storage_configuration.connection_context.secrets_reader,
89                storage_configuration,
90                mz_ore::future::InTask::No,
91            )
92            .await?;
93
94        let mut conn = config
95            .connect(
96                "mysql fetch_write_frontier",
97                &storage_configuration.connection_context.ssh_tunnel_manager,
98            )
99            .await?;
100
101        let current_gtid_set =
102            mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
103
104        let current_upper = gtid_set_frontier(&current_gtid_set)?;
105
106        Ok(current_upper)
107    }
108}
109
110impl<C: ConnectionAccess> SourceConnection for MySqlSourceConnection<C> {
111    fn name(&self) -> &'static str {
112        "mysql"
113    }
114
115    fn external_reference(&self) -> Option<&str> {
116        None
117    }
118
119    fn default_key_desc(&self) -> RelationDesc {
120        RelationDesc::empty()
121    }
122
123    fn default_value_desc(&self) -> RelationDesc {
124        // The MySQL source only outputs data to its subsources. The catalog object
125        // representing the source itself is just an empty relation with no columns
126        RelationDesc::empty()
127    }
128
129    fn timestamp_desc(&self) -> RelationDesc {
130        MYSQL_PROGRESS_DESC.clone()
131    }
132
133    fn connection_id(&self) -> Option<CatalogItemId> {
134        Some(self.connection_id)
135    }
136
137    fn primary_export_details(&self) -> SourceExportDetails {
138        SourceExportDetails::None
139    }
140
141    fn supports_read_only(&self) -> bool {
142        false
143    }
144
145    fn prefers_single_replica(&self) -> bool {
146        true
147    }
148}
149
150impl<C: ConnectionAccess> AlterCompatible for MySqlSourceConnection<C> {
151    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
152        if self == other {
153            return Ok(());
154        }
155
156        let MySqlSourceConnection {
157            connection_id,
158            connection,
159            details,
160        } = self;
161
162        let compatibility_checks = [
163            (connection_id == &other.connection_id, "connection_id"),
164            (
165                connection.alter_compatible(id, &other.connection).is_ok(),
166                "connection",
167            ),
168            (
169                details.alter_compatible(id, &other.details).is_ok(),
170                "details",
171            ),
172        ];
173
174        for (compatible, field) in compatibility_checks {
175            if !compatible {
176                tracing::warn!(
177                    "MySqlSourceConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
178                    self,
179                    other
180                );
181
182                return Err(AlterError { id });
183            }
184        }
185
186        Ok(())
187    }
188}
189
190impl RustType<ProtoMySqlSourceConnection> for MySqlSourceConnection {
191    fn into_proto(&self) -> ProtoMySqlSourceConnection {
192        ProtoMySqlSourceConnection {
193            connection: Some(self.connection.into_proto()),
194            connection_id: Some(self.connection_id.into_proto()),
195            details: Some(self.details.into_proto()),
196        }
197    }
198
199    fn from_proto(proto: ProtoMySqlSourceConnection) -> Result<Self, TryFromProtoError> {
200        Ok(MySqlSourceConnection {
201            connection: proto
202                .connection
203                .into_rust_if_some("ProtoMySqlSourceConnection::connection")?,
204            connection_id: proto
205                .connection_id
206                .into_rust_if_some("ProtoMySqlSourceConnection::connection_id")?,
207            details: proto
208                .details
209                .into_rust_if_some("ProtoMySqlSourceConnection::details")?,
210        })
211    }
212}
213
/// This struct allows storing any mysql-specific details for a source, serialized as
/// an option in the `CREATE SOURCE` statement. It was previously used but is not currently
/// necessary, though we keep it around to maintain conformity with other sources.
///
/// It currently has no fields.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct MySqlSourceDetails {}
219
220impl RustType<ProtoMySqlSourceDetails> for MySqlSourceDetails {
221    fn into_proto(&self) -> ProtoMySqlSourceDetails {
222        ProtoMySqlSourceDetails {}
223    }
224
225    fn from_proto(_proto: ProtoMySqlSourceDetails) -> Result<Self, TryFromProtoError> {
226        Ok(MySqlSourceDetails {})
227    }
228}
229
230impl AlterCompatible for MySqlSourceDetails {
231    fn alter_compatible(
232        &self,
233        _id: GlobalId,
234        _other: &Self,
235    ) -> Result<(), crate::controller::AlterError> {
236        Ok(())
237    }
238}
239
240fn any_gtidset() -> impl Strategy<Value = String> {
241    any::<(u128, u64)>().prop_map(|(uuid, tx_id)| format!("{}:{}", Uuid::from_u128(uuid), tx_id))
242}
243
/// Specifies the details of a MySQL source export.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct MySqlSourceExportDetails {
    /// Description of the exported table.
    pub table: mz_mysql_util::MySqlTableDesc,
    /// The initial 'gtid_executed' set for this export.
    /// This is used as the effective snapshot point for this export to ensure correctness
    /// if the source is interrupted but commits one or more tables before the initial snapshot
    /// of all tables is complete.
    #[proptest(strategy = "any_gtidset()")]
    pub initial_gtid_set: String,
    /// NOTE(review): presumably the names of columns to be decoded as text — confirm
    /// against the code that consumes this field.
    pub text_columns: Vec<String>,
    /// NOTE(review): presumably the names of columns to leave out of the export — confirm
    /// against the code that consumes this field.
    pub exclude_columns: Vec<String>,
}
257
258impl RustType<ProtoMySqlSourceExportDetails> for MySqlSourceExportDetails {
259    fn into_proto(&self) -> ProtoMySqlSourceExportDetails {
260        ProtoMySqlSourceExportDetails {
261            table: Some(self.table.into_proto()),
262            initial_gtid_set: self.initial_gtid_set.clone(),
263            text_columns: self.text_columns.clone(),
264            exclude_columns: self.exclude_columns.clone(),
265        }
266    }
267
268    fn from_proto(proto: ProtoMySqlSourceExportDetails) -> Result<Self, TryFromProtoError> {
269        Ok(MySqlSourceExportDetails {
270            table: proto
271                .table
272                .into_rust_if_some("ProtoMySqlSourceExportDetails::table")?,
273            initial_gtid_set: proto.initial_gtid_set,
274            text_columns: proto.text_columns,
275            exclude_columns: proto.exclude_columns,
276        })
277    }
278}
279
280impl AlterCompatible for MySqlSourceExportDetails {
281    fn alter_compatible(
282        &self,
283        _id: GlobalId,
284        _other: &Self,
285    ) -> Result<(), crate::controller::AlterError> {
286        // compatibility checks are performed against the upstream table in the source
287        // render operators instead
288        let Self {
289            table: _,
290            initial_gtid_set: _,
291            text_columns: _,
292            exclude_columns: _,
293        } = self;
294        Ok(())
295    }
296}
297
/// Represents a MySQL transaction id
///
/// The derived ordering sorts `Absent` before every `Active` value, and
/// `Active` values by their transaction id.
#[derive(Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum GtidState {
    // NOTE: The ordering of the variants is important for the derived order implementation
    /// Represents a MySQL server source-id that has not yet presented a GTID
    Absent,

    /// Represents an active MySQL server transaction-id for a given source.
    ///
    /// When used in a frontier / antichain, this represents the next transaction_id value that
    /// we expect to see for the corresponding source_id(s).
    /// When used as a timestamp, it represents an exact transaction_id value.
    Active(NonZeroU64),
}
312
313impl fmt::Display for GtidState {
314    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
315        match self {
316            GtidState::Absent => write!(f, "Absent"),
317            GtidState::Active(id) => write!(f, "{}", id),
318        }
319    }
320}
321
322impl GtidState {
323    pub const MAX: GtidState = GtidState::Active(NonZeroU64::MAX);
324}
325
326impl Timestamp for GtidState {
327    // No need to describe complex summaries
328    type Summary = ();
329
330    fn minimum() -> Self {
331        GtidState::Absent
332    }
333}
334
// Marker impl: `GtidState` derives `Ord`, and its `PartialOrder` impl delegates to that order.
impl TotalOrder for GtidState {}
336
337impl PartialOrder for GtidState {
338    fn less_equal(&self, other: &Self) -> bool {
339        self <= other
340    }
341}
342
343impl PathSummary<GtidState> for () {
344    fn results_in(&self, src: &GtidState) -> Option<GtidState> {
345        Some(*src)
346    }
347
348    fn followed_by(&self, _other: &Self) -> Option<Self> {
349        Some(())
350    }
351}
352
353impl Refines<()> for GtidState {
354    fn to_inner(_other: ()) -> Self {
355        Self::minimum()
356    }
357
358    fn to_outer(self) -> () {}
359
360    fn summarize(_path: Self::Summary) -> <() as Timestamp>::Summary {}
361}
362
/// This type is used to represent the progress of each MySQL GTID 'source_id' in the
/// ingestion dataflow.
///
/// A MySQL GTID consists of a source_id (UUID) and transaction_id (non-zero u64).
///
/// For the purposes of this documentation effort, the term "source" refers to a MySQL
/// server that is being replicated from:
/// <https://dev.mysql.com/doc/refman/8.0/en/replication-gtids-concepts.html>
///
/// Each source_id represents a unique MySQL server, and the transaction_id
/// monotonically increases for each source, representing the position of the transaction
/// relative to other transactions on the same source.
///
/// Partitioning is by source_id, which can be a singular UUID to represent a single source_id
/// or a range of UUIDs to represent multiple sources.
///
/// The value of the partition is the NEXT transaction_id that we expect to see for the
/// corresponding source(s), represented as `GtidState::Active(transaction_id)`.
///
/// `GtidState::Absent` represents that no transactions have been seen for the
/// corresponding source(s).
///
/// A complete Antichain of this type represents a frontier of the future transactions
/// that we might see.
pub type GtidPartition = Partitioned<Uuid, GtidState>;
388
389impl SourceTimestamp for GtidPartition {
390    fn encode_row(&self) -> Row {
391        let ts = match self.timestamp() {
392            GtidState::Absent => Datum::Null,
393            GtidState::Active(id) => Datum::UInt64(id.get()),
394        };
395        Row::pack(&[
396            Datum::Uuid(self.interval().lower),
397            Datum::Uuid(self.interval().upper),
398            ts,
399        ])
400    }
401
402    fn decode_row(row: &Row) -> Self {
403        let mut datums = row.iter();
404        match (datums.next(), datums.next(), datums.next(), datums.next()) {
405            (Some(Datum::Uuid(lower)), Some(Datum::Uuid(upper)), Some(Datum::UInt64(ts)), None) => {
406                match ts {
407                    0 => Partitioned::new_range(lower, upper, GtidState::Absent),
408                    ts => Partitioned::new_range(
409                        lower,
410                        upper,
411                        GtidState::Active(NonZeroU64::new(ts).unwrap()),
412                    ),
413                }
414            }
415            (Some(Datum::Uuid(lower)), Some(Datum::Uuid(upper)), Some(Datum::Null), None) => {
416                Partitioned::new_range(lower, upper, GtidState::Absent)
417            }
418            _ => panic!("invalid row {row:?}"),
419        }
420    }
421}
422
423/// Parses a GTID Set string received from a MySQL server (e.g. from @@gtid_purged or @@gtid_executed).
424///
425/// Returns the frontier of all future GTIDs that are not contained in the provided GTID set.
426///
427/// This includes singlular partitions that represent each UUID seen in the GTID Set, and range
428/// partitions that represent the missing UUIDs between the singular partitions, which are
429/// each set to GtidState::Absent.
430///
431/// TODO(roshan): Add compatibility for MySQL 8.3 'Tagged' GTIDs
432pub fn gtid_set_frontier(gtid_set_str: &str) -> Result<Antichain<GtidPartition>, io::Error> {
433    let mut partitions = Antichain::new();
434    let mut gap_lower = Some(Uuid::nil());
435    for mut gtid_str in gtid_set_str.split(',') {
436        if gtid_str.is_empty() {
437            continue;
438        };
439        gtid_str = gtid_str.trim();
440        let (uuid, intervals) = gtid_str.split_once(':').ok_or_else(|| {
441            std::io::Error::new(
442                std::io::ErrorKind::InvalidData,
443                format!("invalid gtid: {}", gtid_str),
444            )
445        })?;
446
447        let uuid = Uuid::parse_str(uuid).map_err(|e| {
448            io::Error::new(
449                io::ErrorKind::InvalidData,
450                format!("invalid uuid in gtid: {}: {}", uuid, e),
451            )
452        })?;
453
454        // From the MySQL docs:
455        // "When GTID sets are returned from server variables, UUIDs are in alphabetical order,
456        // and numeric intervals are merged and in ascending order."
457        // For our purposes, we need to ensure that all intervals are consecutive which means there
458        // should be at most one interval per GTID.
459        // TODO: should this same restriction be done when parsing a @@GTID_PURGED value? In that
460        // case the intervals might not be guaranteed to be consecutive, depending on how purging
461        // is implemented?
462        let mut intervals = intervals.split(':');
463        let end = match (intervals.next(), intervals.next()) {
464            (Some(interval_str), None) => {
465                let mut vals_iter = interval_str.split('-').map(str::parse::<u64>);
466                let start = vals_iter
467                    .next()
468                    .ok_or_else(|| {
469                        io::Error::new(
470                            io::ErrorKind::InvalidData,
471                            format!("couldn't parse int: {}", interval_str),
472                        )
473                    })?
474                    .map_err(|e| {
475                        io::Error::new(
476                            io::ErrorKind::InvalidData,
477                            format!("couldn't parse int: {}: {}", interval_str, e),
478                        )
479                    })?;
480                match vals_iter.next() {
481                    Some(Ok(end)) => end,
482                    None => start,
483                    _ => {
484                        return Err(io::Error::new(
485                            io::ErrorKind::InvalidData,
486                            format!("invalid gtid interval: {}", interval_str),
487                        ));
488                    }
489                }
490            }
491            _ => {
492                return Err(io::Error::new(
493                    io::ErrorKind::InvalidData,
494                    format!("gtid with non-consecutive intervals found! {}", gtid_str),
495                ));
496            }
497        };
498        // Create a partition representing all the UUIDs in the gap between the previous one and this one
499        if let Some(gap_upper) = uuid.backward_checked(1) {
500            let gap_lower = gap_lower.expect("uuids are in alphabetical order");
501            if gap_upper >= gap_lower {
502                partitions.insert(GtidPartition::new_range(
503                    gap_lower,
504                    gap_upper,
505                    GtidState::Absent,
506                ));
507            } else {
508                return Err(io::Error::new(
509                    io::ErrorKind::InvalidData,
510                    format!(
511                        "gtid set not presented in alphabetical uuid order: {}",
512                        gtid_set_str
513                    ),
514                ));
515            }
516        }
517        gap_lower = uuid.forward_checked(1);
518        // Insert a partition representing the 'next' GTID that might be seen from this source
519        partitions.insert(GtidPartition::new_singleton(
520            uuid,
521            GtidState::Active(NonZeroU64::new(end + 1).unwrap()),
522        ));
523    }
524
525    // Add the final range partition if there is a gap between the last partition and the maximum
526    if let Some(gap_lower) = gap_lower {
527        partitions.insert(GtidPartition::new_range(
528            gap_lower,
529            Uuid::max(),
530            GtidState::Absent,
531        ));
532    }
533
534    Ok(partitions)
535}
536
// Unit tests for `gtid_set_frontier`.
#[cfg(test)]
mod tests {

    use mz_ore::assert_err;

    use super::*;
    use std::num::NonZeroU64;

    // Three well-formed GTIDs in ascending UUID order produce three singleton partitions
    // (each at `end + 1`) interleaved with four `Absent` range partitions covering the gaps.
    #[mz_ore::test]
    fn test_gtid_set_frontier_valid() {
        let gtid_set_str = "14c1b43a-eb64-11eb-8a9a-0242ac130002:1, 2174B383-5441-11E8-B90A-C80AA9429562:1-3, 3E11FA47-71CA-11E1-9E33-C80AA9429562:1-19";
        let result = gtid_set_frontier(gtid_set_str).unwrap();
        assert_eq!(result.len(), 7);
        assert_eq!(
            result,
            Antichain::from_iter(vec![
                GtidPartition::new_range(
                    Uuid::nil(),
                    Uuid::parse_str("14c1b43a-eb64-11eb-8a9a-0242ac130001").unwrap(),
                    GtidState::Absent,
                ),
                GtidPartition::new_singleton(
                    Uuid::parse_str("14c1b43a-eb64-11eb-8a9a-0242ac130002").unwrap(),
                    GtidState::Active(NonZeroU64::new(2).unwrap()),
                ),
                GtidPartition::new_range(
                    Uuid::parse_str("14c1b43a-eb64-11eb-8a9a-0242ac130003").unwrap(),
                    Uuid::parse_str("2174B383-5441-11E8-B90A-C80AA9429561").unwrap(),
                    GtidState::Absent,
                ),
                GtidPartition::new_singleton(
                    Uuid::parse_str("2174B383-5441-11E8-B90A-C80AA9429562").unwrap(),
                    GtidState::Active(NonZeroU64::new(4).unwrap()),
                ),
                GtidPartition::new_range(
                    Uuid::parse_str("2174B383-5441-11E8-B90A-C80AA9429563").unwrap(),
                    Uuid::parse_str("3E11FA47-71CA-11E1-9E33-C80AA9429561").unwrap(),
                    GtidState::Absent,
                ),
                GtidPartition::new_singleton(
                    Uuid::parse_str("3E11FA47-71CA-11E1-9E33-C80AA9429562").unwrap(),
                    GtidState::Active(NonZeroU64::new(20).unwrap()),
                ),
                GtidPartition::new_range(
                    Uuid::parse_str("3E11FA47-71CA-11E1-9E33-C80AA9429563").unwrap(),
                    Uuid::max(),
                    GtidState::Absent,
                ),
            ]),
        )
    }

    // UUIDs presented in descending order must be rejected.
    #[mz_ore::test]
    fn test_gtid_set_frontier_non_alphabetical_uuids() {
        let gtid_set_str =
            "3E11FA47-71CA-11E1-9E33-C80AA9429562:1-19, 2174B383-5441-11E8-B90A-C80AA9429562:1-3";
        let result = gtid_set_frontier(gtid_set_str);
        assert_err!(result);
    }

    // A UUID with more than one interval (`1-3:5-8`) has a hole and must be rejected.
    #[mz_ore::test]
    fn test_gtid_set_frontier_non_consecutive() {
        let gtid_set_str = "2174B383-5441-11E8-B90A-C80AA9429562:1-3:5-8, 3E11FA47-71CA-11E1-9E33-C80AA9429562:1-19";
        let result = gtid_set_frontier(gtid_set_str);
        assert_err!(result);
    }

    // The second entry's UUID is malformed (its first group is too short).
    #[mz_ore::test]
    fn test_gtid_set_frontier_invalid_uuid() {
        let gtid_set_str =
            "14c1b43a-eb64-11eb-8a9a-0242ac130002:1-5,24DA167-0C0C-11E8-8442-00059A3C7B00:1";
        let result = gtid_set_frontier(gtid_set_str);
        assert_err!(result);
    }

    // The second entry carries two intervals (`1-3:4`), which is rejected.
    #[mz_ore::test]
    fn test_gtid_set_frontier_invalid_interval() {
        let gtid_set_str =
            "14c1b43a-eb64-11eb-8a9a-0242ac130002:1-5,14c1b43a-eb64-11eb-8a9a-0242ac130003:1-3:4";
        let result = gtid_set_frontier(gtid_set_str);
        assert_err!(result);
    }

    // An empty GTID set yields a single `Absent` partition spanning the whole UUID range.
    #[mz_ore::test]
    fn test_gtid_set_frontier_empty_string() {
        let gtid_set_str = "";
        let result = gtid_set_frontier(gtid_set_str).unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(
            result,
            Antichain::from_elem(GtidPartition::new_range(
                Uuid::nil(),
                Uuid::max(),
                GtidState::Absent,
            ))
        );
    }
}