Skip to main content

mz_adapter/catalog/
builtin_table_updates.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10mod notice;
11
12use bytesize::ByteSize;
13use ipnet::IpNet;
14use mz_adapter_types::compaction::CompactionWindow;
15use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
16use mz_catalog::SYSTEM_CONN_ID;
17use mz_catalog::builtin::{
18    BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
19    MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICAS,
20    MZ_CLUSTER_SCHEDULES, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS, MZ_DEFAULT_PRIVILEGES,
21    MZ_EGRESS_IPS, MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES, MZ_ICEBERG_SINKS,
22    MZ_INDEX_COLUMNS, MZ_INDEXES, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCE_TABLES,
23    MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES,
24    MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MYSQL_SOURCE_TABLES, MZ_OBJECT_DEPENDENCIES,
25    MZ_OBJECT_GLOBAL_IDS, MZ_OPERATORS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES,
26    MZ_PSEUDO_TYPES, MZ_REPLACEMENTS, MZ_ROLE_AUTH, MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SESSIONS,
27    MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS,
28    MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES,
29    MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
30};
31use mz_catalog::config::AwsPrincipalContext;
32use mz_catalog::durable::SourceReferences;
33use mz_catalog::memory::error::{Error, ErrorKind};
34use mz_catalog::memory::objects::{
35    CatalogEntry, CatalogItem, ClusterVariant, Connection, DataSourceDesc, Func, Index,
36    MaterializedView, Sink, Table, TableDataSource, Type, View,
37};
38use mz_controller::clusters::{
39    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
40};
41use mz_controller_types::ClusterId;
42use mz_expr::MirScalarExpr;
43use mz_license_keys::ValidatedLicenseKey;
44use mz_orchestrator::{CpuLimit, DiskLimit, MemoryLimit};
45use mz_ore::cast::CastFrom;
46use mz_ore::collections::CollectionExt;
47use mz_persist_client::batch::ProtoBatch;
48use mz_repr::adt::array::ArrayDimension;
49use mz_repr::adt::interval::Interval;
50use mz_repr::adt::jsonb::Jsonb;
51use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
52use mz_repr::refresh_schedule::RefreshEvery;
53use mz_repr::role_id::RoleId;
54use mz_repr::{
55    CatalogItemId, Datum, Diff, GlobalId, ReprColumnType, Row, RowPacker, SqlScalarType, Timestamp,
56};
57use mz_sql::ast::{CreateIndexStatement, Statement, UnresolvedItemName};
58use mz_sql::catalog::{CatalogCluster, CatalogType, DefaultPrivilegeObject, TypeCategory};
59use mz_sql::func::FuncImplCatalogDetails;
60use mz_sql::names::{CommentObjectId, SchemaSpecifier};
61use mz_sql::plan::{ClusterSchedule, ConnectionDetails, SshKey};
62use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
63use mz_sql::session::vars::SessionVars;
64use mz_sql_parser::ast::display::AstDisplay;
65use mz_storage_client::client::TableData;
66use mz_storage_types::connections::KafkaConnection;
67use mz_storage_types::connections::aws::{AwsAuth, AwsConnection};
68use mz_storage_types::connections::inline::ReferencedConnection;
69use mz_storage_types::connections::string_or_secret::StringOrSecret;
70use mz_storage_types::sinks::{IcebergSinkConnection, KafkaSinkConnection, StorageSinkConnection};
71use mz_storage_types::sources::{
72    GenericSourceConnection, KafkaSourceConnection, PostgresSourceConnection, SourceConnection,
73};
74use smallvec::smallvec;
75
76// DO NOT add any more imports from `crate` outside of `crate::catalog`.
77use crate::active_compute_sink::ActiveSubscribe;
78use crate::catalog::CatalogState;
79use crate::coord::ConnMeta;
80
/// An update to a built-in table.
///
/// The type parameter `T` is the way the target table is identified. It
/// defaults to [`CatalogItemId`]; updates are also built keyed by
/// `&'static BuiltinTable` and later converted to the [`CatalogItemId`] form
/// by `CatalogState::resolve_builtin_table_update`.
#[derive(Debug, Clone)]
pub struct BuiltinTableUpdate<T = CatalogItemId> {
    /// The reference of the table to update.
    pub id: T,
    /// The data to put into the table, e.g. `TableData::Rows` of
    /// `(Row, Diff)` pairs or `TableData::Batches` of [`ProtoBatch`]es.
    pub data: TableData,
}
89
90impl<T> BuiltinTableUpdate<T> {
91    /// Create a [`BuiltinTableUpdate`] from a [`Row`].
92    pub fn row(id: T, row: Row, diff: Diff) -> BuiltinTableUpdate<T> {
93        BuiltinTableUpdate {
94            id,
95            data: TableData::Rows(vec![(row, diff)]),
96        }
97    }
98
99    pub fn batch(id: T, batch: ProtoBatch) -> BuiltinTableUpdate<T> {
100        BuiltinTableUpdate {
101            id,
102            data: TableData::Batches(smallvec![batch]),
103        }
104    }
105}
106
107impl CatalogState {
108    pub fn resolve_builtin_table_updates(
109        &self,
110        builtin_table_update: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
111    ) -> Vec<BuiltinTableUpdate<CatalogItemId>> {
112        builtin_table_update
113            .into_iter()
114            .map(|builtin_table_update| self.resolve_builtin_table_update(builtin_table_update))
115            .collect()
116    }
117
118    pub fn resolve_builtin_table_update(
119        &self,
120        BuiltinTableUpdate { id, data }: BuiltinTableUpdate<&'static BuiltinTable>,
121    ) -> BuiltinTableUpdate<CatalogItemId> {
122        let id = self.resolve_builtin_table(id);
123        BuiltinTableUpdate { id, data }
124    }
125
126    pub fn pack_depends_update(
127        &self,
128        depender: CatalogItemId,
129        dependee: CatalogItemId,
130        diff: Diff,
131    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
132        let row = Row::pack_slice(&[
133            Datum::String(&depender.to_string()),
134            Datum::String(&dependee.to_string()),
135        ]);
136        BuiltinTableUpdate::row(&*MZ_OBJECT_DEPENDENCIES, row, diff)
137    }
138
139    pub(super) fn pack_role_auth_update(
140        &self,
141        id: RoleId,
142        diff: Diff,
143    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
144        let role_auth = self.get_role_auth(&id);
145        let role = self.get_role(&id);
146        BuiltinTableUpdate::row(
147            &*MZ_ROLE_AUTH,
148            Row::pack_slice(&[
149                Datum::String(&role_auth.role_id.to_string()),
150                Datum::UInt32(role.oid),
151                match &role_auth.password_hash {
152                    Some(hash) => Datum::String(hash),
153                    None => Datum::Null,
154                },
155                Datum::TimestampTz(
156                    mz_ore::now::to_datetime(role_auth.updated_at)
157                        .try_into()
158                        .expect("must fit"),
159                ),
160            ]),
161            diff,
162        )
163    }
164
165    pub(super) fn pack_role_update(
166        &self,
167        id: RoleId,
168        diff: Diff,
169    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
170        match id {
171            // PUBLIC role should not show up in mz_roles.
172            RoleId::Public => vec![],
173            id => {
174                let role = self.get_role(&id);
175                let builtin_supers = [MZ_SYSTEM_ROLE_ID, MZ_SUPPORT_ROLE_ID];
176
177                let rolcanlogin = if let Some(login) = role.attributes.login {
178                    login
179                } else {
180                    builtin_supers.contains(&role.id)
181                };
182
183                let rolsuper = if let Some(superuser) = role.attributes.superuser {
184                    Datum::from(superuser)
185                } else if builtin_supers.contains(&role.id) {
186                    // System roles (mz_system, mz_support) are superusers
187                    Datum::from(true)
188                } else {
189                    // In cloud environments, superuser status is determined
190                    // at login, so we set `rolsuper` to `null` here because
191                    // it cannot be known beforehand. For self-managed
192                    // auth, roles created without an explicit SUPERUSER
193                    // attribute would typically have `rolsuper` set to false.
194                    // However, since we cannot reliably distinguish between
195                    // cloud and self-managed here, we conservatively use NULL
196                    // for indeterminate cases.
197                    Datum::Null
198                };
199
200                let role_update = BuiltinTableUpdate::row(
201                    &*MZ_ROLES,
202                    Row::pack_slice(&[
203                        Datum::String(&role.id.to_string()),
204                        Datum::UInt32(role.oid),
205                        Datum::String(&role.name),
206                        Datum::from(role.attributes.inherit),
207                        Datum::from(rolcanlogin),
208                        rolsuper,
209                    ]),
210                    diff,
211                );
212                let mut updates = vec![role_update];
213
214                // HACK/TODO(parkmycar): Creating an empty SessionVars like this is pretty hacky,
215                // we should instead have a static list of all session vars.
216                let session_vars_reference = SessionVars::new_unchecked(
217                    &mz_build_info::DUMMY_BUILD_INFO,
218                    SYSTEM_USER.clone(),
219                    None,
220                );
221
222                for (name, val) in role.vars() {
223                    let result = session_vars_reference
224                        .inspect(name)
225                        .and_then(|var| var.check(val.borrow()));
226                    let Ok(formatted_val) = result else {
227                        // Note: all variables should have been validated by this point, so we
228                        // shouldn't ever hit this.
229                        tracing::error!(?name, ?val, ?result, "found invalid role default var");
230                        continue;
231                    };
232
233                    let role_var_update = BuiltinTableUpdate::row(
234                        &*MZ_ROLE_PARAMETERS,
235                        Row::pack_slice(&[
236                            Datum::String(&role.id.to_string()),
237                            Datum::String(name),
238                            Datum::String(&formatted_val),
239                        ]),
240                        diff,
241                    );
242                    updates.push(role_var_update);
243                }
244
245                updates
246            }
247        }
248    }
249
250    pub(super) fn pack_cluster_update(
251        &self,
252        name: &str,
253        diff: Diff,
254    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
255        let id = self.clusters_by_name[name];
256        let cluster = &self.clusters_by_id[&id];
257        let row = self.pack_privilege_array_row(cluster.privileges());
258        let privileges = row.unpack_first();
259        let (size, disk, replication_factor, azs, introspection_debugging, introspection_interval) =
260            match &cluster.config.variant {
261                ClusterVariant::Managed(config) => (
262                    Some(config.size.as_str()),
263                    Some(self.cluster_replica_size_has_disk(&config.size)),
264                    Some(config.replication_factor),
265                    if config.availability_zones.is_empty() {
266                        None
267                    } else {
268                        Some(config.availability_zones.clone())
269                    },
270                    Some(config.logging.log_logging),
271                    config.logging.interval.map(|d| {
272                        Interval::from_duration(&d)
273                            .expect("planning ensured this convertible back to interval")
274                    }),
275                ),
276                ClusterVariant::Unmanaged => (None, None, None, None, None, None),
277            };
278
279        let mut row = Row::default();
280        let mut packer = row.packer();
281        packer.extend([
282            Datum::String(&id.to_string()),
283            Datum::String(name),
284            Datum::String(&cluster.owner_id.to_string()),
285            privileges,
286            cluster.is_managed().into(),
287            size.into(),
288            replication_factor.into(),
289            disk.into(),
290        ]);
291        if let Some(azs) = azs {
292            packer.push_list(azs.iter().map(|az| Datum::String(az)));
293        } else {
294            packer.push(Datum::Null);
295        }
296        packer.push(Datum::from(introspection_debugging));
297        packer.push(Datum::from(introspection_interval));
298
299        let mut updates = Vec::new();
300
301        updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTERS, row, diff));
302
303        if let ClusterVariant::Managed(managed_config) = &cluster.config.variant {
304            let row = match managed_config.schedule {
305                ClusterSchedule::Manual => Row::pack_slice(&[
306                    Datum::String(&id.to_string()),
307                    Datum::String("manual"),
308                    Datum::Null,
309                ]),
310                ClusterSchedule::Refresh {
311                    hydration_time_estimate,
312                } => Row::pack_slice(&[
313                    Datum::String(&id.to_string()),
314                    Datum::String("on-refresh"),
315                    Datum::Interval(
316                        Interval::from_duration(&hydration_time_estimate)
317                            .expect("planning ensured that this is convertible back to Interval"),
318                    ),
319                ]),
320            };
321            updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTER_SCHEDULES, row, diff));
322        }
323
324        updates
325    }
326
327    pub(super) fn pack_cluster_replica_update(
328        &self,
329        cluster_id: ClusterId,
330        name: &str,
331        diff: Diff,
332    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
333        let cluster = &self.clusters_by_id[&cluster_id];
334        let id = cluster.replica_id(name).expect("Must exist");
335        let replica = cluster.replica(id).expect("Must exist");
336
337        let (size, disk, az) = match &replica.config.location {
338            // TODO(guswynn): The column should be `availability_zones`, not
339            // `availability_zone`.
340            ReplicaLocation::Managed(ManagedReplicaLocation {
341                size,
342                availability_zones: ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
343                allocation: _,
344                billed_as: _,
345                internal: _,
346                pending: _,
347            }) => (
348                Some(&**size),
349                Some(self.cluster_replica_size_has_disk(size)),
350                Some(az.as_str()),
351            ),
352            ReplicaLocation::Managed(ManagedReplicaLocation {
353                size,
354                availability_zones: _,
355                allocation: _,
356                billed_as: _,
357                internal: _,
358                pending: _,
359            }) => (
360                Some(&**size),
361                Some(self.cluster_replica_size_has_disk(size)),
362                None,
363            ),
364            ReplicaLocation::Unmanaged(_) => (None, None, None),
365        };
366
367        let cluster_replica_update = BuiltinTableUpdate::row(
368            &*MZ_CLUSTER_REPLICAS,
369            Row::pack_slice(&[
370                Datum::String(&id.to_string()),
371                Datum::String(name),
372                Datum::String(&cluster_id.to_string()),
373                Datum::from(size),
374                Datum::from(az),
375                Datum::String(&replica.owner_id.to_string()),
376                Datum::from(disk),
377            ]),
378            diff,
379        );
380
381        vec![cluster_replica_update]
382    }
383
    /// Packs every built-in table update that describes the catalog item `id`,
    /// each carrying the given `diff`.
    ///
    /// The returned updates are, in order:
    ///
    /// 1. The kind-specific updates produced by the matching `pack_*_update`
    ///    helper. `Log` and `Secret` items contribute none here.
    /// 2. For non-temporary items, one `MZ_OBJECT_DEPENDENCIES` update per
    ///    item referenced by this one.
    /// 3. One `MZ_COLUMNS` update per column of the item's latest relation
    ///    description, if it has one.
    /// 4. A history retention strategy update, if the item has an initial
    ///    logical compaction window.
    /// 5. The updates from `pack_item_global_id_update`.
    ///
    /// # Panics
    ///
    /// Panics if an ingestion export refers to a primary source that cannot be
    /// resolved to a source description, or if that source's connection name
    /// is not one of the kinds handled below.
    pub(super) fn pack_item_update(
        &self,
        id: CatalogItemId,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let entry = self.get_entry(&id);
        let oid = entry.oid();
        // Items without their own connection are resolved against the system
        // connection.
        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
        let schema_id = &self
            .get_schema(
                &entry.name().qualifiers.database_spec,
                &entry.name().qualifiers.schema_spec,
                conn_id,
            )
            .id;
        let name = &entry.name().item;
        let owner_id = entry.owner_id();
        // `privileges` borrows from `privileges_row`, so the row must stay
        // alive for as long as the datum is used.
        let privileges_row = self.pack_privilege_array_row(entry.privileges());
        let privileges = privileges_row.unpack_first();
        // Kind-specific updates, dispatched on the item variant.
        let mut updates = match entry.item() {
            CatalogItem::Index(index) => {
                self.pack_index_update(id, oid, name, owner_id, index, diff)
            }
            CatalogItem::Table(table) => {
                let mut updates = self
                    .pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);

                // Tables backed by a data source (rather than direct writes)
                // may additionally appear in a source-specific tables relation.
                if let TableDataSource::DataSource {
                    desc: data_source,
                    timeline: _,
                } = &table.data_source
                {
                    updates.extend(match data_source {
                        DataSourceDesc::IngestionExport {
                            ingestion_id,
                            external_reference: UnresolvedItemName(external_reference),
                            details: _,
                            data_config: _,
                        } => {
                            let ingestion_entry = self
                                .get_entry(ingestion_id)
                                .source_desc()
                                .expect("primary source exists")
                                .expect("primary source is a source");

                            // Dispatch on the connection kind of the primary
                            // source that this table is exported from.
                            match ingestion_entry.connection.name() {
                                "postgres" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                    // The left-most qualification of Postgres
                                    // tables is the database, but this
                                    // information is redundant because each
                                    // Postgres connection connects to only one
                                    // database.
                                    let schema_name = external_reference[1].as_str();
                                    let table_name = external_reference[2].as_str();

                                    self.pack_postgres_source_tables_update(
                                        id,
                                        schema_name,
                                        table_name,
                                        diff,
                                    )
                                }
                                "mysql" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
                                    let schema_name = external_reference[0].as_str();
                                    let table_name = external_reference[1].as_str();

                                    self.pack_mysql_source_tables_update(
                                        id,
                                        schema_name,
                                        table_name,
                                        diff,
                                    )
                                }
                                "sql-server" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                    // The left-most qualification of SQL Server tables is
                                    // the database, but this information is redundant
                                    // because each SQL Server connection connects to
                                    // only one database.
                                    let schema_name = external_reference[1].as_str();
                                    let table_name = external_reference[2].as_str();

                                    self.pack_sql_server_source_table_update(
                                        id,
                                        schema_name,
                                        table_name,
                                        diff,
                                    )
                                }
                                // Load generator sources don't have any special
                                // updates.
                                "load-generator" => vec![],
                                "kafka" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 1);
                                    let topic = external_reference[0].as_str();
                                    let envelope = data_source.envelope();
                                    let (key_format, value_format) = data_source.formats();

                                    self.pack_kafka_source_tables_update(
                                        id,
                                        topic,
                                        envelope,
                                        key_format,
                                        value_format,
                                        diff,
                                    )
                                }
                                s => unreachable!("{s} sources do not have tables"),
                            }
                        }
                        DataSourceDesc::Ingestion { .. }
                        | DataSourceDesc::OldSyntaxIngestion { .. }
                        | DataSourceDesc::Introspection(_)
                        | DataSourceDesc::Progress
                        | DataSourceDesc::Webhook { .. }
                        | DataSourceDesc::Catalog => vec![],
                    });
                }

                updates
            }
            CatalogItem::Source(source) => {
                match &source.data_source {
                    // Primary sources: only Postgres and Kafka connections
                    // have dedicated updates.
                    DataSourceDesc::Ingestion { desc, .. }
                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => match &desc.connection {
                        GenericSourceConnection::Postgres(postgres) => {
                            self.pack_postgres_source_update(id, postgres, diff)
                        }
                        GenericSourceConnection::Kafka(kafka) => {
                            self.pack_kafka_source_update(id, source.global_id(), kafka, diff)
                        }
                        _ => vec![],
                    },
                    // Subsources: mirrors the table handling above, except
                    // that there is no "kafka" arm here.
                    DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference: UnresolvedItemName(external_reference),
                        details: _,
                        data_config: _,
                    } => {
                        let ingestion_entry = self
                            .get_entry(ingestion_id)
                            .source_desc()
                            .expect("primary source exists")
                            .expect("primary source is a source");

                        match ingestion_entry.connection.name() {
                            "postgres" => {
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                // The left-most qualification of Postgres
                                // tables is the database, but this
                                // information is redundant because each
                                // Postgres connection connects to only one
                                // database.
                                let schema_name = external_reference[1].as_str();
                                let table_name = external_reference[2].as_str();

                                self.pack_postgres_source_tables_update(
                                    id,
                                    schema_name,
                                    table_name,
                                    diff,
                                )
                            }
                            "mysql" => {
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
                                let schema_name = external_reference[0].as_str();
                                let table_name = external_reference[1].as_str();

                                self.pack_mysql_source_tables_update(
                                    id,
                                    schema_name,
                                    table_name,
                                    diff,
                                )
                            }
                            "sql-server" => {
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                // The left-most qualification of SQL Server tables is
                                // the database, but this information is redundant
                                // because each SQL Server connection connects to
                                // only one database.
                                let schema_name = external_reference[1].as_str();
                                let table_name = external_reference[2].as_str();

                                self.pack_sql_server_source_table_update(
                                    id,
                                    schema_name,
                                    table_name,
                                    diff,
                                )
                            }
                            // Load generator sources don't have any special
                            // updates.
                            "load-generator" => vec![],
                            s => unreachable!("{s} sources do not have subsources"),
                        }
                    }
                    DataSourceDesc::Webhook { .. } => {
                        vec![self.pack_webhook_source_update(id, diff)]
                    }
                    DataSourceDesc::Introspection(_)
                    | DataSourceDesc::Progress
                    | DataSourceDesc::Catalog => vec![],
                }
            }
            CatalogItem::View(view) => {
                self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
            }
            CatalogItem::MaterializedView(mview) => {
                self.pack_materialized_view_update(id, mview, diff)
            }
            CatalogItem::Sink(sink) => {
                self.pack_sink_update(id, oid, schema_id, name, owner_id, sink, diff)
            }
            CatalogItem::Type(ty) => {
                self.pack_type_update(id, oid, schema_id, name, owner_id, privileges, ty, diff)
            }
            CatalogItem::Func(func) => {
                self.pack_func_update(id, schema_id, name, owner_id, func, diff)
            }
            CatalogItem::Log(_) | CatalogItem::Secret(_) => vec![],
            CatalogItem::Connection(connection) => {
                self.pack_connection_update(id, connection, diff)
            }
        };

        if !entry.item().is_temporary() {
            // Populate or clean up the `mz_object_dependencies` table.
            // TODO(jkosh44) Unclear if this table wants to include all uses or only references.
            for dependee in entry.item().references().items() {
                updates.push(self.pack_depends_update(id, *dependee, diff))
            }
        }

        // Always report the latest version of an object's columns.
        if let Some(desc) = entry.relation_desc_latest() {
            // Column defaults are only available for tables that accept
            // direct writes; every other item reports a NULL default.
            let defaults = match entry.item() {
                CatalogItem::Table(Table {
                    data_source: TableDataSource::TableWrites { defaults },
                    ..
                }) => Some(defaults),
                _ => None,
            };
            for (i, (column_name, column_type)) in desc.iter().enumerate() {
                // `defaults` is indexed by column position, in step with `desc`.
                let default: Option<String> = defaults.map(|d| d[i].to_ast_string_stable());
                let default: Datum = default
                    .as_ref()
                    .map(|d| Datum::String(d))
                    .unwrap_or(Datum::Null);
                let pgtype = mz_pgrepr::Type::from(&column_type.scalar_type);
                // Custom list, map and record types report the name and OID of
                // their catalog entry; everything else uses the pg type.
                let (type_name, type_oid) = match &column_type.scalar_type {
                    SqlScalarType::List {
                        custom_id: Some(custom_id),
                        ..
                    }
                    | SqlScalarType::Map {
                        custom_id: Some(custom_id),
                        ..
                    }
                    | SqlScalarType::Record {
                        custom_id: Some(custom_id),
                        ..
                    } => {
                        let entry = self.get_entry(custom_id);
                        // NOTE(benesch): the `mz_columns.type text` field is
                        // wrong. Types do not have a name that can be
                        // represented as a single textual field. There can be
                        // multiple types with the same name in different
                        // schemas and databases. We should eventually deprecate
                        // the `type` field in favor of a new `type_id` field
                        // that can be joined against `mz_types`.
                        //
                        // For now, in the interest of pragmatism, we just use
                        // the type's item name, and accept that there may be
                        // ambiguity if the same type name is used in multiple
                        // schemas. The ambiguity is mitigated by the OID, which
                        // can be joined against `mz_types.oid` to resolve the
                        // ambiguity.
                        let name = &*entry.name().item;
                        let oid = entry.oid();
                        (name, oid)
                    }
                    _ => (pgtype.name(), pgtype.oid()),
                };
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_COLUMNS,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String(column_name),
                        // Column positions are reported 1-based.
                        Datum::UInt64(u64::cast_from(i + 1)),
                        Datum::from(column_type.nullable),
                        Datum::String(type_name),
                        default,
                        Datum::UInt32(type_oid),
                        Datum::Int32(pgtype.typmod()),
                    ]),
                    diff,
                ));
            }
        }

        // Use initial lcw so that we can tell apart default from non-existent windows.
        if let Some(cw) = entry.item().initial_logical_compaction_window() {
            updates.push(self.pack_history_retention_strategy_update(id, cw, diff));
        }

        updates.extend(Self::pack_item_global_id_update(entry, diff));

        updates
    }
