1mod notice;
11
12use bytesize::ByteSize;
13use ipnet::IpNet;
14use mz_adapter_types::compaction::CompactionWindow;
15use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
16use mz_catalog::SYSTEM_CONN_ID;
17use mz_catalog::builtin::{
18 BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
19 MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZES, MZ_CLUSTER_REPLICAS,
20 MZ_CLUSTER_SCHEDULES, MZ_CLUSTERS, MZ_COLUMNS, MZ_COMMENTS, MZ_CONNECTIONS, MZ_CONTINUAL_TASKS,
21 MZ_DEFAULT_PRIVILEGES, MZ_EGRESS_IPS, MZ_FUNCTIONS, MZ_HISTORY_RETENTION_STRATEGIES,
22 MZ_ICEBERG_SINKS, MZ_INDEX_COLUMNS, MZ_INDEXES, MZ_KAFKA_CONNECTIONS, MZ_KAFKA_SINKS,
23 MZ_KAFKA_SOURCE_TABLES, MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES, MZ_MAP_TYPES,
24 MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MATERIALIZED_VIEWS, MZ_MYSQL_SOURCE_TABLES,
25 MZ_OBJECT_DEPENDENCIES, MZ_OBJECT_GLOBAL_IDS, MZ_OPERATORS, MZ_POSTGRES_SOURCE_TABLES,
26 MZ_POSTGRES_SOURCES, MZ_PSEUDO_TYPES, MZ_REPLACEMENTS, MZ_ROLE_AUTH, MZ_ROLE_PARAMETERS,
27 MZ_ROLES, MZ_SECRETS, MZ_SESSIONS, MZ_SINKS, MZ_SOURCE_REFERENCES, MZ_SOURCES,
28 MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD,
29 MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES, MZ_VIEWS,
30 MZ_WEBHOOKS_SOURCES,
31};
32use mz_catalog::config::AwsPrincipalContext;
33use mz_catalog::durable::SourceReferences;
34use mz_catalog::memory::error::{Error, ErrorKind};
35use mz_catalog::memory::objects::{
36 CatalogEntry, CatalogItem, ClusterVariant, Connection, ContinualTask, DataSourceDesc, Func,
37 Index, MaterializedView, Sink, Table, TableDataSource, Type, View,
38};
39use mz_controller::clusters::{
40 ManagedReplicaAvailabilityZones, ManagedReplicaLocation, ReplicaLocation,
41};
42use mz_controller_types::ClusterId;
43use mz_expr::MirScalarExpr;
44use mz_license_keys::ValidatedLicenseKey;
45use mz_orchestrator::{CpuLimit, DiskLimit, MemoryLimit};
46use mz_ore::cast::CastFrom;
47use mz_ore::collections::CollectionExt;
48use mz_persist_client::batch::ProtoBatch;
49use mz_repr::adt::array::ArrayDimension;
50use mz_repr::adt::interval::Interval;
51use mz_repr::adt::jsonb::Jsonb;
52use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem, PrivilegeMap};
53use mz_repr::refresh_schedule::RefreshEvery;
54use mz_repr::role_id::RoleId;
55use mz_repr::{
56 CatalogItemId, Datum, Diff, GlobalId, ReprColumnType, Row, RowPacker, SqlScalarType, Timestamp,
57};
58use mz_sql::ast::{ContinualTaskStmt, CreateIndexStatement, Statement, UnresolvedItemName};
59use mz_sql::catalog::{CatalogCluster, CatalogType, DefaultPrivilegeObject, TypeCategory};
60use mz_sql::func::FuncImplCatalogDetails;
61use mz_sql::names::{CommentObjectId, SchemaSpecifier};
62use mz_sql::plan::{ClusterSchedule, ConnectionDetails, SshKey};
63use mz_sql::session::user::{MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
64use mz_sql::session::vars::SessionVars;
65use mz_sql_parser::ast::display::AstDisplay;
66use mz_storage_client::client::TableData;
67use mz_storage_types::connections::KafkaConnection;
68use mz_storage_types::connections::aws::{AwsAuth, AwsConnection};
69use mz_storage_types::connections::inline::ReferencedConnection;
70use mz_storage_types::connections::string_or_secret::StringOrSecret;
71use mz_storage_types::sinks::{IcebergSinkConnection, KafkaSinkConnection, StorageSinkConnection};
72use mz_storage_types::sources::{
73 GenericSourceConnection, KafkaSourceConnection, PostgresSourceConnection, SourceConnection,
74};
75use smallvec::smallvec;
76
77use crate::active_compute_sink::ActiveSubscribe;
79use crate::catalog::CatalogState;
80use crate::coord::ConnMeta;
81
/// A pending update to a builtin table.
///
/// The table identifier `T` defaults to [`CatalogItemId`]; updates are first
/// produced against `&'static BuiltinTable` and later resolved to item IDs
/// (see `CatalogState::resolve_builtin_table_update`).
#[derive(Debug, Clone)]
pub struct BuiltinTableUpdate<T = CatalogItemId> {
    // Identifier of the builtin table this update targets.
    pub id: T,
    // The payload: individual (row, diff) pairs or pre-built batches.
    pub data: TableData,
}
90
91impl<T> BuiltinTableUpdate<T> {
92 pub fn row(id: T, row: Row, diff: Diff) -> BuiltinTableUpdate<T> {
94 BuiltinTableUpdate {
95 id,
96 data: TableData::Rows(vec![(row, diff)]),
97 }
98 }
99
100 pub fn batch(id: T, batch: ProtoBatch) -> BuiltinTableUpdate<T> {
101 BuiltinTableUpdate {
102 id,
103 data: TableData::Batches(smallvec![batch]),
104 }
105 }
106}
107
108impl CatalogState {
109 pub fn resolve_builtin_table_updates(
110 &self,
111 builtin_table_update: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
112 ) -> Vec<BuiltinTableUpdate<CatalogItemId>> {
113 builtin_table_update
114 .into_iter()
115 .map(|builtin_table_update| self.resolve_builtin_table_update(builtin_table_update))
116 .collect()
117 }
118
119 pub fn resolve_builtin_table_update(
120 &self,
121 BuiltinTableUpdate { id, data }: BuiltinTableUpdate<&'static BuiltinTable>,
122 ) -> BuiltinTableUpdate<CatalogItemId> {
123 let id = self.resolve_builtin_table(id);
124 BuiltinTableUpdate { id, data }
125 }
126
127 pub fn pack_depends_update(
128 &self,
129 depender: CatalogItemId,
130 dependee: CatalogItemId,
131 diff: Diff,
132 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
133 let row = Row::pack_slice(&[
134 Datum::String(&depender.to_string()),
135 Datum::String(&dependee.to_string()),
136 ]);
137 BuiltinTableUpdate::row(&*MZ_OBJECT_DEPENDENCIES, row, diff)
138 }
139
    /// Packs a `mz_role_auth` row for role `id`: role ID, OID, password hash
    /// (if any), and the time the auth record was last updated.
    ///
    /// # Panics
    ///
    /// Panics if `role_auth.updated_at` cannot be represented as a
    /// `TimestampTz` datum (the `expect("must fit")` below).
    pub(super) fn pack_role_auth_update(
        &self,
        id: RoleId,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        let role_auth = self.get_role_auth(&id);
        let role = self.get_role(&id);
        BuiltinTableUpdate::row(
            &*MZ_ROLE_AUTH,
            Row::pack_slice(&[
                Datum::String(&role_auth.role_id.to_string()),
                Datum::UInt32(role.oid),
                // NULL when the role has no password configured.
                match &role_auth.password_hash {
                    Some(hash) => Datum::String(hash),
                    None => Datum::Null,
                },
                Datum::TimestampTz(
                    mz_ore::now::to_datetime(role_auth.updated_at)
                        .try_into()
                        .expect("must fit"),
                ),
            ]),
            diff,
        )
    }
165
    /// Packs the `mz_roles` row for role `id`, plus one `mz_role_parameters`
    /// row per valid role-default session variable.
    ///
    /// The `PUBLIC` pseudo-role produces no updates. Invalid role defaults are
    /// logged and skipped rather than surfaced.
    pub(super) fn pack_role_update(
        &self,
        id: RoleId,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        match id {
            RoleId::Public => vec![],
            id => {
                let role = self.get_role(&id);
                // Builtin roles that are implicitly superusers and can log in
                // even when the corresponding attributes are unset.
                let builtin_supers = [MZ_SYSTEM_ROLE_ID, MZ_SUPPORT_ROLE_ID];

                // If `login` is unset, builtin superuser roles default to
                // being able to log in; all others default to false.
                let rolcanlogin = if let Some(login) = role.attributes.login {
                    login
                } else {
                    builtin_supers.contains(&role.id)
                };

                // If `superuser` is unset: builtin superusers report true,
                // everyone else reports NULL (unknown) rather than false.
                let rolsuper = if let Some(superuser) = role.attributes.superuser {
                    Datum::from(superuser)
                } else if builtin_supers.contains(&role.id) {
                    Datum::from(true)
                } else {
                    Datum::Null
                };

                let role_update = BuiltinTableUpdate::row(
                    &*MZ_ROLES,
                    Row::pack_slice(&[
                        Datum::String(&role.id.to_string()),
                        Datum::UInt32(role.oid),
                        Datum::String(&role.name),
                        Datum::from(role.attributes.inherit),
                        Datum::from(rolcanlogin),
                        rolsuper,
                    ]),
                    diff,
                );
                let mut updates = vec![role_update];

                // A scratch SessionVars used only to look up and validate the
                // role's default variable values below.
                let session_vars_reference = SessionVars::new_unchecked(
                    &mz_build_info::DUMMY_BUILD_INFO,
                    SYSTEM_USER.clone(),
                    None,
                );

                for (name, val) in role.vars() {
                    let result = session_vars_reference
                        .inspect(name)
                        .and_then(|var| var.check(val.borrow()));
                    // Skip (but log) defaults that no longer validate, e.g.
                    // after a variable was removed or its domain changed.
                    let Ok(formatted_val) = result else {
                        tracing::error!(?name, ?val, ?result, "found invalid role default var");
                        continue;
                    };

                    let role_var_update = BuiltinTableUpdate::row(
                        &*MZ_ROLE_PARAMETERS,
                        Row::pack_slice(&[
                            Datum::String(&role.id.to_string()),
                            Datum::String(name),
                            Datum::String(&formatted_val),
                        ]),
                        diff,
                    );
                    updates.push(role_var_update);
                }

                updates
            }
        }
    }
250
    /// Packs the `mz_clusters` row for the named cluster, plus a
    /// `mz_cluster_schedules` row when the cluster is managed.
    ///
    /// # Panics
    ///
    /// Panics if `name` is not a known cluster, or if a managed cluster's
    /// logging/hydration durations cannot be converted back to `Interval`.
    pub(super) fn pack_cluster_update(
        &self,
        name: &str,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let id = self.clusters_by_name[name];
        let cluster = &self.clusters_by_id[&id];
        let row = self.pack_privilege_array_row(cluster.privileges());
        let privileges = row.unpack_first();
        // Managed clusters report their configuration; unmanaged clusters
        // report NULL for all of these columns.
        let (size, disk, replication_factor, azs, introspection_debugging, introspection_interval) =
            match &cluster.config.variant {
                ClusterVariant::Managed(config) => (
                    Some(config.size.as_str()),
                    Some(self.cluster_replica_size_has_disk(&config.size)),
                    Some(config.replication_factor),
                    if config.availability_zones.is_empty() {
                        None
                    } else {
                        Some(config.availability_zones.clone())
                    },
                    Some(config.logging.log_logging),
                    config.logging.interval.map(|d| {
                        Interval::from_duration(&d)
                            .expect("planning ensured this convertible back to interval")
                    }),
                ),
                ClusterVariant::Unmanaged => (None, None, None, None, None, None),
            };

        // Column order here must match the mz_clusters table definition.
        let mut row = Row::default();
        let mut packer = row.packer();
        packer.extend([
            Datum::String(&id.to_string()),
            Datum::String(name),
            Datum::String(&cluster.owner_id.to_string()),
            privileges,
            cluster.is_managed().into(),
            size.into(),
            replication_factor.into(),
            disk.into(),
        ]);
        // Availability zones are a list column, so they are pushed via the
        // packer rather than `extend`.
        if let Some(azs) = azs {
            packer.push_list(azs.iter().map(|az| Datum::String(az)));
        } else {
            packer.push(Datum::Null);
        }
        packer.push(Datum::from(introspection_debugging));
        packer.push(Datum::from(introspection_interval));

        let mut updates = Vec::new();

        updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTERS, row, diff));

        // Managed clusters additionally report their schedule.
        if let ClusterVariant::Managed(managed_config) = &cluster.config.variant {
            let row = match managed_config.schedule {
                ClusterSchedule::Manual => Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String("manual"),
                    Datum::Null,
                ]),
                ClusterSchedule::Refresh {
                    hydration_time_estimate,
                } => Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String("on-refresh"),
                    Datum::Interval(
                        Interval::from_duration(&hydration_time_estimate)
                            .expect("planning ensured that this is convertible back to Interval"),
                    ),
                ]),
            };
            updates.push(BuiltinTableUpdate::row(&*MZ_CLUSTER_SCHEDULES, row, diff));
        }

        updates
    }
