mz_adapter/optimize/dataflows.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Types and methods for building and shipping dataflow descriptions.
//!
//! Dataflows are buildable from the coordinator's `catalog` and `indexes`
//! members, which respectively describe the collection backing identifiers
//! and indicate which identifiers have arrangements available. This module
//! isolates that logic from the rest of the somewhat complicated coordinator.
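//!
//! A hedged sketch of the typical flow, assuming a catalog handle, a
//! [`ComputeInstanceSnapshot`], and optimizer features are already in scope
//! (identifiers are illustrative):
//!
//! ```ignore
//! let mut builder = DataflowBuilder::new(catalog, compute_snapshot);
//! let mut dataflow = DataflowDesc::new("example".into());
//! // Imports the object and, transitively, everything it depends on.
//! builder.import_into_dataflow(&id, &mut dataflow, &features)?;
//! ```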

use std::collections::{BTreeMap, BTreeSet};

use chrono::{DateTime, Utc};
use maplit::{btreemap, btreeset};
use tracing::warn;

use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Index, TableDataSource, View};
use mz_compute_client::controller::error::InstanceMissing;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::{DataflowDesc, DataflowDescription, IndexDesc};
use mz_controller::Controller;
use mz_expr::visit::Visit;
use mz_expr::{
    CollectionPlan, Id, MapFilterProject, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr,
    RECURSION_LIMIT, UnmaterializableFunc,
};
use mz_ore::cast::ReinterpretCast;
use mz_ore::stack::{CheckedRecursion, RecursionGuard, RecursionLimitError, maybe_grow};
use mz_repr::adt::array::ArrayDimension;
use mz_repr::explain::trace_plan;
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::role_id::RoleId;
use mz_repr::{Datum, GlobalId, Row};
use mz_sql::catalog::CatalogRole;
use mz_sql::rbac;
use mz_sql::session::metadata::SessionMetadata;
use mz_transform::analysis::DerivedBuilder;
use mz_transform::analysis::monotonic::Monotonic;

use crate::catalog::CatalogState;
use crate::coord::id_bundle::CollectionIdBundle;
use crate::optimize::{Optimize, OptimizerCatalog, OptimizerConfig, OptimizerError, view};
use crate::session::{SERVER_MAJOR_VERSION, SERVER_MINOR_VERSION};
use crate::util::viewable_variables;

/// A reference-less snapshot of a compute instance. There is no guarantee that
/// `instance_id` continues to exist after this snapshot has been taken.
#[derive(Debug, Clone)]
pub struct ComputeInstanceSnapshot {
    instance_id: ComputeInstanceId,
    /// The collections that exist on this compute instance. If this is `None`,
    /// any collection that a caller asks about is considered to exist.
    /// TODO(peek-seq): Remove this completely once all callers are able to handle suddenly missing
    /// collections, in which case we won't need a `ComputeInstanceSnapshot` at all.
    collections: Option<BTreeSet<GlobalId>>,
}

impl ComputeInstanceSnapshot {
    pub fn new(controller: &Controller, id: ComputeInstanceId) -> Result<Self, InstanceMissing> {
        controller
            .compute
            .collection_ids(id)
            .map(|collection_ids| Self {
                instance_id: id,
                collections: Some(collection_ids.collect()),
            })
    }

    pub fn new_from_parts(instance_id: ComputeInstanceId, collections: BTreeSet<GlobalId>) -> Self {
        Self {
            instance_id,
            collections: Some(collections),
        }
    }

    pub fn new_without_collections(instance_id: ComputeInstanceId) -> Self {
        Self {
            instance_id,
            collections: None,
        }
    }

    /// Return the ID of this compute instance.
    pub fn instance_id(&self) -> ComputeInstanceId {
        self.instance_id
    }

    /// Reports whether the instance contains the indicated collection. If the
    /// snapshot doesn't track collections, then it returns `true`.
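    ///
    /// A minimal sketch of the `None` behavior (hypothetical IDs):
    ///
    /// ```ignore
    /// let snapshot = ComputeInstanceSnapshot::new_without_collections(instance_id);
    /// // A snapshot that doesn't track collections treats every ID as present.
    /// assert!(snapshot.contains_collection(&GlobalId::User(42)));
    /// ```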
    pub fn contains_collection(&self, id: &GlobalId) -> bool {
        self.collections
            .as_ref()
            .map_or(true, |collections| collections.contains(id))
    }

    /// Inserts the given collection into the snapshot.
    pub fn insert_collection(&mut self, id: GlobalId) {
        self.collections
            .as_mut()
            .expect("insert_collection called on snapshot with None collections")
            .insert(id);
    }
}

/// Borrows of catalog and indexes sufficient to build dataflow descriptions.
#[derive(Debug)]
pub struct DataflowBuilder<'a> {
    pub catalog: &'a dyn OptimizerCatalog,
    /// A handle to the compute abstraction, which describes indexes by identifier.
    ///
    /// This can also be used to grab a handle to the storage abstraction, through
    /// its `storage_mut()` method.
    pub compute: ComputeInstanceSnapshot,
    /// If set, indicates that the `DataflowBuilder` operates in "replan" mode
    /// and should consider only catalog items that are strictly less than the
    /// given [`GlobalId`].
    ///
    /// In particular, indexes with higher [`GlobalId`] that are present in the
    /// catalog will be ignored.
    ///
    /// Bound from [`OptimizerConfig::replan`].
    pub replan: Option<GlobalId>,
    /// A guard for recursive operations in this [`DataflowBuilder`] instance.
    recursion_guard: RecursionGuard,
}

/// The styles in which an expression can be prepared for use in a dataflow.
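///
/// A hedged sketch of selecting a style for a one-shot query, assuming
/// `session`, `catalog_state`, and an optimized expression are in scope
/// (values illustrative):
///
/// ```ignore
/// let style = ExprPrepStyle::OneShot {
///     logical_time: EvalTime::Time(0u64.into()),
///     session,
///     catalog_state,
/// };
/// prep_relation_expr(&mut optimized_expr, style)?;
/// ```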
#[derive(Clone, Copy, Debug)]
pub enum ExprPrepStyle<'a> {
    /// The expression is being prepared for installation as a maintained dataflow, e.g.,
    /// index, materialized view, or subscribe.
    Maintained,
    /// The expression is being prepared to run once at the specified logical
    /// time in the specified session.
    OneShot {
        logical_time: EvalTime,
        session: &'a dyn SessionMetadata,
        catalog_state: &'a CatalogState,
    },
    /// The expression is being prepared for evaluation in a CHECK expression of a webhook source.
    WebhookValidation {
        /// Time at which this expression is being evaluated.
        now: DateTime<Utc>,
    },
}

#[derive(Clone, Copy, Debug)]
pub enum EvalTime {
    /// Evaluates mz_now() calls at the given timestamp.
    Time(mz_repr::Timestamp),
    /// Skips mz_now() calls.
    Deferred,
    /// Errors on mz_now() calls.
    NotAvailable,
}

/// Returns an ID bundle with the given dataflow's imports.
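///
/// A minimal sketch (identifiers illustrative): the bundle partitions the
/// dataflow's imports into storage IDs and per-instance compute IDs.
///
/// ```ignore
/// let id_bundle = dataflow_import_id_bundle(&dataflow, compute_instance_id);
/// assert!(id_bundle.compute_ids.contains_key(&compute_instance_id));
/// ```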
pub fn dataflow_import_id_bundle<P>(
    dataflow: &DataflowDescription<P>,
    compute_instance: ComputeInstanceId,
) -> CollectionIdBundle {
    let storage_ids = dataflow.source_imports.keys().copied().collect();
    let compute_ids = dataflow.index_imports.keys().copied().collect();
    CollectionIdBundle {
        storage_ids,
        compute_ids: btreemap! {compute_instance => compute_ids},
    }
}

impl<'a> DataflowBuilder<'a> {
    pub fn new(catalog: &'a dyn OptimizerCatalog, compute: ComputeInstanceSnapshot) -> Self {
        Self {
            catalog,
            compute,
            replan: None,
            recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
        }
    }

    // TODO(aalexandrov): strictly speaking, it would be better to make
    // `config: &OptimizerConfig` a field of the enclosing builder. However,
    // before we can do that we should make sure that nobody outside of the
    // optimizer is using a `DataflowBuilder` instance.
    pub(super) fn with_config(mut self, config: &OptimizerConfig) -> Self {
        self.replan = config.replan;
        self
    }

    /// Imports the view, source, or table with `id` into the provided
    /// dataflow description. [`OptimizerFeatures`] is used while running
    /// the [`Monotonic`] analysis.
    pub fn import_into_dataflow(
        &mut self,
        id: &GlobalId,
        dataflow: &mut DataflowDesc,
        features: &OptimizerFeatures,
    ) -> Result<(), OptimizerError> {
        maybe_grow(|| {
            // Avoid importing the item redundantly.
            if dataflow.is_imported(id) {
                return Ok(());
            }

            let monotonic = self.monotonic_object(*id, features);

            // A valid index is any index on `id` that is known to the index oracle.
            // Here, we import all indexes that belong to all imported collections. Later,
            // `prune_and_annotate_dataflow_index_imports` runs at the end of the MIR
            // pipeline, and removes unneeded index imports based on the optimized plan.
            let mut valid_indexes = self.indexes_on(*id).peekable();
            if valid_indexes.peek().is_some() {
                for (index_id, idx) in valid_indexes {
                    let index_desc = IndexDesc {
                        on_id: *id,
                        key: idx.keys.to_vec(),
                    };
                    let entry = self.catalog.get_entry(id);
                    let desc = entry
                        .relation_desc()
                        .expect("indexes can only be built on items with descs");
                    dataflow.import_index(index_id, index_desc, desc.typ().clone(), monotonic);
                }
            } else {
                drop(valid_indexes);
                let entry = self.catalog.get_entry(id);
                match entry.item() {
                    CatalogItem::Table(table) => {
                        dataflow.import_source(*id, table.desc_for(id).into_typ(), monotonic);
                    }
                    CatalogItem::Source(source) => {
                        dataflow.import_source(*id, source.desc.typ().clone(), monotonic);
                    }
                    CatalogItem::View(view) => {
                        let expr = view.optimized_expr.as_ref();
                        self.import_view_into_dataflow(id, expr, dataflow, features)?;
                    }
                    CatalogItem::MaterializedView(mview) => {
                        dataflow.import_source(*id, mview.desc_for(id).into_typ(), monotonic);
                    }
                    CatalogItem::Log(log) => {
                        dataflow.import_source(*id, log.variant.desc().typ().clone(), monotonic);
                    }
                    CatalogItem::ContinualTask(ct) => {
                        dataflow.import_source(*id, ct.desc.typ().clone(), monotonic);
                    }
                    _ => unreachable!(),
                }
            }
            Ok(())
        })
    }

    /// Imports the view with the specified ID and expression into the provided
    /// dataflow description. [`OptimizerFeatures`] is used while running
    /// expression [`mz_transform::analysis::Analysis`].
    ///
    /// You should generally prefer calling
    /// [`DataflowBuilder::import_into_dataflow`], which can handle objects of
    /// any type as long as they exist in the catalog. This method exists for
    /// when the view does not exist in the catalog, e.g., because it is
    /// identified by a [`GlobalId::Transient`].
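    ///
    /// A hedged sketch for a transient view (identifiers and expressions are
    /// illustrative):
    ///
    /// ```ignore
    /// let view_id = GlobalId::Transient(1);
    /// // Dependencies are imported first; then the view's own plan is inserted.
    /// builder.import_view_into_dataflow(&view_id, &optimized_expr, &mut dataflow, &features)?;
    /// ```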
    pub fn import_view_into_dataflow(
        &mut self,
        view_id: &GlobalId,
        view: &OptimizedMirRelationExpr,
        dataflow: &mut DataflowDesc,
        features: &OptimizerFeatures,
    ) -> Result<(), OptimizerError> {
        for get_id in view.depends_on() {
            self.import_into_dataflow(&get_id, dataflow, features)?;
        }
        dataflow.insert_plan(*view_id, view.clone());
        Ok(())
    }

    /// Re-optimizes the imported view plans using the current optimizer
    /// configuration if reoptimization is requested.
    pub fn maybe_reoptimize_imported_views(
        &self,
        df_desc: &mut DataflowDesc,
        config: &OptimizerConfig,
    ) -> Result<(), OptimizerError> {
        if !config.features.reoptimize_imported_views {
            return Ok(()); // Do nothing if not explicitly requested.
        }

        let mut view_optimizer = view::Optimizer::new(config.clone(), None);
        for desc in df_desc.objects_to_build.iter_mut().rev() {
            if matches!(desc.id, GlobalId::Explain | GlobalId::Transient(_)) {
                continue; // Skip descriptions that do not reference proper views.
            }
            if let CatalogItem::View(view) = self.catalog.get_entry(&desc.id).item() {
                let _span = tracing::span!(
                    target: "optimizer",
                    tracing::Level::DEBUG,
                    "view",
                    path.segment = desc.id.to_string()
                )
                .entered();

                // Reoptimize the view and update the resulting `desc.plan`.
                desc.plan = view_optimizer.optimize(view.raw_expr.as_ref().clone())?;

                // Report the optimized plan under this span.
                trace_plan(desc.plan.as_inner());
            }
        }

        Ok(())
    }

    /// Determines the given source's monotonicity.
    fn monotonic_source(&self, data_source: &DataSourceDesc) -> bool {
        match data_source {
            DataSourceDesc::Ingestion { .. } => false,
            DataSourceDesc::OldSyntaxIngestion {
                desc, data_config, ..
            } => data_config.monotonic(&desc.connection),
            DataSourceDesc::Webhook { .. } => true,
            DataSourceDesc::IngestionExport {
                ingestion_id,
                data_config,
                ..
            } => {
                let source_desc = self
                    .catalog
                    .get_entry_by_item_id(ingestion_id)
                    .source_desc()
                    .expect("ingestion export must reference a source")
                    .expect("ingestion export must reference a source");
                data_config.monotonic(&source_desc.connection)
            }
            DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => false,
        }
    }

    /// Determines the given object's monotonicity.
    ///
    /// This recursively traverses the expressions of all views depended on by the given object.
    /// If this becomes a performance problem, we could add the monotonicity information of views
    /// into the catalog instead.
    ///
    /// Note that materialized views are never monotonic, no matter their definition, because the
    /// self-correcting persist_sink may insert retractions to correct the contents of its output
    /// collection.
    fn monotonic_object(&self, id: GlobalId, features: &OptimizerFeatures) -> bool {
        self.monotonic_object_inner(id, &mut BTreeMap::new(), features)
            .unwrap_or_else(|e| {
                warn!(%id, "error inspecting object for monotonicity: {e}");
                false
            })
    }

    fn monotonic_object_inner(
        &self,
        id: GlobalId,
        memo: &mut BTreeMap<GlobalId, bool>,
        features: &OptimizerFeatures,
    ) -> Result<bool, RecursionLimitError> {
        // An object might be reached multiple times. If we already computed the monotonicity of
        // the given ID, use that. If not, then compute it and remember the result.
        if let Some(monotonic) = memo.get(&id) {
            return Ok(*monotonic);
        }

        let monotonic = self.checked_recur(|_| {
            match self.catalog.get_entry(&id).item() {
                CatalogItem::Source(source) => Ok(self.monotonic_source(&source.data_source)),
                CatalogItem::Table(table) => match &table.data_source {
                    TableDataSource::TableWrites { .. } => Ok(false),
                    TableDataSource::DataSource { desc, timeline: _ } => {
                        Ok(self.monotonic_source(desc))
                    }
                },
                CatalogItem::View(View { optimized_expr, .. }) => {
                    let view_expr = optimized_expr.as_ref().clone().into_inner();

                    // Inspect global ids that occur in the Gets in view_expr, and collect the ids
                    // of monotonic dependees.
                    let mut monotonic_ids = BTreeSet::new();
                    let recursion_result: Result<(), RecursionLimitError> = view_expr
                        .try_visit_post(&mut |e| {
                            if let MirRelationExpr::Get {
                                id: Id::Global(got_id),
                                ..
                            } = e
                            {
                                if self.monotonic_object_inner(*got_id, memo, features)? {
                                    monotonic_ids.insert(*got_id);
                                }
                            }
                            Ok(())
                        });
                    if let Err(error) = recursion_result {
                        // We might still have collected some of the IDs, so just log and continue.
                        // The subsequent monotonicity analysis may then have false negatives.
                        warn!(%id, "error inspecting view for monotonicity: {error}");
                    }

                    let mut builder = DerivedBuilder::new(features);
                    builder.require(Monotonic::new(monotonic_ids.clone()));
                    let derived = builder.visit(&view_expr);

                    Ok(*derived
                        .as_view()
                        .value::<Monotonic>()
                        .expect("Expected monotonic result from non-empty tree"))
                }
                CatalogItem::Index(Index { on, .. }) => {
                    self.monotonic_object_inner(*on, memo, features)
                }
                CatalogItem::Secret(_)
                | CatalogItem::Type(_)
                | CatalogItem::Connection(_)
                | CatalogItem::Log(_)
                | CatalogItem::MaterializedView(_)
                | CatalogItem::Sink(_)
                | CatalogItem::Func(_)
                | CatalogItem::ContinualTask(_) => Ok(false),
            }
        })?;

        memo.insert(id, monotonic);

        Ok(monotonic)
    }
}

impl<'a> CheckedRecursion for DataflowBuilder<'a> {
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.recursion_guard
    }
}

/// Prepares a relation expression for dataflow execution by preparing all
/// contained scalar expressions (see `prep_scalar_expr`) in the specified
/// style.
pub fn prep_relation_expr(
    expr: &mut OptimizedMirRelationExpr,
    style: ExprPrepStyle,
) -> Result<(), OptimizerError> {
    match style {
        ExprPrepStyle::Maintained => {
            expr.0.try_visit_mut_post(&mut |e| {
                // Carefully test filter expressions, which may represent temporal filters.
                if let MirRelationExpr::Filter { input, predicates } = &*e {
                    let mfp =
                        MapFilterProject::new(input.arity()).filter(predicates.iter().cloned());
                    match mfp.into_plan() {
                        Err(e) => Err(OptimizerError::UnsupportedTemporalExpression(e)),
                        Ok(mut mfp) => {
                            for s in mfp.iter_nontemporal_exprs() {
                                prep_scalar_expr(s, style)?;
                            }
                            Ok(())
                        }
                    }
                } else {
                    e.try_visit_scalars_mut1(&mut |s| prep_scalar_expr(s, style))
                }
            })
        }
        ExprPrepStyle::OneShot { .. } | ExprPrepStyle::WebhookValidation { .. } => expr
            .0
            .try_visit_scalars_mut(&mut |s| prep_scalar_expr(s, style)),
    }
}

/// Prepares a scalar expression for execution by handling unmaterializable
/// functions.
///
/// How we prepare the scalar expression depends on which `style` is specified.
///
/// * `OneShot`: Calls to all unmaterializable functions are replaced.
/// * `Maintained`: An error is produced if a call to an unmaterializable
///   function is encountered.
/// * `WebhookValidation`: Only calls to `UnmaterializableFunc::CurrentTimestamp` are replaced,
///   others are left untouched.
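///
/// A minimal sketch of the `Maintained` rejection path (expression
/// construction is illustrative):
///
/// ```ignore
/// let mut expr = MirScalarExpr::CallUnmaterializable(UnmaterializableFunc::MzNow);
/// // Maintained dataflows must not contain unmaterializable calls.
/// assert!(prep_scalar_expr(&mut expr, ExprPrepStyle::Maintained).is_err());
/// ```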
pub fn prep_scalar_expr(
    expr: &mut MirScalarExpr,
    style: ExprPrepStyle,
) -> Result<(), OptimizerError> {
    match style {
        // Evaluate each unmaterializable function and replace the
        // invocation with the result.
        ExprPrepStyle::OneShot {
            logical_time,
            session,
            catalog_state,
        } => expr.try_visit_mut_post(&mut |e| {
            if let MirScalarExpr::CallUnmaterializable(f) = e {
                *e = eval_unmaterializable_func(catalog_state, f, logical_time, session)?;
            }
            Ok(())
        }),

        // Reject the query if it contains any unmaterializable function calls.
        ExprPrepStyle::Maintained => {
            let mut last_observed_unmaterializable_func = None;
            expr.visit_mut_post(&mut |e| {
                if let MirScalarExpr::CallUnmaterializable(f) = e {
                    last_observed_unmaterializable_func = Some(f.clone());
                }
            })?;

            if let Some(f) = last_observed_unmaterializable_func {
                let err = match style {
                    ExprPrepStyle::Maintained => OptimizerError::UnmaterializableFunction(f),
                    _ => unreachable!(),
                };
                return Err(err);
            }
            Ok(())
        }

        ExprPrepStyle::WebhookValidation { now } => {
            expr.try_visit_mut_post(&mut |e| {
                if let MirScalarExpr::CallUnmaterializable(
                    f @ UnmaterializableFunc::CurrentTimestamp,
                ) = e
                {
                    let now: Datum = now.try_into()?;
                    let const_expr = MirScalarExpr::literal_ok(now, f.output_type().scalar_type);
                    *e = const_expr;
                }
                Ok::<_, anyhow::Error>(())
            })?;
            Ok(())
        }
    }
}

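/// Evaluates the given unmaterializable function against the current catalog
/// state and session, returning a literal expression. `MzNow` is special: its
/// handling depends on the given [`EvalTime`].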
fn eval_unmaterializable_func(
    state: &CatalogState,
    f: &UnmaterializableFunc,
    logical_time: EvalTime,
    session: &dyn SessionMetadata,
) -> Result<MirScalarExpr, OptimizerError> {
    let pack_1d_array = |datums: Vec<Datum>| {
        let mut row = Row::default();
        row.packer()
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: datums.len(),
                }],
                datums,
            )
            .expect("known to be a valid array");
        Ok(MirScalarExpr::Literal(Ok(row), f.output_type()))
    };
    let pack_dict = |mut datums: Vec<(String, String)>| {
        datums.sort();
        let mut row = Row::default();
        row.packer().push_dict(
            datums
                .iter()
                .map(|(key, value)| (key.as_str(), Datum::from(value.as_str()))),
        );
        Ok(MirScalarExpr::Literal(Ok(row), f.output_type()))
    };
    let pack = |datum| {
        Ok(MirScalarExpr::literal_ok(
            datum,
            f.output_type().scalar_type,
        ))
    };

    match f {
        UnmaterializableFunc::CurrentDatabase => pack(Datum::from(session.database())),
        UnmaterializableFunc::CurrentSchema => {
            let search_path = state.resolve_search_path(session);
            let schema = search_path
                .first()
                .map(|(db, schema)| &*state.get_schema(db, schema, session.conn_id()).name.schema);
            pack(Datum::from(schema))
        }
        UnmaterializableFunc::CurrentSchemasWithSystem => {
            let search_path = state.resolve_search_path(session);
            let search_path = state.effective_search_path(&search_path, false);
            pack_1d_array(
                search_path
                    .into_iter()
                    .map(|(db, schema)| {
                        let schema = state.get_schema(&db, &schema, session.conn_id());
                        Datum::String(&schema.name.schema)
                    })
                    .collect(),
            )
        }
        UnmaterializableFunc::CurrentSchemasWithoutSystem => {
            let search_path = state.resolve_search_path(session);
            pack_1d_array(
                search_path
                    .into_iter()
                    .map(|(db, schema)| {
                        let schema = state.get_schema(&db, &schema, session.conn_id());
                        Datum::String(&schema.name.schema)
                    })
                    .collect(),
            )
        }
        UnmaterializableFunc::ViewableVariables => pack_dict(
            viewable_variables(state, session)
                .map(|var| (var.name().to_lowercase(), var.value()))
                .collect(),
        ),
        UnmaterializableFunc::CurrentTimestamp => {
            let t: Datum = session.pcx().wall_time.try_into()?;
            pack(t)
        }
        UnmaterializableFunc::CurrentUser => pack(Datum::from(
            state.get_role(session.current_role_id()).name(),
        )),
        UnmaterializableFunc::SessionUser => pack(Datum::from(
            state.get_role(session.session_role_id()).name(),
        )),
        UnmaterializableFunc::IsRbacEnabled => pack(Datum::from(
            rbac::is_rbac_enabled_for_session(state.system_config(), session),
        )),
        UnmaterializableFunc::MzEnvironmentId => {
            pack(Datum::from(&*state.config().environment_id.to_string()))
        }
        UnmaterializableFunc::MzIsSuperuser => pack(Datum::from(session.is_superuser())),
        UnmaterializableFunc::MzNow => match logical_time {
            EvalTime::Time(logical_time) => pack(Datum::MzTimestamp(logical_time)),
            EvalTime::Deferred => Ok(MirScalarExpr::CallUnmaterializable(f.clone())),
            EvalTime::NotAvailable => Err(OptimizerError::UncallableFunction {
                func: UnmaterializableFunc::MzNow,
                context: "this",
            }),
        },
        UnmaterializableFunc::MzRoleOidMemberships => {
            let role_memberships = role_oid_memberships(state);
            let mut role_memberships: Vec<(_, Vec<_>)> = role_memberships
                .into_iter()
                .map(|(role_id, role_membership)| {
                    (
                        role_id.to_string(),
                        role_membership
                            .into_iter()
                            .map(|role_id| role_id.to_string())
                            .collect(),
                    )
                })
                .collect();
            role_memberships.sort();
            let mut row = Row::default();
            row.packer().push_dict_with(|row| {
                for (role_id, role_membership) in &role_memberships {
                    row.push(Datum::from(role_id.as_str()));
                    row.try_push_array(
                        &[ArrayDimension {
                            lower_bound: 1,
                            length: role_membership.len(),
                        }],
                        role_membership.iter().map(|role_id| Datum::from(role_id.as_str())),
                    ).expect("role_membership is 1 dimensional, and its length is used for the array length");
                }
            });
            Ok(MirScalarExpr::Literal(Ok(row), f.output_type()))
        }
        UnmaterializableFunc::MzSessionId => pack(Datum::from(state.config().session_id)),
        UnmaterializableFunc::MzUptime => {
            let uptime = state.config().start_instant.elapsed();
            let uptime = chrono::Duration::from_std(uptime).map_or(Datum::Null, Datum::from);
            pack(uptime)
        }
        UnmaterializableFunc::MzVersion => pack(Datum::from(
            &*state
                .config()
                .build_info
                .human_version(state.config().helm_chart_version.clone()),
        )),
        UnmaterializableFunc::MzVersionNum => {
            pack(Datum::Int32(state.config().build_info.version_num()))
        }
        UnmaterializableFunc::PgBackendPid => pack(Datum::Int32(i32::reinterpret_cast(
            session.conn_id().unhandled(),
        ))),
        UnmaterializableFunc::PgPostmasterStartTime => {
            let t: Datum = state.config().start_time.try_into()?;
            pack(t)
        }
        UnmaterializableFunc::Version => {
            let build_info = state.config().build_info;
            let version = format!(
                "PostgreSQL {}.{} on {} (Materialize {})",
                SERVER_MAJOR_VERSION,
                SERVER_MINOR_VERSION,
                mz_build_info::TARGET_TRIPLE,
                build_info.version,
            );
            pack(Datum::from(&*version))
        }
    }
}

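/// Computes, for each role in the catalog, the set of role OIDs of which that
/// role is transitively a member (including itself).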
fn role_oid_memberships<'a>(catalog: &'a CatalogState) -> BTreeMap<u32, BTreeSet<u32>> {
    let mut role_memberships = BTreeMap::new();
    for role_id in catalog.get_roles() {
        let role = catalog.get_role(role_id);
        if !role_memberships.contains_key(&role.oid) {
            role_oid_memberships_inner(catalog, role_id, &mut role_memberships);
        }
    }
    role_memberships
}

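/// Recursive helper for [`role_oid_memberships`]: records the OID membership
/// set for `role_id` by unioning its own OID with the (recursively computed)
/// membership sets of its parent roles.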
fn role_oid_memberships_inner<'a>(
    catalog: &'a CatalogState,
    role_id: &RoleId,
    role_memberships: &mut BTreeMap<u32, BTreeSet<u32>>,
) {
    let role = catalog.get_role(role_id);
    role_memberships.insert(role.oid, btreeset! {role.oid});
    for parent_role_id in role.membership.map.keys() {
        let parent_role = catalog.get_role(parent_role_id);
        if !role_memberships.contains_key(&parent_role.oid) {
            role_oid_memberships_inner(catalog, parent_role_id, role_memberships);
        }
        let parent_membership: BTreeSet<_> = role_memberships
            .get(&parent_role.oid)
            .expect("inserted in recursive call above")
            .into_iter()
            .cloned()
            .collect();
        role_memberships
            .get_mut(&role.oid)
            .expect("inserted above")
            .extend(parent_membership);
    }
}