1mod notice;
11
12use bytesize::ByteSize;
13use ipnet::IpNet;
14use mz_adapter_types::compaction::CompactionWindow;
15use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
16use mz_catalog::SYSTEM_CONN_ID;
17use mz_catalog::builtin::{
18 BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
19 MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICAS,
20 MZ_CLUSTER_SCHEDULES, MZ_CLUSTER_WORKLOAD_CLASSES, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS,
21 MZ_CONNECTIONS, MZ_CONTINUAL_TASKS, MZ_DATABASES, MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS,
22 MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES, MZ_ICEBERG_SINKS, MZ_INDEX_COLUMNS, MZ_INDEXES,
23 MZ_INTERNAL_CLUSTER_REPLICAS, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCE_TABLES,
24 MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES,
25 MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MATERIALIZED_VIEWS, MZ_MYSQL_SOURCE_TABLES,
26 MZ_NETWORK_POLICIES, MZ_NETWORK_POLICY_RULES, MZ_OBJECT_DEPENDENCIES, MZ_OBJECT_GLOBAL_IDS,
27 MZ_OPERATORS, MZ_PENDING_CLUSTER_REPLICAS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES,
28 MZ_PSEUDO_TYPES, MZ_REPLACEMENTS, MZ_ROLE_AUTH, MZ_ROLE_MEMBERS, MZ_ROLE_PARAMETERS, MZ_ROLES,
29 MZ_SCHEMAS, MZ_SECRETS, MZ_SESSIONS, MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SOURCES,
30 MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD,
31 MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS,
32 MZ_WEBHOOKS_SOURCES,
33};
34use mz_catalog::config::AwsPrincipalContext;
35use mz_catalog::durable::SourceReferences;
36use mz_catalog::memory::error::{Error, ErrorKind};
37use mz_catalog::memory::objects::{
38 CatalogEntry, CatalogItem, ClusterVariant, Connection, ContinualTask, DataSourceDesc, Func,
39 Index, MaterializedView, Sink, Table, TableDataSource, Type, View,
40};
41use mz_controller::clusters::{
42 ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
43};
44use mz_controller_types::ClusterId;
45use mz_expr::MirScalarExpr;
46use mz_license_keys::ValidatedLicenseKey;
47use mz_orchestrator::{CpuLimit, DiskLimit, MemoryLimit};
48use mz_ore::cast::CastFrom;
49use mz_ore::collections::CollectionExt;
50use mz_persist_client::batch::ProtoBatch;
51use mz_repr::adt::array::ArrayDimension;
52use mz_repr::adt::interval::Interval;
53use mz_repr::adt::jsonb::Jsonb;
54use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
55use mz_repr::adt::regex;
56use mz_repr::network_policy_id::NetworkPolicyId;
57use mz_repr::refresh_schedule::RefreshEvery;
58use mz_repr::role_id::RoleId;
59use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, Row, RowPacker, SqlScalarType, Timestamp};
60use mz_sql::ast::{ContinualTaskStmt, CreateIndexStatement, Statement, UnresolvedItemName};
61use mz_sql::catalog::{
62 CatalogCluster, CatalogDatabase, CatalogSchema, CatalogType, DefaultPrivilegeObject,
63 TypeCategory,
64};
65use mz_sql::func::FuncImplCatalogDetails;
66use mz_sql::names::{
67 CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier,
68};
69use mz_sql::plan::{ClusterSchedule, ConnectionDetails, SshKey};
70use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
71use mz_sql::session::vars::SessionVars;
72use mz_sql_parser::ast::display::AstDisplay;
73use mz_storage_client::client::TableData;
74use mz_storage_types::connections::KafkaConnection;
75use mz_storage_types::connections::aws::{AwsAuth, AwsConnection};
76use mz_storage_types::connections::inline::ReferencedConnection;
77use mz_storage_types::connections::string_or_secret::StringOrSecret;
78use mz_storage_types::sinks::{IcebergSinkConnection, KafkaSinkConnection, StorageSinkConnection};
79use mz_storage_types::sources::{
80 GenericSourceConnection, KafkaSourceConnection, PostgresSourceConnection, SourceConnection,
81};
82use smallvec::smallvec;
83
84use crate::active_compute_sink::ActiveSubscribe;
86use crate::catalog::CatalogState;
87use crate::coord::ConnMeta;
88
/// A single update (one row insertion/retraction, or a pre-built batch) to a
/// builtin table.
///
/// `T` identifies the destination table: either a resolved [`CatalogItemId`]
/// or a `&'static BuiltinTable` that still needs resolution (see
/// `CatalogState::resolve_builtin_table_update`).
#[derive(Debug, Clone)]
pub struct BuiltinTableUpdate<T = CatalogItemId> {
    /// Identifier of the builtin table being updated.
    pub id: T,
    /// The payload: either `(Row, Diff)` pairs or pre-encoded batches.
    pub data: TableData,
}
97
98impl<T> BuiltinTableUpdate<T> {
99 pub fn row(id: T, row: Row, diff: Diff) -> BuiltinTableUpdate<T> {
101 BuiltinTableUpdate {
102 id,
103 data: TableData::Rows(vec![(row, diff)]),
104 }
105 }
106
107 pub fn batch(id: T, batch: ProtoBatch) -> BuiltinTableUpdate<T> {
108 BuiltinTableUpdate {
109 id,
110 data: TableData::Batches(smallvec![batch]),
111 }
112 }
113}
114
115impl CatalogState {
116 pub fn resolve_builtin_table_updates(
117 &self,
118 builtin_table_update: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
119 ) -> Vec<BuiltinTableUpdate<CatalogItemId>> {
120 builtin_table_update
121 .into_iter()
122 .map(|builtin_table_update| self.resolve_builtin_table_update(builtin_table_update))
123 .collect()
124 }
125
126 pub fn resolve_builtin_table_update(
127 &self,
128 BuiltinTableUpdate { id, data }: BuiltinTableUpdate<&'static BuiltinTable>,
129 ) -> BuiltinTableUpdate<CatalogItemId> {
130 let id = self.resolve_builtin_table(id);
131 BuiltinTableUpdate { id, data }
132 }
133
134 pub fn pack_depends_update(
135 &self,
136 depender: CatalogItemId,
137 dependee: CatalogItemId,
138 diff: Diff,
139 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
140 let row = Row::pack_slice(&[
141 Datum::String(&depender.to_string()),
142 Datum::String(&dependee.to_string()),
143 ]);
144 BuiltinTableUpdate::row(&*MZ_OBJECT_DEPENDENCIES, row, diff)
145 }
146
147 pub(super) fn pack_database_update(
148 &self,
149 database_id: &DatabaseId,
150 diff: Diff,
151 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
152 let database = self.get_database(database_id);
153 let row = self.pack_privilege_array_row(database.privileges());
154 let privileges = row.unpack_first();
155 BuiltinTableUpdate::row(
156 &*MZ_DATABASES,
157 Row::pack_slice(&[
158 Datum::String(&database.id.to_string()),
159 Datum::UInt32(database.oid),
160 Datum::String(database.name()),
161 Datum::String(&database.owner_id.to_string()),
162 privileges,
163 ]),
164 diff,
165 )
166 }
167
168 pub(super) fn pack_schema_update(
169 &self,
170 database_spec: &ResolvedDatabaseSpecifier,
171 schema_id: &SchemaId,
172 diff: Diff,
173 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
174 let (database_id, schema) = match database_spec {
175 ResolvedDatabaseSpecifier::Ambient => (None, &self.ambient_schemas_by_id[schema_id]),
176 ResolvedDatabaseSpecifier::Id(id) => (
177 Some(id.to_string()),
178 &self.database_by_id[id].schemas_by_id[schema_id],
179 ),
180 };
181 let row = self.pack_privilege_array_row(schema.privileges());
182 let privileges = row.unpack_first();
183 BuiltinTableUpdate::row(
184 &*MZ_SCHEMAS,
185 Row::pack_slice(&[
186 Datum::String(&schema_id.to_string()),
187 Datum::UInt32(schema.oid),
188 Datum::from(database_id.as_deref()),
189 Datum::String(&schema.name.schema),
190 Datum::String(&schema.owner_id.to_string()),
191 privileges,
192 ]),
193 diff,
194 )
195 }
196
197 pub(super) fn pack_role_auth_update(
198 &self,
199 id: RoleId,
200 diff: Diff,
201 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
202 let role_auth = self.get_role_auth(&id);
203 let role = self.get_role(&id);
204 BuiltinTableUpdate::row(
205 &*MZ_ROLE_AUTH,
206 Row::pack_slice(&[
207 Datum::String(&role_auth.role_id.to_string()),
208 Datum::UInt32(role.oid),
209 match &role_auth.password_hash {
210 Some(hash) => Datum::String(hash),
211 None => Datum::Null,
212 },
213 Datum::TimestampTz(
214 mz_ore::now::to_datetime(role_auth.updated_at)
215 .try_into()
216 .expect("must fit"),
217 ),
218 ]),
219 diff,
220 )
221 }
222
223 pub(super) fn pack_role_update(
224 &self,
225 id: RoleId,
226 diff: Diff,
227 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
228 match id {
229 RoleId::Public => vec![],
231 id => {
232 let role = self.get_role(&id);
233 let builtin_supers = [MZ_SYSTEM_ROLE_ID, MZ_SUPPORT_ROLE_ID];
234
235 let cloud_login_regex = "^[^@]+@[^@]+\\.[^@]+$";
254 let matches_cloud_login_heuristic = regex::Regex::new(cloud_login_regex, true)
255 .expect("valid regex")
256 .is_match(&role.name);
257
258 let rolcanlogin = if let Some(login) = role.attributes.login {
259 login
260 } else {
261 builtin_supers.contains(&role.id) || matches_cloud_login_heuristic
268 };
269
270 let rolsuper = if let Some(superuser) = role.attributes.superuser {
271 Datum::from(superuser)
272 } else if let Some(_) = role.attributes.login {
273 Datum::from(false)
277 } else if builtin_supers.contains(&role.id) {
278 Datum::from(true)
280 } else {
281 Datum::Null
290 };
291
292 let role_update = BuiltinTableUpdate::row(
293 &*MZ_ROLES,
294 Row::pack_slice(&[
295 Datum::String(&role.id.to_string()),
296 Datum::UInt32(role.oid),
297 Datum::String(&role.name),
298 Datum::from(role.attributes.inherit),
299 Datum::from(rolcanlogin),
300 rolsuper,
301 ]),
302 diff,
303 );
304 let mut updates = vec![role_update];
305
306 let session_vars_reference = SessionVars::new_unchecked(
309 &mz_build_info::DUMMY_BUILD_INFO,
310 SYSTEM_USER.clone(),
311 None,
312 );
313
314 for (name, val) in role.vars() {
315 let result = session_vars_reference
316 .inspect(name)
317 .and_then(|var| var.check(val.borrow()));
318 let Ok(formatted_val) = result else {
319 tracing::error!(?name, ?val, ?result, "found invalid role default var");
322 continue;
323 };
324
325 let role_var_update = BuiltinTableUpdate::row(
326 &*MZ_ROLE_PARAMETERS,
327 Row::pack_slice(&[
328 Datum::String(&role.id.to_string()),
329 Datum::String(name),
330 Datum::String(&formatted_val),
331 ]),
332 diff,
333 );
334 updates.push(role_var_update);
335 }
336
337 updates
338 }
339 }
340 }
341
342 pub(super) fn pack_role_members_update(
343 &self,
344 role_id: RoleId,
345 member_id: RoleId,
346 diff: Diff,
347 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
348 let grantor_id = self
349 .get_role(&member_id)
350 .membership
351 .map
352 .get(&role_id)
353 .expect("catalog out of sync");
354 BuiltinTableUpdate::row(
355 &*MZ_ROLE_MEMBERS,
356 Row::pack_slice(&[
357 Datum::String(&role_id.to_string()),
358 Datum::String(&member_id.to_string()),
359 Datum::String(&grantor_id.to_string()),
360 ]),
361 diff,
362 )
363 }
364
    /// Packs updates for a cluster: one `mz_clusters` row, a
    /// `mz_cluster_schedules` row for managed clusters, and a
    /// `mz_cluster_workload_classes` row for every cluster.
    pub(super) fn pack_cluster_update(
        &self,
        name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let id = self.clusters_by_name[name];
        let cluster = &self.clusters_by_id[&id];
        let row = self.pack_privilege_array_row(cluster.privileges());
        let privileges = row.unpack_first();
        // Managed clusters report their configuration; unmanaged clusters
        // report NULL for all of these columns.
        let (size, disk, replication_factor, azs, introspection_debugging, introspection_interval) =
            match &cluster.config.variant {
                ClusterVariant::Managed(config) => (
                    Some(config.size.as_str()),
                    Some(self.cluster_replica_size_has_disk(&config.size)),
                    Some(config.replication_factor),
                    // An empty AZ list is surfaced as NULL rather than `{}`.
                    if config.availability_zones.is_empty() {
                        None
                    } else {
                        Some(config.availability_zones.clone())
                    },
                    Some(config.logging.log_logging),
                    config.logging.interval.map(|d| {
                        Interval::from_duration(&d)
                            .expect("planning ensured this convertible back to interval")
                    }),
                ),
                ClusterVariant::Unmanaged => (None, None, None, None, None, None),
            };

        let mut row = Row::default();
        let mut packer = row.packer();
        packer.extend([
            Datum::String(&id.to_string()),
            Datum::String(name),
            Datum::String(&cluster.owner_id.to_string()),
            privileges,
            cluster.is_managed().into(),
            size.into(),
            replication_factor.into(),
            disk.into(),
        ]);
        // Availability zones form a list column, so they are pushed with the
        // packer rather than through `extend` above.
        if let Some(azs) = azs {
            packer.push_list(azs.iter().map(|az| Datum::String(az)));
        } else {
            packer.push(Datum::Null);
        }
        packer.push(Datum::from(introspection_debugging));
        packer.push(Datum::from(introspection_interval));

        let mut updates = Vec::new();

        updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTERS, row, diff));

        // Managed clusters additionally report their schedule.
        if let ClusterVariant::Managed(managed_config) = &cluster.config.variant {
            let row = match managed_config.schedule {
                ClusterSchedule::Manual => Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String("manual"),
                    Datum::Null,
                ]),
                ClusterSchedule::Refresh {
                    hydration_time_estimate,
                } => Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String("on-refresh"),
                    Datum::Interval(
                        Interval::from_duration(&hydration_time_estimate)
                            .expect("planning ensured that this is convertible back to Interval"),
                    ),
                ]),
            };
            updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTER_SCHEDULES, row, diff));
        }

        // Every cluster gets a workload-class row (NULL when unset).
        updates.push(BuiltinTableUpdate::row(
            &*MZ_CLUSTER_WORKLOAD_CLASSES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::from(cluster.config.workload_class.as_deref()),
            ]),
            diff,
        ));

        updates
    }
