1mod notice;
11
12use bytesize::ByteSize;
13use ipnet::IpNet;
14use mz_adapter_types::compaction::CompactionWindow;
15use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
16use mz_catalog::SYSTEM_CONN_ID;
17use mz_catalog::builtin::{
18 BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
19 MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICAS,
20 MZ_CLUSTER_SCHEDULES, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS, MZ_DEFAULT_PRIVILEGES,
21 MZ_EGRESS_IPS, MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES, MZ_ICEBERG_SINKS,
22 MZ_INDEX_COLUMNS, MZ_INDEXES, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCE_TABLES,
23 MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES,
24 MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MYSQL_SOURCE_TABLES, MZ_OBJECT_DEPENDENCIES,
25 MZ_OBJECT_GLOBAL_IDS, MZ_OPERATORS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES,
26 MZ_PSEUDO_TYPES, MZ_REPLACEMENTS, MZ_ROLE_AUTH, MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SESSIONS,
27 MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS,
28 MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES,
29 MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
30};
31use mz_catalog::config::AwsPrincipalContext;
32use mz_catalog::durable::SourceReferences;
33use mz_catalog::memory::error::{Error, ErrorKind};
34use mz_catalog::memory::objects::{
35 CatalogEntry, CatalogItem, ClusterVariant, Connection, DataSourceDesc, Func, Index,
36 MaterializedView, Sink, Table, TableDataSource, Type, View,
37};
38use mz_controller::clusters::{
39 ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
40};
41use mz_controller_types::ClusterId;
42use mz_expr::MirScalarExpr;
43use mz_license_keys::ValidatedLicenseKey;
44use mz_orchestrator::{CpuLimit, DiskLimit, MemoryLimit};
45use mz_ore::cast::CastFrom;
46use mz_ore::collections::CollectionExt;
47use mz_persist_client::batch::ProtoBatch;
48use mz_repr::adt::array::ArrayDimension;
49use mz_repr::adt::interval::Interval;
50use mz_repr::adt::jsonb::Jsonb;
51use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
52use mz_repr::refresh_schedule::RefreshEvery;
53use mz_repr::role_id::RoleId;
54use mz_repr::{
55 CatalogItemId, Datum, Diff, GlobalId, ReprColumnType, Row, RowPacker, SqlScalarType, Timestamp,
56};
57use mz_sql::ast::{CreateIndexStatement, Statement, UnresolvedItemName};
58use mz_sql::catalog::{CatalogCluster, CatalogType, DefaultPrivilegeObject, TypeCategory};
59use mz_sql::func::FuncImplCatalogDetails;
60use mz_sql::names::{CommentObjectId, SchemaSpecifier};
61use mz_sql::plan::{ClusterSchedule, ConnectionDetails, SshKey};
62use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
63use mz_sql::session::vars::SessionVars;
64use mz_sql_parser::ast::display::AstDisplay;
65use mz_storage_client::client::TableData;
66use mz_storage_types::connections::KafkaConnection;
67use mz_storage_types::connections::aws::{AwsAuth, AwsConnection};
68use mz_storage_types::connections::inline::ReferencedConnection;
69use mz_storage_types::connections::string_or_secret::StringOrSecret;
70use mz_storage_types::sinks::{IcebergSinkConnection, KafkaSinkConnection, StorageSinkConnection};
71use mz_storage_types::sources::{
72 GenericSourceConnection, KafkaSourceConnection, PostgresSourceConnection, SourceConnection,
73};
74use smallvec::smallvec;
75
76use crate::active_compute_sink::ActiveSubscribe;
78use crate::catalog::CatalogState;
79use crate::coord::ConnMeta;
80
81#[derive(Debug, Clone)]
83pub struct BuiltinTableUpdate<T = CatalogItemId> {
84 pub id: T,
86 pub data: TableData,
88}
89
90impl<T> BuiltinTableUpdate<T> {
91 pub fn row(id: T, row: Row, diff: Diff) -> BuiltinTableUpdate<T> {
93 BuiltinTableUpdate {
94 id,
95 data: TableData::Rows(vec![(row, diff)]),
96 }
97 }
98
99 pub fn batch(id: T, batch: ProtoBatch) -> BuiltinTableUpdate<T> {
100 BuiltinTableUpdate {
101 id,
102 data: TableData::Batches(smallvec![batch]),
103 }
104 }
105}
106
107impl CatalogState {
108 pub fn resolve_builtin_table_updates(
109 &self,
110 builtin_table_update: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
111 ) -> Vec<BuiltinTableUpdate<CatalogItemId>> {
112 builtin_table_update
113 .into_iter()
114 .map(|builtin_table_update| self.resolve_builtin_table_update(builtin_table_update))
115 .collect()
116 }
117
118 pub fn resolve_builtin_table_update(
119 &self,
120 BuiltinTableUpdate { id, data }: BuiltinTableUpdate<&'static BuiltinTable>,
121 ) -> BuiltinTableUpdate<CatalogItemId> {
122 let id = self.resolve_builtin_table(id);
123 BuiltinTableUpdate { id, data }
124 }
125
126 pub fn pack_depends_update(
127 &self,
128 depender: CatalogItemId,
129 dependee: CatalogItemId,
130 diff: Diff,
131 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
132 let row = Row::pack_slice(&[
133 Datum::String(&depender.to_string()),
134 Datum::String(&dependee.to_string()),
135 ]);
136 BuiltinTableUpdate::row(&*MZ_OBJECT_DEPENDENCIES, row, diff)
137 }
138
139 pub(super) fn pack_role_auth_update(
140 &self,
141 id: RoleId,
142 diff: Diff,
143 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
144 let role_auth = self.get_role_auth(&id);
145 let role = self.get_role(&id);
146 BuiltinTableUpdate::row(
147 &*MZ_ROLE_AUTH,
148 Row::pack_slice(&[
149 Datum::String(&role_auth.role_id.to_string()),
150 Datum::UInt32(role.oid),
151 match &role_auth.password_hash {
152 Some(hash) => Datum::String(hash),
153 None => Datum::Null,
154 },
155 Datum::TimestampTz(
156 mz_ore::now::to_datetime(role_auth.updated_at)
157 .try_into()
158 .expect("must fit"),
159 ),
160 ]),
161 diff,
162 )
163 }
164
165 pub(super) fn pack_role_update(
166 &self,
167 id: RoleId,
168 diff: Diff,
169 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
170 match id {
171 RoleId::Public => vec![],
173 id => {
174 let role = self.get_role(&id);
175 let builtin_supers = [MZ_SYSTEM_ROLE_ID, MZ_SUPPORT_ROLE_ID];
176
177 let rolcanlogin = if let Some(login) = role.attributes.login {
178 login
179 } else {
180 builtin_supers.contains(&role.id)
181 };
182
183 let rolsuper = if let Some(superuser) = role.attributes.superuser {
184 Datum::from(superuser)
185 } else if builtin_supers.contains(&role.id) {
186 Datum::from(true)
188 } else {
189 Datum::Null
198 };
199
200 let role_update = BuiltinTableUpdate::row(
201 &*MZ_ROLES,
202 Row::pack_slice(&[
203 Datum::String(&role.id.to_string()),
204 Datum::UInt32(role.oid),
205 Datum::String(&role.name),
206 Datum::from(role.attributes.inherit),
207 Datum::from(rolcanlogin),
208 rolsuper,
209 ]),
210 diff,
211 );
212 let mut updates = vec![role_update];
213
214 let session_vars_reference = SessionVars::new_unchecked(
217 &mz_build_info::DUMMY_BUILD_INFO,
218 SYSTEM_USER.clone(),
219 None,
220 );
221
222 for (name, val) in role.vars() {
223 let result = session_vars_reference
224 .inspect(name)
225 .and_then(|var| var.check(val.borrow()));
226 let Ok(formatted_val) = result else {
227 tracing::error!(?name, ?val, ?result, "found invalid role default var");
230 continue;
231 };
232
233 let role_var_update = BuiltinTableUpdate::row(
234 &*MZ_ROLE_PARAMETERS,
235 Row::pack_slice(&[
236 Datum::String(&role.id.to_string()),
237 Datum::String(name),
238 Datum::String(&formatted_val),
239 ]),
240 diff,
241 );
242 updates.push(role_var_update);
243 }
244
245 updates
246 }
247 }
248 }
249
250 pub(super) fn pack_cluster_update(
251 &self,
252 name: &str,
253 diff: Diff,
254 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
255 let id = self.clusters_by_name[name];
256 let cluster = &self.clusters_by_id[&id];
257 let row = self.pack_privilege_array_row(cluster.privileges());
258 let privileges = row.unpack_first();
259 let (size, disk, replication_factor, azs, introspection_debugging, introspection_interval) =
260 match &cluster.config.variant {
261 ClusterVariant::Managed(config) => (
262 Some(config.size.as_str()),
263 Some(self.cluster_replica_size_has_disk(&config.size)),
264 Some(config.replication_factor),
265 if config.availability_zones.is_empty() {
266 None
267 } else {
268 Some(config.availability_zones.clone())
269 },
270 Some(config.logging.log_logging),
271 config.logging.interval.map(|d| {
272 Interval::from_duration(&d)
273 .expect("planning ensured this convertible back to interval")
274 }),
275 ),
276 ClusterVariant::Unmanaged => (None, None, None, None, None, None),
277 };
278
279 let mut row = Row::default();
280 let mut packer = row.packer();
281 packer.extend([
282 Datum::String(&id.to_string()),
283 Datum::String(name),
284 Datum::String(&cluster.owner_id.to_string()),
285 privileges,
286 cluster.is_managed().into(),
287 size.into(),
288 replication_factor.into(),
289 disk.into(),
290 ]);
291 if let Some(azs) = azs {
292 packer.push_list(azs.iter().map(|az| Datum::String(az)));
293 } else {
294 packer.push(Datum::Null);
295 }
296 packer.push(Datum::from(introspection_debugging));
297 packer.push(Datum::from(introspection_interval));
298
299 let mut updates = Vec::new();
300
301 updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTERS, row, diff));
302
303 if let ClusterVariant::Managed(managed_config) = &cluster.config.variant {
304 let row = match managed_config.schedule {
305 ClusterSchedule::Manual => Row::pack_slice(&[
306 Datum::String(&id.to_string()),
307 Datum::String("manual"),
308 Datum::Null,
309 ]),
310 ClusterSchedule::Refresh {
311 hydration_time_estimate,
312 } => Row::pack_slice(&[
313 Datum::String(&id.to_string()),
314 Datum::String("on-refresh"),
315 Datum::Interval(
316 Interval::from_duration(&hydration_time_estimate)
317 .expect("planning ensured that this is convertible back to Interval"),
318 ),
319 ]),
320 };
321 updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTER_SCHEDULES, row, diff));
322 }
323
324 updates
325 }
326
327 pub(super) fn pack_cluster_replica_update(
328 &self,
329 cluster_id: ClusterId,
330 name: &str,
331 diff: Diff,
332 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
333 let cluster = &self.clusters_by_id[&cluster_id];
334 let id = cluster.replica_id(name).expect("Must exist");
335 let replica = cluster.replica(id).expect("Must exist");
336
337 let (size, disk, az) = match &replica.config.location {
338 ReplicaLocation::Managed(ManagedReplicaLocation {
341 size,
342 availability_zones: ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
343 allocation: _,
344 billed_as: _,
345 internal: _,
346 pending: _,
347 }) => (
348 Some(&**size),
349 Some(self.cluster_replica_size_has_disk(size)),
350 Some(az.as_str()),
351 ),
352 ReplicaLocation::Managed(ManagedReplicaLocation {
353 size,
354 availability_zones: _,
355 allocation: _,
356 billed_as: _,
357 internal: _,
358 pending: _,
359 }) => (
360 Some(&**size),
361 Some(self.cluster_replica_size_has_disk(size)),
362 None,
363 ),
364 ReplicaLocation::Unmanaged(_) => (None, None, None),
365 };
366
367 let cluster_replica_update = BuiltinTableUpdate::row(
368 &*MZ_CLUSTER_REPLICAS,
369 Row::pack_slice(&[
370 Datum::String(&id.to_string()),
371 Datum::String(name),
372 Datum::String(&cluster_id.to_string()),
373 Datum::from(size),
374 Datum::from(az),
375 Datum::String(&replica.owner_id.to_string()),
376 Datum::from(disk),
377 ]),
378 diff,
379 );
380
381 vec![cluster_replica_update]
382 }
383
384 pub(super) fn pack_item_update(
385 &self,
386 id: CatalogItemId,
387 diff: Diff,
388 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
389 let entry = self.get_entry(&id);
390 let oid = entry.oid();
391 let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
392 let schema_id = &self
393 .get_schema(
394 &entry.name().qualifiers.database_spec,
395 &entry.name().qualifiers.schema_spec,
396 conn_id,
397 )
398 .id;
399 let name = &entry.name().item;
400 let owner_id = entry.owner_id();
401 let privileges_row = self.pack_privilege_array_row(entry.privileges());
402 let privileges = privileges_row.unpack_first();
403 let mut updates = match entry.item() {
404 CatalogItem::Index(index) => {
405 self.pack_index_update(id, oid, name, owner_id, index, diff)
406 }
407 CatalogItem::Table(table) => {
408 let mut updates = self
409 .pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);
410
411 if let TableDataSource::DataSource {
412 desc: data_source,
413 timeline: _,
414 } = &table.data_source
415 {
416 updates.extend(match data_source {
417 DataSourceDesc::IngestionExport {
418 ingestion_id,
419 external_reference: UnresolvedItemName(external_reference),
420 details: _,
421 data_config: _,
422 } => {
423 let ingestion_entry = self
424 .get_entry(ingestion_id)
425 .source_desc()
426 .expect("primary source exists")
427 .expect("primary source is a source");
428
429 match ingestion_entry.connection.name() {
430 "postgres" => {
431 mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
432 let schema_name = external_reference[1].as_str();
438 let table_name = external_reference[2].as_str();
439
440 self.pack_postgres_source_tables_update(
441 id,
442 schema_name,
443 table_name,
444 diff,
445 )
446 }
447 "mysql" => {
448 mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
449 let schema_name = external_reference[0].as_str();
450 let table_name = external_reference[1].as_str();
451
452 self.pack_mysql_source_tables_update(
453 id,
454 schema_name,
455 table_name,
456 diff,
457 )
458 }
459 "sql-server" => {
460 mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
461 let schema_name = external_reference[1].as_str();
466 let table_name = external_reference[2].as_str();
467
468 self.pack_sql_server_source_table_update(
469 id,
470 schema_name,
471 table_name,
472 diff,
473 )
474 }
475 "load-generator" => vec![],
478 "kafka" => {
479 mz_ore::soft_assert_eq_no_log!(external_reference.len(), 1);
480 let topic = external_reference[0].as_str();
481 let envelope = data_source.envelope();
482 let (key_format, value_format) = data_source.formats();
483
484 self.pack_kafka_source_tables_update(
485 id,
486 topic,
487 envelope,
488 key_format,
489 value_format,
490 diff,
491 )
492 }
493 s => unreachable!("{s} sources do not have tables"),
494 }
495 }
496 DataSourceDesc::Ingestion { .. }
497 | DataSourceDesc::OldSyntaxIngestion { .. }
498 | DataSourceDesc::Introspection(_)
499 | DataSourceDesc::Progress
500 | DataSourceDesc::Webhook { .. }
501 | DataSourceDesc::Catalog => vec![],
502 });
503 }
504
505 updates
506 }
507 CatalogItem::Source(source) => {
508 match &source.data_source {
509 DataSourceDesc::Ingestion { desc, .. }
510 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => match &desc.connection {
511 GenericSourceConnection::Postgres(postgres) => {
512 self.pack_postgres_source_update(id, postgres, diff)
513 }
514 GenericSourceConnection::Kafka(kafka) => {
515 self.pack_kafka_source_update(id, source.global_id(), kafka, diff)
516 }
517 _ => vec![],
518 },
519 DataSourceDesc::IngestionExport {
520 ingestion_id,
521 external_reference: UnresolvedItemName(external_reference),
522 details: _,
523 data_config: _,
524 } => {
525 let ingestion_entry = self
526 .get_entry(ingestion_id)
527 .source_desc()
528 .expect("primary source exists")
529 .expect("primary source is a source");
530
531 match ingestion_entry.connection.name() {
532 "postgres" => {
533 mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
534 let schema_name = external_reference[1].as_str();
540 let table_name = external_reference[2].as_str();
541
542 self.pack_postgres_source_tables_update(
543 id,
544 schema_name,
545 table_name,
546 diff,
547 )
548 }
549 "mysql" => {
550 mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
551 let schema_name = external_reference[0].as_str();
552 let table_name = external_reference[1].as_str();
553
554 self.pack_mysql_source_tables_update(
555 id,
556 schema_name,
557 table_name,
558 diff,
559 )
560 }
561 "sql-server" => {
562 mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
563 let schema_name = external_reference[1].as_str();
568 let table_name = external_reference[2].as_str();
569
570 self.pack_sql_server_source_table_update(
571 id,
572 schema_name,
573 table_name,
574 diff,
575 )
576 }
577 "load-generator" => vec![],
580 s => unreachable!("{s} sources do not have subsources"),
581 }
582 }
583 DataSourceDesc::Webhook { .. } => {
584 vec![self.pack_webhook_source_update(id, diff)]
585 }
586 DataSourceDesc::Introspection(_)
587 | DataSourceDesc::Progress
588 | DataSourceDesc::Catalog => vec![],
589 }
590 }
591 CatalogItem::View(view) => {
592 self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
593 }
594 CatalogItem::MaterializedView(mview) => {
595 self.pack_materialized_view_update(id, mview, diff)
596 }
597 CatalogItem::Sink(sink) => {
598 self.pack_sink_update(id, oid, schema_id, name, owner_id, sink, diff)
599 }
600 CatalogItem::Type(ty) => {
601 self.pack_type_update(id, oid, schema_id, name, owner_id, privileges, ty, diff)
602 }
603 CatalogItem::Func(func) => {
604 self.pack_func_update(id, schema_id, name, owner_id, func, diff)
605 }
606 CatalogItem::Log(_) | CatalogItem::Secret(_) => vec![],
607 CatalogItem::Connection(connection) => {
608 self.pack_connection_update(id, connection, diff)
609 }
610 };
611
612 if !entry.item().is_temporary() {
613 for dependee in entry.item().references().items() {
616 updates.push(self.pack_depends_update(id, *dependee, diff))
617 }
618 }
619
620 if let Some(desc) = entry.relation_desc_latest() {
622 let defaults = match entry.item() {
623 CatalogItem::Table(Table {
624 data_source: TableDataSource::TableWrites { defaults },
625 ..
626 }) => Some(defaults),
627 _ => None,
628 };
629 for (i, (column_name, column_type)) in desc.iter().enumerate() {
630 let default: Option<String> = defaults.map(|d| d[i].to_ast_string_stable());
631 let default: Datum = default
632 .as_ref()
633 .map(|d| Datum::String(d))
634 .unwrap_or(Datum::Null);
635 let pgtype = mz_pgrepr::Type::from(&column_type.scalar_type);
636 let (type_name, type_oid) = match &column_type.scalar_type {
637 SqlScalarType::List {
638 custom_id: Some(custom_id),
639 ..
640 }
641 | SqlScalarType::Map {
642 custom_id: Some(custom_id),
643 ..
644 }
645 | SqlScalarType::Record {
646 custom_id: Some(custom_id),
647 ..
648 } => {
649 let entry = self.get_entry(custom_id);
650 let name = &*entry.name().item;
665 let oid = entry.oid();
666 (name, oid)
667 }
668 _ => (pgtype.name(), pgtype.oid()),
669 };
670 updates.push(BuiltinTableUpdate::row(
671 &*MZ_COLUMNS,
672 Row::pack_slice(&[
673 Datum::String(&id.to_string()),
674 Datum::String(column_name),
675 Datum::UInt64(u64::cast_from(i + 1)),
676 Datum::from(column_type.nullable),
677 Datum::String(type_name),
678 default,
679 Datum::UInt32(type_oid),
680 Datum::Int32(pgtype.typmod()),
681 ]),
682 diff,
683 ));
684 }
685 }
686
687 if let Some(cw) = entry.item().initial_logical_compaction_window() {
689 updates.push(self.pack_history_retention_strategy_update(id, cw, diff));
690 }
691
692 updates.extend(Self::pack_item_global_id_update(entry, diff));
693
694 updates
695 }
696
697 fn pack_item_global_id_update(
698 entry: &CatalogEntry,
699 diff: Diff,
700 ) -> impl Iterator<Item = BuiltinTableUpdate<&'static BuiltinTable>> + use<'_> {
701 let id = entry.id().to_string();
702 let global_ids = entry.global_ids();
703 global_ids.map(move |global_id| {
704 BuiltinTableUpdate::row(
705 &*MZ_OBJECT_GLOBAL_IDS,
706 Row::pack_slice(&[Datum::String(&id), Datum::String(&global_id.to_string())]),
707 diff,
708 )
709 })
710 }
711
712 fn pack_history_retention_strategy_update(
713 &self,
714 id: CatalogItemId,
715 cw: CompactionWindow,
716 diff: Diff,
717 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
718 let cw: u64 = cw.comparable_timestamp().into();
719 let cw = Jsonb::from_serde_json(serde_json::Value::Number(serde_json::Number::from(cw)))
720 .expect("must serialize");
721 BuiltinTableUpdate::row(
722 &*MZ_HISTORY_RETENTION_STRATEGIES,
723 Row::pack_slice(&[
724 Datum::String(&id.to_string()),
725 Datum::String("FOR"),
727 cw.into_row().into_element(),
728 ]),
729 diff,
730 )
731 }
732
733 fn pack_table_update(
734 &self,
735 id: CatalogItemId,
736 oid: u32,
737 schema_id: &SchemaSpecifier,
738 name: &str,
739 owner_id: &RoleId,
740 privileges: Datum,
741 diff: Diff,
742 table: &Table,
743 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
744 let redacted = table.create_sql.as_ref().map(|create_sql| {
745 mz_sql::parse::parse(create_sql)
746 .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
747 .into_element()
748 .ast
749 .to_ast_string_redacted()
750 });
751 let source_id = if let TableDataSource::DataSource {
752 desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
753 ..
754 } = &table.data_source
755 {
756 Some(ingestion_id.to_string())
757 } else {
758 None
759 };
760
761 vec![BuiltinTableUpdate::row(
762 &*MZ_TABLES,
763 Row::pack_slice(&[
764 Datum::String(&id.to_string()),
765 Datum::UInt32(oid),
766 Datum::String(&schema_id.to_string()),
767 Datum::String(name),
768 Datum::String(&owner_id.to_string()),
769 privileges,
770 if let Some(create_sql) = &table.create_sql {
771 Datum::String(create_sql)
772 } else {
773 Datum::Null
774 },
775 if let Some(redacted) = &redacted {
776 Datum::String(redacted)
777 } else {
778 Datum::Null
779 },
780 if let Some(source_id) = source_id.as_ref() {
781 Datum::String(source_id)
782 } else {
783 Datum::Null
784 },
785 ]),
786 diff,
787 )]
788 }
789
790 fn pack_postgres_source_update(
791 &self,
792 id: CatalogItemId,
793 postgres: &PostgresSourceConnection<ReferencedConnection>,
794 diff: Diff,
795 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
796 vec![BuiltinTableUpdate::row(
797 &*MZ_POSTGRES_SOURCES,
798 Row::pack_slice(&[
799 Datum::String(&id.to_string()),
800 Datum::String(&postgres.publication_details.slot),
801 Datum::from(postgres.publication_details.timeline_id),
802 ]),
803 diff,
804 )]
805 }
806
807 fn pack_kafka_source_update(
808 &self,
809 item_id: CatalogItemId,
810 collection_id: GlobalId,
811 kafka: &KafkaSourceConnection<ReferencedConnection>,
812 diff: Diff,
813 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
814 vec![BuiltinTableUpdate::row(
815 &*MZ_KAFKA_SOURCES,
816 Row::pack_slice(&[
817 Datum::String(&item_id.to_string()),
818 Datum::String(&kafka.group_id(&self.config.connection_context, collection_id)),
819 Datum::String(&kafka.topic),
820 ]),
821 diff,
822 )]
823 }
824
825 fn pack_postgres_source_tables_update(
826 &self,
827 id: CatalogItemId,
828 schema_name: &str,
829 table_name: &str,
830 diff: Diff,
831 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
832 vec![BuiltinTableUpdate::row(
833 &*MZ_POSTGRES_SOURCE_TABLES,
834 Row::pack_slice(&[
835 Datum::String(&id.to_string()),
836 Datum::String(schema_name),
837 Datum::String(table_name),
838 ]),
839 diff,
840 )]
841 }
842
843 fn pack_mysql_source_tables_update(
844 &self,
845 id: CatalogItemId,
846 schema_name: &str,
847 table_name: &str,
848 diff: Diff,
849 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
850 vec![BuiltinTableUpdate::row(
851 &*MZ_MYSQL_SOURCE_TABLES,
852 Row::pack_slice(&[
853 Datum::String(&id.to_string()),
854 Datum::String(schema_name),
855 Datum::String(table_name),
856 ]),
857 diff,
858 )]
859 }
860
861 fn pack_sql_server_source_table_update(
862 &self,
863 id: CatalogItemId,
864 schema_name: &str,
865 table_name: &str,
866 diff: Diff,
867 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
868 vec![BuiltinTableUpdate::row(
869 &*MZ_SQL_SERVER_SOURCE_TABLES,
870 Row::pack_slice(&[
871 Datum::String(&id.to_string()),
872 Datum::String(schema_name),
873 Datum::String(table_name),
874 ]),
875 diff,
876 )]
877 }
878
879 fn pack_kafka_source_tables_update(
880 &self,
881 id: CatalogItemId,
882 topic: &str,
883 envelope: Option<&str>,
884 key_format: Option<&str>,
885 value_format: Option<&str>,
886 diff: Diff,
887 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
888 vec![BuiltinTableUpdate::row(
889 &*MZ_KAFKA_SOURCE_TABLES,
890 Row::pack_slice(&[
891 Datum::String(&id.to_string()),
892 Datum::String(topic),
893 Datum::from(envelope),
894 Datum::from(key_format),
895 Datum::from(value_format),
896 ]),
897 diff,
898 )]
899 }
900
901 fn pack_connection_update(
902 &self,
903 id: CatalogItemId,
904 connection: &Connection,
905 diff: Diff,
906 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
907 let mut updates = vec![];
908 match connection.details {
909 ConnectionDetails::Kafka(ref kafka) => {
910 updates.extend(self.pack_kafka_connection_update(id, kafka, diff));
911 }
912 ConnectionDetails::Aws(ref aws_config) => {
913 match self.pack_aws_connection_update(id, aws_config, diff) {
914 Ok(update) => {
915 updates.push(update);
916 }
917 Err(e) => {
918 tracing::error!(%id, %e, "failed writing row to mz_aws_connections table");
919 }
920 }
921 }
922 ConnectionDetails::AwsPrivatelink(_) => {
923 if let Some(aws_principal_context) = self.aws_principal_context.as_ref() {
924 updates.push(self.pack_aws_privatelink_connection_update(
925 id,
926 aws_principal_context,
927 diff,
928 ));
929 } else {
930 tracing::error!(%id, "missing AWS principal context; cannot write row to mz_aws_privatelink_connections table");
931 }
932 }
933 ConnectionDetails::Ssh {
934 ref key_1,
935 ref key_2,
936 ..
937 } => {
938 updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff));
939 }
940 ConnectionDetails::Csr(_)
941 | ConnectionDetails::GlueSchemaRegistry(_)
942 | ConnectionDetails::Postgres(_)
943 | ConnectionDetails::MySql(_)
944 | ConnectionDetails::SqlServer(_)
945 | ConnectionDetails::IcebergCatalog(_) => (),
946 };
947 updates
948 }
949
950 pub(crate) fn pack_ssh_tunnel_connection_update(
951 &self,
952 id: CatalogItemId,
953 key_1: &SshKey,
954 key_2: &SshKey,
955 diff: Diff,
956 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
957 BuiltinTableUpdate::row(
958 &*MZ_SSH_TUNNEL_CONNECTIONS,
959 Row::pack_slice(&[
960 Datum::String(&id.to_string()),
961 Datum::String(key_1.public_key().as_str()),
962 Datum::String(key_2.public_key().as_str()),
963 ]),
964 diff,
965 )
966 }
967
968 fn pack_kafka_connection_update(
969 &self,
970 id: CatalogItemId,
971 kafka: &KafkaConnection<ReferencedConnection>,
972 diff: Diff,
973 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
974 let progress_topic = kafka.progress_topic(&self.config.connection_context, id);
975 let mut row = Row::default();
976 row.packer()
977 .try_push_array(
978 &[ArrayDimension {
979 lower_bound: 1,
980 length: kafka.brokers.len(),
981 }],
982 kafka
983 .brokers
984 .iter()
985 .map(|broker| Datum::String(&broker.address)),
986 )
987 .expect("kafka.brokers is 1 dimensional, and its length is used for the array length");
988 let brokers = row.unpack_first();
989 vec![BuiltinTableUpdate::row(
990 &*MZ_KAFKA_CONNECTIONS,
991 Row::pack_slice(&[
992 Datum::String(&id.to_string()),
993 brokers,
994 Datum::String(&progress_topic),
995 ]),
996 diff,
997 )]
998 }
999
1000 pub fn pack_aws_privatelink_connection_update(
1001 &self,
1002 connection_id: CatalogItemId,
1003 aws_principal_context: &AwsPrincipalContext,
1004 diff: Diff,
1005 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1006 let id = &MZ_AWS_PRIVATELINK_CONNECTIONS;
1007 let row = Row::pack_slice(&[
1008 Datum::String(&connection_id.to_string()),
1009 Datum::String(&aws_principal_context.to_principal_string(connection_id)),
1010 ]);
1011 BuiltinTableUpdate::row(id, row, diff)
1012 }
1013
1014 pub fn pack_aws_connection_update(
1015 &self,
1016 connection_id: CatalogItemId,
1017 aws_config: &AwsConnection,
1018 diff: Diff,
1019 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, anyhow::Error> {
1020 let id = &MZ_AWS_CONNECTIONS;
1021
1022 let mut access_key_id = None;
1023 let mut access_key_id_secret_id = None;
1024 let mut secret_access_key_secret_id = None;
1025 let mut session_token = None;
1026 let mut session_token_secret_id = None;
1027 let mut assume_role_arn = None;
1028 let mut assume_role_session_name = None;
1029 let mut principal = None;
1030 let mut external_id = None;
1031 let mut example_trust_policy = None;
1032 match &aws_config.auth {
1033 AwsAuth::Credentials(credentials) => {
1034 match &credentials.access_key_id {
1035 StringOrSecret::String(s) => access_key_id = Some(s.as_str()),
1036 StringOrSecret::Secret(s) => access_key_id_secret_id = Some(s.to_string()),
1037 }
1038 secret_access_key_secret_id = Some(credentials.secret_access_key.to_string());
1039 match credentials.session_token.as_ref() {
1040 None => (),
1041 Some(StringOrSecret::String(s)) => session_token = Some(s.as_str()),
1042 Some(StringOrSecret::Secret(s)) => {
1043 session_token_secret_id = Some(s.to_string())
1044 }
1045 }
1046 }
1047 AwsAuth::AssumeRole(assume_role) => {
1048 assume_role_arn = Some(assume_role.arn.as_str());
1049 assume_role_session_name = assume_role.session_name.as_deref();
1050 principal = self
1051 .config
1052 .connection_context
1053 .aws_connection_role_arn
1054 .as_deref();
1055 external_id =
1056 Some(assume_role.external_id(&self.config.connection_context, connection_id)?);
1057 example_trust_policy = {
1058 let policy = assume_role
1059 .example_trust_policy(&self.config.connection_context, connection_id)?;
1060 let policy = Jsonb::from_serde_json(policy).expect("valid json");
1061 Some(policy.into_row())
1062 };
1063 }
1064 }
1065
1066 let row = Row::pack_slice(&[
1067 Datum::String(&connection_id.to_string()),
1068 Datum::from(aws_config.endpoint.as_deref()),
1069 Datum::from(aws_config.region.as_deref()),
1070 Datum::from(access_key_id),
1071 Datum::from(access_key_id_secret_id.as_deref()),
1072 Datum::from(secret_access_key_secret_id.as_deref()),
1073 Datum::from(session_token),
1074 Datum::from(session_token_secret_id.as_deref()),
1075 Datum::from(assume_role_arn),
1076 Datum::from(assume_role_session_name),
1077 Datum::from(principal),
1078 Datum::from(external_id.as_deref()),
1079 Datum::from(example_trust_policy.as_ref().map(|p| p.into_element())),
1080 ]);
1081
1082 Ok(BuiltinTableUpdate::row(id, row, diff))
1083 }
1084
1085 fn pack_view_update(
1086 &self,
1087 id: CatalogItemId,
1088 oid: u32,
1089 schema_id: &SchemaSpecifier,
1090 name: &str,
1091 owner_id: &RoleId,
1092 privileges: Datum,
1093 view: &View,
1094 diff: Diff,
1095 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1096 let create_stmt = mz_sql::parse::parse(&view.create_sql)
1097 .unwrap_or_else(|e| {
1098 panic!(
1099 "create_sql cannot be invalid: `{}` --- error: `{}`",
1100 view.create_sql, e
1101 )
1102 })
1103 .into_element()
1104 .ast;
1105 let query = match &create_stmt {
1106 Statement::CreateView(stmt) => &stmt.definition.query,
1107 _ => unreachable!(),
1108 };
1109
1110 let mut query_string = query.to_ast_string_stable();
1111 query_string.push(';');
1114
1115 vec![BuiltinTableUpdate::row(
1116 &*MZ_VIEWS,
1117 Row::pack_slice(&[
1118 Datum::String(&id.to_string()),
1119 Datum::UInt32(oid),
1120 Datum::String(&schema_id.to_string()),
1121 Datum::String(name),
1122 Datum::String(&query_string),
1123 Datum::String(&owner_id.to_string()),
1124 privileges,
1125 Datum::String(&view.create_sql),
1126 Datum::String(&create_stmt.to_ast_string_redacted()),
1127 ]),
1128 diff,
1129 )]
1130 }
1131
1132 fn pack_materialized_view_update(
1133 &self,
1134 id: CatalogItemId,
1135 mview: &MaterializedView,
1136 diff: Diff,
1137 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1138 let mut updates = Vec::new();
1139
1140 if let Some(refresh_schedule) = &mview.refresh_schedule {
1141 assert!(!refresh_schedule.is_empty());
1144 for RefreshEvery {
1145 interval,
1146 aligned_to,
1147 } in refresh_schedule.everies.iter()
1148 {
1149 let aligned_to_dt = mz_ore::now::to_datetime(
1150 <&Timestamp as TryInto<u64>>::try_into(aligned_to).expect("undoes planning"),
1151 );
1152 updates.push(BuiltinTableUpdate::row(
1153 &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1154 Row::pack_slice(&[
1155 Datum::String(&id.to_string()),
1156 Datum::String("every"),
1157 Datum::Interval(
1158 Interval::from_duration(interval).expect(
1159 "planning ensured that this is convertible back to Interval",
1160 ),
1161 ),
1162 Datum::TimestampTz(aligned_to_dt.try_into().expect("undoes planning")),
1163 Datum::Null,
1164 ]),
1165 diff,
1166 ));
1167 }
1168 for at in refresh_schedule.ats.iter() {
1169 let at_dt = mz_ore::now::to_datetime(
1170 <&Timestamp as TryInto<u64>>::try_into(at).expect("undoes planning"),
1171 );
1172 updates.push(BuiltinTableUpdate::row(
1173 &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1174 Row::pack_slice(&[
1175 Datum::String(&id.to_string()),
1176 Datum::String("at"),
1177 Datum::Null,
1178 Datum::Null,
1179 Datum::TimestampTz(at_dt.try_into().expect("undoes planning")),
1180 ]),
1181 diff,
1182 ));
1183 }
1184 } else {
1185 updates.push(BuiltinTableUpdate::row(
1186 &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1187 Row::pack_slice(&[
1188 Datum::String(&id.to_string()),
1189 Datum::String("on-commit"),
1190 Datum::Null,
1191 Datum::Null,
1192 Datum::Null,
1193 ]),
1194 diff,
1195 ));
1196 }
1197
1198 if let Some(target_id) = mview.replacement_target {
1199 updates.push(BuiltinTableUpdate::row(
1200 &*MZ_REPLACEMENTS,
1201 Row::pack_slice(&[
1202 Datum::String(&id.to_string()),
1203 Datum::String(&target_id.to_string()),
1204 ]),
1205 diff,
1206 ));
1207 }
1208
1209 updates
1210 }
1211
1212 fn pack_sink_update(
1213 &self,
1214 id: CatalogItemId,
1215 oid: u32,
1216 schema_id: &SchemaSpecifier,
1217 name: &str,
1218 owner_id: &RoleId,
1219 sink: &Sink,
1220 diff: Diff,
1221 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1222 let mut updates = vec![];
1223 match &sink.connection {
1224 StorageSinkConnection::Kafka(KafkaSinkConnection {
1225 topic: topic_name, ..
1226 }) => {
1227 updates.push(BuiltinTableUpdate::row(
1228 &*MZ_KAFKA_SINKS,
1229 Row::pack_slice(&[
1230 Datum::String(&id.to_string()),
1231 Datum::String(topic_name.as_str()),
1232 ]),
1233 diff,
1234 ));
1235 }
1236 StorageSinkConnection::Iceberg(IcebergSinkConnection {
1237 namespace, table, ..
1238 }) => {
1239 updates.push(BuiltinTableUpdate::row(
1240 &*MZ_ICEBERG_SINKS,
1241 Row::pack_slice(&[
1242 Datum::String(&id.to_string()),
1243 Datum::String(namespace.as_str()),
1244 Datum::String(table.as_str()),
1245 ]),
1246 diff,
1247 ));
1248 }
1249 };
1250
1251 let create_stmt = mz_sql::parse::parse(&sink.create_sql)
1252 .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", sink.create_sql))
1253 .into_element()
1254 .ast;
1255
1256 let envelope = sink.envelope();
1257
1258 let combined_format = sink.combined_format();
1260 let (key_format, value_format) = match sink.formats() {
1261 Some((key_format, value_format)) => (key_format, Some(value_format)),
1262 None => (None, None),
1263 };
1264
1265 updates.push(BuiltinTableUpdate::row(
1266 &*MZ_SINKS,
1267 Row::pack_slice(&[
1268 Datum::String(&id.to_string()),
1269 Datum::UInt32(oid),
1270 Datum::String(&schema_id.to_string()),
1271 Datum::String(name),
1272 Datum::String(sink.connection.name()),
1273 Datum::from(sink.connection_id().map(|id| id.to_string()).as_deref()),
1274 Datum::Null,
1276 Datum::from(envelope),
1277 Datum::from(combined_format.as_ref().map(|f| f.as_ref())),
1280 Datum::from(key_format),
1281 Datum::from(value_format),
1282 Datum::String(&sink.cluster_id.to_string()),
1283 Datum::String(&owner_id.to_string()),
1284 Datum::String(&sink.create_sql),
1285 Datum::String(&create_stmt.to_ast_string_redacted()),
1286 ]),
1287 diff,
1288 ));
1289
1290 updates
1291 }
1292
1293 fn pack_index_update(
1294 &self,
1295 id: CatalogItemId,
1296 oid: u32,
1297 name: &str,
1298 owner_id: &RoleId,
1299 index: &Index,
1300 diff: Diff,
1301 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1302 let mut updates = vec![];
1303
1304 let create_stmt = mz_sql::parse::parse(&index.create_sql)
1305 .unwrap_or_else(|e| {
1306 panic!(
1307 "create_sql cannot be invalid: `{}` --- error: `{}`",
1308 index.create_sql, e
1309 )
1310 })
1311 .into_element()
1312 .ast;
1313
1314 let key_sqls = match &create_stmt {
1315 Statement::CreateIndex(CreateIndexStatement { key_parts, .. }) => key_parts
1316 .as_ref()
1317 .expect("key_parts is filled in during planning"),
1318 _ => unreachable!(),
1319 };
1320 let on_item_id = self.get_entry_by_global_id(&index.on).id();
1321
1322 updates.push(BuiltinTableUpdate::row(
1323 &*MZ_INDEXES,
1324 Row::pack_slice(&[
1325 Datum::String(&id.to_string()),
1326 Datum::UInt32(oid),
1327 Datum::String(name),
1328 Datum::String(&on_item_id.to_string()),
1329 Datum::String(&index.cluster_id.to_string()),
1330 Datum::String(&owner_id.to_string()),
1331 Datum::String(&index.create_sql),
1332 Datum::String(&create_stmt.to_ast_string_redacted()),
1333 ]),
1334 diff,
1335 ));
1336
1337 let on_entry = self.get_entry_by_global_id(&index.on);
1338 let on_desc = on_entry
1339 .relation_desc()
1340 .expect("can only create indexes on items with a valid description");
1341 let repr_col_types: Vec<ReprColumnType> = on_desc
1342 .typ()
1343 .column_types
1344 .iter()
1345 .map(ReprColumnType::from)
1346 .collect();
1347 for (i, key) in index.keys.iter().enumerate() {
1348 let nullable = key.typ(&repr_col_types).nullable;
1349 let seq_in_index = u64::cast_from(i + 1);
1350 let key_sql = key_sqls
1351 .get(i)
1352 .expect("missing sql information for index key")
1353 .to_ast_string_simple();
1354 let (field_number, expression) = match key {
1355 MirScalarExpr::Column(col, _) => {
1356 (Datum::UInt64(u64::cast_from(*col + 1)), Datum::Null)
1357 }
1358 _ => (Datum::Null, Datum::String(&key_sql)),
1359 };
1360 updates.push(BuiltinTableUpdate::row(
1361 &*MZ_INDEX_COLUMNS,
1362 Row::pack_slice(&[
1363 Datum::String(&id.to_string()),
1364 Datum::UInt64(seq_in_index),
1365 field_number,
1366 expression,
1367 Datum::from(nullable),
1368 ]),
1369 diff,
1370 ));
1371 }
1372
1373 updates
1374 }
1375
1376 fn pack_type_update(
1377 &self,
1378 id: CatalogItemId,
1379 oid: u32,
1380 schema_id: &SchemaSpecifier,
1381 name: &str,
1382 owner_id: &RoleId,
1383 privileges: Datum,
1384 typ: &Type,
1385 diff: Diff,
1386 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1387 let mut out = vec![];
1388
1389 let redacted = typ.create_sql.as_ref().map(|create_sql| {
1390 mz_sql::parse::parse(create_sql)
1391 .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
1392 .into_element()
1393 .ast
1394 .to_ast_string_redacted()
1395 });
1396
1397 out.push(BuiltinTableUpdate::row(
1398 &*MZ_TYPES,
1399 Row::pack_slice(&[
1400 Datum::String(&id.to_string()),
1401 Datum::UInt32(oid),
1402 Datum::String(&schema_id.to_string()),
1403 Datum::String(name),
1404 Datum::String(&TypeCategory::from_catalog_type(&typ.details.typ).to_string()),
1405 Datum::String(&owner_id.to_string()),
1406 privileges,
1407 if let Some(create_sql) = &typ.create_sql {
1408 Datum::String(create_sql)
1409 } else {
1410 Datum::Null
1411 },
1412 if let Some(redacted) = &redacted {
1413 Datum::String(redacted)
1414 } else {
1415 Datum::Null
1416 },
1417 ]),
1418 diff,
1419 ));
1420
1421 let mut row = Row::default();
1422 let mut packer = row.packer();
1423
1424 fn append_modifier(packer: &mut RowPacker<'_>, mods: &[i64]) {
1425 if mods.is_empty() {
1426 packer.push(Datum::Null);
1427 } else {
1428 packer.push_list(mods.iter().map(|m| Datum::Int64(*m)));
1429 }
1430 }
1431
1432 let index_id = match &typ.details.typ {
1433 CatalogType::Array {
1434 element_reference: element_id,
1435 } => {
1436 packer.push(Datum::String(&id.to_string()));
1437 packer.push(Datum::String(&element_id.to_string()));
1438 &MZ_ARRAY_TYPES
1439 }
1440 CatalogType::List {
1441 element_reference: element_id,
1442 element_modifiers,
1443 } => {
1444 packer.push(Datum::String(&id.to_string()));
1445 packer.push(Datum::String(&element_id.to_string()));
1446 append_modifier(&mut packer, element_modifiers);
1447 &MZ_LIST_TYPES
1448 }
1449 CatalogType::Map {
1450 key_reference: key_id,
1451 value_reference: value_id,
1452 key_modifiers,
1453 value_modifiers,
1454 } => {
1455 packer.push(Datum::String(&id.to_string()));
1456 packer.push(Datum::String(&key_id.to_string()));
1457 packer.push(Datum::String(&value_id.to_string()));
1458 append_modifier(&mut packer, key_modifiers);
1459 append_modifier(&mut packer, value_modifiers);
1460 &MZ_MAP_TYPES
1461 }
1462 CatalogType::Pseudo => {
1463 packer.push(Datum::String(&id.to_string()));
1464 &MZ_PSEUDO_TYPES
1465 }
1466 _ => {
1467 packer.push(Datum::String(&id.to_string()));
1468 &MZ_BASE_TYPES
1469 }
1470 };
1471 out.push(BuiltinTableUpdate::row(index_id, row, diff));
1472
1473 if let Some(pg_metadata) = &typ.details.pg_metadata {
1474 out.push(BuiltinTableUpdate::row(
1475 &*MZ_TYPE_PG_METADATA,
1476 Row::pack_slice(&[
1477 Datum::String(&id.to_string()),
1478 Datum::UInt32(pg_metadata.typinput_oid),
1479 Datum::UInt32(pg_metadata.typreceive_oid),
1480 ]),
1481 diff,
1482 ));
1483 }
1484
1485 out
1486 }
1487
1488 fn pack_func_update(
1489 &self,
1490 id: CatalogItemId,
1491 schema_id: &SchemaSpecifier,
1492 name: &str,
1493 owner_id: &RoleId,
1494 func: &Func,
1495 diff: Diff,
1496 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1497 let mut updates = vec![];
1498 for func_impl_details in func.inner.func_impls() {
1499 let arg_type_ids = func_impl_details
1500 .arg_typs
1501 .iter()
1502 .map(|typ| self.get_system_type(typ).id().to_string())
1503 .collect::<Vec<_>>();
1504
1505 let mut row = Row::default();
1506 row.packer()
1507 .try_push_array(
1508 &[ArrayDimension {
1509 lower_bound: 1,
1510 length: arg_type_ids.len(),
1511 }],
1512 arg_type_ids.iter().map(|id| Datum::String(id)),
1513 )
1514 .expect(
1515 "arg_type_ids is 1 dimensional, and its length is used for the array length",
1516 );
1517 let arg_type_ids = row.unpack_first();
1518
1519 updates.push(BuiltinTableUpdate::row(
1520 &*MZ_FUNCTIONS,
1521 Row::pack_slice(&[
1522 Datum::String(&id.to_string()),
1523 Datum::UInt32(func_impl_details.oid),
1524 Datum::String(&schema_id.to_string()),
1525 Datum::String(name),
1526 arg_type_ids,
1527 Datum::from(
1528 func_impl_details
1529 .variadic_typ
1530 .map(|typ| self.get_system_type(typ).id().to_string())
1531 .as_deref(),
1532 ),
1533 Datum::from(
1534 func_impl_details
1535 .return_typ
1536 .map(|typ| self.get_system_type(typ).id().to_string())
1537 .as_deref(),
1538 ),
1539 func_impl_details.return_is_set.into(),
1540 Datum::String(&owner_id.to_string()),
1541 ]),
1542 diff,
1543 ));
1544
1545 if let mz_sql::func::Func::Aggregate(_) = func.inner {
1546 updates.push(BuiltinTableUpdate::row(
1547 &*MZ_AGGREGATES,
1548 Row::pack_slice(&[
1549 Datum::UInt32(func_impl_details.oid),
1550 Datum::String("n"),
1552 Datum::Int16(0),
1553 ]),
1554 diff,
1555 ));
1556 }
1557 }
1558 updates
1559 }
1560
1561 pub fn pack_op_update(
1562 &self,
1563 operator: &str,
1564 func_impl_details: FuncImplCatalogDetails,
1565 diff: Diff,
1566 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1567 let arg_type_ids = func_impl_details
1568 .arg_typs
1569 .iter()
1570 .map(|typ| self.get_system_type(typ).id().to_string())
1571 .collect::<Vec<_>>();
1572
1573 let mut row = Row::default();
1574 row.packer()
1575 .try_push_array(
1576 &[ArrayDimension {
1577 lower_bound: 1,
1578 length: arg_type_ids.len(),
1579 }],
1580 arg_type_ids.iter().map(|id| Datum::String(id)),
1581 )
1582 .expect("arg_type_ids is 1 dimensional, and its length is used for the array length");
1583 let arg_type_ids = row.unpack_first();
1584
1585 BuiltinTableUpdate::row(
1586 &*MZ_OPERATORS,
1587 Row::pack_slice(&[
1588 Datum::UInt32(func_impl_details.oid),
1589 Datum::String(operator),
1590 arg_type_ids,
1591 Datum::from(
1592 func_impl_details
1593 .return_typ
1594 .map(|typ| self.get_system_type(typ).id().to_string())
1595 .as_deref(),
1596 ),
1597 ]),
1598 diff,
1599 )
1600 }
1601
1602 pub fn pack_audit_log_update(
1603 &self,
1604 event: &VersionedEvent,
1605 diff: Diff,
1606 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1607 let (event_type, object_type, details, user, occurred_at): (
1608 &EventType,
1609 &ObjectType,
1610 &EventDetails,
1611 &Option<String>,
1612 u64,
1613 ) = match event {
1614 VersionedEvent::V1(ev) => (
1615 &ev.event_type,
1616 &ev.object_type,
1617 &ev.details,
1618 &ev.user,
1619 ev.occurred_at,
1620 ),
1621 };
1622 let details = Jsonb::from_serde_json(details.as_json())
1623 .map_err(|e| {
1624 Error::new(ErrorKind::Unstructured(format!(
1625 "could not pack audit log update: {}",
1626 e
1627 )))
1628 })?
1629 .into_row();
1630 let details = details
1631 .iter()
1632 .next()
1633 .expect("details created above with a single jsonb column");
1634 let dt = mz_ore::now::to_datetime(occurred_at);
1635 let id = event.sortable_id();
1636 Ok(BuiltinTableUpdate::row(
1637 &*MZ_AUDIT_EVENTS,
1638 Row::pack_slice(&[
1639 Datum::UInt64(id),
1640 Datum::String(&format!("{}", event_type)),
1641 Datum::String(&format!("{}", object_type)),
1642 details,
1643 match user {
1644 Some(user) => Datum::String(user),
1645 None => Datum::Null,
1646 },
1647 Datum::TimestampTz(dt.try_into().expect("must fit")),
1648 ]),
1649 diff,
1650 ))
1651 }
1652
1653 pub fn pack_storage_usage_update(
1654 &self,
1655 VersionedStorageUsage::V1(event): VersionedStorageUsage,
1656 diff: Diff,
1657 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1658 let id = &MZ_STORAGE_USAGE_BY_SHARD;
1659 let row = Row::pack_slice(&[
1660 Datum::UInt64(event.id),
1661 Datum::from(event.shard_id.as_deref()),
1662 Datum::UInt64(event.size_bytes),
1663 Datum::TimestampTz(
1664 mz_ore::now::to_datetime(event.collection_timestamp)
1665 .try_into()
1666 .expect("must fit"),
1667 ),
1668 ]);
1669 BuiltinTableUpdate::row(id, row, diff)
1670 }
1671
1672 pub fn pack_egress_ip_update(
1673 &self,
1674 ip: &IpNet,
1675 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1676 let id = &MZ_EGRESS_IPS;
1677 let addr = ip.network();
1678 let row = Row::pack_slice(&[
1679 Datum::String(&addr.to_string()),
1680 Datum::Int32(ip.prefix_len().into()),
1681 Datum::String(&format!("{}/{}", addr, ip.prefix_len())),
1682 ]);
1683 Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
1684 }
1685
1686 pub fn pack_license_key_update(
1687 &self,
1688 license_key: &ValidatedLicenseKey,
1689 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1690 let id = &MZ_LICENSE_KEYS;
1691 let row = Row::pack_slice(&[
1692 Datum::String(&license_key.id),
1693 Datum::String(&license_key.organization),
1694 Datum::String(&license_key.environment_id),
1695 Datum::TimestampTz(
1696 mz_ore::now::to_datetime(license_key.expiration * 1000)
1697 .try_into()
1698 .expect("must fit"),
1699 ),
1700 Datum::TimestampTz(
1701 mz_ore::now::to_datetime(license_key.not_before * 1000)
1702 .try_into()
1703 .expect("must fit"),
1704 ),
1705 ]);
1706 Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
1707 }
1708
1709 pub fn pack_all_replica_size_updates(&self) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1710 let mut updates = Vec::new();
1711 for (size, alloc) in &self.cluster_replica_sizes.0 {
1712 if alloc.disabled {
1713 continue;
1714 }
1715
1716 let cpu_limit = alloc.cpu_limit.unwrap_or(CpuLimit::MAX);
1719 let MemoryLimit(ByteSize(memory_bytes)) =
1720 (alloc.memory_limit).unwrap_or(MemoryLimit::MAX);
1721 let DiskLimit(ByteSize(disk_bytes)) =
1722 (alloc.disk_limit).unwrap_or(DiskLimit::ARBITRARY);
1723
1724 let row = Row::pack_slice(&[
1725 size.as_str().into(),
1726 u64::cast_from(alloc.scale).into(),
1727 u64::cast_from(alloc.workers).into(),
1728 cpu_limit.as_nanocpus().into(),
1729 memory_bytes.into(),
1730 disk_bytes.into(),
1731 (alloc.credits_per_hour).into(),
1732 ]);
1733
1734 updates.push(BuiltinTableUpdate::row(
1735 &*MZ_CLUSTER_REPLICA_SIZES,
1736 row,
1737 Diff::ONE,
1738 ));
1739 }
1740
1741 updates
1742 }
1743
1744 pub fn pack_subscribe_update(
1745 &self,
1746 id: GlobalId,
1747 subscribe: &ActiveSubscribe,
1748 diff: Diff,
1749 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1750 let mut row = Row::default();
1751 let mut packer = row.packer();
1752 packer.push(Datum::String(&id.to_string()));
1753 packer.push(Datum::Uuid(subscribe.session_uuid));
1754 packer.push(Datum::String(&subscribe.cluster_id.to_string()));
1755
1756 let start_dt = mz_ore::now::to_datetime(subscribe.start_time);
1757 packer.push(Datum::TimestampTz(start_dt.try_into().expect("must fit")));
1758
1759 let depends_on: Vec<_> = subscribe
1760 .depends_on
1761 .iter()
1762 .map(|id| id.to_string())
1763 .collect();
1764 packer.push_list(depends_on.iter().map(|s| Datum::String(s)));
1765
1766 BuiltinTableUpdate::row(&*MZ_SUBSCRIPTIONS, row, diff)
1767 }
1768
1769 pub fn pack_session_update(
1770 &self,
1771 conn: &ConnMeta,
1772 diff: Diff,
1773 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1774 let connect_dt = mz_ore::now::to_datetime(conn.connected_at());
1775 BuiltinTableUpdate::row(
1776 &*MZ_SESSIONS,
1777 Row::pack_slice(&[
1778 Datum::Uuid(conn.uuid()),
1779 Datum::UInt32(conn.conn_id().unhandled()),
1780 Datum::String(&conn.authenticated_role_id().to_string()),
1781 Datum::from(conn.client_ip().map(|ip| ip.to_string()).as_deref()),
1782 Datum::TimestampTz(connect_dt.try_into().expect("must fit")),
1783 ]),
1784 diff,
1785 )
1786 }
1787
1788 pub fn pack_default_privileges_update(
1789 &self,
1790 default_privilege_object: &DefaultPrivilegeObject,
1791 grantee: &RoleId,
1792 acl_mode: &AclMode,
1793 diff: Diff,
1794 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1795 BuiltinTableUpdate::row(
1796 &*MZ_DEFAULT_PRIVILEGES,
1797 Row::pack_slice(&[
1798 default_privilege_object.role_id.to_string().as_str().into(),
1799 default_privilege_object
1800 .database_id
1801 .map(|database_id| database_id.to_string())
1802 .as_deref()
1803 .into(),
1804 default_privilege_object
1805 .schema_id
1806 .map(|schema_id| schema_id.to_string())
1807 .as_deref()
1808 .into(),
1809 default_privilege_object
1810 .object_type
1811 .to_string()
1812 .to_lowercase()
1813 .as_str()
1814 .into(),
1815 grantee.to_string().as_str().into(),
1816 acl_mode.to_string().as_str().into(),
1817 ]),
1818 diff,
1819 )
1820 }
1821
1822 pub fn pack_system_privileges_update(
1823 &self,
1824 privileges: MzAclItem,
1825 diff: Diff,
1826 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1827 BuiltinTableUpdate::row(
1828 &*MZ_SYSTEM_PRIVILEGES,
1829 Row::pack_slice(&[privileges.into()]),
1830 diff,
1831 )
1832 }
1833
1834 fn pack_privilege_array_row(&self, privileges: &PrivilegeMap) -> Row {
1835 let mut row = Row::default();
1836 let flat_privileges: Vec<_> = privileges.all_values_owned().collect();
1837 row.packer()
1838 .try_push_array(
1839 &[ArrayDimension {
1840 lower_bound: 1,
1841 length: flat_privileges.len(),
1842 }],
1843 flat_privileges
1844 .into_iter()
1845 .map(|mz_acl_item| Datum::MzAclItem(mz_acl_item.clone())),
1846 )
1847 .expect("privileges is 1 dimensional, and its length is used for the array length");
1848 row
1849 }
1850
1851 pub fn pack_comment_update(
1852 &self,
1853 object_id: CommentObjectId,
1854 column_pos: Option<usize>,
1855 comment: &str,
1856 diff: Diff,
1857 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1858 let object_type = mz_sql::catalog::ObjectType::from(object_id);
1860 let audit_type = super::object_type_to_audit_object_type(object_type);
1861 let object_type_str = audit_type.to_string();
1862
1863 let object_id_str = match object_id {
1864 CommentObjectId::Table(global_id)
1865 | CommentObjectId::View(global_id)
1866 | CommentObjectId::MaterializedView(global_id)
1867 | CommentObjectId::Source(global_id)
1868 | CommentObjectId::Sink(global_id)
1869 | CommentObjectId::Index(global_id)
1870 | CommentObjectId::Func(global_id)
1871 | CommentObjectId::Connection(global_id)
1872 | CommentObjectId::Secret(global_id)
1873 | CommentObjectId::Type(global_id) => global_id.to_string(),
1874 CommentObjectId::Role(role_id) => role_id.to_string(),
1875 CommentObjectId::Database(database_id) => database_id.to_string(),
1876 CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
1877 CommentObjectId::Cluster(cluster_id) => cluster_id.to_string(),
1878 CommentObjectId::ClusterReplica((_, replica_id)) => replica_id.to_string(),
1879 CommentObjectId::NetworkPolicy(network_policy_id) => network_policy_id.to_string(),
1880 };
1881 let column_pos_datum = match column_pos {
1882 Some(pos) => {
1883 let pos =
1885 i32::try_from(pos).expect("we constrain this value in the planning layer");
1886 Datum::Int32(pos)
1887 }
1888 None => Datum::Null,
1889 };
1890
1891 BuiltinTableUpdate::row(
1892 &*MZ_COMMENTS,
1893 Row::pack_slice(&[
1894 Datum::String(&object_id_str),
1895 Datum::String(&object_type_str),
1896 column_pos_datum,
1897 Datum::String(comment),
1898 ]),
1899 diff,
1900 )
1901 }
1902
1903 pub fn pack_webhook_source_update(
1904 &self,
1905 item_id: CatalogItemId,
1906 diff: Diff,
1907 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1908 let url = self
1909 .try_get_webhook_url(&item_id)
1910 .expect("webhook source should exist");
1911 let url = url.to_string();
1912 let name = &self.get_entry(&item_id).name().item;
1913 let id_str = item_id.to_string();
1914
1915 BuiltinTableUpdate::row(
1916 &*MZ_WEBHOOKS_SOURCES,
1917 Row::pack_slice(&[
1918 Datum::String(&id_str),
1919 Datum::String(name),
1920 Datum::String(&url),
1921 ]),
1922 diff,
1923 )
1924 }
1925
1926 pub fn pack_source_references_update(
1927 &self,
1928 source_references: &SourceReferences,
1929 diff: Diff,
1930 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1931 let source_id = source_references.source_id.to_string();
1932 let updated_at = &source_references.updated_at;
1933 source_references
1934 .references
1935 .iter()
1936 .map(|reference| {
1937 let mut row = Row::default();
1938 let mut packer = row.packer();
1939 packer.extend([
1940 Datum::String(&source_id),
1941 reference
1942 .namespace
1943 .as_ref()
1944 .map(|s| Datum::String(s))
1945 .unwrap_or(Datum::Null),
1946 Datum::String(&reference.name),
1947 Datum::TimestampTz(
1948 mz_ore::now::to_datetime(*updated_at)
1949 .try_into()
1950 .expect("must fit"),
1951 ),
1952 ]);
1953 if reference.columns.len() > 0 {
1954 packer
1955 .try_push_array(
1956 &[ArrayDimension {
1957 lower_bound: 1,
1958 length: reference.columns.len(),
1959 }],
1960 reference.columns.iter().map(|col| Datum::String(col)),
1961 )
1962 .expect(
1963 "columns is 1 dimensional, and its length is used for the array length",
1964 );
1965 } else {
1966 packer.push(Datum::Null);
1967 }
1968
1969 BuiltinTableUpdate::row(&*MZ_SOURCE_REFERENCES, row, diff)
1970 })
1971 .collect()
1972 }
1973}