327
328 pub(super) fn pack_cluster_replica_update(
329 &self,
330 cluster_id: ClusterId,
331 name: &str,
332 diff: Diff,
333 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
334 let cluster = &self.clusters_by_id[&cluster_id];
335 let id = cluster.replica_id(name).expect("Must exist");
336 let replica = cluster.replica(id).expect("Must exist");
337
338 let (size, disk, az) = match &replica.config.location {
339 ReplicaLocation::Managed(ManagedReplicaLocation {
342 size,
343 availability_zones: ManagedReplicaAvailabilityZones::FromReplica(Some(az)),
344 allocation: _,
345 billed_as: _,
346 internal: _,
347 pending: _,
348 }) => (
349 Some(&**size),
350 Some(self.cluster_replica_size_has_disk(size)),
351 Some(az.as_str()),
352 ),
353 ReplicaLocation::Managed(ManagedReplicaLocation {
354 size,
355 availability_zones: _,
356 allocation: _,
357 billed_as: _,
358 internal: _,
359 pending: _,
360 }) => (
361 Some(&**size),
362 Some(self.cluster_replica_size_has_disk(size)),
363 None,
364 ),
365 ReplicaLocation::Unmanaged(_) => (None, None, None),
366 };
367
368 let cluster_replica_update = BuiltinTableUpdate::row(
369 &*MZ_CLUSTER_REPLICAS,
370 Row::pack_slice(&[
371 Datum::String(&id.to_string()),
372 Datum::String(name),
373 Datum::String(&cluster_id.to_string()),
374 Datum::from(size),
375 Datum::from(az),
376 Datum::String(&replica.owner_id.to_string()),
377 Datum::from(disk),
378 ]),
379 diff,
380 );
381
382 vec![cluster_replica_update]
383 }
384
    /// Packs all builtin-table updates implied by the catalog item `id`:
    /// the per-kind row (`mz_tables`, `mz_sources`, `mz_views`, …), any
    /// kind-specific auxiliary rows (source tables, webhook sources, …),
    /// dependency rows, `mz_columns` rows, history-retention rows, and
    /// `mz_object_global_ids` rows.
    ///
    /// # Panics
    ///
    /// Panics if `id` is not a known catalog item, or if an ingestion
    /// export's primary source is missing or is not a source.
    pub(super) fn pack_item_update(
        &self,
        id: CatalogItemId,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let entry = self.get_entry(&id);
        let oid = entry.oid();
        // Temporary items resolve their schema under their owning connection;
        // everything else uses the system connection.
        let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
        let schema_id = &self
            .get_schema(
                &entry.name().qualifiers.database_spec,
                &entry.name().qualifiers.schema_spec,
                conn_id,
            )
            .id;
        let name = &entry.name().item;
        let owner_id = entry.owner_id();
        let privileges_row = self.pack_privilege_array_row(entry.privileges());
        let privileges = privileges_row.unpack_first();
        // Dispatch on the item kind to produce the primary row(s).
        let mut updates = match entry.item() {
            CatalogItem::Log(_) => self.pack_source_update(
                id, oid, schema_id, name, "log", None, None, None, None, None, owner_id,
                privileges, diff, None,
            ),
            CatalogItem::Index(index) => {
                self.pack_index_update(id, oid, name, owner_id, index, diff)
            }
            CatalogItem::Table(table) => {
                let mut updates = self
                    .pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);

                // Source-fed tables additionally report which upstream
                // relation they export, keyed by the connection kind.
                if let TableDataSource::DataSource {
                    desc: data_source,
                    timeline: _,
                } = &table.data_source
                {
                    updates.extend(match data_source {
                        DataSourceDesc::IngestionExport {
                            ingestion_id,
                            external_reference: UnresolvedItemName(external_reference),
                            details: _,
                            data_config: _,
                        } => {
                            let ingestion_entry = self
                                .get_entry(ingestion_id)
                                .source_desc()
                                .expect("primary source exists")
                                .expect("primary source is a source");

                            // The shape of `external_reference` depends on the
                            // upstream system (see the length asserts below).
                            match ingestion_entry.connection.name() {
                                "postgres" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                    let schema_name = external_reference[1].as_str();
                                    let table_name = external_reference[2].as_str();

                                    self.pack_postgres_source_tables_update(
                                        id,
                                        schema_name,
                                        table_name,
                                        diff,
                                    )
                                }
                                "mysql" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
                                    let schema_name = external_reference[0].as_str();
                                    let table_name = external_reference[1].as_str();

                                    self.pack_mysql_source_tables_update(
                                        id,
                                        schema_name,
                                        table_name,
                                        diff,
                                    )
                                }
                                "sql-server" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                    let schema_name = external_reference[1].as_str();
                                    let table_name = external_reference[2].as_str();

                                    self.pack_sql_server_source_table_update(
                                        id,
                                        schema_name,
                                        table_name,
                                        diff,
                                    )
                                }
                                // Load generators have no auxiliary table rows.
                                "load-generator" => vec![],
                                "kafka" => {
                                    mz_ore::soft_assert_eq_no_log!(external_reference.len(), 1);
                                    let topic = external_reference[0].as_str();
                                    let envelope = data_source.envelope();
                                    let (key_format, value_format) = data_source.formats();

                                    self.pack_kafka_source_tables_update(
                                        id,
                                        topic,
                                        envelope,
                                        key_format,
                                        value_format,
                                        diff,
                                    )
                                }
                                s => unreachable!("{s} sources do not have tables"),
                            }
                        }
                        DataSourceDesc::Ingestion { .. }
                        | DataSourceDesc::OldSyntaxIngestion { .. }
                        | DataSourceDesc::Introspection(_)
                        | DataSourceDesc::Progress
                        | DataSourceDesc::Webhook { .. }
                        | DataSourceDesc::Catalog => vec![],
                    });
                }

                updates
            }
            CatalogItem::Source(source) => {
                let source_type = source.source_type();
                let connection_id = source.connection_id();
                let envelope = source.data_source.envelope();
                // Ingestion exports report the cluster of their primary
                // source; all other sources report their own cluster.
                let cluster_entry = match source.data_source {
                    DataSourceDesc::IngestionExport { ingestion_id, .. } => {
                        self.get_entry(&ingestion_id)
                    }
                    DataSourceDesc::Ingestion { .. }
                    | DataSourceDesc::OldSyntaxIngestion { .. }
                    | DataSourceDesc::Introspection(_)
                    | DataSourceDesc::Progress
                    | DataSourceDesc::Webhook { .. }
                    | DataSourceDesc::Catalog => entry,
                };

                let cluster_id = cluster_entry.item().cluster_id().map(|id| id.to_string());

                let (key_format, value_format) = source.data_source.formats();

                let mut updates = self.pack_source_update(
                    id,
                    oid,
                    schema_id,
                    name,
                    source_type,
                    connection_id,
                    envelope,
                    key_format,
                    value_format,
                    cluster_id.as_deref(),
                    owner_id,
                    privileges,
                    diff,
                    source.create_sql.as_ref(),
                );

                // Kind-specific auxiliary rows, mirroring the table case above.
                updates.extend(match &source.data_source {
                    DataSourceDesc::Ingestion { desc, .. }
                    | DataSourceDesc::OldSyntaxIngestion { desc, .. } => match &desc.connection {
                        GenericSourceConnection::Postgres(postgres) => {
                            self.pack_postgres_source_update(id, postgres, diff)
                        }
                        GenericSourceConnection::Kafka(kafka) => {
                            self.pack_kafka_source_update(id, source.global_id(), kafka, diff)
                        }
                        _ => vec![],
                    },
                    DataSourceDesc::IngestionExport {
                        ingestion_id,
                        external_reference: UnresolvedItemName(external_reference),
                        details: _,
                        data_config: _,
                    } => {
                        let ingestion_entry = self
                            .get_entry(ingestion_id)
                            .source_desc()
                            .expect("primary source exists")
                            .expect("primary source is a source");

                        match ingestion_entry.connection.name() {
                            "postgres" => {
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                let schema_name = external_reference[1].as_str();
                                let table_name = external_reference[2].as_str();

                                self.pack_postgres_source_tables_update(
                                    id,
                                    schema_name,
                                    table_name,
                                    diff,
                                )
                            }
                            "mysql" => {
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
                                let schema_name = external_reference[0].as_str();
                                let table_name = external_reference[1].as_str();

                                self.pack_mysql_source_tables_update(
                                    id,
                                    schema_name,
                                    table_name,
                                    diff,
                                )
                            }
                            "sql-server" => {
                                mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
                                let schema_name = external_reference[1].as_str();
                                let table_name = external_reference[2].as_str();

                                self.pack_sql_server_source_table_update(
                                    id,
                                    schema_name,
                                    table_name,
                                    diff,
                                )
                            }
                            "load-generator" => vec![],
                            s => unreachable!("{s} sources do not have subsources"),
                        }
                    }
                    DataSourceDesc::Webhook { .. } => {
                        vec![self.pack_webhook_source_update(id, diff)]
                    }
                    DataSourceDesc::Introspection(_)
                    | DataSourceDesc::Progress
                    | DataSourceDesc::Catalog => vec![],
                });

                updates
            }
            CatalogItem::View(view) => {
                self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
            }
            CatalogItem::MaterializedView(mview) => self.pack_materialized_view_update(
                id, oid, schema_id, name, owner_id, privileges, mview, diff,
            ),
            CatalogItem::Sink(sink) => {
                self.pack_sink_update(id, oid, schema_id, name, owner_id, sink, diff)
            }
            CatalogItem::Type(ty) => {
                self.pack_type_update(id, oid, schema_id, name, owner_id, privileges, ty, diff)
            }
            CatalogItem::Func(func) => {
                self.pack_func_update(id, schema_id, name, owner_id, func, diff)
            }
            CatalogItem::Secret(_) => {
                self.pack_secret_update(id, oid, schema_id, name, owner_id, privileges, diff)
            }
            CatalogItem::Connection(connection) => self.pack_connection_update(
                id, oid, schema_id, name, owner_id, privileges, connection, diff,
            ),
            CatalogItem::ContinualTask(ct) => self.pack_continual_task_update(
                id, oid, schema_id, name, owner_id, privileges, ct, diff,
            ),
        };

        // Dependency rows are only recorded for non-temporary items.
        if !entry.item().is_temporary() {
            for dependee in entry.item().references().items() {
                updates.push(self.pack_depends_update(id, *dependee, diff))
            }
        }

        // One mz_columns row per column, for items with a relation desc.
        if let Some(desc) = entry.relation_desc_latest() {
            // Column defaults only exist for write-path tables.
            let defaults = match entry.item() {
                CatalogItem::Table(Table {
                    data_source: TableDataSource::TableWrites { defaults },
                    ..
                }) => Some(defaults),
                _ => None,
            };
            for (i, (column_name, column_type)) in desc.iter().enumerate() {
                let default: Option<String> = defaults.map(|d| d[i].to_ast_string_stable());
                let default: Datum = default
                    .as_ref()
                    .map(|d| Datum::String(d))
                    .unwrap_or(Datum::Null);
                let pgtype = mz_pgrepr::Type::from(&column_type.scalar_type);
                // Custom list/map/record types report the catalog entry's
                // name and OID instead of the generic pg type's.
                let (type_name, type_oid) = match &column_type.scalar_type {
                    SqlScalarType::List {
                        custom_id: Some(custom_id),
                        ..
                    }
                    | SqlScalarType::Map {
                        custom_id: Some(custom_id),
                        ..
                    }
                    | SqlScalarType::Record {
                        custom_id: Some(custom_id),
                        ..
                    } => {
                        let entry = self.get_entry(custom_id);
                        let name = &*entry.name().item;
                        let oid = entry.oid();
                        (name, oid)
                    }
                    _ => (pgtype.name(), pgtype.oid()),
                };
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_COLUMNS,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String(column_name),
                        // Column positions are 1-based.
                        Datum::UInt64(u64::cast_from(i + 1)),
                        Datum::from(column_type.nullable),
                        Datum::String(type_name),
                        default,
                        Datum::UInt32(type_oid),
                        Datum::Int32(pgtype.typmod()),
                    ]),
                    diff,
                ));
            }
        }

        if let Some(cw) = entry.item().initial_logical_compaction_window() {
            updates.push(self.pack_history_retention_strategy_update(id, cw, diff));
        }

        updates.extend(Self::pack_item_global_id_update(entry, diff));

        updates
    }
