mz_adapter/
continual_task.rs1use std::sync::Arc;
13
14use mz_catalog::memory::objects::ContinualTask;
15use mz_expr::Id;
16use mz_expr::visit::Visit;
17use mz_repr::GlobalId;
18use mz_sql::names::ResolvedIds;
19use mz_sql::plan::{self, HirRelationExpr};
20use timely::progress::Antichain;
21
22use crate::AdapterError;
23
24pub fn ct_item_from_plan(
25 plan: plan::CreateContinualTaskPlan,
26 global_id: GlobalId,
27 resolved_ids: ResolvedIds,
28) -> Result<ContinualTask, AdapterError> {
29 let plan::CreateContinualTaskPlan {
30 name: _,
31 placeholder_id,
32 desc,
33 input_id,
34 with_snapshot,
35 continual_task:
36 plan::MaterializedView {
37 create_sql,
38 cluster_id,
39 expr: mut raw_expr,
40 dependencies,
41 column_names: _,
42 replacement_target: _,
43 target_replica: _,
44 non_null_assertions: _,
45 compaction_window: _,
46 refresh_schedule: _,
47 as_of,
48 },
49 } = plan;
50
51 if let Some(placeholder_id) = placeholder_id {
53 raw_expr.visit_mut_post(&mut |expr| match expr {
54 HirRelationExpr::Get { id, .. } if *id == Id::Local(placeholder_id) => {
55 *id = Id::Global(global_id);
56 }
57 _ => {}
58 })?;
59 }
60 Ok(ContinualTask {
64 create_sql,
65 global_id,
66 input_id,
67 with_snapshot,
68 raw_expr: Arc::new(raw_expr),
69 desc,
70 resolved_ids,
71 dependencies,
72 cluster_id,
73 initial_as_of: as_of.map(Antichain::from_elem),
74 })
75}