mz_adapter/catalog/
builtin_table_updates.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10mod notice;
11
12use bytesize::ByteSize;
13use ipnet::IpNet;
14use mz_adapter_types::compaction::CompactionWindow;
15use mz_adapter_types::dyncfgs::ENABLE_PASSWORD_AUTH;
16use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
17use mz_catalog::SYSTEM_CONN_ID;
18use mz_catalog::builtin::{
19    BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
20    MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICAS,
21    MZ_CLUSTER_SCHEDULES, MZ_CLUSTER_WORKLOAD_CLASSES, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS,
22    MZ_CONNECTIONS, MZ_CONTINUAL_TASKS, MZ_DATABASES, MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS,
23    MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES, MZ_ICEBERG_SINKS, MZ_INDEX_COLUMNS, MZ_INDEXES,
24    MZ_INTERNAL_CLUSTER_REPLICAS, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCE_TABLES,
25    MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES,
26    MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MATERIALIZED_VIEWS, MZ_MYSQL_SOURCE_TABLES,
27    MZ_NETWORK_POLICIES, MZ_NETWORK_POLICY_RULES, MZ_OBJECT_DEPENDENCIES, MZ_OBJECT_GLOBAL_IDS,
28    MZ_OPERATORS, MZ_PENDING_CLUSTER_REPLICAS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES,
29    MZ_PSEUDO_TYPES, MZ_ROLE_AUTH, MZ_ROLE_MEMBERS, MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SCHEMAS,
30    MZ_SECRETS, MZ_SESSIONS, MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SOURCES,
31    MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD,
32    MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS,
33    MZ_WEBHOOKS_SOURCES,
34};
35use mz_catalog::config::AwsPrincipalContext;
36use mz_catalog::durable::SourceReferences;
37use mz_catalog::memory::error::{Error, ErrorKind};
38use mz_catalog::memory::objects::{
39    CatalogEntry, CatalogItem, ClusterVariant, Connection, ContinualTask, DataSourceDesc, Func,
40    Index, MaterializedView, Sink, Table, TableDataSource, Type, View,
41};
42use mz_controller::clusters::{
43    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
44};
45use mz_controller_types::ClusterId;
46use mz_expr::MirScalarExpr;
47use mz_license_keys::ValidatedLicenseKey;
48use mz_orchestrator::{CpuLimit, DiskLimit, MemoryLimit};
49use mz_ore::cast::CastFrom;
50use mz_ore::collections::CollectionExt;
51use mz_persist_client::batch::ProtoBatch;
52use mz_repr::adt::array::ArrayDimension;
53use mz_repr::adt::interval::Interval;
54use mz_repr::adt::jsonb::Jsonb;
55use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
56use mz_repr::adt::regex;
57use mz_repr::network_policy_id::NetworkPolicyId;
58use mz_repr::refresh_schedule::RefreshEvery;
59use mz_repr::role_id::RoleId;
60use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, Row, RowPacker, SqlScalarType, Timestamp};
61use mz_sql::ast::{ContinualTaskStmt, CreateIndexStatement, Statement, UnresolvedItemName};
62use mz_sql::catalog::{
63    CatalogCluster, CatalogDatabase, CatalogSchema, CatalogType, DefaultPrivilegeObject,
64    TypeCategory,
65};
66use mz_sql::func::FuncImplCatalogDetails;
67use mz_sql::names::{
68    CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier,
69};
70use mz_sql::plan::{ClusterSchedule, ConnectionDetails, SshKey};
71use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
72use mz_sql::session::vars::SessionVars;
73use mz_sql_parser::ast::display::AstDisplay;
74use mz_storage_client::client::TableData;
75use mz_storage_types::connections::KafkaConnection;
76use mz_storage_types::connections::aws::{AwsAuth, AwsConnection};
77use mz_storage_types::connections::inline::ReferencedConnection;
78use mz_storage_types::connections::string_or_secret::StringOrSecret;
79use mz_storage_types::sinks::{IcebergSinkConnection, KafkaSinkConnection, StorageSinkConnection};
80use mz_storage_types::sources::{
81    GenericSourceConnection, KafkaSourceConnection, PostgresSourceConnection, SourceConnection,
82};
83use smallvec::smallvec;
84
85// DO NOT add any more imports from `crate` outside of `crate::catalog`.
86use crate::active_compute_sink::ActiveSubscribe;
87use crate::catalog::CatalogState;
88use crate::coord::ConnMeta;
89
/// An update to a built-in table.
///
/// `T` is the handle used to identify the target table. It defaults to
/// [`CatalogItemId`]; the packing methods on `CatalogState` in this file
/// produce updates keyed by `&'static BuiltinTable`, which
/// `CatalogState::resolve_builtin_table_update` converts into the
/// [`CatalogItemId`]-keyed form.
#[derive(Debug, Clone)]
pub struct BuiltinTableUpdate<T = CatalogItemId> {
    /// The reference of the table to update.
    pub id: T,
    /// The data to put into the table.
    pub data: TableData,
}
98
99impl<T> BuiltinTableUpdate<T> {
100    /// Create a [`BuiltinTableUpdate`] from a [`Row`].
101    pub fn row(id: T, row: Row, diff: Diff) -> BuiltinTableUpdate<T> {
102        BuiltinTableUpdate {
103            id,
104            data: TableData::Rows(vec![(row, diff)]),
105        }
106    }
107
108    pub fn batch(id: T, batch: ProtoBatch) -> BuiltinTableUpdate<T> {
109        BuiltinTableUpdate {
110            id,
111            data: TableData::Batches(smallvec![batch]),
112        }
113    }
114}
115
116impl CatalogState {
117    pub fn resolve_builtin_table_updates(
118        &self,
119        builtin_table_update: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
120    ) -> Vec<BuiltinTableUpdate<CatalogItemId>> {
121        builtin_table_update
122            .into_iter()
123            .map(|builtin_table_update| self.resolve_builtin_table_update(builtin_table_update))
124            .collect()
125    }
126
127    pub fn resolve_builtin_table_update(
128        &self,
129        BuiltinTableUpdate { id, data }: BuiltinTableUpdate<&'static BuiltinTable>,
130    ) -> BuiltinTableUpdate<CatalogItemId> {
131        let id = self.resolve_builtin_table(id);
132        BuiltinTableUpdate { id, data }
133    }
134
135    pub fn pack_depends_update(
136        &self,
137        depender: CatalogItemId,
138        dependee: CatalogItemId,
139        diff: Diff,
140    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
141        let row = Row::pack_slice(&[
142            Datum::String(&depender.to_string()),
143            Datum::String(&dependee.to_string()),
144        ]);
145        BuiltinTableUpdate::row(&*MZ_OBJECT_DEPENDENCIES, row, diff)
146    }
147
148    pub(super) fn pack_database_update(
149        &self,
150        database_id: &DatabaseId,
151        diff: Diff,
152    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
153        let database = self.get_database(database_id);
154        let row = self.pack_privilege_array_row(database.privileges());
155        let privileges = row.unpack_first();
156        BuiltinTableUpdate::row(
157            &*MZ_DATABASES,
158            Row::pack_slice(&[
159                Datum::String(&database.id.to_string()),
160                Datum::UInt32(database.oid),
161                Datum::String(database.name()),
162                Datum::String(&database.owner_id.to_string()),
163                privileges,
164            ]),
165            diff,
166        )
167    }
168
169    pub(super) fn pack_schema_update(
170        &self,
171        database_spec: &ResolvedDatabaseSpecifier,
172        schema_id: &SchemaId,
173        diff: Diff,
174    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
175        let (database_id, schema) = match database_spec {
176            ResolvedDatabaseSpecifier::Ambient => (None, &self.ambient_schemas_by_id[schema_id]),
177            ResolvedDatabaseSpecifier::Id(id) => (
178                Some(id.to_string()),
179                &self.database_by_id[id].schemas_by_id[schema_id],
180            ),
181        };
182        let row = self.pack_privilege_array_row(schema.privileges());
183        let privileges = row.unpack_first();
184        BuiltinTableUpdate::row(
185            &*MZ_SCHEMAS,
186            Row::pack_slice(&[
187                Datum::String(&schema_id.to_string()),
188                Datum::UInt32(schema.oid),
189                Datum::from(database_id.as_deref()),
190                Datum::String(&schema.name.schema),
191                Datum::String(&schema.owner_id.to_string()),
192                privileges,
193            ]),
194            diff,
195        )
196    }
197
198    pub(super) fn pack_role_auth_update(
199        &self,
200        id: RoleId,
201        diff: Diff,
202    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
203        let role_auth = self.get_role_auth(&id);
204        let role = self.get_role(&id);
205        BuiltinTableUpdate::row(
206            &*MZ_ROLE_AUTH,
207            Row::pack_slice(&[
208                Datum::String(&role_auth.role_id.to_string()),
209                Datum::UInt32(role.oid),
210                match &role_auth.password_hash {
211                    Some(hash) => Datum::String(hash),
212                    None => Datum::Null,
213                },
214                Datum::TimestampTz(
215                    mz_ore::now::to_datetime(role_auth.updated_at)
216                        .try_into()
217                        .expect("must fit"),
218                ),
219            ]),
220            diff,
221        )
222    }
223
224    pub(super) fn pack_role_update(
225        &self,
226        id: RoleId,
227        diff: Diff,
228    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
229        match id {
230            // PUBLIC role should not show up in mz_roles.
231            RoleId::Public => vec![],
232            id => {
233                let role = self.get_role(&id);
234                let self_managed_auth_enabled = self
235                    .system_config()
236                    .get(ENABLE_PASSWORD_AUTH.name())
237                    .ok()
238                    .map(|v| v.value() == "on")
239                    .unwrap_or(false);
240
241                let builtin_supers = [MZ_SYSTEM_ROLE_ID, MZ_SUPPORT_ROLE_ID];
242
243                // For self managed auth, we can get the actual login and superuser bits
244                // directly from the role. For cloud auth, we have to do some heuristics.
245                // We determine login status each time a role logs in, so there's no clean
246                // way to accurately determine this in the catalog. Instead we do something
247                // a little gross. For system roles, we hardcode the known roles that can
248                // log in. For user roles, we determine `rolcanlogin` based on whether the
249                // role name looks like an email address.
250                //
251                // This works for the vast majority of cases in production. Roles that users
252                // log in to come from Frontegg and therefore *must* be valid email
253                // addresses, while roles that are created via `CREATE ROLE` (e.g.,
254                // `admin`, `prod_app`) almost certainly are not named to look like email
255                // addresses.
256                //
257                // For the moment, we're comfortable with the edge cases here. If we discover
258                // that folks are regularly creating non-login roles with names that look
259                // like an email address (e.g., `admins@sysops.foocorp`), we can change
260                // course.
261
262                let cloud_login_regex = "^[^@]+@[^@]+\\.[^@]+$";
263                let matches = regex::Regex::new(cloud_login_regex, true)
264                    .expect("valid regex")
265                    .is_match(&role.name);
266
267                let rolcanlogin = if self_managed_auth_enabled {
268                    role.attributes.login.unwrap_or(false)
269                } else {
270                    builtin_supers.contains(&role.id) || matches
271                };
272
273                let rolsuper = if self_managed_auth_enabled {
274                    Datum::from(role.attributes.superuser.unwrap_or(false))
275                } else if builtin_supers.contains(&role.id) {
276                    Datum::from(true)
277                } else {
278                    Datum::Null
279                };
280
281                let role_update = BuiltinTableUpdate::row(
282                    &*MZ_ROLES,
283                    Row::pack_slice(&[
284                        Datum::String(&role.id.to_string()),
285                        Datum::UInt32(role.oid),
286                        Datum::String(&role.name),
287                        Datum::from(role.attributes.inherit),
288                        Datum::from(rolcanlogin),
289                        rolsuper,
290                    ]),
291                    diff,
292                );
293                let mut updates = vec![role_update];
294
295                // HACK/TODO(parkmycar): Creating an empty SessionVars like this is pretty hacky,
296                // we should instead have a static list of all session vars.
297                let session_vars_reference = SessionVars::new_unchecked(
298                    &mz_build_info::DUMMY_BUILD_INFO,
299                    SYSTEM_USER.clone(),
300                    None,
301                );
302
303                for (name, val) in role.vars() {
304                    let result = session_vars_reference
305                        .inspect(name)
306                        .and_then(|var| var.check(val.borrow()));
307                    let Ok(formatted_val) = result else {
308                        // Note: all variables should have been validated by this point, so we
309                        // shouldn't ever hit this.
310                        tracing::error!(?name, ?val, ?result, "found invalid role default var");
311                        continue;
312                    };
313
314                    let role_var_update = BuiltinTableUpdate::row(
315                        &*MZ_ROLE_PARAMETERS,
316                        Row::pack_slice(&[
317                            Datum::String(&role.id.to_string()),
318                            Datum::String(name),
319                            Datum::String(&formatted_val),
320                        ]),
321                        diff,
322                    );
323                    updates.push(role_var_update);
324                }
325
326                updates
327            }
328        }
329    }
330
331    pub(super) fn pack_role_members_update(
332        &self,
333        role_id: RoleId,
334        member_id: RoleId,
335        diff: Diff,
336    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
337        let grantor_id = self
338            .get_role(&member_id)
339            .membership
340            .map
341            .get(&role_id)
342            .expect("catalog out of sync");
343        BuiltinTableUpdate::row(
344            &*MZ_ROLE_MEMBERS,
345            Row::pack_slice(&[
346                Datum::String(&role_id.to_string()),
347                Datum::String(&member_id.to_string()),
348                Datum::String(&grantor_id.to_string()),
349            ]),
350            diff,
351        )
352    }
353
354    pub(super) fn pack_cluster_update(
355        &self,
356        name: &str,
357        diff: Diff,
358    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
359        let id = self.clusters_by_name[name];
360        let cluster = &self.clusters_by_id[&id];
361        let row = self.pack_privilege_array_row(cluster.privileges());
362        let privileges = row.unpack_first();
363        let (size, disk, replication_factor, azs, introspection_debugging, introspection_interval) =
364            match &cluster.config.variant {
365                ClusterVariant::Managed(config) => (
366                    Some(config.size.as_str()),
367                    Some(self.cluster_replica_size_has_disk(&config.size)),
368                    Some(config.replication_factor),
369                    if config.availability_zones.is_empty() {
370                        None
371                    } else {
372                        Some(config.availability_zones.clone())
373                    },
374                    Some(config.logging.log_logging),
375                    config.logging.interval.map(|d| {
376                        Interval::from_duration(&d)
377                            .expect("planning ensured this convertible back to interval")
378                    }),
379                ),
380                ClusterVariant::Unmanaged => (None, None, None, None, None, None),
381            };
382
383        let mut row = Row::default();
384        let mut packer = row.packer();
385        packer.extend([
386            Datum::String(&id.to_string()),
387            Datum::String(name),
388            Datum::String(&cluster.owner_id.to_string()),
389            privileges,
390            cluster.is_managed().into(),
391            size.into(),
392            replication_factor.into(),
393            disk.into(),
394        ]);
395        if let Some(azs) = azs {
396            packer.push_list(azs.iter().map(|az| Datum::String(az)));
397        } else {
398            packer.push(Datum::Null);
399        }
400        packer.push(Datum::from(introspection_debugging));
401        packer.push(Datum::from(introspection_interval));
402
403        let mut updates = Vec::new();
404
405        updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTERS, row, diff));
406
407        if let ClusterVariant::Managed(managed_config) = &cluster.config.variant {
408            let row = match managed_config.schedule {
409                ClusterSchedule::Manual => Row::pack_slice(&[
410                    Datum::String(&id.to_string()),
411                    Datum::String("manual"),
412                    Datum::Null,
413                ]),
414                ClusterSchedule::Refresh {
415                    hydration_time_estimate,
416                } => Row::pack_slice(&[
417                    Datum::String(&id.to_string()),
418                    Datum::String("on-refresh"),
419                    Datum::Interval(
420                        Interval::from_duration(&hydration_time_estimate)
421                            .expect("planning ensured that this is convertible back to Interval"),
422                    ),
423                ]),
424            };
425            updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTER_SCHEDULES, row, diff));
426        }
427
428        updates.push(BuiltinTableUpdate::row(
429            &*MZ_CLUSTER_WORKLOAD_CLASSES,
430            Row::pack_slice(&[
431                Datum::String(&id.to_string()),
432                Datum::from(cluster.config.workload_class.as_deref()),
433            ]),
434            diff,
435        ));
436
437        updates
438    }
439
440    pub(super) fn pack_cluster_replica_update(
441        &self,
442        cluster_id: ClusterId,
443        name: &str,
444        diff: Diff,
445    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
446        let cluster = &self.clusters_by_id[&cluster_id];
447        let id = cluster.replica_id(name).expect("Must exist");
448        let replica = cluster.replica(id).expect("Must exist");
449
450        let (size, disk, az, internal, pending) = match &replica.config.location {
451            // TODO(guswynn): The column should be `availability_zones`, not
452            // `availability_zone`.
453            ReplicaLocation::Managed(ManagedReplicaLocation {
454                size,
455                availability_zones: ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
456                allocation: _,
457                billed_as: _,
458                internal,
459                pending,
460            }) => (
461                Some(&**size),
462                Some(self.cluster_replica_size_has_disk(size)),
463                Some(az.as_str()),
464                *internal,
465                *pending,
466            ),
467            ReplicaLocation::Managed(ManagedReplicaLocation {
468                size,
469                availability_zones: _,
470                allocation: _,
471                billed_as: _,
472                internal,
473                pending,
474            }) => (
475                Some(&**size),
476                Some(self.cluster_replica_size_has_disk(size)),
477                None,
478                *internal,
479                *pending,
480            ),
481            _ => (None, None, None, false, false),
482        };
483
484        let cluster_replica_update = BuiltinTableUpdate::row(
485            &*MZ_CLUSTER_REPLICAS,
486            Row::pack_slice(&[
487                Datum::String(&id.to_string()),
488                Datum::String(name),
489                Datum::String(&cluster_id.to_string()),
490                Datum::from(size),
491                Datum::from(az),
492                Datum::String(&replica.owner_id.to_string()),
493                Datum::from(disk),
494            ]),
495            diff,
496        );
497
498        let mut updates = vec![cluster_replica_update];
499
500        if internal {
501            let update = BuiltinTableUpdate::row(
502                &*MZ_INTERNAL_CLUSTER_REPLICAS,
503                Row::pack_slice(&[Datum::String(&id.to_string())]),
504                diff,
505            );
506            updates.push(update);
507        }
508
509        if pending {
510            let update = BuiltinTableUpdate::row(
511                &*MZ_PENDING_CLUSTER_REPLICAS,
512                Row::pack_slice(&[Datum::String(&id.to_string())]),
513                diff,
514            );
515            updates.push(update);
516        }
517
518        updates
519    }
520
521    pub(crate) fn pack_network_policy_update(
522        &self,
523        policy_id: &NetworkPolicyId,
524        diff: Diff,
525    ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, Error> {
526        let policy = self.get_network_policy(policy_id);
527        let row = self.pack_privilege_array_row(&policy.privileges);
528        let privileges = row.unpack_first();
529        let mut updates = Vec::new();
530        for ref rule in policy.rules.clone() {
531            updates.push(BuiltinTableUpdate::row(
532                &*MZ_NETWORK_POLICY_RULES,
533                Row::pack_slice(&[
534                    Datum::String(&rule.name),
535                    Datum::String(&policy.id.to_string()),
536                    Datum::String(&rule.action.to_string()),
537                    Datum::String(&rule.address.to_string()),
538                    Datum::String(&rule.direction.to_string()),
539                ]),
540                diff,
541            ));
542        }
543        updates.push(BuiltinTableUpdate::row(
544            &*MZ_NETWORK_POLICIES,
545            Row::pack_slice(&[
546                Datum::String(&policy.id.to_string()),
547                Datum::String(&policy.name),
548                Datum::String(&policy.owner_id.to_string()),
549                privileges,
550                Datum::UInt32(policy.oid.clone()),
551            ]),
552            diff,
553        ));
554
555        Ok(updates)
556    }
557
558    pub(super) fn pack_item_update(
559        &self,
560        id: CatalogItemId,
561        diff: Diff,
562    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
563        let entry = self.get_entry(&id);
564        let oid = entry.oid();
565        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
566        let schema_id = &self
567            .get_schema(
568                &entry.name().qualifiers.database_spec,
569                &entry.name().qualifiers.schema_spec,
570                conn_id,
571            )
572            .id;
573        let name = &entry.name().item;
574        let owner_id = entry.owner_id();
575        let privileges_row = self.pack_privilege_array_row(entry.privileges());
576        let privileges = privileges_row.unpack_first();
577        let mut updates = match entry.item() {
578            CatalogItem::Log(_) => self.pack_source_update(
579                id, oid, schema_id, name, "log", None, None, None, None, None, owner_id,
580                privileges, diff, None,
581            ),
582            CatalogItem::Index(index) => {
583                self.pack_index_update(id, oid, name, owner_id, index, diff)
584            }
585            CatalogItem::Table(table) => {
586                let mut updates = self
587                    .pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);
588
589                if let TableDataSource::DataSource {
590                    desc: data_source,
591                    timeline: _,
592                } = &table.data_source
593                {
594                    updates.extend(match data_source {
595                        DataSourceDesc::IngestionExport {
596                            ingestion_id,
597                            external_reference: UnresolvedItemName(external_reference),
598                            details: _,
599                            data_config: _,
600                        } => {
601                            let ingestion_entry = self
602                                .get_entry(ingestion_id)
603                                .source_desc()
604                                .expect("primary source exists")
605                                .expect("primary source is a source");
606
607                            match ingestion_entry.connection.name() {
608                                "postgres" => {
609                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
610                                    // The left-most qualification of Postgres
611                                    // tables is the database, but this
612                                    // information is redundant because each
613                                    // Postgres connection connects to only one
614                                    // database.
615                                    let schema_name = external_reference[1].to_ast_string_simple();
616                                    let table_name = external_reference[2].to_ast_string_simple();
617
618                                    self.pack_postgres_source_tables_update(
619                                        id,
620                                        &schema_name,
621                                        &table_name,
622                                        diff,
623                                    )
624                                }
625                                "mysql" => {
626                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
627                                    let schema_name = external_reference[0].to_ast_string_simple();
628                                    let table_name = external_reference[1].to_ast_string_simple();
629
630                                    self.pack_mysql_source_tables_update(
631                                        id,
632                                        &schema_name,
633                                        &table_name,
634                                        diff,
635                                    )
636                                }
637                                "sql-server" => {
638                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
639                                    // The left-most qualification of SQL Server tables is
640                                    // the database, but this information is redundant
641                                    // because each SQL Server connection connects to
642                                    // only one database.
643                                    let schema_name = external_reference[1].to_ast_string_simple();
644                                    let table_name = external_reference[2].to_ast_string_simple();
645
646                                    self.pack_sql_server_source_table_update(
647                                        id,
648                                        &schema_name,
649                                        &table_name,
650                                        diff,
651                                    )
652                                }
653                                // Load generator sources don't have any special
654                                // updates.
655                                "load-generator" => vec![],
656                                "kafka" => {
657                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 1);
658                                    let topic = external_reference[0].to_ast_string_simple();
659                                    let envelope = data_source.envelope();
660                                    let (key_format, value_format) = data_source.formats();
661
662                                    self.pack_kafka_source_tables_update(
663                                        id,
664                                        &topic,
665                                        envelope,
666                                        key_format,
667                                        value_format,
668                                        diff,
669                                    )
670                                }
671                                s => unreachable!("{s} sources do not have tables"),
672                            }
673                        }
674                        _ => vec![],
675                    });
676                }
677
678                updates
679            }
680            CatalogItem::Source(source) => {
681                let source_type = source.source_type();
682                let connection_id = source.connection_id();
683                let envelope = source.data_source.envelope();
684                let cluster_entry = match source.data_source {
685                    // Ingestion exports don't have their own cluster, but
686                    // run on their ingestion's cluster.
687                    DataSourceDesc::IngestionExport { ingestion_id, .. } => {
688                        self.get_entry(&ingestion_id)
689                    }
690                    _ => entry,
691                };
692
693                let cluster_id = cluster_entry.item().cluster_id().map(|id| id.to_string());
694
695                let (key_format, value_format) = source.data_source.formats();
696
697                let mut updates = self.pack_source_update(
698                    id,
699                    oid,
700                    schema_id,
701                    name,
702                    source_type,
703                    connection_id,
704                    envelope,
705                    key_format,
706                    value_format,
707                    cluster_id.as_deref(),
708                    owner_id,
709                    privileges,
710                    diff,
711                    source.create_sql.as_ref(),
712                );
713
714                updates.extend(match &source.data_source {
715                    DataSourceDesc::Ingestion { desc, .. }
716                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => match &desc.connection {
717                        GenericSourceConnection::Postgres(postgres) => {
718                            self.pack_postgres_source_update(id, postgres, diff)
719                        }
720                        GenericSourceConnection::Kafka(kafka) => {
721                            self.pack_kafka_source_update(id, source.global_id(), kafka, diff)
722                        }
723                        _ => vec![],
724                    },
725                    DataSourceDesc::IngestionExport {
726                        ingestion_id,
727                        external_reference: UnresolvedItemName(external_reference),
728                        details: _,
729                        data_config: _,
730                    } => {
731                        let ingestion_entry = self
732                            .get_entry(ingestion_id)
733                            .source_desc()
734                            .expect("primary source exists")
735                            .expect("primary source is a source");
736
737                        match ingestion_entry.connection.name() {
738                            "postgres" => {
739                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
740                                // The left-most qualification of Postgres
741                                // tables is the database, but this
742                                // information is redundant because each
743                                // Postgres connection connects to only one
744                                // database.
745                                let schema_name = external_reference[1].to_ast_string_simple();
746                                let table_name = external_reference[2].to_ast_string_simple();
747
748                                self.pack_postgres_source_tables_update(
749                                    id,
750                                    &schema_name,
751                                    &table_name,
752                                    diff,
753                                )
754                            }
755                            "mysql" => {
756                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
757                                let schema_name = external_reference[0].to_ast_string_simple();
758                                let table_name = external_reference[1].to_ast_string_simple();
759
760                                self.pack_mysql_source_tables_update(
761                                    id,
762                                    &schema_name,
763                                    &table_name,
764                                    diff,
765                                )
766                            }
767                            "sql-server" => {
768                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
769                                // The left-most qualification of SQL Server tables is
770                                // the database, but this information is redundant
771                                // because each SQL Server connection connects to
772                                // only one database.
773                                let schema_name = external_reference[1].to_ast_string_simple();
774                                let table_name = external_reference[2].to_ast_string_simple();
775
776                                self.pack_sql_server_source_table_update(
777                                    id,
778                                    &schema_name,
779                                    &table_name,
780                                    diff,
781                                )
782                            }
783                            // Load generator sources don't have any special
784                            // updates.
785                            "load-generator" => vec![],
786                            s => unreachable!("{s} sources do not have subsources"),
787                        }
788                    }
789                    DataSourceDesc::Webhook { .. } => {
790                        vec![self.pack_webhook_source_update(id, diff)]
791                    }
792                    _ => vec![],
793                });
794
795                updates
796            }
797            CatalogItem::View(view) => {
798                self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
799            }
800            CatalogItem::MaterializedView(mview) => self.pack_materialized_view_update(
801                id, oid, schema_id, name, owner_id, privileges, mview, diff,
802            ),
803            CatalogItem::Sink(sink) => {
804                self.pack_sink_update(id, oid, schema_id, name, owner_id, sink, diff)
805            }
806            CatalogItem::Type(ty) => {
807                self.pack_type_update(id, oid, schema_id, name, owner_id, privileges, ty, diff)
808            }
809            CatalogItem::Func(func) => {
810                self.pack_func_update(id, schema_id, name, owner_id, func, diff)
811            }
812            CatalogItem::Secret(_) => {
813                self.pack_secret_update(id, oid, schema_id, name, owner_id, privileges, diff)
814            }
815            CatalogItem::Connection(connection) => self.pack_connection_update(
816                id, oid, schema_id, name, owner_id, privileges, connection, diff,
817            ),
818            CatalogItem::ContinualTask(ct) => self.pack_continual_task_update(
819                id, oid, schema_id, name, owner_id, privileges, ct, diff,
820            ),
821        };
822
823        if !entry.item().is_temporary() {
824            // Populate or clean up the `mz_object_dependencies` table.
825            // TODO(jkosh44) Unclear if this table wants to include all uses or only references.
826            for dependee in entry.item().references().items() {
827                updates.push(self.pack_depends_update(id, *dependee, diff))
828            }
829        }
830
831        let full_name = self.resolve_full_name(entry.name(), entry.conn_id());
        // Always report the latest for an object's columns.