746
747 fn pack_item_global_id_update(
748 entry: &CatalogEntry,
749 diff: Diff,
750 ) -> impl Iterator<Item = BuiltinTableUpdate<&'static BuiltinTable>> + use<'_> {
751 let id = entry.id().to_string();
752 let global_ids = entry.global_ids();
753 global_ids.map(move |global_id| {
754 BuiltinTableUpdate::row(
755 &*MZ_OBJECT_GLOBAL_IDS,
756 Row::pack_slice(&[Datum::String(&id), Datum::String(&global_id.to_string())]),
757 diff,
758 )
759 })
760 }
761
762 fn pack_history_retention_strategy_update(
763 &self,
764 id: CatalogItemId,
765 cw: CompactionWindow,
766 diff: Diff,
767 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
768 let cw: u64 = cw.comparable_timestamp().into();
769 let cw = Jsonb::from_serde_json(serde_json::Value::Number(serde_json::Number::from(cw)))
770 .expect("must serialize");
771 BuiltinTableUpdate::row(
772 &*MZ_HISTORY_RETENTION_STRATEGIES,
773 Row::pack_slice(&[
774 Datum::String(&id.to_string()),
775 Datum::String("FOR"),
777 cw.into_row().into_element(),
778 ]),
779 diff,
780 )
781 }
782
    /// Packs the `mz_tables` row for table `id`, including the original and
    /// redacted `CREATE` SQL (when present) and, for source-fed tables, the
    /// ID of the ingestion it exports from.
    ///
    /// # Panics
    ///
    /// Panics if the stored `create_sql` fails to re-parse, which would
    /// indicate catalog corruption.
    fn pack_table_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        diff: Diff,
        table: &Table,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        // Re-parse the stored SQL to produce a redacted rendering.
        let redacted = table.create_sql.as_ref().map(|create_sql| {
            mz_sql::parse::parse(create_sql)
                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
                .into_element()
                .ast
                .to_ast_string_redacted()
        });
        // Only tables backed by an ingestion export have a source ID.
        let source_id = if let TableDataSource::DataSource {
            desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
            ..
        } = &table.data_source
        {
            Some(ingestion_id.to_string())
        } else {
            None
        };

        vec![BuiltinTableUpdate::row(
            &*MZ_TABLES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&owner_id.to_string()),
                privileges,
                if let Some(create_sql) = &table.create_sql {
                    Datum::String(create_sql)
                } else {
                    Datum::Null
                },
                if let Some(redacted) = &redacted {
                    Datum::String(redacted)
                } else {
                    Datum::Null
                },
                if let Some(source_id) = source_id.as_ref() {
                    Datum::String(source_id)
                } else {
                    Datum::Null
                },
            ]),
            diff,
        )]
    }
