mz_adapter/coord/
peek.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic and types for creating, executing, and tracking peeks.
11//!
12//! This module determines if a dataflow can be short-cut, by returning constant values
13//! or by reading out of existing arrangements, and implements the appropriate plan.
14
15use std::collections::{BTreeMap, BTreeSet};
16use std::fmt;
17use std::num::NonZeroUsize;
18
19use differential_dataflow::consolidation::consolidate;
20use futures::TryFutureExt;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_cluster_client::ReplicaId;
24use mz_compute_client::controller::PeekNotification;
25use mz_compute_client::protocol::command::PeekTarget;
26use mz_compute_client::protocol::response::PeekResponse;
27use mz_compute_types::ComputeInstanceId;
28use mz_compute_types::dataflows::{DataflowDescription, IndexImport};
29use mz_controller_types::ClusterId;
30use mz_expr::explain::{HumanizedExplain, HumanizerMode, fmt_text_constant_rows};
31use mz_expr::row::RowCollection;
32use mz_expr::{
33    EvalError, Id, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing,
34    permutation_for_arrangement,
35};
36use mz_ore::cast::CastFrom;
37use mz_ore::str::{StrExt, separated};
38use mz_ore::tracing::OpenTelemetryContext;
39use mz_repr::explain::text::DisplayText;
40use mz_repr::explain::{CompactScalars, IndexUsageType, PlanRenderingContext, UsedIndexes};
41use mz_repr::{Diff, GlobalId, IntoRowIterator, RelationType, Row, RowIterator, preserves_order};
42use serde::{Deserialize, Serialize};
43use timely::progress::Timestamp;
44use uuid::Uuid;
45
46use crate::coord::timestamp_selection::TimestampDetermination;
47use crate::optimize::OptimizerError;
48use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
49use crate::util::ResultExt;
50use crate::{AdapterError, ExecuteContextExtra, ExecuteResponse};
51
/// A peek is a request to read data from a maintained arrangement.
#[derive(Debug)]
pub(crate) struct PendingPeek {
    /// The connection that initiated the peek.
    pub(crate) conn_id: ConnectionId,
    /// The cluster that the peek is being executed on.
    pub(crate) cluster_id: ClusterId,
    /// All `GlobalId`s that the peek depends on.
    pub(crate) depends_on: BTreeSet<GlobalId>,
    /// Context about the execute that produced this peek,
    /// needed by the coordinator for retiring it.
    pub(crate) ctx_extra: ExecuteContextExtra,
    /// Is this a fast-path peek, i.e. one that doesn't require a dataflow?
    pub(crate) is_fast_path: bool,
}
67
/// The response from a `Peek`, with row multiplicities represented in unary.
///
/// Note that each `Peek` expects to generate exactly one `PeekResponse`, i.e.
/// we expect a 1:1 contract between `Peek` and `PeekResponseUnary`.
#[derive(Debug)]
pub enum PeekResponseUnary {
    /// The peek succeeded; results are yielded by the boxed row iterator.
    Rows(Box<dyn RowIterator + Send + Sync>),
    /// The peek failed, with a human-readable error message.
    Error(String),
    /// The peek was canceled before producing a result.
    Canceled,
}
78
/// A plan for a peek that must be serviced by building a (transient) dataflow.
#[derive(Clone, Debug)]
pub struct PeekDataflowPlan<T = mz_repr::Timestamp> {
    /// The description of the dataflow to build in order to service the peek.
    pub(crate) desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
    /// The id of the (transient) index exported by the dataflow, which the
    /// peek will read out of.
    pub(crate) id: GlobalId,
    /// The key by which the arrangement is arranged.
    key: Vec<MirScalarExpr>,
    /// Permutation mapping arrangement (key, value) columns back to the
    /// original column order (see `permutation_for_arrangement`).
    permutation: Vec<usize>,
    /// Number of columns in the "thinned" (non-key) portion of the arrangement.
    thinned_arity: usize,
}
87
88impl<T> PeekDataflowPlan<T> {
89    pub fn new(
90        desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
91        id: GlobalId,
92        typ: &RelationType,
93    ) -> Self {
94        let arity = typ.arity();
95        let key = typ
96            .default_key()
97            .into_iter()
98            .map(MirScalarExpr::Column)
99            .collect::<Vec<_>>();
100        let (permutation, thinning) = permutation_for_arrangement(&key, arity);
101        Self {
102            desc,
103            id,
104            key,
105            permutation,
106            thinned_arity: thinning.len(),
107        }
108    }
109}
110
/// A description of how a peek can be serviced without constructing a dataflow.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd)]
pub enum FastPathPlan {
    /// The view evaluates to a constant result that can be returned.
    ///
    /// The [RelationType] is unnecessary for evaluating the constant result but
    /// may be helpful when printing out an explanation.
    Constant(Result<Vec<(Row, Diff)>, EvalError>, RelationType),
    /// The view can be read out of an existing arrangement.
    /// (coll_id, idx_id, values to look up, mfp to apply)
    PeekExisting(GlobalId, GlobalId, Option<Vec<Row>>, mz_expr::SafeMfpPlan),
    /// The view can be read directly out of Persist.
    /// (coll_id, optional literal constraint on a prefix of the columns, mfp to apply)
    PeekPersist(GlobalId, Option<Row>, mz_expr::SafeMfpPlan),
}
124
125impl<'a, T: 'a> DisplayText<PlanRenderingContext<'a, T>> for FastPathPlan {
126    fn fmt_text(
127        &self,
128        f: &mut fmt::Formatter<'_>,
129        ctx: &mut PlanRenderingContext<'a, T>,
130    ) -> fmt::Result {
131        let redacted = ctx.config.redacted;
132        let mode = HumanizedExplain::new(ctx.config.redacted);
133
134        // TODO(aalexandrov): factor out common PeekExisting and PeekPersist
135        // code.
136        match self {
137            FastPathPlan::Constant(Ok(rows), _) => {
138                if !rows.is_empty() {
139                    writeln!(f, "{}Constant", ctx.indent)?;
140                    *ctx.as_mut() += 1;
141                    fmt_text_constant_rows(
142                        f,
143                        rows.iter().map(|(row, diff)| (row, diff)),
144                        ctx.as_mut(),
145                        redacted,
146                    )?;
147                    *ctx.as_mut() -= 1;
148                } else {
149                    writeln!(f, "{}Constant <empty>", ctx.as_mut())?;
150                }
151                Ok(())
152            }
153            FastPathPlan::Constant(Err(err), _) => {
154                if redacted {
155                    writeln!(f, "{}Error â–ˆ", ctx.as_mut())
156                } else {
157                    writeln!(f, "{}Error {}", ctx.as_mut(), err.to_string().escaped())
158                }
159            }
160            FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
161                ctx.as_mut().set();
162                let (map, filter, project) = mfp.as_map_filter_project();
163
164                let cols = if !ctx.config.humanized_exprs {
165                    None
166                } else if let Some(cols) = ctx.humanizer.column_names_for_id(*idx_id) {
167                    // FIXME: account for thinning and permutation
168                    // See mz_expr::permutation_for_arrangement
169                    // See permute_oneshot_mfp_around_index
170                    let cols = itertools::chain(
171                        cols.iter().cloned(),
172                        std::iter::repeat(String::new()).take(map.len()),
173                    )
174                    .collect();
175                    Some(cols)
176                } else {
177                    None
178                };
179
180                if project.len() != mfp.input_arity + map.len()
181                    || !project.iter().enumerate().all(|(i, o)| i == *o)
182                {
183                    let outputs = mode.seq(&project, cols.as_ref());
184                    let outputs = CompactScalars(outputs);
185                    writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
186                    *ctx.as_mut() += 1;
187                }
188                if !filter.is_empty() {
189                    let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
190                    writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
191                    *ctx.as_mut() += 1;
192                }
193                if !map.is_empty() {
194                    let scalars = mode.seq(&map, cols.as_ref());
195                    let scalars = CompactScalars(scalars);
196                    writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
197                    *ctx.as_mut() += 1;
198                }
199                MirRelationExpr::fmt_indexed_filter(
200                    f,
201                    ctx,
202                    coll_id,
203                    idx_id,
204                    literal_constraints.clone(),
205                    None,
206                )?;
207                writeln!(f)?;
208                ctx.as_mut().reset();
209                Ok(())
210            }
211            FastPathPlan::PeekPersist(gid, literal_constraint, mfp) => {
212                ctx.as_mut().set();
213                let (map, filter, project) = mfp.as_map_filter_project();
214
215                let cols = if !ctx.config.humanized_exprs {
216                    None
217                } else if let Some(cols) = ctx.humanizer.column_names_for_id(*gid) {
218                    let cols = itertools::chain(
219                        cols.iter().cloned(),
220                        std::iter::repeat(String::new()).take(map.len()),
221                    )
222                    .collect::<Vec<_>>();
223                    Some(cols)
224                } else {
225                    None
226                };
227
228                if project.len() != mfp.input_arity + map.len()
229                    || !project.iter().enumerate().all(|(i, o)| i == *o)
230                {
231                    let outputs = mode.seq(&project, cols.as_ref());
232                    let outputs = CompactScalars(outputs);
233                    writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
234                    *ctx.as_mut() += 1;
235                }
236                if !filter.is_empty() {
237                    let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
238                    writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
239                    *ctx.as_mut() += 1;
240                }
241                if !map.is_empty() {
242                    let scalars = mode.seq(&map, cols.as_ref());
243                    let scalars = CompactScalars(scalars);
244                    writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
245                    *ctx.as_mut() += 1;
246                }
247                let human_id = ctx
248                    .humanizer
249                    .humanize_id(*gid)
250                    .unwrap_or_else(|| gid.to_string());
251                write!(f, "{}PeekPersist {human_id}", ctx.as_mut())?;
252                if let Some(literal) = literal_constraint {
253                    let value = mode.expr(literal, None);
254                    writeln!(f, " [value={}]", value)?;
255                } else {
256                    writeln!(f, "")?;
257                }
258                ctx.as_mut().reset();
259                Ok(())
260            }
261        }?;
262        Ok(())
263    }
264}
265
/// A fully planned peek, ready to be implemented by the coordinator.
#[derive(Debug)]
pub struct PlannedPeek {
    /// How to produce the peek's result (fast path or dataflow).
    pub plan: PeekPlan,
    /// The timestamp (and related metadata) at which the peek reads.
    pub determination: TimestampDetermination<mz_repr::Timestamp>,
    /// The connection that initiated the peek.
    pub conn_id: ConnectionId,
    /// The number of columns in the peeked relation.
    pub source_arity: usize,
    /// The ids of all objects the peek depends on.
    pub source_ids: BTreeSet<GlobalId>,
}
274
/// Possible ways in which the coordinator could produce the result for a goal view.
#[derive(Clone, Debug)]
pub enum PeekPlan<T = mz_repr::Timestamp> {
    /// The result can be produced without building a dataflow (a constant, a
    /// read from an existing arrangement, or a direct Persist read).
    FastPath(FastPathPlan),
    /// The view must be installed as a dataflow and then read.
    SlowPath(PeekDataflowPlan<T>),
}
282
283/// Convert `mfp` to an executable, non-temporal plan.
284/// It should be non-temporal, as OneShot preparation populates `mz_now`.
285fn mfp_to_safe_plan(
286    mfp: mz_expr::MapFilterProject,
287) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
288    mfp.into_plan()
289        .map_err(OptimizerError::Internal)?
290        .into_nontemporal()
291        .map_err(|_e| OptimizerError::UnsafeMfpPlan)
292}
293
294fn permute_oneshot_mfp_around_index(
295    mfp: mz_expr::MapFilterProject,
296    key: &[MirScalarExpr],
297) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
298    let input_arity = mfp.input_arity;
299    let mut safe_mfp = mfp_to_safe_plan(mfp)?;
300    let (permute, thinning) = mz_expr::permutation_for_arrangement(key, input_arity);
301    safe_mfp.permute_fn(|c| permute[c], key.len() + thinning.len());
302    Ok(safe_mfp)
303}
304
/// Determine if the dataflow plan can be implemented without an actual dataflow.
///
/// If the optimized plan is a `Constant` or a `Get` of a maintained arrangement,
/// we can avoid building a dataflow (and either just return the results, or peek
/// out of the arrangement, respectively).
///
/// Returns `Ok(Some(plan))` when a fast path exists, `Ok(None)` when a full
/// dataflow is required, and `Err` if planning the MFP fails.
///
/// * `view_id` identifies the transient view inserted for the query; a fast
///   path is only considered if that view is the first object to build.
/// * `finishing` is the row-set finishing the client will apply; it is used to
///   jump through covered `TopK`s and to validate Persist fast-path reads.
/// * `persist_fast_path_limit` bounds `limit + offset` for a Persist read.
/// * `persist_fast_path_order` enables the ordered/literal-prefix Persist
///   fast path checks.
pub fn create_fast_path_plan<T: Timestamp>(
    dataflow_plan: &mut DataflowDescription<OptimizedMirRelationExpr, (), T>,
    view_id: GlobalId,
    finishing: Option<&RowSetFinishing>,
    persist_fast_path_limit: usize,
    persist_fast_path_order: bool,
) -> Result<Option<FastPathPlan>, OptimizerError> {
    // At this point, `dataflow_plan` contains our best optimized dataflow.
    // We will check the plan to see if there is a fast path to escape full dataflow construction.

    // We need to restrict ourselves to settings where the inserted transient view is the first thing
    // to build (no dependent views). There is likely an index to build as well, but we may not be sure.
    if dataflow_plan.objects_to_build.len() >= 1 && dataflow_plan.objects_to_build[0].id == view_id
    {
        let mut mir = &*dataflow_plan.objects_to_build[0].plan.as_inner_mut();
        if let Some((rows, found_typ)) = mir.as_const() {
            // In the case of a constant, we can return the result now.
            return Ok(Some(FastPathPlan::Constant(
                rows.clone()
                    .map(|rows| rows.into_iter().map(|(row, diff)| (row, diff)).collect()),
                found_typ.clone(),
            )));
        } else {
            // If there is a TopK that would be completely covered by the finishing, then jump
            // through the TopK.
            if let MirRelationExpr::TopK {
                input,
                group_key,
                order_key,
                limit,
                offset,
                monotonic: _,
                expected_group_size: _,
            } = mir
            {
                if let Some(finishing) = finishing {
                    // The TopK is covered only if it is ungrouped, orders the
                    // same way as the finishing, and applies no offset.
                    if group_key.is_empty() && *order_key == finishing.order_by && *offset == 0 {
                        // The following is roughly `limit >= finishing.limit`, but with Options.
                        let finishing_limits_at_least_as_topk = match (limit, finishing.limit) {
                            (None, _) => true,
                            (Some(..), None) => false,
                            (Some(topk_limit), Some(finishing_limit)) => {
                                // Only a literal TopK limit can be compared;
                                // anything else is conservatively rejected.
                                if let Some(l) = topk_limit.as_literal_int64() {
                                    l >= *finishing_limit
                                } else {
                                    false
                                }
                            }
                        };
                        if finishing_limits_at_least_as_topk {
                            mir = input;
                        }
                    }
                }
            }
            // In the case of a linear operator around an indexed view, we
            // can skip creating a dataflow and instead pull all the rows in
            // index and apply the linear operator against them.
            let (mfp, mir) = mz_expr::MapFilterProject::extract_from_expression(mir);
            match mir {
                MirRelationExpr::Get {
                    id: Id::Global(get_id),
                    typ: relation_typ,
                    ..
                } => {
                    // Just grab any arrangement if an arrangement exists
                    for (index_id, IndexImport { desc, .. }) in dataflow_plan.index_imports.iter() {
                        if desc.on_id == *get_id {
                            return Ok(Some(FastPathPlan::PeekExisting(
                                *get_id,
                                *index_id,
                                None,
                                permute_oneshot_mfp_around_index(mfp, &desc.key)?,
                            )));
                        }
                    }

                    // If there is no arrangement, consider peeking the persist shard directly.
                    // Generally, we consider a persist peek when the query can definitely be satisfied
                    // by scanning through a small, constant number of Persist key-values.
                    let safe_mfp = mfp_to_safe_plan(mfp)?;
                    let (_maps, filters, projection) = safe_mfp.as_map_filter_project();

                    // Build a literal constraint over a prefix of the columns:
                    // stop at the first column that either does not preserve
                    // order or lacks an equality-with-literal filter.
                    let literal_constraint = if persist_fast_path_order {
                        let mut row = Row::default();
                        let mut packer = row.packer();
                        for (idx, col) in relation_typ.column_types.iter().enumerate() {
                            if !preserves_order(&col.scalar_type) {
                                break;
                            }
                            let col_expr = MirScalarExpr::Column(idx);

                            let Some((literal, _)) = filters
                                .iter()
                                .filter_map(|f| f.expr_eq_literal(&col_expr))
                                .next()
                            else {
                                break;
                            };
                            packer.extend_by_row(&literal);
                        }
                        // An empty prefix constrains nothing.
                        if row.is_empty() { None } else { Some(row) }
                    } else {
                        None
                    };

                    // Check whether the finishing alone bounds the read to a
                    // small prefix of the shard's order.
                    let finish_ok = match &finishing {
                        None => false,
                        Some(RowSetFinishing {
                            order_by,
                            limit,
                            offset,
                            ..
                        }) => {
                            let order_ok = if persist_fast_path_order {
                                order_by.iter().enumerate().all(|(idx, order)| {
                                    // Map the ordering column back to the column in the source data.
                                    // (If it's not one of the input columns, we can't make any guarantees.)
                                    let column_idx = projection[order.column];
                                    if column_idx >= safe_mfp.input_arity {
                                        return false;
                                    }
                                    let column_type = &relation_typ.column_types[column_idx];
                                    let index_ok = idx == column_idx;
                                    let nulls_ok = !column_type.nullable || order.nulls_last;
                                    let asc_ok = !order.desc;
                                    let type_ok = preserves_order(&column_type.scalar_type);
                                    index_ok && nulls_ok && asc_ok && type_ok
                                })
                            } else {
                                order_by.is_empty()
                            };
                            let limit_ok = limit.map_or(false, |l| {
                                usize::cast_from(l) + *offset < persist_fast_path_limit
                            });
                            order_ok && limit_ok
                        }
                    };

                    // The literal constraint pins down at most one value iff
                    // some key of the relation lies entirely within the
                    // constrained prefix.
                    let key_constraint = if let Some(literal) = &literal_constraint {
                        let prefix_len = literal.iter().count();
                        relation_typ
                            .keys
                            .iter()
                            .any(|k| k.iter().all(|idx| *idx < prefix_len))
                    } else {
                        false
                    };

                    // We can generate a persist peek when:
                    // - We have a literal constraint that includes an entire key (so we'll return at most one value)
                    // - We can return the first N key values (no filters, small limit, consistent order)
                    if key_constraint || (filters.is_empty() && finish_ok) {
                        return Ok(Some(FastPathPlan::PeekPersist(
                            *get_id,
                            literal_constraint,
                            safe_mfp,
                        )));
                    }
                }
                MirRelationExpr::Join { implementation, .. } => {
                    // A join planned as an indexed filter is a point lookup
                    // into an existing arrangement.
                    if let mz_expr::JoinImplementation::IndexedFilter(coll_id, idx_id, key, vals) =
                        implementation
                    {
                        return Ok(Some(FastPathPlan::PeekExisting(
                            *coll_id,
                            *idx_id,
                            Some(vals.clone()),
                            permute_oneshot_mfp_around_index(mfp, key)?,
                        )));
                    }
                }
                // nothing can be done for non-trivial expressions.
                _ => {}
            }
        }
    }
    Ok(None)
}
489
490impl FastPathPlan {
491    pub fn used_indexes(&self, finishing: Option<&RowSetFinishing>) -> UsedIndexes {
492        match self {
493            FastPathPlan::Constant(..) => UsedIndexes::default(),
494            FastPathPlan::PeekExisting(_coll_id, idx_id, literal_constraints, _mfp) => {
495                if literal_constraints.is_some() {
496                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::Lookup(*idx_id)])].into())
497                } else if finishing.map_or(false, |f| f.limit.is_some() && f.order_by.is_empty()) {
498                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::FastPathLimit])].into())
499                } else {
500                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::FullScan])].into())
501                }
502            }
503            FastPathPlan::PeekPersist(..) => UsedIndexes::default(),
504        }
505    }
506}
507
508impl crate::coord::Coordinator {
509    /// Implements a peek plan produced by `create_plan` above.
510    #[mz_ore::instrument(level = "debug")]
511    pub async fn implement_peek_plan(
512        &mut self,
513        ctx_extra: &mut ExecuteContextExtra,
514        plan: PlannedPeek,
515        finishing: RowSetFinishing,
516        compute_instance: ComputeInstanceId,
517        target_replica: Option<ReplicaId>,
518        max_result_size: u64,
519        max_returned_query_size: Option<u64>,
520    ) -> Result<crate::ExecuteResponse, AdapterError> {
521        let PlannedPeek {
522            plan: fast_path,
523            determination,
524            conn_id,
525            source_arity,
526            source_ids,
527        } = plan;
528
529        // If the dataflow optimizes to a constant expression, we can immediately return the result.
530        if let PeekPlan::FastPath(FastPathPlan::Constant(rows, _)) = fast_path {
531            let mut rows = match rows {
532                Ok(rows) => rows,
533                Err(e) => return Err(e.into()),
534            };
535            // Consolidate down the results to get correct totals.
536            consolidate(&mut rows);
537
538            let mut results = Vec::new();
539            for (row, count) in rows {
540                if count.is_negative() {
541                    Err(EvalError::InvalidParameterValue(
542                        format!("Negative multiplicity in constant result: {}", count).into(),
543                    ))?
544                };
545                if count.is_positive() {
546                    let count = usize::cast_from(
547                        u64::try_from(count.into_inner())
548                            .expect("known to be positive from check above"),
549                    );
550                    results.push((
551                        row,
552                        NonZeroUsize::new(count).expect("known to be non-zero from check above"),
553                    ));
554                }
555            }
556            let row_collection = RowCollection::new(results, &finishing.order_by);
557            let duration_histogram = self.metrics.row_set_finishing_seconds();
558
559            let (ret, reason) = match finishing.finish(
560                row_collection,
561                max_result_size,
562                max_returned_query_size,
563                &duration_histogram,
564            ) {
565                Ok((rows, row_size_bytes)) => {
566                    let result_size = u64::cast_from(row_size_bytes);
567                    let rows_returned = u64::cast_from(rows.count());
568                    (
569                        Ok(Self::send_immediate_rows(rows)),
570                        StatementEndedExecutionReason::Success {
571                            result_size: Some(result_size),
572                            rows_returned: Some(rows_returned),
573                            execution_strategy: Some(StatementExecutionStrategy::Constant),
574                        },
575                    )
576                }
577                Err(error) => (
578                    Err(AdapterError::ResultSize(error.clone())),
579                    StatementEndedExecutionReason::Errored { error },
580                ),
581            };
582            self.retire_execution(reason, std::mem::take(ctx_extra));
583            return ret;
584        }
585
586        let timestamp = determination.timestamp_context.timestamp_or_default();
587        if let Some(id) = ctx_extra.contents() {
588            self.set_statement_execution_timestamp(id, timestamp)
589        }
590
591        // The remaining cases are a peek into a maintained arrangement, or building a dataflow.
592        // In both cases we will want to peek, and the main difference is that we might want to
593        // build a dataflow and drop it once the peek is issued. The peeks are also constructed
594        // differently.
595
596        // If we must build the view, ship the dataflow.
597        let (peek_command, drop_dataflow, is_fast_path, peek_target, strategy) = match fast_path {
598            PeekPlan::FastPath(FastPathPlan::PeekExisting(
599                _coll_id,
600                idx_id,
601                literal_constraints,
602                map_filter_project,
603            )) => (
604                (literal_constraints, timestamp, map_filter_project),
605                None,
606                true,
607                PeekTarget::Index { id: idx_id },
608                StatementExecutionStrategy::FastPath,
609            ),
610            PeekPlan::FastPath(FastPathPlan::PeekPersist(
611                coll_id,
612                literal_constraint,
613                map_filter_project,
614            )) => {
615                let peek_command = (
616                    literal_constraint.map(|r| vec![r]),
617                    timestamp,
618                    map_filter_project,
619                );
620                let metadata = self
621                    .controller
622                    .storage
623                    .collection_metadata(coll_id)
624                    .expect("storage collection for fast-path peek")
625                    .clone();
626                (
627                    peek_command,
628                    None,
629                    true,
630                    PeekTarget::Persist {
631                        id: coll_id,
632                        metadata,
633                    },
634                    StatementExecutionStrategy::PersistFastPath,
635                )
636            }
637            PeekPlan::SlowPath(PeekDataflowPlan {
638                desc: dataflow,
639                // n.b. this index_id identifies a transient index the
640                // caller created, so it is guaranteed to be on
641                // `compute_instance`.
642                id: index_id,
643                key: index_key,
644                permutation: index_permutation,
645                thinned_arity: index_thinned_arity,
646            }) => {
647                let output_ids = dataflow.export_ids().collect();
648
649                // Very important: actually create the dataflow (here, so we can destructure).
650                self.controller
651                    .compute
652                    .create_dataflow(compute_instance, dataflow, None)
653                    .unwrap_or_terminate("cannot fail to create dataflows");
654                self.initialize_compute_read_policies(
655                    output_ids,
656                    compute_instance,
657                    // Disable compaction so that nothing can compact before the peek occurs below.
658                    CompactionWindow::DisableCompaction,
659                )
660                .await;
661
662                // Create an identity MFP operator.
663                let mut map_filter_project = mz_expr::MapFilterProject::new(source_arity);
664                map_filter_project.permute_fn(
665                    |c| index_permutation[c],
666                    index_key.len() + index_thinned_arity,
667                );
668                let map_filter_project = mfp_to_safe_plan(map_filter_project)?;
669                (
670                    (None, timestamp, map_filter_project),
671                    Some(index_id),
672                    false,
673                    PeekTarget::Index { id: index_id },
674                    StatementExecutionStrategy::Standard,
675                )
676            }
677            _ => {
678                unreachable!()
679            }
680        };
681
682        // Endpoints for sending and receiving peek responses.
683        let (rows_tx, rows_rx) = tokio::sync::oneshot::channel();
684
685        // Generate unique UUID. Guaranteed to be unique to all pending peeks, there's an very
686        // small but unlikely chance that it's not unique to completed peeks.
687        let mut uuid = Uuid::new_v4();
688        while self.pending_peeks.contains_key(&uuid) {
689            uuid = Uuid::new_v4();
690        }
691
692        // The peek is ready to go for both cases, fast and non-fast.
693        // Stash the response mechanism, and broadcast dataflow construction.
694        self.pending_peeks.insert(
695            uuid,
696            PendingPeek {
697                conn_id: conn_id.clone(),
698                cluster_id: compute_instance,
699                depends_on: source_ids,
700                ctx_extra: std::mem::take(ctx_extra),
701                is_fast_path,
702            },
703        );
704        self.client_pending_peeks
705            .entry(conn_id)
706            .or_default()
707            .insert(uuid, compute_instance);
708        let (literal_constraints, timestamp, map_filter_project) = peek_command;
709
710        self.controller
711            .compute
712            .peek(
713                compute_instance,
714                peek_target,
715                literal_constraints,
716                uuid,
717                timestamp,
718                finishing.clone(),
719                map_filter_project,
720                target_replica,
721                rows_tx,
722            )
723            .unwrap_or_terminate("cannot fail to peek");
724        let duration_histogram = self.metrics.row_set_finishing_seconds();
725
726        // Prepare the receiver to return as a response.
727        let rows_rx = rows_rx.map_ok_or_else(
728            |e| PeekResponseUnary::Error(e.to_string()),
729            move |resp| match resp {
730                PeekResponse::Rows(rows) => {
731                    match finishing.finish(
732                        rows,
733                        max_result_size,
734                        max_returned_query_size,
735                        &duration_histogram,
736                    ) {
737                        Ok((rows, _size_bytes)) => PeekResponseUnary::Rows(Box::new(rows)),
738                        Err(e) => PeekResponseUnary::Error(e),
739                    }
740                }
741                PeekResponse::Canceled => PeekResponseUnary::Canceled,
742                PeekResponse::Error(e) => PeekResponseUnary::Error(e),
743            },
744        );
745
746        // If it was created, drop the dataflow once the peek command is sent.
747        if let Some(index_id) = drop_dataflow {
748            self.remove_compute_ids_from_timeline(vec![(compute_instance, index_id)]);
749            self.drop_indexes(vec![(compute_instance, index_id)]);
750        }
751
752        Ok(crate::ExecuteResponse::SendingRows {
753            future: Box::pin(rows_rx),
754            instance_id: compute_instance,
755            strategy,
756        })
757    }
758
759    /// Cancel and remove all pending peeks that were initiated by the client with `conn_id`.
760    #[mz_ore::instrument(level = "debug")]
761    pub(crate) fn cancel_pending_peeks(&mut self, conn_id: &ConnectionId) {
762        if let Some(uuids) = self.client_pending_peeks.remove(conn_id) {
763            self.metrics
764                .canceled_peeks
765                .with_label_values(&[])
766                .inc_by(u64::cast_from(uuids.len()));
767
768            let mut inverse: BTreeMap<ComputeInstanceId, BTreeSet<Uuid>> = Default::default();
769            for (uuid, compute_instance) in &uuids {
770                inverse.entry(*compute_instance).or_default().insert(*uuid);
771            }
772            for (compute_instance, uuids) in inverse {
773                // It's possible that this compute instance no longer exists because it was dropped
774                // while the peek was in progress. In this case we ignore the error and move on
775                // because the dataflow no longer exists.
776                // TODO(jkosh44) Dropping a cluster should actively cancel all pending queries.
777                for uuid in uuids {
778                    let _ = self.controller.compute.cancel_peek(
779                        compute_instance,
780                        uuid,
781                        PeekResponse::Canceled,
782                    );
783                }
784            }
785
786            let peeks = uuids
787                .iter()
788                .filter_map(|(uuid, _)| self.pending_peeks.remove(uuid))
789                .collect::<Vec<_>>();
790            for peek in peeks {
791                self.retire_execution(StatementEndedExecutionReason::Canceled, peek.ctx_extra);
792            }
793        }
794    }
795
796    /// Handle a peek notification and retire the corresponding execution. Does nothing for
797    /// already-removed peeks.
798    pub(crate) fn handle_peek_notification(
799        &mut self,
800        uuid: Uuid,
801        notification: PeekNotification,
802        otel_ctx: OpenTelemetryContext,
803    ) {
804        // We expect exactly one peek response, which we forward. Then we clean up the
805        // peek's state in the coordinator.
806        if let Some(PendingPeek {
807            conn_id: _,
808            cluster_id: _,
809            depends_on: _,
810            ctx_extra,
811            is_fast_path,
812        }) = self.remove_pending_peek(&uuid)
813        {
814            let reason = match notification {
815                PeekNotification::Success {
816                    rows: num_rows,
817                    result_size,
818                } => {
819                    let strategy = if is_fast_path {
820                        StatementExecutionStrategy::FastPath
821                    } else {
822                        StatementExecutionStrategy::Standard
823                    };
824                    StatementEndedExecutionReason::Success {
825                        result_size: Some(result_size),
826                        rows_returned: Some(num_rows),
827                        execution_strategy: Some(strategy),
828                    }
829                }
830                PeekNotification::Error(error) => StatementEndedExecutionReason::Errored { error },
831                PeekNotification::Canceled => StatementEndedExecutionReason::Canceled,
832            };
833            otel_ctx.attach_as_parent();
834            self.retire_execution(reason, ctx_extra);
835        }
836        // Cancellation may cause us to receive responses for peeks no
837        // longer in `self.pending_peeks`, so we quietly ignore them.
838    }
839
840    /// Clean up a peek's state.
841    pub(crate) fn remove_pending_peek(&mut self, uuid: &Uuid) -> Option<PendingPeek> {
842        let pending_peek = self.pending_peeks.remove(uuid);
843        if let Some(pending_peek) = &pending_peek {
844            let uuids = self
845                .client_pending_peeks
846                .get_mut(&pending_peek.conn_id)
847                .expect("coord peek state is inconsistent");
848            uuids.remove(uuid);
849            if uuids.is_empty() {
850                self.client_pending_peeks.remove(&pending_peek.conn_id);
851            }
852        }
853        pending_peek
854    }
855
856    /// Constructs an [`ExecuteResponse`] that that will send some rows to the
857    /// client immediately, as opposed to asking the dataflow layer to send along
858    /// the rows after some computation.
859    pub(crate) fn send_immediate_rows<I>(rows: I) -> ExecuteResponse
860    where
861        I: IntoRowIterator,
862        I::Iter: Send + Sync + 'static,
863    {
864        let rows = Box::new(rows.into_row_iter());
865        ExecuteResponse::SendingRowsImmediate { rows }
866    }
867}
868
#[cfg(test)]
mod tests {
    use mz_expr::func::IsNull;
    use mz_expr::{MapFilterProject, UnaryFunc};
    use mz_ore::str::Indent;
    use mz_repr::explain::text::text_string_at;
    use mz_repr::explain::{DummyHumanizer, ExplainConfig, PlanRenderingContext};
    use mz_repr::{ColumnType, Datum, ScalarType};

