// mz_adapter/coord/sequencer/inner/explain_timestamp.rs
use std::sync::Arc;
11
12use chrono::{DateTime, Utc};
13use itertools::Itertools;
14use mz_adapter_types::connection::ConnectionId;
15use mz_controller_types::ClusterId;
16use mz_expr::CollectionPlan;
17use mz_ore::instrument;
18use mz_repr::explain::ExplainFormat;
19use mz_repr::{Datum, Row};
20use mz_sql::plan::{self};
21use mz_sql::session::metadata::SessionMetadata;
22use tracing::{Instrument, Span};
23
24use crate::coord::sequencer::inner::return_if_err;
25use crate::coord::timestamp_selection::{TimestampDetermination, TimestampSource};
26use crate::coord::{
27 Coordinator, ExplainTimestampFinish, ExplainTimestampOptimize, ExplainTimestampRealTimeRecency,
28 ExplainTimestampStage, Message, PlanValidity, StageResult, Staged, TargetCluster,
29};
30use crate::error::AdapterError;
31use crate::optimize::{self, Optimize};
32use crate::session::{RequireLinearization, Session};
33use crate::{CollectionIdBundle, ExecuteContext, TimelineContext, TimestampExplanation};
34
35impl Staged for ExplainTimestampStage {
36 type Ctx = ExecuteContext;
37
38 fn validity(&mut self) -> &mut PlanValidity {
39 match self {
40 ExplainTimestampStage::Optimize(stage) => &mut stage.validity,
41 ExplainTimestampStage::RealTimeRecency(stage) => &mut stage.validity,
42 ExplainTimestampStage::Finish(stage) => &mut stage.validity,
43 }
44 }
45
46 async fn stage(
47 self,
48 coord: &mut Coordinator,
49 ctx: &mut ExecuteContext,
50 ) -> Result<StageResult<Box<Self>>, AdapterError> {
51 match self {
52 ExplainTimestampStage::Optimize(stage) => coord.explain_timestamp_optimize(stage),
53 ExplainTimestampStage::RealTimeRecency(stage) => {
54 coord
55 .explain_timestamp_real_time_recency(ctx.session(), stage)
56 .await
57 }
58 ExplainTimestampStage::Finish(stage) => {
59 coord
60 .explain_timestamp_finish(ctx.session_mut(), stage)
61 .await
62 }
63 }
64 }
65
66 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
67 Message::ExplainTimestampStageReady {
68 ctx,
69 span,
70 stage: self,
71 }
72 }
73
74 fn cancel_enabled(&self) -> bool {
75 true
76 }
77}
78
79impl Coordinator {
80 #[instrument]
81 pub async fn sequence_explain_timestamp(
82 &mut self,
83 ctx: ExecuteContext,
84 plan: plan::ExplainTimestampPlan,
85 target_cluster: TargetCluster,
86 ) {
87 let stage = return_if_err!(
88 self.explain_timestamp_validity(ctx.session(), plan, target_cluster),
89 ctx
90 );
91 self.sequence_staged(ctx, Span::current(), stage).await;
92 }
93
94 #[instrument]
95 fn explain_timestamp_validity(
96 &self,
97 session: &Session,
98 plan: plan::ExplainTimestampPlan,
99 target_cluster: TargetCluster,
100 ) -> Result<ExplainTimestampStage, AdapterError> {
101 let cluster = self
102 .catalog()
103 .resolve_target_cluster(target_cluster, session)?;
104 let cluster_id = cluster.id;
105 let dependencies = plan
106 .raw_plan
107 .depends_on()
108 .into_iter()
109 .map(|id| self.catalog().resolve_item_id(&id))
110 .collect();
111 let validity = PlanValidity::new(
112 self.catalog().transient_revision(),
113 dependencies,
114 Some(cluster_id),
115 None,
116 session.role_metadata().clone(),
117 );
118 Ok(ExplainTimestampStage::Optimize(ExplainTimestampOptimize {
119 validity,
120 plan,
121 cluster_id,
122 }))
123 }
124
125 #[instrument]
126 fn explain_timestamp_optimize(
127 &self,
128 ExplainTimestampOptimize {
129 validity,
130 plan,
131 cluster_id,
132 }: ExplainTimestampOptimize,
133 ) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError> {
134 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
136
137 let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
138
139 let span = Span::current();
140 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
141 || "optimize explain timestamp",
142 move || {
143 span.in_scope(|| {
144 let plan::ExplainTimestampPlan {
145 format,
146 raw_plan,
147 when,
148 } = plan;
149
150 let optimized_plan = optimizer.optimize(raw_plan)?;
152
153 let stage =
154 ExplainTimestampStage::RealTimeRecency(ExplainTimestampRealTimeRecency {
155 validity,
156 format,
157 optimized_plan,
158 cluster_id,
159 when,
160 });
161 Ok(Box::new(stage))
162 })
163 },
164 )))
165 }
166
167 #[instrument]
168 async fn explain_timestamp_real_time_recency(
169 &self,
170 session: &Session,
171 ExplainTimestampRealTimeRecency {
172 validity,
173 format,
174 optimized_plan,
175 cluster_id,
176 when,
177 }: ExplainTimestampRealTimeRecency,
178 ) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError> {
179 let source_ids = optimized_plan.depends_on();
180 let fut = self
181 .determine_real_time_recent_timestamp_if_needed(session, source_ids.iter().copied())
182 .await?;
183
184 match fut {
185 Some(fut) => {
186 let catalog = Arc::clone(&self.catalog);
187 let span = Span::current();
188 Ok(StageResult::Handle(mz_ore::task::spawn(
189 || "explain timestamp real time recency",
190 async move {
191 let real_time_recency_ts =
192 Coordinator::await_real_time_recent_timestamp(catalog, fut).await?;
193 let stage = ExplainTimestampStage::Finish(ExplainTimestampFinish {
194 validity,
195 format,
196 optimized_plan,
197 cluster_id,
198 source_ids,
199 when,
200 real_time_recency_ts: Some(real_time_recency_ts),
201 });
202 Ok(Box::new(stage))
203 }
204 .instrument(span),
205 )))
206 }
207 None => Ok(StageResult::Immediate(Box::new(
208 ExplainTimestampStage::Finish(ExplainTimestampFinish {
209 validity,
210 format,
211 optimized_plan,
212 cluster_id,
213 source_ids,
214 when,
215 real_time_recency_ts: None,
216 }),
217 ))),
218 }
219 }
220
221 pub(crate) fn explain_timestamp(
222 &self,
223 conn_id: &ConnectionId,
224 session_wall_time: DateTime<Utc>,
225 cluster_id: ClusterId,
226 id_bundle: &CollectionIdBundle,
227 determination: TimestampDetermination,
228 ) -> TimestampExplanation {
229 let mut sources = Vec::new();
230 {
231 let storage_ids = id_bundle.storage_ids.iter().cloned().collect_vec();
232 let frontiers = self
233 .controller
234 .storage
235 .collections_frontiers(storage_ids)
236 .expect("missing collection");
237
238 for (id, since, upper) in frontiers {
239 let name = self
240 .catalog()
241 .try_get_entry_by_global_id(&id)
242 .map(|item| item.name())
243 .map(|name| {
244 self.catalog()
245 .resolve_full_name(name, Some(conn_id))
246 .to_string()
247 })
248 .unwrap_or_else(|| id.to_string());
249 sources.push(TimestampSource {
250 name: format!("{name} ({id}, storage)"),
251 read_frontier: since.elements().to_vec(),
252 write_frontier: upper.elements().to_vec(),
253 });
254 }
255 }
256 {
257 if let Some(compute_ids) = id_bundle.compute_ids.get(&cluster_id) {
258 let catalog = self.catalog();
259 for id in compute_ids {
260 let frontiers = self
261 .controller
262 .compute
263 .collection_frontiers(*id, Some(cluster_id))
264 .expect("id does not exist");
265 let name = catalog
266 .try_get_entry_by_global_id(id)
267 .map(|item| item.name())
268 .map(|name| catalog.resolve_full_name(name, Some(conn_id)).to_string())
269 .unwrap_or_else(|| id.to_string());
270 sources.push(TimestampSource {
271 name: format!("{name} ({id}, compute)"),
272 read_frontier: frontiers.read_frontier.to_vec(),
273 write_frontier: frontiers.write_frontier.to_vec(),
274 });
275 }
276 }
277 }
278 let respond_immediately = determination.respond_immediately();
279 TimestampExplanation {
280 determination,
281 sources,
282 session_wall_time,
283 respond_immediately,
284 }
285 }
286
287 #[instrument]
288 async fn explain_timestamp_finish(
289 &mut self,
290 session: &mut Session,
291 ExplainTimestampFinish {
292 validity: _,
293 format,
294 optimized_plan,
295 cluster_id,
296 source_ids,
297 when,
298 real_time_recency_ts,
299 }: ExplainTimestampFinish,
300 ) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError> {
301 let id_bundle = self
302 .index_oracle(cluster_id)
303 .sufficient_collections(source_ids.iter().copied());
304
305 let is_json = match format {
306 ExplainFormat::Text => false,
307 ExplainFormat::Json => true,
308 ExplainFormat::Dot => {
309 return Err(AdapterError::Unsupported("EXPLAIN TIMESTAMP AS DOT"));
310 }
311 };
312 let mut timeline_context = self
313 .catalog()
314 .validate_timeline_context(source_ids.iter().copied())?;
315 if matches!(timeline_context, TimelineContext::TimestampIndependent)
316 && optimized_plan.contains_temporal()
317 {
318 timeline_context = TimelineContext::TimestampDependent;
322 }
323
324 let oracle_read_ts = self.oracle_read_ts(session, &timeline_context, &when).await;
325
326 let determination = self.sequence_peek_timestamp(
327 session,
328 &when,
329 cluster_id,
330 timeline_context,
331 oracle_read_ts,
332 &id_bundle,
333 &source_ids,
334 real_time_recency_ts,
335 RequireLinearization::NotRequired,
336 )?;
337 let explanation = self.explain_timestamp(
338 session.conn_id(),
339 session.pcx().wall_time,
340 cluster_id,
341 &id_bundle,
342 determination,
343 );
344
345 let s = if is_json {
346 serde_json::to_string_pretty(&explanation).expect("failed to serialize explanation")
347 } else {
348 explanation.to_string()
349 };
350 let rows = vec![Row::pack_slice(&[Datum::from(s.as_str())])];
351 Ok(StageResult::Response(Self::send_immediate_rows(rows)))
352 }
353}