mod notice;

use std::cmp;
use std::time::Duration;

use bytesize::ByteSize;
use ipnet::IpNet;
use mz_adapter_types::compaction::CompactionWindow;
use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::builtin::{
    BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
    MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICAS,
    MZ_CLUSTER_SCHEDULES, MZ_CLUSTER_WORKLOAD_CLASSES, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS,
    MZ_CONNECTIONS, MZ_CONTINUAL_TASKS, MZ_DATABASES, MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS,
    MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES, MZ_INDEX_COLUMNS, MZ_INDEXES,
    MZ_INTERNAL_CLUSTER_REPLICAS, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCE_TABLES,
    MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES,
    MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MATERIALIZED_VIEWS, MZ_MYSQL_SOURCE_TABLES,
    MZ_NETWORK_POLICIES, MZ_NETWORK_POLICY_RULES, MZ_OBJECT_DEPENDENCIES, MZ_OPERATORS,
    MZ_PENDING_CLUSTER_REPLICAS, MZ_POSTGRES_SOURCE_TABLES, MZ_POSTGRES_SOURCES, MZ_PSEUDO_TYPES,
    MZ_ROLE_MEMBERS, MZ_ROLE_PARAMETERS, MZ_ROLES, MZ_SCHEMAS, MZ_SECRETS, MZ_SESSIONS, MZ_SINKS,
    MZ_SOURCE_REFERENCES, MZ_SOURCES, MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS,
    MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES,
    MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
};
use mz_catalog::config::AwsPrincipalContext;
use mz_catalog::durable::SourceReferences;
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::{
    CatalogItem, ClusterVariant, Connection, ContinualTask, DataSourceDesc, Func, Index,
    MaterializedView, Sink, Table, TableDataSource, Type, View,
};
use mz_compute_types::dyncfgs::{
    MEMORY_LIMITER_INTERVAL, MEMORY_LIMITER_USAGE_BIAS, MEMORY_LIMITER_USAGE_FACTOR,
};
use mz_controller::clusters::{
    ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
};
use mz_controller_types::ClusterId;
use mz_expr::MirScalarExpr;
use mz_license_keys::ValidatedLicenseKey;
use mz_orchestrator::{CpuLimit, DiskLimit, MemoryLimit};
use mz_ore::cast::{CastFrom, CastLossy};
use mz_ore::collections::CollectionExt;
use mz_persist_client::batch::ProtoBatch;
use mz_repr::adt::array::ArrayDimension;
use mz_repr::adt::interval::Interval;
use mz_repr::adt::jsonb::Jsonb;
use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::refresh_schedule::RefreshEvery;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, Row, RowPacker, SqlScalarType, Timestamp};
use mz_sql::ast::{ContinualTaskStmt, CreateIndexStatement, Statement, UnresolvedItemName};
use mz_sql::catalog::{
    CatalogCluster, CatalogDatabase, CatalogSchema, CatalogType, DefaultPrivilegeObject,
    TypeCategory,
};
use mz_sql::func::FuncImplCatalogDetails;
use mz_sql::names::{
    CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier,
};
use mz_sql::plan::{ClusterSchedule, ConnectionDetails, SshKey};
use mz_sql::session::user::SYSTEM_USER;
use mz_sql::session::vars::SessionVars;
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_client::client::TableData;
use mz_storage_types::connections::KafkaConnection;
use mz_storage_types::connections::aws::{AwsAuth, AwsConnection};
use mz_storage_types::connections::inline::ReferencedConnection;
use mz_storage_types::connections::string_or_secret::StringOrSecret;
use mz_storage_types::sinks::{KafkaSinkConnection, StorageSinkConnection};
use mz_storage_types::sources::{
    GenericSourceConnection, KafkaSourceConnection, PostgresSourceConnection, SourceConnection,
};
use smallvec::smallvec;

use crate::active_compute_sink::ActiveSubscribe;
use crate::catalog::CatalogState;
use crate::coord::ConnMeta;

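/// A pending update to a builtin table.
///
/// `data` holds the rows (or pre-written batches) to apply to the builtin
/// table identified by `id`, which is a [`CatalogItemId`] once resolved.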
#[derive(Debug, Clone)]
pub struct BuiltinTableUpdate<T = CatalogItemId> {
    pub id: T,
    pub data: TableData,
}

impl<T> BuiltinTableUpdate<T> {
    pub fn row(id: T, row: Row, diff: Diff) -> BuiltinTableUpdate<T> {
        BuiltinTableUpdate {
            id,
            data: TableData::Rows(vec![(row, diff)]),
        }
    }

    pub fn batch(id: T, batch: ProtoBatch) -> BuiltinTableUpdate<T> {
        BuiltinTableUpdate {
            id,
            data: TableData::Batches(smallvec![batch]),
        }
    }
}

impl CatalogState {
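    /// Resolves a batch of updates staged against `&'static BuiltinTable`
    /// references into updates keyed by each table's [`CatalogItemId`].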
    pub fn resolve_builtin_table_updates(
        &self,
        builtin_table_update: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    ) -> Vec<BuiltinTableUpdate<CatalogItemId>> {
        builtin_table_update
            .into_iter()
            .map(|builtin_table_update| self.resolve_builtin_table_update(builtin_table_update))
            .collect()
    }

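    /// Resolves a single staged update by looking up the [`CatalogItemId`] of
    /// its target builtin table.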
    pub fn resolve_builtin_table_update(
        &self,
        BuiltinTableUpdate { id, data }: BuiltinTableUpdate<&'static BuiltinTable>,
    ) -> BuiltinTableUpdate<CatalogItemId> {
        let id = self.resolve_builtin_table(id);
        BuiltinTableUpdate { id, data }
    }

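    /// Packs a row for `mz_object_dependencies` recording that `depender`
    /// depends on `dependee`.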
    pub fn pack_depends_update(
        &self,
        depender: CatalogItemId,
        dependee: CatalogItemId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let row = Row::pack_slice(&[
            Datum::String(&depender.to_string()),
            Datum::String(&dependee.to_string()),
        ]);
        BuiltinTableUpdate::row(&*MZ_OBJECT_DEPENDENCIES, row, diff)
    }

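    /// Packs a row for `mz_databases` describing the database with
    /// `database_id`.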
    pub(super) fn pack_database_update(
        &self,
        database_id: &DatabaseId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let database = self.get_database(database_id);
        let row = self.pack_privilege_array_row(database.privileges());
        let privileges = row.unpack_first();
        BuiltinTableUpdate::row(
            &*MZ_DATABASES,
            Row::pack_slice(&[
                Datum::String(&database.id.to_string()),
                Datum::UInt32(database.oid),
                Datum::String(database.name()),
                Datum::String(&database.owner_id.to_string()),
                privileges,
            ]),
            diff,
        )
    }

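    /// Packs a row for `mz_schemas` describing the given schema, which may
    /// live in a database or in the ambient (database-less) namespace.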
    pub(super) fn pack_schema_update(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_id: &SchemaId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let (database_id, schema) = match database_spec {
            ResolvedDatabaseSpecifier::Ambient => (None, &self.ambient_schemas_by_id[schema_id]),
            ResolvedDatabaseSpecifier::Id(id) => (
                Some(id.to_string()),
                &self.database_by_id[id].schemas_by_id[schema_id],
            ),
        };
        let row = self.pack_privilege_array_row(schema.privileges());
        let privileges = row.unpack_first();
        BuiltinTableUpdate::row(
            &*MZ_SCHEMAS,
            Row::pack_slice(&[
                Datum::String(&schema_id.to_string()),
                Datum::UInt32(schema.oid),
                Datum::from(database_id.as_deref()),
                Datum::String(&schema.name.schema),
                Datum::String(&schema.owner_id.to_string()),
                privileges,
            ]),
            diff,
        )
    }

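    /// Packs rows for `mz_roles` and `mz_role_parameters` describing the role
    /// with `id`. The public pseudo-role produces no updates.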
    pub(super) fn pack_role_update(
        &self,
        id: RoleId,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        match id {
            RoleId::Public => vec![],
            id => {
                let role = self.get_role(&id);
                let role_update = BuiltinTableUpdate::row(
                    &*MZ_ROLES,
                    Row::pack_slice(&[
                        Datum::String(&role.id.to_string()),
                        Datum::UInt32(role.oid),
                        Datum::String(&role.name),
                        Datum::from(role.attributes.inherit),
                    ]),
                    diff,
                );
                let mut updates = vec![role_update];

                let session_vars_reference = SessionVars::new_unchecked(
                    &mz_build_info::DUMMY_BUILD_INFO,
                    SYSTEM_USER.clone(),
                    None,
                );

                for (name, val) in role.vars() {
                    let result = session_vars_reference
                        .inspect(name)
                        .and_then(|var| var.check(val.borrow()));
                    let Ok(formatted_val) = result else {
                        tracing::error!(?name, ?val, ?result, "found invalid role default var");
                        continue;
                    };

                    let role_var_update = BuiltinTableUpdate::row(
                        &*MZ_ROLE_PARAMETERS,
                        Row::pack_slice(&[
                            Datum::String(&role.id.to_string()),
                            Datum::String(name),
                            Datum::String(&formatted_val),
                        ]),
                        diff,
                    );
                    updates.push(role_var_update);
                }

                updates
            }
        }
    }

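    /// Packs a row for `mz_role_members` recording that `member_id` is a
    /// member of `role_id`, along with the grantor of the membership.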
    pub(super) fn pack_role_members_update(
        &self,
        role_id: RoleId,
        member_id: RoleId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let grantor_id = self
            .get_role(&member_id)
            .membership
            .map
            .get(&role_id)
            .expect("catalog out of sync");
        BuiltinTableUpdate::row(
            &*MZ_ROLE_MEMBERS,
            Row::pack_slice(&[
                Datum::String(&role_id.to_string()),
                Datum::String(&member_id.to_string()),
                Datum::String(&grantor_id.to_string()),
            ]),
            diff,
        )
    }

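    /// Packs rows for `mz_clusters`, `mz_cluster_schedules`, and
    /// `mz_cluster_workload_classes` describing the named cluster.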
    pub(super) fn pack_cluster_update(
        &self,
        name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let id = self.clusters_by_name[name];
        let cluster = &self.clusters_by_id[&id];
        let row = self.pack_privilege_array_row(cluster.privileges());
        let privileges = row.unpack_first();
        let (size, disk, replication_factor, azs, introspection_debugging, introspection_interval) =
            match &cluster.config.variant {
                ClusterVariant::Managed(config) => (
                    Some(config.size.as_str()),
                    Some(self.cluster_replica_size_has_disk(&config.size)),
                    Some(config.replication_factor),
                    if config.availability_zones.is_empty() {
                        None
                    } else {
                        Some(config.availability_zones.clone())
                    },
                    Some(config.logging.log_logging),
                    config.logging.interval.map(|d| {
                        Interval::from_duration(&d)
                            .expect("planning ensured this convertible back to interval")
                    }),
                ),
                ClusterVariant::Unmanaged => (None, None, None, None, None, None),
            };

        let mut row = Row::default();
        let mut packer = row.packer();
        packer.extend([
            Datum::String(&id.to_string()),
            Datum::String(name),
            Datum::String(&cluster.owner_id.to_string()),
            privileges,
            cluster.is_managed().into(),
            size.into(),
            replication_factor.into(),
            disk.into(),
        ]);
        if let Some(azs) = azs {
            packer.push_list(azs.iter().map(|az| Datum::String(az)));
        } else {
            packer.push(Datum::Null);
        }
        packer.push(Datum::from(introspection_debugging));
        packer.push(Datum::from(introspection_interval));

        let mut updates = Vec::new();

        updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTERS, row, diff));

        if let ClusterVariant::Managed(managed_config) = &cluster.config.variant {
            let row = match managed_config.schedule {
                ClusterSchedule::Manual => Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String("manual"),
                    Datum::Null,
                ]),
                ClusterSchedule::Refresh {
                    hydration_time_estimate,
                } => Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String("on-refresh"),
                    Datum::Interval(
                        Interval::from_duration(&hydration_time_estimate)
                            .expect("planning ensured that this is convertible back to Interval"),
                    ),
                ]),
            };
            updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTER_SCHEDULES, row, diff));
        }

        updates.push(BuiltinTableUpdate::row(
            &*MZ_CLUSTER_WORKLOAD_CLASSES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::from(cluster.config.workload_class.as_deref()),
            ]),
            diff,
        ));

        updates
    }

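    /// Packs rows for `mz_cluster_replicas` (and, where applicable,
    /// `mz_internal_cluster_replicas` and `mz_pending_cluster_replicas`)
    /// describing the named replica of `cluster_id`.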
    pub(super) fn pack_cluster_replica_update(
        &self,
        cluster_id: ClusterId,
        name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let cluster = &self.clusters_by_id[&cluster_id];
        let id = cluster.replica_id(name).expect("Must exist");
        let replica = cluster.replica(id).expect("Must exist");

        let (size, disk, az, internal, pending) = match &replica.config.location {
            ReplicaLocation::Managed(ManagedReplicaLocation {
                size,
                availability_zones: ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
                allocation: _,
                billed_as: _,
                internal,
                pending,
            }) => (
                Some(&**size),
                Some(self.cluster_replica_size_has_disk(size)),
                Some(az.as_str()),
                *internal,
                *pending,
            ),
            ReplicaLocation::Managed(ManagedReplicaLocation {
                size,
                availability_zones: _,
                allocation: _,
                billed_as: _,
                internal,
                pending,
            }) => (
                Some(&**size),
                Some(self.cluster_replica_size_has_disk(size)),
                None,
                *internal,
                *pending,
            ),
            _ => (None, None, None, false, false),
        };

        let cluster_replica_update = BuiltinTableUpdate::row(
            &*MZ_CLUSTER_REPLICAS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(name),
                Datum::String(&cluster_id.to_string()),
                Datum::from(size),
                Datum::from(az),
                Datum::String(&replica.owner_id.to_string()),
                Datum::from(disk),
            ]),
            diff,
        );

        let mut updates = vec![cluster_replica_update];

        if internal {
            let update = BuiltinTableUpdate::row(
                &*MZ_INTERNAL_CLUSTER_REPLICAS,
                Row::pack_slice(&[Datum::String(&id.to_string())]),
                diff,
            );
            updates.push(update);
        }

        if pending {
            let update = BuiltinTableUpdate::row(
                &*MZ_PENDING_CLUSTER_REPLICAS,
                Row::pack_slice(&[Datum::String(&id.to_string())]),
                diff,
            );
            updates.push(update);
        }

        updates
    }

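    /// Packs rows for `mz_network_policies` and `mz_network_policy_rules`
    /// describing the policy with `policy_id`.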
    pub(crate) fn pack_network_policy_update(
        &self,
        policy_id: &NetworkPolicyId,
        diff: Diff,
    ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, Error> {
        let policy = self.get_network_policy(policy_id);
        let row = self.pack_privilege_array_row(&policy.privileges);
        let privileges = row.unpack_first();
        let mut updates = Vec::new();
        for ref rule in policy.rules.clone() {
            updates.push(BuiltinTableUpdate::row(
                &*MZ_NETWORK_POLICY_RULES,
                Row::pack_slice(&[
                    Datum::String(&rule.name),
                    Datum::String(&policy.id.to_string()),
                    Datum::String(&rule.action.to_string()),
                    Datum::String(&rule.address.to_string()),
                    Datum::String(&rule.direction.to_string()),
                ]),
                diff,
            ));
        }
        updates.push(BuiltinTableUpdate::row(
            &*MZ_NETWORK_POLICIES,
            Row::pack_slice(&[
                Datum::String(&policy.id.to_string()),
                Datum::String(&policy.name),
                Datum::String(&policy.owner_id.to_string()),
                privileges,
                Datum::UInt32(policy.oid.clone()),
            ]),
            diff,
        ));

        Ok(updates)
    }

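    /// Packs all builtin table updates for the catalog item with `id`: its
    /// item-kind-specific rows, its `mz_object_dependencies` rows, its
    /// `mz_columns` rows, and its history retention strategy, if any.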
    pub(super) fn pack_item_update(
        &self,
        id: CatalogItemId,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let entry = self.get_entry(&id);
        let oid = entry.oid();
        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
        let schema_id = &self
            .get_schema(
                &entry.name().qualifiers.database_spec,
                &entry.name().qualifiers.schema_spec,
                conn_id,
            )
            .id;
        let name = &entry.name().item;
        let owner_id = entry.owner_id();
        let privileges_row = self.pack_privilege_array_row(entry.privileges());
        let privileges = privileges_row.unpack_first();
        let mut updates = match entry.item() {
            CatalogItem::Log(_) => self.pack_source_update(
                id, oid, schema_id, name, "log", None, None, None, None, None, owner_id,
                privileges, diff, None,
            ),
            CatalogItem::Index(index) => {
                self.pack_index_update(id, oid, name, owner_id, index, diff)
            }
            CatalogItem::Table(table) => {
                let mut updates = self
                    .pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);

                if let TableDataSource::DataSource {
                    desc: data_source,
                    timeline: _,
                } = &table.data_source
                {
                    updates.extend(match data_source {
                        DataSourceDesc::IngestionExport {
                            ingestion_id,
                            external_reference: UnresolvedItemName(external_reference),
                            details: _,
                            data_config: _,
                        } => {
                            let ingestion_entry = self
                                .get_entry(ingestion_id)
                                .source_desc()
                                .expect("primary source exists")
                                .expect("primary source is a source");

                            match ingestion_entry.connection.name() {
                                "postgres" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                    let schema_name = external_reference[1].to_ast_string_simple();
                                    let table_name = external_reference[2].to_ast_string_simple();

                                    self.pack_postgres_source_tables_update(
                                        id,
                                        &schema_name,
                                        &table_name,
                                        diff,
                                    )
                                }
                                "mysql" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
                                    let schema_name = external_reference[0].to_ast_string_simple();
                                    let table_name = external_reference[1].to_ast_string_simple();

                                    self.pack_mysql_source_tables_update(
                                        id,
                                        &schema_name,
                                        &table_name,
                                        diff,
                                    )
                                }
                                "sql-server" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                    let schema_name = external_reference[1].to_ast_string_simple();
                                    let table_name = external_reference[2].to_ast_string_simple();

                                    self.pack_sql_server_source_table_update(
                                        id,
                                        &schema_name,
                                        &table_name,
                                        diff,
                                    )
                                }
                                "load-generator" => vec![],
                                "kafka" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 1);
                                    let topic = external_reference[0].to_ast_string_simple();
                                    let envelope = data_source.envelope();
                                    let (key_format, value_format) = data_source.formats();

                                    self.pack_kafka_source_tables_update(
                                        id,
                                        &topic,
                                        envelope,
                                        key_format,
                                        value_format,
                                        diff,
                                    )
                                }
                                s => unreachable!("{s} sources do not have tables"),
                            }
                        }
                        _ => vec![],
                    });
                }

                updates
            }
            CatalogItem::Source(source) => {
                let source_type = source.source_type();
                let connection_id = source.connection_id();
                let envelope = source.data_source.envelope();
                let cluster_entry = match source.data_source {
                    DataSourceDesc::IngestionExport { ingestion_id, .. } => {
                        self.get_entry(&ingestion_id)
                    }
                    _ => entry,
                };

                let cluster_id = cluster_entry.item().cluster_id().map(|id| id.to_string());

                let (key_format, value_format) = source.data_source.formats();

                let mut updates = self.pack_source_update(
                    id,
                    oid,
                    schema_id,
                    name,
                    source_type,
                    connection_id,
                    envelope,
                    key_format,
                    value_format,
                    cluster_id.as_deref(),
                    owner_id,
                    privileges,
                    diff,
                    source.create_sql.as_ref(),
                );

                updates.extend(match &source.data_source {
                    DataSourceDesc::Ingestion { ingestion_desc, .. } => {
                        match &ingestion_desc.desc.connection {
                            GenericSourceConnection::Postgres(postgres) => {
                                self.pack_postgres_source_update(id, postgres, diff)
                            }
                            GenericSourceConnection::Kafka(kafka) => {
                                self.pack_kafka_source_update(id, source.global_id(), kafka, diff)
                            }
                            _ => vec![],
                        }
                    }
                    DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference: UnresolvedItemName(external_reference),
                        details: _,
                        data_config: _,
                    } => {
                        let ingestion_entry = self
                            .get_entry(ingestion_id)
                            .source_desc()
                            .expect("primary source exists")
                            .expect("primary source is a source");

                        match ingestion_entry.connection.name() {
                            "postgres" => {
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                let schema_name = external_reference[1].to_ast_string_simple();
                                let table_name = external_reference[2].to_ast_string_simple();

                                self.pack_postgres_source_tables_update(
                                    id,
                                    &schema_name,
                                    &table_name,
                                    diff,
                                )
                            }
                            "mysql" => {
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
                                let schema_name = external_reference[0].to_ast_string_simple();
                                let table_name = external_reference[1].to_ast_string_simple();

                                self.pack_mysql_source_tables_update(
                                    id,
                                    &schema_name,
                                    &table_name,
                                    diff,
                                )
                            }
                            "sql-server" => {
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                let schema_name = external_reference[1].to_ast_string_simple();
                                let table_name = external_reference[2].to_ast_string_simple();

                                self.pack_sql_server_source_table_update(
                                    id,
                                    &schema_name,
                                    &table_name,
                                    diff,
                                )
                            }
                            "load-generator" => vec![],
                            s => unreachable!("{s} sources do not have subsources"),
                        }
                    }
                    DataSourceDesc::Webhook { .. } => {
                        vec![self.pack_webhook_source_update(id, diff)]
                    }
                    _ => vec![],
                });

                updates
            }
            CatalogItem::View(view) => {
                self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
            }
            CatalogItem::MaterializedView(mview) => self.pack_materialized_view_update(
                id, oid, schema_id, name, owner_id, privileges, mview, diff,
            ),
            CatalogItem::Sink(sink) => {
                self.pack_sink_update(id, oid, schema_id, name, owner_id, sink, diff)
            }
            CatalogItem::Type(ty) => {
                self.pack_type_update(id, oid, schema_id, name, owner_id, privileges, ty, diff)
            }
            CatalogItem::Func(func) => {
                self.pack_func_update(id, schema_id, name, owner_id, func, diff)
            }
            CatalogItem::Secret(_) => {
                self.pack_secret_update(id, oid, schema_id, name, owner_id, privileges, diff)
            }
            CatalogItem::Connection(connection) => self.pack_connection_update(
                id, oid, schema_id, name, owner_id, privileges, connection, diff,
            ),
            CatalogItem::ContinualTask(ct) => self.pack_continual_task_update(
                id, oid, schema_id, name, owner_id, privileges, ct, diff,
            ),
        };

        if !entry.item().is_temporary() {
            for dependee in entry.item().references().items() {
                updates.push(self.pack_depends_update(id, *dependee, diff))
            }
        }

        let full_name = self.resolve_full_name(entry.name(), entry.conn_id());
        if let Ok(desc) = entry.desc_latest(&full_name) {
            let defaults = match entry.item() {
                CatalogItem::Table(Table {
                    data_source: TableDataSource::TableWrites { defaults },
                    ..
                }) => Some(defaults),
                _ => None,
            };
            for (i, (column_name, column_type)) in desc.iter().enumerate() {
                let default: Option<String> = defaults.map(|d| d[i].to_ast_string_stable());
                let default: Datum = default
                    .as_ref()
                    .map(|d| Datum::String(d))
                    .unwrap_or(Datum::Null);
                let pgtype = mz_pgrepr::Type::from(&column_type.scalar_type);
                let (type_name, type_oid) = match &column_type.scalar_type {
                    SqlScalarType::List {
                        custom_id: Some(custom_id),
                        ..
                    }
                    | SqlScalarType::Map {
                        custom_id: Some(custom_id),
                        ..
                    }
                    | SqlScalarType::Record {
                        custom_id: Some(custom_id),
                        ..
                    } => {
                        let entry = self.get_entry(custom_id);
                        let name = &*entry.name().item;
                        let oid = entry.oid();
                        (name, oid)
                    }
                    _ => (pgtype.name(), pgtype.oid()),
                };
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_COLUMNS,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String(column_name),
                        Datum::UInt64(u64::cast_from(i + 1)),
                        Datum::from(column_type.nullable),
                        Datum::String(type_name),
                        default,
                        Datum::UInt32(type_oid),
                        Datum::Int32(pgtype.typmod()),
                    ]),
                    diff,
                ));
            }
        }

        if let Some(cw) = entry.item().initial_logical_compaction_window() {
            updates.push(self.pack_history_retention_strategy_update(id, cw, diff));
        }

        updates
    }

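    /// Packs a row for `mz_history_retention_strategies` describing the
    /// compaction window of item `id`, serialized as a JSON number.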
    fn pack_history_retention_strategy_update(
        &self,
        id: CatalogItemId,
        cw: CompactionWindow,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let cw: u64 = cw.comparable_timestamp().into();
        let cw = Jsonb::from_serde_json(serde_json::Value::Number(serde_json::Number::from(cw)))
            .expect("must serialize");
        BuiltinTableUpdate::row(
            &*MZ_HISTORY_RETENTION_STRATEGIES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String("FOR"),
                cw.into_row().into_element(),
            ]),
            diff,
        )
    }

    fn pack_table_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        diff: Diff,
        table: &Table,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let redacted = table.create_sql.as_ref().map(|create_sql| {
            mz_sql::parse::parse(create_sql)
                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
                .into_element()
                .ast
                .to_ast_string_redacted()
        });
        let source_id = if let TableDataSource::DataSource {
            desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
            ..
        } = &table.data_source
        {
            Some(ingestion_id.to_string())
        } else {
            None
        };

        vec![BuiltinTableUpdate::row(
            &*MZ_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&owner_id.to_string()),
                privileges,
                if let Some(create_sql) = &table.create_sql {
                    Datum::String(create_sql)
                } else {
                    Datum::Null
                },
                if let Some(redacted) = &redacted {
                    Datum::String(redacted)
                } else {
                    Datum::Null
                },
                if let Some(source_id) = source_id.as_ref() {
                    Datum::String(source_id)
                } else {
                    Datum::Null
                },
            ]),
            diff,
        )]
    }

    fn pack_source_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        source_desc_name: &str,
        connection_id: Option<CatalogItemId>,
        envelope: Option<&str>,
        key_format: Option<&str>,
        value_format: Option<&str>,
        cluster_id: Option<&str>,
        owner_id: &RoleId,
        privileges: Datum,
        diff: Diff,
        create_sql: Option<&String>,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let redacted = create_sql.map(|create_sql| {
            let create_stmt = mz_sql::parse::parse(create_sql)
                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
                .into_element()
                .ast;
            create_stmt.to_ast_string_redacted()
        });
        vec![BuiltinTableUpdate::row(
            &*MZ_SOURCES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(source_desc_name),
                Datum::from(connection_id.map(|id| id.to_string()).as_deref()),
                Datum::Null,
                Datum::from(envelope),
                Datum::from(key_format),
                Datum::from(value_format),
                Datum::from(cluster_id),
                Datum::String(&owner_id.to_string()),
                privileges,
                if let Some(create_sql) = create_sql {
                    Datum::String(create_sql)
                } else {
                    Datum::Null
                },
                if let Some(redacted) = &redacted {
                    Datum::String(redacted)
                } else {
                    Datum::Null
                },
            ]),
            diff,
        )]
    }

    fn pack_postgres_source_update(
        &self,
        id: CatalogItemId,
        postgres: &PostgresSourceConnection<ReferencedConnection>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_POSTGRES_SOURCES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(&postgres.publication_details.slot),
                Datum::from(postgres.publication_details.timeline_id),
            ]),
            diff,
        )]
    }

    fn pack_kafka_source_update(
        &self,
        item_id: CatalogItemId,
        collection_id: GlobalId,
        kafka: &KafkaSourceConnection<ReferencedConnection>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_KAFKA_SOURCES,
            Row::pack_slice(&[
                Datum::String(&item_id.to_string()),
                Datum::String(&kafka.group_id(&self.config.connection_context, collection_id)),
                Datum::String(&kafka.topic),
            ]),
            diff,
        )]
    }

    fn pack_postgres_source_tables_update(
        &self,
        id: CatalogItemId,
        schema_name: &str,
        table_name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_POSTGRES_SOURCE_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(schema_name),
                Datum::String(table_name),
            ]),
            diff,
        )]
    }

    fn pack_mysql_source_tables_update(
        &self,
        id: CatalogItemId,
        schema_name: &str,
        table_name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_MYSQL_SOURCE_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(schema_name),
                Datum::String(table_name),
            ]),
            diff,
        )]
    }

    fn pack_sql_server_source_table_update(
        &self,
        id: CatalogItemId,
        schema_name: &str,
        table_name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_SQL_SERVER_SOURCE_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(schema_name),
                Datum::String(table_name),
            ]),
            diff,
        )]
    }

    fn pack_kafka_source_tables_update(
        &self,
        id: CatalogItemId,
        topic: &str,
        envelope: Option<&str>,
        key_format: Option<&str>,
        value_format: Option<&str>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_KAFKA_SOURCE_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(topic),
                Datum::from(envelope),
                Datum::from(key_format),
                Datum::from(value_format),
            ]),
            diff,
        )]
    }

    fn pack_connection_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        connection: &Connection,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let create_stmt = mz_sql::parse::parse(&connection.create_sql)
            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", connection.create_sql))
            .into_element()
            .ast;
        let mut updates = vec![BuiltinTableUpdate::row(
            &*MZ_CONNECTIONS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(match connection.details {
                    ConnectionDetails::Kafka { .. } => "kafka",
                    ConnectionDetails::Csr { .. } => "confluent-schema-registry",
                    ConnectionDetails::Postgres { .. } => "postgres",
                    ConnectionDetails::Aws(..) => "aws",
                    ConnectionDetails::AwsPrivatelink(..) => "aws-privatelink",
                    ConnectionDetails::Ssh { .. } => "ssh-tunnel",
                    ConnectionDetails::MySql { .. } => "mysql",
                    ConnectionDetails::SqlServer(_) => "sql-server",
                    ConnectionDetails::IcebergCatalog(_) => "iceberg-catalog",
                }),
                Datum::String(&owner_id.to_string()),
                privileges,
                Datum::String(&connection.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        )];
        match connection.details {
            ConnectionDetails::Kafka(ref kafka) => {
                updates.extend(self.pack_kafka_connection_update(id, kafka, diff));
            }
            ConnectionDetails::Aws(ref aws_config) => {
                match self.pack_aws_connection_update(id, aws_config, diff) {
                    Ok(update) => {
                        updates.push(update);
                    }
                    Err(e) => {
                        tracing::error!(%id, %e, "failed writing row to mz_aws_connections table");
                    }
                }
            }
            ConnectionDetails::AwsPrivatelink(_) => {
                if let Some(aws_principal_context) = self.aws_principal_context.as_ref() {
                    updates.push(self.pack_aws_privatelink_connection_update(
                        id,
                        aws_principal_context,
                        diff,
                    ));
                } else {
                    tracing::error!(%id, "missing AWS principal context; cannot write row to mz_aws_privatelink_connections table");
                }
            }
            ConnectionDetails::Ssh {
                ref key_1,
                ref key_2,
                ..
            } => {
                updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff));
            }
            ConnectionDetails::Csr(_)
            | ConnectionDetails::Postgres(_)
            | ConnectionDetails::MySql(_)
            | ConnectionDetails::SqlServer(_)
            | ConnectionDetails::IcebergCatalog(_) => (),
        };
        updates
    }

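    /// Packs a row for `mz_ssh_tunnel_connections` exposing the two public
    /// keys of an SSH tunnel connection.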
    pub(crate) fn pack_ssh_tunnel_connection_update(
        &self,
        id: CatalogItemId,
        key_1: &SshKey,
        key_2: &SshKey,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        BuiltinTableUpdate::row(
            &*MZ_SSH_TUNNEL_CONNECTIONS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(key_1.public_key().as_str()),
                Datum::String(key_2.public_key().as_str()),
            ]),
            diff,
        )
    }

    fn pack_kafka_connection_update(
        &self,
        id: CatalogItemId,
        kafka: &KafkaConnection<ReferencedConnection>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let progress_topic = kafka.progress_topic(&self.config.connection_context, id);
        let mut row = Row::default();
        row.packer()
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: kafka.brokers.len(),
                }],
                kafka
                    .brokers
                    .iter()
                    .map(|broker| Datum::String(&broker.address)),
            )
            .expect("kafka.brokers is 1 dimensional, and its length is used for the array length");
        let brokers = row.unpack_first();
        vec![BuiltinTableUpdate::row(
            &*MZ_KAFKA_CONNECTIONS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                brokers,
                Datum::String(&progress_topic),
            ]),
            diff,
        )]
    }

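    /// Packs a row for `mz_aws_privatelink_connections` exposing the AWS
    /// principal associated with the connection.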
    pub fn pack_aws_privatelink_connection_update(
        &self,
        connection_id: CatalogItemId,
        aws_principal_context: &AwsPrincipalContext,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let id = &MZ_AWS_PRIVATELINK_CONNECTIONS;
        let row = Row::pack_slice(&[
            Datum::String(&connection_id.to_string()),
            Datum::String(&aws_principal_context.to_principal_string(connection_id)),
        ]);
        BuiltinTableUpdate::row(id, row, diff)
    }

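    /// Packs a row for `mz_aws_connections` describing the authentication
    /// configuration of an AWS connection.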
    pub fn pack_aws_connection_update(
        &self,
        connection_id: CatalogItemId,
        aws_config: &AwsConnection,
        diff: Diff,
    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, anyhow::Error> {
        let id = &MZ_AWS_CONNECTIONS;

        let mut access_key_id = None;
        let mut access_key_id_secret_id = None;
        let mut secret_access_key_secret_id = None;
        let mut session_token = None;
        let mut session_token_secret_id = None;
        let mut assume_role_arn = None;
        let mut assume_role_session_name = None;
        let mut principal = None;
        let mut external_id = None;
        let mut example_trust_policy = None;
        match &aws_config.auth {
            AwsAuth::Credentials(credentials) => {
                match &credentials.access_key_id {
                    StringOrSecret::String(s) => access_key_id = Some(s.as_str()),
                    StringOrSecret::Secret(s) => access_key_id_secret_id = Some(s.to_string()),
                }
                secret_access_key_secret_id = Some(credentials.secret_access_key.to_string());
                match credentials.session_token.as_ref() {
                    None => (),
                    Some(StringOrSecret::String(s)) => session_token = Some(s.as_str()),
                    Some(StringOrSecret::Secret(s)) => {
                        session_token_secret_id = Some(s.to_string())
                    }
                }
            }
            AwsAuth::AssumeRole(assume_role) => {
                assume_role_arn = Some(assume_role.arn.as_str());
                assume_role_session_name = assume_role.session_name.as_deref();
                principal = self
                    .config
                    .connection_context
                    .aws_connection_role_arn
                    .as_deref();
                external_id =
                    Some(assume_role.external_id(&self.config.connection_context, connection_id)?);
                example_trust_policy = {
                    let policy = assume_role
                        .example_trust_policy(&self.config.connection_context, connection_id)?;
                    let policy = Jsonb::from_serde_json(policy).expect("valid json");
                    Some(policy.into_row())
                };
            }
        }

        let row = Row::pack_slice(&[
            Datum::String(&connection_id.to_string()),
            Datum::from(aws_config.endpoint.as_deref()),
            Datum::from(aws_config.region.as_deref()),
            Datum::from(access_key_id),
            Datum::from(access_key_id_secret_id.as_deref()),
            Datum::from(secret_access_key_secret_id.as_deref()),
            Datum::from(session_token),
            Datum::from(session_token_secret_id.as_deref()),
            Datum::from(assume_role_arn),
            Datum::from(assume_role_session_name),
            Datum::from(principal),
            Datum::from(external_id.as_deref()),
            Datum::from(example_trust_policy.as_ref().map(|p| p.into_element())),
        ]);

        Ok(BuiltinTableUpdate::row(id, row, diff))
    }

    fn pack_view_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        view: &View,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let create_stmt = mz_sql::parse::parse(&view.create_sql)
            .unwrap_or_else(|e| {
                panic!(
                    "create_sql cannot be invalid: `{}` --- error: `{}`",
                    view.create_sql, e
                )
            })
            .into_element()
            .ast;
        let query = match &create_stmt {
            Statement::CreateView(stmt) => &stmt.definition.query,
            _ => unreachable!(),
        };

        let mut query_string = query.to_ast_string_stable();
        query_string.push(';');

        vec![BuiltinTableUpdate::row(
            &*MZ_VIEWS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&query_string),
                Datum::String(&owner_id.to_string()),
                privileges,
                Datum::String(&view.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        )]
    }

    fn pack_materialized_view_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        mview: &MaterializedView,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let create_stmt = mz_sql::parse::parse(&mview.create_sql)
            .unwrap_or_else(|e| {
                panic!(
                    "create_sql cannot be invalid: `{}` --- error: `{}`",
                    mview.create_sql, e
                )
            })
            .into_element()
            .ast;
        let query_string = match &create_stmt {
            Statement::CreateMaterializedView(stmt) => {
                let mut query_string = stmt.query.to_ast_string_stable();
                query_string.push(';');
                query_string
            }
            _ => unreachable!(),
        };

        let mut updates = Vec::new();

        updates.push(BuiltinTableUpdate::row(
            &*MZ_MATERIALIZED_VIEWS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&mview.cluster_id.to_string()),
                Datum::String(&query_string),
                Datum::String(&owner_id.to_string()),
                privileges,
                Datum::String(&mview.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        ));

        if let Some(refresh_schedule) = &mview.refresh_schedule {
            assert!(!refresh_schedule.is_empty());
            for RefreshEvery {
                interval,
                aligned_to,
            } in refresh_schedule.everies.iter()
            {
                let aligned_to_dt = mz_ore::now::to_datetime(
                    <&Timestamp as TryInto<u64>>::try_into(aligned_to).expect("undoes planning"),
                );
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String("every"),
                        Datum::Interval(
                            Interval::from_duration(interval).expect(
                                "planning ensured that this is convertible back to Interval",
                            ),
                        ),
                        Datum::TimestampTz(aligned_to_dt.try_into().expect("undoes planning")),
                        Datum::Null,
                    ]),
                    diff,
                ));
            }
            for at in refresh_schedule.ats.iter() {
                let at_dt = mz_ore::now::to_datetime(
                    <&Timestamp as TryInto<u64>>::try_into(at).expect("undoes planning"),
                );
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String("at"),
                        Datum::Null,
                        Datum::Null,
                        Datum::TimestampTz(at_dt.try_into().expect("undoes planning")),
                    ]),
                    diff,
                ));
            }
        } else {
            updates.push(BuiltinTableUpdate::row(
                &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String("on-commit"),
                    Datum::Null,
                    Datum::Null,
                    Datum::Null,
                ]),
                diff,
            ));
        }

        updates
    }

    fn pack_continual_task_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        ct: &ContinualTask,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let create_stmt = mz_sql::parse::parse(&ct.create_sql)
            .unwrap_or_else(|e| {
                panic!(
                    "create_sql cannot be invalid: `{}` --- error: `{}`",
                    ct.create_sql, e
                )
            })
            .into_element()
            .ast;
        let query_string = match &create_stmt {
            Statement::CreateContinualTask(stmt) => {
                let mut query_string = String::new();
                for stmt in &stmt.stmts {
                    let s = match stmt {
                        ContinualTaskStmt::Insert(stmt) => stmt.to_ast_string_stable(),
                        ContinualTaskStmt::Delete(stmt) => stmt.to_ast_string_stable(),
                    };
                    if query_string.is_empty() {
                        query_string = s;
                    } else {
                        query_string.push_str("; ");
                        query_string.push_str(&s);
                    }
                }
                query_string
            }
            _ => unreachable!(),
        };

        vec![BuiltinTableUpdate::row(
            &*MZ_CONTINUAL_TASKS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&ct.cluster_id.to_string()),
                Datum::String(&query_string),
                Datum::String(&owner_id.to_string()),
                privileges,
                Datum::String(&ct.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        )]
    }

    fn pack_sink_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        sink: &Sink,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = vec![];
        match &sink.connection {
            StorageSinkConnection::Kafka(KafkaSinkConnection {
                topic: topic_name, ..
            }) => {
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_KAFKA_SINKS,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String(topic_name.as_str()),
                    ]),
                    diff,
                ));
            }
        };

        let create_stmt = mz_sql::parse::parse(&sink.create_sql)
            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", sink.create_sql))
            .into_element()
            .ast;

        let envelope = sink.envelope();

        let combined_format = sink.combined_format();
        let (key_format, value_format) = sink.formats();

        updates.push(BuiltinTableUpdate::row(
            &*MZ_SINKS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(sink.connection.name()),
                Datum::from(sink.connection_id().map(|id| id.to_string()).as_deref()),
                Datum::Null,
                Datum::from(envelope),
                Datum::from(combined_format.as_ref()),
                Datum::from(key_format),
                Datum::String(value_format),
                Datum::String(&sink.cluster_id.to_string()),
                Datum::String(&owner_id.to_string()),
                Datum::String(&sink.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        ));

        updates
    }

    fn pack_index_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        name: &str,
        owner_id: &RoleId,
        index: &Index,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = vec![];

        let create_stmt = mz_sql::parse::parse(&index.create_sql)
            .unwrap_or_else(|e| {
                panic!(
                    "create_sql cannot be invalid: `{}` --- error: `{}`",
                    index.create_sql, e
                )
            })
            .into_element()
            .ast;

        let key_sqls = match &create_stmt {
            Statement::CreateIndex(CreateIndexStatement { key_parts, .. }) => key_parts
                .as_ref()
                .expect("key_parts is filled in during planning"),
            _ => unreachable!(),
        };
        let on_item_id = self.get_entry_by_global_id(&index.on).id();

        updates.push(BuiltinTableUpdate::row(
            &*MZ_INDEXES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(name),
                Datum::String(&on_item_id.to_string()),
                Datum::String(&index.cluster_id.to_string()),
                Datum::String(&owner_id.to_string()),
                Datum::String(&index.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        ));

        for (i, key) in index.keys.iter().enumerate() {
            let on_entry = self.get_entry_by_global_id(&index.on);
            let nullable = key
                .typ(
                    &on_entry
                        .desc(&self.resolve_full_name(on_entry.name(), on_entry.conn_id()))
                        .expect("can only create indexes on items with a valid description")
                        .typ()
                        .column_types,
                )
                .nullable;
            let seq_in_index = u64::cast_from(i + 1);
            let key_sql = key_sqls
                .get(i)
                .expect("missing sql information for index key")
                .to_ast_string_simple();
            let (field_number, expression) = match key {
                MirScalarExpr::Column(col, _) => {
                    (Datum::UInt64(u64::cast_from(*col + 1)), Datum::Null)
                }
                _ => (Datum::Null, Datum::String(&key_sql)),
            };
            updates.push(BuiltinTableUpdate::row(
                &*MZ_INDEX_COLUMNS,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::UInt64(seq_in_index),
                    field_number,
                    expression,
                    Datum::from(nullable),
                ]),
                diff,
            ));
        }

        updates
    }

    fn pack_type_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        typ: &Type,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut out = vec![];

        let redacted = typ.create_sql.as_ref().map(|create_sql| {
            mz_sql::parse::parse(create_sql)
                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
                .into_element()
                .ast
                .to_ast_string_redacted()
        });

        out.push(BuiltinTableUpdate::row(
            &*MZ_TYPES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&TypeCategory::from_catalog_type(&typ.details.typ).to_string()),
                Datum::String(&owner_id.to_string()),
                privileges,
                if let Some(create_sql) = &typ.create_sql {
                    Datum::String(create_sql)
                } else {
                    Datum::Null
                },
                if let Some(redacted) = &redacted {
                    Datum::String(redacted)
                } else {
                    Datum::Null
                },
            ]),
            diff,
        ));

        let mut row = Row::default();
        let mut packer = row.packer();

        fn append_modifier(packer: &mut RowPacker<'_>, mods: &[i64]) {
            if mods.is_empty() {
                packer.push(Datum::Null);
            } else {
                packer.push_list(mods.iter().map(|m| Datum::Int64(*m)));
            }
        }

        let index_id = match &typ.details.typ {
            CatalogType::Array {
                element_reference: element_id,
            } => {
                packer.push(Datum::String(&id.to_string()));
                packer.push(Datum::String(&element_id.to_string()));
                &MZ_ARRAY_TYPES
            }
            CatalogType::List {
                element_reference: element_id,
                element_modifiers,
            } => {
                packer.push(Datum::String(&id.to_string()));
                packer.push(Datum::String(&element_id.to_string()));
                append_modifier(&mut packer, element_modifiers);
                &MZ_LIST_TYPES
            }
            CatalogType::Map {
                key_reference: key_id,
                value_reference: value_id,
                key_modifiers,
                value_modifiers,
            } => {
                packer.push(Datum::String(&id.to_string()));
                packer.push(Datum::String(&key_id.to_string()));
                packer.push(Datum::String(&value_id.to_string()));
                append_modifier(&mut packer, key_modifiers);
                append_modifier(&mut packer, value_modifiers);
                &MZ_MAP_TYPES
            }
            CatalogType::Pseudo => {
                packer.push(Datum::String(&id.to_string()));
                &MZ_PSEUDO_TYPES
            }
            _ => {
                packer.push(Datum::String(&id.to_string()));
                &MZ_BASE_TYPES
            }
        };
        out.push(BuiltinTableUpdate::row(index_id, row, diff));

        if let Some(pg_metadata) = &typ.details.pg_metadata {
            out.push(BuiltinTableUpdate::row(
                &*MZ_TYPE_PG_METADATA,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::UInt32(pg_metadata.typinput_oid),
                    Datum::UInt32(pg_metadata.typreceive_oid),
                ]),
                diff,
            ));
        }

        out
    }

    fn pack_func_update(
        &self,
        id: CatalogItemId,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        func: &Func,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = vec![];
        for func_impl_details in func.inner.func_impls() {
            let arg_type_ids = func_impl_details
                .arg_typs
                .iter()
                .map(|typ| self.get_system_type(typ).id().to_string())
                .collect::<Vec<_>>();

            let mut row = Row::default();
            row.packer()
                .try_push_array(
                    &[ArrayDimension {
                        lower_bound: 1,
                        length: arg_type_ids.len(),
                    }],
                    arg_type_ids.iter().map(|id| Datum::String(id)),
                )
                .expect(
                    "arg_type_ids is 1 dimensional, and its length is used for the array length",
                );
            let arg_type_ids = row.unpack_first();

            updates.push(BuiltinTableUpdate::row(
                &*MZ_FUNCTIONS,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::UInt32(func_impl_details.oid),
                    Datum::String(&schema_id.to_string()),
                    Datum::String(name),
                    arg_type_ids,
                    Datum::from(
                        func_impl_details
                            .variadic_typ
                            .map(|typ| self.get_system_type(typ).id().to_string())
                            .as_deref(),
                    ),
                    Datum::from(
                        func_impl_details
                            .return_typ
                            .map(|typ| self.get_system_type(typ).id().to_string())
                            .as_deref(),
                    ),
                    func_impl_details.return_is_set.into(),
                    Datum::String(&owner_id.to_string()),
                ]),
                diff,
            ));

            if let mz_sql::func::Func::Aggregate(_) = func.inner {
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_AGGREGATES,
                    Row::pack_slice(&[
                        Datum::UInt32(func_impl_details.oid),
                        Datum::String("n"),
                        Datum::Int16(0),
                    ]),
                    diff,
                ));
            }
        }
        updates
    }

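    /// Packs a row for `mz_operators` describing one implementation of the
    /// given operator.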
    pub fn pack_op_update(
        &self,
        operator: &str,
        func_impl_details: FuncImplCatalogDetails,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let arg_type_ids = func_impl_details
            .arg_typs
            .iter()
            .map(|typ| self.get_system_type(typ).id().to_string())
            .collect::<Vec<_>>();

        let mut row = Row::default();
        row.packer()
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: arg_type_ids.len(),
                }],
                arg_type_ids.iter().map(|id| Datum::String(id)),
            )
            .expect("arg_type_ids is 1 dimensional, and its length is used for the array length");
        let arg_type_ids = row.unpack_first();

        BuiltinTableUpdate::row(
            &*MZ_OPERATORS,
            Row::pack_slice(&[
                Datum::UInt32(func_impl_details.oid),
                Datum::String(operator),
                arg_type_ids,
                Datum::from(
                    func_impl_details
                        .return_typ
                        .map(|typ| self.get_system_type(typ).id().to_string())
                        .as_deref(),
                ),
            ]),
            diff,
        )
    }

    fn pack_secret_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_SECRETS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&owner_id.to_string()),
                privileges,
            ]),
            diff,
        )]
    }

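    /// Packs a row for `mz_audit_events` from a versioned audit log event.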
    pub fn pack_audit_log_update(
        &self,
        event: &VersionedEvent,
        diff: Diff,
    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
        let (event_type, object_type, details, user, occurred_at): (
            &EventType,
            &ObjectType,
            &EventDetails,
            &Option<String>,
            u64,
        ) = match event {
            VersionedEvent::V1(ev) => (
                &ev.event_type,
                &ev.object_type,
                &ev.details,
                &ev.user,
                ev.occurred_at,
            ),
        };
        let details = Jsonb::from_serde_json(details.as_json())
            .map_err(|e| {
                Error::new(ErrorKind::Unstructured(format!(
                    "could not pack audit log update: {}",
                    e
                )))
            })?
            .into_row();
        let details = details
            .iter()
            .next()
            .expect("details created above with a single jsonb column");
        let dt = mz_ore::now::to_datetime(occurred_at);
        let id = event.sortable_id();
        Ok(BuiltinTableUpdate::row(
            &*MZ_AUDIT_EVENTS,
            Row::pack_slice(&[
                Datum::UInt64(id),
                Datum::String(&format!("{}", event_type)),
                Datum::String(&format!("{}", object_type)),
                details,
                match user {
                    Some(user) => Datum::String(user),
                    None => Datum::Null,
                },
                Datum::TimestampTz(dt.try_into().expect("must fit")),
            ]),
            diff,
        ))
    }

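    /// Packs a row for `mz_storage_usage_by_shard` from a versioned storage
    /// usage event.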
    pub fn pack_storage_usage_update(
        &self,
        VersionedStorageUsage::V1(event): VersionedStorageUsage,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let id = &MZ_STORAGE_USAGE_BY_SHARD;
        let row = Row::pack_slice(&[
            Datum::UInt64(event.id),
            Datum::from(event.shard_id.as_deref()),
            Datum::UInt64(event.size_bytes),
            Datum::TimestampTz(
                mz_ore::now::to_datetime(event.collection_timestamp)
                    .try_into()
                    .expect("must fit"),
            ),
        ]);
        BuiltinTableUpdate::row(id, row, diff)
    }

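    /// Packs a row for `mz_egress_ips` describing the given egress IP range.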
    pub fn pack_egress_ip_update(
        &self,
        ip: &IpNet,
    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
        let id = &MZ_EGRESS_IPS;
        let addr = ip.addr();
        let row = Row::pack_slice(&[
            Datum::String(&addr.to_string()),
            Datum::Int32(ip.prefix_len().into()),
            Datum::String(&format!("{}/{}", addr, ip.prefix_len())),
        ]);
        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
    }

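    /// Packs a row for `mz_license_keys` describing the validated license key.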
    pub fn pack_license_key_update(
        &self,
        license_key: &ValidatedLicenseKey,
    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
        let id = &MZ_LICENSE_KEYS;
        let row = Row::pack_slice(&[
            Datum::String(&license_key.id),
            Datum::String(&license_key.organization),
            Datum::String(&license_key.environment_id),
            Datum::TimestampTz(
                mz_ore::now::to_datetime(license_key.expiration * 1000)
                    .try_into()
                    .expect("must fit"),
            ),
            Datum::TimestampTz(
                mz_ore::now::to_datetime(license_key.not_before * 1000)
                    .try_into()
                    .expect("must fit"),
            ),
        ]);
        Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
    }

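    /// Packs one row for `mz_cluster_replica_sizes` per enabled replica size
    /// allocation, capping the reported disk limit by the swap-derived limit
    /// when swap is enabled and the memory limiter is active.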
    pub fn pack_all_replica_size_updates(&self) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = Vec::new();
        for (size, alloc) in &self.cluster_replica_sizes.0 {
            if alloc.disabled {
                continue;
            }

            let cpu_limit = alloc.cpu_limit.unwrap_or(CpuLimit::MAX);
            let MemoryLimit(ByteSize(memory_bytes)) =
                (alloc.memory_limit).unwrap_or(MemoryLimit::MAX);
            let DiskLimit(ByteSize(disk_bytes)) =
                (alloc.disk_limit).unwrap_or(DiskLimit::ARBITRARY);

            let mut disk_limit = disk_bytes;

            let dyncfg = self.system_config().dyncfgs();
            // When swap is enabled and the memory limiter is active (i.e. its
            // interval is nonzero), report the disk budget left over after
            // the scaled memory limit is charged against physical memory,
            // capped by the configured disk limit.
            if alloc.swap_enabled && MEMORY_LIMITER_INTERVAL.get(dyncfg) != Duration::ZERO {
                let swap_memory_limit = f64::cast_lossy(memory_bytes)
                    * MEMORY_LIMITER_USAGE_FACTOR.get(dyncfg)
                    * MEMORY_LIMITER_USAGE_BIAS.get(dyncfg);
                let swap_memory_limit = u64::cast_lossy(swap_memory_limit);
                let swap_disk_limit = swap_memory_limit.saturating_sub(memory_bytes);
                disk_limit = cmp::min(swap_disk_limit, disk_limit);
            }

            let row = Row::pack_slice(&[
                size.as_str().into(),
                u64::from(alloc.scale).into(),
                u64::cast_from(alloc.workers).into(),
                cpu_limit.as_nanocpus().into(),
                memory_bytes.into(),
                disk_limit.into(),
                (alloc.credits_per_hour).into(),
            ]);

            updates.push(BuiltinTableUpdate::row(
                &*MZ_CLUSTER_REPLICA_SIZES,
                row,
                Diff::ONE,
            ));
        }

        updates
    }

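    /// Packs a [`BuiltinTableUpdate`] for `mz_subscriptions` from an active
    /// subscribe: its `GlobalId`, session UUID, cluster, start time, and the
    /// list of objects it depends on.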
    pub fn pack_subscribe_update(
        &self,
        id: GlobalId,
        subscribe: &ActiveSubscribe,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let mut row = Row::default();
        let mut packer = row.packer();
        packer.push(Datum::String(&id.to_string()));
        packer.push(Datum::Uuid(subscribe.session_uuid));
        packer.push(Datum::String(&subscribe.cluster_id.to_string()));

        let start_dt = mz_ore::now::to_datetime(subscribe.start_time);
        packer.push(Datum::TimestampTz(start_dt.try_into().expect("must fit")));

        let depends_on: Vec<_> = subscribe
            .depends_on
            .iter()
            .map(|id| id.to_string())
            .collect();
        packer.push_list(depends_on.iter().map(|s| Datum::String(s)));

        BuiltinTableUpdate::row(&*MZ_SUBSCRIPTIONS, row, diff)
    }

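    /// Packs a [`BuiltinTableUpdate`] for `mz_sessions` from connection
    /// metadata: session UUID, connection id, authenticated role id, client
    /// IP (null if unknown), and connection time.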
    pub fn pack_session_update(
        &self,
        conn: &ConnMeta,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let connect_dt = mz_ore::now::to_datetime(conn.connected_at());
        BuiltinTableUpdate::row(
            &*MZ_SESSIONS,
            Row::pack_slice(&[
                Datum::Uuid(conn.uuid()),
                Datum::UInt32(conn.conn_id().unhandled()),
                Datum::String(&conn.authenticated_role_id().to_string()),
                Datum::from(conn.client_ip().map(|ip| ip.to_string()).as_deref()),
                Datum::TimestampTz(connect_dt.try_into().expect("must fit")),
            ]),
            diff,
        )
    }

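    /// Packs a [`BuiltinTableUpdate`] for `mz_default_privileges`. The
    /// database and schema columns are nullable: a `None` id means the
    /// default privilege is not scoped to a particular database or schema.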
    pub fn pack_default_privileges_update(
        &self,
        default_privilege_object: &DefaultPrivilegeObject,
        grantee: &RoleId,
        acl_mode: &AclMode,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        BuiltinTableUpdate::row(
            &*MZ_DEFAULT_PRIVILEGES,
            Row::pack_slice(&[
                default_privilege_object.role_id.to_string().as_str().into(),
                default_privilege_object
                    .database_id
                    .map(|database_id| database_id.to_string())
                    .as_deref()
                    .into(),
                default_privilege_object
                    .schema_id
                    .map(|schema_id| schema_id.to_string())
                    .as_deref()
                    .into(),
                default_privilege_object
                    .object_type
                    .to_string()
                    .to_lowercase()
                    .as_str()
                    .into(),
                grantee.to_string().as_str().into(),
                acl_mode.to_string().as_str().into(),
            ]),
            diff,
        )
    }

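    /// Packs a [`BuiltinTableUpdate`] for `mz_system_privileges` from a
    /// single [`MzAclItem`].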
    pub fn pack_system_privileges_update(
        &self,
        privileges: MzAclItem,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        BuiltinTableUpdate::row(
            &*MZ_SYSTEM_PRIVILEGES,
            Row::pack_slice(&[privileges.into()]),
            diff,
        )
    }

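    /// Packs a single-column [`Row`] holding a one-dimensional array (with
    /// lower bound 1, per SQL convention) of all [`MzAclItem`]s in the given
    /// [`PrivilegeMap`].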
    fn pack_privilege_array_row(&self, privileges: &PrivilegeMap) -> Row {
        let mut row = Row::default();
        let flat_privileges: Vec<_> = privileges.all_values_owned().collect();
        row.packer()
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: flat_privileges.len(),
                }],
                flat_privileges
                    .into_iter()
                    .map(|mz_acl_item| Datum::MzAclItem(mz_acl_item)),
            )
            .expect("privileges is 1 dimensional, and its length is used for the array length");
        row
    }

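    /// Packs a [`BuiltinTableUpdate`] for `mz_comments`. The object id column
    /// is rendered from whichever id variant the [`CommentObjectId`] carries;
    /// `column_pos` is null for comments on the object itself rather than on
    /// one of its columns.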
    pub fn pack_comment_update(
        &self,
        object_id: CommentObjectId,
        column_pos: Option<usize>,
        comment: &str,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let object_type = mz_sql::catalog::ObjectType::from(object_id);
        // Render the object type via the audit log's object type, so comments
        // and audit events use consistent type strings.
        let audit_type = super::object_type_to_audit_object_type(object_type);
        let object_type_str = audit_type.to_string();

        let object_id_str = match object_id {
            CommentObjectId::Table(global_id)
            | CommentObjectId::View(global_id)
            | CommentObjectId::MaterializedView(global_id)
            | CommentObjectId::Source(global_id)
            | CommentObjectId::Sink(global_id)
            | CommentObjectId::Index(global_id)
            | CommentObjectId::Func(global_id)
            | CommentObjectId::Connection(global_id)
            | CommentObjectId::Secret(global_id)
            | CommentObjectId::Type(global_id)
            | CommentObjectId::ContinualTask(global_id) => global_id.to_string(),
            CommentObjectId::Role(role_id) => role_id.to_string(),
            CommentObjectId::Database(database_id) => database_id.to_string(),
            CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
            CommentObjectId::Cluster(cluster_id) => cluster_id.to_string(),
            CommentObjectId::ClusterReplica((_, replica_id)) => replica_id.to_string(),
            CommentObjectId::NetworkPolicy(network_policy_id) => network_policy_id.to_string(),
        };
        let column_pos_datum = match column_pos {
            Some(pos) => {
                let pos =
                    i32::try_from(pos).expect("we constrain this value in the planning layer");
                Datum::Int32(pos)
            }
            None => Datum::Null,
        };

        BuiltinTableUpdate::row(
            &*MZ_COMMENTS,
            Row::pack_slice(&[
                Datum::String(&object_id_str),
                Datum::String(&object_type_str),
                column_pos_datum,
                Datum::String(comment),
            ]),
            diff,
        )
    }

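    /// Packs a [`BuiltinTableUpdate`] for the [`MZ_WEBHOOKS_SOURCES`] table:
    /// the source's id, name, and the URL at which it accepts requests.
    /// Panics if `item_id` does not refer to a webhook source.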
    pub fn pack_webhook_source_update(
        &self,
        item_id: CatalogItemId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let url = self
            .try_get_webhook_url(&item_id)
            .expect("webhook source should exist");
        let url = url.to_string();
        let name = &self.get_entry(&item_id).name().item;
        let id_str = item_id.to_string();

        BuiltinTableUpdate::row(
            &*MZ_WEBHOOKS_SOURCES,
            Row::pack_slice(&[
                Datum::String(&id_str),
                Datum::String(name),
                Datum::String(&url),
            ]),
            diff,
        )
    }

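    /// Packs one [`MZ_SOURCE_REFERENCES`] row per available reference for the
    /// given source: the source id, the reference's namespace (null if
    /// absent) and name, the `updated_at` timestamp, and the referenced
    /// columns (null when none are known).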
    pub fn pack_source_references_update(
        &self,
        source_references: &SourceReferences,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let source_id = source_references.source_id.to_string();
        let updated_at = &source_references.updated_at;
        source_references
            .references
            .iter()
            .map(|reference| {
                let mut row = Row::default();
                let mut packer = row.packer();
                packer.extend([
                    Datum::String(&source_id),
                    reference
                        .namespace
                        .as_ref()
                        .map(|s| Datum::String(s))
                        .unwrap_or(Datum::Null),
                    Datum::String(&reference.name),
                    Datum::TimestampTz(
                        mz_ore::now::to_datetime(*updated_at)
                            .try_into()
                            .expect("must fit"),
                    ),
                ]);
                if !reference.columns.is_empty() {
                    packer
                        .try_push_array(
                            &[ArrayDimension {
                                lower_bound: 1,
                                length: reference.columns.len(),
                            }],
                            reference.columns.iter().map(|col| Datum::String(col)),
                        )
                        .expect(
                            "columns is 1 dimensional, and its length is used for the array length",
                        );
                } else {
                    packer.push(Datum::Null);
                }

                BuiltinTableUpdate::row(&*MZ_SOURCE_REFERENCES, row, diff)
            })
            .collect()
    }
}