mz_adapter/coord/
peek.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic and types for creating, executing, and tracking peeks.
11//!
12//! This module determines if a dataflow can be short-cut, by returning constant values
13//! or by reading out of existing arrangements, and implements the appropriate plan.
14
15use std::collections::{BTreeMap, BTreeSet};
16use std::fmt;
17use std::num::NonZeroUsize;
18use std::ops::Deref;
19use std::sync::Arc;
20
21use differential_dataflow::consolidation::consolidate;
22use itertools::Itertools;
23use mz_adapter_types::compaction::CompactionWindow;
24use mz_adapter_types::connection::ConnectionId;
25use mz_cluster_client::ReplicaId;
26use mz_compute_client::controller::PeekNotification;
27use mz_compute_client::protocol::command::PeekTarget;
28use mz_compute_client::protocol::response::PeekResponse;
29use mz_compute_types::ComputeInstanceId;
30use mz_compute_types::dataflows::{DataflowDescription, IndexImport};
31use mz_controller_types::ClusterId;
32use mz_expr::explain::{HumanizedExplain, HumanizerMode, fmt_text_constant_rows};
33use mz_expr::row::RowCollection;
34use mz_expr::{
35    EvalError, Id, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing,
36    RowSetFinishingIncremental, permutation_for_arrangement,
37};
38use mz_ore::cast::CastFrom;
39use mz_ore::str::{StrExt, separated};
40use mz_ore::tracing::OpenTelemetryContext;
41use mz_persist_client::Schemas;
42use mz_persist_types::codec_impls::UnitSchema;
43use mz_repr::explain::text::DisplayText;
44use mz_repr::explain::{CompactScalars, IndexUsageType, PlanRenderingContext, UsedIndexes};
45use mz_repr::{
46    Diff, GlobalId, IntoRowIterator, RelationDesc, Row, RowIterator, SqlRelationType,
47    preserves_order,
48};
49use mz_storage_types::sources::SourceData;
50use serde::{Deserialize, Serialize};
51use timely::progress::{Antichain, Timestamp};
52use uuid::Uuid;
53
54use crate::coord::timestamp_selection::TimestampDetermination;
55use crate::optimize::OptimizerError;
56use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
57use crate::util::ResultExt;
58use crate::{AdapterError, ExecuteContextExtra, ExecuteResponse};
59
/// A peek is a request to read data from a maintained arrangement.
///
/// This struct carries the bookkeeping the coordinator keeps for one such
/// request.
#[derive(Debug)]
pub(crate) struct PendingPeek {
    /// The connection that initiated the peek.
    pub(crate) conn_id: ConnectionId,
    /// The cluster that the peek is being executed on.
    pub(crate) cluster_id: ClusterId,
    /// All `GlobalId`s that the peek depends on.
    pub(crate) depends_on: BTreeSet<GlobalId>,
    /// Context about the execute that produced this peek,
    /// needed by the coordinator for retiring it.
    pub(crate) ctx_extra: ExecuteContextExtra,
    /// Is this a fast-path peek, i.e. one that doesn't require a dataflow?
    pub(crate) is_fast_path: bool,
}
75
/// The response from a `Peek`, with row multiplicities represented in unary.
///
/// Note that each `Peek` expects to generate exactly one `PeekResponse`, i.e.
/// we expect a 1:1 contract between `Peek` and `PeekResponseUnary`.
#[derive(Debug)]
pub enum PeekResponseUnary {
    /// The rows produced by the peek, exposed through a boxed [`RowIterator`].
    Rows(Box<dyn RowIterator + Send + Sync>),
    /// The peek did not produce rows; the payload is the error rendered as text.
    Error(String),
    /// The peek was canceled.
    Canceled,
}
86
/// The slow-path plan for a peek: a dataflow description plus the layout
/// information computed for it by [`PeekDataflowPlan::new`].
#[derive(Clone, Debug)]
pub struct PeekDataflowPlan<T = mz_repr::Timestamp> {
    /// The description of the dataflow that backs this plan.
    pub(crate) desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
    /// Identifier supplied by the caller of `new`.
    // NOTE(review): presumably the transient index that will be peeked — confirm at call sites.
    pub(crate) id: GlobalId,
    /// The default key of the result type, as column-reference expressions.
    key: Vec<MirScalarExpr>,
    /// The permutation returned by `permutation_for_arrangement` for `key`
    /// and the arity of the result type.
    permutation: Vec<usize>,
    /// The number of columns in the thinning returned by
    /// `permutation_for_arrangement` for `key`.
    thinned_arity: usize,
}
95
96impl<T> PeekDataflowPlan<T> {
97    pub fn new(
98        desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
99        id: GlobalId,
100        typ: &SqlRelationType,
101    ) -> Self {
102        let arity = typ.arity();
103        let key = typ
104            .default_key()
105            .into_iter()
106            .map(MirScalarExpr::column)
107            .collect::<Vec<_>>();
108        let (permutation, thinning) = permutation_for_arrangement(&key, arity);
109        Self {
110            desc,
111            id,
112            key,
113            permutation,
114            thinned_arity: thinning.len(),
115        }
116    }
117}
118
/// The ways in which a peek can be answered on the fast path, i.e. without
/// building a dataflow.
// NOTE: `Ord`/`PartialOrd` are derived, so the declaration order of the
// variants is part of the type's observable behavior.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd)]
pub enum FastPathPlan {
    /// The view evaluates to a constant result that can be returned.
    ///
    /// The [SqlRelationType] is unnecessary for evaluating the constant result but
    /// may be helpful when printing out an explanation.
    Constant(Result<Vec<(Row, Diff)>, EvalError>, SqlRelationType),
    /// The view can be read out of an existing arrangement.
    /// (coll_id, idx_id, values to look up, mfp to apply)
    PeekExisting(GlobalId, GlobalId, Option<Vec<Row>>, mz_expr::SafeMfpPlan),
    /// The view can be read directly out of Persist.
    /// (coll_id, optional literal constraint to look up, mfp to apply)
    PeekPersist(GlobalId, Option<Row>, mz_expr::SafeMfpPlan),
}
132
133impl<'a, T: 'a> DisplayText<PlanRenderingContext<'a, T>> for FastPathPlan {
134    fn fmt_text(
135        &self,
136        f: &mut fmt::Formatter<'_>,
137        ctx: &mut PlanRenderingContext<'a, T>,
138    ) -> fmt::Result {
139        if ctx.config.verbose_syntax {
140            self.fmt_verbose_text(f, ctx)
141        } else {
142            self.fmt_default_text(f, ctx)
143        }
144    }
145}
146
147impl FastPathPlan {
148    pub fn fmt_default_text<'a, T>(
149        &self,
150        f: &mut fmt::Formatter<'_>,
151        ctx: &mut PlanRenderingContext<'a, T>,
152    ) -> fmt::Result {
153        let mode = HumanizedExplain::new(ctx.config.redacted);
154
155        match self {
156            FastPathPlan::Constant(rows, _) => {
157                write!(f, "{}→Constant ", ctx.indent)?;
158
159                match rows {
160                    Ok(rows) => writeln!(f, "({} rows)", rows.len())?,
161                    Err(err) => {
162                        if mode.redacted() {
163                            writeln!(f, "(error: █)")?;
164                        } else {
165                            writeln!(f, "(error: {})", err.to_string().quoted(),)?;
166                        }
167                    }
168                }
169            }
170            FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
171                let coll = ctx
172                    .humanizer
173                    .humanize_id(*coll_id)
174                    .unwrap_or_else(|| coll_id.to_string());
175                let idx = ctx
176                    .humanizer
177                    .humanize_id(*idx_id)
178                    .unwrap_or_else(|| idx_id.to_string());
179                writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
180                ctx.indent.set();
181
182                ctx.indent += 1;
183
184                mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
185                let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();
186
187                if printed {
188                    ctx.indent += 1;
189                }
190                if let Some(literal_constraints) = literal_constraints {
191                    writeln!(f, "{}→Index Lookup on {coll} (using {idx})", ctx.indent)?;
192                    ctx.indent += 1;
193                    let values = separated("; ", mode.seq(literal_constraints, None));
194                    writeln!(f, "{}Lookup values: {values}", ctx.indent)?;
195                } else {
196                    writeln!(f, "{}→Indexed {coll} (using {idx})", ctx.indent)?;
197                }
198
199                ctx.indent.reset();
200            }
201            FastPathPlan::PeekPersist(global_id, literal_constraint, mfp) => {
202                let coll = ctx
203                    .humanizer
204                    .humanize_id(*global_id)
205                    .unwrap_or_else(|| global_id.to_string());
206                writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
207                ctx.indent.set();
208
209                ctx.indent += 1;
210
211                mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
212                let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();
213
214                if printed {
215                    ctx.indent += 1;
216                }
217                if let Some(literal_constraint) = literal_constraint {
218                    writeln!(f, "{}→Index Lookup on {coll} (from storage)", ctx.indent)?;
219                    ctx.indent += 1;
220                    let value = mode.expr(literal_constraint, None);
221                    writeln!(f, "{}Lookup value: {value}", ctx.indent)?;
222                } else {
223                    writeln!(f, "{}→Indexed {coll} (from storage)", ctx.indent)?;
224                }
225
226                ctx.indent.reset();
227            }
228        }
229
230        Ok(())
231    }
232
233    pub fn fmt_verbose_text<'a, T>(
234        &self,
235        f: &mut fmt::Formatter<'_>,
236        ctx: &mut PlanRenderingContext<'a, T>,
237    ) -> fmt::Result {
238        let redacted = ctx.config.redacted;
239        let mode = HumanizedExplain::new(redacted);
240
241        // TODO(aalexandrov): factor out common PeekExisting and PeekPersist
242        // code.
243        match self {
244            FastPathPlan::Constant(Ok(rows), _) => {
245                if !rows.is_empty() {
246                    writeln!(f, "{}Constant", ctx.indent)?;
247                    *ctx.as_mut() += 1;
248                    fmt_text_constant_rows(
249                        f,
250                        rows.iter().map(|(row, diff)| (row, diff)),
251                        ctx.as_mut(),
252                        redacted,
253                    )?;
254                    *ctx.as_mut() -= 1;
255                } else {
256                    writeln!(f, "{}Constant <empty>", ctx.as_mut())?;
257                }
258                Ok(())
259            }
260            FastPathPlan::Constant(Err(err), _) => {
261                if redacted {
262                    writeln!(f, "{}Error █", ctx.as_mut())
263                } else {
264                    writeln!(f, "{}Error {}", ctx.as_mut(), err.to_string().escaped())
265                }
266            }
267            FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
268                ctx.as_mut().set();
269                let (map, filter, project) = mfp.as_map_filter_project();
270
271                let cols = if !ctx.config.humanized_exprs {
272                    None
273                } else if let Some(cols) = ctx.humanizer.column_names_for_id(*idx_id) {
274                    // FIXME: account for thinning and permutation
275                    // See mz_expr::permutation_for_arrangement
276                    // See permute_oneshot_mfp_around_index
277                    let cols = itertools::chain(
278                        cols.iter().cloned(),
279                        std::iter::repeat(String::new()).take(map.len()),
280                    )
281                    .collect();
282                    Some(cols)
283                } else {
284                    None
285                };
286
287                if project.len() != mfp.input_arity + map.len()
288                    || !project.iter().enumerate().all(|(i, o)| i == *o)
289                {
290                    let outputs = mode.seq(&project, cols.as_ref());
291                    let outputs = CompactScalars(outputs);
292                    writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
293                    *ctx.as_mut() += 1;
294                }
295                if !filter.is_empty() {
296                    let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
297                    writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
298                    *ctx.as_mut() += 1;
299                }
300                if !map.is_empty() {
301                    let scalars = mode.seq(&map, cols.as_ref());
302                    let scalars = CompactScalars(scalars);
303                    writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
304                    *ctx.as_mut() += 1;
305                }
306                MirRelationExpr::fmt_indexed_filter(
307                    f,
308                    ctx,
309                    coll_id,
310                    idx_id,
311                    literal_constraints.clone(),
312                    None,
313                )?;
314                writeln!(f)?;
315                ctx.as_mut().reset();
316                Ok(())
317            }
318            FastPathPlan::PeekPersist(gid, literal_constraint, mfp) => {
319                ctx.as_mut().set();
320                let (map, filter, project) = mfp.as_map_filter_project();
321
322                let cols = if !ctx.config.humanized_exprs {
323                    None
324                } else if let Some(cols) = ctx.humanizer.column_names_for_id(*gid) {
325                    let cols = itertools::chain(
326                        cols.iter().cloned(),
327                        std::iter::repeat(String::new()).take(map.len()),
328                    )
329                    .collect::<Vec<_>>();
330                    Some(cols)
331                } else {
332                    None
333                };
334
335                if project.len() != mfp.input_arity + map.len()
336                    || !project.iter().enumerate().all(|(i, o)| i == *o)
337                {
338                    let outputs = mode.seq(&project, cols.as_ref());
339                    let outputs = CompactScalars(outputs);
340                    writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
341                    *ctx.as_mut() += 1;
342                }
343                if !filter.is_empty() {
344                    let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
345                    writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
346                    *ctx.as_mut() += 1;
347                }
348                if !map.is_empty() {
349                    let scalars = mode.seq(&map, cols.as_ref());
350                    let scalars = CompactScalars(scalars);
351                    writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
352                    *ctx.as_mut() += 1;
353                }
354                let human_id = ctx
355                    .humanizer
356                    .humanize_id(*gid)
357                    .unwrap_or_else(|| gid.to_string());
358                write!(f, "{}PeekPersist {human_id}", ctx.as_mut())?;
359                if let Some(literal) = literal_constraint {
360                    let value = mode.expr(literal, None);
361                    writeln!(f, " [value={}]", value)?;
362                } else {
363                    writeln!(f, "")?;
364                }
365                ctx.as_mut().reset();
366                Ok(())
367            }
368        }?;
369        Ok(())
370    }
371}
372
/// A peek that has been planned, bundled with the information needed to
/// implement it.
#[derive(Debug)]
pub struct PlannedPeek {
    /// How the result is to be produced (fast path or slow path).
    pub plan: PeekPlan,
    /// The timestamp determination for the peek; its timestamp context
    /// supplies the timestamp at which the peek is executed.
    pub determination: TimestampDetermination<mz_repr::Timestamp>,
    // NOTE(review): presumably the connection that issued the peek — confirm.
    pub conn_id: ConnectionId,
    /// The result type _after_ reading out of the "source" and applying any
    /// [MapFilterProject](mz_expr::MapFilterProject), but _before_ applying a
    /// [RowSetFinishing].
    ///
    /// This is _the_ `result_type` as far as compute is concerned and further
    /// changes through projections happen purely in the adapter.
    pub intermediate_result_type: SqlRelationType,
    // NOTE(review): presumably the arity of the "source" before any
    // map/filter/project is applied — confirm against the code that builds this.
    pub source_arity: usize,
    // NOTE(review): presumably the ids of the collections the peek reads from — confirm.
    pub source_ids: BTreeSet<GlobalId>,
}
388
/// Possible ways in which the coordinator could produce the result for a goal view.
#[derive(Clone, Debug)]
pub enum PeekPlan<T = mz_repr::Timestamp> {
    /// The result can be produced without installing a dataflow; see
    /// [`FastPathPlan`] for the individual cases.
    FastPath(FastPathPlan),
    /// The view must be installed as a dataflow and then read.
    SlowPath(PeekDataflowPlan<T>),
}
396
397/// Convert `mfp` to an executable, non-temporal plan.
398/// It should be non-temporal, as OneShot preparation populates `mz_now`.
399///
400/// If the `mfp` can't be converted into a non-temporal plan, this returns an _internal_ error.
401fn mfp_to_safe_plan(
402    mfp: mz_expr::MapFilterProject,
403) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
404    mfp.into_plan()
405        .map_err(OptimizerError::InternalUnsafeMfpPlan)?
406        .into_nontemporal()
407        .map_err(|e| OptimizerError::InternalUnsafeMfpPlan(format!("{:?}", e)))
408}
409
410/// If it can't convert `mfp` into a `SafeMfpPlan`, this returns an _internal_ error.
411fn permute_oneshot_mfp_around_index(
412    mfp: mz_expr::MapFilterProject,
413    key: &[MirScalarExpr],
414) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
415    let input_arity = mfp.input_arity;
416    let mut safe_mfp = mfp_to_safe_plan(mfp)?;
417    let (permute, thinning) = permutation_for_arrangement(key, input_arity);
418    safe_mfp.permute_fn(|c| permute[c], key.len() + thinning.len());
419    Ok(safe_mfp)
420}
421
/// Determine if the dataflow plan can be implemented without an actual dataflow.
///
/// If the optimized plan is a `Constant` or a `Get` of a maintained arrangement,
/// we can avoid building a dataflow (and either just return the results, or peek
/// out of the arrangement, respectively).
///
/// Only the first entry of `dataflow_plan.objects_to_build` is inspected, and
/// only if its id equals `view_id`. The following shapes are recognized:
///
/// * The plan is a constant: yields [`FastPathPlan::Constant`].
/// * After optionally looking through a `TopK` that `finishing` subsumes and
///   extracting the outer map/filter/project, the plan is a `Get` of a global
///   id for which `dataflow_plan.index_imports` contains an index: yields
///   [`FastPathPlan::PeekExisting`] without lookup values.
/// * Same, but no such index is imported: yields [`FastPathPlan::PeekPersist`]
///   if either a literal constraint covers an entire key of the relation, or
///   there are no filters and `finishing` has an acceptable order and a limit
///   such that `limit + offset` is strictly less than `persist_fast_path_limit`.
/// * Same, but the plan is a `Join` implemented as an `IndexedFilter`: yields
///   [`FastPathPlan::PeekExisting`] with the lookup values of the join.
///
/// `persist_fast_path_order` controls whether a literal constraint is derived
/// from the filters and whether a non-empty `order_by` can be accepted for the
/// persist fast path.
///
/// Returns `Ok(None)` when no fast path applies, and an _internal_
/// [`OptimizerError`] when the extracted map/filter/project cannot be turned
/// into a safe plan.
pub fn create_fast_path_plan<T: Timestamp>(
    dataflow_plan: &mut DataflowDescription<OptimizedMirRelationExpr, (), T>,
    view_id: GlobalId,
    finishing: Option<&RowSetFinishing>,
    persist_fast_path_limit: usize,
    persist_fast_path_order: bool,
) -> Result<Option<FastPathPlan>, OptimizerError> {
    // At this point, `dataflow_plan` contains our best optimized dataflow.
    // We will check the plan to see if there is a fast path to escape full dataflow construction.

    // We need to restrict ourselves to settings where the inserted transient view is the first thing
    // to build (no dependent views). There is likely an index to build as well, but we may not be sure.
    if dataflow_plan.objects_to_build.len() >= 1 && dataflow_plan.objects_to_build[0].id == view_id
    {
        // Only shared access is needed from here on; `mir` may be re-pointed at
        // the input of a `TopK` below.
        let mut mir = &*dataflow_plan.objects_to_build[0].plan.as_inner_mut();
        if let Some((rows, found_typ)) = mir.as_const() {
            // In the case of a constant, we can return the result now.
            return Ok(Some(FastPathPlan::Constant(
                rows.clone()
                    .map(|rows| rows.into_iter().map(|(row, diff)| (row, diff)).collect()),
                found_typ.clone(),
            )));
        } else {
            // If there is a TopK that would be completely covered by the finishing, then jump
            // through the TopK.
            if let MirRelationExpr::TopK {
                input,
                group_key,
                order_key,
                limit,
                offset,
                monotonic: _,
                expected_group_size: _,
            } = mir
            {
                if let Some(finishing) = finishing {
                    // The TopK must be ungrouped, order exactly like the
                    // finishing, and not skip any rows.
                    if group_key.is_empty() && *order_key == finishing.order_by && *offset == 0 {
                        // The following is roughly `limit >= finishing.limit`, but with Options.
                        let finishing_limits_at_least_as_topk = match (limit, finishing.limit) {
                            // The TopK has no limit at all.
                            (None, _) => true,
                            // The TopK limits but the finishing does not.
                            (Some(..), None) => false,
                            // Both limit: comparable only if the TopK limit is
                            // a literal integer.
                            (Some(topk_limit), Some(finishing_limit)) => {
                                if let Some(l) = topk_limit.as_literal_int64() {
                                    l >= *finishing_limit
                                } else {
                                    false
                                }
                            }
                        };
                        if finishing_limits_at_least_as_topk {
                            mir = input;
                        }
                    }
                }
            }
            // In the case of a linear operator around an indexed view, we
            // can skip creating a dataflow and instead pull all the rows in
            // index and apply the linear operator against them.
            let (mfp, mir) = mz_expr::MapFilterProject::extract_from_expression(mir);
            match mir {
                MirRelationExpr::Get {
                    id: Id::Global(get_id),
                    typ: relation_typ,
                    ..
                } => {
                    // Just grab any arrangement if an arrangement exists
                    for (index_id, IndexImport { desc, .. }) in dataflow_plan.index_imports.iter() {
                        if desc.on_id == *get_id {
                            return Ok(Some(FastPathPlan::PeekExisting(
                                *get_id,
                                *index_id,
                                None,
                                permute_oneshot_mfp_around_index(mfp, &desc.key)?,
                            )));
                        }
                    }

                    // If there is no arrangement, consider peeking the persist shard directly.
                    // Generally, we consider a persist peek when the query can definitely be satisfied
                    // by scanning through a small, constant number of Persist key-values.
                    let safe_mfp = mfp_to_safe_plan(mfp)?;
                    let (_maps, filters, projection) = safe_mfp.as_map_filter_project();

                    // Walk the columns in order and pack the literal a filter
                    // associates with each one (via `expr_eq_literal`), stopping
                    // at the first column whose type does not preserve order or
                    // for which no filter provides a literal. An empty prefix
                    // means there is no literal constraint.
                    let literal_constraint = if persist_fast_path_order {
                        let mut row = Row::default();
                        let mut packer = row.packer();
                        for (idx, col) in relation_typ.column_types.iter().enumerate() {
                            if !preserves_order(&col.scalar_type) {
                                break;
                            }
                            let col_expr = MirScalarExpr::column(idx);

                            let Some((literal, _)) = filters
                                .iter()
                                .filter_map(|f| f.expr_eq_literal(&col_expr))
                                .next()
                            else {
                                break;
                            };
                            packer.extend_by_row(&literal);
                        }
                        if row.is_empty() { None } else { Some(row) }
                    } else {
                        None
                    };

                    // Whether the finishing alone makes a persist peek
                    // acceptable; without a finishing it never does.
                    let finish_ok = match &finishing {
                        None => false,
                        Some(RowSetFinishing {
                            order_by,
                            limit,
                            offset,
                            ..
                        }) => {
                            let order_ok = if persist_fast_path_order {
                                order_by.iter().enumerate().all(|(idx, order)| {
                                    // Map the ordering column back to the column in the source data.
                                    // (If it's not one of the input columns, we can't make any guarantees.)
                                    let column_idx = projection[order.column];
                                    if column_idx >= safe_mfp.input_arity {
                                        return false;
                                    }
                                    let column_type = &relation_typ.column_types[column_idx];
                                    // The i-th ordering column must be the i-th source column,
                                    let index_ok = idx == column_idx;
                                    // nullable columns must sort nulls last,
                                    let nulls_ok = !column_type.nullable || order.nulls_last;
                                    // the order must be ascending,
                                    let asc_ok = !order.desc;
                                    // and the column type must preserve order.
                                    let type_ok = preserves_order(&column_type.scalar_type);
                                    index_ok && nulls_ok && asc_ok && type_ok
                                })
                            } else {
                                order_by.is_empty()
                            };
                            // A limit is required, and `limit + offset` must be
                            // strictly below the configured bound.
                            let limit_ok = limit.map_or(false, |l| {
                                usize::cast_from(l) + *offset < persist_fast_path_limit
                            });
                            order_ok && limit_ok
                        }
                    };

                    // True when some key of the relation consists solely of
                    // columns covered by the literal prefix.
                    let key_constraint = if let Some(literal) = &literal_constraint {
                        let prefix_len = literal.iter().count();
                        relation_typ
                            .keys
                            .iter()
                            .any(|k| k.iter().all(|idx| *idx < prefix_len))
                    } else {
                        false
                    };

                    // We can generate a persist peek when:
                    // - We have a literal constraint that includes an entire key (so we'll return at most one value)
                    // - We can return the first N key values (no filters, small limit, consistent order)
                    if key_constraint || (filters.is_empty() && finish_ok) {
                        return Ok(Some(FastPathPlan::PeekPersist(
                            *get_id,
                            literal_constraint,
                            safe_mfp,
                        )));
                    }
                }
                MirRelationExpr::Join { implementation, .. } => {
                    // An indexed filter already names the collection, the index,
                    // its key, and the values to look up.
                    if let mz_expr::JoinImplementation::IndexedFilter(coll_id, idx_id, key, vals) =
                        implementation
                    {
                        return Ok(Some(FastPathPlan::PeekExisting(
                            *coll_id,
                            *idx_id,
                            Some(vals.clone()),
                            permute_oneshot_mfp_around_index(mfp, key)?,
                        )));
                    }
                }
                // nothing can be done for non-trivial expressions.
                _ => {}
            }
        }
    }
    Ok(None)
}
606
607impl FastPathPlan {
608    pub fn used_indexes(&self, finishing: Option<&RowSetFinishing>) -> UsedIndexes {
609        match self {
610            FastPathPlan::Constant(..) => UsedIndexes::default(),
611            FastPathPlan::PeekExisting(_coll_id, idx_id, literal_constraints, _mfp) => {
612                if literal_constraints.is_some() {
613                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::Lookup(*idx_id)])].into())
614                } else if finishing.map_or(false, |f| f.limit.is_some() && f.order_by.is_empty()) {
615                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::FastPathLimit])].into())
616                } else {
617                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::FullScan])].into())
618                }
619            }
620            FastPathPlan::PeekPersist(..) => UsedIndexes::default(),
621        }
622    }
623}
624
625impl crate::coord::Coordinator {
626    /// Implements a peek plan produced by `create_plan` above.
627    #[mz_ore::instrument(level = "debug")]
628    pub async fn implement_peek_plan(
629        &mut self,
630        ctx_extra: &mut ExecuteContextExtra,
631        plan: PlannedPeek,
632        finishing: RowSetFinishing,
633        compute_instance: ComputeInstanceId,
634        target_replica: Option<ReplicaId>,
635        max_result_size: u64,
636        max_returned_query_size: Option<u64>,
637    ) -> Result<crate::ExecuteResponse, AdapterError> {
638        let PlannedPeek {
639            plan: fast_path,
640            determination,
641            conn_id,
642            intermediate_result_type,
643            source_arity,
644            source_ids,
645        } = plan;
646
647        // If the dataflow optimizes to a constant expression, we can immediately return the result.
648        if let PeekPlan::FastPath(FastPathPlan::Constant(rows, _)) = fast_path {
649            let mut rows = match rows {
650                Ok(rows) => rows,
651                Err(e) => return Err(e.into()),
652            };
653            // Consolidate down the results to get correct totals.
654            consolidate(&mut rows);
655
656            let mut results = Vec::new();
657            for (row, count) in rows {
658                if count.is_negative() {
659                    Err(EvalError::InvalidParameterValue(
660                        format!("Negative multiplicity in constant result: {}", count).into(),
661                    ))?
662                };
663                if count.is_positive() {
664                    let count = usize::cast_from(
665                        u64::try_from(count.into_inner())
666                            .expect("known to be positive from check above"),
667                    );
668                    results.push((
669                        row,
670                        NonZeroUsize::new(count).expect("known to be non-zero from check above"),
671                    ));
672                }
673            }
674            let row_collection = RowCollection::new(results, &finishing.order_by);
675            let duration_histogram = self.metrics.row_set_finishing_seconds();
676
677            let (ret, reason) = match finishing.finish(
678                row_collection,
679                max_result_size,
680                max_returned_query_size,
681                &duration_histogram,
682            ) {
683                Ok((rows, row_size_bytes)) => {
684                    let result_size = u64::cast_from(row_size_bytes);
685                    let rows_returned = u64::cast_from(rows.count());
686                    (
687                        Ok(Self::send_immediate_rows(rows)),
688                        StatementEndedExecutionReason::Success {
689                            result_size: Some(result_size),
690                            rows_returned: Some(rows_returned),
691                            execution_strategy: Some(StatementExecutionStrategy::Constant),
692                        },
693                    )
694                }
695                Err(error) => (
696                    Err(AdapterError::ResultSize(error.clone())),
697                    StatementEndedExecutionReason::Errored { error },
698                ),
699            };
700            self.retire_execution(reason, std::mem::take(ctx_extra));
701            return ret;
702        }
703
704        let timestamp = determination.timestamp_context.timestamp_or_default();
705        if let Some(id) = ctx_extra.contents() {
706            self.set_statement_execution_timestamp(id, timestamp)
707        }
708
709        // The remaining cases are a peek into a maintained arrangement, or building a dataflow.
710        // In both cases we will want to peek, and the main difference is that we might want to
711        // build a dataflow and drop it once the peek is issued. The peeks are also constructed
712        // differently.
713
714        // If we must build the view, ship the dataflow.
715        let (peek_command, drop_dataflow, is_fast_path, peek_target, strategy) = match fast_path {
716            PeekPlan::FastPath(FastPathPlan::PeekExisting(
717                _coll_id,
718                idx_id,
719                literal_constraints,
720                map_filter_project,
721            )) => (
722                (literal_constraints, timestamp, map_filter_project),
723                None,
724                true,
725                PeekTarget::Index { id: idx_id },
726                StatementExecutionStrategy::FastPath,
727            ),
728            PeekPlan::FastPath(FastPathPlan::PeekPersist(
729                coll_id,
730                literal_constraint,
731                map_filter_project,
732            )) => {
733                let peek_command = (
734                    literal_constraint.map(|r| vec![r]),
735                    timestamp,
736                    map_filter_project,
737                );
738                let metadata = self
739                    .controller
740                    .storage
741                    .collection_metadata(coll_id)
742                    .expect("storage collection for fast-path peek")
743                    .clone();
744                (
745                    peek_command,
746                    None,
747                    true,
748                    PeekTarget::Persist {
749                        id: coll_id,
750                        metadata,
751                    },
752                    StatementExecutionStrategy::PersistFastPath,
753                )
754            }
755            PeekPlan::SlowPath(PeekDataflowPlan {
756                desc: dataflow,
757                // n.b. this index_id identifies a transient index the
758                // caller created, so it is guaranteed to be on
759                // `compute_instance`.
760                id: index_id,
761                key: index_key,
762                permutation: index_permutation,
763                thinned_arity: index_thinned_arity,
764            }) => {
765                let output_ids = dataflow.export_ids().collect();
766
767                // Very important: actually create the dataflow (here, so we can destructure).
768                self.controller
769                    .compute
770                    .create_dataflow(compute_instance, dataflow, None)
771                    .unwrap_or_terminate("cannot fail to create dataflows");
772                self.initialize_compute_read_policies(
773                    output_ids,
774                    compute_instance,
775                    // Disable compaction so that nothing can compact before the peek occurs below.
776                    CompactionWindow::DisableCompaction,
777                )
778                .await;
779
780                // Create an identity MFP operator.
781                let mut map_filter_project = mz_expr::MapFilterProject::new(source_arity);
782                map_filter_project.permute_fn(
783                    |c| index_permutation[c],
784                    index_key.len() + index_thinned_arity,
785                );
786                let map_filter_project = mfp_to_safe_plan(map_filter_project)?;
787
788                (
789                    (None, timestamp, map_filter_project),
790                    Some(index_id),
791                    false,
792                    PeekTarget::Index { id: index_id },
793                    StatementExecutionStrategy::Standard,
794                )
795            }
796            _ => {
797                unreachable!()
798            }
799        };
800
801        // Endpoints for sending and receiving peek responses.
802        let (rows_tx, rows_rx) = tokio::sync::oneshot::channel();
803
804        // Generate unique UUID. Guaranteed to be unique to all pending peeks, there's an very
805        // small but unlikely chance that it's not unique to completed peeks.
806        let mut uuid = Uuid::new_v4();
807        while self.pending_peeks.contains_key(&uuid) {
808            uuid = Uuid::new_v4();
809        }
810
811        // The peek is ready to go for both cases, fast and non-fast.
812        // Stash the response mechanism, and broadcast dataflow construction.
813        self.pending_peeks.insert(
814            uuid,
815            PendingPeek {
816                conn_id: conn_id.clone(),
817                cluster_id: compute_instance,
818                depends_on: source_ids,
819                ctx_extra: std::mem::take(ctx_extra),
820                is_fast_path,
821            },
822        );
823        self.client_pending_peeks
824            .entry(conn_id)
825            .or_default()
826            .insert(uuid, compute_instance);
827        let (literal_constraints, timestamp, map_filter_project) = peek_command;
828
829        // At this stage we don't know column names for the result because we
830        // only know the peek's result type as a bare ResultType.
831        let peek_result_column_names =
832            (0..intermediate_result_type.arity()).map(|i| format!("peek_{i}"));
833        let peek_result_desc =
834            RelationDesc::new(intermediate_result_type, peek_result_column_names);
835
836        self.controller
837            .compute
838            .peek(
839                compute_instance,
840                peek_target,
841                literal_constraints,
842                uuid,
843                timestamp,
844                peek_result_desc,
845                finishing.clone(),
846                map_filter_project,
847                target_replica,
848                rows_tx,
849            )
850            .unwrap_or_terminate("cannot fail to peek");
851
852        let duration_histogram = self.metrics.row_set_finishing_seconds();
853
854        // If a dataflow was created, drop it once the peek command is sent.
855        if let Some(index_id) = drop_dataflow {
856            self.remove_compute_ids_from_timeline(vec![(compute_instance, index_id)]);
857            self.drop_indexes(vec![(compute_instance, index_id)]);
858        }
859
860        let persist_client = self.persist_client.clone();
861        let peek_stash_read_batch_size_bytes =
862            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
863                .get(self.catalog().system_config().dyncfgs());
864        let peek_stash_read_memory_budget_bytes =
865            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
866                .get(self.catalog().system_config().dyncfgs());
867
868        let peek_response_stream = Self::create_peek_response_stream(
869            rows_rx,
870            finishing,
871            max_result_size,
872            max_returned_query_size,
873            duration_histogram,
874            persist_client,
875            peek_stash_read_batch_size_bytes,
876            peek_stash_read_memory_budget_bytes,
877        );
878
879        Ok(crate::ExecuteResponse::SendingRowsStreaming {
880            rows: Box::pin(peek_response_stream),
881            instance_id: compute_instance,
882            strategy,
883        })
884    }
885
886    /// Creates an async stream that processes peek responses and yields rows.
887    #[mz_ore::instrument(level = "debug")]
888    fn create_peek_response_stream(
889        rows_rx: tokio::sync::oneshot::Receiver<PeekResponse>,
890        finishing: RowSetFinishing,
891        max_result_size: u64,
892        max_returned_query_size: Option<u64>,
893        duration_histogram: prometheus::Histogram,
894        mut persist_client: mz_persist_client::PersistClient,
895        peek_stash_read_batch_size_bytes: usize,
896        peek_stash_read_memory_budget_bytes: usize,
897    ) -> impl futures::Stream<Item = PeekResponseUnary> {
898        async_stream::stream!({
899            let result = rows_rx.await;
900
901            let rows = match result {
902                Ok(rows) => rows,
903                Err(e) => {
904                    yield PeekResponseUnary::Error(e.to_string());
905                    return;
906                }
907            };
908
909            match rows {
910                PeekResponse::Rows(rows) => {
911                    match finishing.finish(
912                        rows,
913                        max_result_size,
914                        max_returned_query_size,
915                        &duration_histogram,
916                    ) {
917                        Ok((rows, _size_bytes)) => yield PeekResponseUnary::Rows(Box::new(rows)),
918                        Err(e) => yield PeekResponseUnary::Error(e),
919                    }
920                }
921                PeekResponse::Stashed(response) => {
922                    let response = *response;
923
924                    let shard_id = response.shard_id;
925
926                    let mut batches = Vec::new();
927                    for proto_batch in response.batches.into_iter() {
928                        let batch =
929                            persist_client.batch_from_transmittable_batch(&shard_id, proto_batch);
930
931                        batches.push(batch);
932                    }
933                    tracing::trace!(?batches, "stashed peek response");
934
935                    let as_of = Antichain::from_elem(mz_repr::Timestamp::default());
936                    let read_schemas: Schemas<SourceData, ()> = Schemas {
937                        id: None,
938                        key: Arc::new(response.relation_desc.clone()),
939                        val: Arc::new(UnitSchema),
940                    };
941
942                    let mut row_cursor = persist_client
943                        .read_batches_consolidated::<_, _, _, i64>(
944                            response.shard_id,
945                            as_of,
946                            read_schemas,
947                            batches,
948                            |_stats| true,
949                            peek_stash_read_memory_budget_bytes,
950                        )
951                        .await
952                        .expect("invalid usage");
953
954                    // NOTE: Using the cursor creates Futures that are not Sync,
955                    // so we can't drive them on the main Coordinator loop.
956                    // Spawning a task has the additional benefit that we get to
957                    // delete batches once we're done.
958                    //
959                    // Batch deletion is best-effort, though, and there are
960                    // multiple known ways in which they can leak, among them:
961                    //
962                    // - ProtoBatch is lost in flight
963                    // - ProtoBatch is lost because when combining PeekResponse
964                    // from workers a cancellation or error "overrides" other
965                    // results, meaning we drop them
966                    // - This task here is not run to completion before it can
967                    // delete all batches
968                    //
969                    // This is semi-ok, because persist needs a reaper of leaked
970                    // batches already, and so we piggy-back on that, even if it
971                    // might not exist as of today.
972                    let (tx, mut rx) = tokio::sync::mpsc::channel(1);
973                    mz_ore::task::spawn(|| "read_peek_batches", async move {
974                        // We always send our inline rows first. Ordering
975                        // doesn't matter because we can only be in this case
976                        // when there is no ORDER BY.
977                        //
978                        // We _could_ write these out as a Batch, and include it
979                        // in the batches we read via the Consolidator. If we
980                        // wanted to get a consistent ordering. That's not
981                        // needed for correctness! But might be nice for more
982                        // aesthetic reasons.
983                        let result = tx.send(response.inline_rows).await;
984                        if result.is_err() {
985                            tracing::error!("receiver went away");
986                        }
987
988                        let mut current_batch = Vec::new();
989                        let mut current_batch_size: usize = 0;
990
991                        'outer: while let Some(rows) = row_cursor.next().await {
992                            for ((key, _val), _ts, diff) in rows {
993                                let source_data = key.expect("decoding error");
994
995                                let row = source_data
996                                    .0
997                                    .expect("we are not sending errors on this code path");
998
999                                let diff = usize::try_from(diff)
1000                                    .expect("peek responses cannot have negative diffs");
1001
1002                                if diff > 0 {
1003                                    let diff =
1004                                        NonZeroUsize::new(diff).expect("checked to be non-zero");
1005                                    current_batch_size =
1006                                        current_batch_size.saturating_add(row.byte_len());
1007                                    current_batch.push((row, diff));
1008                                }
1009
1010                                if current_batch_size > peek_stash_read_batch_size_bytes {
1011                                    // We're re-encoding the rows as a RowCollection
1012                                    // here, for which we pay in CPU time. We're in a
1013                                    // slow path already, since we're returning a big
1014                                    // stashed result so this is worth the convenience
1015                                    // of that for now.
1016                                    let result = tx
1017                                        .send(RowCollection::new(
1018                                            current_batch.drain(..).collect_vec(),
1019                                            &[],
1020                                        ))
1021                                        .await;
1022                                    if result.is_err() {
1023                                        tracing::error!("receiver went away");
1024                                        // Don't return but break so we fall out to the
1025                                        // batch delete logic below.
1026                                        break 'outer;
1027                                    }
1028
1029                                    current_batch_size = 0;
1030                                }
1031                            }
1032                        }
1033
1034                        if current_batch.len() > 0 {
1035                            let result = tx.send(RowCollection::new(current_batch, &[])).await;
1036                            if result.is_err() {
1037                                tracing::error!("receiver went away");
1038                            }
1039                        }
1040
1041                        let batches = row_cursor.into_lease();
1042                        tracing::trace!(?response.shard_id, "cleaning up batches of peek result");
1043                        for batch in batches {
1044                            batch.delete().await;
1045                        }
1046                    });
1047
1048                    assert!(
1049                        finishing.is_streamable(response.relation_desc.arity()),
1050                        "can only get stashed responses when the finishing is streamable"
1051                    );
1052
1053                    tracing::trace!("query result is streamable!");
1054
1055                    assert!(finishing.is_streamable(response.relation_desc.arity()));
1056                    let mut incremental_finishing = RowSetFinishingIncremental::new(
1057                        finishing.offset,
1058                        finishing.limit,
1059                        finishing.project,
1060                        max_returned_query_size,
1061                    );
1062
1063                    let mut got_zero_rows = true;
1064                    while let Some(rows) = rx.recv().await {
1065                        got_zero_rows = false;
1066
1067                        let result_rows = incremental_finishing.finish_incremental(
1068                            rows,
1069                            max_result_size,
1070                            &duration_histogram,
1071                        );
1072
1073                        match result_rows {
1074                            Ok(result_rows) => yield PeekResponseUnary::Rows(Box::new(result_rows)),
1075                            Err(e) => yield PeekResponseUnary::Error(e),
1076                        }
1077                    }
1078
1079                    // Even when there's zero rows, clients still expect an
1080                    // empty PeekResponse.
1081                    if got_zero_rows {
1082                        let row_iter = vec![].into_row_iter();
1083                        yield PeekResponseUnary::Rows(Box::new(row_iter));
1084                    }
1085                }
1086                PeekResponse::Canceled => {
1087                    yield PeekResponseUnary::Canceled;
1088                }
1089                PeekResponse::Error(e) => {
1090                    yield PeekResponseUnary::Error(e);
1091                }
1092            }
1093        })
1094    }
1095
1096    /// Cancel and remove all pending peeks that were initiated by the client with `conn_id`.
1097    #[mz_ore::instrument(level = "debug")]
1098    pub(crate) fn cancel_pending_peeks(&mut self, conn_id: &ConnectionId) {
1099        if let Some(uuids) = self.client_pending_peeks.remove(conn_id) {
1100            self.metrics
1101                .canceled_peeks
1102                .with_label_values(&[])
1103                .inc_by(u64::cast_from(uuids.len()));
1104
1105            let mut inverse: BTreeMap<ComputeInstanceId, BTreeSet<Uuid>> = Default::default();
1106            for (uuid, compute_instance) in &uuids {
1107                inverse.entry(*compute_instance).or_default().insert(*uuid);
1108            }
1109            for (compute_instance, uuids) in inverse {
1110                // It's possible that this compute instance no longer exists because it was dropped
1111                // while the peek was in progress. In this case we ignore the error and move on
1112                // because the dataflow no longer exists.
1113                // TODO(jkosh44) Dropping a cluster should actively cancel all pending queries.
1114                for uuid in uuids {
1115                    let _ = self.controller.compute.cancel_peek(
1116                        compute_instance,
1117                        uuid,
1118                        PeekResponse::Canceled,
1119                    );
1120                }
1121            }
1122
1123            let peeks = uuids
1124                .iter()
1125                .filter_map(|(uuid, _)| self.pending_peeks.remove(uuid))
1126                .collect::<Vec<_>>();
1127            for peek in peeks {
1128                self.retire_execution(StatementEndedExecutionReason::Canceled, peek.ctx_extra);
1129            }
1130        }
1131    }
1132
1133    /// Handle a peek notification and retire the corresponding execution. Does nothing for
1134    /// already-removed peeks.
1135    pub(crate) fn handle_peek_notification(
1136        &mut self,
1137        uuid: Uuid,
1138        notification: PeekNotification,
1139        otel_ctx: OpenTelemetryContext,
1140    ) {
1141        // We expect exactly one peek response, which we forward. Then we clean up the
1142        // peek's state in the coordinator.
1143        if let Some(PendingPeek {
1144            conn_id: _,
1145            cluster_id: _,
1146            depends_on: _,
1147            ctx_extra,
1148            is_fast_path,
1149        }) = self.remove_pending_peek(&uuid)
1150        {
1151            let reason = match notification {
1152                PeekNotification::Success {
1153                    rows: num_rows,
1154                    result_size,
1155                } => {
1156                    let strategy = if is_fast_path {
1157                        StatementExecutionStrategy::FastPath
1158                    } else {
1159                        StatementExecutionStrategy::Standard
1160                    };
1161                    StatementEndedExecutionReason::Success {
1162                        result_size: Some(result_size),
1163                        rows_returned: Some(num_rows),
1164                        execution_strategy: Some(strategy),
1165                    }
1166                }
1167                PeekNotification::Error(error) => StatementEndedExecutionReason::Errored { error },
1168                PeekNotification::Canceled => StatementEndedExecutionReason::Canceled,
1169            };
1170            otel_ctx.attach_as_parent();
1171            self.retire_execution(reason, ctx_extra);
1172        }
1173        // Cancellation may cause us to receive responses for peeks no
1174        // longer in `self.pending_peeks`, so we quietly ignore them.
1175    }
1176
1177    /// Clean up a peek's state.
1178    pub(crate) fn remove_pending_peek(&mut self, uuid: &Uuid) -> Option<PendingPeek> {
1179        let pending_peek = self.pending_peeks.remove(uuid);
1180        if let Some(pending_peek) = &pending_peek {
1181            let uuids = self
1182                .client_pending_peeks
1183                .get_mut(&pending_peek.conn_id)
1184                .expect("coord peek state is inconsistent");
1185            uuids.remove(uuid);
1186            if uuids.is_empty() {
1187                self.client_pending_peeks.remove(&pending_peek.conn_id);
1188            }
1189        }
1190        pending_peek
1191    }
1192
1193    /// Constructs an [`ExecuteResponse`] that that will send some rows to the
1194    /// client immediately, as opposed to asking the dataflow layer to send along
1195    /// the rows after some computation.
1196    pub(crate) fn send_immediate_rows<I>(rows: I) -> ExecuteResponse
1197    where
1198        I: IntoRowIterator,
1199        I::Iter: Send + Sync + 'static,
1200    {
1201        let rows = Box::new(rows.into_row_iter());
1202        ExecuteResponse::SendingRowsImmediate { rows }
1203    }
1204}
1205
#[cfg(test)]
mod tests {
    use mz_expr::func::IsNull;
    use mz_expr::{MapFilterProject, UnaryFunc};
    use mz_ore::str::Indent;
    use mz_repr::explain::text::text_string_at;
    use mz_repr::explain::{DummyHumanizer, ExplainConfig, PlanRenderingContext};
    use mz_repr::{Datum, SqlColumnType, SqlScalarType};

