1use std::collections::BTreeMap;
19
20use mz_pgrepr::oid;
21use mz_repr::namespaces::MZ_INTERNAL_SCHEMA;
22use mz_repr::{RelationDesc, SemanticType, SqlScalarType};
23use mz_sql::catalog::NameReference;
24
25use super::{Builtin, BuiltinView, Ontology, PUBLIC_SELECT};
26
27pub(super) fn generate_views(builtins: &[Builtin<NameReference>]) -> Vec<Builtin<NameReference>> {
28 let infos: Vec<_> = builtins
29 .iter()
30 .filter_map(|b| {
31 let (name, schema, desc, ontology) = match b {
32 Builtin::Table(t) => (t.name, t.schema, t.desc.clone(), t.ontology.as_ref()?),
33 Builtin::View(v) => (v.name, v.schema, v.desc.clone(), v.ontology.as_ref()?),
34 Builtin::MaterializedView(mv) => {
35 (mv.name, mv.schema, mv.desc.clone(), mv.ontology.as_ref()?)
36 }
37 Builtin::Source(s) => (s.name, s.schema, s.desc.clone(), s.ontology.as_ref()?),
38 Builtin::Log(log) => {
39 let ontology = log.ontology.as_ref()?;
40 (log.name, log.schema, log.variant.desc(), ontology)
41 }
42 _ => return None,
43 };
44 let entity_name = ontology.entity_name.to_string();
45 Some(Info {
46 table_name: name,
47 schema_name: schema,
48 entity_name,
49 desc,
50 ontology,
51 })
52 })
53 .collect();
54
55 vec![
56 Builtin::View(leak(entity_types_view(&infos))),
57 Builtin::View(leak(semantic_types_view())),
58 Builtin::View(leak(properties_view(&infos))),
59 Builtin::View(leak(link_types_view(&infos))),
60 ]
61}
62
63fn leak(v: BuiltinView) -> &'static BuiltinView {
68 Box::leak(Box::new(v))
69}
70
/// One builtin relation that carries ontology metadata, as collected by
/// `generate_views` and consumed by the individual view builders.
struct Info<'a> {
    /// Name of the builtin relation.
    table_name: &'static str,
    /// Name of the schema the builtin relation belongs to.
    schema_name: &'static str,
    /// Owned copy of `ontology.entity_name`.
    entity_name: String,
    /// Description of the relation; its keys are rendered by `pk_lit`.
    desc: RelationDesc,
    /// The ontology annotation attached to the builtin.
    ontology: &'a Ontology,
}
78
/// A literal cell of a generated SQL `VALUES` list; see `Lit::render` for the
/// exact SQL text each variant produces.
enum Lit {
    /// Text rendered as a single-quoted SQL string literal.
    Str(String),
    /// JSON text rendered as a single-quoted string cast with `::jsonb`.
    Json(String),
    /// The SQL `NULL` keyword.
    Null,
}
88
89impl Lit {
90 fn render(&self) -> String {
91 match self {
92 Lit::Str(s) => format!("'{}'", esc(s)),
93 Lit::Json(s) => format!("'{}'::jsonb", esc(s)),
94 Lit::Null => "NULL".to_string(),
95 }
96 }
97}
98
99fn sql_type_name(ty: &SqlScalarType) -> &'static str {
101 match ty {
102 SqlScalarType::String => "text",
103 SqlScalarType::Jsonb => "jsonb",
104 SqlScalarType::Oid => "oid",
105 SqlScalarType::UInt64 => "uint8",
106 SqlScalarType::Numeric { .. } => "numeric",
107 SqlScalarType::MzTimestamp => "mz_timestamp",
108 SqlScalarType::TimestampTz { .. } => "timestamp with time zone",
109 other => panic!("unsupported SqlScalarType in ontology view: {other:?}"),
110 }
111}
112
/// Escapes `s` for use inside a single-quoted SQL string literal by doubling
/// every single quote. All other characters are copied unchanged.
fn esc(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        if ch == '\'' {
            // Emit the quote once here and once below: `'` becomes `''`.
            escaped.push('\'');
        }
        escaped.push(ch);
    }
    escaped
}
119
120fn values_sql(rows: &[Vec<Lit>]) -> String {
124 rows.iter()
125 .map(|row| {
126 let lits: Vec<String> = row.iter().map(Lit::render).collect();
127 format!("({})", lits.join(","))
128 })
129 .collect::<Vec<_>>()
130 .join(",")
131}
132
133fn values_view(
137 name: &'static str,
138 oid: u32,
139 cols: &[(&'static str, SqlScalarType, bool)],
140 keys: &[Vec<usize>],
141 rows: Vec<Vec<Lit>>,
142) -> BuiltinView {
143 let col_names: Vec<&str> = cols.iter().map(|(n, _, _)| *n).collect();
144 let cast_exprs: Vec<String> = cols
145 .iter()
146 .map(|(n, ty, _)| format!("{n}::{}", sql_type_name(ty)))
147 .collect();
148
149 let vals: Vec<String> = rows
150 .iter()
151 .map(|row| {
152 let lits: Vec<String> = row.iter().map(Lit::render).collect();
153 format!("({})", lits.join(","))
154 })
155 .collect();
156
157 let sql = format!(
158 "SELECT {casts} FROM (VALUES {vals}) AS t({cols})",
159 casts = cast_exprs.join(","),
160 vals = vals.join(","),
161 cols = col_names.join(","),
162 );
163
164 let mut b = RelationDesc::builder();
165 for (n, ty, nullable) in cols {
166 b = b.with_column(*n, ty.clone().nullable(*nullable));
167 }
168 let mut desc = b.finish();
169 for key in keys {
170 desc = desc.with_key(key.clone());
171 }
172 BuiltinView {
173 name,
174 schema: MZ_INTERNAL_SCHEMA,
175 oid,
176 desc,
177 column_comments: BTreeMap::new(),
178 sql: Box::leak(sql.into_boxed_str()),
179 access: vec![PUBLIC_SELECT],
180 ontology: None,
181 }
182}
183
184fn pk_lit(desc: &RelationDesc) -> Lit {
189 let all_keys = &desc.typ().keys;
190 let Some((first, rest)) = all_keys.split_first() else {
191 return Lit::Null;
192 };
193 let fmt_key = |key: &Vec<usize>| -> String {
194 let cols: Vec<_> = key
195 .iter()
196 .map(|&i| serde_json::to_string(desc.get_name(i).as_str()).expect("valid utf-8"))
197 .collect();
198 format!("[{}]", cols.join(", "))
199 };
200 let primary = fmt_key(first);
201 let json = if rest.is_empty() {
202 format!("{{\"primary_key\": {primary}}}")
203 } else {
204 let alts: Vec<_> = rest.iter().map(fmt_key).collect();
205 format!(
206 "{{\"primary_key\": {primary}, \"alternate_keys\": [{}]}}",
207 alts.join(", ")
208 )
209 };
210 Lit::Json(json)
211}
212
213fn entity_types_view(infos: &[Info]) -> BuiltinView {
216 let rows = infos
217 .iter()
218 .map(|i| {
219 vec![
220 Lit::Str(i.entity_name.clone()),
221 Lit::Str(format!("{}.{}", i.schema_name, i.table_name)),
222 pk_lit(&i.desc),
223 Lit::Str(i.ontology.description.to_string()),
224 ]
225 })
226 .collect();
227 values_view(
228 "mz_ontology_entity_types",
229 oid::VIEW_MZ_ONTOLOGY_ENTITY_TYPES_OID,
230 &[
231 ("name", SqlScalarType::String, false),
232 ("relation", SqlScalarType::String, false),
233 ("properties", SqlScalarType::Jsonb, true),
234 ("description", SqlScalarType::String, false),
235 ],
236 &[vec![0], vec![1], vec![3]],
237 rows,
238 )
239}
240
241fn semantic_types_view() -> BuiltinView {
242 let rows = SEMANTIC_TYPE_DEFS
243 .iter()
244 .map(|(n, t, d)| {
245 vec![
246 Lit::Str(n.to_string()),
247 Lit::Str(t.to_string()),
248 Lit::Str(d.to_string()),
249 ]
250 })
251 .collect();
252 values_view(
253 "mz_ontology_semantic_types",
254 oid::VIEW_MZ_ONTOLOGY_SEMANTIC_TYPES_OID,
255 &[
256 ("name", SqlScalarType::String, false),
257 ("sql_type", SqlScalarType::String, false),
258 ("description", SqlScalarType::String, false),
259 ],
260 &[vec![0], vec![2]],
261 rows,
262 )
263}
264
265fn properties_view(infos: &[Info]) -> BuiltinView {
280 let mut ent: Vec<Vec<Lit>> = Vec::new();
281 let mut ann: Vec<Vec<Lit>> = Vec::new();
282 for i in infos {
283 ent.push(vec![
284 Lit::Str(i.schema_name.to_string()),
285 Lit::Str(i.table_name.to_string()),
286 Lit::Str(i.entity_name.clone()),
287 ]);
288 for (col_name, sem) in i.ontology.column_semantic_types {
289 ann.push(vec![
290 Lit::Str(i.entity_name.clone()),
291 Lit::Str(col_name.to_string()),
292 Lit::Str(sem.to_string()),
293 ]);
294 }
295 }
296 let sql = format!(
297 "SELECT ent.entity_name AS entity_type,col.name AS column_name,\
298 ann.semantic_type::text AS semantic_type,cmt.comment AS description \
299 FROM (VALUES {ent}) AS ent(schema_name,table_name,entity_name) \
300 JOIN mz_catalog.mz_schemas s ON s.name=ent.schema_name \
301 JOIN mz_catalog.mz_objects o ON o.schema_id=s.id AND o.name=ent.table_name \
302 JOIN mz_catalog.mz_columns col ON col.id=o.id \
303 LEFT JOIN mz_internal.mz_comments cmt ON cmt.id=o.id AND cmt.object_sub_id=col.position \
304 LEFT JOIN (VALUES {ann}) AS ann(entity_name,column_name,semantic_type) \
305 ON ann.entity_name=ent.entity_name AND ann.column_name=col.name",
306 ent = values_sql(&ent),
307 ann = values_sql(&ann),
308 );
309
310 let mut b = RelationDesc::builder();
311 for (n, ty, nullable) in &[
312 ("entity_type", SqlScalarType::String, false),
313 ("column_name", SqlScalarType::String, false),
314 ("semantic_type", SqlScalarType::String, true),
315 ("description", SqlScalarType::String, true),
316 ] {
317 b = b.with_column(*n, ty.clone().nullable(*nullable));
318 }
319 BuiltinView {
320 name: "mz_ontology_properties",
321 schema: MZ_INTERNAL_SCHEMA,
322 oid: oid::VIEW_MZ_ONTOLOGY_PROPERTIES_OID,
323 desc: b.finish(),
324 column_comments: BTreeMap::new(),
325 sql: Box::leak(sql.into_boxed_str()),
326 access: vec![PUBLIC_SELECT],
327 ontology: None,
328 }
329}
330
331fn link_types_view(infos: &[Info]) -> BuiltinView {
332 let rows = infos
333 .iter()
334 .flat_map(|i| {
335 i.ontology.links.iter().map(move |l| {
336 vec![
337 Lit::Str(l.name.to_string()),
338 Lit::Str(i.entity_name.clone()),
339 Lit::Str(l.target.to_string()),
340 Lit::Json(
341 serde_json::to_string(&l.properties)
342 .expect("LinkProperties is serializable"),
343 ),
344 Lit::Null,
345 ]
346 })
347 })
348 .collect();
349 values_view(
350 "mz_ontology_link_types",
351 oid::VIEW_MZ_ONTOLOGY_LINK_TYPES_OID,
352 &[
353 ("name", SqlScalarType::String, false),
354 ("source_entity", SqlScalarType::String, false),
355 ("target_entity", SqlScalarType::String, false),
356 ("properties", SqlScalarType::Jsonb, false),
357 ("description", SqlScalarType::String, true),
358 ],
359 &[],
360 rows,
361 )
362}
363
/// Definitions backing the `mz_ontology_semantic_types` view.
///
/// Each entry is `(semantic type, SQL type name, description)`;
/// `semantic_types_view` emits one row per entry, in this order, as the
/// `name`, `sql_type` and `description` columns.
// NOTE(review): `semantic_types_view` declares both `name` and `description`
// as keys, so descriptions here presumably need to stay distinct — confirm
// when adding entries.
pub(super) const SEMANTIC_TYPE_DEFS: &[(SemanticType, &str, &str)] = &[
    (
        SemanticType::CatalogItemId,
        "text",
        "SQL-layer object ID. Format: s{n}/u{n}.",
    ),
    (
        SemanticType::GlobalId,
        "text",
        "Runtime ID used by compute/storage. Format: s{n}/u{n}/si{n}.",
    ),
    (
        SemanticType::ClusterId,
        "text",
        "Cluster ID. Format: s{n}/u{n}.",
    ),
    (
        SemanticType::ReplicaId,
        "text",
        "Cluster replica ID. Format: s{n}/u{n}.",
    ),
    (
        SemanticType::SchemaId,
        "text",
        "Schema ID. Format: s{n}/u{n}.",
    ),
    (
        SemanticType::DatabaseId,
        "text",
        "Database ID. Format: s{n}/u{n}.",
    ),
    (
        SemanticType::RoleId,
        "text",
        "Role ID. Format: s{n}/g{n}/u{n}/p.",
    ),
    (
        SemanticType::NetworkPolicyId,
        "text",
        "Network policy ID. Format: s{n}/u{n}.",
    ),
    (
        SemanticType::ShardId,
        "text",
        "Persist shard ID. Format: s{uuid}.",
    ),
    (
        SemanticType::OID,
        "oid",
        "PostgreSQL-compatible object identifier.",
    ),
    (
        SemanticType::ObjectType,
        "text",
        "Catalog object type discriminator (e.g., table, view, source, sink, index, materialized-view).",
    ),
    (
        SemanticType::ConnectionType,
        "text",
        "Connection type discriminator (e.g., kafka, postgres, mysql, ssh-tunnel).",
    ),
    (
        SemanticType::SourceType,
        "text",
        "Source type discriminator (e.g., kafka, postgres, mysql, webhook).",
    ),
    (
        SemanticType::MzTimestamp,
        "mz_timestamp",
        "Internal logical timestamp (8-byte unsigned integer).",
    ),
    (
        SemanticType::WallclockTimestamp,
        "timestamp with time zone",
        "Wall clock timestamp.",
    ),
    (SemanticType::ByteCount, "uint8", "A count of bytes."),
    (
        SemanticType::RecordCount,
        "uint8",
        "A count of records/rows.",
    ),
    (
        SemanticType::CreditRate,
        "numeric",
        "Credits consumed per hour.",
    ),
    (
        SemanticType::SqlDefinition,
        "text",
        "A SQL CREATE statement.",
    ),
    (
        SemanticType::RedactedSqlDefinition,
        "text",
        "A redacted SQL CREATE statement.",
    ),
];
464
#[cfg(test)]
mod tests {
    use mz_compute_client::logging::{LogVariant, TimelyLog};
    use mz_repr::namespaces::MZ_INTROSPECTION_SCHEMA;
    use mz_sql::catalog::NameReference;

    use crate::builtin::ontology::generate_views;
    use crate::builtin::{Builtin, BuiltinLog, Ontology};

    /// Wraps a leaked `BuiltinLog` named `test_log` (in the introspection
    /// schema) carrying the given variant and optional ontology.
    fn make_log(variant: LogVariant, ontology: Option<Ontology>) -> Builtin<NameReference> {
        Builtin::Log(Box::leak(Box::new(BuiltinLog {
            variant,
            name: "test_log",
            schema: MZ_INTROSPECTION_SCHEMA,
            oid: 99999,
            access: vec![],
            ontology,
        })))
    }

    /// Runs `generate_views` and returns a copy of the SQL text of the
    /// `mz_ontology_entity_types` view.
    fn entity_types_sql(builtins: &[Builtin<NameReference>]) -> String {
        generate_views(builtins)
            .iter()
            .find_map(|b| match b {
                Builtin::View(v) if v.name == "mz_ontology_entity_types" => {
                    Some(v.sql.to_string())
                }
                _ => None,
            })
            .expect("entity_types view")
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn log_with_ontology_appears_in_entity_types() {
        let annotated = make_log(
            LogVariant::Timely(TimelyLog::Operates),
            Some(Ontology {
                entity_name: "test_operator",
                description: "test operator desc",
                links: &[],
                column_semantic_types: &[],
            }),
        );
        let sql = entity_types_sql(&[annotated]);

        assert!(
            sql.contains("test_operator"),
            "entity name should appear in entity_types SQL"
        );
        assert!(
            sql.contains("mz_introspection.test_log"),
            "relation should be schema.name"
        );
        // The log's multi-column key must show up as a JSON array of names.
        assert!(
            sql.contains(r#"["id", "worker_id"]"#),
            "composite key should be rendered"
        );
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn log_without_ontology_excluded() {
        let unannotated = make_log(LogVariant::Timely(TimelyLog::Operates), None);
        let sql = entity_types_sql(&[unannotated]);

        assert!(
            !sql.contains("test_log"),
            "unannotated log should not appear"
        );
    }
}