839
    /// Packs the `mz_sources` row for source `id`, including original and
    /// redacted `CREATE` SQL when present.
    ///
    /// # Panics
    ///
    /// Panics if the stored `create_sql` fails to re-parse, which would
    /// indicate catalog corruption.
    fn pack_source_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        source_desc_name: &str,
        connection_id: Option<CatalogItemId>,
        envelope: Option<&str>,
        key_format: Option<&str>,
        value_format: Option<&str>,
        cluster_id: Option<&str>,
        owner_id: &RoleId,
        privileges: Datum,
        diff: Diff,
        create_sql: Option<&String>,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        // Re-parse the stored SQL to produce a redacted rendering.
        let redacted = create_sql.map(|create_sql| {
            let create_stmt = mz_sql::parse::parse(create_sql)
                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
                .into_element()
                .ast;
            create_stmt.to_ast_string_redacted()
        });
        vec![BuiltinTableUpdate::row(
            &*MZ_SOURCES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(source_desc_name),
                Datum::from(connection_id.map(|id| id.to_string()).as_deref()),
                // NOTE(review): this column is unconditionally NULL here —
                // presumably a deprecated/reserved column; confirm against
                // the MZ_SOURCES table definition.
                Datum::Null,
                Datum::from(envelope),
                Datum::from(key_format),
                Datum::from(value_format),
                Datum::from(cluster_id),
                Datum::String(&owner_id.to_string()),
                privileges,
                if let Some(create_sql) = create_sql {
                    Datum::String(create_sql)
                } else {
                    Datum::Null
                },
                if let Some(redacted) = &redacted {
                    Datum::String(redacted)
                } else {
                    Datum::Null
                },
            ]),
            diff,
        )]
    }
896
897 fn pack_postgres_source_update(
898 &self,
899 id: CatalogItemId,
900 postgres: &PostgresSourceConnection<ReferencedConnection>,
901 diff: Diff,
902 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
903 vec![BuiltinTableUpdate::row(
904 &*MZ_POSTGRES_SOURCES,
905 Row::pack_slice(&[
906 Datum::String(&id.to_string()),
907 Datum::String(&postgres.publication_details.slot),
908 Datum::from(postgres.publication_details.timeline_id),
909 ]),
910 diff,
911 )]
912 }
913
914 fn pack_kafka_source_update(
915 &self,
916 item_id: CatalogItemId,
917 collection_id: GlobalId,
918 kafka: &KafkaSourceConnection<ReferencedConnection>,
919 diff: Diff,
920 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
921 vec![BuiltinTableUpdate::row(
922 &*MZ_KAFKA_SOURCES,
923 Row::pack_slice(&[
924 Datum::String(&item_id.to_string()),
925 Datum::String(&kafka.group_id(&self.config.connection_context, collection_id)),
926 Datum::String(&kafka.topic),
927 ]),
928 diff,
929 )]
930 }
931
932 fn pack_postgres_source_tables_update(
933 &self,
934 id: CatalogItemId,
935 schema_name: &str,
936 table_name: &str,
937 diff: Diff,
938 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
939 vec![BuiltinTableUpdate::row(
940 &*MZ_POSTGRES_SOURCE_TABLES,
941 Row::pack_slice(&[
942 Datum::String(&id.to_string()),
943 Datum::String(schema_name),
944 Datum::String(table_name),
945 ]),
946 diff,
947 )]
948 }
949
950 fn pack_mysql_source_tables_update(
951 &self,
952 id: CatalogItemId,
953 schema_name: &str,
954 table_name: &str,
955 diff: Diff,
956 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
957 vec![BuiltinTableUpdate::row(
958 &*MZ_MYSQL_SOURCE_TABLES,
959 Row::pack_slice(&[
960 Datum::String(&id.to_string()),
961 Datum::String(schema_name),
962 Datum::String(table_name),
963 ]),
964 diff,
965 )]
966 }
967
968 fn pack_sql_server_source_table_update(
969 &self,
970 id: CatalogItemId,
971 schema_name: &str,
972 table_name: &str,
973 diff: Diff,
974 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
975 vec![BuiltinTableUpdate::row(
976 &*MZ_SQL_SERVER_SOURCE_TABLES,
977 Row::pack_slice(&[
978 Datum::String(&id.to_string()),
979 Datum::String(schema_name),
980 Datum::String(table_name),
981 ]),
982 diff,
983 )]
984 }
985
986 fn pack_kafka_source_tables_update(
987 &self,
988 id: CatalogItemId,
989 topic: &str,
990 envelope: Option<&str>,
991 key_format: Option<&str>,
992 value_format: Option<&str>,
993 diff: Diff,
994 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
995 vec![BuiltinTableUpdate::row(
996 &*MZ_KAFKA_SOURCE_TABLES,
997 Row::pack_slice(&[
998 Datum::String(&id.to_string()),
999 Datum::String(topic),
1000 Datum::from(envelope),
1001 Datum::from(key_format),
1002 Datum::from(value_format),
1003 ]),
1004 diff,
1005 )]
1006 }
1007
    /// Packs the `mz_connections` row for connection `id`, plus any
    /// kind-specific auxiliary rows (Kafka brokers, AWS, AWS PrivateLink,
    /// SSH tunnel keys).
    ///
    /// Failures to produce auxiliary AWS/PrivateLink rows are logged, not
    /// propagated, so the primary row is always emitted.
    ///
    /// # Panics
    ///
    /// Panics if the stored `create_sql` fails to re-parse, which would
    /// indicate catalog corruption.
    fn pack_connection_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        connection: &Connection,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        // Re-parse the stored SQL to produce a redacted rendering below.
        let create_stmt = mz_sql::parse::parse(&connection.create_sql)
            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", connection.create_sql))
            .into_element()
            .ast;
        let mut updates = vec![BuiltinTableUpdate::row(
            &*MZ_CONNECTIONS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                // The user-visible connection type name.
                Datum::String(match connection.details {
                    ConnectionDetails::Kafka { .. } => "kafka",
                    ConnectionDetails::Csr { .. } => "confluent-schema-registry",
                    ConnectionDetails::Postgres { .. } => "postgres",
                    ConnectionDetails::Aws(..) => "aws",
                    ConnectionDetails::AwsPrivatelink(..) => "aws-privatelink",
                    ConnectionDetails::Ssh { .. } => "ssh-tunnel",
                    ConnectionDetails::MySql { .. } => "mysql",
                    ConnectionDetails::SqlServer(_) => "sql-server",
                    ConnectionDetails::IcebergCatalog(_) => "iceberg-catalog",
                }),
                Datum::String(&owner_id.to_string()),
                privileges,
                Datum::String(&connection.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        )];
        // Kind-specific auxiliary rows.
        match connection.details {
            ConnectionDetails::Kafka(ref kafka) => {
                updates.extend(self.pack_kafka_connection_update(id, kafka, diff));
            }
            ConnectionDetails::Aws(ref aws_config) => {
                match self.pack_aws_connection_update(id, aws_config, diff) {
                    Ok(update) => {
                        updates.push(update);
                    }
                    Err(e) => {
                        // Best effort: keep the primary row even if the AWS
                        // auxiliary row cannot be built.
                        tracing::error!(%id, %e, "failed writing row to mz_aws_connections table");
                    }
                }
            }
            ConnectionDetails::AwsPrivatelink(_) => {
                if let Some(aws_principal_context) = self.aws_principal_context.as_ref() {
                    updates.push(self.pack_aws_privatelink_connection_update(
                        id,
                        aws_principal_context,
                        diff,
                    ));
                } else {
                    tracing::error!(%id, "missing AWS principal context; cannot write row to mz_aws_privatelink_connections table");
                }
            }
            ConnectionDetails::Ssh {
                ref key_1,
                ref key_2,
                ..
            } => {
                updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff));
            }
            // These kinds have no auxiliary builtin-table rows.
            ConnectionDetails::Csr(_)
            | ConnectionDetails::Postgres(_)
            | ConnectionDetails::MySql(_)
            | ConnectionDetails::SqlServer(_)
            | ConnectionDetails::IcebergCatalog(_) => (),
        };
        updates
    }
1088
1089 pub(crate) fn pack_ssh_tunnel_connection_update(
1090 &self,
1091 id: CatalogItemId,
1092 key_1: &SshKey,
1093 key_2: &SshKey,
1094 diff: Diff,
1095 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1096 BuiltinTableUpdate::row(
1097 &*MZ_SSH_TUNNEL_CONNECTIONS,
1098 Row::pack_slice(&[
1099 Datum::String(&id.to_string()),
1100 Datum::String(key_1.public_key().as_str()),
1101 Datum::String(key_2.public_key().as_str()),
1102 ]),
1103 diff,
1104 )
1105 }
1106
    /// Packs a `mz_kafka_connections` row for connection `id`: the broker
    /// addresses as a one-dimensional array and the progress topic name.
    fn pack_kafka_connection_update(
        &self,
        id: CatalogItemId,
        kafka: &KafkaConnection<ReferencedConnection>,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let progress_topic = kafka.progress_topic(&self.config.connection_context, id);
        // Scratch row used only to build the broker-address array datum.
        let mut row = Row::default();
        row.packer()
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: kafka.brokers.len(),
                }],
                kafka
                    .brokers
                    .iter()
                    .map(|broker| Datum::String(&broker.address)),
            )
            .expect("kafka.brokers is 1 dimensional, and its length is used for the array length");
        let brokers = row.unpack_first();
        vec![BuiltinTableUpdate::row(
            &*MZ_KAFKA_CONNECTIONS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                brokers,
                Datum::String(&progress_topic),
            ]),
            diff,
        )]
    }
