1use std::sync::Arc;
29use std::time::{Duration, Instant};
30
31use mz_compute_types::dataflows::IndexDesc;
32use mz_compute_types::plan::Plan;
33use mz_repr::GlobalId;
34use mz_repr::explain::trace_plan;
35use mz_sql::names::QualifiedItemName;
36use mz_sql::optimizer_metrics::OptimizerMetrics;
37use mz_transform::TransformCtx;
38use mz_transform::dataflow::DataflowMetainfo;
39use mz_transform::normalize_lets::normalize_lets;
40use mz_transform::notice::{IndexAlreadyExists, IndexKeyEmpty};
41use mz_transform::reprtypecheck::{
42 SharedContext as ReprTypecheckContext, empty_context as empty_repr_context,
43};
44use mz_transform::typecheck::{SharedContext as TypecheckContext, empty_context};
45
46use crate::optimize::dataflows::{
47 ComputeInstanceSnapshot, DataflowBuilder, ExprPrepStyle, prep_relation_expr, prep_scalar_expr,
48};
49use crate::optimize::{
50 LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog,
51 OptimizerConfig, OptimizerError, trace_plan,
52};
53
/// State for the two-stage index optimization pipeline implemented in this
/// file: [`Index`] ⇒ [`GlobalMirPlan`] ⇒ [`GlobalLirPlan`].
pub struct Optimizer {
    /// Typecheck context handed to `TransformCtx::global` during the MIR stage.
    typecheck_ctx: TypecheckContext,
    /// Representation-typecheck context handed to `TransformCtx::global`
    /// during the MIR stage.
    repr_typecheck_ctx: ReprTypecheckContext,
    /// Shared catalog handle; used to look up the indexed item, resolve its
    /// full name, and back the `DataflowBuilder`.
    catalog: Arc<dyn OptimizerCatalog>,
    /// Compute instance snapshot; a clone is given to the `DataflowBuilder`.
    compute_instance: ComputeInstanceSnapshot,
    /// The `GlobalId` under which the index is exported from the dataflow.
    exported_index_id: GlobalId,
    /// Optimizer configuration (its `features` and `mode` are read by the
    /// optimization stages).
    config: OptimizerConfig,
    /// Metrics sink; receives the end-to-end optimization time after the LIR
    /// stage.
    metrics: OptimizerMetrics,
    /// Time accumulated across the successfully completed `optimize` stages.
    duration: Duration,
}
72
73impl Optimizer {
74 pub fn new(
75 catalog: Arc<dyn OptimizerCatalog>,
76 compute_instance: ComputeInstanceSnapshot,
77 exported_index_id: GlobalId,
78 config: OptimizerConfig,
79 metrics: OptimizerMetrics,
80 ) -> Self {
81 Self {
82 typecheck_ctx: empty_context(),
83 repr_typecheck_ctx: empty_repr_context(),
84 catalog,
85 compute_instance,
86 exported_index_id,
87 config,
88 metrics,
89 duration: Default::default(),
90 }
91 }
92}
93
/// The input of the index optimization pipeline: the parts of an index
/// definition consumed by `Optimize<Index>`.
pub struct Index {
    /// Qualified name of the index; resolved to a full name that is used as
    /// the dataflow description's name.
    name: QualifiedItemName,
    /// The `GlobalId` of the item the index is built on.
    on: GlobalId,
    /// The key expressions of the index.
    keys: Vec<mz_expr::MirScalarExpr>,
}
100
101impl Index {
102 pub fn new(name: QualifiedItemName, on: GlobalId, keys: Vec<mz_expr::MirScalarExpr>) -> Self {
104 Self { name, on, keys }
105 }
106}
107
/// The intermediate result produced by `Optimize<Index>`: a MIR dataflow
/// description that exports the index, together with the metainfo collected
/// while optimizing it.
#[derive(Clone, Debug)]
pub struct GlobalMirPlan {
    /// The optimized MIR dataflow description.
    df_desc: MirDataflowDescription,
    /// Metainfo (including optimizer notices) gathered during optimization.
    df_meta: DataflowMetainfo,
}
118
119impl GlobalMirPlan {
120 pub fn df_desc(&self) -> &MirDataflowDescription {
121 &self.df_desc
122 }
123}
124
/// The final result of the pipeline, produced by `Optimize<GlobalMirPlan>`:
/// a LIR dataflow description plus the metainfo carried over from the MIR
/// stage.
#[derive(Clone, Debug)]
pub struct GlobalLirPlan {
    /// The finalized LIR dataflow description.
    df_desc: LirDataflowDescription,
    /// Metainfo passed through unchanged from the [`GlobalMirPlan`].
    df_meta: DataflowMetainfo,
}
132
133impl GlobalLirPlan {
134 pub fn df_desc(&self) -> &LirDataflowDescription {
135 &self.df_desc
136 }
137
138 pub fn df_meta(&self) -> &DataflowMetainfo {
139 &self.df_meta
140 }
141}
142
143impl Optimize<Index> for Optimizer {
144 type To = GlobalMirPlan;
145
146 fn optimize(&mut self, index: Index) -> Result<Self::To, OptimizerError> {
147 let time = Instant::now();
148
149 let on_entry = self.catalog.get_entry(&index.on);
150 let full_name = self
151 .catalog
152 .resolve_full_name(&index.name, on_entry.conn_id());
153 let on_desc = on_entry
154 .desc(&full_name)
155 .expect("can only create indexes on items with a valid description");
156
157 let mut df_builder = {
158 let compute = self.compute_instance.clone();
159 DataflowBuilder::new(&*self.catalog, compute).with_config(&self.config)
160 };
161 let mut df_desc = MirDataflowDescription::new(full_name.to_string());
162
163 df_builder.import_into_dataflow(&index.on, &mut df_desc, &self.config.features)?;
164 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
165
166 let index_desc = IndexDesc {
167 on_id: index.on,
168 key: index.keys.clone(),
169 };
170 df_desc.export_index(self.exported_index_id, index_desc, on_desc.typ().clone());
171
172 let style = ExprPrepStyle::Maintained;
174 df_desc.visit_children(
175 |r| prep_relation_expr(r, style),
176 |s| prep_scalar_expr(s, style),
177 )?;
178
179 let mut df_meta = DataflowMetainfo::default();
181 let mut transform_ctx = TransformCtx::global(
182 &df_builder,
183 &mz_transform::EmptyStatisticsOracle, &self.config.features,
185 &self.typecheck_ctx,
186 &self.repr_typecheck_ctx,
187 &mut df_meta,
188 Some(&self.metrics),
189 );
190 mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
192
193 if self.config.mode == OptimizeMode::Explain {
194 trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
196 }
197
198 if index.keys.is_empty() {
200 df_meta.push_optimizer_notice_dedup(IndexKeyEmpty);
201 }
202
203 for (index_id, idx) in df_builder
206 .indexes_on(index.on)
207 .filter(|(_id, idx)| idx.keys.as_ref() == &index.keys)
208 {
209 df_meta.push_optimizer_notice_dedup(IndexAlreadyExists {
210 index_id,
211 index_key: idx.keys.to_vec(),
212 index_on_id: idx.on,
213 exported_index_id: self.exported_index_id,
214 });
215 }
216
217 self.duration += time.elapsed();
218
219 Ok(GlobalMirPlan { df_desc, df_meta })
221 }
222}
223
224impl Optimize<GlobalMirPlan> for Optimizer {
225 type To = GlobalLirPlan;
226
227 fn optimize(&mut self, plan: GlobalMirPlan) -> Result<Self::To, OptimizerError> {
228 let time = Instant::now();
229
230 let GlobalMirPlan {
231 mut df_desc,
232 df_meta,
233 } = plan;
234
235 for build in df_desc.objects_to_build.iter_mut() {
237 normalize_lets(&mut build.plan.0, &self.config.features)?
238 }
239
240 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
244
245 trace_plan(&df_desc);
247
248 self.duration += time.elapsed();
249 self.metrics
250 .observe_e2e_optimization_time("index", self.duration);
251
252 Ok(GlobalLirPlan { df_desc, df_meta })
254 }
255}
256
257impl GlobalLirPlan {
258 pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
260 (self.df_desc, self.df_meta)
261 }
262}