use std::collections::{BTreeMap, BTreeSet};
use chrono::{DateTime, Utc};
use maplit::{btreemap, btreeset};
use mz_adapter_types::compaction::CompactionWindow;
use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, MaterializedView, Source, View};
use mz_compute_client::controller::error::InstanceMissing;
use mz_compute_types::dataflows::{BuildDesc, DataflowDesc, DataflowDescription, IndexDesc};
use mz_compute_types::plan::Plan;
use mz_compute_types::sinks::ComputeSinkDesc;
use mz_compute_types::ComputeInstanceId;
use mz_controller::Controller;
use mz_expr::visit::Visit;
use mz_expr::{
CollectionPlan, Id, MapFilterProject, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr,
UnmaterializableFunc, RECURSION_LIMIT,
};
use mz_ore::cast::ReinterpretCast;
use mz_ore::stack::{maybe_grow, CheckedRecursion, RecursionGuard, RecursionLimitError};
use mz_repr::adt::array::ArrayDimension;
use mz_repr::role_id::RoleId;
use mz_repr::{Datum, GlobalId, Row};
use mz_sql::catalog::{CatalogRole, SessionCatalog};
use mz_sql::rbac;
use mz_transform::dataflow::DataflowMetainfo;
use tracing::warn;
use crate::catalog::CatalogState;
use crate::coord::id_bundle::CollectionIdBundle;
use crate::coord::Coordinator;
use crate::session::{Session, SERVER_MAJOR_VERSION, SERVER_MINOR_VERSION};
use crate::util::{viewable_variables, ResultExt};
use crate::AdapterError;
/// A point-in-time copy of the set of collections installed on one compute instance.
///
/// The snapshot is taken from the controller once and then updated locally (via
/// [`ComputeInstanceSnapshot::insert_collection`]); it does not track later controller changes.
#[derive(Debug, Clone)]
pub struct ComputeInstanceSnapshot {
    /// The compute instance this snapshot was taken from.
    instance_id: ComputeInstanceId,
    /// IDs of the collections known to exist on the instance.
    collections: BTreeSet<GlobalId>,
}

impl ComputeInstanceSnapshot {
    /// Records the IDs of all collections the controller currently reports for compute
    /// instance `id`.
    ///
    /// # Errors
    ///
    /// Returns [`InstanceMissing`] if the controller has no compute instance `id`.
    pub fn new(controller: &Controller, id: ComputeInstanceId) -> Result<Self, InstanceMissing> {
        let instance = controller.compute.instance_ref(id)?;
        let collections = instance
            .collections()
            .map(|(collection_id, _)| *collection_id)
            .collect();
        Ok(ComputeInstanceSnapshot {
            instance_id: id,
            collections,
        })
    }

    /// The ID of the compute instance this snapshot describes.
    pub fn instance_id(&self) -> ComputeInstanceId {
        self.instance_id
    }

    /// Whether the collection `id` is part of this snapshot.
    pub fn contains_collection(&self, id: &GlobalId) -> bool {
        self.collections.contains(id)
    }