450
    /// Packs updates for a cluster replica: one `mz_cluster_replicas` row,
    /// plus marker rows in `mz_internal_cluster_replicas` /
    /// `mz_pending_cluster_replicas` when applicable.
    pub(super) fn pack_cluster_replica_update(
        &self,
        cluster_id: ClusterId,
        name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let cluster = &self.clusters_by_id[&cluster_id];
        let id = cluster.replica_id(name).expect("Must exist");
        let replica = cluster.replica(id).expect("Must exist");

        // NOTE: arm order matters — the first arm matches only managed
        // replicas whose availability zone is already pinned; the second
        // catches all other managed replicas (AZ reported as NULL).
        let (size, disk, az, internal, pending) = match &replica.config.location {
            ReplicaLocation::Managed(ManagedReplicaLocation {
                size,
                availability_zones: ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
                allocation: _,
                billed_as: _,
                internal,
                pending,
            }) => (
                Some(&**size),
                Some(self.cluster_replica_size_has_disk(size)),
                Some(az.as_str()),
                *internal,
                *pending,
            ),
            ReplicaLocation::Managed(ManagedReplicaLocation {
                size,
                availability_zones: _,
                allocation: _,
                billed_as: _,
                internal,
                pending,
            }) => (
                Some(&**size),
                Some(self.cluster_replica_size_has_disk(size)),
                None,
                *internal,
                *pending,
            ),
            _ => (None, None, None, false, false),
        };

        let cluster_replica_update = BuiltinTableUpdate::row(
            &*MZ_CLUSTER_REPLICAS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(name),
                Datum::String(&cluster_id.to_string()),
                Datum::from(size),
                Datum::from(az),
                Datum::String(&replica.owner_id.to_string()),
                Datum::from(disk),
            ]),
            diff,
        );

        let mut updates = vec![cluster_replica_update];

        if internal {
            let update = BuiltinTableUpdate::row(
                &*MZ_INTERNAL_CLUSTER_REPLICAS,
                Row::pack_slice(&[Datum::String(&id.to_string())]),
                diff,
            );
            updates.push(update);
        }

        if pending {
            let update = BuiltinTableUpdate::row(
                &*MZ_PENDING_CLUSTER_REPLICAS,
                Row::pack_slice(&[Datum::String(&id.to_string())]),
                diff,
            );
            updates.push(update);
        }

        updates
    }
