1use std::collections::{BTreeMap, BTreeSet};
18
19use chrono::{DateTime, Utc};
20use maplit::{btreemap, btreeset};
21use tracing::warn;
22
23use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Index, TableDataSource, View};
24use mz_compute_client::controller::error::InstanceMissing;
25use mz_compute_types::ComputeInstanceId;
26use mz_compute_types::dataflows::{DataflowDesc, DataflowDescription, IndexDesc};
27use mz_controller::Controller;
28use mz_expr::visit::Visit;
29use mz_expr::{
30 CollectionPlan, Id, MapFilterProject, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr,
31 RECURSION_LIMIT, UnmaterializableFunc,
32};
33use mz_ore::cast::ReinterpretCast;
34use mz_ore::stack::{CheckedRecursion, RecursionGuard, RecursionLimitError, maybe_grow};
35use mz_repr::adt::array::ArrayDimension;
36use mz_repr::explain::trace_plan;
37use mz_repr::optimize::OptimizerFeatures;
38use mz_repr::role_id::RoleId;
39use mz_repr::{Datum, GlobalId, ReprRelationType, Row};
40use mz_sql::catalog::CatalogRole;
41use mz_sql::rbac;
42use mz_sql::session::metadata::SessionMetadata;
43use mz_transform::analysis::DerivedBuilder;
44use mz_transform::analysis::monotonic::Monotonic;
45
46use crate::catalog::CatalogState;
47use crate::coord::id_bundle::CollectionIdBundle;
48use crate::optimize::{Optimize, OptimizerCatalog, OptimizerConfig, OptimizerError, view};
49use crate::session::{SERVER_MAJOR_VERSION, SERVER_MINOR_VERSION};
50use crate::util::viewable_variables;
51
/// A point-in-time view of the set of collections installed on a compute
/// instance.
#[derive(Debug, Clone)]
pub struct ComputeInstanceSnapshot {
    // The compute instance this snapshot was taken from.
    instance_id: ComputeInstanceId,
    // The collections present on the instance at snapshot time. `None` means
    // the snapshot tracks no collection set; in that case membership checks
    // (`contains_collection`) report every collection as present, and
    // `insert_collection` panics.
    collections: Option<BTreeSet<GlobalId>>,
}
63
64impl ComputeInstanceSnapshot {
65 pub fn new(controller: &Controller, id: ComputeInstanceId) -> Result<Self, InstanceMissing> {
66 controller
67 .compute
68 .collection_ids(id)
69 .map(|collection_ids| Self {
70 instance_id: id,
71 collections: Some(collection_ids.collect()),
72 })
73 }
74
75 pub fn new_from_parts(instance_id: ComputeInstanceId, collections: BTreeSet<GlobalId>) -> Self {
76 Self {
77 instance_id,
78 collections: Some(collections),
79 }
80 }
81
82 pub fn new_without_collections(instance_id: ComputeInstanceId) -> Self {
83 Self {
84 instance_id,
85 collections: None,
86 }
87 }
88
89 pub fn instance_id(&self) -> ComputeInstanceId {
91 self.instance_id
92 }
93
94 pub fn contains_collection(&self, id: &GlobalId) -> bool {
97 self.collections
98 .as_ref()
99 .map_or(true, |collections| collections.contains(id))
100 }
101
102 pub fn insert_collection(&mut self, id: GlobalId) {
104 self.collections
105 .as_mut()
106 .expect("insert_collection called on snapshot with None collections")
107 .insert(id);
108 }
109}
110
/// Builds [`DataflowDesc`]s by importing catalog objects (indexes, sources,
/// tables, views, ...) into them.
#[derive(Debug)]
pub struct DataflowBuilder<'a> {
    /// Catalog used to resolve object IDs to their definitions.
    pub catalog: &'a dyn OptimizerCatalog,
    /// Snapshot of the compute instance the dataflow targets.
    pub compute: ComputeInstanceSnapshot,
    /// ID of an object being replanned, if any; populated from
    /// `OptimizerConfig::replan` by `with_config`.
    pub replan: Option<GlobalId>,
    // Guards the recursive monotonicity analysis (`monotonic_object_inner`)
    // against unbounded recursion; exposed via the `CheckedRecursion` impl.
    recursion_guard: RecursionGuard,
}
132
/// Prepares expressions for rendering by resolving or rejecting
/// unmaterializable function calls (e.g. `current_user()`, `mz_now()`).
/// Different implementations encode different execution contexts; see
/// [`ExprPrepNoop`], [`ExprPrepMaintained`], [`ExprPrepOneShot`], and
/// [`ExprPrepWebhookValidation`].
pub trait ExprPrep {
    /// Prepares all scalar expressions contained in a relation expression.
    fn prep_relation_expr(&self, expr: &mut OptimizedMirRelationExpr)
    -> Result<(), OptimizerError>;

