use std::collections::BTreeMap;
use std::iter;
use std::sync::Arc;
use differential_dataflow::{collection, AsCollection, Collection};
use mz_ore::cast::CastLossy;
use mz_persist_client::operators::shard_source::SnapshotMode;
use mz_repr::{Datum, Diff, GlobalId, Row, RowPacker};
use mz_storage_operators::persist_source;
use mz_storage_operators::persist_source::Subtime;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs;
use mz_storage_types::errors::{
DataflowError, DecodeError, EnvelopeError, UpsertError, UpsertNullKeyError, UpsertValueError,
};
use mz_storage_types::parameters::StorageMaxInflightBytesConfig;
use mz_storage_types::sources::envelope::{KeyEnvelope, NoneEnvelope, UpsertEnvelope, UpsertStyle};
use mz_storage_types::sources::*;
use mz_timely_util::builder_async::PressOnDropButton;
use mz_timely_util::operator::CollectionExt;
use mz_timely_util::order::refine_antichain;
use serde::{Deserialize, Serialize};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::operators::generic::operator::empty;
use timely::dataflow::operators::{Concat, ConnectLoop, Feedback, Leave, Map, OkErr};
use timely::dataflow::scopes::{Child, Scope};
use timely::dataflow::Stream;
use timely::progress::{Antichain, Timestamp};
use crate::decode::{render_decode_cdcv2, render_decode_delimited};
use crate::healthcheck::{HealthStatusMessage, StatusNamespace};
use crate::source::types::{DecodeResult, SourceOutput, SourceRender};
use crate::source::{self, RawSourceCreationConfig, SourceExportCreationConfig};
use crate::upsert::UpsertKey;
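
/// Renders the source dataflow fragment for a single ingestion.
///
/// Creates the raw source in `scope` and then, for each source export, renders
/// decoding and envelope processing on top of it. Returns the resulting
/// `(ok, err)` collection pairs keyed by export id, a health status stream in
/// the parent scope, and the tokens that must be kept alive for the dataflow
/// to keep running.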
pub fn render_source<'g, G, C>(
scope: &mut Child<'g, G, mz_repr::Timestamp>,
dataflow_debug_name: &String,
connection: C,
description: IngestionDescription<CollectionMetadata>,
resume_stream: &Stream<Child<'g, G, mz_repr::Timestamp>, ()>,
storage_state: &crate::storage_state::StorageState,
base_source_config: RawSourceCreationConfig,
) -> (
BTreeMap<
GlobalId,
(
Collection<Child<'g, G, mz_repr::Timestamp>, Row, Diff>,
Collection<Child<'g, G, mz_repr::Timestamp>, DataflowError, Diff>,
),
>,
Stream<G, HealthStatusMessage>,
Vec<PressOnDropButton>,
)
where
G: Scope<Timestamp = ()>,
C: SourceConnection + SourceRender + 'static,
{
let mut needed_tokens = Vec::new();
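// A start signal for the raw source: the future below resolves once a message
// is sent on the channel or, more commonly, once every clone of `starter` has
// been dropped. Each export receives a clone of `starter` as its rehydration
// token, so the raw source is held back until all exports have released
// theirs (see `rehydration_finished` in `render_source_stream`).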
let (starter, mut start_signal) = tokio::sync::mpsc::channel::<()>(1);
let start_signal = async move {
let _ = start_signal.recv().await;
};
let (exports, mut health, source_tokens) = source::create_raw_source(
scope,
storage_state,
resume_stream,
base_source_config.clone(),
connection,
start_signal,
);
needed_tokens.extend(source_tokens);
let mut outputs = BTreeMap::new();
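// Demux the raw source output and render decoding, envelope processing, and
// error handling separately for each export of this ingestion.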
for (export_id, export) in exports {
type CB<C> = CapacityContainerBuilder<C>;
let (ok_stream, err_stream) =
export.map_fallible::<CB<_>, CB<_>, _, _, _>("export-demux-ok-err", |r| r);
let error_collections = vec![err_stream.map(DataflowError::from)];
let data_config = base_source_config.source_exports[&export_id]
.data_config
.clone();
let (ok, err, extra_tokens, health_stream) = render_source_stream(
scope,
dataflow_debug_name,
export_id,
ok_stream,
data_config,
description.clone(),
error_collections,
storage_state,
base_source_config.clone(),
starter.clone(),
);
needed_tokens.extend(extra_tokens);
outputs.insert(export_id, (ok, err));
health = health.concat(&health_stream.leave());
}
(outputs, health, needed_tokens)
}
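
/// Renders the decoding and envelope-processing pipeline for a single source
/// export, producing the export's data collection, its error collection
/// (concatenated with `error_collections`), any tokens that must be held, and
/// a stream of health status updates.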
fn render_source_stream<G, FromTime>(
scope: &mut G,
dataflow_debug_name: &String,
export_id: GlobalId,
ok_source: Collection<G, SourceOutput<FromTime>, Diff>,
data_config: SourceExportDataConfig,
description: IngestionDescription<CollectionMetadata>,
mut error_collections: Vec<Collection<G, DataflowError, Diff>>,
storage_state: &crate::storage_state::StorageState,
base_source_config: RawSourceCreationConfig,
rehydrated_token: impl std::any::Any + 'static,
) -> (
Collection<G, Row, Diff>,
Collection<G, DataflowError, Diff>,
Vec<PressOnDropButton>,
Stream<G, HealthStatusMessage>,
)
where
G: Scope<Timestamp = mz_repr::Timestamp>,
FromTime: Timestamp + Sync,
{
let mut needed_tokens = vec![];
let SourceExportDataConfig { encoding, envelope } = data_config;
let SourceDesc {
connection: _,
timestamp_interval: _,
primary_export: _,
primary_export_details: _,
} = description.desc;
let (decoded_stream, decode_health) = match encoding {
None => (
ok_source.map(|r| DecodeResult {
key: Some(Ok(r.key)),
value: Some(Ok(r.value)),
metadata: r.metadata,
from_time: r.from_time,
}),
empty(scope),
),
Some(encoding) => render_decode_delimited(
&ok_source,
encoding.key,
encoding.value,
dataflow_debug_name.clone(),
storage_state.metrics.decode_defs.clone(),
storage_state.storage_configuration.clone(),
),
};
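// Apply the requested envelope to the decoded stream. Upsert additionally
// reads back the export's previous output from persist so that its state can
// be rehydrated on restart.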
let (envelope_ok, envelope_err, envelope_health) = match &envelope {
SourceEnvelope::Upsert(upsert_envelope) => {
let upsert_input = upsert_commands(decoded_stream, upsert_envelope.clone());
let persist_clients = Arc::clone(&storage_state.persist_clients);
let resume_upper = base_source_config.resume_uppers[&export_id].clone();
let upper_ts = resume_upper
.as_option()
.expect("resuming an already finished ingestion")
.clone();
let (upsert, health_update) = scope.scoped(
&format!("upsert_rehydration_backpressure({})", export_id),
|scope| {
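// Read the export's previously written output back from persist so the
// upsert operator can rehydrate its state. If configured, a feedback loop
// applies backpressure to this read based on the upsert operator's snapshot
// progress, bounding the amount of in-flight data during rehydration.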
let (previous, previous_token, feedback_handle, backpressure_metrics) = {
let as_of = Antichain::from_elem(upper_ts.saturating_sub(1));
let backpressure_max_inflight_bytes = get_backpressure_max_inflight_bytes(
&storage_state
.storage_configuration
.parameters
.storage_dataflow_max_inflight_bytes_config,
&storage_state.instance_context.cluster_memory_limit,
);
let (feedback_handle, flow_control, backpressure_metrics) =
if let Some(storage_dataflow_max_inflight_bytes) =
backpressure_max_inflight_bytes
{
tracing::info!(
?backpressure_max_inflight_bytes,
"timely-{} using backpressure in upsert for source {}",
base_source_config.worker_id,
export_id
);
if !storage_state
.storage_configuration
.parameters
.storage_dataflow_max_inflight_bytes_config
.disk_only
|| storage_state.instance_context.scratch_directory.is_some()
{
let (feedback_handle, feedback_data) =
scope.feedback(Default::default());
let backpressure_metrics = Some(
base_source_config
.metrics
.get_backpressure_metrics(export_id, scope.index()),
);
(
Some(feedback_handle),
Some(persist_source::FlowControl {
progress_stream: feedback_data,
max_inflight_bytes: storage_dataflow_max_inflight_bytes,
summary: (Default::default(), Subtime::least_summary()),
metrics: backpressure_metrics.clone(),
}),
backpressure_metrics,
)
} else {
(None, None, None)
}
} else {
(None, None, None)
};
let grace_period = dyncfgs::CLUSTER_SHUTDOWN_GRACE_PERIOD
.get(storage_state.storage_configuration.config_set());
let storage_metadata = description.source_exports[&export_id]
.storage_metadata
.clone();
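// Read the export's previous output from its persist shard, as of just
// before the resume upper. `false.then_some(|| unreachable!())` passes
// `None` for an optional closure argument while still giving the compiler a
// concrete closure type to infer.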
let (stream, tok) = persist_source::persist_source_core(
scope,
export_id,
persist_clients,
storage_metadata,
None,
Some(as_of),
SnapshotMode::Include,
Antichain::new(),
None,
flow_control,
false.then_some(|| unreachable!()),
async {},
move |error| {
Box::pin(async move {
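// On an unrecoverable persist error, wait out the cluster shutdown
// grace period before panicking, giving an in-flight shutdown of this
// replica a chance to complete first.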
tokio::time::sleep(grace_period).await;
panic!("upsert_rehydration: {error}")
})
},
);
(
stream.as_collection(),
Some(tok),
feedback_handle,
backpressure_metrics,
)
};
let export_statistics = storage_state
.aggregated_statistics
.get_source(&export_id)
.expect("statistics initialized")
.clone();
let export_config = SourceExportCreationConfig {
id: export_id,
worker_id: base_source_config.worker_id,
metrics: base_source_config.metrics.clone(),
source_statistics: export_statistics,
};
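// Render the upsert operator itself, seeded with the previous output read
// above. It emits the upserted collection, health updates, a stream
// describing snapshot progress, and a token for the operator.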
let (upsert, health_update, snapshot_progress, upsert_token) =
crate::upsert::upsert(
&upsert_input.enter(scope),
upsert_envelope.clone(),
refine_antichain(&resume_upper),
previous,
previous_token,
export_config,
&storage_state.instance_context,
&storage_state.storage_configuration,
&storage_state.dataflow_parameters,
backpressure_metrics,
);
needed_tokens.push(upsert_token);
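// If enabled, hold the raw source's start signal until the upsert state has
// finished rehydrating; otherwise drop the token and let the source start
// immediately.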
if dyncfgs::DELAY_SOURCES_PAST_REHYDRATION
.get(storage_state.storage_configuration.config_set())
{
crate::upsert::rehydration_finished(
scope.clone(),
&base_source_config,
rehydrated_token,
refine_antichain(&resume_upper),
&snapshot_progress,
);
} else {
drop(rehydrated_token)
};
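// Close the backpressure loop: feed the upsert operator's snapshot progress
// back into the persist read rendered above.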
let upsert = match feedback_handle {
Some(feedback_handle) => {
snapshot_progress.connect_loop(feedback_handle);
upsert
}
None => upsert,
};
(
upsert.leave(),
health_update
.map(|(id, update)| HealthStatusMessage {
id,
namespace: StatusNamespace::Upsert,
update,
})
.leave(),
)
},
);
let (upsert_ok, upsert_err) = upsert.inner.ok_err(split_ok_err);
(
upsert_ok.as_collection(),
Some(upsert_err.as_collection()),
health_update,
)
}
SourceEnvelope::None(none_envelope) => {
let results = append_metadata_to_value(decoded_stream);
let flattened_stream = flatten_results_prepend_keys(none_envelope, results);
let (stream, errors) = flattened_stream.inner.ok_err(split_ok_err);
let errors = errors.as_collection();
(stream.as_collection(), Some(errors), empty(scope))
}
SourceEnvelope::CdcV2 => {
let (oks, token) = render_decode_cdcv2(&decoded_stream);
needed_tokens.push(token);
(oks, None, empty(scope))
}
};
let (collection, errors, health) = (
envelope_ok,
envelope_err,
decode_health.concat(&envelope_health),
);
if let Some(errors) = errors {
error_collections.push(errors);
}
let err_collection = match error_collections.len() {
0 => Collection::empty(scope),
1 => error_collections.pop().unwrap(),
_ => collection::concatenate(scope, error_collections),
};
(collection, err_collection, needed_tokens, health)
}
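
/// Computes the maximum number of in-flight bytes used to backpressure the
/// persist read during upsert rehydration. Returns `None` when no default
/// limit is configured. Otherwise, if both a cluster memory limit and a
/// cluster-size fraction are available, the limit is that fraction of the
/// memory limit; else the configured default is used.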
fn get_backpressure_max_inflight_bytes(
inflight_bytes_config: &StorageMaxInflightBytesConfig,
cluster_memory_limit: &Option<usize>,
) -> Option<usize> {
let StorageMaxInflightBytesConfig {
max_inflight_bytes_default,
max_inflight_bytes_cluster_size_fraction,
disk_only: _,
} = inflight_bytes_config;
if max_inflight_bytes_default.is_some() {
let current_cluster_max_bytes_limit =
cluster_memory_limit.as_ref().and_then(|cluster_memory| {
max_inflight_bytes_cluster_size_fraction.map(|fraction| {
usize::cast_lossy(f64::cast_lossy(*cluster_memory) * fraction)
})
});
current_cluster_max_bytes_limit.or(*max_inflight_bytes_default)
} else {
None
}
}
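
/// Splits a stream of timestamped `Result` updates into `Ok` and `Err`
/// updates, preserving the timestamp and diff of each.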
fn split_ok_err<O, E, T, D>(x: (Result<O, E>, T, D)) -> Result<(O, T, D), (E, T, D)> {
match x {
(Ok(ok), ts, diff) => Ok((ok, ts, diff)),
(Err(err), ts, diff) => Err((err, ts, diff)),
}
}
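
/// A decoded key/value pair, where either half may be absent or may have
/// failed to decode.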
#[derive(Debug, Clone, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
struct KV {
key: Option<Result<Row, DecodeError>>,
val: Option<Result<Row, DecodeError>>,
}
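
/// Appends any per-message metadata columns to the decoded value of each
/// update, leaving the key untouched.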
fn append_metadata_to_value<G: Scope, FromTime: Timestamp>(
results: Collection<G, DecodeResult<FromTime>, Diff>,
) -> Collection<G, KV, Diff> {
results.map(move |res| {
let val = res.value.map(|val_result| {
val_result.map(|mut val| {
if !res.metadata.is_empty() {
RowPacker::for_existing_row(&mut val).extend_by_row(&res.metadata);
}
val
})
});
KV { val, key: res.key }
})
}
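
/// Transforms decoded results into upsert commands: a hashed key, an optional
/// value (or error), and the original source timestamp. The shape of the value
/// row depends on the upsert style, and decode errors are either surfaced as
/// `UpsertError`s or, for `ValueErrInline`, folded into an error column.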
fn upsert_commands<G: Scope, FromTime: Timestamp>(
input: Collection<G, DecodeResult<FromTime>, Diff>,
upsert_envelope: UpsertEnvelope,
) -> Collection<G, (UpsertKey, Option<Result<Row, UpsertError>>, FromTime), Diff> {
let mut row_buf = Row::default();
input.map(move |result| {
let from_time = result.from_time;
let key = match result.key {
Some(Ok(key)) => Ok(key),
None => Err(UpsertError::NullKey(UpsertNullKeyError)),
Some(Err(err)) => Err(UpsertError::KeyDecode(err)),
};
let key = match key {
Ok(key) => key,
err @ Err(_) => match result.value {
Some(_) => return (UpsertKey::from_key(err.as_ref()), Some(err), from_time),
None => return (UpsertKey::from_key(err.as_ref()), None, from_time),
},
};
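// Build the key row that is prepended to (or flattened into) the value. For
// named keys with more than one column, the columns are wrapped in a single
// record.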
let key_row = match upsert_envelope.style {
UpsertStyle::Debezium { .. }
| UpsertStyle::Default(KeyEnvelope::Flattened)
| UpsertStyle::ValueErrInline {
key_envelope: KeyEnvelope::Flattened,
error_column: _,
} => key,
UpsertStyle::Default(KeyEnvelope::Named(_))
| UpsertStyle::ValueErrInline {
key_envelope: KeyEnvelope::Named(_),
error_column: _,
} => {
if key.iter().nth(1).is_none() {
key
} else {
row_buf.packer().push_list(key.iter());
row_buf.clone()
}
}
UpsertStyle::Default(KeyEnvelope::None)
| UpsertStyle::ValueErrInline {
key_envelope: KeyEnvelope::None,
error_column: _,
} => unreachable!(),
};
let key = UpsertKey::from_key(Ok(&key_row));
let metadata = result.metadata;
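// Construct the value row according to the upsert style, appending any
// metadata columns at the end.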
let value = match result.value {
Some(Ok(ref row)) => match upsert_envelope.style {
UpsertStyle::Debezium { after_idx } => match row.iter().nth(after_idx).unwrap() {
Datum::List(after) => {
let mut packer = row_buf.packer();
packer.extend(after.iter());
packer.extend_by_row(&metadata);
Some(Ok(row_buf.clone()))
}
Datum::Null => None,
d => panic!("type error: expected record, found {:?}", d),
},
UpsertStyle::Default(_) => {
let mut packer = row_buf.packer();
packer.extend_by_row(&key_row);
packer.extend_by_row(row);
packer.extend_by_row(&metadata);
Some(Ok(row_buf.clone()))
}
UpsertStyle::ValueErrInline { .. } => {
let mut packer = row_buf.packer();
packer.extend_by_row(&key_row);
packer.push(Datum::Null);
packer.extend_by_row(row);
packer.extend_by_row(&metadata);
Some(Ok(row_buf.clone()))
}
},
Some(Err(inner)) => {
match upsert_envelope.style {
UpsertStyle::ValueErrInline { .. } => {
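// Keep the key columns, record the decode error in the inline error
// column, and pad the remaining value columns with nulls so the row
// matches the source's arity.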
let mut count = 0;
let err_string = inner.to_string();
let mut packer = row_buf.packer();
for datum in key_row.iter() {
packer.push(datum);
count += 1;
}
packer.push_list(iter::once(Datum::String(&err_string)));
count += 1;
let metadata_len = metadata.as_row_ref().iter().count();
packer.extend(
iter::repeat(Datum::Null)
.take(upsert_envelope.source_arity - count - metadata_len),
);
packer.extend_by_row(&metadata);
Some(Ok(row_buf.clone()))
}
_ => Some(Err(UpsertError::Value(UpsertValueError {
for_key: key_row,
inner,
}))),
}
}
None => None,
};
(key, value, from_time)
})
}
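
/// Flattens key/value pairs for the `None` envelope into output rows,
/// dropping, prepending, or nesting the key columns according to the key
/// envelope and converting decode errors into `DataflowError`s.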
fn flatten_results_prepend_keys<G>(
none_envelope: &NoneEnvelope,
results: Collection<G, KV, Diff>,
) -> Collection<G, Result<Row, DataflowError>, Diff>
where
G: Scope,
{
let NoneEnvelope {
key_envelope,
key_arity,
} = none_envelope;
let null_key_columns = Row::pack_slice(&vec![Datum::Null; *key_arity]);
match key_envelope {
KeyEnvelope::None => {
results.flat_map(|KV { val, .. }| val.map(|result| result.map_err(Into::into)))
}
KeyEnvelope::Flattened => results
.flat_map(raise_key_value_errors)
.map(move |maybe_kv| {
maybe_kv.map(|(key, value)| {
let mut key = key.unwrap_or_else(|| null_key_columns.clone());
RowPacker::for_existing_row(&mut key).extend_by_row(&value);
key
})
}),
KeyEnvelope::Named(_) => {
results
.flat_map(raise_key_value_errors)
.map(move |maybe_kv| {
maybe_kv.map(|(key, value)| {
let mut key = key.unwrap_or_else(|| null_key_columns.clone());
let row = if key.iter().nth(1).is_none() {
RowPacker::for_existing_row(&mut key).extend_by_row(&value);
key
} else {
let mut new_row = Row::default();
let mut packer = new_row.packer();
packer.push_list(key.iter());
packer.extend_by_row(&value);
new_row
};
row
})
})
}
}
}
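
/// Raises key or value decode errors to `DataflowError`s and pairs successful
/// keys with their values. Returns `None` for messages that carry neither key
/// nor value; a key without a value is an error for this envelope.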
fn raise_key_value_errors(
KV { key, val }: KV,
) -> Option<Result<(Option<Row>, Row), DataflowError>> {
match (key, val) {
(Some(Ok(key)), Some(Ok(value))) => Some(Ok((Some(key), value))),
(None, Some(Ok(value))) => Some(Ok((None, value))),
(_, Some(Err(e))) => Some(Err(e.into())),
(Some(Err(e)), _) => Some(Err(e.into())),
(None, None) => None,
_ => Some(Err(DataflowError::from(EnvelopeError::Flat(
"Value not present for message".into(),
)))),
}
}
#[cfg(test)]
mod test {
use super::*;
#[mz_ore::test]
fn test_no_default() {
let config = StorageMaxInflightBytesConfig {
max_inflight_bytes_default: None,
max_inflight_bytes_cluster_size_fraction: Some(0.5),
disk_only: false,
};
let memory_limit = Some(1000);
let backpressure_inflight_bytes_limit =
get_backpressure_max_inflight_bytes(&config, &memory_limit);
assert_eq!(backpressure_inflight_bytes_limit, None)
}
#[mz_ore::test]
fn test_no_matching_size() {
let config = StorageMaxInflightBytesConfig {
max_inflight_bytes_default: Some(10000),
max_inflight_bytes_cluster_size_fraction: Some(0.5),
disk_only: false,
};
let backpressure_inflight_bytes_limit = get_backpressure_max_inflight_bytes(&config, &None);
assert_eq!(
backpressure_inflight_bytes_limit,
config.max_inflight_bytes_default
)
}
#[mz_ore::test]
fn test_calculated_cluster_limit() {
let config = StorageMaxInflightBytesConfig {
max_inflight_bytes_default: Some(10000),
max_inflight_bytes_cluster_size_fraction: Some(0.5),
disk_only: false,
};
let memory_limit = Some(2000);
let backpressure_inflight_bytes_limit =
get_backpressure_max_inflight_bytes(&config, &memory_limit);
assert_eq!(backpressure_inflight_bytes_limit, Some(1000));
}
}