    use super::*;

    /// Checks the text rendering of constant and `PeekExisting` fast path plans, including
    /// the abbreviated rendering of a constant with many rows.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
    fn test_fast_path_plan_as_text() {
        // Turns an MFP into the plan form that `FastPathPlan::PeekExisting` carries.
        let to_plan = |mfp: MapFilterProject| {
            mfp.into_plan()
                .expect("invalid plan")
                .into_nontemporal()
                .expect("invalid nontemporal")
        };

        let string_typ = SqlRelationType::new(vec![SqlColumnType {
            scalar_type: SqlScalarType::String,
            nullable: false,
        }]);

        // The plans under test.
        let error_plan =
            FastPathPlan::Constant(Err(EvalError::DivisionByZero), string_typ.clone());
        let full_scan_plan = FastPathPlan::PeekExisting(
            GlobalId::User(8),
            GlobalId::User(10),
            None,
            to_plan(
                MapFilterProject::new(4)
                    .map(Some(MirScalarExpr::column(0).or(MirScalarExpr::column(2))))
                    .project([1, 4]),
            ),
        );
        let lookup_plan = FastPathPlan::PeekExisting(
            GlobalId::User(9),
            GlobalId::User(11),
            Some(vec![Row::pack(Some(Datum::Int32(5)))]),
            to_plan(MapFilterProject::new(3).filter(Some(
                MirScalarExpr::column(0).call_unary(UnaryFunc::IsNull(IsNull)),
            ))),
        );

        // Rendering setup shared by all assertions.
        let humanizer = DummyHumanizer;
        let config = ExplainConfig {
            redacted: false,
            verbose_syntax: true,
            ..Default::default()
        };
        let ctx_gen = || {
            PlanRenderingContext::<FastPathPlan>::new(
                Indent::default(),
                &humanizer,
                BTreeMap::new(),
                &config,
            )
        };

        let cases = [
            (&error_plan, "Error \"division by zero\"\n"),
            (
                &full_scan_plan,
                "Project (#1, #4)\n  Map ((#0 OR #2))\n    ReadIndex on=u8 [DELETED INDEX]=[*** full scan ***]\n",
            ),
            (
                &lookup_plan,
                "Filter (#0) IS NULL\n  ReadIndex on=u9 [DELETED INDEX]=[lookup value=(5)]\n",
            ),
        ];
        for (plan, expected) in cases {
            assert_eq!(text_string_at(plan, ctx_gen), expected);
        }

        // A constant with only a few rows is rendered in full.
        let mut rows = vec![
            (Row::pack(Some(Datum::String("hello"))), Diff::ONE),
            (Row::pack(Some(Datum::String("world"))), 2.into()),
            (Row::pack(Some(Datum::String("star"))), 500.into()),
        ];
        let few_rows_plan = FastPathPlan::Constant(Ok(rows.clone()), string_typ.clone());
        let few_rows_expected =
            "Constant\n  - (\"hello\")\n  - ((\"world\") x 2)\n  - ((\"star\") x 500)\n";
        assert_eq!(text_string_at(&few_rows_plan, ctx_gen), few_rows_expected);

        // With 20 more rows the rendering switches to a total count plus the first rows.
        for i in 0..20 {
            rows.push((Row::pack(Some(Datum::String(&i.to_string()))), Diff::ONE));
        }
        let many_rows_plan = FastPathPlan::Constant(Ok(rows), string_typ);
        let many_rows_expected = "Constant\n  total_rows (diffs absed): 523\n  first_rows:\n    - (\"hello\")\
        \n    - ((\"world\") x 2)\n    - ((\"star\") x 500)\n    - (\"0\")\n    - (\"1\")\
        \n    - (\"2\")\n    - (\"3\")\n    - (\"4\")\n    - (\"5\")\n    - (\"6\")\
        \n    - (\"7\")\n    - (\"8\")\n    - (\"9\")\n    - (\"10\")\n    - (\"11\")\
        \n    - (\"12\")\n    - (\"13\")\n    - (\"14\")\n    - (\"15\")\n    - (\"16\")\n";
        assert_eq!(text_string_at(&many_rows_plan, ctx_gen), many_rows_expected);
    }
}
1298}