Skip to main content

mz_adapter/catalog/
builtin_table_updates.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10mod notice;
11
12use bytesize::ByteSize;
13use ipnet::IpNet;
14use mz_adapter_types::compaction::CompactionWindow;
15use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
16use mz_catalog::SYSTEM_CONN_ID;
17use mz_catalog::builtin::{
18    BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
19    MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICAS,
20    MZ_CLUSTER_SCHEDULES, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS, MZ_CONNECTIONS, MZ_CONTINUAL_TASKS,
21    MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS, MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES,
22    MZ_ICEBERG_SINKS, MZ_INDEX_COLUMNS, MZ_INDEXES, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS,
23    MZ_KAFKA_SOURCE_TABLES, MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES,
24    MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MATERIALIZED_VIEWS, MZ_MYSQL_SOURCE_TABLES,
25    MZ_OBJECT_DEPENDENCIES, MZ_OBJECT_GLOBAL_IDS, MZ_OPERATORS, MZ_POSTGRES_SOURCE_TABLES,
26    MZ_POSTGRES_SOURCES, MZ_PSEUDO_TYPES, MZ_REPLACEMENTS, MZ_ROLE_AUTH, MZ_ROLE_PARAMETERS,
27    MZ_ROLES, MZ_SECRETS, MZ_SESSIONS, MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SOURCES,
28    MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD,
29    MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS,
30    MZ_WEBHOOKS_SOURCES,
31};
32use mz_catalog::config::AwsPrincipalContext;
33use mz_catalog::durable::SourceReferences;
34use mz_catalog::memory::error::{Error, ErrorKind};
35use mz_catalog::memory::objects::{
36    CatalogEntry, CatalogItem, ClusterVariant, Connection, ContinualTask, DataSourceDesc, Func,
37    Index, MaterializedView, Sink, Table, TableDataSource, Type, View,
38};
39use mz_controller::clusters::{
40    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
41};
42use mz_controller_types::ClusterId;
43use mz_expr::MirScalarExpr;
44use mz_license_keys::ValidatedLicenseKey;
45use mz_orchestrator::{CpuLimit, DiskLimit, MemoryLimit};
46use mz_ore::cast::CastFrom;
47use mz_ore::collections::CollectionExt;
48use mz_persist_client::batch::ProtoBatch;
49use mz_repr::adt::array::ArrayDimension;
50use mz_repr::adt::interval::Interval;
51use mz_repr::adt::jsonb::Jsonb;
52use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
53use mz_repr::refresh_schedule::RefreshEvery;
54use mz_repr::role_id::RoleId;
55use mz_repr::{
56    CatalogItemId, Datum, Diff, GlobalId, ReprColumnType, Row, RowPacker, SqlScalarType, Timestamp,
57};
58use mz_sql::ast::{ContinualTaskStmt, CreateIndexStatement, Statement, UnresolvedItemName};
59use mz_sql::catalog::{CatalogCluster, CatalogType, DefaultPrivilegeObject, TypeCategory};
60use mz_sql::func::FuncImplCatalogDetails;
61use mz_sql::names::{CommentObjectId, SchemaSpecifier};
62use mz_sql::plan::{ClusterSchedule, ConnectionDetails, SshKey};
63use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
64use mz_sql::session::vars::SessionVars;
65use mz_sql_parser::ast::display::AstDisplay;
66use mz_storage_client::client::TableData;
67use mz_storage_types::connections::KafkaConnection;
68use mz_storage_types::connections::aws::{AwsAuth, AwsConnection};
69use mz_storage_types::connections::inline::ReferencedConnection;
70use mz_storage_types::connections::string_or_secret::StringOrSecret;
71use mz_storage_types::sinks::{IcebergSinkConnection, KafkaSinkConnection, StorageSinkConnection};
72use mz_storage_types::sources::{
73    GenericSourceConnection, KafkaSourceConnection, PostgresSourceConnection, SourceConnection,
74};
75use smallvec::smallvec;
76
77// DO NOT add any more imports from `crate` outside of `crate::catalog`.
78use crate::active_compute_sink::ActiveSubscribe;
79use crate::catalog::CatalogState;
80use crate::coord::ConnMeta;
81
82/// An update to a built-in table.
83#[derive(Debug, Clone)]
84pub struct BuiltinTableUpdate<T = CatalogItemId> {
85    /// The reference of the table to update.
86    pub id: T,
87    /// The data to put into the table.
88    pub data: TableData,
89}
90
91impl<T> BuiltinTableUpdate<T> {
92    /// Create a [`BuiltinTableUpdate`] from a [`Row`].
93    pub fn row(id: T, row: Row, diff: Diff) -> BuiltinTableUpdate<T> {
94        BuiltinTableUpdate {
95            id,
96            data: TableData::Rows(vec![(row, diff)]),
97        }
98    }
99
100    pub fn batch(id: T, batch: ProtoBatch) -> BuiltinTableUpdate<T> {
101        BuiltinTableUpdate {
102            id,
103            data: TableData::Batches(smallvec![batch]),
104        }
105    }
106}
107
108impl CatalogState {
109    pub fn resolve_builtin_table_updates(
110        &self,
111        builtin_table_update: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
112    ) -> Vec<BuiltinTableUpdate<CatalogItemId>> {
113        builtin_table_update
114            .into_iter()
115            .map(|builtin_table_update| self.resolve_builtin_table_update(builtin_table_update))
116            .collect()
117    }
118
119    pub fn resolve_builtin_table_update(
120        &self,
121        BuiltinTableUpdate { id, data }: BuiltinTableUpdate<&'static BuiltinTable>,
122    ) -> BuiltinTableUpdate<CatalogItemId> {
123        let id = self.resolve_builtin_table(id);
124        BuiltinTableUpdate { id, data }
125    }
126
127    pub fn pack_depends_update(
128        &self,
129        depender: CatalogItemId,
130        dependee: CatalogItemId,
131        diff: Diff,
132    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
133        let row = Row::pack_slice(&[
134            Datum::String(&depender.to_string()),
135            Datum::String(&dependee.to_string()),
136        ]);
137        BuiltinTableUpdate::row(&*MZ_OBJECT_DEPENDENCIES, row, diff)
138    }
139
140    pub(super) fn pack_role_auth_update(
141        &self,
142        id: RoleId,
143        diff: Diff,
144    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
145        let role_auth = self.get_role_auth(&id);
146        let role = self.get_role(&id);
147        BuiltinTableUpdate::row(
148            &*MZ_ROLE_AUTH,
149            Row::pack_slice(&[
150                Datum::String(&role_auth.role_id.to_string()),
151                Datum::UInt32(role.oid),
152                match &role_auth.password_hash {
153                    Some(hash) => Datum::String(hash),
154                    None => Datum::Null,
155                },
156                Datum::TimestampTz(
157                    mz_ore::now::to_datetime(role_auth.updated_at)
158                        .try_into()
159                        .expect("must fit"),
160                ),
161            ]),
162            diff,
163        )
164    }
165
166    pub(super) fn pack_role_update(
167        &self,
168        id: RoleId,
169        diff: Diff,
170    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
171        match id {
172            // PUBLIC role should not show up in mz_roles.
173            RoleId::Public => vec![],
174            id => {
175                let role = self.get_role(&id);
176                let builtin_supers = [MZ_SYSTEM_ROLE_ID, MZ_SUPPORT_ROLE_ID];
177
178                let rolcanlogin = if let Some(login) = role.attributes.login {
179                    login
180                } else {
181                    builtin_supers.contains(&role.id)
182                };
183
184                let rolsuper = if let Some(superuser) = role.attributes.superuser {
185                    Datum::from(superuser)
186                } else if builtin_supers.contains(&role.id) {
187                    // System roles (mz_system, mz_support) are superusers
188                    Datum::from(true)
189                } else {
190                    // In cloud environments, superuser status is determined
191                    // at login, so we set `rolsuper` to `null` here because
192                    // it cannot be known beforehand. For self-managed
193                    // auth, roles created without an explicit SUPERUSER
194                    // attribute would typically have `rolsuper` set to false.
195                    // However, since we cannot reliably distinguish between
196                    // cloud and self-managed here, we conservatively use NULL
197                    // for indeterminate cases.
198                    Datum::Null
199                };
200
201                let role_update = BuiltinTableUpdate::row(
202                    &*MZ_ROLES,
203                    Row::pack_slice(&[
204                        Datum::String(&role.id.to_string()),
205                        Datum::UInt32(role.oid),
206                        Datum::String(&role.name),
207                        Datum::from(role.attributes.inherit),
208                        Datum::from(rolcanlogin),
209                        rolsuper,
210                    ]),
211                    diff,
212                );
213                let mut updates = vec![role_update];
214
215                // HACK/TODO(parkmycar): Creating an empty SessionVars like this is pretty hacky,
216                // we should instead have a static list of all session vars.
217                let session_vars_reference = SessionVars::new_unchecked(
218                    &mz_build_info::DUMMY_BUILD_INFO,
219                    SYSTEM_USER.clone(),
220                    None,
221                );
222
223                for (name, val) in role.vars() {
224                    let result = session_vars_reference
225                        .inspect(name)
226                        .and_then(|var| var.check(val.borrow()));
227                    let Ok(formatted_val) = result else {
228                        // Note: all variables should have been validated by this point, so we
229                        // shouldn't ever hit this.
230                        tracing::error!(?name, ?val, ?result, "found invalid role default var");
231                        continue;
232                    };
233
234                    let role_var_update = BuiltinTableUpdate::row(
235                        &*MZ_ROLE_PARAMETERS,
236                        Row::pack_slice(&[
237                            Datum::String(&role.id.to_string()),
238                            Datum::String(name),
239                            Datum::String(&formatted_val),
240                        ]),
241                        diff,
242                    );
243                    updates.push(role_var_update);
244                }
245
246                updates
247            }
248        }
249    }
250
251    pub(super) fn pack_cluster_update(
252        &self,
253        name: &str,
254        diff: Diff,
255    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
256        let id = self.clusters_by_name[name];
257        let cluster = &self.clusters_by_id[&id];
258        let row = self.pack_privilege_array_row(cluster.privileges());
259        let privileges = row.unpack_first();
260        let (size, disk, replication_factor, azs, introspection_debugging, introspection_interval) =
261            match &cluster.config.variant {
262                ClusterVariant::Managed(config) => (
263                    Some(config.size.as_str()),
264                    Some(self.cluster_replica_size_has_disk(&config.size)),
265                    Some(config.replication_factor),
266                    if config.availability_zones.is_empty() {
267                        None
268                    } else {
269                        Some(config.availability_zones.clone())
270                    },
271                    Some(config.logging.log_logging),
272                    config.logging.interval.map(|d| {
273                        Interval::from_duration(&d)
274                            .expect("planning ensured this convertible back to interval")
275                    }),
276                ),
277                ClusterVariant::Unmanaged => (None, None, None, None, None, None),
278            };
279
280        let mut row = Row::default();
281        let mut packer = row.packer();
282        packer.extend([
283            Datum::String(&id.to_string()),
284            Datum::String(name),
285            Datum::String(&cluster.owner_id.to_string()),
286            privileges,
287            cluster.is_managed().into(),
288            size.into(),
289            replication_factor.into(),
290            disk.into(),
291        ]);
292        if let Some(azs) = azs {
293            packer.push_list(azs.iter().map(|az| Datum::String(az)));
294        } else {
295            packer.push(Datum::Null);
296        }
297        packer.push(Datum::from(introspection_debugging));
298        packer.push(Datum::from(introspection_interval));
299
300        let mut updates = Vec::new();
301
302        updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTERS, row, diff));
303
304        if let ClusterVariant::Managed(managed_config) = &cluster.config.variant {
305            let row = match managed_config.schedule {
306                ClusterSchedule::Manual => Row::pack_slice(&[
307                    Datum::String(&id.to_string()),
308                    Datum::String("manual"),
309                    Datum::Null,
310                ]),
311                ClusterSchedule::Refresh {
312                    hydration_time_estimate,
313                } => Row::pack_slice(&[
314                    Datum::String(&id.to_string()),
315                    Datum::String("on-refresh"),
316                    Datum::Interval(
317                        Interval::from_duration(&hydration_time_estimate)
318                            .expect("planning ensured that this is convertible back to Interval"),
319                    ),
320                ]),
321            };
322            updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTER_SCHEDULES, row, diff));
323        }
324
325        updates
326    }
327
328    pub(super) fn pack_cluster_replica_update(
329        &self,
330        cluster_id: ClusterId,
331        name: &str,
332        diff: Diff,
333    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
334        let cluster = &self.clusters_by_id[&cluster_id];
335        let id = cluster.replica_id(name).expect("Must exist");
336        let replica = cluster.replica(id).expect("Must exist");
337
338        let (size, disk, az) = match &replica.config.location {
339            // TODO(guswynn): The column should be `availability_zones`, not
340            // `availability_zone`.
341            ReplicaLocation::Managed(ManagedReplicaLocation {
342                size,
343                availability_zones: ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
344                allocation: _,
345                billed_as: _,
346                internal: _,
347                pending: _,
348            }) => (
349                Some(&**size),
350                Some(self.cluster_replica_size_has_disk(size)),
351                Some(az.as_str()),
352            ),
353            ReplicaLocation::Managed(ManagedReplicaLocation {
354                size,
355                availability_zones: _,
356                allocation: _,
357                billed_as: _,
358                internal: _,
359                pending: _,
360            }) => (
361                Some(&**size),
362                Some(self.cluster_replica_size_has_disk(size)),
363                None,
364            ),
365            ReplicaLocation::Unmanaged(_) => (None, None, None),
366        };
367
368        let cluster_replica_update = BuiltinTableUpdate::row(
369            &*MZ_CLUSTER_REPLICAS,
370            Row::pack_slice(&[
371                Datum::String(&id.to_string()),
372                Datum::String(name),
373                Datum::String(&cluster_id.to_string()),
374                Datum::from(size),
375                Datum::from(az),
376                Datum::String(&replica.owner_id.to_string()),
377                Datum::from(disk),
378            ]),
379            diff,
380        );
381
382        vec![cluster_replica_update]
383    }
384
385    pub(super) fn pack_item_update(
386        &self,
387        id: CatalogItemId,
388        diff: Diff,
389    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
390        let entry = self.get_entry(&id);
391        let oid = entry.oid();
392        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
393        let schema_id = &self
394            .get_schema(
395                &entry.name().qualifiers.database_spec,
396                &entry.name().qualifiers.schema_spec,
397                conn_id,
398            )
399            .id;
400        let name = &entry.name().item;
401        let owner_id = entry.owner_id();
402        let privileges_row = self.pack_privilege_array_row(entry.privileges());
403        let privileges = privileges_row.unpack_first();
404        let mut updates = match entry.item() {
405            CatalogItem::Log(_) => self.pack_source_update(
406                id, oid, schema_id, name, "log", None, None, None, None, None, owner_id,
407                privileges, diff, None,
408            ),
409            CatalogItem::Index(index) => {
410                self.pack_index_update(id, oid, name, owner_id, index, diff)
411            }
412            CatalogItem::Table(table) => {
413                let mut updates = self
414                    .pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);
415
416                if let TableDataSource::DataSource {
417                    desc: data_source,
418                    timeline: _,
419                } = &table.data_source
420                {
421                    updates.extend(match data_source {
422                        DataSourceDesc::IngestionExport {
423                            ingestion_id,
424                            external_reference: UnresolvedItemName(external_reference),
425                            details: _,
426                            data_config: _,
427                        } => {
428                            let ingestion_entry = self
429                                .get_entry(ingestion_id)
430                                .source_desc()
431                                .expect("primary source exists")
432                                .expect("primary source is a source");
433
434                            match ingestion_entry.connection.name() {
435                                "postgres" => {
436                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
437                                    // The left-most qualification of Postgres
438                                    // tables is the database, but this
439                                    // information is redundant because each
440                                    // Postgres connection connects to only one
441                                    // database.
442                                    let schema_name = external_reference[1].as_str();
443                                    let table_name = external_reference[2].as_str();
444
445                                    self.pack_postgres_source_tables_update(
446                                        id,
447                                        schema_name,
448                                        table_name,
449                                        diff,
450                                    )
451                                }
452                                "mysql" => {
453                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
454                                    let schema_name = external_reference[0].as_str();
455                                    let table_name = external_reference[1].as_str();
456
457                                    self.pack_mysql_source_tables_update(
458                                        id,
459                                        schema_name,
460                                        table_name,
461                                        diff,
462                                    )
463                                }
464                                "sql-server" => {
465                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
466                                    // The left-most qualification of SQL Server tables is
467                                    // the database, but this information is redundant
468                                    // because each SQL Server connection connects to
469                                    // only one database.
470                                    let schema_name = external_reference[1].as_str();
471                                    let table_name = external_reference[2].as_str();
472
473                                    self.pack_sql_server_source_table_update(
474                                        id,
475                                        schema_name,
476                                        table_name,
477                                        diff,
478                                    )
479                                }
480                                // Load generator sources don't have any special
481                                // updates.
482                                "load-generator" => vec![],
483                                "kafka" => {
484                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 1);
485                                    let topic = external_reference[0].as_str();
486                                    let envelope = data_source.envelope();
487                                    let (key_format, value_format) = data_source.formats();
488
489                                    self.pack_kafka_source_tables_update(
490                                        id,
491                                        topic,
492                                        envelope,
493                                        key_format,
494                                        value_format,
495                                        diff,
496                                    )
497                                }
498                                s => unreachable!("{s} sources do not have tables"),
499                            }
500                        }
501                        DataSourceDesc::Ingestion { .. }
502                        | DataSourceDesc::OldSyntaxIngestion { .. }
503                        | DataSourceDesc::Introspection(_)
504                        | DataSourceDesc::Progress
505                        | DataSourceDesc::Webhook { .. }
506                        | DataSourceDesc::Catalog => vec![],
507                    });
508                }
509
510                updates
511            }
512            CatalogItem::Source(source) => {
513                let source_type = source.source_type();
514                let connection_id = source.connection_id();
515                let envelope = source.data_source.envelope();
516                let cluster_entry = match source.data_source {
517                    // Ingestion exports don't have their own cluster, but
518                    // run on their ingestion's cluster.
519                    DataSourceDesc::IngestionExport { ingestion_id, .. } => {
520                        self.get_entry(&ingestion_id)
521                    }
522                    DataSourceDesc::Ingestion { .. }
523                    | DataSourceDesc::OldSyntaxIngestion { .. }
524                    | DataSourceDesc::Introspection(_)
525                    | DataSourceDesc::Progress
526                    | DataSourceDesc::Webhook { .. }
527                    | DataSourceDesc::Catalog => entry,
528                };
529
530                let cluster_id = cluster_entry.item().cluster_id().map(|id| id.to_string());
531
532                let (key_format, value_format) = source.data_source.formats();
533
534                let mut updates = self.pack_source_update(
535                    id,
536                    oid,
537                    schema_id,
538                    name,
539                    source_type,
540                    connection_id,
541                    envelope,
542                    key_format,
543                    value_format,
544                    cluster_id.as_deref(),
545                    owner_id,
546                    privileges,
547                    diff,
548                    source.create_sql.as_ref(),
549                );
550
551                updates.extend(match &source.data_source {
552                    DataSourceDesc::Ingestion { desc, .. }
553                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => match &desc.connection {
554                        GenericSourceConnection::Postgres(postgres) => {
555                            self.pack_postgres_source_update(id, postgres, diff)
556                        }
557                        GenericSourceConnection::Kafka(kafka) => {
558                            self.pack_kafka_source_update(id, source.global_id(), kafka, diff)
559                        }
560                        _ => vec![],
561                    },
562                    DataSourceDesc::IngestionExport {
563                        ingestion_id,
564                        external_reference: UnresolvedItemName(external_reference),
565                        details: _,
566                        data_config: _,
567                    } => {
568                        let ingestion_entry = self
569                            .get_entry(ingestion_id)
570                            .source_desc()
571                            .expect("primary source exists")
572                            .expect("primary source is a source");
573
574                        match ingestion_entry.connection.name() {
575                            "postgres" => {
576                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
577                                // The left-most qualification of Postgres
578                                // tables is the database, but this
579                                // information is redundant because each
580                                // Postgres connection connects to only one
581                                // database.
582                                let schema_name = external_reference[1].as_str();
583                                let table_name = external_reference[2].as_str();
584
585                                self.pack_postgres_source_tables_update(
586                                    id,
587                                    schema_name,
588                                    table_name,
589                                    diff,
590                                )
591                            }
592                            "mysql" => {
593                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
594                                let schema_name = external_reference[0].as_str();
595                                let table_name = external_reference[1].as_str();
596
597                                self.pack_mysql_source_tables_update(
598                                    id,
599                                    schema_name,
600                                    table_name,
601                                    diff,
602                                )
603                            }
604                            "sql-server" => {
605                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
606                                // The left-most qualification of SQL Server tables is
607                                // the database, but this information is redundant
608                                // because each SQL Server connection connects to
609                                // only one database.
610                                let schema_name = external_reference[1].as_str();
611                                let table_name = external_reference[2].as_str();
612
613                                self.pack_sql_server_source_table_update(
614                                    id,
615                                    schema_name,
616                                    table_name,
617                                    diff,
618                                )
619                            }
620                            // Load generator sources don't have any special
621                            // updates.
622                            "load-generator" => vec![],
623                            s => unreachable!("{s} sources do not have subsources"),
624                        }
625                    }
626                    DataSourceDesc::Webhook { .. } => {
627                        vec![self.pack_webhook_source_update(id, diff)]
628                    }
629                    DataSourceDesc::Introspection(_)
630                    | DataSourceDesc::Progress
631                    | DataSourceDesc::Catalog => vec![],
632                });
633
634                updates
635            }
636            CatalogItem::View(view) => {
637                self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
638            }
639            CatalogItem::MaterializedView(mview) => self.pack_materialized_view_update(
640                id, oid, schema_id, name, owner_id, privileges, mview, diff,
641            ),
642            CatalogItem::Sink(sink) => {
643                self.pack_sink_update(id, oid, schema_id, name, owner_id, sink, diff)
644            }
645            CatalogItem::Type(ty) => {
646                self.pack_type_update(id, oid, schema_id, name, owner_id, privileges, ty, diff)
647            }
648            CatalogItem::Func(func) => {
649                self.pack_func_update(id, schema_id, name, owner_id, func, diff)
650            }
651            CatalogItem::Secret(_) => {
652                self.pack_secret_update(id, oid, schema_id, name, owner_id, privileges, diff)
653            }
654            CatalogItem::Connection(connection) => self.pack_connection_update(
655                id, oid, schema_id, name, owner_id, privileges, connection, diff,
656            ),
657            CatalogItem::ContinualTask(ct) => self.pack_continual_task_update(
658                id, oid, schema_id, name, owner_id, privileges, ct, diff,
659            ),
660        };
661
662        if !entry.item().is_temporary() {
663            // Populate or clean up the `mz_object_dependencies` table.
664            // TODO(jkosh44) Unclear if this table wants to include all uses or only references.
665            for dependee in entry.item().references().items() {
666                updates.push(self.pack_depends_update(id, *dependee, diff))
667            }
668        }
669
670        // Always report the latest for an objects columns.
671        if let Some(desc) = entry.relation_desc_latest() {
672            let defaults = match entry.item() {
673                CatalogItem::Table(Table {
674                    data_source: TableDataSource::TableWrites { defaults },
675                    ..
676                }) => Some(defaults),
677                _ => None,
678            };
679            for (i, (column_name, column_type)) in desc.iter().enumerate() {
680                let default: Option<String> = defaults.map(|d| d[i].to_ast_string_stable());
681                let default: Datum = default
682                    .as_ref()
683                    .map(|d| Datum::String(d))
684                    .unwrap_or(Datum::Null);
685                let pgtype = mz_pgrepr::Type::from(&column_type.scalar_type);
686                let (type_name, type_oid) = match &column_type.scalar_type {
687                    SqlScalarType::List {
688                        custom_id: Some(custom_id),
689                        ..
690                    }
691                    | SqlScalarType::Map {
692                        custom_id: Some(custom_id),
693                        ..
694                    }
695                    | SqlScalarType::Record {
696                        custom_id: Some(custom_id),
697                        ..
698                    } => {
699                        let entry = self.get_entry(custom_id);
700                        // NOTE(benesch): the `mz_columns.type text` field is
701                        // wrong. Types do not have a name that can be
702                        // represented as a single textual field. There can be
703                        // multiple types with the same name in different
704                        // schemas and databases. We should eventually deprecate
705                        // the `type` field in favor of a new `type_id` field
706                        // that can be joined against `mz_types`.
707                        //
708                        // For now, in the interest of pragmatism, we just use
709                        // the type's item name, and accept that there may be
710                        // ambiguity if the same type name is used in multiple
711                        // schemas. The ambiguity is mitigated by the OID, which
712                        // can be joined against `mz_types.oid` to resolve the
713                        // ambiguity.
714                        let name = &*entry.name().item;
715                        let oid = entry.oid();
716                        (name, oid)
717                    }
718                    _ => (pgtype.name(), pgtype.oid()),
719                };
720                updates.push(BuiltinTableUpdate::row(
721                    &*MZ_COLUMNS,
722                    Row::pack_slice(&[
723                        Datum::String(&id.to_string()),
724                        Datum::String(column_name),
725                        Datum::UInt64(u64::cast_from(i + 1)),
726                        Datum::from(column_type.nullable),
727                        Datum::String(type_name),
728                        default,
729                        Datum::UInt32(type_oid),
730                        Datum::Int32(pgtype.typmod()),
731                    ]),
732                    diff,
733                ));
734            }
735        }
736
737        // Use initial lcw so that we can tell apart default from non-existent windows.
738        if let Some(cw) = entry.item().initial_logical_compaction_window() {
739            updates.push(self.pack_history_retention_strategy_update(id, cw, diff));
740        }
741
742        updates.extend(Self::pack_item_global_id_update(entry, diff));
743
744        updates
745    }
746
747    fn pack_item_global_id_update(
748        entry: &CatalogEntry,
749        diff: Diff,
750    ) -> impl Iterator<Item = BuiltinTableUpdate<&'static BuiltinTable>> + use<'_> {
751        let id = entry.id().to_string();
752        let global_ids = entry.global_ids();
753        global_ids.map(move |global_id| {
754            BuiltinTableUpdate::row(
755                &*MZ_OBJECT_GLOBAL_IDS,
756                Row::pack_slice(&[Datum::String(&id), Datum::String(&global_id.to_string())]),
757                diff,
758            )
759        })
760    }
761
762    fn pack_history_retention_strategy_update(
763        &self,
764        id: CatalogItemId,
765        cw: CompactionWindow,
766        diff: Diff,
767    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
768        let cw: u64 = cw.comparable_timestamp().into();
769        let cw = Jsonb::from_serde_json(serde_json::Value::Number(serde_json::Number::from(cw)))
770            .expect("must serialize");
771        BuiltinTableUpdate::row(
772            &*MZ_HISTORY_RETENTION_STRATEGIES,
773            Row::pack_slice(&[
774                Datum::String(&id.to_string()),
775                // FOR is the only strategy at the moment. We may introduce FROM or others later.
776                Datum::String("FOR"),
777                cw.into_row().into_element(),
778            ]),
779            diff,
780        )
781    }
782
783    fn pack_table_update(
784        &self,
785        id: CatalogItemId,
786        oid: u32,
787        schema_id: &SchemaSpecifier,
788        name: &str,
789        owner_id: &RoleId,
790        privileges: Datum,
791        diff: Diff,
792        table: &Table,
793    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
794        let redacted = table.create_sql.as_ref().map(|create_sql| {
795            mz_sql::parse::parse(create_sql)
796                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
797                .into_element()
798                .ast
799                .to_ast_string_redacted()
800        });
801        let source_id = if let TableDataSource::DataSource {
802            desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
803            ..
804        } = &table.data_source
805        {
806            Some(ingestion_id.to_string())
807        } else {
808            None
809        };
810
811        vec![BuiltinTableUpdate::row(
812            &*MZ_TABLES,
813            Row::pack_slice(&[
814                Datum::String(&id.to_string()),
815                Datum::UInt32(oid),
816                Datum::String(&schema_id.to_string()),
817                Datum::String(name),
818                Datum::String(&owner_id.to_string()),
819                privileges,
820                if let Some(create_sql) = &table.create_sql {
821                    Datum::String(create_sql)
822                } else {
823                    Datum::Null
824                },
825                if let Some(redacted) = &redacted {
826                    Datum::String(redacted)
827                } else {
828                    Datum::Null
829                },
830                if let Some(source_id) = source_id.as_ref() {
831                    Datum::String(source_id)
832                } else {
833                    Datum::Null
834                },
835            ]),
836            diff,
837        )]
838    }
839
840    fn pack_source_update(
841        &self,
842        id: CatalogItemId,
843        oid: u32,
844        schema_id: &SchemaSpecifier,
845        name: &str,
846        source_desc_name: &str,
847        connection_id: Option<CatalogItemId>,
848        envelope: Option<&str>,
849        key_format: Option<&str>,
850        value_format: Option<&str>,
851        cluster_id: Option<&str>,
852        owner_id: &RoleId,
853        privileges: Datum,
854        diff: Diff,
855        create_sql: Option<&String>,
856    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
857        let redacted = create_sql.map(|create_sql| {
858            let create_stmt = mz_sql::parse::parse(create_sql)
859                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
860                .into_element()
861                .ast;
862            create_stmt.to_ast_string_redacted()
863        });
864        vec![BuiltinTableUpdate::row(
865            &*MZ_SOURCES,
866            Row::pack_slice(&[
867                Datum::String(&id.to_string()),
868                Datum::UInt32(oid),
869                Datum::String(&schema_id.to_string()),
870                Datum::String(name),
871                Datum::String(source_desc_name),
872                Datum::from(connection_id.map(|id| id.to_string()).as_deref()),
873                // This is the "source size", which is a remnant from linked
874                // clusters.
875                Datum::Null,
876                Datum::from(envelope),
877                Datum::from(key_format),
878                Datum::from(value_format),
879                Datum::from(cluster_id),
880                Datum::String(&owner_id.to_string()),
881                privileges,
882                if let Some(create_sql) = create_sql {
883                    Datum::String(create_sql)
884                } else {
885                    Datum::Null
886                },
887                if let Some(redacted) = &redacted {
888                    Datum::String(redacted)
889                } else {
890                    Datum::Null
891                },
892            ]),
893            diff,
894        )]
895    }
896
897    fn pack_postgres_source_update(
898        &self,
899        id: CatalogItemId,
900        postgres: &PostgresSourceConnection<ReferencedConnection>,
901        diff: Diff,
902    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
903        vec![BuiltinTableUpdate::row(
904            &*MZ_POSTGRES_SOURCES,
905            Row::pack_slice(&[
906                Datum::String(&id.to_string()),
907                Datum::String(&postgres.publication_details.slot),
908                Datum::from(postgres.publication_details.timeline_id),
909            ]),
910            diff,
911        )]
912    }
913
914    fn pack_kafka_source_update(
915        &self,
916        item_id: CatalogItemId,
917        collection_id: GlobalId,
918        kafka: &KafkaSourceConnection<ReferencedConnection>,
919        diff: Diff,
920    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
921        vec![BuiltinTableUpdate::row(
922            &*MZ_KAFKA_SOURCES,
923            Row::pack_slice(&[
924                Datum::String(&item_id.to_string()),
925                Datum::String(&kafka.group_id(&self.config.connection_context, collection_id)),
926                Datum::String(&kafka.topic),
927            ]),
928            diff,
929        )]
930    }
931
932    fn pack_postgres_source_tables_update(
933        &self,
934        id: CatalogItemId,
935        schema_name: &str,
936        table_name: &str,
937        diff: Diff,
938    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
939        vec![BuiltinTableUpdate::row(
940            &*MZ_POSTGRES_SOURCE_TABLES,
941            Row::pack_slice(&[
942                Datum::String(&id.to_string()),
943                Datum::String(schema_name),
944                Datum::String(table_name),
945            ]),
946            diff,
947        )]
948    }
949
950    fn pack_mysql_source_tables_update(
951        &self,
952        id: CatalogItemId,
953        schema_name: &str,
954        table_name: &str,
955        diff: Diff,
956    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
957        vec![BuiltinTableUpdate::row(
958            &*MZ_MYSQL_SOURCE_TABLES,
959            Row::pack_slice(&[
960                Datum::String(&id.to_string()),
961                Datum::String(schema_name),
962                Datum::String(table_name),
963            ]),
964            diff,
965        )]
966    }
967
968    fn pack_sql_server_source_table_update(
969        &self,
970        id: CatalogItemId,
971        schema_name: &str,
972        table_name: &str,
973        diff: Diff,
974    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
975        vec![BuiltinTableUpdate::row(
976            &*MZ_SQL_SERVER_SOURCE_TABLES,
977            Row::pack_slice(&[
978                Datum::String(&id.to_string()),
979                Datum::String(schema_name),
980                Datum::String(table_name),
981            ]),
982            diff,
983        )]
984    }
985
986    fn pack_kafka_source_tables_update(
987        &self,
988        id: CatalogItemId,
989        topic: &str,
990        envelope: Option<&str>,
991        key_format: Option<&str>,
992        value_format: Option<&str>,
993        diff: Diff,
994    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
995        vec![BuiltinTableUpdate::row(
996            &*MZ_KAFKA_SOURCE_TABLES,
997            Row::pack_slice(&[
998                Datum::String(&id.to_string()),
999                Datum::String(topic),
1000                Datum::from(envelope),
1001                Datum::from(key_format),
1002                Datum::from(value_format),
1003            ]),
1004            diff,
1005        )]
1006    }
1007
1008    fn pack_connection_update(
1009        &self,
1010        id: CatalogItemId,
1011        oid: u32,
1012        schema_id: &SchemaSpecifier,
1013        name: &str,
1014        owner_id: &RoleId,
1015        privileges: Datum,
1016        connection: &Connection,
1017        diff: Diff,
1018    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1019        let create_stmt = mz_sql::parse::parse(&connection.create_sql)
1020            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", connection.create_sql))
1021            .into_element()
1022            .ast;
1023        let mut updates = vec![BuiltinTableUpdate::row(
1024            &*MZ_CONNECTIONS,
1025            Row::pack_slice(&[
1026                Datum::String(&id.to_string()),
1027                Datum::UInt32(oid),
1028                Datum::String(&schema_id.to_string()),
1029                Datum::String(name),
1030                Datum::String(match connection.details {
1031                    ConnectionDetails::Kafka { .. } => "kafka",
1032                    ConnectionDetails::Csr { .. } => "confluent-schema-registry",
1033                    ConnectionDetails::Postgres { .. } => "postgres",
1034                    ConnectionDetails::Aws(..) => "aws",
1035                    ConnectionDetails::AwsPrivatelink(..) => "aws-privatelink",
1036                    ConnectionDetails::Ssh { .. } => "ssh-tunnel",
1037                    ConnectionDetails::MySql { .. } => "mysql",
1038                    ConnectionDetails::SqlServer(_) => "sql-server",
1039                    ConnectionDetails::IcebergCatalog(_) => "iceberg-catalog",
1040                }),
1041                Datum::String(&owner_id.to_string()),
1042                privileges,
1043                Datum::String(&connection.create_sql),
1044                Datum::String(&create_stmt.to_ast_string_redacted()),
1045            ]),
1046            diff,
1047        )];
1048        match connection.details {
1049            ConnectionDetails::Kafka(ref kafka) => {
1050                updates.extend(self.pack_kafka_connection_update(id, kafka, diff));
1051            }
1052            ConnectionDetails::Aws(ref aws_config) => {
1053                match self.pack_aws_connection_update(id, aws_config, diff) {
1054                    Ok(update) => {
1055                        updates.push(update);
1056                    }
1057                    Err(e) => {
1058                        tracing::error!(%id, %e, "failed writing row to mz_aws_connections table");
1059                    }
1060                }
1061            }
1062            ConnectionDetails::AwsPrivatelink(_) => {
1063                if let Some(aws_principal_context) = self.aws_principal_context.as_ref() {
1064                    updates.push(self.pack_aws_privatelink_connection_update(
1065                        id,
1066                        aws_principal_context,
1067                        diff,
1068                    ));
1069                } else {
1070                    tracing::error!(%id, "missing AWS principal context; cannot write row to mz_aws_privatelink_connections table");
1071                }
1072            }
1073            ConnectionDetails::Ssh {
1074                ref key_1,
1075                ref key_2,
1076                ..
1077            } => {
1078                updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff));
1079            }
1080            ConnectionDetails::Csr(_)
1081            | ConnectionDetails::Postgres(_)
1082            | ConnectionDetails::MySql(_)
1083            | ConnectionDetails::SqlServer(_)
1084            | ConnectionDetails::IcebergCatalog(_) => (),
1085        };
1086        updates
1087    }
1088
1089    pub(crate) fn pack_ssh_tunnel_connection_update(
1090        &self,
1091        id: CatalogItemId,
1092        key_1: &SshKey,
1093        key_2: &SshKey,
1094        diff: Diff,
1095    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1096        BuiltinTableUpdate::row(
1097            &*MZ_SSH_TUNNEL_CONNECTIONS,
1098            Row::pack_slice(&[
1099                Datum::String(&id.to_string()),
1100                Datum::String(key_1.public_key().as_str()),
1101                Datum::String(key_2.public_key().as_str()),
1102            ]),
1103            diff,
1104        )
1105    }
1106
1107    fn pack_kafka_connection_update(
1108        &self,
1109        id: CatalogItemId,
1110        kafka: &KafkaConnection<ReferencedConnection>,
1111        diff: Diff,
1112    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1113        let progress_topic = kafka.progress_topic(&self.config.connection_context, id);
1114        let mut row = Row::default();
1115        row.packer()
1116            .try_push_array(
1117                &[ArrayDimension {
1118                    lower_bound: 1,
1119                    length: kafka.brokers.len(),
1120                }],
1121                kafka
1122                    .brokers
1123                    .iter()
1124                    .map(|broker| Datum::String(&broker.address)),
1125            )
1126            .expect("kafka.brokers is 1 dimensional, and its length is used for the array length");
1127        let brokers = row.unpack_first();
1128        vec![BuiltinTableUpdate::row(
1129            &*MZ_KAFKA_CONNECTIONS,
1130            Row::pack_slice(&[
1131                Datum::String(&id.to_string()),
1132                brokers,
1133                Datum::String(&progress_topic),
1134            ]),
1135            diff,
1136        )]
1137    }
1138
1139    pub fn pack_aws_privatelink_connection_update(
1140        &self,
1141        connection_id: CatalogItemId,
1142        aws_principal_context: &AwsPrincipalContext,
1143        diff: Diff,
1144    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1145        let id = &MZ_AWS_PRIVATELINK_CONNECTIONS;
1146        let row = Row::pack_slice(&[
1147            Datum::String(&connection_id.to_string()),
1148            Datum::String(&aws_principal_context.to_principal_string(connection_id)),
1149        ]);
1150        BuiltinTableUpdate::row(id, row, diff)
1151    }
1152
1153    pub fn pack_aws_connection_update(
1154        &self,
1155        connection_id: CatalogItemId,
1156        aws_config: &AwsConnection,
1157        diff: Diff,
1158    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, anyhow::Error> {
1159        let id = &MZ_AWS_CONNECTIONS;
1160
1161        let mut access_key_id = None;
1162        let mut access_key_id_secret_id = None;
1163        let mut secret_access_key_secret_id = None;
1164        let mut session_token = None;
1165        let mut session_token_secret_id = None;
1166        let mut assume_role_arn = None;
1167        let mut assume_role_session_name = None;
1168        let mut principal = None;
1169        let mut external_id = None;
1170        let mut example_trust_policy = None;
1171        match &aws_config.auth {
1172            AwsAuth::Credentials(credentials) => {
1173                match &credentials.access_key_id {
1174                    StringOrSecret::String(s) => access_key_id = Some(s.as_str()),
1175                    StringOrSecret::Secret(s) => access_key_id_secret_id = Some(s.to_string()),
1176                }
1177                secret_access_key_secret_id = Some(credentials.secret_access_key.to_string());
1178                match credentials.session_token.as_ref() {
1179                    None => (),
1180                    Some(StringOrSecret::String(s)) => session_token = Some(s.as_str()),
1181                    Some(StringOrSecret::Secret(s)) => {
1182                        session_token_secret_id = Some(s.to_string())
1183                    }
1184                }
1185            }
1186            AwsAuth::AssumeRole(assume_role) => {
1187                assume_role_arn = Some(assume_role.arn.as_str());
1188                assume_role_session_name = assume_role.session_name.as_deref();
1189                principal = self
1190                    .config
1191                    .connection_context
1192                    .aws_connection_role_arn
1193                    .as_deref();
1194                external_id =
1195                    Some(assume_role.external_id(&self.config.connection_context, connection_id)?);
1196                example_trust_policy = {
1197                    let policy = assume_role
1198                        .example_trust_policy(&self.config.connection_context, connection_id)?;
1199                    let policy = Jsonb::from_serde_json(policy).expect("valid json");
1200                    Some(policy.into_row())
1201                };
1202            }
1203        }
1204
1205        let row = Row::pack_slice(&[
1206            Datum::String(&connection_id.to_string()),
1207            Datum::from(aws_config.endpoint.as_deref()),
1208            Datum::from(aws_config.region.as_deref()),
1209            Datum::from(access_key_id),
1210            Datum::from(access_key_id_secret_id.as_deref()),
1211            Datum::from(secret_access_key_secret_id.as_deref()),
1212            Datum::from(session_token),
1213            Datum::from(session_token_secret_id.as_deref()),
1214            Datum::from(assume_role_arn),
1215            Datum::from(assume_role_session_name),
1216            Datum::from(principal),
1217            Datum::from(external_id.as_deref()),
1218            Datum::from(example_trust_policy.as_ref().map(|p| p.into_element())),
1219        ]);
1220
1221        Ok(BuiltinTableUpdate::row(id, row, diff))
1222    }
1223
1224    fn pack_view_update(
1225        &self,
1226        id: CatalogItemId,
1227        oid: u32,
1228        schema_id: &SchemaSpecifier,
1229        name: &str,
1230        owner_id: &RoleId,
1231        privileges: Datum,
1232        view: &View,
1233        diff: Diff,
1234    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1235        let create_stmt = mz_sql::parse::parse(&view.create_sql)
1236            .unwrap_or_else(|e| {
1237                panic!(
1238                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1239                    view.create_sql, e
1240                )
1241            })
1242            .into_element()
1243            .ast;
1244        let query = match &create_stmt {
1245            Statement::CreateView(stmt) => &stmt.definition.query,
1246            _ => unreachable!(),
1247        };
1248
1249        let mut query_string = query.to_ast_string_stable();
1250        // PostgreSQL appends a semicolon in `pg_views.definition`, we
1251        // do the same for compatibility's sake.
1252        query_string.push(';');
1253
1254        vec![BuiltinTableUpdate::row(
1255            &*MZ_VIEWS,
1256            Row::pack_slice(&[
1257                Datum::String(&id.to_string()),
1258                Datum::UInt32(oid),
1259                Datum::String(&schema_id.to_string()),
1260                Datum::String(name),
1261                Datum::String(&query_string),
1262                Datum::String(&owner_id.to_string()),
1263                privileges,
1264                Datum::String(&view.create_sql),
1265                Datum::String(&create_stmt.to_ast_string_redacted()),
1266            ]),
1267            diff,
1268        )]
1269    }
1270
1271    fn pack_materialized_view_update(
1272        &self,
1273        id: CatalogItemId,
1274        oid: u32,
1275        schema_id: &SchemaSpecifier,
1276        name: &str,
1277        owner_id: &RoleId,
1278        privileges: Datum,
1279        mview: &MaterializedView,
1280        diff: Diff,
1281    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1282        let create_stmt = mz_sql::parse::parse(&mview.create_sql)
1283            .unwrap_or_else(|e| {
1284                panic!(
1285                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1286                    mview.create_sql, e
1287                )
1288            })
1289            .into_element()
1290            .ast;
1291        let query_string = match &create_stmt {
1292            Statement::CreateMaterializedView(stmt) => {
1293                let mut query_string = stmt.query.to_ast_string_stable();
1294                // PostgreSQL appends a semicolon in `pg_matviews.definition`, we
1295                // do the same for compatibility's sake.
1296                query_string.push(';');
1297                query_string
1298            }
1299            _ => unreachable!(),
1300        };
1301
1302        let mut updates = Vec::new();
1303
1304        updates.push(BuiltinTableUpdate::row(
1305            &*MZ_MATERIALIZED_VIEWS,
1306            Row::pack_slice(&[
1307                Datum::String(&id.to_string()),
1308                Datum::UInt32(oid),
1309                Datum::String(&schema_id.to_string()),
1310                Datum::String(name),
1311                Datum::String(&mview.cluster_id.to_string()),
1312                Datum::String(&query_string),
1313                Datum::String(&owner_id.to_string()),
1314                privileges,
1315                Datum::String(&mview.create_sql),
1316                Datum::String(&create_stmt.to_ast_string_redacted()),
1317            ]),
1318            diff,
1319        ));
1320
1321        if let Some(refresh_schedule) = &mview.refresh_schedule {
1322            // This can't be `ON COMMIT`, because that is represented by a `None` instead of an
1323            // empty `RefreshSchedule`.
1324            assert!(!refresh_schedule.is_empty());
1325            for RefreshEvery {
1326                interval,
1327                aligned_to,
1328            } in refresh_schedule.everies.iter()
1329            {
1330                let aligned_to_dt = mz_ore::now::to_datetime(
1331                    <&Timestamp as TryInto<u64>>::try_into(aligned_to).expect("undoes planning"),
1332                );
1333                updates.push(BuiltinTableUpdate::row(
1334                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1335                    Row::pack_slice(&[
1336                        Datum::String(&id.to_string()),
1337                        Datum::String("every"),
1338                        Datum::Interval(
1339                            Interval::from_duration(interval).expect(
1340                                "planning ensured that this is convertible back to Interval",
1341                            ),
1342                        ),
1343                        Datum::TimestampTz(aligned_to_dt.try_into().expect("undoes planning")),
1344                        Datum::Null,
1345                    ]),
1346                    diff,
1347                ));
1348            }
1349            for at in refresh_schedule.ats.iter() {
1350                let at_dt = mz_ore::now::to_datetime(
1351                    <&Timestamp as TryInto<u64>>::try_into(at).expect("undoes planning"),
1352                );
1353                updates.push(BuiltinTableUpdate::row(
1354                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1355                    Row::pack_slice(&[
1356                        Datum::String(&id.to_string()),
1357                        Datum::String("at"),
1358                        Datum::Null,
1359                        Datum::Null,
1360                        Datum::TimestampTz(at_dt.try_into().expect("undoes planning")),
1361                    ]),
1362                    diff,
1363                ));
1364            }
1365        } else {
1366            updates.push(BuiltinTableUpdate::row(
1367                &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1368                Row::pack_slice(&[
1369                    Datum::String(&id.to_string()),
1370                    Datum::String("on-commit"),
1371                    Datum::Null,
1372                    Datum::Null,
1373                    Datum::Null,
1374                ]),
1375                diff,
1376            ));
1377        }
1378
1379        if let Some(target_id) = mview.replacement_target {
1380            updates.push(BuiltinTableUpdate::row(
1381                &*MZ_REPLACEMENTS,
1382                Row::pack_slice(&[
1383                    Datum::String(&id.to_string()),
1384                    Datum::String(&target_id.to_string()),
1385                ]),
1386                diff,
1387            ));
1388        }
1389
1390        updates
1391    }
1392
1393    fn pack_continual_task_update(
1394        &self,
1395        id: CatalogItemId,
1396        oid: u32,
1397        schema_id: &SchemaSpecifier,
1398        name: &str,
1399        owner_id: &RoleId,
1400        privileges: Datum,
1401        ct: &ContinualTask,
1402        diff: Diff,
1403    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1404        let create_stmt = mz_sql::parse::parse(&ct.create_sql)
1405            .unwrap_or_else(|e| {
1406                panic!(
1407                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1408                    ct.create_sql, e
1409                )
1410            })
1411            .into_element()
1412            .ast;
1413        let query_string = match &create_stmt {
1414            Statement::CreateContinualTask(stmt) => {
1415                let mut query_string = String::new();
1416                for stmt in &stmt.stmts {
1417                    let s = match stmt {
1418                        ContinualTaskStmt::Insert(stmt) => stmt.to_ast_string_stable(),
1419                        ContinualTaskStmt::Delete(stmt) => stmt.to_ast_string_stable(),
1420                    };
1421                    if query_string.is_empty() {
1422                        query_string = s;
1423                    } else {
1424                        query_string.push_str("; ");
1425                        query_string.push_str(&s);
1426                    }
1427                }
1428                query_string
1429            }
1430            _ => unreachable!(),
1431        };
1432
1433        vec![BuiltinTableUpdate::row(
1434            &*MZ_CONTINUAL_TASKS,
1435            Row::pack_slice(&[
1436                Datum::String(&id.to_string()),
1437                Datum::UInt32(oid),
1438                Datum::String(&schema_id.to_string()),
1439                Datum::String(name),
1440                Datum::String(&ct.cluster_id.to_string()),
1441                Datum::String(&query_string),
1442                Datum::String(&owner_id.to_string()),
1443                privileges,
1444                Datum::String(&ct.create_sql),
1445                Datum::String(&create_stmt.to_ast_string_redacted()),
1446            ]),
1447            diff,
1448        )]
1449    }
1450
1451    fn pack_sink_update(
1452        &self,
1453        id: CatalogItemId,
1454        oid: u32,
1455        schema_id: &SchemaSpecifier,
1456        name: &str,
1457        owner_id: &RoleId,
1458        sink: &Sink,
1459        diff: Diff,
1460    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1461        let mut updates = vec![];
1462        match &sink.connection {
1463            StorageSinkConnection::Kafka(KafkaSinkConnection {
1464                topic: topic_name, ..
1465            }) => {
1466                updates.push(BuiltinTableUpdate::row(
1467                    &*MZ_KAFKA_SINKS,
1468                    Row::pack_slice(&[
1469                        Datum::String(&id.to_string()),
1470                        Datum::String(topic_name.as_str()),
1471                    ]),
1472                    diff,
1473                ));
1474            }
1475            StorageSinkConnection::Iceberg(IcebergSinkConnection {
1476                namespace, table, ..
1477            }) => {
1478                updates.push(BuiltinTableUpdate::row(
1479                    &*MZ_ICEBERG_SINKS,
1480                    Row::pack_slice(&[
1481                        Datum::String(&id.to_string()),
1482                        Datum::String(namespace.as_str()),
1483                        Datum::String(table.as_str()),
1484                    ]),
1485                    diff,
1486                ));
1487            }
1488        };
1489
1490        let create_stmt = mz_sql::parse::parse(&sink.create_sql)
1491            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", sink.create_sql))
1492            .into_element()
1493            .ast;
1494
1495        let envelope = sink.envelope();
1496
1497        // The combined format string is used for the deprecated `format` column.
1498        let combined_format = sink.combined_format();
1499        let (key_format, value_format) = match sink.formats() {
1500            Some((key_format, value_format)) => (key_format, Some(value_format)),
1501            None => (None, None),
1502        };
1503
1504        updates.push(BuiltinTableUpdate::row(
1505            &*MZ_SINKS,
1506            Row::pack_slice(&[
1507                Datum::String(&id.to_string()),
1508                Datum::UInt32(oid),
1509                Datum::String(&schema_id.to_string()),
1510                Datum::String(name),
1511                Datum::String(sink.connection.name()),
1512                Datum::from(sink.connection_id().map(|id| id.to_string()).as_deref()),
1513                // size column now deprecated w/o linked clusters
1514                Datum::Null,
1515                Datum::from(envelope),
1516                // FIXME: These key/value formats are kinda leaky! Should probably live in
1517                // the kafka sink table.
1518                Datum::from(combined_format.as_ref().map(|f| f.as_ref())),
1519                Datum::from(key_format),
1520                Datum::from(value_format),
1521                Datum::String(&sink.cluster_id.to_string()),
1522                Datum::String(&owner_id.to_string()),
1523                Datum::String(&sink.create_sql),
1524                Datum::String(&create_stmt.to_ast_string_redacted()),
1525            ]),
1526            diff,
1527        ));
1528
1529        updates
1530    }
1531
1532    fn pack_index_update(
1533        &self,
1534        id: CatalogItemId,
1535        oid: u32,
1536        name: &str,
1537        owner_id: &RoleId,
1538        index: &Index,
1539        diff: Diff,
1540    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1541        let mut updates = vec![];
1542
1543        let create_stmt = mz_sql::parse::parse(&index.create_sql)
1544            .unwrap_or_else(|e| {
1545                panic!(
1546                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1547                    index.create_sql, e
1548                )
1549            })
1550            .into_element()
1551            .ast;
1552
1553        let key_sqls = match &create_stmt {
1554            Statement::CreateIndex(CreateIndexStatement { key_parts, .. }) => key_parts
1555                .as_ref()
1556                .expect("key_parts is filled in during planning"),
1557            _ => unreachable!(),
1558        };
1559        let on_item_id = self.get_entry_by_global_id(&index.on).id();
1560
1561        updates.push(BuiltinTableUpdate::row(
1562            &*MZ_INDEXES,
1563            Row::pack_slice(&[
1564                Datum::String(&id.to_string()),
1565                Datum::UInt32(oid),
1566                Datum::String(name),
1567                Datum::String(&on_item_id.to_string()),
1568                Datum::String(&index.cluster_id.to_string()),
1569                Datum::String(&owner_id.to_string()),
1570                Datum::String(&index.create_sql),
1571                Datum::String(&create_stmt.to_ast_string_redacted()),
1572            ]),
1573            diff,
1574        ));
1575
1576        let on_entry = self.get_entry_by_global_id(&index.on);
1577        let on_desc = on_entry
1578            .relation_desc()
1579            .expect("can only create indexes on items with a valid description");
1580        let repr_col_types: Vec<ReprColumnType> = on_desc
1581            .typ()
1582            .column_types
1583            .iter()
1584            .map(ReprColumnType::from)
1585            .collect();
1586        for (i, key) in index.keys.iter().enumerate() {
1587            let nullable = key.typ(&repr_col_types).nullable;
1588            let seq_in_index = u64::cast_from(i + 1);
1589            let key_sql = key_sqls
1590                .get(i)
1591                .expect("missing sql information for index key")
1592                .to_ast_string_simple();
1593            let (field_number, expression) = match key {
1594                MirScalarExpr::Column(col, _) => {
1595                    (Datum::UInt64(u64::cast_from(*col + 1)), Datum::Null)
1596                }
1597                _ => (Datum::Null, Datum::String(&key_sql)),
1598            };
1599            updates.push(BuiltinTableUpdate::row(
1600                &*MZ_INDEX_COLUMNS,
1601                Row::pack_slice(&[
1602                    Datum::String(&id.to_string()),
1603                    Datum::UInt64(seq_in_index),
1604                    field_number,
1605                    expression,
1606                    Datum::from(nullable),
1607                ]),
1608                diff,
1609            ));
1610        }
1611
1612        updates
1613    }
1614
1615    fn pack_type_update(
1616        &self,
1617        id: CatalogItemId,
1618        oid: u32,
1619        schema_id: &SchemaSpecifier,
1620        name: &str,
1621        owner_id: &RoleId,
1622        privileges: Datum,
1623        typ: &Type,
1624        diff: Diff,
1625    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1626        let mut out = vec![];
1627
1628        let redacted = typ.create_sql.as_ref().map(|create_sql| {
1629            mz_sql::parse::parse(create_sql)
1630                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
1631                .into_element()
1632                .ast
1633                .to_ast_string_redacted()
1634        });
1635
1636        out.push(BuiltinTableUpdate::row(
1637            &*MZ_TYPES,
1638            Row::pack_slice(&[
1639                Datum::String(&id.to_string()),
1640                Datum::UInt32(oid),
1641                Datum::String(&schema_id.to_string()),
1642                Datum::String(name),
1643                Datum::String(&TypeCategory::from_catalog_type(&typ.details.typ).to_string()),
1644                Datum::String(&owner_id.to_string()),
1645                privileges,
1646                if let Some(create_sql) = &typ.create_sql {
1647                    Datum::String(create_sql)
1648                } else {
1649                    Datum::Null
1650                },
1651                if let Some(redacted) = &redacted {
1652                    Datum::String(redacted)
1653                } else {
1654                    Datum::Null
1655                },
1656            ]),
1657            diff,
1658        ));
1659
1660        let mut row = Row::default();
1661        let mut packer = row.packer();
1662
1663        fn append_modifier(packer: &mut RowPacker<'_>, mods: &[i64]) {
1664            if mods.is_empty() {
1665                packer.push(Datum::Null);
1666            } else {
1667                packer.push_list(mods.iter().map(|m| Datum::Int64(*m)));
1668            }
1669        }
1670
1671        let index_id = match &typ.details.typ {
1672            CatalogType::Array {
1673                element_reference: element_id,
1674            } => {
1675                packer.push(Datum::String(&id.to_string()));
1676                packer.push(Datum::String(&element_id.to_string()));
1677                &MZ_ARRAY_TYPES
1678            }
1679            CatalogType::List {
1680                element_reference: element_id,
1681                element_modifiers,
1682            } => {
1683                packer.push(Datum::String(&id.to_string()));
1684                packer.push(Datum::String(&element_id.to_string()));
1685                append_modifier(&mut packer, element_modifiers);
1686                &MZ_LIST_TYPES
1687            }
1688            CatalogType::Map {
1689                key_reference: key_id,
1690                value_reference: value_id,
1691                key_modifiers,
1692                value_modifiers,
1693            } => {
1694                packer.push(Datum::String(&id.to_string()));
1695                packer.push(Datum::String(&key_id.to_string()));
1696                packer.push(Datum::String(&value_id.to_string()));
1697                append_modifier(&mut packer, key_modifiers);
1698                append_modifier(&mut packer, value_modifiers);
1699                &MZ_MAP_TYPES
1700            }
1701            CatalogType::Pseudo => {
1702                packer.push(Datum::String(&id.to_string()));
1703                &MZ_PSEUDO_TYPES
1704            }
1705            _ => {
1706                packer.push(Datum::String(&id.to_string()));
1707                &MZ_BASE_TYPES
1708            }
1709        };
1710        out.push(BuiltinTableUpdate::row(index_id, row, diff));
1711
1712        if let Some(pg_metadata) = &typ.details.pg_metadata {
1713            out.push(BuiltinTableUpdate::row(
1714                &*MZ_TYPE_PG_METADATA,
1715                Row::pack_slice(&[
1716                    Datum::String(&id.to_string()),
1717                    Datum::UInt32(pg_metadata.typinput_oid),
1718                    Datum::UInt32(pg_metadata.typreceive_oid),
1719                ]),
1720                diff,
1721            ));
1722        }
1723
1724        out
1725    }
1726
1727    fn pack_func_update(
1728        &self,
1729        id: CatalogItemId,
1730        schema_id: &SchemaSpecifier,
1731        name: &str,
1732        owner_id: &RoleId,
1733        func: &Func,
1734        diff: Diff,
1735    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1736        let mut updates = vec![];
1737        for func_impl_details in func.inner.func_impls() {
1738            let arg_type_ids = func_impl_details
1739                .arg_typs
1740                .iter()
1741                .map(|typ| self.get_system_type(typ).id().to_string())
1742                .collect::<Vec<_>>();
1743
1744            let mut row = Row::default();
1745            row.packer()
1746                .try_push_array(
1747                    &[ArrayDimension {
1748                        lower_bound: 1,
1749                        length: arg_type_ids.len(),
1750                    }],
1751                    arg_type_ids.iter().map(|id| Datum::String(id)),
1752                )
1753                .expect(
1754                    "arg_type_ids is 1 dimensional, and its length is used for the array length",
1755                );
1756            let arg_type_ids = row.unpack_first();
1757
1758            updates.push(BuiltinTableUpdate::row(
1759                &*MZ_FUNCTIONS,
1760                Row::pack_slice(&[
1761                    Datum::String(&id.to_string()),
1762                    Datum::UInt32(func_impl_details.oid),
1763                    Datum::String(&schema_id.to_string()),
1764                    Datum::String(name),
1765                    arg_type_ids,
1766                    Datum::from(
1767                        func_impl_details
1768                            .variadic_typ
1769                            .map(|typ| self.get_system_type(typ).id().to_string())
1770                            .as_deref(),
1771                    ),
1772                    Datum::from(
1773                        func_impl_details
1774                            .return_typ
1775                            .map(|typ| self.get_system_type(typ).id().to_string())
1776                            .as_deref(),
1777                    ),
1778                    func_impl_details.return_is_set.into(),
1779                    Datum::String(&owner_id.to_string()),
1780                ]),
1781                diff,
1782            ));
1783
1784            if let mz_sql::func::Func::Aggregate(_) = func.inner {
1785                updates.push(BuiltinTableUpdate::row(
1786                    &*MZ_AGGREGATES,
1787                    Row::pack_slice(&[
1788                        Datum::UInt32(func_impl_details.oid),
1789                        // TODO(database-issues#1064): Support ordered-set aggregate functions.
1790                        Datum::String("n"),
1791                        Datum::Int16(0),
1792                    ]),
1793                    diff,
1794                ));
1795            }
1796        }
1797        updates
1798    }
1799
1800    pub fn pack_op_update(
1801        &self,
1802        operator: &str,
1803        func_impl_details: FuncImplCatalogDetails,
1804        diff: Diff,
1805    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1806        let arg_type_ids = func_impl_details
1807            .arg_typs
1808            .iter()
1809            .map(|typ| self.get_system_type(typ).id().to_string())
1810            .collect::<Vec<_>>();
1811
1812        let mut row = Row::default();
1813        row.packer()
1814            .try_push_array(
1815                &[ArrayDimension {
1816                    lower_bound: 1,
1817                    length: arg_type_ids.len(),
1818                }],
1819                arg_type_ids.iter().map(|id| Datum::String(id)),
1820            )
1821            .expect("arg_type_ids is 1 dimensional, and its length is used for the array length");
1822        let arg_type_ids = row.unpack_first();
1823
1824        BuiltinTableUpdate::row(
1825            &*MZ_OPERATORS,
1826            Row::pack_slice(&[
1827                Datum::UInt32(func_impl_details.oid),
1828                Datum::String(operator),
1829                arg_type_ids,
1830                Datum::from(
1831                    func_impl_details
1832                        .return_typ
1833                        .map(|typ| self.get_system_type(typ).id().to_string())
1834                        .as_deref(),
1835                ),
1836            ]),
1837            diff,
1838        )
1839    }
1840
1841    fn pack_secret_update(
1842        &self,
1843        id: CatalogItemId,
1844        oid: u32,
1845        schema_id: &SchemaSpecifier,
1846        name: &str,
1847        owner_id: &RoleId,
1848        privileges: Datum,
1849        diff: Diff,
1850    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1851        vec![BuiltinTableUpdate::row(
1852            &*MZ_SECRETS,
1853            Row::pack_slice(&[
1854                Datum::String(&id.to_string()),
1855                Datum::UInt32(oid),
1856                Datum::String(&schema_id.to_string()),
1857                Datum::String(name),
1858                Datum::String(&owner_id.to_string()),
1859                privileges,
1860            ]),
1861            diff,
1862        )]
1863    }
1864
1865    pub fn pack_audit_log_update(
1866        &self,
1867        event: &VersionedEvent,
1868        diff: Diff,
1869    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1870        let (event_type, object_type, details, user, occurred_at): (
1871            &EventType,
1872            &ObjectType,
1873            &EventDetails,
1874            &Option<String>,
1875            u64,
1876        ) = match event {
1877            VersionedEvent::V1(ev) => (
1878                &ev.event_type,
1879                &ev.object_type,
1880                &ev.details,
1881                &ev.user,
1882                ev.occurred_at,
1883            ),
1884        };
1885        let details = Jsonb::from_serde_json(details.as_json())
1886            .map_err(|e| {
1887                Error::new(ErrorKind::Unstructured(format!(
1888                    "could not pack audit log update: {}",
1889                    e
1890                )))
1891            })?
1892            .into_row();
1893        let details = details
1894            .iter()
1895            .next()
1896            .expect("details created above with a single jsonb column");
1897        let dt = mz_ore::now::to_datetime(occurred_at);
1898        let id = event.sortable_id();
1899        Ok(BuiltinTableUpdate::row(
1900            &*MZ_AUDIT_EVENTS,
1901            Row::pack_slice(&[
1902                Datum::UInt64(id),
1903                Datum::String(&format!("{}", event_type)),
1904                Datum::String(&format!("{}", object_type)),
1905                details,
1906                match user {
1907                    Some(user) => Datum::String(user),
1908                    None => Datum::Null,
1909                },
1910                Datum::TimestampTz(dt.try_into().expect("must fit")),
1911            ]),
1912            diff,
1913        ))
1914    }
1915
1916    pub fn pack_storage_usage_update(
1917        &self,
1918        VersionedStorageUsage::V1(event): VersionedStorageUsage,
1919        diff: Diff,
1920    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1921        let id = &MZ_STORAGE_USAGE_BY_SHARD;
1922        let row = Row::pack_slice(&[
1923            Datum::UInt64(event.id),
1924            Datum::from(event.shard_id.as_deref()),
1925            Datum::UInt64(event.size_bytes),
1926            Datum::TimestampTz(
1927                mz_ore::now::to_datetime(event.collection_timestamp)
1928                    .try_into()
1929                    .expect("must fit"),
1930            ),
1931        ]);
1932        BuiltinTableUpdate::row(id, row, diff)
1933    }
1934
1935    pub fn pack_egress_ip_update(
1936        &self,
1937        ip: &IpNet,
1938    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1939        let id = &MZ_EGRESS_IPS;
1940        let addr = ip.addr();
1941        let row = Row::pack_slice(&[
1942            Datum::String(&addr.to_string()),
1943            Datum::Int32(ip.prefix_len().into()),
1944            Datum::String(&format!("{}/{}", addr, ip.prefix_len())),
1945        ]);
1946        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
1947    }
1948
1949    pub fn pack_license_key_update(
1950        &self,
1951        license_key: &ValidatedLicenseKey,
1952    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1953        let id = &MZ_LICENSE_KEYS;
1954        let row = Row::pack_slice(&[
1955            Datum::String(&license_key.id),
1956            Datum::String(&license_key.organization),
1957            Datum::String(&license_key.environment_id),
1958            Datum::TimestampTz(
1959                mz_ore::now::to_datetime(license_key.expiration * 1000)
1960                    .try_into()
1961                    .expect("must fit"),
1962            ),
1963            Datum::TimestampTz(
1964                mz_ore::now::to_datetime(license_key.not_before * 1000)
1965                    .try_into()
1966                    .expect("must fit"),
1967            ),
1968        ]);
1969        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
1970    }
1971
1972    pub fn pack_all_replica_size_updates(&self) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1973        let mut updates = Vec::new();
1974        for (size, alloc) in &self.cluster_replica_sizes.0 {
1975            if alloc.disabled {
1976                continue;
1977            }
1978
1979            // Just invent something when the limits are `None`, which only happens in non-prod
1980            // environments (tests, process orchestrator, etc.)
1981            let cpu_limit = alloc.cpu_limit.unwrap_or(CpuLimit::MAX);
1982            let MemoryLimit(ByteSize(memory_bytes)) =
1983                (alloc.memory_limit).unwrap_or(MemoryLimit::MAX);
1984            let DiskLimit(ByteSize(disk_bytes)) =
1985                (alloc.disk_limit).unwrap_or(DiskLimit::ARBITRARY);
1986
1987            let row = Row::pack_slice(&[
1988                size.as_str().into(),
1989                u64::cast_from(alloc.scale).into(),
1990                u64::cast_from(alloc.workers).into(),
1991                cpu_limit.as_nanocpus().into(),
1992                memory_bytes.into(),
1993                disk_bytes.into(),
1994                (alloc.credits_per_hour).into(),
1995            ]);
1996
1997            updates.push(BuiltinTableUpdate::row(
1998                &*MZ_CLUSTER_REPLICA_SIZES,
1999                row,
2000                Diff::ONE,
2001            ));
2002        }
2003
2004        updates
2005    }
2006
2007    pub fn pack_subscribe_update(
2008        &self,
2009        id: GlobalId,
2010        subscribe: &ActiveSubscribe,
2011        diff: Diff,
2012    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2013        let mut row = Row::default();
2014        let mut packer = row.packer();
2015        packer.push(Datum::String(&id.to_string()));
2016        packer.push(Datum::Uuid(subscribe.session_uuid));
2017        packer.push(Datum::String(&subscribe.cluster_id.to_string()));
2018
2019        let start_dt = mz_ore::now::to_datetime(subscribe.start_time);
2020        packer.push(Datum::TimestampTz(start_dt.try_into().expect("must fit")));
2021
2022        let depends_on: Vec<_> = subscribe
2023            .depends_on
2024            .iter()
2025            .map(|id| id.to_string())
2026            .collect();
2027        packer.push_list(depends_on.iter().map(|s| Datum::String(s)));
2028
2029        BuiltinTableUpdate::row(&*MZ_SUBSCRIPTIONS, row, diff)
2030    }
2031
2032    pub fn pack_session_update(
2033        &self,
2034        conn: &ConnMeta,
2035        diff: Diff,
2036    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2037        let connect_dt = mz_ore::now::to_datetime(conn.connected_at());
2038        BuiltinTableUpdate::row(
2039            &*MZ_SESSIONS,
2040            Row::pack_slice(&[
2041                Datum::Uuid(conn.uuid()),
2042                Datum::UInt32(conn.conn_id().unhandled()),
2043                Datum::String(&conn.authenticated_role_id().to_string()),
2044                Datum::from(conn.client_ip().map(|ip| ip.to_string()).as_deref()),
2045                Datum::TimestampTz(connect_dt.try_into().expect("must fit")),
2046            ]),
2047            diff,
2048        )
2049    }
2050
2051    pub fn pack_default_privileges_update(
2052        &self,
2053        default_privilege_object: &DefaultPrivilegeObject,
2054        grantee: &RoleId,
2055        acl_mode: &AclMode,
2056        diff: Diff,
2057    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2058        BuiltinTableUpdate::row(
2059            &*MZ_DEFAULT_PRIVILEGES,
2060            Row::pack_slice(&[
2061                default_privilege_object.role_id.to_string().as_str().into(),
2062                default_privilege_object
2063                    .database_id
2064                    .map(|database_id| database_id.to_string())
2065                    .as_deref()
2066                    .into(),
2067                default_privilege_object
2068                    .schema_id
2069                    .map(|schema_id| schema_id.to_string())
2070                    .as_deref()
2071                    .into(),
2072                default_privilege_object
2073                    .object_type
2074                    .to_string()
2075                    .to_lowercase()
2076                    .as_str()
2077                    .into(),
2078                grantee.to_string().as_str().into(),
2079                acl_mode.to_string().as_str().into(),
2080            ]),
2081            diff,
2082        )
2083    }
2084
2085    pub fn pack_system_privileges_update(
2086        &self,
2087        privileges: MzAclItem,
2088        diff: Diff,
2089    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2090        BuiltinTableUpdate::row(
2091            &*MZ_SYSTEM_PRIVILEGES,
2092            Row::pack_slice(&[privileges.into()]),
2093            diff,
2094        )
2095    }
2096
2097    fn pack_privilege_array_row(&self, privileges: &PrivilegeMap) -> Row {
2098        let mut row = Row::default();
2099        let flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2100        row.packer()
2101            .try_push_array(
2102                &[ArrayDimension {
2103                    lower_bound: 1,
2104                    length: flat_privileges.len(),
2105                }],
2106                flat_privileges
2107                    .into_iter()
2108                    .map(|mz_acl_item| Datum::MzAclItem(mz_acl_item.clone())),
2109            )
2110            .expect("privileges is 1 dimensional, and its length is used for the array length");
2111        row
2112    }
2113
2114    pub fn pack_comment_update(
2115        &self,
2116        object_id: CommentObjectId,
2117        column_pos: Option<usize>,
2118        comment: &str,
2119        diff: Diff,
2120    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2121        // Use the audit log representation so it's easier to join against.
2122        let object_type = mz_sql::catalog::ObjectType::from(object_id);
2123        let audit_type = super::object_type_to_audit_object_type(object_type);
2124        let object_type_str = audit_type.to_string();
2125
2126        let object_id_str = match object_id {
2127            CommentObjectId::Table(global_id)
2128            | CommentObjectId::View(global_id)
2129            | CommentObjectId::MaterializedView(global_id)
2130            | CommentObjectId::Source(global_id)
2131            | CommentObjectId::Sink(global_id)
2132            | CommentObjectId::Index(global_id)
2133            | CommentObjectId::Func(global_id)
2134            | CommentObjectId::Connection(global_id)
2135            | CommentObjectId::Secret(global_id)
2136            | CommentObjectId::Type(global_id)
2137            | CommentObjectId::ContinualTask(global_id) => global_id.to_string(),
2138            CommentObjectId::Role(role_id) => role_id.to_string(),
2139            CommentObjectId::Database(database_id) => database_id.to_string(),
2140            CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
2141            CommentObjectId::Cluster(cluster_id) => cluster_id.to_string(),
2142            CommentObjectId::ClusterReplica((_, replica_id)) => replica_id.to_string(),
2143            CommentObjectId::NetworkPolicy(network_policy_id) => network_policy_id.to_string(),
2144        };
2145        let column_pos_datum = match column_pos {
2146            Some(pos) => {
2147                // TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
2148                let pos =
2149                    i32::try_from(pos).expect("we constrain this value in the planning layer");
2150                Datum::Int32(pos)
2151            }
2152            None => Datum::Null,
2153        };
2154
2155        BuiltinTableUpdate::row(
2156            &*MZ_COMMENTS,
2157            Row::pack_slice(&[
2158                Datum::String(&object_id_str),
2159                Datum::String(&object_type_str),
2160                column_pos_datum,
2161                Datum::String(comment),
2162            ]),
2163            diff,
2164        )
2165    }
2166
2167    pub fn pack_webhook_source_update(
2168        &self,
2169        item_id: CatalogItemId,
2170        diff: Diff,
2171    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2172        let url = self
2173            .try_get_webhook_url(&item_id)
2174            .expect("webhook source should exist");
2175        let url = url.to_string();
2176        let name = &self.get_entry(&item_id).name().item;
2177        let id_str = item_id.to_string();
2178
2179        BuiltinTableUpdate::row(
2180            &*MZ_WEBHOOKS_SOURCES,
2181            Row::pack_slice(&[
2182                Datum::String(&id_str),
2183                Datum::String(name),
2184                Datum::String(&url),
2185            ]),
2186            diff,
2187        )
2188    }
2189
2190    pub fn pack_source_references_update(
2191        &self,
2192        source_references: &SourceReferences,
2193        diff: Diff,
2194    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2195        let source_id = source_references.source_id.to_string();
2196        let updated_at = &source_references.updated_at;
2197        source_references
2198            .references
2199            .iter()
2200            .map(|reference| {
2201                let mut row = Row::default();
2202                let mut packer = row.packer();
2203                packer.extend([
2204                    Datum::String(&source_id),
2205                    reference
2206                        .namespace
2207                        .as_ref()
2208                        .map(|s| Datum::String(s))
2209                        .unwrap_or(Datum::Null),
2210                    Datum::String(&reference.name),
2211                    Datum::TimestampTz(
2212                        mz_ore::now::to_datetime(*updated_at)
2213                            .try_into()
2214                            .expect("must fit"),
2215                    ),
2216                ]);
2217                if reference.columns.len() > 0 {
2218                    packer
2219                        .try_push_array(
2220                            &[ArrayDimension {
2221                                lower_bound: 1,
2222                                length: reference.columns.len(),
2223                            }],
2224                            reference.columns.iter().map(|col| Datum::String(col)),
2225                        )
2226                        .expect(
2227                            "columns is 1 dimensional, and its length is used for the array length",
2228                        );
2229                } else {
2230                    packer.push(Datum::Null);
2231                }
2232
2233                BuiltinTableUpdate::row(&*MZ_SOURCE_REFERENCES, row, diff)
2234            })
2235            .collect()
2236    }
2237}