    /// Prepares a single scalar expression.
    fn prep_scalar_expr(&self, expr: &mut MirScalarExpr) -> Result<(), OptimizerError>;
}
142
143pub struct ExprPrepNoop;
145impl ExprPrep for ExprPrepNoop {
146 fn prep_relation_expr(&self, _: &mut OptimizedMirRelationExpr) -> Result<(), OptimizerError> {
147 Ok(())
148 }
149 fn prep_scalar_expr(&self, _expr: &mut MirScalarExpr) -> Result<(), OptimizerError> {
150 Ok(())
151 }
152}
153
/// [`ExprPrep`] for maintained (long-lived) dataflows: unmaterializable
/// function calls are rejected. Within `Filter` predicates, only the
/// non-temporal parts of the lowered MFP are checked, so temporal
/// expressions there are left alone.
pub struct ExprPrepMaintained;

impl ExprPrep for ExprPrepMaintained {
    fn prep_relation_expr(
        &self,
        expr: &mut OptimizedMirRelationExpr,
    ) -> Result<(), OptimizerError> {
        expr.0.try_visit_mut_post(&mut |e| {
            if let MirRelationExpr::Filter { input, predicates } = &*e {
                // Lower the predicates into an MFP so the temporal parts can
                // be separated out; only the non-temporal expressions must be
                // free of unmaterializable calls.
                let mfp = MapFilterProject::new(input.arity()).filter(predicates.iter().cloned());
                match mfp.into_plan() {
                    Err(e) => Err(OptimizerError::UnsupportedTemporalExpression(e)),
                    Ok(mut mfp) => {
                        for s in mfp.iter_nontemporal_exprs() {
                            self.prep_scalar_expr(s)?;
                        }
                        Ok(())
                    }
                }
            } else {
                // Any other operator: every scalar expression is checked.
                e.try_visit_scalars_mut1(&mut |s| self.prep_scalar_expr(s))
            }
        })
    }

    fn prep_scalar_expr(&self, expr: &mut MirScalarExpr) -> Result<(), OptimizerError> {
        // Scan the whole expression and remember the last unmaterializable
        // call seen in post-order; error on it, if any.
        let mut last_observed_unmaterializable_func = None;
        expr.visit_mut_post(&mut |e| {
            if let MirScalarExpr::CallUnmaterializable(f) = e {
                last_observed_unmaterializable_func = Some(f.clone());
            }
        })?;

        if let Some(f) = last_observed_unmaterializable_func {
            Err(OptimizerError::UnmaterializableFunction(f))
        } else {
            Ok(())
        }
    }
}
198
/// [`ExprPrep`] for one-shot dataflows: unmaterializable function calls are
/// replaced by literal values computed from the session and catalog state
/// (see [`eval_unmaterializable_func`]).
pub struct ExprPrepOneShot<'a> {
    /// Controls how `mz_now()` is handled; see [`EvalTime`].
    pub logical_time: EvalTime,
    /// Session supplying values for functions like `current_user()`.
    pub session: &'a dyn SessionMetadata,
    /// Catalog state used to resolve schemas, roles, and configuration.
    pub catalog_state: &'a CatalogState,
}

