mz_adapter/coord/sequencer/inner/
explain_timestamp.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use itertools::Itertools;
11use mz_controller_types::ClusterId;
12use mz_expr::CollectionPlan;
13use mz_ore::instrument;
14use mz_repr::explain::ExplainFormat;
15use mz_repr::{Datum, Row};
16use mz_sql::plan::{self};
17use mz_sql::session::metadata::SessionMetadata;
18use tracing::{Instrument, Span};
19
20use crate::coord::sequencer::inner::return_if_err;
21use crate::coord::timestamp_selection::{TimestampDetermination, TimestampSource};
22use crate::coord::{
23    Coordinator, ExplainTimestampFinish, ExplainTimestampOptimize, ExplainTimestampRealTimeRecency,
24    ExplainTimestampStage, Message, PlanValidity, StageResult, Staged, TargetCluster,
25};
26use crate::error::AdapterError;
27use crate::optimize::{self, Optimize};
28use crate::session::{RequireLinearization, Session};
29use crate::{CollectionIdBundle, ExecuteContext, TimelineContext, TimestampExplanation};
30
31impl Staged for ExplainTimestampStage {
32    type Ctx = ExecuteContext;
33
34    fn validity(&mut self) -> &mut PlanValidity {
35        match self {
36            ExplainTimestampStage::Optimize(stage) => &mut stage.validity,
37            ExplainTimestampStage::RealTimeRecency(stage) => &mut stage.validity,
38            ExplainTimestampStage::Finish(stage) => &mut stage.validity,
39        }
40    }
41
42    async fn stage(
43        self,
44        coord: &mut Coordinator,
45        ctx: &mut ExecuteContext,
46    ) -> Result<StageResult<Box<Self>>, AdapterError> {
47        match self {
48            ExplainTimestampStage::Optimize(stage) => coord.explain_timestamp_optimize(stage),
49            ExplainTimestampStage::RealTimeRecency(stage) => {
50                coord
51                    .explain_timestamp_real_time_recency(ctx.session(), stage)
52                    .await
53            }
54            ExplainTimestampStage::Finish(stage) => {
55                coord
56                    .explain_timestamp_finish(ctx.session_mut(), stage)
57                    .await
58            }
59        }
60    }
61
62    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
63        Message::ExplainTimestampStageReady {
64            ctx,
65            span,
66            stage: self,
67        }
68    }
69
70    fn cancel_enabled(&self) -> bool {
71        true
72    }
73}
74
75impl Coordinator {
76    #[instrument]
77    pub async fn sequence_explain_timestamp(
78        &mut self,
79        ctx: ExecuteContext,
80        plan: plan::ExplainTimestampPlan,
81        target_cluster: TargetCluster,
82    ) {
83        let stage = return_if_err!(
84            self.explain_timestamp_validity(ctx.session(), plan, target_cluster),
85            ctx
86        );
87        self.sequence_staged(ctx, Span::current(), stage).await;
88    }
89
90    #[instrument]
91    fn explain_timestamp_validity(
92        &self,
93        session: &Session,
94        plan: plan::ExplainTimestampPlan,
95        target_cluster: TargetCluster,
96    ) -> Result<ExplainTimestampStage, AdapterError> {
97        let cluster = self
98            .catalog()
99            .resolve_target_cluster(target_cluster, session)?;
100        let cluster_id = cluster.id;
101        let dependencies = plan
102            .raw_plan
103            .depends_on()
104            .into_iter()
105            .map(|id| self.catalog().resolve_item_id(&id))
106            .collect();
107        let validity = PlanValidity::new(
108            self.catalog().transient_revision(),
109            dependencies,
110            Some(cluster_id),
111            None,
112            session.role_metadata().clone(),
113        );
114        Ok(ExplainTimestampStage::Optimize(ExplainTimestampOptimize {
115            validity,
116            plan,
117            cluster_id,
118        }))
119    }
120
121    #[instrument]
122    fn explain_timestamp_optimize(
123        &self,
124        ExplainTimestampOptimize {
125            validity,
126            plan,
127            cluster_id,
128        }: ExplainTimestampOptimize,
129    ) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError> {
130        // Collect optimizer parameters.
131        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
132
133        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
134
135        let span = Span::current();
136        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
137            || "optimize explain timestamp",
138            move || {
139                span.in_scope(|| {
140                    let plan::ExplainTimestampPlan {
141                        format,
142                        raw_plan,
143                        when,
144                    } = plan;
145
146                    // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
147                    let optimized_plan = optimizer.optimize(raw_plan)?;
148
149                    let stage =
150                        ExplainTimestampStage::RealTimeRecency(ExplainTimestampRealTimeRecency {
151                            validity,
152                            format,
153                            optimized_plan,
154                            cluster_id,
155                            when,
156                        });
157                    Ok(Box::new(stage))
158                })
159            },
160        )))
161    }
162
163    #[instrument]
164    async fn explain_timestamp_real_time_recency(
165        &self,
166        session: &Session,
167        ExplainTimestampRealTimeRecency {
168            validity,
169            format,
170            optimized_plan,
171            cluster_id,
172            when,
173        }: ExplainTimestampRealTimeRecency,
174    ) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError> {
175        let source_ids = optimized_plan.depends_on();
176        let source_items: Vec<_> = source_ids
177            .iter()
178            .map(|gid| self.catalog().resolve_item_id(gid))
179            .collect();
180        let fut = self
181            .determine_real_time_recent_timestamp(session, source_items.into_iter())
182            .await?;
183
184        match fut {
185            Some(fut) => {
186                let span = Span::current();
187                Ok(StageResult::Handle(mz_ore::task::spawn(
188                    || "explain timestamp real time recency",
189                    async move {
190                        let real_time_recency_ts = fut.await?;
191                        let stage = ExplainTimestampStage::Finish(ExplainTimestampFinish {
192                            validity,
193                            format,
194                            optimized_plan,
195                            cluster_id,
196                            source_ids,
197                            when,
198                            real_time_recency_ts: Some(real_time_recency_ts),
199                        });
200                        Ok(Box::new(stage))
201                    }
202                    .instrument(span),
203                )))
204            }
205            None => Ok(StageResult::Immediate(Box::new(
206                ExplainTimestampStage::Finish(ExplainTimestampFinish {
207                    validity,
208                    format,
209                    optimized_plan,
210                    cluster_id,
211                    source_ids,
212                    when,
213                    real_time_recency_ts: None,
214                }),
215            ))),
216        }
217    }
218
219    pub(crate) fn explain_timestamp(
220        &self,
221        session: &Session,
222        cluster_id: ClusterId,
223        id_bundle: &CollectionIdBundle,
224        determination: TimestampDetermination<mz_repr::Timestamp>,
225    ) -> TimestampExplanation<mz_repr::Timestamp> {
226        let mut sources = Vec::new();
227        {
228            let storage_ids = id_bundle.storage_ids.iter().cloned().collect_vec();
229            let frontiers = self
230                .controller
231                .storage
232                .collections_frontiers(storage_ids)
233                .expect("missing collection");
234
235            for (id, since, upper) in frontiers {
236                let name = self
237                    .catalog()
238                    .try_get_entry_by_global_id(&id)
239                    .map(|item| item.name())
240                    .map(|name| {
241                        self.catalog()
242                            .resolve_full_name(name, Some(session.conn_id()))
243                            .to_string()
244                    })
245                    .unwrap_or_else(|| id.to_string());
246                sources.push(TimestampSource {
247                    name: format!("{name} ({id}, storage)"),
248                    read_frontier: since.elements().to_vec(),
249                    write_frontier: upper.elements().to_vec(),
250                });
251            }
252        }
253        {
254            if let Some(compute_ids) = id_bundle.compute_ids.get(&cluster_id) {
255                let catalog = self.catalog();
256                for id in compute_ids {
257                    let frontiers = self
258                        .controller
259                        .compute
260                        .collection_frontiers(*id, Some(cluster_id))
261                        .expect("id does not exist");
262                    let name = catalog
263                        .try_get_entry_by_global_id(id)
264                        .map(|item| item.name())
265                        .map(|name| {
266                            catalog
267                                .resolve_full_name(name, Some(session.conn_id()))
268                                .to_string()
269                        })
270                        .unwrap_or_else(|| id.to_string());
271                    sources.push(TimestampSource {
272                        name: format!("{name} ({id}, compute)"),
273                        read_frontier: frontiers.read_frontier.to_vec(),
274                        write_frontier: frontiers.write_frontier.to_vec(),
275                    });
276                }
277            }
278        }
279        let respond_immediately = determination.respond_immediately();
280        TimestampExplanation {
281            determination,
282            sources,
283            session_wall_time: session.pcx().wall_time,
284            respond_immediately,
285        }
286    }
287
288    #[instrument]
289    async fn explain_timestamp_finish(
290        &mut self,
291        session: &mut Session,
292        ExplainTimestampFinish {
293            validity: _,
294            format,
295            optimized_plan,
296            cluster_id,
297            source_ids,
298            when,
299            real_time_recency_ts,
300        }: ExplainTimestampFinish,
301    ) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError> {
302        let id_bundle = self
303            .index_oracle(cluster_id)
304            .sufficient_collections(source_ids.iter().copied());
305
306        let is_json = match format {
307            ExplainFormat::Text => false,
308            ExplainFormat::Json => true,
309            ExplainFormat::Dot => {
310                return Err(AdapterError::Unsupported("EXPLAIN TIMESTAMP AS DOT"));
311            }
312        };
313        let mut timeline_context = self
314            .catalog()
315            .validate_timeline_context(source_ids.iter().copied())?;
316        if matches!(timeline_context, TimelineContext::TimestampIndependent)
317            && optimized_plan.contains_temporal()
318        {
319            // If the source IDs are timestamp independent but the query contains temporal functions,
320            // then the timeline context needs to be upgraded to timestamp dependent. This is
321            // required because `source_ids` doesn't contain functions.
322            timeline_context = TimelineContext::TimestampDependent;
323        }
324
325        let oracle_read_ts = self.oracle_read_ts(session, &timeline_context, &when).await;
326
327        let determination = self.sequence_peek_timestamp(
328            session,
329            &when,
330            cluster_id,
331            timeline_context,
332            oracle_read_ts,
333            &id_bundle,
334            &source_ids,
335            real_time_recency_ts,
336            RequireLinearization::NotRequired,
337        )?;
338        let explanation = self.explain_timestamp(session, cluster_id, &id_bundle, determination);
339
340        let s = if is_json {
341            serde_json::to_string_pretty(&explanation).expect("failed to serialize explanation")
342        } else {
343            explanation.to_string()
344        };
345        let rows = vec![Row::pack_slice(&[Datum::from(s.as_str())])];
346        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
347    }
348}