mz_adapter/coord/
peek.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic and types for creating, executing, and tracking peeks.
11//!
12//! This module determines if a dataflow can be short-cut, by returning constant values
13//! or by reading out of existing arrangements, and implements the appropriate plan.
14
15use std::collections::{BTreeMap, BTreeSet};
16use std::fmt;
17use std::num::NonZeroUsize;
18use std::ops::Deref;
19use std::sync::Arc;
20
21use differential_dataflow::consolidation::consolidate;
22use itertools::Itertools;
23use mz_adapter_types::compaction::CompactionWindow;
24use mz_adapter_types::connection::ConnectionId;
25use mz_cluster_client::ReplicaId;
26use mz_compute_client::controller::PeekNotification;
27use mz_compute_client::protocol::command::PeekTarget;
28use mz_compute_client::protocol::response::PeekResponse;
29use mz_compute_types::ComputeInstanceId;
30use mz_compute_types::dataflows::{DataflowDescription, IndexImport};
31use mz_controller_types::ClusterId;
32use mz_expr::explain::{HumanizedExplain, HumanizerMode, fmt_text_constant_rows};
33use mz_expr::row::RowCollection;
34use mz_expr::{
35    EvalError, Id, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing,
36    RowSetFinishingIncremental, permutation_for_arrangement,
37};
38use mz_ore::cast::CastFrom;
39use mz_ore::str::{StrExt, separated};
40use mz_ore::task;
41use mz_ore::tracing::OpenTelemetryContext;
42use mz_persist_client::Schemas;
43use mz_persist_types::codec_impls::UnitSchema;
44use mz_repr::explain::text::DisplayText;
45use mz_repr::explain::{CompactScalars, IndexUsageType, PlanRenderingContext, UsedIndexes};
46use mz_repr::{
47    Diff, GlobalId, IntoRowIterator, RelationDesc, Row, RowIterator, SqlRelationType,
48    preserves_order,
49};
50use mz_storage_types::sources::SourceData;
51use serde::{Deserialize, Serialize};
52use timely::progress::{Antichain, Timestamp};
53use tokio::sync::oneshot;
54use tracing::{Instrument, Span};
55use uuid::Uuid;
56
57use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
58use crate::coord::timestamp_selection::TimestampDetermination;
59use crate::optimize::OptimizerError;
60use crate::statement_logging::WatchSetCreation;
61use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
62use crate::util::ResultExt;
63use crate::{AdapterError, ExecuteContextGuard, ExecuteResponse};
64
/// A peek is a request to read data from a maintained arrangement.
///
/// This struct records who issued a peek, where it runs, and what it depends on.
#[derive(Debug)]
pub(crate) struct PendingPeek {
    /// The connection that initiated the peek.
    pub(crate) conn_id: ConnectionId,
    /// The cluster that the peek is being executed on.
    pub(crate) cluster_id: ClusterId,
    /// All `GlobalId`s that the peek depends on.
    pub(crate) depends_on: BTreeSet<GlobalId>,
    /// Context about the execute that produced this peek,
    /// needed by the coordinator for retiring it.
    pub(crate) ctx_extra: ExecuteContextGuard,
    /// Is this a fast-path peek, i.e. one that doesn't require a dataflow?
    pub(crate) is_fast_path: bool,
}
80
/// The response from a `Peek`, with row multiplicities represented in unary.
///
/// Note that each `Peek` expects to generate exactly one `PeekResponse`, i.e.
/// we expect a 1:1 contract between `Peek` and `PeekResponseUnary`.
#[derive(Debug)]
pub enum PeekResponseUnary {
    /// The peek produced rows, exposed through a boxed [`RowIterator`].
    Rows(Box<dyn RowIterator + Send + Sync>),
    /// The peek failed; the payload is the error message.
    Error(String),
    /// The peek was canceled, so there are neither rows nor an error message.
    Canceled,
}
91
/// The plan for a peek that needs a dataflow (see [`PeekPlan::SlowPath`]).
///
/// Besides the dataflow itself, this carries the arrangement layout computed in
/// [`PeekDataflowPlan::new`] from the result type's default key.
#[derive(Clone, Debug)]
pub struct PeekDataflowPlan<T = mz_repr::Timestamp> {
    /// The description of the dataflow to build.
    pub(crate) desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
    /// NOTE(review): presumably the id of the transient index that `desc` builds and
    /// that the peek reads from — confirm against the code that consumes this plan.
    pub(crate) id: GlobalId,
    /// The arrangement key: the default key of the result type, as column references.
    key: Vec<MirScalarExpr>,
    /// The column permutation returned by `permutation_for_arrangement` for `key`.
    permutation: Vec<usize>,
    /// The number of columns in the thinning returned by `permutation_for_arrangement`.
    thinned_arity: usize,
}
100
101impl<T> PeekDataflowPlan<T> {
102    pub fn new(
103        desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
104        id: GlobalId,
105        typ: &SqlRelationType,
106    ) -> Self {
107        let arity = typ.arity();
108        let key = typ
109            .default_key()
110            .into_iter()
111            .map(MirScalarExpr::column)
112            .collect::<Vec<_>>();
113        let (permutation, thinning) = permutation_for_arrangement(&key, arity);
114        Self {
115            desc,
116            id,
117            key,
118            permutation,
119            thinned_arity: thinning.len(),
120        }
121    }
122}
123
/// The ways a peek can be answered without building a dataflow.
///
/// See `create_fast_path_plan` for how one of these is chosen.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd)]
pub enum FastPathPlan {
    /// The view evaluates to a constant result that can be returned.
    ///
    /// The [SqlRelationType] is unnecessary for evaluating the constant result but
    /// may be helpful when printing out an explanation.
    Constant(Result<Vec<(Row, Diff)>, EvalError>, SqlRelationType),
    /// The view can be read out of an existing arrangement.
    /// (coll_id, idx_id, values to look up, mfp to apply)
    PeekExisting(GlobalId, GlobalId, Option<Vec<Row>>, mz_expr::SafeMfpPlan),
    /// The view can be read directly out of Persist.
    /// (coll_id, literal constraint on a prefix of the columns, mfp to apply)
    PeekPersist(GlobalId, Option<Row>, mz_expr::SafeMfpPlan),
}
137
138impl<'a, T: 'a> DisplayText<PlanRenderingContext<'a, T>> for FastPathPlan {
139    fn fmt_text(
140        &self,
141        f: &mut fmt::Formatter<'_>,
142        ctx: &mut PlanRenderingContext<'a, T>,
143    ) -> fmt::Result {
144        if ctx.config.verbose_syntax {
145            self.fmt_verbose_text(f, ctx)
146        } else {
147            self.fmt_default_text(f, ctx)
148        }
149    }
150}
151
152impl FastPathPlan {
153    pub fn fmt_default_text<'a, T>(
154        &self,
155        f: &mut fmt::Formatter<'_>,
156        ctx: &mut PlanRenderingContext<'a, T>,
157    ) -> fmt::Result {
158        let mode = HumanizedExplain::new(ctx.config.redacted);
159
160        match self {
161            FastPathPlan::Constant(rows, _) => {
162                write!(f, "{}→Constant ", ctx.indent)?;
163
164                match rows {
165                    Ok(rows) => writeln!(f, "({} rows)", rows.len())?,
166                    Err(err) => {
167                        if mode.redacted() {
168                            writeln!(f, "(error: █)")?;
169                        } else {
170                            writeln!(f, "(error: {})", err.to_string().quoted(),)?;
171                        }
172                    }
173                }
174            }
175            FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
176                let coll = ctx
177                    .humanizer
178                    .humanize_id(*coll_id)
179                    .unwrap_or_else(|| coll_id.to_string());
180                let idx = ctx
181                    .humanizer
182                    .humanize_id(*idx_id)
183                    .unwrap_or_else(|| idx_id.to_string());
184                writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
185                ctx.indent.set();
186
187                ctx.indent += 1;
188
189                mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
190                let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();
191
192                if printed {
193                    ctx.indent += 1;
194                }
195                if let Some(literal_constraints) = literal_constraints {
196                    writeln!(f, "{}→Index Lookup on {coll} (using {idx})", ctx.indent)?;
197                    ctx.indent += 1;
198                    let values = separated("; ", mode.seq(literal_constraints, None));
199                    writeln!(f, "{}Lookup values: {values}", ctx.indent)?;
200                } else {
201                    writeln!(f, "{}→Indexed {coll} (using {idx})", ctx.indent)?;
202                }
203
204                ctx.indent.reset();
205            }
206            FastPathPlan::PeekPersist(global_id, literal_constraint, mfp) => {
207                let coll = ctx
208                    .humanizer
209                    .humanize_id(*global_id)
210                    .unwrap_or_else(|| global_id.to_string());
211                writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
212                ctx.indent.set();
213
214                ctx.indent += 1;
215
216                mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
217                let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();
218
219                if printed {
220                    ctx.indent += 1;
221                }
222                if let Some(literal_constraint) = literal_constraint {
223                    writeln!(f, "{}→Index Lookup on {coll} (from storage)", ctx.indent)?;
224                    ctx.indent += 1;
225                    let value = mode.expr(literal_constraint, None);
226                    writeln!(f, "{}Lookup value: {value}", ctx.indent)?;
227                } else {
228                    writeln!(f, "{}→Indexed {coll} (from storage)", ctx.indent)?;
229                }
230
231                ctx.indent.reset();
232            }
233        }
234
235        Ok(())
236    }
237
238    pub fn fmt_verbose_text<'a, T>(
239        &self,
240        f: &mut fmt::Formatter<'_>,
241        ctx: &mut PlanRenderingContext<'a, T>,
242    ) -> fmt::Result {
243        let redacted = ctx.config.redacted;
244        let mode = HumanizedExplain::new(redacted);
245
246        // TODO(aalexandrov): factor out common PeekExisting and PeekPersist
247        // code.
248        match self {
249            FastPathPlan::Constant(Ok(rows), _) => {
250                if !rows.is_empty() {
251                    writeln!(f, "{}Constant", ctx.indent)?;
252                    *ctx.as_mut() += 1;
253                    fmt_text_constant_rows(
254                        f,
255                        rows.iter().map(|(row, diff)| (row, diff)),
256                        ctx.as_mut(),
257                        redacted,
258                    )?;
259                    *ctx.as_mut() -= 1;
260                } else {
261                    writeln!(f, "{}Constant <empty>", ctx.as_mut())?;
262                }
263                Ok(())
264            }
265            FastPathPlan::Constant(Err(err), _) => {
266                if redacted {
267                    writeln!(f, "{}Error █", ctx.as_mut())
268                } else {
269                    writeln!(f, "{}Error {}", ctx.as_mut(), err.to_string().escaped())
270                }
271            }
272            FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
273                ctx.as_mut().set();
274                let (map, filter, project) = mfp.as_map_filter_project();
275
276                let cols = if !ctx.config.humanized_exprs {
277                    None
278                } else if let Some(cols) = ctx.humanizer.column_names_for_id(*idx_id) {
279                    // FIXME: account for thinning and permutation
280                    // See mz_expr::permutation_for_arrangement
281                    // See permute_oneshot_mfp_around_index
282                    let cols = itertools::chain(
283                        cols.iter().cloned(),
284                        std::iter::repeat(String::new()).take(map.len()),
285                    )
286                    .collect();
287                    Some(cols)
288                } else {
289                    None
290                };
291
292                if project.len() != mfp.input_arity + map.len()
293                    || !project.iter().enumerate().all(|(i, o)| i == *o)
294                {
295                    let outputs = mode.seq(&project, cols.as_ref());
296                    let outputs = CompactScalars(outputs);
297                    writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
298                    *ctx.as_mut() += 1;
299                }
300                if !filter.is_empty() {
301                    let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
302                    writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
303                    *ctx.as_mut() += 1;
304                }
305                if !map.is_empty() {
306                    let scalars = mode.seq(&map, cols.as_ref());
307                    let scalars = CompactScalars(scalars);
308                    writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
309                    *ctx.as_mut() += 1;
310                }
311                MirRelationExpr::fmt_indexed_filter(
312                    f,
313                    ctx,
314                    coll_id,
315                    idx_id,
316                    literal_constraints.clone(),
317                    None,
318                )?;
319                writeln!(f)?;
320                ctx.as_mut().reset();
321                Ok(())
322            }
323            FastPathPlan::PeekPersist(gid, literal_constraint, mfp) => {
324                ctx.as_mut().set();
325                let (map, filter, project) = mfp.as_map_filter_project();
326
327                let cols = if !ctx.config.humanized_exprs {
328                    None
329                } else if let Some(cols) = ctx.humanizer.column_names_for_id(*gid) {
330                    let cols = itertools::chain(
331                        cols.iter().cloned(),
332                        std::iter::repeat(String::new()).take(map.len()),
333                    )
334                    .collect::<Vec<_>>();
335                    Some(cols)
336                } else {
337                    None
338                };
339
340                if project.len() != mfp.input_arity + map.len()
341                    || !project.iter().enumerate().all(|(i, o)| i == *o)
342                {
343                    let outputs = mode.seq(&project, cols.as_ref());
344                    let outputs = CompactScalars(outputs);
345                    writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
346                    *ctx.as_mut() += 1;
347                }
348                if !filter.is_empty() {
349                    let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
350                    writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
351                    *ctx.as_mut() += 1;
352                }
353                if !map.is_empty() {
354                    let scalars = mode.seq(&map, cols.as_ref());
355                    let scalars = CompactScalars(scalars);
356                    writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
357                    *ctx.as_mut() += 1;
358                }
359                let human_id = ctx
360                    .humanizer
361                    .humanize_id(*gid)
362                    .unwrap_or_else(|| gid.to_string());
363                write!(f, "{}PeekPersist {human_id}", ctx.as_mut())?;
364                if let Some(literal) = literal_constraint {
365                    let value = mode.expr(literal, None);
366                    writeln!(f, " [value={}]", value)?;
367                } else {
368                    writeln!(f, "")?;
369                }
370                ctx.as_mut().reset();
371                Ok(())
372            }
373        }?;
374        Ok(())
375    }
376}
377
/// A peek that has been planned and is ready to be implemented by the coordinator
/// (see `Coordinator::implement_peek_plan`).
#[derive(Debug)]
pub struct PlannedPeek {
    /// How the result is produced: via a fast path or by building a dataflow.
    pub plan: PeekPlan,
    /// The timestamp determination for the peek; the timestamp the peek runs at is
    /// taken from its `timestamp_context`.
    pub determination: TimestampDetermination<mz_repr::Timestamp>,
    /// The connection that the peek belongs to.
    pub conn_id: ConnectionId,
    /// The result type _after_ reading out of the "source" and applying any
    /// [MapFilterProject](mz_expr::MapFilterProject), but _before_ applying a
    /// [RowSetFinishing].
    ///
    /// This is _the_ `result_type` as far as compute is concerned and further
    /// changes through projections happen purely in the adapter.
    pub intermediate_result_type: SqlRelationType,
    /// NOTE(review): presumably the arity of the "source" before the
    /// `MapFilterProject` is applied — confirm against the producer of this struct.
    pub source_arity: usize,
    /// NOTE(review): presumably the ids of the collections the peek reads from —
    /// confirm against the producer of this struct.
    pub source_ids: BTreeSet<GlobalId>,
}
393
/// Possible ways in which the coordinator could produce the result for a goal view.
#[derive(Clone, Debug)]
pub enum PeekPlan<T = mz_repr::Timestamp> {
    /// The result can be produced without building a dataflow; see [`FastPathPlan`].
    FastPath(FastPathPlan),
    /// The view must be installed as a dataflow and then read.
    SlowPath(PeekDataflowPlan<T>),
}
401
402/// Convert `mfp` to an executable, non-temporal plan.
403/// It should be non-temporal, as OneShot preparation populates `mz_now`.
404///
405/// If the `mfp` can't be converted into a non-temporal plan, this returns an _internal_ error.
406fn mfp_to_safe_plan(
407    mfp: mz_expr::MapFilterProject,
408) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
409    mfp.into_plan()
410        .map_err(OptimizerError::InternalUnsafeMfpPlan)?
411        .into_nontemporal()
412        .map_err(|e| OptimizerError::InternalUnsafeMfpPlan(format!("{:?}", e)))
413}
414
415/// If it can't convert `mfp` into a `SafeMfpPlan`, this returns an _internal_ error.
416fn permute_oneshot_mfp_around_index(
417    mfp: mz_expr::MapFilterProject,
418    key: &[MirScalarExpr],
419) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
420    let input_arity = mfp.input_arity;
421    let mut safe_mfp = mfp_to_safe_plan(mfp)?;
422    let (permute, thinning) = permutation_for_arrangement(key, input_arity);
423    safe_mfp.permute_fn(|c| permute[c], key.len() + thinning.len());
424    Ok(safe_mfp)
425}
426
427/// Determine if the dataflow plan can be implemented without an actual dataflow.
428///
429/// If the optimized plan is a `Constant` or a `Get` of a maintained arrangement,
430/// we can avoid building a dataflow (and either just return the results, or peek
431/// out of the arrangement, respectively).
432pub fn create_fast_path_plan<T: Timestamp>(
433    dataflow_plan: &mut DataflowDescription<OptimizedMirRelationExpr, (), T>,
434    view_id: GlobalId,
435    finishing: Option<&RowSetFinishing>,
436    persist_fast_path_limit: usize,
437    persist_fast_path_order: bool,
438) -> Result<Option<FastPathPlan>, OptimizerError> {
439    // At this point, `dataflow_plan` contains our best optimized dataflow.
440    // We will check the plan to see if there is a fast path to escape full dataflow construction.
441
442    // We need to restrict ourselves to settings where the inserted transient view is the first thing
443    // to build (no dependent views). There is likely an index to build as well, but we may not be sure.
444    if dataflow_plan.objects_to_build.len() >= 1 && dataflow_plan.objects_to_build[0].id == view_id
445    {
446        let mut mir = &*dataflow_plan.objects_to_build[0].plan.as_inner_mut();
447        if let Some((rows, found_typ)) = mir.as_const() {
448            // In the case of a constant, we can return the result now.
449            let plan = FastPathPlan::Constant(rows.clone(), found_typ.clone());
450            return Ok(Some(plan));
451        } else {
452            // If there is a TopK that would be completely covered by the finishing, then jump
453            // through the TopK.
454            if let MirRelationExpr::TopK {
455                input,
456                group_key,
457                order_key,
458                limit,
459                offset,
460                monotonic: _,
461                expected_group_size: _,
462            } = mir
463            {
464                if let Some(finishing) = finishing {
465                    if group_key.is_empty() && *order_key == finishing.order_by && *offset == 0 {
466                        // The following is roughly `limit >= finishing.limit`, but with Options.
467                        let finishing_limits_at_least_as_topk = match (limit, finishing.limit) {
468                            (None, _) => true,
469                            (Some(..), None) => false,
470                            (Some(topk_limit), Some(finishing_limit)) => {
471                                if let Some(l) = topk_limit.as_literal_int64() {
472                                    l >= *finishing_limit
473                                } else {
474                                    false
475                                }
476                            }
477                        };
478                        if finishing_limits_at_least_as_topk {
479                            mir = input;
480                        }
481                    }
482                }
483            }
484            // In the case of a linear operator around an indexed view, we
485            // can skip creating a dataflow and instead pull all the rows in
486            // index and apply the linear operator against them.
487            let (mfp, mir) = mz_expr::MapFilterProject::extract_from_expression(mir);
488            match mir {
489                MirRelationExpr::Get {
490                    id: Id::Global(get_id),
491                    typ: relation_typ,
492                    ..
493                } => {
494                    // Just grab any arrangement if an arrangement exists
495                    for (index_id, IndexImport { desc, .. }) in dataflow_plan.index_imports.iter() {
496                        if desc.on_id == *get_id {
497                            return Ok(Some(FastPathPlan::PeekExisting(
498                                *get_id,
499                                *index_id,
500                                None,
501                                permute_oneshot_mfp_around_index(mfp, &desc.key)?,
502                            )));
503                        }
504                    }
505
506                    // If there is no arrangement, consider peeking the persist shard directly.
507                    // Generally, we consider a persist peek when the query can definitely be satisfied
508                    // by scanning through a small, constant number of Persist key-values.
509                    let safe_mfp = mfp_to_safe_plan(mfp)?;
510                    let (_maps, filters, projection) = safe_mfp.as_map_filter_project();
511
512                    let literal_constraint = if persist_fast_path_order {
513                        let mut row = Row::default();
514                        let mut packer = row.packer();
515                        for (idx, col) in relation_typ.column_types.iter().enumerate() {
516                            if !preserves_order(&col.scalar_type) {
517                                break;
518                            }
519                            let col_expr = MirScalarExpr::column(idx);
520
521                            let Some((literal, _)) = filters
522                                .iter()
523                                .filter_map(|f| f.expr_eq_literal(&col_expr))
524                                .next()
525                            else {
526                                break;
527                            };
528                            packer.extend_by_row(&literal);
529                        }
530                        if row.is_empty() { None } else { Some(row) }
531                    } else {
532                        None
533                    };
534
535                    let finish_ok = match &finishing {
536                        None => false,
537                        Some(RowSetFinishing {
538                            order_by,
539                            limit,
540                            offset,
541                            ..
542                        }) => {
543                            let order_ok = if persist_fast_path_order {
544                                order_by.iter().enumerate().all(|(idx, order)| {
545                                    // Map the ordering column back to the column in the source data.
546                                    // (If it's not one of the input columns, we can't make any guarantees.)
547                                    let column_idx = projection[order.column];
548                                    if column_idx >= safe_mfp.input_arity {
549                                        return false;
550                                    }
551                                    let column_type = &relation_typ.column_types[column_idx];
552                                    let index_ok = idx == column_idx;
553                                    let nulls_ok = !column_type.nullable || order.nulls_last;
554                                    let asc_ok = !order.desc;
555                                    let type_ok = preserves_order(&column_type.scalar_type);
556                                    index_ok && nulls_ok && asc_ok && type_ok
557                                })
558                            } else {
559                                order_by.is_empty()
560                            };
561                            let limit_ok = limit.map_or(false, |l| {
562                                usize::cast_from(l) + *offset < persist_fast_path_limit
563                            });
564                            order_ok && limit_ok
565                        }
566                    };
567
568                    let key_constraint = if let Some(literal) = &literal_constraint {
569                        let prefix_len = literal.iter().count();
570                        relation_typ
571                            .keys
572                            .iter()
573                            .any(|k| k.iter().all(|idx| *idx < prefix_len))
574                    } else {
575                        false
576                    };
577
578                    // We can generate a persist peek when:
579                    // - We have a literal constraint that includes an entire key (so we'll return at most one value)
580                    // - We can return the first N key values (no filters, small limit, consistent order)
581                    if key_constraint || (filters.is_empty() && finish_ok) {
582                        return Ok(Some(FastPathPlan::PeekPersist(
583                            *get_id,
584                            literal_constraint,
585                            safe_mfp,
586                        )));
587                    }
588                }
589                MirRelationExpr::Join { implementation, .. } => {
590                    if let mz_expr::JoinImplementation::IndexedFilter(coll_id, idx_id, key, vals) =
591                        implementation
592                    {
593                        return Ok(Some(FastPathPlan::PeekExisting(
594                            *coll_id,
595                            *idx_id,
596                            Some(vals.clone()),
597                            permute_oneshot_mfp_around_index(mfp, key)?,
598                        )));
599                    }
600                }
601                // nothing can be done for non-trivial expressions.
602                _ => {}
603            }
604        }
605    }
606    Ok(None)
607}
608
609impl FastPathPlan {
610    pub fn used_indexes(&self, finishing: Option<&RowSetFinishing>) -> UsedIndexes {
611        match self {
612            FastPathPlan::Constant(..) => UsedIndexes::default(),
613            FastPathPlan::PeekExisting(_coll_id, idx_id, literal_constraints, _mfp) => {
614                if literal_constraints.is_some() {
615                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::Lookup(*idx_id)])].into())
616                } else if finishing.map_or(false, |f| f.limit.is_some() && f.order_by.is_empty()) {
617                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::FastPathLimit])].into())
618                } else {
619                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::FullScan])].into())
620                }
621            }
622            FastPathPlan::PeekPersist(..) => UsedIndexes::default(),
623        }
624    }
625}
626
627impl crate::coord::Coordinator {
628    /// Implements a peek plan produced by `create_plan` above.
629    #[mz_ore::instrument(level = "debug")]
630    pub async fn implement_peek_plan(
631        &mut self,
632        ctx_extra: &mut ExecuteContextGuard,
633        plan: PlannedPeek,
634        finishing: RowSetFinishing,
635        compute_instance: ComputeInstanceId,
636        target_replica: Option<ReplicaId>,
637        max_result_size: u64,
638        max_returned_query_size: Option<u64>,
639    ) -> Result<ExecuteResponse, AdapterError> {
640        let PlannedPeek {
641            plan: fast_path,
642            determination,
643            conn_id,
644            intermediate_result_type,
645            source_arity,
646            source_ids,
647        } = plan;
648
649        // If the dataflow optimizes to a constant expression, we can immediately return the result.
650        if let PeekPlan::FastPath(FastPathPlan::Constant(rows, _)) = fast_path {
651            let mut rows = match rows {
652                Ok(rows) => rows,
653                Err(e) => return Err(e.into()),
654            };
655            // Consolidate down the results to get correct totals.
656            consolidate(&mut rows);
657
658            let mut results = Vec::new();
659            for (row, count) in rows {
660                if count.is_negative() {
661                    Err(EvalError::InvalidParameterValue(
662                        format!("Negative multiplicity in constant result: {}", count).into(),
663                    ))?
664                };
665                if count.is_positive() {
666                    let count = usize::cast_from(
667                        u64::try_from(count.into_inner())
668                            .expect("known to be positive from check above"),
669                    );
670                    results.push((
671                        row,
672                        NonZeroUsize::new(count).expect("known to be non-zero from check above"),
673                    ));
674                }
675            }
676            let row_collection = RowCollection::new(results, &finishing.order_by);
677            let duration_histogram = self.metrics.row_set_finishing_seconds();
678
679            let (ret, reason) = match finishing.finish(
680                row_collection,
681                max_result_size,
682                max_returned_query_size,
683                &duration_histogram,
684            ) {
685                Ok((rows, row_size_bytes)) => {
686                    let result_size = u64::cast_from(row_size_bytes);
687                    let rows_returned = u64::cast_from(rows.count());
688                    (
689                        Ok(Self::send_immediate_rows(rows)),
690                        StatementEndedExecutionReason::Success {
691                            result_size: Some(result_size),
692                            rows_returned: Some(rows_returned),
693                            execution_strategy: Some(StatementExecutionStrategy::Constant),
694                        },
695                    )
696                }
697                Err(error) => (
698                    Err(AdapterError::ResultSize(error.clone())),
699                    StatementEndedExecutionReason::Errored { error },
700                ),
701            };
702            self.retire_execution(reason, std::mem::take(ctx_extra).defuse());
703            return ret;
704        }
705
706        let timestamp = determination.timestamp_context.timestamp_or_default();
707        if let Some(id) = ctx_extra.contents() {
708            self.set_statement_execution_timestamp(id, timestamp)
709        }
710
711        // The remaining cases are a peek into a maintained arrangement, or building a dataflow.
712        // In both cases we will want to peek, and the main difference is that we might want to
713        // build a dataflow and drop it once the peek is issued. The peeks are also constructed
714        // differently.
715
716        // If we must build the view, ship the dataflow.
717        let (peek_command, drop_dataflow, is_fast_path, peek_target, strategy) = match fast_path {
718            PeekPlan::FastPath(FastPathPlan::PeekExisting(
719                _coll_id,
720                idx_id,
721                literal_constraints,
722                map_filter_project,
723            )) => (
724                (literal_constraints, timestamp, map_filter_project),
725                None,
726                true,
727                PeekTarget::Index { id: idx_id },
728                StatementExecutionStrategy::FastPath,
729            ),
730            PeekPlan::FastPath(FastPathPlan::PeekPersist(
731                coll_id,
732                literal_constraint,
733                map_filter_project,
734            )) => {
735                let peek_command = (
736                    literal_constraint.map(|r| vec![r]),
737                    timestamp,
738                    map_filter_project,
739                );
740                let metadata = self
741                    .controller
742                    .storage
743                    .collection_metadata(coll_id)
744                    .expect("storage collection for fast-path peek")
745                    .clone();
746                (
747                    peek_command,
748                    None,
749                    true,
750                    PeekTarget::Persist {
751                        id: coll_id,
752                        metadata,
753                    },
754                    StatementExecutionStrategy::PersistFastPath,
755                )
756            }
757            PeekPlan::SlowPath(PeekDataflowPlan {
758                desc: dataflow,
759                // n.b. this index_id identifies a transient index the
760                // caller created, so it is guaranteed to be on
761                // `compute_instance`.
762                id: index_id,
763                key: index_key,
764                permutation: index_permutation,
765                thinned_arity: index_thinned_arity,
766            }) => {
767                let output_ids = dataflow.export_ids().collect();
768
769                // Very important: actually create the dataflow (here, so we can destructure).
770                self.controller
771                    .compute
772                    .create_dataflow(compute_instance, dataflow, None)
773                    .map_err(
774                        AdapterError::concurrent_dependency_drop_from_dataflow_creation_error,
775                    )?;
776                self.initialize_compute_read_policies(
777                    output_ids,
778                    compute_instance,
779                    // Disable compaction so that nothing can compact before the peek occurs below.
780                    CompactionWindow::DisableCompaction,
781                )
782                .await;
783
784                // Create an identity MFP operator.
785                let mut map_filter_project = mz_expr::MapFilterProject::new(source_arity);
786                map_filter_project.permute_fn(
787                    |c| index_permutation[c],
788                    index_key.len() + index_thinned_arity,
789                );
790                let map_filter_project = mfp_to_safe_plan(map_filter_project)?;
791
792                (
793                    (None, timestamp, map_filter_project),
794                    Some(index_id),
795                    false,
796                    PeekTarget::Index { id: index_id },
797                    StatementExecutionStrategy::Standard,
798                )
799            }
800            _ => {
801                unreachable!()
802            }
803        };
804
805        // Endpoints for sending and receiving peek responses.
806        let (rows_tx, rows_rx) = tokio::sync::oneshot::channel();
807
808        // Generate unique UUID. Guaranteed to be unique to all pending peeks, there's an very
809        // small but unlikely chance that it's not unique to completed peeks.
810        let mut uuid = Uuid::new_v4();
811        while self.pending_peeks.contains_key(&uuid) {
812            uuid = Uuid::new_v4();
813        }
814
815        // The peek is ready to go for both cases, fast and non-fast.
816        // Stash the response mechanism, and broadcast dataflow construction.
817        self.pending_peeks.insert(
818            uuid,
819            PendingPeek {
820                conn_id: conn_id.clone(),
821                cluster_id: compute_instance,
822                depends_on: source_ids,
823                ctx_extra: std::mem::take(ctx_extra),
824                is_fast_path,
825            },
826        );
827        self.client_pending_peeks
828            .entry(conn_id)
829            .or_default()
830            .insert(uuid, compute_instance);
831        let (literal_constraints, timestamp, map_filter_project) = peek_command;
832
833        // At this stage we don't know column names for the result because we
834        // only know the peek's result type as a bare SqlRelationType.
835        let peek_result_column_names =
836            (0..intermediate_result_type.arity()).map(|i| format!("peek_{i}"));
837        let peek_result_desc =
838            RelationDesc::new(intermediate_result_type, peek_result_column_names);
839
840        self.controller
841            .compute
842            .peek(
843                compute_instance,
844                peek_target,
845                literal_constraints,
846                uuid,
847                timestamp,
848                peek_result_desc,
849                finishing.clone(),
850                map_filter_project,
851                target_replica,
852                rows_tx,
853            )
854            .unwrap_or_terminate("cannot fail to peek");
855
856        let duration_histogram = self.metrics.row_set_finishing_seconds();
857
858        // If a dataflow was created, drop it once the peek command is sent.
859        if let Some(index_id) = drop_dataflow {
860            self.remove_compute_ids_from_timeline(vec![(compute_instance, index_id)]);
861            self.drop_compute_collections(vec![(compute_instance, index_id)]);
862        }
863
864        let persist_client = self.persist_client.clone();
865        let peek_stash_read_batch_size_bytes =
866            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
867                .get(self.catalog().system_config().dyncfgs());
868        let peek_stash_read_memory_budget_bytes =
869            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
870                .get(self.catalog().system_config().dyncfgs());
871
872        let peek_response_stream = Self::create_peek_response_stream(
873            rows_rx,
874            finishing,
875            max_result_size,
876            max_returned_query_size,
877            duration_histogram,
878            persist_client,
879            peek_stash_read_batch_size_bytes,
880            peek_stash_read_memory_budget_bytes,
881        );
882
883        Ok(crate::ExecuteResponse::SendingRowsStreaming {
884            rows: Box::pin(peek_response_stream),
885            instance_id: compute_instance,
886            strategy,
887        })
888    }
889
890    /// Creates an async stream that processes peek responses and yields rows.
891    ///
892    /// TODO(peek-seq): Move this out of `coord` once we delete the old peek sequencing.
893    #[mz_ore::instrument(level = "debug")]
894    pub(crate) fn create_peek_response_stream(
895        rows_rx: tokio::sync::oneshot::Receiver<PeekResponse>,
896        finishing: RowSetFinishing,
897        max_result_size: u64,
898        max_returned_query_size: Option<u64>,
899        duration_histogram: prometheus::Histogram,
900        mut persist_client: mz_persist_client::PersistClient,
901        peek_stash_read_batch_size_bytes: usize,
902        peek_stash_read_memory_budget_bytes: usize,
903    ) -> impl futures::Stream<Item = PeekResponseUnary> {
904        async_stream::stream!({
905            let result = rows_rx.await;
906
907            let rows = match result {
908                Ok(rows) => rows,
909                Err(e) => {
910                    yield PeekResponseUnary::Error(e.to_string());
911                    return;
912                }
913            };
914
915            match rows {
916                PeekResponse::Rows(rows) => {
917                    match finishing.finish(
918                        rows,
919                        max_result_size,
920                        max_returned_query_size,
921                        &duration_histogram,
922                    ) {
923                        Ok((rows, _size_bytes)) => yield PeekResponseUnary::Rows(Box::new(rows)),
924                        Err(e) => yield PeekResponseUnary::Error(e),
925                    }
926                }
927                PeekResponse::Stashed(response) => {
928                    let response = *response;
929
930                    let shard_id = response.shard_id;
931
932                    let mut batches = Vec::new();
933                    for proto_batch in response.batches.into_iter() {
934                        let batch =
935                            persist_client.batch_from_transmittable_batch(&shard_id, proto_batch);
936
937                        batches.push(batch);
938                    }
939                    tracing::trace!(?batches, "stashed peek response");
940
941                    let as_of = Antichain::from_elem(mz_repr::Timestamp::default());
942                    let read_schemas: Schemas<SourceData, ()> = Schemas {
943                        id: None,
944                        key: Arc::new(response.relation_desc.clone()),
945                        val: Arc::new(UnitSchema),
946                    };
947
948                    let mut row_cursor = persist_client
949                        .read_batches_consolidated::<_, _, _, i64>(
950                            response.shard_id,
951                            as_of,
952                            read_schemas,
953                            batches,
954                            |_stats| true,
955                            peek_stash_read_memory_budget_bytes,
956                        )
957                        .await
958                        .expect("invalid usage");
959
960                    // NOTE: Using the cursor creates Futures that are not Sync,
961                    // so we can't drive them on the main Coordinator loop.
962                    // Spawning a task has the additional benefit that we get to
963                    // delete batches once we're done.
964                    //
965                    // Batch deletion is best-effort, though, and there are
966                    // multiple known ways in which they can leak, among them:
967                    //
968                    // - ProtoBatch is lost in flight
969                    // - ProtoBatch is lost because when combining PeekResponse
970                    // from workers a cancellation or error "overrides" other
971                    // results, meaning we drop them
972                    // - This task here is not run to completion before it can
973                    // delete all batches
974                    //
975                    // This is semi-ok, because persist needs a reaper of leaked
976                    // batches already, and so we piggy-back on that, even if it
977                    // might not exist as of today.
978                    let (tx, mut rx) = tokio::sync::mpsc::channel(1);
979                    mz_ore::task::spawn(|| "read_peek_batches", async move {
980                        // We always send our inline rows first. Ordering
981                        // doesn't matter because we can only be in this case
982                        // when there is no ORDER BY.
983                        //
984                        // We _could_ write these out as a Batch, and include it
985                        // in the batches we read via the Consolidator. If we
986                        // wanted to get a consistent ordering. That's not
987                        // needed for correctness! But might be nice for more
988                        // aesthetic reasons.
989                        let result = tx.send(response.inline_rows).await;
990                        if result.is_err() {
991                            tracing::debug!("receiver went away");
992                        }
993
994                        let mut current_batch = Vec::new();
995                        let mut current_batch_size: usize = 0;
996
997                        'outer: while let Some(rows) = row_cursor.next().await {
998                            for ((source_data, _val), _ts, diff) in rows {
999                                let row = source_data
1000                                    .0
1001                                    .expect("we are not sending errors on this code path");
1002
1003                                let diff = usize::try_from(diff)
1004                                    .expect("peek responses cannot have negative diffs");
1005
1006                                if diff > 0 {
1007                                    let diff =
1008                                        NonZeroUsize::new(diff).expect("checked to be non-zero");
1009                                    current_batch_size =
1010                                        current_batch_size.saturating_add(row.byte_len());
1011                                    current_batch.push((row, diff));
1012                                }
1013
1014                                if current_batch_size > peek_stash_read_batch_size_bytes {
1015                                    // We're re-encoding the rows as a RowCollection
1016                                    // here, for which we pay in CPU time. We're in a
1017                                    // slow path already, since we're returning a big
1018                                    // stashed result so this is worth the convenience
1019                                    // of that for now.
1020                                    let result = tx
1021                                        .send(RowCollection::new(
1022                                            current_batch.drain(..).collect_vec(),
1023                                            &[],
1024                                        ))
1025                                        .await;
1026                                    if result.is_err() {
1027                                        tracing::debug!("receiver went away");
1028                                        // Don't return but break so we fall out to the
1029                                        // batch delete logic below.
1030                                        break 'outer;
1031                                    }
1032
1033                                    current_batch_size = 0;
1034                                }
1035                            }
1036                        }
1037
1038                        if current_batch.len() > 0 {
1039                            let result = tx.send(RowCollection::new(current_batch, &[])).await;
1040                            if result.is_err() {
1041                                tracing::debug!("receiver went away");
1042                            }
1043                        }
1044
1045                        let batches = row_cursor.into_lease();
1046                        tracing::trace!(?response.shard_id, "cleaning up batches of peek result");
1047                        for batch in batches {
1048                            batch.delete().await;
1049                        }
1050                    });
1051
1052                    assert!(
1053                        finishing.is_streamable(response.relation_desc.arity()),
1054                        "can only get stashed responses when the finishing is streamable"
1055                    );
1056
1057                    tracing::trace!("query result is streamable!");
1058
1059                    assert!(finishing.is_streamable(response.relation_desc.arity()));
1060                    let mut incremental_finishing = RowSetFinishingIncremental::new(
1061                        finishing.offset,
1062                        finishing.limit,
1063                        finishing.project,
1064                        max_returned_query_size,
1065                    );
1066
1067                    let mut got_zero_rows = true;
1068                    while let Some(rows) = rx.recv().await {
1069                        got_zero_rows = false;
1070
1071                        let result_rows = incremental_finishing.finish_incremental(
1072                            rows,
1073                            max_result_size,
1074                            &duration_histogram,
1075                        );
1076
1077                        match result_rows {
1078                            Ok(result_rows) => yield PeekResponseUnary::Rows(Box::new(result_rows)),
1079                            Err(e) => yield PeekResponseUnary::Error(e),
1080                        }
1081                    }
1082
1083                    // Even when there's zero rows, clients still expect an
1084                    // empty PeekResponse.
1085                    if got_zero_rows {
1086                        let row_iter = vec![].into_row_iter();
1087                        yield PeekResponseUnary::Rows(Box::new(row_iter));
1088                    }
1089                }
1090                PeekResponse::Canceled => {
1091                    yield PeekResponseUnary::Canceled;
1092                }
1093                PeekResponse::Error(e) => {
1094                    yield PeekResponseUnary::Error(e);
1095                }
1096            }
1097        })
1098    }
1099
1100    /// Cancel and remove all pending peeks that were initiated by the client with `conn_id`.
1101    #[mz_ore::instrument(level = "debug")]
1102    pub(crate) fn cancel_pending_peeks(&mut self, conn_id: &ConnectionId) {
1103        if let Some(uuids) = self.client_pending_peeks.remove(conn_id) {
1104            self.metrics
1105                .canceled_peeks
1106                .inc_by(u64::cast_from(uuids.len()));
1107
1108            let mut inverse: BTreeMap<ComputeInstanceId, BTreeSet<Uuid>> = Default::default();
1109            for (uuid, compute_instance) in &uuids {
1110                inverse.entry(*compute_instance).or_default().insert(*uuid);
1111            }
1112            for (compute_instance, uuids) in inverse {
1113                // It's possible that this compute instance no longer exists because it was dropped
1114                // while the peek was in progress. In this case we ignore the error and move on
1115                // because the dataflow no longer exists.
1116                // TODO(jkosh44) Dropping a cluster should actively cancel all pending queries.
1117                for uuid in uuids {
1118                    let _ = self.controller.compute.cancel_peek(
1119                        compute_instance,
1120                        uuid,
1121                        PeekResponse::Canceled,
1122                    );
1123                }
1124            }
1125
1126            let peeks = uuids
1127                .iter()
1128                .filter_map(|(uuid, _)| self.pending_peeks.remove(uuid))
1129                .collect::<Vec<_>>();
1130            for peek in peeks {
1131                self.retire_execution(
1132                    StatementEndedExecutionReason::Canceled,
1133                    peek.ctx_extra.defuse(),
1134                );
1135            }
1136        }
1137    }
1138
1139    /// Handle a peek notification and retire the corresponding execution. Does nothing for
1140    /// already-removed peeks.
1141    pub(crate) fn handle_peek_notification(
1142        &mut self,
1143        uuid: Uuid,
1144        notification: PeekNotification,
1145        otel_ctx: OpenTelemetryContext,
1146    ) {
1147        // We expect exactly one peek response, which we forward. Then we clean up the
1148        // peek's state in the coordinator.
1149        if let Some(PendingPeek {
1150            conn_id: _,
1151            cluster_id: _,
1152            depends_on: _,
1153            ctx_extra,
1154            is_fast_path,
1155        }) = self.remove_pending_peek(&uuid)
1156        {
1157            let reason = match notification {
1158                PeekNotification::Success {
1159                    rows: num_rows,
1160                    result_size,
1161                } => {
1162                    let strategy = if is_fast_path {
1163                        StatementExecutionStrategy::FastPath
1164                    } else {
1165                        StatementExecutionStrategy::Standard
1166                    };
1167                    StatementEndedExecutionReason::Success {
1168                        result_size: Some(result_size),
1169                        rows_returned: Some(num_rows),
1170                        execution_strategy: Some(strategy),
1171                    }
1172                }
1173                PeekNotification::Error(error) => StatementEndedExecutionReason::Errored { error },
1174                PeekNotification::Canceled => StatementEndedExecutionReason::Canceled,
1175            };
1176            otel_ctx.attach_as_parent();
1177            self.retire_execution(reason, ctx_extra.defuse());
1178        }
1179        // Cancellation may cause us to receive responses for peeks no
1180        // longer in `self.pending_peeks`, so we quietly ignore them.
1181    }
1182
1183    /// Clean up a peek's state.
1184    pub(crate) fn remove_pending_peek(&mut self, uuid: &Uuid) -> Option<PendingPeek> {
1185        let pending_peek = self.pending_peeks.remove(uuid);
1186        if let Some(pending_peek) = &pending_peek {
1187            let uuids = self
1188                .client_pending_peeks
1189                .get_mut(&pending_peek.conn_id)
1190                .expect("coord peek state is inconsistent");
1191            uuids.remove(uuid);
1192            if uuids.is_empty() {
1193                self.client_pending_peeks.remove(&pending_peek.conn_id);
1194            }
1195        }
1196        pending_peek
1197    }
1198
1199    /// Implements a slow-path peek by creating a transient dataflow.
1200    /// This is called from the command handler for ExecuteSlowPathPeek.
1201    ///
1202    /// (For now, this method simply delegates to implement_peek_plan by constructing
1203    /// the necessary PlannedPeek structure.)
1204    pub(crate) async fn implement_slow_path_peek(
1205        &mut self,
1206        dataflow_plan: PeekDataflowPlan<mz_repr::Timestamp>,
1207        determination: TimestampDetermination<mz_repr::Timestamp>,
1208        finishing: RowSetFinishing,
1209        compute_instance: ComputeInstanceId,
1210        target_replica: Option<ReplicaId>,
1211        intermediate_result_type: SqlRelationType,
1212        source_ids: BTreeSet<GlobalId>,
1213        conn_id: ConnectionId,
1214        max_result_size: u64,
1215        max_query_result_size: Option<u64>,
1216        watch_set: Option<WatchSetCreation>,
1217    ) -> Result<ExecuteResponse, AdapterError> {
1218        // Install watch sets for statement lifecycle logging if enabled.
1219        // This must happen _before_ creating ExecuteContextExtra, so that if it fails,
1220        // we don't have an ExecuteContextExtra that needs to be retired (the frontend
1221        // will handle logging for the error case).
1222        let statement_logging_id = watch_set.as_ref().map(|ws| ws.logging_id);
1223        if let Some(ws) = watch_set {
1224            self.install_peek_watch_sets(conn_id.clone(), ws)
1225                .map_err(|e| {
1226                    AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
1227                        e,
1228                        compute_instance,
1229                    )
1230                })?;
1231        }
1232
1233        let source_arity = intermediate_result_type.arity();
1234
1235        let planned_peek = PlannedPeek {
1236            plan: PeekPlan::SlowPath(dataflow_plan),
1237            determination,
1238            conn_id,
1239            intermediate_result_type,
1240            source_arity,
1241            source_ids,
1242        };
1243
1244        // Call the old peek sequencing's implement_peek_plan for now.
1245        // TODO(peek-seq): After the old peek sequencing is completely removed, we should merge the
1246        // relevant parts of the old `implement_peek_plan` into this method, and remove the old
1247        // `implement_peek_plan`.
1248        self.implement_peek_plan(
1249            &mut ExecuteContextGuard::new(statement_logging_id, self.internal_cmd_tx.clone()),
1250            planned_peek,
1251            finishing,
1252            compute_instance,
1253            target_replica,
1254            max_result_size,
1255            max_query_result_size,
1256        )
1257        .await
1258    }
1259
1260    /// Implements a `COPY TO` command by installing peek watch sets,
1261    /// shipping the dataflow, and spawning a background task to wait for completion.
1262    /// This is called from the command handler for ExecuteCopyTo.
1263    ///
1264    /// (The S3 preflight check must be completed successfully via the
1265    /// `CopyToPreflight` command _before_ calling this method. The preflight is
1266    /// handled separately to avoid blocking the coordinator's main task with
1267    /// slow S3 network operations.)
1268    ///
1269    /// This method does NOT block waiting for completion. Instead, it spawns a background task that
1270    /// will send the response through the provided tx channel when the COPY TO completes.
1271    /// All errors (setup or execution) are sent through tx.
1272    pub(crate) async fn implement_copy_to(
1273        &mut self,
1274        df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
1275        compute_instance: ComputeInstanceId,
1276        target_replica: Option<ReplicaId>,
1277        source_ids: BTreeSet<GlobalId>,
1278        conn_id: ConnectionId,
1279        watch_set: Option<WatchSetCreation>,
1280        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
1281    ) {
1282        // Helper to send error and return early
1283        let send_err = |tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
1284                        e: AdapterError| {
1285            let _ = tx.send(Err(e));
1286        };
1287
1288        // Install watch sets for statement lifecycle logging if enabled.
1289        // If this fails, we just send the error back. The frontend will handle logging
1290        // for the error case (no ExecuteContextExtra is created here).
1291        if let Some(ws) = watch_set {
1292            if let Err(e) = self.install_peek_watch_sets(conn_id.clone(), ws) {
1293                let err = AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
1294                    e,
1295                    compute_instance,
1296                );
1297                send_err(tx, err);
1298                return;
1299            }
1300        }
1301
1302        // Note: We don't create an ExecuteContextExtra here because the frontend handles
1303        // all statement logging for COPY TO operations.
1304
1305        let sink_id = df_desc.sink_id();
1306
1307        // Create and register ActiveCopyTo.
1308        // Note: sink_tx/sink_rx is the channel for the compute sink to notify completion
1309        // This is different from the command's tx which sends the response to the client
1310        let (sink_tx, sink_rx) = oneshot::channel();
1311        let active_copy_to = ActiveCopyTo {
1312            conn_id: conn_id.clone(),
1313            tx: sink_tx,
1314            cluster_id: compute_instance,
1315            depends_on: source_ids,
1316        };
1317
1318        // Add metadata for the new COPY TO. CopyTo returns a `ready` future, so it is safe to drop.
1319        drop(
1320            self.add_active_compute_sink(sink_id, ActiveComputeSink::CopyTo(active_copy_to))
1321                .await,
1322        );
1323
1324        // Try to ship the dataflow. We handle errors gracefully because dependencies might have
1325        // disappeared during sequencing.
1326        if let Err(e) = self
1327            .try_ship_dataflow(df_desc, compute_instance, target_replica)
1328            .await
1329            .map_err(AdapterError::concurrent_dependency_drop_from_dataflow_creation_error)
1330        {
1331            // Clean up the active compute sink that was added above, since the dataflow was never
1332            // created. If we don't do this, the sink_id remains in drop_sinks but no collection
1333            // exists in the compute controller, causing a panic when the connection terminates.
1334            self.remove_active_compute_sink(sink_id).await;
1335            send_err(tx, e);
1336            return;
1337        }
1338
1339        // Spawn background task to wait for completion
1340        // We must NOT await sink_rx here directly, as that would block the coordinator's main task
1341        // from processing the completion message. Instead, we spawn a background task that will
1342        // send the result through tx when the COPY TO completes.
1343        let span = Span::current();
1344        task::spawn(
1345            || "copy to completion",
1346            async move {
1347                let res = sink_rx.await;
1348                let result = match res {
1349                    Ok(res) => res,
1350                    Err(_) => Err(AdapterError::Internal("copy to sender dropped".into())),
1351                };
1352
1353                let _ = tx.send(result);
1354            }
1355            .instrument(span),
1356        );
1357    }
1358
1359    /// Constructs an [`ExecuteResponse`] that that will send some rows to the
1360    /// client immediately, as opposed to asking the dataflow layer to send along
1361    /// the rows after some computation.
1362    pub(crate) fn send_immediate_rows<I>(rows: I) -> ExecuteResponse
1363    where
1364        I: IntoRowIterator,
1365        I::Iter: Send + Sync + 'static,
1366    {
1367        let rows = Box::new(rows.into_row_iter());
1368        ExecuteResponse::SendingRowsImmediate { rows }
1369    }
1370}
1371
#[cfg(test)]
mod tests {
    use mz_expr::func::IsNull;
    use mz_expr::{MapFilterProject, UnaryFunc};
    use mz_ore::str::Indent;
    use mz_repr::explain::text::text_string_at;
    use mz_repr::explain::{DummyHumanizer, ExplainConfig, PlanRenderingContext};
    use mz_repr::{Datum, SqlColumnType, SqlScalarType};

