mod notice;

use bytesize::ByteSize;
use ipnet::IpNet;
use mz_adapter_types::compaction::CompactionWindow;
use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::builtin::{
    BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
    MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICAS,
    MZ_CLUSTER_SCHEDULES, MZ_CLUSTER_WORKLOAD_CLASSES, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS,
    MZ_CONNECTIONS, MZ_CONTINUAL_TASKS, MZ_DATABASES, MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS,
    MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES, MZ_INDEX_COLUMNS, MZ_INDEXES,
    MZ_INTERNAL_CLUSTER_REPLICAS, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCE_TABLES,
    MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES,
    MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MATERIALIZED_VIEWS, MZ_MYSQL_SOURCE_TABLES,
    MZ_NETWORK_POLICIES, MZ_NETWORK_POLICY_RULES, MZ_OBJECT_DEPENDENCIES, MZ_OPERATORS,
    MZ_PENDING_CLUSTER_REPLICAS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES, MZ_PSEUDO_TYPES,
    MZ_ROLE_MEMBERS, MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SCHEMAS, MZ_SECRETS, MZ_SESSIONS, MZ_SINKS,
    MZ_SOURCE_REFERENCES, MZ_SOURCES, MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS,
    MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES,
    MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
};
use mz_catalog::config::AwsPrincipalContext;
use mz_catalog::durable::SourceReferences;
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::{
    CatalogItem, ClusterVariant, Connection, ContinualTask, DataSourceDesc, Func, Index,
    MaterializedView, Sink, Table, TableDataSource, Type, View,
};
use mz_controller::clusters::{
    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
};
use mz_controller_types::ClusterId;
use mz_expr::MirScalarExpr;
use mz_license_keys::ValidatedLicenseKey;
use mz_orchestrator::{CpuLimit, DiskLimit, MemoryLimit};
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_persist_client::batch::ProtoBatch;
use mz_repr::adt::array::ArrayDimension;
use mz_repr::adt::interval::Interval;
use mz_repr::adt::jsonb::Jsonb;
use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::refresh_schedule::RefreshEvery;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, Row, RowPacker, SqlScalarType, Timestamp};
use mz_sql::ast::{ContinualTaskStmt, CreateIndexStatement, Statement, UnresolvedItemName};
use mz_sql::catalog::{
    CatalogCluster, CatalogDatabase, CatalogSchema, CatalogType, DefaultPrivilegeObject,
    TypeCategory,
};
use mz_sql::func::FuncImplCatalogDetails;
use mz_sql::names::{
    CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier,
};
use mz_sql::plan::{ClusterSchedule, ConnectionDetails, SshKey};
use mz_sql::session::user::SYSTEM_USER;
use mz_sql::session::vars::SessionVars;
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_client::client::TableData;
use mz_storage_types::connections::KafkaConnection;
use mz_storage_types::connections::aws::{AwsAuth, AwsConnection};
use mz_storage_types::connections::inline::ReferencedConnection;
use mz_storage_types::connections::string_or_secret::StringOrSecret;
use mz_storage_types::sinks::{KafkaSinkConnection, StorageSinkConnection};
use mz_storage_types::sources::{
    GenericSourceConnection, KafkaSourceConnection, PostgresSourceConnection, SourceConnection,
};
use smallvec::smallvec;

use crate::active_compute_sink::ActiveSubscribe;
use crate::catalog::CatalogState;
use crate::coord::ConnMeta;

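/// A pending update to a builtin (system) table.
///
/// An update is first constructed against a `&'static BuiltinTable` reference
/// and later resolved to the table's `CatalogItemId`; `data` carries either
/// explicit `(Row, Diff)` pairs or pre-built persist batches.
///
/// For example (illustrative only; the IDs here are hypothetical):
///
/// ```ignore
/// let row = Row::pack_slice(&[Datum::String("u1"), Datum::String("u2")]);
/// let update = BuiltinTableUpdate::row(&*MZ_OBJECT_DEPENDENCIES, row, Diff::ONE);
/// ```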
#[derive(Debug, Clone)]
pub struct BuiltinTableUpdate<T = CatalogItemId> {
    pub id: T,
    pub data: TableData,
}

impl<T> BuiltinTableUpdate<T> {
    pub fn row(id: T, row: Row, diff: Diff) -> BuiltinTableUpdate<T> {
        BuiltinTableUpdate {
            id,
            data: TableData::Rows(vec![(row, diff)]),
        }
    }

    pub fn batch(id: T, batch: ProtoBatch) -> BuiltinTableUpdate<T> {
        BuiltinTableUpdate {
            id,
            data: TableData::Batches(smallvec![batch]),
        }
    }
}

impl CatalogState {
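    /// Resolves a batch of updates against static builtin table references
    /// into updates keyed by each table's `CatalogItemId`.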
    pub fn resolve_builtin_table_updates(
        &self,
        builtin_table_update: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    ) -> Vec<BuiltinTableUpdate<CatalogItemId>> {
        builtin_table_update
            .into_iter()
            .map(|builtin_table_update| self.resolve_builtin_table_update(builtin_table_update))
            .collect()
    }

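    /// Resolves a single builtin table update by looking up the catalog item
    /// ID of the referenced builtin table.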
    pub fn resolve_builtin_table_update(
        &self,
        BuiltinTableUpdate { id, data }: BuiltinTableUpdate<&'static BuiltinTable>,
    ) -> BuiltinTableUpdate<CatalogItemId> {
        let id = self.resolve_builtin_table(id);
        BuiltinTableUpdate { id, data }
    }

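    /// Packs a row for `mz_object_dependencies` recording that `depender`
    /// depends on `dependee`.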
    pub fn pack_depends_update(
        &self,
        depender: CatalogItemId,
        dependee: CatalogItemId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let row = Row::pack_slice(&[
            Datum::String(&depender.to_string()),
            Datum::String(&dependee.to_string()),
        ]);
        BuiltinTableUpdate::row(&*MZ_OBJECT_DEPENDENCIES, row, diff)
    }

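    /// Packs a row for `mz_databases` describing the given database, including
    /// its flattened privilege array.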
    pub(super) fn pack_database_update(
        &self,
        database_id: &DatabaseId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let database = self.get_database(database_id);
        let row = self.pack_privilege_array_row(database.privileges());
        let privileges = row.unpack_first();
        BuiltinTableUpdate::row(
            &*MZ_DATABASES,
            Row::pack_slice(&[
                Datum::String(&database.id.to_string()),
                Datum::UInt32(database.oid),
                Datum::String(database.name()),
                Datum::String(&database.owner_id.to_string()),
                privileges,
            ]),
            diff,
        )
    }

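    /// Packs a row for `mz_schemas`. Ambient schemas pack a null database ID.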
    pub(super) fn pack_schema_update(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_id: &SchemaId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let (database_id, schema) = match database_spec {
            ResolvedDatabaseSpecifier::Ambient => (None, &self.ambient_schemas_by_id[schema_id]),
            ResolvedDatabaseSpecifier::Id(id) => (
                Some(id.to_string()),
                &self.database_by_id[id].schemas_by_id[schema_id],
            ),
        };
        let row = self.pack_privilege_array_row(schema.privileges());
        let privileges = row.unpack_first();
        BuiltinTableUpdate::row(
            &*MZ_SCHEMAS,
            Row::pack_slice(&[
                Datum::String(&schema_id.to_string()),
                Datum::UInt32(schema.oid),
                Datum::from(database_id.as_deref()),
                Datum::String(&schema.name.schema),
                Datum::String(&schema.owner_id.to_string()),
                privileges,
            ]),
            diff,
        )
    }

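    /// Packs rows for `mz_roles`, plus one `mz_role_parameters` row per valid
    /// role default variable. Invalid defaults are logged and skipped rather
    /// than packed.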
    pub(super) fn pack_role_update(
        &self,
        id: RoleId,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        match id {
            RoleId::Public => vec![],
            id => {
                let role = self.get_role(&id);
                let role_update = BuiltinTableUpdate::row(
                    &*MZ_ROLES,
                    Row::pack_slice(&[
                        Datum::String(&role.id.to_string()),
                        Datum::UInt32(role.oid),
                        Datum::String(&role.name),
                        Datum::from(role.attributes.inherit),
                    ]),
                    diff,
                );
                let mut updates = vec![role_update];

                let session_vars_reference = SessionVars::new_unchecked(
                    &mz_build_info::DUMMY_BUILD_INFO,
                    SYSTEM_USER.clone(),
                    None,
                );

                for (name, val) in role.vars() {
                    let result = session_vars_reference
                        .inspect(name)
                        .and_then(|var| var.check(val.borrow()));
                    let Ok(formatted_val) = result else {
                        tracing::error!(?name, ?val, ?result, "found invalid role default var");
                        continue;
                    };

                    let role_var_update = BuiltinTableUpdate::row(
                        &*MZ_ROLE_PARAMETERS,
                        Row::pack_slice(&[
                            Datum::String(&role.id.to_string()),
                            Datum::String(name),
                            Datum::String(&formatted_val),
                        ]),
                        diff,
                    );
                    updates.push(role_var_update);
                }

                updates
            }
        }
    }

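    /// Packs a row for `mz_role_members`; the grantor is looked up from the
    /// member's membership map and must exist, or the catalog is out of sync.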
    pub(super) fn pack_role_members_update(
        &self,
        role_id: RoleId,
        member_id: RoleId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let grantor_id = self
            .get_role(&member_id)
            .membership
            .map
            .get(&role_id)
            .expect("catalog out of sync");
        BuiltinTableUpdate::row(
            &*MZ_ROLE_MEMBERS,
            Row::pack_slice(&[
                Datum::String(&role_id.to_string()),
                Datum::String(&member_id.to_string()),
                Datum::String(&grantor_id.to_string()),
            ]),
            diff,
        )
    }

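    /// Packs rows for `mz_clusters`, `mz_cluster_schedules` (managed clusters
    /// only), and `mz_cluster_workload_classes`. Unmanaged clusters pack nulls
    /// for the managed-only columns.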
    pub(super) fn pack_cluster_update(
        &self,
        name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let id = self.clusters_by_name[name];
        let cluster = &self.clusters_by_id[&id];
        let row = self.pack_privilege_array_row(cluster.privileges());
        let privileges = row.unpack_first();
        let (size, disk, replication_factor, azs, introspection_debugging, introspection_interval) =
            match &cluster.config.variant {
                ClusterVariant::Managed(config) => (
                    Some(config.size.as_str()),
                    Some(self.cluster_replica_size_has_disk(&config.size)),
                    Some(config.replication_factor),
                    if config.availability_zones.is_empty() {
                        None
                    } else {
                        Some(config.availability_zones.clone())
                    },
                    Some(config.logging.log_logging),
                    config.logging.interval.map(|d| {
                        Interval::from_duration(&d)
                            .expect("planning ensured this convertible back to interval")
                    }),
                ),
                ClusterVariant::Unmanaged => (None, None, None, None, None, None),
            };

        let mut row = Row::default();
        let mut packer = row.packer();
        packer.extend([
            Datum::String(&id.to_string()),
            Datum::String(name),
            Datum::String(&cluster.owner_id.to_string()),
            privileges,
            cluster.is_managed().into(),
            size.into(),
            replication_factor.into(),
            disk.into(),
        ]);
        if let Some(azs) = azs {
            packer.push_list(azs.iter().map(|az| Datum::String(az)));
        } else {
            packer.push(Datum::Null);
        }
        packer.push(Datum::from(introspection_debugging));
        packer.push(Datum::from(introspection_interval));

        let mut updates = Vec::new();

        updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTERS, row, diff));

        if let ClusterVariant::Managed(managed_config) = &cluster.config.variant {
            let row = match managed_config.schedule {
                ClusterSchedule::Manual => Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String("manual"),
                    Datum::Null,
                ]),
                ClusterSchedule::Refresh {
                    hydration_time_estimate,
                } => Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String("on-refresh"),
                    Datum::Interval(
                        Interval::from_duration(&hydration_time_estimate)
                            .expect("planning ensured that this is convertible back to Interval"),
                    ),
                ]),
            };
            updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTER_SCHEDULES, row, diff));
        }

        updates.push(BuiltinTableUpdate::row(
            &*MZ_CLUSTER_WORKLOAD_CLASSES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::from(cluster.config.workload_class.as_deref()),
            ]),
            diff,
        ));

        updates
    }

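    /// Packs rows for `mz_cluster_replicas`, plus marker rows in
    /// `mz_internal_cluster_replicas` and `mz_pending_cluster_replicas` when
    /// the replica is internal or pending, respectively.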
    pub(super) fn pack_cluster_replica_update(
        &self,
        cluster_id: ClusterId,
        name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let cluster = &self.clusters_by_id[&cluster_id];
        let id = cluster.replica_id(name).expect("Must exist");
        let replica = cluster.replica(id).expect("Must exist");

        let (size, disk, az, internal, pending) = match &replica.config.location {
            ReplicaLocation::Managed(ManagedReplicaLocation {
                size,
                availability_zones: ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
                allocation: _,
                billed_as: _,
                internal,
                pending,
            }) => (
                Some(&**size),
                Some(self.cluster_replica_size_has_disk(size)),
                Some(az.as_str()),
                *internal,
                *pending,
            ),
            ReplicaLocation::Managed(ManagedReplicaLocation {
                size,
                availability_zones: _,
                allocation: _,
                billed_as: _,
                internal,
                pending,
            }) => (
                Some(&**size),
                Some(self.cluster_replica_size_has_disk(size)),
                None,
                *internal,
                *pending,
            ),
            _ => (None, None, None, false, false),
        };

        let cluster_replica_update = BuiltinTableUpdate::row(
            &*MZ_CLUSTER_REPLICAS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(name),
                Datum::String(&cluster_id.to_string()),
                Datum::from(size),
                Datum::from(az),
                Datum::String(&replica.owner_id.to_string()),
                Datum::from(disk),
            ]),
            diff,
        );

        let mut updates = vec![cluster_replica_update];

        if internal {
            let update = BuiltinTableUpdate::row(
                &*MZ_INTERNAL_CLUSTER_REPLICAS,
                Row::pack_slice(&[Datum::String(&id.to_string())]),
                diff,
            );
            updates.push(update);
        }

        if pending {
            let update = BuiltinTableUpdate::row(
                &*MZ_PENDING_CLUSTER_REPLICAS,
                Row::pack_slice(&[Datum::String(&id.to_string())]),
                diff,
            );
            updates.push(update);
        }

        updates
    }

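    /// Packs one `mz_network_policy_rules` row per rule and a single
    /// `mz_network_policies` row for the policy itself.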
    pub(crate) fn pack_network_policy_update(
        &self,
        policy_id: &NetworkPolicyId,
        diff: Diff,
    ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, Error> {
        let policy = self.get_network_policy(policy_id);
        let row = self.pack_privilege_array_row(&policy.privileges);
        let privileges = row.unpack_first();
        let mut updates = Vec::new();
        for rule in &policy.rules {
            updates.push(BuiltinTableUpdate::row(
                &*MZ_NETWORK_POLICY_RULES,
                Row::pack_slice(&[
                    Datum::String(&rule.name),
                    Datum::String(&policy.id.to_string()),
                    Datum::String(&rule.action.to_string()),
                    Datum::String(&rule.address.to_string()),
                    Datum::String(&rule.direction.to_string()),
                ]),
                diff,
            ));
        }
        updates.push(BuiltinTableUpdate::row(
            &*MZ_NETWORK_POLICIES,
            Row::pack_slice(&[
                Datum::String(&policy.id.to_string()),
                Datum::String(&policy.name),
                Datum::String(&policy.owner_id.to_string()),
                privileges,
                Datum::UInt32(policy.oid),
            ]),
            diff,
        ));

        Ok(updates)
    }

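    /// Packs all builtin table updates for a single catalog item: the
    /// item-kind-specific rows (tables, sources, views, and so on), its
    /// `mz_object_dependencies` rows (for non-temporary items), one
    /// `mz_columns` row per column when the item has a relation description,
    /// and its history retention strategy, if any.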
    pub(super) fn pack_item_update(
        &self,
        id: CatalogItemId,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let entry = self.get_entry(&id);
        let oid = entry.oid();
        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
        let schema_id = &self
            .get_schema(
                &entry.name().qualifiers.database_spec,
                &entry.name().qualifiers.schema_spec,
                conn_id,
            )
            .id;
        let name = &entry.name().item;
        let owner_id = entry.owner_id();
        let privileges_row = self.pack_privilege_array_row(entry.privileges());
        let privileges = privileges_row.unpack_first();
        let mut updates = match entry.item() {
            CatalogItem::Log(_) => self.pack_source_update(
                id, oid, schema_id, name, "log", None, None, None, None, None, owner_id,
                privileges, diff, None,
            ),
            CatalogItem::Index(index) => {
                self.pack_index_update(id, oid, name, owner_id, index, diff)
            }
            CatalogItem::Table(table) => {
                let mut updates = self
                    .pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);

                if let TableDataSource::DataSource {
                    desc: data_source,
                    timeline: _,
                } = &table.data_source
                {
                    updates.extend(match data_source {
                        DataSourceDesc::IngestionExport {
                            ingestion_id,
                            external_reference: UnresolvedItemName(external_reference),
                            details: _,
                            data_config: _,
                        } => {
                            let ingestion_entry = self
                                .get_entry(ingestion_id)
                                .source_desc()
                                .expect("primary source exists")
                                .expect("primary source is a source");

                            match ingestion_entry.connection.name() {
                                "postgres" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                    let schema_name = external_reference[1].to_ast_string_simple();
                                    let table_name = external_reference[2].to_ast_string_simple();

                                    self.pack_postgres_source_tables_update(
                                        id,
                                        &schema_name,
                                        &table_name,
                                        diff,
                                    )
                                }
                                "mysql" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
                                    let schema_name = external_reference[0].to_ast_string_simple();
                                    let table_name = external_reference[1].to_ast_string_simple();

                                    self.pack_mysql_source_tables_update(
                                        id,
                                        &schema_name,
                                        &table_name,
                                        diff,
                                    )
                                }
                                "sql-server" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                    let schema_name = external_reference[1].to_ast_string_simple();
                                    let table_name = external_reference[2].to_ast_string_simple();

                                    self.pack_sql_server_source_table_update(
                                        id,
                                        &schema_name,
                                        &table_name,
                                        diff,
                                    )
                                }
                                "load-generator" => vec![],
                                "kafka" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 1);
                                    let topic = external_reference[0].to_ast_string_simple();
                                    let envelope = data_source.envelope();
                                    let (key_format, value_format) = data_source.formats();

                                    self.pack_kafka_source_tables_update(
                                        id,
                                        &topic,
                                        envelope,
                                        key_format,
                                        value_format,
                                        diff,
                                    )
                                }
                                s => unreachable!("{s} sources do not have tables"),
                            }
                        }
                        _ => vec![],
                    });
                }

                updates
            }
            CatalogItem::Source(source) => {
                let source_type = source.source_type();
                let connection_id = source.connection_id();
                let envelope = source.data_source.envelope();
                let cluster_entry = match source.data_source {
                    DataSourceDesc::IngestionExport { ingestion_id, .. } => {
                        self.get_entry(&ingestion_id)
                    }
                    _ => entry,
                };

                let cluster_id = cluster_entry.item().cluster_id().map(|id| id.to_string());

                let (key_format, value_format) = source.data_source.formats();

                let mut updates = self.pack_source_update(
                    id,
                    oid,
                    schema_id,
                    name,
                    source_type,
                    connection_id,
                    envelope,
                    key_format,
                    value_format,
                    cluster_id.as_deref(),
                    owner_id,
                    privileges,
                    diff,
                    source.create_sql.as_ref(),
                );

                updates.extend(match &source.data_source {
                    DataSourceDesc::Ingestion { desc, .. }
                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => match &desc.connection {
                        GenericSourceConnection::Postgres(postgres) => {
                            self.pack_postgres_source_update(id, postgres, diff)
                        }
                        GenericSourceConnection::Kafka(kafka) => {
                            self.pack_kafka_source_update(id, source.global_id(), kafka, diff)
                        }
                        _ => vec![],
                    },
                    DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference: UnresolvedItemName(external_reference),
                        details: _,
                        data_config: _,
                    } => {
                        let ingestion_entry = self
                            .get_entry(ingestion_id)
                            .source_desc()
                            .expect("primary source exists")
                            .expect("primary source is a source");

                        match ingestion_entry.connection.name() {
                            "postgres" => {
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                let schema_name = external_reference[1].to_ast_string_simple();
                                let table_name = external_reference[2].to_ast_string_simple();

                                self.pack_postgres_source_tables_update(
                                    id,
                                    &schema_name,
                                    &table_name,
                                    diff,
                                )
                            }
                            "mysql" => {
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
                                let schema_name = external_reference[0].to_ast_string_simple();
                                let table_name = external_reference[1].to_ast_string_simple();

                                self.pack_mysql_source_tables_update(
                                    id,
                                    &schema_name,
                                    &table_name,
                                    diff,
                                )
                            }
                            "sql-server" => {
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                let schema_name = external_reference[1].to_ast_string_simple();
                                let table_name = external_reference[2].to_ast_string_simple();

                                self.pack_sql_server_source_table_update(
                                    id,
                                    &schema_name,
                                    &table_name,
                                    diff,
                                )
                            }
                            "load-generator" => vec![],
                            s => unreachable!("{s} sources do not have subsources"),
                        }
                    }
                    DataSourceDesc::Webhook { .. } => {
                        vec![self.pack_webhook_source_update(id, diff)]
                    }
                    _ => vec![],
                });

                updates
            }
            CatalogItem::View(view) => {
                self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
            }
            CatalogItem::MaterializedView(mview) => self.pack_materialized_view_update(
                id, oid, schema_id, name, owner_id, privileges, mview, diff,
            ),
            CatalogItem::Sink(sink) => {
                self.pack_sink_update(id, oid, schema_id, name, owner_id, sink, diff)
            }
            CatalogItem::Type(ty) => {
                self.pack_type_update(id, oid, schema_id, name, owner_id, privileges, ty, diff)
            }
            CatalogItem::Func(func) => {
                self.pack_func_update(id, schema_id, name, owner_id, func, diff)
            }
            CatalogItem::Secret(_) => {
                self.pack_secret_update(id, oid, schema_id, name, owner_id, privileges, diff)
            }
            CatalogItem::Connection(connection) => self.pack_connection_update(
                id, oid, schema_id, name, owner_id, privileges, connection, diff,
            ),
            CatalogItem::ContinualTask(ct) => self.pack_continual_task_update(
                id, oid, schema_id, name, owner_id, privileges, ct, diff,
            ),
        };

        if !entry.item().is_temporary() {
            for dependee in entry.item().references().items() {
                updates.push(self.pack_depends_update(id, *dependee, diff))
            }
        }

        let full_name = self.resolve_full_name(entry.name(), entry.conn_id());
        if let Ok(desc) = entry.desc_latest(&full_name) {
            let defaults = match entry.item() {
                CatalogItem::Table(Table {
                    data_source: TableDataSource::TableWrites { defaults },
                    ..
                }) => Some(defaults),
                _ => None,
            };
            for (i, (column_name, column_type)) in desc.iter().enumerate() {
                let default: Option<String> = defaults.map(|d| d[i].to_ast_string_stable());
                let default: Datum = default
                    .as_ref()
                    .map(|d| Datum::String(d))
                    .unwrap_or(Datum::Null);
                let pgtype = mz_pgrepr::Type::from(&column_type.scalar_type);
                let (type_name, type_oid) = match &column_type.scalar_type {
                    SqlScalarType::List {
                        custom_id: Some(custom_id),
                        ..
                    }
                    | SqlScalarType::Map {
                        custom_id: Some(custom_id),
                        ..
                    }
                    | SqlScalarType::Record {
                        custom_id: Some(custom_id),
                        ..
                    } => {
                        let entry = self.get_entry(custom_id);
                        let name = &*entry.name().item;
                        let oid = entry.oid();
                        (name, oid)
                    }
                    _ => (pgtype.name(), pgtype.oid()),
                };
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_COLUMNS,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String(column_name),
                        Datum::UInt64(u64::cast_from(i + 1)),
                        Datum::from(column_type.nullable),
                        Datum::String(type_name),
                        default,
                        Datum::UInt32(type_oid),
                        Datum::Int32(pgtype.typmod()),
                    ]),
                    diff,
                ));
            }
        }

        if let Some(cw) = entry.item().initial_logical_compaction_window() {
            updates.push(self.pack_history_retention_strategy_update(id, cw, diff));
        }

        updates
    }

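    /// Packs a row for `mz_history_retention_strategies`, encoding the
    /// compaction window's comparable timestamp as a JSON number.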
    fn pack_history_retention_strategy_update(
        &self,
        id: CatalogItemId,
        cw: CompactionWindow,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let cw: u64 = cw.comparable_timestamp().into();
        let cw = Jsonb::from_serde_json(serde_json::Value::Number(serde_json::Number::from(cw)))
            .expect("must serialize");
        BuiltinTableUpdate::row(
            &*MZ_HISTORY_RETENTION_STRATEGIES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String("FOR"),
                cw.into_row().into_element(),
            ]),
            diff,
        )
    }

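    /// Packs a row for `mz_tables`. Tables that ingest from a source
    /// (ingestion exports) also record the ID of their primary source.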
    fn pack_table_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        diff: Diff,
        table: &Table,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let redacted = table.create_sql.as_ref().map(|create_sql| {
            mz_sql::parse::parse(create_sql)
                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
                .into_element()
                .ast
                .to_ast_string_redacted()
        });
        let source_id = if let TableDataSource::DataSource {
            desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
            ..
        } = &table.data_source
        {
            Some(ingestion_id.to_string())
        } else {
            None
        };

        vec![BuiltinTableUpdate::row(
            &*MZ_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&owner_id.to_string()),
                privileges,
                if let Some(create_sql) = &table.create_sql {
                    Datum::String(create_sql)
                } else {
                    Datum::Null
                },
                if let Some(redacted) = &redacted {
                    Datum::String(redacted)
                } else {
                    Datum::Null
                },
                if let Some(source_id) = source_id.as_ref() {
                    Datum::String(source_id)
                } else {
                    Datum::Null
                },
            ]),
            diff,
        )]
    }

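    /// Packs a row for `mz_sources`. This is used for all source-like items,
    /// including logs; columns that do not apply are packed as null.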
    fn pack_source_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        source_desc_name: &str,
        connection_id: Option<CatalogItemId>,
        envelope: Option<&str>,
        key_format: Option<&str>,
        value_format: Option<&str>,
        cluster_id: Option<&str>,
        owner_id: &RoleId,
        privileges: Datum,
        diff: Diff,
        create_sql: Option<&String>,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let redacted = create_sql.map(|create_sql| {
            let create_stmt = mz_sql::parse::parse(create_sql)
                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
                .into_element()
                .ast;
            create_stmt.to_ast_string_redacted()
        });
        vec![BuiltinTableUpdate::row(
            &*MZ_SOURCES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(source_desc_name),
                Datum::from(connection_id.map(|id| id.to_string()).as_deref()),
                Datum::Null,
                Datum::from(envelope),
                Datum::from(key_format),
                Datum::from(value_format),
                Datum::from(cluster_id),
                Datum::String(&owner_id.to_string()),
                privileges,
                if let Some(create_sql) = create_sql {
                    Datum::String(create_sql)
                } else {
                    Datum::Null
                },
                if let Some(redacted) = &redacted {
                    Datum::String(redacted)
                } else {
                    Datum::Null
                },
            ]),
            diff,
        )]
    }

    fn pack_postgres_source_update(
        &self,
        id: CatalogItemId,
        postgres: &PostgresSourceConnection<ReferencedConnection>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_POSTGRES_SOURCES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(&postgres.publication_details.slot),
                Datum::from(postgres.publication_details.timeline_id),
            ]),
            diff,
        )]
    }

    fn pack_kafka_source_update(
        &self,
        item_id: CatalogItemId,
        collection_id: GlobalId,
        kafka: &KafkaSourceConnection<ReferencedConnection>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_KAFKA_SOURCES,
            Row::pack_slice(&[
                Datum::String(&item_id.to_string()),
                Datum::String(&kafka.group_id(&self.config.connection_context, collection_id)),
                Datum::String(&kafka.topic),
            ]),
            diff,
        )]
    }

    fn pack_postgres_source_tables_update(
        &self,
        id: CatalogItemId,
        schema_name: &str,
        table_name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_POSTGRES_SOURCE_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(schema_name),
                Datum::String(table_name),
            ]),
            diff,
        )]
    }

    fn pack_mysql_source_tables_update(
        &self,
        id: CatalogItemId,
        schema_name: &str,
        table_name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_MYSQL_SOURCE_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(schema_name),
                Datum::String(table_name),
            ]),
            diff,
        )]
    }

    fn pack_sql_server_source_table_update(
        &self,
        id: CatalogItemId,
        schema_name: &str,
        table_name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_SQL_SERVER_SOURCE_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(schema_name),
                Datum::String(table_name),
            ]),
            diff,
        )]
    }

    fn pack_kafka_source_tables_update(
        &self,
        id: CatalogItemId,
        topic: &str,
        envelope: Option<&str>,
        key_format: Option<&str>,
        value_format: Option<&str>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_KAFKA_SOURCE_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(topic),
                Datum::from(envelope),
                Datum::from(key_format),
                Datum::from(value_format),
            ]),
            diff,
        )]
    }

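    /// Packs a row for `mz_connections`, plus connection-kind-specific rows
    /// (for example `mz_kafka_connections` or `mz_aws_connections`). Failures
    /// to pack the auxiliary rows are logged rather than propagated.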
    fn pack_connection_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        connection: &Connection,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let create_stmt = mz_sql::parse::parse(&connection.create_sql)
            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", connection.create_sql))
            .into_element()
            .ast;
        let mut updates = vec![BuiltinTableUpdate::row(
            &*MZ_CONNECTIONS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(match connection.details {
                    ConnectionDetails::Kafka { .. } => "kafka",
                    ConnectionDetails::Csr { .. } => "confluent-schema-registry",
                    ConnectionDetails::Postgres { .. } => "postgres",
                    ConnectionDetails::Aws(..) => "aws",
                    ConnectionDetails::AwsPrivatelink(..) => "aws-privatelink",
                    ConnectionDetails::Ssh { .. } => "ssh-tunnel",
                    ConnectionDetails::MySql { .. } => "mysql",
                    ConnectionDetails::SqlServer(_) => "sql-server",
                    ConnectionDetails::IcebergCatalog(_) => "iceberg-catalog",
                }),
                Datum::String(&owner_id.to_string()),
                privileges,
                Datum::String(&connection.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        )];
        match connection.details {
            ConnectionDetails::Kafka(ref kafka) => {
                updates.extend(self.pack_kafka_connection_update(id, kafka, diff));
            }
            ConnectionDetails::Aws(ref aws_config) => {
                match self.pack_aws_connection_update(id, aws_config, diff) {
                    Ok(update) => {
                        updates.push(update);
                    }
                    Err(e) => {
                        tracing::error!(%id, %e, "failed writing row to mz_aws_connections table");
                    }
                }
            }
            ConnectionDetails::AwsPrivatelink(_) => {
                if let Some(aws_principal_context) = self.aws_principal_context.as_ref() {
                    updates.push(self.pack_aws_privatelink_connection_update(
                        id,
                        aws_principal_context,
                        diff,
                    ));
                } else {
                    tracing::error!(%id, "missing AWS principal context; cannot write row to mz_aws_privatelink_connections table");
                }
            }
            ConnectionDetails::Ssh {
                ref key_1,
                ref key_2,
                ..
            } => {
                updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff));
            }
            ConnectionDetails::Csr(_)
            | ConnectionDetails::Postgres(_)
            | ConnectionDetails::MySql(_)
            | ConnectionDetails::SqlServer(_)
            | ConnectionDetails::IcebergCatalog(_) => (),
        };
        updates
    }

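    /// Packs a row for `mz_ssh_tunnel_connections` containing the public half
    /// of both of the connection's SSH keys.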
    pub(crate) fn pack_ssh_tunnel_connection_update(
        &self,
        id: CatalogItemId,
        key_1: &SshKey,
        key_2: &SshKey,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        BuiltinTableUpdate::row(
            &*MZ_SSH_TUNNEL_CONNECTIONS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(key_1.public_key().as_str()),
                Datum::String(key_2.public_key().as_str()),
            ]),
            diff,
        )
    }

    fn pack_kafka_connection_update(
        &self,
        id: CatalogItemId,
        kafka: &KafkaConnection<ReferencedConnection>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let progress_topic = kafka.progress_topic(&self.config.connection_context, id);
        let mut row = Row::default();
        row.packer()
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: kafka.brokers.len(),
                }],
                kafka
                    .brokers
                    .iter()
                    .map(|broker| Datum::String(&broker.address)),
            )
            .expect("kafka.brokers is 1 dimensional, and its length is used for the array length");
        let brokers = row.unpack_first();
        vec![BuiltinTableUpdate::row(
            &*MZ_KAFKA_CONNECTIONS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                brokers,
                Datum::String(&progress_topic),
            ]),
            diff,
        )]
    }

    pub fn pack_aws_privatelink_connection_update(
        &self,
        connection_id: CatalogItemId,
        aws_principal_context: &AwsPrincipalContext,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let id = &MZ_AWS_PRIVATELINK_CONNECTIONS;
        let row = Row::pack_slice(&[
            Datum::String(&connection_id.to_string()),
            Datum::String(&aws_principal_context.to_principal_string(connection_id)),
        ]);
        BuiltinTableUpdate::row(id, row, diff)
    }

    pub fn pack_aws_connection_update(
        &self,
        connection_id: CatalogItemId,
        aws_config: &AwsConnection,
        diff: Diff,
    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, anyhow::Error> {
        let id = &MZ_AWS_CONNECTIONS;

        let mut access_key_id = None;
        let mut access_key_id_secret_id = None;
        let mut secret_access_key_secret_id = None;
        let mut session_token = None;
        let mut session_token_secret_id = None;
        let mut assume_role_arn = None;
        let mut assume_role_session_name = None;
        let mut principal = None;
        let mut external_id = None;
        let mut example_trust_policy = None;
        match &aws_config.auth {
            AwsAuth::Credentials(credentials) => {
                match &credentials.access_key_id {
                    StringOrSecret::String(s) => access_key_id = Some(s.as_str()),
                    StringOrSecret::Secret(s) => access_key_id_secret_id = Some(s.to_string()),
                }
                secret_access_key_secret_id = Some(credentials.secret_access_key.to_string());
                match credentials.session_token.as_ref() {
                    None => (),
                    Some(StringOrSecret::String(s)) => session_token = Some(s.as_str()),
                    Some(StringOrSecret::Secret(s)) => {
                        session_token_secret_id = Some(s.to_string())
                    }
                }
            }
            AwsAuth::AssumeRole(assume_role) => {
                assume_role_arn = Some(assume_role.arn.as_str());
                assume_role_session_name = assume_role.session_name.as_deref();
                principal = self
                    .config
                    .connection_context
                    .aws_connection_role_arn
                    .as_deref();
                external_id =
                    Some(assume_role.external_id(&self.config.connection_context, connection_id)?);
                example_trust_policy = {
                    let policy = assume_role
                        .example_trust_policy(&self.config.connection_context, connection_id)?;
                    let policy = Jsonb::from_serde_json(policy).expect("valid json");
                    Some(policy.into_row())
                };
            }
        }

        let row = Row::pack_slice(&[
            Datum::String(&connection_id.to_string()),
            Datum::from(aws_config.endpoint.as_deref()),
            Datum::from(aws_config.region.as_deref()),
            Datum::from(access_key_id),
            Datum::from(access_key_id_secret_id.as_deref()),
            Datum::from(secret_access_key_secret_id.as_deref()),
            Datum::from(session_token),
            Datum::from(session_token_secret_id.as_deref()),
            Datum::from(assume_role_arn),
            Datum::from(assume_role_session_name),
            Datum::from(principal),
            Datum::from(external_id.as_deref()),
            Datum::from(example_trust_policy.as_ref().map(|p| p.into_element())),
        ]);

        Ok(BuiltinTableUpdate::row(id, row, diff))
    }

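    /// Packs a row for `mz_views`. The view definition is rendered in its
    /// stable AST form with a trailing semicolon.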
    fn pack_view_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        view: &View,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let create_stmt = mz_sql::parse::parse(&view.create_sql)
            .unwrap_or_else(|e| {
                panic!(
                    "create_sql cannot be invalid: `{}` --- error: `{}`",
                    view.create_sql, e
                )
            })
            .into_element()
            .ast;
        let query = match &create_stmt {
            Statement::CreateView(stmt) => &stmt.definition.query,
            _ => unreachable!(),
        };

        let mut query_string = query.to_ast_string_stable();
        query_string.push(';');

        vec![BuiltinTableUpdate::row(
            &*MZ_VIEWS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&query_string),
                Datum::String(&owner_id.to_string()),
                privileges,
                Datum::String(&view.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        )]
    }

    fn pack_materialized_view_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        mview: &MaterializedView,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let create_stmt = mz_sql::parse::parse(&mview.create_sql)
            .unwrap_or_else(|e| {
                panic!(
                    "create_sql cannot be invalid: `{}` --- error: `{}`",
                    mview.create_sql, e
                )
            })
            .into_element()
            .ast;
        let query_string = match &create_stmt {
            Statement::CreateMaterializedView(stmt) => {
                let mut query_string = stmt.query.to_ast_string_stable();
                query_string.push(';');
                query_string
            }
            _ => unreachable!(),
        };

        let mut updates = Vec::new();

        updates.push(BuiltinTableUpdate::row(
            &*MZ_MATERIALIZED_VIEWS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&mview.cluster_id.to_string()),
                Datum::String(&query_string),
                Datum::String(&owner_id.to_string()),
                privileges,
                Datum::String(&mview.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        ));

        if let Some(refresh_schedule) = &mview.refresh_schedule {
            assert!(!refresh_schedule.is_empty());
            for RefreshEvery {
                interval,
                aligned_to,
            } in refresh_schedule.everies.iter()
            {
                let aligned_to_dt = mz_ore::now::to_datetime(
                    <&Timestamp as TryInto<u64>>::try_into(aligned_to).expect("undoes planning"),
                );
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String("every"),
                        Datum::Interval(
                            Interval::from_duration(interval).expect(
                                "planning ensured that this is convertible back to Interval",
                            ),
                        ),
                        Datum::TimestampTz(aligned_to_dt.try_into().expect("undoes planning")),
                        Datum::Null,
                    ]),
                    diff,
                ));
            }
            for at in refresh_schedule.ats.iter() {
                let at_dt = mz_ore::now::to_datetime(
                    <&Timestamp as TryInto<u64>>::try_into(at).expect("undoes planning"),
                );
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String("at"),
                        Datum::Null,
                        Datum::Null,
                        Datum::TimestampTz(at_dt.try_into().expect("undoes planning")),
                    ]),
                    diff,
                ));
            }
        } else {
            updates.push(BuiltinTableUpdate::row(
                &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String("on-commit"),
                    Datum::Null,
                    Datum::Null,
                    Datum::Null,
                ]),
                diff,
            ));
        }

        updates
    }

    fn pack_continual_task_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        ct: &ContinualTask,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let create_stmt = mz_sql::parse::parse(&ct.create_sql)
            .unwrap_or_else(|e| {
                panic!(
                    "create_sql cannot be invalid: `{}` --- error: `{}`",
                    ct.create_sql, e
                )
            })
            .into_element()
            .ast;
        let query_string = match &create_stmt {
            Statement::CreateContinualTask(stmt) => {
                let mut query_string = String::new();
                for stmt in &stmt.stmts {
                    let s = match stmt {
                        ContinualTaskStmt::Insert(stmt) => stmt.to_ast_string_stable(),
                        ContinualTaskStmt::Delete(stmt) => stmt.to_ast_string_stable(),
                    };
                    if query_string.is_empty() {
                        query_string = s;
                    } else {
                        query_string.push_str("; ");
                        query_string.push_str(&s);
                    }
                }
                query_string
            }
            _ => unreachable!(),
        };

        vec![BuiltinTableUpdate::row(
            &*MZ_CONTINUAL_TASKS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&ct.cluster_id.to_string()),
                Datum::String(&query_string),
                Datum::String(&owner_id.to_string()),
                privileges,
                Datum::String(&ct.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        )]
    }

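    /// Packs rows for `mz_sinks`, plus a `mz_kafka_sinks` row for Kafka sinks.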
    fn pack_sink_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        sink: &Sink,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = vec![];
        match &sink.connection {
            StorageSinkConnection::Kafka(KafkaSinkConnection {
                topic: topic_name, ..
            }) => {
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_KAFKA_SINKS,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String(topic_name.as_str()),
                    ]),
                    diff,
                ));
            }
        };

        let create_stmt = mz_sql::parse::parse(&sink.create_sql)
            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", sink.create_sql))
            .into_element()
            .ast;

        let envelope = sink.envelope();

        let combined_format = sink.combined_format();
        let (key_format, value_format) = sink.formats();

        updates.push(BuiltinTableUpdate::row(
            &*MZ_SINKS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(sink.connection.name()),
                Datum::from(sink.connection_id().map(|id| id.to_string()).as_deref()),
                Datum::Null,
                Datum::from(envelope),
                Datum::from(combined_format.as_ref()),
                Datum::from(key_format),
                Datum::String(value_format),
                Datum::String(&sink.cluster_id.to_string()),
                Datum::String(&owner_id.to_string()),
                Datum::String(&sink.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        ));

        updates
    }

    fn pack_index_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        name: &str,
        owner_id: &RoleId,
        index: &Index,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = vec![];

        let create_stmt = mz_sql::parse::parse(&index.create_sql)
            .unwrap_or_else(|e| {
                panic!(
                    "create_sql cannot be invalid: `{}` --- error: `{}`",
                    index.create_sql, e
                )
            })
            .into_element()
            .ast;

        let key_sqls = match &create_stmt {
            Statement::CreateIndex(CreateIndexStatement { key_parts, .. }) => key_parts
                .as_ref()
                .expect("key_parts is filled in during planning"),
            _ => unreachable!(),
        };
        let on_item_id = self.get_entry_by_global_id(&index.on).id();

        updates.push(BuiltinTableUpdate::row(
            &*MZ_INDEXES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(name),
                Datum::String(&on_item_id.to_string()),
                Datum::String(&index.cluster_id.to_string()),
                Datum::String(&owner_id.to_string()),
                Datum::String(&index.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        ));

        for (i, key) in index.keys.iter().enumerate() {
            let on_entry = self.get_entry_by_global_id(&index.on);
            let nullable = key
                .typ(
                    &on_entry
                        .desc(&self.resolve_full_name(on_entry.name(), on_entry.conn_id()))
                        .expect("can only create indexes on items with a valid description")
                        .typ()
                        .column_types,
                )
                .nullable;
            let seq_in_index = u64::cast_from(i + 1);
            let key_sql = key_sqls
                .get(i)
                .expect("missing sql information for index key")
                .to_ast_string_simple();
            let (field_number, expression) = match key {
                MirScalarExpr::Column(col, _) => {
                    (Datum::UInt64(u64::cast_from(*col + 1)), Datum::Null)
                }
                _ => (Datum::Null, Datum::String(&key_sql)),
            };
            updates.push(BuiltinTableUpdate::row(
                &*MZ_INDEX_COLUMNS,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::UInt64(seq_in_index),
                    field_number,
                    expression,
                    Datum::from(nullable),
                ]),
                diff,
            ));
        }

        updates
    }

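    /// Packs rows for `mz_types`, the type-category-specific table
    /// (`mz_array_types`, `mz_list_types`, `mz_map_types`, `mz_pseudo_types`,
    /// or `mz_base_types`), and `mz_type_pg_metadata` when PostgreSQL metadata
    /// is present.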
    fn pack_type_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        typ: &Type,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut out = vec![];

        let redacted = typ.create_sql.as_ref().map(|create_sql| {
            mz_sql::parse::parse(create_sql)
                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
                .into_element()
                .ast
                .to_ast_string_redacted()
        });

        out.push(BuiltinTableUpdate::row(
            &*MZ_TYPES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&TypeCategory::from_catalog_type(&typ.details.typ).to_string()),
                Datum::String(&owner_id.to_string()),
                privileges,
                if let Some(create_sql) = &typ.create_sql {
                    Datum::String(create_sql)
                } else {
                    Datum::Null
                },
                if let Some(redacted) = &redacted {
                    Datum::String(redacted)
                } else {
                    Datum::Null
                },
            ]),
            diff,
        ));

        let mut row = Row::default();
        let mut packer = row.packer();

        fn append_modifier(packer: &mut RowPacker<'_>, mods: &[i64]) {
            if mods.is_empty() {
                packer.push(Datum::Null);
            } else {
                packer.push_list(mods.iter().map(|m| Datum::Int64(*m)));
            }
        }

        let index_id = match &typ.details.typ {
            CatalogType::Array {
                element_reference: element_id,
            } => {
                packer.push(Datum::String(&id.to_string()));
                packer.push(Datum::String(&element_id.to_string()));
                &MZ_ARRAY_TYPES
            }
            CatalogType::List {
                element_reference: element_id,
                element_modifiers,
            } => {
                packer.push(Datum::String(&id.to_string()));
                packer.push(Datum::String(&element_id.to_string()));
                append_modifier(&mut packer, element_modifiers);
                &MZ_LIST_TYPES
            }
            CatalogType::Map {
                key_reference: key_id,
                value_reference: value_id,
                key_modifiers,
                value_modifiers,
            } => {
                packer.push(Datum::String(&id.to_string()));
                packer.push(Datum::String(&key_id.to_string()));
                packer.push(Datum::String(&value_id.to_string()));
                append_modifier(&mut packer, key_modifiers);
                append_modifier(&mut packer, value_modifiers);
                &MZ_MAP_TYPES
            }
            CatalogType::Pseudo => {
                packer.push(Datum::String(&id.to_string()));
                &MZ_PSEUDO_TYPES
            }
            _ => {
                packer.push(Datum::String(&id.to_string()));
                &MZ_BASE_TYPES
            }
        };
        out.push(BuiltinTableUpdate::row(index_id, row, diff));

        if let Some(pg_metadata) = &typ.details.pg_metadata {
            out.push(BuiltinTableUpdate::row(
                &*MZ_TYPE_PG_METADATA,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::UInt32(pg_metadata.typinput_oid),
                    Datum::UInt32(pg_metadata.typreceive_oid),
                ]),
                diff,
            ));
        }

        out
    }

    fn pack_func_update(
        &self,
        id: CatalogItemId,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        func: &Func,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = vec![];
        for func_impl_details in func.inner.func_impls() {
            let arg_type_ids = func_impl_details
                .arg_typs
                .iter()
                .map(|typ| self.get_system_type(typ).id().to_string())
                .collect::<Vec<_>>();

            let mut row = Row::default();
            row.packer()
                .try_push_array(
                    &[ArrayDimension {
                        lower_bound: 1,
                        length: arg_type_ids.len(),
                    }],
                    arg_type_ids.iter().map(|id| Datum::String(id)),
                )
                .expect(
                    "arg_type_ids is 1 dimensional, and its length is used for the array length",
                );
            let arg_type_ids = row.unpack_first();

            updates.push(BuiltinTableUpdate::row(
                &*MZ_FUNCTIONS,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::UInt32(func_impl_details.oid),
                    Datum::String(&schema_id.to_string()),
                    Datum::String(name),
                    arg_type_ids,
                    Datum::from(
                        func_impl_details
                            .variadic_typ
                            .map(|typ| self.get_system_type(typ).id().to_string())
                            .as_deref(),
                    ),
                    Datum::from(
                        func_impl_details
                            .return_typ
                            .map(|typ| self.get_system_type(typ).id().to_string())
                            .as_deref(),
                    ),
                    func_impl_details.return_is_set.into(),
                    Datum::String(&owner_id.to_string()),
                ]),
                diff,
            ));

            if let mz_sql::func::Func::Aggregate(_) = func.inner {
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_AGGREGATES,
                    Row::pack_slice(&[
                        Datum::UInt32(func_impl_details.oid),
                        Datum::String("n"),
                        Datum::Int16(0),
                    ]),
                    diff,
                ));
            }
        }
        updates
    }

    pub fn pack_op_update(
        &self,
        operator: &str,
        func_impl_details: FuncImplCatalogDetails,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let arg_type_ids = func_impl_details
            .arg_typs
            .iter()
            .map(|typ| self.get_system_type(typ).id().to_string())
            .collect::<Vec<_>>();

        let mut row = Row::default();
        row.packer()
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: arg_type_ids.len(),
                }],
                arg_type_ids.iter().map(|id| Datum::String(id)),
            )
            .expect("arg_type_ids is 1 dimensional, and its length is used for the array length");
        let arg_type_ids = row.unpack_first();

        BuiltinTableUpdate::row(
            &*MZ_OPERATORS,
            Row::pack_slice(&[
                Datum::UInt32(func_impl_details.oid),
                Datum::String(operator),
                arg_type_ids,
                Datum::from(
                    func_impl_details
                        .return_typ
                        .map(|typ| self.get_system_type(typ).id().to_string())
                        .as_deref(),
                ),
            ]),
            diff,
        )
    }

    fn pack_secret_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_SECRETS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&owner_id.to_string()),
                privileges,
            ]),
            diff,
        )]
    }

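    /// Packs a row for `mz_audit_events`, with the event details serialized as
    /// a single jsonb column.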
    pub fn pack_audit_log_update(
        &self,
        event: &VersionedEvent,
        diff: Diff,
    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
        let (event_type, object_type, details, user, occurred_at): (
            &EventType,
            &ObjectType,
            &EventDetails,
            &Option<String>,
            u64,
        ) = match event {
            VersionedEvent::V1(ev) => (
                &ev.event_type,
                &ev.object_type,
                &ev.details,
                &ev.user,
                ev.occurred_at,
            ),
        };
        let details = Jsonb::from_serde_json(details.as_json())
            .map_err(|e| {
                Error::new(ErrorKind::Unstructured(format!(
                    "could not pack audit log update: {}",
                    e
                )))
            })?
            .into_row();
        let details = details
            .iter()
            .next()
            .expect("details created above with a single jsonb column");
        let dt = mz_ore::now::to_datetime(occurred_at);
        let id = event.sortable_id();
        Ok(BuiltinTableUpdate::row(
            &*MZ_AUDIT_EVENTS,
            Row::pack_slice(&[
                Datum::UInt64(id),
                Datum::String(&format!("{}", event_type)),
                Datum::String(&format!("{}", object_type)),
                details,
                match user {
                    Some(user) => Datum::String(user),
                    None => Datum::Null,
                },
                Datum::TimestampTz(dt.try_into().expect("must fit")),
            ]),
            diff,
        ))
    }

    pub fn pack_storage_usage_update(
        &self,
        VersionedStorageUsage::V1(event): VersionedStorageUsage,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let id = &MZ_STORAGE_USAGE_BY_SHARD;
        let row = Row::pack_slice(&[
            Datum::UInt64(event.id),
            Datum::from(event.shard_id.as_deref()),
            Datum::UInt64(event.size_bytes),
            Datum::TimestampTz(
                mz_ore::now::to_datetime(event.collection_timestamp)
                    .try_into()
                    .expect("must fit"),
            ),
        ]);
        BuiltinTableUpdate::row(id, row, diff)
    }

    pub fn pack_egress_ip_update(
        &self,
        ip: &IpNet,
    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
        let id = &MZ_EGRESS_IPS;
        let addr = ip.addr();
        let row = Row::pack_slice(&[
            Datum::String(&addr.to_string()),
            Datum::Int32(ip.prefix_len().into()),
            Datum::String(&format!("{}/{}", addr, ip.prefix_len())),
        ]);
        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
    }

    pub fn pack_license_key_update(
        &self,
        license_key: &ValidatedLicenseKey,
    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
        let id = &MZ_LICENSE_KEYS;
        let row = Row::pack_slice(&[
            Datum::String(&license_key.id),
            Datum::String(&license_key.organization),
            Datum::String(&license_key.environment_id),
            Datum::TimestampTz(
                mz_ore::now::to_datetime(license_key.expiration * 1000)
                    .try_into()
                    .expect("must fit"),
            ),
            Datum::TimestampTz(
                mz_ore::now::to_datetime(license_key.not_before * 1000)
                    .try_into()
                    .expect("must fit"),
            ),
        ]);
        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
    }

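    /// Packs one `mz_cluster_replica_sizes` row per enabled replica size
    /// allocation, substituting maximum (or arbitrary) limits when none are
    /// configured.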
    pub fn pack_all_replica_size_updates(&self) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = Vec::new();
        for (size, alloc) in &self.cluster_replica_sizes.0 {
            if alloc.disabled {
                continue;
            }

            let cpu_limit = alloc.cpu_limit.unwrap_or(CpuLimit::MAX);
            let MemoryLimit(ByteSize(memory_bytes)) =
                (alloc.memory_limit).unwrap_or(MemoryLimit::MAX);
            let DiskLimit(ByteSize(disk_bytes)) =
                (alloc.disk_limit).unwrap_or(DiskLimit::ARBITRARY);

            let row = Row::pack_slice(&[
                size.as_str().into(),
                u64::from(alloc.scale).into(),
                u64::cast_from(alloc.workers).into(),
                cpu_limit.as_nanocpus().into(),
                memory_bytes.into(),
                disk_bytes.into(),
                (alloc.credits_per_hour).into(),
            ]);

            updates.push(BuiltinTableUpdate::row(
                &*MZ_CLUSTER_REPLICA_SIZES,
                row,
                Diff::ONE,
            ));
        }

        updates
    }

    pub fn pack_subscribe_update(
        &self,
        id: GlobalId,
        subscribe: &ActiveSubscribe,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let mut row = Row::default();
        let mut packer = row.packer();
        packer.push(Datum::String(&id.to_string()));
        packer.push(Datum::Uuid(subscribe.session_uuid));
        packer.push(Datum::String(&subscribe.cluster_id.to_string()));

        let start_dt = mz_ore::now::to_datetime(subscribe.start_time);
        packer.push(Datum::TimestampTz(start_dt.try_into().expect("must fit")));

        let depends_on: Vec<_> = subscribe
            .depends_on
            .iter()
            .map(|id| id.to_string())
            .collect();
        packer.push_list(depends_on.iter().map(|s| Datum::String(s)));

        BuiltinTableUpdate::row(&*MZ_SUBSCRIPTIONS, row, diff)
    }

    pub fn pack_session_update(
        &self,
        conn: &ConnMeta,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let connect_dt = mz_ore::now::to_datetime(conn.connected_at());
        BuiltinTableUpdate::row(
            &*MZ_SESSIONS,
            Row::pack_slice(&[
                Datum::Uuid(conn.uuid()),
                Datum::UInt32(conn.conn_id().unhandled()),
                Datum::String(&conn.authenticated_role_id().to_string()),
                Datum::from(conn.client_ip().map(|ip| ip.to_string()).as_deref()),
                Datum::TimestampTz(connect_dt.try_into().expect("must fit")),
            ]),
            diff,
        )
    }

    pub fn pack_default_privileges_update(
        &self,
        default_privilege_object: &DefaultPrivilegeObject,
        grantee: &RoleId,
        acl_mode: &AclMode,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        BuiltinTableUpdate::row(
            &*MZ_DEFAULT_PRIVILEGES,
            Row::pack_slice(&[
                default_privilege_object.role_id.to_string().as_str().into(),
                default_privilege_object
                    .database_id
                    .map(|database_id| database_id.to_string())
                    .as_deref()
                    .into(),
                default_privilege_object
                    .schema_id
                    .map(|schema_id| schema_id.to_string())
                    .as_deref()
                    .into(),
                default_privilege_object
                    .object_type
                    .to_string()
                    .to_lowercase()
                    .as_str()
                    .into(),
                grantee.to_string().as_str().into(),
                acl_mode.to_string().as_str().into(),
            ]),
            diff,
        )
    }

    pub fn pack_system_privileges_update(
        &self,
        privileges: MzAclItem,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        BuiltinTableUpdate::row(
            &*MZ_SYSTEM_PRIVILEGES,
            Row::pack_slice(&[privileges.into()]),
            diff,
        )
    }

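    /// Packs a privilege map into a single-datum row containing a
    /// one-dimensional array of `mz_aclitem`s.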
    fn pack_privilege_array_row(&self, privileges: &PrivilegeMap) -> Row {
        let mut row = Row::default();
        let flat_privileges: Vec<_> = privileges.all_values_owned().collect();
        row.packer()
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: flat_privileges.len(),
                }],
                flat_privileges
                    .into_iter()
                    .map(|mz_acl_item| Datum::MzAclItem(mz_acl_item.clone())),
            )
            .expect("privileges is 1 dimensional, and its length is used for the array length");
        row
    }

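    /// Packs a row for `mz_comments`. The object ID is rendered from whichever
    /// ID variant the commented object uses, and `column_pos` is null for
    /// object-level comments.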
    pub fn pack_comment_update(
        &self,
        object_id: CommentObjectId,
        column_pos: Option<usize>,
        comment: &str,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let object_type = mz_sql::catalog::ObjectType::from(object_id);
        let audit_type = super::object_type_to_audit_object_type(object_type);
        let object_type_str = audit_type.to_string();

        let object_id_str = match object_id {
            CommentObjectId::Table(global_id)
            | CommentObjectId::View(global_id)
            | CommentObjectId::MaterializedView(global_id)
            | CommentObjectId::Source(global_id)
            | CommentObjectId::Sink(global_id)
            | CommentObjectId::Index(global_id)
            | CommentObjectId::Func(global_id)
            | CommentObjectId::Connection(global_id)
            | CommentObjectId::Secret(global_id)
            | CommentObjectId::Type(global_id)
            | CommentObjectId::ContinualTask(global_id) => global_id.to_string(),
            CommentObjectId::Role(role_id) => role_id.to_string(),
            CommentObjectId::Database(database_id) => database_id.to_string(),
            CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
            CommentObjectId::Cluster(cluster_id) => cluster_id.to_string(),
            CommentObjectId::ClusterReplica((_, replica_id)) => replica_id.to_string(),
            CommentObjectId::NetworkPolicy(network_policy_id) => network_policy_id.to_string(),
        };
        let column_pos_datum = match column_pos {
            Some(pos) => {
                let pos =
                    i32::try_from(pos).expect("we constrain this value in the planning layer");
                Datum::Int32(pos)
            }
            None => Datum::Null,
        };

        BuiltinTableUpdate::row(
            &*MZ_COMMENTS,
            Row::pack_slice(&[
                Datum::String(&object_id_str),
                Datum::String(&object_type_str),
                column_pos_datum,
                Datum::String(comment),
            ]),
            diff,
        )
    }

    pub fn pack_webhook_source_update(
        &self,
        item_id: CatalogItemId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let url = self
            .try_get_webhook_url(&item_id)
            .expect("webhook source should exist");
        let url = url.to_string();
        let name = &self.get_entry(&item_id).name().item;
        let id_str = item_id.to_string();

        BuiltinTableUpdate::row(
            &*MZ_WEBHOOKS_SOURCES,
            Row::pack_slice(&[
                Datum::String(&id_str),
                Datum::String(name),
                Datum::String(&url),
            ]),
            diff,
        )
    }

    pub fn pack_source_references_update(
        &self,
        source_references: &SourceReferences,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let source_id = source_references.source_id.to_string();
        let updated_at = &source_references.updated_at;
        source_references
            .references
            .iter()
            .map(|reference| {
                let mut row = Row::default();
                let mut packer = row.packer();
                packer.extend([
                    Datum::String(&source_id),
                    reference
                        .namespace
                        .as_ref()
                        .map(|s| Datum::String(s))
                        .unwrap_or(Datum::Null),
                    Datum::String(&reference.name),
                    Datum::TimestampTz(
                        mz_ore::now::to_datetime(*updated_at)
                            .try_into()
                            .expect("must fit"),
                    ),
                ]);
                if reference.columns.len() > 0 {
                    packer
                        .try_push_array(
                            &[ArrayDimension {
                                lower_bound: 1,
                                length: reference.columns.len(),
                            }],
                            reference.columns.iter().map(|col| Datum::String(col)),
                        )
                        .expect(
                            "columns is 1 dimensional, and its length is used for the array length",
                        );
                } else {
                    packer.push(Datum::Null);
                }

                BuiltinTableUpdate::row(&*MZ_SOURCE_REFERENCES, row, diff)
            })
            .collect()
    }
}