1mod notice;
11
12use bytesize::ByteSize;
13use ipnet::IpNet;
14use mz_adapter_types::compaction::CompactionWindow;
15use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
16use mz_catalog::SYSTEM_CONN_ID;
17use mz_catalog::builtin::{
18 BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
19 MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICAS,
20 MZ_CLUSTER_SCHEDULES, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS, MZ_CONTINUAL_TASKS,
21 MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS, MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES,
22 MZ_ICEBERG_SINKS, MZ_INDEX_COLUMNS, MZ_INDEXES, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS,
23 MZ_KAFKA_SOURCE_TABLES, MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES,
24 MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MYSQL_SOURCE_TABLES, MZ_OBJECT_DEPENDENCIES,
25 MZ_OBJECT_GLOBAL_IDS, MZ_OPERATORS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES,
26 MZ_PSEUDO_TYPES, MZ_REPLACEMENTS, MZ_ROLE_AUTH, MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SESSIONS,
27 MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SOURCES, MZ_SQL_SERVER_SOURCE_TABLES,
28 MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES,
29 MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
30};
31use mz_catalog::config::AwsPrincipalContext;
32use mz_catalog::durable::SourceReferences;
33use mz_catalog::memory::error::{Error, ErrorKind};
34use mz_catalog::memory::objects::{
35 CatalogEntry, CatalogItem, ClusterVariant, Connection, ContinualTask, DataSourceDesc, Func,
36 Index, MaterializedView, Sink, Table, TableDataSource, Type, View,
37};
38use mz_controller::clusters::{
39 ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
40};
41use mz_controller_types::ClusterId;
42use mz_expr::MirScalarExpr;
43use mz_license_keys::ValidatedLicenseKey;
44use mz_orchestrator::{CpuLimit, DiskLimit, MemoryLimit};
45use mz_ore::cast::CastFrom;
46use mz_ore::collections::CollectionExt;
47use mz_persist_client::batch::ProtoBatch;
48use mz_repr::adt::array::ArrayDimension;
49use mz_repr::adt::interval::Interval;
50use mz_repr::adt::jsonb::Jsonb;
51use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
52use mz_repr::refresh_schedule::RefreshEvery;
53use mz_repr::role_id::RoleId;
54use mz_repr::{
55 CatalogItemId, Datum, Diff, GlobalId, ReprColumnType, Row, RowPacker, SqlScalarType, Timestamp,
56};
57use mz_sql::ast::{ContinualTaskStmt, CreateIndexStatement, Statement, UnresolvedItemName};
58use mz_sql::catalog::{CatalogCluster, CatalogType, DefaultPrivilegeObject, TypeCategory};
59use mz_sql::func::FuncImplCatalogDetails;
60use mz_sql::names::{CommentObjectId, SchemaSpecifier};
61use mz_sql::plan::{ClusterSchedule, ConnectionDetails, SshKey};
62use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
63use mz_sql::session::vars::SessionVars;
64use mz_sql_parser::ast::display::AstDisplay;
65use mz_storage_client::client::TableData;
66use mz_storage_types::connections::KafkaConnection;
67use mz_storage_types::connections::aws::{AwsAuth, AwsConnection};
68use mz_storage_types::connections::inline::ReferencedConnection;
69use mz_storage_types::connections::string_or_secret::StringOrSecret;
70use mz_storage_types::sinks::{IcebergSinkConnection, KafkaSinkConnection, StorageSinkConnection};
71use mz_storage_types::sources::{
72 GenericSourceConnection, KafkaSourceConnection, PostgresSourceConnection, SourceConnection,
73};
74use smallvec::smallvec;
75
76use crate::active_compute_sink::ActiveSubscribe;
78use crate::catalog::CatalogState;
79use crate::coord::ConnMeta;
80
/// An update to apply to a builtin table.
///
/// `T` identifies the target table: `&'static BuiltinTable` before
/// resolution, or a [`CatalogItemId`] after
/// `CatalogState::resolve_builtin_table_update` has run.
#[derive(Debug, Clone)]
pub struct BuiltinTableUpdate<T = CatalogItemId> {
    /// The table this update applies to.
    pub id: T,
    /// The payload: either `(Row, Diff)` pairs or pre-built persist batches.
    pub data: TableData,
}
89
90impl<T> BuiltinTableUpdate<T> {
91 pub fn row(id: T, row: Row, diff: Diff) -> BuiltinTableUpdate<T> {
93 BuiltinTableUpdate {
94 id,
95 data: TableData::Rows(vec![(row, diff)]),
96 }
97 }
98
99 pub fn batch(id: T, batch: ProtoBatch) -> BuiltinTableUpdate<T> {
100 BuiltinTableUpdate {
101 id,
102 data: TableData::Batches(smallvec![batch]),
103 }
104 }
105}
106
107impl CatalogState {
108 pub fn resolve_builtin_table_updates(
109 &self,
110 builtin_table_update: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
111 ) -> Vec<BuiltinTableUpdate<CatalogItemId>> {
112 builtin_table_update
113 .into_iter()
114 .map(|builtin_table_update| self.resolve_builtin_table_update(builtin_table_update))
115 .collect()
116 }
117
118 pub fn resolve_builtin_table_update(
119 &self,
120 BuiltinTableUpdate { id, data }: BuiltinTableUpdate<&'static BuiltinTable>,
121 ) -> BuiltinTableUpdate<CatalogItemId> {
122 let id = self.resolve_builtin_table(id);
123 BuiltinTableUpdate { id, data }
124 }
125
126 pub fn pack_depends_update(
127 &self,
128 depender: CatalogItemId,
129 dependee: CatalogItemId,
130 diff: Diff,
131 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
132 let row = Row::pack_slice(&[
133 Datum::String(&depender.to_string()),
134 Datum::String(&dependee.to_string()),
135 ]);
136 BuiltinTableUpdate::row(&*MZ_OBJECT_DEPENDENCIES, row, diff)
137 }
138
139 pub(super) fn pack_role_auth_update(
140 &self,
141 id: RoleId,
142 diff: Diff,
143 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
144 let role_auth = self.get_role_auth(&id);
145 let role = self.get_role(&id);
146 BuiltinTableUpdate::row(
147 &*MZ_ROLE_AUTH,
148 Row::pack_slice(&[
149 Datum::String(&role_auth.role_id.to_string()),
150 Datum::UInt32(role.oid),
151 match &role_auth.password_hash {
152 Some(hash) => Datum::String(hash),
153 None => Datum::Null,
154 },
155 Datum::TimestampTz(
156 mz_ore::now::to_datetime(role_auth.updated_at)
157 .try_into()
158 .expect("must fit"),
159 ),
160 ]),
161 diff,
162 )
163 }
164
    /// Packs the `mz_roles` row for role `id`, plus one `mz_role_parameters`
    /// row per valid role-default session variable.
    ///
    /// The `PUBLIC` pseudo-role produces no updates. Role default vars that
    /// fail validation against the current set of session variables are
    /// logged and skipped rather than written.
    pub(super) fn pack_role_update(
        &self,
        id: RoleId,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        match id {
            RoleId::Public => vec![],
            id => {
                let role = self.get_role(&id);
                // These builtin roles get login/superuser defaults below when
                // the corresponding attribute is unset.
                let builtin_supers = [MZ_SYSTEM_ROLE_ID, MZ_SUPPORT_ROLE_ID];

                // Explicit LOGIN attribute wins; otherwise only the builtin
                // superuser roles may log in.
                let rolcanlogin = if let Some(login) = role.attributes.login {
                    login
                } else {
                    builtin_supers.contains(&role.id)
                };

                // Explicit SUPERUSER attribute wins; builtin superusers default
                // to true; everyone else reports NULL (unknown).
                let rolsuper = if let Some(superuser) = role.attributes.superuser {
                    Datum::from(superuser)
                } else if builtin_supers.contains(&role.id) {
                    Datum::from(true)
                } else {
                    Datum::Null
                };

                let role_update = BuiltinTableUpdate::row(
                    &*MZ_ROLES,
                    Row::pack_slice(&[
                        Datum::String(&role.id.to_string()),
                        Datum::UInt32(role.oid),
                        Datum::String(&role.name),
                        Datum::from(role.attributes.inherit),
                        Datum::from(rolcanlogin),
                        rolsuper,
                    ]),
                    diff,
                );
                let mut updates = vec![role_update];

                // A throwaway SessionVars used only to validate and format the
                // role's default variable values.
                let session_vars_reference = SessionVars::new_unchecked(
                    &mz_build_info::DUMMY_BUILD_INFO,
                    SYSTEM_USER.clone(),
                    None,
                );

                for (name, val) in role.vars() {
                    let result = session_vars_reference
                        .inspect(name)
                        .and_then(|var| var.check(val.borrow()));
                    // Skip (but log) vars that no longer validate, e.g. after
                    // a variable was removed or its accepted values changed.
                    let Ok(formatted_val) = result else {
                        tracing::error!(?name, ?val, ?result, "found invalid role default var");
                        continue;
                    };

                    let role_var_update = BuiltinTableUpdate::row(
                        &*MZ_ROLE_PARAMETERS,
                        Row::pack_slice(&[
                            Datum::String(&role.id.to_string()),
                            Datum::String(name),
                            Datum::String(&formatted_val),
                        ]),
                        diff,
                    );
                    updates.push(role_var_update);
                }

                updates
            }
        }
    }
249
    /// Packs the `mz_clusters` row for the cluster named `name`, plus an
    /// `mz_cluster_schedules` row when the cluster is managed.
    ///
    /// Managed clusters report size, disk, replication factor, availability
    /// zones, and introspection settings; unmanaged clusters report NULL for
    /// all of these.
    pub(super) fn pack_cluster_update(
        &self,
        name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let id = self.clusters_by_name[name];
        let cluster = &self.clusters_by_id[&id];
        let row = self.pack_privilege_array_row(cluster.privileges());
        let privileges = row.unpack_first();
        // Managed-only attributes; all None for unmanaged clusters.
        let (size, disk, replication_factor, azs, introspection_debugging, introspection_interval) =
            match &cluster.config.variant {
                ClusterVariant::Managed(config) => (
                    Some(config.size.as_str()),
                    Some(self.cluster_replica_size_has_disk(&config.size)),
                    Some(config.replication_factor),
                    // An empty AZ list is reported as NULL rather than '{}'.
                    if config.availability_zones.is_empty() {
                        None
                    } else {
                        Some(config.availability_zones.clone())
                    },
                    Some(config.logging.log_logging),
                    config.logging.interval.map(|d| {
                        Interval::from_duration(&d)
                            .expect("planning ensured this convertible back to interval")
                    }),
                ),
                ClusterVariant::Unmanaged => (None, None, None, None, None, None),
            };

        let mut row = Row::default();
        let mut packer = row.packer();
        packer.extend([
            Datum::String(&id.to_string()),
            Datum::String(name),
            Datum::String(&cluster.owner_id.to_string()),
            privileges,
            cluster.is_managed().into(),
            size.into(),
            replication_factor.into(),
            disk.into(),
        ]);
        // The AZ column is a list, so it cannot go through `extend` above.
        if let Some(azs) = azs {
            packer.push_list(azs.iter().map(|az| Datum::String(az)));
        } else {
            packer.push(Datum::Null);
        }
        packer.push(Datum::from(introspection_debugging));
        packer.push(Datum::from(introspection_interval));

        let mut updates = Vec::new();

        updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTERS, row, diff));

        // Managed clusters additionally report their schedule.
        if let ClusterVariant::Managed(managed_config) = &cluster.config.variant {
            let row = match managed_config.schedule {
                ClusterSchedule::Manual => Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String("manual"),
                    Datum::Null,
                ]),
                ClusterSchedule::Refresh {
                    hydration_time_estimate,
                } => Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String("on-refresh"),
                    Datum::Interval(
                        Interval::from_duration(&hydration_time_estimate)
                            .expect("planning ensured that this is convertible back to Interval"),
                    ),
                ]),
            };
            updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTER_SCHEDULES, row, diff));
        }

        updates
    }