531
532 pub(crate) fn pack_network_policy_update(
533 &self,
534 policy_id: &NetworkPolicyId,
535 diff: Diff,
536 ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, Error> {
537 let policy = self.get_network_policy(policy_id);
538 let row = self.pack_privilege_array_row(&policy.privileges);
539 let privileges = row.unpack_first();
540 let mut updates = Vec::new();
541 for ref rule in policy.rules.clone() {
542 updates.push(BuiltinTableUpdate::row(
543 &*MZ_NETWORK_POLICY_RULES,
544 Row::pack_slice(&[
545 Datum::String(&rule.name),
546 Datum::String(&policy.id.to_string()),
547 Datum::String(&rule.action.to_string()),
548 Datum::String(&rule.address.to_string()),
549 Datum::String(&rule.direction.to_string()),
550 ]),
551 diff,
552 ));
553 }
554 updates.push(BuiltinTableUpdate::row(
555 &*MZ_NETWORK_POLICIES,
556 Row::pack_slice(&[
557 Datum::String(&policy.id.to_string()),
558 Datum::String(&policy.name),
559 Datum::String(&policy.owner_id.to_string()),
560 privileges,
561 Datum::UInt32(policy.oid.clone()),
562 ]),
563 diff,
564 ));
565
566 Ok(updates)
567 }
568
    /// Packs all builtin-table updates for a single catalog item: the
    /// item-kind-specific rows (tables, sources, views, sinks, ...), plus
    /// shared rows for dependencies (`mz_object_dependencies`), columns
    /// (`mz_columns`), history retention, and global IDs.
    pub(super) fn pack_item_update(
        &self,
        id: CatalogItemId,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let entry = self.get_entry(&id);
        let oid = entry.oid();
        // Temporary items resolve names against their owning connection;
        // everything else uses the system connection.
        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
        let schema_id = &self
            .get_schema(
                &entry.name().qualifiers.database_spec,
                &entry.name().qualifiers.schema_spec,
                conn_id,
            )
            .id;
        let name = &entry.name().item;
        let owner_id = entry.owner_id();
        let privileges_row = self.pack_privilege_array_row(entry.privileges());
        let privileges = privileges_row.unpack_first();
        // Dispatch on the item kind to produce the kind-specific rows.
        let mut updates = match entry.item() {
            // Logs are surfaced as sources of type "log".
            CatalogItem::Log(_) => self.pack_source_update(
                id, oid, schema_id, name, "log", None, None, None, None, None, owner_id,
                privileges, diff, None,
            ),
            CatalogItem::Index(index) => {
                self.pack_index_update(id, oid, name, owner_id, index, diff)
            }
            CatalogItem::Table(table) => {
                let mut updates = self
                    .pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);

                // Tables backed by a source export additionally get a row in
                // the connection-specific `mz_*_source_tables` table.
                if let TableDataSource::DataSource {
                    desc: data_source,
                    timeline: _,
                } = &table.data_source
                {
                    updates.extend(match data_source {
                        DataSourceDesc::IngestionExport {
                            ingestion_id,
                            external_reference: UnresolvedItemName(external_reference),
                            details: _,
                            data_config: _,
                        } => {
                            let ingestion_entry = self
                                .get_entry(ingestion_id)
                                .source_desc()
                                .expect("primary source exists")
                                .expect("primary source is a source");

                            match ingestion_entry.connection.name() {
                                "postgres" => {
                                    // Reference is (database, schema, table);
                                    // only the last two are recorded.
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                    let schema_name = external_reference[1].to_ast_string_simple();
                                    let table_name = external_reference[2].to_ast_string_simple();

                                    self.pack_postgres_source_tables_update(
                                        id,
                                        &schema_name,
                                        &table_name,
                                        diff,
                                    )
                                }
                                "mysql" => {
                                    // Reference is (schema, table).
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
                                    let schema_name = external_reference[0].to_ast_string_simple();
                                    let table_name = external_reference[1].to_ast_string_simple();

                                    self.pack_mysql_source_tables_update(
                                        id,
                                        &schema_name,
                                        &table_name,
                                        diff,
                                    )
                                }
                                "sql-server" => {
                                    // Reference is (database, schema, table);
                                    // only the last two are recorded.
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                    let schema_name = external_reference[1].to_ast_string_simple();
                                    let table_name = external_reference[2].to_ast_string_simple();

                                    self.pack_sql_server_source_table_update(
                                        id,
                                        &schema_name,
                                        &table_name,
                                        diff,
                                    )
                                }
                                // Load-generator exports have no table rows.
                                "load-generator" => vec![],
                                "kafka" => {
                                    // Reference is the single topic name.
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 1);
                                    let topic = external_reference[0].to_ast_string_simple();
                                    let envelope = data_source.envelope();
                                    let (key_format, value_format) = data_source.formats();

                                    self.pack_kafka_source_tables_update(
                                        id,
                                        &topic,
                                        envelope,
                                        key_format,
                                        value_format,
                                        diff,
                                    )
                                }
                                s => unreachable!("{s} sources do not have tables"),
                            }
                        }
                        _ => vec![],
                    });
                }

                updates
            }
            CatalogItem::Source(source) => {
                let source_type = source.source_type();
                let connection_id = source.connection_id();
                let envelope = source.data_source.envelope();
                // Ingestion exports report the cluster of their primary
                // source, not their own entry.
                let cluster_entry = match source.data_source {
                    DataSourceDesc::IngestionExport { ingestion_id, .. } => {
                        self.get_entry(&ingestion_id)
                    }
                    _ => entry,
                };

                let cluster_id = cluster_entry.item().cluster_id().map(|id| id.to_string());

                let (key_format, value_format) = source.data_source.formats();

                let mut updates = self.pack_source_update(
                    id,
                    oid,
                    schema_id,
                    name,
                    source_type,
                    connection_id,
                    envelope,
                    key_format,
                    value_format,
                    cluster_id.as_deref(),
                    owner_id,
                    privileges,
                    diff,
                    source.create_sql.as_ref(),
                );

                // Connection-specific supplemental rows.
                updates.extend(match &source.data_source {
                    DataSourceDesc::Ingestion { desc, .. }
                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => match &desc.connection {
                        GenericSourceConnection::Postgres(postgres) => {
                            self.pack_postgres_source_update(id, postgres, diff)
                        }
                        GenericSourceConnection::Kafka(kafka) => {
                            self.pack_kafka_source_update(id, source.global_id(), kafka, diff)
                        }
                        _ => vec![],
                    },
                    DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference: UnresolvedItemName(external_reference),
                        details: _,
                        data_config: _,
                    } => {
                        let ingestion_entry = self
                            .get_entry(ingestion_id)
                            .source_desc()
                            .expect("primary source exists")
                            .expect("primary source is a source");

                        match ingestion_entry.connection.name() {
                            "postgres" => {
                                // Reference is (database, schema, table);
                                // only the last two are recorded.
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                let schema_name = external_reference[1].to_ast_string_simple();
                                let table_name = external_reference[2].to_ast_string_simple();

                                self.pack_postgres_source_tables_update(
                                    id,
                                    &schema_name,
                                    &table_name,
                                    diff,
                                )
                            }
                            "mysql" => {
                                // Reference is (schema, table).
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
                                let schema_name = external_reference[0].to_ast_string_simple();
                                let table_name = external_reference[1].to_ast_string_simple();

                                self.pack_mysql_source_tables_update(
                                    id,
                                    &schema_name,
                                    &table_name,
                                    diff,
                                )
                            }
                            "sql-server" => {
                                // Reference is (database, schema, table);
                                // only the last two are recorded.
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                let schema_name = external_reference[1].to_ast_string_simple();
                                let table_name = external_reference[2].to_ast_string_simple();

                                self.pack_sql_server_source_table_update(
                                    id,
                                    &schema_name,
                                    &table_name,
                                    diff,
                                )
                            }
                            "load-generator" => vec![],
                            s => unreachable!("{s} sources do not have subsources"),
                        }
                    }
                    DataSourceDesc::Webhook { .. } => {
                        vec![self.pack_webhook_source_update(id, diff)]
                    }
                    _ => vec![],
                });

                updates
            }
            CatalogItem::View(view) => {
                self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
            }
            CatalogItem::MaterializedView(mview) => self.pack_materialized_view_update(
                id, oid, schema_id, name, owner_id, privileges, mview, diff,
            ),
            CatalogItem::Sink(sink) => {
                self.pack_sink_update(id, oid, schema_id, name, owner_id, sink, diff)
            }
            CatalogItem::Type(ty) => {
                self.pack_type_update(id, oid, schema_id, name, owner_id, privileges, ty, diff)
            }
            CatalogItem::Func(func) => {
                self.pack_func_update(id, schema_id, name, owner_id, func, diff)
            }
            CatalogItem::Secret(_) => {
                self.pack_secret_update(id, oid, schema_id, name, owner_id, privileges, diff)
            }
            CatalogItem::Connection(connection) => self.pack_connection_update(
                id, oid, schema_id, name, owner_id, privileges, connection, diff,
            ),
            CatalogItem::ContinualTask(ct) => self.pack_continual_task_update(
                id, oid, schema_id, name, owner_id, privileges, ct, diff,
            ),
        };

        // Dependency rows are omitted for temporary items.
        if !entry.item().is_temporary() {
            for dependee in entry.item().references().items() {
                updates.push(self.pack_depends_update(id, *dependee, diff))
            }
        }

        // One `mz_columns` row per column for relation-like items.
        if let Some(desc) = entry.relation_desc_latest() {
            // Only write-backed tables carry per-column defaults.
            let defaults = match entry.item() {
                CatalogItem::Table(Table {
                    data_source: TableDataSource::TableWrites { defaults },
                    ..
                }) => Some(defaults),
                _ => None,
            };
            for (i, (column_name, column_type)) in desc.iter().enumerate() {
                let default: Option<String> = defaults.map(|d| d[i].to_ast_string_stable());
                let default: Datum = default
                    .as_ref()
                    .map(|d| Datum::String(d))
                    .unwrap_or(Datum::Null);
                let pgtype = mz_pgrepr::Type::from(&column_type.scalar_type);
                // Custom (user-defined) list/map/record types report their
                // catalog name and OID instead of the pg type's.
                let (type_name, type_oid) = match &column_type.scalar_type {
                    SqlScalarType::List {
                        custom_id: Some(custom_id),
                        ..
                    }
                    | SqlScalarType::Map {
                        custom_id: Some(custom_id),
                        ..
                    }
                    | SqlScalarType::Record {
                        custom_id: Some(custom_id),
                        ..
                    } => {
                        let entry = self.get_entry(custom_id);
                        let name = &*entry.name().item;
                        let oid = entry.oid();
                        (name, oid)
                    }
                    _ => (pgtype.name(), pgtype.oid()),
                };
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_COLUMNS,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String(column_name),
                        // Ordinal position is 1-based.
                        Datum::UInt64(u64::cast_from(i + 1)),
                        Datum::from(column_type.nullable),
                        Datum::String(type_name),
                        default,
                        Datum::UInt32(type_oid),
                        Datum::Int32(pgtype.typmod()),
                    ]),
                    diff,
                ));
            }
        }

        if let Some(cw) = entry.item().initial_logical_compaction_window() {
            updates.push(self.pack_history_retention_strategy_update(id, cw, diff));
        }

        updates.extend(Self::pack_item_global_id_update(entry, diff));

        updates
    }
