1mod notice;
11
12use bytesize::ByteSize;
13use ipnet::IpNet;
14use mz_adapter_types::compaction::CompactionWindow;
15use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
16use mz_catalog::SYSTEM_CONN_ID;
17use mz_catalog::builtin::{
18 BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
19 MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICAS,
20 MZ_CLUSTER_SCHEDULES, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS, MZ_DEFAULT_PRIVILEGES,
21 MZ_EGRESS_IPS, MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES, MZ_ICEBERG_SINKS,
22 MZ_INDEX_COLUMNS, MZ_INDEXES, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCE_TABLES,
23 MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES,
24 MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MYSQL_SOURCE_TABLES, MZ_OBJECT_DEPENDENCIES,
25 MZ_OBJECT_GLOBAL_IDS, MZ_OPERATORS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES,
26 MZ_PSEUDO_TYPES, MZ_REPLACEMENTS, MZ_ROLE_AUTH, MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SESSIONS,
27 MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SOURCES, MZ_SQL_SERVER_SOURCE_TABLES,
28 MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES,
29 MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
30};
31use mz_catalog::config::AwsPrincipalContext;
32use mz_catalog::durable::SourceReferences;
33use mz_catalog::memory::error::{Error, ErrorKind};
34use mz_catalog::memory::objects::{
35 CatalogEntry, CatalogItem, ClusterVariant, Connection, DataSourceDesc, Func, Index,
36 MaterializedView, Sink, Table, TableDataSource, Type, View,
37};
38use mz_controller::clusters::{
39 ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
40};
41use mz_controller_types::ClusterId;
42use mz_expr::MirScalarExpr;
43use mz_license_keys::ValidatedLicenseKey;
44use mz_orchestrator::{CpuLimit, DiskLimit, MemoryLimit};
45use mz_ore::cast::CastFrom;
46use mz_ore::collections::CollectionExt;
47use mz_persist_client::batch::ProtoBatch;
48use mz_repr::adt::array::ArrayDimension;
49use mz_repr::adt::interval::Interval;
50use mz_repr::adt::jsonb::Jsonb;
51use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
52use mz_repr::refresh_schedule::RefreshEvery;
53use mz_repr::role_id::RoleId;
54use mz_repr::{
55 CatalogItemId, Datum, Diff, GlobalId, ReprColumnType, Row, RowPacker, SqlScalarType, Timestamp,
56};
57use mz_sql::ast::{CreateIndexStatement, Statement, UnresolvedItemName};
58use mz_sql::catalog::{CatalogCluster, CatalogType, DefaultPrivilegeObject, TypeCategory};
59use mz_sql::func::FuncImplCatalogDetails;
60use mz_sql::names::{CommentObjectId, SchemaSpecifier};
61use mz_sql::plan::{ClusterSchedule, ConnectionDetails, SshKey};
62use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
63use mz_sql::session::vars::SessionVars;
64use mz_sql_parser::ast::display::AstDisplay;
65use mz_storage_client::client::TableData;
66use mz_storage_types::connections::KafkaConnection;
67use mz_storage_types::connections::aws::{AwsAuth, AwsConnection};
68use mz_storage_types::connections::inline::ReferencedConnection;
69use mz_storage_types::connections::string_or_secret::StringOrSecret;
70use mz_storage_types::sinks::{IcebergSinkConnection, KafkaSinkConnection, StorageSinkConnection};
71use mz_storage_types::sources::{
72 GenericSourceConnection, KafkaSourceConnection, PostgresSourceConnection, SourceConnection,
73};
74use smallvec::smallvec;
75
76use crate::active_compute_sink::ActiveSubscribe;
78use crate::catalog::CatalogState;
79use crate::coord::ConnMeta;
80
/// A pending change to a single builtin table.
///
/// `T` is the handle used to identify the target table. Within this file it is
/// either a `&'static BuiltinTable` (before resolution) or a [`CatalogItemId`]
/// (the default, after resolution via `CatalogState::resolve_builtin_table_update`).
#[derive(Debug, Clone)]
pub struct BuiltinTableUpdate<T = CatalogItemId> {
    /// Identifies the builtin table this update targets.
    pub id: T,
    /// The payload to apply to the table; constructed in this file as either
    /// `TableData::Rows` or `TableData::Batches`.
    pub data: TableData,
}
89
90impl<T> BuiltinTableUpdate<T> {
91 pub fn row(id: T, row: Row, diff: Diff) -> BuiltinTableUpdate<T> {
93 BuiltinTableUpdate {
94 id,
95 data: TableData::Rows(vec![(row, diff)]),
96 }
97 }
98
99 pub fn batch(id: T, batch: ProtoBatch) -> BuiltinTableUpdate<T> {
100 BuiltinTableUpdate {
101 id,
102 data: TableData::Batches(smallvec![batch]),
103 }
104 }
105}
106
107impl CatalogState {
108 pub fn resolve_builtin_table_updates(
109 &self,
110 builtin_table_update: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
111 ) -> Vec<BuiltinTableUpdate<CatalogItemId>> {
112 builtin_table_update
113 .into_iter()
114 .map(|builtin_table_update| self.resolve_builtin_table_update(builtin_table_update))
115 .collect()
116 }
117
118 pub fn resolve_builtin_table_update(
119 &self,
120 BuiltinTableUpdate { id, data }: BuiltinTableUpdate<&'static BuiltinTable>,
121 ) -> BuiltinTableUpdate<CatalogItemId> {
122 let id = self.resolve_builtin_table(id);
123 BuiltinTableUpdate { id, data }
124 }
125
126 pub fn pack_depends_update(
127 &self,
128 depender: CatalogItemId,
129 dependee: CatalogItemId,
130 diff: Diff,
131 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
132 let row = Row::pack_slice(&[
133 Datum::String(&depender.to_string()),
134 Datum::String(&dependee.to_string()),
135 ]);
136 BuiltinTableUpdate::row(&*MZ_OBJECT_DEPENDENCIES, row, diff)
137 }
138
139 pub(super) fn pack_role_auth_update(
140 &self,
141 id: RoleId,
142 diff: Diff,
143 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
144 let role_auth = self.get_role_auth(&id);
145 let role = self.get_role(&id);
146 BuiltinTableUpdate::row(
147 &*MZ_ROLE_AUTH,
148 Row::pack_slice(&[
149 Datum::String(&role_auth.role_id.to_string()),
150 Datum::UInt32(role.oid),
151 match &role_auth.password_hash {
152 Some(hash) => Datum::String(hash),
153 None => Datum::Null,
154 },
155 Datum::TimestampTz(
156 mz_ore::now::to_datetime(role_auth.updated_at)
157 .try_into()
158 .expect("must fit"),
159 ),
160 ]),
161 diff,
162 )
163 }
164
165 pub(super) fn pack_role_update(
166 &self,
167 id: RoleId,
168 diff: Diff,
169 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
170 match id {
171 RoleId::Public => vec![],
173 id => {
174 let role = self.get_role(&id);
175 let builtin_supers = [MZ_SYSTEM_ROLE_ID, MZ_SUPPORT_ROLE_ID];
176
177 let rolcanlogin = if let Some(login) = role.attributes.login {
178 login
179 } else {
180 builtin_supers.contains(&role.id)
181 };
182
183 let rolsuper = if let Some(superuser) = role.attributes.superuser {
184 Datum::from(superuser)
185 } else if builtin_supers.contains(&role.id) {
186 Datum::from(true)
188 } else {
189 Datum::Null
198 };
199
200 let role_update = BuiltinTableUpdate::row(
201 &*MZ_ROLES,
202 Row::pack_slice(&[
203 Datum::String(&role.id.to_string()),
204 Datum::UInt32(role.oid),
205 Datum::String(&role.name),
206 Datum::from(role.attributes.inherit),
207 Datum::from(rolcanlogin),
208 rolsuper,
209 ]),
210 diff,
211 );
212 let mut updates = vec![role_update];
213
214 let session_vars_reference = SessionVars::new_unchecked(
217 &mz_build_info::DUMMY_BUILD_INFO,
218 SYSTEM_USER.clone(),
219 None,
220 );
221
222 for (name, val) in role.vars() {
223 let result = session_vars_reference
224 .inspect(name)
225 .and_then(|var| var.check(val.borrow()));
226 let Ok(formatted_val) = result else {
227 tracing::error!(?name, ?val, ?result, "found invalid role default var");
230 continue;
231 };
232
233 let role_var_update = BuiltinTableUpdate::row(
234 &*MZ_ROLE_PARAMETERS,
235 Row::pack_slice(&[
236 Datum::String(&role.id.to_string()),
237 Datum::String(name),
238 Datum::String(&formatted_val),
239 ]),
240 diff,
241 );
242 updates.push(role_var_update);
243 }
244
245 updates
246 }
247 }
248 }
249
250 pub(super) fn pack_cluster_update(
251 &self,
252 name: &str,
253 diff: Diff,
254 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
255 let id = self.clusters_by_name[name];
256 let cluster = &self.clusters_by_id[&id];
257 let row = self.pack_privilege_array_row(cluster.privileges());
258 let privileges = row.unpack_first();
259 let (size, disk, replication_factor, azs, introspection_debugging, introspection_interval) =
260 match &cluster.config.variant {
261 ClusterVariant::Managed(config) => (
262 Some(config.size.as_str()),
263 Some(self.cluster_replica_size_has_disk(&config.size)),
264 Some(config.replication_factor),
265 if config.availability_zones.is_empty() {
266 None
267 } else {
268 Some(config.availability_zones.clone())
269 },
270 Some(config.logging.log_logging),
271 config.logging.interval.map(|d| {
272 Interval::from_duration(&d)
273 .expect("planning ensured this convertible back to interval")
274 }),
275 ),
276 ClusterVariant::Unmanaged => (None, None, None, None, None, None),
277 };
278
279 let mut row = Row::default();
280 let mut packer = row.packer();
281 packer.extend([
282 Datum::String(&id.to_string()),
283 Datum::String(name),
284 Datum::String(&cluster.owner_id.to_string()),
285 privileges,
286 cluster.is_managed().into(),
287 size.into(),
288 replication_factor.into(),
289 disk.into(),
290 ]);
291 if let Some(azs) = azs {
292 packer.push_list(azs.iter().map(|az| Datum::String(az)));
293 } else {
294 packer.push(Datum::Null);
295 }
296 packer.push(Datum::from(introspection_debugging));
297 packer.push(Datum::from(introspection_interval));
298
299 let mut updates = Vec::new();
300
301 updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTERS, row, diff));
302
303 if let ClusterVariant::Managed(managed_config) = &cluster.config.variant {
304 let row = match managed_config.schedule {
305 ClusterSchedule::Manual => Row::pack_slice(&[
306 Datum::String(&id.to_string()),
307 Datum::String("manual"),
308 Datum::Null,
309 ]),
310 ClusterSchedule::Refresh {
311 hydration_time_estimate,
312 } => Row::pack_slice(&[
313 Datum::String(&id.to_string()),
314 Datum::String("on-refresh"),
315 Datum::Interval(
316 Interval::from_duration(&hydration_time_estimate)
317 .expect("planning ensured that this is convertible back to Interval"),
318 ),
319 ]),
320 };
321 updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTER_SCHEDULES, row, diff));
322 }
323
324 updates
325 }
326
327 pub(super) fn pack_cluster_replica_update(
328 &self,
329 cluster_id: ClusterId,
330 name: &str,
331 diff: Diff,
332 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
333 let cluster = &self.clusters_by_id[&cluster_id];
334 let id = cluster.replica_id(name).expect("Must exist");
335 let replica = cluster.replica(id).expect("Must exist");
336
337 let (size, disk, az) = match &replica.config.location {
338 ReplicaLocation::Managed(ManagedReplicaLocation {
341 size,
342 availability_zones: ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
343 allocation: _,
344 billed_as: _,
345 internal: _,
346 pending: _,
347 }) => (
348 Some(&**size),
349 Some(self.cluster_replica_size_has_disk(size)),
350 Some(az.as_str()),
351 ),
352 ReplicaLocation::Managed(ManagedReplicaLocation {
353 size,
354 availability_zones: _,
355 allocation: _,
356 billed_as: _,
357 internal: _,
358 pending: _,
359 }) => (
360 Some(&**size),
361 Some(self.cluster_replica_size_has_disk(size)),
362 None,
363 ),
364 ReplicaLocation::Unmanaged(_) => (None, None, None),
365 };
366
367 let cluster_replica_update = BuiltinTableUpdate::row(
368 &*MZ_CLUSTER_REPLICAS,
369 Row::pack_slice(&[
370 Datum::String(&id.to_string()),
371 Datum::String(name),
372 Datum::String(&cluster_id.to_string()),
373 Datum::from(size),
374 Datum::from(az),
375 Datum::String(&replica.owner_id.to_string()),
376 Datum::from(disk),
377 ]),
378 diff,
379 );
380
381 vec![cluster_replica_update]
382 }
383
    /// Packs every builtin table row that describes catalog item `id`.
    ///
    /// The result is assembled in this order:
    /// 1. the item-kind specific rows (sources, tables, views, sinks, ...),
    ///    produced by the matching `pack_*_update` helper;
    /// 2. one `MZ_OBJECT_DEPENDENCIES` row per referenced item, for
    ///    non-temporary items only;
    /// 3. one `MZ_COLUMNS` row per column, when the entry has a relation
    ///    description;
    /// 4. a history retention strategy row, when the item has an initial
    ///    logical compaction window;
    /// 5. the `MZ_OBJECT_GLOBAL_IDS` rows for the entry.
    ///
    /// Secrets produce no item-kind specific rows.
    ///
    /// # Panics
    ///
    /// Panics if an ingestion export refers to an ingestion that is not a
    /// source, or if the primary source's connection name is not one of the
    /// kinds handled below (`unreachable!`).
    pub(super) fn pack_item_update(
        &self,
        id: CatalogItemId,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let entry = self.get_entry(&id);
        let oid = entry.oid();
        // Items without their own connection ID are resolved against the system connection.
        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
        let schema_id = &self
            .get_schema(
                &entry.name().qualifiers.database_spec,
                &entry.name().qualifiers.schema_spec,
                conn_id,
            )
            .id;
        let name = &entry.name().item;
        let owner_id = entry.owner_id();
        // `privileges` borrows from `privileges_row`, so the row must stay alive
        // for as long as the datum is used.
        let privileges_row = self.pack_privilege_array_row(entry.privileges());
        let privileges = privileges_row.unpack_first();
        let mut updates = match entry.item() {
            // Logs are reported as sources of type "log" with no connection,
            // envelope, formats, cluster, or `create_sql`.
            CatalogItem::Log(_) => self.pack_source_update(
                id, oid, schema_id, name, "log", None, None, None, None, None, owner_id,
                privileges, diff, None,
            ),
            CatalogItem::Index(index) => {
                self.pack_index_update(id, oid, name, owner_id, index, diff)
            }
            CatalogItem::Table(table) => {
                let mut updates = self
                    .pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);

                // Tables backed by a data source may additionally get a row in the
                // per-source-type "source tables" relation.
                if let TableDataSource::DataSource {
                    desc: data_source,
                    timeline: _,
                } = &table.data_source
                {
                    updates.extend(match data_source {
                        DataSourceDesc::IngestionExport {
                            ingestion_id,
                            external_reference: UnresolvedItemName(external_reference),
                            details: _,
                            data_config: _,
                        } => {
                            let ingestion_entry = self
                                .get_entry(ingestion_id)
                                .source_desc()
                                .expect("primary source exists")
                                .expect("primary source is a source");

                            // Dispatch on the connection kind of the primary source.
                            match ingestion_entry.connection.name() {
                                "postgres" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                    // Only the last two of the three name parts are recorded.
                                    // NOTE(review): part 0 is presumably the database name — confirm.
                                    let schema_name = external_reference[1].as_str();
                                    let table_name = external_reference[2].as_str();

                                    self.pack_postgres_source_tables_update(
                                        id,
                                        schema_name,
                                        table_name,
                                        diff,
                                    )
                                }
                                "mysql" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
                                    let schema_name = external_reference[0].as_str();
                                    let table_name = external_reference[1].as_str();

                                    self.pack_mysql_source_tables_update(
                                        id,
                                        schema_name,
                                        table_name,
                                        diff,
                                    )
                                }
                                "sql-server" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                    // Only the last two of the three name parts are recorded.
                                    // NOTE(review): part 0 is presumably the database name — confirm.
                                    let schema_name = external_reference[1].as_str();
                                    let table_name = external_reference[2].as_str();

                                    self.pack_sql_server_source_table_update(
                                        id,
                                        schema_name,
                                        table_name,
                                        diff,
                                    )
                                }
                                // Load generator exports get no dedicated row here.
                                "load-generator" => vec![],
                                "kafka" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 1);
                                    let topic = external_reference[0].as_str();
                                    let envelope = data_source.envelope();
                                    let (key_format, value_format) = data_source.formats();

                                    self.pack_kafka_source_tables_update(
                                        id,
                                        topic,
                                        envelope,
                                        key_format,
                                        value_format,
                                        diff,
                                    )
                                }
                                s => unreachable!("{s} sources do not have tables"),
                            }
                        }
                        DataSourceDesc::Ingestion { .. }
                        | DataSourceDesc::OldSyntaxIngestion { .. }
                        | DataSourceDesc::Introspection(_)
                        | DataSourceDesc::Progress
                        | DataSourceDesc::Webhook { .. }
                        | DataSourceDesc::Catalog => vec![],
                    });
                }

                updates
            }
            CatalogItem::Source(source) => {
                let source_type = source.source_type();
                let connection_id = source.connection_id();
                let envelope = source.data_source.envelope();
                // The cluster is read from the primary ingestion's entry for
                // ingestion exports, and from this entry for every other kind.
                let cluster_entry = match source.data_source {
                    DataSourceDesc::IngestionExport { ingestion_id, .. } => {
                        self.get_entry(&ingestion_id)
                    }
                    DataSourceDesc::Ingestion { .. }
                    | DataSourceDesc::OldSyntaxIngestion { .. }
                    | DataSourceDesc::Introspection(_)
                    | DataSourceDesc::Progress
                    | DataSourceDesc::Webhook { .. }
                    | DataSourceDesc::Catalog => entry,
                };

                let cluster_id = cluster_entry.item().cluster_id().map(|id| id.to_string());

                let (key_format, value_format) = source.data_source.formats();

                let mut updates = self.pack_source_update(
                    id,
                    oid,
                    schema_id,
                    name,
                    source_type,
                    connection_id,
                    envelope,
                    key_format,
                    value_format,
                    cluster_id.as_deref(),
                    owner_id,
                    privileges,
                    diff,
                    source.create_sql.as_ref(),
                );

                // Add the rows specific to the kind of data source.
                updates.extend(match &source.data_source {
                    DataSourceDesc::Ingestion { desc, .. }
                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => match &desc.connection {
                        GenericSourceConnection::Postgres(postgres) => {
                            self.pack_postgres_source_update(id, postgres, diff)
                        }
                        GenericSourceConnection::Kafka(kafka) => {
                            self.pack_kafka_source_update(id, source.global_id(), kafka, diff)
                        }
                        _ => vec![],
                    },
                    DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference: UnresolvedItemName(external_reference),
                        details: _,
                        data_config: _,
                    } => {
                        let ingestion_entry = self
                            .get_entry(ingestion_id)
                            .source_desc()
                            .expect("primary source exists")
                            .expect("primary source is a source");

                        // Same dispatch as for tables above, except that there is no
                        // "kafka" arm: a kafka primary source reaches `unreachable!`.
                        match ingestion_entry.connection.name() {
                            "postgres" => {
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                // Only the last two of the three name parts are recorded.
                                // NOTE(review): part 0 is presumably the database name — confirm.
                                let schema_name = external_reference[1].as_str();
                                let table_name = external_reference[2].as_str();

                                self.pack_postgres_source_tables_update(
                                    id,
                                    schema_name,
                                    table_name,
                                    diff,
                                )
                            }
                            "mysql" => {
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
                                let schema_name = external_reference[0].as_str();
                                let table_name = external_reference[1].as_str();

                                self.pack_mysql_source_tables_update(
                                    id,
                                    schema_name,
                                    table_name,
                                    diff,
                                )
                            }
                            "sql-server" => {
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                // Only the last two of the three name parts are recorded.
                                // NOTE(review): part 0 is presumably the database name — confirm.
                                let schema_name = external_reference[1].as_str();
                                let table_name = external_reference[2].as_str();

                                self.pack_sql_server_source_table_update(
                                    id,
                                    schema_name,
                                    table_name,
                                    diff,
                                )
                            }
                            // Load generator exports get no dedicated row here.
                            "load-generator" => vec![],
                            s => unreachable!("{s} sources do not have subsources"),
                        }
                    }
                    DataSourceDesc::Webhook { .. } => {
                        vec![self.pack_webhook_source_update(id, diff)]
                    }
                    DataSourceDesc::Introspection(_)
                    | DataSourceDesc::Progress
                    | DataSourceDesc::Catalog => vec![],
                });

                updates
            }
            CatalogItem::View(view) => {
                self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
            }
            CatalogItem::MaterializedView(mview) => {
                self.pack_materialized_view_update(id, mview, diff)
            }
            CatalogItem::Sink(sink) => {
                self.pack_sink_update(id, oid, schema_id, name, owner_id, sink, diff)
            }
            CatalogItem::Type(ty) => {
                self.pack_type_update(id, oid, schema_id, name, owner_id, privileges, ty, diff)
            }
            CatalogItem::Func(func) => {
                self.pack_func_update(id, schema_id, name, owner_id, func, diff)
            }
            CatalogItem::Secret(_) => vec![],
            CatalogItem::Connection(connection) => {
                self.pack_connection_update(id, connection, diff)
            }
        };

        // Dependency rows are only emitted for non-temporary items.
        if !entry.item().is_temporary() {
            for dependee in entry.item().references().items() {
                updates.push(self.pack_depends_update(id, *dependee, diff))
            }
        }

        // Emit one `MZ_COLUMNS` row per column of the entry's latest relation description.
        if let Some(desc) = entry.relation_desc_latest() {
            // Column defaults are only available for tables that accept writes.
            let defaults = match entry.item() {
                CatalogItem::Table(Table {
                    data_source: TableDataSource::TableWrites { defaults },
                    ..
                }) => Some(defaults),
                _ => None,
            };
            for (i, (column_name, column_type)) in desc.iter().enumerate() {
                let default: Option<String> = defaults.map(|d| d[i].to_ast_string_stable());
                let default: Datum = default
                    .as_ref()
                    .map(|d| Datum::String(d))
                    .unwrap_or(Datum::Null);
                let pgtype = mz_pgrepr::Type::from(&column_type.scalar_type);
                // List, map and record types that carry a custom ID report the name
                // and OID of that custom type's catalog entry; all other types
                // report the name and OID of the corresponding pg type.
                let (type_name, type_oid) = match &column_type.scalar_type {
                    SqlScalarType::List {
                        custom_id: Some(custom_id),
                        ..
                    }
                    | SqlScalarType::Map {
                        custom_id: Some(custom_id),
                        ..
                    }
                    | SqlScalarType::Record {
                        custom_id: Some(custom_id),
                        ..
                    } => {
                        let entry = self.get_entry(custom_id);
                        let name = &*entry.name().item;
                        let oid = entry.oid();
                        (name, oid)
                    }
                    _ => (pgtype.name(), pgtype.oid()),
                };
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_COLUMNS,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String(column_name),
                        // Column positions are reported 1-based.
                        Datum::UInt64(u64::cast_from(i + 1)),
                        Datum::from(column_type.nullable),
                        Datum::String(type_name),
                        default,
                        Datum::UInt32(type_oid),
                        Datum::Int32(pgtype.typmod()),
                    ]),
                    diff,
                ));
            }
        }

        // Record the history retention strategy when the item has an initial compaction window.
        if let Some(cw) = entry.item().initial_logical_compaction_window() {
            updates.push(self.pack_history_retention_strategy_update(id, cw, diff));
        }

        updates.extend(Self::pack_item_global_id_update(entry, diff));

        updates
    }