326
    /// Packs the `mz_cluster_replicas` row for replica `name` of cluster
    /// `cluster_id`.
    ///
    /// Size, disk, and availability zone are only known for managed replicas;
    /// unmanaged replicas report NULL for all three.
    pub(super) fn pack_cluster_replica_update(
        &self,
        cluster_id: ClusterId,
        name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let cluster = &self.clusters_by_id[&cluster_id];
        let id = cluster.replica_id(name).expect("Must exist");
        let replica = cluster.replica(id).expect("Must exist");

        // The first arm matches only replicas with a concrete per-replica AZ;
        // the second catches all other managed replicas (no AZ reported).
        let (size, disk, az) = match &replica.config.location {
            ReplicaLocation::Managed(ManagedReplicaLocation {
                size,
                availability_zones: ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
                allocation: _,
                billed_as: _,
                internal: _,
                pending: _,
            }) => (
                Some(&**size),
                Some(self.cluster_replica_size_has_disk(size)),
                Some(az.as_str()),
            ),
            ReplicaLocation::Managed(ManagedReplicaLocation {
                size,
                availability_zones: _,
                allocation: _,
                billed_as: _,
                internal: _,
                pending: _,
            }) => (
                Some(&**size),
                Some(self.cluster_replica_size_has_disk(size)),
                None,
            ),
            ReplicaLocation::Unmanaged(_) => (None, None, None),
        };

        let cluster_replica_update = BuiltinTableUpdate::row(
            &*MZ_CLUSTER_REPLICAS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(name),
                Datum::String(&cluster_id.to_string()),
                Datum::from(size),
                Datum::from(az),
                Datum::String(&replica.owner_id.to_string()),
                Datum::from(disk),
            ]),
            diff,
        );

        vec![cluster_replica_update]
    }
