mz_adapter/optimize/dataflows.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Types and methods for building and shipping dataflow descriptions.
//!
//! Dataflows are buildable from the coordinator's `catalog` and `indexes`
//! members, which respectively describe the collection backing identifiers
//! and indicate which identifiers have arrangements available. This module
//! isolates that logic from the rest of the somewhat complicated coordinator.
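//!
//! A minimal sketch of the intended flow (illustrative only; `catalog`,
//! `controller`, `instance_id`, `view_id`, and `features` are assumed to be
//! in scope with the appropriate types):
//!
//! ```ignore
//! // Snapshot the instance, build a dataflow description, and import a
//! // collection (plus any indexes on it) into the description.
//! let compute = ComputeInstanceSnapshot::new(&controller, instance_id)?;
//! let mut builder = DataflowBuilder::new(catalog, compute);
//! let mut dataflow = DataflowDesc::new("example dataflow".into());
//! builder.import_into_dataflow(&view_id, &mut dataflow, &features)?;
//! ```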

use std::collections::{BTreeMap, BTreeSet};

use chrono::{DateTime, Utc};
use maplit::{btreemap, btreeset};
use tracing::warn;

use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Index, Source, View};
use mz_compute_client::controller::error::InstanceMissing;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::{DataflowDesc, DataflowDescription, IndexDesc};
use mz_controller::Controller;
use mz_expr::visit::Visit;
use mz_expr::{
    CollectionPlan, Id, MapFilterProject, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr,
    RECURSION_LIMIT, UnmaterializableFunc,
};
use mz_ore::cast::ReinterpretCast;
use mz_ore::stack::{CheckedRecursion, RecursionGuard, RecursionLimitError, maybe_grow};
use mz_repr::adt::array::ArrayDimension;
use mz_repr::explain::trace_plan;
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::role_id::RoleId;
use mz_repr::{Datum, GlobalId, Row};
use mz_sql::catalog::CatalogRole;
use mz_sql::rbac;
use mz_sql::session::metadata::SessionMetadata;
use mz_transform::analysis::DerivedBuilder;
use mz_transform::analysis::monotonic::Monotonic;

use crate::catalog::CatalogState;
use crate::coord::id_bundle::CollectionIdBundle;
use crate::optimize::{Optimize, OptimizerCatalog, OptimizerConfig, OptimizerError, view};
use crate::session::{SERVER_MAJOR_VERSION, SERVER_MINOR_VERSION};
use crate::util::viewable_variables;

/// A reference-less snapshot of a compute instance. There is no guarantee that
/// `instance_id` continues to exist after the snapshot has been taken.
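///
/// A sketch of typical use (illustrative only; `controller`, `instance_id`,
/// and `some_id` are assumed to be in scope):
///
/// ```ignore
/// let snapshot = ComputeInstanceSnapshot::new(&controller, instance_id)?;
/// if snapshot.contains_collection(&some_id) {
///     // The instance had a collection for `some_id` when the snapshot
///     // was taken; it may have been dropped since.
/// }
/// ```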
#[derive(Debug, Clone)]
pub struct ComputeInstanceSnapshot {
    instance_id: ComputeInstanceId,
    collections: BTreeSet<GlobalId>,
}

impl ComputeInstanceSnapshot {
    /// Creates a snapshot of the compute instance with the given ID, or
    /// returns an error if no such instance exists.
    pub fn new(controller: &Controller, id: ComputeInstanceId) -> Result<Self, InstanceMissing> {
        controller
            .compute
            .collection_ids(id)
            .map(|collection_ids| Self {
                instance_id: id,
                collections: collection_ids.collect(),
            })
    }

    /// Returns the ID of this compute instance.
    pub fn instance_id(&self) -> ComputeInstanceId {
        self.instance_id
    }

    /// Reports whether the instance contains the indicated collection.
    pub fn contains_collection(&self, id: &GlobalId) -> bool {
        self.collections.contains(id)
    }

    /// Inserts the given collection into the snapshot.
    pub fn insert_collection(&mut self, id: GlobalId) {
        self.collections.insert(id);
    }
}

/// Borrows of catalog and indexes sufficient to build dataflow descriptions.
#[derive(Debug)]
pub struct DataflowBuilder<'a> {
    pub catalog: &'a dyn OptimizerCatalog,
    /// A snapshot of the compute instance, which records the collections
    /// (and hence the indexes) available by identifier.
    pub compute: ComputeInstanceSnapshot,
    /// If set, indicates that the `DataflowBuilder` operates in "replan" mode
    /// and should consider only catalog items that are strictly less than the
    /// given [`GlobalId`].
    ///
    /// In particular, indexes with a higher [`GlobalId`] that are present in
    /// the catalog will be ignored.
    ///
    /// Bound from [`OptimizerConfig::replan`].
    pub replan: Option<GlobalId>,
    /// A guard for recursive operations in this [`DataflowBuilder`] instance.
    recursion_guard: RecursionGuard,
}

/// The styles in which an expression can be prepared for use in a dataflow.
#[derive(Clone, Copy, Debug)]
pub enum ExprPrepStyle<'a> {
    /// The expression is being prepared for installation as a maintained dataflow, e.g.,
    /// an index, materialized view, or subscribe.
    Maintained,
    /// The expression is being prepared to run once at the specified logical
    /// time in the specified session.
    OneShot {
        logical_time: EvalTime,
        session: &'a dyn SessionMetadata,
        catalog_state: &'a CatalogState,
    },
    /// The expression is being prepared for evaluation in an AS OF or UP TO clause.
    AsOfUpTo,
    /// The expression is being prepared for evaluation in a CHECK expression of a webhook source.
    WebhookValidation {
        /// Time at which this expression is being evaluated.
        now: DateTime<Utc>,
    },
}

#[derive(Clone, Copy, Debug)]
pub enum EvalTime {
    Time(mz_repr::Timestamp),
    /// Skips mz_now() calls.
    Deferred,
    /// Errors on mz_now() calls.
    NotAvailable,
}

/// Returns an ID bundle with the given dataflow's imports.
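///
/// A sketch of how the bundle is shaped (illustrative only; `dataflow` and
/// `instance_id` are assumed to be in scope):
///
/// ```ignore
/// let id_bundle = dataflow_import_id_bundle(&dataflow, instance_id);
/// // `id_bundle.storage_ids` holds the dataflow's source imports;
/// // `id_bundle.compute_ids` maps `instance_id` to its index imports.
/// ```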
pub fn dataflow_import_id_bundle<P>(
    dataflow: &DataflowDescription<P>,
    compute_instance: ComputeInstanceId,
) -> CollectionIdBundle {
    let storage_ids = dataflow.source_imports.keys().copied().collect();
    let compute_ids = dataflow.index_imports.keys().copied().collect();
    CollectionIdBundle {
        storage_ids,
        compute_ids: btreemap! {compute_instance => compute_ids},
    }
}

impl<'a> DataflowBuilder<'a> {
    pub fn new(catalog: &'a dyn OptimizerCatalog, compute: ComputeInstanceSnapshot) -> Self {
        Self {
            catalog,
            compute,
            replan: None,
            recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
        }
    }

    // TODO(aalexandrov): strictly speaking it would be better if we could make
    // `config: &OptimizerConfig` a field in the enclosing builder. However,
    // before we can do that we should make sure that nobody outside of the
    // optimizer is using a `DataflowBuilder` instance.
    pub(super) fn with_config(mut self, config: &OptimizerConfig) -> Self {
        self.replan = config.replan;
        self
    }

    /// Imports the view, source, or table with `id` into the provided
    /// dataflow description. [`OptimizerFeatures`] is used while running
    /// the [`Monotonic`] analysis.
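    ///
    /// A sketch of the intended call (illustrative only; `builder`, `id`,
    /// `dataflow`, and `features` are assumed to be in scope):
    ///
    /// ```ignore
    /// // Imports `id` and, transitively, everything it depends on into the
    /// // dataflow; items that are already imported are skipped.
    /// builder.import_into_dataflow(&id, &mut dataflow, &features)?;
    /// ```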
    pub fn import_into_dataflow(
        &mut self,
        id: &GlobalId,
        dataflow: &mut DataflowDesc,
        features: &OptimizerFeatures,
    ) -> Result<(), OptimizerError> {
        maybe_grow(|| {
            // Avoid importing the item redundantly.
            if dataflow.is_imported(id) {
                return Ok(());
            }

            let monotonic = self.monotonic_object(*id, features);

            // A valid index is any index on `id` that is known to the index oracle.
            // Here, we import all indexes that belong to all imported collections. Later,
            // `prune_and_annotate_dataflow_index_imports` runs at the end of the MIR
            // pipeline and removes unneeded index imports based on the optimized plan.
            let mut valid_indexes = self.indexes_on(*id).peekable();
            if valid_indexes.peek().is_some() {
                for (index_id, idx) in valid_indexes {
                    let index_desc = IndexDesc {
                        on_id: *id,
                        key: idx.keys.to_vec(),
                    };
                    let entry = self.catalog.get_entry(id);
                    let desc = entry
                        .desc(
                            &self
                                .catalog
                                .resolve_full_name(entry.name(), entry.conn_id()),
                        )
                        .expect("indexes can only be built on items with descs");
                    dataflow.import_index(index_id, index_desc, desc.typ().clone(), monotonic);
                }
            } else {
                drop(valid_indexes);
                let entry = self.catalog.get_entry(id);
                match entry.item() {
                    CatalogItem::Table(table) => {
                        dataflow.import_source(*id, table.desc_for(id).typ().clone(), monotonic);
                    }
                    CatalogItem::Source(source) => {
                        dataflow.import_source(*id, source.desc.typ().clone(), monotonic);
                    }
                    CatalogItem::View(view) => {
                        let expr = view.optimized_expr.as_ref();
                        self.import_view_into_dataflow(id, expr, dataflow, features)?;
                    }
                    CatalogItem::MaterializedView(mview) => {
                        dataflow.import_source(*id, mview.desc.typ().clone(), monotonic);
                    }
                    CatalogItem::Log(log) => {
                        dataflow.import_source(*id, log.variant.desc().typ().clone(), monotonic);
                    }
                    CatalogItem::ContinualTask(ct) => {
                        dataflow.import_source(*id, ct.desc.typ().clone(), monotonic);
                    }
                    _ => unreachable!(),
                }
            }
            Ok(())
        })
    }

    /// Imports the view with the specified ID and expression into the provided
    /// dataflow description. [`OptimizerFeatures`] is used while running
    /// analyses ([`mz_transform::analysis::Analysis`]) on the expression.
    ///
    /// You should generally prefer calling
    /// [`DataflowBuilder::import_into_dataflow`], which can handle objects of
    /// any type as long as they exist in the catalog. This method exists for
    /// when the view does not exist in the catalog, e.g., because it is
    /// identified by a [`GlobalId::Transient`].
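    ///
    /// A sketch of the transient-ID case this method exists for (illustrative
    /// only; `builder`, `expr`, `dataflow`, and `features` are assumed to be
    /// in scope):
    ///
    /// ```ignore
    /// // The expression is not in the catalog, so it is passed in directly.
    /// let transient_id = GlobalId::Transient(42);
    /// builder.import_view_into_dataflow(&transient_id, &expr, &mut dataflow, &features)?;
    /// ```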
    pub fn import_view_into_dataflow(
        &mut self,
        view_id: &GlobalId,
        view: &OptimizedMirRelationExpr,
        dataflow: &mut DataflowDesc,
        features: &OptimizerFeatures,
    ) -> Result<(), OptimizerError> {
        for get_id in view.depends_on() {
            self.import_into_dataflow(&get_id, dataflow, features)?;
        }
        dataflow.insert_plan(*view_id, view.clone());
        Ok(())
    }

    /// Re-optimizes the imported view plans using the current optimizer
    /// configuration, if re-optimization is requested.
    pub fn maybe_reoptimize_imported_views(
        &self,
        df_desc: &mut DataflowDesc,
        config: &OptimizerConfig,
    ) -> Result<(), OptimizerError> {
        if !config.features.reoptimize_imported_views {
            return Ok(()); // Do nothing if not explicitly requested.
        }

        let mut view_optimizer = view::Optimizer::new(config.clone(), None);
        for desc in df_desc.objects_to_build.iter_mut().rev() {
            if matches!(desc.id, GlobalId::Explain | GlobalId::Transient(_)) {
                continue; // Skip descriptions that do not reference proper views.
            }
            if let CatalogItem::View(view) = &self.catalog.get_entry(&desc.id).item {
                let _span = tracing::span!(
                    target: "optimizer",
                    tracing::Level::DEBUG,
                    "view",
                    path.segment = desc.id.to_string()
                )
                .entered();

                // Re-optimize the view and update the resulting `desc.plan`.
                desc.plan = view_optimizer.optimize(view.raw_expr.as_ref().clone())?;

                // Report the optimized plan under this span.
                trace_plan(desc.plan.as_inner());
            }
        }

        Ok(())
    }

    /// Determines the given source's monotonicity.
    fn monotonic_source(&self, source: &Source) -> bool {
        match &source.data_source {
            DataSourceDesc::Ingestion { ingestion_desc, .. } => {
                // Check whether the primary export of this source is monotonic.
                ingestion_desc
                    .desc
                    .primary_export
                    .monotonic(&ingestion_desc.desc.connection)
            }
            DataSourceDesc::Webhook { .. } => true,
            DataSourceDesc::IngestionExport {
                ingestion_id,
                data_config,
                ..
            } => {
                let source_desc = self
                    .catalog
                    .get_entry_by_item_id(ingestion_id)
                    .source_desc()
                    .expect("ingestion export must reference a source")
                    .expect("ingestion export must reference a source");
                data_config.monotonic(&source_desc.connection)
            }
            DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => false,
        }
    }

    /// Determines the given object's monotonicity.
    ///
    /// This recursively traverses the expressions of all views depended on by the given object.
    /// If this becomes a performance problem, we could add the monotonicity information of views
    /// into the catalog instead.
    ///
    /// Note that materialized views are never monotonic, no matter their definition, because the
    /// self-correcting `persist_sink` may insert retractions to correct the contents of its output
    /// collection.
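    ///
    /// A minimal sketch (illustrative only; `builder`, `view_id`, and
    /// `features` are assumed to be in scope):
    ///
    /// ```ignore
    /// // Errors during inspection (e.g., hitting the recursion limit) are
    /// // logged and conservatively reported as non-monotonic.
    /// let monotonic = builder.monotonic_object(view_id, &features);
    /// ```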
    fn monotonic_object(&self, id: GlobalId, features: &OptimizerFeatures) -> bool {
        self.monotonic_object_inner(id, &mut BTreeMap::new(), features)
            .unwrap_or_else(|e| {
                warn!(%id, "error inspecting object for monotonicity: {e}");
                false
            })
    }

    fn monotonic_object_inner(
        &self,
        id: GlobalId,
        memo: &mut BTreeMap<GlobalId, bool>,
        features: &OptimizerFeatures,
    ) -> Result<bool, RecursionLimitError> {
        // An object might be reached multiple times. If we already computed the monotonicity of
        // the given ID, use that. If not, then compute it and remember the result.
        if let Some(monotonic) = memo.get(&id) {
            return Ok(*monotonic);
        }

        let monotonic = self.checked_recur(|_| {
            match self.catalog.get_entry(&id).item() {
                CatalogItem::Source(source) => Ok(self.monotonic_source(source)),
                CatalogItem::View(View { optimized_expr, .. }) => {
                    let view_expr = optimized_expr.as_ref().clone().into_inner();

                    // Inspect the global IDs that occur in the `Get`s in `view_expr`, and
                    // collect the IDs of monotonic dependees.
                    let mut monotonic_ids = BTreeSet::new();
                    let recursion_result: Result<(), RecursionLimitError> = view_expr
                        .try_visit_post(&mut |e| {
                            if let MirRelationExpr::Get {
                                id: Id::Global(got_id),
                                ..
                            } = e
                            {
                                if self.monotonic_object_inner(*got_id, memo, features)? {
                                    monotonic_ids.insert(*got_id);
                                }
                            }
                            Ok(())
                        });
                    if let Err(error) = recursion_result {
                        // We may still have collected some of the IDs, so just log and continue.
                        // The subsequent monotonicity analysis can then have false negatives.
                        warn!(%id, "error inspecting view for monotonicity: {error}");
                    }

                    let mut builder = DerivedBuilder::new(features);
                    builder.require(Monotonic::new(monotonic_ids.clone()));
                    let derived = builder.visit(&view_expr);

                    Ok(*derived
                        .as_view()
                        .value::<Monotonic>()
                        .expect("expected monotonic result from non-empty tree"))
                }
                CatalogItem::Index(Index { on, .. }) => {
                    self.monotonic_object_inner(*on, memo, features)
                }
                CatalogItem::Secret(_)
                | CatalogItem::Type(_)
                | CatalogItem::Connection(_)
                | CatalogItem::Table(_)
                | CatalogItem::Log(_)
                | CatalogItem::MaterializedView(_)
                | CatalogItem::Sink(_)
                | CatalogItem::Func(_)
                | CatalogItem::ContinualTask(_) => Ok(false),
            }
        })?;

        memo.insert(id, monotonic);

        Ok(monotonic)
    }
}

impl<'a> CheckedRecursion for DataflowBuilder<'a> {
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.recursion_guard
    }
}

/// Prepares a relation expression for dataflow execution by preparing all
/// contained scalar expressions (see `prep_scalar_expr`) in the specified
/// style.
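///
/// A sketch of the maintained case (illustrative only; `optimized_expr` is
/// assumed to be in scope):
///
/// ```ignore
/// // Rejects unmaterializable calls such as `current_user()`, but keeps
/// // temporal filters that use `mz_now()` intact.
/// prep_relation_expr(&mut optimized_expr, ExprPrepStyle::Maintained)?;
/// ```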
pub fn prep_relation_expr(
    expr: &mut OptimizedMirRelationExpr,
    style: ExprPrepStyle,
) -> Result<(), OptimizerError> {
    match style {
        ExprPrepStyle::Maintained => {
            expr.0.try_visit_mut_post(&mut |e| {
                // Carefully test filter expressions, which may represent temporal filters.
                if let MirRelationExpr::Filter { input, predicates } = &*e {
                    let mfp =
                        MapFilterProject::new(input.arity()).filter(predicates.iter().cloned());
                    match mfp.into_plan() {
                        Err(e) => Err(OptimizerError::UnsupportedTemporalExpression(e)),
                        Ok(mut mfp) => {
                            for s in mfp.iter_nontemporal_exprs() {
                                prep_scalar_expr(s, style)?;
                            }
                            Ok(())
                        }
                    }
                } else {
                    e.try_visit_scalars_mut1(&mut |s| prep_scalar_expr(s, style))
                }
            })
        }
        ExprPrepStyle::OneShot { .. }
        | ExprPrepStyle::AsOfUpTo
        | ExprPrepStyle::WebhookValidation { .. } => expr
            .0
            .try_visit_scalars_mut(&mut |s| prep_scalar_expr(s, style)),
    }
}

/// Prepares a scalar expression for execution by handling unmaterializable
/// functions.
///
/// How we prepare the scalar expression depends on which `style` is specified.
///
/// * `OneShot`: Calls to all unmaterializable functions are replaced.
/// * `Maintained`: An error is produced if a call to an unmaterializable function is encountered.
/// * `AsOfUpTo`: An error is produced if a call to an unmaterializable function is encountered.
/// * `WebhookValidation`: Only calls to `UnmaterializableFunc::CurrentTimestamp` are replaced;
///   others are left untouched.
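///
/// A sketch of the `OneShot` case (illustrative only; `expr`, `session`, and
/// `catalog_state` are assumed to be in scope):
///
/// ```ignore
/// // Replaces calls such as `mz_now()` with literals for the given time.
/// let style = ExprPrepStyle::OneShot {
///     logical_time: EvalTime::Time(1000u64.into()),
///     session: &session,
///     catalog_state: &catalog_state,
/// };
/// prep_scalar_expr(&mut expr, style)?;
/// ```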
pub fn prep_scalar_expr(
    expr: &mut MirScalarExpr,
    style: ExprPrepStyle,
) -> Result<(), OptimizerError> {
    match style {
        // Evaluate each unmaterializable function and replace the
        // invocation with the result.
        ExprPrepStyle::OneShot {
            logical_time,
            session,
            catalog_state,
        } => expr.try_visit_mut_post(&mut |e| {
            if let MirScalarExpr::CallUnmaterializable(f) = e {
                *e = eval_unmaterializable_func(catalog_state, f, logical_time, session)?;
            }
            Ok(())
        }),

        // Reject the query if it contains any unmaterializable function calls.
        ExprPrepStyle::Maintained | ExprPrepStyle::AsOfUpTo => {
            let mut last_observed_unmaterializable_func = None;
            expr.visit_mut_post(&mut |e| {
                if let MirScalarExpr::CallUnmaterializable(f) = e {
                    last_observed_unmaterializable_func = Some(f.clone());
                }
            })?;

            if let Some(f) = last_observed_unmaterializable_func {
                let err = match style {
                    ExprPrepStyle::Maintained => OptimizerError::UnmaterializableFunction(f),
                    ExprPrepStyle::AsOfUpTo => OptimizerError::UncallableFunction {
                        func: f,
                        context: "AS OF or UP TO",
                    },
                    _ => unreachable!(),
                };
                return Err(err);
            }
            Ok(())
        }

        ExprPrepStyle::WebhookValidation { now } => {
            expr.try_visit_mut_post(&mut |e| {
                if let MirScalarExpr::CallUnmaterializable(
                    f @ UnmaterializableFunc::CurrentTimestamp,
                ) = e
                {
                    let now: Datum = now.try_into()?;
                    let const_expr = MirScalarExpr::literal_ok(now, f.output_type().scalar_type);
                    *e = const_expr;
                }
                Ok::<_, anyhow::Error>(())
            })?;
            Ok(())
        }
    }
}

fn eval_unmaterializable_func(
    state: &CatalogState,
    f: &UnmaterializableFunc,
    logical_time: EvalTime,
    session: &dyn SessionMetadata,
) -> Result<MirScalarExpr, OptimizerError> {
    let pack_1d_array = |datums: Vec<Datum>| {
        let mut row = Row::default();
        row.packer()
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: datums.len(),
                }],
                datums,
            )
            .expect("known to be a valid array");
        Ok(MirScalarExpr::Literal(Ok(row), f.output_type()))
    };
    let pack_dict = |mut datums: Vec<(String, String)>| {
        datums.sort();
        let mut row = Row::default();
        row.packer().push_dict(
            datums
                .iter()
                .map(|(key, value)| (key.as_str(), Datum::from(value.as_str()))),
        );
        Ok(MirScalarExpr::Literal(Ok(row), f.output_type()))
    };
    let pack = |datum| {
        Ok(MirScalarExpr::literal_ok(
            datum,
            f.output_type().scalar_type,
        ))
    };

    match f {
        UnmaterializableFunc::CurrentDatabase => pack(Datum::from(session.database())),
        UnmaterializableFunc::CurrentSchema => {
            let search_path = state.resolve_search_path(session);
            let schema = search_path
                .first()
                .map(|(db, schema)| &*state.get_schema(db, schema, session.conn_id()).name.schema);
            pack(Datum::from(schema))
        }
        UnmaterializableFunc::CurrentSchemasWithSystem => {
            let search_path = state.resolve_search_path(session);
            let search_path = state.effective_search_path(&search_path, false);
            pack_1d_array(
                search_path
                    .into_iter()
                    .map(|(db, schema)| {
                        let schema = state.get_schema(&db, &schema, session.conn_id());
                        Datum::String(&schema.name.schema)
                    })
                    .collect(),
            )
        }
        UnmaterializableFunc::CurrentSchemasWithoutSystem => {
            let search_path = state.resolve_search_path(session);
            pack_1d_array(
                search_path
                    .into_iter()
                    .map(|(db, schema)| {
                        let schema = state.get_schema(&db, &schema, session.conn_id());
                        Datum::String(&schema.name.schema)
                    })
                    .collect(),
            )
        }
        UnmaterializableFunc::ViewableVariables => pack_dict(
            viewable_variables(state, session)
                .map(|var| (var.name().to_lowercase(), var.value()))
                .collect(),
        ),
        UnmaterializableFunc::CurrentTimestamp => {
            let t: Datum = session.pcx().wall_time.try_into()?;
            pack(t)
        }
        UnmaterializableFunc::CurrentUser => pack(Datum::from(
            state.get_role(session.current_role_id()).name(),
        )),
        UnmaterializableFunc::SessionUser => pack(Datum::from(
            state.get_role(session.session_role_id()).name(),
        )),
        UnmaterializableFunc::IsRbacEnabled => pack(Datum::from(
            rbac::is_rbac_enabled_for_session(state.system_config(), session),
        )),
        UnmaterializableFunc::MzEnvironmentId => {
            pack(Datum::from(&*state.config().environment_id.to_string()))
        }
        UnmaterializableFunc::MzIsSuperuser => pack(Datum::from(session.is_superuser())),
        UnmaterializableFunc::MzNow => match logical_time {
            EvalTime::Time(logical_time) => pack(Datum::MzTimestamp(logical_time)),
            EvalTime::Deferred => Ok(MirScalarExpr::CallUnmaterializable(f.clone())),
            EvalTime::NotAvailable => Err(OptimizerError::UncallableFunction {
                func: UnmaterializableFunc::MzNow,
                context: "this",
            }),
        },
        UnmaterializableFunc::MzRoleOidMemberships => {
            let role_memberships = role_oid_memberships(state);
            let mut role_memberships: Vec<(_, Vec<_>)> = role_memberships
                .into_iter()
                .map(|(role_id, role_membership)| {
                    (
                        role_id.to_string(),
                        role_membership
                            .into_iter()
                            .map(|role_id| role_id.to_string())
                            .collect(),
                    )
                })
                .collect();
            role_memberships.sort();
            let mut row = Row::default();
            row.packer().push_dict_with(|row| {
                for (role_id, role_membership) in &role_memberships {
                    row.push(Datum::from(role_id.as_str()));
                    row.try_push_array(
                        &[ArrayDimension {
                            lower_bound: 1,
                            length: role_membership.len(),
                        }],
                        role_membership.iter().map(|role_id| Datum::from(role_id.as_str())),
                    ).expect("role_membership is 1 dimensional, and its length is used for the array length");
                }
            });
            Ok(MirScalarExpr::Literal(Ok(row), f.output_type()))
        }
        UnmaterializableFunc::MzSessionId => pack(Datum::from(state.config().session_id)),
        UnmaterializableFunc::MzUptime => {
            let uptime = state.config().start_instant.elapsed();
            let uptime = chrono::Duration::from_std(uptime).map_or(Datum::Null, Datum::from);
            pack(uptime)
        }
        UnmaterializableFunc::MzVersion => pack(Datum::from(
            &*state
                .config()
                .build_info
                .human_version(state.config().helm_chart_version.clone()),
        )),
        UnmaterializableFunc::MzVersionNum => {
            pack(Datum::Int32(state.config().build_info.version_num()))
        }
        UnmaterializableFunc::PgBackendPid => pack(Datum::Int32(i32::reinterpret_cast(
            session.conn_id().unhandled(),
        ))),
        UnmaterializableFunc::PgPostmasterStartTime => {
            let t: Datum = state.config().start_time.try_into()?;
            pack(t)
        }
        UnmaterializableFunc::Version => {
            let build_info = state.config().build_info;
            let version = format!(
                "PostgreSQL {}.{} on {} (Materialize {})",
                SERVER_MAJOR_VERSION,
                SERVER_MINOR_VERSION,
                mz_build_info::TARGET_TRIPLE,
                build_info.version,
            );
            pack(Datum::from(&*version))
        }
    }
}

/// Computes, for each role, the set of role OIDs of which it is transitively a
/// member (including itself).
fn role_oid_memberships(catalog: &CatalogState) -> BTreeMap<u32, BTreeSet<u32>> {
    let mut role_memberships = BTreeMap::new();
    for role_id in catalog.get_roles() {
        let role = catalog.get_role(role_id);
        if !role_memberships.contains_key(&role.oid) {
            role_oid_memberships_inner(catalog, role_id, &mut role_memberships);
        }
    }
    role_memberships
}

/// Helper for [`role_oid_memberships`]: populates `role_memberships` for
/// `role_id` and, recursively, for all roles of which it is a member.
fn role_oid_memberships_inner(
    catalog: &CatalogState,
    role_id: &RoleId,
    role_memberships: &mut BTreeMap<u32, BTreeSet<u32>>,
) {
    let role = catalog.get_role(role_id);
    role_memberships.insert(role.oid, btreeset! {role.oid});
    for parent_role_id in role.membership.map.keys() {
        let parent_role = catalog.get_role(parent_role_id);
        if !role_memberships.contains_key(&parent_role.oid) {
            role_oid_memberships_inner(catalog, parent_role_id, role_memberships);
        }
        let parent_membership: BTreeSet<_> = role_memberships
            .get(&parent_role.oid)
            .expect("inserted in recursive call above")
            .iter()
            .cloned()
            .collect();
        role_memberships
            .get_mut(&role.oid)
            .expect("inserted above")
            .extend(parent_membership);
    }
}