use std::fmt;
use std::io;
use std::num::NonZeroU64;
use std::sync::LazyLock;
use mz_proto::{IntoRustIfSome, RustType, TryFromProtoError};
use mz_repr::CatalogItemId;
use mz_repr::GlobalId;
use mz_repr::{Datum, RelationDesc, Row, ScalarType};
use mz_timely_util::order::Partitioned;
use mz_timely_util::order::Step;
use proptest::prelude::any;
use proptest::strategy::Strategy;
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::timestamp::{PathSummary, Refines, Timestamp};
use timely::progress::Antichain;
use uuid::Uuid;
use crate::connections::inline::{
ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
ReferencedConnection,
};
use crate::controller::AlterError;
use crate::sources::{SourceConnection, SourceTimestamp};
use crate::AlterCompatible;
use super::SourceExportDetails;
// Textually includes the file `mz_storage_types.sources.mysql.rs` from the build's `OUT_DIR`.
// The `Proto*` types used by the `RustType` impls in this file are not declared here, so they
// presumably come from this generated file -- confirm against the build script.
include!(concat!(
env!("OUT_DIR"),
"/mz_storage_types.sources.mysql.rs"
));
/// Description of a MySQL source connection.
///
/// The type parameter `C` selects how the underlying MySQL connection is represented
/// (`C::MySql`); it defaults to [`InlinedConnection`].
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct MySqlSourceConnection<C: ConnectionAccess = InlinedConnection> {
/// Catalog ID of the connection object this source uses.
pub connection_id: CatalogItemId,
/// The MySQL connection itself, in the representation chosen by `C`.
pub connection: C::MySql,
/// Additional source details (see [`MySqlSourceDetails`]).
pub details: MySqlSourceDetails,
}
impl<R: ConnectionResolver> IntoInlineConnection<MySqlSourceConnection, R>
    for MySqlSourceConnection<ReferencedConnection>
{
    /// Turns a source connection that holds a connection reference into one that holds the
    /// resolved MySQL connection.
    ///
    /// The reference is resolved through `r` and then narrowed with `unwrap_mysql()`;
    /// `connection_id` and `details` are carried over unchanged.
    fn into_inline_connection(self, r: R) -> MySqlSourceConnection {
        // Take the struct apart field by field so every field is handled explicitly.
        let Self {
            connection_id,
            connection: referenced,
            details,
        } = self;
        let connection = r.resolve_connection(referenced).unwrap_mysql();
        MySqlSourceConnection {
            connection_id,
            connection,
            details,
        }
    }
}
/// Lazily built relation description with three columns: the non-nullable UUID columns
/// `source_id_lower` and `source_id_upper`, and the nullable unsigned 64-bit column
/// `transaction_id`.
///
/// This is the description returned by `timestamp_desc` for MySQL sources, and its column order
/// matches the rows produced by `GtidPartition::encode_row`.
pub static MYSQL_PROGRESS_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
    // Both bounds of the UUID range share the same column type.
    let non_null_uuid = || ScalarType::Uuid.nullable(false);
    let nullable_transaction_id = ScalarType::UInt64.nullable(true);
    RelationDesc::builder()
        .with_column("source_id_lower", non_null_uuid())
        .with_column("source_id_upper", non_null_uuid())
        .with_column("transaction_id", nullable_transaction_id)
        .finish()
});
impl MySqlSourceConnection {
    /// Connects to the MySQL server described by this connection and returns the frontier
    /// derived from the server's `global.gtid_executed` system variable.
    ///
    /// The steps are:
    /// 1. build the connection configuration from `storage_configuration`,
    /// 2. open a connection (using the configured SSH tunnel manager),
    /// 3. read `global.gtid_executed`,
    /// 4. convert that GTID set string with [`gtid_set_frontier`].
    ///
    /// # Errors
    ///
    /// Returns an error if the configuration cannot be built, the connection cannot be
    /// established, the system variable cannot be queried, or the GTID set cannot be parsed.
    pub async fn fetch_write_frontier(
        self,
        storage_configuration: &crate::configuration::StorageConfiguration,
    ) -> Result<timely::progress::Antichain<GtidPartition>, anyhow::Error> {
        let config = self
            .connection
            .config(
                &storage_configuration.connection_context.secrets_reader,
                storage_configuration,
                mz_ore::future::InTask::No,
            )
            .await?;

        let mut conn = config
            .connect(
                "mysql fetch_write_frontier",
                &storage_configuration.connection_context.ssh_tunnel_manager,
            )
            .await?;

        let current_gtid_set =
            mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

        // Pass the GTID set by reference; the frontier is computed purely from the string.
        let current_upper = gtid_set_frontier(&current_gtid_set)?;

        Ok(current_upper)
    }
}
/// `SourceConnection` implementation for MySQL sources, available for every connection
/// representation `C`. Apart from `timestamp_desc` and `connection_id`, every method returns a
/// fixed value.
impl<C: ConnectionAccess> SourceConnection for MySqlSourceConnection<C> {
/// Returns the fixed name `"mysql"`.
fn name(&self) -> &'static str {
"mysql"
}
/// Always `None`: no external reference is reported for MySQL sources.
fn external_reference(&self) -> Option<&str> {
None
}
/// Returns an empty relation description.
fn default_key_desc(&self) -> RelationDesc {
RelationDesc::empty()
}
/// Returns an empty relation description.
fn default_value_desc(&self) -> RelationDesc {
RelationDesc::empty()
}
/// Returns a clone of [`MYSQL_PROGRESS_DESC`].
fn timestamp_desc(&self) -> RelationDesc {
MYSQL_PROGRESS_DESC.clone()
}
/// Returns the catalog ID stored in `connection_id`; never `None`.
fn connection_id(&self) -> Option<CatalogItemId> {
Some(self.connection_id)
}
/// Always `SourceExportDetails::None`.
fn primary_export_details(&self) -> SourceExportDetails {
SourceExportDetails::None
}
/// Always `false`.
fn supports_read_only(&self) -> bool {
false
}
}
impl<C: ConnectionAccess> AlterCompatible for MySqlSourceConnection<C> {
    /// Checks whether `self` may be altered into `other`.
    ///
    /// Equal values are trivially compatible. Otherwise the connection ID must be identical and
    /// both the connection and the details must themselves report compatibility. The first
    /// failing field (in the order `connection_id`, `connection`, `details`) is logged as a
    /// warning and an [`AlterError`] carrying `id` is returned.
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }

        let Self {
            connection_id,
            connection,
            details,
        } = self;

        // All nested checks are evaluated up front, before any of them is inspected.
        let connection_ok = connection.alter_compatible(id, &other.connection).is_ok();
        let details_ok = details.alter_compatible(id, &other.details).is_ok();
        let outcomes = [
            (*connection_id == other.connection_id, "connection_id"),
            (connection_ok, "connection"),
            (details_ok, "details"),
        ];

        match outcomes.into_iter().find(|&(compatible, _)| !compatible) {
            None => Ok(()),
            Some((_, field)) => {
                tracing::warn!(
                    "MySqlSourceConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );
                Err(AlterError { id })
            }
        }
    }
}
impl RustType<ProtoMySqlSourceConnection> for MySqlSourceConnection {
    /// Converts every field to its protobuf form; all three proto fields are populated.
    fn into_proto(&self) -> ProtoMySqlSourceConnection {
        let Self {
            connection_id,
            connection,
            details,
        } = self;
        ProtoMySqlSourceConnection {
            connection: Some(connection.into_proto()),
            connection_id: Some(connection_id.into_proto()),
            details: Some(details.into_proto()),
        }
    }