impl ExprPrep for ExprPrepOneShot<'_> {
    fn prep_relation_expr(
        &self,
        expr: &mut OptimizedMirRelationExpr,
    ) -> Result<(), OptimizerError> {
        expr.0
            .try_visit_scalars_mut(&mut |s| self.prep_scalar_expr(s))
    }

    fn prep_scalar_expr(&self, expr: &mut MirScalarExpr) -> Result<(), OptimizerError> {
        expr.try_visit_mut_post(&mut |e| {
            // Replace each unmaterializable call with its evaluated result.
            // `mz_now()` may be left in place when `logical_time` is
            // `EvalTime::Deferred`.
            if let MirScalarExpr::CallUnmaterializable(f) = e {
                *e = eval_unmaterializable_func(
                    self.catalog_state,
                    f,
                    self.logical_time,
                    self.session,
                )?;
            }
            Ok(())
        })
    }
}
232
233pub struct ExprPrepWebhookValidation {
236 pub now: DateTime<Utc>,
238}
239
240impl ExprPrep for ExprPrepWebhookValidation {
241 fn prep_relation_expr(
242 &self,
243 expr: &mut OptimizedMirRelationExpr,
244 ) -> Result<(), OptimizerError> {
245 expr.0
246 .try_visit_scalars_mut(&mut |s| self.prep_scalar_expr(s))
247 }
248
249 fn prep_scalar_expr(&self, expr: &mut MirScalarExpr) -> Result<(), OptimizerError> {
250 let now = self.now;
251 expr.try_visit_mut_post(&mut |e| {
252 if let MirScalarExpr::CallUnmaterializable(f @ UnmaterializableFunc::CurrentTimestamp) =
253 e
254 {
255 let now: Datum = now.try_into()?;
256 let const_expr = MirScalarExpr::literal_ok(now, f.output_type().scalar_type);
257 *e = const_expr;
258 }
259 Ok(())
260 })
261 }
262}
263
/// How `mz_now()` should be handled when evaluating unmaterializable
/// functions (see the `MzNow` arm of [`eval_unmaterializable_func`]).
#[derive(Clone, Copy, Debug)]
pub enum EvalTime {
    /// Evaluate `mz_now()` to this timestamp.
    Time(mz_repr::Timestamp),
    /// Leave `mz_now()` calls in place, to be evaluated later.
    Deferred,
    /// Calling `mz_now()` is an error in this context.
    NotAvailable,
}
272
273pub fn dataflow_import_id_bundle<P>(
275 dataflow: &DataflowDescription<P>,
276 compute_instance: ComputeInstanceId,
277) -> CollectionIdBundle {
278 let storage_ids = dataflow.source_imports.keys().copied().collect();
279 let compute_ids = dataflow.index_imports.keys().copied().collect();
280 CollectionIdBundle {
281 storage_ids,
282 compute_ids: btreemap! {compute_instance => compute_ids},
283 }
284}
285
impl<'a> DataflowBuilder<'a> {
    /// Creates a builder over `catalog` and `compute`, with no replan target
    /// and the default recursion limit.
    pub fn new(catalog: &'a dyn OptimizerCatalog, compute: ComputeInstanceSnapshot) -> Self {
        Self {
            catalog,
            compute,
            replan: None,
            recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
        }
    }

    /// Applies builder-relevant settings from an [`OptimizerConfig`]
    /// (currently only the replan target).
    pub(super) fn with_config(mut self, config: &OptimizerConfig) -> Self {
        self.replan = config.replan;
        self
    }