918
919 fn pack_item_global_id_update(
920 entry: &CatalogEntry,
921 diff: Diff,
922 ) -> impl Iterator<Item = BuiltinTableUpdate<&'static BuiltinTable>> + use<'_> {
923 let id = entry.id().to_string();
924 let global_ids = entry.global_ids();
925 global_ids.map(move |global_id| {
926 BuiltinTableUpdate::row(
927 &*MZ_OBJECT_GLOBAL_IDS,
928 Row::pack_slice(&[Datum::String(&id), Datum::String(&global_id.to_string())]),
929 diff,
930 )
931 })
932 }
933
934 fn pack_history_retention_strategy_update(
935 &self,
936 id: CatalogItemId,
937 cw: CompactionWindow,
938 diff: Diff,
939 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
940 let cw: u64 = cw.comparable_timestamp().into();
941 let cw = Jsonb::from_serde_json(serde_json::Value::Number(serde_json::Number::from(cw)))
942 .expect("must serialize");
943 BuiltinTableUpdate::row(
944 &*MZ_HISTORY_RETENTION_STRATEGIES,
945 Row::pack_slice(&[
946 Datum::String(&id.to_string()),
947 Datum::String("FOR"),
949 cw.into_row().into_element(),
950 ]),
951 diff,
952 )
953 }
954
955 fn pack_table_update(
956 &self,
957 id: CatalogItemId,
958 oid: u32,
959 schema_id: &SchemaSpecifier,
960 name: &str,
961 owner_id: &RoleId,
962 privileges: Datum,
963 diff: Diff,
964 table: &Table,
965 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
966 let redacted = table.create_sql.as_ref().map(|create_sql| {
967 mz_sql::parse::parse(create_sql)
968 .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
969 .into_element()
970 .ast
971 .to_ast_string_redacted()
972 });
973 let source_id = if let TableDataSource::DataSource {
974 desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
975 ..
976 } = &table.data_source
977 {
978 Some(ingestion_id.to_string())
979 } else {
980 None
981 };
982
983 vec![BuiltinTableUpdate::row(
984 &*MZ_TABLES,
985 Row::pack_slice(&[
986 Datum::String(&id.to_string()),
987 Datum::UInt32(oid),
988 Datum::String(&schema_id.to_string()),
989 Datum::String(name),
990 Datum::String(&owner_id.to_string()),
991 privileges,
992 if let Some(create_sql) = &table.create_sql {
993 Datum::String(create_sql)
994 } else {
995 Datum::Null
996 },
997 if let Some(redacted) = &redacted {
998 Datum::String(redacted)
999 } else {
1000 Datum::Null
1001 },
1002 if let Some(source_id) = source_id.as_ref() {
1003 Datum::String(source_id)
1004 } else {
1005 Datum::Null
1006 },
1007 ]),
1008 diff,
1009 )]
1010 }
1011
1012 fn pack_source_update(
1013 &self,
1014 id: CatalogItemId,
1015 oid: u32,
1016 schema_id: &SchemaSpecifier,
1017 name: &str,
1018 source_desc_name: &str,
1019 connection_id: Option<CatalogItemId>,
1020 envelope: Option<&str>,
1021 key_format: Option<&str>,
1022 value_format: Option<&str>,
1023 cluster_id: Option<&str>,
1024 owner_id: &RoleId,
1025 privileges: Datum,
1026 diff: Diff,
1027 create_sql: Option<&String>,
1028 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1029 let redacted = create_sql.map(|create_sql| {
1030 let create_stmt = mz_sql::parse::parse(create_sql)
1031 .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
1032 .into_element()
1033 .ast;
1034 create_stmt.to_ast_string_redacted()
1035 });
1036 vec![BuiltinTableUpdate::row(
1037 &*MZ_SOURCES,
1038 Row::pack_slice(&[
1039 Datum::String(&id.to_string()),
1040 Datum::UInt32(oid),
1041 Datum::String(&schema_id.to_string()),
1042 Datum::String(name),
1043 Datum::String(source_desc_name),
1044 Datum::from(connection_id.map(|id| id.to_string()).as_deref()),
1045 Datum::Null,
1048 Datum::from(envelope),
1049 Datum::from(key_format),
1050 Datum::from(value_format),
1051 Datum::from(cluster_id),
1052 Datum::String(&owner_id.to_string()),
1053 privileges,
1054 if let Some(create_sql) = create_sql {
1055 Datum::String(create_sql)
1056 } else {
1057 Datum::Null
1058 },
1059 if let Some(redacted) = &redacted {
1060 Datum::String(redacted)
1061 } else {
1062 Datum::Null
1063 },
1064 ]),
1065 diff,
1066 )]
1067 }
1068
1069 fn pack_postgres_source_update(
1070 &self,
1071 id: CatalogItemId,
1072 postgres: &PostgresSourceConnection<ReferencedConnection>,
1073 diff: Diff,
1074 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1075 vec![BuiltinTableUpdate::row(
1076 &*MZ_POSTGRES_SOURCES,
1077 Row::pack_slice(&[
1078 Datum::String(&id.to_string()),
1079 Datum::String(&postgres.publication_details.slot),
1080 Datum::from(postgres.publication_details.timeline_id),
1081 ]),
1082 diff,
1083 )]
1084 }
1085
1086 fn pack_kafka_source_update(
1087 &self,
1088 item_id: CatalogItemId,
1089 collection_id: GlobalId,
1090 kafka: &KafkaSourceConnection<ReferencedConnection>,
1091 diff: Diff,
1092 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1093 vec![BuiltinTableUpdate::row(
1094 &*MZ_KAFKA_SOURCES,
1095 Row::pack_slice(&[
1096 Datum::String(&item_id.to_string()),
1097 Datum::String(&kafka.group_id(&self.config.connection_context, collection_id)),
1098 Datum::String(&kafka.topic),
1099 ]),
1100 diff,
1101 )]
1102 }
1103
1104 fn pack_postgres_source_tables_update(
1105 &self,
1106 id: CatalogItemId,
1107 schema_name: &str,
1108 table_name: &str,
1109 diff: Diff,
1110 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1111 vec![BuiltinTableUpdate::row(
1112 &*MZ_POSTGRES_SOURCE_TABLES,
1113 Row::pack_slice(&[
1114 Datum::String(&id.to_string()),
1115 Datum::String(schema_name),
1116 Datum::String(table_name),
1117 ]),
1118 diff,
1119 )]
1120 }
1121
1122 fn pack_mysql_source_tables_update(
1123 &self,
1124 id: CatalogItemId,
1125 schema_name: &str,
1126 table_name: &str,
1127 diff: Diff,
1128 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1129 vec![BuiltinTableUpdate::row(
1130 &*MZ_MYSQL_SOURCE_TABLES,
1131 Row::pack_slice(&[
1132 Datum::String(&id.to_string()),
1133 Datum::String(schema_name),
1134 Datum::String(table_name),
1135 ]),
1136 diff,
1137 )]
1138 }
1139
1140 fn pack_sql_server_source_table_update(
1141 &self,
1142 id: CatalogItemId,
1143 schema_name: &str,
1144 table_name: &str,
1145 diff: Diff,
1146 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1147 vec![BuiltinTableUpdate::row(
1148 &*MZ_SQL_SERVER_SOURCE_TABLES,
1149 Row::pack_slice(&[
1150 Datum::String(&id.to_string()),
1151 Datum::String(schema_name),
1152 Datum::String(table_name),
1153 ]),
1154 diff,
1155 )]
1156 }
1157
1158 fn pack_kafka_source_tables_update(
1159 &self,
1160 id: CatalogItemId,
1161 topic: &str,
1162 envelope: Option<&str>,
1163 key_format: Option<&str>,
1164 value_format: Option<&str>,
1165 diff: Diff,
1166 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1167 vec![BuiltinTableUpdate::row(
1168 &*MZ_KAFKA_SOURCE_TABLES,
1169 Row::pack_slice(&[
1170 Datum::String(&id.to_string()),
1171 Datum::String(topic),
1172 Datum::from(envelope),
1173 Datum::from(key_format),
1174 Datum::from(value_format),
1175 ]),
1176 diff,
1177 )]
1178 }
1179
    /// Packs the builtin table updates for a connection catalog item.
    ///
    /// Always emits one `mz_connections` row; depending on the connection
    /// type, additionally emits a companion row for the type-specific table
    /// (`mz_kafka_connections`, `mz_aws_connections`,
    /// `mz_aws_privatelink_connections`, or `mz_ssh_tunnel_connections`).
    fn pack_connection_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        connection: &Connection,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        // `create_sql` was produced by the catalog itself, so re-parsing it
        // can only fail on a bug; panic with the offending SQL for debugging.
        let create_stmt = mz_sql::parse::parse(&connection.create_sql)
            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", connection.create_sql))
            .into_element()
            .ast;
        let mut updates = vec![BuiltinTableUpdate::row(
            &*MZ_CONNECTIONS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                // User-facing connection type name.
                Datum::String(match connection.details {
                    ConnectionDetails::Kafka { .. } => "kafka",
                    ConnectionDetails::Csr { .. } => "confluent-schema-registry",
                    ConnectionDetails::Postgres { .. } => "postgres",
                    ConnectionDetails::Aws(..) => "aws",
                    ConnectionDetails::AwsPrivatelink(..) => "aws-privatelink",
                    ConnectionDetails::Ssh { .. } => "ssh-tunnel",
                    ConnectionDetails::MySql { .. } => "mysql",
                    ConnectionDetails::SqlServer(_) => "sql-server",
                    ConnectionDetails::IcebergCatalog(_) => "iceberg-catalog",
                }),
                Datum::String(&owner_id.to_string()),
                privileges,
                // Both the raw SQL and a redacted form (secrets elided).
                Datum::String(&connection.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        )];
        // Emit the type-specific companion row, if any. Failures here are
        // logged rather than propagated so the main `mz_connections` row is
        // still written.
        match connection.details {
            ConnectionDetails::Kafka(ref kafka) => {
                updates.extend(self.pack_kafka_connection_update(id, kafka, diff));
            }
            ConnectionDetails::Aws(ref aws_config) => {
                match self.pack_aws_connection_update(id, aws_config, diff) {
                    Ok(update) => {
                        updates.push(update);
                    }
                    Err(e) => {
                        tracing::error!(%id, %e, "failed writing row to mz_aws_connections table");
                    }
                }
            }
            ConnectionDetails::AwsPrivatelink(_) => {
                // The principal string can only be derived when the catalog
                // was configured with an AWS principal context.
                if let Some(aws_principal_context) = self.aws_principal_context.as_ref() {
                    updates.push(self.pack_aws_privatelink_connection_update(
                        id,
                        aws_principal_context,
                        diff,
                    ));
                } else {
                    tracing::error!(%id, "missing AWS principal context; cannot write row to mz_aws_privatelink_connections table");
                }
            }
            ConnectionDetails::Ssh {
                ref key_1,
                ref key_2,
                ..
            } => {
                updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff));
            }
            // These connection types have no companion table.
            ConnectionDetails::Csr(_)
            | ConnectionDetails::Postgres(_)
            | ConnectionDetails::MySql(_)
            | ConnectionDetails::SqlServer(_)
            | ConnectionDetails::IcebergCatalog(_) => (),
        };
        updates
    }