    /// Adds `id` to the snapshot's set of collections (a no-op if already present).
    pub fn insert_collection(&mut self, id: GlobalId) {
        let _ = self.collections.insert(id);
    }
}
/// Builds dataflow descriptions for a single compute instance, resolving the items they
/// depend on against a catalog.
#[derive(Debug)]
pub struct DataflowBuilder<'a> {
    /// The catalog state against which imported items are resolved.
    pub catalog: &'a CatalogState,
    /// A snapshot of the compute instance the dataflows are built for.
    pub compute: ComputeInstanceSnapshot,
    /// Indexes registered through `DataflowBuilder::ignore_indexes`.
    // NOTE(review): presumably excluded when looking up usable indexes (`indexes_on`,
    // defined elsewhere) — confirm.
    pub ignored_indexes: BTreeSet<GlobalId>,
    /// Bounds the recursion depth of the monotonicity analysis, which runs under
    /// `checked_recur`.
    recursion_guard: RecursionGuard,
}
/// Describes how `prep_relation_expr` and `prep_scalar_expr` treat calls to
/// unmaterializable functions in an expression.
#[derive(Clone, Copy, Debug)]
pub enum ExprPrepStyle<'a> {
    /// Used for expressions installed in dataflows (e.g. by `build_sink_dataflow_into`).
    /// Any unmaterializable function call is rejected with
    /// `AdapterError::UnmaterializableFunction`. For `Filter` predicates only the
    /// non-temporal parts of the predicates are checked.
    Index,
    /// Used for one-shot evaluation: unmaterializable function calls are replaced by
    /// their values, computed from `session` and `catalog_state` at `logical_time`.
    OneShot {
        /// Determines how `mz_now` is evaluated (see [`EvalTime`]).
        logical_time: EvalTime,
        /// Supplies session-dependent values (current user, database, variables, ...).
        session: &'a Session,
        /// Supplies catalog-dependent values (schemas, roles, configuration, ...).
        catalog_state: &'a CatalogState,
    },
    /// Used for `AS OF` / `UP TO` expressions: any unmaterializable function call is
    /// rejected with `AdapterError::UncallableFunction`.
    AsOfUpTo,
    /// Used for webhook validation expressions: calls to `CurrentTimestamp` are replaced
    /// by the literal `now`; other unmaterializable calls are left untouched.
    WebhookValidation {
        /// The timestamp substituted for `CurrentTimestamp`.
        now: DateTime<Utc>,
    },
}
/// Controls how `mz_now` is evaluated under [`ExprPrepStyle::OneShot`].
#[derive(Clone, Copy, Debug)]
pub enum EvalTime {
    /// `mz_now` evaluates to this timestamp.
    Time(mz_repr::Timestamp),
    /// The `mz_now` call is left in the expression unevaluated (presumably to be
    /// resolved later by the caller).
    Deferred,
    /// `mz_now` may not be called here; evaluating it returns an error.
    NotAvailable,
}
impl Coordinator {
    /// Creates a [`DataflowBuilder`] that resolves items against this coordinator's
    /// catalog state and targets compute instance `instance`.
    ///
    /// # Panics
    ///
    /// Panics if the controller has no compute instance `instance`.
    #[tracing::instrument(level = "debug", skip_all)]
    pub fn dataflow_builder(&self, instance: ComputeInstanceId) -> DataflowBuilder {
        let snapshot = self
            .instance_snapshot(instance)
            .expect("compute instance does not exist");
        let catalog_state = self.catalog().state();
        DataflowBuilder::new(catalog_state, snapshot)
    }

    /// Takes a [`ComputeInstanceSnapshot`] of compute instance `id`.
    ///
    /// # Errors
    ///
    /// Returns [`InstanceMissing`] if the controller has no such instance.
    pub fn instance_snapshot(
        &self,
        id: ComputeInstanceId,
    ) -> Result<ComputeInstanceSnapshot, InstanceMissing> {
        let controller = &self.controller;
        ComputeInstanceSnapshot::new(controller, id)
    }

    /// Installs `dataflow` on compute instance `instance`, then initializes read policies
    /// (with the default compaction window) for every collection it exports.
    ///
    /// A failure to create the dataflow is handled by `unwrap_or_terminate`
    /// (presumably fatal — see `crate::util::ResultExt`).
    pub(crate) async fn ship_dataflow(
        &mut self,
        dataflow: DataflowDescription<Plan>,
        instance: ComputeInstanceId,
    ) {
        // Gather the export IDs first: `dataflow` is moved into the controller below.
        let exported_ids = dataflow.export_ids().collect();
        self.controller
            .active_compute()
            .create_dataflow(instance, dataflow)
            .unwrap_or_terminate("dataflow creation cannot fail");
        self.initialize_compute_read_policies(exported_ids, instance, CompactionWindow::Default)
            .await;
    }
}
/// Collects the IDs of every collection `dataflow` imports.
///
/// Source imports are reported as storage IDs; index imports are reported as compute IDs
/// on `compute_instance`.
pub fn dataflow_import_id_bundle<P>(
    dataflow: &DataflowDescription<P>,
    compute_instance: ComputeInstanceId,
) -> CollectionIdBundle {
    CollectionIdBundle {
        storage_ids: dataflow.source_imports.keys().copied().collect(),
        compute_ids: btreemap! {
            compute_instance => dataflow.index_imports.keys().copied().collect(),
        },
    }
}
impl<'a> DataflowBuilder<'a> {
    /// Creates a builder over `catalog` for the compute instance captured in `compute`.
    ///
    /// No indexes are ignored initially. The recursion guard is limited to
    /// [`RECURSION_LIMIT`].
    pub fn new(catalog: &'a CatalogState, compute: ComputeInstanceSnapshot) -> Self {
        Self {
            catalog,
            compute,
            ignored_indexes: Default::default(),
            recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
        }
    }

