mz_adapter/coord/peek.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logic and types for creating, executing, and tracking peeks.
//!
//! This module determines if a dataflow can be short-cut by returning constant values
//! or by reading out of existing arrangements, and implements the appropriate plan.
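//!
//! The short-cut outcomes are captured by [`FastPathPlan`]: a `Constant`
//! result returned directly, a `PeekExisting` served from an arrangement, or
//! a `PeekPersist` read straight from a persist shard.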

use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::num::NonZeroUsize;
use std::ops::Deref;
use std::sync::Arc;

use differential_dataflow::consolidation::consolidate;
use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_cluster_client::ReplicaId;
use mz_compute_client::controller::PeekNotification;
use mz_compute_client::protocol::command::PeekTarget;
use mz_compute_client::protocol::response::PeekResponse;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::{DataflowDescription, IndexImport};
use mz_compute_types::sinks::ComputeSinkConnection;
use mz_controller_types::ClusterId;
use mz_expr::explain::{HumanizedExplain, HumanizerMode, fmt_text_constant_rows};
use mz_expr::row::RowCollection;
use mz_expr::{
    EvalError, Id, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing,
    RowSetFinishingIncremental, permutation_for_arrangement,
};
use mz_ore::cast::CastFrom;
use mz_ore::str::{StrExt, separated};
use mz_ore::task;
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_client::Schemas;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::explain::text::DisplayText;
use mz_repr::explain::{CompactScalars, IndexUsageType, PlanRenderingContext, UsedIndexes};
use mz_repr::{
    Diff, GlobalId, IntoRowIterator, RelationDesc, Row, RowIterator, SqlRelationType,
    preserves_order,
};
use mz_storage_types::sources::SourceData;
use serde::{Deserialize, Serialize};
use timely::progress::{Antichain, Timestamp};
use tokio::sync::oneshot;
use tracing::{Instrument, Span};
use uuid::Uuid;

use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
use crate::coord::timestamp_selection::TimestampDetermination;
use crate::optimize::OptimizerError;
use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
use crate::util::ResultExt;
use crate::{AdapterError, ExecuteContextExtra, ExecuteResponse};

/// A peek is a request to read data from a maintained arrangement.
#[derive(Debug)]
pub(crate) struct PendingPeek {
    /// The connection that initiated the peek.
    pub(crate) conn_id: ConnectionId,
    /// The cluster that the peek is being executed on.
    pub(crate) cluster_id: ClusterId,
    /// All `GlobalId`s that the peek depends on.
    pub(crate) depends_on: BTreeSet<GlobalId>,
    /// Context about the execute that produced this peek,
    /// needed by the coordinator for retiring it.
    pub(crate) ctx_extra: ExecuteContextExtra,
    /// Is this a fast-path peek, i.e. one that doesn't require a dataflow?
    pub(crate) is_fast_path: bool,
}

/// The response from a `Peek`, with row multiplicities represented in unary.
///
/// Note that each `Peek` expects to generate exactly one `PeekResponse`, i.e.
/// we expect a 1:1 contract between `Peek` and `PeekResponseUnary`.
#[derive(Debug)]
pub enum PeekResponseUnary {
    Rows(Box<dyn RowIterator + Send + Sync>),
    Error(String),
    Canceled,
}

#[derive(Clone, Debug)]
pub struct PeekDataflowPlan<T = mz_repr::Timestamp> {
    pub(crate) desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
    pub(crate) id: GlobalId,
    key: Vec<MirScalarExpr>,
    permutation: Vec<usize>,
    thinned_arity: usize,
}

impl<T> PeekDataflowPlan<T> {
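    /// Builds a `PeekDataflowPlan` for a transient dataflow whose result is
    /// arranged by the relation's default key.
    ///
    /// A sketch of the bookkeeping (illustrative values): for a three-column
    /// relation whose default key is column `#0`, the arrangement key is
    /// `[#0]`, the thinned payload holds the remaining two columns, and
    /// `permutation` maps logical column positions back into the
    /// (key, payload) layout computed by `permutation_for_arrangement`.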
    pub fn new(
        desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
        id: GlobalId,
        typ: &SqlRelationType,
    ) -> Self {
        let arity = typ.arity();
        let key = typ
            .default_key()
            .into_iter()
            .map(MirScalarExpr::column)
            .collect::<Vec<_>>();
        let (permutation, thinning) = permutation_for_arrangement(&key, arity);
        Self {
            desc,
            id,
            key,
            permutation,
            thinned_arity: thinning.len(),
        }
    }
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd)]
pub enum FastPathPlan {
    /// The view evaluates to a constant result that can be returned.
    ///
    /// The [SqlRelationType] is unnecessary for evaluating the constant result but
    /// may be helpful when printing out an explanation.
    Constant(Result<Vec<(Row, Diff)>, EvalError>, SqlRelationType),
    /// The view can be read out of an existing arrangement.
    /// (coll_id, idx_id, values to look up, mfp to apply)
    PeekExisting(GlobalId, GlobalId, Option<Vec<Row>>, mz_expr::SafeMfpPlan),
    /// The view can be read directly out of Persist.
    PeekPersist(GlobalId, Option<Row>, mz_expr::SafeMfpPlan),
}

impl<'a, T: 'a> DisplayText<PlanRenderingContext<'a, T>> for FastPathPlan {
    fn fmt_text(
        &self,
        f: &mut fmt::Formatter<'_>,
        ctx: &mut PlanRenderingContext<'a, T>,
    ) -> fmt::Result {
        if ctx.config.verbose_syntax {
            self.fmt_verbose_text(f, ctx)
        } else {
            self.fmt_default_text(f, ctx)
        }
    }
}

impl FastPathPlan {
    pub fn fmt_default_text<'a, T>(
        &self,
        f: &mut fmt::Formatter<'_>,
        ctx: &mut PlanRenderingContext<'a, T>,
    ) -> fmt::Result {
        let mode = HumanizedExplain::new(ctx.config.redacted);

        match self {
            FastPathPlan::Constant(rows, _) => {
                write!(f, "{}→Constant ", ctx.indent)?;

                match rows {
                    Ok(rows) => writeln!(f, "({} rows)", rows.len())?,
                    Err(err) => {
                        if mode.redacted() {
                            writeln!(f, "(error: █)")?;
                        } else {
                            writeln!(f, "(error: {})", err.to_string().quoted())?;
                        }
                    }
                }
            }
            FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
                let coll = ctx
                    .humanizer
                    .humanize_id(*coll_id)
                    .unwrap_or_else(|| coll_id.to_string());
                let idx = ctx
                    .humanizer
                    .humanize_id(*idx_id)
                    .unwrap_or_else(|| idx_id.to_string());
                writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
                ctx.indent.set();

                ctx.indent += 1;

                mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
                let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();

                if printed {
                    ctx.indent += 1;
                }
                if let Some(literal_constraints) = literal_constraints {
                    writeln!(f, "{}→Index Lookup on {coll} (using {idx})", ctx.indent)?;
                    ctx.indent += 1;
                    let values = separated("; ", mode.seq(literal_constraints, None));
                    writeln!(f, "{}Lookup values: {values}", ctx.indent)?;
                } else {
                    writeln!(f, "{}→Indexed {coll} (using {idx})", ctx.indent)?;
                }

                ctx.indent.reset();
            }
            FastPathPlan::PeekPersist(global_id, literal_constraint, mfp) => {
                let coll = ctx
                    .humanizer
                    .humanize_id(*global_id)
                    .unwrap_or_else(|| global_id.to_string());
                writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
                ctx.indent.set();

                ctx.indent += 1;

                mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
                let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();

                if printed {
                    ctx.indent += 1;
                }
                if let Some(literal_constraint) = literal_constraint {
                    writeln!(f, "{}→Index Lookup on {coll} (from storage)", ctx.indent)?;
                    ctx.indent += 1;
                    let value = mode.expr(literal_constraint, None);
                    writeln!(f, "{}Lookup value: {value}", ctx.indent)?;
                } else {
                    writeln!(f, "{}→Indexed {coll} (from storage)", ctx.indent)?;
                }

                ctx.indent.reset();
            }
        }

        Ok(())
    }

    pub fn fmt_verbose_text<'a, T>(
        &self,
        f: &mut fmt::Formatter<'_>,
        ctx: &mut PlanRenderingContext<'a, T>,
    ) -> fmt::Result {
        let redacted = ctx.config.redacted;
        let mode = HumanizedExplain::new(redacted);

        // TODO(aalexandrov): factor out common PeekExisting and PeekPersist
        // code.
        match self {
            FastPathPlan::Constant(Ok(rows), _) => {
                if !rows.is_empty() {
                    writeln!(f, "{}Constant", ctx.indent)?;
                    *ctx.as_mut() += 1;
                    fmt_text_constant_rows(
                        f,
                        rows.iter().map(|(row, diff)| (row, diff)),
                        ctx.as_mut(),
                        redacted,
                    )?;
                    *ctx.as_mut() -= 1;
                } else {
                    writeln!(f, "{}Constant <empty>", ctx.as_mut())?;
                }
                Ok(())
            }
            FastPathPlan::Constant(Err(err), _) => {
                if redacted {
                    writeln!(f, "{}Error █", ctx.as_mut())
                } else {
                    writeln!(f, "{}Error {}", ctx.as_mut(), err.to_string().escaped())
                }
            }
            FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
                ctx.as_mut().set();
                let (map, filter, project) = mfp.as_map_filter_project();

                let cols = if !ctx.config.humanized_exprs {
                    None
                } else if let Some(cols) = ctx.humanizer.column_names_for_id(*idx_id) {
                    // FIXME: account for thinning and permutation
                    // See mz_expr::permutation_for_arrangement
                    // See permute_oneshot_mfp_around_index
                    let cols = itertools::chain(
                        cols.iter().cloned(),
                        std::iter::repeat(String::new()).take(map.len()),
                    )
                    .collect();
                    Some(cols)
                } else {
                    None
                };

                if project.len() != mfp.input_arity + map.len()
                    || !project.iter().enumerate().all(|(i, o)| i == *o)
                {
                    let outputs = mode.seq(&project, cols.as_ref());
                    let outputs = CompactScalars(outputs);
                    writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
                    *ctx.as_mut() += 1;
                }
                if !filter.is_empty() {
                    let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
                    writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
                    *ctx.as_mut() += 1;
                }
                if !map.is_empty() {
                    let scalars = mode.seq(&map, cols.as_ref());
                    let scalars = CompactScalars(scalars);
                    writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
                    *ctx.as_mut() += 1;
                }
                MirRelationExpr::fmt_indexed_filter(
                    f,
                    ctx,
                    coll_id,
                    idx_id,
                    literal_constraints.clone(),
                    None,
                )?;
                writeln!(f)?;
                ctx.as_mut().reset();
                Ok(())
            }
            FastPathPlan::PeekPersist(gid, literal_constraint, mfp) => {
                ctx.as_mut().set();
                let (map, filter, project) = mfp.as_map_filter_project();

                let cols = if !ctx.config.humanized_exprs {
                    None
                } else if let Some(cols) = ctx.humanizer.column_names_for_id(*gid) {
                    let cols = itertools::chain(
                        cols.iter().cloned(),
                        std::iter::repeat(String::new()).take(map.len()),
                    )
                    .collect::<Vec<_>>();
                    Some(cols)
                } else {
                    None
                };

                if project.len() != mfp.input_arity + map.len()
                    || !project.iter().enumerate().all(|(i, o)| i == *o)
                {
                    let outputs = mode.seq(&project, cols.as_ref());
                    let outputs = CompactScalars(outputs);
                    writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
                    *ctx.as_mut() += 1;
                }
                if !filter.is_empty() {
                    let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
                    writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
                    *ctx.as_mut() += 1;
                }
                if !map.is_empty() {
                    let scalars = mode.seq(&map, cols.as_ref());
                    let scalars = CompactScalars(scalars);
                    writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
                    *ctx.as_mut() += 1;
                }
                let human_id = ctx
                    .humanizer
                    .humanize_id(*gid)
                    .unwrap_or_else(|| gid.to_string());
                write!(f, "{}PeekPersist {human_id}", ctx.as_mut())?;
                if let Some(literal) = literal_constraint {
                    let value = mode.expr(literal, None);
                    writeln!(f, " [value={}]", value)?;
                } else {
                    writeln!(f)?;
                }
                ctx.as_mut().reset();
                Ok(())
            }
        }?;
        Ok(())
    }
}

#[derive(Debug)]
pub struct PlannedPeek {
    pub plan: PeekPlan,
    pub determination: TimestampDetermination<mz_repr::Timestamp>,
    pub conn_id: ConnectionId,
    /// The result type _after_ reading out of the "source" and applying any
    /// [MapFilterProject](mz_expr::MapFilterProject), but _before_ applying a
    /// [RowSetFinishing].
    ///
    /// This is _the_ `result_type` as far as compute is concerned and further
    /// changes through projections happen purely in the adapter.
    pub intermediate_result_type: SqlRelationType,
    pub source_arity: usize,
    pub source_ids: BTreeSet<GlobalId>,
}

/// Possible ways in which the coordinator could produce the result for a goal view.
#[derive(Clone, Debug)]
pub enum PeekPlan<T = mz_repr::Timestamp> {
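    /// The result can be produced without installing a dataflow.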
    FastPath(FastPathPlan),
    /// The view must be installed as a dataflow and then read.
    SlowPath(PeekDataflowPlan<T>),
}

/// Convert `mfp` to an executable, non-temporal plan.
/// It should be non-temporal, as OneShot preparation populates `mz_now`.
///
/// If the `mfp` can't be converted into a non-temporal plan, this returns an _internal_ error.
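///
/// A minimal sketch of the expected behavior (hypothetical input, not a real
/// call site):
///
/// ```ignore
/// // An identity MFP over two columns contains no temporal predicates, so
/// // converting it to a `SafeMfpPlan` should succeed.
/// let mfp = mz_expr::MapFilterProject::new(2);
/// let safe: mz_expr::SafeMfpPlan = mfp_to_safe_plan(mfp)?;
/// ```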
fn mfp_to_safe_plan(
    mfp: mz_expr::MapFilterProject,
) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
    mfp.into_plan()
        .map_err(OptimizerError::InternalUnsafeMfpPlan)?
        .into_nontemporal()
        .map_err(|e| OptimizerError::InternalUnsafeMfpPlan(format!("{:?}", e)))
}

/// If it can't convert `mfp` into a `SafeMfpPlan`, this returns an _internal_ error.
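/// The one-shot `mfp` is permuted to read from an arrangement keyed by `key`,
/// i.e. its input columns are remapped to the (key, thinned payload) layout
/// computed by `permutation_for_arrangement`.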
fn permute_oneshot_mfp_around_index(
    mfp: mz_expr::MapFilterProject,
    key: &[MirScalarExpr],
) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
    let input_arity = mfp.input_arity;
    let mut safe_mfp = mfp_to_safe_plan(mfp)?;
    let (permute, thinning) = permutation_for_arrangement(key, input_arity);
    safe_mfp.permute_fn(|c| permute[c], key.len() + thinning.len());
    Ok(safe_mfp)
}

/// Determine if the dataflow plan can be implemented without an actual dataflow.
///
/// If the optimized plan is a `Constant` or a `Get` of a maintained arrangement,
/// we can avoid building a dataflow (and either just return the results, or peek
/// out of the arrangement, respectively).
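///
/// A sketch of how a caller might dispatch on the result (the surrounding
/// names and parameter values here are illustrative, not the real call sites):
///
/// ```ignore
/// match create_fast_path_plan(&mut dataflow_plan, view_id, Some(&finishing), 25, false)? {
///     Some(FastPathPlan::Constant(rows, _)) => { /* return rows immediately */ }
///     Some(FastPathPlan::PeekExisting(..)) => { /* peek the existing arrangement */ }
///     Some(FastPathPlan::PeekPersist(..)) => { /* read the persist shard directly */ }
///     None => { /* fall back to shipping a transient dataflow */ }
/// }
/// ```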
pub fn create_fast_path_plan<T: Timestamp>(
    dataflow_plan: &mut DataflowDescription<OptimizedMirRelationExpr, (), T>,
    view_id: GlobalId,
    finishing: Option<&RowSetFinishing>,
    persist_fast_path_limit: usize,
    persist_fast_path_order: bool,
) -> Result<Option<FastPathPlan>, OptimizerError> {
    // At this point, `dataflow_plan` contains our best optimized dataflow.
    // We will check the plan to see if there is a fast path to escape full dataflow construction.

    // We need to restrict ourselves to settings where the inserted transient view is the first thing
    // to build (no dependent views). There is likely an index to build as well, but we cannot be sure.
    if dataflow_plan.objects_to_build.len() >= 1 && dataflow_plan.objects_to_build[0].id == view_id
    {
        let mut mir = &*dataflow_plan.objects_to_build[0].plan.as_inner_mut();
        if let Some((rows, found_typ)) = mir.as_const() {
            // In the case of a constant, we can return the result now.
            return Ok(Some(FastPathPlan::Constant(
                rows.clone()
                    .map(|rows| rows.into_iter().map(|(row, diff)| (row, diff)).collect()),
                found_typ.clone(),
            )));
        } else {
            // If there is a TopK that would be completely covered by the finishing, then jump
            // through the TopK.
            if let MirRelationExpr::TopK {
                input,
                group_key,
                order_key,
                limit,
                offset,
                monotonic: _,
                expected_group_size: _,
            } = mir
            {
                if let Some(finishing) = finishing {
                    if group_key.is_empty() && *order_key == finishing.order_by && *offset == 0 {
                        // The following is roughly `limit >= finishing.limit`, but with Options.
                        let finishing_limits_at_least_as_topk = match (limit, finishing.limit) {
                            (None, _) => true,
                            (Some(..), None) => false,
                            (Some(topk_limit), Some(finishing_limit)) => {
                                if let Some(l) = topk_limit.as_literal_int64() {
                                    l >= *finishing_limit
                                } else {
                                    false
                                }
                            }
                        };
                        if finishing_limits_at_least_as_topk {
                            mir = input;
                        }
                    }
                }
            }
            // In the case of a linear operator around an indexed view, we
            // can skip creating a dataflow and instead pull all the rows in
            // the index and apply the linear operator against them.
            let (mfp, mir) = mz_expr::MapFilterProject::extract_from_expression(mir);
            match mir {
                MirRelationExpr::Get {
                    id: Id::Global(get_id),
                    typ: relation_typ,
                    ..
                } => {
                    // Just grab any arrangement, if one exists.
                    for (index_id, IndexImport { desc, .. }) in dataflow_plan.index_imports.iter() {
                        if desc.on_id == *get_id {
                            return Ok(Some(FastPathPlan::PeekExisting(
                                *get_id,
                                *index_id,
                                None,
                                permute_oneshot_mfp_around_index(mfp, &desc.key)?,
                            )));
                        }
                    }

                    // If there is no arrangement, consider peeking the persist shard directly.
                    // Generally, we consider a persist peek when the query can definitely be satisfied
                    // by scanning through a small, constant number of Persist key-values.
                    let safe_mfp = mfp_to_safe_plan(mfp)?;
                    let (_maps, filters, projection) = safe_mfp.as_map_filter_project();

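                    // Pack a row of literal equality constraints on a prefix
                    // of the leading columns, stopping at the first column
                    // whose type does not preserve order or that has no
                    // `col = literal` filter. This prefix drives both the
                    // lookup value and the key-coverage check below.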
                    let literal_constraint = if persist_fast_path_order {
                        let mut row = Row::default();
                        let mut packer = row.packer();
                        for (idx, col) in relation_typ.column_types.iter().enumerate() {
                            if !preserves_order(&col.scalar_type) {
                                break;
                            }
                            let col_expr = MirScalarExpr::column(idx);

                            let Some((literal, _)) = filters
                                .iter()
                                .filter_map(|f| f.expr_eq_literal(&col_expr))
                                .next()
                            else {
                                break;
                            };
                            packer.extend_by_row(&literal);
                        }
                        if row.is_empty() { None } else { Some(row) }
                    } else {
                        None
                    };

                    let finish_ok = match &finishing {
                        None => false,
                        Some(RowSetFinishing {
                            order_by,
                            limit,
                            offset,
                            ..
                        }) => {
                            let order_ok = if persist_fast_path_order {
                                order_by.iter().enumerate().all(|(idx, order)| {
                                    // Map the ordering column back to the column in the source data.
                                    // (If it's not one of the input columns, we can't make any guarantees.)
                                    let column_idx = projection[order.column];
                                    if column_idx >= safe_mfp.input_arity {
                                        return false;
                                    }
                                    let column_type = &relation_typ.column_types[column_idx];
                                    let index_ok = idx == column_idx;
                                    let nulls_ok = !column_type.nullable || order.nulls_last;
                                    let asc_ok = !order.desc;
                                    let type_ok = preserves_order(&column_type.scalar_type);
                                    index_ok && nulls_ok && asc_ok && type_ok
                                })
                            } else {
                                order_by.is_empty()
                            };
                            let limit_ok = limit.map_or(false, |l| {
                                usize::cast_from(l) + *offset < persist_fast_path_limit
                            });
                            order_ok && limit_ok
                        }
                    };

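                    // Check whether the literal prefix covers some unique key
                    // of the relation; if it does, the peek returns at most
                    // one value per lookup.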
                    let key_constraint = if let Some(literal) = &literal_constraint {
                        let prefix_len = literal.iter().count();
                        relation_typ
                            .keys
                            .iter()
                            .any(|k| k.iter().all(|idx| *idx < prefix_len))
                    } else {
                        false
                    };

                    // We can generate a persist peek when:
                    // - We have a literal constraint that includes an entire key (so we'll return at most one value)
                    // - We can return the first N key values (no filters, small limit, consistent order)
                    if key_constraint || (filters.is_empty() && finish_ok) {
                        return Ok(Some(FastPathPlan::PeekPersist(
                            *get_id,
                            literal_constraint,
                            safe_mfp,
                        )));
                    }
                }
                MirRelationExpr::Join { implementation, .. } => {
                    if let mz_expr::JoinImplementation::IndexedFilter(coll_id, idx_id, key, vals) =
                        implementation
                    {
                        return Ok(Some(FastPathPlan::PeekExisting(
                            *coll_id,
                            *idx_id,
                            Some(vals.clone()),
                            permute_oneshot_mfp_around_index(mfp, key)?,
                        )));
                    }
                }
                // nothing can be done for non-trivial expressions.
                _ => {}
            }
        }
    }
    Ok(None)
}

impl FastPathPlan {
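    /// Reports which indexes this plan uses and how (`Lookup`,
    /// `FastPathLimit`, or `FullScan`), for use in `EXPLAIN` output.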
    pub fn used_indexes(&self, finishing: Option<&RowSetFinishing>) -> UsedIndexes {
        match self {
            FastPathPlan::Constant(..) => UsedIndexes::default(),
            FastPathPlan::PeekExisting(_coll_id, idx_id, literal_constraints, _mfp) => {
                if literal_constraints.is_some() {
                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::Lookup(*idx_id)])].into())
                } else if finishing.map_or(false, |f| f.limit.is_some() && f.order_by.is_empty()) {
                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::FastPathLimit])].into())
                } else {
                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::FullScan])].into())
                }
            }
            FastPathPlan::PeekPersist(..) => UsedIndexes::default(),
        }
    }
}

impl crate::coord::Coordinator {
    /// Implements a peek plan: either a [`FastPathPlan`] produced by
    /// `create_fast_path_plan` above, or a slow-path [`PeekDataflowPlan`].
    #[mz_ore::instrument(level = "debug")]
    pub async fn implement_peek_plan(
        &mut self,
        ctx_extra: &mut ExecuteContextExtra,
        plan: PlannedPeek,
        finishing: RowSetFinishing,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        max_result_size: u64,
        max_returned_query_size: Option<u64>,
    ) -> Result<ExecuteResponse, AdapterError> {
        let PlannedPeek {
            plan: fast_path,
            determination,
            conn_id,
            intermediate_result_type,
            source_arity,
            source_ids,
        } = plan;

        // If the dataflow optimizes to a constant expression, we can immediately return the result.
        if let PeekPlan::FastPath(FastPathPlan::Constant(rows, _)) = fast_path {
            let mut rows = match rows {
                Ok(rows) => rows,
                Err(e) => return Err(e.into()),
            };
            // Consolidate down the results to get correct totals.
            consolidate(&mut rows);

            let mut results = Vec::new();
            for (row, count) in rows {
                if count.is_negative() {
                    Err(EvalError::InvalidParameterValue(
                        format!("Negative multiplicity in constant result: {}", count).into(),
                    ))?
                };
                if count.is_positive() {
                    let count = usize::cast_from(
                        u64::try_from(count.into_inner())
                            .expect("known to be positive from check above"),
                    );
                    results.push((
                        row,
                        NonZeroUsize::new(count).expect("known to be non-zero from check above"),
                    ));
                }
            }
            let row_collection = RowCollection::new(results, &finishing.order_by);
            let duration_histogram = self.metrics.row_set_finishing_seconds();

            let (ret, reason) = match finishing.finish(
                row_collection,
                max_result_size,
                max_returned_query_size,
                &duration_histogram,
            ) {
                Ok((rows, row_size_bytes)) => {
                    let result_size = u64::cast_from(row_size_bytes);
                    let rows_returned = u64::cast_from(rows.count());
                    (
                        Ok(Self::send_immediate_rows(rows)),
                        StatementEndedExecutionReason::Success {
                            result_size: Some(result_size),
                            rows_returned: Some(rows_returned),
                            execution_strategy: Some(StatementExecutionStrategy::Constant),
                        },
                    )
                }
                Err(error) => (
                    Err(AdapterError::ResultSize(error.clone())),
                    StatementEndedExecutionReason::Errored { error },
                ),
            };
            self.retire_execution(reason, std::mem::take(ctx_extra));
            return ret;
        }

        let timestamp = determination.timestamp_context.timestamp_or_default();
        if let Some(id) = ctx_extra.contents() {
            self.set_statement_execution_timestamp(id, timestamp)
        }

        // The remaining cases are a peek into a maintained arrangement, or building a dataflow.
        // In both cases we will want to peek, and the main difference is that we might want to
        // build a dataflow and drop it once the peek is issued. The peeks are also constructed
        // differently.

        // If we must build the view, ship the dataflow.
        let (peek_command, drop_dataflow, is_fast_path, peek_target, strategy) = match fast_path {
            PeekPlan::FastPath(FastPathPlan::PeekExisting(
                _coll_id,
                idx_id,
                literal_constraints,
                map_filter_project,
            )) => (
                (literal_constraints, timestamp, map_filter_project),
                None,
                true,
                PeekTarget::Index { id: idx_id },
                StatementExecutionStrategy::FastPath,
            ),
            PeekPlan::FastPath(FastPathPlan::PeekPersist(
                coll_id,
                literal_constraint,
                map_filter_project,
            )) => {
                let peek_command = (
                    literal_constraint.map(|r| vec![r]),
                    timestamp,
                    map_filter_project,
                );
                let metadata = self
                    .controller
                    .storage
                    .collection_metadata(coll_id)
                    .expect("storage collection for fast-path peek")
                    .clone();
                (
                    peek_command,
                    None,
                    true,
                    PeekTarget::Persist {
                        id: coll_id,
                        metadata,
                    },
                    StatementExecutionStrategy::PersistFastPath,
                )
            }
            PeekPlan::SlowPath(PeekDataflowPlan {
                desc: dataflow,
                // n.b. this index_id identifies a transient index the
                // caller created, so it is guaranteed to be on
                // `compute_instance`.
                id: index_id,
                key: index_key,
                permutation: index_permutation,
                thinned_arity: index_thinned_arity,
            }) => {
                let output_ids = dataflow.export_ids().collect();

                // Very important: actually create the dataflow (here, so we can destructure).
                self.controller
                    .compute
                    .create_dataflow(compute_instance, dataflow, None)
                    .map_err(
                        AdapterError::concurrent_dependency_drop_from_dataflow_creation_error,
                    )?;
                self.initialize_compute_read_policies(
                    output_ids,
                    compute_instance,
                    // Disable compaction so that nothing can compact before the peek occurs below.
                    CompactionWindow::DisableCompaction,
                )
                .await;

                // Create an identity MFP operator.
                let mut map_filter_project = mz_expr::MapFilterProject::new(source_arity);
                map_filter_project.permute_fn(
                    |c| index_permutation[c],
                    index_key.len() + index_thinned_arity,
                );
                let map_filter_project = mfp_to_safe_plan(map_filter_project)?;

                (
                    (None, timestamp, map_filter_project),
                    Some(index_id),
                    false,
                    PeekTarget::Index { id: index_id },
                    StatementExecutionStrategy::Standard,
                )
            }
            _ => {
                unreachable!()
            }
        };

        // Endpoints for sending and receiving peek responses.
        let (rows_tx, rows_rx) = tokio::sync::oneshot::channel();

        // Generate a unique UUID. It is guaranteed to be unique among all pending
        // peeks; there's a very small chance that it is not unique among completed peeks.
        let mut uuid = Uuid::new_v4();
        while self.pending_peeks.contains_key(&uuid) {
            uuid = Uuid::new_v4();
        }

        // The peek is ready to go for both cases, fast and non-fast.
        // Stash the response mechanism, and broadcast dataflow construction.
        self.pending_peeks.insert(
            uuid,
            PendingPeek {
                conn_id: conn_id.clone(),
                cluster_id: compute_instance,
                depends_on: source_ids,
                ctx_extra: std::mem::take(ctx_extra),
                is_fast_path,
            },
        );
        self.client_pending_peeks
            .entry(conn_id)
            .or_default()
            .insert(uuid, compute_instance);
        let (literal_constraints, timestamp, map_filter_project) = peek_command;

        // At this stage we don't know column names for the result because we
        // only know the peek's result type as a bare SqlRelationType.
        let peek_result_column_names =
            (0..intermediate_result_type.arity()).map(|i| format!("peek_{i}"));
        let peek_result_desc =
            RelationDesc::new(intermediate_result_type, peek_result_column_names);

        self.controller
            .compute
            .peek(
                compute_instance,
                peek_target,
                literal_constraints,
                uuid,
                timestamp,
                peek_result_desc,
                finishing.clone(),
                map_filter_project,
                target_replica,
                rows_tx,
            )
            .unwrap_or_terminate("cannot fail to peek");

        let duration_histogram = self.metrics.row_set_finishing_seconds();

        // If a dataflow was created, drop it once the peek command is sent.
        if let Some(index_id) = drop_dataflow {
            self.remove_compute_ids_from_timeline(vec![(compute_instance, index_id)]);
            self.drop_compute_collections(vec![(compute_instance, index_id)]);
        }

        let persist_client = self.persist_client.clone();
        let peek_stash_read_batch_size_bytes =
            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
                .get(self.catalog().system_config().dyncfgs());
        let peek_stash_read_memory_budget_bytes =
            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
                .get(self.catalog().system_config().dyncfgs());

        let peek_response_stream = Self::create_peek_response_stream(
            rows_rx,
            finishing,
            max_result_size,
            max_returned_query_size,
            duration_histogram,
            persist_client,
            peek_stash_read_batch_size_bytes,
            peek_stash_read_memory_budget_bytes,
        );

        Ok(crate::ExecuteResponse::SendingRowsStreaming {
            rows: Box::pin(peek_response_stream),
            instance_id: compute_instance,
            strategy,
        })
    }

    /// Creates an async stream that processes peek responses and yields rows.
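    ///
    /// Direct `Rows` responses are finished and yielded as a single batch;
    /// `Stashed` responses are read back from persist batches in a spawned
    /// task and streamed to the client incrementally.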
    #[mz_ore::instrument(level = "debug")]
    pub(crate) fn create_peek_response_stream(
        rows_rx: tokio::sync::oneshot::Receiver<PeekResponse>,
        finishing: RowSetFinishing,
        max_result_size: u64,
        max_returned_query_size: Option<u64>,
        duration_histogram: prometheus::Histogram,
        mut persist_client: mz_persist_client::PersistClient,
        peek_stash_read_batch_size_bytes: usize,
        peek_stash_read_memory_budget_bytes: usize,
    ) -> impl futures::Stream<Item = PeekResponseUnary> {
        async_stream::stream!({
            let result = rows_rx.await;

            let rows = match result {
                Ok(rows) => rows,
                Err(e) => {
                    yield PeekResponseUnary::Error(e.to_string());
                    return;
                }
            };

            match rows {
                PeekResponse::Rows(rows) => {
                    match finishing.finish(
                        rows,
                        max_result_size,
                        max_returned_query_size,
                        &duration_histogram,
                    ) {
                        Ok((rows, _size_bytes)) => yield PeekResponseUnary::Rows(Box::new(rows)),
                        Err(e) => yield PeekResponseUnary::Error(e),
                    }
                }
                PeekResponse::Stashed(response) => {
                    let response = *response;

                    let shard_id = response.shard_id;

                    let mut batches = Vec::new();
                    for proto_batch in response.batches.into_iter() {
                        let batch =
                            persist_client.batch_from_transmittable_batch(&shard_id, proto_batch);

                        batches.push(batch);
                    }
                    tracing::trace!(?batches, "stashed peek response");

                    let as_of = Antichain::from_elem(mz_repr::Timestamp::default());
                    let read_schemas: Schemas<SourceData, ()> = Schemas {
                        id: None,
                        key: Arc::new(response.relation_desc.clone()),
                        val: Arc::new(UnitSchema),
                    };

                    let mut row_cursor = persist_client
                        .read_batches_consolidated::<_, _, _, i64>(
                            response.shard_id,
                            as_of,
                            read_schemas,
                            batches,
                            |_stats| true,
                            peek_stash_read_memory_budget_bytes,
                        )
                        .await
                        .expect("invalid usage");

                    // NOTE: Using the cursor creates Futures that are not Sync,
                    // so we can't drive them on the main Coordinator loop.
                    // Spawning a task has the additional benefit that we get to
                    // delete batches once we're done.
                    //
                    // Batch deletion is best-effort, though, and there are
                    // multiple known ways in which they can leak, among them:
                    //
                    // - A ProtoBatch is lost in flight.
                    // - A ProtoBatch is lost because, when combining PeekResponses
                    //   from workers, a cancellation or error "overrides" other
                    //   results, meaning we drop them.
                    // - This task here is not run to completion before it can
                    //   delete all batches.
                    //
                    // This is semi-ok, because persist needs a reaper of leaked
                    // batches already, and so we piggy-back on that, even if it
                    // might not exist as of today.
                    let (tx, mut rx) = tokio::sync::mpsc::channel(1);
                    mz_ore::task::spawn(|| "read_peek_batches", async move {
                        // We always send our inline rows first. Ordering
                        // doesn't matter because we can only be in this case
                        // when there is no ORDER BY.
                        //
                        // We _could_ write these out as a Batch and include it
                        // in the batches we read via the Consolidator, if we
                        // wanted a consistent ordering. That's not needed for
                        // correctness, but might be nice for aesthetic reasons.
                        let result = tx.send(response.inline_rows).await;
                        if result.is_err() {
                            tracing::error!("receiver went away");
                        }

                        let mut current_batch = Vec::new();
                        let mut current_batch_size: usize = 0;

                        'outer: while let Some(rows) = row_cursor.next().await {
                            for ((key, _val), _ts, diff) in rows {
                                let source_data = key.expect("decoding error");

                                let row = source_data
                                    .0
                                    .expect("we are not sending errors on this code path");

                                let diff = usize::try_from(diff)
                                    .expect("peek responses cannot have negative diffs");

                                if diff > 0 {
                                    let diff =
                                        NonZeroUsize::new(diff).expect("checked to be non-zero");
                                    current_batch_size =
                                        current_batch_size.saturating_add(row.byte_len());
                                    current_batch.push((row, diff));
                                }

                                if current_batch_size > peek_stash_read_batch_size_bytes {
                                    // We're re-encoding the rows as a RowCollection
                                    // here, for which we pay in CPU time. We're in a
                                    // slow path already, since we're returning a big
                                    // stashed result, so this is worth the convenience
                                    // of that for now.
                                    let result = tx
                                        .send(RowCollection::new(
                                            current_batch.drain(..).collect_vec(),
                                            &[],
                                        ))
                                        .await;
                                    if result.is_err() {
                                        tracing::error!("receiver went away");
                                        // Don't return but break, so we fall out to the
                                        // batch delete logic below.
                                        break 'outer;
                                    }

                                    current_batch_size = 0;
                                }
                            }
                        }

                        if !current_batch.is_empty() {
                            let result = tx.send(RowCollection::new(current_batch, &[])).await;
                            if result.is_err() {
                                tracing::error!("receiver went away");
                            }
                        }

                        let batches = row_cursor.into_lease();
                        tracing::trace!(?response.shard_id, "cleaning up batches of peek result");
                        for batch in batches {
                            batch.delete().await;
                        }
                    });

                    assert!(
                        finishing.is_streamable(response.relation_desc.arity()),
                        "can only get stashed responses when the finishing is streamable"
                    );

                    tracing::trace!("query result is streamable!");

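                    // Apply OFFSET, LIMIT, and the projection incrementally as
                    // row batches arrive, rather than materializing the full
                    // result set first.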
1063                    let mut incremental_finishing = RowSetFinishingIncremental::new(
1064                        finishing.offset,
1065                        finishing.limit,
1066                        finishing.project,
1067                        max_returned_query_size,
1068                    );
1069
1070                    let mut got_zero_rows = true;
1071                    while let Some(rows) = rx.recv().await {
1072                        got_zero_rows = false;
1073
1074                        let result_rows = incremental_finishing.finish_incremental(
1075                            rows,
1076                            max_result_size,
1077                            &duration_histogram,
1078                        );
1079
1080                        match result_rows {
1081                            Ok(result_rows) => yield PeekResponseUnary::Rows(Box::new(result_rows)),
1082                            Err(e) => yield PeekResponseUnary::Error(e),
1083                        }
1084                    }
1085
1086                    // Even when there's zero rows, clients still expect an
1087                    // empty PeekResponse.
1088                    if got_zero_rows {
1089                        let row_iter = vec![].into_row_iter();
1090                        yield PeekResponseUnary::Rows(Box::new(row_iter));
1091                    }
1092                }
1093                PeekResponse::Canceled => {
1094                    yield PeekResponseUnary::Canceled;
1095                }
1096                PeekResponse::Error(e) => {
1097                    yield PeekResponseUnary::Error(e);
1098                }
1099            }
1100        })
1101    }
1102
1103    /// Cancel and remove all pending peeks that were initiated by the client with `conn_id`.
1104    #[mz_ore::instrument(level = "debug")]
1105    pub(crate) fn cancel_pending_peeks(&mut self, conn_id: &ConnectionId) {
1106        if let Some(uuids) = self.client_pending_peeks.remove(conn_id) {
1107            self.metrics
1108                .canceled_peeks
1109                .inc_by(u64::cast_from(uuids.len()));
1110
1111            let mut inverse: BTreeMap<ComputeInstanceId, BTreeSet<Uuid>> = Default::default();
1112            for (uuid, compute_instance) in &uuids {
1113                inverse.entry(*compute_instance).or_default().insert(*uuid);
1114            }
1115            for (compute_instance, uuids) in inverse {
1116                // It's possible that this compute instance no longer exists because it was dropped
1117                // while the peek was in progress. In this case we ignore the error and move on
1118                // because the dataflow no longer exists.
1119                // TODO(jkosh44) Dropping a cluster should actively cancel all pending queries.
1120                for uuid in uuids {
1121                    let _ = self.controller.compute.cancel_peek(
1122                        compute_instance,
1123                        uuid,
1124                        PeekResponse::Canceled,
1125                    );
1126                }
1127            }
1128
1129            let peeks = uuids
1130                .iter()
1131                .filter_map(|(uuid, _)| self.pending_peeks.remove(uuid))
1132                .collect::<Vec<_>>();
1133            for peek in peeks {
1134                self.retire_execution(StatementEndedExecutionReason::Canceled, peek.ctx_extra);
1135            }
1136        }
1137    }
1138
1139    /// Handle a peek notification and retire the corresponding execution. Does nothing for
1140    /// already-removed peeks.
1141    pub(crate) fn handle_peek_notification(
1142        &mut self,
1143        uuid: Uuid,
1144        notification: PeekNotification,
1145        otel_ctx: OpenTelemetryContext,
1146    ) {
1147        // We expect exactly one peek response, which we forward. Then we clean up the
1148        // peek's state in the coordinator.
1149        if let Some(PendingPeek {
1150            conn_id: _,
1151            cluster_id: _,
1152            depends_on: _,
1153            ctx_extra,
1154            is_fast_path,
1155        }) = self.remove_pending_peek(&uuid)
1156        {
1157            let reason = match notification {
1158                PeekNotification::Success {
1159                    rows: num_rows,
1160                    result_size,
1161                } => {
1162                    let strategy = if is_fast_path {
1163                        StatementExecutionStrategy::FastPath
1164                    } else {
1165                        StatementExecutionStrategy::Standard
1166                    };
1167                    StatementEndedExecutionReason::Success {
1168                        result_size: Some(result_size),
1169                        rows_returned: Some(num_rows),
1170                        execution_strategy: Some(strategy),
1171                    }
1172                }
1173                PeekNotification::Error(error) => StatementEndedExecutionReason::Errored { error },
1174                PeekNotification::Canceled => StatementEndedExecutionReason::Canceled,
1175            };
1176            otel_ctx.attach_as_parent();
1177            self.retire_execution(reason, ctx_extra);
1178        }
1179        // Cancellation may cause us to receive responses for peeks no
1180        // longer in `self.pending_peeks`, so we quietly ignore them.
1181    }
1182
1183    /// Clean up a peek's state.
1184    pub(crate) fn remove_pending_peek(&mut self, uuid: &Uuid) -> Option<PendingPeek> {
1185        let pending_peek = self.pending_peeks.remove(uuid);
1186        if let Some(pending_peek) = &pending_peek {
1187            let uuids = self
1188                .client_pending_peeks
1189                .get_mut(&pending_peek.conn_id)
1190                .expect("coord peek state is inconsistent");
1191            uuids.remove(uuid);
1192            if uuids.is_empty() {
1193                self.client_pending_peeks.remove(&pending_peek.conn_id);
1194            }
1195        }
1196        pending_peek
1197    }
1198
1199    /// Implements a slow-path peek by creating a transient dataflow.
1200    /// This is called from the command handler for ExecuteSlowPathPeek.
1201    ///
1202    /// (For now, this method simply delegates to implement_peek_plan by constructing
1203    /// the necessary PlannedPeek structure and a minimal ExecuteContext.)
    pub(crate) async fn implement_slow_path_peek(
        &mut self,
        dataflow_plan: PeekDataflowPlan<mz_repr::Timestamp>,
        determination: TimestampDetermination<mz_repr::Timestamp>,
        finishing: RowSetFinishing,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        intermediate_result_type: SqlRelationType,
        source_ids: BTreeSet<GlobalId>,
        conn_id: ConnectionId,
        max_result_size: u64,
        max_query_result_size: Option<u64>,
    ) -> Result<ExecuteResponse, AdapterError> {
        let source_arity = intermediate_result_type.arity();

        let planned_peek = PlannedPeek {
            plan: PeekPlan::SlowPath(dataflow_plan),
            determination,
            conn_id,
            intermediate_result_type,
            source_arity,
            source_ids,
        };

        // Create a minimal ExecuteContext
        // TODO(peek-seq): Use the real context once we have statement logging.
        let mut ctx_extra = ExecuteContextExtra::default();

        // Call the old peek sequencing's implement_peek_plan for now.
        // TODO(peek-seq): After the old peek sequencing is completely removed, we should merge the
        // relevant parts of the old `implement_peek_plan` into this method, and remove the old
        // `implement_peek_plan`.
        self.implement_peek_plan(
            &mut ctx_extra,
            planned_peek,
            finishing,
            compute_instance,
            target_replica,
            max_result_size,
            max_query_result_size,
        )
        .await
    }

    /// Implements a COPY TO command by validating the S3 connection, shipping the
    /// dataflow, and spawning a background task to wait for completion.
    /// This is called from the command handler for `ExecuteCopyTo`.
    ///
    /// This method inlines the logic from `peek_copy_to_preflight` and `peek_copy_to_dataflow`
    /// to avoid the complexity of the staging mechanism for the frontend peek sequencing path.
    ///
    /// This method does NOT block waiting for completion. Instead, it spawns a background task
    /// that will send the response through the provided `tx` channel when the COPY TO completes.
    /// All errors (setup or execution) are sent through `tx`.
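    ///
    /// In outline, the steps below are:
    /// 1. Preflight-check the S3 sink connection (inlined from `peek_copy_to_preflight`).
    /// 2. Register an [`ActiveCopyTo`] sink and ship the dataflow.
    /// 3. Spawn a task that forwards the sink's completion result to `tx`.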
    pub(crate) async fn implement_copy_to(
        &mut self,
        df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        source_ids: BTreeSet<GlobalId>,
        conn_id: ConnectionId,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    ) {
        // Helper to send an error and return early.
        let send_err = |tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
                        e: AdapterError| {
            let _ = tx.send(Err(e));
        };
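        // (`oneshot::Sender::send` consumes the sender, which is why the helper
        // takes `tx` by value: each error path hands over ownership of it.)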

        let sink_id = df_desc.sink_id();

        // # Inlined from peek_copy_to_preflight

        let connection_context = self.connection_context().clone();
        let sinks = &df_desc.sink_exports;

        if sinks.len() != 1 {
            send_err(
                tx,
                AdapterError::Internal("expected exactly one copy to s3 sink".into()),
            );
            return;
        }
        let (sink_id_check, sink_desc) = sinks
            .first_key_value()
            .expect("known to be exactly one copy to s3 sink");
        assert_eq!(sink_id, *sink_id_check);

        match &sink_desc.connection {
            ComputeSinkConnection::CopyToS3Oneshot(conn) => {
                if let Err(e) = mz_storage_types::sinks::s3_oneshot_sink::preflight(
                    connection_context,
                    &conn.aws_connection,
                    &conn.upload_info,
                    conn.connection_id,
                    sink_id,
                )
                .await
                {
                    send_err(tx, e.into());
                    return;
                }
            }
            _ => {
                send_err(
                    tx,
                    AdapterError::Internal("expected copy to s3 oneshot sink".into()),
                );
                return;
            }
        }

        // # Inlined from peek_copy_to_dataflow

        // Create and register ActiveCopyTo.
        // Note: `sink_tx`/`sink_rx` is the channel on which the compute sink notifies completion.
        // It is distinct from the command's `tx`, which sends the response to the client.
        let (sink_tx, sink_rx) = oneshot::channel();
        let active_copy_to = ActiveCopyTo {
            conn_id: conn_id.clone(),
            tx: sink_tx,
            cluster_id: compute_instance,
            depends_on: source_ids,
        };

        // Add metadata for the new COPY TO. CopyTo returns a `ready` future, so it is safe to drop.
        drop(
            self.add_active_compute_sink(sink_id, ActiveComputeSink::CopyTo(active_copy_to))
                .await,
        );

        // Try to ship the dataflow. We handle errors gracefully because dependencies might have
        // disappeared during sequencing.
        if let Err(e) = self
            .try_ship_dataflow(df_desc, compute_instance, target_replica)
            .await
            .map_err(AdapterError::concurrent_dependency_drop_from_dataflow_creation_error)
        {
            // Clean up the active compute sink that was added above, since the dataflow was never
            // created. If we don't do this, the sink_id remains in drop_sinks but no collection
            // exists in the compute controller, causing a panic when the connection terminates.
            self.remove_active_compute_sink(sink_id).await;
            let _ = tx.send(Err(e));
            return;
        }

        // Spawn a background task to wait for completion.
        // We must NOT await `sink_rx` here directly, as that would block the coordinator's main
        // task from processing the completion message. Instead, we spawn a background task that
        // will send the result through `tx` when the COPY TO completes.
        let span = Span::current();
        task::spawn(
            || "copy to completion",
            async move {
                let res = sink_rx.await;
                let result = match res {
                    Ok(res) => res,
                    Err(_) => Err(AdapterError::Internal("copy to sender dropped".into())),
                };
                let _ = tx.send(result);
            }
            .instrument(span),
        );
    }

    /// Constructs an [`ExecuteResponse`] that will send some rows to the
    /// client immediately, as opposed to asking the dataflow layer to send along
    /// the rows after some computation.
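    ///
    /// A minimal sketch, assuming a `Vec<Row>` as the row source (anything
    /// implementing [`IntoRowIterator`] with a `Send + Sync` iterator works):
    ///
    /// ```ignore
    /// let rows = vec![Row::pack(Some(Datum::String("hello")))];
    /// let response = Coordinator::send_immediate_rows(rows);
    /// ```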
    pub(crate) fn send_immediate_rows<I>(rows: I) -> ExecuteResponse
    where
        I: IntoRowIterator,
        I::Iter: Send + Sync + 'static,
    {
        let rows = Box::new(rows.into_row_iter());
        ExecuteResponse::SendingRowsImmediate { rows }
    }
}

#[cfg(test)]
mod tests {
    use mz_expr::func::IsNull;
    use mz_expr::{MapFilterProject, UnaryFunc};
    use mz_ore::str::Indent;
    use mz_repr::explain::text::text_string_at;
    use mz_repr::explain::{DummyHumanizer, ExplainConfig, PlanRenderingContext};
    use mz_repr::{Datum, SqlColumnType, SqlScalarType};

    use super::*;

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
    fn test_fast_path_plan_as_text() {
        let typ = SqlRelationType::new(vec![SqlColumnType {
            scalar_type: SqlScalarType::String,
            nullable: false,
        }]);
        let constant_err = FastPathPlan::Constant(Err(EvalError::DivisionByZero), typ.clone());
        let no_lookup = FastPathPlan::PeekExisting(
            GlobalId::User(8),
            GlobalId::User(10),
            None,
            MapFilterProject::new(4)
                .map(Some(MirScalarExpr::column(0).or(MirScalarExpr::column(2))))
                .project([1, 4])
                .into_plan()
                .expect("invalid plan")
                .into_nontemporal()
                .expect("invalid nontemporal"),
        );
        let lookup = FastPathPlan::PeekExisting(
            GlobalId::User(9),
            GlobalId::User(11),
            Some(vec![Row::pack(Some(Datum::Int32(5)))]),
            MapFilterProject::new(3)
                .filter(Some(
                    MirScalarExpr::column(0).call_unary(UnaryFunc::IsNull(IsNull)),
                ))
                .into_plan()
                .expect("invalid plan")
                .into_nontemporal()
                .expect("invalid nontemporal"),
        );

        let humanizer = DummyHumanizer;
        let config = ExplainConfig {
            redacted: false,
            verbose_syntax: true,
            ..Default::default()
        };
        let ctx_gen = || {
            let indent = Indent::default();
            let annotations = BTreeMap::new();
            PlanRenderingContext::<FastPathPlan>::new(indent, &humanizer, annotations, &config)
        };

        let constant_err_exp = "Error \"division by zero\"\n";
        let no_lookup_exp = "Project (#1, #4)\n  Map ((#0 OR #2))\n    ReadIndex on=u8 [DELETED INDEX]=[*** full scan ***]\n";
        let lookup_exp =
            "Filter (#0) IS NULL\n  ReadIndex on=u9 [DELETED INDEX]=[lookup value=(5)]\n";

        assert_eq!(text_string_at(&constant_err, ctx_gen), constant_err_exp);
        assert_eq!(text_string_at(&no_lookup, ctx_gen), no_lookup_exp);
        assert_eq!(text_string_at(&lookup, ctx_gen), lookup_exp);

        let mut constant_rows = vec![
            (Row::pack(Some(Datum::String("hello"))), Diff::ONE),
            (Row::pack(Some(Datum::String("world"))), 2.into()),
            (Row::pack(Some(Datum::String("star"))), 500.into()),
        ];
        let constant_exp1 =
            "Constant\n  - (\"hello\")\n  - ((\"world\") x 2)\n  - ((\"star\") x 500)\n";
        assert_eq!(
            text_string_at(
                &FastPathPlan::Constant(Ok(constant_rows.clone()), typ.clone()),
                ctx_gen
            ),
            constant_exp1
        );
        constant_rows
            .extend((0..20).map(|i| (Row::pack(Some(Datum::String(&i.to_string()))), Diff::ONE)));
        let constant_exp2 = "Constant\n  total_rows (diffs absed): 523\n  first_rows:\n    - (\"hello\")\
        \n    - ((\"world\") x 2)\n    - ((\"star\") x 500)\n    - (\"0\")\n    - (\"1\")\
        \n    - (\"2\")\n    - (\"3\")\n    - (\"4\")\n    - (\"5\")\n    - (\"6\")\
        \n    - (\"7\")\n    - (\"8\")\n    - (\"9\")\n    - (\"10\")\n    - (\"11\")\
        \n    - (\"12\")\n    - (\"13\")\n    - (\"14\")\n    - (\"15\")\n    - (\"16\")\n";
        assert_eq!(
            text_string_at(&FastPathPlan::Constant(Ok(constant_rows), typ), ctx_gen),
            constant_exp2
        );
    }
}