740
741 fn pack_item_global_id_update(
742 entry: &CatalogEntry,
743 diff: Diff,
744 ) -> impl Iterator<Item = BuiltinTableUpdate<&'static BuiltinTable>> + use<'_> {
745 let id = entry.id().to_string();
746 let global_ids = entry.global_ids();
747 global_ids.map(move |global_id| {
748 BuiltinTableUpdate::row(
749 &*MZ_OBJECT_GLOBAL_IDS,
750 Row::pack_slice(&[Datum::String(&id), Datum::String(&global_id.to_string())]),
751 diff,
752 )
753 })
754 }
755
756 fn pack_history_retention_strategy_update(
757 &self,
758 id: CatalogItemId,
759 cw: CompactionWindow,
760 diff: Diff,
761 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
762 let cw: u64 = cw.comparable_timestamp().into();
763 let cw = Jsonb::from_serde_json(serde_json::Value::Number(serde_json::Number::from(cw)))
764 .expect("must serialize");
765 BuiltinTableUpdate::row(
766 &*MZ_HISTORY_RETENTION_STRATEGIES,
767 Row::pack_slice(&[
768 Datum::String(&id.to_string()),
769 Datum::String("FOR"),
771 cw.into_row().into_element(),
772 ]),
773 diff,
774 )
775 }
776
777 fn pack_table_update(
778 &self,
779 id: CatalogItemId,
780 oid: u32,
781 schema_id: &SchemaSpecifier,
782 name: &str,
783 owner_id: &RoleId,
784 privileges: Datum,
785 diff: Diff,
786 table: &Table,
787 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
788 let redacted = table.create_sql.as_ref().map(|create_sql| {
789 mz_sql::parse::parse(create_sql)
790 .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
791 .into_element()
792 .ast
793 .to_ast_string_redacted()
794 });
795 let source_id = if let TableDataSource::DataSource {
796 desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
797 ..
798 } = &table.data_source
799 {
800 Some(ingestion_id.to_string())
801 } else {
802 None
803 };
804
805 vec![BuiltinTableUpdate::row(
806 &*MZ_TABLES,
807 Row::pack_slice(&[
808 Datum::String(&id.to_string()),
809 Datum::UInt32(oid),
810 Datum::String(&schema_id.to_string()),
811 Datum::String(name),
812 Datum::String(&owner_id.to_string()),
813 privileges,
814 if let Some(create_sql) = &table.create_sql {
815 Datum::String(create_sql)
816 } else {
817 Datum::Null
818 },
819 if let Some(redacted) = &redacted {
820 Datum::String(redacted)
821 } else {
822 Datum::Null
823 },
824 if let Some(source_id) = source_id.as_ref() {
825 Datum::String(source_id)
826 } else {
827 Datum::Null
828 },
829 ]),
830 diff,
831 )]
832 }
833
834 fn pack_source_update(
835 &self,
836 id: CatalogItemId,
837 oid: u32,
838 schema_id: &SchemaSpecifier,
839 name: &str,
840 source_desc_name: &str,
841 connection_id: Option<CatalogItemId>,
842 envelope: Option<&str>,
843 key_format: Option<&str>,
844 value_format: Option<&str>,
845 cluster_id: Option<&str>,
846 owner_id: &RoleId,
847 privileges: Datum,
848 diff: Diff,
849 create_sql: Option<&String>,
850 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
851 let redacted = create_sql.map(|create_sql| {
852 let create_stmt = mz_sql::parse::parse(create_sql)
853 .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
854 .into_element()
855 .ast;
856 create_stmt.to_ast_string_redacted()
857 });
858 vec![BuiltinTableUpdate::row(
859 &*MZ_SOURCES,
860 Row::pack_slice(&[
861 Datum::String(&id.to_string()),
862 Datum::UInt32(oid),
863 Datum::String(&schema_id.to_string()),
864 Datum::String(name),
865 Datum::String(source_desc_name),
866 Datum::from(connection_id.map(|id| id.to_string()).as_deref()),
867 Datum::Null,
870 Datum::from(envelope),
871 Datum::from(key_format),
872 Datum::from(value_format),
873 Datum::from(cluster_id),
874 Datum::String(&owner_id.to_string()),
875 privileges,
876 if let Some(create_sql) = create_sql {
877 Datum::String(create_sql)
878 } else {
879 Datum::Null
880 },
881 if let Some(redacted) = &redacted {
882 Datum::String(redacted)
883 } else {
884 Datum::Null
885 },
886 ]),
887 diff,
888 )]
889 }
890
891 fn pack_postgres_source_update(
892 &self,
893 id: CatalogItemId,
894 postgres: &PostgresSourceConnection<ReferencedConnection>,
895 diff: Diff,
896 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
897 vec![BuiltinTableUpdate::row(
898 &*MZ_POSTGRES_SOURCES,
899 Row::pack_slice(&[
900 Datum::String(&id.to_string()),
901 Datum::String(&postgres.publication_details.slot),
902 Datum::from(postgres.publication_details.timeline_id),
903 ]),
904 diff,
905 )]
906 }
907
908 fn pack_kafka_source_update(
909 &self,
910 item_id: CatalogItemId,
911 collection_id: GlobalId,
912 kafka: &KafkaSourceConnection<ReferencedConnection>,
913 diff: Diff,
914 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
915 vec![BuiltinTableUpdate::row(
916 &*MZ_KAFKA_SOURCES,
917 Row::pack_slice(&[
918 Datum::String(&item_id.to_string()),
919 Datum::String(&kafka.group_id(&self.config.connection_context, collection_id)),
920 Datum::String(&kafka.topic),
921 ]),
922 diff,
923 )]
924 }
925
926 fn pack_postgres_source_tables_update(
927 &self,
928 id: CatalogItemId,
929 schema_name: &str,
930 table_name: &str,
931 diff: Diff,
932 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
933 vec![BuiltinTableUpdate::row(
934 &*MZ_POSTGRES_SOURCE_TABLES,
935 Row::pack_slice(&[
936 Datum::String(&id.to_string()),
937 Datum::String(schema_name),
938 Datum::String(table_name),
939 ]),
940 diff,
941 )]
942 }
943
944 fn pack_mysql_source_tables_update(
945 &self,
946 id: CatalogItemId,
947 schema_name: &str,
948 table_name: &str,
949 diff: Diff,
950 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
951 vec![BuiltinTableUpdate::row(
952 &*MZ_MYSQL_SOURCE_TABLES,
953 Row::pack_slice(&[
954 Datum::String(&id.to_string()),
955 Datum::String(schema_name),
956 Datum::String(table_name),
957 ]),
958 diff,
959 )]
960 }
961
962 fn pack_sql_server_source_table_update(
963 &self,
964 id: CatalogItemId,
965 schema_name: &str,
966 table_name: &str,
967 diff: Diff,
968 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
969 vec![BuiltinTableUpdate::row(
970 &*MZ_SQL_SERVER_SOURCE_TABLES,
971 Row::pack_slice(&[
972 Datum::String(&id.to_string()),
973 Datum::String(schema_name),
974 Datum::String(table_name),
975 ]),
976 diff,
977 )]
978 }
979
980 fn pack_kafka_source_tables_update(
981 &self,
982 id: CatalogItemId,
983 topic: &str,
984 envelope: Option<&str>,
985 key_format: Option<&str>,
986 value_format: Option<&str>,
987 diff: Diff,
988 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
989 vec![BuiltinTableUpdate::row(
990 &*MZ_KAFKA_SOURCE_TABLES,
991 Row::pack_slice(&[
992 Datum::String(&id.to_string()),
993 Datum::String(topic),
994 Datum::from(envelope),
995 Datum::from(key_format),
996 Datum::from(value_format),
997 ]),
998 diff,
999 )]
1000 }
1001
1002 fn pack_connection_update(
1003 &self,
1004 id: CatalogItemId,
1005 connection: &Connection,
1006 diff: Diff,
1007 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1008 let mut updates = vec![];
1009 match connection.details {
1010 ConnectionDetails::Kafka(ref kafka) => {
1011 updates.extend(self.pack_kafka_connection_update(id, kafka, diff));
1012 }
1013 ConnectionDetails::Aws(ref aws_config) => {
1014 match self.pack_aws_connection_update(id, aws_config, diff) {
1015 Ok(update) => {
1016 updates.push(update);
1017 }
1018 Err(e) => {
1019 tracing::error!(%id, %e, "failed writing row to mz_aws_connections table");
1020 }
1021 }
1022 }
1023 ConnectionDetails::AwsPrivatelink(_) => {
1024 if let Some(aws_principal_context) = self.aws_principal_context.as_ref() {
1025 updates.push(self.pack_aws_privatelink_connection_update(
1026 id,
1027 aws_principal_context,
1028 diff,
1029 ));
1030 } else {
1031 tracing::error!(%id, "missing AWS principal context; cannot write row to mz_aws_privatelink_connections table");
1032 }
1033 }
1034 ConnectionDetails::Ssh {
1035 ref key_1,
1036 ref key_2,
1037 ..
1038 } => {
1039 updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff));
1040 }
1041 ConnectionDetails::Csr(_)
1042 | ConnectionDetails::Postgres(_)
1043 | ConnectionDetails::MySql(_)
1044 | ConnectionDetails::SqlServer(_)
1045 | ConnectionDetails::IcebergCatalog(_) => (),
1046 };
1047 updates
1048 }
1049
1050 pub(crate) fn pack_ssh_tunnel_connection_update(
1051 &self,
1052 id: CatalogItemId,
1053 key_1: &SshKey,
1054 key_2: &SshKey,
1055 diff: Diff,
1056 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1057 BuiltinTableUpdate::row(
1058 &*MZ_SSH_TUNNEL_CONNECTIONS,
1059 Row::pack_slice(&[
1060 Datum::String(&id.to_string()),
1061 Datum::String(key_1.public_key().as_str()),
1062 Datum::String(key_2.public_key().as_str()),
1063 ]),
1064 diff,
1065 )
1066 }
1067
1068 fn pack_kafka_connection_update(
1069 &self,
1070 id: CatalogItemId,
1071 kafka: &KafkaConnection<ReferencedConnection>,
1072 diff: Diff,
1073 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1074 let progress_topic = kafka.progress_topic(&self.config.connection_context, id);
1075 let mut row = Row::default();
1076 row.packer()
1077 .try_push_array(
1078 &[ArrayDimension {
1079 lower_bound: 1,
1080 length: kafka.brokers.len(),
1081 }],
1082 kafka
1083 .brokers
1084 .iter()
1085 .map(|broker| Datum::String(&broker.address)),
1086 )
1087 .expect("kafka.brokers is 1 dimensional, and its length is used for the array length");
1088 let brokers = row.unpack_first();
1089 vec![BuiltinTableUpdate::row(
1090 &*MZ_KAFKA_CONNECTIONS,
1091 Row::pack_slice(&[
1092 Datum::String(&id.to_string()),
1093 brokers,
1094 Datum::String(&progress_topic),
1095 ]),
1096 diff,
1097 )]
1098 }
1099
1100 pub fn pack_aws_privatelink_connection_update(
1101 &self,
1102 connection_id: CatalogItemId,
1103 aws_principal_context: &AwsPrincipalContext,
1104 diff: Diff,
1105 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1106 let id = &MZ_AWS_PRIVATELINK_CONNECTIONS;
1107 let row = Row::pack_slice(&[
1108 Datum::String(&connection_id.to_string()),
1109 Datum::String(&aws_principal_context.to_principal_string(connection_id)),
1110 ]);
1111 BuiltinTableUpdate::row(id, row, diff)
1112 }
1113
1114 pub fn pack_aws_connection_update(
1115 &self,
1116 connection_id: CatalogItemId,
1117 aws_config: &AwsConnection,
1118 diff: Diff,
1119 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, anyhow::Error> {
1120 let id = &MZ_AWS_CONNECTIONS;
1121
1122 let mut access_key_id = None;
1123 let mut access_key_id_secret_id = None;
1124 let mut secret_access_key_secret_id = None;
1125 let mut session_token = None;
1126 let mut session_token_secret_id = None;
1127 let mut assume_role_arn = None;
1128 let mut assume_role_session_name = None;
1129 let mut principal = None;
1130 let mut external_id = None;
1131 let mut example_trust_policy = None;
1132 match &aws_config.auth {
1133 AwsAuth::Credentials(credentials) => {
1134 match &credentials.access_key_id {
1135 StringOrSecret::String(s) => access_key_id = Some(s.as_str()),
1136 StringOrSecret::Secret(s) => access_key_id_secret_id = Some(s.to_string()),
1137 }
1138 secret_access_key_secret_id = Some(credentials.secret_access_key.to_string());
1139 match credentials.session_token.as_ref() {
1140 None => (),
1141 Some(StringOrSecret::String(s)) => session_token = Some(s.as_str()),
1142 Some(StringOrSecret::Secret(s)) => {
1143 session_token_secret_id = Some(s.to_string())
1144 }
1145 }
1146 }
1147 AwsAuth::AssumeRole(assume_role) => {
1148 assume_role_arn = Some(assume_role.arn.as_str());
1149 assume_role_session_name = assume_role.session_name.as_deref();
1150 principal = self
1151 .config
1152 .connection_context
1153 .aws_connection_role_arn
1154 .as_deref();
1155 external_id =
1156 Some(assume_role.external_id(&self.config.connection_context, connection_id)?);
1157 example_trust_policy = {
1158 let policy = assume_role
1159 .example_trust_policy(&self.config.connection_context, connection_id)?;
1160 let policy = Jsonb::from_serde_json(policy).expect("valid json");
1161 Some(policy.into_row())
1162 };
1163 }
1164 }
1165
1166 let row = Row::pack_slice(&[
1167 Datum::String(&connection_id.to_string()),
1168 Datum::from(aws_config.endpoint.as_deref()),
1169 Datum::from(aws_config.region.as_deref()),
1170 Datum::from(access_key_id),
1171 Datum::from(access_key_id_secret_id.as_deref()),
1172 Datum::from(secret_access_key_secret_id.as_deref()),
1173 Datum::from(session_token),
1174 Datum::from(session_token_secret_id.as_deref()),
1175 Datum::from(assume_role_arn),
1176 Datum::from(assume_role_session_name),
1177 Datum::from(principal),
1178 Datum::from(external_id.as_deref()),
1179 Datum::from(example_trust_policy.as_ref().map(|p| p.into_element())),
1180 ]);
1181
1182 Ok(BuiltinTableUpdate::row(id, row, diff))
1183 }
1184
1185 fn pack_view_update(
1186 &self,
1187 id: CatalogItemId,
1188 oid: u32,
1189 schema_id: &SchemaSpecifier,
1190 name: &str,
1191 owner_id: &RoleId,
1192 privileges: Datum,
1193 view: &View,
1194 diff: Diff,
1195 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1196 let create_stmt = mz_sql::parse::parse(&view.create_sql)
1197 .unwrap_or_else(|e| {
1198 panic!(
1199 "create_sql cannot be invalid: `{}` --- error: `{}`",
1200 view.create_sql, e
1201 )
1202 })
1203 .into_element()
1204 .ast;
1205 let query = match &create_stmt {
1206 Statement::CreateView(stmt) => &stmt.definition.query,
1207 _ => unreachable!(),
1208 };
1209
1210 let mut query_string = query.to_ast_string_stable();
1211 query_string.push(';');
1214
1215 vec![BuiltinTableUpdate::row(
1216 &*MZ_VIEWS,
1217 Row::pack_slice(&[
1218 Datum::String(&id.to_string()),
1219 Datum::UInt32(oid),
1220 Datum::String(&schema_id.to_string()),
1221 Datum::String(name),
1222 Datum::String(&query_string),
1223 Datum::String(&owner_id.to_string()),
1224 privileges,
1225 Datum::String(&view.create_sql),
1226 Datum::String(&create_stmt.to_ast_string_redacted()),
1227 ]),
1228 diff,
1229 )]
1230 }
1231
1232 fn pack_materialized_view_update(
1233 &self,
1234 id: CatalogItemId,
1235 mview: &MaterializedView,
1236 diff: Diff,
1237 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1238 let mut updates = Vec::new();
1239
1240 if let Some(refresh_schedule) = &mview.refresh_schedule {
1241 assert!(!refresh_schedule.is_empty());
1244 for RefreshEvery {
1245 interval,
1246 aligned_to,
1247 } in refresh_schedule.everies.iter()
1248 {
1249 let aligned_to_dt = mz_ore::now::to_datetime(
1250 <&Timestamp as TryInto<u64>>::try_into(aligned_to).expect("undoes planning"),
1251 );
1252 updates.push(BuiltinTableUpdate::row(
1253 &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1254 Row::pack_slice(&[
1255 Datum::String(&id.to_string()),
1256 Datum::String("every"),
1257 Datum::Interval(
1258 Interval::from_duration(interval).expect(
1259 "planning ensured that this is convertible back to Interval",
1260 ),
1261 ),
1262 Datum::TimestampTz(aligned_to_dt.try_into().expect("undoes planning")),
1263 Datum::Null,
1264 ]),
1265 diff,
1266 ));
1267 }
1268 for at in refresh_schedule.ats.iter() {
1269 let at_dt = mz_ore::now::to_datetime(
1270 <&Timestamp as TryInto<u64>>::try_into(at).expect("undoes planning"),
1271 );
1272 updates.push(BuiltinTableUpdate::row(
1273 &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1274 Row::pack_slice(&[
1275 Datum::String(&id.to_string()),
1276 Datum::String("at"),
1277 Datum::Null,
1278 Datum::Null,
1279 Datum::TimestampTz(at_dt.try_into().expect("undoes planning")),
1280 ]),
1281 diff,
1282 ));
1283 }
1284 } else {
1285 updates.push(BuiltinTableUpdate::row(
1286 &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1287 Row::pack_slice(&[
1288 Datum::String(&id.to_string()),
1289 Datum::String("on-commit"),
1290 Datum::Null,
1291 Datum::Null,
1292 Datum::Null,
1293 ]),
1294 diff,
1295 ));
1296 }
1297
1298 if let Some(target_id) = mview.replacement_target {
1299 updates.push(BuiltinTableUpdate::row(
1300 &*MZ_REPLACEMENTS,
1301 Row::pack_slice(&[
1302 Datum::String(&id.to_string()),
1303 Datum::String(&target_id.to_string()),
1304 ]),
1305 diff,
1306 ));
1307 }
1308
1309 updates
1310 }
1311
1312 fn pack_sink_update(
1313 &self,
1314 id: CatalogItemId,
1315 oid: u32,
1316 schema_id: &SchemaSpecifier,
1317 name: &str,
1318 owner_id: &RoleId,
1319 sink: &Sink,
1320 diff: Diff,
1321 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1322 let mut updates = vec![];
1323 match &sink.connection {
1324 StorageSinkConnection::Kafka(KafkaSinkConnection {
1325 topic: topic_name, ..
1326 }) => {
1327 updates.push(BuiltinTableUpdate::row(
1328 &*MZ_KAFKA_SINKS,
1329 Row::pack_slice(&[
1330 Datum::String(&id.to_string()),
1331 Datum::String(topic_name.as_str()),
1332 ]),
1333 diff,
1334 ));
1335 }
1336 StorageSinkConnection::Iceberg(IcebergSinkConnection {
1337 namespace, table, ..
1338 }) => {
1339 updates.push(BuiltinTableUpdate::row(
1340 &*MZ_ICEBERG_SINKS,
1341 Row::pack_slice(&[
1342 Datum::String(&id.to_string()),
1343 Datum::String(namespace.as_str()),
1344 Datum::String(table.as_str()),
1345 ]),
1346 diff,
1347 ));
1348 }
1349 };
1350
1351 let create_stmt = mz_sql::parse::parse(&sink.create_sql)
1352 .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", sink.create_sql))
1353 .into_element()
1354 .ast;
1355
1356 let envelope = sink.envelope();
1357
1358 let combined_format = sink.combined_format();
1360 let (key_format, value_format) = match sink.formats() {
1361 Some((key_format, value_format)) => (key_format, Some(value_format)),
1362 None => (None, None),
1363 };
1364
1365 updates.push(BuiltinTableUpdate::row(
1366 &*MZ_SINKS,
1367 Row::pack_slice(&[
1368 Datum::String(&id.to_string()),
1369 Datum::UInt32(oid),
1370 Datum::String(&schema_id.to_string()),
1371 Datum::String(name),
1372 Datum::String(sink.connection.name()),
1373 Datum::from(sink.connection_id().map(|id| id.to_string()).as_deref()),
1374 Datum::Null,
1376 Datum::from(envelope),
1377 Datum::from(combined_format.as_ref().map(|f| f.as_ref())),
1380 Datum::from(key_format),
1381 Datum::from(value_format),
1382 Datum::String(&sink.cluster_id.to_string()),
1383 Datum::String(&owner_id.to_string()),
1384 Datum::String(&sink.create_sql),
1385 Datum::String(&create_stmt.to_ast_string_redacted()),
1386 ]),
1387 diff,
1388 ));
1389
1390 updates
1391 }
1392
1393 fn pack_index_update(
1394 &self,
1395 id: CatalogItemId,
1396 oid: u32,
1397 name: &str,
1398 owner_id: &RoleId,
1399 index: &Index,
1400 diff: Diff,
1401 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1402 let mut updates = vec![];
1403
1404 let create_stmt = mz_sql::parse::parse(&index.create_sql)
1405 .unwrap_or_else(|e| {
1406 panic!(
1407 "create_sql cannot be invalid: `{}` --- error: `{}`",
1408 index.create_sql, e
1409 )
1410 })
1411 .into_element()
1412 .ast;
1413
1414 let key_sqls = match &create_stmt {
1415 Statement::CreateIndex(CreateIndexStatement { key_parts, .. }) => key_parts
1416 .as_ref()
1417 .expect("key_parts is filled in during planning"),
1418 _ => unreachable!(),
1419 };
1420 let on_item_id = self.get_entry_by_global_id(&index.on).id();
1421
1422 updates.push(BuiltinTableUpdate::row(
1423 &*MZ_INDEXES,
1424 Row::pack_slice(&[
1425 Datum::String(&id.to_string()),
1426 Datum::UInt32(oid),
1427 Datum::String(name),
1428 Datum::String(&on_item_id.to_string()),
1429 Datum::String(&index.cluster_id.to_string()),
1430 Datum::String(&owner_id.to_string()),
1431 Datum::String(&index.create_sql),
1432 Datum::String(&create_stmt.to_ast_string_redacted()),
1433 ]),
1434 diff,
1435 ));
1436
1437 let on_entry = self.get_entry_by_global_id(&index.on);
1438 let on_desc = on_entry
1439 .relation_desc()
1440 .expect("can only create indexes on items with a valid description");
1441 let repr_col_types: Vec<ReprColumnType> = on_desc
1442 .typ()
1443 .column_types
1444 .iter()
1445 .map(ReprColumnType::from)
1446 .collect();
1447 for (i, key) in index.keys.iter().enumerate() {
1448 let nullable = key.typ(&repr_col_types).nullable;
1449 let seq_in_index = u64::cast_from(i + 1);
1450 let key_sql = key_sqls
1451 .get(i)
1452 .expect("missing sql information for index key")
1453 .to_ast_string_simple();
1454 let (field_number, expression) = match key {
1455 MirScalarExpr::Column(col, _) => {
1456 (Datum::UInt64(u64::cast_from(*col + 1)), Datum::Null)
1457 }
1458 _ => (Datum::Null, Datum::String(&key_sql)),
1459 };
1460 updates.push(BuiltinTableUpdate::row(
1461 &*MZ_INDEX_COLUMNS,
1462 Row::pack_slice(&[
1463 Datum::String(&id.to_string()),
1464 Datum::UInt64(seq_in_index),
1465 field_number,
1466 expression,
1467 Datum::from(nullable),
1468 ]),
1469 diff,
1470 ));
1471 }
1472
1473 updates
1474 }
1475
1476 fn pack_type_update(
1477 &self,
1478 id: CatalogItemId,
1479 oid: u32,
1480 schema_id: &SchemaSpecifier,
1481 name: &str,
1482 owner_id: &RoleId,
1483 privileges: Datum,
1484 typ: &Type,
1485 diff: Diff,
1486 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1487 let mut out = vec![];
1488
1489 let redacted = typ.create_sql.as_ref().map(|create_sql| {
1490 mz_sql::parse::parse(create_sql)
1491 .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
1492 .into_element()
1493 .ast
1494 .to_ast_string_redacted()
1495 });
1496
1497 out.push(BuiltinTableUpdate::row(
1498 &*MZ_TYPES,
1499 Row::pack_slice(&[
1500 Datum::String(&id.to_string()),
1501 Datum::UInt32(oid),
1502 Datum::String(&schema_id.to_string()),
1503 Datum::String(name),
1504 Datum::String(&TypeCategory::from_catalog_type(&typ.details.typ).to_string()),
1505 Datum::String(&owner_id.to_string()),
1506 privileges,
1507 if let Some(create_sql) = &typ.create_sql {
1508 Datum::String(create_sql)
1509 } else {
1510 Datum::Null
1511 },
1512 if let Some(redacted) = &redacted {
1513 Datum::String(redacted)
1514 } else {
1515 Datum::Null
1516 },
1517 ]),
1518 diff,
1519 ));
1520
1521 let mut row = Row::default();
1522 let mut packer = row.packer();
1523
1524 fn append_modifier(packer: &mut RowPacker<'_>, mods: &[i64]) {
1525 if mods.is_empty() {
1526 packer.push(Datum::Null);
1527 } else {
1528 packer.push_list(mods.iter().map(|m| Datum::Int64(*m)));
1529 }
1530 }
1531
1532 let index_id = match &typ.details.typ {
1533 CatalogType::Array {
1534 element_reference: element_id,
1535 } => {
1536 packer.push(Datum::String(&id.to_string()));
1537 packer.push(Datum::String(&element_id.to_string()));
1538 &MZ_ARRAY_TYPES
1539 }
1540 CatalogType::List {
1541 element_reference: element_id,
1542 element_modifiers,
1543 } => {
1544 packer.push(Datum::String(&id.to_string()));
1545 packer.push(Datum::String(&element_id.to_string()));
1546 append_modifier(&mut packer, element_modifiers);
1547 &MZ_LIST_TYPES
1548 }
1549 CatalogType::Map {
1550 key_reference: key_id,
1551 value_reference: value_id,
1552 key_modifiers,
1553 value_modifiers,
1554 } => {
1555 packer.push(Datum::String(&id.to_string()));
1556 packer.push(Datum::String(&key_id.to_string()));
1557 packer.push(Datum::String(&value_id.to_string()));
1558 append_modifier(&mut packer, key_modifiers);
1559 append_modifier(&mut packer, value_modifiers);
1560 &MZ_MAP_TYPES
1561 }
1562 CatalogType::Pseudo => {
1563 packer.push(Datum::String(&id.to_string()));
1564 &MZ_PSEUDO_TYPES
1565 }
1566 _ => {
1567 packer.push(Datum::String(&id.to_string()));
1568 &MZ_BASE_TYPES
1569 }
1570 };
1571 out.push(BuiltinTableUpdate::row(index_id, row, diff));
1572
1573 if let Some(pg_metadata) = &typ.details.pg_metadata {
1574 out.push(BuiltinTableUpdate::row(
1575 &*MZ_TYPE_PG_METADATA,
1576 Row::pack_slice(&[
1577 Datum::String(&id.to_string()),
1578 Datum::UInt32(pg_metadata.typinput_oid),
1579 Datum::UInt32(pg_metadata.typreceive_oid),
1580 ]),
1581 diff,
1582 ));
1583 }
1584
1585 out
1586 }
1587
1588 fn pack_func_update(
1589 &self,
1590 id: CatalogItemId,
1591 schema_id: &SchemaSpecifier,
1592 name: &str,
1593 owner_id: &RoleId,
1594 func: &Func,
1595 diff: Diff,
1596 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1597 let mut updates = vec![];
1598 for func_impl_details in func.inner.func_impls() {
1599 let arg_type_ids = func_impl_details
1600 .arg_typs
1601 .iter()
1602 .map(|typ| self.get_system_type(typ).id().to_string())
1603 .collect::<Vec<_>>();
1604
1605 let mut row = Row::default();
1606 row.packer()
1607 .try_push_array(
1608 &[ArrayDimension {
1609 lower_bound: 1,
1610 length: arg_type_ids.len(),
1611 }],
1612 arg_type_ids.iter().map(|id| Datum::String(id)),
1613 )
1614 .expect(
1615 "arg_type_ids is 1 dimensional, and its length is used for the array length",
1616 );
1617 let arg_type_ids = row.unpack_first();
1618
1619 updates.push(BuiltinTableUpdate::row(
1620 &*MZ_FUNCTIONS,
1621 Row::pack_slice(&[
1622 Datum::String(&id.to_string()),
1623 Datum::UInt32(func_impl_details.oid),
1624 Datum::String(&schema_id.to_string()),
1625 Datum::String(name),
1626 arg_type_ids,
1627 Datum::from(
1628 func_impl_details
1629 .variadic_typ
1630 .map(|typ| self.get_system_type(typ).id().to_string())
1631 .as_deref(),
1632 ),
1633 Datum::from(
1634 func_impl_details
1635 .return_typ
1636 .map(|typ| self.get_system_type(typ).id().to_string())
1637 .as_deref(),
1638 ),
1639 func_impl_details.return_is_set.into(),
1640 Datum::String(&owner_id.to_string()),
1641 ]),
1642 diff,
1643 ));
1644
1645 if let mz_sql::func::Func::Aggregate(_) = func.inner {
1646 updates.push(BuiltinTableUpdate::row(
1647 &*MZ_AGGREGATES,
1648 Row::pack_slice(&[
1649 Datum::UInt32(func_impl_details.oid),
1650 Datum::String("n"),
1652 Datum::Int16(0),
1653 ]),
1654 diff,
1655 ));
1656 }
1657 }
1658 updates
1659 }
1660
1661 pub fn pack_op_update(
1662 &self,
1663 operator: &str,
1664 func_impl_details: FuncImplCatalogDetails,
1665 diff: Diff,
1666 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1667 let arg_type_ids = func_impl_details
1668 .arg_typs
1669 .iter()
1670 .map(|typ| self.get_system_type(typ).id().to_string())
1671 .collect::<Vec<_>>();
1672
1673 let mut row = Row::default();
1674 row.packer()
1675 .try_push_array(
1676 &[ArrayDimension {
1677 lower_bound: 1,
1678 length: arg_type_ids.len(),
1679 }],
1680 arg_type_ids.iter().map(|id| Datum::String(id)),
1681 )
1682 .expect("arg_type_ids is 1 dimensional, and its length is used for the array length");
1683 let arg_type_ids = row.unpack_first();
1684
1685 BuiltinTableUpdate::row(
1686 &*MZ_OPERATORS,
1687 Row::pack_slice(&[
1688 Datum::UInt32(func_impl_details.oid),
1689 Datum::String(operator),
1690 arg_type_ids,
1691 Datum::from(
1692 func_impl_details
1693 .return_typ
1694 .map(|typ| self.get_system_type(typ).id().to_string())
1695 .as_deref(),
1696 ),
1697 ]),
1698 diff,
1699 )
1700 }
1701
1702 pub fn pack_audit_log_update(
1703 &self,
1704 event: &VersionedEvent,
1705 diff: Diff,
1706 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1707 let (event_type, object_type, details, user, occurred_at): (
1708 &EventType,
1709 &ObjectType,
1710 &EventDetails,
1711 &Option<String>,
1712 u64,
1713 ) = match event {
1714 VersionedEvent::V1(ev) => (
1715 &ev.event_type,
1716 &ev.object_type,
1717 &ev.details,
1718 &ev.user,
1719 ev.occurred_at,
1720 ),
1721 };
1722 let details = Jsonb::from_serde_json(details.as_json())
1723 .map_err(|e| {
1724 Error::new(ErrorKind::Unstructured(format!(
1725 "could not pack audit log update: {}",
1726 e
1727 )))
1728 })?
1729 .into_row();
1730 let details = details
1731 .iter()
1732 .next()
1733 .expect("details created above with a single jsonb column");
1734 let dt = mz_ore::now::to_datetime(occurred_at);
1735 let id = event.sortable_id();
1736 Ok(BuiltinTableUpdate::row(
1737 &*MZ_AUDIT_EVENTS,
1738 Row::pack_slice(&[
1739 Datum::UInt64(id),
1740 Datum::String(&format!("{}", event_type)),
1741 Datum::String(&format!("{}", object_type)),
1742 details,
1743 match user {
1744 Some(user) => Datum::String(user),
1745 None => Datum::Null,
1746 },
1747 Datum::TimestampTz(dt.try_into().expect("must fit")),
1748 ]),
1749 diff,
1750 ))
1751 }
1752
1753 pub fn pack_storage_usage_update(
1754 &self,
1755 VersionedStorageUsage::V1(event): VersionedStorageUsage,
1756 diff: Diff,
1757 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1758 let id = &MZ_STORAGE_USAGE_BY_SHARD;
1759 let row = Row::pack_slice(&[
1760 Datum::UInt64(event.id),
1761 Datum::from(event.shard_id.as_deref()),
1762 Datum::UInt64(event.size_bytes),
1763 Datum::TimestampTz(
1764 mz_ore::now::to_datetime(event.collection_timestamp)
1765 .try_into()
1766 .expect("must fit"),
1767 ),
1768 ]);
1769 BuiltinTableUpdate::row(id, row, diff)
1770 }
1771
1772 pub fn pack_egress_ip_update(
1773 &self,
1774 ip: &IpNet,
1775 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1776 let id = &MZ_EGRESS_IPS;
1777 let addr = ip.network();
1778 let row = Row::pack_slice(&[
1779 Datum::String(&addr.to_string()),
1780 Datum::Int32(ip.prefix_len().into()),
1781 Datum::String(&format!("{}/{}", addr, ip.prefix_len())),
1782 ]);
1783 Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
1784 }
1785
1786 pub fn pack_license_key_update(
1787 &self,
1788 license_key: &ValidatedLicenseKey,
1789 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1790 let id = &MZ_LICENSE_KEYS;
1791 let row = Row::pack_slice(&[
1792 Datum::String(&license_key.id),
1793 Datum::String(&license_key.organization),
1794 Datum::String(&license_key.environment_id),
1795 Datum::TimestampTz(
1796 mz_ore::now::to_datetime(license_key.expiration * 1000)
1797 .try_into()
1798 .expect("must fit"),
1799 ),
1800 Datum::TimestampTz(
1801 mz_ore::now::to_datetime(license_key.not_before * 1000)
1802 .try_into()
1803 .expect("must fit"),
1804 ),
1805 ]);
1806 Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
1807 }
1808
1809 pub fn pack_all_replica_size_updates(&self) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1810 let mut updates = Vec::new();
1811 for (size, alloc) in &self.cluster_replica_sizes.0 {
1812 if alloc.disabled {
1813 continue;
1814 }
1815
1816 let cpu_limit = alloc.cpu_limit.unwrap_or(CpuLimit::MAX);
1819 let MemoryLimit(ByteSize(memory_bytes)) =
1820 (alloc.memory_limit).unwrap_or(MemoryLimit::MAX);
1821 let DiskLimit(ByteSize(disk_bytes)) =
1822 (alloc.disk_limit).unwrap_or(DiskLimit::ARBITRARY);
1823
1824 let row = Row::pack_slice(&[
1825 size.as_str().into(),
1826 u64::cast_from(alloc.scale).into(),
1827 u64::cast_from(alloc.workers).into(),
1828 cpu_limit.as_nanocpus().into(),
1829 memory_bytes.into(),
1830 disk_bytes.into(),
1831 (alloc.credits_per_hour).into(),
1832 ]);
1833
1834 updates.push(BuiltinTableUpdate::row(
1835 &*MZ_CLUSTER_REPLICA_SIZES,
1836 row,
1837 Diff::ONE,
1838 ));
1839 }
1840
1841 updates
1842 }
1843
1844 pub fn pack_subscribe_update(
1845 &self,
1846 id: GlobalId,
1847 subscribe: &ActiveSubscribe,
1848 diff: Diff,
1849 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1850 let mut row = Row::default();
1851 let mut packer = row.packer();
1852 packer.push(Datum::String(&id.to_string()));
1853 packer.push(Datum::Uuid(subscribe.session_uuid));
1854 packer.push(Datum::String(&subscribe.cluster_id.to_string()));
1855
1856 let start_dt = mz_ore::now::to_datetime(subscribe.start_time);
1857 packer.push(Datum::TimestampTz(start_dt.try_into().expect("must fit")));
1858
1859 let depends_on: Vec<_> = subscribe
1860 .depends_on
1861 .iter()
1862 .map(|id| id.to_string())
1863 .collect();
1864 packer.push_list(depends_on.iter().map(|s| Datum::String(s)));
1865
1866 BuiltinTableUpdate::row(&*MZ_SUBSCRIPTIONS, row, diff)
1867 }
1868
1869 pub fn pack_session_update(
1870 &self,
1871 conn: &ConnMeta,
1872 diff: Diff,
1873 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1874 let connect_dt = mz_ore::now::to_datetime(conn.connected_at());
1875 BuiltinTableUpdate::row(
1876 &*MZ_SESSIONS,
1877 Row::pack_slice(&[
1878 Datum::Uuid(conn.uuid()),
1879 Datum::UInt32(conn.conn_id().unhandled()),
1880 Datum::String(&conn.authenticated_role_id().to_string()),
1881 Datum::from(conn.client_ip().map(|ip| ip.to_string()).as_deref()),
1882 Datum::TimestampTz(connect_dt.try_into().expect("must fit")),
1883 ]),
1884 diff,
1885 )
1886 }
1887
1888 pub fn pack_default_privileges_update(
1889 &self,
1890 default_privilege_object: &DefaultPrivilegeObject,
1891 grantee: &RoleId,
1892 acl_mode: &AclMode,
1893 diff: Diff,
1894 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1895 BuiltinTableUpdate::row(
1896 &*MZ_DEFAULT_PRIVILEGES,
1897 Row::pack_slice(&[
1898 default_privilege_object.role_id.to_string().as_str().into(),
1899 default_privilege_object
1900 .database_id
1901 .map(|database_id| database_id.to_string())
1902 .as_deref()
1903 .into(),
1904 default_privilege_object
1905 .schema_id
1906 .map(|schema_id| schema_id.to_string())
1907 .as_deref()
1908 .into(),
1909 default_privilege_object
1910 .object_type
1911 .to_string()
1912 .to_lowercase()
1913 .as_str()
1914 .into(),
1915 grantee.to_string().as_str().into(),
1916 acl_mode.to_string().as_str().into(),
1917 ]),
1918 diff,
1919 )
1920 }
1921
1922 pub fn pack_system_privileges_update(
1923 &self,
1924 privileges: MzAclItem,
1925 diff: Diff,
1926 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1927 BuiltinTableUpdate::row(
1928 &*MZ_SYSTEM_PRIVILEGES,
1929 Row::pack_slice(&[privileges.into()]),
1930 diff,
1931 )
1932 }
1933
1934 fn pack_privilege_array_row(&self, privileges: &PrivilegeMap) -> Row {
1935 let mut row = Row::default();
1936 let flat_privileges: Vec<_> = privileges.all_values_owned().collect();
1937 row.packer()
1938 .try_push_array(
1939 &[ArrayDimension {
1940 lower_bound: 1,
1941 length: flat_privileges.len(),
1942 }],
1943 flat_privileges
1944 .into_iter()
1945 .map(|mz_acl_item| Datum::MzAclItem(mz_acl_item.clone())),
1946 )
1947 .expect("privileges is 1 dimensional, and its length is used for the array length");
1948 row
1949 }
1950
1951 pub fn pack_comment_update(
1952 &self,
1953 object_id: CommentObjectId,
1954 column_pos: Option<usize>,
1955 comment: &str,
1956 diff: Diff,
1957 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1958 let object_type = mz_sql::catalog::ObjectType::from(object_id);
1960 let audit_type = super::object_type_to_audit_object_type(object_type);
1961 let object_type_str = audit_type.to_string();
1962
1963 let object_id_str = match object_id {
1964 CommentObjectId::Table(global_id)
1965 | CommentObjectId::View(global_id)
1966 | CommentObjectId::MaterializedView(global_id)
1967 | CommentObjectId::Source(global_id)
1968 | CommentObjectId::Sink(global_id)
1969 | CommentObjectId::Index(global_id)
1970 | CommentObjectId::Func(global_id)
1971 | CommentObjectId::Connection(global_id)
1972 | CommentObjectId::Secret(global_id)
1973 | CommentObjectId::Type(global_id) => global_id.to_string(),
1974 CommentObjectId::Role(role_id) => role_id.to_string(),
1975 CommentObjectId::Database(database_id) => database_id.to_string(),
1976 CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
1977 CommentObjectId::Cluster(cluster_id) => cluster_id.to_string(),
1978 CommentObjectId::ClusterReplica((_, replica_id)) => replica_id.to_string(),
1979 CommentObjectId::NetworkPolicy(network_policy_id) => network_policy_id.to_string(),
1980 };
1981 let column_pos_datum = match column_pos {
1982 Some(pos) => {
1983 let pos =
1985 i32::try_from(pos).expect("we constrain this value in the planning layer");
1986 Datum::Int32(pos)
1987 }
1988 None => Datum::Null,
1989 };
1990
1991 BuiltinTableUpdate::row(
1992 &*MZ_COMMENTS,
1993 Row::pack_slice(&[
1994 Datum::String(&object_id_str),
1995 Datum::String(&object_type_str),
1996 column_pos_datum,
1997 Datum::String(comment),
1998 ]),
1999 diff,
2000 )
2001 }
2002
2003 pub fn pack_webhook_source_update(
2004 &self,
2005 item_id: CatalogItemId,
2006 diff: Diff,
2007 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2008 let url = self
2009 .try_get_webhook_url(&item_id)
2010 .expect("webhook source should exist");
2011 let url = url.to_string();
2012 let name = &self.get_entry(&item_id).name().item;
2013 let id_str = item_id.to_string();
2014
2015 BuiltinTableUpdate::row(
2016 &*MZ_WEBHOOKS_SOURCES,
2017 Row::pack_slice(&[
2018 Datum::String(&id_str),
2019 Datum::String(name),
2020 Datum::String(&url),
2021 ]),
2022 diff,
2023 )
2024 }
2025
2026 pub fn pack_source_references_update(
2027 &self,
2028 source_references: &SourceReferences,
2029 diff: Diff,
2030 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2031 let source_id = source_references.source_id.to_string();
2032 let updated_at = &source_references.updated_at;
2033 source_references
2034 .references
2035 .iter()
2036 .map(|reference| {
2037 let mut row = Row::default();
2038 let mut packer = row.packer();
2039 packer.extend([
2040 Datum::String(&source_id),
2041 reference
2042 .namespace
2043 .as_ref()
2044 .map(|s| Datum::String(s))
2045 .unwrap_or(Datum::Null),
2046 Datum::String(&reference.name),
2047 Datum::TimestampTz(
2048 mz_ore::now::to_datetime(*updated_at)
2049 .try_into()
2050 .expect("must fit"),
2051 ),
2052 ]);
2053 if reference.columns.len() > 0 {
2054 packer
2055 .try_push_array(
2056 &[ArrayDimension {
2057 lower_bound: 1,
2058 length: reference.columns.len(),
2059 }],
2060 reference.columns.iter().map(|col| Datum::String(col)),
2061 )
2062 .expect(
2063 "columns is 1 dimensional, and its length is used for the array length",
2064 );
2065 } else {
2066 packer.push(Datum::Null);
2067 }
2068
2069 BuiltinTableUpdate::row(&*MZ_SOURCE_REFERENCES, row, diff)
2070 })
2071 .collect()
2072 }
2073}