// mz_adapter/coord/sequencer/inner/create_continual_task.rs

use std::sync::Arc;

use mz_catalog::memory::objects::{
    CatalogCollectionEntry, CatalogEntry, CatalogItem, ContinualTask, Table, TableDataSource,
};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::Plan;
use mz_compute_types::sinks::{
    ComputeSinkConnection, ContinualTaskConnection, MaterializedViewSinkConnection,
};
use mz_expr::OptimizedMirRelationExpr;
use mz_ore::collections::CollectionExt;
use mz_ore::instrument;
use mz_repr::adt::mz_acl_item::PrivilegeMap;
use mz_repr::optimize::{OptimizerFeatures, OverrideFrom};
use mz_repr::{
    GlobalId, RelationVersion, RelationVersionSelector, Timestamp, VersionedRelationDesc,
};
use mz_sql::ast::visit_mut::{VisitMut, VisitMutNode};
use mz_sql::ast::{Raw, RawItemName};
use mz_sql::names::{FullItemName, PartialItemName, ResolvedIds};
use mz_sql::normalize::unresolved_item_name;
use mz_sql::plan;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql_parser::ast::Statement;
use mz_sql_parser::ast::display::AstDisplay;
use mz_storage_client::controller::CollectionDescription;
use mz_transform::dataflow::DataflowMetainfo;
use mz_transform::notice::OptimizerNotice;

