Skip to main content

mz_adapter/catalog/
builtin_table_updates.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10mod notice;
11
12use bytesize::ByteSize;
13use ipnet::IpNet;
14use mz_adapter_types::compaction::CompactionWindow;
15use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
16use mz_catalog::SYSTEM_CONN_ID;
17use mz_catalog::builtin::{
18    BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
19    MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICAS,
20    MZ_CLUSTER_SCHEDULES, MZ_CLUSTER_WORKLOAD_CLASSES, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS,
21    MZ_CONNECTIONS, MZ_CONTINUAL_TASKS, MZ_DATABASES, MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS,
22    MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES, MZ_ICEBERG_SINKS, MZ_INDEX_COLUMNS, MZ_INDEXES,
23    MZ_INTERNAL_CLUSTER_REPLICAS, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCE_TABLES,
24    MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES,
25    MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MATERIALIZED_VIEWS, MZ_MYSQL_SOURCE_TABLES,
26    MZ_NETWORK_POLICIES, MZ_NETWORK_POLICY_RULES, MZ_OBJECT_DEPENDENCIES, MZ_OBJECT_GLOBAL_IDS,
27    MZ_OPERATORS, MZ_PENDING_CLUSTER_REPLICAS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES,
28    MZ_PSEUDO_TYPES, MZ_REPLACEMENTS, MZ_ROLE_AUTH, MZ_ROLE_MEMBERS, MZ_ROLE_PARAMETERS, MZ_ROLES,
29    MZ_SCHEMAS, MZ_SECRETS, MZ_SESSIONS, MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SOURCES,
30    MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD,
31    MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS,
32    MZ_WEBHOOKS_SOURCES,
33};
34use mz_catalog::config::AwsPrincipalContext;
35use mz_catalog::durable::SourceReferences;
36use mz_catalog::memory::error::{Error, ErrorKind};
37use mz_catalog::memory::objects::{
38    CatalogEntry, CatalogItem, ClusterVariant, Connection, ContinualTask, DataSourceDesc, Func,
39    Index, MaterializedView, Sink, Table, TableDataSource, Type, View,
40};
41use mz_controller::clusters::{
42    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
43};
44use mz_controller_types::ClusterId;
45use mz_expr::MirScalarExpr;
46use mz_license_keys::ValidatedLicenseKey;
47use mz_orchestrator::{CpuLimit, DiskLimit, MemoryLimit};
48use mz_ore::cast::CastFrom;
49use mz_ore::collections::CollectionExt;
50use mz_persist_client::batch::ProtoBatch;
51use mz_repr::adt::array::ArrayDimension;
52use mz_repr::adt::interval::Interval;
53use mz_repr::adt::jsonb::Jsonb;
54use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
55use mz_repr::adt::regex;
56use mz_repr::network_policy_id::NetworkPolicyId;
57use mz_repr::refresh_schedule::RefreshEvery;
58use mz_repr::role_id::RoleId;
59use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, Row, RowPacker, SqlScalarType, Timestamp};
60use mz_sql::ast::{ContinualTaskStmt, CreateIndexStatement, Statement, UnresolvedItemName};
61use mz_sql::catalog::{
62    CatalogCluster, CatalogDatabase, CatalogSchema, CatalogType, DefaultPrivilegeObject,
63    TypeCategory,
64};
65use mz_sql::func::FuncImplCatalogDetails;
66use mz_sql::names::{
67    CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier,
68};
69use mz_sql::plan::{ClusterSchedule, ConnectionDetails, SshKey};
70use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
71use mz_sql::session::vars::SessionVars;
72use mz_sql_parser::ast::display::AstDisplay;
73use mz_storage_client::client::TableData;
74use mz_storage_types::connections::KafkaConnection;
75use mz_storage_types::connections::aws::{AwsAuth, AwsConnection};
76use mz_storage_types::connections::inline::ReferencedConnection;
77use mz_storage_types::connections::string_or_secret::StringOrSecret;
78use mz_storage_types::sinks::{IcebergSinkConnection, KafkaSinkConnection, StorageSinkConnection};
79use mz_storage_types::sources::{
80    GenericSourceConnection, KafkaSourceConnection, PostgresSourceConnection, SourceConnection,
81};
82use smallvec::smallvec;
83
84// DO NOT add any more imports from `crate` outside of `crate::catalog`.
85use crate::active_compute_sink::ActiveSubscribe;
86use crate::catalog::CatalogState;
87use crate::coord::ConnMeta;
88
89/// An update to a built-in table.
90#[derive(Debug, Clone)]
91pub struct BuiltinTableUpdate<T = CatalogItemId> {
92    /// The reference of the table to update.
93    pub id: T,
94    /// The data to put into the table.
95    pub data: TableData,
96}
97
98impl<T> BuiltinTableUpdate<T> {
99    /// Create a [`BuiltinTableUpdate`] from a [`Row`].
100    pub fn row(id: T, row: Row, diff: Diff) -> BuiltinTableUpdate<T> {
101        BuiltinTableUpdate {
102            id,
103            data: TableData::Rows(vec![(row, diff)]),
104        }
105    }
106
107    pub fn batch(id: T, batch: ProtoBatch) -> BuiltinTableUpdate<T> {
108        BuiltinTableUpdate {
109            id,
110            data: TableData::Batches(smallvec![batch]),
111        }
112    }
113}
114
115impl CatalogState {
116    pub fn resolve_builtin_table_updates(
117        &self,
118        builtin_table_update: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
119    ) -> Vec<BuiltinTableUpdate<CatalogItemId>> {
120        builtin_table_update
121            .into_iter()
122            .map(|builtin_table_update| self.resolve_builtin_table_update(builtin_table_update))
123            .collect()
124    }
125
126    pub fn resolve_builtin_table_update(
127        &self,
128        BuiltinTableUpdate { id, data }: BuiltinTableUpdate<&'static BuiltinTable>,
129    ) -> BuiltinTableUpdate<CatalogItemId> {
130        let id = self.resolve_builtin_table(id);
131        BuiltinTableUpdate { id, data }
132    }
133
134    pub fn pack_depends_update(
135        &self,
136        depender: CatalogItemId,
137        dependee: CatalogItemId,
138        diff: Diff,
139    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
140        let row = Row::pack_slice(&[
141            Datum::String(&depender.to_string()),
142            Datum::String(&dependee.to_string()),
143        ]);
144        BuiltinTableUpdate::row(&*MZ_OBJECT_DEPENDENCIES, row, diff)
145    }
146
147    pub(super) fn pack_database_update(
148        &self,
149        database_id: &DatabaseId,
150        diff: Diff,
151    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
152        let database = self.get_database(database_id);
153        let row = self.pack_privilege_array_row(database.privileges());
154        let privileges = row.unpack_first();
155        BuiltinTableUpdate::row(
156            &*MZ_DATABASES,
157            Row::pack_slice(&[
158                Datum::String(&database.id.to_string()),
159                Datum::UInt32(database.oid),
160                Datum::String(database.name()),
161                Datum::String(&database.owner_id.to_string()),
162                privileges,
163            ]),
164            diff,
165        )
166    }
167
168    pub(super) fn pack_schema_update(
169        &self,
170        database_spec: &ResolvedDatabaseSpecifier,
171        schema_id: &SchemaId,
172        diff: Diff,
173    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
174        let (database_id, schema) = match database_spec {
175            ResolvedDatabaseSpecifier::Ambient => (None, &self.ambient_schemas_by_id[schema_id]),
176            ResolvedDatabaseSpecifier::Id(id) => (
177                Some(id.to_string()),
178                &self.database_by_id[id].schemas_by_id[schema_id],
179            ),
180        };
181        let row = self.pack_privilege_array_row(schema.privileges());
182        let privileges = row.unpack_first();
183        BuiltinTableUpdate::row(
184            &*MZ_SCHEMAS,
185            Row::pack_slice(&[
186                Datum::String(&schema_id.to_string()),
187                Datum::UInt32(schema.oid),
188                Datum::from(database_id.as_deref()),
189                Datum::String(&schema.name.schema),
190                Datum::String(&schema.owner_id.to_string()),
191                privileges,
192            ]),
193            diff,
194        )
195    }
196
197    pub(super) fn pack_role_auth_update(
198        &self,
199        id: RoleId,
200        diff: Diff,
201    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
202        let role_auth = self.get_role_auth(&id);
203        let role = self.get_role(&id);
204        BuiltinTableUpdate::row(
205            &*MZ_ROLE_AUTH,
206            Row::pack_slice(&[
207                Datum::String(&role_auth.role_id.to_string()),
208                Datum::UInt32(role.oid),
209                match &role_auth.password_hash {
210                    Some(hash) => Datum::String(hash),
211                    None => Datum::Null,
212                },
213                Datum::TimestampTz(
214                    mz_ore::now::to_datetime(role_auth.updated_at)
215                        .try_into()
216                        .expect("must fit"),
217                ),
218            ]),
219            diff,
220        )
221    }
222
223    pub(super) fn pack_role_update(
224        &self,
225        id: RoleId,
226        diff: Diff,
227    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
228        match id {
229            // PUBLIC role should not show up in mz_roles.
230            RoleId::Public => vec![],
231            id => {
232                let role = self.get_role(&id);
233                let builtin_supers = [MZ_SYSTEM_ROLE_ID, MZ_SUPPORT_ROLE_ID];
234
235                // For self managed auth, we can get the actual login and superuser bits
236                // directly from the role. For cloud auth, the LOGIN and SUPERUSER attribute
237                // are nullish and determined each time a role logs in, so there's no clean
238                // way to accurately determine this in the catalog. Instead we do something
239                // a little gross. For system roles, we hardcode the known roles that can
240                // log in. For user roles, we determine `rolcanlogin` based on whether the
241                // role name looks like an email address.
242                //
243                // This works for the vast majority of cases in production. Roles that users
244                // log in to come from Frontegg and therefore *must* be valid email
245                // addresses, while roles that are created via `CREATE ROLE` (e.g.,
246                // `admin`, `prod_app`) almost certainly are not named to look like email
247                // addresses.
248                //
249                // For the moment, we're comfortable with the edge cases here. If we discover
250                // that folks are regularly creating non-login roles with names that look
251                // like an email address (e.g., `admins@sysops.foocorp`), we can change
252                // course.
253                let cloud_login_regex = "^[^@]+@[^@]+\\.[^@]+$";
254                let matches_cloud_login_heuristic = regex::Regex::new(cloud_login_regex, true)
255                    .expect("valid regex")
256                    .is_match(&role.name);
257
258                let rolcanlogin = if let Some(login) = role.attributes.login {
259                    login
260                } else {
261                    // Note: Self-managed users with email-like
262                    // role names may have `rolcanlogin` set to true if the role
263                    // was initialized without an explicit LOGIN attribute.
264                    // We're comfortable with this edge case, since roles that
265                    // have login explictly turned off via NOLOGIN will have a
266                    // LOGIN attribute value of false.
267                    builtin_supers.contains(&role.id) || matches_cloud_login_heuristic
268                };
269
270                let rolsuper = if let Some(superuser) = role.attributes.superuser {
271                    Datum::from(superuser)
272                } else if let Some(_) = role.attributes.login {
273                    // If a role has an explicit LOGIN attribute but no SUPERUSER
274                    // attribute, we assume this is self-managed auth where
275                    // the role was created without indicating superuser privileges.
276                    Datum::from(false)
277                } else if builtin_supers.contains(&role.id) {
278                    // System roles (mz_system, mz_support) are superusers
279                    Datum::from(true)
280                } else {
281                    // In cloud environments, superuser status is determined
282                    // at login, so we set `rolsuper` to `null` here because
283                    // it cannot be known beforehand. For self-managed
284                    // auth, roles created without an explicit LOGIN
285                    // attribute would typically have `rolsuper` set to false.
286                    // However, since we cannot reliably distinguish between
287                    // cloud and self-managed here, we conservatively use NULL
288                    // for indeterminate cases.
289                    Datum::Null
290                };
291
292                let role_update = BuiltinTableUpdate::row(
293                    &*MZ_ROLES,
294                    Row::pack_slice(&[
295                        Datum::String(&role.id.to_string()),
296                        Datum::UInt32(role.oid),
297                        Datum::String(&role.name),
298                        Datum::from(role.attributes.inherit),
299                        Datum::from(rolcanlogin),
300                        rolsuper,
301                    ]),
302                    diff,
303                );
304                let mut updates = vec![role_update];
305
306                // HACK/TODO(parkmycar): Creating an empty SessionVars like this is pretty hacky,
307                // we should instead have a static list of all session vars.
308                let session_vars_reference = SessionVars::new_unchecked(
309                    &mz_build_info::DUMMY_BUILD_INFO,
310                    SYSTEM_USER.clone(),
311                    None,
312                );
313
314                for (name, val) in role.vars() {
315                    let result = session_vars_reference
316                        .inspect(name)
317                        .and_then(|var| var.check(val.borrow()));
318                    let Ok(formatted_val) = result else {
319                        // Note: all variables should have been validated by this point, so we
320                        // shouldn't ever hit this.
321                        tracing::error!(?name, ?val, ?result, "found invalid role default var");
322                        continue;
323                    };
324
325                    let role_var_update = BuiltinTableUpdate::row(
326                        &*MZ_ROLE_PARAMETERS,
327                        Row::pack_slice(&[
328                            Datum::String(&role.id.to_string()),
329                            Datum::String(name),
330                            Datum::String(&formatted_val),
331                        ]),
332                        diff,
333                    );
334                    updates.push(role_var_update);
335                }
336
337                updates
338            }
339        }
340    }
341
342    pub(super) fn pack_role_members_update(
343        &self,
344        role_id: RoleId,
345        member_id: RoleId,
346        diff: Diff,
347    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
348        let grantor_id = self
349            .get_role(&member_id)
350            .membership
351            .map
352            .get(&role_id)
353            .expect("catalog out of sync");
354        BuiltinTableUpdate::row(
355            &*MZ_ROLE_MEMBERS,
356            Row::pack_slice(&[
357                Datum::String(&role_id.to_string()),
358                Datum::String(&member_id.to_string()),
359                Datum::String(&grantor_id.to_string()),
360            ]),
361            diff,
362        )
363    }
364
365    pub(super) fn pack_cluster_update(
366        &self,
367        name: &str,
368        diff: Diff,
369    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
370        let id = self.clusters_by_name[name];
371        let cluster = &self.clusters_by_id[&id];
372        let row = self.pack_privilege_array_row(cluster.privileges());
373        let privileges = row.unpack_first();
374        let (size, disk, replication_factor, azs, introspection_debugging, introspection_interval) =
375            match &cluster.config.variant {
376                ClusterVariant::Managed(config) => (
377                    Some(config.size.as_str()),
378                    Some(self.cluster_replica_size_has_disk(&config.size)),
379                    Some(config.replication_factor),
380                    if config.availability_zones.is_empty() {
381                        None
382                    } else {
383                        Some(config.availability_zones.clone())
384                    },
385                    Some(config.logging.log_logging),
386                    config.logging.interval.map(|d| {
387                        Interval::from_duration(&d)
388                            .expect("planning ensured this convertible back to interval")
389                    }),
390                ),
391                ClusterVariant::Unmanaged => (None, None, None, None, None, None),
392            };
393
394        let mut row = Row::default();
395        let mut packer = row.packer();
396        packer.extend([
397            Datum::String(&id.to_string()),
398            Datum::String(name),
399            Datum::String(&cluster.owner_id.to_string()),
400            privileges,
401            cluster.is_managed().into(),
402            size.into(),
403            replication_factor.into(),
404            disk.into(),
405        ]);
406        if let Some(azs) = azs {
407            packer.push_list(azs.iter().map(|az| Datum::String(az)));
408        } else {
409            packer.push(Datum::Null);
410        }
411        packer.push(Datum::from(introspection_debugging));
412        packer.push(Datum::from(introspection_interval));
413
414        let mut updates = Vec::new();
415
416        updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTERS, row, diff));
417
418        if let ClusterVariant::Managed(managed_config) = &cluster.config.variant {
419            let row = match managed_config.schedule {
420                ClusterSchedule::Manual => Row::pack_slice(&[
421                    Datum::String(&id.to_string()),
422                    Datum::String("manual"),
423                    Datum::Null,
424                ]),
425                ClusterSchedule::Refresh {
426                    hydration_time_estimate,
427                } => Row::pack_slice(&[
428                    Datum::String(&id.to_string()),
429                    Datum::String("on-refresh"),
430                    Datum::Interval(
431                        Interval::from_duration(&hydration_time_estimate)
432                            .expect("planning ensured that this is convertible back to Interval"),
433                    ),
434                ]),
435            };
436            updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTER_SCHEDULES, row, diff));
437        }
438
439        updates.push(BuiltinTableUpdate::row(
440            &*MZ_CLUSTER_WORKLOAD_CLASSES,
441            Row::pack_slice(&[
442                Datum::String(&id.to_string()),
443                Datum::from(cluster.config.workload_class.as_deref()),
444            ]),
445            diff,
446        ));
447
448        updates
449    }
450
451    pub(super) fn pack_cluster_replica_update(
452        &self,
453        cluster_id: ClusterId,
454        name: &str,
455        diff: Diff,
456    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
457        let cluster = &self.clusters_by_id[&cluster_id];
458        let id = cluster.replica_id(name).expect("Must exist");
459        let replica = cluster.replica(id).expect("Must exist");
460
461        let (size, disk, az, internal, pending) = match &replica.config.location {
462            // TODO(guswynn): The column should be `availability_zones`, not
463            // `availability_zone`.
464            ReplicaLocation::Managed(ManagedReplicaLocation {
465                size,
466                availability_zones: ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
467                allocation: _,
468                billed_as: _,
469                internal,
470                pending,
471            }) => (
472                Some(&**size),
473                Some(self.cluster_replica_size_has_disk(size)),
474                Some(az.as_str()),
475                *internal,
476                *pending,
477            ),
478            ReplicaLocation::Managed(ManagedReplicaLocation {
479                size,
480                availability_zones: _,
481                allocation: _,
482                billed_as: _,
483                internal,
484                pending,
485            }) => (
486                Some(&**size),
487                Some(self.cluster_replica_size_has_disk(size)),
488                None,
489                *internal,
490                *pending,
491            ),
492            _ => (None, None, None, false, false),
493        };
494
495        let cluster_replica_update = BuiltinTableUpdate::row(
496            &*MZ_CLUSTER_REPLICAS,
497            Row::pack_slice(&[
498                Datum::String(&id.to_string()),
499                Datum::String(name),
500                Datum::String(&cluster_id.to_string()),
501                Datum::from(size),
502                Datum::from(az),
503                Datum::String(&replica.owner_id.to_string()),
504                Datum::from(disk),
505            ]),
506            diff,
507        );
508
509        let mut updates = vec![cluster_replica_update];
510
511        if internal {
512            let update = BuiltinTableUpdate::row(
513                &*MZ_INTERNAL_CLUSTER_REPLICAS,
514                Row::pack_slice(&[Datum::String(&id.to_string())]),
515                diff,
516            );
517            updates.push(update);
518        }
519
520        if pending {
521            let update = BuiltinTableUpdate::row(
522                &*MZ_PENDING_CLUSTER_REPLICAS,
523                Row::pack_slice(&[Datum::String(&id.to_string())]),
524                diff,
525            );
526            updates.push(update);
527        }
528
529        updates
530    }
531
532    pub(crate) fn pack_network_policy_update(
533        &self,
534        policy_id: &NetworkPolicyId,
535        diff: Diff,
536    ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, Error> {
537        let policy = self.get_network_policy(policy_id);
538        let row = self.pack_privilege_array_row(&policy.privileges);
539        let privileges = row.unpack_first();
540        let mut updates = Vec::new();
541        for ref rule in policy.rules.clone() {
542            updates.push(BuiltinTableUpdate::row(
543                &*MZ_NETWORK_POLICY_RULES,
544                Row::pack_slice(&[
545                    Datum::String(&rule.name),
546                    Datum::String(&policy.id.to_string()),
547                    Datum::String(&rule.action.to_string()),
548                    Datum::String(&rule.address.to_string()),
549                    Datum::String(&rule.direction.to_string()),
550                ]),
551                diff,
552            ));
553        }
554        updates.push(BuiltinTableUpdate::row(
555            &*MZ_NETWORK_POLICIES,
556            Row::pack_slice(&[
557                Datum::String(&policy.id.to_string()),
558                Datum::String(&policy.name),
559                Datum::String(&policy.owner_id.to_string()),
560                privileges,
561                Datum::UInt32(policy.oid.clone()),
562            ]),
563            diff,
564        ));
565
566        Ok(updates)
567    }
568
569    pub(super) fn pack_item_update(
570        &self,
571        id: CatalogItemId,
572        diff: Diff,
573    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
574        let entry = self.get_entry(&id);
575        let oid = entry.oid();
576        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
577        let schema_id = &self
578            .get_schema(
579                &entry.name().qualifiers.database_spec,
580                &entry.name().qualifiers.schema_spec,
581                conn_id,
582            )
583            .id;
584        let name = &entry.name().item;
585        let owner_id = entry.owner_id();
586        let privileges_row = self.pack_privilege_array_row(entry.privileges());
587        let privileges = privileges_row.unpack_first();
588        let mut updates = match entry.item() {
589            CatalogItem::Log(_) => self.pack_source_update(
590                id, oid, schema_id, name, "log", None, None, None, None, None, owner_id,
591                privileges, diff, None,
592            ),
593            CatalogItem::Index(index) => {
594                self.pack_index_update(id, oid, name, owner_id, index, diff)
595            }
596            CatalogItem::Table(table) => {
597                let mut updates = self
598                    .pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);
599
600                if let TableDataSource::DataSource {
601                    desc: data_source,
602                    timeline: _,
603                } = &table.data_source
604                {
605                    updates.extend(match data_source {
606                        DataSourceDesc::IngestionExport {
607                            ingestion_id,
608                            external_reference: UnresolvedItemName(external_reference),
609                            details: _,
610                            data_config: _,
611                        } => {
612                            let ingestion_entry = self
613                                .get_entry(ingestion_id)
614                                .source_desc()
615                                .expect("primary source exists")
616                                .expect("primary source is a source");
617
618                            match ingestion_entry.connection.name() {
619                                "postgres" => {
620                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
621                                    // The left-most qualification of Postgres
622                                    // tables is the database, but this
623                                    // information is redundant because each
624                                    // Postgres connection connects to only one
625                                    // database.
626                                    let schema_name = external_reference[1].to_ast_string_simple();
627                                    let table_name = external_reference[2].to_ast_string_simple();
628
629                                    self.pack_postgres_source_tables_update(
630                                        id,
631                                        &schema_name,
632                                        &table_name,
633                                        diff,
634                                    )
635                                }
636                                "mysql" => {
637                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
638                                    let schema_name = external_reference[0].to_ast_string_simple();
639                                    let table_name = external_reference[1].to_ast_string_simple();
640
641                                    self.pack_mysql_source_tables_update(
642                                        id,
643                                        &schema_name,
644                                        &table_name,
645                                        diff,
646                                    )
647                                }
648                                "sql-server" => {
649                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
650                                    // The left-most qualification of SQL Server tables is
651                                    // the database, but this information is redundant
652                                    // because each SQL Server connection connects to
653                                    // only one database.
654                                    let schema_name = external_reference[1].to_ast_string_simple();
655                                    let table_name = external_reference[2].to_ast_string_simple();
656
657                                    self.pack_sql_server_source_table_update(
658                                        id,
659                                        &schema_name,
660                                        &table_name,
661                                        diff,
662                                    )
663                                }
664                                // Load generator sources don't have any special
665                                // updates.
666                                "load-generator" => vec![],
667                                "kafka" => {
668                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 1);
669                                    let topic = external_reference[0].to_ast_string_simple();
670                                    let envelope = data_source.envelope();
671                                    let (key_format, value_format) = data_source.formats();
672
673                                    self.pack_kafka_source_tables_update(
674                                        id,
675                                        &topic,
676                                        envelope,
677                                        key_format,
678                                        value_format,
679                                        diff,
680                                    )
681                                }
682                                s => unreachable!("{s} sources do not have tables"),
683                            }
684                        }
685                        _ => vec![],
686                    });
687                }
688
689                updates
690            }
691            CatalogItem::Source(source) => {
692                let source_type = source.source_type();
693                let connection_id = source.connection_id();
694                let envelope = source.data_source.envelope();
695                let cluster_entry = match source.data_source {
696                    // Ingestion exports don't have their own cluster, but
697                    // run on their ingestion's cluster.
698                    DataSourceDesc::IngestionExport { ingestion_id, .. } => {
699                        self.get_entry(&ingestion_id)
700                    }
701                    _ => entry,
702                };
703
704                let cluster_id = cluster_entry.item().cluster_id().map(|id| id.to_string());
705
706                let (key_format, value_format) = source.data_source.formats();
707
708                let mut updates = self.pack_source_update(
709                    id,
710                    oid,
711                    schema_id,
712                    name,
713                    source_type,
714                    connection_id,
715                    envelope,
716                    key_format,
717                    value_format,
718                    cluster_id.as_deref(),
719                    owner_id,
720                    privileges,
721                    diff,
722                    source.create_sql.as_ref(),
723                );
724
725                updates.extend(match &source.data_source {
726                    DataSourceDesc::Ingestion { desc, .. }
727                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => match &desc.connection {
728                        GenericSourceConnection::Postgres(postgres) => {
729                            self.pack_postgres_source_update(id, postgres, diff)
730                        }
731                        GenericSourceConnection::Kafka(kafka) => {
732                            self.pack_kafka_source_update(id, source.global_id(), kafka, diff)
733                        }
734                        _ => vec![],
735                    },
736                    DataSourceDesc::IngestionExport {
737                        ingestion_id,
738                        external_reference: UnresolvedItemName(external_reference),
739                        details: _,
740                        data_config: _,
741                    } => {
742                        let ingestion_entry = self
743                            .get_entry(ingestion_id)
744                            .source_desc()
745                            .expect("primary source exists")
746                            .expect("primary source is a source");
747
748                        match ingestion_entry.connection.name() {
749                            "postgres" => {
750                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
751                                // The left-most qualification of Postgres
752                                // tables is the database, but this
753                                // information is redundant because each
754                                // Postgres connection connects to only one
755                                // database.
756                                let schema_name = external_reference[1].to_ast_string_simple();
757                                let table_name = external_reference[2].to_ast_string_simple();
758
759                                self.pack_postgres_source_tables_update(
760                                    id,
761                                    &schema_name,
762                                    &table_name,
763                                    diff,
764                                )
765                            }
766                            "mysql" => {
767                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
768                                let schema_name = external_reference[0].to_ast_string_simple();
769                                let table_name = external_reference[1].to_ast_string_simple();
770
771                                self.pack_mysql_source_tables_update(
772                                    id,
773                                    &schema_name,
774                                    &table_name,
775                                    diff,
776                                )
777                            }
778                            "sql-server" => {
779                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
780                                // The left-most qualification of SQL Server tables is
781                                // the database, but this information is redundant
782                                // because each SQL Server connection connects to
783                                // only one database.
784                                let schema_name = external_reference[1].to_ast_string_simple();
785                                let table_name = external_reference[2].to_ast_string_simple();
786
787                                self.pack_sql_server_source_table_update(
788                                    id,
789                                    &schema_name,
790                                    &table_name,
791                                    diff,
792                                )
793                            }
794                            // Load generator sources don't have any special
795                            // updates.
796                            "load-generator" => vec![],
797                            s => unreachable!("{s} sources do not have subsources"),
798                        }
799                    }
800                    DataSourceDesc::Webhook { .. } => {
801                        vec![self.pack_webhook_source_update(id, diff)]
802                    }
803                    _ => vec![],
804                });
805
806                updates
807            }
808            CatalogItem::View(view) => {
809                self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
810            }
811            CatalogItem::MaterializedView(mview) => self.pack_materialized_view_update(
812                id, oid, schema_id, name, owner_id, privileges, mview, diff,
813            ),
814            CatalogItem::Sink(sink) => {
815                self.pack_sink_update(id, oid, schema_id, name, owner_id, sink, diff)
816            }
817            CatalogItem::Type(ty) => {
818                self.pack_type_update(id, oid, schema_id, name, owner_id, privileges, ty, diff)
819            }
820            CatalogItem::Func(func) => {
821                self.pack_func_update(id, schema_id, name, owner_id, func, diff)
822            }
823            CatalogItem::Secret(_) => {
824                self.pack_secret_update(id, oid, schema_id, name, owner_id, privileges, diff)
825            }
826            CatalogItem::Connection(connection) => self.pack_connection_update(
827                id, oid, schema_id, name, owner_id, privileges, connection, diff,
828            ),
829            CatalogItem::ContinualTask(ct) => self.pack_continual_task_update(
830                id, oid, schema_id, name, owner_id, privileges, ct, diff,
831            ),
832        };
833
834        if !entry.item().is_temporary() {
835            // Populate or clean up the `mz_object_dependencies` table.
836            // TODO(jkosh44) Unclear if this table wants to include all uses or only references.
837            for dependee in entry.item().references().items() {
838                updates.push(self.pack_depends_update(id, *dependee, diff))
839            }
840        }
841
842        // Always report the latest for an objects columns.
843        if let Some(desc) = entry.relation_desc_latest() {
844            let defaults = match entry.item() {
845                CatalogItem::Table(Table {
846                    data_source: TableDataSource::TableWrites { defaults },
847                    ..
848                }) => Some(defaults),
849                _ => None,
850            };
851            for (i, (column_name, column_type)) in desc.iter().enumerate() {
852                let default: Option<String> = defaults.map(|d| d[i].to_ast_string_stable());
853                let default: Datum = default
854                    .as_ref()
855                    .map(|d| Datum::String(d))
856                    .unwrap_or(Datum::Null);
857                let pgtype = mz_pgrepr::Type::from(&column_type.scalar_type);
858                let (type_name, type_oid) = match &column_type.scalar_type {
859                    SqlScalarType::List {
860                        custom_id: Some(custom_id),
861                        ..
862                    }
863                    | SqlScalarType::Map {
864                        custom_id: Some(custom_id),
865                        ..
866                    }
867                    | SqlScalarType::Record {
868                        custom_id: Some(custom_id),
869                        ..
870                    } => {
871                        let entry = self.get_entry(custom_id);
872                        // NOTE(benesch): the `mz_columns.type text` field is
873                        // wrong. Types do not have a name that can be
874                        // represented as a single textual field. There can be
875                        // multiple types with the same name in different
876                        // schemas and databases. We should eventually deprecate
877                        // the `type` field in favor of a new `type_id` field
878                        // that can be joined against `mz_types`.
879                        //
880                        // For now, in the interest of pragmatism, we just use
881                        // the type's item name, and accept that there may be
882                        // ambiguity if the same type name is used in multiple
883                        // schemas. The ambiguity is mitigated by the OID, which
884                        // can be joined against `mz_types.oid` to resolve the
885                        // ambiguity.
886                        let name = &*entry.name().item;
887                        let oid = entry.oid();
888                        (name, oid)
889                    }
890                    _ => (pgtype.name(), pgtype.oid()),
891                };
892                updates.push(BuiltinTableUpdate::row(
893                    &*MZ_COLUMNS,
894                    Row::pack_slice(&[
895                        Datum::String(&id.to_string()),
896                        Datum::String(column_name),
897                        Datum::UInt64(u64::cast_from(i + 1)),
898                        Datum::from(column_type.nullable),
899                        Datum::String(type_name),
900                        default,
901                        Datum::UInt32(type_oid),
902                        Datum::Int32(pgtype.typmod()),
903                    ]),
904                    diff,
905                ));
906            }
907        }
908
909        // Use initial lcw so that we can tell apart default from non-existent windows.
910        if let Some(cw) = entry.item().initial_logical_compaction_window() {
911            updates.push(self.pack_history_retention_strategy_update(id, cw, diff));
912        }
913
914        updates.extend(Self::pack_item_global_id_update(entry, diff));
915
916        updates
917    }
918
919    fn pack_item_global_id_update(
920        entry: &CatalogEntry,
921        diff: Diff,
922    ) -> impl Iterator<Item = BuiltinTableUpdate<&'static BuiltinTable>> + use<'_> {
923        let id = entry.id().to_string();
924        let global_ids = entry.global_ids();
925        global_ids.map(move |global_id| {
926            BuiltinTableUpdate::row(
927                &*MZ_OBJECT_GLOBAL_IDS,
928                Row::pack_slice(&[Datum::String(&id), Datum::String(&global_id.to_string())]),
929                diff,
930            )
931        })
932    }
933
934    fn pack_history_retention_strategy_update(
935        &self,
936        id: CatalogItemId,
937        cw: CompactionWindow,
938        diff: Diff,
939    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
940        let cw: u64 = cw.comparable_timestamp().into();
941        let cw = Jsonb::from_serde_json(serde_json::Value::Number(serde_json::Number::from(cw)))
942            .expect("must serialize");
943        BuiltinTableUpdate::row(
944            &*MZ_HISTORY_RETENTION_STRATEGIES,
945            Row::pack_slice(&[
946                Datum::String(&id.to_string()),
947                // FOR is the only strategy at the moment. We may introduce FROM or others later.
948                Datum::String("FOR"),
949                cw.into_row().into_element(),
950            ]),
951            diff,
952        )
953    }
954
955    fn pack_table_update(
956        &self,
957        id: CatalogItemId,
958        oid: u32,
959        schema_id: &SchemaSpecifier,
960        name: &str,
961        owner_id: &RoleId,
962        privileges: Datum,
963        diff: Diff,
964        table: &Table,
965    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
966        let redacted = table.create_sql.as_ref().map(|create_sql| {
967            mz_sql::parse::parse(create_sql)
968                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
969                .into_element()
970                .ast
971                .to_ast_string_redacted()
972        });
973        let source_id = if let TableDataSource::DataSource {
974            desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
975            ..
976        } = &table.data_source
977        {
978            Some(ingestion_id.to_string())
979        } else {
980            None
981        };
982
983        vec![BuiltinTableUpdate::row(
984            &*MZ_TABLES,
985            Row::pack_slice(&[
986                Datum::String(&id.to_string()),
987                Datum::UInt32(oid),
988                Datum::String(&schema_id.to_string()),
989                Datum::String(name),
990                Datum::String(&owner_id.to_string()),
991                privileges,
992                if let Some(create_sql) = &table.create_sql {
993                    Datum::String(create_sql)
994                } else {
995                    Datum::Null
996                },
997                if let Some(redacted) = &redacted {
998                    Datum::String(redacted)
999                } else {
1000                    Datum::Null
1001                },
1002                if let Some(source_id) = source_id.as_ref() {
1003                    Datum::String(source_id)
1004                } else {
1005                    Datum::Null
1006                },
1007            ]),
1008            diff,
1009        )]
1010    }
1011
1012    fn pack_source_update(
1013        &self,
1014        id: CatalogItemId,
1015        oid: u32,
1016        schema_id: &SchemaSpecifier,
1017        name: &str,
1018        source_desc_name: &str,
1019        connection_id: Option<CatalogItemId>,
1020        envelope: Option<&str>,
1021        key_format: Option<&str>,
1022        value_format: Option<&str>,
1023        cluster_id: Option<&str>,
1024        owner_id: &RoleId,
1025        privileges: Datum,
1026        diff: Diff,
1027        create_sql: Option<&String>,
1028    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1029        let redacted = create_sql.map(|create_sql| {
1030            let create_stmt = mz_sql::parse::parse(create_sql)
1031                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
1032                .into_element()
1033                .ast;
1034            create_stmt.to_ast_string_redacted()
1035        });
1036        vec![BuiltinTableUpdate::row(
1037            &*MZ_SOURCES,
1038            Row::pack_slice(&[
1039                Datum::String(&id.to_string()),
1040                Datum::UInt32(oid),
1041                Datum::String(&schema_id.to_string()),
1042                Datum::String(name),
1043                Datum::String(source_desc_name),
1044                Datum::from(connection_id.map(|id| id.to_string()).as_deref()),
1045                // This is the "source size", which is a remnant from linked
1046                // clusters.
1047                Datum::Null,
1048                Datum::from(envelope),
1049                Datum::from(key_format),
1050                Datum::from(value_format),
1051                Datum::from(cluster_id),
1052                Datum::String(&owner_id.to_string()),
1053                privileges,
1054                if let Some(create_sql) = create_sql {
1055                    Datum::String(create_sql)
1056                } else {
1057                    Datum::Null
1058                },
1059                if let Some(redacted) = &redacted {
1060                    Datum::String(redacted)
1061                } else {
1062                    Datum::Null
1063                },
1064            ]),
1065            diff,
1066        )]
1067    }
1068
1069    fn pack_postgres_source_update(
1070        &self,
1071        id: CatalogItemId,
1072        postgres: &PostgresSourceConnection<ReferencedConnection>,
1073        diff: Diff,
1074    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1075        vec![BuiltinTableUpdate::row(
1076            &*MZ_POSTGRES_SOURCES,
1077            Row::pack_slice(&[
1078                Datum::String(&id.to_string()),
1079                Datum::String(&postgres.publication_details.slot),
1080                Datum::from(postgres.publication_details.timeline_id),
1081            ]),
1082            diff,
1083        )]
1084    }
1085
1086    fn pack_kafka_source_update(
1087        &self,
1088        item_id: CatalogItemId,
1089        collection_id: GlobalId,
1090        kafka: &KafkaSourceConnection<ReferencedConnection>,
1091        diff: Diff,
1092    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1093        vec![BuiltinTableUpdate::row(
1094            &*MZ_KAFKA_SOURCES,
1095            Row::pack_slice(&[
1096                Datum::String(&item_id.to_string()),
1097                Datum::String(&kafka.group_id(&self.config.connection_context, collection_id)),
1098                Datum::String(&kafka.topic),
1099            ]),
1100            diff,
1101        )]
1102    }
1103
1104    fn pack_postgres_source_tables_update(
1105        &self,
1106        id: CatalogItemId,
1107        schema_name: &str,
1108        table_name: &str,
1109        diff: Diff,
1110    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1111        vec![BuiltinTableUpdate::row(
1112            &*MZ_POSTGRES_SOURCE_TABLES,
1113            Row::pack_slice(&[
1114                Datum::String(&id.to_string()),
1115                Datum::String(schema_name),
1116                Datum::String(table_name),
1117            ]),
1118            diff,
1119        )]
1120    }
1121
1122    fn pack_mysql_source_tables_update(
1123        &self,
1124        id: CatalogItemId,
1125        schema_name: &str,
1126        table_name: &str,
1127        diff: Diff,
1128    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1129        vec![BuiltinTableUpdate::row(
1130            &*MZ_MYSQL_SOURCE_TABLES,
1131            Row::pack_slice(&[
1132                Datum::String(&id.to_string()),
1133                Datum::String(schema_name),
1134                Datum::String(table_name),
1135            ]),
1136            diff,
1137        )]
1138    }
1139
1140    fn pack_sql_server_source_table_update(
1141        &self,
1142        id: CatalogItemId,
1143        schema_name: &str,
1144        table_name: &str,
1145        diff: Diff,
1146    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1147        vec![BuiltinTableUpdate::row(
1148            &*MZ_SQL_SERVER_SOURCE_TABLES,
1149            Row::pack_slice(&[
1150                Datum::String(&id.to_string()),
1151                Datum::String(schema_name),
1152                Datum::String(table_name),
1153            ]),
1154            diff,
1155        )]
1156    }
1157
1158    fn pack_kafka_source_tables_update(
1159        &self,
1160        id: CatalogItemId,
1161        topic: &str,
1162        envelope: Option<&str>,
1163        key_format: Option<&str>,
1164        value_format: Option<&str>,
1165        diff: Diff,
1166    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1167        vec![BuiltinTableUpdate::row(
1168            &*MZ_KAFKA_SOURCE_TABLES,
1169            Row::pack_slice(&[
1170                Datum::String(&id.to_string()),
1171                Datum::String(topic),
1172                Datum::from(envelope),
1173                Datum::from(key_format),
1174                Datum::from(value_format),
1175            ]),
1176            diff,
1177        )]
1178    }
1179
1180    fn pack_connection_update(
1181        &self,
1182        id: CatalogItemId,
1183        oid: u32,
1184        schema_id: &SchemaSpecifier,
1185        name: &str,
1186        owner_id: &RoleId,
1187        privileges: Datum,
1188        connection: &Connection,
1189        diff: Diff,
1190    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1191        let create_stmt = mz_sql::parse::parse(&connection.create_sql)
1192            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", connection.create_sql))
1193            .into_element()
1194            .ast;
1195        let mut updates = vec![BuiltinTableUpdate::row(
1196            &*MZ_CONNECTIONS,
1197            Row::pack_slice(&[
1198                Datum::String(&id.to_string()),
1199                Datum::UInt32(oid),
1200                Datum::String(&schema_id.to_string()),
1201                Datum::String(name),
1202                Datum::String(match connection.details {
1203                    ConnectionDetails::Kafka { .. } => "kafka",
1204                    ConnectionDetails::Csr { .. } => "confluent-schema-registry",
1205                    ConnectionDetails::Postgres { .. } => "postgres",
1206                    ConnectionDetails::Aws(..) => "aws",
1207                    ConnectionDetails::AwsPrivatelink(..) => "aws-privatelink",
1208                    ConnectionDetails::Ssh { .. } => "ssh-tunnel",
1209                    ConnectionDetails::MySql { .. } => "mysql",
1210                    ConnectionDetails::SqlServer(_) => "sql-server",
1211                    ConnectionDetails::IcebergCatalog(_) => "iceberg-catalog",
1212                }),
1213                Datum::String(&owner_id.to_string()),
1214                privileges,
1215                Datum::String(&connection.create_sql),
1216                Datum::String(&create_stmt.to_ast_string_redacted()),
1217            ]),
1218            diff,
1219        )];
1220        match connection.details {
1221            ConnectionDetails::Kafka(ref kafka) => {
1222                updates.extend(self.pack_kafka_connection_update(id, kafka, diff));
1223            }
1224            ConnectionDetails::Aws(ref aws_config) => {
1225                match self.pack_aws_connection_update(id, aws_config, diff) {
1226                    Ok(update) => {
1227                        updates.push(update);
1228                    }
1229                    Err(e) => {
1230                        tracing::error!(%id, %e, "failed writing row to mz_aws_connections table");
1231                    }
1232                }
1233            }
1234            ConnectionDetails::AwsPrivatelink(_) => {
1235                if let Some(aws_principal_context) = self.aws_principal_context.as_ref() {
1236                    updates.push(self.pack_aws_privatelink_connection_update(
1237                        id,
1238                        aws_principal_context,
1239                        diff,
1240                    ));
1241                } else {
1242                    tracing::error!(%id, "missing AWS principal context; cannot write row to mz_aws_privatelink_connections table");
1243                }
1244            }
1245            ConnectionDetails::Ssh {
1246                ref key_1,
1247                ref key_2,
1248                ..
1249            } => {
1250                updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff));
1251            }
1252            ConnectionDetails::Csr(_)
1253            | ConnectionDetails::Postgres(_)
1254            | ConnectionDetails::MySql(_)
1255            | ConnectionDetails::SqlServer(_)
1256            | ConnectionDetails::IcebergCatalog(_) => (),
1257        };
1258        updates
1259    }
1260
1261    pub(crate) fn pack_ssh_tunnel_connection_update(
1262        &self,
1263        id: CatalogItemId,
1264        key_1: &SshKey,
1265        key_2: &SshKey,
1266        diff: Diff,
1267    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1268        BuiltinTableUpdate::row(
1269            &*MZ_SSH_TUNNEL_CONNECTIONS,
1270            Row::pack_slice(&[
1271                Datum::String(&id.to_string()),
1272                Datum::String(key_1.public_key().as_str()),
1273                Datum::String(key_2.public_key().as_str()),
1274            ]),
1275            diff,
1276        )
1277    }
1278
1279    fn pack_kafka_connection_update(
1280        &self,
1281        id: CatalogItemId,
1282        kafka: &KafkaConnection<ReferencedConnection>,
1283        diff: Diff,
1284    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1285        let progress_topic = kafka.progress_topic(&self.config.connection_context, id);
1286        let mut row = Row::default();
1287        row.packer()
1288            .try_push_array(
1289                &[ArrayDimension {
1290                    lower_bound: 1,
1291                    length: kafka.brokers.len(),
1292                }],
1293                kafka
1294                    .brokers
1295                    .iter()
1296                    .map(|broker| Datum::String(&broker.address)),
1297            )
1298            .expect("kafka.brokers is 1 dimensional, and its length is used for the array length");
1299        let brokers = row.unpack_first();
1300        vec![BuiltinTableUpdate::row(
1301            &*MZ_KAFKA_CONNECTIONS,
1302            Row::pack_slice(&[
1303                Datum::String(&id.to_string()),
1304                brokers,
1305                Datum::String(&progress_topic),
1306            ]),
1307            diff,
1308        )]
1309    }
1310
1311    pub fn pack_aws_privatelink_connection_update(
1312        &self,
1313        connection_id: CatalogItemId,
1314        aws_principal_context: &AwsPrincipalContext,
1315        diff: Diff,
1316    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1317        let id = &MZ_AWS_PRIVATELINK_CONNECTIONS;
1318        let row = Row::pack_slice(&[
1319            Datum::String(&connection_id.to_string()),
1320            Datum::String(&aws_principal_context.to_principal_string(connection_id)),
1321        ]);
1322        BuiltinTableUpdate::row(id, row, diff)
1323    }
1324
1325    pub fn pack_aws_connection_update(
1326        &self,
1327        connection_id: CatalogItemId,
1328        aws_config: &AwsConnection,
1329        diff: Diff,
1330    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, anyhow::Error> {
1331        let id = &MZ_AWS_CONNECTIONS;
1332
1333        let mut access_key_id = None;
1334        let mut access_key_id_secret_id = None;
1335        let mut secret_access_key_secret_id = None;
1336        let mut session_token = None;
1337        let mut session_token_secret_id = None;
1338        let mut assume_role_arn = None;
1339        let mut assume_role_session_name = None;
1340        let mut principal = None;
1341        let mut external_id = None;
1342        let mut example_trust_policy = None;
1343        match &aws_config.auth {
1344            AwsAuth::Credentials(credentials) => {
1345                match &credentials.access_key_id {
1346                    StringOrSecret::String(s) => access_key_id = Some(s.as_str()),
1347                    StringOrSecret::Secret(s) => access_key_id_secret_id = Some(s.to_string()),
1348                }
1349                secret_access_key_secret_id = Some(credentials.secret_access_key.to_string());
1350                match credentials.session_token.as_ref() {
1351                    None => (),
1352                    Some(StringOrSecret::String(s)) => session_token = Some(s.as_str()),
1353                    Some(StringOrSecret::Secret(s)) => {
1354                        session_token_secret_id = Some(s.to_string())
1355                    }
1356                }
1357            }
1358            AwsAuth::AssumeRole(assume_role) => {
1359                assume_role_arn = Some(assume_role.arn.as_str());
1360                assume_role_session_name = assume_role.session_name.as_deref();
1361                principal = self
1362                    .config
1363                    .connection_context
1364                    .aws_connection_role_arn
1365                    .as_deref();
1366                external_id =
1367                    Some(assume_role.external_id(&self.config.connection_context, connection_id)?);
1368                example_trust_policy = {
1369                    let policy = assume_role
1370                        .example_trust_policy(&self.config.connection_context, connection_id)?;
1371                    let policy = Jsonb::from_serde_json(policy).expect("valid json");
1372                    Some(policy.into_row())
1373                };
1374            }
1375        }
1376
1377        let row = Row::pack_slice(&[
1378            Datum::String(&connection_id.to_string()),
1379            Datum::from(aws_config.endpoint.as_deref()),
1380            Datum::from(aws_config.region.as_deref()),
1381            Datum::from(access_key_id),
1382            Datum::from(access_key_id_secret_id.as_deref()),
1383            Datum::from(secret_access_key_secret_id.as_deref()),
1384            Datum::from(session_token),
1385            Datum::from(session_token_secret_id.as_deref()),
1386            Datum::from(assume_role_arn),
1387            Datum::from(assume_role_session_name),
1388            Datum::from(principal),
1389            Datum::from(external_id.as_deref()),
1390            Datum::from(example_trust_policy.as_ref().map(|p| p.into_element())),
1391        ]);
1392
1393        Ok(BuiltinTableUpdate::row(id, row, diff))
1394    }
1395
1396    fn pack_view_update(
1397        &self,
1398        id: CatalogItemId,
1399        oid: u32,
1400        schema_id: &SchemaSpecifier,
1401        name: &str,
1402        owner_id: &RoleId,
1403        privileges: Datum,
1404        view: &View,
1405        diff: Diff,
1406    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1407        let create_stmt = mz_sql::parse::parse(&view.create_sql)
1408            .unwrap_or_else(|e| {
1409                panic!(
1410                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1411                    view.create_sql, e
1412                )
1413            })
1414            .into_element()
1415            .ast;
1416        let query = match &create_stmt {
1417            Statement::CreateView(stmt) => &stmt.definition.query,
1418            _ => unreachable!(),
1419        };
1420
1421        let mut query_string = query.to_ast_string_stable();
1422        // PostgreSQL appends a semicolon in `pg_views.definition`, we
1423        // do the same for compatibility's sake.
1424        query_string.push(';');
1425
1426        vec![BuiltinTableUpdate::row(
1427            &*MZ_VIEWS,
1428            Row::pack_slice(&[
1429                Datum::String(&id.to_string()),
1430                Datum::UInt32(oid),
1431                Datum::String(&schema_id.to_string()),
1432                Datum::String(name),
1433                Datum::String(&query_string),
1434                Datum::String(&owner_id.to_string()),
1435                privileges,
1436                Datum::String(&view.create_sql),
1437                Datum::String(&create_stmt.to_ast_string_redacted()),
1438            ]),
1439            diff,
1440        )]
1441    }
1442
1443    fn pack_materialized_view_update(
1444        &self,
1445        id: CatalogItemId,
1446        oid: u32,
1447        schema_id: &SchemaSpecifier,
1448        name: &str,
1449        owner_id: &RoleId,
1450        privileges: Datum,
1451        mview: &MaterializedView,
1452        diff: Diff,
1453    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1454        let create_stmt = mz_sql::parse::parse(&mview.create_sql)
1455            .unwrap_or_else(|e| {
1456                panic!(
1457                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1458                    mview.create_sql, e
1459                )
1460            })
1461            .into_element()
1462            .ast;
1463        let query_string = match &create_stmt {
1464            Statement::CreateMaterializedView(stmt) => {
1465                let mut query_string = stmt.query.to_ast_string_stable();
1466                // PostgreSQL appends a semicolon in `pg_matviews.definition`, we
1467                // do the same for compatibility's sake.
1468                query_string.push(';');
1469                query_string
1470            }
1471            _ => unreachable!(),
1472        };
1473
1474        let mut updates = Vec::new();
1475
1476        updates.push(BuiltinTableUpdate::row(
1477            &*MZ_MATERIALIZED_VIEWS,
1478            Row::pack_slice(&[
1479                Datum::String(&id.to_string()),
1480                Datum::UInt32(oid),
1481                Datum::String(&schema_id.to_string()),
1482                Datum::String(name),
1483                Datum::String(&mview.cluster_id.to_string()),
1484                Datum::String(&query_string),
1485                Datum::String(&owner_id.to_string()),
1486                privileges,
1487                Datum::String(&mview.create_sql),
1488                Datum::String(&create_stmt.to_ast_string_redacted()),
1489            ]),
1490            diff,
1491        ));
1492
1493        if let Some(refresh_schedule) = &mview.refresh_schedule {
1494            // This can't be `ON COMMIT`, because that is represented by a `None` instead of an
1495            // empty `RefreshSchedule`.
1496            assert!(!refresh_schedule.is_empty());
1497            for RefreshEvery {
1498                interval,
1499                aligned_to,
1500            } in refresh_schedule.everies.iter()
1501            {
1502                let aligned_to_dt = mz_ore::now::to_datetime(
1503                    <&Timestamp as TryInto<u64>>::try_into(aligned_to).expect("undoes planning"),
1504                );
1505                updates.push(BuiltinTableUpdate::row(
1506                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1507                    Row::pack_slice(&[
1508                        Datum::String(&id.to_string()),
1509                        Datum::String("every"),
1510                        Datum::Interval(
1511                            Interval::from_duration(interval).expect(
1512                                "planning ensured that this is convertible back to Interval",
1513                            ),
1514                        ),
1515                        Datum::TimestampTz(aligned_to_dt.try_into().expect("undoes planning")),
1516                        Datum::Null,
1517                    ]),
1518                    diff,
1519                ));
1520            }
1521            for at in refresh_schedule.ats.iter() {
1522                let at_dt = mz_ore::now::to_datetime(
1523                    <&Timestamp as TryInto<u64>>::try_into(at).expect("undoes planning"),
1524                );
1525                updates.push(BuiltinTableUpdate::row(
1526                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1527                    Row::pack_slice(&[
1528                        Datum::String(&id.to_string()),
1529                        Datum::String("at"),
1530                        Datum::Null,
1531                        Datum::Null,
1532                        Datum::TimestampTz(at_dt.try_into().expect("undoes planning")),
1533                    ]),
1534                    diff,
1535                ));
1536            }
1537        } else {
1538            updates.push(BuiltinTableUpdate::row(
1539                &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1540                Row::pack_slice(&[
1541                    Datum::String(&id.to_string()),
1542                    Datum::String("on-commit"),
1543                    Datum::Null,
1544                    Datum::Null,
1545                    Datum::Null,
1546                ]),
1547                diff,
1548            ));
1549        }
1550
1551        if let Some(target_id) = mview.replacement_target {
1552            updates.push(BuiltinTableUpdate::row(
1553                &*MZ_REPLACEMENTS,
1554                Row::pack_slice(&[
1555                    Datum::String(&id.to_string()),
1556                    Datum::String(&target_id.to_string()),
1557                ]),
1558                diff,
1559            ));
1560        }
1561
1562        updates
1563    }
1564
1565    fn pack_continual_task_update(
1566        &self,
1567        id: CatalogItemId,
1568        oid: u32,
1569        schema_id: &SchemaSpecifier,
1570        name: &str,
1571        owner_id: &RoleId,
1572        privileges: Datum,
1573        ct: &ContinualTask,
1574        diff: Diff,
1575    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1576        let create_stmt = mz_sql::parse::parse(&ct.create_sql)
1577            .unwrap_or_else(|e| {
1578                panic!(
1579                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1580                    ct.create_sql, e
1581                )
1582            })
1583            .into_element()
1584            .ast;
1585        let query_string = match &create_stmt {
1586            Statement::CreateContinualTask(stmt) => {
1587                let mut query_string = String::new();
1588                for stmt in &stmt.stmts {
1589                    let s = match stmt {
1590                        ContinualTaskStmt::Insert(stmt) => stmt.to_ast_string_stable(),
1591                        ContinualTaskStmt::Delete(stmt) => stmt.to_ast_string_stable(),
1592                    };
1593                    if query_string.is_empty() {
1594                        query_string = s;
1595                    } else {
1596                        query_string.push_str("; ");
1597                        query_string.push_str(&s);
1598                    }
1599                }
1600                query_string
1601            }
1602            _ => unreachable!(),
1603        };
1604
1605        vec![BuiltinTableUpdate::row(
1606            &*MZ_CONTINUAL_TASKS,
1607            Row::pack_slice(&[
1608                Datum::String(&id.to_string()),
1609                Datum::UInt32(oid),
1610                Datum::String(&schema_id.to_string()),
1611                Datum::String(name),
1612                Datum::String(&ct.cluster_id.to_string()),
1613                Datum::String(&query_string),
1614                Datum::String(&owner_id.to_string()),
1615                privileges,
1616                Datum::String(&ct.create_sql),
1617                Datum::String(&create_stmt.to_ast_string_redacted()),
1618            ]),
1619            diff,
1620        )]
1621    }
1622
1623    fn pack_sink_update(
1624        &self,
1625        id: CatalogItemId,
1626        oid: u32,
1627        schema_id: &SchemaSpecifier,
1628        name: &str,
1629        owner_id: &RoleId,
1630        sink: &Sink,
1631        diff: Diff,
1632    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1633        let mut updates = vec![];
1634        match &sink.connection {
1635            StorageSinkConnection::Kafka(KafkaSinkConnection {
1636                topic: topic_name, ..
1637            }) => {
1638                updates.push(BuiltinTableUpdate::row(
1639                    &*MZ_KAFKA_SINKS,
1640                    Row::pack_slice(&[
1641                        Datum::String(&id.to_string()),
1642                        Datum::String(topic_name.as_str()),
1643                    ]),
1644                    diff,
1645                ));
1646            }
1647            StorageSinkConnection::Iceberg(IcebergSinkConnection {
1648                namespace, table, ..
1649            }) => {
1650                updates.push(BuiltinTableUpdate::row(
1651                    &*MZ_ICEBERG_SINKS,
1652                    Row::pack_slice(&[
1653                        Datum::String(&id.to_string()),
1654                        Datum::String(namespace.as_str()),
1655                        Datum::String(table.as_str()),
1656                    ]),
1657                    diff,
1658                ));
1659            }
1660        };
1661
1662        let create_stmt = mz_sql::parse::parse(&sink.create_sql)
1663            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", sink.create_sql))
1664            .into_element()
1665            .ast;
1666
1667        let envelope = sink.envelope();
1668
1669        // The combined format string is used for the deprecated `format` column.
1670        let combined_format = sink.combined_format();
1671        let (key_format, value_format) = match sink.formats() {
1672            Some((key_format, value_format)) => (key_format, Some(value_format)),
1673            None => (None, None),
1674        };
1675
1676        updates.push(BuiltinTableUpdate::row(
1677            &*MZ_SINKS,
1678            Row::pack_slice(&[
1679                Datum::String(&id.to_string()),
1680                Datum::UInt32(oid),
1681                Datum::String(&schema_id.to_string()),
1682                Datum::String(name),
1683                Datum::String(sink.connection.name()),
1684                Datum::from(sink.connection_id().map(|id| id.to_string()).as_deref()),
1685                // size column now deprecated w/o linked clusters
1686                Datum::Null,
1687                Datum::from(envelope),
1688                // FIXME: These key/value formats are kinda leaky! Should probably live in
1689                // the kafka sink table.
1690                Datum::from(combined_format.as_ref().map(|f| f.as_ref())),
1691                Datum::from(key_format),
1692                Datum::from(value_format),
1693                Datum::String(&sink.cluster_id.to_string()),
1694                Datum::String(&owner_id.to_string()),
1695                Datum::String(&sink.create_sql),
1696                Datum::String(&create_stmt.to_ast_string_redacted()),
1697            ]),
1698            diff,
1699        ));
1700
1701        updates
1702    }
1703
1704    fn pack_index_update(
1705        &self,
1706        id: CatalogItemId,
1707        oid: u32,
1708        name: &str,
1709        owner_id: &RoleId,
1710        index: &Index,
1711        diff: Diff,
1712    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1713        let mut updates = vec![];
1714
1715        let create_stmt = mz_sql::parse::parse(&index.create_sql)
1716            .unwrap_or_else(|e| {
1717                panic!(
1718                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1719                    index.create_sql, e
1720                )
1721            })
1722            .into_element()
1723            .ast;
1724
1725        let key_sqls = match &create_stmt {
1726            Statement::CreateIndex(CreateIndexStatement { key_parts, .. }) => key_parts
1727                .as_ref()
1728                .expect("key_parts is filled in during planning"),
1729            _ => unreachable!(),
1730        };
1731        let on_item_id = self.get_entry_by_global_id(&index.on).id();
1732
1733        updates.push(BuiltinTableUpdate::row(
1734            &*MZ_INDEXES,
1735            Row::pack_slice(&[
1736                Datum::String(&id.to_string()),
1737                Datum::UInt32(oid),
1738                Datum::String(name),
1739                Datum::String(&on_item_id.to_string()),
1740                Datum::String(&index.cluster_id.to_string()),
1741                Datum::String(&owner_id.to_string()),
1742                Datum::String(&index.create_sql),
1743                Datum::String(&create_stmt.to_ast_string_redacted()),
1744            ]),
1745            diff,
1746        ));
1747
1748        for (i, key) in index.keys.iter().enumerate() {
1749            let on_entry = self.get_entry_by_global_id(&index.on);
1750            let on_desc = on_entry
1751                .relation_desc()
1752                .expect("can only create indexes on items with a valid description");
1753            let nullable = key.typ(&on_desc.typ().column_types).nullable;
1754            let seq_in_index = u64::cast_from(i + 1);
1755            let key_sql = key_sqls
1756                .get(i)
1757                .expect("missing sql information for index key")
1758                .to_ast_string_simple();
1759            let (field_number, expression) = match key {
1760                MirScalarExpr::Column(col, _) => {
1761                    (Datum::UInt64(u64::cast_from(*col + 1)), Datum::Null)
1762                }
1763                _ => (Datum::Null, Datum::String(&key_sql)),
1764            };
1765            updates.push(BuiltinTableUpdate::row(
1766                &*MZ_INDEX_COLUMNS,
1767                Row::pack_slice(&[
1768                    Datum::String(&id.to_string()),
1769                    Datum::UInt64(seq_in_index),
1770                    field_number,
1771                    expression,
1772                    Datum::from(nullable),
1773                ]),
1774                diff,
1775            ));
1776        }
1777
1778        updates
1779    }
1780
1781    fn pack_type_update(
1782        &self,
1783        id: CatalogItemId,
1784        oid: u32,
1785        schema_id: &SchemaSpecifier,
1786        name: &str,
1787        owner_id: &RoleId,
1788        privileges: Datum,
1789        typ: &Type,
1790        diff: Diff,
1791    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1792        let mut out = vec![];
1793
1794        let redacted = typ.create_sql.as_ref().map(|create_sql| {
1795            mz_sql::parse::parse(create_sql)
1796                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
1797                .into_element()
1798                .ast
1799                .to_ast_string_redacted()
1800        });
1801
1802        out.push(BuiltinTableUpdate::row(
1803            &*MZ_TYPES,
1804            Row::pack_slice(&[
1805                Datum::String(&id.to_string()),
1806                Datum::UInt32(oid),
1807                Datum::String(&schema_id.to_string()),
1808                Datum::String(name),
1809                Datum::String(&TypeCategory::from_catalog_type(&typ.details.typ).to_string()),
1810                Datum::String(&owner_id.to_string()),
1811                privileges,
1812                if let Some(create_sql) = &typ.create_sql {
1813                    Datum::String(create_sql)
1814                } else {
1815                    Datum::Null
1816                },
1817                if let Some(redacted) = &redacted {
1818                    Datum::String(redacted)
1819                } else {
1820                    Datum::Null
1821                },
1822            ]),
1823            diff,
1824        ));
1825
1826        let mut row = Row::default();
1827        let mut packer = row.packer();
1828
1829        fn append_modifier(packer: &mut RowPacker<'_>, mods: &[i64]) {
1830            if mods.is_empty() {
1831                packer.push(Datum::Null);
1832            } else {
1833                packer.push_list(mods.iter().map(|m| Datum::Int64(*m)));
1834            }
1835        }
1836
1837        let index_id = match &typ.details.typ {
1838            CatalogType::Array {
1839                element_reference: element_id,
1840            } => {
1841                packer.push(Datum::String(&id.to_string()));
1842                packer.push(Datum::String(&element_id.to_string()));
1843                &MZ_ARRAY_TYPES
1844            }
1845            CatalogType::List {
1846                element_reference: element_id,
1847                element_modifiers,
1848            } => {
1849                packer.push(Datum::String(&id.to_string()));
1850                packer.push(Datum::String(&element_id.to_string()));
1851                append_modifier(&mut packer, element_modifiers);
1852                &MZ_LIST_TYPES
1853            }
1854            CatalogType::Map {
1855                key_reference: key_id,
1856                value_reference: value_id,
1857                key_modifiers,
1858                value_modifiers,
1859            } => {
1860                packer.push(Datum::String(&id.to_string()));
1861                packer.push(Datum::String(&key_id.to_string()));
1862                packer.push(Datum::String(&value_id.to_string()));
1863                append_modifier(&mut packer, key_modifiers);
1864                append_modifier(&mut packer, value_modifiers);
1865                &MZ_MAP_TYPES
1866            }
1867            CatalogType::Pseudo => {
1868                packer.push(Datum::String(&id.to_string()));
1869                &MZ_PSEUDO_TYPES
1870            }
1871            _ => {
1872                packer.push(Datum::String(&id.to_string()));
1873                &MZ_BASE_TYPES
1874            }
1875        };
1876        out.push(BuiltinTableUpdate::row(index_id, row, diff));
1877
1878        if let Some(pg_metadata) = &typ.details.pg_metadata {
1879            out.push(BuiltinTableUpdate::row(
1880                &*MZ_TYPE_PG_METADATA,
1881                Row::pack_slice(&[
1882                    Datum::String(&id.to_string()),
1883                    Datum::UInt32(pg_metadata.typinput_oid),
1884                    Datum::UInt32(pg_metadata.typreceive_oid),
1885                ]),
1886                diff,
1887            ));
1888        }
1889
1890        out
1891    }
1892
1893    fn pack_func_update(
1894        &self,
1895        id: CatalogItemId,
1896        schema_id: &SchemaSpecifier,
1897        name: &str,
1898        owner_id: &RoleId,
1899        func: &Func,
1900        diff: Diff,
1901    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1902        let mut updates = vec![];
1903        for func_impl_details in func.inner.func_impls() {
1904            let arg_type_ids = func_impl_details
1905                .arg_typs
1906                .iter()
1907                .map(|typ| self.get_system_type(typ).id().to_string())
1908                .collect::<Vec<_>>();
1909
1910            let mut row = Row::default();
1911            row.packer()
1912                .try_push_array(
1913                    &[ArrayDimension {
1914                        lower_bound: 1,
1915                        length: arg_type_ids.len(),
1916                    }],
1917                    arg_type_ids.iter().map(|id| Datum::String(id)),
1918                )
1919                .expect(
1920                    "arg_type_ids is 1 dimensional, and its length is used for the array length",
1921                );
1922            let arg_type_ids = row.unpack_first();
1923
1924            updates.push(BuiltinTableUpdate::row(
1925                &*MZ_FUNCTIONS,
1926                Row::pack_slice(&[
1927                    Datum::String(&id.to_string()),
1928                    Datum::UInt32(func_impl_details.oid),
1929                    Datum::String(&schema_id.to_string()),
1930                    Datum::String(name),
1931                    arg_type_ids,
1932                    Datum::from(
1933                        func_impl_details
1934                            .variadic_typ
1935                            .map(|typ| self.get_system_type(typ).id().to_string())
1936                            .as_deref(),
1937                    ),
1938                    Datum::from(
1939                        func_impl_details
1940                            .return_typ
1941                            .map(|typ| self.get_system_type(typ).id().to_string())
1942                            .as_deref(),
1943                    ),
1944                    func_impl_details.return_is_set.into(),
1945                    Datum::String(&owner_id.to_string()),
1946                ]),
1947                diff,
1948            ));
1949
1950            if let mz_sql::func::Func::Aggregate(_) = func.inner {
1951                updates.push(BuiltinTableUpdate::row(
1952                    &*MZ_AGGREGATES,
1953                    Row::pack_slice(&[
1954                        Datum::UInt32(func_impl_details.oid),
1955                        // TODO(database-issues#1064): Support ordered-set aggregate functions.
1956                        Datum::String("n"),
1957                        Datum::Int16(0),
1958                    ]),
1959                    diff,
1960                ));
1961            }
1962        }
1963        updates
1964    }
1965
1966    pub fn pack_op_update(
1967        &self,
1968        operator: &str,
1969        func_impl_details: FuncImplCatalogDetails,
1970        diff: Diff,
1971    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1972        let arg_type_ids = func_impl_details
1973            .arg_typs
1974            .iter()
1975            .map(|typ| self.get_system_type(typ).id().to_string())
1976            .collect::<Vec<_>>();
1977
1978        let mut row = Row::default();
1979        row.packer()
1980            .try_push_array(
1981                &[ArrayDimension {
1982                    lower_bound: 1,
1983                    length: arg_type_ids.len(),
1984                }],
1985                arg_type_ids.iter().map(|id| Datum::String(id)),
1986            )
1987            .expect("arg_type_ids is 1 dimensional, and its length is used for the array length");
1988        let arg_type_ids = row.unpack_first();
1989
1990        BuiltinTableUpdate::row(
1991            &*MZ_OPERATORS,
1992            Row::pack_slice(&[
1993                Datum::UInt32(func_impl_details.oid),
1994                Datum::String(operator),
1995                arg_type_ids,
1996                Datum::from(
1997                    func_impl_details
1998                        .return_typ
1999                        .map(|typ| self.get_system_type(typ).id().to_string())
2000                        .as_deref(),
2001                ),
2002            ]),
2003            diff,
2004        )
2005    }
2006
2007    fn pack_secret_update(
2008        &self,
2009        id: CatalogItemId,
2010        oid: u32,
2011        schema_id: &SchemaSpecifier,
2012        name: &str,
2013        owner_id: &RoleId,
2014        privileges: Datum,
2015        diff: Diff,
2016    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2017        vec![BuiltinTableUpdate::row(
2018            &*MZ_SECRETS,
2019            Row::pack_slice(&[
2020                Datum::String(&id.to_string()),
2021                Datum::UInt32(oid),
2022                Datum::String(&schema_id.to_string()),
2023                Datum::String(name),
2024                Datum::String(&owner_id.to_string()),
2025                privileges,
2026            ]),
2027            diff,
2028        )]
2029    }
2030
2031    pub fn pack_audit_log_update(
2032        &self,
2033        event: &VersionedEvent,
2034        diff: Diff,
2035    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
2036        let (event_type, object_type, details, user, occurred_at): (
2037            &EventType,
2038            &ObjectType,
2039            &EventDetails,
2040            &Option<String>,
2041            u64,
2042        ) = match event {
2043            VersionedEvent::V1(ev) => (
2044                &ev.event_type,
2045                &ev.object_type,
2046                &ev.details,
2047                &ev.user,
2048                ev.occurred_at,
2049            ),
2050        };
2051        let details = Jsonb::from_serde_json(details.as_json())
2052            .map_err(|e| {
2053                Error::new(ErrorKind::Unstructured(format!(
2054                    "could not pack audit log update: {}",
2055                    e
2056                )))
2057            })?
2058            .into_row();
2059        let details = details
2060            .iter()
2061            .next()
2062            .expect("details created above with a single jsonb column");
2063        let dt = mz_ore::now::to_datetime(occurred_at);
2064        let id = event.sortable_id();
2065        Ok(BuiltinTableUpdate::row(
2066            &*MZ_AUDIT_EVENTS,
2067            Row::pack_slice(&[
2068                Datum::UInt64(id),
2069                Datum::String(&format!("{}", event_type)),
2070                Datum::String(&format!("{}", object_type)),
2071                details,
2072                match user {
2073                    Some(user) => Datum::String(user),
2074                    None => Datum::Null,
2075                },
2076                Datum::TimestampTz(dt.try_into().expect("must fit")),
2077            ]),
2078            diff,
2079        ))
2080    }
2081
2082    pub fn pack_storage_usage_update(
2083        &self,
2084        VersionedStorageUsage::V1(event): VersionedStorageUsage,
2085        diff: Diff,
2086    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2087        let id = &MZ_STORAGE_USAGE_BY_SHARD;
2088        let row = Row::pack_slice(&[
2089            Datum::UInt64(event.id),
2090            Datum::from(event.shard_id.as_deref()),
2091            Datum::UInt64(event.size_bytes),
2092            Datum::TimestampTz(
2093                mz_ore::now::to_datetime(event.collection_timestamp)
2094                    .try_into()
2095                    .expect("must fit"),
2096            ),
2097        ]);
2098        BuiltinTableUpdate::row(id, row, diff)
2099    }
2100
2101    pub fn pack_egress_ip_update(
2102        &self,
2103        ip: &IpNet,
2104    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
2105        let id = &MZ_EGRESS_IPS;
2106        let addr = ip.addr();
2107        let row = Row::pack_slice(&[
2108            Datum::String(&addr.to_string()),
2109            Datum::Int32(ip.prefix_len().into()),
2110            Datum::String(&format!("{}/{}", addr, ip.prefix_len())),
2111        ]);
2112        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
2113    }
2114
2115    pub fn pack_license_key_update(
2116        &self,
2117        license_key: &ValidatedLicenseKey,
2118    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
2119        let id = &MZ_LICENSE_KEYS;
2120        let row = Row::pack_slice(&[
2121            Datum::String(&license_key.id),
2122            Datum::String(&license_key.organization),
2123            Datum::String(&license_key.environment_id),
2124            Datum::TimestampTz(
2125                mz_ore::now::to_datetime(license_key.expiration * 1000)
2126                    .try_into()
2127                    .expect("must fit"),
2128            ),
2129            Datum::TimestampTz(
2130                mz_ore::now::to_datetime(license_key.not_before * 1000)
2131                    .try_into()
2132                    .expect("must fit"),
2133            ),
2134        ]);
2135        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
2136    }
2137
2138    pub fn pack_all_replica_size_updates(&self) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2139        let mut updates = Vec::new();
2140        for (size, alloc) in &self.cluster_replica_sizes.0 {
2141            if alloc.disabled {
2142                continue;
2143            }
2144
2145            // Just invent something when the limits are `None`, which only happens in non-prod
2146            // environments (tests, process orchestrator, etc.)
2147            let cpu_limit = alloc.cpu_limit.unwrap_or(CpuLimit::MAX);
2148            let MemoryLimit(ByteSize(memory_bytes)) =
2149                (alloc.memory_limit).unwrap_or(MemoryLimit::MAX);
2150            let DiskLimit(ByteSize(disk_bytes)) =
2151                (alloc.disk_limit).unwrap_or(DiskLimit::ARBITRARY);
2152
2153            let row = Row::pack_slice(&[
2154                size.as_str().into(),
2155                u64::cast_from(alloc.scale).into(),
2156                u64::cast_from(alloc.workers).into(),
2157                cpu_limit.as_nanocpus().into(),
2158                memory_bytes.into(),
2159                disk_bytes.into(),
2160                (alloc.credits_per_hour).into(),
2161            ]);
2162
2163            updates.push(BuiltinTableUpdate::row(
2164                &*MZ_CLUSTER_REPLICA_SIZES,
2165                row,
2166                Diff::ONE,
2167            ));
2168        }
2169
2170        updates
2171    }
2172
2173    pub fn pack_subscribe_update(
2174        &self,
2175        id: GlobalId,
2176        subscribe: &ActiveSubscribe,
2177        diff: Diff,
2178    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2179        let mut row = Row::default();
2180        let mut packer = row.packer();
2181        packer.push(Datum::String(&id.to_string()));
2182        packer.push(Datum::Uuid(subscribe.session_uuid));
2183        packer.push(Datum::String(&subscribe.cluster_id.to_string()));
2184
2185        let start_dt = mz_ore::now::to_datetime(subscribe.start_time);
2186        packer.push(Datum::TimestampTz(start_dt.try_into().expect("must fit")));
2187
2188        let depends_on: Vec<_> = subscribe
2189            .depends_on
2190            .iter()
2191            .map(|id| id.to_string())
2192            .collect();
2193        packer.push_list(depends_on.iter().map(|s| Datum::String(s)));
2194
2195        BuiltinTableUpdate::row(&*MZ_SUBSCRIPTIONS, row, diff)
2196    }
2197
2198    pub fn pack_session_update(
2199        &self,
2200        conn: &ConnMeta,
2201        diff: Diff,
2202    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2203        let connect_dt = mz_ore::now::to_datetime(conn.connected_at());
2204        BuiltinTableUpdate::row(
2205            &*MZ_SESSIONS,
2206            Row::pack_slice(&[
2207                Datum::Uuid(conn.uuid()),
2208                Datum::UInt32(conn.conn_id().unhandled()),
2209                Datum::String(&conn.authenticated_role_id().to_string()),
2210                Datum::from(conn.client_ip().map(|ip| ip.to_string()).as_deref()),
2211                Datum::TimestampTz(connect_dt.try_into().expect("must fit")),
2212            ]),
2213            diff,
2214        )
2215    }
2216
2217    pub fn pack_default_privileges_update(
2218        &self,
2219        default_privilege_object: &DefaultPrivilegeObject,
2220        grantee: &RoleId,
2221        acl_mode: &AclMode,
2222        diff: Diff,
2223    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2224        BuiltinTableUpdate::row(
2225            &*MZ_DEFAULT_PRIVILEGES,
2226            Row::pack_slice(&[
2227                default_privilege_object.role_id.to_string().as_str().into(),
2228                default_privilege_object
2229                    .database_id
2230                    .map(|database_id| database_id.to_string())
2231                    .as_deref()
2232                    .into(),
2233                default_privilege_object
2234                    .schema_id
2235                    .map(|schema_id| schema_id.to_string())
2236                    .as_deref()
2237                    .into(),
2238                default_privilege_object
2239                    .object_type
2240                    .to_string()
2241                    .to_lowercase()
2242                    .as_str()
2243                    .into(),
2244                grantee.to_string().as_str().into(),
2245                acl_mode.to_string().as_str().into(),
2246            ]),
2247            diff,
2248        )
2249    }
2250
2251    pub fn pack_system_privileges_update(
2252        &self,
2253        privileges: MzAclItem,
2254        diff: Diff,
2255    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2256        BuiltinTableUpdate::row(
2257            &*MZ_SYSTEM_PRIVILEGES,
2258            Row::pack_slice(&[privileges.into()]),
2259            diff,
2260        )
2261    }
2262
2263    fn pack_privilege_array_row(&self, privileges: &PrivilegeMap) -> Row {
2264        let mut row = Row::default();
2265        let flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2266        row.packer()
2267            .try_push_array(
2268                &[ArrayDimension {
2269                    lower_bound: 1,
2270                    length: flat_privileges.len(),
2271                }],
2272                flat_privileges
2273                    .into_iter()
2274                    .map(|mz_acl_item| Datum::MzAclItem(mz_acl_item.clone())),
2275            )
2276            .expect("privileges is 1 dimensional, and its length is used for the array length");
2277        row
2278    }
2279
2280    pub fn pack_comment_update(
2281        &self,
2282        object_id: CommentObjectId,
2283        column_pos: Option<usize>,
2284        comment: &str,
2285        diff: Diff,
2286    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2287        // Use the audit log representation so it's easier to join against.
2288        let object_type = mz_sql::catalog::ObjectType::from(object_id);
2289        let audit_type = super::object_type_to_audit_object_type(object_type);
2290        let object_type_str = audit_type.to_string();
2291
2292        let object_id_str = match object_id {
2293            CommentObjectId::Table(global_id)
2294            | CommentObjectId::View(global_id)
2295            | CommentObjectId::MaterializedView(global_id)
2296            | CommentObjectId::Source(global_id)
2297            | CommentObjectId::Sink(global_id)
2298            | CommentObjectId::Index(global_id)
2299            | CommentObjectId::Func(global_id)
2300            | CommentObjectId::Connection(global_id)
2301            | CommentObjectId::Secret(global_id)
2302            | CommentObjectId::Type(global_id)
2303            | CommentObjectId::ContinualTask(global_id) => global_id.to_string(),
2304            CommentObjectId::Role(role_id) => role_id.to_string(),
2305            CommentObjectId::Database(database_id) => database_id.to_string(),
2306            CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
2307            CommentObjectId::Cluster(cluster_id) => cluster_id.to_string(),
2308            CommentObjectId::ClusterReplica((_, replica_id)) => replica_id.to_string(),
2309            CommentObjectId::NetworkPolicy(network_policy_id) => network_policy_id.to_string(),
2310        };
2311        let column_pos_datum = match column_pos {
2312            Some(pos) => {
2313                // TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
2314                let pos =
2315                    i32::try_from(pos).expect("we constrain this value in the planning layer");
2316                Datum::Int32(pos)
2317            }
2318            None => Datum::Null,
2319        };
2320
2321        BuiltinTableUpdate::row(
2322            &*MZ_COMMENTS,
2323            Row::pack_slice(&[
2324                Datum::String(&object_id_str),
2325                Datum::String(&object_type_str),
2326                column_pos_datum,
2327                Datum::String(comment),
2328            ]),
2329            diff,
2330        )
2331    }
2332
2333    pub fn pack_webhook_source_update(
2334        &self,
2335        item_id: CatalogItemId,
2336        diff: Diff,
2337    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2338        let url = self
2339            .try_get_webhook_url(&item_id)
2340            .expect("webhook source should exist");
2341        let url = url.to_string();
2342        let name = &self.get_entry(&item_id).name().item;
2343        let id_str = item_id.to_string();
2344
2345        BuiltinTableUpdate::row(
2346            &*MZ_WEBHOOKS_SOURCES,
2347            Row::pack_slice(&[
2348                Datum::String(&id_str),
2349                Datum::String(name),
2350                Datum::String(&url),
2351            ]),
2352            diff,
2353        )
2354    }
2355
2356    pub fn pack_source_references_update(
2357        &self,
2358        source_references: &SourceReferences,
2359        diff: Diff,
2360    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2361        let source_id = source_references.source_id.to_string();
2362        let updated_at = &source_references.updated_at;
2363        source_references
2364            .references
2365            .iter()
2366            .map(|reference| {
2367                let mut row = Row::default();
2368                let mut packer = row.packer();
2369                packer.extend([
2370                    Datum::String(&source_id),
2371                    reference
2372                        .namespace
2373                        .as_ref()
2374                        .map(|s| Datum::String(s))
2375                        .unwrap_or(Datum::Null),
2376                    Datum::String(&reference.name),
2377                    Datum::TimestampTz(
2378                        mz_ore::now::to_datetime(*updated_at)
2379                            .try_into()
2380                            .expect("must fit"),
2381                    ),
2382                ]);
2383                if reference.columns.len() > 0 {
2384                    packer
2385                        .try_push_array(
2386                            &[ArrayDimension {
2387                                lower_bound: 1,
2388                                length: reference.columns.len(),
2389                            }],
2390                            reference.columns.iter().map(|col| Datum::String(col)),
2391                        )
2392                        .expect(
2393                            "columns is 1 dimensional, and its length is used for the array length",
2394                        );
2395                } else {
2396                    packer.push(Datum::Null);
2397                }
2398
2399                BuiltinTableUpdate::row(&*MZ_SOURCE_REFERENCES, row, diff)
2400            })
2401            .collect()
2402    }
2403}