mz_adapter/coord/
peek.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic and types for creating, executing, and tracking peeks.
11//!
12//! This module determines if a dataflow can be short-cut, by returning constant values
13//! or by reading out of existing arrangements, and implements the appropriate plan.
14
15use std::collections::{BTreeMap, BTreeSet};
16use std::fmt;
17use std::num::NonZeroUsize;
18use std::ops::Deref;
19use std::sync::Arc;
20
21use differential_dataflow::consolidation::consolidate;
22use itertools::Itertools;
23use mz_adapter_types::compaction::CompactionWindow;
24use mz_adapter_types::connection::ConnectionId;
25use mz_cluster_client::ReplicaId;
26use mz_compute_client::controller::PeekNotification;
27use mz_compute_client::protocol::command::PeekTarget;
28use mz_compute_client::protocol::response::PeekResponse;
29use mz_compute_types::ComputeInstanceId;
30use mz_compute_types::dataflows::{DataflowDescription, IndexImport};
31use mz_controller_types::ClusterId;
32use mz_expr::explain::{HumanizedExplain, HumanizerMode, fmt_text_constant_rows};
33use mz_expr::row::RowCollection;
34use mz_expr::{
35    EvalError, Id, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing,
36    RowSetFinishingIncremental, permutation_for_arrangement,
37};
38use mz_ore::cast::CastFrom;
39use mz_ore::str::{StrExt, separated};
40use mz_ore::tracing::OpenTelemetryContext;
41use mz_persist_client::Schemas;
42use mz_persist_types::codec_impls::UnitSchema;
43use mz_repr::explain::text::DisplayText;
44use mz_repr::explain::{CompactScalars, IndexUsageType, PlanRenderingContext, UsedIndexes};
45use mz_repr::{
46    Diff, GlobalId, IntoRowIterator, RelationDesc, RelationType, Row, RowIterator, preserves_order,
47};
48use mz_storage_types::sources::SourceData;
49use serde::{Deserialize, Serialize};
50use timely::progress::{Antichain, Timestamp};
51use uuid::Uuid;
52
53use crate::coord::timestamp_selection::TimestampDetermination;
54use crate::optimize::OptimizerError;
55use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
56use crate::util::ResultExt;
57use crate::{AdapterError, ExecuteContextExtra, ExecuteResponse};
58
59/// A peek is a request to read data from a maintained arrangement.
60#[derive(Debug)]
61pub(crate) struct PendingPeek {
62    /// The connection that initiated the peek.
63    pub(crate) conn_id: ConnectionId,
64    /// The cluster that the peek is being executed on.
65    pub(crate) cluster_id: ClusterId,
66    /// All `GlobalId`s that the peek depend on.
67    pub(crate) depends_on: BTreeSet<GlobalId>,
68    /// Context about the execute that produced this peek,
69    /// needed by the coordinator for retiring it.
70    pub(crate) ctx_extra: ExecuteContextExtra,
71    /// Is this a fast-path peek, i.e. one that doesn't require a dataflow?
72    pub(crate) is_fast_path: bool,
73}
74
75/// The response from a `Peek`, with row multiplicities represented in unary.
76///
77/// Note that each `Peek` expects to generate exactly one `PeekResponse`, i.e.
78/// we expect a 1:1 contract between `Peek` and `PeekResponseUnary`.
79#[derive(Debug)]
80pub enum PeekResponseUnary {
81    Rows(Box<dyn RowIterator + Send + Sync>),
82    Error(String),
83    Canceled,
84}
85
86#[derive(Clone, Debug)]
87pub struct PeekDataflowPlan<T = mz_repr::Timestamp> {
88    pub(crate) desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
89    pub(crate) id: GlobalId,
90    key: Vec<MirScalarExpr>,
91    permutation: Vec<usize>,
92    thinned_arity: usize,
93}
94
95impl<T> PeekDataflowPlan<T> {
96    pub fn new(
97        desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
98        id: GlobalId,
99        typ: &RelationType,
100    ) -> Self {
101        let arity = typ.arity();
102        let key = typ
103            .default_key()
104            .into_iter()
105            .map(MirScalarExpr::column)
106            .collect::<Vec<_>>();
107        let (permutation, thinning) = permutation_for_arrangement(&key, arity);
108        Self {
109            desc,
110            id,
111            key,
112            permutation,
113            thinned_arity: thinning.len(),
114        }
115    }
116}
117
118#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd)]
119pub enum FastPathPlan {
120    /// The view evaluates to a constant result that can be returned.
121    ///
122    /// The [RelationType] is unnecessary for evaluating the constant result but
123    /// may be helpful when printing out an explanation.
124    Constant(Result<Vec<(Row, Diff)>, EvalError>, RelationType),
125    /// The view can be read out of an existing arrangement.
126    /// (coll_id, idx_id, values to look up, mfp to apply)
127    PeekExisting(GlobalId, GlobalId, Option<Vec<Row>>, mz_expr::SafeMfpPlan),
128    /// The view can be read directly out of Persist.
129    PeekPersist(GlobalId, Option<Row>, mz_expr::SafeMfpPlan),
130}
131
132impl<'a, T: 'a> DisplayText<PlanRenderingContext<'a, T>> for FastPathPlan {
133    fn fmt_text(
134        &self,
135        f: &mut fmt::Formatter<'_>,
136        ctx: &mut PlanRenderingContext<'a, T>,
137    ) -> fmt::Result {
138        if ctx.config.verbose_syntax {
139            self.fmt_verbose_text(f, ctx)
140        } else {
141            self.fmt_default_text(f, ctx)
142        }
143    }
144}
145
146impl FastPathPlan {
147    pub fn fmt_default_text<'a, T>(
148        &self,
149        f: &mut fmt::Formatter<'_>,
150        ctx: &mut PlanRenderingContext<'a, T>,
151    ) -> fmt::Result {
152        let mode = HumanizedExplain::new(ctx.config.redacted);
153
154        match self {
155            FastPathPlan::Constant(rows, _) => {
156                write!(f, "{}→Constant ", ctx.indent)?;
157
158                match rows {
159                    Ok(rows) => writeln!(f, "({} rows)", rows.len())?,
160                    Err(err) => {
161                        if mode.redacted() {
162                            writeln!(f, "(error: █)")?;
163                        } else {
164                            writeln!(f, "(error: {})", err.to_string().quoted(),)?;
165                        }
166                    }
167                }
168            }
169            FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
170                let coll = ctx
171                    .humanizer
172                    .humanize_id(*coll_id)
173                    .unwrap_or_else(|| coll_id.to_string());
174                let idx = ctx
175                    .humanizer
176                    .humanize_id(*idx_id)
177                    .unwrap_or_else(|| idx_id.to_string());
178                writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
179                ctx.indent.set();
180
181                ctx.indent += 1;
182
183                mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
184                let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();
185
186                if printed {
187                    ctx.indent += 1;
188                }
189                if let Some(literal_constraints) = literal_constraints {
190                    writeln!(f, "{}→Index Lookup on {coll} (using {idx})", ctx.indent)?;
191                    ctx.indent += 1;
192                    let values = separated("; ", mode.seq(literal_constraints, None));
193                    writeln!(f, "{}Lookup values: {values}", ctx.indent)?;
194                } else {
195                    writeln!(f, "{}→Indexed {coll} (using {idx})", ctx.indent)?;
196                }
197
198                ctx.indent.reset();
199            }
200            FastPathPlan::PeekPersist(global_id, literal_constraint, mfp) => {
201                let coll = ctx
202                    .humanizer
203                    .humanize_id(*global_id)
204                    .unwrap_or_else(|| global_id.to_string());
205                writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
206                ctx.indent.set();
207
208                ctx.indent += 1;
209
210                mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
211                let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();
212
213                if printed {
214                    ctx.indent += 1;
215                }
216                if let Some(literal_constraint) = literal_constraint {
217                    writeln!(f, "{}→Index Lookup on {coll} (from storage)", ctx.indent)?;
218                    ctx.indent += 1;
219                    let value = mode.expr(literal_constraint, None);
220                    writeln!(f, "{}Lookup value: {value}", ctx.indent)?;
221                } else {
222                    writeln!(f, "{}→Indexed {coll} (from storage)", ctx.indent)?;
223                }
224
225                ctx.indent.reset();
226            }
227        }
228
229        Ok(())
230    }
231
232    pub fn fmt_verbose_text<'a, T>(
233        &self,
234        f: &mut fmt::Formatter<'_>,
235        ctx: &mut PlanRenderingContext<'a, T>,
236    ) -> fmt::Result {
237        let redacted = ctx.config.redacted;
238        let mode = HumanizedExplain::new(redacted);
239
240        // TODO(aalexandrov): factor out common PeekExisting and PeekPersist
241        // code.
242        match self {
243            FastPathPlan::Constant(Ok(rows), _) => {
244                if !rows.is_empty() {
245                    writeln!(f, "{}Constant", ctx.indent)?;
246                    *ctx.as_mut() += 1;
247                    fmt_text_constant_rows(
248                        f,
249                        rows.iter().map(|(row, diff)| (row, diff)),
250                        ctx.as_mut(),
251                        redacted,
252                    )?;
253                    *ctx.as_mut() -= 1;
254                } else {
255                    writeln!(f, "{}Constant <empty>", ctx.as_mut())?;
256                }
257                Ok(())
258            }
259            FastPathPlan::Constant(Err(err), _) => {
260                if redacted {
261                    writeln!(f, "{}Error █", ctx.as_mut())
262                } else {
263                    writeln!(f, "{}Error {}", ctx.as_mut(), err.to_string().escaped())
264                }
265            }
266            FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
267                ctx.as_mut().set();
268                let (map, filter, project) = mfp.as_map_filter_project();
269
270                let cols = if !ctx.config.humanized_exprs {
271                    None
272                } else if let Some(cols) = ctx.humanizer.column_names_for_id(*idx_id) {
273                    // FIXME: account for thinning and permutation
274                    // See mz_expr::permutation_for_arrangement
275                    // See permute_oneshot_mfp_around_index
276                    let cols = itertools::chain(
277                        cols.iter().cloned(),
278                        std::iter::repeat(String::new()).take(map.len()),
279                    )
280                    .collect();
281                    Some(cols)
282                } else {
283                    None
284                };
285
286                if project.len() != mfp.input_arity + map.len()
287                    || !project.iter().enumerate().all(|(i, o)| i == *o)
288                {
289                    let outputs = mode.seq(&project, cols.as_ref());
290                    let outputs = CompactScalars(outputs);
291                    writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
292                    *ctx.as_mut() += 1;
293                }
294                if !filter.is_empty() {
295                    let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
296                    writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
297                    *ctx.as_mut() += 1;
298                }
299                if !map.is_empty() {
300                    let scalars = mode.seq(&map, cols.as_ref());
301                    let scalars = CompactScalars(scalars);
302                    writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
303                    *ctx.as_mut() += 1;
304                }
305                MirRelationExpr::fmt_indexed_filter(
306                    f,
307                    ctx,
308                    coll_id,
309                    idx_id,
310                    literal_constraints.clone(),
311                    None,
312                )?;
313                writeln!(f)?;
314                ctx.as_mut().reset();
315                Ok(())
316            }
317            FastPathPlan::PeekPersist(gid, literal_constraint, mfp) => {
318                ctx.as_mut().set();
319                let (map, filter, project) = mfp.as_map_filter_project();
320
321                let cols = if !ctx.config.humanized_exprs {
322                    None
323                } else if let Some(cols) = ctx.humanizer.column_names_for_id(*gid) {
324                    let cols = itertools::chain(
325                        cols.iter().cloned(),
326                        std::iter::repeat(String::new()).take(map.len()),
327                    )
328                    .collect::<Vec<_>>();
329                    Some(cols)
330                } else {
331                    None
332                };
333
334                if project.len() != mfp.input_arity + map.len()
335                    || !project.iter().enumerate().all(|(i, o)| i == *o)
336                {
337                    let outputs = mode.seq(&project, cols.as_ref());
338                    let outputs = CompactScalars(outputs);
339                    writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
340                    *ctx.as_mut() += 1;
341                }
342                if !filter.is_empty() {
343                    let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
344                    writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
345                    *ctx.as_mut() += 1;
346                }
347                if !map.is_empty() {
348                    let scalars = mode.seq(&map, cols.as_ref());
349                    let scalars = CompactScalars(scalars);
350                    writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
351                    *ctx.as_mut() += 1;
352                }
353                let human_id = ctx
354                    .humanizer
355                    .humanize_id(*gid)
356                    .unwrap_or_else(|| gid.to_string());
357                write!(f, "{}PeekPersist {human_id}", ctx.as_mut())?;
358                if let Some(literal) = literal_constraint {
359                    let value = mode.expr(literal, None);
360                    writeln!(f, " [value={}]", value)?;
361                } else {
362                    writeln!(f, "")?;
363                }
364                ctx.as_mut().reset();
365                Ok(())
366            }
367        }?;
368        Ok(())
369    }
370}
371
372#[derive(Debug)]
373pub struct PlannedPeek {
374    pub plan: PeekPlan,
375    pub determination: TimestampDetermination<mz_repr::Timestamp>,
376    pub conn_id: ConnectionId,
377    /// The result type _after_ reading out of the "source" and applying any
378    /// [MapFilterProject](mz_expr::MapFilterProject), but _before_ applying a
379    /// [RowSetFinishing].
380    ///
381    /// This is _the_ `result_type` as far as compute is concerned and futher
382    /// changes through projections happen purely in the adapter.
383    pub intermediate_result_type: RelationType,
384    pub source_arity: usize,
385    pub source_ids: BTreeSet<GlobalId>,
386}
387
388/// Possible ways in which the coordinator could produce the result for a goal view.
389#[derive(Clone, Debug)]
390pub enum PeekPlan<T = mz_repr::Timestamp> {
391    FastPath(FastPathPlan),
392    /// The view must be installed as a dataflow and then read.
393    SlowPath(PeekDataflowPlan<T>),
394}
395
396/// Convert `mfp` to an executable, non-temporal plan.
397/// It should be non-temporal, as OneShot preparation populates `mz_now`.
398///
399/// If the `mfp` can't be converted into a non-temporal plan, this returns an _internal_ error.
400fn mfp_to_safe_plan(
401    mfp: mz_expr::MapFilterProject,
402) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
403    mfp.into_plan()
404        .map_err(OptimizerError::InternalUnsafeMfpPlan)?
405        .into_nontemporal()
406        .map_err(|e| OptimizerError::InternalUnsafeMfpPlan(format!("{:?}", e)))
407}
408
409/// If it can't convert `mfp` into a `SafeMfpPlan`, this returns an _internal_ error.
410fn permute_oneshot_mfp_around_index(
411    mfp: mz_expr::MapFilterProject,
412    key: &[MirScalarExpr],
413) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
414    let input_arity = mfp.input_arity;
415    let mut safe_mfp = mfp_to_safe_plan(mfp)?;
416    let (permute, thinning) = permutation_for_arrangement(key, input_arity);
417    safe_mfp.permute_fn(|c| permute[c], key.len() + thinning.len());
418    Ok(safe_mfp)
419}
420
421/// Determine if the dataflow plan can be implemented without an actual dataflow.
422///
423/// If the optimized plan is a `Constant` or a `Get` of a maintained arrangement,
424/// we can avoid building a dataflow (and either just return the results, or peek
425/// out of the arrangement, respectively).
426pub fn create_fast_path_plan<T: Timestamp>(
427    dataflow_plan: &mut DataflowDescription<OptimizedMirRelationExpr, (), T>,
428    view_id: GlobalId,
429    finishing: Option<&RowSetFinishing>,
430    persist_fast_path_limit: usize,
431    persist_fast_path_order: bool,
432) -> Result<Option<FastPathPlan>, OptimizerError> {
433    // At this point, `dataflow_plan` contains our best optimized dataflow.
434    // We will check the plan to see if there is a fast path to escape full dataflow construction.
435
436    // We need to restrict ourselves to settings where the inserted transient view is the first thing
437    // to build (no dependent views). There is likely an index to build as well, but we may not be sure.
438    if dataflow_plan.objects_to_build.len() >= 1 && dataflow_plan.objects_to_build[0].id == view_id
439    {
440        let mut mir = &*dataflow_plan.objects_to_build[0].plan.as_inner_mut();
441        if let Some((rows, found_typ)) = mir.as_const() {
442            // In the case of a constant, we can return the result now.
443            return Ok(Some(FastPathPlan::Constant(
444                rows.clone()
445                    .map(|rows| rows.into_iter().map(|(row, diff)| (row, diff)).collect()),
446                found_typ.clone(),
447            )));
448        } else {
449            // If there is a TopK that would be completely covered by the finishing, then jump
450            // through the TopK.
451            if let MirRelationExpr::TopK {
452                input,
453                group_key,
454                order_key,
455                limit,
456                offset,
457                monotonic: _,
458                expected_group_size: _,
459            } = mir
460            {
461                if let Some(finishing) = finishing {
462                    if group_key.is_empty() && *order_key == finishing.order_by && *offset == 0 {
463                        // The following is roughly `limit >= finishing.limit`, but with Options.
464                        let finishing_limits_at_least_as_topk = match (limit, finishing.limit) {
465                            (None, _) => true,
466                            (Some(..), None) => false,
467                            (Some(topk_limit), Some(finishing_limit)) => {
468                                if let Some(l) = topk_limit.as_literal_int64() {
469                                    l >= *finishing_limit
470                                } else {
471                                    false
472                                }
473                            }
474                        };
475                        if finishing_limits_at_least_as_topk {
476                            mir = input;
477                        }
478                    }
479                }
480            }
481            // In the case of a linear operator around an indexed view, we
482            // can skip creating a dataflow and instead pull all the rows in
483            // index and apply the linear operator against them.
484            let (mfp, mir) = mz_expr::MapFilterProject::extract_from_expression(mir);
485            match mir {
486                MirRelationExpr::Get {
487                    id: Id::Global(get_id),
488                    typ: relation_typ,
489                    ..
490                } => {
491                    // Just grab any arrangement if an arrangement exists
492                    for (index_id, IndexImport { desc, .. }) in dataflow_plan.index_imports.iter() {
493                        if desc.on_id == *get_id {
494                            return Ok(Some(FastPathPlan::PeekExisting(
495                                *get_id,
496                                *index_id,
497                                None,
498                                permute_oneshot_mfp_around_index(mfp, &desc.key)?,
499                            )));
500                        }
501                    }
502
503                    // If there is no arrangement, consider peeking the persist shard directly.
504                    // Generally, we consider a persist peek when the query can definitely be satisfied
505                    // by scanning through a small, constant number of Persist key-values.
506                    let safe_mfp = mfp_to_safe_plan(mfp)?;
507                    let (_maps, filters, projection) = safe_mfp.as_map_filter_project();
508
509                    let literal_constraint = if persist_fast_path_order {
510                        let mut row = Row::default();
511                        let mut packer = row.packer();
512                        for (idx, col) in relation_typ.column_types.iter().enumerate() {
513                            if !preserves_order(&col.scalar_type) {
514                                break;
515                            }
516                            let col_expr = MirScalarExpr::column(idx);
517
518                            let Some((literal, _)) = filters
519                                .iter()
520                                .filter_map(|f| f.expr_eq_literal(&col_expr))
521                                .next()
522                            else {
523                                break;
524                            };
525                            packer.extend_by_row(&literal);
526                        }
527                        if row.is_empty() { None } else { Some(row) }
528                    } else {
529                        None
530                    };
531
532                    let finish_ok = match &finishing {
533                        None => false,
534                        Some(RowSetFinishing {
535                            order_by,
536                            limit,
537                            offset,
538                            ..
539                        }) => {
540                            let order_ok = if persist_fast_path_order {
541                                order_by.iter().enumerate().all(|(idx, order)| {
542                                    // Map the ordering column back to the column in the source data.
543                                    // (If it's not one of the input columns, we can't make any guarantees.)
544                                    let column_idx = projection[order.column];
545                                    if column_idx >= safe_mfp.input_arity {
546                                        return false;
547                                    }
548                                    let column_type = &relation_typ.column_types[column_idx];
549                                    let index_ok = idx == column_idx;
550                                    let nulls_ok = !column_type.nullable || order.nulls_last;
551                                    let asc_ok = !order.desc;
552                                    let type_ok = preserves_order(&column_type.scalar_type);
553                                    index_ok && nulls_ok && asc_ok && type_ok
554                                })
555                            } else {
556                                order_by.is_empty()
557                            };
558                            let limit_ok = limit.map_or(false, |l| {
559                                usize::cast_from(l) + *offset < persist_fast_path_limit
560                            });
561                            order_ok && limit_ok
562                        }
563                    };
564
565                    let key_constraint = if let Some(literal) = &literal_constraint {
566                        let prefix_len = literal.iter().count();
567                        relation_typ
568                            .keys
569                            .iter()
570                            .any(|k| k.iter().all(|idx| *idx < prefix_len))
571                    } else {
572                        false
573                    };
574
575                    // We can generate a persist peek when:
576                    // - We have a literal constraint that includes an entire key (so we'll return at most one value)
577                    // - We can return the first N key values (no filters, small limit, consistent order)
578                    if key_constraint || (filters.is_empty() && finish_ok) {
579                        return Ok(Some(FastPathPlan::PeekPersist(
580                            *get_id,
581                            literal_constraint,
582                            safe_mfp,
583                        )));
584                    }
585                }
586                MirRelationExpr::Join { implementation, .. } => {
587                    if let mz_expr::JoinImplementation::IndexedFilter(coll_id, idx_id, key, vals) =
588                        implementation
589                    {
590                        return Ok(Some(FastPathPlan::PeekExisting(
591                            *coll_id,
592                            *idx_id,
593                            Some(vals.clone()),
594                            permute_oneshot_mfp_around_index(mfp, key)?,
595                        )));
596                    }
597                }
598                // nothing can be done for non-trivial expressions.
599                _ => {}
600            }
601        }
602    }
603    Ok(None)
604}
605
606impl FastPathPlan {
607    pub fn used_indexes(&self, finishing: Option<&RowSetFinishing>) -> UsedIndexes {
608        match self {
609            FastPathPlan::Constant(..) => UsedIndexes::default(),
610            FastPathPlan::PeekExisting(_coll_id, idx_id, literal_constraints, _mfp) => {
611                if literal_constraints.is_some() {
612                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::Lookup(*idx_id)])].into())
613                } else if finishing.map_or(false, |f| f.limit.is_some() && f.order_by.is_empty()) {
614                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::FastPathLimit])].into())
615                } else {
616                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::FullScan])].into())
617                }
618            }
619            FastPathPlan::PeekPersist(..) => UsedIndexes::default(),
620        }
621    }
622}
623
624impl crate::coord::Coordinator {
625    /// Implements a peek plan produced by `create_plan` above.
626    #[mz_ore::instrument(level = "debug")]
627    pub async fn implement_peek_plan(
628        &mut self,
629        ctx_extra: &mut ExecuteContextExtra,
630        plan: PlannedPeek,
631        finishing: RowSetFinishing,
632        compute_instance: ComputeInstanceId,
633        target_replica: Option<ReplicaId>,
634        max_result_size: u64,
635        max_returned_query_size: Option<u64>,
636    ) -> Result<crate::ExecuteResponse, AdapterError> {
637        let PlannedPeek {
638            plan: fast_path,
639            determination,
640            conn_id,
641            intermediate_result_type,
642            source_arity,
643            source_ids,
644        } = plan;
645
646        // If the dataflow optimizes to a constant expression, we can immediately return the result.
647        if let PeekPlan::FastPath(FastPathPlan::Constant(rows, _)) = fast_path {
648            let mut rows = match rows {
649                Ok(rows) => rows,
650                Err(e) => return Err(e.into()),
651            };
652            // Consolidate down the results to get correct totals.
653            consolidate(&mut rows);
654
655            let mut results = Vec::new();
656            for (row, count) in rows {
657                if count.is_negative() {
658                    Err(EvalError::InvalidParameterValue(
659                        format!("Negative multiplicity in constant result: {}", count).into(),
660                    ))?
661                };
662                if count.is_positive() {
663                    let count = usize::cast_from(
664                        u64::try_from(count.into_inner())
665                            .expect("known to be positive from check above"),
666                    );
667                    results.push((
668                        row,
669                        NonZeroUsize::new(count).expect("known to be non-zero from check above"),
670                    ));
671                }
672            }
673            let row_collection = RowCollection::new(results, &finishing.order_by);
674            let duration_histogram = self.metrics.row_set_finishing_seconds();
675
676            let (ret, reason) = match finishing.finish(
677                row_collection,
678                max_result_size,
679                max_returned_query_size,
680                &duration_histogram,
681            ) {
682                Ok((rows, row_size_bytes)) => {
683                    let result_size = u64::cast_from(row_size_bytes);
684                    let rows_returned = u64::cast_from(rows.count());
685                    (
686                        Ok(Self::send_immediate_rows(rows)),
687                        StatementEndedExecutionReason::Success {
688                            result_size: Some(result_size),
689                            rows_returned: Some(rows_returned),
690                            execution_strategy: Some(StatementExecutionStrategy::Constant),
691                        },
692                    )
693                }
694                Err(error) => (
695                    Err(AdapterError::ResultSize(error.clone())),
696                    StatementEndedExecutionReason::Errored { error },
697                ),
698            };
699            self.retire_execution(reason, std::mem::take(ctx_extra));
700            return ret;
701        }
702
703        let timestamp = determination.timestamp_context.timestamp_or_default();
704        if let Some(id) = ctx_extra.contents() {
705            self.set_statement_execution_timestamp(id, timestamp)
706        }
707
708        // The remaining cases are a peek into a maintained arrangement, or building a dataflow.
709        // In both cases we will want to peek, and the main difference is that we might want to
710        // build a dataflow and drop it once the peek is issued. The peeks are also constructed
711        // differently.
712
713        // If we must build the view, ship the dataflow.
714        let (peek_command, drop_dataflow, is_fast_path, peek_target, strategy) = match fast_path {
715            PeekPlan::FastPath(FastPathPlan::PeekExisting(
716                _coll_id,
717                idx_id,
718                literal_constraints,
719                map_filter_project,
720            )) => (
721                (literal_constraints, timestamp, map_filter_project),
722                None,
723                true,
724                PeekTarget::Index { id: idx_id },
725                StatementExecutionStrategy::FastPath,
726            ),
727            PeekPlan::FastPath(FastPathPlan::PeekPersist(
728                coll_id,
729                literal_constraint,
730                map_filter_project,
731            )) => {
732                let peek_command = (
733                    literal_constraint.map(|r| vec![r]),
734                    timestamp,
735                    map_filter_project,
736                );
737                let metadata = self
738                    .controller
739                    .storage
740                    .collection_metadata(coll_id)
741                    .expect("storage collection for fast-path peek")
742                    .clone();
743                (
744                    peek_command,
745                    None,
746                    true,
747                    PeekTarget::Persist {
748                        id: coll_id,
749                        metadata,
750                    },
751                    StatementExecutionStrategy::PersistFastPath,
752                )
753            }
754            PeekPlan::SlowPath(PeekDataflowPlan {
755                desc: dataflow,
756                // n.b. this index_id identifies a transient index the
757                // caller created, so it is guaranteed to be on
758                // `compute_instance`.
759                id: index_id,
760                key: index_key,
761                permutation: index_permutation,
762                thinned_arity: index_thinned_arity,
763            }) => {
764                let output_ids = dataflow.export_ids().collect();
765
766                // Very important: actually create the dataflow (here, so we can destructure).
767                self.controller
768                    .compute
769                    .create_dataflow(compute_instance, dataflow, None)
770                    .unwrap_or_terminate("cannot fail to create dataflows");
771                self.initialize_compute_read_policies(
772                    output_ids,
773                    compute_instance,
774                    // Disable compaction so that nothing can compact before the peek occurs below.
775                    CompactionWindow::DisableCompaction,
776                )
777                .await;
778
779                // Create an identity MFP operator.
780                let mut map_filter_project = mz_expr::MapFilterProject::new(source_arity);
781                map_filter_project.permute_fn(
782                    |c| index_permutation[c],
783                    index_key.len() + index_thinned_arity,
784                );
785                let map_filter_project = mfp_to_safe_plan(map_filter_project)?;
786
787                (
788                    (None, timestamp, map_filter_project),
789                    Some(index_id),
790                    false,
791                    PeekTarget::Index { id: index_id },
792                    StatementExecutionStrategy::Standard,
793                )
794            }
795            _ => {
796                unreachable!()
797            }
798        };
799
800        // Endpoints for sending and receiving peek responses.
801        let (rows_tx, rows_rx) = tokio::sync::oneshot::channel();
802
803        // Generate unique UUID. Guaranteed to be unique to all pending peeks, there's an very
804        // small but unlikely chance that it's not unique to completed peeks.
805        let mut uuid = Uuid::new_v4();
806        while self.pending_peeks.contains_key(&uuid) {
807            uuid = Uuid::new_v4();
808        }
809
810        // The peek is ready to go for both cases, fast and non-fast.
811        // Stash the response mechanism, and broadcast dataflow construction.
812        self.pending_peeks.insert(
813            uuid,
814            PendingPeek {
815                conn_id: conn_id.clone(),
816                cluster_id: compute_instance,
817                depends_on: source_ids,
818                ctx_extra: std::mem::take(ctx_extra),
819                is_fast_path,
820            },
821        );
822        self.client_pending_peeks
823            .entry(conn_id)
824            .or_default()
825            .insert(uuid, compute_instance);
826        let (literal_constraints, timestamp, map_filter_project) = peek_command;
827
828        // At this stage we don't know column names for the result because we
829        // only know the peek's result type as a bare ResultType.
830        let peek_result_column_names =
831            (0..intermediate_result_type.arity()).map(|i| format!("peek_{i}"));
832        let peek_result_desc =
833            RelationDesc::new(intermediate_result_type, peek_result_column_names);
834
835        self.controller
836            .compute
837            .peek(
838                compute_instance,
839                peek_target,
840                literal_constraints,
841                uuid,
842                timestamp,
843                peek_result_desc,
844                finishing.clone(),
845                map_filter_project,
846                target_replica,
847                rows_tx,
848            )
849            .unwrap_or_terminate("cannot fail to peek");
850
851        let duration_histogram = self.metrics.row_set_finishing_seconds();
852
853        // If a dataflow was created, drop it once the peek command is sent.
854        if let Some(index_id) = drop_dataflow {
855            self.remove_compute_ids_from_timeline(vec![(compute_instance, index_id)]);
856            self.drop_indexes(vec![(compute_instance, index_id)]);
857        }
858
859        let persist_client = self.persist_client.clone();
860        let peek_stash_read_batch_size_bytes =
861            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
862                .get(self.catalog().system_config().dyncfgs());
863        let peek_stash_read_memory_budget_bytes =
864            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
865                .get(self.catalog().system_config().dyncfgs());
866
867        let peek_response_stream = Self::create_peek_response_stream(
868            rows_rx,
869            finishing,
870            max_result_size,
871            max_returned_query_size,
872            duration_histogram,
873            persist_client,
874            peek_stash_read_batch_size_bytes,
875            peek_stash_read_memory_budget_bytes,
876        );
877
878        Ok(crate::ExecuteResponse::SendingRowsStreaming {
879            rows: Box::pin(peek_response_stream),
880            instance_id: compute_instance,
881            strategy,
882        })
883    }
884
885    /// Creates an async stream that processes peek responses and yields rows.
886    #[mz_ore::instrument(level = "debug")]
887    fn create_peek_response_stream(
888        rows_rx: tokio::sync::oneshot::Receiver<PeekResponse>,
889        finishing: RowSetFinishing,
890        max_result_size: u64,
891        max_returned_query_size: Option<u64>,
892        duration_histogram: prometheus::Histogram,
893        mut persist_client: mz_persist_client::PersistClient,
894        peek_stash_read_batch_size_bytes: usize,
895        peek_stash_read_memory_budget_bytes: usize,
896    ) -> impl futures::Stream<Item = PeekResponseUnary> {
897        async_stream::stream!({
898            let result = rows_rx.await;
899
900            let rows = match result {
901                Ok(rows) => rows,
902                Err(e) => {
903                    yield PeekResponseUnary::Error(e.to_string());
904                    return;
905                }
906            };
907
908            match rows {
909                PeekResponse::Rows(rows) => {
910                    match finishing.finish(
911                        rows,
912                        max_result_size,
913                        max_returned_query_size,
914                        &duration_histogram,
915                    ) {
916                        Ok((rows, _size_bytes)) => yield PeekResponseUnary::Rows(Box::new(rows)),
917                        Err(e) => yield PeekResponseUnary::Error(e),
918                    }
919                }
920                PeekResponse::Stashed(response) => {
921                    let response = *response;
922
923                    let shard_id = response.shard_id;
924
925                    let mut batches = Vec::new();
926                    for proto_batch in response.batches.into_iter() {
927                        let batch =
928                            persist_client.batch_from_transmittable_batch(&shard_id, proto_batch);
929
930                        batches.push(batch);
931                    }
932                    tracing::trace!(?batches, "stashed peek response");
933
934                    let as_of = Antichain::from_elem(mz_repr::Timestamp::default());
935                    let read_schemas: Schemas<SourceData, ()> = Schemas {
936                        id: None,
937                        key: Arc::new(response.relation_desc.clone()),
938                        val: Arc::new(UnitSchema),
939                    };
940
941                    let mut row_cursor = persist_client
942                        .read_batches_consolidated::<_, _, _, i64>(
943                            response.shard_id,
944                            as_of,
945                            read_schemas,
946                            batches,
947                            |_stats| true,
948                            peek_stash_read_memory_budget_bytes,
949                        )
950                        .await
951                        .expect("invalid usage");
952
953                    // NOTE: Using the cursor creates Futures that are not Sync,
954                    // so we can't drive them on the main Coordinator loop.
955                    // Spawning a task has the additional benefit that we get to
956                    // delete batches once we're done.
957                    //
958                    // Batch deletion is best-effort, though, and there are
959                    // multiple known ways in which they can leak, among them:
960                    //
961                    // - ProtoBatch is lost in flight
962                    // - ProtoBatch is lost because when combining PeekResponse
963                    // from workers a cancellation or error "overrides" other
964                    // results, meaning we drop them
965                    // - This task here is not run to completion before it can
966                    // delete all batches
967                    //
968                    // This is semi-ok, because persist needs a reaper of leaked
969                    // batches already, and so we piggy-back on that, even if it
970                    // might not exist as of today.
971                    let (tx, mut rx) = tokio::sync::mpsc::channel(1);
972                    mz_ore::task::spawn(|| "read_peek_batches", async move {
973                        // We always send our inline rows first. Ordering
974                        // doesn't matter because we can only be in this case
975                        // when there is no ORDER BY.
976                        //
977                        // We _could_ write these out as a Batch, and include it
978                        // in the batches we read via the Consolidator. If we
979                        // wanted to get a consistent ordering. That's not
980                        // needed for correctness! But might be nice for more
981                        // aesthetic reasons.
982                        let result = tx.send(response.inline_rows).await;
983                        if result.is_err() {
984                            tracing::error!("receiver went away");
985                        }
986
987                        let mut current_batch = Vec::new();
988                        let mut current_batch_size: usize = 0;
989
990                        'outer: while let Some(rows) = row_cursor.next().await {
991                            for ((key, _val), _ts, diff) in rows {
992                                let source_data = key.expect("decoding error");
993
994                                let row = source_data
995                                    .0
996                                    .expect("we are not sending errors on this code path");
997
998                                let diff = usize::try_from(diff)
999                                    .expect("peek responses cannot have negative diffs");
1000
1001                                if diff > 0 {
1002                                    let diff =
1003                                        NonZeroUsize::new(diff).expect("checked to be non-zero");
1004                                    current_batch_size =
1005                                        current_batch_size.saturating_add(row.byte_len());
1006                                    current_batch.push((row, diff));
1007                                }
1008
1009                                if current_batch_size > peek_stash_read_batch_size_bytes {
1010                                    // We're re-encoding the rows as a RowCollection
1011                                    // here, for which we pay in CPU time. We're in a
1012                                    // slow path already, since we're returning a big
1013                                    // stashed result so this is worth the convenience
1014                                    // of that for now.
1015                                    let result = tx
1016                                        .send(RowCollection::new(
1017                                            current_batch.drain(..).collect_vec(),
1018                                            &[],
1019                                        ))
1020                                        .await;
1021                                    if result.is_err() {
1022                                        tracing::error!("receiver went away");
1023                                        // Don't return but break so we fall out to the
1024                                        // batch delete logic below.
1025                                        break 'outer;
1026                                    }
1027
1028                                    current_batch_size = 0;
1029                                }
1030                            }
1031                        }
1032
1033                        if current_batch.len() > 0 {
1034                            let result = tx.send(RowCollection::new(current_batch, &[])).await;
1035                            if result.is_err() {
1036                                tracing::error!("receiver went away");
1037                            }
1038                        }
1039
1040                        let batches = row_cursor.into_lease();
1041                        tracing::trace!(?response.shard_id, "cleaning up batches of peek result");
1042                        for batch in batches {
1043                            batch.delete().await;
1044                        }
1045                    });
1046
1047                    assert!(
1048                        finishing.is_streamable(response.relation_desc.arity()),
1049                        "can only get stashed responses when the finishing is streamable"
1050                    );
1051
1052                    tracing::trace!("query result is streamable!");
1053
1054                    assert!(finishing.is_streamable(response.relation_desc.arity()));
1055                    let mut incremental_finishing = RowSetFinishingIncremental::new(
1056                        finishing.offset,
1057                        finishing.limit,
1058                        finishing.project,
1059                        max_returned_query_size,
1060                    );
1061
1062                    let mut got_zero_rows = true;
1063                    while let Some(rows) = rx.recv().await {
1064                        got_zero_rows = false;
1065
1066                        let result_rows = incremental_finishing.finish_incremental(
1067                            rows,
1068                            max_result_size,
1069                            &duration_histogram,
1070                        );
1071
1072                        match result_rows {
1073                            Ok(result_rows) => yield PeekResponseUnary::Rows(Box::new(result_rows)),
1074                            Err(e) => yield PeekResponseUnary::Error(e),
1075                        }
1076                    }
1077
1078                    // Even when there's zero rows, clients still expect an
1079                    // empty PeekResponse.
1080                    if got_zero_rows {
1081                        let row_iter = vec![].into_row_iter();
1082                        yield PeekResponseUnary::Rows(Box::new(row_iter));
1083                    }
1084                }
1085                PeekResponse::Canceled => {
1086                    yield PeekResponseUnary::Canceled;
1087                }
1088                PeekResponse::Error(e) => {
1089                    yield PeekResponseUnary::Error(e);
1090                }
1091            }
1092        })
1093    }
1094
1095    /// Cancel and remove all pending peeks that were initiated by the client with `conn_id`.
1096    #[mz_ore::instrument(level = "debug")]
1097    pub(crate) fn cancel_pending_peeks(&mut self, conn_id: &ConnectionId) {
1098        if let Some(uuids) = self.client_pending_peeks.remove(conn_id) {
1099            self.metrics
1100                .canceled_peeks
1101                .with_label_values(&[])
1102                .inc_by(u64::cast_from(uuids.len()));
1103
1104            let mut inverse: BTreeMap<ComputeInstanceId, BTreeSet<Uuid>> = Default::default();
1105            for (uuid, compute_instance) in &uuids {
1106                inverse.entry(*compute_instance).or_default().insert(*uuid);
1107            }
1108            for (compute_instance, uuids) in inverse {
1109                // It's possible that this compute instance no longer exists because it was dropped
1110                // while the peek was in progress. In this case we ignore the error and move on
1111                // because the dataflow no longer exists.
1112                // TODO(jkosh44) Dropping a cluster should actively cancel all pending queries.
1113                for uuid in uuids {
1114                    let _ = self.controller.compute.cancel_peek(
1115                        compute_instance,
1116                        uuid,
1117                        PeekResponse::Canceled,
1118                    );
1119                }
1120            }
1121
1122            let peeks = uuids
1123                .iter()
1124                .filter_map(|(uuid, _)| self.pending_peeks.remove(uuid))
1125                .collect::<Vec<_>>();
1126            for peek in peeks {
1127                self.retire_execution(StatementEndedExecutionReason::Canceled, peek.ctx_extra);
1128            }
1129        }
1130    }
1131
1132    /// Handle a peek notification and retire the corresponding execution. Does nothing for
1133    /// already-removed peeks.
1134    pub(crate) fn handle_peek_notification(
1135        &mut self,
1136        uuid: Uuid,
1137        notification: PeekNotification,
1138        otel_ctx: OpenTelemetryContext,
1139    ) {
1140        // We expect exactly one peek response, which we forward. Then we clean up the
1141        // peek's state in the coordinator.
1142        if let Some(PendingPeek {
1143            conn_id: _,
1144            cluster_id: _,
1145            depends_on: _,
1146            ctx_extra,
1147            is_fast_path,
1148        }) = self.remove_pending_peek(&uuid)
1149        {
1150            let reason = match notification {
1151                PeekNotification::Success {
1152                    rows: num_rows,
1153                    result_size,
1154                } => {
1155                    let strategy = if is_fast_path {
1156                        StatementExecutionStrategy::FastPath
1157                    } else {
1158                        StatementExecutionStrategy::Standard
1159                    };
1160                    StatementEndedExecutionReason::Success {
1161                        result_size: Some(result_size),
1162                        rows_returned: Some(num_rows),
1163                        execution_strategy: Some(strategy),
1164                    }
1165                }
1166                PeekNotification::Error(error) => StatementEndedExecutionReason::Errored { error },
1167                PeekNotification::Canceled => StatementEndedExecutionReason::Canceled,
1168            };
1169            otel_ctx.attach_as_parent();
1170            self.retire_execution(reason, ctx_extra);
1171        }
1172        // Cancellation may cause us to receive responses for peeks no
1173        // longer in `self.pending_peeks`, so we quietly ignore them.
1174    }
1175
1176    /// Clean up a peek's state.
1177    pub(crate) fn remove_pending_peek(&mut self, uuid: &Uuid) -> Option<PendingPeek> {
1178        let pending_peek = self.pending_peeks.remove(uuid);
1179        if let Some(pending_peek) = &pending_peek {
1180            let uuids = self
1181                .client_pending_peeks
1182                .get_mut(&pending_peek.conn_id)
1183                .expect("coord peek state is inconsistent");
1184            uuids.remove(uuid);
1185            if uuids.is_empty() {
1186                self.client_pending_peeks.remove(&pending_peek.conn_id);
1187            }
1188        }
1189        pending_peek
1190    }
1191
1192    /// Constructs an [`ExecuteResponse`] that that will send some rows to the
1193    /// client immediately, as opposed to asking the dataflow layer to send along
1194    /// the rows after some computation.
1195    pub(crate) fn send_immediate_rows<I>(rows: I) -> ExecuteResponse
1196    where
1197        I: IntoRowIterator,
1198        I::Iter: Send + Sync + 'static,
1199    {
1200        let rows = Box::new(rows.into_row_iter());
1201        ExecuteResponse::SendingRowsImmediate { rows }
1202    }
1203}
1204
1205#[cfg(test)]
1206mod tests {
1207    use mz_expr::func::IsNull;
1208    use mz_expr::{MapFilterProject, UnaryFunc};
1209    use mz_ore::str::Indent;
1210    use mz_repr::explain::text::text_string_at;
1211    use mz_repr::explain::{DummyHumanizer, ExplainConfig, PlanRenderingContext};
1212    use mz_repr::{ColumnType, Datum, ScalarType};
1213
1214    use super::*;
1215
1216    #[mz_ore::test]
1217    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
1218    fn test_fast_path_plan_as_text() {
1219        let typ = RelationType::new(vec![ColumnType {
1220            scalar_type: ScalarType::String,
1221            nullable: false,
1222        }]);
1223        let constant_err = FastPathPlan::Constant(Err(EvalError::DivisionByZero), typ.clone());
1224        let no_lookup = FastPathPlan::PeekExisting(
1225            GlobalId::User(8),
1226            GlobalId::User(10),
1227            None,
1228            MapFilterProject::new(4)
1229                .map(Some(MirScalarExpr::column(0).or(MirScalarExpr::column(2))))
1230                .project([1, 4])
1231                .into_plan()
1232                .expect("invalid plan")
1233                .into_nontemporal()
1234                .expect("invalid nontemporal"),
1235        );
1236        let lookup = FastPathPlan::PeekExisting(
1237            GlobalId::User(9),
1238            GlobalId::User(11),
1239            Some(vec![Row::pack(Some(Datum::Int32(5)))]),
1240            MapFilterProject::new(3)
1241                .filter(Some(
1242                    MirScalarExpr::column(0).call_unary(UnaryFunc::IsNull(IsNull)),
1243                ))
1244                .into_plan()
1245                .expect("invalid plan")
1246                .into_nontemporal()
1247                .expect("invalid nontemporal"),
1248        );
1249
1250        let humanizer = DummyHumanizer;
1251        let config = ExplainConfig {
1252            redacted: false,
1253            verbose_syntax: true,
1254            ..Default::default()
1255        };
1256        let ctx_gen = || {
1257            let indent = Indent::default();
1258            let annotations = BTreeMap::new();
1259            PlanRenderingContext::<FastPathPlan>::new(indent, &humanizer, annotations, &config)
1260        };
1261
1262        let constant_err_exp = "Error \"division by zero\"\n";
1263        let no_lookup_exp = "Project (#1, #4)\n  Map ((#0 OR #2))\n    ReadIndex on=u8 [DELETED INDEX]=[*** full scan ***]\n";
1264        let lookup_exp =
1265            "Filter (#0) IS NULL\n  ReadIndex on=u9 [DELETED INDEX]=[lookup value=(5)]\n";
1266
1267        assert_eq!(text_string_at(&constant_err, ctx_gen), constant_err_exp);
1268        assert_eq!(text_string_at(&no_lookup, ctx_gen), no_lookup_exp);
1269        assert_eq!(text_string_at(&lookup, ctx_gen), lookup_exp);
1270
1271        let mut constant_rows = vec![
1272            (Row::pack(Some(Datum::String("hello"))), Diff::ONE),
1273            (Row::pack(Some(Datum::String("world"))), 2.into()),
1274            (Row::pack(Some(Datum::String("star"))), 500.into()),
1275        ];
1276        let constant_exp1 =
1277            "Constant\n  - (\"hello\")\n  - ((\"world\") x 2)\n  - ((\"star\") x 500)\n";
1278        assert_eq!(
1279            text_string_at(
1280                &FastPathPlan::Constant(Ok(constant_rows.clone()), typ.clone()),
1281                ctx_gen
1282            ),
1283            constant_exp1
1284        );
1285        constant_rows
1286            .extend((0..20).map(|i| (Row::pack(Some(Datum::String(&i.to_string()))), Diff::ONE)));
1287        let constant_exp2 = "Constant\n  total_rows (diffs absed): 523\n  first_rows:\n    - (\"hello\")\
1288        \n    - ((\"world\") x 2)\n    - ((\"star\") x 500)\n    - (\"0\")\n    - (\"1\")\
1289        \n    - (\"2\")\n    - (\"3\")\n    - (\"4\")\n    - (\"5\")\n    - (\"6\")\
1290        \n    - (\"7\")\n    - (\"8\")\n    - (\"9\")\n    - (\"10\")\n    - (\"11\")\
1291        \n    - (\"12\")\n    - (\"13\")\n    - (\"14\")\n    - (\"15\")\n    - (\"16\")\n";
1292        assert_eq!(
1293            text_string_at(&FastPathPlan::Constant(Ok(constant_rows), typ), ctx_gen),
1294            constant_exp2
1295        );
1296    }
1297}