833        if let Ok(desc) = entry.desc_latest(&full_name) {
834            let defaults = match entry.item() {
835                CatalogItem::Table(Table {
836                    data_source: TableDataSource::TableWrites { defaults },
837                    ..
838                }) => Some(defaults),
839                _ => None,
840            };
841            for (i, (column_name, column_type)) in desc.iter().enumerate() {
842                let default: Option<String> = defaults.map(|d| d[i].to_ast_string_stable());
843                let default: Datum = default
844                    .as_ref()
845                    .map(|d| Datum::String(d))
846                    .unwrap_or(Datum::Null);
847                let pgtype = mz_pgrepr::Type::from(&column_type.scalar_type);
848                let (type_name, type_oid) = match &column_type.scalar_type {
849                    SqlScalarType::List {
850                        custom_id: Some(custom_id),
851                        ..
852                    }
853                    | SqlScalarType::Map {
854                        custom_id: Some(custom_id),
855                        ..
856                    }
857                    | SqlScalarType::Record {
858                        custom_id: Some(custom_id),
859                        ..
860                    } => {
861                        let entry = self.get_entry(custom_id);
862                        // NOTE(benesch): the `mz_columns.type text` field is
863                        // wrong. Types do not have a name that can be
864                        // represented as a single textual field. There can be
865                        // multiple types with the same name in different
866                        // schemas and databases. We should eventually deprecate
867                        // the `type` field in favor of a new `type_id` field
868                        // that can be joined against `mz_types`.
869                        //
870                        // For now, in the interest of pragmatism, we just use
871                        // the type's item name, and accept that there may be
872                        // ambiguity if the same type name is used in multiple
873                        // schemas. The ambiguity is mitigated by the OID, which
874                        // can be joined against `mz_types.oid` to resolve the
875                        // ambiguity.
876                        let name = &*entry.name().item;
877                        let oid = entry.oid();
878                        (name, oid)
879                    }
880                    _ => (pgtype.name(), pgtype.oid()),
881                };
882                updates.push(BuiltinTableUpdate::row(
883                    &*MZ_COLUMNS,
884                    Row::pack_slice(&[
885                        Datum::String(&id.to_string()),
886                        Datum::String(column_name),
887                        Datum::UInt64(u64::cast_from(i + 1)),
888                        Datum::from(column_type.nullable),
889                        Datum::String(type_name),
890                        default,
891                        Datum::UInt32(type_oid),
892                        Datum::Int32(pgtype.typmod()),
893                    ]),
894                    diff,
895                ));
896            }
897        }
898
899        // Use initial lcw so that we can tell apart default from non-existent windows.
900        if let Some(cw) = entry.item().initial_logical_compaction_window() {
901            updates.push(self.pack_history_retention_strategy_update(id, cw, diff));
902        }
903
904        updates.extend(Self::pack_item_global_id_update(entry, diff));
905
906        updates
907    }
908
909    fn pack_item_global_id_update(
910        entry: &CatalogEntry,
911        diff: Diff,
912    ) -> impl Iterator<Item = BuiltinTableUpdate<&'static BuiltinTable>> + use<'_> {
913        let id = entry.id().to_string();
914        let global_ids = entry.global_ids();
915        global_ids.map(move |global_id| {
916            BuiltinTableUpdate::row(
917                &*MZ_OBJECT_GLOBAL_IDS,
918                Row::pack_slice(&[Datum::String(&id), Datum::String(&global_id.to_string())]),
919                diff,
920            )
921        })
922    }
923
924    fn pack_history_retention_strategy_update(
925        &self,
926        id: CatalogItemId,
927        cw: CompactionWindow,
928        diff: Diff,
929    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
930        let cw: u64 = cw.comparable_timestamp().into();
931        let cw = Jsonb::from_serde_json(serde_json::Value::Number(serde_json::Number::from(cw)))
932            .expect("must serialize");
933        BuiltinTableUpdate::row(
934            &*MZ_HISTORY_RETENTION_STRATEGIES,
935            Row::pack_slice(&[
936                Datum::String(&id.to_string()),
937                // FOR is the only strategy at the moment. We may introduce FROM or others later.
938                Datum::String("FOR"),
939                cw.into_row().into_element(),
940            ]),
941            diff,
942        )
943    }
944
945    fn pack_table_update(
946        &self,
947        id: CatalogItemId,
948        oid: u32,
949        schema_id: &SchemaSpecifier,
950        name: &str,
951        owner_id: &RoleId,
952        privileges: Datum,
953        diff: Diff,
954        table: &Table,
955    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
956        let redacted = table.create_sql.as_ref().map(|create_sql| {
957            mz_sql::parse::parse(create_sql)
958                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
959                .into_element()
960                .ast
961                .to_ast_string_redacted()
962        });
963        let source_id = if let TableDataSource::DataSource {
964            desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
965            ..
966        } = &table.data_source
967        {
968            Some(ingestion_id.to_string())
969        } else {
970            None
971        };
972
973        vec![BuiltinTableUpdate::row(
974            &*MZ_TABLES,
975            Row::pack_slice(&[
976                Datum::String(&id.to_string()),
977                Datum::UInt32(oid),
978                Datum::String(&schema_id.to_string()),
979                Datum::String(name),
980                Datum::String(&owner_id.to_string()),
981                privileges,
982                if let Some(create_sql) = &table.create_sql {
983                    Datum::String(create_sql)
984                } else {
985                    Datum::Null
986                },
987                if let Some(redacted) = &redacted {
988                    Datum::String(redacted)
989                } else {
990                    Datum::Null
991                },
992                if let Some(source_id) = source_id.as_ref() {
993                    Datum::String(source_id)
994                } else {
995                    Datum::Null
996                },
997            ]),
998            diff,
999        )]
1000    }
1001
1002    fn pack_source_update(
1003        &self,
1004        id: CatalogItemId,
1005        oid: u32,
1006        schema_id: &SchemaSpecifier,
1007        name: &str,
1008        source_desc_name: &str,
1009        connection_id: Option<CatalogItemId>,
1010        envelope: Option<&str>,
1011        key_format: Option<&str>,
1012        value_format: Option<&str>,
1013        cluster_id: Option<&str>,
1014        owner_id: &RoleId,
1015        privileges: Datum,
1016        diff: Diff,
1017        create_sql: Option<&String>,
1018    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1019        let redacted = create_sql.map(|create_sql| {
1020            let create_stmt = mz_sql::parse::parse(create_sql)
1021                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
1022                .into_element()
1023                .ast;
1024            create_stmt.to_ast_string_redacted()
1025        });
1026        vec![BuiltinTableUpdate::row(
1027            &*MZ_SOURCES,
1028            Row::pack_slice(&[
1029                Datum::String(&id.to_string()),
1030                Datum::UInt32(oid),
1031                Datum::String(&schema_id.to_string()),
1032                Datum::String(name),
1033                Datum::String(source_desc_name),
1034                Datum::from(connection_id.map(|id| id.to_string()).as_deref()),
1035                // This is the "source size", which is a remnant from linked
1036                // clusters.
1037                Datum::Null,
1038                Datum::from(envelope),
1039                Datum::from(key_format),
1040                Datum::from(value_format),
1041                Datum::from(cluster_id),
1042                Datum::String(&owner_id.to_string()),
1043                privileges,
1044                if let Some(create_sql) = create_sql {
1045                    Datum::String(create_sql)
1046                } else {
1047                    Datum::Null
1048                },
1049                if let Some(redacted) = &redacted {
1050                    Datum::String(redacted)
1051                } else {
1052                    Datum::Null
1053                },
1054            ]),
1055            diff,
1056        )]
1057    }
1058
1059    fn pack_postgres_source_update(
1060        &self,
1061        id: CatalogItemId,
1062        postgres: &PostgresSourceConnection<ReferencedConnection>,
1063        diff: Diff,
1064    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1065        vec![BuiltinTableUpdate::row(
1066            &*MZ_POSTGRES_SOURCES,
1067            Row::pack_slice(&[
1068                Datum::String(&id.to_string()),
1069                Datum::String(&postgres.publication_details.slot),
1070                Datum::from(postgres.publication_details.timeline_id),
1071            ]),
1072            diff,
1073        )]
1074    }
1075
1076    fn pack_kafka_source_update(
1077        &self,
1078        item_id: CatalogItemId,
1079        collection_id: GlobalId,
1080        kafka: &KafkaSourceConnection<ReferencedConnection>,
1081        diff: Diff,
1082    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1083        vec![BuiltinTableUpdate::row(
1084            &*MZ_KAFKA_SOURCES,
1085            Row::pack_slice(&[
1086                Datum::String(&item_id.to_string()),
1087                Datum::String(&kafka.group_id(&self.config.connection_context, collection_id)),
1088                Datum::String(&kafka.topic),
1089            ]),
1090            diff,
1091        )]
1092    }
1093
1094    fn pack_postgres_source_tables_update(
1095        &self,
1096        id: CatalogItemId,
1097        schema_name: &str,
1098        table_name: &str,
1099        diff: Diff,
1100    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1101        vec![BuiltinTableUpdate::row(
1102            &*MZ_POSTGRES_SOURCE_TABLES,
1103            Row::pack_slice(&[
1104                Datum::String(&id.to_string()),
1105                Datum::String(schema_name),
1106                Datum::String(table_name),
1107            ]),
1108            diff,
1109        )]
1110    }
1111
1112    fn pack_mysql_source_tables_update(
1113        &self,
1114        id: CatalogItemId,
1115        schema_name: &str,
1116        table_name: &str,
1117        diff: Diff,
1118    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1119        vec![BuiltinTableUpdate::row(
1120            &*MZ_MYSQL_SOURCE_TABLES,
1121            Row::pack_slice(&[
1122                Datum::String(&id.to_string()),
1123                Datum::String(schema_name),
1124                Datum::String(table_name),
1125            ]),
1126            diff,
1127        )]
1128    }
1129
1130    fn pack_sql_server_source_table_update(
1131        &self,
1132        id: CatalogItemId,
1133        schema_name: &str,
1134        table_name: &str,
1135        diff: Diff,
1136    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1137        vec![BuiltinTableUpdate::row(
1138            &*MZ_SQL_SERVER_SOURCE_TABLES,
1139            Row::pack_slice(&[
1140                Datum::String(&id.to_string()),
1141                Datum::String(schema_name),
1142                Datum::String(table_name),
1143            ]),
1144            diff,
1145        )]
1146    }
1147
1148    fn pack_kafka_source_tables_update(
1149        &self,
1150        id: CatalogItemId,
1151        topic: &str,
1152        envelope: Option<&str>,
1153        key_format: Option<&str>,
1154        value_format: Option<&str>,
1155        diff: Diff,
1156    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1157        vec![BuiltinTableUpdate::row(
1158            &*MZ_KAFKA_SOURCE_TABLES,
1159            Row::pack_slice(&[
1160                Datum::String(&id.to_string()),
1161                Datum::String(topic),
1162                Datum::from(envelope),
1163                Datum::from(key_format),
1164                Datum::from(value_format),
1165            ]),
1166            diff,
1167        )]
1168    }
1169
1170    fn pack_connection_update(
1171        &self,
1172        id: CatalogItemId,
1173        oid: u32,
1174        schema_id: &SchemaSpecifier,
1175        name: &str,
1176        owner_id: &RoleId,
1177        privileges: Datum,
1178        connection: &Connection,
1179        diff: Diff,
1180    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1181        let create_stmt = mz_sql::parse::parse(&connection.create_sql)
1182            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", connection.create_sql))
1183            .into_element()
1184            .ast;
1185        let mut updates = vec![BuiltinTableUpdate::row(
1186            &*MZ_CONNECTIONS,
1187            Row::pack_slice(&[
1188                Datum::String(&id.to_string()),
1189                Datum::UInt32(oid),
1190                Datum::String(&schema_id.to_string()),
1191                Datum::String(name),
1192                Datum::String(match connection.details {
1193                    ConnectionDetails::Kafka { .. } => "kafka",
1194                    ConnectionDetails::Csr { .. } => "confluent-schema-registry",
1195                    ConnectionDetails::Postgres { .. } => "postgres",
1196                    ConnectionDetails::Aws(..) => "aws",
1197                    ConnectionDetails::AwsPrivatelink(..) => "aws-privatelink",
1198                    ConnectionDetails::Ssh { .. } => "ssh-tunnel",
1199                    ConnectionDetails::MySql { .. } => "mysql",
1200                    ConnectionDetails::SqlServer(_) => "sql-server",
1201                    ConnectionDetails::IcebergCatalog(_) => "iceberg-catalog",
1202                }),
1203                Datum::String(&owner_id.to_string()),
1204                privileges,
1205                Datum::String(&connection.create_sql),
1206                Datum::String(&create_stmt.to_ast_string_redacted()),
1207            ]),
1208            diff,
1209        )];
1210        match connection.details {
1211            ConnectionDetails::Kafka(ref kafka) => {
1212                updates.extend(self.pack_kafka_connection_update(id, kafka, diff));
1213            }
1214            ConnectionDetails::Aws(ref aws_config) => {
1215                match self.pack_aws_connection_update(id, aws_config, diff) {
1216                    Ok(update) => {
1217                        updates.push(update);
1218                    }
1219                    Err(e) => {
1220                        tracing::error!(%id, %e, "failed writing row to mz_aws_connections table");
1221                    }
1222                }
1223            }
1224            ConnectionDetails::AwsPrivatelink(_) => {
1225                if let Some(aws_principal_context) = self.aws_principal_context.as_ref() {
1226                    updates.push(self.pack_aws_privatelink_connection_update(
1227                        id,
1228                        aws_principal_context,
1229                        diff,
1230                    ));
1231                } else {
1232                    tracing::error!(%id, "missing AWS principal context; cannot write row to mz_aws_privatelink_connections table");
1233                }
1234            }
1235            ConnectionDetails::Ssh {
1236                ref key_1,
1237                ref key_2,
1238                ..
1239            } => {
1240                updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff));
1241            }
1242            ConnectionDetails::Csr(_)
1243            | ConnectionDetails::Postgres(_)
1244            | ConnectionDetails::MySql(_)
1245            | ConnectionDetails::SqlServer(_)
1246            | ConnectionDetails::IcebergCatalog(_) => (),
1247        };
1248        updates
1249    }
1250
1251    pub(crate) fn pack_ssh_tunnel_connection_update(
1252        &self,
1253        id: CatalogItemId,
1254        key_1: &SshKey,
1255        key_2: &SshKey,
1256        diff: Diff,
1257    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1258        BuiltinTableUpdate::row(
1259            &*MZ_SSH_TUNNEL_CONNECTIONS,
1260            Row::pack_slice(&[
1261                Datum::String(&id.to_string()),
1262                Datum::String(key_1.public_key().as_str()),
1263                Datum::String(key_2.public_key().as_str()),
1264            ]),
1265            diff,
1266        )
1267    }
1268
1269    fn pack_kafka_connection_update(
1270        &self,
1271        id: CatalogItemId,
1272        kafka: &KafkaConnection<ReferencedConnection>,
1273        diff: Diff,
1274    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1275        let progress_topic = kafka.progress_topic(&self.config.connection_context, id);
1276        let mut row = Row::default();
1277        row.packer()
1278            .try_push_array(
1279                &[ArrayDimension {
1280                    lower_bound: 1,
1281                    length: kafka.brokers.len(),
1282                }],
1283                kafka
1284                    .brokers
1285                    .iter()
1286                    .map(|broker| Datum::String(&broker.address)),
1287            )
1288            .expect("kafka.brokers is 1 dimensional, and its length is used for the array length");
1289        let brokers = row.unpack_first();
1290        vec![BuiltinTableUpdate::row(
1291            &*MZ_KAFKA_CONNECTIONS,
1292            Row::pack_slice(&[
1293                Datum::String(&id.to_string()),
1294                brokers,
1295                Datum::String(&progress_topic),
1296            ]),
1297            diff,
1298        )]
1299    }
1300
1301    pub fn pack_aws_privatelink_connection_update(
1302        &self,
1303        connection_id: CatalogItemId,
1304        aws_principal_context: &AwsPrincipalContext,
1305        diff: Diff,
1306    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1307        let id = &MZ_AWS_PRIVATELINK_CONNECTIONS;
1308        let row = Row::pack_slice(&[
1309            Datum::String(&connection_id.to_string()),
1310            Datum::String(&aws_principal_context.to_principal_string(connection_id)),
1311        ]);
1312        BuiltinTableUpdate::row(id, row, diff)
1313    }
1314
1315    pub fn pack_aws_connection_update(
1316        &self,
1317        connection_id: CatalogItemId,
1318        aws_config: &AwsConnection,
1319        diff: Diff,
1320    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, anyhow::Error> {
1321        let id = &MZ_AWS_CONNECTIONS;
1322
1323        let mut access_key_id = None;
1324        let mut access_key_id_secret_id = None;
1325        let mut secret_access_key_secret_id = None;
1326        let mut session_token = None;
1327        let mut session_token_secret_id = None;
1328        let mut assume_role_arn = None;
1329        let mut assume_role_session_name = None;
1330        let mut principal = None;
1331        let mut external_id = None;
1332        let mut example_trust_policy = None;
1333        match &aws_config.auth {
1334            AwsAuth::Credentials(credentials) => {
1335                match &credentials.access_key_id {
1336                    StringOrSecret::String(s) => access_key_id = Some(s.as_str()),
1337                    StringOrSecret::Secret(s) => access_key_id_secret_id = Some(s.to_string()),
1338                }
1339                secret_access_key_secret_id = Some(credentials.secret_access_key.to_string());
1340                match credentials.session_token.as_ref() {
1341                    None => (),
1342                    Some(StringOrSecret::String(s)) => session_token = Some(s.as_str()),
1343                    Some(StringOrSecret::Secret(s)) => {
1344                        session_token_secret_id = Some(s.to_string())
1345                    }
1346                }
1347            }
1348            AwsAuth::AssumeRole(assume_role) => {
1349                assume_role_arn = Some(assume_role.arn.as_str());
1350                assume_role_session_name = assume_role.session_name.as_deref();
1351                principal = self
1352                    .config
1353                    .connection_context
1354                    .aws_connection_role_arn
1355                    .as_deref();
1356                external_id =
1357                    Some(assume_role.external_id(&self.config.connection_context, connection_id)?);
1358                example_trust_policy = {
1359                    let policy = assume_role
1360                        .example_trust_policy(&self.config.connection_context, connection_id)?;
1361                    let policy = Jsonb::from_serde_json(policy).expect("valid json");
1362                    Some(policy.into_row())
1363                };
1364            }
1365        }
1366
1367        let row = Row::pack_slice(&[
1368            Datum::String(&connection_id.to_string()),
1369            Datum::from(aws_config.endpoint.as_deref()),
1370            Datum::from(aws_config.region.as_deref()),
1371            Datum::from(access_key_id),
1372            Datum::from(access_key_id_secret_id.as_deref()),
1373            Datum::from(secret_access_key_secret_id.as_deref()),
1374            Datum::from(session_token),
1375            Datum::from(session_token_secret_id.as_deref()),
1376            Datum::from(assume_role_arn),
1377            Datum::from(assume_role_session_name),
1378            Datum::from(principal),
1379            Datum::from(external_id.as_deref()),
1380            Datum::from(example_trust_policy.as_ref().map(|p| p.into_element())),
1381        ]);
1382
1383        Ok(BuiltinTableUpdate::row(id, row, diff))
1384    }
1385
1386    fn pack_view_update(
1387        &self,
1388        id: CatalogItemId,
1389        oid: u32,
1390        schema_id: &SchemaSpecifier,
1391        name: &str,
1392        owner_id: &RoleId,
1393        privileges: Datum,
1394        view: &View,
1395        diff: Diff,
1396    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1397        let create_stmt = mz_sql::parse::parse(&view.create_sql)
1398            .unwrap_or_else(|e| {
1399                panic!(
1400                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1401                    view.create_sql, e
1402                )
1403            })
1404            .into_element()
1405            .ast;
1406        let query = match &create_stmt {
1407            Statement::CreateView(stmt) => &stmt.definition.query,
1408            _ => unreachable!(),
1409        };
1410
1411        let mut query_string = query.to_ast_string_stable();
1412        // PostgreSQL appends a semicolon in `pg_views.definition`, we
1413        // do the same for compatibility's sake.
1414        query_string.push(';');
1415
1416        vec![BuiltinTableUpdate::row(
1417            &*MZ_VIEWS,
1418            Row::pack_slice(&[
1419                Datum::String(&id.to_string()),
1420                Datum::UInt32(oid),
1421                Datum::String(&schema_id.to_string()),
1422                Datum::String(name),
1423                Datum::String(&query_string),
1424                Datum::String(&owner_id.to_string()),
1425                privileges,
1426                Datum::String(&view.create_sql),
1427                Datum::String(&create_stmt.to_ast_string_redacted()),
1428            ]),
1429            diff,
1430        )]
1431    }
1432
1433    fn pack_materialized_view_update(
1434        &self,
1435        id: CatalogItemId,
1436        oid: u32,
1437        schema_id: &SchemaSpecifier,
1438        name: &str,
1439        owner_id: &RoleId,
1440        privileges: Datum,
1441        mview: &MaterializedView,
1442        diff: Diff,
1443    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1444        let create_stmt = mz_sql::parse::parse(&mview.create_sql)
1445            .unwrap_or_else(|e| {
1446                panic!(
1447                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1448                    mview.create_sql, e
1449                )
1450            })
1451            .into_element()
1452            .ast;
1453        let query_string = match &create_stmt {
1454            Statement::CreateMaterializedView(stmt) => {
1455                let mut query_string = stmt.query.to_ast_string_stable();
1456                // PostgreSQL appends a semicolon in `pg_matviews.definition`, we
1457                // do the same for compatibility's sake.
1458                query_string.push(';');
1459                query_string
1460            }
1461            _ => unreachable!(),
1462        };
1463
1464        let mut updates = Vec::new();
1465
1466        updates.push(BuiltinTableUpdate::row(
1467            &*MZ_MATERIALIZED_VIEWS,
1468            Row::pack_slice(&[
1469                Datum::String(&id.to_string()),
1470                Datum::UInt32(oid),
1471                Datum::String(&schema_id.to_string()),
1472                Datum::String(name),
1473                Datum::String(&mview.cluster_id.to_string()),
1474                Datum::String(&query_string),
1475                Datum::String(&owner_id.to_string()),
1476                privileges,
1477                Datum::String(&mview.create_sql),
1478                Datum::String(&create_stmt.to_ast_string_redacted()),
1479            ]),
1480            diff,
1481        ));
1482
1483        if let Some(refresh_schedule) = &mview.refresh_schedule {
1484            // This can't be `ON COMMIT`, because that is represented by a `None` instead of an
1485            // empty `RefreshSchedule`.
1486            assert!(!refresh_schedule.is_empty());
1487            for RefreshEvery {
1488                interval,
1489                aligned_to,
1490            } in refresh_schedule.everies.iter()
1491            {
1492                let aligned_to_dt = mz_ore::now::to_datetime(
1493                    <&Timestamp as TryInto<u64>>::try_into(aligned_to).expect("undoes planning"),
1494                );
1495                updates.push(BuiltinTableUpdate::row(
1496                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1497                    Row::pack_slice(&[
1498                        Datum::String(&id.to_string()),
1499                        Datum::String("every"),
1500                        Datum::Interval(
1501                            Interval::from_duration(interval).expect(
1502                                "planning ensured that this is convertible back to Interval",
1503                            ),
1504                        ),
1505                        Datum::TimestampTz(aligned_to_dt.try_into().expect("undoes planning")),
1506                        Datum::Null,
1507                    ]),
1508                    diff,
1509                ));
1510            }
1511            for at in refresh_schedule.ats.iter() {
1512                let at_dt = mz_ore::now::to_datetime(
1513                    <&Timestamp as TryInto<u64>>::try_into(at).expect("undoes planning"),
1514                );
1515                updates.push(BuiltinTableUpdate::row(
1516                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1517                    Row::pack_slice(&[
1518                        Datum::String(&id.to_string()),
1519                        Datum::String("at"),
1520                        Datum::Null,
1521                        Datum::Null,
1522                        Datum::TimestampTz(at_dt.try_into().expect("undoes planning")),
1523                    ]),
1524                    diff,
1525                ));
1526            }
1527        } else {
1528            updates.push(BuiltinTableUpdate::row(
1529                &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1530                Row::pack_slice(&[
1531                    Datum::String(&id.to_string()),
1532                    Datum::String("on-commit"),
1533                    Datum::Null,
1534                    Datum::Null,
1535                    Datum::Null,
1536                ]),
1537                diff,
1538            ));
1539        }
1540
1541        updates
1542    }
1543
1544    fn pack_continual_task_update(
1545        &self,
1546        id: CatalogItemId,
1547        oid: u32,
1548        schema_id: &SchemaSpecifier,
1549        name: &str,
1550        owner_id: &RoleId,
1551        privileges: Datum,
1552        ct: &ContinualTask,
1553        diff: Diff,
1554    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1555        let create_stmt = mz_sql::parse::parse(&ct.create_sql)
1556            .unwrap_or_else(|e| {
1557                panic!(
1558                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1559                    ct.create_sql, e
1560                )
1561            })
1562            .into_element()
1563            .ast;
1564        let query_string = match &create_stmt {
1565            Statement::CreateContinualTask(stmt) => {
1566                let mut query_string = String::new();
1567                for stmt in &stmt.stmts {
1568                    let s = match stmt {
1569                        ContinualTaskStmt::Insert(stmt) => stmt.to_ast_string_stable(),
1570                        ContinualTaskStmt::Delete(stmt) => stmt.to_ast_string_stable(),
1571                    };
1572                    if query_string.is_empty() {
1573                        query_string = s;
1574                    } else {
1575                        query_string.push_str("; ");
1576                        query_string.push_str(&s);
1577                    }
1578                }
1579                query_string
1580            }
1581            _ => unreachable!(),
1582        };
1583
1584        vec![BuiltinTableUpdate::row(
1585            &*MZ_CONTINUAL_TASKS,
1586            Row::pack_slice(&[
1587                Datum::String(&id.to_string()),
1588                Datum::UInt32(oid),
1589                Datum::String(&schema_id.to_string()),
1590                Datum::String(name),
1591                Datum::String(&ct.cluster_id.to_string()),
1592                Datum::String(&query_string),
1593                Datum::String(&owner_id.to_string()),
1594                privileges,
1595                Datum::String(&ct.create_sql),
1596                Datum::String(&create_stmt.to_ast_string_redacted()),
1597            ]),
1598            diff,
1599        )]
1600    }
1601
1602    fn pack_sink_update(
1603        &self,
1604        id: CatalogItemId,
1605        oid: u32,
1606        schema_id: &SchemaSpecifier,
1607        name: &str,
1608        owner_id: &RoleId,
1609        sink: &Sink,
1610        diff: Diff,
1611    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1612        let mut updates = vec![];
1613        match &sink.connection {
1614            StorageSinkConnection::Kafka(KafkaSinkConnection {
1615                topic: topic_name, ..
1616            }) => {
1617                updates.push(BuiltinTableUpdate::row(
1618                    &*MZ_KAFKA_SINKS,
1619                    Row::pack_slice(&[
1620                        Datum::String(&id.to_string()),
1621                        Datum::String(topic_name.as_str()),
1622                    ]),
1623                    diff,
1624                ));
1625            }
1626            StorageSinkConnection::Iceberg(IcebergSinkConnection {
1627                namespace, table, ..
1628            }) => {
1629                updates.push(BuiltinTableUpdate::row(
1630                    &*MZ_ICEBERG_SINKS,
1631                    Row::pack_slice(&[
1632                        Datum::String(&id.to_string()),
1633                        Datum::String(namespace.as_str()),
1634                        Datum::String(table.as_str()),
1635                    ]),
1636                    diff,
1637                ));
1638            }
1639        };
1640
1641        let create_stmt = mz_sql::parse::parse(&sink.create_sql)
1642            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", sink.create_sql))
1643            .into_element()
1644            .ast;
1645
1646        let envelope = sink.envelope();
1647
1648        // The combined format string is used for the deprecated `format` column.
1649        let combined_format = sink.combined_format();
1650        let (key_format, value_format) = match sink.formats() {
1651            Some((key_format, value_format)) => (key_format, Some(value_format)),
1652            None => (None, None),
1653        };
1654
1655        updates.push(BuiltinTableUpdate::row(
1656            &*MZ_SINKS,
1657            Row::pack_slice(&[
1658                Datum::String(&id.to_string()),
1659                Datum::UInt32(oid),
1660                Datum::String(&schema_id.to_string()),
1661                Datum::String(name),
1662                Datum::String(sink.connection.name()),
1663                Datum::from(sink.connection_id().map(|id| id.to_string()).as_deref()),
1664                // size column now deprecated w/o linked clusters
1665                Datum::Null,
1666                Datum::from(envelope),
1667                // FIXME: These key/value formats are kinda leaky! Should probably live in
1668                // the kafka sink table.
1669                Datum::from(combined_format.as_ref().map(|f| f.as_ref())),
1670                Datum::from(key_format),
1671                Datum::from(value_format),
1672                Datum::String(&sink.cluster_id.to_string()),
1673                Datum::String(&owner_id.to_string()),
1674                Datum::String(&sink.create_sql),
1675                Datum::String(&create_stmt.to_ast_string_redacted()),
1676            ]),
1677            diff,
1678        ));
1679
1680        updates
1681    }
1682
1683    fn pack_index_update(
1684        &self,
1685        id: CatalogItemId,
1686        oid: u32,
1687        name: &str,
1688        owner_id: &RoleId,
1689        index: &Index,
1690        diff: Diff,
1691    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1692        let mut updates = vec![];
1693
1694        let create_stmt = mz_sql::parse::parse(&index.create_sql)
1695            .unwrap_or_else(|e| {
1696                panic!(
1697                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1698                    index.create_sql, e
1699                )
1700            })
1701            .into_element()
1702            .ast;
1703
1704        let key_sqls = match &create_stmt {
1705            Statement::CreateIndex(CreateIndexStatement { key_parts, .. }) => key_parts
1706                .as_ref()
1707                .expect("key_parts is filled in during planning"),
1708            _ => unreachable!(),
1709        };
1710        let on_item_id = self.get_entry_by_global_id(&index.on).id();
1711
1712        updates.push(BuiltinTableUpdate::row(
1713            &*MZ_INDEXES,
1714            Row::pack_slice(&[
1715                Datum::String(&id.to_string()),
1716                Datum::UInt32(oid),
1717                Datum::String(name),
1718                Datum::String(&on_item_id.to_string()),
1719                Datum::String(&index.cluster_id.to_string()),
1720                Datum::String(&owner_id.to_string()),
1721                Datum::String(&index.create_sql),
1722                Datum::String(&create_stmt.to_ast_string_redacted()),
1723            ]),
1724            diff,
1725        ));
1726
1727        for (i, key) in index.keys.iter().enumerate() {
1728            let on_entry = self.get_entry_by_global_id(&index.on);
1729            let nullable = key
1730                .typ(
1731                    &on_entry
1732                        .desc(&self.resolve_full_name(on_entry.name(), on_entry.conn_id()))
1733                        .expect("can only create indexes on items with a valid description")
1734                        .typ()
1735                        .column_types,
1736                )
1737                .nullable;
1738            let seq_in_index = u64::cast_from(i + 1);
1739            let key_sql = key_sqls
1740                .get(i)
1741                .expect("missing sql information for index key")
1742                .to_ast_string_simple();
1743            let (field_number, expression) = match key {
1744                MirScalarExpr::Column(col, _) => {
1745                    (Datum::UInt64(u64::cast_from(*col + 1)), Datum::Null)
1746                }
1747                _ => (Datum::Null, Datum::String(&key_sql)),
1748            };
1749            updates.push(BuiltinTableUpdate::row(
1750                &*MZ_INDEX_COLUMNS,
1751                Row::pack_slice(&[
1752                    Datum::String(&id.to_string()),
1753                    Datum::UInt64(seq_in_index),
1754                    field_number,
1755                    expression,
1756                    Datum::from(nullable),
1757                ]),
1758                diff,
1759            ));
1760        }
1761
1762        updates
1763    }
1764
1765    fn pack_type_update(
1766        &self,
1767        id: CatalogItemId,
1768        oid: u32,
1769        schema_id: &SchemaSpecifier,
1770        name: &str,
1771        owner_id: &RoleId,
1772        privileges: Datum,
1773        typ: &Type,
1774        diff: Diff,
1775    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1776        let mut out = vec![];
1777
1778        let redacted = typ.create_sql.as_ref().map(|create_sql| {
1779            mz_sql::parse::parse(create_sql)
1780                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
1781                .into_element()
1782                .ast
1783                .to_ast_string_redacted()
1784        });
1785
1786        out.push(BuiltinTableUpdate::row(
1787            &*MZ_TYPES,
1788            Row::pack_slice(&[
1789                Datum::String(&id.to_string()),
1790                Datum::UInt32(oid),
1791                Datum::String(&schema_id.to_string()),
1792                Datum::String(name),
1793                Datum::String(&TypeCategory::from_catalog_type(&typ.details.typ).to_string()),
1794                Datum::String(&owner_id.to_string()),
1795                privileges,
1796                if let Some(create_sql) = &typ.create_sql {
1797                    Datum::String(create_sql)
1798                } else {
1799                    Datum::Null
1800                },
1801                if let Some(redacted) = &redacted {
1802                    Datum::String(redacted)
1803                } else {
1804                    Datum::Null
1805                },
1806            ]),
1807            diff,
1808        ));
1809
1810        let mut row = Row::default();
1811        let mut packer = row.packer();
1812
1813        fn append_modifier(packer: &mut RowPacker<'_>, mods: &[i64]) {
1814            if mods.is_empty() {
1815                packer.push(Datum::Null);
1816            } else {
1817                packer.push_list(mods.iter().map(|m| Datum::Int64(*m)));
1818            }
1819        }
1820
1821        let index_id = match &typ.details.typ {
1822            CatalogType::Array {
1823                element_reference: element_id,
1824            } => {
1825                packer.push(Datum::String(&id.to_string()));
1826                packer.push(Datum::String(&element_id.to_string()));
1827                &MZ_ARRAY_TYPES
1828            }
1829            CatalogType::List {
1830                element_reference: element_id,
1831                element_modifiers,
1832            } => {
1833                packer.push(Datum::String(&id.to_string()));
1834                packer.push(Datum::String(&element_id.to_string()));
1835                append_modifier(&mut packer, element_modifiers);
1836                &MZ_LIST_TYPES
1837            }
1838            CatalogType::Map {
1839                key_reference: key_id,
1840                value_reference: value_id,
1841                key_modifiers,
1842                value_modifiers,
1843            } => {
1844                packer.push(Datum::String(&id.to_string()));
1845                packer.push(Datum::String(&key_id.to_string()));
1846                packer.push(Datum::String(&value_id.to_string()));
1847                append_modifier(&mut packer, key_modifiers);
1848                append_modifier(&mut packer, value_modifiers);
1849                &MZ_MAP_TYPES
1850            }
1851            CatalogType::Pseudo => {
1852                packer.push(Datum::String(&id.to_string()));
1853                &MZ_PSEUDO_TYPES
1854            }
1855            _ => {
1856                packer.push(Datum::String(&id.to_string()));
1857                &MZ_BASE_TYPES
1858            }
1859        };
1860        out.push(BuiltinTableUpdate::row(index_id, row, diff));
1861
1862        if let Some(pg_metadata) = &typ.details.pg_metadata {
1863            out.push(BuiltinTableUpdate::row(
1864                &*MZ_TYPE_PG_METADATA,
1865                Row::pack_slice(&[
1866                    Datum::String(&id.to_string()),
1867                    Datum::UInt32(pg_metadata.typinput_oid),
1868                    Datum::UInt32(pg_metadata.typreceive_oid),
1869                ]),
1870                diff,
1871            ));
1872        }
1873
1874        out
1875    }
1876
1877    fn pack_func_update(
1878        &self,
1879        id: CatalogItemId,
1880        schema_id: &SchemaSpecifier,
1881        name: &str,
1882        owner_id: &RoleId,
1883        func: &Func,
1884        diff: Diff,
1885    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1886        let mut updates = vec![];
1887        for func_impl_details in func.inner.func_impls() {
1888            let arg_type_ids = func_impl_details
1889                .arg_typs
1890                .iter()
1891                .map(|typ| self.get_system_type(typ).id().to_string())
1892                .collect::<Vec<_>>();
1893
1894            let mut row = Row::default();
1895            row.packer()
1896                .try_push_array(
1897                    &[ArrayDimension {
1898                        lower_bound: 1,
1899                        length: arg_type_ids.len(),
1900                    }],
1901                    arg_type_ids.iter().map(|id| Datum::String(id)),
1902                )
1903                .expect(
1904                    "arg_type_ids is 1 dimensional, and its length is used for the array length",
1905                );
1906            let arg_type_ids = row.unpack_first();
1907
1908            updates.push(BuiltinTableUpdate::row(
1909                &*MZ_FUNCTIONS,
1910                Row::pack_slice(&[
1911                    Datum::String(&id.to_string()),
1912                    Datum::UInt32(func_impl_details.oid),
1913                    Datum::String(&schema_id.to_string()),
1914                    Datum::String(name),
1915                    arg_type_ids,
1916                    Datum::from(
1917                        func_impl_details
1918                            .variadic_typ
1919                            .map(|typ| self.get_system_type(typ).id().to_string())
1920                            .as_deref(),
1921                    ),
1922                    Datum::from(
1923                        func_impl_details
1924                            .return_typ
1925                            .map(|typ| self.get_system_type(typ).id().to_string())
1926                            .as_deref(),
1927                    ),
1928                    func_impl_details.return_is_set.into(),
1929                    Datum::String(&owner_id.to_string()),
1930                ]),
1931                diff,
1932            ));
1933
1934            if let mz_sql::func::Func::Aggregate(_) = func.inner {
1935                updates.push(BuiltinTableUpdate::row(
1936                    &*MZ_AGGREGATES,
1937                    Row::pack_slice(&[
1938                        Datum::UInt32(func_impl_details.oid),
1939                        // TODO(database-issues#1064): Support ordered-set aggregate functions.
1940                        Datum::String("n"),
1941                        Datum::Int16(0),
1942                    ]),
1943                    diff,
1944                ));
1945            }
1946        }
1947        updates
1948    }
1949
1950    pub fn pack_op_update(
1951        &self,
1952        operator: &str,
1953        func_impl_details: FuncImplCatalogDetails,
1954        diff: Diff,
1955    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1956        let arg_type_ids = func_impl_details
1957            .arg_typs
1958            .iter()
1959            .map(|typ| self.get_system_type(typ).id().to_string())
1960            .collect::<Vec<_>>();
1961
1962        let mut row = Row::default();
1963        row.packer()
1964            .try_push_array(
1965                &[ArrayDimension {
1966                    lower_bound: 1,
1967                    length: arg_type_ids.len(),
1968                }],
1969                arg_type_ids.iter().map(|id| Datum::String(id)),
1970            )
1971            .expect("arg_type_ids is 1 dimensional, and its length is used for the array length");
1972        let arg_type_ids = row.unpack_first();
1973
1974        BuiltinTableUpdate::row(
1975            &*MZ_OPERATORS,
1976            Row::pack_slice(&[
1977                Datum::UInt32(func_impl_details.oid),
1978                Datum::String(operator),
1979                arg_type_ids,
1980                Datum::from(
1981                    func_impl_details
1982                        .return_typ
1983                        .map(|typ| self.get_system_type(typ).id().to_string())
1984                        .as_deref(),
1985                ),
1986            ]),
1987            diff,
1988        )
1989    }
1990
1991    fn pack_secret_update(
1992        &self,
1993        id: CatalogItemId,
1994        oid: u32,
1995        schema_id: &SchemaSpecifier,
1996        name: &str,
1997        owner_id: &RoleId,
1998        privileges: Datum,
1999        diff: Diff,
2000    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2001        vec![BuiltinTableUpdate::row(
2002            &*MZ_SECRETS,
2003            Row::pack_slice(&[
2004                Datum::String(&id.to_string()),
2005                Datum::UInt32(oid),
2006                Datum::String(&schema_id.to_string()),
2007                Datum::String(name),
2008                Datum::String(&owner_id.to_string()),
2009                privileges,
2010            ]),
2011            diff,
2012        )]
2013    }
2014
2015    pub fn pack_audit_log_update(
2016        &self,
2017        event: &VersionedEvent,
2018        diff: Diff,
2019    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
2020        let (event_type, object_type, details, user, occurred_at): (
2021            &EventType,
2022            &ObjectType,
2023            &EventDetails,
2024            &Option<String>,
2025            u64,
2026        ) = match event {
2027            VersionedEvent::V1(ev) => (
2028                &ev.event_type,
2029                &ev.object_type,
2030                &ev.details,
2031                &ev.user,
2032                ev.occurred_at,
2033            ),
2034        };
2035        let details = Jsonb::from_serde_json(details.as_json())
2036            .map_err(|e| {
2037                Error::new(ErrorKind::Unstructured(format!(
2038                    "could not pack audit log update: {}",
2039                    e
2040                )))
2041            })?
2042            .into_row();
2043        let details = details
2044            .iter()
2045            .next()
2046            .expect("details created above with a single jsonb column");
2047        let dt = mz_ore::now::to_datetime(occurred_at);
2048        let id = event.sortable_id();
2049        Ok(BuiltinTableUpdate::row(
2050            &*MZ_AUDIT_EVENTS,
2051            Row::pack_slice(&[
2052                Datum::UInt64(id),
2053                Datum::String(&format!("{}", event_type)),
2054                Datum::String(&format!("{}", object_type)),
2055                details,
2056                match user {
2057                    Some(user) => Datum::String(user),
2058                    None => Datum::Null,
2059                },
2060                Datum::TimestampTz(dt.try_into().expect("must fit")),
2061            ]),
2062            diff,
2063        ))
2064    }
2065
2066    pub fn pack_storage_usage_update(
2067        &self,
2068        VersionedStorageUsage::V1(event): VersionedStorageUsage,
2069        diff: Diff,
2070    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2071        let id = &MZ_STORAGE_USAGE_BY_SHARD;
2072        let row = Row::pack_slice(&[
2073            Datum::UInt64(event.id),
2074            Datum::from(event.shard_id.as_deref()),
2075            Datum::UInt64(event.size_bytes),
2076            Datum::TimestampTz(
2077                mz_ore::now::to_datetime(event.collection_timestamp)
2078                    .try_into()
2079                    .expect("must fit"),
2080            ),
2081        ]);
2082        BuiltinTableUpdate::row(id, row, diff)
2083    }
2084
2085    pub fn pack_egress_ip_update(
2086        &self,
2087        ip: &IpNet,
2088    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
2089        let id = &MZ_EGRESS_IPS;
2090        let addr = ip.addr();
2091        let row = Row::pack_slice(&[
2092            Datum::String(&addr.to_string()),
2093            Datum::Int32(ip.prefix_len().into()),
2094            Datum::String(&format!("{}/{}", addr, ip.prefix_len())),
2095        ]);
2096        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
2097    }
2098
2099    pub fn pack_license_key_update(
2100        &self,
2101        license_key: &ValidatedLicenseKey,
2102    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
2103        let id = &MZ_LICENSE_KEYS;
2104        let row = Row::pack_slice(&[
2105            Datum::String(&license_key.id),
2106            Datum::String(&license_key.organization),
2107            Datum::String(&license_key.environment_id),
2108            Datum::TimestampTz(
2109                mz_ore::now::to_datetime(license_key.expiration * 1000)
2110                    .try_into()
2111                    .expect("must fit"),
2112            ),
2113            Datum::TimestampTz(
2114                mz_ore::now::to_datetime(license_key.not_before * 1000)
2115                    .try_into()
2116                    .expect("must fit"),
2117            ),
2118        ]);
2119        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
2120    }
2121
2122    pub fn pack_all_replica_size_updates(&self) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2123        let mut updates = Vec::new();
2124        for (size, alloc) in &self.cluster_replica_sizes.0 {
2125            if alloc.disabled {
2126                continue;
2127            }
2128
2129            // Just invent something when the limits are `None`, which only happens in non-prod
2130            // environments (tests, process orchestrator, etc.)
2131            let cpu_limit = alloc.cpu_limit.unwrap_or(CpuLimit::MAX);
2132            let MemoryLimit(ByteSize(memory_bytes)) =
2133                (alloc.memory_limit).unwrap_or(MemoryLimit::MAX);
2134            let DiskLimit(ByteSize(disk_bytes)) =
2135                (alloc.disk_limit).unwrap_or(DiskLimit::ARBITRARY);
2136
2137            let row = Row::pack_slice(&[
2138                size.as_str().into(),
2139                u64::cast_from(alloc.scale).into(),
2140                u64::cast_from(alloc.workers).into(),
2141                cpu_limit.as_nanocpus().into(),
2142                memory_bytes.into(),
2143                disk_bytes.into(),
2144                (alloc.credits_per_hour).into(),
2145            ]);
2146
2147            updates.push(BuiltinTableUpdate::row(
2148                &*MZ_CLUSTER_REPLICA_SIZES,
2149                row,
2150                Diff::ONE,
2151            ));
2152        }
2153
2154        updates
2155    }
2156
2157    pub fn pack_subscribe_update(
2158        &self,
2159        id: GlobalId,
2160        subscribe: &ActiveSubscribe,
2161        diff: Diff,
2162    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2163        let mut row = Row::default();
2164        let mut packer = row.packer();
2165        packer.push(Datum::String(&id.to_string()));
2166        packer.push(Datum::Uuid(subscribe.session_uuid));
2167        packer.push(Datum::String(&subscribe.cluster_id.to_string()));
2168
2169        let start_dt = mz_ore::now::to_datetime(subscribe.start_time);
2170        packer.push(Datum::TimestampTz(start_dt.try_into().expect("must fit")));
2171
2172        let depends_on: Vec<_> = subscribe
2173            .depends_on
2174            .iter()
2175            .map(|id| id.to_string())
2176            .collect();
2177        packer.push_list(depends_on.iter().map(|s| Datum::String(s)));
2178
2179        BuiltinTableUpdate::row(&*MZ_SUBSCRIPTIONS, row, diff)
2180    }
2181
2182    pub fn pack_session_update(
2183        &self,
2184        conn: &ConnMeta,
2185        diff: Diff,
2186    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2187        let connect_dt = mz_ore::now::to_datetime(conn.connected_at());
2188        BuiltinTableUpdate::row(
2189            &*MZ_SESSIONS,
2190            Row::pack_slice(&[
2191                Datum::Uuid(conn.uuid()),
2192                Datum::UInt32(conn.conn_id().unhandled()),
2193                Datum::String(&conn.authenticated_role_id().to_string()),
2194                Datum::from(conn.client_ip().map(|ip| ip.to_string()).as_deref()),
2195                Datum::TimestampTz(connect_dt.try_into().expect("must fit")),
2196            ]),
2197            diff,
2198        )
2199    }
2200
2201    pub fn pack_default_privileges_update(
2202        &self,
2203        default_privilege_object: &DefaultPrivilegeObject,
2204        grantee: &RoleId,
2205        acl_mode: &AclMode,
2206        diff: Diff,
2207    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2208        BuiltinTableUpdate::row(
2209            &*MZ_DEFAULT_PRIVILEGES,
2210            Row::pack_slice(&[
2211                default_privilege_object.role_id.to_string().as_str().into(),
2212                default_privilege_object
2213                    .database_id
2214                    .map(|database_id| database_id.to_string())
2215                    .as_deref()
2216                    .into(),
2217                default_privilege_object
2218                    .schema_id
2219                    .map(|schema_id| schema_id.to_string())
2220                    .as_deref()
2221                    .into(),
2222                default_privilege_object
2223                    .object_type
2224                    .to_string()
2225                    .to_lowercase()
2226                    .as_str()
2227                    .into(),
2228                grantee.to_string().as_str().into(),
2229                acl_mode.to_string().as_str().into(),
2230            ]),
2231            diff,
2232        )
2233    }
2234
2235    pub fn pack_system_privileges_update(
2236        &self,
2237        privileges: MzAclItem,
2238        diff: Diff,
2239    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2240        BuiltinTableUpdate::row(
2241            &*MZ_SYSTEM_PRIVILEGES,
2242            Row::pack_slice(&[privileges.into()]),
2243            diff,
2244        )
2245    }
2246
2247    fn pack_privilege_array_row(&self, privileges: &PrivilegeMap) -> Row {
2248        let mut row = Row::default();
2249        let flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2250        row.packer()
2251            .try_push_array(
2252                &[ArrayDimension {
2253                    lower_bound: 1,
2254                    length: flat_privileges.len(),
2255                }],
2256                flat_privileges
2257                    .into_iter()
2258                    .map(|mz_acl_item| Datum::MzAclItem(mz_acl_item.clone())),
2259            )
2260            .expect("privileges is 1 dimensional, and its length is used for the array length");
2261        row
2262    }
2263
2264    pub fn pack_comment_update(
2265        &self,
2266        object_id: CommentObjectId,
2267        column_pos: Option<usize>,
2268        comment: &str,
2269        diff: Diff,
2270    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2271        // Use the audit log representation so it's easier to join against.
2272        let object_type = mz_sql::catalog::ObjectType::from(object_id);
2273        let audit_type = super::object_type_to_audit_object_type(object_type);
2274        let object_type_str = audit_type.to_string();
2275
2276        let object_id_str = match object_id {
2277            CommentObjectId::Table(global_id)
2278            | CommentObjectId::View(global_id)
2279            | CommentObjectId::MaterializedView(global_id)
2280            | CommentObjectId::Source(global_id)
2281            | CommentObjectId::Sink(global_id)
2282            | CommentObjectId::Index(global_id)
2283            | CommentObjectId::Func(global_id)
2284            | CommentObjectId::Connection(global_id)
2285            | CommentObjectId::Secret(global_id)
2286            | CommentObjectId::Type(global_id)
2287            | CommentObjectId::ContinualTask(global_id) => global_id.to_string(),
2288            CommentObjectId::Role(role_id) => role_id.to_string(),
2289            CommentObjectId::Database(database_id) => database_id.to_string(),
2290            CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
2291            CommentObjectId::Cluster(cluster_id) => cluster_id.to_string(),
2292            CommentObjectId::ClusterReplica((_, replica_id)) => replica_id.to_string(),
2293            CommentObjectId::NetworkPolicy(network_policy_id) => network_policy_id.to_string(),
2294        };
2295        let column_pos_datum = match column_pos {
2296            Some(pos) => {
2297                // TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
2298                let pos =
2299                    i32::try_from(pos).expect("we constrain this value in the planning layer");
2300                Datum::Int32(pos)
2301            }
2302            None => Datum::Null,
2303        };
2304
2305        BuiltinTableUpdate::row(
2306            &*MZ_COMMENTS,
2307            Row::pack_slice(&[
2308                Datum::String(&object_id_str),
2309                Datum::String(&object_type_str),
2310                column_pos_datum,
2311                Datum::String(comment),
2312            ]),
2313            diff,
2314        )
2315    }
2316
2317    pub fn pack_webhook_source_update(
2318        &self,
2319        item_id: CatalogItemId,
2320        diff: Diff,
2321    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2322        let url = self
2323            .try_get_webhook_url(&item_id)
2324            .expect("webhook source should exist");
2325        let url = url.to_string();
2326        let name = &self.get_entry(&item_id).name().item;
2327        let id_str = item_id.to_string();
2328
2329        BuiltinTableUpdate::row(
2330            &*MZ_WEBHOOKS_SOURCES,
2331            Row::pack_slice(&[
2332                Datum::String(&id_str),
2333                Datum::String(name),
2334                Datum::String(&url),
2335            ]),
2336            diff,
2337        )
2338    }
2339
2340    pub fn pack_source_references_update(
2341        &self,
2342        source_references: &SourceReferences,
2343        diff: Diff,
2344    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2345        let source_id = source_references.source_id.to_string();
2346        let updated_at = &source_references.updated_at;
2347        source_references
2348            .references
2349            .iter()
2350            .map(|reference| {
2351                let mut row = Row::default();
2352                let mut packer = row.packer();
2353                packer.extend([
2354                    Datum::String(&source_id),
2355                    reference
2356                        .namespace
2357                        .as_ref()
2358                        .map(|s| Datum::String(s))
2359                        .unwrap_or(Datum::Null),
2360                    Datum::String(&reference.name),
2361                    Datum::TimestampTz(
2362                        mz_ore::now::to_datetime(*updated_at)
2363                            .try_into()
2364                            .expect("must fit"),
2365                    ),
2366                ]);
2367                if reference.columns.len() > 0 {
2368                    packer
2369                        .try_push_array(
2370                            &[ArrayDimension {
2371                                lower_bound: 1,
2372                                length: reference.columns.len(),
2373                            }],
2374                            reference.columns.iter().map(|col| Datum::String(col)),
2375                        )
2376                        .expect(
2377                            "columns is 1 dimensional, and its length is used for the array length",
2378                        );
2379                } else {
2380                    packer.push(Datum::Null);
2381                }
2382
2383                BuiltinTableUpdate::row(&*MZ_SOURCE_REFERENCES, row, diff)
2384            })
2385            .collect()
2386    }
2387}