mod notice;

use bytesize::ByteSize;
use ipnet::IpNet;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::dyncfgs::ENABLE_PASSWORD_AUTH;
use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::builtin::{
    BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
    MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICAS,
    MZ_CLUSTER_SCHEDULES, MZ_CLUSTER_WORKLOAD_CLASSES, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS,
    MZ_CONNECTIONS, MZ_CONTINUAL_TASKS, MZ_DATABASES, MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS,
    MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES, MZ_ICEBERG_SINKS, MZ_INDEX_COLUMNS, MZ_INDEXES,
    MZ_INTERNAL_CLUSTER_REPLICAS, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCE_TABLES,
    MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES,
    MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MATERIALIZED_VIEWS, MZ_MYSQL_SOURCE_TABLES,
    MZ_NETWORK_POLICIES, MZ_NETWORK_POLICY_RULES, MZ_OBJECT_DEPENDENCIES, MZ_OPERATORS,
    MZ_PENDING_CLUSTER_REPLICAS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES, MZ_PSEUDO_TYPES,
    MZ_ROLE_AUTH, MZ_ROLE_MEMBERS, MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SCHEMAS, MZ_SECRETS,
    MZ_SESSIONS, MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SOURCES, MZ_SQL_SERVER_SOURCE_TABLES,
    MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES,
    MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
};
use mz_catalog::config::AwsPrincipalContext;
use mz_catalog::durable::SourceReferences;
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::{
    CatalogItem, ClusterVariant, Connection, ContinualTask, DataSourceDesc, Func, Index,
    MaterializedView, Sink, Table, TableDataSource, Type, View,
};
use mz_controller::clusters::{
    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
};
use mz_controller_types::ClusterId;
use mz_expr::MirScalarExpr;
use mz_license_keys::ValidatedLicenseKey;
use mz_orchestrator::{CpuLimit, DiskLimit, MemoryLimit};
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_persist_client::batch::ProtoBatch;
use mz_repr::adt::array::ArrayDimension;
use mz_repr::adt::interval::Interval;
use mz_repr::adt::jsonb::Jsonb;
use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
use mz_repr::adt::regex;
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::refresh_schedule::RefreshEvery;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, Row, RowPacker, SqlScalarType, Timestamp};
use mz_sql::ast::{ContinualTaskStmt, CreateIndexStatement, Statement, UnresolvedItemName};
use mz_sql::catalog::{
    CatalogCluster, CatalogDatabase, CatalogSchema, CatalogType, DefaultPrivilegeObject,
    TypeCategory,
};
use mz_sql::func::FuncImplCatalogDetails;
use mz_sql::names::{
    CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier,
};
use mz_sql::plan::{ClusterSchedule, ConnectionDetails, SshKey};
use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
use mz_sql::session::vars::SessionVars;
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_client::client::TableData;
use mz_storage_types::connections::KafkaConnection;
use mz_storage_types::connections::aws::{AwsAuth, AwsConnection};
use mz_storage_types::connections::inline::ReferencedConnection;
use mz_storage_types::connections::string_or_secret::StringOrSecret;
use mz_storage_types::sinks::{IcebergSinkConnection, KafkaSinkConnection, StorageSinkConnection};
use mz_storage_types::sources::{
    GenericSourceConnection, KafkaSourceConnection, PostgresSourceConnection, SourceConnection,
};
use smallvec::smallvec;

use crate::active_compute_sink::ActiveSubscribe;
use crate::catalog::CatalogState;
use crate::coord::ConnMeta;

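/// A pending update to a single builtin table: the target table (a resolved
/// [`CatalogItemId`], or a `&'static BuiltinTable` reference before
/// resolution) plus the data to apply, expressed either as rows with diffs or
/// as pre-built batches (see [`TableData`]).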
#[derive(Debug, Clone)]
pub struct BuiltinTableUpdate<T = CatalogItemId> {
    pub id: T,
    pub data: TableData,
}

impl<T> BuiltinTableUpdate<T> {
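    /// Creates an update that applies a single `(row, diff)` pair to the table `id`.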
    pub fn row(id: T, row: Row, diff: Diff) -> BuiltinTableUpdate<T> {
        BuiltinTableUpdate {
            id,
            data: TableData::Rows(vec![(row, diff)]),
        }
    }

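    /// Creates an update from a pre-built persist [`ProtoBatch`] targeting the table `id`.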
    pub fn batch(id: T, batch: ProtoBatch) -> BuiltinTableUpdate<T> {
        BuiltinTableUpdate {
            id,
            data: TableData::Batches(smallvec![batch]),
        }
    }
}

impl CatalogState {
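    /// Resolves the `&'static BuiltinTable` references in a batch of updates
    /// to concrete [`CatalogItemId`]s; see [`Self::resolve_builtin_table_update`].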
    pub fn resolve_builtin_table_updates(
        &self,
        builtin_table_update: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    ) -> Vec<BuiltinTableUpdate<CatalogItemId>> {
        builtin_table_update
            .into_iter()
            .map(|builtin_table_update| self.resolve_builtin_table_update(builtin_table_update))
            .collect()
    }

    pub fn resolve_builtin_table_update(
        &self,
        BuiltinTableUpdate { id, data }: BuiltinTableUpdate<&'static BuiltinTable>,
    ) -> BuiltinTableUpdate<CatalogItemId> {
        let id = self.resolve_builtin_table(id);
        BuiltinTableUpdate { id, data }
    }

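    /// Packs a row for [`MZ_OBJECT_DEPENDENCIES`] recording that `depender`
    /// depends on `dependee`.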
    pub fn pack_depends_update(
        &self,
        depender: CatalogItemId,
        dependee: CatalogItemId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let row = Row::pack_slice(&[
            Datum::String(&depender.to_string()),
            Datum::String(&dependee.to_string()),
        ]);
        BuiltinTableUpdate::row(&*MZ_OBJECT_DEPENDENCIES, row, diff)
    }

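    /// Packs a row for [`MZ_DATABASES`] describing the database `database_id`
    /// (id, oid, name, owner, and privileges).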
    pub(super) fn pack_database_update(
        &self,
        database_id: &DatabaseId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let database = self.get_database(database_id);
        let row = self.pack_privilege_array_row(database.privileges());
        let privileges = row.unpack_first();
        BuiltinTableUpdate::row(
            &*MZ_DATABASES,
            Row::pack_slice(&[
                Datum::String(&database.id.to_string()),
                Datum::UInt32(database.oid),
                Datum::String(database.name()),
                Datum::String(&database.owner_id.to_string()),
                privileges,
            ]),
            diff,
        )
    }

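    /// Packs a row for [`MZ_SCHEMAS`] describing `schema_id`, resolving the
    /// schema from either the ambient namespace or its containing database.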
    pub(super) fn pack_schema_update(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_id: &SchemaId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let (database_id, schema) = match database_spec {
            ResolvedDatabaseSpecifier::Ambient => (None, &self.ambient_schemas_by_id[schema_id]),
            ResolvedDatabaseSpecifier::Id(id) => (
                Some(id.to_string()),
                &self.database_by_id[id].schemas_by_id[schema_id],
            ),
        };
        let row = self.pack_privilege_array_row(schema.privileges());
        let privileges = row.unpack_first();
        BuiltinTableUpdate::row(
            &*MZ_SCHEMAS,
            Row::pack_slice(&[
                Datum::String(&schema_id.to_string()),
                Datum::UInt32(schema.oid),
                Datum::from(database_id.as_deref()),
                Datum::String(&schema.name.schema),
                Datum::String(&schema.owner_id.to_string()),
                privileges,
            ]),
            diff,
        )
    }

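    /// Packs a row for [`MZ_ROLE_AUTH`] containing the role's password hash
    /// (if any) and the time the hash was last updated.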
    pub(super) fn pack_role_auth_update(
        &self,
        id: RoleId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let role_auth = self.get_role_auth(&id);
        let role = self.get_role(&id);
        BuiltinTableUpdate::row(
            &*MZ_ROLE_AUTH,
            Row::pack_slice(&[
                Datum::String(&role_auth.role_id.to_string()),
                Datum::UInt32(role.oid),
                match &role_auth.password_hash {
                    Some(hash) => Datum::String(hash),
                    None => Datum::Null,
                },
                Datum::TimestampTz(
                    mz_ore::now::to_datetime(role_auth.updated_at)
                        .try_into()
                        .expect("must fit"),
                ),
            ]),
            diff,
        )
    }

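    /// Packs updates for [`MZ_ROLES`] and [`MZ_ROLE_PARAMETERS`] for the given
    /// role. The reported login and superuser flags depend on whether password
    /// authentication is enabled; the public pseudo-role produces no updates.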
    pub(super) fn pack_role_update(
        &self,
        id: RoleId,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        match id {
            RoleId::Public => vec![],
            id => {
                let role = self.get_role(&id);
                let self_managed_auth_enabled = self
                    .system_config()
                    .get(ENABLE_PASSWORD_AUTH.name())
                    .ok()
                    .map(|v| v.value() == "on")
                    .unwrap_or(false);

                let builtin_supers = [MZ_SYSTEM_ROLE_ID, MZ_SUPPORT_ROLE_ID];

                let cloud_login_regex = "^[^@]+@[^@]+\\.[^@]+$";
                let matches = regex::Regex::new(cloud_login_regex, true)
                    .expect("valid regex")
                    .is_match(&role.name);

                let rolcanlogin = if self_managed_auth_enabled {
                    role.attributes.login.unwrap_or(false)
                } else {
                    builtin_supers.contains(&role.id) || matches
                };

                let rolsuper = if self_managed_auth_enabled {
                    Datum::from(role.attributes.superuser.unwrap_or(false))
                } else if builtin_supers.contains(&role.id) {
                    Datum::from(true)
                } else {
                    Datum::Null
                };

                let role_update = BuiltinTableUpdate::row(
                    &*MZ_ROLES,
                    Row::pack_slice(&[
                        Datum::String(&role.id.to_string()),
                        Datum::UInt32(role.oid),
                        Datum::String(&role.name),
                        Datum::from(role.attributes.inherit),
                        Datum::from(rolcanlogin),
                        rolsuper,
                    ]),
                    diff,
                );
                let mut updates = vec![role_update];

                let session_vars_reference = SessionVars::new_unchecked(
                    &mz_build_info::DUMMY_BUILD_INFO,
                    SYSTEM_USER.clone(),
                    None,
                );

                for (name, val) in role.vars() {
                    let result = session_vars_reference
                        .inspect(name)
                        .and_then(|var| var.check(val.borrow()));
                    let Ok(formatted_val) = result else {
                        tracing::error!(?name, ?val, ?result, "found invalid role default var");
                        continue;
                    };

                    let role_var_update = BuiltinTableUpdate::row(
                        &*MZ_ROLE_PARAMETERS,
                        Row::pack_slice(&[
                            Datum::String(&role.id.to_string()),
                            Datum::String(name),
                            Datum::String(&formatted_val),
                        ]),
                        diff,
                    );
                    updates.push(role_var_update);
                }

                updates
            }
        }
    }

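    /// Packs a row for [`MZ_ROLE_MEMBERS`] recording that `member_id` is a
    /// member of `role_id`, along with the grantor of that membership.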
    pub(super) fn pack_role_members_update(
        &self,
        role_id: RoleId,
        member_id: RoleId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let grantor_id = self
            .get_role(&member_id)
            .membership
            .map
            .get(&role_id)
            .expect("catalog out of sync");
        BuiltinTableUpdate::row(
            &*MZ_ROLE_MEMBERS,
            Row::pack_slice(&[
                Datum::String(&role_id.to_string()),
                Datum::String(&member_id.to_string()),
                Datum::String(&grantor_id.to_string()),
            ]),
            diff,
        )
    }

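    /// Packs updates for [`MZ_CLUSTERS`], [`MZ_CLUSTER_SCHEDULES`], and
    /// [`MZ_CLUSTER_WORKLOAD_CLASSES`] for the named cluster. Managed clusters
    /// additionally report size, replication factor, availability zones, and
    /// introspection settings.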
    pub(super) fn pack_cluster_update(
        &self,
        name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let id = self.clusters_by_name[name];
        let cluster = &self.clusters_by_id[&id];
        let row = self.pack_privilege_array_row(cluster.privileges());
        let privileges = row.unpack_first();
        let (size, disk, replication_factor, azs, introspection_debugging, introspection_interval) =
            match &cluster.config.variant {
                ClusterVariant::Managed(config) => (
                    Some(config.size.as_str()),
                    Some(self.cluster_replica_size_has_disk(&config.size)),
                    Some(config.replication_factor),
                    if config.availability_zones.is_empty() {
                        None
                    } else {
                        Some(config.availability_zones.clone())
                    },
                    Some(config.logging.log_logging),
                    config.logging.interval.map(|d| {
                        Interval::from_duration(&d)
                            .expect("planning ensured this convertible back to interval")
                    }),
                ),
                ClusterVariant::Unmanaged => (None, None, None, None, None, None),
            };

        let mut row = Row::default();
        let mut packer = row.packer();
        packer.extend([
            Datum::String(&id.to_string()),
            Datum::String(name),
            Datum::String(&cluster.owner_id.to_string()),
            privileges,
            cluster.is_managed().into(),
            size.into(),
            replication_factor.into(),
            disk.into(),
        ]);
        if let Some(azs) = azs {
            packer.push_list(azs.iter().map(|az| Datum::String(az)));
        } else {
            packer.push(Datum::Null);
        }
        packer.push(Datum::from(introspection_debugging));
        packer.push(Datum::from(introspection_interval));

        let mut updates = Vec::new();

        updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTERS, row, diff));

        if let ClusterVariant::Managed(managed_config) = &cluster.config.variant {
            let row = match managed_config.schedule {
                ClusterSchedule::Manual => Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String("manual"),
                    Datum::Null,
                ]),
                ClusterSchedule::Refresh {
                    hydration_time_estimate,
                } => Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String("on-refresh"),
                    Datum::Interval(
                        Interval::from_duration(&hydration_time_estimate)
                            .expect("planning ensured that this is convertible back to Interval"),
                    ),
                ]),
            };
            updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTER_SCHEDULES, row, diff));
        }

        updates.push(BuiltinTableUpdate::row(
            &*MZ_CLUSTER_WORKLOAD_CLASSES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::from(cluster.config.workload_class.as_deref()),
            ]),
            diff,
        ));

        updates
    }

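    /// Packs a row for [`MZ_CLUSTER_REPLICAS`] for the named replica, plus
    /// rows in [`MZ_INTERNAL_CLUSTER_REPLICAS`] and
    /// [`MZ_PENDING_CLUSTER_REPLICAS`] when the replica is internal or pending.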
    pub(super) fn pack_cluster_replica_update(
        &self,
        cluster_id: ClusterId,
        name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let cluster = &self.clusters_by_id[&cluster_id];
        let id = cluster.replica_id(name).expect("Must exist");
        let replica = cluster.replica(id).expect("Must exist");

        let (size, disk, az, internal, pending) = match &replica.config.location {
            ReplicaLocation::Managed(ManagedReplicaLocation {
                size,
                availability_zones: ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
                allocation: _,
                billed_as: _,
                internal,
                pending,
            }) => (
                Some(&**size),
                Some(self.cluster_replica_size_has_disk(size)),
                Some(az.as_str()),
                *internal,
                *pending,
            ),
            ReplicaLocation::Managed(ManagedReplicaLocation {
                size,
                availability_zones: _,
                allocation: _,
                billed_as: _,
                internal,
                pending,
            }) => (
                Some(&**size),
                Some(self.cluster_replica_size_has_disk(size)),
                None,
                *internal,
                *pending,
            ),
            _ => (None, None, None, false, false),
        };

        let cluster_replica_update = BuiltinTableUpdate::row(
            &*MZ_CLUSTER_REPLICAS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(name),
                Datum::String(&cluster_id.to_string()),
                Datum::from(size),
                Datum::from(az),
                Datum::String(&replica.owner_id.to_string()),
                Datum::from(disk),
            ]),
            diff,
        );

        let mut updates = vec![cluster_replica_update];

        if internal {
            let update = BuiltinTableUpdate::row(
                &*MZ_INTERNAL_CLUSTER_REPLICAS,
                Row::pack_slice(&[Datum::String(&id.to_string())]),
                diff,
            );
            updates.push(update);
        }

        if pending {
            let update = BuiltinTableUpdate::row(
                &*MZ_PENDING_CLUSTER_REPLICAS,
                Row::pack_slice(&[Datum::String(&id.to_string())]),
                diff,
            );
            updates.push(update);
        }

        updates
    }

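    /// Packs updates for [`MZ_NETWORK_POLICIES`] plus one
    /// [`MZ_NETWORK_POLICY_RULES`] row per rule of the given policy.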
    pub(crate) fn pack_network_policy_update(
        &self,
        policy_id: &NetworkPolicyId,
        diff: Diff,
    ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, Error> {
        let policy = self.get_network_policy(policy_id);
        let row = self.pack_privilege_array_row(&policy.privileges);
        let privileges = row.unpack_first();
        let mut updates = Vec::new();
        for ref rule in policy.rules.clone() {
            updates.push(BuiltinTableUpdate::row(
                &*MZ_NETWORK_POLICY_RULES,
                Row::pack_slice(&[
                    Datum::String(&rule.name),
                    Datum::String(&policy.id.to_string()),
                    Datum::String(&rule.action.to_string()),
                    Datum::String(&rule.address.to_string()),
                    Datum::String(&rule.direction.to_string()),
                ]),
                diff,
            ));
        }
        updates.push(BuiltinTableUpdate::row(
            &*MZ_NETWORK_POLICIES,
            Row::pack_slice(&[
                Datum::String(&policy.id.to_string()),
                Datum::String(&policy.name),
                Datum::String(&policy.owner_id.to_string()),
                privileges,
                Datum::UInt32(policy.oid.clone()),
            ]),
            diff,
        ));

        Ok(updates)
    }

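    /// Packs all builtin table updates for a single catalog item: the
    /// item-kind-specific rows (tables, sources, sinks, views, and so on), its
    /// object dependencies, one [`MZ_COLUMNS`] row per column of its latest
    /// relation description, and its history retention strategy, if any.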
    pub(super) fn pack_item_update(
        &self,
        id: CatalogItemId,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let entry = self.get_entry(&id);
        let oid = entry.oid();
        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
        let schema_id = &self
            .get_schema(
                &entry.name().qualifiers.database_spec,
                &entry.name().qualifiers.schema_spec,
                conn_id,
            )
            .id;
        let name = &entry.name().item;
        let owner_id = entry.owner_id();
        let privileges_row = self.pack_privilege_array_row(entry.privileges());
        let privileges = privileges_row.unpack_first();
        let mut updates = match entry.item() {
            CatalogItem::Log(_) => self.pack_source_update(
                id, oid, schema_id, name, "log", None, None, None, None, None, owner_id,
                privileges, diff, None,
            ),
            CatalogItem::Index(index) => {
                self.pack_index_update(id, oid, name, owner_id, index, diff)
            }
            CatalogItem::Table(table) => {
                let mut updates = self
                    .pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);

                if let TableDataSource::DataSource {
                    desc: data_source,
                    timeline: _,
                } = &table.data_source
                {
                    updates.extend(match data_source {
                        DataSourceDesc::IngestionExport {
                            ingestion_id,
                            external_reference: UnresolvedItemName(external_reference),
                            details: _,
                            data_config: _,
                        } => {
                            let ingestion_entry = self
                                .get_entry(ingestion_id)
                                .source_desc()
                                .expect("primary source exists")
                                .expect("primary source is a source");

                            match ingestion_entry.connection.name() {
                                "postgres" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                    let schema_name = external_reference[1].to_ast_string_simple();
                                    let table_name = external_reference[2].to_ast_string_simple();

                                    self.pack_postgres_source_tables_update(
                                        id,
                                        &schema_name,
                                        &table_name,
                                        diff,
                                    )
                                }
                                "mysql" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
                                    let schema_name = external_reference[0].to_ast_string_simple();
                                    let table_name = external_reference[1].to_ast_string_simple();

                                    self.pack_mysql_source_tables_update(
                                        id,
                                        &schema_name,
                                        &table_name,
                                        diff,
                                    )
                                }
                                "sql-server" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                    let schema_name = external_reference[1].to_ast_string_simple();
                                    let table_name = external_reference[2].to_ast_string_simple();

                                    self.pack_sql_server_source_table_update(
                                        id,
                                        &schema_name,
                                        &table_name,
                                        diff,
                                    )
                                }
                                "load-generator" => vec![],
                                "kafka" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 1);
                                    let topic = external_reference[0].to_ast_string_simple();
                                    let envelope = data_source.envelope();
                                    let (key_format, value_format) = data_source.formats();

                                    self.pack_kafka_source_tables_update(
                                        id,
                                        &topic,
                                        envelope,
                                        key_format,
                                        value_format,
                                        diff,
                                    )
                                }
                                s => unreachable!("{s} sources do not have tables"),
                            }
                        }
                        _ => vec![],
                    });
                }

                updates
            }
            CatalogItem::Source(source) => {
                let source_type = source.source_type();
                let connection_id = source.connection_id();
                let envelope = source.data_source.envelope();
                let cluster_entry = match source.data_source {
                    DataSourceDesc::IngestionExport { ingestion_id, .. } => {
                        self.get_entry(&ingestion_id)
                    }
                    _ => entry,
                };

                let cluster_id = cluster_entry.item().cluster_id().map(|id| id.to_string());

                let (key_format, value_format) = source.data_source.formats();

                let mut updates = self.pack_source_update(
                    id,
                    oid,
                    schema_id,
                    name,
                    source_type,
                    connection_id,
                    envelope,
                    key_format,
                    value_format,
                    cluster_id.as_deref(),
                    owner_id,
                    privileges,
                    diff,
                    source.create_sql.as_ref(),
                );

                updates.extend(match &source.data_source {
                    DataSourceDesc::Ingestion { desc, .. }
                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => match &desc.connection {
                        GenericSourceConnection::Postgres(postgres) => {
                            self.pack_postgres_source_update(id, postgres, diff)
                        }
                        GenericSourceConnection::Kafka(kafka) => {
                            self.pack_kafka_source_update(id, source.global_id(), kafka, diff)
                        }
                        _ => vec![],
                    },
                    DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference: UnresolvedItemName(external_reference),
                        details: _,
                        data_config: _,
                    } => {
                        let ingestion_entry = self
                            .get_entry(ingestion_id)
                            .source_desc()
                            .expect("primary source exists")
                            .expect("primary source is a source");

                        match ingestion_entry.connection.name() {
                            "postgres" => {
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                let schema_name = external_reference[1].to_ast_string_simple();
                                let table_name = external_reference[2].to_ast_string_simple();

                                self.pack_postgres_source_tables_update(
                                    id,
                                    &schema_name,
                                    &table_name,
                                    diff,
                                )
                            }
                            "mysql" => {
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
                                let schema_name = external_reference[0].to_ast_string_simple();
                                let table_name = external_reference[1].to_ast_string_simple();

                                self.pack_mysql_source_tables_update(
                                    id,
                                    &schema_name,
                                    &table_name,
                                    diff,
                                )
                            }
                            "sql-server" => {
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                let schema_name = external_reference[1].to_ast_string_simple();
                                let table_name = external_reference[2].to_ast_string_simple();

                                self.pack_sql_server_source_table_update(
                                    id,
                                    &schema_name,
                                    &table_name,
                                    diff,
                                )
                            }
                            "load-generator" => vec![],
                            s => unreachable!("{s} sources do not have subsources"),
                        }
                    }
                    DataSourceDesc::Webhook { .. } => {
                        vec![self.pack_webhook_source_update(id, diff)]
                    }
                    _ => vec![],
                });

                updates
            }
            CatalogItem::View(view) => {
                self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
            }
            CatalogItem::MaterializedView(mview) => self.pack_materialized_view_update(
                id, oid, schema_id, name, owner_id, privileges, mview, diff,
            ),
            CatalogItem::Sink(sink) => {
                self.pack_sink_update(id, oid, schema_id, name, owner_id, sink, diff)
            }
            CatalogItem::Type(ty) => {
                self.pack_type_update(id, oid, schema_id, name, owner_id, privileges, ty, diff)
            }
            CatalogItem::Func(func) => {
                self.pack_func_update(id, schema_id, name, owner_id, func, diff)
            }
            CatalogItem::Secret(_) => {
                self.pack_secret_update(id, oid, schema_id, name, owner_id, privileges, diff)
            }
            CatalogItem::Connection(connection) => self.pack_connection_update(
                id, oid, schema_id, name, owner_id, privileges, connection, diff,
            ),
            CatalogItem::ContinualTask(ct) => self.pack_continual_task_update(
                id, oid, schema_id, name, owner_id, privileges, ct, diff,
            ),
        };

        if !entry.item().is_temporary() {
            for dependee in entry.item().references().items() {
                updates.push(self.pack_depends_update(id, *dependee, diff))
            }
        }

        let full_name = self.resolve_full_name(entry.name(), entry.conn_id());
        if let Ok(desc) = entry.desc_latest(&full_name) {
            let defaults = match entry.item() {
                CatalogItem::Table(Table {
                    data_source: TableDataSource::TableWrites { defaults },
                    ..
                }) => Some(defaults),
                _ => None,
            };
            for (i, (column_name, column_type)) in desc.iter().enumerate() {
                let default: Option<String> = defaults.map(|d| d[i].to_ast_string_stable());
                let default: Datum = default
                    .as_ref()
                    .map(|d| Datum::String(d))
                    .unwrap_or(Datum::Null);
                let pgtype = mz_pgrepr::Type::from(&column_type.scalar_type);
                let (type_name, type_oid) = match &column_type.scalar_type {
                    SqlScalarType::List {
                        custom_id: Some(custom_id),
                        ..
                    }
                    | SqlScalarType::Map {
                        custom_id: Some(custom_id),
                        ..
                    }
                    | SqlScalarType::Record {
                        custom_id: Some(custom_id),
                        ..
                    } => {
                        let entry = self.get_entry(custom_id);
                        let name = &*entry.name().item;
                        let oid = entry.oid();
                        (name, oid)
                    }
                    _ => (pgtype.name(), pgtype.oid()),
                };
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_COLUMNS,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String(column_name),
                        Datum::UInt64(u64::cast_from(i + 1)),
                        Datum::from(column_type.nullable),
                        Datum::String(type_name),
                        default,
                        Datum::UInt32(type_oid),
                        Datum::Int32(pgtype.typmod()),
                    ]),
                    diff,
                ));
            }
        }

        if let Some(cw) = entry.item().initial_logical_compaction_window() {
            updates.push(self.pack_history_retention_strategy_update(id, cw, diff));
        }

        updates
    }

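    /// Packs a row for [`MZ_HISTORY_RETENTION_STRATEGIES`], encoding the
    /// compaction window as a JSON number.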
    fn pack_history_retention_strategy_update(
        &self,
        id: CatalogItemId,
        cw: CompactionWindow,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let cw: u64 = cw.comparable_timestamp().into();
        let cw = Jsonb::from_serde_json(serde_json::Value::Number(serde_json::Number::from(cw)))
            .expect("must serialize");
        BuiltinTableUpdate::row(
            &*MZ_HISTORY_RETENTION_STRATEGIES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String("FOR"),
                cw.into_row().into_element(),
            ]),
            diff,
        )
    }

    fn pack_table_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        diff: Diff,
        table: &Table,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let redacted = table.create_sql.as_ref().map(|create_sql| {
            mz_sql::parse::parse(create_sql)
                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
                .into_element()
                .ast
                .to_ast_string_redacted()
        });
        let source_id = if let TableDataSource::DataSource {
            desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
            ..
        } = &table.data_source
        {
            Some(ingestion_id.to_string())
        } else {
            None
        };

        vec![BuiltinTableUpdate::row(
            &*MZ_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&owner_id.to_string()),
                privileges,
                if let Some(create_sql) = &table.create_sql {
                    Datum::String(create_sql)
                } else {
                    Datum::Null
                },
                if let Some(redacted) = &redacted {
                    Datum::String(redacted)
                } else {
                    Datum::Null
                },
                if let Some(source_id) = source_id.as_ref() {
                    Datum::String(source_id)
                } else {
                    Datum::Null
                },
            ]),
            diff,
        )]
    }

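    /// Packs a row for [`MZ_SOURCES`]. Used for all source-like items,
    /// including logs; most columns are optional because not every source kind
    /// has a connection, envelope, format, or cluster.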
    fn pack_source_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        source_desc_name: &str,
        connection_id: Option<CatalogItemId>,
        envelope: Option<&str>,
        key_format: Option<&str>,
        value_format: Option<&str>,
        cluster_id: Option<&str>,
        owner_id: &RoleId,
        privileges: Datum,
        diff: Diff,
        create_sql: Option<&String>,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let redacted = create_sql.map(|create_sql| {
            let create_stmt = mz_sql::parse::parse(create_sql)
                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
                .into_element()
                .ast;
            create_stmt.to_ast_string_redacted()
        });
        vec![BuiltinTableUpdate::row(
            &*MZ_SOURCES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(source_desc_name),
                Datum::from(connection_id.map(|id| id.to_string()).as_deref()),
                Datum::Null,
                Datum::from(envelope),
                Datum::from(key_format),
                Datum::from(value_format),
                Datum::from(cluster_id),
                Datum::String(&owner_id.to_string()),
                privileges,
                if let Some(create_sql) = create_sql {
                    Datum::String(create_sql)
                } else {
                    Datum::Null
                },
                if let Some(redacted) = &redacted {
                    Datum::String(redacted)
                } else {
                    Datum::Null
                },
            ]),
            diff,
        )]
    }

    fn pack_postgres_source_update(
        &self,
        id: CatalogItemId,
        postgres: &PostgresSourceConnection<ReferencedConnection>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_POSTGRES_SOURCES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(&postgres.publication_details.slot),
                Datum::from(postgres.publication_details.timeline_id),
            ]),
            diff,
        )]
    }

    fn pack_kafka_source_update(
        &self,
        item_id: CatalogItemId,
        collection_id: GlobalId,
        kafka: &KafkaSourceConnection<ReferencedConnection>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_KAFKA_SOURCES,
            Row::pack_slice(&[
                Datum::String(&item_id.to_string()),
                Datum::String(&kafka.group_id(&self.config.connection_context, collection_id)),
                Datum::String(&kafka.topic),
            ]),
            diff,
        )]
    }

    fn pack_postgres_source_tables_update(
        &self,
        id: CatalogItemId,
        schema_name: &str,
        table_name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_POSTGRES_SOURCE_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(schema_name),
                Datum::String(table_name),
            ]),
            diff,
        )]
    }

    fn pack_mysql_source_tables_update(
        &self,
        id: CatalogItemId,
        schema_name: &str,
        table_name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_MYSQL_SOURCE_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(schema_name),
                Datum::String(table_name),
            ]),
            diff,
        )]
    }

    fn pack_sql_server_source_table_update(
        &self,
        id: CatalogItemId,
        schema_name: &str,
        table_name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_SQL_SERVER_SOURCE_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(schema_name),
                Datum::String(table_name),
            ]),
            diff,
        )]
    }

    fn pack_kafka_source_tables_update(
        &self,
        id: CatalogItemId,
        topic: &str,
        envelope: Option<&str>,
        key_format: Option<&str>,
        value_format: Option<&str>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_KAFKA_SOURCE_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(topic),
                Datum::from(envelope),
                Datum::from(key_format),
                Datum::from(value_format),
            ]),
            diff,
        )]
    }

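    /// Packs a row for [`MZ_CONNECTIONS`], plus connection-type-specific rows
    /// (Kafka brokers, AWS credentials, AWS PrivateLink principals, SSH tunnel
    /// public keys) where applicable.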
    fn pack_connection_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        connection: &Connection,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let create_stmt = mz_sql::parse::parse(&connection.create_sql)
            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", connection.create_sql))
            .into_element()
            .ast;
        let mut updates = vec![BuiltinTableUpdate::row(
            &*MZ_CONNECTIONS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(match connection.details {
                    ConnectionDetails::Kafka { .. } => "kafka",
                    ConnectionDetails::Csr { .. } => "confluent-schema-registry",
                    ConnectionDetails::Postgres { .. } => "postgres",
                    ConnectionDetails::Aws(..) => "aws",
                    ConnectionDetails::AwsPrivatelink(..) => "aws-privatelink",
                    ConnectionDetails::Ssh { .. } => "ssh-tunnel",
                    ConnectionDetails::MySql { .. } => "mysql",
                    ConnectionDetails::SqlServer(_) => "sql-server",
                    ConnectionDetails::IcebergCatalog(_) => "iceberg-catalog",
                }),
                Datum::String(&owner_id.to_string()),
                privileges,
                Datum::String(&connection.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        )];
        match connection.details {
            ConnectionDetails::Kafka(ref kafka) => {
                updates.extend(self.pack_kafka_connection_update(id, kafka, diff));
            }
            ConnectionDetails::Aws(ref aws_config) => {
                match self.pack_aws_connection_update(id, aws_config, diff) {
                    Ok(update) => {
                        updates.push(update);
                    }
                    Err(e) => {
                        tracing::error!(%id, %e, "failed writing row to mz_aws_connections table");
                    }
                }
            }
            ConnectionDetails::AwsPrivatelink(_) => {
                if let Some(aws_principal_context) = self.aws_principal_context.as_ref() {
                    updates.push(self.pack_aws_privatelink_connection_update(
                        id,
                        aws_principal_context,
                        diff,
                    ));
                } else {
                    tracing::error!(%id, "missing AWS principal context; cannot write row to mz_aws_privatelink_connections table");
                }
            }
            ConnectionDetails::Ssh {
                ref key_1,
                ref key_2,
                ..
            } => {
                updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff));
            }
            ConnectionDetails::Csr(_)
            | ConnectionDetails::Postgres(_)
            | ConnectionDetails::MySql(_)
            | ConnectionDetails::SqlServer(_)
            | ConnectionDetails::IcebergCatalog(_) => (),
        };
        updates
    }

    pub(crate) fn pack_ssh_tunnel_connection_update(
        &self,
        id: CatalogItemId,
        key_1: &SshKey,
        key_2: &SshKey,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        BuiltinTableUpdate::row(
            &*MZ_SSH_TUNNEL_CONNECTIONS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(key_1.public_key().as_str()),
                Datum::String(key_2.public_key().as_str()),
            ]),
            diff,
        )
    }

    fn pack_kafka_connection_update(
        &self,
        id: CatalogItemId,
        kafka: &KafkaConnection<ReferencedConnection>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let progress_topic = kafka.progress_topic(&self.config.connection_context, id);
        let mut row = Row::default();
        row.packer()
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: kafka.brokers.len(),
                }],
                kafka
                    .brokers
                    .iter()
                    .map(|broker| Datum::String(&broker.address)),
            )
            .expect("kafka.brokers is 1 dimensional, and its length is used for the array length");
        let brokers = row.unpack_first();
        vec![BuiltinTableUpdate::row(
            &*MZ_KAFKA_CONNECTIONS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                brokers,
                Datum::String(&progress_topic),
            ]),
            diff,
        )]
    }

    pub fn pack_aws_privatelink_connection_update(
        &self,
        connection_id: CatalogItemId,
        aws_principal_context: &AwsPrincipalContext,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let id = &MZ_AWS_PRIVATELINK_CONNECTIONS;
        let row = Row::pack_slice(&[
            Datum::String(&connection_id.to_string()),
            Datum::String(&aws_principal_context.to_principal_string(connection_id)),
        ]);
        BuiltinTableUpdate::row(id, row, diff)
    }

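    /// Packs a row for [`MZ_AWS_CONNECTIONS`]. Returns an error if deriving
    /// the external ID or example trust policy for an assume-role connection
    /// fails.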
1297    pub fn pack_aws_connection_update(
1298        &self,
1299        connection_id: CatalogItemId,
1300        aws_config: &AwsConnection,
1301        diff: Diff,
1302    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, anyhow::Error> {
1303        let id = &MZ_AWS_CONNECTIONS;
1304
1305        let mut access_key_id = None;
1306        let mut access_key_id_secret_id = None;
1307        let mut secret_access_key_secret_id = None;
1308        let mut session_token = None;
1309        let mut session_token_secret_id = None;
1310        let mut assume_role_arn = None;
1311        let mut assume_role_session_name = None;
1312        let mut principal = None;
1313        let mut external_id = None;
1314        let mut example_trust_policy = None;
1315        match &aws_config.auth {
1316            AwsAuth::Credentials(credentials) => {
1317                match &credentials.access_key_id {
1318                    StringOrSecret::String(s) => access_key_id = Some(s.as_str()),
1319                    StringOrSecret::Secret(s) => access_key_id_secret_id = Some(s.to_string()),
1320                }
1321                secret_access_key_secret_id = Some(credentials.secret_access_key.to_string());
1322                match credentials.session_token.as_ref() {
1323                    None => (),
1324                    Some(StringOrSecret::String(s)) => session_token = Some(s.as_str()),
1325                    Some(StringOrSecret::Secret(s)) => {
1326                        session_token_secret_id = Some(s.to_string())
1327                    }
1328                }
1329            }
1330            AwsAuth::AssumeRole(assume_role) => {
1331                assume_role_arn = Some(assume_role.arn.as_str());
1332                assume_role_session_name = assume_role.session_name.as_deref();
1333                principal = self
1334                    .config
1335                    .connection_context
1336                    .aws_connection_role_arn
1337                    .as_deref();
1338                external_id =
1339                    Some(assume_role.external_id(&self.config.connection_context, connection_id)?);
1340                example_trust_policy = {
1341                    let policy = assume_role
1342                        .example_trust_policy(&self.config.connection_context, connection_id)?;
1343                    let policy = Jsonb::from_serde_json(policy).expect("valid json");
1344                    Some(policy.into_row())
1345                };
1346            }
1347        }
1348
1349        let row = Row::pack_slice(&[
1350            Datum::String(&connection_id.to_string()),
1351            Datum::from(aws_config.endpoint.as_deref()),
1352            Datum::from(aws_config.region.as_deref()),
1353            Datum::from(access_key_id),
1354            Datum::from(access_key_id_secret_id.as_deref()),
1355            Datum::from(secret_access_key_secret_id.as_deref()),
1356            Datum::from(session_token),
1357            Datum::from(session_token_secret_id.as_deref()),
1358            Datum::from(assume_role_arn),
1359            Datum::from(assume_role_session_name),
1360            Datum::from(principal),
1361            Datum::from(external_id.as_deref()),
1362            Datum::from(example_trust_policy.as_ref().map(|p| p.into_element())),
1363        ]);
1364
1365        Ok(BuiltinTableUpdate::row(id, row, diff))
1366    }
1367
1368    fn pack_view_update(
1369        &self,
1370        id: CatalogItemId,
1371        oid: u32,
1372        schema_id: &SchemaSpecifier,
1373        name: &str,
1374        owner_id: &RoleId,
1375        privileges: Datum,
1376        view: &View,
1377        diff: Diff,
1378    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1379        let create_stmt = mz_sql::parse::parse(&view.create_sql)
1380            .unwrap_or_else(|e| {
1381                panic!(
1382                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1383                    view.create_sql, e
1384                )
1385            })
1386            .into_element()
1387            .ast;
1388        let query = match &create_stmt {
1389            Statement::CreateView(stmt) => &stmt.definition.query,
1390            _ => unreachable!(),
1391        };
1392
1393        let mut query_string = query.to_ast_string_stable();
1394        query_string.push(';');
1397
1398        vec![BuiltinTableUpdate::row(
1399            &*MZ_VIEWS,
1400            Row::pack_slice(&[
1401                Datum::String(&id.to_string()),
1402                Datum::UInt32(oid),
1403                Datum::String(&schema_id.to_string()),
1404                Datum::String(name),
1405                Datum::String(&query_string),
1406                Datum::String(&owner_id.to_string()),
1407                privileges,
1408                Datum::String(&view.create_sql),
1409                Datum::String(&create_stmt.to_ast_string_redacted()),
1410            ]),
1411            diff,
1412        )]
1413    }
1414
1415    fn pack_materialized_view_update(
1416        &self,
1417        id: CatalogItemId,
1418        oid: u32,
1419        schema_id: &SchemaSpecifier,
1420        name: &str,
1421        owner_id: &RoleId,
1422        privileges: Datum,
1423        mview: &MaterializedView,
1424        diff: Diff,
1425    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1426        let create_stmt = mz_sql::parse::parse(&mview.create_sql)
1427            .unwrap_or_else(|e| {
1428                panic!(
1429                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1430                    mview.create_sql, e
1431                )
1432            })
1433            .into_element()
1434            .ast;
1435        let query_string = match &create_stmt {
1436            Statement::CreateMaterializedView(stmt) => {
1437                let mut query_string = stmt.query.to_ast_string_stable();
1438                query_string.push(';');
1441                query_string
1442            }
1443            _ => unreachable!(),
1444        };
1445
1446        let mut updates = Vec::new();
1447
1448        updates.push(BuiltinTableUpdate::row(
1449            &*MZ_MATERIALIZED_VIEWS,
1450            Row::pack_slice(&[
1451                Datum::String(&id.to_string()),
1452                Datum::UInt32(oid),
1453                Datum::String(&schema_id.to_string()),
1454                Datum::String(name),
1455                Datum::String(&mview.cluster_id.to_string()),
1456                Datum::String(&query_string),
1457                Datum::String(&owner_id.to_string()),
1458                privileges,
1459                Datum::String(&mview.create_sql),
1460                Datum::String(&create_stmt.to_ast_string_redacted()),
1461            ]),
1462            diff,
1463        ));
1464
1465        if let Some(refresh_schedule) = &mview.refresh_schedule {
1466            assert!(!refresh_schedule.is_empty());
1469            for RefreshEvery {
1470                interval,
1471                aligned_to,
1472            } in refresh_schedule.everies.iter()
1473            {
1474                let aligned_to_dt = mz_ore::now::to_datetime(
1475                    <&Timestamp as TryInto<u64>>::try_into(aligned_to).expect("undoes planning"),
1476                );
1477                updates.push(BuiltinTableUpdate::row(
1478                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1479                    Row::pack_slice(&[
1480                        Datum::String(&id.to_string()),
1481                        Datum::String("every"),
1482                        Datum::Interval(
1483                            Interval::from_duration(interval).expect(
1484                                "planning ensured that this is convertible back to Interval",
1485                            ),
1486                        ),
1487                        Datum::TimestampTz(aligned_to_dt.try_into().expect("undoes planning")),
1488                        Datum::Null,
1489                    ]),
1490                    diff,
1491                ));
1492            }
1493            for at in refresh_schedule.ats.iter() {
1494                let at_dt = mz_ore::now::to_datetime(
1495                    <&Timestamp as TryInto<u64>>::try_into(at).expect("undoes planning"),
1496                );
1497                updates.push(BuiltinTableUpdate::row(
1498                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1499                    Row::pack_slice(&[
1500                        Datum::String(&id.to_string()),
1501                        Datum::String("at"),
1502                        Datum::Null,
1503                        Datum::Null,
1504                        Datum::TimestampTz(at_dt.try_into().expect("undoes planning")),
1505                    ]),
1506                    diff,
1507                ));
1508            }
1509        } else {
1510            updates.push(BuiltinTableUpdate::row(
1511                &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1512                Row::pack_slice(&[
1513                    Datum::String(&id.to_string()),
1514                    Datum::String("on-commit"),
1515                    Datum::Null,
1516                    Datum::Null,
1517                    Datum::Null,
1518                ]),
1519                diff,
1520            ));
1521        }
1522
1523        updates
1524    }
1525
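    /// Packs a [`BuiltinTableUpdate`] for `MZ_CONTINUAL_TASKS`, rendering the task's
    /// insert/delete statements as a single `;`-separated query string.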
1526    fn pack_continual_task_update(
1527        &self,
1528        id: CatalogItemId,
1529        oid: u32,
1530        schema_id: &SchemaSpecifier,
1531        name: &str,
1532        owner_id: &RoleId,
1533        privileges: Datum,
1534        ct: &ContinualTask,
1535        diff: Diff,
1536    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1537        let create_stmt = mz_sql::parse::parse(&ct.create_sql)
1538            .unwrap_or_else(|e| {
1539                panic!(
1540                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1541                    ct.create_sql, e
1542                )
1543            })
1544            .into_element()
1545            .ast;
1546        let query_string = match &create_stmt {
1547            Statement::CreateContinualTask(stmt) => {
1548                let mut query_string = String::new();
1549                for stmt in &stmt.stmts {
1550                    let s = match stmt {
1551                        ContinualTaskStmt::Insert(stmt) => stmt.to_ast_string_stable(),
1552                        ContinualTaskStmt::Delete(stmt) => stmt.to_ast_string_stable(),
1553                    };
1554                    if query_string.is_empty() {
1555                        query_string = s;
1556                    } else {
1557                        query_string.push_str("; ");
1558                        query_string.push_str(&s);
1559                    }
1560                }
1561                query_string
1562            }
1563            _ => unreachable!(),
1564        };
1565
1566        vec![BuiltinTableUpdate::row(
1567            &*MZ_CONTINUAL_TASKS,
1568            Row::pack_slice(&[
1569                Datum::String(&id.to_string()),
1570                Datum::UInt32(oid),
1571                Datum::String(&schema_id.to_string()),
1572                Datum::String(name),
1573                Datum::String(&ct.cluster_id.to_string()),
1574                Datum::String(&query_string),
1575                Datum::String(&owner_id.to_string()),
1576                privileges,
1577                Datum::String(&ct.create_sql),
1578                Datum::String(&create_stmt.to_ast_string_redacted()),
1579            ]),
1580            diff,
1581        )]
1582    }
1583
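    /// Packs [`BuiltinTableUpdate`]s for `MZ_SINKS`, plus a connection-specific row in
    /// `MZ_KAFKA_SINKS` or `MZ_ICEBERG_SINKS`.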
1584    fn pack_sink_update(
1585        &self,
1586        id: CatalogItemId,
1587        oid: u32,
1588        schema_id: &SchemaSpecifier,
1589        name: &str,
1590        owner_id: &RoleId,
1591        sink: &Sink,
1592        diff: Diff,
1593    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1594        let mut updates = vec![];
1595        match &sink.connection {
1596            StorageSinkConnection::Kafka(KafkaSinkConnection {
1597                topic: topic_name, ..
1598            }) => {
1599                updates.push(BuiltinTableUpdate::row(
1600                    &*MZ_KAFKA_SINKS,
1601                    Row::pack_slice(&[
1602                        Datum::String(&id.to_string()),
1603                        Datum::String(topic_name.as_str()),
1604                    ]),
1605                    diff,
1606                ));
1607            }
1608            StorageSinkConnection::Iceberg(IcebergSinkConnection {
1609                namespace, table, ..
1610            }) => {
1611                updates.push(BuiltinTableUpdate::row(
1612                    &*MZ_ICEBERG_SINKS,
1613                    Row::pack_slice(&[
1614                        Datum::String(&id.to_string()),
1615                        Datum::String(namespace.as_str()),
1616                        Datum::String(table.as_str()),
1617                    ]),
1618                    diff,
1619                ));
1620            }
1621        };
1622
1623        let create_stmt = mz_sql::parse::parse(&sink.create_sql)
1624            .unwrap_or_else(|e| panic!("create_sql cannot be invalid: `{}` --- error: `{}`", sink.create_sql, e))
1625            .into_element()
1626            .ast;
1627
1628        let envelope = sink.envelope();
1629
1630        let combined_format = sink.combined_format();
1632        let (key_format, value_format) = match sink.formats() {
1633            Some((key_format, value_format)) => (key_format, Some(value_format)),
1634            None => (None, None),
1635        };
1636
1637        updates.push(BuiltinTableUpdate::row(
1638            &*MZ_SINKS,
1639            Row::pack_slice(&[
1640                Datum::String(&id.to_string()),
1641                Datum::UInt32(oid),
1642                Datum::String(&schema_id.to_string()),
1643                Datum::String(name),
1644                Datum::String(sink.connection.name()),
1645                Datum::from(sink.connection_id().map(|id| id.to_string()).as_deref()),
1646                Datum::Null,
1648                Datum::from(envelope),
1649                Datum::from(combined_format.as_ref().map(|f| f.as_ref())),
1652                Datum::from(key_format),
1653                Datum::from(value_format),
1654                Datum::String(&sink.cluster_id.to_string()),
1655                Datum::String(&owner_id.to_string()),
1656                Datum::String(&sink.create_sql),
1657                Datum::String(&create_stmt.to_ast_string_redacted()),
1658            ]),
1659            diff,
1660        ));
1661
1662        updates
1663    }
1664
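    /// Packs [`BuiltinTableUpdate`]s for `MZ_INDEXES` and one `MZ_INDEX_COLUMNS` row per
    /// index key, recording either the 1-based indexed column number or the key expression,
    /// along with its nullability.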
1665    fn pack_index_update(
1666        &self,
1667        id: CatalogItemId,
1668        oid: u32,
1669        name: &str,
1670        owner_id: &RoleId,
1671        index: &Index,
1672        diff: Diff,
1673    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1674        let mut updates = vec![];
1675
1676        let create_stmt = mz_sql::parse::parse(&index.create_sql)
1677            .unwrap_or_else(|e| {
1678                panic!(
1679                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1680                    index.create_sql, e
1681                )
1682            })
1683            .into_element()
1684            .ast;
1685
1686        let key_sqls = match &create_stmt {
1687            Statement::CreateIndex(CreateIndexStatement { key_parts, .. }) => key_parts
1688                .as_ref()
1689                .expect("key_parts is filled in during planning"),
1690            _ => unreachable!(),
1691        };
1692        let on_item_id = self.get_entry_by_global_id(&index.on).id();
1693
1694        updates.push(BuiltinTableUpdate::row(
1695            &*MZ_INDEXES,
1696            Row::pack_slice(&[
1697                Datum::String(&id.to_string()),
1698                Datum::UInt32(oid),
1699                Datum::String(name),
1700                Datum::String(&on_item_id.to_string()),
1701                Datum::String(&index.cluster_id.to_string()),
1702                Datum::String(&owner_id.to_string()),
1703                Datum::String(&index.create_sql),
1704                Datum::String(&create_stmt.to_ast_string_redacted()),
1705            ]),
1706            diff,
1707        ));
1708
1709        for (i, key) in index.keys.iter().enumerate() {
1710            let on_entry = self.get_entry_by_global_id(&index.on);
1711            let nullable = key
1712                .typ(
1713                    &on_entry
1714                        .desc(&self.resolve_full_name(on_entry.name(), on_entry.conn_id()))
1715                        .expect("can only create indexes on items with a valid description")
1716                        .typ()
1717                        .column_types,
1718                )
1719                .nullable;
1720            let seq_in_index = u64::cast_from(i + 1);
1721            let key_sql = key_sqls
1722                .get(i)
1723                .expect("missing sql information for index key")
1724                .to_ast_string_simple();
1725            let (field_number, expression) = match key {
1726                MirScalarExpr::Column(col, _) => {
1727                    (Datum::UInt64(u64::cast_from(*col + 1)), Datum::Null)
1728                }
1729                _ => (Datum::Null, Datum::String(&key_sql)),
1730            };
1731            updates.push(BuiltinTableUpdate::row(
1732                &*MZ_INDEX_COLUMNS,
1733                Row::pack_slice(&[
1734                    Datum::String(&id.to_string()),
1735                    Datum::UInt64(seq_in_index),
1736                    field_number,
1737                    expression,
1738                    Datum::from(nullable),
1739                ]),
1740                diff,
1741            ));
1742        }
1743
1744        updates
1745    }
1746
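    /// Packs [`BuiltinTableUpdate`]s for `MZ_TYPES`, the category-specific table
    /// (`MZ_ARRAY_TYPES`, `MZ_LIST_TYPES`, `MZ_MAP_TYPES`, `MZ_PSEUDO_TYPES`, or
    /// `MZ_BASE_TYPES`), and `MZ_TYPE_PG_METADATA` when PostgreSQL metadata is present.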
1747    fn pack_type_update(
1748        &self,
1749        id: CatalogItemId,
1750        oid: u32,
1751        schema_id: &SchemaSpecifier,
1752        name: &str,
1753        owner_id: &RoleId,
1754        privileges: Datum,
1755        typ: &Type,
1756        diff: Diff,
1757    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1758        let mut out = vec![];
1759
1760        let redacted = typ.create_sql.as_ref().map(|create_sql| {
1761            mz_sql::parse::parse(create_sql)
1762                .unwrap_or_else(|e| panic!("create_sql cannot be invalid: `{}` --- error: `{}`", create_sql, e))
1763                .into_element()
1764                .ast
1765                .to_ast_string_redacted()
1766        });
1767
1768        out.push(BuiltinTableUpdate::row(
1769            &*MZ_TYPES,
1770            Row::pack_slice(&[
1771                Datum::String(&id.to_string()),
1772                Datum::UInt32(oid),
1773                Datum::String(&schema_id.to_string()),
1774                Datum::String(name),
1775                Datum::String(&TypeCategory::from_catalog_type(&typ.details.typ).to_string()),
1776                Datum::String(&owner_id.to_string()),
1777                privileges,
1778                if let Some(create_sql) = &typ.create_sql {
1779                    Datum::String(create_sql)
1780                } else {
1781                    Datum::Null
1782                },
1783                if let Some(redacted) = &redacted {
1784                    Datum::String(redacted)
1785                } else {
1786                    Datum::Null
1787                },
1788            ]),
1789            diff,
1790        ));
1791
1792        let mut row = Row::default();
1793        let mut packer = row.packer();
1794
1795        fn append_modifier(packer: &mut RowPacker<'_>, mods: &[i64]) {
1796            if mods.is_empty() {
1797                packer.push(Datum::Null);
1798            } else {
1799                packer.push_list(mods.iter().map(|m| Datum::Int64(*m)));
1800            }
1801        }
1802
1803        let index_id = match &typ.details.typ {
1804            CatalogType::Array {
1805                element_reference: element_id,
1806            } => {
1807                packer.push(Datum::String(&id.to_string()));
1808                packer.push(Datum::String(&element_id.to_string()));
1809                &MZ_ARRAY_TYPES
1810            }
1811            CatalogType::List {
1812                element_reference: element_id,
1813                element_modifiers,
1814            } => {
1815                packer.push(Datum::String(&id.to_string()));
1816                packer.push(Datum::String(&element_id.to_string()));
1817                append_modifier(&mut packer, element_modifiers);
1818                &MZ_LIST_TYPES
1819            }
1820            CatalogType::Map {
1821                key_reference: key_id,
1822                value_reference: value_id,
1823                key_modifiers,
1824                value_modifiers,
1825            } => {
1826                packer.push(Datum::String(&id.to_string()));
1827                packer.push(Datum::String(&key_id.to_string()));
1828                packer.push(Datum::String(&value_id.to_string()));
1829                append_modifier(&mut packer, key_modifiers);
1830                append_modifier(&mut packer, value_modifiers);
1831                &MZ_MAP_TYPES
1832            }
1833            CatalogType::Pseudo => {
1834                packer.push(Datum::String(&id.to_string()));
1835                &MZ_PSEUDO_TYPES
1836            }
1837            _ => {
1838                packer.push(Datum::String(&id.to_string()));
1839                &MZ_BASE_TYPES
1840            }
1841        };
1842        out.push(BuiltinTableUpdate::row(index_id, row, diff));
1843
1844        if let Some(pg_metadata) = &typ.details.pg_metadata {
1845            out.push(BuiltinTableUpdate::row(
1846                &*MZ_TYPE_PG_METADATA,
1847                Row::pack_slice(&[
1848                    Datum::String(&id.to_string()),
1849                    Datum::UInt32(pg_metadata.typinput_oid),
1850                    Datum::UInt32(pg_metadata.typreceive_oid),
1851                ]),
1852                diff,
1853            ));
1854        }
1855
1856        out
1857    }
1858
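    /// Packs one `MZ_FUNCTIONS` row per implementation of `func`, plus an `MZ_AGGREGATES`
    /// row for each implementation of an aggregate function.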
1859    fn pack_func_update(
1860        &self,
1861        id: CatalogItemId,
1862        schema_id: &SchemaSpecifier,
1863        name: &str,
1864        owner_id: &RoleId,
1865        func: &Func,
1866        diff: Diff,
1867    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1868        let mut updates = vec![];
1869        for func_impl_details in func.inner.func_impls() {
1870            let arg_type_ids = func_impl_details
1871                .arg_typs
1872                .iter()
1873                .map(|typ| self.get_system_type(typ).id().to_string())
1874                .collect::<Vec<_>>();
1875
1876            let mut row = Row::default();
1877            row.packer()
1878                .try_push_array(
1879                    &[ArrayDimension {
1880                        lower_bound: 1,
1881                        length: arg_type_ids.len(),
1882                    }],
1883                    arg_type_ids.iter().map(|id| Datum::String(id)),
1884                )
1885                .expect(
1886                    "arg_type_ids is 1 dimensional, and its length is used for the array length",
1887                );
1888            let arg_type_ids = row.unpack_first();
1889
1890            updates.push(BuiltinTableUpdate::row(
1891                &*MZ_FUNCTIONS,
1892                Row::pack_slice(&[
1893                    Datum::String(&id.to_string()),
1894                    Datum::UInt32(func_impl_details.oid),
1895                    Datum::String(&schema_id.to_string()),
1896                    Datum::String(name),
1897                    arg_type_ids,
1898                    Datum::from(
1899                        func_impl_details
1900                            .variadic_typ
1901                            .map(|typ| self.get_system_type(typ).id().to_string())
1902                            .as_deref(),
1903                    ),
1904                    Datum::from(
1905                        func_impl_details
1906                            .return_typ
1907                            .map(|typ| self.get_system_type(typ).id().to_string())
1908                            .as_deref(),
1909                    ),
1910                    func_impl_details.return_is_set.into(),
1911                    Datum::String(&owner_id.to_string()),
1912                ]),
1913                diff,
1914            ));
1915
1916            if let mz_sql::func::Func::Aggregate(_) = func.inner {
1917                updates.push(BuiltinTableUpdate::row(
1918                    &*MZ_AGGREGATES,
1919                    Row::pack_slice(&[
1920                        Datum::UInt32(func_impl_details.oid),
1921                        Datum::String("n"),
1923                        Datum::Int16(0),
1924                    ]),
1925                    diff,
1926                ));
1927            }
1928        }
1929        updates
1930    }
1931
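    /// Packs an `MZ_OPERATORS` row for a single implementation of the operator `operator`.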
1932    pub fn pack_op_update(
1933        &self,
1934        operator: &str,
1935        func_impl_details: FuncImplCatalogDetails,
1936        diff: Diff,
1937    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1938        let arg_type_ids = func_impl_details
1939            .arg_typs
1940            .iter()
1941            .map(|typ| self.get_system_type(typ).id().to_string())
1942            .collect::<Vec<_>>();
1943
1944        let mut row = Row::default();
1945        row.packer()
1946            .try_push_array(
1947                &[ArrayDimension {
1948                    lower_bound: 1,
1949                    length: arg_type_ids.len(),
1950                }],
1951                arg_type_ids.iter().map(|id| Datum::String(id)),
1952            )
1953            .expect("arg_type_ids is 1 dimensional, and its length is used for the array length");
1954        let arg_type_ids = row.unpack_first();
1955
1956        BuiltinTableUpdate::row(
1957            &*MZ_OPERATORS,
1958            Row::pack_slice(&[
1959                Datum::UInt32(func_impl_details.oid),
1960                Datum::String(operator),
1961                arg_type_ids,
1962                Datum::from(
1963                    func_impl_details
1964                        .return_typ
1965                        .map(|typ| self.get_system_type(typ).id().to_string())
1966                        .as_deref(),
1967                ),
1968            ]),
1969            diff,
1970        )
1971    }
1972
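    /// Packs a [`BuiltinTableUpdate`] for `MZ_SECRETS`.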
1973    fn pack_secret_update(
1974        &self,
1975        id: CatalogItemId,
1976        oid: u32,
1977        schema_id: &SchemaSpecifier,
1978        name: &str,
1979        owner_id: &RoleId,
1980        privileges: Datum,
1981        diff: Diff,
1982    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1983        vec![BuiltinTableUpdate::row(
1984            &*MZ_SECRETS,
1985            Row::pack_slice(&[
1986                Datum::String(&id.to_string()),
1987                Datum::UInt32(oid),
1988                Datum::String(&schema_id.to_string()),
1989                Datum::String(name),
1990                Datum::String(&owner_id.to_string()),
1991                privileges,
1992            ]),
1993            diff,
1994        )]
1995    }
1996
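    /// Packs an `MZ_AUDIT_EVENTS` row for `event`, serializing the event details as `jsonb`.
    /// Returns an error if the details cannot be converted to a JSON row.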
1997    pub fn pack_audit_log_update(
1998        &self,
1999        event: &VersionedEvent,
2000        diff: Diff,
2001    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
2002        let (event_type, object_type, details, user, occurred_at): (
2003            &EventType,
2004            &ObjectType,
2005            &EventDetails,
2006            &Option<String>,
2007            u64,
2008        ) = match event {
2009            VersionedEvent::V1(ev) => (
2010                &ev.event_type,
2011                &ev.object_type,
2012                &ev.details,
2013                &ev.user,
2014                ev.occurred_at,
2015            ),
2016        };
2017        let details = Jsonb::from_serde_json(details.as_json())
2018            .map_err(|e| {
2019                Error::new(ErrorKind::Unstructured(format!(
2020                    "could not pack audit log update: {}",
2021                    e
2022                )))
2023            })?
2024            .into_row();
2025        let details = details
2026            .iter()
2027            .next()
2028            .expect("details created above with a single jsonb column");
2029        let dt = mz_ore::now::to_datetime(occurred_at);
2030        let id = event.sortable_id();
2031        Ok(BuiltinTableUpdate::row(
2032            &*MZ_AUDIT_EVENTS,
2033            Row::pack_slice(&[
2034                Datum::UInt64(id),
2035                Datum::String(&format!("{}", event_type)),
2036                Datum::String(&format!("{}", object_type)),
2037                details,
2038                match user {
2039                    Some(user) => Datum::String(user),
2040                    None => Datum::Null,
2041                },
2042                Datum::TimestampTz(dt.try_into().expect("must fit")),
2043            ]),
2044            diff,
2045        ))
2046    }
2047
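    /// Packs an `MZ_STORAGE_USAGE_BY_SHARD` row for a storage usage event.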
2048    pub fn pack_storage_usage_update(
2049        &self,
2050        VersionedStorageUsage::V1(event): VersionedStorageUsage,
2051        diff: Diff,
2052    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2053        let id = &MZ_STORAGE_USAGE_BY_SHARD;
2054        let row = Row::pack_slice(&[
2055            Datum::UInt64(event.id),
2056            Datum::from(event.shard_id.as_deref()),
2057            Datum::UInt64(event.size_bytes),
2058            Datum::TimestampTz(
2059                mz_ore::now::to_datetime(event.collection_timestamp)
2060                    .try_into()
2061                    .expect("must fit"),
2062            ),
2063        ]);
2064        BuiltinTableUpdate::row(id, row, diff)
2065    }
2066
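    /// Packs an `MZ_EGRESS_IPS` row for the egress network `ip`, recording its address,
    /// prefix length, and CIDR representation.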
2067    pub fn pack_egress_ip_update(
2068        &self,
2069        ip: &IpNet,
2070    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
2071        let id = &MZ_EGRESS_IPS;
2072        let addr = ip.addr();
2073        let row = Row::pack_slice(&[
2074            Datum::String(&addr.to_string()),
2075            Datum::Int32(ip.prefix_len().into()),
2076            Datum::String(&format!("{}/{}", addr, ip.prefix_len())),
2077        ]);
2078        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
2079    }
2080
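    /// Packs an `MZ_LICENSE_KEYS` row for `license_key`, converting its (seconds-based)
    /// `expiration` and `not_before` timestamps to milliseconds before rendering them as
    /// `timestamptz` values.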
2081    pub fn pack_license_key_update(
2082        &self,
2083        license_key: &ValidatedLicenseKey,
2084    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
2085        let id = &MZ_LICENSE_KEYS;
2086        let row = Row::pack_slice(&[
2087            Datum::String(&license_key.id),
2088            Datum::String(&license_key.organization),
2089            Datum::String(&license_key.environment_id),
2090            Datum::TimestampTz(
2091                mz_ore::now::to_datetime(license_key.expiration * 1000)
2092                    .try_into()
2093                    .expect("must fit"),
2094            ),
2095            Datum::TimestampTz(
2096                mz_ore::now::to_datetime(license_key.not_before * 1000)
2097                    .try_into()
2098                    .expect("must fit"),
2099            ),
2100        ]);
2101        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
2102    }
2103
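    /// Packs one `MZ_CLUSTER_REPLICA_SIZES` row per enabled entry in the replica size map,
    /// falling back to `CpuLimit::MAX`, `MemoryLimit::MAX`, and `DiskLimit::ARBITRARY` when
    /// limits are unset.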
2104    pub fn pack_all_replica_size_updates(&self) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2105        let mut updates = Vec::new();
2106        for (size, alloc) in &self.cluster_replica_sizes.0 {
2107            if alloc.disabled {
2108                continue;
2109            }
2110
2111            let cpu_limit = alloc.cpu_limit.unwrap_or(CpuLimit::MAX);
2114            let MemoryLimit(ByteSize(memory_bytes)) =
2115                (alloc.memory_limit).unwrap_or(MemoryLimit::MAX);
2116            let DiskLimit(ByteSize(disk_bytes)) =
2117                (alloc.disk_limit).unwrap_or(DiskLimit::ARBITRARY);
2118
2119            let row = Row::pack_slice(&[
2120                size.as_str().into(),
2121                u64::from(alloc.scale).into(),
2122                u64::cast_from(alloc.workers).into(),
2123                cpu_limit.as_nanocpus().into(),
2124                memory_bytes.into(),
2125                disk_bytes.into(),
2126                (alloc.credits_per_hour).into(),
2127            ]);
2128
2129            updates.push(BuiltinTableUpdate::row(
2130                &*MZ_CLUSTER_REPLICA_SIZES,
2131                row,
2132                Diff::ONE,
2133            ));
2134        }
2135
2136        updates
2137    }
2138
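    /// Packs an `MZ_SUBSCRIPTIONS` row for the active `SUBSCRIBE` identified by `id`.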
2139    pub fn pack_subscribe_update(
2140        &self,
2141        id: GlobalId,
2142        subscribe: &ActiveSubscribe,
2143        diff: Diff,
2144    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2145        let mut row = Row::default();
2146        let mut packer = row.packer();
2147        packer.push(Datum::String(&id.to_string()));
2148        packer.push(Datum::Uuid(subscribe.session_uuid));
2149        packer.push(Datum::String(&subscribe.cluster_id.to_string()));
2150
2151        let start_dt = mz_ore::now::to_datetime(subscribe.start_time);
2152        packer.push(Datum::TimestampTz(start_dt.try_into().expect("must fit")));
2153
2154        let depends_on: Vec<_> = subscribe
2155            .depends_on
2156            .iter()
2157            .map(|id| id.to_string())
2158            .collect();
2159        packer.push_list(depends_on.iter().map(|s| Datum::String(s)));
2160
2161        BuiltinTableUpdate::row(&*MZ_SUBSCRIPTIONS, row, diff)
2162    }
2163
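    /// Packs an `MZ_SESSIONS` row for the connection `conn`.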
2164    pub fn pack_session_update(
2165        &self,
2166        conn: &ConnMeta,
2167        diff: Diff,
2168    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2169        let connect_dt = mz_ore::now::to_datetime(conn.connected_at());
2170        BuiltinTableUpdate::row(
2171            &*MZ_SESSIONS,
2172            Row::pack_slice(&[
2173                Datum::Uuid(conn.uuid()),
2174                Datum::UInt32(conn.conn_id().unhandled()),
2175                Datum::String(&conn.authenticated_role_id().to_string()),
2176                Datum::from(conn.client_ip().map(|ip| ip.to_string()).as_deref()),
2177                Datum::TimestampTz(connect_dt.try_into().expect("must fit")),
2178            ]),
2179            diff,
2180        )
2181    }
2182
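    /// Packs an `MZ_DEFAULT_PRIVILEGES` row for a single default-privilege entry:
    /// the object it applies to, the grantee, and the granted ACL mode.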
2183    pub fn pack_default_privileges_update(
2184        &self,
2185        default_privilege_object: &DefaultPrivilegeObject,
2186        grantee: &RoleId,
2187        acl_mode: &AclMode,
2188        diff: Diff,
2189    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2190        BuiltinTableUpdate::row(
2191            &*MZ_DEFAULT_PRIVILEGES,
2192            Row::pack_slice(&[
2193                default_privilege_object.role_id.to_string().as_str().into(),
2194                default_privilege_object
2195                    .database_id
2196                    .map(|database_id| database_id.to_string())
2197                    .as_deref()
2198                    .into(),
2199                default_privilege_object
2200                    .schema_id
2201                    .map(|schema_id| schema_id.to_string())
2202                    .as_deref()
2203                    .into(),
2204                default_privilege_object
2205                    .object_type
2206                    .to_string()
2207                    .to_lowercase()
2208                    .as_str()
2209                    .into(),
2210                grantee.to_string().as_str().into(),
2211                acl_mode.to_string().as_str().into(),
2212            ]),
2213            diff,
2214        )
2215    }
2216
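    /// Packs an `MZ_SYSTEM_PRIVILEGES` row for the system privilege `privileges`.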
2217    pub fn pack_system_privileges_update(
2218        &self,
2219        privileges: MzAclItem,
2220        diff: Diff,
2221    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2222        BuiltinTableUpdate::row(
2223            &*MZ_SYSTEM_PRIVILEGES,
2224            Row::pack_slice(&[privileges.into()]),
2225            diff,
2226        )
2227    }
2228
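    /// Packs a single-column row containing all entries of `privileges` as a
    /// one-dimensional `mz_aclitem` array.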
2229    fn pack_privilege_array_row(&self, privileges: &PrivilegeMap) -> Row {
2230        let mut row = Row::default();
2231        let flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2232        row.packer()
2233            .try_push_array(
2234                &[ArrayDimension {
2235                    lower_bound: 1,
2236                    length: flat_privileges.len(),
2237                }],
2238                flat_privileges
2239                    .into_iter()
2240                    .map(Datum::MzAclItem),
2241            )
2242            .expect("privileges is 1 dimensional, and its length is used for the array length");
2243        row
2244    }
2245
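    /// Packs an `MZ_COMMENTS` row for the comment on `object_id`, or on one of its columns
    /// when `column_pos` is provided.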
2246    pub fn pack_comment_update(
2247        &self,
2248        object_id: CommentObjectId,
2249        column_pos: Option<usize>,
2250        comment: &str,
2251        diff: Diff,
2252    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2253        let object_type = mz_sql::catalog::ObjectType::from(object_id);
2255        let audit_type = super::object_type_to_audit_object_type(object_type);
2256        let object_type_str = audit_type.to_string();
2257
2258        let object_id_str = match object_id {
2259            CommentObjectId::Table(global_id)
2260            | CommentObjectId::View(global_id)
2261            | CommentObjectId::MaterializedView(global_id)
2262            | CommentObjectId::Source(global_id)
2263            | CommentObjectId::Sink(global_id)
2264            | CommentObjectId::Index(global_id)
2265            | CommentObjectId::Func(global_id)
2266            | CommentObjectId::Connection(global_id)
2267            | CommentObjectId::Secret(global_id)
2268            | CommentObjectId::Type(global_id)
2269            | CommentObjectId::ContinualTask(global_id) => global_id.to_string(),
2270            CommentObjectId::Role(role_id) => role_id.to_string(),
2271            CommentObjectId::Database(database_id) => database_id.to_string(),
2272            CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
2273            CommentObjectId::Cluster(cluster_id) => cluster_id.to_string(),
2274            CommentObjectId::ClusterReplica((_, replica_id)) => replica_id.to_string(),
2275            CommentObjectId::NetworkPolicy(network_policy_id) => network_policy_id.to_string(),
2276        };
2277        let column_pos_datum = match column_pos {
2278            Some(pos) => {
2279                let pos =
2281                    i32::try_from(pos).expect("we constrain this value in the planning layer");
2282                Datum::Int32(pos)
2283            }
2284            None => Datum::Null,
2285        };
2286
2287        BuiltinTableUpdate::row(
2288            &*MZ_COMMENTS,
2289            Row::pack_slice(&[
2290                Datum::String(&object_id_str),
2291                Datum::String(&object_type_str),
2292                column_pos_datum,
2293                Datum::String(comment),
2294            ]),
2295            diff,
2296        )
2297    }
2298
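    /// Packs an `MZ_WEBHOOKS_SOURCES` row for the webhook source `item_id`.
    ///
    /// Panics if `item_id` does not refer to a webhook source with a known URL.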
2299    pub fn pack_webhook_source_update(
2300        &self,
2301        item_id: CatalogItemId,
2302        diff: Diff,
2303    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2304        let url = self
2305            .try_get_webhook_url(&item_id)
2306            .expect("webhook source should exist");
2307        let url = url.to_string();
2308        let name = &self.get_entry(&item_id).name().item;
2309        let id_str = item_id.to_string();
2310
2311        BuiltinTableUpdate::row(
2312            &*MZ_WEBHOOKS_SOURCES,
2313            Row::pack_slice(&[
2314                Datum::String(&id_str),
2315                Datum::String(name),
2316                Datum::String(&url),
2317            ]),
2318            diff,
2319        )
2320    }
2321
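    /// Packs one `MZ_SOURCE_REFERENCES` row per upstream reference recorded for the source,
    /// including the reference's columns when any are known.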
2322    pub fn pack_source_references_update(
2323        &self,
2324        source_references: &SourceReferences,
2325        diff: Diff,
2326    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2327        let source_id = source_references.source_id.to_string();
2328        let updated_at = &source_references.updated_at;
2329        source_references
2330            .references
2331            .iter()
2332            .map(|reference| {
2333                let mut row = Row::default();
2334                let mut packer = row.packer();
2335                packer.extend([
2336                    Datum::String(&source_id),
2337                    reference
2338                        .namespace
2339                        .as_ref()
2340                        .map(|s| Datum::String(s))
2341                        .unwrap_or(Datum::Null),
2342                    Datum::String(&reference.name),
2343                    Datum::TimestampTz(
2344                        mz_ore::now::to_datetime(*updated_at)
2345                            .try_into()
2346                            .expect("must fit"),
2347                    ),
2348                ]);
2349                if !reference.columns.is_empty() {
2350                    packer
2351                        .try_push_array(
2352                            &[ArrayDimension {
2353                                lower_bound: 1,
2354                                length: reference.columns.len(),
2355                            }],
2356                            reference.columns.iter().map(|col| Datum::String(col)),
2357                        )
2358                        .expect(
2359                            "columns is 1 dimensional, and its length is used for the array length",
2360                        );
2361                } else {
2362                    packer.push(Datum::Null);
2363                }
2364
2365                BuiltinTableUpdate::row(&*MZ_SOURCE_REFERENCES, row, diff)
2366            })
2367            .collect()
2368    }
2369}