// mz_adapter/coord/peek.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic and types for creating, executing, and tracking peeks.
11//!
12//! This module determines if a dataflow can be short-cut, by returning constant values
13//! or by reading out of existing arrangements, and implements the appropriate plan.
14
15use std::collections::{BTreeMap, BTreeSet};
16use std::fmt;
17use std::num::NonZeroUsize;
18use std::ops::Deref;
19use std::sync::Arc;
20
21use differential_dataflow::consolidation::consolidate;
22use itertools::Itertools;
23use mz_adapter_types::compaction::CompactionWindow;
24use mz_adapter_types::connection::ConnectionId;
25use mz_cluster_client::ReplicaId;
26use mz_compute_client::controller::PeekNotification;
27use mz_compute_client::protocol::command::PeekTarget;
28use mz_compute_client::protocol::response::PeekResponse;
29use mz_compute_types::ComputeInstanceId;
30use mz_compute_types::dataflows::{DataflowDescription, IndexImport};
31use mz_controller_types::ClusterId;
32use mz_expr::explain::{HumanizedExplain, HumanizerMode, fmt_text_constant_rows};
33use mz_expr::row::RowCollection;
34use mz_expr::{
35    EvalError, Id, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing,
36    RowSetFinishingIncremental, permutation_for_arrangement,
37};
38use mz_ore::cast::CastFrom;
39use mz_ore::str::{StrExt, separated};
40use mz_ore::task;
41use mz_ore::tracing::OpenTelemetryContext;
42use mz_persist_client::Schemas;
43use mz_persist_types::codec_impls::UnitSchema;
44use mz_repr::explain::text::DisplayText;
45use mz_repr::explain::{CompactScalars, IndexUsageType, PlanRenderingContext, UsedIndexes};
46use mz_repr::{
47    Diff, GlobalId, IntoRowIterator, RelationDesc, Row, RowIterator, SqlRelationType,
48    preserves_order,
49};
50use mz_storage_types::sources::SourceData;
51use serde::{Deserialize, Serialize};
52use timely::progress::{Antichain, Timestamp};
53use tokio::sync::oneshot;
54use tracing::{Instrument, Span};
55use uuid::Uuid;
56
57use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
58use crate::coord::timestamp_selection::TimestampDetermination;
59use crate::optimize::OptimizerError;
60use crate::statement_logging::WatchSetCreation;
61use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
62use crate::{AdapterError, ExecuteContextGuard, ExecuteResponse};
63
/// A peek is a request to read data from a maintained arrangement.
#[derive(Debug)]
pub(crate) struct PendingPeek {
    /// The connection that initiated the peek.
    pub(crate) conn_id: ConnectionId,
    /// The cluster that the peek is being executed on.
    pub(crate) cluster_id: ClusterId,
    /// All `GlobalId`s that the peek depends on.
    pub(crate) depends_on: BTreeSet<GlobalId>,
    /// Context about the execute that produced this peek,
    /// needed by the coordinator for retiring it.
    pub(crate) ctx_extra: ExecuteContextGuard,
    /// Is this a fast-path peek, i.e. one that doesn't require a dataflow?
    pub(crate) is_fast_path: bool,
}
79
/// The response from a `Peek`, with row multiplicities represented in unary.
///
/// Note that each `Peek` expects to generate exactly one `PeekResponse`, i.e.
/// we expect a 1:1 contract between `Peek` and `PeekResponseUnary`.
#[derive(Debug)]
pub enum PeekResponseUnary {
    /// The peek succeeded; the iterator yields the resulting rows.
    Rows(Box<dyn RowIterator + Send + Sync>),
    /// The peek failed; the string is the rendered error.
    Error(String),
    /// The peek was canceled before a result was produced.
    Canceled,
}
90
/// A plan for a peek that installs a dataflow and reads the result out of the
/// arrangement identified by `id`.
#[derive(Clone, Debug)]
pub struct PeekDataflowPlan<T = mz_repr::Timestamp> {
    /// The dataflow to install.
    pub(crate) desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
    /// The id of the collection/index to read the result from.
    /// NOTE(review): presumably the transient index built by `desc` — confirm at call sites.
    pub(crate) id: GlobalId,
    /// The arrangement key, derived from the relation's default key
    /// (see `new`).
    key: Vec<MirScalarExpr>,
    /// Column permutation computed by `permutation_for_arrangement` for
    /// `key` and the relation's arity.
    permutation: Vec<usize>,
    /// Number of columns in the thinned (non-key) portion of the arrangement,
    /// i.e. the length of the thinning computed in `new`.
    thinned_arity: usize,
}
99
100impl<T> PeekDataflowPlan<T> {
101    pub fn new(
102        desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
103        id: GlobalId,
104        typ: &SqlRelationType,
105    ) -> Self {
106        let arity = typ.arity();
107        let key = typ
108            .default_key()
109            .into_iter()
110            .map(MirScalarExpr::column)
111            .collect::<Vec<_>>();
112        let (permutation, thinning) = permutation_for_arrangement(&key, arity);
113        Self {
114            desc,
115            id,
116            key,
117            permutation,
118            thinned_arity: thinning.len(),
119        }
120    }
121}
122
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd)]
pub enum FastPathPlan {
    /// The view evaluates to a constant result that can be returned.
    ///
    /// The [SqlRelationType] is unnecessary for evaluating the constant result but
    /// may be helpful when printing out an explanation.
    Constant(Result<Vec<(Row, Diff)>, EvalError>, SqlRelationType),
    /// The view can be read out of an existing arrangement.
    ///
    /// Fields: (coll_id, idx_id, values to look up, mfp to apply).
    /// A `None` lookup value means the whole arrangement is scanned.
    PeekExisting(GlobalId, GlobalId, Option<Vec<Row>>, mz_expr::SafeMfpPlan),
    /// The view can be read directly out of Persist.
    ///
    /// Fields: (coll_id, optional literal key constraint, mfp to apply).
    PeekPersist(GlobalId, Option<Row>, mz_expr::SafeMfpPlan),
}
136
137impl<'a, T: 'a> DisplayText<PlanRenderingContext<'a, T>> for FastPathPlan {
138    fn fmt_text(
139        &self,
140        f: &mut fmt::Formatter<'_>,
141        ctx: &mut PlanRenderingContext<'a, T>,
142    ) -> fmt::Result {
143        if ctx.config.verbose_syntax {
144            self.fmt_verbose_text(f, ctx)
145        } else {
146            self.fmt_default_text(f, ctx)
147        }
148    }
149}
150
151impl FastPathPlan {
152    pub fn fmt_default_text<'a, T>(
153        &self,
154        f: &mut fmt::Formatter<'_>,
155        ctx: &mut PlanRenderingContext<'a, T>,
156    ) -> fmt::Result {
157        let mode = HumanizedExplain::new(ctx.config.redacted);
158
159        match self {
160            FastPathPlan::Constant(rows, _) => {
161                write!(f, "{}→Constant ", ctx.indent)?;
162
163                match rows {
164                    Ok(rows) => writeln!(f, "({} rows)", rows.len())?,
165                    Err(err) => {
166                        if mode.redacted() {
167                            writeln!(f, "(error: █)")?;
168                        } else {
169                            writeln!(f, "(error: {})", err.to_string().quoted(),)?;
170                        }
171                    }
172                }
173            }
174            FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
175                let coll = ctx
176                    .humanizer
177                    .humanize_id(*coll_id)
178                    .unwrap_or_else(|| coll_id.to_string());
179                let idx = ctx
180                    .humanizer
181                    .humanize_id(*idx_id)
182                    .unwrap_or_else(|| idx_id.to_string());
183                writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
184                ctx.indent.set();
185
186                ctx.indent += 1;
187
188                mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
189                let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();
190
191                if printed {
192                    ctx.indent += 1;
193                }
194                if let Some(literal_constraints) = literal_constraints {
195                    writeln!(f, "{}→Index Lookup on {coll} (using {idx})", ctx.indent)?;
196                    ctx.indent += 1;
197                    let values = separated("; ", mode.seq(literal_constraints, None));
198                    writeln!(f, "{}Lookup values: {values}", ctx.indent)?;
199                } else {
200                    writeln!(f, "{}→Indexed {coll} (using {idx})", ctx.indent)?;
201                }
202
203                ctx.indent.reset();
204            }
205            FastPathPlan::PeekPersist(global_id, literal_constraint, mfp) => {
206                let coll = ctx
207                    .humanizer
208                    .humanize_id(*global_id)
209                    .unwrap_or_else(|| global_id.to_string());
210                writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
211                ctx.indent.set();
212
213                ctx.indent += 1;
214
215                mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
216                let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();
217
218                if printed {
219                    ctx.indent += 1;
220                }
221                if let Some(literal_constraint) = literal_constraint {
222                    writeln!(f, "{}→Index Lookup on {coll} (from storage)", ctx.indent)?;
223                    ctx.indent += 1;
224                    let value = mode.expr(literal_constraint, None);
225                    writeln!(f, "{}Lookup value: {value}", ctx.indent)?;
226                } else {
227                    writeln!(f, "{}→Indexed {coll} (from storage)", ctx.indent)?;
228                }
229
230                ctx.indent.reset();
231            }
232        }
233
234        Ok(())
235    }
236
237    pub fn fmt_verbose_text<'a, T>(
238        &self,
239        f: &mut fmt::Formatter<'_>,
240        ctx: &mut PlanRenderingContext<'a, T>,
241    ) -> fmt::Result {
242        let redacted = ctx.config.redacted;
243        let mode = HumanizedExplain::new(redacted);
244
245        // TODO(aalexandrov): factor out common PeekExisting and PeekPersist
246        // code.
247        match self {
248            FastPathPlan::Constant(Ok(rows), _) => {
249                if !rows.is_empty() {
250                    writeln!(f, "{}Constant", ctx.indent)?;
251                    *ctx.as_mut() += 1;
252                    fmt_text_constant_rows(
253                        f,
254                        rows.iter().map(|(row, diff)| (row, diff)),
255                        ctx.as_mut(),
256                        redacted,
257                    )?;
258                    *ctx.as_mut() -= 1;
259                } else {
260                    writeln!(f, "{}Constant <empty>", ctx.as_mut())?;
261                }
262                Ok(())
263            }
264            FastPathPlan::Constant(Err(err), _) => {
265                if redacted {
266                    writeln!(f, "{}Error █", ctx.as_mut())
267                } else {
268                    writeln!(f, "{}Error {}", ctx.as_mut(), err.to_string().escaped())
269                }
270            }
271            FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
272                ctx.as_mut().set();
273                let (map, filter, project) = mfp.as_map_filter_project();
274
275                let cols = if !ctx.config.humanized_exprs {
276                    None
277                } else if let Some(cols) = ctx.humanizer.column_names_for_id(*idx_id) {
278                    // FIXME: account for thinning and permutation
279                    // See mz_expr::permutation_for_arrangement
280                    // See permute_oneshot_mfp_around_index
281                    let cols = itertools::chain(
282                        cols.iter().cloned(),
283                        std::iter::repeat(String::new()).take(map.len()),
284                    )
285                    .collect();
286                    Some(cols)
287                } else {
288                    None
289                };
290
291                if project.len() != mfp.input_arity + map.len()
292                    || !project.iter().enumerate().all(|(i, o)| i == *o)
293                {
294                    let outputs = mode.seq(&project, cols.as_ref());
295                    let outputs = CompactScalars(outputs);
296                    writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
297                    *ctx.as_mut() += 1;
298                }
299                if !filter.is_empty() {
300                    let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
301                    writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
302                    *ctx.as_mut() += 1;
303                }
304                if !map.is_empty() {
305                    let scalars = mode.seq(&map, cols.as_ref());
306                    let scalars = CompactScalars(scalars);
307                    writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
308                    *ctx.as_mut() += 1;
309                }
310                MirRelationExpr::fmt_indexed_filter(
311                    f,
312                    ctx,
313                    coll_id,
314                    idx_id,
315                    literal_constraints.clone(),
316                    None,
317                )?;
318                writeln!(f)?;
319                ctx.as_mut().reset();
320                Ok(())
321            }
322            FastPathPlan::PeekPersist(gid, literal_constraint, mfp) => {
323                ctx.as_mut().set();
324                let (map, filter, project) = mfp.as_map_filter_project();
325
326                let cols = if !ctx.config.humanized_exprs {
327                    None
328                } else if let Some(cols) = ctx.humanizer.column_names_for_id(*gid) {
329                    let cols = itertools::chain(
330                        cols.iter().cloned(),
331                        std::iter::repeat(String::new()).take(map.len()),
332                    )
333                    .collect::<Vec<_>>();
334                    Some(cols)
335                } else {
336                    None
337                };
338
339                if project.len() != mfp.input_arity + map.len()
340                    || !project.iter().enumerate().all(|(i, o)| i == *o)
341                {
342                    let outputs = mode.seq(&project, cols.as_ref());
343                    let outputs = CompactScalars(outputs);
344                    writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
345                    *ctx.as_mut() += 1;
346                }
347                if !filter.is_empty() {
348                    let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
349                    writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
350                    *ctx.as_mut() += 1;
351                }
352                if !map.is_empty() {
353                    let scalars = mode.seq(&map, cols.as_ref());
354                    let scalars = CompactScalars(scalars);
355                    writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
356                    *ctx.as_mut() += 1;
357                }
358                let human_id = ctx
359                    .humanizer
360                    .humanize_id(*gid)
361                    .unwrap_or_else(|| gid.to_string());
362                write!(f, "{}PeekPersist {human_id}", ctx.as_mut())?;
363                if let Some(literal) = literal_constraint {
364                    let value = mode.expr(literal, None);
365                    writeln!(f, " [value={}]", value)?;
366                } else {
367                    writeln!(f, "")?;
368                }
369                ctx.as_mut().reset();
370                Ok(())
371            }
372        }?;
373        Ok(())
374    }
375}
376
/// A fully-planned peek, ready to be implemented by the coordinator.
#[derive(Debug)]
pub struct PlannedPeek {
    /// How the peek will be executed (fast path or via a dataflow).
    pub plan: PeekPlan,
    /// The timestamp determination at which the peek will read.
    pub determination: TimestampDetermination<mz_repr::Timestamp>,
    /// The connection that initiated the peek.
    pub conn_id: ConnectionId,
    /// The result type _after_ reading out of the "source" and applying any
    /// [MapFilterProject](mz_expr::MapFilterProject), but _before_ applying a
    /// [RowSetFinishing].
    ///
    /// This is _the_ `result_type` as far as compute is concerned and further
    /// changes through projections happen purely in the adapter.
    pub intermediate_result_type: SqlRelationType,
    /// The arity of the peeked collection.
    /// NOTE(review): presumably matches `intermediate_result_type` — confirm at call sites.
    pub source_arity: usize,
    /// The `GlobalId`s that the peek reads from.
    pub source_ids: BTreeSet<GlobalId>,
}
392
/// Possible ways in which the coordinator could produce the result for a goal view.
#[derive(Clone, Debug)]
pub enum PeekPlan<T = mz_repr::Timestamp> {
    /// The result can be produced without installing a dataflow
    /// (see [`FastPathPlan`]).
    FastPath(FastPathPlan),
    /// The view must be installed as a dataflow and then read.
    SlowPath(PeekDataflowPlan<T>),
}
400
401/// Convert `mfp` to an executable, non-temporal plan.
402/// It should be non-temporal, as OneShot preparation populates `mz_now`.
403///
404/// If the `mfp` can't be converted into a non-temporal plan, this returns an _internal_ error.
405fn mfp_to_safe_plan(
406    mfp: mz_expr::MapFilterProject,
407) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
408    mfp.into_plan()
409        .map_err(OptimizerError::InternalUnsafeMfpPlan)?
410        .into_nontemporal()
411        .map_err(|e| OptimizerError::InternalUnsafeMfpPlan(format!("{:?}", e)))
412}
413
414/// If it can't convert `mfp` into a `SafeMfpPlan`, this returns an _internal_ error.
415fn permute_oneshot_mfp_around_index(
416    mfp: mz_expr::MapFilterProject,
417    key: &[MirScalarExpr],
418) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
419    let input_arity = mfp.input_arity;
420    let mut safe_mfp = mfp_to_safe_plan(mfp)?;
421    let (permute, thinning) = permutation_for_arrangement(key, input_arity);
422    safe_mfp.permute_fn(|c| permute[c], key.len() + thinning.len());
423    Ok(safe_mfp)
424}
425
426/// Determine if the dataflow plan can be implemented without an actual dataflow.
427///
428/// If the optimized plan is a `Constant` or a `Get` of a maintained arrangement,
429/// we can avoid building a dataflow (and either just return the results, or peek
430/// out of the arrangement, respectively).
431pub fn create_fast_path_plan<T: Timestamp>(
432    dataflow_plan: &mut DataflowDescription<OptimizedMirRelationExpr, (), T>,
433    view_id: GlobalId,
434    finishing: Option<&RowSetFinishing>,
435    persist_fast_path_limit: usize,
436    persist_fast_path_order: bool,
437) -> Result<Option<FastPathPlan>, OptimizerError> {
438    // At this point, `dataflow_plan` contains our best optimized dataflow.
439    // We will check the plan to see if there is a fast path to escape full dataflow construction.
440
441    // We need to restrict ourselves to settings where the inserted transient view is the first thing
442    // to build (no dependent views). There is likely an index to build as well, but we may not be sure.
443    if dataflow_plan.objects_to_build.len() >= 1 && dataflow_plan.objects_to_build[0].id == view_id
444    {
445        let mut mir = &*dataflow_plan.objects_to_build[0].plan.as_inner_mut();
446        if let Some((rows, found_typ)) = mir.as_const() {
447            // In the case of a constant, we can return the result now.
448            let plan = FastPathPlan::Constant(
449                rows.clone(),
450                mz_repr::SqlRelationType::from_repr(found_typ),
451            );
452            return Ok(Some(plan));
453        } else {
454            // If there is a TopK that would be completely covered by the finishing, then jump
455            // through the TopK.
456            if let MirRelationExpr::TopK {
457                input,
458                group_key,
459                order_key,
460                limit,
461                offset,
462                monotonic: _,
463                expected_group_size: _,
464            } = mir
465            {
466                if let Some(finishing) = finishing {
467                    if group_key.is_empty() && *order_key == finishing.order_by && *offset == 0 {
468                        // The following is roughly `limit >= finishing.limit`, but with Options.
469                        let finishing_limits_at_least_as_topk = match (limit, finishing.limit) {
470                            (None, _) => true,
471                            (Some(..), None) => false,
472                            (Some(topk_limit), Some(finishing_limit)) => {
473                                if let Some(l) = topk_limit.as_literal_int64() {
474                                    l >= *finishing_limit
475                                } else {
476                                    false
477                                }
478                            }
479                        };
480                        if finishing_limits_at_least_as_topk {
481                            mir = input;
482                        }
483                    }
484                }
485            }
486            // In the case of a linear operator around an indexed view, we
487            // can skip creating a dataflow and instead pull all the rows in
488            // index and apply the linear operator against them.
489            let (mfp, mir) = mz_expr::MapFilterProject::extract_from_expression(mir);
490            match mir {
491                MirRelationExpr::Get {
492                    id: Id::Global(get_id),
493                    typ: repr_typ,
494                    ..
495                } => {
496                    // Just grab any arrangement if an arrangement exists
497                    for (index_id, IndexImport { desc, .. }) in dataflow_plan.index_imports.iter() {
498                        if desc.on_id == *get_id {
499                            return Ok(Some(FastPathPlan::PeekExisting(
500                                *get_id,
501                                *index_id,
502                                None,
503                                permute_oneshot_mfp_around_index(mfp, &desc.key)?,
504                            )));
505                        }
506                    }
507
508                    // If there is no arrangement, consider peeking the persist shard directly.
509                    // Generally, we consider a persist peek when the query can definitely be satisfied
510                    // by scanning through a small, constant number of Persist key-values.
511                    let safe_mfp = mfp_to_safe_plan(mfp)?;
512                    let (_maps, filters, projection) = safe_mfp.as_map_filter_project();
513
514                    let persist_fast_path_order_relation_typ = if persist_fast_path_order {
515                        Some(
516                            dataflow_plan
517                                .source_imports
518                                .get(get_id)
519                                .expect("Get's ID is also imported")
520                                .desc
521                                .typ
522                                .clone(),
523                        )
524                    } else {
525                        None
526                    };
527
528                    let literal_constraint =
529                        if let Some(relation_typ) = &persist_fast_path_order_relation_typ {
530                            let mut row = Row::default();
531                            let mut packer = row.packer();
532                            for (idx, col) in relation_typ.column_types.iter().enumerate() {
533                                if !preserves_order(&col.scalar_type) {
534                                    break;
535                                }
536                                let col_expr = MirScalarExpr::column(idx);
537
538                                let Some((literal, _)) = filters
539                                    .iter()
540                                    .filter_map(|f| f.expr_eq_literal(&col_expr))
541                                    .next()
542                                else {
543                                    break;
544                                };
545                                packer.extend_by_row(&literal);
546                            }
547                            if row.is_empty() { None } else { Some(row) }
548                        } else {
549                            None
550                        };
551
552                    let finish_ok = match &finishing {
553                        None => false,
554                        Some(RowSetFinishing {
555                            order_by,
556                            limit,
557                            offset,
558                            ..
559                        }) => {
560                            let order_ok =
561                                if let Some(relation_typ) = &persist_fast_path_order_relation_typ {
562                                    order_by.iter().enumerate().all(|(idx, order)| {
563                                        // Map the ordering column back to the column in the source data.
564                                        // (If it's not one of the input columns, we can't make any guarantees.)
565                                        let column_idx = projection[order.column];
566                                        if column_idx >= safe_mfp.input_arity {
567                                            return false;
568                                        }
569                                        let column_type = &relation_typ.column_types[column_idx];
570                                        let index_ok = idx == column_idx;
571                                        let nulls_ok = !column_type.nullable || order.nulls_last;
572                                        let asc_ok = !order.desc;
573                                        let type_ok = preserves_order(&column_type.scalar_type);
574                                        index_ok && nulls_ok && asc_ok && type_ok
575                                    })
576                                } else {
577                                    order_by.is_empty()
578                                };
579                            let limit_ok = limit.map_or(false, |l| {
580                                usize::cast_from(l) + *offset < persist_fast_path_limit
581                            });
582                            order_ok && limit_ok
583                        }
584                    };
585
586                    let key_constraint = if let Some(literal) = &literal_constraint {
587                        let prefix_len = literal.iter().count();
588                        repr_typ
589                            .keys
590                            .iter()
591                            .any(|k| k.iter().all(|idx| *idx < prefix_len))
592                    } else {
593                        false
594                    };
595
596                    // We can generate a persist peek when:
597                    // - We have a literal constraint that includes an entire key (so we'll return at most one value)
598                    // - We can return the first N key values (no filters, small limit, consistent order)
599                    if key_constraint || (filters.is_empty() && finish_ok) {
600                        return Ok(Some(FastPathPlan::PeekPersist(
601                            *get_id,
602                            literal_constraint,
603                            safe_mfp,
604                        )));
605                    }
606                }
607                MirRelationExpr::Join { implementation, .. } => {
608                    if let mz_expr::JoinImplementation::IndexedFilter(coll_id, idx_id, key, vals) =
609                        implementation
610                    {
611                        return Ok(Some(FastPathPlan::PeekExisting(
612                            *coll_id,
613                            *idx_id,
614                            Some(vals.clone()),
615                            permute_oneshot_mfp_around_index(mfp, key)?,
616                        )));
617                    }
618                }
619                // nothing can be done for non-trivial expressions.
620                _ => {}
621            }
622        }
623    }
624    Ok(None)
625}
626
627impl FastPathPlan {
628    pub fn used_indexes(&self, finishing: Option<&RowSetFinishing>) -> UsedIndexes {
629        match self {
630            FastPathPlan::Constant(..) => UsedIndexes::default(),
631            FastPathPlan::PeekExisting(_coll_id, idx_id, literal_constraints, _mfp) => {
632                if literal_constraints.is_some() {
633                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::Lookup(*idx_id)])].into())
634                } else if finishing.map_or(false, |f| f.limit.is_some() && f.order_by.is_empty()) {
635                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::FastPathLimit])].into())
636                } else {
637                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::FullScan])].into())
638                }
639            }
640            FastPathPlan::PeekPersist(..) => UsedIndexes::default(),
641        }
642    }
643}
644
645impl crate::coord::Coordinator {
646    /// Implements a peek plan produced by `create_plan` above.
647    #[mz_ore::instrument(level = "debug")]
648    pub async fn implement_peek_plan(
649        &mut self,
650        ctx_extra: &mut ExecuteContextGuard,
651        plan: PlannedPeek,
652        finishing: RowSetFinishing,
653        compute_instance: ComputeInstanceId,
654        target_replica: Option<ReplicaId>,
655        max_result_size: u64,
656        max_returned_query_size: Option<u64>,
657    ) -> Result<ExecuteResponse, AdapterError> {
658        let PlannedPeek {
659            plan: fast_path,
660            determination,
661            conn_id,
662            intermediate_result_type,
663            source_arity,
664            source_ids,
665        } = plan;
666
667        // If the dataflow optimizes to a constant expression, we can immediately return the result.
668        if let PeekPlan::FastPath(FastPathPlan::Constant(rows, _)) = fast_path {
669            let mut rows = match rows {
670                Ok(rows) => rows,
671                Err(e) => return Err(e.into()),
672            };
673            // Consolidate down the results to get correct totals.
674            consolidate(&mut rows);
675
676            let mut results = Vec::new();
677            for (row, count) in rows {
678                if count.is_negative() {
679                    Err(EvalError::InvalidParameterValue(
680                        format!("Negative multiplicity in constant result: {}", count).into(),
681                    ))?
682                };
683                if count.is_positive() {
684                    let count = usize::cast_from(
685                        u64::try_from(count.into_inner())
686                            .expect("known to be positive from check above"),
687                    );
688                    results.push((
689                        row,
690                        NonZeroUsize::new(count).expect("known to be non-zero from check above"),
691                    ));
692                }
693            }
694            let row_collection = RowCollection::new(results, &finishing.order_by);
695            let duration_histogram = self.metrics.row_set_finishing_seconds();
696
697            let (ret, reason) = match finishing.finish(
698                row_collection,
699                max_result_size,
700                max_returned_query_size,
701                &duration_histogram,
702            ) {
703                Ok((rows, row_size_bytes)) => {
704                    let result_size = u64::cast_from(row_size_bytes);
705                    let rows_returned = u64::cast_from(rows.count());
706                    (
707                        Ok(Self::send_immediate_rows(rows)),
708                        StatementEndedExecutionReason::Success {
709                            result_size: Some(result_size),
710                            rows_returned: Some(rows_returned),
711                            execution_strategy: Some(StatementExecutionStrategy::Constant),
712                        },
713                    )
714                }
715                Err(error) => (
716                    Err(AdapterError::ResultSize(error.clone())),
717                    StatementEndedExecutionReason::Errored { error },
718                ),
719            };
720            self.retire_execution(reason, std::mem::take(ctx_extra).defuse());
721            return ret;
722        }
723
724        let timestamp = determination.timestamp_context.timestamp_or_default();
725        if let Some(id) = ctx_extra.contents() {
726            self.set_statement_execution_timestamp(id, timestamp)
727        }
728
729        // The remaining cases are a peek into a maintained arrangement, or building a dataflow.
730        // In both cases we will want to peek, and the main difference is that we might want to
731        // build a dataflow and drop it once the peek is issued. The peeks are also constructed
732        // differently.
733
734        // If we must build the view, ship the dataflow.
735        let (peek_command, drop_dataflow, is_fast_path, peek_target, strategy) = match fast_path {
736            PeekPlan::FastPath(FastPathPlan::PeekExisting(
737                _coll_id,
738                idx_id,
739                literal_constraints,
740                map_filter_project,
741            )) => (
742                (literal_constraints, timestamp, map_filter_project),
743                None,
744                true,
745                PeekTarget::Index { id: idx_id },
746                StatementExecutionStrategy::FastPath,
747            ),
748            PeekPlan::FastPath(FastPathPlan::PeekPersist(
749                coll_id,
750                literal_constraint,
751                map_filter_project,
752            )) => {
753                let peek_command = (
754                    literal_constraint.map(|r| vec![r]),
755                    timestamp,
756                    map_filter_project,
757                );
758                let metadata = self
759                    .controller
760                    .storage
761                    .collection_metadata(coll_id)
762                    .expect("storage collection for fast-path peek")
763                    .clone();
764                (
765                    peek_command,
766                    None,
767                    true,
768                    PeekTarget::Persist {
769                        id: coll_id,
770                        metadata,
771                    },
772                    StatementExecutionStrategy::PersistFastPath,
773                )
774            }
775            PeekPlan::SlowPath(PeekDataflowPlan {
776                desc: dataflow,
777                // n.b. this index_id identifies a transient index the
778                // caller created, so it is guaranteed to be on
779                // `compute_instance`.
780                id: index_id,
781                key: index_key,
782                permutation: index_permutation,
783                thinned_arity: index_thinned_arity,
784            }) => {
785                let output_ids = dataflow.export_ids().collect();
786
787                // Very important: actually create the dataflow (here, so we can destructure).
788                self.controller
789                    .compute
790                    .create_dataflow(compute_instance, dataflow, None)
791                    .map_err(
792                        AdapterError::concurrent_dependency_drop_from_dataflow_creation_error,
793                    )?;
794                self.initialize_compute_read_policies(
795                    output_ids,
796                    compute_instance,
797                    // Disable compaction so that nothing can compact before the peek occurs below.
798                    CompactionWindow::DisableCompaction,
799                )
800                .await;
801
802                // Create an identity MFP operator.
803                let mut map_filter_project = mz_expr::MapFilterProject::new(source_arity);
804                map_filter_project.permute_fn(
805                    |c| index_permutation[c],
806                    index_key.len() + index_thinned_arity,
807                );
808                let map_filter_project = mfp_to_safe_plan(map_filter_project)?;
809
810                (
811                    (None, timestamp, map_filter_project),
812                    Some(index_id),
813                    false,
814                    PeekTarget::Index { id: index_id },
815                    StatementExecutionStrategy::Standard,
816                )
817            }
818            _ => {
819                unreachable!()
820            }
821        };
822
823        // Endpoints for sending and receiving peek responses.
824        let (rows_tx, rows_rx) = tokio::sync::oneshot::channel();
825
        // Generate a unique UUID. It is guaranteed to be unique among all pending peeks;
        // there's a very small but nonzero chance that it collides with a completed peek's UUID.
828        let mut uuid = Uuid::new_v4();
829        while self.pending_peeks.contains_key(&uuid) {
830            uuid = Uuid::new_v4();
831        }
832
833        // The peek is ready to go for both cases, fast and non-fast.
834        // Stash the response mechanism, and broadcast dataflow construction.
835        self.pending_peeks.insert(
836            uuid,
837            PendingPeek {
838                conn_id: conn_id.clone(),
839                cluster_id: compute_instance,
840                depends_on: source_ids,
841                ctx_extra: std::mem::take(ctx_extra),
842                is_fast_path,
843            },
844        );
845        self.client_pending_peeks
846            .entry(conn_id)
847            .or_default()
848            .insert(uuid, compute_instance);
849        let (literal_constraints, timestamp, map_filter_project) = peek_command;
850
851        // At this stage we don't know column names for the result because we
852        // only know the peek's result type as a bare SqlRelationType.
853        let peek_result_column_names =
854            (0..intermediate_result_type.arity()).map(|i| format!("peek_{i}"));
855        let peek_result_desc =
856            RelationDesc::new(intermediate_result_type, peek_result_column_names);
857
858        self.controller
859            .compute
860            .peek(
861                compute_instance,
862                peek_target,
863                literal_constraints,
864                uuid,
865                timestamp,
866                peek_result_desc,
867                finishing.clone(),
868                map_filter_project,
869                target_replica,
870                rows_tx,
871            )
872            .map_err(AdapterError::concurrent_dependency_drop_from_peek_error)?;
873
874        let duration_histogram = self.metrics.row_set_finishing_seconds();
875
876        // If a dataflow was created, drop it once the peek command is sent.
877        if let Some(index_id) = drop_dataflow {
878            self.remove_compute_ids_from_timeline(vec![(compute_instance, index_id)]);
879            self.drop_compute_collections(vec![(compute_instance, index_id)]);
880        }
881
882        let persist_client = self.persist_client.clone();
883        let peek_stash_read_batch_size_bytes =
884            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
885                .get(self.catalog().system_config().dyncfgs());
886        let peek_stash_read_memory_budget_bytes =
887            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
888                .get(self.catalog().system_config().dyncfgs());
889
890        let peek_response_stream = Self::create_peek_response_stream(
891            rows_rx,
892            finishing,
893            max_result_size,
894            max_returned_query_size,
895            duration_histogram,
896            persist_client,
897            peek_stash_read_batch_size_bytes,
898            peek_stash_read_memory_budget_bytes,
899        );
900
901        Ok(crate::ExecuteResponse::SendingRowsStreaming {
902            rows: Box::pin(peek_response_stream),
903            instance_id: compute_instance,
904            strategy,
905        })
906    }
907
908    /// Creates an async stream that processes peek responses and yields rows.
909    ///
910    /// TODO(peek-seq): Move this out of `coord` once we delete the old peek sequencing.
911    #[mz_ore::instrument(level = "debug")]
912    pub(crate) fn create_peek_response_stream(
913        rows_rx: tokio::sync::oneshot::Receiver<PeekResponse>,
914        finishing: RowSetFinishing,
915        max_result_size: u64,
916        max_returned_query_size: Option<u64>,
917        duration_histogram: prometheus::Histogram,
918        mut persist_client: mz_persist_client::PersistClient,
919        peek_stash_read_batch_size_bytes: usize,
920        peek_stash_read_memory_budget_bytes: usize,
921    ) -> impl futures::Stream<Item = PeekResponseUnary> {
922        async_stream::stream!({
923            let result = rows_rx.await;
924
925            let rows = match result {
926                Ok(rows) => rows,
927                Err(e) => {
928                    yield PeekResponseUnary::Error(e.to_string());
929                    return;
930                }
931            };
932
933            match rows {
934                PeekResponse::Rows(rows) => {
935                    let rows = RowCollection::merge_sorted(&rows, &finishing.order_by);
936                    match finishing.finish(
937                        rows,
938                        max_result_size,
939                        max_returned_query_size,
940                        &duration_histogram,
941                    ) {
942                        Ok((rows, _size_bytes)) => yield PeekResponseUnary::Rows(Box::new(rows)),
943                        Err(e) => yield PeekResponseUnary::Error(e),
944                    }
945                }
946                PeekResponse::Stashed(response) => {
947                    let response = *response;
948
949                    let shard_id = response.shard_id;
950
951                    let mut batches = Vec::new();
952                    for proto_batch in response.batches.into_iter() {
953                        let batch =
954                            persist_client.batch_from_transmittable_batch(&shard_id, proto_batch);
955
956                        batches.push(batch);
957                    }
958                    tracing::trace!(?batches, "stashed peek response");
959
960                    let as_of = Antichain::from_elem(mz_repr::Timestamp::default());
961                    let read_schemas: Schemas<SourceData, ()> = Schemas {
962                        id: None,
963                        key: Arc::new(response.relation_desc.clone()),
964                        val: Arc::new(UnitSchema),
965                    };
966
967                    let mut row_cursor = persist_client
968                        .read_batches_consolidated::<_, _, _, i64>(
969                            response.shard_id,
970                            as_of,
971                            read_schemas,
972                            batches,
973                            |_stats| true,
974                            peek_stash_read_memory_budget_bytes,
975                        )
976                        .await
977                        .expect("invalid usage");
978
979                    // NOTE: Using the cursor creates Futures that are not Sync,
980                    // so we can't drive them on the main Coordinator loop.
981                    // Spawning a task has the additional benefit that we get to
982                    // delete batches once we're done.
983                    //
984                    // Batch deletion is best-effort, though, and there are
985                    // multiple known ways in which they can leak, among them:
986                    //
987                    // - ProtoBatch is lost in flight
988                    // - ProtoBatch is lost because when combining PeekResponse
989                    // from workers a cancellation or error "overrides" other
990                    // results, meaning we drop them
991                    // - This task here is not run to completion before it can
992                    // delete all batches
993                    //
994                    // This is semi-ok, because persist needs a reaper of leaked
995                    // batches already, and so we piggy-back on that, even if it
996                    // might not exist as of today.
997                    let (tx, mut rx) = tokio::sync::mpsc::channel(1);
998                    mz_ore::task::spawn(|| "read_peek_batches", async move {
999                        // We always send our inline rows first. Ordering
1000                        // doesn't matter because we can only be in this case
1001                        // when there is no ORDER BY.
1002                        //
1003                        // We _could_ write these out as a Batch, and include it
1004                        // in the batches we read via the Consolidator. If we
1005                        // wanted to get a consistent ordering. That's not
1006                        // needed for correctness! But might be nice for more
1007                        // aesthetic reasons.
1008                        for rows in response.inline_rows {
1009                            let result = tx.send(rows).await;
1010                            if result.is_err() {
1011                                tracing::debug!("receiver went away");
1012                            }
1013                        }
1014
1015                        let mut current_batch = Vec::new();
1016                        let mut current_batch_size: usize = 0;
1017
1018                        'outer: while let Some(rows) = row_cursor.next().await {
1019                            for ((source_data, _val), _ts, diff) in rows {
1020                                let row = source_data
1021                                    .0
1022                                    .expect("we are not sending errors on this code path");
1023
1024                                let diff = usize::try_from(diff)
1025                                    .expect("peek responses cannot have negative diffs");
1026
1027                                if diff > 0 {
1028                                    let diff =
1029                                        NonZeroUsize::new(diff).expect("checked to be non-zero");
1030                                    current_batch_size =
1031                                        current_batch_size.saturating_add(row.byte_len());
1032                                    current_batch.push((row, diff));
1033                                }
1034
1035                                if current_batch_size > peek_stash_read_batch_size_bytes {
1036                                    // We're re-encoding the rows as a RowCollection
1037                                    // here, for which we pay in CPU time. We're in a
1038                                    // slow path already, since we're returning a big
1039                                    // stashed result so this is worth the convenience
1040                                    // of that for now.
1041                                    let result = tx
1042                                        .send(RowCollection::new(
1043                                            current_batch.drain(..).collect_vec(),
1044                                            &[],
1045                                        ))
1046                                        .await;
1047                                    if result.is_err() {
1048                                        tracing::debug!("receiver went away");
1049                                        // Don't return but break so we fall out to the
1050                                        // batch delete logic below.
1051                                        break 'outer;
1052                                    }
1053
1054                                    current_batch_size = 0;
1055                                }
1056                            }
1057                        }
1058
1059                        if current_batch.len() > 0 {
1060                            let result = tx.send(RowCollection::new(current_batch, &[])).await;
1061                            if result.is_err() {
1062                                tracing::debug!("receiver went away");
1063                            }
1064                        }
1065
1066                        let batches = row_cursor.into_lease();
1067                        tracing::trace!(?response.shard_id, "cleaning up batches of peek result");
1068                        for batch in batches {
1069                            batch.delete().await;
1070                        }
1071                    });
1072
1073                    assert!(
1074                        finishing.is_streamable(response.relation_desc.arity()),
1075                        "can only get stashed responses when the finishing is streamable"
1076                    );
1077
1078                    tracing::trace!("query result is streamable!");
1079
1080                    assert!(finishing.is_streamable(response.relation_desc.arity()));
1081                    let mut incremental_finishing = RowSetFinishingIncremental::new(
1082                        finishing.offset,
1083                        finishing.limit,
1084                        finishing.project,
1085                        max_returned_query_size,
1086                    );
1087
1088                    let mut got_zero_rows = true;
1089                    while let Some(rows) = rx.recv().await {
1090                        got_zero_rows = false;
1091
1092                        let result_rows = incremental_finishing.finish_incremental(
1093                            rows,
1094                            max_result_size,
1095                            &duration_histogram,
1096                        );
1097
1098                        match result_rows {
1099                            Ok(result_rows) => yield PeekResponseUnary::Rows(Box::new(result_rows)),
1100                            Err(e) => yield PeekResponseUnary::Error(e),
1101                        }
1102                    }
1103
1104                    // Even when there's zero rows, clients still expect an
1105                    // empty PeekResponse.
1106                    if got_zero_rows {
1107                        let row_iter = vec![].into_row_iter();
1108                        yield PeekResponseUnary::Rows(Box::new(row_iter));
1109                    }
1110                }
1111                PeekResponse::Canceled => {
1112                    yield PeekResponseUnary::Canceled;
1113                }
1114                PeekResponse::Error(e) => {
1115                    yield PeekResponseUnary::Error(e);
1116                }
1117            }
1118        })
1119    }
1120
1121    /// Cancel and remove all pending peeks that were initiated by the client with `conn_id`.
1122    #[mz_ore::instrument(level = "debug")]
1123    pub(crate) fn cancel_pending_peeks(&mut self, conn_id: &ConnectionId) {
1124        if let Some(uuids) = self.client_pending_peeks.remove(conn_id) {
1125            self.metrics
1126                .canceled_peeks
1127                .inc_by(u64::cast_from(uuids.len()));
1128
1129            let mut inverse: BTreeMap<ComputeInstanceId, BTreeSet<Uuid>> = Default::default();
1130            for (uuid, compute_instance) in &uuids {
1131                inverse.entry(*compute_instance).or_default().insert(*uuid);
1132            }
1133            for (compute_instance, uuids) in inverse {
1134                // It's possible that this compute instance no longer exists because it was dropped
1135                // while the peek was in progress. In this case we ignore the error and move on
1136                // because the dataflow no longer exists.
1137                // TODO(jkosh44) Dropping a cluster should actively cancel all pending queries.
1138                for uuid in uuids {
1139                    let _ = self.controller.compute.cancel_peek(
1140                        compute_instance,
1141                        uuid,
1142                        PeekResponse::Canceled,
1143                    );
1144                }
1145            }
1146
1147            let peeks = uuids
1148                .iter()
1149                .filter_map(|(uuid, _)| self.pending_peeks.remove(uuid))
1150                .collect::<Vec<_>>();
1151            for peek in peeks {
1152                self.retire_execution(
1153                    StatementEndedExecutionReason::Canceled,
1154                    peek.ctx_extra.defuse(),
1155                );
1156            }
1157        }
1158    }
1159
1160    /// Handle a peek notification and retire the corresponding execution. Does nothing for
1161    /// already-removed peeks.
1162    pub(crate) fn handle_peek_notification(
1163        &mut self,
1164        uuid: Uuid,
1165        notification: PeekNotification,
1166        otel_ctx: OpenTelemetryContext,
1167    ) {
1168        // We expect exactly one peek response, which we forward. Then we clean up the
1169        // peek's state in the coordinator.
1170        if let Some(PendingPeek {
1171            conn_id: _,
1172            cluster_id: _,
1173            depends_on: _,
1174            ctx_extra,
1175            is_fast_path,
1176        }) = self.remove_pending_peek(&uuid)
1177        {
1178            let reason = match notification {
1179                PeekNotification::Success {
1180                    rows: num_rows,
1181                    result_size,
1182                } => {
1183                    let strategy = if is_fast_path {
1184                        StatementExecutionStrategy::FastPath
1185                    } else {
1186                        StatementExecutionStrategy::Standard
1187                    };
1188                    StatementEndedExecutionReason::Success {
1189                        result_size: Some(result_size),
1190                        rows_returned: Some(num_rows),
1191                        execution_strategy: Some(strategy),
1192                    }
1193                }
1194                PeekNotification::Error(error) => StatementEndedExecutionReason::Errored { error },
1195                PeekNotification::Canceled => StatementEndedExecutionReason::Canceled,
1196            };
1197            otel_ctx.attach_as_parent();
1198            self.retire_execution(reason, ctx_extra.defuse());
1199        }
1200        // Cancellation may cause us to receive responses for peeks no
1201        // longer in `self.pending_peeks`, so we quietly ignore them.
1202    }
1203
1204    /// Clean up a peek's state.
1205    pub(crate) fn remove_pending_peek(&mut self, uuid: &Uuid) -> Option<PendingPeek> {
1206        let pending_peek = self.pending_peeks.remove(uuid);
1207        if let Some(pending_peek) = &pending_peek {
1208            let uuids = self
1209                .client_pending_peeks
1210                .get_mut(&pending_peek.conn_id)
1211                .expect("coord peek state is inconsistent");
1212            uuids.remove(uuid);
1213            if uuids.is_empty() {
1214                self.client_pending_peeks.remove(&pending_peek.conn_id);
1215            }
1216        }
1217        pending_peek
1218    }
1219
1220    /// Implements a slow-path peek by creating a transient dataflow.
1221    /// This is called from the command handler for ExecuteSlowPathPeek.
1222    ///
1223    /// (For now, this method simply delegates to implement_peek_plan by constructing
1224    /// the necessary PlannedPeek structure.)
1225    pub(crate) async fn implement_slow_path_peek(
1226        &mut self,
1227        dataflow_plan: PeekDataflowPlan<mz_repr::Timestamp>,
1228        determination: TimestampDetermination<mz_repr::Timestamp>,
1229        finishing: RowSetFinishing,
1230        compute_instance: ComputeInstanceId,
1231        target_replica: Option<ReplicaId>,
1232        intermediate_result_type: SqlRelationType,
1233        source_ids: BTreeSet<GlobalId>,
1234        conn_id: ConnectionId,
1235        max_result_size: u64,
1236        max_query_result_size: Option<u64>,
1237        watch_set: Option<WatchSetCreation>,
1238    ) -> Result<ExecuteResponse, AdapterError> {
1239        // Install watch sets for statement lifecycle logging if enabled.
1240        // This must happen _before_ creating ExecuteContextExtra, so that if it fails,
1241        // we don't have an ExecuteContextExtra that needs to be retired (the frontend
1242        // will handle logging for the error case).
1243        let statement_logging_id = watch_set.as_ref().map(|ws| ws.logging_id);
1244        if let Some(ws) = watch_set {
1245            self.install_peek_watch_sets(conn_id.clone(), ws)
1246                .map_err(|e| {
1247                    AdapterError::concurrent_dependency_drop_from_watch_set_install_error(e)
1248                })?;
1249        }
1250
1251        let source_arity = intermediate_result_type.arity();
1252
1253        let planned_peek = PlannedPeek {
1254            plan: PeekPlan::SlowPath(dataflow_plan),
1255            determination,
1256            conn_id,
1257            intermediate_result_type,
1258            source_arity,
1259            source_ids,
1260        };
1261
1262        // Call the old peek sequencing's implement_peek_plan for now.
1263        // TODO(peek-seq): After the old peek sequencing is completely removed, we should merge the
1264        // relevant parts of the old `implement_peek_plan` into this method, and remove the old
1265        // `implement_peek_plan`.
1266        self.implement_peek_plan(
1267            &mut ExecuteContextGuard::new(statement_logging_id, self.internal_cmd_tx.clone()),
1268            planned_peek,
1269            finishing,
1270            compute_instance,
1271            target_replica,
1272            max_result_size,
1273            max_query_result_size,
1274        )
1275        .await
1276    }
1277
    /// Implements a `COPY TO` command by installing peek watch sets,
    /// shipping the dataflow, and spawning a background task to wait for completion.
    /// This is called from the command handler for ExecuteCopyTo.
    ///
    /// (The S3 preflight check must be completed successfully via the
    /// `CopyToPreflight` command _before_ calling this method. The preflight is
    /// handled separately to avoid blocking the coordinator's main task with
    /// slow S3 network operations.)
    ///
    /// This method does NOT block waiting for completion. Instead, it spawns a background task that
    /// will send the response through the provided tx channel when the COPY TO completes.
    /// All errors (setup or execution) are sent through tx.
    pub(crate) async fn implement_copy_to(
        &mut self,
        df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        source_ids: BTreeSet<GlobalId>,
        conn_id: ConnectionId,
        watch_set: Option<WatchSetCreation>,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    ) {
        // Helper to send error and return early. The send result is ignored:
        // a dropped receiver means the client already went away.
        let send_err = |tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
                        e: AdapterError| {
            let _ = tx.send(Err(e));
        };

        // Install watch sets for statement lifecycle logging if enabled.
        // If this fails, we just send the error back. The frontend will handle logging
        // for the error case (no ExecuteContextExtra is created here).
        if let Some(ws) = watch_set {
            if let Err(e) = self.install_peek_watch_sets(conn_id.clone(), ws) {
                let err = AdapterError::concurrent_dependency_drop_from_watch_set_install_error(e);
                send_err(tx, err);
                return;
            }
        }

        // Note: We don't create an ExecuteContextExtra here because the frontend handles
        // all statement logging for COPY TO operations.

        let sink_id = df_desc.sink_id();

        // Create and register ActiveCopyTo.
        // Note: sink_tx/sink_rx is the channel for the compute sink to notify completion.
        // This is different from the command's tx which sends the response to the client.
        let (sink_tx, sink_rx) = oneshot::channel();
        let active_copy_to = ActiveCopyTo {
            conn_id: conn_id.clone(),
            tx: sink_tx,
            cluster_id: compute_instance,
            depends_on: source_ids,
        };

        // Add metadata for the new COPY TO. CopyTo returns a `ready` future, so it is safe to drop.
        // NOTE: registration must happen _before_ shipping the dataflow below,
        // so the sink is known to the coordinator by the time it can complete.
        drop(
            self.add_active_compute_sink(sink_id, ActiveComputeSink::CopyTo(active_copy_to))
                .await,
        );

        // Try to ship the dataflow. We handle errors gracefully because dependencies might have
        // disappeared during sequencing.
        if let Err(e) = self
            .try_ship_dataflow(df_desc, compute_instance, target_replica)
            .await
            .map_err(AdapterError::concurrent_dependency_drop_from_dataflow_creation_error)
        {
            // Clean up the active compute sink that was added above, since the dataflow was never
            // created. If we don't do this, the sink_id remains in drop_sinks but no collection
            // exists in the compute controller, causing a panic when the connection terminates.
            self.remove_active_compute_sink(sink_id).await;
            send_err(tx, e);
            return;
        }

        // Spawn background task to wait for completion.
        // We must NOT await sink_rx here directly, as that would block the coordinator's main task
        // from processing the completion message. Instead, we spawn a background task that will
        // send the result through tx when the COPY TO completes.
        let span = Span::current();
        task::spawn(
            || "copy to completion",
            async move {
                let res = sink_rx.await;
                let result = match res {
                    Ok(res) => res,
                    // The sink-side sender was dropped without reporting a
                    // result; surface that as an internal error.
                    Err(_) => Err(AdapterError::Internal("copy to sender dropped".into())),
                };

                let _ = tx.send(result);
            }
            .instrument(span),
        );
    }
