1mod notice;
11
12use bytesize::ByteSize;
13use ipnet::IpNet;
14use mz_adapter_types::compaction::CompactionWindow;
15use mz_audit_log::{EventDetails, EventType, ObjectType, VersionedEvent, VersionedStorageUsage};
16use mz_catalog::SYSTEM_CONN_ID;
17use mz_catalog::builtin::{
18 BuiltinTable, MZ_AGGREGATES, MZ_ARRAY_TYPES, MZ_AUDIT_EVENTS, MZ_AWS_CONNECTIONS,
19 MZ_AWS_PRIVATELINK_CONNECTIONS, MZ_BASE_TYPES, MZ_CLUSTER_REPLICA_SIZE_INTERNAL,
20 MZ_CLUSTER_REPLICA_SIZES, MZ_COLUMNS, MZ_COMMENTS, MZ_EGRESS_IPS, MZ_FUNCTIONS,
21 MZ_HISTORY_RETENTION_STRATEGIES, MZ_ICEBERG_SINKS, MZ_INDEX_COLUMNS, MZ_KAFKA_CONNECTIONS,
22 MZ_KAFKA_SINKS, MZ_KAFKA_SOURCE_TABLES, MZ_KAFKA_SOURCES, MZ_LICENSE_KEYS, MZ_LIST_TYPES,
23 MZ_MAP_TYPES, MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MYSQL_SOURCE_TABLES,
24 MZ_OBJECT_DEPENDENCIES, MZ_OBJECT_GLOBAL_IDS, MZ_OPERATORS, MZ_POSTGRES_SOURCE_TABLES,
25 MZ_POSTGRES_SOURCES, MZ_PSEUDO_TYPES, MZ_REPLACEMENTS, MZ_ROLE_AUTH, MZ_SESSIONS, MZ_SINKS,
26 MZ_SOURCE_REFERENCES, MZ_SQL_SERVER_SOURCE_TABLES, MZ_SSH_TUNNEL_CONNECTIONS,
27 MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_TABLES, MZ_TYPE_PG_METADATA, MZ_TYPES,
28 MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
29};
30use mz_catalog::config::AwsPrincipalContext;
31use mz_catalog::durable::SourceReferences;
32use mz_catalog::memory::error::{Error, ErrorKind};
33use mz_catalog::memory::objects::{
34 CatalogEntry, CatalogItem, Connection, DataSourceDesc, Func, Index, MaterializedView, Sink,
35 Table, TableDataSource, Type, View,
36};
37use mz_expr::MirScalarExpr;
38use mz_license_keys::ValidatedLicenseKey;
39use mz_orchestrator::{CpuLimit, DiskLimit, MemoryLimit};
40use mz_ore::cast::CastFrom;
41use mz_ore::collections::CollectionExt;
42use mz_persist_client::batch::ProtoBatch;
43use mz_repr::adt::array::ArrayDimension;
44use mz_repr::adt::interval::Interval;
45use mz_repr::adt::jsonb::Jsonb;
46use mz_repr::adt::mz_acl_item::PrivilegeMap;
47use mz_repr::refresh_schedule::RefreshEvery;
48use mz_repr::role_id::RoleId;
49use mz_repr::{
50 CatalogItemId, Datum, Diff, GlobalId, ReprColumnType, Row, RowPacker, SqlScalarType, Timestamp,
51};
52use mz_sql::ast::{CreateIndexStatement, Statement, UnresolvedItemName};
53use mz_sql::catalog::{CatalogType, TypeCategory};
54use mz_sql::func::FuncImplCatalogDetails;
55use mz_sql::names::{CommentObjectId, SchemaSpecifier};
56use mz_sql::plan::{ConnectionDetails, SshKey};
57use mz_sql_parser::ast::display::AstDisplay;
58use mz_storage_client::client::TableData;
59use mz_storage_types::connections::KafkaConnection;
60use mz_storage_types::connections::aws::{AwsAuth, AwsConnection};
61use mz_storage_types::connections::inline::ReferencedConnection;
62use mz_storage_types::connections::string_or_secret::StringOrSecret;
63use mz_storage_types::sinks::{IcebergSinkConnection, KafkaSinkConnection, StorageSinkConnection};
64use mz_storage_types::sources::{
65 GenericSourceConnection, KafkaSourceConnection, PostgresSourceConnection, SourceConnection,
66};
67use smallvec::smallvec;
68
69use crate::active_compute_sink::ActiveSubscribe;
71use crate::catalog::CatalogState;
72use crate::coord::ConnMeta;
73
74#[derive(Debug, Clone)]
76pub struct BuiltinTableUpdate<T = CatalogItemId> {
77 pub id: T,
79 pub data: TableData,
81}
82
83impl<T> BuiltinTableUpdate<T> {
84 pub fn row(id: T, row: Row, diff: Diff) -> BuiltinTableUpdate<T> {
86 BuiltinTableUpdate {
87 id,
88 data: TableData::Rows(vec![(row, diff)]),
89 }
90 }
91
92 pub fn batch(id: T, batch: ProtoBatch) -> BuiltinTableUpdate<T> {
93 BuiltinTableUpdate {
94 id,
95 data: TableData::Batches(smallvec![batch]),
96 }
97 }
98}
99
100impl CatalogState {
101 pub fn resolve_builtin_table_updates(
102 &self,
103 builtin_table_update: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
104 ) -> Vec<BuiltinTableUpdate<CatalogItemId>> {
105 builtin_table_update
106 .into_iter()
107 .map(|builtin_table_update| self.resolve_builtin_table_update(builtin_table_update))
108 .collect()
109 }
110
111 pub fn resolve_builtin_table_update(
112 &self,
113 BuiltinTableUpdate { id, data }: BuiltinTableUpdate<&'static BuiltinTable>,
114 ) -> BuiltinTableUpdate<CatalogItemId> {
115 let id = self.resolve_builtin_table(id);
116 BuiltinTableUpdate { id, data }
117 }
118
119 pub fn pack_depends_update(
120 &self,
121 depender: CatalogItemId,
122 dependee: CatalogItemId,
123 diff: Diff,
124 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
125 let row = Row::pack_slice(&[
126 Datum::String(&depender.to_string()),
127 Datum::String(&dependee.to_string()),
128 ]);
129 BuiltinTableUpdate::row(&*MZ_OBJECT_DEPENDENCIES, row, diff)
130 }
131
132 pub(super) fn pack_role_auth_update(
133 &self,
134 id: RoleId,
135 diff: Diff,
136 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
137 let role_auth = self.get_role_auth(&id);
138 let role = self.get_role(&id);
139 BuiltinTableUpdate::row(
140 &*MZ_ROLE_AUTH,
141 Row::pack_slice(&[
142 Datum::String(&role_auth.role_id.to_string()),
143 Datum::UInt32(role.oid),
144 match &role_auth.password_hash {
145 Some(hash) => Datum::String(hash),
146 None => Datum::Null,
147 },
148 Datum::TimestampTz(
149 mz_ore::now::to_datetime(role_auth.updated_at)
150 .try_into()
151 .expect("must fit"),
152 ),
153 ]),
154 diff,
155 )
156 }
157
158 pub(super) fn pack_item_update(
159 &self,
160 id: CatalogItemId,
161 diff: Diff,
162 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
163 let entry = self.get_entry(&id);
164 let oid = entry.oid();
165 let conn_id = entry.item().conn_id().unwrap_or(&SYSTEM_CONN_ID);
166 let schema_id = &self
167 .get_schema(
168 &entry.name().qualifiers.database_spec,
169 &entry.name().qualifiers.schema_spec,
170 conn_id,
171 )
172 .id;
173 let name = &entry.name().item;
174 let owner_id = entry.owner_id();
175 let privileges_row = self.pack_privilege_array_row(entry.privileges());
176 let privileges = privileges_row.unpack_first();
177 let mut updates = match entry.item() {
178 CatalogItem::Index(index) => self.pack_index_update(id, index, diff),
179 CatalogItem::Table(table) => {
180 let mut updates = self
181 .pack_table_update(id, oid, schema_id, name, owner_id, privileges, diff, table);
182
183 if let TableDataSource::DataSource {
184 desc: data_source,
185 timeline: _,
186 } = &table.data_source
187 {
188 updates.extend(match data_source {
189 DataSourceDesc::IngestionExport {
190 ingestion_id,
191 external_reference: UnresolvedItemName(external_reference),
192 details: _,
193 data_config: _,
194 } => {
195 let ingestion_entry = self
196 .get_entry(ingestion_id)
197 .source_desc()
198 .expect("primary source exists")
199 .expect("primary source is a source");
200
201 match ingestion_entry.connection.name() {
202 "postgres" => {
203 mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
204 let schema_name = external_reference[1].as_str();
210 let table_name = external_reference[2].as_str();
211
212 self.pack_postgres_source_tables_update(
213 id,
214 schema_name,
215 table_name,
216 diff,
217 )
218 }
219 "mysql" => {
220 mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
221 let schema_name = external_reference[0].as_str();
222 let table_name = external_reference[1].as_str();
223
224 self.pack_mysql_source_tables_update(
225 id,
226 schema_name,
227 table_name,
228 diff,
229 )
230 }
231 "sql-server" => {
232 mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
233 let schema_name = external_reference[1].as_str();
238 let table_name = external_reference[2].as_str();
239
240 self.pack_sql_server_source_table_update(
241 id,
242 schema_name,
243 table_name,
244 diff,
245 )
246 }
247 "load-generator" => vec![],
250 "kafka" => {
251 mz_ore::soft_assert_eq_no_log!(external_reference.len(), 1);
252 let topic = external_reference[0].as_str();
253 let envelope = data_source.envelope();
254 let (key_format, value_format) = data_source.formats();
255
256 self.pack_kafka_source_tables_update(
257 id,
258 topic,
259 envelope,
260 key_format,
261 value_format,
262 diff,
263 )
264 }
265 s => unreachable!("{s} sources do not have tables"),
266 }
267 }
268 DataSourceDesc::Ingestion { .. }
269 | DataSourceDesc::OldSyntaxIngestion { .. }
270 | DataSourceDesc::Introspection(_)
271 | DataSourceDesc::Progress
272 | DataSourceDesc::Webhook { .. }
273 | DataSourceDesc::Catalog => vec![],
274 });
275 }
276
277 updates
278 }
279 CatalogItem::Source(source) => {
280 match &source.data_source {
281 DataSourceDesc::Ingestion { desc, .. }
282 | DataSourceDesc::OldSyntaxIngestion { desc, .. } => match &desc.connection {
283 GenericSourceConnection::Postgres(postgres) => {
284 self.pack_postgres_source_update(id, postgres, diff)
285 }
286 GenericSourceConnection::Kafka(kafka) => {
287 self.pack_kafka_source_update(id, source.global_id(), kafka, diff)
288 }
289 _ => vec![],
290 },
291 DataSourceDesc::IngestionExport {
292 ingestion_id,
293 external_reference: UnresolvedItemName(external_reference),
294 details: _,
295 data_config: _,
296 } => {
297 let ingestion_entry = self
298 .get_entry(ingestion_id)
299 .source_desc()
300 .expect("primary source exists")
301 .expect("primary source is a source");
302
303 match ingestion_entry.connection.name() {
304 "postgres" => {
305 mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
306 let schema_name = external_reference[1].as_str();
312 let table_name = external_reference[2].as_str();
313
314 self.pack_postgres_source_tables_update(
315 id,
316 schema_name,
317 table_name,
318 diff,
319 )
320 }
321 "mysql" => {
322 mz_ore::soft_assert_eq_no_log!(external_reference.len(), 2);
323 let schema_name = external_reference[0].as_str();
324 let table_name = external_reference[1].as_str();
325
326 self.pack_mysql_source_tables_update(
327 id,
328 schema_name,
329 table_name,
330 diff,
331 )
332 }
333 "sql-server" => {
334 mz_ore::soft_assert_eq_no_log!(external_reference.len(), 3);
335 let schema_name = external_reference[1].as_str();
340 let table_name = external_reference[2].as_str();
341
342 self.pack_sql_server_source_table_update(
343 id,
344 schema_name,
345 table_name,
346 diff,
347 )
348 }
349 "load-generator" => vec![],
352 s => unreachable!("{s} sources do not have subsources"),
353 }
354 }
355 DataSourceDesc::Webhook { .. } => {
356 vec![self.pack_webhook_source_update(id, diff)]
357 }
358 DataSourceDesc::Introspection(_)
359 | DataSourceDesc::Progress
360 | DataSourceDesc::Catalog => vec![],
361 }
362 }
363 CatalogItem::View(view) => {
364 self.pack_view_update(id, oid, schema_id, name, owner_id, privileges, view, diff)
365 }
366 CatalogItem::MaterializedView(mview) => {
367 self.pack_materialized_view_update(id, mview, diff)
368 }
369 CatalogItem::Sink(sink) => {
370 self.pack_sink_update(id, oid, schema_id, name, owner_id, sink, diff)
371 }
372 CatalogItem::Type(ty) => {
373 self.pack_type_update(id, oid, schema_id, name, owner_id, privileges, ty, diff)
374 }
375 CatalogItem::Func(func) => {
376 self.pack_func_update(id, schema_id, name, owner_id, func, diff)
377 }
378 CatalogItem::Log(_) | CatalogItem::Secret(_) => vec![],
379 CatalogItem::Connection(connection) => {
380 self.pack_connection_update(id, connection, diff)
381 }
382 };
383
384 if !entry.item().is_temporary() {
385 for dependee in entry.item().references().items() {
388 updates.push(self.pack_depends_update(id, *dependee, diff))
389 }
390 }
391
392 if let Some(desc) = entry.relation_desc_latest() {
394 let defaults = match entry.item() {
395 CatalogItem::Table(Table {
396 data_source: TableDataSource::TableWrites { defaults },
397 ..
398 }) => Some(defaults),
399 _ => None,
400 };
401 for (i, (column_name, column_type)) in desc.iter().enumerate() {
402 let default: Option<String> = defaults.map(|d| d[i].to_ast_string_stable());
403 let default: Datum = default
404 .as_ref()
405 .map(|d| Datum::String(d))
406 .unwrap_or(Datum::Null);
407 let pgtype = mz_pgrepr::Type::from(&column_type.scalar_type);
408 let (type_name, type_oid) = match &column_type.scalar_type {
409 SqlScalarType::List {
410 custom_id: Some(custom_id),
411 ..
412 }
413 | SqlScalarType::Map {
414 custom_id: Some(custom_id),
415 ..
416 }
417 | SqlScalarType::Record {
418 custom_id: Some(custom_id),
419 ..
420 } => {
421 let entry = self.get_entry(custom_id);
422 let name = &*entry.name().item;
437 let oid = entry.oid();
438 (name, oid)
439 }
440 _ => (pgtype.name(), pgtype.oid()),
441 };
442 updates.push(BuiltinTableUpdate::row(
443 &*MZ_COLUMNS,
444 Row::pack_slice(&[
445 Datum::String(&id.to_string()),
446 Datum::String(column_name),
447 Datum::UInt64(u64::cast_from(i + 1)),
448 Datum::from(column_type.nullable),
449 Datum::String(type_name),
450 default,
451 Datum::UInt32(type_oid),
452 Datum::Int32(pgtype.typmod()),
453 ]),
454 diff,
455 ));
456 }
457 }
458
459 if let Some(cw) = entry.item().initial_logical_compaction_window() {
461 updates.push(self.pack_history_retention_strategy_update(id, cw, diff));
462 }
463
464 updates.extend(Self::pack_item_global_id_update(entry, diff));
465
466 updates
467 }
468
469 fn pack_item_global_id_update(
470 entry: &CatalogEntry,
471 diff: Diff,
472 ) -> impl Iterator<Item = BuiltinTableUpdate<&'static BuiltinTable>> + use<'_> {
473 let id = entry.id().to_string();
474 let global_ids = entry.global_ids();
475 global_ids.map(move |global_id| {
476 BuiltinTableUpdate::row(
477 &*MZ_OBJECT_GLOBAL_IDS,
478 Row::pack_slice(&[Datum::String(&id), Datum::String(&global_id.to_string())]),
479 diff,
480 )
481 })
482 }
483
484 fn pack_history_retention_strategy_update(
485 &self,
486 id: CatalogItemId,
487 cw: CompactionWindow,
488 diff: Diff,
489 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
490 let cw: u64 = cw.comparable_timestamp().into();
491 let cw = Jsonb::from_serde_json(serde_json::Value::Number(serde_json::Number::from(cw)))
492 .expect("must serialize");
493 BuiltinTableUpdate::row(
494 &*MZ_HISTORY_RETENTION_STRATEGIES,
495 Row::pack_slice(&[
496 Datum::String(&id.to_string()),
497 Datum::String("FOR"),
499 cw.into_row().into_element(),
500 ]),
501 diff,
502 )
503 }
504
505 fn pack_table_update(
506 &self,
507 id: CatalogItemId,
508 oid: u32,
509 schema_id: &SchemaSpecifier,
510 name: &str,
511 owner_id: &RoleId,
512 privileges: Datum,
513 diff: Diff,
514 table: &Table,
515 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
516 let redacted = table.create_sql.as_ref().map(|create_sql| {
517 mz_sql::parse::parse(create_sql)
518 .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
519 .into_element()
520 .ast
521 .to_ast_string_redacted()
522 });
523 let source_id = if let TableDataSource::DataSource {
524 desc: DataSourceDesc::IngestionExport { ingestion_id, .. },
525 ..
526 } = &table.data_source
527 {
528 Some(ingestion_id.to_string())
529 } else {
530 None
531 };
532
533 vec![BuiltinTableUpdate::row(
534 &*MZ_TABLES,
535 Row::pack_slice(&[
536 Datum::String(&id.to_string()),
537 Datum::UInt32(oid),
538 Datum::String(&schema_id.to_string()),
539 Datum::String(name),
540 Datum::String(&owner_id.to_string()),
541 privileges,
542 if let Some(create_sql) = &table.create_sql {
543 Datum::String(create_sql)
544 } else {
545 Datum::Null
546 },
547 if let Some(redacted) = &redacted {
548 Datum::String(redacted)
549 } else {
550 Datum::Null
551 },
552 if let Some(source_id) = source_id.as_ref() {
553 Datum::String(source_id)
554 } else {
555 Datum::Null
556 },
557 ]),
558 diff,
559 )]
560 }
561
562 fn pack_postgres_source_update(
563 &self,
564 id: CatalogItemId,
565 postgres: &PostgresSourceConnection<ReferencedConnection>,
566 diff: Diff,
567 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
568 vec![BuiltinTableUpdate::row(
569 &*MZ_POSTGRES_SOURCES,
570 Row::pack_slice(&[
571 Datum::String(&id.to_string()),
572 Datum::String(&postgres.publication_details.slot),
573 Datum::from(postgres.publication_details.timeline_id),
574 ]),
575 diff,
576 )]
577 }
578
579 fn pack_kafka_source_update(
580 &self,
581 item_id: CatalogItemId,
582 collection_id: GlobalId,
583 kafka: &KafkaSourceConnection<ReferencedConnection>,
584 diff: Diff,
585 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
586 vec![BuiltinTableUpdate::row(
587 &*MZ_KAFKA_SOURCES,
588 Row::pack_slice(&[
589 Datum::String(&item_id.to_string()),
590 Datum::String(&kafka.group_id(&self.config.connection_context, collection_id)),
591 Datum::String(&kafka.topic),
592 ]),
593 diff,
594 )]
595 }
596
597 fn pack_postgres_source_tables_update(
598 &self,
599 id: CatalogItemId,
600 schema_name: &str,
601 table_name: &str,
602 diff: Diff,
603 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
604 vec![BuiltinTableUpdate::row(
605 &*MZ_POSTGRES_SOURCE_TABLES,
606 Row::pack_slice(&[
607 Datum::String(&id.to_string()),
608 Datum::String(schema_name),
609 Datum::String(table_name),
610 ]),
611 diff,
612 )]
613 }
614
615 fn pack_mysql_source_tables_update(
616 &self,
617 id: CatalogItemId,
618 schema_name: &str,
619 table_name: &str,
620 diff: Diff,
621 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
622 vec![BuiltinTableUpdate::row(
623 &*MZ_MYSQL_SOURCE_TABLES,
624 Row::pack_slice(&[
625 Datum::String(&id.to_string()),
626 Datum::String(schema_name),
627 Datum::String(table_name),
628 ]),
629 diff,
630 )]
631 }
632
633 fn pack_sql_server_source_table_update(
634 &self,
635 id: CatalogItemId,
636 schema_name: &str,
637 table_name: &str,
638 diff: Diff,
639 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
640 vec![BuiltinTableUpdate::row(
641 &*MZ_SQL_SERVER_SOURCE_TABLES,
642 Row::pack_slice(&[
643 Datum::String(&id.to_string()),
644 Datum::String(schema_name),
645 Datum::String(table_name),
646 ]),
647 diff,
648 )]
649 }
650
651 fn pack_kafka_source_tables_update(
652 &self,
653 id: CatalogItemId,
654 topic: &str,
655 envelope: Option<&str>,
656 key_format: Option<&str>,
657 value_format: Option<&str>,
658 diff: Diff,
659 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
660 vec![BuiltinTableUpdate::row(
661 &*MZ_KAFKA_SOURCE_TABLES,
662 Row::pack_slice(&[
663 Datum::String(&id.to_string()),
664 Datum::String(topic),
665 Datum::from(envelope),
666 Datum::from(key_format),
667 Datum::from(value_format),
668 ]),
669 diff,
670 )]
671 }
672
673 fn pack_connection_update(
674 &self,
675 id: CatalogItemId,
676 connection: &Connection,
677 diff: Diff,
678 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
679 let mut updates = vec![];
680 match connection.details {
681 ConnectionDetails::Kafka(ref kafka) => {
682 updates.extend(self.pack_kafka_connection_update(id, kafka, diff));
683 }
684 ConnectionDetails::Aws(ref aws_config) => {
685 match self.pack_aws_connection_update(id, aws_config, diff) {
686 Ok(update) => {
687 updates.push(update);
688 }
689 Err(e) => {
690 tracing::error!(%id, %e, "failed writing row to mz_aws_connections table");
691 }
692 }
693 }
694 ConnectionDetails::AwsPrivatelink(_) => {
695 if let Some(aws_principal_context) = self.aws_principal_context.as_ref() {
696 updates.push(self.pack_aws_privatelink_connection_update(
697 id,
698 aws_principal_context,
699 diff,
700 ));
701 } else {
702 tracing::error!(%id, "missing AWS principal context; cannot write row to mz_aws_privatelink_connections table");
703 }
704 }
705 ConnectionDetails::Ssh {
706 ref key_1,
707 ref key_2,
708 ..
709 } => {
710 updates.push(self.pack_ssh_tunnel_connection_update(id, key_1, key_2, diff));
711 }
712 ConnectionDetails::Csr(_)
713 | ConnectionDetails::GlueSchemaRegistry(_)
714 | ConnectionDetails::Gcp(_)
715 | ConnectionDetails::Postgres(_)
716 | ConnectionDetails::MySql(_)
717 | ConnectionDetails::SqlServer(_)
718 | ConnectionDetails::IcebergCatalog(_) => (),
719 };
720 updates
721 }
722
723 pub(crate) fn pack_ssh_tunnel_connection_update(
724 &self,
725 id: CatalogItemId,
726 key_1: &SshKey,
727 key_2: &SshKey,
728 diff: Diff,
729 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
730 BuiltinTableUpdate::row(
731 &*MZ_SSH_TUNNEL_CONNECTIONS,
732 Row::pack_slice(&[
733 Datum::String(&id.to_string()),
734 Datum::String(key_1.public_key().as_str()),
735 Datum::String(key_2.public_key().as_str()),
736 ]),
737 diff,
738 )
739 }
740
741 fn pack_kafka_connection_update(
742 &self,
743 id: CatalogItemId,
744 kafka: &KafkaConnection<ReferencedConnection>,
745 diff: Diff,
746 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
747 let progress_topic = kafka.progress_topic(&self.config.connection_context, id);
748 let mut row = Row::default();
749 row.packer()
750 .try_push_array(
751 &[ArrayDimension {
752 lower_bound: 1,
753 length: kafka.brokers.len(),
754 }],
755 kafka
756 .brokers
757 .iter()
758 .map(|broker| Datum::String(&broker.address)),
759 )
760 .expect("kafka.brokers is 1 dimensional, and its length is used for the array length");
761 let brokers = row.unpack_first();
762 vec![BuiltinTableUpdate::row(
763 &*MZ_KAFKA_CONNECTIONS,
764 Row::pack_slice(&[
765 Datum::String(&id.to_string()),
766 brokers,
767 Datum::String(&progress_topic),
768 ]),
769 diff,
770 )]
771 }
772
773 pub fn pack_aws_privatelink_connection_update(
774 &self,
775 connection_id: CatalogItemId,
776 aws_principal_context: &AwsPrincipalContext,
777 diff: Diff,
778 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
779 let id = &MZ_AWS_PRIVATELINK_CONNECTIONS;
780 let row = Row::pack_slice(&[
781 Datum::String(&connection_id.to_string()),
782 Datum::String(&aws_principal_context.to_principal_string(connection_id)),
783 ]);
784 BuiltinTableUpdate::row(id, row, diff)
785 }
786
787 pub fn pack_aws_connection_update(
788 &self,
789 connection_id: CatalogItemId,
790 aws_config: &AwsConnection,
791 diff: Diff,
792 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, anyhow::Error> {
793 let id = &MZ_AWS_CONNECTIONS;
794
795 let mut access_key_id = None;
796 let mut access_key_id_secret_id = None;
797 let mut secret_access_key_secret_id = None;
798 let mut session_token = None;
799 let mut session_token_secret_id = None;
800 let mut assume_role_arn = None;
801 let mut assume_role_session_name = None;
802 let mut principal = None;
803 let mut external_id = None;
804 let mut example_trust_policy = None;
805 match &aws_config.auth {
806 AwsAuth::Credentials(credentials) => {
807 match &credentials.access_key_id {
808 StringOrSecret::String(s) => access_key_id = Some(s.as_str()),
809 StringOrSecret::Secret(s) => access_key_id_secret_id = Some(s.to_string()),
810 }
811 secret_access_key_secret_id = Some(credentials.secret_access_key.to_string());
812 match credentials.session_token.as_ref() {
813 None => (),
814 Some(StringOrSecret::String(s)) => session_token = Some(s.as_str()),
815 Some(StringOrSecret::Secret(s)) => {
816 session_token_secret_id = Some(s.to_string())
817 }
818 }
819 }
820 AwsAuth::AssumeRole(assume_role) => {
821 assume_role_arn = Some(assume_role.arn.as_str());
822 assume_role_session_name = assume_role.session_name.as_deref();
823 principal = self
824 .config
825 .connection_context
826 .aws_connection_role_arn
827 .as_deref();
828 external_id =
829 Some(assume_role.external_id(&self.config.connection_context, connection_id)?);
830 example_trust_policy = {
831 let policy = assume_role
832 .example_trust_policy(&self.config.connection_context, connection_id)?;
833 let policy = Jsonb::from_serde_json(policy).expect("valid json");
834 Some(policy.into_row())
835 };
836 }
837 }
838
839 let row = Row::pack_slice(&[
840 Datum::String(&connection_id.to_string()),
841 Datum::from(aws_config.endpoint.as_deref()),
842 Datum::from(aws_config.region.as_deref()),
843 Datum::from(access_key_id),
844 Datum::from(access_key_id_secret_id.as_deref()),
845 Datum::from(secret_access_key_secret_id.as_deref()),
846 Datum::from(session_token),
847 Datum::from(session_token_secret_id.as_deref()),
848 Datum::from(assume_role_arn),
849 Datum::from(assume_role_session_name),
850 Datum::from(principal),
851 Datum::from(external_id.as_deref()),
852 Datum::from(example_trust_policy.as_ref().map(|p| p.into_element())),
853 ]);
854
855 Ok(BuiltinTableUpdate::row(id, row, diff))
856 }
857
858 fn pack_view_update(
859 &self,
860 id: CatalogItemId,
861 oid: u32,
862 schema_id: &SchemaSpecifier,
863 name: &str,
864 owner_id: &RoleId,
865 privileges: Datum,
866 view: &View,
867 diff: Diff,
868 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
869 let create_stmt = mz_sql::parse::parse(&view.create_sql)
870 .unwrap_or_else(|e| {
871 panic!(
872 "create_sql cannot be invalid: `{}` --- error: `{}`",
873 view.create_sql, e
874 )
875 })
876 .into_element()
877 .ast;
878 let query = match &create_stmt {
879 Statement::CreateView(stmt) => &stmt.definition.query,
880 _ => unreachable!(),
881 };
882
883 let mut query_string = query.to_ast_string_stable();
884 query_string.push(';');
887
888 vec![BuiltinTableUpdate::row(
889 &*MZ_VIEWS,
890 Row::pack_slice(&[
891 Datum::String(&id.to_string()),
892 Datum::UInt32(oid),
893 Datum::String(&schema_id.to_string()),
894 Datum::String(name),
895 Datum::String(&query_string),
896 Datum::String(&owner_id.to_string()),
897 privileges,
898 Datum::String(&view.create_sql),
899 Datum::String(&create_stmt.to_ast_string_redacted()),
900 ]),
901 diff,
902 )]
903 }
904
905 fn pack_materialized_view_update(
906 &self,
907 id: CatalogItemId,
908 mview: &MaterializedView,
909 diff: Diff,
910 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
911 let mut updates = Vec::new();
912
913 if let Some(refresh_schedule) = &mview.refresh_schedule {
914 assert!(!refresh_schedule.is_empty());
917 for RefreshEvery {
918 interval,
919 aligned_to,
920 } in refresh_schedule.everies.iter()
921 {
922 let aligned_to_dt = mz_ore::now::to_datetime(
923 <&Timestamp as TryInto<u64>>::try_into(aligned_to).expect("undoes planning"),
924 );
925 updates.push(BuiltinTableUpdate::row(
926 &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
927 Row::pack_slice(&[
928 Datum::String(&id.to_string()),
929 Datum::String("every"),
930 Datum::Interval(
931 Interval::from_duration(interval).expect(
932 "planning ensured that this is convertible back to Interval",
933 ),
934 ),
935 Datum::TimestampTz(aligned_to_dt.try_into().expect("undoes planning")),
936 Datum::Null,
937 ]),
938 diff,
939 ));
940 }
941 for at in refresh_schedule.ats.iter() {
942 let at_dt = mz_ore::now::to_datetime(
943 <&Timestamp as TryInto<u64>>::try_into(at).expect("undoes planning"),
944 );
945 updates.push(BuiltinTableUpdate::row(
946 &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
947 Row::pack_slice(&[
948 Datum::String(&id.to_string()),
949 Datum::String("at"),
950 Datum::Null,
951 Datum::Null,
952 Datum::TimestampTz(at_dt.try_into().expect("undoes planning")),
953 ]),
954 diff,
955 ));
956 }
957 } else {
958 updates.push(BuiltinTableUpdate::row(
959 &*MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES,
960 Row::pack_slice(&[
961 Datum::String(&id.to_string()),
962 Datum::String("on-commit"),
963 Datum::Null,
964 Datum::Null,
965 Datum::Null,
966 ]),
967 diff,
968 ));
969 }
970
971 if let Some(target_id) = mview.replacement_target {
972 updates.push(BuiltinTableUpdate::row(
973 &*MZ_REPLACEMENTS,
974 Row::pack_slice(&[
975 Datum::String(&id.to_string()),
976 Datum::String(&target_id.to_string()),
977 ]),
978 diff,
979 ));
980 }
981
982 updates
983 }
984
985 fn pack_sink_update(
986 &self,
987 id: CatalogItemId,
988 oid: u32,
989 schema_id: &SchemaSpecifier,
990 name: &str,
991 owner_id: &RoleId,
992 sink: &Sink,
993 diff: Diff,
994 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
995 let mut updates = vec![];
996 match &sink.connection {
997 StorageSinkConnection::Kafka(KafkaSinkConnection {
998 topic: topic_name, ..
999 }) => {
1000 updates.push(BuiltinTableUpdate::row(
1001 &*MZ_KAFKA_SINKS,
1002 Row::pack_slice(&[
1003 Datum::String(&id.to_string()),
1004 Datum::String(topic_name.as_str()),
1005 ]),
1006 diff,
1007 ));
1008 }
1009 StorageSinkConnection::Iceberg(IcebergSinkConnection {
1010 namespace, table, ..
1011 }) => {
1012 updates.push(BuiltinTableUpdate::row(
1013 &*MZ_ICEBERG_SINKS,
1014 Row::pack_slice(&[
1015 Datum::String(&id.to_string()),
1016 Datum::String(namespace.as_str()),
1017 Datum::String(table.as_str()),
1018 ]),
1019 diff,
1020 ));
1021 }
1022 };
1023
1024 let create_stmt = mz_sql::parse::parse(&sink.create_sql)
1025 .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", sink.create_sql))
1026 .into_element()
1027 .ast;
1028
1029 let envelope = sink.envelope();
1030
1031 let combined_format = sink.combined_format();
1033 let (key_format, value_format) = match sink.formats() {
1034 Some((key_format, value_format)) => (key_format, Some(value_format)),
1035 None => (None, None),
1036 };
1037
1038 updates.push(BuiltinTableUpdate::row(
1039 &*MZ_SINKS,
1040 Row::pack_slice(&[
1041 Datum::String(&id.to_string()),
1042 Datum::UInt32(oid),
1043 Datum::String(&schema_id.to_string()),
1044 Datum::String(name),
1045 Datum::String(sink.connection.name()),
1046 Datum::from(sink.connection_id().map(|id| id.to_string()).as_deref()),
1047 Datum::Null,
1049 Datum::from(envelope),
1050 Datum::from(combined_format.as_ref().map(|f| f.as_ref())),
1053 Datum::from(key_format),
1054 Datum::from(value_format),
1055 Datum::String(&sink.cluster_id.to_string()),
1056 Datum::String(&owner_id.to_string()),
1057 Datum::String(&sink.create_sql),
1058 Datum::String(&create_stmt.to_ast_string_redacted()),
1059 ]),
1060 diff,
1061 ));
1062
1063 updates
1064 }
1065
1066 fn pack_index_update(
1067 &self,
1068 id: CatalogItemId,
1069 index: &Index,
1070 diff: Diff,
1071 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1072 let mut updates = vec![];
1073
1074 let create_stmt = mz_sql::parse::parse(&index.create_sql)
1075 .unwrap_or_else(|e| {
1076 panic!(
1077 "create_sql cannot be invalid: `{}` --- error: `{}`",
1078 index.create_sql, e
1079 )
1080 })
1081 .into_element()
1082 .ast;
1083
1084 let key_sqls = match &create_stmt {
1085 Statement::CreateIndex(CreateIndexStatement { key_parts, .. }) => key_parts
1086 .as_ref()
1087 .expect("key_parts is filled in during planning"),
1088 _ => unreachable!(),
1089 };
1090
1091 let on_entry = self.get_entry_by_global_id(&index.on);
1092 let on_desc = on_entry
1093 .relation_desc()
1094 .expect("can only create indexes on items with a valid description");
1095 let repr_col_types: Vec<ReprColumnType> = on_desc
1096 .typ()
1097 .column_types
1098 .iter()
1099 .map(ReprColumnType::from)
1100 .collect();
1101 for (i, key) in index.keys.iter().enumerate() {
1102 let nullable = key.typ(&repr_col_types).nullable;
1103 let seq_in_index = u64::cast_from(i + 1);
1104 let key_sql = key_sqls
1105 .get(i)
1106 .expect("missing sql information for index key")
1107 .to_ast_string_simple();
1108 let (field_number, expression) = match key {
1109 MirScalarExpr::Column(col, _) => {
1110 (Datum::UInt64(u64::cast_from(*col + 1)), Datum::Null)
1111 }
1112 _ => (Datum::Null, Datum::String(&key_sql)),
1113 };
1114 updates.push(BuiltinTableUpdate::row(
1115 &*MZ_INDEX_COLUMNS,
1116 Row::pack_slice(&[
1117 Datum::String(&id.to_string()),
1118 Datum::UInt64(seq_in_index),
1119 field_number,
1120 expression,
1121 Datum::from(nullable),
1122 ]),
1123 diff,
1124 ));
1125 }
1126
1127 updates
1128 }
1129
1130 fn pack_type_update(
1131 &self,
1132 id: CatalogItemId,
1133 oid: u32,
1134 schema_id: &SchemaSpecifier,
1135 name: &str,
1136 owner_id: &RoleId,
1137 privileges: Datum,
1138 typ: &Type,
1139 diff: Diff,
1140 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1141 let mut out = vec![];
1142
1143 let redacted = typ.create_sql.as_ref().map(|create_sql| {
1144 mz_sql::parse::parse(create_sql)
1145 .unwrap_or_else(|_| panic!("create_sql cannot be invalid: {}", create_sql))
1146 .into_element()
1147 .ast
1148 .to_ast_string_redacted()
1149 });
1150
1151 out.push(BuiltinTableUpdate::row(
1152 &*MZ_TYPES,
1153 Row::pack_slice(&[
1154 Datum::String(&id.to_string()),
1155 Datum::UInt32(oid),
1156 Datum::String(&schema_id.to_string()),
1157 Datum::String(name),
1158 Datum::String(&TypeCategory::from_catalog_type(&typ.details.typ).to_string()),
1159 Datum::String(&owner_id.to_string()),
1160 privileges,
1161 if let Some(create_sql) = &typ.create_sql {
1162 Datum::String(create_sql)
1163 } else {
1164 Datum::Null
1165 },
1166 if let Some(redacted) = &redacted {
1167 Datum::String(redacted)
1168 } else {
1169 Datum::Null
1170 },
1171 ]),
1172 diff,
1173 ));
1174
1175 let mut row = Row::default();
1176 let mut packer = row.packer();
1177
1178 fn append_modifier(packer: &mut RowPacker<'_>, mods: &[i64]) {
1179 if mods.is_empty() {
1180 packer.push(Datum::Null);
1181 } else {
1182 packer.push_list(mods.iter().map(|m| Datum::Int64(*m)));
1183 }
1184 }
1185
1186 let index_id = match &typ.details.typ {
1187 CatalogType::Array {
1188 element_reference: element_id,
1189 } => {
1190 packer.push(Datum::String(&id.to_string()));
1191 packer.push(Datum::String(&element_id.to_string()));
1192 &MZ_ARRAY_TYPES
1193 }
1194 CatalogType::List {
1195 element_reference: element_id,
1196 element_modifiers,
1197 } => {
1198 packer.push(Datum::String(&id.to_string()));
1199 packer.push(Datum::String(&element_id.to_string()));
1200 append_modifier(&mut packer, element_modifiers);
1201 &MZ_LIST_TYPES
1202 }
1203 CatalogType::Map {
1204 key_reference: key_id,
1205 value_reference: value_id,
1206 key_modifiers,
1207 value_modifiers,
1208 } => {
1209 packer.push(Datum::String(&id.to_string()));
1210 packer.push(Datum::String(&key_id.to_string()));
1211 packer.push(Datum::String(&value_id.to_string()));
1212 append_modifier(&mut packer, key_modifiers);
1213 append_modifier(&mut packer, value_modifiers);
1214 &MZ_MAP_TYPES
1215 }
1216 CatalogType::Pseudo => {
1217 packer.push(Datum::String(&id.to_string()));
1218 &MZ_PSEUDO_TYPES
1219 }
1220 _ => {
1221 packer.push(Datum::String(&id.to_string()));
1222 &MZ_BASE_TYPES
1223 }
1224 };
1225 out.push(BuiltinTableUpdate::row(index_id, row, diff));
1226
1227 if let Some(pg_metadata) = &typ.details.pg_metadata {
1228 out.push(BuiltinTableUpdate::row(
1229 &*MZ_TYPE_PG_METADATA,
1230 Row::pack_slice(&[
1231 Datum::String(&id.to_string()),
1232 Datum::UInt32(pg_metadata.typinput_oid),
1233 Datum::UInt32(pg_metadata.typreceive_oid),
1234 ]),
1235 diff,
1236 ));
1237 }
1238
1239 out
1240 }
1241
1242 fn pack_func_update(
1243 &self,
1244 id: CatalogItemId,
1245 schema_id: &SchemaSpecifier,
1246 name: &str,
1247 owner_id: &RoleId,
1248 func: &Func,
1249 diff: Diff,
1250 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1251 let mut updates = vec![];
1252 for func_impl_details in func.inner.func_impls() {
1253 let arg_type_ids = func_impl_details
1254 .arg_typs
1255 .iter()
1256 .map(|typ| self.get_system_type(typ).id().to_string())
1257 .collect::<Vec<_>>();
1258
1259 let mut row = Row::default();
1260 row.packer()
1261 .try_push_array(
1262 &[ArrayDimension {
1263 lower_bound: 1,
1264 length: arg_type_ids.len(),
1265 }],
1266 arg_type_ids.iter().map(|id| Datum::String(id)),
1267 )
1268 .expect(
1269 "arg_type_ids is 1 dimensional, and its length is used for the array length",
1270 );
1271 let arg_type_ids = row.unpack_first();
1272
1273 updates.push(BuiltinTableUpdate::row(
1274 &*MZ_FUNCTIONS,
1275 Row::pack_slice(&[
1276 Datum::String(&id.to_string()),
1277 Datum::UInt32(func_impl_details.oid),
1278 Datum::String(&schema_id.to_string()),
1279 Datum::String(name),
1280 arg_type_ids,
1281 Datum::from(
1282 func_impl_details
1283 .variadic_typ
1284 .map(|typ| self.get_system_type(typ).id().to_string())
1285 .as_deref(),
1286 ),
1287 Datum::from(
1288 func_impl_details
1289 .return_typ
1290 .map(|typ| self.get_system_type(typ).id().to_string())
1291 .as_deref(),
1292 ),
1293 func_impl_details.return_is_set.into(),
1294 Datum::String(&owner_id.to_string()),
1295 ]),
1296 diff,
1297 ));
1298
1299 if let mz_sql::func::Func::Aggregate(_) = func.inner {
1300 updates.push(BuiltinTableUpdate::row(
1301 &*MZ_AGGREGATES,
1302 Row::pack_slice(&[
1303 Datum::UInt32(func_impl_details.oid),
1304 Datum::String("n"),
1306 Datum::Int16(0),
1307 ]),
1308 diff,
1309 ));
1310 }
1311 }
1312 updates
1313 }
1314
1315 pub fn pack_op_update(
1316 &self,
1317 operator: &str,
1318 func_impl_details: FuncImplCatalogDetails,
1319 diff: Diff,
1320 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1321 let arg_type_ids = func_impl_details
1322 .arg_typs
1323 .iter()
1324 .map(|typ| self.get_system_type(typ).id().to_string())
1325 .collect::<Vec<_>>();
1326
1327 let mut row = Row::default();
1328 row.packer()
1329 .try_push_array(
1330 &[ArrayDimension {
1331 lower_bound: 1,
1332 length: arg_type_ids.len(),
1333 }],
1334 arg_type_ids.iter().map(|id| Datum::String(id)),
1335 )
1336 .expect("arg_type_ids is 1 dimensional, and its length is used for the array length");
1337 let arg_type_ids = row.unpack_first();
1338
1339 BuiltinTableUpdate::row(
1340 &*MZ_OPERATORS,
1341 Row::pack_slice(&[
1342 Datum::UInt32(func_impl_details.oid),
1343 Datum::String(operator),
1344 arg_type_ids,
1345 Datum::from(
1346 func_impl_details
1347 .return_typ
1348 .map(|typ| self.get_system_type(typ).id().to_string())
1349 .as_deref(),
1350 ),
1351 ]),
1352 diff,
1353 )
1354 }
1355
1356 pub fn pack_audit_log_update(
1357 &self,
1358 event: &VersionedEvent,
1359 diff: Diff,
1360 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1361 let (event_type, object_type, details, user, occurred_at): (
1362 &EventType,
1363 &ObjectType,
1364 &EventDetails,
1365 &Option<String>,
1366 u64,
1367 ) = match event {
1368 VersionedEvent::V1(ev) => (
1369 &ev.event_type,
1370 &ev.object_type,
1371 &ev.details,
1372 &ev.user,
1373 ev.occurred_at,
1374 ),
1375 };
1376 let details = Jsonb::from_serde_json(details.as_json())
1377 .map_err(|e| {
1378 Error::new(ErrorKind::Unstructured(format!(
1379 "could not pack audit log update: {}",
1380 e
1381 )))
1382 })?
1383 .into_row();
1384 let details = details
1385 .iter()
1386 .next()
1387 .expect("details created above with a single jsonb column");
1388 let dt = mz_ore::now::to_datetime(occurred_at);
1389 let id = event.sortable_id();
1390 Ok(BuiltinTableUpdate::row(
1391 &*MZ_AUDIT_EVENTS,
1392 Row::pack_slice(&[
1393 Datum::UInt64(id),
1394 Datum::String(&format!("{}", event_type)),
1395 Datum::String(&format!("{}", object_type)),
1396 details,
1397 match user {
1398 Some(user) => Datum::String(user),
1399 None => Datum::Null,
1400 },
1401 Datum::TimestampTz(dt.try_into().expect("must fit")),
1402 ]),
1403 diff,
1404 ))
1405 }
1406
1407 pub fn pack_storage_usage_update(
1408 &self,
1409 VersionedStorageUsage::V1(event): VersionedStorageUsage,
1410 diff: Diff,
1411 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1412 let id = &MZ_STORAGE_USAGE_BY_SHARD;
1413 let row = Row::pack_slice(&[
1414 Datum::UInt64(event.id),
1415 Datum::from(event.shard_id.as_deref()),
1416 Datum::UInt64(event.size_bytes),
1417 Datum::TimestampTz(
1418 mz_ore::now::to_datetime(event.collection_timestamp)
1419 .try_into()
1420 .expect("must fit"),
1421 ),
1422 ]);
1423 BuiltinTableUpdate::row(id, row, diff)
1424 }
1425
1426 pub fn pack_egress_ip_update(
1427 &self,
1428 ip: &IpNet,
1429 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1430 let id = &MZ_EGRESS_IPS;
1431 let addr = ip.network();
1432 let row = Row::pack_slice(&[
1433 Datum::String(&addr.to_string()),
1434 Datum::Int32(ip.prefix_len().into()),
1435 Datum::String(&format!("{}/{}", addr, ip.prefix_len())),
1436 ]);
1437 Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
1438 }
1439
1440 pub fn pack_license_key_update(
1441 &self,
1442 license_key: &ValidatedLicenseKey,
1443 ) -> Result<BuiltinTableUpdate<&'static BuiltinTable>, Error> {
1444 let id = &MZ_LICENSE_KEYS;
1445 let row = Row::pack_slice(&[
1446 Datum::String(&license_key.id),
1447 Datum::String(&license_key.organization),
1448 Datum::String(&license_key.environment_id),
1449 Datum::TimestampTz(
1450 mz_ore::now::to_datetime(license_key.expiration * 1000)
1451 .try_into()
1452 .expect("must fit"),
1453 ),
1454 Datum::TimestampTz(
1455 mz_ore::now::to_datetime(license_key.not_before * 1000)
1456 .try_into()
1457 .expect("must fit"),
1458 ),
1459 ]);
1460 Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
1461 }
1462
1463 pub fn pack_all_replica_size_updates(&self) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1464 let mut updates = Vec::new();
1465 for (size, alloc) in &self.cluster_replica_sizes.0 {
1466 let DiskLimit(ByteSize(disk_bytes)) =
1469 (alloc.disk_limit).unwrap_or(DiskLimit::ARBITRARY);
1470
1471 let internal_row = Row::pack_slice(&[
1480 size.as_str().into(),
1481 Datum::from(alloc.swap_enabled),
1482 disk_bytes.into(),
1483 ]);
1484 updates.push(BuiltinTableUpdate::row(
1485 &*MZ_CLUSTER_REPLICA_SIZE_INTERNAL,
1486 internal_row,
1487 Diff::ONE,
1488 ));
1489
1490 if alloc.disabled {
1491 continue;
1492 }
1493
1494 let cpu_limit = alloc.cpu_limit.unwrap_or(CpuLimit::MAX);
1495 let MemoryLimit(ByteSize(memory_bytes)) =
1496 (alloc.memory_limit).unwrap_or(MemoryLimit::MAX);
1497
1498 let row = Row::pack_slice(&[
1499 size.as_str().into(),
1500 u64::cast_from(alloc.scale).into(),
1501 u64::cast_from(alloc.workers).into(),
1502 cpu_limit.as_nanocpus().into(),
1503 memory_bytes.into(),
1504 disk_bytes.into(),
1505 (alloc.credits_per_hour).into(),
1506 ]);
1507
1508 updates.push(BuiltinTableUpdate::row(
1509 &*MZ_CLUSTER_REPLICA_SIZES,
1510 row,
1511 Diff::ONE,
1512 ));
1513 }
1514
1515 updates
1516 }
1517
1518 pub fn pack_subscribe_update(
1519 &self,
1520 id: GlobalId,
1521 subscribe: &ActiveSubscribe,
1522 diff: Diff,
1523 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1524 let mut row = Row::default();
1525 let mut packer = row.packer();
1526 packer.push(Datum::String(&id.to_string()));
1527 packer.push(Datum::Uuid(subscribe.session_uuid));
1528 packer.push(Datum::String(&subscribe.cluster_id.to_string()));
1529
1530 let start_dt = mz_ore::now::to_datetime(subscribe.start_time);
1531 packer.push(Datum::TimestampTz(start_dt.try_into().expect("must fit")));
1532
1533 let depends_on: Vec<_> = subscribe
1534 .depends_on
1535 .iter()
1536 .map(|id| id.to_string())
1537 .collect();
1538 packer.push_list(depends_on.iter().map(|s| Datum::String(s)));
1539
1540 BuiltinTableUpdate::row(&*MZ_SUBSCRIPTIONS, row, diff)
1541 }
1542
1543 pub fn pack_session_update(
1544 &self,
1545 conn: &ConnMeta,
1546 diff: Diff,
1547 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1548 let connect_dt = mz_ore::now::to_datetime(conn.connected_at());
1549 BuiltinTableUpdate::row(
1550 &*MZ_SESSIONS,
1551 Row::pack_slice(&[
1552 Datum::Uuid(conn.uuid()),
1553 Datum::UInt32(conn.conn_id().unhandled()),
1554 Datum::String(&conn.authenticated_role_id().to_string()),
1555 Datum::from(conn.client_ip().map(|ip| ip.to_string()).as_deref()),
1556 Datum::TimestampTz(connect_dt.try_into().expect("must fit")),
1557 ]),
1558 diff,
1559 )
1560 }
1561
1562 fn pack_privilege_array_row(&self, privileges: &PrivilegeMap) -> Row {
1563 let mut row = Row::default();
1564 let flat_privileges: Vec<_> = privileges.all_values_owned().collect();
1565 row.packer()
1566 .try_push_array(
1567 &[ArrayDimension {
1568 lower_bound: 1,
1569 length: flat_privileges.len(),
1570 }],
1571 flat_privileges
1572 .into_iter()
1573 .map(|mz_acl_item| Datum::MzAclItem(mz_acl_item.clone())),
1574 )
1575 .expect("privileges is 1 dimensional, and its length is used for the array length");
1576 row
1577 }
1578
1579 pub fn pack_comment_update(
1580 &self,
1581 object_id: CommentObjectId,
1582 column_pos: Option<usize>,
1583 comment: &str,
1584 diff: Diff,
1585 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1586 let object_type = mz_sql::catalog::ObjectType::from(object_id);
1588 let audit_type = super::object_type_to_audit_object_type(object_type);
1589 let object_type_str = audit_type.to_string();
1590
1591 let object_id_str = match object_id {
1592 CommentObjectId::Table(global_id)
1593 | CommentObjectId::View(global_id)
1594 | CommentObjectId::MaterializedView(global_id)
1595 | CommentObjectId::Source(global_id)
1596 | CommentObjectId::Sink(global_id)
1597 | CommentObjectId::Index(global_id)
1598 | CommentObjectId::Func(global_id)
1599 | CommentObjectId::Connection(global_id)
1600 | CommentObjectId::Secret(global_id)
1601 | CommentObjectId::Type(global_id) => global_id.to_string(),
1602 CommentObjectId::Role(role_id) => role_id.to_string(),
1603 CommentObjectId::Database(database_id) => database_id.to_string(),
1604 CommentObjectId::Schema((_, schema_id)) => schema_id.to_string(),
1605 CommentObjectId::Cluster(cluster_id) => cluster_id.to_string(),
1606 CommentObjectId::ClusterReplica((_, replica_id)) => replica_id.to_string(),
1607 CommentObjectId::NetworkPolicy(network_policy_id) => network_policy_id.to_string(),
1608 };
1609 let column_pos_datum = match column_pos {
1610 Some(pos) => {
1611 let pos =
1613 i32::try_from(pos).expect("we constrain this value in the planning layer");
1614 Datum::Int32(pos)
1615 }
1616 None => Datum::Null,
1617 };
1618
1619 BuiltinTableUpdate::row(
1620 &*MZ_COMMENTS,
1621 Row::pack_slice(&[
1622 Datum::String(&object_id_str),
1623 Datum::String(&object_type_str),
1624 column_pos_datum,
1625 Datum::String(comment),
1626 ]),
1627 diff,
1628 )
1629 }
1630
1631 pub fn pack_webhook_source_update(
1632 &self,
1633 item_id: CatalogItemId,
1634 diff: Diff,
1635 ) -> BuiltinTableUpdate<&'static BuiltinTable> {
1636 let url = self
1637 .try_get_webhook_url(&item_id)
1638 .expect("webhook source should exist");
1639 let url = url.to_string();
1640 let name = &self.get_entry(&item_id).name().item;
1641 let id_str = item_id.to_string();
1642
1643 BuiltinTableUpdate::row(
1644 &*MZ_WEBHOOKS_SOURCES,
1645 Row::pack_slice(&[
1646 Datum::String(&id_str),
1647 Datum::String(name),
1648 Datum::String(&url),
1649 ]),
1650 diff,
1651 )
1652 }
1653
1654 pub fn pack_source_references_update(
1655 &self,
1656 source_references: &SourceReferences,
1657 diff: Diff,
1658 ) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
1659 let source_id = source_references.source_id.to_string();
1660 let updated_at = &source_references.updated_at;
1661 source_references
1662 .references
1663 .iter()
1664 .map(|reference| {
1665 let mut row = Row::default();
1666 let mut packer = row.packer();
1667 packer.extend([
1668 Datum::String(&source_id),
1669 reference
1670 .namespace
1671 .as_ref()
1672 .map(|s| Datum::String(s))
1673 .unwrap_or(Datum::Null),
1674 Datum::String(&reference.name),
1675 Datum::TimestampTz(
1676 mz_ore::now::to_datetime(*updated_at)
1677 .try_into()
1678 .expect("must fit"),
1679 ),
1680 ]);
1681 if reference.columns.len() > 0 {
1682 packer
1683 .try_push_array(
1684 &[ArrayDimension {
1685 lower_bound: 1,
1686 length: reference.columns.len(),
1687 }],
1688 reference.columns.iter().map(|col| Datum::String(col)),
1689 )
1690 .expect(
1691 "columns is 1 dimensional, and its length is used for the array length",
1692 );
1693 } else {
1694 packer.push(Datum::Null);
1695 }
1696
1697 BuiltinTableUpdate::row(&*MZ_SOURCE_REFERENCES, row, diff)
1698 })
1699 .collect()
1700 }
1701}