mz_adapter/coord/sequencer/inner/
create_continual_task.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::sync::Arc;
11
12use mz_catalog::memory::objects::{
13    CatalogCollectionEntry, CatalogEntry, CatalogItem, ContinualTask, Table, TableDataSource,
14};
15use mz_compute_types::dataflows::DataflowDescription;
16use mz_compute_types::plan::Plan;
17use mz_compute_types::sinks::{
18    ComputeSinkConnection, ContinualTaskConnection, MaterializedViewSinkConnection,
19};
20use mz_expr::OptimizedMirRelationExpr;
21use mz_ore::collections::CollectionExt;
22use mz_ore::instrument;
23use mz_repr::adt::mz_acl_item::PrivilegeMap;
24use mz_repr::optimize::OverrideFrom;
25use mz_repr::{
26    GlobalId, RelationVersion, RelationVersionSelector, Timestamp, VersionedRelationDesc,
27};
28use mz_sql::ast::visit_mut::{VisitMut, VisitMutNode};
29use mz_sql::ast::{Raw, RawItemName};
30use mz_sql::names::{FullItemName, PartialItemName, ResolvedIds};
31use mz_sql::normalize::unresolved_item_name;
32use mz_sql::plan;
33use mz_sql::session::metadata::SessionMetadata;
34use mz_sql_parser::ast::Statement;
35use mz_sql_parser::ast::display::AstDisplay;
36use mz_storage_client::controller::CollectionDescription;
37use mz_transform::dataflow::DataflowMetainfo;
38use mz_transform::notice::OptimizerNotice;
39
40use crate::command::ExecuteResponse;
41use crate::coord::Coordinator;
42use crate::error::AdapterError;
43use crate::optimize::dataflows::dataflow_import_id_bundle;
44use crate::optimize::{self, Optimize, OptimizerCatalog};
45use crate::util::ResultExt;
46use crate::{ExecuteContext, catalog};
47
48impl Coordinator {
49    #[instrument]
50    pub(crate) async fn sequence_create_continual_task(
51        &mut self,
52        ctx: &mut ExecuteContext,
53        plan: plan::CreateContinualTaskPlan,
54        resolved_ids: ResolvedIds,
55    ) -> Result<ExecuteResponse, AdapterError> {
56        let desc = plan.desc.clone();
57        let name = plan.name.clone();
58        let cluster_id = plan.continual_task.cluster_id;
59
60        // Put a placeholder in the catalog so the optimizer can find something
61        // for the sink_id.
62        let id_ts = self.get_catalog_write_ts().await;
63        let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
64        let collections = [(RelationVersion::root(), global_id)].into_iter().collect();
65
66        let entry = CatalogEntry {
67            item: CatalogItem::Table(Table {
68                create_sql: None,
69                desc: VersionedRelationDesc::new(desc.clone()),
70                collections,
71                conn_id: None,
72                resolved_ids: resolved_ids.clone(),
73                custom_logical_compaction_window: None,
74                is_retained_metrics_object: false,
75                data_source: TableDataSource::TableWrites {
76                    defaults: Vec::new(),
77                },
78            }),
79            referenced_by: Vec::new(),
80            used_by: Vec::new(),
81            id: item_id,
82            oid: 0,
83            name: name.clone(),
84            owner_id: *ctx.session().current_role_id(),
85            privileges: PrivilegeMap::new(),
86        };
87        let bootstrap_catalog = ContinualTaskCatalogBootstrap {
88            delegate: self.owned_catalog().as_optimizer_catalog(),
89            sink_id: global_id,
90            entry: CatalogCollectionEntry {
91                entry,
92                version: RelationVersionSelector::Latest,
93            },
94        };
95
96        // Construct the CatalogItem for this CT and optimize it.
97        let mut item = crate::continual_task::ct_item_from_plan(plan, global_id, resolved_ids)?;
98        let full_name = bootstrap_catalog.resolve_full_name(&name, Some(ctx.session().conn_id()));
99        let (optimized_plan, mut physical_plan, metainfo) = self.optimize_create_continual_task(
100            &item,
101            global_id,
102            Arc::new(bootstrap_catalog),
103            full_name.to_string(),
104        )?;
105
106        // Timestamp selection
107        let mut id_bundle = dataflow_import_id_bundle(&physical_plan, cluster_id.clone());
108        // Can't acquire a read hold on ourselves because we don't exist yet.
109        //
110        // It is not necessary to take a read hold on the CT output in the
111        // coordinator, since the current scheme takes read holds in the
112        // coordinator only to ensure inputs don't get compacted until the
113        // compute controller has installed its own read holds, which happens
114        // below with the `ship_dataflow` call.
115        id_bundle.storage_ids.remove(&global_id);
116        let read_holds = self.acquire_read_holds(&id_bundle);
117        let as_of = read_holds.least_valid_read();
118        physical_plan.set_as_of(as_of.clone());
119        // Used in dataflow rendering to avoid the snapshot for CTs that are
120        // restarted after they have committed the snapshot output.
121        physical_plan.set_initial_as_of(as_of.clone());
122
123        // Rewrite `create_sql` to reference self with the fully qualified name.
124        // This is normally done when `create_sql` is created at plan time, but
125        // we didn't have the necessary info in name resolution.
126        item.create_sql = update_create_sql(&item.create_sql, &full_name, as_of.as_option());
127
128        let ops = vec![catalog::Op::CreateItem {
129            id: item_id,
130            name: name.clone(),
131            item: CatalogItem::ContinualTask(item),
132            owner_id: *ctx.session().current_role_id(),
133        }];
134
135        let () = self
136            .catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
137                Box::pin(async move {
138                    let catalog = coord.catalog_mut();
139                    catalog.set_optimized_plan(global_id, optimized_plan);
140                    catalog.set_physical_plan(global_id, physical_plan.clone());
141                    catalog.set_dataflow_metainfo(global_id, metainfo);
142
143                    coord
144                        .controller
145                        .storage
146                        .create_collections(
147                            coord.catalog.state().storage_metadata(),
148                            None,
149                            vec![(
150                                global_id,
151                                CollectionDescription::for_other(desc, Some(as_of)),
152                            )],
153                        )
154                        .await
155                        .unwrap_or_terminate("cannot fail to append");
156
157                    coord.ship_dataflow(physical_plan, cluster_id, None).await;
158                    coord.allow_writes(cluster_id, global_id);
159                })
160            })
161            .await?;
162        Ok(ExecuteResponse::CreatedContinualTask)
163    }
164
165    pub fn optimize_create_continual_task(
166        &self,
167        ct: &ContinualTask,
168        output_id: GlobalId,
169        catalog: Arc<dyn OptimizerCatalog>,
170        debug_name: String,
171    ) -> Result<
172        (
173            DataflowDescription<OptimizedMirRelationExpr>,
174            DataflowDescription<Plan>,
175            DataflowMetainfo<Arc<OptimizerNotice>>,
176        ),
177        AdapterError,
178    > {
179        let catalog = Arc::new(NoIndexCatalog { delegate: catalog });
180
181        let (_, view_id) = self.allocate_transient_id();
182        let compute_instance = self
183            .instance_snapshot(ct.cluster_id)
184            .expect("compute instance does not exist");
185        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config())
186            .override_from(&self.catalog.get_cluster(ct.cluster_id).config.features());
187        let non_null_assertions = Vec::new();
188        let refresh_schedule = None;
189        // Continual Tasks turn an "input" into diffs by inserting retractions,
190        // which removes any monotonicity properties the collection otherwise
191        // would have had.
192        let force_non_monotonic = [ct.input_id].into();
193        let mut optimizer = optimize::materialized_view::Optimizer::new(
194            catalog,
195            compute_instance,
196            output_id,
197            view_id,
198            ct.desc.iter_names().cloned().collect(),
199            non_null_assertions,
200            refresh_schedule,
201            debug_name,
202            optimizer_config,
203            self.optimizer_metrics(),
204            force_non_monotonic,
205        );
206
207        // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local and global)
208        let local_mir_plan = optimizer.catch_unwind_optimize((*ct.raw_expr).clone())?;
209        let global_mir_plan = optimizer.catch_unwind_optimize(local_mir_plan)?;
210        let optimized_plan = global_mir_plan.df_desc().clone();
211        // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
212        let global_lir_plan = optimizer.catch_unwind_optimize(global_mir_plan)?;
213        let (mut physical_plan, metainfo) = global_lir_plan.unapply();
214
215        // The MV optimizer is hardcoded to output a PersistSinkConnection.
216        // Sniff it and swap for the ContinualTaskSink. If/when we split out a
217        // Continual Task optimizer, this won't be necessary, and in the
218        // meantime, it seems undesirable to burden the MV optimizer with a
219        // configuration for this.
220        for sink in physical_plan.sink_exports.values_mut() {
221            match &mut sink.connection {
222                ComputeSinkConnection::MaterializedView(MaterializedViewSinkConnection {
223                    storage_metadata,
224                    ..
225                }) => {
226                    sink.with_snapshot = ct.with_snapshot;
227                    sink.connection =
228                        ComputeSinkConnection::ContinualTask(ContinualTaskConnection {
229                            input_id: ct.input_id,
230                            storage_metadata: *storage_metadata,
231                        })
232                }
233                _ => unreachable!("MV should produce persist sink connection"),
234            }
235        }
236
237        // Create a metainfo with rendered notices, preallocating a transient
238        // GlobalId for each.
239        let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id())
240            .map(|(_item_id, global_id)| global_id)
241            .take(metainfo.optimizer_notices.len())
242            .collect();
243        let metainfo = self
244            .catalog()
245            .render_notices(metainfo, notice_ids, Some(output_id));
246
247        Ok((optimized_plan, physical_plan, metainfo))
248    }
249}
250
251/// An implementation of [OptimizerCatalog] with a placeholder for the continual
252/// task to solve the self-referential CT bootstrapping problem.
253#[derive(Debug)]
254struct ContinualTaskCatalogBootstrap {
255    delegate: Arc<dyn OptimizerCatalog>,
256    sink_id: GlobalId,
257    entry: CatalogCollectionEntry,
258}
259
260impl OptimizerCatalog for ContinualTaskCatalogBootstrap {
261    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
262        if self.sink_id == *id {
263            return self.entry.clone();
264        }
265        self.delegate.get_entry(id)
266    }
267
268    fn get_entry_by_item_id(&self, id: &mz_repr::CatalogItemId) -> &CatalogEntry {
269        self.delegate.get_entry_by_item_id(id)
270    }
271
272    fn resolve_full_name(
273        &self,
274        name: &mz_sql::names::QualifiedItemName,
275        conn_id: Option<&mz_adapter_types::connection::ConnectionId>,
276    ) -> mz_sql::names::FullItemName {
277        self.delegate.resolve_full_name(name, conn_id)
278    }
279
280    fn get_indexes_on(
281        &self,
282        id: GlobalId,
283        cluster: mz_controller_types::ClusterId,
284    ) -> Box<dyn Iterator<Item = (GlobalId, &mz_catalog::memory::objects::Index)> + '_> {
285        self.delegate.get_indexes_on(id, cluster)
286    }
287}
288
289fn update_create_sql(
290    create_sql: &str,
291    ct_name: &FullItemName,
292    as_of: Option<&Timestamp>,
293) -> String {
294    struct ReplaceName(PartialItemName);
295    impl<'ast> VisitMut<'ast, Raw> for ReplaceName {
296        fn visit_item_name_mut(&mut self, node: &'ast mut RawItemName) {
297            let Ok(name) = unresolved_item_name(node.name().clone()) else {
298                return;
299            };
300            if name.matches(&self.0) {
301                *(node.name_mut()) = self.0.clone().into();
302            }
303        }
304    }
305
306    let mut ast = mz_sql_parser::parser::parse_statements(create_sql)
307        .expect("non-system items must be parseable")
308        .into_element()
309        .ast;
310    match &mut ast {
311        Statement::CreateContinualTask(stmt) => {
312            // Replace any self-references in the statements with the full name,
313            // now that we have it.
314            stmt.visit_mut(&mut ReplaceName(PartialItemName::from(ct_name.clone())));
315            // Also fill in the initial as_of.
316            if let Some(as_of) = as_of {
317                stmt.as_of = Some(as_of.into());
318            }
319        }
320        _ => unreachable!("should be CREATE CONTINUAL TASK statement"),
321    }
322    ast.to_ast_string_stable()
323}
324
325/// An [OptimizerCatalog] impl that ignores any indexes that exist.
326///
327/// TODO(ct3): At the moment, the dataflow rendering for CTs only knows how to
328/// turn persist_sources into CT inputs. If the optimizer decides to use an
329/// existing index in the cluster, it won't work. It seems tricky/invasive to
330/// fix the render bug, so for now pretend indexes don't exist for CTs. Remove
331/// this once we fix the bug.
332#[derive(Debug)]
333struct NoIndexCatalog {
334    delegate: Arc<dyn OptimizerCatalog>,
335}
336
337impl OptimizerCatalog for NoIndexCatalog {
338    fn get_entry(&self, id: &GlobalId) -> CatalogCollectionEntry {
339        self.delegate.get_entry(id)
340    }
341
342    fn get_entry_by_item_id(&self, id: &mz_repr::CatalogItemId) -> &CatalogEntry {
343        self.delegate.get_entry_by_item_id(id)
344    }
345
346    fn resolve_full_name(
347        &self,
348        name: &mz_sql::names::QualifiedItemName,
349        conn_id: Option<&mz_adapter_types::connection::ConnectionId>,
350    ) -> mz_sql::names::FullItemName {
351        self.delegate.resolve_full_name(name, conn_id)
352    }
353
354    fn get_indexes_on(
355        &self,
356        _id: GlobalId,
357        _cluster: mz_controller_types::ClusterId,
358    ) -> Box<dyn Iterator<Item = (GlobalId, &mz_catalog::memory::objects::Index)> + '_> {
359        Box::new(std::iter::empty())
360    }
361}