    /// Rebuilds the connection from its protobuf form.
    ///
    /// Fields are converted in the order `connection`, `connection_id`, `details`; the first
    /// one that is missing or fails to convert produces the returned [`TryFromProtoError`].
    fn from_proto(proto: ProtoMySqlSourceConnection) -> Result<Self, TryFromProtoError> {
        let ProtoMySqlSourceConnection {
            connection,
            connection_id,
            details,
        } = proto;
        let connection =
            connection.into_rust_if_some("ProtoMySqlSourceConnection::connection")?;
        let connection_id =
            connection_id.into_rust_if_some("ProtoMySqlSourceConnection::connection_id")?;
        let details = details.into_rust_if_some("ProtoMySqlSourceConnection::details")?;
        Ok(Self {
            connection,
            connection_id,
            details,
        })
    }
}
/// Details attached to a MySQL source connection. The struct currently has no fields.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct MySqlSourceDetails {}
/// Protobuf round-tripping for [`MySqlSourceDetails`]. Both sides are empty, so the conversions
/// carry no data and cannot fail.
impl RustType<ProtoMySqlSourceDetails> for MySqlSourceDetails {
/// Returns an empty `ProtoMySqlSourceDetails`.
fn into_proto(&self) -> ProtoMySqlSourceDetails {
ProtoMySqlSourceDetails {}
}
/// Ignores the (empty) proto and always returns `Ok`.
fn from_proto(_proto: ProtoMySqlSourceDetails) -> Result<Self, TryFromProtoError> {
Ok(MySqlSourceDetails {})
}
}
impl AlterCompatible for MySqlSourceDetails {
/// Always returns `Ok(())`: both arguments are ignored, as the struct has no fields to compare.
fn alter_compatible(
&self,
_id: GlobalId,
_other: &Self,
) -> Result<(), crate::controller::AlterError> {
Ok(())
}
}
/// Proptest strategy producing a GTID set string of the form `<uuid>:<transaction id>`,
/// built from an arbitrary `u128` (interpreted as a UUID) and an arbitrary `u64`.
fn any_gtidset() -> impl Strategy<Value = String> {
    any::<(u128, u64)>().prop_map(|(raw_uuid, transaction_id)| {
        let source_uuid = Uuid::from_u128(raw_uuid);
        format!("{}:{}", source_uuid, transaction_id)
    })
}
/// Details describing a single export of a MySQL source.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct MySqlSourceExportDetails {
/// Description of the exported table.
pub table: mz_mysql_util::MySqlTableDesc,
/// A GTID set string. Property tests generate it with `any_gtidset()`.
/// NOTE(review): presumably the GTID set at which the export's snapshot was taken -- confirm.
#[proptest(strategy = "any_gtidset()")]
pub initial_gtid_set: String,
/// NOTE(review): presumably the columns to be treated as text -- confirm against callers.
pub text_columns: Vec<String>,
/// NOTE(review): presumably the columns to be left out of the export -- confirm against callers.
pub exclude_columns: Vec<String>,
}
impl RustType<ProtoMySqlSourceExportDetails> for MySqlSourceExportDetails {
    /// Converts the table description to its protobuf form and copies the remaining fields.
    fn into_proto(&self) -> ProtoMySqlSourceExportDetails {
        let Self {
            table,
            initial_gtid_set,
            text_columns,
            exclude_columns,
        } = self;
        ProtoMySqlSourceExportDetails {
            table: Some(table.into_proto()),
            initial_gtid_set: initial_gtid_set.clone(),
            text_columns: text_columns.clone(),
            exclude_columns: exclude_columns.clone(),
        }
    }

