Skip to main content

mz_adapter/catalog/
builtin_table_updates.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10mod notice;
11
12use bytesize::ByteSize;
13use ipnet::IpNet;
14use mz_adapter_types::compaction::CompactionWindow;
15use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
16use mz_catalog::SYSTEM_CONN_ID;
17use mz_catalog::builtin::{
18    BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
19    MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICAS,
20    MZ_CLUSTER_SCHEDULES, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS, MZ_CONTINUAL_TASKS,
21    MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS, MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES,
22    MZ_ICEBERG_SINKS, MZ_INDEX_COLUMNS, MZ_INDEXES, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS,
23    MZ_KAFKA_SOURCE_TABLES, MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES,
24    MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MYSQL_SOURCE_TABLES, MZ_OBJECT_DEPENDENCIES,
25    MZ_OBJECT_GLOBAL_IDS, MZ_OPERATORS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES,
26    MZ_PSEUDO_TYPES, MZ_REPLACEMENTS, MZ_ROLE_AUTH, MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SESSIONS,
27    MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SOURCES, MZ_SQL_SERVER_SOURCE_TABLES,
28    MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES,
29    MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
30};
31use mz_catalog::config::AwsPrincipalContext;
32use mz_catalog::durable::SourceReferences;
33use mz_catalog::memory::error::{Error, ErrorKind};
34use mz_catalog::memory::objects::{
35    CatalogEntry, CatalogItem, ClusterVariant, Connection, ContinualTask, DataSourceDesc, Func,
36    Index, MaterializedView, Sink, Table, TableDataSource, Type, View,
37};
38use mz_controller::clusters::{
39    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
40};
41use mz_controller_types::ClusterId;
42use mz_expr::MirScalarExpr;
43use mz_license_keys::ValidatedLicenseKey;
44use mz_orchestrator::{CpuLimit, DiskLimit, MemoryLimit};
45use mz_ore::cast::CastFrom;
46use mz_ore::collections::CollectionExt;
47use mz_persist_client::batch::ProtoBatch;
48use mz_repr::adt::array::ArrayDimension;
49use mz_repr::adt::interval::Interval;
50use mz_repr::adt::jsonb::Jsonb;
51use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
52use mz_repr::refresh_schedule::RefreshEvery;
53use mz_repr::role_id::RoleId;
54use mz_repr::{
55    CatalogItemId, Datum, Diff, GlobalId, ReprColumnType, Row, RowPacker, SqlScalarType, Timestamp,
56};
57use mz_sql::ast::{ContinualTaskStmt, CreateIndexStatement, Statement, UnresolvedItemName};
58use mz_sql::catalog::{CatalogCluster, CatalogType, DefaultPrivilegeObject, TypeCategory};
59use mz_sql::func::FuncImplCatalogDetails;
60use mz_sql::names::{CommentObjectId, SchemaSpecifier};
61use mz_sql::plan::{ClusterSchedule, ConnectionDetails, SshKey};
62use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
63use mz_sql::session::vars::SessionVars;
64use mz_sql_parser::ast::display::AstDisplay;
65use mz_storage_client::client::TableData;
66use mz_storage_types::connections::KafkaConnection;
67use mz_storage_types::connections::aws::{AwsAuth, AwsConnection};
68use mz_storage_types::connections::inline::ReferencedConnection;
69use mz_storage_types::connections::string_or_secret::StringOrSecret;
70use mz_storage_types::sinks::{IcebergSinkConnection, KafkaSinkConnection, StorageSinkConnection};
71use mz_storage_types::sources::{
72    GenericSourceConnection, KafkaSourceConnection, PostgresSourceConnection, SourceConnection,
73};
74use smallvec::smallvec;
75
76// DO NOT add any more imports from `crate` outside of `crate::catalog`.
77use crate::active_compute_sink::ActiveSubscribe;
78use crate::catalog::CatalogState;
79use crate::coord::ConnMeta;
80
81/// An update to a built-in table.
82#[derive(Debug, Clone)]
83pub struct BuiltinTableUpdate<T = CatalogItemId> {
84    /// The reference of the table to update.
85    pub id: T,
86    /// The data to put into the table.
87    pub data: TableData,
88}
89
90impl<T> BuiltinTableUpdate<T> {
91    /// Create a [`BuiltinTableUpdate`] from a [`Row`].
92    pub fn row(id: T, row: Row, diff: Diff) -> BuiltinTableUpdate<T> {
93        BuiltinTableUpdate {
94            id,
95            data: TableData::Rows(vec![(row, diff)]),
96        }
97    }
98
99    pub fn batch(id: T, batch: ProtoBatch) -> BuiltinTableUpdate<T> {
100        BuiltinTableUpdate {
101            id,
102            data: TableData::Batches(smallvec![batch]),
103        }
104    }
105}
106
107impl CatalogState {
108    pub fn resolve_builtin_table_updates(
109        &self,
110        builtin_table_update: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
111    ) -> Vec<BuiltinTableUpdate<CatalogItemId>> {
112        builtin_table_update
113            .into_iter()
114            .map(|builtin_table_update| self.resolve_builtin_table_update(builtin_table_update))
115            .collect()
116    }
117
118    pub fn resolve_builtin_table_update(
119        &self,
120        BuiltinTableUpdate { id, data }: BuiltinTableUpdate<&'static BuiltinTable>,
121    ) -> BuiltinTableUpdate<CatalogItemId> {
122        let id = self.resolve_builtin_table(id);
123        BuiltinTableUpdate { id, data }
124    }
125
126    pub fn pack_depends_update(
127        &self,
128        depender: CatalogItemId,
129        dependee: CatalogItemId,
130        diff: Diff,
131    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
132        let row = Row::pack_slice(&[
133            Datum::String(&depender.to_string()),
134            Datum::String(&dependee.to_string()),
135        ]);
136        BuiltinTableUpdate::row(&*MZ_OBJECT_DEPENDENCIES, row, diff)
137    }
138
139    pub(super) fn pack_role_auth_update(
140        &self,
141        id: RoleId,
142        diff: Diff,
143    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
144        let role_auth = self.get_role_auth(&id);
145        let role = self.get_role(&id);
146        BuiltinTableUpdate::row(
147            &*MZ_ROLE_AUTH,
148            Row::pack_slice(&[
149                Datum::String(&role_auth.role_id.to_string()),
150                Datum::UInt32(role.oid),
151                match &role_auth.password_hash {
152                    Some(hash) => Datum::String(hash),
153                    None => Datum::Null,
154                },
155                Datum::TimestampTz(
156                    mz_ore::now::to_datetime(role_auth.updated_at)
157                        .try_into()
158                        .expect("must fit"),
159                ),
160            ]),
161            diff,
162        )
163    }
164
165    pub(super) fn pack_role_update(
166        &self,
167        id: RoleId,
168        diff: Diff,
169    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
170        match id {
171            // PUBLIC role should not show up in mz_roles.
172            RoleId::Public => vec![],
173            id => {
174                let role = self.get_role(&id);
175                let builtin_supers = [MZ_SYSTEM_ROLE_ID, MZ_SUPPORT_ROLE_ID];
176
177                let rolcanlogin = if let Some(login) = role.attributes.login {
178                    login
179                } else {
180                    builtin_supers.contains(&role.id)
181                };
182
183                let rolsuper = if let Some(superuser) = role.attributes.superuser {
184                    Datum::from(superuser)
185                } else if builtin_supers.contains(&role.id) {
186                    // System roles (mz_system, mz_support) are superusers
187                    Datum::from(true)
188                } else {
189                    // In cloud environments, superuser status is determined
190                    // at login, so we set `rolsuper` to `null` here because
191                    // it cannot be known beforehand. For self-managed
192                    // auth, roles created without an explicit SUPERUSER
193                    // attribute would typically have `rolsuper` set to false.
194                    // However, since we cannot reliably distinguish between
195                    // cloud and self-managed here, we conservatively use NULL
196                    // for indeterminate cases.
197                    Datum::Null
198                };
199
200                let role_update = BuiltinTableUpdate::row(
201                    &*MZ_ROLES,
202                    Row::pack_slice(&[
203                        Datum::String(&role.id.to_string()),
204                        Datum::UInt32(role.oid),
205                        Datum::String(&role.name),
206                        Datum::from(role.attributes.inherit),
207                        Datum::from(rolcanlogin),
208                        rolsuper,
209                    ]),
210                    diff,
211                );
212                let mut updates = vec![role_update];
213
214                // HACK/TODO(parkmycar): Creating an empty SessionVars like this is pretty hacky,
215                // we should instead have a static list of all session vars.
216                let session_vars_reference = SessionVars::new_unchecked(
217                    &mz_build_info::DUMMY_BUILD_INFO,
218                    SYSTEM_USER.clone(),
219                    None,
220                );
221
222                for (name, val) in role.vars() {
223                    let result = session_vars_reference
224                        .inspect(name)
225                        .and_then(|var| var.check(val.borrow()));
226                    let Ok(formatted_val) = result else {
227                        // Note: all variables should have been validated by this point, so we
228                        // shouldn't ever hit this.
229                        tracing::error!(?name, ?val, ?result, "found invalid role default var");
230                        continue;
231                    };
232
233                    let role_var_update = BuiltinTableUpdate::row(
234                        &*MZ_ROLE_PARAMETERS,
235                        Row::pack_slice(&[
236                            Datum::String(&role.id.to_string()),
237                            Datum::String(name),
238                            Datum::String(&formatted_val),
239                        ]),
240                        diff,
241                    );
242                    updates.push(role_var_update);
243                }
244
245                updates
246            }
247        }
248    }
249
250    pub(super) fn pack_cluster_update(
251        &self,
252        name: &str,
253        diff: Diff,
254    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
255        let id = self.clusters_by_name[name];
256        let cluster = &self.clusters_by_id[&id];
257        let row = self.pack_privilege_array_row(cluster.privileges());
258        let privileges = row.unpack_first();
259        let (size, disk, replication_factor, azs, introspection_debugging, introspection_interval) =
260            match &cluster.config.variant {
261                ClusterVariant::Managed(config) => (
262                    Some(config.size.as_str()),
263                    Some(self.cluster_replica_size_has_disk(&config.size)),
264                    Some(config.replication_factor),
265                    if config.availability_zones.is_empty() {
266                        None
267                    } else {
268                        Some(config.availability_zones.clone())
269                    },
270                    Some(config.logging.log_logging),
271                    config.logging.interval.map(|d| {
272                        Interval::from_duration(&d)
273                            .expect("planning ensured this convertible back to interval")
274                    }),
275                ),
276                ClusterVariant::Unmanaged => (None, None, None, None, None, None),
277            };
278
279        let mut row = Row::default();
280        let mut packer = row.packer();
281        packer.extend([
282            Datum::String(&id.to_string()),
283            Datum::String(name),
284            Datum::String(&cluster.owner_id.to_string()),
285            privileges,
286            cluster.is_managed().into(),
287            size.into(),
288            replication_factor.into(),
289            disk.into(),
290        ]);
291        if let Some(azs) = azs {
292            packer.push_list(azs.iter().map(|az| Datum::String(az)));
293        } else {
294            packer.push(Datum::Null);
295        }
296        packer.push(Datum::from(introspection_debugging));
297        packer.push(Datum::from(introspection_interval));
298
299        let mut updates = Vec::new();
300
301        updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTERS, row, diff));
302
303        if let ClusterVariant::Managed(managed_config) = &cluster.config.variant {
304            let row = match managed_config.schedule {
305                ClusterSchedule::Manual => Row::pack_slice(&[
306                    Datum::String(&id.to_string()),
307                    Datum::String("manual"),
308                    Datum::Null,
309                ]),
310                ClusterSchedule::Refresh {
311                    hydration_time_estimate,
312                } => Row::pack_slice(&[
313                    Datum::String(&id.to_string()),
314                    Datum::String("on-refresh"),
315                    Datum::Interval(
316                        Interval::from_duration(&hydration_time_estimate)
317                            .expect("planning ensured that this is convertible back to Interval"),
318                    ),
319                ]),
320            };
321            updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTER_SCHEDULES, row, diff));
322        }
323
324        updates
325    }
326
327    pub(super) fn pack_cluster_replica_update(
328        &self,
329        cluster_id: ClusterId,
330        name: &str,
331        diff: Diff,
332    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
333        let cluster = &self.clusters_by_id[&cluster_id];
334        let id = cluster.replica_id(name).expect("Must exist");
335        let replica = cluster.replica(id).expect("Must exist");
336
337        let (size, disk, az) = match &replica.config.location {
338            // TODO(guswynn): The column should be `availability_zones`, not
339            // `availability_zone`.
340            ReplicaLocation::Managed(ManagedReplicaLocation {
341                size,
342                availability_zones: ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
343                allocation: _,
344                billed_as: _,
345                internal: _,
346                pending: _,
347            }) => (
348                Some(&**size),
349                Some(self.cluster_replica_size_has_disk(size)),
350                Some(az.as_str()),
351            ),
352            ReplicaLocation::Managed(ManagedReplicaLocation {
353                size,
354                availability_zones: _,
355                allocation: _,
356                billed_as: _,
357                internal: _,
358                pending: _,
359            }) => (
360                Some(&**size),
361                Some(self.cluster_replica_size_has_disk(size)),
362                None,
363            ),
364            ReplicaLocation::Unmanaged(_) => (None, None, None),
365        };
366
367        let cluster_replica_update = BuiltinTableUpdate::row(
368            &*MZ_CLUSTER_REPLICAS,
369            Row::pack_slice(&[
370                Datum::String(&id.to_string()),
371                Datum::String(name),
372                Datum::String(&cluster_id.to_string()),
373                Datum::from(size),
374                Datum::from(az),
375                Datum::String(&replica.owner_id.to_string()),
376                Datum::from(disk),
377            ]),
378            diff,
379        );
380
381        vec![cluster_replica_update]
382    }
383
384    pub(super) fn pack_item_update(
385        &self,
386        id: CatalogItemId,
387        diff: Diff,
388    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
389        let entry = self.get_entry(&id);
390        let oid = entry.oid();
391        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
392        let schema_id = &self
393            .get_schema(
394                &entry.name().qualifiers.database_spec,
395                &entry.name().qualifiers.schema_spec,
396                conn_id,
397            )
398            .id;
399        let name = &entry.name().item;
400        let owner_id = entry.owner_id();
401        let privileges_row = self.pack_privilege_array_row(entry.privileges());
402        let privileges = privileges_row.unpack_first();
403        let mut updates = match entry.item() {
404            CatalogItem::Log(_) => self.pack_source_update(
405                id, oid, schema_id, name, "log", None, None, None, None, None, owner_id,
406                privileges, diff, None,
407            ),
408            CatalogItem::Index(index) => {
409                self.pack_index_update(id, oid, name, owner_id, index, diff)
410            }
411            CatalogItem::Table(table) => {
412                let mut updates = self
413                    .pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);
414
415                if let TableDataSource::DataSource {
416                    desc: data_source,
417                    timeline: _,
418                } = &table.data_source
419                {
420                    updates.extend(match data_source {
421                        DataSourceDesc::IngestionExport {
422                            ingestion_id,
423                            external_reference: UnresolvedItemName(external_reference),
424                            details: _,
425                            data_config: _,
426                        } => {
427                            let ingestion_entry = self
428                                .get_entry(ingestion_id)
429                                .source_desc()
430                                .expect("primary source exists")
431                                .expect("primary source is a source");
432
433                            match ingestion_entry.connection.name() {
434                                "postgres" => {
435                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
436                                    // The left-most qualification of Postgres
437                                    // tables is the database, but this
438                                    // information is redundant because each
439                                    // Postgres connection connects to only one
440                                    // database.
441                                    let schema_name = external_reference[1].as_str();
442                                    let table_name = external_reference[2].as_str();
443
444                                    self.pack_postgres_source_tables_update(
445                                        id,
446                                        schema_name,
447                                        table_name,
448                                        diff,
449                                    )
450                                }
451                                "mysql" => {
452                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
453                                    let schema_name = external_reference[0].as_str();
454                                    let table_name = external_reference[1].as_str();
455
456                                    self.pack_mysql_source_tables_update(
457                                        id,
458                                        schema_name,
459                                        table_name,
460                                        diff,
461                                    )
462                                }
463                                "sql-server" => {
464                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
465                                    // The left-most qualification of SQL Server tables is
466                                    // the database, but this information is redundant
467                                    // because each SQL Server connection connects to
468                                    // only one database.
469                                    let schema_name = external_reference[1].as_str();
470                                    let table_name = external_reference[2].as_str();
471
472                                    self.pack_sql_server_source_table_update(
473                                        id,
474                                        schema_name,
475                                        table_name,
476                                        diff,
477                                    )
478                                }
479                                // Load generator sources don't have any special
480                                // updates.
481                                "load-generator" => vec![],
482                                "kafka" => {
483                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 1);
484                                    let topic = external_reference[0].as_str();
485                                    let envelope = data_source.envelope();
486                                    let (key_format, value_format) = data_source.formats();
487
488                                    self.pack_kafka_source_tables_update(
489                                        id,
490                                        topic,
491                                        envelope,
492                                        key_format,
493                                        value_format,
494                                        diff,
495                                    )
496                                }
497                                s => unreachable!("{s} sources do not have tables"),
498                            }
499                        }
500                        DataSourceDesc::Ingestion { .. }
501                        | DataSourceDesc::OldSyntaxIngestion { .. }
502                        | DataSourceDesc::Introspection(_)
503                        | DataSourceDesc::Progress
504                        | DataSourceDesc::Webhook { .. }
505                        | DataSourceDesc::Catalog => vec![],
506                    });
507                }
508
509                updates
510            }
511            CatalogItem::Source(source) => {
512                let source_type = source.source_type();
513                let connection_id = source.connection_id();
514                let envelope = source.data_source.envelope();
515                let cluster_entry = match source.data_source {
516                    // Ingestion exports don't have their own cluster, but
517                    // run on their ingestion's cluster.
518                    DataSourceDesc::IngestionExport { ingestion_id, .. } => {
519                        self.get_entry(&ingestion_id)
520                    }
521                    DataSourceDesc::Ingestion { .. }
522                    | DataSourceDesc::OldSyntaxIngestion { .. }
523                    | DataSourceDesc::Introspection(_)
524                    | DataSourceDesc::Progress
525                    | DataSourceDesc::Webhook { .. }
526                    | DataSourceDesc::Catalog => entry,
527                };
528
529                let cluster_id = cluster_entry.item().cluster_id().map(|id| id.to_string());
530
531                let (key_format, value_format) = source.data_source.formats();
532
533                let mut updates = self.pack_source_update(
534                    id,
535                    oid,
536                    schema_id,
537                    name,
538                    source_type,
539                    connection_id,
540                    envelope,
541                    key_format,
542                    value_format,
543                    cluster_id.as_deref(),
544                    owner_id,
545                    privileges,
546                    diff,
547                    source.create_sql.as_ref(),
548                );
549
550                updates.extend(match &source.data_source {
551                    DataSourceDesc::Ingestion { desc, .. }
552                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => match &desc.connection {
553                        GenericSourceConnection::Postgres(postgres) => {
554                            self.pack_postgres_source_update(id, postgres, diff)
555                        }
556                        GenericSourceConnection::Kafka(kafka) => {
557                            self.pack_kafka_source_update(id, source.global_id(), kafka, diff)
558                        }
559                        _ => vec![],
560                    },
561                    DataSourceDesc::IngestionExport {
562                        ingestion_id,
563                        external_reference: UnresolvedItemName(external_reference),
564                        details: _,
565                        data_config: _,
566                    } => {
567                        let ingestion_entry = self
568                            .get_entry(ingestion_id)
569                            .source_desc()
570                            .expect("primary source exists")
571                            .expect("primary source is a source");
572
573                        match ingestion_entry.connection.name() {
574                            "postgres" => {
575                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
576                                // The left-most qualification of Postgres
577                                // tables is the database, but this
578                                // information is redundant because each
579                                // Postgres connection connects to only one
580                                // database.
581                                let schema_name = external_reference[1].as_str();
582                                let table_name = external_reference[2].as_str();
583
584                                self.pack_postgres_source_tables_update(
585                                    id,
586                                    schema_name,
587                                    table_name,
588                                    diff,
589                                )
590                            }
591                            "mysql" => {
592                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
593                                let schema_name = external_reference[0].as_str();
594                                let table_name = external_reference[1].as_str();
595
596                                self.pack_mysql_source_tables_update(
597                                    id,
598                                    schema_name,
599                                    table_name,
600                                    diff,
601                                )
602                            }
603                            "sql-server" => {
604                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
605                                // The left-most qualification of SQL Server tables is
606                                // the database, but this information is redundant
607                                // because each SQL Server connection connects to
608                                // only one database.
609                                let schema_name = external_reference[1].as_str();
610                                let table_name = external_reference[2].as_str();
611
612                                self.pack_sql_server_source_table_update(
613                                    id,
614                                    schema_name,
615                                    table_name,
616                                    diff,
617                                )
618                            }
619                            // Load generator sources don't have any special
620                            // updates.
621                            "load-generator" => vec![],
622                            s => unreachable!("{s} sources do not have subsources"),
623                        }
624                    }
625                    DataSourceDesc::Webhook { .. } => {
626                        vec![self.pack_webhook_source_update(id, diff)]
627                    }
628                    DataSourceDesc::Introspection(_)
629                    | DataSourceDesc::Progress
630                    | DataSourceDesc::Catalog => vec![],
631                });
632
633                updates
634            }
635            CatalogItem::View(view) => {
636                self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
637            }
638            CatalogItem::MaterializedView(mview) => {
639                self.pack_materialized_view_update(id, mview, diff)
640            }
641            CatalogItem::Sink(sink) => {
642                self.pack_sink_update(id, oid, schema_id, name, owner_id, sink, diff)
643            }
644            CatalogItem::Type(ty) => {
645                self.pack_type_update(id, oid, schema_id, name, owner_id, privileges, ty, diff)
646            }
647            CatalogItem::Func(func) => {
648                self.pack_func_update(id, schema_id, name, owner_id, func, diff)
649            }
650            CatalogItem::Secret(_) => vec![],
651            CatalogItem::Connection(connection) => {
652                self.pack_connection_update(id, connection, diff)
653            }
654            CatalogItem::ContinualTask(ct) => self.pack_continual_task_update(
655                id, oid, schema_id, name, owner_id, privileges, ct, diff,
656            ),
657        };
658
659        if !entry.item().is_temporary() {
660            // Populate or clean up the `mz_object_dependencies` table.
661            // TODO(jkosh44) Unclear if this table wants to include all uses or only references.
662            for dependee in entry.item().references().items() {
663                updates.push(self.pack_depends_update(id, *dependee, diff))
664            }
665        }
666
667        // Always report the latest for an objects columns.
668        if let Some(desc) = entry.relation_desc_latest() {
669            let defaults = match entry.item() {
670                CatalogItem::Table(Table {
671                    data_source: TableDataSource::TableWrites { defaults },
672                    ..
673                }) => Some(defaults),
674                _ => None,
675            };
676            for (i, (column_name, column_type)) in desc.iter().enumerate() {
677                let default: Option<String> = defaults.map(|d| d[i].to_ast_string_stable());
678                let default: Datum = default
679                    .as_ref()
680                    .map(|d| Datum::String(d))
681                    .unwrap_or(Datum::Null);
682                let pgtype = mz_pgrepr::Type::from(&column_type.scalar_type);
683                let (type_name, type_oid) = match &column_type.scalar_type {
684                    SqlScalarType::List {
685                        custom_id: Some(custom_id),
686                        ..
687                    }
688                    | SqlScalarType::Map {
689                        custom_id: Some(custom_id),
690                        ..
691                    }
692                    | SqlScalarType::Record {
693                        custom_id: Some(custom_id),
694                        ..
695                    } => {
696                        let entry = self.get_entry(custom_id);
697                        // NOTE(benesch): the `mz_columns.type text` field is
698                        // wrong. Types do not have a name that can be
699                        // represented as a single textual field. There can be
700                        // multiple types with the same name in different
701                        // schemas and databases. We should eventually deprecate
702                        // the `type` field in favor of a new `type_id` field
703                        // that can be joined against `mz_types`.
704                        //
705                        // For now, in the interest of pragmatism, we just use
706                        // the type's item name, and accept that there may be
707                        // ambiguity if the same type name is used in multiple
708                        // schemas. The ambiguity is mitigated by the OID, which
709                        // can be joined against `mz_types.oid` to resolve the
710                        // ambiguity.
711                        let name = &*entry.name().item;
712                        let oid = entry.oid();
713                        (name, oid)
714                    }
715                    _ => (pgtype.name(), pgtype.oid()),
716                };
717                updates.push(BuiltinTableUpdate::row(
718                    &*MZ_COLUMNS,
719                    Row::pack_slice(&[
720                        Datum::String(&id.to_string()),
721                        Datum::String(column_name),
722                        Datum::UInt64(u64::cast_from(i + 1)),
723                        Datum::from(column_type.nullable),
724                        Datum::String(type_name),
725                        default,
726                        Datum::UInt32(type_oid),
727                        Datum::Int32(pgtype.typmod()),
728                    ]),
729                    diff,
730                ));
731            }
732        }
733
734        // Use initial lcw so that we can tell apart default from non-existent windows.
735        if let Some(cw) = entry.item().initial_logical_compaction_window() {
736            updates.push(self.pack_history_retention_strategy_update(id, cw, diff));
737        }
738
739        updates.extend(Self::pack_item_global_id_update(entry, diff));
740
741        updates
742    }
743
744    fn pack_item_global_id_update(
745        entry: &CatalogEntry,
746        diff: Diff,
747    ) -> impl Iterator<Item = BuiltinTableUpdate<&'static BuiltinTable>> + use<'_> {
748        let id = entry.id().to_string();
749        let global_ids = entry.global_ids();
750        global_ids.map(move |global_id| {
751            BuiltinTableUpdate::row(
752                &*MZ_OBJECT_GLOBAL_IDS,
753                Row::pack_slice(&[Datum::String(&id), Datum::String(&global_id.to_string())]),
754                diff,
755            )
756        })
757    }
758
759    fn pack_history_retention_strategy_update(
760        &self,
761        id: CatalogItemId,
762        cw: CompactionWindow,
763        diff: Diff,
764    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
765        let cw: u64 = cw.comparable_timestamp().into();
766        let cw = Jsonb::from_serde_json(serde_json::Value::Number(serde_json::Number::from(cw)))
767            .expect("must serialize");
768        BuiltinTableUpdate::row(
769            &*MZ_HISTORY_RETENTION_STRATEGIES,
770            Row::pack_slice(&[
771                Datum::String(&id.to_string()),
772                // FOR is the only strategy at the moment. We may introduce FROM or others later.
773                Datum::String("FOR"),
774                cw.into_row().into_element(),
775            ]),
776            diff,
777        )
778    }
779
780    fn pack_table_update(
781        &self,
782        id: CatalogItemId,
783        oid: u32,
784        schema_id: &SchemaSpecifier,
785        name: &str,
786        owner_id: &RoleId,
787        privileges: Datum,
788        diff: Diff,
789        table: &Table,
790    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
791        let redacted = table.create_sql.as_ref().map(|create_sql| {
792            mz_sql::parse::parse(create_sql)
793                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
794                .into_element()
795                .ast
796                .to_ast_string_redacted()
797        });
798        let source_id = if let TableDataSource::DataSource {
799            desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
800            ..
801        } = &table.data_source
802        {
803            Some(ingestion_id.to_string())
804        } else {
805            None
806        };
807
808        vec![BuiltinTableUpdate::row(
809            &*MZ_TABLES,
810            Row::pack_slice(&[
811                Datum::String(&id.to_string()),
812                Datum::UInt32(oid),
813                Datum::String(&schema_id.to_string()),
814                Datum::String(name),
815                Datum::String(&owner_id.to_string()),
816                privileges,
817                if let Some(create_sql) = &table.create_sql {
818                    Datum::String(create_sql)
819                } else {
820                    Datum::Null
821                },
822                if let Some(redacted) = &redacted {
823                    Datum::String(redacted)
824                } else {
825                    Datum::Null
826                },
827                if let Some(source_id) = source_id.as_ref() {
828                    Datum::String(source_id)
829                } else {
830                    Datum::Null
831                },
832            ]),
833            diff,
834        )]
835    }
836
837    fn pack_source_update(
838        &self,
839        id: CatalogItemId,
840        oid: u32,
841        schema_id: &SchemaSpecifier,
842        name: &str,
843        source_desc_name: &str,
844        connection_id: Option<CatalogItemId>,
845        envelope: Option<&str>,
846        key_format: Option<&str>,
847        value_format: Option<&str>,
848        cluster_id: Option<&str>,
849        owner_id: &RoleId,
850        privileges: Datum,
851        diff: Diff,
852        create_sql: Option<&String>,
853    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
854        let redacted = create_sql.map(|create_sql| {
855            let create_stmt = mz_sql::parse::parse(create_sql)
856                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
857                .into_element()
858                .ast;
859            create_stmt.to_ast_string_redacted()
860        });
861        vec![BuiltinTableUpdate::row(
862            &*MZ_SOURCES,
863            Row::pack_slice(&[
864                Datum::String(&id.to_string()),
865                Datum::UInt32(oid),
866                Datum::String(&schema_id.to_string()),
867                Datum::String(name),
868                Datum::String(source_desc_name),
869                Datum::from(connection_id.map(|id| id.to_string()).as_deref()),
870                // This is the "source size", which is a remnant from linked
871                // clusters.
872                Datum::Null,
873                Datum::from(envelope),
874                Datum::from(key_format),
875                Datum::from(value_format),
876                Datum::from(cluster_id),
877                Datum::String(&owner_id.to_string()),
878                privileges,
879                if let Some(create_sql) = create_sql {
880                    Datum::String(create_sql)
881                } else {
882                    Datum::Null
883                },
884                if let Some(redacted) = &redacted {
885                    Datum::String(redacted)
886                } else {
887                    Datum::Null
888                },
889            ]),
890            diff,
891        )]
892    }
893
894    fn pack_postgres_source_update(
895        &self,
896        id: CatalogItemId,
897        postgres: &PostgresSourceConnection<ReferencedConnection>,
898        diff: Diff,
899    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
900        vec![BuiltinTableUpdate::row(
901            &*MZ_POSTGRES_SOURCES,
902            Row::pack_slice(&[
903                Datum::String(&id.to_string()),
904                Datum::String(&postgres.publication_details.slot),
905                Datum::from(postgres.publication_details.timeline_id),
906            ]),
907            diff,
908        )]
909    }
910
911    fn pack_kafka_source_update(
912        &self,
913        item_id: CatalogItemId,
914        collection_id: GlobalId,
915        kafka: &KafkaSourceConnection<ReferencedConnection>,
916        diff: Diff,
917    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
918        vec![BuiltinTableUpdate::row(
919            &*MZ_KAFKA_SOURCES,
920            Row::pack_slice(&[
921                Datum::String(&item_id.to_string()),
922                Datum::String(&kafka.group_id(&self.config.connection_context, collection_id)),
923                Datum::String(&kafka.topic),
924            ]),
925            diff,
926        )]
927    }
928
929    fn pack_postgres_source_tables_update(
930        &self,
931        id: CatalogItemId,
932        schema_name: &str,
933        table_name: &str,
934        diff: Diff,
935    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
936        vec![BuiltinTableUpdate::row(
937            &*MZ_POSTGRES_SOURCE_TABLES,
938            Row::pack_slice(&[
939                Datum::String(&id.to_string()),
940                Datum::String(schema_name),
941                Datum::String(table_name),
942            ]),
943            diff,
944        )]
945    }
946
947    fn pack_mysql_source_tables_update(
948        &self,
949        id: CatalogItemId,
950        schema_name: &str,
951        table_name: &str,
952        diff: Diff,
953    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
954        vec![BuiltinTableUpdate::row(
955            &*MZ_MYSQL_SOURCE_TABLES,
956            Row::pack_slice(&[
957                Datum::String(&id.to_string()),
958                Datum::String(schema_name),
959                Datum::String(table_name),
960            ]),
961            diff,
962        )]
963    }
964
965    fn pack_sql_server_source_table_update(
966        &self,
967        id: CatalogItemId,
968        schema_name: &str,
969        table_name: &str,
970        diff: Diff,
971    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
972        vec![BuiltinTableUpdate::row(
973            &*MZ_SQL_SERVER_SOURCE_TABLES,
974            Row::pack_slice(&[
975                Datum::String(&id.to_string()),
976                Datum::String(schema_name),
977                Datum::String(table_name),
978            ]),
979            diff,
980        )]
981    }
982
983    fn pack_kafka_source_tables_update(
984        &self,
985        id: CatalogItemId,
986        topic: &str,
987        envelope: Option<&str>,
988        key_format: Option<&str>,
989        value_format: Option<&str>,
990        diff: Diff,
991    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
992        vec![BuiltinTableUpdate::row(
993            &*MZ_KAFKA_SOURCE_TABLES,
994            Row::pack_slice(&[
995                Datum::String(&id.to_string()),
996                Datum::String(topic),
997                Datum::from(envelope),
998                Datum::from(key_format),
999                Datum::from(value_format),
1000            ]),
1001            diff,
1002        )]
1003    }
1004
1005    fn pack_connection_update(
1006        &self,
1007        id: CatalogItemId,
1008        connection: &Connection,
1009        diff: Diff,
1010    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1011        let mut updates = vec![];
1012        match connection.details {
1013            ConnectionDetails::Kafka(ref kafka) => {
1014                updates.extend(self.pack_kafka_connection_update(id, kafka, diff));
1015            }
1016            ConnectionDetails::Aws(ref aws_config) => {
1017                match self.pack_aws_connection_update(id, aws_config, diff) {
1018                    Ok(update) => {
1019                        updates.push(update);
1020                    }
1021                    Err(e) => {
1022                        tracing::error!(%id, %e, "failed writing row to mz_aws_connections table");
1023                    }
1024                }
1025            }
1026            ConnectionDetails::AwsPrivatelink(_) => {
1027                if let Some(aws_principal_context) = self.aws_principal_context.as_ref() {
1028                    updates.push(self.pack_aws_privatelink_connection_update(
1029                        id,
1030                        aws_principal_context,
1031                        diff,
1032                    ));
1033                } else {
1034                    tracing::error!(%id, "missing AWS principal context; cannot write row to mz_aws_privatelink_connections table");
1035                }
1036            }
1037            ConnectionDetails::Ssh {
1038                ref key_1,
1039                ref key_2,
1040                ..
1041            } => {
1042                updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff));
1043            }
1044            ConnectionDetails::Csr(_)
1045            | ConnectionDetails::Postgres(_)
1046            | ConnectionDetails::MySql(_)
1047            | ConnectionDetails::SqlServer(_)
1048            | ConnectionDetails::IcebergCatalog(_) => (),
1049        };
1050        updates
1051    }
1052
1053    pub(crate) fn pack_ssh_tunnel_connection_update(
1054        &self,
1055        id: CatalogItemId,
1056        key_1: &SshKey,
1057        key_2: &SshKey,
1058        diff: Diff,
1059    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1060        BuiltinTableUpdate::row(
1061            &*MZ_SSH_TUNNEL_CONNECTIONS,
1062            Row::pack_slice(&[
1063                Datum::String(&id.to_string()),
1064                Datum::String(key_1.public_key().as_str()),
1065                Datum::String(key_2.public_key().as_str()),
1066            ]),
1067            diff,
1068        )
1069    }
1070
1071    fn pack_kafka_connection_update(
1072        &self,
1073        id: CatalogItemId,
1074        kafka: &KafkaConnection<ReferencedConnection>,
1075        diff: Diff,
1076    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1077        let progress_topic = kafka.progress_topic(&self.config.connection_context, id);
1078        let mut row = Row::default();
1079        row.packer()
1080            .try_push_array(
1081                &[ArrayDimension {
1082                    lower_bound: 1,
1083                    length: kafka.brokers.len(),
1084                }],
1085                kafka
1086                    .brokers
1087                    .iter()
1088                    .map(|broker| Datum::String(&broker.address)),
1089            )
1090            .expect("kafka.brokers is 1 dimensional, and its length is used for the array length");
1091        let brokers = row.unpack_first();
1092        vec![BuiltinTableUpdate::row(
1093            &*MZ_KAFKA_CONNECTIONS,
1094            Row::pack_slice(&[
1095                Datum::String(&id.to_string()),
1096                brokers,
1097                Datum::String(&progress_topic),
1098            ]),
1099            diff,
1100        )]
1101    }
1102
1103    pub fn pack_aws_privatelink_connection_update(
1104        &self,
1105        connection_id: CatalogItemId,
1106        aws_principal_context: &AwsPrincipalContext,
1107        diff: Diff,
1108    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1109        let id = &MZ_AWS_PRIVATELINK_CONNECTIONS;
1110        let row = Row::pack_slice(&[
1111            Datum::String(&connection_id.to_string()),
1112            Datum::String(&aws_principal_context.to_principal_string(connection_id)),
1113        ]);
1114        BuiltinTableUpdate::row(id, row, diff)
1115    }
1116
1117    pub fn pack_aws_connection_update(
1118        &self,
1119        connection_id: CatalogItemId,
1120        aws_config: &AwsConnection,
1121        diff: Diff,
1122    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, anyhow::Error> {
1123        let id = &MZ_AWS_CONNECTIONS;
1124
1125        let mut access_key_id = None;
1126        let mut access_key_id_secret_id = None;
1127        let mut secret_access_key_secret_id = None;
1128        let mut session_token = None;
1129        let mut session_token_secret_id = None;
1130        let mut assume_role_arn = None;
1131        let mut assume_role_session_name = None;
1132        let mut principal = None;
1133        let mut external_id = None;
1134        let mut example_trust_policy = None;
1135        match &aws_config.auth {
1136            AwsAuth::Credentials(credentials) => {
1137                match &credentials.access_key_id {
1138                    StringOrSecret::String(s) => access_key_id = Some(s.as_str()),
1139                    StringOrSecret::Secret(s) => access_key_id_secret_id = Some(s.to_string()),
1140                }
1141                secret_access_key_secret_id = Some(credentials.secret_access_key.to_string());
1142                match credentials.session_token.as_ref() {
1143                    None => (),
1144                    Some(StringOrSecret::String(s)) => session_token = Some(s.as_str()),
1145                    Some(StringOrSecret::Secret(s)) => {
1146                        session_token_secret_id = Some(s.to_string())
1147                    }
1148                }
1149            }
1150            AwsAuth::AssumeRole(assume_role) => {
1151                assume_role_arn = Some(assume_role.arn.as_str());
1152                assume_role_session_name = assume_role.session_name.as_deref();
1153                principal = self
1154                    .config
1155                    .connection_context
1156                    .aws_connection_role_arn
1157                    .as_deref();
1158                external_id =
1159                    Some(assume_role.external_id(&self.config.connection_context, connection_id)?);
1160                example_trust_policy = {
1161                    let policy = assume_role
1162                        .example_trust_policy(&self.config.connection_context, connection_id)?;
1163                    let policy = Jsonb::from_serde_json(policy).expect("valid json");
1164                    Some(policy.into_row())
1165                };
1166            }
1167        }
1168
1169        let row = Row::pack_slice(&[
1170            Datum::String(&connection_id.to_string()),
1171            Datum::from(aws_config.endpoint.as_deref()),
1172            Datum::from(aws_config.region.as_deref()),
1173            Datum::from(access_key_id),
1174            Datum::from(access_key_id_secret_id.as_deref()),
1175            Datum::from(secret_access_key_secret_id.as_deref()),
1176            Datum::from(session_token),
1177            Datum::from(session_token_secret_id.as_deref()),
1178            Datum::from(assume_role_arn),
1179            Datum::from(assume_role_session_name),
1180            Datum::from(principal),
1181            Datum::from(external_id.as_deref()),
1182            Datum::from(example_trust_policy.as_ref().map(|p| p.into_element())),
1183        ]);
1184
1185        Ok(BuiltinTableUpdate::row(id, row, diff))
1186    }
1187
1188    fn pack_view_update(
1189        &self,
1190        id: CatalogItemId,
1191        oid: u32,
1192        schema_id: &SchemaSpecifier,
1193        name: &str,
1194        owner_id: &RoleId,
1195        privileges: Datum,
1196        view: &View,
1197        diff: Diff,
1198    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1199        let create_stmt = mz_sql::parse::parse(&view.create_sql)
1200            .unwrap_or_else(|e| {
1201                panic!(
1202                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1203                    view.create_sql, e
1204                )
1205            })
1206            .into_element()
1207            .ast;
1208        let query = match &create_stmt {
1209            Statement::CreateView(stmt) => &stmt.definition.query,
1210            _ => unreachable!(),
1211        };
1212
1213        let mut query_string = query.to_ast_string_stable();
1214        // PostgreSQL appends a semicolon in `pg_views.definition`, we
1215        // do the same for compatibility's sake.
1216        query_string.push(';');
1217
1218        vec![BuiltinTableUpdate::row(
1219            &*MZ_VIEWS,
1220            Row::pack_slice(&[
1221                Datum::String(&id.to_string()),
1222                Datum::UInt32(oid),
1223                Datum::String(&schema_id.to_string()),
1224                Datum::String(name),
1225                Datum::String(&query_string),
1226                Datum::String(&owner_id.to_string()),
1227                privileges,
1228                Datum::String(&view.create_sql),
1229                Datum::String(&create_stmt.to_ast_string_redacted()),
1230            ]),
1231            diff,
1232        )]
1233    }
1234
1235    fn pack_materialized_view_update(
1236        &self,
1237        id: CatalogItemId,
1238        mview: &MaterializedView,
1239        diff: Diff,
1240    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1241        let mut updates = Vec::new();
1242
1243        if let Some(refresh_schedule) = &mview.refresh_schedule {
1244            // This can't be `ON COMMIT`, because that is represented by a `None` instead of an
1245            // empty `RefreshSchedule`.
1246            assert!(!refresh_schedule.is_empty());
1247            for RefreshEvery {
1248                interval,
1249                aligned_to,
1250            } in refresh_schedule.everies.iter()
1251            {
1252                let aligned_to_dt = mz_ore::now::to_datetime(
1253                    <&Timestamp as TryInto<u64>>::try_into(aligned_to).expect("undoes planning"),
1254                );
1255                updates.push(BuiltinTableUpdate::row(
1256                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1257                    Row::pack_slice(&[
1258                        Datum::String(&id.to_string()),
1259                        Datum::String("every"),
1260                        Datum::Interval(
1261                            Interval::from_duration(interval).expect(
1262                                "planning ensured that this is convertible back to Interval",
1263                            ),
1264                        ),
1265                        Datum::TimestampTz(aligned_to_dt.try_into().expect("undoes planning")),
1266                        Datum::Null,
1267                    ]),
1268                    diff,
1269                ));
1270            }
1271            for at in refresh_schedule.ats.iter() {
1272                let at_dt = mz_ore::now::to_datetime(
1273                    <&Timestamp as TryInto<u64>>::try_into(at).expect("undoes planning"),
1274                );
1275                updates.push(BuiltinTableUpdate::row(
1276                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1277                    Row::pack_slice(&[
1278                        Datum::String(&id.to_string()),
1279                        Datum::String("at"),
1280                        Datum::Null,
1281                        Datum::Null,
1282                        Datum::TimestampTz(at_dt.try_into().expect("undoes planning")),
1283                    ]),
1284                    diff,
1285                ));
1286            }
1287        } else {
1288            updates.push(BuiltinTableUpdate::row(
1289                &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1290                Row::pack_slice(&[
1291                    Datum::String(&id.to_string()),
1292                    Datum::String("on-commit"),
1293                    Datum::Null,
1294                    Datum::Null,
1295                    Datum::Null,
1296                ]),
1297                diff,
1298            ));
1299        }
1300
1301        if let Some(target_id) = mview.replacement_target {
1302            updates.push(BuiltinTableUpdate::row(
1303                &*MZ_REPLACEMENTS,
1304                Row::pack_slice(&[
1305                    Datum::String(&id.to_string()),
1306                    Datum::String(&target_id.to_string()),
1307                ]),
1308                diff,
1309            ));
1310        }
1311
1312        updates
1313    }
1314
1315    fn pack_continual_task_update(
1316        &self,
1317        id: CatalogItemId,
1318        oid: u32,
1319        schema_id: &SchemaSpecifier,
1320        name: &str,
1321        owner_id: &RoleId,
1322        privileges: Datum,
1323        ct: &ContinualTask,
1324        diff: Diff,
1325    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1326        let create_stmt = mz_sql::parse::parse(&ct.create_sql)
1327            .unwrap_or_else(|e| {
1328                panic!(
1329                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1330                    ct.create_sql, e
1331                )
1332            })
1333            .into_element()
1334            .ast;
1335        let query_string = match &create_stmt {
1336            Statement::CreateContinualTask(stmt) => {
1337                let mut query_string = String::new();
1338                for stmt in &stmt.stmts {
1339                    let s = match stmt {
1340                        ContinualTaskStmt::Insert(stmt) => stmt.to_ast_string_stable(),
1341                        ContinualTaskStmt::Delete(stmt) => stmt.to_ast_string_stable(),
1342                    };
1343                    if query_string.is_empty() {
1344                        query_string = s;
1345                    } else {
1346                        query_string.push_str("; ");
1347                        query_string.push_str(&s);
1348                    }
1349                }
1350                query_string
1351            }
1352            _ => unreachable!(),
1353        };
1354
1355        vec![BuiltinTableUpdate::row(
1356            &*MZ_CONTINUAL_TASKS,
1357            Row::pack_slice(&[
1358                Datum::String(&id.to_string()),
1359                Datum::UInt32(oid),
1360                Datum::String(&schema_id.to_string()),
1361                Datum::String(name),
1362                Datum::String(&ct.cluster_id.to_string()),
1363                Datum::String(&query_string),
1364                Datum::String(&owner_id.to_string()),
1365                privileges,
1366                Datum::String(&ct.create_sql),
1367                Datum::String(&create_stmt.to_ast_string_redacted()),
1368            ]),
1369            diff,
1370        )]
1371    }
1372
1373    fn pack_sink_update(
1374        &self,
1375        id: CatalogItemId,
1376        oid: u32,
1377        schema_id: &SchemaSpecifier,
1378        name: &str,
1379        owner_id: &RoleId,
1380        sink: &Sink,
1381        diff: Diff,
1382    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1383        let mut updates = vec![];
1384        match &sink.connection {
1385            StorageSinkConnection::Kafka(KafkaSinkConnection {
1386                topic: topic_name, ..
1387            }) => {
1388                updates.push(BuiltinTableUpdate::row(
1389                    &*MZ_KAFKA_SINKS,
1390                    Row::pack_slice(&[
1391                        Datum::String(&id.to_string()),
1392                        Datum::String(topic_name.as_str()),
1393                    ]),
1394                    diff,
1395                ));
1396            }
1397            StorageSinkConnection::Iceberg(IcebergSinkConnection {
1398                namespace, table, ..
1399            }) => {
1400                updates.push(BuiltinTableUpdate::row(
1401                    &*MZ_ICEBERG_SINKS,
1402                    Row::pack_slice(&[
1403                        Datum::String(&id.to_string()),
1404                        Datum::String(namespace.as_str()),
1405                        Datum::String(table.as_str()),
1406                    ]),
1407                    diff,
1408                ));
1409            }
1410        };
1411
1412        let create_stmt = mz_sql::parse::parse(&sink.create_sql)
1413            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", sink.create_sql))
1414            .into_element()
1415            .ast;
1416
1417        let envelope = sink.envelope();
1418
1419        // The combined format string is used for the deprecated `format` column.
1420        let combined_format = sink.combined_format();
1421        let (key_format, value_format) = match sink.formats() {
1422            Some((key_format, value_format)) => (key_format, Some(value_format)),
1423            None => (None, None),
1424        };
1425
1426        updates.push(BuiltinTableUpdate::row(
1427            &*MZ_SINKS,
1428            Row::pack_slice(&[
1429                Datum::String(&id.to_string()),
1430                Datum::UInt32(oid),
1431                Datum::String(&schema_id.to_string()),
1432                Datum::String(name),
1433                Datum::String(sink.connection.name()),
1434                Datum::from(sink.connection_id().map(|id| id.to_string()).as_deref()),
1435                // size column now deprecated w/o linked clusters
1436                Datum::Null,
1437                Datum::from(envelope),
1438                // FIXME: These key/value formats are kinda leaky! Should probably live in
1439                // the kafka sink table.
1440                Datum::from(combined_format.as_ref().map(|f| f.as_ref())),
1441                Datum::from(key_format),
1442                Datum::from(value_format),
1443                Datum::String(&sink.cluster_id.to_string()),
1444                Datum::String(&owner_id.to_string()),
1445                Datum::String(&sink.create_sql),
1446                Datum::String(&create_stmt.to_ast_string_redacted()),
1447            ]),
1448            diff,
1449        ));
1450
1451        updates
1452    }
1453
1454    fn pack_index_update(
1455        &self,
1456        id: CatalogItemId,
1457        oid: u32,
1458        name: &str,
1459        owner_id: &RoleId,
1460        index: &Index,
1461        diff: Diff,
1462    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1463        let mut updates = vec![];
1464
1465        let create_stmt = mz_sql::parse::parse(&index.create_sql)
1466            .unwrap_or_else(|e| {
1467                panic!(
1468                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1469                    index.create_sql, e
1470                )
1471            })
1472            .into_element()
1473            .ast;
1474
1475        let key_sqls = match &create_stmt {
1476            Statement::CreateIndex(CreateIndexStatement { key_parts, .. }) => key_parts
1477                .as_ref()
1478                .expect("key_parts is filled in during planning"),
1479            _ => unreachable!(),
1480        };
1481        let on_item_id = self.get_entry_by_global_id(&index.on).id();
1482
1483        updates.push(BuiltinTableUpdate::row(
1484            &*MZ_INDEXES,
1485            Row::pack_slice(&[
1486                Datum::String(&id.to_string()),
1487                Datum::UInt32(oid),
1488                Datum::String(name),
1489                Datum::String(&on_item_id.to_string()),
1490                Datum::String(&index.cluster_id.to_string()),
1491                Datum::String(&owner_id.to_string()),
1492                Datum::String(&index.create_sql),
1493                Datum::String(&create_stmt.to_ast_string_redacted()),
1494            ]),
1495            diff,
1496        ));
1497
1498        let on_entry = self.get_entry_by_global_id(&index.on);
1499        let on_desc = on_entry
1500            .relation_desc()
1501            .expect("can only create indexes on items with a valid description");
1502        let repr_col_types: Vec<ReprColumnType> = on_desc
1503            .typ()
1504            .column_types
1505            .iter()
1506            .map(ReprColumnType::from)
1507            .collect();
1508        for (i, key) in index.keys.iter().enumerate() {
1509            let nullable = key.typ(&repr_col_types).nullable;
1510            let seq_in_index = u64::cast_from(i + 1);
1511            let key_sql = key_sqls
1512                .get(i)
1513                .expect("missing sql information for index key")
1514                .to_ast_string_simple();
1515            let (field_number, expression) = match key {
1516                MirScalarExpr::Column(col, _) => {
1517                    (Datum::UInt64(u64::cast_from(*col + 1)), Datum::Null)
1518                }
1519                _ => (Datum::Null, Datum::String(&key_sql)),
1520            };
1521            updates.push(BuiltinTableUpdate::row(
1522                &*MZ_INDEX_COLUMNS,
1523                Row::pack_slice(&[
1524                    Datum::String(&id.to_string()),
1525                    Datum::UInt64(seq_in_index),
1526                    field_number,
1527                    expression,
1528                    Datum::from(nullable),
1529                ]),
1530                diff,
1531            ));
1532        }
1533
1534        updates
1535    }
1536
1537    fn pack_type_update(
1538        &self,
1539        id: CatalogItemId,
1540        oid: u32,
1541        schema_id: &SchemaSpecifier,
1542        name: &str,
1543        owner_id: &RoleId,
1544        privileges: Datum,
1545        typ: &Type,
1546        diff: Diff,
1547    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1548        let mut out = vec![];
1549
1550        let redacted = typ.create_sql.as_ref().map(|create_sql| {
1551            mz_sql::parse::parse(create_sql)
1552                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
1553                .into_element()
1554                .ast
1555                .to_ast_string_redacted()
1556        });
1557
1558        out.push(BuiltinTableUpdate::row(
1559            &*MZ_TYPES,
1560            Row::pack_slice(&[
1561                Datum::String(&id.to_string()),
1562                Datum::UInt32(oid),
1563                Datum::String(&schema_id.to_string()),
1564                Datum::String(name),
1565                Datum::String(&TypeCategory::from_catalog_type(&typ.details.typ).to_string()),
1566                Datum::String(&owner_id.to_string()),
1567                privileges,
1568                if let Some(create_sql) = &typ.create_sql {
1569                    Datum::String(create_sql)
1570                } else {
1571                    Datum::Null
1572                },
1573                if let Some(redacted) = &redacted {
1574                    Datum::String(redacted)
1575                } else {
1576                    Datum::Null
1577                },
1578            ]),
1579            diff,
1580        ));
1581
1582        let mut row = Row::default();
1583        let mut packer = row.packer();
1584
1585        fn append_modifier(packer: &mut RowPacker<'_>, mods: &[i64]) {
1586            if mods.is_empty() {
1587                packer.push(Datum::Null);
1588            } else {
1589                packer.push_list(mods.iter().map(|m| Datum::Int64(*m)));
1590            }
1591        }
1592
1593        let index_id = match &typ.details.typ {
1594            CatalogType::Array {
1595                element_reference: element_id,
1596            } => {
1597                packer.push(Datum::String(&id.to_string()));
1598                packer.push(Datum::String(&element_id.to_string()));
1599                &MZ_ARRAY_TYPES
1600            }
1601            CatalogType::List {
1602                element_reference: element_id,
1603                element_modifiers,
1604            } => {
1605                packer.push(Datum::String(&id.to_string()));
1606                packer.push(Datum::String(&element_id.to_string()));
1607                append_modifier(&mut packer, element_modifiers);
1608                &MZ_LIST_TYPES
1609            }
1610            CatalogType::Map {
1611                key_reference: key_id,
1612                value_reference: value_id,
1613                key_modifiers,
1614                value_modifiers,
1615            } => {
1616                packer.push(Datum::String(&id.to_string()));
1617                packer.push(Datum::String(&key_id.to_string()));
1618                packer.push(Datum::String(&value_id.to_string()));
1619                append_modifier(&mut packer, key_modifiers);
1620                append_modifier(&mut packer, value_modifiers);
1621                &MZ_MAP_TYPES
1622            }
1623            CatalogType::Pseudo => {
1624                packer.push(Datum::String(&id.to_string()));
1625                &MZ_PSEUDO_TYPES
1626            }
1627            _ => {
1628                packer.push(Datum::String(&id.to_string()));
1629                &MZ_BASE_TYPES
1630            }
1631        };
1632        out.push(BuiltinTableUpdate::row(index_id, row, diff));
1633
1634        if let Some(pg_metadata) = &typ.details.pg_metadata {
1635            out.push(BuiltinTableUpdate::row(
1636                &*MZ_TYPE_PG_METADATA,
1637                Row::pack_slice(&[
1638                    Datum::String(&id.to_string()),
1639                    Datum::UInt32(pg_metadata.typinput_oid),
1640                    Datum::UInt32(pg_metadata.typreceive_oid),
1641                ]),
1642                diff,
1643            ));
1644        }
1645
1646        out
1647    }
1648
1649    fn pack_func_update(
1650        &self,
1651        id: CatalogItemId,
1652        schema_id: &SchemaSpecifier,
1653        name: &str,
1654        owner_id: &RoleId,
1655        func: &Func,
1656        diff: Diff,
1657    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1658        let mut updates = vec![];
1659        for func_impl_details in func.inner.func_impls() {
1660            let arg_type_ids = func_impl_details
1661                .arg_typs
1662                .iter()
1663                .map(|typ| self.get_system_type(typ).id().to_string())
1664                .collect::<Vec<_>>();
1665
1666            let mut row = Row::default();
1667            row.packer()
1668                .try_push_array(
1669                    &[ArrayDimension {
1670                        lower_bound: 1,
1671                        length: arg_type_ids.len(),
1672                    }],
1673                    arg_type_ids.iter().map(|id| Datum::String(id)),
1674                )
1675                .expect(
1676                    "arg_type_ids is 1 dimensional, and its length is used for the array length",
1677                );
1678            let arg_type_ids = row.unpack_first();
1679
1680            updates.push(BuiltinTableUpdate::row(
1681                &*MZ_FUNCTIONS,
1682                Row::pack_slice(&[
1683                    Datum::String(&id.to_string()),
1684                    Datum::UInt32(func_impl_details.oid),
1685                    Datum::String(&schema_id.to_string()),
1686                    Datum::String(name),
1687                    arg_type_ids,
1688                    Datum::from(
1689                        func_impl_details
1690                            .variadic_typ
1691                            .map(|typ| self.get_system_type(typ).id().to_string())
1692                            .as_deref(),
1693                    ),
1694                    Datum::from(
1695                        func_impl_details
1696                            .return_typ
1697                            .map(|typ| self.get_system_type(typ).id().to_string())
1698                            .as_deref(),
1699                    ),
1700                    func_impl_details.return_is_set.into(),
1701                    Datum::String(&owner_id.to_string()),
1702                ]),
1703                diff,
1704            ));
1705
1706            if let mz_sql::func::Func::Aggregate(_) = func.inner {
1707                updates.push(BuiltinTableUpdate::row(
1708                    &*MZ_AGGREGATES,
1709                    Row::pack_slice(&[
1710                        Datum::UInt32(func_impl_details.oid),
1711                        // TODO(database-issues#1064): Support ordered-set aggregate functions.
1712                        Datum::String("n"),
1713                        Datum::Int16(0),
1714                    ]),
1715                    diff,
1716                ));
1717            }
1718        }
1719        updates
1720    }
1721
1722    pub fn pack_op_update(
1723        &self,
1724        operator: &str,
1725        func_impl_details: FuncImplCatalogDetails,
1726        diff: Diff,
1727    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1728        let arg_type_ids = func_impl_details
1729            .arg_typs
1730            .iter()
1731            .map(|typ| self.get_system_type(typ).id().to_string())
1732            .collect::<Vec<_>>();
1733
1734        let mut row = Row::default();
1735        row.packer()
1736            .try_push_array(
1737                &[ArrayDimension {
1738                    lower_bound: 1,
1739                    length: arg_type_ids.len(),
1740                }],
1741                arg_type_ids.iter().map(|id| Datum::String(id)),
1742            )
1743            .expect("arg_type_ids is 1 dimensional, and its length is used for the array length");
1744        let arg_type_ids = row.unpack_first();
1745
1746        BuiltinTableUpdate::row(
1747            &*MZ_OPERATORS,
1748            Row::pack_slice(&[
1749                Datum::UInt32(func_impl_details.oid),
1750                Datum::String(operator),
1751                arg_type_ids,
1752                Datum::from(
1753                    func_impl_details
1754                        .return_typ
1755                        .map(|typ| self.get_system_type(typ).id().to_string())
1756                        .as_deref(),
1757                ),
1758            ]),
1759            diff,
1760        )
1761    }
1762
1763    pub fn pack_audit_log_update(
1764        &self,
1765        event: &VersionedEvent,
1766        diff: Diff,
1767    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1768        let (event_type, object_type, details, user, occurred_at): (
1769            &EventType,
1770            &ObjectType,
1771            &EventDetails,
1772            &Option<String>,
1773            u64,
1774        ) = match event {
1775            VersionedEvent::V1(ev) => (
1776                &ev.event_type,
1777                &ev.object_type,
1778                &ev.details,
1779                &ev.user,
1780                ev.occurred_at,
1781            ),
1782        };
1783        let details = Jsonb::from_serde_json(details.as_json())
1784            .map_err(|e| {
1785                Error::new(ErrorKind::Unstructured(format!(
1786                    "could not pack audit log update: {}",
1787                    e
1788                )))
1789            })?
1790            .into_row();
1791        let details = details
1792            .iter()
1793            .next()
1794            .expect("details created above with a single jsonb column");
1795        let dt = mz_ore::now::to_datetime(occurred_at);
1796        let id = event.sortable_id();
1797        Ok(BuiltinTableUpdate::row(
1798            &*MZ_AUDIT_EVENTS,
1799            Row::pack_slice(&[
1800                Datum::UInt64(id),
1801                Datum::String(&format!("{}", event_type)),
1802                Datum::String(&format!("{}", object_type)),
1803                details,
1804                match user {
1805                    Some(user) => Datum::String(user),
1806                    None => Datum::Null,
1807                },
1808                Datum::TimestampTz(dt.try_into().expect("must fit")),
1809            ]),
1810            diff,
1811        ))
1812    }
1813
1814    pub fn pack_storage_usage_update(
1815        &self,
1816        VersionedStorageUsage::V1(event): VersionedStorageUsage,
1817        diff: Diff,
1818    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1819        let id = &MZ_STORAGE_USAGE_BY_SHARD;
1820        let row = Row::pack_slice(&[
1821            Datum::UInt64(event.id),
1822            Datum::from(event.shard_id.as_deref()),
1823            Datum::UInt64(event.size_bytes),
1824            Datum::TimestampTz(
1825                mz_ore::now::to_datetime(event.collection_timestamp)
1826                    .try_into()
1827                    .expect("must fit"),
1828            ),
1829        ]);
1830        BuiltinTableUpdate::row(id, row, diff)
1831    }
1832
1833    pub fn pack_egress_ip_update(
1834        &self,
1835        ip: &IpNet,
1836    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1837        let id = &MZ_EGRESS_IPS;
1838        let addr = ip.addr();
1839        let row = Row::pack_slice(&[
1840            Datum::String(&addr.to_string()),
1841            Datum::Int32(ip.prefix_len().into()),
1842            Datum::String(&format!("{}/{}", addr, ip.prefix_len())),
1843        ]);
1844        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
1845    }
1846
1847    pub fn pack_license_key_update(
1848        &self,
1849        license_key: &ValidatedLicenseKey,
1850    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1851        let id = &MZ_LICENSE_KEYS;
1852        let row = Row::pack_slice(&[
1853            Datum::String(&license_key.id),
1854            Datum::String(&license_key.organization),
1855            Datum::String(&license_key.environment_id),
1856            Datum::TimestampTz(
1857                mz_ore::now::to_datetime(license_key.expiration * 1000)
1858                    .try_into()
1859                    .expect("must fit"),
1860            ),
1861            Datum::TimestampTz(
1862                mz_ore::now::to_datetime(license_key.not_before * 1000)
1863                    .try_into()
1864                    .expect("must fit"),
1865            ),
1866        ]);
1867        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
1868    }
1869
1870    pub fn pack_all_replica_size_updates(&self) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1871        let mut updates = Vec::new();
1872        for (size, alloc) in &self.cluster_replica_sizes.0 {
1873            if alloc.disabled {
1874                continue;
1875            }
1876
1877            // Just invent something when the limits are `None`, which only happens in non-prod
1878            // environments (tests, process orchestrator, etc.)
1879            let cpu_limit = alloc.cpu_limit.unwrap_or(CpuLimit::MAX);
1880            let MemoryLimit(ByteSize(memory_bytes)) =
1881                (alloc.memory_limit).unwrap_or(MemoryLimit::MAX);
1882            let DiskLimit(ByteSize(disk_bytes)) =
1883                (alloc.disk_limit).unwrap_or(DiskLimit::ARBITRARY);
1884
1885            let row = Row::pack_slice(&[
1886                size.as_str().into(),
1887                u64::cast_from(alloc.scale).into(),
1888                u64::cast_from(alloc.workers).into(),
1889                cpu_limit.as_nanocpus().into(),
1890                memory_bytes.into(),
1891                disk_bytes.into(),
1892                (alloc.credits_per_hour).into(),
1893            ]);
1894
1895            updates.push(BuiltinTableUpdate::row(
1896                &*MZ_CLUSTER_REPLICA_SIZES,
1897                row,
1898                Diff::ONE,
1899            ));
1900        }
1901
1902        updates
1903    }
1904
1905    pub fn pack_subscribe_update(
1906        &self,
1907        id: GlobalId,
1908        subscribe: &ActiveSubscribe,
1909        diff: Diff,
1910    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1911        let mut row = Row::default();
1912        let mut packer = row.packer();
1913        packer.push(Datum::String(&id.to_string()));
1914        packer.push(Datum::Uuid(subscribe.session_uuid));
1915        packer.push(Datum::String(&subscribe.cluster_id.to_string()));
1916
1917        let start_dt = mz_ore::now::to_datetime(subscribe.start_time);
1918        packer.push(Datum::TimestampTz(start_dt.try_into().expect("must fit")));
1919
1920        let depends_on: Vec<_> = subscribe
1921            .depends_on
1922            .iter()
1923            .map(|id| id.to_string())
1924            .collect();
1925        packer.push_list(depends_on.iter().map(|s| Datum::String(s)));
1926
1927        BuiltinTableUpdate::row(&*MZ_SUBSCRIPTIONS, row, diff)
1928    }
1929
1930    pub fn pack_session_update(
1931        &self,
1932        conn: &ConnMeta,
1933        diff: Diff,
1934    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1935        let connect_dt = mz_ore::now::to_datetime(conn.connected_at());
1936        BuiltinTableUpdate::row(
1937            &*MZ_SESSIONS,
1938            Row::pack_slice(&[
1939                Datum::Uuid(conn.uuid()),
1940                Datum::UInt32(conn.conn_id().unhandled()),
1941                Datum::String(&conn.authenticated_role_id().to_string()),
1942                Datum::from(conn.client_ip().map(|ip| ip.to_string()).as_deref()),
1943                Datum::TimestampTz(connect_dt.try_into().expect("must fit")),
1944            ]),
1945            diff,
1946        )
1947    }
1948
1949    pub fn pack_default_privileges_update(
1950        &self,
1951        default_privilege_object: &DefaultPrivilegeObject,
1952        grantee: &RoleId,
1953        acl_mode: &AclMode,
1954        diff: Diff,
1955    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1956        BuiltinTableUpdate::row(
1957            &*MZ_DEFAULT_PRIVILEGES,
1958            Row::pack_slice(&[
1959                default_privilege_object.role_id.to_string().as_str().into(),
1960                default_privilege_object
1961                    .database_id
1962                    .map(|database_id| database_id.to_string())
1963                    .as_deref()
1964                    .into(),
1965                default_privilege_object
1966                    .schema_id
1967                    .map(|schema_id| schema_id.to_string())
1968                    .as_deref()
1969                    .into(),
1970                default_privilege_object
1971                    .object_type
1972                    .to_string()
1973                    .to_lowercase()
1974                    .as_str()
1975                    .into(),
1976                grantee.to_string().as_str().into(),
1977                acl_mode.to_string().as_str().into(),
1978            ]),
1979            diff,
1980        )
1981    }
1982
1983    pub fn pack_system_privileges_update(
1984        &self,
1985        privileges: MzAclItem,
1986        diff: Diff,
1987    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1988        BuiltinTableUpdate::row(
1989            &*MZ_SYSTEM_PRIVILEGES,
1990            Row::pack_slice(&[privileges.into()]),
1991            diff,
1992        )
1993    }
1994
1995    fn pack_privilege_array_row(&self, privileges: &PrivilegeMap) -> Row {
1996        let mut row = Row::default();
1997        let flat_privileges: Vec<_> = privileges.all_values_owned().collect();
1998        row.packer()
1999            .try_push_array(
2000                &[ArrayDimension {
2001                    lower_bound: 1,
2002                    length: flat_privileges.len(),
2003                }],
2004                flat_privileges
2005                    .into_iter()
2006                    .map(|mz_acl_item| Datum::MzAclItem(mz_acl_item.clone())),
2007            )
2008            .expect("privileges is 1 dimensional, and its length is used for the array length");
2009        row
2010    }
2011
2012    pub fn pack_comment_update(
2013        &self,
2014        object_id: CommentObjectId,
2015        column_pos: Option<usize>,
2016        comment: &str,
2017        diff: Diff,
2018    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2019        // Use the audit log representation so it's easier to join against.
2020        let object_type = mz_sql::catalog::ObjectType::from(object_id);
2021        let audit_type = super::object_type_to_audit_object_type(object_type);
2022        let object_type_str = audit_type.to_string();
2023
2024        let object_id_str = match object_id {
2025            CommentObjectId::Table(global_id)
2026            | CommentObjectId::View(global_id)
2027            | CommentObjectId::MaterializedView(global_id)
2028            | CommentObjectId::Source(global_id)
2029            | CommentObjectId::Sink(global_id)
2030            | CommentObjectId::Index(global_id)
2031            | CommentObjectId::Func(global_id)
2032            | CommentObjectId::Connection(global_id)
2033            | CommentObjectId::Secret(global_id)
2034            | CommentObjectId::Type(global_id)
2035            | CommentObjectId::ContinualTask(global_id) => global_id.to_string(),
2036            CommentObjectId::Role(role_id) => role_id.to_string(),
2037            CommentObjectId::Database(database_id) => database_id.to_string(),
2038            CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
2039            CommentObjectId::Cluster(cluster_id) => cluster_id.to_string(),
2040            CommentObjectId::ClusterReplica((_, replica_id)) => replica_id.to_string(),
2041            CommentObjectId::NetworkPolicy(network_policy_id) => network_policy_id.to_string(),
2042        };
2043        let column_pos_datum = match column_pos {
2044            Some(pos) => {
2045                // TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
2046                let pos =
2047                    i32::try_from(pos).expect("we constrain this value in the planning layer");
2048                Datum::Int32(pos)
2049            }
2050            None => Datum::Null,
2051        };
2052
2053        BuiltinTableUpdate::row(
2054            &*MZ_COMMENTS,
2055            Row::pack_slice(&[
2056                Datum::String(&object_id_str),
2057                Datum::String(&object_type_str),
2058                column_pos_datum,
2059                Datum::String(comment),
2060            ]),
2061            diff,
2062        )
2063    }
2064
2065    pub fn pack_webhook_source_update(
2066        &self,
2067        item_id: CatalogItemId,
2068        diff: Diff,
2069    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2070        let url = self
2071            .try_get_webhook_url(&item_id)
2072            .expect("webhook source should exist");
2073        let url = url.to_string();
2074        let name = &self.get_entry(&item_id).name().item;
2075        let id_str = item_id.to_string();
2076
2077        BuiltinTableUpdate::row(
2078            &*MZ_WEBHOOKS_SOURCES,
2079            Row::pack_slice(&[
2080                Datum::String(&id_str),
2081                Datum::String(name),
2082                Datum::String(&url),
2083            ]),
2084            diff,
2085        )
2086    }
2087
2088    pub fn pack_source_references_update(
2089        &self,
2090        source_references: &SourceReferences,
2091        diff: Diff,
2092    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2093        let source_id = source_references.source_id.to_string();
2094        let updated_at = &source_references.updated_at;
2095        source_references
2096            .references
2097            .iter()
2098            .map(|reference| {
2099                let mut row = Row::default();
2100                let mut packer = row.packer();
2101                packer.extend([
2102                    Datum::String(&source_id),
2103                    reference
2104                        .namespace
2105                        .as_ref()
2106                        .map(|s| Datum::String(s))
2107                        .unwrap_or(Datum::Null),
2108                    Datum::String(&reference.name),
2109                    Datum::TimestampTz(
2110                        mz_ore::now::to_datetime(*updated_at)
2111                            .try_into()
2112                            .expect("must fit"),
2113                    ),
2114                ]);
2115                if reference.columns.len() > 0 {
2116                    packer
2117                        .try_push_array(
2118                            &[ArrayDimension {
2119                                lower_bound: 1,
2120                                length: reference.columns.len(),
2121                            }],
2122                            reference.columns.iter().map(|col| Datum::String(col)),
2123                        )
2124                        .expect(
2125                            "columns is 1 dimensional, and its length is used for the array length",
2126                        );
2127                } else {
2128                    packer.push(Datum::Null);
2129                }
2130
2131                BuiltinTableUpdate::row(&*MZ_SOURCE_REFERENCES, row, diff)
2132            })
2133            .collect()
2134    }
2135}