    /// Adds `indexes` to the builder's set of ignored indexes.
    // NOTE(review): presumably `indexes_on` (defined elsewhere) skips these — confirm.
    pub fn ignore_indexes(&mut self, indexes: impl Iterator<Item = GlobalId>) {
        self.ignored_indexes.extend(indexes);
    }

    /// Imports the collection `id` into `dataflow`, unless it is already imported.
    ///
    /// If `indexes_on(id)` yields any indexes, all of them are imported. Otherwise the item
    /// is imported according to its kind:
    /// - tables, sources, materialized views and logs become source imports;
    /// - views are inlined into the dataflow, after recursively importing everything they
    ///   depend on.
    ///
    /// # Errors
    ///
    /// Errors from recursively importing a view's dependencies are propagated.
    ///
    /// # Panics
    ///
    /// Panics if an indexed item has no description, or if `id` has no index and is not a
    /// table, source, view, materialized view, or log.
    pub fn import_into_dataflow(
        &mut self,
        id: &GlobalId,
        dataflow: &mut DataflowDesc,
    ) -> Result<(), AdapterError> {
        // Importing a view recurses through its dependencies (via
        // `import_view_into_dataflow`), so run under `maybe_grow`.
        maybe_grow(|| {
            if dataflow.is_imported(id) {
                return Ok(());
            }

            let mut valid_indexes = self.indexes_on(*id).peekable();
            if valid_indexes.peek().is_some() {
                // The relation type and monotonicity depend only on `id`, not on the
                // particular index. Compute them once instead of once per index.
                // `monotonic_view` clones and walks the item's whole expression.
                let entry = self.catalog.get_entry(id);
                let desc = entry
                    .desc(
                        &self
                            .catalog
                            .resolve_full_name(entry.name(), entry.conn_id()),
                    )
                    .expect("indexes can only be built on items with descs");
                let monotonic = self.monotonic_view(*id);
                for (index_id, idx) in valid_indexes {
                    let index_desc = IndexDesc {
                        on_id: *id,
                        key: idx.keys.to_vec(),
                    };
                    dataflow.import_index(index_id, index_desc, desc.typ().clone(), monotonic);
                }
            } else {
                // Release the shared borrow of `self` held by the iterator; the view case
                // below needs `&mut self`.
                drop(valid_indexes);
                let entry = self.catalog.get_entry(id);
                match entry.item() {
                    CatalogItem::Table(table) => {
                        dataflow.import_source(*id, table.desc.typ().clone(), false);
                    }
                    CatalogItem::Source(source) => {
                        dataflow.import_source(
                            *id,
                            source.desc.typ().clone(),
                            self.monotonic_source(source),
                        );
                    }
                    CatalogItem::View(view) => {
                        let expr = view.optimized_expr.clone();
                        self.import_view_into_dataflow(id, &expr, dataflow)?;
                    }
                    CatalogItem::MaterializedView(mview) => {
                        let monotonic = self.monotonic_view(*id);
                        dataflow.import_source(*id, mview.desc.typ().clone(), monotonic);
                    }
                    CatalogItem::Log(log) => {
                        dataflow.import_source(*id, log.variant.desc().typ().clone(), false);
                    }
                    _ => unreachable!(),
                }
            }
            Ok(())
        })
    }