696
697    fn pack_item_global_id_update(
698        entry: &CatalogEntry,
699        diff: Diff,
700    ) -> impl Iterator<Item = BuiltinTableUpdate<&'static BuiltinTable>> + use<'_> {
701        let id = entry.id().to_string();
702        let global_ids = entry.global_ids();
703        global_ids.map(move |global_id| {
704            BuiltinTableUpdate::row(
705                &*MZ_OBJECT_GLOBAL_IDS,
706                Row::pack_slice(&[Datum::String(&id), Datum::String(&global_id.to_string())]),
707                diff,
708            )
709        })
710    }
711
712    fn pack_history_retention_strategy_update(
713        &self,
714        id: CatalogItemId,
715        cw: CompactionWindow,
716        diff: Diff,
717    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
718        let cw: u64 = cw.comparable_timestamp().into();
719        let cw = Jsonb::from_serde_json(serde_json::Value::Number(serde_json::Number::from(cw)))
720            .expect("must serialize");
721        BuiltinTableUpdate::row(
722            &*MZ_HISTORY_RETENTION_STRATEGIES,
723            Row::pack_slice(&[
724                Datum::String(&id.to_string()),
725                // FOR is the only strategy at the moment. We may introduce FROM or others later.
726                Datum::String("FOR"),
727                cw.into_row().into_element(),
728            ]),
729            diff,
730        )
731    }
732
733    fn pack_table_update(
734        &self,
735        id: CatalogItemId,
736        oid: u32,
737        schema_id: &SchemaSpecifier,
738        name: &str,
739        owner_id: &RoleId,
740        privileges: Datum,
741        diff: Diff,
742        table: &Table,
743    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
744        let redacted = table.create_sql.as_ref().map(|create_sql| {
745            mz_sql::parse::parse(create_sql)
746                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
747                .into_element()
748                .ast
749                .to_ast_string_redacted()
750        });
751        let source_id = if let TableDataSource::DataSource {
752            desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
753            ..
754        } = &table.data_source
755        {
756            Some(ingestion_id.to_string())
757        } else {
758            None
759        };
760
761        vec![BuiltinTableUpdate::row(
762            &*MZ_TABLES,
763            Row::pack_slice(&[
764                Datum::String(&id.to_string()),
765                Datum::UInt32(oid),
766                Datum::String(&schema_id.to_string()),
767                Datum::String(name),
768                Datum::String(&owner_id.to_string()),
769                privileges,
770                if let Some(create_sql) = &table.create_sql {
771                    Datum::String(create_sql)
772                } else {
773                    Datum::Null
774                },
775                if let Some(redacted) = &redacted {
776                    Datum::String(redacted)
777                } else {
778                    Datum::Null
779                },
780                if let Some(source_id) = source_id.as_ref() {
781                    Datum::String(source_id)
782                } else {
783                    Datum::Null
784                },
785            ]),
786            diff,
787        )]
788    }
789
790    fn pack_postgres_source_update(
791        &self,
792        id: CatalogItemId,
793        postgres: &PostgresSourceConnection<ReferencedConnection>,
794        diff: Diff,
795    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
796        vec![BuiltinTableUpdate::row(
797            &*MZ_POSTGRES_SOURCES,
798            Row::pack_slice(&[
799                Datum::String(&id.to_string()),
800                Datum::String(&postgres.publication_details.slot),
801                Datum::from(postgres.publication_details.timeline_id),
802            ]),
803            diff,
804        )]
805    }
806
807    fn pack_kafka_source_update(
808        &self,
809        item_id: CatalogItemId,
810        collection_id: GlobalId,
811        kafka: &KafkaSourceConnection<ReferencedConnection>,
812        diff: Diff,
813    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
814        vec![BuiltinTableUpdate::row(
815            &*MZ_KAFKA_SOURCES,
816            Row::pack_slice(&[
817                Datum::String(&item_id.to_string()),
818                Datum::String(&kafka.group_id(&self.config.connection_context, collection_id)),
819                Datum::String(&kafka.topic),
820            ]),
821            diff,
822        )]
823    }
824
825    fn pack_postgres_source_tables_update(
826        &self,
827        id: CatalogItemId,
828        schema_name: &str,
829        table_name: &str,
830        diff: Diff,
831    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
832        vec![BuiltinTableUpdate::row(
833            &*MZ_POSTGRES_SOURCE_TABLES,
834            Row::pack_slice(&[
835                Datum::String(&id.to_string()),
836                Datum::String(schema_name),
837                Datum::String(table_name),
838            ]),
839            diff,
840        )]
841    }
842
843    fn pack_mysql_source_tables_update(
844        &self,
845        id: CatalogItemId,
846        schema_name: &str,
847        table_name: &str,
848        diff: Diff,
849    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
850        vec![BuiltinTableUpdate::row(
851            &*MZ_MYSQL_SOURCE_TABLES,
852            Row::pack_slice(&[
853                Datum::String(&id.to_string()),
854                Datum::String(schema_name),
855                Datum::String(table_name),
856            ]),
857            diff,
858        )]
859    }
860
861    fn pack_sql_server_source_table_update(
862        &self,
863        id: CatalogItemId,
864        schema_name: &str,
865        table_name: &str,
866        diff: Diff,
867    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
868        vec![BuiltinTableUpdate::row(
869            &*MZ_SQL_SERVER_SOURCE_TABLES,
870            Row::pack_slice(&[
871                Datum::String(&id.to_string()),
872                Datum::String(schema_name),
873                Datum::String(table_name),
874            ]),
875            diff,
876        )]
877    }
878
879    fn pack_kafka_source_tables_update(
880        &self,
881        id: CatalogItemId,
882        topic: &str,
883        envelope: Option<&str>,
884        key_format: Option<&str>,
885        value_format: Option<&str>,
886        diff: Diff,
887    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
888        vec![BuiltinTableUpdate::row(
889            &*MZ_KAFKA_SOURCE_TABLES,
890            Row::pack_slice(&[
891                Datum::String(&id.to_string()),
892                Datum::String(topic),
893                Datum::from(envelope),
894                Datum::from(key_format),
895                Datum::from(value_format),
896            ]),
897            diff,
898        )]
899    }
900
901    fn pack_connection_update(
902        &self,
903        id: CatalogItemId,
904        connection: &Connection,
905        diff: Diff,
906    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
907        let mut updates = vec![];
908        match connection.details {
909            ConnectionDetails::Kafka(ref kafka) => {
910                updates.extend(self.pack_kafka_connection_update(id, kafka, diff));
911            }
912            ConnectionDetails::Aws(ref aws_config) => {
913                match self.pack_aws_connection_update(id, aws_config, diff) {
914                    Ok(update) => {
915                        updates.push(update);
916                    }
917                    Err(e) => {
918                        tracing::error!(%id, %e, "failed writing row to mz_aws_connections table");
919                    }
920                }
921            }
922            ConnectionDetails::AwsPrivatelink(_) => {
923                if let Some(aws_principal_context) = self.aws_principal_context.as_ref() {
924                    updates.push(self.pack_aws_privatelink_connection_update(
925                        id,
926                        aws_principal_context,
927                        diff,
928                    ));
929                } else {
930                    tracing::error!(%id, "missing AWS principal context; cannot write row to mz_aws_privatelink_connections table");
931                }
932            }
933            ConnectionDetails::Ssh {
934                ref key_1,
935                ref key_2,
936                ..
937            } => {
938                updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff));
939            }
940            ConnectionDetails::Csr(_)
941            | ConnectionDetails::Postgres(_)
942            | ConnectionDetails::MySql(_)
943            | ConnectionDetails::SqlServer(_)
944            | ConnectionDetails::IcebergCatalog(_) => (),
945        };
946        updates
947    }
948
949    pub(crate) fn pack_ssh_tunnel_connection_update(
950        &self,
951        id: CatalogItemId,
952        key_1: &SshKey,
953        key_2: &SshKey,
954        diff: Diff,
955    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
956        BuiltinTableUpdate::row(
957            &*MZ_SSH_TUNNEL_CONNECTIONS,
958            Row::pack_slice(&[
959                Datum::String(&id.to_string()),
960                Datum::String(key_1.public_key().as_str()),
961                Datum::String(key_2.public_key().as_str()),
962            ]),
963            diff,
964        )
965    }
966
967    fn pack_kafka_connection_update(
968        &self,
969        id: CatalogItemId,
970        kafka: &KafkaConnection<ReferencedConnection>,
971        diff: Diff,
972    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
973        let progress_topic = kafka.progress_topic(&self.config.connection_context, id);
974        let mut row = Row::default();
975        row.packer()
976            .try_push_array(
977                &[ArrayDimension {
978                    lower_bound: 1,
979                    length: kafka.brokers.len(),
980                }],
981                kafka
982                    .brokers
983                    .iter()
984                    .map(|broker| Datum::String(&broker.address)),
985            )
986            .expect("kafka.brokers is 1 dimensional, and its length is used for the array length");
987        let brokers = row.unpack_first();
988        vec![BuiltinTableUpdate::row(
989            &*MZ_KAFKA_CONNECTIONS,
990            Row::pack_slice(&[
991                Datum::String(&id.to_string()),
992                brokers,
993                Datum::String(&progress_topic),
994            ]),
995            diff,
996        )]
997    }
998
999    pub fn pack_aws_privatelink_connection_update(
1000        &self,
1001        connection_id: CatalogItemId,
1002        aws_principal_context: &AwsPrincipalContext,
1003        diff: Diff,
1004    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1005        let id = &MZ_AWS_PRIVATELINK_CONNECTIONS;
1006        let row = Row::pack_slice(&[
1007            Datum::String(&connection_id.to_string()),
1008            Datum::String(&aws_principal_context.to_principal_string(connection_id)),
1009        ]);
1010        BuiltinTableUpdate::row(id, row, diff)
1011    }
1012
1013    pub fn pack_aws_connection_update(
1014        &self,
1015        connection_id: CatalogItemId,
1016        aws_config: &AwsConnection,
1017        diff: Diff,
1018    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, anyhow::Error> {
1019        let id = &MZ_AWS_CONNECTIONS;
1020
1021        let mut access_key_id = None;
1022        let mut access_key_id_secret_id = None;
1023        let mut secret_access_key_secret_id = None;
1024        let mut session_token = None;
1025        let mut session_token_secret_id = None;
1026        let mut assume_role_arn = None;
1027        let mut assume_role_session_name = None;
1028        let mut principal = None;
1029        let mut external_id = None;
1030        let mut example_trust_policy = None;
1031        match &aws_config.auth {
1032            AwsAuth::Credentials(credentials) => {
1033                match &credentials.access_key_id {
1034                    StringOrSecret::String(s) => access_key_id = Some(s.as_str()),
1035                    StringOrSecret::Secret(s) => access_key_id_secret_id = Some(s.to_string()),
1036                }
1037                secret_access_key_secret_id = Some(credentials.secret_access_key.to_string());
1038                match credentials.session_token.as_ref() {
1039                    None => (),
1040                    Some(StringOrSecret::String(s)) => session_token = Some(s.as_str()),
1041                    Some(StringOrSecret::Secret(s)) => {
1042                        session_token_secret_id = Some(s.to_string())
1043                    }
1044                }
1045            }
1046            AwsAuth::AssumeRole(assume_role) => {
1047                assume_role_arn = Some(assume_role.arn.as_str());
1048                assume_role_session_name = assume_role.session_name.as_deref();
1049                principal = self
1050                    .config
1051                    .connection_context
1052                    .aws_connection_role_arn
1053                    .as_deref();
1054                external_id =
1055                    Some(assume_role.external_id(&self.config.connection_context, connection_id)?);
1056                example_trust_policy = {
1057                    let policy = assume_role
1058                        .example_trust_policy(&self.config.connection_context, connection_id)?;
1059                    let policy = Jsonb::from_serde_json(policy).expect("valid json");
1060                    Some(policy.into_row())
1061                };
1062            }
1063        }
1064
1065        let row = Row::pack_slice(&[
1066            Datum::String(&connection_id.to_string()),
1067            Datum::from(aws_config.endpoint.as_deref()),
1068            Datum::from(aws_config.region.as_deref()),
1069            Datum::from(access_key_id),
1070            Datum::from(access_key_id_secret_id.as_deref()),
1071            Datum::from(secret_access_key_secret_id.as_deref()),
1072            Datum::from(session_token),
1073            Datum::from(session_token_secret_id.as_deref()),
1074            Datum::from(assume_role_arn),
1075            Datum::from(assume_role_session_name),
1076            Datum::from(principal),
1077            Datum::from(external_id.as_deref()),
1078            Datum::from(example_trust_policy.as_ref().map(|p| p.into_element())),
1079        ]);
1080
1081        Ok(BuiltinTableUpdate::row(id, row, diff))
1082    }
1083
1084    fn pack_view_update(
1085        &self,
1086        id: CatalogItemId,
1087        oid: u32,
1088        schema_id: &SchemaSpecifier,
1089        name: &str,
1090        owner_id: &RoleId,
1091        privileges: Datum,
1092        view: &View,
1093        diff: Diff,
1094    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1095        let create_stmt = mz_sql::parse::parse(&view.create_sql)
1096            .unwrap_or_else(|e| {
1097                panic!(
1098                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1099                    view.create_sql, e
1100                )
1101            })
1102            .into_element()
1103            .ast;
1104        let query = match &create_stmt {
1105            Statement::CreateView(stmt) => &stmt.definition.query,
1106            _ => unreachable!(),
1107        };
1108
1109        let mut query_string = query.to_ast_string_stable();
1110        // PostgreSQL appends a semicolon in `pg_views.definition`, we
1111        // do the same for compatibility's sake.
1112        query_string.push(';');
1113
1114        vec![BuiltinTableUpdate::row(
1115            &*MZ_VIEWS,
1116            Row::pack_slice(&[
1117                Datum::String(&id.to_string()),
1118                Datum::UInt32(oid),
1119                Datum::String(&schema_id.to_string()),
1120                Datum::String(name),
1121                Datum::String(&query_string),
1122                Datum::String(&owner_id.to_string()),
1123                privileges,
1124                Datum::String(&view.create_sql),
1125                Datum::String(&create_stmt.to_ast_string_redacted()),
1126            ]),
1127            diff,
1128        )]
1129    }
1130
1131    fn pack_materialized_view_update(
1132        &self,
1133        id: CatalogItemId,
1134        mview: &MaterializedView,
1135        diff: Diff,
1136    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1137        let mut updates = Vec::new();
1138
1139        if let Some(refresh_schedule) = &mview.refresh_schedule {
1140            // This can't be `ON COMMIT`, because that is represented by a `None` instead of an
1141            // empty `RefreshSchedule`.
1142            assert!(!refresh_schedule.is_empty());
1143            for RefreshEvery {
1144                interval,
1145                aligned_to,
1146            } in refresh_schedule.everies.iter()
1147            {
1148                let aligned_to_dt = mz_ore::now::to_datetime(
1149                    <&Timestamp as TryInto<u64>>::try_into(aligned_to).expect("undoes planning"),
1150                );
1151                updates.push(BuiltinTableUpdate::row(
1152                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1153                    Row::pack_slice(&[
1154                        Datum::String(&id.to_string()),
1155                        Datum::String("every"),
1156                        Datum::Interval(
1157                            Interval::from_duration(interval).expect(
1158                                "planning ensured that this is convertible back to Interval",
1159                            ),
1160                        ),
1161                        Datum::TimestampTz(aligned_to_dt.try_into().expect("undoes planning")),
1162                        Datum::Null,
1163                    ]),
1164                    diff,
1165                ));
1166            }
1167            for at in refresh_schedule.ats.iter() {
1168                let at_dt = mz_ore::now::to_datetime(
1169                    <&Timestamp as TryInto<u64>>::try_into(at).expect("undoes planning"),
1170                );
1171                updates.push(BuiltinTableUpdate::row(
1172                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1173                    Row::pack_slice(&[
1174                        Datum::String(&id.to_string()),
1175                        Datum::String("at"),
1176                        Datum::Null,
1177                        Datum::Null,
1178                        Datum::TimestampTz(at_dt.try_into().expect("undoes planning")),
1179                    ]),
1180                    diff,
1181                ));
1182            }
1183        } else {
1184            updates.push(BuiltinTableUpdate::row(
1185                &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1186                Row::pack_slice(&[
1187                    Datum::String(&id.to_string()),
1188                    Datum::String("on-commit"),
1189                    Datum::Null,
1190                    Datum::Null,
1191                    Datum::Null,
1192                ]),
1193                diff,
1194            ));
1195        }
1196
1197        if let Some(target_id) = mview.replacement_target {
1198            updates.push(BuiltinTableUpdate::row(
1199                &*MZ_REPLACEMENTS,
1200                Row::pack_slice(&[
1201                    Datum::String(&id.to_string()),
1202                    Datum::String(&target_id.to_string()),
1203                ]),
1204                diff,
1205            ));
1206        }
1207
1208        updates
1209    }
1210
1211    fn pack_sink_update(
1212        &self,
1213        id: CatalogItemId,
1214        oid: u32,
1215        schema_id: &SchemaSpecifier,
1216        name: &str,
1217        owner_id: &RoleId,
1218        sink: &Sink,
1219        diff: Diff,
1220    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1221        let mut updates = vec![];
1222        match &sink.connection {
1223            StorageSinkConnection::Kafka(KafkaSinkConnection {
1224                topic: topic_name, ..
1225            }) => {
1226                updates.push(BuiltinTableUpdate::row(
1227                    &*MZ_KAFKA_SINKS,
1228                    Row::pack_slice(&[
1229                        Datum::String(&id.to_string()),
1230                        Datum::String(topic_name.as_str()),
1231                    ]),
1232                    diff,
1233                ));
1234            }
1235            StorageSinkConnection::Iceberg(IcebergSinkConnection {
1236                namespace, table, ..
1237            }) => {
1238                updates.push(BuiltinTableUpdate::row(
1239                    &*MZ_ICEBERG_SINKS,
1240                    Row::pack_slice(&[
1241                        Datum::String(&id.to_string()),
1242                        Datum::String(namespace.as_str()),
1243                        Datum::String(table.as_str()),
1244                    ]),
1245                    diff,
1246                ));
1247            }
1248        };
1249
1250        let create_stmt = mz_sql::parse::parse(&sink.create_sql)
1251            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", sink.create_sql))
1252            .into_element()
1253            .ast;
1254
1255        let envelope = sink.envelope();
1256
1257        // The combined format string is used for the deprecated `format` column.
1258        let combined_format = sink.combined_format();
1259        let (key_format, value_format) = match sink.formats() {
1260            Some((key_format, value_format)) => (key_format, Some(value_format)),
1261            None => (None, None),
1262        };
1263
1264        updates.push(BuiltinTableUpdate::row(
1265            &*MZ_SINKS,
1266            Row::pack_slice(&[
1267                Datum::String(&id.to_string()),
1268                Datum::UInt32(oid),
1269                Datum::String(&schema_id.to_string()),
1270                Datum::String(name),
1271                Datum::String(sink.connection.name()),
1272                Datum::from(sink.connection_id().map(|id| id.to_string()).as_deref()),
1273                // size column now deprecated w/o linked clusters
1274                Datum::Null,
1275                Datum::from(envelope),
1276                // FIXME: These key/value formats are kinda leaky! Should probably live in
1277                // the kafka sink table.
1278                Datum::from(combined_format.as_ref().map(|f| f.as_ref())),
1279                Datum::from(key_format),
1280                Datum::from(value_format),
1281                Datum::String(&sink.cluster_id.to_string()),
1282                Datum::String(&owner_id.to_string()),
1283                Datum::String(&sink.create_sql),
1284                Datum::String(&create_stmt.to_ast_string_redacted()),
1285            ]),
1286            diff,
1287        ));
1288
1289        updates
1290    }
1291
1292    fn pack_index_update(
1293        &self,
1294        id: CatalogItemId,
1295        oid: u32,
1296        name: &str,
1297        owner_id: &RoleId,
1298        index: &Index,
1299        diff: Diff,
1300    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1301        let mut updates = vec![];
1302
1303        let create_stmt = mz_sql::parse::parse(&index.create_sql)
1304            .unwrap_or_else(|e| {
1305                panic!(
1306                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1307                    index.create_sql, e
1308                )
1309            })
1310            .into_element()
1311            .ast;
1312
1313        let key_sqls = match &create_stmt {
1314            Statement::CreateIndex(CreateIndexStatement { key_parts, .. }) => key_parts
1315                .as_ref()
1316                .expect("key_parts is filled in during planning"),
1317            _ => unreachable!(),
1318        };
1319        let on_item_id = self.get_entry_by_global_id(&index.on).id();
1320
1321        updates.push(BuiltinTableUpdate::row(
1322            &*MZ_INDEXES,
1323            Row::pack_slice(&[
1324                Datum::String(&id.to_string()),
1325                Datum::UInt32(oid),
1326                Datum::String(name),
1327                Datum::String(&on_item_id.to_string()),
1328                Datum::String(&index.cluster_id.to_string()),
1329                Datum::String(&owner_id.to_string()),
1330                Datum::String(&index.create_sql),
1331                Datum::String(&create_stmt.to_ast_string_redacted()),
1332            ]),
1333            diff,
1334        ));
1335
1336        let on_entry = self.get_entry_by_global_id(&index.on);
1337        let on_desc = on_entry
1338            .relation_desc()
1339            .expect("can only create indexes on items with a valid description");
1340        let repr_col_types: Vec<ReprColumnType> = on_desc
1341            .typ()
1342            .column_types
1343            .iter()
1344            .map(ReprColumnType::from)
1345            .collect();
1346        for (i, key) in index.keys.iter().enumerate() {
1347            let nullable = key.typ(&repr_col_types).nullable;
1348            let seq_in_index = u64::cast_from(i + 1);
1349            let key_sql = key_sqls
1350                .get(i)
1351                .expect("missing sql information for index key")
1352                .to_ast_string_simple();
1353            let (field_number, expression) = match key {
1354                MirScalarExpr::Column(col, _) => {
1355                    (Datum::UInt64(u64::cast_from(*col + 1)), Datum::Null)
1356                }
1357                _ => (Datum::Null, Datum::String(&key_sql)),
1358            };
1359            updates.push(BuiltinTableUpdate::row(
1360                &*MZ_INDEX_COLUMNS,
1361                Row::pack_slice(&[
1362                    Datum::String(&id.to_string()),
1363                    Datum::UInt64(seq_in_index),
1364                    field_number,
1365                    expression,
1366                    Datum::from(nullable),
1367                ]),
1368                diff,
1369            ));
1370        }
1371
1372        updates
1373    }
1374
1375    fn pack_type_update(
1376        &self,
1377        id: CatalogItemId,
1378        oid: u32,
1379        schema_id: &SchemaSpecifier,
1380        name: &str,
1381        owner_id: &RoleId,
1382        privileges: Datum,
1383        typ: &Type,
1384        diff: Diff,
1385    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1386        let mut out = vec![];
1387
1388        let redacted = typ.create_sql.as_ref().map(|create_sql| {
1389            mz_sql::parse::parse(create_sql)
1390                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
1391                .into_element()
1392                .ast
1393                .to_ast_string_redacted()
1394        });
1395
1396        out.push(BuiltinTableUpdate::row(
1397            &*MZ_TYPES,
1398            Row::pack_slice(&[
1399                Datum::String(&id.to_string()),
1400                Datum::UInt32(oid),
1401                Datum::String(&schema_id.to_string()),
1402                Datum::String(name),
1403                Datum::String(&TypeCategory::from_catalog_type(&typ.details.typ).to_string()),
1404                Datum::String(&owner_id.to_string()),
1405                privileges,
1406                if let Some(create_sql) = &typ.create_sql {
1407                    Datum::String(create_sql)
1408                } else {
1409                    Datum::Null
1410                },
1411                if let Some(redacted) = &redacted {
1412                    Datum::String(redacted)
1413                } else {
1414                    Datum::Null
1415                },
1416            ]),
1417            diff,
1418        ));
1419
1420        let mut row = Row::default();
1421        let mut packer = row.packer();
1422
1423        fn append_modifier(packer: &mut RowPacker<'_>, mods: &[i64]) {
1424            if mods.is_empty() {
1425                packer.push(Datum::Null);
1426            } else {
1427                packer.push_list(mods.iter().map(|m| Datum::Int64(*m)));
1428            }
1429        }
1430
1431        let index_id = match &typ.details.typ {
1432            CatalogType::Array {
1433                element_reference: element_id,
1434            } => {
1435                packer.push(Datum::String(&id.to_string()));
1436                packer.push(Datum::String(&element_id.to_string()));
1437                &MZ_ARRAY_TYPES
1438            }
1439            CatalogType::List {
1440                element_reference: element_id,
1441                element_modifiers,
1442            } => {
1443                packer.push(Datum::String(&id.to_string()));
1444                packer.push(Datum::String(&element_id.to_string()));
1445                append_modifier(&mut packer, element_modifiers);
1446                &MZ_LIST_TYPES
1447            }
1448            CatalogType::Map {
1449                key_reference: key_id,
1450                value_reference: value_id,
1451                key_modifiers,
1452                value_modifiers,
1453            } => {
1454                packer.push(Datum::String(&id.to_string()));
1455                packer.push(Datum::String(&key_id.to_string()));
1456                packer.push(Datum::String(&value_id.to_string()));
1457                append_modifier(&mut packer, key_modifiers);
1458                append_modifier(&mut packer, value_modifiers);
1459                &MZ_MAP_TYPES
1460            }
1461            CatalogType::Pseudo => {
1462                packer.push(Datum::String(&id.to_string()));
1463                &MZ_PSEUDO_TYPES
1464            }
1465            _ => {
1466                packer.push(Datum::String(&id.to_string()));
1467                &MZ_BASE_TYPES
1468            }
1469        };
1470        out.push(BuiltinTableUpdate::row(index_id, row, diff));
1471
1472        if let Some(pg_metadata) = &typ.details.pg_metadata {
1473            out.push(BuiltinTableUpdate::row(
1474                &*MZ_TYPE_PG_METADATA,
1475                Row::pack_slice(&[
1476                    Datum::String(&id.to_string()),
1477                    Datum::UInt32(pg_metadata.typinput_oid),
1478                    Datum::UInt32(pg_metadata.typreceive_oid),
1479                ]),
1480                diff,
1481            ));
1482        }
1483
1484        out
1485    }
1486
1487    fn pack_func_update(
1488        &self,
1489        id: CatalogItemId,
1490        schema_id: &SchemaSpecifier,
1491        name: &str,
1492        owner_id: &RoleId,
1493        func: &Func,
1494        diff: Diff,
1495    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1496        let mut updates = vec![];
1497        for func_impl_details in func.inner.func_impls() {
1498            let arg_type_ids = func_impl_details
1499                .arg_typs
1500                .iter()
1501                .map(|typ| self.get_system_type(typ).id().to_string())
1502                .collect::<Vec<_>>();
1503
1504            let mut row = Row::default();
1505            row.packer()
1506                .try_push_array(
1507                    &[ArrayDimension {
1508                        lower_bound: 1,
1509                        length: arg_type_ids.len(),
1510                    }],
1511                    arg_type_ids.iter().map(|id| Datum::String(id)),
1512                )
1513                .expect(
1514                    "arg_type_ids is 1 dimensional, and its length is used for the array length",
1515                );
1516            let arg_type_ids = row.unpack_first();
1517
1518            updates.push(BuiltinTableUpdate::row(
1519                &*MZ_FUNCTIONS,
1520                Row::pack_slice(&[
1521                    Datum::String(&id.to_string()),
1522                    Datum::UInt32(func_impl_details.oid),
1523                    Datum::String(&schema_id.to_string()),
1524                    Datum::String(name),
1525                    arg_type_ids,
1526                    Datum::from(
1527                        func_impl_details
1528                            .variadic_typ
1529                            .map(|typ| self.get_system_type(typ).id().to_string())
1530                            .as_deref(),
1531                    ),
1532                    Datum::from(
1533                        func_impl_details
1534                            .return_typ
1535                            .map(|typ| self.get_system_type(typ).id().to_string())
1536                            .as_deref(),
1537                    ),
1538                    func_impl_details.return_is_set.into(),
1539                    Datum::String(&owner_id.to_string()),
1540                ]),
1541                diff,
1542            ));
1543
1544            if let mz_sql::func::Func::Aggregate(_) = func.inner {
1545                updates.push(BuiltinTableUpdate::row(
1546                    &*MZ_AGGREGATES,
1547                    Row::pack_slice(&[
1548                        Datum::UInt32(func_impl_details.oid),
1549                        // TODO(database-issues#1064): Support ordered-set aggregate functions.
1550                        Datum::String("n"),
1551                        Datum::Int16(0),
1552                    ]),
1553                    diff,
1554                ));
1555            }
1556        }
1557        updates
1558    }
1559
1560    pub fn pack_op_update(
1561        &self,
1562        operator: &str,
1563        func_impl_details: FuncImplCatalogDetails,
1564        diff: Diff,
1565    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1566        let arg_type_ids = func_impl_details
1567            .arg_typs
1568            .iter()
1569            .map(|typ| self.get_system_type(typ).id().to_string())
1570            .collect::<Vec<_>>();
1571
1572        let mut row = Row::default();
1573        row.packer()
1574            .try_push_array(
1575                &[ArrayDimension {
1576                    lower_bound: 1,
1577                    length: arg_type_ids.len(),
1578                }],
1579                arg_type_ids.iter().map(|id| Datum::String(id)),
1580            )
1581            .expect("arg_type_ids is 1 dimensional, and its length is used for the array length");
1582        let arg_type_ids = row.unpack_first();
1583
1584        BuiltinTableUpdate::row(
1585            &*MZ_OPERATORS,
1586            Row::pack_slice(&[
1587                Datum::UInt32(func_impl_details.oid),
1588                Datum::String(operator),
1589                arg_type_ids,
1590                Datum::from(
1591                    func_impl_details
1592                        .return_typ
1593                        .map(|typ| self.get_system_type(typ).id().to_string())
1594                        .as_deref(),
1595                ),
1596            ]),
1597            diff,
1598        )
1599    }
1600
1601    pub fn pack_audit_log_update(
1602        &self,
1603        event: &VersionedEvent,
1604        diff: Diff,
1605    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1606        let (event_type, object_type, details, user, occurred_at): (
1607            &EventType,
1608            &ObjectType,
1609            &EventDetails,
1610            &Option<String>,
1611            u64,
1612        ) = match event {
1613            VersionedEvent::V1(ev) => (
1614                &ev.event_type,
1615                &ev.object_type,
1616                &ev.details,
1617                &ev.user,
1618                ev.occurred_at,
1619            ),
1620        };
1621        let details = Jsonb::from_serde_json(details.as_json())
1622            .map_err(|e| {
1623                Error::new(ErrorKind::Unstructured(format!(
1624                    "could not pack audit log update: {}",
1625                    e
1626                )))
1627            })?
1628            .into_row();
1629        let details = details
1630            .iter()
1631            .next()
1632            .expect("details created above with a single jsonb column");
1633        let dt = mz_ore::now::to_datetime(occurred_at);
1634        let id = event.sortable_id();
1635        Ok(BuiltinTableUpdate::row(
1636            &*MZ_AUDIT_EVENTS,
1637            Row::pack_slice(&[
1638                Datum::UInt64(id),
1639                Datum::String(&format!("{}", event_type)),
1640                Datum::String(&format!("{}", object_type)),
1641                details,
1642                match user {
1643                    Some(user) => Datum::String(user),
1644                    None => Datum::Null,
1645                },
1646                Datum::TimestampTz(dt.try_into().expect("must fit")),
1647            ]),
1648            diff,
1649        ))
1650    }
1651
1652    pub fn pack_storage_usage_update(
1653        &self,
1654        VersionedStorageUsage::V1(event): VersionedStorageUsage,
1655        diff: Diff,
1656    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1657        let id = &MZ_STORAGE_USAGE_BY_SHARD;
1658        let row = Row::pack_slice(&[
1659            Datum::UInt64(event.id),
1660            Datum::from(event.shard_id.as_deref()),
1661            Datum::UInt64(event.size_bytes),
1662            Datum::TimestampTz(
1663                mz_ore::now::to_datetime(event.collection_timestamp)
1664                    .try_into()
1665                    .expect("must fit"),
1666            ),
1667        ]);
1668        BuiltinTableUpdate::row(id, row, diff)
1669    }
1670
1671    pub fn pack_egress_ip_update(
1672        &self,
1673        ip: &IpNet,
1674    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1675        let id = &MZ_EGRESS_IPS;
1676        let addr = ip.network();
1677        let row = Row::pack_slice(&[
1678            Datum::String(&addr.to_string()),
1679            Datum::Int32(ip.prefix_len().into()),
1680            Datum::String(&format!("{}/{}", addr, ip.prefix_len())),
1681        ]);
1682        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
1683    }
1684
1685    pub fn pack_license_key_update(
1686        &self,
1687        license_key: &ValidatedLicenseKey,
1688    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1689        let id = &MZ_LICENSE_KEYS;
1690        let row = Row::pack_slice(&[
1691            Datum::String(&license_key.id),
1692            Datum::String(&license_key.organization),
1693            Datum::String(&license_key.environment_id),
1694            Datum::TimestampTz(
1695                mz_ore::now::to_datetime(license_key.expiration * 1000)
1696                    .try_into()
1697                    .expect("must fit"),
1698            ),
1699            Datum::TimestampTz(
1700                mz_ore::now::to_datetime(license_key.not_before * 1000)
1701                    .try_into()
1702                    .expect("must fit"),
1703            ),
1704        ]);
1705        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
1706    }
1707
1708    pub fn pack_all_replica_size_updates(&self) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1709        let mut updates = Vec::new();
1710        for (size, alloc) in &self.cluster_replica_sizes.0 {
1711            if alloc.disabled {
1712                continue;
1713            }
1714
1715            // Just invent something when the limits are `None`, which only happens in non-prod
1716            // environments (tests, process orchestrator, etc.)
1717            let cpu_limit = alloc.cpu_limit.unwrap_or(CpuLimit::MAX);
1718            let MemoryLimit(ByteSize(memory_bytes)) =
1719                (alloc.memory_limit).unwrap_or(MemoryLimit::MAX);
1720            let DiskLimit(ByteSize(disk_bytes)) =
1721                (alloc.disk_limit).unwrap_or(DiskLimit::ARBITRARY);
1722
1723            let row = Row::pack_slice(&[
1724                size.as_str().into(),
1725                u64::cast_from(alloc.scale).into(),
1726                u64::cast_from(alloc.workers).into(),
1727                cpu_limit.as_nanocpus().into(),
1728                memory_bytes.into(),
1729                disk_bytes.into(),
1730                (alloc.credits_per_hour).into(),
1731            ]);
1732
1733            updates.push(BuiltinTableUpdate::row(
1734                &*MZ_CLUSTER_REPLICA_SIZES,
1735                row,
1736                Diff::ONE,
1737            ));
1738        }
1739
1740        updates
1741    }
1742
1743    pub fn pack_subscribe_update(
1744        &self,
1745        id: GlobalId,
1746        subscribe: &ActiveSubscribe,
1747        diff: Diff,
1748    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1749        let mut row = Row::default();
1750        let mut packer = row.packer();
1751        packer.push(Datum::String(&id.to_string()));
1752        packer.push(Datum::Uuid(subscribe.session_uuid));
1753        packer.push(Datum::String(&subscribe.cluster_id.to_string()));
1754
1755        let start_dt = mz_ore::now::to_datetime(subscribe.start_time);
1756        packer.push(Datum::TimestampTz(start_dt.try_into().expect("must fit")));
1757
1758        let depends_on: Vec<_> = subscribe
1759            .depends_on
1760            .iter()
1761            .map(|id| id.to_string())
1762            .collect();
1763        packer.push_list(depends_on.iter().map(|s| Datum::String(s)));
1764
1765        BuiltinTableUpdate::row(&*MZ_SUBSCRIPTIONS, row, diff)
1766    }
1767
1768    pub fn pack_session_update(
1769        &self,
1770        conn: &ConnMeta,
1771        diff: Diff,
1772    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1773        let connect_dt = mz_ore::now::to_datetime(conn.connected_at());
1774        BuiltinTableUpdate::row(
1775            &*MZ_SESSIONS,
1776            Row::pack_slice(&[
1777                Datum::Uuid(conn.uuid()),
1778                Datum::UInt32(conn.conn_id().unhandled()),
1779                Datum::String(&conn.authenticated_role_id().to_string()),
1780                Datum::from(conn.client_ip().map(|ip| ip.to_string()).as_deref()),
1781                Datum::TimestampTz(connect_dt.try_into().expect("must fit")),
1782            ]),
1783            diff,
1784        )
1785    }
1786
1787    pub fn pack_default_privileges_update(
1788        &self,
1789        default_privilege_object: &DefaultPrivilegeObject,
1790        grantee: &RoleId,
1791        acl_mode: &AclMode,
1792        diff: Diff,
1793    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1794        BuiltinTableUpdate::row(
1795            &*MZ_DEFAULT_PRIVILEGES,
1796            Row::pack_slice(&[
1797                default_privilege_object.role_id.to_string().as_str().into(),
1798                default_privilege_object
1799                    .database_id
1800                    .map(|database_id| database_id.to_string())
1801                    .as_deref()
1802                    .into(),
1803                default_privilege_object
1804                    .schema_id
1805                    .map(|schema_id| schema_id.to_string())
1806                    .as_deref()
1807                    .into(),
1808                default_privilege_object
1809                    .object_type
1810                    .to_string()
1811                    .to_lowercase()
1812                    .as_str()
1813                    .into(),
1814                grantee.to_string().as_str().into(),
1815                acl_mode.to_string().as_str().into(),
1816            ]),
1817            diff,
1818        )
1819    }
1820
1821    pub fn pack_system_privileges_update(
1822        &self,
1823        privileges: MzAclItem,
1824        diff: Diff,
1825    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1826        BuiltinTableUpdate::row(
1827            &*MZ_SYSTEM_PRIVILEGES,
1828            Row::pack_slice(&[privileges.into()]),
1829            diff,
1830        )
1831    }
1832
1833    fn pack_privilege_array_row(&self, privileges: &PrivilegeMap) -> Row {
1834        let mut row = Row::default();
1835        let flat_privileges: Vec<_> = privileges.all_values_owned().collect();
1836        row.packer()
1837            .try_push_array(
1838                &[ArrayDimension {
1839                    lower_bound: 1,
1840                    length: flat_privileges.len(),
1841                }],
1842                flat_privileges
1843                    .into_iter()
1844                    .map(|mz_acl_item| Datum::MzAclItem(mz_acl_item.clone())),
1845            )
1846            .expect("privileges is 1 dimensional, and its length is used for the array length");
1847        row
1848    }
1849
1850    pub fn pack_comment_update(
1851        &self,
1852        object_id: CommentObjectId,
1853        column_pos: Option<usize>,
1854        comment: &str,
1855        diff: Diff,
1856    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1857        // Use the audit log representation so it's easier to join against.
1858        let object_type = mz_sql::catalog::ObjectType::from(object_id);
1859        let audit_type = super::object_type_to_audit_object_type(object_type);
1860        let object_type_str = audit_type.to_string();
1861
1862        let object_id_str = match object_id {
1863            CommentObjectId::Table(global_id)
1864            | CommentObjectId::View(global_id)
1865            | CommentObjectId::MaterializedView(global_id)
1866            | CommentObjectId::Source(global_id)
1867            | CommentObjectId::Sink(global_id)
1868            | CommentObjectId::Index(global_id)
1869            | CommentObjectId::Func(global_id)
1870            | CommentObjectId::Connection(global_id)
1871            | CommentObjectId::Secret(global_id)
1872            | CommentObjectId::Type(global_id) => global_id.to_string(),
1873            CommentObjectId::Role(role_id) => role_id.to_string(),
1874            CommentObjectId::Database(database_id) => database_id.to_string(),
1875            CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
1876            CommentObjectId::Cluster(cluster_id) => cluster_id.to_string(),
1877            CommentObjectId::ClusterReplica((_, replica_id)) => replica_id.to_string(),
1878            CommentObjectId::NetworkPolicy(network_policy_id) => network_policy_id.to_string(),
1879        };
1880        let column_pos_datum = match column_pos {
1881            Some(pos) => {
1882                // TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
1883                let pos =
1884                    i32::try_from(pos).expect("we constrain this value in the planning layer");
1885                Datum::Int32(pos)
1886            }
1887            None => Datum::Null,
1888        };
1889
1890        BuiltinTableUpdate::row(
1891            &*MZ_COMMENTS,
1892            Row::pack_slice(&[
1893                Datum::String(&object_id_str),
1894                Datum::String(&object_type_str),
1895                column_pos_datum,
1896                Datum::String(comment),
1897            ]),
1898            diff,
1899        )
1900    }
1901
1902    pub fn pack_webhook_source_update(
1903        &self,
1904        item_id: CatalogItemId,
1905        diff: Diff,
1906    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1907        let url = self
1908            .try_get_webhook_url(&item_id)
1909            .expect("webhook source should exist");
1910        let url = url.to_string();
1911        let name = &self.get_entry(&item_id).name().item;
1912        let id_str = item_id.to_string();
1913
1914        BuiltinTableUpdate::row(
1915            &*MZ_WEBHOOKS_SOURCES,
1916            Row::pack_slice(&[
1917                Datum::String(&id_str),
1918                Datum::String(name),
1919                Datum::String(&url),
1920            ]),
1921            diff,
1922        )
1923    }
1924
1925    pub fn pack_source_references_update(
1926        &self,
1927        source_references: &SourceReferences,
1928        diff: Diff,
1929    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1930        let source_id = source_references.source_id.to_string();
1931        let updated_at = &source_references.updated_at;
1932        source_references
1933            .references
1934            .iter()
1935            .map(|reference| {
1936                let mut row = Row::default();
1937                let mut packer = row.packer();
1938                packer.extend([
1939                    Datum::String(&source_id),
1940                    reference
1941                        .namespace
1942                        .as_ref()
1943                        .map(|s| Datum::String(s))
1944                        .unwrap_or(Datum::Null),
1945                    Datum::String(&reference.name),
1946                    Datum::TimestampTz(
1947                        mz_ore::now::to_datetime(*updated_at)
1948                            .try_into()
1949                            .expect("must fit"),
1950                    ),
1951                ]);
1952                if reference.columns.len() > 0 {
1953                    packer
1954                        .try_push_array(
1955                            &[ArrayDimension {
1956                                lower_bound: 1,
1957                                length: reference.columns.len(),
1958                            }],
1959                            reference.columns.iter().map(|col| Datum::String(col)),
1960                        )
1961                        .expect(
1962                            "columns is 1 dimensional, and its length is used for the array length",
1963                        );
1964                } else {
1965                    packer.push(Datum::Null);
1966                }
1967
1968                BuiltinTableUpdate::row(&*MZ_SOURCE_REFERENCES, row, diff)
1969            })
1970            .collect()
1971    }
1972}