Skip to main content

mz_adapter/coord/
peek.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic and types for creating, executing, and tracking peeks.
11//!
12//! This module determines if a dataflow can be short-cut, by returning constant values
13//! or by reading out of existing arrangements, and implements the appropriate plan.
14
15use std::collections::{BTreeMap, BTreeSet};
16use std::fmt;
17use std::num::NonZeroUsize;
18use std::ops::Deref;
19use std::sync::Arc;
20
21use differential_dataflow::consolidation::consolidate;
22use itertools::Itertools;
23use mz_adapter_types::connection::ConnectionId;
24use mz_cluster_client::ReplicaId;
25use mz_compute_client::controller::PeekNotification;
26use mz_compute_client::protocol::command::PeekTarget;
27use mz_compute_client::protocol::response::PeekResponse;
28use mz_compute_types::ComputeInstanceId;
29use mz_compute_types::dataflows::{DataflowDescription, IndexImport};
30use mz_controller_types::ClusterId;
31use mz_expr::explain::{HumanizedExplain, HumanizerMode, fmt_text_constant_rows};
32use mz_expr::row::RowCollection;
33use mz_expr::{
34    EvalError, Id, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing,
35    RowSetFinishingIncremental, permutation_for_arrangement,
36};
37use mz_ore::cast::CastFrom;
38use mz_ore::collections::CollectionExt;
39use mz_ore::soft_assert_eq_or_log;
40use mz_ore::str::{StrExt, separated};
41use mz_ore::task;
42use mz_ore::tracing::OpenTelemetryContext;
43use mz_persist_client::Schemas;
44use mz_persist_types::codec_impls::UnitSchema;
45use mz_repr::explain::text::DisplayText;
46use mz_repr::explain::{CompactScalars, IndexUsageType, PlanRenderingContext, UsedIndexes};
47use mz_repr::{
48    Diff, GlobalId, IntoRowIterator, RelationDesc, Row, RowIterator, SqlRelationType,
49    preserves_order,
50};
51use mz_storage_types::sources::SourceData;
52use serde::{Deserialize, Serialize};
53use timely::progress::Antichain;
54use tokio::sync::oneshot;
55use tracing::{Instrument, Span};
56use uuid::Uuid;
57
58use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
59use crate::coord::timestamp_selection::TimestampDetermination;
60use crate::optimize::OptimizerError;
61use crate::statement_logging::WatchSetCreation;
62use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
63use crate::{AdapterError, ExecuteContextGuard, ExecuteResponse};
64
/// A peek is a request to read data from a maintained arrangement.
#[derive(Debug)]
pub(crate) struct PendingPeek {
    /// The connection that initiated the peek.
    pub(crate) conn_id: ConnectionId,
    /// The cluster that the peek is being executed on.
    pub(crate) cluster_id: ClusterId,
    /// All `GlobalId`s that the peek depends on.
    pub(crate) depends_on: BTreeSet<GlobalId>,
    /// Context about the execute that produced this peek,
    /// needed by the coordinator for retiring it.
    pub(crate) ctx_extra: ExecuteContextGuard,
    /// Is this a fast-path peek, i.e. one that doesn't require a dataflow?
    pub(crate) is_fast_path: bool,
}
80
/// The response from a `Peek`, with row multiplicities represented in unary.
///
/// Note that each `Peek` expects to generate exactly one `PeekResponse`, i.e.
/// we expect a 1:1 contract between `Peek` and `PeekResponseUnary`.
#[derive(Debug)]
pub enum PeekResponseUnary {
    /// The peek succeeded; results are read out of the boxed iterator.
    Rows(Box<dyn RowIterator + Send + Sync>),
    /// The peek failed with the given (unstructured) error message.
    Error(String),
    /// The peek was canceled before it produced a result.
    Canceled,
    /// A dependency was dropped during execution.
    ///
    /// N.B. This is a bit of a workaround for the fact that our Error variant
    /// is unstructured and right now we specifically care about this error and
    /// need to render differently based on context.
    DependencyDropped(DroppedDependency),
}
97
/// A dependency that was dropped while a peek or subscribe was in flight.
///
/// The `name` fields hold the bare name (e.g. `db.schema.t` or `c`); `Display`
/// applies SQL identifier quoting to produce `relation "db.schema.t"` or
/// `cluster "c"` for direct use in error wording.
#[derive(Clone, Debug)]
pub enum DroppedDependency {
    /// The dropped dependency was a relation.
    Relation { name: String },
    /// The dropped dependency was a cluster.
    Cluster { name: String },
}
108
109impl fmt::Display for DroppedDependency {
110    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111        match self {
112            Self::Relation { name } => write!(f, "relation {}", name.quoted()),
113            Self::Cluster { name } => write!(f, "cluster {}", name.quoted()),
114        }
115    }
116}
117
118impl DroppedDependency {
119    /// User-facing error for a query (peek or subscribe) that could not finish
120    /// because this dependency was dropped mid-flight.
121    pub fn query_terminated_error(&self) -> String {
122        format!("query could not complete because {self} was dropped")
123    }
124
125    /// Convert this dropped dependency into an [`AdapterError::ConcurrentDependencyDrop`].
126    pub fn to_concurrent_dependency_drop(&self) -> AdapterError {
127        let (kind, name) = match self {
128            Self::Relation { name } => ("relation", name.clone()),
129            Self::Cluster { name } => ("cluster", name.clone()),
130        };
131        AdapterError::ConcurrentDependencyDrop {
132            dependency_kind: kind,
133            dependency_id: name,
134        }
135    }
136}
137
/// The plan for a "slow-path" peek: a dataflow to install, plus the
/// information needed to read the results back out of its arrangement.
#[derive(Clone, Debug)]
pub struct PeekDataflowPlan {
    /// The dataflow to install for this peek.
    pub(crate) desc: DataflowDescription<mz_compute_types::plan::Plan, ()>,
    /// The `GlobalId` that the peek will target once `desc` is installed.
    pub(crate) id: GlobalId,
    /// The key of the arrangement to read, derived from the relation type's
    /// default key in [`PeekDataflowPlan::new`].
    key: Vec<MirScalarExpr>,
    /// The permutation produced by `permutation_for_arrangement` for `key`,
    /// mapping arrangement columns back to relation column order.
    permutation: Vec<usize>,
    /// The arity of the thinned arrangement values, i.e. `thinning.len()`
    /// from `permutation_for_arrangement`.
    thinned_arity: usize,
}
146
147impl PeekDataflowPlan {
148    pub fn new(
149        desc: DataflowDescription<mz_compute_types::plan::Plan, ()>,
150        id: GlobalId,
151        typ: &SqlRelationType,
152    ) -> Self {
153        let arity = typ.arity();
154        let key = typ
155            .default_key()
156            .into_iter()
157            .map(MirScalarExpr::column)
158            .collect::<Vec<_>>();
159        let (permutation, thinning) = permutation_for_arrangement(&key, arity);
160        Self {
161            desc,
162            id,
163            key,
164            permutation,
165            thinned_arity: thinning.len(),
166        }
167    }
168}
169
/// A fast-path plan: a way to produce a peek's results without building and
/// installing a dataflow.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd)]
pub enum FastPathPlan {
    /// The view evaluates to a constant result that can be returned.
    ///
    /// The [SqlRelationType] is unnecessary for evaluating the constant result but
    /// may be helpful when printing out an explanation.
    Constant(Result<Vec<(Row, Diff)>, EvalError>, SqlRelationType),
    /// The view can be read out of an existing arrangement.
    /// (coll_id, idx_id, values to look up, mfp to apply)
    PeekExisting(GlobalId, GlobalId, Option<Vec<Row>>, mz_expr::SafeMfpPlan),
    /// The view can be read directly out of Persist.
    /// (coll_id, optional literal key constraint, mfp to apply)
    PeekPersist(GlobalId, Option<Row>, mz_expr::SafeMfpPlan),
}
183
184impl<'a, T: 'a> DisplayText<PlanRenderingContext<'a, T>> for FastPathPlan {
185    fn fmt_text(
186        &self,
187        f: &mut fmt::Formatter<'_>,
188        ctx: &mut PlanRenderingContext<'a, T>,
189    ) -> fmt::Result {
190        if ctx.config.verbose_syntax {
191            self.fmt_verbose_text(f, ctx)
192        } else {
193            self.fmt_default_text(f, ctx)
194        }
195    }
196}
197
198impl FastPathPlan {
    /// Renders the plan using the default (non-verbose) EXPLAIN syntax.
    ///
    /// Errors in `Constant` plans are redacted when the explain config asks
    /// for redaction.
    pub fn fmt_default_text<'a, T>(
        &self,
        f: &mut fmt::Formatter<'_>,
        ctx: &mut PlanRenderingContext<'a, T>,
    ) -> fmt::Result {
        let mode = HumanizedExplain::new(ctx.config.redacted);

        match self {
            FastPathPlan::Constant(rows, _) => {
                write!(f, "{}→Constant ", ctx.indent)?;

                match rows {
                    Ok(rows) => writeln!(f, "({} rows)", rows.len())?,
                    Err(err) => {
                        if mode.redacted() {
                            writeln!(f, "(error: █)")?;
                        } else {
                            writeln!(f, "(error: {})", err.to_string().quoted(),)?;
                        }
                    }
                }
            }
            FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
                // Prefer humanized names; fall back to the raw ids.
                let coll = ctx
                    .humanizer
                    .humanize_id(*coll_id)
                    .unwrap_or_else(|| coll_id.to_string());
                let idx = ctx
                    .humanizer
                    .humanize_id(*idx_id)
                    .unwrap_or_else(|| idx_id.to_string());
                writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
                // `set`/`reset` bracket the temporary indentation changes below.
                ctx.indent.set();

                ctx.indent += 1;

                mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
                // If the MFP printed anything, indent the source one level further.
                let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();

                if printed {
                    ctx.indent += 1;
                }
                if let Some(literal_constraints) = literal_constraints {
                    writeln!(f, "{}→Index Lookup on {coll} (using {idx})", ctx.indent)?;
                    ctx.indent += 1;
                    let values = separated("; ", mode.seq(literal_constraints, None));
                    writeln!(f, "{}Lookup values: {values}", ctx.indent)?;
                } else {
                    writeln!(f, "{}→Indexed {coll} (using {idx})", ctx.indent)?;
                }

                ctx.indent.reset();
            }
            FastPathPlan::PeekPersist(global_id, literal_constraint, mfp) => {
                let coll = ctx
                    .humanizer
                    .humanize_id(*global_id)
                    .unwrap_or_else(|| global_id.to_string());
                writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
                // `set`/`reset` bracket the temporary indentation changes below.
                ctx.indent.set();

                ctx.indent += 1;

                mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
                // If the MFP printed anything, indent the source one level further.
                let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();

                if printed {
                    ctx.indent += 1;
                }
                if let Some(literal_constraint) = literal_constraint {
                    writeln!(f, "{}→ReadStorage Lookup on {coll}", ctx.indent)?;
                    ctx.indent += 1;
                    let value = mode.expr(literal_constraint, None);
                    writeln!(f, "{}Lookup value: {value}", ctx.indent)?;
                } else {
                    writeln!(f, "{}→ReadStorage {coll}", ctx.indent)?;
                }

                ctx.indent.reset();
            }
        }

        Ok(())
    }
