Skip to main content

mz_adapter/optimize/
dataflows.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types and methods for building and shipping dataflow descriptions.
11//!
12//! Dataflows are buildable from the coordinator's `catalog` and `indexes`
13//! members, which respectively describe the collection backing identifiers
14//! and indicate which identifiers have arrangements available. This module
15//! isolates that logic from the rest of the somewhat complicated coordinator.
16
17use std::collections::{BTreeMap, BTreeSet};
18
19use chrono::{DateTime, Utc};
20use maplit::{btreemap, btreeset};
21use tracing::warn;
22
23use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Index, TableDataSource, View};
24use mz_compute_client::controller::error::InstanceMissing;
25use mz_compute_types::ComputeInstanceId;
26use mz_compute_types::dataflows::{DataflowDesc, DataflowDescription, IndexDesc};
27use mz_controller::Controller;
28use mz_expr::visit::Visit;
29use mz_expr::{
30    CollectionPlan, Id, MapFilterProject, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr,
31    RECURSION_LIMIT, UnmaterializableFunc,
32};
33use mz_ore::cast::ReinterpretCast;
34use mz_ore::stack::{CheckedRecursion, RecursionGuard, RecursionLimitError, maybe_grow};
35use mz_repr::adt::array::ArrayDimension;
36use mz_repr::explain::trace_plan;
37use mz_repr::optimize::OptimizerFeatures;
38use mz_repr::role_id::RoleId;
39use mz_repr::{Datum, GlobalId, ReprRelationType, Row};
40use mz_sql::catalog::CatalogRole;
41use mz_sql::rbac;
42use mz_sql::session::metadata::SessionMetadata;
43use mz_transform::analysis::DerivedBuilder;
44use mz_transform::analysis::monotonic::Monotonic;
45
46use crate::catalog::CatalogState;
47use crate::coord::id_bundle::CollectionIdBundle;
48use crate::optimize::{Optimize, OptimizerCatalog, OptimizerConfig, OptimizerError, view};
49use crate::session::{SERVER_MAJOR_VERSION, SERVER_MINOR_VERSION};
50use crate::util::viewable_variables;
51
52/// A reference-less snapshot of a compute instance. There is no guarantee `instance_id` continues
53/// to exist after this has been made.
54#[derive(Debug, Clone)]
55pub struct ComputeInstanceSnapshot {
56    instance_id: ComputeInstanceId,
57    /// The collections that exist on this compute instance. If it's None, then any collection that
58    /// a caller asks us about is considered to exist.
59    /// TODO(peek-seq): Remove this completely once all callers are able to handle suddenly missing
60    /// collections, in which case we won't need a `ComputeInstanceSnapshot` at all.
61    collections: Option<BTreeSet<GlobalId>>,
62}
63
64impl ComputeInstanceSnapshot {
65    pub fn new(controller: &Controller, id: ComputeInstanceId) -> Result<Self, InstanceMissing> {
66        controller
67            .compute
68            .collection_ids(id)
69            .map(|collection_ids| Self {
70                instance_id: id,
71                collections: Some(collection_ids.collect()),
72            })
73    }
74
75    pub fn new_from_parts(instance_id: ComputeInstanceId, collections: BTreeSet<GlobalId>) -> Self {
76        Self {
77            instance_id,
78            collections: Some(collections),
79        }
80    }
81
82    pub fn new_without_collections(instance_id: ComputeInstanceId) -> Self {
83        Self {
84            instance_id,
85            collections: None,
86        }
87    }
88
89    /// Return the ID of this compute instance.
90    pub fn instance_id(&self) -> ComputeInstanceId {
91        self.instance_id
92    }
93
94    /// Reports whether the instance contains the indicated collection. If the snapshot doesn't
95    /// track collections, then it returns true.
96    pub fn contains_collection(&self, id: &GlobalId) -> bool {
97        self.collections
98            .as_ref()
99            .map_or(true, |collections| collections.contains(id))
100    }
101
102    /// Inserts the given collection into the snapshot.
103    pub fn insert_collection(&mut self, id: GlobalId) {
104        self.collections
105            .as_mut()
106            .expect("insert_collection called on snapshot with None collections")
107            .insert(id);
108    }
109}
110
111/// Borrows of catalog and indexes sufficient to build dataflow descriptions.
112#[derive(Debug)]
113pub struct DataflowBuilder<'a> {
114    pub catalog: &'a dyn OptimizerCatalog,
115    /// A handle to the compute abstraction, which describes indexes by identifier.
116    ///
117    /// This can also be used to grab a handle to the storage abstraction, through
118    /// its `storage_mut()` method.
119    pub compute: ComputeInstanceSnapshot,
120    /// If set, indicates that the `DataflowBuilder` operates in "replan" mode
121    /// and should consider only catalog items that are strictly less than the
122    /// given [`GlobalId`].
123    ///
124    /// In particular, indexes with higher [`GlobalId`] that are present in the
125    /// catalog will be ignored.
126    ///
127    /// Bound from [`OptimizerConfig::replan`].
128    pub replan: Option<GlobalId>,
129    /// A guard for recursive operations in this [`DataflowBuilder`] instance.
130    recursion_guard: RecursionGuard,
131}
132
133/// Behavior to prepare relation and scalar expressions for use in a dataflow.
134pub trait ExprPrep {
135    /// Prepare a relation expression.
136    fn prep_relation_expr(&self, expr: &mut OptimizedMirRelationExpr)
137    -> Result<(), OptimizerError>;
138
139    /// Prepare a scalar expression.
140    fn prep_scalar_expr(&self, expr: &mut MirScalarExpr) -> Result<(), OptimizerError>;
141}
142
143/// A no-op expression preparer.
144pub struct ExprPrepNoop;
145impl ExprPrep for ExprPrepNoop {
146    fn prep_relation_expr(&self, _: &mut OptimizedMirRelationExpr) -> Result<(), OptimizerError> {
147        Ok(())
148    }
149    fn prep_scalar_expr(&self, _expr: &mut MirScalarExpr) -> Result<(), OptimizerError> {
150        Ok(())
151    }
152}
153
154/// Preparing an expression for maintained dataflow, e.g., index, materialized view, or subscribe.
155/// Produces errors for calls to unmaterializable functions.
156pub struct ExprPrepMaintained;
157
158impl ExprPrep for ExprPrepMaintained {
159    fn prep_relation_expr(
160        &self,
161        expr: &mut OptimizedMirRelationExpr,
162    ) -> Result<(), OptimizerError> {
163        expr.0.try_visit_mut_post(&mut |e| {
164            // Carefully test filter expressions, which may represent temporal filters.
165            if let MirRelationExpr::Filter { input, predicates } = &*e {
166                let mfp = MapFilterProject::new(input.arity()).filter(predicates.iter().cloned());
167                match mfp.into_plan() {
168                    Err(e) => Err(OptimizerError::UnsupportedTemporalExpression(e)),
169                    Ok(mut mfp) => {
170                        for s in mfp.iter_nontemporal_exprs() {
171                            self.prep_scalar_expr(s)?;
172                        }
173                        Ok(())
174                    }
175                }
176            } else {
177                e.try_visit_scalars_mut1(&mut |s| self.prep_scalar_expr(s))
178            }
179        })
180    }
181
182    fn prep_scalar_expr(&self, expr: &mut MirScalarExpr) -> Result<(), OptimizerError> {
183        // Reject the query if it contains any unmaterializable function calls.
184        let mut last_observed_unmaterializable_func = None;
185        expr.visit_mut_post(&mut |e| {
186            if let MirScalarExpr::CallUnmaterializable(f) = e {
187                last_observed_unmaterializable_func = Some(f.clone());
188            }
189        })?;
190
191        if let Some(f) = last_observed_unmaterializable_func {
192            Err(OptimizerError::UnmaterializableFunction(f))
193        } else {
194            Ok(())
195        }
196    }
197}
198
199/// Prepare an expression to run once at a logical time in a session.
200/// Calls to all unmaterializable functions are replaced with constants.
201pub struct ExprPrepOneShot<'a> {
202    pub logical_time: EvalTime,
203    pub session: &'a dyn SessionMetadata,
204    pub catalog_state: &'a CatalogState,
205}
206
207impl ExprPrep for ExprPrepOneShot<'_> {
208    fn prep_relation_expr(
209        &self,
210        expr: &mut OptimizedMirRelationExpr,
211    ) -> Result<(), OptimizerError> {
212        expr.0
213            .try_visit_scalars_mut(&mut |s| self.prep_scalar_expr(s))
214    }
215
216    fn prep_scalar_expr(&self, expr: &mut MirScalarExpr) -> Result<(), OptimizerError> {
217        // Evaluate each unmaterializable function and replace the
218        // invocation with the result.
219        expr.try_visit_mut_post(&mut |e| {
220            if let MirScalarExpr::CallUnmaterializable(f) = e {
221                *e = eval_unmaterializable_func(
222                    self.catalog_state,
223                    f,
224                    self.logical_time,
225                    self.session,
226                )?;
227            }
228            Ok(())
229        })
230    }
231}
232
233/// Prepare an expression for evaluation in a CHECK expression of a webhook source.
234/// Replaces calls to `UnmaterializableFunc::CurrentTimestamp`, others are left untouched.
235pub struct ExprPrepWebhookValidation {
236    /// Time at which this expression is being evaluated.
237    pub now: DateTime<Utc>,
238}
239
240impl ExprPrep for ExprPrepWebhookValidation {
241    fn prep_relation_expr(
242        &self,
243        expr: &mut OptimizedMirRelationExpr,
244    ) -> Result<(), OptimizerError> {
245        expr.0
246            .try_visit_scalars_mut(&mut |s| self.prep_scalar_expr(s))
247    }
248
249    fn prep_scalar_expr(&self, expr: &mut MirScalarExpr) -> Result<(), OptimizerError> {
250        let now = self.now;
251        expr.try_visit_mut_post(&mut |e| {
252            if let MirScalarExpr::CallUnmaterializable(f @ UnmaterializableFunc::CurrentTimestamp) =
253                e
254            {
255                let now: Datum = now.try_into()?;
256                let const_expr = MirScalarExpr::literal_ok(now, f.output_type().scalar_type);
257                *e = const_expr;
258            }
259            Ok(())
260        })
261    }
262}
263
264#[derive(Clone, Copy, Debug)]
265pub enum EvalTime {
266    Time(mz_repr::Timestamp),
267    /// Skips mz_now() calls.
268    Deferred,
269    /// Errors on mz_now() calls.
270    NotAvailable,
271}
272
273/// Returns an ID bundle with the given dataflows imports.
274pub fn dataflow_import_id_bundle<P>(
275    dataflow: &DataflowDescription<P>,
276    compute_instance: ComputeInstanceId,
277) -> CollectionIdBundle {
278    let storage_ids = dataflow.source_imports.keys().copied().collect();
279    let compute_ids = dataflow.index_imports.keys().copied().collect();
280    CollectionIdBundle {
281        storage_ids,
282        compute_ids: btreemap! {compute_instance => compute_ids},
283    }
284}
285
286impl<'a> DataflowBuilder<'a> {
287    pub fn new(catalog: &'a dyn OptimizerCatalog, compute: ComputeInstanceSnapshot) -> Self {
288        Self {
289            catalog,
290            compute,
291            replan: None,
292            recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
293        }
294    }
295
296    // TODO(aalexandrov): strictly speaking it should be better if we can make
297    // `config: &OptimizerConfig` a field in the enclosing builder. However,
298    // before we can do that we should make sure that nobody outside of the
299    // optimizer is using a DataflowBuilder instance.
300    pub(super) fn with_config(mut self, config: &OptimizerConfig) -> Self {
301        self.replan = config.replan;
302        self
303    }
304
305    /// Imports the view, source, or table with `id` into the provided
306    /// dataflow description. [`OptimizerFeatures`] is used while running
307    /// the [`Monotonic`] analysis.
308    ///
309    /// Panics if `id` refers to a non-importable item, such as an index or sink.
310    pub fn import_into_dataflow(
311        &mut self,
312        id: &GlobalId,
313        dataflow: &mut DataflowDesc,
314        features: &OptimizerFeatures,
315    ) -> Result<(), OptimizerError> {
316        maybe_grow(|| {
317            // Avoid importing the item redundantly.
318            if dataflow.is_imported(id) {
319                return Ok(());
320            }
321
322            let monotonic = self.monotonic_object(*id, features);
323
324            // A valid index is any index on `id` that is known to index oracle.
325            // Here, we import all indexes that belong to all imported collections. Later,
326            // `prune_and_annotate_dataflow_index_imports` runs at the end of the MIR
327            // pipeline, and removes unneeded index imports based on the optimized plan.
328            let mut valid_indexes = self.indexes_on(*id).peekable();
329            if valid_indexes.peek().is_some() {
330                for (index_id, idx) in valid_indexes {
331                    let index_desc = IndexDesc {
332                        on_id: *id,
333                        key: idx.keys.to_vec(),
334                    };
335                    let entry = self.catalog.get_entry(id);
336                    let desc = entry
337                        .relation_desc()
338                        .expect("indexes can only be built on items with descs");
339                    dataflow.import_index(
340                        index_id,
341                        index_desc,
342                        ReprRelationType::from(desc.typ()),
343                        monotonic,
344                    );
345                }
346            } else {
347                drop(valid_indexes);
348                let entry = self.catalog.get_entry(id);
349                // Note that the following match should be kept in sync with `sufficient_collections`.
350                match entry.item() {
351                    CatalogItem::Table(table) => {
352                        dataflow.import_source(*id, table.desc_for(id).into_typ(), monotonic);
353                    }
354                    CatalogItem::Source(source) => {
355                        dataflow.import_source(*id, source.desc.typ().clone(), monotonic);
356                    }
357                    CatalogItem::View(view) => {
358                        let expr = view.optimized_expr.as_ref();
359                        self.import_view_into_dataflow(id, expr, dataflow, features)?;
360                    }
361                    CatalogItem::MaterializedView(mview) if mview.replacement_target.is_some() => {
362                        // Can't read from replacements, use the view definition directly.
363                        let expr = mview.optimized_expr.as_ref();
364                        self.import_view_into_dataflow(id, expr, dataflow, features)?;
365                    }
366                    CatalogItem::MaterializedView(mview) => {
367                        dataflow.import_source(*id, mview.desc_for(id).into_typ(), monotonic);
368                    }
369                    CatalogItem::Log(log) => {
370                        dataflow.import_source(*id, log.variant.desc().typ().clone(), monotonic);
371                    }
372                    CatalogItem::ContinualTask(ct) => {
373                        dataflow.import_source(*id, ct.desc.typ().clone(), monotonic);
374                    }
375                    CatalogItem::Sink(_)
376                    | CatalogItem::Index(_)
377                    | CatalogItem::Type(_)
378                    | CatalogItem::Func(_)
379                    | CatalogItem::Secret(_)
380                    | CatalogItem::Connection(_) => {
381                        // Non-importable thing; can't get here.
382                        unreachable!()
383                    }
384                }
385            }
386            Ok(())
387        })
388    }
389
390    /// Imports the view with the specified ID and expression into the provided
391    /// dataflow description. [`OptimizerFeatures`] is used while running
392    /// expression [`mz_transform::analysis::Analysis`].
393    ///
394    /// You should generally prefer calling
395    /// [`DataflowBuilder::import_into_dataflow`], which can handle objects of
396    /// any type as long as they exist in the catalog. This method exists for
397    /// when the view does not exist in the catalog, e.g., because it is
398    /// identified by a [`GlobalId::Transient`].
399    pub fn import_view_into_dataflow(
400        &mut self,
401        view_id: &GlobalId,
402        view: &OptimizedMirRelationExpr,
403        dataflow: &mut DataflowDesc,
404        features: &OptimizerFeatures,
405    ) -> Result<(), OptimizerError> {
406        for get_id in view.depends_on() {
407            self.import_into_dataflow(&get_id, dataflow, features)?;
408        }
409        dataflow.insert_plan(*view_id, view.clone());
410        Ok(())
411    }
412
413    // Re-optimize the imported view plans using the current optimizer
414    // configuration if reoptimization is requested.
415    pub fn maybe_reoptimize_imported_views(
416        &self,
417        df_desc: &mut DataflowDesc,
418        config: &OptimizerConfig,
419    ) -> Result<(), OptimizerError> {
420        if !config.features.reoptimize_imported_views {
421            return Ok(()); // Do nothing if not explicitly requested.
422        }
423
424        let mut view_optimizer = view::Optimizer::new(config.clone(), None);
425        for desc in df_desc.objects_to_build.iter_mut().rev() {
426            if matches!(desc.id, GlobalId::Explain | GlobalId::Transient(_)) {
427                continue; // Skip descriptions that do not reference proper views.
428            }
429            if let CatalogItem::View(view) = &self.catalog.get_entry(&desc.id).item {
430                let _span = tracing::span!(
431                    target: "optimizer",
432                    tracing::Level::DEBUG,
433                    "view",
434                    path.segment = desc.id.to_string()
435                )
436                .entered();
437
438                // Reoptimize the view and update the resulting `desc.plan`.
439                desc.plan = view_optimizer.optimize(view.raw_expr.as_ref().clone())?;
440
441                // Report the optimized plan under this span.
442                trace_plan(desc.plan.as_inner());
443            }
444        }
445
446        Ok(())
447    }
448
449    /// Determine the given source's monotonicity.
450    fn monotonic_source(&self, data_source: &DataSourceDesc) -> bool {
451        match data_source {
452            DataSourceDesc::Ingestion { .. } => false,
453            DataSourceDesc::OldSyntaxIngestion {
454                desc, data_config, ..
455            } => data_config.monotonic(&desc.connection),
456            DataSourceDesc::Webhook { .. } => true,
457            DataSourceDesc::IngestionExport {
458                ingestion_id,
459                data_config,
460                ..
461            } => {
462                let source_desc = self
463                    .catalog
464                    .get_entry_by_item_id(ingestion_id)
465                    .source_desc()
466                    .expect("ingestion export must reference a source")
467                    .expect("ingestion export must reference a source");
468                data_config.monotonic(&source_desc.connection)
469            }
470            DataSourceDesc::Introspection(_)
471            | DataSourceDesc::Progress
472            | DataSourceDesc::Catalog => false,
473        }
474    }
475
476    /// Determine the given objects's monotonicity.
477    ///
478    /// This recursively traverses the expressions of all views depended on by the given object.
479    /// If this becomes a performance problem, we could add the monotonicity information of views
480    /// into the catalog instead.
481    ///
482    /// Note that materialized views are never monotonic, no matter their definition, because the
483    /// self-correcting persist_sink may insert retractions to correct the contents of its output
484    /// collection.
485    fn monotonic_object(&self, id: GlobalId, features: &OptimizerFeatures) -> bool {
486        self.monotonic_object_inner(id, &mut BTreeMap::new(), features)
487            .unwrap_or_else(|e| {
488                warn!(%id, "error inspecting object for monotonicity: {e}");
489                false
490            })
491    }
492
493    fn monotonic_object_inner(
494        &self,
495        id: GlobalId,
496        memo: &mut BTreeMap<GlobalId, bool>,
497        features: &OptimizerFeatures,
498    ) -> Result<bool, RecursionLimitError> {
499        // An object might be reached multiple times. If we already computed the monotonicity of
500        // the given ID, use that. If not, then compute it and remember the result.
501        if let Some(monotonic) = memo.get(&id) {
502            return Ok(*monotonic);
503        }
504
505        let monotonic = self.checked_recur(|_| {
506            match self.catalog.get_entry(&id).item() {
507                CatalogItem::Source(source) => Ok(self.monotonic_source(&source.data_source)),
508                CatalogItem::Table(table) => match &table.data_source {
509                    TableDataSource::TableWrites { .. } => Ok(false),
510                    TableDataSource::DataSource { desc, timeline: _ } => {
511                        Ok(self.monotonic_source(desc))
512                    }
513                },
514                CatalogItem::View(View { optimized_expr, .. }) => {
515                    let view_expr = optimized_expr.as_ref().clone().into_inner();
516
517                    // Inspect global ids that occur in the Gets in view_expr, and collect the ids
518                    // of monotonic dependees.
519                    let mut monotonic_ids = BTreeSet::new();
520                    let recursion_result: Result<(), RecursionLimitError> = view_expr
521                        .try_visit_post(&mut |e| {
522                            if let MirRelationExpr::Get {
523                                id: Id::Global(got_id),
524                                ..
525                            } = e
526                            {
527                                if self.monotonic_object_inner(*got_id, memo, features)? {
528                                    monotonic_ids.insert(*got_id);
529                                }
530                            }
531                            Ok(())
532                        });
533                    if let Err(error) = recursion_result {
534                        // We still might have got some of the IDs, so just log and continue. Now
535                        // the subsequent monotonicity analysis can have false negatives.
536                        warn!(%id, "error inspecting view for monotonicity: {error}");
537                    }
538
539                    let mut builder = DerivedBuilder::new(features);
540                    builder.require(Monotonic::new(monotonic_ids.clone()));
541                    let derived = builder.visit(&view_expr);
542
543                    Ok(*derived
544                        .as_view()
545                        .value::<Monotonic>()
546                        .expect("Expected monotonic result from non empty tree"))
547                }
548                CatalogItem::Index(Index { on, .. }) => {
549                    self.monotonic_object_inner(*on, memo, features)
550                }
551                CatalogItem::Secret(_)
552                | CatalogItem::Type(_)
553                | CatalogItem::Connection(_)
554                | CatalogItem::Log(_)
555                | CatalogItem::MaterializedView(_)
556                | CatalogItem::Sink(_)
557                | CatalogItem::Func(_)
558                | CatalogItem::ContinualTask(_) => Ok(false),
559            }
560        })?;
561
562        memo.insert(id, monotonic);
563
564        Ok(monotonic)
565    }
566}
567
568impl<'a> CheckedRecursion for DataflowBuilder<'a> {
569    fn recursion_guard(&self) -> &RecursionGuard {
570        &self.recursion_guard
571    }
572}
573
574fn eval_unmaterializable_func(
575    state: &CatalogState,
576    f: &UnmaterializableFunc,
577    logical_time: EvalTime,
578    session: &dyn SessionMetadata,
579) -> Result<MirScalarExpr, OptimizerError> {
580    let pack_1d_array = |datums: Vec<Datum>| {
581        let mut row = Row::default();
582        row.packer()
583            .try_push_array(
584                &[ArrayDimension {
585                    lower_bound: 1,
586                    length: datums.len(),
587                }],
588                datums,
589            )
590            .expect("known to be a valid array");
591        Ok(MirScalarExpr::literal_from_single_element_row(
592            row,
593            f.output_type().scalar_type,
594        ))
595    };
596    let pack_dict = |mut datums: Vec<(String, String)>| {
597        datums.sort();
598        let mut row = Row::default();
599        row.packer().push_dict(
600            datums
601                .iter()
602                .map(|(key, value)| (key.as_str(), Datum::from(value.as_str()))),
603        );
604        Ok(MirScalarExpr::literal_from_single_element_row(
605            row,
606            f.output_type().scalar_type,
607        ))
608    };
609    let pack = |datum| {
610        Ok(MirScalarExpr::literal_ok(
611            datum,
612            f.output_type().scalar_type,
613        ))
614    };
615
616    match f {
617        UnmaterializableFunc::CurrentDatabase => pack(Datum::from(session.database())),
618        UnmaterializableFunc::CurrentSchema => {
619            let search_path = state.resolve_search_path(session);
620            let schema = search_path
621                .first()
622                .map(|(db, schema)| &*state.get_schema(db, schema, session.conn_id()).name.schema);
623            pack(Datum::from(schema))
624        }
625        UnmaterializableFunc::CurrentSchemasWithSystem => {
626            let search_path = state.resolve_search_path(session);
627            let search_path = state.effective_search_path(&search_path, false);
628            pack_1d_array(
629                search_path
630                    .into_iter()
631                    .map(|(db, schema)| {
632                        let schema = state.get_schema(&db, &schema, session.conn_id());
633                        Datum::String(&schema.name.schema)
634                    })
635                    .collect(),
636            )
637        }
638        UnmaterializableFunc::CurrentSchemasWithoutSystem => {
639            let search_path = state.resolve_search_path(session);
640            pack_1d_array(
641                search_path
642                    .into_iter()
643                    .map(|(db, schema)| {
644                        let schema = state.get_schema(&db, &schema, session.conn_id());
645                        Datum::String(&schema.name.schema)
646                    })
647                    .collect(),
648            )
649        }
650        UnmaterializableFunc::ViewableVariables => pack_dict(
651            viewable_variables(state, session)
652                .map(|var| (var.name().to_lowercase(), var.value()))
653                .collect(),
654        ),
655        UnmaterializableFunc::CurrentTimestamp => {
656            let t: Datum = session.pcx().wall_time.try_into()?;
657            pack(t)
658        }
659        UnmaterializableFunc::CurrentUser => pack(Datum::from(
660            state.get_role(session.current_role_id()).name(),
661        )),
662        UnmaterializableFunc::SessionUser => pack(Datum::from(
663            state.get_role(session.session_role_id()).name(),
664        )),
665        UnmaterializableFunc::IsRbacEnabled => pack(Datum::from(
666            rbac::is_rbac_enabled_for_session(state.system_config(), session),
667        )),
668        UnmaterializableFunc::MzEnvironmentId => {
669            pack(Datum::from(&*state.config().environment_id.to_string()))
670        }
671        UnmaterializableFunc::MzIsSuperuser => pack(Datum::from(session.is_superuser())),
672        UnmaterializableFunc::MzNow => match logical_time {
673            EvalTime::Time(logical_time) => pack(Datum::MzTimestamp(logical_time)),
674            EvalTime::Deferred => Ok(MirScalarExpr::CallUnmaterializable(f.clone())),
675            EvalTime::NotAvailable => Err(OptimizerError::UncallableFunction {
676                func: UnmaterializableFunc::MzNow,
677                context: "this",
678            }),
679        },
680        UnmaterializableFunc::MzRoleOidMemberships => {
681            let role_memberships = role_oid_memberships(state);
682            let mut role_memberships: Vec<(_, Vec<_>)> = role_memberships
683                .into_iter()
684                .map(|(role_id, role_membership)| {
685                    (
686                        role_id.to_string(),
687                        role_membership
688                            .into_iter()
689                            .map(|role_id| role_id.to_string())
690                            .collect(),
691                    )
692                })
693                .collect();
694            role_memberships.sort();
695            let mut row = Row::default();
696            row.packer().push_dict_with(|row| {
697                for (role_id, role_membership) in &role_memberships {
698                    row.push(Datum::from(role_id.as_str()));
699                    row.try_push_array(
700                        &[ArrayDimension {
701                            lower_bound: 1,
702                            length: role_membership.len(),
703                        }],
704                        role_membership.iter().map(|role_id| Datum::from(role_id.as_str())),
705                    ).expect("role_membership is 1 dimensional, and its length is used for the array length");
706                }
707            });
708            Ok(MirScalarExpr::literal_from_single_element_row(
709                row,
710                f.output_type().scalar_type,
711            ))
712        }
713        UnmaterializableFunc::MzSessionId => pack(Datum::from(state.config().session_id)),
714        UnmaterializableFunc::MzUptime => {
715            let uptime = state.config().start_instant.elapsed();
716            let uptime = chrono::Duration::from_std(uptime).map_or(Datum::Null, Datum::from);
717            pack(uptime)
718        }
719        UnmaterializableFunc::MzVersion => pack(Datum::from(
720            &*state
721                .config()
722                .build_info
723                .human_version(state.config().helm_chart_version.clone()),
724        )),
725        UnmaterializableFunc::MzVersionNum => {
726            pack(Datum::Int32(state.config().build_info.version_num()))
727        }
728        UnmaterializableFunc::PgBackendPid => pack(Datum::Int32(i32::reinterpret_cast(
729            session.conn_id().unhandled(),
730        ))),
731        UnmaterializableFunc::PgPostmasterStartTime => {
732            let t: Datum = state.config().start_time.try_into()?;
733            pack(t)
734        }
735        UnmaterializableFunc::Version => {
736            let build_info = state.config().build_info;
737            let version = format!(
738                "PostgreSQL {}.{} on {} (Materialize {})",
739                SERVER_MAJOR_VERSION,
740                SERVER_MINOR_VERSION,
741                mz_build_info::TARGET_TRIPLE,
742                build_info.version,
743            );
744            pack(Datum::from(&*version))
745        }
746    }
747}
748
749fn role_oid_memberships<'a>(catalog: &'a CatalogState) -> BTreeMap<u32, BTreeSet<u32>> {
750    let mut role_memberships = BTreeMap::new();
751    for role_id in catalog.get_roles() {
752        let role = catalog.get_role(role_id);
753        if !role_memberships.contains_key(&role.oid) {
754            role_oid_memberships_inner(catalog, role_id, &mut role_memberships);
755        }
756    }
757    role_memberships
758}
759
760fn role_oid_memberships_inner<'a>(
761    catalog: &'a CatalogState,
762    role_id: &RoleId,
763    role_memberships: &mut BTreeMap<u32, BTreeSet<u32>>,
764) {
765    let role = catalog.get_role(role_id);
766    role_memberships.insert(role.oid, btreeset! {role.oid});
767    for parent_role_id in role.membership.map.keys() {
768        let parent_role = catalog.get_role(parent_role_id);
769        if !role_memberships.contains_key(&parent_role.oid) {
770            role_oid_memberships_inner(catalog, parent_role_id, role_memberships);
771        }
772        let parent_membership: BTreeSet<_> = role_memberships
773            .get(&parent_role.oid)
774            .expect("inserted in recursive call above")
775            .into_iter()
776            .cloned()
777            .collect();
778        role_memberships
779            .get_mut(&role.oid)
780            .expect("inserted above")
781            .extend(parent_membership);
782    }
783}