1use std::sync::Arc;
29use std::time::{Duration, Instant};
30
31use mz_compute_types::dataflows::IndexDesc;
32use mz_compute_types::plan::Plan;
33use mz_repr::explain::trace_plan;
34use mz_repr::{GlobalId, ReprRelationType};
35use mz_sql::names::QualifiedItemName;
36use mz_sql::optimizer_metrics::OptimizerMetrics;
37use mz_transform::TransformCtx;
38use mz_transform::dataflow::DataflowMetainfo;
39use mz_transform::normalize_lets::normalize_lets;
40use mz_transform::notice::{IndexAlreadyExists, IndexKeyEmpty};
41use mz_transform::typecheck::{SharedTypecheckingContext, empty_typechecking_context};
42
43use crate::optimize::dataflows::{
44 ComputeInstanceSnapshot, DataflowBuilder, ExprPrep, ExprPrepMaintained,
45};
46use crate::optimize::{
47 LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog,
48 OptimizerConfig, OptimizerError, trace_plan,
49};
50
/// Optimizer state for planning the dataflow that maintains a single index.
///
/// One instance is driven through two `Optimize` stages: `Index` to
/// `GlobalMirPlan`, then `GlobalMirPlan` to `GlobalLirPlan`.
pub struct Optimizer {
    /// Typechecking context passed to the global transform context.
    typecheck_ctx: SharedTypecheckingContext,
    /// Catalog used to look up the indexed item, resolve names, and build dataflows.
    catalog: Arc<dyn OptimizerCatalog>,
    /// Snapshot of the compute instance handed to the dataflow builder.
    compute_instance: ComputeInstanceSnapshot,
    /// Id under which the index is exported from the dataflow.
    exported_index_id: GlobalId,
    /// Optimizer configuration (mode and feature flags).
    config: OptimizerConfig,
    /// Metrics sink; receives the end-to-end optimization time for this pipeline.
    metrics: OptimizerMetrics,
    /// Time accumulated across the `optimize` calls made on this instance.
    duration: Duration,
}
67
68impl Optimizer {
69 pub fn new(
70 catalog: Arc<dyn OptimizerCatalog>,
71 compute_instance: ComputeInstanceSnapshot,
72 exported_index_id: GlobalId,
73 config: OptimizerConfig,
74 metrics: OptimizerMetrics,
75 ) -> Self {
76 Self {
77 typecheck_ctx: empty_typechecking_context(),
78 catalog,
79 compute_instance,
80 exported_index_id,
81 config,
82 metrics,
83 duration: Default::default(),
84 }
85 }
86}
87
/// Input to the first optimization stage: a description of the index to build.
pub struct Index {
    /// Qualified name of the index; resolved to a full name that labels the dataflow.
    name: QualifiedItemName,
    /// Id of the catalog item the index is built on.
    on: GlobalId,
    /// Key expressions of the index.
    keys: Vec<mz_expr::MirScalarExpr>,
}
94
95impl Index {
96 pub fn new(name: QualifiedItemName, on: GlobalId, keys: Vec<mz_expr::MirScalarExpr>) -> Self {
98 Self { name, on, keys }
99 }
100}
101
/// Result of the MIR-level (global) optimization stage for an index.
///
/// Fields are private, so values can only be constructed inside this module.
#[derive(Clone, Debug)]
pub struct GlobalMirPlan {
    /// The globally optimized MIR dataflow description.
    df_desc: MirDataflowDescription,
    /// Metadata gathered while optimizing, including optimizer notices.
    df_meta: DataflowMetainfo,
}
112
113impl GlobalMirPlan {
114 pub fn df_desc(&self) -> &MirDataflowDescription {
115 &self.df_desc
116 }
117}
118
/// Result of the final optimization stage for an index: the finalized LIR dataflow.
#[derive(Clone, Debug)]
pub struct GlobalLirPlan {
    /// The finalized LIR dataflow description.
    df_desc: LirDataflowDescription,
    /// Metadata carried over unchanged from the MIR stage.
    df_meta: DataflowMetainfo,
}
126
127impl GlobalLirPlan {
128 pub fn df_desc(&self) -> &LirDataflowDescription {
129 &self.df_desc
130 }
131
132 pub fn df_meta(&self) -> &DataflowMetainfo {
133 &self.df_meta
134 }
135}
136
137impl Optimize<Index> for Optimizer {
138 type To = GlobalMirPlan;
139
140 fn optimize(&mut self, index: Index) -> Result<Self::To, OptimizerError> {
141 let time = Instant::now();
142
143 let on_entry = self.catalog.get_entry(&index.on);
144 let full_name = self
145 .catalog
146 .resolve_full_name(&index.name, on_entry.conn_id());
147 let on_desc = on_entry
148 .relation_desc()
149 .expect("can only create indexes on items with a valid description");
150
151 let mut df_builder = {
152 let compute = self.compute_instance.clone();
153 DataflowBuilder::new(&*self.catalog, compute).with_config(&self.config)
154 };
155 let mut df_desc = MirDataflowDescription::new(full_name.to_string());
156
157 df_builder.import_into_dataflow(&index.on, &mut df_desc, &self.config.features)?;
158 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
159
160 let index_desc = IndexDesc {
161 on_id: index.on,
162 key: index.keys.clone(),
163 };
164 df_desc.export_index(
165 self.exported_index_id,
166 index_desc,
167 ReprRelationType::from(on_desc.typ()),
168 );
169
170 let style = ExprPrepMaintained;
172 df_desc.visit_children(
173 |r| style.prep_relation_expr(r),
174 |s| style.prep_scalar_expr(s),
175 )?;
176
177 let mut df_meta = DataflowMetainfo::default();
179 let mut transform_ctx = TransformCtx::global(
180 &df_builder,
181 &mz_transform::EmptyStatisticsOracle, &self.config.features,
183 &self.typecheck_ctx,
184 &mut df_meta,
185 Some(&mut self.metrics),
186 );
187 mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
189
190 if self.config.mode == OptimizeMode::Explain {
191 trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
193 }
194
195 if index.keys.is_empty() {
197 df_meta.push_optimizer_notice_dedup(IndexKeyEmpty);
198 }
199
200 for (index_id, idx) in df_builder
203 .indexes_on(index.on)
204 .filter(|(_id, idx)| idx.keys.as_ref() == &index.keys)
205 {
206 df_meta.push_optimizer_notice_dedup(IndexAlreadyExists {
207 index_id,
208 index_key: idx.keys.to_vec(),
209 index_on_id: idx.on,
210 exported_index_id: self.exported_index_id,
211 });
212 }
213
214 self.duration += time.elapsed();
215
216 Ok(GlobalMirPlan { df_desc, df_meta })
218 }
219}
220
221impl Optimize<GlobalMirPlan> for Optimizer {
222 type To = GlobalLirPlan;
223
224 fn optimize(&mut self, plan: GlobalMirPlan) -> Result<Self::To, OptimizerError> {
225 let time = Instant::now();
226
227 let GlobalMirPlan {
228 mut df_desc,
229 df_meta,
230 } = plan;
231
232 for build in df_desc.objects_to_build.iter_mut() {
234 normalize_lets(&mut build.plan.0, &self.config.features)?
235 }
236
237 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
241
242 trace_plan(&df_desc);
244
245 self.duration += time.elapsed();
246 self.metrics
247 .observe_e2e_optimization_time("index", self.duration);
248
249 Ok(GlobalLirPlan { df_desc, df_meta })
251 }
252}
253
254impl GlobalLirPlan {
255 pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
257 (self.df_desc, self.df_meta)
258 }
259}