1138
1139 pub fn pack_aws_privatelink_connection_update(
1140 &self,
1141 connection_id: CatalogItemId,
1142 aws_principal_context: &AwsPrincipalContext,
1143 diff: Diff,
1144 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1145 let id = &MZ_AWS_PRIVATELINK_CONNECTIONS;
1146 let row = Row::pack_slice(&[
1147 Datum::String(&connection_id.to_string()),
1148 Datum::String(&aws_principal_context.to_principal_string(connection_id)),
1149 ]);
1150 BuiltinTableUpdate::row(id, row, diff)
1151 }
1152
    /// Builds the `mz_aws_connections` row for an AWS connection.
    ///
    /// Most columns are nullable and only populated for the matching
    /// authentication mode: static credentials fill the access-key / session
    /// token columns, while `AssumeRole` fills the role ARN, session name,
    /// principal, external id, and an example trust policy.
    ///
    /// # Errors
    ///
    /// Returns an error if deriving the external id or the example trust
    /// policy from the connection context fails.
    pub fn pack_aws_connection_update(
        &self,
        connection_id: CatalogItemId,
        aws_config: &AwsConnection,
        diff: Diff,
    ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, anyhow::Error> {
        let id = &MZ_AWS_CONNECTIONS;

        // Columns for the auth mode that is *not* in use stay `None`, which
        // packs as SQL NULL below.
        let mut access_key_id = None;
        let mut access_key_id_secret_id = None;
        let mut secret_access_key_secret_id = None;
        let mut session_token = None;
        let mut session_token_secret_id = None;
        let mut assume_role_arn = None;
        let mut assume_role_session_name = None;
        let mut principal = None;
        let mut external_id = None;
        let mut example_trust_policy = None;
        match &aws_config.auth {
            AwsAuth::Credentials(credentials) => {
                // Inline strings and secret references land in different
                // columns so the table can distinguish them.
                match &credentials.access_key_id {
                    StringOrSecret::String(s) => access_key_id = Some(s.as_str()),
                    StringOrSecret::Secret(s) => access_key_id_secret_id = Some(s.to_string()),
                }
                secret_access_key_secret_id = Some(credentials.secret_access_key.to_string());
                match credentials.session_token.as_ref() {
                    None => (),
                    Some(StringOrSecret::String(s)) => session_token = Some(s.as_str()),
                    Some(StringOrSecret::Secret(s)) => {
                        session_token_secret_id = Some(s.to_string())
                    }
                }
            }
            AwsAuth::AssumeRole(assume_role) => {
                assume_role_arn = Some(assume_role.arn.as_str());
                assume_role_session_name = assume_role.session_name.as_deref();
                principal = self
                    .config
                    .connection_context
                    .aws_connection_role_arn
                    .as_deref();
                external_id =
                    Some(assume_role.external_id(&self.config.connection_context, connection_id)?);
                // The trust policy is rendered as JSONB; `expect` is safe
                // because the policy was just produced by serde.
                example_trust_policy = {
                    let policy = assume_role
                        .example_trust_policy(&self.config.connection_context, connection_id)?;
                    let policy = Jsonb::from_serde_json(policy).expect("valid json");
                    Some(policy.into_row())
                };
            }
        }

        let row = Row::pack_slice(&[
            Datum::String(&connection_id.to_string()),
            Datum::from(aws_config.endpoint.as_deref()),
            Datum::from(aws_config.region.as_deref()),
            Datum::from(access_key_id),
            Datum::from(access_key_id_secret_id.as_deref()),
            Datum::from(secret_access_key_secret_id.as_deref()),
            Datum::from(session_token),
            Datum::from(session_token_secret_id.as_deref()),
            Datum::from(assume_role_arn),
            Datum::from(assume_role_session_name),
            Datum::from(principal),
            Datum::from(external_id.as_deref()),
            Datum::from(example_trust_policy.as_ref().map(|p| p.into_element())),
        ]);

        Ok(BuiltinTableUpdate::row(id, row, diff))
    }
1223
1224 fn pack_view_update(
1225 &self,
1226 id: CatalogItemId,
1227 oid: u32,
1228 schema_id: &SchemaSpecifier,
1229 name: &str,
1230 owner_id: &RoleId,
1231 privileges: Datum,
1232 view: &View,
1233 diff: Diff,
1234 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1235 let create_stmt = mz_sql::parse::parse(&view.create_sql)
1236 .unwrap_or_else(|e| {
1237 panic!(
1238 "create_sql cannot be invalid: `{}` --- error: `{}`",
1239 view.create_sql, e
1240 )
1241 })
1242 .into_element()
1243 .ast;
1244 let query = match &create_stmt {
1245 Statement::CreateView(stmt) => &stmt.definition.query,
1246 _ => unreachable!(),
1247 };
1248
1249 let mut query_string = query.to_ast_string_stable();
1250 query_string.push(';');
1253
1254 vec![BuiltinTableUpdate::row(
1255 &*MZ_VIEWS,
1256 Row::pack_slice(&[
1257 Datum::String(&id.to_string()),
1258 Datum::UInt32(oid),
1259 Datum::String(&schema_id.to_string()),
1260 Datum::String(name),
1261 Datum::String(&query_string),
1262 Datum::String(&owner_id.to_string()),
1263 privileges,
1264 Datum::String(&view.create_sql),
1265 Datum::String(&create_stmt.to_ast_string_redacted()),
1266 ]),
1267 diff,
1268 )]
1269 }
1270
    /// Builds the builtin-table updates for a materialized view: one
    /// `mz_materialized_views` row, one `mz_materialized_view_refresh_strategies`
    /// row per refresh strategy (or a single "on-commit" row when no schedule
    /// is set), and an `mz_replacements` row when the view targets a
    /// replacement.
    fn pack_materialized_view_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        mview: &MaterializedView,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        // Re-parse the stored `CREATE` SQL; it was validated at creation time,
        // so a parse failure indicates catalog corruption.
        let create_stmt = mz_sql::parse::parse(&mview.create_sql)
            .unwrap_or_else(|e| {
                panic!(
                    "create_sql cannot be invalid: `{}` --- error: `{}`",
                    mview.create_sql, e
                )
            })
            .into_element()
            .ast;
        let query_string = match &create_stmt {
            Statement::CreateMaterializedView(stmt) => {
                let mut query_string = stmt.query.to_ast_string_stable();
                // The definition is reported with a trailing `;`, matching
                // how `pack_view_update` renders view definitions.
                query_string.push(';');
                query_string
            }
            _ => unreachable!(),
        };

        let mut updates = Vec::new();

        updates.push(BuiltinTableUpdate::row(
            &*MZ_MATERIALIZED_VIEWS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&mview.cluster_id.to_string()),
                Datum::String(&query_string),
                Datum::String(&owner_id.to_string()),
                privileges,
                Datum::String(&mview.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        ));

        // Emit one refresh-strategy row per REFRESH EVERY / REFRESH AT clause.
        // A present-but-empty schedule would be a planning bug, hence the
        // assert. An absent schedule is reported as a single "on-commit" row.
        if let Some(refresh_schedule) = &mview.refresh_schedule {
            assert!(!refresh_schedule.is_empty());
            for RefreshEvery {
                interval,
                aligned_to,
            } in refresh_schedule.everies.iter()
            {
                // `aligned_to` was produced by planning, so the conversion
                // back to a u64 timestamp cannot fail.
                let aligned_to_dt = mz_ore::now::to_datetime(
                    <&Timestamp as TryInto<u64>>::try_into(aligned_to).expect("undoes planning"),
                );
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String("every"),
                        Datum::Interval(
                            Interval::from_duration(interval).expect(
                                "planning ensured that this is convertible back to Interval",
                            ),
                        ),
                        Datum::TimestampTz(aligned_to_dt.try_into().expect("undoes planning")),
                        Datum::Null,
                    ]),
                    diff,
                ));
            }
            for at in refresh_schedule.ats.iter() {
                let at_dt = mz_ore::now::to_datetime(
                    <&Timestamp as TryInto<u64>>::try_into(at).expect("undoes planning"),
                );
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String("at"),
                        Datum::Null,
                        Datum::Null,
                        Datum::TimestampTz(at_dt.try_into().expect("undoes planning")),
                    ]),
                    diff,
                ));
            }
        } else {
            updates.push(BuiltinTableUpdate::row(
                &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String("on-commit"),
                    Datum::Null,
                    Datum::Null,
                    Datum::Null,
                ]),
                diff,
            ));
        }

        // Record the view-to-target link when this view replaces another item.
        if let Some(target_id) = mview.replacement_target {
            updates.push(BuiltinTableUpdate::row(
                &*MZ_REPLACEMENTS,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::String(&target_id.to_string()),
                ]),
                diff,
            ));
        }

        updates
    }
