Skip to main content

mz_adapter/coord/
peek.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic and types for creating, executing, and tracking peeks.
11//!
12//! This module determines if a dataflow can be short-cut, by returning constant values
13//! or by reading out of existing arrangements, and implements the appropriate plan.
14
15use std::collections::{BTreeMap, BTreeSet};
16use std::fmt;
17use std::num::NonZeroUsize;
18use std::ops::Deref;
19use std::sync::Arc;
20
21use differential_dataflow::consolidation::consolidate;
22use itertools::Itertools;
23use mz_adapter_types::compaction::CompactionWindow;
24use mz_adapter_types::connection::ConnectionId;
25use mz_cluster_client::ReplicaId;
26use mz_compute_client::controller::PeekNotification;
27use mz_compute_client::protocol::command::PeekTarget;
28use mz_compute_client::protocol::response::PeekResponse;
29use mz_compute_types::ComputeInstanceId;
30use mz_compute_types::dataflows::{DataflowDescription, IndexImport};
31use mz_controller_types::ClusterId;
32use mz_expr::explain::{HumanizedExplain, HumanizerMode, fmt_text_constant_rows};
33use mz_expr::row::RowCollection;
34use mz_expr::{
35    EvalError, Id, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing,
36    RowSetFinishingIncremental, permutation_for_arrangement,
37};
38use mz_ore::cast::CastFrom;
39use mz_ore::str::{StrExt, separated};
40use mz_ore::task;
41use mz_ore::tracing::OpenTelemetryContext;
42use mz_persist_client::Schemas;
43use mz_persist_types::codec_impls::UnitSchema;
44use mz_repr::explain::text::DisplayText;
45use mz_repr::explain::{CompactScalars, IndexUsageType, PlanRenderingContext, UsedIndexes};
46use mz_repr::{
47    Diff, GlobalId, IntoRowIterator, RelationDesc, Row, RowIterator, SqlRelationType,
48    preserves_order,
49};
50use mz_storage_types::sources::SourceData;
51use serde::{Deserialize, Serialize};
52use timely::progress::{Antichain, Timestamp};
53use tokio::sync::oneshot;
54use tracing::{Instrument, Span};
55use uuid::Uuid;
56
57use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
58use crate::coord::timestamp_selection::TimestampDetermination;
59use crate::optimize::OptimizerError;
60use crate::statement_logging::WatchSetCreation;
61use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
62use crate::{AdapterError, ExecuteContextGuard, ExecuteResponse};
63
/// A peek is a request to read data from a maintained arrangement.
#[derive(Debug)]
pub(crate) struct PendingPeek {
    /// The connection that initiated the peek.
    pub(crate) conn_id: ConnectionId,
    /// The cluster that the peek is being executed on.
    pub(crate) cluster_id: ClusterId,
    /// All `GlobalId`s that the peek depend on.
    pub(crate) depends_on: BTreeSet<GlobalId>,
    /// Context about the execute that produced this peek,
    /// needed by the coordinator for retiring it.
    pub(crate) ctx_extra: ExecuteContextGuard,
    /// Is this a fast-path peek, i.e. one that doesn't require a dataflow?
    pub(crate) is_fast_path: bool,
}
79
/// The response from a `Peek`, with row multiplicities represented in unary.
///
/// Note that each `Peek` expects to generate exactly one `PeekResponse`, i.e.
/// we expect a 1:1 contract between `Peek` and `PeekResponseUnary`.
#[derive(Debug)]
pub enum PeekResponseUnary {
    /// The peek succeeded; the iterator yields the result rows
    /// (multiplicities already expanded, i.e. in unary).
    Rows(Box<dyn RowIterator + Send + Sync>),
    /// The peek failed, with a human-readable error message.
    Error(String),
    /// The peek was canceled before a result was produced.
    Canceled,
}
90
/// A plan for a peek that must be implemented by installing a (transient)
/// dataflow and reading from it (the "slow path"; see [PeekPlan::SlowPath]).
#[derive(Clone, Debug)]
pub struct PeekDataflowPlan<T = mz_repr::Timestamp> {
    /// The dataflow to install in order to produce the results.
    pub(crate) desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
    /// The `GlobalId` identifying the object within the dataflow to peek.
    pub(crate) id: GlobalId,
    /// The key by which the peeked output is arranged
    /// (built from the relation type's default key; see `new`).
    key: Vec<MirScalarExpr>,
    /// Permutation mapping arranged (key, value) columns back to the
    /// original column order (see `permutation_for_arrangement`).
    permutation: Vec<usize>,
    /// The number of columns in the "thinned" (non-key) part of the
    /// arrangement's values.
    thinned_arity: usize,
}
99
100impl<T> PeekDataflowPlan<T> {
101    pub fn new(
102        desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
103        id: GlobalId,
104        typ: &SqlRelationType,
105    ) -> Self {
106        let arity = typ.arity();
107        let key = typ
108            .default_key()
109            .into_iter()
110            .map(MirScalarExpr::column)
111            .collect::<Vec<_>>();
112        let (permutation, thinning) = permutation_for_arrangement(&key, arity);
113        Self {
114            desc,
115            id,
116            key,
117            permutation,
118            thinned_arity: thinning.len(),
119        }
120    }
121}
122
/// A peek plan that can be executed without installing a dataflow
/// (the "fast path"; see [PeekPlan::FastPath]).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd)]
pub enum FastPathPlan {
    /// The view evaluates to a constant result that can be returned.
    ///
    /// The [SqlRelationType] is unnecessary for evaluating the constant result but
    /// may be helpful when printing out an explanation.
    Constant(Result<Vec<(Row, Diff)>, EvalError>, SqlRelationType),
    /// The view can be read out of an existing arrangement.
    /// (coll_id, idx_id, values to look up, mfp to apply)
    PeekExisting(GlobalId, GlobalId, Option<Vec<Row>>, mz_expr::SafeMfpPlan),
    /// The view can be read directly out of Persist.
    /// (coll_id, optional literal key prefix to look up, mfp to apply)
    PeekPersist(GlobalId, Option<Row>, mz_expr::SafeMfpPlan),
}
136
137impl<'a, T: 'a> DisplayText<PlanRenderingContext<'a, T>> for FastPathPlan {
138    fn fmt_text(
139        &self,
140        f: &mut fmt::Formatter<'_>,
141        ctx: &mut PlanRenderingContext<'a, T>,
142    ) -> fmt::Result {
143        if ctx.config.verbose_syntax {
144            self.fmt_verbose_text(f, ctx)
145        } else {
146            self.fmt_default_text(f, ctx)
147        }
148    }
149}
150
impl FastPathPlan {
    /// Renders the plan in the default (concise) `EXPLAIN` syntax.
    ///
    /// Output lines are written top-down with `ctx.indent` tracking the
    /// current indentation; `set`/`reset` bracket each arm so indentation
    /// changes don't leak into subsequent rendering.
    pub fn fmt_default_text<'a, T>(
        &self,
        f: &mut fmt::Formatter<'_>,
        ctx: &mut PlanRenderingContext<'a, T>,
    ) -> fmt::Result {
        let mode = HumanizedExplain::new(ctx.config.redacted);

        match self {
            FastPathPlan::Constant(rows, _) => {
                write!(f, "{}→Constant ", ctx.indent)?;

                match rows {
                    Ok(rows) => writeln!(f, "({} rows)", rows.len())?,
                    Err(err) => {
                        if mode.redacted() {
                            // Don't leak error contents in redacted mode.
                            writeln!(f, "(error: █)")?;
                        } else {
                            writeln!(f, "(error: {})", err.to_string().quoted(),)?;
                        }
                    }
                }
            }
            FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
                // Prefer human-readable names; fall back to raw IDs.
                let coll = ctx
                    .humanizer
                    .humanize_id(*coll_id)
                    .unwrap_or_else(|| coll_id.to_string());
                let idx = ctx
                    .humanizer
                    .humanize_id(*idx_id)
                    .unwrap_or_else(|| idx_id.to_string());
                writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
                ctx.indent.set();

                ctx.indent += 1;

                mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
                // If the MFP printed anything, indent the input node beneath it.
                let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();

                if printed {
                    ctx.indent += 1;
                }
                if let Some(literal_constraints) = literal_constraints {
                    writeln!(f, "{}→Index Lookup on {coll} (using {idx})", ctx.indent)?;
                    ctx.indent += 1;
                    let values = separated("; ", mode.seq(literal_constraints, None));
                    writeln!(f, "{}Lookup values: {values}", ctx.indent)?;
                } else {
                    writeln!(f, "{}→Indexed {coll} (using {idx})", ctx.indent)?;
                }

                ctx.indent.reset();
            }
            FastPathPlan::PeekPersist(global_id, literal_constraint, mfp) => {
                let coll = ctx
                    .humanizer
                    .humanize_id(*global_id)
                    .unwrap_or_else(|| global_id.to_string());
                writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
                ctx.indent.set();

                ctx.indent += 1;

                mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
                // If the MFP printed anything, indent the input node beneath it.
                let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();

                if printed {
                    ctx.indent += 1;
                }
                if let Some(literal_constraint) = literal_constraint {
                    writeln!(f, "{}→Index Lookup on {coll} (from storage)", ctx.indent)?;
                    ctx.indent += 1;
                    let value = mode.expr(literal_constraint, None);
                    writeln!(f, "{}Lookup value: {value}", ctx.indent)?;
                } else {
                    writeln!(f, "{}→Indexed {coll} (from storage)", ctx.indent)?;
                }

                ctx.indent.reset();
            }
        }

        Ok(())
    }

    /// Renders the plan in the verbose `EXPLAIN` syntax, printing the MFP
    /// as separate Project/Filter/Map nodes above the underlying access.
    pub fn fmt_verbose_text<'a, T>(
        &self,
        f: &mut fmt::Formatter<'_>,
        ctx: &mut PlanRenderingContext<'a, T>,
    ) -> fmt::Result {
        let redacted = ctx.config.redacted;
        let mode = HumanizedExplain::new(redacted);

        // TODO(aalexandrov): factor out common PeekExisting and PeekPersist
        // code.
        match self {
            FastPathPlan::Constant(Ok(rows), _) => {
                if !rows.is_empty() {
                    writeln!(f, "{}Constant", ctx.indent)?;
                    *ctx.as_mut() += 1;
                    fmt_text_constant_rows(
                        f,
                        rows.iter().map(|(row, diff)| (row, diff)),
                        ctx.as_mut(),
                        redacted,
                    )?;
                    *ctx.as_mut() -= 1;
                } else {
                    writeln!(f, "{}Constant <empty>", ctx.as_mut())?;
                }
                Ok(())
            }
            FastPathPlan::Constant(Err(err), _) => {
                if redacted {
                    // Don't leak error contents in redacted mode.
                    writeln!(f, "{}Error █", ctx.as_mut())
                } else {
                    writeln!(f, "{}Error {}", ctx.as_mut(), err.to_string().escaped())
                }
            }
            FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
                ctx.as_mut().set();
                let (map, filter, project) = mfp.as_map_filter_project();

                // Optionally resolve column names for humanized expressions;
                // mapped columns get empty-string placeholders.
                let cols = if !ctx.config.humanized_exprs {
                    None
                } else if let Some(cols) = ctx.humanizer.column_names_for_id(*idx_id) {
                    // FIXME: account for thinning and permutation
                    // See mz_expr::permutation_for_arrangement
                    // See permute_oneshot_mfp_around_index
                    let cols = itertools::chain(
                        cols.iter().cloned(),
                        std::iter::repeat(String::new()).take(map.len()),
                    )
                    .collect();
                    Some(cols)
                } else {
                    None
                };

                // Only print a Project node when it is not the identity
                // projection over input + mapped columns.
                if project.len() != mfp.input_arity + map.len()
                    || !project.iter().enumerate().all(|(i, o)| i == *o)
                {
                    let outputs = mode.seq(&project, cols.as_ref());
                    let outputs = CompactScalars(outputs);
                    writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
                    *ctx.as_mut() += 1;
                }
                if !filter.is_empty() {
                    let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
                    writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
                    *ctx.as_mut() += 1;
                }
                if !map.is_empty() {
                    let scalars = mode.seq(&map, cols.as_ref());
                    let scalars = CompactScalars(scalars);
                    writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
                    *ctx.as_mut() += 1;
                }
                MirRelationExpr::fmt_indexed_filter(
                    f,
                    ctx,
                    coll_id,
                    idx_id,
                    literal_constraints.clone(),
                    None,
                )?;
                writeln!(f)?;
                ctx.as_mut().reset();
                Ok(())
            }
            FastPathPlan::PeekPersist(gid, literal_constraint, mfp) => {
                ctx.as_mut().set();
                let (map, filter, project) = mfp.as_map_filter_project();

                // Optionally resolve column names for humanized expressions;
                // mapped columns get empty-string placeholders.
                let cols = if !ctx.config.humanized_exprs {
                    None
                } else if let Some(cols) = ctx.humanizer.column_names_for_id(*gid) {
                    let cols = itertools::chain(
                        cols.iter().cloned(),
                        std::iter::repeat(String::new()).take(map.len()),
                    )
                    .collect::<Vec<_>>();
                    Some(cols)
                } else {
                    None
                };

                // Only print a Project node when it is not the identity
                // projection over input + mapped columns.
                if project.len() != mfp.input_arity + map.len()
                    || !project.iter().enumerate().all(|(i, o)| i == *o)
                {
                    let outputs = mode.seq(&project, cols.as_ref());
                    let outputs = CompactScalars(outputs);
                    writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
                    *ctx.as_mut() += 1;
                }
                if !filter.is_empty() {
                    let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
                    writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
                    *ctx.as_mut() += 1;
                }
                if !map.is_empty() {
                    let scalars = mode.seq(&map, cols.as_ref());
                    let scalars = CompactScalars(scalars);
                    writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
                    *ctx.as_mut() += 1;
                }
                let human_id = ctx
                    .humanizer
                    .humanize_id(*gid)
                    .unwrap_or_else(|| gid.to_string());
                write!(f, "{}PeekPersist {human_id}", ctx.as_mut())?;
                if let Some(literal) = literal_constraint {
                    let value = mode.expr(literal, None);
                    writeln!(f, " [value={}]", value)?;
                } else {
                    writeln!(f, "")?;
                }
                ctx.as_mut().reset();
                Ok(())
            }
        }?;
        Ok(())
    }
}
376
/// Everything needed to execute a peek that has already been planned.
#[derive(Debug)]
pub struct PlannedPeek {
    /// How to produce the results (fast path or dataflow; see [PeekPlan]).
    pub plan: PeekPlan,
    /// The timestamp at which to read (plus related metadata).
    pub determination: TimestampDetermination<mz_repr::Timestamp>,
    /// The connection that issued the peek.
    pub conn_id: ConnectionId,
    /// The result type _after_ reading out of the "source" and applying any
    /// [MapFilterProject](mz_expr::MapFilterProject), but _before_ applying a
    /// [RowSetFinishing].
    ///
    /// This is _the_ `result_type` as far as compute is concerned and further
    /// changes through projections happen purely in the adapter.
    pub intermediate_result_type: SqlRelationType,
    /// Number of columns of the peek's source relation.
    /// NOTE(review): presumably the arity _before_ the MFP is applied —
    /// confirm against the call sites.
    pub source_arity: usize,
    /// `GlobalId`s of the objects the peek reads from.
    pub source_ids: BTreeSet<GlobalId>,
}
392
/// Possible ways in which the coordinator could produce the result for a goal view.
#[derive(Clone, Debug)]
pub enum PeekPlan<T = mz_repr::Timestamp> {
    /// The results can be produced without installing a dataflow: a constant,
    /// a read from an existing arrangement, or a direct Persist read.
    FastPath(FastPathPlan),
    /// The view must be installed as a dataflow and then read.
    SlowPath(PeekDataflowPlan<T>),
}
400
401/// Convert `mfp` to an executable, non-temporal plan.
402/// It should be non-temporal, as OneShot preparation populates `mz_now`.
403///
404/// If the `mfp` can't be converted into a non-temporal plan, this returns an _internal_ error.
405fn mfp_to_safe_plan(
406    mfp: mz_expr::MapFilterProject,
407) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
408    mfp.into_plan()
409        .map_err(OptimizerError::InternalUnsafeMfpPlan)?
410        .into_nontemporal()
411        .map_err(|e| OptimizerError::InternalUnsafeMfpPlan(format!("{:?}", e)))
412}
413
414/// If it can't convert `mfp` into a `SafeMfpPlan`, this returns an _internal_ error.
415fn permute_oneshot_mfp_around_index(
416    mfp: mz_expr::MapFilterProject,
417    key: &[MirScalarExpr],
418) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
419    let input_arity = mfp.input_arity;
420    let mut safe_mfp = mfp_to_safe_plan(mfp)?;
421    let (permute, thinning) = permutation_for_arrangement(key, input_arity);
422    safe_mfp.permute_fn(|c| permute[c], key.len() + thinning.len());
423    Ok(safe_mfp)
424}
425
/// Determine if the dataflow plan can be implemented without an actual dataflow.
///
/// If the optimized plan is a `Constant` or a `Get` of a maintained arrangement,
/// we can avoid building a dataflow (and either just return the results, or peek
/// out of the arrangement, respectively).
///
/// `persist_fast_path_limit` bounds how many rows (limit + offset) a direct
/// Persist read may be asked to return; `persist_fast_path_order` enables
/// Persist peeks that rely on the shard's key order (literal key lookups and
/// ordered limits).
///
/// Returns `Ok(None)` when no fast path applies and a dataflow must be built.
pub fn create_fast_path_plan<T: Timestamp>(
    dataflow_plan: &mut DataflowDescription<OptimizedMirRelationExpr, (), T>,
    view_id: GlobalId,
    finishing: Option<&RowSetFinishing>,
    persist_fast_path_limit: usize,
    persist_fast_path_order: bool,
) -> Result<Option<FastPathPlan>, OptimizerError> {
    // At this point, `dataflow_plan` contains our best optimized dataflow.
    // We will check the plan to see if there is a fast path to escape full dataflow construction.

    // We need to restrict ourselves to settings where the inserted transient view is the first thing
    // to build (no dependent views). There is likely an index to build as well, but we may not be sure.
    if dataflow_plan.objects_to_build.len() >= 1 && dataflow_plan.objects_to_build[0].id == view_id
    {
        let mut mir = &*dataflow_plan.objects_to_build[0].plan.as_inner_mut();
        if let Some((rows, found_typ)) = mir.as_const() {
            // In the case of a constant, we can return the result now.
            let plan = FastPathPlan::Constant(rows.clone(), found_typ.clone());
            return Ok(Some(plan));
        } else {
            // If there is a TopK that would be completely covered by the finishing, then jump
            // through the TopK.
            if let MirRelationExpr::TopK {
                input,
                group_key,
                order_key,
                limit,
                offset,
                monotonic: _,
                expected_group_size: _,
            } = mir
            {
                if let Some(finishing) = finishing {
                    if group_key.is_empty() && *order_key == finishing.order_by && *offset == 0 {
                        // The following is roughly `limit >= finishing.limit`, but with Options.
                        let finishing_limits_at_least_as_topk = match (limit, finishing.limit) {
                            (None, _) => true,
                            (Some(..), None) => false,
                            (Some(topk_limit), Some(finishing_limit)) => {
                                // Only a literal TopK limit can be compared statically.
                                if let Some(l) = topk_limit.as_literal_int64() {
                                    l >= *finishing_limit
                                } else {
                                    false
                                }
                            }
                        };
                        if finishing_limits_at_least_as_topk {
                            mir = input;
                        }
                    }
                }
            }
            // In the case of a linear operator around an indexed view, we
            // can skip creating a dataflow and instead pull all the rows in
            // index and apply the linear operator against them.
            let (mfp, mir) = mz_expr::MapFilterProject::extract_from_expression(mir);
            match mir {
                MirRelationExpr::Get {
                    id: Id::Global(get_id),
                    typ: relation_typ,
                    ..
                } => {
                    // Just grab any arrangement if an arrangement exists
                    for (index_id, IndexImport { desc, .. }) in dataflow_plan.index_imports.iter() {
                        if desc.on_id == *get_id {
                            return Ok(Some(FastPathPlan::PeekExisting(
                                *get_id,
                                *index_id,
                                None,
                                permute_oneshot_mfp_around_index(mfp, &desc.key)?,
                            )));
                        }
                    }

                    // If there is no arrangement, consider peeking the persist shard directly.
                    // Generally, we consider a persist peek when the query can definitely be satisfied
                    // by scanning through a small, constant number of Persist key-values.
                    let safe_mfp = mfp_to_safe_plan(mfp)?;
                    let (_maps, filters, projection) = safe_mfp.as_map_filter_project();

                    // Pack a literal prefix: the longest run of leading
                    // columns that preserve order and are each constrained to
                    // equal a literal by the filters.
                    let literal_constraint = if persist_fast_path_order {
                        let mut row = Row::default();
                        let mut packer = row.packer();
                        for (idx, col) in relation_typ.column_types.iter().enumerate() {
                            if !preserves_order(&col.scalar_type) {
                                break;
                            }
                            let col_expr = MirScalarExpr::column(idx);

                            let Some((literal, _)) = filters
                                .iter()
                                .filter_map(|f| f.expr_eq_literal(&col_expr))
                                .next()
                            else {
                                break;
                            };
                            packer.extend_by_row(&literal);
                        }
                        if row.is_empty() { None } else { Some(row) }
                    } else {
                        None
                    };

                    // Can the finishing be satisfied by reading a bounded
                    // prefix of the shard? Requires a compatible (or empty)
                    // ordering and a small enough limit + offset.
                    let finish_ok = match &finishing {
                        None => false,
                        Some(RowSetFinishing {
                            order_by,
                            limit,
                            offset,
                            ..
                        }) => {
                            let order_ok = if persist_fast_path_order {
                                order_by.iter().enumerate().all(|(idx, order)| {
                                    // Map the ordering column back to the column in the source data.
                                    // (If it's not one of the input columns, we can't make any guarantees.)
                                    let column_idx = projection[order.column];
                                    if column_idx >= safe_mfp.input_arity {
                                        return false;
                                    }
                                    let column_type = &relation_typ.column_types[column_idx];
                                    let index_ok = idx == column_idx;
                                    let nulls_ok = !column_type.nullable || order.nulls_last;
                                    let asc_ok = !order.desc;
                                    let type_ok = preserves_order(&column_type.scalar_type);
                                    index_ok && nulls_ok && asc_ok && type_ok
                                })
                            } else {
                                order_by.is_empty()
                            };
                            let limit_ok = limit.map_or(false, |l| {
                                usize::cast_from(l) + *offset < persist_fast_path_limit
                            });
                            order_ok && limit_ok
                        }
                    };

                    // Does the literal prefix cover some unique key of the
                    // relation? If so, the lookup returns at most one key's
                    // worth of values.
                    let key_constraint = if let Some(literal) = &literal_constraint {
                        let prefix_len = literal.iter().count();
                        relation_typ
                            .keys
                            .iter()
                            .any(|k| k.iter().all(|idx| *idx < prefix_len))
                    } else {
                        false
                    };

                    // We can generate a persist peek when:
                    // - We have a literal constraint that includes an entire key (so we'll return at most one value)
                    // - We can return the first N key values (no filters, small limit, consistent order)
                    if key_constraint || (filters.is_empty() && finish_ok) {
                        return Ok(Some(FastPathPlan::PeekPersist(
                            *get_id,
                            literal_constraint,
                            safe_mfp,
                        )));
                    }
                }
                MirRelationExpr::Join { implementation, .. } => {
                    // An IndexedFilter join is a literal lookup against an
                    // existing index.
                    if let mz_expr::JoinImplementation::IndexedFilter(coll_id, idx_id, key, vals) =
                        implementation
                    {
                        return Ok(Some(FastPathPlan::PeekExisting(
                            *coll_id,
                            *idx_id,
                            Some(vals.clone()),
                            permute_oneshot_mfp_around_index(mfp, key)?,
                        )));
                    }
                }
                // nothing can be done for non-trivial expressions.
                _ => {}
            }
        }
    }
    Ok(None)
}
607
608impl FastPathPlan {
609    pub fn used_indexes(&self, finishing: Option<&RowSetFinishing>) -> UsedIndexes {
610        match self {
611            FastPathPlan::Constant(..) => UsedIndexes::default(),
612            FastPathPlan::PeekExisting(_coll_id, idx_id, literal_constraints, _mfp) => {
613                if literal_constraints.is_some() {
614                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::Lookup(*idx_id)])].into())
615                } else if finishing.map_or(false, |f| f.limit.is_some() && f.order_by.is_empty()) {
616                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::FastPathLimit])].into())
617                } else {
618                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::FullScan])].into())
619                }
620            }
621            FastPathPlan::PeekPersist(..) => UsedIndexes::default(),
622        }
623    }
624}
625
626impl crate::coord::Coordinator {
627    /// Implements a peek plan produced by `create_plan` above.
628    #[mz_ore::instrument(level = "debug")]
629    pub async fn implement_peek_plan(
630        &mut self,
631        ctx_extra: &mut ExecuteContextGuard,
632        plan: PlannedPeek,
633        finishing: RowSetFinishing,
634        compute_instance: ComputeInstanceId,
635        target_replica: Option<ReplicaId>,
636        max_result_size: u64,
637        max_returned_query_size: Option<u64>,
638    ) -> Result<ExecuteResponse, AdapterError> {
639        let PlannedPeek {
640            plan: fast_path,
641            determination,
642            conn_id,
643            intermediate_result_type,
644            source_arity,
645            source_ids,
646        } = plan;
647
648        // If the dataflow optimizes to a constant expression, we can immediately return the result.
649        if let PeekPlan::FastPath(FastPathPlan::Constant(rows, _)) = fast_path {
650            let mut rows = match rows {
651                Ok(rows) => rows,
652                Err(e) => return Err(e.into()),
653            };
654            // Consolidate down the results to get correct totals.
655            consolidate(&mut rows);
656
657            let mut results = Vec::new();
658            for (row, count) in rows {
659                if count.is_negative() {
660                    Err(EvalError::InvalidParameterValue(
661                        format!("Negative multiplicity in constant result: {}", count).into(),
662                    ))?
663                };
664                if count.is_positive() {
665                    let count = usize::cast_from(
666                        u64::try_from(count.into_inner())
667                            .expect("known to be positive from check above"),
668                    );
669                    results.push((
670                        row,
671                        NonZeroUsize::new(count).expect("known to be non-zero from check above"),
672                    ));
673                }
674            }
675            let row_collection = RowCollection::new(results, &finishing.order_by);
676            let duration_histogram = self.metrics.row_set_finishing_seconds();
677
678            let (ret, reason) = match finishing.finish(
679                row_collection,
680                max_result_size,
681                max_returned_query_size,
682                &duration_histogram,
683            ) {
684                Ok((rows, row_size_bytes)) => {
685                    let result_size = u64::cast_from(row_size_bytes);
686                    let rows_returned = u64::cast_from(rows.count());
687                    (
688                        Ok(Self::send_immediate_rows(rows)),
689                        StatementEndedExecutionReason::Success {
690                            result_size: Some(result_size),
691                            rows_returned: Some(rows_returned),
692                            execution_strategy: Some(StatementExecutionStrategy::Constant),
693                        },
694                    )
695                }
696                Err(error) => (
697                    Err(AdapterError::ResultSize(error.clone())),
698                    StatementEndedExecutionReason::Errored { error },
699                ),
700            };
701            self.retire_execution(reason, std::mem::take(ctx_extra).defuse());
702            return ret;
703        }
704
705        let timestamp = determination.timestamp_context.timestamp_or_default();
706        if let Some(id) = ctx_extra.contents() {
707            self.set_statement_execution_timestamp(id, timestamp)
708        }
709
710        // The remaining cases are a peek into a maintained arrangement, or building a dataflow.
711        // In both cases we will want to peek, and the main difference is that we might want to
712        // build a dataflow and drop it once the peek is issued. The peeks are also constructed
713        // differently.
714
715        // If we must build the view, ship the dataflow.
716        let (peek_command, drop_dataflow, is_fast_path, peek_target, strategy) = match fast_path {
717            PeekPlan::FastPath(FastPathPlan::PeekExisting(
718                _coll_id,
719                idx_id,
720                literal_constraints,
721                map_filter_project,
722            )) => (
723                (literal_constraints, timestamp, map_filter_project),
724                None,
725                true,
726                PeekTarget::Index { id: idx_id },
727                StatementExecutionStrategy::FastPath,
728            ),
729            PeekPlan::FastPath(FastPathPlan::PeekPersist(
730                coll_id,
731                literal_constraint,
732                map_filter_project,
733            )) => {
734                let peek_command = (
735                    literal_constraint.map(|r| vec![r]),
736                    timestamp,
737                    map_filter_project,
738                );
739                let metadata = self
740                    .controller
741                    .storage
742                    .collection_metadata(coll_id)
743                    .expect("storage collection for fast-path peek")
744                    .clone();
745                (
746                    peek_command,
747                    None,
748                    true,
749                    PeekTarget::Persist {
750                        id: coll_id,
751                        metadata,
752                    },
753                    StatementExecutionStrategy::PersistFastPath,
754                )
755            }
756            PeekPlan::SlowPath(PeekDataflowPlan {
757                desc: dataflow,
758                // n.b. this index_id identifies a transient index the
759                // caller created, so it is guaranteed to be on
760                // `compute_instance`.
761                id: index_id,
762                key: index_key,
763                permutation: index_permutation,
764                thinned_arity: index_thinned_arity,
765            }) => {
766                let output_ids = dataflow.export_ids().collect();
767
768                // Very important: actually create the dataflow (here, so we can destructure).
769                self.controller
770                    .compute
771                    .create_dataflow(compute_instance, dataflow, None)
772                    .map_err(
773                        AdapterError::concurrent_dependency_drop_from_dataflow_creation_error,
774                    )?;
775                self.initialize_compute_read_policies(
776                    output_ids,
777                    compute_instance,
778                    // Disable compaction so that nothing can compact before the peek occurs below.
779                    CompactionWindow::DisableCompaction,
780                )
781                .await;
782
783                // Create an identity MFP operator.
784                let mut map_filter_project = mz_expr::MapFilterProject::new(source_arity);
785                map_filter_project.permute_fn(
786                    |c| index_permutation[c],
787                    index_key.len() + index_thinned_arity,
788                );
789                let map_filter_project = mfp_to_safe_plan(map_filter_project)?;
790
791                (
792                    (None, timestamp, map_filter_project),
793                    Some(index_id),
794                    false,
795                    PeekTarget::Index { id: index_id },
796                    StatementExecutionStrategy::Standard,
797                )
798            }
799            _ => {
800                unreachable!()
801            }
802        };
803
804        // Endpoints for sending and receiving peek responses.
805        let (rows_tx, rows_rx) = tokio::sync::oneshot::channel();
806
807        // Generate unique UUID. Guaranteed to be unique to all pending peeks, there's an very
808        // small but unlikely chance that it's not unique to completed peeks.
809        let mut uuid = Uuid::new_v4();
810        while self.pending_peeks.contains_key(&uuid) {
811            uuid = Uuid::new_v4();
812        }
813
814        // The peek is ready to go for both cases, fast and non-fast.
815        // Stash the response mechanism, and broadcast dataflow construction.
816        self.pending_peeks.insert(
817            uuid,
818            PendingPeek {
819                conn_id: conn_id.clone(),
820                cluster_id: compute_instance,
821                depends_on: source_ids,
822                ctx_extra: std::mem::take(ctx_extra),
823                is_fast_path,
824            },
825        );
826        self.client_pending_peeks
827            .entry(conn_id)
828            .or_default()
829            .insert(uuid, compute_instance);
830        let (literal_constraints, timestamp, map_filter_project) = peek_command;
831
832        // At this stage we don't know column names for the result because we
833        // only know the peek's result type as a bare SqlRelationType.
834        let peek_result_column_names =
835            (0..intermediate_result_type.arity()).map(|i| format!("peek_{i}"));
836        let peek_result_desc =
837            RelationDesc::new(intermediate_result_type, peek_result_column_names);
838
839        self.controller
840            .compute
841            .peek(
842                compute_instance,
843                peek_target,
844                literal_constraints,
845                uuid,
846                timestamp,
847                peek_result_desc,
848                finishing.clone(),
849                map_filter_project,
850                target_replica,
851                rows_tx,
852            )
853            .map_err(AdapterError::concurrent_dependency_drop_from_peek_error)?;
854
855        let duration_histogram = self.metrics.row_set_finishing_seconds();
856
857        // If a dataflow was created, drop it once the peek command is sent.
858        if let Some(index_id) = drop_dataflow {
859            self.remove_compute_ids_from_timeline(vec![(compute_instance, index_id)]);
860            self.drop_compute_collections(vec![(compute_instance, index_id)]);
861        }
862
863        let persist_client = self.persist_client.clone();
864        let peek_stash_read_batch_size_bytes =
865            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
866                .get(self.catalog().system_config().dyncfgs());
867        let peek_stash_read_memory_budget_bytes =
868            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
869                .get(self.catalog().system_config().dyncfgs());
870
871        let peek_response_stream = Self::create_peek_response_stream(
872            rows_rx,
873            finishing,
874            max_result_size,
875            max_returned_query_size,
876            duration_histogram,
877            persist_client,
878            peek_stash_read_batch_size_bytes,
879            peek_stash_read_memory_budget_bytes,
880        );
881
882        Ok(crate::ExecuteResponse::SendingRowsStreaming {
883            rows: Box::pin(peek_response_stream),
884            instance_id: compute_instance,
885            strategy,
886        })
887    }
888
889    /// Creates an async stream that processes peek responses and yields rows.
890    ///
891    /// TODO(peek-seq): Move this out of `coord` once we delete the old peek sequencing.
892    #[mz_ore::instrument(level = "debug")]
893    pub(crate) fn create_peek_response_stream(
894        rows_rx: tokio::sync::oneshot::Receiver<PeekResponse>,
895        finishing: RowSetFinishing,
896        max_result_size: u64,
897        max_returned_query_size: Option<u64>,
898        duration_histogram: prometheus::Histogram,
899        mut persist_client: mz_persist_client::PersistClient,
900        peek_stash_read_batch_size_bytes: usize,
901        peek_stash_read_memory_budget_bytes: usize,
902    ) -> impl futures::Stream<Item = PeekResponseUnary> {
903        async_stream::stream!({
904            let result = rows_rx.await;
905
906            let rows = match result {
907                Ok(rows) => rows,
908                Err(e) => {
909                    yield PeekResponseUnary::Error(e.to_string());
910                    return;
911                }
912            };
913
914            match rows {
915                PeekResponse::Rows(rows) => {
916                    match finishing.finish(
917                        rows,
918                        max_result_size,
919                        max_returned_query_size,
920                        &duration_histogram,
921                    ) {
922                        Ok((rows, _size_bytes)) => yield PeekResponseUnary::Rows(Box::new(rows)),
923                        Err(e) => yield PeekResponseUnary::Error(e),
924                    }
925                }
926                PeekResponse::Stashed(response) => {
927                    let response = *response;
928
929                    let shard_id = response.shard_id;
930
931                    let mut batches = Vec::new();
932                    for proto_batch in response.batches.into_iter() {
933                        let batch =
934                            persist_client.batch_from_transmittable_batch(&shard_id, proto_batch);
935
936                        batches.push(batch);
937                    }
938                    tracing::trace!(?batches, "stashed peek response");
939
940                    let as_of = Antichain::from_elem(mz_repr::Timestamp::default());
941                    let read_schemas: Schemas<SourceData, ()> = Schemas {
942                        id: None,
943                        key: Arc::new(response.relation_desc.clone()),
944                        val: Arc::new(UnitSchema),
945                    };
946
947                    let mut row_cursor = persist_client
948                        .read_batches_consolidated::<_, _, _, i64>(
949                            response.shard_id,
950                            as_of,
951                            read_schemas,
952                            batches,
953                            |_stats| true,
954                            peek_stash_read_memory_budget_bytes,
955                        )
956                        .await
957                        .expect("invalid usage");
958
959                    // NOTE: Using the cursor creates Futures that are not Sync,
960                    // so we can't drive them on the main Coordinator loop.
961                    // Spawning a task has the additional benefit that we get to
962                    // delete batches once we're done.
963                    //
964                    // Batch deletion is best-effort, though, and there are
965                    // multiple known ways in which they can leak, among them:
966                    //
967                    // - ProtoBatch is lost in flight
968                    // - ProtoBatch is lost because when combining PeekResponse
969                    // from workers a cancellation or error "overrides" other
970                    // results, meaning we drop them
971                    // - This task here is not run to completion before it can
972                    // delete all batches
973                    //
974                    // This is semi-ok, because persist needs a reaper of leaked
975                    // batches already, and so we piggy-back on that, even if it
976                    // might not exist as of today.
977                    let (tx, mut rx) = tokio::sync::mpsc::channel(1);
978                    mz_ore::task::spawn(|| "read_peek_batches", async move {
979                        // We always send our inline rows first. Ordering
980                        // doesn't matter because we can only be in this case
981                        // when there is no ORDER BY.
982                        //
983                        // We _could_ write these out as a Batch, and include it
984                        // in the batches we read via the Consolidator. If we
985                        // wanted to get a consistent ordering. That's not
986                        // needed for correctness! But might be nice for more
987                        // aesthetic reasons.
988                        let result = tx.send(response.inline_rows).await;
989                        if result.is_err() {
990                            tracing::debug!("receiver went away");
991                        }
992
993                        let mut current_batch = Vec::new();
994                        let mut current_batch_size: usize = 0;
995
996                        'outer: while let Some(rows) = row_cursor.next().await {
997                            for ((source_data, _val), _ts, diff) in rows {
998                                let row = source_data
999                                    .0
1000                                    .expect("we are not sending errors on this code path");
1001
1002                                let diff = usize::try_from(diff)
1003                                    .expect("peek responses cannot have negative diffs");
1004
1005                                if diff > 0 {
1006                                    let diff =
1007                                        NonZeroUsize::new(diff).expect("checked to be non-zero");
1008                                    current_batch_size =
1009                                        current_batch_size.saturating_add(row.byte_len());
1010                                    current_batch.push((row, diff));
1011                                }
1012
1013                                if current_batch_size > peek_stash_read_batch_size_bytes {
1014                                    // We're re-encoding the rows as a RowCollection
1015                                    // here, for which we pay in CPU time. We're in a
1016                                    // slow path already, since we're returning a big
1017                                    // stashed result so this is worth the convenience
1018                                    // of that for now.
1019                                    let result = tx
1020                                        .send(RowCollection::new(
1021                                            current_batch.drain(..).collect_vec(),
1022                                            &[],
1023                                        ))
1024                                        .await;
1025                                    if result.is_err() {
1026                                        tracing::debug!("receiver went away");
1027                                        // Don't return but break so we fall out to the
1028                                        // batch delete logic below.
1029                                        break 'outer;
1030                                    }
1031
1032                                    current_batch_size = 0;
1033                                }
1034                            }
1035                        }
1036
1037                        if current_batch.len() > 0 {
1038                            let result = tx.send(RowCollection::new(current_batch, &[])).await;
1039                            if result.is_err() {
1040                                tracing::debug!("receiver went away");
1041                            }
1042                        }
1043
1044                        let batches = row_cursor.into_lease();
1045                        tracing::trace!(?response.shard_id, "cleaning up batches of peek result");
1046                        for batch in batches {
1047                            batch.delete().await;
1048                        }
1049                    });
1050
1051                    assert!(
1052                        finishing.is_streamable(response.relation_desc.arity()),
1053                        "can only get stashed responses when the finishing is streamable"
1054                    );
1055
1056                    tracing::trace!("query result is streamable!");
1057
1058                    assert!(finishing.is_streamable(response.relation_desc.arity()));
1059                    let mut incremental_finishing = RowSetFinishingIncremental::new(
1060                        finishing.offset,
1061                        finishing.limit,
1062                        finishing.project,
1063                        max_returned_query_size,
1064                    );
1065
1066                    let mut got_zero_rows = true;
1067                    while let Some(rows) = rx.recv().await {
1068                        got_zero_rows = false;
1069
1070                        let result_rows = incremental_finishing.finish_incremental(
1071                            rows,
1072                            max_result_size,
1073                            &duration_histogram,
1074                        );
1075
1076                        match result_rows {
1077                            Ok(result_rows) => yield PeekResponseUnary::Rows(Box::new(result_rows)),
1078                            Err(e) => yield PeekResponseUnary::Error(e),
1079                        }
1080                    }
1081
1082                    // Even when there's zero rows, clients still expect an
1083                    // empty PeekResponse.
1084                    if got_zero_rows {
1085                        let row_iter = vec![].into_row_iter();
1086                        yield PeekResponseUnary::Rows(Box::new(row_iter));
1087                    }
1088                }
1089                PeekResponse::Canceled => {
1090                    yield PeekResponseUnary::Canceled;
1091                }
1092                PeekResponse::Error(e) => {
1093                    yield PeekResponseUnary::Error(e);
1094                }
1095            }
1096        })
1097    }
1098
1099    /// Cancel and remove all pending peeks that were initiated by the client with `conn_id`.
1100    #[mz_ore::instrument(level = "debug")]
1101    pub(crate) fn cancel_pending_peeks(&mut self, conn_id: &ConnectionId) {
1102        if let Some(uuids) = self.client_pending_peeks.remove(conn_id) {
1103            self.metrics
1104                .canceled_peeks
1105                .inc_by(u64::cast_from(uuids.len()));
1106
1107            let mut inverse: BTreeMap<ComputeInstanceId, BTreeSet<Uuid>> = Default::default();
1108            for (uuid, compute_instance) in &uuids {
1109                inverse.entry(*compute_instance).or_default().insert(*uuid);
1110            }
1111            for (compute_instance, uuids) in inverse {
1112                // It's possible that this compute instance no longer exists because it was dropped
1113                // while the peek was in progress. In this case we ignore the error and move on
1114                // because the dataflow no longer exists.
1115                // TODO(jkosh44) Dropping a cluster should actively cancel all pending queries.
1116                for uuid in uuids {
1117                    let _ = self.controller.compute.cancel_peek(
1118                        compute_instance,
1119                        uuid,
1120                        PeekResponse::Canceled,
1121                    );
1122                }
1123            }
1124
1125            let peeks = uuids
1126                .iter()
1127                .filter_map(|(uuid, _)| self.pending_peeks.remove(uuid))
1128                .collect::<Vec<_>>();
1129            for peek in peeks {
1130                self.retire_execution(
1131                    StatementEndedExecutionReason::Canceled,
1132                    peek.ctx_extra.defuse(),
1133                );
1134            }
1135        }
1136    }
1137
1138    /// Handle a peek notification and retire the corresponding execution. Does nothing for
1139    /// already-removed peeks.
1140    pub(crate) fn handle_peek_notification(
1141        &mut self,
1142        uuid: Uuid,
1143        notification: PeekNotification,
1144        otel_ctx: OpenTelemetryContext,
1145    ) {
1146        // We expect exactly one peek response, which we forward. Then we clean up the
1147        // peek's state in the coordinator.
1148        if let Some(PendingPeek {
1149            conn_id: _,
1150            cluster_id: _,
1151            depends_on: _,
1152            ctx_extra,
1153            is_fast_path,
1154        }) = self.remove_pending_peek(&uuid)
1155        {
1156            let reason = match notification {
1157                PeekNotification::Success {
1158                    rows: num_rows,
1159                    result_size,
1160                } => {
1161                    let strategy = if is_fast_path {
1162                        StatementExecutionStrategy::FastPath
1163                    } else {
1164                        StatementExecutionStrategy::Standard
1165                    };
1166                    StatementEndedExecutionReason::Success {
1167                        result_size: Some(result_size),
1168                        rows_returned: Some(num_rows),
1169                        execution_strategy: Some(strategy),
1170                    }
1171                }
1172                PeekNotification::Error(error) => StatementEndedExecutionReason::Errored { error },
1173                PeekNotification::Canceled => StatementEndedExecutionReason::Canceled,
1174            };
1175            otel_ctx.attach_as_parent();
1176            self.retire_execution(reason, ctx_extra.defuse());
1177        }
1178        // Cancellation may cause us to receive responses for peeks no
1179        // longer in `self.pending_peeks`, so we quietly ignore them.
1180    }
1181
1182    /// Clean up a peek's state.
1183    pub(crate) fn remove_pending_peek(&mut self, uuid: &Uuid) -> Option<PendingPeek> {
1184        let pending_peek = self.pending_peeks.remove(uuid);
1185        if let Some(pending_peek) = &pending_peek {
1186            let uuids = self
1187                .client_pending_peeks
1188                .get_mut(&pending_peek.conn_id)
1189                .expect("coord peek state is inconsistent");
1190            uuids.remove(uuid);
1191            if uuids.is_empty() {
1192                self.client_pending_peeks.remove(&pending_peek.conn_id);
1193            }
1194        }
1195        pending_peek
1196    }
1197
1198    /// Implements a slow-path peek by creating a transient dataflow.
1199    /// This is called from the command handler for ExecuteSlowPathPeek.
1200    ///
1201    /// (For now, this method simply delegates to implement_peek_plan by constructing
1202    /// the necessary PlannedPeek structure.)
1203    pub(crate) async fn implement_slow_path_peek(
1204        &mut self,
1205        dataflow_plan: PeekDataflowPlan<mz_repr::Timestamp>,
1206        determination: TimestampDetermination<mz_repr::Timestamp>,
1207        finishing: RowSetFinishing,
1208        compute_instance: ComputeInstanceId,
1209        target_replica: Option<ReplicaId>,
1210        intermediate_result_type: SqlRelationType,
1211        source_ids: BTreeSet<GlobalId>,
1212        conn_id: ConnectionId,
1213        max_result_size: u64,
1214        max_query_result_size: Option<u64>,
1215        watch_set: Option<WatchSetCreation>,
1216    ) -> Result<ExecuteResponse, AdapterError> {
1217        // Install watch sets for statement lifecycle logging if enabled.
1218        // This must happen _before_ creating ExecuteContextExtra, so that if it fails,
1219        // we don't have an ExecuteContextExtra that needs to be retired (the frontend
1220        // will handle logging for the error case).
1221        let statement_logging_id = watch_set.as_ref().map(|ws| ws.logging_id);
1222        if let Some(ws) = watch_set {
1223            self.install_peek_watch_sets(conn_id.clone(), ws)
1224                .map_err(|e| {
1225                    AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
1226                        e,
1227                        compute_instance,
1228                    )
1229                })?;
1230        }
1231
1232        let source_arity = intermediate_result_type.arity();
1233
1234        let planned_peek = PlannedPeek {
1235            plan: PeekPlan::SlowPath(dataflow_plan),
1236            determination,
1237            conn_id,
1238            intermediate_result_type,
1239            source_arity,
1240            source_ids,
1241        };
1242
1243        // Call the old peek sequencing's implement_peek_plan for now.
1244        // TODO(peek-seq): After the old peek sequencing is completely removed, we should merge the
1245        // relevant parts of the old `implement_peek_plan` into this method, and remove the old
1246        // `implement_peek_plan`.
1247        self.implement_peek_plan(
1248            &mut ExecuteContextGuard::new(statement_logging_id, self.internal_cmd_tx.clone()),
1249            planned_peek,
1250            finishing,
1251            compute_instance,
1252            target_replica,
1253            max_result_size,
1254            max_query_result_size,
1255        )
1256        .await
1257    }
1258
    /// Implements a `COPY TO` command by installing peek watch sets,
    /// shipping the dataflow, and spawning a background task to wait for completion.
    /// This is called from the command handler for ExecuteCopyTo.
    ///
    /// (The S3 preflight check must be completed successfully via the
    /// `CopyToPreflight` command _before_ calling this method. The preflight is
    /// handled separately to avoid blocking the coordinator's main task with
    /// slow S3 network operations.)
    ///
    /// This method does NOT block waiting for completion. Instead, it spawns a background task that
    /// will send the response through the provided tx channel when the COPY TO completes.
    /// All errors (setup or execution) are sent through tx.
    ///
    /// NOTE(review): the ordering below (register the active sink _before_
    /// shipping the dataflow, and remove it again if shipping fails) is load-
    /// bearing; see the inline comments before reordering anything.
    pub(crate) async fn implement_copy_to(
        &mut self,
        df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        source_ids: BTreeSet<GlobalId>,
        conn_id: ConnectionId,
        watch_set: Option<WatchSetCreation>,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    ) {
        // Helper to send error and return early. Takes `tx` by value because
        // sending consumes the oneshot sender.
        let send_err = |tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
                        e: AdapterError| {
            let _ = tx.send(Err(e));
        };

        // Install watch sets for statement lifecycle logging if enabled.
        // If this fails, we just send the error back. The frontend will handle logging
        // for the error case (no ExecuteContextExtra is created here).
        if let Some(ws) = watch_set {
            if let Err(e) = self.install_peek_watch_sets(conn_id.clone(), ws) {
                let err = AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
                    e,
                    compute_instance,
                );
                send_err(tx, err);
                return;
            }
        }

        // Note: We don't create an ExecuteContextExtra here because the frontend handles
        // all statement logging for COPY TO operations.

        let sink_id = df_desc.sink_id();

        // Create and register ActiveCopyTo.
        // Note: sink_tx/sink_rx is the channel for the compute sink to notify completion
        // This is different from the command's tx which sends the response to the client
        let (sink_tx, sink_rx) = oneshot::channel();
        let active_copy_to = ActiveCopyTo {
            conn_id: conn_id.clone(),
            tx: sink_tx,
            cluster_id: compute_instance,
            depends_on: source_ids,
        };

        // Add metadata for the new COPY TO. CopyTo returns a `ready` future, so it is safe to drop.
        drop(
            self.add_active_compute_sink(sink_id, ActiveComputeSink::CopyTo(active_copy_to))
                .await,
        );

        // Try to ship the dataflow. We handle errors gracefully because dependencies might have
        // disappeared during sequencing.
        if let Err(e) = self
            .try_ship_dataflow(df_desc, compute_instance, target_replica)
            .await
            .map_err(AdapterError::concurrent_dependency_drop_from_dataflow_creation_error)
        {
            // Clean up the active compute sink that was added above, since the dataflow was never
            // created. If we don't do this, the sink_id remains in drop_sinks but no collection
            // exists in the compute controller, causing a panic when the connection terminates.
            self.remove_active_compute_sink(sink_id).await;
            send_err(tx, e);
            return;
        }

        // Spawn background task to wait for completion
        // We must NOT await sink_rx here directly, as that would block the coordinator's main task
        // from processing the completion message. Instead, we spawn a background task that will
        // send the result through tx when the COPY TO completes.
        let span = Span::current();
        task::spawn(
            || "copy to completion",
            async move {
                let res = sink_rx.await;
                let result = match res {
                    Ok(res) => res,
                    // The sink-side sender was dropped without reporting a
                    // result; treat that as an internal error.
                    Err(_) => Err(AdapterError::Internal("copy to sender dropped".into())),
                };

                let _ = tx.send(result);
            }
            .instrument(span),
        );
    }
