mz_adapter/catalog/
builtin_table_updates.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10mod notice;
11
12use bytesize::ByteSize;
13use ipnet::IpNet;
14use mz_adapter_types::compaction::CompactionWindow;
15use mz_adapter_types::dyncfgs::ENABLE_PASSWORD_AUTH;
16use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
17use mz_catalog::SYSTEM_CONN_ID;
18use mz_catalog::builtin::{
19    BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
20    MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICAS,
21    MZ_CLUSTER_SCHEDULES, MZ_CLUSTER_WORKLOAD_CLASSES, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS,
22    MZ_CONNECTIONS, MZ_CONTINUAL_TASKS, MZ_DATABASES, MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS,
23    MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES, MZ_ICEBERG_SINKS, MZ_INDEX_COLUMNS, MZ_INDEXES,
24    MZ_INTERNAL_CLUSTER_REPLICAS, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCE_TABLES,
25    MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES,
26    MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MATERIALIZED_VIEWS, MZ_MYSQL_SOURCE_TABLES,
27    MZ_NETWORK_POLICIES, MZ_NETWORK_POLICY_RULES, MZ_OBJECT_DEPENDENCIES, MZ_OBJECT_GLOBAL_IDS,
28    MZ_OPERATORS, MZ_PENDING_CLUSTER_REPLICAS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES,
29    MZ_PSEUDO_TYPES, MZ_REPLACEMENTS, MZ_ROLE_AUTH, MZ_ROLE_MEMBERS, MZ_ROLE_PARAMETERS, MZ_ROLES,
30    MZ_SCHEMAS, MZ_SECRETS, MZ_SESSIONS, MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SOURCES,
31    MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD,
32    MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS,
33    MZ_WEBHOOKS_SOURCES,
34};
35use mz_catalog::config::AwsPrincipalContext;
36use mz_catalog::durable::SourceReferences;
37use mz_catalog::memory::error::{Error, ErrorKind};
38use mz_catalog::memory::objects::{
39    CatalogEntry, CatalogItem, ClusterVariant, Connection, ContinualTask, DataSourceDesc, Func,
40    Index, MaterializedView, Sink, Table, TableDataSource, Type, View,
41};
42use mz_controller::clusters::{
43    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
44};
45use mz_controller_types::ClusterId;
46use mz_expr::MirScalarExpr;
47use mz_license_keys::ValidatedLicenseKey;
48use mz_orchestrator::{CpuLimit, DiskLimit, MemoryLimit};
49use mz_ore::cast::CastFrom;
50use mz_ore::collections::CollectionExt;
51use mz_persist_client::batch::ProtoBatch;
52use mz_repr::adt::array::ArrayDimension;
53use mz_repr::adt::interval::Interval;
54use mz_repr::adt::jsonb::Jsonb;
55use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
56use mz_repr::adt::regex;
57use mz_repr::network_policy_id::NetworkPolicyId;
58use mz_repr::refresh_schedule::RefreshEvery;
59use mz_repr::role_id::RoleId;
60use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, Row, RowPacker, SqlScalarType, Timestamp};
61use mz_sql::ast::{ContinualTaskStmt, CreateIndexStatement, Statement, UnresolvedItemName};
62use mz_sql::catalog::{
63    CatalogCluster, CatalogDatabase, CatalogSchema, CatalogType, DefaultPrivilegeObject,
64    TypeCategory,
65};
66use mz_sql::func::FuncImplCatalogDetails;
67use mz_sql::names::{
68    CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier,
69};
70use mz_sql::plan::{ClusterSchedule, ConnectionDetails, SshKey};
71use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
72use mz_sql::session::vars::SessionVars;
73use mz_sql_parser::ast::display::AstDisplay;
74use mz_storage_client::client::TableData;
75use mz_storage_types::connections::KafkaConnection;
76use mz_storage_types::connections::aws::{AwsAuth, AwsConnection};
77use mz_storage_types::connections::inline::ReferencedConnection;
78use mz_storage_types::connections::string_or_secret::StringOrSecret;
79use mz_storage_types::sinks::{IcebergSinkConnection, KafkaSinkConnection, StorageSinkConnection};
80use mz_storage_types::sources::{
81    GenericSourceConnection, KafkaSourceConnection, PostgresSourceConnection, SourceConnection,
82};
83use smallvec::smallvec;
84
85// DO NOT add any more imports from `crate` outside of `crate::catalog`.
86use crate::active_compute_sink::ActiveSubscribe;
87use crate::catalog::CatalogState;
88use crate::coord::ConnMeta;
89
90/// An update to a built-in table.
91#[derive(Debug, Clone)]
92pub struct BuiltinTableUpdate<T = CatalogItemId> {
93    /// The reference of the table to update.
94    pub id: T,
95    /// The data to put into the table.
96    pub data: TableData,
97}
98
99impl<T> BuiltinTableUpdate<T> {
100    /// Create a [`BuiltinTableUpdate`] from a [`Row`].
101    pub fn row(id: T, row: Row, diff: Diff) -> BuiltinTableUpdate<T> {
102        BuiltinTableUpdate {
103            id,
104            data: TableData::Rows(vec![(row, diff)]),
105        }
106    }
107
108    pub fn batch(id: T, batch: ProtoBatch) -> BuiltinTableUpdate<T> {
109        BuiltinTableUpdate {
110            id,
111            data: TableData::Batches(smallvec![batch]),
112        }
113    }
114}
115
116impl CatalogState {
117    pub fn resolve_builtin_table_updates(
118        &self,
119        builtin_table_update: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
120    ) -> Vec<BuiltinTableUpdate<CatalogItemId>> {
121        builtin_table_update
122            .into_iter()
123            .map(|builtin_table_update| self.resolve_builtin_table_update(builtin_table_update))
124            .collect()
125    }
126
127    pub fn resolve_builtin_table_update(
128        &self,
129        BuiltinTableUpdate { id, data }: BuiltinTableUpdate<&'static BuiltinTable>,
130    ) -> BuiltinTableUpdate<CatalogItemId> {
131        let id = self.resolve_builtin_table(id);
132        BuiltinTableUpdate { id, data }
133    }
134
135    pub fn pack_depends_update(
136        &self,
137        depender: CatalogItemId,
138        dependee: CatalogItemId,
139        diff: Diff,
140    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
141        let row = Row::pack_slice(&[
142            Datum::String(&depender.to_string()),
143            Datum::String(&dependee.to_string()),
144        ]);
145        BuiltinTableUpdate::row(&*MZ_OBJECT_DEPENDENCIES, row, diff)
146    }
147
148    pub(super) fn pack_database_update(
149        &self,
150        database_id: &DatabaseId,
151        diff: Diff,
152    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
153        let database = self.get_database(database_id);
154        let row = self.pack_privilege_array_row(database.privileges());
155        let privileges = row.unpack_first();
156        BuiltinTableUpdate::row(
157            &*MZ_DATABASES,
158            Row::pack_slice(&[
159                Datum::String(&database.id.to_string()),
160                Datum::UInt32(database.oid),
161                Datum::String(database.name()),
162                Datum::String(&database.owner_id.to_string()),
163                privileges,
164            ]),
165            diff,
166        )
167    }
168
169    pub(super) fn pack_schema_update(
170        &self,
171        database_spec: &ResolvedDatabaseSpecifier,
172        schema_id: &SchemaId,
173        diff: Diff,
174    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
175        let (database_id, schema) = match database_spec {
176            ResolvedDatabaseSpecifier::Ambient => (None, &self.ambient_schemas_by_id[schema_id]),
177            ResolvedDatabaseSpecifier::Id(id) => (
178                Some(id.to_string()),
179                &self.database_by_id[id].schemas_by_id[schema_id],
180            ),
181        };
182        let row = self.pack_privilege_array_row(schema.privileges());
183        let privileges = row.unpack_first();
184        BuiltinTableUpdate::row(
185            &*MZ_SCHEMAS,
186            Row::pack_slice(&[
187                Datum::String(&schema_id.to_string()),
188                Datum::UInt32(schema.oid),
189                Datum::from(database_id.as_deref()),
190                Datum::String(&schema.name.schema),
191                Datum::String(&schema.owner_id.to_string()),
192                privileges,
193            ]),
194            diff,
195        )
196    }
197
198    pub(super) fn pack_role_auth_update(
199        &self,
200        id: RoleId,
201        diff: Diff,
202    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
203        let role_auth = self.get_role_auth(&id);
204        let role = self.get_role(&id);
205        BuiltinTableUpdate::row(
206            &*MZ_ROLE_AUTH,
207            Row::pack_slice(&[
208                Datum::String(&role_auth.role_id.to_string()),
209                Datum::UInt32(role.oid),
210                match &role_auth.password_hash {
211                    Some(hash) => Datum::String(hash),
212                    None => Datum::Null,
213                },
214                Datum::TimestampTz(
215                    mz_ore::now::to_datetime(role_auth.updated_at)
216                        .try_into()
217                        .expect("must fit"),
218                ),
219            ]),
220            diff,
221        )
222    }
223
224    pub(super) fn pack_role_update(
225        &self,
226        id: RoleId,
227        diff: Diff,
228    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
229        match id {
230            // PUBLIC role should not show up in mz_roles.
231            RoleId::Public => vec![],
232            id => {
233                let role = self.get_role(&id);
234                let self_managed_auth_enabled = self
235                    .system_config()
236                    .get(ENABLE_PASSWORD_AUTH.name())
237                    .ok()
238                    .map(|v| v.value() == "on")
239                    .unwrap_or(false);
240
241                let builtin_supers = [MZ_SYSTEM_ROLE_ID, MZ_SUPPORT_ROLE_ID];
242
243                // For self managed auth, we can get the actual login and superuser bits
244                // directly from the role. For cloud auth, we have to do some heuristics.
245                // We determine login status each time a role logs in, so there's no clean
246                // way to accurately determine this in the catalog. Instead we do something
247                // a little gross. For system roles, we hardcode the known roles that can
248                // log in. For user roles, we determine `rolcanlogin` based on whether the
249                // role name looks like an email address.
250                //
251                // This works for the vast majority of cases in production. Roles that users
252                // log in to come from Frontegg and therefore *must* be valid email
253                // addresses, while roles that are created via `CREATE ROLE` (e.g.,
254                // `admin`, `prod_app`) almost certainly are not named to look like email
255                // addresses.
256                //
257                // For the moment, we're comfortable with the edge cases here. If we discover
258                // that folks are regularly creating non-login roles with names that look
259                // like an email address (e.g., `admins@sysops.foocorp`), we can change
260                // course.
261
262                let cloud_login_regex = "^[^@]+@[^@]+\\.[^@]+$";
263                let matches = regex::Regex::new(cloud_login_regex, true)
264                    .expect("valid regex")
265                    .is_match(&role.name);
266
267                let rolcanlogin = if self_managed_auth_enabled {
268                    role.attributes.login.unwrap_or(false)
269                } else {
270                    builtin_supers.contains(&role.id) || matches
271                };
272
273                let rolsuper = if self_managed_auth_enabled {
274                    Datum::from(role.attributes.superuser.unwrap_or(false))
275                } else if builtin_supers.contains(&role.id) {
276                    Datum::from(true)
277                } else {
278                    Datum::Null
279                };
280
281                let role_update = BuiltinTableUpdate::row(
282                    &*MZ_ROLES,
283                    Row::pack_slice(&[
284                        Datum::String(&role.id.to_string()),
285                        Datum::UInt32(role.oid),
286                        Datum::String(&role.name),
287                        Datum::from(role.attributes.inherit),
288                        Datum::from(rolcanlogin),
289                        rolsuper,
290                    ]),
291                    diff,
292                );
293                let mut updates = vec![role_update];
294
295                // HACK/TODO(parkmycar): Creating an empty SessionVars like this is pretty hacky,
296                // we should instead have a static list of all session vars.
297                let session_vars_reference = SessionVars::new_unchecked(
298                    &mz_build_info::DUMMY_BUILD_INFO,
299                    SYSTEM_USER.clone(),
300                    None,
301                );
302
303                for (name, val) in role.vars() {
304                    let result = session_vars_reference
305                        .inspect(name)
306                        .and_then(|var| var.check(val.borrow()));
307                    let Ok(formatted_val) = result else {
308                        // Note: all variables should have been validated by this point, so we
309                        // shouldn't ever hit this.
310                        tracing::error!(?name, ?val, ?result, "found invalid role default var");
311                        continue;
312                    };
313
314                    let role_var_update = BuiltinTableUpdate::row(
315                        &*MZ_ROLE_PARAMETERS,
316                        Row::pack_slice(&[
317                            Datum::String(&role.id.to_string()),
318                            Datum::String(name),
319                            Datum::String(&formatted_val),
320                        ]),
321                        diff,
322                    );
323                    updates.push(role_var_update);
324                }
325
326                updates
327            }
328        }
329    }
330
331    pub(super) fn pack_role_members_update(
332        &self,
333        role_id: RoleId,
334        member_id: RoleId,
335        diff: Diff,
336    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
337        let grantor_id = self
338            .get_role(&member_id)
339            .membership
340            .map
341            .get(&role_id)
342            .expect("catalog out of sync");
343        BuiltinTableUpdate::row(
344            &*MZ_ROLE_MEMBERS,
345            Row::pack_slice(&[
346                Datum::String(&role_id.to_string()),
347                Datum::String(&member_id.to_string()),
348                Datum::String(&grantor_id.to_string()),
349            ]),
350            diff,
351        )
352    }
353
354    pub(super) fn pack_cluster_update(
355        &self,
356        name: &str,
357        diff: Diff,
358    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
359        let id = self.clusters_by_name[name];
360        let cluster = &self.clusters_by_id[&id];
361        let row = self.pack_privilege_array_row(cluster.privileges());
362        let privileges = row.unpack_first();
363        let (size, disk, replication_factor, azs, introspection_debugging, introspection_interval) =
364            match &cluster.config.variant {
365                ClusterVariant::Managed(config) => (
366                    Some(config.size.as_str()),
367                    Some(self.cluster_replica_size_has_disk(&config.size)),
368                    Some(config.replication_factor),
369                    if config.availability_zones.is_empty() {
370                        None
371                    } else {
372                        Some(config.availability_zones.clone())
373                    },
374                    Some(config.logging.log_logging),
375                    config.logging.interval.map(|d| {
376                        Interval::from_duration(&d)
377                            .expect("planning ensured this convertible back to interval")
378                    }),
379                ),
380                ClusterVariant::Unmanaged => (None, None, None, None, None, None),
381            };
382
383        let mut row = Row::default();
384        let mut packer = row.packer();
385        packer.extend([
386            Datum::String(&id.to_string()),
387            Datum::String(name),
388            Datum::String(&cluster.owner_id.to_string()),
389            privileges,
390            cluster.is_managed().into(),
391            size.into(),
392            replication_factor.into(),
393            disk.into(),
394        ]);
395        if let Some(azs) = azs {
396            packer.push_list(azs.iter().map(|az| Datum::String(az)));
397        } else {
398            packer.push(Datum::Null);
399        }
400        packer.push(Datum::from(introspection_debugging));
401        packer.push(Datum::from(introspection_interval));
402
403        let mut updates = Vec::new();
404
405        updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTERS, row, diff));
406
407        if let ClusterVariant::Managed(managed_config) = &cluster.config.variant {
408            let row = match managed_config.schedule {
409                ClusterSchedule::Manual => Row::pack_slice(&[
410                    Datum::String(&id.to_string()),
411                    Datum::String("manual"),
412                    Datum::Null,
413                ]),
414                ClusterSchedule::Refresh {
415                    hydration_time_estimate,
416                } => Row::pack_slice(&[
417                    Datum::String(&id.to_string()),
418                    Datum::String("on-refresh"),
419                    Datum::Interval(
420                        Interval::from_duration(&hydration_time_estimate)
421                            .expect("planning ensured that this is convertible back to Interval"),
422                    ),
423                ]),
424            };
425            updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTER_SCHEDULES, row, diff));
426        }
427
428        updates.push(BuiltinTableUpdate::row(
429            &*MZ_CLUSTER_WORKLOAD_CLASSES,
430            Row::pack_slice(&[
431                Datum::String(&id.to_string()),
432                Datum::from(cluster.config.workload_class.as_deref()),
433            ]),
434            diff,
435        ));
436
437        updates
438    }
439
440    pub(super) fn pack_cluster_replica_update(
441        &self,
442        cluster_id: ClusterId,
443        name: &str,
444        diff: Diff,
445    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
446        let cluster = &self.clusters_by_id[&cluster_id];
447        let id = cluster.replica_id(name).expect("Must exist");
448        let replica = cluster.replica(id).expect("Must exist");
449
450        let (size, disk, az, internal, pending) = match &replica.config.location {
451            // TODO(guswynn): The column should be `availability_zones`, not
452            // `availability_zone`.
453            ReplicaLocation::Managed(ManagedReplicaLocation {
454                size,
455                availability_zones: ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
456                allocation: _,
457                billed_as: _,
458                internal,
459                pending,
460            }) => (
461                Some(&**size),
462                Some(self.cluster_replica_size_has_disk(size)),
463                Some(az.as_str()),
464                *internal,
465                *pending,
466            ),
467            ReplicaLocation::Managed(ManagedReplicaLocation {
468                size,
469                availability_zones: _,
470                allocation: _,
471                billed_as: _,
472                internal,
473                pending,
474            }) => (
475                Some(&**size),
476                Some(self.cluster_replica_size_has_disk(size)),
477                None,
478                *internal,
479                *pending,
480            ),
481            _ => (None, None, None, false, false),
482        };
483
484        let cluster_replica_update = BuiltinTableUpdate::row(
485            &*MZ_CLUSTER_REPLICAS,
486            Row::pack_slice(&[
487                Datum::String(&id.to_string()),
488                Datum::String(name),
489                Datum::String(&cluster_id.to_string()),
490                Datum::from(size),
491                Datum::from(az),
492                Datum::String(&replica.owner_id.to_string()),
493                Datum::from(disk),
494            ]),
495            diff,
496        );
497
498        let mut updates = vec![cluster_replica_update];
499
500        if internal {
501            let update = BuiltinTableUpdate::row(
502                &*MZ_INTERNAL_CLUSTER_REPLICAS,
503                Row::pack_slice(&[Datum::String(&id.to_string())]),
504                diff,
505            );
506            updates.push(update);
507        }
508
509        if pending {
510            let update = BuiltinTableUpdate::row(
511                &*MZ_PENDING_CLUSTER_REPLICAS,
512                Row::pack_slice(&[Datum::String(&id.to_string())]),
513                diff,
514            );
515            updates.push(update);
516        }
517
518        updates
519    }
520
521    pub(crate) fn pack_network_policy_update(
522        &self,
523        policy_id: &NetworkPolicyId,
524        diff: Diff,
525    ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, Error> {
526        let policy = self.get_network_policy(policy_id);
527        let row = self.pack_privilege_array_row(&policy.privileges);
528        let privileges = row.unpack_first();
529        let mut updates = Vec::new();
530        for ref rule in policy.rules.clone() {
531            updates.push(BuiltinTableUpdate::row(
532                &*MZ_NETWORK_POLICY_RULES,
533                Row::pack_slice(&[
534                    Datum::String(&rule.name),
535                    Datum::String(&policy.id.to_string()),
536                    Datum::String(&rule.action.to_string()),
537                    Datum::String(&rule.address.to_string()),
538                    Datum::String(&rule.direction.to_string()),
539                ]),
540                diff,
541            ));
542        }
543        updates.push(BuiltinTableUpdate::row(
544            &*MZ_NETWORK_POLICIES,
545            Row::pack_slice(&[
546                Datum::String(&policy.id.to_string()),
547                Datum::String(&policy.name),
548                Datum::String(&policy.owner_id.to_string()),
549                privileges,
550                Datum::UInt32(policy.oid.clone()),
551            ]),
552            diff,
553        ));
554
555        Ok(updates)
556    }
557
558    pub(super) fn pack_item_update(
559        &self,
560        id: CatalogItemId,
561        diff: Diff,
562    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
563        let entry = self.get_entry(&id);
564        let oid = entry.oid();
565        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
566        let schema_id = &self
567            .get_schema(
568                &entry.name().qualifiers.database_spec,
569                &entry.name().qualifiers.schema_spec,
570                conn_id,
571            )
572            .id;
573        let name = &entry.name().item;
574        let owner_id = entry.owner_id();
575        let privileges_row = self.pack_privilege_array_row(entry.privileges());
576        let privileges = privileges_row.unpack_first();
577        let mut updates = match entry.item() {
578            CatalogItem::Log(_) => self.pack_source_update(
579                id, oid, schema_id, name, "log", None, None, None, None, None, owner_id,
580                privileges, diff, None,
581            ),
582            CatalogItem::Index(index) => {
583                self.pack_index_update(id, oid, name, owner_id, index, diff)
584            }
585            CatalogItem::Table(table) => {
586                let mut updates = self
587                    .pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);
588
589                if let TableDataSource::DataSource {
590                    desc: data_source,
591                    timeline: _,
592                } = &table.data_source
593                {
594                    updates.extend(match data_source {
595                        DataSourceDesc::IngestionExport {
596                            ingestion_id,
597                            external_reference: UnresolvedItemName(external_reference),
598                            details: _,
599                            data_config: _,
600                        } => {
601                            let ingestion_entry = self
602                                .get_entry(ingestion_id)
603                                .source_desc()
604                                .expect("primary source exists")
605                                .expect("primary source is a source");
606
607                            match ingestion_entry.connection.name() {
608                                "postgres" => {
609                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
610                                    // The left-most qualification of Postgres
611                                    // tables is the database, but this
612                                    // information is redundant because each
613                                    // Postgres connection connects to only one
614                                    // database.
615                                    let schema_name = external_reference[1].to_ast_string_simple();
616                                    let table_name = external_reference[2].to_ast_string_simple();
617
618                                    self.pack_postgres_source_tables_update(
619                                        id,
620                                        &schema_name,
621                                        &table_name,
622                                        diff,
623                                    )
624                                }
625                                "mysql" => {
626                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
627                                    let schema_name = external_reference[0].to_ast_string_simple();
628                                    let table_name = external_reference[1].to_ast_string_simple();
629
630                                    self.pack_mysql_source_tables_update(
631                                        id,
632                                        &schema_name,
633                                        &table_name,
634                                        diff,
635                                    )
636                                }
637                                "sql-server" => {
638                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
639                                    // The left-most qualification of SQL Server tables is
640                                    // the database, but this information is redundant
641                                    // because each SQL Server connection connects to
642                                    // only one database.
643                                    let schema_name = external_reference[1].to_ast_string_simple();
644                                    let table_name = external_reference[2].to_ast_string_simple();
645
646                                    self.pack_sql_server_source_table_update(
647                                        id,
648                                        &schema_name,
649                                        &table_name,
650                                        diff,
651                                    )
652                                }
653                                // Load generator sources don't have any special
654                                // updates.
655                                "load-generator" => vec![],
656                                "kafka" => {
657                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 1);
658                                    let topic = external_reference[0].to_ast_string_simple();
659                                    let envelope = data_source.envelope();
660                                    let (key_format, value_format) = data_source.formats();
661
662                                    self.pack_kafka_source_tables_update(
663                                        id,
664                                        &topic,
665                                        envelope,
666                                        key_format,
667                                        value_format,
668                                        diff,
669                                    )
670                                }
671                                s => unreachable!("{s} sources do not have tables"),
672                            }
673                        }
674                        _ => vec![],
675                    });
676                }
677
678                updates
679            }
680            CatalogItem::Source(source) => {
681                let source_type = source.source_type();
682                let connection_id = source.connection_id();
683                let envelope = source.data_source.envelope();
684                let cluster_entry = match source.data_source {
685                    // Ingestion exports don't have their own cluster, but
686                    // run on their ingestion's cluster.
687                    DataSourceDesc::IngestionExport { ingestion_id, .. } => {
688                        self.get_entry(&ingestion_id)
689                    }
690                    _ => entry,
691                };
692
693                let cluster_id = cluster_entry.item().cluster_id().map(|id| id.to_string());
694
695                let (key_format, value_format) = source.data_source.formats();
696
697                let mut updates = self.pack_source_update(
698                    id,
699                    oid,
700                    schema_id,
701                    name,
702                    source_type,
703                    connection_id,
704                    envelope,
705                    key_format,
706                    value_format,
707                    cluster_id.as_deref(),
708                    owner_id,
709                    privileges,
710                    diff,
711                    source.create_sql.as_ref(),
712                );
713
714                updates.extend(match &source.data_source {
715                    DataSourceDesc::Ingestion { desc, .. }
716                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => match &desc.connection {
717                        GenericSourceConnection::Postgres(postgres) => {
718                            self.pack_postgres_source_update(id, postgres, diff)
719                        }
720                        GenericSourceConnection::Kafka(kafka) => {
721                            self.pack_kafka_source_update(id, source.global_id(), kafka, diff)
722                        }
723                        _ => vec![],
724                    },
725                    DataSourceDesc::IngestionExport {
726                        ingestion_id,
727                        external_reference: UnresolvedItemName(external_reference),
728                        details: _,
729                        data_config: _,
730                    } => {
731                        let ingestion_entry = self
732                            .get_entry(ingestion_id)
733                            .source_desc()
734                            .expect("primary source exists")
735                            .expect("primary source is a source");
736
737                        match ingestion_entry.connection.name() {
738                            "postgres" => {
739                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
740                                // The left-most qualification of Postgres
741                                // tables is the database, but this
742                                // information is redundant because each
743                                // Postgres connection connects to only one
744                                // database.
745                                let schema_name = external_reference[1].to_ast_string_simple();
746                                let table_name = external_reference[2].to_ast_string_simple();
747
748                                self.pack_postgres_source_tables_update(
749                                    id,
750                                    &schema_name,
751                                    &table_name,
752                                    diff,
753                                )
754                            }
755                            "mysql" => {
756                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
757                                let schema_name = external_reference[0].to_ast_string_simple();
758                                let table_name = external_reference[1].to_ast_string_simple();
759
760                                self.pack_mysql_source_tables_update(
761                                    id,
762                                    &schema_name,
763                                    &table_name,
764                                    diff,
765                                )
766                            }
767                            "sql-server" => {
768                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
769                                // The left-most qualification of SQL Server tables is
770                                // the database, but this information is redundant
771                                // because each SQL Server connection connects to
772                                // only one database.
773                                let schema_name = external_reference[1].to_ast_string_simple();
774                                let table_name = external_reference[2].to_ast_string_simple();
775
776                                self.pack_sql_server_source_table_update(
777                                    id,
778                                    &schema_name,
779                                    &table_name,
780                                    diff,
781                                )
782                            }
783                            // Load generator sources don't have any special
784                            // updates.
785                            "load-generator" => vec![],
786                            s => unreachable!("{s} sources do not have subsources"),
787                        }
788                    }
789                    DataSourceDesc::Webhook { .. } => {
790                        vec![self.pack_webhook_source_update(id, diff)]
791                    }
792                    _ => vec![],
793                });
794
795                updates
796            }
797            CatalogItem::View(view) => {
798                self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
799            }
800            CatalogItem::MaterializedView(mview) => self.pack_materialized_view_update(
801                id, oid, schema_id, name, owner_id, privileges, mview, diff,
802            ),
803            CatalogItem::Sink(sink) => {
804                self.pack_sink_update(id, oid, schema_id, name, owner_id, sink, diff)
805            }
806            CatalogItem::Type(ty) => {
807                self.pack_type_update(id, oid, schema_id, name, owner_id, privileges, ty, diff)
808            }
809            CatalogItem::Func(func) => {
810                self.pack_func_update(id, schema_id, name, owner_id, func, diff)
811            }
812            CatalogItem::Secret(_) => {
813                self.pack_secret_update(id, oid, schema_id, name, owner_id, privileges, diff)
814            }
815            CatalogItem::Connection(connection) => self.pack_connection_update(
816                id, oid, schema_id, name, owner_id, privileges, connection, diff,
817            ),
818            CatalogItem::ContinualTask(ct) => self.pack_continual_task_update(
819                id, oid, schema_id, name, owner_id, privileges, ct, diff,
820            ),
821        };
822
823        if !entry.item().is_temporary() {
824            // Populate or clean up the `mz_object_dependencies` table.
825            // TODO(jkosh44) Unclear if this table wants to include all uses or only references.
826            for dependee in entry.item().references().items() {
827                updates.push(self.pack_depends_update(id, *dependee, diff))
828            }
829        }
830
831        // Always report the latest for an objects columns.
832        if let Some(desc) = entry.relation_desc_latest() {
833            let defaults = match entry.item() {
834                CatalogItem::Table(Table {
835                    data_source: TableDataSource::TableWrites { defaults },
836                    ..
837                }) => Some(defaults),
838                _ => None,
839            };
840            for (i, (column_name, column_type)) in desc.iter().enumerate() {
841                let default: Option<String> = defaults.map(|d| d[i].to_ast_string_stable());
842                let default: Datum = default
843                    .as_ref()
844                    .map(|d| Datum::String(d))
845                    .unwrap_or(Datum::Null);
846                let pgtype = mz_pgrepr::Type::from(&column_type.scalar_type);
847                let (type_name, type_oid) = match &column_type.scalar_type {
848                    SqlScalarType::List {
849                        custom_id: Some(custom_id),
850                        ..
851                    }
852                    | SqlScalarType::Map {
853                        custom_id: Some(custom_id),
854                        ..
855                    }
856                    | SqlScalarType::Record {
857                        custom_id: Some(custom_id),
858                        ..
859                    } => {
860                        let entry = self.get_entry(custom_id);
861                        // NOTE(benesch): the `mz_columns.type text` field is
862                        // wrong. Types do not have a name that can be
863                        // represented as a single textual field. There can be
864                        // multiple types with the same name in different
865                        // schemas and databases. We should eventually deprecate
866                        // the `type` field in favor of a new `type_id` field
867                        // that can be joined against `mz_types`.
868                        //
869                        // For now, in the interest of pragmatism, we just use
870                        // the type's item name, and accept that there may be
871                        // ambiguity if the same type name is used in multiple
872                        // schemas. The ambiguity is mitigated by the OID, which
873                        // can be joined against `mz_types.oid` to resolve the
874                        // ambiguity.
875                        let name = &*entry.name().item;
876                        let oid = entry.oid();
877                        (name, oid)
878                    }
879                    _ => (pgtype.name(), pgtype.oid()),
880                };
881                updates.push(BuiltinTableUpdate::row(
882                    &*MZ_COLUMNS,
883                    Row::pack_slice(&[
884                        Datum::String(&id.to_string()),
885                        Datum::String(column_name),
886                        Datum::UInt64(u64::cast_from(i + 1)),
887                        Datum::from(column_type.nullable),
888                        Datum::String(type_name),
889                        default,
890                        Datum::UInt32(type_oid),
891                        Datum::Int32(pgtype.typmod()),
892                    ]),
893                    diff,
894                ));
895            }
896        }
897
898        // Use initial lcw so that we can tell apart default from non-existent windows.
899        if let Some(cw) = entry.item().initial_logical_compaction_window() {
900            updates.push(self.pack_history_retention_strategy_update(id, cw, diff));
901        }
902
903        updates.extend(Self::pack_item_global_id_update(entry, diff));
904
905        updates
906    }
907
908    fn pack_item_global_id_update(
909        entry: &CatalogEntry,
910        diff: Diff,
911    ) -> impl Iterator<Item = BuiltinTableUpdate<&'static BuiltinTable>> + use<'_> {
912        let id = entry.id().to_string();
913        let global_ids = entry.global_ids();
914        global_ids.map(move |global_id| {
915            BuiltinTableUpdate::row(
916                &*MZ_OBJECT_GLOBAL_IDS,
917                Row::pack_slice(&[Datum::String(&id), Datum::String(&global_id.to_string())]),
918                diff,
919            )
920        })
921    }
922
923    fn pack_history_retention_strategy_update(
924        &self,
925        id: CatalogItemId,
926        cw: CompactionWindow,
927        diff: Diff,
928    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
929        let cw: u64 = cw.comparable_timestamp().into();
930        let cw = Jsonb::from_serde_json(serde_json::Value::Number(serde_json::Number::from(cw)))
931            .expect("must serialize");
932        BuiltinTableUpdate::row(
933            &*MZ_HISTORY_RETENTION_STRATEGIES,
934            Row::pack_slice(&[
935                Datum::String(&id.to_string()),
936                // FOR is the only strategy at the moment. We may introduce FROM or others later.
937                Datum::String("FOR"),
938                cw.into_row().into_element(),
939            ]),
940            diff,
941        )
942    }
943
944    fn pack_table_update(
945        &self,
946        id: CatalogItemId,
947        oid: u32,
948        schema_id: &SchemaSpecifier,
949        name: &str,
950        owner_id: &RoleId,
951        privileges: Datum,
952        diff: Diff,
953        table: &Table,
954    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
955        let redacted = table.create_sql.as_ref().map(|create_sql| {
956            mz_sql::parse::parse(create_sql)
957                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
958                .into_element()
959                .ast
960                .to_ast_string_redacted()
961        });
962        let source_id = if let TableDataSource::DataSource {
963            desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
964            ..
965        } = &table.data_source
966        {
967            Some(ingestion_id.to_string())
968        } else {
969            None
970        };
971
972        vec![BuiltinTableUpdate::row(
973            &*MZ_TABLES,
974            Row::pack_slice(&[
975                Datum::String(&id.to_string()),
976                Datum::UInt32(oid),
977                Datum::String(&schema_id.to_string()),
978                Datum::String(name),
979                Datum::String(&owner_id.to_string()),
980                privileges,
981                if let Some(create_sql) = &table.create_sql {
982                    Datum::String(create_sql)
983                } else {
984                    Datum::Null
985                },
986                if let Some(redacted) = &redacted {
987                    Datum::String(redacted)
988                } else {
989                    Datum::Null
990                },
991                if let Some(source_id) = source_id.as_ref() {
992                    Datum::String(source_id)
993                } else {
994                    Datum::Null
995                },
996            ]),
997            diff,
998        )]
999    }
1000
1001    fn pack_source_update(
1002        &self,
1003        id: CatalogItemId,
1004        oid: u32,
1005        schema_id: &SchemaSpecifier,
1006        name: &str,
1007        source_desc_name: &str,
1008        connection_id: Option<CatalogItemId>,
1009        envelope: Option<&str>,
1010        key_format: Option<&str>,
1011        value_format: Option<&str>,
1012        cluster_id: Option<&str>,
1013        owner_id: &RoleId,
1014        privileges: Datum,
1015        diff: Diff,
1016        create_sql: Option<&String>,
1017    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1018        let redacted = create_sql.map(|create_sql| {
1019            let create_stmt = mz_sql::parse::parse(create_sql)
1020                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
1021                .into_element()
1022                .ast;
1023            create_stmt.to_ast_string_redacted()
1024        });
1025        vec![BuiltinTableUpdate::row(
1026            &*MZ_SOURCES,
1027            Row::pack_slice(&[
1028                Datum::String(&id.to_string()),
1029                Datum::UInt32(oid),
1030                Datum::String(&schema_id.to_string()),
1031                Datum::String(name),
1032                Datum::String(source_desc_name),
1033                Datum::from(connection_id.map(|id| id.to_string()).as_deref()),
1034                // This is the "source size", which is a remnant from linked
1035                // clusters.
1036                Datum::Null,
1037                Datum::from(envelope),
1038                Datum::from(key_format),
1039                Datum::from(value_format),
1040                Datum::from(cluster_id),
1041                Datum::String(&owner_id.to_string()),
1042                privileges,
1043                if let Some(create_sql) = create_sql {
1044                    Datum::String(create_sql)
1045                } else {
1046                    Datum::Null
1047                },
1048                if let Some(redacted) = &redacted {
1049                    Datum::String(redacted)
1050                } else {
1051                    Datum::Null
1052                },
1053            ]),
1054            diff,
1055        )]
1056    }
1057
1058    fn pack_postgres_source_update(
1059        &self,
1060        id: CatalogItemId,
1061        postgres: &PostgresSourceConnection<ReferencedConnection>,
1062        diff: Diff,
1063    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1064        vec![BuiltinTableUpdate::row(
1065            &*MZ_POSTGRES_SOURCES,
1066            Row::pack_slice(&[
1067                Datum::String(&id.to_string()),
1068                Datum::String(&postgres.publication_details.slot),
1069                Datum::from(postgres.publication_details.timeline_id),
1070            ]),
1071            diff,
1072        )]
1073    }
1074
1075    fn pack_kafka_source_update(
1076        &self,
1077        item_id: CatalogItemId,
1078        collection_id: GlobalId,
1079        kafka: &KafkaSourceConnection<ReferencedConnection>,
1080        diff: Diff,
1081    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1082        vec![BuiltinTableUpdate::row(
1083            &*MZ_KAFKA_SOURCES,
1084            Row::pack_slice(&[
1085                Datum::String(&item_id.to_string()),
1086                Datum::String(&kafka.group_id(&self.config.connection_context, collection_id)),
1087                Datum::String(&kafka.topic),
1088            ]),
1089            diff,
1090        )]
1091    }
1092
1093    fn pack_postgres_source_tables_update(
1094        &self,
1095        id: CatalogItemId,
1096        schema_name: &str,
1097        table_name: &str,
1098        diff: Diff,
1099    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1100        vec![BuiltinTableUpdate::row(
1101            &*MZ_POSTGRES_SOURCE_TABLES,
1102            Row::pack_slice(&[
1103                Datum::String(&id.to_string()),
1104                Datum::String(schema_name),
1105                Datum::String(table_name),
1106            ]),
1107            diff,
1108        )]
1109    }
1110
1111    fn pack_mysql_source_tables_update(
1112        &self,
1113        id: CatalogItemId,
1114        schema_name: &str,
1115        table_name: &str,
1116        diff: Diff,
1117    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1118        vec![BuiltinTableUpdate::row(
1119            &*MZ_MYSQL_SOURCE_TABLES,
1120            Row::pack_slice(&[
1121                Datum::String(&id.to_string()),
1122                Datum::String(schema_name),
1123                Datum::String(table_name),
1124            ]),
1125            diff,
1126        )]
1127    }
1128
1129    fn pack_sql_server_source_table_update(
1130        &self,
1131        id: CatalogItemId,
1132        schema_name: &str,
1133        table_name: &str,
1134        diff: Diff,
1135    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1136        vec![BuiltinTableUpdate::row(
1137            &*MZ_SQL_SERVER_SOURCE_TABLES,
1138            Row::pack_slice(&[
1139                Datum::String(&id.to_string()),
1140                Datum::String(schema_name),
1141                Datum::String(table_name),
1142            ]),
1143            diff,
1144        )]
1145    }
1146
1147    fn pack_kafka_source_tables_update(
1148        &self,
1149        id: CatalogItemId,
1150        topic: &str,
1151        envelope: Option<&str>,
1152        key_format: Option<&str>,
1153        value_format: Option<&str>,
1154        diff: Diff,
1155    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1156        vec![BuiltinTableUpdate::row(
1157            &*MZ_KAFKA_SOURCE_TABLES,
1158            Row::pack_slice(&[
1159                Datum::String(&id.to_string()),
1160                Datum::String(topic),
1161                Datum::from(envelope),
1162                Datum::from(key_format),
1163                Datum::from(value_format),
1164            ]),
1165            diff,
1166        )]
1167    }
1168
1169    fn pack_connection_update(
1170        &self,
1171        id: CatalogItemId,
1172        oid: u32,
1173        schema_id: &SchemaSpecifier,
1174        name: &str,
1175        owner_id: &RoleId,
1176        privileges: Datum,
1177        connection: &Connection,
1178        diff: Diff,
1179    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1180        let create_stmt = mz_sql::parse::parse(&connection.create_sql)
1181            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", connection.create_sql))
1182            .into_element()
1183            .ast;
1184        let mut updates = vec![BuiltinTableUpdate::row(
1185            &*MZ_CONNECTIONS,
1186            Row::pack_slice(&[
1187                Datum::String(&id.to_string()),
1188                Datum::UInt32(oid),
1189                Datum::String(&schema_id.to_string()),
1190                Datum::String(name),
1191                Datum::String(match connection.details {
1192                    ConnectionDetails::Kafka { .. } => "kafka",
1193                    ConnectionDetails::Csr { .. } => "confluent-schema-registry",
1194                    ConnectionDetails::Postgres { .. } => "postgres",
1195                    ConnectionDetails::Aws(..) => "aws",
1196                    ConnectionDetails::AwsPrivatelink(..) => "aws-privatelink",
1197                    ConnectionDetails::Ssh { .. } => "ssh-tunnel",
1198                    ConnectionDetails::MySql { .. } => "mysql",
1199                    ConnectionDetails::SqlServer(_) => "sql-server",
1200                    ConnectionDetails::IcebergCatalog(_) => "iceberg-catalog",
1201                }),
1202                Datum::String(&owner_id.to_string()),
1203                privileges,
1204                Datum::String(&connection.create_sql),
1205                Datum::String(&create_stmt.to_ast_string_redacted()),
1206            ]),
1207            diff,
1208        )];
1209        match connection.details {
1210            ConnectionDetails::Kafka(ref kafka) => {
1211                updates.extend(self.pack_kafka_connection_update(id, kafka, diff));
1212            }
1213            ConnectionDetails::Aws(ref aws_config) => {
1214                match self.pack_aws_connection_update(id, aws_config, diff) {
1215                    Ok(update) => {
1216                        updates.push(update);
1217                    }
1218                    Err(e) => {
1219                        tracing::error!(%id, %e, "failed writing row to mz_aws_connections table");
1220                    }
1221                }
1222            }
1223            ConnectionDetails::AwsPrivatelink(_) => {
1224                if let Some(aws_principal_context) = self.aws_principal_context.as_ref() {
1225                    updates.push(self.pack_aws_privatelink_connection_update(
1226                        id,
1227                        aws_principal_context,
1228                        diff,
1229                    ));
1230                } else {
1231                    tracing::error!(%id, "missing AWS principal context; cannot write row to mz_aws_privatelink_connections table");
1232                }
1233            }
1234            ConnectionDetails::Ssh {
1235                ref key_1,
1236                ref key_2,
1237                ..
1238            } => {
1239                updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff));
1240            }
1241            ConnectionDetails::Csr(_)
1242            | ConnectionDetails::Postgres(_)
1243            | ConnectionDetails::MySql(_)
1244            | ConnectionDetails::SqlServer(_)
1245            | ConnectionDetails::IcebergCatalog(_) => (),
1246        };
1247        updates
1248    }
1249
1250    pub(crate) fn pack_ssh_tunnel_connection_update(
1251        &self,
1252        id: CatalogItemId,
1253        key_1: &SshKey,
1254        key_2: &SshKey,
1255        diff: Diff,
1256    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1257        BuiltinTableUpdate::row(
1258            &*MZ_SSH_TUNNEL_CONNECTIONS,
1259            Row::pack_slice(&[
1260                Datum::String(&id.to_string()),
1261                Datum::String(key_1.public_key().as_str()),
1262                Datum::String(key_2.public_key().as_str()),
1263            ]),
1264            diff,
1265        )
1266    }
1267
1268    fn pack_kafka_connection_update(
1269        &self,
1270        id: CatalogItemId,
1271        kafka: &KafkaConnection<ReferencedConnection>,
1272        diff: Diff,
1273    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1274        let progress_topic = kafka.progress_topic(&self.config.connection_context, id);
1275        let mut row = Row::default();
1276        row.packer()
1277            .try_push_array(
1278                &[ArrayDimension {
1279                    lower_bound: 1,
1280                    length: kafka.brokers.len(),
1281                }],
1282                kafka
1283                    .brokers
1284                    .iter()
1285                    .map(|broker| Datum::String(&broker.address)),
1286            )
1287            .expect("kafka.brokers is 1 dimensional, and its length is used for the array length");
1288        let brokers = row.unpack_first();
1289        vec![BuiltinTableUpdate::row(
1290            &*MZ_KAFKA_CONNECTIONS,
1291            Row::pack_slice(&[
1292                Datum::String(&id.to_string()),
1293                brokers,
1294                Datum::String(&progress_topic),
1295            ]),
1296            diff,
1297        )]
1298    }
1299
1300    pub fn pack_aws_privatelink_connection_update(
1301        &self,
1302        connection_id: CatalogItemId,
1303        aws_principal_context: &AwsPrincipalContext,
1304        diff: Diff,
1305    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1306        let id = &MZ_AWS_PRIVATELINK_CONNECTIONS;
1307        let row = Row::pack_slice(&[
1308            Datum::String(&connection_id.to_string()),
1309            Datum::String(&aws_principal_context.to_principal_string(connection_id)),
1310        ]);
1311        BuiltinTableUpdate::row(id, row, diff)
1312    }
1313
1314    pub fn pack_aws_connection_update(
1315        &self,
1316        connection_id: CatalogItemId,
1317        aws_config: &AwsConnection,
1318        diff: Diff,
1319    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, anyhow::Error> {
1320        let id = &MZ_AWS_CONNECTIONS;
1321
1322        let mut access_key_id = None;
1323        let mut access_key_id_secret_id = None;
1324        let mut secret_access_key_secret_id = None;
1325        let mut session_token = None;
1326        let mut session_token_secret_id = None;
1327        let mut assume_role_arn = None;
1328        let mut assume_role_session_name = None;
1329        let mut principal = None;
1330        let mut external_id = None;
1331        let mut example_trust_policy = None;
1332        match &aws_config.auth {
1333            AwsAuth::Credentials(credentials) => {
1334                match &credentials.access_key_id {
1335                    StringOrSecret::String(s) => access_key_id = Some(s.as_str()),
1336                    StringOrSecret::Secret(s) => access_key_id_secret_id = Some(s.to_string()),
1337                }
1338                secret_access_key_secret_id = Some(credentials.secret_access_key.to_string());
1339                match credentials.session_token.as_ref() {
1340                    None => (),
1341                    Some(StringOrSecret::String(s)) => session_token = Some(s.as_str()),
1342                    Some(StringOrSecret::Secret(s)) => {
1343                        session_token_secret_id = Some(s.to_string())
1344                    }
1345                }
1346            }
1347            AwsAuth::AssumeRole(assume_role) => {
1348                assume_role_arn = Some(assume_role.arn.as_str());
1349                assume_role_session_name = assume_role.session_name.as_deref();
1350                principal = self
1351                    .config
1352                    .connection_context
1353                    .aws_connection_role_arn
1354                    .as_deref();
1355                external_id =
1356                    Some(assume_role.external_id(&self.config.connection_context, connection_id)?);
1357                example_trust_policy = {
1358                    let policy = assume_role
1359                        .example_trust_policy(&self.config.connection_context, connection_id)?;
1360                    let policy = Jsonb::from_serde_json(policy).expect("valid json");
1361                    Some(policy.into_row())
1362                };
1363            }
1364        }
1365
1366        let row = Row::pack_slice(&[
1367            Datum::String(&connection_id.to_string()),
1368            Datum::from(aws_config.endpoint.as_deref()),
1369            Datum::from(aws_config.region.as_deref()),
1370            Datum::from(access_key_id),
1371            Datum::from(access_key_id_secret_id.as_deref()),
1372            Datum::from(secret_access_key_secret_id.as_deref()),
1373            Datum::from(session_token),
1374            Datum::from(session_token_secret_id.as_deref()),
1375            Datum::from(assume_role_arn),
1376            Datum::from(assume_role_session_name),
1377            Datum::from(principal),
1378            Datum::from(external_id.as_deref()),
1379            Datum::from(example_trust_policy.as_ref().map(|p| p.into_element())),
1380        ]);
1381
1382        Ok(BuiltinTableUpdate::row(id, row, diff))
1383    }
1384
1385    fn pack_view_update(
1386        &self,
1387        id: CatalogItemId,
1388        oid: u32,
1389        schema_id: &SchemaSpecifier,
1390        name: &str,
1391        owner_id: &RoleId,
1392        privileges: Datum,
1393        view: &View,
1394        diff: Diff,
1395    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1396        let create_stmt = mz_sql::parse::parse(&view.create_sql)
1397            .unwrap_or_else(|e| {
1398                panic!(
1399                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1400                    view.create_sql, e
1401                )
1402            })
1403            .into_element()
1404            .ast;
1405        let query = match &create_stmt {
1406            Statement::CreateView(stmt) => &stmt.definition.query,
1407            _ => unreachable!(),
1408        };
1409
1410        let mut query_string = query.to_ast_string_stable();
1411        // PostgreSQL appends a semicolon in `pg_views.definition`, we
1412        // do the same for compatibility's sake.
1413        query_string.push(';');
1414
1415        vec![BuiltinTableUpdate::row(
1416            &*MZ_VIEWS,
1417            Row::pack_slice(&[
1418                Datum::String(&id.to_string()),
1419                Datum::UInt32(oid),
1420                Datum::String(&schema_id.to_string()),
1421                Datum::String(name),
1422                Datum::String(&query_string),
1423                Datum::String(&owner_id.to_string()),
1424                privileges,
1425                Datum::String(&view.create_sql),
1426                Datum::String(&create_stmt.to_ast_string_redacted()),
1427            ]),
1428            diff,
1429        )]
1430    }
1431
1432    fn pack_materialized_view_update(
1433        &self,
1434        id: CatalogItemId,
1435        oid: u32,
1436        schema_id: &SchemaSpecifier,
1437        name: &str,
1438        owner_id: &RoleId,
1439        privileges: Datum,
1440        mview: &MaterializedView,
1441        diff: Diff,
1442    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1443        let create_stmt = mz_sql::parse::parse(&mview.create_sql)
1444            .unwrap_or_else(|e| {
1445                panic!(
1446                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1447                    mview.create_sql, e
1448                )
1449            })
1450            .into_element()
1451            .ast;
1452        let query_string = match &create_stmt {
1453            Statement::CreateMaterializedView(stmt) => {
1454                let mut query_string = stmt.query.to_ast_string_stable();
1455                // PostgreSQL appends a semicolon in `pg_matviews.definition`, we
1456                // do the same for compatibility's sake.
1457                query_string.push(';');
1458                query_string
1459            }
1460            _ => unreachable!(),
1461        };
1462
1463        let mut updates = Vec::new();
1464
1465        updates.push(BuiltinTableUpdate::row(
1466            &*MZ_MATERIALIZED_VIEWS,
1467            Row::pack_slice(&[
1468                Datum::String(&id.to_string()),
1469                Datum::UInt32(oid),
1470                Datum::String(&schema_id.to_string()),
1471                Datum::String(name),
1472                Datum::String(&mview.cluster_id.to_string()),
1473                Datum::String(&query_string),
1474                Datum::String(&owner_id.to_string()),
1475                privileges,
1476                Datum::String(&mview.create_sql),
1477                Datum::String(&create_stmt.to_ast_string_redacted()),
1478            ]),
1479            diff,
1480        ));
1481
1482        if let Some(refresh_schedule) = &mview.refresh_schedule {
1483            // This can't be `ON COMMIT`, because that is represented by a `None` instead of an
1484            // empty `RefreshSchedule`.
1485            assert!(!refresh_schedule.is_empty());
1486            for RefreshEvery {
1487                interval,
1488                aligned_to,
1489            } in refresh_schedule.everies.iter()
1490            {
1491                let aligned_to_dt = mz_ore::now::to_datetime(
1492                    <&Timestamp as TryInto<u64>>::try_into(aligned_to).expect("undoes planning"),
1493                );
1494                updates.push(BuiltinTableUpdate::row(
1495                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1496                    Row::pack_slice(&[
1497                        Datum::String(&id.to_string()),
1498                        Datum::String("every"),
1499                        Datum::Interval(
1500                            Interval::from_duration(interval).expect(
1501                                "planning ensured that this is convertible back to Interval",
1502                            ),
1503                        ),
1504                        Datum::TimestampTz(aligned_to_dt.try_into().expect("undoes planning")),
1505                        Datum::Null,
1506                    ]),
1507                    diff,
1508                ));
1509            }
1510            for at in refresh_schedule.ats.iter() {
1511                let at_dt = mz_ore::now::to_datetime(
1512                    <&Timestamp as TryInto<u64>>::try_into(at).expect("undoes planning"),
1513                );
1514                updates.push(BuiltinTableUpdate::row(
1515                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1516                    Row::pack_slice(&[
1517                        Datum::String(&id.to_string()),
1518                        Datum::String("at"),
1519                        Datum::Null,
1520                        Datum::Null,
1521                        Datum::TimestampTz(at_dt.try_into().expect("undoes planning")),
1522                    ]),
1523                    diff,
1524                ));
1525            }
1526        } else {
1527            updates.push(BuiltinTableUpdate::row(
1528                &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1529                Row::pack_slice(&[
1530                    Datum::String(&id.to_string()),
1531                    Datum::String("on-commit"),
1532                    Datum::Null,
1533                    Datum::Null,
1534                    Datum::Null,
1535                ]),
1536                diff,
1537            ));
1538        }
1539
1540        if let Some(target_id) = mview.replacement_target {
1541            updates.push(BuiltinTableUpdate::row(
1542                &*MZ_REPLACEMENTS,
1543                Row::pack_slice(&[
1544                    Datum::String(&id.to_string()),
1545                    Datum::String(&target_id.to_string()),
1546                ]),
1547                diff,
1548            ));
1549        }
1550
1551        updates
1552    }
1553
1554    fn pack_continual_task_update(
1555        &self,
1556        id: CatalogItemId,
1557        oid: u32,
1558        schema_id: &SchemaSpecifier,
1559        name: &str,
1560        owner_id: &RoleId,
1561        privileges: Datum,
1562        ct: &ContinualTask,
1563        diff: Diff,
1564    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1565        let create_stmt = mz_sql::parse::parse(&ct.create_sql)
1566            .unwrap_or_else(|e| {
1567                panic!(
1568                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1569                    ct.create_sql, e
1570                )
1571            })
1572            .into_element()
1573            .ast;
1574        let query_string = match &create_stmt {
1575            Statement::CreateContinualTask(stmt) => {
1576                let mut query_string = String::new();
1577                for stmt in &stmt.stmts {
1578                    let s = match stmt {
1579                        ContinualTaskStmt::Insert(stmt) => stmt.to_ast_string_stable(),
1580                        ContinualTaskStmt::Delete(stmt) => stmt.to_ast_string_stable(),
1581                    };
1582                    if query_string.is_empty() {
1583                        query_string = s;
1584                    } else {
1585                        query_string.push_str("; ");
1586                        query_string.push_str(&s);
1587                    }
1588                }
1589                query_string
1590            }
1591            _ => unreachable!(),
1592        };
1593
1594        vec![BuiltinTableUpdate::row(
1595            &*MZ_CONTINUAL_TASKS,
1596            Row::pack_slice(&[
1597                Datum::String(&id.to_string()),
1598                Datum::UInt32(oid),
1599                Datum::String(&schema_id.to_string()),
1600                Datum::String(name),
1601                Datum::String(&ct.cluster_id.to_string()),
1602                Datum::String(&query_string),
1603                Datum::String(&owner_id.to_string()),
1604                privileges,
1605                Datum::String(&ct.create_sql),
1606                Datum::String(&create_stmt.to_ast_string_redacted()),
1607            ]),
1608            diff,
1609        )]
1610    }
1611
1612    fn pack_sink_update(
1613        &self,
1614        id: CatalogItemId,
1615        oid: u32,
1616        schema_id: &SchemaSpecifier,
1617        name: &str,
1618        owner_id: &RoleId,
1619        sink: &Sink,
1620        diff: Diff,
1621    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1622        let mut updates = vec![];
1623        match &sink.connection {
1624            StorageSinkConnection::Kafka(KafkaSinkConnection {
1625                topic: topic_name, ..
1626            }) => {
1627                updates.push(BuiltinTableUpdate::row(
1628                    &*MZ_KAFKA_SINKS,
1629                    Row::pack_slice(&[
1630                        Datum::String(&id.to_string()),
1631                        Datum::String(topic_name.as_str()),
1632                    ]),
1633                    diff,
1634                ));
1635            }
1636            StorageSinkConnection::Iceberg(IcebergSinkConnection {
1637                namespace, table, ..
1638            }) => {
1639                updates.push(BuiltinTableUpdate::row(
1640                    &*MZ_ICEBERG_SINKS,
1641                    Row::pack_slice(&[
1642                        Datum::String(&id.to_string()),
1643                        Datum::String(namespace.as_str()),
1644                        Datum::String(table.as_str()),
1645                    ]),
1646                    diff,
1647                ));
1648            }
1649        };
1650
1651        let create_stmt = mz_sql::parse::parse(&sink.create_sql)
1652            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", sink.create_sql))
1653            .into_element()
1654            .ast;
1655
1656        let envelope = sink.envelope();
1657
1658        // The combined format string is used for the deprecated `format` column.
1659        let combined_format = sink.combined_format();
1660        let (key_format, value_format) = match sink.formats() {
1661            Some((key_format, value_format)) => (key_format, Some(value_format)),
1662            None => (None, None),
1663        };
1664
1665        updates.push(BuiltinTableUpdate::row(
1666            &*MZ_SINKS,
1667            Row::pack_slice(&[
1668                Datum::String(&id.to_string()),
1669                Datum::UInt32(oid),
1670                Datum::String(&schema_id.to_string()),
1671                Datum::String(name),
1672                Datum::String(sink.connection.name()),
1673                Datum::from(sink.connection_id().map(|id| id.to_string()).as_deref()),
1674                // size column now deprecated w/o linked clusters
1675                Datum::Null,
1676                Datum::from(envelope),
1677                // FIXME: These key/value formats are kinda leaky! Should probably live in
1678                // the kafka sink table.
1679                Datum::from(combined_format.as_ref().map(|f| f.as_ref())),
1680                Datum::from(key_format),
1681                Datum::from(value_format),
1682                Datum::String(&sink.cluster_id.to_string()),
1683                Datum::String(&owner_id.to_string()),
1684                Datum::String(&sink.create_sql),
1685                Datum::String(&create_stmt.to_ast_string_redacted()),
1686            ]),
1687            diff,
1688        ));
1689
1690        updates
1691    }
1692
1693    fn pack_index_update(
1694        &self,
1695        id: CatalogItemId,
1696        oid: u32,
1697        name: &str,
1698        owner_id: &RoleId,
1699        index: &Index,
1700        diff: Diff,
1701    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1702        let mut updates = vec![];
1703
1704        let create_stmt = mz_sql::parse::parse(&index.create_sql)
1705            .unwrap_or_else(|e| {
1706                panic!(
1707                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1708                    index.create_sql, e
1709                )
1710            })
1711            .into_element()
1712            .ast;
1713
1714        let key_sqls = match &create_stmt {
1715            Statement::CreateIndex(CreateIndexStatement { key_parts, .. }) => key_parts
1716                .as_ref()
1717                .expect("key_parts is filled in during planning"),
1718            _ => unreachable!(),
1719        };
1720        let on_item_id = self.get_entry_by_global_id(&index.on).id();
1721
1722        updates.push(BuiltinTableUpdate::row(
1723            &*MZ_INDEXES,
1724            Row::pack_slice(&[
1725                Datum::String(&id.to_string()),
1726                Datum::UInt32(oid),
1727                Datum::String(name),
1728                Datum::String(&on_item_id.to_string()),
1729                Datum::String(&index.cluster_id.to_string()),
1730                Datum::String(&owner_id.to_string()),
1731                Datum::String(&index.create_sql),
1732                Datum::String(&create_stmt.to_ast_string_redacted()),
1733            ]),
1734            diff,
1735        ));
1736
1737        for (i, key) in index.keys.iter().enumerate() {
1738            let on_entry = self.get_entry_by_global_id(&index.on);
1739            let on_desc = on_entry
1740                .relation_desc()
1741                .expect("can only create indexes on items with a valid description");
1742            let nullable = key.typ(&on_desc.typ().column_types).nullable;
1743            let seq_in_index = u64::cast_from(i + 1);
1744            let key_sql = key_sqls
1745                .get(i)
1746                .expect("missing sql information for index key")
1747                .to_ast_string_simple();
1748            let (field_number, expression) = match key {
1749                MirScalarExpr::Column(col, _) => {
1750                    (Datum::UInt64(u64::cast_from(*col + 1)), Datum::Null)
1751                }
1752                _ => (Datum::Null, Datum::String(&key_sql)),
1753            };
1754            updates.push(BuiltinTableUpdate::row(
1755                &*MZ_INDEX_COLUMNS,
1756                Row::pack_slice(&[
1757                    Datum::String(&id.to_string()),
1758                    Datum::UInt64(seq_in_index),
1759                    field_number,
1760                    expression,
1761                    Datum::from(nullable),
1762                ]),
1763                diff,
1764            ));
1765        }
1766
1767        updates
1768    }
1769
1770    fn pack_type_update(
1771        &self,
1772        id: CatalogItemId,
1773        oid: u32,
1774        schema_id: &SchemaSpecifier,
1775        name: &str,
1776        owner_id: &RoleId,
1777        privileges: Datum,
1778        typ: &Type,
1779        diff: Diff,
1780    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1781        let mut out = vec![];
1782
1783        let redacted = typ.create_sql.as_ref().map(|create_sql| {
1784            mz_sql::parse::parse(create_sql)
1785                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
1786                .into_element()
1787                .ast
1788                .to_ast_string_redacted()
1789        });
1790
1791        out.push(BuiltinTableUpdate::row(
1792            &*MZ_TYPES,
1793            Row::pack_slice(&[
1794                Datum::String(&id.to_string()),
1795                Datum::UInt32(oid),
1796                Datum::String(&schema_id.to_string()),
1797                Datum::String(name),
1798                Datum::String(&TypeCategory::from_catalog_type(&typ.details.typ).to_string()),
1799                Datum::String(&owner_id.to_string()),
1800                privileges,
1801                if let Some(create_sql) = &typ.create_sql {
1802                    Datum::String(create_sql)
1803                } else {
1804                    Datum::Null
1805                },
1806                if let Some(redacted) = &redacted {
1807                    Datum::String(redacted)
1808                } else {
1809                    Datum::Null
1810                },
1811            ]),
1812            diff,
1813        ));
1814
1815        let mut row = Row::default();
1816        let mut packer = row.packer();
1817
1818        fn append_modifier(packer: &mut RowPacker<'_>, mods: &[i64]) {
1819            if mods.is_empty() {
1820                packer.push(Datum::Null);
1821            } else {
1822                packer.push_list(mods.iter().map(|m| Datum::Int64(*m)));
1823            }
1824        }
1825
1826        let index_id = match &typ.details.typ {
1827            CatalogType::Array {
1828                element_reference: element_id,
1829            } => {
1830                packer.push(Datum::String(&id.to_string()));
1831                packer.push(Datum::String(&element_id.to_string()));
1832                &MZ_ARRAY_TYPES
1833            }
1834            CatalogType::List {
1835                element_reference: element_id,
1836                element_modifiers,
1837            } => {
1838                packer.push(Datum::String(&id.to_string()));
1839                packer.push(Datum::String(&element_id.to_string()));
1840                append_modifier(&mut packer, element_modifiers);
1841                &MZ_LIST_TYPES
1842            }
1843            CatalogType::Map {
1844                key_reference: key_id,
1845                value_reference: value_id,
1846                key_modifiers,
1847                value_modifiers,
1848            } => {
1849                packer.push(Datum::String(&id.to_string()));
1850                packer.push(Datum::String(&key_id.to_string()));
1851                packer.push(Datum::String(&value_id.to_string()));
1852                append_modifier(&mut packer, key_modifiers);
1853                append_modifier(&mut packer, value_modifiers);
1854                &MZ_MAP_TYPES
1855            }
1856            CatalogType::Pseudo => {
1857                packer.push(Datum::String(&id.to_string()));
1858                &MZ_PSEUDO_TYPES
1859            }
1860            _ => {
1861                packer.push(Datum::String(&id.to_string()));
1862                &MZ_BASE_TYPES
1863            }
1864        };
1865        out.push(BuiltinTableUpdate::row(index_id, row, diff));
1866
1867        if let Some(pg_metadata) = &typ.details.pg_metadata {
1868            out.push(BuiltinTableUpdate::row(
1869                &*MZ_TYPE_PG_METADATA,
1870                Row::pack_slice(&[
1871                    Datum::String(&id.to_string()),
1872                    Datum::UInt32(pg_metadata.typinput_oid),
1873                    Datum::UInt32(pg_metadata.typreceive_oid),
1874                ]),
1875                diff,
1876            ));
1877        }
1878
1879        out
1880    }
1881
1882    fn pack_func_update(
1883        &self,
1884        id: CatalogItemId,
1885        schema_id: &SchemaSpecifier,
1886        name: &str,
1887        owner_id: &RoleId,
1888        func: &Func,
1889        diff: Diff,
1890    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1891        let mut updates = vec![];
1892        for func_impl_details in func.inner.func_impls() {
1893            let arg_type_ids = func_impl_details
1894                .arg_typs
1895                .iter()
1896                .map(|typ| self.get_system_type(typ).id().to_string())
1897                .collect::<Vec<_>>();
1898
1899            let mut row = Row::default();
1900            row.packer()
1901                .try_push_array(
1902                    &[ArrayDimension {
1903                        lower_bound: 1,
1904                        length: arg_type_ids.len(),
1905                    }],
1906                    arg_type_ids.iter().map(|id| Datum::String(id)),
1907                )
1908                .expect(
1909                    "arg_type_ids is 1 dimensional, and its length is used for the array length",
1910                );
1911            let arg_type_ids = row.unpack_first();
1912
1913            updates.push(BuiltinTableUpdate::row(
1914                &*MZ_FUNCTIONS,
1915                Row::pack_slice(&[
1916                    Datum::String(&id.to_string()),
1917                    Datum::UInt32(func_impl_details.oid),
1918                    Datum::String(&schema_id.to_string()),
1919                    Datum::String(name),
1920                    arg_type_ids,
1921                    Datum::from(
1922                        func_impl_details
1923                            .variadic_typ
1924                            .map(|typ| self.get_system_type(typ).id().to_string())
1925                            .as_deref(),
1926                    ),
1927                    Datum::from(
1928                        func_impl_details
1929                            .return_typ
1930                            .map(|typ| self.get_system_type(typ).id().to_string())
1931                            .as_deref(),
1932                    ),
1933                    func_impl_details.return_is_set.into(),
1934                    Datum::String(&owner_id.to_string()),
1935                ]),
1936                diff,
1937            ));
1938
1939            if let mz_sql::func::Func::Aggregate(_) = func.inner {
1940                updates.push(BuiltinTableUpdate::row(
1941                    &*MZ_AGGREGATES,
1942                    Row::pack_slice(&[
1943                        Datum::UInt32(func_impl_details.oid),
1944                        // TODO(database-issues#1064): Support ordered-set aggregate functions.
1945                        Datum::String("n"),
1946                        Datum::Int16(0),
1947                    ]),
1948                    diff,
1949                ));
1950            }
1951        }
1952        updates
1953    }
1954
1955    pub fn pack_op_update(
1956        &self,
1957        operator: &str,
1958        func_impl_details: FuncImplCatalogDetails,
1959        diff: Diff,
1960    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1961        let arg_type_ids = func_impl_details
1962            .arg_typs
1963            .iter()
1964            .map(|typ| self.get_system_type(typ).id().to_string())
1965            .collect::<Vec<_>>();
1966
1967        let mut row = Row::default();
1968        row.packer()
1969            .try_push_array(
1970                &[ArrayDimension {
1971                    lower_bound: 1,
1972                    length: arg_type_ids.len(),
1973                }],
1974                arg_type_ids.iter().map(|id| Datum::String(id)),
1975            )
1976            .expect("arg_type_ids is 1 dimensional, and its length is used for the array length");
1977        let arg_type_ids = row.unpack_first();
1978
1979        BuiltinTableUpdate::row(
1980            &*MZ_OPERATORS,
1981            Row::pack_slice(&[
1982                Datum::UInt32(func_impl_details.oid),
1983                Datum::String(operator),
1984                arg_type_ids,
1985                Datum::from(
1986                    func_impl_details
1987                        .return_typ
1988                        .map(|typ| self.get_system_type(typ).id().to_string())
1989                        .as_deref(),
1990                ),
1991            ]),
1992            diff,
1993        )
1994    }
1995
1996    fn pack_secret_update(
1997        &self,
1998        id: CatalogItemId,
1999        oid: u32,
2000        schema_id: &SchemaSpecifier,
2001        name: &str,
2002        owner_id: &RoleId,
2003        privileges: Datum,
2004        diff: Diff,
2005    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2006        vec![BuiltinTableUpdate::row(
2007            &*MZ_SECRETS,
2008            Row::pack_slice(&[
2009                Datum::String(&id.to_string()),
2010                Datum::UInt32(oid),
2011                Datum::String(&schema_id.to_string()),
2012                Datum::String(name),
2013                Datum::String(&owner_id.to_string()),
2014                privileges,
2015            ]),
2016            diff,
2017        )]
2018    }
2019
2020    pub fn pack_audit_log_update(
2021        &self,
2022        event: &VersionedEvent,
2023        diff: Diff,
2024    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
2025        let (event_type, object_type, details, user, occurred_at): (
2026            &EventType,
2027            &ObjectType,
2028            &EventDetails,
2029            &Option<String>,
2030            u64,
2031        ) = match event {
2032            VersionedEvent::V1(ev) => (
2033                &ev.event_type,
2034                &ev.object_type,
2035                &ev.details,
2036                &ev.user,
2037                ev.occurred_at,
2038            ),
2039        };
2040        let details = Jsonb::from_serde_json(details.as_json())
2041            .map_err(|e| {
2042                Error::new(ErrorKind::Unstructured(format!(
2043                    "could not pack audit log update: {}",
2044                    e
2045                )))
2046            })?
2047            .into_row();
2048        let details = details
2049            .iter()
2050            .next()
2051            .expect("details created above with a single jsonb column");
2052        let dt = mz_ore::now::to_datetime(occurred_at);
2053        let id = event.sortable_id();
2054        Ok(BuiltinTableUpdate::row(
2055            &*MZ_AUDIT_EVENTS,
2056            Row::pack_slice(&[
2057                Datum::UInt64(id),
2058                Datum::String(&format!("{}", event_type)),
2059                Datum::String(&format!("{}", object_type)),
2060                details,
2061                match user {
2062                    Some(user) => Datum::String(user),
2063                    None => Datum::Null,
2064                },
2065                Datum::TimestampTz(dt.try_into().expect("must fit")),
2066            ]),
2067            diff,
2068        ))
2069    }
2070
2071    pub fn pack_storage_usage_update(
2072        &self,
2073        VersionedStorageUsage::V1(event): VersionedStorageUsage,
2074        diff: Diff,
2075    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2076        let id = &MZ_STORAGE_USAGE_BY_SHARD;
2077        let row = Row::pack_slice(&[
2078            Datum::UInt64(event.id),
2079            Datum::from(event.shard_id.as_deref()),
2080            Datum::UInt64(event.size_bytes),
2081            Datum::TimestampTz(
2082                mz_ore::now::to_datetime(event.collection_timestamp)
2083                    .try_into()
2084                    .expect("must fit"),
2085            ),
2086        ]);
2087        BuiltinTableUpdate::row(id, row, diff)
2088    }
2089
2090    pub fn pack_egress_ip_update(
2091        &self,
2092        ip: &IpNet,
2093    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
2094        let id = &MZ_EGRESS_IPS;
2095        let addr = ip.addr();
2096        let row = Row::pack_slice(&[
2097            Datum::String(&addr.to_string()),
2098            Datum::Int32(ip.prefix_len().into()),
2099            Datum::String(&format!("{}/{}", addr, ip.prefix_len())),
2100        ]);
2101        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
2102    }
2103
2104    pub fn pack_license_key_update(
2105        &self,
2106        license_key: &ValidatedLicenseKey,
2107    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
2108        let id = &MZ_LICENSE_KEYS;
2109        let row = Row::pack_slice(&[
2110            Datum::String(&license_key.id),
2111            Datum::String(&license_key.organization),
2112            Datum::String(&license_key.environment_id),
2113            Datum::TimestampTz(
2114                mz_ore::now::to_datetime(license_key.expiration * 1000)
2115                    .try_into()
2116                    .expect("must fit"),
2117            ),
2118            Datum::TimestampTz(
2119                mz_ore::now::to_datetime(license_key.not_before * 1000)
2120                    .try_into()
2121                    .expect("must fit"),
2122            ),
2123        ]);
2124        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
2125    }
2126
2127    pub fn pack_all_replica_size_updates(&self) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2128        let mut updates = Vec::new();
2129        for (size, alloc) in &self.cluster_replica_sizes.0 {
2130            if alloc.disabled {
2131                continue;
2132            }
2133
2134            // Just invent something when the limits are `None`, which only happens in non-prod
2135            // environments (tests, process orchestrator, etc.)
2136            let cpu_limit = alloc.cpu_limit.unwrap_or(CpuLimit::MAX);
2137            let MemoryLimit(ByteSize(memory_bytes)) =
2138                (alloc.memory_limit).unwrap_or(MemoryLimit::MAX);
2139            let DiskLimit(ByteSize(disk_bytes)) =
2140                (alloc.disk_limit).unwrap_or(DiskLimit::ARBITRARY);
2141
2142            let row = Row::pack_slice(&[
2143                size.as_str().into(),
2144                u64::cast_from(alloc.scale).into(),
2145                u64::cast_from(alloc.workers).into(),
2146                cpu_limit.as_nanocpus().into(),
2147                memory_bytes.into(),
2148                disk_bytes.into(),
2149                (alloc.credits_per_hour).into(),
2150            ]);
2151
2152            updates.push(BuiltinTableUpdate::row(
2153                &*MZ_CLUSTER_REPLICA_SIZES,
2154                row,
2155                Diff::ONE,
2156            ));
2157        }
2158
2159        updates
2160    }
2161
2162    pub fn pack_subscribe_update(
2163        &self,
2164        id: GlobalId,
2165        subscribe: &ActiveSubscribe,
2166        diff: Diff,
2167    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2168        let mut row = Row::default();
2169        let mut packer = row.packer();
2170        packer.push(Datum::String(&id.to_string()));
2171        packer.push(Datum::Uuid(subscribe.session_uuid));
2172        packer.push(Datum::String(&subscribe.cluster_id.to_string()));
2173
2174        let start_dt = mz_ore::now::to_datetime(subscribe.start_time);
2175        packer.push(Datum::TimestampTz(start_dt.try_into().expect("must fit")));
2176
2177        let depends_on: Vec<_> = subscribe
2178            .depends_on
2179            .iter()
2180            .map(|id| id.to_string())
2181            .collect();
2182        packer.push_list(depends_on.iter().map(|s| Datum::String(s)));
2183
2184        BuiltinTableUpdate::row(&*MZ_SUBSCRIPTIONS, row, diff)
2185    }
2186
2187    pub fn pack_session_update(
2188        &self,
2189        conn: &ConnMeta,
2190        diff: Diff,
2191    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2192        let connect_dt = mz_ore::now::to_datetime(conn.connected_at());
2193        BuiltinTableUpdate::row(
2194            &*MZ_SESSIONS,
2195            Row::pack_slice(&[
2196                Datum::Uuid(conn.uuid()),
2197                Datum::UInt32(conn.conn_id().unhandled()),
2198                Datum::String(&conn.authenticated_role_id().to_string()),
2199                Datum::from(conn.client_ip().map(|ip| ip.to_string()).as_deref()),
2200                Datum::TimestampTz(connect_dt.try_into().expect("must fit")),
2201            ]),
2202            diff,
2203        )
2204    }
2205
2206    pub fn pack_default_privileges_update(
2207        &self,
2208        default_privilege_object: &DefaultPrivilegeObject,
2209        grantee: &RoleId,
2210        acl_mode: &AclMode,
2211        diff: Diff,
2212    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2213        BuiltinTableUpdate::row(
2214            &*MZ_DEFAULT_PRIVILEGES,
2215            Row::pack_slice(&[
2216                default_privilege_object.role_id.to_string().as_str().into(),
2217                default_privilege_object
2218                    .database_id
2219                    .map(|database_id| database_id.to_string())
2220                    .as_deref()
2221                    .into(),
2222                default_privilege_object
2223                    .schema_id
2224                    .map(|schema_id| schema_id.to_string())
2225                    .as_deref()
2226                    .into(),
2227                default_privilege_object
2228                    .object_type
2229                    .to_string()
2230                    .to_lowercase()
2231                    .as_str()
2232                    .into(),
2233                grantee.to_string().as_str().into(),
2234                acl_mode.to_string().as_str().into(),
2235            ]),
2236            diff,
2237        )
2238    }
2239
2240    pub fn pack_system_privileges_update(
2241        &self,
2242        privileges: MzAclItem,
2243        diff: Diff,
2244    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2245        BuiltinTableUpdate::row(
2246            &*MZ_SYSTEM_PRIVILEGES,
2247            Row::pack_slice(&[privileges.into()]),
2248            diff,
2249        )
2250    }
2251
2252    fn pack_privilege_array_row(&self, privileges: &PrivilegeMap) -> Row {
2253        let mut row = Row::default();
2254        let flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2255        row.packer()
2256            .try_push_array(
2257                &[ArrayDimension {
2258                    lower_bound: 1,
2259                    length: flat_privileges.len(),
2260                }],
2261                flat_privileges
2262                    .into_iter()
2263                    .map(|mz_acl_item| Datum::MzAclItem(mz_acl_item.clone())),
2264            )
2265            .expect("privileges is 1 dimensional, and its length is used for the array length");
2266        row
2267    }
2268
2269    pub fn pack_comment_update(
2270        &self,
2271        object_id: CommentObjectId,
2272        column_pos: Option<usize>,
2273        comment: &str,
2274        diff: Diff,
2275    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2276        // Use the audit log representation so it's easier to join against.
2277        let object_type = mz_sql::catalog::ObjectType::from(object_id);
2278        let audit_type = super::object_type_to_audit_object_type(object_type);
2279        let object_type_str = audit_type.to_string();
2280
2281        let object_id_str = match object_id {
2282            CommentObjectId::Table(global_id)
2283            | CommentObjectId::View(global_id)
2284            | CommentObjectId::MaterializedView(global_id)
2285            | CommentObjectId::Source(global_id)
2286            | CommentObjectId::Sink(global_id)
2287            | CommentObjectId::Index(global_id)
2288            | CommentObjectId::Func(global_id)
2289            | CommentObjectId::Connection(global_id)
2290            | CommentObjectId::Secret(global_id)
2291            | CommentObjectId::Type(global_id)
2292            | CommentObjectId::ContinualTask(global_id) => global_id.to_string(),
2293            CommentObjectId::Role(role_id) => role_id.to_string(),
2294            CommentObjectId::Database(database_id) => database_id.to_string(),
2295            CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
2296            CommentObjectId::Cluster(cluster_id) => cluster_id.to_string(),
2297            CommentObjectId::ClusterReplica((_, replica_id)) => replica_id.to_string(),
2298            CommentObjectId::NetworkPolicy(network_policy_id) => network_policy_id.to_string(),
2299        };
2300        let column_pos_datum = match column_pos {
2301            Some(pos) => {
2302                // TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
2303                let pos =
2304                    i32::try_from(pos).expect("we constrain this value in the planning layer");
2305                Datum::Int32(pos)
2306            }
2307            None => Datum::Null,
2308        };
2309
2310        BuiltinTableUpdate::row(
2311            &*MZ_COMMENTS,
2312            Row::pack_slice(&[
2313                Datum::String(&object_id_str),
2314                Datum::String(&object_type_str),
2315                column_pos_datum,
2316                Datum::String(comment),
2317            ]),
2318            diff,
2319        )
2320    }
2321
2322    pub fn pack_webhook_source_update(
2323        &self,
2324        item_id: CatalogItemId,
2325        diff: Diff,
2326    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2327        let url = self
2328            .try_get_webhook_url(&item_id)
2329            .expect("webhook source should exist");
2330        let url = url.to_string();
2331        let name = &self.get_entry(&item_id).name().item;
2332        let id_str = item_id.to_string();
2333
2334        BuiltinTableUpdate::row(
2335            &*MZ_WEBHOOKS_SOURCES,
2336            Row::pack_slice(&[
2337                Datum::String(&id_str),
2338                Datum::String(name),
2339                Datum::String(&url),
2340            ]),
2341            diff,
2342        )
2343    }
2344
2345    pub fn pack_source_references_update(
2346        &self,
2347        source_references: &SourceReferences,
2348        diff: Diff,
2349    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2350        let source_id = source_references.source_id.to_string();
2351        let updated_at = &source_references.updated_at;
2352        source_references
2353            .references
2354            .iter()
2355            .map(|reference| {
2356                let mut row = Row::default();
2357                let mut packer = row.packer();
2358                packer.extend([
2359                    Datum::String(&source_id),
2360                    reference
2361                        .namespace
2362                        .as_ref()
2363                        .map(|s| Datum::String(s))
2364                        .unwrap_or(Datum::Null),
2365                    Datum::String(&reference.name),
2366                    Datum::TimestampTz(
2367                        mz_ore::now::to_datetime(*updated_at)
2368                            .try_into()
2369                            .expect("must fit"),
2370                    ),
2371                ]);
2372                if reference.columns.len() > 0 {
2373                    packer
2374                        .try_push_array(
2375                            &[ArrayDimension {
2376                                lower_bound: 1,
2377                                length: reference.columns.len(),
2378                            }],
2379                            reference.columns.iter().map(|col| Datum::String(col)),
2380                        )
2381                        .expect(
2382                            "columns is 1 dimensional, and its length is used for the array length",
2383                        );
2384                } else {
2385                    packer.push(Datum::Null);
2386                }
2387
2388                BuiltinTableUpdate::row(&*MZ_SOURCE_REFERENCES, row, diff)
2389            })
2390            .collect()
2391    }
2392}