1392
1393 fn pack_continual_task_update(
1394 &self,
1395 id: CatalogItemId,
1396 oid: u32,
1397 schema_id: &SchemaSpecifier,
1398 name: &str,
1399 owner_id: &RoleId,
1400 privileges: Datum,
1401 ct: &ContinualTask,
1402 diff: Diff,
1403 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1404 let create_stmt = mz_sql::parse::parse(&ct.create_sql)
1405 .unwrap_or_else(|e| {
1406 panic!(
1407 "create_sql cannot be invalid: `{}` --- error: `{}`",
1408 ct.create_sql, e
1409 )
1410 })
1411 .into_element()
1412 .ast;
1413 let query_string = match &create_stmt {
1414 Statement::CreateContinualTask(stmt) => {
1415 let mut query_string = String::new();
1416 for stmt in &stmt.stmts {
1417 let s = match stmt {
1418 ContinualTaskStmt::Insert(stmt) => stmt.to_ast_string_stable(),
1419 ContinualTaskStmt::Delete(stmt) => stmt.to_ast_string_stable(),
1420 };
1421 if query_string.is_empty() {
1422 query_string = s;
1423 } else {
1424 query_string.push_str("; ");
1425 query_string.push_str(&s);
1426 }
1427 }
1428 query_string
1429 }
1430 _ => unreachable!(),
1431 };
1432
1433 vec![BuiltinTableUpdate::row(
1434 &*MZ_CONTINUAL_TASKS,
1435 Row::pack_slice(&[
1436 Datum::String(&id.to_string()),
1437 Datum::UInt32(oid),
1438 Datum::String(&schema_id.to_string()),
1439 Datum::String(name),
1440 Datum::String(&ct.cluster_id.to_string()),
1441 Datum::String(&query_string),
1442 Datum::String(&owner_id.to_string()),
1443 privileges,
1444 Datum::String(&ct.create_sql),
1445 Datum::String(&create_stmt.to_ast_string_redacted()),
1446 ]),
1447 diff,
1448 )]
1449 }
1450
    /// Builds the builtin-table updates for a sink: a connection-specific row
    /// (`mz_kafka_sinks` or `mz_iceberg_sinks`) plus the generic `mz_sinks`
    /// row.
    fn pack_sink_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        sink: &Sink,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = vec![];
        // First, the connection-type-specific row.
        match &sink.connection {
            StorageSinkConnection::Kafka(KafkaSinkConnection {
                topic: topic_name, ..
            }) => {
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_KAFKA_SINKS,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String(topic_name.as_str()),
                    ]),
                    diff,
                ));
            }
            StorageSinkConnection::Iceberg(IcebergSinkConnection {
                namespace, table, ..
            }) => {
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_ICEBERG_SINKS,
                    Row::pack_slice(&[
                        Datum::String(&id.to_string()),
                        Datum::String(namespace.as_str()),
                        Datum::String(table.as_str()),
                    ]),
                    diff,
                ));
            }
        };

        // Re-parse the stored `CREATE` SQL; it was validated at creation time,
        // so a parse failure indicates catalog corruption.
        let create_stmt = mz_sql::parse::parse(&sink.create_sql)
            .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", sink.create_sql))
            .into_element()
            .ast;

        let envelope = sink.envelope();

        // A sink may report a combined format, separate key/value formats, or
        // neither; `formats()` returning `None` means no format information.
        let combined_format = sink.combined_format();
        let (key_format, value_format) = match sink.formats() {
            Some((key_format, value_format)) => (key_format, Some(value_format)),
            None => (None, None),
        };

        updates.push(BuiltinTableUpdate::row(
            &*MZ_SINKS,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(sink.connection.name()),
                Datum::from(sink.connection_id().map(|id| id.to_string()).as_deref()),
                // NOTE(review): this column is always NULL here — presumably a
                // reserved/size column; confirm against the MZ_SINKS schema.
                Datum::Null,
                Datum::from(envelope),
                Datum::from(combined_format.as_ref().map(|f| f.as_ref())),
                Datum::from(key_format),
                Datum::from(value_format),
                Datum::String(&sink.cluster_id.to_string()),
                Datum::String(&owner_id.to_string()),
                Datum::String(&sink.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        ));

        updates
    }
1531
    /// Builds the builtin-table updates for an index: one `mz_indexes` row and
    /// one `mz_index_columns` row per key expression.
    fn pack_index_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        name: &str,
        owner_id: &RoleId,
        index: &Index,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = vec![];

        // Re-parse the stored `CREATE INDEX` SQL; it was validated at creation
        // time, so a parse failure indicates catalog corruption.
        let create_stmt = mz_sql::parse::parse(&index.create_sql)
            .unwrap_or_else(|e| {
                panic!(
                    "create_sql cannot be invalid: `{}` --- error: `{}`",
                    index.create_sql, e
                )
            })
            .into_element()
            .ast;

        // The SQL-level key expressions; planning guarantees `key_parts` is
        // populated, and they line up 1:1 with `index.keys` below.
        let key_sqls = match &create_stmt {
            Statement::CreateIndex(CreateIndexStatement { key_parts, .. }) => key_parts
                .as_ref()
                .expect("key_parts is filled in during planning"),
            _ => unreachable!(),
        };
        let on_item_id = self.get_entry_by_global_id(&index.on).id();

        updates.push(BuiltinTableUpdate::row(
            &*MZ_INDEXES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(name),
                Datum::String(&on_item_id.to_string()),
                Datum::String(&index.cluster_id.to_string()),
                Datum::String(&owner_id.to_string()),
                Datum::String(&index.create_sql),
                Datum::String(&create_stmt.to_ast_string_redacted()),
            ]),
            diff,
        ));

        // Column types of the indexed item, used to compute nullability of
        // each key expression.
        let on_entry = self.get_entry_by_global_id(&index.on);
        let on_desc = on_entry
            .relation_desc()
            .expect("can only create indexes on items with a valid description");
        let repr_col_types: Vec<ReprColumnType> = on_desc
            .typ()
            .column_types
            .iter()
            .map(ReprColumnType::from)
            .collect();
        for (i, key) in index.keys.iter().enumerate() {
            let nullable = key.typ(&repr_col_types).nullable;
            // Positions are reported 1-based.
            let seq_in_index = u64::cast_from(i + 1);
            let key_sql = key_sqls
                .get(i)
                .expect("missing sql information for index key")
                .to_ast_string_simple();
            // A bare column reference reports its (1-based) field number;
            // anything else reports the rendered expression instead.
            let (field_number, expression) = match key {
                MirScalarExpr::Column(col, _) => {
                    (Datum::UInt64(u64::cast_from(*col + 1)), Datum::Null)
                }
                _ => (Datum::Null, Datum::String(&key_sql)),
            };
            updates.push(BuiltinTableUpdate::row(
                &*MZ_INDEX_COLUMNS,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::UInt64(seq_in_index),
                    field_number,
                    expression,
                    Datum::from(nullable),
                ]),
                diff,
            ));
        }

        updates
    }
1614
    /// Builds the builtin-table updates for a type: the generic `mz_types`
    /// row, one row in the category-specific table (`mz_array_types`,
    /// `mz_list_types`, `mz_map_types`, `mz_pseudo_types`, or
    /// `mz_base_types`), and, when present, an `mz_type_pg_metadata` row.
    fn pack_type_update(
        &self,
        id: CatalogItemId,
        oid: u32,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        privileges: Datum,
        typ: &Type,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut out = vec![];

        // Built-in types have no `create_sql`; user-defined ones get a
        // redacted rendering of their CREATE statement.
        let redacted = typ.create_sql.as_ref().map(|create_sql| {
            mz_sql::parse::parse(create_sql)
                .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
                .into_element()
                .ast
                .to_ast_string_redacted()
        });

        out.push(BuiltinTableUpdate::row(
            &*MZ_TYPES,
            Row::pack_slice(&[
                Datum::String(&id.to_string()),
                Datum::UInt32(oid),
                Datum::String(&schema_id.to_string()),
                Datum::String(name),
                Datum::String(&TypeCategory::from_catalog_type(&typ.details.typ).to_string()),
                Datum::String(&owner_id.to_string()),
                privileges,
                if let Some(create_sql) = &typ.create_sql {
                    Datum::String(create_sql)
                } else {
                    Datum::Null
                },
                if let Some(redacted) = &redacted {
                    Datum::String(redacted)
                } else {
                    Datum::Null
                },
            ]),
            diff,
        ));

        // Build the category-specific row; each match arm pushes its columns
        // into `packer` and selects the destination table.
        let mut row = Row::default();
        let mut packer = row.packer();

        // Pushes a type-modifier list, or NULL when there are no modifiers.
        fn append_modifier(packer: &mut RowPacker<'_>, mods: &[i64]) {
            if mods.is_empty() {
                packer.push(Datum::Null);
            } else {
                packer.push_list(mods.iter().map(|m| Datum::Int64(*m)));
            }
        }

        let index_id = match &typ.details.typ {
            CatalogType::Array {
                element_reference: element_id,
            } => {
                packer.push(Datum::String(&id.to_string()));
                packer.push(Datum::String(&element_id.to_string()));
                &MZ_ARRAY_TYPES
            }
            CatalogType::List {
                element_reference: element_id,
                element_modifiers,
            } => {
                packer.push(Datum::String(&id.to_string()));
                packer.push(Datum::String(&element_id.to_string()));
                append_modifier(&mut packer, element_modifiers);
                &MZ_LIST_TYPES
            }
            CatalogType::Map {
                key_reference: key_id,
                value_reference: value_id,
                key_modifiers,
                value_modifiers,
            } => {
                packer.push(Datum::String(&id.to_string()));
                packer.push(Datum::String(&key_id.to_string()));
                packer.push(Datum::String(&value_id.to_string()));
                append_modifier(&mut packer, key_modifiers);
                append_modifier(&mut packer, value_modifiers);
                &MZ_MAP_TYPES
            }
            CatalogType::Pseudo => {
                packer.push(Datum::String(&id.to_string()));
                &MZ_PSEUDO_TYPES
            }
            // All remaining categories are treated as base types.
            _ => {
                packer.push(Datum::String(&id.to_string()));
                &MZ_BASE_TYPES
            }
        };
        out.push(BuiltinTableUpdate::row(index_id, row, diff));

        // Optional PostgreSQL-compatibility metadata (typinput/typreceive oids).
        if let Some(pg_metadata) = &typ.details.pg_metadata {
            out.push(BuiltinTableUpdate::row(
                &*MZ_TYPE_PG_METADATA,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::UInt32(pg_metadata.typinput_oid),
                    Datum::UInt32(pg_metadata.typreceive_oid),
                ]),
                diff,
            ));
        }

        out
    }