    /// Imports every collection `view` depends on into `dataflow`, then adds `view` itself
    /// as an object to build under `view_id`.
    ///
    /// # Errors
    ///
    /// Propagates errors from importing the dependencies.
    pub fn import_view_into_dataflow(
        &mut self,
        view_id: &GlobalId,
        view: &OptimizedMirRelationExpr,
        dataflow: &mut DataflowDesc,
    ) -> Result<(), AdapterError> {
        for get_id in view.depends_on() {
            self.import_into_dataflow(&get_id, dataflow)?;
        }
        dataflow.insert_plan(*view_id, view.clone());
        Ok(())
    }

    /// Builds a new dataflow named `name` that exports the sink `sink_description` as `id`.
    ///
    /// See [`DataflowBuilder::build_sink_dataflow_into`] for details and errors.
    pub fn build_sink_dataflow(
        &mut self,
        name: String,
        id: GlobalId,
        sink_description: ComputeSinkDesc,
    ) -> Result<(DataflowDesc, DataflowMetainfo), AdapterError> {
        let mut dataflow = DataflowDesc::new(name);
        let dataflow_metainfo =
            self.build_sink_dataflow_into(&mut dataflow, id, sink_description)?;
        Ok((dataflow, dataflow_metainfo))
    }

    /// Adds the sink `sink_description` (exported as `id`) to `dataflow` and optimizes it.
    ///
    /// This imports the sink's input, checks every object to build with
    /// [`ExprPrepStyle::Index`], exports the sink, and runs
    /// `mz_transform::optimize_dataflow` with this builder and empty statistics.
    ///
    /// # Errors
    ///
    /// Fails if importing the input fails, if an object to build calls an unmaterializable
    /// function, or if optimization fails.
    pub fn build_sink_dataflow_into(
        &mut self,
        dataflow: &mut DataflowDesc,
        id: GlobalId,
        sink_description: ComputeSinkDesc,
    ) -> Result<DataflowMetainfo, AdapterError> {
        self.import_into_dataflow(&sink_description.from, dataflow)?;
        for BuildDesc { plan, .. } in &mut dataflow.objects_to_build {
            prep_relation_expr(plan, ExprPrepStyle::Index)?;
        }
        dataflow.export_sink(id, sink_description);

        let dataflow_metainfo =
            mz_transform::optimize_dataflow(dataflow, self, &mz_transform::EmptyStatisticsOracle)?;

        Ok(dataflow_metainfo)
    }

    /// Whether `source` is monotonic: ingestions defer to their description's
    /// `monotonic()`; every other kind of source is treated as non-monotonic.
    fn monotonic_source(&self, source: &Source) -> bool {
        match &source.data_source {
            DataSourceDesc::Ingestion(ingestion) => ingestion.desc.monotonic(),
            DataSourceDesc::Introspection(_)
            | DataSourceDesc::Progress
            | DataSourceDesc::Webhook { .. }
            | DataSourceDesc::Source => false,
        }
    }

    /// Whether item `id` is monotonic.
    ///
    /// If the analysis hits the recursion limit, a warning is logged and `false` is
    /// returned.
    fn monotonic_view(&self, id: GlobalId) -> bool {
        self.monotonic_view_inner(id, &mut BTreeMap::new())
            .unwrap_or_else(|e| {
                warn!("Error inspecting view {id} for monotonicity: {e}");
                false
            })
    }

