use std::collections::{BTreeMap, BTreeSet};

use chrono::{DateTime, Utc};
use maplit::{btreemap, btreeset};
use tracing::warn;

use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Index, TableDataSource, View};
use mz_compute_client::controller::error::InstanceMissing;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::{DataflowDesc, DataflowDescription, IndexDesc};
use mz_controller::Controller;
use mz_expr::visit::Visit;
use mz_expr::{
    CollectionPlan, Id, MapFilterProject, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr,
    RECURSION_LIMIT, UnmaterializableFunc,
};
use mz_ore::cast::ReinterpretCast;
use mz_ore::stack::{CheckedRecursion, RecursionGuard, RecursionLimitError, maybe_grow};
use mz_repr::adt::array::ArrayDimension;
use mz_repr::explain::trace_plan;
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::role_id::RoleId;
use mz_repr::{Datum, GlobalId, Row};
use mz_sql::catalog::CatalogRole;
use mz_sql::rbac;
use mz_sql::session::metadata::SessionMetadata;
use mz_transform::analysis::DerivedBuilder;
use mz_transform::analysis::monotonic::Monotonic;

use crate::catalog::CatalogState;
use crate::coord::id_bundle::CollectionIdBundle;
use crate::optimize::{Optimize, OptimizerCatalog, OptimizerConfig, OptimizerError, view};
use crate::session::{SERVER_MAJOR_VERSION, SERVER_MINOR_VERSION};
use crate::util::viewable_variables;

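/// A lightweight snapshot of a compute instance: its ID plus, optionally, the
/// set of collections that were installed on it when the snapshot was taken.
///
/// When `collections` is `None`, the snapshot makes no claim about which
/// collections exist, and [`ComputeInstanceSnapshot::contains_collection`]
/// conservatively answers `true` for every ID.
///
/// A minimal usage sketch (assumes an `instance_id` value is in scope;
/// `GlobalId::User(1)` and `GlobalId::User(2)` are made-up IDs for
/// illustration):
///
/// ```ignore
/// let mut snapshot = ComputeInstanceSnapshot::new_from_parts(
///     instance_id,
///     BTreeSet::from([GlobalId::User(1)]),
/// );
/// assert!(snapshot.contains_collection(&GlobalId::User(1)));
/// snapshot.insert_collection(GlobalId::User(2));
/// assert!(snapshot.contains_collection(&GlobalId::User(2)));
/// ```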
#[derive(Debug, Clone)]
pub struct ComputeInstanceSnapshot {
    instance_id: ComputeInstanceId,
    collections: Option<BTreeSet<GlobalId>>,
}

impl ComputeInstanceSnapshot {
    pub fn new(controller: &Controller, id: ComputeInstanceId) -> Result<Self, InstanceMissing> {
        controller
            .compute
            .collection_ids(id)
            .map(|collection_ids| Self {
                instance_id: id,
                collections: Some(collection_ids.collect()),
            })
    }

    pub fn new_from_parts(instance_id: ComputeInstanceId, collections: BTreeSet<GlobalId>) -> Self {
        Self {
            instance_id,
            collections: Some(collections),
        }
    }

    pub fn new_without_collections(instance_id: ComputeInstanceId) -> Self {
        Self {
            instance_id,
            collections: None,
        }
    }

    pub fn instance_id(&self) -> ComputeInstanceId {
        self.instance_id
    }

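    /// Reports whether this snapshot contains the identified collection.
    ///
    /// Snapshots created without collection information (`collections` is
    /// `None`) report `true` for every ID.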
    pub fn contains_collection(&self, id: &GlobalId) -> bool {
        self.collections
            .as_ref()
            .map_or(true, |collections| collections.contains(id))
    }

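    /// Adds the identified collection to this snapshot.
    ///
    /// Panics if the snapshot was created without collection information.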
    pub fn insert_collection(&mut self, id: GlobalId) {
        self.collections
            .as_mut()
            .expect("insert_collection called on snapshot with None collections")
            .insert(id);
    }
}

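/// State borrowed by the dataflow-building methods below.
///
/// * `catalog` resolves item metadata during dataflow construction.
/// * `compute` is a snapshot of the compute instance the dataflow targets.
/// * `replan`, when set, carries the ID of an object whose dataflow is being
///   rebuilt; it is populated from [`OptimizerConfig::replan`] (presumably in
///   service of replan-style `EXPLAIN` workflows).
/// * `recursion_guard` bounds the recursive traversals performed by this
///   builder.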
#[derive(Debug)]
pub struct DataflowBuilder<'a> {
    pub catalog: &'a dyn OptimizerCatalog,
    pub compute: ComputeInstanceSnapshot,
    pub replan: Option<GlobalId>,
    recursion_guard: RecursionGuard,
}

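/// The style in which scalar expressions should be prepared for evaluation.
///
/// * `Maintained`: the expression is part of a long-lived, maintained
///   dataflow, so unmaterializable function calls must not remain — except
///   `mz_now()` inside filter predicates, which [`prep_relation_expr`] leaves
///   to be implemented as temporal filters.
/// * `OneShot`: the expression is evaluated once, at `logical_time`, on
///   behalf of `session`; unmaterializable function calls are replaced with
///   literals.
/// * `WebhookValidation`: the expression validates a webhook request at time
///   `now`; only `current_timestamp()` is replaced.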
#[derive(Clone, Copy, Debug)]
pub enum ExprPrepStyle<'a> {
    Maintained,
    OneShot {
        logical_time: EvalTime,
        session: &'a dyn SessionMetadata,
        catalog_state: &'a CatalogState,
    },
    WebhookValidation {
        now: DateTime<Utc>,
    },
}

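/// The logical time at which a one-shot expression should be evaluated.
///
/// * `Time`: evaluate at the given timestamp.
/// * `Deferred`: the timestamp is not yet known; `mz_now()` calls are left in
///   place to be evaluated later.
/// * `NotAvailable`: no timestamp will be available; `mz_now()` calls are an
///   error.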
#[derive(Clone, Copy, Debug)]
pub enum EvalTime {
    Time(mz_repr::Timestamp),
    Deferred,
    NotAvailable,
}

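/// Returns the IDs of all collections imported by `dataflow`, bundled as a
/// [`CollectionIdBundle`]: source imports as storage IDs and index imports as
/// compute IDs on `compute_instance`.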
pub fn dataflow_import_id_bundle<P>(
    dataflow: &DataflowDescription<P>,
    compute_instance: ComputeInstanceId,
) -> CollectionIdBundle {
    let storage_ids = dataflow.source_imports.keys().copied().collect();
    let compute_ids = dataflow.index_imports.keys().copied().collect();
    CollectionIdBundle {
        storage_ids,
        compute_ids: btreemap! {compute_instance => compute_ids},
    }
}

impl<'a> DataflowBuilder<'a> {
    pub fn new(catalog: &'a dyn OptimizerCatalog, compute: ComputeInstanceSnapshot) -> Self {
        Self {
            catalog,
            compute,
            replan: None,
            recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
        }
    }

    pub(super) fn with_config(mut self, config: &OptimizerConfig) -> Self {
        self.replan = config.replan;
        self
    }

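    /// Imports the object with the specified ID into the provided dataflow
    /// description, preferring indexes on the object when any exist and
    /// otherwise importing it as a source (or, for views, recursively
    /// importing the view's dependencies and plan).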
    pub fn import_into_dataflow(
        &mut self,
        id: &GlobalId,
        dataflow: &mut DataflowDesc,
        features: &OptimizerFeatures,
    ) -> Result<(), OptimizerError> {
        maybe_grow(|| {
            if dataflow.is_imported(id) {
                return Ok(());
            }

            let monotonic = self.monotonic_object(*id, features);

            let mut valid_indexes = self.indexes_on(*id).peekable();
            if valid_indexes.peek().is_some() {
                for (index_id, idx) in valid_indexes {
                    let index_desc = IndexDesc {
                        on_id: *id,
                        key: idx.keys.to_vec(),
                    };
                    let entry = self.catalog.get_entry(id);
                    let desc = entry
                        .desc(
                            &self
                                .catalog
                                .resolve_full_name(entry.name(), entry.conn_id()),
                        )
                        .expect("indexes can only be built on items with descs");
                    dataflow.import_index(index_id, index_desc, desc.typ().clone(), monotonic);
                }
            } else {
                drop(valid_indexes);
                let entry = self.catalog.get_entry(id);
                match entry.item() {
                    CatalogItem::Table(table) => {
                        dataflow.import_source(*id, table.desc_for(id).into_typ(), monotonic);
                    }
                    CatalogItem::Source(source) => {
                        dataflow.import_source(*id, source.desc.typ().clone(), monotonic);
                    }
                    CatalogItem::View(view) => {
                        let expr = view.optimized_expr.as_ref();
                        self.import_view_into_dataflow(id, expr, dataflow, features)?;
                    }
                    CatalogItem::MaterializedView(mview) => {
                        dataflow.import_source(*id, mview.desc_for(id).into_typ(), monotonic);
                    }
                    CatalogItem::Log(log) => {
                        dataflow.import_source(*id, log.variant.desc().typ().clone(), monotonic);
                    }
                    CatalogItem::ContinualTask(ct) => {
                        dataflow.import_source(*id, ct.desc.typ().clone(), monotonic);
                    }
                    _ => unreachable!(),
                }
            }
            Ok(())
        })
    }

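    /// Imports the view with the specified ID and optimized expression into
    /// the provided dataflow description, first importing everything the view
    /// depends on.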
    pub fn import_view_into_dataflow(
        &mut self,
        view_id: &GlobalId,
        view: &OptimizedMirRelationExpr,
        dataflow: &mut DataflowDesc,
        features: &OptimizerFeatures,
    ) -> Result<(), OptimizerError> {
        for get_id in view.depends_on() {
            self.import_into_dataflow(&get_id, dataflow, features)?;
        }
        dataflow.insert_plan(*view_id, view.clone());
        Ok(())
    }

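    /// Re-optimizes each imported view in `df_desc` with a fresh
    /// [`view::Optimizer`], if the `reoptimize_imported_views` feature is
    /// enabled in `config`. Transient and `EXPLAIN` IDs are skipped, since
    /// they do not reference catalog views.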
    pub fn maybe_reoptimize_imported_views(
        &self,
        df_desc: &mut DataflowDesc,
        config: &OptimizerConfig,
    ) -> Result<(), OptimizerError> {
        if !config.features.reoptimize_imported_views {
            return Ok(());
        }

        let mut view_optimizer = view::Optimizer::new(config.clone(), None);
        for desc in df_desc.objects_to_build.iter_mut().rev() {
            if matches!(desc.id, GlobalId::Explain | GlobalId::Transient(_)) {
                continue;
            }
            if let CatalogItem::View(view) = self.catalog.get_entry(&desc.id).item() {
                let _span = tracing::span!(
                    target: "optimizer",
                    tracing::Level::DEBUG,
                    "view",
                    path.segment = desc.id.to_string()
                )
                .entered();

                desc.plan = view_optimizer.optimize(view.raw_expr.as_ref().clone())?;

                trace_plan(desc.plan.as_inner());
            }
        }

        Ok(())
    }

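    /// Determines whether the given source produces monotonic data, i.e.,
    /// data that is only ever inserted and never retracted.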
    fn monotonic_source(&self, data_source: &DataSourceDesc) -> bool {
        match data_source {
            DataSourceDesc::Ingestion { .. } => false,
            DataSourceDesc::OldSyntaxIngestion {
                desc, data_config, ..
            } => data_config.monotonic(&desc.connection),
            DataSourceDesc::Webhook { .. } => true,
            DataSourceDesc::IngestionExport {
                ingestion_id,
                data_config,
                ..
            } => {
                let source_desc = self
                    .catalog
                    .get_entry_by_item_id(ingestion_id)
                    .source_desc()
                    .expect("ingestion export references a resolvable entry")
                    .expect("ingestion export must reference a source");
                data_config.monotonic(&source_desc.connection)
            }
            DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => false,
        }
    }

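    /// Determines whether the object with the given ID is monotonic,
    /// memoizing intermediate results and treating any inspection error as
    /// "not monotonic" (after logging a warning).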
    fn monotonic_object(&self, id: GlobalId, features: &OptimizerFeatures) -> bool {
        self.monotonic_object_inner(id, &mut BTreeMap::new(), features)
            .unwrap_or_else(|e| {
                warn!(%id, "error inspecting object for monotonicity: {e}");
                false
            })
    }

    fn monotonic_object_inner(
        &self,
        id: GlobalId,
        memo: &mut BTreeMap<GlobalId, bool>,
        features: &OptimizerFeatures,
    ) -> Result<bool, RecursionLimitError> {
        if let Some(monotonic) = memo.get(&id) {
            return Ok(*monotonic);
        }

        let monotonic = self.checked_recur(|_| {
            match self.catalog.get_entry(&id).item() {
                CatalogItem::Source(source) => Ok(self.monotonic_source(&source.data_source)),
                CatalogItem::Table(table) => match &table.data_source {
                    TableDataSource::TableWrites { .. } => Ok(false),
                    TableDataSource::DataSource { desc, timeline: _ } => {
                        Ok(self.monotonic_source(desc))
                    }
                },
                CatalogItem::View(View { optimized_expr, .. }) => {
                    let view_expr = optimized_expr.as_ref().clone().into_inner();

                    let mut monotonic_ids = BTreeSet::new();
                    let recursion_result: Result<(), RecursionLimitError> = view_expr
                        .try_visit_post(&mut |e| {
                            if let MirRelationExpr::Get {
                                id: Id::Global(got_id),
                                ..
                            } = e
                            {
                                if self.monotonic_object_inner(*got_id, memo, features)? {
                                    monotonic_ids.insert(*got_id);
                                }
                            }
                            Ok(())
                        });
                    if let Err(error) = recursion_result {
                        warn!(%id, "error inspecting view for monotonicity: {error}");
                    }

                    let mut builder = DerivedBuilder::new(features);
                    builder.require(Monotonic::new(monotonic_ids.clone()));
                    let derived = builder.visit(&view_expr);

                    Ok(*derived
                        .as_view()
                        .value::<Monotonic>()
                        .expect("expected monotonic result from non-empty tree"))
                }
                CatalogItem::Index(Index { on, .. }) => {
                    self.monotonic_object_inner(*on, memo, features)
                }
                CatalogItem::Secret(_)
                | CatalogItem::Type(_)
                | CatalogItem::Connection(_)
                | CatalogItem::Log(_)
                | CatalogItem::MaterializedView(_)
                | CatalogItem::Sink(_)
                | CatalogItem::Func(_)
                | CatalogItem::ContinualTask(_) => Ok(false),
            }
        })?;

        memo.insert(id, monotonic);

        Ok(monotonic)
    }
}

impl<'a> CheckedRecursion for DataflowBuilder<'a> {
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.recursion_guard
    }
}

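/// Prepares a relation expression for evaluation in the given style by
/// preparing every scalar expression it contains. Under
/// [`ExprPrepStyle::Maintained`], temporal expressions in filter predicates
/// are exempted, since maintained dataflows implement them as temporal
/// filters.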
pub fn prep_relation_expr(
    expr: &mut OptimizedMirRelationExpr,
    style: ExprPrepStyle,
) -> Result<(), OptimizerError> {
    match style {
        ExprPrepStyle::Maintained => {
            expr.0.try_visit_mut_post(&mut |e| {
                if let MirRelationExpr::Filter { input, predicates } = &*e {
                    let mfp =
                        MapFilterProject::new(input.arity()).filter(predicates.iter().cloned());
                    match mfp.into_plan() {
                        Err(e) => Err(OptimizerError::UnsupportedTemporalExpression(e)),
                        Ok(mut mfp) => {
                            for s in mfp.iter_nontemporal_exprs() {
                                prep_scalar_expr(s, style)?;
                            }
                            Ok(())
                        }
                    }
                } else {
                    e.try_visit_scalars_mut1(&mut |s| prep_scalar_expr(s, style))
                }
            })
        }
        ExprPrepStyle::OneShot { .. } | ExprPrepStyle::WebhookValidation { .. } => expr
            .0
            .try_visit_scalars_mut(&mut |s| prep_scalar_expr(s, style)),
    }
}

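/// Prepares a scalar expression for evaluation in the given style:
///
/// * `OneShot`: replaces every unmaterializable function call with a literal
///   computed by [`eval_unmaterializable_func`].
/// * `Maintained`: errors if the expression contains any unmaterializable
///   function call, reporting the last one observed.
/// * `WebhookValidation`: replaces `current_timestamp()` with the provided
///   `now`, leaving other unmaterializable functions untouched.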
pub fn prep_scalar_expr(
    expr: &mut MirScalarExpr,
    style: ExprPrepStyle,
) -> Result<(), OptimizerError> {
    match style {
        ExprPrepStyle::OneShot {
            logical_time,
            session,
            catalog_state,
        } => expr.try_visit_mut_post(&mut |e| {
            if let MirScalarExpr::CallUnmaterializable(f) = e {
                *e = eval_unmaterializable_func(catalog_state, f, logical_time, session)?;
            }
            Ok(())
        }),

        ExprPrepStyle::Maintained => {
            let mut last_observed_unmaterializable_func = None;
            expr.visit_mut_post(&mut |e| {
                if let MirScalarExpr::CallUnmaterializable(f) = e {
                    last_observed_unmaterializable_func = Some(f.clone());
                }
            })?;

            if let Some(f) = last_observed_unmaterializable_func {
                return Err(OptimizerError::UnmaterializableFunction(f));
            }
            Ok(())
        }

        ExprPrepStyle::WebhookValidation { now } => {
            expr.try_visit_mut_post(&mut |e| {
                if let MirScalarExpr::CallUnmaterializable(
                    f @ UnmaterializableFunc::CurrentTimestamp,
                ) = e
                {
                    let now: Datum = now.try_into()?;
                    let const_expr = MirScalarExpr::literal_ok(now, f.output_type().scalar_type);
                    *e = const_expr;
                }
                Ok::<_, anyhow::Error>(())
            })?;
            Ok(())
        }
    }
}

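/// Evaluates the unmaterializable function `f` to a literal
/// [`MirScalarExpr`], drawing on the catalog and session state. The one
/// exception is `mz_now()`, whose handling depends on `logical_time` (see
/// [`EvalTime`]).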
fn eval_unmaterializable_func(
    state: &CatalogState,
    f: &UnmaterializableFunc,
    logical_time: EvalTime,
    session: &dyn SessionMetadata,
) -> Result<MirScalarExpr, OptimizerError> {
    let pack_1d_array = |datums: Vec<Datum>| {
        let mut row = Row::default();
        row.packer()
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: datums.len(),
                }],
                datums,
            )
            .expect("known to be a valid array");
        Ok(MirScalarExpr::Literal(Ok(row), f.output_type()))
    };
    let pack_dict = |mut datums: Vec<(String, String)>| {
        datums.sort();
        let mut row = Row::default();
        row.packer().push_dict(
            datums
                .iter()
                .map(|(key, value)| (key.as_str(), Datum::from(value.as_str()))),
        );
        Ok(MirScalarExpr::Literal(Ok(row), f.output_type()))
    };
    let pack = |datum| {
        Ok(MirScalarExpr::literal_ok(
            datum,
            f.output_type().scalar_type,
        ))
    };

    match f {
        UnmaterializableFunc::CurrentDatabase => pack(Datum::from(session.database())),
        UnmaterializableFunc::CurrentSchema => {
            let search_path = state.resolve_search_path(session);
            let schema = search_path
                .first()
                .map(|(db, schema)| &*state.get_schema(db, schema, session.conn_id()).name.schema);
            pack(Datum::from(schema))
        }
        UnmaterializableFunc::CurrentSchemasWithSystem => {
            let search_path = state.resolve_search_path(session);
            let search_path = state.effective_search_path(&search_path, false);
            pack_1d_array(
                search_path
                    .into_iter()
                    .map(|(db, schema)| {
                        let schema = state.get_schema(&db, &schema, session.conn_id());
                        Datum::String(&schema.name.schema)
                    })
                    .collect(),
            )
        }
        UnmaterializableFunc::CurrentSchemasWithoutSystem => {
            let search_path = state.resolve_search_path(session);
            pack_1d_array(
                search_path
                    .into_iter()
                    .map(|(db, schema)| {
                        let schema = state.get_schema(&db, &schema, session.conn_id());
                        Datum::String(&schema.name.schema)
                    })
                    .collect(),
            )
        }
        UnmaterializableFunc::ViewableVariables => pack_dict(
            viewable_variables(state, session)
                .map(|var| (var.name().to_lowercase(), var.value()))
                .collect(),
        ),
        UnmaterializableFunc::CurrentTimestamp => {
            let t: Datum = session.pcx().wall_time.try_into()?;
            pack(t)
        }
        UnmaterializableFunc::CurrentUser => pack(Datum::from(
            state.get_role(session.current_role_id()).name(),
        )),
        UnmaterializableFunc::SessionUser => pack(Datum::from(
            state.get_role(session.session_role_id()).name(),
        )),
        UnmaterializableFunc::IsRbacEnabled => pack(Datum::from(
            rbac::is_rbac_enabled_for_session(state.system_config(), session),
        )),
        UnmaterializableFunc::MzEnvironmentId => {
            pack(Datum::from(&*state.config().environment_id.to_string()))
        }
        UnmaterializableFunc::MzIsSuperuser => pack(Datum::from(session.is_superuser())),
        UnmaterializableFunc::MzNow => match logical_time {
            EvalTime::Time(logical_time) => pack(Datum::MzTimestamp(logical_time)),
            EvalTime::Deferred => Ok(MirScalarExpr::CallUnmaterializable(f.clone())),
            EvalTime::NotAvailable => Err(OptimizerError::UncallableFunction {
                func: UnmaterializableFunc::MzNow,
                context: "this context",
            }),
        },
        UnmaterializableFunc::MzRoleOidMemberships => {
            let role_memberships = role_oid_memberships(state);
            let mut role_memberships: Vec<(_, Vec<_>)> = role_memberships
                .into_iter()
                .map(|(role_id, role_membership)| {
                    (
                        role_id.to_string(),
                        role_membership
                            .into_iter()
                            .map(|role_id| role_id.to_string())
                            .collect(),
                    )
                })
                .collect();
            role_memberships.sort();
            let mut row = Row::default();
            row.packer().push_dict_with(|row| {
                for (role_id, role_membership) in &role_memberships {
                    row.push(Datum::from(role_id.as_str()));
                    row.try_push_array(
                        &[ArrayDimension {
                            lower_bound: 1,
                            length: role_membership.len(),
                        }],
                        role_membership.iter().map(|role_id| Datum::from(role_id.as_str())),
                    ).expect("role_membership is 1-dimensional, and its length is used for the array length");
                }
            });
            Ok(MirScalarExpr::Literal(Ok(row), f.output_type()))
        }
        UnmaterializableFunc::MzSessionId => pack(Datum::from(state.config().session_id)),
        UnmaterializableFunc::MzUptime => {
            let uptime = state.config().start_instant.elapsed();
            let uptime = chrono::Duration::from_std(uptime).map_or(Datum::Null, Datum::from);
            pack(uptime)
        }
        UnmaterializableFunc::MzVersion => pack(Datum::from(
            &*state
                .config()
                .build_info
                .human_version(state.config().helm_chart_version.clone()),
        )),
        UnmaterializableFunc::MzVersionNum => {
            pack(Datum::Int32(state.config().build_info.version_num()))
        }
        UnmaterializableFunc::PgBackendPid => pack(Datum::Int32(i32::reinterpret_cast(
            session.conn_id().unhandled(),
        ))),
        UnmaterializableFunc::PgPostmasterStartTime => {
            let t: Datum = state.config().start_time.try_into()?;
            pack(t)
        }
        UnmaterializableFunc::Version => {
            let build_info = state.config().build_info;
            let version = format!(
                "PostgreSQL {}.{} on {} (Materialize {})",
                SERVER_MAJOR_VERSION,
                SERVER_MINOR_VERSION,
                mz_build_info::TARGET_TRIPLE,
                build_info.version,
            );
            pack(Datum::from(&*version))
        }
    }
}

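/// Returns a map from each role's OID to the set of OIDs of all roles it
/// belongs to, directly or transitively (every role is a member of itself).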
fn role_oid_memberships(catalog: &CatalogState) -> BTreeMap<u32, BTreeSet<u32>> {
    let mut role_memberships = BTreeMap::new();
    for role_id in catalog.get_roles() {
        let role = catalog.get_role(role_id);
        if !role_memberships.contains_key(&role.oid) {
            role_oid_memberships_inner(catalog, role_id, &mut role_memberships);
        }
    }
    role_memberships
}

fn role_oid_memberships_inner(
    catalog: &CatalogState,
    role_id: &RoleId,
    role_memberships: &mut BTreeMap<u32, BTreeSet<u32>>,
) {
    let role = catalog.get_role(role_id);
    role_memberships.insert(role.oid, btreeset! {role.oid});
    for parent_role_id in role.membership.map.keys() {
        let parent_role = catalog.get_role(parent_role_id);
        if !role_memberships.contains_key(&parent_role.oid) {
            role_oid_memberships_inner(catalog, parent_role_id, role_memberships);
        }
        let parent_membership: BTreeSet<_> = role_memberships
            .get(&parent_role.oid)
            .expect("inserted in recursive call above")
            .iter()
            .copied()
            .collect();
        role_memberships
            .get_mut(&role.oid)
            .expect("inserted above")
            .extend(parent_membership);
    }
}