mz_adapter/coord/sequencer/inner/explain_timestamp.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use chrono::{DateTime, Utc};
11use itertools::Itertools;
12use mz_adapter_types::connection::ConnectionId;
13use mz_controller_types::ClusterId;
14use mz_expr::CollectionPlan;
15use mz_ore::instrument;
16use mz_repr::explain::ExplainFormat;
17use mz_repr::{Datum, Row};
18use mz_sql::plan::{self};
19use mz_sql::session::metadata::SessionMetadata;
20use tracing::{Instrument, Span};
21
22use crate::coord::sequencer::inner::return_if_err;
23use crate::coord::timestamp_selection::{TimestampDetermination, TimestampSource};
24use crate::coord::{
25    Coordinator, ExplainTimestampFinish, ExplainTimestampOptimize, ExplainTimestampRealTimeRecency,
26    ExplainTimestampStage, Message, PlanValidity, StageResult, Staged, TargetCluster,
27};
28use crate::error::AdapterError;
29use crate::optimize::{self, Optimize};
30use crate::session::{RequireLinearization, Session};
31use crate::{CollectionIdBundle, ExecuteContext, TimelineContext, TimestampExplanation};
32
33impl Staged for ExplainTimestampStage {
34    type Ctx = ExecuteContext;
35
36    fn validity(&mut self) -> &mut PlanValidity {
37        match self {
38            ExplainTimestampStage::Optimize(stage) => &mut stage.validity,
39            ExplainTimestampStage::RealTimeRecency(stage) => &mut stage.validity,
40            ExplainTimestampStage::Finish(stage) => &mut stage.validity,
41        }
42    }
43
44    async fn stage(
45        self,
46        coord: &mut Coordinator,
47        ctx: &mut ExecuteContext,
48    ) -> Result<StageResult<Box<Self>>, AdapterError> {
49        match self {
50            ExplainTimestampStage::Optimize(stage) => coord.explain_timestamp_optimize(stage),
51            ExplainTimestampStage::RealTimeRecency(stage) => {
52                coord
53                    .explain_timestamp_real_time_recency(ctx.session(), stage)
54                    .await
55            }
56            ExplainTimestampStage::Finish(stage) => {
57                coord
58                    .explain_timestamp_finish(ctx.session_mut(), stage)
59                    .await
60            }
61        }
62    }
63
64    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
65        Message::ExplainTimestampStageReady {
66            ctx,
67            span,
68            stage: self,
69        }
70    }
71
72    fn cancel_enabled(&self) -> bool {
73        true
74    }
75}
76
77impl Coordinator {
78    #[instrument]
79    pub async fn sequence_explain_timestamp(
80        &mut self,
81        ctx: ExecuteContext,
82        plan: plan::ExplainTimestampPlan,
83        target_cluster: TargetCluster,
84    ) {
85        let stage = return_if_err!(
86            self.explain_timestamp_validity(ctx.session(), plan, target_cluster),
87            ctx
88        );
89        self.sequence_staged(ctx, Span::current(), stage).await;
90    }
91
92    #[instrument]
93    fn explain_timestamp_validity(
94        &self,
95        session: &Session,
96        plan: plan::ExplainTimestampPlan,
97        target_cluster: TargetCluster,
98    ) -> Result<ExplainTimestampStage, AdapterError> {
99        let cluster = self
100            .catalog()
101            .resolve_target_cluster(target_cluster, session)?;
102        let cluster_id = cluster.id;
103        let dependencies = plan
104            .raw_plan
105            .depends_on()
106            .into_iter()
107            .map(|id| self.catalog().resolve_item_id(&id))
108            .collect();
109        let validity = PlanValidity::new(
110            self.catalog().transient_revision(),
111            dependencies,
112            Some(cluster_id),
113            None,
114            session.role_metadata().clone(),
115        );
116        Ok(ExplainTimestampStage::Optimize(ExplainTimestampOptimize {
117            validity,
118            plan,
119            cluster_id,
120        }))
121    }
122
123    #[instrument]
124    fn explain_timestamp_optimize(
125        &self,
126        ExplainTimestampOptimize {
127            validity,
128            plan,
129            cluster_id,
130        }: ExplainTimestampOptimize,
131    ) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError> {
132        // Collect optimizer parameters.
133        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
134
135        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
136
137        let span = Span::current();
138        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
139            || "optimize explain timestamp",
140            move || {
141                span.in_scope(|| {
142                    let plan::ExplainTimestampPlan {
143                        format,
144                        raw_plan,
145                        when,
146                    } = plan;
147
148                    // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
149                    let optimized_plan = optimizer.optimize(raw_plan)?;
150
151                    let stage =
152                        ExplainTimestampStage::RealTimeRecency(ExplainTimestampRealTimeRecency {
153                            validity,
154                            format,
155                            optimized_plan,
156                            cluster_id,
157                            when,
158                        });
159                    Ok(Box::new(stage))
160                })
161            },
162        )))
163    }
164
165    #[instrument]
166    async fn explain_timestamp_real_time_recency(
167        &self,
168        session: &Session,
169        ExplainTimestampRealTimeRecency {
170            validity,
171            format,
172            optimized_plan,
173            cluster_id,
174            when,
175        }: ExplainTimestampRealTimeRecency,
176    ) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError> {
177        let source_ids = optimized_plan.depends_on();
178        let fut = self
179            .determine_real_time_recent_timestamp_if_needed(session, source_ids.iter().copied())
180            .await?;
181
182        match fut {
183            Some(fut) => {
184                let span = Span::current();
185                Ok(StageResult::Handle(mz_ore::task::spawn(
186                    || "explain timestamp real time recency",
187                    async move {
188                        let real_time_recency_ts = fut.await?;
189                        let stage = ExplainTimestampStage::Finish(ExplainTimestampFinish {
190                            validity,
191                            format,
192                            optimized_plan,
193                            cluster_id,
194                            source_ids,
195                            when,
196                            real_time_recency_ts: Some(real_time_recency_ts),
197                        });
198                        Ok(Box::new(stage))
199                    }
200                    .instrument(span),
201                )))
202            }
203            None => Ok(StageResult::Immediate(Box::new(
204                ExplainTimestampStage::Finish(ExplainTimestampFinish {
205                    validity,
206                    format,
207                    optimized_plan,
208                    cluster_id,
209                    source_ids,
210                    when,
211                    real_time_recency_ts: None,
212                }),
213            ))),
214        }
215    }
216
217    pub(crate) fn explain_timestamp(
218        &self,
219        conn_id: &ConnectionId,
220        session_wall_time: DateTime<Utc>,
221        cluster_id: ClusterId,
222        id_bundle: &CollectionIdBundle,
223        determination: TimestampDetermination<mz_repr::Timestamp>,
224    ) -> TimestampExplanation<mz_repr::Timestamp> {
225        let mut sources = Vec::new();
226        {
227            let storage_ids = id_bundle.storage_ids.iter().cloned().collect_vec();
228            let frontiers = self
229                .controller
230                .storage
231                .collections_frontiers(storage_ids)
232                .expect("missing collection");
233
234            for (id, since, upper) in frontiers {
235                let name = self
236                    .catalog()
237                    .try_get_entry_by_global_id(&id)
238                    .map(|item| item.name())
239                    .map(|name| {
240                        self.catalog()
241                            .resolve_full_name(name, Some(conn_id))
242                            .to_string()
243                    })
244                    .unwrap_or_else(|| id.to_string());
245                sources.push(TimestampSource {
246                    name: format!("{name} ({id}, storage)"),
247                    read_frontier: since.elements().to_vec(),
248                    write_frontier: upper.elements().to_vec(),
249                });
250            }
251        }
252        {
253            if let Some(compute_ids) = id_bundle.compute_ids.get(&cluster_id) {
254                let catalog = self.catalog();
255                for id in compute_ids {
256                    let frontiers = self
257                        .controller
258                        .compute
259                        .collection_frontiers(*id, Some(cluster_id))
260                        .expect("id does not exist");
261                    let name = catalog
262                        .try_get_entry_by_global_id(id)
263                        .map(|item| item.name())
264                        .map(|name| catalog.resolve_full_name(name, Some(conn_id)).to_string())
265                        .unwrap_or_else(|| id.to_string());
266                    sources.push(TimestampSource {
267                        name: format!("{name} ({id}, compute)"),
268                        read_frontier: frontiers.read_frontier.to_vec(),
269                        write_frontier: frontiers.write_frontier.to_vec(),
270                    });
271                }
272            }
273        }
274        let respond_immediately = determination.respond_immediately();
275        TimestampExplanation {
276            determination,
277            sources,
278            session_wall_time,
279            respond_immediately,
280        }
281    }
282
283    #[instrument]
284    async fn explain_timestamp_finish(
285        &mut self,
286        session: &mut Session,
287        ExplainTimestampFinish {
288            validity: _,
289            format,
290            optimized_plan,
291            cluster_id,
292            source_ids,
293            when,
294            real_time_recency_ts,
295        }: ExplainTimestampFinish,
296    ) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError> {
297        let id_bundle = self
298            .index_oracle(cluster_id)
299            .sufficient_collections(source_ids.iter().copied());
300
301        let is_json = match format {
302            ExplainFormat::Text => false,
303            ExplainFormat::Json => true,
304            ExplainFormat::Dot => {
305                return Err(AdapterError::Unsupported("EXPLAIN TIMESTAMP AS DOT"));
306            }
307        };
308        let mut timeline_context = self
309            .catalog()
310            .validate_timeline_context(source_ids.iter().copied())?;
311        if matches!(timeline_context, TimelineContext::TimestampIndependent)
312            && optimized_plan.contains_temporal()
313        {
314            // If the source IDs are timestamp independent but the query contains temporal functions,
315            // then the timeline context needs to be upgraded to timestamp dependent. This is
316            // required because `source_ids` doesn't contain functions.
317            timeline_context = TimelineContext::TimestampDependent;
318        }
319
320        let oracle_read_ts = self.oracle_read_ts(session, &timeline_context, &when).await;
321
322        let determination = self.sequence_peek_timestamp(
323            session,
324            &when,
325            cluster_id,
326            timeline_context,
327            oracle_read_ts,
328            &id_bundle,
329            &source_ids,
330            real_time_recency_ts,
331            RequireLinearization::NotRequired,
332        )?;
333        let explanation = self.explain_timestamp(
334            session.conn_id(),
335            session.pcx().wall_time,
336            cluster_id,
337            &id_bundle,
338            determination,
339        );
340
341        let s = if is_json {
342            serde_json::to_string_pretty(&explanation).expect("failed to serialize explanation")
343        } else {
344            explanation.to_string()
345        };
346        let rows = vec![Row::pack_slice(&[Datum::from(s.as_str())])];
347        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
348    }
349}