383
    /// Packs all builtin-table updates for catalog item `id`.
    ///
    /// This dispatches on the item's kind to produce its type-specific rows
    /// (e.g. `mz_tables`, `mz_sources`, `mz_views`), then appends the rows
    /// shared by all items: `mz_object_dependencies`, `mz_columns` (when the
    /// item has a relation description), `mz_history_retention_strategies`
    /// (when a compaction window is set), and `mz_object_global_ids`.
    pub(super) fn pack_item_update(
        &self,
        id: CatalogItemId,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let entry = self.get_entry(&id);
        let oid = entry.oid();
        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
        let schema_id = &self
            .get_schema(
                &entry.name().qualifiers.database_spec,
                &entry.name().qualifiers.schema_spec,
                conn_id,
            )
            .id;
        let name = &entry.name().item;
        let owner_id = entry.owner_id();
        let privileges_row = self.pack_privilege_array_row(entry.privileges());
        let privileges = privileges_row.unpack_first();
        // Type-specific rows, keyed on the item's kind.
        let mut updates = match entry.item() {
            CatalogItem::Log(_) => self.pack_source_update(
                id, oid, schema_id, name, "log", None, None, None, None, None, owner_id,
                privileges, diff, None,
            ),
            CatalogItem::Index(index) => {
                self.pack_index_update(id, oid, name, owner_id, index, diff)
            }
            CatalogItem::Table(table) => {
                let mut updates = self
                    .pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);

                // Tables backed by a source export additionally get a row in
                // the connection-specific source-tables relation.
                if let TableDataSource::DataSource {
                    desc: data_source,
                    timeline: _,
                } = &table.data_source
                {
                    updates.extend(match data_source {
                        DataSourceDesc::IngestionExport {
                            ingestion_id,
                            external_reference: UnresolvedItemName(external_reference),
                            details: _,
                            data_config: _,
                        } => {
                            let ingestion_entry = self
                                .get_entry(ingestion_id)
                                .source_desc()
                                .expect("primary source exists")
                                .expect("primary source is a source");

                            // The external reference's shape depends on the
                            // upstream system (see the length asserts below).
                            match ingestion_entry.connection.name() {
                                "postgres" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                    let schema_name = external_reference[1].as_str();
                                    let table_name = external_reference[2].as_str();

                                    self.pack_postgres_source_tables_update(
                                        id,
                                        schema_name,
                                        table_name,
                                        diff,
                                    )
                                }
                                "mysql" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
                                    let schema_name = external_reference[0].as_str();
                                    let table_name = external_reference[1].as_str();

                                    self.pack_mysql_source_tables_update(
                                        id,
                                        schema_name,
                                        table_name,
                                        diff,
                                    )
                                }
                                "sql-server" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                    let schema_name = external_reference[1].as_str();
                                    let table_name = external_reference[2].as_str();

                                    self.pack_sql_server_source_table_update(
                                        id,
                                        schema_name,
                                        table_name,
                                        diff,
                                    )
                                }
                                "load-generator" => vec![],
                                "kafka" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 1);
                                    let topic = external_reference[0].as_str();
                                    let envelope = data_source.envelope();
                                    let (key_format, value_format) = data_source.formats();

                                    self.pack_kafka_source_tables_update(
                                        id,
                                        topic,
                                        envelope,
                                        key_format,
                                        value_format,
                                        diff,
                                    )
                                }
                                s => unreachable!("{s} sources do not have tables"),
                            }
                        }
                        DataSourceDesc::Ingestion { .. }
                        | DataSourceDesc::OldSyntaxIngestion { .. }
                        | DataSourceDesc::Introspection(_)
                        | DataSourceDesc::Progress
                        | DataSourceDesc::Webhook { .. }
                        | DataSourceDesc::Catalog => vec![],
                    });
                }

                updates
            }
            CatalogItem::Source(source) => {
                let source_type = source.source_type();
                let connection_id = source.connection_id();
                let envelope = source.data_source.envelope();
                // Ingestion exports report the cluster of their primary
                // source; all other sources report their own entry's cluster.
                let cluster_entry = match source.data_source {
                    DataSourceDesc::IngestionExport { ingestion_id, .. } => {
                        self.get_entry(&ingestion_id)
                    }
                    DataSourceDesc::Ingestion { .. }
                    | DataSourceDesc::OldSyntaxIngestion { .. }
                    | DataSourceDesc::Introspection(_)
                    | DataSourceDesc::Progress
                    | DataSourceDesc::Webhook { .. }
                    | DataSourceDesc::Catalog => entry,
                };

                let cluster_id = cluster_entry.item().cluster_id().map(|id| id.to_string());

                let (key_format, value_format) = source.data_source.formats();

                let mut updates = self.pack_source_update(
                    id,
                    oid,
                    schema_id,
                    name,
                    source_type,
                    connection_id,
                    envelope,
                    key_format,
                    value_format,
                    cluster_id.as_deref(),
                    owner_id,
                    privileges,
                    diff,
                    source.create_sql.as_ref(),
                );

                // Additional connection-specific rows for this source.
                updates.extend(match &source.data_source {
                    DataSourceDesc::Ingestion { desc, .. }
                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => match &desc.connection {
                        GenericSourceConnection::Postgres(postgres) => {
                            self.pack_postgres_source_update(id, postgres, diff)
                        }
                        GenericSourceConnection::Kafka(kafka) => {
                            self.pack_kafka_source_update(id, source.global_id(), kafka, diff)
                        }
                        _ => vec![],
                    },
                    DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference: UnresolvedItemName(external_reference),
                        details: _,
                        data_config: _,
                    } => {
                        let ingestion_entry = self
                            .get_entry(ingestion_id)
                            .source_desc()
                            .expect("primary source exists")
                            .expect("primary source is a source");

                        match ingestion_entry.connection.name() {
                            "postgres" => {
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                let schema_name = external_reference[1].as_str();
                                let table_name = external_reference[2].as_str();

                                self.pack_postgres_source_tables_update(
                                    id,
                                    schema_name,
                                    table_name,
                                    diff,
                                )
                            }
                            "mysql" => {
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
                                let schema_name = external_reference[0].as_str();
                                let table_name = external_reference[1].as_str();

                                self.pack_mysql_source_tables_update(
                                    id,
                                    schema_name,
                                    table_name,
                                    diff,
                                )
                            }
                            "sql-server" => {
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                let schema_name = external_reference[1].as_str();
                                let table_name = external_reference[2].as_str();

                                self.pack_sql_server_source_table_update(
                                    id,
                                    schema_name,
                                    table_name,
                                    diff,
                                )
                            }
                            "load-generator" => vec![],
                            s => unreachable!("{s} sources do not have subsources"),
                        }
                    }
                    DataSourceDesc::Webhook { .. } => {
                        vec![self.pack_webhook_source_update(id, diff)]
                    }
                    DataSourceDesc::Introspection(_)
                    | DataSourceDesc::Progress
                    | DataSourceDesc::Catalog => vec![],
                });

                updates
            }
            CatalogItem::View(view) => {
                self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
            }
            CatalogItem::MaterializedView(mview) => {
                self.pack_materialized_view_update(id, mview, diff)
            }
            CatalogItem::Sink(sink) => {
                self.pack_sink_update(id, oid, schema_id, name, owner_id, sink, diff)
            }
            CatalogItem::Type(ty) => {
                self.pack_type_update(id, oid, schema_id, name, owner_id, privileges, ty, diff)
            }
            CatalogItem::Func(func) => {
                self.pack_func_update(id, schema_id, name, owner_id, func, diff)
            }
            CatalogItem::Secret(_) => vec![],
            CatalogItem::Connection(connection) => {
                self.pack_connection_update(id, connection, diff)
            }
            CatalogItem::ContinualTask(ct) => self.pack_continual_task_update(
                id, oid, schema_id, name, owner_id, privileges, ct, diff,
            ),
        };

        // Dependency rows; temporary items are not recorded.
        if !entry.item().is_temporary() {
            for dependee in entry.item().references().items() {
                updates.push(self.pack_depends_update(id, *dependee, diff))
            }
        }

        // One mz_columns row per column, for items with a relation desc.
        if let Some(desc) = entry.relation_desc_latest() {
            // Column defaults only exist for write-backed tables.
            let defaults = match entry.item() {
                CatalogItem::Table(Table {
                    data_source: TableDataSource::TableWrites { defaults },
                    ..
                }) => Some(defaults),
                _ => None,
            };
            for (i, (column_name, column_type)) in desc.iter().enumerate() {
                let default: Option<String> = defaults.map(|d| d[i].to_ast_string_stable());
                let default: Datum = default
                    .as_ref()
                    .map(|d| Datum::String(d))
                    .unwrap_or(Datum::Null);
                let pgtype = mz_pgrepr::Type::from(&column_type.scalar_type);
                // Custom list/map/record types report the catalog entry's
                // name and OID instead of the generic pg type's.
                let (type_name, type_oid) = match &column_type.scalar_type {
                    SqlScalarType::List {
                        custom_id: Some(custom_id),
                        ..
                    }
                    | SqlScalarType::Map {
                        custom_id: Some(custom_id),
                        ..
                    }
                    | SqlScalarType::Record {
                        custom_id: Some(custom_id),
                        ..
                    } => {
                        let entry = self.get_entry(custom_id);
                        let name = &*entry.name().item;
                        let oid = entry.oid();
                        (name, oid)
                    }
                    _ => (pgtype.name(), pgtype.oid()),
                };
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_COLUMNS,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String(column_name),
                        // Column positions are 1-based.
                        Datum::UInt64(u64::cast_from(i + 1)),
                        Datum::from(column_type.nullable),
                        Datum::String(type_name),
                        default,
                        Datum::UInt32(type_oid),
                        Datum::Int32(pgtype.typmod()),
                    ]),
                    diff,
                ));
            }
        }

        if let Some(cw) = entry.item().initial_logical_compaction_window() {
            updates.push(self.pack_history_retention_strategy_update(id, cw, diff));
        }

        updates.extend(Self::pack_item_global_id_update(entry, diff));

        updates
    }