283
284    pub fn fmt_verbose_text<'a, T>(
285        &self,
286        f: &mut fmt::Formatter<'_>,
287        ctx: &mut PlanRenderingContext<'a, T>,
288    ) -> fmt::Result {
289        let redacted = ctx.config.redacted;
290        let mode = HumanizedExplain::new(redacted);
291
292        // TODO(aalexandrov): factor out common PeekExisting and PeekPersist
293        // code.
294        match self {
295            FastPathPlan::Constant(Ok(rows), _) => {
296                if !rows.is_empty() {
297                    writeln!(f, "{}Constant", ctx.indent)?;
298                    *ctx.as_mut() += 1;
299                    fmt_text_constant_rows(
300                        f,
301                        rows.iter().map(|(row, diff)| (row, diff)),
302                        ctx.as_mut(),
303                        redacted,
304                    )?;
305                    *ctx.as_mut() -= 1;
306                } else {
307                    writeln!(f, "{}Constant <empty>", ctx.as_mut())?;
308                }
309                Ok(())
310            }
311            FastPathPlan::Constant(Err(err), _) => {
312                if redacted {
313                    writeln!(f, "{}Error █", ctx.as_mut())
314                } else {
315                    writeln!(f, "{}Error {}", ctx.as_mut(), err.to_string().escaped())
316                }
317            }
318            FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
319                ctx.as_mut().set();
320                let (map, filter, project) = mfp.as_map_filter_project();
321
322                let cols = if !ctx.config.humanized_exprs {
323                    None
324                } else if let Some(cols) = ctx.humanizer.column_names_for_id(*idx_id) {
325                    // FIXME: account for thinning and permutation
326                    // See mz_expr::permutation_for_arrangement
327                    // See permute_oneshot_mfp_around_index
328                    let cols = itertools::chain(
329                        cols.iter().cloned(),
330                        std::iter::repeat(String::new()).take(map.len()),
331                    )
332                    .collect();
333                    Some(cols)
334                } else {
335                    None
336                };
337
338                if project.len() != mfp.input_arity + map.len()
339                    || !project.iter().enumerate().all(|(i, o)| i == *o)
340                {
341                    let outputs = mode.seq(&project, cols.as_ref());
342                    let outputs = CompactScalars(outputs);
343                    writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
344                    *ctx.as_mut() += 1;
345                }
346                if !filter.is_empty() {
347                    let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
348                    writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
349                    *ctx.as_mut() += 1;
350                }
351                if !map.is_empty() {
352                    let scalars = mode.seq(&map, cols.as_ref());
353                    let scalars = CompactScalars(scalars);
354                    writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
355                    *ctx.as_mut() += 1;
356                }
357                MirRelationExpr::fmt_indexed_filter(
358                    f,
359                    ctx,
360                    coll_id,
361                    idx_id,
362                    literal_constraints.clone(),
363                    None,
364                )?;
365                writeln!(f)?;
366                ctx.as_mut().reset();
367                Ok(())
368            }
369            FastPathPlan::PeekPersist(gid, literal_constraint, mfp) => {
370                ctx.as_mut().set();
371                let (map, filter, project) = mfp.as_map_filter_project();
372
373                let cols = if !ctx.config.humanized_exprs {
374                    None
375                } else if let Some(cols) = ctx.humanizer.column_names_for_id(*gid) {
376                    let cols = itertools::chain(
377                        cols.iter().cloned(),
378                        std::iter::repeat(String::new()).take(map.len()),
379                    )
380                    .collect::<Vec<_>>();
381                    Some(cols)
382                } else {
383                    None
384                };
385
386                if project.len() != mfp.input_arity + map.len()
387                    || !project.iter().enumerate().all(|(i, o)| i == *o)
388                {
389                    let outputs = mode.seq(&project, cols.as_ref());
390                    let outputs = CompactScalars(outputs);
391                    writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
392                    *ctx.as_mut() += 1;
393                }
394                if !filter.is_empty() {
395                    let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
396                    writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
397                    *ctx.as_mut() += 1;
398                }
399                if !map.is_empty() {
400                    let scalars = mode.seq(&map, cols.as_ref());
401                    let scalars = CompactScalars(scalars);
402                    writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
403                    *ctx.as_mut() += 1;
404                }
405                let human_id = ctx
406                    .humanizer
407                    .humanize_id(*gid)
408                    .unwrap_or_else(|| gid.to_string());
409                write!(f, "{}PeekPersist {human_id}", ctx.as_mut())?;
410                if let Some(literal) = literal_constraint {
411                    let value = mode.expr(literal, None);
412                    writeln!(f, " [value={}]", value)?;
413                } else {
414                    writeln!(f, "")?;
415                }
416                ctx.as_mut().reset();
417                Ok(())
418            }
419        }?;
420        Ok(())
421    }
422}
423
/// All the state needed to execute a peek (see `implement_peek_plan`).
#[derive(Debug)]
pub struct PlannedPeek {
    /// How to produce the results: fast path or full dataflow.
    pub plan: PeekPlan,
    /// The timestamp determination at which the peek reads.
    pub determination: TimestampDetermination,
    /// The connection that initiated the peek.
    pub conn_id: ConnectionId,
    /// The result type _after_ reading out of the "source" and applying any
    /// [MapFilterProject](mz_expr::MapFilterProject), but _before_ applying a
    /// [RowSetFinishing].
    ///
    /// This is _the_ `result_type` as far as compute is concerned and further
    /// changes through projections happen purely in the adapter.
    pub intermediate_result_type: SqlRelationType,
    /// The arity of the source the peek reads from.
    pub source_arity: usize,
    /// The `GlobalId`s of the objects the peek reads.
    pub source_ids: BTreeSet<GlobalId>,
}
439
/// Possible ways in which the coordinator could produce the result for a goal view.
#[derive(Clone, Debug)]
pub enum PeekPlan {
    /// The results can be produced without installing a dataflow
    /// (see [`FastPathPlan`]).
    FastPath(FastPathPlan),
    /// The view must be installed as a dataflow and then read.
    SlowPath(PeekDataflowPlan),
}
447
448/// Convert `mfp` to an executable, non-temporal plan.
449/// It should be non-temporal, as OneShot preparation populates `mz_now`.
450///
451/// If the `mfp` can't be converted into a non-temporal plan, this returns an _internal_ error.
452fn mfp_to_safe_plan(
453    mfp: mz_expr::MapFilterProject,
454) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
455    mfp.into_plan()
456        .map_err(OptimizerError::InternalUnsafeMfpPlan)?
457        .into_nontemporal()
458        .map_err(|e| OptimizerError::InternalUnsafeMfpPlan(format!("{:?}", e)))
459}
460
461/// If it can't convert `mfp` into a `SafeMfpPlan`, this returns an _internal_ error.
462fn permute_oneshot_mfp_around_index(
463    mfp: mz_expr::MapFilterProject,
464    key: &[MirScalarExpr],
465) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
466    let input_arity = mfp.input_arity;
467    let mut safe_mfp = mfp_to_safe_plan(mfp)?;
468    let (permute, thinning) = permutation_for_arrangement(key, input_arity);
469    safe_mfp.permute_fn(|c| permute[c], key.len() + thinning.len());
470    Ok(safe_mfp)
471}
472
473/// Determine if the dataflow plan can be implemented without an actual dataflow.
474///
475/// If the optimized plan is a `Constant` or a `Get` of a maintained arrangement,
476/// we can avoid building a dataflow (and either just return the results, or peek
477/// out of the arrangement, respectively).
478pub fn create_fast_path_plan(
479    dataflow_plan: &mut DataflowDescription<OptimizedMirRelationExpr>,
480    view_id: GlobalId,
481    finishing: Option<&RowSetFinishing>,
482    persist_fast_path_limit: usize,
483    persist_fast_path_order: bool,
484) -> Result<Option<FastPathPlan>, OptimizerError> {
485    // At this point, `dataflow_plan` contains our best optimized dataflow.
486    // We will check the plan to see if there is a fast path to escape full dataflow construction.
487
488    // We need to restrict ourselves to settings where the inserted transient view is the first thing
489    // to build (no dependent views). There is likely an index to build as well, but we may not be sure.
490    if dataflow_plan.objects_to_build.len() >= 1 && dataflow_plan.objects_to_build[0].id == view_id
491    {
492        let mut mir = &*dataflow_plan.objects_to_build[0].plan.as_inner_mut();
493        if let Some((rows, found_typ)) = mir.as_const() {
494            // In the case of a constant, we can return the result now.
495            let plan = FastPathPlan::Constant(
496                rows.clone(),
497                mz_repr::SqlRelationType::from_repr(found_typ),
498            );
499            return Ok(Some(plan));
500        } else {
501            // If there is a TopK that would be completely covered by the finishing, then jump
502            // through the TopK.
503            if let MirRelationExpr::TopK {
504                input,
505                group_key,
506                order_key,
507                limit,
508                offset,
509                monotonic: _,
510                expected_group_size: _,
511            } = mir
512            {
513                if let Some(finishing) = finishing {
514                    if group_key.is_empty() && *order_key == finishing.order_by && *offset == 0 {
515                        // The following is roughly `limit >= finishing.limit + finishing.offset`,
516                        // but with Options.
517                        let finishing_limits_at_least_as_topk = match (limit, finishing.limit) {
518                            (None, _) => true,
519                            (Some(..), None) => false,
520                            (Some(topk_limit), Some(finishing_limit)) => {
521                                if let Some(l) = topk_limit.as_literal_int64() {
522                                    i128::cast_from(l)
523                                        >= i128::cast_from(*finishing_limit)
524                                            + i128::cast_from(finishing.offset)
525                                } else {
526                                    false
527                                }
528                            }
529                        };
530                        if finishing_limits_at_least_as_topk {
531                            mir = input;
532                        }
533                    }
534                }
535            }
536            // In the case of a linear operator around an indexed view, we
537            // can skip creating a dataflow and instead pull all the rows in
538            // index and apply the linear operator against them.
539            let (mfp, mir) = mz_expr::MapFilterProject::extract_from_expression(mir);
540            match mir {
541                MirRelationExpr::Get {
542                    id: Id::Global(get_id),
543                    typ: repr_typ,
544                    ..
545                } => {
546                    // Just grab any arrangement if an arrangement exists
547                    for (index_id, IndexImport { desc, .. }) in dataflow_plan.index_imports.iter() {
548                        if desc.on_id == *get_id {
549                            return Ok(Some(FastPathPlan::PeekExisting(
550                                *get_id,
551                                *index_id,
552                                None,
553                                permute_oneshot_mfp_around_index(mfp, &desc.key)?,
554                            )));
555                        }
556                    }
557
558                    // If there is no arrangement, consider peeking the persist shard directly.
559                    // Generally, we consider a persist peek when the query can definitely be satisfied
560                    // by scanning through a small, constant number of Persist key-values.
561                    let safe_mfp = mfp_to_safe_plan(mfp)?;
562                    let (_maps, filters, projection) = safe_mfp.as_map_filter_project();
563
564                    let persist_fast_path_order_relation_typ = if persist_fast_path_order {
565                        Some(
566                            dataflow_plan
567                                .source_imports
568                                .get(get_id)
569                                .expect("Get's ID is also imported")
570                                .desc
571                                .typ
572                                .clone(),
573                        )
574                    } else {
575                        None
576                    };
577
578                    let literal_constraint =
579                        if let Some(relation_typ) = &persist_fast_path_order_relation_typ {
580                            let mut row = Row::default();
581                            let mut packer = row.packer();
582                            for (idx, col) in relation_typ.column_types.iter().enumerate() {
583                                if !preserves_order(&col.scalar_type) {
584                                    break;
585                                }
586                                let col_expr = MirScalarExpr::column(idx);
587
588                                let Some((literal, _)) = filters
589                                    .iter()
590                                    .filter_map(|f| f.expr_eq_literal(&col_expr))
591                                    .next()
592                                else {
593                                    break;
594                                };
595                                packer.extend_by_row(&literal);
596                            }
597                            if row.is_empty() { None } else { Some(row) }
598                        } else {
599                            None
600                        };
601
602                    let finish_ok = match &finishing {
603                        None => false,
604                        Some(RowSetFinishing {
605                            order_by,
606                            limit,
607                            offset,
608                            ..
609                        }) => {
610                            let order_ok =
611                                if let Some(relation_typ) = &persist_fast_path_order_relation_typ {
612                                    order_by.iter().enumerate().all(|(idx, order)| {
613                                        // Map the ordering column back to the column in the source data.
614                                        // (If it's not one of the input columns, we can't make any guarantees.)
615                                        let column_idx = projection[order.column];
616                                        if column_idx >= safe_mfp.input_arity {
617                                            return false;
618                                        }
619                                        let column_type = &relation_typ.column_types[column_idx];
620                                        let index_ok = idx == column_idx;
621                                        let nulls_ok = !column_type.nullable || order.nulls_last;
622                                        let asc_ok = !order.desc;
623                                        let type_ok = preserves_order(&column_type.scalar_type);
624                                        index_ok && nulls_ok && asc_ok && type_ok
625                                    })
626                                } else {
627                                    order_by.is_empty()
628                                };
629                            let limit_ok = limit.map_or(false, |l| {
630                                usize::cast_from(l) + *offset < persist_fast_path_limit
631                            });
632                            order_ok && limit_ok
633                        }
634                    };
635
636                    let key_constraint = if let Some(literal) = &literal_constraint {
637                        let prefix_len = literal.iter().count();
638                        repr_typ
639                            .keys
640                            .iter()
641                            .any(|k| k.iter().all(|idx| *idx < prefix_len))
642                    } else {
643                        false
644                    };
645
646                    // We can generate a persist peek when:
647                    // - We have a literal constraint that includes an entire key (so we'll return at most one value)
648                    // - We can return the first N key values (no filters, small limit, consistent order)
649                    if key_constraint || (filters.is_empty() && finish_ok) {
650                        return Ok(Some(FastPathPlan::PeekPersist(
651                            *get_id,
652                            literal_constraint,
653                            safe_mfp,
654                        )));
655                    }
656                }
657                MirRelationExpr::Join { implementation, .. } => {
658                    if let mz_expr::JoinImplementation::IndexedFilter(coll_id, idx_id, key, vals) =
659                        implementation
660                    {
661                        return Ok(Some(FastPathPlan::PeekExisting(
662                            *coll_id,
663                            *idx_id,
664                            Some(vals.clone()),
665                            permute_oneshot_mfp_around_index(mfp, key)?,
666                        )));
667                    }
668                }
669                // nothing can be done for non-trivial expressions.
670                _ => {}
671            }
672        }
673    }
674    Ok(None)
675}
676
677impl FastPathPlan {
678    pub fn used_indexes(&self, finishing: Option<&RowSetFinishing>) -> UsedIndexes {
679        match self {
680            FastPathPlan::Constant(..) => UsedIndexes::default(),
681            FastPathPlan::PeekExisting(_coll_id, idx_id, literal_constraints, _mfp) => {
682                if literal_constraints.is_some() {
683                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::Lookup(*idx_id)])].into())
684                } else if finishing.map_or(false, |f| f.limit.is_some() && f.order_by.is_empty()) {
685                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::FastPathLimit])].into())
686                } else {
687                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::FullScan])].into())
688                }
689            }
690            FastPathPlan::PeekPersist(..) => UsedIndexes::default(),
691        }
692    }
693}
694
695impl crate::coord::Coordinator {
    /// Implements a peek plan produced by `create_plan` above.
    ///
    /// Dispatches on the plan variant:
    /// * `FastPathPlan::Constant`: consolidates, finishes, and returns the rows
    ///   immediately (no peek is issued).
    /// * `FastPathPlan::PeekExisting` / `FastPathPlan::PeekPersist`: issues a
    ///   peek against an existing index or persist collection, respectively.
    /// * `PeekPlan::SlowPath`: ships a transient dataflow, peeks its single
    ///   index export, and drops the dataflow once the peek is queued.
    ///
    /// For the non-constant cases, registers the peek in `self.pending_peeks` /
    /// `self.client_pending_peeks` and returns an `ExecuteResponse` carrying a
    /// stream that yields rows once the peek response arrives.
    #[mz_ore::instrument(level = "debug")]
    pub async fn implement_peek_plan(
        &mut self,
        ctx_extra: &mut ExecuteContextGuard,
        plan: PlannedPeek,
        finishing: RowSetFinishing,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        max_result_size: u64,
        max_returned_query_size: Option<u64>,
    ) -> Result<ExecuteResponse, AdapterError> {
        let PlannedPeek {
            plan: fast_path,
            determination,
            conn_id,
            intermediate_result_type,
            source_arity,
            source_ids,
        } = plan;

        // If the dataflow optimizes to a constant expression, we can immediately return the result.
        if let PeekPlan::FastPath(FastPathPlan::Constant(rows, _)) = fast_path {
            let mut rows = match rows {
                Ok(rows) => rows,
                Err(e) => return Err(e.into()),
            };
            // Consolidate down the results to get correct totals.
            consolidate(&mut rows);

            let mut results = Vec::new();
            for (row, count) in rows {
                // Reject negative multiplicities: the result representation below
                // (NonZeroUsize counts) cannot express them.
                if count.is_negative() {
                    Err(EvalError::InvalidParameterValue(
                        format!("Negative multiplicity in constant result: {}", count).into(),
                    ))?
                };
                if count.is_positive() {
                    let count = usize::cast_from(
                        u64::try_from(count.into_inner())
                            .expect("known to be positive from check above"),
                    );
                    results.push((
                        row,
                        NonZeroUsize::new(count).expect("known to be non-zero from check above"),
                    ));
                }
            }
            let row_collection = RowCollection::new(results, &finishing.order_by);
            let duration_histogram = self.metrics.row_set_finishing_seconds();

            let (ret, reason) = match finishing.finish(
                row_collection,
                max_result_size,
                max_returned_query_size,
                &duration_histogram,
            ) {
                Ok((rows, row_size_bytes)) => {
                    let result_size = u64::cast_from(row_size_bytes);
                    let rows_returned = u64::cast_from(rows.count());
                    (
                        Ok(Self::send_immediate_rows(rows)),
                        StatementEndedExecutionReason::Success {
                            result_size: Some(result_size),
                            rows_returned: Some(rows_returned),
                            execution_strategy: Some(StatementExecutionStrategy::Constant),
                        },
                    )
                }
                Err(error) => (
                    Err(AdapterError::ResultSize(error.clone())),
                    StatementEndedExecutionReason::Errored { error },
                ),
            };
            // The constant path is fully handled here, so retire the execution
            // now rather than registering a pending peek.
            self.retire_execution(reason, std::mem::take(ctx_extra).defuse());
            return ret;
        }

        let timestamp = determination.timestamp_context.timestamp_or_default();
        if let Some(id) = ctx_extra.contents() {
            self.set_statement_execution_timestamp(id, timestamp)
        }

        // The remaining cases are a peek into a maintained arrangement, or building a dataflow.
        // In both cases we will want to peek, and the main difference is that we might want to
        // build a dataflow and drop it once the peek is issued. The peeks are also constructed
        // differently.

        // Acquire a read hold for the peek target so its `since` cannot advance past
        // `timestamp` before `compute.peek()` runs. On the slow path we ship the dataflow
        // first: the implied hold from `create_dataflow` pins the new collection's `since`
        // at `as_of`, so the subsequent `acquire_read_hold` lands at `as_of <= timestamp`.
        let (peek_command, drop_dataflow, is_fast_path, peek_target, strategy, read_hold) =
            match fast_path {
                PeekPlan::FastPath(FastPathPlan::PeekExisting(
                    _coll_id,
                    idx_id,
                    literal_constraints,
                    map_filter_project,
                )) => {
                    let read_hold = self
                        .controller
                        .compute
                        .acquire_read_hold(compute_instance, idx_id)
                        .map_err(
                            AdapterError::concurrent_dependency_drop_from_collection_update_error,
                        )?;
                    (
                        (literal_constraints, timestamp, map_filter_project),
                        None,
                        true,
                        PeekTarget::Index { id: idx_id },
                        StatementExecutionStrategy::FastPath,
                        read_hold,
                    )
                }
                PeekPlan::FastPath(FastPathPlan::PeekPersist(
                    coll_id,
                    literal_constraint,
                    map_filter_project,
                )) => {
                    let peek_command = (
                        literal_constraint.map(|r| vec![r]),
                        timestamp,
                        map_filter_project,
                    );
                    let metadata = self
                        .controller
                        .storage
                        .collection_metadata(coll_id)
                        .expect("storage collection for fast-path peek")
                        .clone();
                    let read_hold = self
                        .controller
                        .storage_collections
                        .acquire_read_holds(vec![coll_id])
                        .map_err(AdapterError::concurrent_dependency_drop_from_collection_missing)?
                        .into_element();
                    (
                        peek_command,
                        None,
                        true,
                        PeekTarget::Persist {
                            id: coll_id,
                            metadata,
                        },
                        StatementExecutionStrategy::PersistFastPath,
                        read_hold,
                    )
                }
                PeekPlan::SlowPath(PeekDataflowPlan {
                    desc: dataflow,
                    // n.b. this index_id identifies a transient index the
                    // caller created, so it is guaranteed to be on
                    // `compute_instance`.
                    id: index_id,
                    key: index_key,
                    permutation: index_permutation,
                    thinned_arity: index_thinned_arity,
                }) => {
                    // The slow-path peek read-hold strategy below acquires a hold for
                    // `index_id` only. That is sufficient today because slow-path peek
                    // dataflows have a single export equal to `index_id`. If we ever
                    // ship multi-output dataflows on this path, the hold acquisition
                    // needs to be revisited.
                    let exports: Vec<GlobalId> = dataflow.export_ids().collect();
                    soft_assert_eq_or_log!(
                        exports.as_slice(),
                        &[index_id],
                        "slow-path peek dataflow must export exactly [index_id]",
                    );
                    if exports.as_slice() != [index_id] {
                        return Err(AdapterError::internal(
                            "peek error",
                            format!(
                                "slow-path peek dataflow exports {exports:?}, expected [{index_id}]",
                            ),
                        ));
                    }

                    // Very important: actually create the dataflow (here, so we can destructure).
                    self.controller
                        .compute
                        .create_dataflow(compute_instance, dataflow, None)
                        .map_err(
                            AdapterError::concurrent_dependency_drop_from_dataflow_creation_error,
                        )?;

                    // Acquire a bare hold on the freshly-shipped index. On failure we must
                    // drop the dataflow ourselves, otherwise it leaks.
                    let acquire_result = self
                        .controller
                        .compute
                        .acquire_read_hold(compute_instance, index_id)
                        .map_err(
                            AdapterError::concurrent_dependency_drop_from_collection_update_error,
                        );
                    let read_hold = match acquire_result {
                        Ok(hold) => hold,
                        Err(e) => {
                            self.drop_compute_collections(vec![(compute_instance, index_id)]);
                            return Err(e);
                        }
                    };

                    // Create an identity MFP operator.
                    let mut map_filter_project = mz_expr::MapFilterProject::new(source_arity);
                    map_filter_project.permute_fn(
                        |c| index_permutation[c],
                        index_key.len() + index_thinned_arity,
                    );
                    let map_filter_project = mfp_to_safe_plan(map_filter_project)?;

                    (
                        (None, timestamp, map_filter_project),
                        Some(index_id),
                        false,
                        PeekTarget::Index { id: index_id },
                        StatementExecutionStrategy::Standard,
                        read_hold,
                    )
                }
                // `Constant` was already handled above by an early return.
                PeekPlan::FastPath(_) => {
                    unreachable!()
                }
            };

        // Endpoints for sending and receiving peek responses.
        let (rows_tx, rows_rx) = tokio::sync::oneshot::channel();

        // Generate unique UUID. Guaranteed to be unique to all pending peeks, there's a very
        // small but unlikely chance that it's not unique to completed peeks.
        let mut uuid = Uuid::new_v4();
        while self.pending_peeks.contains_key(&uuid) {
            uuid = Uuid::new_v4();
        }

        // Unpack the peek command assembled in the match above.
        let (literal_constraints, timestamp, map_filter_project) = peek_command;

        // At this stage we don't know column names for the result because we
        // only know the peek's result type as a bare SqlRelationType.
        let peek_result_column_names =
            (0..intermediate_result_type.arity()).map(|i| format!("peek_{i}"));
        let peek_result_desc =
            RelationDesc::new(intermediate_result_type, peek_result_column_names);

        let peek_result = self
            .controller
            .compute
            .peek(
                compute_instance,
                peek_target,
                literal_constraints,
                uuid,
                timestamp,
                peek_result_desc,
                finishing.clone(),
                map_filter_project,
                read_hold,
                target_replica,
                rows_tx,
            )
            .map_err(AdapterError::concurrent_dependency_drop_from_peek_error);
        if let Err(e) = peek_result {
            // If we shipped a transient dataflow above, drop it now to avoid leaking it.
            if let Some(index_id) = drop_dataflow {
                self.drop_compute_collections(vec![(compute_instance, index_id)]);
            }
            return Err(e);
        }

        // Register the pending peek only after compute.peek() succeeds. If it
        // fails (e.g. concurrent replica/cluster drop), inserting first would
        // leak entries in these maps and misattribute statement execution reasons.
        self.pending_peeks.insert(
            uuid,
            PendingPeek {
                conn_id: conn_id.clone(),
                cluster_id: compute_instance,
                depends_on: source_ids,
                ctx_extra: std::mem::take(ctx_extra),
                is_fast_path,
            },
        );
        self.client_pending_peeks
            .entry(conn_id)
            .or_default()
            .insert(uuid, compute_instance);

        let duration_histogram = self.metrics.row_set_finishing_seconds();

        // If a dataflow was created, drop it now that the peek is queued. This is
        // required: `add_collection` installs implied/warmup holds owned by the
        // controller and only released via `drop_collections`, so without this call
        // the transient dataflow's `since` would stay pinned at `as_of` forever. The
        // peek's own read hold keeps the collection alive on the cluster until the
        // response arrives.
        if let Some(index_id) = drop_dataflow {
            self.drop_compute_collections(vec![(compute_instance, index_id)]);
        }

        let persist_client = self.persist_client.clone();
        let peek_stash_read_batch_size_bytes =
            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
                .get(self.catalog().system_config().dyncfgs());
        let peek_stash_read_memory_budget_bytes =
            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
                .get(self.catalog().system_config().dyncfgs());

        let peek_response_stream = Self::create_peek_response_stream(
            rows_rx,
            finishing,
            max_result_size,
            max_returned_query_size,
            duration_histogram,
            persist_client,
            peek_stash_read_batch_size_bytes,
            peek_stash_read_memory_budget_bytes,
        );

        Ok(crate::ExecuteResponse::SendingRowsStreaming {
            rows: Box::pin(peek_response_stream),
            instance_id: compute_instance,
            strategy,
        })
    }
1022
1023    /// Creates an async stream that processes peek responses and yields rows.
1024    ///
1025    /// TODO(peek-seq): Move this out of `coord` once we delete the old peek sequencing.
1026    #[mz_ore::instrument(level = "debug")]
1027    pub(crate) fn create_peek_response_stream(
1028        rows_rx: tokio::sync::oneshot::Receiver<PeekResponse>,
1029        finishing: RowSetFinishing,
1030        max_result_size: u64,
1031        max_returned_query_size: Option<u64>,
1032        duration_histogram: prometheus::Histogram,
1033        mut persist_client: mz_persist_client::PersistClient,
1034        peek_stash_read_batch_size_bytes: usize,
1035        peek_stash_read_memory_budget_bytes: usize,
1036    ) -> impl futures::Stream<Item = PeekResponseUnary> {
1037        async_stream::stream!({
1038            let result = rows_rx.await;
1039
1040            let rows = match result {
1041                Ok(rows) => rows,
1042                Err(e) => {
1043                    yield PeekResponseUnary::Error(e.to_string());
1044                    return;
1045                }
1046            };
1047
1048            match rows {
1049                PeekResponse::Rows(rows) => {
1050                    let rows = RowCollection::merge_sorted(&rows, &finishing.order_by);
1051                    match finishing.finish(
1052                        rows,
1053                        max_result_size,
1054                        max_returned_query_size,
1055                        &duration_histogram,
1056                    ) {
1057                        Ok((rows, _size_bytes)) => yield PeekResponseUnary::Rows(Box::new(rows)),
1058                        Err(e) => yield PeekResponseUnary::Error(e),
1059                    }
1060                }
1061                PeekResponse::Stashed(response) => {
1062                    let response = *response;
1063
1064                    let shard_id = response.shard_id;
1065
1066                    let mut batches = Vec::new();
1067                    for proto_batch in response.batches.into_iter() {
1068                        let batch =
1069                            persist_client.batch_from_transmittable_batch(&shard_id, proto_batch);
1070
1071                        batches.push(batch);
1072                    }
1073                    tracing::trace!(?batches, "stashed peek response");
1074
1075                    let as_of = Antichain::from_elem(mz_repr::Timestamp::default());
1076                    let read_schemas: Schemas<SourceData, ()> = Schemas {
1077                        id: None,
1078                        key: Arc::new(response.relation_desc.clone()),
1079                        val: Arc::new(UnitSchema),
1080                    };
1081
1082                    let mut row_cursor = persist_client
1083                        .read_batches_consolidated::<_, _, _, i64>(
1084                            response.shard_id,
1085                            as_of,
1086                            read_schemas,
1087                            batches,
1088                            |_stats| true,
1089                            peek_stash_read_memory_budget_bytes,
1090                        )
1091                        .await
1092                        .expect("invalid usage");
1093
1094                    // NOTE: Using the cursor creates Futures that are not Sync,
1095                    // so we can't drive them on the main Coordinator loop.
1096                    // Spawning a task has the additional benefit that we get to
1097                    // delete batches once we're done.
1098                    //
1099                    // Batch deletion is best-effort, though, and there are
1100                    // multiple known ways in which they can leak, among them:
1101                    //
1102                    // - ProtoBatch is lost in flight
1103                    // - ProtoBatch is lost because when combining PeekResponse
1104                    // from workers a cancellation or error "overrides" other
1105                    // results, meaning we drop them
1106                    // - This task here is not run to completion before it can
1107                    // delete all batches
1108                    //
1109                    // This is semi-ok, because persist needs a reaper of leaked
1110                    // batches already, and so we piggy-back on that, even if it
1111                    // might not exist as of today.
1112                    let (tx, mut rx) = tokio::sync::mpsc::channel(1);
1113                    mz_ore::task::spawn(|| "read_peek_batches", async move {
1114                        // We always send our inline rows first. Ordering
1115                        // doesn't matter because we can only be in this case
1116                        // when there is no ORDER BY.
1117                        //
1118                        // We _could_ write these out as a Batch, and include it
1119                        // in the batches we read via the Consolidator. If we
1120                        // wanted to get a consistent ordering. That's not
1121                        // needed for correctness! But might be nice for more
1122                        // aesthetic reasons.
1123                        for rows in response.inline_rows {
1124                            let result = tx.send(rows).await;
1125                            if result.is_err() {
1126                                tracing::debug!("receiver went away");
1127                            }
1128                        }
1129
1130                        let mut current_batch = Vec::new();
1131                        let mut current_batch_size: usize = 0;
1132
1133                        'outer: while let Some(rows) = row_cursor.next().await {
1134                            for ((source_data, _val), _ts, diff) in rows {
1135                                let row = source_data
1136                                    .0
1137                                    .expect("we are not sending errors on this code path");
1138
1139                                let diff = usize::try_from(diff)
1140                                    .expect("peek responses cannot have negative diffs");
1141
1142                                if diff > 0 {
1143                                    let diff =
1144                                        NonZeroUsize::new(diff).expect("checked to be non-zero");
1145                                    current_batch_size =
1146                                        current_batch_size.saturating_add(row.byte_len());
1147                                    current_batch.push((row, diff));
1148                                }
1149
1150                                if current_batch_size > peek_stash_read_batch_size_bytes {
1151                                    // We're re-encoding the rows as a RowCollection
1152                                    // here, for which we pay in CPU time. We're in a
1153                                    // slow path already, since we're returning a big
1154                                    // stashed result so this is worth the convenience
1155                                    // of that for now.
1156                                    let result = tx
1157                                        .send(RowCollection::new(
1158                                            current_batch.drain(..).collect_vec(),
1159                                            &[],
1160                                        ))
1161                                        .await;
1162                                    if result.is_err() {
1163                                        tracing::debug!("receiver went away");
1164                                        // Don't return but break so we fall out to the
1165                                        // batch delete logic below.
1166                                        break 'outer;
1167                                    }
1168
1169                                    current_batch_size = 0;
1170                                }
1171                            }
1172                        }
1173
1174                        if current_batch.len() > 0 {
1175                            let result = tx.send(RowCollection::new(current_batch, &[])).await;
1176                            if result.is_err() {
1177                                tracing::debug!("receiver went away");
1178                            }
1179                        }
1180
1181                        let batches = row_cursor.into_lease();
1182                        tracing::trace!(?response.shard_id, "cleaning up batches of peek result");
1183                        for batch in batches {
1184                            batch.delete().await;
1185                        }
1186                    });
1187
1188                    assert!(
1189                        finishing.is_streamable(response.relation_desc.arity()),
1190                        "can only get stashed responses when the finishing is streamable"
1191                    );
1192
1193                    tracing::trace!("query result is streamable!");
1194
1195                    assert!(finishing.is_streamable(response.relation_desc.arity()));
1196                    let mut incremental_finishing = RowSetFinishingIncremental::new(
1197                        finishing.offset,
1198                        finishing.limit,
1199                        finishing.project,
1200                        max_returned_query_size,
1201                    );
1202
1203                    let mut got_zero_rows = true;
1204                    while let Some(rows) = rx.recv().await {
1205                        got_zero_rows = false;
1206
1207                        let result_rows = incremental_finishing.finish_incremental(
1208                            rows,
1209                            max_result_size,
1210                            &duration_histogram,
1211                        );
1212
1213                        match result_rows {
1214                            Ok(result_rows) => yield PeekResponseUnary::Rows(Box::new(result_rows)),
1215                            Err(e) => yield PeekResponseUnary::Error(e),
1216                        }
1217                    }
1218
1219                    // Even when there's zero rows, clients still expect an
1220                    // empty PeekResponse.
1221                    if got_zero_rows {
1222                        let row_iter = vec![].into_row_iter();
1223                        yield PeekResponseUnary::Rows(Box::new(row_iter));
1224                    }
1225                }
1226                PeekResponse::Canceled => {
1227                    yield PeekResponseUnary::Canceled;
1228                }
1229                PeekResponse::Error(e) => {
1230                    yield PeekResponseUnary::Error(e);
1231                }
1232            }
1233        })
1234    }
1235
1236    /// Cancel and remove all pending peeks that were initiated by the client with `conn_id`.
1237    #[mz_ore::instrument(level = "debug")]
1238    pub(crate) fn cancel_pending_peeks(&mut self, conn_id: &ConnectionId) {
1239        if let Some(uuids) = self.client_pending_peeks.remove(conn_id) {
1240            self.metrics
1241                .canceled_peeks
1242                .inc_by(u64::cast_from(uuids.len()));
1243
1244            let mut inverse: BTreeMap<ComputeInstanceId, BTreeSet<Uuid>> = Default::default();
1245            for (uuid, compute_instance) in &uuids {
1246                inverse.entry(*compute_instance).or_default().insert(*uuid);
1247            }
1248            for (compute_instance, uuids) in inverse {
1249                // It's possible that this compute instance no longer exists because it was dropped
1250                // while the peek was in progress. In this case we ignore the error and move on
1251                // because the dataflow no longer exists.
1252                // TODO(jkosh44) Dropping a cluster should actively cancel all pending queries.
1253                for uuid in uuids {
1254                    let _ = self.controller.compute.cancel_peek(
1255                        compute_instance,
1256                        uuid,
1257                        PeekResponse::Canceled,
1258                    );
1259                }
1260            }
1261
1262            let peeks = uuids
1263                .iter()
1264                .filter_map(|(uuid, _)| self.pending_peeks.remove(uuid))
1265                .collect::<Vec<_>>();
1266            for peek in peeks {
1267                self.retire_execution(
1268                    StatementEndedExecutionReason::Canceled,
1269                    peek.ctx_extra.defuse(),
1270                );
1271            }
1272        }
1273    }
1274
1275    /// Handle a peek notification and retire the corresponding execution. Does nothing for
1276    /// already-removed peeks.
1277    pub(crate) fn handle_peek_notification(
1278        &mut self,
1279        uuid: Uuid,
1280        notification: PeekNotification,
1281        otel_ctx: OpenTelemetryContext,
1282    ) {
1283        // We expect exactly one peek response, which we forward. Then we clean up the
1284        // peek's state in the coordinator.
1285        if let Some(PendingPeek {
1286            conn_id: _,
1287            cluster_id: _,
1288            depends_on: _,
1289            ctx_extra,
1290            is_fast_path,
1291        }) = self.remove_pending_peek(&uuid)
1292        {
1293            let reason = match notification {
1294                PeekNotification::Success {
1295                    rows: num_rows,
1296                    result_size,
1297                } => {
1298                    let strategy = if is_fast_path {
1299                        StatementExecutionStrategy::FastPath
1300                    } else {
1301                        StatementExecutionStrategy::Standard
1302                    };
1303                    StatementEndedExecutionReason::Success {
1304                        result_size: Some(result_size),
1305                        rows_returned: Some(num_rows),
1306                        execution_strategy: Some(strategy),
1307                    }
1308                }
1309                PeekNotification::Error(error) => StatementEndedExecutionReason::Errored { error },
1310                PeekNotification::Canceled => StatementEndedExecutionReason::Canceled,
1311            };
1312            otel_ctx.attach_as_parent();
1313            self.retire_execution(reason, ctx_extra.defuse());
1314        }
1315        // Cancellation may cause us to receive responses for peeks no
1316        // longer in `self.pending_peeks`, so we quietly ignore them.
1317    }
1318
1319    /// Clean up a peek's state.
1320    pub(crate) fn remove_pending_peek(&mut self, uuid: &Uuid) -> Option<PendingPeek> {
1321        let pending_peek = self.pending_peeks.remove(uuid);
1322        if let Some(pending_peek) = &pending_peek {
1323            let uuids = self
1324                .client_pending_peeks
1325                .get_mut(&pending_peek.conn_id)
1326                .expect("coord peek state is inconsistent");
1327            uuids.remove(uuid);
1328            if uuids.is_empty() {
1329                self.client_pending_peeks.remove(&pending_peek.conn_id);
1330            }
1331        }
1332        pending_peek
1333    }
1334
1335    /// Implements a slow-path peek by creating a transient dataflow.
1336    /// This is called from the command handler for ExecuteSlowPathPeek.
1337    ///
1338    /// (For now, this method simply delegates to implement_peek_plan by constructing
1339    /// the necessary PlannedPeek structure.)
1340    pub(crate) async fn implement_slow_path_peek(
1341        &mut self,
1342        dataflow_plan: PeekDataflowPlan,
1343        determination: TimestampDetermination,
1344        finishing: RowSetFinishing,
1345        compute_instance: ComputeInstanceId,
1346        target_replica: Option<ReplicaId>,
1347        intermediate_result_type: SqlRelationType,
1348        source_ids: BTreeSet<GlobalId>,
1349        conn_id: ConnectionId,
1350        max_result_size: u64,
1351        max_query_result_size: Option<u64>,
1352        watch_set: Option<WatchSetCreation>,
1353    ) -> Result<ExecuteResponse, AdapterError> {
1354        // Install watch sets for statement lifecycle logging if enabled.
1355        // This must happen _before_ creating ExecuteContextExtra, so that if it fails,
1356        // we don't have an ExecuteContextExtra that needs to be retired (the frontend
1357        // will handle logging for the error case).
1358        let statement_logging_id = watch_set.as_ref().map(|ws| ws.logging_id);
1359        if let Some(ws) = watch_set {
1360            self.install_peek_watch_sets(conn_id.clone(), ws)
1361                .map_err(|e| {
1362                    AdapterError::concurrent_dependency_drop_from_watch_set_install_error(e)
1363                })?;
1364        }
1365
1366        let source_arity = intermediate_result_type.arity();
1367
1368        let planned_peek = PlannedPeek {
1369            plan: PeekPlan::SlowPath(dataflow_plan),
1370            determination,
1371            conn_id,
1372            intermediate_result_type,
1373            source_arity,
1374            source_ids,
1375        };
1376
1377        // Call the old peek sequencing's implement_peek_plan for now.
1378        // TODO(peek-seq): After the old peek sequencing is completely removed, we should merge the
1379        // relevant parts of the old `implement_peek_plan` into this method, and remove the old
1380        // `implement_peek_plan`.
1381        self.implement_peek_plan(
1382            &mut ExecuteContextGuard::new(statement_logging_id, self.internal_cmd_tx.clone()),
1383            planned_peek,
1384            finishing,
1385            compute_instance,
1386            target_replica,
1387            max_result_size,
1388            max_query_result_size,
1389        )
1390        .await
1391    }
1392
    /// Implements a `COPY TO` command by installing peek watch sets,
    /// shipping the dataflow, and spawning a background task to wait for completion.
    /// This is called from the command handler for ExecuteCopyTo.
    ///
    /// (The S3 preflight check must be completed successfully via the
    /// `CopyToPreflight` command _before_ calling this method. The preflight is
    /// handled separately to avoid blocking the coordinator's main task with
    /// slow S3 network operations.)
    ///
    /// This method does NOT block waiting for completion. Instead, it spawns a background task that
    /// will send the response through the provided tx channel when the COPY TO completes.
    /// All errors (setup or execution) are sent through tx.
    pub(crate) async fn implement_copy_to(
        &mut self,
        df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        source_ids: BTreeSet<GlobalId>,
        conn_id: ConnectionId,
        watch_set: Option<WatchSetCreation>,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    ) {
        // Helper to send error and return early. A failed `send` means the client
        // side went away, in which case there is nobody left to inform, so the
        // send result is deliberately ignored.
        let send_err = |tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
                        e: AdapterError| {
            let _ = tx.send(Err(e));
        };

        // Install watch sets for statement lifecycle logging if enabled.
        // If this fails, we just send the error back. The frontend will handle logging
        // for the error case (no ExecuteContextExtra is created here).
        if let Some(ws) = watch_set {
            if let Err(e) = self.install_peek_watch_sets(conn_id.clone(), ws) {
                let err = AdapterError::concurrent_dependency_drop_from_watch_set_install_error(e);
                send_err(tx, err);
                return;
            }
        }

        // Note: We don't create an ExecuteContextExtra here because the frontend handles
        // all statement logging for COPY TO operations.

        let sink_id = df_desc.sink_id();

        // Create and register ActiveCopyTo.
        // Note: sink_tx/sink_rx is the channel for the compute sink to notify completion.
        // This is different from the command's tx which sends the response to the client.
        let (sink_tx, sink_rx) = oneshot::channel();
        let active_copy_to = ActiveCopyTo {
            conn_id: conn_id.clone(),
            tx: sink_tx,
            cluster_id: compute_instance,
            depends_on: source_ids,
        };

        // Add metadata for the new COPY TO. CopyTo returns a `ready` future, so it is safe to drop.
        // The sink must be registered _before_ the dataflow is shipped below, so that a
        // completion notification cannot arrive for an unknown sink.
        drop(
            self.add_active_compute_sink(sink_id, ActiveComputeSink::CopyTo(active_copy_to))
                .await,
        );

        // Try to ship the dataflow. We handle errors gracefully because dependencies might have
        // disappeared during sequencing.
        if let Err(e) = self
            .try_ship_dataflow(df_desc, compute_instance, target_replica)
            .await
            .map_err(AdapterError::concurrent_dependency_drop_from_dataflow_creation_error)
        {
            // Clean up the active compute sink that was added above, since the dataflow was never
            // created. If we don't do this, the sink_id remains in drop_sinks but no collection
            // exists in the compute controller, causing a panic when the connection terminates.
            self.remove_active_compute_sink(sink_id).await;
            send_err(tx, e);
            return;
        }

        // Spawn background task to wait for completion.
        // We must NOT await sink_rx here directly, as that would block the coordinator's main task
        // from processing the completion message. Instead, we spawn a background task that will
        // send the result through tx when the COPY TO completes.
        let span = Span::current();
        task::spawn(
            || "copy to completion",
            async move {
                let res = sink_rx.await;
                let result = match res {
                    Ok(res) => res,
                    // A dropped sender means the sink was torn down without ever
                    // reporting a result; surface that as an internal error.
                    Err(_) => Err(AdapterError::Internal("copy to sender dropped".into())),
                };

                let _ = tx.send(result);
            }
            .instrument(span),
        );
    }