1726
    /// Builds the builtin-table updates for a function: one `mz_functions` row
    /// per implementation (overload), plus an `mz_aggregates` row for each
    /// implementation of an aggregate function.
    fn pack_func_update(
        &self,
        id: CatalogItemId,
        schema_id: &SchemaSpecifier,
        name: &str,
        owner_id: &RoleId,
        func: &Func,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let mut updates = vec![];
        for func_impl_details in func.inner.func_impls() {
            // Resolve each argument type name to its catalog id.
            let arg_type_ids = func_impl_details
                .arg_typs
                .iter()
                .map(|typ| self.get_system_type(typ).id().to_string())
                .collect::<Vec<_>>();

            // Pack the argument type ids into a single 1-D array datum.
            let mut row = Row::default();
            row.packer()
                .try_push_array(
                    &[ArrayDimension {
                        lower_bound: 1,
                        length: arg_type_ids.len(),
                    }],
                    arg_type_ids.iter().map(|id| Datum::String(id)),
                )
                .expect(
                    "arg_type_ids is 1 dimensional, and its length is used for the array length",
                );
            let arg_type_ids = row.unpack_first();

            updates.push(BuiltinTableUpdate::row(
                &*MZ_FUNCTIONS,
                Row::pack_slice(&[
                    Datum::String(&id.to_string()),
                    Datum::UInt32(func_impl_details.oid),
                    Datum::String(&schema_id.to_string()),
                    Datum::String(name),
                    arg_type_ids,
                    Datum::from(
                        func_impl_details
                            .variadic_typ
                            .map(|typ| self.get_system_type(typ).id().to_string())
                            .as_deref(),
                    ),
                    Datum::from(
                        func_impl_details
                            .return_typ
                            .map(|typ| self.get_system_type(typ).id().to_string())
                            .as_deref(),
                    ),
                    func_impl_details.return_is_set.into(),
                    Datum::String(&owner_id.to_string()),
                ]),
                diff,
            ));

            if let mz_sql::func::Func::Aggregate(_) = func.inner {
                updates.push(BuiltinTableUpdate::row(
                    &*MZ_AGGREGATES,
                    Row::pack_slice(&[
                        Datum::UInt32(func_impl_details.oid),
                        // NOTE(review): "n" and 0 look like pg_aggregate-style
                        // aggkind/aggnumdirectargs defaults — confirm against
                        // the MZ_AGGREGATES schema.
                        Datum::String("n"),
                        Datum::Int16(0),
                    ]),
                    diff,
                ));
            }
        }
        updates
    }