    /// Recursive worker for [`DataflowBuilder::monotonic_view`].
    ///
    /// Sources are monotonic per [`DataflowBuilder::monotonic_source`]. For views and
    /// materialized views, the monotonicity of every global ID read by a `Get` in the
    /// optimized expression is determined recursively and cached in `memo`. The set of
    /// monotonic inputs is then fed to `MonotonicFlag`. All other item kinds are
    /// non-monotonic.
    ///
    /// # Errors
    ///
    /// Returns [`RecursionLimitError`] if the recursion guard is exhausted at this level,
    /// or if `MonotonicFlag::apply` fails.
    fn monotonic_view_inner(
        &self,
        id: GlobalId,
        memo: &mut BTreeMap<GlobalId, bool>,
    ) -> Result<bool, RecursionLimitError> {
        self.checked_recur(|_| {
            match self.catalog.get_entry(&id).item() {
                CatalogItem::Source(source) => Ok(self.monotonic_source(source)),
                CatalogItem::View(View { optimized_expr, .. })
                | CatalogItem::MaterializedView(MaterializedView { optimized_expr, .. }) => {
                    let mut view_expr = optimized_expr.clone().into_inner();

                    // Collect the global IDs read by this expression that are themselves
                    // monotonic.
                    let mut monotonic_ids = BTreeSet::new();
                    let recursion_result: Result<(), RecursionLimitError> = view_expr
                        .try_visit_post(&mut |e| {
                            if let MirRelationExpr::Get {
                                id: Id::Global(got_id),
                                ..
                            } = e
                            {
                                let got_id = *got_id;
                                // An ID may be read several times; `memo` avoids
                                // re-analyzing it.
                                let monotonic = match memo.get(&got_id) {
                                    Some(monotonic) => *monotonic,
                                    None => {
                                        let monotonic = self.monotonic_view_inner(got_id, memo)?;
                                        memo.insert(got_id, monotonic);
                                        monotonic
                                    }
                                };
                                if monotonic {
                                    monotonic_ids.insert(got_id);
                                }
                            }
                            Ok(())
                        });
                    if let Err(error) = recursion_result {
                        // Best effort: continue with the IDs collected so far. Inputs that
                        // were not inspected are treated as non-monotonic, so the result
                        // may be a false negative.
                        warn!("Error inspecting view {id} for monotonicity: {error}");
                    }

                    mz_transform::monotonic::MonotonicFlag::default().apply(
                        &mut view_expr,
                        &monotonic_ids,
                        &mut BTreeSet::new(),
                    )
                }
                CatalogItem::Secret(_)
                | CatalogItem::Type(_)
                | CatalogItem::Connection(_)
                | CatalogItem::Table(_)
                | CatalogItem::Log(_)
                | CatalogItem::Index(_)
                | CatalogItem::Sink(_)
                | CatalogItem::Func(_) => Ok(false),
            }
        })
    }
}
/// Lets the builder call `checked_recur` (as `monotonic_view_inner` does), with depth
/// bounded by the builder's own [`RecursionGuard`].
impl<'a> CheckedRecursion for DataflowBuilder<'a> {
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.recursion_guard
    }
}
/// Prepares every scalar expression inside the relation `expr` according to `style`
/// (see [`prep_scalar_expr`]).
///
/// With [`ExprPrepStyle::Index`], only the non-temporal parts of each `Filter`'s predicates
/// are checked. The predicates are planned through a [`MapFilterProject`] to separate them,
/// and that planned copy is only inspected, never written back.
///
/// # Errors
///
/// Fails if a filter cannot be planned, or if [`prep_scalar_expr`] rejects an expression.
pub fn prep_relation_expr(
    expr: &mut OptimizedMirRelationExpr,
    style: ExprPrepStyle,
) -> Result<(), AdapterError> {
    match style {
        ExprPrepStyle::Index => expr.0.try_visit_mut_post(&mut |node| match &*node {
            MirRelationExpr::Filter { input, predicates } => {
                let filter_mfp =
                    MapFilterProject::new(input.arity()).filter(predicates.iter().cloned());
                let mut plan = match filter_mfp.into_plan() {
                    Ok(plan) => plan,
                    Err(err) => coord_bail!("{:?}", err),
                };
                for scalar in plan.iter_nontemporal_exprs() {
                    prep_scalar_expr(scalar, style)?;
                }
                Ok(())
            }
            _ => node.try_visit_scalars_mut1(&mut |scalar| prep_scalar_expr(scalar, style)),
        }),
        ExprPrepStyle::OneShot { .. }
        | ExprPrepStyle::AsOfUpTo
        | ExprPrepStyle::WebhookValidation { .. } => expr
            .0
            .try_visit_scalars_mut(&mut |scalar| prep_scalar_expr(scalar, style)),
    }
}
/// Prepares the scalar expression `expr` according to `style`:
///
/// - [`ExprPrepStyle::OneShot`]: every unmaterializable call is replaced by its value.
/// - [`ExprPrepStyle::Index`] / [`ExprPrepStyle::AsOfUpTo`]: the expression is left
///   unchanged, but if it contains any unmaterializable call an error naming the last one
///   seen (in post-order) is returned.
/// - [`ExprPrepStyle::WebhookValidation`]: `CurrentTimestamp` calls become the literal
///   `now`.
///
/// # Errors
///
/// Fails as described above, if a call cannot be evaluated, if the recursion limit is hit,
/// or if `now` cannot be converted to a [`Datum`].
pub fn prep_scalar_expr(
    expr: &mut MirScalarExpr,
    style: ExprPrepStyle,
) -> Result<(), AdapterError> {
    match style {
        ExprPrepStyle::OneShot {
            logical_time,
            session,
            catalog_state,
        } => expr.try_visit_mut_post(&mut |node| {
            if let MirScalarExpr::CallUnmaterializable(func) = node {
                *node = eval_unmaterializable_func(catalog_state, func, logical_time, session)?;
            }
            Ok(())
        }),
        ExprPrepStyle::Index | ExprPrepStyle::AsOfUpTo => {
            // Remember the last unmaterializable call visited, if any.
            let mut offending_func = None;
            expr.visit_mut_post(&mut |node| {
                if let MirScalarExpr::CallUnmaterializable(func) = node {
                    offending_func = Some(func.clone());
                }
            })?;
            match (offending_func, style) {
                (None, _) => Ok(()),
                (Some(func), ExprPrepStyle::Index) => {
                    Err(AdapterError::UnmaterializableFunction(func))
                }
                (Some(func), ExprPrepStyle::AsOfUpTo) => Err(AdapterError::UncallableFunction {
                    func,
                    context: "AS OF or UP TO",
                }),
                (Some(_), _) => unreachable!(),
            }
        }
        ExprPrepStyle::WebhookValidation { now } => {
            expr.try_visit_mut_post(&mut |node| {
                if let MirScalarExpr::CallUnmaterializable(
                    func @ UnmaterializableFunc::CurrentTimestamp,
                ) = node
                {
                    let now: Datum = now.try_into()?;
                    *node = MirScalarExpr::literal_ok(now, func.output_type().scalar_type);
                }
                Ok::<_, anyhow::Error>(())
            })?;
            Ok(())
        }
    }
}
/// Evaluates the unmaterializable function `f` for `session` against catalog `state`.
/// The result is returned as a literal expression.
///
/// `logical_time` only affects `mz_now`:
/// - [`EvalTime::Time`] makes it evaluate to that timestamp;
/// - [`EvalTime::Deferred`] returns the call unchanged;
/// - [`EvalTime::NotAvailable`] makes it an error.
///
/// # Errors
///
/// Fails if `mz_now` is called without an available logical time. It also fails if a
/// timestamp (the session's wall time, or the server start time) cannot be converted into
/// a [`Datum`].
fn eval_unmaterializable_func(
    state: &CatalogState,
    f: &UnmaterializableFunc,
    logical_time: EvalTime,
    session: &Session,
) -> Result<MirScalarExpr, AdapterError> {
    // Packs `datums` into a one-dimensional array literal (lower bound 1) of `f`'s
    // output type.
    let pack_1d_array = |datums: Vec<Datum>| {
        let mut row = Row::default();
        row.packer()
            .push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: datums.len(),
                }],
                datums,
            )
            .expect("known to be a valid array");
        Ok(MirScalarExpr::Literal(Ok(row), f.output_type()))
    };
    // Packs key/value pairs into a map literal. The pairs are sorted first (by key, then
    // value), presumably because `push_dict` requires keys in sorted order.
    let pack_dict = |mut datums: Vec<(String, String)>| {
        datums.sort();
        let mut row = Row::default();
        row.packer().push_dict(
            datums
                .iter()
                .map(|(key, value)| (key.as_str(), Datum::from(value.as_str()))),
        );
        Ok(MirScalarExpr::Literal(Ok(row), f.output_type()))
    };
    // Wraps a single datum in a literal of `f`'s output scalar type.
    let pack = |datum| {
        Ok(MirScalarExpr::literal_ok(
            datum,
            f.output_type().scalar_type,
        ))
    };
    match f {
        UnmaterializableFunc::CurrentDatabase => pack(Datum::from(session.vars().database())),
        // The first schema on the session's search path (`None` if the path is empty).
        UnmaterializableFunc::CurrentSchema => {
            let catalog = state.for_session(session);
            let schema = catalog
                .search_path()
                .first()
                .map(|(db, schema)| &*state.get_schema(db, schema, session.conn_id()).name.schema);
            pack(Datum::from(schema))
        }
        // Names of the schemas on the effective search path.
        UnmaterializableFunc::CurrentSchemasWithSystem => {
            let catalog = state.for_session(session);
            let search_path = catalog.effective_search_path(false);
            pack_1d_array(
                search_path
                    .into_iter()
                    .map(|(db, schema)| {
                        let schema = state.get_schema(&db, &schema, session.conn_id());
                        Datum::String(&schema.name.schema)
                    })
                    .collect(),
            )
        }
        // Names of the schemas on the session's (non-effective) search path.
        UnmaterializableFunc::CurrentSchemasWithoutSystem => {
            let catalog = state.for_session(session);
            let search_path = catalog.search_path();
            pack_1d_array(
                search_path
                    .into_iter()
                    .map(|(db, schema)| {
                        let schema = state.get_schema(db, schema, session.conn_id());
                        Datum::String(&schema.name.schema)
                    })
                    .collect(),
            )
        }
        // Map from lower-cased variable name to value, for the variables the session may
        // view.
        UnmaterializableFunc::ViewableVariables => pack_dict(
            viewable_variables(state, session)
                .map(|var| (var.name().to_lowercase(), var.value()))
                .collect(),
        ),
        // The session's wall-clock time.
        UnmaterializableFunc::CurrentTimestamp => {
            let t: Datum = session.pcx().wall_time.try_into()?;
            pack(t)
        }
        UnmaterializableFunc::CurrentUser => pack(Datum::from(
            state.get_role(session.current_role_id()).name(),
        )),
        UnmaterializableFunc::SessionUser => pack(Datum::from(
            state.get_role(session.session_role_id()).name(),
        )),
        UnmaterializableFunc::IsRbacEnabled => pack(Datum::from(
            rbac::is_rbac_enabled_for_session(state.system_config(), session.vars()),
        )),
        UnmaterializableFunc::MzEnvironmentId => {
            pack(Datum::from(&*state.config().environment_id.to_string()))
        }
        UnmaterializableFunc::MzIsSuperuser => pack(Datum::from(session.is_superuser())),
        UnmaterializableFunc::MzNow => match logical_time {
            EvalTime::Time(logical_time) => pack(Datum::MzTimestamp(logical_time)),
            // Leave the call in place.
            EvalTime::Deferred => Ok(MirScalarExpr::CallUnmaterializable(f.clone())),
            EvalTime::NotAvailable => coord_bail!("cannot call mz_now in this context"),
        },
        // Map from each role's OID to an array of the OIDs of all roles it is
        // (transitively) a member of, including itself. OIDs are rendered as strings.
        UnmaterializableFunc::MzRoleOidMemberships => {
            let role_memberships = role_oid_memberships(state);
            let mut role_memberships: Vec<(_, Vec<_>)> = role_memberships
                .into_iter()
                .map(|(role_id, role_membership)| {
                    (
                        role_id.to_string(),
                        role_membership
                            .into_iter()
                            .map(|role_id| role_id.to_string())
                            .collect(),
                    )
                })
                .collect();
            // Dict keys are pushed in order, so sort by their *string* form (numeric order
            // from the BTreeMap differs, e.g. "10" < "9").
            role_memberships.sort();
            let mut row = Row::default();
            row.packer().push_dict_with(|row| {
                for (role_id, role_membership) in &role_memberships {
                    row.push(Datum::from(role_id.as_str()));
                    row.push_array(
                        &[ArrayDimension {
                            lower_bound: 1,
                            length: role_membership.len(),
                        }],
                        role_membership.iter().map(|role_id| Datum::from(role_id.as_str())),
                    ).expect("role_membership is 1 dimensional, and its length is used for the array length");
                }
            });
            Ok(MirScalarExpr::Literal(Ok(row), f.output_type()))
        }
        UnmaterializableFunc::MzSessionId => pack(Datum::from(state.config().session_id)),
        // Time elapsed since `start_instant`; NULL if it does not fit a chrono Duration.
        UnmaterializableFunc::MzUptime => {
            let uptime = state.config().start_instant.elapsed();
            let uptime = chrono::Duration::from_std(uptime).map_or(Datum::Null, Datum::from);
            pack(uptime)
        }
        UnmaterializableFunc::MzVersion => {
            pack(Datum::from(&*state.config().build_info.human_version()))
        }
        UnmaterializableFunc::MzVersionNum => {
            pack(Datum::Int32(state.config().build_info.version_num()))
        }
        // The connection ID, reinterpreted as an i32.
        UnmaterializableFunc::PgBackendPid => pack(Datum::Int32(i32::reinterpret_cast(
            session.conn_id().unhandled(),
        ))),
        UnmaterializableFunc::PgPostmasterStartTime => {
            let t: Datum = state.config().start_time.try_into()?;
            pack(t)
        }
        // A PostgreSQL-style version string that also names the Materialize version.
        UnmaterializableFunc::Version => {
            let build_info = state.config().build_info;
            let version = format!(
                "PostgreSQL {}.{} on {} (Materialize {})",
                SERVER_MAJOR_VERSION,
                SERVER_MINOR_VERSION,
                mz_build_info::TARGET_TRIPLE,
                build_info.version,
            );
            pack(Datum::from(&*version))
        }
    }
}
/// Computes, for every role in `catalog`, the set of OIDs of all roles it is transitively
/// a member of. Each set includes the role's own OID. The result is keyed by the role's
/// OID.
fn role_oid_memberships(catalog: &CatalogState) -> BTreeMap<u32, BTreeSet<u32>> {
    let mut role_memberships = BTreeMap::new();
    for role_id in catalog.get_roles() {
        let role = catalog.get_role(role_id);
        // A role already present was fully computed as an ancestor of an earlier role.
        if !role_memberships.contains_key(&role.oid) {
            role_oid_memberships_inner(catalog, role_id, &mut role_memberships);
        }
    }
    role_memberships
}
/// Computes the transitive membership set of `role_id` into `role_memberships`.
/// Memberships of any parent roles not yet present are computed recursively first.
///
/// # Panics
///
/// Panics if a role referenced by `role_id`'s memberships is missing from `catalog`
/// (via `get_role`), or if an entry expected to have been inserted is absent.
fn role_oid_memberships_inner(
    catalog: &CatalogState,
    role_id: &RoleId,
    role_memberships: &mut BTreeMap<u32, BTreeSet<u32>>,
) {
    let role = catalog.get_role(role_id);
    // Every role is a member of itself. Inserting before recursing also marks the role as
    // visited.
    role_memberships.insert(role.oid, btreeset! {role.oid});
    for parent_role_id in role.membership.map.keys() {
        let parent_role = catalog.get_role(parent_role_id);
        if !role_memberships.contains_key(&parent_role.oid) {
            role_oid_memberships_inner(catalog, parent_role_id, role_memberships);
        }
        // Copy the parent's set so the map can be borrowed mutably for this role below.
        let parent_membership = role_memberships
            .get(&parent_role.oid)
            .expect("inserted in recursive call above")
            .clone();
        role_memberships
            .get_mut(&role.oid)
            .expect("inserted above")
            .extend(parent_membership);
    }
}
#[cfg(test)]
impl Coordinator {
    /// Type-checks that `ship_dataflow` resolves to `()`, i.e. it exposes no error for
    /// callers to handle. The target is compute instance `User(1)`.
    // Presumably never called — it exists to be type-checked — hence `dead_code` is allowed.
    #[allow(dead_code)]
    async fn verify_ship_dataflow_no_error(&mut self, dataflow: DataflowDescription<Plan>) {
        let () = self
            .ship_dataflow(dataflow, ComputeInstanceId::User(1))
            .await;
    }
}