1mod notice;
11
12use bytesize::ByteSize;
13use ipnet::IpNet;
14use mz_adapter_types::compaction::CompactionWindow;
15use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
16use mz_catalog::SYSTEM_CONN_ID;
17use mz_catalog::builtin::{
18 BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
19 MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICAS,
20 MZ_CLUSTER_SCHEDULES, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS, MZ_DEFAULT_PRIVILEGES,
21 MZ_EGRESS_IPS, MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES, MZ_ICEBERG_SINKS,
22 MZ_INDEX_COLUMNS, MZ_INDEXES, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCE_TABLES,
23 MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES,
24 MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MYSQL_SOURCE_TABLES, MZ_OBJECT_DEPENDENCIES,
25 MZ_OBJECT_GLOBAL_IDS, MZ_OPERATORS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES,
26 MZ_PSEUDO_TYPES, MZ_REPLACEMENTS, MZ_ROLE_AUTH, MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SESSIONS,
27 MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS,
28 MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES,
29 MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
30};
31use mz_catalog::config::AwsPrincipalContext;
32use mz_catalog::durable::SourceReferences;
33use mz_catalog::memory::error::{Error, ErrorKind};
34use mz_catalog::memory::objects::{
35 CatalogEntry, CatalogItem, ClusterVariant, Connection, DataSourceDesc, Func, Index,
36 MaterializedView, Sink, Table, TableDataSource, Type, View,
37};
38use mz_controller::clusters::{
39 ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
40};
41use mz_controller_types::ClusterId;
42use mz_expr::MirScalarExpr;
43use mz_license_keys::ValidatedLicenseKey;
44use mz_orchestrator::{CpuLimit, DiskLimit, MemoryLimit};
45use mz_ore::cast::CastFrom;
46use mz_ore::collections::CollectionExt;
47use mz_persist_client::batch::ProtoBatch;
48use mz_repr::adt::array::ArrayDimension;
49use mz_repr::adt::interval::Interval;
50use mz_repr::adt::jsonb::Jsonb;
51use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
52use mz_repr::refresh_schedule::RefreshEvery;
53use mz_repr::role_id::RoleId;
54use mz_repr::{
55 CatalogItemId, Datum, Diff, GlobalId, ReprColumnType, Row, RowPacker, SqlScalarType, Timestamp,
56};
57use mz_sql::ast::{CreateIndexStatement, Statement, UnresolvedItemName};
58use mz_sql::catalog::{CatalogCluster, CatalogType, DefaultPrivilegeObject, TypeCategory};
59use mz_sql::func::FuncImplCatalogDetails;
60use mz_sql::names::{CommentObjectId, SchemaSpecifier};
61use mz_sql::plan::{ClusterSchedule, ConnectionDetails, SshKey};
62use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
63use mz_sql::session::vars::SessionVars;
64use mz_sql_parser::ast::display::AstDisplay;
65use mz_storage_client::client::TableData;
66use mz_storage_types::connections::KafkaConnection;
67use mz_storage_types::connections::aws::{AwsAuth, AwsConnection};
68use mz_storage_types::connections::inline::ReferencedConnection;
69use mz_storage_types::connections::string_or_secret::StringOrSecret;
70use mz_storage_types::sinks::{IcebergSinkConnection, KafkaSinkConnection, StorageSinkConnection};
71use mz_storage_types::sources::{
72 GenericSourceConnection, KafkaSourceConnection, PostgresSourceConnection, SourceConnection,
73};
74use smallvec::smallvec;
75
76use crate::active_compute_sink::ActiveSubscribe;
78use crate::catalog::CatalogState;
79use crate::coord::ConnMeta;
80
81#[derive(Debug, Clone)]
83pub struct BuiltinTableUpdate<T = CatalogItemId> {
84 pub id: T,
86 pub data: TableData,
88}
89
90impl<T> BuiltinTableUpdate<T> {
91 pub fn row(id: T, row: Row, diff: Diff) -> BuiltinTableUpdate<T> {
93 BuiltinTableUpdate {
94 id,
95 data: TableData::Rows(vec![(row, diff)]),
96 }
97 }
98
99 pub fn batch(id: T, batch: ProtoBatch) -> BuiltinTableUpdate<T> {
100 BuiltinTableUpdate {
101 id,
102 data: TableData::Batches(smallvec![batch]),
103 }
104 }
105}
106
107impl CatalogState {
108 pub fn resolve_builtin_table_updates(
109 &self,
110 builtin_table_update: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
111 ) -> Vec<BuiltinTableUpdate<CatalogItemId>> {
112 builtin_table_update
113 .into_iter()
114 .map(|builtin_table_update| self.resolve_builtin_table_update(builtin_table_update))
115 .collect()
116 }
117
118 pub fn resolve_builtin_table_update(
119 &self,
120 BuiltinTableUpdate { id, data }: BuiltinTableUpdate<&'static BuiltinTable>,
121 ) -> BuiltinTableUpdate<CatalogItemId> {
122 let id = self.resolve_builtin_table(id);
123 BuiltinTableUpdate { id, data }
124 }
125
126 pub fn pack_depends_update(
127 &self,
128 depender: CatalogItemId,
129 dependee: CatalogItemId,
130 diff: Diff,
131 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
132 let row = Row::pack_slice(&[
133 Datum::String(&depender.to_string()),
134 Datum::String(&dependee.to_string()),
135 ]);
136 BuiltinTableUpdate::row(&*MZ_OBJECT_DEPENDENCIES, row, diff)
137 }
138
139 pub(super) fn pack_role_auth_update(
140 &self,
141 id: RoleId,
142 diff: Diff,
143 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
144 let role_auth = self.get_role_auth(&id);
145 let role = self.get_role(&id);
146 BuiltinTableUpdate::row(
147 &*MZ_ROLE_AUTH,
148 Row::pack_slice(&[
149 Datum::String(&role_auth.role_id.to_string()),
150 Datum::UInt32(role.oid),
151 match &role_auth.password_hash {
152 Some(hash) => Datum::String(hash),
153 None => Datum::Null,
154 },
155 Datum::TimestampTz(
156 mz_ore::now::to_datetime(role_auth.updated_at)
157 .try_into()
158 .expect("must fit"),
159 ),
160 ]),
161 diff,
162 )
163 }
164
165 pub(super) fn pack_role_update(
166 &self,
167 id: RoleId,
168 diff: Diff,
169 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
170 match id {
171 RoleId::Public => vec![],
173 id => {
174 let role = self.get_role(&id);
175 let builtin_supers = [MZ_SYSTEM_ROLE_ID, MZ_SUPPORT_ROLE_ID];
176
177 let rolcanlogin = if let Some(login) = role.attributes.login {
178 login
179 } else {
180 builtin_supers.contains(&role.id)
181 };
182
183 let rolsuper = if let Some(superuser) = role.attributes.superuser {
184 Datum::from(superuser)
185 } else if builtin_supers.contains(&role.id) {
186 Datum::from(true)
188 } else {
189 Datum::Null
198 };
199
200 let role_update = BuiltinTableUpdate::row(
201 &*MZ_ROLES,
202 Row::pack_slice(&[
203 Datum::String(&role.id.to_string()),
204 Datum::UInt32(role.oid),
205 Datum::String(&role.name),
206 Datum::from(role.attributes.inherit),
207 Datum::from(rolcanlogin),
208 rolsuper,
209 ]),
210 diff,
211 );
212 let mut updates = vec![role_update];
213
214 let session_vars_reference = SessionVars::new_unchecked(
217 &mz_build_info::DUMMY_BUILD_INFO,
218 SYSTEM_USER.clone(),
219 None,
220 );
221
222 for (name, val) in role.vars() {
223 let result = session_vars_reference
224 .inspect(name)
225 .and_then(|var| var.check(val.borrow()));
226 let Ok(formatted_val) = result else {
227 tracing::error!(?name, ?val, ?result, "found invalid role default var");
230 continue;
231 };
232
233 let role_var_update = BuiltinTableUpdate::row(
234 &*MZ_ROLE_PARAMETERS,
235 Row::pack_slice(&[
236 Datum::String(&role.id.to_string()),
237 Datum::String(name),
238 Datum::String(&formatted_val),
239 ]),
240 diff,
241 );
242 updates.push(role_var_update);
243 }
244
245 updates
246 }
247 }
248 }
249
250 pub(super) fn pack_cluster_update(
251 &self,
252 name: &str,
253 diff: Diff,
254 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
255 let id = self.clusters_by_name[name];
256 let cluster = &self.clusters_by_id[&id];
257 let row = self.pack_privilege_array_row(cluster.privileges());
258 let privileges = row.unpack_first();
259 let (size, disk, replication_factor, azs, introspection_debugging, introspection_interval) =
260 match &cluster.config.variant {
261 ClusterVariant::Managed(config) => (
262 Some(config.size.as_str()),
263 Some(self.cluster_replica_size_has_disk(&config.size)),
264 Some(config.replication_factor),
265 if config.availability_zones.is_empty() {
266 None
267 } else {
268 Some(config.availability_zones.clone())
269 },
270 Some(config.logging.log_logging),
271 config.logging.interval.map(|d| {
272 Interval::from_duration(&d)
273 .expect("planning ensured this convertible back to interval")
274 }),
275 ),
276 ClusterVariant::Unmanaged => (None, None, None, None, None, None),
277 };
278
279 let mut row = Row::default();
280 let mut packer = row.packer();
281 packer.extend([
282 Datum::String(&id.to_string()),
283 Datum::String(name),
284 Datum::String(&cluster.owner_id.to_string()),
285 privileges,
286 cluster.is_managed().into(),
287 size.into(),
288 replication_factor.into(),
289 disk.into(),
290 ]);
291 if let Some(azs) = azs {
292 packer.push_list(azs.iter().map(|az| Datum::String(az)));
293 } else {
294 packer.push(Datum::Null);
295 }
296 packer.push(Datum::from(introspection_debugging));
297 packer.push(Datum::from(introspection_interval));
298
299 let mut updates = Vec::new();
300
301 updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTERS, row, diff));
302
303 if let ClusterVariant::Managed(managed_config) = &cluster.config.variant {
304 let row = match managed_config.schedule {
305 ClusterSchedule::Manual => Row::pack_slice(&[
306 Datum::String(&id.to_string()),
307 Datum::String("manual"),
308 Datum::Null,
309 ]),
310 ClusterSchedule::Refresh {
311 hydration_time_estimate,
312 } => Row::pack_slice(&[
313 Datum::String(&id.to_string()),
314 Datum::String("on-refresh"),
315 Datum::Interval(
316 Interval::from_duration(&hydration_time_estimate)
317 .expect("planning ensured that this is convertible back to Interval"),
318 ),
319 ]),
320 };
321 updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTER_SCHEDULES, row, diff));
322 }
323
324 updates
325 }
326
327 pub(super) fn pack_cluster_replica_update(
328 &self,
329 cluster_id: ClusterId,
330 name: &str,
331 diff: Diff,
332 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
333 let cluster = &self.clusters_by_id[&cluster_id];
334 let id = cluster.replica_id(name).expect("Must exist");
335 let replica = cluster.replica(id).expect("Must exist");
336
337 let (size, disk, az) = match &replica.config.location {
338 ReplicaLocation::Managed(ManagedReplicaLocation {
341 size,
342 availability_zones: ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
343 allocation: _,
344 billed_as: _,
345 internal: _,
346 pending: _,
347 }) => (
348 Some(&**size),
349 Some(self.cluster_replica_size_has_disk(size)),
350 Some(az.as_str()),
351 ),
352 ReplicaLocation::Managed(ManagedReplicaLocation {
353 size,
354 availability_zones: _,
355 allocation: _,
356 billed_as: _,
357 internal: _,
358 pending: _,
359 }) => (
360 Some(&**size),
361 Some(self.cluster_replica_size_has_disk(size)),
362 None,
363 ),
364 ReplicaLocation::Unmanaged(_) => (None, None, None),
365 };
366
367 let cluster_replica_update = BuiltinTableUpdate::row(
368 &*MZ_CLUSTER_REPLICAS,
369 Row::pack_slice(&[
370 Datum::String(&id.to_string()),
371 Datum::String(name),
372 Datum::String(&cluster_id.to_string()),
373 Datum::from(size),
374 Datum::from(az),
375 Datum::String(&replica.owner_id.to_string()),
376 Datum::from(disk),
377 ]),
378 diff,
379 );
380
381 vec![cluster_replica_update]
382 }
383
384 pub(super) fn pack_item_update(
385 &self,
386 id: CatalogItemId,
387 diff: Diff,
388 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
389 let entry = self.get_entry(&id);
390 let oid = entry.oid();
391 let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
392 let schema_id = &self
393 .get_schema(
394 &entry.name().qualifiers.database_spec,
395 &entry.name().qualifiers.schema_spec,
396 conn_id,
397 )
398 .id;
399 let name = &entry.name().item;
400 let owner_id = entry.owner_id();
401 let privileges_row = self.pack_privilege_array_row(entry.privileges());
402 let privileges = privileges_row.unpack_first();
403 let mut updates = match entry.item() {
404 CatalogItem::Index(index) => {
405 self.pack_index_update(id, oid, name, owner_id, index, diff)
406 }
407 CatalogItem::Table(table) => {
408 let mut updates = self
409 .pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);
410
411 if let TableDataSource::DataSource {
412 desc: data_source,
413 timeline: _,
414 } = &table.data_source
415 {
416 updates.extend(match data_source {
417 DataSourceDesc::IngestionExport {
418 ingestion_id,
419 external_reference: UnresolvedItemName(external_reference),
420 details: _,
421 data_config: _,
422 } => {
423 let ingestion_entry = self
424 .get_entry(ingestion_id)
425 .source_desc()
426 .expect("primary source exists")
427 .expect("primary source is a source");
428
429 match ingestion_entry.connection.name() {
430 "postgres" => {
431 mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
432 let schema_name = external_reference[1].as_str();
438 let table_name = external_reference[2].as_str();
439
440 self.pack_postgres_source_tables_update(
441 id,
442 schema_name,
443 table_name,
444 diff,
445 )
446 }
447 "mysql" => {
448 mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
449 let schema_name = external_reference[0].as_str();
450 let table_name = external_reference[1].as_str();
451
452 self.pack_mysql_source_tables_update(
453 id,
454 schema_name,
455 table_name,
456 diff,
457 )
458 }
459 "sql-server" => {
460 mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
461 let schema_name = external_reference[1].as_str();
466 let table_name = external_reference[2].as_str();
467
468 self.pack_sql_server_source_table_update(
469 id,
470 schema_name,
471 table_name,
472 diff,
473 )
474 }
475 "load-generator" => vec![],
478 "kafka" => {
479 mz_ore::soft_assert_eq_no_log!(external_reference.len(), 1);
480 let topic = external_reference[0].as_str();
481 let envelope = data_source.envelope();
482 let (key_format, value_format) = data_source.formats();
483
484 self.pack_kafka_source_tables_update(
485 id,
486 topic,
487 envelope,
488 key_format,
489 value_format,
490 diff,
491 )
492 }
493 s => unreachable!("{s} sources do not have tables"),
494 }
495 }
496 DataSourceDesc::Ingestion { .. }
497 | DataSourceDesc::OldSyntaxIngestion { .. }
498 | DataSourceDesc::Introspection(_)
499 | DataSourceDesc::Progress
500 | DataSourceDesc::Webhook { .. }
501 | DataSourceDesc::Catalog => vec![],
502 });
503 }
504
505 updates
506 }
507 CatalogItem::Source(source) => {
508 match &source.data_source {
509 DataSourceDesc::Ingestion { desc, .. }
510 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => match &desc.connection {
511 GenericSourceConnection::Postgres(postgres) => {
512 self.pack_postgres_source_update(id, postgres, diff)
513 }
514 GenericSourceConnection::Kafka(kafka) => {
515 self.pack_kafka_source_update(id, source.global_id(), kafka, diff)
516 }
517 _ => vec![],
518 },
519 DataSourceDesc::IngestionExport {
520 ingestion_id,
521 external_reference: UnresolvedItemName(external_reference),
522 details: _,
523 data_config: _,
524 } => {
525 let ingestion_entry = self
526 .get_entry(ingestion_id)
527 .source_desc()
528 .expect("primary source exists")
529 .expect("primary source is a source");
530
531 match ingestion_entry.connection.name() {
532 "postgres" => {
533 mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
534 let schema_name = external_reference[1].as_str();
540 let table_name = external_reference[2].as_str();
541
542 self.pack_postgres_source_tables_update(
543 id,
544 schema_name,
545 table_name,
546 diff,
547 )
548 }
549 "mysql" => {
550 mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
551 let schema_name = external_reference[0].as_str();
552 let table_name = external_reference[1].as_str();
553
554 self.pack_mysql_source_tables_update(
555 id,
556 schema_name,
557 table_name,
558 diff,
559 )
560 }
561 "sql-server" => {
562 mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
563 let schema_name = external_reference[1].as_str();
568 let table_name = external_reference[2].as_str();
569
570 self.pack_sql_server_source_table_update(
571 id,
572 schema_name,
573 table_name,
574 diff,
575 )
576 }
577 "load-generator" => vec![],
580 s => unreachable!("{s} sources do not have subsources"),
581 }
582 }
583 DataSourceDesc::Webhook { .. } => {
584 vec![self.pack_webhook_source_update(id, diff)]
585 }
586 DataSourceDesc::Introspection(_)
587 | DataSourceDesc::Progress
588 | DataSourceDesc::Catalog => vec![],
589 }
590 }
591 CatalogItem::View(view) => {
592 self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
593 }
594 CatalogItem::MaterializedView(mview) => {
595 self.pack_materialized_view_update(id, mview, diff)
596 }
597 CatalogItem::Sink(sink) => {
598 self.pack_sink_update(id, oid, schema_id, name, owner_id, sink, diff)
599 }
600 CatalogItem::Type(ty) => {
601 self.pack_type_update(id, oid, schema_id, name, owner_id, privileges, ty, diff)
602 }
603 CatalogItem::Func(func) => {
604 self.pack_func_update(id, schema_id, name, owner_id, func, diff)
605 }
606 CatalogItem::Log(_) | CatalogItem::Secret(_) => vec![],
607 CatalogItem::Connection(connection) => {
608 self.pack_connection_update(id, connection, diff)
609 }
610 };
611
612 if !entry.item().is_temporary() {
613 for dependee in entry.item().references().items() {
616 updates.push(self.pack_depends_update(id, *dependee, diff))
617 }
618 }
619
620 if let Some(desc) = entry.relation_desc_latest() {
622 let defaults = match entry.item() {
623 CatalogItem::Table(Table {
624 data_source: TableDataSource::TableWrites { defaults },
625 ..
626 }) => Some(defaults),
627 _ => None,
628 };
629 for (i, (column_name, column_type)) in desc.iter().enumerate() {
630 let default: Option<String> = defaults.map(|d| d[i].to_ast_string_stable());
631 let default: Datum = default
632 .as_ref()
633 .map(|d| Datum::String(d))
634 .unwrap_or(Datum::Null);
635 let pgtype = mz_pgrepr::Type::from(&column_type.scalar_type);
636 let (type_name, type_oid) = match &column_type.scalar_type {
637 SqlScalarType::List {
638 custom_id: Some(custom_id),
639 ..
640 }
641 | SqlScalarType::Map {
642 custom_id: Some(custom_id),
643 ..
644 }
645 | SqlScalarType::Record {
646 custom_id: Some(custom_id),
647 ..
648 } => {
649 let entry = self.get_entry(custom_id);
650 let name = &*entry.name().item;
665 let oid = entry.oid();
666 (name, oid)
667 }
668 _ => (pgtype.name(), pgtype.oid()),
669 };
670 updates.push(BuiltinTableUpdate::row(
671 &*MZ_COLUMNS,
672 Row::pack_slice(&[
673 Datum::String(&id.to_string()),
674 Datum::String(column_name),
675 Datum::UInt64(u64::cast_from(i + 1)),
676 Datum::from(column_type.nullable),
677 Datum::String(type_name),
678 default,
679 Datum::UInt32(type_oid),
680 Datum::Int32(pgtype.typmod()),
681 ]),
682 diff,
683 ));
684 }
685 }
686
687 if let Some(cw) = entry.item().initial_logical_compaction_window() {
689 updates.push(self.pack_history_retention_strategy_update(id, cw, diff));
690 }
691
692 updates.extend(Self::pack_item_global_id_update(entry, diff));
693
694 updates
695 }
696
697 fn pack_item_global_id_update(
698 entry: &CatalogEntry,
699 diff: Diff,
700 ) -> impl Iterator<Item = BuiltinTableUpdate<&'static BuiltinTable>> + use<'_> {
701 let id = entry.id().to_string();
702 let global_ids = entry.global_ids();
703 global_ids.map(move |global_id| {
704 BuiltinTableUpdate::row(
705 &*MZ_OBJECT_GLOBAL_IDS,
706 Row::pack_slice(&[Datum::String(&id), Datum::String(&global_id.to_string())]),
707 diff,
708 )
709 })
710 }
711
712 fn pack_history_retention_strategy_update(
713 &self,
714 id: CatalogItemId,
715 cw: CompactionWindow,
716 diff: Diff,
717 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
718 let cw: u64 = cw.comparable_timestamp().into();
719 let cw = Jsonb::from_serde_json(serde_json::Value::Number(serde_json::Number::from(cw)))
720 .expect("must serialize");
721 BuiltinTableUpdate::row(
722 &*MZ_HISTORY_RETENTION_STRATEGIES,
723 Row::pack_slice(&[
724 Datum::String(&id.to_string()),
725 Datum::String("FOR"),
727 cw.into_row().into_element(),
728 ]),
729 diff,
730 )
731 }
732
733 fn pack_table_update(
734 &self,
735 id: CatalogItemId,
736 oid: u32,
737 schema_id: &SchemaSpecifier,
738 name: &str,
739 owner_id: &RoleId,
740 privileges: Datum,
741 diff: Diff,
742 table: &Table,
743 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
744 let redacted = table.create_sql.as_ref().map(|create_sql| {
745 mz_sql::parse::parse(create_sql)
746 .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
747 .into_element()
748 .ast
749 .to_ast_string_redacted()
750 });
751 let source_id = if let TableDataSource::DataSource {
752 desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
753 ..
754 } = &table.data_source
755 {
756 Some(ingestion_id.to_string())
757 } else {
758 None
759 };
760
761 vec![BuiltinTableUpdate::row(
762 &*MZ_TABLES,
763 Row::pack_slice(&[
764 Datum::String(&id.to_string()),
765 Datum::UInt32(oid),
766 Datum::String(&schema_id.to_string()),
767 Datum::String(name),
768 Datum::String(&owner_id.to_string()),
769 privileges,
770 if let Some(create_sql) = &table.create_sql {
771 Datum::String(create_sql)
772 } else {
773 Datum::Null
774 },
775 if let Some(redacted) = &redacted {
776 Datum::String(redacted)
777 } else {
778 Datum::Null
779 },
780 if let Some(source_id) = source_id.as_ref() {
781 Datum::String(source_id)
782 } else {
783 Datum::Null
784 },
785 ]),
786 diff,
787 )]
788 }
789
790 fn pack_postgres_source_update(
791 &self,
792 id: CatalogItemId,
793 postgres: &PostgresSourceConnection<ReferencedConnection>,
794 diff: Diff,
795 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
796 vec![BuiltinTableUpdate::row(
797 &*MZ_POSTGRES_SOURCES,
798 Row::pack_slice(&[
799 Datum::String(&id.to_string()),
800 Datum::String(&postgres.publication_details.slot),
801 Datum::from(postgres.publication_details.timeline_id),
802 ]),
803 diff,
804 )]
805 }
806
807 fn pack_kafka_source_update(
808 &self,
809 item_id: CatalogItemId,
810 collection_id: GlobalId,
811 kafka: &KafkaSourceConnection<ReferencedConnection>,
812 diff: Diff,
813 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
814 vec![BuiltinTableUpdate::row(
815 &*MZ_KAFKA_SOURCES,
816 Row::pack_slice(&[
817 Datum::String(&item_id.to_string()),
818 Datum::String(&kafka.group_id(&self.config.connection_context, collection_id)),
819 Datum::String(&kafka.topic),
820 ]),
821 diff,
822 )]
823 }
824
825 fn pack_postgres_source_tables_update(
826 &self,
827 id: CatalogItemId,
828 schema_name: &str,
829 table_name: &str,
830 diff: Diff,
831 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
832 vec![BuiltinTableUpdate::row(
833 &*MZ_POSTGRES_SOURCE_TABLES,
834 Row::pack_slice(&[
835 Datum::String(&id.to_string()),
836 Datum::String(schema_name),
837 Datum::String(table_name),
838 ]),
839 diff,
840 )]
841 }
842
843 fn pack_mysql_source_tables_update(
844 &self,
845 id: CatalogItemId,
846 schema_name: &str,
847 table_name: &str,
848 diff: Diff,
849 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
850 vec![BuiltinTableUpdate::row(
851 &*MZ_MYSQL_SOURCE_TABLES,
852 Row::pack_slice(&[
853 Datum::String(&id.to_string()),
854 Datum::String(schema_name),
855 Datum::String(table_name),
856 ]),
857 diff,
858 )]
859 }
860
861 fn pack_sql_server_source_table_update(
862 &self,
863 id: CatalogItemId,
864 schema_name: &str,
865 table_name: &str,
866 diff: Diff,
867 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
868 vec![BuiltinTableUpdate::row(
869 &*MZ_SQL_SERVER_SOURCE_TABLES,
870 Row::pack_slice(&[
871 Datum::String(&id.to_string()),
872 Datum::String(schema_name),
873 Datum::String(table_name),
874 ]),
875 diff,
876 )]
877 }
878
879 fn pack_kafka_source_tables_update(
880 &self,
881 id: CatalogItemId,
882 topic: &str,
883 envelope: Option<&str>,
884 key_format: Option<&str>,
885 value_format: Option<&str>,
886 diff: Diff,
887 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
888 vec![BuiltinTableUpdate::row(
889 &*MZ_KAFKA_SOURCE_TABLES,
890 Row::pack_slice(&[
891 Datum::String(&id.to_string()),
892 Datum::String(topic),
893 Datum::from(envelope),
894 Datum::from(key_format),
895 Datum::from(value_format),
896 ]),
897 diff,
898 )]
899 }
900
901 fn pack_connection_update(
902 &self,
903 id: CatalogItemId,
904 connection: &Connection,
905 diff: Diff,
906 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
907 let mut updates = vec![];
908 match connection.details {
909 ConnectionDetails::Kafka(ref kafka) => {
910 updates.extend(self.pack_kafka_connection_update(id, kafka, diff));
911 }
912 ConnectionDetails::Aws(ref aws_config) => {
913 match self.pack_aws_connection_update(id, aws_config, diff) {
914 Ok(update) => {
915 updates.push(update);
916 }
917 Err(e) => {
918 tracing::error!(%id, %e, "failed writing row to mz_aws_connections table");
919 }
920 }
921 }
922 ConnectionDetails::AwsPrivatelink(_) => {
923 if let Some(aws_principal_context) = self.aws_principal_context.as_ref() {
924 updates.push(self.pack_aws_privatelink_connection_update(
925 id,
926 aws_principal_context,
927 diff,
928 ));
929 } else {
930 tracing::error!(%id, "missing AWS principal context; cannot write row to mz_aws_privatelink_connections table");
931 }
932 }
933 ConnectionDetails::Ssh {
934 ref key_1,
935 ref key_2,
936 ..
937 } => {
938 updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff));
939 }
940 ConnectionDetails::Csr(_)
941 | ConnectionDetails::Postgres(_)
942 | ConnectionDetails::MySql(_)
943 | ConnectionDetails::SqlServer(_)
944 | ConnectionDetails::IcebergCatalog(_) => (),
945 };
946 updates
947 }
948
949 pub(crate) fn pack_ssh_tunnel_connection_update(
950 &self,
951 id: CatalogItemId,
952 key_1: &SshKey,
953 key_2: &SshKey,
954 diff: Diff,
955 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
956 BuiltinTableUpdate::row(
957 &*MZ_SSH_TUNNEL_CONNECTIONS,
958 Row::pack_slice(&[
959 Datum::String(&id.to_string()),
960 Datum::String(key_1.public_key().as_str()),
961 Datum::String(key_2.public_key().as_str()),
962 ]),
963 diff,
964 )
965 }
966
967 fn pack_kafka_connection_update(
968 &self,
969 id: CatalogItemId,
970 kafka: &KafkaConnection<ReferencedConnection>,
971 diff: Diff,
972 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
973 let progress_topic = kafka.progress_topic(&self.config.connection_context, id);
974 let mut row = Row::default();
975 row.packer()
976 .try_push_array(
977 &[ArrayDimension {
978 lower_bound: 1,
979 length: kafka.brokers.len(),
980 }],
981 kafka
982 .brokers
983 .iter()
984 .map(|broker| Datum::String(&broker.address)),
985 )
986 .expect("kafka.brokers is 1 dimensional, and its length is used for the array length");
987 let brokers = row.unpack_first();
988 vec![BuiltinTableUpdate::row(
989 &*MZ_KAFKA_CONNECTIONS,
990 Row::pack_slice(&[
991 Datum::String(&id.to_string()),
992 brokers,
993 Datum::String(&progress_topic),
994 ]),
995 diff,
996 )]
997 }
998
999 pub fn pack_aws_privatelink_connection_update(
1000 &self,
1001 connection_id: CatalogItemId,
1002 aws_principal_context: &AwsPrincipalContext,
1003 diff: Diff,
1004 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1005 let id = &MZ_AWS_PRIVATELINK_CONNECTIONS;
1006 let row = Row::pack_slice(&[
1007 Datum::String(&connection_id.to_string()),
1008 Datum::String(&aws_principal_context.to_principal_string(connection_id)),
1009 ]);
1010 BuiltinTableUpdate::row(id, row, diff)
1011 }
1012
1013 pub fn pack_aws_connection_update(
1014 &self,
1015 connection_id: CatalogItemId,
1016 aws_config: &AwsConnection,
1017 diff: Diff,
1018 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, anyhow::Error> {
1019 let id = &MZ_AWS_CONNECTIONS;
1020
1021 let mut access_key_id = None;
1022 let mut access_key_id_secret_id = None;
1023 let mut secret_access_key_secret_id = None;
1024 let mut session_token = None;
1025 let mut session_token_secret_id = None;
1026 let mut assume_role_arn = None;
1027 let mut assume_role_session_name = None;
1028 let mut principal = None;
1029 let mut external_id = None;
1030 let mut example_trust_policy = None;
1031 match &aws_config.auth {
1032 AwsAuth::Credentials(credentials) => {
1033 match &credentials.access_key_id {
1034 StringOrSecret::String(s) => access_key_id = Some(s.as_str()),
1035 StringOrSecret::Secret(s) => access_key_id_secret_id = Some(s.to_string()),
1036 }
1037 secret_access_key_secret_id = Some(credentials.secret_access_key.to_string());
1038 match credentials.session_token.as_ref() {
1039 None => (),
1040 Some(StringOrSecret::String(s)) => session_token = Some(s.as_str()),
1041 Some(StringOrSecret::Secret(s)) => {
1042 session_token_secret_id = Some(s.to_string())
1043 }
1044 }
1045 }
1046 AwsAuth::AssumeRole(assume_role) => {
1047 assume_role_arn = Some(assume_role.arn.as_str());
1048 assume_role_session_name = assume_role.session_name.as_deref();
1049 principal = self
1050 .config
1051 .connection_context
1052 .aws_connection_role_arn
1053 .as_deref();
1054 external_id =
1055 Some(assume_role.external_id(&self.config.connection_context, connection_id)?);
1056 example_trust_policy = {
1057 let policy = assume_role
1058 .example_trust_policy(&self.config.connection_context, connection_id)?;
1059 let policy = Jsonb::from_serde_json(policy).expect("valid json");
1060 Some(policy.into_row())
1061 };
1062 }
1063 }
1064
1065 let row = Row::pack_slice(&[
1066 Datum::String(&connection_id.to_string()),
1067 Datum::from(aws_config.endpoint.as_deref()),
1068 Datum::from(aws_config.region.as_deref()),
1069 Datum::from(access_key_id),
1070 Datum::from(access_key_id_secret_id.as_deref()),
1071 Datum::from(secret_access_key_secret_id.as_deref()),
1072 Datum::from(session_token),
1073 Datum::from(session_token_secret_id.as_deref()),
1074 Datum::from(assume_role_arn),
1075 Datum::from(assume_role_session_name),
1076 Datum::from(principal),
1077 Datum::from(external_id.as_deref()),
1078 Datum::from(example_trust_policy.as_ref().map(|p| p.into_element())),
1079 ]);
1080
1081 Ok(BuiltinTableUpdate::row(id, row, diff))
1082 }
1083
1084 fn pack_view_update(
1085 &self,
1086 id: CatalogItemId,
1087 oid: u32,
1088 schema_id: &SchemaSpecifier,
1089 name: &str,
1090 owner_id: &RoleId,
1091 privileges: Datum,
1092 view: &View,
1093 diff: Diff,
1094 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1095 let create_stmt = mz_sql::parse::parse(&view.create_sql)
1096 .unwrap_or_else(|e| {
1097 panic!(
1098 "create_sql cannot be invalid: `{}` --- error: `{}`",
1099 view.create_sql, e
1100 )
1101 })
1102 .into_element()
1103 .ast;
1104 let query = match &create_stmt {
1105 Statement::CreateView(stmt) => &stmt.definition.query,
1106 _ => unreachable!(),
1107 };
1108
1109 let mut query_string = query.to_ast_string_stable();
1110 query_string.push(';');
1113
1114 vec![BuiltinTableUpdate::row(
1115 &*MZ_VIEWS,
1116 Row::pack_slice(&[
1117 Datum::String(&id.to_string()),
1118 Datum::UInt32(oid),
1119 Datum::String(&schema_id.to_string()),
1120 Datum::String(name),
1121 Datum::String(&query_string),
1122 Datum::String(&owner_id.to_string()),
1123 privileges,
1124 Datum::String(&view.create_sql),
1125 Datum::String(&create_stmt.to_ast_string_redacted()),
1126 ]),
1127 diff,
1128 )]
1129 }
1130
1131 fn pack_materialized_view_update(
1132 &self,
1133 id: CatalogItemId,
1134 mview: &MaterializedView,
1135 diff: Diff,
1136 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1137 let mut updates = Vec::new();
1138
1139 if let Some(refresh_schedule) = &mview.refresh_schedule {
1140 assert!(!refresh_schedule.is_empty());
1143 for RefreshEvery {
1144 interval,
1145 aligned_to,
1146 } in refresh_schedule.everies.iter()
1147 {
1148 let aligned_to_dt = mz_ore::now::to_datetime(
1149 <&Timestamp as TryInto<u64>>::try_into(aligned_to).expect("undoes planning"),
1150 );
1151 updates.push(BuiltinTableUpdate::row(
1152 &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1153 Row::pack_slice(&[
1154 Datum::String(&id.to_string()),
1155 Datum::String("every"),
1156 Datum::Interval(
1157 Interval::from_duration(interval).expect(
1158 "planning ensured that this is convertible back to Interval",
1159 ),
1160 ),
1161 Datum::TimestampTz(aligned_to_dt.try_into().expect("undoes planning")),
1162 Datum::Null,
1163 ]),
1164 diff,
1165 ));
1166 }
1167 for at in refresh_schedule.ats.iter() {
1168 let at_dt = mz_ore::now::to_datetime(
1169 <&Timestamp as TryInto<u64>>::try_into(at).expect("undoes planning"),
1170 );
1171 updates.push(BuiltinTableUpdate::row(
1172 &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1173 Row::pack_slice(&[
1174 Datum::String(&id.to_string()),
1175 Datum::String("at"),
1176 Datum::Null,
1177 Datum::Null,
1178 Datum::TimestampTz(at_dt.try_into().expect("undoes planning")),
1179 ]),
1180 diff,
1181 ));
1182 }
1183 } else {
1184 updates.push(BuiltinTableUpdate::row(
1185 &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1186 Row::pack_slice(&[
1187 Datum::String(&id.to_string()),
1188 Datum::String("on-commit"),
1189 Datum::Null,
1190 Datum::Null,
1191 Datum::Null,
1192 ]),
1193 diff,
1194 ));
1195 }
1196
1197 if let Some(target_id) = mview.replacement_target {
1198 updates.push(BuiltinTableUpdate::row(
1199 &*MZ_REPLACEMENTS,
1200 Row::pack_slice(&[
1201 Datum::String(&id.to_string()),
1202 Datum::String(&target_id.to_string()),
1203 ]),
1204 diff,
1205 ));
1206 }
1207
1208 updates
1209 }
1210
1211 fn pack_sink_update(
1212 &self,
1213 id: CatalogItemId,
1214 oid: u32,
1215 schema_id: &SchemaSpecifier,
1216 name: &str,
1217 owner_id: &RoleId,
1218 sink: &Sink,
1219 diff: Diff,
1220 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1221 let mut updates = vec![];
1222 match &sink.connection {
1223 StorageSinkConnection::Kafka(KafkaSinkConnection {
1224 topic: topic_name, ..
1225 }) => {
1226 updates.push(BuiltinTableUpdate::row(
1227 &*MZ_KAFKA_SINKS,
1228 Row::pack_slice(&[
1229 Datum::String(&id.to_string()),
1230 Datum::String(topic_name.as_str()),
1231 ]),
1232 diff,
1233 ));
1234 }
1235 StorageSinkConnection::Iceberg(IcebergSinkConnection {
1236 namespace, table, ..
1237 }) => {
1238 updates.push(BuiltinTableUpdate::row(
1239 &*MZ_ICEBERG_SINKS,
1240 Row::pack_slice(&[
1241 Datum::String(&id.to_string()),
1242 Datum::String(namespace.as_str()),
1243 Datum::String(table.as_str()),
1244 ]),
1245 diff,
1246 ));
1247 }
1248 };
1249
1250 let create_stmt = mz_sql::parse::parse(&sink.create_sql)
1251 .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", sink.create_sql))
1252 .into_element()
1253 .ast;
1254
1255 let envelope = sink.envelope();
1256
1257 let combined_format = sink.combined_format();
1259 let (key_format, value_format) = match sink.formats() {
1260 Some((key_format, value_format)) => (key_format, Some(value_format)),
1261 None => (None, None),
1262 };
1263
1264 updates.push(BuiltinTableUpdate::row(
1265 &*MZ_SINKS,
1266 Row::pack_slice(&[
1267 Datum::String(&id.to_string()),
1268 Datum::UInt32(oid),
1269 Datum::String(&schema_id.to_string()),
1270 Datum::String(name),
1271 Datum::String(sink.connection.name()),
1272 Datum::from(sink.connection_id().map(|id| id.to_string()).as_deref()),
1273 Datum::Null,
1275 Datum::from(envelope),
1276 Datum::from(combined_format.as_ref().map(|f| f.as_ref())),
1279 Datum::from(key_format),
1280 Datum::from(value_format),
1281 Datum::String(&sink.cluster_id.to_string()),
1282 Datum::String(&owner_id.to_string()),
1283 Datum::String(&sink.create_sql),
1284 Datum::String(&create_stmt.to_ast_string_redacted()),
1285 ]),
1286 diff,
1287 ));
1288
1289 updates
1290 }
1291
1292 fn pack_index_update(
1293 &self,
1294 id: CatalogItemId,
1295 oid: u32,
1296 name: &str,
1297 owner_id: &RoleId,
1298 index: &Index,
1299 diff: Diff,
1300 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1301 let mut updates = vec![];
1302
1303 let create_stmt = mz_sql::parse::parse(&index.create_sql)
1304 .unwrap_or_else(|e| {
1305 panic!(
1306 "create_sql cannot be invalid: `{}` --- error: `{}`",
1307 index.create_sql, e
1308 )
1309 })
1310 .into_element()
1311 .ast;
1312
1313 let key_sqls = match &create_stmt {
1314 Statement::CreateIndex(CreateIndexStatement { key_parts, .. }) => key_parts
1315 .as_ref()
1316 .expect("key_parts is filled in during planning"),
1317 _ => unreachable!(),
1318 };
1319 let on_item_id = self.get_entry_by_global_id(&index.on).id();
1320
1321 updates.push(BuiltinTableUpdate::row(
1322 &*MZ_INDEXES,
1323 Row::pack_slice(&[
1324 Datum::String(&id.to_string()),
1325 Datum::UInt32(oid),
1326 Datum::String(name),
1327 Datum::String(&on_item_id.to_string()),
1328 Datum::String(&index.cluster_id.to_string()),
1329 Datum::String(&owner_id.to_string()),
1330 Datum::String(&index.create_sql),
1331 Datum::String(&create_stmt.to_ast_string_redacted()),
1332 ]),
1333 diff,
1334 ));
1335
1336 let on_entry = self.get_entry_by_global_id(&index.on);
1337 let on_desc = on_entry
1338 .relation_desc()
1339 .expect("can only create indexes on items with a valid description");
1340 let repr_col_types: Vec<ReprColumnType> = on_desc
1341 .typ()
1342 .column_types
1343 .iter()
1344 .map(ReprColumnType::from)
1345 .collect();
1346 for (i, key) in index.keys.iter().enumerate() {
1347 let nullable = key.typ(&repr_col_types).nullable;
1348 let seq_in_index = u64::cast_from(i + 1);
1349 let key_sql = key_sqls
1350 .get(i)
1351 .expect("missing sql information for index key")
1352 .to_ast_string_simple();
1353 let (field_number, expression) = match key {
1354 MirScalarExpr::Column(col, _) => {
1355 (Datum::UInt64(u64::cast_from(*col + 1)), Datum::Null)
1356 }
1357 _ => (Datum::Null, Datum::String(&key_sql)),
1358 };
1359 updates.push(BuiltinTableUpdate::row(
1360 &*MZ_INDEX_COLUMNS,
1361 Row::pack_slice(&[
1362 Datum::String(&id.to_string()),
1363 Datum::UInt64(seq_in_index),
1364 field_number,
1365 expression,
1366 Datum::from(nullable),
1367 ]),
1368 diff,
1369 ));
1370 }
1371
1372 updates
1373 }
1374
1375 fn pack_type_update(
1376 &self,
1377 id: CatalogItemId,
1378 oid: u32,
1379 schema_id: &SchemaSpecifier,
1380 name: &str,
1381 owner_id: &RoleId,
1382 privileges: Datum,
1383 typ: &Type,
1384 diff: Diff,
1385 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1386 let mut out = vec![];
1387
1388 let redacted = typ.create_sql.as_ref().map(|create_sql| {
1389 mz_sql::parse::parse(create_sql)
1390 .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
1391 .into_element()
1392 .ast
1393 .to_ast_string_redacted()
1394 });
1395
1396 out.push(BuiltinTableUpdate::row(
1397 &*MZ_TYPES,
1398 Row::pack_slice(&[
1399 Datum::String(&id.to_string()),
1400 Datum::UInt32(oid),
1401 Datum::String(&schema_id.to_string()),
1402 Datum::String(name),
1403 Datum::String(&TypeCategory::from_catalog_type(&typ.details.typ).to_string()),
1404 Datum::String(&owner_id.to_string()),
1405 privileges,
1406 if let Some(create_sql) = &typ.create_sql {
1407 Datum::String(create_sql)
1408 } else {
1409 Datum::Null
1410 },
1411 if let Some(redacted) = &redacted {
1412 Datum::String(redacted)
1413 } else {
1414 Datum::Null
1415 },
1416 ]),
1417 diff,
1418 ));
1419
1420 let mut row = Row::default();
1421 let mut packer = row.packer();
1422
1423 fn append_modifier(packer: &mut RowPacker<'_>, mods: &[i64]) {
1424 if mods.is_empty() {
1425 packer.push(Datum::Null);
1426 } else {
1427 packer.push_list(mods.iter().map(|m| Datum::Int64(*m)));
1428 }
1429 }
1430
1431 let index_id = match &typ.details.typ {
1432 CatalogType::Array {
1433 element_reference: element_id,
1434 } => {
1435 packer.push(Datum::String(&id.to_string()));
1436 packer.push(Datum::String(&element_id.to_string()));
1437 &MZ_ARRAY_TYPES
1438 }
1439 CatalogType::List {
1440 element_reference: element_id,
1441 element_modifiers,
1442 } => {
1443 packer.push(Datum::String(&id.to_string()));
1444 packer.push(Datum::String(&element_id.to_string()));
1445 append_modifier(&mut packer, element_modifiers);
1446 &MZ_LIST_TYPES
1447 }
1448 CatalogType::Map {
1449 key_reference: key_id,
1450 value_reference: value_id,
1451 key_modifiers,
1452 value_modifiers,
1453 } => {
1454 packer.push(Datum::String(&id.to_string()));
1455 packer.push(Datum::String(&key_id.to_string()));
1456 packer.push(Datum::String(&value_id.to_string()));
1457 append_modifier(&mut packer, key_modifiers);
1458 append_modifier(&mut packer, value_modifiers);
1459 &MZ_MAP_TYPES
1460 }
1461 CatalogType::Pseudo => {
1462 packer.push(Datum::String(&id.to_string()));
1463 &MZ_PSEUDO_TYPES
1464 }
1465 _ => {
1466 packer.push(Datum::String(&id.to_string()));
1467 &MZ_BASE_TYPES
1468 }
1469 };
1470 out.push(BuiltinTableUpdate::row(index_id, row, diff));
1471
1472 if let Some(pg_metadata) = &typ.details.pg_metadata {
1473 out.push(BuiltinTableUpdate::row(
1474 &*MZ_TYPE_PG_METADATA,
1475 Row::pack_slice(&[
1476 Datum::String(&id.to_string()),
1477 Datum::UInt32(pg_metadata.typinput_oid),
1478 Datum::UInt32(pg_metadata.typreceive_oid),
1479 ]),
1480 diff,
1481 ));
1482 }
1483
1484 out
1485 }
1486
1487 fn pack_func_update(
1488 &self,
1489 id: CatalogItemId,
1490 schema_id: &SchemaSpecifier,
1491 name: &str,
1492 owner_id: &RoleId,
1493 func: &Func,
1494 diff: Diff,
1495 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1496 let mut updates = vec![];
1497 for func_impl_details in func.inner.func_impls() {
1498 let arg_type_ids = func_impl_details
1499 .arg_typs
1500 .iter()
1501 .map(|typ| self.get_system_type(typ).id().to_string())
1502 .collect::<Vec<_>>();
1503
1504 let mut row = Row::default();
1505 row.packer()
1506 .try_push_array(
1507 &[ArrayDimension {
1508 lower_bound: 1,
1509 length: arg_type_ids.len(),
1510 }],
1511 arg_type_ids.iter().map(|id| Datum::String(id)),
1512 )
1513 .expect(
1514 "arg_type_ids is 1 dimensional, and its length is used for the array length",
1515 );
1516 let arg_type_ids = row.unpack_first();
1517
1518 updates.push(BuiltinTableUpdate::row(
1519 &*MZ_FUNCTIONS,
1520 Row::pack_slice(&[
1521 Datum::String(&id.to_string()),
1522 Datum::UInt32(func_impl_details.oid),
1523 Datum::String(&schema_id.to_string()),
1524 Datum::String(name),
1525 arg_type_ids,
1526 Datum::from(
1527 func_impl_details
1528 .variadic_typ
1529 .map(|typ| self.get_system_type(typ).id().to_string())
1530 .as_deref(),
1531 ),
1532 Datum::from(
1533 func_impl_details
1534 .return_typ
1535 .map(|typ| self.get_system_type(typ).id().to_string())
1536 .as_deref(),
1537 ),
1538 func_impl_details.return_is_set.into(),
1539 Datum::String(&owner_id.to_string()),
1540 ]),
1541 diff,
1542 ));
1543
1544 if let mz_sql::func::Func::Aggregate(_) = func.inner {
1545 updates.push(BuiltinTableUpdate::row(
1546 &*MZ_AGGREGATES,
1547 Row::pack_slice(&[
1548 Datum::UInt32(func_impl_details.oid),
1549 Datum::String("n"),
1551 Datum::Int16(0),
1552 ]),
1553 diff,
1554 ));
1555 }
1556 }
1557 updates
1558 }
1559
1560 pub fn pack_op_update(
1561 &self,
1562 operator: &str,
1563 func_impl_details: FuncImplCatalogDetails,
1564 diff: Diff,
1565 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1566 let arg_type_ids = func_impl_details
1567 .arg_typs
1568 .iter()
1569 .map(|typ| self.get_system_type(typ).id().to_string())
1570 .collect::<Vec<_>>();
1571
1572 let mut row = Row::default();
1573 row.packer()
1574 .try_push_array(
1575 &[ArrayDimension {
1576 lower_bound: 1,
1577 length: arg_type_ids.len(),
1578 }],
1579 arg_type_ids.iter().map(|id| Datum::String(id)),
1580 )
1581 .expect("arg_type_ids is 1 dimensional, and its length is used for the array length");
1582 let arg_type_ids = row.unpack_first();
1583
1584 BuiltinTableUpdate::row(
1585 &*MZ_OPERATORS,
1586 Row::pack_slice(&[
1587 Datum::UInt32(func_impl_details.oid),
1588 Datum::String(operator),
1589 arg_type_ids,
1590 Datum::from(
1591 func_impl_details
1592 .return_typ
1593 .map(|typ| self.get_system_type(typ).id().to_string())
1594 .as_deref(),
1595 ),
1596 ]),
1597 diff,
1598 )
1599 }
1600
1601 pub fn pack_audit_log_update(
1602 &self,
1603 event: &VersionedEvent,
1604 diff: Diff,
1605 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1606 let (event_type, object_type, details, user, occurred_at): (
1607 &EventType,
1608 &ObjectType,
1609 &EventDetails,
1610 &Option<String>,
1611 u64,
1612 ) = match event {
1613 VersionedEvent::V1(ev) => (
1614 &ev.event_type,
1615 &ev.object_type,
1616 &ev.details,
1617 &ev.user,
1618 ev.occurred_at,
1619 ),
1620 };
1621 let details = Jsonb::from_serde_json(details.as_json())
1622 .map_err(|e| {
1623 Error::new(ErrorKind::Unstructured(format!(
1624 "could not pack audit log update: {}",
1625 e
1626 )))
1627 })?
1628 .into_row();
1629 let details = details
1630 .iter()
1631 .next()
1632 .expect("details created above with a single jsonb column");
1633 let dt = mz_ore::now::to_datetime(occurred_at);
1634 let id = event.sortable_id();
1635 Ok(BuiltinTableUpdate::row(
1636 &*MZ_AUDIT_EVENTS,
1637 Row::pack_slice(&[
1638 Datum::UInt64(id),
1639 Datum::String(&format!("{}", event_type)),
1640 Datum::String(&format!("{}", object_type)),
1641 details,
1642 match user {
1643 Some(user) => Datum::String(user),
1644 None => Datum::Null,
1645 },
1646 Datum::TimestampTz(dt.try_into().expect("must fit")),
1647 ]),
1648 diff,
1649 ))
1650 }
1651
1652 pub fn pack_storage_usage_update(
1653 &self,
1654 VersionedStorageUsage::V1(event): VersionedStorageUsage,
1655 diff: Diff,
1656 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1657 let id = &MZ_STORAGE_USAGE_BY_SHARD;
1658 let row = Row::pack_slice(&[
1659 Datum::UInt64(event.id),
1660 Datum::from(event.shard_id.as_deref()),
1661 Datum::UInt64(event.size_bytes),
1662 Datum::TimestampTz(
1663 mz_ore::now::to_datetime(event.collection_timestamp)
1664 .try_into()
1665 .expect("must fit"),
1666 ),
1667 ]);
1668 BuiltinTableUpdate::row(id, row, diff)
1669 }
1670
1671 pub fn pack_egress_ip_update(
1672 &self,
1673 ip: &IpNet,
1674 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1675 let id = &MZ_EGRESS_IPS;
1676 let addr = ip.network();
1677 let row = Row::pack_slice(&[
1678 Datum::String(&addr.to_string()),
1679 Datum::Int32(ip.prefix_len().into()),
1680 Datum::String(&format!("{}/{}", addr, ip.prefix_len())),
1681 ]);
1682 Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
1683 }
1684
1685 pub fn pack_license_key_update(
1686 &self,
1687 license_key: &ValidatedLicenseKey,
1688 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1689 let id = &MZ_LICENSE_KEYS;
1690 let row = Row::pack_slice(&[
1691 Datum::String(&license_key.id),
1692 Datum::String(&license_key.organization),
1693 Datum::String(&license_key.environment_id),
1694 Datum::TimestampTz(
1695 mz_ore::now::to_datetime(license_key.expiration * 1000)
1696 .try_into()
1697 .expect("must fit"),
1698 ),
1699 Datum::TimestampTz(
1700 mz_ore::now::to_datetime(license_key.not_before * 1000)
1701 .try_into()
1702 .expect("must fit"),
1703 ),
1704 ]);
1705 Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
1706 }
1707
1708 pub fn pack_all_replica_size_updates(&self) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1709 let mut updates = Vec::new();
1710 for (size, alloc) in &self.cluster_replica_sizes.0 {
1711 if alloc.disabled {
1712 continue;
1713 }
1714
1715 let cpu_limit = alloc.cpu_limit.unwrap_or(CpuLimit::MAX);
1718 let MemoryLimit(ByteSize(memory_bytes)) =
1719 (alloc.memory_limit).unwrap_or(MemoryLimit::MAX);
1720 let DiskLimit(ByteSize(disk_bytes)) =
1721 (alloc.disk_limit).unwrap_or(DiskLimit::ARBITRARY);
1722
1723 let row = Row::pack_slice(&[
1724 size.as_str().into(),
1725 u64::cast_from(alloc.scale).into(),
1726 u64::cast_from(alloc.workers).into(),
1727 cpu_limit.as_nanocpus().into(),
1728 memory_bytes.into(),
1729 disk_bytes.into(),
1730 (alloc.credits_per_hour).into(),
1731 ]);
1732
1733 updates.push(BuiltinTableUpdate::row(
1734 &*MZ_CLUSTER_REPLICA_SIZES,
1735 row,
1736 Diff::ONE,
1737 ));
1738 }
1739
1740 updates
1741 }
1742
1743 pub fn pack_subscribe_update(
1744 &self,
1745 id: GlobalId,
1746 subscribe: &ActiveSubscribe,
1747 diff: Diff,
1748 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1749 let mut row = Row::default();
1750 let mut packer = row.packer();
1751 packer.push(Datum::String(&id.to_string()));
1752 packer.push(Datum::Uuid(subscribe.session_uuid));
1753 packer.push(Datum::String(&subscribe.cluster_id.to_string()));
1754
1755 let start_dt = mz_ore::now::to_datetime(subscribe.start_time);
1756 packer.push(Datum::TimestampTz(start_dt.try_into().expect("must fit")));
1757
1758 let depends_on: Vec<_> = subscribe
1759 .depends_on
1760 .iter()
1761 .map(|id| id.to_string())
1762 .collect();
1763 packer.push_list(depends_on.iter().map(|s| Datum::String(s)));
1764
1765 BuiltinTableUpdate::row(&*MZ_SUBSCRIPTIONS, row, diff)
1766 }
1767
1768 pub fn pack_session_update(
1769 &self,
1770 conn: &ConnMeta,
1771 diff: Diff,
1772 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1773 let connect_dt = mz_ore::now::to_datetime(conn.connected_at());
1774 BuiltinTableUpdate::row(
1775 &*MZ_SESSIONS,
1776 Row::pack_slice(&[
1777 Datum::Uuid(conn.uuid()),
1778 Datum::UInt32(conn.conn_id().unhandled()),
1779 Datum::String(&conn.authenticated_role_id().to_string()),
1780 Datum::from(conn.client_ip().map(|ip| ip.to_string()).as_deref()),
1781 Datum::TimestampTz(connect_dt.try_into().expect("must fit")),
1782 ]),
1783 diff,
1784 )
1785 }
1786
1787 pub fn pack_default_privileges_update(
1788 &self,
1789 default_privilege_object: &DefaultPrivilegeObject,
1790 grantee: &RoleId,
1791 acl_mode: &AclMode,
1792 diff: Diff,
1793 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1794 BuiltinTableUpdate::row(
1795 &*MZ_DEFAULT_PRIVILEGES,
1796 Row::pack_slice(&[
1797 default_privilege_object.role_id.to_string().as_str().into(),
1798 default_privilege_object
1799 .database_id
1800 .map(|database_id| database_id.to_string())
1801 .as_deref()
1802 .into(),
1803 default_privilege_object
1804 .schema_id
1805 .map(|schema_id| schema_id.to_string())
1806 .as_deref()
1807 .into(),
1808 default_privilege_object
1809 .object_type
1810 .to_string()
1811 .to_lowercase()
1812 .as_str()
1813 .into(),
1814 grantee.to_string().as_str().into(),
1815 acl_mode.to_string().as_str().into(),
1816 ]),
1817 diff,
1818 )
1819 }
1820
1821 pub fn pack_system_privileges_update(
1822 &self,
1823 privileges: MzAclItem,
1824 diff: Diff,
1825 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1826 BuiltinTableUpdate::row(
1827 &*MZ_SYSTEM_PRIVILEGES,
1828 Row::pack_slice(&[privileges.into()]),
1829 diff,
1830 )
1831 }
1832
1833 fn pack_privilege_array_row(&self, privileges: &PrivilegeMap) -> Row {
1834 let mut row = Row::default();
1835 let flat_privileges: Vec<_> = privileges.all_values_owned().collect();
1836 row.packer()
1837 .try_push_array(
1838 &[ArrayDimension {
1839 lower_bound: 1,
1840 length: flat_privileges.len(),
1841 }],
1842 flat_privileges
1843 .into_iter()
1844 .map(|mz_acl_item| Datum::MzAclItem(mz_acl_item.clone())),
1845 )
1846 .expect("privileges is 1 dimensional, and its length is used for the array length");
1847 row
1848 }
1849
1850 pub fn pack_comment_update(
1851 &self,
1852 object_id: CommentObjectId,
1853 column_pos: Option<usize>,
1854 comment: &str,
1855 diff: Diff,
1856 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1857 let object_type = mz_sql::catalog::ObjectType::from(object_id);
1859 let audit_type = super::object_type_to_audit_object_type(object_type);
1860 let object_type_str = audit_type.to_string();
1861
1862 let object_id_str = match object_id {
1863 CommentObjectId::Table(global_id)
1864 | CommentObjectId::View(global_id)
1865 | CommentObjectId::MaterializedView(global_id)
1866 | CommentObjectId::Source(global_id)
1867 | CommentObjectId::Sink(global_id)
1868 | CommentObjectId::Index(global_id)
1869 | CommentObjectId::Func(global_id)
1870 | CommentObjectId::Connection(global_id)
1871 | CommentObjectId::Secret(global_id)
1872 | CommentObjectId::Type(global_id) => global_id.to_string(),
1873 CommentObjectId::Role(role_id) => role_id.to_string(),
1874 CommentObjectId::Database(database_id) => database_id.to_string(),
1875 CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
1876 CommentObjectId::Cluster(cluster_id) => cluster_id.to_string(),
1877 CommentObjectId::ClusterReplica((_, replica_id)) => replica_id.to_string(),
1878 CommentObjectId::NetworkPolicy(network_policy_id) => network_policy_id.to_string(),
1879 };
1880 let column_pos_datum = match column_pos {
1881 Some(pos) => {
1882 let pos =
1884 i32::try_from(pos).expect("we constrain this value in the planning layer");
1885 Datum::Int32(pos)
1886 }
1887 None => Datum::Null,
1888 };
1889
1890 BuiltinTableUpdate::row(
1891 &*MZ_COMMENTS,
1892 Row::pack_slice(&[
1893 Datum::String(&object_id_str),
1894 Datum::String(&object_type_str),
1895 column_pos_datum,
1896 Datum::String(comment),
1897 ]),
1898 diff,
1899 )
1900 }
1901
1902 pub fn pack_webhook_source_update(
1903 &self,
1904 item_id: CatalogItemId,
1905 diff: Diff,
1906 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1907 let url = self
1908 .try_get_webhook_url(&item_id)
1909 .expect("webhook source should exist");
1910 let url = url.to_string();
1911 let name = &self.get_entry(&item_id).name().item;
1912 let id_str = item_id.to_string();
1913
1914 BuiltinTableUpdate::row(
1915 &*MZ_WEBHOOKS_SOURCES,
1916 Row::pack_slice(&[
1917 Datum::String(&id_str),
1918 Datum::String(name),
1919 Datum::String(&url),
1920 ]),
1921 diff,
1922 )
1923 }
1924
1925 pub fn pack_source_references_update(
1926 &self,
1927 source_references: &SourceReferences,
1928 diff: Diff,
1929 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1930 let source_id = source_references.source_id.to_string();
1931 let updated_at = &source_references.updated_at;
1932 source_references
1933 .references
1934 .iter()
1935 .map(|reference| {
1936 let mut row = Row::default();
1937 let mut packer = row.packer();
1938 packer.extend([
1939 Datum::String(&source_id),
1940 reference
1941 .namespace
1942 .as_ref()
1943 .map(|s| Datum::String(s))
1944 .unwrap_or(Datum::Null),
1945 Datum::String(&reference.name),
1946 Datum::TimestampTz(
1947 mz_ore::now::to_datetime(*updated_at)
1948 .try_into()
1949 .expect("must fit"),
1950 ),
1951 ]);
1952 if reference.columns.len() > 0 {
1953 packer
1954 .try_push_array(
1955 &[ArrayDimension {
1956 lower_bound: 1,
1957 length: reference.columns.len(),
1958 }],
1959 reference.columns.iter().map(|col| Datum::String(col)),
1960 )
1961 .expect(
1962 "columns is 1 dimensional, and its length is used for the array length",
1963 );
1964 } else {
1965 packer.push(Datum::Null);
1966 }
1967
1968 BuiltinTableUpdate::row(&*MZ_SOURCE_REFERENCES, row, diff)
1969 })
1970 .collect()
1971 }
1972}