Skip to main content

mz_adapter/coord/sequencer/inner/
explain_timestamp.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::sync::Arc;
11
12use chrono::{DateTime, Utc};
13use itertools::Itertools;
14use mz_adapter_types::connection::ConnectionId;
15use mz_controller_types::ClusterId;
16use mz_expr::CollectionPlan;
17use mz_ore::instrument;
18use mz_repr::explain::ExplainFormat;
19use mz_repr::{Datum, Row};
20use mz_sql::plan::{self};
21use mz_sql::session::metadata::SessionMetadata;
22use tracing::{Instrument, Span};
23
24use crate::coord::sequencer::inner::return_if_err;
25use crate::coord::timestamp_selection::{TimestampDetermination, TimestampSource};
26use crate::coord::{
27    Coordinator, ExplainTimestampFinish, ExplainTimestampOptimize, ExplainTimestampRealTimeRecency,
28    ExplainTimestampStage, Message, PlanValidity, StageResult, Staged, TargetCluster,
29};
30use crate::error::AdapterError;
31use crate::optimize::{self, Optimize};
32use crate::session::{RequireLinearization, Session};
33use crate::{CollectionIdBundle, ExecuteContext, TimelineContext, TimestampExplanation};
34
35impl Staged for ExplainTimestampStage {
36    type Ctx = ExecuteContext;
37
38    fn validity(&mut self) -> &mut PlanValidity {
39        match self {
40            ExplainTimestampStage::Optimize(stage) => &mut stage.validity,
41            ExplainTimestampStage::RealTimeRecency(stage) => &mut stage.validity,
42            ExplainTimestampStage::Finish(stage) => &mut stage.validity,
43        }
44    }
45
46    async fn stage(
47        self,
48        coord: &mut Coordinator,
49        ctx: &mut ExecuteContext,
50    ) -> Result<StageResult<Box<Self>>, AdapterError> {
51        match self {
52            ExplainTimestampStage::Optimize(stage) => coord.explain_timestamp_optimize(stage),
53            ExplainTimestampStage::RealTimeRecency(stage) => {
54                coord
55                    .explain_timestamp_real_time_recency(ctx.session(), stage)
56                    .await
57            }
58            ExplainTimestampStage::Finish(stage) => {
59                coord
60                    .explain_timestamp_finish(ctx.session_mut(), stage)
61                    .await
62            }
63        }
64    }
65
66    fn message(self, ctx: ExecuteContext, span: Span) -> Message {
67        Message::ExplainTimestampStageReady {
68            ctx,
69            span,
70            stage: self,
71        }
72    }
73
74    fn cancel_enabled(&self) -> bool {
75        true
76    }
77}
78
79impl Coordinator {
80    #[instrument]
81    pub async fn sequence_explain_timestamp(
82        &mut self,
83        ctx: ExecuteContext,
84        plan: plan::ExplainTimestampPlan,
85        target_cluster: TargetCluster,
86    ) {
87        let stage = return_if_err!(
88            self.explain_timestamp_validity(ctx.session(), plan, target_cluster),
89            ctx
90        );
91        self.sequence_staged(ctx, Span::current(), stage).await;
92    }
93
94    #[instrument]
95    fn explain_timestamp_validity(
96        &self,
97        session: &Session,
98        plan: plan::ExplainTimestampPlan,
99        target_cluster: TargetCluster,
100    ) -> Result<ExplainTimestampStage, AdapterError> {
101        let cluster = self
102            .catalog()
103            .resolve_target_cluster(target_cluster, session)?;
104        let cluster_id = cluster.id;
105        let dependencies = plan
106            .raw_plan
107            .depends_on()
108            .into_iter()
109            .map(|id| self.catalog().resolve_item_id(&id))
110            .collect();
111        let validity = PlanValidity::new(
112            self.catalog().transient_revision(),
113            dependencies,
114            Some(cluster_id),
115            None,
116            session.role_metadata().clone(),
117        );
118        Ok(ExplainTimestampStage::Optimize(ExplainTimestampOptimize {
119            validity,
120            plan,
121            cluster_id,
122        }))
123    }
124
125    #[instrument]
126    fn explain_timestamp_optimize(
127        &self,
128        ExplainTimestampOptimize {
129            validity,
130            plan,
131            cluster_id,
132        }: ExplainTimestampOptimize,
133    ) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError> {
134        // Collect optimizer parameters.
135        let optimizer_config = optimize::OptimizerConfig::from(self.catalog().system_config());
136
137        let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
138
139        let span = Span::current();
140        Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
141            || "optimize explain timestamp",
142            move || {
143                span.in_scope(|| {
144                    let plan::ExplainTimestampPlan {
145                        format,
146                        raw_plan,
147                        when,
148                    } = plan;
149
150                    // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
151                    let optimized_plan = optimizer.optimize(raw_plan)?;
152
153                    let stage =
154                        ExplainTimestampStage::RealTimeRecency(ExplainTimestampRealTimeRecency {
155                            validity,
156                            format,
157                            optimized_plan,
158                            cluster_id,
159                            when,
160                        });
161                    Ok(Box::new(stage))
162                })
163            },
164        )))
165    }
166
167    #[instrument]
168    async fn explain_timestamp_real_time_recency(
169        &self,
170        session: &Session,
171        ExplainTimestampRealTimeRecency {
172            validity,
173            format,
174            optimized_plan,
175            cluster_id,
176            when,
177        }: ExplainTimestampRealTimeRecency,
178    ) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError> {
179        let source_ids = optimized_plan.depends_on();
180        let fut = self
181            .determine_real_time_recent_timestamp_if_needed(session, source_ids.iter().copied())
182            .await?;
183
184        match fut {
185            Some(fut) => {
186                let catalog = Arc::clone(&self.catalog);
187                let span = Span::current();
188                Ok(StageResult::Handle(mz_ore::task::spawn(
189                    || "explain timestamp real time recency",
190                    async move {
191                        let real_time_recency_ts =
192                            Coordinator::await_real_time_recent_timestamp(catalog, fut).await?;
193                        let stage = ExplainTimestampStage::Finish(ExplainTimestampFinish {
194                            validity,
195                            format,
196                            optimized_plan,
197                            cluster_id,
198                            source_ids,
199                            when,
200                            real_time_recency_ts: Some(real_time_recency_ts),
201                        });
202                        Ok(Box::new(stage))
203                    }
204                    .instrument(span),
205                )))
206            }
207            None => Ok(StageResult::Immediate(Box::new(
208                ExplainTimestampStage::Finish(ExplainTimestampFinish {
209                    validity,
210                    format,
211                    optimized_plan,
212                    cluster_id,
213                    source_ids,
214                    when,
215                    real_time_recency_ts: None,
216                }),
217            ))),
218        }
219    }
220
221    pub(crate) fn explain_timestamp(
222        &self,
223        conn_id: &ConnectionId,
224        session_wall_time: DateTime<Utc>,
225        cluster_id: ClusterId,
226        id_bundle: &CollectionIdBundle,
227        determination: TimestampDetermination,
228    ) -> TimestampExplanation {
229        let mut sources = Vec::new();
230        {
231            let storage_ids = id_bundle.storage_ids.iter().cloned().collect_vec();
232            let frontiers = self
233                .controller
234                .storage
235                .collections_frontiers(storage_ids)
236                .expect("missing collection");
237
238            for (id, since, upper) in frontiers {
239                let name = self
240                    .catalog()
241                    .try_get_entry_by_global_id(&id)
242                    .map(|item| item.name())
243                    .map(|name| {
244                        self.catalog()
245                            .resolve_full_name(name, Some(conn_id))
246                            .to_string()
247                    })
248                    .unwrap_or_else(|| id.to_string());
249                sources.push(TimestampSource {
250                    name: format!("{name} ({id}, storage)"),
251                    read_frontier: since.elements().to_vec(),
252                    write_frontier: upper.elements().to_vec(),
253                });
254            }
255        }
256        {
257            if let Some(compute_ids) = id_bundle.compute_ids.get(&cluster_id) {
258                let catalog = self.catalog();
259                for id in compute_ids {
260                    let frontiers = self
261                        .controller
262                        .compute
263                        .collection_frontiers(*id, Some(cluster_id))
264                        .expect("id does not exist");
265                    let name = catalog
266                        .try_get_entry_by_global_id(id)
267                        .map(|item| item.name())
268                        .map(|name| catalog.resolve_full_name(name, Some(conn_id)).to_string())
269                        .unwrap_or_else(|| id.to_string());
270                    sources.push(TimestampSource {
271                        name: format!("{name} ({id}, compute)"),
272                        read_frontier: frontiers.read_frontier.to_vec(),
273                        write_frontier: frontiers.write_frontier.to_vec(),
274                    });
275                }
276            }
277        }
278        let respond_immediately = determination.respond_immediately();
279        TimestampExplanation {
280            determination,
281            sources,
282            session_wall_time,
283            respond_immediately,
284        }
285    }
286
287    #[instrument]
288    async fn explain_timestamp_finish(
289        &mut self,
290        session: &mut Session,
291        ExplainTimestampFinish {
292            validity: _,
293            format,
294            optimized_plan,
295            cluster_id,
296            source_ids,
297            when,
298            real_time_recency_ts,
299        }: ExplainTimestampFinish,
300    ) -> Result<StageResult<Box<ExplainTimestampStage>>, AdapterError> {
301        let id_bundle = self
302            .index_oracle(cluster_id)
303            .sufficient_collections(source_ids.iter().copied());
304
305        let is_json = match format {
306            ExplainFormat::Text => false,
307            ExplainFormat::Json => true,
308            ExplainFormat::Dot => {
309                return Err(AdapterError::Unsupported("EXPLAIN TIMESTAMP AS DOT"));
310            }
311        };
312        let mut timeline_context = self
313            .catalog()
314            .validate_timeline_context(source_ids.iter().copied())?;
315        if matches!(timeline_context, TimelineContext::TimestampIndependent)
316            && optimized_plan.contains_temporal()
317        {
318            // If the source IDs are timestamp independent but the query contains temporal functions,
319            // then the timeline context needs to be upgraded to timestamp dependent. This is
320            // required because `source_ids` doesn't contain functions.
321            timeline_context = TimelineContext::TimestampDependent;
322        }
323
324        let oracle_read_ts = self.oracle_read_ts(session, &timeline_context, &when).await;
325
326        let determination = self.sequence_peek_timestamp(
327            session,
328            &when,
329            cluster_id,
330            timeline_context,
331            oracle_read_ts,
332            &id_bundle,
333            &source_ids,
334            real_time_recency_ts,
335            RequireLinearization::NotRequired,
336        )?;
337        let explanation = self.explain_timestamp(
338            session.conn_id(),
339            session.pcx().wall_time,
340            cluster_id,
341            &id_bundle,
342            determination,
343        );
344
345        let s = if is_json {
346            serde_json::to_string_pretty(&explanation).expect("failed to serialize explanation")
347        } else {
348            explanation.to_string()
349        };
350        let rows = vec![Row::pack_slice(&[Datum::from(s.as_str())])];
351        Ok(StageResult::Response(Self::send_immediate_rows(rows)))
352    }
353}