    /// Imports the object identified by `id` into `dataflow`.
    ///
    /// If any valid index exists on the object, all such indexes are
    /// imported. Otherwise, the object is imported as a source (tables,
    /// sources, materialized views, logs, continual tasks) or recursively
    /// inlined (views, and materialized views with a replacement target).
    ///
    /// # Panics
    ///
    /// Panics (`unreachable!`) if `id` names an object kind that cannot
    /// appear in a dataflow (sink, index, type, func, secret, connection).
    pub fn import_into_dataflow(
        &mut self,
        id: &GlobalId,
        dataflow: &mut DataflowDesc,
        features: &OptimizerFeatures,
    ) -> Result<(), OptimizerError> {
        // The recursion through `import_view_into_dataflow` can be deep for
        // nested views, so grow the stack as needed.
        maybe_grow(|| {
            // Nothing to do if `id` was already imported.
            if dataflow.is_imported(id) {
                return Ok(());
            }

            let monotonic = self.monotonic_object(*id, features);

            // NOTE(review): `indexes_on` is defined elsewhere in this file;
            // presumably it yields the usable indexes on `id` for this
            // compute instance — confirm against its definition.
            let mut valid_indexes = self.indexes_on(*id).peekable();
            if valid_indexes.peek().is_some() {
                // Prefer reading from indexes over importing the object.
                for (index_id, idx) in valid_indexes {
                    let index_desc = IndexDesc {
                        on_id: *id,
                        key: idx.keys.to_vec(),
                    };
                    let entry = self.catalog.get_entry(id);
                    let desc = entry
                        .relation_desc()
                        .expect("indexes can only be built on items with descs");
                    dataflow.import_index(
                        index_id,
                        index_desc,
                        ReprRelationType::from(desc.typ()),
                        monotonic,
                    );
                }
            } else {
                drop(valid_indexes);
                let entry = self.catalog.get_entry(id);
                match entry.item() {
                    CatalogItem::Table(table) => {
                        dataflow.import_source(*id, table.desc_for(id).into_typ(), monotonic);
                    }
                    CatalogItem::Source(source) => {
                        dataflow.import_source(*id, source.desc.typ().clone(), monotonic);
                    }
                    CatalogItem::View(view) => {
                        // Views have no storage; inline their definition.
                        let expr = view.optimized_expr.as_ref();
                        self.import_view_into_dataflow(id, expr, dataflow, features)?;
                    }
                    CatalogItem::MaterializedView(mview) if mview.replacement_target.is_some() => {
                        // A materialized view with a replacement target is
                        // inlined like a plain view rather than read from its
                        // stored output.
                        let expr = mview.optimized_expr.as_ref();
                        self.import_view_into_dataflow(id, expr, dataflow, features)?;
                    }
                    CatalogItem::MaterializedView(mview) => {
                        dataflow.import_source(*id, mview.desc_for(id).into_typ(), monotonic);
                    }
                    CatalogItem::Log(log) => {
                        dataflow.import_source(*id, log.variant.desc().typ().clone(), monotonic);
                    }
                    CatalogItem::ContinualTask(ct) => {
                        dataflow.import_source(*id, ct.desc.typ().clone(), monotonic);
                    }
                    // None of these can be referenced by a dataflow.
                    CatalogItem::Sink(_)
                    | CatalogItem::Index(_)
                    | CatalogItem::Type(_)
                    | CatalogItem::Func(_)
                    | CatalogItem::Secret(_)
                    | CatalogItem::Connection(_) => {
                        unreachable!()
                    }
                }
            }
            Ok(())
        })
    }

    /// Imports `view` (already optimized) into `dataflow` under `view_id`,
    /// first importing everything the view depends on.
    pub fn import_view_into_dataflow(
        &mut self,
        view_id: &GlobalId,
        view: &OptimizedMirRelationExpr,
        dataflow: &mut DataflowDesc,
        features: &OptimizerFeatures,
    ) -> Result<(), OptimizerError> {
        for get_id in view.depends_on() {
            self.import_into_dataflow(&get_id, dataflow, features)?;
        }
        dataflow.insert_plan(*view_id, view.clone());
        Ok(())
    }

    /// Re-optimizes the plans of catalog views among `df_desc`'s objects to
    /// build, using `config`. A no-op unless the `reoptimize_imported_views`
    /// feature is enabled.
    pub fn maybe_reoptimize_imported_views(
        &self,
        df_desc: &mut DataflowDesc,
        config: &OptimizerConfig,
    ) -> Result<(), OptimizerError> {
        if !config.features.reoptimize_imported_views {
            return Ok(());
        }

        let mut view_optimizer = view::Optimizer::new(config.clone(), None);
        // NOTE(review): iterates `objects_to_build` in reverse — confirm
        // whether the order matters for tracing or dependency reasons.
        for desc in df_desc.objects_to_build.iter_mut().rev() {
            // Transient and explain IDs never name catalog views; skip them.
            if matches!(desc.id, GlobalId::Explain | GlobalId::Transient(_)) {
                continue;
            }
            if let CatalogItem::View(view) = &self.catalog.get_entry(&desc.id).item {
                let _span = tracing::span!(
                    target: "optimizer",
                    tracing::Level::DEBUG,
                    "view",
                    path.segment = desc.id.to_string()
                )
                .entered();

                // Re-run view optimization from the raw (unoptimized)
                // expression and replace the existing plan.
                desc.plan = view_optimizer.optimize(view.raw_expr.as_ref().clone())?;

                trace_plan(desc.plan.as_inner());
            }
        }

        Ok(())
    }

