mz_adapter/coord/sequencer/inner/
explain_timestamp.rs1use itertools::Itertools;
11use mz_controller_types::ClusterId;
12use mz_expr::CollectionPlan;
13use mz_ore::instrument;
14use mz_repr::explain::ExplainFormat;
15use mz_repr::{Datum, Row};
16use mz_sql::plan::{self};
17use mz_sql::session::metadata::SessionMetadata;
18use tracing::{Instrument, Span};
19
20use crate::coord::sequencer::inner::return_if_err;
21use crate::coord::timestamp_selection::{TimestampDetermination, TimestampSource};
22use crate::coord::{
23 Coordinator, ExplainTimestampFinish, ExplainTimestampOptimize, ExplainTimestampRealTimeRecency,
24 ExplainTimestampStage, Message, PlanValidity, StageResult, Staged, TargetCluster,
25};
26use crate::error::AdapterError;
27use crate::optimize::{self, Optimize};
28use crate::session::{RequireLinearization, Session};
29use crate::{CollectionIdBundle, ExecuteContext, TimelineContext, TimestampExplanation};
30
31impl Staged for ExplainTimestampStage {
32 type Ctx = ExecuteContext;
33
34 fn validity(&mut self) -> &mut PlanValidity {
35 match self {
36 ExplainTimestampStage::Optimize(stage) => &mut stage.validity,
37 ExplainTimestampStage::RealTimeRecency(stage) => &mut stage.validity,
38 ExplainTimestampStage::Finish(stage) => &mut stage.validity,
39 }
40 }
41
42 async fn stage(
43 self,
44 coord: &mut Coordinator,
45 ctx: &mut ExecuteContext,
46 ) -> Result<StageResult<Box<Self>>, AdapterError> {
47 match self {
48 ExplainTimestampStage::Optimize(stage) => coord.explain_timestamp_optimize(stage),
49 ExplainTimestampStage::RealTimeRecency(stage) => {
50 coord
51 .explain_timestamp_real_time_recency(ctx.session(), stage)
52 .await
53 }
54 ExplainTimestampStage::Finish(stage) => {
55 coord
56 .explain_timestamp_finish(ctx.session_mut(), stage)
57 .await
58 }
59 }
60 }
61
62 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
63 Message::ExplainTimestampStageReady {
64 ctx,
65 span,
66 stage: self,
67 }
68 }
69
70 fn cancel_enabled(&self) -> bool {
71 true
72 }
73}
74
75impl Coordinator {
76 #[instrument]
77 pub async fn sequence_explain_timestamp(
78 &mut self,
79 ctx: ExecuteContext,
80 plan: plan::ExplainTimestampPlan,
81 target_cluster: TargetCluster,
82 ) {
83 let stage = return_if_err!(
84 self.explain_timestamp_validity(ctx.session(), plan, target_cluster),
85 ctx
86 );
87 self.sequence_staged(ctx, Span::current(), stage).await;
88 }
89
90 #[instrument]
91 fn explain_timestamp_validity(
92 &self,
93 session: &Session,
94 plan: plan::ExplainTimestampPlan,
95 target_cluster: TargetCluster,
96 ) -> Result<ExplainTimestampStage, AdapterError> {
97 let cluster = self
98 .catalog()
99 .resolve_target_cluster(target_cluster, session)?;
100 let cluster_id = cluster.id;
101 let dependencies = plan
102 .raw_plan
103 .depends_on()
104 .into_iter()
105 .map(|id| self.catalog().resolve_item_id(&id))
106 .collect();
107 let validity = PlanValidity::new(
108 self.catalog().transient_revision(),
109 dependencies,
110 Some(cluster_id),
111 None,
112 session.role_metadata().clone(),
113 );
114 Ok(ExplainTimestampStage::Optimize(ExplainTimestampOptimize {
115 validity,
116 plan,
117 cluster_id,
118 }))
119 }
120
121 #[instrument]
122 fn explain_timestamp_optimize(
123 &self,
124 ExplainTimestampOptimize {
125 validity,
126 plan,
127 cluster_id,
128 }: ExplainTimestampOptimize,
129 ) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError> {
130 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
132
133 let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
134
135 let span = Span::current();
136 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
137 || "optimize explain timestamp",
138 move || {
139 span.in_scope(|| {
140 let plan::ExplainTimestampPlan {
141 format,
142 raw_plan,
143 when,
144 } = plan;
145
146 let optimized_plan = optimizer.optimize(raw_plan)?;
148
149 let stage =
150 ExplainTimestampStage::RealTimeRecency(ExplainTimestampRealTimeRecency {
151 validity,
152 format,
153 optimized_plan,
154 cluster_id,
155 when,
156 });
157 Ok(Box::new(stage))
158 })
159 },
160 )))
161 }
162
163 #[instrument]
164 async fn explain_timestamp_real_time_recency(
165 &self,
166 session: &Session,
167 ExplainTimestampRealTimeRecency {
168 validity,
169 format,
170 optimized_plan,
171 cluster_id,
172 when,
173 }: ExplainTimestampRealTimeRecency,
174 ) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError> {
175 let source_ids = optimized_plan.depends_on();
176 let fut = self
177 .determine_real_time_recent_timestamp_if_needed(session, source_ids.iter().copied())
178 .await?;
179
180 match fut {
181 Some(fut) => {
182 let span = Span::current();
183 Ok(StageResult::Handle(mz_ore::task::spawn(
184 || "explain timestamp real time recency",
185 async move {
186 let real_time_recency_ts = fut.await?;
187 let stage = ExplainTimestampStage::Finish(ExplainTimestampFinish {
188 validity,
189 format,
190 optimized_plan,
191 cluster_id,
192 source_ids,
193 when,
194 real_time_recency_ts: Some(real_time_recency_ts),
195 });
196 Ok(Box::new(stage))
197 }
198 .instrument(span),
199 )))
200 }
201 None => Ok(StageResult::Immediate(Box::new(
202 ExplainTimestampStage::Finish(ExplainTimestampFinish {
203 validity,
204 format,
205 optimized_plan,
206 cluster_id,
207 source_ids,
208 when,
209 real_time_recency_ts: None,
210 }),
211 ))),
212 }
213 }
214
215 pub(crate) fn explain_timestamp(
216 &self,
217 session: &Session,
218 cluster_id: ClusterId,
219 id_bundle: &CollectionIdBundle,
220 determination: TimestampDetermination<mz_repr::Timestamp>,
221 ) -> TimestampExplanation<mz_repr::Timestamp> {
222 let mut sources = Vec::new();
223 {
224 let storage_ids = id_bundle.storage_ids.iter().cloned().collect_vec();
225 let frontiers = self
226 .controller
227 .storage
228 .collections_frontiers(storage_ids)
229 .expect("missing collection");
230
231 for (id, since, upper) in frontiers {
232 let name = self
233 .catalog()
234 .try_get_entry_by_global_id(&id)
235 .map(|item| item.name())
236 .map(|name| {
237 self.catalog()
238 .resolve_full_name(name, Some(session.conn_id()))
239 .to_string()
240 })
241 .unwrap_or_else(|| id.to_string());
242 sources.push(TimestampSource {
243 name: format!("{name} ({id}, storage)"),
244 read_frontier: since.elements().to_vec(),
245 write_frontier: upper.elements().to_vec(),
246 });
247 }
248 }
249 {
250 if let Some(compute_ids) = id_bundle.compute_ids.get(&cluster_id) {
251 let catalog = self.catalog();
252 for id in compute_ids {
253 let frontiers = self
254 .controller
255 .compute
256 .collection_frontiers(*id, Some(cluster_id))
257 .expect("id does not exist");
258 let name = catalog
259 .try_get_entry_by_global_id(id)
260 .map(|item| item.name())
261 .map(|name| {
262 catalog
263 .resolve_full_name(name, Some(session.conn_id()))
264 .to_string()
265 })
266 .unwrap_or_else(|| id.to_string());
267 sources.push(TimestampSource {
268 name: format!("{name} ({id}, compute)"),
269 read_frontier: frontiers.read_frontier.to_vec(),
270 write_frontier: frontiers.write_frontier.to_vec(),
271 });
272 }
273 }
274 }
275 let respond_immediately = determination.respond_immediately();
276 TimestampExplanation {
277 determination,
278 sources,
279 session_wall_time: session.pcx().wall_time,
280 respond_immediately,
281 }
282 }
283
284 #[instrument]
285 async fn explain_timestamp_finish(
286 &mut self,
287 session: &mut Session,
288 ExplainTimestampFinish {
289 validity: _,
290 format,
291 optimized_plan,
292 cluster_id,
293 source_ids,
294 when,
295 real_time_recency_ts,
296 }: ExplainTimestampFinish,
297 ) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError> {
298 let id_bundle = self
299 .index_oracle(cluster_id)
300 .sufficient_collections(source_ids.iter().copied());
301
302 let is_json = match format {
303 ExplainFormat::Text => false,
304 ExplainFormat::Json => true,
305 ExplainFormat::Dot => {
306 return Err(AdapterError::Unsupported("EXPLAIN TIMESTAMP AS DOT"));
307 }
308 };
309 let mut timeline_context = self
310 .catalog()
311 .validate_timeline_context(source_ids.iter().copied())?;
312 if matches!(timeline_context, TimelineContext::TimestampIndependent)
313 && optimized_plan.contains_temporal()
314 {
315 timeline_context = TimelineContext::TimestampDependent;
319 }
320
321 let oracle_read_ts = self.oracle_read_ts(session, &timeline_context, &when).await;
322
323 let determination = self.sequence_peek_timestamp(
324 session,
325 &when,
326 cluster_id,
327 timeline_context,
328 oracle_read_ts,
329 &id_bundle,
330 &source_ids,
331 real_time_recency_ts,
332 RequireLinearization::NotRequired,
333 )?;
334 let explanation = self.explain_timestamp(session, cluster_id, &id_bundle, determination);
335
336 let s = if is_json {
337 serde_json::to_string_pretty(&explanation).expect("failed to serialize explanation")
338 } else {
339 explanation.to_string()
340 };
341 let rows = vec![Row::pack_slice(&[Datum::from(s.as_str())])];
342 Ok(StageResult::Response(Self::send_immediate_rows(rows)))
343 }
344}