1260
1261 pub(crate) fn pack_ssh_tunnel_connection_update(
1262 &self,
1263 id: CatalogItemId,
1264 key_1: &SshKey,
1265 key_2: &SshKey,
1266 diff: Diff,
1267 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1268 BuiltinTableUpdate::row(
1269 &*MZ_SSH_TUNNEL_CONNECTIONS,
1270 Row::pack_slice(&[
1271 Datum::String(&id.to_string()),
1272 Datum::String(key_1.public_key().as_str()),
1273 Datum::String(key_2.public_key().as_str()),
1274 ]),
1275 diff,
1276 )
1277 }
1278
    /// Packs the `mz_kafka_connections` row for connection `id`: its broker
    /// addresses as a 1-D array plus the progress topic it uses.
    fn pack_kafka_connection_update(
        &self,
        id: CatalogItemId,
        kafka: &KafkaConnection<ReferencedConnection>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        // The progress topic is resolved against the connection context,
        // which may supply a default when the connection names none.
        let progress_topic = kafka.progress_topic(&self.config.connection_context, id);
        // Pack the broker array into a scratch `Row` first; `unpack_first`
        // then yields a `Datum` borrowing from `row` that can be re-packed
        // into the final row alongside the scalar columns.
        let mut row = Row::default();
        row.packer()
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: kafka.brokers.len(),
                }],
                kafka
                    .brokers
                    .iter()
                    .map(|broker| Datum::String(&broker.address)),
            )
            .expect("kafka.brokers is 1 dimensional, and its length is used for the array length");
        let brokers = row.unpack_first();
        vec![BuiltinTableUpdate::row(
            &*MZ_KAFKA_CONNECTIONS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                brokers,
                Datum::String(&progress_topic),
            ]),
            diff,
        )]
    }
1310
1311 pub fn pack_aws_privatelink_connection_update(
1312 &self,
1313 connection_id: CatalogItemId,
1314 aws_principal_context: &AwsPrincipalContext,
1315 diff: Diff,
1316 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1317 let id = &MZ_AWS_PRIVATELINK_CONNECTIONS;
1318 let row = Row::pack_slice(&[
1319 Datum::String(&connection_id.to_string()),
1320 Datum::String(&aws_principal_context.to_principal_string(connection_id)),
1321 ]);
1322 BuiltinTableUpdate::row(id, row, diff)
1323 }
1324
    /// Packs the `mz_aws_connections` row for `connection_id`.
    ///
    /// Exactly one of the two auth families populates its columns; all
    /// columns of the other family stay NULL:
    /// - `Credentials`: access key ID (inline string or secret reference),
    ///   secret access key (always a secret reference), and an optional
    ///   session token (inline or secret reference).
    /// - `AssumeRole`: the role ARN, optional session name, the principal
    ///   role configured in the connection context, the derived external ID,
    ///   and an example trust policy rendered as JSON.
    ///
    /// # Errors
    ///
    /// Returns an error if deriving the external ID or the example trust
    /// policy from the connection context fails.
    pub fn pack_aws_connection_update(
        &self,
        connection_id: CatalogItemId,
        aws_config: &AwsConnection,
        diff: Diff,
    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, anyhow::Error> {
        let id = &MZ_AWS_CONNECTIONS;

        // One variable per nullable column; only the fields relevant to the
        // active auth variant are filled in below.
        let mut access_key_id = None;
        let mut access_key_id_secret_id = None;
        let mut secret_access_key_secret_id = None;
        let mut session_token = None;
        let mut session_token_secret_id = None;
        let mut assume_role_arn = None;
        let mut assume_role_session_name = None;
        let mut principal = None;
        let mut external_id = None;
        let mut example_trust_policy = None;
        match &aws_config.auth {
            AwsAuth::Credentials(credentials) => {
                // Inline string goes in one column, a secret reference in the
                // other; they are mutually exclusive.
                match &credentials.access_key_id {
                    StringOrSecret::String(s) => access_key_id = Some(s.as_str()),
                    StringOrSecret::Secret(s) => access_key_id_secret_id = Some(s.to_string()),
                }
                secret_access_key_secret_id = Some(credentials.secret_access_key.to_string());
                match credentials.session_token.as_ref() {
                    None => (),
                    Some(StringOrSecret::String(s)) => session_token = Some(s.as_str()),
                    Some(StringOrSecret::Secret(s)) => {
                        session_token_secret_id = Some(s.to_string())
                    }
                }
            }
            AwsAuth::AssumeRole(assume_role) => {
                assume_role_arn = Some(assume_role.arn.as_str());
                assume_role_session_name = assume_role.session_name.as_deref();
                principal = self
                    .config
                    .connection_context
                    .aws_connection_role_arn
                    .as_deref();
                external_id =
                    Some(assume_role.external_id(&self.config.connection_context, connection_id)?);
                // Render the trust policy JSON into a single-datum jsonb row.
                example_trust_policy = {
                    let policy = assume_role
                        .example_trust_policy(&self.config.connection_context, connection_id)?;
                    let policy = Jsonb::from_serde_json(policy).expect("valid json");
                    Some(policy.into_row())
                };
            }
        }

        let row = Row::pack_slice(&[
            Datum::String(&connection_id.to_string()),
            Datum::from(aws_config.endpoint.as_deref()),
            Datum::from(aws_config.region.as_deref()),
            Datum::from(access_key_id),
            Datum::from(access_key_id_secret_id.as_deref()),
            Datum::from(secret_access_key_secret_id.as_deref()),
            Datum::from(session_token),
            Datum::from(session_token_secret_id.as_deref()),
            Datum::from(assume_role_arn),
            Datum::from(assume_role_session_name),
            Datum::from(principal),
            Datum::from(external_id.as_deref()),
            // Extract the single jsonb datum from the policy row, if any.
            Datum::from(example_trust_policy.as_ref().map(|p| p.into_element())),
        ]);

        Ok(BuiltinTableUpdate::row(id, row, diff))
    }
1395
1396 fn pack_view_update(
1397 &self,
1398 id: CatalogItemId,
1399 oid: u32,
1400 schema_id: &SchemaSpecifier,
1401 name: &str,
1402 owner_id: &RoleId,
1403 privileges: Datum,
1404 view: &View,
1405 diff: Diff,
1406 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1407 let create_stmt = mz_sql::parse::parse(&view.create_sql)
1408 .unwrap_or_else(|e| {
1409 panic!(
1410 "create_sql cannot be invalid: `{}` --- error: `{}`",
1411 view.create_sql, e
1412 )
1413 })
1414 .into_element()
1415 .ast;
1416 let query = match &create_stmt {
1417 Statement::CreateView(stmt) => &stmt.definition.query,
1418 _ => unreachable!(),
1419 };
1420
1421 let mut query_string = query.to_ast_string_stable();
1422 query_string.push(';');
1425
1426 vec![BuiltinTableUpdate::row(
1427 &*MZ_VIEWS,
1428 Row::pack_slice(&[
1429 Datum::String(&id.to_string()),
1430 Datum::UInt32(oid),
1431 Datum::String(&schema_id.to_string()),
1432 Datum::String(name),
1433 Datum::String(&query_string),
1434 Datum::String(&owner_id.to_string()),
1435 privileges,
1436 Datum::String(&view.create_sql),
1437 Datum::String(&create_stmt.to_ast_string_redacted()),
1438 ]),
1439 diff,
1440 )]
1441 }
1442
    /// Packs the builtin table updates for a materialized view: one
    /// `mz_materialized_views` row, one `mz_materialized_view_refresh_strategies`
    /// row per refresh strategy (or a single `on-commit` row when none is
    /// configured), and, if the view is a replacement, one `mz_replacements`
    /// row linking it to its target.
    fn pack_materialized_view_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        mview: &MaterializedView,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        // Catalog-produced SQL must re-parse; a failure here is a bug.
        let create_stmt = mz_sql::parse::parse(&mview.create_sql)
            .unwrap_or_else(|e| {
                panic!(
                    "create_sql cannot be invalid: `{}` --- error: `{}`",
                    mview.create_sql, e
                )
            })
            .into_element()
            .ast;
        // Render the defining query in its stable form, terminated with `;`.
        let query_string = match &create_stmt {
            Statement::CreateMaterializedView(stmt) => {
                let mut query_string = stmt.query.to_ast_string_stable();
                query_string.push(';');
                query_string
            }
            _ => unreachable!(),
        };

        let mut updates = Vec::new();

        updates.push(BuiltinTableUpdate::row(
            &*MZ_MATERIALIZED_VIEWS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&mview.cluster_id.to_string()),
                Datum::String(&query_string),
                Datum::String(&owner_id.to_string()),
                privileges,
                Datum::String(&mview.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        ));

        if let Some(refresh_schedule) = &mview.refresh_schedule {
            // A present-but-empty schedule would emit no strategy rows at
            // all, which planning should never produce.
            assert!(!refresh_schedule.is_empty());
            // One `every` row per periodic refresh: interval + aligned-to
            // timestamp; the `at` column stays NULL.
            for RefreshEvery {
                interval,
                aligned_to,
            } in refresh_schedule.everies.iter()
            {
                let aligned_to_dt = mz_ore::now::to_datetime(
                    <&Timestamp as TryInto<u64>>::try_into(aligned_to).expect("undoes planning"),
                );
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String("every"),
                        Datum::Interval(
                            Interval::from_duration(interval).expect(
                                "planning ensured that this is convertible back to Interval",
                            ),
                        ),
                        Datum::TimestampTz(aligned_to_dt.try_into().expect("undoes planning")),
                        Datum::Null,
                    ]),
                    diff,
                ));
            }
            // One `at` row per one-shot refresh: only the `at` timestamp is
            // populated; interval and aligned-to stay NULL.
            for at in refresh_schedule.ats.iter() {
                let at_dt = mz_ore::now::to_datetime(
                    <&Timestamp as TryInto<u64>>::try_into(at).expect("undoes planning"),
                );
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String("at"),
                        Datum::Null,
                        Datum::Null,
                        Datum::TimestampTz(at_dt.try_into().expect("undoes planning")),
                    ]),
                    diff,
                ));
            }
        } else {
            // No explicit schedule: the view refreshes on commit.
            updates.push(BuiltinTableUpdate::row(
                &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String("on-commit"),
                    Datum::Null,
                    Datum::Null,
                    Datum::Null,
                ]),
                diff,
            ));
        }

        if let Some(target_id) = mview.replacement_target {
            updates.push(BuiltinTableUpdate::row(
                &*MZ_REPLACEMENTS,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String(&target_id.to_string()),
                ]),
                diff,
            ));
        }

        updates
    }