use crate::command::ExecuteResponse;
use crate::coord::Coordinator;
use crate::error::AdapterError;
use crate::optimize::dataflows::dataflow_import_id_bundle;
use crate::optimize::{self, Optimize, OptimizerCatalog};
use crate::util::ResultExt;
use crate::{ExecuteContext, catalog};
impl Coordinator {
    /// Sequences a `CREATE CONTINUAL TASK` statement: allocates ids, optimizes
    /// the task's dataflow, durably records the new catalog item, and then (as
    /// a side effect of the catalog transaction) creates the backing storage
    /// collection, ships the dataflow, and enables writes on the task's
    /// cluster.
    ///
    /// Returns [`ExecuteResponse::CreatedContinualTask`] on success; any
    /// allocation, optimization, or catalog error is surfaced as an
    /// [`AdapterError`].
    #[instrument]
    pub(crate) async fn sequence_create_continual_task(
        &mut self,
        ctx: &mut ExecuteContext,
        plan: plan::CreateContinualTaskPlan,
        resolved_ids: ResolvedIds,
    ) -> Result<ExecuteResponse, AdapterError> {
        let desc = plan.desc.clone();
        let name = plan.name.clone();
        let cluster_id = plan.continual_task.cluster_id;

        // Allocate the catalog item id and the global id of the task's output
        // collection at a consistent catalog write timestamp.
        let id_ts = self.get_catalog_write_ts().await;
        let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();

        // Build a synthetic, table-shaped catalog entry describing the task's
        // own output. It is never inserted into the catalog; it only backs the
        // bootstrap catalog below so the optimizer can resolve `global_id`
        // (the task may read from its own output) before the real entry
        // exists.
        let entry = CatalogEntry {
            item: CatalogItem::Table(Table {
                create_sql: None,
                desc: VersionedRelationDesc::new(desc.clone()),
                collections,
                conn_id: None,
                resolved_ids: resolved_ids.clone(),
                custom_logical_compaction_window: None,
                is_retained_metrics_object: false,
                data_source: TableDataSource::TableWrites {
                    defaults: Vec::new(),
                },
            }),
            referenced_by: Vec::new(),
            used_by: Vec::new(),
            id: item_id,
            // Placeholder oid: this entry is only used for optimizer
            // bootstrap, not written to the catalog.
            oid: 0,
            name: name.clone(),
            owner_id: *ctx.session().current_role_id(),
            privileges: PrivilegeMap::new(),
        };
        let bootstrap_catalog = ContinualTaskCatalogBootstrap {
            delegate: self.owned_catalog().as_optimizer_catalog(),
            sink_id: global_id,
            entry: CatalogCollectionEntry {
                entry,
                version: RelationVersionSelector::Latest,
            },
        };

        let mut item = crate::continual_task::ct_item_from_plan(plan, global_id, resolved_ids)?;
        let full_name = bootstrap_catalog.resolve_full_name(&name, Some(ctx.session().conn_id()));
        let (optimized_plan, mut physical_plan, metainfo, optimizer_features) = self
            .optimize_create_continual_task(
                &item,
                global_id,
                Arc::new(bootstrap_catalog),
                full_name.to_string(),
            )?;

        // Acquire read holds on the dataflow's imports to pick the as_of.
        // The task's own output is excluded from the bundle first —
        // NOTE(review): presumably because that collection does not exist yet
        // and so cannot be read-held; confirm.
        let mut id_bundle = dataflow_import_id_bundle(&physical_plan, cluster_id.clone());
        id_bundle.storage_ids.remove(&global_id);
        let read_holds = self.acquire_read_holds(&id_bundle);
        let as_of = read_holds.least_valid_read();
        physical_plan.set_as_of(as_of.clone());
        physical_plan.set_initial_as_of(as_of.clone());

        // Bake the fully qualified name and the chosen as_of back into the
        // durable create_sql so the task restarts deterministically.
        item.create_sql = update_create_sql(&item.create_sql, &full_name, as_of.as_option());

        let ops = vec![catalog::Op::CreateItem {
            id: item_id,
            name: name.clone(),
            item: CatalogItem::ContinualTask(item),
            owner_id: *ctx.session().current_role_id(),
        }];

        // Commit the catalog op; once it is durable, run the side effects:
        // cache the plans, create the backing storage collection at `as_of`,
        // ship the dataflow, and allow writes.
        let () = self
            .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
                Box::pin(async move {
                    let catalog = coord.catalog_mut();
                    catalog.set_optimized_plan(global_id, optimized_plan);
                    catalog.set_physical_plan(global_id, physical_plan.clone());
                    catalog.set_dataflow_metainfo(global_id, metainfo);
                    catalog.cache_expressions(global_id, None, optimizer_features);

                    coord
                        .controller
                        .storage
                        .create_collections(
                            coord.catalog.state().storage_metadata(),
                            None,
                            vec![(
                                global_id,
                                CollectionDescription::for_other(desc, Some(as_of)),
                            )],
                        )
                        .await
                        .unwrap_or_terminate("cannot fail to append");

                    coord.ship_dataflow(physical_plan, cluster_id, None).await;
                    coord.allow_writes(cluster_id, global_id);
                })
            })
            .await?;
        Ok(ExecuteResponse::CreatedContinualTask)
    }

    /// Optimizes a continual task's definition into dataflow plans, reusing
    /// the materialized view optimizer pipeline.
    ///
    /// Returns, in order: the optimized (MIR) dataflow description, the
    /// physical (LIR) plan with the sink rewritten to a continual task sink,
    /// the rendered optimizer notices, and the feature flags the optimizer
    /// ran with.
    pub fn optimize_create_continual_task(
        &self,
        ct: &ContinualTask,
        output_id: GlobalId,
        catalog: Arc<dyn OptimizerCatalog>,
        debug_name: String,
    ) -> Result<
        (
            DataflowDescription<OptimizedMirRelationExpr>,
            DataflowDescription<Plan>,
            DataflowMetainfo<Arc<OptimizerNotice>>,
            OptimizerFeatures,
        ),
        AdapterError,
    > {
        // Hide all indexes from the optimizer — NOTE(review): presumably so
        // the task's inputs are read from storage rather than via index
        // imports; confirm.
        let catalog = Arc::new(NoIndexCatalog { delegate: catalog });

        let (_, view_id) = self.allocate_transient_id();
        let compute_instance = self
            .instance_snapshot(ct.cluster_id)
            .expect("compute instance does not exist");
        // System defaults, overridden by the cluster's feature config.
        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
            .override_from(&self.catalog.get_cluster(ct.cluster_id).config.features());
        let optimizer_features = optimizer_config.features.clone();
        let non_null_assertions = Vec::new();
        let refresh_schedule = None;
        // Force the task's input to be treated as non-monotonic —
        // NOTE(review): confirm rationale (CT inputs can retract).
        let force_non_monotonic = [ct.input_id].into();
        let mut optimizer = optimize::materialized_view::Optimizer::new(
            catalog,
            compute_instance,
            output_id,
            view_id,
            ct.desc.iter_names().cloned().collect(),
            non_null_assertions,
            refresh_schedule,
            debug_name,
            optimizer_config,
            self.optimizer_metrics(),
            force_non_monotonic,
        );

        // HIR ⇒ local MIR ⇒ global MIR ⇒ LIR.
        let local_mir_plan = optimizer.catch_unwind_optimize((*ct.raw_expr).clone())?;
        let global_mir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
        let optimized_plan = global_mir_plan.df_desc().clone();
        let global_lir_plan = optimizer.catch_unwind_optimize(global_mir_plan)?;
        let (mut physical_plan, metainfo) = global_lir_plan.unapply();

        // The MV pipeline produces a materialized view sink; rewrite it into
        // a continual task sink over the same storage metadata, honoring the
        // task's snapshot option.
        for sink in physical_plan.sink_exports.values_mut() {
            match &mut sink.connection {
                ComputeSinkConnection::MaterializedView(MaterializedViewSinkConnection {
                    storage_metadata,
                    ..
                }) => {
                    sink.with_snapshot = ct.with_snapshot;
                    sink.connection =
                        ComputeSinkConnection::ContinualTask(ContinualTaskConnection {
                            input_id: ct.input_id,
                            storage_metadata: *storage_metadata,
                        })
                }
                _ => unreachable!("MV should produce persist sink connection"),
            }
        }

        // Allocate one transient id per optimizer notice and render them
        // against the task's output id.
        let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id())
            .map(|(_item_id, global_id)| global_id)
            .take(metainfo.optimizer_notices.len())
            .collect();
        let metainfo = self
            .catalog()
            .render_notices(metainfo, notice_ids, Some(output_id));

        Ok((optimized_plan, physical_plan, metainfo, optimizer_features))
    }
}
254
/// An [`OptimizerCatalog`] wrapper that serves a synthetic entry for a
/// continual task's own output collection before that entry exists in the
/// real catalog, delegating every other lookup.
///
/// This lets the optimizer resolve a task that reads from its own output
/// while the task is still being created.
#[derive(Debug)]
struct ContinualTaskCatalogBootstrap {
    /// The real catalog, used for everything except `sink_id`.
    delegate: Arc<dyn OptimizerCatalog>,
    /// The global id of the continual task's output collection.
    sink_id: GlobalId,
    /// The synthetic entry returned for lookups of `sink_id`.
    entry: CatalogCollectionEntry,
}
263
264impl OptimizerCatalog for ContinualTaskCatalogBootstrap {
265 fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
266 if self.sink_id == *id {
267 return self.entry.clone();
268 }
269 self.delegate.get_entry(id)
270 }
271
272 fn get_entry_by_item_id(&self, id: &mz_repr::CatalogItemId) -> &CatalogEntry {
273 self.delegate.get_entry_by_item_id(id)
274 }
275
276 fn resolve_full_name(
277 &self,
278 name: &mz_sql::names::QualifiedItemName,
279 conn_id: Option<&mz_adapter_types::connection::ConnectionId>,
280 ) -> mz_sql::names::FullItemName {
281 self.delegate.resolve_full_name(name, conn_id)
282 }
283
284 fn get_indexes_on(
285 &self,
286 id: GlobalId,
287 cluster: mz_controller_types::ClusterId,
288 ) -> Box<dyn Iterator<Item = (GlobalId, &mz_catalog::memory::objects::Index)> + '_> {
289 self.delegate.get_indexes_on(id, cluster)
290 }
291}
292
293fn update_create_sql(
294 create_sql: &str,
295 ct_name: &FullItemName,
296 as_of: Option<&Timestamp>,
297) -> String {
298 struct ReplaceName(PartialItemName);
299 impl<'ast> VisitMut<'ast, Raw> for ReplaceName {
300 fn visit_item_name_mut(&mut self, node: &'ast mut RawItemName) {
301 let Ok(name) = unresolved_item_name(node.name().clone()) else {
302 return;
303 };
304 if name.matches(&self.0) {
305 *(node.name_mut()) = self.0.clone().into();
306 }
307 }
308 }
309
310 let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
311 .expect("non-system items must be parseable")
312 .into_element()
313 .ast;
314 match &mut ast {
315 Statement::CreateContinualTask(stmt) => {
316 stmt.visit_mut(&mut ReplaceName(PartialItemName::from(ct_name.clone())));
319 if let Some(as_of) = as_of {
321 stmt.as_of = Some(as_of.into());
322 }
323 }
324 _ => unreachable!("should be CREATE CONTINUAL TASK statement"),
325 }
326 ast.to_ast_string_stable()
327}
328
/// An [`OptimizerCatalog`] wrapper that hides all indexes (reports none for
/// any collection), delegating every other lookup to the wrapped catalog.
#[derive(Debug)]
struct NoIndexCatalog {
    /// The real catalog, used for all non-index lookups.
    delegate: Arc<dyn OptimizerCatalog>,
}
340
341impl OptimizerCatalog for NoIndexCatalog {
342 fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
343 self.delegate.get_entry(id)
344 }
345
346 fn get_entry_by_item_id(&self, id: &mz_repr::CatalogItemId) -> &CatalogEntry {
347 self.delegate.get_entry_by_item_id(id)
348 }
349
350 fn resolve_full_name(
351 &self,
352 name: &mz_sql::names::QualifiedItemName,
353 conn_id: Option<&mz_adapter_types::connection::ConnectionId>,
354 ) -> mz_sql::names::FullItemName {
355 self.delegate.resolve_full_name(name, conn_id)
356 }
357
358 fn get_indexes_on(
359 &self,
360 _id: GlobalId,
361 _cluster: mz_controller_types::ClusterId,
362 ) -> Box<dyn Iterator<Item = (GlobalId, &mz_catalog::memory::objects::Index)> + '_> {
363 Box::new(std::iter::empty())
364 }
365}