mz_adapter/coord/sequencer/inner/explain_timestamp.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use itertools::Itertools;
11use mz_controller_types::ClusterId;
12use mz_expr::CollectionPlan;
13use mz_ore::instrument;
14use mz_repr::explain::ExplainFormat;
15use mz_repr::{Datum, Row};
16use mz_sql::plan::{self};
17use mz_sql::session::metadata::SessionMetadata;
18use tracing::{Instrument, Span};
19
20use crate::coord::sequencer::inner::return_if_err;
21use crate::coord::timestamp_selection::{TimestampDetermination, TimestampSource};
22use crate::coord::{
23    Coordinator, ExplainTimestampFinish, ExplainTimestampOptimize, ExplainTimestampRealTimeRecency,
24    ExplainTimestampStage, Message, PlanValidity, StageResult, Staged, TargetCluster,
25};
26use crate::error::AdapterError;
27use crate::optimize::{self, Optimize};
28use crate::session::{RequireLinearization, Session};
29use crate::{CollectionIdBundle, ExecuteContext, TimelineContext, TimestampExplanation};
30
31impl Staged for ExplainTimestampStage {
32    type Ctx = ExecuteContext;
33
34    fn validity(&mut self) -> &mut PlanValidity {
35        match self {
36            ExplainTimestampStage::Optimize(stage) => &mut stage.validity,
37            ExplainTimestampStage::RealTimeRecency(stage) => &mut stage.validity,
38            ExplainTimestampStage::Finish(stage) => &mut stage.validity,
39        }
40    }
41
42    async fn stage(
43        self,
44        coord: &mut Coordinator,
45        ctx: &mut ExecuteContext,
46    ) -> Result<StageResult<Box<Self>>, AdapterError> {
47        match self {
48            ExplainTimestampStage::Optimize(stage) => coord.explain_timestamp_optimize(stage),
49            ExplainTimestampStage::RealTimeRecency(stage) => {
50                coord
51                    .explain_timestamp_real_time_recency(ctx.session(), stage)
52                    .await
53            }
54            ExplainTimestampStage::Finish(stage) => {
55                coord
56                    .explain_timestamp_finish(ctx.session_mut(), stage)
57                    .await
58            }
59        }
60    }
61
62    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
63        Message::ExplainTimestampStageReady {
64            ctx,
65            span,
66            stage: self,
67        }
68    }
69
70    fn cancel_enabled(&self) -> bool {
71        true
72    }
73}
74
75impl Coordinator {
76    #[instrument]
77    pub async fn sequence_explain_timestamp(
78        &mut self,
79        ctx: ExecuteContext,
80        plan: plan::ExplainTimestampPlan,
81        target_cluster: TargetCluster,
82    ) {
83        let stage = return_if_err!(
84            self.explain_timestamp_validity(ctx.session(), plan, target_cluster),
85            ctx
86        );
87        self.sequence_staged(ctx, Span::current(), stage).await;
88    }
89
90    #[instrument]
91    fn explain_timestamp_validity(
92        &self,
93        session: &Session,
94        plan: plan::ExplainTimestampPlan,
95        target_cluster: TargetCluster,
96    ) -> Result<ExplainTimestampStage, AdapterError> {
97        let cluster = self
98            .catalog()
99            .resolve_target_cluster(target_cluster, session)?;
100        let cluster_id = cluster.id;
101        let dependencies = plan
102            .raw_plan
103            .depends_on()
104            .into_iter()
105            .map(|id| self.catalog().resolve_item_id(&id))
106            .collect();
107        let validity = PlanValidity::new(
108            self.catalog().transient_revision(),
109            dependencies,
110            Some(cluster_id),
111            None,
112            session.role_metadata().clone(),
113        );
114        Ok(ExplainTimestampStage::Optimize(ExplainTimestampOptimize {
115            validity,
116            plan,
117            cluster_id,
118        }))
119    }
120
121    #[instrument]
122    fn explain_timestamp_optimize(
123        &self,
124        ExplainTimestampOptimize {
125            validity,
126            plan,
127            cluster_id,
128        }: ExplainTimestampOptimize,
129    ) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError> {
130        // Collect optimizer parameters.
131        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
132
133        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
134
135        let span = Span::current();
136        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
137            || "optimize explain timestamp",
138            move || {
139                span.in_scope(|| {
140                    let plan::ExplainTimestampPlan {
141                        format,
142                        raw_plan,
143                        when,
144                    } = plan;
145
146                    // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
147                    let optimized_plan = optimizer.optimize(raw_plan)?;
148
149                    let stage =
150                        ExplainTimestampStage::RealTimeRecency(ExplainTimestampRealTimeRecency {
151                            validity,
152                            format,
153                            optimized_plan,
154                            cluster_id,
155                            when,
156                        });
157                    Ok(Box::new(stage))
158                })
159            },
160        )))
161    }
162
163    #[instrument]
164    async fn explain_timestamp_real_time_recency(
165        &self,
166        session: &Session,
167        ExplainTimestampRealTimeRecency {
168            validity,
169            format,
170            optimized_plan,
171            cluster_id,
172            when,
173        }: ExplainTimestampRealTimeRecency,
174    ) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError> {
175        let source_ids = optimized_plan.depends_on();
176        let fut = self
177            .determine_real_time_recent_timestamp_if_needed(session, source_ids.iter().copied())
178            .await?;
179
180        match fut {
181            Some(fut) => {
182                let span = Span::current();
183                Ok(StageResult::Handle(mz_ore::task::spawn(
184                    || "explain timestamp real time recency",
185                    async move {
186                        let real_time_recency_ts = fut.await?;
187                        let stage = ExplainTimestampStage::Finish(ExplainTimestampFinish {
188                            validity,
189                            format,
190                            optimized_plan,
191                            cluster_id,
192                            source_ids,
193                            when,
194                            real_time_recency_ts: Some(real_time_recency_ts),
195                        });
196                        Ok(Box::new(stage))
197                    }
198                    .instrument(span),
199                )))
200            }
201            None => Ok(StageResult::Immediate(Box::new(
202                ExplainTimestampStage::Finish(ExplainTimestampFinish {
203                    validity,
204                    format,
205                    optimized_plan,
206                    cluster_id,
207                    source_ids,
208                    when,
209                    real_time_recency_ts: None,
210                }),
211            ))),
212        }
213    }
214
215    pub(crate) fn explain_timestamp(
216        &self,
217        session: &Session,
218        cluster_id: ClusterId,
219        id_bundle: &CollectionIdBundle,
220        determination: TimestampDetermination<mz_repr::Timestamp>,
221    ) -> TimestampExplanation<mz_repr::Timestamp> {
222        let mut sources = Vec::new();
223        {
224            let storage_ids = id_bundle.storage_ids.iter().cloned().collect_vec();
225            let frontiers = self
226                .controller
227                .storage
228                .collections_frontiers(storage_ids)
229                .expect("missing collection");
230
231            for (id, since, upper) in frontiers {
232                let name = self
233                    .catalog()
234                    .try_get_entry_by_global_id(&id)
235                    .map(|item| item.name())
236                    .map(|name| {
237                        self.catalog()
238                            .resolve_full_name(name, Some(session.conn_id()))
239                            .to_string()
240                    })
241                    .unwrap_or_else(|| id.to_string());
242                sources.push(TimestampSource {
243                    name: format!("{name} ({id}, storage)"),
244                    read_frontier: since.elements().to_vec(),
245                    write_frontier: upper.elements().to_vec(),
246                });
247            }
248        }
249        {
250            if let Some(compute_ids) = id_bundle.compute_ids.get(&cluster_id) {
251                let catalog = self.catalog();
252                for id in compute_ids {
253                    let frontiers = self
254                        .controller
255                        .compute
256                        .collection_frontiers(*id, Some(cluster_id))
257                        .expect("id does not exist");
258                    let name = catalog
259                        .try_get_entry_by_global_id(id)
260                        .map(|item| item.name())
261                        .map(|name| {
262                            catalog
263                                .resolve_full_name(name, Some(session.conn_id()))
264                                .to_string()
265                        })
266                        .unwrap_or_else(|| id.to_string());
267                    sources.push(TimestampSource {
268                        name: format!("{name} ({id}, compute)"),
269                        read_frontier: frontiers.read_frontier.to_vec(),
270                        write_frontier: frontiers.write_frontier.to_vec(),
271                    });
272                }
273            }
274        }
275        let respond_immediately = determination.respond_immediately();
276        TimestampExplanation {
277            determination,
278            sources,
279            session_wall_time: session.pcx().wall_time,
280            respond_immediately,
281        }
282    }
283
284    #[instrument]
285    async fn explain_timestamp_finish(
286        &mut self,
287        session: &mut Session,
288        ExplainTimestampFinish {
289            validity: _,
290            format,
291            optimized_plan,
292            cluster_id,
293            source_ids,
294            when,
295            real_time_recency_ts,
296        }: ExplainTimestampFinish,
297    ) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError> {
298        let id_bundle = self
299            .index_oracle(cluster_id)
300            .sufficient_collections(source_ids.iter().copied());
301
302        let is_json = match format {
303            ExplainFormat::Text => false,
304            ExplainFormat::Json => true,
305            ExplainFormat::Dot => {
306                return Err(AdapterError::Unsupported("EXPLAIN TIMESTAMP AS DOT"));
307            }
308        };
309        let mut timeline_context = self
310            .catalog()
311            .validate_timeline_context(source_ids.iter().copied())?;
312        if matches!(timeline_context, TimelineContext::TimestampIndependent)
313            && optimized_plan.contains_temporal()
314        {
315            // If the source IDs are timestamp independent but the query contains temporal functions,
316            // then the timeline context needs to be upgraded to timestamp dependent. This is
317            // required because `source_ids` doesn't contain functions.
318            timeline_context = TimelineContext::TimestampDependent;
319        }
320
321        let oracle_read_ts = self.oracle_read_ts(session, &timeline_context, &when).await;
322
323        let determination = self.sequence_peek_timestamp(
324            session,
325            &when,
326            cluster_id,
327            timeline_context,
328            oracle_read_ts,
329            &id_bundle,
330            &source_ids,
331            real_time_recency_ts,
332            RequireLinearization::NotRequired,
333        )?;
334        let explanation = self.explain_timestamp(session, cluster_id, &id_bundle, determination);
335
336        let s = if is_json {
337            serde_json::to_string_pretty(&explanation).expect("failed to serialize explanation")
338        } else {
339            explanation.to_string()
340        };
341        let rows = vec![Row::pack_slice(&[Datum::from(s.as_str())])];
342        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
343    }
344}