    use super::*;

    /// Pins the text rendering produced by `text_string_at` for several
    /// [`FastPathPlan`] values: an error constant, an index read without a
    /// lookup key, an index read with a lookup key, a short constant row set,
    /// and a longer constant row set.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
    fn test_fast_path_plan_as_text() {
        // Every assertion renders with a fresh context built by `ctx_gen`.
        let humanizer = DummyHumanizer;
        let config = ExplainConfig {
            redacted: false,
            verbose_syntax: true,
            ..Default::default()
        };
        let ctx_gen = || {
            PlanRenderingContext::<FastPathPlan>::new(
                Indent::default(),
                &humanizer,
                BTreeMap::new(),
                &config,
            )
        };

        // Single non-nullable string column, shared by all constant plans.
        let string_col = SqlRelationType::new(vec![SqlColumnType {
            scalar_type: SqlScalarType::String,
            nullable: false,
        }]);

        // Map + project over four input columns.
        let map_project = MapFilterProject::new(4)
            .map(Some(MirScalarExpr::column(0).or(MirScalarExpr::column(2))))
            .project([1, 4])
            .into_plan()
            .expect("invalid plan")
            .into_nontemporal()
            .expect("invalid nontemporal");
        // `IS NULL` filter over three input columns.
        let null_filter = MapFilterProject::new(3)
            .filter(Some(
                MirScalarExpr::column(0).call_unary(UnaryFunc::IsNull(IsNull)),
            ))
            .into_plan()
            .expect("invalid plan")
            .into_nontemporal()
            .expect("invalid nontemporal");

        // Each entry pairs a plan with the exact text it must render to.
        let cases = [
            (
                FastPathPlan::Constant(Err(EvalError::DivisionByZero), string_col.clone()),
                "Error \"division by zero\"\n",
            ),
            (
                FastPathPlan::PeekExisting(
                    GlobalId::User(8),
                    GlobalId::User(10),
                    None,
                    map_project,
                ),
                "Project (#1, #4)\n  Map ((#0 OR #2))\n    ReadIndex on=u8 [DELETED INDEX]=[*** full scan ***]\n",
            ),
            (
                FastPathPlan::PeekExisting(
                    GlobalId::User(9),
                    GlobalId::User(11),
                    Some(vec![Row::pack(Some(Datum::Int32(5)))]),
                    null_filter,
                ),
                "Filter (#0) IS NULL\n  ReadIndex on=u9 [DELETED INDEX]=[lookup value=(5)]\n",
            ),
        ];
        for (plan, expected) in &cases {
            assert_eq!(text_string_at(plan, ctx_gen), *expected);
        }

        // Three constant rows are listed in full.
        let mut rows = vec![
            (Row::pack(Some(Datum::String("hello"))), Diff::ONE),
            (Row::pack(Some(Datum::String("world"))), 2.into()),
            (Row::pack(Some(Datum::String("star"))), 500.into()),
        ];
        let few_rows = FastPathPlan::Constant(Ok(rows.clone()), string_col.clone());
        assert_eq!(
            text_string_at(&few_rows, ctx_gen),
            "Constant\n  - (\"hello\")\n  - ((\"world\") x 2)\n  - ((\"star\") x 500)\n"
        );

        // With twenty more rows appended, the expected output is a total count
        // followed by the first rows only.
        for i in 0..20 {
            rows.push((Row::pack(Some(Datum::String(&i.to_string()))), Diff::ONE));
        }
        let many_rows = FastPathPlan::Constant(Ok(rows), string_col);
        let many_rows_exp = "Constant\n  total_rows (diffs absed): 523\n  first_rows:\n    - (\"hello\")\
        \n    - ((\"world\") x 2)\n    - ((\"star\") x 500)\n    - (\"0\")\n    - (\"1\")\
        \n    - (\"2\")\n    - (\"3\")\n    - (\"4\")\n    - (\"5\")\n    - (\"6\")\
        \n    - (\"7\")\n    - (\"8\")\n    - (\"9\")\n    - (\"10\")\n    - (\"11\")\
        \n    - (\"12\")\n    - (\"13\")\n    - (\"14\")\n    - (\"15\")\n    - (\"16\")\n";
        assert_eq!(text_string_at(&many_rows, ctx_gen), many_rows_exp);
    }
}