mz_adapter/coord/sequencer/inner/
explain_timestamp.rs1use chrono::{DateTime, Utc};
11use itertools::Itertools;
12use mz_adapter_types::connection::ConnectionId;
13use mz_controller_types::ClusterId;
14use mz_expr::CollectionPlan;
15use mz_ore::instrument;
16use mz_repr::explain::ExplainFormat;
17use mz_repr::{Datum, Row};
18use mz_sql::plan::{self};
19use mz_sql::session::metadata::SessionMetadata;
20use tracing::{Instrument, Span};
21
22use crate::coord::sequencer::inner::return_if_err;
23use crate::coord::timestamp_selection::{TimestampDetermination, TimestampSource};
24use crate::coord::{
25 Coordinator, ExplainTimestampFinish, ExplainTimestampOptimize, ExplainTimestampRealTimeRecency,
26 ExplainTimestampStage, Message, PlanValidity, StageResult, Staged, TargetCluster,
27};
28use crate::error::AdapterError;
29use crate::optimize::{self, Optimize};
30use crate::session::{RequireLinearization, Session};
31use crate::{CollectionIdBundle, ExecuteContext, TimelineContext, TimestampExplanation};
32
33impl Staged for ExplainTimestampStage {
34 type Ctx = ExecuteContext;
35
36 fn validity(&mut self) -> &mut PlanValidity {
37 match self {
38 ExplainTimestampStage::Optimize(stage) => &mut stage.validity,
39 ExplainTimestampStage::RealTimeRecency(stage) => &mut stage.validity,
40 ExplainTimestampStage::Finish(stage) => &mut stage.validity,
41 }
42 }
43
44 async fn stage(
45 self,
46 coord: &mut Coordinator,
47 ctx: &mut ExecuteContext,
48 ) -> Result<StageResult<Box<Self>>, AdapterError> {
49 match self {
50 ExplainTimestampStage::Optimize(stage) => coord.explain_timestamp_optimize(stage),
51 ExplainTimestampStage::RealTimeRecency(stage) => {
52 coord
53 .explain_timestamp_real_time_recency(ctx.session(), stage)
54 .await
55 }
56 ExplainTimestampStage::Finish(stage) => {
57 coord
58 .explain_timestamp_finish(ctx.session_mut(), stage)
59 .await
60 }
61 }
62 }
63
64 fn message(self, ctx: ExecuteContext, span: Span) -> Message {
65 Message::ExplainTimestampStageReady {
66 ctx,
67 span,
68 stage: self,
69 }
70 }
71
72 fn cancel_enabled(&self) -> bool {
73 true
74 }
75}
76
77impl Coordinator {
78 #[instrument]
79 pub async fn sequence_explain_timestamp(
80 &mut self,
81 ctx: ExecuteContext,
82 plan: plan::ExplainTimestampPlan,
83 target_cluster: TargetCluster,
84 ) {
85 let stage = return_if_err!(
86 self.explain_timestamp_validity(ctx.session(), plan, target_cluster),
87 ctx
88 );
89 self.sequence_staged(ctx, Span::current(), stage).await;
90 }
91
92 #[instrument]
93 fn explain_timestamp_validity(
94 &self,
95 session: &Session,
96 plan: plan::ExplainTimestampPlan,
97 target_cluster: TargetCluster,
98 ) -> Result<ExplainTimestampStage, AdapterError> {
99 let cluster = self
100 .catalog()
101 .resolve_target_cluster(target_cluster, session)?;
102 let cluster_id = cluster.id;
103 let dependencies = plan
104 .raw_plan
105 .depends_on()
106 .into_iter()
107 .map(|id| self.catalog().resolve_item_id(&id))
108 .collect();
109 let validity = PlanValidity::new(
110 self.catalog().transient_revision(),
111 dependencies,
112 Some(cluster_id),
113 None,
114 session.role_metadata().clone(),
115 );
116 Ok(ExplainTimestampStage::Optimize(ExplainTimestampOptimize {
117 validity,
118 plan,
119 cluster_id,
120 }))
121 }
122
123 #[instrument]
124 fn explain_timestamp_optimize(
125 &self,
126 ExplainTimestampOptimize {
127 validity,
128 plan,
129 cluster_id,
130 }: ExplainTimestampOptimize,
131 ) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError> {
132 let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
134
135 let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
136
137 let span = Span::current();
138 Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
139 || "optimize explain timestamp",
140 move || {
141 span.in_scope(|| {
142 let plan::ExplainTimestampPlan {
143 format,
144 raw_plan,
145 when,
146 } = plan;
147
148 let optimized_plan = optimizer.optimize(raw_plan)?;
150
151 let stage =
152 ExplainTimestampStage::RealTimeRecency(ExplainTimestampRealTimeRecency {
153 validity,
154 format,
155 optimized_plan,
156 cluster_id,
157 when,
158 });
159 Ok(Box::new(stage))
160 })
161 },
162 )))
163 }
164
165 #[instrument]
166 async fn explain_timestamp_real_time_recency(
167 &self,
168 session: &Session,
169 ExplainTimestampRealTimeRecency {
170 validity,
171 format,
172 optimized_plan,
173 cluster_id,
174 when,
175 }: ExplainTimestampRealTimeRecency,
176 ) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError> {
177 let source_ids = optimized_plan.depends_on();
178 let fut = self
179 .determine_real_time_recent_timestamp_if_needed(session, source_ids.iter().copied())
180 .await?;
181
182 match fut {
183 Some(fut) => {
184 let span = Span::current();
185 Ok(StageResult::Handle(mz_ore::task::spawn(
186 || "explain timestamp real time recency",
187 async move {
188 let real_time_recency_ts = fut.await?;
189 let stage = ExplainTimestampStage::Finish(ExplainTimestampFinish {
190 validity,
191 format,
192 optimized_plan,
193 cluster_id,
194 source_ids,
195 when,
196 real_time_recency_ts: Some(real_time_recency_ts),
197 });
198 Ok(Box::new(stage))
199 }
200 .instrument(span),
201 )))
202 }
203 None => Ok(StageResult::Immediate(Box::new(
204 ExplainTimestampStage::Finish(ExplainTimestampFinish {
205 validity,
206 format,
207 optimized_plan,
208 cluster_id,
209 source_ids,
210 when,
211 real_time_recency_ts: None,
212 }),
213 ))),
214 }
215 }
216
217 pub(crate) fn explain_timestamp(
218 &self,
219 conn_id: &ConnectionId,
220 session_wall_time: DateTime<Utc>,
221 cluster_id: ClusterId,
222 id_bundle: &CollectionIdBundle,
223 determination: TimestampDetermination<mz_repr::Timestamp>,
224 ) -> TimestampExplanation<mz_repr::Timestamp> {
225 let mut sources = Vec::new();
226 {
227 let storage_ids = id_bundle.storage_ids.iter().cloned().collect_vec();
228 let frontiers = self
229 .controller
230 .storage
231 .collections_frontiers(storage_ids)
232 .expect("missing collection");
233
234 for (id, since, upper) in frontiers {
235 let name = self
236 .catalog()
237 .try_get_entry_by_global_id(&id)
238 .map(|item| item.name())
239 .map(|name| {
240 self.catalog()
241 .resolve_full_name(name, Some(conn_id))
242 .to_string()
243 })
244 .unwrap_or_else(|| id.to_string());
245 sources.push(TimestampSource {
246 name: format!("{name} ({id}, storage)"),
247 read_frontier: since.elements().to_vec(),
248 write_frontier: upper.elements().to_vec(),
249 });
250 }
251 }
252 {
253 if let Some(compute_ids) = id_bundle.compute_ids.get(&cluster_id) {
254 let catalog = self.catalog();
255 for id in compute_ids {
256 let frontiers = self
257 .controller
258 .compute
259 .collection_frontiers(*id, Some(cluster_id))
260 .expect("id does not exist");
261 let name = catalog
262 .try_get_entry_by_global_id(id)
263 .map(|item| item.name())
264 .map(|name| catalog.resolve_full_name(name, Some(conn_id)).to_string())
265 .unwrap_or_else(|| id.to_string());
266 sources.push(TimestampSource {
267 name: format!("{name} ({id}, compute)"),
268 read_frontier: frontiers.read_frontier.to_vec(),
269 write_frontier: frontiers.write_frontier.to_vec(),
270 });
271 }
272 }
273 }
274 let respond_immediately = determination.respond_immediately();
275 TimestampExplanation {
276 determination,
277 sources,
278 session_wall_time,
279 respond_immediately,
280 }
281 }
282
283 #[instrument]
284 async fn explain_timestamp_finish(
285 &mut self,
286 session: &mut Session,
287 ExplainTimestampFinish {
288 validity: _,
289 format,
290 optimized_plan,
291 cluster_id,
292 source_ids,
293 when,
294 real_time_recency_ts,
295 }: ExplainTimestampFinish,
296 ) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError> {
297 let id_bundle = self
298 .index_oracle(cluster_id)
299 .sufficient_collections(source_ids.iter().copied());
300
301 let is_json = match format {
302 ExplainFormat::Text => false,
303 ExplainFormat::Json => true,
304 ExplainFormat::Dot => {
305 return Err(AdapterError::Unsupported("EXPLAIN TIMESTAMP AS DOT"));
306 }
307 };
308 let mut timeline_context = self
309 .catalog()
310 .validate_timeline_context(source_ids.iter().copied())?;
311 if matches!(timeline_context, TimelineContext::TimestampIndependent)
312 && optimized_plan.contains_temporal()
313 {
314 timeline_context = TimelineContext::TimestampDependent;
318 }
319
320 let oracle_read_ts = self.oracle_read_ts(session, &timeline_context, &when).await;
321
322 let determination = self.sequence_peek_timestamp(
323 session,
324 &when,
325 cluster_id,
326 timeline_context,
327 oracle_read_ts,
328 &id_bundle,
329 &source_ids,
330 real_time_recency_ts,
331 RequireLinearization::NotRequired,
332 )?;
333 let explanation = self.explain_timestamp(
334 session.conn_id(),
335 session.pcx().wall_time,
336 cluster_id,
337 &id_bundle,
338 determination,
339 );
340
341 let s = if is_json {
342 serde_json::to_string_pretty(&explanation).expect("failed to serialize explanation")
343 } else {
344 explanation.to_string()
345 };
346 let rows = vec![Row::pack_slice(&[Datum::from(s.as_str())])];
347 Ok(StageResult::Response(Self::send_immediate_rows(rows)))
348 }
349}