    use super::*;

    /// Verifies the `EXPLAIN`-style text rendering of `FastPathPlan` variants:
    /// a constant error, index reads with and without a literal lookup value,
    /// and constant row collections in both their short and truncated forms.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
    fn test_fast_path_plan_as_text() {
        // A single non-nullable string column; shared by all constant plans below.
        let typ = RelationType::new(vec![ColumnType {
            scalar_type: ScalarType::String,
            nullable: false,
        }]);
        // A constant plan that evaluates to an error.
        let constant_err = FastPathPlan::Constant(Err(EvalError::DivisionByZero), typ.clone());
        // A full-scan index read (no literal constraint) followed by a map+project MFP.
        let no_lookup = FastPathPlan::PeekExisting(
            GlobalId::User(8),
            GlobalId::User(10),
            None,
            MapFilterProject::new(4)
                .map(Some(MirScalarExpr::column(0).or(MirScalarExpr::column(2))))
                .project([1, 4])
                .into_plan()
                .expect("invalid plan")
                .into_nontemporal()
                .expect("invalid nontemporal"),
        );
        // An index read constrained to one literal key value (5), with a filter MFP.
        let lookup = FastPathPlan::PeekExisting(
            GlobalId::User(9),
            GlobalId::User(11),
            Some(vec![Row::pack(Some(Datum::Int32(5)))]),
            MapFilterProject::new(3)
                .filter(Some(
                    MirScalarExpr::column(0).call_unary(UnaryFunc::IsNull(IsNull)),
                ))
                .into_plan()
                .expect("invalid plan")
                .into_nontemporal()
                .expect("invalid nontemporal"),
        );

        let humanizer = DummyHumanizer;
        let config = ExplainConfig {
            redacted: false,
            ..Default::default()
        };
        // Each rendering gets a fresh context via this generator closure.
        let ctx_gen = || {
            let indent = Indent::default();
            let annotations = BTreeMap::new();
            PlanRenderingContext::<FastPathPlan>::new(indent, &humanizer, annotations, &config)
        };

        // Expected renderings. `[DELETED INDEX]` appears because the dummy
        // humanizer supplies no name for the index `GlobalId`s.
        let constant_err_exp = "Error \"division by zero\"\n";
        let no_lookup_exp = "Project (#1, #4)\n  Map ((#0 OR #2))\n    ReadIndex on=u8 [DELETED INDEX]=[*** full scan ***]\n";
        let lookup_exp =
            "Filter (#0) IS NULL\n  ReadIndex on=u9 [DELETED INDEX]=[lookup value=(5)]\n";

        assert_eq!(text_string_at(&constant_err, ctx_gen), constant_err_exp);
        assert_eq!(text_string_at(&no_lookup, ctx_gen), no_lookup_exp);
        assert_eq!(text_string_at(&lookup, ctx_gen), lookup_exp);

        // A small constant result: every (row, diff) pair is rendered in full,
        // with multiplicities > 1 shown as `x N`.
        let mut constant_rows = vec![
            (Row::pack(Some(Datum::String("hello"))), Diff::ONE),
            (Row::pack(Some(Datum::String("world"))), 2.into()),
            (Row::pack(Some(Datum::String("star"))), 500.into()),
        ];
        let constant_exp1 =
            "Constant\n  - (\"hello\")\n  - ((\"world\") x 2)\n  - ((\"star\") x 500)\n";
        assert_eq!(
            text_string_at(
                &FastPathPlan::Constant(Ok(constant_rows.clone()), typ.clone()),
                ctx_gen
            ),
            constant_exp1
        );
        // Growing the collection past the rendering cutoff switches the output to
        // a truncated form: a diff-weighted total (1 + 2 + 500 + 20 = 523) plus
        // only the first rows. (The "diffs absed" spelling must match the
        // renderer's actual output.)
        constant_rows
            .extend((0..20).map(|i| (Row::pack(Some(Datum::String(&i.to_string()))), Diff::ONE)));
        let constant_exp2 = "Constant\n  total_rows (diffs absed): 523\n  first_rows:\n    - (\"hello\")\
        \n    - ((\"world\") x 2)\n    - ((\"star\") x 500)\n    - (\"0\")\n    - (\"1\")\
        \n    - (\"2\")\n    - (\"3\")\n    - (\"4\")\n    - (\"5\")\n    - (\"6\")\
        \n    - (\"7\")\n    - (\"8\")\n    - (\"9\")\n    - (\"10\")\n    - (\"11\")\
        \n    - (\"12\")\n    - (\"13\")\n    - (\"14\")\n    - (\"15\")\n    - (\"16\")\n";
        assert_eq!(
            text_string_at(&FastPathPlan::Constant(Ok(constant_rows), typ), ctx_gen),
            constant_exp2
        );
    }
}