use std::convert::Infallible;
use std::fmt;
use std::io;
use std::rc::Rc;
use mz_repr::Diff;
use mz_storage_types::errors::{DataflowError, SourceError};
use mz_storage_types::sources::IndexedSourceExport;
use mz_storage_types::sources::SourceExport;
use mz_timely_util::containers::stack::{AccountedStackBuilder, StackWrapper};
use serde::{Deserialize, Serialize};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pushers::Tee;
use timely::dataflow::operators::{CapabilitySet, Concat, Map, ToStream};
use timely::dataflow::{Scope, Stream};
use timely::progress::Antichain;
use uuid::Uuid;
use mz_mysql_util::{
ensure_full_row_binlog_format, ensure_gtid_consistency, ensure_replication_commit_order,
MySqlError, MySqlTableDesc,
};
use mz_ore::error::ErrorExt;
use mz_storage_types::errors::SourceErrorDetails;
use mz_storage_types::sources::mysql::{gtid_set_frontier, GtidPartition, GtidState};
use mz_storage_types::sources::{MySqlSourceConnection, SourceExportDetails, SourceTimestamp};
use mz_timely_util::builder_async::{AsyncOutputHandle, PressOnDropButton};
use mz_timely_util::order::Extrema;
use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::source::types::Probe;
use crate::source::types::{ProgressStatisticsUpdate, SourceRender, StackedCollection};
use crate::source::{RawSourceCreationConfig, SourceMessage};
mod replication;
mod schemas;
mod snapshot;
mod statistics;
// `SourceRender` implementation that drives MySQL ingestion: it builds per-export
// output descriptions and composes the snapshot, replication and statistics operators.
impl SourceRender for MySqlSourceConnection {
type Time = GtidPartition;
const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::MySql;
/// Renders the MySQL source dataflow.
///
/// For every source export carrying MySQL details, a `SourceOutputInfo` is built
/// (table description, text/exclude columns, initial GTID frontier and resume upper).
/// These are handed to the snapshot and replication operators, and a statistics
/// operator is rendered from `resume_uppers`.
///
/// Returns, in order: the snapshot updates concatenated with the replication updates,
/// the replication operator's `uppers` stream, the health status stream, the
/// statistics stream (statistics operator output concatenated with snapshot stats),
/// the probe stream, and the press-on-drop tokens of the three operators.
///
/// `_start_signal` is not used by this implementation.
///
/// # Panics
///
/// Panics if a source export carries details that are neither MySQL nor `None`, if an
/// export has no entry in `config.source_resume_uppers`, or if an export's initial
/// GTID set is rejected by `gtid_set_frontier`.
fn render<G: Scope<Timestamp = GtidPartition>>(
self,
scope: &mut G,
config: RawSourceCreationConfig,
resume_uppers: impl futures::Stream<Item = Antichain<GtidPartition>> + 'static,
_start_signal: impl std::future::Future<Output = ()> + 'static,
) -> (
StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
Option<Stream<G, Infallible>>,
Stream<G, HealthStatusMessage>,
Stream<G, ProgressStatisticsUpdate>,
Option<Stream<G, Probe<GtidPartition>>>,
Vec<PressOnDropButton>,
) {
// Collect one `SourceOutputInfo` per export that targets a MySQL table.
let mut source_outputs = Vec::new();
for (
id,
IndexedSourceExport {
ingestion_output,
export:
SourceExport {
details,
storage_metadata: _,
data_config: _,
},
},
) in &config.source_exports
{
// Exports without details (`None`) get no table output and are skipped; any
// other non-MySQL details are treated as an invariant violation.
let details = match details {
SourceExportDetails::MySql(details) => details,
SourceExportDetails::None => continue,
_ => panic!("unexpected source export details: {:?}", details),
};
let desc = details.table.clone();
let initial_gtid_set = details.initial_gtid_set.to_string();
// Decode this export's resume upper from its row-encoded form.
let resume_upper = Antichain::from_iter(
config
.source_resume_uppers
.get(id)
.expect("missing resume upper")
.iter()
.map(GtidPartition::decode_row),
);
let name = MySqlTableName::new(&desc.schema_name, &desc.name);
source_outputs.push(SourceOutputInfo {
table_name: name.clone(),
output_index: *ingestion_output,
desc,
text_columns: details.text_columns.clone(),
exclude_columns: details.exclude_columns.clone(),
initial_gtid_set: gtid_set_frontier(&initial_gtid_set).expect("invalid gtid set"),
resume_upper,
});
}
let metrics = config.metrics.get_mysql_source_metrics(config.id);
// The snapshot operator also yields `rewinds`, which the replication operator
// below consumes (presumably `RewindRequest`s — see snapshot.rs to confirm).
let (snapshot_updates, rewinds, snapshot_stats, snapshot_err, snapshot_token) =
snapshot::render(
scope.clone(),
config.clone(),
self.clone(),
source_outputs.clone(),
metrics.snapshot_metrics.clone(),
);
let (repl_updates, uppers, repl_err, repl_token) = replication::render(
scope.clone(),
config.clone(),
self.clone(),
source_outputs,
&rewinds,
metrics,
);
let (stats_stream, stats_err, probe_stream, stats_token) =
statistics::render(scope.clone(), config, self, resume_uppers);
let stats_stream = stats_stream.concat(&snapshot_stats);
let updates = snapshot_updates.concat(&repl_updates);
// Report `Running` once at startup; every operator error afterwards becomes a
// halting health update.
let health_init = std::iter::once(HealthStatusMessage {
index: 0,
namespace: Self::STATUS_NAMESPACE,
update: HealthStatusUpdate::Running,
})
.to_stream(scope);
let health_errs = snapshot_err
.concat(&repl_err)
.concat(&stats_err)
.map(move |err| {
let err_string = err.display_with_causes().to_string();
let update = HealthStatusUpdate::halting(err_string.clone(), None);
// Transient SSH failures are attributed to `StatusNamespace::Ssh`; all
// other errors are reported under the MySQL namespace.
let namespace = match err {
ReplicationError::Transient(err)
if matches!(&*err, TransientError::MySqlError(MySqlError::Ssh(_))) =>
{
StatusNamespace::Ssh
}
_ => Self::STATUS_NAMESPACE,
};
HealthStatusMessage {
index: 0,
namespace: namespace.clone(),
update,
}
});
let health = health_init.concat(&health_errs);
(
updates,
Some(uppers),
health,
stats_stream,
Some(probe_stream),
vec![snapshot_token, repl_token, stats_token],
)
}
}
/// Per-export description of a MySQL table output.
///
/// Built once per MySQL source export in `render` and passed to both the snapshot and
/// the replication operators.
#[derive(Clone, Debug)]
struct SourceOutputInfo {
/// Index of the ingestion output that this table's updates are emitted on.
output_index: usize,
/// Name of the upstream table, built from `desc`'s schema name and table name.
table_name: MySqlTableName,
/// Description of the upstream table, cloned from the export details.
desc: MySqlTableDesc,
/// Column names listed as text columns in the export details
/// (presumably decoded as text rather than their native type — confirm).
text_columns: Vec<String>,
/// Column names listed as excluded columns in the export details.
exclude_columns: Vec<String>,
/// Frontier parsed (via `gtid_set_frontier`) from the export's initial GTID set.
initial_gtid_set: Antichain<GtidPartition>,
/// Upper this export resumes from, decoded from the persisted resume upper.
resume_upper: Antichain<GtidPartition>,
}
/// Error produced by the MySQL source operators.
///
/// Both variants wrap their payload in an `Rc`, which lets this enum derive `Clone` even
/// though `TransientError` itself does not implement `Clone`.
#[derive(Clone, Debug, thiserror::Error)]
pub enum ReplicationError {
/// An error not recorded in the data output (presumably recoverable by restarting
/// the dataflow — confirm).
#[error(transparent)]
Transient(#[from] Rc<TransientError>),
/// An error that `return_definite_error` also writes into the data output of the
/// affected exports.
#[error(transparent)]
Definite(#[from] Rc<DefiniteError>),
}
/// Errors wrapped by `ReplicationError::Transient`.
///
/// Most variants convert automatically (`#[from]`) from the underlying library error so
/// they can be propagated with `?`.
#[derive(Debug, thiserror::Error)]
pub enum TransientError {
/// A binlog row event could not be converted into a row.
#[error("couldn't decode binlog row")]
BinlogRowDecodeError(#[from] mysql_async::binlog::row::BinlogRowToRowError),
/// The replication stream ended when more data was expected.
#[error("stream ended prematurely")]
ReplicationEOF,
#[error(transparent)]
IoError(#[from] io::Error),
/// Error returned by the `mysql_async` client.
#[error("sql client error")]
SQLClient(#[from] mysql_async::Error),
#[error("ident decode error")]
IdentError(#[from] mz_sql_parser::ast::IdentError),
/// Error from `mz_mysql_util`; its `Ssh` variant is reported under the SSH status
/// namespace by `render`.
#[error(transparent)]
MySqlError(#[from] MySqlError),
#[error(transparent)]
Generic(#[from] anyhow::Error),
}
/// Errors wrapped by `ReplicationError::Definite`.
///
/// These are cloneable and serializable, and convert into a `DataflowError` via the
/// `From` impl in this module so they can be written into the source's data output.
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
pub enum DefiniteError {
#[error("unable to decode: {0}")]
ValueDecodeError(String),
#[error("table was truncated: {0}")]
TableTruncated(String),
#[error("table was dropped: {0}")]
TableDropped(String),
#[error("incompatible schema change: {0}")]
IncompatibleSchema(String),
#[error("received a gtid set from the server that violates our requirements: {0}")]
UnsupportedGtidState(String),
/// Fields: the source identifier and the offending GTID state.
#[error("received out of order gtids for source {0} at transaction-id {1}")]
BinlogGtidMonotonicityViolation(String, GtidState),
#[error("mysql server does not have the binlog available at the requested gtid set")]
BinlogNotAvailable,
/// Fields: the server's binlog frontier and the required frontier, both pre-formatted.
#[error("mysql server binlog frontier at {0} is beyond required frontier {1}")]
BinlogMissingResumePoint(String, String),
#[error("mysql server configuration: {0}")]
ServerConfigurationError(String),
}
impl From<DefiniteError> for DataflowError {
fn from(err: DefiniteError) -> Self {
let m = err.to_string().into();
DataflowError::SourceError(Box::new(SourceError {
error: match &err {
DefiniteError::ValueDecodeError(_) => SourceErrorDetails::Other(m),
DefiniteError::TableTruncated(_) => SourceErrorDetails::Other(m),
DefiniteError::TableDropped(_) => SourceErrorDetails::Other(m),
DefiniteError::IncompatibleSchema(_) => SourceErrorDetails::Other(m),
DefiniteError::UnsupportedGtidState(_) => SourceErrorDetails::Other(m),
DefiniteError::BinlogGtidMonotonicityViolation(_, _) => {
SourceErrorDetails::Other(m)
}
DefiniteError::BinlogNotAvailable => SourceErrorDetails::Initialization(m),
DefiniteError::BinlogMissingResumePoint(_, _) => {
SourceErrorDetails::Initialization(m)
}
DefiniteError::ServerConfigurationError(_) => SourceErrorDetails::Initialization(m),
},
}))
}
}
/// A MySQL table name as a `(schema, table)` pair.
///
/// Field `0` is the schema name and field `1` is the table name, stored without quoting
/// (the `Display` impl adds the backticks).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Hash)]
pub(crate) struct MySqlTableName(pub(crate) String, pub(crate) String);
impl MySqlTableName {
pub(crate) fn new(schema_name: &str, table_name: &str) -> Self {
Self(schema_name.to_string(), table_name.to_string())
}
}
impl fmt::Display for MySqlTableName {
    /// Formats the name as a fully-qualified, backtick-quoted MySQL identifier:
    /// `` `schema`.`table` ``.
    ///
    /// MySQL requires a backtick inside a quoted identifier to be written as two
    /// backticks, so embedded backticks in either component are doubled. Without this,
    /// a name containing a backtick would close the quoted identifier early, yielding a
    /// malformed (and potentially injectable) name if the output is interpolated into
    /// SQL (NOTE(review): presumably how snapshot/replication queries use it — verify).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let schema = self.0.replace('`', "``");
        let table = self.1.replace('`', "``");
        write!(f, "`{}`.`{}`", schema, table)
    }
}
impl From<&MySqlTableDesc> for MySqlTableName {
fn from(desc: &MySqlTableDesc) -> Self {
Self::new(&desc.schema_name, &desc.name)
}
}
/// A request, identified by ingestion output index, carrying the upper of that output's
/// snapshot.
///
/// NOTE(review): presumably sent by the snapshot operator so the replication operator
/// can rewind updates for `output_index` that the snapshot already covers — confirm in
/// snapshot.rs / replication.rs.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub(crate) struct RewindRequest {
/// Index of the ingestion output this request applies to.
pub(crate) output_index: usize,
/// Upper frontier of the snapshot taken for this output.
pub(crate) snapshot_upper: Antichain<GtidPartition>,
}
/// Async output handle whose `(D, T, Diff)` updates are batched into `StackWrapper`
/// containers through an `AccountedStackBuilder`.
type StackedAsyncOutputHandle<T, D> = AsyncOutputHandle<
T,
AccountedStackBuilder<CapacityContainerBuilder<StackWrapper<(D, T, Diff)>>>,
Tee<T, StackWrapper<(D, T, Diff)>>,
>;
/// Records a definite error for every output in `outputs` and reports it on the
/// definite-error stream.
///
/// For each output index, an `Err` update (`err` converted into a `DataflowError`) is
/// given on `data_handle` with diff `1` at the timestamp
/// `GtidPartition::new_range(Uuid::minimum(), Uuid::maximum(), GtidState::MAX)`.
/// NOTE(review): presumably this is the greatest timestamp across all source UUIDs, so
/// the error is not superseded by later data — confirm. Afterwards, a single
/// `ReplicationError::Definite` wrapping `err` is given on `definite_error_handle`.
///
/// # Panics
///
/// Indexes the first capability of `data_cap_set` and of `definite_error_cap_set`, so
/// it presumably panics if either set is empty.
async fn return_definite_error(
    err: DefiniteError,
    outputs: &[usize],
    data_handle: &StackedAsyncOutputHandle<
        GtidPartition,
        (usize, Result<SourceMessage, DataflowError>),
    >,
    data_cap_set: &CapabilitySet<GtidPartition>,
    definite_error_handle: &AsyncOutputHandle<
        GtidPartition,
        CapacityContainerBuilder<Vec<ReplicationError>>,
        Tee<GtidPartition, Vec<ReplicationError>>,
    >,
    definite_error_cap_set: &CapabilitySet<GtidPartition>,
) {
    for &output_index in outputs {
        let update = (
            (output_index, Err(err.clone().into())),
            GtidPartition::new_range(Uuid::minimum(), Uuid::maximum(), GtidState::MAX),
            1,
        );
        data_handle.give_fueled(&data_cap_set[0], update).await;
    }
    definite_error_handle.give(
        &definite_error_cap_set[0],
        ReplicationError::Definite(Rc::new(err)),
    );
}
/// Verifies that the upstream server's replication-related settings satisfy the
/// source's requirements.
///
/// The checks run in order (GTID consistency, full-row binlog format, replication
/// commit order) and the first failing check's error is returned.
async fn validate_mysql_repl_settings(conn: &mut mysql_async::Conn) -> Result<(), MySqlError> {
    ensure_gtid_consistency(conn).await?;
    ensure_full_row_binlog_format(conn).await?;
    ensure_replication_commit_order(conn).await
}