743
744 fn pack_item_global_id_update(
745 entry: &CatalogEntry,
746 diff: Diff,
747 ) -> impl Iterator<Item = BuiltinTableUpdate<&'static BuiltinTable>> + use<'_> {
748 let id = entry.id().to_string();
749 let global_ids = entry.global_ids();
750 global_ids.map(move |global_id| {
751 BuiltinTableUpdate::row(
752 &*MZ_OBJECT_GLOBAL_IDS,
753 Row::pack_slice(&[Datum::String(&id), Datum::String(&global_id.to_string())]),
754 diff,
755 )
756 })
757 }
758
759 fn pack_history_retention_strategy_update(
760 &self,
761 id: CatalogItemId,
762 cw: CompactionWindow,
763 diff: Diff,
764 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
765 let cw: u64 = cw.comparable_timestamp().into();
766 let cw = Jsonb::from_serde_json(serde_json::Value::Number(serde_json::Number::from(cw)))
767 .expect("must serialize");
768 BuiltinTableUpdate::row(
769 &*MZ_HISTORY_RETENTION_STRATEGIES,
770 Row::pack_slice(&[
771 Datum::String(&id.to_string()),
772 Datum::String("FOR"),
774 cw.into_row().into_element(),
775 ]),
776 diff,
777 )
778 }
779
780 fn pack_table_update(
781 &self,
782 id: CatalogItemId,
783 oid: u32,
784 schema_id: &SchemaSpecifier,
785 name: &str,
786 owner_id: &RoleId,
787 privileges: Datum,
788 diff: Diff,
789 table: &Table,
790 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
791 let redacted = table.create_sql.as_ref().map(|create_sql| {
792 mz_sql::parse::parse(create_sql)
793 .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
794 .into_element()
795 .ast
796 .to_ast_string_redacted()
797 });
798 let source_id = if let TableDataSource::DataSource {
799 desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
800 ..
801 } = &table.data_source
802 {
803 Some(ingestion_id.to_string())
804 } else {
805 None
806 };
807
808 vec![BuiltinTableUpdate::row(
809 &*MZ_TABLES,
810 Row::pack_slice(&[
811 Datum::String(&id.to_string()),
812 Datum::UInt32(oid),
813 Datum::String(&schema_id.to_string()),
814 Datum::String(name),
815 Datum::String(&owner_id.to_string()),
816 privileges,
817 if let Some(create_sql) = &table.create_sql {
818 Datum::String(create_sql)
819 } else {
820 Datum::Null
821 },
822 if let Some(redacted) = &redacted {
823 Datum::String(redacted)
824 } else {
825 Datum::Null
826 },
827 if let Some(source_id) = source_id.as_ref() {
828 Datum::String(source_id)
829 } else {
830 Datum::Null
831 },
832 ]),
833 diff,
834 )]
835 }
836
837 fn pack_source_update(
838 &self,
839 id: CatalogItemId,
840 oid: u32,
841 schema_id: &SchemaSpecifier,
842 name: &str,
843 source_desc_name: &str,
844 connection_id: Option<CatalogItemId>,
845 envelope: Option<&str>,
846 key_format: Option<&str>,
847 value_format: Option<&str>,
848 cluster_id: Option<&str>,
849 owner_id: &RoleId,
850 privileges: Datum,
851 diff: Diff,
852 create_sql: Option<&String>,
853 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
854 let redacted = create_sql.map(|create_sql| {
855 let create_stmt = mz_sql::parse::parse(create_sql)
856 .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
857 .into_element()
858 .ast;
859 create_stmt.to_ast_string_redacted()
860 });
861 vec![BuiltinTableUpdate::row(
862 &*MZ_SOURCES,
863 Row::pack_slice(&[
864 Datum::String(&id.to_string()),
865 Datum::UInt32(oid),
866 Datum::String(&schema_id.to_string()),
867 Datum::String(name),
868 Datum::String(source_desc_name),
869 Datum::from(connection_id.map(|id| id.to_string()).as_deref()),
870 Datum::Null,
873 Datum::from(envelope),
874 Datum::from(key_format),
875 Datum::from(value_format),
876 Datum::from(cluster_id),
877 Datum::String(&owner_id.to_string()),
878 privileges,
879 if let Some(create_sql) = create_sql {
880 Datum::String(create_sql)
881 } else {
882 Datum::Null
883 },
884 if let Some(redacted) = &redacted {
885 Datum::String(redacted)
886 } else {
887 Datum::Null
888 },
889 ]),
890 diff,
891 )]
892 }
893
894 fn pack_postgres_source_update(
895 &self,
896 id: CatalogItemId,
897 postgres: &PostgresSourceConnection<ReferencedConnection>,
898 diff: Diff,
899 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
900 vec![BuiltinTableUpdate::row(
901 &*MZ_POSTGRES_SOURCES,
902 Row::pack_slice(&[
903 Datum::String(&id.to_string()),
904 Datum::String(&postgres.publication_details.slot),
905 Datum::from(postgres.publication_details.timeline_id),
906 ]),
907 diff,
908 )]
909 }
910
911 fn pack_kafka_source_update(
912 &self,
913 item_id: CatalogItemId,
914 collection_id: GlobalId,
915 kafka: &KafkaSourceConnection<ReferencedConnection>,
916 diff: Diff,
917 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
918 vec![BuiltinTableUpdate::row(
919 &*MZ_KAFKA_SOURCES,
920 Row::pack_slice(&[
921 Datum::String(&item_id.to_string()),
922 Datum::String(&kafka.group_id(&self.config.connection_context, collection_id)),
923 Datum::String(&kafka.topic),
924 ]),
925 diff,
926 )]
927 }
928
929 fn pack_postgres_source_tables_update(
930 &self,
931 id: CatalogItemId,
932 schema_name: &str,
933 table_name: &str,
934 diff: Diff,
935 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
936 vec![BuiltinTableUpdate::row(
937 &*MZ_POSTGRES_SOURCE_TABLES,
938 Row::pack_slice(&[
939 Datum::String(&id.to_string()),
940 Datum::String(schema_name),
941 Datum::String(table_name),
942 ]),
943 diff,
944 )]
945 }
946
947 fn pack_mysql_source_tables_update(
948 &self,
949 id: CatalogItemId,
950 schema_name: &str,
951 table_name: &str,
952 diff: Diff,
953 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
954 vec![BuiltinTableUpdate::row(
955 &*MZ_MYSQL_SOURCE_TABLES,
956 Row::pack_slice(&[
957 Datum::String(&id.to_string()),
958 Datum::String(schema_name),
959 Datum::String(table_name),
960 ]),
961 diff,
962 )]
963 }
964
965 fn pack_sql_server_source_table_update(
966 &self,
967 id: CatalogItemId,
968 schema_name: &str,
969 table_name: &str,
970 diff: Diff,
971 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
972 vec![BuiltinTableUpdate::row(
973 &*MZ_SQL_SERVER_SOURCE_TABLES,
974 Row::pack_slice(&[
975 Datum::String(&id.to_string()),
976 Datum::String(schema_name),
977 Datum::String(table_name),
978 ]),
979 diff,
980 )]
981 }
982
983 fn pack_kafka_source_tables_update(
984 &self,
985 id: CatalogItemId,
986 topic: &str,
987 envelope: Option<&str>,
988 key_format: Option<&str>,
989 value_format: Option<&str>,
990 diff: Diff,
991 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
992 vec![BuiltinTableUpdate::row(
993 &*MZ_KAFKA_SOURCE_TABLES,
994 Row::pack_slice(&[
995 Datum::String(&id.to_string()),
996 Datum::String(topic),
997 Datum::from(envelope),
998 Datum::from(key_format),
999 Datum::from(value_format),
1000 ]),
1001 diff,
1002 )]
1003 }
1004
1005 fn pack_connection_update(
1006 &self,
1007 id: CatalogItemId,
1008 connection: &Connection,
1009 diff: Diff,
1010 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1011 let mut updates = vec![];
1012 match connection.details {
1013 ConnectionDetails::Kafka(ref kafka) => {
1014 updates.extend(self.pack_kafka_connection_update(id, kafka, diff));
1015 }
1016 ConnectionDetails::Aws(ref aws_config) => {
1017 match self.pack_aws_connection_update(id, aws_config, diff) {
1018 Ok(update) => {
1019 updates.push(update);
1020 }
1021 Err(e) => {
1022 tracing::error!(%id, %e, "failed writing row to mz_aws_connections table");
1023 }
1024 }
1025 }
1026 ConnectionDetails::AwsPrivatelink(_) => {
1027 if let Some(aws_principal_context) = self.aws_principal_context.as_ref() {
1028 updates.push(self.pack_aws_privatelink_connection_update(
1029 id,
1030 aws_principal_context,
1031 diff,
1032 ));
1033 } else {
1034 tracing::error!(%id, "missing AWS principal context; cannot write row to mz_aws_privatelink_connections table");
1035 }
1036 }
1037 ConnectionDetails::Ssh {
1038 ref key_1,
1039 ref key_2,
1040 ..
1041 } => {
1042 updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff));
1043 }
1044 ConnectionDetails::Csr(_)
1045 | ConnectionDetails::Postgres(_)
1046 | ConnectionDetails::MySql(_)
1047 | ConnectionDetails::SqlServer(_)
1048 | ConnectionDetails::IcebergCatalog(_) => (),
1049 };
1050 updates
1051 }
1052
1053 pub(crate) fn pack_ssh_tunnel_connection_update(
1054 &self,
1055 id: CatalogItemId,
1056 key_1: &SshKey,
1057 key_2: &SshKey,
1058 diff: Diff,
1059 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1060 BuiltinTableUpdate::row(
1061 &*MZ_SSH_TUNNEL_CONNECTIONS,
1062 Row::pack_slice(&[
1063 Datum::String(&id.to_string()),
1064 Datum::String(key_1.public_key().as_str()),
1065 Datum::String(key_2.public_key().as_str()),
1066 ]),
1067 diff,
1068 )
1069 }
1070
1071 fn pack_kafka_connection_update(
1072 &self,
1073 id: CatalogItemId,
1074 kafka: &KafkaConnection<ReferencedConnection>,
1075 diff: Diff,
1076 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1077 let progress_topic = kafka.progress_topic(&self.config.connection_context, id);
1078 let mut row = Row::default();
1079 row.packer()
1080 .try_push_array(
1081 &[ArrayDimension {
1082 lower_bound: 1,
1083 length: kafka.brokers.len(),
1084 }],
1085 kafka
1086 .brokers
1087 .iter()
1088 .map(|broker| Datum::String(&broker.address)),
1089 )
1090 .expect("kafka.brokers is 1 dimensional, and its length is used for the array length");
1091 let brokers = row.unpack_first();
1092 vec![BuiltinTableUpdate::row(
1093 &*MZ_KAFKA_CONNECTIONS,
1094 Row::pack_slice(&[
1095 Datum::String(&id.to_string()),
1096 brokers,
1097 Datum::String(&progress_topic),
1098 ]),
1099 diff,
1100 )]
1101 }
1102
1103 pub fn pack_aws_privatelink_connection_update(
1104 &self,
1105 connection_id: CatalogItemId,
1106 aws_principal_context: &AwsPrincipalContext,
1107 diff: Diff,
1108 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1109 let id = &MZ_AWS_PRIVATELINK_CONNECTIONS;
1110 let row = Row::pack_slice(&[
1111 Datum::String(&connection_id.to_string()),
1112 Datum::String(&aws_principal_context.to_principal_string(connection_id)),
1113 ]);
1114 BuiltinTableUpdate::row(id, row, diff)
1115 }
1116
    /// Packs the `mz_aws_connections` row for an AWS connection.
    ///
    /// Exactly one group of columns is populated depending on the auth mode:
    /// static credentials (access key / secret / session token, each either
    /// inline or a secret reference) or assume-role (ARN, session name,
    /// principal, external ID, and an example trust policy).
    ///
    /// # Errors
    ///
    /// Returns an error if deriving the external ID or example trust policy
    /// for an assume-role connection fails.
    pub fn pack_aws_connection_update(
        &self,
        connection_id: CatalogItemId,
        aws_config: &AwsConnection,
        diff: Diff,
    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, anyhow::Error> {
        let id = &MZ_AWS_CONNECTIONS;

        // All columns default to NULL; the match below fills in only the
        // ones relevant to the configured auth mode.
        let mut access_key_id = None;
        let mut access_key_id_secret_id = None;
        let mut secret_access_key_secret_id = None;
        let mut session_token = None;
        let mut session_token_secret_id = None;
        let mut assume_role_arn = None;
        let mut assume_role_session_name = None;
        let mut principal = None;
        let mut external_id = None;
        let mut example_trust_policy = None;
        match &aws_config.auth {
            AwsAuth::Credentials(credentials) => {
                // Inline values and secret references go to different columns.
                match &credentials.access_key_id {
                    StringOrSecret::String(s) => access_key_id = Some(s.as_str()),
                    StringOrSecret::Secret(s) => access_key_id_secret_id = Some(s.to_string()),
                }
                secret_access_key_secret_id = Some(credentials.secret_access_key.to_string());
                match credentials.session_token.as_ref() {
                    None => (),
                    Some(StringOrSecret::String(s)) => session_token = Some(s.as_str()),
                    Some(StringOrSecret::Secret(s)) => {
                        session_token_secret_id = Some(s.to_string())
                    }
                }
            }
            AwsAuth::AssumeRole(assume_role) => {
                assume_role_arn = Some(assume_role.arn.as_str());
                assume_role_session_name = assume_role.session_name.as_deref();
                principal = self
                    .config
                    .connection_context
                    .aws_connection_role_arn
                    .as_deref();
                external_id =
                    Some(assume_role.external_id(&self.config.connection_context, connection_id)?);
                example_trust_policy = {
                    let policy = assume_role
                        .example_trust_policy(&self.config.connection_context, connection_id)?;
                    let policy = Jsonb::from_serde_json(policy).expect("valid json");
                    Some(policy.into_row())
                };
            }
        }

        let row = Row::pack_slice(&[
            Datum::String(&connection_id.to_string()),
            Datum::from(aws_config.endpoint.as_deref()),
            Datum::from(aws_config.region.as_deref()),
            Datum::from(access_key_id),
            Datum::from(access_key_id_secret_id.as_deref()),
            Datum::from(secret_access_key_secret_id.as_deref()),
            Datum::from(session_token),
            Datum::from(session_token_secret_id.as_deref()),
            Datum::from(assume_role_arn),
            Datum::from(assume_role_session_name),
            Datum::from(principal),
            Datum::from(external_id.as_deref()),
            Datum::from(example_trust_policy.as_ref().map(|p| p.into_element())),
        ]);

        Ok(BuiltinTableUpdate::row(id, row, diff))
    }
1187
1188 fn pack_view_update(
1189 &self,
1190 id: CatalogItemId,
1191 oid: u32,
1192 schema_id: &SchemaSpecifier,
1193 name: &str,
1194 owner_id: &RoleId,
1195 privileges: Datum,
1196 view: &View,
1197 diff: Diff,
1198 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1199 let create_stmt = mz_sql::parse::parse(&view.create_sql)
1200 .unwrap_or_else(|e| {
1201 panic!(
1202 "create_sql cannot be invalid: `{}` --- error: `{}`",
1203 view.create_sql, e
1204 )
1205 })
1206 .into_element()
1207 .ast;
1208 let query = match &create_stmt {
1209 Statement::CreateView(stmt) => &stmt.definition.query,
1210 _ => unreachable!(),
1211 };
1212
1213 let mut query_string = query.to_ast_string_stable();
1214 query_string.push(';');
1217
1218 vec![BuiltinTableUpdate::row(
1219 &*MZ_VIEWS,
1220 Row::pack_slice(&[
1221 Datum::String(&id.to_string()),
1222 Datum::UInt32(oid),
1223 Datum::String(&schema_id.to_string()),
1224 Datum::String(name),
1225 Datum::String(&query_string),
1226 Datum::String(&owner_id.to_string()),
1227 privileges,
1228 Datum::String(&view.create_sql),
1229 Datum::String(&create_stmt.to_ast_string_redacted()),
1230 ]),
1231 diff,
1232 )]
1233 }
1234
    /// Packs refresh-strategy and replacement rows for a materialized view.
    ///
    /// Emits one `mz_materialized_view_refresh_strategies` row per
    /// EVERY/AT entry of the refresh schedule, or a single "on-commit" row
    /// when no schedule is configured, plus an `mz_replacements` row when the
    /// view has a replacement target.
    fn pack_materialized_view_update(
        &self,
        id: CatalogItemId,
        mview: &MaterializedView,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = Vec::new();

        if let Some(refresh_schedule) = &mview.refresh_schedule {
            // Planning never produces an empty schedule.
            assert!(!refresh_schedule.is_empty());
            for RefreshEvery {
                interval,
                aligned_to,
            } in refresh_schedule.everies.iter()
            {
                let aligned_to_dt = mz_ore::now::to_datetime(
                    <&Timestamp as TryInto<u64>>::try_into(aligned_to).expect("undoes planning"),
                );
                // Row shape: (id, type, interval, aligned_to, at); unused
                // columns for this strategy type are NULL.
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String("every"),
                        Datum::Interval(
                            Interval::from_duration(interval).expect(
                                "planning ensured that this is convertible back to Interval",
                            ),
                        ),
                        Datum::TimestampTz(aligned_to_dt.try_into().expect("undoes planning")),
                        Datum::Null,
                    ]),
                    diff,
                ));
            }
            for at in refresh_schedule.ats.iter() {
                let at_dt = mz_ore::now::to_datetime(
                    <&Timestamp as TryInto<u64>>::try_into(at).expect("undoes planning"),
                );
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String("at"),
                        Datum::Null,
                        Datum::Null,
                        Datum::TimestampTz(at_dt.try_into().expect("undoes planning")),
                    ]),
                    diff,
                ));
            }
        } else {
            // No explicit schedule: the view refreshes on commit.
            updates.push(BuiltinTableUpdate::row(
                &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String("on-commit"),
                    Datum::Null,
                    Datum::Null,
                    Datum::Null,
                ]),
                diff,
            ));
        }

        if let Some(target_id) = mview.replacement_target {
            updates.push(BuiltinTableUpdate::row(
                &*MZ_REPLACEMENTS,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String(&target_id.to_string()),
                ]),
                diff,
            ));
        }

        updates
    }
