mz_adapter/catalog/
builtin_table_updates.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10mod notice;
11
12use bytesize::ByteSize;
13use ipnet::IpNet;
14use mz_adapter_types::compaction::CompactionWindow;
15use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
16use mz_catalog::SYSTEM_CONN_ID;
17use mz_catalog::builtin::{
18    BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
19    MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_METRICS,
20    MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICA_STATUSES, MZ_CLUSTER_REPLICAS,
21    MZ_CLUSTER_SCHEDULES, MZ_CLUSTER_WORKLOAD_CLASSES, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS,
22    MZ_CONNECTIONS, MZ_CONTINUAL_TASKS, MZ_DATABASES, MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS,
23    MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES, MZ_INDEX_COLUMNS, MZ_INDEXES,
24    MZ_INTERNAL_CLUSTER_REPLICAS, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCE_TABLES,
25    MZ_KAFKA_SOURCES, MZ_LIST_TYPES, MZ_MAP_TYPES, MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
26    MZ_MATERIALIZED_VIEWS, MZ_MYSQL_SOURCE_TABLES, MZ_NETWORK_POLICIES, MZ_NETWORK_POLICY_RULES,
27    MZ_OBJECT_DEPENDENCIES, MZ_OPERATORS, MZ_PENDING_CLUSTER_REPLICAS, MZ_POSTGRES_SOURCE_TABLES,
28    MZ_POSTGRES_SOURCES, MZ_PSEUDO_TYPES, MZ_ROLE_MEMBERS, MZ_ROLE_PARAMETERS, MZ_ROLES,
29    MZ_SCHEMAS, MZ_SECRETS, MZ_SESSIONS, MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SOURCES,
30    MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD,
31    MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS,
32    MZ_WEBHOOKS_SOURCES,
33};
34use mz_catalog::config::AwsPrincipalContext;
35use mz_catalog::durable::SourceReferences;
36use mz_catalog::memory::error::{Error, ErrorKind};
37use mz_catalog::memory::objects::{
38    CatalogItem, ClusterReplicaProcessStatus, ClusterVariant, Connection, ContinualTask,
39    DataSourceDesc, Func, Index, MaterializedView, Sink, Table, TableDataSource, Type, View,
40};
41use mz_controller::clusters::{
42    ClusterStatus, ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ProcessId,
43    ReplicaAllocation, ReplicaLocation,
44};
45use mz_controller_types::{ClusterId, ReplicaId};
46use mz_expr::MirScalarExpr;
47use mz_orchestrator::{CpuLimit, DiskLimit, MemoryLimit, ServiceProcessMetrics};
48use mz_ore::cast::CastFrom;
49use mz_ore::collections::CollectionExt;
50use mz_persist_client::batch::ProtoBatch;
51use mz_repr::adt::array::ArrayDimension;
52use mz_repr::adt::interval::Interval;
53use mz_repr::adt::jsonb::Jsonb;
54use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
55use mz_repr::network_policy_id::NetworkPolicyId;
56use mz_repr::refresh_schedule::RefreshEvery;
57use mz_repr::role_id::RoleId;
58use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, Row, RowPacker, ScalarType, Timestamp};
59use mz_sql::ast::{ContinualTaskStmt, CreateIndexStatement, Statement, UnresolvedItemName};
60use mz_sql::catalog::{
61    CatalogCluster, CatalogDatabase, CatalogSchema, CatalogType, DefaultPrivilegeObject,
62    TypeCategory,
63};
64use mz_sql::func::FuncImplCatalogDetails;
65use mz_sql::names::{
66    CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier,
67};
68use mz_sql::plan::{ClusterSchedule, ConnectionDetails, SshKey};
69use mz_sql::session::user::SYSTEM_USER;
70use mz_sql::session::vars::SessionVars;
71use mz_sql_parser::ast::display::AstDisplay;
72use mz_storage_client::client::TableData;
73use mz_storage_types::connections::KafkaConnection;
74use mz_storage_types::connections::aws::{AwsAuth, AwsConnection};
75use mz_storage_types::connections::inline::ReferencedConnection;
76use mz_storage_types::connections::string_or_secret::StringOrSecret;
77use mz_storage_types::sinks::{KafkaSinkConnection, StorageSinkConnection};
78use mz_storage_types::sources::{
79    GenericSourceConnection, KafkaSourceConnection, PostgresSourceConnection, SourceConnection,
80};
81use smallvec::smallvec;
82
83// DO NOT add any more imports from `crate` outside of `crate::catalog`.
84use crate::active_compute_sink::ActiveSubscribe;
85use crate::catalog::CatalogState;
86use crate::coord::ConnMeta;
87
88/// An update to a built-in table.
89#[derive(Debug, Clone)]
90pub struct BuiltinTableUpdate<T = CatalogItemId> {
91    /// The reference of the table to update.
92    pub id: T,
93    /// The data to put into the table.
94    pub data: TableData,
95}
96
97impl<T> BuiltinTableUpdate<T> {
98    /// Create a [`BuiltinTableUpdate`] from a [`Row`].
99    pub fn row(id: T, row: Row, diff: Diff) -> BuiltinTableUpdate<T> {
100        BuiltinTableUpdate {
101            id,
102            data: TableData::Rows(vec![(row, diff)]),
103        }
104    }
105
106    pub fn batch(id: T, batch: ProtoBatch) -> BuiltinTableUpdate<T> {
107        BuiltinTableUpdate {
108            id,
109            data: TableData::Batches(smallvec![batch]),
110        }
111    }
112}
113
114impl CatalogState {
115    pub fn resolve_builtin_table_updates(
116        &self,
117        builtin_table_update: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
118    ) -> Vec<BuiltinTableUpdate<CatalogItemId>> {
119        builtin_table_update
120            .into_iter()
121            .map(|builtin_table_update| self.resolve_builtin_table_update(builtin_table_update))
122            .collect()
123    }
124
125    pub fn resolve_builtin_table_update(
126        &self,
127        BuiltinTableUpdate { id, data }: BuiltinTableUpdate<&'static BuiltinTable>,
128    ) -> BuiltinTableUpdate<CatalogItemId> {
129        let id = self.resolve_builtin_table(id);
130        BuiltinTableUpdate { id, data }
131    }
132
133    pub fn pack_depends_update(
134        &self,
135        depender: CatalogItemId,
136        dependee: CatalogItemId,
137        diff: Diff,
138    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
139        let row = Row::pack_slice(&[
140            Datum::String(&depender.to_string()),
141            Datum::String(&dependee.to_string()),
142        ]);
143        BuiltinTableUpdate::row(&*MZ_OBJECT_DEPENDENCIES, row, diff)
144    }
145
146    pub(super) fn pack_database_update(
147        &self,
148        database_id: &DatabaseId,
149        diff: Diff,
150    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
151        let database = self.get_database(database_id);
152        let row = self.pack_privilege_array_row(database.privileges());
153        let privileges = row.unpack_first();
154        BuiltinTableUpdate::row(
155            &*MZ_DATABASES,
156            Row::pack_slice(&[
157                Datum::String(&database.id.to_string()),
158                Datum::UInt32(database.oid),
159                Datum::String(database.name()),
160                Datum::String(&database.owner_id.to_string()),
161                privileges,
162            ]),
163            diff,
164        )
165    }
166
167    pub(super) fn pack_schema_update(
168        &self,
169        database_spec: &ResolvedDatabaseSpecifier,
170        schema_id: &SchemaId,
171        diff: Diff,
172    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
173        let (database_id, schema) = match database_spec {
174            ResolvedDatabaseSpecifier::Ambient => (None, &self.ambient_schemas_by_id[schema_id]),
175            ResolvedDatabaseSpecifier::Id(id) => (
176                Some(id.to_string()),
177                &self.database_by_id[id].schemas_by_id[schema_id],
178            ),
179        };
180        let row = self.pack_privilege_array_row(schema.privileges());
181        let privileges = row.unpack_first();
182        BuiltinTableUpdate::row(
183            &*MZ_SCHEMAS,
184            Row::pack_slice(&[
185                Datum::String(&schema_id.to_string()),
186                Datum::UInt32(schema.oid),
187                Datum::from(database_id.as_deref()),
188                Datum::String(&schema.name.schema),
189                Datum::String(&schema.owner_id.to_string()),
190                privileges,
191            ]),
192            diff,
193        )
194    }
195
196    pub(super) fn pack_role_update(
197        &self,
198        id: RoleId,
199        diff: Diff,
200    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
201        match id {
202            // PUBLIC role should not show up in mz_roles.
203            RoleId::Public => vec![],
204            id => {
205                let role = self.get_role(&id);
206                let role_update = BuiltinTableUpdate::row(
207                    &*MZ_ROLES,
208                    Row::pack_slice(&[
209                        Datum::String(&role.id.to_string()),
210                        Datum::UInt32(role.oid),
211                        Datum::String(&role.name),
212                        Datum::from(role.attributes.inherit),
213                    ]),
214                    diff,
215                );
216                let mut updates = vec![role_update];
217
218                // HACK/TODO(parkmycar): Creating an empty SessionVars like this is pretty hacky,
219                // we should instead have a static list of all session vars.
220                let session_vars_reference = SessionVars::new_unchecked(
221                    &mz_build_info::DUMMY_BUILD_INFO,
222                    SYSTEM_USER.clone(),
223                    None,
224                );
225
226                for (name, val) in role.vars() {
227                    let result = session_vars_reference
228                        .inspect(name)
229                        .and_then(|var| var.check(val.borrow()));
230                    let Ok(formatted_val) = result else {
231                        // Note: all variables should have been validated by this point, so we
232                        // shouldn't ever hit this.
233                        tracing::error!(?name, ?val, ?result, "found invalid role default var");
234                        continue;
235                    };
236
237                    let role_var_update = BuiltinTableUpdate::row(
238                        &*MZ_ROLE_PARAMETERS,
239                        Row::pack_slice(&[
240                            Datum::String(&role.id.to_string()),
241                            Datum::String(name),
242                            Datum::String(&formatted_val),
243                        ]),
244                        diff,
245                    );
246                    updates.push(role_var_update);
247                }
248
249                updates
250            }
251        }
252    }
253
254    pub(super) fn pack_role_members_update(
255        &self,
256        role_id: RoleId,
257        member_id: RoleId,
258        diff: Diff,
259    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
260        let grantor_id = self
261            .get_role(&member_id)
262            .membership
263            .map
264            .get(&role_id)
265            .expect("catalog out of sync");
266        BuiltinTableUpdate::row(
267            &*MZ_ROLE_MEMBERS,
268            Row::pack_slice(&[
269                Datum::String(&role_id.to_string()),
270                Datum::String(&member_id.to_string()),
271                Datum::String(&grantor_id.to_string()),
272            ]),
273            diff,
274        )
275    }
276
277    pub(super) fn pack_cluster_update(
278        &self,
279        name: &str,
280        diff: Diff,
281    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
282        let id = self.clusters_by_name[name];
283        let cluster = &self.clusters_by_id[&id];
284        let row = self.pack_privilege_array_row(cluster.privileges());
285        let privileges = row.unpack_first();
286        let (size, disk, replication_factor, azs, introspection_debugging, introspection_interval) =
287            match &cluster.config.variant {
288                ClusterVariant::Managed(config) => (
289                    Some(config.size.as_str()),
290                    Some(config.disk),
291                    Some(config.replication_factor),
292                    if config.availability_zones.is_empty() {
293                        None
294                    } else {
295                        Some(config.availability_zones.clone())
296                    },
297                    Some(config.logging.log_logging),
298                    config.logging.interval.map(|d| {
299                        Interval::from_duration(&d)
300                            .expect("planning ensured this convertible back to interval")
301                    }),
302                ),
303                ClusterVariant::Unmanaged => (None, None, None, None, None, None),
304            };
305
306        let mut row = Row::default();
307        let mut packer = row.packer();
308        packer.extend([
309            Datum::String(&id.to_string()),
310            Datum::String(name),
311            Datum::String(&cluster.owner_id.to_string()),
312            privileges,
313            cluster.is_managed().into(),
314            size.into(),
315            replication_factor.into(),
316            disk.into(),
317        ]);
318        if let Some(azs) = azs {
319            packer.push_list(azs.iter().map(|az| Datum::String(az)));
320        } else {
321            packer.push(Datum::Null);
322        }
323        packer.push(Datum::from(introspection_debugging));
324        packer.push(Datum::from(introspection_interval));
325
326        let mut updates = Vec::new();
327
328        updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTERS, row, diff));
329
330        if let ClusterVariant::Managed(managed_config) = &cluster.config.variant {
331            let row = match managed_config.schedule {
332                ClusterSchedule::Manual => Row::pack_slice(&[
333                    Datum::String(&id.to_string()),
334                    Datum::String("manual"),
335                    Datum::Null,
336                ]),
337                ClusterSchedule::Refresh {
338                    hydration_time_estimate,
339                } => Row::pack_slice(&[
340                    Datum::String(&id.to_string()),
341                    Datum::String("on-refresh"),
342                    Datum::Interval(
343                        Interval::from_duration(&hydration_time_estimate)
344                            .expect("planning ensured that this is convertible back to Interval"),
345                    ),
346                ]),
347            };
348            updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTER_SCHEDULES, row, diff));
349        }
350
351        updates.push(BuiltinTableUpdate::row(
352            &*MZ_CLUSTER_WORKLOAD_CLASSES,
353            Row::pack_slice(&[
354                Datum::String(&id.to_string()),
355                Datum::from(cluster.config.workload_class.as_deref()),
356            ]),
357            diff,
358        ));
359
360        updates
361    }
362
363    pub(super) fn pack_cluster_replica_update(
364        &self,
365        cluster_id: ClusterId,
366        name: &str,
367        diff: Diff,
368    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
369        let cluster = &self.clusters_by_id[&cluster_id];
370        let id = cluster.replica_id(name).expect("Must exist");
371        let replica = cluster.replica(id).expect("Must exist");
372
373        let (size, disk, az, internal, pending) = match &replica.config.location {
374            // TODO(guswynn): The column should be `availability_zones`, not
375            // `availability_zone`.
376            ReplicaLocation::Managed(ManagedReplicaLocation {
377                size,
378                availability_zones: ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
379                allocation: _,
380                disk,
381                billed_as: _,
382                internal,
383                pending,
384            }) => (
385                Some(&**size),
386                Some(*disk),
387                Some(az.as_str()),
388                *internal,
389                *pending,
390            ),
391            ReplicaLocation::Managed(ManagedReplicaLocation {
392                size,
393                availability_zones: _,
394                allocation: _,
395                disk,
396                billed_as: _,
397                internal,
398                pending,
399            }) => (Some(&**size), Some(*disk), None, *internal, *pending),
400            _ => (None, None, None, false, false),
401        };
402
403        let cluster_replica_update = BuiltinTableUpdate::row(
404            &*MZ_CLUSTER_REPLICAS,
405            Row::pack_slice(&[
406                Datum::String(&id.to_string()),
407                Datum::String(name),
408                Datum::String(&cluster_id.to_string()),
409                Datum::from(size),
410                Datum::from(az),
411                Datum::String(&replica.owner_id.to_string()),
412                Datum::from(disk),
413            ]),
414            diff,
415        );
416
417        let mut updates = vec![cluster_replica_update];
418
419        if internal {
420            let update = BuiltinTableUpdate::row(
421                &*MZ_INTERNAL_CLUSTER_REPLICAS,
422                Row::pack_slice(&[Datum::String(&id.to_string())]),
423                diff,
424            );
425            updates.push(update);
426        }
427
428        if pending {
429            let update = BuiltinTableUpdate::row(
430                &*MZ_PENDING_CLUSTER_REPLICAS,
431                Row::pack_slice(&[Datum::String(&id.to_string())]),
432                diff,
433            );
434            updates.push(update);
435        }
436
437        updates
438    }
439
440    // TODO(jkosh44) This should not be a builtin table, it should be a builtin source.
441    pub(crate) fn pack_cluster_replica_status_update(
442        &self,
443        replica_id: ReplicaId,
444        process_id: ProcessId,
445        event: &ClusterReplicaProcessStatus,
446        diff: Diff,
447    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
448        let status = event.status.as_kebab_case_str();
449
450        let offline_reason = match event.status {
451            ClusterStatus::Online => None,
452            ClusterStatus::Offline(None) => None,
453            ClusterStatus::Offline(Some(reason)) => Some(reason.to_string()),
454        };
455
456        BuiltinTableUpdate::row(
457            &*MZ_CLUSTER_REPLICA_STATUSES,
458            Row::pack_slice(&[
459                Datum::String(&replica_id.to_string()),
460                Datum::UInt64(process_id),
461                Datum::String(status),
462                Datum::from(offline_reason.as_deref()),
463                Datum::TimestampTz(event.time.try_into().expect("must fit")),
464            ]),
465            diff,
466        )
467    }
468
469    pub(crate) fn pack_network_policy_update(
470        &self,
471        policy_id: &NetworkPolicyId,
472        diff: Diff,
473    ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, Error> {
474        let policy = self.get_network_policy(policy_id);
475        let row = self.pack_privilege_array_row(&policy.privileges);
476        let privileges = row.unpack_first();
477        let mut updates = Vec::new();
478        for ref rule in policy.rules.clone() {
479            updates.push(BuiltinTableUpdate::row(
480                &*MZ_NETWORK_POLICY_RULES,
481                Row::pack_slice(&[
482                    Datum::String(&rule.name),
483                    Datum::String(&policy.id.to_string()),
484                    Datum::String(&rule.action.to_string()),
485                    Datum::String(&rule.address.to_string()),
486                    Datum::String(&rule.direction.to_string()),
487                ]),
488                diff,
489            ));
490        }
491        updates.push(BuiltinTableUpdate::row(
492            &*MZ_NETWORK_POLICIES,
493            Row::pack_slice(&[
494                Datum::String(&policy.id.to_string()),
495                Datum::String(&policy.name),
496                Datum::String(&policy.owner_id.to_string()),
497                privileges,
498                Datum::UInt32(policy.oid.clone()),
499            ]),
500            diff,
501        ));
502
503        Ok(updates)
504    }
505
506    pub(super) fn pack_item_update(
507        &self,
508        id: CatalogItemId,
509        diff: Diff,
510    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
511        let entry = self.get_entry(&id);
512        let oid = entry.oid();
513        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
514        let schema_id = &self
515            .get_schema(
516                &entry.name().qualifiers.database_spec,
517                &entry.name().qualifiers.schema_spec,
518                conn_id,
519            )
520            .id;
521        let name = &entry.name().item;
522        let owner_id = entry.owner_id();
523        let privileges_row = self.pack_privilege_array_row(entry.privileges());
524        let privileges = privileges_row.unpack_first();
525        let mut updates = match entry.item() {
526            CatalogItem::Log(_) => self.pack_source_update(
527                id, oid, schema_id, name, "log", None, None, None, None, None, owner_id,
528                privileges, diff, None,
529            ),
530            CatalogItem::Index(index) => {
531                self.pack_index_update(id, oid, name, owner_id, index, diff)
532            }
533            CatalogItem::Table(table) => {
534                let mut updates = self
535                    .pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);
536
537                if let TableDataSource::DataSource {
538                    desc: data_source,
539                    timeline: _,
540                } = &table.data_source
541                {
542                    updates.extend(match data_source {
543                        DataSourceDesc::IngestionExport {
544                            ingestion_id,
545                            external_reference: UnresolvedItemName(external_reference),
546                            details: _,
547                            data_config: _,
548                        } => {
549                            let ingestion_entry = self
550                                .get_entry(ingestion_id)
551                                .source_desc()
552                                .expect("primary source exists")
553                                .expect("primary source is a source");
554
555                            match ingestion_entry.connection.name() {
556                                "postgres" => {
557                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
558                                    // The left-most qualification of Postgres
559                                    // tables is the database, but this
560                                    // information is redundant because each
561                                    // Postgres connection connects to only one
562                                    // database.
563                                    let schema_name = external_reference[1].to_ast_string_simple();
564                                    let table_name = external_reference[2].to_ast_string_simple();
565
566                                    self.pack_postgres_source_tables_update(
567                                        id,
568                                        &schema_name,
569                                        &table_name,
570                                        diff,
571                                    )
572                                }
573                                "mysql" => {
574                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
575                                    let schema_name = external_reference[0].to_ast_string_simple();
576                                    let table_name = external_reference[1].to_ast_string_simple();
577
578                                    self.pack_mysql_source_tables_update(
579                                        id,
580                                        &schema_name,
581                                        &table_name,
582                                        diff,
583                                    )
584                                }
585                                "sql-server" => {
586                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
587                                    // The left-most qualification of SQL Server tables is
588                                    // the database, but this information is redundant
589                                    // because each SQL Server connection connects to
590                                    // only one database.
591                                    let schema_name = external_reference[1].to_ast_string_simple();
592                                    let table_name = external_reference[2].to_ast_string_simple();
593
594                                    self.pack_sql_server_source_table_update(
595                                        id,
596                                        &schema_name,
597                                        &table_name,
598                                        diff,
599                                    )
600                                }
601                                // Load generator sources don't have any special
602                                // updates.
603                                "load-generator" => vec![],
604                                "kafka" => {
605                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 1);
606                                    let topic = external_reference[0].to_ast_string_simple();
607                                    let envelope = data_source.envelope();
608                                    let (key_format, value_format) = data_source.formats();
609
610                                    self.pack_kafka_source_tables_update(
611                                        id,
612                                        &topic,
613                                        envelope,
614                                        key_format,
615                                        value_format,
616                                        diff,
617                                    )
618                                }
619                                s => unreachable!("{s} sources do not have tables"),
620                            }
621                        }
622                        _ => vec![],
623                    });
624                }
625
626                updates
627            }
628            CatalogItem::Source(source) => {
629                let source_type = source.source_type();
630                let connection_id = source.connection_id();
631                let envelope = source.data_source.envelope();
632                let cluster_entry = match source.data_source {
633                    // Ingestion exports don't have their own cluster, but
634                    // run on their ingestion's cluster.
635                    DataSourceDesc::IngestionExport { ingestion_id, .. } => {
636                        self.get_entry(&ingestion_id)
637                    }
638                    _ => entry,
639                };
640
641                let cluster_id = cluster_entry.item().cluster_id().map(|id| id.to_string());
642
643                let (key_format, value_format) = source.data_source.formats();
644
645                let mut updates = self.pack_source_update(
646                    id,
647                    oid,
648                    schema_id,
649                    name,
650                    source_type,
651                    connection_id,
652                    envelope,
653                    key_format,
654                    value_format,
655                    cluster_id.as_deref(),
656                    owner_id,
657                    privileges,
658                    diff,
659                    source.create_sql.as_ref(),
660                );
661
662                updates.extend(match &source.data_source {
663                    DataSourceDesc::Ingestion { ingestion_desc, .. } => {
664                        match &ingestion_desc.desc.connection {
665                            GenericSourceConnection::Postgres(postgres) => {
666                                self.pack_postgres_source_update(id, postgres, diff)
667                            }
668                            GenericSourceConnection::Kafka(kafka) => {
669                                self.pack_kafka_source_update(id, source.global_id(), kafka, diff)
670                            }
671                            _ => vec![],
672                        }
673                    }
674                    DataSourceDesc::IngestionExport {
675                        ingestion_id,
676                        external_reference: UnresolvedItemName(external_reference),
677                        details: _,
678                        data_config: _,
679                    } => {
680                        let ingestion_entry = self
681                            .get_entry(ingestion_id)
682                            .source_desc()
683                            .expect("primary source exists")
684                            .expect("primary source is a source");
685
686                        match ingestion_entry.connection.name() {
687                            "postgres" => {
688                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
689                                // The left-most qualification of Postgres
690                                // tables is the database, but this
691                                // information is redundant because each
692                                // Postgres connection connects to only one
693                                // database.
694                                let schema_name = external_reference[1].to_ast_string_simple();
695                                let table_name = external_reference[2].to_ast_string_simple();
696
697                                self.pack_postgres_source_tables_update(
698                                    id,
699                                    &schema_name,
700                                    &table_name,
701                                    diff,
702                                )
703                            }
704                            "mysql" => {
705                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
706                                let schema_name = external_reference[0].to_ast_string_simple();
707                                let table_name = external_reference[1].to_ast_string_simple();
708
709                                self.pack_mysql_source_tables_update(
710                                    id,
711                                    &schema_name,
712                                    &table_name,
713                                    diff,
714                                )
715                            }
716                            "sql-server" => {
717                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
718                                // The left-most qualification of SQL Server tables is
719                                // the database, but this information is redundant
720                                // because each SQL Server connection connects to
721                                // only one database.
722                                let schema_name = external_reference[1].to_ast_string_simple();
723                                let table_name = external_reference[2].to_ast_string_simple();
724
725                                self.pack_sql_server_source_table_update(
726                                    id,
727                                    &schema_name,
728                                    &table_name,
729                                    diff,
730                                )
731                            }
732                            // Load generator sources don't have any special
733                            // updates.
734                            "load-generator" => vec![],
735                            s => unreachable!("{s} sources do not have subsources"),
736                        }
737                    }
738                    DataSourceDesc::Webhook { .. } => {
739                        vec![self.pack_webhook_source_update(id, diff)]
740                    }
741                    _ => vec![],
742                });
743
744                updates
745            }
746            CatalogItem::View(view) => {
747                self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
748            }
749            CatalogItem::MaterializedView(mview) => self.pack_materialized_view_update(
750                id, oid, schema_id, name, owner_id, privileges, mview, diff,
751            ),
752            CatalogItem::Sink(sink) => {
753                self.pack_sink_update(id, oid, schema_id, name, owner_id, sink, diff)
754            }
755            CatalogItem::Type(ty) => {
756                self.pack_type_update(id, oid, schema_id, name, owner_id, privileges, ty, diff)
757            }
758            CatalogItem::Func(func) => {
759                self.pack_func_update(id, schema_id, name, owner_id, func, diff)
760            }
761            CatalogItem::Secret(_) => {
762                self.pack_secret_update(id, oid, schema_id, name, owner_id, privileges, diff)
763            }
764            CatalogItem::Connection(connection) => self.pack_connection_update(
765                id, oid, schema_id, name, owner_id, privileges, connection, diff,
766            ),
767            CatalogItem::ContinualTask(ct) => self.pack_continual_task_update(
768                id, oid, schema_id, name, owner_id, privileges, ct, diff,
769            ),
770        };
771
772        if !entry.item().is_temporary() {
773            // Populate or clean up the `mz_object_dependencies` table.
774            // TODO(jkosh44) Unclear if this table wants to include all uses or only references.
775            for dependee in entry.item().references().items() {
776                updates.push(self.pack_depends_update(id, *dependee, diff))
777            }
778        }
779
780        let full_name = self.resolve_full_name(entry.name(), entry.conn_id());
781        // Always report the latest for an objects columns.
782        if let Ok(desc) = entry.desc_latest(&full_name) {
783            let defaults = match entry.item() {
784                CatalogItem::Table(Table {
785                    data_source: TableDataSource::TableWrites { defaults },
786                    ..
787                }) => Some(defaults),
788                _ => None,
789            };
790            for (i, (column_name, column_type)) in desc.iter().enumerate() {
791                let default: Option<String> = defaults.map(|d| d[i].to_ast_string_stable());
792                let default: Datum = default
793                    .as_ref()
794                    .map(|d| Datum::String(d))
795                    .unwrap_or(Datum::Null);
796                let pgtype = mz_pgrepr::Type::from(&column_type.scalar_type);
797                let (type_name, type_oid) = match &column_type.scalar_type {
798                    ScalarType::List {
799                        custom_id: Some(custom_id),
800                        ..
801                    }
802                    | ScalarType::Map {
803                        custom_id: Some(custom_id),
804                        ..
805                    }
806                    | ScalarType::Record {
807                        custom_id: Some(custom_id),
808                        ..
809                    } => {
810                        let entry = self.get_entry(custom_id);
811                        // NOTE(benesch): the `mz_columns.type text` field is
812                        // wrong. Types do not have a name that can be
813                        // represented as a single textual field. There can be
814                        // multiple types with the same name in different
815                        // schemas and databases. We should eventually deprecate
816                        // the `type` field in favor of a new `type_id` field
817                        // that can be joined against `mz_types`.
818                        //
819                        // For now, in the interest of pragmatism, we just use
820                        // the type's item name, and accept that there may be
821                        // ambiguity if the same type name is used in multiple
822                        // schemas. The ambiguity is mitigated by the OID, which
823                        // can be joined against `mz_types.oid` to resolve the
824                        // ambiguity.
825                        let name = &*entry.name().item;
826                        let oid = entry.oid();
827                        (name, oid)
828                    }
829                    _ => (pgtype.name(), pgtype.oid()),
830                };
831                updates.push(BuiltinTableUpdate::row(
832                    &*MZ_COLUMNS,
833                    Row::pack_slice(&[
834                        Datum::String(&id.to_string()),
835                        Datum::String(column_name.as_str()),
836                        Datum::UInt64(u64::cast_from(i + 1)),
837                        Datum::from(column_type.nullable),
838                        Datum::String(type_name),
839                        default,
840                        Datum::UInt32(type_oid),
841                        Datum::Int32(pgtype.typmod()),
842                    ]),
843                    diff,
844                ));
845            }
846        }
847
848        // Use initial lcw so that we can tell apart default from non-existent windows.
849        if let Some(cw) = entry.item().initial_logical_compaction_window() {
850            updates.push(self.pack_history_retention_strategy_update(id, cw, diff));
851            // Propagate source export changes.
852            for (cw, mut ids) in self.source_compaction_windows([id]) {
853                // Id already accounted for above.
854                ids.remove(&id);
855                for sub_id in ids {
856                    updates.push(self.pack_history_retention_strategy_update(sub_id, cw, diff));
857                }
858            }
859        }
860
861        updates
862    }
863
864    fn pack_history_retention_strategy_update(
865        &self,
866        id: CatalogItemId,
867        cw: CompactionWindow,
868        diff: Diff,
869    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
870        let cw: u64 = cw.comparable_timestamp().into();
871        let cw = Jsonb::from_serde_json(serde_json::Value::Number(serde_json::Number::from(cw)))
872            .expect("must serialize");
873        BuiltinTableUpdate::row(
874            &*MZ_HISTORY_RETENTION_STRATEGIES,
875            Row::pack_slice(&[
876                Datum::String(&id.to_string()),
877                // FOR is the only strategy at the moment. We may introduce FROM or others later.
878                Datum::String("FOR"),
879                cw.into_row().into_element(),
880            ]),
881            diff,
882        )
883    }
884
885    fn pack_table_update(
886        &self,
887        id: CatalogItemId,
888        oid: u32,
889        schema_id: &SchemaSpecifier,
890        name: &str,
891        owner_id: &RoleId,
892        privileges: Datum,
893        diff: Diff,
894        table: &Table,
895    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
896        let redacted = table.create_sql.as_ref().map(|create_sql| {
897            mz_sql::parse::parse(create_sql)
898                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
899                .into_element()
900                .ast
901                .to_ast_string_redacted()
902        });
903        let source_id = if let TableDataSource::DataSource {
904            desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
905            ..
906        } = &table.data_source
907        {
908            Some(ingestion_id.to_string())
909        } else {
910            None
911        };
912
913        vec![BuiltinTableUpdate::row(
914            &*MZ_TABLES,
915            Row::pack_slice(&[
916                Datum::String(&id.to_string()),
917                Datum::UInt32(oid),
918                Datum::String(&schema_id.to_string()),
919                Datum::String(name),
920                Datum::String(&owner_id.to_string()),
921                privileges,
922                if let Some(create_sql) = &table.create_sql {
923                    Datum::String(create_sql)
924                } else {
925                    Datum::Null
926                },
927                if let Some(redacted) = &redacted {
928                    Datum::String(redacted)
929                } else {
930                    Datum::Null
931                },
932                if let Some(source_id) = source_id.as_ref() {
933                    Datum::String(source_id)
934                } else {
935                    Datum::Null
936                },
937            ]),
938            diff,
939        )]
940    }
941
942    fn pack_source_update(
943        &self,
944        id: CatalogItemId,
945        oid: u32,
946        schema_id: &SchemaSpecifier,
947        name: &str,
948        source_desc_name: &str,
949        connection_id: Option<CatalogItemId>,
950        envelope: Option<&str>,
951        key_format: Option<&str>,
952        value_format: Option<&str>,
953        cluster_id: Option<&str>,
954        owner_id: &RoleId,
955        privileges: Datum,
956        diff: Diff,
957        create_sql: Option<&String>,
958    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
959        let redacted = create_sql.map(|create_sql| {
960            let create_stmt = mz_sql::parse::parse(create_sql)
961                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
962                .into_element()
963                .ast;
964            create_stmt.to_ast_string_redacted()
965        });
966        vec![BuiltinTableUpdate::row(
967            &*MZ_SOURCES,
968            Row::pack_slice(&[
969                Datum::String(&id.to_string()),
970                Datum::UInt32(oid),
971                Datum::String(&schema_id.to_string()),
972                Datum::String(name),
973                Datum::String(source_desc_name),
974                Datum::from(connection_id.map(|id| id.to_string()).as_deref()),
975                // This is the "source size", which is a remnant from linked
976                // clusters.
977                Datum::Null,
978                Datum::from(envelope),
979                Datum::from(key_format),
980                Datum::from(value_format),
981                Datum::from(cluster_id),
982                Datum::String(&owner_id.to_string()),
983                privileges,
984                if let Some(create_sql) = create_sql {
985                    Datum::String(create_sql)
986                } else {
987                    Datum::Null
988                },
989                if let Some(redacted) = &redacted {
990                    Datum::String(redacted)
991                } else {
992                    Datum::Null
993                },
994            ]),
995            diff,
996        )]
997    }
998
999    fn pack_postgres_source_update(
1000        &self,
1001        id: CatalogItemId,
1002        postgres: &PostgresSourceConnection<ReferencedConnection>,
1003        diff: Diff,
1004    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1005        vec![BuiltinTableUpdate::row(
1006            &*MZ_POSTGRES_SOURCES,
1007            Row::pack_slice(&[
1008                Datum::String(&id.to_string()),
1009                Datum::String(&postgres.publication_details.slot),
1010                Datum::from(postgres.publication_details.timeline_id),
1011            ]),
1012            diff,
1013        )]
1014    }
1015
1016    fn pack_kafka_source_update(
1017        &self,
1018        item_id: CatalogItemId,
1019        collection_id: GlobalId,
1020        kafka: &KafkaSourceConnection<ReferencedConnection>,
1021        diff: Diff,
1022    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1023        vec![BuiltinTableUpdate::row(
1024            &*MZ_KAFKA_SOURCES,
1025            Row::pack_slice(&[
1026                Datum::String(&item_id.to_string()),
1027                Datum::String(&kafka.group_id(&self.config.connection_context, collection_id)),
1028                Datum::String(&kafka.topic),
1029            ]),
1030            diff,
1031        )]
1032    }
1033
1034    fn pack_postgres_source_tables_update(
1035        &self,
1036        id: CatalogItemId,
1037        schema_name: &str,
1038        table_name: &str,
1039        diff: Diff,
1040    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1041        vec![BuiltinTableUpdate::row(
1042            &*MZ_POSTGRES_SOURCE_TABLES,
1043            Row::pack_slice(&[
1044                Datum::String(&id.to_string()),
1045                Datum::String(schema_name),
1046                Datum::String(table_name),
1047            ]),
1048            diff,
1049        )]
1050    }
1051
1052    fn pack_mysql_source_tables_update(
1053        &self,
1054        id: CatalogItemId,
1055        schema_name: &str,
1056        table_name: &str,
1057        diff: Diff,
1058    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1059        vec![BuiltinTableUpdate::row(
1060            &*MZ_MYSQL_SOURCE_TABLES,
1061            Row::pack_slice(&[
1062                Datum::String(&id.to_string()),
1063                Datum::String(schema_name),
1064                Datum::String(table_name),
1065            ]),
1066            diff,
1067        )]
1068    }
1069
1070    fn pack_sql_server_source_table_update(
1071        &self,
1072        id: CatalogItemId,
1073        schema_name: &str,
1074        table_name: &str,
1075        diff: Diff,
1076    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1077        vec![BuiltinTableUpdate::row(
1078            &*MZ_SQL_SERVER_SOURCE_TABLES,
1079            Row::pack_slice(&[
1080                Datum::String(&id.to_string()),
1081                Datum::String(schema_name),
1082                Datum::String(table_name),
1083            ]),
1084            diff,
1085        )]
1086    }
1087
1088    fn pack_kafka_source_tables_update(
1089        &self,
1090        id: CatalogItemId,
1091        topic: &str,
1092        envelope: Option<&str>,
1093        key_format: Option<&str>,
1094        value_format: Option<&str>,
1095        diff: Diff,
1096    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1097        vec![BuiltinTableUpdate::row(
1098            &*MZ_KAFKA_SOURCE_TABLES,
1099            Row::pack_slice(&[
1100                Datum::String(&id.to_string()),
1101                Datum::String(topic),
1102                Datum::from(envelope),
1103                Datum::from(key_format),
1104                Datum::from(value_format),
1105            ]),
1106            diff,
1107        )]
1108    }
1109
1110    fn pack_connection_update(
1111        &self,
1112        id: CatalogItemId,
1113        oid: u32,
1114        schema_id: &SchemaSpecifier,
1115        name: &str,
1116        owner_id: &RoleId,
1117        privileges: Datum,
1118        connection: &Connection,
1119        diff: Diff,
1120    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1121        let create_stmt = mz_sql::parse::parse(&connection.create_sql)
1122            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", connection.create_sql))
1123            .into_element()
1124            .ast;
1125        let mut updates = vec![BuiltinTableUpdate::row(
1126            &*MZ_CONNECTIONS,
1127            Row::pack_slice(&[
1128                Datum::String(&id.to_string()),
1129                Datum::UInt32(oid),
1130                Datum::String(&schema_id.to_string()),
1131                Datum::String(name),
1132                Datum::String(match connection.details {
1133                    ConnectionDetails::Kafka { .. } => "kafka",
1134                    ConnectionDetails::Csr { .. } => "confluent-schema-registry",
1135                    ConnectionDetails::Postgres { .. } => "postgres",
1136                    ConnectionDetails::Aws(..) => "aws",
1137                    ConnectionDetails::AwsPrivatelink(..) => "aws-privatelink",
1138                    ConnectionDetails::Ssh { .. } => "ssh-tunnel",
1139                    ConnectionDetails::MySql { .. } => "mysql",
1140                    ConnectionDetails::SqlServer(_) => "sql-server",
1141                }),
1142                Datum::String(&owner_id.to_string()),
1143                privileges,
1144                Datum::String(&connection.create_sql),
1145                Datum::String(&create_stmt.to_ast_string_redacted()),
1146            ]),
1147            diff,
1148        )];
1149        match connection.details {
1150            ConnectionDetails::Kafka(ref kafka) => {
1151                updates.extend(self.pack_kafka_connection_update(id, kafka, diff));
1152            }
1153            ConnectionDetails::Aws(ref aws_config) => {
1154                match self.pack_aws_connection_update(id, aws_config, diff) {
1155                    Ok(update) => {
1156                        updates.push(update);
1157                    }
1158                    Err(e) => {
1159                        tracing::error!(%id, %e, "failed writing row to mz_aws_connections table");
1160                    }
1161                }
1162            }
1163            ConnectionDetails::AwsPrivatelink(_) => {
1164                if let Some(aws_principal_context) = self.aws_principal_context.as_ref() {
1165                    updates.push(self.pack_aws_privatelink_connection_update(
1166                        id,
1167                        aws_principal_context,
1168                        diff,
1169                    ));
1170                } else {
1171                    tracing::error!(%id, "missing AWS principal context; cannot write row to mz_aws_privatelink_connections table");
1172                }
1173            }
1174            ConnectionDetails::Ssh {
1175                ref key_1,
1176                ref key_2,
1177                ..
1178            } => {
1179                updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff));
1180            }
1181            ConnectionDetails::Csr(_)
1182            | ConnectionDetails::Postgres(_)
1183            | ConnectionDetails::MySql(_)
1184            | ConnectionDetails::SqlServer(_) => (),
1185        };
1186        updates
1187    }
1188
1189    pub(crate) fn pack_ssh_tunnel_connection_update(
1190        &self,
1191        id: CatalogItemId,
1192        key_1: &SshKey,
1193        key_2: &SshKey,
1194        diff: Diff,
1195    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1196        BuiltinTableUpdate::row(
1197            &*MZ_SSH_TUNNEL_CONNECTIONS,
1198            Row::pack_slice(&[
1199                Datum::String(&id.to_string()),
1200                Datum::String(key_1.public_key().as_str()),
1201                Datum::String(key_2.public_key().as_str()),
1202            ]),
1203            diff,
1204        )
1205    }
1206
1207    fn pack_kafka_connection_update(
1208        &self,
1209        id: CatalogItemId,
1210        kafka: &KafkaConnection<ReferencedConnection>,
1211        diff: Diff,
1212    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1213        let progress_topic = kafka.progress_topic(&self.config.connection_context, id);
1214        let mut row = Row::default();
1215        row.packer()
1216            .try_push_array(
1217                &[ArrayDimension {
1218                    lower_bound: 1,
1219                    length: kafka.brokers.len(),
1220                }],
1221                kafka
1222                    .brokers
1223                    .iter()
1224                    .map(|broker| Datum::String(&broker.address)),
1225            )
1226            .expect("kafka.brokers is 1 dimensional, and its length is used for the array length");
1227        let brokers = row.unpack_first();
1228        vec![BuiltinTableUpdate::row(
1229            &*MZ_KAFKA_CONNECTIONS,
1230            Row::pack_slice(&[
1231                Datum::String(&id.to_string()),
1232                brokers,
1233                Datum::String(&progress_topic),
1234            ]),
1235            diff,
1236        )]
1237    }
1238
1239    pub fn pack_aws_privatelink_connection_update(
1240        &self,
1241        connection_id: CatalogItemId,
1242        aws_principal_context: &AwsPrincipalContext,
1243        diff: Diff,
1244    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1245        let id = &MZ_AWS_PRIVATELINK_CONNECTIONS;
1246        let row = Row::pack_slice(&[
1247            Datum::String(&connection_id.to_string()),
1248            Datum::String(&aws_principal_context.to_principal_string(connection_id)),
1249        ]);
1250        BuiltinTableUpdate::row(id, row, diff)
1251    }
1252
1253    pub fn pack_aws_connection_update(
1254        &self,
1255        connection_id: CatalogItemId,
1256        aws_config: &AwsConnection,
1257        diff: Diff,
1258    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, anyhow::Error> {
1259        let id = &MZ_AWS_CONNECTIONS;
1260
1261        let mut access_key_id = None;
1262        let mut access_key_id_secret_id = None;
1263        let mut secret_access_key_secret_id = None;
1264        let mut session_token = None;
1265        let mut session_token_secret_id = None;
1266        let mut assume_role_arn = None;
1267        let mut assume_role_session_name = None;
1268        let mut principal = None;
1269        let mut external_id = None;
1270        let mut example_trust_policy = None;
1271        match &aws_config.auth {
1272            AwsAuth::Credentials(credentials) => {
1273                match &credentials.access_key_id {
1274                    StringOrSecret::String(s) => access_key_id = Some(s.as_str()),
1275                    StringOrSecret::Secret(s) => access_key_id_secret_id = Some(s.to_string()),
1276                }
1277                secret_access_key_secret_id = Some(credentials.secret_access_key.to_string());
1278                match credentials.session_token.as_ref() {
1279                    None => (),
1280                    Some(StringOrSecret::String(s)) => session_token = Some(s.as_str()),
1281                    Some(StringOrSecret::Secret(s)) => {
1282                        session_token_secret_id = Some(s.to_string())
1283                    }
1284                }
1285            }
1286            AwsAuth::AssumeRole(assume_role) => {
1287                assume_role_arn = Some(assume_role.arn.as_str());
1288                assume_role_session_name = assume_role.session_name.as_deref();
1289                principal = self
1290                    .config
1291                    .connection_context
1292                    .aws_connection_role_arn
1293                    .as_deref();
1294                external_id =
1295                    Some(assume_role.external_id(&self.config.connection_context, connection_id)?);
1296                example_trust_policy = {
1297                    let policy = assume_role
1298                        .example_trust_policy(&self.config.connection_context, connection_id)?;
1299                    let policy = Jsonb::from_serde_json(policy).expect("valid json");
1300                    Some(policy.into_row())
1301                };
1302            }
1303        }
1304
1305        let row = Row::pack_slice(&[
1306            Datum::String(&connection_id.to_string()),
1307            Datum::from(aws_config.endpoint.as_deref()),
1308            Datum::from(aws_config.region.as_deref()),
1309            Datum::from(access_key_id),
1310            Datum::from(access_key_id_secret_id.as_deref()),
1311            Datum::from(secret_access_key_secret_id.as_deref()),
1312            Datum::from(session_token),
1313            Datum::from(session_token_secret_id.as_deref()),
1314            Datum::from(assume_role_arn),
1315            Datum::from(assume_role_session_name),
1316            Datum::from(principal),
1317            Datum::from(external_id.as_deref()),
1318            Datum::from(example_trust_policy.as_ref().map(|p| p.into_element())),
1319        ]);
1320
1321        Ok(BuiltinTableUpdate::row(id, row, diff))
1322    }
1323
1324    fn pack_view_update(
1325        &self,
1326        id: CatalogItemId,
1327        oid: u32,
1328        schema_id: &SchemaSpecifier,
1329        name: &str,
1330        owner_id: &RoleId,
1331        privileges: Datum,
1332        view: &View,
1333        diff: Diff,
1334    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1335        let create_stmt = mz_sql::parse::parse(&view.create_sql)
1336            .unwrap_or_else(|e| {
1337                panic!(
1338                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1339                    view.create_sql, e
1340                )
1341            })
1342            .into_element()
1343            .ast;
1344        let query = match &create_stmt {
1345            Statement::CreateView(stmt) => &stmt.definition.query,
1346            _ => unreachable!(),
1347        };
1348
1349        let mut query_string = query.to_ast_string_stable();
1350        // PostgreSQL appends a semicolon in `pg_views.definition`, we
1351        // do the same for compatibility's sake.
1352        query_string.push(';');
1353
1354        vec![BuiltinTableUpdate::row(
1355            &*MZ_VIEWS,
1356            Row::pack_slice(&[
1357                Datum::String(&id.to_string()),
1358                Datum::UInt32(oid),
1359                Datum::String(&schema_id.to_string()),
1360                Datum::String(name),
1361                Datum::String(&query_string),
1362                Datum::String(&owner_id.to_string()),
1363                privileges,
1364                Datum::String(&view.create_sql),
1365                Datum::String(&create_stmt.to_ast_string_redacted()),
1366            ]),
1367            diff,
1368        )]
1369    }
1370
1371    fn pack_materialized_view_update(
1372        &self,
1373        id: CatalogItemId,
1374        oid: u32,
1375        schema_id: &SchemaSpecifier,
1376        name: &str,
1377        owner_id: &RoleId,
1378        privileges: Datum,
1379        mview: &MaterializedView,
1380        diff: Diff,
1381    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1382        let create_stmt = mz_sql::parse::parse(&mview.create_sql)
1383            .unwrap_or_else(|e| {
1384                panic!(
1385                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1386                    mview.create_sql, e
1387                )
1388            })
1389            .into_element()
1390            .ast;
1391        let query_string = match &create_stmt {
1392            Statement::CreateMaterializedView(stmt) => {
1393                let mut query_string = stmt.query.to_ast_string_stable();
1394                // PostgreSQL appends a semicolon in `pg_matviews.definition`, we
1395                // do the same for compatibility's sake.
1396                query_string.push(';');
1397                query_string
1398            }
1399            _ => unreachable!(),
1400        };
1401
1402        let mut updates = Vec::new();
1403
1404        updates.push(BuiltinTableUpdate::row(
1405            &*MZ_MATERIALIZED_VIEWS,
1406            Row::pack_slice(&[
1407                Datum::String(&id.to_string()),
1408                Datum::UInt32(oid),
1409                Datum::String(&schema_id.to_string()),
1410                Datum::String(name),
1411                Datum::String(&mview.cluster_id.to_string()),
1412                Datum::String(&query_string),
1413                Datum::String(&owner_id.to_string()),
1414                privileges,
1415                Datum::String(&mview.create_sql),
1416                Datum::String(&create_stmt.to_ast_string_redacted()),
1417            ]),
1418            diff,
1419        ));
1420
1421        if let Some(refresh_schedule) = &mview.refresh_schedule {
1422            // This can't be `ON COMMIT`, because that is represented by a `None` instead of an
1423            // empty `RefreshSchedule`.
1424            assert!(!refresh_schedule.is_empty());
1425            for RefreshEvery {
1426                interval,
1427                aligned_to,
1428            } in refresh_schedule.everies.iter()
1429            {
1430                let aligned_to_dt = mz_ore::now::to_datetime(
1431                    <&Timestamp as TryInto<u64>>::try_into(aligned_to).expect("undoes planning"),
1432                );
1433                updates.push(BuiltinTableUpdate::row(
1434                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1435                    Row::pack_slice(&[
1436                        Datum::String(&id.to_string()),
1437                        Datum::String("every"),
1438                        Datum::Interval(
1439                            Interval::from_duration(interval).expect(
1440                                "planning ensured that this is convertible back to Interval",
1441                            ),
1442                        ),
1443                        Datum::TimestampTz(aligned_to_dt.try_into().expect("undoes planning")),
1444                        Datum::Null,
1445                    ]),
1446                    diff,
1447                ));
1448            }
1449            for at in refresh_schedule.ats.iter() {
1450                let at_dt = mz_ore::now::to_datetime(
1451                    <&Timestamp as TryInto<u64>>::try_into(at).expect("undoes planning"),
1452                );
1453                updates.push(BuiltinTableUpdate::row(
1454                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1455                    Row::pack_slice(&[
1456                        Datum::String(&id.to_string()),
1457                        Datum::String("at"),
1458                        Datum::Null,
1459                        Datum::Null,
1460                        Datum::TimestampTz(at_dt.try_into().expect("undoes planning")),
1461                    ]),
1462                    diff,
1463                ));
1464            }
1465        } else {
1466            updates.push(BuiltinTableUpdate::row(
1467                &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1468                Row::pack_slice(&[
1469                    Datum::String(&id.to_string()),
1470                    Datum::String("on-commit"),
1471                    Datum::Null,
1472                    Datum::Null,
1473                    Datum::Null,
1474                ]),
1475                diff,
1476            ));
1477        }
1478
1479        updates
1480    }
1481
1482    fn pack_continual_task_update(
1483        &self,
1484        id: CatalogItemId,
1485        oid: u32,
1486        schema_id: &SchemaSpecifier,
1487        name: &str,
1488        owner_id: &RoleId,
1489        privileges: Datum,
1490        ct: &ContinualTask,
1491        diff: Diff,
1492    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1493        let create_stmt = mz_sql::parse::parse(&ct.create_sql)
1494            .unwrap_or_else(|e| {
1495                panic!(
1496                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1497                    ct.create_sql, e
1498                )
1499            })
1500            .into_element()
1501            .ast;
1502        let query_string = match &create_stmt {
1503            Statement::CreateContinualTask(stmt) => {
1504                let mut query_string = String::new();
1505                for stmt in &stmt.stmts {
1506                    let s = match stmt {
1507                        ContinualTaskStmt::Insert(stmt) => stmt.to_ast_string_stable(),
1508                        ContinualTaskStmt::Delete(stmt) => stmt.to_ast_string_stable(),
1509                    };
1510                    if query_string.is_empty() {
1511                        query_string = s;
1512                    } else {
1513                        query_string.push_str("; ");
1514                        query_string.push_str(&s);
1515                    }
1516                }
1517                query_string
1518            }
1519            _ => unreachable!(),
1520        };
1521
1522        vec![BuiltinTableUpdate::row(
1523            &*MZ_CONTINUAL_TASKS,
1524            Row::pack_slice(&[
1525                Datum::String(&id.to_string()),
1526                Datum::UInt32(oid),
1527                Datum::String(&schema_id.to_string()),
1528                Datum::String(name),
1529                Datum::String(&ct.cluster_id.to_string()),
1530                Datum::String(&query_string),
1531                Datum::String(&owner_id.to_string()),
1532                privileges,
1533                Datum::String(&ct.create_sql),
1534                Datum::String(&create_stmt.to_ast_string_redacted()),
1535            ]),
1536            diff,
1537        )]
1538    }
1539
1540    fn pack_sink_update(
1541        &self,
1542        id: CatalogItemId,
1543        oid: u32,
1544        schema_id: &SchemaSpecifier,
1545        name: &str,
1546        owner_id: &RoleId,
1547        sink: &Sink,
1548        diff: Diff,
1549    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1550        let mut updates = vec![];
1551        match &sink.connection {
1552            StorageSinkConnection::Kafka(KafkaSinkConnection {
1553                topic: topic_name, ..
1554            }) => {
1555                updates.push(BuiltinTableUpdate::row(
1556                    &*MZ_KAFKA_SINKS,
1557                    Row::pack_slice(&[
1558                        Datum::String(&id.to_string()),
1559                        Datum::String(topic_name.as_str()),
1560                    ]),
1561                    diff,
1562                ));
1563            }
1564        };
1565
1566        let create_stmt = mz_sql::parse::parse(&sink.create_sql)
1567            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", sink.create_sql))
1568            .into_element()
1569            .ast;
1570
1571        let envelope = sink.envelope();
1572
1573        // The combined format string is used for the deprecated `format` column.
1574        let combined_format = sink.combined_format();
1575        let (key_format, value_format) = sink.formats();
1576
1577        updates.push(BuiltinTableUpdate::row(
1578            &*MZ_SINKS,
1579            Row::pack_slice(&[
1580                Datum::String(&id.to_string()),
1581                Datum::UInt32(oid),
1582                Datum::String(&schema_id.to_string()),
1583                Datum::String(name),
1584                Datum::String(sink.connection.name()),
1585                Datum::from(sink.connection_id().map(|id| id.to_string()).as_deref()),
1586                // size column now deprecated w/o linked clusters
1587                Datum::Null,
1588                Datum::from(envelope),
1589                Datum::from(combined_format.as_ref()),
1590                Datum::from(key_format),
1591                Datum::String(value_format),
1592                Datum::String(&sink.cluster_id.to_string()),
1593                Datum::String(&owner_id.to_string()),
1594                Datum::String(&sink.create_sql),
1595                Datum::String(&create_stmt.to_ast_string_redacted()),
1596            ]),
1597            diff,
1598        ));
1599
1600        updates
1601    }
1602
1603    fn pack_index_update(
1604        &self,
1605        id: CatalogItemId,
1606        oid: u32,
1607        name: &str,
1608        owner_id: &RoleId,
1609        index: &Index,
1610        diff: Diff,
1611    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1612        let mut updates = vec![];
1613
1614        let create_stmt = mz_sql::parse::parse(&index.create_sql)
1615            .unwrap_or_else(|e| {
1616                panic!(
1617                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1618                    index.create_sql, e
1619                )
1620            })
1621            .into_element()
1622            .ast;
1623
1624        let key_sqls = match &create_stmt {
1625            Statement::CreateIndex(CreateIndexStatement { key_parts, .. }) => key_parts
1626                .as_ref()
1627                .expect("key_parts is filled in during planning"),
1628            _ => unreachable!(),
1629        };
1630        let on_item_id = self.get_entry_by_global_id(&index.on).id();
1631
1632        updates.push(BuiltinTableUpdate::row(
1633            &*MZ_INDEXES,
1634            Row::pack_slice(&[
1635                Datum::String(&id.to_string()),
1636                Datum::UInt32(oid),
1637                Datum::String(name),
1638                Datum::String(&on_item_id.to_string()),
1639                Datum::String(&index.cluster_id.to_string()),
1640                Datum::String(&owner_id.to_string()),
1641                Datum::String(&index.create_sql),
1642                Datum::String(&create_stmt.to_ast_string_redacted()),
1643            ]),
1644            diff,
1645        ));
1646
1647        for (i, key) in index.keys.iter().enumerate() {
1648            let on_entry = self.get_entry_by_global_id(&index.on);
1649            let nullable = key
1650                .typ(
1651                    &on_entry
1652                        .desc(&self.resolve_full_name(on_entry.name(), on_entry.conn_id()))
1653                        .expect("can only create indexes on items with a valid description")
1654                        .typ()
1655                        .column_types,
1656                )
1657                .nullable;
1658            let seq_in_index = u64::cast_from(i + 1);
1659            let key_sql = key_sqls
1660                .get(i)
1661                .expect("missing sql information for index key")
1662                .to_ast_string_simple();
1663            let (field_number, expression) = match key {
1664                MirScalarExpr::Column(col) => {
1665                    (Datum::UInt64(u64::cast_from(*col + 1)), Datum::Null)
1666                }
1667                _ => (Datum::Null, Datum::String(&key_sql)),
1668            };
1669            updates.push(BuiltinTableUpdate::row(
1670                &*MZ_INDEX_COLUMNS,
1671                Row::pack_slice(&[
1672                    Datum::String(&id.to_string()),
1673                    Datum::UInt64(seq_in_index),
1674                    field_number,
1675                    expression,
1676                    Datum::from(nullable),
1677                ]),
1678                diff,
1679            ));
1680        }
1681
1682        updates
1683    }
1684
1685    fn pack_type_update(
1686        &self,
1687        id: CatalogItemId,
1688        oid: u32,
1689        schema_id: &SchemaSpecifier,
1690        name: &str,
1691        owner_id: &RoleId,
1692        privileges: Datum,
1693        typ: &Type,
1694        diff: Diff,
1695    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1696        let mut out = vec![];
1697
1698        let redacted = typ.create_sql.as_ref().map(|create_sql| {
1699            mz_sql::parse::parse(create_sql)
1700                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
1701                .into_element()
1702                .ast
1703                .to_ast_string_redacted()
1704        });
1705
1706        out.push(BuiltinTableUpdate::row(
1707            &*MZ_TYPES,
1708            Row::pack_slice(&[
1709                Datum::String(&id.to_string()),
1710                Datum::UInt32(oid),
1711                Datum::String(&schema_id.to_string()),
1712                Datum::String(name),
1713                Datum::String(&TypeCategory::from_catalog_type(&typ.details.typ).to_string()),
1714                Datum::String(&owner_id.to_string()),
1715                privileges,
1716                if let Some(create_sql) = &typ.create_sql {
1717                    Datum::String(create_sql)
1718                } else {
1719                    Datum::Null
1720                },
1721                if let Some(redacted) = &redacted {
1722                    Datum::String(redacted)
1723                } else {
1724                    Datum::Null
1725                },
1726            ]),
1727            diff,
1728        ));
1729
1730        let mut row = Row::default();
1731        let mut packer = row.packer();
1732
1733        fn append_modifier(packer: &mut RowPacker<'_>, mods: &[i64]) {
1734            if mods.is_empty() {
1735                packer.push(Datum::Null);
1736            } else {
1737                packer.push_list(mods.iter().map(|m| Datum::Int64(*m)));
1738            }
1739        }
1740
1741        let index_id = match &typ.details.typ {
1742            CatalogType::Array {
1743                element_reference: element_id,
1744            } => {
1745                packer.push(Datum::String(&id.to_string()));
1746                packer.push(Datum::String(&element_id.to_string()));
1747                &MZ_ARRAY_TYPES
1748            }
1749            CatalogType::List {
1750                element_reference: element_id,
1751                element_modifiers,
1752            } => {
1753                packer.push(Datum::String(&id.to_string()));
1754                packer.push(Datum::String(&element_id.to_string()));
1755                append_modifier(&mut packer, element_modifiers);
1756                &MZ_LIST_TYPES
1757            }
1758            CatalogType::Map {
1759                key_reference: key_id,
1760                value_reference: value_id,
1761                key_modifiers,
1762                value_modifiers,
1763            } => {
1764                packer.push(Datum::String(&id.to_string()));
1765                packer.push(Datum::String(&key_id.to_string()));
1766                packer.push(Datum::String(&value_id.to_string()));
1767                append_modifier(&mut packer, key_modifiers);
1768                append_modifier(&mut packer, value_modifiers);
1769                &MZ_MAP_TYPES
1770            }
1771            CatalogType::Pseudo => {
1772                packer.push(Datum::String(&id.to_string()));
1773                &MZ_PSEUDO_TYPES
1774            }
1775            _ => {
1776                packer.push(Datum::String(&id.to_string()));
1777                &MZ_BASE_TYPES
1778            }
1779        };
1780        out.push(BuiltinTableUpdate::row(index_id, row, diff));
1781
1782        if let Some(pg_metadata) = &typ.details.pg_metadata {
1783            out.push(BuiltinTableUpdate::row(
1784                &*MZ_TYPE_PG_METADATA,
1785                Row::pack_slice(&[
1786                    Datum::String(&id.to_string()),
1787                    Datum::UInt32(pg_metadata.typinput_oid),
1788                    Datum::UInt32(pg_metadata.typreceive_oid),
1789                ]),
1790                diff,
1791            ));
1792        }
1793
1794        out
1795    }
1796
1797    fn pack_func_update(
1798        &self,
1799        id: CatalogItemId,
1800        schema_id: &SchemaSpecifier,
1801        name: &str,
1802        owner_id: &RoleId,
1803        func: &Func,
1804        diff: Diff,
1805    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1806        let mut updates = vec![];
1807        for func_impl_details in func.inner.func_impls() {
1808            let arg_type_ids = func_impl_details
1809                .arg_typs
1810                .iter()
1811                .map(|typ| self.get_system_type(typ).id().to_string())
1812                .collect::<Vec<_>>();
1813
1814            let mut row = Row::default();
1815            row.packer()
1816                .try_push_array(
1817                    &[ArrayDimension {
1818                        lower_bound: 1,
1819                        length: arg_type_ids.len(),
1820                    }],
1821                    arg_type_ids.iter().map(|id| Datum::String(id)),
1822                )
1823                .expect(
1824                    "arg_type_ids is 1 dimensional, and its length is used for the array length",
1825                );
1826            let arg_type_ids = row.unpack_first();
1827
1828            updates.push(BuiltinTableUpdate::row(
1829                &*MZ_FUNCTIONS,
1830                Row::pack_slice(&[
1831                    Datum::String(&id.to_string()),
1832                    Datum::UInt32(func_impl_details.oid),
1833                    Datum::String(&schema_id.to_string()),
1834                    Datum::String(name),
1835                    arg_type_ids,
1836                    Datum::from(
1837                        func_impl_details
1838                            .variadic_typ
1839                            .map(|typ| self.get_system_type(typ).id().to_string())
1840                            .as_deref(),
1841                    ),
1842                    Datum::from(
1843                        func_impl_details
1844                            .return_typ
1845                            .map(|typ| self.get_system_type(typ).id().to_string())
1846                            .as_deref(),
1847                    ),
1848                    func_impl_details.return_is_set.into(),
1849                    Datum::String(&owner_id.to_string()),
1850                ]),
1851                diff,
1852            ));
1853
1854            if let mz_sql::func::Func::Aggregate(_) = func.inner {
1855                updates.push(BuiltinTableUpdate::row(
1856                    &*MZ_AGGREGATES,
1857                    Row::pack_slice(&[
1858                        Datum::UInt32(func_impl_details.oid),
1859                        // TODO(database-issues#1064): Support ordered-set aggregate functions.
1860                        Datum::String("n"),
1861                        Datum::Int16(0),
1862                    ]),
1863                    diff,
1864                ));
1865            }
1866        }
1867        updates
1868    }
1869
1870    pub fn pack_op_update(
1871        &self,
1872        operator: &str,
1873        func_impl_details: FuncImplCatalogDetails,
1874        diff: Diff,
1875    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1876        let arg_type_ids = func_impl_details
1877            .arg_typs
1878            .iter()
1879            .map(|typ| self.get_system_type(typ).id().to_string())
1880            .collect::<Vec<_>>();
1881
1882        let mut row = Row::default();
1883        row.packer()
1884            .try_push_array(
1885                &[ArrayDimension {
1886                    lower_bound: 1,
1887                    length: arg_type_ids.len(),
1888                }],
1889                arg_type_ids.iter().map(|id| Datum::String(id)),
1890            )
1891            .expect("arg_type_ids is 1 dimensional, and its length is used for the array length");
1892        let arg_type_ids = row.unpack_first();
1893
1894        BuiltinTableUpdate::row(
1895            &*MZ_OPERATORS,
1896            Row::pack_slice(&[
1897                Datum::UInt32(func_impl_details.oid),
1898                Datum::String(operator),
1899                arg_type_ids,
1900                Datum::from(
1901                    func_impl_details
1902                        .return_typ
1903                        .map(|typ| self.get_system_type(typ).id().to_string())
1904                        .as_deref(),
1905                ),
1906            ]),
1907            diff,
1908        )
1909    }
1910
1911    fn pack_secret_update(
1912        &self,
1913        id: CatalogItemId,
1914        oid: u32,
1915        schema_id: &SchemaSpecifier,
1916        name: &str,
1917        owner_id: &RoleId,
1918        privileges: Datum,
1919        diff: Diff,
1920    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1921        vec![BuiltinTableUpdate::row(
1922            &*MZ_SECRETS,
1923            Row::pack_slice(&[
1924                Datum::String(&id.to_string()),
1925                Datum::UInt32(oid),
1926                Datum::String(&schema_id.to_string()),
1927                Datum::String(name),
1928                Datum::String(&owner_id.to_string()),
1929                privileges,
1930            ]),
1931            diff,
1932        )]
1933    }
1934
1935    pub fn pack_audit_log_update(
1936        &self,
1937        event: &VersionedEvent,
1938        diff: Diff,
1939    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1940        let (event_type, object_type, details, user, occurred_at): (
1941            &EventType,
1942            &ObjectType,
1943            &EventDetails,
1944            &Option<String>,
1945            u64,
1946        ) = match event {
1947            VersionedEvent::V1(ev) => (
1948                &ev.event_type,
1949                &ev.object_type,
1950                &ev.details,
1951                &ev.user,
1952                ev.occurred_at,
1953            ),
1954        };
1955        let details = Jsonb::from_serde_json(details.as_json())
1956            .map_err(|e| {
1957                Error::new(ErrorKind::Unstructured(format!(
1958                    "could not pack audit log update: {}",
1959                    e
1960                )))
1961            })?
1962            .into_row();
1963        let details = details
1964            .iter()
1965            .next()
1966            .expect("details created above with a single jsonb column");
1967        let dt = mz_ore::now::to_datetime(occurred_at);
1968        let id = event.sortable_id();
1969        Ok(BuiltinTableUpdate::row(
1970            &*MZ_AUDIT_EVENTS,
1971            Row::pack_slice(&[
1972                Datum::UInt64(id),
1973                Datum::String(&format!("{}", event_type)),
1974                Datum::String(&format!("{}", object_type)),
1975                details,
1976                match user {
1977                    Some(user) => Datum::String(user),
1978                    None => Datum::Null,
1979                },
1980                Datum::TimestampTz(dt.try_into().expect("must fit")),
1981            ]),
1982            diff,
1983        ))
1984    }
1985
1986    pub fn pack_storage_usage_update(
1987        &self,
1988        VersionedStorageUsage::V1(event): VersionedStorageUsage,
1989        diff: Diff,
1990    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1991        let id = &MZ_STORAGE_USAGE_BY_SHARD;
1992        let row = Row::pack_slice(&[
1993            Datum::UInt64(event.id),
1994            Datum::from(event.shard_id.as_deref()),
1995            Datum::UInt64(event.size_bytes),
1996            Datum::TimestampTz(
1997                mz_ore::now::to_datetime(event.collection_timestamp)
1998                    .try_into()
1999                    .expect("must fit"),
2000            ),
2001        ]);
2002        BuiltinTableUpdate::row(id, row, diff)
2003    }
2004
2005    pub fn pack_egress_ip_update(
2006        &self,
2007        ip: &IpNet,
2008    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
2009        let id = &MZ_EGRESS_IPS;
2010        let addr = ip.addr();
2011        let row = Row::pack_slice(&[
2012            Datum::String(&addr.to_string()),
2013            Datum::Int32(ip.prefix_len().into()),
2014            Datum::String(&format!("{}/{}", addr, ip.prefix_len())),
2015        ]);
2016        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
2017    }
2018
2019    pub fn pack_replica_metric_updates(
2020        &self,
2021        replica_id: ReplicaId,
2022        updates: &[ServiceProcessMetrics],
2023        diff: Diff,
2024    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2025        let id = &*MZ_CLUSTER_REPLICA_METRICS;
2026        let rows = updates.iter().enumerate().map(
2027            |(
2028                process_id,
2029                ServiceProcessMetrics {
2030                    cpu_nano_cores,
2031                    memory_bytes,
2032                    disk_usage_bytes,
2033                },
2034            )| {
2035                Row::pack_slice(&[
2036                    Datum::String(&replica_id.to_string()),
2037                    u64::cast_from(process_id).into(),
2038                    (*cpu_nano_cores).into(),
2039                    (*memory_bytes).into(),
2040                    (*disk_usage_bytes).into(),
2041                ])
2042            },
2043        );
2044        let updates = rows
2045            .map(|row| BuiltinTableUpdate::row(id, row, diff))
2046            .collect();
2047        updates
2048    }
2049
2050    pub fn pack_all_replica_size_updates(&self) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2051        let id = &*MZ_CLUSTER_REPLICA_SIZES;
2052        let updates = self
2053            .cluster_replica_sizes
2054            .0
2055            .iter()
2056            .filter(|(_, a)| !a.disabled)
2057            .map(
2058                |(
2059                    size,
2060                    ReplicaAllocation {
2061                        memory_limit,
2062                        cpu_limit,
2063                        disk_limit,
2064                        scale,
2065                        workers,
2066                        credits_per_hour,
2067                        cpu_exclusive: _,
2068                        is_cc: _,
2069                        disabled: _,
2070                        selectors: _,
2071                    },
2072                )| {
2073                    // Just invent something when the limits are `None`,
2074                    // which only happens in non-prod environments (tests, process orchestrator, etc.)
2075                    let cpu_limit = cpu_limit.unwrap_or(CpuLimit::MAX);
2076                    let MemoryLimit(ByteSize(memory_bytes)) =
2077                        (*memory_limit).unwrap_or(MemoryLimit::MAX);
2078                    let DiskLimit(ByteSize(disk_bytes)) =
2079                        (*disk_limit).unwrap_or(DiskLimit::ARBITRARY);
2080                    let row = Row::pack_slice(&[
2081                        size.as_str().into(),
2082                        u64::from(*scale).into(),
2083                        u64::cast_from(*workers).into(),
2084                        cpu_limit.as_nanocpus().into(),
2085                        memory_bytes.into(),
2086                        // TODO(guswynn): disk size will be filled in later.
2087                        disk_bytes.into(),
2088                        (*credits_per_hour).into(),
2089                    ]);
2090                    BuiltinTableUpdate::row(id, row, Diff::ONE)
2091                },
2092            )
2093            .collect();
2094        updates
2095    }
2096
2097    pub fn pack_subscribe_update(
2098        &self,
2099        id: GlobalId,
2100        subscribe: &ActiveSubscribe,
2101        diff: Diff,
2102    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2103        let mut row = Row::default();
2104        let mut packer = row.packer();
2105        packer.push(Datum::String(&id.to_string()));
2106        packer.push(Datum::Uuid(subscribe.session_uuid));
2107        packer.push(Datum::String(&subscribe.cluster_id.to_string()));
2108
2109        let start_dt = mz_ore::now::to_datetime(subscribe.start_time);
2110        packer.push(Datum::TimestampTz(start_dt.try_into().expect("must fit")));
2111
2112        let depends_on: Vec<_> = subscribe
2113            .depends_on
2114            .iter()
2115            .map(|id| id.to_string())
2116            .collect();
2117        packer.push_list(depends_on.iter().map(|s| Datum::String(s)));
2118
2119        BuiltinTableUpdate::row(&*MZ_SUBSCRIPTIONS, row, diff)
2120    }
2121
2122    pub fn pack_session_update(
2123        &self,
2124        conn: &ConnMeta,
2125        diff: Diff,
2126    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2127        let connect_dt = mz_ore::now::to_datetime(conn.connected_at());
2128        BuiltinTableUpdate::row(
2129            &*MZ_SESSIONS,
2130            Row::pack_slice(&[
2131                Datum::Uuid(conn.uuid()),
2132                Datum::UInt32(conn.conn_id().unhandled()),
2133                Datum::String(&conn.authenticated_role_id().to_string()),
2134                Datum::from(conn.client_ip().map(|ip| ip.to_string()).as_deref()),
2135                Datum::TimestampTz(connect_dt.try_into().expect("must fit")),
2136            ]),
2137            diff,
2138        )
2139    }
2140
2141    pub fn pack_default_privileges_update(
2142        &self,
2143        default_privilege_object: &DefaultPrivilegeObject,
2144        grantee: &RoleId,
2145        acl_mode: &AclMode,
2146        diff: Diff,
2147    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2148        BuiltinTableUpdate::row(
2149            &*MZ_DEFAULT_PRIVILEGES,
2150            Row::pack_slice(&[
2151                default_privilege_object.role_id.to_string().as_str().into(),
2152                default_privilege_object
2153                    .database_id
2154                    .map(|database_id| database_id.to_string())
2155                    .as_deref()
2156                    .into(),
2157                default_privilege_object
2158                    .schema_id
2159                    .map(|schema_id| schema_id.to_string())
2160                    .as_deref()
2161                    .into(),
2162                default_privilege_object
2163                    .object_type
2164                    .to_string()
2165                    .to_lowercase()
2166                    .as_str()
2167                    .into(),
2168                grantee.to_string().as_str().into(),
2169                acl_mode.to_string().as_str().into(),
2170            ]),
2171            diff,
2172        )
2173    }
2174
2175    pub fn pack_system_privileges_update(
2176        &self,
2177        privileges: MzAclItem,
2178        diff: Diff,
2179    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2180        BuiltinTableUpdate::row(
2181            &*MZ_SYSTEM_PRIVILEGES,
2182            Row::pack_slice(&[privileges.into()]),
2183            diff,
2184        )
2185    }
2186
2187    fn pack_privilege_array_row(&self, privileges: &PrivilegeMap) -> Row {
2188        let mut row = Row::default();
2189        let flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2190        row.packer()
2191            .try_push_array(
2192                &[ArrayDimension {
2193                    lower_bound: 1,
2194                    length: flat_privileges.len(),
2195                }],
2196                flat_privileges
2197                    .into_iter()
2198                    .map(|mz_acl_item| Datum::MzAclItem(mz_acl_item.clone())),
2199            )
2200            .expect("privileges is 1 dimensional, and its length is used for the array length");
2201        row
2202    }
2203
2204    pub fn pack_comment_update(
2205        &self,
2206        object_id: CommentObjectId,
2207        column_pos: Option<usize>,
2208        comment: &str,
2209        diff: Diff,
2210    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2211        // Use the audit log representation so it's easier to join against.
2212        let object_type = mz_sql::catalog::ObjectType::from(object_id);
2213        let audit_type = super::object_type_to_audit_object_type(object_type);
2214        let object_type_str = audit_type.to_string();
2215
2216        let object_id_str = match object_id {
2217            CommentObjectId::Table(global_id)
2218            | CommentObjectId::View(global_id)
2219            | CommentObjectId::MaterializedView(global_id)
2220            | CommentObjectId::Source(global_id)
2221            | CommentObjectId::Sink(global_id)
2222            | CommentObjectId::Index(global_id)
2223            | CommentObjectId::Func(global_id)
2224            | CommentObjectId::Connection(global_id)
2225            | CommentObjectId::Secret(global_id)
2226            | CommentObjectId::Type(global_id)
2227            | CommentObjectId::ContinualTask(global_id) => global_id.to_string(),
2228            CommentObjectId::Role(role_id) => role_id.to_string(),
2229            CommentObjectId::Database(database_id) => database_id.to_string(),
2230            CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
2231            CommentObjectId::Cluster(cluster_id) => cluster_id.to_string(),
2232            CommentObjectId::ClusterReplica((_, replica_id)) => replica_id.to_string(),
2233            CommentObjectId::NetworkPolicy(network_policy_id) => network_policy_id.to_string(),
2234        };
2235        let column_pos_datum = match column_pos {
2236            Some(pos) => {
2237                // TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
2238                let pos =
2239                    i32::try_from(pos).expect("we constrain this value in the planning layer");
2240                Datum::Int32(pos)
2241            }
2242            None => Datum::Null,
2243        };
2244
2245        BuiltinTableUpdate::row(
2246            &*MZ_COMMENTS,
2247            Row::pack_slice(&[
2248                Datum::String(&object_id_str),
2249                Datum::String(&object_type_str),
2250                column_pos_datum,
2251                Datum::String(comment),
2252            ]),
2253            diff,
2254        )
2255    }
2256
2257    pub fn pack_webhook_source_update(
2258        &self,
2259        item_id: CatalogItemId,
2260        diff: Diff,
2261    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2262        let url = self
2263            .try_get_webhook_url(&item_id)
2264            .expect("webhook source should exist");
2265        let url = url.to_string();
2266        let name = &self.get_entry(&item_id).name().item;
2267        let id_str = item_id.to_string();
2268
2269        BuiltinTableUpdate::row(
2270            &*MZ_WEBHOOKS_SOURCES,
2271            Row::pack_slice(&[
2272                Datum::String(&id_str),
2273                Datum::String(name),
2274                Datum::String(&url),
2275            ]),
2276            diff,
2277        )
2278    }
2279
2280    pub fn pack_source_references_update(
2281        &self,
2282        source_references: &SourceReferences,
2283        diff: Diff,
2284    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2285        let source_id = source_references.source_id.to_string();
2286        let updated_at = &source_references.updated_at;
2287        source_references
2288            .references
2289            .iter()
2290            .map(|reference| {
2291                let mut row = Row::default();
2292                let mut packer = row.packer();
2293                packer.extend([
2294                    Datum::String(&source_id),
2295                    reference
2296                        .namespace
2297                        .as_ref()
2298                        .map(|s| Datum::String(s))
2299                        .unwrap_or(Datum::Null),
2300                    Datum::String(&reference.name),
2301                    Datum::TimestampTz(
2302                        mz_ore::now::to_datetime(*updated_at)
2303                            .try_into()
2304                            .expect("must fit"),
2305                    ),
2306                ]);
2307                if reference.columns.len() > 0 {
2308                    packer
2309                        .try_push_array(
2310                            &[ArrayDimension {
2311                                lower_bound: 1,
2312                                length: reference.columns.len(),
2313                            }],
2314                            reference.columns.iter().map(|col| Datum::String(col)),
2315                        )
2316                        .expect(
2317                            "columns is 1 dimensional, and its length is used for the array length",
2318                        );
2319                } else {
2320                    packer.push(Datum::Null);
2321                }
2322
2323                BuiltinTableUpdate::row(&*MZ_SOURCE_REFERENCES, row, diff)
2324            })
2325            .collect()
2326    }
2327}