    /// Reports whether the given data source produces monotonic data, based
    /// on the source variant and (where applicable) its data config and
    /// connection.
    fn monotonic_source(&self, data_source: &DataSourceDesc) -> bool {
        match data_source {
            DataSourceDesc::Ingestion { .. } => false,
            DataSourceDesc::OldSyntaxIngestion {
                desc, data_config, ..
            } => data_config.monotonic(&desc.connection),
            DataSourceDesc::Webhook { .. } => true,
            DataSourceDesc::IngestionExport {
                ingestion_id,
                data_config,
                ..
            } => {
                // Monotonicity of an export follows its parent ingestion's
                // connection.
                let source_desc = self
                    .catalog
                    .get_entry_by_item_id(ingestion_id)
                    .source_desc()
                    .expect("ingestion export must reference a source")
                    .expect("ingestion export must reference a source");
                data_config.monotonic(&source_desc.connection)
            }
            DataSourceDesc::Introspection(_)
            | DataSourceDesc::Progress
            | DataSourceDesc::Catalog => false,
        }
    }

    /// Reports whether the object named by `id` is monotonic. On analysis
    /// failure (e.g. recursion limit), logs a warning and conservatively
    /// returns `false`.
    fn monotonic_object(&self, id: GlobalId, features: &OptimizerFeatures) -> bool {
        self.monotonic_object_inner(id, &mut BTreeMap::new(), features)
            .unwrap_or_else(|e| {
                warn!(%id, "error inspecting object for monotonicity: {e}");
                false
            })
    }

