use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::num::NonZeroUsize;
use std::ops::Deref;
use std::sync::Arc;

use differential_dataflow::consolidation::consolidate;
use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_cluster_client::ReplicaId;
use mz_compute_client::controller::PeekNotification;
use mz_compute_client::protocol::command::PeekTarget;
use mz_compute_client::protocol::response::PeekResponse;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::{DataflowDescription, IndexImport};
use mz_controller_types::ClusterId;
use mz_expr::explain::{HumanizedExplain, HumanizerMode, fmt_text_constant_rows};
use mz_expr::row::RowCollection;
use mz_expr::{
    EvalError, Id, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing,
    RowSetFinishingIncremental, permutation_for_arrangement,
};
use mz_ore::cast::CastFrom;
use mz_ore::str::{StrExt, separated};
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_client::Schemas;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::explain::text::DisplayText;
use mz_repr::explain::{CompactScalars, IndexUsageType, PlanRenderingContext, UsedIndexes};
use mz_repr::{
    Diff, GlobalId, IntoRowIterator, RelationDesc, Row, RowIterator, SqlRelationType,
    preserves_order,
};
use mz_storage_types::sources::SourceData;
use serde::{Deserialize, Serialize};
use timely::progress::{Antichain, Timestamp};
use uuid::Uuid;

use crate::coord::timestamp_selection::TimestampDetermination;
use crate::optimize::OptimizerError;
use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
use crate::util::ResultExt;
use crate::{AdapterError, ExecuteContextExtra, ExecuteResponse};

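/// A peek that the coordinator has issued to a cluster but for which it has
/// not yet processed a response: the issuing connection, the target cluster,
/// the objects the peek depends on, statement-logging context, and whether the
/// peek took a fast path.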
#[derive(Debug)]
pub(crate) struct PendingPeek {
    pub(crate) conn_id: ConnectionId,
    pub(crate) cluster_id: ClusterId,
    pub(crate) depends_on: BTreeSet<GlobalId>,
    pub(crate) ctx_extra: ExecuteContextExtra,
    pub(crate) is_fast_path: bool,
}

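/// The adapter-facing form of a peek response: the resulting rows, an error
/// rendered as a string, or a notice that the peek was canceled.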
#[derive(Debug)]
pub enum PeekResponseUnary {
    Rows(Box<dyn RowIterator + Send + Sync>),
    Error(String),
    Canceled,
}

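/// The dataflow (and the arrangement within it) that a "slow path" peek reads
/// from. The key, permutation, and thinned arity describe how to map the
/// arrangement's (key, value) layout back to the peek's output columns.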
#[derive(Clone, Debug)]
pub struct PeekDataflowPlan<T = mz_repr::Timestamp> {
    pub(crate) desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
    pub(crate) id: GlobalId,
    key: Vec<MirScalarExpr>,
    permutation: Vec<usize>,
    thinned_arity: usize,
}

impl<T> PeekDataflowPlan<T> {
    pub fn new(
        desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
        id: GlobalId,
        typ: &SqlRelationType,
    ) -> Self {
        let arity = typ.arity();
        let key = typ
            .default_key()
            .into_iter()
            .map(MirScalarExpr::column)
            .collect::<Vec<_>>();
        let (permutation, thinning) = permutation_for_arrangement(&key, arity);
        Self {
            desc,
            id,
            key,
            permutation,
            thinned_arity: thinning.len(),
        }
    }
}

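/// Ways in which the result of a one-off query can be produced without
/// building a dedicated dataflow:
/// * `Constant`: the query evaluates to a constant collection (or a constant
///   error);
/// * `PeekExisting`: the query reads from an existing index, optionally
///   constrained to a set of literal keys, with a safe MFP applied to the
///   results;
/// * `PeekPersist`: the query reads directly from a persist (storage)
///   collection, optionally constrained by a literal key prefix.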
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd)]
pub enum FastPathPlan {
    Constant(Result<Vec<(Row, Diff)>, EvalError>, SqlRelationType),
    PeekExisting(GlobalId, GlobalId, Option<Vec<Row>>, mz_expr::SafeMfpPlan),
    PeekPersist(GlobalId, Option<Row>, mz_expr::SafeMfpPlan),
}

impl<'a, T: 'a> DisplayText<PlanRenderingContext<'a, T>> for FastPathPlan {
    fn fmt_text(
        &self,
        f: &mut fmt::Formatter<'_>,
        ctx: &mut PlanRenderingContext<'a, T>,
    ) -> fmt::Result {
        if ctx.config.verbose_syntax {
            self.fmt_verbose_text(f, ctx)
        } else {
            self.fmt_default_text(f, ctx)
        }
    }
}

impl FastPathPlan {
    pub fn fmt_default_text<'a, T>(
        &self,
        f: &mut fmt::Formatter<'_>,
        ctx: &mut PlanRenderingContext<'a, T>,
    ) -> fmt::Result {
        let mode = HumanizedExplain::new(ctx.config.redacted);

        match self {
            FastPathPlan::Constant(rows, _) => {
                write!(f, "{}→Constant ", ctx.indent)?;

                match rows {
                    Ok(rows) => writeln!(f, "({} rows)", rows.len())?,
                    Err(err) => {
                        if mode.redacted() {
                            writeln!(f, "(error: █)")?;
                        } else {
                            writeln!(f, "(error: {})", err.to_string().quoted())?;
                        }
                    }
                }
            }
            FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
                let coll = ctx
                    .humanizer
                    .humanize_id(*coll_id)
                    .unwrap_or_else(|| coll_id.to_string());
                let idx = ctx
                    .humanizer
                    .humanize_id(*idx_id)
                    .unwrap_or_else(|| idx_id.to_string());
                writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
                ctx.indent.set();

                ctx.indent += 1;

                mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
                let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();

                if printed {
                    ctx.indent += 1;
                }
                if let Some(literal_constraints) = literal_constraints {
                    writeln!(f, "{}→Index Lookup on {coll} (using {idx})", ctx.indent)?;
                    ctx.indent += 1;
                    let values = separated("; ", mode.seq(literal_constraints, None));
                    writeln!(f, "{}Lookup values: {values}", ctx.indent)?;
                } else {
                    writeln!(f, "{}→Indexed {coll} (using {idx})", ctx.indent)?;
                }

                ctx.indent.reset();
            }
            FastPathPlan::PeekPersist(global_id, literal_constraint, mfp) => {
                let coll = ctx
                    .humanizer
                    .humanize_id(*global_id)
                    .unwrap_or_else(|| global_id.to_string());
                writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
                ctx.indent.set();

                ctx.indent += 1;

                mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
                let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();

                if printed {
                    ctx.indent += 1;
                }
                if let Some(literal_constraint) = literal_constraint {
                    writeln!(f, "{}→Index Lookup on {coll} (from storage)", ctx.indent)?;
                    ctx.indent += 1;
                    let value = mode.expr(literal_constraint, None);
                    writeln!(f, "{}Lookup value: {value}", ctx.indent)?;
                } else {
                    writeln!(f, "{}→Indexed {coll} (from storage)", ctx.indent)?;
                }

                ctx.indent.reset();
            }
        }

        Ok(())
    }

    pub fn fmt_verbose_text<'a, T>(
        &self,
        f: &mut fmt::Formatter<'_>,
        ctx: &mut PlanRenderingContext<'a, T>,
    ) -> fmt::Result {
        let redacted = ctx.config.redacted;
        let mode = HumanizedExplain::new(redacted);

        match self {
            FastPathPlan::Constant(Ok(rows), _) => {
                if !rows.is_empty() {
                    writeln!(f, "{}Constant", ctx.indent)?;
                    *ctx.as_mut() += 1;
                    fmt_text_constant_rows(
                        f,
                        rows.iter().map(|(row, diff)| (row, diff)),
                        ctx.as_mut(),
                        redacted,
                    )?;
                    *ctx.as_mut() -= 1;
                } else {
                    writeln!(f, "{}Constant <empty>", ctx.as_mut())?;
                }
                Ok(())
            }
            FastPathPlan::Constant(Err(err), _) => {
                if redacted {
                    writeln!(f, "{}Error █", ctx.as_mut())
                } else {
                    writeln!(f, "{}Error {}", ctx.as_mut(), err.to_string().escaped())
                }
            }
            FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
                ctx.as_mut().set();
                let (map, filter, project) = mfp.as_map_filter_project();

                let cols = if !ctx.config.humanized_exprs {
                    None
                } else if let Some(cols) = ctx.humanizer.column_names_for_id(*idx_id) {
                    let cols = itertools::chain(
                        cols.iter().cloned(),
                        std::iter::repeat(String::new()).take(map.len()),
                    )
                    .collect();
                    Some(cols)
                } else {
                    None
                };

                if project.len() != mfp.input_arity + map.len()
                    || !project.iter().enumerate().all(|(i, o)| i == *o)
                {
                    let outputs = mode.seq(&project, cols.as_ref());
                    let outputs = CompactScalars(outputs);
                    writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
                    *ctx.as_mut() += 1;
                }
                if !filter.is_empty() {
                    let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
                    writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
                    *ctx.as_mut() += 1;
                }
                if !map.is_empty() {
                    let scalars = mode.seq(&map, cols.as_ref());
                    let scalars = CompactScalars(scalars);
                    writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
                    *ctx.as_mut() += 1;
                }
                MirRelationExpr::fmt_indexed_filter(
                    f,
                    ctx,
                    coll_id,
                    idx_id,
                    literal_constraints.clone(),
                    None,
                )?;
                writeln!(f)?;
                ctx.as_mut().reset();
                Ok(())
            }
            FastPathPlan::PeekPersist(gid, literal_constraint, mfp) => {
                ctx.as_mut().set();
                let (map, filter, project) = mfp.as_map_filter_project();

                let cols = if !ctx.config.humanized_exprs {
                    None
                } else if let Some(cols) = ctx.humanizer.column_names_for_id(*gid) {
                    let cols = itertools::chain(
                        cols.iter().cloned(),
                        std::iter::repeat(String::new()).take(map.len()),
                    )
                    .collect::<Vec<_>>();
                    Some(cols)
                } else {
                    None
                };

                if project.len() != mfp.input_arity + map.len()
                    || !project.iter().enumerate().all(|(i, o)| i == *o)
                {
                    let outputs = mode.seq(&project, cols.as_ref());
                    let outputs = CompactScalars(outputs);
                    writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
                    *ctx.as_mut() += 1;
                }
                if !filter.is_empty() {
                    let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
                    writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
                    *ctx.as_mut() += 1;
                }
                if !map.is_empty() {
                    let scalars = mode.seq(&map, cols.as_ref());
                    let scalars = CompactScalars(scalars);
                    writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
                    *ctx.as_mut() += 1;
                }
                let human_id = ctx
                    .humanizer
                    .humanize_id(*gid)
                    .unwrap_or_else(|| gid.to_string());
                write!(f, "{}PeekPersist {human_id}", ctx.as_mut())?;
                if let Some(literal) = literal_constraint {
                    let value = mode.expr(literal, None);
                    writeln!(f, " [value={}]", value)?;
                } else {
                    writeln!(f)?;
                }
                ctx.as_mut().reset();
                Ok(())
            }
        }?;
        Ok(())
    }
}

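/// All the state needed to execute a peek that has already been planned and
/// timestamped: the plan, the timestamp determination, the issuing connection,
/// the intermediate result type and arity, and the IDs the peek depends on.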
#[derive(Debug)]
pub struct PlannedPeek {
    pub plan: PeekPlan,
    pub determination: TimestampDetermination<mz_repr::Timestamp>,
    pub conn_id: ConnectionId,
    pub intermediate_result_type: SqlRelationType,
    pub source_arity: usize,
    pub source_ids: BTreeSet<GlobalId>,
}

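/// How a peek is executed: either via a [`FastPathPlan`] against existing
/// state, or via a "slow path" that first builds the dataflow described by a
/// [`PeekDataflowPlan`].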
#[derive(Clone, Debug)]
pub enum PeekPlan<T = mz_repr::Timestamp> {
    FastPath(FastPathPlan),
    SlowPath(PeekDataflowPlan<T>),
}

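/// Converts a `MapFilterProject` into a `SafeMfpPlan`, mapping failures
/// (including temporal predicates) to internal optimizer errors.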
fn mfp_to_safe_plan(
    mfp: mz_expr::MapFilterProject,
) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
    mfp.into_plan()
        .map_err(OptimizerError::InternalUnsafeMfpPlan)?
        .into_nontemporal()
        .map_err(|e| OptimizerError::InternalUnsafeMfpPlan(format!("{:?}", e)))
}

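/// Converts an MFP into a `SafeMfpPlan` that can be evaluated directly against
/// rows in an arrangement keyed by `key`, by permuting its column references
/// into the arrangement's (key, thinned value) layout.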
fn permute_oneshot_mfp_around_index(
    mfp: mz_expr::MapFilterProject,
    key: &[MirScalarExpr],
) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
    let input_arity = mfp.input_arity;
    let mut safe_mfp = mfp_to_safe_plan(mfp)?;
    let (permute, thinning) = permutation_for_arrangement(key, input_arity);
    safe_mfp.permute_fn(|c| permute[c], key.len() + thinning.len());
    Ok(safe_mfp)
}

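/// Determines whether the dataflow plan for `view_id` can be answered without
/// building a dedicated dataflow, returning the corresponding
/// [`FastPathPlan`] if so:
/// * a constant collection becomes [`FastPathPlan::Constant`];
/// * an MFP applied to a `Get` that is covered by an imported index becomes
///   [`FastPathPlan::PeekExisting`];
/// * an MFP applied to a `Get` that can be answered directly from persist
///   (subject to `persist_fast_path_limit`, and to order/key-prefix checks
///   when `persist_fast_path_order` is set) becomes
///   [`FastPathPlan::PeekPersist`];
/// * an `IndexedFilter` join becomes an index lookup
///   ([`FastPathPlan::PeekExisting`] with literal constraints).
///
/// Returns `Ok(None)` when no fast path applies and a dataflow must be built.
/// A rough usage sketch (illustrative only; the variable names below are not
/// taken from real call sites):
///
/// ```ignore
/// let fast_path = create_fast_path_plan(&mut dataflow, view_id, Some(&finishing), 25, true)?;
/// let peek_plan = match fast_path {
///     Some(plan) => PeekPlan::FastPath(plan),
///     // Otherwise, finalize the dataflow and fall back to a slow-path peek.
///     None => PeekPlan::SlowPath(peek_dataflow_plan),
/// };
/// ```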
pub fn create_fast_path_plan<T: Timestamp>(
    dataflow_plan: &mut DataflowDescription<OptimizedMirRelationExpr, (), T>,
    view_id: GlobalId,
    finishing: Option<&RowSetFinishing>,
    persist_fast_path_limit: usize,
    persist_fast_path_order: bool,
) -> Result<Option<FastPathPlan>, OptimizerError> {
    if dataflow_plan.objects_to_build.len() >= 1 && dataflow_plan.objects_to_build[0].id == view_id
    {
        let mut mir = &*dataflow_plan.objects_to_build[0].plan.as_inner_mut();
        if let Some((rows, found_typ)) = mir.as_const() {
            return Ok(Some(FastPathPlan::Constant(
                rows.clone()
                    .map(|rows| rows.into_iter().map(|(row, diff)| (row, diff)).collect()),
                found_typ.clone(),
            )));
        } else {
            if let MirRelationExpr::TopK {
                input,
                group_key,
                order_key,
                limit,
                offset,
                monotonic: _,
                expected_group_size: _,
            } = mir
            {
                if let Some(finishing) = finishing {
                    if group_key.is_empty() && *order_key == finishing.order_by && *offset == 0 {
                        let finishing_limits_at_least_as_topk = match (limit, finishing.limit) {
                            (None, _) => true,
                            (Some(..), None) => false,
                            (Some(topk_limit), Some(finishing_limit)) => {
                                if let Some(l) = topk_limit.as_literal_int64() {
                                    l >= *finishing_limit
                                } else {
                                    false
                                }
                            }
                        };
                        if finishing_limits_at_least_as_topk {
                            mir = input;
                        }
                    }
                }
            }
            let (mfp, mir) = mz_expr::MapFilterProject::extract_from_expression(mir);
            match mir {
                MirRelationExpr::Get {
                    id: Id::Global(get_id),
                    typ: relation_typ,
                    ..
                } => {
                    for (index_id, IndexImport { desc, .. }) in dataflow_plan.index_imports.iter() {
                        if desc.on_id == *get_id {
                            return Ok(Some(FastPathPlan::PeekExisting(
                                *get_id,
                                *index_id,
                                None,
                                permute_oneshot_mfp_around_index(mfp, &desc.key)?,
                            )));
                        }
                    }

                    let safe_mfp = mfp_to_safe_plan(mfp)?;
                    let (_maps, filters, projection) = safe_mfp.as_map_filter_project();

                    let literal_constraint = if persist_fast_path_order {
                        let mut row = Row::default();
                        let mut packer = row.packer();
                        for (idx, col) in relation_typ.column_types.iter().enumerate() {
                            if !preserves_order(&col.scalar_type) {
                                break;
                            }
                            let col_expr = MirScalarExpr::column(idx);
                            let Some((literal, _)) = filters
                                .iter()
                                .filter_map(|f| f.expr_eq_literal(&col_expr))
                                .next()
                            else {
                                break;
                            };
                            packer.extend_by_row(&literal);
                        }
                        if row.is_empty() { None } else { Some(row) }
                    } else {
                        None
                    };

                    let finish_ok = match &finishing {
                        None => false,
                        Some(RowSetFinishing {
                            order_by,
                            limit,
                            offset,
                            ..
                        }) => {
                            let order_ok = if persist_fast_path_order {
                                order_by.iter().enumerate().all(|(idx, order)| {
                                    let column_idx = projection[order.column];
                                    if column_idx >= safe_mfp.input_arity {
                                        return false;
                                    }
                                    let column_type = &relation_typ.column_types[column_idx];
                                    let index_ok = idx == column_idx;
                                    let nulls_ok = !column_type.nullable || order.nulls_last;
                                    let asc_ok = !order.desc;
                                    let type_ok = preserves_order(&column_type.scalar_type);
                                    index_ok && nulls_ok && asc_ok && type_ok
                                })
                            } else {
                                order_by.is_empty()
                            };
                            let limit_ok = limit.map_or(false, |l| {
                                usize::cast_from(l) + *offset < persist_fast_path_limit
                            });
                            order_ok && limit_ok
                        }
                    };

                    let key_constraint = if let Some(literal) = &literal_constraint {
                        let prefix_len = literal.iter().count();
                        relation_typ
                            .keys
                            .iter()
                            .any(|k| k.iter().all(|idx| *idx < prefix_len))
                    } else {
                        false
                    };

                    if key_constraint || (filters.is_empty() && finish_ok) {
                        return Ok(Some(FastPathPlan::PeekPersist(
                            *get_id,
                            literal_constraint,
                            safe_mfp,
                        )));
                    }
                }
                MirRelationExpr::Join { implementation, .. } => {
                    if let mz_expr::JoinImplementation::IndexedFilter(coll_id, idx_id, key, vals) =
                        implementation
                    {
                        return Ok(Some(FastPathPlan::PeekExisting(
                            *coll_id,
                            *idx_id,
                            Some(vals.clone()),
                            permute_oneshot_mfp_around_index(mfp, key)?,
                        )));
                    }
                }
                _ => {}
            }
        }
    }
    Ok(None)
}

impl FastPathPlan {
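    /// Reports which indexes the plan uses and how: an index lookup when
    /// literal constraints are present, a "fast path limit" read when the
    /// finishing has a limit but no ordering, and a full scan otherwise.
    /// Constant and persist plans use no indexes.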
    pub fn used_indexes(&self, finishing: Option<&RowSetFinishing>) -> UsedIndexes {
        match self {
            FastPathPlan::Constant(..) => UsedIndexes::default(),
            FastPathPlan::PeekExisting(_coll_id, idx_id, literal_constraints, _mfp) => {
                if literal_constraints.is_some() {
                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::Lookup(*idx_id)])].into())
                } else if finishing.map_or(false, |f| f.limit.is_some() && f.order_by.is_empty()) {
                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::FastPathLimit])].into())
                } else {
                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::FullScan])].into())
                }
            }
            FastPathPlan::PeekPersist(..) => UsedIndexes::default(),
        }
    }
}

impl crate::coord::Coordinator {
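    /// Executes a [`PlannedPeek`].
    ///
    /// Constant plans are finished and returned immediately. All other plans
    /// (index peeks, persist peeks, and slow-path dataflows) are registered as
    /// pending peeks, issued to the compute controller, and answered
    /// asynchronously via a streaming [`ExecuteResponse`].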
    #[mz_ore::instrument(level = "debug")]
    pub async fn implement_peek_plan(
        &mut self,
        ctx_extra: &mut ExecuteContextExtra,
        plan: PlannedPeek,
        finishing: RowSetFinishing,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        max_result_size: u64,
        max_returned_query_size: Option<u64>,
    ) -> Result<crate::ExecuteResponse, AdapterError> {
        let PlannedPeek {
            plan: fast_path,
            determination,
            conn_id,
            intermediate_result_type,
            source_arity,
            source_ids,
        } = plan;

        if let PeekPlan::FastPath(FastPathPlan::Constant(rows, _)) = fast_path {
            let mut rows = match rows {
                Ok(rows) => rows,
                Err(e) => return Err(e.into()),
            };
            consolidate(&mut rows);

            let mut results = Vec::new();
            for (row, count) in rows {
                if count.is_negative() {
                    Err(EvalError::InvalidParameterValue(
                        format!("Negative multiplicity in constant result: {}", count).into(),
                    ))?
                };
                if count.is_positive() {
                    let count = usize::cast_from(
                        u64::try_from(count.into_inner())
                            .expect("known to be positive from check above"),
                    );
                    results.push((
                        row,
                        NonZeroUsize::new(count).expect("known to be non-zero from check above"),
                    ));
                }
            }
            let row_collection = RowCollection::new(results, &finishing.order_by);
            let duration_histogram = self.metrics.row_set_finishing_seconds();

            let (ret, reason) = match finishing.finish(
                row_collection,
                max_result_size,
                max_returned_query_size,
                &duration_histogram,
            ) {
                Ok((rows, row_size_bytes)) => {
                    let result_size = u64::cast_from(row_size_bytes);
                    let rows_returned = u64::cast_from(rows.count());
                    (
                        Ok(Self::send_immediate_rows(rows)),
                        StatementEndedExecutionReason::Success {
                            result_size: Some(result_size),
                            rows_returned: Some(rows_returned),
                            execution_strategy: Some(StatementExecutionStrategy::Constant),
                        },
                    )
                }
                Err(error) => (
                    Err(AdapterError::ResultSize(error.clone())),
                    StatementEndedExecutionReason::Errored { error },
                ),
            };
            self.retire_execution(reason, std::mem::take(ctx_extra));
            return ret;
        }

        let timestamp = determination.timestamp_context.timestamp_or_default();
        if let Some(id) = ctx_extra.contents() {
            self.set_statement_execution_timestamp(id, timestamp)
        }

        let (peek_command, drop_dataflow, is_fast_path, peek_target, strategy) = match fast_path {
            PeekPlan::FastPath(FastPathPlan::PeekExisting(
                _coll_id,
                idx_id,
                literal_constraints,
                map_filter_project,
            )) => (
                (literal_constraints, timestamp, map_filter_project),
                None,
                true,
                PeekTarget::Index { id: idx_id },
                StatementExecutionStrategy::FastPath,
            ),
            PeekPlan::FastPath(FastPathPlan::PeekPersist(
                coll_id,
                literal_constraint,
                map_filter_project,
            )) => {
                let peek_command = (
                    literal_constraint.map(|r| vec![r]),
                    timestamp,
                    map_filter_project,
                );
                let metadata = self
                    .controller
                    .storage
                    .collection_metadata(coll_id)
                    .expect("storage collection for fast-path peek")
                    .clone();
                (
                    peek_command,
                    None,
                    true,
                    PeekTarget::Persist {
                        id: coll_id,
                        metadata,
                    },
                    StatementExecutionStrategy::PersistFastPath,
                )
            }
            PeekPlan::SlowPath(PeekDataflowPlan {
                desc: dataflow,
                id: index_id,
                key: index_key,
                permutation: index_permutation,
                thinned_arity: index_thinned_arity,
            }) => {
                let output_ids = dataflow.export_ids().collect();

                // Ship the dataflow to the cluster; the peek below reads from
                // the index it exports.
                self.controller
                    .compute
                    .create_dataflow(compute_instance, dataflow, None)
                    .unwrap_or_terminate("cannot fail to create dataflows");
                self.initialize_compute_read_policies(
                    output_ids,
                    compute_instance,
                    CompactionWindow::DisableCompaction,
                )
                .await;

                // Build an identity MFP over the peek's source arity, permuted
                // to match the (key, thinned value) layout of the arrangement.
                let mut map_filter_project = mz_expr::MapFilterProject::new(source_arity);
                map_filter_project.permute_fn(
                    |c| index_permutation[c],
                    index_key.len() + index_thinned_arity,
                );
                let map_filter_project = mfp_to_safe_plan(map_filter_project)?;

                (
                    (None, timestamp, map_filter_project),
                    Some(index_id),
                    false,
                    PeekTarget::Index { id: index_id },
                    StatementExecutionStrategy::Standard,
                )
            }
            _ => {
                // The `Constant` fast path has already returned above.
                unreachable!()
            }
        };


        let (rows_tx, rows_rx) = tokio::sync::oneshot::channel();

        // Generate a fresh UUID for this peek, avoiding (unlikely) collisions
        // with still-pending peeks.
        let mut uuid = Uuid::new_v4();
        while self.pending_peeks.contains_key(&uuid) {
            uuid = Uuid::new_v4();
        }

        // Register the peek so that responses and cancellations can be routed
        // back to this connection.
        self.pending_peeks.insert(
            uuid,
            PendingPeek {
                conn_id: conn_id.clone(),
                cluster_id: compute_instance,
                depends_on: source_ids,
                ctx_extra: std::mem::take(ctx_extra),
                is_fast_path,
            },
        );
        self.client_pending_peeks
            .entry(conn_id)
            .or_default()
            .insert(uuid, compute_instance);
        let (literal_constraints, timestamp, map_filter_project) = peek_command;

        // The result descriptor only needs types; the column names are synthetic.
        let peek_result_column_names =
            (0..intermediate_result_type.arity()).map(|i| format!("peek_{i}"));
        let peek_result_desc =
            RelationDesc::new(intermediate_result_type, peek_result_column_names);

        self.controller
            .compute
            .peek(
                compute_instance,
                peek_target,
                literal_constraints,
                uuid,
                timestamp,
                peek_result_desc,
                finishing.clone(),
                map_filter_project,
                target_replica,
                rows_tx,
            )
            .unwrap_or_terminate("cannot fail to peek");

        let duration_histogram = self.metrics.row_set_finishing_seconds();

        // Drop the transient dataflow that was created for a slow-path peek.
        if let Some(index_id) = drop_dataflow {
            self.remove_compute_ids_from_timeline(vec![(compute_instance, index_id)]);
            self.drop_indexes(vec![(compute_instance, index_id)]);
        }

        let persist_client = self.persist_client.clone();
        let peek_stash_read_batch_size_bytes =
            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
                .get(self.catalog().system_config().dyncfgs());
        let peek_stash_read_memory_budget_bytes =
            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
                .get(self.catalog().system_config().dyncfgs());

        let peek_response_stream = Self::create_peek_response_stream(
            rows_rx,
            finishing,
            max_result_size,
            max_returned_query_size,
            duration_histogram,
            persist_client,
            peek_stash_read_batch_size_bytes,
            peek_stash_read_memory_budget_bytes,
        );

        Ok(crate::ExecuteResponse::SendingRowsStreaming {
            rows: Box::pin(peek_response_stream),
            instance_id: compute_instance,
            strategy,
        })
    }

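    /// Builds the async stream over which the rows of a non-constant peek are
    /// returned to the client.
    ///
    /// Small results arrive as `PeekResponse::Rows` and are finished in one
    /// step. Large results arrive as `PeekResponse::Stashed`, meaning the
    /// compute layer wrote them to a persist shard: those batches are read
    /// back on a background task in chunks bounded by
    /// `peek_stash_read_batch_size_bytes` and
    /// `peek_stash_read_memory_budget_bytes`, finished incrementally, and
    /// deleted once fully consumed.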
    #[mz_ore::instrument(level = "debug")]
    fn create_peek_response_stream(
        rows_rx: tokio::sync::oneshot::Receiver<PeekResponse>,
        finishing: RowSetFinishing,
        max_result_size: u64,
        max_returned_query_size: Option<u64>,
        duration_histogram: prometheus::Histogram,
        mut persist_client: mz_persist_client::PersistClient,
        peek_stash_read_batch_size_bytes: usize,
        peek_stash_read_memory_budget_bytes: usize,
    ) -> impl futures::Stream<Item = PeekResponseUnary> {
        async_stream::stream!({
            let result = rows_rx.await;

            let rows = match result {
                Ok(rows) => rows,
                Err(e) => {
                    yield PeekResponseUnary::Error(e.to_string());
                    return;
                }
            };

            match rows {
                PeekResponse::Rows(rows) => {
                    match finishing.finish(
                        rows,
                        max_result_size,
                        max_returned_query_size,
                        &duration_histogram,
                    ) {
                        Ok((rows, _size_bytes)) => yield PeekResponseUnary::Rows(Box::new(rows)),
                        Err(e) => yield PeekResponseUnary::Error(e),
                    }
                }
                PeekResponse::Stashed(response) => {
                    let response = *response;

                    let shard_id = response.shard_id;

                    let mut batches = Vec::new();
                    for proto_batch in response.batches.into_iter() {
                        let batch =
                            persist_client.batch_from_transmittable_batch(&shard_id, proto_batch);

                        batches.push(batch);
                    }
                    tracing::trace!(?batches, "stashed peek response");

                    let as_of = Antichain::from_elem(mz_repr::Timestamp::default());
                    let read_schemas: Schemas<SourceData, ()> = Schemas {
                        id: None,
                        key: Arc::new(response.relation_desc.clone()),
                        val: Arc::new(UnitSchema),
                    };

                    let mut row_cursor = persist_client
                        .read_batches_consolidated::<_, _, _, i64>(
                            response.shard_id,
                            as_of,
                            read_schemas,
                            batches,
                            |_stats| true,
                            peek_stash_read_memory_budget_bytes,
                        )
                        .await
                        .expect("invalid usage");

                    let (tx, mut rx) = tokio::sync::mpsc::channel(1);
                    // Read the stashed result on a background task, forwarding
                    // rows to the response stream in batches of bounded size.
                    mz_ore::task::spawn(|| "read_peek_batches", async move {
                        // First forward the rows that were returned inline with the response.
                        let result = tx.send(response.inline_rows).await;
                        if result.is_err() {
                            tracing::error!("receiver went away");
                        }

                        let mut current_batch = Vec::new();
                        let mut current_batch_size: usize = 0;

                        'outer: while let Some(rows) = row_cursor.next().await {
                            for ((key, _val), _ts, diff) in rows {
                                let source_data = key.expect("decoding error");

                                let row = source_data
                                    .0
                                    .expect("we are not sending errors on this code path");

                                let diff = usize::try_from(diff)
                                    .expect("peek responses cannot have negative diffs");

                                if diff > 0 {
                                    let diff =
                                        NonZeroUsize::new(diff).expect("checked to be non-zero");
                                    current_batch_size =
                                        current_batch_size.saturating_add(row.byte_len());
                                    current_batch.push((row, diff));
                                }

                                if current_batch_size > peek_stash_read_batch_size_bytes {
                                    let result = tx
                                        .send(RowCollection::new(
                                            current_batch.drain(..).collect_vec(),
                                            &[],
                                        ))
                                        .await;
                                    if result.is_err() {
                                        tracing::error!("receiver went away");
                                        break 'outer;
                                    }

                                    current_batch_size = 0;
                                }
                            }
                        }

                        if !current_batch.is_empty() {
                            let result = tx.send(RowCollection::new(current_batch, &[])).await;
                            if result.is_err() {
                                tracing::error!("receiver went away");
                            }
                        }

                        // Clean up the stashed batches now that the cursor has been drained.
                        let batches = row_cursor.into_lease();
                        tracing::trace!(?response.shard_id, "cleaning up batches of peek result");
                        for batch in batches {
                            batch.delete().await;
                        }
                    });

                    assert!(
                        finishing.is_streamable(response.relation_desc.arity()),
                        "can only get stashed responses when the finishing is streamable"
                    );

                    tracing::trace!("query result is streamable!");

                    let mut incremental_finishing = RowSetFinishingIncremental::new(
                        finishing.offset,
                        finishing.limit,
                        finishing.project,
                        max_returned_query_size,
                    );

                    let mut got_zero_rows = true;
                    while let Some(rows) = rx.recv().await {
                        got_zero_rows = false;

                        let result_rows = incremental_finishing.finish_incremental(
                            rows,
                            max_result_size,
                            &duration_histogram,
                        );

                        match result_rows {
                            Ok(result_rows) => yield PeekResponseUnary::Rows(Box::new(result_rows)),
                            Err(e) => yield PeekResponseUnary::Error(e),
                        }
                    }

                    // Always emit at least one (possibly empty) rows response.
                    if got_zero_rows {
                        let row_iter = vec![].into_row_iter();
                        yield PeekResponseUnary::Rows(Box::new(row_iter));
                    }
                }
                PeekResponse::Canceled => {
                    yield PeekResponseUnary::Canceled;
                }
                PeekResponse::Error(e) => {
                    yield PeekResponseUnary::Error(e);
                }
            }
        })
    }

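    /// Cancels all pending peeks for the given connection: the owning clusters
    /// are told to cancel the peeks, and the corresponding statement
    /// executions are retired as canceled.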
    #[mz_ore::instrument(level = "debug")]
    pub(crate) fn cancel_pending_peeks(&mut self, conn_id: &ConnectionId) {
        if let Some(uuids) = self.client_pending_peeks.remove(conn_id) {
            self.metrics
                .canceled_peeks
                .with_label_values(&[])
                .inc_by(u64::cast_from(uuids.len()));

            let mut inverse: BTreeMap<ComputeInstanceId, BTreeSet<Uuid>> = Default::default();
            for (uuid, compute_instance) in &uuids {
                inverse.entry(*compute_instance).or_default().insert(*uuid);
            }
            for (compute_instance, uuids) in inverse {
                for uuid in uuids {
                    let _ = self.controller.compute.cancel_peek(
                        compute_instance,
                        uuid,
                        PeekResponse::Canceled,
                    );
                }
            }

            let peeks = uuids
                .iter()
                .filter_map(|(uuid, _)| self.pending_peeks.remove(uuid))
                .collect::<Vec<_>>();
            for peek in peeks {
                self.retire_execution(StatementEndedExecutionReason::Canceled, peek.ctx_extra);
            }
        }
    }

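    /// Handles a notification from a cluster that a peek has completed
    /// (successfully, with an error, or canceled), retiring the corresponding
    /// statement execution. Notifications for unknown UUIDs are ignored.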
    pub(crate) fn handle_peek_notification(
        &mut self,
        uuid: Uuid,
        notification: PeekNotification,
        otel_ctx: OpenTelemetryContext,
    ) {
        if let Some(PendingPeek {
            conn_id: _,
            cluster_id: _,
            depends_on: _,
            ctx_extra,
            is_fast_path,
        }) = self.remove_pending_peek(&uuid)
        {
            let reason = match notification {
                PeekNotification::Success {
                    rows: num_rows,
                    result_size,
                } => {
                    let strategy = if is_fast_path {
                        StatementExecutionStrategy::FastPath
                    } else {
                        StatementExecutionStrategy::Standard
                    };
                    StatementEndedExecutionReason::Success {
                        result_size: Some(result_size),
                        rows_returned: Some(num_rows),
                        execution_strategy: Some(strategy),
                    }
                }
                PeekNotification::Error(error) => StatementEndedExecutionReason::Errored { error },
                PeekNotification::Canceled => StatementEndedExecutionReason::Canceled,
            };
            otel_ctx.attach_as_parent();
            self.retire_execution(reason, ctx_extra);
        }
    }

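    /// Removes the pending peek with the given UUID from the coordinator's
    /// bookkeeping, including the per-connection index, and returns it if it
    /// was present.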
    pub(crate) fn remove_pending_peek(&mut self, uuid: &Uuid) -> Option<PendingPeek> {
        let pending_peek = self.pending_peeks.remove(uuid);
        if let Some(pending_peek) = &pending_peek {
            let uuids = self
                .client_pending_peeks
                .get_mut(&pending_peek.conn_id)
                .expect("coord peek state is inconsistent");
            uuids.remove(uuid);
            if uuids.is_empty() {
                self.client_pending_peeks.remove(&pending_peek.conn_id);
            }
        }
        pending_peek
    }

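    /// Constructs an [`ExecuteResponse`] that returns the given, already
    /// computed rows to the client immediately, without contacting a cluster.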
    pub(crate) fn send_immediate_rows<I>(rows: I) -> ExecuteResponse
    where
        I: IntoRowIterator,
        I::Iter: Send + Sync + 'static,
    {
        let rows = Box::new(rows.into_row_iter());
        ExecuteResponse::SendingRowsImmediate { rows }
    }
}

#[cfg(test)]
mod tests {
    use mz_expr::func::IsNull;
    use mz_expr::{MapFilterProject, UnaryFunc};
    use mz_ore::str::Indent;
    use mz_repr::explain::text::text_string_at;
    use mz_repr::explain::{DummyHumanizer, ExplainConfig, PlanRenderingContext};
    use mz_repr::{Datum, SqlColumnType, SqlScalarType};

    use super::*;

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_fast_path_plan_as_text() {
        let typ = SqlRelationType::new(vec![SqlColumnType {
            scalar_type: SqlScalarType::String,
            nullable: false,
        }]);
        let constant_err = FastPathPlan::Constant(Err(EvalError::DivisionByZero), typ.clone());
        let no_lookup = FastPathPlan::PeekExisting(
            GlobalId::User(8),
            GlobalId::User(10),
            None,
            MapFilterProject::new(4)
                .map(Some(MirScalarExpr::column(0).or(MirScalarExpr::column(2))))
                .project([1, 4])
                .into_plan()
                .expect("invalid plan")
                .into_nontemporal()
                .expect("invalid nontemporal"),
        );
        let lookup = FastPathPlan::PeekExisting(
            GlobalId::User(9),
            GlobalId::User(11),
            Some(vec![Row::pack(Some(Datum::Int32(5)))]),
            MapFilterProject::new(3)
                .filter(Some(
                    MirScalarExpr::column(0).call_unary(UnaryFunc::IsNull(IsNull)),
                ))
                .into_plan()
                .expect("invalid plan")
                .into_nontemporal()
                .expect("invalid nontemporal"),
        );

        let humanizer = DummyHumanizer;
        let config = ExplainConfig {
            redacted: false,
            verbose_syntax: true,
            ..Default::default()
        };
        let ctx_gen = || {
            let indent = Indent::default();
            let annotations = BTreeMap::new();
            PlanRenderingContext::<FastPathPlan>::new(indent, &humanizer, annotations, &config)
        };

        let constant_err_exp = "Error \"division by zero\"\n";
        let no_lookup_exp = "Project (#1, #4)\n  Map ((#0 OR #2))\n    ReadIndex on=u8 [DELETED INDEX]=[*** full scan ***]\n";
        let lookup_exp =
            "Filter (#0) IS NULL\n  ReadIndex on=u9 [DELETED INDEX]=[lookup value=(5)]\n";

        assert_eq!(text_string_at(&constant_err, ctx_gen), constant_err_exp);
        assert_eq!(text_string_at(&no_lookup, ctx_gen), no_lookup_exp);
        assert_eq!(text_string_at(&lookup, ctx_gen), lookup_exp);

        let mut constant_rows = vec![
            (Row::pack(Some(Datum::String("hello"))), Diff::ONE),
            (Row::pack(Some(Datum::String("world"))), 2.into()),
            (Row::pack(Some(Datum::String("star"))), 500.into()),
        ];
        let constant_exp1 =
            "Constant\n  - (\"hello\")\n  - ((\"world\") x 2)\n  - ((\"star\") x 500)\n";
        assert_eq!(
            text_string_at(
                &FastPathPlan::Constant(Ok(constant_rows.clone()), typ.clone()),
                ctx_gen
            ),
            constant_exp1
        );
        constant_rows
            .extend((0..20).map(|i| (Row::pack(Some(Datum::String(&i.to_string()))), Diff::ONE)));
        let constant_exp2 = "Constant\n  total_rows (diffs absed): 523\n  first_rows:\n    - (\"hello\")\
            \n    - ((\"world\") x 2)\n    - ((\"star\") x 500)\n    - (\"0\")\n    - (\"1\")\
            \n    - (\"2\")\n    - (\"3\")\n    - (\"4\")\n    - (\"5\")\n    - (\"6\")\
            \n    - (\"7\")\n    - (\"8\")\n    - (\"9\")\n    - (\"10\")\n    - (\"11\")\
            \n    - (\"12\")\n    - (\"13\")\n    - (\"14\")\n    - (\"15\")\n    - (\"16\")\n";
        assert_eq!(
            text_string_at(&FastPathPlan::Constant(Ok(constant_rows), typ), ctx_gen),
            constant_exp2
        );
    }
}