mz_adapter/catalog/
builtin_table_updates.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10mod notice;
11
12use std::cmp;
13use std::time::Duration;
14
15use bytesize::ByteSize;
16use ipnet::IpNet;
17use mz_adapter_types::compaction::CompactionWindow;
18use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
19use mz_catalog::SYSTEM_CONN_ID;
20use mz_catalog::builtin::{
21    BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
22    MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICAS,
23    MZ_CLUSTER_SCHEDULES, MZ_CLUSTER_WORKLOAD_CLASSES, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS,
24    MZ_CONNECTIONS, MZ_CONTINUAL_TASKS, MZ_DATABASES, MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS,
25    MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES, MZ_INDEX_COLUMNS, MZ_INDEXES,
26    MZ_INTERNAL_CLUSTER_REPLICAS, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCE_TABLES,
27    MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES,
28    MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MATERIALIZED_VIEWS, MZ_MYSQL_SOURCE_TABLES,
29    MZ_NETWORK_POLICIES, MZ_NETWORK_POLICY_RULES, MZ_OBJECT_DEPENDENCIES, MZ_OPERATORS,
30    MZ_PENDING_CLUSTER_REPLICAS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES, MZ_PSEUDO_TYPES,
31    MZ_ROLE_MEMBERS, MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SCHEMAS, MZ_SECRETS, MZ_SESSIONS, MZ_SINKS,
32    MZ_SOURCE_REFERENCES, MZ_SOURCES, MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS,
33    MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES,
34    MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
35};
36use mz_catalog::config::AwsPrincipalContext;
37use mz_catalog::durable::SourceReferences;
38use mz_catalog::memory::error::{Error, ErrorKind};
39use mz_catalog::memory::objects::{
40    CatalogItem, ClusterVariant, Connection, ContinualTask, DataSourceDesc, Func, Index,
41    MaterializedView, Sink, Table, TableDataSource, Type, View,
42};
43use mz_compute_types::dyncfgs::{
44    MEMORY_LIMITER_INTERVAL, MEMORY_LIMITER_USAGE_BIAS, MEMORY_LIMITER_USAGE_FACTOR,
45};
46use mz_controller::clusters::{
47    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
48};
49use mz_controller_types::ClusterId;
50use mz_expr::MirScalarExpr;
51use mz_license_keys::ValidatedLicenseKey;
52use mz_orchestrator::{CpuLimit, DiskLimit, MemoryLimit};
53use mz_ore::cast::{CastFrom, CastLossy};
54use mz_ore::collections::CollectionExt;
55use mz_persist_client::batch::ProtoBatch;
56use mz_repr::adt::array::ArrayDimension;
57use mz_repr::adt::interval::Interval;
58use mz_repr::adt::jsonb::Jsonb;
59use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
60use mz_repr::network_policy_id::NetworkPolicyId;
61use mz_repr::refresh_schedule::RefreshEvery;
62use mz_repr::role_id::RoleId;
63use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, Row, RowPacker, SqlScalarType, Timestamp};
64use mz_sql::ast::{ContinualTaskStmt, CreateIndexStatement, Statement, UnresolvedItemName};
65use mz_sql::catalog::{
66    CatalogCluster, CatalogDatabase, CatalogSchema, CatalogType, DefaultPrivilegeObject,
67    TypeCategory,
68};
69use mz_sql::func::FuncImplCatalogDetails;
70use mz_sql::names::{
71    CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier,
72};
73use mz_sql::plan::{ClusterSchedule, ConnectionDetails, SshKey};
74use mz_sql::session::user::SYSTEM_USER;
75use mz_sql::session::vars::SessionVars;
76use mz_sql_parser::ast::display::AstDisplay;
77use mz_storage_client::client::TableData;
78use mz_storage_types::connections::KafkaConnection;
79use mz_storage_types::connections::aws::{AwsAuth, AwsConnection};
80use mz_storage_types::connections::inline::ReferencedConnection;
81use mz_storage_types::connections::string_or_secret::StringOrSecret;
82use mz_storage_types::sinks::{KafkaSinkConnection, StorageSinkConnection};
83use mz_storage_types::sources::{
84    GenericSourceConnection, KafkaSourceConnection, PostgresSourceConnection, SourceConnection,
85};
86use smallvec::smallvec;
87
88// DO NOT add any more imports from `crate` outside of `crate::catalog`.
89use crate::active_compute_sink::ActiveSubscribe;
90use crate::catalog::CatalogState;
91use crate::coord::ConnMeta;
92
/// An update to a built-in table.
///
/// The type parameter `T` is the kind of reference used to identify the
/// target table. It defaults to [`CatalogItemId`]; code in this file also
/// instantiates it with `&'static BuiltinTable` before the reference is
/// resolved to an ID.
#[derive(Debug, Clone)]
pub struct BuiltinTableUpdate<T = CatalogItemId> {
    /// The reference of the table to update.
    pub id: T,
    /// The data to put into the table.
    pub data: TableData,
}
101
102impl<T> BuiltinTableUpdate<T> {
103    /// Create a [`BuiltinTableUpdate`] from a [`Row`].
104    pub fn row(id: T, row: Row, diff: Diff) -> BuiltinTableUpdate<T> {
105        BuiltinTableUpdate {
106            id,
107            data: TableData::Rows(vec![(row, diff)]),
108        }
109    }
110
111    pub fn batch(id: T, batch: ProtoBatch) -> BuiltinTableUpdate<T> {
112        BuiltinTableUpdate {
113            id,
114            data: TableData::Batches(smallvec![batch]),
115        }
116    }
117}
118
119impl CatalogState {
120    pub fn resolve_builtin_table_updates(
121        &self,
122        builtin_table_update: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
123    ) -> Vec<BuiltinTableUpdate<CatalogItemId>> {
124        builtin_table_update
125            .into_iter()
126            .map(|builtin_table_update| self.resolve_builtin_table_update(builtin_table_update))
127            .collect()
128    }
129
130    pub fn resolve_builtin_table_update(
131        &self,
132        BuiltinTableUpdate { id, data }: BuiltinTableUpdate<&'static BuiltinTable>,
133    ) -> BuiltinTableUpdate<CatalogItemId> {
134        let id = self.resolve_builtin_table(id);
135        BuiltinTableUpdate { id, data }
136    }
137
138    pub fn pack_depends_update(
139        &self,
140        depender: CatalogItemId,
141        dependee: CatalogItemId,
142        diff: Diff,
143    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
144        let row = Row::pack_slice(&[
145            Datum::String(&depender.to_string()),
146            Datum::String(&dependee.to_string()),
147        ]);
148        BuiltinTableUpdate::row(&*MZ_OBJECT_DEPENDENCIES, row, diff)
149    }
150
151    pub(super) fn pack_database_update(
152        &self,
153        database_id: &DatabaseId,
154        diff: Diff,
155    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
156        let database = self.get_database(database_id);
157        let row = self.pack_privilege_array_row(database.privileges());
158        let privileges = row.unpack_first();
159        BuiltinTableUpdate::row(
160            &*MZ_DATABASES,
161            Row::pack_slice(&[
162                Datum::String(&database.id.to_string()),
163                Datum::UInt32(database.oid),
164                Datum::String(database.name()),
165                Datum::String(&database.owner_id.to_string()),
166                privileges,
167            ]),
168            diff,
169        )
170    }
171
172    pub(super) fn pack_schema_update(
173        &self,
174        database_spec: &ResolvedDatabaseSpecifier,
175        schema_id: &SchemaId,
176        diff: Diff,
177    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
178        let (database_id, schema) = match database_spec {
179            ResolvedDatabaseSpecifier::Ambient => (None, &self.ambient_schemas_by_id[schema_id]),
180            ResolvedDatabaseSpecifier::Id(id) => (
181                Some(id.to_string()),
182                &self.database_by_id[id].schemas_by_id[schema_id],
183            ),
184        };
185        let row = self.pack_privilege_array_row(schema.privileges());
186        let privileges = row.unpack_first();
187        BuiltinTableUpdate::row(
188            &*MZ_SCHEMAS,
189            Row::pack_slice(&[
190                Datum::String(&schema_id.to_string()),
191                Datum::UInt32(schema.oid),
192                Datum::from(database_id.as_deref()),
193                Datum::String(&schema.name.schema),
194                Datum::String(&schema.owner_id.to_string()),
195                privileges,
196            ]),
197            diff,
198        )
199    }
200
201    pub(super) fn pack_role_update(
202        &self,
203        id: RoleId,
204        diff: Diff,
205    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
206        match id {
207            // PUBLIC role should not show up in mz_roles.
208            RoleId::Public => vec![],
209            id => {
210                let role = self.get_role(&id);
211                let role_update = BuiltinTableUpdate::row(
212                    &*MZ_ROLES,
213                    Row::pack_slice(&[
214                        Datum::String(&role.id.to_string()),
215                        Datum::UInt32(role.oid),
216                        Datum::String(&role.name),
217                        Datum::from(role.attributes.inherit),
218                    ]),
219                    diff,
220                );
221                let mut updates = vec![role_update];
222
223                // HACK/TODO(parkmycar): Creating an empty SessionVars like this is pretty hacky,
224                // we should instead have a static list of all session vars.
225                let session_vars_reference = SessionVars::new_unchecked(
226                    &mz_build_info::DUMMY_BUILD_INFO,
227                    SYSTEM_USER.clone(),
228                    None,
229                );
230
231                for (name, val) in role.vars() {
232                    let result = session_vars_reference
233                        .inspect(name)
234                        .and_then(|var| var.check(val.borrow()));
235                    let Ok(formatted_val) = result else {
236                        // Note: all variables should have been validated by this point, so we
237                        // shouldn't ever hit this.
238                        tracing::error!(?name, ?val, ?result, "found invalid role default var");
239                        continue;
240                    };
241
242                    let role_var_update = BuiltinTableUpdate::row(
243                        &*MZ_ROLE_PARAMETERS,
244                        Row::pack_slice(&[
245                            Datum::String(&role.id.to_string()),
246                            Datum::String(name),
247                            Datum::String(&formatted_val),
248                        ]),
249                        diff,
250                    );
251                    updates.push(role_var_update);
252                }
253
254                updates
255            }
256        }
257    }
258
259    pub(super) fn pack_role_members_update(
260        &self,
261        role_id: RoleId,
262        member_id: RoleId,
263        diff: Diff,
264    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
265        let grantor_id = self
266            .get_role(&member_id)
267            .membership
268            .map
269            .get(&role_id)
270            .expect("catalog out of sync");
271        BuiltinTableUpdate::row(
272            &*MZ_ROLE_MEMBERS,
273            Row::pack_slice(&[
274                Datum::String(&role_id.to_string()),
275                Datum::String(&member_id.to_string()),
276                Datum::String(&grantor_id.to_string()),
277            ]),
278            diff,
279        )
280    }
281
282    pub(super) fn pack_cluster_update(
283        &self,
284        name: &str,
285        diff: Diff,
286    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
287        let id = self.clusters_by_name[name];
288        let cluster = &self.clusters_by_id[&id];
289        let row = self.pack_privilege_array_row(cluster.privileges());
290        let privileges = row.unpack_first();
291        let (size, disk, replication_factor, azs, introspection_debugging, introspection_interval) =
292            match &cluster.config.variant {
293                ClusterVariant::Managed(config) => (
294                    Some(config.size.as_str()),
295                    Some(self.cluster_replica_size_has_disk(&config.size)),
296                    Some(config.replication_factor),
297                    if config.availability_zones.is_empty() {
298                        None
299                    } else {
300                        Some(config.availability_zones.clone())
301                    },
302                    Some(config.logging.log_logging),
303                    config.logging.interval.map(|d| {
304                        Interval::from_duration(&d)
305                            .expect("planning ensured this convertible back to interval")
306                    }),
307                ),
308                ClusterVariant::Unmanaged => (None, None, None, None, None, None),
309            };
310
311        let mut row = Row::default();
312        let mut packer = row.packer();
313        packer.extend([
314            Datum::String(&id.to_string()),
315            Datum::String(name),
316            Datum::String(&cluster.owner_id.to_string()),
317            privileges,
318            cluster.is_managed().into(),
319            size.into(),
320            replication_factor.into(),
321            disk.into(),
322        ]);
323        if let Some(azs) = azs {
324            packer.push_list(azs.iter().map(|az| Datum::String(az)));
325        } else {
326            packer.push(Datum::Null);
327        }
328        packer.push(Datum::from(introspection_debugging));
329        packer.push(Datum::from(introspection_interval));
330
331        let mut updates = Vec::new();
332
333        updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTERS, row, diff));
334
335        if let ClusterVariant::Managed(managed_config) = &cluster.config.variant {
336            let row = match managed_config.schedule {
337                ClusterSchedule::Manual => Row::pack_slice(&[
338                    Datum::String(&id.to_string()),
339                    Datum::String("manual"),
340                    Datum::Null,
341                ]),
342                ClusterSchedule::Refresh {
343                    hydration_time_estimate,
344                } => Row::pack_slice(&[
345                    Datum::String(&id.to_string()),
346                    Datum::String("on-refresh"),
347                    Datum::Interval(
348                        Interval::from_duration(&hydration_time_estimate)
349                            .expect("planning ensured that this is convertible back to Interval"),
350                    ),
351                ]),
352            };
353            updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTER_SCHEDULES, row, diff));
354        }
355
356        updates.push(BuiltinTableUpdate::row(
357            &*MZ_CLUSTER_WORKLOAD_CLASSES,
358            Row::pack_slice(&[
359                Datum::String(&id.to_string()),
360                Datum::from(cluster.config.workload_class.as_deref()),
361            ]),
362            diff,
363        ));
364
365        updates
366    }
367
368    pub(super) fn pack_cluster_replica_update(
369        &self,
370        cluster_id: ClusterId,
371        name: &str,
372        diff: Diff,
373    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
374        let cluster = &self.clusters_by_id[&cluster_id];
375        let id = cluster.replica_id(name).expect("Must exist");
376        let replica = cluster.replica(id).expect("Must exist");
377
378        let (size, disk, az, internal, pending) = match &replica.config.location {
379            // TODO(guswynn): The column should be `availability_zones`, not
380            // `availability_zone`.
381            ReplicaLocation::Managed(ManagedReplicaLocation {
382                size,
383                availability_zones: ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
384                allocation: _,
385                billed_as: _,
386                internal,
387                pending,
388            }) => (
389                Some(&**size),
390                Some(self.cluster_replica_size_has_disk(size)),
391                Some(az.as_str()),
392                *internal,
393                *pending,
394            ),
395            ReplicaLocation::Managed(ManagedReplicaLocation {
396                size,
397                availability_zones: _,
398                allocation: _,
399                billed_as: _,
400                internal,
401                pending,
402            }) => (
403                Some(&**size),
404                Some(self.cluster_replica_size_has_disk(size)),
405                None,
406                *internal,
407                *pending,
408            ),
409            _ => (None, None, None, false, false),
410        };
411
412        let cluster_replica_update = BuiltinTableUpdate::row(
413            &*MZ_CLUSTER_REPLICAS,
414            Row::pack_slice(&[
415                Datum::String(&id.to_string()),
416                Datum::String(name),
417                Datum::String(&cluster_id.to_string()),
418                Datum::from(size),
419                Datum::from(az),
420                Datum::String(&replica.owner_id.to_string()),
421                Datum::from(disk),
422            ]),
423            diff,
424        );
425
426        let mut updates = vec![cluster_replica_update];
427
428        if internal {
429            let update = BuiltinTableUpdate::row(
430                &*MZ_INTERNAL_CLUSTER_REPLICAS,
431                Row::pack_slice(&[Datum::String(&id.to_string())]),
432                diff,
433            );
434            updates.push(update);
435        }
436
437        if pending {
438            let update = BuiltinTableUpdate::row(
439                &*MZ_PENDING_CLUSTER_REPLICAS,
440                Row::pack_slice(&[Datum::String(&id.to_string())]),
441                diff,
442            );
443            updates.push(update);
444        }
445
446        updates
447    }
448
449    pub(crate) fn pack_network_policy_update(
450        &self,
451        policy_id: &NetworkPolicyId,
452        diff: Diff,
453    ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, Error> {
454        let policy = self.get_network_policy(policy_id);
455        let row = self.pack_privilege_array_row(&policy.privileges);
456        let privileges = row.unpack_first();
457        let mut updates = Vec::new();
458        for ref rule in policy.rules.clone() {
459            updates.push(BuiltinTableUpdate::row(
460                &*MZ_NETWORK_POLICY_RULES,
461                Row::pack_slice(&[
462                    Datum::String(&rule.name),
463                    Datum::String(&policy.id.to_string()),
464                    Datum::String(&rule.action.to_string()),
465                    Datum::String(&rule.address.to_string()),
466                    Datum::String(&rule.direction.to_string()),
467                ]),
468                diff,
469            ));
470        }
471        updates.push(BuiltinTableUpdate::row(
472            &*MZ_NETWORK_POLICIES,
473            Row::pack_slice(&[
474                Datum::String(&policy.id.to_string()),
475                Datum::String(&policy.name),
476                Datum::String(&policy.owner_id.to_string()),
477                privileges,
478                Datum::UInt32(policy.oid.clone()),
479            ]),
480            diff,
481        ));
482
483        Ok(updates)
484    }
485
486    pub(super) fn pack_item_update(
487        &self,
488        id: CatalogItemId,
489        diff: Diff,
490    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
491        let entry = self.get_entry(&id);
492        let oid = entry.oid();
493        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
494        let schema_id = &self
495            .get_schema(
496                &entry.name().qualifiers.database_spec,
497                &entry.name().qualifiers.schema_spec,
498                conn_id,
499            )
500            .id;
501        let name = &entry.name().item;
502        let owner_id = entry.owner_id();
503        let privileges_row = self.pack_privilege_array_row(entry.privileges());
504        let privileges = privileges_row.unpack_first();
505        let mut updates = match entry.item() {
506            CatalogItem::Log(_) => self.pack_source_update(
507                id, oid, schema_id, name, "log", None, None, None, None, None, owner_id,
508                privileges, diff, None,
509            ),
510            CatalogItem::Index(index) => {
511                self.pack_index_update(id, oid, name, owner_id, index, diff)
512            }
513            CatalogItem::Table(table) => {
514                let mut updates = self
515                    .pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);
516
517                if let TableDataSource::DataSource {
518                    desc: data_source,
519                    timeline: _,
520                } = &table.data_source
521                {
522                    updates.extend(match data_source {
523                        DataSourceDesc::IngestionExport {
524                            ingestion_id,
525                            external_reference: UnresolvedItemName(external_reference),
526                            details: _,
527                            data_config: _,
528                        } => {
529                            let ingestion_entry = self
530                                .get_entry(ingestion_id)
531                                .source_desc()
532                                .expect("primary source exists")
533                                .expect("primary source is a source");
534
535                            match ingestion_entry.connection.name() {
536                                "postgres" => {
537                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
538                                    // The left-most qualification of Postgres
539                                    // tables is the database, but this
540                                    // information is redundant because each
541                                    // Postgres connection connects to only one
542                                    // database.
543                                    let schema_name = external_reference[1].to_ast_string_simple();
544                                    let table_name = external_reference[2].to_ast_string_simple();
545
546                                    self.pack_postgres_source_tables_update(
547                                        id,
548                                        &schema_name,
549                                        &table_name,
550                                        diff,
551                                    )
552                                }
553                                "mysql" => {
554                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
555                                    let schema_name = external_reference[0].to_ast_string_simple();
556                                    let table_name = external_reference[1].to_ast_string_simple();
557
558                                    self.pack_mysql_source_tables_update(
559                                        id,
560                                        &schema_name,
561                                        &table_name,
562                                        diff,
563                                    )
564                                }
565                                "sql-server" => {
566                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
567                                    // The left-most qualification of SQL Server tables is
568                                    // the database, but this information is redundant
569                                    // because each SQL Server connection connects to
570                                    // only one database.
571                                    let schema_name = external_reference[1].to_ast_string_simple();
572                                    let table_name = external_reference[2].to_ast_string_simple();
573
574                                    self.pack_sql_server_source_table_update(
575                                        id,
576                                        &schema_name,
577                                        &table_name,
578                                        diff,
579                                    )
580                                }
581                                // Load generator sources don't have any special
582                                // updates.
583                                "load-generator" => vec![],
584                                "kafka" => {
585                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 1);
586                                    let topic = external_reference[0].to_ast_string_simple();
587                                    let envelope = data_source.envelope();
588                                    let (key_format, value_format) = data_source.formats();
589
590                                    self.pack_kafka_source_tables_update(
591                                        id,
592                                        &topic,
593                                        envelope,
594                                        key_format,
595                                        value_format,
596                                        diff,
597                                    )
598                                }
599                                s => unreachable!("{s} sources do not have tables"),
600                            }
601                        }
602                        _ => vec![],
603                    });
604                }
605
606                updates
607            }
608            CatalogItem::Source(source) => {
609                let source_type = source.source_type();
610                let connection_id = source.connection_id();
611                let envelope = source.data_source.envelope();
612                let cluster_entry = match source.data_source {
613                    // Ingestion exports don't have their own cluster, but
614                    // run on their ingestion's cluster.
615                    DataSourceDesc::IngestionExport { ingestion_id, .. } => {
616                        self.get_entry(&ingestion_id)
617                    }
618                    _ => entry,
619                };
620
621                let cluster_id = cluster_entry.item().cluster_id().map(|id| id.to_string());
622
623                let (key_format, value_format) = source.data_source.formats();
624
625                let mut updates = self.pack_source_update(
626                    id,
627                    oid,
628                    schema_id,
629                    name,
630                    source_type,
631                    connection_id,
632                    envelope,
633                    key_format,
634                    value_format,
635                    cluster_id.as_deref(),
636                    owner_id,
637                    privileges,
638                    diff,
639                    source.create_sql.as_ref(),
640                );
641
642                updates.extend(match &source.data_source {
643                    DataSourceDesc::Ingestion { ingestion_desc, .. } => {
644                        match &ingestion_desc.desc.connection {
645                            GenericSourceConnection::Postgres(postgres) => {
646                                self.pack_postgres_source_update(id, postgres, diff)
647                            }
648                            GenericSourceConnection::Kafka(kafka) => {
649                                self.pack_kafka_source_update(id, source.global_id(), kafka, diff)
650                            }
651                            _ => vec![],
652                        }
653                    }
654                    DataSourceDesc::IngestionExport {
655                        ingestion_id,
656                        external_reference: UnresolvedItemName(external_reference),
657                        details: _,
658                        data_config: _,
659                    } => {
660                        let ingestion_entry = self
661                            .get_entry(ingestion_id)
662                            .source_desc()
663                            .expect("primary source exists")
664                            .expect("primary source is a source");
665
666                        match ingestion_entry.connection.name() {
667                            "postgres" => {
668                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
669                                // The left-most qualification of Postgres
670                                // tables is the database, but this
671                                // information is redundant because each
672                                // Postgres connection connects to only one
673                                // database.
674                                let schema_name = external_reference[1].to_ast_string_simple();
675                                let table_name = external_reference[2].to_ast_string_simple();
676
677                                self.pack_postgres_source_tables_update(
678                                    id,
679                                    &schema_name,
680                                    &table_name,
681                                    diff,
682                                )
683                            }
684                            "mysql" => {
685                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
686                                let schema_name = external_reference[0].to_ast_string_simple();
687                                let table_name = external_reference[1].to_ast_string_simple();
688
689                                self.pack_mysql_source_tables_update(
690                                    id,
691                                    &schema_name,
692                                    &table_name,
693                                    diff,
694                                )
695                            }
696                            "sql-server" => {
697                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
698                                // The left-most qualification of SQL Server tables is
699                                // the database, but this information is redundant
700                                // because each SQL Server connection connects to
701                                // only one database.
702                                let schema_name = external_reference[1].to_ast_string_simple();
703                                let table_name = external_reference[2].to_ast_string_simple();
704
705                                self.pack_sql_server_source_table_update(
706                                    id,
707                                    &schema_name,
708                                    &table_name,
709                                    diff,
710                                )
711                            }
712                            // Load generator sources don't have any special
713                            // updates.
714                            "load-generator" => vec![],
715                            s => unreachable!("{s} sources do not have subsources"),
716                        }
717                    }
718                    DataSourceDesc::Webhook { .. } => {
719                        vec![self.pack_webhook_source_update(id, diff)]
720                    }
721                    _ => vec![],
722                });
723
724                updates
725            }
726            CatalogItem::View(view) => {
727                self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
728            }
729            CatalogItem::MaterializedView(mview) => self.pack_materialized_view_update(
730                id, oid, schema_id, name, owner_id, privileges, mview, diff,
731            ),
732            CatalogItem::Sink(sink) => {
733                self.pack_sink_update(id, oid, schema_id, name, owner_id, sink, diff)
734            }
735            CatalogItem::Type(ty) => {
736                self.pack_type_update(id, oid, schema_id, name, owner_id, privileges, ty, diff)
737            }
738            CatalogItem::Func(func) => {
739                self.pack_func_update(id, schema_id, name, owner_id, func, diff)
740            }
741            CatalogItem::Secret(_) => {
742                self.pack_secret_update(id, oid, schema_id, name, owner_id, privileges, diff)
743            }
744            CatalogItem::Connection(connection) => self.pack_connection_update(
745                id, oid, schema_id, name, owner_id, privileges, connection, diff,
746            ),
747            CatalogItem::ContinualTask(ct) => self.pack_continual_task_update(
748                id, oid, schema_id, name, owner_id, privileges, ct, diff,
749            ),
750        };
751
752        if !entry.item().is_temporary() {
753            // Populate or clean up the `mz_object_dependencies` table.
754            // TODO(jkosh44) Unclear if this table wants to include all uses or only references.
755            for dependee in entry.item().references().items() {
756                updates.push(self.pack_depends_update(id, *dependee, diff))
757            }
758        }
759
760        let full_name = self.resolve_full_name(entry.name(), entry.conn_id());
761        // Always report the latest for an objects columns.
762        if let Ok(desc) = entry.desc_latest(&full_name) {
763            let defaults = match entry.item() {
764                CatalogItem::Table(Table {
765                    data_source: TableDataSource::TableWrites { defaults },
766                    ..
767                }) => Some(defaults),
768                _ => None,
769            };
770            for (i, (column_name, column_type)) in desc.iter().enumerate() {
771                let default: Option<String> = defaults.map(|d| d[i].to_ast_string_stable());
772                let default: Datum = default
773                    .as_ref()
774                    .map(|d| Datum::String(d))
775                    .unwrap_or(Datum::Null);
776                let pgtype = mz_pgrepr::Type::from(&column_type.scalar_type);
777                let (type_name, type_oid) = match &column_type.scalar_type {
778                    SqlScalarType::List {
779                        custom_id: Some(custom_id),
780                        ..
781                    }
782                    | SqlScalarType::Map {
783                        custom_id: Some(custom_id),
784                        ..
785                    }
786                    | SqlScalarType::Record {
787                        custom_id: Some(custom_id),
788                        ..
789                    } => {
790                        let entry = self.get_entry(custom_id);
791                        // NOTE(benesch): the `mz_columns.type text` field is
792                        // wrong. Types do not have a name that can be
793                        // represented as a single textual field. There can be
794                        // multiple types with the same name in different
795                        // schemas and databases. We should eventually deprecate
796                        // the `type` field in favor of a new `type_id` field
797                        // that can be joined against `mz_types`.
798                        //
799                        // For now, in the interest of pragmatism, we just use
800                        // the type's item name, and accept that there may be
801                        // ambiguity if the same type name is used in multiple
802                        // schemas. The ambiguity is mitigated by the OID, which
803                        // can be joined against `mz_types.oid` to resolve the
804                        // ambiguity.
805                        let name = &*entry.name().item;
806                        let oid = entry.oid();
807                        (name, oid)
808                    }
809                    _ => (pgtype.name(), pgtype.oid()),
810                };
811                updates.push(BuiltinTableUpdate::row(
812                    &*MZ_COLUMNS,
813                    Row::pack_slice(&[
814                        Datum::String(&id.to_string()),
815                        Datum::String(column_name),
816                        Datum::UInt64(u64::cast_from(i + 1)),
817                        Datum::from(column_type.nullable),
818                        Datum::String(type_name),
819                        default,
820                        Datum::UInt32(type_oid),
821                        Datum::Int32(pgtype.typmod()),
822                    ]),
823                    diff,
824                ));
825            }
826        }
827
828        // Use initial lcw so that we can tell apart default from non-existent windows.
829        if let Some(cw) = entry.item().initial_logical_compaction_window() {
830            updates.push(self.pack_history_retention_strategy_update(id, cw, diff));
831        }
832
833        updates
834    }
835
836    fn pack_history_retention_strategy_update(
837        &self,
838        id: CatalogItemId,
839        cw: CompactionWindow,
840        diff: Diff,
841    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
842        let cw: u64 = cw.comparable_timestamp().into();
843        let cw = Jsonb::from_serde_json(serde_json::Value::Number(serde_json::Number::from(cw)))
844            .expect("must serialize");
845        BuiltinTableUpdate::row(
846            &*MZ_HISTORY_RETENTION_STRATEGIES,
847            Row::pack_slice(&[
848                Datum::String(&id.to_string()),
849                // FOR is the only strategy at the moment. We may introduce FROM or others later.
850                Datum::String("FOR"),
851                cw.into_row().into_element(),
852            ]),
853            diff,
854        )
855    }
856
857    fn pack_table_update(
858        &self,
859        id: CatalogItemId,
860        oid: u32,
861        schema_id: &SchemaSpecifier,
862        name: &str,
863        owner_id: &RoleId,
864        privileges: Datum,
865        diff: Diff,
866        table: &Table,
867    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
868        let redacted = table.create_sql.as_ref().map(|create_sql| {
869            mz_sql::parse::parse(create_sql)
870                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
871                .into_element()
872                .ast
873                .to_ast_string_redacted()
874        });
875        let source_id = if let TableDataSource::DataSource {
876            desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
877            ..
878        } = &table.data_source
879        {
880            Some(ingestion_id.to_string())
881        } else {
882            None
883        };
884
885        vec![BuiltinTableUpdate::row(
886            &*MZ_TABLES,
887            Row::pack_slice(&[
888                Datum::String(&id.to_string()),
889                Datum::UInt32(oid),
890                Datum::String(&schema_id.to_string()),
891                Datum::String(name),
892                Datum::String(&owner_id.to_string()),
893                privileges,
894                if let Some(create_sql) = &table.create_sql {
895                    Datum::String(create_sql)
896                } else {
897                    Datum::Null
898                },
899                if let Some(redacted) = &redacted {
900                    Datum::String(redacted)
901                } else {
902                    Datum::Null
903                },
904                if let Some(source_id) = source_id.as_ref() {
905                    Datum::String(source_id)
906                } else {
907                    Datum::Null
908                },
909            ]),
910            diff,
911        )]
912    }
913
914    fn pack_source_update(
915        &self,
916        id: CatalogItemId,
917        oid: u32,
918        schema_id: &SchemaSpecifier,
919        name: &str,
920        source_desc_name: &str,
921        connection_id: Option<CatalogItemId>,
922        envelope: Option<&str>,
923        key_format: Option<&str>,
924        value_format: Option<&str>,
925        cluster_id: Option<&str>,
926        owner_id: &RoleId,
927        privileges: Datum,
928        diff: Diff,
929        create_sql: Option<&String>,
930    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
931        let redacted = create_sql.map(|create_sql| {
932            let create_stmt = mz_sql::parse::parse(create_sql)
933                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
934                .into_element()
935                .ast;
936            create_stmt.to_ast_string_redacted()
937        });
938        vec![BuiltinTableUpdate::row(
939            &*MZ_SOURCES,
940            Row::pack_slice(&[
941                Datum::String(&id.to_string()),
942                Datum::UInt32(oid),
943                Datum::String(&schema_id.to_string()),
944                Datum::String(name),
945                Datum::String(source_desc_name),
946                Datum::from(connection_id.map(|id| id.to_string()).as_deref()),
947                // This is the "source size", which is a remnant from linked
948                // clusters.
949                Datum::Null,
950                Datum::from(envelope),
951                Datum::from(key_format),
952                Datum::from(value_format),
953                Datum::from(cluster_id),
954                Datum::String(&owner_id.to_string()),
955                privileges,
956                if let Some(create_sql) = create_sql {
957                    Datum::String(create_sql)
958                } else {
959                    Datum::Null
960                },
961                if let Some(redacted) = &redacted {
962                    Datum::String(redacted)
963                } else {
964                    Datum::Null
965                },
966            ]),
967            diff,
968        )]
969    }
970
971    fn pack_postgres_source_update(
972        &self,
973        id: CatalogItemId,
974        postgres: &PostgresSourceConnection<ReferencedConnection>,
975        diff: Diff,
976    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
977        vec![BuiltinTableUpdate::row(
978            &*MZ_POSTGRES_SOURCES,
979            Row::pack_slice(&[
980                Datum::String(&id.to_string()),
981                Datum::String(&postgres.publication_details.slot),
982                Datum::from(postgres.publication_details.timeline_id),
983            ]),
984            diff,
985        )]
986    }
987
988    fn pack_kafka_source_update(
989        &self,
990        item_id: CatalogItemId,
991        collection_id: GlobalId,
992        kafka: &KafkaSourceConnection<ReferencedConnection>,
993        diff: Diff,
994    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
995        vec![BuiltinTableUpdate::row(
996            &*MZ_KAFKA_SOURCES,
997            Row::pack_slice(&[
998                Datum::String(&item_id.to_string()),
999                Datum::String(&kafka.group_id(&self.config.connection_context, collection_id)),
1000                Datum::String(&kafka.topic),
1001            ]),
1002            diff,
1003        )]
1004    }
1005
1006    fn pack_postgres_source_tables_update(
1007        &self,
1008        id: CatalogItemId,
1009        schema_name: &str,
1010        table_name: &str,
1011        diff: Diff,
1012    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1013        vec![BuiltinTableUpdate::row(
1014            &*MZ_POSTGRES_SOURCE_TABLES,
1015            Row::pack_slice(&[
1016                Datum::String(&id.to_string()),
1017                Datum::String(schema_name),
1018                Datum::String(table_name),
1019            ]),
1020            diff,
1021        )]
1022    }
1023
1024    fn pack_mysql_source_tables_update(
1025        &self,
1026        id: CatalogItemId,
1027        schema_name: &str,
1028        table_name: &str,
1029        diff: Diff,
1030    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1031        vec![BuiltinTableUpdate::row(
1032            &*MZ_MYSQL_SOURCE_TABLES,
1033            Row::pack_slice(&[
1034                Datum::String(&id.to_string()),
1035                Datum::String(schema_name),
1036                Datum::String(table_name),
1037            ]),
1038            diff,
1039        )]
1040    }
1041
1042    fn pack_sql_server_source_table_update(
1043        &self,
1044        id: CatalogItemId,
1045        schema_name: &str,
1046        table_name: &str,
1047        diff: Diff,
1048    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1049        vec![BuiltinTableUpdate::row(
1050            &*MZ_SQL_SERVER_SOURCE_TABLES,
1051            Row::pack_slice(&[
1052                Datum::String(&id.to_string()),
1053                Datum::String(schema_name),
1054                Datum::String(table_name),
1055            ]),
1056            diff,
1057        )]
1058    }
1059
1060    fn pack_kafka_source_tables_update(
1061        &self,
1062        id: CatalogItemId,
1063        topic: &str,
1064        envelope: Option<&str>,
1065        key_format: Option<&str>,
1066        value_format: Option<&str>,
1067        diff: Diff,
1068    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1069        vec![BuiltinTableUpdate::row(
1070            &*MZ_KAFKA_SOURCE_TABLES,
1071            Row::pack_slice(&[
1072                Datum::String(&id.to_string()),
1073                Datum::String(topic),
1074                Datum::from(envelope),
1075                Datum::from(key_format),
1076                Datum::from(value_format),
1077            ]),
1078            diff,
1079        )]
1080    }
1081
1082    fn pack_connection_update(
1083        &self,
1084        id: CatalogItemId,
1085        oid: u32,
1086        schema_id: &SchemaSpecifier,
1087        name: &str,
1088        owner_id: &RoleId,
1089        privileges: Datum,
1090        connection: &Connection,
1091        diff: Diff,
1092    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1093        let create_stmt = mz_sql::parse::parse(&connection.create_sql)
1094            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", connection.create_sql))
1095            .into_element()
1096            .ast;
1097        let mut updates = vec![BuiltinTableUpdate::row(
1098            &*MZ_CONNECTIONS,
1099            Row::pack_slice(&[
1100                Datum::String(&id.to_string()),
1101                Datum::UInt32(oid),
1102                Datum::String(&schema_id.to_string()),
1103                Datum::String(name),
1104                Datum::String(match connection.details {
1105                    ConnectionDetails::Kafka { .. } => "kafka",
1106                    ConnectionDetails::Csr { .. } => "confluent-schema-registry",
1107                    ConnectionDetails::Postgres { .. } => "postgres",
1108                    ConnectionDetails::Aws(..) => "aws",
1109                    ConnectionDetails::AwsPrivatelink(..) => "aws-privatelink",
1110                    ConnectionDetails::Ssh { .. } => "ssh-tunnel",
1111                    ConnectionDetails::MySql { .. } => "mysql",
1112                    ConnectionDetails::SqlServer(_) => "sql-server",
1113                    ConnectionDetails::IcebergCatalog(_) => "iceberg-catalog",
1114                }),
1115                Datum::String(&owner_id.to_string()),
1116                privileges,
1117                Datum::String(&connection.create_sql),
1118                Datum::String(&create_stmt.to_ast_string_redacted()),
1119            ]),
1120            diff,
1121        )];
1122        match connection.details {
1123            ConnectionDetails::Kafka(ref kafka) => {
1124                updates.extend(self.pack_kafka_connection_update(id, kafka, diff));
1125            }
1126            ConnectionDetails::Aws(ref aws_config) => {
1127                match self.pack_aws_connection_update(id, aws_config, diff) {
1128                    Ok(update) => {
1129                        updates.push(update);
1130                    }
1131                    Err(e) => {
1132                        tracing::error!(%id, %e, "failed writing row to mz_aws_connections table");
1133                    }
1134                }
1135            }
1136            ConnectionDetails::AwsPrivatelink(_) => {
1137                if let Some(aws_principal_context) = self.aws_principal_context.as_ref() {
1138                    updates.push(self.pack_aws_privatelink_connection_update(
1139                        id,
1140                        aws_principal_context,
1141                        diff,
1142                    ));
1143                } else {
1144                    tracing::error!(%id, "missing AWS principal context; cannot write row to mz_aws_privatelink_connections table");
1145                }
1146            }
1147            ConnectionDetails::Ssh {
1148                ref key_1,
1149                ref key_2,
1150                ..
1151            } => {
1152                updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff));
1153            }
1154            ConnectionDetails::Csr(_)
1155            | ConnectionDetails::Postgres(_)
1156            | ConnectionDetails::MySql(_)
1157            | ConnectionDetails::SqlServer(_)
1158            | ConnectionDetails::IcebergCatalog(_) => (),
1159        };
1160        updates
1161    }
1162
1163    pub(crate) fn pack_ssh_tunnel_connection_update(
1164        &self,
1165        id: CatalogItemId,
1166        key_1: &SshKey,
1167        key_2: &SshKey,
1168        diff: Diff,
1169    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1170        BuiltinTableUpdate::row(
1171            &*MZ_SSH_TUNNEL_CONNECTIONS,
1172            Row::pack_slice(&[
1173                Datum::String(&id.to_string()),
1174                Datum::String(key_1.public_key().as_str()),
1175                Datum::String(key_2.public_key().as_str()),
1176            ]),
1177            diff,
1178        )
1179    }
1180
1181    fn pack_kafka_connection_update(
1182        &self,
1183        id: CatalogItemId,
1184        kafka: &KafkaConnection<ReferencedConnection>,
1185        diff: Diff,
1186    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1187        let progress_topic = kafka.progress_topic(&self.config.connection_context, id);
1188        let mut row = Row::default();
1189        row.packer()
1190            .try_push_array(
1191                &[ArrayDimension {
1192                    lower_bound: 1,
1193                    length: kafka.brokers.len(),
1194                }],
1195                kafka
1196                    .brokers
1197                    .iter()
1198                    .map(|broker| Datum::String(&broker.address)),
1199            )
1200            .expect("kafka.brokers is 1 dimensional, and its length is used for the array length");
1201        let brokers = row.unpack_first();
1202        vec![BuiltinTableUpdate::row(
1203            &*MZ_KAFKA_CONNECTIONS,
1204            Row::pack_slice(&[
1205                Datum::String(&id.to_string()),
1206                brokers,
1207                Datum::String(&progress_topic),
1208            ]),
1209            diff,
1210        )]
1211    }
1212
1213    pub fn pack_aws_privatelink_connection_update(
1214        &self,
1215        connection_id: CatalogItemId,
1216        aws_principal_context: &AwsPrincipalContext,
1217        diff: Diff,
1218    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1219        let id = &MZ_AWS_PRIVATELINK_CONNECTIONS;
1220        let row = Row::pack_slice(&[
1221            Datum::String(&connection_id.to_string()),
1222            Datum::String(&aws_principal_context.to_principal_string(connection_id)),
1223        ]);
1224        BuiltinTableUpdate::row(id, row, diff)
1225    }
1226
1227    pub fn pack_aws_connection_update(
1228        &self,
1229        connection_id: CatalogItemId,
1230        aws_config: &AwsConnection,
1231        diff: Diff,
1232    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, anyhow::Error> {
1233        let id = &MZ_AWS_CONNECTIONS;
1234
1235        let mut access_key_id = None;
1236        let mut access_key_id_secret_id = None;
1237        let mut secret_access_key_secret_id = None;
1238        let mut session_token = None;
1239        let mut session_token_secret_id = None;
1240        let mut assume_role_arn = None;
1241        let mut assume_role_session_name = None;
1242        let mut principal = None;
1243        let mut external_id = None;
1244        let mut example_trust_policy = None;
1245        match &aws_config.auth {
1246            AwsAuth::Credentials(credentials) => {
1247                match &credentials.access_key_id {
1248                    StringOrSecret::String(s) => access_key_id = Some(s.as_str()),
1249                    StringOrSecret::Secret(s) => access_key_id_secret_id = Some(s.to_string()),
1250                }
1251                secret_access_key_secret_id = Some(credentials.secret_access_key.to_string());
1252                match credentials.session_token.as_ref() {
1253                    None => (),
1254                    Some(StringOrSecret::String(s)) => session_token = Some(s.as_str()),
1255                    Some(StringOrSecret::Secret(s)) => {
1256                        session_token_secret_id = Some(s.to_string())
1257                    }
1258                }
1259            }
1260            AwsAuth::AssumeRole(assume_role) => {
1261                assume_role_arn = Some(assume_role.arn.as_str());
1262                assume_role_session_name = assume_role.session_name.as_deref();
1263                principal = self
1264                    .config
1265                    .connection_context
1266                    .aws_connection_role_arn
1267                    .as_deref();
1268                external_id =
1269                    Some(assume_role.external_id(&self.config.connection_context, connection_id)?);
1270                example_trust_policy = {
1271                    let policy = assume_role
1272                        .example_trust_policy(&self.config.connection_context, connection_id)?;
1273                    let policy = Jsonb::from_serde_json(policy).expect("valid json");
1274                    Some(policy.into_row())
1275                };
1276            }
1277        }
1278
1279        let row = Row::pack_slice(&[
1280            Datum::String(&connection_id.to_string()),
1281            Datum::from(aws_config.endpoint.as_deref()),
1282            Datum::from(aws_config.region.as_deref()),
1283            Datum::from(access_key_id),
1284            Datum::from(access_key_id_secret_id.as_deref()),
1285            Datum::from(secret_access_key_secret_id.as_deref()),
1286            Datum::from(session_token),
1287            Datum::from(session_token_secret_id.as_deref()),
1288            Datum::from(assume_role_arn),
1289            Datum::from(assume_role_session_name),
1290            Datum::from(principal),
1291            Datum::from(external_id.as_deref()),
1292            Datum::from(example_trust_policy.as_ref().map(|p| p.into_element())),
1293        ]);
1294
1295        Ok(BuiltinTableUpdate::row(id, row, diff))
1296    }
1297
1298    fn pack_view_update(
1299        &self,
1300        id: CatalogItemId,
1301        oid: u32,
1302        schema_id: &SchemaSpecifier,
1303        name: &str,
1304        owner_id: &RoleId,
1305        privileges: Datum,
1306        view: &View,
1307        diff: Diff,
1308    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1309        let create_stmt = mz_sql::parse::parse(&view.create_sql)
1310            .unwrap_or_else(|e| {
1311                panic!(
1312                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1313                    view.create_sql, e
1314                )
1315            })
1316            .into_element()
1317            .ast;
1318        let query = match &create_stmt {
1319            Statement::CreateView(stmt) => &stmt.definition.query,
1320            _ => unreachable!(),
1321        };
1322
1323        let mut query_string = query.to_ast_string_stable();
1324        // PostgreSQL appends a semicolon in `pg_views.definition`, we
1325        // do the same for compatibility's sake.
1326        query_string.push(';');
1327
1328        vec![BuiltinTableUpdate::row(
1329            &*MZ_VIEWS,
1330            Row::pack_slice(&[
1331                Datum::String(&id.to_string()),
1332                Datum::UInt32(oid),
1333                Datum::String(&schema_id.to_string()),
1334                Datum::String(name),
1335                Datum::String(&query_string),
1336                Datum::String(&owner_id.to_string()),
1337                privileges,
1338                Datum::String(&view.create_sql),
1339                Datum::String(&create_stmt.to_ast_string_redacted()),
1340            ]),
1341            diff,
1342        )]
1343    }
1344
1345    fn pack_materialized_view_update(
1346        &self,
1347        id: CatalogItemId,
1348        oid: u32,
1349        schema_id: &SchemaSpecifier,
1350        name: &str,
1351        owner_id: &RoleId,
1352        privileges: Datum,
1353        mview: &MaterializedView,
1354        diff: Diff,
1355    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1356        let create_stmt = mz_sql::parse::parse(&mview.create_sql)
1357            .unwrap_or_else(|e| {
1358                panic!(
1359                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1360                    mview.create_sql, e
1361                )
1362            })
1363            .into_element()
1364            .ast;
1365        let query_string = match &create_stmt {
1366            Statement::CreateMaterializedView(stmt) => {
1367                let mut query_string = stmt.query.to_ast_string_stable();
1368                // PostgreSQL appends a semicolon in `pg_matviews.definition`, we
1369                // do the same for compatibility's sake.
1370                query_string.push(';');
1371                query_string
1372            }
1373            _ => unreachable!(),
1374        };
1375
1376        let mut updates = Vec::new();
1377
1378        updates.push(BuiltinTableUpdate::row(
1379            &*MZ_MATERIALIZED_VIEWS,
1380            Row::pack_slice(&[
1381                Datum::String(&id.to_string()),
1382                Datum::UInt32(oid),
1383                Datum::String(&schema_id.to_string()),
1384                Datum::String(name),
1385                Datum::String(&mview.cluster_id.to_string()),
1386                Datum::String(&query_string),
1387                Datum::String(&owner_id.to_string()),
1388                privileges,
1389                Datum::String(&mview.create_sql),
1390                Datum::String(&create_stmt.to_ast_string_redacted()),
1391            ]),
1392            diff,
1393        ));
1394
1395        if let Some(refresh_schedule) = &mview.refresh_schedule {
1396            // This can't be `ON COMMIT`, because that is represented by a `None` instead of an
1397            // empty `RefreshSchedule`.
1398            assert!(!refresh_schedule.is_empty());
1399            for RefreshEvery {
1400                interval,
1401                aligned_to,
1402            } in refresh_schedule.everies.iter()
1403            {
1404                let aligned_to_dt = mz_ore::now::to_datetime(
1405                    <&Timestamp as TryInto<u64>>::try_into(aligned_to).expect("undoes planning"),
1406                );
1407                updates.push(BuiltinTableUpdate::row(
1408                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1409                    Row::pack_slice(&[
1410                        Datum::String(&id.to_string()),
1411                        Datum::String("every"),
1412                        Datum::Interval(
1413                            Interval::from_duration(interval).expect(
1414                                "planning ensured that this is convertible back to Interval",
1415                            ),
1416                        ),
1417                        Datum::TimestampTz(aligned_to_dt.try_into().expect("undoes planning")),
1418                        Datum::Null,
1419                    ]),
1420                    diff,
1421                ));
1422            }
1423            for at in refresh_schedule.ats.iter() {
1424                let at_dt = mz_ore::now::to_datetime(
1425                    <&Timestamp as TryInto<u64>>::try_into(at).expect("undoes planning"),
1426                );
1427                updates.push(BuiltinTableUpdate::row(
1428                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1429                    Row::pack_slice(&[
1430                        Datum::String(&id.to_string()),
1431                        Datum::String("at"),
1432                        Datum::Null,
1433                        Datum::Null,
1434                        Datum::TimestampTz(at_dt.try_into().expect("undoes planning")),
1435                    ]),
1436                    diff,
1437                ));
1438            }
1439        } else {
1440            updates.push(BuiltinTableUpdate::row(
1441                &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1442                Row::pack_slice(&[
1443                    Datum::String(&id.to_string()),
1444                    Datum::String("on-commit"),
1445                    Datum::Null,
1446                    Datum::Null,
1447                    Datum::Null,
1448                ]),
1449                diff,
1450            ));
1451        }
1452
1453        updates
1454    }
1455
1456    fn pack_continual_task_update(
1457        &self,
1458        id: CatalogItemId,
1459        oid: u32,
1460        schema_id: &SchemaSpecifier,
1461        name: &str,
1462        owner_id: &RoleId,
1463        privileges: Datum,
1464        ct: &ContinualTask,
1465        diff: Diff,
1466    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1467        let create_stmt = mz_sql::parse::parse(&ct.create_sql)
1468            .unwrap_or_else(|e| {
1469                panic!(
1470                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1471                    ct.create_sql, e
1472                )
1473            })
1474            .into_element()
1475            .ast;
1476        let query_string = match &create_stmt {
1477            Statement::CreateContinualTask(stmt) => {
1478                let mut query_string = String::new();
1479                for stmt in &stmt.stmts {
1480                    let s = match stmt {
1481                        ContinualTaskStmt::Insert(stmt) => stmt.to_ast_string_stable(),
1482                        ContinualTaskStmt::Delete(stmt) => stmt.to_ast_string_stable(),
1483                    };
1484                    if query_string.is_empty() {
1485                        query_string = s;
1486                    } else {
1487                        query_string.push_str("; ");
1488                        query_string.push_str(&s);
1489                    }
1490                }
1491                query_string
1492            }
1493            _ => unreachable!(),
1494        };
1495
1496        vec![BuiltinTableUpdate::row(
1497            &*MZ_CONTINUAL_TASKS,
1498            Row::pack_slice(&[
1499                Datum::String(&id.to_string()),
1500                Datum::UInt32(oid),
1501                Datum::String(&schema_id.to_string()),
1502                Datum::String(name),
1503                Datum::String(&ct.cluster_id.to_string()),
1504                Datum::String(&query_string),
1505                Datum::String(&owner_id.to_string()),
1506                privileges,
1507                Datum::String(&ct.create_sql),
1508                Datum::String(&create_stmt.to_ast_string_redacted()),
1509            ]),
1510            diff,
1511        )]
1512    }
1513
1514    fn pack_sink_update(
1515        &self,
1516        id: CatalogItemId,
1517        oid: u32,
1518        schema_id: &SchemaSpecifier,
1519        name: &str,
1520        owner_id: &RoleId,
1521        sink: &Sink,
1522        diff: Diff,
1523    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1524        let mut updates = vec![];
1525        match &sink.connection {
1526            StorageSinkConnection::Kafka(KafkaSinkConnection {
1527                topic: topic_name, ..
1528            }) => {
1529                updates.push(BuiltinTableUpdate::row(
1530                    &*MZ_KAFKA_SINKS,
1531                    Row::pack_slice(&[
1532                        Datum::String(&id.to_string()),
1533                        Datum::String(topic_name.as_str()),
1534                    ]),
1535                    diff,
1536                ));
1537            }
1538        };
1539
1540        let create_stmt = mz_sql::parse::parse(&sink.create_sql)
1541            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", sink.create_sql))
1542            .into_element()
1543            .ast;
1544
1545        let envelope = sink.envelope();
1546
1547        // The combined format string is used for the deprecated `format` column.
1548        let combined_format = sink.combined_format();
1549        let (key_format, value_format) = sink.formats();
1550
1551        updates.push(BuiltinTableUpdate::row(
1552            &*MZ_SINKS,
1553            Row::pack_slice(&[
1554                Datum::String(&id.to_string()),
1555                Datum::UInt32(oid),
1556                Datum::String(&schema_id.to_string()),
1557                Datum::String(name),
1558                Datum::String(sink.connection.name()),
1559                Datum::from(sink.connection_id().map(|id| id.to_string()).as_deref()),
1560                // size column now deprecated w/o linked clusters
1561                Datum::Null,
1562                Datum::from(envelope),
1563                Datum::from(combined_format.as_ref()),
1564                Datum::from(key_format),
1565                Datum::String(value_format),
1566                Datum::String(&sink.cluster_id.to_string()),
1567                Datum::String(&owner_id.to_string()),
1568                Datum::String(&sink.create_sql),
1569                Datum::String(&create_stmt.to_ast_string_redacted()),
1570            ]),
1571            diff,
1572        ));
1573
1574        updates
1575    }
1576
1577    fn pack_index_update(
1578        &self,
1579        id: CatalogItemId,
1580        oid: u32,
1581        name: &str,
1582        owner_id: &RoleId,
1583        index: &Index,
1584        diff: Diff,
1585    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1586        let mut updates = vec![];
1587
1588        let create_stmt = mz_sql::parse::parse(&index.create_sql)
1589            .unwrap_or_else(|e| {
1590                panic!(
1591                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1592                    index.create_sql, e
1593                )
1594            })
1595            .into_element()
1596            .ast;
1597
1598        let key_sqls = match &create_stmt {
1599            Statement::CreateIndex(CreateIndexStatement { key_parts, .. }) => key_parts
1600                .as_ref()
1601                .expect("key_parts is filled in during planning"),
1602            _ => unreachable!(),
1603        };
1604        let on_item_id = self.get_entry_by_global_id(&index.on).id();
1605
1606        updates.push(BuiltinTableUpdate::row(
1607            &*MZ_INDEXES,
1608            Row::pack_slice(&[
1609                Datum::String(&id.to_string()),
1610                Datum::UInt32(oid),
1611                Datum::String(name),
1612                Datum::String(&on_item_id.to_string()),
1613                Datum::String(&index.cluster_id.to_string()),
1614                Datum::String(&owner_id.to_string()),
1615                Datum::String(&index.create_sql),
1616                Datum::String(&create_stmt.to_ast_string_redacted()),
1617            ]),
1618            diff,
1619        ));
1620
1621        for (i, key) in index.keys.iter().enumerate() {
1622            let on_entry = self.get_entry_by_global_id(&index.on);
1623            let nullable = key
1624                .typ(
1625                    &on_entry
1626                        .desc(&self.resolve_full_name(on_entry.name(), on_entry.conn_id()))
1627                        .expect("can only create indexes on items with a valid description")
1628                        .typ()
1629                        .column_types,
1630                )
1631                .nullable;
1632            let seq_in_index = u64::cast_from(i + 1);
1633            let key_sql = key_sqls
1634                .get(i)
1635                .expect("missing sql information for index key")
1636                .to_ast_string_simple();
1637            let (field_number, expression) = match key {
1638                MirScalarExpr::Column(col, _) => {
1639                    (Datum::UInt64(u64::cast_from(*col + 1)), Datum::Null)
1640                }
1641                _ => (Datum::Null, Datum::String(&key_sql)),
1642            };
1643            updates.push(BuiltinTableUpdate::row(
1644                &*MZ_INDEX_COLUMNS,
1645                Row::pack_slice(&[
1646                    Datum::String(&id.to_string()),
1647                    Datum::UInt64(seq_in_index),
1648                    field_number,
1649                    expression,
1650                    Datum::from(nullable),
1651                ]),
1652                diff,
1653            ));
1654        }
1655
1656        updates
1657    }
1658
1659    fn pack_type_update(
1660        &self,
1661        id: CatalogItemId,
1662        oid: u32,
1663        schema_id: &SchemaSpecifier,
1664        name: &str,
1665        owner_id: &RoleId,
1666        privileges: Datum,
1667        typ: &Type,
1668        diff: Diff,
1669    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1670        let mut out = vec![];
1671
1672        let redacted = typ.create_sql.as_ref().map(|create_sql| {
1673            mz_sql::parse::parse(create_sql)
1674                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
1675                .into_element()
1676                .ast
1677                .to_ast_string_redacted()
1678        });
1679
1680        out.push(BuiltinTableUpdate::row(
1681            &*MZ_TYPES,
1682            Row::pack_slice(&[
1683                Datum::String(&id.to_string()),
1684                Datum::UInt32(oid),
1685                Datum::String(&schema_id.to_string()),
1686                Datum::String(name),
1687                Datum::String(&TypeCategory::from_catalog_type(&typ.details.typ).to_string()),
1688                Datum::String(&owner_id.to_string()),
1689                privileges,
1690                if let Some(create_sql) = &typ.create_sql {
1691                    Datum::String(create_sql)
1692                } else {
1693                    Datum::Null
1694                },
1695                if let Some(redacted) = &redacted {
1696                    Datum::String(redacted)
1697                } else {
1698                    Datum::Null
1699                },
1700            ]),
1701            diff,
1702        ));
1703
1704        let mut row = Row::default();
1705        let mut packer = row.packer();
1706
1707        fn append_modifier(packer: &mut RowPacker<'_>, mods: &[i64]) {
1708            if mods.is_empty() {
1709                packer.push(Datum::Null);
1710            } else {
1711                packer.push_list(mods.iter().map(|m| Datum::Int64(*m)));
1712            }
1713        }
1714
1715        let index_id = match &typ.details.typ {
1716            CatalogType::Array {
1717                element_reference: element_id,
1718            } => {
1719                packer.push(Datum::String(&id.to_string()));
1720                packer.push(Datum::String(&element_id.to_string()));
1721                &MZ_ARRAY_TYPES
1722            }
1723            CatalogType::List {
1724                element_reference: element_id,
1725                element_modifiers,
1726            } => {
1727                packer.push(Datum::String(&id.to_string()));
1728                packer.push(Datum::String(&element_id.to_string()));
1729                append_modifier(&mut packer, element_modifiers);
1730                &MZ_LIST_TYPES
1731            }
1732            CatalogType::Map {
1733                key_reference: key_id,
1734                value_reference: value_id,
1735                key_modifiers,
1736                value_modifiers,
1737            } => {
1738                packer.push(Datum::String(&id.to_string()));
1739                packer.push(Datum::String(&key_id.to_string()));
1740                packer.push(Datum::String(&value_id.to_string()));
1741                append_modifier(&mut packer, key_modifiers);
1742                append_modifier(&mut packer, value_modifiers);
1743                &MZ_MAP_TYPES
1744            }
1745            CatalogType::Pseudo => {
1746                packer.push(Datum::String(&id.to_string()));
1747                &MZ_PSEUDO_TYPES
1748            }
1749            _ => {
1750                packer.push(Datum::String(&id.to_string()));
1751                &MZ_BASE_TYPES
1752            }
1753        };
1754        out.push(BuiltinTableUpdate::row(index_id, row, diff));
1755
1756        if let Some(pg_metadata) = &typ.details.pg_metadata {
1757            out.push(BuiltinTableUpdate::row(
1758                &*MZ_TYPE_PG_METADATA,
1759                Row::pack_slice(&[
1760                    Datum::String(&id.to_string()),
1761                    Datum::UInt32(pg_metadata.typinput_oid),
1762                    Datum::UInt32(pg_metadata.typreceive_oid),
1763                ]),
1764                diff,
1765            ));
1766        }
1767
1768        out
1769    }
1770
1771    fn pack_func_update(
1772        &self,
1773        id: CatalogItemId,
1774        schema_id: &SchemaSpecifier,
1775        name: &str,
1776        owner_id: &RoleId,
1777        func: &Func,
1778        diff: Diff,
1779    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1780        let mut updates = vec![];
1781        for func_impl_details in func.inner.func_impls() {
1782            let arg_type_ids = func_impl_details
1783                .arg_typs
1784                .iter()
1785                .map(|typ| self.get_system_type(typ).id().to_string())
1786                .collect::<Vec<_>>();
1787
1788            let mut row = Row::default();
1789            row.packer()
1790                .try_push_array(
1791                    &[ArrayDimension {
1792                        lower_bound: 1,
1793                        length: arg_type_ids.len(),
1794                    }],
1795                    arg_type_ids.iter().map(|id| Datum::String(id)),
1796                )
1797                .expect(
1798                    "arg_type_ids is 1 dimensional, and its length is used for the array length",
1799                );
1800            let arg_type_ids = row.unpack_first();
1801
1802            updates.push(BuiltinTableUpdate::row(
1803                &*MZ_FUNCTIONS,
1804                Row::pack_slice(&[
1805                    Datum::String(&id.to_string()),
1806                    Datum::UInt32(func_impl_details.oid),
1807                    Datum::String(&schema_id.to_string()),
1808                    Datum::String(name),
1809                    arg_type_ids,
1810                    Datum::from(
1811                        func_impl_details
1812                            .variadic_typ
1813                            .map(|typ| self.get_system_type(typ).id().to_string())
1814                            .as_deref(),
1815                    ),
1816                    Datum::from(
1817                        func_impl_details
1818                            .return_typ
1819                            .map(|typ| self.get_system_type(typ).id().to_string())
1820                            .as_deref(),
1821                    ),
1822                    func_impl_details.return_is_set.into(),
1823                    Datum::String(&owner_id.to_string()),
1824                ]),
1825                diff,
1826            ));
1827
1828            if let mz_sql::func::Func::Aggregate(_) = func.inner {
1829                updates.push(BuiltinTableUpdate::row(
1830                    &*MZ_AGGREGATES,
1831                    Row::pack_slice(&[
1832                        Datum::UInt32(func_impl_details.oid),
1833                        // TODO(database-issues#1064): Support ordered-set aggregate functions.
1834                        Datum::String("n"),
1835                        Datum::Int16(0),
1836                    ]),
1837                    diff,
1838                ));
1839            }
1840        }
1841        updates
1842    }
1843
1844    pub fn pack_op_update(
1845        &self,
1846        operator: &str,
1847        func_impl_details: FuncImplCatalogDetails,
1848        diff: Diff,
1849    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1850        let arg_type_ids = func_impl_details
1851            .arg_typs
1852            .iter()
1853            .map(|typ| self.get_system_type(typ).id().to_string())
1854            .collect::<Vec<_>>();
1855
1856        let mut row = Row::default();
1857        row.packer()
1858            .try_push_array(
1859                &[ArrayDimension {
1860                    lower_bound: 1,
1861                    length: arg_type_ids.len(),
1862                }],
1863                arg_type_ids.iter().map(|id| Datum::String(id)),
1864            )
1865            .expect("arg_type_ids is 1 dimensional, and its length is used for the array length");
1866        let arg_type_ids = row.unpack_first();
1867
1868        BuiltinTableUpdate::row(
1869            &*MZ_OPERATORS,
1870            Row::pack_slice(&[
1871                Datum::UInt32(func_impl_details.oid),
1872                Datum::String(operator),
1873                arg_type_ids,
1874                Datum::from(
1875                    func_impl_details
1876                        .return_typ
1877                        .map(|typ| self.get_system_type(typ).id().to_string())
1878                        .as_deref(),
1879                ),
1880            ]),
1881            diff,
1882        )
1883    }
1884
1885    fn pack_secret_update(
1886        &self,
1887        id: CatalogItemId,
1888        oid: u32,
1889        schema_id: &SchemaSpecifier,
1890        name: &str,
1891        owner_id: &RoleId,
1892        privileges: Datum,
1893        diff: Diff,
1894    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1895        vec![BuiltinTableUpdate::row(
1896            &*MZ_SECRETS,
1897            Row::pack_slice(&[
1898                Datum::String(&id.to_string()),
1899                Datum::UInt32(oid),
1900                Datum::String(&schema_id.to_string()),
1901                Datum::String(name),
1902                Datum::String(&owner_id.to_string()),
1903                privileges,
1904            ]),
1905            diff,
1906        )]
1907    }
1908
1909    pub fn pack_audit_log_update(
1910        &self,
1911        event: &VersionedEvent,
1912        diff: Diff,
1913    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1914        let (event_type, object_type, details, user, occurred_at): (
1915            &EventType,
1916            &ObjectType,
1917            &EventDetails,
1918            &Option<String>,
1919            u64,
1920        ) = match event {
1921            VersionedEvent::V1(ev) => (
1922                &ev.event_type,
1923                &ev.object_type,
1924                &ev.details,
1925                &ev.user,
1926                ev.occurred_at,
1927            ),
1928        };
1929        let details = Jsonb::from_serde_json(details.as_json())
1930            .map_err(|e| {
1931                Error::new(ErrorKind::Unstructured(format!(
1932                    "could not pack audit log update: {}",
1933                    e
1934                )))
1935            })?
1936            .into_row();
1937        let details = details
1938            .iter()
1939            .next()
1940            .expect("details created above with a single jsonb column");
1941        let dt = mz_ore::now::to_datetime(occurred_at);
1942        let id = event.sortable_id();
1943        Ok(BuiltinTableUpdate::row(
1944            &*MZ_AUDIT_EVENTS,
1945            Row::pack_slice(&[
1946                Datum::UInt64(id),
1947                Datum::String(&format!("{}", event_type)),
1948                Datum::String(&format!("{}", object_type)),
1949                details,
1950                match user {
1951                    Some(user) => Datum::String(user),
1952                    None => Datum::Null,
1953                },
1954                Datum::TimestampTz(dt.try_into().expect("must fit")),
1955            ]),
1956            diff,
1957        ))
1958    }
1959
1960    pub fn pack_storage_usage_update(
1961        &self,
1962        VersionedStorageUsage::V1(event): VersionedStorageUsage,
1963        diff: Diff,
1964    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1965        let id = &MZ_STORAGE_USAGE_BY_SHARD;
1966        let row = Row::pack_slice(&[
1967            Datum::UInt64(event.id),
1968            Datum::from(event.shard_id.as_deref()),
1969            Datum::UInt64(event.size_bytes),
1970            Datum::TimestampTz(
1971                mz_ore::now::to_datetime(event.collection_timestamp)
1972                    .try_into()
1973                    .expect("must fit"),
1974            ),
1975        ]);
1976        BuiltinTableUpdate::row(id, row, diff)
1977    }
1978
1979    pub fn pack_egress_ip_update(
1980        &self,
1981        ip: &IpNet,
1982    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1983        let id = &MZ_EGRESS_IPS;
1984        let addr = ip.addr();
1985        let row = Row::pack_slice(&[
1986            Datum::String(&addr.to_string()),
1987            Datum::Int32(ip.prefix_len().into()),
1988            Datum::String(&format!("{}/{}", addr, ip.prefix_len())),
1989        ]);
1990        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
1991    }
1992
1993    pub fn pack_license_key_update(
1994        &self,
1995        license_key: &ValidatedLicenseKey,
1996    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1997        let id = &MZ_LICENSE_KEYS;
1998        let row = Row::pack_slice(&[
1999            Datum::String(&license_key.id),
2000            Datum::String(&license_key.organization),
2001            Datum::String(&license_key.environment_id),
2002            Datum::TimestampTz(
2003                mz_ore::now::to_datetime(license_key.expiration * 1000)
2004                    .try_into()
2005                    .expect("must fit"),
2006            ),
2007            Datum::TimestampTz(
2008                mz_ore::now::to_datetime(license_key.not_before * 1000)
2009                    .try_into()
2010                    .expect("must fit"),
2011            ),
2012        ]);
2013        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
2014    }
2015
2016    pub fn pack_all_replica_size_updates(&self) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2017        let mut updates = Vec::new();
2018        for (size, alloc) in &self.cluster_replica_sizes.0 {
2019            if alloc.disabled {
2020                continue;
2021            }
2022
2023            // Just invent something when the limits are `None`, which only happens in non-prod
2024            // environments (tests, process orchestrator, etc.)
2025            let cpu_limit = alloc.cpu_limit.unwrap_or(CpuLimit::MAX);
2026            let MemoryLimit(ByteSize(memory_bytes)) =
2027                (alloc.memory_limit).unwrap_or(MemoryLimit::MAX);
2028            let DiskLimit(ByteSize(disk_bytes)) =
2029                (alloc.disk_limit).unwrap_or(DiskLimit::ARBITRARY);
2030
2031            let mut disk_limit = disk_bytes;
2032
2033            // If swap is enabled, the memory limiter configuration might further reduce the disk
2034            // limit.
2035            let dyncfg = self.system_config().dyncfgs();
2036            if alloc.swap_enabled && MEMORY_LIMITER_INTERVAL.get(dyncfg) != Duration::ZERO {
2037                let swap_memory_limit = f64::cast_lossy(memory_bytes)
2038                    * MEMORY_LIMITER_USAGE_FACTOR.get(dyncfg)
2039                    * MEMORY_LIMITER_USAGE_BIAS.get(dyncfg);
2040                let swap_memory_limit = u64::cast_lossy(swap_memory_limit);
2041                let swap_disk_limit = swap_memory_limit.saturating_sub(memory_bytes);
2042                disk_limit = cmp::min(swap_disk_limit, disk_limit);
2043            }
2044
2045            let row = Row::pack_slice(&[
2046                size.as_str().into(),
2047                u64::from(alloc.scale).into(),
2048                u64::cast_from(alloc.workers).into(),
2049                cpu_limit.as_nanocpus().into(),
2050                memory_bytes.into(),
2051                disk_limit.into(),
2052                (alloc.credits_per_hour).into(),
2053            ]);
2054
2055            updates.push(BuiltinTableUpdate::row(
2056                &*MZ_CLUSTER_REPLICA_SIZES,
2057                row,
2058                Diff::ONE,
2059            ));
2060        }
2061
2062        updates
2063    }
2064
2065    pub fn pack_subscribe_update(
2066        &self,
2067        id: GlobalId,
2068        subscribe: &ActiveSubscribe,
2069        diff: Diff,
2070    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2071        let mut row = Row::default();
2072        let mut packer = row.packer();
2073        packer.push(Datum::String(&id.to_string()));
2074        packer.push(Datum::Uuid(subscribe.session_uuid));
2075        packer.push(Datum::String(&subscribe.cluster_id.to_string()));
2076
2077        let start_dt = mz_ore::now::to_datetime(subscribe.start_time);
2078        packer.push(Datum::TimestampTz(start_dt.try_into().expect("must fit")));
2079
2080        let depends_on: Vec<_> = subscribe
2081            .depends_on
2082            .iter()
2083            .map(|id| id.to_string())
2084            .collect();
2085        packer.push_list(depends_on.iter().map(|s| Datum::String(s)));
2086
2087        BuiltinTableUpdate::row(&*MZ_SUBSCRIPTIONS, row, diff)
2088    }
2089
2090    pub fn pack_session_update(
2091        &self,
2092        conn: &ConnMeta,
2093        diff: Diff,
2094    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2095        let connect_dt = mz_ore::now::to_datetime(conn.connected_at());
2096        BuiltinTableUpdate::row(
2097            &*MZ_SESSIONS,
2098            Row::pack_slice(&[
2099                Datum::Uuid(conn.uuid()),
2100                Datum::UInt32(conn.conn_id().unhandled()),
2101                Datum::String(&conn.authenticated_role_id().to_string()),
2102                Datum::from(conn.client_ip().map(|ip| ip.to_string()).as_deref()),
2103                Datum::TimestampTz(connect_dt.try_into().expect("must fit")),
2104            ]),
2105            diff,
2106        )
2107    }
2108
2109    pub fn pack_default_privileges_update(
2110        &self,
2111        default_privilege_object: &DefaultPrivilegeObject,
2112        grantee: &RoleId,
2113        acl_mode: &AclMode,
2114        diff: Diff,
2115    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2116        BuiltinTableUpdate::row(
2117            &*MZ_DEFAULT_PRIVILEGES,
2118            Row::pack_slice(&[
2119                default_privilege_object.role_id.to_string().as_str().into(),
2120                default_privilege_object
2121                    .database_id
2122                    .map(|database_id| database_id.to_string())
2123                    .as_deref()
2124                    .into(),
2125                default_privilege_object
2126                    .schema_id
2127                    .map(|schema_id| schema_id.to_string())
2128                    .as_deref()
2129                    .into(),
2130                default_privilege_object
2131                    .object_type
2132                    .to_string()
2133                    .to_lowercase()
2134                    .as_str()
2135                    .into(),
2136                grantee.to_string().as_str().into(),
2137                acl_mode.to_string().as_str().into(),
2138            ]),
2139            diff,
2140        )
2141    }
2142
2143    pub fn pack_system_privileges_update(
2144        &self,
2145        privileges: MzAclItem,
2146        diff: Diff,
2147    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2148        BuiltinTableUpdate::row(
2149            &*MZ_SYSTEM_PRIVILEGES,
2150            Row::pack_slice(&[privileges.into()]),
2151            diff,
2152        )
2153    }
2154
2155    fn pack_privilege_array_row(&self, privileges: &PrivilegeMap) -> Row {
2156        let mut row = Row::default();
2157        let flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2158        row.packer()
2159            .try_push_array(
2160                &[ArrayDimension {
2161                    lower_bound: 1,
2162                    length: flat_privileges.len(),
2163                }],
2164                flat_privileges
2165                    .into_iter()
2166                    .map(|mz_acl_item| Datum::MzAclItem(mz_acl_item.clone())),
2167            )
2168            .expect("privileges is 1 dimensional, and its length is used for the array length");
2169        row
2170    }
2171
2172    pub fn pack_comment_update(
2173        &self,
2174        object_id: CommentObjectId,
2175        column_pos: Option<usize>,
2176        comment: &str,
2177        diff: Diff,
2178    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2179        // Use the audit log representation so it's easier to join against.
2180        let object_type = mz_sql::catalog::ObjectType::from(object_id);
2181        let audit_type = super::object_type_to_audit_object_type(object_type);
2182        let object_type_str = audit_type.to_string();
2183
2184        let object_id_str = match object_id {
2185            CommentObjectId::Table(global_id)
2186            | CommentObjectId::View(global_id)
2187            | CommentObjectId::MaterializedView(global_id)
2188            | CommentObjectId::Source(global_id)
2189            | CommentObjectId::Sink(global_id)
2190            | CommentObjectId::Index(global_id)
2191            | CommentObjectId::Func(global_id)
2192            | CommentObjectId::Connection(global_id)
2193            | CommentObjectId::Secret(global_id)
2194            | CommentObjectId::Type(global_id)
2195            | CommentObjectId::ContinualTask(global_id) => global_id.to_string(),
2196            CommentObjectId::Role(role_id) => role_id.to_string(),
2197            CommentObjectId::Database(database_id) => database_id.to_string(),
2198            CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
2199            CommentObjectId::Cluster(cluster_id) => cluster_id.to_string(),
2200            CommentObjectId::ClusterReplica((_, replica_id)) => replica_id.to_string(),
2201            CommentObjectId::NetworkPolicy(network_policy_id) => network_policy_id.to_string(),
2202        };
2203        let column_pos_datum = match column_pos {
2204            Some(pos) => {
2205                // TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
2206                let pos =
2207                    i32::try_from(pos).expect("we constrain this value in the planning layer");
2208                Datum::Int32(pos)
2209            }
2210            None => Datum::Null,
2211        };
2212
2213        BuiltinTableUpdate::row(
2214            &*MZ_COMMENTS,
2215            Row::pack_slice(&[
2216                Datum::String(&object_id_str),
2217                Datum::String(&object_type_str),
2218                column_pos_datum,
2219                Datum::String(comment),
2220            ]),
2221            diff,
2222        )
2223    }
2224
2225    pub fn pack_webhook_source_update(
2226        &self,
2227        item_id: CatalogItemId,
2228        diff: Diff,
2229    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2230        let url = self
2231            .try_get_webhook_url(&item_id)
2232            .expect("webhook source should exist");
2233        let url = url.to_string();
2234        let name = &self.get_entry(&item_id).name().item;
2235        let id_str = item_id.to_string();
2236
2237        BuiltinTableUpdate::row(
2238            &*MZ_WEBHOOKS_SOURCES,
2239            Row::pack_slice(&[
2240                Datum::String(&id_str),
2241                Datum::String(name),
2242                Datum::String(&url),
2243            ]),
2244            diff,
2245        )
2246    }
2247
2248    pub fn pack_source_references_update(
2249        &self,
2250        source_references: &SourceReferences,
2251        diff: Diff,
2252    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2253        let source_id = source_references.source_id.to_string();
2254        let updated_at = &source_references.updated_at;
2255        source_references
2256            .references
2257            .iter()
2258            .map(|reference| {
2259                let mut row = Row::default();
2260                let mut packer = row.packer();
2261                packer.extend([
2262                    Datum::String(&source_id),
2263                    reference
2264                        .namespace
2265                        .as_ref()
2266                        .map(|s| Datum::String(s))
2267                        .unwrap_or(Datum::Null),
2268                    Datum::String(&reference.name),
2269                    Datum::TimestampTz(
2270                        mz_ore::now::to_datetime(*updated_at)
2271                            .try_into()
2272                            .expect("must fit"),
2273                    ),
2274                ]);
2275                if reference.columns.len() > 0 {
2276                    packer
2277                        .try_push_array(
2278                            &[ArrayDimension {
2279                                lower_bound: 1,
2280                                length: reference.columns.len(),
2281                            }],
2282                            reference.columns.iter().map(|col| Datum::String(col)),
2283                        )
2284                        .expect(
2285                            "columns is 1 dimensional, and its length is used for the array length",
2286                        );
2287                } else {
2288                    packer.push(Datum::Null);
2289                }
2290
2291                BuiltinTableUpdate::row(&*MZ_SOURCE_REFERENCES, row, diff)
2292            })
2293            .collect()
2294    }
2295}