use std::collections::{BTreeMap, BTreeSet};
use mz_expr::{CollectionPlan, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr};
use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError};
use mz_repr::{GlobalId, RelationType};
use mz_storage_client::controller::CollectionMetadata;
use proptest::prelude::{any, Arbitrary};
use proptest::strategy::{BoxedStrategy, Strategy};
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use timely::progress::Antichain;
use crate::plan::Plan;
use crate::types::dataflows::proto_dataflow_description::{
ProtoIndexExport, ProtoIndexImport, ProtoSinkExport, ProtoSourceImport,
};
use crate::types::sinks::{ComputeSinkConnection, ComputeSinkDesc};
use crate::types::sources::{SourceInstanceArguments, SourceInstanceDesc};
include!(concat!(
env!("OUT_DIR"),
"/mz_compute_client.types.dataflows.rs"
));
/// A description of a dataflow to construct and results to surface.
///
/// Generic over the plan type `P`, the storage metadata type `S` carried by
/// source instances, and the timestamp type `T`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct DataflowDescription<P, S: 'static = (), T = mz_repr::Timestamp> {
    /// Sources instantiated in the dataflow, keyed by source id. The `bool`
    /// records whether the source is monotonic (see `import_source`).
    pub source_imports: BTreeMap<GlobalId, (SourceInstanceDesc<S>, bool)>,
    /// Indexes made available to the dataflow, keyed by index id.
    pub index_imports: BTreeMap<GlobalId, IndexImport>,
    /// Objects to build as part of the dataflow.
    // NOTE(review): the ordering of this `Vec` presumably matters (later
    // objects depending on earlier ones) — confirm against consumers.
    pub objects_to_build: Vec<BuildDesc<P>>,
    /// Indexes to export, keyed by index id, together with the index
    /// description and the relation type of the indexed collection.
    pub index_exports: BTreeMap<GlobalId, (IndexDesc, RelationType)>,
    /// Sinks to export, keyed by sink id.
    pub sink_exports: BTreeMap<GlobalId, ComputeSinkDesc<S, T>>,
    /// An optional frontier for the dataflow; `None` if unset (see
    /// `set_as_of` and `compatible_with`).
    pub as_of: Option<Antichain<T>>,
    /// A frontier compared against `as_of + 1` in `is_single_time`;
    /// defaults to the empty antichain.
    pub until: Antichain<T>,
    /// Human-readable name used for debugging.
    pub debug_name: String,
}
impl<T> DataflowDescription<Plan<T>, (), mz_repr::Timestamp> {
    /// Reports whether the dataflow is known to produce updates for at most
    /// one distinct time, i.e. whether `until` equals `as_of + 1`.
    ///
    /// Returns `false` when `as_of` is unset, or set but empty.
    pub fn is_single_time(&self) -> bool {
        let Some(as_of) = self.as_of.as_ref() else {
            return false;
        };
        // `as_of.as_option()` yields `Some` only for a singleton antichain;
        // `checked_add(1)` guards against timestamp overflow. The comparison
        // holds exactly when `until` is the singleton `as_of + 1`.
        // NOTE(review): a multi-element `as_of` combined with an empty
        // `until` makes both sides `None` — presumably unreachable for the
        // totally ordered `mz_repr::Timestamp`, but worth confirming.
        !as_of.is_empty()
            && as_of
                .as_option()
                .and_then(|as_of| as_of.checked_add(1))
                .as_ref()
                == self.until.as_option()
    }
}
impl<T> DataflowDescription<OptimizedMirRelationExpr, (), T> {
pub fn new(name: String) -> Self {
Self {
source_imports: Default::default(),
index_imports: Default::default(),
objects_to_build: Vec::new(),
index_exports: Default::default(),
sink_exports: Default::default(),
as_of: Default::default(),
until: Antichain::new(),
debug_name: name,
}
}
pub fn import_index(
&mut self,
id: GlobalId,
desc: IndexDesc,
typ: RelationType,
monotonic: bool,
) {
self.index_imports.insert(
id,
IndexImport {
desc,
typ,
monotonic,
},
);
}
pub fn import_source(&mut self, id: GlobalId, typ: RelationType, monotonic: bool) {
self.source_imports.insert(
id,
(
SourceInstanceDesc {
storage_metadata: (),
arguments: SourceInstanceArguments { operators: None },
typ,
},
monotonic,
),
);
}
pub fn insert_plan(&mut self, id: GlobalId, plan: OptimizedMirRelationExpr) {
self.objects_to_build.push(BuildDesc { id, plan });
}
pub fn export_index(&mut self, id: GlobalId, description: IndexDesc, on_type: RelationType) {
self.insert_plan(
id,
OptimizedMirRelationExpr::declare_optimized(MirRelationExpr::ArrangeBy {
input: Box::new(MirRelationExpr::global_get(
description.on_id,
on_type.clone(),
)),
keys: vec![description.key.clone()],
}),
);
self.index_exports.insert(id, (description, on_type));
}
pub fn export_sink(&mut self, id: GlobalId, description: ComputeSinkDesc<(), T>) {
self.sink_exports.insert(id, description);
}
pub fn is_imported(&self, id: &GlobalId) -> bool {
self.objects_to_build.iter().any(|bd| &bd.id == id)
|| self.source_imports.keys().any(|i| i == id)
}
pub fn set_as_of(&mut self, as_of: Antichain<T>) {
self.as_of = Some(as_of);
}
pub fn arity_of(&self, id: &GlobalId) -> usize {
for (source_id, (source, _monotonic)) in self.source_imports.iter() {
if source_id == id {
return source.typ.arity();
}
}
for IndexImport { desc, typ, .. } in self.index_imports.values() {
if &desc.on_id == id {
return typ.arity();
}
}
for desc in self.objects_to_build.iter() {
if &desc.id == id {
return desc.plan.arity();
}
}
panic!("GlobalId {} not found in DataflowDesc", id);
}
pub fn visit_children<R, S, E>(&mut self, r: R, s: S) -> Result<(), E>
where
R: Fn(&mut OptimizedMirRelationExpr) -> Result<(), E>,
S: Fn(&mut MirScalarExpr) -> Result<(), E>,
{
for BuildDesc { plan, .. } in &mut self.objects_to_build {
r(plan)?;
}
for (source_instance_desc, _) in self.source_imports.values_mut() {
let Some(mfp) = source_instance_desc.arguments.operators.as_mut() else {
continue;
};
for expr in mfp.expressions.iter_mut() {
s(expr)?;
}
for (_, expr) in mfp.predicates.iter_mut() {
s(expr)?;
}
}
Ok(())
}
}
impl<P, S, T> DataflowDescription<P, S, T>
where
    P: CollectionPlan,
{
    /// Identifiers of all exported objects: index exports followed by sink
    /// exports.
    pub fn export_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.index_exports
            .keys()
            .chain(self.sink_exports.keys())
            .cloned()
    }

    /// Identifiers of exported sinks with a `Subscribe` connection.
    pub fn subscribe_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
        self.sink_exports
            .iter()
            .filter_map(|(id, desc)| match desc.connection {
                ComputeSinkConnection::Subscribe(_) => Some(*id),
                _ => None,
            })
    }

    /// Returns the description of the object to build with identifier `id`.
    ///
    /// # Panics
    ///
    /// Panics if `id` does not occur exactly once in `objects_to_build`.
    pub fn build_desc(&self, id: GlobalId) -> &BuildDesc<P> {
        let mut builds = self.objects_to_build.iter().filter(|build| build.id == id);
        let build = builds
            .next()
            .unwrap_or_else(|| panic!("object to build id {id} unexpectedly missing"));
        assert!(builds.next().is_none());
        build
    }

    /// Computes the set of identifiers upon which `collection_id` depends,
    /// including `collection_id` itself.
    pub fn depends_on(&self, collection_id: GlobalId) -> BTreeSet<GlobalId> {
        let mut out = BTreeSet::new();
        self.depends_on_into(collection_id, &mut out);
        out
    }

    /// Like `depends_on`, but accumulates into an existing set so shared
    /// dependencies are visited only once.
    pub fn depends_on_into(&self, collection_id: GlobalId, out: &mut BTreeSet<GlobalId>) {
        out.insert(collection_id);
        if self.source_imports.contains_key(&collection_id) {
            // The collection is an imported source; it has no further
            // dependencies within this dataflow.
            // (A second, redundant `out.insert(collection_id)` was removed
            // here: the id is already inserted above.)
            return;
        }
        // If the collection is served by one or more imported indexes,
        // depend on those indexes rather than recursing into its plan.
        let mut found_index = false;
        for (index_id, IndexImport { desc, .. }) in &self.index_imports {
            if desc.on_id == collection_id {
                out.insert(*index_id);
                found_index = true;
            }
        }
        if found_index {
            return;
        }
        // Otherwise the collection must be built here; recurse into the
        // dependencies of its plan, skipping ids we have already visited.
        let build = self.build_desc(collection_id);
        for id in build.plan.depends_on() {
            if !out.contains(&id) {
                self.depends_on_into(id, out)
            }
        }
    }

    /// Computes the subset of `depends_on(collection_id)` that are imports
    /// (source or index imports) of this dataflow.
    pub fn depends_on_imports(&self, collection_id: GlobalId) -> BTreeSet<GlobalId> {
        let is_import = |id: &GlobalId| {
            self.source_imports.contains_key(id) || self.index_imports.contains_key(id)
        };
        self.depends_on(collection_id)
            .into_iter()
            .filter(is_import)
            .collect()
    }
}
impl<P: PartialEq, S: PartialEq, T: timely::PartialOrder> DataflowDescription<P, S, T> {
    /// Reports whether `other` is compatible with `self`: all structural
    /// fields must be equal, and both `as_of` frontiers must be present with
    /// `self`'s less than or equal to `other`'s.
    pub fn compatible_with(&self, other: &Self) -> bool {
        let structure_matches = self.index_exports == other.index_exports
            && self.sink_exports == other.sink_exports
            && self.objects_to_build == other.objects_to_build
            && self.index_imports == other.index_imports
            && self.source_imports == other.source_imports;
        // A missing `as_of` on either side makes the pair incompatible.
        let as_of_compatible = match (&self.as_of, &other.as_of) {
            (Some(lhs), Some(rhs)) => timely::PartialOrder::less_equal(lhs, rhs),
            _ => false,
        };
        structure_matches && as_of_compatible
    }
}
impl RustType<ProtoDataflowDescription>
    for DataflowDescription<crate::plan::Plan, CollectionMetadata>
{
    // Encodes into the protobuf representation. `until` is always present
    // in the proto message; `as_of` stays optional on both sides.
    fn into_proto(&self) -> ProtoDataflowDescription {
        ProtoDataflowDescription {
            debug_name: self.debug_name.clone(),
            source_imports: self.source_imports.into_proto(),
            index_imports: self.index_imports.into_proto(),
            objects_to_build: self.objects_to_build.into_proto(),
            index_exports: self.index_exports.into_proto(),
            sink_exports: self.sink_exports.into_proto(),
            as_of: self.as_of.into_proto(),
            until: Some(self.until.into_proto()),
        }
    }

    // Decodes from the protobuf representation; fields are decoded in the
    // same order as before so error precedence is unchanged.
    fn from_proto(proto: ProtoDataflowDescription) -> Result<Self, TryFromProtoError> {
        let source_imports = proto.source_imports.into_rust()?;
        let index_imports = proto.index_imports.into_rust()?;
        let objects_to_build = proto.objects_to_build.into_rust()?;
        let index_exports = proto.index_exports.into_rust()?;
        let sink_exports = proto.sink_exports.into_rust()?;
        let as_of = match proto.as_of {
            Some(frontier) => Some(frontier.into_rust()?),
            None => None,
        };
        // An absent `until` decodes to the empty antichain.
        let until = match proto.until {
            Some(frontier) => frontier.into_rust()?,
            None => Antichain::new(),
        };
        Ok(DataflowDescription {
            source_imports,
            index_imports,
            objects_to_build,
            index_exports,
            sink_exports,
            as_of,
            until,
            debug_name: proto.debug_name,
        })
    }
}
impl ProtoMapEntry<GlobalId, (SourceInstanceDesc<CollectionMetadata>, bool)> for ProtoSourceImport {
    // Encodes one `(id, (desc, monotonic))` map entry, destructuring the
    // tuple up front instead of indexing into it.
    fn from_rust<'a>(
        (id, (desc, monotonic)): (
            &'a GlobalId,
            &'a (SourceInstanceDesc<CollectionMetadata>, bool),
        ),
    ) -> Self {
        ProtoSourceImport {
            id: Some(id.into_proto()),
            source_instance_desc: Some(desc.into_proto()),
            monotonic: monotonic.into_proto(),
        }
    }

    // Decodes one map entry; fields are decoded in declaration order.
    fn into_rust(
        self,
    ) -> Result<(GlobalId, (SourceInstanceDesc<CollectionMetadata>, bool)), TryFromProtoError> {
        let id = self.id.into_rust_if_some("ProtoSourceImport::id")?;
        let desc = self
            .source_instance_desc
            .into_rust_if_some("ProtoSourceImport::source_instance_desc")?;
        let monotonic = self.monotonic.into_rust()?;
        Ok((id, (desc, monotonic)))
    }
}
impl ProtoMapEntry<GlobalId, IndexImport> for ProtoIndexImport {
    // Encodes one `(id, IndexImport)` map entry.
    fn from_rust<'a>(
        (
            id,
            IndexImport {
                desc,
                typ,
                monotonic,
            },
        ): (&'a GlobalId, &'a IndexImport),
    ) -> Self {
        ProtoIndexImport {
            id: Some(id.into_proto()),
            index_desc: Some(desc.into_proto()),
            typ: Some(typ.into_proto()),
            monotonic: monotonic.into_proto(),
        }
    }

    // Decodes one map entry.
    fn into_rust(self) -> Result<(GlobalId, IndexImport), TryFromProtoError> {
        Ok((
            // Context label fixed: it previously read "ProtoIndex::id",
            // mislabeling this message type in decode errors, inconsistent
            // with every other `ProtoMapEntry` impl in this file.
            self.id.into_rust_if_some("ProtoIndexImport::id")?,
            IndexImport {
                desc: self
                    .index_desc
                    .into_rust_if_some("ProtoIndexImport::index_desc")?,
                typ: self.typ.into_rust_if_some("ProtoIndexImport::typ")?,
                monotonic: self.monotonic.into_rust()?,
            },
        ))
    }
}
impl ProtoMapEntry<GlobalId, (IndexDesc, RelationType)> for ProtoIndexExport {
    // Encodes one `(id, (index_desc, typ))` map entry.
    fn from_rust<'a>(
        (id, (index_desc, typ)): (&'a GlobalId, &'a (IndexDesc, RelationType)),
    ) -> Self {
        let id = Some(id.into_proto());
        let index_desc = Some(index_desc.into_proto());
        let typ = Some(typ.into_proto());
        ProtoIndexExport { id, index_desc, typ }
    }

    // Decodes one map entry; fields are decoded in declaration order.
    fn into_rust(self) -> Result<(GlobalId, (IndexDesc, RelationType)), TryFromProtoError> {
        let id = self.id.into_rust_if_some("ProtoIndexExport::id")?;
        let index_desc = self
            .index_desc
            .into_rust_if_some("ProtoIndexExport::index_desc")?;
        let typ = self.typ.into_rust_if_some("ProtoIndexExport::typ")?;
        Ok((id, (index_desc, typ)))
    }
}
impl ProtoMapEntry<GlobalId, ComputeSinkDesc<CollectionMetadata>> for ProtoSinkExport {
    // Encodes one `(id, sink_desc)` map entry.
    fn from_rust<'a>(
        (id, sink_desc): (&'a GlobalId, &'a ComputeSinkDesc<CollectionMetadata>),
    ) -> Self {
        let id = Some(id.into_proto());
        let sink_desc = Some(sink_desc.into_proto());
        ProtoSinkExport { id, sink_desc }
    }

    // Decodes one map entry; the id is decoded before the sink description.
    fn into_rust(
        self,
    ) -> Result<(GlobalId, ComputeSinkDesc<CollectionMetadata>), TryFromProtoError> {
        let id = self.id.into_rust_if_some("ProtoSinkExport::id")?;
        let desc = self
            .sink_desc
            .into_rust_if_some("ProtoSinkExport::sink_desc")?;
        Ok((id, desc))
    }
}
// Proptest `Arbitrary` implementation that delegates to the
// `any_dataflow_description` composite strategy defined below.
impl Arbitrary for DataflowDescription<Plan, CollectionMetadata, mz_repr::Timestamp> {
    type Strategy = BoxedStrategy<Self>;
    type Parameters = ();

    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        any_dataflow_description().boxed()
    }
}
proptest::prop_compose! {
    // Strategy producing a random `DataflowDescription` for the protobuf
    // roundtrip test. Collection sizes are kept small (1..3 entries) to keep
    // generated cases cheap; `until` is always the empty antichain.
    fn any_dataflow_description()(
        source_imports in proptest::collection::vec(any_source_import(), 1..3),
        index_imports in proptest::collection::vec(any_dataflow_index_import(), 1..3),
        objects_to_build in proptest::collection::vec(any::<BuildDesc<Plan>>(), 1..3),
        index_exports in proptest::collection::vec(any_dataflow_index_export(), 1..3),
        sink_descs in proptest::collection::vec(
            any::<(GlobalId, ComputeSinkDesc<CollectionMetadata, mz_repr::Timestamp>)>(),
            1..3,
        ),
        // `as_of_some` decides whether `as_of` is `Some(...)` or `None`, so
        // both branches of the optional field are exercised.
        as_of_some in any::<bool>(),
        as_of in proptest::collection::vec(any::<mz_repr::Timestamp>(), 1..5),
        debug_name in ".*",
    ) -> DataflowDescription<Plan, CollectionMetadata, mz_repr::Timestamp> {
        DataflowDescription {
            source_imports: BTreeMap::from_iter(source_imports.into_iter()),
            index_imports: BTreeMap::from_iter(index_imports.into_iter()),
            objects_to_build,
            index_exports: BTreeMap::from_iter(index_exports.into_iter()),
            sink_exports: BTreeMap::from_iter(
                sink_descs.into_iter(),
            ),
            as_of: if as_of_some {
                Some(Antichain::from(as_of))
            } else {
                None
            },
            until: Antichain::new(),
            debug_name,
        }
    }
}
/// Strategy producing a random source-import map entry: a `GlobalId` paired
/// with a `(SourceInstanceDesc, monotonic)` tuple.
fn any_source_import(
) -> impl Strategy<Value = (GlobalId, (SourceInstanceDesc<CollectionMetadata>, bool))> {
    let id = any::<GlobalId>();
    let desc_and_monotonic = any::<(SourceInstanceDesc<CollectionMetadata>, bool)>();
    // A tuple of strategies is itself a strategy over the tuple of values.
    (id, desc_and_monotonic)
}
proptest::prop_compose! {
    // Strategy producing a random index-import map entry:
    // a `GlobalId` paired with an `IndexImport`.
    fn any_dataflow_index_import()(
        id in any::<GlobalId>(),
        desc in any::<IndexDesc>(),
        typ in any::<RelationType>(),
        monotonic in any::<bool>(),
    ) -> (GlobalId, IndexImport) {
        (id, IndexImport {desc, typ, monotonic})
    }
}
proptest::prop_compose! {
    // Strategy producing a random index-export map entry:
    // a `GlobalId` paired with an `(IndexDesc, RelationType)` tuple.
    fn any_dataflow_index_export()(
        id in any::<GlobalId>(),
        index in any::<IndexDesc>(),
        typ in any::<RelationType>(),
    ) -> (GlobalId, (IndexDesc, RelationType)) {
        (id, (index, typ))
    }
}
/// A `DataflowDescription` over optimized MIR plans with no storage metadata
/// and the default timestamp type.
pub type DataflowDesc = DataflowDescription<OptimizedMirRelationExpr, ()>;
/// A description of an index: the collection it is built on and the key
/// expressions by which that collection is arranged.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct IndexDesc {
    /// Identifier of the collection the index is built on.
    pub on_id: GlobalId,
    /// Expressions to be arranged by; kept non-empty and small for proptest.
    #[proptest(strategy = "proptest::collection::vec(any::<MirScalarExpr>(), 1..3)")]
    pub key: Vec<MirScalarExpr>,
}
impl RustType<ProtoIndexDesc> for IndexDesc {
    // Encodes into the protobuf representation.
    fn into_proto(&self) -> ProtoIndexDesc {
        let on_id = Some(self.on_id.into_proto());
        let key = self.key.into_proto();
        ProtoIndexDesc { on_id, key }
    }