1564
1565 fn pack_continual_task_update(
1566 &self,
1567 id: CatalogItemId,
1568 oid: u32,
1569 schema_id: &SchemaSpecifier,
1570 name: &str,
1571 owner_id: &RoleId,
1572 privileges: Datum,
1573 ct: &ContinualTask,
1574 diff: Diff,
1575 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1576 let create_stmt = mz_sql::parse::parse(&ct.create_sql)
1577 .unwrap_or_else(|e| {
1578 panic!(
1579 "create_sql cannot be invalid: `{}` --- error: `{}`",
1580 ct.create_sql, e
1581 )
1582 })
1583 .into_element()
1584 .ast;
1585 let query_string = match &create_stmt {
1586 Statement::CreateContinualTask(stmt) => {
1587 let mut query_string = String::new();
1588 for stmt in &stmt.stmts {
1589 let s = match stmt {
1590 ContinualTaskStmt::Insert(stmt) => stmt.to_ast_string_stable(),
1591 ContinualTaskStmt::Delete(stmt) => stmt.to_ast_string_stable(),
1592 };
1593 if query_string.is_empty() {
1594 query_string = s;
1595 } else {
1596 query_string.push_str("; ");
1597 query_string.push_str(&s);
1598 }
1599 }
1600 query_string
1601 }
1602 _ => unreachable!(),
1603 };
1604
1605 vec![BuiltinTableUpdate::row(
1606 &*MZ_CONTINUAL_TASKS,
1607 Row::pack_slice(&[
1608 Datum::String(&id.to_string()),
1609 Datum::UInt32(oid),
1610 Datum::String(&schema_id.to_string()),
1611 Datum::String(name),
1612 Datum::String(&ct.cluster_id.to_string()),
1613 Datum::String(&query_string),
1614 Datum::String(&owner_id.to_string()),
1615 privileges,
1616 Datum::String(&ct.create_sql),
1617 Datum::String(&create_stmt.to_ast_string_redacted()),
1618 ]),
1619 diff,
1620 )]
1621 }
1622
    /// Packs the builtin table updates for a sink: a connection-type-specific
    /// row (`mz_kafka_sinks` or `mz_iceberg_sinks`) followed by the generic
    /// `mz_sinks` row.
    fn pack_sink_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        sink: &Sink,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = vec![];
        // Companion row keyed on the sink's connection type.
        match &sink.connection {
            StorageSinkConnection::Kafka(KafkaSinkConnection {
                topic: topic_name, ..
            }) => {
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_KAFKA_SINKS,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String(topic_name.as_str()),
                    ]),
                    diff,
                ));
            }
            StorageSinkConnection::Iceberg(IcebergSinkConnection {
                namespace, table, ..
            }) => {
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_ICEBERG_SINKS,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String(namespace.as_str()),
                        Datum::String(table.as_str()),
                    ]),
                    diff,
                ));
            }
        };

        // Catalog-produced SQL must re-parse; a failure here is a bug.
        let create_stmt = mz_sql::parse::parse(&sink.create_sql)
            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", sink.create_sql))
            .into_element()
            .ast;

        let envelope = sink.envelope();

        // `formats()` yields (key, value) when present; the key may itself be
        // None while the value is always present in that case.
        let combined_format = sink.combined_format();
        let (key_format, value_format) = match sink.formats() {
            Some((key_format, value_format)) => (key_format, Some(value_format)),
            None => (None, None),
        };

        updates.push(BuiltinTableUpdate::row(
            &*MZ_SINKS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(sink.connection.name()),
                Datum::from(sink.connection_id().map(|id| id.to_string()).as_deref()),
                // NOTE(review): this column is always NULL here — confirm
                // which MZ_SINKS column it corresponds to before relying on it.
                Datum::Null,
                Datum::from(envelope),
                Datum::from(combined_format.as_ref().map(|f| f.as_ref())),
                Datum::from(key_format),
                Datum::from(value_format),
                Datum::String(&sink.cluster_id.to_string()),
                Datum::String(&owner_id.to_string()),
                Datum::String(&sink.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        ));

        updates
    }
1703
    /// Packs the builtin table updates for an index: one `mz_indexes` row
    /// plus one `mz_index_columns` row per index key (1-based sequence).
    fn pack_index_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        name: &str,
        owner_id: &RoleId,
        index: &Index,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = vec![];

        // Catalog-produced SQL must re-parse; a failure here is a bug.
        let create_stmt = mz_sql::parse::parse(&index.create_sql)
            .unwrap_or_else(|e| {
                panic!(
                    "create_sql cannot be invalid: `{}` --- error: `{}`",
                    index.create_sql, e
                )
            })
            .into_element()
            .ast;

        // The SQL text of each key expression, positionally aligned with
        // `index.keys`; planning guarantees `key_parts` is populated.
        let key_sqls = match &create_stmt {
            Statement::CreateIndex(CreateIndexStatement { key_parts, .. }) => key_parts
                .as_ref()
                .expect("key_parts is filled in during planning"),
            _ => unreachable!(),
        };
        // Resolve the indexed collection's global ID back to its item ID.
        let on_item_id = self.get_entry_by_global_id(&index.on).id();

        updates.push(BuiltinTableUpdate::row(
            &*MZ_INDEXES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(name),
                Datum::String(&on_item_id.to_string()),
                Datum::String(&index.cluster_id.to_string()),
                Datum::String(&owner_id.to_string()),
                Datum::String(&index.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        ));

        for (i, key) in index.keys.iter().enumerate() {
            let on_entry = self.get_entry_by_global_id(&index.on);
            let on_desc = on_entry
                .relation_desc()
                .expect("can only create indexes on items with a valid description");
            let nullable = key.typ(&on_desc.typ().column_types).nullable;
            // `seq_in_index` is 1-based.
            let seq_in_index = u64::cast_from(i + 1);
            let key_sql = key_sqls
                .get(i)
                .expect("missing sql information for index key")
                .to_ast_string_simple();
            // A bare column reference is reported by (1-based) field number;
            // any other expression is reported by its SQL text instead.
            let (field_number, expression) = match key {
                MirScalarExpr::Column(col, _) => {
                    (Datum::UInt64(u64::cast_from(*col + 1)), Datum::Null)
                }
                _ => (Datum::Null, Datum::String(&key_sql)),
            };
            updates.push(BuiltinTableUpdate::row(
                &*MZ_INDEX_COLUMNS,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::UInt64(seq_in_index),
                    field_number,
                    expression,
                    Datum::from(nullable),
                ]),
                diff,
            ));
        }

        updates
    }
