Skip to main content

mz_adapter/catalog/
builtin_table_updates.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10mod notice;
11
12use bytesize::ByteSize;
13use ipnet::IpNet;
14use mz_adapter_types::compaction::CompactionWindow;
15use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
16use mz_catalog::SYSTEM_CONN_ID;
17use mz_catalog::builtin::{
18    BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
19    MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICAS,
20    MZ_CLUSTER_SCHEDULES, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS, MZ_DEFAULT_PRIVILEGES,
21    MZ_EGRESS_IPS, MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES, MZ_ICEBERG_SINKS,
22    MZ_INDEX_COLUMNS, MZ_INDEXES, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCE_TABLES,
23    MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES,
24    MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MYSQL_SOURCE_TABLES, MZ_OBJECT_DEPENDENCIES,
25    MZ_OBJECT_GLOBAL_IDS, MZ_OPERATORS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES,
26    MZ_PSEUDO_TYPES, MZ_REPLACEMENTS, MZ_ROLE_AUTH, MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SESSIONS,
27    MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS,
28    MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES,
29    MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
30};
31use mz_catalog::config::AwsPrincipalContext;
32use mz_catalog::durable::SourceReferences;
33use mz_catalog::memory::error::{Error, ErrorKind};
34use mz_catalog::memory::objects::{
35    CatalogEntry, CatalogItem, ClusterVariant, Connection, DataSourceDesc, Func, Index,
36    MaterializedView, Sink, Table, TableDataSource, Type, View,
37};
38use mz_controller::clusters::{
39    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
40};
41use mz_controller_types::ClusterId;
42use mz_expr::MirScalarExpr;
43use mz_license_keys::ValidatedLicenseKey;
44use mz_orchestrator::{CpuLimit, DiskLimit, MemoryLimit};
45use mz_ore::cast::CastFrom;
46use mz_ore::collections::CollectionExt;
47use mz_persist_client::batch::ProtoBatch;
48use mz_repr::adt::array::ArrayDimension;
49use mz_repr::adt::interval::Interval;
50use mz_repr::adt::jsonb::Jsonb;
51use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
52use mz_repr::refresh_schedule::RefreshEvery;
53use mz_repr::role_id::RoleId;
54use mz_repr::{
55    CatalogItemId, Datum, Diff, GlobalId, ReprColumnType, Row, RowPacker, SqlScalarType, Timestamp,
56};
57use mz_sql::ast::{CreateIndexStatement, Statement, UnresolvedItemName};
58use mz_sql::catalog::{CatalogCluster, CatalogType, DefaultPrivilegeObject, TypeCategory};
59use mz_sql::func::FuncImplCatalogDetails;
60use mz_sql::names::{CommentObjectId, SchemaSpecifier};
61use mz_sql::plan::{ClusterSchedule, ConnectionDetails, SshKey};
62use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
63use mz_sql::session::vars::SessionVars;
64use mz_sql_parser::ast::display::AstDisplay;
65use mz_storage_client::client::TableData;
66use mz_storage_types::connections::KafkaConnection;
67use mz_storage_types::connections::aws::{AwsAuth, AwsConnection};
68use mz_storage_types::connections::inline::ReferencedConnection;
69use mz_storage_types::connections::string_or_secret::StringOrSecret;
70use mz_storage_types::sinks::{IcebergSinkConnection, KafkaSinkConnection, StorageSinkConnection};
71use mz_storage_types::sources::{
72    GenericSourceConnection, KafkaSourceConnection, PostgresSourceConnection, SourceConnection,
73};
74use smallvec::smallvec;
75
76// DO NOT add any more imports from `crate` outside of `crate::catalog`.
77use crate::active_compute_sink::ActiveSubscribe;
78use crate::catalog::CatalogState;
79use crate::coord::ConnMeta;
80
/// An update to a built-in table.
///
/// `T` identifies the target table and defaults to [`CatalogItemId`]. The packing methods
/// on `CatalogState` in this module produce `BuiltinTableUpdate<&'static BuiltinTable>`,
/// which `CatalogState::resolve_builtin_table_update` converts into the `CatalogItemId` form.
#[derive(Debug, Clone)]
pub struct BuiltinTableUpdate<T = CatalogItemId> {
    /// The reference of the table to update.
    pub id: T,
    /// The data to put into the table, e.g. explicit `(row, diff)` pairs
    /// (`TableData::Rows`) or pre-built batches (`TableData::Batches`).
    pub data: TableData,
}
89
90impl<T> BuiltinTableUpdate<T> {
91    /// Create a [`BuiltinTableUpdate`] from a [`Row`].
92    pub fn row(id: T, row: Row, diff: Diff) -> BuiltinTableUpdate<T> {
93        BuiltinTableUpdate {
94            id,
95            data: TableData::Rows(vec![(row, diff)]),
96        }
97    }
98
99    pub fn batch(id: T, batch: ProtoBatch) -> BuiltinTableUpdate<T> {
100        BuiltinTableUpdate {
101            id,
102            data: TableData::Batches(smallvec![batch]),
103        }
104    }
105}
106
107impl CatalogState {
108    pub fn resolve_builtin_table_updates(
109        &self,
110        builtin_table_update: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
111    ) -> Vec<BuiltinTableUpdate<CatalogItemId>> {
112        builtin_table_update
113            .into_iter()
114            .map(|builtin_table_update| self.resolve_builtin_table_update(builtin_table_update))
115            .collect()
116    }
117
118    pub fn resolve_builtin_table_update(
119        &self,
120        BuiltinTableUpdate { id, data }: BuiltinTableUpdate<&'static BuiltinTable>,
121    ) -> BuiltinTableUpdate<CatalogItemId> {
122        let id = self.resolve_builtin_table(id);
123        BuiltinTableUpdate { id, data }
124    }
125
126    pub fn pack_depends_update(
127        &self,
128        depender: CatalogItemId,
129        dependee: CatalogItemId,
130        diff: Diff,
131    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
132        let row = Row::pack_slice(&[
133            Datum::String(&depender.to_string()),
134            Datum::String(&dependee.to_string()),
135        ]);
136        BuiltinTableUpdate::row(&*MZ_OBJECT_DEPENDENCIES, row, diff)
137    }
138
139    pub(super) fn pack_role_auth_update(
140        &self,
141        id: RoleId,
142        diff: Diff,
143    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
144        let role_auth = self.get_role_auth(&id);
145        let role = self.get_role(&id);
146        BuiltinTableUpdate::row(
147            &*MZ_ROLE_AUTH,
148            Row::pack_slice(&[
149                Datum::String(&role_auth.role_id.to_string()),
150                Datum::UInt32(role.oid),
151                match &role_auth.password_hash {
152                    Some(hash) => Datum::String(hash),
153                    None => Datum::Null,
154                },
155                Datum::TimestampTz(
156                    mz_ore::now::to_datetime(role_auth.updated_at)
157                        .try_into()
158                        .expect("must fit"),
159                ),
160            ]),
161            diff,
162        )
163    }
164
165    pub(super) fn pack_role_update(
166        &self,
167        id: RoleId,
168        diff: Diff,
169    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
170        match id {
171            // PUBLIC role should not show up in mz_roles.
172            RoleId::Public => vec![],
173            id => {
174                let role = self.get_role(&id);
175                let builtin_supers = [MZ_SYSTEM_ROLE_ID, MZ_SUPPORT_ROLE_ID];
176
177                let rolcanlogin = if let Some(login) = role.attributes.login {
178                    login
179                } else {
180                    builtin_supers.contains(&role.id)
181                };
182
183                let rolsuper = if let Some(superuser) = role.attributes.superuser {
184                    Datum::from(superuser)
185                } else if builtin_supers.contains(&role.id) {
186                    // System roles (mz_system, mz_support) are superusers
187                    Datum::from(true)
188                } else {
189                    // In cloud environments, superuser status is determined
190                    // at login, so we set `rolsuper` to `null` here because
191                    // it cannot be known beforehand. For self-managed
192                    // auth, roles created without an explicit SUPERUSER
193                    // attribute would typically have `rolsuper` set to false.
194                    // However, since we cannot reliably distinguish between
195                    // cloud and self-managed here, we conservatively use NULL
196                    // for indeterminate cases.
197                    Datum::Null
198                };
199
200                let role_update = BuiltinTableUpdate::row(
201                    &*MZ_ROLES,
202                    Row::pack_slice(&[
203                        Datum::String(&role.id.to_string()),
204                        Datum::UInt32(role.oid),
205                        Datum::String(&role.name),
206                        Datum::from(role.attributes.inherit),
207                        Datum::from(rolcanlogin),
208                        rolsuper,
209                    ]),
210                    diff,
211                );
212                let mut updates = vec![role_update];
213
214                // HACK/TODO(parkmycar): Creating an empty SessionVars like this is pretty hacky,
215                // we should instead have a static list of all session vars.
216                let session_vars_reference = SessionVars::new_unchecked(
217                    &mz_build_info::DUMMY_BUILD_INFO,
218                    SYSTEM_USER.clone(),
219                    None,
220                );
221
222                for (name, val) in role.vars() {
223                    let result = session_vars_reference
224                        .inspect(name)
225                        .and_then(|var| var.check(val.borrow()));
226                    let Ok(formatted_val) = result else {
227                        // Note: all variables should have been validated by this point, so we
228                        // shouldn't ever hit this.
229                        tracing::error!(?name, ?val, ?result, "found invalid role default var");
230                        continue;
231                    };
232
233                    let role_var_update = BuiltinTableUpdate::row(
234                        &*MZ_ROLE_PARAMETERS,
235                        Row::pack_slice(&[
236                            Datum::String(&role.id.to_string()),
237                            Datum::String(name),
238                            Datum::String(&formatted_val),
239                        ]),
240                        diff,
241                    );
242                    updates.push(role_var_update);
243                }
244
245                updates
246            }
247        }
248    }
249
250    pub(super) fn pack_cluster_update(
251        &self,
252        name: &str,
253        diff: Diff,
254    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
255        let id = self.clusters_by_name[name];
256        let cluster = &self.clusters_by_id[&id];
257        let row = self.pack_privilege_array_row(cluster.privileges());
258        let privileges = row.unpack_first();
259        let (size, disk, replication_factor, azs, introspection_debugging, introspection_interval) =
260            match &cluster.config.variant {
261                ClusterVariant::Managed(config) => (
262                    Some(config.size.as_str()),
263                    Some(self.cluster_replica_size_has_disk(&config.size)),
264                    Some(config.replication_factor),
265                    if config.availability_zones.is_empty() {
266                        None
267                    } else {
268                        Some(config.availability_zones.clone())
269                    },
270                    Some(config.logging.log_logging),
271                    config.logging.interval.map(|d| {
272                        Interval::from_duration(&d)
273                            .expect("planning ensured this convertible back to interval")
274                    }),
275                ),
276                ClusterVariant::Unmanaged => (None, None, None, None, None, None),
277            };
278
279        let mut row = Row::default();
280        let mut packer = row.packer();
281        packer.extend([
282            Datum::String(&id.to_string()),
283            Datum::String(name),
284            Datum::String(&cluster.owner_id.to_string()),
285            privileges,
286            cluster.is_managed().into(),
287            size.into(),
288            replication_factor.into(),
289            disk.into(),
290        ]);
291        if let Some(azs) = azs {
292            packer.push_list(azs.iter().map(|az| Datum::String(az)));
293        } else {
294            packer.push(Datum::Null);
295        }
296        packer.push(Datum::from(introspection_debugging));
297        packer.push(Datum::from(introspection_interval));
298
299        let mut updates = Vec::new();
300
301        updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTERS, row, diff));
302
303        if let ClusterVariant::Managed(managed_config) = &cluster.config.variant {
304            let row = match managed_config.schedule {
305                ClusterSchedule::Manual => Row::pack_slice(&[
306                    Datum::String(&id.to_string()),
307                    Datum::String("manual"),
308                    Datum::Null,
309                ]),
310                ClusterSchedule::Refresh {
311                    hydration_time_estimate,
312                } => Row::pack_slice(&[
313                    Datum::String(&id.to_string()),
314                    Datum::String("on-refresh"),
315                    Datum::Interval(
316                        Interval::from_duration(&hydration_time_estimate)
317                            .expect("planning ensured that this is convertible back to Interval"),
318                    ),
319                ]),
320            };
321            updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTER_SCHEDULES, row, diff));
322        }
323
324        updates
325    }
326
327    pub(super) fn pack_cluster_replica_update(
328        &self,
329        cluster_id: ClusterId,
330        name: &str,
331        diff: Diff,
332    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
333        let cluster = &self.clusters_by_id[&cluster_id];
334        let id = cluster.replica_id(name).expect("Must exist");
335        let replica = cluster.replica(id).expect("Must exist");
336
337        let (size, disk, az) = match &replica.config.location {
338            // TODO(guswynn): The column should be `availability_zones`, not
339            // `availability_zone`.
340            ReplicaLocation::Managed(ManagedReplicaLocation {
341                size,
342                availability_zones: ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
343                allocation: _,
344                billed_as: _,
345                internal: _,
346                pending: _,
347            }) => (
348                Some(&**size),
349                Some(self.cluster_replica_size_has_disk(size)),
350                Some(az.as_str()),
351            ),
352            ReplicaLocation::Managed(ManagedReplicaLocation {
353                size,
354                availability_zones: _,
355                allocation: _,
356                billed_as: _,
357                internal: _,
358                pending: _,
359            }) => (
360                Some(&**size),
361                Some(self.cluster_replica_size_has_disk(size)),
362                None,
363            ),
364            ReplicaLocation::Unmanaged(_) => (None, None, None),
365        };
366
367        let cluster_replica_update = BuiltinTableUpdate::row(
368            &*MZ_CLUSTER_REPLICAS,
369            Row::pack_slice(&[
370                Datum::String(&id.to_string()),
371                Datum::String(name),
372                Datum::String(&cluster_id.to_string()),
373                Datum::from(size),
374                Datum::from(az),
375                Datum::String(&replica.owner_id.to_string()),
376                Datum::from(disk),
377            ]),
378            diff,
379        );
380
381        vec![cluster_replica_update]
382    }
383
384    pub(super) fn pack_item_update(
385        &self,
386        id: CatalogItemId,
387        diff: Diff,
388    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
389        let entry = self.get_entry(&id);
390        let oid = entry.oid();
391        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
392        let schema_id = &self
393            .get_schema(
394                &entry.name().qualifiers.database_spec,
395                &entry.name().qualifiers.schema_spec,
396                conn_id,
397            )
398            .id;
399        let name = &entry.name().item;
400        let owner_id = entry.owner_id();
401        let privileges_row = self.pack_privilege_array_row(entry.privileges());
402        let privileges = privileges_row.unpack_first();
403        let mut updates = match entry.item() {
404            CatalogItem::Index(index) => {
405                self.pack_index_update(id, oid, name, owner_id, index, diff)
406            }
407            CatalogItem::Table(table) => {
408                let mut updates = self
409                    .pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);
410
411                if let TableDataSource::DataSource {
412                    desc: data_source,
413                    timeline: _,
414                } = &table.data_source
415                {
416                    updates.extend(match data_source {
417                        DataSourceDesc::IngestionExport {
418                            ingestion_id,
419                            external_reference: UnresolvedItemName(external_reference),
420                            details: _,
421                            data_config: _,
422                        } => {
423                            let ingestion_entry = self
424                                .get_entry(ingestion_id)
425                                .source_desc()
426                                .expect("primary source exists")
427                                .expect("primary source is a source");
428
429                            match ingestion_entry.connection.name() {
430                                "postgres" => {
431                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
432                                    // The left-most qualification of Postgres
433                                    // tables is the database, but this
434                                    // information is redundant because each
435                                    // Postgres connection connects to only one
436                                    // database.
437                                    let schema_name = external_reference[1].as_str();
438                                    let table_name = external_reference[2].as_str();
439
440                                    self.pack_postgres_source_tables_update(
441                                        id,
442                                        schema_name,
443                                        table_name,
444                                        diff,
445                                    )
446                                }
447                                "mysql" => {
448                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
449                                    let schema_name = external_reference[0].as_str();
450                                    let table_name = external_reference[1].as_str();
451
452                                    self.pack_mysql_source_tables_update(
453                                        id,
454                                        schema_name,
455                                        table_name,
456                                        diff,
457                                    )
458                                }
459                                "sql-server" => {
460                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
461                                    // The left-most qualification of SQL Server tables is
462                                    // the database, but this information is redundant
463                                    // because each SQL Server connection connects to
464                                    // only one database.
465                                    let schema_name = external_reference[1].as_str();
466                                    let table_name = external_reference[2].as_str();
467
468                                    self.pack_sql_server_source_table_update(
469                                        id,
470                                        schema_name,
471                                        table_name,
472                                        diff,
473                                    )
474                                }
475                                // Load generator sources don't have any special
476                                // updates.
477                                "load-generator" => vec![],
478                                "kafka" => {
479                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 1);
480                                    let topic = external_reference[0].as_str();
481                                    let envelope = data_source.envelope();
482                                    let (key_format, value_format) = data_source.formats();
483
484                                    self.pack_kafka_source_tables_update(
485                                        id,
486                                        topic,
487                                        envelope,
488                                        key_format,
489                                        value_format,
490                                        diff,
491                                    )
492                                }
493                                s => unreachable!("{s} sources do not have tables"),
494                            }
495                        }
496                        DataSourceDesc::Ingestion { .. }
497                        | DataSourceDesc::OldSyntaxIngestion { .. }
498                        | DataSourceDesc::Introspection(_)
499                        | DataSourceDesc::Progress
500                        | DataSourceDesc::Webhook { .. }
501                        | DataSourceDesc::Catalog => vec![],
502                    });
503                }
504
505                updates
506            }
507            CatalogItem::Source(source) => {
508                match &source.data_source {
509                    DataSourceDesc::Ingestion { desc, .. }
510                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => match &desc.connection {
511                        GenericSourceConnection::Postgres(postgres) => {
512                            self.pack_postgres_source_update(id, postgres, diff)
513                        }
514                        GenericSourceConnection::Kafka(kafka) => {
515                            self.pack_kafka_source_update(id, source.global_id(), kafka, diff)
516                        }
517                        _ => vec![],
518                    },
519                    DataSourceDesc::IngestionExport {
520                        ingestion_id,
521                        external_reference: UnresolvedItemName(external_reference),
522                        details: _,
523                        data_config: _,
524                    } => {
525                        let ingestion_entry = self
526                            .get_entry(ingestion_id)
527                            .source_desc()
528                            .expect("primary source exists")
529                            .expect("primary source is a source");
530
531                        match ingestion_entry.connection.name() {
532                            "postgres" => {
533                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
534                                // The left-most qualification of Postgres
535                                // tables is the database, but this
536                                // information is redundant because each
537                                // Postgres connection connects to only one
538                                // database.
539                                let schema_name = external_reference[1].as_str();
540                                let table_name = external_reference[2].as_str();
541
542                                self.pack_postgres_source_tables_update(
543                                    id,
544                                    schema_name,
545                                    table_name,
546                                    diff,
547                                )
548                            }
549                            "mysql" => {
550                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
551                                let schema_name = external_reference[0].as_str();
552                                let table_name = external_reference[1].as_str();
553
554                                self.pack_mysql_source_tables_update(
555                                    id,
556                                    schema_name,
557                                    table_name,
558                                    diff,
559                                )
560                            }
561                            "sql-server" => {
562                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
563                                // The left-most qualification of SQL Server tables is
564                                // the database, but this information is redundant
565                                // because each SQL Server connection connects to
566                                // only one database.
567                                let schema_name = external_reference[1].as_str();
568                                let table_name = external_reference[2].as_str();
569
570                                self.pack_sql_server_source_table_update(
571                                    id,
572                                    schema_name,
573                                    table_name,
574                                    diff,
575                                )
576                            }
577                            // Load generator sources don't have any special
578                            // updates.
579                            "load-generator" => vec![],
580                            s => unreachable!("{s} sources do not have subsources"),
581                        }
582                    }
583                    DataSourceDesc::Webhook { .. } => {
584                        vec![self.pack_webhook_source_update(id, diff)]
585                    }
586                    DataSourceDesc::Introspection(_)
587                    | DataSourceDesc::Progress
588                    | DataSourceDesc::Catalog => vec![],
589                }
590            }
591            CatalogItem::View(view) => {
592                self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
593            }
594            CatalogItem::MaterializedView(mview) => {
595                self.pack_materialized_view_update(id, mview, diff)
596            }
597            CatalogItem::Sink(sink) => {
598                self.pack_sink_update(id, oid, schema_id, name, owner_id, sink, diff)
599            }
600            CatalogItem::Type(ty) => {
601                self.pack_type_update(id, oid, schema_id, name, owner_id, privileges, ty, diff)
602            }
603            CatalogItem::Func(func) => {
604                self.pack_func_update(id, schema_id, name, owner_id, func, diff)
605            }
606            CatalogItem::Log(_) | CatalogItem::Secret(_) => vec![],
607            CatalogItem::Connection(connection) => {
608                self.pack_connection_update(id, connection, diff)
609            }
610        };
611
612        if !entry.item().is_temporary() {
613            // Populate or clean up the `mz_object_dependencies` table.
614            // TODO(jkosh44) Unclear if this table wants to include all uses or only references.
615            for dependee in entry.item().references().items() {
616                updates.push(self.pack_depends_update(id, *dependee, diff))
617            }
618        }
619
620        // Always report the latest for an objects columns.
621        if let Some(desc) = entry.relation_desc_latest() {
622            let defaults = match entry.item() {
623                CatalogItem::Table(Table {
624                    data_source: TableDataSource::TableWrites { defaults },
625                    ..
626                }) => Some(defaults),
627                _ => None,
628            };
629            for (i, (column_name, column_type)) in desc.iter().enumerate() {
630                let default: Option<String> = defaults.map(|d| d[i].to_ast_string_stable());
631                let default: Datum = default
632                    .as_ref()
633                    .map(|d| Datum::String(d))
634                    .unwrap_or(Datum::Null);
635                let pgtype = mz_pgrepr::Type::from(&column_type.scalar_type);
636                let (type_name, type_oid) = match &column_type.scalar_type {
637                    SqlScalarType::List {
638                        custom_id: Some(custom_id),
639                        ..
640                    }
641                    | SqlScalarType::Map {
642                        custom_id: Some(custom_id),
643                        ..
644                    }
645                    | SqlScalarType::Record {
646                        custom_id: Some(custom_id),
647                        ..
648                    } => {
649                        let entry = self.get_entry(custom_id);
650                        // NOTE(benesch): the `mz_columns.type text` field is
651                        // wrong. Types do not have a name that can be
652                        // represented as a single textual field. There can be
653                        // multiple types with the same name in different
654                        // schemas and databases. We should eventually deprecate
655                        // the `type` field in favor of a new `type_id` field
656                        // that can be joined against `mz_types`.
657                        //
658                        // For now, in the interest of pragmatism, we just use
659                        // the type's item name, and accept that there may be
660                        // ambiguity if the same type name is used in multiple
661                        // schemas. The ambiguity is mitigated by the OID, which
662                        // can be joined against `mz_types.oid` to resolve the
663                        // ambiguity.
664                        let name = &*entry.name().item;
665                        let oid = entry.oid();
666                        (name, oid)
667                    }
668                    _ => (pgtype.name(), pgtype.oid()),
669                };
670                updates.push(BuiltinTableUpdate::row(
671                    &*MZ_COLUMNS,
672                    Row::pack_slice(&[
673                        Datum::String(&id.to_string()),
674                        Datum::String(column_name),
675                        Datum::UInt64(u64::cast_from(i + 1)),
676                        Datum::from(column_type.nullable),
677                        Datum::String(type_name),
678                        default,
679                        Datum::UInt32(type_oid),
680                        Datum::Int32(pgtype.typmod()),
681                    ]),
682                    diff,
683                ));
684            }
685        }
686
687        // Use initial lcw so that we can tell apart default from non-existent windows.
688        if let Some(cw) = entry.item().initial_logical_compaction_window() {
689            updates.push(self.pack_history_retention_strategy_update(id, cw, diff));
690        }
691
692        updates.extend(Self::pack_item_global_id_update(entry, diff));
693
694        updates
695    }
696
697    fn pack_item_global_id_update(
698        entry: &CatalogEntry,
699        diff: Diff,
700    ) -> impl Iterator<Item = BuiltinTableUpdate<&'static BuiltinTable>> + use<'_> {
701        let id = entry.id().to_string();
702        let global_ids = entry.global_ids();
703        global_ids.map(move |global_id| {
704            BuiltinTableUpdate::row(
705                &*MZ_OBJECT_GLOBAL_IDS,
706                Row::pack_slice(&[Datum::String(&id), Datum::String(&global_id.to_string())]),
707                diff,
708            )
709        })
710    }
711
712    fn pack_history_retention_strategy_update(
713        &self,
714        id: CatalogItemId,
715        cw: CompactionWindow,
716        diff: Diff,
717    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
718        let cw: u64 = cw.comparable_timestamp().into();
719        let cw = Jsonb::from_serde_json(serde_json::Value::Number(serde_json::Number::from(cw)))
720            .expect("must serialize");
721        BuiltinTableUpdate::row(
722            &*MZ_HISTORY_RETENTION_STRATEGIES,
723            Row::pack_slice(&[
724                Datum::String(&id.to_string()),
725                // FOR is the only strategy at the moment. We may introduce FROM or others later.
726                Datum::String("FOR"),
727                cw.into_row().into_element(),
728            ]),
729            diff,
730        )
731    }
732
733    fn pack_table_update(
734        &self,
735        id: CatalogItemId,
736        oid: u32,
737        schema_id: &SchemaSpecifier,
738        name: &str,
739        owner_id: &RoleId,
740        privileges: Datum,
741        diff: Diff,
742        table: &Table,
743    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
744        let redacted = table.create_sql.as_ref().map(|create_sql| {
745            mz_sql::parse::parse(create_sql)
746                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
747                .into_element()
748                .ast
749                .to_ast_string_redacted()
750        });
751        let source_id = if let TableDataSource::DataSource {
752            desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
753            ..
754        } = &table.data_source
755        {
756            Some(ingestion_id.to_string())
757        } else {
758            None
759        };
760
761        vec![BuiltinTableUpdate::row(
762            &*MZ_TABLES,
763            Row::pack_slice(&[
764                Datum::String(&id.to_string()),
765                Datum::UInt32(oid),
766                Datum::String(&schema_id.to_string()),
767                Datum::String(name),
768                Datum::String(&owner_id.to_string()),
769                privileges,
770                if let Some(create_sql) = &table.create_sql {
771                    Datum::String(create_sql)
772                } else {
773                    Datum::Null
774                },
775                if let Some(redacted) = &redacted {
776                    Datum::String(redacted)
777                } else {
778                    Datum::Null
779                },
780                if let Some(source_id) = source_id.as_ref() {
781                    Datum::String(source_id)
782                } else {
783                    Datum::Null
784                },
785            ]),
786            diff,
787        )]
788    }
789
790    fn pack_postgres_source_update(
791        &self,
792        id: CatalogItemId,
793        postgres: &PostgresSourceConnection<ReferencedConnection>,
794        diff: Diff,
795    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
796        vec![BuiltinTableUpdate::row(
797            &*MZ_POSTGRES_SOURCES,
798            Row::pack_slice(&[
799                Datum::String(&id.to_string()),
800                Datum::String(&postgres.publication_details.slot),
801                Datum::from(postgres.publication_details.timeline_id),
802            ]),
803            diff,
804        )]
805    }
806
807    fn pack_kafka_source_update(
808        &self,
809        item_id: CatalogItemId,
810        collection_id: GlobalId,
811        kafka: &KafkaSourceConnection<ReferencedConnection>,
812        diff: Diff,
813    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
814        vec![BuiltinTableUpdate::row(
815            &*MZ_KAFKA_SOURCES,
816            Row::pack_slice(&[
817                Datum::String(&item_id.to_string()),
818                Datum::String(&kafka.group_id(&self.config.connection_context, collection_id)),
819                Datum::String(&kafka.topic),
820            ]),
821            diff,
822        )]
823    }
824
825    fn pack_postgres_source_tables_update(
826        &self,
827        id: CatalogItemId,
828        schema_name: &str,
829        table_name: &str,
830        diff: Diff,
831    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
832        vec![BuiltinTableUpdate::row(
833            &*MZ_POSTGRES_SOURCE_TABLES,
834            Row::pack_slice(&[
835                Datum::String(&id.to_string()),
836                Datum::String(schema_name),
837                Datum::String(table_name),
838            ]),
839            diff,
840        )]
841    }
842
843    fn pack_mysql_source_tables_update(
844        &self,
845        id: CatalogItemId,
846        schema_name: &str,
847        table_name: &str,
848        diff: Diff,
849    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
850        vec![BuiltinTableUpdate::row(
851            &*MZ_MYSQL_SOURCE_TABLES,
852            Row::pack_slice(&[
853                Datum::String(&id.to_string()),
854                Datum::String(schema_name),
855                Datum::String(table_name),
856            ]),
857            diff,
858        )]
859    }
860
861    fn pack_sql_server_source_table_update(
862        &self,
863        id: CatalogItemId,
864        schema_name: &str,
865        table_name: &str,
866        diff: Diff,
867    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
868        vec![BuiltinTableUpdate::row(
869            &*MZ_SQL_SERVER_SOURCE_TABLES,
870            Row::pack_slice(&[
871                Datum::String(&id.to_string()),
872                Datum::String(schema_name),
873                Datum::String(table_name),
874            ]),
875            diff,
876        )]
877    }
878
879    fn pack_kafka_source_tables_update(
880        &self,
881        id: CatalogItemId,
882        topic: &str,
883        envelope: Option<&str>,
884        key_format: Option<&str>,
885        value_format: Option<&str>,
886        diff: Diff,
887    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
888        vec![BuiltinTableUpdate::row(
889            &*MZ_KAFKA_SOURCE_TABLES,
890            Row::pack_slice(&[
891                Datum::String(&id.to_string()),
892                Datum::String(topic),
893                Datum::from(envelope),
894                Datum::from(key_format),
895                Datum::from(value_format),
896            ]),
897            diff,
898        )]
899    }
900
901    fn pack_connection_update(
902        &self,
903        id: CatalogItemId,
904        connection: &Connection,
905        diff: Diff,
906    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
907        let mut updates = vec![];
908        match connection.details {
909            ConnectionDetails::Kafka(ref kafka) => {
910                updates.extend(self.pack_kafka_connection_update(id, kafka, diff));
911            }
912            ConnectionDetails::Aws(ref aws_config) => {
913                match self.pack_aws_connection_update(id, aws_config, diff) {
914                    Ok(update) => {
915                        updates.push(update);
916                    }
917                    Err(e) => {
918                        tracing::error!(%id, %e, "failed writing row to mz_aws_connections table");
919                    }
920                }
921            }
922            ConnectionDetails::AwsPrivatelink(_) => {
923                if let Some(aws_principal_context) = self.aws_principal_context.as_ref() {
924                    updates.push(self.pack_aws_privatelink_connection_update(
925                        id,
926                        aws_principal_context,
927                        diff,
928                    ));
929                } else {
930                    tracing::error!(%id, "missing AWS principal context; cannot write row to mz_aws_privatelink_connections table");
931                }
932            }
933            ConnectionDetails::Ssh {
934                ref key_1,
935                ref key_2,
936                ..
937            } => {
938                updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff));
939            }
940            ConnectionDetails::Csr(_)
941            | ConnectionDetails::GlueSchemaRegistry(_)
942            | ConnectionDetails::Postgres(_)
943            | ConnectionDetails::MySql(_)
944            | ConnectionDetails::SqlServer(_)
945            | ConnectionDetails::IcebergCatalog(_) => (),
946        };
947        updates
948    }
949
950    pub(crate) fn pack_ssh_tunnel_connection_update(
951        &self,
952        id: CatalogItemId,
953        key_1: &SshKey,
954        key_2: &SshKey,
955        diff: Diff,
956    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
957        BuiltinTableUpdate::row(
958            &*MZ_SSH_TUNNEL_CONNECTIONS,
959            Row::pack_slice(&[
960                Datum::String(&id.to_string()),
961                Datum::String(key_1.public_key().as_str()),
962                Datum::String(key_2.public_key().as_str()),
963            ]),
964            diff,
965        )
966    }
967
968    fn pack_kafka_connection_update(
969        &self,
970        id: CatalogItemId,
971        kafka: &KafkaConnection<ReferencedConnection>,
972        diff: Diff,
973    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
974        let progress_topic = kafka.progress_topic(&self.config.connection_context, id);
975        let mut row = Row::default();
976        row.packer()
977            .try_push_array(
978                &[ArrayDimension {
979                    lower_bound: 1,
980                    length: kafka.brokers.len(),
981                }],
982                kafka
983                    .brokers
984                    .iter()
985                    .map(|broker| Datum::String(&broker.address)),
986            )
987            .expect("kafka.brokers is 1 dimensional, and its length is used for the array length");
988        let brokers = row.unpack_first();
989        vec![BuiltinTableUpdate::row(
990            &*MZ_KAFKA_CONNECTIONS,
991            Row::pack_slice(&[
992                Datum::String(&id.to_string()),
993                brokers,
994                Datum::String(&progress_topic),
995            ]),
996            diff,
997        )]
998    }
999
1000    pub fn pack_aws_privatelink_connection_update(
1001        &self,
1002        connection_id: CatalogItemId,
1003        aws_principal_context: &AwsPrincipalContext,
1004        diff: Diff,
1005    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1006        let id = &MZ_AWS_PRIVATELINK_CONNECTIONS;
1007        let row = Row::pack_slice(&[
1008            Datum::String(&connection_id.to_string()),
1009            Datum::String(&aws_principal_context.to_principal_string(connection_id)),
1010        ]);
1011        BuiltinTableUpdate::row(id, row, diff)
1012    }
1013
    /// Builds the `mz_aws_connections` row for `connection_id` from `aws_config`.
    ///
    /// The endpoint and region are always recorded (as `NULL` when unset).
    /// Beyond that, only the columns belonging to the configured `AwsAuth`
    /// variant can be non-`NULL`:
    ///
    /// * `AwsAuth::Credentials`: the access key ID, either inline or as a
    ///   secret ID; the secret ID of the secret access key; and the optional
    ///   session token, again either inline or as a secret ID.
    /// * `AwsAuth::AssumeRole`: the role ARN, the optional session name, the
    ///   principal (`aws_connection_role_arn` from the connection context, if
    ///   set), the external ID, and an example trust policy stored as JSON.
    ///
    /// # Errors
    ///
    /// For `AwsAuth::AssumeRole`, returns an error if deriving the external ID
    /// or the example trust policy fails.
    pub fn pack_aws_connection_update(
        &self,
        connection_id: CatalogItemId,
        aws_config: &AwsConnection,
        diff: Diff,
    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, anyhow::Error> {
        let id = &MZ_AWS_CONNECTIONS;

        // Every auth-specific column starts out `NULL`; the match below fills in
        // only those that apply to the configured authentication method.
        let mut access_key_id = None;
        let mut access_key_id_secret_id = None;
        let mut secret_access_key_secret_id = None;
        let mut session_token = None;
        let mut session_token_secret_id = None;
        let mut assume_role_arn = None;
        let mut assume_role_session_name = None;
        let mut principal = None;
        let mut external_id = None;
        let mut example_trust_policy = None;
        match &aws_config.auth {
            AwsAuth::Credentials(credentials) => {
                // Inline values go in the value column; secrets are recorded only
                // by their secret ID, never by their contents.
                match &credentials.access_key_id {
                    StringOrSecret::String(s) => access_key_id = Some(s.as_str()),
                    StringOrSecret::Secret(s) => access_key_id_secret_id = Some(s.to_string()),
                }
                secret_access_key_secret_id = Some(credentials.secret_access_key.to_string());
                match credentials.session_token.as_ref() {
                    None => (),
                    Some(StringOrSecret::String(s)) => session_token = Some(s.as_str()),
                    Some(StringOrSecret::Secret(s)) => {
                        session_token_secret_id = Some(s.to_string())
                    }
                }
            }
            AwsAuth::AssumeRole(assume_role) => {
                assume_role_arn = Some(assume_role.arn.as_str());
                assume_role_session_name = assume_role.session_name.as_deref();
                // The principal is read from the shared connection context, not
                // from `aws_config`.
                principal = self
                    .config
                    .connection_context
                    .aws_connection_role_arn
                    .as_deref();
                external_id =
                    Some(assume_role.external_id(&self.config.connection_context, connection_id)?);
                example_trust_policy = {
                    let policy = assume_role
                        .example_trust_policy(&self.config.connection_context, connection_id)?;
                    let policy = Jsonb::from_serde_json(policy).expect("valid json");
                    // Keep the packed `Row` alive here; its datum is borrowed when
                    // building the final row below.
                    Some(policy.into_row())
                };
            }
        }

        let row = Row::pack_slice(&[
            Datum::String(&connection_id.to_string()),
            Datum::from(aws_config.endpoint.as_deref()),
            Datum::from(aws_config.region.as_deref()),
            Datum::from(access_key_id),
            Datum::from(access_key_id_secret_id.as_deref()),
            Datum::from(secret_access_key_secret_id.as_deref()),
            Datum::from(session_token),
            Datum::from(session_token_secret_id.as_deref()),
            Datum::from(assume_role_arn),
            Datum::from(assume_role_session_name),
            Datum::from(principal),
            Datum::from(external_id.as_deref()),
            Datum::from(example_trust_policy.as_ref().map(|p| p.into_element())),
        ]);

        Ok(BuiltinTableUpdate::row(id, row, diff))
    }
1084
1085    fn pack_view_update(
1086        &self,
1087        id: CatalogItemId,
1088        oid: u32,
1089        schema_id: &SchemaSpecifier,
1090        name: &str,
1091        owner_id: &RoleId,
1092        privileges: Datum,
1093        view: &View,
1094        diff: Diff,
1095    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1096        let create_stmt = mz_sql::parse::parse(&view.create_sql)
1097            .unwrap_or_else(|e| {
1098                panic!(
1099                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1100                    view.create_sql, e
1101                )
1102            })
1103            .into_element()
1104            .ast;
1105        let query = match &create_stmt {
1106            Statement::CreateView(stmt) => &stmt.definition.query,
1107            _ => unreachable!(),
1108        };
1109
1110        let mut query_string = query.to_ast_string_stable();
1111        // PostgreSQL appends a semicolon in `pg_views.definition`, we
1112        // do the same for compatibility's sake.
1113        query_string.push(';');
1114
1115        vec![BuiltinTableUpdate::row(
1116            &*MZ_VIEWS,
1117            Row::pack_slice(&[
1118                Datum::String(&id.to_string()),
1119                Datum::UInt32(oid),
1120                Datum::String(&schema_id.to_string()),
1121                Datum::String(name),
1122                Datum::String(&query_string),
1123                Datum::String(&owner_id.to_string()),
1124                privileges,
1125                Datum::String(&view.create_sql),
1126                Datum::String(&create_stmt.to_ast_string_redacted()),
1127            ]),
1128            diff,
1129        )]
1130    }
1131
1132    fn pack_materialized_view_update(
1133        &self,
1134        id: CatalogItemId,
1135        mview: &MaterializedView,
1136        diff: Diff,
1137    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1138        let mut updates = Vec::new();
1139
1140        if let Some(refresh_schedule) = &mview.refresh_schedule {
1141            // This can't be `ON COMMIT`, because that is represented by a `None` instead of an
1142            // empty `RefreshSchedule`.
1143            assert!(!refresh_schedule.is_empty());
1144            for RefreshEvery {
1145                interval,
1146                aligned_to,
1147            } in refresh_schedule.everies.iter()
1148            {
1149                let aligned_to_dt = mz_ore::now::to_datetime(
1150                    <&Timestamp as TryInto<u64>>::try_into(aligned_to).expect("undoes planning"),
1151                );
1152                updates.push(BuiltinTableUpdate::row(
1153                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1154                    Row::pack_slice(&[
1155                        Datum::String(&id.to_string()),
1156                        Datum::String("every"),
1157                        Datum::Interval(
1158                            Interval::from_duration(interval).expect(
1159                                "planning ensured that this is convertible back to Interval",
1160                            ),
1161                        ),
1162                        Datum::TimestampTz(aligned_to_dt.try_into().expect("undoes planning")),
1163                        Datum::Null,
1164                    ]),
1165                    diff,
1166                ));
1167            }
1168            for at in refresh_schedule.ats.iter() {
1169                let at_dt = mz_ore::now::to_datetime(
1170                    <&Timestamp as TryInto<u64>>::try_into(at).expect("undoes planning"),
1171                );
1172                updates.push(BuiltinTableUpdate::row(
1173                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1174                    Row::pack_slice(&[
1175                        Datum::String(&id.to_string()),
1176                        Datum::String("at"),
1177                        Datum::Null,
1178                        Datum::Null,
1179                        Datum::TimestampTz(at_dt.try_into().expect("undoes planning")),
1180                    ]),
1181                    diff,
1182                ));
1183            }
1184        } else {
1185            updates.push(BuiltinTableUpdate::row(
1186                &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1187                Row::pack_slice(&[
1188                    Datum::String(&id.to_string()),
1189                    Datum::String("on-commit"),
1190                    Datum::Null,
1191                    Datum::Null,
1192                    Datum::Null,
1193                ]),
1194                diff,
1195            ));
1196        }
1197
1198        if let Some(target_id) = mview.replacement_target {
1199            updates.push(BuiltinTableUpdate::row(
1200                &*MZ_REPLACEMENTS,
1201                Row::pack_slice(&[
1202                    Datum::String(&id.to_string()),
1203                    Datum::String(&target_id.to_string()),
1204                ]),
1205                diff,
1206            ));
1207        }
1208
1209        updates
1210    }
1211
1212    fn pack_sink_update(
1213        &self,
1214        id: CatalogItemId,
1215        oid: u32,
1216        schema_id: &SchemaSpecifier,
1217        name: &str,
1218        owner_id: &RoleId,
1219        sink: &Sink,
1220        diff: Diff,
1221    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1222        let mut updates = vec![];
1223        match &sink.connection {
1224            StorageSinkConnection::Kafka(KafkaSinkConnection {
1225                topic: topic_name, ..
1226            }) => {
1227                updates.push(BuiltinTableUpdate::row(
1228                    &*MZ_KAFKA_SINKS,
1229                    Row::pack_slice(&[
1230                        Datum::String(&id.to_string()),
1231                        Datum::String(topic_name.as_str()),
1232                    ]),
1233                    diff,
1234                ));
1235            }
1236            StorageSinkConnection::Iceberg(IcebergSinkConnection {
1237                namespace, table, ..
1238            }) => {
1239                updates.push(BuiltinTableUpdate::row(
1240                    &*MZ_ICEBERG_SINKS,
1241                    Row::pack_slice(&[
1242                        Datum::String(&id.to_string()),
1243                        Datum::String(namespace.as_str()),
1244                        Datum::String(table.as_str()),
1245                    ]),
1246                    diff,
1247                ));
1248            }
1249        };
1250
1251        let create_stmt = mz_sql::parse::parse(&sink.create_sql)
1252            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", sink.create_sql))
1253            .into_element()
1254            .ast;
1255
1256        let envelope = sink.envelope();
1257
1258        // The combined format string is used for the deprecated `format` column.
1259        let combined_format = sink.combined_format();
1260        let (key_format, value_format) = match sink.formats() {
1261            Some((key_format, value_format)) => (key_format, Some(value_format)),
1262            None => (None, None),
1263        };
1264
1265        updates.push(BuiltinTableUpdate::row(
1266            &*MZ_SINKS,
1267            Row::pack_slice(&[
1268                Datum::String(&id.to_string()),
1269                Datum::UInt32(oid),
1270                Datum::String(&schema_id.to_string()),
1271                Datum::String(name),
1272                Datum::String(sink.connection.name()),
1273                Datum::from(sink.connection_id().map(|id| id.to_string()).as_deref()),
1274                // size column now deprecated w/o linked clusters
1275                Datum::Null,
1276                Datum::from(envelope),
1277                // FIXME: These key/value formats are kinda leaky! Should probably live in
1278                // the kafka sink table.
1279                Datum::from(combined_format.as_ref().map(|f| f.as_ref())),
1280                Datum::from(key_format),
1281                Datum::from(value_format),
1282                Datum::String(&sink.cluster_id.to_string()),
1283                Datum::String(&owner_id.to_string()),
1284                Datum::String(&sink.create_sql),
1285                Datum::String(&create_stmt.to_ast_string_redacted()),
1286            ]),
1287            diff,
1288        ));
1289
1290        updates
1291    }
1292
1293    fn pack_index_update(
1294        &self,
1295        id: CatalogItemId,
1296        oid: u32,
1297        name: &str,
1298        owner_id: &RoleId,
1299        index: &Index,
1300        diff: Diff,
1301    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1302        let mut updates = vec![];
1303
1304        let create_stmt = mz_sql::parse::parse(&index.create_sql)
1305            .unwrap_or_else(|e| {
1306                panic!(
1307                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1308                    index.create_sql, e
1309                )
1310            })
1311            .into_element()
1312            .ast;
1313
1314        let key_sqls = match &create_stmt {
1315            Statement::CreateIndex(CreateIndexStatement { key_parts, .. }) => key_parts
1316                .as_ref()
1317                .expect("key_parts is filled in during planning"),
1318            _ => unreachable!(),
1319        };
1320        let on_item_id = self.get_entry_by_global_id(&index.on).id();
1321
1322        updates.push(BuiltinTableUpdate::row(
1323            &*MZ_INDEXES,
1324            Row::pack_slice(&[
1325                Datum::String(&id.to_string()),
1326                Datum::UInt32(oid),
1327                Datum::String(name),
1328                Datum::String(&on_item_id.to_string()),
1329                Datum::String(&index.cluster_id.to_string()),
1330                Datum::String(&owner_id.to_string()),
1331                Datum::String(&index.create_sql),
1332                Datum::String(&create_stmt.to_ast_string_redacted()),
1333            ]),
1334            diff,
1335        ));
1336
1337        let on_entry = self.get_entry_by_global_id(&index.on);
1338        let on_desc = on_entry
1339            .relation_desc()
1340            .expect("can only create indexes on items with a valid description");
1341        let repr_col_types: Vec<ReprColumnType> = on_desc
1342            .typ()
1343            .column_types
1344            .iter()
1345            .map(ReprColumnType::from)
1346            .collect();
1347        for (i, key) in index.keys.iter().enumerate() {
1348            let nullable = key.typ(&repr_col_types).nullable;
1349            let seq_in_index = u64::cast_from(i + 1);
1350            let key_sql = key_sqls
1351                .get(i)
1352                .expect("missing sql information for index key")
1353                .to_ast_string_simple();
1354            let (field_number, expression) = match key {
1355                MirScalarExpr::Column(col, _) => {
1356                    (Datum::UInt64(u64::cast_from(*col + 1)), Datum::Null)
1357                }
1358                _ => (Datum::Null, Datum::String(&key_sql)),
1359            };
1360            updates.push(BuiltinTableUpdate::row(
1361                &*MZ_INDEX_COLUMNS,
1362                Row::pack_slice(&[
1363                    Datum::String(&id.to_string()),
1364                    Datum::UInt64(seq_in_index),
1365                    field_number,
1366                    expression,
1367                    Datum::from(nullable),
1368                ]),
1369                diff,
1370            ));
1371        }
1372
1373        updates
1374    }
1375
1376    fn pack_type_update(
1377        &self,
1378        id: CatalogItemId,
1379        oid: u32,
1380        schema_id: &SchemaSpecifier,
1381        name: &str,
1382        owner_id: &RoleId,
1383        privileges: Datum,
1384        typ: &Type,
1385        diff: Diff,
1386    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1387        let mut out = vec![];
1388
1389        let redacted = typ.create_sql.as_ref().map(|create_sql| {
1390            mz_sql::parse::parse(create_sql)
1391                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
1392                .into_element()
1393                .ast
1394                .to_ast_string_redacted()
1395        });
1396
1397        out.push(BuiltinTableUpdate::row(
1398            &*MZ_TYPES,
1399            Row::pack_slice(&[
1400                Datum::String(&id.to_string()),
1401                Datum::UInt32(oid),
1402                Datum::String(&schema_id.to_string()),
1403                Datum::String(name),
1404                Datum::String(&TypeCategory::from_catalog_type(&typ.details.typ).to_string()),
1405                Datum::String(&owner_id.to_string()),
1406                privileges,
1407                if let Some(create_sql) = &typ.create_sql {
1408                    Datum::String(create_sql)
1409                } else {
1410                    Datum::Null
1411                },
1412                if let Some(redacted) = &redacted {
1413                    Datum::String(redacted)
1414                } else {
1415                    Datum::Null
1416                },
1417            ]),
1418            diff,
1419        ));
1420
1421        let mut row = Row::default();
1422        let mut packer = row.packer();
1423
1424        fn append_modifier(packer: &mut RowPacker<'_>, mods: &[i64]) {
1425            if mods.is_empty() {
1426                packer.push(Datum::Null);
1427            } else {
1428                packer.push_list(mods.iter().map(|m| Datum::Int64(*m)));
1429            }
1430        }
1431
1432        let index_id = match &typ.details.typ {
1433            CatalogType::Array {
1434                element_reference: element_id,
1435            } => {
1436                packer.push(Datum::String(&id.to_string()));
1437                packer.push(Datum::String(&element_id.to_string()));
1438                &MZ_ARRAY_TYPES
1439            }
1440            CatalogType::List {
1441                element_reference: element_id,
1442                element_modifiers,
1443            } => {
1444                packer.push(Datum::String(&id.to_string()));
1445                packer.push(Datum::String(&element_id.to_string()));
1446                append_modifier(&mut packer, element_modifiers);
1447                &MZ_LIST_TYPES
1448            }
1449            CatalogType::Map {
1450                key_reference: key_id,
1451                value_reference: value_id,
1452                key_modifiers,
1453                value_modifiers,
1454            } => {
1455                packer.push(Datum::String(&id.to_string()));
1456                packer.push(Datum::String(&key_id.to_string()));
1457                packer.push(Datum::String(&value_id.to_string()));
1458                append_modifier(&mut packer, key_modifiers);
1459                append_modifier(&mut packer, value_modifiers);
1460                &MZ_MAP_TYPES
1461            }
1462            CatalogType::Pseudo => {
1463                packer.push(Datum::String(&id.to_string()));
1464                &MZ_PSEUDO_TYPES
1465            }
1466            _ => {
1467                packer.push(Datum::String(&id.to_string()));
1468                &MZ_BASE_TYPES
1469            }
1470        };
1471        out.push(BuiltinTableUpdate::row(index_id, row, diff));
1472
1473        if let Some(pg_metadata) = &typ.details.pg_metadata {
1474            out.push(BuiltinTableUpdate::row(
1475                &*MZ_TYPE_PG_METADATA,
1476                Row::pack_slice(&[
1477                    Datum::String(&id.to_string()),
1478                    Datum::UInt32(pg_metadata.typinput_oid),
1479                    Datum::UInt32(pg_metadata.typreceive_oid),
1480                ]),
1481                diff,
1482            ));
1483        }
1484
1485        out
1486    }
1487
1488    fn pack_func_update(
1489        &self,
1490        id: CatalogItemId,
1491        schema_id: &SchemaSpecifier,
1492        name: &str,
1493        owner_id: &RoleId,
1494        func: &Func,
1495        diff: Diff,
1496    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1497        let mut updates = vec![];
1498        for func_impl_details in func.inner.func_impls() {
1499            let arg_type_ids = func_impl_details
1500                .arg_typs
1501                .iter()
1502                .map(|typ| self.get_system_type(typ).id().to_string())
1503                .collect::<Vec<_>>();
1504
1505            let mut row = Row::default();
1506            row.packer()
1507                .try_push_array(
1508                    &[ArrayDimension {
1509                        lower_bound: 1,
1510                        length: arg_type_ids.len(),
1511                    }],
1512                    arg_type_ids.iter().map(|id| Datum::String(id)),
1513                )
1514                .expect(
1515                    "arg_type_ids is 1 dimensional, and its length is used for the array length",
1516                );
1517            let arg_type_ids = row.unpack_first();
1518
1519            updates.push(BuiltinTableUpdate::row(
1520                &*MZ_FUNCTIONS,
1521                Row::pack_slice(&[
1522                    Datum::String(&id.to_string()),
1523                    Datum::UInt32(func_impl_details.oid),
1524                    Datum::String(&schema_id.to_string()),
1525                    Datum::String(name),
1526                    arg_type_ids,
1527                    Datum::from(
1528                        func_impl_details
1529                            .variadic_typ
1530                            .map(|typ| self.get_system_type(typ).id().to_string())
1531                            .as_deref(),
1532                    ),
1533                    Datum::from(
1534                        func_impl_details
1535                            .return_typ
1536                            .map(|typ| self.get_system_type(typ).id().to_string())
1537                            .as_deref(),
1538                    ),
1539                    func_impl_details.return_is_set.into(),
1540                    Datum::String(&owner_id.to_string()),
1541                ]),
1542                diff,
1543            ));
1544
1545            if let mz_sql::func::Func::Aggregate(_) = func.inner {
1546                updates.push(BuiltinTableUpdate::row(
1547                    &*MZ_AGGREGATES,
1548                    Row::pack_slice(&[
1549                        Datum::UInt32(func_impl_details.oid),
1550                        // TODO(database-issues#1064): Support ordered-set aggregate functions.
1551                        Datum::String("n"),
1552                        Datum::Int16(0),
1553                    ]),
1554                    diff,
1555                ));
1556            }
1557        }
1558        updates
1559    }
1560
1561    pub fn pack_op_update(
1562        &self,
1563        operator: &str,
1564        func_impl_details: FuncImplCatalogDetails,
1565        diff: Diff,
1566    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1567        let arg_type_ids = func_impl_details
1568            .arg_typs
1569            .iter()
1570            .map(|typ| self.get_system_type(typ).id().to_string())
1571            .collect::<Vec<_>>();
1572
1573        let mut row = Row::default();
1574        row.packer()
1575            .try_push_array(
1576                &[ArrayDimension {
1577                    lower_bound: 1,
1578                    length: arg_type_ids.len(),
1579                }],
1580                arg_type_ids.iter().map(|id| Datum::String(id)),
1581            )
1582            .expect("arg_type_ids is 1 dimensional, and its length is used for the array length");
1583        let arg_type_ids = row.unpack_first();
1584
1585        BuiltinTableUpdate::row(
1586            &*MZ_OPERATORS,
1587            Row::pack_slice(&[
1588                Datum::UInt32(func_impl_details.oid),
1589                Datum::String(operator),
1590                arg_type_ids,
1591                Datum::from(
1592                    func_impl_details
1593                        .return_typ
1594                        .map(|typ| self.get_system_type(typ).id().to_string())
1595                        .as_deref(),
1596                ),
1597            ]),
1598            diff,
1599        )
1600    }
1601
1602    pub fn pack_audit_log_update(
1603        &self,
1604        event: &VersionedEvent,
1605        diff: Diff,
1606    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1607        let (event_type, object_type, details, user, occurred_at): (
1608            &EventType,
1609            &ObjectType,
1610            &EventDetails,
1611            &Option<String>,
1612            u64,
1613        ) = match event {
1614            VersionedEvent::V1(ev) => (
1615                &ev.event_type,
1616                &ev.object_type,
1617                &ev.details,
1618                &ev.user,
1619                ev.occurred_at,
1620            ),
1621        };
1622        let details = Jsonb::from_serde_json(details.as_json())
1623            .map_err(|e| {
1624                Error::new(ErrorKind::Unstructured(format!(
1625                    "could not pack audit log update: {}",
1626                    e
1627                )))
1628            })?
1629            .into_row();
1630        let details = details
1631            .iter()
1632            .next()
1633            .expect("details created above with a single jsonb column");
1634        let dt = mz_ore::now::to_datetime(occurred_at);
1635        let id = event.sortable_id();
1636        Ok(BuiltinTableUpdate::row(
1637            &*MZ_AUDIT_EVENTS,
1638            Row::pack_slice(&[
1639                Datum::UInt64(id),
1640                Datum::String(&format!("{}", event_type)),
1641                Datum::String(&format!("{}", object_type)),
1642                details,
1643                match user {
1644                    Some(user) => Datum::String(user),
1645                    None => Datum::Null,
1646                },
1647                Datum::TimestampTz(dt.try_into().expect("must fit")),
1648            ]),
1649            diff,
1650        ))
1651    }
1652
1653    pub fn pack_storage_usage_update(
1654        &self,
1655        VersionedStorageUsage::V1(event): VersionedStorageUsage,
1656        diff: Diff,
1657    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1658        let id = &MZ_STORAGE_USAGE_BY_SHARD;
1659        let row = Row::pack_slice(&[
1660            Datum::UInt64(event.id),
1661            Datum::from(event.shard_id.as_deref()),
1662            Datum::UInt64(event.size_bytes),
1663            Datum::TimestampTz(
1664                mz_ore::now::to_datetime(event.collection_timestamp)
1665                    .try_into()
1666                    .expect("must fit"),
1667            ),
1668        ]);
1669        BuiltinTableUpdate::row(id, row, diff)
1670    }
1671
1672    pub fn pack_egress_ip_update(
1673        &self,
1674        ip: &IpNet,
1675    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1676        let id = &MZ_EGRESS_IPS;
1677        let addr = ip.network();
1678        let row = Row::pack_slice(&[
1679            Datum::String(&addr.to_string()),
1680            Datum::Int32(ip.prefix_len().into()),
1681            Datum::String(&format!("{}/{}", addr, ip.prefix_len())),
1682        ]);
1683        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
1684    }
1685
1686    pub fn pack_license_key_update(
1687        &self,
1688        license_key: &ValidatedLicenseKey,
1689    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1690        let id = &MZ_LICENSE_KEYS;
1691        let row = Row::pack_slice(&[
1692            Datum::String(&license_key.id),
1693            Datum::String(&license_key.organization),
1694            Datum::String(&license_key.environment_id),
1695            Datum::TimestampTz(
1696                mz_ore::now::to_datetime(license_key.expiration * 1000)
1697                    .try_into()
1698                    .expect("must fit"),
1699            ),
1700            Datum::TimestampTz(
1701                mz_ore::now::to_datetime(license_key.not_before * 1000)
1702                    .try_into()
1703                    .expect("must fit"),
1704            ),
1705        ]);
1706        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
1707    }
1708
1709    pub fn pack_all_replica_size_updates(&self) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1710        let mut updates = Vec::new();
1711        for (size, alloc) in &self.cluster_replica_sizes.0 {
1712            if alloc.disabled {
1713                continue;
1714            }
1715
1716            // Just invent something when the limits are `None`, which only happens in non-prod
1717            // environments (tests, process orchestrator, etc.)
1718            let cpu_limit = alloc.cpu_limit.unwrap_or(CpuLimit::MAX);
1719            let MemoryLimit(ByteSize(memory_bytes)) =
1720                (alloc.memory_limit).unwrap_or(MemoryLimit::MAX);
1721            let DiskLimit(ByteSize(disk_bytes)) =
1722                (alloc.disk_limit).unwrap_or(DiskLimit::ARBITRARY);
1723
1724            let row = Row::pack_slice(&[
1725                size.as_str().into(),
1726                u64::cast_from(alloc.scale).into(),
1727                u64::cast_from(alloc.workers).into(),
1728                cpu_limit.as_nanocpus().into(),
1729                memory_bytes.into(),
1730                disk_bytes.into(),
1731                (alloc.credits_per_hour).into(),
1732            ]);
1733
1734            updates.push(BuiltinTableUpdate::row(
1735                &*MZ_CLUSTER_REPLICA_SIZES,
1736                row,
1737                Diff::ONE,
1738            ));
1739        }
1740
1741        updates
1742    }
1743
1744    pub fn pack_subscribe_update(
1745        &self,
1746        id: GlobalId,
1747        subscribe: &ActiveSubscribe,
1748        diff: Diff,
1749    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1750        let mut row = Row::default();
1751        let mut packer = row.packer();
1752        packer.push(Datum::String(&id.to_string()));
1753        packer.push(Datum::Uuid(subscribe.session_uuid));
1754        packer.push(Datum::String(&subscribe.cluster_id.to_string()));
1755
1756        let start_dt = mz_ore::now::to_datetime(subscribe.start_time);
1757        packer.push(Datum::TimestampTz(start_dt.try_into().expect("must fit")));
1758
1759        let depends_on: Vec<_> = subscribe
1760            .depends_on
1761            .iter()
1762            .map(|id| id.to_string())
1763            .collect();
1764        packer.push_list(depends_on.iter().map(|s| Datum::String(s)));
1765
1766        BuiltinTableUpdate::row(&*MZ_SUBSCRIPTIONS, row, diff)
1767    }
1768
1769    pub fn pack_session_update(
1770        &self,
1771        conn: &ConnMeta,
1772        diff: Diff,
1773    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1774        let connect_dt = mz_ore::now::to_datetime(conn.connected_at());
1775        BuiltinTableUpdate::row(
1776            &*MZ_SESSIONS,
1777            Row::pack_slice(&[
1778                Datum::Uuid(conn.uuid()),
1779                Datum::UInt32(conn.conn_id().unhandled()),
1780                Datum::String(&conn.authenticated_role_id().to_string()),
1781                Datum::from(conn.client_ip().map(|ip| ip.to_string()).as_deref()),
1782                Datum::TimestampTz(connect_dt.try_into().expect("must fit")),
1783            ]),
1784            diff,
1785        )
1786    }
1787
1788    pub fn pack_default_privileges_update(
1789        &self,
1790        default_privilege_object: &DefaultPrivilegeObject,
1791        grantee: &RoleId,
1792        acl_mode: &AclMode,
1793        diff: Diff,
1794    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1795        BuiltinTableUpdate::row(
1796            &*MZ_DEFAULT_PRIVILEGES,
1797            Row::pack_slice(&[
1798                default_privilege_object.role_id.to_string().as_str().into(),
1799                default_privilege_object
1800                    .database_id
1801                    .map(|database_id| database_id.to_string())
1802                    .as_deref()
1803                    .into(),
1804                default_privilege_object
1805                    .schema_id
1806                    .map(|schema_id| schema_id.to_string())
1807                    .as_deref()
1808                    .into(),
1809                default_privilege_object
1810                    .object_type
1811                    .to_string()
1812                    .to_lowercase()
1813                    .as_str()
1814                    .into(),
1815                grantee.to_string().as_str().into(),
1816                acl_mode.to_string().as_str().into(),
1817            ]),
1818            diff,
1819        )
1820    }
1821
1822    pub fn pack_system_privileges_update(
1823        &self,
1824        privileges: MzAclItem,
1825        diff: Diff,
1826    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1827        BuiltinTableUpdate::row(
1828            &*MZ_SYSTEM_PRIVILEGES,
1829            Row::pack_slice(&[privileges.into()]),
1830            diff,
1831        )
1832    }
1833
1834    fn pack_privilege_array_row(&self, privileges: &PrivilegeMap) -> Row {
1835        let mut row = Row::default();
1836        let flat_privileges: Vec<_> = privileges.all_values_owned().collect();
1837        row.packer()
1838            .try_push_array(
1839                &[ArrayDimension {
1840                    lower_bound: 1,
1841                    length: flat_privileges.len(),
1842                }],
1843                flat_privileges
1844                    .into_iter()
1845                    .map(|mz_acl_item| Datum::MzAclItem(mz_acl_item.clone())),
1846            )
1847            .expect("privileges is 1 dimensional, and its length is used for the array length");
1848        row
1849    }
1850
1851    pub fn pack_comment_update(
1852        &self,
1853        object_id: CommentObjectId,
1854        column_pos: Option<usize>,
1855        comment: &str,
1856        diff: Diff,
1857    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1858        // Use the audit log representation so it's easier to join against.
1859        let object_type = mz_sql::catalog::ObjectType::from(object_id);
1860        let audit_type = super::object_type_to_audit_object_type(object_type);
1861        let object_type_str = audit_type.to_string();
1862
1863        let object_id_str = match object_id {
1864            CommentObjectId::Table(global_id)
1865            | CommentObjectId::View(global_id)
1866            | CommentObjectId::MaterializedView(global_id)
1867            | CommentObjectId::Source(global_id)
1868            | CommentObjectId::Sink(global_id)
1869            | CommentObjectId::Index(global_id)
1870            | CommentObjectId::Func(global_id)
1871            | CommentObjectId::Connection(global_id)
1872            | CommentObjectId::Secret(global_id)
1873            | CommentObjectId::Type(global_id) => global_id.to_string(),
1874            CommentObjectId::Role(role_id) => role_id.to_string(),
1875            CommentObjectId::Database(database_id) => database_id.to_string(),
1876            CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
1877            CommentObjectId::Cluster(cluster_id) => cluster_id.to_string(),
1878            CommentObjectId::ClusterReplica((_, replica_id)) => replica_id.to_string(),
1879            CommentObjectId::NetworkPolicy(network_policy_id) => network_policy_id.to_string(),
1880        };
1881        let column_pos_datum = match column_pos {
1882            Some(pos) => {
1883                // TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
1884                let pos =
1885                    i32::try_from(pos).expect("we constrain this value in the planning layer");
1886                Datum::Int32(pos)
1887            }
1888            None => Datum::Null,
1889        };
1890
1891        BuiltinTableUpdate::row(
1892            &*MZ_COMMENTS,
1893            Row::pack_slice(&[
1894                Datum::String(&object_id_str),
1895                Datum::String(&object_type_str),
1896                column_pos_datum,
1897                Datum::String(comment),
1898            ]),
1899            diff,
1900        )
1901    }
1902
1903    pub fn pack_webhook_source_update(
1904        &self,
1905        item_id: CatalogItemId,
1906        diff: Diff,
1907    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1908        let url = self
1909            .try_get_webhook_url(&item_id)
1910            .expect("webhook source should exist");
1911        let url = url.to_string();
1912        let name = &self.get_entry(&item_id).name().item;
1913        let id_str = item_id.to_string();
1914
1915        BuiltinTableUpdate::row(
1916            &*MZ_WEBHOOKS_SOURCES,
1917            Row::pack_slice(&[
1918                Datum::String(&id_str),
1919                Datum::String(name),
1920                Datum::String(&url),
1921            ]),
1922            diff,
1923        )
1924    }
1925
1926    pub fn pack_source_references_update(
1927        &self,
1928        source_references: &SourceReferences,
1929        diff: Diff,
1930    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1931        let source_id = source_references.source_id.to_string();
1932        let updated_at = &source_references.updated_at;
1933        source_references
1934            .references
1935            .iter()
1936            .map(|reference| {
1937                let mut row = Row::default();
1938                let mut packer = row.packer();
1939                packer.extend([
1940                    Datum::String(&source_id),
1941                    reference
1942                        .namespace
1943                        .as_ref()
1944                        .map(|s| Datum::String(s))
1945                        .unwrap_or(Datum::Null),
1946                    Datum::String(&reference.name),
1947                    Datum::TimestampTz(
1948                        mz_ore::now::to_datetime(*updated_at)
1949                            .try_into()
1950                            .expect("must fit"),
1951                    ),
1952                ]);
1953                if reference.columns.len() > 0 {
1954                    packer
1955                        .try_push_array(
1956                            &[ArrayDimension {
1957                                lower_bound: 1,
1958                                length: reference.columns.len(),
1959                            }],
1960                            reference.columns.iter().map(|col| Datum::String(col)),
1961                        )
1962                        .expect(
1963                            "columns is 1 dimensional, and its length is used for the array length",
1964                        );
1965                } else {
1966                    packer.push(Datum::Null);
1967                }
1968
1969                BuiltinTableUpdate::row(&*MZ_SOURCE_REFERENCES, row, diff)
1970            })
1971            .collect()
1972    }
1973}