1use std::sync::Arc;
29use std::time::{Duration, Instant};
30
31use mz_compute_types::dataflows::IndexDesc;
32use mz_compute_types::plan::Plan;
33use mz_repr::GlobalId;
34use mz_repr::explain::trace_plan;
35use mz_sql::names::QualifiedItemName;
36use mz_sql::optimizer_metrics::OptimizerMetrics;
37use mz_transform::TransformCtx;
38use mz_transform::dataflow::DataflowMetainfo;
39use mz_transform::normalize_lets::normalize_lets;
40use mz_transform::notice::{IndexAlreadyExists, IndexKeyEmpty};
41use mz_transform::reprtypecheck::{
42 SharedContext as ReprTypecheckContext, empty_context as empty_repr_context,
43};
44
45use crate::optimize::dataflows::{
46 ComputeInstanceSnapshot, DataflowBuilder, ExprPrepStyle, prep_relation_expr, prep_scalar_expr,
47};
48use crate::optimize::{
49 LirDataflowDescription, MirDataflowDescription, Optimize, OptimizeMode, OptimizerCatalog,
50 OptimizerConfig, OptimizerError, trace_plan,
51};
52
/// Optimizer state for building the dataflow that maintains a single index.
///
/// Implements [`Optimize`] twice, forming a two-stage pipeline:
/// [`Index`] to [`GlobalMirPlan`], then [`GlobalMirPlan`] to [`GlobalLirPlan`].
pub struct Optimizer {
    /// Typechecking context handed to the global transform context; starts out empty.
    repr_typecheck_ctx: ReprTypecheckContext,
    /// Catalog used to look up the indexed item and to construct the dataflow builder.
    catalog: Arc<dyn OptimizerCatalog>,
    /// Compute instance snapshot that the dataflow builder works against.
    compute_instance: ComputeInstanceSnapshot,
    /// Id under which the optimized dataflow exports the new index.
    exported_index_id: GlobalId,
    /// Optimizer configuration; its `features` and `mode` are consulted during optimization.
    config: OptimizerConfig,
    /// Metrics sink: handed to the transform context and used to report the
    /// end-to-end optimization time.
    metrics: OptimizerMetrics,
    /// Wall-clock time accumulated by the `optimize` stages that have run so far.
    duration: Duration,
}
69
70impl Optimizer {
71 pub fn new(
72 catalog: Arc<dyn OptimizerCatalog>,
73 compute_instance: ComputeInstanceSnapshot,
74 exported_index_id: GlobalId,
75 config: OptimizerConfig,
76 metrics: OptimizerMetrics,
77 ) -> Self {
78 Self {
79 repr_typecheck_ctx: empty_repr_context(),
80 catalog,
81 compute_instance,
82 exported_index_id,
83 config,
84 metrics,
85 duration: Default::default(),
86 }
87 }
88}
89
/// The parts of an index definition that are fed into the first optimization stage.
pub struct Index {
    /// Qualified name of the index; resolved to a full name that labels the dataflow.
    name: QualifiedItemName,
    /// Id of the item the index is created on.
    on: GlobalId,
    /// Key expressions of the index.
    keys: Vec<mz_expr::MirScalarExpr>,
}
96
97impl Index {
98 pub fn new(name: QualifiedItemName, on: GlobalId, keys: Vec<mz_expr::MirScalarExpr>) -> Self {
100 Self { name, on, keys }
101 }
102}
103
/// Output of the first optimization stage (`Optimize<Index>`): the globally
/// optimized MIR dataflow together with the metadata collected while producing it.
#[derive(Clone, Debug)]
pub struct GlobalMirPlan {
    /// The MIR dataflow description that exports the index.
    df_desc: MirDataflowDescription,
    /// Metadata gathered during optimization, including optimizer notices.
    df_meta: DataflowMetainfo,
}
114
115impl GlobalMirPlan {
116 pub fn df_desc(&self) -> &MirDataflowDescription {
117 &self.df_desc
118 }
119}
120
/// Output of the second optimization stage (`Optimize<GlobalMirPlan>`): the
/// finalized LIR dataflow together with the metadata carried over from the
/// first stage.
#[derive(Clone, Debug)]
pub struct GlobalLirPlan {
    /// The finalized LIR dataflow description.
    df_desc: LirDataflowDescription,
    /// Metadata gathered during optimization, including optimizer notices.
    df_meta: DataflowMetainfo,
}
128
129impl GlobalLirPlan {
130 pub fn df_desc(&self) -> &LirDataflowDescription {
131 &self.df_desc
132 }
133
134 pub fn df_meta(&self) -> &DataflowMetainfo {
135 &self.df_meta
136 }
137}
138
139impl Optimize<Index> for Optimizer {
140 type To = GlobalMirPlan;
141
142 fn optimize(&mut self, index: Index) -> Result<Self::To, OptimizerError> {
143 let time = Instant::now();
144
145 let on_entry = self.catalog.get_entry(&index.on);
146 let full_name = self
147 .catalog
148 .resolve_full_name(&index.name, on_entry.conn_id());
149 let on_desc = on_entry
150 .relation_desc()
151 .expect("can only create indexes on items with a valid description");
152
153 let mut df_builder = {
154 let compute = self.compute_instance.clone();
155 DataflowBuilder::new(&*self.catalog, compute).with_config(&self.config)
156 };
157 let mut df_desc = MirDataflowDescription::new(full_name.to_string());
158
159 df_builder.import_into_dataflow(&index.on, &mut df_desc, &self.config.features)?;
160 df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?;
161
162 let index_desc = IndexDesc {
163 on_id: index.on,
164 key: index.keys.clone(),
165 };
166 df_desc.export_index(self.exported_index_id, index_desc, on_desc.typ().clone());
167
168 let style = ExprPrepStyle::Maintained;
170 df_desc.visit_children(
171 |r| prep_relation_expr(r, style),
172 |s| prep_scalar_expr(s, style),
173 )?;
174
175 let mut df_meta = DataflowMetainfo::default();
177 let mut transform_ctx = TransformCtx::global(
178 &df_builder,
179 &mz_transform::EmptyStatisticsOracle, &self.config.features,
181 &self.repr_typecheck_ctx,
182 &mut df_meta,
183 Some(&mut self.metrics),
184 );
185 mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
187
188 if self.config.mode == OptimizeMode::Explain {
189 trace_plan!(at: "global", &df_meta.used_indexes(&df_desc));
191 }
192
193 if index.keys.is_empty() {
195 df_meta.push_optimizer_notice_dedup(IndexKeyEmpty);
196 }
197
198 for (index_id, idx) in df_builder
201 .indexes_on(index.on)
202 .filter(|(_id, idx)| idx.keys.as_ref() == &index.keys)
203 {
204 df_meta.push_optimizer_notice_dedup(IndexAlreadyExists {
205 index_id,
206 index_key: idx.keys.to_vec(),
207 index_on_id: idx.on,
208 exported_index_id: self.exported_index_id,
209 });
210 }
211
212 self.duration += time.elapsed();
213
214 Ok(GlobalMirPlan { df_desc, df_meta })
216 }
217}
218
219impl Optimize<GlobalMirPlan> for Optimizer {
220 type To = GlobalLirPlan;
221
222 fn optimize(&mut self, plan: GlobalMirPlan) -> Result<Self::To, OptimizerError> {
223 let time = Instant::now();
224
225 let GlobalMirPlan {
226 mut df_desc,
227 df_meta,
228 } = plan;
229
230 for build in df_desc.objects_to_build.iter_mut() {
232 normalize_lets(&mut build.plan.0, &self.config.features)?
233 }
234
235 let df_desc = Plan::finalize_dataflow(df_desc, &self.config.features)?;
239
240 trace_plan(&df_desc);
242
243 self.duration += time.elapsed();
244 self.metrics
245 .observe_e2e_optimization_time("index", self.duration);
246
247 Ok(GlobalLirPlan { df_desc, df_meta })
249 }
250}
251
252impl GlobalLirPlan {
253 pub fn unapply(self) -> (LirDataflowDescription, DataflowMetainfo) {
255 (self.df_desc, self.df_meta)
256 }
257}