1use std::collections::{BTreeMap, BTreeSet};
18
19use chrono::{DateTime, Utc};
20use maplit::{btreemap, btreeset};
21use tracing::warn;
22
23use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Index, Source, View};
24use mz_compute_client::controller::error::InstanceMissing;
25use mz_compute_types::ComputeInstanceId;
26use mz_compute_types::dataflows::{DataflowDesc, DataflowDescription, IndexDesc};
27use mz_controller::Controller;
28use mz_expr::visit::Visit;
29use mz_expr::{
30 CollectionPlan, Id, MapFilterProject, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr,
31 RECURSION_LIMIT, UnmaterializableFunc,
32};
33use mz_ore::cast::ReinterpretCast;
34use mz_ore::stack::{CheckedRecursion, RecursionGuard, RecursionLimitError, maybe_grow};
35use mz_repr::adt::array::ArrayDimension;
36use mz_repr::explain::trace_plan;
37use mz_repr::optimize::OptimizerFeatures;
38use mz_repr::role_id::RoleId;
39use mz_repr::{Datum, GlobalId, Row};
40use mz_sql::catalog::CatalogRole;
41use mz_sql::rbac;
42use mz_sql::session::metadata::SessionMetadata;
43use mz_transform::analysis::DerivedBuilder;
44use mz_transform::analysis::monotonic::Monotonic;
45
46use crate::catalog::CatalogState;
47use crate::coord::id_bundle::CollectionIdBundle;
48use crate::optimize::{Optimize, OptimizerCatalog, OptimizerConfig, OptimizerError, view};
49use crate::session::{SERVER_MAJOR_VERSION, SERVER_MINOR_VERSION};
50use crate::util::viewable_variables;
51
/// A record of the collections that exist on a single compute instance.
///
/// The snapshot owns its own set of collection IDs, which is filled in by
/// `ComputeInstanceSnapshot::new` and can be extended with `insert_collection`.
#[derive(Debug, Clone)]
pub struct ComputeInstanceSnapshot {
    /// ID of the compute instance this snapshot describes.
    instance_id: ComputeInstanceId,
    /// IDs of the collections recorded for that instance.
    collections: BTreeSet<GlobalId>,
}
59
60impl ComputeInstanceSnapshot {
61 pub fn new(controller: &Controller, id: ComputeInstanceId) -> Result<Self, InstanceMissing> {
62 controller
63 .compute
64 .collection_ids(id)
65 .map(|collection_ids| Self {
66 instance_id: id,
67 collections: collection_ids.collect(),
68 })
69 }
70
71 pub fn instance_id(&self) -> ComputeInstanceId {
73 self.instance_id
74 }
75
76 pub fn contains_collection(&self, id: &GlobalId) -> bool {
78 self.collections.contains(id)
79 }
80
81 pub fn insert_collection(&mut self, id: GlobalId) {
83 self.collections.insert(id);
84 }
85}
86
/// Helper for importing catalog objects into dataflow descriptions.
#[derive(Debug)]
pub struct DataflowBuilder<'a> {
    /// Catalog used to look up the items being imported.
    pub catalog: &'a dyn OptimizerCatalog,
    /// Snapshot of a compute instance, supplied by the caller of `new`.
    pub compute: ComputeInstanceSnapshot,
    /// Copied from `OptimizerConfig::replan` by `with_config`; `None` until then.
    pub replan: Option<GlobalId>,
    /// Guard exposed through the `CheckedRecursion` impl; created with `RECURSION_LIMIT`.
    recursion_guard: RecursionGuard,
}
108
/// Selects how `prep_relation_expr` and `prep_scalar_expr` treat unmaterializable function
/// calls.
#[derive(Clone, Copy, Debug)]
pub enum ExprPrepStyle<'a> {
    /// Any unmaterializable function call is rejected with
    /// `OptimizerError::UnmaterializableFunction`.
    Maintained,
    /// Unmaterializable function calls are evaluated using the fields below.
    OneShot {
        /// Controls how `UnmaterializableFunc::MzNow` is evaluated; see [`EvalTime`].
        logical_time: EvalTime,
        /// Session whose metadata is consulted during evaluation.
        session: &'a dyn SessionMetadata,
        /// Catalog state consulted during evaluation.
        catalog_state: &'a CatalogState,
    },
    /// Any unmaterializable function call is rejected with
    /// `OptimizerError::UncallableFunction` and the context "AS OF or UP TO".
    AsOfUpTo,
    /// Only `UnmaterializableFunc::CurrentTimestamp` calls are rewritten; everything else is
    /// left as it is.
    WebhookValidation {
        /// Value substituted for `CurrentTimestamp` calls.
        now: DateTime<Utc>,
    },
}
130
/// Determines how `UnmaterializableFunc::MzNow` is handled when unmaterializable functions are
/// evaluated for `ExprPrepStyle::OneShot`.
#[derive(Clone, Copy, Debug)]
pub enum EvalTime {
    /// The call is replaced by this timestamp as a literal.
    Time(mz_repr::Timestamp),
    /// The call is left in the expression, unevaluated.
    Deferred,
    /// The call is rejected with `OptimizerError::UncallableFunction`.
    NotAvailable,
}
139
140pub fn dataflow_import_id_bundle<P>(
142 dataflow: &DataflowDescription<P>,
143 compute_instance: ComputeInstanceId,
144) -> CollectionIdBundle {
145 let storage_ids = dataflow.source_imports.keys().copied().collect();
146 let compute_ids = dataflow.index_imports.keys().copied().collect();
147 CollectionIdBundle {
148 storage_ids,
149 compute_ids: btreemap! {compute_instance => compute_ids},
150 }
151}
152
153impl<'a> DataflowBuilder<'a> {
154 pub fn new(catalog: &'a dyn OptimizerCatalog, compute: ComputeInstanceSnapshot) -> Self {
155 Self {
156 catalog,
157 compute,
158 replan: None,
159 recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
160 }
161 }
162
163 pub(super) fn with_config(mut self, config: &OptimizerConfig) -> Self {
168 self.replan = config.replan;
169 self
170 }
171
172 pub fn import_into_dataflow(
176 &mut self,
177 id: &GlobalId,
178 dataflow: &mut DataflowDesc,
179 features: &OptimizerFeatures,
180 ) -> Result<(), OptimizerError> {
181 maybe_grow(|| {
182 if dataflow.is_imported(id) {
184 return Ok(());
185 }
186
187 let monotonic = self.monotonic_object(*id, features);
188
189 let mut valid_indexes = self.indexes_on(*id).peekable();
194 if valid_indexes.peek().is_some() {
195 for (index_id, idx) in valid_indexes {
196 let index_desc = IndexDesc {
197 on_id: *id,
198 key: idx.keys.to_vec(),
199 };
200 let entry = self.catalog.get_entry(id);
201 let desc = entry
202 .desc(
203 &self
204 .catalog
205 .resolve_full_name(entry.name(), entry.conn_id()),
206 )
207 .expect("indexes can only be built on items with descs");
208 dataflow.import_index(index_id, index_desc, desc.typ().clone(), monotonic);
209 }
210 } else {
211 drop(valid_indexes);
212 let entry = self.catalog.get_entry(id);
213 match entry.item() {
214 CatalogItem::Table(table) => {
215 dataflow.import_source(*id, table.desc_for(id).typ().clone(), monotonic);
216 }
217 CatalogItem::Source(source) => {
218 dataflow.import_source(*id, source.desc.typ().clone(), monotonic);
219 }
220 CatalogItem::View(view) => {
221 let expr = view.optimized_expr.as_ref();
222 self.import_view_into_dataflow(id, expr, dataflow, features)?;
223 }
224 CatalogItem::MaterializedView(mview) => {
225 dataflow.import_source(*id, mview.desc.typ().clone(), monotonic);
226 }
227 CatalogItem::Log(log) => {
228 dataflow.import_source(*id, log.variant.desc().typ().clone(), monotonic);
229 }
230 CatalogItem::ContinualTask(ct) => {
231 dataflow.import_source(*id, ct.desc.typ().clone(), monotonic);
232 }
233 _ => unreachable!(),
234 }
235 }
236 Ok(())
237 })
238 }
239
240 pub fn import_view_into_dataflow(
250 &mut self,
251 view_id: &GlobalId,
252 view: &OptimizedMirRelationExpr,
253 dataflow: &mut DataflowDesc,
254 features: &OptimizerFeatures,
255 ) -> Result<(), OptimizerError> {
256 for get_id in view.depends_on() {
257 self.import_into_dataflow(&get_id, dataflow, features)?;
258 }
259 dataflow.insert_plan(*view_id, view.clone());
260 Ok(())
261 }
262
263 pub fn maybe_reoptimize_imported_views(
266 &self,
267 df_desc: &mut DataflowDesc,
268 config: &OptimizerConfig,
269 ) -> Result<(), OptimizerError> {
270 if !config.features.reoptimize_imported_views {
271 return Ok(()); }
273
274 let mut view_optimizer = view::Optimizer::new(config.clone(), None);
275 for desc in df_desc.objects_to_build.iter_mut().rev() {
276 if matches!(desc.id, GlobalId::Explain | GlobalId::Transient(_)) {
277 continue; }
279 if let CatalogItem::View(view) = &self.catalog.get_entry(&desc.id).item {
280 let _span = tracing::span!(
281 target: "optimizer",
282 tracing::Level::DEBUG,
283 "view",
284 path.segment = desc.id.to_string()
285 )
286 .entered();
287
288 desc.plan = view_optimizer.optimize(view.raw_expr.as_ref().clone())?;
290
291 trace_plan(desc.plan.as_inner());
293 }
294 }
295
296 Ok(())
297 }
298
299 fn monotonic_source(&self, source: &Source) -> bool {
301 match &source.data_source {
302 DataSourceDesc::Ingestion { ingestion_desc, .. } => {
303 ingestion_desc
305 .desc
306 .primary_export
307 .monotonic(&ingestion_desc.desc.connection)
308 }
309 DataSourceDesc::Webhook { .. } => true,
310 DataSourceDesc::IngestionExport {
311 ingestion_id,
312 data_config,
313 ..
314 } => {
315 let source_desc = self
316 .catalog
317 .get_entry_by_item_id(ingestion_id)
318 .source_desc()
319 .expect("ingestion export must reference a source")
320 .expect("ingestion export must reference a source");
321 data_config.monotonic(&source_desc.connection)
322 }
323 DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => false,
324 }
325 }
326
327 fn monotonic_object(&self, id: GlobalId, features: &OptimizerFeatures) -> bool {
337 self.monotonic_object_inner(id, &mut BTreeMap::new(), features)
338 .unwrap_or_else(|e| {
339 warn!(%id, "error inspecting object for monotonicity: {e}");
340 false
341 })
342 }
343
/// Recursive worker behind `monotonic_object`.
///
/// `memo` caches the answer for every object already inspected, so each object is analyzed at
/// most once per top-level call. Recursion runs through `checked_recur`.
///
/// # Errors
///
/// Returns a `RecursionLimitError` propagated from `checked_recur` or from a recursive call
/// made for an index's underlying object. Errors hit while walking a view's expression are
/// logged instead of returned.
///
/// # Panics
///
/// Panics if the monotonicity analysis yields no result for a view's expression.
fn monotonic_object_inner(
    &self,
    id: GlobalId,
    memo: &mut BTreeMap<GlobalId, bool>,
    features: &OptimizerFeatures,
) -> Result<bool, RecursionLimitError> {
    // Reuse a previously computed answer.
    if let Some(monotonic) = memo.get(&id) {
        return Ok(*monotonic);
    }

    let monotonic = self.checked_recur(|_| {
        match self.catalog.get_entry(&id).item() {
            CatalogItem::Source(source) => Ok(self.monotonic_source(source)),
            CatalogItem::View(View { optimized_expr, .. }) => {
                let view_expr = optimized_expr.as_ref().clone().into_inner();

                // Collect the global IDs referenced by `Get` nodes of the view that are
                // themselves monotonic.
                let mut monotonic_ids = BTreeSet::new();
                let recursion_result: Result<(), RecursionLimitError> = view_expr
                    .try_visit_post(&mut |e| {
                        if let MirRelationExpr::Get {
                            id: Id::Global(got_id),
                            ..
                        } = e
                        {
                            if self.monotonic_object_inner(*got_id, memo, features)? {
                                monotonic_ids.insert(*got_id);
                            }
                        }
                        Ok(())
                    });
                if let Err(error) = recursion_result {
                    // Only log: the analysis below still runs with whatever IDs were
                    // collected before the error.
                    warn!(%id, "error inspecting view for monotonicity: {error}");
                }

                // Run the monotonicity analysis over the view, seeded with the monotonic
                // dependencies found above.
                let mut builder = DerivedBuilder::new(features);
                builder.require(Monotonic::new(monotonic_ids.clone()));
                let derived = builder.visit(&view_expr);

                Ok(*derived
                    .as_view()
                    .value::<Monotonic>()
                    .expect("Expected monotonic result from non empty tree"))
            }
            // An index is as monotonic as the object it is built on.
            CatalogItem::Index(Index { on, .. }) => {
                self.monotonic_object_inner(*on, memo, features)
            }
            // Every other kind of item is reported as not monotonic.
            CatalogItem::Secret(_)
            | CatalogItem::Type(_)
            | CatalogItem::Connection(_)
            | CatalogItem::Table(_)
            | CatalogItem::Log(_)
            | CatalogItem::MaterializedView(_)
            | CatalogItem::Sink(_)
            | CatalogItem::Func(_)
            | CatalogItem::ContinualTask(_) => Ok(false),
        }
    })?;

    // Only successful results are cached; an error returns early via `?` above.
    memo.insert(id, monotonic);

    Ok(monotonic)
}
412}
413
/// Lets `DataflowBuilder` use `checked_recur`, backed by its own recursion guard.
impl<'a> CheckedRecursion for DataflowBuilder<'a> {
    /// Returns the guard stored in the builder.
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.recursion_guard
    }
}
419
420pub fn prep_relation_expr(
424 expr: &mut OptimizedMirRelationExpr,
425 style: ExprPrepStyle,
426) -> Result<(), OptimizerError> {
427 match style {
428 ExprPrepStyle::Maintained => {
429 expr.0.try_visit_mut_post(&mut |e| {
430 if let MirRelationExpr::Filter { input, predicates } = &*e {
432 let mfp =
433 MapFilterProject::new(input.arity()).filter(predicates.iter().cloned());
434 match mfp.into_plan() {
435 Err(e) => Err(OptimizerError::UnsupportedTemporalExpression(e)),
436 Ok(mut mfp) => {
437 for s in mfp.iter_nontemporal_exprs() {
438 prep_scalar_expr(s, style)?;
439 }
440 Ok(())
441 }
442 }
443 } else {
444 e.try_visit_scalars_mut1(&mut |s| prep_scalar_expr(s, style))
445 }
446 })
447 }
448 ExprPrepStyle::OneShot { .. }
449 | ExprPrepStyle::AsOfUpTo
450 | ExprPrepStyle::WebhookValidation { .. } => expr
451 .0
452 .try_visit_scalars_mut(&mut |s| prep_scalar_expr(s, style)),
453 }
454}
455
456pub fn prep_scalar_expr(
468 expr: &mut MirScalarExpr,
469 style: ExprPrepStyle,
470) -> Result<(), OptimizerError> {
471 match style {
472 ExprPrepStyle::OneShot {
475 logical_time,
476 session,
477 catalog_state,
478 } => expr.try_visit_mut_post(&mut |e| {
479 if let MirScalarExpr::CallUnmaterializable(f) = e {
480 *e = eval_unmaterializable_func(catalog_state, f, logical_time, session)?;
481 }
482 Ok(())
483 }),
484
485 ExprPrepStyle::Maintained | ExprPrepStyle::AsOfUpTo => {
487 let mut last_observed_unmaterializable_func = None;
488 expr.visit_mut_post(&mut |e| {
489 if let MirScalarExpr::CallUnmaterializable(f) = e {
490 last_observed_unmaterializable_func = Some(f.clone());
491 }
492 })?;
493
494 if let Some(f) = last_observed_unmaterializable_func {
495 let err = match style {
496 ExprPrepStyle::Maintained => OptimizerError::UnmaterializableFunction(f),
497 ExprPrepStyle::AsOfUpTo => OptimizerError::UncallableFunction {
498 func: f,
499 context: "AS OF or UP TO",
500 },
501 _ => unreachable!(),
502 };
503 return Err(err);
504 }
505 Ok(())
506 }
507
508 ExprPrepStyle::WebhookValidation { now } => {
509 expr.try_visit_mut_post(&mut |e| {
510 if let MirScalarExpr::CallUnmaterializable(
511 f @ UnmaterializableFunc::CurrentTimestamp,
512 ) = e
513 {
514 let now: Datum = now.try_into()?;
515 let const_expr = MirScalarExpr::literal_ok(now, f.output_type().scalar_type);
516 *e = const_expr;
517 }
518 Ok::<_, anyhow::Error>(())
519 })?;
520 Ok(())
521 }
522 }
523}
524
/// Evaluates the unmaterializable function `f` and returns the expression that should replace
/// the call.
///
/// Values are read from the catalog `state` and from `session`; `logical_time` is consulted
/// only for `UnmaterializableFunc::MzNow`. Every variant produces a literal, except `MzNow`
/// under `EvalTime::Deferred`, which produces the call unchanged.
///
/// # Errors
///
/// Returns `OptimizerError::UncallableFunction` for `MzNow` under `EvalTime::NotAvailable`,
/// and propagates failures of the timestamp-to-datum conversions.
fn eval_unmaterializable_func(
    state: &CatalogState,
    f: &UnmaterializableFunc,
    logical_time: EvalTime,
    session: &dyn SessionMetadata,
) -> Result<MirScalarExpr, OptimizerError> {
    // Packs `datums` into a one-dimensional array (lower bound 1) literal of `f`'s output type.
    let pack_1d_array = |datums: Vec<Datum>| {
        let mut row = Row::default();
        row.packer()
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: datums.len(),
                }],
                datums,
            )
            .expect("known to be a valid array");
        Ok(MirScalarExpr::Literal(Ok(row), f.output_type()))
    };
    // Packs the key/value pairs, sorted, into a dict literal of `f`'s output type.
    // NOTE(review): the sort is presumably required by `push_dict` — confirm.
    let pack_dict = |mut datums: Vec<(String, String)>| {
        datums.sort();
        let mut row = Row::default();
        row.packer().push_dict(
            datums
                .iter()
                .map(|(key, value)| (key.as_str(), Datum::from(value.as_str()))),
        );
        Ok(MirScalarExpr::Literal(Ok(row), f.output_type()))
    };
    // Wraps a single datum as a literal of `f`'s output scalar type.
    let pack = |datum| {
        Ok(MirScalarExpr::literal_ok(
            datum,
            f.output_type().scalar_type,
        ))
    };

    match f {
        UnmaterializableFunc::CurrentDatabase => pack(Datum::from(session.database())),
        UnmaterializableFunc::CurrentSchema => {
            // Name of the first schema on the resolved search path, if there is one.
            let search_path = state.resolve_search_path(session);
            let schema = search_path
                .first()
                .map(|(db, schema)| &*state.get_schema(db, schema, session.conn_id()).name.schema);
            pack(Datum::from(schema))
        }
        UnmaterializableFunc::CurrentSchemasWithSystem => {
            // Unlike the "without system" variant below, this one goes through
            // `effective_search_path` first.
            // NOTE(review): the meaning of the `false` argument is not visible here.
            let search_path = state.resolve_search_path(session);
            let search_path = state.effective_search_path(&search_path, false);
            pack_1d_array(
                search_path
                    .into_iter()
                    .map(|(db, schema)| {
                        let schema = state.get_schema(&db, &schema, session.conn_id());
                        Datum::String(&schema.name.schema)
                    })
                    .collect(),
            )
        }
        UnmaterializableFunc::CurrentSchemasWithoutSystem => {
            let search_path = state.resolve_search_path(session);
            pack_1d_array(
                search_path
                    .into_iter()
                    .map(|(db, schema)| {
                        let schema = state.get_schema(&db, &schema, session.conn_id());
                        Datum::String(&schema.name.schema)
                    })
                    .collect(),
            )
        }
        // Dict of lowercased variable name to value.
        UnmaterializableFunc::ViewableVariables => pack_dict(
            viewable_variables(state, session)
                .map(|var| (var.name().to_lowercase(), var.value()))
                .collect(),
        ),
        UnmaterializableFunc::CurrentTimestamp => {
            // Uses the wall time recorded in the session's `pcx()`.
            let t: Datum = session.pcx().wall_time.try_into()?;
            pack(t)
        }
        UnmaterializableFunc::CurrentUser => pack(Datum::from(
            state.get_role(session.current_role_id()).name(),
        )),
        UnmaterializableFunc::SessionUser => pack(Datum::from(
            state.get_role(session.session_role_id()).name(),
        )),
        UnmaterializableFunc::IsRbacEnabled => pack(Datum::from(
            rbac::is_rbac_enabled_for_session(state.system_config(), session),
        )),
        UnmaterializableFunc::MzEnvironmentId => {
            pack(Datum::from(&*state.config().environment_id.to_string()))
        }
        UnmaterializableFunc::MzIsSuperuser => pack(Datum::from(session.is_superuser())),
        UnmaterializableFunc::MzNow => match logical_time {
            EvalTime::Time(logical_time) => pack(Datum::MzTimestamp(logical_time)),
            // Leave the call in place so it can be evaluated later.
            EvalTime::Deferred => Ok(MirScalarExpr::CallUnmaterializable(f.clone())),
            EvalTime::NotAvailable => Err(OptimizerError::UncallableFunction {
                func: UnmaterializableFunc::MzNow,
                context: "this",
            }),
        },
        UnmaterializableFunc::MzRoleOidMemberships => {
            // Dict from role OID (as a string) to an array of OID strings, built from the
            // sets computed by `role_oid_memberships`.
            let role_memberships = role_oid_memberships(state);
            let mut role_memberships: Vec<(_, Vec<_>)> = role_memberships
                .into_iter()
                .map(|(role_id, role_membership)| {
                    (
                        role_id.to_string(),
                        role_membership
                            .into_iter()
                            .map(|role_id| role_id.to_string())
                            .collect(),
                    )
                })
                .collect();
            // Re-sort after stringification: string order differs from numeric order.
            role_memberships.sort();
            let mut row = Row::default();
            row.packer().push_dict_with(|row| {
                for (role_id, role_membership) in &role_memberships {
                    row.push(Datum::from(role_id.as_str()));
                    row.try_push_array(
                        &[ArrayDimension {
                            lower_bound: 1,
                            length: role_membership.len(),
                        }],
                        role_membership.iter().map(|role_id| Datum::from(role_id.as_str())),
                    ).expect("role_membership is 1 dimensional, and its length is used for the array length");
                }
            });
            Ok(MirScalarExpr::Literal(Ok(row), f.output_type()))
        }
        UnmaterializableFunc::MzSessionId => pack(Datum::from(state.config().session_id)),
        UnmaterializableFunc::MzUptime => {
            // Falls back to NULL if the elapsed time cannot be converted to a
            // `chrono::Duration`.
            let uptime = state.config().start_instant.elapsed();
            let uptime = chrono::Duration::from_std(uptime).map_or(Datum::Null, Datum::from);
            pack(uptime)
        }
        UnmaterializableFunc::MzVersion => pack(Datum::from(
            &*state
                .config()
                .build_info
                .human_version(state.config().helm_chart_version.clone()),
        )),
        UnmaterializableFunc::MzVersionNum => {
            pack(Datum::Int32(state.config().build_info.version_num()))
        }
        // The connection ID is reinterpreted as an `i32` rather than range-checked.
        UnmaterializableFunc::PgBackendPid => pack(Datum::Int32(i32::reinterpret_cast(
            session.conn_id().unhandled(),
        ))),
        UnmaterializableFunc::PgPostmasterStartTime => {
            let t: Datum = state.config().start_time.try_into()?;
            pack(t)
        }
        UnmaterializableFunc::Version => {
            let build_info = state.config().build_info;
            let version = format!(
                "PostgreSQL {}.{} on {} (Materialize {})",
                SERVER_MAJOR_VERSION,
                SERVER_MINOR_VERSION,
                mz_build_info::TARGET_TRIPLE,
                build_info.version,
            );
            pack(Datum::from(&*version))
        }
    }
}
690
691fn role_oid_memberships<'a>(catalog: &'a CatalogState) -> BTreeMap<u32, BTreeSet<u32>> {
692 let mut role_memberships = BTreeMap::new();
693 for role_id in catalog.get_roles() {
694 let role = catalog.get_role(role_id);
695 if !role_memberships.contains_key(&role.oid) {
696 role_oid_memberships_inner(catalog, role_id, &mut role_memberships);
697 }
698 }
699 role_memberships
700}
701
702fn role_oid_memberships_inner<'a>(
703 catalog: &'a CatalogState,
704 role_id: &RoleId,
705 role_memberships: &mut BTreeMap<u32, BTreeSet<u32>>,
706) {
707 let role = catalog.get_role(role_id);
708 role_memberships.insert(role.oid, btreeset! {role.oid});
709 for parent_role_id in role.membership.map.keys() {
710 let parent_role = catalog.get_role(parent_role_id);
711 if !role_memberships.contains_key(&parent_role.oid) {
712 role_oid_memberships_inner(catalog, parent_role_id, role_memberships);
713 }
714 let parent_membership: BTreeSet<_> = role_memberships
715 .get(&parent_role.oid)
716 .expect("inserted in recursive call above")
717 .into_iter()
718 .cloned()
719 .collect();
720 role_memberships
721 .get_mut(&role.oid)
722 .expect("inserted above")
723 .extend(parent_membership);
724 }
725}