Skip to main content

mz_adapter/catalog/
builtin_table_updates.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10mod notice;
11
12use bytesize::ByteSize;
13use ipnet::IpNet;
14use mz_adapter_types::compaction::CompactionWindow;
15use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
16use mz_catalog::SYSTEM_CONN_ID;
17use mz_catalog::builtin::{
18    BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
19    MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZE_INTERNAL,
20    MZ_CLUSTER_REPLICA_SIZES, MZ_COLUMNS, MZ_COMMENTS, MZ_EGRESS_IPS, MZ_FUNCTIONS,
21    MZ_HISTORY_RETENTION_STRATEGIES, MZ_ICEBERG_SINKS, MZ_INDEX_COLUMNS, MZ_KAFKA_CONNECTIONS,
22    MZ_KAFKA_SINKS, MZ_KAFKA_SOURCE_TABLES, MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES,
23    MZ_MAP_TYPES, MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MYSQL_SOURCE_TABLES,
24    MZ_OBJECT_DEPENDENCIES, MZ_OBJECT_GLOBAL_IDS, MZ_OPERATORS, MZ_POSTGRES_SOURCE_TABLES,
25    MZ_POSTGRES_SOURCES, MZ_PSEUDO_TYPES, MZ_REPLACEMENTS, MZ_ROLE_AUTH, MZ_SESSIONS, MZ_SINKS,
26    MZ_SOURCE_REFERENCES, MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS,
27    MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES,
28    MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
29};
30use mz_catalog::config::AwsPrincipalContext;
31use mz_catalog::durable::SourceReferences;
32use mz_catalog::memory::error::{Error, ErrorKind};
33use mz_catalog::memory::objects::{
34    CatalogEntry, CatalogItem, Connection, DataSourceDesc, Func, Index, MaterializedView, Sink,
35    Table, TableDataSource, Type, View,
36};
37use mz_expr::MirScalarExpr;
38use mz_license_keys::ValidatedLicenseKey;
39use mz_orchestrator::{CpuLimit, DiskLimit, MemoryLimit};
40use mz_ore::cast::CastFrom;
41use mz_ore::collections::CollectionExt;
42use mz_persist_client::batch::ProtoBatch;
43use mz_repr::adt::array::ArrayDimension;
44use mz_repr::adt::interval::Interval;
45use mz_repr::adt::jsonb::Jsonb;
46use mz_repr::adt::mz_acl_item::PrivilegeMap;
47use mz_repr::refresh_schedule::RefreshEvery;
48use mz_repr::role_id::RoleId;
49use mz_repr::{
50    CatalogItemId, Datum, Diff, GlobalId, ReprColumnType, Row, RowPacker, SqlScalarType, Timestamp,
51};
52use mz_sql::ast::{CreateIndexStatement, Statement, UnresolvedItemName};
53use mz_sql::catalog::{CatalogType, TypeCategory};
54use mz_sql::func::FuncImplCatalogDetails;
55use mz_sql::names::{CommentObjectId, SchemaSpecifier};
56use mz_sql::plan::{ConnectionDetails, SshKey};
57use mz_sql_parser::ast::display::AstDisplay;
58use mz_storage_client::client::TableData;
59use mz_storage_types::connections::KafkaConnection;
60use mz_storage_types::connections::aws::{AwsAuth, AwsConnection};
61use mz_storage_types::connections::inline::ReferencedConnection;
62use mz_storage_types::connections::string_or_secret::StringOrSecret;
63use mz_storage_types::sinks::{IcebergSinkConnection, KafkaSinkConnection, StorageSinkConnection};
64use mz_storage_types::sources::{
65    GenericSourceConnection, KafkaSourceConnection, PostgresSourceConnection, SourceConnection,
66};
67use smallvec::smallvec;
68
69// DO NOT add any more imports from `crate` outside of `crate::catalog`.
70use crate::active_compute_sink::ActiveSubscribe;
71use crate::catalog::CatalogState;
72use crate::coord::ConnMeta;
73
74/// An update to a built-in table.
75#[derive(Debug, Clone)]
76pub struct BuiltinTableUpdate<T = CatalogItemId> {
77    /// The reference of the table to update.
78    pub id: T,
79    /// The data to put into the table.
80    pub data: TableData,
81}
82
83impl<T> BuiltinTableUpdate<T> {
84    /// Create a [`BuiltinTableUpdate`] from a [`Row`].
85    pub fn row(id: T, row: Row, diff: Diff) -> BuiltinTableUpdate<T> {
86        BuiltinTableUpdate {
87            id,
88            data: TableData::Rows(vec![(row, diff)]),
89        }
90    }
91
92    pub fn batch(id: T, batch: ProtoBatch) -> BuiltinTableUpdate<T> {
93        BuiltinTableUpdate {
94            id,
95            data: TableData::Batches(smallvec![batch]),
96        }
97    }
98}
99
100impl CatalogState {
101    pub fn resolve_builtin_table_updates(
102        &self,
103        builtin_table_update: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
104    ) -> Vec<BuiltinTableUpdate<CatalogItemId>> {
105        builtin_table_update
106            .into_iter()
107            .map(|builtin_table_update| self.resolve_builtin_table_update(builtin_table_update))
108            .collect()
109    }
110
111    pub fn resolve_builtin_table_update(
112        &self,
113        BuiltinTableUpdate { id, data }: BuiltinTableUpdate<&'static BuiltinTable>,
114    ) -> BuiltinTableUpdate<CatalogItemId> {
115        let id = self.resolve_builtin_table(id);
116        BuiltinTableUpdate { id, data }
117    }
118
119    pub fn pack_depends_update(
120        &self,
121        depender: CatalogItemId,
122        dependee: CatalogItemId,
123        diff: Diff,
124    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
125        let row = Row::pack_slice(&[
126            Datum::String(&depender.to_string()),
127            Datum::String(&dependee.to_string()),
128        ]);
129        BuiltinTableUpdate::row(&*MZ_OBJECT_DEPENDENCIES, row, diff)
130    }
131
132    pub(super) fn pack_role_auth_update(
133        &self,
134        id: RoleId,
135        diff: Diff,
136    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
137        let role_auth = self.get_role_auth(&id);
138        let role = self.get_role(&id);
139        BuiltinTableUpdate::row(
140            &*MZ_ROLE_AUTH,
141            Row::pack_slice(&[
142                Datum::String(&role_auth.role_id.to_string()),
143                Datum::UInt32(role.oid),
144                match &role_auth.password_hash {
145                    Some(hash) => Datum::String(hash),
146                    None => Datum::Null,
147                },
148                Datum::TimestampTz(
149                    mz_ore::now::to_datetime(role_auth.updated_at)
150                        .try_into()
151                        .expect("must fit"),
152                ),
153            ]),
154            diff,
155        )
156    }
157
158    pub(super) fn pack_item_update(
159        &self,
160        id: CatalogItemId,
161        diff: Diff,
162    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
163        let entry = self.get_entry(&id);
164        let oid = entry.oid();
165        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
166        let schema_id = &self
167            .get_schema(
168                &entry.name().qualifiers.database_spec,
169                &entry.name().qualifiers.schema_spec,
170                conn_id,
171            )
172            .id;
173        let name = &entry.name().item;
174        let owner_id = entry.owner_id();
175        let privileges_row = self.pack_privilege_array_row(entry.privileges());
176        let privileges = privileges_row.unpack_first();
177        let mut updates = match entry.item() {
178            CatalogItem::Index(index) => self.pack_index_update(id, index, diff),
179            CatalogItem::Table(table) => {
180                let mut updates = self
181                    .pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);
182
183                if let TableDataSource::DataSource {
184                    desc: data_source,
185                    timeline: _,
186                } = &table.data_source
187                {
188                    updates.extend(match data_source {
189                        DataSourceDesc::IngestionExport {
190                            ingestion_id,
191                            external_reference: UnresolvedItemName(external_reference),
192                            details: _,
193                            data_config: _,
194                        } => {
195                            let ingestion_entry = self
196                                .get_entry(ingestion_id)
197                                .source_desc()
198                                .expect("primary source exists")
199                                .expect("primary source is a source");
200
201                            match ingestion_entry.connection.name() {
202                                "postgres" => {
203                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
204                                    // The left-most qualification of Postgres
205                                    // tables is the database, but this
206                                    // information is redundant because each
207                                    // Postgres connection connects to only one
208                                    // database.
209                                    let schema_name = external_reference[1].as_str();
210                                    let table_name = external_reference[2].as_str();
211
212                                    self.pack_postgres_source_tables_update(
213                                        id,
214                                        schema_name,
215                                        table_name,
216                                        diff,
217                                    )
218                                }
219                                "mysql" => {
220                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
221                                    let schema_name = external_reference[0].as_str();
222                                    let table_name = external_reference[1].as_str();
223
224                                    self.pack_mysql_source_tables_update(
225                                        id,
226                                        schema_name,
227                                        table_name,
228                                        diff,
229                                    )
230                                }
231                                "sql-server" => {
232                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
233                                    // The left-most qualification of SQL Server tables is
234                                    // the database, but this information is redundant
235                                    // because each SQL Server connection connects to
236                                    // only one database.
237                                    let schema_name = external_reference[1].as_str();
238                                    let table_name = external_reference[2].as_str();
239
240                                    self.pack_sql_server_source_table_update(
241                                        id,
242                                        schema_name,
243                                        table_name,
244                                        diff,
245                                    )
246                                }
247                                // Load generator sources don't have any special
248                                // updates.
249                                "load-generator" => vec![],
250                                "kafka" => {
251                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 1);
252                                    let topic = external_reference[0].as_str();
253                                    let envelope = data_source.envelope();
254                                    let (key_format, value_format) = data_source.formats();
255
256                                    self.pack_kafka_source_tables_update(
257                                        id,
258                                        topic,
259                                        envelope,
260                                        key_format,
261                                        value_format,
262                                        diff,
263                                    )
264                                }
265                                s => unreachable!("{s} sources do not have tables"),
266                            }
267                        }
268                        DataSourceDesc::Ingestion { .. }
269                        | DataSourceDesc::OldSyntaxIngestion { .. }
270                        | DataSourceDesc::Introspection(_)
271                        | DataSourceDesc::Progress
272                        | DataSourceDesc::Webhook { .. }
273                        | DataSourceDesc::Catalog => vec![],
274                    });
275                }
276
277                updates
278            }
279            CatalogItem::Source(source) => {
280                match &source.data_source {
281                    DataSourceDesc::Ingestion { desc, .. }
282                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => match &desc.connection {
283                        GenericSourceConnection::Postgres(postgres) => {
284                            self.pack_postgres_source_update(id, postgres, diff)
285                        }
286                        GenericSourceConnection::Kafka(kafka) => {
287                            self.pack_kafka_source_update(id, source.global_id(), kafka, diff)
288                        }
289                        _ => vec![],
290                    },
291                    DataSourceDesc::IngestionExport {
292                        ingestion_id,
293                        external_reference: UnresolvedItemName(external_reference),
294                        details: _,
295                        data_config: _,
296                    } => {
297                        let ingestion_entry = self
298                            .get_entry(ingestion_id)
299                            .source_desc()
300                            .expect("primary source exists")
301                            .expect("primary source is a source");
302
303                        match ingestion_entry.connection.name() {
304                            "postgres" => {
305                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
306                                // The left-most qualification of Postgres
307                                // tables is the database, but this
308                                // information is redundant because each
309                                // Postgres connection connects to only one
310                                // database.
311                                let schema_name = external_reference[1].as_str();
312                                let table_name = external_reference[2].as_str();
313
314                                self.pack_postgres_source_tables_update(
315                                    id,
316                                    schema_name,
317                                    table_name,
318                                    diff,
319                                )
320                            }
321                            "mysql" => {
322                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
323                                let schema_name = external_reference[0].as_str();
324                                let table_name = external_reference[1].as_str();
325
326                                self.pack_mysql_source_tables_update(
327                                    id,
328                                    schema_name,
329                                    table_name,
330                                    diff,
331                                )
332                            }
333                            "sql-server" => {
334                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
335                                // The left-most qualification of SQL Server tables is
336                                // the database, but this information is redundant
337                                // because each SQL Server connection connects to
338                                // only one database.
339                                let schema_name = external_reference[1].as_str();
340                                let table_name = external_reference[2].as_str();
341
342                                self.pack_sql_server_source_table_update(
343                                    id,
344                                    schema_name,
345                                    table_name,
346                                    diff,
347                                )
348                            }
349                            // Load generator sources don't have any special
350                            // updates.
351                            "load-generator" => vec![],
352                            s => unreachable!("{s} sources do not have subsources"),
353                        }
354                    }
355                    DataSourceDesc::Webhook { .. } => {
356                        vec![self.pack_webhook_source_update(id, diff)]
357                    }
358                    DataSourceDesc::Introspection(_)
359                    | DataSourceDesc::Progress
360                    | DataSourceDesc::Catalog => vec![],
361                }
362            }
363            CatalogItem::View(view) => {
364                self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
365            }
366            CatalogItem::MaterializedView(mview) => {
367                self.pack_materialized_view_update(id, mview, diff)
368            }
369            CatalogItem::Sink(sink) => {
370                self.pack_sink_update(id, oid, schema_id, name, owner_id, sink, diff)
371            }
372            CatalogItem::Type(ty) => {
373                self.pack_type_update(id, oid, schema_id, name, owner_id, privileges, ty, diff)
374            }
375            CatalogItem::Func(func) => {
376                self.pack_func_update(id, schema_id, name, owner_id, func, diff)
377            }
378            CatalogItem::Log(_) | CatalogItem::Secret(_) => vec![],
379            CatalogItem::Connection(connection) => {
380                self.pack_connection_update(id, connection, diff)
381            }
382        };
383
384        if !entry.item().is_temporary() {
385            // Populate or clean up the `mz_object_dependencies` table.
386            // TODO(jkosh44) Unclear if this table wants to include all uses or only references.
387            for dependee in entry.item().references().items() {
388                updates.push(self.pack_depends_update(id, *dependee, diff))
389            }
390        }
391
392        // Always report the latest for an objects columns.
393        if let Some(desc) = entry.relation_desc_latest() {
394            let defaults = match entry.item() {
395                CatalogItem::Table(Table {
396                    data_source: TableDataSource::TableWrites { defaults },
397                    ..
398                }) => Some(defaults),
399                _ => None,
400            };
401            for (i, (column_name, column_type)) in desc.iter().enumerate() {
402                let default: Option<String> = defaults.map(|d| d[i].to_ast_string_stable());
403                let default: Datum = default
404                    .as_ref()
405                    .map(|d| Datum::String(d))
406                    .unwrap_or(Datum::Null);
407                let pgtype = mz_pgrepr::Type::from(&column_type.scalar_type);
408                let (type_name, type_oid) = match &column_type.scalar_type {
409                    SqlScalarType::List {
410                        custom_id: Some(custom_id),
411                        ..
412                    }
413                    | SqlScalarType::Map {
414                        custom_id: Some(custom_id),
415                        ..
416                    }
417                    | SqlScalarType::Record {
418                        custom_id: Some(custom_id),
419                        ..
420                    } => {
421                        let entry = self.get_entry(custom_id);
422                        // NOTE(benesch): the `mz_columns.type text` field is
423                        // wrong. Types do not have a name that can be
424                        // represented as a single textual field. There can be
425                        // multiple types with the same name in different
426                        // schemas and databases. We should eventually deprecate
427                        // the `type` field in favor of a new `type_id` field
428                        // that can be joined against `mz_types`.
429                        //
430                        // For now, in the interest of pragmatism, we just use
431                        // the type's item name, and accept that there may be
432                        // ambiguity if the same type name is used in multiple
433                        // schemas. The ambiguity is mitigated by the OID, which
434                        // can be joined against `mz_types.oid` to resolve the
435                        // ambiguity.
436                        let name = &*entry.name().item;
437                        let oid = entry.oid();
438                        (name, oid)
439                    }
440                    _ => (pgtype.name(), pgtype.oid()),
441                };
442                updates.push(BuiltinTableUpdate::row(
443                    &*MZ_COLUMNS,
444                    Row::pack_slice(&[
445                        Datum::String(&id.to_string()),
446                        Datum::String(column_name),
447                        Datum::UInt64(u64::cast_from(i + 1)),
448                        Datum::from(column_type.nullable),
449                        Datum::String(type_name),
450                        default,
451                        Datum::UInt32(type_oid),
452                        Datum::Int32(pgtype.typmod()),
453                    ]),
454                    diff,
455                ));
456            }
457        }
458
459        // Use initial lcw so that we can tell apart default from non-existent windows.
460        if let Some(cw) = entry.item().initial_logical_compaction_window() {
461            updates.push(self.pack_history_retention_strategy_update(id, cw, diff));
462        }
463
464        updates.extend(Self::pack_item_global_id_update(entry, diff));
465
466        updates
467    }
468
469    fn pack_item_global_id_update(
470        entry: &CatalogEntry,
471        diff: Diff,
472    ) -> impl Iterator<Item = BuiltinTableUpdate<&'static BuiltinTable>> + use<'_> {
473        let id = entry.id().to_string();
474        let global_ids = entry.global_ids();
475        global_ids.map(move |global_id| {
476            BuiltinTableUpdate::row(
477                &*MZ_OBJECT_GLOBAL_IDS,
478                Row::pack_slice(&[Datum::String(&id), Datum::String(&global_id.to_string())]),
479                diff,
480            )
481        })
482    }
483
484    fn pack_history_retention_strategy_update(
485        &self,
486        id: CatalogItemId,
487        cw: CompactionWindow,
488        diff: Diff,
489    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
490        let cw: u64 = cw.comparable_timestamp().into();
491        let cw = Jsonb::from_serde_json(serde_json::Value::Number(serde_json::Number::from(cw)))
492            .expect("must serialize");
493        BuiltinTableUpdate::row(
494            &*MZ_HISTORY_RETENTION_STRATEGIES,
495            Row::pack_slice(&[
496                Datum::String(&id.to_string()),
497                // FOR is the only strategy at the moment. We may introduce FROM or others later.
498                Datum::String("FOR"),
499                cw.into_row().into_element(),
500            ]),
501            diff,
502        )
503    }
504
505    fn pack_table_update(
506        &self,
507        id: CatalogItemId,
508        oid: u32,
509        schema_id: &SchemaSpecifier,
510        name: &str,
511        owner_id: &RoleId,
512        privileges: Datum,
513        diff: Diff,
514        table: &Table,
515    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
516        let redacted = table.create_sql.as_ref().map(|create_sql| {
517            mz_sql::parse::parse(create_sql)
518                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
519                .into_element()
520                .ast
521                .to_ast_string_redacted()
522        });
523        let source_id = if let TableDataSource::DataSource {
524            desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
525            ..
526        } = &table.data_source
527        {
528            Some(ingestion_id.to_string())
529        } else {
530            None
531        };
532
533        vec![BuiltinTableUpdate::row(
534            &*MZ_TABLES,
535            Row::pack_slice(&[
536                Datum::String(&id.to_string()),
537                Datum::UInt32(oid),
538                Datum::String(&schema_id.to_string()),
539                Datum::String(name),
540                Datum::String(&owner_id.to_string()),
541                privileges,
542                if let Some(create_sql) = &table.create_sql {
543                    Datum::String(create_sql)
544                } else {
545                    Datum::Null
546                },
547                if let Some(redacted) = &redacted {
548                    Datum::String(redacted)
549                } else {
550                    Datum::Null
551                },
552                if let Some(source_id) = source_id.as_ref() {
553                    Datum::String(source_id)
554                } else {
555                    Datum::Null
556                },
557            ]),
558            diff,
559        )]
560    }
561
562    fn pack_postgres_source_update(
563        &self,
564        id: CatalogItemId,
565        postgres: &PostgresSourceConnection<ReferencedConnection>,
566        diff: Diff,
567    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
568        vec![BuiltinTableUpdate::row(
569            &*MZ_POSTGRES_SOURCES,
570            Row::pack_slice(&[
571                Datum::String(&id.to_string()),
572                Datum::String(&postgres.publication_details.slot),
573                Datum::from(postgres.publication_details.timeline_id),
574            ]),
575            diff,
576        )]
577    }
578
579    fn pack_kafka_source_update(
580        &self,
581        item_id: CatalogItemId,
582        collection_id: GlobalId,
583        kafka: &KafkaSourceConnection<ReferencedConnection>,
584        diff: Diff,
585    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
586        vec![BuiltinTableUpdate::row(
587            &*MZ_KAFKA_SOURCES,
588            Row::pack_slice(&[
589                Datum::String(&item_id.to_string()),
590                Datum::String(&kafka.group_id(&self.config.connection_context, collection_id)),
591                Datum::String(&kafka.topic),
592            ]),
593            diff,
594        )]
595    }
596
597    fn pack_postgres_source_tables_update(
598        &self,
599        id: CatalogItemId,
600        schema_name: &str,
601        table_name: &str,
602        diff: Diff,
603    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
604        vec![BuiltinTableUpdate::row(
605            &*MZ_POSTGRES_SOURCE_TABLES,
606            Row::pack_slice(&[
607                Datum::String(&id.to_string()),
608                Datum::String(schema_name),
609                Datum::String(table_name),
610            ]),
611            diff,
612        )]
613    }
614
615    fn pack_mysql_source_tables_update(
616        &self,
617        id: CatalogItemId,
618        schema_name: &str,
619        table_name: &str,
620        diff: Diff,
621    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
622        vec![BuiltinTableUpdate::row(
623            &*MZ_MYSQL_SOURCE_TABLES,
624            Row::pack_slice(&[
625                Datum::String(&id.to_string()),
626                Datum::String(schema_name),
627                Datum::String(table_name),
628            ]),
629            diff,
630        )]
631    }
632
633    fn pack_sql_server_source_table_update(
634        &self,
635        id: CatalogItemId,
636        schema_name: &str,
637        table_name: &str,
638        diff: Diff,
639    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
640        vec![BuiltinTableUpdate::row(
641            &*MZ_SQL_SERVER_SOURCE_TABLES,
642            Row::pack_slice(&[
643                Datum::String(&id.to_string()),
644                Datum::String(schema_name),
645                Datum::String(table_name),
646            ]),
647            diff,
648        )]
649    }
650
651    fn pack_kafka_source_tables_update(
652        &self,
653        id: CatalogItemId,
654        topic: &str,
655        envelope: Option<&str>,
656        key_format: Option<&str>,
657        value_format: Option<&str>,
658        diff: Diff,
659    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
660        vec![BuiltinTableUpdate::row(
661            &*MZ_KAFKA_SOURCE_TABLES,
662            Row::pack_slice(&[
663                Datum::String(&id.to_string()),
664                Datum::String(topic),
665                Datum::from(envelope),
666                Datum::from(key_format),
667                Datum::from(value_format),
668            ]),
669            diff,
670        )]
671    }
672
673    fn pack_connection_update(
674        &self,
675        id: CatalogItemId,
676        connection: &Connection,
677        diff: Diff,
678    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
679        let mut updates = vec![];
680        match connection.details {
681            ConnectionDetails::Kafka(ref kafka) => {
682                updates.extend(self.pack_kafka_connection_update(id, kafka, diff));
683            }
684            ConnectionDetails::Aws(ref aws_config) => {
685                match self.pack_aws_connection_update(id, aws_config, diff) {
686                    Ok(update) => {
687                        updates.push(update);
688                    }
689                    Err(e) => {
690                        tracing::error!(%id, %e, "failed writing row to mz_aws_connections table");
691                    }
692                }
693            }
694            ConnectionDetails::AwsPrivatelink(_) => {
695                if let Some(aws_principal_context) = self.aws_principal_context.as_ref() {
696                    updates.push(self.pack_aws_privatelink_connection_update(
697                        id,
698                        aws_principal_context,
699                        diff,
700                    ));
701                } else {
702                    tracing::error!(%id, "missing AWS principal context; cannot write row to mz_aws_privatelink_connections table");
703                }
704            }
705            ConnectionDetails::Ssh {
706                ref key_1,
707                ref key_2,
708                ..
709            } => {
710                updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff));
711            }
712            ConnectionDetails::Csr(_)
713            | ConnectionDetails::GlueSchemaRegistry(_)
714            | ConnectionDetails::Gcp(_)
715            | ConnectionDetails::Postgres(_)
716            | ConnectionDetails::MySql(_)
717            | ConnectionDetails::SqlServer(_)
718            | ConnectionDetails::IcebergCatalog(_) => (),
719        };
720        updates
721    }
722
723    pub(crate) fn pack_ssh_tunnel_connection_update(
724        &self,
725        id: CatalogItemId,
726        key_1: &SshKey,
727        key_2: &SshKey,
728        diff: Diff,
729    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
730        BuiltinTableUpdate::row(
731            &*MZ_SSH_TUNNEL_CONNECTIONS,
732            Row::pack_slice(&[
733                Datum::String(&id.to_string()),
734                Datum::String(key_1.public_key().as_str()),
735                Datum::String(key_2.public_key().as_str()),
736            ]),
737            diff,
738        )
739    }
740
741    fn pack_kafka_connection_update(
742        &self,
743        id: CatalogItemId,
744        kafka: &KafkaConnection<ReferencedConnection>,
745        diff: Diff,
746    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
747        let progress_topic = kafka.progress_topic(&self.config.connection_context, id);
748        let mut row = Row::default();
749        row.packer()
750            .try_push_array(
751                &[ArrayDimension {
752                    lower_bound: 1,
753                    length: kafka.brokers.len(),
754                }],
755                kafka
756                    .brokers
757                    .iter()
758                    .map(|broker| Datum::String(&broker.address)),
759            )
760            .expect("kafka.brokers is 1 dimensional, and its length is used for the array length");
761        let brokers = row.unpack_first();
762        vec![BuiltinTableUpdate::row(
763            &*MZ_KAFKA_CONNECTIONS,
764            Row::pack_slice(&[
765                Datum::String(&id.to_string()),
766                brokers,
767                Datum::String(&progress_topic),
768            ]),
769            diff,
770        )]
771    }
772
773    pub fn pack_aws_privatelink_connection_update(
774        &self,
775        connection_id: CatalogItemId,
776        aws_principal_context: &AwsPrincipalContext,
777        diff: Diff,
778    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
779        let id = &MZ_AWS_PRIVATELINK_CONNECTIONS;
780        let row = Row::pack_slice(&[
781            Datum::String(&connection_id.to_string()),
782            Datum::String(&aws_principal_context.to_principal_string(connection_id)),
783        ]);
784        BuiltinTableUpdate::row(id, row, diff)
785    }
786
787    pub fn pack_aws_connection_update(
788        &self,
789        connection_id: CatalogItemId,
790        aws_config: &AwsConnection,
791        diff: Diff,
792    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, anyhow::Error> {
793        let id = &MZ_AWS_CONNECTIONS;
794
795        let mut access_key_id = None;
796        let mut access_key_id_secret_id = None;
797        let mut secret_access_key_secret_id = None;
798        let mut session_token = None;
799        let mut session_token_secret_id = None;
800        let mut assume_role_arn = None;
801        let mut assume_role_session_name = None;
802        let mut principal = None;
803        let mut external_id = None;
804        let mut example_trust_policy = None;
805        match &aws_config.auth {
806            AwsAuth::Credentials(credentials) => {
807                match &credentials.access_key_id {
808                    StringOrSecret::String(s) => access_key_id = Some(s.as_str()),
809                    StringOrSecret::Secret(s) => access_key_id_secret_id = Some(s.to_string()),
810                }
811                secret_access_key_secret_id = Some(credentials.secret_access_key.to_string());
812                match credentials.session_token.as_ref() {
813                    None => (),
814                    Some(StringOrSecret::String(s)) => session_token = Some(s.as_str()),
815                    Some(StringOrSecret::Secret(s)) => {
816                        session_token_secret_id = Some(s.to_string())
817                    }
818                }
819            }
820            AwsAuth::AssumeRole(assume_role) => {
821                assume_role_arn = Some(assume_role.arn.as_str());
822                assume_role_session_name = assume_role.session_name.as_deref();
823                principal = self
824                    .config
825                    .connection_context
826                    .aws_connection_role_arn
827                    .as_deref();
828                external_id =
829                    Some(assume_role.external_id(&self.config.connection_context, connection_id)?);
830                example_trust_policy = {
831                    let policy = assume_role
832                        .example_trust_policy(&self.config.connection_context, connection_id)?;
833                    let policy = Jsonb::from_serde_json(policy).expect("valid json");
834                    Some(policy.into_row())
835                };
836            }
837        }
838
839        let row = Row::pack_slice(&[
840            Datum::String(&connection_id.to_string()),
841            Datum::from(aws_config.endpoint.as_deref()),
842            Datum::from(aws_config.region.as_deref()),
843            Datum::from(access_key_id),
844            Datum::from(access_key_id_secret_id.as_deref()),
845            Datum::from(secret_access_key_secret_id.as_deref()),
846            Datum::from(session_token),
847            Datum::from(session_token_secret_id.as_deref()),
848            Datum::from(assume_role_arn),
849            Datum::from(assume_role_session_name),
850            Datum::from(principal),
851            Datum::from(external_id.as_deref()),
852            Datum::from(example_trust_policy.as_ref().map(|p| p.into_element())),
853        ]);
854
855        Ok(BuiltinTableUpdate::row(id, row, diff))
856    }
857
858    fn pack_view_update(
859        &self,
860        id: CatalogItemId,
861        oid: u32,
862        schema_id: &SchemaSpecifier,
863        name: &str,
864        owner_id: &RoleId,
865        privileges: Datum,
866        view: &View,
867        diff: Diff,
868    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
869        let create_stmt = mz_sql::parse::parse(&view.create_sql)
870            .unwrap_or_else(|e| {
871                panic!(
872                    "create_sql cannot be invalid: `{}` --- error: `{}`",
873                    view.create_sql, e
874                )
875            })
876            .into_element()
877            .ast;
878        let query = match &create_stmt {
879            Statement::CreateView(stmt) => &stmt.definition.query,
880            _ => unreachable!(),
881        };
882
883        let mut query_string = query.to_ast_string_stable();
884        // PostgreSQL appends a semicolon in `pg_views.definition`, we
885        // do the same for compatibility's sake.
886        query_string.push(';');
887
888        vec![BuiltinTableUpdate::row(
889            &*MZ_VIEWS,
890            Row::pack_slice(&[
891                Datum::String(&id.to_string()),
892                Datum::UInt32(oid),
893                Datum::String(&schema_id.to_string()),
894                Datum::String(name),
895                Datum::String(&query_string),
896                Datum::String(&owner_id.to_string()),
897                privileges,
898                Datum::String(&view.create_sql),
899                Datum::String(&create_stmt.to_ast_string_redacted()),
900            ]),
901            diff,
902        )]
903    }
904
905    fn pack_materialized_view_update(
906        &self,
907        id: CatalogItemId,
908        mview: &MaterializedView,
909        diff: Diff,
910    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
911        let mut updates = Vec::new();
912
913        if let Some(refresh_schedule) = &mview.refresh_schedule {
914            // This can't be `ON COMMIT`, because that is represented by a `None` instead of an
915            // empty `RefreshSchedule`.
916            assert!(!refresh_schedule.is_empty());
917            for RefreshEvery {
918                interval,
919                aligned_to,
920            } in refresh_schedule.everies.iter()
921            {
922                let aligned_to_dt = mz_ore::now::to_datetime(
923                    <&Timestamp as TryInto<u64>>::try_into(aligned_to).expect("undoes planning"),
924                );
925                updates.push(BuiltinTableUpdate::row(
926                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
927                    Row::pack_slice(&[
928                        Datum::String(&id.to_string()),
929                        Datum::String("every"),
930                        Datum::Interval(
931                            Interval::from_duration(interval).expect(
932                                "planning ensured that this is convertible back to Interval",
933                            ),
934                        ),
935                        Datum::TimestampTz(aligned_to_dt.try_into().expect("undoes planning")),
936                        Datum::Null,
937                    ]),
938                    diff,
939                ));
940            }
941            for at in refresh_schedule.ats.iter() {
942                let at_dt = mz_ore::now::to_datetime(
943                    <&Timestamp as TryInto<u64>>::try_into(at).expect("undoes planning"),
944                );
945                updates.push(BuiltinTableUpdate::row(
946                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
947                    Row::pack_slice(&[
948                        Datum::String(&id.to_string()),
949                        Datum::String("at"),
950                        Datum::Null,
951                        Datum::Null,
952                        Datum::TimestampTz(at_dt.try_into().expect("undoes planning")),
953                    ]),
954                    diff,
955                ));
956            }
957        } else {
958            updates.push(BuiltinTableUpdate::row(
959                &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
960                Row::pack_slice(&[
961                    Datum::String(&id.to_string()),
962                    Datum::String("on-commit"),
963                    Datum::Null,
964                    Datum::Null,
965                    Datum::Null,
966                ]),
967                diff,
968            ));
969        }
970
971        if let Some(target_id) = mview.replacement_target {
972            updates.push(BuiltinTableUpdate::row(
973                &*MZ_REPLACEMENTS,
974                Row::pack_slice(&[
975                    Datum::String(&id.to_string()),
976                    Datum::String(&target_id.to_string()),
977                ]),
978                diff,
979            ));
980        }
981
982        updates
983    }
984
985    fn pack_sink_update(
986        &self,
987        id: CatalogItemId,
988        oid: u32,
989        schema_id: &SchemaSpecifier,
990        name: &str,
991        owner_id: &RoleId,
992        sink: &Sink,
993        diff: Diff,
994    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
995        let mut updates = vec![];
996        match &sink.connection {
997            StorageSinkConnection::Kafka(KafkaSinkConnection {
998                topic: topic_name, ..
999            }) => {
1000                updates.push(BuiltinTableUpdate::row(
1001                    &*MZ_KAFKA_SINKS,
1002                    Row::pack_slice(&[
1003                        Datum::String(&id.to_string()),
1004                        Datum::String(topic_name.as_str()),
1005                    ]),
1006                    diff,
1007                ));
1008            }
1009            StorageSinkConnection::Iceberg(IcebergSinkConnection {
1010                namespace, table, ..
1011            }) => {
1012                updates.push(BuiltinTableUpdate::row(
1013                    &*MZ_ICEBERG_SINKS,
1014                    Row::pack_slice(&[
1015                        Datum::String(&id.to_string()),
1016                        Datum::String(namespace.as_str()),
1017                        Datum::String(table.as_str()),
1018                    ]),
1019                    diff,
1020                ));
1021            }
1022        };
1023
1024        let create_stmt = mz_sql::parse::parse(&sink.create_sql)
1025            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", sink.create_sql))
1026            .into_element()
1027            .ast;
1028
1029        let envelope = sink.envelope();
1030
1031        // The combined format string is used for the deprecated `format` column.
1032        let combined_format = sink.combined_format();
1033        let (key_format, value_format) = match sink.formats() {
1034            Some((key_format, value_format)) => (key_format, Some(value_format)),
1035            None => (None, None),
1036        };
1037
1038        updates.push(BuiltinTableUpdate::row(
1039            &*MZ_SINKS,
1040            Row::pack_slice(&[
1041                Datum::String(&id.to_string()),
1042                Datum::UInt32(oid),
1043                Datum::String(&schema_id.to_string()),
1044                Datum::String(name),
1045                Datum::String(sink.connection.name()),
1046                Datum::from(sink.connection_id().map(|id| id.to_string()).as_deref()),
1047                // size column now deprecated w/o linked clusters
1048                Datum::Null,
1049                Datum::from(envelope),
1050                // FIXME: These key/value formats are kinda leaky! Should probably live in
1051                // the kafka sink table.
1052                Datum::from(combined_format.as_ref().map(|f| f.as_ref())),
1053                Datum::from(key_format),
1054                Datum::from(value_format),
1055                Datum::String(&sink.cluster_id.to_string()),
1056                Datum::String(&owner_id.to_string()),
1057                Datum::String(&sink.create_sql),
1058                Datum::String(&create_stmt.to_ast_string_redacted()),
1059            ]),
1060            diff,
1061        ));
1062
1063        updates
1064    }
1065
1066    fn pack_index_update(
1067        &self,
1068        id: CatalogItemId,
1069        index: &Index,
1070        diff: Diff,
1071    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1072        let mut updates = vec![];
1073
1074        let create_stmt = mz_sql::parse::parse(&index.create_sql)
1075            .unwrap_or_else(|e| {
1076                panic!(
1077                    "create_sql cannot be invalid: `{}` --- error: `{}`",
1078                    index.create_sql, e
1079                )
1080            })
1081            .into_element()
1082            .ast;
1083
1084        let key_sqls = match &create_stmt {
1085            Statement::CreateIndex(CreateIndexStatement { key_parts, .. }) => key_parts
1086                .as_ref()
1087                .expect("key_parts is filled in during planning"),
1088            _ => unreachable!(),
1089        };
1090
1091        let on_entry = self.get_entry_by_global_id(&index.on);
1092        let on_desc = on_entry
1093            .relation_desc()
1094            .expect("can only create indexes on items with a valid description");
1095        let repr_col_types: Vec<ReprColumnType> = on_desc
1096            .typ()
1097            .column_types
1098            .iter()
1099            .map(ReprColumnType::from)
1100            .collect();
1101        for (i, key) in index.keys.iter().enumerate() {
1102            let nullable = key.typ(&repr_col_types).nullable;
1103            let seq_in_index = u64::cast_from(i + 1);
1104            let key_sql = key_sqls
1105                .get(i)
1106                .expect("missing sql information for index key")
1107                .to_ast_string_simple();
1108            let (field_number, expression) = match key {
1109                MirScalarExpr::Column(col, _) => {
1110                    (Datum::UInt64(u64::cast_from(*col + 1)), Datum::Null)
1111                }
1112                _ => (Datum::Null, Datum::String(&key_sql)),
1113            };
1114            updates.push(BuiltinTableUpdate::row(
1115                &*MZ_INDEX_COLUMNS,
1116                Row::pack_slice(&[
1117                    Datum::String(&id.to_string()),
1118                    Datum::UInt64(seq_in_index),
1119                    field_number,
1120                    expression,
1121                    Datum::from(nullable),
1122                ]),
1123                diff,
1124            ));
1125        }
1126
1127        updates
1128    }
1129
1130    fn pack_type_update(
1131        &self,
1132        id: CatalogItemId,
1133        oid: u32,
1134        schema_id: &SchemaSpecifier,
1135        name: &str,
1136        owner_id: &RoleId,
1137        privileges: Datum,
1138        typ: &Type,
1139        diff: Diff,
1140    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1141        let mut out = vec![];
1142
1143        let redacted = typ.create_sql.as_ref().map(|create_sql| {
1144            mz_sql::parse::parse(create_sql)
1145                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
1146                .into_element()
1147                .ast
1148                .to_ast_string_redacted()
1149        });
1150
1151        out.push(BuiltinTableUpdate::row(
1152            &*MZ_TYPES,
1153            Row::pack_slice(&[
1154                Datum::String(&id.to_string()),
1155                Datum::UInt32(oid),
1156                Datum::String(&schema_id.to_string()),
1157                Datum::String(name),
1158                Datum::String(&TypeCategory::from_catalog_type(&typ.details.typ).to_string()),
1159                Datum::String(&owner_id.to_string()),
1160                privileges,
1161                if let Some(create_sql) = &typ.create_sql {
1162                    Datum::String(create_sql)
1163                } else {
1164                    Datum::Null
1165                },
1166                if let Some(redacted) = &redacted {
1167                    Datum::String(redacted)
1168                } else {
1169                    Datum::Null
1170                },
1171            ]),
1172            diff,
1173        ));
1174
1175        let mut row = Row::default();
1176        let mut packer = row.packer();
1177
1178        fn append_modifier(packer: &mut RowPacker<'_>, mods: &[i64]) {
1179            if mods.is_empty() {
1180                packer.push(Datum::Null);
1181            } else {
1182                packer.push_list(mods.iter().map(|m| Datum::Int64(*m)));
1183            }
1184        }
1185
1186        let index_id = match &typ.details.typ {
1187            CatalogType::Array {
1188                element_reference: element_id,
1189            } => {
1190                packer.push(Datum::String(&id.to_string()));
1191                packer.push(Datum::String(&element_id.to_string()));
1192                &MZ_ARRAY_TYPES
1193            }
1194            CatalogType::List {
1195                element_reference: element_id,
1196                element_modifiers,
1197            } => {
1198                packer.push(Datum::String(&id.to_string()));
1199                packer.push(Datum::String(&element_id.to_string()));
1200                append_modifier(&mut packer, element_modifiers);
1201                &MZ_LIST_TYPES
1202            }
1203            CatalogType::Map {
1204                key_reference: key_id,
1205                value_reference: value_id,
1206                key_modifiers,
1207                value_modifiers,
1208            } => {
1209                packer.push(Datum::String(&id.to_string()));
1210                packer.push(Datum::String(&key_id.to_string()));
1211                packer.push(Datum::String(&value_id.to_string()));
1212                append_modifier(&mut packer, key_modifiers);
1213                append_modifier(&mut packer, value_modifiers);
1214                &MZ_MAP_TYPES
1215            }
1216            CatalogType::Pseudo => {
1217                packer.push(Datum::String(&id.to_string()));
1218                &MZ_PSEUDO_TYPES
1219            }
1220            _ => {
1221                packer.push(Datum::String(&id.to_string()));
1222                &MZ_BASE_TYPES
1223            }
1224        };
1225        out.push(BuiltinTableUpdate::row(index_id, row, diff));
1226
1227        if let Some(pg_metadata) = &typ.details.pg_metadata {
1228            out.push(BuiltinTableUpdate::row(
1229                &*MZ_TYPE_PG_METADATA,
1230                Row::pack_slice(&[
1231                    Datum::String(&id.to_string()),
1232                    Datum::UInt32(pg_metadata.typinput_oid),
1233                    Datum::UInt32(pg_metadata.typreceive_oid),
1234                ]),
1235                diff,
1236            ));
1237        }
1238
1239        out
1240    }
1241
1242    fn pack_func_update(
1243        &self,
1244        id: CatalogItemId,
1245        schema_id: &SchemaSpecifier,
1246        name: &str,
1247        owner_id: &RoleId,
1248        func: &Func,
1249        diff: Diff,
1250    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1251        let mut updates = vec![];
1252        for func_impl_details in func.inner.func_impls() {
1253            let arg_type_ids = func_impl_details
1254                .arg_typs
1255                .iter()
1256                .map(|typ| self.get_system_type(typ).id().to_string())
1257                .collect::<Vec<_>>();
1258
1259            let mut row = Row::default();
1260            row.packer()
1261                .try_push_array(
1262                    &[ArrayDimension {
1263                        lower_bound: 1,
1264                        length: arg_type_ids.len(),
1265                    }],
1266                    arg_type_ids.iter().map(|id| Datum::String(id)),
1267                )
1268                .expect(
1269                    "arg_type_ids is 1 dimensional, and its length is used for the array length",
1270                );
1271            let arg_type_ids = row.unpack_first();
1272
1273            updates.push(BuiltinTableUpdate::row(
1274                &*MZ_FUNCTIONS,
1275                Row::pack_slice(&[
1276                    Datum::String(&id.to_string()),
1277                    Datum::UInt32(func_impl_details.oid),
1278                    Datum::String(&schema_id.to_string()),
1279                    Datum::String(name),
1280                    arg_type_ids,
1281                    Datum::from(
1282                        func_impl_details
1283                            .variadic_typ
1284                            .map(|typ| self.get_system_type(typ).id().to_string())
1285                            .as_deref(),
1286                    ),
1287                    Datum::from(
1288                        func_impl_details
1289                            .return_typ
1290                            .map(|typ| self.get_system_type(typ).id().to_string())
1291                            .as_deref(),
1292                    ),
1293                    func_impl_details.return_is_set.into(),
1294                    Datum::String(&owner_id.to_string()),
1295                ]),
1296                diff,
1297            ));
1298
1299            if let mz_sql::func::Func::Aggregate(_) = func.inner {
1300                updates.push(BuiltinTableUpdate::row(
1301                    &*MZ_AGGREGATES,
1302                    Row::pack_slice(&[
1303                        Datum::UInt32(func_impl_details.oid),
1304                        // TODO(database-issues#1064): Support ordered-set aggregate functions.
1305                        Datum::String("n"),
1306                        Datum::Int16(0),
1307                    ]),
1308                    diff,
1309                ));
1310            }
1311        }
1312        updates
1313    }
1314
1315    pub fn pack_op_update(
1316        &self,
1317        operator: &str,
1318        func_impl_details: FuncImplCatalogDetails,
1319        diff: Diff,
1320    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1321        let arg_type_ids = func_impl_details
1322            .arg_typs
1323            .iter()
1324            .map(|typ| self.get_system_type(typ).id().to_string())
1325            .collect::<Vec<_>>();
1326
1327        let mut row = Row::default();
1328        row.packer()
1329            .try_push_array(
1330                &[ArrayDimension {
1331                    lower_bound: 1,
1332                    length: arg_type_ids.len(),
1333                }],
1334                arg_type_ids.iter().map(|id| Datum::String(id)),
1335            )
1336            .expect("arg_type_ids is 1 dimensional, and its length is used for the array length");
1337        let arg_type_ids = row.unpack_first();
1338
1339        BuiltinTableUpdate::row(
1340            &*MZ_OPERATORS,
1341            Row::pack_slice(&[
1342                Datum::UInt32(func_impl_details.oid),
1343                Datum::String(operator),
1344                arg_type_ids,
1345                Datum::from(
1346                    func_impl_details
1347                        .return_typ
1348                        .map(|typ| self.get_system_type(typ).id().to_string())
1349                        .as_deref(),
1350                ),
1351            ]),
1352            diff,
1353        )
1354    }
1355
1356    pub fn pack_audit_log_update(
1357        &self,
1358        event: &VersionedEvent,
1359        diff: Diff,
1360    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1361        let (event_type, object_type, details, user, occurred_at): (
1362            &EventType,
1363            &ObjectType,
1364            &EventDetails,
1365            &Option<String>,
1366            u64,
1367        ) = match event {
1368            VersionedEvent::V1(ev) => (
1369                &ev.event_type,
1370                &ev.object_type,
1371                &ev.details,
1372                &ev.user,
1373                ev.occurred_at,
1374            ),
1375        };
1376        let details = Jsonb::from_serde_json(details.as_json())
1377            .map_err(|e| {
1378                Error::new(ErrorKind::Unstructured(format!(
1379                    "could not pack audit log update: {}",
1380                    e
1381                )))
1382            })?
1383            .into_row();
1384        let details = details
1385            .iter()
1386            .next()
1387            .expect("details created above with a single jsonb column");
1388        let dt = mz_ore::now::to_datetime(occurred_at);
1389        let id = event.sortable_id();
1390        Ok(BuiltinTableUpdate::row(
1391            &*MZ_AUDIT_EVENTS,
1392            Row::pack_slice(&[
1393                Datum::UInt64(id),
1394                Datum::String(&format!("{}", event_type)),
1395                Datum::String(&format!("{}", object_type)),
1396                details,
1397                match user {
1398                    Some(user) => Datum::String(user),
1399                    None => Datum::Null,
1400                },
1401                Datum::TimestampTz(dt.try_into().expect("must fit")),
1402            ]),
1403            diff,
1404        ))
1405    }
1406
1407    pub fn pack_storage_usage_update(
1408        &self,
1409        VersionedStorageUsage::V1(event): VersionedStorageUsage,
1410        diff: Diff,
1411    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1412        let id = &MZ_STORAGE_USAGE_BY_SHARD;
1413        let row = Row::pack_slice(&[
1414            Datum::UInt64(event.id),
1415            Datum::from(event.shard_id.as_deref()),
1416            Datum::UInt64(event.size_bytes),
1417            Datum::TimestampTz(
1418                mz_ore::now::to_datetime(event.collection_timestamp)
1419                    .try_into()
1420                    .expect("must fit"),
1421            ),
1422        ]);
1423        BuiltinTableUpdate::row(id, row, diff)
1424    }
1425
1426    pub fn pack_egress_ip_update(
1427        &self,
1428        ip: &IpNet,
1429    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1430        let id = &MZ_EGRESS_IPS;
1431        let addr = ip.network();
1432        let row = Row::pack_slice(&[
1433            Datum::String(&addr.to_string()),
1434            Datum::Int32(ip.prefix_len().into()),
1435            Datum::String(&format!("{}/{}", addr, ip.prefix_len())),
1436        ]);
1437        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
1438    }
1439
1440    pub fn pack_license_key_update(
1441        &self,
1442        license_key: &ValidatedLicenseKey,
1443    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1444        let id = &MZ_LICENSE_KEYS;
1445        let row = Row::pack_slice(&[
1446            Datum::String(&license_key.id),
1447            Datum::String(&license_key.organization),
1448            Datum::String(&license_key.environment_id),
1449            Datum::TimestampTz(
1450                mz_ore::now::to_datetime(license_key.expiration * 1000)
1451                    .try_into()
1452                    .expect("must fit"),
1453            ),
1454            Datum::TimestampTz(
1455                mz_ore::now::to_datetime(license_key.not_before * 1000)
1456                    .try_into()
1457                    .expect("must fit"),
1458            ),
1459        ]);
1460        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
1461    }
1462
1463    pub fn pack_all_replica_size_updates(&self) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1464        let mut updates = Vec::new();
1465        for (size, alloc) in &self.cluster_replica_sizes.0 {
1466            // Just invent something when the limits are `None`, which only happens in non-prod
1467            // environments (tests, process orchestrator, etc.)
1468            let DiskLimit(ByteSize(disk_bytes)) =
1469                (alloc.disk_limit).unwrap_or(DiskLimit::ARBITRARY);
1470
1471            // The disk column of mz_clusters / mz_cluster_replicas MVs needs
1472            // `swap_enabled` and `disk_bytes`; expose them through a parallel
1473            // internal table. Unlike the public sizes table below, we write
1474            // here unconditionally — `cluster_replica_size_has_disk` previously
1475            // indexed the in-memory map without checking `disabled`, so a
1476            // managed cluster pinned to a disabled size still resolved its
1477            // `disk` column from the real allocation. Writing disabled rows
1478            // here preserves that behavior.
1479            let internal_row = Row::pack_slice(&[
1480                size.as_str().into(),
1481                Datum::from(alloc.swap_enabled),
1482                disk_bytes.into(),
1483            ]);
1484            updates.push(BuiltinTableUpdate::row(
1485                &*MZ_CLUSTER_REPLICA_SIZE_INTERNAL,
1486                internal_row,
1487                Diff::ONE,
1488            ));
1489
1490            if alloc.disabled {
1491                continue;
1492            }
1493
1494            let cpu_limit = alloc.cpu_limit.unwrap_or(CpuLimit::MAX);
1495            let MemoryLimit(ByteSize(memory_bytes)) =
1496                (alloc.memory_limit).unwrap_or(MemoryLimit::MAX);
1497
1498            let row = Row::pack_slice(&[
1499                size.as_str().into(),
1500                u64::cast_from(alloc.scale).into(),
1501                u64::cast_from(alloc.workers).into(),
1502                cpu_limit.as_nanocpus().into(),
1503                memory_bytes.into(),
1504                disk_bytes.into(),
1505                (alloc.credits_per_hour).into(),
1506            ]);
1507
1508            updates.push(BuiltinTableUpdate::row(
1509                &*MZ_CLUSTER_REPLICA_SIZES,
1510                row,
1511                Diff::ONE,
1512            ));
1513        }
1514
1515        updates
1516    }
1517
1518    pub fn pack_subscribe_update(
1519        &self,
1520        id: GlobalId,
1521        subscribe: &ActiveSubscribe,
1522        diff: Diff,
1523    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1524        let mut row = Row::default();
1525        let mut packer = row.packer();
1526        packer.push(Datum::String(&id.to_string()));
1527        packer.push(Datum::Uuid(subscribe.session_uuid));
1528        packer.push(Datum::String(&subscribe.cluster_id.to_string()));
1529
1530        let start_dt = mz_ore::now::to_datetime(subscribe.start_time);
1531        packer.push(Datum::TimestampTz(start_dt.try_into().expect("must fit")));
1532
1533        let depends_on: Vec<_> = subscribe
1534            .depends_on
1535            .iter()
1536            .map(|id| id.to_string())
1537            .collect();
1538        packer.push_list(depends_on.iter().map(|s| Datum::String(s)));
1539
1540        BuiltinTableUpdate::row(&*MZ_SUBSCRIPTIONS, row, diff)
1541    }
1542
1543    pub fn pack_session_update(
1544        &self,
1545        conn: &ConnMeta,
1546        diff: Diff,
1547    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1548        let connect_dt = mz_ore::now::to_datetime(conn.connected_at());
1549        BuiltinTableUpdate::row(
1550            &*MZ_SESSIONS,
1551            Row::pack_slice(&[
1552                Datum::Uuid(conn.uuid()),
1553                Datum::UInt32(conn.conn_id().unhandled()),
1554                Datum::String(&conn.authenticated_role_id().to_string()),
1555                Datum::from(conn.client_ip().map(|ip| ip.to_string()).as_deref()),
1556                Datum::TimestampTz(connect_dt.try_into().expect("must fit")),
1557            ]),
1558            diff,
1559        )
1560    }
1561
1562    fn pack_privilege_array_row(&self, privileges: &PrivilegeMap) -> Row {
1563        let mut row = Row::default();
1564        let flat_privileges: Vec<_> = privileges.all_values_owned().collect();
1565        row.packer()
1566            .try_push_array(
1567                &[ArrayDimension {
1568                    lower_bound: 1,
1569                    length: flat_privileges.len(),
1570                }],
1571                flat_privileges
1572                    .into_iter()
1573                    .map(|mz_acl_item| Datum::MzAclItem(mz_acl_item.clone())),
1574            )
1575            .expect("privileges is 1 dimensional, and its length is used for the array length");
1576        row
1577    }
1578
1579    pub fn pack_comment_update(
1580        &self,
1581        object_id: CommentObjectId,
1582        column_pos: Option<usize>,
1583        comment: &str,
1584        diff: Diff,
1585    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1586        // Use the audit log representation so it's easier to join against.
1587        let object_type = mz_sql::catalog::ObjectType::from(object_id);
1588        let audit_type = super::object_type_to_audit_object_type(object_type);
1589        let object_type_str = audit_type.to_string();
1590
1591        let object_id_str = match object_id {
1592            CommentObjectId::Table(global_id)
1593            | CommentObjectId::View(global_id)
1594            | CommentObjectId::MaterializedView(global_id)
1595            | CommentObjectId::Source(global_id)
1596            | CommentObjectId::Sink(global_id)
1597            | CommentObjectId::Index(global_id)
1598            | CommentObjectId::Func(global_id)
1599            | CommentObjectId::Connection(global_id)
1600            | CommentObjectId::Secret(global_id)
1601            | CommentObjectId::Type(global_id) => global_id.to_string(),
1602            CommentObjectId::Role(role_id) => role_id.to_string(),
1603            CommentObjectId::Database(database_id) => database_id.to_string(),
1604            CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
1605            CommentObjectId::Cluster(cluster_id) => cluster_id.to_string(),
1606            CommentObjectId::ClusterReplica((_, replica_id)) => replica_id.to_string(),
1607            CommentObjectId::NetworkPolicy(network_policy_id) => network_policy_id.to_string(),
1608        };
1609        let column_pos_datum = match column_pos {
1610            Some(pos) => {
1611                // TODO(parkmycar): https://github.com/MaterializeInc/database-issues/issues/6711.
1612                let pos =
1613                    i32::try_from(pos).expect("we constrain this value in the planning layer");
1614                Datum::Int32(pos)
1615            }
1616            None => Datum::Null,
1617        };
1618
1619        BuiltinTableUpdate::row(
1620            &*MZ_COMMENTS,
1621            Row::pack_slice(&[
1622                Datum::String(&object_id_str),
1623                Datum::String(&object_type_str),
1624                column_pos_datum,
1625                Datum::String(comment),
1626            ]),
1627            diff,
1628        )
1629    }
1630
1631    pub fn pack_webhook_source_update(
1632        &self,
1633        item_id: CatalogItemId,
1634        diff: Diff,
1635    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1636        let url = self
1637            .try_get_webhook_url(&item_id)
1638            .expect("webhook source should exist");
1639        let url = url.to_string();
1640        let name = &self.get_entry(&item_id).name().item;
1641        let id_str = item_id.to_string();
1642
1643        BuiltinTableUpdate::row(
1644            &*MZ_WEBHOOKS_SOURCES,
1645            Row::pack_slice(&[
1646                Datum::String(&id_str),
1647                Datum::String(name),
1648                Datum::String(&url),
1649            ]),
1650            diff,
1651        )
1652    }
1653
1654    pub fn pack_source_references_update(
1655        &self,
1656        source_references: &SourceReferences,
1657        diff: Diff,
1658    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1659        let source_id = source_references.source_id.to_string();
1660        let updated_at = &source_references.updated_at;
1661        source_references
1662            .references
1663            .iter()
1664            .map(|reference| {
1665                let mut row = Row::default();
1666                let mut packer = row.packer();
1667                packer.extend([
1668                    Datum::String(&source_id),
1669                    reference
1670                        .namespace
1671                        .as_ref()
1672                        .map(|s| Datum::String(s))
1673                        .unwrap_or(Datum::Null),
1674                    Datum::String(&reference.name),
1675                    Datum::TimestampTz(
1676                        mz_ore::now::to_datetime(*updated_at)
1677                            .try_into()
1678                            .expect("must fit"),
1679                    ),
1680                ]);
1681                if reference.columns.len() > 0 {
1682                    packer
1683                        .try_push_array(
1684                            &[ArrayDimension {
1685                                lower_bound: 1,
1686                                length: reference.columns.len(),
1687                            }],
1688                            reference.columns.iter().map(|col| Datum::String(col)),
1689                        )
1690                        .expect(
1691                            "columns is 1 dimensional, and its length is used for the array length",
1692                        );
1693                } else {
1694                    packer.push(Datum::Null);
1695                }
1696
1697                BuiltinTableUpdate::row(&*MZ_SOURCE_REFERENCES, row, diff)
1698            })
1699            .collect()
1700    }
1701}