1780
    /// Packs the builtin table updates for a type: one `mz_types` row, one
    /// row in the category-specific table (`mz_array_types`, `mz_list_types`,
    /// `mz_map_types`, `mz_pseudo_types`, or `mz_base_types`), and, when
    /// Postgres metadata is present, one `mz_type_pg_metadata` row.
    fn pack_type_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        typ: &Type,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut out = vec![];

        // Builtin types have no `create_sql`; user-defined types do, and that
        // SQL must re-parse or the catalog has a bug.
        let redacted = typ.create_sql.as_ref().map(|create_sql| {
            mz_sql::parse::parse(create_sql)
                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
                .into_element()
                .ast
                .to_ast_string_redacted()
        });

        out.push(BuiltinTableUpdate::row(
            &*MZ_TYPES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&TypeCategory::from_catalog_type(&typ.details.typ).to_string()),
                Datum::String(&owner_id.to_string()),
                privileges,
                if let Some(create_sql) = &typ.create_sql {
                    Datum::String(create_sql)
                } else {
                    Datum::Null
                },
                if let Some(redacted) = &redacted {
                    Datum::String(redacted)
                } else {
                    Datum::Null
                },
            ]),
            diff,
        ));

        let mut row = Row::default();
        let mut packer = row.packer();

        // Type modifiers pack as NULL when absent, otherwise as a list of
        // int8s.
        fn append_modifier(packer: &mut RowPacker<'_>, mods: &[i64]) {
            if mods.is_empty() {
                packer.push(Datum::Null);
            } else {
                packer.push_list(mods.iter().map(|m| Datum::Int64(*m)));
            }
        }

        // Pack the category-specific row into `row` and pick which builtin
        // table it belongs to.
        let index_id = match &typ.details.typ {
            CatalogType::Array {
                element_reference: element_id,
            } => {
                packer.push(Datum::String(&id.to_string()));
                packer.push(Datum::String(&element_id.to_string()));
                &MZ_ARRAY_TYPES
            }
            CatalogType::List {
                element_reference: element_id,
                element_modifiers,
            } => {
                packer.push(Datum::String(&id.to_string()));
                packer.push(Datum::String(&element_id.to_string()));
                append_modifier(&mut packer, element_modifiers);
                &MZ_LIST_TYPES
            }
            CatalogType::Map {
                key_reference: key_id,
                value_reference: value_id,
                key_modifiers,
                value_modifiers,
            } => {
                packer.push(Datum::String(&id.to_string()));
                packer.push(Datum::String(&key_id.to_string()));
                packer.push(Datum::String(&value_id.to_string()));
                append_modifier(&mut packer, key_modifiers);
                append_modifier(&mut packer, value_modifiers);
                &MZ_MAP_TYPES
            }
            CatalogType::Pseudo => {
                packer.push(Datum::String(&id.to_string()));
                &MZ_PSEUDO_TYPES
            }
            // Everything else is a base type.
            _ => {
                packer.push(Datum::String(&id.to_string()));
                &MZ_BASE_TYPES
            }
        };
        out.push(BuiltinTableUpdate::row(index_id, row, diff));

        if let Some(pg_metadata) = &typ.details.pg_metadata {
            out.push(BuiltinTableUpdate::row(
                &*MZ_TYPE_PG_METADATA,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::UInt32(pg_metadata.typinput_oid),
                    Datum::UInt32(pg_metadata.typreceive_oid),
                ]),
                diff,
            ));
        }

        out
    }
1892
    /// Packs the builtin table updates for a function: one `mz_functions` row
    /// per implementation (overload), plus one `mz_aggregates` row per
    /// implementation when the function is an aggregate.
    fn pack_func_update(
        &self,
        id: CatalogItemId,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        func: &Func,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = vec![];
        for func_impl_details in func.inner.func_impls() {
            // Resolve each argument's catalog type to its system type ID.
            let arg_type_ids = func_impl_details
                .arg_typs
                .iter()
                .map(|typ| self.get_system_type(typ).id().to_string())
                .collect::<Vec<_>>();

            // Pack the argument IDs into a scratch row; `unpack_first` then
            // yields an array `Datum` borrowing from `row` for the final row.
            let mut row = Row::default();
            row.packer()
                .try_push_array(
                    &[ArrayDimension {
                        lower_bound: 1,
                        length: arg_type_ids.len(),
                    }],
                    arg_type_ids.iter().map(|id| Datum::String(id)),
                )
                .expect(
                    "arg_type_ids is 1 dimensional, and its length is used for the array length",
                );
            let arg_type_ids = row.unpack_first();

            updates.push(BuiltinTableUpdate::row(
                &*MZ_FUNCTIONS,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::UInt32(func_impl_details.oid),
                    Datum::String(&schema_id.to_string()),
                    Datum::String(name),
                    arg_type_ids,
                    // Variadic and return types are optional; NULL when absent.
                    Datum::from(
                        func_impl_details
                            .variadic_typ
                            .map(|typ| self.get_system_type(typ).id().to_string())
                            .as_deref(),
                    ),
                    Datum::from(
                        func_impl_details
                            .return_typ
                            .map(|typ| self.get_system_type(typ).id().to_string())
                            .as_deref(),
                    ),
                    func_impl_details.return_is_set.into(),
                    Datum::String(&owner_id.to_string()),
                ]),
                diff,
            ));

            if let mz_sql::func::Func::Aggregate(_) = func.inner {
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_AGGREGATES,
                    Row::pack_slice(&[
                        Datum::UInt32(func_impl_details.oid),
                        // NOTE(review): constant "n" / 0 values here —
                        // presumably pg_aggregate-style `aggkind`/`aggnumdirectargs`
                        // defaults; confirm against the MZ_AGGREGATES schema.
                        Datum::String("n"),
                        Datum::Int16(0),
                    ]),
                    diff,
                ));
            }
        }
        updates
    }
1965
    /// Packs one `mz_operators` row for a single operator implementation:
    /// its OID, symbol, argument type IDs (as an array), and optional return
    /// type ID.
    pub fn pack_op_update(
        &self,
        operator: &str,
        func_impl_details: FuncImplCatalogDetails,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        // Resolve each argument's catalog type to its system type ID.
        let arg_type_ids = func_impl_details
            .arg_typs
            .iter()
            .map(|typ| self.get_system_type(typ).id().to_string())
            .collect::<Vec<_>>();

        // Pack the argument IDs into a scratch row; `unpack_first` then
        // yields an array `Datum` borrowing from `row` for the final row.
        let mut row = Row::default();
        row.packer()
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: arg_type_ids.len(),
                }],
                arg_type_ids.iter().map(|id| Datum::String(id)),
            )
            .expect("arg_type_ids is 1 dimensional, and its length is used for the array length");
        let arg_type_ids = row.unpack_first();

        BuiltinTableUpdate::row(
            &*MZ_OPERATORS,
            Row::pack_slice(&[
                Datum::UInt32(func_impl_details.oid),
                Datum::String(operator),
                arg_type_ids,
                // Return type is optional; NULL when absent.
                Datum::from(
                    func_impl_details
                        .return_typ
                        .map(|typ| self.get_system_type(typ).id().to_string())
                        .as_deref(),
                ),
            ]),
            diff,
        )
    }
2006
2007 fn pack_secret_update(
2008 &self,
2009 id: CatalogItemId,
2010 oid: u32,
2011 schema_id: &SchemaSpecifier,
2012 name: &str,
2013 owner_id: &RoleId,
2014 privileges: Datum,
2015 diff: Diff,
2016 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2017 vec![BuiltinTableUpdate::row(
2018 &*MZ_SECRETS,
2019 Row::pack_slice(&[
2020 Datum::String(&id.to_string()),
2021 Datum::UInt32(oid),
2022 Datum::String(&schema_id.to_string()),
2023 Datum::String(name),
2024 Datum::String(&owner_id.to_string()),
2025 privileges,
2026 ]),
2027 diff,
2028 )]
2029 }
2030
    /// Packs one `mz_audit_events` row from a versioned audit log event.
    ///
    /// # Errors
    ///
    /// Returns an error if the event's details cannot be converted to jsonb.
    pub fn pack_audit_log_update(
        &self,
        event: &VersionedEvent,
        diff: Diff,
    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
        // Destructure the versioned event into its common fields; only V1
        // exists today.
        let (event_type, object_type, details, user, occurred_at): (
            &EventType,
            &ObjectType,
            &EventDetails,
            &Option<String>,
            u64,
        ) = match event {
            VersionedEvent::V1(ev) => (
                &ev.event_type,
                &ev.object_type,
                &ev.details,
                &ev.user,
                ev.occurred_at,
            ),
        };
        // Convert the JSON details into a single-datum jsonb row, then
        // borrow that one datum out of it.
        let details = Jsonb::from_serde_json(details.as_json())
            .map_err(|e| {
                Error::new(ErrorKind::Unstructured(format!(
                    "could not pack audit log update: {}",
                    e
                )))
            })?
            .into_row();
        let details = details
            .iter()
            .next()
            .expect("details created above with a single jsonb column");
        let dt = mz_ore::now::to_datetime(occurred_at);
        let id = event.sortable_id();
        Ok(BuiltinTableUpdate::row(
            &*MZ_AUDIT_EVENTS,
            Row::pack_slice(&[
                Datum::UInt64(id),
                Datum::String(&format!("{}", event_type)),
                Datum::String(&format!("{}", object_type)),
                details,
                // The acting user is optional (e.g. system-initiated events).
                match user {
                    Some(user) => Datum::String(user),
                    None => Datum::Null,
                },
                Datum::TimestampTz(dt.try_into().expect("must fit")),
            ]),
            diff,
        ))
    }