1314
1315 fn pack_continual_task_update(
1316 &self,
1317 id: CatalogItemId,
1318 oid: u32,
1319 schema_id: &SchemaSpecifier,
1320 name: &str,
1321 owner_id: &RoleId,
1322 privileges: Datum,
1323 ct: &ContinualTask,
1324 diff: Diff,
1325 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1326 let create_stmt = mz_sql::parse::parse(&ct.create_sql)
1327 .unwrap_or_else(|e| {
1328 panic!(
1329 "create_sql cannot be invalid: `{}` --- error: `{}`",
1330 ct.create_sql, e
1331 )
1332 })
1333 .into_element()
1334 .ast;
1335 let query_string = match &create_stmt {
1336 Statement::CreateContinualTask(stmt) => {
1337 let mut query_string = String::new();
1338 for stmt in &stmt.stmts {
1339 let s = match stmt {
1340 ContinualTaskStmt::Insert(stmt) => stmt.to_ast_string_stable(),
1341 ContinualTaskStmt::Delete(stmt) => stmt.to_ast_string_stable(),
1342 };
1343 if query_string.is_empty() {
1344 query_string = s;
1345 } else {
1346 query_string.push_str("; ");
1347 query_string.push_str(&s);
1348 }
1349 }
1350 query_string
1351 }
1352 _ => unreachable!(),
1353 };
1354
1355 vec![BuiltinTableUpdate::row(
1356 &*MZ_CONTINUAL_TASKS,
1357 Row::pack_slice(&[
1358 Datum::String(&id.to_string()),
1359 Datum::UInt32(oid),
1360 Datum::String(&schema_id.to_string()),
1361 Datum::String(name),
1362 Datum::String(&ct.cluster_id.to_string()),
1363 Datum::String(&query_string),
1364 Datum::String(&owner_id.to_string()),
1365 privileges,
1366 Datum::String(&ct.create_sql),
1367 Datum::String(&create_stmt.to_ast_string_redacted()),
1368 ]),
1369 diff,
1370 )]
1371 }
1372
    /// Packs the builtin-table rows describing a sink.
    ///
    /// In addition to the main `mz_sinks` row, emits one row in the
    /// connection-specific table (`mz_kafka_sinks` or `mz_iceberg_sinks`).
    fn pack_sink_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        sink: &Sink,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = vec![];
        match &sink.connection {
            StorageSinkConnection::Kafka(KafkaSinkConnection {
                topic: topic_name, ..
            }) => {
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_KAFKA_SINKS,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String(topic_name.as_str()),
                    ]),
                    diff,
                ));
            }
            StorageSinkConnection::Iceberg(IcebergSinkConnection {
                namespace, table, ..
            }) => {
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_ICEBERG_SINKS,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String(namespace.as_str()),
                        Datum::String(table.as_str()),
                    ]),
                    diff,
                ));
            }
        };

        // Re-parse the stored SQL for the redacted rendering below; it was
        // validated at creation time, so a parse failure is a catalog bug.
        let create_stmt = mz_sql::parse::parse(&sink.create_sql)
            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", sink.create_sql))
            .into_element()
            .ast;

        let envelope = sink.envelope();

        let combined_format = sink.combined_format();
        // Separate key/value formats, when the sink reports them.
        let (key_format, value_format) = match sink.formats() {
            Some((key_format, value_format)) => (key_format, Some(value_format)),
            None => (None, None),
        };

        updates.push(BuiltinTableUpdate::row(
            &*MZ_SINKS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(sink.connection.name()),
                Datum::from(sink.connection_id().map(|id| id.to_string()).as_deref()),
                // NOTE(review): always NULL here — presumably a column that is
                // unused or populated elsewhere; confirm against the MZ_SINKS
                // schema definition.
                Datum::Null,
                Datum::from(envelope),
                Datum::from(combined_format.as_ref().map(|f| f.as_ref())),
                Datum::from(key_format),
                Datum::from(value_format),
                Datum::String(&sink.cluster_id.to_string()),
                Datum::String(&owner_id.to_string()),
                Datum::String(&sink.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        ));

        updates
    }