    /// Recursive worker for `monotonic_object`, memoizing results in `memo`.
    ///
    /// Views are handled structurally: first the monotonic global IDs the
    /// view reads are collected recursively, then the `Monotonic` analysis is
    /// run over the view's expression with those IDs as inputs.
    fn monotonic_object_inner(
        &self,
        id: GlobalId,
        memo: &mut BTreeMap<GlobalId, bool>,
        features: &OptimizerFeatures,
    ) -> Result<bool, RecursionLimitError> {
        // Reuse a previously computed answer if available.
        if let Some(monotonic) = memo.get(&id) {
            return Ok(*monotonic);
        }

        let monotonic = self.checked_recur(|_| {
            match self.catalog.get_entry(&id).item() {
                CatalogItem::Source(source) => Ok(self.monotonic_source(&source.data_source)),
                CatalogItem::Table(table) => match &table.data_source {
                    TableDataSource::TableWrites { .. } => Ok(false),
                    TableDataSource::DataSource { desc, timeline: _ } => {
                        Ok(self.monotonic_source(desc))
                    }
                },
                CatalogItem::View(View { optimized_expr, .. }) => {
                    let view_expr = optimized_expr.as_ref().clone().into_inner();

                    // Collect the global IDs read by this view that are
                    // themselves monotonic.
                    let mut monotonic_ids = BTreeSet::new();
                    let recursion_result: Result<(), RecursionLimitError> = view_expr
                        .try_visit_post(&mut |e| {
                            if let MirRelationExpr::Get {
                                id: Id::Global(got_id),
                                ..
                            } = e
                            {
                                if self.monotonic_object_inner(*got_id, memo, features)? {
                                    monotonic_ids.insert(*got_id);
                                }
                            }
                            Ok(())
                        });
                    // On failure, continue with the IDs collected so far;
                    // any missing IDs are simply not treated as monotonic.
                    if let Err(error) = recursion_result {
                        warn!(%id, "error inspecting view for monotonicity: {error}");
                    }

                    let mut builder = DerivedBuilder::new(features);
                    builder.require(Monotonic::new(monotonic_ids.clone()));
                    let derived = builder.visit(&view_expr);

                    Ok(*derived
                        .as_view()
                        .value::<Monotonic>()
                        .expect("Expected monotonic result from non empty tree"))
                }
                // An index is monotonic iff the object it indexes is.
                CatalogItem::Index(Index { on, .. }) => {
                    self.monotonic_object_inner(*on, memo, features)
                }
                // Everything else is treated as non-monotonic.
                CatalogItem::Secret(_)
                | CatalogItem::Type(_)
                | CatalogItem::Connection(_)
                | CatalogItem::Log(_)
                | CatalogItem::MaterializedView(_)
                | CatalogItem::Sink(_)
                | CatalogItem::Func(_)
                | CatalogItem::ContinualTask(_) => Ok(false),
            }
        })?;

        memo.insert(id, monotonic);

        Ok(monotonic)
    }
}
567
/// Exposes the builder's recursion guard so its methods can use
/// `CheckedRecursion::checked_recur` (see `monotonic_object_inner`).
impl<'a> CheckedRecursion for DataflowBuilder<'a> {
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.recursion_guard
    }
}
573
/// Evaluates the unmaterializable function `f` against the current catalog
/// and session state, returning a literal expression with the result.
///
/// `MzNow` is the exception: depending on `logical_time` it becomes a
/// timestamp literal (`EvalTime::Time`), is left in place unevaluated
/// (`EvalTime::Deferred`), or is an error (`EvalTime::NotAvailable`).
///
/// # Errors
///
/// Returns an error if a wall-clock time cannot be converted to a `Datum`,
/// or if `f` cannot be called in this context.
fn eval_unmaterializable_func(
    state: &CatalogState,
    f: &UnmaterializableFunc,
    logical_time: EvalTime,
    session: &dyn SessionMetadata,
) -> Result<MirScalarExpr, OptimizerError> {
    // Packs `datums` into a single-row literal holding a 1-dimensional array
    // (1-based lower bound).
    let pack_1d_array = |datums: Vec<Datum>| {
        let mut row = Row::default();
        row.packer()
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: datums.len(),
                }],
                datums,
            )
            .expect("known to be a valid array");
        Ok(MirScalarExpr::literal_from_single_element_row(
            row,
            f.output_type().scalar_type,
        ))
    };
    // Packs key/value pairs into a single-row dict literal, sorted by key
    // for deterministic output.
    let pack_dict = |mut datums: Vec<(String, String)>| {
        datums.sort();
        let mut row = Row::default();
        row.packer().push_dict(
            datums
                .iter()
                .map(|(key, value)| (key.as_str(), Datum::from(value.as_str()))),
        );
        Ok(MirScalarExpr::literal_from_single_element_row(
            row,
            f.output_type().scalar_type,
        ))
    };
    // Wraps a single datum as a literal of `f`'s output type.
    let pack = |datum| {
        Ok(MirScalarExpr::literal_ok(
            datum,
            f.output_type().scalar_type,
        ))
    };

    match f {
        UnmaterializableFunc::CurrentDatabase => pack(Datum::from(session.database())),
        // First schema on the session's search path, if any.
        UnmaterializableFunc::CurrentSchema => {
            let search_path = state.resolve_search_path(session);
            let schema = search_path
                .first()
                .map(|(db, schema)| &*state.get_schema(db, schema, session.conn_id()).name.schema);
            pack(Datum::from(schema))
        }
        UnmaterializableFunc::CurrentSchemasWithSystem => {
            let search_path = state.resolve_search_path(session);
            let search_path = state.effective_search_path(&search_path, false);
            pack_1d_array(
                search_path
                    .into_iter()
                    .map(|(db, schema)| {
                        let schema = state.get_schema(&db, &schema, session.conn_id());
                        Datum::String(&schema.name.schema)
                    })
                    .collect(),
            )
        }
        UnmaterializableFunc::CurrentSchemasWithoutSystem => {
            let search_path = state.resolve_search_path(session);
            pack_1d_array(
                search_path
                    .into_iter()
                    .map(|(db, schema)| {
                        let schema = state.get_schema(&db, &schema, session.conn_id());
                        Datum::String(&schema.name.schema)
                    })
                    .collect(),
            )
        }
        // Dict of session variables the session may view, keyed by
        // lowercased name.
        UnmaterializableFunc::ViewableVariables => pack_dict(
            viewable_variables(state, session)
                .map(|var| (var.name().to_lowercase(), var.value()))
                .collect(),
        ),
        // Wall-clock time of the session's prepared-statement context.
        UnmaterializableFunc::CurrentTimestamp => {
            let t: Datum = session.pcx().wall_time.try_into()?;
            pack(t)
        }
        UnmaterializableFunc::CurrentUser => pack(Datum::from(
            state.get_role(session.current_role_id()).name(),
        )),
        UnmaterializableFunc::SessionUser => pack(Datum::from(
            state.get_role(session.session_role_id()).name(),
        )),
        UnmaterializableFunc::IsRbacEnabled => pack(Datum::from(
            rbac::is_rbac_enabled_for_session(state.system_config(), session),
        )),
        UnmaterializableFunc::MzEnvironmentId => {
            pack(Datum::from(&*state.config().environment_id.to_string()))
        }
        UnmaterializableFunc::MzIsSuperuser => pack(Datum::from(session.is_superuser())),
        // `mz_now()`: behavior depends on the logical evaluation time.
        UnmaterializableFunc::MzNow => match logical_time {
            EvalTime::Time(logical_time) => pack(Datum::MzTimestamp(logical_time)),
            // Leave the call in place for later evaluation.
            EvalTime::Deferred => Ok(MirScalarExpr::CallUnmaterializable(f.clone())),
            EvalTime::NotAvailable => Err(OptimizerError::UncallableFunction {
                func: UnmaterializableFunc::MzNow,
                context: "this",
            }),
        },
        // Dict mapping each role OID (as string) to the sorted array of OIDs
        // of roles it transitively belongs to.
        UnmaterializableFunc::MzRoleOidMemberships => {
            let role_memberships = role_oid_memberships(state);
            let mut role_memberships: Vec<(_, Vec<_>)> = role_memberships
                .into_iter()
                .map(|(role_id, role_membership)| {
                    (
                        role_id.to_string(),
                        role_membership
                            .into_iter()
                            .map(|role_id| role_id.to_string())
                            .collect(),
                    )
                })
                .collect();
            role_memberships.sort();
            let mut row = Row::default();
            row.packer().push_dict_with(|row| {
                for (role_id, role_membership) in &role_memberships {
                    row.push(Datum::from(role_id.as_str()));
                    row.try_push_array(
                        &[ArrayDimension {
                            lower_bound: 1,
                            length: role_membership.len(),
                        }],
                        role_membership.iter().map(|role_id| Datum::from(role_id.as_str())),
                    ).expect("role_membership is 1 dimensional, and its length is used for the array length");
                }
            });
            Ok(MirScalarExpr::literal_from_single_element_row(
                row,
                f.output_type().scalar_type,
            ))
        }
        UnmaterializableFunc::MzSessionId => pack(Datum::from(state.config().session_id)),
        UnmaterializableFunc::MzUptime => {
            let uptime = state.config().start_instant.elapsed();
            // Null if the uptime does not fit in a chrono `Duration`.
            let uptime = chrono::Duration::from_std(uptime).map_or(Datum::Null, Datum::from);
            pack(uptime)
        }
        UnmaterializableFunc::MzVersion => pack(Datum::from(
            &*state
                .config()
                .build_info
                .human_version(state.config().helm_chart_version.clone()),
        )),
        UnmaterializableFunc::MzVersionNum => {
            pack(Datum::Int32(state.config().build_info.version_num()))
        }
        UnmaterializableFunc::PgBackendPid => pack(Datum::Int32(i32::reinterpret_cast(
            session.conn_id().unhandled(),
        ))),
        UnmaterializableFunc::PgPostmasterStartTime => {
            let t: Datum = state.config().start_time.try_into()?;
            pack(t)
        }
        // PostgreSQL-compatible version string.
        UnmaterializableFunc::Version => {
            let build_info = state.config().build_info;
            let version = format!(
                "PostgreSQL {}.{} on {} (Materialize {})",
                SERVER_MAJOR_VERSION,
                SERVER_MINOR_VERSION,
                mz_build_info::TARGET_TRIPLE,
                build_info.version,
            );
            pack(Datum::from(&*version))
        }
    }
}
748
749fn role_oid_memberships<'a>(catalog: &'a CatalogState) -> BTreeMap<u32, BTreeSet<u32>> {
750 let mut role_memberships = BTreeMap::new();
751 for role_id in catalog.get_roles() {
752 let role = catalog.get_role(role_id);
753 if !role_memberships.contains_key(&role.oid) {
754 role_oid_memberships_inner(catalog, role_id, &mut role_memberships);
755 }
756 }
757 role_memberships
758}
759
760fn role_oid_memberships_inner<'a>(
761 catalog: &'a CatalogState,
762 role_id: &RoleId,
763 role_memberships: &mut BTreeMap<u32, BTreeSet<u32>>,
764) {
765 let role = catalog.get_role(role_id);
766 role_memberships.insert(role.oid, btreeset! {role.oid});
767 for parent_role_id in role.membership.map.keys() {
768 let parent_role = catalog.get_role(parent_role_id);
769 if !role_memberships.contains_key(&parent_role.oid) {
770 role_oid_memberships_inner(catalog, parent_role_id, role_memberships);
771 }
772 let parent_membership: BTreeSet<_> = role_memberships
773 .get(&parent_role.oid)
774 .expect("inserted in recursive call above")
775 .into_iter()
776 .cloned()
777 .collect();
778 role_memberships
779 .get_mut(&role.oid)
780 .expect("inserted above")
781 .extend(parent_membership);
782 }
783}