mz_adapter/catalog/
builtin_table_updates.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10mod notice;
11
12use bytesize::ByteSize;
13use ipnet::IpNet;
14use mz_adapter_types::compaction::CompactionWindow;
15use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
16use mz_catalog::SYSTEM_CONN_ID;
17use mz_catalog::builtin::{
18    BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
19    MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICAS,
20    MZ_CLUSTER_SCHEDULES, MZ_CLUSTER_WORKLOAD_CLASSES, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS,
21    MZ_CONNECTIONS, MZ_CONTINUAL_TASKS, MZ_DATABASES, MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS,
22    MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES, MZ_INDEX_COLUMNS, MZ_INDEXES,
23    MZ_INTERNAL_CLUSTER_REPLICAS, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCE_TABLES,
24    MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES,
25    MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MATERIALIZED_VIEWS, MZ_MYSQL_SOURCE_TABLES,
26    MZ_NETWORK_POLICIES, MZ_NETWORK_POLICY_RULES, MZ_OBJECT_DEPENDENCIES, MZ_OPERATORS,
27    MZ_PENDING_CLUSTER_REPLICAS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES, MZ_PSEUDO_TYPES,
28    MZ_ROLE_MEMBERS, MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SCHEMAS, MZ_SECRETS, MZ_SESSIONS, MZ_SINKS,
29    MZ_SOURCE_REFERENCES, MZ_SOURCES, MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS,
30    MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES,
31    MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
32};
33use mz_catalog::config::AwsPrincipalContext;
34use mz_catalog::durable::SourceReferences;
35use mz_catalog::memory::error::{Error, ErrorKind};
36use mz_catalog::memory::objects::{
37    CatalogItem, ClusterVariant, Connection, ContinualTask, DataSourceDesc, Func, Index,
38    MaterializedView, Sink, Table, TableDataSource, Type, View,
39};
40use mz_controller::clusters::{
41    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
42};
43use mz_controller_types::ClusterId;
44use mz_expr::MirScalarExpr;
45use mz_license_keys::ValidatedLicenseKey;
46use mz_orchestrator::{CpuLimit, DiskLimit, MemoryLimit};
47use mz_ore::cast::CastFrom;
48use mz_ore::collections::CollectionExt;
49use mz_persist_client::batch::ProtoBatch;
50use mz_repr::adt::array::ArrayDimension;
51use mz_repr::adt::interval::Interval;
52use mz_repr::adt::jsonb::Jsonb;
53use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
54use mz_repr::network_policy_id::NetworkPolicyId;
55use mz_repr::refresh_schedule::RefreshEvery;
56use mz_repr::role_id::RoleId;
57use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, Row, RowPacker, SqlScalarType, Timestamp};
58use mz_sql::ast::{ContinualTaskStmt, CreateIndexStatement, Statement, UnresolvedItemName};
59use mz_sql::catalog::{
60    CatalogCluster, CatalogDatabase, CatalogSchema, CatalogType, DefaultPrivilegeObject,
61    TypeCategory,
62};
63use mz_sql::func::FuncImplCatalogDetails;
64use mz_sql::names::{
65    CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier,
66};
67use mz_sql::plan::{ClusterSchedule, ConnectionDetails, SshKey};
68use mz_sql::session::user::SYSTEM_USER;
69use mz_sql::session::vars::SessionVars;
70use mz_sql_parser::ast::display::AstDisplay;
71use mz_storage_client::client::TableData;
72use mz_storage_types::connections::KafkaConnection;
73use mz_storage_types::connections::aws::{AwsAuth, AwsConnection};
74use mz_storage_types::connections::inline::ReferencedConnection;
75use mz_storage_types::connections::string_or_secret::StringOrSecret;
76use mz_storage_types::sinks::{KafkaSinkConnection, StorageSinkConnection};
77use mz_storage_types::sources::{
78    GenericSourceConnection, KafkaSourceConnection, PostgresSourceConnection, SourceConnection,
79};
80use smallvec::smallvec;
81
82// DO NOT add any more imports from `crate` outside of `crate::catalog`.
83use crate::active_compute_sink::ActiveSubscribe;
84use crate::catalog::CatalogState;
85use crate::coord::ConnMeta;
86
87/// An update to a built-in table.
88#[derive(Debug, Clone)]
89pub struct BuiltinTableUpdate<T = CatalogItemId> {
90    /// The reference of the table to update.
91    pub id: T,
92    /// The data to put into the table.
93    pub data: TableData,
94}
95
96impl<T> BuiltinTableUpdate<T> {
97    /// Create a [`BuiltinTableUpdate`] from a [`Row`].
98    pub fn row(id: T, row: Row, diff: Diff) -> BuiltinTableUpdate<T> {
99        BuiltinTableUpdate {
100            id,
101            data: TableData::Rows(vec![(row, diff)]),
102        }
103    }
104
105    pub fn batch(id: T, batch: ProtoBatch) -> BuiltinTableUpdate<T> {
106        BuiltinTableUpdate {
107            id,
108            data: TableData::Batches(smallvec![batch]),
109        }
110    }
111}
112
113impl CatalogState {
114    pub fn resolve_builtin_table_updates(
115        &self,
116        builtin_table_update: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
117    ) -> Vec<BuiltinTableUpdate<CatalogItemId>> {
118        builtin_table_update
119            .into_iter()
120            .map(|builtin_table_update| self.resolve_builtin_table_update(builtin_table_update))
121            .collect()
122    }
123
124    pub fn resolve_builtin_table_update(
125        &self,
126        BuiltinTableUpdate { id, data }: BuiltinTableUpdate<&'static BuiltinTable>,
127    ) -> BuiltinTableUpdate<CatalogItemId> {
128        let id = self.resolve_builtin_table(id);
129        BuiltinTableUpdate { id, data }
130    }
131
132    pub fn pack_depends_update(
133        &self,
134        depender: CatalogItemId,
135        dependee: CatalogItemId,
136        diff: Diff,
137    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
138        let row = Row::pack_slice(&[
139            Datum::String(&depender.to_string()),
140            Datum::String(&dependee.to_string()),
141        ]);
142        BuiltinTableUpdate::row(&*MZ_OBJECT_DEPENDENCIES, row, diff)
143    }
144
145    pub(super) fn pack_database_update(
146        &self,
147        database_id: &DatabaseId,
148        diff: Diff,
149    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
150        let database = self.get_database(database_id);
151        let row = self.pack_privilege_array_row(database.privileges());
152        let privileges = row.unpack_first();
153        BuiltinTableUpdate::row(
154            &*MZ_DATABASES,
155            Row::pack_slice(&[
156                Datum::String(&database.id.to_string()),
157                Datum::UInt32(database.oid),
158                Datum::String(database.name()),
159                Datum::String(&database.owner_id.to_string()),
160                privileges,
161            ]),
162            diff,
163        )
164    }
165
166    pub(super) fn pack_schema_update(
167        &self,
168        database_spec: &ResolvedDatabaseSpecifier,
169        schema_id: &SchemaId,
170        diff: Diff,
171    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
172        let (database_id, schema) = match database_spec {
173            ResolvedDatabaseSpecifier::Ambient => (None, &self.ambient_schemas_by_id[schema_id]),
174            ResolvedDatabaseSpecifier::Id(id) => (
175                Some(id.to_string()),
176                &self.database_by_id[id].schemas_by_id[schema_id],
177            ),
178        };
179        let row = self.pack_privilege_array_row(schema.privileges());
180        let privileges = row.unpack_first();
181        BuiltinTableUpdate::row(
182            &*MZ_SCHEMAS,
183            Row::pack_slice(&[
184                Datum::String(&schema_id.to_string()),
185                Datum::UInt32(schema.oid),
186                Datum::from(database_id.as_deref()),
187                Datum::String(&schema.name.schema),
188                Datum::String(&schema.owner_id.to_string()),
189                privileges,
190            ]),
191            diff,
192        )
193    }
194
195    pub(super) fn pack_role_update(
196        &self,
197        id: RoleId,
198        diff: Diff,
199    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
200        match id {
201            // PUBLIC role should not show up in mz_roles.
202            RoleId::Public => vec![],
203            id => {
204                let role = self.get_role(&id);
205                let role_update = BuiltinTableUpdate::row(
206                    &*MZ_ROLES,
207                    Row::pack_slice(&[
208                        Datum::String(&role.id.to_string()),
209                        Datum::UInt32(role.oid),
210                        Datum::String(&role.name),
211                        Datum::from(role.attributes.inherit),
212                    ]),
213                    diff,
214                );
215                let mut updates = vec![role_update];
216
217                // HACK/TODO(parkmycar): Creating an empty SessionVars like this is pretty hacky,
218                // we should instead have a static list of all session vars.
219                let session_vars_reference = SessionVars::new_unchecked(
220                    &mz_build_info::DUMMY_BUILD_INFO,
221                    SYSTEM_USER.clone(),
222                    None,
223                );
224
225                for (name, val) in role.vars() {
226                    let result = session_vars_reference
227                        .inspect(name)
228                        .and_then(|var| var.check(val.borrow()));
229                    let Ok(formatted_val) = result else {
230                        // Note: all variables should have been validated by this point, so we
231                        // shouldn't ever hit this.
232                        tracing::error!(?name, ?val, ?result, "found invalid role default var");
233                        continue;
234                    };
235
236                    let role_var_update = BuiltinTableUpdate::row(
237                        &*MZ_ROLE_PARAMETERS,
238                        Row::pack_slice(&[
239                            Datum::String(&role.id.to_string()),
240                            Datum::String(name),
241                            Datum::String(&formatted_val),
242                        ]),
243                        diff,
244                    );
245                    updates.push(role_var_update);
246                }
247
248                updates
249            }
250        }
251    }
252
253    pub(super) fn pack_role_members_update(
254        &self,
255        role_id: RoleId,
256        member_id: RoleId,
257        diff: Diff,
258    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
259        let grantor_id = self
260            .get_role(&member_id)
261            .membership
262            .map
263            .get(&role_id)
264            .expect("catalog out of sync");
265        BuiltinTableUpdate::row(
266            &*MZ_ROLE_MEMBERS,
267            Row::pack_slice(&[
268                Datum::String(&role_id.to_string()),
269                Datum::String(&member_id.to_string()),
270                Datum::String(&grantor_id.to_string()),
271            ]),
272            diff,
273        )
274    }
275
276    pub(super) fn pack_cluster_update(
277        &self,
278        name: &str,
279        diff: Diff,
280    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
281        let id = self.clusters_by_name[name];
282        let cluster = &self.clusters_by_id[&id];
283        let row = self.pack_privilege_array_row(cluster.privileges());
284        let privileges = row.unpack_first();
285        let (size, disk, replication_factor, azs, introspection_debugging, introspection_interval) =
286            match &cluster.config.variant {
287                ClusterVariant::Managed(config) => (
288                    Some(config.size.as_str()),
289                    Some(self.cluster_replica_size_has_disk(&config.size)),
290                    Some(config.replication_factor),
291                    if config.availability_zones.is_empty() {
292                        None
293                    } else {
294                        Some(config.availability_zones.clone())
295                    },
296                    Some(config.logging.log_logging),
297                    config.logging.interval.map(|d| {
298                        Interval::from_duration(&d)
299                            .expect("planning ensured this convertible back to interval")
300                    }),
301                ),
302                ClusterVariant::Unmanaged => (None, None, None, None, None, None),
303            };
304
305        let mut row = Row::default();
306        let mut packer = row.packer();
307        packer.extend([
308            Datum::String(&id.to_string()),
309            Datum::String(name),
310            Datum::String(&cluster.owner_id.to_string()),
311            privileges,
312            cluster.is_managed().into(),
313            size.into(),
314            replication_factor.into(),
315            disk.into(),
316        ]);
317        if let Some(azs) = azs {
318            packer.push_list(azs.iter().map(|az| Datum::String(az)));
319        } else {
320            packer.push(Datum::Null);
321        }
322        packer.push(Datum::from(introspection_debugging));
323        packer.push(Datum::from(introspection_interval));
324
325        let mut updates = Vec::new();
326
327        updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTERS, row, diff));
328
329        if let ClusterVariant::Managed(managed_config) = &cluster.config.variant {
330            let row = match managed_config.schedule {
331                ClusterSchedule::Manual => Row::pack_slice(&[
332                    Datum::String(&id.to_string()),
333                    Datum::String("manual"),
334                    Datum::Null,
335                ]),
336                ClusterSchedule::Refresh {
337                    hydration_time_estimate,
338                } => Row::pack_slice(&[
339                    Datum::String(&id.to_string()),
340                    Datum::String("on-refresh"),
341                    Datum::Interval(
342                        Interval::from_duration(&hydration_time_estimate)
343                            .expect("planning ensured that this is convertible back to Interval"),
344                    ),
345                ]),
346            };
347            updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTER_SCHEDULES, row, diff));
348        }
349
350        updates.push(BuiltinTableUpdate::row(
351            &*MZ_CLUSTER_WORKLOAD_CLASSES,
352            Row::pack_slice(&[
353                Datum::String(&id.to_string()),
354                Datum::from(cluster.config.workload_class.as_deref()),
355            ]),
356            diff,
357        ));
358
359        updates
360    }
361
362    pub(super) fn pack_cluster_replica_update(
363        &self,
364        cluster_id: ClusterId,
365        name: &str,
366        diff: Diff,
367    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
368        let cluster = &self.clusters_by_id[&cluster_id];
369        let id = cluster.replica_id(name).expect("Must exist");
370        let replica = cluster.replica(id).expect("Must exist");
371
372        let (size, disk, az, internal, pending) = match &replica.config.location {
373            // TODO(guswynn): The column should be `availability_zones`, not
374            // `availability_zone`.
375            ReplicaLocation::Managed(ManagedReplicaLocation {
376                size,
377                availability_zones: ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
378                allocation: _,
379                billed_as: _,
380                internal,
381                pending,
382            }) => (
383                Some(&**size),
384                Some(self.cluster_replica_size_has_disk(size)),
385                Some(az.as_str()),
386                *internal,
387                *pending,
388            ),
389            ReplicaLocation::Managed(ManagedReplicaLocation {
390                size,
391                availability_zones: _,
392                allocation: _,
393                billed_as: _,
394                internal,
395                pending,
396            }) => (
397                Some(&**size),
398                Some(self.cluster_replica_size_has_disk(size)),
399                None,
400                *internal,
401                *pending,
402            ),
403            _ => (None, None, None, false, false),
404        };
405
406        let cluster_replica_update = BuiltinTableUpdate::row(
407            &*MZ_CLUSTER_REPLICAS,
408            Row::pack_slice(&[
409                Datum::String(&id.to_string()),
410                Datum::String(name),
411                Datum::String(&cluster_id.to_string()),
412                Datum::from(size),
413                Datum::from(az),
414                Datum::String(&replica.owner_id.to_string()),
415                Datum::from(disk),
416            ]),
417            diff,
418        );
419
420        let mut updates = vec![cluster_replica_update];
421
422        if internal {
423            let update = BuiltinTableUpdate::row(
424                &*MZ_INTERNAL_CLUSTER_REPLICAS,
425                Row::pack_slice(&[Datum::String(&id.to_string())]),
426                diff,
427            );
428            updates.push(update);
429        }
430
431        if pending {
432            let update = BuiltinTableUpdate::row(
433                &*MZ_PENDING_CLUSTER_REPLICAS,
434                Row::pack_slice(&[Datum::String(&id.to_string())]),
435                diff,
436            );
437            updates.push(update);
438        }
439
440        updates
441    }
442
443    pub(crate) fn pack_network_policy_update(
444        &self,
445        policy_id: &NetworkPolicyId,
446        diff: Diff,
447    ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, Error> {
448        let policy = self.get_network_policy(policy_id);
449        let row = self.pack_privilege_array_row(&policy.privileges);
450        let privileges = row.unpack_first();
451        let mut updates = Vec::new();
452        for ref rule in policy.rules.clone() {
453            updates.push(BuiltinTableUpdate::row(
454                &*MZ_NETWORK_POLICY_RULES,
455                Row::pack_slice(&[
456                    Datum::String(&rule.name),
457                    Datum::String(&policy.id.to_string()),
458                    Datum::String(&rule.action.to_string()),
459                    Datum::String(&rule.address.to_string()),
460                    Datum::String(&rule.direction.to_string()),
461                ]),
462                diff,
463            ));
464        }
465        updates.push(BuiltinTableUpdate::row(
466            &*MZ_NETWORK_POLICIES,
467            Row::pack_slice(&[
468                Datum::String(&policy.id.to_string()),
469                Datum::String(&policy.name),
470                Datum::String(&policy.owner_id.to_string()),
471                privileges,
472                Datum::UInt32(policy.oid.clone()),
473            ]),
474            diff,
475        ));
476
477        Ok(updates)
478    }
479
480    pub(super) fn pack_item_update(
481        &self,
482        id: CatalogItemId,
483        diff: Diff,
484    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
485        let entry = self.get_entry(&id);
486        let oid = entry.oid();
487        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
488        let schema_id = &self
489            .get_schema(
490                &entry.name().qualifiers.database_spec,
491                &entry.name().qualifiers.schema_spec,
492                conn_id,
493            )
494            .id;
495        let name = &entry.name().item;
496        let owner_id = entry.owner_id();
497        let privileges_row = self.pack_privilege_array_row(entry.privileges());
498        let privileges = privileges_row.unpack_first();
499        let mut updates = match entry.item() {
500            CatalogItem::Log(_) => self.pack_source_update(
501                id, oid, schema_id, name, "log", None, None, None, None, None, owner_id,
502                privileges, diff, None,
503            ),
504            CatalogItem::Index(index) => {
505                self.pack_index_update(id, oid, name, owner_id, index, diff)
506            }
507            CatalogItem::Table(table) => {
508                let mut updates = self
509                    .pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);
510
511                if let TableDataSource::DataSource {
512                    desc: data_source,
513                    timeline: _,
514                } = &table.data_source
515                {
516                    updates.extend(match data_source {
517                        DataSourceDesc::IngestionExport {
518                            ingestion_id,
519                            external_reference: UnresolvedItemName(external_reference),
520                            details: _,
521                            data_config: _,
522                        } => {
523                            let ingestion_entry = self
524                                .get_entry(ingestion_id)
525                                .source_desc()
526                                .expect("primary source exists")
527                                .expect("primary source is a source");
528
529                            match ingestion_entry.connection.name() {
530                                "postgres" => {
531                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
532                                    // The left-most qualification of Postgres
533                                    // tables is the database, but this
534                                    // information is redundant because each
535                                    // Postgres connection connects to only one
536                                    // database.
537                                    let schema_name = external_reference[1].to_ast_string_simple();
538                                    let table_name = external_reference[2].to_ast_string_simple();
539
540                                    self.pack_postgres_source_tables_update(
541                                        id,
542                                        &schema_name,
543                                        &table_name,
544                                        diff,
545                                    )
546                                }
547                                "mysql" => {
548                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
549                                    let schema_name = external_reference[0].to_ast_string_simple();
550                                    let table_name = external_reference[1].to_ast_string_simple();
551
552                                    self.pack_mysql_source_tables_update(
553                                        id,
554                                        &schema_name,
555                                        &table_name,
556                                        diff,
557                                    )
558                                }
559                                "sql-server" => {
560                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
561                                    // The left-most qualification of SQL Server tables is
562                                    // the database, but this information is redundant
563                                    // because each SQL Server connection connects to
564                                    // only one database.
565                                    let schema_name = external_reference[1].to_ast_string_simple();
566                                    let table_name = external_reference[2].to_ast_string_simple();
567
568                                    self.pack_sql_server_source_table_update(
569                                        id,
570                                        &schema_name,
571                                        &table_name,
572                                        diff,
573                                    )
574                                }
575                                // Load generator sources don't have any special
576                                // updates.
577                                "load-generator" => vec![],
578                                "kafka" => {
579                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 1);
580                                    let topic = external_reference[0].to_ast_string_simple();
581                                    let envelope = data_source.envelope();
582                                    let (key_format, value_format) = data_source.formats();
583
584                                    self.pack_kafka_source_tables_update(
585                                        id,
586                                        &topic,
587                                        envelope,
588                                        key_format,
589                                        value_format,
590                                        diff,
591                                    )
592                                }
593                                s => unreachable!("{s} sources do not have tables"),
594                            }
595                        }
596                        _ => vec![],
597                    });
598                }
599
600                updates
601            }
602            CatalogItem::Source(source) => {
603                let source_type = source.source_type();
604                let connection_id = source.connection_id();
605                let envelope = source.data_source.envelope();
606                let cluster_entry = match source.data_source {
607                    // Ingestion exports don't have their own cluster, but
608                    // run on their ingestion's cluster.
609                    DataSourceDesc::IngestionExport { ingestion_id, .. } => {
610                        self.get_entry(&ingestion_id)
611                    }
612                    _ => entry,
613                };
614
615                let cluster_id = cluster_entry.item().cluster_id().map(|id| id.to_string());
616
617                let (key_format, value_format) = source.data_source.formats();
618
619                let mut updates = self.pack_source_update(
620                    id,
621                    oid,
622                    schema_id,
623                    name,
624                    source_type,
625                    connection_id,
626                    envelope,
627                    key_format,
628                    value_format,
629                    cluster_id.as_deref(),
630                    owner_id,
631                    privileges,
632                    diff,
633                    source.create_sql.as_ref(),
634                );
635
636                updates.extend(match &source.data_source {
637                    DataSourceDesc::Ingestion { desc, .. }
638                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => match &desc.connection {
639                        GenericSourceConnection::Postgres(postgres) => {
640                            self.pack_postgres_source_update(id, postgres, diff)
641                        }
642                        GenericSourceConnection::Kafka(kafka) => {
643                            self.pack_kafka_source_update(id, source.global_id(), kafka, diff)
644                        }
645                        _ => vec![],
646                    },
647                    DataSourceDesc::IngestionExport {
648                        ingestion_id,
649                        external_reference: UnresolvedItemName(external_reference),
650                        details: _,
651                        data_config: _,
652                    } => {
653                        let ingestion_entry = self
654                            .get_entry(ingestion_id)
655                            .source_desc()
656                            .expect("primary source exists")
657                            .expect("primary source is a source");
658
659                        match ingestion_entry.connection.name() {
660                            "postgres" => {
661                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
662                                // The left-most qualification of Postgres
663                                // tables is the database, but this
664                                // information is redundant because each
665                                // Postgres connection connects to only one
666                                // database.
667                                let schema_name = external_reference[1].to_ast_string_simple();
668                                let table_name = external_reference[2].to_ast_string_simple();
669
670                                self.pack_postgres_source_tables_update(
671                                    id,
672                                    &schema_name,
673                                    &table_name,
674                                    diff,
675                                )
676                            }
677                            "mysql" => {
678                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
679                                let schema_name = external_reference[0].to_ast_string_simple();
680                                let table_name = external_reference[1].to_ast_string_simple();
681
682                                self.pack_mysql_source_tables_update(
683                                    id,
684                                    &schema_name,
685                                    &table_name,
686                                    diff,
687                                )
688                            }
689                            "sql-server" => {
690                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
691                                // The left-most qualification of SQL Server tables is
692                                // the database, but this information is redundant
693                                // because each SQL Server connection connects to
694                                // only one database.
695                                let schema_name = external_reference[1].to_ast_string_simple();
696                                let table_name = external_reference[2].to_ast_string_simple();
697
698                                self.pack_sql_server_source_table_update(
699                                    id,
700                                    &schema_name,
701                                    &table_name,
702                                    diff,
703                                )
704                            }
705                            // Load generator sources don't have any special
706                            // updates.
707                            "load-generator" => vec![],
708                            s => unreachable!("{s} sources do not have subsources"),
709                        }
710                    }
711                    DataSourceDesc::Webhook { .. } => {
712                        vec![self.pack_webhook_source_update(id, diff)]
713                    }
714                    _ => vec![],
715                });
716
717                updates
718            }
719            CatalogItem::View(view) => {
720                self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
721            }
722            CatalogItem::MaterializedView(mview) => self.pack_materialized_view_update(
723                id, oid, schema_id, name, owner_id, privileges, mview, diff,
724            ),
725            CatalogItem::Sink(sink) => {
726                self.pack_sink_update(id, oid, schema_id, name, owner_id, sink, diff)
727            }
728            CatalogItem::Type(ty) => {
729                self.pack_type_update(id, oid, schema_id, name, owner_id, privileges, ty, diff)
730            }
731            CatalogItem::Func(func) => {
732                self.pack_func_update(id, schema_id, name, owner_id, func, diff)
733            }
734            CatalogItem::Secret(_) => {
735                self.pack_secret_update(id, oid, schema_id, name, owner_id, privileges, diff)
736            }
737            CatalogItem::Connection(connection) => self.pack_connection_update(
738                id, oid, schema_id, name, owner_id, privileges, connection, diff,
739            ),
740            CatalogItem::ContinualTask(ct) => self.pack_continual_task_update(
741                id, oid, schema_id, name, owner_id, privileges, ct, diff,
742            ),
743        };
744
745        if !entry.item().is_temporary() {
746            // Populate or clean up the `mz_object_dependencies` table.
747            // TODO(jkosh44) Unclear if this table wants to include all uses or only references.
748            for dependee in entry.item().references().items() {
749                updates.push(self.pack_depends_update(id, *dependee, diff))
750            }
751        }
752
753        let full_name = self.resolve_full_name(entry.name(), entry.conn_id());
754        // Always report the latest for an objects columns.
755        if let Ok(desc) = entry.desc_latest(&full_name) {
756            let defaults = match entry.item() {
757                CatalogItem::Table(Table {
758                    data_source: TableDataSource::TableWrites { defaults },
759                    ..
760                }) => Some(defaults),
761                _ => None,
762            };
763            for (i, (column_name, column_type)) in desc.iter().enumerate() {
764                let default: Option<String> = defaults.map(|d| d[i].to_ast_string_stable());
765                let default: Datum = default
766                    .as_ref()
767                    .map(|d| Datum::String(d))
768                    .unwrap_or(Datum::Null);
769                let pgtype = mz_pgrepr::Type::from(&column_type.scalar_type);
770                let (type_name, type_oid) = match &column_type.scalar_type {
771                    SqlScalarType::List {
772                        custom_id: Some(custom_id),
773                        ..
774                    }
775                    | SqlScalarType::Map {
776                        custom_id: Some(custom_id),
777                        ..
778                    }
779                    | SqlScalarType::Record {
780                        custom_id: Some(custom_id),
781                        ..
782                    } => {
783                        let entry = self.get_entry(custom_id);
784                        // NOTE(benesch): the `mz_columns.type text` field is
785                        // wrong. Types do not have a name that can be
786                        // represented as a single textual field. There can be
787                        // multiple types with the same name in different
788                        // schemas and databases. We should eventually deprecate
789                        // the `type` field in favor of a new `type_id` field
790                        // that can be joined against `mz_types`.
791                        //
792                        // For now, in the interest of pragmatism, we just use
793                        // the type's item name, and accept that there may be
794                        // ambiguity if the same type name is used in multiple
795                        // schemas. The ambiguity is mitigated by the OID, which
796                        // can be joined against `mz_types.oid` to resolve the
797                        // ambiguity.
798                        let name = &*entry.name().item;
799                        let oid = entry.oid();
800                        (name, oid)
801                    }
802                    _ => (pgtype.name(), pgtype.oid()),
803                };
804                updates.push(BuiltinTableUpdate::row(
805                    &*MZ_COLUMNS,
806                    Row::pack_slice(&[
807                        Datum::String(&id.to_string()),
808                        Datum::String(column_name),
809                        Datum::UInt64(u64::cast_from(i + 1)),
810                        Datum::from(column_type.nullable),
811                        Datum::String(type_name),
812                        default,
813                        Datum::UInt32(type_oid),
814                        Datum::Int32(pgtype.typmod()),
815                    ]),
816                    diff,
817                ));
818            }
819        }
820
821        // Use initial lcw so that we can tell apart default from non-existent windows.
822        if let Some(cw) = entry.item().initial_logical_compaction_window() {
823            updates.push(self.pack_history_retention_strategy_update(id, cw, diff));
824        }
825
826        updates
827    }
828
829    fn pack_history_retention_strategy_update(
830        &self,
831        id: CatalogItemId,
832        cw: CompactionWindow,
833        diff: Diff,
834    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
835        let cw: u64 = cw.comparable_timestamp().into();
836        let cw = Jsonb::from_serde_json(serde_json::Value::Number(serde_json::Number::from(cw)))
837            .expect("must serialize");
838        BuiltinTableUpdate::row(
839            &*MZ_HISTORY_RETENTION_STRATEGIES,
840            Row::pack_slice(&[
841                Datum::String(&id.to_string()),
842                // FOR is the only strategy at the moment. We may introduce FROM or others later.
843                Datum::String("FOR"),
844                cw.into_row().into_element(),
845            ]),
846            diff,
847        )
848    }
849
850    fn pack_table_update(
851        &self,
852        id: CatalogItemId,
853        oid: u32,
854        schema_id: &SchemaSpecifier,
855        name: &str,
856        owner_id: &RoleId,
857        privileges: Datum,
858        diff: Diff,
859        table: &Table,
860    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
861        let redacted = table.create_sql.as_ref().map(|create_sql| {
862            mz_sql::parse::parse(create_sql)
863                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
864                .into_element()
865                .ast
866                .to_ast_string_redacted()
867        });
868        let source_id = if let TableDataSource::DataSource {
869            desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
870            ..
871        } = &table.data_source
872        {
873            Some(ingestion_id.to_string())
874        } else {
875            None
876        };
877
878        vec![BuiltinTableUpdate::row(
879            &*MZ_TABLES,
880            Row::pack_slice(&[
881                Datum::String(&id.to_string()),
882                Datum::UInt32(oid),
883                Datum::String(&schema_id.to_string()),
884                Datum::String(name),
885                Datum::String(&owner_id.to_string()),
886                privileges,
887                if let Some(create_sql) = &table.create_sql {
888                    Datum::String(create_sql)
889                } else {
890                    Datum::Null
891                },
892                if let Some(redacted) = &redacted {
893                    Datum::String(redacted)
894                } else {
895                    Datum::Null
896                },
897                if let Some(source_id) = source_id.as_ref() {
898                    Datum::String(source_id)
899                } else {
900                    Datum::Null
901                },
902            ]),
903            diff,
904        )]
905    }
906
907    fn pack_source_update(
908        &self,
909        id: CatalogItemId,
910        oid: u32,
911        schema_id: &SchemaSpecifier,
912        name: &str,
913        source_desc_name: &str,
914        connection_id: Option<CatalogItemId>,
915        envelope: Option<&str>,
916        key_format: Option<&str>,
917        value_format: Option<&str>,
918        cluster_id: Option<&str>,
919        owner_id: &RoleId,
920        privileges: Datum,
921        diff: Diff,
922        create_sql: Option<&String>,
923    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
924        let redacted = create_sql.map(|create_sql| {
925            let create_stmt = mz_sql::parse::parse(create_sql)
926                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
927                .into_element()
928                .ast;
929            create_stmt.to_ast_string_redacted()
930        });
931        vec![BuiltinTableUpdate::row(
932            &*MZ_SOURCES,
933            Row::pack_slice(&[
934                Datum::String(&id.to_string()),
935                Datum::UInt32(oid),
936                Datum::String(&schema_id.to_string()),
937                Datum::String(name),
938                Datum::String(source_desc_name),
939                Datum::from(connection_id.map(|id| id.to_string()).as_deref()),
940                // This is the "source size", which is a remnant from linked
941                // clusters.
942                Datum::Null,
943                Datum::from(envelope),
944                Datum::from(key_format),
945                Datum::from(value_format),
946                Datum::from(cluster_id),
947                Datum::String(&owner_id.to_string()),
948                privileges,
949                if let Some(create_sql) = create_sql {
950                    Datum::String(create_sql)
951                } else {
952                    Datum::Null
953                },
954                if let Some(redacted) = &redacted {
955                    Datum::String(redacted)
956                } else {
957                    Datum::Null
958                },
959            ]),
960            diff,
961        )]
962    }
963
964    fn pack_postgres_source_update(
965        &self,
966        id: CatalogItemId,
967        postgres: &PostgresSourceConnection<ReferencedConnection>,
968        diff: Diff,
969    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
970        vec![BuiltinTableUpdate::row(
971            &*MZ_POSTGRES_SOURCES,
972            Row::pack_slice(&[
973                Datum::String(&id.to_string()),
974                Datum::String(&postgres.publication_details.slot),
975                Datum::from(postgres.publication_details.timeline_id),
976            ]),
977            diff,
978        )]
979    }
980
981    fn pack_kafka_source_update(
982        &self,
983        item_id: CatalogItemId,
984        collection_id: GlobalId,
985        kafka: &KafkaSourceConnection<ReferencedConnection>,
986        diff: Diff,
987    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
988        vec![BuiltinTableUpdate::row(
989            &*MZ_KAFKA_SOURCES,
990            Row::pack_slice(&[
991                Datum::String(&item_id.to_string()),
992                Datum::String(&kafka.group_id(&self.config.connection_context, collection_id)),
993                Datum::String(&kafka.topic),
994            ]),
995            diff,
996        )]
997    }
998
999    fn pack_postgres_source_tables_update(
1000        &self,
1001        id: CatalogItemId,
1002        schema_name: &str,
1003        table_name: &str,
1004        diff: Diff,
1005    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1006        vec![BuiltinTableUpdate::row(
1007            &*MZ_POSTGRES_SOURCE_TABLES,
1008            Row::pack_slice(&[
1009                Datum::String(&id.to_string()),
1010                Datum::String(schema_name),
1011                Datum::String(table_name),
1012            ]),
1013            diff,
1014        )]
1015    }
1016
1017    fn pack_mysql_source_tables_update(
1018        &self,
1019        id: CatalogItemId,
1020        schema_name: &str,
1021        table_name: &str,
1022        diff: Diff,
1023    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1024        vec![BuiltinTableUpdate::row(
1025            &*MZ_MYSQL_SOURCE_TABLES,
1026            Row::pack_slice(&[
1027                Datum::String(&id.to_string()),
1028                Datum::String(schema_name),
1029                Datum::String(table_name),
1030            ]),
1031            diff,
1032        )]
1033    }
1034
1035    fn pack_sql_server_source_table_update(
1036        &self,
1037        id: CatalogItemId,
1038        schema_name: &str,
1039        table_name: &str,
1040        diff: Diff,
1041    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1042        vec![BuiltinTableUpdate::row(
1043            &*MZ_SQL_SERVER_SOURCE_TABLES,
1044            Row::pack_slice(&[
1045                Datum::String(&id.to_string()),
1046                Datum::String(schema_name),
1047                Datum::String(table_name),
1048            ]),
1049            diff,
1050        )]
1051    }
1052
1053    fn pack_kafka_source_tables_update(
1054        &self,
1055        id: CatalogItemId,
1056        topic: &str,
1057        envelope: Option<&str>,
1058        key_format: Option<&str>,
1059        value_format: Option<&str>,
1060        diff: Diff,
1061    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1062        vec![BuiltinTableUpdate::row(
1063            &*MZ_KAFKA_SOURCE_TABLES,
1064            Row::pack_slice(&[
1065                Datum::String(&id.to_string()),
1066                Datum::String(topic),
1067                Datum::from(envelope),
1068                Datum::from(key_format),
1069                Datum::from(value_format),
1070            ]),
1071            diff,
1072        )]
1073    }
1074
1075    fn pack_connection_update(
1076        &self,
1077        id: CatalogItemId,
1078        oid: u32,
1079        schema_id: &SchemaSpecifier,
1080        name: &str,
1081        owner_id: &RoleId,
1082        privileges: Datum,
1083        connection: &Connection,
1084        diff: Diff,
1085    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1086        let create_stmt = mz_sql::parse::parse(&connection.create_sql)
1087            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", connection.create_sql))
1088            .into_element()
1089            .ast;
1090        let mut updates = vec![BuiltinTableUpdate::row(
1091            &*MZ_CONNECTIONS,
1092            Row::pack_slice(&[
1093                Datum::String(&id.to_string()),
1094                Datum::UInt32(oid),
1095                Datum::String(&schema_id.to_string()),
1096                Datum::String(name),
1097                Datum::String(match connection.details {
1098                    ConnectionDetails::Kafka { .. } => "kafka",
1099                    ConnectionDetails::Csr { .. } => "confluent-schema-registry",
1100                    ConnectionDetails::Postgres { .. } => "postgres",
1101                    ConnectionDetails::Aws(..) => "aws",
1102                    ConnectionDetails::AwsPrivatelink(..) => "aws-privatelink",
1103                    ConnectionDetails::Ssh { .. } => "ssh-tunnel",
1104                    ConnectionDetails::MySql { .. } => "mysql",
1105                    ConnectionDetails::SqlServer(_) => "sql-server",
1106                    ConnectionDetails::IcebergCatalog(_) => "iceberg-catalog",
1107                }),
1108                Datum::String(&owner_id.to_string()),
1109                privileges,
1110                Datum::String(&connection.create_sql),
1111                Datum::String(&create_stmt.to_ast_string_redacted()),
1112            ]),
1113            diff,
1114        )];
1115        match connection.details {
1116            ConnectionDetails::Kafka(ref kafka) => {
1117                updates.extend(self.pack_kafka_connection_update(id, kafka, diff));
1118            }
1119            ConnectionDetails::Aws(ref aws_config) => {
1120                match self.pack_aws_connection_update(id, aws_config, diff) {
1121                    Ok(update) => {
1122                        updates.push(update);
1123                    }
1124                    Err(e) => {
1125                        tracing::error!(%id, %e, "failed writing row to mz_aws_connections table");
1126                    }
1127                }
1128            }
1129            ConnectionDetails::AwsPrivatelink(_) => {
1130                if let Some(aws_principal_context) = self.aws_principal_context.as_ref() {
1131                    updates.push(self.pack_aws_privatelink_connection_update(
1132                        id,
1133                        aws_principal_context,
1134                        diff,
1135                    ));
1136                } else {
1137                    tracing::error!(%id, "missing AWS principal context; cannot write row to mz_aws_privatelink_connections table");
1138                }
1139            }
1140            ConnectionDetails::Ssh {
1141                ref key_1,
1142                ref key_2,
1143                ..
1144            } => {
1145                updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff));
1146            }
1147            ConnectionDetails::Csr(_)
1148            | ConnectionDetails::Postgres(_)
1149            | ConnectionDetails::MySql(_)
1150            | ConnectionDetails::SqlServer(_)
1151            | ConnectionDetails::IcebergCatalog(_) => (),
1152        };
1153        updates
1154    }
1155
1156    pub(crate) fn pack_ssh_tunnel_connection_update(
1157        &self,
1158        id: CatalogItemId,
1159        key_1: &SshKey,
1160        key_2: &SshKey,
1161        diff: Diff,
1162    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1163        BuiltinTableUpdate::row(
1164            &*MZ_SSH_TUNNEL_CONNECTIONS,
1165            Row::pack_slice(&[
1166                Datum::String(&id.to_string()),
1167                Datum::String(key_1.public_key().as_str()),
1168                Datum::String(key_2.public_key().as_str()),
1169            ]),
1170            diff,
1171        )
1172    }
1173
1174    fn pack_kafka_connection_update(
1175        &self,
1176        id: CatalogItemId,
1177        kafka: &KafkaConnection<ReferencedConnection>,
1178        diff: Diff,
1179    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1180        let progress_topic = kafka.progress_topic(&self.config.connection_context, id);
1181        let mut row = Row::default();
1182        row.packer()
1183            .try_push_array(
1184                &[ArrayDimension {
1185                    lower_bound: 1,
1186                    length: kafka.brokers.len(),
1187                }],
1188                kafka
1189                    .brokers
1190                    .iter()
1191                    .map(|broker| Datum::String(&broker.address)),
1192            )
1193            .expect("kafka.brokers is 1 dimensional, and its length is used for the array length");
1194        let brokers = row.unpack_first();
1195        vec![BuiltinTableUpdate::row(
1196            &*MZ_KAFKA_CONNECTIONS,
1197            Row::pack_slice(&[
1198                Datum::String(&id.to_string()),
1199                brokers,
1200                Datum::String(&progress_topic),
1201            ]),
1202            diff,
1203        )]
1204    }
1205
1206    pub fn pack_aws_privatelink_connection_update(
1207        &self,
1208        connection_id: CatalogItemId,
1209        aws_principal_context: &AwsPrincipalContext,
1210        diff: Diff,
1211    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1212        let id = &MZ_AWS_PRIVATELINK_CONNECTIONS;
1213        let row = Row::pack_slice(&[
1214            Datum::String(&connection_id.to_string()),
1215            Datum::String(&aws_principal_context.to_principal_string(connection_id)),
1216        ]);
1217        BuiltinTableUpdate::row(id, row, diff)
1218    }
1219
1220    pub fn pack_aws_connection_update(
1221        &self,
1222        connection_id: CatalogItemId,
1223        aws_config: &AwsConnection,
1224        diff: Diff,
1225    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, anyhow::Error> {
1226        let id = &MZ_AWS_CONNECTIONS;
1227
1228        let mut access_key_id = None;
1229        let mut access_key_id_secret_id = None;
1230        let mut secret_access_key_secret_id = None;
1231        let mut session_token = None;
1232        let mut session_token_secret_id = None;
1233        let mut assume_role_arn = None;
1234        let mut assume_role_session_name = None;
1235        let mut principal = None;
1236        let mut external_id = None;
1237        let mut example_trust_policy = None;
1238        match &aws_config.auth {
1239            AwsAuth::Credentials(credentials) => {
1240                match &credentials.access_key_id {
1241                    StringOrSecret::String(s) => access_key_id = Some(s.as_str()),
1242                    StringOrSecret::Secret(s) => access_key_id_secret_id = Some(s.to_string()),
1243                }
1244                secret_access_key_secret_id = Some(credentials.secret_access_key.to_string());
1245                match credentials.session_token.as_ref() {
1246                    None => (),
1247                    Some(StringOrSecret::String(s)) => session_token = Some(s.as_str()),
1248                    Some(StringOrSecret::Secret(s)) => {
1249                        session_token_secret_id = Some(s.to_string())
1250                    }
1251                }
1252            }
1253            AwsAuth::AssumeRole(assume_role) => {
1254                assume_role_arn = Some(assume_role.arn.as_str());
1255                assume_role_session_name = assume_role.session_name.as_deref();
1256                principal = self
1257                    .config
1258                    .connection_context
1259                    .aws_connection_role_arn
1260                    .as_deref();
1261                external_id =
1262                    Some(assume_role.external_id(&self.config.connection_context, connection_id)?);
1263                example_trust_policy = {
1264                    let policy = assume_role
1265                        .example_trust_policy(&self.config.connection_context, connection_id)?;
1266                    let policy = Jsonb::from_serde_json(policy).expect("valid json");
1267                    Some(policy.into_row())
1268                };
1269            }
1270        }
1271
1272        let row = Row::pack_slice(&[
1273            Datum::String(&connection_id.to_string()),
1274            Datum::from(aws_config.endpoint.as_deref()),
1275            Datum::from(aws_config.region.as_deref()),
1276            Datum::from(access_key_id),
1277            Datum::from(access_key_id_secret_id.as_deref()),
1278            Datum::from(secret_access_key_secret_id.as_deref()),
1279            Datum::from(session_token),
1280            Datum::from(session_token_secret_id.as_deref()),
1281            Datum::from(assume_role_arn),
1282            Datum::from(assume_role_session_name),
1283            Datum::from(principal),
1284            Datum::from(external_id.as_deref()),
1285            Datum::from(example_trust_policy.as_ref().map(|p| p.into_element())),
1286        ]);
1287
1288        Ok(BuiltinTableUpdate::row(id, row, diff))
1289    }
1290
1291    fn pack_view_update(
1292        &self,
1293        id: CatalogItemId,
1294        oid: u32,
1295        schema_id: &SchemaSpecifier,
1296        name: &str,
1297        owner_id: &RoleId,
1298        privileges: Datum,
1299        view: &View,
1300        diff: Diff,
1301    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1302        let create_stmt = mz_sql::parse::parse(&view.create_sql)
1303            .unwrap_or_else(|e| {
1304                panic!(
1305                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1306                    view.create_sql, e
1307                )
1308            })
1309            .into_element()
1310            .ast;
1311        let query = match &create_stmt {
1312            Statement::CreateView(stmt) => &stmt.definition.query,
1313            _ => unreachable!(),
1314        };
1315
1316        let mut query_string = query.to_ast_string_stable();
1317        // PostgreSQL appends a semicolon in `pg_views.definition`, we
1318        // do the same for compatibility's sake.
1319        query_string.push(';');
1320
1321        vec![BuiltinTableUpdate::row(
1322            &*MZ_VIEWS,
1323            Row::pack_slice(&[
1324                Datum::String(&id.to_string()),
1325                Datum::UInt32(oid),
1326                Datum::String(&schema_id.to_string()),
1327                Datum::String(name),
1328                Datum::String(&query_string),
1329                Datum::String(&owner_id.to_string()),
1330                privileges,
1331                Datum::String(&view.create_sql),
1332                Datum::String(&create_stmt.to_ast_string_redacted()),
1333            ]),
1334            diff,
1335        )]
1336    }
1337
1338    fn pack_materialized_view_update(
1339        &self,
1340        id: CatalogItemId,
1341        oid: u32,
1342        schema_id: &SchemaSpecifier,
1343        name: &str,
1344        owner_id: &RoleId,
1345        privileges: Datum,
1346        mview: &MaterializedView,
1347        diff: Diff,
1348    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1349        let create_stmt = mz_sql::parse::parse(&mview.create_sql)
1350            .unwrap_or_else(|e| {
1351                panic!(
1352                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1353                    mview.create_sql, e
1354                )
1355            })
1356            .into_element()
1357            .ast;
1358        let query_string = match &create_stmt {
1359            Statement::CreateMaterializedView(stmt) => {
1360                let mut query_string = stmt.query.to_ast_string_stable();
1361                // PostgreSQL appends a semicolon in `pg_matviews.definition`, we
1362                // do the same for compatibility's sake.
1363                query_string.push(';');
1364                query_string
1365            }
1366            _ => unreachable!(),
1367        };
1368
1369        let mut updates = Vec::new();
1370
1371        updates.push(BuiltinTableUpdate::row(
1372            &*MZ_MATERIALIZED_VIEWS,
1373            Row::pack_slice(&[
1374                Datum::String(&id.to_string()),
1375                Datum::UInt32(oid),
1376                Datum::String(&schema_id.to_string()),
1377                Datum::String(name),
1378                Datum::String(&mview.cluster_id.to_string()),
1379                Datum::String(&query_string),
1380                Datum::String(&owner_id.to_string()),
1381                privileges,
1382                Datum::String(&mview.create_sql),
1383                Datum::String(&create_stmt.to_ast_string_redacted()),
1384            ]),
1385            diff,
1386        ));
1387
1388        if let Some(refresh_schedule) = &mview.refresh_schedule {
1389            // This can't be `ON COMMIT`, because that is represented by a `None` instead of an
1390            // empty `RefreshSchedule`.
1391            assert!(!refresh_schedule.is_empty());
1392            for RefreshEvery {
1393                interval,
1394                aligned_to,
1395            } in refresh_schedule.everies.iter()
1396            {
1397                let aligned_to_dt = mz_ore::now::to_datetime(
1398                    <&Timestamp as TryInto<u64>>::try_into(aligned_to).expect("undoes planning"),
1399                );
1400                updates.push(BuiltinTableUpdate::row(
1401                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1402                    Row::pack_slice(&[
1403                        Datum::String(&id.to_string()),
1404                        Datum::String("every"),
1405                        Datum::Interval(
1406                            Interval::from_duration(interval).expect(
1407                                "planning ensured that this is convertible back to Interval",
1408                            ),
1409                        ),
1410                        Datum::TimestampTz(aligned_to_dt.try_into().expect("undoes planning")),
1411                        Datum::Null,
1412                    ]),
1413                    diff,
1414                ));
1415            }
1416            for at in refresh_schedule.ats.iter() {
1417                let at_dt = mz_ore::now::to_datetime(
1418                    <&Timestamp as TryInto<u64>>::try_into(at).expect("undoes planning"),
1419                );
1420                updates.push(BuiltinTableUpdate::row(
1421                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1422                    Row::pack_slice(&[
1423                        Datum::String(&id.to_string()),
1424                        Datum::String("at"),
1425                        Datum::Null,
1426                        Datum::Null,
1427                        Datum::TimestampTz(at_dt.try_into().expect("undoes planning")),
1428                    ]),
1429                    diff,
1430                ));
1431            }
1432        } else {
1433            updates.push(BuiltinTableUpdate::row(
1434                &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1435                Row::pack_slice(&[
1436                    Datum::String(&id.to_string()),
1437                    Datum::String("on-commit"),
1438                    Datum::Null,
1439                    Datum::Null,
1440                    Datum::Null,
1441                ]),
1442                diff,
1443            ));
1444        }
1445
1446        updates
1447    }
1448
1449    fn pack_continual_task_update(
1450        &self,
1451        id: CatalogItemId,
1452        oid: u32,
1453        schema_id: &SchemaSpecifier,
1454        name: &str,
1455        owner_id: &RoleId,
1456        privileges: Datum,
1457        ct: &ContinualTask,
1458        diff: Diff,
1459    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1460        let create_stmt = mz_sql::parse::parse(&ct.create_sql)
1461            .unwrap_or_else(|e| {
1462                panic!(
1463                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1464                    ct.create_sql, e
1465                )
1466            })
1467            .into_element()
1468            .ast;
1469        let query_string = match &create_stmt {
1470            Statement::CreateContinualTask(stmt) => {
1471                let mut query_string = String::new();
1472                for stmt in &stmt.stmts {
1473                    let s = match stmt {
1474                        ContinualTaskStmt::Insert(stmt) => stmt.to_ast_string_stable(),
1475                        ContinualTaskStmt::Delete(stmt) => stmt.to_ast_string_stable(),
1476                    };
1477                    if query_string.is_empty() {
1478                        query_string = s;
1479                    } else {
1480                        query_string.push_str("; ");
1481                        query_string.push_str(&s);
1482                    }
1483                }
1484                query_string
1485            }
1486            _ => unreachable!(),
1487        };
1488
1489        vec![BuiltinTableUpdate::row(
1490            &*MZ_CONTINUAL_TASKS,
1491            Row::pack_slice(&[
1492                Datum::String(&id.to_string()),
1493                Datum::UInt32(oid),
1494                Datum::String(&schema_id.to_string()),
1495                Datum::String(name),
1496                Datum::String(&ct.cluster_id.to_string()),
1497                Datum::String(&query_string),
1498                Datum::String(&owner_id.to_string()),
1499                privileges,
1500                Datum::String(&ct.create_sql),
1501                Datum::String(&create_stmt.to_ast_string_redacted()),
1502            ]),
1503            diff,
1504        )]
1505    }
1506
1507    fn pack_sink_update(
1508        &self,
1509        id: CatalogItemId,
1510        oid: u32,
1511        schema_id: &SchemaSpecifier,
1512        name: &str,
1513        owner_id: &RoleId,
1514        sink: &Sink,
1515        diff: Diff,
1516    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1517        let mut updates = vec![];
1518        match &sink.connection {
1519            StorageSinkConnection::Kafka(KafkaSinkConnection {
1520                topic: topic_name, ..
1521            }) => {
1522                updates.push(BuiltinTableUpdate::row(
1523                    &*MZ_KAFKA_SINKS,
1524                    Row::pack_slice(&[
1525                        Datum::String(&id.to_string()),
1526                        Datum::String(topic_name.as_str()),
1527                    ]),
1528                    diff,
1529                ));
1530            }
1531        };
1532
1533        let create_stmt = mz_sql::parse::parse(&sink.create_sql)
1534            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", sink.create_sql))
1535            .into_element()
1536            .ast;
1537
1538        let envelope = sink.envelope();
1539
1540        // The combined format string is used for the deprecated `format` column.
1541        let combined_format = sink.combined_format();
1542        let (key_format, value_format) = sink.formats();
1543
1544        updates.push(BuiltinTableUpdate::row(
1545            &*MZ_SINKS,
1546            Row::pack_slice(&[
1547                Datum::String(&id.to_string()),
1548                Datum::UInt32(oid),
1549                Datum::String(&schema_id.to_string()),
1550                Datum::String(name),
1551                Datum::String(sink.connection.name()),
1552                Datum::from(sink.connection_id().map(|id| id.to_string()).as_deref()),
1553                // size column now deprecated w/o linked clusters
1554                Datum::Null,
1555                Datum::from(envelope),
1556                Datum::from(combined_format.as_ref()),
1557                Datum::from(key_format),
1558                Datum::String(value_format),
1559                Datum::String(&sink.cluster_id.to_string()),
1560                Datum::String(&owner_id.to_string()),
1561                Datum::String(&sink.create_sql),
1562                Datum::String(&create_stmt.to_ast_string_redacted()),
1563            ]),
1564            diff,
1565        ));
1566
1567        updates
1568    }
1569
1570    fn pack_index_update(
1571        &self,
1572        id: CatalogItemId,
1573        oid: u32,
1574        name: &str,
1575        owner_id: &RoleId,
1576        index: &Index,
1577        diff: Diff,
1578    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1579        let mut updates = vec![];
1580
1581        let create_stmt = mz_sql::parse::parse(&index.create_sql)
1582            .unwrap_or_else(|e| {
1583                panic!(
1584                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1585                    index.create_sql, e
1586                )
1587            })
1588            .into_element()
1589            .ast;
1590
1591        let key_sqls = match &create_stmt {
1592            Statement::CreateIndex(CreateIndexStatement { key_parts, .. }) => key_parts
1593                .as_ref()
1594                .expect("key_parts is filled in during planning"),
1595            _ => unreachable!(),
1596        };
1597        let on_item_id = self.get_entry_by_global_id(&index.on).id();
1598
1599        updates.push(BuiltinTableUpdate::row(
1600            &*MZ_INDEXES,
1601            Row::pack_slice(&[
1602                Datum::String(&id.to_string()),
1603                Datum::UInt32(oid),
1604                Datum::String(name),
1605                Datum::String(&on_item_id.to_string()),
1606                Datum::String(&index.cluster_id.to_string()),
1607                Datum::String(&owner_id.to_string()),
1608                Datum::String(&index.create_sql),
1609                Datum::String(&create_stmt.to_ast_string_redacted()),
1610            ]),
1611            diff,
1612        ));
1613
1614        for (i, key) in index.keys.iter().enumerate() {
1615            let on_entry = self.get_entry_by_global_id(&index.on);
1616            let nullable = key
1617                .typ(
1618                    &on_entry
1619                        .desc(&self.resolve_full_name(on_entry.name(), on_entry.conn_id()))
1620                        .expect("can only create indexes on items with a valid description")
1621                        .typ()
1622                        .column_types,
1623                )
1624                .nullable;
1625            let seq_in_index = u64::cast_from(i + 1);
1626            let key_sql = key_sqls
1627                .get(i)
1628                .expect("missing sql information for index key")
1629                .to_ast_string_simple();
1630            let (field_number, expression) = match key {
1631                MirScalarExpr::Column(col, _) => {
1632                    (Datum::UInt64(u64::cast_from(*col + 1)), Datum::Null)
1633                }
1634                _ => (Datum::Null, Datum::String(&key_sql)),
1635            };
1636            updates.push(BuiltinTableUpdate::row(
1637                &*MZ_INDEX_COLUMNS,
1638                Row::pack_slice(&[
1639                    Datum::String(&id.to_string()),
1640                    Datum::UInt64(seq_in_index),
1641                    field_number,
1642                    expression,
1643                    Datum::from(nullable),
1644                ]),
1645                diff,
1646            ));
1647        }
1648
1649        updates
1650    }
1651
1652    fn pack_type_update(
1653        &self,
1654        id: CatalogItemId,
1655        oid: u32,
1656        schema_id: &SchemaSpecifier,
1657        name: &str,
1658        owner_id: &RoleId,
1659        privileges: Datum,
1660        typ: &Type,
1661        diff: Diff,
1662    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1663        let mut out = vec![];
1664
1665        let redacted = typ.create_sql.as_ref().map(|create_sql| {
1666            mz_sql::parse::parse(create_sql)
1667                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
1668                .into_element()
1669                .ast
1670                .to_ast_string_redacted()
1671        });
1672
1673        out.push(BuiltinTableUpdate::row(
1674            &*MZ_TYPES,
1675            Row::pack_slice(&[
1676                Datum::String(&id.to_string()),
1677                Datum::UInt32(oid),
1678                Datum::String(&schema_id.to_string()),
1679                Datum::String(name),
1680                Datum::String(&TypeCategory::from_catalog_type(&typ.details.typ).to_string()),
1681                Datum::String(&owner_id.to_string()),
1682                privileges,
1683                if let Some(create_sql) = &typ.create_sql {
1684                    Datum::String(create_sql)
1685                } else {
1686                    Datum::Null
1687                },
1688                if let Some(redacted) = &redacted {
1689                    Datum::String(redacted)
1690                } else {
1691                    Datum::Null
1692                },
1693            ]),
1694            diff,
1695        ));
1696
1697        let mut row = Row::default();
1698        let mut packer = row.packer();
1699
1700        fn append_modifier(packer: &mut RowPacker<'_>, mods: &[i64]) {
1701            if mods.is_empty() {
1702                packer.push(Datum::Null);
1703            } else {
1704                packer.push_list(mods.iter().map(|m| Datum::Int64(*m)));
1705            }
1706        }
1707
1708        let index_id = match &typ.details.typ {
1709            CatalogType::Array {
1710                element_reference: element_id,
1711            } => {
1712                packer.push(Datum::String(&id.to_string()));
1713                packer.push(Datum::String(&element_id.to_string()));
1714                &MZ_ARRAY_TYPES
1715            }
1716            CatalogType::List {
1717                element_reference: element_id,
1718                element_modifiers,
1719            } => {
1720                packer.push(Datum::String(&id.to_string()));
1721                packer.push(Datum::String(&element_id.to_string()));
1722                append_modifier(&mut packer, element_modifiers);
1723                &MZ_LIST_TYPES
1724            }
1725            CatalogType::Map {
1726                key_reference: key_id,
1727                value_reference: value_id,
1728                key_modifiers,
1729                value_modifiers,
1730            } => {
1731                packer.push(Datum::String(&id.to_string()));
1732                packer.push(Datum::String(&key_id.to_string()));
1733                packer.push(Datum::String(&value_id.to_string()));
1734                append_modifier(&mut packer, key_modifiers);
1735                append_modifier(&mut packer, value_modifiers);
1736                &MZ_MAP_TYPES
1737            }
1738            CatalogType::Pseudo => {
1739                packer.push(Datum::String(&id.to_string()));
1740                &MZ_PSEUDO_TYPES
1741            }
1742            _ => {
1743                packer.push(Datum::String(&id.to_string()));
1744                &MZ_BASE_TYPES
1745            }
1746        };
1747        out.push(BuiltinTableUpdate::row(index_id, row, diff));
1748
1749        if let Some(pg_metadata) = &typ.details.pg_metadata {
1750            out.push(BuiltinTableUpdate::row(
1751                &*MZ_TYPE_PG_METADATA,
1752                Row::pack_slice(&[
1753                    Datum::String(&id.to_string()),
1754                    Datum::UInt32(pg_metadata.typinput_oid),
1755                    Datum::UInt32(pg_metadata.typreceive_oid),
1756                ]),
1757                diff,
1758            ));
1759        }
1760
1761        out
1762    }
1763
1764    fn pack_func_update(
1765        &self,
1766        id: CatalogItemId,
1767        schema_id: &SchemaSpecifier,
1768        name: &str,
1769        owner_id: &RoleId,
1770        func: &Func,
1771        diff: Diff,
1772    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1773        let mut updates = vec![];
1774        for func_impl_details in func.inner.func_impls() {
1775            let arg_type_ids = func_impl_details
1776                .arg_typs
1777                .iter()
1778                .map(|typ| self.get_system_type(typ).id().to_string())
1779                .collect::<Vec<_>>();
1780
1781            let mut row = Row::default();
1782            row.packer()
1783                .try_push_array(
1784                    &[ArrayDimension {
1785                        lower_bound: 1,
1786                        length: arg_type_ids.len(),
1787                    }],
1788                    arg_type_ids.iter().map(|id| Datum::String(id)),
1789                )
1790                .expect(
1791                    "arg_type_ids is 1 dimensional, and its length is used for the array length",
1792                );
1793            let arg_type_ids = row.unpack_first();
1794
1795            updates.push(BuiltinTableUpdate::row(
1796                &*MZ_FUNCTIONS,
1797                Row::pack_slice(&[
1798                    Datum::String(&id.to_string()),
1799                    Datum::UInt32(func_impl_details.oid),
1800                    Datum::String(&schema_id.to_string()),
1801                    Datum::String(name),
1802                    arg_type_ids,
1803                    Datum::from(
1804                        func_impl_details
1805                            .variadic_typ
1806                            .map(|typ| self.get_system_type(typ).id().to_string())
1807                            .as_deref(),
1808                    ),
1809                    Datum::from(
1810                        func_impl_details
1811                            .return_typ
1812                            .map(|typ| self.get_system_type(typ).id().to_string())
1813                            .as_deref(),
1814                    ),
1815                    func_impl_details.return_is_set.into(),
1816                    Datum::String(&owner_id.to_string()),
1817                ]),
1818                diff,
1819            ));
1820
1821            if let mz_sql::func::Func::Aggregate(_) = func.inner {
1822                updates.push(BuiltinTableUpdate::row(
1823                    &*MZ_AGGREGATES,
1824                    Row::pack_slice(&[
1825                        Datum::UInt32(func_impl_details.oid),
1826                        // TODO(database-issues#1064): Support ordered-set aggregate functions.
1827                        Datum::String("n"),
1828                        Datum::Int16(0),
1829                    ]),
1830                    diff,
1831                ));
1832            }
1833        }
1834        updates
1835    }
1836
1837    pub fn pack_op_update(
1838        &self,
1839        operator: &str,
1840        func_impl_details: FuncImplCatalogDetails,
1841        diff: Diff,
1842    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1843        let arg_type_ids = func_impl_details
1844            .arg_typs
1845            .iter()
1846            .map(|typ| self.get_system_type(typ).id().to_string())
1847            .collect::<Vec<_>>();
1848
1849        let mut row = Row::default();
1850        row.packer()
1851            .try_push_array(
1852                &[ArrayDimension {
1853                    lower_bound: 1,
1854                    length: arg_type_ids.len(),
1855                }],
1856                arg_type_ids.iter().map(|id| Datum::String(id)),
1857            )
1858            .expect("arg_type_ids is 1 dimensional, and its length is used for the array length");
1859        let arg_type_ids = row.unpack_first();
1860
1861        BuiltinTableUpdate::row(
1862            &*MZ_OPERATORS,
1863            Row::pack_slice(&[
1864                Datum::UInt32(func_impl_details.oid),
1865                Datum::String(operator),
1866                arg_type_ids,
1867                Datum::from(
1868                    func_impl_details
1869                        .return_typ
1870                        .map(|typ| self.get_system_type(typ).id().to_string())
1871                        .as_deref(),
1872                ),
1873            ]),
1874            diff,
1875        )
1876    }
1877
1878    fn pack_secret_update(
1879        &self,
1880        id: CatalogItemId,
1881        oid: u32,
1882        schema_id: &SchemaSpecifier,
1883        name: &str,
1884        owner_id: &RoleId,
1885        privileges: Datum,
1886        diff: Diff,
1887    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1888        vec![BuiltinTableUpdate::row(
1889            &*MZ_SECRETS,
1890            Row::pack_slice(&[
1891                Datum::String(&id.to_string()),
1892                Datum::UInt32(oid),
1893                Datum::String(&schema_id.to_string()),
1894                Datum::String(name),
1895                Datum::String(&owner_id.to_string()),
1896                privileges,
1897            ]),
1898            diff,
1899        )]
1900    }
1901
1902    pub fn pack_audit_log_update(
1903        &self,
1904        event: &VersionedEvent,
1905        diff: Diff,
1906    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1907        let (event_type, object_type, details, user, occurred_at): (
1908            &EventType,
1909            &ObjectType,
1910            &EventDetails,
1911            &Option<String>,
1912            u64,
1913        ) = match event {
1914            VersionedEvent::V1(ev) => (
1915                &ev.event_type,
1916                &ev.object_type,
1917                &ev.details,
1918                &ev.user,
1919                ev.occurred_at,
1920            ),
1921        };
1922        let details = Jsonb::from_serde_json(details.as_json())
1923            .map_err(|e| {
1924                Error::new(ErrorKind::Unstructured(format!(
1925                    "could not pack audit log update: {}",
1926                    e
1927                )))
1928            })?
1929            .into_row();
1930        let details = details
1931            .iter()
1932            .next()
1933            .expect("details created above with a single jsonb column");
1934        let dt = mz_ore::now::to_datetime(occurred_at);
1935        let id = event.sortable_id();
1936        Ok(BuiltinTableUpdate::row(
1937            &*MZ_AUDIT_EVENTS,
1938            Row::pack_slice(&[
1939                Datum::UInt64(id),
1940                Datum::String(&format!("{}", event_type)),
1941                Datum::String(&format!("{}", object_type)),
1942                details,
1943                match user {
1944                    Some(user) => Datum::String(user),
1945                    None => Datum::Null,
1946                },
1947                Datum::TimestampTz(dt.try_into().expect("must fit")),
1948            ]),
1949            diff,
1950        ))
1951    }
1952
1953    pub fn pack_storage_usage_update(
1954        &self,
1955        VersionedStorageUsage::V1(event): VersionedStorageUsage,
1956        diff: Diff,
1957    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1958        let id = &MZ_STORAGE_USAGE_BY_SHARD;
1959        let row = Row::pack_slice(&[
1960            Datum::UInt64(event.id),
1961            Datum::from(event.shard_id.as_deref()),
1962            Datum::UInt64(event.size_bytes),
1963            Datum::TimestampTz(
1964                mz_ore::now::to_datetime(event.collection_timestamp)
1965                    .try_into()
1966                    .expect("must fit"),
1967            ),
1968        ]);
1969        BuiltinTableUpdate::row(id, row, diff)
1970    }
1971
1972    pub fn pack_egress_ip_update(
1973        &self,
1974        ip: &IpNet,
1975    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1976        let id = &MZ_EGRESS_IPS;
1977        let addr = ip.addr();
1978        let row = Row::pack_slice(&[
1979            Datum::String(&addr.to_string()),
1980            Datum::Int32(ip.prefix_len().into()),
1981            Datum::String(&format!("{}/{}", addr, ip.prefix_len())),
1982        ]);
1983        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
1984    }
1985
1986    pub fn pack_license_key_update(
1987        &self,
1988        license_key: &ValidatedLicenseKey,
1989    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1990        let id = &MZ_LICENSE_KEYS;
1991        let row = Row::pack_slice(&[
1992            Datum::String(&license_key.id),
1993            Datum::String(&license_key.organization),
1994            Datum::String(&license_key.environment_id),
1995            Datum::TimestampTz(
1996                mz_ore::now::to_datetime(license_key.expiration * 1000)
1997                    .try_into()
1998                    .expect("must fit"),
1999            ),
2000            Datum::TimestampTz(
2001                mz_ore::now::to_datetime(license_key.not_before * 1000)
2002                    .try_into()
2003                    .expect("must fit"),
2004            ),
2005        ]);
2006        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
2007    }
2008
2009    pub fn pack_all_replica_size_updates(&self) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2010        let mut updates = Vec::new();
2011        for (size, alloc) in &self.cluster_replica_sizes.0 {
2012            if alloc.disabled {
2013                continue;
2014            }
2015
2016            // Just invent something when the limits are `None`, which only happens in non-prod
2017            // environments (tests, process orchestrator, etc.)
2018            let cpu_limit = alloc.cpu_limit.unwrap_or(CpuLimit::MAX);
2019            let MemoryLimit(ByteSize(memory_bytes)) =
2020                (alloc.memory_limit).unwrap_or(MemoryLimit::MAX);
2021            let DiskLimit(ByteSize(disk_bytes)) =
2022                (alloc.disk_limit).unwrap_or(DiskLimit::ARBITRARY);
2023
2024            let row = Row::pack_slice(&[
2025                size.as_str().into(),
2026                u64::from(alloc.scale).into(),
2027                u64::cast_from(alloc.workers).into(),
2028                cpu_limit.as_nanocpus().into(),
2029                memory_bytes.into(),
2030                disk_bytes.into(),
2031                (alloc.credits_per_hour).into(),
2032            ]);
2033
2034            updates.push(BuiltinTableUpdate::row(
2035                &*MZ_CLUSTER_REPLICA_SIZES,
2036                row,
2037                Diff::ONE,
2038            ));
2039        }
2040
2041        updates
2042    }
2043
2044    pub fn pack_subscribe_update(
2045        &self,
2046        id: GlobalId,
2047        subscribe: &ActiveSubscribe,
2048        diff: Diff,
2049    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2050        let mut row = Row::default();
2051        let mut packer = row.packer();
2052        packer.push(Datum::String(&id.to_string()));
2053        packer.push(Datum::Uuid(subscribe.session_uuid));
2054        packer.push(Datum::String(&subscribe.cluster_id.to_string()));
2055
2056        let start_dt = mz_ore::now::to_datetime(subscribe.start_time);
2057        packer.push(Datum::TimestampTz(start_dt.try_into().expect("must fit")));
2058
2059        let depends_on: Vec<_> = subscribe
2060            .depends_on
2061            .iter()
2062            .map(|id| id.to_string())
2063            .collect();
2064        packer.push_list(depends_on.iter().map(|s| Datum::String(s)));
2065
2066        BuiltinTableUpdate::row(&*MZ_SUBSCRIPTIONS, row, diff)
2067    }
2068
2069    pub fn pack_session_update(
2070        &self,
2071        conn: &ConnMeta,
2072        diff: Diff,
2073    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2074        let connect_dt = mz_ore::now::to_datetime(conn.connected_at());
2075        BuiltinTableUpdate::row(
2076            &*MZ_SESSIONS,
2077            Row::pack_slice(&[
2078                Datum::Uuid(conn.uuid()),
2079                Datum::UInt32(conn.conn_id().unhandled()),
2080                Datum::String(&conn.authenticated_role_id().to_string()),
2081                Datum::from(conn.client_ip().map(|ip| ip.to_string()).as_deref()),
2082                Datum::TimestampTz(connect_dt.try_into().expect("must fit")),
2083            ]),
2084            diff,
2085        )
2086    }
2087
2088    pub fn pack_default_privileges_update(
2089        &self,
2090        default_privilege_object: &DefaultPrivilegeObject,
2091        grantee: &RoleId,
2092        acl_mode: &AclMode,
2093        diff: Diff,
2094    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2095        BuiltinTableUpdate::row(
2096            &*MZ_DEFAULT_PRIVILEGES,
2097            Row::pack_slice(&[
2098                default_privilege_object.role_id.to_string().as_str().into(),
2099                default_privilege_object
2100                    .database_id
2101                    .map(|database_id| database_id.to_string())
2102                    .as_deref()
2103                    .into(),
2104                default_privilege_object
2105                    .schema_id
2106                    .map(|schema_id| schema_id.to_string())
2107                    .as_deref()
2108                    .into(),
2109                default_privilege_object
2110                    .object_type
2111                    .to_string()
2112                    .to_lowercase()
2113                    .as_str()
2114                    .into(),
2115                grantee.to_string().as_str().into(),
2116                acl_mode.to_string().as_str().into(),
2117            ]),
2118            diff,
2119        )
2120    }
2121
2122    pub fn pack_system_privileges_update(
2123        &self,
2124        privileges: MzAclItem,
2125        diff: Diff,
2126    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2127        BuiltinTableUpdate::row(
2128            &*MZ_SYSTEM_PRIVILEGES,
2129            Row::pack_slice(&[privileges.into()]),
2130            diff,
2131        )
2132    }
2133
2134    fn pack_privilege_array_row(&self, privileges: &PrivilegeMap) -> Row {
2135        let mut row = Row::default();
2136        let flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2137        row.packer()
2138            .try_push_array(
2139                &[ArrayDimension {
2140                    lower_bound: 1,
2141                    length: flat_privileges.len(),
2142                }],
2143                flat_privileges
2144                    .into_iter()
2145                    .map(|mz_acl_item| Datum::MzAclItem(mz_acl_item.clone())),
2146            )
2147            .expect("privileges is 1 dimensional, and its length is used for the array length");
2148        row
2149    }
2150
2151    pub fn pack_comment_update(
2152        &self,
2153        object_id: CommentObjectId,
2154        column_pos: Option<usize>,
2155        comment: &str,
2156        diff: Diff,
2157    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2158        // Use the audit log representation so it's easier to join against.
2159        let object_type = mz_sql::catalog::ObjectType::from(object_id);
2160        let audit_type = super::object_type_to_audit_object_type(object_type);
2161        let object_type_str = audit_type.to_string();
2162
2163        let object_id_str = match object_id {
2164            CommentObjectId::Table(global_id)
2165            | CommentObjectId::View(global_id)
2166            | CommentObjectId::MaterializedView(global_id)
2167            | CommentObjectId::Source(global_id)
2168            | CommentObjectId::Sink(global_id)
2169            | CommentObjectId::Index(global_id)
2170            | CommentObjectId::Func(global_id)
2171            | CommentObjectId::Connection(global_id)
2172            | CommentObjectId::Secret(global_id)
2173            | CommentObjectId::Type(global_id)
2174            | CommentObjectId::ContinualTask(global_id) => global_id.to_string(),
2175            CommentObjectId::Role(role_id) => role_id.to_string(),
2176            CommentObjectId::Database(database_id) => database_id.to_string(),
2177            CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
2178            CommentObjectId::Cluster(cluster_id) => cluster_id.to_string(),
2179            CommentObjectId::ClusterReplica((_, replica_id)) => replica_id.to_string(),
2180            CommentObjectId::NetworkPolicy(network_policy_id) => network_policy_id.to_string(),
2181        };
2182        let column_pos_datum = match column_pos {
2183            Some(pos) => {
2184                // TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
2185                let pos =
2186                    i32::try_from(pos).expect("we constrain this value in the planning layer");
2187                Datum::Int32(pos)
2188            }
2189            None => Datum::Null,
2190        };
2191
2192        BuiltinTableUpdate::row(
2193            &*MZ_COMMENTS,
2194            Row::pack_slice(&[
2195                Datum::String(&object_id_str),
2196                Datum::String(&object_type_str),
2197                column_pos_datum,
2198                Datum::String(comment),
2199            ]),
2200            diff,
2201        )
2202    }
2203
2204    pub fn pack_webhook_source_update(
2205        &self,
2206        item_id: CatalogItemId,
2207        diff: Diff,
2208    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2209        let url = self
2210            .try_get_webhook_url(&item_id)
2211            .expect("webhook source should exist");
2212        let url = url.to_string();
2213        let name = &self.get_entry(&item_id).name().item;
2214        let id_str = item_id.to_string();
2215
2216        BuiltinTableUpdate::row(
2217            &*MZ_WEBHOOKS_SOURCES,
2218            Row::pack_slice(&[
2219                Datum::String(&id_str),
2220                Datum::String(name),
2221                Datum::String(&url),
2222            ]),
2223            diff,
2224        )
2225    }
2226
2227    pub fn pack_source_references_update(
2228        &self,
2229        source_references: &SourceReferences,
2230        diff: Diff,
2231    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2232        let source_id = source_references.source_id.to_string();
2233        let updated_at = &source_references.updated_at;
2234        source_references
2235            .references
2236            .iter()
2237            .map(|reference| {
2238                let mut row = Row::default();
2239                let mut packer = row.packer();
2240                packer.extend([
2241                    Datum::String(&source_id),
2242                    reference
2243                        .namespace
2244                        .as_ref()
2245                        .map(|s| Datum::String(s))
2246                        .unwrap_or(Datum::Null),
2247                    Datum::String(&reference.name),
2248                    Datum::TimestampTz(
2249                        mz_ore::now::to_datetime(*updated_at)
2250                            .try_into()
2251                            .expect("must fit"),
2252                    ),
2253                ]);
2254                if reference.columns.len() > 0 {
2255                    packer
2256                        .try_push_array(
2257                            &[ArrayDimension {
2258                                lower_bound: 1,
2259                                length: reference.columns.len(),
2260                            }],
2261                            reference.columns.iter().map(|col| Datum::String(col)),
2262                        )
2263                        .expect(
2264                            "columns is 1 dimensional, and its length is used for the array length",
2265                        );
2266                } else {
2267                    packer.push(Datum::Null);
2268                }
2269
2270                BuiltinTableUpdate::row(&*MZ_SOURCE_REFERENCES, row, diff)
2271            })
2272            .collect()
2273    }
2274}