use std::collections::{BTreeMap, BTreeSet};

use chrono::{DateTime, Utc};
use maplit::{btreemap, btreeset};
use tracing::warn;

use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Index, Source, View};
use mz_compute_client::controller::error::InstanceMissing;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::{DataflowDesc, DataflowDescription, IndexDesc};
use mz_controller::Controller;
use mz_expr::visit::Visit;
use mz_expr::{
    CollectionPlan, Id, MapFilterProject, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr,
    RECURSION_LIMIT, UnmaterializableFunc,
};
use mz_ore::cast::ReinterpretCast;
use mz_ore::stack::{CheckedRecursion, RecursionGuard, RecursionLimitError, maybe_grow};
use mz_repr::adt::array::ArrayDimension;
use mz_repr::explain::trace_plan;
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::role_id::RoleId;
use mz_repr::{Datum, GlobalId, Row};
use mz_sql::catalog::CatalogRole;
use mz_sql::rbac;
use mz_sql::session::metadata::SessionMetadata;
use mz_transform::analysis::DerivedBuilder;
use mz_transform::analysis::monotonic::Monotonic;

use crate::catalog::CatalogState;
use crate::coord::id_bundle::CollectionIdBundle;
use crate::optimize::{Optimize, OptimizerCatalog, OptimizerConfig, OptimizerError, view};
use crate::session::{SERVER_MAJOR_VERSION, SERVER_MINOR_VERSION};
use crate::util::viewable_variables;

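/// A snapshot of the collections installed on a particular compute instance.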
#[derive(Debug, Clone)]
pub struct ComputeInstanceSnapshot {
    instance_id: ComputeInstanceId,
    collections: BTreeSet<GlobalId>,
}

impl ComputeInstanceSnapshot {
    pub fn new(controller: &Controller, id: ComputeInstanceId) -> Result<Self, InstanceMissing> {
        controller
            .compute
            .collection_ids(id)
            .map(|collection_ids| Self {
                instance_id: id,
                collections: collection_ids.collect(),
            })
    }

    pub fn instance_id(&self) -> ComputeInstanceId {
        self.instance_id
    }

    pub fn contains_collection(&self, id: &GlobalId) -> bool {
        self.collections.contains(id)
    }

    pub fn insert_collection(&mut self, id: GlobalId) {
        self.collections.insert(id);
    }
}

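/// Borrows the optimizer catalog and a compute instance snapshot to assemble
/// `DataflowDesc`s by importing catalog items (indexes, sources, tables,
/// views, and so on).
///
/// A rough usage sketch (the `controller`, `catalog`, `dataflow`, and
/// `features` values below are hypothetical):
///
/// ```ignore
/// let compute = ComputeInstanceSnapshot::new(&controller, instance_id)?;
/// let mut builder = DataflowBuilder::new(catalog, compute);
/// builder.import_into_dataflow(&object_id, &mut dataflow, &features)?;
/// ```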
#[derive(Debug)]
pub struct DataflowBuilder<'a> {
    pub catalog: &'a dyn OptimizerCatalog,
    pub compute: ComputeInstanceSnapshot,
    pub replan: Option<GlobalId>,
    recursion_guard: RecursionGuard,
}

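/// The style in which an expression is being prepared, which determines how
/// calls to unmaterializable functions are handled (see [`prep_scalar_expr`]).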
#[derive(Clone, Copy, Debug)]
pub enum ExprPrepStyle<'a> {
    Index,
    OneShot {
        logical_time: EvalTime,
        session: &'a dyn SessionMetadata,
        catalog_state: &'a CatalogState,
    },
    AsOfUpTo,
    WebhookValidation {
        now: DateTime<Utc>,
    },
}

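/// The logical time at which `mz_now()` is evaluated for a one-shot query:
/// either a concrete timestamp, deferred to a later phase, or not available.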
#[derive(Clone, Copy, Debug)]
pub enum EvalTime {
    Time(mz_repr::Timestamp),
    Deferred,
    NotAvailable,
}

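/// Returns the IDs of all storage and compute imports of `dataflow`, bundled
/// as a [`CollectionIdBundle`] with the compute IDs attributed to
/// `compute_instance`.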
pub fn dataflow_import_id_bundle<P>(
    dataflow: &DataflowDescription<P>,
    compute_instance: ComputeInstanceId,
) -> CollectionIdBundle {
    let storage_ids = dataflow.source_imports.keys().copied().collect();
    let compute_ids = dataflow.index_imports.keys().copied().collect();
    CollectionIdBundle {
        storage_ids,
        compute_ids: btreemap! {compute_instance => compute_ids},
    }
}

impl<'a> DataflowBuilder<'a> {
    pub fn new(catalog: &'a dyn OptimizerCatalog, compute: ComputeInstanceSnapshot) -> Self {
        Self {
            catalog,
            compute,
            replan: None,
            recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
        }
    }

    pub(super) fn with_config(mut self, config: &OptimizerConfig) -> Self {
        self.replan = config.replan;
        self
    }

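    /// Imports the object with `id` into the provided dataflow description,
    /// using any available indexes on the object, or otherwise the object
    /// itself (recursively importing a view's dependencies).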
    pub fn import_into_dataflow(
        &mut self,
        id: &GlobalId,
        dataflow: &mut DataflowDesc,
        features: &OptimizerFeatures,
    ) -> Result<(), OptimizerError> {
        maybe_grow(|| {
            // Avoid importing the same collection twice.
            if dataflow.is_imported(id) {
                return Ok(());
            }

            let monotonic = self.monotonic_object(*id, features);

            // Use existing indexes on the object if any are available;
            // otherwise import the object (or its view definition) directly.
            let mut valid_indexes = self.indexes_on(*id).peekable();
            if valid_indexes.peek().is_some() {
                for (index_id, idx) in valid_indexes {
                    let index_desc = IndexDesc {
                        on_id: *id,
                        key: idx.keys.to_vec(),
                    };
                    let entry = self.catalog.get_entry(id);
                    let desc = entry
                        .desc(
                            &self
                                .catalog
                                .resolve_full_name(entry.name(), entry.conn_id()),
                        )
                        .expect("indexes can only be built on items with descs");
                    dataflow.import_index(index_id, index_desc, desc.typ().clone(), monotonic);
                }
            } else {
                drop(valid_indexes);
                let entry = self.catalog.get_entry(id);
                match entry.item() {
                    CatalogItem::Table(table) => {
                        dataflow.import_source(*id, table.desc_for(id).typ().clone(), monotonic);
                    }
                    CatalogItem::Source(source) => {
                        dataflow.import_source(*id, source.desc.typ().clone(), monotonic);
                    }
                    CatalogItem::View(view) => {
                        let expr = view.optimized_expr.as_ref();
                        self.import_view_into_dataflow(id, expr, dataflow, features)?;
                    }
                    CatalogItem::MaterializedView(mview) => {
                        dataflow.import_source(*id, mview.desc.typ().clone(), monotonic);
                    }
                    CatalogItem::Log(log) => {
                        dataflow.import_source(*id, log.variant.desc().typ().clone(), monotonic);
                    }
                    CatalogItem::ContinualTask(ct) => {
                        dataflow.import_source(*id, ct.desc.typ().clone(), monotonic);
                    }
                    _ => unreachable!(),
                }
            }
            Ok(())
        })
    }

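    /// Imports the view with ID `view_id` and optimized expression `view`
    /// into `dataflow`, after first importing everything the view depends on.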
    pub fn import_view_into_dataflow(
        &mut self,
        view_id: &GlobalId,
        view: &OptimizedMirRelationExpr,
        dataflow: &mut DataflowDesc,
        features: &OptimizerFeatures,
    ) -> Result<(), OptimizerError> {
        for get_id in view.depends_on() {
            self.import_into_dataflow(&get_id, dataflow, features)?;
        }
        dataflow.insert_plan(*view_id, view.clone());
        Ok(())
    }

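    /// Re-optimizes the views imported into `df_desc` from their raw
    /// expressions, if the `reoptimize_imported_views` feature is enabled.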
    pub fn maybe_reoptimize_imported_views(
        &self,
        df_desc: &mut DataflowDesc,
        config: &OptimizerConfig,
    ) -> Result<(), OptimizerError> {
        if !config.features.reoptimize_imported_views {
            return Ok(());
        }

        let mut view_optimizer = view::Optimizer::new(config.clone(), None);
        for desc in df_desc.objects_to_build.iter_mut().rev() {
            if matches!(desc.id, GlobalId::Explain | GlobalId::Transient(_)) {
                continue; // Skip IDs that are not backed by a catalog item.
            }
            if let CatalogItem::View(view) = &self.catalog.get_entry(&desc.id).item {
                let _span = tracing::span!(
                    target: "optimizer",
                    tracing::Level::DEBUG,
                    "view",
                    path.segment = desc.id.to_string()
                )
                .entered();

                desc.plan = view_optimizer.optimize(view.raw_expr.as_ref().clone())?;

                trace_plan(desc.plan.as_inner());
            }
        }

        Ok(())
    }

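    /// Determines whether the given source yields monotonic (append-only)
    /// updates, based on its data source and connection.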
    fn monotonic_source(&self, source: &Source) -> bool {
        match &source.data_source {
            DataSourceDesc::Ingestion { ingestion_desc, .. } => {
                ingestion_desc
                    .desc
                    .primary_export
                    .monotonic(&ingestion_desc.desc.connection)
            }
            DataSourceDesc::Webhook { .. } => true,
            DataSourceDesc::IngestionExport {
                ingestion_id,
                data_config,
                ..
            } => {
                let source_desc = self
                    .catalog
                    .get_entry_by_item_id(ingestion_id)
                    .source_desc()
                    .expect("ingestion export must reference a source")
                    .expect("ingestion export must reference a source");
                data_config.monotonic(&source_desc.connection)
            }
            DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => false,
        }
    }

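    /// Determines whether the object with the given ID is monotonic. Errors
    /// during inspection are logged and treated as "not monotonic".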
    fn monotonic_object(&self, id: GlobalId, features: &OptimizerFeatures) -> bool {
        self.monotonic_object_inner(id, &mut BTreeMap::new(), features)
            .unwrap_or_else(|e| {
                warn!(%id, "error inspecting object for monotonicity: {e}");
                false
            })
    }

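    /// Recursive helper for [`DataflowBuilder::monotonic_object`] that
    /// memoizes results in `memo` and guards against unbounded recursion.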
    fn monotonic_object_inner(
        &self,
        id: GlobalId,
        memo: &mut BTreeMap<GlobalId, bool>,
        features: &OptimizerFeatures,
    ) -> Result<bool, RecursionLimitError> {
        // Return memoized results where available.
        if let Some(monotonic) = memo.get(&id) {
            return Ok(*monotonic);
        }

        let monotonic = self.checked_recur(|_| {
            match self.catalog.get_entry(&id).item() {
                CatalogItem::Source(source) => Ok(self.monotonic_source(source)),
                CatalogItem::View(View { optimized_expr, .. }) => {
                    let view_expr = optimized_expr.as_ref().clone().into_inner();

                    // Collect the global IDs of dependencies that are themselves monotonic.
                    let mut monotonic_ids = BTreeSet::new();
                    let recursion_result: Result<(), RecursionLimitError> = view_expr
                        .try_visit_post(&mut |e| {
                            if let MirRelationExpr::Get {
                                id: Id::Global(got_id),
                                ..
                            } = e
                            {
                                if self.monotonic_object_inner(*got_id, memo, features)? {
                                    monotonic_ids.insert(*got_id);
                                }
                            }
                            Ok(())
                        });
                    if let Err(error) = recursion_result {
                        warn!(%id, "error inspecting view for monotonicity: {error}");
                    }

                    // Run the `Monotonic` analysis over the view expression,
                    // seeded with the monotonic dependencies collected above.
                    let mut builder = DerivedBuilder::new(features);
                    builder.require(Monotonic::new(monotonic_ids.clone()));
                    let derived = builder.visit(&view_expr);

                    Ok(*derived
                        .as_view()
                        .value::<Monotonic>()
                        .expect("Expected monotonic result from non empty tree"))
                }
                CatalogItem::Index(Index { on, .. }) => {
                    self.monotonic_object_inner(*on, memo, features)
                }
                CatalogItem::Secret(_)
                | CatalogItem::Type(_)
                | CatalogItem::Connection(_)
                | CatalogItem::Table(_)
                | CatalogItem::Log(_)
                | CatalogItem::MaterializedView(_)
                | CatalogItem::Sink(_)
                | CatalogItem::Func(_)
                | CatalogItem::ContinualTask(_) => Ok(false),
            }
        })?;

        memo.insert(id, monotonic);

        Ok(monotonic)
    }
}

impl<'a> CheckedRecursion for DataflowBuilder<'a> {
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.recursion_guard
    }
}

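/// Prepares a relation expression for evaluation in the given style by running
/// [`prep_scalar_expr`] over its scalar subexpressions. In the `Index` style,
/// only the non-temporal parts of filter predicates are prepared.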
pub fn prep_relation_expr(
    expr: &mut OptimizedMirRelationExpr,
    style: ExprPrepStyle,
) -> Result<(), OptimizerError> {
    match style {
        ExprPrepStyle::Index => {
            expr.0.try_visit_mut_post(&mut |e| {
                // Carefully handle filter expressions, which may contain temporal predicates.
                if let MirRelationExpr::Filter { input, predicates } = &*e {
                    let mfp =
                        MapFilterProject::new(input.arity()).filter(predicates.iter().cloned());
                    match mfp.into_plan() {
                        Err(e) => Err(OptimizerError::Internal(e)),
                        Ok(mut mfp) => {
                            for s in mfp.iter_nontemporal_exprs() {
                                prep_scalar_expr(s, style)?;
                            }
                            Ok(())
                        }
                    }
                } else {
                    e.try_visit_scalars_mut1(&mut |s| prep_scalar_expr(s, style))
                }
            })
        }
        ExprPrepStyle::OneShot { .. }
        | ExprPrepStyle::AsOfUpTo
        | ExprPrepStyle::WebhookValidation { .. } => expr
            .0
            .try_visit_scalars_mut(&mut |s| prep_scalar_expr(s, style)),
    }
}

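/// Prepares a scalar expression for evaluation in the given style.
///
/// For one-shot evaluation, calls to unmaterializable functions are replaced
/// with their current values; for indexes and `AS OF`/`UP TO`, any such call
/// is an error; for webhook validation, only `current_timestamp()` is replaced.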
pub fn prep_scalar_expr(
    expr: &mut MirScalarExpr,
    style: ExprPrepStyle,
) -> Result<(), OptimizerError> {
    match style {
        // Evaluate each unmaterializable function and replace the invocation
        // with the resulting literal.
        ExprPrepStyle::OneShot {
            logical_time,
            session,
            catalog_state,
        } => expr.try_visit_mut_post(&mut |e| {
            if let MirScalarExpr::CallUnmaterializable(f) = e {
                *e = eval_unmaterializable_func(catalog_state, f, logical_time, session)?;
            }
            Ok(())
        }),

        // Reject the expression if it contains any unmaterializable function calls.
        ExprPrepStyle::Index | ExprPrepStyle::AsOfUpTo => {
            let mut last_observed_unmaterializable_func = None;
            expr.visit_mut_post(&mut |e| {
                if let MirScalarExpr::CallUnmaterializable(f) = e {
                    last_observed_unmaterializable_func = Some(f.clone());
                }
            })?;

            if let Some(f) = last_observed_unmaterializable_func {
                let err = match style {
                    ExprPrepStyle::Index => OptimizerError::UnmaterializableFunction(f),
                    ExprPrepStyle::AsOfUpTo => OptimizerError::UncallableFunction {
                        func: f,
                        context: "AS OF or UP TO",
                    },
                    _ => unreachable!(),
                };
                return Err(err);
            }
            Ok(())
        }

        ExprPrepStyle::WebhookValidation { now } => {
            expr.try_visit_mut_post(&mut |e| {
                // Replace `current_timestamp()` with the provided wall-clock time.
                if let MirScalarExpr::CallUnmaterializable(
                    f @ UnmaterializableFunc::CurrentTimestamp,
                ) = e
                {
                    let now: Datum = now.try_into()?;
                    let const_expr = MirScalarExpr::literal_ok(now, f.output_type().scalar_type);
                    *e = const_expr;
                }
                Ok::<_, anyhow::Error>(())
            })?;
            Ok(())
        }
    }
}

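/// Evaluates the unmaterializable function `f` against the catalog state,
/// session, and logical time, producing a literal expression (or, for
/// `mz_now()` with [`EvalTime::Deferred`], leaving the call in place).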
fn eval_unmaterializable_func(
    state: &CatalogState,
    f: &UnmaterializableFunc,
    logical_time: EvalTime,
    session: &dyn SessionMetadata,
) -> Result<MirScalarExpr, OptimizerError> {
    let pack_1d_array = |datums: Vec<Datum>| {
        let mut row = Row::default();
        row.packer()
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: datums.len(),
                }],
                datums,
            )
            .expect("known to be a valid array");
        Ok(MirScalarExpr::Literal(Ok(row), f.output_type()))
    };
    let pack_dict = |mut datums: Vec<(String, String)>| {
        datums.sort();
        let mut row = Row::default();
        row.packer().push_dict(
            datums
                .iter()
                .map(|(key, value)| (key.as_str(), Datum::from(value.as_str()))),
        );
        Ok(MirScalarExpr::Literal(Ok(row), f.output_type()))
    };
    let pack = |datum| {
        Ok(MirScalarExpr::literal_ok(
            datum,
            f.output_type().scalar_type,
        ))
    };

    match f {
        UnmaterializableFunc::CurrentDatabase => pack(Datum::from(session.database())),
        UnmaterializableFunc::CurrentSchema => {
            let search_path = state.resolve_search_path(session);
            let schema = search_path
                .first()
                .map(|(db, schema)| &*state.get_schema(db, schema, session.conn_id()).name.schema);
            pack(Datum::from(schema))
        }
        UnmaterializableFunc::CurrentSchemasWithSystem => {
            let search_path = state.resolve_search_path(session);
            let search_path = state.effective_search_path(&search_path, false);
            pack_1d_array(
                search_path
                    .into_iter()
                    .map(|(db, schema)| {
                        let schema = state.get_schema(&db, &schema, session.conn_id());
                        Datum::String(&schema.name.schema)
                    })
                    .collect(),
            )
        }
        UnmaterializableFunc::CurrentSchemasWithoutSystem => {
            let search_path = state.resolve_search_path(session);
            pack_1d_array(
                search_path
                    .into_iter()
                    .map(|(db, schema)| {
                        let schema = state.get_schema(&db, &schema, session.conn_id());
                        Datum::String(&schema.name.schema)
                    })
                    .collect(),
            )
        }
        UnmaterializableFunc::ViewableVariables => pack_dict(
            viewable_variables(state, session)
                .map(|var| (var.name().to_lowercase(), var.value()))
                .collect(),
        ),
        UnmaterializableFunc::CurrentTimestamp => {
            let t: Datum = session.pcx().wall_time.try_into()?;
            pack(t)
        }
        UnmaterializableFunc::CurrentUser => pack(Datum::from(
            state.get_role(session.current_role_id()).name(),
        )),
        UnmaterializableFunc::SessionUser => pack(Datum::from(
            state.get_role(session.session_role_id()).name(),
        )),
        UnmaterializableFunc::IsRbacEnabled => pack(Datum::from(
            rbac::is_rbac_enabled_for_session(state.system_config(), session),
        )),
        UnmaterializableFunc::MzEnvironmentId => {
            pack(Datum::from(&*state.config().environment_id.to_string()))
        }
        UnmaterializableFunc::MzIsSuperuser => pack(Datum::from(session.is_superuser())),
        UnmaterializableFunc::MzNow => match logical_time {
            EvalTime::Time(logical_time) => pack(Datum::MzTimestamp(logical_time)),
            EvalTime::Deferred => Ok(MirScalarExpr::CallUnmaterializable(f.clone())),
            EvalTime::NotAvailable => Err(OptimizerError::UncallableFunction {
                func: UnmaterializableFunc::MzNow,
                context: "this",
            }),
        },
        UnmaterializableFunc::MzRoleOidMemberships => {
            let role_memberships = role_oid_memberships(state);
            let mut role_memberships: Vec<(_, Vec<_>)> = role_memberships
                .into_iter()
                .map(|(role_id, role_membership)| {
                    (
                        role_id.to_string(),
                        role_membership
                            .into_iter()
                            .map(|role_id| role_id.to_string())
                            .collect(),
                    )
                })
                .collect();
            role_memberships.sort();
            let mut row = Row::default();
            row.packer().push_dict_with(|row| {
                for (role_id, role_membership) in &role_memberships {
                    row.push(Datum::from(role_id.as_str()));
                    row.try_push_array(
                        &[ArrayDimension {
                            lower_bound: 1,
                            length: role_membership.len(),
                        }],
                        role_membership.iter().map(|role_id| Datum::from(role_id.as_str())),
                    ).expect("role_membership is 1 dimensional, and its length is used for the array length");
                }
            });
            Ok(MirScalarExpr::Literal(Ok(row), f.output_type()))
        }
        UnmaterializableFunc::MzSessionId => pack(Datum::from(state.config().session_id)),
        UnmaterializableFunc::MzUptime => {
            let uptime = state.config().start_instant.elapsed();
            let uptime = chrono::Duration::from_std(uptime).map_or(Datum::Null, Datum::from);
            pack(uptime)
        }
        UnmaterializableFunc::MzVersion => pack(Datum::from(
            &*state
                .config()
                .build_info
                .human_version(state.config().helm_chart_version.clone()),
        )),
        UnmaterializableFunc::MzVersionNum => {
            pack(Datum::Int32(state.config().build_info.version_num()))
        }
        UnmaterializableFunc::PgBackendPid => pack(Datum::Int32(i32::reinterpret_cast(
            session.conn_id().unhandled(),
        ))),
        UnmaterializableFunc::PgPostmasterStartTime => {
            let t: Datum = state.config().start_time.try_into()?;
            pack(t)
        }
        UnmaterializableFunc::Version => {
            let build_info = state.config().build_info;
            let version = format!(
                "PostgreSQL {}.{} on {} (Materialize {})",
                SERVER_MAJOR_VERSION,
                SERVER_MINOR_VERSION,
                mz_build_info::TARGET_TRIPLE,
                build_info.version,
            );
            pack(Datum::from(&*version))
        }
    }
}

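/// Returns a map from each role's OID to the OIDs of all roles it transitively
/// belongs to (including itself).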
fn role_oid_memberships<'a>(catalog: &'a CatalogState) -> BTreeMap<u32, BTreeSet<u32>> {
    let mut role_memberships = BTreeMap::new();
    for role_id in catalog.get_roles() {
        let role = catalog.get_role(role_id);
        if !role_memberships.contains_key(&role.oid) {
            role_oid_memberships_inner(catalog, role_id, &mut role_memberships);
        }
    }
    role_memberships
}

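/// Recursive helper for [`role_oid_memberships`] that records the membership
/// set of `role_id`: the role itself plus the memberships of its parents.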
fn role_oid_memberships_inner<'a>(
    catalog: &'a CatalogState,
    role_id: &RoleId,
    role_memberships: &mut BTreeMap<u32, BTreeSet<u32>>,
) {
    let role = catalog.get_role(role_id);
    role_memberships.insert(role.oid, btreeset! {role.oid});
    for parent_role_id in role.membership.map.keys() {
        let parent_role = catalog.get_role(parent_role_id);
        if !role_memberships.contains_key(&parent_role.oid) {
            role_oid_memberships_inner(catalog, parent_role_id, role_memberships);
        }
        let parent_membership: BTreeSet<_> = role_memberships
            .get(&parent_role.oid)
            .expect("inserted in recursive call above")
            .into_iter()
            .cloned()
            .collect();
        role_memberships
            .get_mut(&role.oid)
            .expect("inserted above")
            .extend(parent_membership);
    }
}