1373
1374    /// Constructs an [`ExecuteResponse`] that that will send some rows to the
1375    /// client immediately, as opposed to asking the dataflow layer to send along
1376    /// the rows after some computation.
1377    pub(crate) fn send_immediate_rows<I>(rows: I) -> ExecuteResponse
1378    where
1379        I: IntoRowIterator,
1380        I::Iter: Send + Sync + 'static,
1381    {
1382        let rows = Box::new(rows.into_row_iter());
1383        ExecuteResponse::SendingRowsImmediate { rows }
1384    }
1385}
1386
#[cfg(test)]
mod tests {
    use mz_expr::func::IsNull;
    use mz_expr::{MapFilterProject, UnaryFunc};
    use mz_ore::str::Indent;
    use mz_repr::explain::text::text_string_at;
    use mz_repr::explain::{DummyHumanizer, ExplainConfig, PlanRenderingContext};
    use mz_repr::{Datum, SqlColumnType, SqlScalarType};

    use super::*;

    /// Checks the `EXPLAIN`-style text rendering of `FastPathPlan` variants:
    /// a constant error, an index peek without a key lookup (full scan), an
    /// index peek with a literal key lookup, and constant row sets both below
    /// and above the size at which output switches to a truncated
    /// `first_rows` summary.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
    fn test_fast_path_plan_as_text() {
        // Single-column, non-nullable string relation type shared by all cases.
        let typ = SqlRelationType::new(vec![SqlColumnType {
            scalar_type: SqlScalarType::String,
            nullable: false,
        }]);
        // Case 1: a constant plan that evaluates to an error.
        let constant_err = FastPathPlan::Constant(Err(EvalError::DivisionByZero), typ.clone());
        // Case 2: peek of an existing index with no key lookup (full scan),
        // with a map and a projection applied on top.
        let no_lookup = FastPathPlan::PeekExisting(
            GlobalId::User(8),
            GlobalId::User(10),
            None,
            MapFilterProject::new(4)
                .map(Some(MirScalarExpr::column(0).or(MirScalarExpr::column(2))))
                .project([1, 4])
                .into_plan()
                .expect("invalid plan")
                .into_nontemporal()
                .expect("invalid nontemporal"),
        );
        // Case 3: peek of an existing index constrained to a literal key value,
        // with a filter applied on top.
        let lookup = FastPathPlan::PeekExisting(
            GlobalId::User(9),
            GlobalId::User(11),
            Some(vec![Row::pack(Some(Datum::Int32(5)))]),
            MapFilterProject::new(3)
                .filter(Some(
                    MirScalarExpr::column(0).call_unary(UnaryFunc::IsNull(IsNull)),
                ))
                .into_plan()
                .expect("invalid plan")
                .into_nontemporal()
                .expect("invalid nontemporal"),
        );

        // With `DummyHumanizer` (no catalog names available), index names render
        // as "[DELETED INDEX]" in the expected output below.
        let humanizer = DummyHumanizer;
        let config = ExplainConfig {
            redacted: false,
            verbose_syntax: true,
            ..Default::default()
        };
        // A generator closure so each `text_string_at` call below gets a fresh
        // rendering context (indent state, annotations).
        let ctx_gen = || {
            let indent = Indent::default();
            let annotations = BTreeMap::new();
            PlanRenderingContext::<FastPathPlan>::new(
                indent,
                &humanizer,
                annotations,
                &config,
                BTreeSet::default(),
            )
        };

        let constant_err_exp = "Error \"division by zero\"\n";
        let no_lookup_exp = "Project (#1, #4)\n  Map ((#0 OR #2))\n    ReadIndex on=u8 [DELETED INDEX]=[*** full scan ***]\n";
        let lookup_exp =
            "Filter (#0) IS NULL\n  ReadIndex on=u9 [DELETED INDEX]=[lookup value=(5)]\n";

        assert_eq!(text_string_at(&constant_err, ctx_gen), constant_err_exp);
        assert_eq!(text_string_at(&no_lookup, ctx_gen), no_lookup_exp);
        assert_eq!(text_string_at(&lookup, ctx_gen), lookup_exp);

        // Case 4: a small constant row set (3 rows) is printed in full, with
        // multiplicities > 1 rendered as "(row) x diff".
        let mut constant_rows = vec![
            (Row::pack(Some(Datum::String("hello"))), Diff::ONE),
            (Row::pack(Some(Datum::String("world"))), 2.into()),
            (Row::pack(Some(Datum::String("star"))), 500.into()),
        ];
        let constant_exp1 =
            "Constant\n  - (\"hello\")\n  - ((\"world\") x 2)\n  - ((\"star\") x 500)\n";
        assert_eq!(
            text_string_at(
                &FastPathPlan::Constant(Ok(constant_rows.clone()), typ.clone()),
                ctx_gen
            ),
            constant_exp1
        );
        // Case 5: growing the set by 20 more rows pushes it past the rendering
        // cutoff, so the output switches to a total count of absolute diffs
        // (1 + 2 + 500 + 20 = 523) followed by only the first rows.
        constant_rows
            .extend((0..20).map(|i| (Row::pack(Some(Datum::String(&i.to_string()))), Diff::ONE)));
        let constant_exp2 = "Constant\n  total_rows (diffs absed): 523\n  first_rows:\n    - (\"hello\")\
        \n    - ((\"world\") x 2)\n    - ((\"star\") x 500)\n    - (\"0\")\n    - (\"1\")\
        \n    - (\"2\")\n    - (\"3\")\n    - (\"4\")\n    - (\"5\")\n    - (\"6\")\
        \n    - (\"7\")\n    - (\"8\")\n    - (\"9\")\n    - (\"10\")\n    - (\"11\")\
        \n    - (\"12\")\n    - (\"13\")\n    - (\"14\")\n    - (\"15\")\n    - (\"16\")\n";
        assert_eq!(
            text_string_at(&FastPathPlan::Constant(Ok(constant_rows), typ), ctx_gen),
            constant_exp2
        );
    }
}