1357
1358    /// Constructs an [`ExecuteResponse`] that that will send some rows to the
1359    /// client immediately, as opposed to asking the dataflow layer to send along
1360    /// the rows after some computation.
1361    pub(crate) fn send_immediate_rows<I>(rows: I) -> ExecuteResponse
1362    where
1363        I: IntoRowIterator,
1364        I::Iter: Send + Sync + 'static,
1365    {
1366        let rows = Box::new(rows.into_row_iter());
1367        ExecuteResponse::SendingRowsImmediate { rows }
1368    }
1369}
1370
#[cfg(test)]
mod tests {
    use mz_expr::func::IsNull;
    use mz_expr::{MapFilterProject, UnaryFunc};
    use mz_ore::str::Indent;
    use mz_repr::explain::text::text_string_at;
    use mz_repr::explain::{DummyHumanizer, ExplainConfig, PlanRenderingContext};
    use mz_repr::{Datum, SqlColumnType, SqlScalarType};

    use super::*;

    // Verifies the `EXPLAIN`-style text rendering of each `FastPathPlan`
    // variant: a constant error, an index peek with a full scan, an index
    // peek with a literal lookup, and constant row sets both below and above
    // the row-count threshold that switches output to the summarized
    // "total_rows"/"first_rows" form.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
    fn test_fast_path_plan_as_text() {
        // Single non-nullable string column, shared by all plans below.
        let typ = SqlRelationType::new(vec![SqlColumnType {
            scalar_type: SqlScalarType::String,
            nullable: false,
        }]);
        // A constant plan that evaluates to an error rather than rows.
        let constant_err = FastPathPlan::Constant(Err(EvalError::DivisionByZero), typ.clone());
        // PeekExisting with no key values (`None`): renders as a full scan.
        let no_lookup = FastPathPlan::PeekExisting(
            GlobalId::User(8),
            GlobalId::User(10),
            None,
            MapFilterProject::new(4)
                .map(Some(MirScalarExpr::column(0).or(MirScalarExpr::column(2))))
                .project([1, 4])
                .into_plan()
                .expect("invalid plan")
                .into_nontemporal()
                .expect("invalid nontemporal"),
        );
        // PeekExisting with a literal key value: renders as a lookup.
        let lookup = FastPathPlan::PeekExisting(
            GlobalId::User(9),
            GlobalId::User(11),
            Some(vec![Row::pack(Some(Datum::Int32(5)))]),
            MapFilterProject::new(3)
                .filter(Some(
                    MirScalarExpr::column(0).call_unary(UnaryFunc::IsNull(IsNull)),
                ))
                .into_plan()
                .expect("invalid plan")
                .into_nontemporal()
                .expect("invalid nontemporal"),
        );

        // DummyHumanizer leaves ids unresolved, hence "[DELETED INDEX]" in
        // the expected output below.
        let humanizer = DummyHumanizer;
        let config = ExplainConfig {
            redacted: false,
            verbose_syntax: true,
            ..Default::default()
        };
        // Fresh rendering context per call; `text_string_at` consumes one.
        let ctx_gen = || {
            let indent = Indent::default();
            let annotations = BTreeMap::new();
            PlanRenderingContext::<FastPathPlan>::new(
                indent,
                &humanizer,
                annotations,
                &config,
                BTreeSet::default(),
            )
        };

        let constant_err_exp = "Error \"division by zero\"\n";
        let no_lookup_exp = "Project (#1, #4)\n  Map ((#0 OR #2))\n    ReadIndex on=u8 [DELETED INDEX]=[*** full scan ***]\n";
        let lookup_exp =
            "Filter (#0) IS NULL\n  ReadIndex on=u9 [DELETED INDEX]=[lookup value=(5)]\n";

        assert_eq!(text_string_at(&constant_err, ctx_gen), constant_err_exp);
        assert_eq!(text_string_at(&no_lookup, ctx_gen), no_lookup_exp);
        assert_eq!(text_string_at(&lookup, ctx_gen), lookup_exp);

        // Three rows with multiplicities 1, 2, and 500: few enough distinct
        // rows that the renderer lists every one, annotating diffs > 1 with
        // "x N".
        let mut constant_rows = vec![
            (Row::pack(Some(Datum::String("hello"))), Diff::ONE),
            (Row::pack(Some(Datum::String("world"))), 2.into()),
            (Row::pack(Some(Datum::String("star"))), 500.into()),
        ];
        let constant_exp1 =
            "Constant\n  - (\"hello\")\n  - ((\"world\") x 2)\n  - ((\"star\") x 500)\n";
        assert_eq!(
            text_string_at(
                &FastPathPlan::Constant(Ok(constant_rows.clone()), typ.clone()),
                ctx_gen
            ),
            constant_exp1
        );
        // Push the collection past the display cutoff: the renderer then
        // emits a total (sum of absolute diffs, 1+2+500+20 = 523) followed by
        // only the first 20 rows.
        constant_rows
            .extend((0..20).map(|i| (Row::pack(Some(Datum::String(&i.to_string()))), Diff::ONE)));
        let constant_exp2 = "Constant\n  total_rows (diffs absed): 523\n  first_rows:\n    - (\"hello\")\
        \n    - ((\"world\") x 2)\n    - ((\"star\") x 500)\n    - (\"0\")\n    - (\"1\")\
        \n    - (\"2\")\n    - (\"3\")\n    - (\"4\")\n    - (\"5\")\n    - (\"6\")\
        \n    - (\"7\")\n    - (\"8\")\n    - (\"9\")\n    - (\"10\")\n    - (\"11\")\
        \n    - (\"12\")\n    - (\"13\")\n    - (\"14\")\n    - (\"15\")\n    - (\"16\")\n";
        assert_eq!(
            text_string_at(&FastPathPlan::Constant(Ok(constant_rows), typ), ctx_gen),
            constant_exp2
        );
    }
}