1mod notice;
11
12use bytesize::ByteSize;
13use ipnet::IpNet;
14use mz_adapter_types::compaction::CompactionWindow;
15use mz_adapter_types::dyncfgs::ENABLE_PASSWORD_AUTH;
16use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
17use mz_catalog::SYSTEM_CONN_ID;
18use mz_catalog::builtin::{
19 BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
20 MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICAS,
21 MZ_CLUSTER_SCHEDULES, MZ_CLUSTER_WORKLOAD_CLASSES, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS,
22 MZ_CONNECTIONS, MZ_CONTINUAL_TASKS, MZ_DATABASES, MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS,
23 MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES, MZ_ICEBERG_SINKS, MZ_INDEX_COLUMNS, MZ_INDEXES,
24 MZ_INTERNAL_CLUSTER_REPLICAS, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCE_TABLES,
25 MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES,
26 MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MATERIALIZED_VIEWS, MZ_MYSQL_SOURCE_TABLES,
27 MZ_NETWORK_POLICIES, MZ_NETWORK_POLICY_RULES, MZ_OBJECT_DEPENDENCIES, MZ_OPERATORS,
28 MZ_PENDING_CLUSTER_REPLICAS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES, MZ_PSEUDO_TYPES,
29 MZ_ROLE_AUTH, MZ_ROLE_MEMBERS, MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SCHEMAS, MZ_SECRETS,
30 MZ_SESSIONS, MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SOURCES, MZ_SQL_SERVER_SOURCE_TABLES,
31 MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES,
32 MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
33};
34use mz_catalog::config::AwsPrincipalContext;
35use mz_catalog::durable::SourceReferences;
36use mz_catalog::memory::error::{Error, ErrorKind};
37use mz_catalog::memory::objects::{
38 CatalogItem, ClusterVariant, Connection, ContinualTask, DataSourceDesc, Func, Index,
39 MaterializedView, Sink, Table, TableDataSource, Type, View,
40};
41use mz_controller::clusters::{
42 ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
43};
44use mz_controller_types::ClusterId;
45use mz_expr::MirScalarExpr;
46use mz_license_keys::ValidatedLicenseKey;
47use mz_orchestrator::{CpuLimit, DiskLimit, MemoryLimit};
48use mz_ore::cast::CastFrom;
49use mz_ore::collections::CollectionExt;
50use mz_persist_client::batch::ProtoBatch;
51use mz_repr::adt::array::ArrayDimension;
52use mz_repr::adt::interval::Interval;
53use mz_repr::adt::jsonb::Jsonb;
54use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
55use mz_repr::adt::regex;
56use mz_repr::network_policy_id::NetworkPolicyId;
57use mz_repr::refresh_schedule::RefreshEvery;
58use mz_repr::role_id::RoleId;
59use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, Row, RowPacker, SqlScalarType, Timestamp};
60use mz_sql::ast::{ContinualTaskStmt, CreateIndexStatement, Statement, UnresolvedItemName};
61use mz_sql::catalog::{
62 CatalogCluster, CatalogDatabase, CatalogSchema, CatalogType, DefaultPrivilegeObject,
63 TypeCategory,
64};
65use mz_sql::func::FuncImplCatalogDetails;
66use mz_sql::names::{
67 CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier,
68};
69use mz_sql::plan::{ClusterSchedule, ConnectionDetails, SshKey};
70use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
71use mz_sql::session::vars::SessionVars;
72use mz_sql_parser::ast::display::AstDisplay;
73use mz_storage_client::client::TableData;
74use mz_storage_types::connections::KafkaConnection;
75use mz_storage_types::connections::aws::{AwsAuth, AwsConnection};
76use mz_storage_types::connections::inline::ReferencedConnection;
77use mz_storage_types::connections::string_or_secret::StringOrSecret;
78use mz_storage_types::sinks::{IcebergSinkConnection, KafkaSinkConnection, StorageSinkConnection};
79use mz_storage_types::sources::{
80 GenericSourceConnection, KafkaSourceConnection, PostgresSourceConnection, SourceConnection,
81};
82use smallvec::smallvec;
83
84use crate::active_compute_sink::ActiveSubscribe;
86use crate::catalog::CatalogState;
87use crate::coord::ConnMeta;
88
/// An update destined for a builtin table.
///
/// `T` identifies the target table. In this file it is either a
/// `&'static BuiltinTable` (what the `pack_*` methods return) or a
/// [`CatalogItemId`] (the default, produced by
/// `CatalogState::resolve_builtin_table_update`).
#[derive(Debug, Clone)]
pub struct BuiltinTableUpdate<T = CatalogItemId> {
    /// Identifier of the table the update applies to.
    pub id: T,
    /// Payload of the update; the constructors in this file build either
    /// `TableData::Rows` or `TableData::Batches`.
    pub data: TableData,
}
97
98impl<T> BuiltinTableUpdate<T> {
99 pub fn row(id: T, row: Row, diff: Diff) -> BuiltinTableUpdate<T> {
101 BuiltinTableUpdate {
102 id,
103 data: TableData::Rows(vec![(row, diff)]),
104 }
105 }
106
107 pub fn batch(id: T, batch: ProtoBatch) -> BuiltinTableUpdate<T> {
108 BuiltinTableUpdate {
109 id,
110 data: TableData::Batches(smallvec![batch]),
111 }
112 }
113}
114
115impl CatalogState {
116 pub fn resolve_builtin_table_updates(
117 &self,
118 builtin_table_update: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
119 ) -> Vec<BuiltinTableUpdate<CatalogItemId>> {
120 builtin_table_update
121 .into_iter()
122 .map(|builtin_table_update| self.resolve_builtin_table_update(builtin_table_update))
123 .collect()
124 }
125
126 pub fn resolve_builtin_table_update(
127 &self,
128 BuiltinTableUpdate { id, data }: BuiltinTableUpdate<&'static BuiltinTable>,
129 ) -> BuiltinTableUpdate<CatalogItemId> {
130 let id = self.resolve_builtin_table(id);
131 BuiltinTableUpdate { id, data }
132 }
133
134 pub fn pack_depends_update(
135 &self,
136 depender: CatalogItemId,
137 dependee: CatalogItemId,
138 diff: Diff,
139 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
140 let row = Row::pack_slice(&[
141 Datum::String(&depender.to_string()),
142 Datum::String(&dependee.to_string()),
143 ]);
144 BuiltinTableUpdate::row(&*MZ_OBJECT_DEPENDENCIES, row, diff)
145 }
146
147 pub(super) fn pack_database_update(
148 &self,
149 database_id: &DatabaseId,
150 diff: Diff,
151 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
152 let database = self.get_database(database_id);
153 let row = self.pack_privilege_array_row(database.privileges());
154 let privileges = row.unpack_first();
155 BuiltinTableUpdate::row(
156 &*MZ_DATABASES,
157 Row::pack_slice(&[
158 Datum::String(&database.id.to_string()),
159 Datum::UInt32(database.oid),
160 Datum::String(database.name()),
161 Datum::String(&database.owner_id.to_string()),
162 privileges,
163 ]),
164 diff,
165 )
166 }
167
168 pub(super) fn pack_schema_update(
169 &self,
170 database_spec: &ResolvedDatabaseSpecifier,
171 schema_id: &SchemaId,
172 diff: Diff,
173 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
174 let (database_id, schema) = match database_spec {
175 ResolvedDatabaseSpecifier::Ambient => (None, &self.ambient_schemas_by_id[schema_id]),
176 ResolvedDatabaseSpecifier::Id(id) => (
177 Some(id.to_string()),
178 &self.database_by_id[id].schemas_by_id[schema_id],
179 ),
180 };
181 let row = self.pack_privilege_array_row(schema.privileges());
182 let privileges = row.unpack_first();
183 BuiltinTableUpdate::row(
184 &*MZ_SCHEMAS,
185 Row::pack_slice(&[
186 Datum::String(&schema_id.to_string()),
187 Datum::UInt32(schema.oid),
188 Datum::from(database_id.as_deref()),
189 Datum::String(&schema.name.schema),
190 Datum::String(&schema.owner_id.to_string()),
191 privileges,
192 ]),
193 diff,
194 )
195 }
196
197 pub(super) fn pack_role_auth_update(
198 &self,
199 id: RoleId,
200 diff: Diff,
201 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
202 let role_auth = self.get_role_auth(&id);
203 let role = self.get_role(&id);
204 BuiltinTableUpdate::row(
205 &*MZ_ROLE_AUTH,
206 Row::pack_slice(&[
207 Datum::String(&role_auth.role_id.to_string()),
208 Datum::UInt32(role.oid),
209 match &role_auth.password_hash {
210 Some(hash) => Datum::String(hash),
211 None => Datum::Null,
212 },
213 Datum::TimestampTz(
214 mz_ore::now::to_datetime(role_auth.updated_at)
215 .try_into()
216 .expect("must fit"),
217 ),
218 ]),
219 diff,
220 )
221 }
222
223 pub(super) fn pack_role_update(
224 &self,
225 id: RoleId,
226 diff: Diff,
227 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
228 match id {
229 RoleId::Public => vec![],
231 id => {
232 let role = self.get_role(&id);
233 let self_managed_auth_enabled = self
234 .system_config()
235 .get(ENABLE_PASSWORD_AUTH.name())
236 .ok()
237 .map(|v| v.value() == "on")
238 .unwrap_or(false);
239
240 let builtin_supers = [MZ_SYSTEM_ROLE_ID, MZ_SUPPORT_ROLE_ID];
241
242 let cloud_login_regex = "^[^@]+@[^@]+\\.[^@]+$";
262 let matches = regex::Regex::new(cloud_login_regex, true)
263 .expect("valid regex")
264 .is_match(&role.name);
265
266 let rolcanlogin = if self_managed_auth_enabled {
267 role.attributes.login.unwrap_or(false)
268 } else {
269 builtin_supers.contains(&role.id) || matches
270 };
271
272 let rolsuper = if self_managed_auth_enabled {
273 Datum::from(role.attributes.superuser.unwrap_or(false))
274 } else if builtin_supers.contains(&role.id) {
275 Datum::from(true)
276 } else {
277 Datum::Null
278 };
279
280 let role_update = BuiltinTableUpdate::row(
281 &*MZ_ROLES,
282 Row::pack_slice(&[
283 Datum::String(&role.id.to_string()),
284 Datum::UInt32(role.oid),
285 Datum::String(&role.name),
286 Datum::from(role.attributes.inherit),
287 Datum::from(rolcanlogin),
288 rolsuper,
289 ]),
290 diff,
291 );
292 let mut updates = vec![role_update];
293
294 let session_vars_reference = SessionVars::new_unchecked(
297 &mz_build_info::DUMMY_BUILD_INFO,
298 SYSTEM_USER.clone(),
299 None,
300 );
301
302 for (name, val) in role.vars() {
303 let result = session_vars_reference
304 .inspect(name)
305 .and_then(|var| var.check(val.borrow()));
306 let Ok(formatted_val) = result else {
307 tracing::error!(?name, ?val, ?result, "found invalid role default var");
310 continue;
311 };
312
313 let role_var_update = BuiltinTableUpdate::row(
314 &*MZ_ROLE_PARAMETERS,
315 Row::pack_slice(&[
316 Datum::String(&role.id.to_string()),
317 Datum::String(name),
318 Datum::String(&formatted_val),
319 ]),
320 diff,
321 );
322 updates.push(role_var_update);
323 }
324
325 updates
326 }
327 }
328 }
329
330 pub(super) fn pack_role_members_update(
331 &self,
332 role_id: RoleId,
333 member_id: RoleId,
334 diff: Diff,
335 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
336 let grantor_id = self
337 .get_role(&member_id)
338 .membership
339 .map
340 .get(&role_id)
341 .expect("catalog out of sync");
342 BuiltinTableUpdate::row(
343 &*MZ_ROLE_MEMBERS,
344 Row::pack_slice(&[
345 Datum::String(&role_id.to_string()),
346 Datum::String(&member_id.to_string()),
347 Datum::String(&grantor_id.to_string()),
348 ]),
349 diff,
350 )
351 }
352
353 pub(super) fn pack_cluster_update(
354 &self,
355 name: &str,
356 diff: Diff,
357 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
358 let id = self.clusters_by_name[name];
359 let cluster = &self.clusters_by_id[&id];
360 let row = self.pack_privilege_array_row(cluster.privileges());
361 let privileges = row.unpack_first();
362 let (size, disk, replication_factor, azs, introspection_debugging, introspection_interval) =
363 match &cluster.config.variant {
364 ClusterVariant::Managed(config) => (
365 Some(config.size.as_str()),
366 Some(self.cluster_replica_size_has_disk(&config.size)),
367 Some(config.replication_factor),
368 if config.availability_zones.is_empty() {
369 None
370 } else {
371 Some(config.availability_zones.clone())
372 },
373 Some(config.logging.log_logging),
374 config.logging.interval.map(|d| {
375 Interval::from_duration(&d)
376 .expect("planning ensured this convertible back to interval")
377 }),
378 ),
379 ClusterVariant::Unmanaged => (None, None, None, None, None, None),
380 };
381
382 let mut row = Row::default();
383 let mut packer = row.packer();
384 packer.extend([
385 Datum::String(&id.to_string()),
386 Datum::String(name),
387 Datum::String(&cluster.owner_id.to_string()),
388 privileges,
389 cluster.is_managed().into(),
390 size.into(),
391 replication_factor.into(),
392 disk.into(),
393 ]);
394 if let Some(azs) = azs {
395 packer.push_list(azs.iter().map(|az| Datum::String(az)));
396 } else {
397 packer.push(Datum::Null);
398 }
399 packer.push(Datum::from(introspection_debugging));
400 packer.push(Datum::from(introspection_interval));
401
402 let mut updates = Vec::new();
403
404 updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTERS, row, diff));
405
406 if let ClusterVariant::Managed(managed_config) = &cluster.config.variant {
407 let row = match managed_config.schedule {
408 ClusterSchedule::Manual => Row::pack_slice(&[
409 Datum::String(&id.to_string()),
410 Datum::String("manual"),
411 Datum::Null,
412 ]),
413 ClusterSchedule::Refresh {
414 hydration_time_estimate,
415 } => Row::pack_slice(&[
416 Datum::String(&id.to_string()),
417 Datum::String("on-refresh"),
418 Datum::Interval(
419 Interval::from_duration(&hydration_time_estimate)
420 .expect("planning ensured that this is convertible back to Interval"),
421 ),
422 ]),
423 };
424 updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTER_SCHEDULES, row, diff));
425 }
426
427 updates.push(BuiltinTableUpdate::row(
428 &*MZ_CLUSTER_WORKLOAD_CLASSES,
429 Row::pack_slice(&[
430 Datum::String(&id.to_string()),
431 Datum::from(cluster.config.workload_class.as_deref()),
432 ]),
433 diff,
434 ));
435
436 updates
437 }
438
439 pub(super) fn pack_cluster_replica_update(
440 &self,
441 cluster_id: ClusterId,
442 name: &str,
443 diff: Diff,
444 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
445 let cluster = &self.clusters_by_id[&cluster_id];
446 let id = cluster.replica_id(name).expect("Must exist");
447 let replica = cluster.replica(id).expect("Must exist");
448
449 let (size, disk, az, internal, pending) = match &replica.config.location {
450 ReplicaLocation::Managed(ManagedReplicaLocation {
453 size,
454 availability_zones: ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
455 allocation: _,
456 billed_as: _,
457 internal,
458 pending,
459 }) => (
460 Some(&**size),
461 Some(self.cluster_replica_size_has_disk(size)),
462 Some(az.as_str()),
463 *internal,
464 *pending,
465 ),
466 ReplicaLocation::Managed(ManagedReplicaLocation {
467 size,
468 availability_zones: _,
469 allocation: _,
470 billed_as: _,
471 internal,
472 pending,
473 }) => (
474 Some(&**size),
475 Some(self.cluster_replica_size_has_disk(size)),
476 None,
477 *internal,
478 *pending,
479 ),
480 _ => (None, None, None, false, false),
481 };
482
483 let cluster_replica_update = BuiltinTableUpdate::row(
484 &*MZ_CLUSTER_REPLICAS,
485 Row::pack_slice(&[
486 Datum::String(&id.to_string()),
487 Datum::String(name),
488 Datum::String(&cluster_id.to_string()),
489 Datum::from(size),
490 Datum::from(az),
491 Datum::String(&replica.owner_id.to_string()),
492 Datum::from(disk),
493 ]),
494 diff,
495 );
496
497 let mut updates = vec![cluster_replica_update];
498
499 if internal {
500 let update = BuiltinTableUpdate::row(
501 &*MZ_INTERNAL_CLUSTER_REPLICAS,
502 Row::pack_slice(&[Datum::String(&id.to_string())]),
503 diff,
504 );
505 updates.push(update);
506 }
507
508 if pending {
509 let update = BuiltinTableUpdate::row(
510 &*MZ_PENDING_CLUSTER_REPLICAS,
511 Row::pack_slice(&[Datum::String(&id.to_string())]),
512 diff,
513 );
514 updates.push(update);
515 }
516
517 updates
518 }
519
520 pub(crate) fn pack_network_policy_update(
521 &self,
522 policy_id: &NetworkPolicyId,
523 diff: Diff,
524 ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, Error> {
525 let policy = self.get_network_policy(policy_id);
526 let row = self.pack_privilege_array_row(&policy.privileges);
527 let privileges = row.unpack_first();
528 let mut updates = Vec::new();
529 for ref rule in policy.rules.clone() {
530 updates.push(BuiltinTableUpdate::row(
531 &*MZ_NETWORK_POLICY_RULES,
532 Row::pack_slice(&[
533 Datum::String(&rule.name),
534 Datum::String(&policy.id.to_string()),
535 Datum::String(&rule.action.to_string()),
536 Datum::String(&rule.address.to_string()),
537 Datum::String(&rule.direction.to_string()),
538 ]),
539 diff,
540 ));
541 }
542 updates.push(BuiltinTableUpdate::row(
543 &*MZ_NETWORK_POLICIES,
544 Row::pack_slice(&[
545 Datum::String(&policy.id.to_string()),
546 Datum::String(&policy.name),
547 Datum::String(&policy.owner_id.to_string()),
548 privileges,
549 Datum::UInt32(policy.oid.clone()),
550 ]),
551 diff,
552 ));
553
554 Ok(updates)
555 }
556
    /// Packs every builtin table row that describes the catalog item `id`.
    ///
    /// The result is assembled in four steps:
    ///
    /// 1. rows specific to the item's kind (log, index, table, source, view,
    ///    materialized view, sink, type, function, secret, connection or
    ///    continual task), produced by the matching `pack_*_update` helper;
    /// 2. for non-temporary items, one `MZ_OBJECT_DEPENDENCIES` row per item
    ///    referenced by this item;
    /// 3. if the item has a relation description, one `MZ_COLUMNS` row per
    ///    column;
    /// 4. if the item reports an initial logical compaction window, one
    ///    history retention strategy row.
    ///
    /// # Panics
    ///
    /// Panics if an ingestion export refers to an ingestion that is not a
    /// source, or if the connection name of that ingestion is not one of the
    /// names handled below.
    pub(super) fn pack_item_update(
        &self,
        id: CatalogItemId,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let entry = self.get_entry(&id);
        let oid = entry.oid();
        // Items without their own connection ID are resolved against the
        // system connection.
        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
        let schema_id = &self
            .get_schema(
                &entry.name().qualifiers.database_spec,
                &entry.name().qualifiers.schema_spec,
                conn_id,
            )
            .id;
        let name = &entry.name().item;
        let owner_id = entry.owner_id();
        // `privileges` borrows from `privileges_row`, which therefore has to
        // stay alive for the rest of the function.
        let privileges_row = self.pack_privilege_array_row(entry.privileges());
        let privileges = privileges_row.unpack_first();
        // Step 1: rows that depend on the kind of item.
        let mut updates = match entry.item() {
            // Logs are reported through the sources table with type "log" and
            // every optional column left empty.
            CatalogItem::Log(_) => self.pack_source_update(
                id, oid, schema_id, name, "log", None, None, None, None, None, owner_id,
                privileges, diff, None,
            ),
            CatalogItem::Index(index) => {
                self.pack_index_update(id, oid, name, owner_id, index, diff)
            }
            CatalogItem::Table(table) => {
                let mut updates = self
                    .pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);

                // Tables backed by a data source may additionally get a row in
                // the per-source-type "source tables" relation.
                if let TableDataSource::DataSource {
                    desc: data_source,
                    timeline: _,
                } = &table.data_source
                {
                    updates.extend(match data_source {
                        DataSourceDesc::IngestionExport {
                            ingestion_id,
                            external_reference: UnresolvedItemName(external_reference),
                            details: _,
                            data_config: _,
                        } => {
                            let ingestion_entry = self
                                .get_entry(ingestion_id)
                                .source_desc()
                                .expect("primary source exists")
                                .expect("primary source is a source");

                            // Dispatch on the connection name of the ingestion
                            // this table is exported from.
                            match ingestion_entry.connection.name() {
                                "postgres" => {
                                    // Three name parts are expected; parts 1 and 2
                                    // are used as schema and table name.
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                    let schema_name = external_reference[1].to_ast_string_simple();
                                    let table_name = external_reference[2].to_ast_string_simple();

                                    self.pack_postgres_source_tables_update(
                                        id,
                                        &schema_name,
                                        &table_name,
                                        diff,
                                    )
                                }
                                "mysql" => {
                                    // Two name parts are expected: schema, then table.
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
                                    let schema_name = external_reference[0].to_ast_string_simple();
                                    let table_name = external_reference[1].to_ast_string_simple();

                                    self.pack_mysql_source_tables_update(
                                        id,
                                        &schema_name,
                                        &table_name,
                                        diff,
                                    )
                                }
                                "sql-server" => {
                                    // Three name parts are expected; parts 1 and 2
                                    // are used as schema and table name.
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                    let schema_name = external_reference[1].to_ast_string_simple();
                                    let table_name = external_reference[2].to_ast_string_simple();

                                    self.pack_sql_server_source_table_update(
                                        id,
                                        &schema_name,
                                        &table_name,
                                        diff,
                                    )
                                }
                                // Load generator exports produce no extra rows.
                                "load-generator" => vec![],
                                "kafka" => {
                                    // A single name part is expected: the topic.
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 1);
                                    let topic = external_reference[0].to_ast_string_simple();
                                    let envelope = data_source.envelope();
                                    let (key_format, value_format) = data_source.formats();

                                    self.pack_kafka_source_tables_update(
                                        id,
                                        &topic,
                                        envelope,
                                        key_format,
                                        value_format,
                                        diff,
                                    )
                                }
                                s => unreachable!("{s} sources do not have tables"),
                            }
                        }
                        _ => vec![],
                    });
                }

                updates
            }
            CatalogItem::Source(source) => {
                let source_type = source.source_type();
                let connection_id = source.connection_id();
                let envelope = source.data_source.envelope();
                // For an ingestion export the cluster is read from the entry of
                // the ingestion it belongs to; otherwise from the item itself.
                let cluster_entry = match source.data_source {
                    DataSourceDesc::IngestionExport { ingestion_id, .. } => {
                        self.get_entry(&ingestion_id)
                    }
                    _ => entry,
                };

                let cluster_id = cluster_entry.item().cluster_id().map(|id| id.to_string());

                let (key_format, value_format) = source.data_source.formats();

                let mut updates = self.pack_source_update(
                    id,
                    oid,
                    schema_id,
                    name,
                    source_type,
                    connection_id,
                    envelope,
                    key_format,
                    value_format,
                    cluster_id.as_deref(),
                    owner_id,
                    privileges,
                    diff,
                    source.create_sql.as_ref(),
                );

                // Extra rows that depend on how the source gets its data.
                updates.extend(match &source.data_source {
                    DataSourceDesc::Ingestion { desc, .. }
                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => match &desc.connection {
                        GenericSourceConnection::Postgres(postgres) => {
                            self.pack_postgres_source_update(id, postgres, diff)
                        }
                        GenericSourceConnection::Kafka(kafka) => {
                            self.pack_kafka_source_update(id, source.global_id(), kafka, diff)
                        }
                        _ => vec![],
                    },
                    DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference: UnresolvedItemName(external_reference),
                        details: _,
                        data_config: _,
                    } => {
                        let ingestion_entry = self
                            .get_entry(ingestion_id)
                            .source_desc()
                            .expect("primary source exists")
                            .expect("primary source is a source");

                        // Same dispatch as for tables above, except that "kafka"
                        // is not accepted here.
                        match ingestion_entry.connection.name() {
                            "postgres" => {
                                // Three name parts are expected; parts 1 and 2 are
                                // used as schema and table name.
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                let schema_name = external_reference[1].to_ast_string_simple();
                                let table_name = external_reference[2].to_ast_string_simple();

                                self.pack_postgres_source_tables_update(
                                    id,
                                    &schema_name,
                                    &table_name,
                                    diff,
                                )
                            }
                            "mysql" => {
                                // Two name parts are expected: schema, then table.
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
                                let schema_name = external_reference[0].to_ast_string_simple();
                                let table_name = external_reference[1].to_ast_string_simple();

                                self.pack_mysql_source_tables_update(
                                    id,
                                    &schema_name,
                                    &table_name,
                                    diff,
                                )
                            }
                            "sql-server" => {
                                // Three name parts are expected; parts 1 and 2 are
                                // used as schema and table name.
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                let schema_name = external_reference[1].to_ast_string_simple();
                                let table_name = external_reference[2].to_ast_string_simple();

                                self.pack_sql_server_source_table_update(
                                    id,
                                    &schema_name,
                                    &table_name,
                                    diff,
                                )
                            }
                            // Load generator exports produce no extra rows.
                            "load-generator" => vec![],
                            s => unreachable!("{s} sources do not have subsources"),
                        }
                    }
                    DataSourceDesc::Webhook { .. } => {
                        vec![self.pack_webhook_source_update(id, diff)]
                    }
                    _ => vec![],
                });

                updates
            }
            CatalogItem::View(view) => {
                self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
            }
            CatalogItem::MaterializedView(mview) => self.pack_materialized_view_update(
                id, oid, schema_id, name, owner_id, privileges, mview, diff,
            ),
            CatalogItem::Sink(sink) => {
                self.pack_sink_update(id, oid, schema_id, name, owner_id, sink, diff)
            }
            CatalogItem::Type(ty) => {
                self.pack_type_update(id, oid, schema_id, name, owner_id, privileges, ty, diff)
            }
            CatalogItem::Func(func) => {
                self.pack_func_update(id, schema_id, name, owner_id, func, diff)
            }
            CatalogItem::Secret(_) => {
                self.pack_secret_update(id, oid, schema_id, name, owner_id, privileges, diff)
            }
            CatalogItem::Connection(connection) => self.pack_connection_update(
                id, oid, schema_id, name, owner_id, privileges, connection, diff,
            ),
            CatalogItem::ContinualTask(ct) => self.pack_continual_task_update(
                id, oid, schema_id, name, owner_id, privileges, ct, diff,
            ),
        };

        // Step 2: dependency rows, skipped for temporary items.
        if !entry.item().is_temporary() {
            for dependee in entry.item().references().items() {
                updates.push(self.pack_depends_update(id, *dependee, diff))
            }
        }

        // Step 3: one column row per column, for items that have a description.
        let full_name = self.resolve_full_name(entry.name(), entry.conn_id());
        if let Ok(desc) = entry.desc_latest(&full_name) {
            // Column defaults are only available for tables that accept writes.
            let defaults = match entry.item() {
                CatalogItem::Table(Table {
                    data_source: TableDataSource::TableWrites { defaults },
                    ..
                }) => Some(defaults),
                _ => None,
            };
            for (i, (column_name, column_type)) in desc.iter().enumerate() {
                // The default expression of column `i`, or NULL when the item
                // has no defaults.
                let default: Option<String> = defaults.map(|d| d[i].to_ast_string_stable());
                let default: Datum = default
                    .as_ref()
                    .map(|d| Datum::String(d))
                    .unwrap_or(Datum::Null);
                let pgtype = mz_pgrepr::Type::from(&column_type.scalar_type);
                // List, map and record types with a custom ID report the name
                // and OID of that catalog entry; everything else reports the
                // name and OID of the converted `mz_pgrepr` type.
                let (type_name, type_oid) = match &column_type.scalar_type {
                    SqlScalarType::List {
                        custom_id: Some(custom_id),
                        ..
                    }
                    | SqlScalarType::Map {
                        custom_id: Some(custom_id),
                        ..
                    }
                    | SqlScalarType::Record {
                        custom_id: Some(custom_id),
                        ..
                    } => {
                        let entry = self.get_entry(custom_id);
                        let name = &*entry.name().item;
                        let oid = entry.oid();
                        (name, oid)
                    }
                    _ => (pgtype.name(), pgtype.oid()),
                };
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_COLUMNS,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String(column_name),
                        // Column positions are reported starting at 1.
                        Datum::UInt64(u64::cast_from(i + 1)),
                        Datum::from(column_type.nullable),
                        Datum::String(type_name),
                        default,
                        Datum::UInt32(type_oid),
                        Datum::Int32(pgtype.typmod()),
                    ]),
                    diff,
                ));
            }
        }

        // Step 4: history retention, for items that report a compaction window.
        if let Some(cw) = entry.item().initial_logical_compaction_window() {
            updates.push(self.pack_history_retention_strategy_update(id, cw, diff));
        }

        updates
    }
