mz_adapter/catalog/builtin_table_updates.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod notice;

use bytesize::ByteSize;
use ipnet::IpNet;
use mz_adapter_types::compaction::CompactionWindow;
use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::builtin::{
    BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
    MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICAS,
    MZ_CLUSTER_SCHEDULES, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS, MZ_DEFAULT_PRIVILEGES,
    MZ_EGRESS_IPS, MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES, MZ_ICEBERG_SINKS,
    MZ_INDEX_COLUMNS, MZ_INDEXES, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCE_TABLES,
    MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES,
    MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MYSQL_SOURCE_TABLES, MZ_OBJECT_DEPENDENCIES,
    MZ_OBJECT_GLOBAL_IDS, MZ_OPERATORS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES,
    MZ_PSEUDO_TYPES, MZ_REPLACEMENTS, MZ_ROLE_AUTH, MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SESSIONS,
    MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SOURCES, MZ_SQL_SERVER_SOURCE_TABLES,
    MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES,
    MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
};
use mz_catalog::config::AwsPrincipalContext;
use mz_catalog::durable::SourceReferences;
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::{
    CatalogEntry, CatalogItem, ClusterVariant, Connection, DataSourceDesc, Func, Index,
    MaterializedView, Sink, Table, TableDataSource, Type, View,
};
use mz_controller::clusters::{
    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
};
use mz_controller_types::ClusterId;
use mz_expr::MirScalarExpr;
use mz_license_keys::ValidatedLicenseKey;
use mz_orchestrator::{CpuLimit, DiskLimit, MemoryLimit};
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_persist_client::batch::ProtoBatch;
use mz_repr::adt::array::ArrayDimension;
use mz_repr::adt::interval::Interval;
use mz_repr::adt::jsonb::Jsonb;
use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
use mz_repr::refresh_schedule::RefreshEvery;
use mz_repr::role_id::RoleId;
use mz_repr::{
    CatalogItemId, Datum, Diff, GlobalId, ReprColumnType, Row, RowPacker, SqlScalarType, Timestamp,
};
use mz_sql::ast::{CreateIndexStatement, Statement, UnresolvedItemName};
use mz_sql::catalog::{CatalogCluster, CatalogType, DefaultPrivilegeObject, TypeCategory};
use mz_sql::func::FuncImplCatalogDetails;
use mz_sql::names::{CommentObjectId, SchemaSpecifier};
use mz_sql::plan::{ClusterSchedule, ConnectionDetails, SshKey};
use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
use mz_sql::session::vars::SessionVars;
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_client::client::TableData;
use mz_storage_types::connections::KafkaConnection;
use mz_storage_types::connections::aws::{AwsAuth, AwsConnection};
use mz_storage_types::connections::inline::ReferencedConnection;
use mz_storage_types::connections::string_or_secret::StringOrSecret;
use mz_storage_types::sinks::{IcebergSinkConnection, KafkaSinkConnection, StorageSinkConnection};
use mz_storage_types::sources::{
    GenericSourceConnection, KafkaSourceConnection, PostgresSourceConnection, SourceConnection,
};
use smallvec::smallvec;

// DO NOT add any more imports from `crate` outside of `crate::catalog`.
use crate::active_compute_sink::ActiveSubscribe;
use crate::catalog::CatalogState;
use crate::coord::ConnMeta;

/// An update to a built-in table.
#[derive(Debug, Clone)]
pub struct BuiltinTableUpdate<T = CatalogItemId> {
    /// The reference of the table to update.
    pub id: T,
    /// The data to put into the table.
    pub data: TableData,
}
impl<T> BuiltinTableUpdate<T> {
    /// Create a [`BuiltinTableUpdate`] from a [`Row`].
    pub fn row(id: T, row: Row, diff: Diff) -> BuiltinTableUpdate<T> {
        BuiltinTableUpdate {
            id,
            data: TableData::Rows(vec![(row, diff)]),
        }
    }

    /// Create a [`BuiltinTableUpdate`] from a [`ProtoBatch`].
    pub fn batch(id: T, batch: ProtoBatch) -> BuiltinTableUpdate<T> {
        BuiltinTableUpdate {
            id,
            data: TableData::Batches(smallvec![batch]),
        }
    }
}
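
// A minimal sketch of typical usage (the columns below are illustrative only,
// not the actual schema of any builtin table):
//
//     let row = Row::pack_slice(&[Datum::String("u1"), Datum::UInt32(42)]);
//     let update = BuiltinTableUpdate::row(&*MZ_TABLES, row, diff);
//
// Each `pack_*` method below packs the exact column layout of its target table.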

impl CatalogState {
    pub fn resolve_builtin_table_updates(
        &self,
        builtin_table_update: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    ) -> Vec<BuiltinTableUpdate<CatalogItemId>> {
        builtin_table_update
            .into_iter()
            .map(|builtin_table_update| self.resolve_builtin_table_update(builtin_table_update))
            .collect()
    }

    pub fn resolve_builtin_table_update(
        &self,
        BuiltinTableUpdate { id, data }: BuiltinTableUpdate<&'static BuiltinTable>,
    ) -> BuiltinTableUpdate<CatalogItemId> {
        let id = self.resolve_builtin_table(id);
        BuiltinTableUpdate { id, data }
    }

    pub fn pack_depends_update(
        &self,
        depender: CatalogItemId,
        dependee: CatalogItemId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let row = Row::pack_slice(&[
            Datum::String(&depender.to_string()),
            Datum::String(&dependee.to_string()),
        ]);
        BuiltinTableUpdate::row(&*MZ_OBJECT_DEPENDENCIES, row, diff)
    }

    pub(super) fn pack_role_auth_update(
        &self,
        id: RoleId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let role_auth = self.get_role_auth(&id);
        let role = self.get_role(&id);
        BuiltinTableUpdate::row(
            &*MZ_ROLE_AUTH,
            Row::pack_slice(&[
                Datum::String(&role_auth.role_id.to_string()),
                Datum::UInt32(role.oid),
                match &role_auth.password_hash {
                    Some(hash) => Datum::String(hash),
                    None => Datum::Null,
                },
                Datum::TimestampTz(
                    mz_ore::now::to_datetime(role_auth.updated_at)
                        .try_into()
                        .expect("must fit"),
                ),
            ]),
            diff,
        )
    }

    pub(super) fn pack_role_update(
        &self,
        id: RoleId,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        match id {
            // PUBLIC role should not show up in mz_roles.
            RoleId::Public => vec![],
            id => {
                let role = self.get_role(&id);
                let builtin_supers = [MZ_SYSTEM_ROLE_ID, MZ_SUPPORT_ROLE_ID];

                let rolcanlogin = if let Some(login) = role.attributes.login {
                    login
                } else {
                    builtin_supers.contains(&role.id)
                };

                let rolsuper = if let Some(superuser) = role.attributes.superuser {
                    Datum::from(superuser)
                } else if builtin_supers.contains(&role.id) {
                    // System roles (mz_system, mz_support) are superusers
                    Datum::from(true)
                } else {
                    // In cloud environments, superuser status is determined
                    // at login, so we set `rolsuper` to `null` here because
                    // it cannot be known beforehand. For self-managed
                    // auth, roles created without an explicit SUPERUSER
                    // attribute would typically have `rolsuper` set to false.
                    // However, since we cannot reliably distinguish between
                    // cloud and self-managed here, we conservatively use NULL
                    // for indeterminate cases.
                    Datum::Null
                };

                let role_update = BuiltinTableUpdate::row(
                    &*MZ_ROLES,
                    Row::pack_slice(&[
                        Datum::String(&role.id.to_string()),
                        Datum::UInt32(role.oid),
                        Datum::String(&role.name),
                        Datum::from(role.attributes.inherit),
                        Datum::from(rolcanlogin),
                        rolsuper,
                    ]),
                    diff,
                );
                let mut updates = vec![role_update];

                // HACK/TODO(parkmycar): Creating an empty SessionVars like this is pretty hacky,
                // we should instead have a static list of all session vars.
                let session_vars_reference = SessionVars::new_unchecked(
                    &mz_build_info::DUMMY_BUILD_INFO,
                    SYSTEM_USER.clone(),
                    None,
                );

                for (name, val) in role.vars() {
                    let result = session_vars_reference
                        .inspect(name)
                        .and_then(|var| var.check(val.borrow()));
                    let Ok(formatted_val) = result else {
                        // Note: all variables should have been validated by this point, so we
                        // shouldn't ever hit this.
                        tracing::error!(?name, ?val, ?result, "found invalid role default var");
                        continue;
                    };

                    let role_var_update = BuiltinTableUpdate::row(
                        &*MZ_ROLE_PARAMETERS,
                        Row::pack_slice(&[
                            Datum::String(&role.id.to_string()),
                            Datum::String(name),
                            Datum::String(&formatted_val),
                        ]),
                        diff,
                    );
                    updates.push(role_var_update);
                }

                updates
            }
        }
    }

    pub(super) fn pack_cluster_update(
        &self,
        name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let id = self.clusters_by_name[name];
        let cluster = &self.clusters_by_id[&id];
        let row = self.pack_privilege_array_row(cluster.privileges());
        let privileges = row.unpack_first();
        let (size, disk, replication_factor, azs, introspection_debugging, introspection_interval) =
            match &cluster.config.variant {
                ClusterVariant::Managed(config) => (
                    Some(config.size.as_str()),
                    Some(self.cluster_replica_size_has_disk(&config.size)),
                    Some(config.replication_factor),
                    if config.availability_zones.is_empty() {
                        None
                    } else {
                        Some(config.availability_zones.clone())
                    },
                    Some(config.logging.log_logging),
                    config.logging.interval.map(|d| {
                        Interval::from_duration(&d)
                            .expect("planning ensured this is convertible back to interval")
                    }),
                ),
                ClusterVariant::Unmanaged => (None, None, None, None, None, None),
            };

        let mut row = Row::default();
        let mut packer = row.packer();
        packer.extend([
            Datum::String(&id.to_string()),
            Datum::String(name),
            Datum::String(&cluster.owner_id.to_string()),
            privileges,
            cluster.is_managed().into(),
            size.into(),
            replication_factor.into(),
            disk.into(),
        ]);
        if let Some(azs) = azs {
            packer.push_list(azs.iter().map(|az| Datum::String(az)));
        } else {
            packer.push(Datum::Null);
        }
        packer.push(Datum::from(introspection_debugging));
        packer.push(Datum::from(introspection_interval));

        let mut updates = Vec::new();

        updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTERS, row, diff));

        if let ClusterVariant::Managed(managed_config) = &cluster.config.variant {
            let row = match managed_config.schedule {
                ClusterSchedule::Manual => Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String("manual"),
                    Datum::Null,
                ]),
                ClusterSchedule::Refresh {
                    hydration_time_estimate,
                } => Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String("on-refresh"),
                    Datum::Interval(
                        Interval::from_duration(&hydration_time_estimate)
                            .expect("planning ensured that this is convertible back to Interval"),
                    ),
                ]),
            };
            updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTER_SCHEDULES, row, diff));
        }

        updates
    }

    pub(super) fn pack_cluster_replica_update(
        &self,
        cluster_id: ClusterId,
        name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let cluster = &self.clusters_by_id[&cluster_id];
        let id = cluster.replica_id(name).expect("Must exist");
        let replica = cluster.replica(id).expect("Must exist");

        let (size, disk, az) = match &replica.config.location {
            // TODO(guswynn): The column should be `availability_zones`, not
            // `availability_zone`.
            ReplicaLocation::Managed(ManagedReplicaLocation {
                size,
                availability_zones: ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
                allocation: _,
                billed_as: _,
                internal: _,
                pending: _,
            }) => (
                Some(&**size),
                Some(self.cluster_replica_size_has_disk(size)),
                Some(az.as_str()),
            ),
            ReplicaLocation::Managed(ManagedReplicaLocation {
                size,
                availability_zones: _,
                allocation: _,
                billed_as: _,
                internal: _,
                pending: _,
            }) => (
                Some(&**size),
                Some(self.cluster_replica_size_has_disk(size)),
                None,
            ),
            ReplicaLocation::Unmanaged(_) => (None, None, None),
        };

        let cluster_replica_update = BuiltinTableUpdate::row(
            &*MZ_CLUSTER_REPLICAS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(name),
                Datum::String(&cluster_id.to_string()),
                Datum::from(size),
                Datum::from(az),
                Datum::String(&replica.owner_id.to_string()),
                Datum::from(disk),
            ]),
            diff,
        );

        vec![cluster_replica_update]
    }

    pub(super) fn pack_item_update(
        &self,
        id: CatalogItemId,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let entry = self.get_entry(&id);
        let oid = entry.oid();
        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
        let schema_id = &self
            .get_schema(
                &entry.name().qualifiers.database_spec,
                &entry.name().qualifiers.schema_spec,
                conn_id,
            )
            .id;
        let name = &entry.name().item;
        let owner_id = entry.owner_id();
        let privileges_row = self.pack_privilege_array_row(entry.privileges());
        let privileges = privileges_row.unpack_first();
        let mut updates = match entry.item() {
            CatalogItem::Log(_) => self.pack_source_update(
                id, oid, schema_id, name, "log", None, None, None, None, None, owner_id,
                privileges, diff, None,
            ),
            CatalogItem::Index(index) => {
                self.pack_index_update(id, oid, name, owner_id, index, diff)
            }
            CatalogItem::Table(table) => {
                let mut updates = self
                    .pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);

                if let TableDataSource::DataSource {
                    desc: data_source,
                    timeline: _,
                } = &table.data_source
                {
                    updates.extend(match data_source {
                        DataSourceDesc::IngestionExport {
                            ingestion_id,
                            external_reference: UnresolvedItemName(external_reference),
                            details: _,
                            data_config: _,
                        } => {
                            let ingestion_entry = self
                                .get_entry(ingestion_id)
                                .source_desc()
                                .expect("primary source exists")
                                .expect("primary source is a source");

                            match ingestion_entry.connection.name() {
                                "postgres" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                    // The left-most qualification of Postgres
                                    // tables is the database, but this
                                    // information is redundant because each
                                    // Postgres connection connects to only one
                                    // database.
                                    let schema_name = external_reference[1].as_str();
                                    let table_name = external_reference[2].as_str();

                                    self.pack_postgres_source_tables_update(
                                        id,
                                        schema_name,
                                        table_name,
                                        diff,
                                    )
                                }
                                "mysql" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
                                    let schema_name = external_reference[0].as_str();
                                    let table_name = external_reference[1].as_str();

                                    self.pack_mysql_source_tables_update(
                                        id,
                                        schema_name,
                                        table_name,
                                        diff,
                                    )
                                }
                                "sql-server" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                    // The left-most qualification of SQL Server tables is
                                    // the database, but this information is redundant
                                    // because each SQL Server connection connects to
                                    // only one database.
                                    let schema_name = external_reference[1].as_str();
                                    let table_name = external_reference[2].as_str();

                                    self.pack_sql_server_source_table_update(
                                        id,
                                        schema_name,
                                        table_name,
                                        diff,
                                    )
                                }
                                // Load generator sources don't have any special
                                // updates.
                                "load-generator" => vec![],
                                "kafka" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 1);
                                    let topic = external_reference[0].as_str();
                                    let envelope = data_source.envelope();
                                    let (key_format, value_format) = data_source.formats();

                                    self.pack_kafka_source_tables_update(
                                        id,
                                        topic,
                                        envelope,
                                        key_format,
                                        value_format,
                                        diff,
                                    )
                                }
                                s => unreachable!("{s} sources do not have tables"),
                            }
                        }
                        DataSourceDesc::Ingestion { .. }
                        | DataSourceDesc::OldSyntaxIngestion { .. }
                        | DataSourceDesc::Introspection(_)
                        | DataSourceDesc::Progress
                        | DataSourceDesc::Webhook { .. }
                        | DataSourceDesc::Catalog => vec![],
                    });
                }

                updates
            }
            CatalogItem::Source(source) => {
                let source_type = source.source_type();
                let connection_id = source.connection_id();
                let envelope = source.data_source.envelope();
                let cluster_entry = match source.data_source {
                    // Ingestion exports don't have their own cluster, but
                    // run on their ingestion's cluster.
                    DataSourceDesc::IngestionExport { ingestion_id, .. } => {
                        self.get_entry(&ingestion_id)
                    }
                    DataSourceDesc::Ingestion { .. }
                    | DataSourceDesc::OldSyntaxIngestion { .. }
                    | DataSourceDesc::Introspection(_)
                    | DataSourceDesc::Progress
                    | DataSourceDesc::Webhook { .. }
                    | DataSourceDesc::Catalog => entry,
                };

                let cluster_id = cluster_entry.item().cluster_id().map(|id| id.to_string());

                let (key_format, value_format) = source.data_source.formats();

                let mut updates = self.pack_source_update(
                    id,
                    oid,
                    schema_id,
                    name,
                    source_type,
                    connection_id,
                    envelope,
                    key_format,
                    value_format,
                    cluster_id.as_deref(),
                    owner_id,
                    privileges,
                    diff,
                    source.create_sql.as_ref(),
                );

                updates.extend(match &source.data_source {
                    DataSourceDesc::Ingestion { desc, .. }
                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => match &desc.connection {
                        GenericSourceConnection::Postgres(postgres) => {
                            self.pack_postgres_source_update(id, postgres, diff)
                        }
                        GenericSourceConnection::Kafka(kafka) => {
                            self.pack_kafka_source_update(id, source.global_id(), kafka, diff)
                        }
                        _ => vec![],
                    },
                    DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference: UnresolvedItemName(external_reference),
                        details: _,
                        data_config: _,
                    } => {
                        let ingestion_entry = self
                            .get_entry(ingestion_id)
                            .source_desc()
                            .expect("primary source exists")
                            .expect("primary source is a source");

                        match ingestion_entry.connection.name() {
                            "postgres" => {
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                // The left-most qualification of Postgres
                                // tables is the database, but this
                                // information is redundant because each
                                // Postgres connection connects to only one
                                // database.
                                let schema_name = external_reference[1].as_str();
                                let table_name = external_reference[2].as_str();

                                self.pack_postgres_source_tables_update(
                                    id,
                                    schema_name,
                                    table_name,
                                    diff,
                                )
                            }
                            "mysql" => {
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
                                let schema_name = external_reference[0].as_str();
                                let table_name = external_reference[1].as_str();

                                self.pack_mysql_source_tables_update(
                                    id,
                                    schema_name,
                                    table_name,
                                    diff,
                                )
                            }
                            "sql-server" => {
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                // The left-most qualification of SQL Server tables is
                                // the database, but this information is redundant
                                // because each SQL Server connection connects to
                                // only one database.
                                let schema_name = external_reference[1].as_str();
                                let table_name = external_reference[2].as_str();

                                self.pack_sql_server_source_table_update(
                                    id,
                                    schema_name,
                                    table_name,
                                    diff,
                                )
                            }
                            // Load generator sources don't have any special
                            // updates.
                            "load-generator" => vec![],
                            s => unreachable!("{s} sources do not have subsources"),
                        }
                    }
                    DataSourceDesc::Webhook { .. } => {
                        vec![self.pack_webhook_source_update(id, diff)]
                    }
                    DataSourceDesc::Introspection(_)
                    | DataSourceDesc::Progress
                    | DataSourceDesc::Catalog => vec![],
                });

                updates
            }
            CatalogItem::View(view) => {
                self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
            }
            CatalogItem::MaterializedView(mview) => {
                self.pack_materialized_view_update(id, mview, diff)
            }
            CatalogItem::Sink(sink) => {
                self.pack_sink_update(id, oid, schema_id, name, owner_id, sink, diff)
            }
            CatalogItem::Type(ty) => {
                self.pack_type_update(id, oid, schema_id, name, owner_id, privileges, ty, diff)
            }
            CatalogItem::Func(func) => {
                self.pack_func_update(id, schema_id, name, owner_id, func, diff)
            }
            CatalogItem::Secret(_) => vec![],
            CatalogItem::Connection(connection) => {
                self.pack_connection_update(id, connection, diff)
            }
        };

        if !entry.item().is_temporary() {
            // Populate or clean up the `mz_object_dependencies` table.
            // TODO(jkosh44) Unclear if this table wants to include all uses or only references.
            for dependee in entry.item().references().items() {
                updates.push(self.pack_depends_update(id, *dependee, diff))
            }
        }

        // Always report the latest relation description for an object's columns.
        if let Some(desc) = entry.relation_desc_latest() {
            let defaults = match entry.item() {
                CatalogItem::Table(Table {
                    data_source: TableDataSource::TableWrites { defaults },
                    ..
                }) => Some(defaults),
                _ => None,
            };
            for (i, (column_name, column_type)) in desc.iter().enumerate() {
                let default: Option<String> = defaults.map(|d| d[i].to_ast_string_stable());
                let default: Datum = default
                    .as_ref()
                    .map(|d| Datum::String(d))
                    .unwrap_or(Datum::Null);
                let pgtype = mz_pgrepr::Type::from(&column_type.scalar_type);
                let (type_name, type_oid) = match &column_type.scalar_type {
                    SqlScalarType::List {
                        custom_id: Some(custom_id),
                        ..
                    }
                    | SqlScalarType::Map {
                        custom_id: Some(custom_id),
                        ..
                    }
                    | SqlScalarType::Record {
                        custom_id: Some(custom_id),
                        ..
                    } => {
                        let entry = self.get_entry(custom_id);
                        // NOTE(benesch): the `mz_columns.type text` field is
                        // wrong. Types do not have a name that can be
                        // represented as a single textual field. There can be
                        // multiple types with the same name in different
                        // schemas and databases. We should eventually deprecate
                        // the `type` field in favor of a new `type_id` field
                        // that can be joined against `mz_types`.
                        //
                        // For now, in the interest of pragmatism, we just use
                        // the type's item name, and accept that there may be
                        // ambiguity if the same type name is used in multiple
                        // schemas. The ambiguity is mitigated by the OID, which
                        // can be joined against `mz_types.oid` to resolve the
                        // ambiguity.
                        let name = &*entry.name().item;
                        let oid = entry.oid();
                        (name, oid)
                    }
                    _ => (pgtype.name(), pgtype.oid()),
                };
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_COLUMNS,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String(column_name),
                        Datum::UInt64(u64::cast_from(i + 1)),
                        Datum::from(column_type.nullable),
                        Datum::String(type_name),
                        default,
                        Datum::UInt32(type_oid),
                        Datum::Int32(pgtype.typmod()),
                    ]),
                    diff,
                ));
            }
        }

        // Use the initial logical compaction window (LCW) so that we can tell
        // apart default from non-existent windows.
        if let Some(cw) = entry.item().initial_logical_compaction_window() {
            updates.push(self.pack_history_retention_strategy_update(id, cw, diff));
        }

        updates.extend(Self::pack_item_global_id_update(entry, diff));

        updates
    }

    fn pack_item_global_id_update(
        entry: &CatalogEntry,
        diff: Diff,
    ) -> impl Iterator<Item = BuiltinTableUpdate<&'static BuiltinTable>> + use<'_> {
        let id = entry.id().to_string();
        let global_ids = entry.global_ids();
        global_ids.map(move |global_id| {
            BuiltinTableUpdate::row(
                &*MZ_OBJECT_GLOBAL_IDS,
                Row::pack_slice(&[Datum::String(&id), Datum::String(&global_id.to_string())]),
                diff,
            )
        })
    }

    fn pack_history_retention_strategy_update(
        &self,
        id: CatalogItemId,
        cw: CompactionWindow,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let cw: u64 = cw.comparable_timestamp().into();
        let cw = Jsonb::from_serde_json(serde_json::Value::Number(serde_json::Number::from(cw)))
            .expect("must serialize");
        BuiltinTableUpdate::row(
            &*MZ_HISTORY_RETENTION_STRATEGIES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                // FOR is the only strategy at the moment. We may introduce FROM or others later.
                Datum::String("FOR"),
                cw.into_row().into_element(),
            ]),
            diff,
        )
    }

    fn pack_table_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        diff: Diff,
        table: &Table,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let redacted = table.create_sql.as_ref().map(|create_sql| {
            mz_sql::parse::parse(create_sql)
                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
                .into_element()
                .ast
                .to_ast_string_redacted()
        });
        let source_id = if let TableDataSource::DataSource {
            desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
            ..
        } = &table.data_source
        {
            Some(ingestion_id.to_string())
        } else {
            None
        };

        vec![BuiltinTableUpdate::row(
            &*MZ_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&owner_id.to_string()),
                privileges,
                if let Some(create_sql) = &table.create_sql {
                    Datum::String(create_sql)
                } else {
                    Datum::Null
                },
                if let Some(redacted) = &redacted {
                    Datum::String(redacted)
                } else {
                    Datum::Null
                },
                if let Some(source_id) = source_id.as_ref() {
                    Datum::String(source_id)
                } else {
                    Datum::Null
                },
            ]),
            diff,
        )]
    }

    fn pack_source_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        source_desc_name: &str,
        connection_id: Option<CatalogItemId>,
        envelope: Option<&str>,
        key_format: Option<&str>,
        value_format: Option<&str>,
        cluster_id: Option<&str>,
        owner_id: &RoleId,
        privileges: Datum,
        diff: Diff,
        create_sql: Option<&String>,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let redacted = create_sql.map(|create_sql| {
            let create_stmt = mz_sql::parse::parse(create_sql)
                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
                .into_element()
                .ast;
            create_stmt.to_ast_string_redacted()
        });
        vec![BuiltinTableUpdate::row(
            &*MZ_SOURCES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(source_desc_name),
                Datum::from(connection_id.map(|id| id.to_string()).as_deref()),
                // This is the "source size", which is a remnant from linked
                // clusters.
                Datum::Null,
                Datum::from(envelope),
                Datum::from(key_format),
                Datum::from(value_format),
                Datum::from(cluster_id),
                Datum::String(&owner_id.to_string()),
                privileges,
                if let Some(create_sql) = create_sql {
                    Datum::String(create_sql)
                } else {
                    Datum::Null
                },
                if let Some(redacted) = &redacted {
                    Datum::String(redacted)
                } else {
                    Datum::Null
                },
            ]),
            diff,
        )]
    }

    fn pack_postgres_source_update(
        &self,
        id: CatalogItemId,
        postgres: &PostgresSourceConnection<ReferencedConnection>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_POSTGRES_SOURCES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(&postgres.publication_details.slot),
                Datum::from(postgres.publication_details.timeline_id),
            ]),
            diff,
        )]
    }

    fn pack_kafka_source_update(
        &self,
        item_id: CatalogItemId,
        collection_id: GlobalId,
        kafka: &KafkaSourceConnection<ReferencedConnection>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_KAFKA_SOURCES,
            Row::pack_slice(&[
                Datum::String(&item_id.to_string()),
                Datum::String(&kafka.group_id(&self.config.connection_context, collection_id)),
                Datum::String(&kafka.topic),
            ]),
            diff,
        )]
    }

    fn pack_postgres_source_tables_update(
        &self,
        id: CatalogItemId,
        schema_name: &str,
        table_name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_POSTGRES_SOURCE_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(schema_name),
                Datum::String(table_name),
            ]),
            diff,
        )]
    }

    fn pack_mysql_source_tables_update(
        &self,
        id: CatalogItemId,
        schema_name: &str,
        table_name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_MYSQL_SOURCE_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(schema_name),
                Datum::String(table_name),
            ]),
            diff,
        )]
    }

    fn pack_sql_server_source_table_update(
        &self,
        id: CatalogItemId,
        schema_name: &str,
        table_name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_SQL_SERVER_SOURCE_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(schema_name),
                Datum::String(table_name),
            ]),
            diff,
        )]
    }

    fn pack_kafka_source_tables_update(
        &self,
        id: CatalogItemId,
        topic: &str,
        envelope: Option<&str>,
        key_format: Option<&str>,
        value_format: Option<&str>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_KAFKA_SOURCE_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(topic),
                Datum::from(envelope),
                Datum::from(key_format),
                Datum::from(value_format),
            ]),
            diff,
        )]
    }

    fn pack_connection_update(
        &self,
        id: CatalogItemId,
        connection: &Connection,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = vec![];
        match connection.details {
            ConnectionDetails::Kafka(ref kafka) => {
                updates.extend(self.pack_kafka_connection_update(id, kafka, diff));
            }
            ConnectionDetails::Aws(ref aws_config) => {
                match self.pack_aws_connection_update(id, aws_config, diff) {
                    Ok(update) => {
                        updates.push(update);
                    }
                    Err(e) => {
                        tracing::error!(%id, %e, "failed writing row to mz_aws_connections table");
                    }
                }
            }
            ConnectionDetails::AwsPrivatelink(_) => {
                if let Some(aws_principal_context) = self.aws_principal_context.as_ref() {
                    updates.push(self.pack_aws_privatelink_connection_update(
                        id,
                        aws_principal_context,
                        diff,
                    ));
                } else {
                    tracing::error!(%id, "missing AWS principal context; cannot write row to mz_aws_privatelink_connections table");
                }
            }
            ConnectionDetails::Ssh {
                ref key_1,
                ref key_2,
                ..
            } => {
                updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff));
            }
            ConnectionDetails::Csr(_)
            | ConnectionDetails::Postgres(_)
            | ConnectionDetails::MySql(_)
            | ConnectionDetails::SqlServer(_)
            | ConnectionDetails::IcebergCatalog(_) => (),
        };
        updates
    }

    pub(crate) fn pack_ssh_tunnel_connection_update(
        &self,
        id: CatalogItemId,
        key_1: &SshKey,
        key_2: &SshKey,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        BuiltinTableUpdate::row(
            &*MZ_SSH_TUNNEL_CONNECTIONS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(key_1.public_key().as_str()),
                Datum::String(key_2.public_key().as_str()),
            ]),
            diff,
        )
    }

    fn pack_kafka_connection_update(
        &self,
        id: CatalogItemId,
        kafka: &KafkaConnection<ReferencedConnection>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let progress_topic = kafka.progress_topic(&self.config.connection_context, id);
        let mut row = Row::default();
        row.packer()
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: kafka.brokers.len(),
                }],
                kafka
                    .brokers
                    .iter()
                    .map(|broker| Datum::String(&broker.address)),
            )
            .expect("kafka.brokers is 1 dimensional, and its length is used for the array length");
        let brokers = row.unpack_first();
        vec![BuiltinTableUpdate::row(
            &*MZ_KAFKA_CONNECTIONS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                brokers,
                Datum::String(&progress_topic),
            ]),
            diff,
        )]
    }

    pub fn pack_aws_privatelink_connection_update(
        &self,
        connection_id: CatalogItemId,
        aws_principal_context: &AwsPrincipalContext,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let id = &MZ_AWS_PRIVATELINK_CONNECTIONS;
        let row = Row::pack_slice(&[
            Datum::String(&connection_id.to_string()),
            Datum::String(&aws_principal_context.to_principal_string(connection_id)),
        ]);
        BuiltinTableUpdate::row(id, row, diff)
    }

    pub fn pack_aws_connection_update(
        &self,
        connection_id: CatalogItemId,
        aws_config: &AwsConnection,
        diff: Diff,
    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, anyhow::Error> {
        let id = &MZ_AWS_CONNECTIONS;

        let mut access_key_id = None;
        let mut access_key_id_secret_id = None;
        let mut secret_access_key_secret_id = None;
        let mut session_token = None;
        let mut session_token_secret_id = None;
        let mut assume_role_arn = None;
        let mut assume_role_session_name = None;
        let mut principal = None;
        let mut external_id = None;
        let mut example_trust_policy = None;
        match &aws_config.auth {
            AwsAuth::Credentials(credentials) => {
                match &credentials.access_key_id {
                    StringOrSecret::String(s) => access_key_id = Some(s.as_str()),
                    StringOrSecret::Secret(s) => access_key_id_secret_id = Some(s.to_string()),
                }
                secret_access_key_secret_id = Some(credentials.secret_access_key.to_string());
                match credentials.session_token.as_ref() {
                    None => (),
                    Some(StringOrSecret::String(s)) => session_token = Some(s.as_str()),
                    Some(StringOrSecret::Secret(s)) => {
                        session_token_secret_id = Some(s.to_string())
                    }
                }
            }
            AwsAuth::AssumeRole(assume_role) => {
                assume_role_arn = Some(assume_role.arn.as_str());
                assume_role_session_name = assume_role.session_name.as_deref();
                principal = self
                    .config
                    .connection_context
                    .aws_connection_role_arn
                    .as_deref();
                external_id =
                    Some(assume_role.external_id(&self.config.connection_context, connection_id)?);
                example_trust_policy = {
                    let policy = assume_role
                        .example_trust_policy(&self.config.connection_context, connection_id)?;
                    let policy = Jsonb::from_serde_json(policy).expect("valid json");
                    Some(policy.into_row())
                };
            }
        }

        let row = Row::pack_slice(&[
            Datum::String(&connection_id.to_string()),
            Datum::from(aws_config.endpoint.as_deref()),
            Datum::from(aws_config.region.as_deref()),
            Datum::from(access_key_id),
            Datum::from(access_key_id_secret_id.as_deref()),
            Datum::from(secret_access_key_secret_id.as_deref()),
            Datum::from(session_token),
            Datum::from(session_token_secret_id.as_deref()),
            Datum::from(assume_role_arn),
            Datum::from(assume_role_session_name),
            Datum::from(principal),
            Datum::from(external_id.as_deref()),
            Datum::from(example_trust_policy.as_ref().map(|p| p.into_element())),
        ]);

        Ok(BuiltinTableUpdate::row(id, row, diff))
    }

    fn pack_view_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        view: &View,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let create_stmt = mz_sql::parse::parse(&view.create_sql)
            .unwrap_or_else(|e| {
                panic!(
                    "create_sql cannot be invalid: `{}` --- error: `{}`",
                    view.create_sql, e
                )
            })
            .into_element()
            .ast;
        let query = match &create_stmt {
            Statement::CreateView(stmt) => &stmt.definition.query,
            _ => unreachable!(),
        };

        let mut query_string = query.to_ast_string_stable();
        // PostgreSQL appends a semicolon in `pg_views.definition`, so we
        // do the same for compatibility's sake.
1213        query_string.push(';');
1214
1215        vec![BuiltinTableUpdate::row(
1216            &*MZ_VIEWS,
1217            Row::pack_slice(&[
1218                Datum::String(&id.to_string()),
1219                Datum::UInt32(oid),
1220                Datum::String(&schema_id.to_string()),
1221                Datum::String(name),
1222                Datum::String(&query_string),
1223                Datum::String(&owner_id.to_string()),
1224                privileges,
1225                Datum::String(&view.create_sql),
1226                Datum::String(&create_stmt.to_ast_string_redacted()),
1227            ]),
1228            diff,
1229        )]
1230    }
1231
1232    fn pack_materialized_view_update(
1233        &self,
1234        id: CatalogItemId,
1235        mview: &MaterializedView,
1236        diff: Diff,
1237    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1238        let mut updates = Vec::new();
1239
1240        if let Some(refresh_schedule) = &mview.refresh_schedule {
1241            // This can't be `ON COMMIT`, because that is represented by a `None` instead of an
1242            // empty `RefreshSchedule`.
1243            assert!(!refresh_schedule.is_empty());
1244            for RefreshEvery {
1245                interval,
1246                aligned_to,
1247            } in refresh_schedule.everies.iter()
1248            {
1249                let aligned_to_dt = mz_ore::now::to_datetime(
1250                    <&Timestamp as TryInto<u64>>::try_into(aligned_to).expect("undoes planning"),
1251                );
1252                updates.push(BuiltinTableUpdate::row(
1253                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1254                    Row::pack_slice(&[
1255                        Datum::String(&id.to_string()),
1256                        Datum::String("every"),
1257                        Datum::Interval(
1258                            Interval::from_duration(interval).expect(
1259                                "planning ensured that this is convertible back to Interval",
1260                            ),
1261                        ),
1262                        Datum::TimestampTz(aligned_to_dt.try_into().expect("undoes planning")),
1263                        Datum::Null,
1264                    ]),
1265                    diff,
1266                ));
1267            }
1268            for at in refresh_schedule.ats.iter() {
1269                let at_dt = mz_ore::now::to_datetime(
1270                    <&Timestamp as TryInto<u64>>::try_into(at).expect("undoes planning"),
1271                );
1272                updates.push(BuiltinTableUpdate::row(
1273                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1274                    Row::pack_slice(&[
1275                        Datum::String(&id.to_string()),
1276                        Datum::String("at"),
1277                        Datum::Null,
1278                        Datum::Null,
1279                        Datum::TimestampTz(at_dt.try_into().expect("undoes planning")),
1280                    ]),
1281                    diff,
1282                ));
1283            }
1284        } else {
1285            updates.push(BuiltinTableUpdate::row(
1286                &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1287                Row::pack_slice(&[
1288                    Datum::String(&id.to_string()),
1289                    Datum::String("on-commit"),
1290                    Datum::Null,
1291                    Datum::Null,
1292                    Datum::Null,
1293                ]),
1294                diff,
1295            ));
1296        }
1297
1298        if let Some(target_id) = mview.replacement_target {
1299            updates.push(BuiltinTableUpdate::row(
1300                &*MZ_REPLACEMENTS,
1301                Row::pack_slice(&[
1302                    Datum::String(&id.to_string()),
1303                    Datum::String(&target_id.to_string()),
1304                ]),
1305                diff,
1306            ));
1307        }
1308
1309        updates
1310    }
1311
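    /// Packs a row for `MZ_SINKS` describing the sink, plus a connection-specific row in
    /// `MZ_KAFKA_SINKS` (the target topic) or `MZ_ICEBERG_SINKS` (the target namespace and
    /// table), depending on the sink's connection type.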
1312    fn pack_sink_update(
1313        &self,
1314        id: CatalogItemId,
1315        oid: u32,
1316        schema_id: &SchemaSpecifier,
1317        name: &str,
1318        owner_id: &RoleId,
1319        sink: &Sink,
1320        diff: Diff,
1321    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1322        let mut updates = vec![];
1323        match &sink.connection {
1324            StorageSinkConnection::Kafka(KafkaSinkConnection {
1325                topic: topic_name, ..
1326            }) => {
1327                updates.push(BuiltinTableUpdate::row(
1328                    &*MZ_KAFKA_SINKS,
1329                    Row::pack_slice(&[
1330                        Datum::String(&id.to_string()),
1331                        Datum::String(topic_name.as_str()),
1332                    ]),
1333                    diff,
1334                ));
1335            }
1336            StorageSinkConnection::Iceberg(IcebergSinkConnection {
1337                namespace, table, ..
1338            }) => {
1339                updates.push(BuiltinTableUpdate::row(
1340                    &*MZ_ICEBERG_SINKS,
1341                    Row::pack_slice(&[
1342                        Datum::String(&id.to_string()),
1343                        Datum::String(namespace.as_str()),
1344                        Datum::String(table.as_str()),
1345                    ]),
1346                    diff,
1347                ));
1348            }
1349        };
1350
1351        let create_stmt = mz_sql::parse::parse(&sink.create_sql)
1352            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", sink.create_sql))
1353            .into_element()
1354            .ast;
1355
1356        let envelope = sink.envelope();
1357
1358        // The combined format string is used for the deprecated `format` column.
1359        let combined_format = sink.combined_format();
1360        let (key_format, value_format) = match sink.formats() {
1361            Some((key_format, value_format)) => (key_format, Some(value_format)),
1362            None => (None, None),
1363        };
1364
1365        updates.push(BuiltinTableUpdate::row(
1366            &*MZ_SINKS,
1367            Row::pack_slice(&[
1368                Datum::String(&id.to_string()),
1369                Datum::UInt32(oid),
1370                Datum::String(&schema_id.to_string()),
1371                Datum::String(name),
1372                Datum::String(sink.connection.name()),
1373                Datum::from(sink.connection_id().map(|id| id.to_string()).as_deref()),
1374                // The `size` column is deprecated since linked clusters were removed.
1375                Datum::Null,
1376                Datum::from(envelope),
1377                // FIXME: These key/value formats are Kafka-specific and leak into the
1378                // generic sinks table; they should probably live in the Kafka sink table.
1379                Datum::from(combined_format.as_ref().map(|f| f.as_ref())),
1380                Datum::from(key_format),
1381                Datum::from(value_format),
1382                Datum::String(&sink.cluster_id.to_string()),
1383                Datum::String(&owner_id.to_string()),
1384                Datum::String(&sink.create_sql),
1385                Datum::String(&create_stmt.to_ast_string_redacted()),
1386            ]),
1387            diff,
1388        ));
1389
1390        updates
1391    }
1392
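    /// Packs a row for `MZ_INDEXES` describing the index, plus one `MZ_INDEX_COLUMNS` row per
    /// index key, recording either the 1-based position of the indexed column or the SQL text
    /// of the key expression, along with the key's nullability.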
1393    fn pack_index_update(
1394        &self,
1395        id: CatalogItemId,
1396        oid: u32,
1397        name: &str,
1398        owner_id: &RoleId,
1399        index: &Index,
1400        diff: Diff,
1401    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1402        let mut updates = vec![];
1403
1404        let create_stmt = mz_sql::parse::parse(&index.create_sql)
1405            .unwrap_or_else(|e| {
1406                panic!(
1407                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1408                    index.create_sql, e
1409                )
1410            })
1411            .into_element()
1412            .ast;
1413
1414        let key_sqls = match &create_stmt {
1415            Statement::CreateIndex(CreateIndexStatement { key_parts, .. }) => key_parts
1416                .as_ref()
1417                .expect("key_parts is filled in during planning"),
1418            _ => unreachable!(),
1419        };
1420        let on_item_id = self.get_entry_by_global_id(&index.on).id();
1421
1422        updates.push(BuiltinTableUpdate::row(
1423            &*MZ_INDEXES,
1424            Row::pack_slice(&[
1425                Datum::String(&id.to_string()),
1426                Datum::UInt32(oid),
1427                Datum::String(name),
1428                Datum::String(&on_item_id.to_string()),
1429                Datum::String(&index.cluster_id.to_string()),
1430                Datum::String(&owner_id.to_string()),
1431                Datum::String(&index.create_sql),
1432                Datum::String(&create_stmt.to_ast_string_redacted()),
1433            ]),
1434            diff,
1435        ));
1436
1437        let on_entry = self.get_entry_by_global_id(&index.on);
1438        let on_desc = on_entry
1439            .relation_desc()
1440            .expect("can only create indexes on items with a valid description");
1441        let repr_col_types: Vec<ReprColumnType> = on_desc
1442            .typ()
1443            .column_types
1444            .iter()
1445            .map(ReprColumnType::from)
1446            .collect();
1447        for (i, key) in index.keys.iter().enumerate() {
1448            let nullable = key.typ(&repr_col_types).nullable;
1449            let seq_in_index = u64::cast_from(i + 1);
1450            let key_sql = key_sqls
1451                .get(i)
1452                .expect("missing sql information for index key")
1453                .to_ast_string_simple();
1454            let (field_number, expression) = match key {
1455                MirScalarExpr::Column(col, _) => {
1456                    (Datum::UInt64(u64::cast_from(*col + 1)), Datum::Null)
1457                }
1458                _ => (Datum::Null, Datum::String(&key_sql)),
1459            };
1460            updates.push(BuiltinTableUpdate::row(
1461                &*MZ_INDEX_COLUMNS,
1462                Row::pack_slice(&[
1463                    Datum::String(&id.to_string()),
1464                    Datum::UInt64(seq_in_index),
1465                    field_number,
1466                    expression,
1467                    Datum::from(nullable),
1468                ]),
1469                diff,
1470            ));
1471        }
1472
1473        updates
1474    }
1475
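    /// Packs rows describing the type: an `MZ_TYPES` row, a row in the category-specific table
    /// (`MZ_ARRAY_TYPES`, `MZ_LIST_TYPES`, `MZ_MAP_TYPES`, `MZ_PSEUDO_TYPES`, or
    /// `MZ_BASE_TYPES`), and an `MZ_TYPE_PG_METADATA` row when PostgreSQL metadata is present.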
1476    fn pack_type_update(
1477        &self,
1478        id: CatalogItemId,
1479        oid: u32,
1480        schema_id: &SchemaSpecifier,
1481        name: &str,
1482        owner_id: &RoleId,
1483        privileges: Datum,
1484        typ: &Type,
1485        diff: Diff,
1486    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1487        let mut out = vec![];
1488
1489        let redacted = typ.create_sql.as_ref().map(|create_sql| {
1490            mz_sql::parse::parse(create_sql)
1491                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
1492                .into_element()
1493                .ast
1494                .to_ast_string_redacted()
1495        });
1496
1497        out.push(BuiltinTableUpdate::row(
1498            &*MZ_TYPES,
1499            Row::pack_slice(&[
1500                Datum::String(&id.to_string()),
1501                Datum::UInt32(oid),
1502                Datum::String(&schema_id.to_string()),
1503                Datum::String(name),
1504                Datum::String(&TypeCategory::from_catalog_type(&typ.details.typ).to_string()),
1505                Datum::String(&owner_id.to_string()),
1506                privileges,
1507                if let Some(create_sql) = &typ.create_sql {
1508                    Datum::String(create_sql)
1509                } else {
1510                    Datum::Null
1511                },
1512                if let Some(redacted) = &redacted {
1513                    Datum::String(redacted)
1514                } else {
1515                    Datum::Null
1516                },
1517            ]),
1518            diff,
1519        ));
1520
1521        let mut row = Row::default();
1522        let mut packer = row.packer();
1523
1524        fn append_modifier(packer: &mut RowPacker<'_>, mods: &[i64]) {
1525            if mods.is_empty() {
1526                packer.push(Datum::Null);
1527            } else {
1528                packer.push_list(mods.iter().map(|m| Datum::Int64(*m)));
1529            }
1530        }
1531
1532        let index_id = match &typ.details.typ {
1533            CatalogType::Array {
1534                element_reference: element_id,
1535            } => {
1536                packer.push(Datum::String(&id.to_string()));
1537                packer.push(Datum::String(&element_id.to_string()));
1538                &MZ_ARRAY_TYPES
1539            }
1540            CatalogType::List {
1541                element_reference: element_id,
1542                element_modifiers,
1543            } => {
1544                packer.push(Datum::String(&id.to_string()));
1545                packer.push(Datum::String(&element_id.to_string()));
1546                append_modifier(&mut packer, element_modifiers);
1547                &MZ_LIST_TYPES
1548            }
1549            CatalogType::Map {
1550                key_reference: key_id,
1551                value_reference: value_id,
1552                key_modifiers,
1553                value_modifiers,
1554            } => {
1555                packer.push(Datum::String(&id.to_string()));
1556                packer.push(Datum::String(&key_id.to_string()));
1557                packer.push(Datum::String(&value_id.to_string()));
1558                append_modifier(&mut packer, key_modifiers);
1559                append_modifier(&mut packer, value_modifiers);
1560                &MZ_MAP_TYPES
1561            }
1562            CatalogType::Pseudo => {
1563                packer.push(Datum::String(&id.to_string()));
1564                &MZ_PSEUDO_TYPES
1565            }
1566            _ => {
1567                packer.push(Datum::String(&id.to_string()));
1568                &MZ_BASE_TYPES
1569            }
1570        };
1571        out.push(BuiltinTableUpdate::row(index_id, row, diff));
1572
1573        if let Some(pg_metadata) = &typ.details.pg_metadata {
1574            out.push(BuiltinTableUpdate::row(
1575                &*MZ_TYPE_PG_METADATA,
1576                Row::pack_slice(&[
1577                    Datum::String(&id.to_string()),
1578                    Datum::UInt32(pg_metadata.typinput_oid),
1579                    Datum::UInt32(pg_metadata.typreceive_oid),
1580                ]),
1581                diff,
1582            ));
1583        }
1584
1585        out
1586    }
1587
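    /// Packs one `MZ_FUNCTIONS` row per implementation of the function, recording its argument,
    /// variadic, and return types, plus an `MZ_AGGREGATES` row per implementation when the
    /// function is an aggregate.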
1588    fn pack_func_update(
1589        &self,
1590        id: CatalogItemId,
1591        schema_id: &SchemaSpecifier,
1592        name: &str,
1593        owner_id: &RoleId,
1594        func: &Func,
1595        diff: Diff,
1596    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1597        let mut updates = vec![];
1598        for func_impl_details in func.inner.func_impls() {
1599            let arg_type_ids = func_impl_details
1600                .arg_typs
1601                .iter()
1602                .map(|typ| self.get_system_type(typ).id().to_string())
1603                .collect::<Vec<_>>();
1604
1605            let mut row = Row::default();
1606            row.packer()
1607                .try_push_array(
1608                    &[ArrayDimension {
1609                        lower_bound: 1,
1610                        length: arg_type_ids.len(),
1611                    }],
1612                    arg_type_ids.iter().map(|id| Datum::String(id)),
1613                )
1614                .expect(
1615                    "arg_type_ids is 1 dimensional, and its length is used for the array length",
1616                );
1617            let arg_type_ids = row.unpack_first();
1618
1619            updates.push(BuiltinTableUpdate::row(
1620                &*MZ_FUNCTIONS,
1621                Row::pack_slice(&[
1622                    Datum::String(&id.to_string()),
1623                    Datum::UInt32(func_impl_details.oid),
1624                    Datum::String(&schema_id.to_string()),
1625                    Datum::String(name),
1626                    arg_type_ids,
1627                    Datum::from(
1628                        func_impl_details
1629                            .variadic_typ
1630                            .map(|typ| self.get_system_type(typ).id().to_string())
1631                            .as_deref(),
1632                    ),
1633                    Datum::from(
1634                        func_impl_details
1635                            .return_typ
1636                            .map(|typ| self.get_system_type(typ).id().to_string())
1637                            .as_deref(),
1638                    ),
1639                    func_impl_details.return_is_set.into(),
1640                    Datum::String(&owner_id.to_string()),
1641                ]),
1642                diff,
1643            ));
1644
1645            if let mz_sql::func::Func::Aggregate(_) = func.inner {
1646                updates.push(BuiltinTableUpdate::row(
1647                    &*MZ_AGGREGATES,
1648                    Row::pack_slice(&[
1649                        Datum::UInt32(func_impl_details.oid),
1650                        // TODO(database-issues#1064): Support ordered-set aggregate functions.
1651                        Datum::String("n"),
1652                        Datum::Int16(0),
1653                    ]),
1654                    diff,
1655                ));
1656            }
1657        }
1658        updates
1659    }
1660
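    /// Packs a single `MZ_OPERATORS` row for one implementation of the named operator,
    /// recording its OID, argument type IDs, and optional return type.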
1661    pub fn pack_op_update(
1662        &self,
1663        operator: &str,
1664        func_impl_details: FuncImplCatalogDetails,
1665        diff: Diff,
1666    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1667        let arg_type_ids = func_impl_details
1668            .arg_typs
1669            .iter()
1670            .map(|typ| self.get_system_type(typ).id().to_string())
1671            .collect::<Vec<_>>();
1672
1673        let mut row = Row::default();
1674        row.packer()
1675            .try_push_array(
1676                &[ArrayDimension {
1677                    lower_bound: 1,
1678                    length: arg_type_ids.len(),
1679                }],
1680                arg_type_ids.iter().map(|id| Datum::String(id)),
1681            )
1682            .expect("arg_type_ids is 1 dimensional, and its length is used for the array length");
1683        let arg_type_ids = row.unpack_first();
1684
1685        BuiltinTableUpdate::row(
1686            &*MZ_OPERATORS,
1687            Row::pack_slice(&[
1688                Datum::UInt32(func_impl_details.oid),
1689                Datum::String(operator),
1690                arg_type_ids,
1691                Datum::from(
1692                    func_impl_details
1693                        .return_typ
1694                        .map(|typ| self.get_system_type(typ).id().to_string())
1695                        .as_deref(),
1696                ),
1697            ]),
1698            diff,
1699        )
1700    }
1701
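    /// Packs an `MZ_AUDIT_EVENTS` row for the given audit log event, returning an error if the
    /// event details cannot be converted to JSONB.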
1702    pub fn pack_audit_log_update(
1703        &self,
1704        event: &VersionedEvent,
1705        diff: Diff,
1706    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1707        let (event_type, object_type, details, user, occurred_at): (
1708            &EventType,
1709            &ObjectType,
1710            &EventDetails,
1711            &Option<String>,
1712            u64,
1713        ) = match event {
1714            VersionedEvent::V1(ev) => (
1715                &ev.event_type,
1716                &ev.object_type,
1717                &ev.details,
1718                &ev.user,
1719                ev.occurred_at,
1720            ),
1721        };
1722        let details = Jsonb::from_serde_json(details.as_json())
1723            .map_err(|e| {
1724                Error::new(ErrorKind::Unstructured(format!(
1725                    "could not pack audit log update: {}",
1726                    e
1727                )))
1728            })?
1729            .into_row();
1730        let details = details
1731            .iter()
1732            .next()
1733            .expect("details created above with a single jsonb column");
1734        let dt = mz_ore::now::to_datetime(occurred_at);
1735        let id = event.sortable_id();
1736        Ok(BuiltinTableUpdate::row(
1737            &*MZ_AUDIT_EVENTS,
1738            Row::pack_slice(&[
1739                Datum::UInt64(id),
1740                Datum::String(&format!("{}", event_type)),
1741                Datum::String(&format!("{}", object_type)),
1742                details,
1743                match user {
1744                    Some(user) => Datum::String(user),
1745                    None => Datum::Null,
1746                },
1747                Datum::TimestampTz(dt.try_into().expect("must fit")),
1748            ]),
1749            diff,
1750        ))
1751    }
1752
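    /// Packs an `MZ_STORAGE_USAGE_BY_SHARD` row recording the shard's size in bytes at the
    /// event's collection timestamp.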
1753    pub fn pack_storage_usage_update(
1754        &self,
1755        VersionedStorageUsage::V1(event): VersionedStorageUsage,
1756        diff: Diff,
1757    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1758        let id = &MZ_STORAGE_USAGE_BY_SHARD;
1759        let row = Row::pack_slice(&[
1760            Datum::UInt64(event.id),
1761            Datum::from(event.shard_id.as_deref()),
1762            Datum::UInt64(event.size_bytes),
1763            Datum::TimestampTz(
1764                mz_ore::now::to_datetime(event.collection_timestamp)
1765                    .try_into()
1766                    .expect("must fit"),
1767            ),
1768        ]);
1769        BuiltinTableUpdate::row(id, row, diff)
1770    }
1771
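    /// Packs an `MZ_EGRESS_IPS` row (network address, prefix length, and CIDR string) for the
    /// given egress IP range. As an illustrative example, `1.2.3.0/24` would pack as
    /// `("1.2.3.0", 24, "1.2.3.0/24")`.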
1772    pub fn pack_egress_ip_update(
1773        &self,
1774        ip: &IpNet,
1775    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1776        let id = &MZ_EGRESS_IPS;
1777        let addr = ip.network();
1778        let row = Row::pack_slice(&[
1779            Datum::String(&addr.to_string()),
1780            Datum::Int32(ip.prefix_len().into()),
1781            Datum::String(&format!("{}/{}", addr, ip.prefix_len())),
1782        ]);
1783        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
1784    }
1785
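    /// Packs an `MZ_LICENSE_KEYS` row for the validated license key, scaling its `expiration`
    /// and `not_before` fields by 1000 (seconds to milliseconds) before converting them to
    /// timestamps.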
1786    pub fn pack_license_key_update(
1787        &self,
1788        license_key: &ValidatedLicenseKey,
1789    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1790        let id = &MZ_LICENSE_KEYS;
1791        let row = Row::pack_slice(&[
1792            Datum::String(&license_key.id),
1793            Datum::String(&license_key.organization),
1794            Datum::String(&license_key.environment_id),
1795            Datum::TimestampTz(
1796                mz_ore::now::to_datetime(license_key.expiration * 1000)
1797                    .try_into()
1798                    .expect("must fit"),
1799            ),
1800            Datum::TimestampTz(
1801                mz_ore::now::to_datetime(license_key.not_before * 1000)
1802                    .try_into()
1803                    .expect("must fit"),
1804            ),
1805        ]);
1806        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
1807    }
1808
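    /// Packs one `MZ_CLUSTER_REPLICA_SIZES` row per enabled entry in the replica size map,
    /// substituting placeholder limits (`MAX` / `ARBITRARY`) where none are configured.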
1809    pub fn pack_all_replica_size_updates(&self) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1810        let mut updates = Vec::new();
1811        for (size, alloc) in &self.cluster_replica_sizes.0 {
1812            if alloc.disabled {
1813                continue;
1814            }
1815
1816            // Fall back to placeholder values when the limits are `None`, which only happens
1817            // in non-prod environments (tests, the process orchestrator, etc.).
1818            let cpu_limit = alloc.cpu_limit.unwrap_or(CpuLimit::MAX);
1819            let MemoryLimit(ByteSize(memory_bytes)) =
1820                (alloc.memory_limit).unwrap_or(MemoryLimit::MAX);
1821            let DiskLimit(ByteSize(disk_bytes)) =
1822                (alloc.disk_limit).unwrap_or(DiskLimit::ARBITRARY);
1823
1824            let row = Row::pack_slice(&[
1825                size.as_str().into(),
1826                u64::cast_from(alloc.scale).into(),
1827                u64::cast_from(alloc.workers).into(),
1828                cpu_limit.as_nanocpus().into(),
1829                memory_bytes.into(),
1830                disk_bytes.into(),
1831                (alloc.credits_per_hour).into(),
1832            ]);
1833
1834            updates.push(BuiltinTableUpdate::row(
1835                &*MZ_CLUSTER_REPLICA_SIZES,
1836                row,
1837                Diff::ONE,
1838            ));
1839        }
1840
1841        updates
1842    }
1843
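    /// Packs an `MZ_SUBSCRIPTIONS` row for an active subscribe, recording its session, cluster,
    /// start time, and the objects it depends on.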
1844    pub fn pack_subscribe_update(
1845        &self,
1846        id: GlobalId,
1847        subscribe: &ActiveSubscribe,
1848        diff: Diff,
1849    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1850        let mut row = Row::default();
1851        let mut packer = row.packer();
1852        packer.push(Datum::String(&id.to_string()));
1853        packer.push(Datum::Uuid(subscribe.session_uuid));
1854        packer.push(Datum::String(&subscribe.cluster_id.to_string()));
1855
1856        let start_dt = mz_ore::now::to_datetime(subscribe.start_time);
1857        packer.push(Datum::TimestampTz(start_dt.try_into().expect("must fit")));
1858
1859        let depends_on: Vec<_> = subscribe
1860            .depends_on
1861            .iter()
1862            .map(|id| id.to_string())
1863            .collect();
1864        packer.push_list(depends_on.iter().map(|s| Datum::String(s)));
1865
1866        BuiltinTableUpdate::row(&*MZ_SUBSCRIPTIONS, row, diff)
1867    }
1868
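    /// Packs an `MZ_SESSIONS` row for an active connection, recording its UUID, connection ID,
    /// authenticated role, client IP (if any), and connection time.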
1869    pub fn pack_session_update(
1870        &self,
1871        conn: &ConnMeta,
1872        diff: Diff,
1873    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1874        let connect_dt = mz_ore::now::to_datetime(conn.connected_at());
1875        BuiltinTableUpdate::row(
1876            &*MZ_SESSIONS,
1877            Row::pack_slice(&[
1878                Datum::Uuid(conn.uuid()),
1879                Datum::UInt32(conn.conn_id().unhandled()),
1880                Datum::String(&conn.authenticated_role_id().to_string()),
1881                Datum::from(conn.client_ip().map(|ip| ip.to_string()).as_deref()),
1882                Datum::TimestampTz(connect_dt.try_into().expect("must fit")),
1883            ]),
1884            diff,
1885        )
1886    }
1887
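    /// Packs an `MZ_DEFAULT_PRIVILEGES` row recording the default privilege object's role,
    /// database, schema, and object type, together with the grantee and granted ACL mode.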
1888    pub fn pack_default_privileges_update(
1889        &self,
1890        default_privilege_object: &DefaultPrivilegeObject,
1891        grantee: &RoleId,
1892        acl_mode: &AclMode,
1893        diff: Diff,
1894    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1895        BuiltinTableUpdate::row(
1896            &*MZ_DEFAULT_PRIVILEGES,
1897            Row::pack_slice(&[
1898                default_privilege_object.role_id.to_string().as_str().into(),
1899                default_privilege_object
1900                    .database_id
1901                    .map(|database_id| database_id.to_string())
1902                    .as_deref()
1903                    .into(),
1904                default_privilege_object
1905                    .schema_id
1906                    .map(|schema_id| schema_id.to_string())
1907                    .as_deref()
1908                    .into(),
1909                default_privilege_object
1910                    .object_type
1911                    .to_string()
1912                    .to_lowercase()
1913                    .as_str()
1914                    .into(),
1915                grantee.to_string().as_str().into(),
1916                acl_mode.to_string().as_str().into(),
1917            ]),
1918            diff,
1919        )
1920    }
1921
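    /// Packs an `MZ_SYSTEM_PRIVILEGES` row containing a single system-level ACL item.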
1922    pub fn pack_system_privileges_update(
1923        &self,
1924        privileges: MzAclItem,
1925        diff: Diff,
1926    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1927        BuiltinTableUpdate::row(
1928            &*MZ_SYSTEM_PRIVILEGES,
1929            Row::pack_slice(&[privileges.into()]),
1930            diff,
1931        )
1932    }
1933
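    /// Packs a single-column row containing the privileges as a one-dimensional array of
    /// ACL item datums, used for the `privileges` columns packed elsewhere in this file.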
1934    fn pack_privilege_array_row(&self, privileges: &PrivilegeMap) -> Row {
1935        let mut row = Row::default();
1936        let flat_privileges: Vec<_> = privileges.all_values_owned().collect();
1937        row.packer()
1938            .try_push_array(
1939                &[ArrayDimension {
1940                    lower_bound: 1,
1941                    length: flat_privileges.len(),
1942                }],
1943                flat_privileges
1944                    .into_iter()
1945                    .map(|mz_acl_item| Datum::MzAclItem(mz_acl_item.clone())),
1946            )
1947            .expect("privileges is 1 dimensional, and its length is used for the array length");
1948        row
1949    }
1950
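    /// Packs an `MZ_COMMENTS` row for a comment on the given object (and optionally on a
    /// specific column), using the audit log's object type representation so it is easy to
    /// join against.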
1951    pub fn pack_comment_update(
1952        &self,
1953        object_id: CommentObjectId,
1954        column_pos: Option<usize>,
1955        comment: &str,
1956        diff: Diff,
1957    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1958        // Use the audit log representation so it's easier to join against.
1959        let object_type = mz_sql::catalog::ObjectType::from(object_id);
1960        let audit_type = super::object_type_to_audit_object_type(object_type);
1961        let object_type_str = audit_type.to_string();
1962
1963        let object_id_str = match object_id {
1964            CommentObjectId::Table(global_id)
1965            | CommentObjectId::View(global_id)
1966            | CommentObjectId::MaterializedView(global_id)
1967            | CommentObjectId::Source(global_id)
1968            | CommentObjectId::Sink(global_id)
1969            | CommentObjectId::Index(global_id)
1970            | CommentObjectId::Func(global_id)
1971            | CommentObjectId::Connection(global_id)
1972            | CommentObjectId::Secret(global_id)
1973            | CommentObjectId::Type(global_id) => global_id.to_string(),
1974            CommentObjectId::Role(role_id) => role_id.to_string(),
1975            CommentObjectId::Database(database_id) => database_id.to_string(),
1976            CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
1977            CommentObjectId::Cluster(cluster_id) => cluster_id.to_string(),
1978            CommentObjectId::ClusterReplica((_, replica_id)) => replica_id.to_string(),
1979            CommentObjectId::NetworkPolicy(network_policy_id) => network_policy_id.to_string(),
1980        };
1981        let column_pos_datum = match column_pos {
1982            Some(pos) => {
1983                // TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
1984                let pos =
1985                    i32::try_from(pos).expect("we constrain this value in the planning layer");
1986                Datum::Int32(pos)
1987            }
1988            None => Datum::Null,
1989        };
1990
1991        BuiltinTableUpdate::row(
1992            &*MZ_COMMENTS,
1993            Row::pack_slice(&[
1994                Datum::String(&object_id_str),
1995                Datum::String(&object_type_str),
1996                column_pos_datum,
1997                Datum::String(comment),
1998            ]),
1999            diff,
2000        )
2001    }
2002
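    /// Packs an `MZ_WEBHOOKS_SOURCES` row for the webhook source with ID `item_id`, recording
    /// its name and the URL at which it accepts requests. Expects the webhook source to exist.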
2003    pub fn pack_webhook_source_update(
2004        &self,
2005        item_id: CatalogItemId,
2006        diff: Diff,
2007    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2008        let url = self
2009            .try_get_webhook_url(&item_id)
2010            .expect("webhook source should exist");
2011        let url = url.to_string();
2012        let name = &self.get_entry(&item_id).name().item;
2013        let id_str = item_id.to_string();
2014
2015        BuiltinTableUpdate::row(
2016            &*MZ_WEBHOOKS_SOURCES,
2017            Row::pack_slice(&[
2018                Datum::String(&id_str),
2019                Datum::String(name),
2020                Datum::String(&url),
2021            ]),
2022            diff,
2023        )
2024    }
2025
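    /// Packs one `MZ_SOURCE_REFERENCES` row per upstream reference available to the source,
    /// recording the reference's namespace, name, column list (or `NULL` when empty), and the
    /// time the references were last updated.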
2026    pub fn pack_source_references_update(
2027        &self,
2028        source_references: &SourceReferences,
2029        diff: Diff,
2030    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2031        let source_id = source_references.source_id.to_string();
2032        let updated_at = &source_references.updated_at;
2033        source_references
2034            .references
2035            .iter()
2036            .map(|reference| {
2037                let mut row = Row::default();
2038                let mut packer = row.packer();
2039                packer.extend([
2040                    Datum::String(&source_id),
2041                    reference
2042                        .namespace
2043                        .as_ref()
2044                        .map(|s| Datum::String(s))
2045                        .unwrap_or(Datum::Null),
2046                    Datum::String(&reference.name),
2047                    Datum::TimestampTz(
2048                        mz_ore::now::to_datetime(*updated_at)
2049                            .try_into()
2050                            .expect("must fit"),
2051                    ),
2052                ]);
2053                if !reference.columns.is_empty() {
2054                    packer
2055                        .try_push_array(
2056                            &[ArrayDimension {
2057                                lower_bound: 1,
2058                                length: reference.columns.len(),
2059                            }],
2060                            reference.columns.iter().map(|col| Datum::String(col)),
2061                        )
2062                        .expect(
2063                            "columns is 1 dimensional, and its length is used for the array length",
2064                        );
2065                } else {
2066                    packer.push(Datum::Null);
2067                }
2068
2069                BuiltinTableUpdate::row(&*MZ_SOURCE_REFERENCES, row, diff)
2070            })
2071            .collect()
2072    }
2073}