mod notice;

use bytesize::ByteSize;
use ipnet::IpNet;
use mz_adapter_types::compaction::CompactionWindow;
use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::builtin::{
    BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
    MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_METRICS,
    MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICA_STATUSES, MZ_CLUSTER_REPLICAS,
    MZ_CLUSTER_SCHEDULES, MZ_CLUSTER_WORKLOAD_CLASSES, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS,
    MZ_CONNECTIONS, MZ_CONTINUAL_TASKS, MZ_DATABASES, MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS,
    MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES, MZ_INDEX_COLUMNS, MZ_INDEXES,
    MZ_INTERNAL_CLUSTER_REPLICAS, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS, MZ_KAFKA_SOURCE_TABLES,
    MZ_KAFKA_SOURCES, MZ_LIST_TYPES, MZ_MAP_TYPES, MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
    MZ_MATERIALIZED_VIEWS, MZ_MYSQL_SOURCE_TABLES, MZ_NETWORK_POLICIES, MZ_NETWORK_POLICY_RULES,
    MZ_OBJECT_DEPENDENCIES, MZ_OPERATORS, MZ_PENDING_CLUSTER_REPLICAS, MZ_POSTGRES_SOURCE_TABLES,
    MZ_POSTGRES_SOURCES, MZ_PSEUDO_TYPES, MZ_ROLE_MEMBERS, MZ_ROLE_PARAMETERS, MZ_ROLES,
    MZ_SCHEMAS, MZ_SECRETS, MZ_SESSIONS, MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SOURCES,
    MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD,
    MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS,
    MZ_WEBHOOKS_SOURCES,
};
use mz_catalog::config::AwsPrincipalContext;
use mz_catalog::durable::SourceReferences;
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::{
    CatalogItem, ClusterReplicaProcessStatus, ClusterVariant, Connection, ContinualTask,
    DataSourceDesc, Func, Index, MaterializedView, Sink, Table, TableDataSource, Type, View,
};
use mz_controller::clusters::{
    ClusterStatus, ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ProcessId,
    ReplicaAllocation, ReplicaLocation,
};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_expr::MirScalarExpr;
use mz_orchestrator::{CpuLimit, DiskLimit, MemoryLimit, ServiceProcessMetrics};
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_persist_client::batch::ProtoBatch;
use mz_repr::adt::array::ArrayDimension;
use mz_repr::adt::interval::Interval;
use mz_repr::adt::jsonb::Jsonb;
use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::refresh_schedule::RefreshEvery;
use mz_repr::role_id::RoleId;
use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, Row, RowPacker, ScalarType, Timestamp};
use mz_sql::ast::{ContinualTaskStmt, CreateIndexStatement, Statement, UnresolvedItemName};
use mz_sql::catalog::{
    CatalogCluster, CatalogDatabase, CatalogSchema, CatalogType, DefaultPrivilegeObject,
    TypeCategory,
};
use mz_sql::func::FuncImplCatalogDetails;
use mz_sql::names::{
    CommentObjectId, DatabaseId, ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier,
};
use mz_sql::plan::{ClusterSchedule, ConnectionDetails, SshKey};
use mz_sql::session::user::SYSTEM_USER;
use mz_sql::session::vars::SessionVars;
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_client::client::TableData;
use mz_storage_types::connections::KafkaConnection;
use mz_storage_types::connections::aws::{AwsAuth, AwsConnection};
use mz_storage_types::connections::inline::ReferencedConnection;
use mz_storage_types::connections::string_or_secret::StringOrSecret;
use mz_storage_types::sinks::{KafkaSinkConnection, StorageSinkConnection};
use mz_storage_types::sources::{
    GenericSourceConnection, KafkaSourceConnection, PostgresSourceConnection, SourceConnection,
};
use smallvec::smallvec;

use crate::active_compute_sink::ActiveSubscribe;
use crate::catalog::CatalogState;
use crate::coord::ConnMeta;

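/// An update to a builtin table. The update either carries explicit rows with
/// diffs or a pre-written persist batch.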
#[derive(Debug, Clone)]
pub struct BuiltinTableUpdate<T = CatalogItemId> {
    pub id: T,
    pub data: TableData,
}

impl<T> BuiltinTableUpdate<T> {
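    /// Constructs an update that inserts (positive `diff`) or retracts
    /// (negative `diff`) a single `row` in the table identified by `id`.
    ///
    /// A minimal sketch (ignored in doctests, since packing real catalog rows
    /// requires a running environment):
    ///
    /// ```ignore
    /// let row = Row::pack_slice(&[Datum::String("u1")]);
    /// let update = BuiltinTableUpdate::row(&*MZ_ROLES, row, Diff::ONE);
    /// ```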
    pub fn row(id: T, row: Row, diff: Diff) -> BuiltinTableUpdate<T> {
        BuiltinTableUpdate {
            id,
            data: TableData::Rows(vec![(row, diff)]),
        }
    }

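    /// Constructs an update whose data is a pre-written batch of rows, staged
    /// in persist as a [`ProtoBatch`].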
    pub fn batch(id: T, batch: ProtoBatch) -> BuiltinTableUpdate<T> {
        BuiltinTableUpdate {
            id,
            data: TableData::Batches(smallvec![batch]),
        }
    }
}

impl CatalogState {
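    /// Resolves a batch of updates expressed against `&'static BuiltinTable`
    /// references into updates keyed by the tables' [`CatalogItemId`]s.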
    pub fn resolve_builtin_table_updates(
        &self,
        builtin_table_update: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    ) -> Vec<BuiltinTableUpdate<CatalogItemId>> {
        builtin_table_update
            .into_iter()
            .map(|builtin_table_update| self.resolve_builtin_table_update(builtin_table_update))
            .collect()
    }

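    /// Resolves a single update expressed against a `&'static BuiltinTable`
    /// reference into an update keyed by the table's [`CatalogItemId`].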
    pub fn resolve_builtin_table_update(
        &self,
        BuiltinTableUpdate { id, data }: BuiltinTableUpdate<&'static BuiltinTable>,
    ) -> BuiltinTableUpdate<CatalogItemId> {
        let id = self.resolve_builtin_table(id);
        BuiltinTableUpdate { id, data }
    }

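    /// Packs a row for `mz_object_dependencies` recording that `depender`
    /// depends on `dependee`.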
    pub fn pack_depends_update(
        &self,
        depender: CatalogItemId,
        dependee: CatalogItemId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let row = Row::pack_slice(&[
            Datum::String(&depender.to_string()),
            Datum::String(&dependee.to_string()),
        ]);
        BuiltinTableUpdate::row(&*MZ_OBJECT_DEPENDENCIES, row, diff)
    }

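    /// Packs a row for `mz_databases` describing the database identified by
    /// `database_id`.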
    pub(super) fn pack_database_update(
        &self,
        database_id: &DatabaseId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let database = self.get_database(database_id);
        let row = self.pack_privilege_array_row(database.privileges());
        let privileges = row.unpack_first();
        BuiltinTableUpdate::row(
            &*MZ_DATABASES,
            Row::pack_slice(&[
                Datum::String(&database.id.to_string()),
                Datum::UInt32(database.oid),
                Datum::String(database.name()),
                Datum::String(&database.owner_id.to_string()),
                privileges,
            ]),
            diff,
        )
    }

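    /// Packs a row for `mz_schemas` describing `schema_id`, which lives either
    /// in the ambient catalog or in the database given by `database_spec`.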
    pub(super) fn pack_schema_update(
        &self,
        database_spec: &ResolvedDatabaseSpecifier,
        schema_id: &SchemaId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let (database_id, schema) = match database_spec {
            ResolvedDatabaseSpecifier::Ambient => (None, &self.ambient_schemas_by_id[schema_id]),
            ResolvedDatabaseSpecifier::Id(id) => (
                Some(id.to_string()),
                &self.database_by_id[id].schemas_by_id[schema_id],
            ),
        };
        let row = self.pack_privilege_array_row(schema.privileges());
        let privileges = row.unpack_first();
        BuiltinTableUpdate::row(
            &*MZ_SCHEMAS,
            Row::pack_slice(&[
                Datum::String(&schema_id.to_string()),
                Datum::UInt32(schema.oid),
                Datum::from(database_id.as_deref()),
                Datum::String(&schema.name.schema),
                Datum::String(&schema.owner_id.to_string()),
                privileges,
            ]),
            diff,
        )
    }

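    /// Packs rows for `mz_roles` and `mz_role_parameters` describing the role
    /// identified by `id`. The pseudo-role `PUBLIC` produces no updates.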
    pub(super) fn pack_role_update(
        &self,
        id: RoleId,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        match id {
            RoleId::Public => vec![],
            id => {
                let role = self.get_role(&id);
                let role_update = BuiltinTableUpdate::row(
                    &*MZ_ROLES,
                    Row::pack_slice(&[
                        Datum::String(&role.id.to_string()),
                        Datum::UInt32(role.oid),
                        Datum::String(&role.name),
                        Datum::from(role.attributes.inherit),
                    ]),
                    diff,
                );
                let mut updates = vec![role_update];

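                // Build a reference set of session vars (for a dummy build and
                // the system user) against which each of the role's default
                // vars is validated before it is packed.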
                let session_vars_reference = SessionVars::new_unchecked(
                    &mz_build_info::DUMMY_BUILD_INFO,
                    SYSTEM_USER.clone(),
                    None,
                );

                for (name, val) in role.vars() {
                    let result = session_vars_reference
                        .inspect(name)
                        .and_then(|var| var.check(val.borrow()));
                    let Ok(formatted_val) = result else {
                        tracing::error!(?name, ?val, ?result, "found invalid role default var");
                        continue;
                    };

                    let role_var_update = BuiltinTableUpdate::row(
                        &*MZ_ROLE_PARAMETERS,
                        Row::pack_slice(&[
                            Datum::String(&role.id.to_string()),
                            Datum::String(name),
                            Datum::String(&formatted_val),
                        ]),
                        diff,
                    );
                    updates.push(role_var_update);
                }

                updates
            }
        }
    }

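    /// Packs a row for `mz_role_members` recording that `member_id` is a
    /// member of `role_id`, along with the role that granted the membership.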
    pub(super) fn pack_role_members_update(
        &self,
        role_id: RoleId,
        member_id: RoleId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let grantor_id = self
            .get_role(&member_id)
            .membership
            .map
            .get(&role_id)
            .expect("catalog out of sync");
        BuiltinTableUpdate::row(
            &*MZ_ROLE_MEMBERS,
            Row::pack_slice(&[
                Datum::String(&role_id.to_string()),
                Datum::String(&member_id.to_string()),
                Datum::String(&grantor_id.to_string()),
            ]),
            diff,
        )
    }

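    /// Packs rows for `mz_clusters`, `mz_cluster_schedules`, and
    /// `mz_cluster_workload_classes` describing the cluster named `name`.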
    pub(super) fn pack_cluster_update(
        &self,
        name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let id = self.clusters_by_name[name];
        let cluster = &self.clusters_by_id[&id];
        let row = self.pack_privilege_array_row(cluster.privileges());
        let privileges = row.unpack_first();
        let (size, disk, replication_factor, azs, introspection_debugging, introspection_interval) =
            match &cluster.config.variant {
                ClusterVariant::Managed(config) => (
                    Some(config.size.as_str()),
                    Some(config.disk),
                    Some(config.replication_factor),
                    if config.availability_zones.is_empty() {
                        None
                    } else {
                        Some(config.availability_zones.clone())
                    },
                    Some(config.logging.log_logging),
                    config.logging.interval.map(|d| {
                        Interval::from_duration(&d)
                            .expect("planning ensured this convertible back to interval")
                    }),
                ),
                ClusterVariant::Unmanaged => (None, None, None, None, None, None),
            };

        let mut row = Row::default();
        let mut packer = row.packer();
        packer.extend([
            Datum::String(&id.to_string()),
            Datum::String(name),
            Datum::String(&cluster.owner_id.to_string()),
            privileges,
            cluster.is_managed().into(),
            size.into(),
            replication_factor.into(),
            disk.into(),
        ]);
        if let Some(azs) = azs {
            packer.push_list(azs.iter().map(|az| Datum::String(az)));
        } else {
            packer.push(Datum::Null);
        }
        packer.push(Datum::from(introspection_debugging));
        packer.push(Datum::from(introspection_interval));

        let mut updates = Vec::new();

        updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTERS, row, diff));

        if let ClusterVariant::Managed(managed_config) = &cluster.config.variant {
            let row = match managed_config.schedule {
                ClusterSchedule::Manual => Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String("manual"),
                    Datum::Null,
                ]),
                ClusterSchedule::Refresh {
                    hydration_time_estimate,
                } => Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String("on-refresh"),
                    Datum::Interval(
                        Interval::from_duration(&hydration_time_estimate)
                            .expect("planning ensured that this is convertible back to Interval"),
                    ),
                ]),
            };
            updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTER_SCHEDULES, row, diff));
        }

        updates.push(BuiltinTableUpdate::row(
            &*MZ_CLUSTER_WORKLOAD_CLASSES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::from(cluster.config.workload_class.as_deref()),
            ]),
            diff,
        ));

        updates
    }

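    /// Packs rows for `mz_cluster_replicas` (plus the internal and pending
    /// replica tables, where applicable) describing replica `name` of
    /// `cluster_id`.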
    pub(super) fn pack_cluster_replica_update(
        &self,
        cluster_id: ClusterId,
        name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let cluster = &self.clusters_by_id[&cluster_id];
        let id = cluster.replica_id(name).expect("Must exist");
        let replica = cluster.replica(id).expect("Must exist");

        let (size, disk, az, internal, pending) = match &replica.config.location {
            ReplicaLocation::Managed(ManagedReplicaLocation {
                size,
                availability_zones: ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
                allocation: _,
                disk,
                billed_as: _,
                internal,
                pending,
            }) => (
                Some(&**size),
                Some(*disk),
                Some(az.as_str()),
                *internal,
                *pending,
            ),
            ReplicaLocation::Managed(ManagedReplicaLocation {
                size,
                availability_zones: _,
                allocation: _,
                disk,
                billed_as: _,
                internal,
                pending,
            }) => (Some(&**size), Some(*disk), None, *internal, *pending),
            _ => (None, None, None, false, false),
        };

        let cluster_replica_update = BuiltinTableUpdate::row(
            &*MZ_CLUSTER_REPLICAS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(name),
                Datum::String(&cluster_id.to_string()),
                Datum::from(size),
                Datum::from(az),
                Datum::String(&replica.owner_id.to_string()),
                Datum::from(disk),
            ]),
            diff,
        );

        let mut updates = vec![cluster_replica_update];

        if internal {
            let update = BuiltinTableUpdate::row(
                &*MZ_INTERNAL_CLUSTER_REPLICAS,
                Row::pack_slice(&[Datum::String(&id.to_string())]),
                diff,
            );
            updates.push(update);
        }

        if pending {
            let update = BuiltinTableUpdate::row(
                &*MZ_PENDING_CLUSTER_REPLICAS,
                Row::pack_slice(&[Datum::String(&id.to_string())]),
                diff,
            );
            updates.push(update);
        }

        updates
    }

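    /// Packs a row for `mz_cluster_replica_statuses` recording the status of
    /// one process of the replica identified by `replica_id`.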
    pub(crate) fn pack_cluster_replica_status_update(
        &self,
        replica_id: ReplicaId,
        process_id: ProcessId,
        event: &ClusterReplicaProcessStatus,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let status = event.status.as_kebab_case_str();

        let offline_reason = match event.status {
            ClusterStatus::Online => None,
            ClusterStatus::Offline(None) => None,
            ClusterStatus::Offline(Some(reason)) => Some(reason.to_string()),
        };

        BuiltinTableUpdate::row(
            &*MZ_CLUSTER_REPLICA_STATUSES,
            Row::pack_slice(&[
                Datum::String(&replica_id.to_string()),
                Datum::UInt64(process_id),
                Datum::String(status),
                Datum::from(offline_reason.as_deref()),
                Datum::TimestampTz(event.time.try_into().expect("must fit")),
            ]),
            diff,
        )
    }

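    /// Packs rows for `mz_network_policies` and `mz_network_policy_rules`
    /// describing the policy identified by `policy_id` and each of its rules.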
    pub(crate) fn pack_network_policy_update(
        &self,
        policy_id: &NetworkPolicyId,
        diff: Diff,
    ) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, Error> {
        let policy = self.get_network_policy(policy_id);
        let row = self.pack_privilege_array_row(&policy.privileges);
        let privileges = row.unpack_first();
        let mut updates = Vec::new();
        for ref rule in policy.rules.clone() {
            updates.push(BuiltinTableUpdate::row(
                &*MZ_NETWORK_POLICY_RULES,
                Row::pack_slice(&[
                    Datum::String(&rule.name),
                    Datum::String(&policy.id.to_string()),
                    Datum::String(&rule.action.to_string()),
                    Datum::String(&rule.address.to_string()),
                    Datum::String(&rule.direction.to_string()),
                ]),
                diff,
            ));
        }
        updates.push(BuiltinTableUpdate::row(
            &*MZ_NETWORK_POLICIES,
            Row::pack_slice(&[
                Datum::String(&policy.id.to_string()),
                Datum::String(&policy.name),
                Datum::String(&policy.owner_id.to_string()),
                privileges,
                Datum::UInt32(policy.oid),
            ]),
            diff,
        ));

        Ok(updates)
    }

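    /// Packs rows for all builtin tables that describe the catalog item
    /// identified by `id`, dispatching on the item's type. Also packs the
    /// item's columns, dependency edges, and history retention strategy,
    /// where applicable.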
    pub(super) fn pack_item_update(
        &self,
        id: CatalogItemId,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let entry = self.get_entry(&id);
        let oid = entry.oid();
        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
        let schema_id = &self
            .get_schema(
                &entry.name().qualifiers.database_spec,
                &entry.name().qualifiers.schema_spec,
                conn_id,
            )
            .id;
        let name = &entry.name().item;
        let owner_id = entry.owner_id();
        let privileges_row = self.pack_privilege_array_row(entry.privileges());
        let privileges = privileges_row.unpack_first();
        let mut updates = match entry.item() {
            CatalogItem::Log(_) => self.pack_source_update(
                id, oid, schema_id, name, "log", None, None, None, None, None, owner_id,
                privileges, diff, None,
            ),
            CatalogItem::Index(index) => {
                self.pack_index_update(id, oid, name, owner_id, index, diff)
            }
            CatalogItem::Table(table) => {
                let mut updates = self
                    .pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);

                if let TableDataSource::DataSource {
                    desc: data_source,
                    timeline: _,
                } = &table.data_source
                {
                    updates.extend(match data_source {
                        DataSourceDesc::IngestionExport {
                            ingestion_id,
                            external_reference: UnresolvedItemName(external_reference),
                            details: _,
                            data_config: _,
                        } => {
                            let ingestion_entry = self
                                .get_entry(ingestion_id)
                                .source_desc()
                                .expect("primary source exists")
                                .expect("primary source is a source");

                            match ingestion_entry.connection.name() {
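                                // Destructure the fully qualified external
                                // reference; which components identify the
                                // schema and table (or topic) depends on the
                                // source type.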
556 "postgres" => {
557 mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
558 let schema_name = external_reference[1].to_ast_string_simple();
564 let table_name = external_reference[2].to_ast_string_simple();
565
566 self.pack_postgres_source_tables_update(
567 id,
568 &schema_name,
569 &table_name,
570 diff,
571 )
572 }
573 "mysql" => {
574 mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
575 let schema_name = external_reference[0].to_ast_string_simple();
576 let table_name = external_reference[1].to_ast_string_simple();
577
578 self.pack_mysql_source_tables_update(
579 id,
580 &schema_name,
581 &table_name,
582 diff,
583 )
584 }
585 "sql-server" => {
586 mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
587 let schema_name = external_reference[1].to_ast_string_simple();
592 let table_name = external_reference[2].to_ast_string_simple();
593
594 self.pack_sql_server_source_table_update(
595 id,
596 &schema_name,
597 &table_name,
598 diff,
599 )
600 }
601 "load-generator" => vec![],
604 "kafka" => {
605 mz_ore::soft_assert_eq_no_log!(external_reference.len(), 1);
606 let topic = external_reference[0].to_ast_string_simple();
607 let envelope = data_source.envelope();
608 let (key_format, value_format) = data_source.formats();
609
610 self.pack_kafka_source_tables_update(
611 id,
612 &topic,
613 envelope,
614 key_format,
615 value_format,
616 diff,
617 )
618 }
619 s => unreachable!("{s} sources do not have tables"),
620 }
621 }
622 _ => vec![],
623 });
624 }
625
626 updates
627 }
628 CatalogItem::Source(source) => {
629 let source_type = source.source_type();
630 let connection_id = source.connection_id();
631 let envelope = source.data_source.envelope();
632 let cluster_entry = match source.data_source {
633 DataSourceDesc::IngestionExport { ingestion_id, .. } => {
636 self.get_entry(&ingestion_id)
637 }
638 _ => entry,
639 };
640
641 let cluster_id = cluster_entry.item().cluster_id().map(|id| id.to_string());
642
643 let (key_format, value_format) = source.data_source.formats();
644
645 let mut updates = self.pack_source_update(
646 id,
647 oid,
648 schema_id,
649 name,
650 source_type,
651 connection_id,
652 envelope,
653 key_format,
654 value_format,
655 cluster_id.as_deref(),
656 owner_id,
657 privileges,
658 diff,
659 source.create_sql.as_ref(),
660 );
661
662 updates.extend(match &source.data_source {
663 DataSourceDesc::Ingestion { ingestion_desc, .. } => {
664 match &ingestion_desc.desc.connection {
665 GenericSourceConnection::Postgres(postgres) => {
666 self.pack_postgres_source_update(id, postgres, diff)
667 }
668 GenericSourceConnection::Kafka(kafka) => {
669 self.pack_kafka_source_update(id, source.global_id(), kafka, diff)
670 }
671 _ => vec![],
672 }
673 }
674 DataSourceDesc::IngestionExport {
675 ingestion_id,
676 external_reference: UnresolvedItemName(external_reference),
677 details: _,
678 data_config: _,
679 } => {
680 let ingestion_entry = self
681 .get_entry(ingestion_id)
682 .source_desc()
683 .expect("primary source exists")
684 .expect("primary source is a source");
685
686 match ingestion_entry.connection.name() {
687 "postgres" => {
688 mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
689 let schema_name = external_reference[1].to_ast_string_simple();
695 let table_name = external_reference[2].to_ast_string_simple();
696
697 self.pack_postgres_source_tables_update(
698 id,
699 &schema_name,
700 &table_name,
701 diff,
702 )
703 }
704 "mysql" => {
705 mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
706 let schema_name = external_reference[0].to_ast_string_simple();
707 let table_name = external_reference[1].to_ast_string_simple();
708
709 self.pack_mysql_source_tables_update(
710 id,
711 &schema_name,
712 &table_name,
713 diff,
714 )
715 }
716 "sql-server" => {
717 mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
718 let schema_name = external_reference[1].to_ast_string_simple();
723 let table_name = external_reference[2].to_ast_string_simple();
724
725 self.pack_sql_server_source_table_update(
726 id,
727 &schema_name,
728 &table_name,
729 diff,
730 )
731 }
732 "load-generator" => vec![],
735 s => unreachable!("{s} sources do not have subsources"),
736 }
737 }
738 DataSourceDesc::Webhook { .. } => {
739 vec![self.pack_webhook_source_update(id, diff)]
740 }
741 _ => vec![],
742 });
743
744 updates
745 }
746 CatalogItem::View(view) => {
747 self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
748 }
749 CatalogItem::MaterializedView(mview) => self.pack_materialized_view_update(
750 id, oid, schema_id, name, owner_id, privileges, mview, diff,
751 ),
752 CatalogItem::Sink(sink) => {
753 self.pack_sink_update(id, oid, schema_id, name, owner_id, sink, diff)
754 }
755 CatalogItem::Type(ty) => {
756 self.pack_type_update(id, oid, schema_id, name, owner_id, privileges, ty, diff)
757 }
758 CatalogItem::Func(func) => {
759 self.pack_func_update(id, schema_id, name, owner_id, func, diff)
760 }
761 CatalogItem::Secret(_) => {
762 self.pack_secret_update(id, oid, schema_id, name, owner_id, privileges, diff)
763 }
764 CatalogItem::Connection(connection) => self.pack_connection_update(
765 id, oid, schema_id, name, owner_id, privileges, connection, diff,
766 ),
767 CatalogItem::ContinualTask(ct) => self.pack_continual_task_update(
768 id, oid, schema_id, name, owner_id, privileges, ct, diff,
769 ),
770 };
771
772 if !entry.item().is_temporary() {
773 for dependee in entry.item().references().items() {
776 updates.push(self.pack_depends_update(id, *dependee, diff))
777 }
778 }
779
780 let full_name = self.resolve_full_name(entry.name(), entry.conn_id());
781 if let Ok(desc) = entry.desc_latest(&full_name) {
783 let defaults = match entry.item() {
784 CatalogItem::Table(Table {
785 data_source: TableDataSource::TableWrites { defaults },
786 ..
787 }) => Some(defaults),
788 _ => None,
789 };
790 for (i, (column_name, column_type)) in desc.iter().enumerate() {
791 let default: Option<String> = defaults.map(|d| d[i].to_ast_string_stable());
792 let default: Datum = default
793 .as_ref()
794 .map(|d| Datum::String(d))
795 .unwrap_or(Datum::Null);
796 let pgtype = mz_pgrepr::Type::from(&column_type.scalar_type);
797 let (type_name, type_oid) = match &column_type.scalar_type {
798 ScalarType::List {
799 custom_id: Some(custom_id),
800 ..
801 }
802 | ScalarType::Map {
803 custom_id: Some(custom_id),
804 ..
805 }
806 | ScalarType::Record {
807 custom_id: Some(custom_id),
808 ..
809 } => {
810 let entry = self.get_entry(custom_id);
811 let name = &*entry.name().item;
826 let oid = entry.oid();
827 (name, oid)
828 }
829 _ => (pgtype.name(), pgtype.oid()),
830 };
831 updates.push(BuiltinTableUpdate::row(
832 &*MZ_COLUMNS,
833 Row::pack_slice(&[
834 Datum::String(&id.to_string()),
835 Datum::String(column_name.as_str()),
836 Datum::UInt64(u64::cast_from(i + 1)),
837 Datum::from(column_type.nullable),
838 Datum::String(type_name),
839 default,
840 Datum::UInt32(type_oid),
841 Datum::Int32(pgtype.typmod()),
842 ]),
843 diff,
844 ));
845 }
846 }
847
848 if let Some(cw) = entry.item().initial_logical_compaction_window() {
850 updates.push(self.pack_history_retention_strategy_update(id, cw, diff));
851 for (cw, mut ids) in self.source_compaction_windows([id]) {
853 ids.remove(&id);
855 for sub_id in ids {
856 updates.push(self.pack_history_retention_strategy_update(sub_id, cw, diff));
857 }
858 }
859 }
860
861 updates
862 }
863
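    /// Packs a row for `mz_history_retention_strategies` recording the
    /// compaction window `cw` for item `id`, encoded as a JSON number.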
    fn pack_history_retention_strategy_update(
        &self,
        id: CatalogItemId,
        cw: CompactionWindow,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let cw: u64 = cw.comparable_timestamp().into();
        let cw = Jsonb::from_serde_json(serde_json::Value::Number(serde_json::Number::from(cw)))
            .expect("must serialize");
        BuiltinTableUpdate::row(
            &*MZ_HISTORY_RETENTION_STRATEGIES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String("FOR"),
                cw.into_row().into_element(),
            ]),
            diff,
        )
    }

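    /// Packs a row for `mz_tables` describing `table`, including its redacted
    /// `CREATE` SQL and the ID of the source it exports from, if any.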
    fn pack_table_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        diff: Diff,
        table: &Table,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let redacted = table.create_sql.as_ref().map(|create_sql| {
            mz_sql::parse::parse(create_sql)
                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
                .into_element()
                .ast
                .to_ast_string_redacted()
        });
        let source_id = if let TableDataSource::DataSource {
            desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
            ..
        } = &table.data_source
        {
            Some(ingestion_id.to_string())
        } else {
            None
        };

        vec![BuiltinTableUpdate::row(
            &*MZ_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&owner_id.to_string()),
                privileges,
                if let Some(create_sql) = &table.create_sql {
                    Datum::String(create_sql)
                } else {
                    Datum::Null
                },
                if let Some(redacted) = &redacted {
                    Datum::String(redacted)
                } else {
                    Datum::Null
                },
                if let Some(source_id) = source_id.as_ref() {
                    Datum::String(source_id)
                } else {
                    Datum::Null
                },
            ]),
            diff,
        )]
    }

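    /// Packs a row for `mz_sources` describing a source and its connection,
    /// format, envelope, and cluster metadata.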
    fn pack_source_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        source_desc_name: &str,
        connection_id: Option<CatalogItemId>,
        envelope: Option<&str>,
        key_format: Option<&str>,
        value_format: Option<&str>,
        cluster_id: Option<&str>,
        owner_id: &RoleId,
        privileges: Datum,
        diff: Diff,
        create_sql: Option<&String>,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let redacted = create_sql.map(|create_sql| {
            let create_stmt = mz_sql::parse::parse(create_sql)
                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
                .into_element()
                .ast;
            create_stmt.to_ast_string_redacted()
        });
        vec![BuiltinTableUpdate::row(
            &*MZ_SOURCES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(source_desc_name),
                Datum::from(connection_id.map(|id| id.to_string()).as_deref()),
                Datum::Null,
                Datum::from(envelope),
                Datum::from(key_format),
                Datum::from(value_format),
                Datum::from(cluster_id),
                Datum::String(&owner_id.to_string()),
                privileges,
                if let Some(create_sql) = create_sql {
                    Datum::String(create_sql)
                } else {
                    Datum::Null
                },
                if let Some(redacted) = &redacted {
                    Datum::String(redacted)
                } else {
                    Datum::Null
                },
            ]),
            diff,
        )]
    }

    fn pack_postgres_source_update(
        &self,
        id: CatalogItemId,
        postgres: &PostgresSourceConnection<ReferencedConnection>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_POSTGRES_SOURCES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(&postgres.publication_details.slot),
                Datum::from(postgres.publication_details.timeline_id),
            ]),
            diff,
        )]
    }

    fn pack_kafka_source_update(
        &self,
        item_id: CatalogItemId,
        collection_id: GlobalId,
        kafka: &KafkaSourceConnection<ReferencedConnection>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_KAFKA_SOURCES,
            Row::pack_slice(&[
                Datum::String(&item_id.to_string()),
                Datum::String(&kafka.group_id(&self.config.connection_context, collection_id)),
                Datum::String(&kafka.topic),
            ]),
            diff,
        )]
    }

    fn pack_postgres_source_tables_update(
        &self,
        id: CatalogItemId,
        schema_name: &str,
        table_name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_POSTGRES_SOURCE_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(schema_name),
                Datum::String(table_name),
            ]),
            diff,
        )]
    }

    fn pack_mysql_source_tables_update(
        &self,
        id: CatalogItemId,
        schema_name: &str,
        table_name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_MYSQL_SOURCE_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(schema_name),
                Datum::String(table_name),
            ]),
            diff,
        )]
    }

    fn pack_sql_server_source_table_update(
        &self,
        id: CatalogItemId,
        schema_name: &str,
        table_name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_SQL_SERVER_SOURCE_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(schema_name),
                Datum::String(table_name),
            ]),
            diff,
        )]
    }

    fn pack_kafka_source_tables_update(
        &self,
        id: CatalogItemId,
        topic: &str,
        envelope: Option<&str>,
        key_format: Option<&str>,
        value_format: Option<&str>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_KAFKA_SOURCE_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(topic),
                Datum::from(envelope),
                Datum::from(key_format),
                Datum::from(value_format),
            ]),
            diff,
        )]
    }

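    /// Packs rows for `mz_connections` plus any connection-type-specific
    /// builtin tables (Kafka, AWS, AWS PrivateLink, SSH tunnel).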
    fn pack_connection_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        connection: &Connection,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let create_stmt = mz_sql::parse::parse(&connection.create_sql)
            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", connection.create_sql))
            .into_element()
            .ast;
        let mut updates = vec![BuiltinTableUpdate::row(
            &*MZ_CONNECTIONS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(match connection.details {
                    ConnectionDetails::Kafka { .. } => "kafka",
                    ConnectionDetails::Csr { .. } => "confluent-schema-registry",
                    ConnectionDetails::Postgres { .. } => "postgres",
                    ConnectionDetails::Aws(..) => "aws",
                    ConnectionDetails::AwsPrivatelink(..) => "aws-privatelink",
                    ConnectionDetails::Ssh { .. } => "ssh-tunnel",
                    ConnectionDetails::MySql { .. } => "mysql",
                    ConnectionDetails::SqlServer(_) => "sql-server",
                }),
                Datum::String(&owner_id.to_string()),
                privileges,
                Datum::String(&connection.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        )];
        match connection.details {
            ConnectionDetails::Kafka(ref kafka) => {
                updates.extend(self.pack_kafka_connection_update(id, kafka, diff));
            }
            ConnectionDetails::Aws(ref aws_config) => {
                match self.pack_aws_connection_update(id, aws_config, diff) {
                    Ok(update) => {
                        updates.push(update);
                    }
                    Err(e) => {
                        tracing::error!(%id, %e, "failed writing row to mz_aws_connections table");
                    }
                }
            }
            ConnectionDetails::AwsPrivatelink(_) => {
                if let Some(aws_principal_context) = self.aws_principal_context.as_ref() {
                    updates.push(self.pack_aws_privatelink_connection_update(
                        id,
                        aws_principal_context,
                        diff,
                    ));
                } else {
                    tracing::error!(%id, "missing AWS principal context; cannot write row to mz_aws_privatelink_connections table");
                }
            }
            ConnectionDetails::Ssh {
                ref key_1,
                ref key_2,
                ..
            } => {
                updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff));
            }
            ConnectionDetails::Csr(_)
            | ConnectionDetails::Postgres(_)
            | ConnectionDetails::MySql(_)
            | ConnectionDetails::SqlServer(_) => (),
        };
        updates
    }

    pub(crate) fn pack_ssh_tunnel_connection_update(
        &self,
        id: CatalogItemId,
        key_1: &SshKey,
        key_2: &SshKey,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        BuiltinTableUpdate::row(
            &*MZ_SSH_TUNNEL_CONNECTIONS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::String(key_1.public_key().as_str()),
                Datum::String(key_2.public_key().as_str()),
            ]),
            diff,
        )
    }

    fn pack_kafka_connection_update(
        &self,
        id: CatalogItemId,
        kafka: &KafkaConnection<ReferencedConnection>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let progress_topic = kafka.progress_topic(&self.config.connection_context, id);
        let mut row = Row::default();
        row.packer()
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: kafka.brokers.len(),
                }],
                kafka
                    .brokers
                    .iter()
                    .map(|broker| Datum::String(&broker.address)),
            )
            .expect("kafka.brokers is 1 dimensional, and its length is used for the array length");
        let brokers = row.unpack_first();
        vec![BuiltinTableUpdate::row(
            &*MZ_KAFKA_CONNECTIONS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                brokers,
                Datum::String(&progress_topic),
            ]),
            diff,
        )]
    }

    pub fn pack_aws_privatelink_connection_update(
        &self,
        connection_id: CatalogItemId,
        aws_principal_context: &AwsPrincipalContext,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let id = &MZ_AWS_PRIVATELINK_CONNECTIONS;
        let row = Row::pack_slice(&[
            Datum::String(&connection_id.to_string()),
            Datum::String(&aws_principal_context.to_principal_string(connection_id)),
        ]);
        BuiltinTableUpdate::row(id, row, diff)
    }

    pub fn pack_aws_connection_update(
        &self,
        connection_id: CatalogItemId,
        aws_config: &AwsConnection,
        diff: Diff,
    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, anyhow::Error> {
        let id = &MZ_AWS_CONNECTIONS;

        let mut access_key_id = None;
        let mut access_key_id_secret_id = None;
        let mut secret_access_key_secret_id = None;
        let mut session_token = None;
        let mut session_token_secret_id = None;
        let mut assume_role_arn = None;
        let mut assume_role_session_name = None;
        let mut principal = None;
        let mut external_id = None;
        let mut example_trust_policy = None;
        match &aws_config.auth {
            AwsAuth::Credentials(credentials) => {
                match &credentials.access_key_id {
                    StringOrSecret::String(s) => access_key_id = Some(s.as_str()),
                    StringOrSecret::Secret(s) => access_key_id_secret_id = Some(s.to_string()),
                }
                secret_access_key_secret_id = Some(credentials.secret_access_key.to_string());
                match credentials.session_token.as_ref() {
                    None => (),
                    Some(StringOrSecret::String(s)) => session_token = Some(s.as_str()),
                    Some(StringOrSecret::Secret(s)) => {
                        session_token_secret_id = Some(s.to_string())
                    }
                }
            }
            AwsAuth::AssumeRole(assume_role) => {
                assume_role_arn = Some(assume_role.arn.as_str());
                assume_role_session_name = assume_role.session_name.as_deref();
                principal = self
                    .config
                    .connection_context
                    .aws_connection_role_arn
                    .as_deref();
                external_id =
                    Some(assume_role.external_id(&self.config.connection_context, connection_id)?);
                example_trust_policy = {
                    let policy = assume_role
                        .example_trust_policy(&self.config.connection_context, connection_id)?;
                    let policy = Jsonb::from_serde_json(policy).expect("valid json");
                    Some(policy.into_row())
                };
            }
        }

        let row = Row::pack_slice(&[
            Datum::String(&connection_id.to_string()),
            Datum::from(aws_config.endpoint.as_deref()),
            Datum::from(aws_config.region.as_deref()),
            Datum::from(access_key_id),
            Datum::from(access_key_id_secret_id.as_deref()),
            Datum::from(secret_access_key_secret_id.as_deref()),
            Datum::from(session_token),
            Datum::from(session_token_secret_id.as_deref()),
            Datum::from(assume_role_arn),
            Datum::from(assume_role_session_name),
            Datum::from(principal),
            Datum::from(external_id.as_deref()),
            Datum::from(example_trust_policy.as_ref().map(|p| p.into_element())),
        ]);

        Ok(BuiltinTableUpdate::row(id, row, diff))
    }

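    /// Packs a row for `mz_views` describing `view`, including its definition
    /// query and redacted `CREATE` SQL.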
    fn pack_view_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        view: &View,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let create_stmt = mz_sql::parse::parse(&view.create_sql)
            .unwrap_or_else(|e| {
                panic!(
                    "create_sql cannot be invalid: `{}` --- error: `{}`",
                    view.create_sql, e
                )
            })
            .into_element()
            .ast;
        let query = match &create_stmt {
            Statement::CreateView(stmt) => &stmt.definition.query,
            _ => unreachable!(),
        };

        let mut query_string = query.to_ast_string_stable();
        query_string.push(';');

        vec![BuiltinTableUpdate::row(
            &*MZ_VIEWS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&query_string),
                Datum::String(&owner_id.to_string()),
                privileges,
                Datum::String(&view.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        )]
    }

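    /// Packs rows for `mz_materialized_views` and
    /// `mz_materialized_view_refresh_strategies` describing `mview` and its
    /// refresh schedule (or `on-commit` if it has none).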
    fn pack_materialized_view_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        mview: &MaterializedView,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let create_stmt = mz_sql::parse::parse(&mview.create_sql)
            .unwrap_or_else(|e| {
                panic!(
                    "create_sql cannot be invalid: `{}` --- error: `{}`",
                    mview.create_sql, e
                )
            })
            .into_element()
            .ast;
        let query_string = match &create_stmt {
            Statement::CreateMaterializedView(stmt) => {
                let mut query_string = stmt.query.to_ast_string_stable();
                query_string.push(';');
                query_string
            }
            _ => unreachable!(),
        };

        let mut updates = Vec::new();

        updates.push(BuiltinTableUpdate::row(
            &*MZ_MATERIALIZED_VIEWS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&mview.cluster_id.to_string()),
                Datum::String(&query_string),
                Datum::String(&owner_id.to_string()),
                privileges,
                Datum::String(&mview.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        ));

        if let Some(refresh_schedule) = &mview.refresh_schedule {
            assert!(!refresh_schedule.is_empty());
            for RefreshEvery {
                interval,
                aligned_to,
            } in refresh_schedule.everies.iter()
            {
                let aligned_to_dt = mz_ore::now::to_datetime(
                    <&Timestamp as TryInto<u64>>::try_into(aligned_to).expect("undoes planning"),
                );
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String("every"),
                        Datum::Interval(
                            Interval::from_duration(interval).expect(
                                "planning ensured that this is convertible back to Interval",
                            ),
                        ),
                        Datum::TimestampTz(aligned_to_dt.try_into().expect("undoes planning")),
                        Datum::Null,
                    ]),
                    diff,
                ));
            }
            for at in refresh_schedule.ats.iter() {
                let at_dt = mz_ore::now::to_datetime(
                    <&Timestamp as TryInto<u64>>::try_into(at).expect("undoes planning"),
                );
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String("at"),
                        Datum::Null,
                        Datum::Null,
                        Datum::TimestampTz(at_dt.try_into().expect("undoes planning")),
                    ]),
                    diff,
                ));
            }
        } else {
            updates.push(BuiltinTableUpdate::row(
                &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String("on-commit"),
                    Datum::Null,
                    Datum::Null,
                    Datum::Null,
                ]),
                diff,
            ));
        }

        updates
    }

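    /// Packs a row for `mz_continual_tasks` describing `ct`, joining its
    /// INSERT/DELETE statements into a single definition string.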
    fn pack_continual_task_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        ct: &ContinualTask,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let create_stmt = mz_sql::parse::parse(&ct.create_sql)
            .unwrap_or_else(|e| {
                panic!(
                    "create_sql cannot be invalid: `{}` --- error: `{}`",
                    ct.create_sql, e
                )
            })
            .into_element()
            .ast;
        let query_string = match &create_stmt {
            Statement::CreateContinualTask(stmt) => {
                let mut query_string = String::new();
                for stmt in &stmt.stmts {
                    let s = match stmt {
                        ContinualTaskStmt::Insert(stmt) => stmt.to_ast_string_stable(),
                        ContinualTaskStmt::Delete(stmt) => stmt.to_ast_string_stable(),
                    };
                    if query_string.is_empty() {
                        query_string = s;
                    } else {
                        query_string.push_str("; ");
                        query_string.push_str(&s);
                    }
                }
                query_string
            }
            _ => unreachable!(),
        };

        vec![BuiltinTableUpdate::row(
            &*MZ_CONTINUAL_TASKS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&ct.cluster_id.to_string()),
                Datum::String(&query_string),
                Datum::String(&owner_id.to_string()),
                privileges,
                Datum::String(&ct.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        )]
    }

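    /// Packs rows for `mz_sinks` (and `mz_kafka_sinks` for Kafka sinks)
    /// describing `sink`.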
    fn pack_sink_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        sink: &Sink,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = vec![];
        match &sink.connection {
            StorageSinkConnection::Kafka(KafkaSinkConnection {
                topic: topic_name, ..
            }) => {
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_KAFKA_SINKS,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String(topic_name.as_str()),
                    ]),
                    diff,
                ));
            }
        };

        let create_stmt = mz_sql::parse::parse(&sink.create_sql)
            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", sink.create_sql))
            .into_element()
            .ast;

        let envelope = sink.envelope();

        let combined_format = sink.combined_format();
        let (key_format, value_format) = sink.formats();

        updates.push(BuiltinTableUpdate::row(
            &*MZ_SINKS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(sink.connection.name()),
                Datum::from(sink.connection_id().map(|id| id.to_string()).as_deref()),
                Datum::Null,
                Datum::from(envelope),
                Datum::from(combined_format.as_ref()),
                Datum::from(key_format),
                Datum::String(value_format),
                Datum::String(&sink.cluster_id.to_string()),
                Datum::String(&owner_id.to_string()),
                Datum::String(&sink.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        ));

        updates
    }

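    /// Packs rows for `mz_indexes` and `mz_index_columns` describing `index`
    /// and each of its key expressions.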
    fn pack_index_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        name: &str,
        owner_id: &RoleId,
        index: &Index,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = vec![];

        let create_stmt = mz_sql::parse::parse(&index.create_sql)
            .unwrap_or_else(|e| {
                panic!(
                    "create_sql cannot be invalid: `{}` --- error: `{}`",
                    index.create_sql, e
                )
            })
            .into_element()
            .ast;

        let key_sqls = match &create_stmt {
            Statement::CreateIndex(CreateIndexStatement { key_parts, .. }) => key_parts
                .as_ref()
                .expect("key_parts is filled in during planning"),
            _ => unreachable!(),
        };
        let on_item_id = self.get_entry_by_global_id(&index.on).id();

        updates.push(BuiltinTableUpdate::row(
            &*MZ_INDEXES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(name),
                Datum::String(&on_item_id.to_string()),
                Datum::String(&index.cluster_id.to_string()),
                Datum::String(&owner_id.to_string()),
                Datum::String(&index.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        ));

        for (i, key) in index.keys.iter().enumerate() {
            let on_entry = self.get_entry_by_global_id(&index.on);
            let nullable = key
                .typ(
                    &on_entry
                        .desc(&self.resolve_full_name(on_entry.name(), on_entry.conn_id()))
                        .expect("can only create indexes on items with a valid description")
                        .typ()
                        .column_types,
                )
                .nullable;
            let seq_in_index = u64::cast_from(i + 1);
            let key_sql = key_sqls
                .get(i)
                .expect("missing sql information for index key")
                .to_ast_string_simple();
            let (field_number, expression) = match key {
                MirScalarExpr::Column(col) => {
                    (Datum::UInt64(u64::cast_from(*col + 1)), Datum::Null)
                }
                _ => (Datum::Null, Datum::String(&key_sql)),
            };
            updates.push(BuiltinTableUpdate::row(
                &*MZ_INDEX_COLUMNS,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::UInt64(seq_in_index),
                    field_number,
                    expression,
                    Datum::from(nullable),
                ]),
                diff,
            ));
        }

        updates
    }

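    /// Packs rows for `mz_types`, the category-specific tables
    /// (`mz_array_types`, `mz_list_types`, `mz_map_types`, `mz_pseudo_types`,
    /// `mz_base_types`), and `mz_type_pg_metadata` describing `typ`.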
    fn pack_type_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        typ: &Type,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut out = vec![];

        let redacted = typ.create_sql.as_ref().map(|create_sql| {
            mz_sql::parse::parse(create_sql)
                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
                .into_element()
                .ast
                .to_ast_string_redacted()
        });

        out.push(BuiltinTableUpdate::row(
            &*MZ_TYPES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&TypeCategory::from_catalog_type(&typ.details.typ).to_string()),
                Datum::String(&owner_id.to_string()),
                privileges,
                if let Some(create_sql) = &typ.create_sql {
                    Datum::String(create_sql)
                } else {
                    Datum::Null
                },
                if let Some(redacted) = &redacted {
                    Datum::String(redacted)
                } else {
                    Datum::Null
                },
            ]),
            diff,
        ));

        let mut row = Row::default();
        let mut packer = row.packer();

        fn append_modifier(packer: &mut RowPacker<'_>, mods: &[i64]) {
            if mods.is_empty() {
                packer.push(Datum::Null);
            } else {
                packer.push_list(mods.iter().map(|m| Datum::Int64(*m)));
            }
        }

        let index_id = match &typ.details.typ {
            CatalogType::Array {
                element_reference: element_id,
            } => {
                packer.push(Datum::String(&id.to_string()));
                packer.push(Datum::String(&element_id.to_string()));
                &MZ_ARRAY_TYPES
            }
            CatalogType::List {
                element_reference: element_id,
                element_modifiers,
            } => {
                packer.push(Datum::String(&id.to_string()));
                packer.push(Datum::String(&element_id.to_string()));
                append_modifier(&mut packer, element_modifiers);
                &MZ_LIST_TYPES
            }
            CatalogType::Map {
                key_reference: key_id,
                value_reference: value_id,
                key_modifiers,
                value_modifiers,
            } => {
                packer.push(Datum::String(&id.to_string()));
                packer.push(Datum::String(&key_id.to_string()));
                packer.push(Datum::String(&value_id.to_string()));
                append_modifier(&mut packer, key_modifiers);
                append_modifier(&mut packer, value_modifiers);
                &MZ_MAP_TYPES
            }
            CatalogType::Pseudo => {
                packer.push(Datum::String(&id.to_string()));
                &MZ_PSEUDO_TYPES
            }
            _ => {
                packer.push(Datum::String(&id.to_string()));
                &MZ_BASE_TYPES
            }
        };
        out.push(BuiltinTableUpdate::row(index_id, row, diff));

        if let Some(pg_metadata) = &typ.details.pg_metadata {
            out.push(BuiltinTableUpdate::row(
                &*MZ_TYPE_PG_METADATA,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::UInt32(pg_metadata.typinput_oid),
                    Datum::UInt32(pg_metadata.typreceive_oid),
                ]),
                diff,
            ));
        }

        out
    }

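    /// Packs rows for `mz_functions` (and `mz_aggregates` for aggregate
    /// functions), one per implementation of `func`.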
    fn pack_func_update(
        &self,
        id: CatalogItemId,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        func: &Func,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = vec![];
        for func_impl_details in func.inner.func_impls() {
            let arg_type_ids = func_impl_details
                .arg_typs
                .iter()
                .map(|typ| self.get_system_type(typ).id().to_string())
                .collect::<Vec<_>>();

            let mut row = Row::default();
            row.packer()
                .try_push_array(
                    &[ArrayDimension {
                        lower_bound: 1,
                        length: arg_type_ids.len(),
                    }],
                    arg_type_ids.iter().map(|id| Datum::String(id)),
                )
                .expect(
                    "arg_type_ids is 1 dimensional, and its length is used for the array length",
                );
            let arg_type_ids = row.unpack_first();

            updates.push(BuiltinTableUpdate::row(
                &*MZ_FUNCTIONS,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::UInt32(func_impl_details.oid),
                    Datum::String(&schema_id.to_string()),
                    Datum::String(name),
                    arg_type_ids,
                    Datum::from(
                        func_impl_details
                            .variadic_typ
                            .map(|typ| self.get_system_type(typ).id().to_string())
                            .as_deref(),
                    ),
                    Datum::from(
                        func_impl_details
                            .return_typ
                            .map(|typ| self.get_system_type(typ).id().to_string())
                            .as_deref(),
                    ),
                    func_impl_details.return_is_set.into(),
                    Datum::String(&owner_id.to_string()),
                ]),
                diff,
            ));

            if let mz_sql::func::Func::Aggregate(_) = func.inner {
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_AGGREGATES,
                    Row::pack_slice(&[
                        Datum::UInt32(func_impl_details.oid),
                        Datum::String("n"),
                        Datum::Int16(0),
                    ]),
                    diff,
                ));
            }
        }
        updates
    }

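    /// Packs a row for `mz_operators` describing one implementation of
    /// `operator`.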
    pub fn pack_op_update(
        &self,
        operator: &str,
        func_impl_details: FuncImplCatalogDetails,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let arg_type_ids = func_impl_details
            .arg_typs
            .iter()
            .map(|typ| self.get_system_type(typ).id().to_string())
            .collect::<Vec<_>>();

        let mut row = Row::default();
        row.packer()
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: arg_type_ids.len(),
                }],
                arg_type_ids.iter().map(|id| Datum::String(id)),
            )
            .expect("arg_type_ids is 1 dimensional, and its length is used for the array length");
        let arg_type_ids = row.unpack_first();

        BuiltinTableUpdate::row(
            &*MZ_OPERATORS,
            Row::pack_slice(&[
                Datum::UInt32(func_impl_details.oid),
                Datum::String(operator),
                arg_type_ids,
                Datum::from(
                    func_impl_details
                        .return_typ
                        .map(|typ| self.get_system_type(typ).id().to_string())
                        .as_deref(),
                ),
            ]),
            diff,
        )
    }

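    /// Packs a row for `mz_secrets` describing the secret identified by `id`.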
    fn pack_secret_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        vec![BuiltinTableUpdate::row(
            &*MZ_SECRETS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&owner_id.to_string()),
                privileges,
            ]),
            diff,
        )]
    }

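    /// Packs a row for `mz_audit_events` from a versioned audit log `event`,
    /// serializing the event details as JSON.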
1935 pub fn pack_audit_log_update(
1936 &self,
1937 event: &VersionedEvent,
1938 diff: Diff,
1939 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1940 let (event_type, object_type, details, user, occurred_at): (
1941 &EventType,
1942 &ObjectType,
1943 &EventDetails,
1944 &Option<String>,
1945 u64,
1946 ) = match event {
1947 VersionedEvent::V1(ev) => (
1948 &ev.event_type,
1949 &ev.object_type,
1950 &ev.details,
1951 &ev.user,
1952 ev.occurred_at,
1953 ),
1954 };
1955 let details = Jsonb::from_serde_json(details.as_json())
1956 .map_err(|e| {
1957 Error::new(ErrorKind::Unstructured(format!(
1958 "could not pack audit log update: {}",
1959 e
1960 )))
1961 })?
1962 .into_row();
1963 let details = details
1964 .iter()
1965 .next()
1966 .expect("details created above with a single jsonb column");
1967 let dt = mz_ore::now::to_datetime(occurred_at);
1968 let id = event.sortable_id();
1969 Ok(BuiltinTableUpdate::row(
1970 &*MZ_AUDIT_EVENTS,
1971 Row::pack_slice(&[
1972 Datum::UInt64(id),
1973 Datum::String(&format!("{}", event_type)),
1974 Datum::String(&format!("{}", object_type)),
1975 details,
1976 match user {
1977 Some(user) => Datum::String(user),
1978 None => Datum::Null,
1979 },
1980 Datum::TimestampTz(dt.try_into().expect("must fit")),
1981 ]),
1982 diff,
1983 ))
1984 }
1985
1986 pub fn pack_storage_usage_update(
1987 &self,
1988 VersionedStorageUsage::V1(event): VersionedStorageUsage,
1989 diff: Diff,
1990 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1991 let id = &MZ_STORAGE_USAGE_BY_SHARD;
1992 let row = Row::pack_slice(&[
1993 Datum::UInt64(event.id),
1994 Datum::from(event.shard_id.as_deref()),
1995 Datum::UInt64(event.size_bytes),
1996 Datum::TimestampTz(
1997 mz_ore::now::to_datetime(event.collection_timestamp)
1998 .try_into()
1999 .expect("must fit"),
2000 ),
2001 ]);
2002 BuiltinTableUpdate::row(id, row, diff)
2003 }
2004
2005 pub fn pack_egress_ip_update(
2006 &self,
2007 ip: &IpNet,
2008 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
2009 let id = &MZ_EGRESS_IPS;
2010 let addr = ip.addr();
2011 let row = Row::pack_slice(&[
2012 Datum::String(&addr.to_string()),
2013 Datum::Int32(ip.prefix_len().into()),
2014 Datum::String(&format!("{}/{}", addr, ip.prefix_len())),
2015 ]);
2016 Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
2017 }
2018
    pub fn pack_replica_metric_updates(
        &self,
        replica_id: ReplicaId,
        updates: &[ServiceProcessMetrics],
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let id = &*MZ_CLUSTER_REPLICA_METRICS;
        let rows = updates.iter().enumerate().map(
            |(
                process_id,
                ServiceProcessMetrics {
                    cpu_nano_cores,
                    memory_bytes,
                    disk_usage_bytes,
                },
            )| {
                Row::pack_slice(&[
                    Datum::String(&replica_id.to_string()),
                    u64::cast_from(process_id).into(),
                    (*cpu_nano_cores).into(),
                    (*memory_bytes).into(),
                    (*disk_usage_bytes).into(),
                ])
            },
        );
        rows.map(|row| BuiltinTableUpdate::row(id, row, diff))
            .collect()
    }

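    /// Packs one update for the [`MZ_CLUSTER_REPLICA_SIZES`] table per
    /// configured replica size, skipping sizes whose allocations are
    /// disabled.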
    pub fn pack_all_replica_size_updates(&self) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let id = &*MZ_CLUSTER_REPLICA_SIZES;
        let updates = self
            .cluster_replica_sizes
            .0
            .iter()
            .filter(|(_, a)| !a.disabled)
            .map(
                |(
                    size,
                    ReplicaAllocation {
                        memory_limit,
                        cpu_limit,
                        disk_limit,
                        scale,
                        workers,
                        credits_per_hour,
                        cpu_exclusive: _,
                        is_cc: _,
                        disabled: _,
                        selectors: _,
                    },
                )| {
                    // Fall back to the maximum (or an arbitrary) limit when no
                    // explicit limit is configured for this size.
                    let cpu_limit = cpu_limit.unwrap_or(CpuLimit::MAX);
                    let MemoryLimit(ByteSize(memory_bytes)) =
                        (*memory_limit).unwrap_or(MemoryLimit::MAX);
                    let DiskLimit(ByteSize(disk_bytes)) =
                        (*disk_limit).unwrap_or(DiskLimit::ARBITRARY);
                    let row = Row::pack_slice(&[
                        size.as_str().into(),
                        u64::from(*scale).into(),
                        u64::cast_from(*workers).into(),
                        cpu_limit.as_nanocpus().into(),
                        memory_bytes.into(),
                        disk_bytes.into(),
                        (*credits_per_hour).into(),
                    ]);
                    BuiltinTableUpdate::row(id, row, Diff::ONE)
                },
            )
            .collect();
        updates
    }

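    /// Packs an active `SUBSCRIBE` into an update for the
    /// [`MZ_SUBSCRIPTIONS`] table, including the list of objects the
    /// subscription depends on.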
    pub fn pack_subscribe_update(
        &self,
        id: GlobalId,
        subscribe: &ActiveSubscribe,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let mut row = Row::default();
        let mut packer = row.packer();
        packer.push(Datum::String(&id.to_string()));
        packer.push(Datum::Uuid(subscribe.session_uuid));
        packer.push(Datum::String(&subscribe.cluster_id.to_string()));

        let start_dt = mz_ore::now::to_datetime(subscribe.start_time);
        packer.push(Datum::TimestampTz(start_dt.try_into().expect("must fit")));

        let depends_on: Vec<_> = subscribe
            .depends_on
            .iter()
            .map(|id| id.to_string())
            .collect();
        packer.push_list(depends_on.iter().map(|s| Datum::String(s)));

        BuiltinTableUpdate::row(&*MZ_SUBSCRIPTIONS, row, diff)
    }

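    /// Packs the metadata for an active connection into an update for the
    /// [`MZ_SESSIONS`] table.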
    pub fn pack_session_update(
        &self,
        conn: &ConnMeta,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let connect_dt = mz_ore::now::to_datetime(conn.connected_at());
        BuiltinTableUpdate::row(
            &*MZ_SESSIONS,
            Row::pack_slice(&[
                Datum::Uuid(conn.uuid()),
                Datum::UInt32(conn.conn_id().unhandled()),
                Datum::String(&conn.authenticated_role_id().to_string()),
                Datum::from(conn.client_ip().map(|ip| ip.to_string()).as_deref()),
                Datum::TimestampTz(connect_dt.try_into().expect("must fit")),
            ]),
            diff,
        )
    }

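    /// Packs a default privilege grant into an update for the
    /// [`MZ_DEFAULT_PRIVILEGES`] table.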
    pub fn pack_default_privileges_update(
        &self,
        default_privilege_object: &DefaultPrivilegeObject,
        grantee: &RoleId,
        acl_mode: &AclMode,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        BuiltinTableUpdate::row(
            &*MZ_DEFAULT_PRIVILEGES,
            Row::pack_slice(&[
                default_privilege_object.role_id.to_string().as_str().into(),
                default_privilege_object
                    .database_id
                    .map(|database_id| database_id.to_string())
                    .as_deref()
                    .into(),
                default_privilege_object
                    .schema_id
                    .map(|schema_id| schema_id.to_string())
                    .as_deref()
                    .into(),
                default_privilege_object
                    .object_type
                    .to_string()
                    .to_lowercase()
                    .as_str()
                    .into(),
                grantee.to_string().as_str().into(),
                acl_mode.to_string().as_str().into(),
            ]),
            diff,
        )
    }

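    /// Packs a system-wide privilege into an update for the
    /// [`MZ_SYSTEM_PRIVILEGES`] table.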
    pub fn pack_system_privileges_update(
        &self,
        privileges: MzAclItem,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        BuiltinTableUpdate::row(
            &*MZ_SYSTEM_PRIVILEGES,
            Row::pack_slice(&[privileges.into()]),
            diff,
        )
    }

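    /// Packs the privileges in a [`PrivilegeMap`] into a [`Row`] containing a
    /// single one-dimensional array of [`MzAclItem`]s.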
    fn pack_privilege_array_row(&self, privileges: &PrivilegeMap) -> Row {
        let mut row = Row::default();
        let flat_privileges: Vec<_> = privileges.all_values_owned().collect();
        row.packer()
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: flat_privileges.len(),
                }],
                flat_privileges
                    .into_iter()
                    .map(|mz_acl_item| Datum::MzAclItem(mz_acl_item.clone())),
            )
            .expect("privileges is 1 dimensional, and its length is used for the array length");
        row
    }

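    /// Packs a comment on a catalog object (and, for relations, optionally
    /// one of its columns) into an update for the [`MZ_COMMENTS`] table.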
    pub fn pack_comment_update(
        &self,
        object_id: CommentObjectId,
        column_pos: Option<usize>,
        comment: &str,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let object_type = mz_sql::catalog::ObjectType::from(object_id);
        let audit_type = super::object_type_to_audit_object_type(object_type);
        let object_type_str = audit_type.to_string();

        let object_id_str = match object_id {
            CommentObjectId::Table(global_id)
            | CommentObjectId::View(global_id)
            | CommentObjectId::MaterializedView(global_id)
            | CommentObjectId::Source(global_id)
            | CommentObjectId::Sink(global_id)
            | CommentObjectId::Index(global_id)
            | CommentObjectId::Func(global_id)
            | CommentObjectId::Connection(global_id)
            | CommentObjectId::Secret(global_id)
            | CommentObjectId::Type(global_id)
            | CommentObjectId::ContinualTask(global_id) => global_id.to_string(),
            CommentObjectId::Role(role_id) => role_id.to_string(),
            CommentObjectId::Database(database_id) => database_id.to_string(),
            CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
            CommentObjectId::Cluster(cluster_id) => cluster_id.to_string(),
            CommentObjectId::ClusterReplica((_, replica_id)) => replica_id.to_string(),
            CommentObjectId::NetworkPolicy(network_policy_id) => network_policy_id.to_string(),
        };
        let column_pos_datum = match column_pos {
            Some(pos) => {
                let pos =
                    i32::try_from(pos).expect("we constrain this value in the planning layer");
                Datum::Int32(pos)
            }
            None => Datum::Null,
        };

        BuiltinTableUpdate::row(
            &*MZ_COMMENTS,
            Row::pack_slice(&[
                Datum::String(&object_id_str),
                Datum::String(&object_type_str),
                column_pos_datum,
                Datum::String(comment),
            ]),
            diff,
        )
    }

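    /// Packs a webhook source into an update for the [`MZ_WEBHOOKS_SOURCES`]
    /// table.
    ///
    /// # Panics
    ///
    /// Panics if `item_id` does not identify a webhook source.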
    pub fn pack_webhook_source_update(
        &self,
        item_id: CatalogItemId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let url = self
            .try_get_webhook_url(&item_id)
            .expect("webhook source should exist");
        let url = url.to_string();
        let name = &self.get_entry(&item_id).name().item;
        let id_str = item_id.to_string();

        BuiltinTableUpdate::row(
            &*MZ_WEBHOOKS_SOURCES,
            Row::pack_slice(&[
                Datum::String(&id_str),
                Datum::String(name),
                Datum::String(&url),
            ]),
            diff,
        )
    }

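    /// Packs the upstream references known for a source into updates for the
    /// [`MZ_SOURCE_REFERENCES`] table, one row per reference.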
    pub fn pack_source_references_update(
        &self,
        source_references: &SourceReferences,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let source_id = source_references.source_id.to_string();
        let updated_at = &source_references.updated_at;
        source_references
            .references
            .iter()
            .map(|reference| {
                let mut row = Row::default();
                let mut packer = row.packer();
                packer.extend([
                    Datum::String(&source_id),
                    reference
                        .namespace
                        .as_ref()
                        .map(|s| Datum::String(s))
                        .unwrap_or(Datum::Null),
                    Datum::String(&reference.name),
                    Datum::TimestampTz(
                        mz_ore::now::to_datetime(*updated_at)
                            .try_into()
                            .expect("must fit"),
                    ),
                ]);
                if !reference.columns.is_empty() {
                    packer
                        .try_push_array(
                            &[ArrayDimension {
                                lower_bound: 1,
                                length: reference.columns.len(),
                            }],
                            reference.columns.iter().map(|col| Datum::String(col)),
                        )
                        .expect(
                            "columns is 1 dimensional, and its length is used for the array length",
                        );
                } else {
                    packer.push(Datum::Null);
                }

                BuiltinTableUpdate::row(&*MZ_SOURCE_REFERENCES, row, diff)
            })
            .collect()
    }
}