mz_adapter/coord/sequencer/inner/
explain_timestamp.rs1use itertools::Itertools;
11use mz_controller_types::ClusterId;
12use mz_expr::CollectionPlan;
13use mz_ore::instrument;
14use mz_repr::explain::ExplainFormat;
15use mz_repr::{Datum, Row};
16use mz_sql::plan::{self};
17use mz_sql::session::metadata::SessionMetadata;
18use tracing::{Instrument, Span};
19
20use crate::coord::sequencer::inner::return_if_err;
21use crate::coord::timestamp_selection::{TimestampDetermination, TimestampSource};
22use crate::coord::{
23 Coordinator, ExplainTimestampFinish, ExplainTimestampOptimize, ExplainTimestampRealTimeRecency,
24 ExplainTimestampStage, Message, PlanValidity, StageResult, Staged, TargetCluster,
25};
26use crate::error::AdapterError;
27use crate::optimize::{self, Optimize};
28use crate::session::{RequireLinearization, Session};
29use crate::{CollectionIdBundle, ExecuteContext, TimelineContext, TimestampExplanation};
30
31impl Staged for ExplainTimestampStage {
32 type Ctx = ExecuteContext;
33
34 fn validity(&mut self) -> &mut PlanValidity {
35 match self {
36 ExplainTimestampStage::Optimize(stage) => &mut stage.validity,
37 ExplainTimestampStage::RealTimeRecency(stage) => &mut stage.validity,
38 ExplainTimestampStage::Finish(stage) => &mut stage.validity,
39 }
40 }
41
42 async fn stage(
43 self,
44 coord: &mut Coordinator,
45 ctx: &mut ExecuteContext,
46 ) -> Result<StageResult<Box<Self>>, AdapterError> {
47 match self {
48 ExplainTimestampStage::Optimize(stage) => coord.explain_timestamp_optimize(stage),
49 ExplainTimestampStage::RealTimeRecency(stage) => {
50 coord
51 .explain_timestamp_real_time_recency(ctx.session(), stage)
52 .await
53 }
54 ExplainTimestampStage::Finish(stage) => {
55 coord
56 .explain_timestamp_finish(ctx.session_mut(), stage)
57 .await
58 }
59 }
60 }
61
62 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
63 Message::ExplainTimestampStageReady {
64 ctx,
65 span,
66 stage: self,
67 }
68 }
69
70 fn cancel_enabled(&self) -> bool {
71 true
72 }
73}
74
75impl Coordinator {
76 #[instrument]
77 pub async fn sequence_explain_timestamp(
78 &mut self,
79 ctx: ExecuteContext,
80 plan: plan::ExplainTimestampPlan,
81 target_cluster: TargetCluster,
82 ) {
83 let stage = return_if_err!(
84 self.explain_timestamp_validity(ctx.session(), plan, target_cluster),
85 ctx
86 );
87 self.sequence_staged(ctx, Span::current(), stage).await;
88 }
89
90 #[instrument]
91 fn explain_timestamp_validity(
92 &self,
93 session: &Session,
94 plan: plan::ExplainTimestampPlan,
95 target_cluster: TargetCluster,
96 ) -> Result<ExplainTimestampStage, AdapterError> {
97 let cluster = self
98 .catalog()
99 .resolve_target_cluster(target_cluster, session)?;
100 let cluster_id = cluster.id;
101 let dependencies = plan
102 .raw_plan
103 .depends_on()
104 .into_iter()
105 .map(|id| self.catalog().resolve_item_id(&id))
106 .collect();
107 let validity = PlanValidity::new(
108 self.catalog().transient_revision(),
109 dependencies,
110 Some(cluster_id),
111 None,
112 session.role_metadata().clone(),
113 );
114 Ok(ExplainTimestampStage::Optimize(ExplainTimestampOptimize {
115 validity,
116 plan,
117 cluster_id,
118 }))
119 }
120
121 #[instrument]
122 fn explain_timestamp_optimize(
123 &self,
124 ExplainTimestampOptimize {
125 validity,
126 plan,
127 cluster_id,
128 }: ExplainTimestampOptimize,
129 ) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError> {
130 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
132
133 let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
134
135 let span = Span::current();
136 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
137 || "optimize explain timestamp",
138 move || {
139 span.in_scope(|| {
140 let plan::ExplainTimestampPlan {
141 format,
142 raw_plan,
143 when,
144 } = plan;
145
146 let optimized_plan = optimizer.optimize(raw_plan)?;
148
149 let stage =
150 ExplainTimestampStage::RealTimeRecency(ExplainTimestampRealTimeRecency {
151 validity,
152 format,
153 optimized_plan,
154 cluster_id,
155 when,
156 });
157 Ok(Box::new(stage))
158 })
159 },
160 )))
161 }
162
163 #[instrument]
164 async fn explain_timestamp_real_time_recency(
165 &self,
166 session: &Session,
167 ExplainTimestampRealTimeRecency {
168 validity,
169 format,
170 optimized_plan,
171 cluster_id,
172 when,
173 }: ExplainTimestampRealTimeRecency,
174 ) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError> {
175 let source_ids = optimized_plan.depends_on();
176 let source_items: Vec<_> = source_ids
177 .iter()
178 .map(|gid| self.catalog().resolve_item_id(gid))
179 .collect();
180 let fut = self
181 .determine_real_time_recent_timestamp(session, source_items.into_iter())
182 .await?;
183
184 match fut {
185 Some(fut) => {
186 let span = Span::current();
187 Ok(StageResult::Handle(mz_ore::task::spawn(
188 || "explain timestamp real time recency",
189 async move {
190 let real_time_recency_ts = fut.await?;
191 let stage = ExplainTimestampStage::Finish(ExplainTimestampFinish {
192 validity,
193 format,
194 optimized_plan,
195 cluster_id,
196 source_ids,
197 when,
198 real_time_recency_ts: Some(real_time_recency_ts),
199 });
200 Ok(Box::new(stage))
201 }
202 .instrument(span),
203 )))
204 }
205 None => Ok(StageResult::Immediate(Box::new(
206 ExplainTimestampStage::Finish(ExplainTimestampFinish {
207 validity,
208 format,
209 optimized_plan,
210 cluster_id,
211 source_ids,
212 when,
213 real_time_recency_ts: None,
214 }),
215 ))),
216 }
217 }
218
219 pub(crate) fn explain_timestamp(
220 &self,
221 session: &Session,
222 cluster_id: ClusterId,
223 id_bundle: &CollectionIdBundle,
224 determination: TimestampDetermination<mz_repr::Timestamp>,
225 ) -> TimestampExplanation<mz_repr::Timestamp> {
226 let mut sources = Vec::new();
227 {
228 let storage_ids = id_bundle.storage_ids.iter().cloned().collect_vec();
229 let frontiers = self
230 .controller
231 .storage
232 .collections_frontiers(storage_ids)
233 .expect("missing collection");
234
235 for (id, since, upper) in frontiers {
236 let name = self
237 .catalog()
238 .try_get_entry_by_global_id(&id)
239 .map(|item| item.name())
240 .map(|name| {
241 self.catalog()
242 .resolve_full_name(name, Some(session.conn_id()))
243 .to_string()
244 })
245 .unwrap_or_else(|| id.to_string());
246 sources.push(TimestampSource {
247 name: format!("{name} ({id}, storage)"),
248 read_frontier: since.elements().to_vec(),
249 write_frontier: upper.elements().to_vec(),
250 });
251 }
252 }
253 {
254 if let Some(compute_ids) = id_bundle.compute_ids.get(&cluster_id) {
255 let catalog = self.catalog();
256 for id in compute_ids {
257 let frontiers = self
258 .controller
259 .compute
260 .collection_frontiers(*id, Some(cluster_id))
261 .expect("id does not exist");
262 let name = catalog
263 .try_get_entry_by_global_id(id)
264 .map(|item| item.name())
265 .map(|name| {
266 catalog
267 .resolve_full_name(name, Some(session.conn_id()))
268 .to_string()
269 })
270 .unwrap_or_else(|| id.to_string());
271 sources.push(TimestampSource {
272 name: format!("{name} ({id}, compute)"),
273 read_frontier: frontiers.read_frontier.to_vec(),
274 write_frontier: frontiers.write_frontier.to_vec(),
275 });
276 }
277 }
278 }
279 let respond_immediately = determination.respond_immediately();
280 TimestampExplanation {
281 determination,
282 sources,
283 session_wall_time: session.pcx().wall_time,
284 respond_immediately,
285 }
286 }
287
288 #[instrument]
289 async fn explain_timestamp_finish(
290 &mut self,
291 session: &mut Session,
292 ExplainTimestampFinish {
293 validity: _,
294 format,
295 optimized_plan,
296 cluster_id,
297 source_ids,
298 when,
299 real_time_recency_ts,
300 }: ExplainTimestampFinish,
301 ) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError> {
302 let id_bundle = self
303 .index_oracle(cluster_id)
304 .sufficient_collections(source_ids.iter().copied());
305
306 let is_json = match format {
307 ExplainFormat::Text => false,
308 ExplainFormat::Json => true,
309 ExplainFormat::Dot => {
310 return Err(AdapterError::Unsupported("EXPLAIN TIMESTAMP AS DOT"));
311 }
312 };
313 let mut timeline_context = self
314 .catalog()
315 .validate_timeline_context(source_ids.iter().copied())?;
316 if matches!(timeline_context, TimelineContext::TimestampIndependent)
317 && optimized_plan.contains_temporal()
318 {
319 timeline_context = TimelineContext::TimestampDependent;
323 }
324
325 let oracle_read_ts = self.oracle_read_ts(session, &timeline_context, &when).await;
326
327 let determination = self.sequence_peek_timestamp(
328 session,
329 &when,
330 cluster_id,
331 timeline_context,
332 oracle_read_ts,
333 &id_bundle,
334 &source_ids,
335 real_time_recency_ts,
336 RequireLinearization::NotRequired,
337 )?;
338 let explanation = self.explain_timestamp(session, cluster_id, &id_bundle, determination);
339
340 let s = if is_json {
341 serde_json::to_string_pretty(&explanation).expect("failed to serialize explanation")
342 } else {
343 explanation.to_string()
344 };
345 let rows = vec![Row::pack_slice(&[Datum::from(s.as_str())])];
346 Ok(StageResult::Response(Self::send_immediate_rows(rows)))
347 }
348}