    /// Rebuilds the export details from their protobuf form.
    ///
    /// Fails with a [`TryFromProtoError`] if `table` is missing or cannot be converted; the
    /// other fields are moved over as-is.
    fn from_proto(proto: ProtoMySqlSourceExportDetails) -> Result<Self, TryFromProtoError> {
        let ProtoMySqlSourceExportDetails {
            table,
            initial_gtid_set,
            text_columns,
            exclude_columns,
        } = proto;
        let table = table.into_rust_if_some("ProtoMySqlSourceExportDetails::table")?;
        Ok(Self {
            table,
            initial_gtid_set,
            text_columns,
            exclude_columns,
        })
    }
}
impl AlterCompatible for MySqlSourceExportDetails {
/// Always returns `Ok(())`: no field is compared against `_other`.
fn alter_compatible(
&self,
_id: GlobalId,
_other: &Self,
) -> Result<(), crate::controller::AlterError> {
// Every field is named and explicitly ignored. Because the pattern is exhaustive, adding a
// field to the struct stops this from compiling until the new field is listed here.
let Self {
table: _,
initial_gtid_set: _,
text_columns: _,
exclude_columns: _,
} = self;
Ok(())
}
}
/// The timestamp component of a [`GtidPartition`].
///
/// The derived ordering follows declaration order, so `Absent` sorts before every `Active`
/// value, and `Active` values are ordered by their inner number.
#[derive(Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub enum GtidState {
/// No transaction id is recorded. Encoded as a NULL `transaction_id` by `encode_row`, and
/// used as the timestamp minimum.
Absent,
/// A non-zero transaction id. `gtid_set_frontier` stores one past the end of the parsed
/// interval here.
Active(NonZeroU64),
}
impl fmt::Display for GtidState {
    /// Writes the literal text `Absent` for [`GtidState::Absent`], or the decimal transaction
    /// id for [`GtidState::Active`].
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if let GtidState::Active(transaction_id) = self {
            write!(f, "{}", transaction_id)
        } else {
            f.write_str("Absent")
        }
    }
}
impl GtidState {
/// The greatest possible state: `Active` holding `NonZeroU64::MAX`.
pub const MAX: GtidState = GtidState::Active(NonZeroU64::MAX);
}
/// Makes [`GtidState`] usable as a timely timestamp, with the unit type as its path summary.
impl Timestamp for GtidState {
type Summary = ();
/// The minimum timestamp is [`GtidState::Absent`], which is also the smallest value under the
/// derived ordering.
fn minimum() -> Self {
GtidState::Absent
}
}
// Marker impl with no methods: opts `GtidState` into timely's `TotalOrder` trait.
impl TotalOrder for GtidState {}
/// Timely's partial order for [`GtidState`], defined by the derived `PartialOrd`/`Ord`.
impl PartialOrder for GtidState {
/// Returns `true` when `self <= other` under the derived ordering.
fn less_equal(&self, other: &Self) -> bool {
self <= other
}
}
/// The unit path summary for [`GtidState`]: it never changes a timestamp.
impl PathSummary<GtidState> for () {
/// Returns the source timestamp unchanged; never `None`.
fn results_in(&self, src: &GtidState) -> Option<GtidState> {
Some(*src)
}
/// Composing two unit summaries yields the unit summary; never `None`.
fn followed_by(&self, _other: &Self) -> Option<Self> {
Some(())
}
}
/// Lets [`GtidState`] refine the unit timestamp `()`.
impl Refines<()> for GtidState {
/// Maps the unit timestamp to `GtidState::minimum()`.
fn to_inner(_other: ()) -> Self {
Self::minimum()
}
/// Discards the state and returns the unit timestamp.
fn to_outer(self) -> () {}
/// Discards the path summary and returns the unit summary.
fn summarize(_path: Self::Summary) -> <() as Timestamp>::Summary {}
}
/// A [`Partitioned`] timestamp keyed by a `Uuid` (a single value or a range) and carrying a
/// [`GtidState`]. This is the element type of the frontier built by `gtid_set_frontier`.
pub type GtidPartition = Partitioned<Uuid, GtidState>;
impl SourceTimestamp for GtidPartition {
    /// Encodes the partition as a three-column row: lower UUID bound, upper UUID bound, and the
    /// transaction id (NULL when the state is [`GtidState::Absent`]).
    fn encode_row(&self) -> Row {
        let interval = self.interval();
        let transaction_id = match self.timestamp() {
            GtidState::Active(id) => Datum::UInt64(id.get()),
            GtidState::Absent => Datum::Null,
        };
        let columns = [
            Datum::Uuid(interval.lower),
            Datum::Uuid(interval.upper),
            transaction_id,
        ];
        Row::pack(&columns)
    }