2081
2082 pub fn pack_storage_usage_update(
2083 &self,
2084 VersionedStorageUsage::V1(event): VersionedStorageUsage,
2085 diff: Diff,
2086 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2087 let id = &MZ_STORAGE_USAGE_BY_SHARD;
2088 let row = Row::pack_slice(&[
2089 Datum::UInt64(event.id),
2090 Datum::from(event.shard_id.as_deref()),
2091 Datum::UInt64(event.size_bytes),
2092 Datum::TimestampTz(
2093 mz_ore::now::to_datetime(event.collection_timestamp)
2094 .try_into()
2095 .expect("must fit"),
2096 ),
2097 ]);
2098 BuiltinTableUpdate::row(id, row, diff)
2099 }
2100
2101 pub fn pack_egress_ip_update(
2102 &self,
2103 ip: &IpNet,
2104 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
2105 let id = &MZ_EGRESS_IPS;
2106 let addr = ip.addr();
2107 let row = Row::pack_slice(&[
2108 Datum::String(&addr.to_string()),
2109 Datum::Int32(ip.prefix_len().into()),
2110 Datum::String(&format!("{}/{}", addr, ip.prefix_len())),
2111 ]);
2112 Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
2113 }
2114
2115 pub fn pack_license_key_update(
2116 &self,
2117 license_key: &ValidatedLicenseKey,
2118 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
2119 let id = &MZ_LICENSE_KEYS;
2120 let row = Row::pack_slice(&[
2121 Datum::String(&license_key.id),
2122 Datum::String(&license_key.organization),
2123 Datum::String(&license_key.environment_id),
2124 Datum::TimestampTz(
2125 mz_ore::now::to_datetime(license_key.expiration * 1000)
2126 .try_into()
2127 .expect("must fit"),
2128 ),
2129 Datum::TimestampTz(
2130 mz_ore::now::to_datetime(license_key.not_before * 1000)
2131 .try_into()
2132 .expect("must fit"),
2133 ),
2134 ]);
2135 Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
2136 }
2137
2138 pub fn pack_all_replica_size_updates(&self) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2139 let mut updates = Vec::new();
2140 for (size, alloc) in &self.cluster_replica_sizes.0 {
2141 if alloc.disabled {
2142 continue;
2143 }
2144
2145 let cpu_limit = alloc.cpu_limit.unwrap_or(CpuLimit::MAX);
2148 let MemoryLimit(ByteSize(memory_bytes)) =
2149 (alloc.memory_limit).unwrap_or(MemoryLimit::MAX);
2150 let DiskLimit(ByteSize(disk_bytes)) =
2151 (alloc.disk_limit).unwrap_or(DiskLimit::ARBITRARY);
2152
2153 let row = Row::pack_slice(&[
2154 size.as_str().into(),
2155 u64::cast_from(alloc.scale).into(),
2156 u64::cast_from(alloc.workers).into(),
2157 cpu_limit.as_nanocpus().into(),
2158 memory_bytes.into(),
2159 disk_bytes.into(),
2160 (alloc.credits_per_hour).into(),
2161 ]);
2162
2163 updates.push(BuiltinTableUpdate::row(
2164 &*MZ_CLUSTER_REPLICA_SIZES,
2165 row,
2166 Diff::ONE,
2167 ));
2168 }
2169
2170 updates
2171 }
2172
2173 pub fn pack_subscribe_update(
2174 &self,
2175 id: GlobalId,
2176 subscribe: &ActiveSubscribe,
2177 diff: Diff,
2178 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2179 let mut row = Row::default();
2180 let mut packer = row.packer();
2181 packer.push(Datum::String(&id.to_string()));
2182 packer.push(Datum::Uuid(subscribe.session_uuid));
2183 packer.push(Datum::String(&subscribe.cluster_id.to_string()));
2184
2185 let start_dt = mz_ore::now::to_datetime(subscribe.start_time);
2186 packer.push(Datum::TimestampTz(start_dt.try_into().expect("must fit")));
2187
2188 let depends_on: Vec<_> = subscribe
2189 .depends_on
2190 .iter()
2191 .map(|id| id.to_string())
2192 .collect();
2193 packer.push_list(depends_on.iter().map(|s| Datum::String(s)));
2194
2195 BuiltinTableUpdate::row(&*MZ_SUBSCRIPTIONS, row, diff)
2196 }
2197
2198 pub fn pack_session_update(
2199 &self,
2200 conn: &ConnMeta,
2201 diff: Diff,
2202 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2203 let connect_dt = mz_ore::now::to_datetime(conn.connected_at());
2204 BuiltinTableUpdate::row(
2205 &*MZ_SESSIONS,
2206 Row::pack_slice(&[
2207 Datum::Uuid(conn.uuid()),
2208 Datum::UInt32(conn.conn_id().unhandled()),
2209 Datum::String(&conn.authenticated_role_id().to_string()),
2210 Datum::from(conn.client_ip().map(|ip| ip.to_string()).as_deref()),
2211 Datum::TimestampTz(connect_dt.try_into().expect("must fit")),
2212 ]),
2213 diff,
2214 )
2215 }
2216
2217 pub fn pack_default_privileges_update(
2218 &self,
2219 default_privilege_object: &DefaultPrivilegeObject,
2220 grantee: &RoleId,
2221 acl_mode: &AclMode,
2222 diff: Diff,
2223 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2224 BuiltinTableUpdate::row(
2225 &*MZ_DEFAULT_PRIVILEGES,
2226 Row::pack_slice(&[
2227 default_privilege_object.role_id.to_string().as_str().into(),
2228 default_privilege_object
2229 .database_id
2230 .map(|database_id| database_id.to_string())
2231 .as_deref()
2232 .into(),
2233 default_privilege_object
2234 .schema_id
2235 .map(|schema_id| schema_id.to_string())
2236 .as_deref()
2237 .into(),
2238 default_privilege_object
2239 .object_type
2240 .to_string()
2241 .to_lowercase()
2242 .as_str()
2243 .into(),
2244 grantee.to_string().as_str().into(),
2245 acl_mode.to_string().as_str().into(),
2246 ]),
2247 diff,
2248 )
2249 }
2250
2251 pub fn pack_system_privileges_update(
2252 &self,
2253 privileges: MzAclItem,
2254 diff: Diff,
2255 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2256 BuiltinTableUpdate::row(
2257 &*MZ_SYSTEM_PRIVILEGES,
2258 Row::pack_slice(&[privileges.into()]),
2259 diff,
2260 )
2261 }
2262
2263 fn pack_privilege_array_row(&self, privileges: &PrivilegeMap) -> Row {
2264 let mut row = Row::default();
2265 let flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2266 row.packer()
2267 .try_push_array(
2268 &[ArrayDimension {
2269 lower_bound: 1,
2270 length: flat_privileges.len(),
2271 }],
2272 flat_privileges
2273 .into_iter()
2274 .map(|mz_acl_item| Datum::MzAclItem(mz_acl_item.clone())),
2275 )
2276 .expect("privileges is 1 dimensional, and its length is used for the array length");
2277 row
2278 }
2279
    /// Packs one `mz_comments` row for a comment on `object_id`, optionally
    /// scoped to a (1-based) column position.
    pub fn pack_comment_update(
        &self,
        object_id: CommentObjectId,
        column_pos: Option<usize>,
        comment: &str,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        // Normalize the object type through the audit-log naming so comments
        // use the same type strings as audit events.
        let object_type = mz_sql::catalog::ObjectType::from(object_id);
        let audit_type = super::object_type_to_audit_object_type(object_type);
        let object_type_str = audit_type.to_string();

        // Render the commented object's identifier; compound IDs (schema,
        // cluster replica) use only their final component.
        let object_id_str = match object_id {
            CommentObjectId::Table(global_id)
            | CommentObjectId::View(global_id)
            | CommentObjectId::MaterializedView(global_id)
            | CommentObjectId::Source(global_id)
            | CommentObjectId::Sink(global_id)
            | CommentObjectId::Index(global_id)
            | CommentObjectId::Func(global_id)
            | CommentObjectId::Connection(global_id)
            | CommentObjectId::Secret(global_id)
            | CommentObjectId::Type(global_id)
            | CommentObjectId::ContinualTask(global_id) => global_id.to_string(),
            CommentObjectId::Role(role_id) => role_id.to_string(),
            CommentObjectId::Database(database_id) => database_id.to_string(),
            CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
            CommentObjectId::Cluster(cluster_id) => cluster_id.to_string(),
            CommentObjectId::ClusterReplica((_, replica_id)) => replica_id.to_string(),
            CommentObjectId::NetworkPolicy(network_policy_id) => network_policy_id.to_string(),
        };
        // Column position is NULL for object-level comments.
        let column_pos_datum = match column_pos {
            Some(pos) => {
                let pos =
                    i32::try_from(pos).expect("we constrain this value in the planning layer");
                Datum::Int32(pos)
            }
            None => Datum::Null,
        };

        BuiltinTableUpdate::row(
            &*MZ_COMMENTS,
            Row::pack_slice(&[
                Datum::String(&object_id_str),
                Datum::String(&object_type_str),
                column_pos_datum,
                Datum::String(comment),
            ]),
            diff,
        )
    }
2332
2333 pub fn pack_webhook_source_update(
2334 &self,
2335 item_id: CatalogItemId,
2336 diff: Diff,
2337 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2338 let url = self
2339 .try_get_webhook_url(&item_id)
2340 .expect("webhook source should exist");
2341 let url = url.to_string();
2342 let name = &self.get_entry(&item_id).name().item;
2343 let id_str = item_id.to_string();
2344
2345 BuiltinTableUpdate::row(
2346 &*MZ_WEBHOOKS_SOURCES,
2347 Row::pack_slice(&[
2348 Datum::String(&id_str),
2349 Datum::String(name),
2350 Datum::String(&url),
2351 ]),
2352 diff,
2353 )
2354 }
2355
2356 pub fn pack_source_references_update(
2357 &self,
2358 source_references: &SourceReferences,
2359 diff: Diff,
2360 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2361 let source_id = source_references.source_id.to_string();
2362 let updated_at = &source_references.updated_at;
2363 source_references
2364 .references
2365 .iter()
2366 .map(|reference| {
2367 let mut row = Row::default();
2368 let mut packer = row.packer();
2369 packer.extend([
2370 Datum::String(&source_id),
2371 reference
2372 .namespace
2373 .as_ref()
2374 .map(|s| Datum::String(s))
2375 .unwrap_or(Datum::Null),
2376 Datum::String(&reference.name),
2377 Datum::TimestampTz(
2378 mz_ore::now::to_datetime(*updated_at)
2379 .try_into()
2380 .expect("must fit"),
2381 ),
2382 ]);
2383 if reference.columns.len() > 0 {
2384 packer
2385 .try_push_array(
2386 &[ArrayDimension {
2387 lower_bound: 1,
2388 length: reference.columns.len(),
2389 }],
2390 reference.columns.iter().map(|col| Datum::String(col)),
2391 )
2392 .expect(
2393 "columns is 1 dimensional, and its length is used for the array length",
2394 );
2395 } else {
2396 packer.push(Datum::Null);
2397 }
2398
2399 BuiltinTableUpdate::row(&*MZ_SOURCE_REFERENCES, row, diff)
2400 })
2401 .collect()
2402 }
2403}