mod notice;

use bytesize::ByteSize;
use ipnet::IpNet;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::dyncfgs::ENABLE_PASSWORD_AUTH;
use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::builtin::{
    BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
    MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICAS,
    MZ_CLUSTER_SCHEDULES, MZ_CLUSTER_WORKLOAD_CLASSES, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS,
    MZ_CONNECTIONS, MZ_CONTINUAL_TASKS, MZ_DATABASES, MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS,
    MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES, MZ_ICEBERG_SINKS, MZ_INDEX_COLUMNS, MZ_INDEXES,
    MZ_INTERNAL_CLUSTER_REPLICAS, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCE_TABLES,
    MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES,
    MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MATERIALIZED_VIEWS, MZ_MYSQL_SOURCE_TABLES,
    MZ_NETWORK_POLICIES, MZ_NETWORK_POLICY_RULES, MZ_OBJECT_DEPENDENCIES, MZ_OBJECT_GLOBAL_IDS,
    MZ_OPERATORS, MZ_PENDING_CLUSTER_REPLICAS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES,
    MZ_PSEUDO_TYPES, MZ_REPLACEMENTS, MZ_ROLE_AUTH, MZ_ROLE_MEMBERS, MZ_ROLE_PARAMETERS, MZ_ROLES,
    MZ_SCHEMAS, MZ_SECRETS, MZ_SESSIONS, MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SOURCES,
    MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD,
    MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS,
    MZ_WEBHOOKS_SOURCES,
};
use mz_catalog::config::AwsPrincipalContext;
use mz_catalog::durable::SourceReferences;
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::{
    CatalogEntry, CatalogItem, ClusterVariant, Connection, ContinualTask, DataSourceDesc, Func,
    Index, MaterializedView, Sink, Table, TableDataSource, Type, View,
};
use mz_controller::clusters::{
    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
};
use mz_controller_types::ClusterId;
use mz_expr::MirScalarExpr;
use mz_license_keys::ValidatedLicenseKey;
use mz_orchestrator::{CpuLimit, DiskLimit, MemoryLimit};
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_persist_client::batch::ProtoBatch;
use mz_repr::adt::array::ArrayDimension;
use mz_repr::adt::interval::Interval;
use mz_repr::adt::jsonb::Jsonb;
use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
use mz_repr::adt::regex;
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::refresh_schedule::RefreshEvery;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, Row, RowPacker, SqlScalarType, Timestamp};
use mz_sql::ast::{ContinualTaskStmt, CreateIndexStatement, Statement, UnresolvedItemName};
use mz_sql::catalog::{
    CatalogCluster, CatalogDatabase, CatalogSchema, CatalogType, DefaultPrivilegeObject,
    TypeCategory,
};
use mz_sql::func::FuncImplCatalogDetails;
use mz_sql::names::{
    CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier,
};
use mz_sql::plan::{ClusterSchedule, ConnectionDetails, SshKey};
use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
use mz_sql::session::vars::SessionVars;
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_client::client::TableData;
use mz_storage_types::connections::KafkaConnection;
use mz_storage_types::connections::aws::{AwsAuth, AwsConnection};
use mz_storage_types::connections::inline::ReferencedConnection;
use mz_storage_types::connections::string_or_secret::StringOrSecret;
use mz_storage_types::sinks::{IcebergSinkConnection, KafkaSinkConnection, StorageSinkConnection};
use mz_storage_types::sources::{
    GenericSourceConnection, KafkaSourceConnection, PostgresSourceConnection, SourceConnection,
};
use smallvec::smallvec;

use crate::active_compute_sink::ActiveSubscribe;
use crate::catalog::CatalogState;
use crate::coord::ConnMeta;

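/// A pending update to a builtin table. `T` identifies the table to update:
/// updates are first constructed against the static [`BuiltinTable`]
/// definitions and later resolved to the [`CatalogItemId`] of the table.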
#[derive(Debug, Clone)]
pub struct BuiltinTableUpdate<T = CatalogItemId> {
    /// The table to update.
    pub id: T,
    /// The update data: either owned rows or pre-written batches.
    pub data: TableData,
}

impl<T> BuiltinTableUpdate<T> {
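    /// Creates a [`BuiltinTableUpdate`] from a single `(row, diff)` pair.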
    pub fn row(id: T, row: Row, diff: Diff) -> BuiltinTableUpdate<T> {
        BuiltinTableUpdate {
            id,
            data: TableData::Rows(vec![(row, diff)]),
        }
    }

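    /// Creates a [`BuiltinTableUpdate`] from a pre-built batch of updates.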
    pub fn batch(id: T, batch: ProtoBatch) -> BuiltinTableUpdate<T> {
        BuiltinTableUpdate {
            id,
            data: TableData::Batches(smallvec![batch]),
        }
    }
}

impl CatalogState {
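    /// Resolves the static [`BuiltinTable`] reference in each update to the
    /// [`CatalogItemId`] of the corresponding builtin table.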
    pub fn resolve_builtin_table_updates(
        &self,
        builtin_table_update: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    ) -> Vec<BuiltinTableUpdate<CatalogItemId>> {
        builtin_table_update
            .into_iter()
            .map(|builtin_table_update| self.resolve_builtin_table_update(builtin_table_update))
            .collect()
    }

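    /// Resolves the static [`BuiltinTable`] reference in a single update to
    /// its [`CatalogItemId`].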
    pub fn resolve_builtin_table_update(
        &self,
        BuiltinTableUpdate { id, data }: BuiltinTableUpdate<&'static BuiltinTable>,
    ) -> BuiltinTableUpdate<CatalogItemId> {
        let id = self.resolve_builtin_table(id);
        BuiltinTableUpdate { id, data }
    }

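    /// Packs a row for `mz_object_dependencies` recording that `depender`
    /// depends on `dependee`.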
    pub fn pack_depends_update(
        &self,
        depender: CatalogItemId,
        dependee: CatalogItemId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let row = Row::pack_slice(&[
            Datum::String(&depender.to_string()),
            Datum::String(&dependee.to_string()),
        ]);
        BuiltinTableUpdate::row(&*MZ_OBJECT_DEPENDENCIES, row, diff)
    }

    pub(super) fn pack_database_update(
        &self,
        database_id: &DatabaseId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let database = self.get_database(database_id);
        let row = self.pack_privilege_array_row(database.privileges());
        let privileges = row.unpack_first();
        BuiltinTableUpdate::row(
            &*MZ_DATABASES,
            Row::pack_slice(&[
                Datum::String(&database.id.to_string()),
                Datum::UInt32(database.oid),
                Datum::String(database.name()),
                Datum::String(&database.owner_id.to_string()),
                privileges,
            ]),
            diff,
        )
    }

    pub(super) fn pack_schema_update(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_id: &SchemaId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let (database_id, schema) = match database_spec {
            ResolvedDatabaseSpecifier::Ambient => (None, &self.ambient_schemas_by_id[schema_id]),
            ResolvedDatabaseSpecifier::Id(id) => (
                Some(id.to_string()),
                &self.database_by_id[id].schemas_by_id[schema_id],
            ),
        };
        let row = self.pack_privilege_array_row(schema.privileges());
        let privileges = row.unpack_first();
        BuiltinTableUpdate::row(
            &*MZ_SCHEMAS,
            Row::pack_slice(&[
                Datum::String(&schema_id.to_string()),
                Datum::UInt32(schema.oid),
                Datum::from(database_id.as_deref()),
                Datum::String(&schema.name.schema),
                Datum::String(&schema.owner_id.to_string()),
                privileges,
            ]),
            diff,
        )
    }

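    /// Packs a row for `mz_role_auth` containing the role's password hash (if
    /// any) and the time its auth record was last updated.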
    pub(super) fn pack_role_auth_update(
        &self,
        id: RoleId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let role_auth = self.get_role_auth(&id);
        let role = self.get_role(&id);
        BuiltinTableUpdate::row(
            &*MZ_ROLE_AUTH,
            Row::pack_slice(&[
                Datum::String(&role_auth.role_id.to_string()),
                Datum::UInt32(role.oid),
                match &role_auth.password_hash {
                    Some(hash) => Datum::String(hash),
                    None => Datum::Null,
                },
                Datum::TimestampTz(
                    mz_ore::now::to_datetime(role_auth.updated_at)
                        .try_into()
                        .expect("must fit"),
                ),
            ]),
            diff,
        )
    }

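    /// Packs rows for `mz_roles`, plus one `mz_role_parameters` row per role
    /// default variable. The `PUBLIC` pseudo-role produces no rows.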
    pub(super) fn pack_role_update(
        &self,
        id: RoleId,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        match id {
            // The public role is a pseudo-role and does not appear in the
            // catalog tables.
            RoleId::Public => vec![],
            id => {
                let role = self.get_role(&id);
                let self_managed_auth_enabled = self
                    .system_config()
                    .get(ENABLE_PASSWORD_AUTH.name())
                    .ok()
                    .map(|v| v.value() == "on")
                    .unwrap_or(false);

                let builtin_supers = [MZ_SYSTEM_ROLE_ID, MZ_SUPPORT_ROLE_ID];

                // Cloud login names are email addresses. When self-managed
                // authentication is disabled, roles whose names match this
                // pattern (and the builtin superusers) are treated as
                // login-capable.
                let cloud_login_regex = "^[^@]+@[^@]+\\.[^@]+$";
                let matches = regex::Regex::new(cloud_login_regex, true)
                    .expect("valid regex")
                    .is_match(&role.name);

                let rolcanlogin = if self_managed_auth_enabled {
                    role.attributes.login.unwrap_or(false)
                } else {
                    builtin_supers.contains(&role.id) || matches
                };

                let rolsuper = if self_managed_auth_enabled {
                    Datum::from(role.attributes.superuser.unwrap_or(false))
                } else if builtin_supers.contains(&role.id) {
                    Datum::from(true)
                } else {
                    Datum::Null
                };

                let role_update = BuiltinTableUpdate::row(
                    &*MZ_ROLES,
                    Row::pack_slice(&[
                        Datum::String(&role.id.to_string()),
                        Datum::UInt32(role.oid),
                        Datum::String(&role.name),
                        Datum::from(role.attributes.inherit),
                        Datum::from(rolcanlogin),
                        rolsuper,
                    ]),
                    diff,
                );
                let mut updates = vec![role_update];

                // A reference set of session variables, used to validate and
                // format the role's default variable values.
                let session_vars_reference = SessionVars::new_unchecked(
                    &mz_build_info::DUMMY_BUILD_INFO,
                    SYSTEM_USER.clone(),
                    None,
                );

                for (name, val) in role.vars() {
                    let result = session_vars_reference
                        .inspect(name)
                        .and_then(|var| var.check(val.borrow()));
                    let Ok(formatted_val) = result else {
                        // Variables are validated before being persisted, so an
                        // invalid default here indicates a bug.
                        tracing::error!(?name, ?val, ?result, "found invalid role default var");
                        continue;
                    };

                    let role_var_update = BuiltinTableUpdate::row(
                        &*MZ_ROLE_PARAMETERS,
                        Row::pack_slice(&[
                            Datum::String(&role.id.to_string()),
                            Datum::String(name),
                            Datum::String(&formatted_val),
                        ]),
                        diff,
                    );
                    updates.push(role_var_update);
                }

                updates
            }
        }
    }

    pub(super) fn pack_role_members_update(
        &self,
        role_id: RoleId,
        member_id: RoleId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let grantor_id = self
            .get_role(&member_id)
            .membership
            .map
            .get(&role_id)
            .expect("catalog out of sync");
        BuiltinTableUpdate::row(
            &*MZ_ROLE_MEMBERS,
            Row::pack_slice(&[
                Datum::String(&role_id.to_string()),
                Datum::String(&member_id.to_string()),
                Datum::String(&grantor_id.to_string()),
            ]),
            diff,
        )
    }

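    /// Packs rows for `mz_clusters`, `mz_cluster_workload_classes`, and, for
    /// managed clusters, `mz_cluster_schedules`.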
    pub(super) fn pack_cluster_update(
        &self,
        name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let id = self.clusters_by_name[name];
        let cluster = &self.clusters_by_id[&id];
        let row = self.pack_privilege_array_row(cluster.privileges());
        let privileges = row.unpack_first();
        let (size, disk, replication_factor, azs, introspection_debugging, introspection_interval) =
            match &cluster.config.variant {
                ClusterVariant::Managed(config) => (
                    Some(config.size.as_str()),
                    Some(self.cluster_replica_size_has_disk(&config.size)),
                    Some(config.replication_factor),
                    if config.availability_zones.is_empty() {
                        None
                    } else {
                        Some(config.availability_zones.clone())
                    },
                    Some(config.logging.log_logging),
                    config.logging.interval.map(|d| {
                        Interval::from_duration(&d)
                            .expect("planning ensured this is convertible back to interval")
                    }),
                ),
                ClusterVariant::Unmanaged => (None, None, None, None, None, None),
            };

        let mut row = Row::default();
        let mut packer = row.packer();
        packer.extend([
            Datum::String(&id.to_string()),
            Datum::String(name),
            Datum::String(&cluster.owner_id.to_string()),
            privileges,
            cluster.is_managed().into(),
            size.into(),
            replication_factor.into(),
            disk.into(),
        ]);
        if let Some(azs) = azs {
            packer.push_list(azs.iter().map(|az| Datum::String(az)));
        } else {
            packer.push(Datum::Null);
        }
        packer.push(Datum::from(introspection_debugging));
        packer.push(Datum::from(introspection_interval));

        let mut updates = Vec::new();

        updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTERS, row, diff));

        if let ClusterVariant::Managed(managed_config) = &cluster.config.variant {
            let row = match managed_config.schedule {
                ClusterSchedule::Manual => Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String("manual"),
                    Datum::Null,
                ]),
                ClusterSchedule::Refresh {
                    hydration_time_estimate,
                } => Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String("on-refresh"),
                    Datum::Interval(
                        Interval::from_duration(&hydration_time_estimate)
                            .expect("planning ensured that this is convertible back to Interval"),
                    ),
                ]),
            };
            updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTER_SCHEDULES, row, diff));
        }

        updates.push(BuiltinTableUpdate::row(
            &*MZ_CLUSTER_WORKLOAD_CLASSES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::from(cluster.config.workload_class.as_deref()),
            ]),
            diff,
        ));

        updates
    }

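    /// Packs rows for `mz_cluster_replicas`, plus rows for
    /// `mz_internal_cluster_replicas` and `mz_pending_cluster_replicas` when
    /// the replica is internal or pending.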
    pub(super) fn pack_cluster_replica_update(
        &self,
        cluster_id: ClusterId,
        name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let cluster = &self.clusters_by_id[&cluster_id];
        let id = cluster.replica_id(name).expect("Must exist");
        let replica = cluster.replica(id).expect("Must exist");

        let (size, disk, az, internal, pending) = match &replica.config.location {
            // Managed replicas pinned to a specific availability zone.
            ReplicaLocation::Managed(ManagedReplicaLocation {
                size,
                availability_zones: ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
                allocation: _,
                billed_as: _,
                internal,
                pending,
            }) => (
                Some(&**size),
                Some(self.cluster_replica_size_has_disk(size)),
                Some(az.as_str()),
                *internal,
                *pending,
            ),
            // Managed replicas without a pinned availability zone.
            ReplicaLocation::Managed(ManagedReplicaLocation {
                size,
                availability_zones: _,
                allocation: _,
                billed_as: _,
                internal,
                pending,
            }) => (
                Some(&**size),
                Some(self.cluster_replica_size_has_disk(size)),
                None,
                *internal,
                *pending,
            ),
            _ => (None, None, None, false, false),
        };

        let cluster_replica_update = BuiltinTableUpdate::row(
            &*MZ_CLUSTER_REPLICAS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(name),
                Datum::String(&cluster_id.to_string()),
                Datum::from(size),
                Datum::from(az),
                Datum::String(&replica.owner_id.to_string()),
                Datum::from(disk),
            ]),
            diff,
        );

        let mut updates = vec![cluster_replica_update];

        if internal {
            let update = BuiltinTableUpdate::row(
                &*MZ_INTERNAL_CLUSTER_REPLICAS,
                Row::pack_slice(&[Datum::String(&id.to_string())]),
                diff,
            );
            updates.push(update);
        }

        if pending {
            let update = BuiltinTableUpdate::row(
                &*MZ_PENDING_CLUSTER_REPLICAS,
                Row::pack_slice(&[Datum::String(&id.to_string())]),
                diff,
            );
            updates.push(update);
        }

        updates
    }

    pub(crate) fn pack_network_policy_update(
        &self,
        policy_id: &NetworkPolicyId,
        diff: Diff,
    ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, Error> {
        let policy = self.get_network_policy(policy_id);
        let row = self.pack_privilege_array_row(&policy.privileges);
        let privileges = row.unpack_first();
        let mut updates = Vec::new();
        for rule in &policy.rules {
            updates.push(BuiltinTableUpdate::row(
                &*MZ_NETWORK_POLICY_RULES,
                Row::pack_slice(&[
                    Datum::String(&rule.name),
                    Datum::String(&policy.id.to_string()),
                    Datum::String(&rule.action.to_string()),
                    Datum::String(&rule.address.to_string()),
                    Datum::String(&rule.direction.to_string()),
                ]),
                diff,
            ));
        }
        updates.push(BuiltinTableUpdate::row(
            &*MZ_NETWORK_POLICIES,
            Row::pack_slice(&[
                Datum::String(&policy.id.to_string()),
                Datum::String(&policy.name),
                Datum::String(&policy.owner_id.to_string()),
                privileges,
                Datum::UInt32(policy.oid),
            ]),
            diff,
        ));

        Ok(updates)
    }

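    /// Packs every builtin table update needed to describe a catalog item: a
    /// row in the item's primary table (`mz_tables`, `mz_sources`, `mz_views`,
    /// ...), plus dependency, column, history-retention, and global ID rows.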
    pub(super) fn pack_item_update(
        &self,
        id: CatalogItemId,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let entry = self.get_entry(&id);
        let oid = entry.oid();
        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
        let schema_id = &self
            .get_schema(
                &entry.name().qualifiers.database_spec,
                &entry.name().qualifiers.schema_spec,
                conn_id,
            )
            .id;
        let name = &entry.name().item;
        let owner_id = entry.owner_id();
        let privileges_row = self.pack_privilege_array_row(entry.privileges());
        let privileges = privileges_row.unpack_first();
        let mut updates = match entry.item() {
            CatalogItem::Log(_) => self.pack_source_update(
                id, oid, schema_id, name, "log", None, None, None, None, None, owner_id,
                privileges, diff, None,
            ),
            CatalogItem::Index(index) => {
                self.pack_index_update(id, oid, name, owner_id, index, diff)
            }
            CatalogItem::Table(table) => {
                let mut updates = self
                    .pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);

                if let TableDataSource::DataSource {
                    desc: data_source,
                    timeline: _,
                } = &table.data_source
                {
                    updates.extend(match data_source {
                        DataSourceDesc::IngestionExport {
                            ingestion_id,
                            external_reference: UnresolvedItemName(external_reference),
                            details: _,
                            data_config: _,
                        } => {
                            let ingestion_entry = self
                                .get_entry(ingestion_id)
                                .source_desc()
                                .expect("primary source exists")
                                .expect("primary source is a source");

                            match ingestion_entry.connection.name() {
                                "postgres" => {
                                    // The external reference is `[database, schema, table]`.
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                    let schema_name = external_reference[1].to_ast_string_simple();
                                    let table_name = external_reference[2].to_ast_string_simple();

                                    self.pack_postgres_source_tables_update(
                                        id,
                                        &schema_name,
                                        &table_name,
                                        diff,
                                    )
                                }
                                "mysql" => {
                                    // The external reference is `[schema, table]`.
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
                                    let schema_name = external_reference[0].to_ast_string_simple();
                                    let table_name = external_reference[1].to_ast_string_simple();

                                    self.pack_mysql_source_tables_update(
                                        id,
                                        &schema_name,
                                        &table_name,
                                        diff,
                                    )
                                }
                                "sql-server" => {
                                    // The external reference is `[database, schema, table]`.
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                    let schema_name = external_reference[1].to_ast_string_simple();
                                    let table_name = external_reference[2].to_ast_string_simple();

                                    self.pack_sql_server_source_table_update(
                                        id,
                                        &schema_name,
                                        &table_name,
                                        diff,
                                    )
                                }
                                // Load generator sources have no source-specific
                                // tables to record.
                                "load-generator" => vec![],
                                "kafka" => {
                                    // The external reference is `[topic]`.
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 1);
                                    let topic = external_reference[0].to_ast_string_simple();
                                    let envelope = data_source.envelope();
                                    let (key_format, value_format) = data_source.formats();

                                    self.pack_kafka_source_tables_update(
                                        id,
                                        &topic,
                                        envelope,
                                        key_format,
                                        value_format,
                                        diff,
                                    )
                                }
                                s => unreachable!("{s} sources do not have tables"),
                            }
                        }
                        _ => vec![],
                    });
                }

                updates
            }
            CatalogItem::Source(source) => {
                let source_type = source.source_type();
                let connection_id = source.connection_id();
                let envelope = source.data_source.envelope();
                let cluster_entry = match source.data_source {
                    // Ingestion exports don't have their own cluster; use the
                    // cluster of the primary ingestion.
                    DataSourceDesc::IngestionExport { ingestion_id, .. } => {
                        self.get_entry(&ingestion_id)
                    }
                    _ => entry,
                };

                let cluster_id = cluster_entry.item().cluster_id().map(|id| id.to_string());

                let (key_format, value_format) = source.data_source.formats();

                let mut updates = self.pack_source_update(
                    id,
                    oid,
                    schema_id,
                    name,
                    source_type,
                    connection_id,
                    envelope,
                    key_format,
                    value_format,
                    cluster_id.as_deref(),
                    owner_id,
                    privileges,
                    diff,
                    source.create_sql.as_ref(),
                );

                updates.extend(match &source.data_source {
                    DataSourceDesc::Ingestion { desc, .. }
                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => match &desc.connection {
                        GenericSourceConnection::Postgres(postgres) => {
                            self.pack_postgres_source_update(id, postgres, diff)
                        }
                        GenericSourceConnection::Kafka(kafka) => {
                            self.pack_kafka_source_update(id, source.global_id(), kafka, diff)
                        }
                        _ => vec![],
                    },
                    DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference: UnresolvedItemName(external_reference),
                        details: _,
                        data_config: _,
                    } => {
                        let ingestion_entry = self
                            .get_entry(ingestion_id)
                            .source_desc()
                            .expect("primary source exists")
                            .expect("primary source is a source");

                        match ingestion_entry.connection.name() {
                            "postgres" => {
                                // The external reference is `[database, schema, table]`.
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                let schema_name = external_reference[1].to_ast_string_simple();
                                let table_name = external_reference[2].to_ast_string_simple();

                                self.pack_postgres_source_tables_update(
                                    id,
                                    &schema_name,
                                    &table_name,
                                    diff,
                                )
                            }
                            "mysql" => {
                                // The external reference is `[schema, table]`.
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
                                let schema_name = external_reference[0].to_ast_string_simple();
                                let table_name = external_reference[1].to_ast_string_simple();

                                self.pack_mysql_source_tables_update(
                                    id,
                                    &schema_name,
                                    &table_name,
                                    diff,
                                )
                            }
                            "sql-server" => {
                                // The external reference is `[database, schema, table]`.
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                let schema_name = external_reference[1].to_ast_string_simple();
                                let table_name = external_reference[2].to_ast_string_simple();

                                self.pack_sql_server_source_table_update(
                                    id,
                                    &schema_name,
                                    &table_name,
                                    diff,
                                )
                            }
                            // Load generator sources have no source-specific
                            // tables to record.
                            "load-generator" => vec![],
                            s => unreachable!("{s} sources do not have subsources"),
                        }
                    }
                    DataSourceDesc::Webhook { .. } => {
                        vec![self.pack_webhook_source_update(id, diff)]
                    }
                    _ => vec![],
                });

                updates
            }
            CatalogItem::View(view) => {
                self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
            }
            CatalogItem::MaterializedView(mview) => self.pack_materialized_view_update(
                id, oid, schema_id, name, owner_id, privileges, mview, diff,
            ),
            CatalogItem::Sink(sink) => {
                self.pack_sink_update(id, oid, schema_id, name, owner_id, sink, diff)
            }
            CatalogItem::Type(ty) => {
                self.pack_type_update(id, oid, schema_id, name, owner_id, privileges, ty, diff)
            }
            CatalogItem::Func(func) => {
                self.pack_func_update(id, schema_id, name, owner_id, func, diff)
            }
            CatalogItem::Secret(_) => {
                self.pack_secret_update(id, oid, schema_id, name, owner_id, privileges, diff)
            }
            CatalogItem::Connection(connection) => self.pack_connection_update(
                id, oid, schema_id, name, owner_id, privileges, connection, diff,
            ),
            CatalogItem::ContinualTask(ct) => self.pack_continual_task_update(
                id, oid, schema_id, name, owner_id, privileges, ct, diff,
            ),
        };

        if !entry.item().is_temporary() {
            // Populate or retract rows in `mz_object_dependencies`.
            for dependee in entry.item().references().items() {
                updates.push(self.pack_depends_update(id, *dependee, diff))
            }
        }

        if let Some(desc) = entry.relation_desc_latest() {
            let defaults = match entry.item() {
                CatalogItem::Table(Table {
                    data_source: TableDataSource::TableWrites { defaults },
                    ..
                }) => Some(defaults),
                _ => None,
            };
            for (i, (column_name, column_type)) in desc.iter().enumerate() {
                let default: Option<String> = defaults.map(|d| d[i].to_ast_string_stable());
                let default: Datum = default
                    .as_ref()
                    .map(|d| Datum::String(d))
                    .unwrap_or(Datum::Null);
                let pgtype = mz_pgrepr::Type::from(&column_type.scalar_type);
                let (type_name, type_oid) = match &column_type.scalar_type {
                    SqlScalarType::List {
                        custom_id: Some(custom_id),
                        ..
                    }
                    | SqlScalarType::Map {
                        custom_id: Some(custom_id),
                        ..
                    }
                    | SqlScalarType::Record {
                        custom_id: Some(custom_id),
                        ..
                    } => {
                        // Custom types are named by their catalog entry.
                        let entry = self.get_entry(custom_id);
                        let name = &*entry.name().item;
                        let oid = entry.oid();
                        (name, oid)
                    }
                    _ => (pgtype.name(), pgtype.oid()),
                };
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_COLUMNS,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String(column_name),
                        Datum::UInt64(u64::cast_from(i + 1)),
                        Datum::from(column_type.nullable),
                        Datum::String(type_name),
                        default,
                        Datum::UInt32(type_oid),
                        Datum::Int32(pgtype.typmod()),
                    ]),
                    diff,
                ));
            }
        }

        if let Some(cw) = entry.item().initial_logical_compaction_window() {
            updates.push(self.pack_history_retention_strategy_update(id, cw, diff));
        }

        updates.extend(Self::pack_item_global_id_update(entry, diff));

        updates
    }

    fn pack_item_global_id_update(
        entry: &CatalogEntry,
        diff: Diff,
    ) -> impl Iterator<Item = BuiltinTableUpdate<&'static BuiltinTable>> + use<'_> {
        let id = entry.id().to_string();
        let global_ids = entry.global_ids();
        global_ids.map(move |global_id| {
            BuiltinTableUpdate::row(
                &*MZ_OBJECT_GLOBAL_IDS,
                Row::pack_slice(&[Datum::String(&id), Datum::String(&global_id.to_string())]),
                diff,
            )
        })
    }

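    /// Packs a row for `mz_history_retention_strategies`, encoding the
    /// compaction window as a JSON number.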
    fn pack_history_retention_strategy_update(
        &self,
        id: CatalogItemId,
        cw: CompactionWindow,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let cw: u64 = cw.comparable_timestamp().into();
        let cw = Jsonb::from_serde_json(serde_json::Value::Number(serde_json::Number::from(cw)))
            .expect("must serialize");
        BuiltinTableUpdate::row(
            &*MZ_HISTORY_RETENTION_STRATEGIES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String("FOR"),
                cw.into_row().into_element(),
            ]),
            diff,
        )
    }

    fn pack_table_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        diff: Diff,
        table: &Table,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let redacted = table.create_sql.as_ref().map(|create_sql| {
            mz_sql::parse::parse(create_sql)
                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
                .into_element()
                .ast
                .to_ast_string_redacted()
        });
        let source_id = if let TableDataSource::DataSource {
            desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
            ..
        } = &table.data_source
        {
            Some(ingestion_id.to_string())
        } else {
            None
        };

        vec![BuiltinTableUpdate::row(
            &*MZ_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&owner_id.to_string()),
                privileges,
                if let Some(create_sql) = &table.create_sql {
                    Datum::String(create_sql)
                } else {
                    Datum::Null
                },
                if let Some(redacted) = &redacted {
                    Datum::String(redacted)
                } else {
                    Datum::Null
                },
                if let Some(source_id) = source_id.as_ref() {
                    Datum::String(source_id)
                } else {
                    Datum::Null
                },
            ]),
            diff,
        )]
    }

    fn pack_source_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        source_desc_name: &str,
        connection_id: Option<CatalogItemId>,
        envelope: Option<&str>,
        key_format: Option<&str>,
        value_format: Option<&str>,
        cluster_id: Option<&str>,
        owner_id: &RoleId,
        privileges: Datum,
        diff: Diff,
        create_sql: Option<&String>,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let redacted = create_sql.map(|create_sql| {
            let create_stmt = mz_sql::parse::parse(create_sql)
                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
                .into_element()
                .ast;
            create_stmt.to_ast_string_redacted()
        });
        vec![BuiltinTableUpdate::row(
            &*MZ_SOURCES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(source_desc_name),
                Datum::from(connection_id.map(|id| id.to_string()).as_deref()),
                Datum::Null,
                Datum::from(envelope),
                Datum::from(key_format),
                Datum::from(value_format),
                Datum::from(cluster_id),
                Datum::String(&owner_id.to_string()),
                privileges,
                if let Some(create_sql) = create_sql {
                    Datum::String(create_sql)
                } else {
                    Datum::Null
                },
                if let Some(redacted) = &redacted {
                    Datum::String(redacted)
                } else {
                    Datum::Null
                },
            ]),
            diff,
        )]
    }

    fn pack_postgres_source_update(
        &self,
        id: CatalogItemId,
        postgres: &PostgresSourceConnection<ReferencedConnection>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_POSTGRES_SOURCES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(&postgres.publication_details.slot),
                Datum::from(postgres.publication_details.timeline_id),
            ]),
            diff,
        )]
    }

    fn pack_kafka_source_update(
        &self,
        item_id: CatalogItemId,
        collection_id: GlobalId,
        kafka: &KafkaSourceConnection<ReferencedConnection>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_KAFKA_SOURCES,
            Row::pack_slice(&[
                Datum::String(&item_id.to_string()),
                Datum::String(&kafka.group_id(&self.config.connection_context, collection_id)),
                Datum::String(&kafka.topic),
            ]),
            diff,
        )]
    }

    fn pack_postgres_source_tables_update(
        &self,
        id: CatalogItemId,
        schema_name: &str,
        table_name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_POSTGRES_SOURCE_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(schema_name),
                Datum::String(table_name),
            ]),
            diff,
        )]
    }

    fn pack_mysql_source_tables_update(
        &self,
        id: CatalogItemId,
        schema_name: &str,
        table_name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_MYSQL_SOURCE_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(schema_name),
                Datum::String(table_name),
            ]),
            diff,
        )]
    }

    fn pack_sql_server_source_table_update(
        &self,
        id: CatalogItemId,
        schema_name: &str,
        table_name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_SQL_SERVER_SOURCE_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(schema_name),
                Datum::String(table_name),
            ]),
            diff,
        )]
    }

    fn pack_kafka_source_tables_update(
        &self,
        id: CatalogItemId,
        topic: &str,
        envelope: Option<&str>,
        key_format: Option<&str>,
        value_format: Option<&str>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_KAFKA_SOURCE_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(topic),
                Datum::from(envelope),
                Datum::from(key_format),
                Datum::from(value_format),
            ]),
            diff,
        )]
    }

    fn pack_connection_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        connection: &Connection,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let create_stmt = mz_sql::parse::parse(&connection.create_sql)
            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", connection.create_sql))
            .into_element()
            .ast;
        let mut updates = vec![BuiltinTableUpdate::row(
            &*MZ_CONNECTIONS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(match connection.details {
                    ConnectionDetails::Kafka { .. } => "kafka",
                    ConnectionDetails::Csr { .. } => "confluent-schema-registry",
                    ConnectionDetails::Postgres { .. } => "postgres",
                    ConnectionDetails::Aws(..) => "aws",
                    ConnectionDetails::AwsPrivatelink(..) => "aws-privatelink",
                    ConnectionDetails::Ssh { .. } => "ssh-tunnel",
                    ConnectionDetails::MySql { .. } => "mysql",
                    ConnectionDetails::SqlServer(_) => "sql-server",
                    ConnectionDetails::IcebergCatalog(_) => "iceberg-catalog",
                }),
                Datum::String(&owner_id.to_string()),
                privileges,
                Datum::String(&connection.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        )];
        match connection.details {
            ConnectionDetails::Kafka(ref kafka) => {
                updates.extend(self.pack_kafka_connection_update(id, kafka, diff));
            }
            ConnectionDetails::Aws(ref aws_config) => {
                match self.pack_aws_connection_update(id, aws_config, diff) {
                    Ok(update) => {
                        updates.push(update);
                    }
                    Err(e) => {
                        tracing::error!(%id, %e, "failed writing row to mz_aws_connections table");
                    }
                }
            }
            ConnectionDetails::AwsPrivatelink(_) => {
                if let Some(aws_principal_context) = self.aws_principal_context.as_ref() {
                    updates.push(self.pack_aws_privatelink_connection_update(
                        id,
                        aws_principal_context,
                        diff,
                    ));
                } else {
                    tracing::error!(%id, "missing AWS principal context; cannot write row to mz_aws_privatelink_connections table");
                }
            }
            ConnectionDetails::Ssh {
                ref key_1,
                ref key_2,
                ..
            } => {
                updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff));
            }
            ConnectionDetails::Csr(_)
            | ConnectionDetails::Postgres(_)
            | ConnectionDetails::MySql(_)
            | ConnectionDetails::SqlServer(_)
            | ConnectionDetails::IcebergCatalog(_) => (),
        };
        updates
    }

    pub(crate) fn pack_ssh_tunnel_connection_update(
        &self,
        id: CatalogItemId,
        key_1: &SshKey,
        key_2: &SshKey,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        BuiltinTableUpdate::row(
            &*MZ_SSH_TUNNEL_CONNECTIONS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(key_1.public_key().as_str()),
                Datum::String(key_2.public_key().as_str()),
            ]),
            diff,
        )
    }

    fn pack_kafka_connection_update(
        &self,
        id: CatalogItemId,
        kafka: &KafkaConnection<ReferencedConnection>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let progress_topic = kafka.progress_topic(&self.config.connection_context, id);
        let mut row = Row::default();
        row.packer()
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: kafka.brokers.len(),
                }],
                kafka
                    .brokers
                    .iter()
                    .map(|broker| Datum::String(&broker.address)),
            )
            .expect("kafka.brokers is 1 dimensional, and its length is used for the array length");
        let brokers = row.unpack_first();
        vec![BuiltinTableUpdate::row(
            &*MZ_KAFKA_CONNECTIONS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                brokers,
                Datum::String(&progress_topic),
            ]),
            diff,
        )]
    }

    pub fn pack_aws_privatelink_connection_update(
        &self,
        connection_id: CatalogItemId,
        aws_principal_context: &AwsPrincipalContext,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let id = &MZ_AWS_PRIVATELINK_CONNECTIONS;
        let row = Row::pack_slice(&[
            Datum::String(&connection_id.to_string()),
            Datum::String(&aws_principal_context.to_principal_string(connection_id)),
        ]);
        BuiltinTableUpdate::row(id, row, diff)
    }

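    /// Packs a row for `mz_aws_connections`, splitting the authentication
    /// configuration into plain-text and secret-backed columns. Returns an
    /// error if the external ID or example trust policy cannot be rendered.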
    pub fn pack_aws_connection_update(
        &self,
        connection_id: CatalogItemId,
        aws_config: &AwsConnection,
        diff: Diff,
    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, anyhow::Error> {
        let id = &MZ_AWS_CONNECTIONS;

        let mut access_key_id = None;
        let mut access_key_id_secret_id = None;
        let mut secret_access_key_secret_id = None;
        let mut session_token = None;
        let mut session_token_secret_id = None;
        let mut assume_role_arn = None;
        let mut assume_role_session_name = None;
        let mut principal = None;
        let mut external_id = None;
        let mut example_trust_policy = None;
        match &aws_config.auth {
            AwsAuth::Credentials(credentials) => {
                match &credentials.access_key_id {
                    StringOrSecret::String(s) => access_key_id = Some(s.as_str()),
                    StringOrSecret::Secret(s) => access_key_id_secret_id = Some(s.to_string()),
                }
                secret_access_key_secret_id = Some(credentials.secret_access_key.to_string());
                match credentials.session_token.as_ref() {
                    None => (),
                    Some(StringOrSecret::String(s)) => session_token = Some(s.as_str()),
                    Some(StringOrSecret::Secret(s)) => {
                        session_token_secret_id = Some(s.to_string())
                    }
                }
            }
            AwsAuth::AssumeRole(assume_role) => {
                assume_role_arn = Some(assume_role.arn.as_str());
                assume_role_session_name = assume_role.session_name.as_deref();
                principal = self
                    .config
                    .connection_context
                    .aws_connection_role_arn
                    .as_deref();
                external_id =
                    Some(assume_role.external_id(&self.config.connection_context, connection_id)?);
                example_trust_policy = {
                    let policy = assume_role
                        .example_trust_policy(&self.config.connection_context, connection_id)?;
                    let policy = Jsonb::from_serde_json(policy).expect("valid json");
                    Some(policy.into_row())
                };
            }
        }

        let row = Row::pack_slice(&[
            Datum::String(&connection_id.to_string()),
            Datum::from(aws_config.endpoint.as_deref()),
            Datum::from(aws_config.region.as_deref()),
            Datum::from(access_key_id),
            Datum::from(access_key_id_secret_id.as_deref()),
            Datum::from(secret_access_key_secret_id.as_deref()),
            Datum::from(session_token),
            Datum::from(session_token_secret_id.as_deref()),
            Datum::from(assume_role_arn),
            Datum::from(assume_role_session_name),
            Datum::from(principal),
            Datum::from(external_id.as_deref()),
            Datum::from(example_trust_policy.as_ref().map(|p| p.into_element())),
        ]);

        Ok(BuiltinTableUpdate::row(id, row, diff))
    }

    fn pack_view_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        view: &View,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let create_stmt = mz_sql::parse::parse(&view.create_sql)
            .unwrap_or_else(|e| {
                panic!(
                    "create_sql cannot be invalid: `{}` --- error: `{}`",
                    view.create_sql, e
                )
            })
            .into_element()
            .ast;
        let query = match &create_stmt {
            Statement::CreateView(stmt) => &stmt.definition.query,
            _ => unreachable!(),
        };

        let mut query_string = query.to_ast_string_stable();
        // Match the behavior of PostgreSQL, where view definitions are
        // terminated with a semicolon.
        query_string.push(';');

        vec![BuiltinTableUpdate::row(
            &*MZ_VIEWS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&query_string),
                Datum::String(&owner_id.to_string()),
                privileges,
                Datum::String(&view.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        )]
    }

    fn pack_materialized_view_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        mview: &MaterializedView,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let create_stmt = mz_sql::parse::parse(&mview.create_sql)
            .unwrap_or_else(|e| {
                panic!(
                    "create_sql cannot be invalid: `{}` --- error: `{}`",
                    mview.create_sql, e
                )
            })
            .into_element()
            .ast;
        let query_string = match &create_stmt {
            Statement::CreateMaterializedView(stmt) => {
                let mut query_string = stmt.query.to_ast_string_stable();
                // Match the behavior of PostgreSQL, where view definitions are
                // terminated with a semicolon.
                query_string.push(';');
                query_string
            }
            _ => unreachable!(),
        };

        let mut updates = Vec::new();

        updates.push(BuiltinTableUpdate::row(
            &*MZ_MATERIALIZED_VIEWS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&mview.cluster_id.to_string()),
                Datum::String(&query_string),
                Datum::String(&owner_id.to_string()),
                privileges,
                Datum::String(&mview.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        ));

        if let Some(refresh_schedule) = &mview.refresh_schedule {
            // A populated refresh schedule has at least one `EVERY` or `AT`.
            assert!(!refresh_schedule.is_empty());
            for RefreshEvery {
                interval,
                aligned_to,
            } in refresh_schedule.everies.iter()
            {
                let aligned_to_dt = mz_ore::now::to_datetime(
                    <&Timestamp as TryInto<u64>>::try_into(aligned_to).expect("undoes planning"),
                );
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String("every"),
                        Datum::Interval(
                            Interval::from_duration(interval).expect(
                                "planning ensured that this is convertible back to Interval",
                            ),
                        ),
                        Datum::TimestampTz(aligned_to_dt.try_into().expect("undoes planning")),
                        Datum::Null,
                    ]),
                    diff,
                ));
            }
            for at in refresh_schedule.ats.iter() {
                let at_dt = mz_ore::now::to_datetime(
                    <&Timestamp as TryInto<u64>>::try_into(at).expect("undoes planning"),
                );
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String("at"),
                        Datum::Null,
                        Datum::Null,
                        Datum::TimestampTz(at_dt.try_into().expect("undoes planning")),
                    ]),
                    diff,
                ));
            }
        } else {
            updates.push(BuiltinTableUpdate::row(
                &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String("on-commit"),
                    Datum::Null,
                    Datum::Null,
                    Datum::Null,
                ]),
                diff,
            ));
        }

        if let Some(target_id) = mview.replacement_target {
            updates.push(BuiltinTableUpdate::row(
                &*MZ_REPLACEMENTS,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String(&target_id.to_string()),
                ]),
                diff,
            ));
        }

        updates
    }

    fn pack_continual_task_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        ct: &ContinualTask,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let create_stmt = mz_sql::parse::parse(&ct.create_sql)
            .unwrap_or_else(|e| {
                panic!(
                    "create_sql cannot be invalid: `{}` --- error: `{}`",
                    ct.create_sql, e
                )
            })
            .into_element()
            .ast;
        let query_string = match &create_stmt {
            Statement::CreateContinualTask(stmt) => {
                let mut query_string = String::new();
                for stmt in &stmt.stmts {
                    let s = match stmt {
                        ContinualTaskStmt::Insert(stmt) => stmt.to_ast_string_stable(),
                        ContinualTaskStmt::Delete(stmt) => stmt.to_ast_string_stable(),
                    };
                    if query_string.is_empty() {
                        query_string = s;
                    } else {
                        query_string.push_str("; ");
                        query_string.push_str(&s);
                    }
                }
                query_string
            }
            _ => unreachable!(),
        };

        vec![BuiltinTableUpdate::row(
            &*MZ_CONTINUAL_TASKS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&ct.cluster_id.to_string()),
                Datum::String(&query_string),
                Datum::String(&owner_id.to_string()),
                privileges,
                Datum::String(&ct.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        )]
    }

    fn pack_sink_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        sink: &Sink,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = vec![];
        match &sink.connection {
            StorageSinkConnection::Kafka(KafkaSinkConnection {
                topic: topic_name, ..
            }) => {
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_KAFKA_SINKS,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String(topic_name.as_str()),
                    ]),
                    diff,
                ));
            }
            StorageSinkConnection::Iceberg(IcebergSinkConnection {
                namespace, table, ..
            }) => {
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_ICEBERG_SINKS,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String(namespace.as_str()),
                        Datum::String(table.as_str()),
                    ]),
                    diff,
                ));
            }
        };

        let create_stmt = mz_sql::parse::parse(&sink.create_sql)
            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", sink.create_sql))
            .into_element()
            .ast;

        let envelope = sink.envelope();

        let combined_format = sink.combined_format();
        let (key_format, value_format) = match sink.formats() {
            Some((key_format, value_format)) => (key_format, Some(value_format)),
            None => (None, None),
        };

        updates.push(BuiltinTableUpdate::row(
            &*MZ_SINKS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(sink.connection.name()),
                Datum::from(sink.connection_id().map(|id| id.to_string()).as_deref()),
                Datum::Null,
                Datum::from(envelope),
                Datum::from(combined_format.as_ref().map(|f| f.as_ref())),
                Datum::from(key_format),
                Datum::from(value_format),
                Datum::String(&sink.cluster_id.to_string()),
                Datum::String(&owner_id.to_string()),
                Datum::String(&sink.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        ));

        updates
    }

    fn pack_index_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        name: &str,
        owner_id: &RoleId,
        index: &Index,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = vec![];

        let create_stmt = mz_sql::parse::parse(&index.create_sql)
            .unwrap_or_else(|e| {
                panic!(
                    "create_sql cannot be invalid: `{}` --- error: `{}`",
                    index.create_sql, e
                )
            })
            .into_element()
            .ast;

        let key_sqls = match &create_stmt {
            Statement::CreateIndex(CreateIndexStatement { key_parts, .. }) => key_parts
                .as_ref()
                .expect("key_parts is filled in during planning"),
            _ => unreachable!(),
        };
        let on_item_id = self.get_entry_by_global_id(&index.on).id();

        updates.push(BuiltinTableUpdate::row(
            &*MZ_INDEXES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(name),
                Datum::String(&on_item_id.to_string()),
                Datum::String(&index.cluster_id.to_string()),
                Datum::String(&owner_id.to_string()),
                Datum::String(&index.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        ));

        for (i, key) in index.keys.iter().enumerate() {
            let on_entry = self.get_entry_by_global_id(&index.on);
            let on_desc = on_entry
                .relation_desc()
                .expect("can only create indexes on items with a valid description");
            let nullable = key.typ(&on_desc.typ().column_types).nullable;
            let seq_in_index = u64::cast_from(i + 1);
            let key_sql = key_sqls
                .get(i)
                .expect("missing sql information for index key")
                .to_ast_string_simple();
            let (field_number, expression) = match key {
                MirScalarExpr::Column(col, _) => {
                    (Datum::UInt64(u64::cast_from(*col + 1)), Datum::Null)
                }
                _ => (Datum::Null, Datum::String(&key_sql)),
            };
            updates.push(BuiltinTableUpdate::row(
                &*MZ_INDEX_COLUMNS,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::UInt64(seq_in_index),
                    field_number,
                    expression,
                    Datum::from(nullable),
                ]),
                diff,
            ));
        }

        updates
    }

    fn pack_type_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        typ: &Type,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut out = vec![];

        let redacted = typ.create_sql.as_ref().map(|create_sql| {
            mz_sql::parse::parse(create_sql)
                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
                .into_element()
                .ast
                .to_ast_string_redacted()
        });

        out.push(BuiltinTableUpdate::row(
            &*MZ_TYPES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&TypeCategory::from_catalog_type(&typ.details.typ).to_string()),
                Datum::String(&owner_id.to_string()),
                privileges,
                if let Some(create_sql) = &typ.create_sql {
                    Datum::String(create_sql)
                } else {
                    Datum::Null
                },
                if let Some(redacted) = &redacted {
                    Datum::String(redacted)
                } else {
                    Datum::Null
                },
            ]),
            diff,
        ));

        let mut row = Row::default();
        let mut packer = row.packer();

        fn append_modifier(packer: &mut RowPacker<'_>, mods: &[i64]) {
            if mods.is_empty() {
                packer.push(Datum::Null);
            } else {
                packer.push_list(mods.iter().map(|m| Datum::Int64(*m)));
            }
        }

        let index_id = match &typ.details.typ {
            CatalogType::Array {
                element_reference: element_id,
            } => {
                packer.push(Datum::String(&id.to_string()));
                packer.push(Datum::String(&element_id.to_string()));
                &MZ_ARRAY_TYPES
            }
            CatalogType::List {
                element_reference: element_id,
                element_modifiers,
            } => {
                packer.push(Datum::String(&id.to_string()));
                packer.push(Datum::String(&element_id.to_string()));
                append_modifier(&mut packer, element_modifiers);
                &MZ_LIST_TYPES
            }
            CatalogType::Map {
                key_reference: key_id,
                value_reference: value_id,
                key_modifiers,
                value_modifiers,
            } => {
                packer.push(Datum::String(&id.to_string()));
                packer.push(Datum::String(&key_id.to_string()));
                packer.push(Datum::String(&value_id.to_string()));
                append_modifier(&mut packer, key_modifiers);
                append_modifier(&mut packer, value_modifiers);
                &MZ_MAP_TYPES
            }
            CatalogType::Pseudo => {
                packer.push(Datum::String(&id.to_string()));
                &MZ_PSEUDO_TYPES
            }
            _ => {
                packer.push(Datum::String(&id.to_string()));
                &MZ_BASE_TYPES
            }
        };
        out.push(BuiltinTableUpdate::row(index_id, row, diff));

        if let Some(pg_metadata) = &typ.details.pg_metadata {
            out.push(BuiltinTableUpdate::row(
                &*MZ_TYPE_PG_METADATA,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::UInt32(pg_metadata.typinput_oid),
                    Datum::UInt32(pg_metadata.typreceive_oid),
                ]),
                diff,
            ));
        }

        out
    }

    fn pack_func_update(
        &self,
        id: CatalogItemId,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        func: &Func,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = vec![];
        for func_impl_details in func.inner.func_impls() {
            let arg_type_ids = func_impl_details
                .arg_typs
                .iter()
                .map(|typ| self.get_system_type(typ).id().to_string())
                .collect::<Vec<_>>();

            let mut row = Row::default();
            row.packer()
                .try_push_array(
                    &[ArrayDimension {
                        lower_bound: 1,
                        length: arg_type_ids.len(),
                    }],
                    arg_type_ids.iter().map(|id| Datum::String(id)),
                )
                .expect(
                    "arg_type_ids is 1 dimensional, and its length is used for the array length",
                );
            let arg_type_ids = row.unpack_first();

            updates.push(BuiltinTableUpdate::row(
                &*MZ_FUNCTIONS,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::UInt32(func_impl_details.oid),
                    Datum::String(&schema_id.to_string()),
                    Datum::String(name),
                    arg_type_ids,
                    Datum::from(
                        func_impl_details
                            .variadic_typ
                            .map(|typ| self.get_system_type(typ).id().to_string())
                            .as_deref(),
                    ),
                    Datum::from(
                        func_impl_details
                            .return_typ
                            .map(|typ| self.get_system_type(typ).id().to_string())
                            .as_deref(),
                    ),
                    func_impl_details.return_is_set.into(),
                    Datum::String(&owner_id.to_string()),
                ]),
                diff,
            ));

            if let mz_sql::func::Func::Aggregate(_) = func.inner {
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_AGGREGATES,
                    Row::pack_slice(&[
                        Datum::UInt32(func_impl_details.oid),
                        // All aggregates are "normal" (`aggkind = 'n'`) and
                        // take zero direct arguments.
                        Datum::String("n"),
                        Datum::Int16(0),
                    ]),
                    diff,
                ));
            }
        }
        updates
    }

    pub fn pack_op_update(
        &self,
        operator: &str,
        func_impl_details: FuncImplCatalogDetails,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let arg_type_ids = func_impl_details
            .arg_typs
            .iter()
            .map(|typ| self.get_system_type(typ).id().to_string())
            .collect::<Vec<_>>();

        let mut row = Row::default();
        row.packer()
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: arg_type_ids.len(),
                }],
                arg_type_ids.iter().map(|id| Datum::String(id)),
            )
            .expect("arg_type_ids is 1 dimensional, and its length is used for the array length");
        let arg_type_ids = row.unpack_first();

        BuiltinTableUpdate::row(
            &*MZ_OPERATORS,
            Row::pack_slice(&[
                Datum::UInt32(func_impl_details.oid),
                Datum::String(operator),
                arg_type_ids,
                Datum::from(
                    func_impl_details
                        .return_typ
                        .map(|typ| self.get_system_type(typ).id().to_string())
                        .as_deref(),
                ),
            ]),
            diff,
        )
    }

    fn pack_secret_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_SECRETS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&owner_id.to_string()),
                privileges,
            ]),
            diff,
        )]
    }

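    /// Packs a row for `mz_audit_events` from a versioned audit log event,
    /// rendering the event details as JSON.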
    pub fn pack_audit_log_update(
        &self,
        event: &VersionedEvent,
        diff: Diff,
    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
        let (event_type, object_type, details, user, occurred_at): (
            &EventType,
            &ObjectType,
            &EventDetails,
            &Option<String>,
            u64,
        ) = match event {
            VersionedEvent::V1(ev) => (
                &ev.event_type,
                &ev.object_type,
                &ev.details,
                &ev.user,
                ev.occurred_at,
            ),
        };
        let details = Jsonb::from_serde_json(details.as_json())
            .map_err(|e| {
                Error::new(ErrorKind::Unstructured(format!(
                    "could not pack audit log update: {}",
                    e
                )))
            })?
            .into_row();
        let details = details
            .iter()
            .next()
            .expect("details created above with a single jsonb column");
        // `occurred_at` is milliseconds since the Unix epoch.
        let dt = mz_ore::now::to_datetime(occurred_at);
        let id = event.sortable_id();
        Ok(BuiltinTableUpdate::row(
            &*MZ_AUDIT_EVENTS,
            Row::pack_slice(&[
                Datum::UInt64(id),
                Datum::String(&format!("{}", event_type)),
                Datum::String(&format!("{}", object_type)),
                details,
                match user {
                    Some(user) => Datum::String(user),
                    None => Datum::Null,
                },
                Datum::TimestampTz(dt.try_into().expect("must fit")),
            ]),
            diff,
        ))
    }

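    /// Packs a [`BuiltinTableUpdate`] for `mz_storage_usage_by_shard` from a
    /// versioned storage usage event.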
    pub fn pack_storage_usage_update(
        &self,
        VersionedStorageUsage::V1(event): VersionedStorageUsage,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let id = &MZ_STORAGE_USAGE_BY_SHARD;
        let row = Row::pack_slice(&[
            Datum::UInt64(event.id),
            Datum::from(event.shard_id.as_deref()),
            Datum::UInt64(event.size_bytes),
            Datum::TimestampTz(
                mz_ore::now::to_datetime(event.collection_timestamp)
                    .try_into()
                    .expect("must fit"),
            ),
        ]);
        BuiltinTableUpdate::row(id, row, diff)
    }

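    /// Packs a [`BuiltinTableUpdate`] for `mz_egress_ips` describing the
    /// egress network `ip`.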
    pub fn pack_egress_ip_update(
        &self,
        ip: &IpNet,
    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
        let id = &MZ_EGRESS_IPS;
        let addr = ip.addr();
        let row = Row::pack_slice(&[
            Datum::String(&addr.to_string()),
            Datum::Int32(ip.prefix_len().into()),
            Datum::String(&format!("{}/{}", addr, ip.prefix_len())),
        ]);
        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
    }

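    /// Packs a [`BuiltinTableUpdate`] for `mz_license_keys` describing a
    /// validated license key.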
    pub fn pack_license_key_update(
        &self,
        license_key: &ValidatedLicenseKey,
    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
        let id = &MZ_LICENSE_KEYS;
        let row = Row::pack_slice(&[
            Datum::String(&license_key.id),
            Datum::String(&license_key.organization),
            Datum::String(&license_key.environment_id),
            // `expiration` and `not_before` are in seconds since the Unix
            // epoch; `to_datetime` expects milliseconds.
            Datum::TimestampTz(
                mz_ore::now::to_datetime(license_key.expiration * 1000)
                    .try_into()
                    .expect("must fit"),
            ),
            Datum::TimestampTz(
                mz_ore::now::to_datetime(license_key.not_before * 1000)
                    .try_into()
                    .expect("must fit"),
            ),
        ]);
        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
    }

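    /// Packs one [`BuiltinTableUpdate`] for `mz_cluster_replica_sizes` per
    /// replica size allocation that is not disabled.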
    pub fn pack_all_replica_size_updates(&self) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = Vec::new();
        for (size, alloc) in &self.cluster_replica_sizes.0 {
            // Sizes marked as disabled are omitted from the table.
            if alloc.disabled {
                continue;
            }

            // Absent limits fall back to their sentinel defaults
            // (`MAX`/`ARBITRARY`).
            let cpu_limit = alloc.cpu_limit.unwrap_or(CpuLimit::MAX);
            let MemoryLimit(ByteSize(memory_bytes)) =
                (alloc.memory_limit).unwrap_or(MemoryLimit::MAX);
            let DiskLimit(ByteSize(disk_bytes)) =
                (alloc.disk_limit).unwrap_or(DiskLimit::ARBITRARY);

            let row = Row::pack_slice(&[
                size.as_str().into(),
                u64::cast_from(alloc.scale).into(),
                u64::cast_from(alloc.workers).into(),
                cpu_limit.as_nanocpus().into(),
                memory_bytes.into(),
                disk_bytes.into(),
                (alloc.credits_per_hour).into(),
            ]);

            updates.push(BuiltinTableUpdate::row(
                &*MZ_CLUSTER_REPLICA_SIZES,
                row,
                Diff::ONE,
            ));
        }

        updates
    }

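    /// Packs a [`BuiltinTableUpdate`] for `mz_subscriptions` describing an
    /// active `SUBSCRIBE`.
    ///
    /// A sketch of intended use (assumes a `CatalogState` in scope as `state`
    /// and an [`ActiveSubscribe`] as `sub`; not compiled here):
    /// ```ignore
    /// let update = state.pack_subscribe_update(id, &sub, Diff::ONE);
    /// ```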
    pub fn pack_subscribe_update(
        &self,
        id: GlobalId,
        subscribe: &ActiveSubscribe,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let mut row = Row::default();
        let mut packer = row.packer();
        packer.push(Datum::String(&id.to_string()));
        packer.push(Datum::Uuid(subscribe.session_uuid));
        packer.push(Datum::String(&subscribe.cluster_id.to_string()));

        let start_dt = mz_ore::now::to_datetime(subscribe.start_time);
        packer.push(Datum::TimestampTz(start_dt.try_into().expect("must fit")));

        let depends_on: Vec<_> = subscribe
            .depends_on
            .iter()
            .map(|id| id.to_string())
            .collect();
        packer.push_list(depends_on.iter().map(|s| Datum::String(s)));

        BuiltinTableUpdate::row(&*MZ_SUBSCRIPTIONS, row, diff)
    }

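    /// Packs a [`BuiltinTableUpdate`] for `mz_sessions` describing an active
    /// connection.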
    pub fn pack_session_update(
        &self,
        conn: &ConnMeta,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let connect_dt = mz_ore::now::to_datetime(conn.connected_at());
        BuiltinTableUpdate::row(
            &*MZ_SESSIONS,
            Row::pack_slice(&[
                Datum::Uuid(conn.uuid()),
                Datum::UInt32(conn.conn_id().unhandled()),
                Datum::String(&conn.authenticated_role_id().to_string()),
                Datum::from(conn.client_ip().map(|ip| ip.to_string()).as_deref()),
                Datum::TimestampTz(connect_dt.try_into().expect("must fit")),
            ]),
            diff,
        )
    }

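    /// Packs a [`BuiltinTableUpdate`] for `mz_default_privileges` describing
    /// the default privileges granted to `grantee` on objects matching
    /// `default_privilege_object`.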
    pub fn pack_default_privileges_update(
        &self,
        default_privilege_object: &DefaultPrivilegeObject,
        grantee: &RoleId,
        acl_mode: &AclMode,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        BuiltinTableUpdate::row(
            &*MZ_DEFAULT_PRIVILEGES,
            Row::pack_slice(&[
                default_privilege_object.role_id.to_string().as_str().into(),
                default_privilege_object
                    .database_id
                    .map(|database_id| database_id.to_string())
                    .as_deref()
                    .into(),
                default_privilege_object
                    .schema_id
                    .map(|schema_id| schema_id.to_string())
                    .as_deref()
                    .into(),
                default_privilege_object
                    .object_type
                    .to_string()
                    .to_lowercase()
                    .as_str()
                    .into(),
                grantee.to_string().as_str().into(),
                acl_mode.to_string().as_str().into(),
            ]),
            diff,
        )
    }

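    /// Packs a [`BuiltinTableUpdate`] for `mz_system_privileges` describing a
    /// single system-wide privilege grant.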
    pub fn pack_system_privileges_update(
        &self,
        privileges: MzAclItem,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        BuiltinTableUpdate::row(
            &*MZ_SYSTEM_PRIVILEGES,
            Row::pack_slice(&[privileges.into()]),
            diff,
        )
    }

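    /// Packs a [`PrivilegeMap`] into a single-column [`Row`] containing a
    /// one-dimensional array of `mz_aclitem`s.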
    fn pack_privilege_array_row(&self, privileges: &PrivilegeMap) -> Row {
        let mut row = Row::default();
        let flat_privileges: Vec<_> = privileges.all_values_owned().collect();
        row.packer()
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: flat_privileges.len(),
                }],
                flat_privileges
                    .into_iter()
                    .map(|mz_acl_item| Datum::MzAclItem(mz_acl_item.clone())),
            )
            .expect("privileges is 1 dimensional, and its length is used for the array length");
        row
    }

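    /// Packs a [`BuiltinTableUpdate`] for `mz_comments` recording `comment` on
    /// the object identified by `object_id` and, for relations, optionally on
    /// the column at `column_pos`.
    ///
    /// A sketch of intended use (assumes a `CatalogState` in scope as `state`;
    /// not compiled here):
    /// ```ignore
    /// // Comment on the object itself rather than on one of its columns.
    /// let update = state.pack_comment_update(object_id, None, "my comment", Diff::ONE);
    /// ```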
    pub fn pack_comment_update(
        &self,
        object_id: CommentObjectId,
        column_pos: Option<usize>,
        comment: &str,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let object_type = mz_sql::catalog::ObjectType::from(object_id);
        let audit_type = super::object_type_to_audit_object_type(object_type);
        let object_type_str = audit_type.to_string();

        let object_id_str = match object_id {
            CommentObjectId::Table(global_id)
            | CommentObjectId::View(global_id)
            | CommentObjectId::MaterializedView(global_id)
            | CommentObjectId::Source(global_id)
            | CommentObjectId::Sink(global_id)
            | CommentObjectId::Index(global_id)
            | CommentObjectId::Func(global_id)
            | CommentObjectId::Connection(global_id)
            | CommentObjectId::Secret(global_id)
            | CommentObjectId::Type(global_id)
            | CommentObjectId::ContinualTask(global_id) => global_id.to_string(),
            CommentObjectId::Role(role_id) => role_id.to_string(),
            CommentObjectId::Database(database_id) => database_id.to_string(),
            CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
            CommentObjectId::Cluster(cluster_id) => cluster_id.to_string(),
            CommentObjectId::ClusterReplica((_, replica_id)) => replica_id.to_string(),
            CommentObjectId::NetworkPolicy(network_policy_id) => network_policy_id.to_string(),
        };
        let column_pos_datum = match column_pos {
            Some(pos) => {
                let pos =
                    i32::try_from(pos).expect("we constrain this value in the planning layer");
                Datum::Int32(pos)
            }
            None => Datum::Null,
        };

        BuiltinTableUpdate::row(
            &*MZ_COMMENTS,
            Row::pack_slice(&[
                Datum::String(&object_id_str),
                Datum::String(&object_type_str),
                column_pos_datum,
                Datum::String(comment),
            ]),
            diff,
        )
    }

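    /// Packs a [`BuiltinTableUpdate`] describing a webhook source, including
    /// the URL at which it can be reached.
    ///
    /// Panics if `item_id` does not refer to a webhook source.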
    pub fn pack_webhook_source_update(
        &self,
        item_id: CatalogItemId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let url = self
            .try_get_webhook_url(&item_id)
            .expect("webhook source should exist");
        let url = url.to_string();
        let name = &self.get_entry(&item_id).name().item;
        let id_str = item_id.to_string();

        BuiltinTableUpdate::row(
            &*MZ_WEBHOOKS_SOURCES,
            Row::pack_slice(&[
                Datum::String(&id_str),
                Datum::String(name),
                Datum::String(&url),
            ]),
            diff,
        )
    }

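    /// Packs one [`BuiltinTableUpdate`] for `mz_source_references` per
    /// reference available to the given source.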
    pub fn pack_source_references_update(
        &self,
        source_references: &SourceReferences,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let source_id = source_references.source_id.to_string();
        let updated_at = &source_references.updated_at;
        source_references
            .references
            .iter()
            .map(|reference| {
                let mut row = Row::default();
                let mut packer = row.packer();
                packer.extend([
                    Datum::String(&source_id),
                    reference
                        .namespace
                        .as_ref()
                        .map(|s| Datum::String(s))
                        .unwrap_or(Datum::Null),
                    Datum::String(&reference.name),
                    Datum::TimestampTz(
                        mz_ore::now::to_datetime(*updated_at)
                            .try_into()
                            .expect("must fit"),
                    ),
                ]);
                if !reference.columns.is_empty() {
                    packer
                        .try_push_array(
                            &[ArrayDimension {
                                lower_bound: 1,
                                length: reference.columns.len(),
                            }],
                            reference.columns.iter().map(|col| Datum::String(col)),
                        )
                        .expect(
                            "columns is 1 dimensional, and its length is used for the array length",
                        );
                } else {
                    // A reference without columns is represented as NULL.
                    packer.push(Datum::Null);
                }

                BuiltinTableUpdate::row(&*MZ_SOURCE_REFERENCES, row, diff)
            })
            .collect()
    }
}