1453
    /// Packs the `mz_indexes` row plus one `mz_index_columns` row per key of
    /// the given index.
    fn pack_index_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        name: &str,
        owner_id: &RoleId,
        index: &Index,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = vec![];

        // Re-parse the stored SQL; it was validated at creation time, so a
        // parse failure here indicates a catalog bug.
        let create_stmt = mz_sql::parse::parse(&index.create_sql)
            .unwrap_or_else(|e| {
                panic!(
                    "create_sql cannot be invalid: `{}` --- error: `{}`",
                    index.create_sql, e
                )
            })
            .into_element()
            .ast;

        // The SQL text of each key expression, in key order.
        let key_sqls = match &create_stmt {
            Statement::CreateIndex(CreateIndexStatement { key_parts, .. }) => key_parts
                .as_ref()
                .expect("key_parts is filled in during planning"),
            _ => unreachable!(),
        };
        let on_item_id = self.get_entry_by_global_id(&index.on).id();

        updates.push(BuiltinTableUpdate::row(
            &*MZ_INDEXES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(name),
                Datum::String(&on_item_id.to_string()),
                Datum::String(&index.cluster_id.to_string()),
                Datum::String(&owner_id.to_string()),
                Datum::String(&index.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        ));

        // Column types of the indexed relation, needed to compute each key's
        // nullability.
        let on_entry = self.get_entry_by_global_id(&index.on);
        let on_desc = on_entry
            .relation_desc()
            .expect("can only create indexes on items with a valid description");
        let repr_col_types: Vec<ReprColumnType> = on_desc
            .typ()
            .column_types
            .iter()
            .map(ReprColumnType::from)
            .collect();
        for (i, key) in index.keys.iter().enumerate() {
            let nullable = key.typ(&repr_col_types).nullable;
            // Positions in mz_index_columns are 1-based.
            let seq_in_index = u64::cast_from(i + 1);
            let key_sql = key_sqls
                .get(i)
                .expect("missing sql information for index key")
                .to_ast_string_simple();
            // A bare column reference reports a (1-based) field number; any
            // other expression reports its SQL text instead.
            let (field_number, expression) = match key {
                MirScalarExpr::Column(col, _) => {
                    (Datum::UInt64(u64::cast_from(*col + 1)), Datum::Null)
                }
                _ => (Datum::Null, Datum::String(&key_sql)),
            };
            updates.push(BuiltinTableUpdate::row(
                &*MZ_INDEX_COLUMNS,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::UInt64(seq_in_index),
                    field_number,
                    expression,
                    Datum::from(nullable),
                ]),
                diff,
            ));
        }

        updates
    }
1536
    /// Packs the builtin-table rows describing a type: the `mz_types` row, a
    /// row in the category-specific table (array/list/map/pseudo/base), and,
    /// when pg metadata is present, an `mz_type_pg_metadata` row.
    fn pack_type_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        typ: &Type,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut out = vec![];

        // `create_sql` is optional (builtin types have none); when present,
        // derive its redacted rendering by re-parsing it.
        let redacted = typ.create_sql.as_ref().map(|create_sql| {
            mz_sql::parse::parse(create_sql)
                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
                .into_element()
                .ast
                .to_ast_string_redacted()
        });

        out.push(BuiltinTableUpdate::row(
            &*MZ_TYPES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&TypeCategory::from_catalog_type(&typ.details.typ).to_string()),
                Datum::String(&owner_id.to_string()),
                privileges,
                if let Some(create_sql) = &typ.create_sql {
                    Datum::String(create_sql)
                } else {
                    Datum::Null
                },
                if let Some(redacted) = &redacted {
                    Datum::String(redacted)
                } else {
                    Datum::Null
                },
            ]),
            diff,
        ));

        // Build the category-specific row; the match below both fills the
        // packer and selects which table the row belongs to.
        let mut row = Row::default();
        let mut packer = row.packer();

        // Pushes a list of type modifiers, or NULL when there are none.
        fn append_modifier(packer: &mut RowPacker<'_>, mods: &[i64]) {
            if mods.is_empty() {
                packer.push(Datum::Null);
            } else {
                packer.push_list(mods.iter().map(|m| Datum::Int64(*m)));
            }
        }

        let index_id = match &typ.details.typ {
            CatalogType::Array {
                element_reference: element_id,
            } => {
                packer.push(Datum::String(&id.to_string()));
                packer.push(Datum::String(&element_id.to_string()));
                &MZ_ARRAY_TYPES
            }
            CatalogType::List {
                element_reference: element_id,
                element_modifiers,
            } => {
                packer.push(Datum::String(&id.to_string()));
                packer.push(Datum::String(&element_id.to_string()));
                append_modifier(&mut packer, element_modifiers);
                &MZ_LIST_TYPES
            }
            CatalogType::Map {
                key_reference: key_id,
                value_reference: value_id,
                key_modifiers,
                value_modifiers,
            } => {
                packer.push(Datum::String(&id.to_string()));
                packer.push(Datum::String(&key_id.to_string()));
                packer.push(Datum::String(&value_id.to_string()));
                append_modifier(&mut packer, key_modifiers);
                append_modifier(&mut packer, value_modifiers);
                &MZ_MAP_TYPES
            }
            CatalogType::Pseudo => {
                packer.push(Datum::String(&id.to_string()));
                &MZ_PSEUDO_TYPES
            }
            // Every other catalog type is considered a base type.
            _ => {
                packer.push(Datum::String(&id.to_string()));
                &MZ_BASE_TYPES
            }
        };
        out.push(BuiltinTableUpdate::row(index_id, row, diff));

        if let Some(pg_metadata) = &typ.details.pg_metadata {
            out.push(BuiltinTableUpdate::row(
                &*MZ_TYPE_PG_METADATA,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::UInt32(pg_metadata.typinput_oid),
                    Datum::UInt32(pg_metadata.typreceive_oid),
                ]),
                diff,
            ));
        }

        out
    }