905
906 fn pack_history_retention_strategy_update(
907 &self,
908 id: CatalogItemId,
909 cw: CompactionWindow,
910 diff: Diff,
911 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
912 let cw: u64 = cw.comparable_timestamp().into();
913 let cw = Jsonb::from_serde_json(serde_json::Value::Number(serde_json::Number::from(cw)))
914 .expect("must serialize");
915 BuiltinTableUpdate::row(
916 &*MZ_HISTORY_RETENTION_STRATEGIES,
917 Row::pack_slice(&[
918 Datum::String(&id.to_string()),
919 Datum::String("FOR"),
921 cw.into_row().into_element(),
922 ]),
923 diff,
924 )
925 }
926
927 fn pack_table_update(
928 &self,
929 id: CatalogItemId,
930 oid: u32,
931 schema_id: &SchemaSpecifier,
932 name: &str,
933 owner_id: &RoleId,
934 privileges: Datum,
935 diff: Diff,
936 table: &Table,
937 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
938 let redacted = table.create_sql.as_ref().map(|create_sql| {
939 mz_sql::parse::parse(create_sql)
940 .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
941 .into_element()
942 .ast
943 .to_ast_string_redacted()
944 });
945 let source_id = if let TableDataSource::DataSource {
946 desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
947 ..
948 } = &table.data_source
949 {
950 Some(ingestion_id.to_string())
951 } else {
952 None
953 };
954
955 vec![BuiltinTableUpdate::row(
956 &*MZ_TABLES,
957 Row::pack_slice(&[
958 Datum::String(&id.to_string()),
959 Datum::UInt32(oid),
960 Datum::String(&schema_id.to_string()),
961 Datum::String(name),
962 Datum::String(&owner_id.to_string()),
963 privileges,
964 if let Some(create_sql) = &table.create_sql {
965 Datum::String(create_sql)
966 } else {
967 Datum::Null
968 },
969 if let Some(redacted) = &redacted {
970 Datum::String(redacted)
971 } else {
972 Datum::Null
973 },
974 if let Some(source_id) = source_id.as_ref() {
975 Datum::String(source_id)
976 } else {
977 Datum::Null
978 },
979 ]),
980 diff,
981 )]
982 }
983
984 fn pack_source_update(
985 &self,
986 id: CatalogItemId,
987 oid: u32,
988 schema_id: &SchemaSpecifier,
989 name: &str,
990 source_desc_name: &str,
991 connection_id: Option<CatalogItemId>,
992 envelope: Option<&str>,
993 key_format: Option<&str>,
994 value_format: Option<&str>,
995 cluster_id: Option<&str>,
996 owner_id: &RoleId,
997 privileges: Datum,
998 diff: Diff,
999 create_sql: Option<&String>,
1000 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1001 let redacted = create_sql.map(|create_sql| {
1002 let create_stmt = mz_sql::parse::parse(create_sql)
1003 .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
1004 .into_element()
1005 .ast;
1006 create_stmt.to_ast_string_redacted()
1007 });
1008 vec![BuiltinTableUpdate::row(
1009 &*MZ_SOURCES,
1010 Row::pack_slice(&[
1011 Datum::String(&id.to_string()),
1012 Datum::UInt32(oid),
1013 Datum::String(&schema_id.to_string()),
1014 Datum::String(name),
1015 Datum::String(source_desc_name),
1016 Datum::from(connection_id.map(|id| id.to_string()).as_deref()),
1017 Datum::Null,
1020 Datum::from(envelope),
1021 Datum::from(key_format),
1022 Datum::from(value_format),
1023 Datum::from(cluster_id),
1024 Datum::String(&owner_id.to_string()),
1025 privileges,
1026 if let Some(create_sql) = create_sql {
1027 Datum::String(create_sql)
1028 } else {
1029 Datum::Null
1030 },
1031 if let Some(redacted) = &redacted {
1032 Datum::String(redacted)
1033 } else {
1034 Datum::Null
1035 },
1036 ]),
1037 diff,
1038 )]
1039 }
1040
1041 fn pack_postgres_source_update(
1042 &self,
1043 id: CatalogItemId,
1044 postgres: &PostgresSourceConnection<ReferencedConnection>,
1045 diff: Diff,
1046 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1047 vec![BuiltinTableUpdate::row(
1048 &*MZ_POSTGRES_SOURCES,
1049 Row::pack_slice(&[
1050 Datum::String(&id.to_string()),
1051 Datum::String(&postgres.publication_details.slot),
1052 Datum::from(postgres.publication_details.timeline_id),
1053 ]),
1054 diff,
1055 )]
1056 }
1057
1058 fn pack_kafka_source_update(
1059 &self,
1060 item_id: CatalogItemId,
1061 collection_id: GlobalId,
1062 kafka: &KafkaSourceConnection<ReferencedConnection>,
1063 diff: Diff,
1064 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1065 vec![BuiltinTableUpdate::row(
1066 &*MZ_KAFKA_SOURCES,
1067 Row::pack_slice(&[
1068 Datum::String(&item_id.to_string()),
1069 Datum::String(&kafka.group_id(&self.config.connection_context, collection_id)),
1070 Datum::String(&kafka.topic),
1071 ]),
1072 diff,
1073 )]
1074 }
1075
1076 fn pack_postgres_source_tables_update(
1077 &self,
1078 id: CatalogItemId,
1079 schema_name: &str,
1080 table_name: &str,
1081 diff: Diff,
1082 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1083 vec![BuiltinTableUpdate::row(
1084 &*MZ_POSTGRES_SOURCE_TABLES,
1085 Row::pack_slice(&[
1086 Datum::String(&id.to_string()),
1087 Datum::String(schema_name),
1088 Datum::String(table_name),
1089 ]),
1090 diff,
1091 )]
1092 }
1093
1094 fn pack_mysql_source_tables_update(
1095 &self,
1096 id: CatalogItemId,
1097 schema_name: &str,
1098 table_name: &str,
1099 diff: Diff,
1100 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1101 vec![BuiltinTableUpdate::row(
1102 &*MZ_MYSQL_SOURCE_TABLES,
1103 Row::pack_slice(&[
1104 Datum::String(&id.to_string()),
1105 Datum::String(schema_name),
1106 Datum::String(table_name),
1107 ]),
1108 diff,
1109 )]
1110 }
1111
1112 fn pack_sql_server_source_table_update(
1113 &self,
1114 id: CatalogItemId,
1115 schema_name: &str,
1116 table_name: &str,
1117 diff: Diff,
1118 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1119 vec![BuiltinTableUpdate::row(
1120 &*MZ_SQL_SERVER_SOURCE_TABLES,
1121 Row::pack_slice(&[
1122 Datum::String(&id.to_string()),
1123 Datum::String(schema_name),
1124 Datum::String(table_name),
1125 ]),
1126 diff,
1127 )]
1128 }
1129
1130 fn pack_kafka_source_tables_update(
1131 &self,
1132 id: CatalogItemId,
1133 topic: &str,
1134 envelope: Option<&str>,
1135 key_format: Option<&str>,
1136 value_format: Option<&str>,
1137 diff: Diff,
1138 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1139 vec![BuiltinTableUpdate::row(
1140 &*MZ_KAFKA_SOURCE_TABLES,
1141 Row::pack_slice(&[
1142 Datum::String(&id.to_string()),
1143 Datum::String(topic),
1144 Datum::from(envelope),
1145 Datum::from(key_format),
1146 Datum::from(value_format),
1147 ]),
1148 diff,
1149 )]
1150 }
1151
1152 fn pack_connection_update(
1153 &self,
1154 id: CatalogItemId,
1155 oid: u32,
1156 schema_id: &SchemaSpecifier,
1157 name: &str,
1158 owner_id: &RoleId,
1159 privileges: Datum,
1160 connection: &Connection,
1161 diff: Diff,
1162 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1163 let create_stmt = mz_sql::parse::parse(&connection.create_sql)
1164 .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", connection.create_sql))
1165 .into_element()
1166 .ast;
1167 let mut updates = vec![BuiltinTableUpdate::row(
1168 &*MZ_CONNECTIONS,
1169 Row::pack_slice(&[
1170 Datum::String(&id.to_string()),
1171 Datum::UInt32(oid),
1172 Datum::String(&schema_id.to_string()),
1173 Datum::String(name),
1174 Datum::String(match connection.details {
1175 ConnectionDetails::Kafka { .. } => "kafka",
1176 ConnectionDetails::Csr { .. } => "confluent-schema-registry",
1177 ConnectionDetails::Postgres { .. } => "postgres",
1178 ConnectionDetails::Aws(..) => "aws",
1179 ConnectionDetails::AwsPrivatelink(..) => "aws-privatelink",
1180 ConnectionDetails::Ssh { .. } => "ssh-tunnel",
1181 ConnectionDetails::MySql { .. } => "mysql",
1182 ConnectionDetails::SqlServer(_) => "sql-server",
1183 ConnectionDetails::IcebergCatalog(_) => "iceberg-catalog",
1184 }),
1185 Datum::String(&owner_id.to_string()),
1186 privileges,
1187 Datum::String(&connection.create_sql),
1188 Datum::String(&create_stmt.to_ast_string_redacted()),
1189 ]),
1190 diff,
1191 )];
1192 match connection.details {
1193 ConnectionDetails::Kafka(ref kafka) => {
1194 updates.extend(self.pack_kafka_connection_update(id, kafka, diff));
1195 }
1196 ConnectionDetails::Aws(ref aws_config) => {
1197 match self.pack_aws_connection_update(id, aws_config, diff) {
1198 Ok(update) => {
1199 updates.push(update);
1200 }
1201 Err(e) => {
1202 tracing::error!(%id, %e, "failed writing row to mz_aws_connections table");
1203 }
1204 }
1205 }
1206 ConnectionDetails::AwsPrivatelink(_) => {
1207 if let Some(aws_principal_context) = self.aws_principal_context.as_ref() {
1208 updates.push(self.pack_aws_privatelink_connection_update(
1209 id,
1210 aws_principal_context,
1211 diff,
1212 ));
1213 } else {
1214 tracing::error!(%id, "missing AWS principal context; cannot write row to mz_aws_privatelink_connections table");
1215 }
1216 }
1217 ConnectionDetails::Ssh {
1218 ref key_1,
1219 ref key_2,
1220 ..
1221 } => {
1222 updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff));
1223 }
1224 ConnectionDetails::Csr(_)
1225 | ConnectionDetails::Postgres(_)
1226 | ConnectionDetails::MySql(_)
1227 | ConnectionDetails::SqlServer(_)
1228 | ConnectionDetails::IcebergCatalog(_) => (),
1229 };
1230 updates
1231 }
1232
1233 pub(crate) fn pack_ssh_tunnel_connection_update(
1234 &self,
1235 id: CatalogItemId,
1236 key_1: &SshKey,
1237 key_2: &SshKey,
1238 diff: Diff,
1239 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1240 BuiltinTableUpdate::row(
1241 &*MZ_SSH_TUNNEL_CONNECTIONS,
1242 Row::pack_slice(&[
1243 Datum::String(&id.to_string()),
1244 Datum::String(key_1.public_key().as_str()),
1245 Datum::String(key_2.public_key().as_str()),
1246 ]),
1247 diff,
1248 )
1249 }
1250
1251 fn pack_kafka_connection_update(
1252 &self,
1253 id: CatalogItemId,
1254 kafka: &KafkaConnection<ReferencedConnection>,
1255 diff: Diff,
1256 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1257 let progress_topic = kafka.progress_topic(&self.config.connection_context, id);
1258 let mut row = Row::default();
1259 row.packer()
1260 .try_push_array(
1261 &[ArrayDimension {
1262 lower_bound: 1,
1263 length: kafka.brokers.len(),
1264 }],
1265 kafka
1266 .brokers
1267 .iter()
1268 .map(|broker| Datum::String(&broker.address)),
1269 )
1270 .expect("kafka.brokers is 1 dimensional, and its length is used for the array length");
1271 let brokers = row.unpack_first();
1272 vec![BuiltinTableUpdate::row(
1273 &*MZ_KAFKA_CONNECTIONS,
1274 Row::pack_slice(&[
1275 Datum::String(&id.to_string()),
1276 brokers,
1277 Datum::String(&progress_topic),
1278 ]),
1279 diff,
1280 )]
1281 }
1282
1283 pub fn pack_aws_privatelink_connection_update(
1284 &self,
1285 connection_id: CatalogItemId,
1286 aws_principal_context: &AwsPrincipalContext,
1287 diff: Diff,
1288 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1289 let id = &MZ_AWS_PRIVATELINK_CONNECTIONS;
1290 let row = Row::pack_slice(&[
1291 Datum::String(&connection_id.to_string()),
1292 Datum::String(&aws_principal_context.to_principal_string(connection_id)),
1293 ]);
1294 BuiltinTableUpdate::row(id, row, diff)
1295 }
1296
1297 pub fn pack_aws_connection_update(
1298 &self,
1299 connection_id: CatalogItemId,
1300 aws_config: &AwsConnection,
1301 diff: Diff,
1302 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, anyhow::Error> {
1303 let id = &MZ_AWS_CONNECTIONS;
1304
1305 let mut access_key_id = None;
1306 let mut access_key_id_secret_id = None;
1307 let mut secret_access_key_secret_id = None;
1308 let mut session_token = None;
1309 let mut session_token_secret_id = None;
1310 let mut assume_role_arn = None;
1311 let mut assume_role_session_name = None;
1312 let mut principal = None;
1313 let mut external_id = None;
1314 let mut example_trust_policy = None;
1315 match &aws_config.auth {
1316 AwsAuth::Credentials(credentials) => {
1317 match &credentials.access_key_id {
1318 StringOrSecret::String(s) => access_key_id = Some(s.as_str()),
1319 StringOrSecret::Secret(s) => access_key_id_secret_id = Some(s.to_string()),
1320 }
1321 secret_access_key_secret_id = Some(credentials.secret_access_key.to_string());
1322 match credentials.session_token.as_ref() {
1323 None => (),
1324 Some(StringOrSecret::String(s)) => session_token = Some(s.as_str()),
1325 Some(StringOrSecret::Secret(s)) => {
1326 session_token_secret_id = Some(s.to_string())
1327 }
1328 }
1329 }
1330 AwsAuth::AssumeRole(assume_role) => {
1331 assume_role_arn = Some(assume_role.arn.as_str());
1332 assume_role_session_name = assume_role.session_name.as_deref();
1333 principal = self
1334 .config
1335 .connection_context
1336 .aws_connection_role_arn
1337 .as_deref();
1338 external_id =
1339 Some(assume_role.external_id(&self.config.connection_context, connection_id)?);
1340 example_trust_policy = {
1341 let policy = assume_role
1342 .example_trust_policy(&self.config.connection_context, connection_id)?;
1343 let policy = Jsonb::from_serde_json(policy).expect("valid json");
1344 Some(policy.into_row())
1345 };
1346 }
1347 }
1348
1349 let row = Row::pack_slice(&[
1350 Datum::String(&connection_id.to_string()),
1351 Datum::from(aws_config.endpoint.as_deref()),
1352 Datum::from(aws_config.region.as_deref()),
1353 Datum::from(access_key_id),
1354 Datum::from(access_key_id_secret_id.as_deref()),
1355 Datum::from(secret_access_key_secret_id.as_deref()),
1356 Datum::from(session_token),
1357 Datum::from(session_token_secret_id.as_deref()),
1358 Datum::from(assume_role_arn),
1359 Datum::from(assume_role_session_name),
1360 Datum::from(principal),
1361 Datum::from(external_id.as_deref()),
1362 Datum::from(example_trust_policy.as_ref().map(|p| p.into_element())),
1363 ]);
1364
1365 Ok(BuiltinTableUpdate::row(id, row, diff))
1366 }
1367
1368 fn pack_view_update(
1369 &self,
1370 id: CatalogItemId,
1371 oid: u32,
1372 schema_id: &SchemaSpecifier,
1373 name: &str,
1374 owner_id: &RoleId,
1375 privileges: Datum,
1376 view: &View,
1377 diff: Diff,
1378 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1379 let create_stmt = mz_sql::parse::parse(&view.create_sql)
1380 .unwrap_or_else(|e| {
1381 panic!(
1382 "create_sql cannot be invalid: `{}` --- error: `{}`",
1383 view.create_sql, e
1384 )
1385 })
1386 .into_element()
1387 .ast;
1388 let query = match &create_stmt {
1389 Statement::CreateView(stmt) => &stmt.definition.query,
1390 _ => unreachable!(),
1391 };
1392
1393 let mut query_string = query.to_ast_string_stable();
1394 query_string.push(';');
1397
1398 vec![BuiltinTableUpdate::row(
1399 &*MZ_VIEWS,
1400 Row::pack_slice(&[
1401 Datum::String(&id.to_string()),
1402 Datum::UInt32(oid),
1403 Datum::String(&schema_id.to_string()),
1404 Datum::String(name),
1405 Datum::String(&query_string),
1406 Datum::String(&owner_id.to_string()),
1407 privileges,
1408 Datum::String(&view.create_sql),
1409 Datum::String(&create_stmt.to_ast_string_redacted()),
1410 ]),
1411 diff,
1412 )]
1413 }
1414
1415 fn pack_materialized_view_update(
1416 &self,
1417 id: CatalogItemId,
1418 oid: u32,
1419 schema_id: &SchemaSpecifier,
1420 name: &str,
1421 owner_id: &RoleId,
1422 privileges: Datum,
1423 mview: &MaterializedView,
1424 diff: Diff,
1425 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1426 let create_stmt = mz_sql::parse::parse(&mview.create_sql)
1427 .unwrap_or_else(|e| {
1428 panic!(
1429 "create_sql cannot be invalid: `{}` --- error: `{}`",
1430 mview.create_sql, e
1431 )
1432 })
1433 .into_element()
1434 .ast;
1435 let query_string = match &create_stmt {
1436 Statement::CreateMaterializedView(stmt) => {
1437 let mut query_string = stmt.query.to_ast_string_stable();
1438 query_string.push(';');
1441 query_string
1442 }
1443 _ => unreachable!(),
1444 };
1445
1446 let mut updates = Vec::new();
1447
1448 updates.push(BuiltinTableUpdate::row(
1449 &*MZ_MATERIALIZED_VIEWS,
1450 Row::pack_slice(&[
1451 Datum::String(&id.to_string()),
1452 Datum::UInt32(oid),
1453 Datum::String(&schema_id.to_string()),
1454 Datum::String(name),
1455 Datum::String(&mview.cluster_id.to_string()),
1456 Datum::String(&query_string),
1457 Datum::String(&owner_id.to_string()),
1458 privileges,
1459 Datum::String(&mview.create_sql),
1460 Datum::String(&create_stmt.to_ast_string_redacted()),
1461 ]),
1462 diff,
1463 ));
1464
1465 if let Some(refresh_schedule) = &mview.refresh_schedule {
1466 assert!(!refresh_schedule.is_empty());
1469 for RefreshEvery {
1470 interval,
1471 aligned_to,
1472 } in refresh_schedule.everies.iter()
1473 {
1474 let aligned_to_dt = mz_ore::now::to_datetime(
1475 <&Timestamp as TryInto<u64>>::try_into(aligned_to).expect("undoes planning"),
1476 );
1477 updates.push(BuiltinTableUpdate::row(
1478 &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1479 Row::pack_slice(&[
1480 Datum::String(&id.to_string()),
1481 Datum::String("every"),
1482 Datum::Interval(
1483 Interval::from_duration(interval).expect(
1484 "planning ensured that this is convertible back to Interval",
1485 ),
1486 ),
1487 Datum::TimestampTz(aligned_to_dt.try_into().expect("undoes planning")),
1488 Datum::Null,
1489 ]),
1490 diff,
1491 ));
1492 }
1493 for at in refresh_schedule.ats.iter() {
1494 let at_dt = mz_ore::now::to_datetime(
1495 <&Timestamp as TryInto<u64>>::try_into(at).expect("undoes planning"),
1496 );
1497 updates.push(BuiltinTableUpdate::row(
1498 &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1499 Row::pack_slice(&[
1500 Datum::String(&id.to_string()),
1501 Datum::String("at"),
1502 Datum::Null,
1503 Datum::Null,
1504 Datum::TimestampTz(at_dt.try_into().expect("undoes planning")),
1505 ]),
1506 diff,
1507 ));
1508 }
1509 } else {
1510 updates.push(BuiltinTableUpdate::row(
1511 &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
1512 Row::pack_slice(&[
1513 Datum::String(&id.to_string()),
1514 Datum::String("on-commit"),
1515 Datum::Null,
1516 Datum::Null,
1517 Datum::Null,
1518 ]),
1519 diff,
1520 ));
1521 }
1522
1523 updates
1524 }
1525
1526 fn pack_continual_task_update(
1527 &self,
1528 id: CatalogItemId,
1529 oid: u32,
1530 schema_id: &SchemaSpecifier,
1531 name: &str,
1532 owner_id: &RoleId,
1533 privileges: Datum,
1534 ct: &ContinualTask,
1535 diff: Diff,
1536 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1537 let create_stmt = mz_sql::parse::parse(&ct.create_sql)
1538 .unwrap_or_else(|e| {
1539 panic!(
1540 "create_sql cannot be invalid: `{}` --- error: `{}`",
1541 ct.create_sql, e
1542 )
1543 })
1544 .into_element()
1545 .ast;
1546 let query_string = match &create_stmt {
1547 Statement::CreateContinualTask(stmt) => {
1548 let mut query_string = String::new();
1549 for stmt in &stmt.stmts {
1550 let s = match stmt {
1551 ContinualTaskStmt::Insert(stmt) => stmt.to_ast_string_stable(),
1552 ContinualTaskStmt::Delete(stmt) => stmt.to_ast_string_stable(),
1553 };
1554 if query_string.is_empty() {
1555 query_string = s;
1556 } else {
1557 query_string.push_str("; ");
1558 query_string.push_str(&s);
1559 }
1560 }
1561 query_string
1562 }
1563 _ => unreachable!(),
1564 };
1565
1566 vec![BuiltinTableUpdate::row(
1567 &*MZ_CONTINUAL_TASKS,
1568 Row::pack_slice(&[
1569 Datum::String(&id.to_string()),
1570 Datum::UInt32(oid),
1571 Datum::String(&schema_id.to_string()),
1572 Datum::String(name),
1573 Datum::String(&ct.cluster_id.to_string()),
1574 Datum::String(&query_string),
1575 Datum::String(&owner_id.to_string()),
1576 privileges,
1577 Datum::String(&ct.create_sql),
1578 Datum::String(&create_stmt.to_ast_string_redacted()),
1579 ]),
1580 diff,
1581 )]
1582 }
1583
1584 fn pack_sink_update(
1585 &self,
1586 id: CatalogItemId,
1587 oid: u32,
1588 schema_id: &SchemaSpecifier,
1589 name: &str,
1590 owner_id: &RoleId,
1591 sink: &Sink,
1592 diff: Diff,
1593 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1594 let mut updates = vec![];
1595 match &sink.connection {
1596 StorageSinkConnection::Kafka(KafkaSinkConnection {
1597 topic: topic_name, ..
1598 }) => {
1599 updates.push(BuiltinTableUpdate::row(
1600 &*MZ_KAFKA_SINKS,
1601 Row::pack_slice(&[
1602 Datum::String(&id.to_string()),
1603 Datum::String(topic_name.as_str()),
1604 ]),
1605 diff,
1606 ));
1607 }
1608 StorageSinkConnection::Iceberg(IcebergSinkConnection {
1609 namespace, table, ..
1610 }) => {
1611 updates.push(BuiltinTableUpdate::row(
1612 &*MZ_ICEBERG_SINKS,
1613 Row::pack_slice(&[
1614 Datum::String(&id.to_string()),
1615 Datum::String(namespace.as_str()),
1616 Datum::String(table.as_str()),
1617 ]),
1618 diff,
1619 ));
1620 }
1621 };
1622
1623 let create_stmt = mz_sql::parse::parse(&sink.create_sql)
1624 .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", sink.create_sql))
1625 .into_element()
1626 .ast;
1627
1628 let envelope = sink.envelope();
1629
1630 let combined_format = sink.combined_format();
1632 let (key_format, value_format) = match sink.formats() {
1633 Some((key_format, value_format)) => (key_format, Some(value_format)),
1634 None => (None, None),
1635 };
1636
1637 updates.push(BuiltinTableUpdate::row(
1638 &*MZ_SINKS,
1639 Row::pack_slice(&[
1640 Datum::String(&id.to_string()),
1641 Datum::UInt32(oid),
1642 Datum::String(&schema_id.to_string()),
1643 Datum::String(name),
1644 Datum::String(sink.connection.name()),
1645 Datum::from(sink.connection_id().map(|id| id.to_string()).as_deref()),
1646 Datum::Null,
1648 Datum::from(envelope),
1649 Datum::from(combined_format.as_ref().map(|f| f.as_ref())),
1652 Datum::from(key_format),
1653 Datum::from(value_format),
1654 Datum::String(&sink.cluster_id.to_string()),
1655 Datum::String(&owner_id.to_string()),
1656 Datum::String(&sink.create_sql),
1657 Datum::String(&create_stmt.to_ast_string_redacted()),
1658 ]),
1659 diff,
1660 ));
1661
1662 updates
1663 }
1664
1665 fn pack_index_update(
1666 &self,
1667 id: CatalogItemId,
1668 oid: u32,
1669 name: &str,
1670 owner_id: &RoleId,
1671 index: &Index,
1672 diff: Diff,
1673 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1674 let mut updates = vec![];
1675
1676 let create_stmt = mz_sql::parse::parse(&index.create_sql)
1677 .unwrap_or_else(|e| {
1678 panic!(
1679 "create_sql cannot be invalid: `{}` --- error: `{}`",
1680 index.create_sql, e
1681 )
1682 })
1683 .into_element()
1684 .ast;
1685
1686 let key_sqls = match &create_stmt {
1687 Statement::CreateIndex(CreateIndexStatement { key_parts, .. }) => key_parts
1688 .as_ref()
1689 .expect("key_parts is filled in during planning"),
1690 _ => unreachable!(),
1691 };
1692 let on_item_id = self.get_entry_by_global_id(&index.on).id();
1693
1694 updates.push(BuiltinTableUpdate::row(
1695 &*MZ_INDEXES,
1696 Row::pack_slice(&[
1697 Datum::String(&id.to_string()),
1698 Datum::UInt32(oid),
1699 Datum::String(name),
1700 Datum::String(&on_item_id.to_string()),
1701 Datum::String(&index.cluster_id.to_string()),
1702 Datum::String(&owner_id.to_string()),
1703 Datum::String(&index.create_sql),
1704 Datum::String(&create_stmt.to_ast_string_redacted()),
1705 ]),
1706 diff,
1707 ));
1708
1709 for (i, key) in index.keys.iter().enumerate() {
1710 let on_entry = self.get_entry_by_global_id(&index.on);
1711 let nullable = key
1712 .typ(
1713 &on_entry
1714 .desc(&self.resolve_full_name(on_entry.name(), on_entry.conn_id()))
1715 .expect("can only create indexes on items with a valid description")
1716 .typ()
1717 .column_types,
1718 )
1719 .nullable;
1720 let seq_in_index = u64::cast_from(i + 1);
1721 let key_sql = key_sqls
1722 .get(i)
1723 .expect("missing sql information for index key")
1724 .to_ast_string_simple();
1725 let (field_number, expression) = match key {
1726 MirScalarExpr::Column(col, _) => {
1727 (Datum::UInt64(u64::cast_from(*col + 1)), Datum::Null)
1728 }
1729 _ => (Datum::Null, Datum::String(&key_sql)),
1730 };
1731 updates.push(BuiltinTableUpdate::row(
1732 &*MZ_INDEX_COLUMNS,
1733 Row::pack_slice(&[
1734 Datum::String(&id.to_string()),
1735 Datum::UInt64(seq_in_index),
1736 field_number,
1737 expression,
1738 Datum::from(nullable),
1739 ]),
1740 diff,
1741 ));
1742 }
1743
1744 updates
1745 }
1746
1747 fn pack_type_update(
1748 &self,
1749 id: CatalogItemId,
1750 oid: u32,
1751 schema_id: &SchemaSpecifier,
1752 name: &str,
1753 owner_id: &RoleId,
1754 privileges: Datum,
1755 typ: &Type,
1756 diff: Diff,
1757 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1758 let mut out = vec![];
1759
1760 let redacted = typ.create_sql.as_ref().map(|create_sql| {
1761 mz_sql::parse::parse(create_sql)
1762 .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
1763 .into_element()
1764 .ast
1765 .to_ast_string_redacted()
1766 });
1767
1768 out.push(BuiltinTableUpdate::row(
1769 &*MZ_TYPES,
1770 Row::pack_slice(&[
1771 Datum::String(&id.to_string()),
1772 Datum::UInt32(oid),
1773 Datum::String(&schema_id.to_string()),
1774 Datum::String(name),
1775 Datum::String(&TypeCategory::from_catalog_type(&typ.details.typ).to_string()),
1776 Datum::String(&owner_id.to_string()),
1777 privileges,
1778 if let Some(create_sql) = &typ.create_sql {
1779 Datum::String(create_sql)
1780 } else {
1781 Datum::Null
1782 },
1783 if let Some(redacted) = &redacted {
1784 Datum::String(redacted)
1785 } else {
1786 Datum::Null
1787 },
1788 ]),
1789 diff,
1790 ));
1791
1792 let mut row = Row::default();
1793 let mut packer = row.packer();
1794
1795 fn append_modifier(packer: &mut RowPacker<'_>, mods: &[i64]) {
1796 if mods.is_empty() {
1797 packer.push(Datum::Null);
1798 } else {
1799 packer.push_list(mods.iter().map(|m| Datum::Int64(*m)));
1800 }
1801 }
1802
1803 let index_id = match &typ.details.typ {
1804 CatalogType::Array {
1805 element_reference: element_id,
1806 } => {
1807 packer.push(Datum::String(&id.to_string()));
1808 packer.push(Datum::String(&element_id.to_string()));
1809 &MZ_ARRAY_TYPES
1810 }
1811 CatalogType::List {
1812 element_reference: element_id,
1813 element_modifiers,
1814 } => {
1815 packer.push(Datum::String(&id.to_string()));
1816 packer.push(Datum::String(&element_id.to_string()));
1817 append_modifier(&mut packer, element_modifiers);
1818 &MZ_LIST_TYPES
1819 }
1820 CatalogType::Map {
1821 key_reference: key_id,
1822 value_reference: value_id,
1823 key_modifiers,
1824 value_modifiers,
1825 } => {
1826 packer.push(Datum::String(&id.to_string()));
1827 packer.push(Datum::String(&key_id.to_string()));
1828 packer.push(Datum::String(&value_id.to_string()));
1829 append_modifier(&mut packer, key_modifiers);
1830 append_modifier(&mut packer, value_modifiers);
1831 &MZ_MAP_TYPES
1832 }
1833 CatalogType::Pseudo => {
1834 packer.push(Datum::String(&id.to_string()));
1835 &MZ_PSEUDO_TYPES
1836 }
1837 _ => {
1838 packer.push(Datum::String(&id.to_string()));
1839 &MZ_BASE_TYPES
1840 }
1841 };
1842 out.push(BuiltinTableUpdate::row(index_id, row, diff));
1843
1844 if let Some(pg_metadata) = &typ.details.pg_metadata {
1845 out.push(BuiltinTableUpdate::row(
1846 &*MZ_TYPE_PG_METADATA,
1847 Row::pack_slice(&[
1848 Datum::String(&id.to_string()),
1849 Datum::UInt32(pg_metadata.typinput_oid),
1850 Datum::UInt32(pg_metadata.typreceive_oid),
1851 ]),
1852 diff,
1853 ));
1854 }
1855
1856 out
1857 }
1858
1859 fn pack_func_update(
1860 &self,
1861 id: CatalogItemId,
1862 schema_id: &SchemaSpecifier,
1863 name: &str,
1864 owner_id: &RoleId,
1865 func: &Func,
1866 diff: Diff,
1867 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1868 let mut updates = vec![];
1869 for func_impl_details in func.inner.func_impls() {
1870 let arg_type_ids = func_impl_details
1871 .arg_typs
1872 .iter()
1873 .map(|typ| self.get_system_type(typ).id().to_string())
1874 .collect::<Vec<_>>();
1875
1876 let mut row = Row::default();
1877 row.packer()
1878 .try_push_array(
1879 &[ArrayDimension {
1880 lower_bound: 1,
1881 length: arg_type_ids.len(),
1882 }],
1883 arg_type_ids.iter().map(|id| Datum::String(id)),
1884 )
1885 .expect(
1886 "arg_type_ids is 1 dimensional, and its length is used for the array length",
1887 );
1888 let arg_type_ids = row.unpack_first();
1889
1890 updates.push(BuiltinTableUpdate::row(
1891 &*MZ_FUNCTIONS,
1892 Row::pack_slice(&[
1893 Datum::String(&id.to_string()),
1894 Datum::UInt32(func_impl_details.oid),
1895 Datum::String(&schema_id.to_string()),
1896 Datum::String(name),
1897 arg_type_ids,
1898 Datum::from(
1899 func_impl_details
1900 .variadic_typ
1901 .map(|typ| self.get_system_type(typ).id().to_string())
1902 .as_deref(),
1903 ),
1904 Datum::from(
1905 func_impl_details
1906 .return_typ
1907 .map(|typ| self.get_system_type(typ).id().to_string())
1908 .as_deref(),
1909 ),
1910 func_impl_details.return_is_set.into(),
1911 Datum::String(&owner_id.to_string()),
1912 ]),
1913 diff,
1914 ));
1915
1916 if let mz_sql::func::Func::Aggregate(_) = func.inner {
1917 updates.push(BuiltinTableUpdate::row(
1918 &*MZ_AGGREGATES,
1919 Row::pack_slice(&[
1920 Datum::UInt32(func_impl_details.oid),
1921 Datum::String("n"),
1923 Datum::Int16(0),
1924 ]),
1925 diff,
1926 ));
1927 }
1928 }
1929 updates
1930 }
1931
1932 pub fn pack_op_update(
1933 &self,
1934 operator: &str,
1935 func_impl_details: FuncImplCatalogDetails,
1936 diff: Diff,
1937 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1938 let arg_type_ids = func_impl_details
1939 .arg_typs
1940 .iter()
1941 .map(|typ| self.get_system_type(typ).id().to_string())
1942 .collect::<Vec<_>>();
1943
1944 let mut row = Row::default();
1945 row.packer()
1946 .try_push_array(
1947 &[ArrayDimension {
1948 lower_bound: 1,
1949 length: arg_type_ids.len(),
1950 }],
1951 arg_type_ids.iter().map(|id| Datum::String(id)),
1952 )
1953 .expect("arg_type_ids is 1 dimensional, and its length is used for the array length");
1954 let arg_type_ids = row.unpack_first();
1955
1956 BuiltinTableUpdate::row(
1957 &*MZ_OPERATORS,
1958 Row::pack_slice(&[
1959 Datum::UInt32(func_impl_details.oid),
1960 Datum::String(operator),
1961 arg_type_ids,
1962 Datum::from(
1963 func_impl_details
1964 .return_typ
1965 .map(|typ| self.get_system_type(typ).id().to_string())
1966 .as_deref(),
1967 ),
1968 ]),
1969 diff,
1970 )
1971 }
1972
1973 fn pack_secret_update(
1974 &self,
1975 id: CatalogItemId,
1976 oid: u32,
1977 schema_id: &SchemaSpecifier,
1978 name: &str,
1979 owner_id: &RoleId,
1980 privileges: Datum,
1981 diff: Diff,
1982 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1983 vec![BuiltinTableUpdate::row(
1984 &*MZ_SECRETS,
1985 Row::pack_slice(&[
1986 Datum::String(&id.to_string()),
1987 Datum::UInt32(oid),
1988 Datum::String(&schema_id.to_string()),
1989 Datum::String(name),
1990 Datum::String(&owner_id.to_string()),
1991 privileges,
1992 ]),
1993 diff,
1994 )]
1995 }
1996
1997 pub fn pack_audit_log_update(
1998 &self,
1999 event: &VersionedEvent,
2000 diff: Diff,
2001 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
2002 let (event_type, object_type, details, user, occurred_at): (
2003 &EventType,
2004 &ObjectType,
2005 &EventDetails,
2006 &Option<String>,
2007 u64,
2008 ) = match event {
2009 VersionedEvent::V1(ev) => (
2010 &ev.event_type,
2011 &ev.object_type,
2012 &ev.details,
2013 &ev.user,
2014 ev.occurred_at,
2015 ),
2016 };
2017 let details = Jsonb::from_serde_json(details.as_json())
2018 .map_err(|e| {
2019 Error::new(ErrorKind::Unstructured(format!(
2020 "could not pack audit log update: {}",
2021 e
2022 )))
2023 })?
2024 .into_row();
2025 let details = details
2026 .iter()
2027 .next()
2028 .expect("details created above with a single jsonb column");
2029 let dt = mz_ore::now::to_datetime(occurred_at);
2030 let id = event.sortable_id();
2031 Ok(BuiltinTableUpdate::row(
2032 &*MZ_AUDIT_EVENTS,
2033 Row::pack_slice(&[
2034 Datum::UInt64(id),
2035 Datum::String(&format!("{}", event_type)),
2036 Datum::String(&format!("{}", object_type)),
2037 details,
2038 match user {
2039 Some(user) => Datum::String(user),
2040 None => Datum::Null,
2041 },
2042 Datum::TimestampTz(dt.try_into().expect("must fit")),
2043 ]),
2044 diff,
2045 ))
2046 }
2047
2048 pub fn pack_storage_usage_update(
2049 &self,
2050 VersionedStorageUsage::V1(event): VersionedStorageUsage,
2051 diff: Diff,
2052 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2053 let id = &MZ_STORAGE_USAGE_BY_SHARD;
2054 let row = Row::pack_slice(&[
2055 Datum::UInt64(event.id),
2056 Datum::from(event.shard_id.as_deref()),
2057 Datum::UInt64(event.size_bytes),
2058 Datum::TimestampTz(
2059 mz_ore::now::to_datetime(event.collection_timestamp)
2060 .try_into()
2061 .expect("must fit"),
2062 ),
2063 ]);
2064 BuiltinTableUpdate::row(id, row, diff)
2065 }
2066
2067 pub fn pack_egress_ip_update(
2068 &self,
2069 ip: &IpNet,
2070 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
2071 let id = &MZ_EGRESS_IPS;
2072 let addr = ip.addr();
2073 let row = Row::pack_slice(&[
2074 Datum::String(&addr.to_string()),
2075 Datum::Int32(ip.prefix_len().into()),
2076 Datum::String(&format!("{}/{}", addr, ip.prefix_len())),
2077 ]);
2078 Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
2079 }
2080
2081 pub fn pack_license_key_update(
2082 &self,
2083 license_key: &ValidatedLicenseKey,
2084 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
2085 let id = &MZ_LICENSE_KEYS;
2086 let row = Row::pack_slice(&[
2087 Datum::String(&license_key.id),
2088 Datum::String(&license_key.organization),
2089 Datum::String(&license_key.environment_id),
2090 Datum::TimestampTz(
2091 mz_ore::now::to_datetime(license_key.expiration * 1000)
2092 .try_into()
2093 .expect("must fit"),
2094 ),
2095 Datum::TimestampTz(
2096 mz_ore::now::to_datetime(license_key.not_before * 1000)
2097 .try_into()
2098 .expect("must fit"),
2099 ),
2100 ]);
2101 Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
2102 }
2103
2104 pub fn pack_all_replica_size_updates(&self) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2105 let mut updates = Vec::new();
2106 for (size, alloc) in &self.cluster_replica_sizes.0 {
2107 if alloc.disabled {
2108 continue;
2109 }
2110
2111 let cpu_limit = alloc.cpu_limit.unwrap_or(CpuLimit::MAX);
2114 let MemoryLimit(ByteSize(memory_bytes)) =
2115 (alloc.memory_limit).unwrap_or(MemoryLimit::MAX);
2116 let DiskLimit(ByteSize(disk_bytes)) =
2117 (alloc.disk_limit).unwrap_or(DiskLimit::ARBITRARY);
2118
2119 let row = Row::pack_slice(&[
2120 size.as_str().into(),
2121 u64::from(alloc.scale).into(),
2122 u64::cast_from(alloc.workers).into(),
2123 cpu_limit.as_nanocpus().into(),
2124 memory_bytes.into(),
2125 disk_bytes.into(),
2126 (alloc.credits_per_hour).into(),
2127 ]);
2128
2129 updates.push(BuiltinTableUpdate::row(
2130 &*MZ_CLUSTER_REPLICA_SIZES,
2131 row,
2132 Diff::ONE,
2133 ));
2134 }
2135
2136 updates
2137 }
2138
2139 pub fn pack_subscribe_update(
2140 &self,
2141 id: GlobalId,
2142 subscribe: &ActiveSubscribe,
2143 diff: Diff,
2144 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2145 let mut row = Row::default();
2146 let mut packer = row.packer();
2147 packer.push(Datum::String(&id.to_string()));
2148 packer.push(Datum::Uuid(subscribe.session_uuid));
2149 packer.push(Datum::String(&subscribe.cluster_id.to_string()));
2150
2151 let start_dt = mz_ore::now::to_datetime(subscribe.start_time);
2152 packer.push(Datum::TimestampTz(start_dt.try_into().expect("must fit")));
2153
2154 let depends_on: Vec<_> = subscribe
2155 .depends_on
2156 .iter()
2157 .map(|id| id.to_string())
2158 .collect();
2159 packer.push_list(depends_on.iter().map(|s| Datum::String(s)));
2160
2161 BuiltinTableUpdate::row(&*MZ_SUBSCRIPTIONS, row, diff)
2162 }
2163
2164 pub fn pack_session_update(
2165 &self,
2166 conn: &ConnMeta,
2167 diff: Diff,
2168 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2169 let connect_dt = mz_ore::now::to_datetime(conn.connected_at());
2170 BuiltinTableUpdate::row(
2171 &*MZ_SESSIONS,
2172 Row::pack_slice(&[
2173 Datum::Uuid(conn.uuid()),
2174 Datum::UInt32(conn.conn_id().unhandled()),
2175 Datum::String(&conn.authenticated_role_id().to_string()),
2176 Datum::from(conn.client_ip().map(|ip| ip.to_string()).as_deref()),
2177 Datum::TimestampTz(connect_dt.try_into().expect("must fit")),
2178 ]),
2179 diff,
2180 )
2181 }
2182
2183 pub fn pack_default_privileges_update(
2184 &self,
2185 default_privilege_object: &DefaultPrivilegeObject,
2186 grantee: &RoleId,
2187 acl_mode: &AclMode,
2188 diff: Diff,
2189 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2190 BuiltinTableUpdate::row(
2191 &*MZ_DEFAULT_PRIVILEGES,
2192 Row::pack_slice(&[
2193 default_privilege_object.role_id.to_string().as_str().into(),
2194 default_privilege_object
2195 .database_id
2196 .map(|database_id| database_id.to_string())
2197 .as_deref()
2198 .into(),
2199 default_privilege_object
2200 .schema_id
2201 .map(|schema_id| schema_id.to_string())
2202 .as_deref()
2203 .into(),
2204 default_privilege_object
2205 .object_type
2206 .to_string()
2207 .to_lowercase()
2208 .as_str()
2209 .into(),
2210 grantee.to_string().as_str().into(),
2211 acl_mode.to_string().as_str().into(),
2212 ]),
2213 diff,
2214 )
2215 }
2216
2217 pub fn pack_system_privileges_update(
2218 &self,
2219 privileges: MzAclItem,
2220 diff: Diff,
2221 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2222 BuiltinTableUpdate::row(
2223 &*MZ_SYSTEM_PRIVILEGES,
2224 Row::pack_slice(&[privileges.into()]),
2225 diff,
2226 )
2227 }
2228
2229 fn pack_privilege_array_row(&self, privileges: &PrivilegeMap) -> Row {
2230 let mut row = Row::default();
2231 let flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2232 row.packer()
2233 .try_push_array(
2234 &[ArrayDimension {
2235 lower_bound: 1,
2236 length: flat_privileges.len(),
2237 }],
2238 flat_privileges
2239 .into_iter()
2240 .map(|mz_acl_item| Datum::MzAclItem(mz_acl_item.clone())),
2241 )
2242 .expect("privileges is 1 dimensional, and its length is used for the array length");
2243 row
2244 }
2245
2246 pub fn pack_comment_update(
2247 &self,
2248 object_id: CommentObjectId,
2249 column_pos: Option<usize>,
2250 comment: &str,
2251 diff: Diff,
2252 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2253 let object_type = mz_sql::catalog::ObjectType::from(object_id);
2255 let audit_type = super::object_type_to_audit_object_type(object_type);
2256 let object_type_str = audit_type.to_string();
2257
2258 let object_id_str = match object_id {
2259 CommentObjectId::Table(global_id)
2260 | CommentObjectId::View(global_id)
2261 | CommentObjectId::MaterializedView(global_id)
2262 | CommentObjectId::Source(global_id)
2263 | CommentObjectId::Sink(global_id)
2264 | CommentObjectId::Index(global_id)
2265 | CommentObjectId::Func(global_id)
2266 | CommentObjectId::Connection(global_id)
2267 | CommentObjectId::Secret(global_id)
2268 | CommentObjectId::Type(global_id)
2269 | CommentObjectId::ContinualTask(global_id) => global_id.to_string(),
2270 CommentObjectId::Role(role_id) => role_id.to_string(),
2271 CommentObjectId::Database(database_id) => database_id.to_string(),
2272 CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
2273 CommentObjectId::Cluster(cluster_id) => cluster_id.to_string(),
2274 CommentObjectId::ClusterReplica((_, replica_id)) => replica_id.to_string(),
2275 CommentObjectId::NetworkPolicy(network_policy_id) => network_policy_id.to_string(),
2276 };
2277 let column_pos_datum = match column_pos {
2278 Some(pos) => {
2279 let pos =
2281 i32::try_from(pos).expect("we constrain this value in the planning layer");
2282 Datum::Int32(pos)
2283 }
2284 None => Datum::Null,
2285 };
2286
2287 BuiltinTableUpdate::row(
2288 &*MZ_COMMENTS,
2289 Row::pack_slice(&[
2290 Datum::String(&object_id_str),
2291 Datum::String(&object_type_str),
2292 column_pos_datum,
2293 Datum::String(comment),
2294 ]),
2295 diff,
2296 )
2297 }
2298
2299 pub fn pack_webhook_source_update(
2300 &self,
2301 item_id: CatalogItemId,
2302 diff: Diff,
2303 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2304 let url = self
2305 .try_get_webhook_url(&item_id)
2306 .expect("webhook source should exist");
2307 let url = url.to_string();
2308 let name = &self.get_entry(&item_id).name().item;
2309 let id_str = item_id.to_string();
2310
2311 BuiltinTableUpdate::row(
2312 &*MZ_WEBHOOKS_SOURCES,
2313 Row::pack_slice(&[
2314 Datum::String(&id_str),
2315 Datum::String(name),
2316 Datum::String(&url),
2317 ]),
2318 diff,
2319 )
2320 }
2321
2322 pub fn pack_source_references_update(
2323 &self,
2324 source_references: &SourceReferences,
2325 diff: Diff,
2326 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
2327 let source_id = source_references.source_id.to_string();
2328 let updated_at = &source_references.updated_at;
2329 source_references
2330 .references
2331 .iter()
2332 .map(|reference| {
2333 let mut row = Row::default();
2334 let mut packer = row.packer();
2335 packer.extend([
2336 Datum::String(&source_id),
2337 reference
2338 .namespace
2339 .as_ref()
2340 .map(|s| Datum::String(s))
2341 .unwrap_or(Datum::Null),
2342 Datum::String(&reference.name),
2343 Datum::TimestampTz(
2344 mz_ore::now::to_datetime(*updated_at)
2345 .try_into()
2346 .expect("must fit"),
2347 ),
2348 ]);
2349 if reference.columns.len() > 0 {
2350 packer
2351 .try_push_array(
2352 &[ArrayDimension {
2353 lower_bound: 1,
2354 length: reference.columns.len(),
2355 }],
2356 reference.columns.iter().map(|col| Datum::String(col)),
2357 )
2358 .expect(
2359 "columns is 1 dimensional, and its length is used for the array length",
2360 );
2361 } else {
2362 packer.push(Datum::Null);
2363 }
2364
2365 BuiltinTableUpdate::row(&*MZ_SOURCE_REFERENCES, row, diff)
2366 })
2367 .collect()
2368 }
2369}