1488
1489    /// Constructs an [`ExecuteResponse`] that that will send some rows to the
1490    /// client immediately, as opposed to asking the dataflow layer to send along
1491    /// the rows after some computation.
1492    pub(crate) fn send_immediate_rows<I>(rows: I) -> ExecuteResponse
1493    where
1494        I: IntoRowIterator,
1495        I::Iter: Send + Sync + 'static,
1496    {
1497        let rows = Box::new(rows.into_row_iter());
1498        ExecuteResponse::SendingRowsImmediate { rows }
1499    }
1500}
1501
#[cfg(test)]
mod tests {
    use mz_expr::func::IsNull;
    use mz_expr::{MapFilterProject, UnaryFunc};
    use mz_ore::str::Indent;
    use mz_repr::explain::text::text_string_at;
    use mz_repr::explain::{DummyHumanizer, ExplainConfig, PlanRenderingContext};
    use mz_repr::{Datum, SqlColumnType, SqlScalarType};

    use super::*;

    /// Checks the `EXPLAIN`-style text rendering of each `FastPathPlan` variant:
    /// a `Constant` error, a full-scan `PeekExisting`, a `PeekExisting` with a
    /// lookup key, and `Constant` row sets both below and above the row-count
    /// threshold that triggers the abbreviated "first_rows" rendering.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
    fn test_fast_path_plan_as_text() {
        // A single-column, non-nullable string relation type shared by the plans below.
        let typ = SqlRelationType::new(vec![SqlColumnType {
            scalar_type: SqlScalarType::String,
            nullable: false,
        }]);
        // Constant plan carrying an evaluation error rather than rows.
        let constant_err = FastPathPlan::Constant(Err(EvalError::DivisionByZero), typ.clone());
        // Index peek with no lookup key (`None`), i.e. a full scan, plus a map+project MFP.
        let no_lookup = FastPathPlan::PeekExisting(
            GlobalId::User(8),
            GlobalId::User(10),
            None,
            MapFilterProject::new(4)
                .map(Some(MirScalarExpr::column(0).or(MirScalarExpr::column(2))))
                .project([1, 4])
                .into_plan()
                .expect("invalid plan")
                .into_nontemporal()
                .expect("invalid nontemporal"),
        );
        // Index peek with a literal lookup key (value 5) and a filter-only MFP.
        let lookup = FastPathPlan::PeekExisting(
            GlobalId::User(9),
            GlobalId::User(11),
            Some(vec![Row::pack(Some(Datum::Int32(5)))]),
            MapFilterProject::new(3)
                .filter(Some(
                    MirScalarExpr::column(0).call_unary(UnaryFunc::IsNull(IsNull)),
                ))
                .into_plan()
                .expect("invalid plan")
                .into_nontemporal()
                .expect("invalid nontemporal"),
        );

        // Rendering context: a dummy humanizer (so index names render as
        // "[DELETED INDEX]") and verbose, unredacted explain output.
        let humanizer = DummyHumanizer;
        let config = ExplainConfig {
            redacted: false,
            verbose_syntax: true,
            ..Default::default()
        };
        // `text_string_at` consumes the context, so build a fresh one per assertion.
        let ctx_gen = || {
            let indent = Indent::default();
            let annotations = BTreeMap::new();
            PlanRenderingContext::<FastPathPlan>::new(
                indent,
                &humanizer,
                annotations,
                &config,
                BTreeSet::default(),
            )
        };

        // Expected renderings for the three plans above.
        let constant_err_exp = "Error \"division by zero\"\n";
        let no_lookup_exp = "Project (#1, #4)\n  Map ((#0 OR #2))\n    ReadIndex on=u8 [DELETED INDEX]=[*** full scan ***]\n";
        let lookup_exp =
            "Filter (#0) IS NULL\n  ReadIndex on=u9 [DELETED INDEX]=[lookup value=(5)]\n";

        assert_eq!(text_string_at(&constant_err, ctx_gen), constant_err_exp);
        assert_eq!(text_string_at(&no_lookup, ctx_gen), no_lookup_exp);
        assert_eq!(text_string_at(&lookup, ctx_gen), lookup_exp);

        // A small Constant row set (3 distinct rows) renders every row inline,
        // with "x N" suffixes for diffs greater than one.
        let mut constant_rows = vec![
            (Row::pack(Some(Datum::String("hello"))), Diff::ONE),
            (Row::pack(Some(Datum::String("world"))), 2.into()),
            (Row::pack(Some(Datum::String("star"))), 500.into()),
        ];
        let constant_exp1 =
            "Constant\n  - (\"hello\")\n  - ((\"world\") x 2)\n  - ((\"star\") x 500)\n";
        assert_eq!(
            text_string_at(
                &FastPathPlan::Constant(Ok(constant_rows.clone()), typ.clone()),
                ctx_gen
            ),
            constant_exp1
        );
        // Growing the set past the rendering threshold switches to the abbreviated
        // form: a total row count (summed absolute diffs: 1 + 2 + 500 + 20 = 523)
        // followed by only the first rows. The expected string must match the
        // production renderer byte-for-byte (including its "diffs absed" spelling).
        constant_rows
            .extend((0..20).map(|i| (Row::pack(Some(Datum::String(&i.to_string()))), Diff::ONE)));
        let constant_exp2 = "Constant\n  total_rows (diffs absed): 523\n  first_rows:\n    - (\"hello\")\
        \n    - ((\"world\") x 2)\n    - ((\"star\") x 500)\n    - (\"0\")\n    - (\"1\")\
        \n    - (\"2\")\n    - (\"3\")\n    - (\"4\")\n    - (\"5\")\n    - (\"6\")\
        \n    - (\"7\")\n    - (\"8\")\n    - (\"9\")\n    - (\"10\")\n    - (\"11\")\
        \n    - (\"12\")\n    - (\"13\")\n    - (\"14\")\n    - (\"15\")\n    - (\"16\")\n";
        assert_eq!(
            text_string_at(&FastPathPlan::Constant(Ok(constant_rows), typ), ctx_gen),
            constant_exp2
        );
    }
}