1648
    /// Packs one `mz_functions` row per implementation of the given function,
    /// plus an `mz_aggregates` row per implementation when the function is an
    /// aggregate.
    fn pack_func_update(
        &self,
        id: CatalogItemId,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        func: &Func,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = vec![];
        for func_impl_details in func.inner.func_impls() {
            // Resolve each argument type name to its catalog id.
            let arg_type_ids = func_impl_details
                .arg_typs
                .iter()
                .map(|typ| self.get_system_type(typ).id().to_string())
                .collect::<Vec<_>>();

            // Pack the ids into a 1-dimensional SQL array datum.
            let mut row = Row::default();
            row.packer()
                .try_push_array(
                    &[ArrayDimension {
                        lower_bound: 1,
                        length: arg_type_ids.len(),
                    }],
                    arg_type_ids.iter().map(|id| Datum::String(id)),
                )
                .expect(
                    "arg_type_ids is 1 dimensional, and its length is used for the array length",
                );
            let arg_type_ids = row.unpack_first();

            updates.push(BuiltinTableUpdate::row(
                &*MZ_FUNCTIONS,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::UInt32(func_impl_details.oid),
                    Datum::String(&schema_id.to_string()),
                    Datum::String(name),
                    arg_type_ids,
                    Datum::from(
                        func_impl_details
                            .variadic_typ
                            .map(|typ| self.get_system_type(typ).id().to_string())
                            .as_deref(),
                    ),
                    Datum::from(
                        func_impl_details
                            .return_typ
                            .map(|typ| self.get_system_type(typ).id().to_string())
                            .as_deref(),
                    ),
                    func_impl_details.return_is_set.into(),
                    Datum::String(&owner_id.to_string()),
                ]),
                diff,
            ));

            if let mz_sql::func::Func::Aggregate(_) = func.inner {
                // NOTE(review): "n" and 0 are constant here — presumably
                // mirroring pg_aggregate's aggkind ('n' = normal) and
                // aggnumdirectargs; confirm against the MZ_AGGREGATES schema.
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_AGGREGATES,
                    Row::pack_slice(&[
                        Datum::UInt32(func_impl_details.oid),
                        Datum::String("n"),
                        Datum::Int16(0),
                    ]),
                    diff,
                ));
            }
        }
        updates
    }
1721
1722 pub fn pack_op_update(
1723 &self,
1724 operator: &str,
1725 func_impl_details: FuncImplCatalogDetails,
1726 diff: Diff,
1727 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1728 let arg_type_ids = func_impl_details
1729 .arg_typs
1730 .iter()
1731 .map(|typ| self.get_system_type(typ).id().to_string())
1732 .collect::<Vec<_>>();
1733
1734 let mut row = Row::default();
1735 row.packer()
1736 .try_push_array(
1737 &[ArrayDimension {
1738 lower_bound: 1,
1739 length: arg_type_ids.len(),
1740 }],
1741 arg_type_ids.iter().map(|id| Datum::String(id)),
1742 )
1743 .expect("arg_type_ids is 1 dimensional, and its length is used for the array length");
1744 let arg_type_ids = row.unpack_first();
1745
1746 BuiltinTableUpdate::row(
1747 &*MZ_OPERATORS,
1748 Row::pack_slice(&[
1749 Datum::UInt32(func_impl_details.oid),
1750 Datum::String(operator),
1751 arg_type_ids,
1752 Datum::from(
1753 func_impl_details
1754 .return_typ
1755 .map(|typ| self.get_system_type(typ).id().to_string())
1756 .as_deref(),
1757 ),
1758 ]),
1759 diff,
1760 )
1761 }
1762
    /// Packs the `mz_audit_events` row for a versioned audit-log event.
    ///
    /// # Errors
    ///
    /// Returns an error if the event details cannot be converted to jsonb.
    pub fn pack_audit_log_update(
        &self,
        event: &VersionedEvent,
        diff: Diff,
    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
        // Destructure by version so that adding a new event version forces an
        // explicit decision here.
        let (event_type, object_type, details, user, occurred_at): (
            &EventType,
            &ObjectType,
            &EventDetails,
            &Option<String>,
            u64,
        ) = match event {
            VersionedEvent::V1(ev) => (
                &ev.event_type,
                &ev.object_type,
                &ev.details,
                &ev.user,
                ev.occurred_at,
            ),
        };
        // Render the details as a single-column jsonb row.
        let details = Jsonb::from_serde_json(details.as_json())
            .map_err(|e| {
                Error::new(ErrorKind::Unstructured(format!(
                    "could not pack audit log update: {}",
                    e
                )))
            })?
            .into_row();
        // Extract that single jsonb datum back out of the row.
        let details = details
            .iter()
            .next()
            .expect("details created above with a single jsonb column");
        let dt = mz_ore::now::to_datetime(occurred_at);
        let id = event.sortable_id();
        Ok(BuiltinTableUpdate::row(
            &*MZ_AUDIT_EVENTS,
            Row::pack_slice(&[
                Datum::UInt64(id),
                Datum::String(&format!("{}", event_type)),
                Datum::String(&format!("{}", object_type)),
                details,
                match user {
                    Some(user) => Datum::String(user),
                    None => Datum::Null,
                },
                Datum::TimestampTz(dt.try_into().expect("must fit")),
            ]),
            diff,
        ))
    }