    /// Decodes a row with exactly three columns: two UUIDs followed by either a `UInt64` or
    /// NULL.
    ///
    /// A NULL or zero transaction id decodes to [`GtidState::Absent`].
    ///
    /// # Panics
    ///
    /// Panics if the row has any other shape.
    fn decode_row(row: &Row) -> Self {
        let mut datums = row.iter();
        // The fourth `next()` must be `None`, which rejects rows with extra columns.
        let columns = (datums.next(), datums.next(), datums.next(), datums.next());
        let (lower, upper, state) = match columns {
            (
                Some(Datum::Uuid(lower)),
                Some(Datum::Uuid(upper)),
                Some(Datum::UInt64(raw)),
                None,
            ) => {
                // `NonZeroU64::new` yields `None` exactly for zero, which means "absent".
                let state = NonZeroU64::new(raw).map_or(GtidState::Absent, GtidState::Active);
                (lower, upper, state)
            }
            (Some(Datum::Uuid(lower)), Some(Datum::Uuid(upper)), Some(Datum::Null), None) => {
                (lower, upper, GtidState::Absent)
            }
            _ => panic!("invalid row {row:?}"),
        };
        Partitioned::new_range(lower, upper, state)
    }
}
/// Converts a GTID set string (such as the value of MySQL's `global.gtid_executed`) into a
/// frontier of [`GtidPartition`]s covering the whole UUID space.
///
/// The input is a comma-separated list of `<uuid>:<interval>` entries, where `<interval>` is
/// either a single transaction id (`N`) or a range (`N-M`). Surrounding whitespace and empty
/// entries are ignored. For every entry the frontier contains a singleton partition for that
/// UUID whose state is `Active(end + 1)`, where `end` is the end of the interval. Every range of
/// UUIDs not mentioned in the set is covered by a partition with state [`GtidState::Absent`].
/// An empty set therefore yields a single `Absent` partition spanning all UUIDs.
///
/// # Errors
///
/// Returns an [`io::Error`] of kind `InvalidData` when:
/// * an entry has no `:` separator, an unparsable UUID, or an unparsable interval,
/// * an entry has more than one interval (i.e. the transactions are not consecutive),
/// * the end of an interval is `u64::MAX`, so that `end + 1` is not representable,
/// * the UUIDs are not presented in strictly increasing order.
pub fn gtid_set_frontier(gtid_set_str: &str) -> Result<Antichain<GtidPartition>, io::Error> {
    /// Builds the `InvalidData` error used for every parse failure in this function.
    fn invalid_data(message: String) -> io::Error {
        io::Error::new(io::ErrorKind::InvalidData, message)
    }

    let mut partitions = Antichain::new();
    // The smallest UUID not yet covered by a partition, or `None` once the maximum UUID has
    // been consumed and nothing is left to cover.
    let mut gap_lower = Some(Uuid::nil());

    for gtid_str in gtid_set_str.split(',') {
        // Trim before the emptiness check so that whitespace-only entries are skipped too.
        let gtid_str = gtid_str.trim();
        if gtid_str.is_empty() {
            continue;
        }

        let (uuid, intervals) = gtid_str
            .split_once(':')
            .ok_or_else(|| invalid_data(format!("invalid gtid: {}", gtid_str)))?;
        let uuid = Uuid::parse_str(uuid)
            .map_err(|e| invalid_data(format!("invalid uuid in gtid: {}: {}", uuid, e)))?;

        // Only a single interval per UUID is supported: several intervals would mean that the
        // executed transactions are not consecutive.
        let mut intervals = intervals.split(':');
        let end = match (intervals.next(), intervals.next()) {
            (Some(interval_str), None) => {
                let mut bounds = interval_str.split('-').map(str::parse::<u64>);
                let start = bounds
                    .next()
                    .ok_or_else(|| {
                        invalid_data(format!("couldn't parse int: {}", interval_str))
                    })?
                    .map_err(|e| {
                        invalid_data(format!("couldn't parse int: {}: {}", interval_str, e))
                    })?;
                match bounds.next() {
                    Some(Ok(end)) => end,
                    // A lone transaction id is an interval of length one.
                    None => start,
                    Some(Err(_)) => {
                        return Err(invalid_data(format!(
                            "invalid gtid interval: {}",
                            interval_str
                        )))
                    }
                }
            }
            _ => {
                return Err(invalid_data(format!(
                    "gtid with non-consecutive intervals found! {}",
                    gtid_str
                )))
            }
        };

        // The frontier is one past the last executed transaction. `end` comes from the server,
        // so reject the one value for which that is not representable instead of panicking.
        let upper = end
            .checked_add(1)
            .and_then(NonZeroU64::new)
            .ok_or_else(|| {
                invalid_data(format!("gtid transaction id out of range: {}", gtid_str))
            })?;

        // This UUID must not precede the first uncovered UUID. That also rejects duplicates and
        // any UUID following the maximum UUID (for which `gap_lower` is `None`).
        let lower = match gap_lower {
            Some(lower) if lower <= uuid => lower,
            _ => {
                return Err(invalid_data(format!(
                    "gtid set not presented in alphabetical uuid order: {}",
                    gtid_set_str
                )))
            }
        };

        // Cover the UUIDs strictly between the previous entry and this one, if there are any.
        // There are none when this UUID is nil or directly follows the previous one.
        if let Some(gap_upper) = uuid
            .backward_checked(1)
            .filter(|gap_upper| *gap_upper >= lower)
        {
            partitions.insert(GtidPartition::new_range(
                lower,
                gap_upper,
                GtidState::Absent,
            ));
        }

        gap_lower = uuid.forward_checked(1);
        partitions.insert(GtidPartition::new_singleton(
            uuid,
            GtidState::Active(upper),
        ));
    }

    // Cover everything after the last UUID in the set (or the whole space for an empty set).
    if let Some(gap_lower) = gap_lower {
        partitions.insert(GtidPartition::new_range(
            gap_lower,
            Uuid::max(),
            GtidState::Absent,
        ));
    }

    Ok(partitions)
}
// Unit tests for `gtid_set_frontier`.
#[cfg(test)]
mod tests {
use mz_ore::assert_err;
use super::*;
use std::num::NonZeroU64;
// Three UUIDs in increasing order produce seven partitions: one singleton per UUID (holding
// the interval end plus one) and an `Absent` range before, between and after them.
#[mz_ore::test]
fn test_gtid_set_frontier_valid() {
let gtid_set_str =
"14c1b43a-eb64-11eb-8a9a-0242ac130002:1, 2174B383-5441-11E8-B90A-C80AA9429562:1-3, 3E11FA47-71CA-11E1-9E33-C80AA9429562:1-19";
let result = gtid_set_frontier(gtid_set_str).unwrap();
assert_eq!(result.len(), 7);
assert_eq!(
result,
Antichain::from_iter(vec![
GtidPartition::new_range(
Uuid::nil(),
Uuid::parse_str("14c1b43a-eb64-11eb-8a9a-0242ac130001").unwrap(),
GtidState::Absent,
),
GtidPartition::new_singleton(
Uuid::parse_str("14c1b43a-eb64-11eb-8a9a-0242ac130002").unwrap(),
GtidState::Active(NonZeroU64::new(2).unwrap()),
),
GtidPartition::new_range(
Uuid::parse_str("14c1b43a-eb64-11eb-8a9a-0242ac130003").unwrap(),
Uuid::parse_str("2174B383-5441-11E8-B90A-C80AA9429561").unwrap(),
GtidState::Absent,
),
GtidPartition::new_singleton(
Uuid::parse_str("2174B383-5441-11E8-B90A-C80AA9429562").unwrap(),
GtidState::Active(NonZeroU64::new(4).unwrap()),
),
GtidPartition::new_range(
Uuid::parse_str("2174B383-5441-11E8-B90A-C80AA9429563").unwrap(),
Uuid::parse_str("3E11FA47-71CA-11E1-9E33-C80AA9429561").unwrap(),
GtidState::Absent,
),
GtidPartition::new_singleton(
Uuid::parse_str("3E11FA47-71CA-11E1-9E33-C80AA9429562").unwrap(),
GtidState::Active(NonZeroU64::new(20).unwrap()),
),
GtidPartition::new_range(
Uuid::parse_str("3E11FA47-71CA-11E1-9E33-C80AA9429563").unwrap(),
Uuid::max(),
GtidState::Absent,
),
]),
)
}
// UUIDs listed in decreasing order must be rejected.
#[mz_ore::test]
fn test_gtid_set_frontier_non_alphabetical_uuids() {
let gtid_set_str =
"3E11FA47-71CA-11E1-9E33-C80AA9429562:1-19, 2174B383-5441-11E8-B90A-C80AA9429562:1-3";
let result = gtid_set_frontier(gtid_set_str);
assert_err!(result);
}
// A UUID with two intervals (`1-3:5-8`) must be rejected.
#[mz_ore::test]
fn test_gtid_set_frontier_non_consecutive() {
let gtid_set_str =
"2174B383-5441-11E8-B90A-C80AA9429562:1-3:5-8, 3E11FA47-71CA-11E1-9E33-C80AA9429562:1-19";
let result = gtid_set_frontier(gtid_set_str);
assert_err!(result);
}
// The second UUID's first group has only seven hex digits, so it cannot be parsed.
#[mz_ore::test]
fn test_gtid_set_frontier_invalid_uuid() {
let gtid_set_str =
"14c1b43a-eb64-11eb-8a9a-0242ac130002:1-5,24DA167-0C0C-11E8-8442-00059A3C7B00:1";
let result = gtid_set_frontier(gtid_set_str);
assert_err!(result);
}
// The second entry carries an extra `:4` group after its interval and must be rejected.
#[mz_ore::test]
fn test_gtid_set_frontier_invalid_interval() {
let gtid_set_str =
"14c1b43a-eb64-11eb-8a9a-0242ac130002:1-5,14c1b43a-eb64-11eb-8a9a-0242ac130003:1-3:4";
let result = gtid_set_frontier(gtid_set_str);
assert_err!(result);
}
// An empty GTID set yields a single `Absent` partition spanning every UUID.
#[mz_ore::test]
fn test_gtid_set_frontier_empty_string() {
let gtid_set_str = "";
let result = gtid_set_frontier(gtid_set_str).unwrap();
assert_eq!(result.len(), 1);
assert_eq!(
result,
Antichain::from_elem(GtidPartition::new_range(
Uuid::nil(),
Uuid::max(),
GtidState::Absent,
))
);
}
}