    // Decodes from the protobuf representation.
    fn from_proto(proto: ProtoIndexDesc) -> Result<Self, TryFromProtoError> {
        let on_id = proto.on_id.into_rust_if_some("ProtoIndexDesc::on_id")?;
        let key = proto.key.into_rust()?;
        Ok(IndexDesc { on_id, key })
    }
}
/// An index import: the index description, the relation type of the indexed
/// collection, and whether the index is monotonic.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct IndexImport {
    /// Description of the index (collection id and key expressions).
    pub desc: IndexDesc,
    /// Relation type of the indexed collection.
    pub typ: RelationType,
    /// Whether the index is monotonic (see `DataflowDescription::import_index`).
    pub monotonic: bool,
}
/// An association of a global identifier to a plan to build, used as an
/// element of `DataflowDescription::objects_to_build`.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct BuildDesc<P> {
    /// Identifier of the object being built.
    pub id: GlobalId,
    /// The plan that produces the object.
    pub plan: P,
}
impl RustType<ProtoBuildDesc> for BuildDesc<crate::plan::Plan> {
    // Encodes into the protobuf representation.
    fn into_proto(&self) -> ProtoBuildDesc {
        let id = Some(self.id.into_proto());
        let plan = Some(self.plan.into_proto());
        ProtoBuildDesc { id, plan }
    }

    // Decodes from the protobuf representation; the id is decoded first.
    fn from_proto(x: ProtoBuildDesc) -> Result<Self, TryFromProtoError> {
        let id = x.id.into_rust_if_some("ProtoBuildDesc::id")?;
        let plan = x.plan.into_rust_if_some("ProtoBuildDesc::plan")?;
        Ok(BuildDesc { id, plan })
    }
}
#[cfg(test)]
mod tests {
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::ProptestConfig;
    use proptest::proptest;

    use crate::types::dataflows::DataflowDescription;

    use super::*;

    proptest! {
        // Each generated dataflow description is a fairly large structure,
        // so cap the case count to keep the test fast.
        #![proptest_config(ProptestConfig::with_cases(32))]

        // Checks that a `DataflowDescription` survives an encode/decode
        // roundtrip through `ProtoDataflowDescription` unchanged.
        #[mz_ore::test]
        fn dataflow_description_protobuf_roundtrip(expect in any::<DataflowDescription<Plan, CollectionMetadata, mz_repr::Timestamp>>()) {
            let actual = protobuf_roundtrip::<_, ProtoDataflowDescription>(&expect);
            assert!(actual.is_ok());
            assert_eq!(actual.unwrap(), expect);
        }
    }
}