1813
1814 pub fn pack_storage_usage_update(
1815 &self,
1816 VersionedStorageUsage::V1(event): VersionedStorageUsage,
1817 diff: Diff,
1818 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1819 let id = &MZ_STORAGE_USAGE_BY_SHARD;
1820 let row = Row::pack_slice(&[
1821 Datum::UInt64(event.id),
1822 Datum::from(event.shard_id.as_deref()),
1823 Datum::UInt64(event.size_bytes),
1824 Datum::TimestampTz(
1825 mz_ore::now::to_datetime(event.collection_timestamp)
1826 .try_into()
1827 .expect("must fit"),
1828 ),
1829 ]);
1830 BuiltinTableUpdate::row(id, row, diff)
1831 }
1832
1833 pub fn pack_egress_ip_update(
1834 &self,
1835 ip: &IpNet,
1836 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1837 let id = &MZ_EGRESS_IPS;
1838 let addr = ip.addr();
1839 let row = Row::pack_slice(&[
1840 Datum::String(&addr.to_string()),
1841 Datum::Int32(ip.prefix_len().into()),
1842 Datum::String(&format!("{}/{}", addr, ip.prefix_len())),
1843 ]);
1844 Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
1845 }
1846
1847 pub fn pack_license_key_update(
1848 &self,
1849 license_key: &ValidatedLicenseKey,
1850 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1851 let id = &MZ_LICENSE_KEYS;
1852 let row = Row::pack_slice(&[
1853 Datum::String(&license_key.id),
1854 Datum::String(&license_key.organization),
1855 Datum::String(&license_key.environment_id),
1856 Datum::TimestampTz(
1857 mz_ore::now::to_datetime(license_key.expiration * 1000)
1858 .try_into()
1859 .expect("must fit"),
1860 ),
1861 Datum::TimestampTz(
1862 mz_ore::now::to_datetime(license_key.not_before * 1000)
1863 .try_into()
1864 .expect("must fit"),
1865 ),
1866 ]);
1867 Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
1868 }
1869
1870 pub fn pack_all_replica_size_updates(&self) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1871 let mut updates = Vec::new();
1872 for (size, alloc) in &self.cluster_replica_sizes.0 {
1873 if alloc.disabled {
1874 continue;
1875 }
1876
1877 let cpu_limit = alloc.cpu_limit.unwrap_or(CpuLimit::MAX);
1880 let MemoryLimit(ByteSize(memory_bytes)) =
1881 (alloc.memory_limit).unwrap_or(MemoryLimit::MAX);
1882 let DiskLimit(ByteSize(disk_bytes)) =
1883 (alloc.disk_limit).unwrap_or(DiskLimit::ARBITRARY);
1884
1885 let row = Row::pack_slice(&[
1886 size.as_str().into(),
1887 u64::cast_from(alloc.scale).into(),
1888 u64::cast_from(alloc.workers).into(),
1889 cpu_limit.as_nanocpus().into(),
1890 memory_bytes.into(),
1891 disk_bytes.into(),
1892 (alloc.credits_per_hour).into(),
1893 ]);
1894
1895 updates.push(BuiltinTableUpdate::row(
1896 &*MZ_CLUSTER_REPLICA_SIZES,
1897 row,
1898 Diff::ONE,
1899 ));
1900 }
1901
1902 updates
1903 }
1904
1905 pub fn pack_subscribe_update(
1906 &self,
1907 id: GlobalId,
1908 subscribe: &ActiveSubscribe,
1909 diff: Diff,
1910 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1911 let mut row = Row::default();
1912 let mut packer = row.packer();
1913 packer.push(Datum::String(&id.to_string()));
1914 packer.push(Datum::Uuid(subscribe.session_uuid));
1915 packer.push(Datum::String(&subscribe.cluster_id.to_string()));
1916
1917 let start_dt = mz_ore::now::to_datetime(subscribe.start_time);
1918 packer.push(Datum::TimestampTz(start_dt.try_into().expect("must fit")));
1919
1920 let depends_on: Vec<_> = subscribe
1921 .depends_on
1922 .iter()
1923 .map(|id| id.to_string())
1924 .collect();
1925 packer.push_list(depends_on.iter().map(|s| Datum::String(s)));
1926
1927 BuiltinTableUpdate::row(&*MZ_SUBSCRIPTIONS, row, diff)
1928 }
1929
1930 pub fn pack_session_update(
1931 &self,
1932 conn: &ConnMeta,
1933 diff: Diff,
1934 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1935 let connect_dt = mz_ore::now::to_datetime(conn.connected_at());
1936 BuiltinTableUpdate::row(
1937 &*MZ_SESSIONS,
1938 Row::pack_slice(&[
1939 Datum::Uuid(conn.uuid()),
1940 Datum::UInt32(conn.conn_id().unhandled()),
1941 Datum::String(&conn.authenticated_role_id().to_string()),
1942 Datum::from(conn.client_ip().map(|ip| ip.to_string()).as_deref()),
1943 Datum::TimestampTz(connect_dt.try_into().expect("must fit")),
1944 ]),
1945 diff,
1946 )
1947 }
1948
1949 pub fn pack_default_privileges_update(
1950 &self,
1951 default_privilege_object: &DefaultPrivilegeObject,
1952 grantee: &RoleId,
1953 acl_mode: &AclMode,
1954 diff: Diff,
1955 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1956 BuiltinTableUpdate::row(
1957 &*MZ_DEFAULT_PRIVILEGES,
1958 Row::pack_slice(&[
1959 default_privilege_object.role_id.to_string().as_str().into(),
1960 default_privilege_object
1961 .database_id
1962 .map(|database_id| database_id.to_string())
1963 .as_deref()
1964 .into(),
1965 default_privilege_object
1966 .schema_id
1967 .map(|schema_id| schema_id.to_string())
1968 .as_deref()
1969 .into(),
1970 default_privilege_object
1971 .object_type
1972 .to_string()
1973 .to_lowercase()
1974 .as_str()
1975 .into(),
1976 grantee.to_string().as_str().into(),
1977 acl_mode.to_string().as_str().into(),
1978 ]),
1979 diff,
1980 )
1981 }
1982
1983 pub fn pack_system_privileges_update(
1984 &self,
1985 privileges: MzAclItem,
1986 diff: Diff,
1987 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1988 BuiltinTableUpdate::row(
1989 &*MZ_SYSTEM_PRIVILEGES,
1990 Row::pack_slice(&[privileges.into()]),
1991 diff,
1992 )
1993 }
1994
1995 fn pack_privilege_array_row(&self, privileges: &PrivilegeMap) -> Row {
1996 let mut row = Row::default();
1997 let flat_privileges: Vec<_> = privileges.all_values_owned().collect();
1998 row.packer()
1999 .try_push_array(
2000 &[ArrayDimension {
2001 lower_bound: 1,
2002 length: flat_privileges.len(),
2003 }],
2004 flat_privileges
2005 .into_iter()
2006 .map(|mz_acl_item| Datum::MzAclItem(mz_acl_item.clone())),
2007 )
2008 .expect("privileges is 1 dimensional, and its length is used for the array length");
2009 row
2010 }
2011
    /// Packs the `mz_comments` row for a comment on the given object, or on
    /// one of its columns when `column_pos` is provided.
    pub fn pack_comment_update(
        &self,
        object_id: CommentObjectId,
        column_pos: Option<usize>,
        comment: &str,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        // Use the audit-log object-type rendering so the two tables agree on
        // naming.
        let object_type = mz_sql::catalog::ObjectType::from(object_id);
        let audit_type = super::object_type_to_audit_object_type(object_type);
        let object_type_str = audit_type.to_string();

        // Stringify the id; the variant determines which id namespace it is
        // drawn from.
        let object_id_str = match object_id {
            CommentObjectId::Table(global_id)
            | CommentObjectId::View(global_id)
            | CommentObjectId::MaterializedView(global_id)
            | CommentObjectId::Source(global_id)
            | CommentObjectId::Sink(global_id)
            | CommentObjectId::Index(global_id)
            | CommentObjectId::Func(global_id)
            | CommentObjectId::Connection(global_id)
            | CommentObjectId::Secret(global_id)
            | CommentObjectId::Type(global_id)
            | CommentObjectId::ContinualTask(global_id) => global_id.to_string(),
            CommentObjectId::Role(role_id) => role_id.to_string(),
            CommentObjectId::Database(database_id) => database_id.to_string(),
            CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
            CommentObjectId::Cluster(cluster_id) => cluster_id.to_string(),
            CommentObjectId::ClusterReplica((_, replica_id)) => replica_id.to_string(),
            CommentObjectId::NetworkPolicy(network_policy_id) => network_policy_id.to_string(),
        };
        // Column comments carry the (planning-validated) column position;
        // object-level comments store NULL.
        let column_pos_datum = match column_pos {
            Some(pos) => {
                let pos =
                    i32::try_from(pos).expect("we constrain this value in the planning layer");
                Datum::Int32(pos)
            }
            None => Datum::Null,
        };

        BuiltinTableUpdate::row(
            &*MZ_COMMENTS,
            Row::pack_slice(&[
                Datum::String(&object_id_str),
                Datum::String(&object_type_str),
                column_pos_datum,
                Datum::String(comment),
            ]),
            diff,
        )
    }
2064
2065 pub fn pack_webhook_source_update(
2066 &self,
2067 item_id: CatalogItemId,
2068 diff: Diff,
2069 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2070 let url = self
2071 .try_get_webhook_url(&item_id)
2072 .expect("webhook source should exist");
2073 let url = url.to_string();
2074 let name = &self.get_entry(&item_id).name().item;
2075 let id_str = item_id.to_string();
2076
2077 BuiltinTableUpdate::row(
2078 &*MZ_WEBHOOKS_SOURCES,
2079 Row::pack_slice(&[
2080 Datum::String(&id_str),
2081 Datum::String(name),
2082 Datum::String(&url),
2083 ]),
2084 diff,
2085 )
2086 }
2087
2088 pub fn pack_source_references_update(
2089 &self,
2090 source_references: &SourceReferences,
2091 diff: Diff,
2092 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2093 let source_id = source_references.source_id.to_string();
2094 let updated_at = &source_references.updated_at;
2095 source_references
2096 .references
2097 .iter()
2098 .map(|reference| {
2099 let mut row = Row::default();
2100 let mut packer = row.packer();
2101 packer.extend([
2102 Datum::String(&source_id),
2103 reference
2104 .namespace
2105 .as_ref()
2106 .map(|s| Datum::String(s))
2107 .unwrap_or(Datum::Null),
2108 Datum::String(&reference.name),
2109 Datum::TimestampTz(
2110 mz_ore::now::to_datetime(*updated_at)
2111 .try_into()
2112 .expect("must fit"),
2113 ),
2114 ]);
2115 if reference.columns.len() > 0 {
2116 packer
2117 .try_push_array(
2118 &[ArrayDimension {
2119 lower_bound: 1,
2120 length: reference.columns.len(),
2121 }],
2122 reference.columns.iter().map(|col| Datum::String(col)),
2123 )
2124 .expect(
2125 "columns is 1 dimensional, and its length is used for the array length",
2126 );
2127 } else {
2128 packer.push(Datum::Null);
2129 }
2130
2131 BuiltinTableUpdate::row(&*MZ_SOURCE_REFERENCES, row, diff)
2132 })
2133 .collect()
2134 }
2135}