1799
1800 pub fn pack_op_update(
1801 &self,
1802 operator: &str,
1803 func_impl_details: FuncImplCatalogDetails,
1804 diff: Diff,
1805 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1806 let arg_type_ids = func_impl_details
1807 .arg_typs
1808 .iter()
1809 .map(|typ| self.get_system_type(typ).id().to_string())
1810 .collect::<Vec<_>>();
1811
1812 let mut row = Row::default();
1813 row.packer()
1814 .try_push_array(
1815 &[ArrayDimension {
1816 lower_bound: 1,
1817 length: arg_type_ids.len(),
1818 }],
1819 arg_type_ids.iter().map(|id| Datum::String(id)),
1820 )
1821 .expect("arg_type_ids is 1 dimensional, and its length is used for the array length");
1822 let arg_type_ids = row.unpack_first();
1823
1824 BuiltinTableUpdate::row(
1825 &*MZ_OPERATORS,
1826 Row::pack_slice(&[
1827 Datum::UInt32(func_impl_details.oid),
1828 Datum::String(operator),
1829 arg_type_ids,
1830 Datum::from(
1831 func_impl_details
1832 .return_typ
1833 .map(|typ| self.get_system_type(typ).id().to_string())
1834 .as_deref(),
1835 ),
1836 ]),
1837 diff,
1838 )
1839 }
1840
1841 fn pack_secret_update(
1842 &self,
1843 id: CatalogItemId,
1844 oid: u32,
1845 schema_id: &SchemaSpecifier,
1846 name: &str,
1847 owner_id: &RoleId,
1848 privileges: Datum,
1849 diff: Diff,
1850 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1851 vec![BuiltinTableUpdate::row(
1852 &*MZ_SECRETS,
1853 Row::pack_slice(&[
1854 Datum::String(&id.to_string()),
1855 Datum::UInt32(oid),
1856 Datum::String(&schema_id.to_string()),
1857 Datum::String(name),
1858 Datum::String(&owner_id.to_string()),
1859 privileges,
1860 ]),
1861 diff,
1862 )]
1863 }
1864
1865 pub fn pack_audit_log_update(
1866 &self,
1867 event: &VersionedEvent,
1868 diff: Diff,
1869 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1870 let (event_type, object_type, details, user, occurred_at): (
1871 &EventType,
1872 &ObjectType,
1873 &EventDetails,
1874 &Option<String>,
1875 u64,
1876 ) = match event {
1877 VersionedEvent::V1(ev) => (
1878 &ev.event_type,
1879 &ev.object_type,
1880 &ev.details,
1881 &ev.user,
1882 ev.occurred_at,
1883 ),
1884 };
1885 let details = Jsonb::from_serde_json(details.as_json())
1886 .map_err(|e| {
1887 Error::new(ErrorKind::Unstructured(format!(
1888 "could not pack audit log update: {}",
1889 e
1890 )))
1891 })?
1892 .into_row();
1893 let details = details
1894 .iter()
1895 .next()
1896 .expect("details created above with a single jsonb column");
1897 let dt = mz_ore::now::to_datetime(occurred_at);
1898 let id = event.sortable_id();
1899 Ok(BuiltinTableUpdate::row(
1900 &*MZ_AUDIT_EVENTS,
1901 Row::pack_slice(&[
1902 Datum::UInt64(id),
1903 Datum::String(&format!("{}", event_type)),
1904 Datum::String(&format!("{}", object_type)),
1905 details,
1906 match user {
1907 Some(user) => Datum::String(user),
1908 None => Datum::Null,
1909 },
1910 Datum::TimestampTz(dt.try_into().expect("must fit")),
1911 ]),
1912 diff,
1913 ))
1914 }
1915
1916 pub fn pack_storage_usage_update(
1917 &self,
1918 VersionedStorageUsage::V1(event): VersionedStorageUsage,
1919 diff: Diff,
1920 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1921 let id = &MZ_STORAGE_USAGE_BY_SHARD;
1922 let row = Row::pack_slice(&[
1923 Datum::UInt64(event.id),
1924 Datum::from(event.shard_id.as_deref()),
1925 Datum::UInt64(event.size_bytes),
1926 Datum::TimestampTz(
1927 mz_ore::now::to_datetime(event.collection_timestamp)
1928 .try_into()
1929 .expect("must fit"),
1930 ),
1931 ]);
1932 BuiltinTableUpdate::row(id, row, diff)
1933 }
1934
1935 pub fn pack_egress_ip_update(
1936 &self,
1937 ip: &IpNet,
1938 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1939 let id = &MZ_EGRESS_IPS;
1940 let addr = ip.addr();
1941 let row = Row::pack_slice(&[
1942 Datum::String(&addr.to_string()),
1943 Datum::Int32(ip.prefix_len().into()),
1944 Datum::String(&format!("{}/{}", addr, ip.prefix_len())),
1945 ]);
1946 Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
1947 }
1948
1949 pub fn pack_license_key_update(
1950 &self,
1951 license_key: &ValidatedLicenseKey,
1952 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1953 let id = &MZ_LICENSE_KEYS;
1954 let row = Row::pack_slice(&[
1955 Datum::String(&license_key.id),
1956 Datum::String(&license_key.organization),
1957 Datum::String(&license_key.environment_id),
1958 Datum::TimestampTz(
1959 mz_ore::now::to_datetime(license_key.expiration * 1000)
1960 .try_into()
1961 .expect("must fit"),
1962 ),
1963 Datum::TimestampTz(
1964 mz_ore::now::to_datetime(license_key.not_before * 1000)
1965 .try_into()
1966 .expect("must fit"),
1967 ),
1968 ]);
1969 Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
1970 }
1971
1972 pub fn pack_all_replica_size_updates(&self) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1973 let mut updates = Vec::new();
1974 for (size, alloc) in &self.cluster_replica_sizes.0 {
1975 if alloc.disabled {
1976 continue;
1977 }
1978
1979 let cpu_limit = alloc.cpu_limit.unwrap_or(CpuLimit::MAX);
1982 let MemoryLimit(ByteSize(memory_bytes)) =
1983 (alloc.memory_limit).unwrap_or(MemoryLimit::MAX);
1984 let DiskLimit(ByteSize(disk_bytes)) =
1985 (alloc.disk_limit).unwrap_or(DiskLimit::ARBITRARY);
1986
1987 let row = Row::pack_slice(&[
1988 size.as_str().into(),
1989 u64::cast_from(alloc.scale).into(),
1990 u64::cast_from(alloc.workers).into(),
1991 cpu_limit.as_nanocpus().into(),
1992 memory_bytes.into(),
1993 disk_bytes.into(),
1994 (alloc.credits_per_hour).into(),
1995 ]);
1996
1997 updates.push(BuiltinTableUpdate::row(
1998 &*MZ_CLUSTER_REPLICA_SIZES,
1999 row,
2000 Diff::ONE,
2001 ));
2002 }
2003
2004 updates
2005 }
2006
2007 pub fn pack_subscribe_update(
2008 &self,
2009 id: GlobalId,
2010 subscribe: &ActiveSubscribe,
2011 diff: Diff,
2012 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2013 let mut row = Row::default();
2014 let mut packer = row.packer();
2015 packer.push(Datum::String(&id.to_string()));
2016 packer.push(Datum::Uuid(subscribe.session_uuid));
2017 packer.push(Datum::String(&subscribe.cluster_id.to_string()));
2018
2019 let start_dt = mz_ore::now::to_datetime(subscribe.start_time);
2020 packer.push(Datum::TimestampTz(start_dt.try_into().expect("must fit")));
2021
2022 let depends_on: Vec<_> = subscribe
2023 .depends_on
2024 .iter()
2025 .map(|id| id.to_string())
2026 .collect();
2027 packer.push_list(depends_on.iter().map(|s| Datum::String(s)));
2028
2029 BuiltinTableUpdate::row(&*MZ_SUBSCRIPTIONS, row, diff)
2030 }
2031
2032 pub fn pack_session_update(
2033 &self,
2034 conn: &ConnMeta,
2035 diff: Diff,
2036 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2037 let connect_dt = mz_ore::now::to_datetime(conn.connected_at());
2038 BuiltinTableUpdate::row(
2039 &*MZ_SESSIONS,
2040 Row::pack_slice(&[
2041 Datum::Uuid(conn.uuid()),
2042 Datum::UInt32(conn.conn_id().unhandled()),
2043 Datum::String(&conn.authenticated_role_id().to_string()),
2044 Datum::from(conn.client_ip().map(|ip| ip.to_string()).as_deref()),
2045 Datum::TimestampTz(connect_dt.try_into().expect("must fit")),
2046 ]),
2047 diff,
2048 )
2049 }
2050
2051 pub fn pack_default_privileges_update(
2052 &self,
2053 default_privilege_object: &DefaultPrivilegeObject,
2054 grantee: &RoleId,
2055 acl_mode: &AclMode,
2056 diff: Diff,
2057 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2058 BuiltinTableUpdate::row(
2059 &*MZ_DEFAULT_PRIVILEGES,
2060 Row::pack_slice(&[
2061 default_privilege_object.role_id.to_string().as_str().into(),
2062 default_privilege_object
2063 .database_id
2064 .map(|database_id| database_id.to_string())
2065 .as_deref()
2066 .into(),
2067 default_privilege_object
2068 .schema_id
2069 .map(|schema_id| schema_id.to_string())
2070 .as_deref()
2071 .into(),
2072 default_privilege_object
2073 .object_type
2074 .to_string()
2075 .to_lowercase()
2076 .as_str()
2077 .into(),
2078 grantee.to_string().as_str().into(),
2079 acl_mode.to_string().as_str().into(),
2080 ]),
2081 diff,
2082 )
2083 }
2084
2085 pub fn pack_system_privileges_update(
2086 &self,
2087 privileges: MzAclItem,
2088 diff: Diff,
2089 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2090 BuiltinTableUpdate::row(
2091 &*MZ_SYSTEM_PRIVILEGES,
2092 Row::pack_slice(&[privileges.into()]),
2093 diff,
2094 )
2095 }
2096
2097 fn pack_privilege_array_row(&self, privileges: &PrivilegeMap) -> Row {
2098 let mut row = Row::default();
2099 let flat_privileges: Vec<_> = privileges.all_values_owned().collect();
2100 row.packer()
2101 .try_push_array(
2102 &[ArrayDimension {
2103 lower_bound: 1,
2104 length: flat_privileges.len(),
2105 }],
2106 flat_privileges
2107 .into_iter()
2108 .map(|mz_acl_item| Datum::MzAclItem(mz_acl_item.clone())),
2109 )
2110 .expect("privileges is 1 dimensional, and its length is used for the array length");
2111 row
2112 }
2113
    /// Builds the `mz_comments` row for a comment on an object or on one of
    /// its columns (`column_pos` is `Some` for column comments).
    pub fn pack_comment_update(
        &self,
        object_id: CommentObjectId,
        column_pos: Option<usize>,
        comment: &str,
        diff: Diff,
    ) -> BuiltinTableUpdate<&'static BuiltinTable> {
        // Use the audit-log object-type rendering so comments and audit events
        // agree on type names.
        let object_type = mz_sql::catalog::ObjectType::from(object_id);
        let audit_type = super::object_type_to_audit_object_type(object_type);
        let object_type_str = audit_type.to_string();

        // Extract the string form of the underlying id, whatever kind of
        // object the comment is attached to.
        let object_id_str = match object_id {
            CommentObjectId::Table(global_id)
            | CommentObjectId::View(global_id)
            | CommentObjectId::MaterializedView(global_id)
            | CommentObjectId::Source(global_id)
            | CommentObjectId::Sink(global_id)
            | CommentObjectId::Index(global_id)
            | CommentObjectId::Func(global_id)
            | CommentObjectId::Connection(global_id)
            | CommentObjectId::Secret(global_id)
            | CommentObjectId::Type(global_id)
            | CommentObjectId::ContinualTask(global_id) => global_id.to_string(),
            CommentObjectId::Role(role_id) => role_id.to_string(),
            CommentObjectId::Database(database_id) => database_id.to_string(),
            CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
            CommentObjectId::Cluster(cluster_id) => cluster_id.to_string(),
            CommentObjectId::ClusterReplica((_, replica_id)) => replica_id.to_string(),
            CommentObjectId::NetworkPolicy(network_policy_id) => network_policy_id.to_string(),
        };
        // Column position is NULL for object-level comments; planning
        // guarantees the value fits in an i32.
        let column_pos_datum = match column_pos {
            Some(pos) => {
                let pos =
                    i32::try_from(pos).expect("we constrain this value in the planning layer");
                Datum::Int32(pos)
            }
            None => Datum::Null,
        };

        BuiltinTableUpdate::row(
            &*MZ_COMMENTS,
            Row::pack_slice(&[
                Datum::String(&object_id_str),
                Datum::String(&object_type_str),
                column_pos_datum,
                Datum::String(comment),
            ]),
            diff,
        )
    }
2166
2167 pub fn pack_webhook_source_update(
2168 &self,
2169 item_id: CatalogItemId,
2170 diff: Diff,
2171 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
2172 let url = self
2173 .try_get_webhook_url(&item_id)
2174 .expect("webhook source should exist");
2175 let url = url.to_string();
2176 let name = &self.get_entry(&item_id).name().item;
2177 let id_str = item_id.to_string();
2178
2179 BuiltinTableUpdate::row(
2180 &*MZ_WEBHOOKS_SOURCES,
2181 Row::pack_slice(&[
2182 Datum::String(&id_str),
2183 Datum::String(name),
2184 Datum::String(&url),
2185 ]),
2186 diff,
2187 )
2188 }
2189
    /// Builds one `mz_source_references` row per reference discovered for a
    /// source, recording its optional namespace, name, update time, and
    /// (optionally) its column names.
    pub fn pack_source_references_update(
        &self,
        source_references: &SourceReferences,
        diff: Diff,
    ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
        let source_id = source_references.source_id.to_string();
        let updated_at = &source_references.updated_at;
        source_references
            .references
            .iter()
            .map(|reference| {
                let mut row = Row::default();
                let mut packer = row.packer();
                packer.extend([
                    Datum::String(&source_id),
                    // Namespace is optional (e.g. sources without schemas).
                    reference
                        .namespace
                        .as_ref()
                        .map(|s| Datum::String(s))
                        .unwrap_or(Datum::Null),
                    Datum::String(&reference.name),
                    Datum::TimestampTz(
                        mz_ore::now::to_datetime(*updated_at)
                            .try_into()
                            .expect("must fit"),
                    ),
                ]);
                // Known columns are packed as a 1-D array; an empty column
                // list is reported as NULL rather than an empty array.
                if reference.columns.len() > 0 {
                    packer
                        .try_push_array(
                            &[ArrayDimension {
                                lower_bound: 1,
                                length: reference.columns.len(),
                            }],
                            reference.columns.iter().map(|col| Datum::String(col)),
                        )
                        .expect(
                            "columns is 1 dimensional, and its length is used for the array length",
                        );
                } else {
                    packer.push(Datum::Null);
                }

                BuiltinTableUpdate::row(&*MZ_SOURCE_REFERENCES, row, diff)
            })
            .collect()
    }
2237}