mz_adapter/optimize/
dataflows.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Types and methods for building and shipping dataflow descriptions.
//!
//! Dataflows are buildable from the coordinator's `catalog` and `indexes`
//! members, which respectively describe the collection backing identifiers
//! and indicate which identifiers have arrangements available. This module
//! isolates that logic from the rest of the somewhat complicated coordinator.
use std::collections::{BTreeMap, BTreeSet};

use chrono::{DateTime, Utc};
use maplit::{btreemap, btreeset};
use tracing::warn;

use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Index, TableDataSource, View};
use mz_compute_client::controller::error::InstanceMissing;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::{DataflowDesc, DataflowDescription, IndexDesc};
use mz_controller::Controller;
use mz_expr::visit::Visit;
use mz_expr::{
    CollectionPlan, Id, MapFilterProject, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr,
    RECURSION_LIMIT, UnmaterializableFunc,
};
use mz_ore::cast::ReinterpretCast;
use mz_ore::stack::{CheckedRecursion, RecursionGuard, RecursionLimitError, maybe_grow};
use mz_repr::adt::array::ArrayDimension;
use mz_repr::explain::trace_plan;
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::role_id::RoleId;
use mz_repr::{Datum, GlobalId, Row};
use mz_sql::catalog::CatalogRole;
use mz_sql::rbac;
use mz_sql::session::metadata::SessionMetadata;
use mz_transform::analysis::DerivedBuilder;
use mz_transform::analysis::monotonic::Monotonic;

use crate::catalog::CatalogState;
use crate::coord::id_bundle::CollectionIdBundle;
use crate::optimize::{Optimize, OptimizerCatalog, OptimizerConfig, OptimizerError, view};
use crate::session::{SERVER_MAJOR_VERSION, SERVER_MINOR_VERSION};
use crate::util::viewable_variables;
/// A reference-less snapshot of a compute instance. There is no guarantee that
/// the instance identified by `instance_id` continues to exist after this
/// snapshot has been taken.
#[derive(Debug, Clone)]
pub struct ComputeInstanceSnapshot {
    instance_id: ComputeInstanceId,
    /// The collections that exist on this compute instance. If this is `None`,
    /// then any collection that a caller asks about is considered to exist.
    /// TODO(peek-seq): Remove this completely once all callers are able to handle suddenly missing
    /// collections, in which case we won't need a `ComputeInstanceSnapshot` at all.
    collections: Option<BTreeSet<GlobalId>>,
}

impl ComputeInstanceSnapshot {
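    /// Creates a snapshot of the identified compute instance, capturing the
    /// set of collections that currently exist on it. Returns an error if the
    /// instance is not known to the controller.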
    pub fn new(controller: &Controller, id: ComputeInstanceId) -> Result<Self, InstanceMissing> {
        controller
            .compute
            .collection_ids(id)
            .map(|collection_ids| Self {
                instance_id: id,
                collections: Some(collection_ids.collect()),
            })
    }

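    /// Creates a snapshot directly from an instance ID and a known set of
    /// collections, without consulting the controller.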
    pub fn new_from_parts(instance_id: ComputeInstanceId, collections: BTreeSet<GlobalId>) -> Self {
        Self {
            instance_id,
            collections: Some(collections),
        }
    }

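    /// Creates a snapshot that does not track collections. Such a snapshot
    /// considers every collection to exist (see `contains_collection`).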
    pub fn new_without_collections(instance_id: ComputeInstanceId) -> Self {
        Self {
            instance_id,
            collections: None,
        }
    }

    /// Return the ID of this compute instance.
    pub fn instance_id(&self) -> ComputeInstanceId {
        self.instance_id
    }

    /// Reports whether the instance contains the indicated collection. If the
    /// snapshot doesn't track collections, then it returns true.
    pub fn contains_collection(&self, id: &GlobalId) -> bool {
        self.collections
            .as_ref()
            .map_or(true, |collections| collections.contains(id))
    }

    /// Inserts the given collection into the snapshot.
    pub fn insert_collection(&mut self, id: GlobalId) {
        self.collections
            .as_mut()
            .expect("insert_collection called on snapshot with None collections")
            .insert(id);
    }
}

/// Borrows of catalog and indexes sufficient to build dataflow descriptions.
#[derive(Debug)]
pub struct DataflowBuilder<'a> {
    pub catalog: &'a dyn OptimizerCatalog,
    /// A snapshot of the targeted compute instance, which describes the
    /// collections (and thus indexes) available by identifier.
    pub compute: ComputeInstanceSnapshot,
    /// If set, indicates that the `DataflowBuilder` operates in "replan" mode
    /// and should consider only catalog items that are strictly less than the
    /// given [`GlobalId`].
    ///
    /// In particular, indexes with higher [`GlobalId`] that are present in the
    /// catalog will be ignored.
    ///
    /// Bound from [`OptimizerConfig::replan`].
    pub replan: Option<GlobalId>,
    /// A guard for recursive operations in this [`DataflowBuilder`] instance.
    recursion_guard: RecursionGuard,
}

/// The styles in which an expression can be prepared for use in a dataflow.
#[derive(Clone, Copy, Debug)]
pub enum ExprPrepStyle<'a> {
    /// The expression is being prepared for installation as a maintained dataflow, e.g.,
    /// index, materialized view, or subscribe.
    Maintained,
    /// The expression is being prepared to run once at the specified logical
    /// time in the specified session.
    OneShot {
        logical_time: EvalTime,
        session: &'a dyn SessionMetadata,
        catalog_state: &'a CatalogState,
    },
    /// The expression is being prepared for evaluation in a CHECK expression of a webhook source.
    WebhookValidation {
        /// Time at which this expression is being evaluated.
        now: DateTime<Utc>,
    },
}

#[derive(Clone, Copy, Debug)]
pub enum EvalTime {
    Time(mz_repr::Timestamp),
    /// Leaves `mz_now()` calls in place, to be evaluated later.
    Deferred,
    /// Errors on `mz_now()` calls.
    NotAvailable,
}

/// Returns an ID bundle with the given dataflow's imports.
pub fn dataflow_import_id_bundle<P>(
    dataflow: &DataflowDescription<P>,
    compute_instance: ComputeInstanceId,
) -> CollectionIdBundle {
    let storage_ids = dataflow.source_imports.keys().copied().collect();
    let compute_ids = dataflow.index_imports.keys().copied().collect();
    CollectionIdBundle {
        storage_ids,
        compute_ids: btreemap! {compute_instance => compute_ids},
    }
}

impl<'a> DataflowBuilder<'a> {
    pub fn new(catalog: &'a dyn OptimizerCatalog, compute: ComputeInstanceSnapshot) -> Self {
        Self {
            catalog,
            compute,
            replan: None,
            recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
        }
    }

    // TODO(aalexandrov): strictly speaking, it would be better to make
    // `config: &OptimizerConfig` a field in the enclosing builder. However,
    // before we can do that we should make sure that nobody outside of the
    // optimizer is using a DataflowBuilder instance.
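    /// Binds the `replan` field from the given [`OptimizerConfig`].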
    pub(super) fn with_config(mut self, config: &OptimizerConfig) -> Self {
        self.replan = config.replan;
        self
    }

    /// Imports the view, source, or table with `id` into the provided
    /// dataflow description. [`OptimizerFeatures`] is used while running
    /// the [`Monotonic`] analysis.
    pub fn import_into_dataflow(
        &mut self,
        id: &GlobalId,
        dataflow: &mut DataflowDesc,
        features: &OptimizerFeatures,
    ) -> Result<(), OptimizerError> {
        maybe_grow(|| {
            // Avoid importing the item redundantly.
            if dataflow.is_imported(id) {
                return Ok(());
            }

            let monotonic = self.monotonic_object(*id, features);

            // A valid index is any index on `id` that is known to the index
            // oracle. Here, we import all indexes that belong to all imported
            // collections. Later, `prune_and_annotate_dataflow_index_imports`
            // runs at the end of the MIR pipeline and removes unneeded index
            // imports based on the optimized plan.
            let mut valid_indexes = self.indexes_on(*id).peekable();
            if valid_indexes.peek().is_some() {
                for (index_id, idx) in valid_indexes {
                    let index_desc = IndexDesc {
                        on_id: *id,
                        key: idx.keys.to_vec(),
                    };
                    let entry = self.catalog.get_entry(id);
                    let desc = entry
                        .desc(
                            &self
                                .catalog
                                .resolve_full_name(entry.name(), entry.conn_id()),
                        )
                        .expect("indexes can only be built on items with descs");
                    dataflow.import_index(index_id, index_desc, desc.typ().clone(), monotonic);
                }
            } else {
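                // Drop the (empty) iterator so its borrow of `self` ends
                // before the uses of `self` below.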
                drop(valid_indexes);
                let entry = self.catalog.get_entry(id);
                match entry.item() {
                    CatalogItem::Table(table) => {
                        dataflow.import_source(*id, table.desc_for(id).into_typ(), monotonic);
                    }
                    CatalogItem::Source(source) => {
                        dataflow.import_source(*id, source.desc.typ().clone(), monotonic);
                    }
                    CatalogItem::View(view) => {
                        let expr = view.optimized_expr.as_ref();
                        self.import_view_into_dataflow(id, expr, dataflow, features)?;
                    }
                    CatalogItem::MaterializedView(mview) => {
                        dataflow.import_source(*id, mview.desc_for(id).into_typ(), monotonic);
                    }
                    CatalogItem::Log(log) => {
                        dataflow.import_source(*id, log.variant.desc().typ().clone(), monotonic);
                    }
                    CatalogItem::ContinualTask(ct) => {
                        dataflow.import_source(*id, ct.desc.typ().clone(), monotonic);
                    }
                    _ => unreachable!(),
                }
            }
            Ok(())
        })
    }

    /// Imports the view with the specified ID and expression into the provided
    /// dataflow description. [`OptimizerFeatures`] is used while running
    /// expression [`mz_transform::analysis::Analysis`].
    ///
    /// You should generally prefer calling
    /// [`DataflowBuilder::import_into_dataflow`], which can handle objects of
    /// any type as long as they exist in the catalog. This method exists for
    /// when the view does not exist in the catalog, e.g., because it is
    /// identified by a [`GlobalId::Transient`].
    pub fn import_view_into_dataflow(
        &mut self,
        view_id: &GlobalId,
        view: &OptimizedMirRelationExpr,
        dataflow: &mut DataflowDesc,
        features: &OptimizerFeatures,
    ) -> Result<(), OptimizerError> {
        for get_id in view.depends_on() {
            self.import_into_dataflow(&get_id, dataflow, features)?;
        }
        dataflow.insert_plan(*view_id, view.clone());
        Ok(())
    }

    /// Re-optimizes the imported view plans using the current optimizer
    /// configuration, if reoptimization is requested.
    pub fn maybe_reoptimize_imported_views(
        &self,
        df_desc: &mut DataflowDesc,
        config: &OptimizerConfig,
    ) -> Result<(), OptimizerError> {
        if !config.features.reoptimize_imported_views {
            return Ok(()); // Do nothing if not explicitly requested.
        }

        let mut view_optimizer = view::Optimizer::new(config.clone(), None);
        for desc in df_desc.objects_to_build.iter_mut().rev() {
            if matches!(desc.id, GlobalId::Explain | GlobalId::Transient(_)) {
                continue; // Skip descriptions that do not reference proper views.
            }
            if let CatalogItem::View(view) = &self.catalog.get_entry(&desc.id).item {
                let _span = tracing::span!(
                    target: "optimizer",
                    tracing::Level::DEBUG,
                    "view",
                    path.segment = desc.id.to_string()
                )
                .entered();

                // Reoptimize the view and update the resulting `desc.plan`.
                desc.plan = view_optimizer.optimize(view.raw_expr.as_ref().clone())?;

                // Report the optimized plan under this span.
                trace_plan(desc.plan.as_inner());
            }
        }

        Ok(())
    }

    /// Determines the given source's monotonicity.
    fn monotonic_source(&self, data_source: &DataSourceDesc) -> bool {
        match data_source {
            DataSourceDesc::Ingestion { .. } => false,
            DataSourceDesc::OldSyntaxIngestion {
                desc, data_config, ..
            } => data_config.monotonic(&desc.connection),
            DataSourceDesc::Webhook { .. } => true,
            DataSourceDesc::IngestionExport {
                ingestion_id,
                data_config,
                ..
            } => {
                let source_desc = self
                    .catalog
                    .get_entry_by_item_id(ingestion_id)
                    .source_desc()
                    .expect("ingestion export must reference a source")
                    .expect("ingestion export must reference a source");
                data_config.monotonic(&source_desc.connection)
            }
            DataSourceDesc::Introspection(_) | DataSourceDesc::Progress => false,
        }
    }

    /// Determines the given object's monotonicity.
    ///
    /// This recursively traverses the expressions of all views depended on by the given object.
    /// If this becomes a performance problem, we could add the monotonicity information of views
    /// into the catalog instead.
    ///
    /// Note that materialized views are never monotonic, no matter their definition, because the
    /// self-correcting persist_sink may insert retractions to correct the contents of its output
    /// collection.
    fn monotonic_object(&self, id: GlobalId, features: &OptimizerFeatures) -> bool {
        self.monotonic_object_inner(id, &mut BTreeMap::new(), features)
            .unwrap_or_else(|e| {
                warn!(%id, "error inspecting object for monotonicity: {e}");
                false
            })
    }

    fn monotonic_object_inner(
        &self,
        id: GlobalId,
        memo: &mut BTreeMap<GlobalId, bool>,
        features: &OptimizerFeatures,
    ) -> Result<bool, RecursionLimitError> {
        // An object might be reached multiple times. If we already computed the monotonicity of
        // the given ID, use that. If not, then compute it and remember the result.
        if let Some(monotonic) = memo.get(&id) {
            return Ok(*monotonic);
        }

        let monotonic = self.checked_recur(|_| {
            match self.catalog.get_entry(&id).item() {
                CatalogItem::Source(source) => Ok(self.monotonic_source(&source.data_source)),
                CatalogItem::Table(table) => match &table.data_source {
                    TableDataSource::TableWrites { .. } => Ok(false),
                    TableDataSource::DataSource { desc, timeline: _ } => {
                        Ok(self.monotonic_source(desc))
                    }
                },
                CatalogItem::View(View { optimized_expr, .. }) => {
                    let view_expr = optimized_expr.as_ref().clone().into_inner();

                    // Inspect global IDs that occur in the Gets in `view_expr`, and collect
                    // the IDs of monotonic dependees.
                    let mut monotonic_ids = BTreeSet::new();
                    let recursion_result: Result<(), RecursionLimitError> = view_expr
                        .try_visit_post(&mut |e| {
                            if let MirRelationExpr::Get {
                                id: Id::Global(got_id),
                                ..
                            } = e
                            {
                                if self.monotonic_object_inner(*got_id, memo, features)? {
                                    monotonic_ids.insert(*got_id);
                                }
                            }
                            Ok(())
                        });
                    if let Err(error) = recursion_result {
                        // We might still have collected some of the IDs, so just log and
                        // continue. The subsequent monotonicity analysis can then have false
                        // negatives.
                        warn!(%id, "error inspecting view for monotonicity: {error}");
                    }

                    let mut builder = DerivedBuilder::new(features);
                    builder.require(Monotonic::new(monotonic_ids.clone()));
                    let derived = builder.visit(&view_expr);

                    Ok(*derived
                        .as_view()
                        .value::<Monotonic>()
                        .expect("expected Monotonic result from non-empty tree"))
                }
                CatalogItem::Index(Index { on, .. }) => {
                    self.monotonic_object_inner(*on, memo, features)
                }
                CatalogItem::Secret(_)
                | CatalogItem::Type(_)
                | CatalogItem::Connection(_)
                | CatalogItem::Log(_)
                | CatalogItem::MaterializedView(_)
                | CatalogItem::Sink(_)
                | CatalogItem::Func(_)
                | CatalogItem::ContinualTask(_) => Ok(false),
            }
        })?;

        memo.insert(id, monotonic);

        Ok(monotonic)
    }
}

impl<'a> CheckedRecursion for DataflowBuilder<'a> {
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.recursion_guard
    }
}

/// Prepares a relation expression for dataflow execution by preparing all
/// contained scalar expressions (see `prep_scalar_expr`) in the specified
/// style.
pub fn prep_relation_expr(
    expr: &mut OptimizedMirRelationExpr,
    style: ExprPrepStyle,
) -> Result<(), OptimizerError> {
    match style {
        ExprPrepStyle::Maintained => {
            expr.0.try_visit_mut_post(&mut |e| {
                // Carefully test filter expressions, which may represent temporal filters.
                if let MirRelationExpr::Filter { input, predicates } = &*e {
                    let mfp =
                        MapFilterProject::new(input.arity()).filter(predicates.iter().cloned());
                    match mfp.into_plan() {
                        Err(e) => Err(OptimizerError::UnsupportedTemporalExpression(e)),
                        Ok(mut mfp) => {
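                            // Prepare only the non-temporal expressions: in
                            // maintained dataflows, temporal predicates are
                            // allowed to call `mz_now()`, so they are left
                            // untouched here.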
                            for s in mfp.iter_nontemporal_exprs() {
                                prep_scalar_expr(s, style)?;
                            }
                            Ok(())
                        }
                    }
                } else {
                    e.try_visit_scalars_mut1(&mut |s| prep_scalar_expr(s, style))
                }
            })
        }
        ExprPrepStyle::OneShot { .. } | ExprPrepStyle::WebhookValidation { .. } => expr
            .0
            .try_visit_scalars_mut(&mut |s| prep_scalar_expr(s, style)),
    }
}

/// Prepares a scalar expression for execution by handling unmaterializable
/// functions.
///
/// How we prepare the scalar expression depends on which `style` is specified.
///
/// * `OneShot`: Calls to all unmaterializable functions are replaced.
/// * `Maintained`: An error is produced if a call to an unmaterializable
///   function is encountered.
/// * `WebhookValidation`: Only calls to `UnmaterializableFunc::CurrentTimestamp` are replaced;
///   others are left untouched.
pub fn prep_scalar_expr(
    expr: &mut MirScalarExpr,
    style: ExprPrepStyle,
) -> Result<(), OptimizerError> {
    match style {
        // Evaluate each unmaterializable function and replace the
        // invocation with the result.
        ExprPrepStyle::OneShot {
            logical_time,
            session,
            catalog_state,
        } => expr.try_visit_mut_post(&mut |e| {
            if let MirScalarExpr::CallUnmaterializable(f) = e {
                *e = eval_unmaterializable_func(catalog_state, f, logical_time, session)?;
            }
            Ok(())
        }),

        // Reject the query if it contains any unmaterializable function calls.
        ExprPrepStyle::Maintained => {
            let mut last_observed_unmaterializable_func = None;
            expr.visit_mut_post(&mut |e| {
                if let MirScalarExpr::CallUnmaterializable(f) = e {
                    last_observed_unmaterializable_func = Some(f.clone());
                }
            })?;

            if let Some(f) = last_observed_unmaterializable_func {
                return Err(OptimizerError::UnmaterializableFunction(f));
            }
            Ok(())
        }

        ExprPrepStyle::WebhookValidation { now } => {
            expr.try_visit_mut_post(&mut |e| {
                if let MirScalarExpr::CallUnmaterializable(
                    f @ UnmaterializableFunc::CurrentTimestamp,
                ) = e
                {
                    let now: Datum = now.try_into()?;
                    let const_expr = MirScalarExpr::literal_ok(now, f.output_type().scalar_type);
                    *e = const_expr;
                }
                Ok::<_, anyhow::Error>(())
            })?;
            Ok(())
        }
    }
}

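/// Evaluates the given unmaterializable function against the catalog state and
/// session, returning a literal expression carrying its result. `mz_now()` is
/// special: depending on `logical_time`, the call is replaced with a timestamp,
/// left in place, or rejected with an error.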
fn eval_unmaterializable_func(
    state: &CatalogState,
    f: &UnmaterializableFunc,
    logical_time: EvalTime,
    session: &dyn SessionMetadata,
) -> Result<MirScalarExpr, OptimizerError> {
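    // Packs a vector of datums into a one-dimensional array literal.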
    let pack_1d_array = |datums: Vec<Datum>| {
        let mut row = Row::default();
        row.packer()
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: datums.len(),
                }],
                datums,
            )
            .expect("known to be a valid array");
        Ok(MirScalarExpr::Literal(Ok(row), f.output_type()))
    };
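    // Packs string key/value pairs into a map literal, sorted by key.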
    let pack_dict = |mut datums: Vec<(String, String)>| {
        datums.sort();
        let mut row = Row::default();
        row.packer().push_dict(
            datums
                .iter()
                .map(|(key, value)| (key.as_str(), Datum::from(value.as_str()))),
        );
        Ok(MirScalarExpr::Literal(Ok(row), f.output_type()))
    };
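    // Packs a single datum into a literal of the function's output type.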
    let pack = |datum| {
        Ok(MirScalarExpr::literal_ok(
            datum,
            f.output_type().scalar_type,
        ))
    };

    match f {
        UnmaterializableFunc::CurrentDatabase => pack(Datum::from(session.database())),
        UnmaterializableFunc::CurrentSchema => {
            let search_path = state.resolve_search_path(session);
            let schema = search_path
                .first()
                .map(|(db, schema)| &*state.get_schema(db, schema, session.conn_id()).name.schema);
            pack(Datum::from(schema))
        }
        UnmaterializableFunc::CurrentSchemasWithSystem => {
            let search_path = state.resolve_search_path(session);
            let search_path = state.effective_search_path(&search_path, false);
            pack_1d_array(
                search_path
                    .into_iter()
                    .map(|(db, schema)| {
                        let schema = state.get_schema(&db, &schema, session.conn_id());
                        Datum::String(&schema.name.schema)
                    })
                    .collect(),
            )
        }
        UnmaterializableFunc::CurrentSchemasWithoutSystem => {
            let search_path = state.resolve_search_path(session);
            pack_1d_array(
                search_path
                    .into_iter()
                    .map(|(db, schema)| {
                        let schema = state.get_schema(&db, &schema, session.conn_id());
                        Datum::String(&schema.name.schema)
                    })
                    .collect(),
            )
        }
        UnmaterializableFunc::ViewableVariables => pack_dict(
            viewable_variables(state, session)
                .map(|var| (var.name().to_lowercase(), var.value()))
                .collect(),
        ),
        UnmaterializableFunc::CurrentTimestamp => {
            let t: Datum = session.pcx().wall_time.try_into()?;
            pack(t)
        }
        UnmaterializableFunc::CurrentUser => pack(Datum::from(
            state.get_role(session.current_role_id()).name(),
        )),
        UnmaterializableFunc::SessionUser => pack(Datum::from(
            state.get_role(session.session_role_id()).name(),
        )),
        UnmaterializableFunc::IsRbacEnabled => pack(Datum::from(
            rbac::is_rbac_enabled_for_session(state.system_config(), session),
        )),
        UnmaterializableFunc::MzEnvironmentId => {
            pack(Datum::from(&*state.config().environment_id.to_string()))
        }
        UnmaterializableFunc::MzIsSuperuser => pack(Datum::from(session.is_superuser())),
        UnmaterializableFunc::MzNow => match logical_time {
            EvalTime::Time(logical_time) => pack(Datum::MzTimestamp(logical_time)),
            EvalTime::Deferred => Ok(MirScalarExpr::CallUnmaterializable(f.clone())),
            EvalTime::NotAvailable => Err(OptimizerError::UncallableFunction {
                func: UnmaterializableFunc::MzNow,
                context: "this",
            }),
        },
        UnmaterializableFunc::MzRoleOidMemberships => {
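            // Render the transitive role memberships as a map from each role's
            // OID to an array of the OIDs of the roles it belongs to.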
            let role_memberships = role_oid_memberships(state);
            let mut role_memberships: Vec<(_, Vec<_>)> = role_memberships
                .into_iter()
                .map(|(role_id, role_membership)| {
                    (
                        role_id.to_string(),
                        role_membership
                            .into_iter()
                            .map(|role_id| role_id.to_string())
                            .collect(),
                    )
                })
                .collect();
            role_memberships.sort();
            let mut row = Row::default();
            row.packer().push_dict_with(|row| {
                for (role_id, role_membership) in &role_memberships {
                    row.push(Datum::from(role_id.as_str()));
                    row.try_push_array(
                        &[ArrayDimension {
                            lower_bound: 1,
                            length: role_membership.len(),
                        }],
                        role_membership
                            .iter()
                            .map(|role_id| Datum::from(role_id.as_str())),
                    )
                    .expect("role_membership is 1-dimensional, and its length is used for the array length");
                }
            });
            Ok(MirScalarExpr::Literal(Ok(row), f.output_type()))
        }
        UnmaterializableFunc::MzSessionId => pack(Datum::from(state.config().session_id)),
        UnmaterializableFunc::MzUptime => {
            let uptime = state.config().start_instant.elapsed();
            let uptime = chrono::Duration::from_std(uptime).map_or(Datum::Null, Datum::from);
            pack(uptime)
        }
        UnmaterializableFunc::MzVersion => pack(Datum::from(
            &*state
                .config()
                .build_info
                .human_version(state.config().helm_chart_version.clone()),
        )),
        UnmaterializableFunc::MzVersionNum => {
            pack(Datum::Int32(state.config().build_info.version_num()))
        }
        UnmaterializableFunc::PgBackendPid => pack(Datum::Int32(i32::reinterpret_cast(
            session.conn_id().unhandled(),
        ))),
        UnmaterializableFunc::PgPostmasterStartTime => {
            let t: Datum = state.config().start_time.try_into()?;
            pack(t)
        }
        UnmaterializableFunc::Version => {
            let build_info = state.config().build_info;
            let version = format!(
                "PostgreSQL {}.{} on {} (Materialize {})",
                SERVER_MAJOR_VERSION,
                SERVER_MINOR_VERSION,
                mz_build_info::TARGET_TRIPLE,
                build_info.version,
            );
            pack(Datum::from(&*version))
        }
    }
}

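/// Computes, for each role, the set of role OIDs of which it is transitively a
/// member (always including its own OID).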
fn role_oid_memberships<'a>(catalog: &'a CatalogState) -> BTreeMap<u32, BTreeSet<u32>> {
    let mut role_memberships = BTreeMap::new();
    for role_id in catalog.get_roles() {
        let role = catalog.get_role(role_id);
        if !role_memberships.contains_key(&role.oid) {
            role_oid_memberships_inner(catalog, role_id, &mut role_memberships);
        }
    }
    role_memberships
}

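/// Recursively populates `role_memberships` for `role_id` and for every role it
/// is a member of, skipping roles whose memberships were already computed.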
fn role_oid_memberships_inner<'a>(
    catalog: &'a CatalogState,
    role_id: &RoleId,
    role_memberships: &mut BTreeMap<u32, BTreeSet<u32>>,
) {
    let role = catalog.get_role(role_id);
    role_memberships.insert(role.oid, btreeset! {role.oid});
    for parent_role_id in role.membership.map.keys() {
        let parent_role = catalog.get_role(parent_role_id);
        if !role_memberships.contains_key(&parent_role.oid) {
            role_oid_memberships_inner(catalog, parent_role_id, role_memberships);
        }
        let parent_membership: BTreeSet<_> = role_memberships
            .get(&parent_role.oid)
            .expect("inserted in recursive call above")
            .into_iter()
            .cloned()
            .collect();
        role_memberships
            .get_mut(&role.oid)
            .expect("inserted above")
            .extend(parent_membership);
    }
}