mod notice;

use bytesize::ByteSize;
use ipnet::IpNet;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::dyncfgs::ENABLE_PASSWORD_AUTH;
use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::builtin::{
    BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
    MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICAS,
    MZ_CLUSTER_SCHEDULES, MZ_CLUSTER_WORKLOAD_CLASSES, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS,
    MZ_CONNECTIONS, MZ_CONTINUAL_TASKS, MZ_DATABASES, MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS,
    MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES, MZ_ICEBERG_SINKS, MZ_INDEX_COLUMNS, MZ_INDEXES,
    MZ_INTERNAL_CLUSTER_REPLICAS, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCE_TABLES,
    MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES,
    MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MATERIALIZED_VIEWS, MZ_MYSQL_SOURCE_TABLES,
    MZ_NETWORK_POLICIES, MZ_NETWORK_POLICY_RULES, MZ_OBJECT_DEPENDENCIES, MZ_OBJECT_GLOBAL_IDS,
    MZ_OPERATORS, MZ_PENDING_CLUSTER_REPLICAS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES,
    MZ_PSEUDO_TYPES, MZ_ROLE_AUTH, MZ_ROLE_MEMBERS, MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SCHEMAS,
    MZ_SECRETS, MZ_SESSIONS, MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SOURCES,
    MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD,
    MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS,
    MZ_WEBHOOKS_SOURCES,
};
use mz_catalog::config::AwsPrincipalContext;
use mz_catalog::durable::SourceReferences;
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::{
    CatalogEntry, CatalogItem, ClusterVariant, Connection, ContinualTask, DataSourceDesc, Func,
    Index, MaterializedView, Sink, Table, TableDataSource, Type, View,
};
use mz_controller::clusters::{
    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
};
use mz_controller_types::ClusterId;
use mz_expr::MirScalarExpr;
use mz_license_keys::ValidatedLicenseKey;
use mz_orchestrator::{CpuLimit, DiskLimit, MemoryLimit};
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_persist_client::batch::ProtoBatch;
use mz_repr::adt::array::ArrayDimension;
use mz_repr::adt::interval::Interval;
use mz_repr::adt::jsonb::Jsonb;
use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
use mz_repr::adt::regex;
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::refresh_schedule::RefreshEvery;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, Row, RowPacker, SqlScalarType, Timestamp};
use mz_sql::ast::{ContinualTaskStmt, CreateIndexStatement, Statement, UnresolvedItemName};
use mz_sql::catalog::{
    CatalogCluster, CatalogDatabase, CatalogSchema, CatalogType, DefaultPrivilegeObject,
    TypeCategory,
};
use mz_sql::func::FuncImplCatalogDetails;
use mz_sql::names::{
    CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier,
};
use mz_sql::plan::{ClusterSchedule, ConnectionDetails, SshKey};
use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
use mz_sql::session::vars::SessionVars;
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_client::client::TableData;
use mz_storage_types::connections::KafkaConnection;
use mz_storage_types::connections::aws::{AwsAuth, AwsConnection};
use mz_storage_types::connections::inline::ReferencedConnection;
use mz_storage_types::connections::string_or_secret::StringOrSecret;
use mz_storage_types::sinks::{IcebergSinkConnection, KafkaSinkConnection, StorageSinkConnection};
use mz_storage_types::sources::{
    GenericSourceConnection, KafkaSourceConnection, PostgresSourceConnection, SourceConnection,
};
use smallvec::smallvec;

use crate::active_compute_sink::ActiveSubscribe;
use crate::catalog::CatalogState;
use crate::coord::ConnMeta;

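/// An update to a builtin table: a [`TableData`] payload destined for the
/// builtin table identified by `id`, which is a [`CatalogItemId`] once
/// resolved, or a reference to the [`BuiltinTable`] itself before resolution.
///
/// A minimal sketch of constructing an update (the table and row here are
/// illustrative, not taken from real catalog state):
///
/// ```ignore
/// let row = Row::pack_slice(&[Datum::String("u1"), Datum::String("u2")]);
/// let update = BuiltinTableUpdate::row(&*MZ_OBJECT_DEPENDENCIES, row, Diff::ONE);
/// ```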
#[derive(Debug, Clone)]
pub struct BuiltinTableUpdate<T = CatalogItemId> {
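    /// The ID of the builtin table to update.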
    pub id: T,
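    /// The update itself: either literal rows or pre-written persist batches.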
    pub data: TableData,
}

impl<T> BuiltinTableUpdate<T> {
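    /// Constructs an update from a single `(row, diff)` pair.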
    pub fn row(id: T, row: Row, diff: Diff) -> BuiltinTableUpdate<T> {
        BuiltinTableUpdate {
            id,
            data: TableData::Rows(vec![(row, diff)]),
        }
    }

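    /// Constructs an update from a pre-written persist [`ProtoBatch`].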
    pub fn batch(id: T, batch: ProtoBatch) -> BuiltinTableUpdate<T> {
        BuiltinTableUpdate {
            id,
            data: TableData::Batches(smallvec![batch]),
        }
    }
}

impl CatalogState {
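    /// Resolves a batch of updates against static [`BuiltinTable`] references
    /// into updates against concrete [`CatalogItemId`]s.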
    pub fn resolve_builtin_table_updates(
        &self,
        builtin_table_update: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    ) -> Vec<BuiltinTableUpdate<CatalogItemId>> {
        builtin_table_update
            .into_iter()
            .map(|builtin_table_update| self.resolve_builtin_table_update(builtin_table_update))
            .collect()
    }

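    /// Resolves a single update against a static [`BuiltinTable`] reference
    /// into an update against the [`CatalogItemId`] under which the table is
    /// installed.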
    pub fn resolve_builtin_table_update(
        &self,
        BuiltinTableUpdate { id, data }: BuiltinTableUpdate<&'static BuiltinTable>,
    ) -> BuiltinTableUpdate<CatalogItemId> {
        let id = self.resolve_builtin_table(id);
        BuiltinTableUpdate { id, data }
    }

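    /// Packs a row for `mz_object_dependencies` recording that `depender`
    /// depends on `dependee`.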
    pub fn pack_depends_update(
        &self,
        depender: CatalogItemId,
        dependee: CatalogItemId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let row = Row::pack_slice(&[
            Datum::String(&depender.to_string()),
            Datum::String(&dependee.to_string()),
        ]);
        BuiltinTableUpdate::row(&*MZ_OBJECT_DEPENDENCIES, row, diff)
    }

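    /// Packs a row for `mz_databases` describing the given database.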
    pub(super) fn pack_database_update(
        &self,
        database_id: &DatabaseId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let database = self.get_database(database_id);
        let row = self.pack_privilege_array_row(database.privileges());
        let privileges = row.unpack_first();
        BuiltinTableUpdate::row(
            &*MZ_DATABASES,
            Row::pack_slice(&[
                Datum::String(&database.id.to_string()),
                Datum::UInt32(database.oid),
                Datum::String(database.name()),
                Datum::String(&database.owner_id.to_string()),
                privileges,
            ]),
            diff,
        )
    }

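    /// Packs a row for `mz_schemas`. Ambient schemas have no containing
    /// database, so their database column is null.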
    pub(super) fn pack_schema_update(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_id: &SchemaId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let (database_id, schema) = match database_spec {
            ResolvedDatabaseSpecifier::Ambient => (None, &self.ambient_schemas_by_id[schema_id]),
            ResolvedDatabaseSpecifier::Id(id) => (
                Some(id.to_string()),
                &self.database_by_id[id].schemas_by_id[schema_id],
            ),
        };
        let row = self.pack_privilege_array_row(schema.privileges());
        let privileges = row.unpack_first();
        BuiltinTableUpdate::row(
            &*MZ_SCHEMAS,
            Row::pack_slice(&[
                Datum::String(&schema_id.to_string()),
                Datum::UInt32(schema.oid),
                Datum::from(database_id.as_deref()),
                Datum::String(&schema.name.schema),
                Datum::String(&schema.owner_id.to_string()),
                privileges,
            ]),
            diff,
        )
    }

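    /// Packs a row for `mz_role_auth` containing the role's optional password
    /// hash and the time the hash was last updated.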
    pub(super) fn pack_role_auth_update(
        &self,
        id: RoleId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let role_auth = self.get_role_auth(&id);
        let role = self.get_role(&id);
        BuiltinTableUpdate::row(
            &*MZ_ROLE_AUTH,
            Row::pack_slice(&[
                Datum::String(&role_auth.role_id.to_string()),
                Datum::UInt32(role.oid),
                match &role_auth.password_hash {
                    Some(hash) => Datum::String(hash),
                    None => Datum::Null,
                },
                Datum::TimestampTz(
                    mz_ore::now::to_datetime(role_auth.updated_at)
                        .try_into()
                        .expect("must fit"),
                ),
            ]),
            diff,
        )
    }

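    /// Packs updates for `mz_roles` and `mz_role_parameters` for the given
    /// role. The pseudo-role `PUBLIC` produces no updates.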
    pub(super) fn pack_role_update(
        &self,
        id: RoleId,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        match id {
            RoleId::Public => vec![],
            id => {
                let role = self.get_role(&id);
                let self_managed_auth_enabled = self
                    .system_config()
                    .get(ENABLE_PASSWORD_AUTH.name())
                    .ok()
                    .map(|v| v.value() == "on")
                    .unwrap_or(false);

                let builtin_supers = [MZ_SYSTEM_ROLE_ID, MZ_SUPPORT_ROLE_ID];

                let cloud_login_regex = "^[^@]+@[^@]+\\.[^@]+$";
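                // Without self-managed password auth (i.e. in cloud), role
                // names that look like email addresses belong to externally
                // managed login users, so they are reported as able to log in.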
                let matches = regex::Regex::new(cloud_login_regex, true)
                    .expect("valid regex")
                    .is_match(&role.name);

                let rolcanlogin = if self_managed_auth_enabled {
                    role.attributes.login.unwrap_or(false)
                } else {
                    builtin_supers.contains(&role.id) || matches
                };

                let rolsuper = if self_managed_auth_enabled {
                    Datum::from(role.attributes.superuser.unwrap_or(false))
                } else if builtin_supers.contains(&role.id) {
                    Datum::from(true)
                } else {
                    Datum::Null
                };

                let role_update = BuiltinTableUpdate::row(
                    &*MZ_ROLES,
                    Row::pack_slice(&[
                        Datum::String(&role.id.to_string()),
                        Datum::UInt32(role.oid),
                        Datum::String(&role.name),
                        Datum::from(role.attributes.inherit),
                        Datum::from(rolcanlogin),
                        rolsuper,
                    ]),
                    diff,
                );
                let mut updates = vec![role_update];

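                // A reference set of session vars, used only to validate and
                // format the role's default parameter values below.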
                let session_vars_reference = SessionVars::new_unchecked(
                    &mz_build_info::DUMMY_BUILD_INFO,
                    SYSTEM_USER.clone(),
                    None,
                );

                for (name, val) in role.vars() {
                    let result = session_vars_reference
                        .inspect(name)
                        .and_then(|var| var.check(val.borrow()));
                    let Ok(formatted_val) = result else {
                        tracing::error!(?name, ?val, ?result, "found invalid role default var");
                        continue;
                    };

                    let role_var_update = BuiltinTableUpdate::row(
                        &*MZ_ROLE_PARAMETERS,
                        Row::pack_slice(&[
                            Datum::String(&role.id.to_string()),
                            Datum::String(name),
                            Datum::String(&formatted_val),
                        ]),
                        diff,
                    );
                    updates.push(role_var_update);
                }

                updates
            }
        }
    }

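    /// Packs a row for `mz_role_members` recording that `member_id` is a
    /// member of `role_id`, along with the role that granted the membership.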
    pub(super) fn pack_role_members_update(
        &self,
        role_id: RoleId,
        member_id: RoleId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let grantor_id = self
            .get_role(&member_id)
            .membership
            .map
            .get(&role_id)
            .expect("catalog out of sync");
        BuiltinTableUpdate::row(
            &*MZ_ROLE_MEMBERS,
            Row::pack_slice(&[
                Datum::String(&role_id.to_string()),
                Datum::String(&member_id.to_string()),
                Datum::String(&grantor_id.to_string()),
            ]),
            diff,
        )
    }

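    /// Packs updates for `mz_clusters`, `mz_cluster_schedules`, and
    /// `mz_cluster_workload_classes` for the cluster named `name`. Managed
    /// clusters additionally report their size, replication factor,
    /// availability zones, and introspection settings.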
    pub(super) fn pack_cluster_update(
        &self,
        name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let id = self.clusters_by_name[name];
        let cluster = &self.clusters_by_id[&id];
        let row = self.pack_privilege_array_row(cluster.privileges());
        let privileges = row.unpack_first();
        let (size, disk, replication_factor, azs, introspection_debugging, introspection_interval) =
            match &cluster.config.variant {
                ClusterVariant::Managed(config) => (
                    Some(config.size.as_str()),
                    Some(self.cluster_replica_size_has_disk(&config.size)),
                    Some(config.replication_factor),
                    if config.availability_zones.is_empty() {
                        None
                    } else {
                        Some(config.availability_zones.clone())
                    },
                    Some(config.logging.log_logging),
                    config.logging.interval.map(|d| {
                        Interval::from_duration(&d)
                            .expect("planning ensured this is convertible back to interval")
                    }),
                ),
                ClusterVariant::Unmanaged => (None, None, None, None, None, None),
            };

        let mut row = Row::default();
        let mut packer = row.packer();
        packer.extend([
            Datum::String(&id.to_string()),
            Datum::String(name),
            Datum::String(&cluster.owner_id.to_string()),
            privileges,
            cluster.is_managed().into(),
            size.into(),
            replication_factor.into(),
            disk.into(),
        ]);
        if let Some(azs) = azs {
            packer.push_list(azs.iter().map(|az| Datum::String(az)));
        } else {
            packer.push(Datum::Null);
        }
        packer.push(Datum::from(introspection_debugging));
        packer.push(Datum::from(introspection_interval));

        let mut updates = Vec::new();

        updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTERS, row, diff));

        if let ClusterVariant::Managed(managed_config) = &cluster.config.variant {
            let row = match managed_config.schedule {
                ClusterSchedule::Manual => Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String("manual"),
                    Datum::Null,
                ]),
                ClusterSchedule::Refresh {
                    hydration_time_estimate,
                } => Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String("on-refresh"),
                    Datum::Interval(
                        Interval::from_duration(&hydration_time_estimate)
                            .expect("planning ensured that this is convertible back to Interval"),
                    ),
                ]),
            };
            updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTER_SCHEDULES, row, diff));
        }

        updates.push(BuiltinTableUpdate::row(
            &*MZ_CLUSTER_WORKLOAD_CLASSES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::from(cluster.config.workload_class.as_deref()),
            ]),
            diff,
        ));

        updates
    }

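    /// Packs updates for `mz_cluster_replicas` for the replica `name` of
    /// cluster `cluster_id`, plus marker rows in
    /// `mz_internal_cluster_replicas` and `mz_pending_cluster_replicas` when
    /// the replica is internal or pending, respectively.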
    pub(super) fn pack_cluster_replica_update(
        &self,
        cluster_id: ClusterId,
        name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let cluster = &self.clusters_by_id[&cluster_id];
        let id = cluster.replica_id(name).expect("Must exist");
        let replica = cluster.replica(id).expect("Must exist");

        let (size, disk, az, internal, pending) = match &replica.config.location {
            ReplicaLocation::Managed(ManagedReplicaLocation {
                size,
                availability_zones: ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
                allocation: _,
                billed_as: _,
                internal,
                pending,
            }) => (
                Some(&**size),
                Some(self.cluster_replica_size_has_disk(size)),
                Some(az.as_str()),
                *internal,
                *pending,
            ),
            ReplicaLocation::Managed(ManagedReplicaLocation {
                size,
                availability_zones: _,
                allocation: _,
                billed_as: _,
                internal,
                pending,
            }) => (
                Some(&**size),
                Some(self.cluster_replica_size_has_disk(size)),
                None,
                *internal,
                *pending,
            ),
            _ => (None, None, None, false, false),
        };

        let cluster_replica_update = BuiltinTableUpdate::row(
            &*MZ_CLUSTER_REPLICAS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(name),
                Datum::String(&cluster_id.to_string()),
                Datum::from(size),
                Datum::from(az),
                Datum::String(&replica.owner_id.to_string()),
                Datum::from(disk),
            ]),
            diff,
        );

        let mut updates = vec![cluster_replica_update];

        if internal {
            let update = BuiltinTableUpdate::row(
                &*MZ_INTERNAL_CLUSTER_REPLICAS,
                Row::pack_slice(&[Datum::String(&id.to_string())]),
                diff,
            );
            updates.push(update);
        }

        if pending {
            let update = BuiltinTableUpdate::row(
                &*MZ_PENDING_CLUSTER_REPLICAS,
                Row::pack_slice(&[Datum::String(&id.to_string())]),
                diff,
            );
            updates.push(update);
        }

        updates
    }

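    /// Packs updates for `mz_network_policies` and one
    /// `mz_network_policy_rules` row per rule of the policy.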
    pub(crate) fn pack_network_policy_update(
        &self,
        policy_id: &NetworkPolicyId,
        diff: Diff,
    ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, Error> {
        let policy = self.get_network_policy(policy_id);
        let row = self.pack_privilege_array_row(&policy.privileges);
        let privileges = row.unpack_first();
        let mut updates = Vec::new();
        for rule in &policy.rules {
            updates.push(BuiltinTableUpdate::row(
                &*MZ_NETWORK_POLICY_RULES,
                Row::pack_slice(&[
                    Datum::String(&rule.name),
                    Datum::String(&policy.id.to_string()),
                    Datum::String(&rule.action.to_string()),
                    Datum::String(&rule.address.to_string()),
                    Datum::String(&rule.direction.to_string()),
                ]),
                diff,
            ));
        }
        updates.push(BuiltinTableUpdate::row(
            &*MZ_NETWORK_POLICIES,
            Row::pack_slice(&[
                Datum::String(&policy.id.to_string()),
                Datum::String(&policy.name),
                Datum::String(&policy.owner_id.to_string()),
                privileges,
                Datum::UInt32(policy.oid),
            ]),
            diff,
        ));

        Ok(updates)
    }

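    /// Packs updates for the catalog item identified by `id` into the builtin
    /// table appropriate for its type, along with cross-cutting rows:
    /// object dependencies, column descriptions in `mz_columns`, history
    /// retention strategies, and the item's global IDs.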
    pub(super) fn pack_item_update(
        &self,
        id: CatalogItemId,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let entry = self.get_entry(&id);
        let oid = entry.oid();
        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
        let schema_id = &self
            .get_schema(
                &entry.name().qualifiers.database_spec,
                &entry.name().qualifiers.schema_spec,
                conn_id,
            )
            .id;
        let name = &entry.name().item;
        let owner_id = entry.owner_id();
        let privileges_row = self.pack_privilege_array_row(entry.privileges());
        let privileges = privileges_row.unpack_first();
        let mut updates = match entry.item() {
            CatalogItem::Log(_) => self.pack_source_update(
                id, oid, schema_id, name, "log", None, None, None, None, None, owner_id,
                privileges, diff, None,
            ),
            CatalogItem::Index(index) => {
                self.pack_index_update(id, oid, name, owner_id, index, diff)
            }
            CatalogItem::Table(table) => {
                let mut updates = self
                    .pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);

                if let TableDataSource::DataSource {
                    desc: data_source,
                    timeline: _,
                } = &table.data_source
                {
                    updates.extend(match data_source {
                        DataSourceDesc::IngestionExport {
                            ingestion_id,
                            external_reference: UnresolvedItemName(external_reference),
                            details: _,
                            data_config: _,
                        } => {
                            let ingestion_entry = self
                                .get_entry(ingestion_id)
                                .source_desc()
                                .expect("primary source exists")
                                .expect("primary source is a source");

                            match ingestion_entry.connection.name() {
                                "postgres" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
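                                    // The external reference is
                                    // `[database, schema, table]`; only the
                                    // schema and table names are recorded.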
                                    let schema_name = external_reference[1].to_ast_string_simple();
                                    let table_name = external_reference[2].to_ast_string_simple();

                                    self.pack_postgres_source_tables_update(
                                        id,
                                        &schema_name,
                                        &table_name,
                                        diff,
                                    )
                                }
                                "mysql" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
                                    let schema_name = external_reference[0].to_ast_string_simple();
                                    let table_name = external_reference[1].to_ast_string_simple();

                                    self.pack_mysql_source_tables_update(
                                        id,
                                        &schema_name,
                                        &table_name,
                                        diff,
                                    )
                                }
                                "sql-server" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                    let schema_name = external_reference[1].to_ast_string_simple();
                                    let table_name = external_reference[2].to_ast_string_simple();

                                    self.pack_sql_server_source_table_update(
                                        id,
                                        &schema_name,
                                        &table_name,
                                        diff,
                                    )
                                }
                                "load-generator" => vec![],
                                "kafka" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 1);
                                    let topic = external_reference[0].to_ast_string_simple();
                                    let envelope = data_source.envelope();
                                    let (key_format, value_format) = data_source.formats();

                                    self.pack_kafka_source_tables_update(
                                        id,
                                        &topic,
                                        envelope,
                                        key_format,
                                        value_format,
                                        diff,
                                    )
                                }
                                s => unreachable!("{s} sources do not have tables"),
                            }
                        }
                        _ => vec![],
                    });
                }

                updates
            }
            CatalogItem::Source(source) => {
                let source_type = source.source_type();
                let connection_id = source.connection_id();
                let envelope = source.data_source.envelope();
                let cluster_entry = match source.data_source {
                    DataSourceDesc::IngestionExport { ingestion_id, .. } => {
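                        // Subsources report the cluster of their parent
                        // ingestion.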
                        self.get_entry(&ingestion_id)
                    }
                    _ => entry,
                };

                let cluster_id = cluster_entry.item().cluster_id().map(|id| id.to_string());

                let (key_format, value_format) = source.data_source.formats();

                let mut updates = self.pack_source_update(
                    id,
                    oid,
                    schema_id,
                    name,
                    source_type,
                    connection_id,
                    envelope,
                    key_format,
                    value_format,
                    cluster_id.as_deref(),
                    owner_id,
                    privileges,
                    diff,
                    source.create_sql.as_ref(),
                );

                updates.extend(match &source.data_source {
                    DataSourceDesc::Ingestion { desc, .. }
                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => match &desc.connection {
                        GenericSourceConnection::Postgres(postgres) => {
                            self.pack_postgres_source_update(id, postgres, diff)
                        }
                        GenericSourceConnection::Kafka(kafka) => {
                            self.pack_kafka_source_update(id, source.global_id(), kafka, diff)
                        }
                        _ => vec![],
                    },
                    DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference: UnresolvedItemName(external_reference),
                        details: _,
                        data_config: _,
                    } => {
                        let ingestion_entry = self
                            .get_entry(ingestion_id)
                            .source_desc()
                            .expect("primary source exists")
                            .expect("primary source is a source");

                        match ingestion_entry.connection.name() {
                            "postgres" => {
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
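                                // As above, the external reference is
                                // `[database, schema, table]`.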
                                let schema_name = external_reference[1].to_ast_string_simple();
                                let table_name = external_reference[2].to_ast_string_simple();

                                self.pack_postgres_source_tables_update(
                                    id,
                                    &schema_name,
                                    &table_name,
                                    diff,
                                )
                            }
                            "mysql" => {
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
                                let schema_name = external_reference[0].to_ast_string_simple();
                                let table_name = external_reference[1].to_ast_string_simple();

                                self.pack_mysql_source_tables_update(
                                    id,
                                    &schema_name,
                                    &table_name,
                                    diff,
                                )
                            }
                            "sql-server" => {
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                let schema_name = external_reference[1].to_ast_string_simple();
                                let table_name = external_reference[2].to_ast_string_simple();

                                self.pack_sql_server_source_table_update(
                                    id,
                                    &schema_name,
                                    &table_name,
                                    diff,
                                )
                            }
                            "load-generator" => vec![],
                            s => unreachable!("{s} sources do not have subsources"),
                        }
                    }
                    DataSourceDesc::Webhook { .. } => {
                        vec![self.pack_webhook_source_update(id, diff)]
                    }
                    _ => vec![],
                });

                updates
            }
            CatalogItem::View(view) => {
                self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
            }
            CatalogItem::MaterializedView(mview) => self.pack_materialized_view_update(
                id, oid, schema_id, name, owner_id, privileges, mview, diff,
            ),
            CatalogItem::Sink(sink) => {
                self.pack_sink_update(id, oid, schema_id, name, owner_id, sink, diff)
            }
            CatalogItem::Type(ty) => {
                self.pack_type_update(id, oid, schema_id, name, owner_id, privileges, ty, diff)
            }
            CatalogItem::Func(func) => {
                self.pack_func_update(id, schema_id, name, owner_id, func, diff)
            }
            CatalogItem::Secret(_) => {
                self.pack_secret_update(id, oid, schema_id, name, owner_id, privileges, diff)
            }
            CatalogItem::Connection(connection) => self.pack_connection_update(
                id, oid, schema_id, name, owner_id, privileges, connection, diff,
            ),
            CatalogItem::ContinualTask(ct) => self.pack_continual_task_update(
                id, oid, schema_id, name, owner_id, privileges, ct, diff,
            ),
        };

        if !entry.item().is_temporary() {
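            // Dependencies of temporary items are not exposed.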
            for dependee in entry.item().references().items() {
                updates.push(self.pack_depends_update(id, *dependee, diff))
            }
        }

        let full_name = self.resolve_full_name(entry.name(), entry.conn_id());
        if let Ok(desc) = entry.desc_latest(&full_name) {
            let defaults = match entry.item() {
                CatalogItem::Table(Table {
                    data_source: TableDataSource::TableWrites { defaults },
                    ..
                }) => Some(defaults),
                _ => None,
            };
            for (i, (column_name, column_type)) in desc.iter().enumerate() {
                let default: Option<String> = defaults.map(|d| d[i].to_ast_string_stable());
                let default: Datum = default
                    .as_ref()
                    .map(|d| Datum::String(d))
                    .unwrap_or(Datum::Null);
                let pgtype = mz_pgrepr::Type::from(&column_type.scalar_type);
                let (type_name, type_oid) = match &column_type.scalar_type {
                    SqlScalarType::List {
                        custom_id: Some(custom_id),
                        ..
                    }
                    | SqlScalarType::Map {
                        custom_id: Some(custom_id),
                        ..
                    }
                    | SqlScalarType::Record {
                        custom_id: Some(custom_id),
                        ..
                    } => {
                        let entry = self.get_entry(custom_id);
                        let name = &*entry.name().item;
                        let oid = entry.oid();
                        (name, oid)
                    }
                    _ => (pgtype.name(), pgtype.oid()),
                };
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_COLUMNS,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String(column_name),
                        Datum::UInt64(u64::cast_from(i + 1)),
                        Datum::from(column_type.nullable),
                        Datum::String(type_name),
                        default,
                        Datum::UInt32(type_oid),
                        Datum::Int32(pgtype.typmod()),
                    ]),
                    diff,
                ));
            }
        }

        if let Some(cw) = entry.item().initial_logical_compaction_window() {
            updates.push(self.pack_history_retention_strategy_update(id, cw, diff));
        }

        updates.extend(Self::pack_item_global_id_update(entry, diff));

        updates
    }

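    /// Packs one `mz_object_global_ids` row per [`GlobalId`] associated with
    /// the item, mapping its [`CatalogItemId`] to each of its global IDs.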
    fn pack_item_global_id_update(
        entry: &CatalogEntry,
        diff: Diff,
    ) -> impl Iterator<Item = BuiltinTableUpdate<&'static BuiltinTable>> + use<'_> {
        let id = entry.id().to_string();
        let global_ids = entry.global_ids();
        global_ids.map(move |global_id| {
            BuiltinTableUpdate::row(
                &*MZ_OBJECT_GLOBAL_IDS,
                Row::pack_slice(&[Datum::String(&id), Datum::String(&global_id.to_string())]),
                diff,
            )
        })
    }

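    /// Packs a row for `mz_history_retention_strategies` recording the item's
    /// compaction window under the `FOR` strategy, encoded as a JSON number.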
    fn pack_history_retention_strategy_update(
        &self,
        id: CatalogItemId,
        cw: CompactionWindow,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let cw: u64 = cw.comparable_timestamp().into();
        let cw = Jsonb::from_serde_json(serde_json::Value::Number(serde_json::Number::from(cw)))
            .expect("must serialize");
        BuiltinTableUpdate::row(
            &*MZ_HISTORY_RETENTION_STRATEGIES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String("FOR"),
                cw.into_row().into_element(),
            ]),
            diff,
        )
    }

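    /// Packs a row for `mz_tables`. Source-fed tables also report the ID of
    /// the ingestion they export.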
    fn pack_table_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        diff: Diff,
        table: &Table,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let redacted = table.create_sql.as_ref().map(|create_sql| {
            mz_sql::parse::parse(create_sql)
                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
                .into_element()
                .ast
                .to_ast_string_redacted()
        });
        let source_id = if let TableDataSource::DataSource {
            desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
            ..
        } = &table.data_source
        {
            Some(ingestion_id.to_string())
        } else {
            None
        };

        vec![BuiltinTableUpdate::row(
            &*MZ_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&owner_id.to_string()),
                privileges,
                if let Some(create_sql) = &table.create_sql {
                    Datum::String(create_sql)
                } else {
                    Datum::Null
                },
                if let Some(redacted) = &redacted {
                    Datum::String(redacted)
                } else {
                    Datum::Null
                },
                if let Some(source_id) = source_id.as_ref() {
                    Datum::String(source_id)
                } else {
                    Datum::Null
                },
            ]),
            diff,
        )]
    }

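    /// Packs a row for `mz_sources`. Optional attributes (connection, format,
    /// envelope, cluster) are null when not applicable, e.g. for logs.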
    fn pack_source_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        source_desc_name: &str,
        connection_id: Option<CatalogItemId>,
        envelope: Option<&str>,
        key_format: Option<&str>,
        value_format: Option<&str>,
        cluster_id: Option<&str>,
        owner_id: &RoleId,
        privileges: Datum,
        diff: Diff,
        create_sql: Option<&String>,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let redacted = create_sql.map(|create_sql| {
            let create_stmt = mz_sql::parse::parse(create_sql)
                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
                .into_element()
                .ast;
            create_stmt.to_ast_string_redacted()
        });
        vec![BuiltinTableUpdate::row(
            &*MZ_SOURCES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(source_desc_name),
                Datum::from(connection_id.map(|id| id.to_string()).as_deref()),
                Datum::Null,
                Datum::from(envelope),
                Datum::from(key_format),
                Datum::from(value_format),
                Datum::from(cluster_id),
                Datum::String(&owner_id.to_string()),
                privileges,
                if let Some(create_sql) = create_sql {
                    Datum::String(create_sql)
                } else {
                    Datum::Null
                },
                if let Some(redacted) = &redacted {
                    Datum::String(redacted)
                } else {
                    Datum::Null
                },
            ]),
            diff,
        )]
    }

    fn pack_postgres_source_update(
        &self,
        id: CatalogItemId,
        postgres: &PostgresSourceConnection<ReferencedConnection>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_POSTGRES_SOURCES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(&postgres.publication_details.slot),
                Datum::from(postgres.publication_details.timeline_id),
            ]),
            diff,
        )]
    }

    fn pack_kafka_source_update(
        &self,
        item_id: CatalogItemId,
        collection_id: GlobalId,
        kafka: &KafkaSourceConnection<ReferencedConnection>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_KAFKA_SOURCES,
            Row::pack_slice(&[
                Datum::String(&item_id.to_string()),
                Datum::String(&kafka.group_id(&self.config.connection_context, collection_id)),
                Datum::String(&kafka.topic),
            ]),
            diff,
        )]
    }

    fn pack_postgres_source_tables_update(
        &self,
        id: CatalogItemId,
        schema_name: &str,
        table_name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_POSTGRES_SOURCE_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(schema_name),
                Datum::String(table_name),
            ]),
            diff,
        )]
    }

    fn pack_mysql_source_tables_update(
        &self,
        id: CatalogItemId,
        schema_name: &str,
        table_name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_MYSQL_SOURCE_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(schema_name),
                Datum::String(table_name),
            ]),
            diff,
        )]
    }

    fn pack_sql_server_source_table_update(
        &self,
        id: CatalogItemId,
        schema_name: &str,
        table_name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_SQL_SERVER_SOURCE_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(schema_name),
                Datum::String(table_name),
            ]),
            diff,
        )]
    }

    fn pack_kafka_source_tables_update(
        &self,
        id: CatalogItemId,
        topic: &str,
        envelope: Option<&str>,
        key_format: Option<&str>,
        value_format: Option<&str>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_KAFKA_SOURCE_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(topic),
                Datum::from(envelope),
                Datum::from(key_format),
                Datum::from(value_format),
            ]),
            diff,
        )]
    }

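    /// Packs a row for `mz_connections`, plus type-specific rows for Kafka,
    /// AWS, AWS PrivateLink, and SSH tunnel connections.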
    fn pack_connection_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        connection: &Connection,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let create_stmt = mz_sql::parse::parse(&connection.create_sql)
            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", connection.create_sql))
            .into_element()
            .ast;
        let mut updates = vec![BuiltinTableUpdate::row(
            &*MZ_CONNECTIONS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(match connection.details {
                    ConnectionDetails::Kafka { .. } => "kafka",
                    ConnectionDetails::Csr { .. } => "confluent-schema-registry",
                    ConnectionDetails::Postgres { .. } => "postgres",
                    ConnectionDetails::Aws(..) => "aws",
                    ConnectionDetails::AwsPrivatelink(..) => "aws-privatelink",
                    ConnectionDetails::Ssh { .. } => "ssh-tunnel",
                    ConnectionDetails::MySql { .. } => "mysql",
                    ConnectionDetails::SqlServer(_) => "sql-server",
                    ConnectionDetails::IcebergCatalog(_) => "iceberg-catalog",
                }),
                Datum::String(&owner_id.to_string()),
                privileges,
                Datum::String(&connection.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        )];
        match connection.details {
            ConnectionDetails::Kafka(ref kafka) => {
                updates.extend(self.pack_kafka_connection_update(id, kafka, diff));
            }
            ConnectionDetails::Aws(ref aws_config) => {
                match self.pack_aws_connection_update(id, aws_config, diff) {
                    Ok(update) => {
                        updates.push(update);
                    }
                    Err(e) => {
                        tracing::error!(%id, %e, "failed writing row to mz_aws_connections table");
                    }
                }
            }
            ConnectionDetails::AwsPrivatelink(_) => {
                if let Some(aws_principal_context) = self.aws_principal_context.as_ref() {
                    updates.push(self.pack_aws_privatelink_connection_update(
                        id,
                        aws_principal_context,
                        diff,
                    ));
                } else {
                    tracing::error!(%id, "missing AWS principal context; cannot write row to mz_aws_privatelink_connections table");
                }
            }
            ConnectionDetails::Ssh {
                ref key_1,
                ref key_2,
                ..
            } => {
                updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff));
            }
            ConnectionDetails::Csr(_)
            | ConnectionDetails::Postgres(_)
            | ConnectionDetails::MySql(_)
            | ConnectionDetails::SqlServer(_)
            | ConnectionDetails::IcebergCatalog(_) => (),
        };
        updates
    }

    pub(crate) fn pack_ssh_tunnel_connection_update(
        &self,
        id: CatalogItemId,
        key_1: &SshKey,
        key_2: &SshKey,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        BuiltinTableUpdate::row(
            &*MZ_SSH_TUNNEL_CONNECTIONS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(key_1.public_key().as_str()),
                Datum::String(key_2.public_key().as_str()),
            ]),
            diff,
        )
    }

    fn pack_kafka_connection_update(
        &self,
        id: CatalogItemId,
        kafka: &KafkaConnection<ReferencedConnection>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let progress_topic = kafka.progress_topic(&self.config.connection_context, id);
        let mut row = Row::default();
        row.packer()
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: kafka.brokers.len(),
                }],
                kafka
                    .brokers
                    .iter()
                    .map(|broker| Datum::String(&broker.address)),
            )
            .expect("kafka.brokers is 1 dimensional, and its length is used for the array length");
        let brokers = row.unpack_first();
        vec![BuiltinTableUpdate::row(
            &*MZ_KAFKA_CONNECTIONS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                brokers,
                Datum::String(&progress_topic),
            ]),
            diff,
        )]
    }

    pub fn pack_aws_privatelink_connection_update(
        &self,
        connection_id: CatalogItemId,
        aws_principal_context: &AwsPrincipalContext,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let id = &MZ_AWS_PRIVATELINK_CONNECTIONS;
        let row = Row::pack_slice(&[
            Datum::String(&connection_id.to_string()),
            Datum::String(&aws_principal_context.to_principal_string(connection_id)),
        ]);
        BuiltinTableUpdate::row(id, row, diff)
    }

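    /// Packs a row for `mz_aws_connections`. Credentials-based and
    /// assume-role-based connections populate disjoint subsets of the
    /// columns; secret values are reported by secret ID, never in cleartext.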
    pub fn pack_aws_connection_update(
        &self,
        connection_id: CatalogItemId,
        aws_config: &AwsConnection,
        diff: Diff,
    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, anyhow::Error> {
        let id = &MZ_AWS_CONNECTIONS;

        let mut access_key_id = None;
        let mut access_key_id_secret_id = None;
        let mut secret_access_key_secret_id = None;
        let mut session_token = None;
        let mut session_token_secret_id = None;
        let mut assume_role_arn = None;
        let mut assume_role_session_name = None;
        let mut principal = None;
        let mut external_id = None;
        let mut example_trust_policy = None;
        match &aws_config.auth {
            AwsAuth::Credentials(credentials) => {
                match &credentials.access_key_id {
                    StringOrSecret::String(s) => access_key_id = Some(s.as_str()),
                    StringOrSecret::Secret(s) => access_key_id_secret_id = Some(s.to_string()),
                }
                secret_access_key_secret_id = Some(credentials.secret_access_key.to_string());
                match credentials.session_token.as_ref() {
                    None => (),
                    Some(StringOrSecret::String(s)) => session_token = Some(s.as_str()),
                    Some(StringOrSecret::Secret(s)) => {
                        session_token_secret_id = Some(s.to_string())
                    }
                }
            }
            AwsAuth::AssumeRole(assume_role) => {
                assume_role_arn = Some(assume_role.arn.as_str());
                assume_role_session_name = assume_role.session_name.as_deref();
                principal = self
                    .config
                    .connection_context
                    .aws_connection_role_arn
                    .as_deref();
                external_id =
                    Some(assume_role.external_id(&self.config.connection_context, connection_id)?);
                example_trust_policy = {
                    let policy = assume_role
                        .example_trust_policy(&self.config.connection_context, connection_id)?;
                    let policy = Jsonb::from_serde_json(policy).expect("valid json");
                    Some(policy.into_row())
                };
            }
        }

        let row = Row::pack_slice(&[
            Datum::String(&connection_id.to_string()),
            Datum::from(aws_config.endpoint.as_deref()),
            Datum::from(aws_config.region.as_deref()),
            Datum::from(access_key_id),
            Datum::from(access_key_id_secret_id.as_deref()),
            Datum::from(secret_access_key_secret_id.as_deref()),
            Datum::from(session_token),
            Datum::from(session_token_secret_id.as_deref()),
            Datum::from(assume_role_arn),
            Datum::from(assume_role_session_name),
            Datum::from(principal),
            Datum::from(external_id.as_deref()),
            Datum::from(example_trust_policy.as_ref().map(|p| p.into_element())),
        ]);

        Ok(BuiltinTableUpdate::row(id, row, diff))
    }

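    /// Packs a row for `mz_views`, including the view's definition rendered
    /// in stable form and both the raw and redacted `CREATE` SQL.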
    fn pack_view_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        view: &View,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let create_stmt = mz_sql::parse::parse(&view.create_sql)
            .unwrap_or_else(|e| {
                panic!(
                    "create_sql cannot be invalid: `{}` --- error: `{}`",
                    view.create_sql, e
                )
            })
            .into_element()
            .ast;
        let query = match &create_stmt {
            Statement::CreateView(stmt) => &stmt.definition.query,
            _ => unreachable!(),
        };

        let mut query_string = query.to_ast_string_stable();
        query_string.push(';');

        vec![BuiltinTableUpdate::row(
            &*MZ_VIEWS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&query_string),
                Datum::String(&owner_id.to_string()),
                privileges,
                Datum::String(&view.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        )]
    }

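    /// Packs updates for `mz_materialized_views` and
    /// `mz_materialized_view_refresh_strategies`. A materialized view without
    /// an explicit refresh schedule reports the `on-commit` strategy.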
    fn pack_materialized_view_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        mview: &MaterializedView,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let create_stmt = mz_sql::parse::parse(&mview.create_sql)
            .unwrap_or_else(|e| {
                panic!(
                    "create_sql cannot be invalid: `{}` --- error: `{}`",
                    mview.create_sql, e
                )
            })
            .into_element()
            .ast;
        let query_string = match &create_stmt {
            Statement::CreateMaterializedView(stmt) => {
                let mut query_string = stmt.query.to_ast_string_stable();
                query_string.push(';');
                query_string
            }
            _ => unreachable!(),
        };

        let mut updates = Vec::new();

        updates.push(BuiltinTableUpdate::row(
            &*MZ_MATERIALIZED_VIEWS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&mview.cluster_id.to_string()),
                Datum::String(&query_string),
                Datum::String(&owner_id.to_string()),
                privileges,
                Datum::String(&mview.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        ));

        if let Some(refresh_schedule) = &mview.refresh_schedule {
            assert!(!refresh_schedule.is_empty());
            for RefreshEvery {
                interval,
                aligned_to,
            } in refresh_schedule.everies.iter()
            {
                let aligned_to_dt = mz_ore::now::to_datetime(
                    <&Timestamp as TryInto<u64>>::try_into(aligned_to).expect("undoes planning"),
                );
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String("every"),
                        Datum::Interval(
                            Interval::from_duration(interval).expect(
                                "planning ensured that this is convertible back to Interval",
                            ),
                        ),
                        Datum::TimestampTz(aligned_to_dt.try_into().expect("undoes planning")),
                        Datum::Null,
                    ]),
                    diff,
                ));
            }
            for at in refresh_schedule.ats.iter() {
                let at_dt = mz_ore::now::to_datetime(
                    <&Timestamp as TryInto<u64>>::try_into(at).expect("undoes planning"),
                );
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String("at"),
                        Datum::Null,
                        Datum::Null,
                        Datum::TimestampTz(at_dt.try_into().expect("undoes planning")),
                    ]),
                    diff,
                ));
            }
        } else {
            updates.push(BuiltinTableUpdate::row(
                &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String("on-commit"),
                    Datum::Null,
                    Datum::Null,
                    Datum::Null,
                ]),
                diff,
            ));
        }

        updates
    }

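    /// Packs a row for `mz_continual_tasks`. The task's statements are
    /// rendered in stable form and joined with `"; "` into a single
    /// definition string.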
    fn pack_continual_task_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        ct: &ContinualTask,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let create_stmt = mz_sql::parse::parse(&ct.create_sql)
            .unwrap_or_else(|e| {
                panic!(
                    "create_sql cannot be invalid: `{}` --- error: `{}`",
                    ct.create_sql, e
                )
            })
            .into_element()
            .ast;
        let query_string = match &create_stmt {
            Statement::CreateContinualTask(stmt) => {
                let mut query_string = String::new();
                for stmt in &stmt.stmts {
                    let s = match stmt {
                        ContinualTaskStmt::Insert(stmt) => stmt.to_ast_string_stable(),
                        ContinualTaskStmt::Delete(stmt) => stmt.to_ast_string_stable(),
                    };
                    if query_string.is_empty() {
                        query_string = s;
                    } else {
                        query_string.push_str("; ");
                        query_string.push_str(&s);
                    }
                }
                query_string
            }
            _ => unreachable!(),
        };

        vec![BuiltinTableUpdate::row(
            &*MZ_CONTINUAL_TASKS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&ct.cluster_id.to_string()),
                Datum::String(&query_string),
                Datum::String(&owner_id.to_string()),
                privileges,
                Datum::String(&ct.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        )]
    }

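    /// Packs updates for `mz_sinks`, plus an `mz_kafka_sinks` or
    /// `mz_iceberg_sinks` row carrying connection-specific details.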
    fn pack_sink_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        sink: &Sink,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = vec![];
        match &sink.connection {
            StorageSinkConnection::Kafka(KafkaSinkConnection {
                topic: topic_name, ..
            }) => {
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_KAFKA_SINKS,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String(topic_name.as_str()),
                    ]),
                    diff,
                ));
            }
            StorageSinkConnection::Iceberg(IcebergSinkConnection {
                namespace, table, ..
            }) => {
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_ICEBERG_SINKS,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String(namespace.as_str()),
                        Datum::String(table.as_str()),
                    ]),
                    diff,
                ));
            }
        };

        let create_stmt = mz_sql::parse::parse(&sink.create_sql)
            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", sink.create_sql))
            .into_element()
            .ast;

        let envelope = sink.envelope();

        let combined_format = sink.combined_format();
        let (key_format, value_format) = match sink.formats() {
            Some((key_format, value_format)) => (key_format, Some(value_format)),
            None => (None, None),
        };

        updates.push(BuiltinTableUpdate::row(
            &*MZ_SINKS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(sink.connection.name()),
                Datum::from(sink.connection_id().map(|id| id.to_string()).as_deref()),
                Datum::Null,
                Datum::from(envelope),
                Datum::from(combined_format.as_ref().map(|f| f.as_ref())),
                Datum::from(key_format),
                Datum::from(value_format),
                Datum::String(&sink.cluster_id.to_string()),
                Datum::String(&owner_id.to_string()),
                Datum::String(&sink.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        ));

        updates
    }

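    /// Packs updates for `mz_indexes` and one `mz_index_columns` row per
    /// index key. Keys that are plain column references report a field
    /// number; all other keys report the key expression's SQL instead.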
    fn pack_index_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        name: &str,
        owner_id: &RoleId,
        index: &Index,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = vec![];

        let create_stmt = mz_sql::parse::parse(&index.create_sql)
            .unwrap_or_else(|e| {
                panic!(
                    "create_sql cannot be invalid: `{}` --- error: `{}`",
                    index.create_sql, e
                )
            })
            .into_element()
            .ast;

        let key_sqls = match &create_stmt {
            Statement::CreateIndex(CreateIndexStatement { key_parts, .. }) => key_parts
                .as_ref()
                .expect("key_parts is filled in during planning"),
            _ => unreachable!(),
        };
        let on_item_id = self.get_entry_by_global_id(&index.on).id();

        updates.push(BuiltinTableUpdate::row(
            &*MZ_INDEXES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(name),
                Datum::String(&on_item_id.to_string()),
                Datum::String(&index.cluster_id.to_string()),
                Datum::String(&owner_id.to_string()),
                Datum::String(&index.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        ));

        for (i, key) in index.keys.iter().enumerate() {
            let on_entry = self.get_entry_by_global_id(&index.on);
            let nullable = key
                .typ(
                    &on_entry
                        .desc(&self.resolve_full_name(on_entry.name(), on_entry.conn_id()))
                        .expect("can only create indexes on items with a valid description")
                        .typ()
                        .column_types,
                )
                .nullable;
            let seq_in_index = u64::cast_from(i + 1);
            let key_sql = key_sqls
                .get(i)
                .expect("missing sql information for index key")
                .to_ast_string_simple();
            let (field_number, expression) = match key {
                MirScalarExpr::Column(col, _) => {
                    (Datum::UInt64(u64::cast_from(*col + 1)), Datum::Null)
                }
                _ => (Datum::Null, Datum::String(&key_sql)),
            };
            updates.push(BuiltinTableUpdate::row(
                &*MZ_INDEX_COLUMNS,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::UInt64(seq_in_index),
                    field_number,
                    expression,
                    Datum::from(nullable),
                ]),
                diff,
            ));
        }

        updates
    }

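    /// Packs updates for `mz_types`, a row in the category-specific table
    /// (`mz_array_types`, `mz_list_types`, `mz_map_types`, `mz_pseudo_types`,
    /// or `mz_base_types`), and, when present, `mz_type_pg_metadata`.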
    fn pack_type_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        typ: &Type,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut out = vec![];

        let redacted = typ.create_sql.as_ref().map(|create_sql| {
            mz_sql::parse::parse(create_sql)
                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
                .into_element()
                .ast
                .to_ast_string_redacted()
        });

        out.push(BuiltinTableUpdate::row(
            &*MZ_TYPES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&TypeCategory::from_catalog_type(&typ.details.typ).to_string()),
                Datum::String(&owner_id.to_string()),
                privileges,
                if let Some(create_sql) = &typ.create_sql {
                    Datum::String(create_sql)
                } else {
                    Datum::Null
                },
                if let Some(redacted) = &redacted {
                    Datum::String(redacted)
                } else {
                    Datum::Null
                },
            ]),
            diff,
        ));

        let mut row = Row::default();
        let mut packer = row.packer();

        fn append_modifier(packer: &mut RowPacker<'_>, mods: &[i64]) {
            if mods.is_empty() {
                packer.push(Datum::Null);
            } else {
                packer.push_list(mods.iter().map(|m| Datum::Int64(*m)));
            }
        }

        let index_id = match &typ.details.typ {
            CatalogType::Array {
                element_reference: element_id,
            } => {
                packer.push(Datum::String(&id.to_string()));
                packer.push(Datum::String(&element_id.to_string()));
                &MZ_ARRAY_TYPES
            }
            CatalogType::List {
                element_reference: element_id,
                element_modifiers,
            } => {
                packer.push(Datum::String(&id.to_string()));
                packer.push(Datum::String(&element_id.to_string()));
                append_modifier(&mut packer, element_modifiers);
                &MZ_LIST_TYPES
            }
            CatalogType::Map {
                key_reference: key_id,
                value_reference: value_id,
                key_modifiers,
                value_modifiers,
            } => {
                packer.push(Datum::String(&id.to_string()));
                packer.push(Datum::String(&key_id.to_string()));
                packer.push(Datum::String(&value_id.to_string()));
                append_modifier(&mut packer, key_modifiers);
                append_modifier(&mut packer, value_modifiers);
                &MZ_MAP_TYPES
            }
            CatalogType::Pseudo => {
                packer.push(Datum::String(&id.to_string()));
                &MZ_PSEUDO_TYPES
            }
            _ => {
                packer.push(Datum::String(&id.to_string()));
                &MZ_BASE_TYPES
            }
        };
        out.push(BuiltinTableUpdate::row(index_id, row, diff));

        if let Some(pg_metadata) = &typ.details.pg_metadata {
            out.push(BuiltinTableUpdate::row(
                &*MZ_TYPE_PG_METADATA,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::UInt32(pg_metadata.typinput_oid),
                    Datum::UInt32(pg_metadata.typreceive_oid),
                ]),
                diff,
            ));
        }

        out
    }

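    /// Packs one `mz_functions` row per implementation of the function, and
    /// an `mz_aggregates` row per implementation of an aggregate function.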
    fn pack_func_update(
        &self,
        id: CatalogItemId,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        func: &Func,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = vec![];
        for func_impl_details in func.inner.func_impls() {
            let arg_type_ids = func_impl_details
                .arg_typs
                .iter()
                .map(|typ| self.get_system_type(typ).id().to_string())
                .collect::<Vec<_>>();

            let mut row = Row::default();
            row.packer()
                .try_push_array(
                    &[ArrayDimension {
                        lower_bound: 1,
                        length: arg_type_ids.len(),
                    }],
                    arg_type_ids.iter().map(|id| Datum::String(id)),
                )
                .expect(
                    "arg_type_ids is 1 dimensional, and its length is used for the array length",
                );
            let arg_type_ids = row.unpack_first();

            updates.push(BuiltinTableUpdate::row(
                &*MZ_FUNCTIONS,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::UInt32(func_impl_details.oid),
                    Datum::String(&schema_id.to_string()),
                    Datum::String(name),
                    arg_type_ids,
                    Datum::from(
                        func_impl_details
                            .variadic_typ
                            .map(|typ| self.get_system_type(typ).id().to_string())
                            .as_deref(),
                    ),
                    Datum::from(
                        func_impl_details
                            .return_typ
                            .map(|typ| self.get_system_type(typ).id().to_string())
                            .as_deref(),
                    ),
                    func_impl_details.return_is_set.into(),
                    Datum::String(&owner_id.to_string()),
                ]),
                diff,
            ));

            if let mz_sql::func::Func::Aggregate(_) = func.inner {
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_AGGREGATES,
                    Row::pack_slice(&[
                        Datum::UInt32(func_impl_details.oid),
                        Datum::String("n"),
                        Datum::Int16(0),
                    ]),
                    diff,
                ));
            }
        }
        updates
    }

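    /// Packs a row for `mz_operators` for one implementation of the given
    /// operator.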
    pub fn pack_op_update(
        &self,
        operator: &str,
        func_impl_details: FuncImplCatalogDetails,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let arg_type_ids = func_impl_details
            .arg_typs
            .iter()
            .map(|typ| self.get_system_type(typ).id().to_string())
            .collect::<Vec<_>>();

        let mut row = Row::default();
        row.packer()
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: arg_type_ids.len(),
                }],
                arg_type_ids.iter().map(|id| Datum::String(id)),
            )
            .expect("arg_type_ids is 1 dimensional, and its length is used for the array length");
        let arg_type_ids = row.unpack_first();

        BuiltinTableUpdate::row(
            &*MZ_OPERATORS,
            Row::pack_slice(&[
                Datum::UInt32(func_impl_details.oid),
                Datum::String(operator),
                arg_type_ids,
                Datum::from(
                    func_impl_details
                        .return_typ
                        .map(|typ| self.get_system_type(typ).id().to_string())
                        .as_deref(),
                ),
            ]),
            diff,
        )
    }

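    /// Packs a row for `mz_secrets`. Secret values themselves are never
    /// written to builtin tables.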
    fn pack_secret_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_SECRETS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&owner_id.to_string()),
                privileges,
            ]),
            diff,
        )]
    }

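    /// Packs a row for `mz_audit_events` from a versioned audit log event,
    /// serializing the event details as JSON.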
    pub fn pack_audit_log_update(
        &self,
        event: &VersionedEvent,
        diff: Diff,
    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
        let (event_type, object_type, details, user, occurred_at): (
            &EventType,
            &ObjectType,
            &EventDetails,
            &Option<String>,
            u64,
        ) = match event {
            VersionedEvent::V1(ev) => (
                &ev.event_type,
                &ev.object_type,
                &ev.details,
                &ev.user,
                ev.occurred_at,
            ),
        };
        let details = Jsonb::from_serde_json(details.as_json())
            .map_err(|e| {
                Error::new(ErrorKind::Unstructured(format!(
                    "could not pack audit log update: {}",
                    e
                )))
            })?
            .into_row();
        let details = details
            .iter()
            .next()
            .expect("details created above with a single jsonb column");
        let dt = mz_ore::now::to_datetime(occurred_at);
        let id = event.sortable_id();
        Ok(BuiltinTableUpdate::row(
            &*MZ_AUDIT_EVENTS,
            Row::pack_slice(&[
                Datum::UInt64(id),
                Datum::String(&format!("{}", event_type)),
                Datum::String(&format!("{}", object_type)),
                details,
                match user {
                    Some(user) => Datum::String(user),
                    None => Datum::Null,
                },
                Datum::TimestampTz(dt.try_into().expect("must fit")),
            ]),
            diff,
        ))
    }

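    /// Packs a storage usage event into a [`BuiltinTableUpdate`] for
    /// `MZ_STORAGE_USAGE_BY_SHARD`: the event ID, the shard ID (if any), the
    /// size in bytes, and the collection timestamp.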
    pub fn pack_storage_usage_update(
        &self,
        VersionedStorageUsage::V1(event): VersionedStorageUsage,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let id = &MZ_STORAGE_USAGE_BY_SHARD;
        let row = Row::pack_slice(&[
            Datum::UInt64(event.id),
            Datum::from(event.shard_id.as_deref()),
            Datum::UInt64(event.size_bytes),
            Datum::TimestampTz(
                mz_ore::now::to_datetime(event.collection_timestamp)
                    .try_into()
                    .expect("must fit"),
            ),
        ]);
        BuiltinTableUpdate::row(id, row, diff)
    }

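    /// Packs an egress IP network into a [`BuiltinTableUpdate`] for
    /// `MZ_EGRESS_IPS`, emitting the address, the prefix length, and the
    /// CIDR-formatted text. The update is always an insertion (`Diff::ONE`).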
    pub fn pack_egress_ip_update(
        &self,
        ip: &IpNet,
    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
        let id = &MZ_EGRESS_IPS;
        let addr = ip.addr();
        let row = Row::pack_slice(&[
            Datum::String(&addr.to_string()),
            Datum::Int32(ip.prefix_len().into()),
            Datum::String(&format!("{}/{}", addr, ip.prefix_len())),
        ]);
        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
    }

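    /// Packs a validated license key into a [`BuiltinTableUpdate`] for
    /// `MZ_LICENSE_KEYS`. The `expiration` and `not_before` fields appear to
    /// be expressed in seconds, hence the `* 1000` conversions to the
    /// millisecond timestamps that `mz_ore::now::to_datetime` expects.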
    pub fn pack_license_key_update(
        &self,
        license_key: &ValidatedLicenseKey,
    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
        let id = &MZ_LICENSE_KEYS;
        let row = Row::pack_slice(&[
            Datum::String(&license_key.id),
            Datum::String(&license_key.organization),
            Datum::String(&license_key.environment_id),
            Datum::TimestampTz(
                mz_ore::now::to_datetime(license_key.expiration * 1000)
                    .try_into()
                    .expect("must fit"),
            ),
            Datum::TimestampTz(
                mz_ore::now::to_datetime(license_key.not_before * 1000)
                    .try_into()
                    .expect("must fit"),
            ),
        ]);
        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
    }

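    /// Packs one `MZ_CLUSTER_REPLICA_SIZES` row per enabled entry in the
    /// replica size allocation map, skipping sizes marked as disabled.
    /// Missing CPU and memory limits are reported as the maximum
    /// representable limits, and a missing disk limit as
    /// `DiskLimit::ARBITRARY`.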
    pub fn pack_all_replica_size_updates(&self) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = Vec::new();
        for (size, alloc) in &self.cluster_replica_sizes.0 {
            if alloc.disabled {
                continue;
            }

            let cpu_limit = alloc.cpu_limit.unwrap_or(CpuLimit::MAX);
            let MemoryLimit(ByteSize(memory_bytes)) =
                (alloc.memory_limit).unwrap_or(MemoryLimit::MAX);
            let DiskLimit(ByteSize(disk_bytes)) =
                (alloc.disk_limit).unwrap_or(DiskLimit::ARBITRARY);

            let row = Row::pack_slice(&[
                size.as_str().into(),
                u64::cast_from(alloc.scale).into(),
                u64::cast_from(alloc.workers).into(),
                cpu_limit.as_nanocpus().into(),
                memory_bytes.into(),
                disk_bytes.into(),
                (alloc.credits_per_hour).into(),
            ]);

            updates.push(BuiltinTableUpdate::row(
                &*MZ_CLUSTER_REPLICA_SIZES,
                row,
                Diff::ONE,
            ));
        }

        updates
    }

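    /// Packs an active `SUBSCRIBE` into a [`BuiltinTableUpdate`] for
    /// `MZ_SUBSCRIPTIONS`, recording the session UUID, the cluster it runs
    /// on, its start time, and the list of objects it depends on.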
    pub fn pack_subscribe_update(
        &self,
        id: GlobalId,
        subscribe: &ActiveSubscribe,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let mut row = Row::default();
        let mut packer = row.packer();
        packer.push(Datum::String(&id.to_string()));
        packer.push(Datum::Uuid(subscribe.session_uuid));
        packer.push(Datum::String(&subscribe.cluster_id.to_string()));

        let start_dt = mz_ore::now::to_datetime(subscribe.start_time);
        packer.push(Datum::TimestampTz(start_dt.try_into().expect("must fit")));

        let depends_on: Vec<_> = subscribe
            .depends_on
            .iter()
            .map(|id| id.to_string())
            .collect();
        packer.push_list(depends_on.iter().map(|s| Datum::String(s)));

        BuiltinTableUpdate::row(&*MZ_SUBSCRIPTIONS, row, diff)
    }

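    /// Packs a connection's metadata into a [`BuiltinTableUpdate`] for
    /// `MZ_SESSIONS`: session UUID, connection ID, authenticated role, client
    /// IP (null when unknown), and connection time.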
    pub fn pack_session_update(
        &self,
        conn: &ConnMeta,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let connect_dt = mz_ore::now::to_datetime(conn.connected_at());
        BuiltinTableUpdate::row(
            &*MZ_SESSIONS,
            Row::pack_slice(&[
                Datum::Uuid(conn.uuid()),
                Datum::UInt32(conn.conn_id().unhandled()),
                Datum::String(&conn.authenticated_role_id().to_string()),
                Datum::from(conn.client_ip().map(|ip| ip.to_string()).as_deref()),
                Datum::TimestampTz(connect_dt.try_into().expect("must fit")),
            ]),
            diff,
        )
    }

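    /// Packs a default privilege entry into a [`BuiltinTableUpdate`] for
    /// `MZ_DEFAULT_PRIVILEGES`. The database and schema IDs are packed as
    /// nullable datums, since a default privilege may be scoped globally, to
    /// a database, or to a schema.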
    pub fn pack_default_privileges_update(
        &self,
        default_privilege_object: &DefaultPrivilegeObject,
        grantee: &RoleId,
        acl_mode: &AclMode,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        BuiltinTableUpdate::row(
            &*MZ_DEFAULT_PRIVILEGES,
            Row::pack_slice(&[
                default_privilege_object.role_id.to_string().as_str().into(),
                default_privilege_object
                    .database_id
                    .map(|database_id| database_id.to_string())
                    .as_deref()
                    .into(),
                default_privilege_object
                    .schema_id
                    .map(|schema_id| schema_id.to_string())
                    .as_deref()
                    .into(),
                default_privilege_object
                    .object_type
                    .to_string()
                    .to_lowercase()
                    .as_str()
                    .into(),
                grantee.to_string().as_str().into(),
                acl_mode.to_string().as_str().into(),
            ]),
            diff,
        )
    }

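    /// Packs a single system-level [`MzAclItem`] into a [`BuiltinTableUpdate`]
    /// for `MZ_SYSTEM_PRIVILEGES`.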
    pub fn pack_system_privileges_update(
        &self,
        privileges: MzAclItem,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        BuiltinTableUpdate::row(
            &*MZ_SYSTEM_PRIVILEGES,
            Row::pack_slice(&[privileges.into()]),
            diff,
        )
    }

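    /// Flattens a [`PrivilegeMap`] into a [`Row`] holding a one-dimensional
    /// array of `mz_aclitem` datums, the shape expected by the `privileges`
    /// column of builtin tables such as `MZ_SECRETS`.
    ///
    /// A rough sketch of how the result is consumed (illustrative only):
    ///
    /// ```ignore
    /// let row = state.pack_privilege_array_row(entry.privileges());
    /// let privileges = row.unpack_first(); // a Datum::Array of MzAclItems
    /// ```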
    fn pack_privilege_array_row(&self, privileges: &PrivilegeMap) -> Row {
        let mut row = Row::default();
        let flat_privileges: Vec<_> = privileges.all_values_owned().collect();
        row.packer()
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: flat_privileges.len(),
                }],
                flat_privileges
                    .into_iter()
                    .map(|mz_acl_item| Datum::MzAclItem(mz_acl_item.clone())),
            )
            .expect("privileges is 1 dimensional, and its length is used for the array length");
        row
    }

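    /// Packs a comment into a [`BuiltinTableUpdate`] for `MZ_COMMENTS`. The
    /// object ID string is derived from whichever [`CommentObjectId`] variant
    /// is being commented on, the object type is rendered with the audit-log
    /// naming, and `column_pos` is null for comments on the object itself
    /// rather than on one of its columns.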
    pub fn pack_comment_update(
        &self,
        object_id: CommentObjectId,
        column_pos: Option<usize>,
        comment: &str,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let object_type = mz_sql::catalog::ObjectType::from(object_id);
        let audit_type = super::object_type_to_audit_object_type(object_type);
        let object_type_str = audit_type.to_string();

        let object_id_str = match object_id {
            CommentObjectId::Table(global_id)
            | CommentObjectId::View(global_id)
            | CommentObjectId::MaterializedView(global_id)
            | CommentObjectId::Source(global_id)
            | CommentObjectId::Sink(global_id)
            | CommentObjectId::Index(global_id)
            | CommentObjectId::Func(global_id)
            | CommentObjectId::Connection(global_id)
            | CommentObjectId::Secret(global_id)
            | CommentObjectId::Type(global_id)
            | CommentObjectId::ContinualTask(global_id) => global_id.to_string(),
            CommentObjectId::Role(role_id) => role_id.to_string(),
            CommentObjectId::Database(database_id) => database_id.to_string(),
            CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
            CommentObjectId::Cluster(cluster_id) => cluster_id.to_string(),
            CommentObjectId::ClusterReplica((_, replica_id)) => replica_id.to_string(),
            CommentObjectId::NetworkPolicy(network_policy_id) => network_policy_id.to_string(),
        };
        let column_pos_datum = match column_pos {
            Some(pos) => {
                let pos =
                    i32::try_from(pos).expect("we constrain this value in the planning layer");
                Datum::Int32(pos)
            }
            None => Datum::Null,
        };

        BuiltinTableUpdate::row(
            &*MZ_COMMENTS,
            Row::pack_slice(&[
                Datum::String(&object_id_str),
                Datum::String(&object_type_str),
                column_pos_datum,
                Datum::String(comment),
            ]),
            diff,
        )
    }

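    /// Packs a webhook source into a [`BuiltinTableUpdate`] for
    /// `MZ_WEBHOOKS_SOURCES`, recording its ID, name, and ingestion URL.
    /// Panics if `item_id` does not refer to a webhook source, since the URL
    /// lookup is expected to succeed.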
    pub fn pack_webhook_source_update(
        &self,
        item_id: CatalogItemId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let url = self
            .try_get_webhook_url(&item_id)
            .expect("webhook source should exist");
        let url = url.to_string();
        let name = &self.get_entry(&item_id).name().item;
        let id_str = item_id.to_string();

        BuiltinTableUpdate::row(
            &*MZ_WEBHOOKS_SOURCES,
            Row::pack_slice(&[
                Datum::String(&id_str),
                Datum::String(name),
                Datum::String(&url),
            ]),
            diff,
        )
    }

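    /// Packs one `MZ_SOURCE_REFERENCES` update per upstream reference of a
    /// source. The optional namespace and column list are packed as null when
    /// absent, and the source's `updated_at` timestamp is repeated on every
    /// reference row.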
    pub fn pack_source_references_update(
        &self,
        source_references: &SourceReferences,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let source_id = source_references.source_id.to_string();
        let updated_at = &source_references.updated_at;
        source_references
            .references
            .iter()
            .map(|reference| {
                let mut row = Row::default();
                let mut packer = row.packer();
                packer.extend([
                    Datum::String(&source_id),
                    reference
                        .namespace
                        .as_ref()
                        .map(|s| Datum::String(s))
                        .unwrap_or(Datum::Null),
                    Datum::String(&reference.name),
                    Datum::TimestampTz(
                        mz_ore::now::to_datetime(*updated_at)
                            .try_into()
                            .expect("must fit"),
                    ),
                ]);
                if !reference.columns.is_empty() {
                    packer
                        .try_push_array(
                            &[ArrayDimension {
                                lower_bound: 1,
                                length: reference.columns.len(),
                            }],
                            reference.columns.iter().map(|col| Datum::String(col)),
                        )
                        .expect(
                            "columns is 1 dimensional, and its length is used for the array length",
                        );
                } else {
                    packer.push(Datum::Null);
                }

                BuiltinTableUpdate::row(&*MZ_SOURCE_REFERENCES, row, diff)
            })
            .collect()
    }
}